Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/execution/service.py: 69.13%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

149 statements  

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17ExecutionService 

18================ 

19 

20Serves remote execution requests. 

21""" 

22 

23import logging 

24from functools import partial 

25 

26import grpc 

27 

28 

29from buildgrid._exceptions import ( 

30 CancelledError, 

31 FailedPreconditionError, 

32 InvalidArgumentError, 

33 RetriableError 

34) 

35from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc 

36from buildgrid._protos.google.longrunning import operations_pb2 

37from buildgrid.server._authentication import AuthContext, authorize 

38from buildgrid.server.peer import Peer 

39from buildgrid.server._resources import ExecContext, limit 

40from buildgrid.server.metrics_utils import ( 

41 Counter, 

42 generator_method_duration_metric 

43) 

44from buildgrid.server.metrics_names import ( 

45 EXECUTE_REQUEST_COUNT_METRIC_NAME, 

46 EXECUTE_SERVICER_TIME_METRIC_NAME, 

47 WAIT_EXECUTION_REQUEST_COUNT_METRIC_NAME, 

48 WAIT_EXECUTION_SERVICER_TIME_METRIC_NAME 

49) 

50from buildgrid.server.request_metadata_utils import ( 

51 extract_request_metadata, 

52 printable_request_metadata 

53) 

54 

55 

class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    """gRPC servicer handling the ``Execute`` and ``WaitExecution`` RPCs.

    Incoming requests are dispatched to the instance object registered
    under the request's instance name (see :meth:`add_instance`).

    When constructed with ``monitor=True`` the service additionally keeps
    track of which peers currently have RPCs open, so that
    :meth:`query_n_clients` and :meth:`query_n_clients_for_instance` can
    report client counts.
    """

    def __init__(self, server, monitor=False):
        """Creates the servicer and adds it to ``server``.

        Args:
            server: The gRPC server the servicer is added to.
            monitor (bool): Whether to maintain the per-peer bookkeeping
                used by the monitoring queries.
        """
        self.__logger = logging.getLogger(__name__)

        # Monitoring state; both stay ``None`` unless instrumented:
        #   __peers_by_instance: instance name -> set of peer uids
        #   __peers: peer uid -> number of RPCs currently open for that peer
        self.__peers_by_instance = None
        self.__peers = None

        self._instances = {}

        remote_execution_pb2_grpc.add_ExecutionServicer_to_server(self, server)

        self._is_instrumented = monitor

        if self._is_instrumented:
            self.__peers_by_instance = {}
            self.__peers = {}

    # --- Public API ---

    def add_instance(self, instance_name, instance):
        """Registers a new servicer instance.

        Args:
            instance_name (str): The new instance's name.
            instance (ExecutionInstance): The new instance itself.
        """
        self._instances[instance_name] = instance

        if self._is_instrumented:
            self.__peers_by_instance[instance_name] = set()

    def get_scheduler(self, instance_name):
        """Retrieves a reference to the scheduler for an instance.

        Args:
            instance_name (str): The name of the instance to query.

        Returns:
            Scheduler: A reference to the scheduler for `instance_name`.

        Raises:
            InvalidArgumentError: If no instance named `instance_name` exists.
        """
        return self._get_instance(instance_name).scheduler

    # --- Public API: Servicer ---

    @authorize(AuthContext)
    @limit(ExecContext)
    @generator_method_duration_metric(EXECUTE_SERVICER_TIME_METRIC_NAME)
    def Execute(self, request, context):
        """Handles ExecuteRequest messages.

        Asks the targeted instance to execute the requested action,
        registers the calling peer on the resulting job and then streams
        the operation updates back, each renamed to
        ``<instance_name>/<operation_name>``.

        On ``InvalidArgumentError`` or ``FailedPreconditionError`` the
        matching status code is set on ``context`` and an empty
        ``Operation`` is yielded.  On ``CancelledError`` the status is set
        to ``CANCELLED`` and the error's ``last_response`` is yielded.  A
        ``RetriableError`` aborts the RPC with the error's status; any
        other exception is logged and reported as ``INTERNAL``.

        Args:
            request (ExecuteRequest): The incoming RPC request.
            context (grpc.ServicerContext): Context for the RPC call.

        Yields:
            Operation: Updates for the operation tracking the execution.
        """
        self.__logger.info(f"Execute request from [{context.peer()}] "
                           f"([{printable_request_metadata(context.invocation_metadata())}])")

        request_metadata = extract_request_metadata(context.invocation_metadata())
        instance_name = request.instance_name
        peer_uid = context.peer()

        Peer.register_peer(uid=peer_uid, context=context)

        with Counter(metric_name=EXECUTE_REQUEST_COUNT_METRIC_NAME,
                     instance_name=instance_name) as num_requests:
            num_requests.increment(1)

        # Bound up-front: the CancelledError handler below logs this name and
        # must not fail with UnboundLocalError when the cancellation happens
        # before an operation name has been obtained.
        operation_full_name = None

        try:
            instance = self._get_instance(instance_name)

            job_name = instance.execute(request.action_digest,
                                        request.skip_cache_lookup,
                                        request.execution_policy.priority)

            operation_name = instance.register_job_peer(
                job_name, peer_uid, request_metadata=request_metadata)

            context.add_callback(partial(self._rpc_termination_callback,
                                         peer_uid, instance_name, operation_name))

            self._track_peer(peer_uid, instance_name)

            operation_full_name = f"{instance_name}/{operation_name}"

            self.__logger.info(
                f"Operation [{operation_full_name}] created for job [{job_name}]")

            for operation in instance.stream_operation_updates(operation_name, context):
                operation.name = operation_full_name
                yield operation

            if not context.is_active():
                self.__logger.info(f"Peer peer_uid=[{peer_uid}] was holding up a thread for "
                                   f"`stream_operation_updates()` for instance_name=[{instance_name}], "
                                   f"operation_name=[{operation_name}], but the rpc context is not "
                                   "active anymore; releasing thread.")

        except InvalidArgumentError as e:
            self.__logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            yield operations_pb2.Operation()

        except FailedPreconditionError as e:
            self.__logger.error(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
            yield operations_pb2.Operation()

        except CancelledError as e:
            self.__logger.info(f"Operation cancelled [{operation_full_name}]")
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.CANCELLED)
            yield e.last_response

        # Attempt to catch postgres connection failures and instruct clients to retry
        except RetriableError as e:
            self.__logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
            context.abort_with_status(e.error_status)

        except Exception:
            self.__logger.exception(
                f"Unexpected error in Execute: request=[{request}]"
            )
            context.set_code(grpc.StatusCode.INTERNAL)

    @authorize(AuthContext)
    @limit(ExecContext)
    @generator_method_duration_metric(WAIT_EXECUTION_SERVICER_TIME_METRIC_NAME)
    def WaitExecution(self, request, context):
        """Handles WaitExecutionRequest messages.

        ``request.name`` is split at its last ``/``: everything before it is
        the instance name, the remainder is the operation name.  The calling
        peer is registered on that operation and its updates are streamed
        back, each renamed to ``<instance_name>/<operation_name>``.

        On ``InvalidArgumentError`` the status is set to
        ``INVALID_ARGUMENT`` and an empty ``Operation`` is yielded.  On
        ``CancelledError`` the status is set to ``CANCELLED`` and the
        error's ``last_response`` is yielded.  A ``RetriableError`` aborts
        the RPC with the error's status; any other exception is logged and
        reported as ``INTERNAL``.

        Args:
            request (WaitExecutionRequest): The incoming RPC request.
            context (grpc.ServicerContext): Context for the RPC call.

        Yields:
            Operation: Updates for the operation being waited on.
        """
        self.__logger.info(f"WaitExecution request from [{context.peer()}] "
                           f"([{printable_request_metadata(context.invocation_metadata())}])")

        # Same result as splitting on '/' and re-joining all but the last part.
        instance_name, _, operation_name = request.name.rpartition('/')
        peer = context.peer()

        # Computed before the ``try`` so the CancelledError handler can always
        # reference it, whichever call raised.
        operation_full_name = f"{instance_name}/{operation_name}"

        Peer.register_peer(uid=peer, context=context)

        with Counter(metric_name=WAIT_EXECUTION_REQUEST_COUNT_METRIC_NAME,
                     instance_name=instance_name) as num_requests:
            num_requests.increment(1)

        try:
            instance = self._get_instance(instance_name)

            instance.register_operation_peer(operation_name, peer)

            context.add_callback(partial(self._rpc_termination_callback,
                                         peer, instance_name, operation_name))

            self._track_peer(peer, instance_name)

            for operation in instance.stream_operation_updates(operation_name, context):
                operation.name = operation_full_name
                yield operation

            if not context.is_active():
                self.__logger.info(f"Peer peer_uid=[{peer}] was holding up a thread for "
                                   f"`stream_operation_updates()` for instance_name=[{instance_name}], "
                                   f"operation_name=[{operation_name}], but the rpc context is not "
                                   "active anymore; releasing thread.")

        except InvalidArgumentError as e:
            self.__logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            yield operations_pb2.Operation()

        except CancelledError as e:
            self.__logger.info(f"Operation cancelled [{operation_full_name}]")
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.CANCELLED)
            yield e.last_response

        # Attempt to catch postgres connection failures and instruct clients to retry
        except RetriableError as e:
            self.__logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
            context.abort_with_status(e.error_status)

        except Exception:
            self.__logger.exception(
                f"Unexpected error in WaitExecution; request=[{request}]"
            )
            context.set_code(grpc.StatusCode.INTERNAL)

    # --- Public API: Monitoring ---

    @property
    def is_instrumented(self):
        """bool: Whether the service was created with monitoring enabled."""
        return self._is_instrumented

    def query_n_clients(self):
        """Returns the number of distinct peers with open RPCs.

        Always 0 when the service is not instrumented.
        """
        if self.__peers is not None:
            return len(self.__peers)
        return 0

    def query_n_clients_for_instance(self, instance_name):
        """Returns the number of peers recorded for ``instance_name``.

        Returns 0 when the service is not instrumented or when no instance
        of that name has been registered.
        """
        if self.__peers_by_instance is None:
            return 0
        return len(self.__peers_by_instance.get(instance_name, ()))

    # --- Private API ---

    def _track_peer(self, peer_uid, instance_name):
        """Counts one more open RPC for ``peer_uid`` (instrumented mode only).

        A peer is added to the peer set of ``instance_name`` the first time
        it is seen; afterwards only its open-RPC counter is incremented.
        """
        if not self._is_instrumented:
            return

        if peer_uid not in self.__peers:
            self.__peers_by_instance[instance_name].add(peer_uid)
            self.__peers[peer_uid] = 1
        else:
            self.__peers[peer_uid] += 1

    def _untrack_peer(self, peer_uid):
        """Counts one less open RPC for ``peer_uid``, dropping it at zero."""
        remaining = self.__peers.get(peer_uid, 0) - 1
        if remaining > 0:
            self.__peers[peer_uid] = remaining
            return

        self.__peers.pop(peer_uid, None)
        # The peer was recorded under the instance it was *first* seen on,
        # which is not necessarily the instance of the RPC ending now, so it
        # is discarded from every set instead of `remove`d from a single one
        # (which could raise KeyError and abort the clean-up).
        for instance_peers in self.__peers_by_instance.values():
            instance_peers.discard(peer_uid)

    def _rpc_termination_callback(self, peer_uid, instance_name, operation_name):
        """Releases everything held for a peer once its RPC has terminated.

        Registered on the RPC context by `Execute` and `WaitExecution`.

        Args:
            peer_uid (str): The peer whose RPC terminated.
            instance_name (str): Name of the instance the RPC targeted.
            operation_name (str): Name of the operation the peer watched.
        """
        self.__logger.debug(f"RPC terminated for peer_uid=[{peer_uid}], "
                            f"instance_name=[{instance_name}], "
                            f"operation_name=[{operation_name}]")

        try:
            instance = self._get_instance(instance_name)

            instance.unregister_operation_peer(operation_name, peer_uid)

        finally:
            # Keep the bookkeeping balanced even if unregistering failed.
            if self._is_instrumented:
                self._untrack_peer(peer_uid)

            Peer.deregister_peer(peer_uid)

    def _get_instance(self, name):
        """Returns the instance registered under ``name``.

        Raises:
            InvalidArgumentError: If no instance named ``name`` exists.
        """
        try:
            return self._instances[name]

        except KeyError:
            # The KeyError adds nothing to the message; suppress the chain.
            raise InvalidArgumentError(f"Instance doesn't exist on server: [{name}]") from None