Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/rabbitmq/operations/service.py: 0.00%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

95 statements  

1# Copyright (C) 2021 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17OperationsService 

18================= 

19 

20""" 

21 

22import logging 

23 

24import grpc 

25 

26from google.protobuf.empty_pb2 import Empty 

27 

28from buildgrid._exceptions import InvalidArgumentError, RetriableError 

29from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2 

30from buildgrid.server._authentication import AuthContext, authorize 

31from buildgrid.server.metrics_names import ( 

32 OPERATIONS_CANCEL_OPERATION_TIME_METRIC_NAME, 

33 OPERATIONS_DELETE_OPERATION_TIME_METRIC_NAME, 

34 OPERATIONS_GET_OPERATION_TIME_METRIC_NAME, 

35 OPERATIONS_LIST_OPERATIONS_TIME_METRIC_NAME 

36) 

37from buildgrid.server.metrics_utils import DurationMetric 

38 

39 

class OperationsService(operations_pb2_grpc.OperationsServicer):
    """gRPC servicer for the long-running ``Operations`` API.

    Each request is routed to one of the instances registered through
    :meth:`add_instance`, selected by the instance name carried in the
    request's ``name`` field.
    """

    def __init__(self, server):
        """Create the servicer and register it with ``server``.

        Args:
            server: The gRPC server this servicer is added to.
        """
        self._logger = logging.getLogger(__name__)

        # Maps instance name -> instance; filled in by add_instance().
        self._instances = {}

        operations_pb2_grpc.add_OperationsServicer_to_server(self, server)

    # --- Public API ---

    def add_instance(self, instance_name, instance):
        """Registers a new servicer instance.

        Args:
            instance_name (str): The new instance's name.
            instance (OperationsInstance): The new instance itself.
        """
        self._instances[instance_name] = instance

    # --- Public API: Servicer ---

    @authorize(AuthContext)
    @DurationMetric(OPERATIONS_GET_OPERATION_TIME_METRIC_NAME)
    def GetOperation(self, request, context):
        """Look up a single operation by its full name.

        Args:
            request: Request whose ``name`` is ``{instance_name}/{operation_uuid}``,
                or just ``{operation_uuid}`` for the blank instance name.
            context: The gRPC servicer context of the call.

        Returns:
            A copy of the stored operation with ``name`` set to the full name
            from the request. When an error is handled, an empty ``Operation``
            is returned and the status code is set on ``context``.
        """
        self._logger.info(f"GetOperation request from [{context.peer()}]")

        try:
            name = request.name

            instance_name = self._parse_instance_name(name)
            instance = self._get_instance(instance_name)

            operation_name = self._parse_operation_name(name)
            operation, _ = instance.get_operation(operation_name)

            # Work on a copy so that renaming does not touch the object
            # handed back by the instance.
            op = operations_pb2.Operation()
            op.CopyFrom(operation)
            op.name = name

            # TODO: Convert the metadata returned by `get_operation()`
            # into a `RequestMetadata` message and attach it to gRPC
            # trailing metadata as an entry of type:
            # ('requestmetadata-bin', RequestMetadata).

            return op

        except InvalidArgumentError as e:
            # This is a client error. Don't log at error on the server side
            self._logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

        except RetriableError as e:
            # Hand the status carried by the error straight to gRPC.
            context.abort_with_status(e.error_status)

        except Exception:
            self._logger.exception(
                f"Unexpected error in GetOperation; request=[{request}]"
            )
            context.set_code(grpc.StatusCode.INTERNAL)

        return operations_pb2.Operation()

    @authorize(AuthContext)
    @DurationMetric(OPERATIONS_LIST_OPERATIONS_TIME_METRIC_NAME)
    def ListOperations(self, request, context):
        """List the operations of one instance.

        Args:
            request: Request whose ``name`` is the instance name and which
                carries ``filter``, ``page_size`` and ``page_token``.
            context: The gRPC servicer context of the call.

        Returns:
            The instance's listing, with every operation name prefixed by
            ``{instance_name}/``. When an error is handled, an empty
            ``ListOperationsResponse`` is returned and the status code is set
            on ``context``.
        """
        self._logger.info(f"ListOperations request from [{context.peer()}]")

        try:
            # The request name should be the collection name
            # In our case, this is just the instance_name
            instance_name = request.name
            instance = self._get_instance(instance_name)

            result = instance.list_operations(request.filter,
                                              request.page_size,
                                              request.page_token)

            # Qualify each name so it can be passed back to GetOperation /
            # CancelOperation, which expect the instance name as a prefix.
            for operation in result.operations:
                operation.name = f"{instance_name}/{operation.name}"

            return result

        except InvalidArgumentError as e:
            # This is a client error. Don't log at error on the server side
            self._logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

        except RetriableError as e:
            # Hand the status carried by the error straight to gRPC.
            context.abort_with_status(e.error_status)

        except Exception:
            self._logger.exception(
                f"Unexpected error in ListOperations; request=[{request}]"
            )
            context.set_code(grpc.StatusCode.INTERNAL)

        return operations_pb2.ListOperationsResponse()

    @authorize(AuthContext)
    @DurationMetric(OPERATIONS_DELETE_OPERATION_TIME_METRIC_NAME)
    def DeleteOperation(self, request, context):
        """Reject the call: deleting operations is not supported.

        Always sets ``UNIMPLEMENTED`` on ``context`` and returns ``Empty``.
        """
        self._logger.info(f"DeleteOperation request from [{context.peer()}]")

        context.set_details("BuildGrid does not support DeleteOperation.")
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        return Empty()

    @authorize(AuthContext)
    @DurationMetric(OPERATIONS_CANCEL_OPERATION_TIME_METRIC_NAME)
    def CancelOperation(self, request, context):
        """Ask the owning instance to cancel an operation.

        Args:
            request: Request whose ``name`` is ``{instance_name}/{operation_uuid}``,
                or just ``{operation_uuid}`` for the blank instance name.
            context: The gRPC servicer context of the call.

        Returns:
            ``Empty``; when an error is handled, the status code is set on
            ``context``.
        """
        self._logger.info(f"CancelOperation request from [{context.peer()}]")

        try:
            name = request.name

            instance_name = self._parse_instance_name(name)
            instance = self._get_instance(instance_name)

            operation_name = self._parse_operation_name(name)
            instance.cancel_operation(operation_name)

        except InvalidArgumentError as e:
            # This is a client error. Don't log at error on the server side
            self._logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

        except RetriableError as e:
            # Hand the status carried by the error straight to gRPC.
            context.abort_with_status(e.error_status)

        except Exception:
            self._logger.exception(
                f"Unexpected error in CancelOperation; request=[{request}]"
            )
            context.set_code(grpc.StatusCode.INTERNAL)

        return Empty()

    # --- Private API ---

    @staticmethod
    def _parse_instance_name(name):
        """Return the instance-name part of a full operation name.

        If the instance name is not blank, 'name' will have the form
        {instance_name}/{operation_uuid}. Otherwise, it will just be
        {operation_uuid}, and the empty string is returned.
        """
        # Everything before the last '/'; '' when there is no '/' at all.
        return name.rpartition('/')[0]

    @staticmethod
    def _parse_operation_name(name):
        """Return the operation part of a full operation name.

        This is the text after the last '/', or 'name' itself when it
        contains no '/'.
        """
        return name.rpartition('/')[2]

    def _get_instance(self, name):
        """Return the instance registered under ``name``.

        Raises:
            InvalidArgumentError: If no instance is registered under ``name``.
        """
        try:
            return self._instances[name]

        except KeyError:
            # The KeyError adds nothing beyond the message, so suppress it
            # instead of leaving it as implicit exception context.
            raise InvalidArgumentError(f"Instance doesn't exist on server: [{name}]") from None