Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/execution/service.py: 90.57%

53 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2025-07-10 13:10 +0000

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16from typing import Iterable, Iterator, cast 

17 

18import grpc 

19 

20from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import DESCRIPTOR as RE_DESCRIPTOR 

21from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteRequest, WaitExecutionRequest 

22from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2_grpc import ( 

23 ExecutionServicer, 

24 add_ExecutionServicer_to_server, 

25) 

26from buildgrid._protos.google.longrunning import operations_pb2 

27from buildgrid.server.context import current_instance 

28from buildgrid.server.decorators import rpc 

29from buildgrid.server.exceptions import CancelledError 

30from buildgrid.server.execution.instance import ExecutionInstance 

31from buildgrid.server.logging import buildgrid_logger 

32from buildgrid.server.metadata import extract_client_identity, extract_request_metadata, extract_scheduling_metadata 

33from buildgrid.server.servicer import InstancedServicer 

34from buildgrid.server.utils.cancellation import CancellationContext 

35 

# Module-level logger for this file, obtained from buildgrid_logger with the module's name.
36LOGGER = buildgrid_logger(__name__) 

37 

38 

39def _parse_instance_name(operation_name: str) -> str: 

40 names = operation_name.split("/") 

41 return "/".join(names[:-1]) 

42 

43 

class ExecutionService(ExecutionServicer, InstancedServicer[ExecutionInstance]):
    """Servicer exposing the ``Execute`` and ``WaitExecution`` streaming RPCs.

    Both RPCs delegate to ``_handle_request``, which resolves the operation
    name and then relays the operation updates produced by the current
    instance back to the caller.
    """

    SERVICE_NAME = "Execution"
    REGISTER_METHOD = add_ExecutionServicer_to_server
    FULL_NAME = RE_DESCRIPTOR.services_by_name[SERVICE_NAME].full_name

    @rpc(instance_getter=lambda r: cast(str, r.instance_name))
    def Execute(self, request: ExecuteRequest, context: grpc.ServicerContext) -> Iterator[operations_pb2.Operation]:
        """Stream operation updates for an ``ExecuteRequest``.

        The instance is selected from ``request.instance_name``.
        """
        yield from self._handle_request(request, context)

    @rpc(instance_getter=lambda r: _parse_instance_name(r.name))
    def WaitExecution(
        self, request: WaitExecutionRequest, context: grpc.ServicerContext
    ) -> Iterator[operations_pb2.Operation]:
        """Stream operation updates for a ``WaitExecutionRequest``.

        The instance is selected from the part of ``request.name`` that
        precedes its final ``/``.
        """
        yield from self._handle_request(request, context)

    def query_connected_clients_for_instance(self, instance_name: str) -> int:
        """Return the listener count of the named instance's ops notifier.

        Returns 0 when ``self.instances`` holds no truthy entry for
        *instance_name*.
        """
        instance = self.instances.get(instance_name)
        if not instance:
            return 0
        return instance.scheduler.ops_notifier.listener_count()

    def _handle_request(
        self,
        request: ExecuteRequest | WaitExecutionRequest,
        context: grpc.ServicerContext,
    ) -> Iterable[operations_pb2.Operation]:
        """Yield operation updates for either kind of execution request.

        For an ``ExecuteRequest`` the operation name is whatever
        ``self.current_instance.execute`` returns; for any other request it
        is the last ``/``-separated segment of ``request.name``. Every
        yielded operation has its ``name`` overwritten with
        ``"<current instance>/<operation name>"``.

        If a ``CancelledError`` is raised anywhere in that process, it is
        logged, the context is given the error text as details and the
        ``CANCELLED`` status code, and the error's ``last_response`` is
        yielded.
        """
        peer = context.peer()

        try:
            if isinstance(request, ExecuteRequest):
                # Each extractor receives its own fresh read of the invocation metadata.
                req_metadata = extract_request_metadata(context.invocation_metadata())
                identity = extract_client_identity(current_instance(), context.invocation_metadata())
                sched_metadata = extract_scheduling_metadata(context.invocation_metadata())
                operation_name = self.current_instance.execute(
                    action_digest=request.action_digest,
                    skip_cache_lookup=request.skip_cache_lookup,
                    priority=request.execution_policy.priority,
                    request_metadata=req_metadata,
                    client_identity=identity,
                    scheduling_metadata=sched_metadata,
                )
            else:
                # Otherwise a WaitExecutionRequest: keep only the trailing segment of its name.
                operation_name = request.name.rpartition("/")[2]

            full_name = f"{current_instance()}/{operation_name}"

            updates = self.current_instance.stream_operation_updates(
                operation_name, context=CancellationContext(context)
            )
            for update in updates:
                update.name = full_name
                yield update

            # The update stream has finished; record when that coincides with an inactive rpc context.
            if not context.is_active():
                LOGGER.info(
                    "Peer was holding thread for operation updates but the rpc context is inactive; releasing thread.",
                    tags={
                        "peer_uid": peer,
                        "instance_name": current_instance(),
                        "operation_name": operation_name,
                    },
                )

        except CancelledError as cancelled:
            LOGGER.info(cancelled)
            context.set_details(str(cancelled))
            context.set_code(grpc.StatusCode.CANCELLED)
            yield cancelled.last_response  # type: ignore[misc]  # need a better signature