Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/operations/service.py: 80.56%

108 statements  

« prev     ^ index     » next       coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17OperationsService 

18================= 

19 

20""" 

21 

22import logging 

23 

24import grpc 

25 

26from google.protobuf.empty_pb2 import Empty 

27 

28from buildgrid._exceptions import InvalidArgumentError, RetriableError 

29from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2 

30from buildgrid.server._authentication import AuthContext, authorize 

31from buildgrid.server.metrics_names import ( 

32 OPERATIONS_CANCEL_OPERATION_TIME_METRIC_NAME, 

33 OPERATIONS_DELETE_OPERATION_TIME_METRIC_NAME, 

34 OPERATIONS_GET_OPERATION_TIME_METRIC_NAME, 

35 OPERATIONS_LIST_OPERATIONS_TIME_METRIC_NAME 

36) 

37from buildgrid.server.metrics_utils import DurationMetric 

38from buildgrid.server.request_metadata_utils import request_metadata_from_scheduler_dict 

39 

40 

class OperationsService(operations_pb2_grpc.OperationsServicer):
    """gRPC servicer for the long-running Operations API.

    Each request is dispatched to one of the instances registered through
    :meth:`add_instance`, selected by the instance name carried in the
    request's ``name`` field.
    """

    def __init__(self, server):
        """Create the servicer and attach it to ``server``.

        Args:
            server: The gRPC server handed to
                ``add_OperationsServicer_to_server``.
        """
        self.__logger = logging.getLogger(__name__)

        # Maps instance name -> instance object; filled by add_instance().
        self._instances = {}

        operations_pb2_grpc.add_OperationsServicer_to_server(self, server)

    # --- Public API ---

    def add_instance(self, instance_name, instance):
        """Make ``instance`` reachable under ``instance_name``.

        An instance already stored under the same name is replaced.

        Args:
            instance_name (str): Name the instance is registered under.
            instance (OperationsInstance): The instance to register.
        """
        self._instances[instance_name] = instance

    # --- Public API: Servicer ---

    @authorize(AuthContext)
    @DurationMetric(OPERATIONS_GET_OPERATION_TIME_METRIC_NAME)
    def GetOperation(self, request, context):
        """Look up a single operation by its full name.

        ``request.name`` is split into an instance name and an operation
        name; the operation is fetched from that instance and returned as a
        copy whose ``name`` is the full name from the request. When the
        instance also returns metadata, it is attached to the call as a
        trailing metadata entry.

        On failure the status is reported through ``context``
        (INVALID_ARGUMENT, the status carried by a ``RetriableError``, or
        INTERNAL) and an empty ``Operation`` is returned.
        """
        self.__logger.info(f"GetOperation request from [{context.peer()}]")

        try:
            full_name = request.name

            target = self._get_instance(self._parse_instance_name(full_name))

            short_name = self._parse_operation_name(full_name)
            stored_operation, request_metadata = target.get_operation(short_name)

            # Reply with a copy so the client sees the fully qualified name.
            response = operations_pb2.Operation()
            response.CopyFrom(stored_operation)
            response.name = full_name

            if request_metadata is not None:
                trailing_entry = self._operation_request_metadata_entry(request_metadata)
                context.set_trailing_metadata([trailing_entry])

        except InvalidArgumentError as e:
            self.__logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

        except RetriableError as e:
            self.__logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
            context.abort_with_status(e.error_status)

        except Exception as e:
            self.__logger.exception(f"Unexpected error in GetOperation; request=[{request}]")
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INTERNAL)

        else:
            return response

        # Reached only after one of the handlers above has set the status.
        return operations_pb2.Operation()

    @authorize(AuthContext)
    @DurationMetric(OPERATIONS_LIST_OPERATIONS_TIME_METRIC_NAME)
    def ListOperations(self, request, context):
        """List the operations of one instance.

        ``request.name`` is used directly as the instance name. The filter,
        page size and page token are forwarded to the instance, and every
        operation in the result has its name prefixed with
        ``"{instance_name}/"`` before the result is returned.

        On failure the status is reported through ``context`` and an empty
        ``ListOperationsResponse`` is returned.
        """
        self.__logger.info(f"ListOperations request from [{context.peer()}]")

        try:
            # The request names a collection, which for this service is
            # simply the instance name.
            instance_name = request.name
            target = self._get_instance(instance_name)

            listing = target.list_operations(
                request.filter, request.page_size, request.page_token
            )

            for operation in listing.operations:
                operation.name = f"{instance_name}/{operation.name}"

        except InvalidArgumentError as e:
            # Client-side mistake: not worth an error-level log entry.
            self.__logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

        except RetriableError as e:
            self.__logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
            context.abort_with_status(e.error_status)

        except Exception as e:
            self.__logger.exception(f"Unexpected error in ListOperations; request=[{request}]")
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INTERNAL)

        else:
            return listing

        return operations_pb2.ListOperationsResponse()

    @authorize(AuthContext)
    @DurationMetric(OPERATIONS_DELETE_OPERATION_TIME_METRIC_NAME)
    def DeleteOperation(self, request, context):
        """Reject the call: deletion is reported as UNIMPLEMENTED."""
        self.__logger.info(f"DeleteOperation request from [{context.peer()}]")

        context.set_details("BuildGrid does not support DeleteOperation.")
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        return Empty()

    @authorize(AuthContext)
    @DurationMetric(OPERATIONS_CANCEL_OPERATION_TIME_METRIC_NAME)
    def CancelOperation(self, request, context):
        """Ask the owning instance to cancel an operation.

        ``request.name`` is split into an instance name and an operation
        name, and ``cancel_operation`` is called on that instance. An
        ``Empty`` message is returned whether or not the call succeeded;
        failures are reported through the status set on ``context``.
        """
        self.__logger.info(f"CancelOperation request from [{context.peer()}]")

        try:
            full_name = request.name

            target = self._get_instance(self._parse_instance_name(full_name))

            short_name = self._parse_operation_name(full_name)
            target.cancel_operation(short_name)

        except InvalidArgumentError as e:
            self.__logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

        except RetriableError as e:
            self.__logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
            context.abort_with_status(e.error_status)

        except Exception as e:
            self.__logger.exception(f"Unexpected error in CancelOperation; request=[{request}]")
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INTERNAL)

        return Empty()

    # --- Private API ---

    def _parse_instance_name(self, name):
        """Return the instance part of a full operation name.

        With a non-blank instance name, ``name`` looks like
        ``{instance_name}/{operation_uuid}`` and everything before the last
        ``/`` is returned. A ``name`` without any ``/`` yields ``''``.
        """
        instance_part, _, _ = name.rpartition('/')
        return instance_part

    def _parse_operation_name(self, name):
        """Return the operation part of a full operation name.

        This is the text after the last ``/`` in ``name``, or ``name``
        itself when it contains no ``/``.
        """
        _, _, operation_part = name.rpartition('/')
        return operation_part

    def _get_instance(self, name):
        """Return the instance registered under ``name``.

        Raises:
            InvalidArgumentError: If nothing is registered under ``name``.
        """
        try:
            found = self._instances[name]

        except KeyError:
            raise InvalidArgumentError(f"Instance doesn't exist on server: [{name}]")

        return found

    @staticmethod
    def _operation_request_metadata_entry(operation_metadata):
        """Build the trailing-metadata pair for an operation's metadata.

        ``operation_metadata`` is converted with
        ``request_metadata_from_scheduler_dict`` and serialized; the result
        is returned as a ``('requestmetadata-bin', bytes)`` tuple.
        """
        converted = request_metadata_from_scheduler_dict(operation_metadata)
        serialized = converted.SerializeToString()
        return 'requestmetadata-bin', serialized