Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/execution/instance.py: 95.51%

89 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17ExecutionInstance 

18================= 

19An instance of the Remote Execution Service. 

20""" 

21 

22 

23from contextlib import ExitStack 

24from typing import Iterable, Optional 

25 

26from buildgrid_metering.models.dataclasses import Identity, RPCUsage, Usage 

27 

28from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import DESCRIPTOR as RE_DESCRIPTOR 

29from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ( 

30 Action, 

31 Command, 

32 Digest, 

33 Platform, 

34 RequestMetadata, 

35) 

36from buildgrid._protos.google.longrunning.operations_pb2 import Operation 

37from buildgrid.server.auth.manager import get_context_client_identity 

38from buildgrid.server.enums import MeteringThrottleAction 

39from buildgrid.server.exceptions import FailedPreconditionError, InvalidArgumentError, ResourceExhaustedError 

40from buildgrid.server.logging import buildgrid_logger 

41from buildgrid.server.scheduler import Scheduler 

42from buildgrid.server.servicer import Instance 

43from buildgrid.server.sql.models import ClientIdentityEntry 

44from buildgrid.server.utils.cancellation import CancellationContext 

45 

# All priorities >= this value will not be throttled / deprioritized.
# Requests below this threshold from a throttled client are either rejected
# or bumped up to this value, depending on the configured
# MeteringThrottleAction (see ExecutionInstance.execute).
EXECUTION_DEPRIORITIZATION_LIMIT = 1

# Module-level structured logger for the execution service.
LOGGER = buildgrid_logger(__name__)

50 

51 

class ExecutionInstance(Instance):
    """An instance of the Remote Execution service backed by a Scheduler."""

    SERVICE_NAME = RE_DESCRIPTOR.services_by_name["Execution"].full_name

    def __init__(
        self,
        scheduler: Scheduler,
        operation_stream_keepalive_timeout: Optional[int] = None,
    ) -> None:
        # Teardown callbacks registered in start() are collected here and
        # released in reverse order by stop().
        self._stack = ExitStack()
        self.scheduler = scheduler

        # Upper bound (seconds) on how long a streaming client waits between
        # updates before a keepalive re-send; None blocks indefinitely.
        self._operation_stream_keepalive_timeout = operation_stream_keepalive_timeout

    def start(self) -> None:
        # Bring up the scheduler (with its job watcher) and ensure it is
        # stopped again when this instance shuts down.
        self.scheduler.start(start_job_watcher=True)
        self._stack.callback(self.scheduler.stop)

    def stop(self) -> None:
        self._stack.close()
        LOGGER.info("Stopped Execution.")

    def execute(
        self,
        *,
        action_digest: Digest,
        skip_cache_lookup: bool,
        priority: int = 0,
        request_metadata: Optional[RequestMetadata] = None,
        client_identity: Optional[ClientIdentityEntry] = None,
    ) -> str:
        """
        Sends a job for execution. Queues an action and creates an Operation to be associated with this action.

        Raises FailedPreconditionError when the Action or its Command cannot
        be fetched from storage, and ResourceExhaustedError when the client
        is over quota and the throttle action is REJECT.
        """
        storage = self.scheduler.storage

        action = storage.get_message(action_digest, Action)
        if not action:
            raise FailedPreconditionError("Could not get action from storage.")

        command = storage.get_message(action.command_digest, Command)
        if not command:
            raise FailedPreconditionError("Could not get command from storage.")

        # Prefer the Action's platform over the Command's; fall back to an
        # empty Platform when neither message carries one.
        if action.HasField("platform"):
            platform = action.platform
        elif command.HasField("platform"):
            platform = command.platform
        else:
            platform = Platform()

        property_label, platform_requirements = self.scheduler.property_set.execution_properties(platform)

        if self._should_throttle_execution(priority, client_identity):
            if self.scheduler.metering_throttle_action == MeteringThrottleAction.REJECT:
                raise ResourceExhaustedError("User quota exceeded")

            # TODO: test_execution_instance mocks far too much, making tests
            # brittle; when possible merge it into the execution_service tests
            # and use proper logging here. Writing
            # `action_digest=[{action_digest.hash}/{action_digest.size_bytes}]`
            # currently fails under those mocks with
            # AttributeError: 'str' object has no attribute 'hash'.
            LOGGER.info(
                "Job priority throttled.",
                tags=dict(digest=action_digest, old_priority=priority, new_priority=EXECUTION_DEPRIORITIZATION_LIMIT),
            )
            priority = EXECUTION_DEPRIORITIZATION_LIMIT

        operation_name = self.scheduler.queue_job_action(
            action=action,
            action_digest=action_digest,
            command=command,
            platform_requirements=platform_requirements,
            property_label=property_label,
            skip_cache_lookup=skip_cache_lookup,
            priority=priority,
            request_metadata=request_metadata,
            client_identity=client_identity,
        )
        self._meter_execution(client_identity, operation_name)
        return operation_name

    def stream_operation_updates(self, operation_name: str, context: CancellationContext) -> Iterable[Operation]:
        """Yield the named Operation's state until it is done or the context is cancelled."""
        job_name = self.scheduler.get_operation_job_name(operation_name)
        if not job_name:
            raise InvalidArgumentError(f"Operation name does not exist: [{operation_name}]")
        # Subscribe before the first load and re-query afterwards: a completion
        # that lands between sending the first result and the yield resuming
        # would otherwise be missed.
        with self.scheduler.ops_notifier.subscription(job_name) as update_requested:
            operation = self.scheduler.load_operation(operation_name)
            yield operation
            if operation.done:
                return

            # Cancellation sets the event so the wait below wakes promptly.
            context.on_cancel(update_requested.set)
            while not context.is_cancelled():
                # Wake on a notified update, or time out to send a keepalive.
                update_requested.wait(timeout=self._operation_stream_keepalive_timeout)
                update_requested.clear()

                if context.is_cancelled():
                    return

                operation = self.scheduler.load_operation(operation_name)
                yield operation
                if operation.done:
                    return

    def _meter_execution(self, client_identity: Optional[ClientIdentityEntry], operation_name: str) -> None:
        """Meter the number of executions of client"""
        metering_client = self.scheduler.metering_client
        if metering_client is None or client_identity is None:
            return
        try:
            identity = Identity(
                instance=client_identity.instance,
                workflow=client_identity.workflow,
                actor=client_identity.actor,
                subject=client_identity.subject,
            )
            metering_client.put_usage(identity, operation_name, Usage(rpc=RPCUsage(execute=1)))
        except Exception as exc:
            # Best-effort: a metering failure must never fail the execute RPC.
            LOGGER.exception(
                f"Failed to publish execution usage for identity: {get_context_client_identity()}", exc_info=exc
            )

    def _should_throttle_execution(self, priority: int, client_identity: Optional[ClientIdentityEntry]) -> bool:
        """Return True when the metering service reports this client as throttled."""
        # Requests already at/above the deprioritization limit are exempt, as
        # are anonymous requests and deployments without a metering client.
        if priority >= EXECUTION_DEPRIORITIZATION_LIMIT:
            return False
        if self.scheduler.metering_client is None or client_identity is None:
            return False
        try:
            identity = Identity(
                instance=client_identity.instance,
                workflow=client_identity.workflow,
                actor=client_identity.actor,
                subject=client_identity.subject,
            )
            response = self.scheduler.metering_client.get_throttling(identity)
            if response.throttled:
                LOGGER.info(
                    "Execution request is throttled.",
                    tags=dict(client_id=client_identity, usage=response.tracked_usage),
                )
            return response.throttled
        except Exception as exc:
            # Fail open: if metering cannot be reached, do not throttle.
            LOGGER.exception("Failed to get throttling information.", exc_info=exc)
            return False