Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/rabbitmq/operations/service.py: 0.00%

98 statements  

« prev     ^ index     » next       coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

1# Copyright (C) 2021 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17OperationsService 

18================= 

19 

20""" 

21 

22import logging 

23 

24import grpc 

25 

26from google.protobuf.empty_pb2 import Empty 

27 

28from buildgrid._exceptions import InvalidArgumentError, RetriableError 

29from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2 

30from buildgrid.server._authentication import AuthContext, authorize 

31from buildgrid.server.metrics_names import ( 

32 OPERATIONS_CANCEL_OPERATION_TIME_METRIC_NAME, 

33 OPERATIONS_DELETE_OPERATION_TIME_METRIC_NAME, 

34 OPERATIONS_GET_OPERATION_TIME_METRIC_NAME, 

35 OPERATIONS_LIST_OPERATIONS_TIME_METRIC_NAME 

36) 

37from buildgrid.server.metrics_utils import DurationMetric 

38 

39 

40class OperationsService(operations_pb2_grpc.OperationsServicer): 

41 

42 def __init__(self, server): 

43 self._logger = logging.getLogger(__name__) 

44 

45 self._instances = {} 

46 

47 operations_pb2_grpc.add_OperationsServicer_to_server(self, server) 

48 

49 # --- Public API --- 

50 

51 def add_instance(self, instance_name, instance): 

52 """Registers a new servicer instance. 

53 

54 Args: 

55 instance_name (str): The new instance's name. 

56 instance (OperationsInstance): The new instance itself. 

57 """ 

58 self._instances[instance_name] = instance 

59 

60 # --- Public API: Servicer --- 

61 

62 @authorize(AuthContext) 

63 @DurationMetric(OPERATIONS_GET_OPERATION_TIME_METRIC_NAME) 

64 def GetOperation(self, request, context): 

65 self._logger.info(f"GetOperation request from [{context.peer()}]") 

66 

67 try: 

68 name = request.name 

69 

70 instance_name = self._parse_instance_name(name) 

71 instance = self._get_instance(instance_name) 

72 

73 operation_name = self._parse_operation_name(name) 

74 operation, _ = instance.get_operation(operation_name) 

75 op = operations_pb2.Operation() 

76 op.CopyFrom(operation) 

77 op.name = name 

78 

79 # TODO: Convert the metadata returned by `get_operation()` 

80 # into a `RequestMetadata` message and attach it to gRPC 

81 # trailing metadata as an entry of type: 

82 # ('requestmetadata-bin', RequestMetadata). 

83 

84 return op 

85 

86 except InvalidArgumentError as e: 

87 self._logger.info(e) 

88 context.set_details(str(e)) 

89 context.set_code(grpc.StatusCode.INVALID_ARGUMENT) 

90 

91 except RetriableError as e: 

92 context.abort_with_status(e.error_status) 

93 

94 except Exception as e: 

95 self._logger.exception( 

96 f"Unexpected error in GetOperation; request=[{request}]" 

97 ) 

98 context.set_details(str(e)) 

99 context.set_code(grpc.StatusCode.INTERNAL) 

100 

101 return operations_pb2.Operation() 

102 

103 @authorize(AuthContext) 

104 @DurationMetric(OPERATIONS_LIST_OPERATIONS_TIME_METRIC_NAME) 

105 def ListOperations(self, request, context): 

106 self._logger.info(f"ListOperations request from [{context.peer()}]") 

107 

108 try: 

109 # The request name should be the collection name 

110 # In our case, this is just the instance_name 

111 instance_name = request.name 

112 instance = self._get_instance(instance_name) 

113 

114 result = instance.list_operations(request.filter, 

115 request.page_size, 

116 request.page_token) 

117 

118 for operation in result.operations: 

119 operation.name = f"{instance_name}/{operation.name}" 

120 

121 return result 

122 

123 except InvalidArgumentError as e: 

124 # This is a client error. Don't log at error on the server side 

125 self._logger.info(e) 

126 context.set_details(str(e)) 

127 context.set_code(grpc.StatusCode.INVALID_ARGUMENT) 

128 

129 except RetriableError as e: 

130 context.abort_with_status(e.error_status) 

131 

132 except Exception as e: 

133 self._logger.exception( 

134 f"Unexpected error in ListOperations; request=[{request}]" 

135 ) 

136 context.set_details(str(e)) 

137 context.set_code(grpc.StatusCode.INTERNAL) 

138 

139 return operations_pb2.ListOperationsResponse() 

140 

141 @authorize(AuthContext) 

142 @DurationMetric(OPERATIONS_DELETE_OPERATION_TIME_METRIC_NAME) 

143 def DeleteOperation(self, request, context): 

144 self._logger.info(f"DeleteOperation request from [{context.peer()}]") 

145 

146 context.set_details("BuildGrid does not support DeleteOperation.") 

147 context.set_code(grpc.StatusCode.UNIMPLEMENTED) 

148 return Empty() 

149 

150 @authorize(AuthContext) 

151 @DurationMetric(OPERATIONS_CANCEL_OPERATION_TIME_METRIC_NAME) 

152 def CancelOperation(self, request, context): 

153 self._logger.info(f"CancelOperation request from [{context.peer()}]") 

154 

155 try: 

156 name = request.name 

157 

158 instance_name = self._parse_instance_name(name) 

159 instance = self._get_instance(instance_name) 

160 

161 operation_name = self._parse_operation_name(name) 

162 instance.cancel_operation(operation_name) 

163 

164 except InvalidArgumentError as e: 

165 self._logger.info(e) 

166 context.set_details(str(e)) 

167 context.set_code(grpc.StatusCode.INVALID_ARGUMENT) 

168 

169 except RetriableError as e: 

170 context.abort_with_status(e.error_status) 

171 

172 except Exception as e: 

173 self._logger.exception( 

174 f"Unexpected error in CancelOperation; request=[{request}]" 

175 ) 

176 context.set_details(str(e)) 

177 context.set_code(grpc.StatusCode.INTERNAL) 

178 

179 return Empty() 

180 

181 # --- Private API --- 

182 

183 @staticmethod 

184 def _parse_instance_name(name): 

185 """ If the instance name is not blank, 'name' will have the form 

186 {instance_name}/{operation_uuid}. Otherwise, it will just be 

187 {operation_uuid} """ 

188 names = name.split('/') 

189 return '/'.join(names[:-1]) if len(names) > 1 else '' 

190 

191 @staticmethod 

192 def _parse_operation_name(name): 

193 names = name.split('/') 

194 return names[-1] if len(names) > 1 else name 

195 

196 def _get_instance(self, name): 

197 try: 

198 return self._instances[name] 

199 

200 except KeyError: 

201 raise InvalidArgumentError(f"Instance doesn't exist on server: [{name}]")