Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/execution/service.py: 68.80%
125 statements
« prev ^ index » next coverage.py v6.4.1, created at 2022-06-22 21:04 +0000
« prev ^ index » next coverage.py v6.4.1, created at 2022-06-22 21:04 +0000
1# Copyright (C) 2018 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16"""
17ExecutionService
18================
20Serves remote execution requests.
21"""
23import logging
24from functools import partial
26import grpc
29from buildgrid._exceptions import (
30 CancelledError,
31 FailedPreconditionError,
32 InvalidArgumentError,
33 RetriableError
34)
35from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
36from buildgrid._protos.google.longrunning import operations_pb2
37from buildgrid.server._authentication import AuthContext, authorize
38from buildgrid.server.peer import Peer
39from buildgrid.server._resources import ExecContext, limit
40from buildgrid.server.metrics_utils import (
41 Counter,
42 generator_method_duration_metric
43)
44from buildgrid.server.metrics_names import (
45 EXECUTE_REQUEST_COUNT_METRIC_NAME,
46 EXECUTE_SERVICER_TIME_METRIC_NAME,
47 WAIT_EXECUTION_REQUEST_COUNT_METRIC_NAME,
48 WAIT_EXECUTION_SERVICER_TIME_METRIC_NAME
49)
50from buildgrid.server.request_metadata_utils import (
51 extract_request_metadata,
52 printable_request_metadata
53)
class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    """gRPC servicer for the REAPI ``Execution`` service.

    ``Execute`` and ``WaitExecution`` calls are dispatched to the execution
    instance registered under the request's instance name, and the resulting
    ``Operation`` updates are streamed back to the client.
    """

    def __init__(self, server, monitor=False):
        """Creates the servicer and registers it with ``server``.

        Args:
            server (grpc.Server): Server to attach this servicer to.
            monitor (bool): Whether to track connected peers so that the
                ``query_n_clients*`` monitoring methods return real values.
        """
        self.__logger = logging.getLogger(__name__)

        # When instrumented:
        #   __peers: peer_uid -> number of active RPCs from that peer (all
        #     instances combined);
        #   __peers_by_instance: instance_name -> {peer_uid -> number of
        #     active RPCs from that peer on that instance}.
        # Both stay None when monitoring is disabled.
        # NOTE(review): these counters are updated without a lock while RPCs
        # are served on several threads; confirm whether that is acceptable.
        self.__peers_by_instance = None
        self.__peers = None

        self._instances = {}

        remote_execution_pb2_grpc.add_ExecutionServicer_to_server(self, server)

        self._is_instrumented = monitor

        if self._is_instrumented:
            self.__peers_by_instance = {}
            self.__peers = {}

    # --- Public API ---

    def add_instance(self, instance_name, instance):
        """Registers a new servicer instance.

        Args:
            instance_name (str): The new instance's name.
            instance (ExecutionInstance): The new instance itself.
        """
        self._instances[instance_name] = instance

        if self._is_instrumented:
            self.__peers_by_instance[instance_name] = {}

    def get_scheduler(self, instance_name):
        """Retrieves a reference to the scheduler for an instance.

        Args:
            instance_name (str): The name of the instance to query.

        Returns:
            Scheduler: A reference to the scheduler for `instance_name`.

        Raises:
            InvalidArgumentError: If no instance named `instance_name` exists.
        """
        instance = self._get_instance(instance_name)

        return instance.scheduler

    # --- Public API: Servicer ---

    @authorize(AuthContext)
    @limit(ExecContext)
    @generator_method_duration_metric(EXECUTE_SERVICER_TIME_METRIC_NAME)
    def Execute(self, request, context):
        """Handles ExecuteRequest messages.

        Args:
            request (ExecuteRequest): The incoming RPC request.
            context (grpc.ServicerContext): Context for the RPC call.

        Yields:
            Operation: Updates for the operation created for the request.
        """
        self.__logger.info(f"Execute request from [{context.peer()}] "
                           f"([{printable_request_metadata(context.invocation_metadata())}])")

        metric_name = EXECUTE_REQUEST_COUNT_METRIC_NAME

        instance_name = request.instance_name

        with Counter(metric_name=metric_name,
                     instance_name=instance_name) as num_requests:
            num_requests.increment(1)

        yield from self._handle_request(request, context, metric_name,
                                        instance_name)

    @authorize(AuthContext)
    @limit(ExecContext)
    @generator_method_duration_metric(WAIT_EXECUTION_SERVICER_TIME_METRIC_NAME)
    def WaitExecution(self, request, context):
        """Handles WaitExecutionRequest messages.

        Args:
            request (WaitExecutionRequest): The incoming RPC request.
            context (grpc.ServicerContext): Context for the RPC call.

        Yields:
            Operation: Updates for the operation named in the request.
        """
        self.__logger.info(f"WaitExecution request from [{context.peer()}] "
                           f"([{printable_request_metadata(context.invocation_metadata())}])")

        metric_name = WAIT_EXECUTION_REQUEST_COUNT_METRIC_NAME

        # request.name is "<instance_name>/<operation_name>"; the instance
        # name may itself contain '/', so only the last segment is dropped.
        names = request.name.split('/')
        instance_name = '/'.join(names[:-1])

        with Counter(metric_name=metric_name,
                     instance_name=instance_name) as num_requests:
            num_requests.increment(1)

        yield from self._handle_request(request, context, metric_name,
                                        instance_name)

    # --- Public API: Monitoring ---

    @property
    def is_instrumented(self):
        """bool: Whether peer monitoring was enabled at construction."""
        return self._is_instrumented

    def query_n_clients(self):
        """Returns the number of distinct peers with at least one active RPC.

        Always 0 when monitoring is disabled.
        """
        if self.__peers is not None:
            return len(self.__peers)
        return 0

    def query_n_clients_for_instance(self, instance_name):
        """Returns the number of distinct peers with an active RPC on
        ``instance_name``.

        Returns 0 when monitoring is disabled or the instance is unknown.
        """
        if self.__peers_by_instance is None:
            return 0
        return len(self.__peers_by_instance.get(instance_name, ()))

    # --- Private API ---

    def _rpc_termination_callback(self, peer_uid, instance_name, operation_name):
        """Releases everything ``_handle_request`` registered for one RPC.

        Installed through ``context.add_callback()``; it may also be invoked
        directly when the RPC had already terminated at registration time.
        """
        self.__logger.debug(f"RPC terminated for peer_uid=[{peer_uid}], "
                            f"instance_name=[{instance_name}], "
                            f"operation_name=[{operation_name}]")

        try:
            instance = self._get_instance(instance_name)

            instance.unregister_operation_peer(operation_name, peer_uid)
        finally:
            # Always release the monitoring counters and the Peer entry, even
            # if the instance-side cleanup failed.
            self._untrack_peer(peer_uid, instance_name)
            Peer.deregister_peer(peer_uid)

    def _track_peer(self, peer_uid, instance_name):
        """Counts one more active RPC from ``peer_uid`` on ``instance_name``."""
        if not self._is_instrumented:
            return

        self.__peers[peer_uid] = self.__peers.get(peer_uid, 0) + 1
        instance_peers = self.__peers_by_instance.setdefault(instance_name, {})
        instance_peers[peer_uid] = instance_peers.get(peer_uid, 0) + 1

    def _untrack_peer(self, peer_uid, instance_name):
        """Counts one fewer active RPC from ``peer_uid`` on ``instance_name``.

        Never raises for unknown peers or instances, so callers in cleanup
        paths can rely on it.
        """
        if not self._is_instrumented:
            return

        self._decrement_count(self.__peers, peer_uid)
        self._decrement_count(self.__peers_by_instance.get(instance_name, {}),
                              peer_uid)

    @staticmethod
    def _decrement_count(counts, key):
        """Decrements ``counts[key]``, dropping the key once it reaches zero."""
        remaining = counts.get(key, 0) - 1
        if remaining > 0:
            counts[key] = remaining
        else:
            counts.pop(key, None)

    def _get_instance(self, name):
        """Returns the instance registered as ``name``.

        Raises:
            InvalidArgumentError: If no such instance exists.
        """
        try:
            return self._instances[name]

        except KeyError:
            raise InvalidArgumentError(f"Instance doesn't exist on server: [{name}]")

    def _handle_request(self, request, context, metric_name, instance_name):
        """Shared Execute/WaitExecution flow: registers the caller and streams
        operation updates.

        Args:
            request: The ``ExecuteRequest`` or ``WaitExecutionRequest``.
            context (grpc.ServicerContext): Context for the RPC call.
            metric_name (str): Request-count metric name; selects which RPC
                is being served.
            instance_name (str): Name of the instance to dispatch to.

        Yields:
            Operation: Operation updates, renamed to
            ``<instance_name>/<operation_name>``. On a handled error, a single
            placeholder/last-known operation after setting the status code.
        """
        peer_uid = context.peer()

        Peer.register_peer(uid=peer_uid, context=context)
        # Once the termination callback is installed it owns all per-RPC
        # cleanup. Until then, this method must deregister the peer itself if
        # it fails (the callback already deregisters once per RPC, so register
        # and deregister are used as a per-call pair).
        cleanup_delegated = False
        # Bound up front so error handlers can reference it even when the
        # failure happens before the operation is known.
        operation_name = None

        try:
            instance = self._get_instance(instance_name)

            if metric_name == EXECUTE_REQUEST_COUNT_METRIC_NAME:
                request_metadata = extract_request_metadata(context.invocation_metadata())
                job_name = instance.execute(request.action_digest,
                                            request.skip_cache_lookup,
                                            request.execution_policy.priority)

                operation_name = instance.register_job_peer(
                    job_name, peer_uid, request_metadata=request_metadata)

            elif metric_name == WAIT_EXECUTION_REQUEST_COUNT_METRIC_NAME:
                operation_name = request.name.split('/')[-1]

                instance.register_operation_peer(operation_name, peer_uid)

            else:
                raise ValueError(f"Unsupported request metric: [{metric_name}]")

            # Count the peer *before* installing the callback: the callback
            # decrements these counters and may fire as soon as it is added.
            self._track_peer(peer_uid, instance_name)

            on_termination = partial(self._rpc_termination_callback,
                                     peer_uid, instance_name, operation_name)
            cleanup_delegated = True
            if not context.add_callback(on_termination):
                # The RPC has already terminated, so gRPC will never run the
                # callback: clean up now and stop serving.
                on_termination()
                return

            operation_full_name = f"{instance_name}/{operation_name}"

            for operation in instance.stream_operation_updates(operation_name, context):
                operation.name = operation_full_name
                yield operation

            if not context.is_active():
                self.__logger.info(f"Peer peer_uid=[{peer_uid}] was holding up a thread for "
                                   f"`stream_operation_updates()` for instance_name=[{instance_name}], "
                                   f"operation_name=[{operation_name}], but the rpc context is not "
                                   "active anymore; releasing thread.")

        except InvalidArgumentError as e:
            self.__logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            yield operations_pb2.Operation()

        except FailedPreconditionError as e:
            self.__logger.error(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
            yield operations_pb2.Operation()

        except CancelledError as e:
            self.__logger.info(f"Operation cancelled [{instance_name}/{operation_name}]")
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.CANCELLED)
            yield e.last_response

        # Attempt to catch postgres connection failures and instruct clients to retry
        except RetriableError as e:
            self.__logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
            context.abort_with_status(e.error_status)

        except Exception as e:
            self.__logger.exception(
                f"Unexpected error in Execute: request=[{request}]"
            )
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INTERNAL)

        finally:
            if not cleanup_delegated:
                Peer.deregister_peer(peer_uid)