Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/execution/service.py: 92.31%

78 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-06-11 15:37 +0000

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17ExecutionService 

18================ 

19 

20Serves remote execution requests. 

21""" 

22 

23import logging 

24from typing import Iterable, Iterator, Union, cast 

25 

26import grpc 

27 

28from buildgrid._exceptions import CancelledError, RetriableError 

29from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import DESCRIPTOR as RE_DESCRIPTOR 

30from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteRequest, WaitExecutionRequest 

31from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2_grpc import ( 

32 ExecutionServicer, 

33 add_ExecutionServicer_to_server, 

34) 

35from buildgrid._protos.google.longrunning import operations_pb2 

36from buildgrid.server.auth.manager import authorize 

37from buildgrid.server.execution.instance import ExecutionInstance 

38from buildgrid.server.instance import instanced 

39from buildgrid.server.metrics_names import ( 

40 EXECUTE_EXCEPTION_COUNT_METRIC_NAME, 

41 EXECUTE_REQUEST_COUNT_METRIC_NAME, 

42 EXECUTE_SERVICER_TIME_METRIC_NAME, 

43 WAIT_EXECUTION_EXCEPTION_COUNT_METRIC_NAME, 

44 WAIT_EXECUTION_REQUEST_COUNT_METRIC_NAME, 

45 WAIT_EXECUTION_SERVICER_TIME_METRIC_NAME, 

46) 

47from buildgrid.server.metrics_utils import ( 

48 Counter, 

49 generator_method_duration_metric, 

50 generator_method_exception_counter, 

51) 

52from buildgrid.server.request_metadata_utils import ( 

53 extract_client_identity, 

54 extract_request_metadata, 

55 printable_client_identity, 

56 printable_request_metadata, 

57) 

58from buildgrid.server.servicer import InstancedServicer 

59from buildgrid.server.utils.context import CancellationContext 

60from buildgrid.server.utils.decorators import handle_errors_unary_stream, track_request_id_generator 

61 

62LOGGER = logging.getLogger(__name__) 

63 

64 

65def _parse_instance_name(operation_name: str) -> str: 

66 names = operation_name.split("/") 

67 return "/".join(names[:-1]) 

68 

69 

class ExecutionService(ExecutionServicer, InstancedServicer[ExecutionInstance]):
    """Servicer for the remote execution ``Execution`` gRPC service.

    ``Execute`` and ``WaitExecution`` both delegate to ``_handle_request``,
    which streams operation updates obtained from the resolved instance.
    """

    REGISTER_METHOD = add_ExecutionServicer_to_server
    FULL_NAME = RE_DESCRIPTOR.services_by_name["Execution"].full_name

    # Passed as ``ignored_exceptions`` to the Execute exception counter.
    execute_ignored_exceptions = (RetriableError,)

    @instanced(lambda r: cast(str, r.instance_name))
    @authorize
    @track_request_id_generator
    @generator_method_duration_metric(EXECUTE_SERVICER_TIME_METRIC_NAME)
    @generator_method_exception_counter(
        EXECUTE_EXCEPTION_COUNT_METRIC_NAME,
        ignored_exceptions=execute_ignored_exceptions,
    )
    @handle_errors_unary_stream(operations_pb2.Operation)
    def Execute(self, request: ExecuteRequest, context: grpc.ServicerContext) -> Iterator[operations_pb2.Operation]:
        """Serve an ``Execute`` RPC.

        Logs the caller, bumps the Execute request counter for the request's
        instance, then yields every operation produced by ``_handle_request``.

        Args:
            request (ExecuteRequest): The incoming RPC request.
            context (grpc.ServicerContext): Context for the RPC call.
        """
        # Fragments are evaluated in the same order they appear in the message.
        peer = context.peer()
        metadata_text = printable_request_metadata(context.invocation_metadata())
        identity_text = printable_client_identity(request.instance_name, context.invocation_metadata())
        LOGGER.info(f"Execute request from [{peer}] " f"([{metadata_text}]) " f"([{identity_text}])")

        instance_name = request.instance_name

        with Counter(metric_name=EXECUTE_REQUEST_COUNT_METRIC_NAME, instance_name=instance_name) as request_counter:
            request_counter.increment(1)

        yield from self._handle_request(request, context, instance_name)

    # Passed as ``ignored_exceptions`` to the WaitExecution exception counter.
    wait_execution_ignored_exceptions = (RetriableError,)

    @instanced(lambda r: _parse_instance_name(r.name))
    @authorize
    @track_request_id_generator
    @generator_method_duration_metric(WAIT_EXECUTION_SERVICER_TIME_METRIC_NAME)
    @generator_method_exception_counter(
        WAIT_EXECUTION_EXCEPTION_COUNT_METRIC_NAME,
        ignored_exceptions=wait_execution_ignored_exceptions,
    )
    @handle_errors_unary_stream(operations_pb2.Operation)
    def WaitExecution(
        self, request: WaitExecutionRequest, context: grpc.ServicerContext
    ) -> Iterator[operations_pb2.Operation]:
        """Serve a ``WaitExecution`` RPC.

        Logs the caller, derives the instance name from ``request.name``,
        bumps the WaitExecution request counter, then yields every operation
        produced by ``_handle_request``.

        Args:
            request (WaitExecutionRequest): The incoming RPC request.
            context (grpc.ServicerContext): Context for the RPC call.
        """
        peer = context.peer()
        metadata_text = printable_request_metadata(context.invocation_metadata())
        LOGGER.info(f"WaitExecution request from [{peer}] " f"([{metadata_text}])")

        # The instance name is everything before the last "/" of the operation name.
        instance_name = _parse_instance_name(request.name)

        with Counter(
            metric_name=WAIT_EXECUTION_REQUEST_COUNT_METRIC_NAME, instance_name=instance_name
        ) as request_counter:
            request_counter.increment(1)

        yield from self._handle_request(request, context, instance_name)

    def query_n_clients(self) -> int:
        """Return the sum of ``query_n_clients_for_instance`` over all instances."""
        return sum(self.query_n_clients_for_instance(name) for name in self.instances)

    def query_n_clients_for_instance(self, instance_name: str) -> int:
        """Return the ops-notifier listener count for ``instance_name``.

        Returns 0 when ``self.instances`` has no (truthy) entry for the name.
        """
        instance = self.instances.get(instance_name)
        if not instance:
            return 0
        return instance.scheduler.ops_notifier.listener_count()

    def _handle_request(
        self,
        request: Union[ExecuteRequest, WaitExecutionRequest],
        context: grpc.ServicerContext,
        instance_name: str,
    ) -> Iterable[operations_pb2.Operation]:
        """Stream operation updates for an Execute or WaitExecution request.

        For an ``ExecuteRequest`` the operation name comes from
        ``instance.execute``; for any other request it is the last
        ``/``-separated segment of ``request.name``. Each yielded operation
        has its ``name`` set to ``"{instance_name}/{operation_name}"``.

        If ``CancelledError`` is raised, it is logged, the RPC context is
        marked ``CANCELLED`` with the error text as details, and the error's
        ``last_response`` is yielded.
        """
        peer_uid = context.peer()

        try:
            instance = self.get_instance(instance_name)

            if not isinstance(request, ExecuteRequest):
                # WaitExecutionRequest: the operation name is the final path segment.
                operation_name = request.name.rpartition("/")[2]
            else:
                metadata = extract_request_metadata(context.invocation_metadata())
                identity = extract_client_identity(instance_name, context.invocation_metadata())
                operation_name = instance.execute(
                    action_digest=request.action_digest,
                    skip_cache_lookup=request.skip_cache_lookup,
                    priority=request.execution_policy.priority,
                    request_metadata=metadata,
                    client_identity=identity,
                )

            operation_full_name = f"{instance_name}/{operation_name}"

            cancellation = CancellationContext(context)
            updates = instance.stream_operation_updates(operation_name, cancellation)
            for update in updates:
                # Clients see the instance-qualified operation name.
                update.name = operation_full_name
                yield update

            if not context.is_active():
                inactive_message = (
                    f"Peer peer_uid=[{peer_uid}] was holding up a thread for "
                    f"`stream_operation_updates()` for instance_name=[{instance_name}], "
                    f"operation_name=[{operation_name}], but the rpc context is not "
                    "active anymore; releasing thread."
                )
                LOGGER.info(inactive_message)

        except CancelledError as cancelled:
            LOGGER.info(cancelled)
            context.set_details(str(cancelled))
            context.set_code(grpc.StatusCode.CANCELLED)
            yield cancelled.last_response  # type: ignore[misc]  # need a better signature