# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
ExecutionInstance
=================
An instance of the Remote Execution Service.
"""

import logging
from collections import defaultdict
from contextlib import ExitStack
from typing import Any, Dict, Iterable, Optional, Set, Tuple

from buildgrid_metering.models.dataclasses import Identity, RPCUsage, Usage

from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import DESCRIPTOR as RE_DESCRIPTOR
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import (
    Action,
    ActionCacheUpdateCapabilities,
    CacheCapabilities,
    Command,
    Digest,
    DigestFunction,
    Platform,
    RequestMetadata,
)
from buildgrid._protos.google.longrunning.operations_pb2 import Operation
from buildgrid.server.metrics_names import SCHEDULER_QUEUE_ACTION_TIME_METRIC_NAME
from buildgrid.server.metrics_utils import DurationMetric
from buildgrid.server.persistence.sql.impl import SQLDataStore
from buildgrid.server.persistence.sql.models import ClientIdentityEntry
from buildgrid.server.servicer import Instance
from buildgrid.server.utils.context import CancellationContext
from buildgrid.utils import get_hash_type

# All priorities >= this value will not be throttled / deprioritized
EXECUTION_DEPRIORITIZATION_LIMIT = 1

LOGGER = logging.getLogger(__name__)


class ExecutionInstance(Instance):
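    """An instance of the Remote Execution service.

    Wraps a scheduler to serve the REAPI ``Execution`` service: it validates
    incoming actions, queues them for execution, and streams operation updates
    back to clients.
    """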

    SERVICE_NAME = RE_DESCRIPTOR.services_by_name["Execution"].full_name

    def __init__(
        self,
        scheduler: SQLDataStore,
        operation_stream_keepalive_timeout: Optional[int] = None,
    ) -> None:
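        """Create the instance around the given scheduler.

        ``operation_stream_keepalive_timeout`` bounds how long
        ``stream_operation_updates`` waits for a job update before re-sending
        the operation's current state as a keepalive.
        """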

        self._stack = ExitStack()
        self.scheduler = scheduler

        self._operation_stream_keepalive_timeout = operation_stream_keepalive_timeout

    # --- Public API ---

    def start(self) -> None:
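        """Start the scheduler (including its job watcher) and register its
        shutdown with this instance's exit stack."""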

        self.scheduler.start(start_job_watcher=True)
        self._stack.callback(self.scheduler.stop)

    def stop(self) -> None:
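        """Unwind the exit stack, stopping the scheduler."""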

        self._stack.close()
        LOGGER.info(f"Stopped Execution instance for '{self._instance_name}'")

    def set_instance_name(self, instance_name: str) -> None:
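        """Set this instance's name and propagate it to the scheduler."""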

        super().set_instance_name(instance_name)
        self.scheduler.set_instance_name(instance_name)

    def hash_type(self) -> "DigestFunction.Value.ValueType":
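        """Return the hash type (digest function) used by this deployment."""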

        return get_hash_type()

    @DurationMetric(SCHEDULER_QUEUE_ACTION_TIME_METRIC_NAME, instanced=True)
    def execute(
        self,
        *,
        action_digest: Digest,
        skip_cache_lookup: bool,
        priority: int = 0,
        request_metadata: Optional[RequestMetadata] = None,
        client_identity: Optional[ClientIdentityEntry] = None,
    ) -> str:
        """
        Sends a job for execution: queues the action and creates an Operation
        to be associated with it.
        """

        action = self.scheduler.storage.get_message(action_digest, Action)
        if not action:
            raise FailedPreconditionError("Could not get action from storage.")

        command = self.scheduler.storage.get_message(action.command_digest, Command)
        if not command:
            raise FailedPreconditionError("Could not get command from storage.")

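        # The platform may be declared on either the Action or the Command;
        # prefer the Action's copy when both are present.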
        if action.HasField("platform"):
            platform = action.platform
        elif command.HasField("platform"):
            platform = command.platform
        else:
            platform = Platform()

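        # Collect the platform properties used for worker matching, rejecting
        # properties that are not registered with the scheduler and duplicates
        # of single-valued keys.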
        platform_requirements: Dict[str, Set[str]] = defaultdict(set)
        for platform_property in platform.properties:
            name = platform_property.name
            if name not in self.scheduler.property_keys:
                raise FailedPreconditionError(
                    f"Unregistered platform property [{name}]. Known properties are: [{self.scheduler.property_keys}]"
                )
            if name in self.scheduler.unique_keys and name in platform_requirements:
                raise FailedPreconditionError(f"Platform property [{name}] can only be set once.")
            if name in self.scheduler.match_properties:
                platform_requirements[name].add(platform_property.value)

        should_throttle = self._should_throttle_execution(priority, client_identity)
        if should_throttle:
            # TODO: test_execution_instance is a total mess; it mocks far too much, making the
            # tests brittle. When possible, merge it into the execution_service tests and use
            # proper logging here. We should be able to write
            # `action_digest=[{action_digest.hash}/{action_digest.size_bytes}]`, but can't:
            # AttributeError: 'str' object has no attribute 'hash'
            LOGGER.info(
                f"Job priority throttled action_digest=[{action_digest}] "
                f"old_priority=[{priority}] new_priority=[{EXECUTION_DEPRIORITIZATION_LIMIT}]"
            )
            priority = EXECUTION_DEPRIORITIZATION_LIMIT

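        # Hand the action off to the scheduler, which creates the Operation
        # tracking this execution and returns its name.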
        operation_name = self.scheduler.queue_job_action(
            action=action,
            action_digest=action_digest,
            command=command,
            platform_requirements=platform_requirements,
            skip_cache_lookup=skip_cache_lookup,
            priority=priority,
            request_metadata=request_metadata,
            client_identity=client_identity,
        )
        self._meter_execution(client_identity, operation_name)
        return operation_name

    def stream_operation_updates(self, operation_name: str, context: CancellationContext) -> Iterable[Operation]:
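        """Yield the operation's current state, then yield again on every job
        update or keepalive timeout, until the operation completes or the
        request is cancelled."""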

        job_name = self.scheduler.get_operation_job_name(operation_name)
        if not job_name:
            raise InvalidArgumentError(f"Operation name does not exist: [{operation_name}]")
        # Start the listener as soon as we get the job name and re-query. This avoids potentially missing
        # the completed update if it triggers in between sending back the first result and the yield resuming.
        with self.scheduler.ops_notifier.subscription(job_name) as update_requested:
            yield (operation := self.scheduler.load_operation(operation_name))
            if operation.done:
                return

            # When the context is deactivated, we can quickly stop waiting.
            context.on_cancel(update_requested.set)
            while not context.is_cancelled():
                update_requested.wait(timeout=self._operation_stream_keepalive_timeout)
                update_requested.clear()

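                # Waking up here means either an update arrived or the keepalive
                # timeout expired; in both cases, re-check cancellation and
                # re-send the operation's current state.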
                if context.is_cancelled():
                    return

                yield (operation := self.scheduler.load_operation(operation_name))
                if operation.done:
                    return

    def get_storage_capabilities(self) -> CacheCapabilities:
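        """Return the capabilities of the storage (CAS) backing this instance."""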

        return self.scheduler.storage.get_capabilities()

    def get_action_cache_capabilities(self) -> Tuple[Optional[Any], Optional[ActionCacheUpdateCapabilities]]:
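        """Return the action cache's hash type and update capabilities, or
        ``(None, None)`` if no action cache is configured."""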

        hash_type = None
        capabilities = None
        if self.scheduler.action_cache is not None:
            capabilities = ActionCacheUpdateCapabilities()
            hash_type = self.scheduler.action_cache.hash_type()
            capabilities.update_enabled = self.scheduler.action_cache.allow_updates
        return hash_type, capabilities

    def _meter_execution(self, client_identity: Optional[ClientIdentityEntry], operation_name: str) -> None:
        """Meter the number of executions made by this client."""
        if self.scheduler.metering_client is None or client_identity is None:
            return
        # Build the identity and usage records outside the try block so the log
        # message below can always reference them.
        identity = Identity(
            instance=client_identity.instance,
            workflow=client_identity.workflow,
            actor=client_identity.actor,
            subject=client_identity.subject,
        )
        usage = Usage(rpc=RPCUsage(execute=1))
        try:
            self.scheduler.metering_client.put_usage(identity, operation_name, usage)
        except Exception as exc:
            LOGGER.exception(f"Failed to publish execution usage, {identity=} {usage=}", exc_info=exc)

    def _should_throttle_execution(self, priority: int, client_identity: Optional[ClientIdentityEntry]) -> bool:
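        """Return True if the metering service reports this client as throttled.

        Requests with priority at or above EXECUTION_DEPRIORITIZATION_LIMIT are
        never throttled, and metering errors fail open (no throttling).
        """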

        if (
            priority >= EXECUTION_DEPRIORITIZATION_LIMIT
            or self.scheduler.metering_client is None
            or client_identity is None
        ):
            return False
        try:
            identity = Identity(
                instance=client_identity.instance,
                workflow=client_identity.workflow,
                actor=client_identity.actor,
                subject=client_identity.subject,
            )
            response = self.scheduler.metering_client.get_throttling(identity)
            if response.throttled:
                LOGGER.info(
                    "Execution request is throttled, client_id: [%s], usage: [%s]",
                    client_identity,
                    response.tracked_usage,
                )
            return response.throttled
        except Exception as exc:
            LOGGER.exception("Failed to get throttling information", exc_info=exc)
            return False