Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/execution/service.py: 90.57%
53 statements
« prev ^ index » next coverage.py v7.4.1, created at 2025-07-10 13:10 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2025-07-10 13:10 +0000
1# Copyright (C) 2018 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16from typing import Iterable, Iterator, cast
18import grpc
20from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import DESCRIPTOR as RE_DESCRIPTOR
21from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteRequest, WaitExecutionRequest
22from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2_grpc import (
23 ExecutionServicer,
24 add_ExecutionServicer_to_server,
25)
26from buildgrid._protos.google.longrunning import operations_pb2
27from buildgrid.server.context import current_instance
28from buildgrid.server.decorators import rpc
29from buildgrid.server.exceptions import CancelledError
30from buildgrid.server.execution.instance import ExecutionInstance
31from buildgrid.server.logging import buildgrid_logger
32from buildgrid.server.metadata import extract_client_identity, extract_request_metadata, extract_scheduling_metadata
33from buildgrid.server.servicer import InstancedServicer
34from buildgrid.server.utils.cancellation import CancellationContext
36LOGGER = buildgrid_logger(__name__)
39def _parse_instance_name(operation_name: str) -> str:
40 names = operation_name.split("/")
41 return "/".join(names[:-1])
44class ExecutionService(ExecutionServicer, InstancedServicer[ExecutionInstance]):
45 SERVICE_NAME = "Execution"
46 REGISTER_METHOD = add_ExecutionServicer_to_server
47 FULL_NAME = RE_DESCRIPTOR.services_by_name[SERVICE_NAME].full_name
49 @rpc(instance_getter=lambda r: cast(str, r.instance_name))
50 def Execute(self, request: ExecuteRequest, context: grpc.ServicerContext) -> Iterator[operations_pb2.Operation]:
51 yield from self._handle_request(request, context)
53 @rpc(instance_getter=lambda r: _parse_instance_name(r.name))
54 def WaitExecution(
55 self, request: WaitExecutionRequest, context: grpc.ServicerContext
56 ) -> Iterator[operations_pb2.Operation]:
57 yield from self._handle_request(request, context)
59 def query_connected_clients_for_instance(self, instance_name: str) -> int:
60 if instance := self.instances.get(instance_name):
61 return instance.scheduler.ops_notifier.listener_count()
62 return 0
64 def _handle_request(
65 self,
66 request: ExecuteRequest | WaitExecutionRequest,
67 context: grpc.ServicerContext,
68 ) -> Iterable[operations_pb2.Operation]:
69 peer_uid = context.peer()
71 try:
72 if isinstance(request, ExecuteRequest):
73 request_metadata = extract_request_metadata(context.invocation_metadata())
74 client_identity = extract_client_identity(current_instance(), context.invocation_metadata())
75 scheduling_metadata = extract_scheduling_metadata(context.invocation_metadata())
76 operation_name = self.current_instance.execute(
77 action_digest=request.action_digest,
78 skip_cache_lookup=request.skip_cache_lookup,
79 priority=request.execution_policy.priority,
80 request_metadata=request_metadata,
81 client_identity=client_identity,
82 scheduling_metadata=scheduling_metadata,
83 )
84 else: # isinstance(request, WaitExecutionRequest)"
85 names = request.name.split("/")
86 operation_name = names[-1]
88 operation_full_name = f"{current_instance()}/{operation_name}"
90 for operation in self.current_instance.stream_operation_updates(
91 operation_name, context=CancellationContext(context)
92 ):
93 operation.name = operation_full_name
94 yield operation
96 if not context.is_active():
97 LOGGER.info(
98 "Peer was holding thread for operation updates but the rpc context is inactive; releasing thread.",
99 tags=dict(peer_uid=peer_uid, instance_name=current_instance(), operation_name=operation_name),
100 )
102 except CancelledError as e:
103 LOGGER.info(e)
104 context.set_details(str(e))
105 context.set_code(grpc.StatusCode.CANCELLED)
106 yield e.last_response # type: ignore[misc] # need a better signature