Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/operations/service.py: 82.86%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

105 statements  

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17OperationsService 

18================= 

19 

20""" 

21 

22import logging 

23 

24import grpc 

25 

26from google.protobuf.empty_pb2 import Empty 

27 

28from buildgrid._exceptions import InvalidArgumentError, RetriableError 

29from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2 

30from buildgrid.server._authentication import AuthContext, authorize 

31from buildgrid.server.metrics_names import ( 

32 OPERATIONS_CANCEL_OPERATION_TIME_METRIC_NAME, 

33 OPERATIONS_DELETE_OPERATION_TIME_METRIC_NAME, 

34 OPERATIONS_GET_OPERATION_TIME_METRIC_NAME, 

35 OPERATIONS_LIST_OPERATIONS_TIME_METRIC_NAME 

36) 

37from buildgrid.server.metrics_utils import DurationMetric 

38from buildgrid.server.request_metadata_utils import request_metadata_from_scheduler_dict 

39 

40 

class OperationsService(operations_pb2_grpc.OperationsServicer):
    """gRPC servicer for the long-running Operations API.

    Each request is routed to a registered instance, selected by the
    instance name carried in the request's ``name`` field.
    """

    def __init__(self, server):
        """Creates the servicer and adds it to ``server``.

        Args:
            server: The gRPC server this servicer is registered with.
        """
        self.__logger = logging.getLogger(__name__)

        # Maps instance name -> instance; populated through add_instance().
        self._instances = {}

        operations_pb2_grpc.add_OperationsServicer_to_server(self, server)

    # --- Public API ---

    def add_instance(self, instance_name, instance):
        """Registers a new servicer instance.

        Args:
            instance_name (str): The new instance's name.
            instance (OperationsInstance): The new instance itself.
        """
        self._instances[instance_name] = instance

    # --- Public API: Servicer ---

    @authorize(AuthContext)
    @DurationMetric(OPERATIONS_GET_OPERATION_TIME_METRIC_NAME)
    def GetOperation(self, request, context):
        """Returns the operation named by ``request.name``.

        ``request.name`` is split into an instance name and an operation
        name. The returned operation is a copy of the instance's operation
        whose ``name`` is set back to the full requested name. When the
        instance also provides metadata, it is attached to the response as
        a trailing-metadata entry.

        On ``InvalidArgumentError`` the call is marked INVALID_ARGUMENT, on
        ``RetriableError`` it is aborted with the error's status, and on any
        other exception it is marked INTERNAL. In the handled (non-aborting)
        cases an empty ``Operation`` is returned.
        """
        self.__logger.info(f"GetOperation request from [{context.peer()}]")

        try:
            name = request.name

            instance_name = self._parse_instance_name(name)
            instance = self._get_instance(instance_name)

            operation_name = self._parse_operation_name(name)
            operation, metadata = instance.get_operation(operation_name)
            # Work on a copy so the name can be rewritten without touching
            # the object handed back by the instance.
            op = operations_pb2.Operation()
            op.CopyFrom(operation)
            op.name = name

            if metadata is not None:
                metadata_entry = self._operation_request_metadata_entry(metadata)
                context.set_trailing_metadata([metadata_entry])

            return op

        except InvalidArgumentError as e:
            self.__logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

        except RetriableError as e:
            self.__logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
            context.abort_with_status(e.error_status)

        except Exception:
            # logger.exception() already records the active exception, so no
            # binding is needed here.
            self.__logger.exception(
                f"Unexpected error in GetOperation; request=[{request}]"
            )
            context.set_code(grpc.StatusCode.INTERNAL)

        return operations_pb2.Operation()

    @authorize(AuthContext)
    @DurationMetric(OPERATIONS_LIST_OPERATIONS_TIME_METRIC_NAME)
    def ListOperations(self, request, context):
        """Lists the operations of the instance named by ``request.name``.

        The filter, page size and page token of the request are passed to
        the instance unchanged. Every operation name in the result is
        prefixed with ``{instance_name}/`` before the result is returned.

        Error handling mirrors ``GetOperation``; in the handled
        (non-aborting) cases an empty ``ListOperationsResponse`` is returned.
        """
        self.__logger.info(f"ListOperations request from [{context.peer()}]")

        try:
            # The request name should be the collection name
            # In our case, this is just the instance_name
            instance_name = request.name
            instance = self._get_instance(instance_name)

            result = instance.list_operations(request.filter,
                                              request.page_size,
                                              request.page_token)

            for operation in result.operations:
                operation.name = f"{instance_name}/{operation.name}"

            return result

        except InvalidArgumentError as e:
            # This is a client error. Don't log at error on the server side
            self.__logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

        except RetriableError as e:
            self.__logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
            context.abort_with_status(e.error_status)

        except Exception:
            self.__logger.exception(
                f"Unexpected error in ListOperations; request=[{request}]"
            )
            context.set_code(grpc.StatusCode.INTERNAL)

        return operations_pb2.ListOperationsResponse()

    @authorize(AuthContext)
    @DurationMetric(OPERATIONS_DELETE_OPERATION_TIME_METRIC_NAME)
    def DeleteOperation(self, request, context):
        """Rejects the request: the call is always marked UNIMPLEMENTED.

        Returns:
            An empty ``Empty`` message.
        """
        self.__logger.info(f"DeleteOperation request from [{context.peer()}]")

        context.set_details("BuildGrid does not support DeleteOperation.")
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        return Empty()

    @authorize(AuthContext)
    @DurationMetric(OPERATIONS_CANCEL_OPERATION_TIME_METRIC_NAME)
    def CancelOperation(self, request, context):
        """Asks the owning instance to cancel the operation ``request.name``.

        ``request.name`` is split into an instance name and an operation
        name, and the operation name is passed to the instance's
        ``cancel_operation``.

        Error handling mirrors ``GetOperation``. An ``Empty`` message is
        returned both on success and in the handled (non-aborting) error
        cases.
        """
        self.__logger.info(f"CancelOperation request from [{context.peer()}]")

        try:
            name = request.name

            instance_name = self._parse_instance_name(name)
            instance = self._get_instance(instance_name)

            operation_name = self._parse_operation_name(name)
            instance.cancel_operation(operation_name)

        except InvalidArgumentError as e:
            self.__logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

        except RetriableError as e:
            self.__logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
            context.abort_with_status(e.error_status)

        except Exception:
            self.__logger.exception(
                f"Unexpected error in CancelOperation; request=[{request}]"
            )
            context.set_code(grpc.StatusCode.INTERNAL)

        return Empty()

    # --- Private API ---

    def _parse_instance_name(self, name):
        """Returns the instance-name part of ``name``.

        If the instance name is not blank, 'name' will have the form
        {instance_name}/{operation_uuid}. Otherwise, it will just be
        {operation_uuid}.

        Everything before the last '/' is the instance name (it may itself
        contain '/'); without any '/' the instance name is ''.
        """
        # rpartition yields ('', '', name) when there is no '/', which gives
        # the blank instance name directly.
        return name.rpartition('/')[0]

    def _parse_operation_name(self, name):
        """Returns the operation part of ``name``.

        This is everything after the last '/', or ``name`` itself when it
        contains no '/'.
        """
        return name.rpartition('/')[2]

    def _get_instance(self, name):
        """Looks up a registered instance by name.

        Args:
            name (str): The instance name to look up.

        Returns:
            The instance registered under ``name``.

        Raises:
            InvalidArgumentError: If no instance is registered under ``name``.
        """
        try:
            return self._instances[name]

        except KeyError:
            # The KeyError adds nothing beyond the message below, so it is
            # deliberately dropped from the exception chain.
            raise InvalidArgumentError(f"Instance doesn't exist on server: [{name}]") from None

    @staticmethod
    def _operation_request_metadata_entry(operation_metadata):
        """Builds the trailing-metadata entry for an operation's metadata.

        Args:
            operation_metadata: Metadata as returned alongside an operation
                by the instance's ``get_operation``.

        Returns:
            A ``(key, value)`` pair: the key ``'requestmetadata-bin'`` and
            the serialized request-metadata message.
        """
        request_metadata = request_metadata_from_scheduler_dict(operation_metadata)
        return 'requestmetadata-bin', request_metadata.SerializeToString()