Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/execution/service.py: 92.00%
75 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-04-15 14:01 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-04-15 14:01 +0000
1# Copyright (C) 2018 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16"""
17ExecutionService
18================
20Serves remote execution requests.
21"""
23import logging
24from typing import Iterable, Iterator, Union, cast
26import grpc
28from buildgrid._exceptions import CancelledError, RetriableError
29from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import DESCRIPTOR as RE_DESCRIPTOR
30from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteRequest, WaitExecutionRequest
31from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2_grpc import (
32 ExecutionServicer,
33 add_ExecutionServicer_to_server,
34)
35from buildgrid._protos.google.longrunning import operations_pb2
36from buildgrid.server.auth.manager import authorize_unary_stream
37from buildgrid.server.execution.instance import ExecutionInstance
38from buildgrid.server.metrics_names import (
39 EXECUTE_EXCEPTION_COUNT_METRIC_NAME,
40 EXECUTE_REQUEST_COUNT_METRIC_NAME,
41 EXECUTE_SERVICER_TIME_METRIC_NAME,
42 WAIT_EXECUTION_EXCEPTION_COUNT_METRIC_NAME,
43 WAIT_EXECUTION_REQUEST_COUNT_METRIC_NAME,
44 WAIT_EXECUTION_SERVICER_TIME_METRIC_NAME,
45)
46from buildgrid.server.metrics_utils import (
47 Counter,
48 generator_method_duration_metric,
49 generator_method_exception_counter,
50)
51from buildgrid.server.request_metadata_utils import (
52 extract_client_identity,
53 extract_request_metadata,
54 printable_client_identity,
55 printable_request_metadata,
56)
57from buildgrid.server.servicer import InstancedServicer
58from buildgrid.server.utils.context import CancellationContext
59from buildgrid.server.utils.decorators import handle_errors_unary_stream, track_request_id_generator
61LOGGER = logging.getLogger(__name__)
64def _parse_instance_name(operation_name: str) -> str:
65 names = operation_name.split("/")
66 return "/".join(names[:-1])
class ExecutionService(ExecutionServicer, InstancedServicer[ExecutionInstance]):
    """gRPC servicer for the Remote Execution ``Execution`` service.

    Both RPCs (``Execute`` and ``WaitExecution``) are unary-stream: they log
    the incoming request, bump a request counter metric and then delegate to
    :meth:`_handle_request`, which streams ``Operation`` messages back.
    """

    REGISTER_METHOD = add_ExecutionServicer_to_server
    FULL_NAME = RE_DESCRIPTOR.services_by_name["Execution"].full_name

    # Passed as ``ignored_exceptions`` to the Execute exception counter below.
    # Must be defined before ``Execute`` because the decorator reads it while
    # the class body is being evaluated.
    execute_ignored_exceptions = (RetriableError,)

    @authorize_unary_stream(lambda r: cast(str, r.instance_name))
    @track_request_id_generator
    @generator_method_duration_metric(EXECUTE_SERVICER_TIME_METRIC_NAME)
    @generator_method_exception_counter(
        EXECUTE_EXCEPTION_COUNT_METRIC_NAME,
        ignored_exceptions=execute_ignored_exceptions,
    )
    @handle_errors_unary_stream(operations_pb2.Operation)
    def Execute(self, request: ExecuteRequest, context: grpc.ServicerContext) -> Iterator[operations_pb2.Operation]:
        """Handles ExecuteRequest messages.

        Logs the peer, request metadata and client identity, increments the
        Execute request counter, then streams the operations produced by
        :meth:`_handle_request`.

        Args:
            request (ExecuteRequest): The incoming RPC request.
            context (grpc.ServicerContext): Context for the RPC call.

        Yields:
            operations_pb2.Operation: Operation messages for the request.
        """
        LOGGER.info(
            f"Execute request from [{context.peer()}] "
            f"([{printable_request_metadata(context.invocation_metadata())}]) "
            f"([{printable_client_identity(request.instance_name, context.invocation_metadata())}])"
        )

        instance_name = request.instance_name

        # The counter context is closed before streaming starts; only the
        # increment happens inside it.
        with Counter(metric_name=EXECUTE_REQUEST_COUNT_METRIC_NAME, instance_name=instance_name) as request_counter:
            request_counter.increment(1)

        yield from self._handle_request(request, context, instance_name)

    # Passed as ``ignored_exceptions`` to the WaitExecution exception counter.
    wait_execution_ignored_exceptions = (RetriableError,)

    @authorize_unary_stream(lambda r: _parse_instance_name(r.name))
    @track_request_id_generator
    @generator_method_duration_metric(WAIT_EXECUTION_SERVICER_TIME_METRIC_NAME)
    @generator_method_exception_counter(
        WAIT_EXECUTION_EXCEPTION_COUNT_METRIC_NAME,
        ignored_exceptions=wait_execution_ignored_exceptions,
    )
    @handle_errors_unary_stream(operations_pb2.Operation)
    def WaitExecution(
        self, request: WaitExecutionRequest, context: grpc.ServicerContext
    ) -> Iterator[operations_pb2.Operation]:
        """Handles WaitExecutionRequest messages.

        The instance name is derived from ``request.name`` (everything before
        its final ``/``). After logging and incrementing the WaitExecution
        request counter, streams the operations produced by
        :meth:`_handle_request`.

        Args:
            request (WaitExecutionRequest): The incoming RPC request.
            context (grpc.ServicerContext): Context for the RPC call.

        Yields:
            operations_pb2.Operation: Operation messages for the request.
        """
        LOGGER.info(
            f"WaitExecution request from [{context.peer()}] "
            f"([{printable_request_metadata(context.invocation_metadata())}])"
        )

        instance_name = _parse_instance_name(request.name)

        with Counter(
            metric_name=WAIT_EXECUTION_REQUEST_COUNT_METRIC_NAME, instance_name=instance_name
        ) as request_counter:
            request_counter.increment(1)

        yield from self._handle_request(request, context, instance_name)

    def query_n_clients(self) -> int:
        """Return the listener count summed over every registered instance."""
        return sum(self.query_n_clients_for_instance(name) for name in self.instances)

    def query_n_clients_for_instance(self, instance_name: str) -> int:
        """Return the operation-notifier listener count for one instance.

        Args:
            instance_name (str): Key to look up in ``self.instances``.

        Returns:
            int: ``listener_count()`` of the instance's scheduler notifier,
            or ``0`` when the lookup yields nothing (or a falsy value).
        """
        instance = self.instances.get(instance_name)
        if not instance:
            return 0
        return instance.scheduler.ops_notifier.listener_count()

    def _handle_request(
        self,
        request: Union[ExecuteRequest, WaitExecutionRequest],
        context: grpc.ServicerContext,
        instance_name: str,
    ) -> Iterable[operations_pb2.Operation]:
        """Stream operation updates for an Execute or WaitExecution request.

        For an ``ExecuteRequest`` the operation name is obtained by calling
        ``instance.execute(...)``; for any other request it is the last
        ``/``-separated segment of ``request.name``. Every streamed operation
        has its ``name`` rewritten to ``"<instance_name>/<operation_name>"``
        before being yielded.

        Args:
            request: The incoming RPC request.
            context (grpc.ServicerContext): Context for the RPC call.
            instance_name (str): Name of the instance serving the request.

        Yields:
            operations_pb2.Operation: Updates from
            ``instance.stream_operation_updates``. If a ``CancelledError`` is
            raised, the error's ``last_response`` is yielded after the
            context is marked ``CANCELLED``.
        """
        # Captured up front so the peer can be reported once streaming ends.
        peer_uid = context.peer()

        try:
            instance = self.get_instance(instance_name)

            if not isinstance(request, ExecuteRequest):
                # isinstance(request, WaitExecutionRequest): the operation
                # name is whatever follows the last "/" in the request name.
                operation_name = request.name.rpartition("/")[2]
            else:
                request_metadata = extract_request_metadata(context.invocation_metadata())
                client_identity = extract_client_identity(instance_name, context.invocation_metadata())
                operation_name = instance.execute(
                    action_digest=request.action_digest,
                    skip_cache_lookup=request.skip_cache_lookup,
                    priority=request.execution_policy.priority,
                    request_metadata=request_metadata,
                    client_identity=client_identity,
                )

            operation_full_name = f"{instance_name}/{operation_name}"

            updates = instance.stream_operation_updates(operation_name, CancellationContext(context))
            for update in updates:
                # Clients see the instance-qualified name, not the bare one.
                update.name = operation_full_name
                yield update

            # The stream has finished; note it if the peer had already gone.
            if not context.is_active():
                LOGGER.info(
                    f"Peer peer_uid=[{peer_uid}] was holding up a thread for "
                    f"`stream_operation_updates()` for instance_name=[{instance_name}], "
                    f"operation_name=[{operation_name}], but the rpc context is not "
                    "active anymore; releasing thread."
                )

        except CancelledError as cancelled:
            LOGGER.info(cancelled)
            context.set_details(str(cancelled))
            context.set_code(grpc.StatusCode.CANCELLED)
            yield cancelled.last_response  # type: ignore[misc]  # need a better signature