Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/execution/service.py: 92.00%

75 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-04-15 14:01 +0000

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17ExecutionService 

18================ 

19 

20Serves remote execution requests. 

21""" 

22 

23import logging 

24from typing import Iterable, Iterator, Union, cast 

25 

26import grpc 

27 

28from buildgrid._exceptions import CancelledError, RetriableError 

29from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import DESCRIPTOR as RE_DESCRIPTOR 

30from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteRequest, WaitExecutionRequest 

31from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2_grpc import ( 

32 ExecutionServicer, 

33 add_ExecutionServicer_to_server, 

34) 

35from buildgrid._protos.google.longrunning import operations_pb2 

36from buildgrid.server.auth.manager import authorize_unary_stream 

37from buildgrid.server.execution.instance import ExecutionInstance 

38from buildgrid.server.metrics_names import ( 

39 EXECUTE_EXCEPTION_COUNT_METRIC_NAME, 

40 EXECUTE_REQUEST_COUNT_METRIC_NAME, 

41 EXECUTE_SERVICER_TIME_METRIC_NAME, 

42 WAIT_EXECUTION_EXCEPTION_COUNT_METRIC_NAME, 

43 WAIT_EXECUTION_REQUEST_COUNT_METRIC_NAME, 

44 WAIT_EXECUTION_SERVICER_TIME_METRIC_NAME, 

45) 

46from buildgrid.server.metrics_utils import ( 

47 Counter, 

48 generator_method_duration_metric, 

49 generator_method_exception_counter, 

50) 

51from buildgrid.server.request_metadata_utils import ( 

52 extract_client_identity, 

53 extract_request_metadata, 

54 printable_client_identity, 

55 printable_request_metadata, 

56) 

57from buildgrid.server.servicer import InstancedServicer 

58from buildgrid.server.utils.context import CancellationContext 

59from buildgrid.server.utils.decorators import handle_errors_unary_stream, track_request_id_generator 

60 

# Module-level logger, named after this module.
LOGGER = logging.getLogger(name=__name__)

62 

63 

64def _parse_instance_name(operation_name: str) -> str: 

65 names = operation_name.split("/") 

66 return "/".join(names[:-1]) 

67 

68 

class ExecutionService(ExecutionServicer, InstancedServicer[ExecutionInstance]):
    """gRPC servicer for the remote-execution ``Execution`` service.

    Both ``Execute`` and ``WaitExecution`` resolve an ``ExecutionInstance`` by
    instance name. They then stream ``Operation`` messages from it via
    :meth:`_handle_request`.
    """

    REGISTER_METHOD = add_ExecutionServicer_to_server
    FULL_NAME = RE_DESCRIPTOR.services_by_name["Execution"].full_name

    # Passed as ``ignored_exceptions`` to the exception-count metric
    # decorators on the corresponding RPC handlers below.
    execute_ignored_exceptions = (RetriableError,)
    wait_execution_ignored_exceptions = (RetriableError,)

    @authorize_unary_stream(lambda req: cast(str, req.instance_name))
    @track_request_id_generator
    @generator_method_duration_metric(EXECUTE_SERVICER_TIME_METRIC_NAME)
    @generator_method_exception_counter(
        EXECUTE_EXCEPTION_COUNT_METRIC_NAME,
        ignored_exceptions=execute_ignored_exceptions,
    )
    @handle_errors_unary_stream(operations_pb2.Operation)
    def Execute(self, request: ExecuteRequest, context: grpc.ServicerContext) -> Iterator[operations_pb2.Operation]:
        """Serve an ``Execute`` RPC.

        Logs the caller, increments the per-instance request counter, and then
        streams the operations produced by :meth:`_handle_request`.

        Args:
            request (ExecuteRequest): The incoming RPC request. Its
                ``instance_name`` selects the execution instance.
            context (grpc.ServicerContext): Context for the RPC call.

        Yields:
            operations_pb2.Operation: Updates for the submitted operation.
        """
        message = (
            f"Execute request from [{context.peer()}] "
            f"([{printable_request_metadata(context.invocation_metadata())}]) "
            f"([{printable_client_identity(request.instance_name, context.invocation_metadata())}])"
        )
        LOGGER.info(message)

        instance_name = request.instance_name

        # The counter is closed before streaming begins.
        with Counter(metric_name=EXECUTE_REQUEST_COUNT_METRIC_NAME, instance_name=instance_name) as request_counter:
            request_counter.increment(1)

        yield from self._handle_request(request, context, instance_name)

    @authorize_unary_stream(lambda req: _parse_instance_name(req.name))
    @track_request_id_generator
    @generator_method_duration_metric(WAIT_EXECUTION_SERVICER_TIME_METRIC_NAME)
    @generator_method_exception_counter(
        WAIT_EXECUTION_EXCEPTION_COUNT_METRIC_NAME,
        ignored_exceptions=wait_execution_ignored_exceptions,
    )
    @handle_errors_unary_stream(operations_pb2.Operation)
    def WaitExecution(
        self, request: WaitExecutionRequest, context: grpc.ServicerContext
    ) -> Iterator[operations_pb2.Operation]:
        """Serve a ``WaitExecution`` RPC.

        The instance name is everything before the last ``/`` of
        ``request.name``. Updates are then streamed via :meth:`_handle_request`.

        Args:
            request (WaitExecutionRequest): The incoming RPC request.
            context (grpc.ServicerContext): Context for the RPC call.

        Yields:
            operations_pb2.Operation: Updates for the named operation.
        """
        message = (
            f"WaitExecution request from [{context.peer()}] "
            f"([{printable_request_metadata(context.invocation_metadata())}])"
        )
        LOGGER.info(message)

        instance_name = _parse_instance_name(request.name)

        # The counter is closed before streaming begins.
        with Counter(
            metric_name=WAIT_EXECUTION_REQUEST_COUNT_METRIC_NAME, instance_name=instance_name
        ) as request_counter:
            request_counter.increment(1)

        yield from self._handle_request(request, context, instance_name)

    def query_n_clients(self) -> int:
        """Return the listener count summed over every registered instance name."""
        return sum(self.query_n_clients_for_instance(name) for name in self.instances)

    def query_n_clients_for_instance(self, instance_name: str) -> int:
        """Return the scheduler's ops-notifier listener count for one instance.

        Returns 0 when ``instance_name`` is not found (or maps to a falsy
        value) in ``self.instances``.
        """
        instance = self.instances.get(instance_name)
        if not instance:
            return 0
        return instance.scheduler.ops_notifier.listener_count()

    def _handle_request(
        self,
        request: Union[ExecuteRequest, WaitExecutionRequest],
        context: grpc.ServicerContext,
        instance_name: str,
    ) -> Iterable[operations_pb2.Operation]:
        """Stream operation updates for an Execute or WaitExecution request.

        First, determine the operation name:

        - For an ``ExecuteRequest``, submit the action through
          ``instance.execute`` and use the name it returns.
        - Otherwise, use the final ``/``-separated segment of
          ``request.name``.

        Each yielded update has its ``name`` rewritten to
        ``"<instance_name>/<operation_name>"``.

        On ``CancelledError``, set the RPC status to ``CANCELLED``, put the
        error text in the details, and yield the error's ``last_response``.

        Args:
            request: The Execute or WaitExecution request being served.
            context (grpc.ServicerContext): Context for the RPC call.
            instance_name (str): Name of the instance to look up.

        Yields:
            operations_pb2.Operation: Operation updates for the request.
        """
        peer_uid = context.peer()

        try:
            instance = self.get_instance(instance_name)

            if not isinstance(request, ExecuteRequest):
                # WaitExecutionRequest: the operation name is the last path segment.
                operation_name = request.name.split("/")[-1]
            else:
                request_metadata = extract_request_metadata(context.invocation_metadata())
                client_identity = extract_client_identity(instance_name, context.invocation_metadata())
                operation_name = instance.execute(
                    action_digest=request.action_digest,
                    skip_cache_lookup=request.skip_cache_lookup,
                    priority=request.execution_policy.priority,
                    request_metadata=request_metadata,
                    client_identity=client_identity,
                )

            qualified_name = f"{instance_name}/{operation_name}"

            updates = instance.stream_operation_updates(operation_name, CancellationContext(context))
            for update in updates:
                update.name = qualified_name
                yield update

            # The stream has ended. If the RPC is no longer active, record
            # that this handler was only released now.
            if not context.is_active():
                LOGGER.info(
                    f"Peer peer_uid=[{peer_uid}] was holding up a thread for "
                    f"`stream_operation_updates()` for instance_name=[{instance_name}], "
                    f"operation_name=[{operation_name}], but the rpc context is not "
                    "active anymore; releasing thread."
                )

        except CancelledError as err:
            LOGGER.info(err)
            context.set_details(str(err))
            context.set_code(grpc.StatusCode.CANCELLED)
            yield err.last_response  # type: ignore[misc] # need a better signature