Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/execution/service.py: 68.80%

125 statements  

« prev     ^ index     » next       coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17ExecutionService 

18================ 

19 

20Serves remote execution requests. 

21""" 

22 

23import logging 

24from functools import partial 

25 

26import grpc 

27 

28 

29from buildgrid._exceptions import ( 

30 CancelledError, 

31 FailedPreconditionError, 

32 InvalidArgumentError, 

33 RetriableError 

34) 

35from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc 

36from buildgrid._protos.google.longrunning import operations_pb2 

37from buildgrid.server._authentication import AuthContext, authorize 

38from buildgrid.server.peer import Peer 

39from buildgrid.server._resources import ExecContext, limit 

40from buildgrid.server.metrics_utils import ( 

41 Counter, 

42 generator_method_duration_metric 

43) 

44from buildgrid.server.metrics_names import ( 

45 EXECUTE_REQUEST_COUNT_METRIC_NAME, 

46 EXECUTE_SERVICER_TIME_METRIC_NAME, 

47 WAIT_EXECUTION_REQUEST_COUNT_METRIC_NAME, 

48 WAIT_EXECUTION_SERVICER_TIME_METRIC_NAME 

49) 

50from buildgrid.server.request_metadata_utils import ( 

51 extract_request_metadata, 

52 printable_request_metadata 

53) 

54 

55 

class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    """gRPC servicer for the remote execution ``Execute``/``WaitExecution`` RPCs.

    Requests are routed by instance name to the execution instances
    registered with :meth:`add_instance`. When created with ``monitor=True``
    the service also counts the peers that have RPCs in flight, both globally
    and per instance, for the ``query_n_clients*`` methods.
    """

    def __init__(self, server, monitor=False):
        """Creates the servicer and registers it with ``server``.

        Args:
            server: The gRPC server this servicer is added to.
            monitor (bool): Whether to keep per-peer statistics for the
                monitoring queries. Defaults to False.
        """
        self.__logger = logging.getLogger(__name__)

        # Peer bookkeeping; both stay None unless monitoring is enabled.
        #   __peers:             peer_uid -> number of RPCs in flight
        #   __peers_by_instance: instance_name -> {peer_uid: RPCs in flight}
        self.__peers_by_instance = None
        self.__peers = None

        self._instances = {}

        remote_execution_pb2_grpc.add_ExecutionServicer_to_server(self, server)

        self._is_instrumented = monitor

        if self._is_instrumented:
            self.__peers_by_instance = {}
            self.__peers = {}

    # --- Public API ---

    def add_instance(self, instance_name, instance):
        """Registers a new servicer instance.

        Args:
            instance_name (str): The new instance's name.
            instance (ExecutionInstance): The new instance itself.
        """
        self._instances[instance_name] = instance

        if self._is_instrumented:
            # setdefault rather than assignment: re-registering a name must
            # not wipe the counts of RPCs that are still in flight, since
            # their termination callbacks will decrement them later.
            self.__peers_by_instance.setdefault(instance_name, {})

    def get_scheduler(self, instance_name):
        """Retrieves a reference to the scheduler for an instance.

        Args:
            instance_name (str): The name of the instance to query.

        Returns:
            Scheduler: A reference to the scheduler for `instance_name`.

        Raises:
            InvalidArgumentError: If no instance named `instance_name` exists.
        """
        return self._get_instance(instance_name).scheduler

    # --- Public API: Servicer ---

    @authorize(AuthContext)
    @limit(ExecContext)
    @generator_method_duration_metric(EXECUTE_SERVICER_TIME_METRIC_NAME)
    def Execute(self, request, context):
        """Handles ExecuteRequest messages.

        Args:
            request (ExecuteRequest): The incoming RPC request.
            context (grpc.ServicerContext): Context for the RPC call.

        Yields:
            Operation: The messages produced by :meth:`_handle_request`.
        """
        self.__logger.info(f"Execute request from [{context.peer()}] "
                           f"([{printable_request_metadata(context.invocation_metadata())}])")

        metric_name = EXECUTE_REQUEST_COUNT_METRIC_NAME
        instance_name = request.instance_name

        with Counter(metric_name=metric_name,
                     instance_name=instance_name) as num_requests:
            num_requests.increment(1)

        yield from self._handle_request(request, context, metric_name,
                                        instance_name)

    @authorize(AuthContext)
    @limit(ExecContext)
    @generator_method_duration_metric(WAIT_EXECUTION_SERVICER_TIME_METRIC_NAME)
    def WaitExecution(self, request, context):
        """Handles WaitExecutionRequest messages.

        The instance name is everything before the last ``/`` of
        ``request.name`` (empty when the name contains no ``/``).

        Args:
            request (WaitExecutionRequest): The incoming RPC request.
            context (grpc.ServicerContext): Context for the RPC call.

        Yields:
            Operation: The messages produced by :meth:`_handle_request`.
        """
        self.__logger.info(f"WaitExecution request from [{context.peer()}] "
                           f"([{printable_request_metadata(context.invocation_metadata())}])")

        metric_name = WAIT_EXECUTION_REQUEST_COUNT_METRIC_NAME
        instance_name = request.name.rpartition('/')[0]

        with Counter(metric_name=metric_name,
                     instance_name=instance_name) as num_requests:
            num_requests.increment(1)

        yield from self._handle_request(request, context, metric_name,
                                        instance_name)

    # --- Public API: Monitoring ---

    @property
    def is_instrumented(self):
        """bool: Whether peer statistics are being collected."""
        return self._is_instrumented

    def query_n_clients(self):
        """Returns the number of distinct peers with RPCs in flight.

        Always 0 when monitoring is disabled.
        """
        if self.__peers is None:
            return 0
        return len(self.__peers)

    def query_n_clients_for_instance(self, instance_name):
        """Returns the number of distinct peers with RPCs in flight on an instance.

        Args:
            instance_name (str): The name of the instance to query.

        Returns:
            int: The peer count; 0 when monitoring is disabled or the
            instance is unknown.
        """
        if self.__peers_by_instance is None:
            return 0
        return len(self.__peers_by_instance.get(instance_name, ()))

    # --- Private API ---

    def _track_peer(self, peer_uid, instance_name):
        """Records one more in-flight RPC for `peer_uid` on `instance_name`."""
        instance_peers = self.__peers_by_instance.setdefault(instance_name, {})
        for counts in (self.__peers, instance_peers):
            counts[peer_uid] = counts.get(peer_uid, 0) + 1

    def _untrack_peer(self, peer_uid, instance_name):
        """Records the end of one RPC for `peer_uid` on `instance_name`.

        Entries are dropped once their count reaches zero. Unknown peers or
        instances are ignored so that cleanup can never fail here.
        """
        instance_peers = self.__peers_by_instance.get(instance_name, {})
        for counts in (self.__peers, instance_peers):
            remaining = counts.get(peer_uid, 0) - 1
            if remaining > 0:
                counts[peer_uid] = remaining
            else:
                counts.pop(peer_uid, None)

    def _rpc_termination_callback(self, peer_uid, instance_name, operation_name):
        """Releases everything `_handle_request` registered for one RPC.

        Args:
            peer_uid (str): Identifier of the peer whose RPC ended.
            instance_name (str): Instance the RPC was addressed to.
            operation_name (str): Operation the peer was watching.
        """
        self.__logger.debug(f"RPC terminated for peer_uid=[{peer_uid}], "
                            f"instance_name=[{instance_name}], "
                            f"operation_name=[{operation_name}]")

        try:
            instance = self._get_instance(instance_name)
            instance.unregister_operation_peer(operation_name, peer_uid)
        finally:
            # Must run even if the instance-level cleanup fails, otherwise the
            # peer statistics and the Peer registry would never be released.
            if self._is_instrumented:
                self._untrack_peer(peer_uid, instance_name)

            Peer.deregister_peer(peer_uid)

    def _get_instance(self, name):
        """Looks up a registered instance.

        Args:
            name (str): The name of the instance.

        Raises:
            InvalidArgumentError: If no instance named `name` exists.
        """
        try:
            return self._instances[name]

        except KeyError as e:
            raise InvalidArgumentError(f"Instance doesn't exist on server: [{name}]") from e

    def _handle_request(self, request, context, metric_name, instance_name):
        """Shared implementation of `Execute` and `WaitExecution`.

        Registers the calling peer against a job (Execute) or an existing
        operation (WaitExecution), attaches a termination callback that undoes
        the registration, then streams the operation updates back.

        Args:
            request: The ExecuteRequest or WaitExecutionRequest being served.
            context (grpc.ServicerContext): Context for the RPC call.
            metric_name (str): Request-count metric name of the calling RPC;
                it selects the Execute or WaitExecution behaviour.
            instance_name (str): Name of the instance addressed by the request.

        Yields:
            Operation: Updates from `stream_operation_updates`, renamed to
            ``<instance_name>/<operation_name>``; an empty Operation after an
            invalid-argument or failed-precondition error; the error's
            ``last_response`` after a cancellation.
        """
        peer_uid = context.peer()

        Peer.register_peer(uid=peer_uid, context=context)

        # True until the duty to undo register_peer() has been handed over to
        # the termination callback (or discharged inline).
        # NOTE(review): assumes register_peer()/deregister_peer() are balanced
        # per call - confirm against buildgrid.server.peer.
        owes_peer_release = True
        # Bound up-front: the CancelledError handler logs it and must not fail
        # if the cancellation is raised before the streaming stage.
        operation_full_name = None

        try:
            instance = self._get_instance(instance_name)

            if metric_name == EXECUTE_REQUEST_COUNT_METRIC_NAME:
                request_metadata = extract_request_metadata(context.invocation_metadata())
                job_name = instance.execute(request.action_digest,
                                            request.skip_cache_lookup,
                                            request.execution_policy.priority)

                operation_name = instance.register_job_peer(
                    job_name, peer_uid, request_metadata=request_metadata)

            elif metric_name == WAIT_EXECUTION_REQUEST_COUNT_METRIC_NAME:
                operation_name = request.name.rpartition('/')[2]

                instance.register_operation_peer(operation_name, peer_uid)

            operation_full_name = f"{instance_name}/{operation_name}"

            # Count the peer *before* the callback can fire, so the callback
            # always finds the entry it is going to decrement.
            if self._is_instrumented:
                self._track_peer(peer_uid, instance_name)

            cleanup = partial(self._rpc_termination_callback,
                              peer_uid, instance_name, operation_name)
            callback_added = context.add_callback(cleanup)
            owes_peer_release = False

            # add_callback() returns False when the RPC has already terminated
            # and the callback will never be invoked; release everything now.
            # Only an explicit False is treated as a rejection.
            if callback_added is False:
                cleanup()
                return

            for operation in instance.stream_operation_updates(operation_name, context):
                operation.name = operation_full_name
                yield operation

            if not context.is_active():
                self.__logger.info(f"Peer peer_uid=[{peer_uid}] was holding up a thread for "
                                   f"`stream_operation_updates()` for instance_name=[{instance_name}], "
                                   f"operation_name=[{operation_name}], but the rpc context is not "
                                   "active anymore; releasing thread.")

        except InvalidArgumentError as e:
            self.__logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            yield operations_pb2.Operation()

        except FailedPreconditionError as e:
            self.__logger.error(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
            yield operations_pb2.Operation()

        except CancelledError as e:
            self.__logger.info(f"Operation cancelled [{operation_full_name}]")
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.CANCELLED)
            yield e.last_response

        # Attempt to catch postgres connection failures and instruct clients to retry
        except RetriableError as e:
            self.__logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
            context.abort_with_status(e.error_status)

        except Exception as e:
            self.__logger.exception(
                f"Unexpected error in Execute: request=[{request}]"
            )
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INTERNAL)

        finally:
            # Setup failed before the termination callback took over: balance
            # the register_peer() call made at the top of this method.
            if owes_peer_release:
                Peer.deregister_peer(peer_uid)