Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/persistence/sql/impl.py: 93.39%

787 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-06-11 15:37 +0000

1# Copyright (C) 2019 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16import logging 

17import threading 

18import uuid 

19from contextlib import ExitStack 

20from datetime import datetime, timedelta 

21from time import time 

22from typing import Any, Dict, Iterable, List, NamedTuple, Optional, Set, Tuple, TypedDict, TypeVar, Union 

23 

24from buildgrid_metering.client import SyncMeteringServiceClient 

25from buildgrid_metering.models.dataclasses import ComputingUsage, Identity, Usage 

26from google.protobuf.any_pb2 import Any as ProtoAny 

27from google.protobuf.internal.containers import RepeatedCompositeFieldContainer 

28from grpc import Channel 

29from sqlalchemy import and_, delete, func, literal_column, or_, select, union, update 

30from sqlalchemy.exc import IntegrityError 

31from sqlalchemy.orm import Session, joinedload 

32from sqlalchemy.sql import ClauseElement 

33from sqlalchemy.sql.expression import Select 

34 

35from buildgrid._enums import BotStatus, LeaseState, MetricCategories, OperationStage 

36from buildgrid._exceptions import ( 

37 BotSessionClosedError, 

38 BotSessionMismatchError, 

39 CancelledError, 

40 DatabaseError, 

41 InvalidArgumentError, 

42 NotFoundError, 

43 UpdateNotAllowedError, 

44) 

45from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 

46from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ( 

47 Action, 

48 ActionResult, 

49 Command, 

50 Digest, 

51 ExecutedActionMetadata, 

52 ExecuteOperationMetadata, 

53 ExecuteResponse, 

54 RequestMetadata, 

55) 

56from buildgrid._protos.build.buildbox.execution_stats_pb2 import ExecutionStatistics 

57from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import Lease 

58from buildgrid._protos.google.longrunning import operations_pb2 

59from buildgrid._protos.google.longrunning.operations_pb2 import Operation 

60from buildgrid._protos.google.rpc import code_pb2, status_pb2 

61from buildgrid._protos.google.rpc.status_pb2 import Status 

62from buildgrid.client.asset import AssetClient 

63from buildgrid.client.logstream import logstream_client 

64from buildgrid.server.actioncache.caches.action_cache_abc import ActionCacheABC 

65from buildgrid.server.cas.storage.storage_abc import StorageABC 

66from buildgrid.server.metrics_names import ( 

67 BOTS_ASSIGN_JOB_LEASES_TIME_METRIC_NAME, 

68 DATA_STORE_CREATE_JOB_TIME_METRIC_NAME, 

69 DATA_STORE_CREATE_OPERATION_TIME_METRIC_NAME, 

70 DATA_STORE_LIST_OPERATIONS_TIME_METRIC_NAME, 

71 DATA_STORE_PRUNER_DELETE_TIME_METRIC_NAME, 

72 DATA_STORE_PRUNER_NUM_ROWS_DELETED_METRIC_NAME, 

73 DATA_STORE_QUEUE_TIMEOUT_NUM_METRIC_NAME, 

74 DATA_STORE_QUEUE_TIMEOUT_TIME_METRIC_NAME, 

75 EXECUTION_TIME_METRIC_NAME, 

76 INPUTS_FETCHING_TIME_METRIC_NAME, 

77 OUTPUTS_UPLOADING_TIME_METRIC_NAME, 

78 QUEUED_TIME_METRIC_NAME, 

79 SCHEDULER_CANCEL_OPERATION_TIME_METRIC_NAME, 

80 SCHEDULER_UPDATE_LEASE_TIME_METRIC_NAME, 

81 TOTAL_HANDLING_TIME_METRIC_NAME, 

82 WORKER_HANDLING_TIME_METRIC_NAME, 

83) 

84from buildgrid.server.metrics_utils import Counter, DurationMetric, publish_timer_metric 

85from buildgrid.server.operations.filtering import DEFAULT_SORT_KEYS, OperationFilter, SortKey 

86from buildgrid.server.persistence.sql.models import ( 

87 BotEntry, 

88 ClientIdentityEntry, 

89 JobEntry, 

90 LeaseEntry, 

91 OperationEntry, 

92 PlatformEntry, 

93 digest_to_string, 

94 job_platform_association, 

95 string_to_digest, 

96) 

97from buildgrid.server.persistence.sql.notifier import OperationsNotifier 

98from buildgrid.server.persistence.sql.utils import ( 

99 build_custom_filters, 

100 build_page_filter, 

101 build_page_token, 

102 build_sort_column_list, 

103 extract_sort_keys, 

104) 

105from buildgrid.server.sql.provider import SqlProvider 

106from buildgrid.server.threading import ContextWorker 

107from buildgrid.settings import ( 

108 DEFAULT_MAX_EXECUTION_TIMEOUT, 

109 DEFAULT_PLATFORM_PROPERTY_KEYS, 

110 SQL_SCHEDULER_METRICS_PUBLISH_INTERVAL_SECONDS, 

111) 

112from buildgrid.utils import BrowserURL, convert_values_to_sorted_lists, create_digest, hash_from_dict 

113 

114LOGGER = logging.getLogger(__name__) 

115 

116 

117PROTOBUF_MEDIA_TYPE = "application/x-protobuf" 

118DIGEST_URI_TEMPLATE = "nih:sha-256;{digest_hash}" 

119 

120 

121class DataStoreMetrics(TypedDict, total=False): 

122 leases: Dict[LeaseState, int] 

123 jobs: Dict[OperationStage, int] 

124 

125 

126class AgedJobHandlerOptions(NamedTuple): 

127 job_max_age: timedelta = timedelta(days=30) 

128 handling_period: timedelta = timedelta(minutes=5) 

129 max_handling_window: int = 10000 

130 

131 @staticmethod 

132 def from_config( 

133 job_max_age_cfg: Dict[str, float], 

134 handling_period_cfg: Optional[Dict[str, float]] = None, 

135 max_handling_window_cfg: Optional[int] = None, 

136 ) -> "AgedJobHandlerOptions": 

137 """Helper method for creating ``AgedJobHandlerOptions`` objects 

138 If input configs are None, assign defaults""" 

139 

140 def _dict_to_timedelta(config: Dict[str, float]) -> timedelta: 

141 return timedelta( 

142 weeks=config.get("weeks", 0), 

143 days=config.get("days", 0), 

144 hours=config.get("hours", 0), 

145 minutes=config.get("minutes", 0), 

146 seconds=config.get("seconds", 0), 

147 ) 

148 

149 return AgedJobHandlerOptions( 

150 job_max_age=_dict_to_timedelta(job_max_age_cfg) if job_max_age_cfg else timedelta(days=30), 

151 handling_period=_dict_to_timedelta(handling_period_cfg) if handling_period_cfg else timedelta(minutes=5), 

152 max_handling_window=max_handling_window_cfg if max_handling_window_cfg else 10000, 

153 ) 

154 

155 

156T = TypeVar("T", bound="SQLDataStore") 

157 

158 

159class SQLDataStore: 

160 RETRYABLE_STATUS_CODES = (code_pb2.INTERNAL, code_pb2.UNAVAILABLE) 

161 

162 def __init__( 

163 self, 

164 sql_provider: SqlProvider, 

165 storage: StorageABC, 

166 *, 

167 sql_ro_provider: Optional[SqlProvider] = None, 

168 sql_notifier_provider: Optional[SqlProvider] = None, 

169 # Pulled out of the scheduler/execution/bots instances 

170 property_keys: Optional[Set[str]] = None, 

171 match_properties: Optional[Set[str]] = None, 

172 action_cache: Optional[ActionCacheABC] = None, 

173 action_browser_url: Optional[str] = None, 

174 max_execution_timeout: int = DEFAULT_MAX_EXECUTION_TIMEOUT, 

175 metering_client: Optional[SyncMeteringServiceClient] = None, 

176 bot_session_keepalive_timeout: int = 600, 

177 logstream_channel: Optional[Channel] = None, 

178 logstream_instance: Optional[str] = None, 

179 # Input/Output retention 

180 asset_client: Optional[AssetClient] = None, 

181 queued_action_retention_hours: Optional[float] = None, 

182 completed_action_retention_hours: Optional[float] = None, 

183 action_result_retention_hours: Optional[float] = None, 

184 # Job Settings 

185 enable_job_watcher: bool = False, 

186 poll_interval: float = 1, 

187 pruning_options: Optional[AgedJobHandlerOptions] = None, 

188 queue_timeout_options: Optional[AgedJobHandlerOptions] = None, 

189 max_job_attempts: int = 5, 

190 ) -> None: 

191 self._instance_name: str = None # type: ignore # This should be set during initialization 

192 

193 self._stack = ExitStack() 

194 

195 self.storage = storage 

196 

197 self.poll_interval = poll_interval 

198 self.pruning_options = pruning_options 

199 self.queue_timeout_options = queue_timeout_options 

200 self.max_job_attempts = max_job_attempts 

201 

202 self._sql = sql_provider 

203 self._sql_ro = sql_ro_provider or sql_provider 

204 self._sql_notifier = sql_notifier_provider or sql_provider 

205 

206 self.property_keys = property_keys or DEFAULT_PLATFORM_PROPERTY_KEYS 

207 self.match_properties = match_properties or DEFAULT_PLATFORM_PROPERTY_KEYS 

208 self.unique_keys = {"OSFamily"} # Those keys only allow one value to be set 

209 

210 self.action_cache = action_cache 

211 self.action_browser_url = action_browser_url 

212 self.max_execution_timeout = max_execution_timeout 

213 self.enable_job_watcher = enable_job_watcher 

214 self.metering_client = metering_client 

215 self.bot_session_keepalive_timeout = bot_session_keepalive_timeout 

216 self.logstream_channel = logstream_channel 

217 self.logstream_instance = logstream_instance 

218 self.asset_client = asset_client 

219 self.queued_action_retention_hours = queued_action_retention_hours 

220 self.completed_action_retention_hours = completed_action_retention_hours 

221 self.action_result_retention_hours = action_result_retention_hours 

222 

223 # Overall Scheduler Metrics (totals of jobs/leases in each state) 

224 # Publish those metrics a bit more sparsely since the SQL requests 

225 # required to gather them can become expensive 

226 self._last_scheduler_metrics_publish_time: Optional[datetime] = None 

227 self._scheduler_metrics_publish_interval = timedelta(seconds=SQL_SCHEDULER_METRICS_PUBLISH_INTERVAL_SECONDS) 

228 

229 self.ops_notifier = OperationsNotifier(self._sql_notifier, self.poll_interval) 

230 self.prune_timer = ContextWorker(name="JobPruner", target=self._do_prune) 

231 self.queue_timer = ContextWorker(name="QueueTimeout", target=self._do_queue_timeout) 

232 self.execution_timer = ContextWorker(name="ExecutionTimeout", target=self._do_execution_timeout) 

233 

234 def __repr__(self) -> str: 

235 return f"SQL data store interface for `{repr(self._sql._engine.url)}`" 

236 

237 def __enter__(self: T) -> T: 

238 self.start() 

239 return self 

240 

241 def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: 

242 self.stop() 

243 

244 def start( 

245 self, 

246 *, 

247 start_job_watcher: bool = True, 

248 ) -> None: 

249 self._stack.enter_context(self.storage) 

250 if self.action_cache: 

251 self._stack.enter_context(self.action_cache) 

252 

253 if self.logstream_channel: 

254 self._stack.enter_context(self.logstream_channel) 

255 if self.asset_client: 

256 self._stack.enter_context(self.asset_client) 

257 # Pruning configuration parameters 

258 if self.pruning_options is not None: 

259 LOGGER.info(f"Scheduler pruning enabled: {self.pruning_options}") 

260 self._stack.enter_context(self.prune_timer) 

261 else: 

262 LOGGER.info("Scheduler pruning not enabled") 

263 

264 # Queue timeout thread 

265 if self.queue_timeout_options is not None: 

266 LOGGER.info(f"Job queue timeout enabled: {self.queue_timeout_options}") 

267 self._stack.enter_context(self.queue_timer) 

268 else: 

269 LOGGER.info("Job queue timeout not enabled") 

270 

271 if start_job_watcher: 

272 self._stack.enter_context(self.execution_timer) 

273 self._stack.enter_context(self.ops_notifier) 

274 

275 def stop(self) -> None: 

276 self._stack.close() 

277 LOGGER.info("Stopped SQLDataStore") 

278 

279 def set_instance_name(self, instance_name: str) -> None: 

280 self._instance_name = instance_name 

281 

282 def _job_in_instance(self) -> ClauseElement: 

283 return or_(JobEntry.instance_name == self._instance_name, JobEntry.instance_name.is_(None)) 

284 

285 def _bot_in_instance(self) -> ClauseElement: 

286 return or_(BotEntry.instance_name == self._instance_name, BotEntry.instance_name.is_(None)) 

287 

288 def queue_job_action( 

289 self, 

290 *, 

291 action: Action, 

292 action_digest: Digest, 

293 command: Command, 

294 platform_requirements: Dict[str, Set[str]], 

295 priority: int, 

296 skip_cache_lookup: bool, 

297 request_metadata: Optional[RequestMetadata] = None, 

298 client_identity: Optional[ClientIdentityEntry] = None, 

299 ) -> str: 

300 """ 

301 De-duplicates or inserts a newly created job into the execution queue. 

302 Returns an operation name associated with this job. 

303 """ 

304 

305 if not action.do_not_cache: 

306 if operation_name := self.create_operation_for_existing_job( 

307 action_digest=action_digest, 

308 priority=priority, 

309 request_metadata=request_metadata, 

310 client_identity=client_identity, 

311 ): 

312 return operation_name 

313 

314 # If there was another job already in the action cache, we can check now. 

315 # We can use this entry to create a job and create it already completed! 

316 execute_response: Optional[ExecuteResponse] = None 

317 if self.action_cache and not action.do_not_cache and not skip_cache_lookup: 

318 try: 

319 action_result = self.action_cache.get_action_result(action_digest) 

320 LOGGER.info(f"Job cache hit for action [{action_digest.hash}/{action_digest.size_bytes}]") 

321 execute_response = ExecuteResponse() 

322 execute_response.result.CopyFrom(action_result) 

323 execute_response.cached_result = True 

324 except NotFoundError: 

325 pass 

326 except Exception: 

327 LOGGER.exception( 

328 f"Checking ActionCache for action [{action_digest.hash}/{action_digest.size_bytes}] failed." 

329 ) 

330 

331 # Extend retention for action 

332 self._update_action_retention(action, action_digest, self.queued_action_retention_hours) 

333 

334 return self.create_operation_for_new_job( 

335 action=action, 

336 action_digest=action_digest, 

337 command=command, 

338 execute_response=execute_response, 

339 platform_requirements=platform_requirements, 

340 priority=priority, 

341 request_metadata=request_metadata, 

342 client_identity=client_identity, 

343 ) 

344 

345 def create_operation_for_existing_job( 

346 self, 

347 *, 

348 action_digest: Digest, 

349 priority: int, 

350 request_metadata: Optional[RequestMetadata], 

351 client_identity: Optional[ClientIdentityEntry], 

352 ) -> Optional[str]: 

353 # Find a job with a matching action that isn't completed or cancelled and that can be cached. 

354 find_existing_stmt = ( 

355 select(JobEntry) 

356 .where( 

357 JobEntry.action_digest == digest_to_string(action_digest), 

358 JobEntry.stage != OperationStage.COMPLETED.value, 

359 JobEntry.cancelled != True, # noqa: E712 

360 JobEntry.do_not_cache != True, # noqa: E712 

361 self._job_in_instance(), 

362 ) 

363 .with_for_update() 

364 ) 

365 

366 with self._sql.session(exceptions_to_not_raise_on=[Exception]) as session: 

367 if not (job := session.execute(find_existing_stmt).scalars().first()): 

368 return None 

369 

370 # Reschedule if priority is now greater, and we're still waiting on it to start. 

371 if priority < job.priority and job.stage == OperationStage.QUEUED.value: 

372 LOGGER.info(f"Job {job.name} assigned a new priority {priority}") 

373 job.priority = priority 

374 job.assigned = False 

375 

376 return self._create_operation( 

377 session, 

378 job_name=job.name, 

379 request_metadata=request_metadata, 

380 client_identity=client_identity, 

381 ) 

382 

383 @DurationMetric(DATA_STORE_CREATE_JOB_TIME_METRIC_NAME, instanced=True) 

384 def create_operation_for_new_job( 

385 self, 

386 *, 

387 action: Action, 

388 action_digest: Digest, 

389 command: Command, 

390 execute_response: Optional[ExecuteResponse], 

391 platform_requirements: Dict[str, Set[str]], 

392 priority: int, 

393 request_metadata: Optional[RequestMetadata] = None, 

394 client_identity: Optional[ClientIdentityEntry] = None, 

395 ) -> str: 

396 with self._sql.session(exceptions_to_not_raise_on=[Exception]) as session: 

397 job = JobEntry( 

398 instance_name=self._instance_name, 

399 name=str(uuid.uuid4()), 

400 action=action.SerializeToString(), 

401 action_digest=digest_to_string(action_digest), 

402 do_not_cache=action.do_not_cache, 

403 priority=priority, 

404 stage=OperationStage.QUEUED.value, 

405 queued_timestamp=datetime.utcnow(), 

406 command=" ".join(command.arguments), 

407 platform_requirements=hash_from_dict(convert_values_to_sorted_lists(platform_requirements)), 

408 platform=self._populate_platform_requirements(session, platform_requirements), 

409 n_tries=1, 

410 ) 

411 if execute_response: 

412 job.stage = OperationStage.COMPLETED.value 

413 job.result = digest_to_string(self.storage.put_message(execute_response)) 

414 job.status_code = execute_response.status.code 

415 job.worker_completed_timestamp = datetime.utcnow() 

416 

417 session.add(job) 

418 

419 return self._create_operation( 

420 session, 

421 job_name=job.name, 

422 request_metadata=request_metadata, 

423 client_identity=client_identity, 

424 ) 

425 

426 def _populate_platform_requirements( 

427 self, session: Session, platform_requirements: Dict[str, Set[str]] 

428 ) -> List[PlatformEntry]: 

429 required_entries = {(k, v) for k, values in platform_requirements.items() for v in values} 

430 conditions = [and_(PlatformEntry.key == k, PlatformEntry.value == v) for k, v in required_entries] 

431 statement = select(PlatformEntry.key, PlatformEntry.value).where(or_(*conditions)) 

432 

433 while missing := required_entries - {(k, v) for [k, v] in session.execute(statement).all()}: 

434 try: 

435 session.bulk_insert_mappings(PlatformEntry, [{"key": k, "value": v} for k, v in missing]) 

436 session.commit() 

437 except IntegrityError: 

438 session.rollback() 

439 

440 return list(session.execute(select(PlatformEntry).where(or_(*conditions))).scalars()) 

441 

442 def create_operation( 

443 self, 

444 job_name: str, 

445 *, 

446 request_metadata: Optional[RequestMetadata] = None, 

447 client_identity: Optional[ClientIdentityEntry] = None, 

448 ) -> str: 

449 with self._sql.session(exceptions_to_not_raise_on=[Exception]) as session: 

450 if not (job := self._get_job(job_name, session, with_for_update=True)): 

451 raise NotFoundError(f"Job name does not exist: [{job_name}]") 

452 

453 if job.cancelled: 

454 raise CancelledError(f"Job {job_name} is cancelled") 

455 

456 return self._create_operation( 

457 session, job_name=job_name, request_metadata=request_metadata, client_identity=client_identity 

458 ) 

459 

460 @DurationMetric(DATA_STORE_CREATE_OPERATION_TIME_METRIC_NAME, instanced=True) 

461 def _create_operation( 

462 self, 

463 session: Session, 

464 *, 

465 job_name: str, 

466 request_metadata: Optional[RequestMetadata], 

467 client_identity: Optional[ClientIdentityEntry], 

468 ) -> str: 

469 request_metadata = request_metadata or RequestMetadata() 

470 

471 client_identity_id: Optional[int] = None 

472 if client_identity: 

473 client_identity_id = self.get_or_create_client_identity_in_store(session, client_identity).id 

474 

475 operation = OperationEntry( 

476 name=str(uuid.uuid4()), 

477 job_name=job_name, 

478 client_identity_id=client_identity_id, 

479 invocation_id=request_metadata.tool_invocation_id or None, 

480 correlated_invocations_id=request_metadata.correlated_invocations_id or None, 

481 tool_name=request_metadata.tool_details.tool_name or None, 

482 tool_version=request_metadata.tool_details.tool_version or None, 

483 ) 

484 session.add(operation) 

485 return operation.name 

486 

487 def load_operation(self, operation_name: str) -> Operation: 

488 statement = ( 

489 select(OperationEntry).join(JobEntry).where(OperationEntry.name == operation_name, self._job_in_instance()) 

490 ) 

491 with self._sql_ro.session(exceptions_to_not_raise_on=[Exception]) as session: 

492 if op := session.execute(statement).scalars().first(): 

493 return self._load_operation(op) 

494 

495 raise NotFoundError(f"Operation name does not exist: [{operation_name}]") 

496 

497 def _load_operation(self, op: OperationEntry) -> Operation: 

498 job: JobEntry = op.job 

499 

500 operation = operations_pb2.Operation( 

501 name=op.name, 

502 done=job.stage == OperationStage.COMPLETED.value or op.cancelled or job.cancelled, 

503 ) 

504 metadata = ExecuteOperationMetadata( 

505 stage=OperationStage.COMPLETED.value if operation.done else job.stage, # type: ignore[arg-type] 

506 action_digest=string_to_digest(job.action_digest), 

507 stderr_stream_name=job.stderr_stream_name or "", 

508 stdout_stream_name=job.stdout_stream_name or "", 

509 ) 

510 operation.metadata.Pack(metadata) 

511 

512 if job.cancelled or op.cancelled: 

513 operation.error.CopyFrom(status_pb2.Status(code=code_pb2.CANCELLED)) 

514 elif job.status_code is not None and job.status_code != code_pb2.OK: 

515 operation.error.CopyFrom(status_pb2.Status(code=job.status_code)) 

516 

517 execute_response: Optional[ExecuteResponse] = None 

518 if job.result: 

519 result_digest = string_to_digest(job.result) 

520 execute_response = self.storage.get_message(result_digest, ExecuteResponse) 

521 if not execute_response: 

522 operation.error.CopyFrom(status_pb2.Status(code=code_pb2.DATA_LOSS)) 

523 elif job.cancelled: 

524 execute_response = ExecuteResponse( 

525 status=status_pb2.Status(code=code_pb2.CANCELLED, message="Execution cancelled") 

526 ) 

527 if execute_response: 

528 if self.action_browser_url: 

529 url = BrowserURL(self.action_browser_url, self._instance_name) 

530 if url.for_message("action", string_to_digest(job.action_digest)): 

531 execute_response.message = str(url.generate()) 

532 operation.response.Pack(execute_response) 

533 

534 return operation 

535 

536 def _get_job(self, job_name: str, session: Session, with_for_update: bool = False) -> Optional[JobEntry]: 

537 statement = select(JobEntry).where(JobEntry.name == job_name, self._job_in_instance()) 

538 if with_for_update: 

539 statement = statement.with_for_update() 

540 

541 job: Optional[JobEntry] = session.execute(statement).scalars().first() 

542 if job: 

543 LOGGER.debug( 

544 f"Loaded job from db: name=[{job_name}], stage=[{job.stage}], " 

545 f"result=[{job.result}], instance=[{job.instance_name}]" 

546 ) 

547 

548 return job 

549 

550 def get_operation_job_name(self, operation_name: str) -> Optional[str]: 

551 with self._sql_ro.session(exceptions_to_not_raise_on=[Exception]) as session: 

552 if operation := self._get_operation(operation_name, session): 

553 return operation.job_name 

554 return None 

555 

556 def get_operation_request_metadata_by_name(self, operation_name: str) -> Optional[Dict[str, Any]]: 

557 with self._sql_ro.session(exceptions_to_not_raise_on=[Exception]) as session: 

558 operation = self._get_operation(operation_name, session) 

559 if not operation: 

560 return None 

561 

562 return { 

563 "tool-name": operation.tool_name or "", 

564 "tool-version": operation.tool_version or "", 

565 "invocation-id": operation.invocation_id or "", 

566 "correlated-invocations-id": operation.correlated_invocations_id or "", 

567 } 

568 

569 def _notify_job_updated(self, job_names: Union[str, List[str]], session: Session) -> None: 

570 if self._sql.dialect == "postgresql": 

571 if isinstance(job_names, str): 

572 job_names = [job_names] 

573 for job_name in job_names: 

574 # Mypy bug? "execute" of "_SessionTypingCommon" has incompatible type "str"; expected "Executable 

575 session.execute(f"NOTIFY job_updated, '{job_name}';") # type: ignore[arg-type] 

576 

577 def _get_operation(self, operation_name: str, session: Session) -> Optional[OperationEntry]: 

578 statement = ( 

579 select(OperationEntry).join(JobEntry).where(OperationEntry.name == operation_name, self._job_in_instance()) 

580 ) 

581 return session.execute(statement).scalars().first() 

582 

583 def _batch_timeout_jobs(self, job_select_stmt: Select, status_code: int, message: str) -> int: 

584 """Timeout all jobs selected by a query""" 

585 with self._sql.session(sqlite_lock_immediately=True, exceptions_to_not_raise_on=[Exception]) as session: 

586 # Get the full list of jobs to timeout 

587 jobs = [job.name for job in session.execute(job_select_stmt).scalars().all()] 

588 

589 if jobs: 

590 # Put response binary 

591 response = remote_execution_pb2.ExecuteResponse( 

592 status=status_pb2.Status(code=status_code, message=message) 

593 ) 

594 response_binary = response.SerializeToString() 

595 response_digest = create_digest(response_binary) 

596 self.storage.bulk_update_blobs([(response_digest, response_binary)]) 

597 

598 # Update response 

599 stmt_timeout_jobs = ( 

600 update(JobEntry) 

601 .where(JobEntry.name.in_(jobs), self._job_in_instance()) 

602 .values( 

603 stage=OperationStage.COMPLETED.value, 

604 status_code=status_code, 

605 result=digest_to_string(response_digest), 

606 ) 

607 ) 

608 session.execute(stmt_timeout_jobs) 

609 

610 # Notify all jobs updated 

611 self._notify_job_updated(jobs, session) 

612 return len(jobs) 

613 

614 def _do_execution_timeout(self, shutdown_requested: threading.Event) -> None: 

615 """Periodically timeout aged executing jobs""" 

616 while not shutdown_requested.is_set(): 

617 try: 

618 self.cancel_jobs_exceeding_execution_timeout(self.max_execution_timeout) 

619 except Exception as e: 

620 LOGGER.exception("Failed to timeout aged executing jobs", exc_info=e) 

621 shutdown_requested.wait(timeout=self.poll_interval) 

622 

623 def cancel_jobs_exceeding_execution_timeout(self, max_execution_timeout: Optional[int] = None) -> None: 

624 if not max_execution_timeout: 

625 return 

626 

627 # Get the full list of jobs exceeding execution timeout 

628 stale_jobs_statement = ( 

629 select(JobEntry) 

630 .where( 

631 JobEntry.stage == OperationStage.EXECUTING.value, 

632 JobEntry.worker_start_timestamp <= datetime.utcnow() - timedelta(seconds=max_execution_timeout), 

633 self._job_in_instance(), 

634 ) 

635 .with_for_update(skip_locked=True) 

636 ) 

637 with self._sql.session(sqlite_lock_immediately=True, exceptions_to_not_raise_on=[Exception]) as session: 

638 jobs = session.execute(stale_jobs_statement).scalars().all() 

639 if not jobs: 

640 return 

641 

642 response = remote_execution_pb2.ExecuteResponse( 

643 status=status_pb2.Status( 

644 code=code_pb2.DEADLINE_EXCEEDED, 

645 message="Execution didn't finish within timeout threshold", 

646 ) 

647 ) 

648 response_binary = response.SerializeToString() 

649 response_digest = create_digest(response_binary) 

650 self.storage.bulk_update_blobs([(response_digest, response_binary)]) 

651 

652 for job in jobs: 

653 executing_duration = datetime.utcnow() - (job.worker_start_timestamp or datetime.utcnow()) 

654 LOGGER.warning( 

655 f"Job=[{job.name}] has been executing for " 

656 f"executing_duration=[{executing_duration}]. " 

657 f"max_execution_timeout=[{max_execution_timeout}] " 

658 "Cancelling." 

659 ) 

660 for op in job.operations: 

661 op.cancelled = True 

662 for lease in job.active_leases: 

663 lease.state = LeaseState.CANCELLED.value 

664 job.worker_completed_timestamp = datetime.utcnow() 

665 job.stage = OperationStage.COMPLETED.value 

666 job.cancelled = True 

667 job.result = digest_to_string(response_digest) 

668 

669 for job in jobs: 

670 self._notify_job_updated(job.name, session) 

671 

672 @DurationMetric(SCHEDULER_CANCEL_OPERATION_TIME_METRIC_NAME, instanced=True) 

673 def cancel_operation(self, operation_name: str) -> None: 

674 statement = ( 

675 select(JobEntry) 

676 .join(OperationEntry) 

677 .where(OperationEntry.name == operation_name, self._job_in_instance()) 

678 .with_for_update() 

679 ) 

680 with self._sql.session() as session: 

681 if not (job := session.execute(statement).scalars().first()): 

682 raise NotFoundError(f"Operation name does not exist: [{operation_name}]") 

683 

684 if job.stage == OperationStage.COMPLETED.value or job.cancelled: 

685 return 

686 

687 for op in job.operations: 

688 if op.name == operation_name: 

689 if op.cancelled: 

690 return 

691 op.cancelled = True 

692 

693 if all(op.cancelled for op in job.operations): 

694 for lease in job.active_leases: 

695 lease.state = LeaseState.CANCELLED.value 

696 job.worker_completed_timestamp = datetime.utcnow() 

697 job.stage = OperationStage.COMPLETED.value 

698 job.cancelled = True 

699 

700 self._notify_job_updated(job.name, session) 

701 

702 @DurationMetric(DATA_STORE_LIST_OPERATIONS_TIME_METRIC_NAME, instanced=True) 

703 def list_operations( 

704 self, 

705 operation_filters: Optional[List[OperationFilter]] = None, 

706 page_size: Optional[int] = None, 

707 page_token: Optional[str] = None, 

708 ) -> Tuple[List[operations_pb2.Operation], str]: 

709 # Build filters and sort order 

710 sort_keys = DEFAULT_SORT_KEYS 

711 custom_filters = None 

712 platform_filters = [] 

713 if operation_filters: 

714 # Extract custom sort order (if present) 

715 specified_sort_keys, non_sort_filters = extract_sort_keys(operation_filters) 

716 

717 # Only override sort_keys if there were sort keys actually present in the filter string 

718 if specified_sort_keys: 

719 sort_keys = specified_sort_keys 

720 # Attach the operation name as a sort key for a deterministic order 

721 # This will ensure that the ordering of results is consistent between queries 

722 if not any(sort_key.name == "name" for sort_key in sort_keys): 

723 sort_keys.append(SortKey(name="name", descending=False)) 

724 

725 # Finally, compile the non-sort filters into a filter list 

726 custom_filters = build_custom_filters(non_sort_filters) 

727 platform_filters = [f for f in non_sort_filters if f.parameter == "platform"] 

728 

729 sort_columns = build_sort_column_list(sort_keys) 

730 

731 with self._sql_ro.session(exceptions_to_not_raise_on=[Exception]) as session: 

732 statement = select(OperationEntry).join(JobEntry, OperationEntry.job_name == JobEntry.name) 

733 statement = statement.filter(self._job_in_instance()) 

734 

735 # If we're filtering by platform, filter using a subquery containing job names 

736 # which match the specified platform properties. 

737 # 

738 # NOTE: A platform filter using `!=` will return only jobs which set that platform 

739 # property to an explicitly different value; jobs which don't set the property are 

740 # filtered out. 

741 if platform_filters: 

742 platform_clauses = [] 

743 for platform_filter in platform_filters: 

744 key, value = platform_filter.value.split(":", 1) 

745 platform_clauses.append( 

746 and_(PlatformEntry.key == key, platform_filter.operator(PlatformEntry.value, value)) 

747 ) 

748 

749 job_name_subquery = ( 

750 select(job_platform_association.c.job_name) 

751 .filter( 

752 job_platform_association.c.platform_id.in_( 

753 select(PlatformEntry.id).filter(or_(*platform_clauses)) 

754 ) 

755 ) 

756 .group_by(job_platform_association.c.job_name) 

757 .having(func.count() == len(platform_filters)) 

758 ) 

759 statement = statement.filter(JobEntry.name.in_(job_name_subquery)) 

760 

761 # Apply custom filters (if present) 

762 if custom_filters: 

763 statement = statement.filter(*custom_filters) 

764 

765 # Apply sort order 

766 statement = statement.order_by(*sort_columns) 

767 

768 # Apply pagination filter 

769 if page_token: 

770 page_filter = build_page_filter(page_token, sort_keys) 

771 statement = statement.filter(page_filter) 

772 if page_size: 

773 # We limit the number of operations we fetch to the page_size. However, we 

774 # fetch an extra operation to determine whether we need to provide a 

775 # next_page_token. 

776 statement = statement.limit(page_size + 1) 

777 

778 operations = session.execute(statement).scalars().all() 

779 

780 if not page_size or not operations: 

781 next_page_token = "" 

782 

783 # If the number of results we got is less than or equal to our page_size, 

784 # we're done with the operations listing and don't need to provide another 

785 # page token 

786 elif len(operations) <= page_size: 

787 next_page_token = "" 

788 else: 

789 # Drop the last operation since we have an extra 

790 operations.pop() 

791 # Our page token will be the last row of our set 

792 next_page_token = build_page_token(operations[-1], sort_keys) 

793 return [self._load_operation(operation) for operation in operations], next_page_token 

794 

795 def get_metrics(self) -> Optional[DataStoreMetrics]: 

796 # Skip publishing overall scheduler metrics if we have recently published them 

797 last_publish_time = self._last_scheduler_metrics_publish_time 

798 time_since_publish = None 

799 if last_publish_time: 

800 time_since_publish = datetime.utcnow() - last_publish_time 

801 if time_since_publish and time_since_publish < self._scheduler_metrics_publish_interval: 

802 # Published too recently, skip 

803 return None 

804 

805 def _get_query_leases_by_state(category: str) -> Select: 

806 # Using func.count here to avoid generating a subquery in the WHERE 

807 # clause of the resulting query. 

808 # https://docs.sqlalchemy.org/en/13/orm/query.html#sqlalchemy.orm.query.Query.count 

809 return select( 

810 [ 

811 literal_column(f"'{category}'").label("category"), 

812 LeaseEntry.state.label("bucket"), 

813 func.count(LeaseEntry.id).label("value"), 

814 ] 

815 ).group_by(LeaseEntry.state) 

816 

817 def _cb_query_leases_by_state(leases_by_state: Dict[Any, Any]) -> Dict[Any, Any]: 

818 # The database only returns counts > 0, so fill in the gaps 

819 for state in LeaseState: 

820 if state.value not in leases_by_state: 

821 leases_by_state[state.value] = 0 

822 return leases_by_state 

823 

824 def _get_query_jobs_by_stage(category: str) -> Select: 

825 # Using func.count here to avoid generating a subquery in the WHERE 

826 # clause of the resulting query. 

827 # https://docs.sqlalchemy.org/en/13/orm/query.html#sqlalchemy.orm.query.Query.count 

828 return select( 

829 [ 

830 literal_column(f"'{category}'").label("category"), 

831 JobEntry.stage.label("bucket"), 

832 func.count(JobEntry.name).label("value"), 

833 ] 

834 ).group_by(JobEntry.stage) 

835 

836 def _cb_query_jobs_by_stage(jobs_by_stage: Dict[Any, Any]) -> Dict[Any, Any]: 

837 # The database only returns counts > 0, so fill in the gaps 

838 for stage in OperationStage: 

839 if stage.value not in jobs_by_stage: 

840 jobs_by_stage[stage.value] = 0 

841 return jobs_by_stage 

842 

843 metrics: DataStoreMetrics = {} 

844 # metrics to gather: (category_name, function_returning_query, callback_function) 

845 metrics_to_gather = [ 

846 (MetricCategories.LEASES.value, _get_query_leases_by_state, _cb_query_leases_by_state), 

847 (MetricCategories.JOBS.value, _get_query_jobs_by_stage, _cb_query_jobs_by_stage), 

848 ] 

849 

850 statements = [query_fn(category) for category, query_fn, _ in metrics_to_gather] 

851 metrics_statement = union(*statements) 

852 

853 try: 

854 with self._sql_ro.session(exceptions_to_not_raise_on=[Exception]) as session: 

855 results = session.execute(metrics_statement).all() 

856 

857 grouped_results: Dict[str, Any] = {category: {} for category, _, _ in results} 

858 for category, bucket, value in results: 

859 grouped_results[category][bucket] = value 

860 

861 for category, _, category_cb in metrics_to_gather: 

862 metrics[category] = category_cb( # type: ignore[literal-required] 

863 grouped_results.setdefault(category, {}) 

864 ) 

865 except DatabaseError: 

866 LOGGER.warning("Unable to gather metrics due to a Database Error.") 

867 return {} 

868 

869 # This is only updated within the metrics asyncio loop; no race conditions 

870 self._last_scheduler_metrics_publish_time = datetime.utcnow() 

871 

872 return metrics 

873 

874 def _queued_jobs_by_capability(self, capability_hash: str) -> Select: 

875 return ( 

876 select(JobEntry) 

877 .with_for_update(skip_locked=True) 

878 .where( 

879 JobEntry.assigned != True, # noqa: E712 

880 self._job_in_instance(), 

881 JobEntry.platform_requirements == capability_hash, 

882 JobEntry.stage == OperationStage.QUEUED.value, 

883 ) 

884 ) 

885 

886 def assign_n_leases_by_priority( 

887 self, 

888 *, 

889 capability_hash: str, 

890 bot_names: List[str], 

891 ) -> List[str]: 

892 job_statement = self._queued_jobs_by_capability(capability_hash).order_by( 

893 JobEntry.priority, JobEntry.queued_timestamp 

894 ) 

895 return self._assign_n_leases(job_statement=job_statement, bot_names=bot_names) 

896 

897 def assign_n_leases_by_age( 

898 self, 

899 *, 

900 capability_hash: str, 

901 bot_names: List[str], 

902 ) -> List[str]: 

903 job_statement = self._queued_jobs_by_capability(capability_hash).order_by(JobEntry.queued_timestamp) 

904 return self._assign_n_leases(job_statement=job_statement, bot_names=bot_names) 

905 

906 @DurationMetric(BOTS_ASSIGN_JOB_LEASES_TIME_METRIC_NAME, instanced=True) 

907 def _assign_n_leases(self, *, job_statement: Select, bot_names: List[str]) -> List[str]: 

908 bot_statement = ( 

909 select(BotEntry) 

910 .with_for_update(skip_locked=True) 

911 .where( 

912 BotEntry.lease_id.is_(None), 

913 self._bot_in_instance(), 

914 BotEntry.name.in_(bot_names), 

915 BotEntry.expiry_time > datetime.utcnow(), 

916 ) 

917 ) 

918 

919 try: 

920 with self._sql.session(sqlite_lock_immediately=True, exceptions_to_not_raise_on=[Exception]) as session: 

921 jobs = session.execute(job_statement.limit(len(bot_names))).scalars().all() 

922 bots = session.execute(bot_statement.limit(len(jobs))).scalars().all() 

923 

924 assigned_bot_names: List[str] = [] 

925 for job, bot in zip(jobs, bots): 

926 job.assigned = True 

927 job.queued_timestamp = job.queued_timestamp or datetime.utcnow() 

928 job.queued_time_duration = int((datetime.utcnow() - job.queued_timestamp).total_seconds()) 

929 job.worker_start_timestamp = datetime.utcnow() 

930 job.worker_completed_timestamp = None 

931 bot.lease_id = job.name 

932 bot.last_update_timestamp = datetime.utcnow() 

933 log_tags = f"job_name=[{job.name}] bot_id=[{bot.bot_id}] bot_name=[{bot.name}]" 

934 if job.active_leases: 

935 lease = job.active_leases[0] 

936 LOGGER.debug( 

937 f"Reassigned existing lease. {log_tags} prev_lease_state=[{lease.state}]" 

938 f" prev_lease_status=[{lease.status}] prev_bot_id=[{lease.worker_name}]" 

939 ) 

940 lease.state = LeaseState.PENDING.value 

941 lease.status = None 

942 lease.worker_name = bot.bot_id 

943 else: 

944 LOGGER.debug(f"Assigned new lease. {log_tags}") 

945 session.add( 

946 LeaseEntry( 

947 job_name=job.name, 

948 state=LeaseState.PENDING.value, 

949 status=None, 

950 worker_name=bot.bot_id, 

951 ) 

952 ) 

953 assigned_bot_names.append(bot.name) 

954 

955 return assigned_bot_names 

956 except DatabaseError: 

957 LOGGER.warning("Will not assign any leases this time due to a Database Error.") 

958 return [] 

959 

960 def _do_queue_timeout(self, shutdown_requested: threading.Event) -> None: 

961 """Periodically timeout aged queued jobs""" 

962 

963 if not (opts := self.queue_timeout_options): 

964 return 

965 

966 job_max_age = opts.job_max_age 

967 period = opts.handling_period 

968 limit = opts.max_handling_window 

969 

970 last_timeout_time = datetime.utcnow() 

971 while not shutdown_requested.is_set(): 

972 now = datetime.utcnow() 

973 if now - last_timeout_time < period: 

974 LOGGER.info(f"Job queue timeout thread sleeping for {period} seconds") 

975 shutdown_requested.wait(timeout=period.total_seconds()) 

976 continue 

977 

978 timeout_jobs_scheduled_before = now - job_max_age 

979 try: 

980 with DurationMetric( 

981 DATA_STORE_QUEUE_TIMEOUT_TIME_METRIC_NAME, instance_name=self._instance_name, instanced=True 

982 ): 

983 num_timeout = self._timeout_queued_jobs_scheduled_before(timeout_jobs_scheduled_before, limit) 

984 LOGGER.info(f"Timed-out {num_timeout} queued jobs scheduled before {timeout_jobs_scheduled_before}") 

985 if num_timeout > 0: 

986 with Counter( 

987 metric_name=DATA_STORE_QUEUE_TIMEOUT_NUM_METRIC_NAME, 

988 instance_name=self._instance_name, 

989 ) as num_dequeued_metric: 

990 num_dequeued_metric.increment(num_timeout) 

991 

992 except Exception as e: 

993 LOGGER.exception("Failed to timeout aged queued jobs", exc_info=e) 

994 finally: 

995 last_timeout_time = now 

996 

997 def _timeout_queued_jobs_scheduled_before(self, dt: datetime, limit: int) -> int: 

998 jobs_to_timeout_stmt = ( 

999 select(JobEntry) 

1000 .where(JobEntry.stage == OperationStage.QUEUED.value) 

1001 .where(JobEntry.queued_timestamp < dt) 

1002 .limit(limit) 

1003 ) 

1004 return self._batch_timeout_jobs( 

1005 jobs_to_timeout_stmt, code_pb2.UNAVAILABLE, "Operation has been queued for too long" 

1006 ) 

1007 

1008 def _do_prune(self, shutdown_requested: threading.Event) -> None: 

1009 """Running in a background thread, this method wakes up periodically and deletes older records 

1010 from the jobs tables using configurable parameters""" 

1011 

1012 if not (opts := self.pruning_options): 

1013 return 

1014 

1015 job_max_age = opts.job_max_age 

1016 pruning_period = opts.handling_period 

1017 limit = opts.max_handling_window 

1018 

1019 utc_last_prune_time = datetime.utcnow() 

1020 while not shutdown_requested.is_set(): 

1021 utcnow = datetime.utcnow() 

1022 if (utcnow - pruning_period) < utc_last_prune_time: 

1023 LOGGER.info(f"Pruner thread sleeping for {pruning_period}(until {utcnow + pruning_period})") 

1024 shutdown_requested.wait(timeout=pruning_period.total_seconds()) 

1025 continue 

1026 

1027 delete_before_datetime = utcnow - job_max_age 

1028 try: 

1029 with DurationMetric( 

1030 DATA_STORE_PRUNER_DELETE_TIME_METRIC_NAME, instance_name=self._instance_name, instanced=True 

1031 ): 

1032 num_rows = self._delete_jobs_prior_to(delete_before_datetime, limit) 

1033 

1034 LOGGER.info(f"Pruned {num_rows} row(s) from the jobs table older than {delete_before_datetime}") 

1035 

1036 if num_rows > 0: 

1037 with Counter( 

1038 metric_name=DATA_STORE_PRUNER_NUM_ROWS_DELETED_METRIC_NAME, instance_name=self._instance_name 

1039 ) as num_rows_deleted: 

1040 num_rows_deleted.increment(num_rows) 

1041 

1042 except Exception: 

1043 LOGGER.exception("Caught exception while deleting jobs records") 

1044 

1045 finally: 

1046 # Update even if error occurred to avoid potentially infinitely retrying 

1047 utc_last_prune_time = utcnow 

1048 

1049 LOGGER.info("Exiting pruner thread") 

1050 

1051 def _delete_jobs_prior_to(self, delete_before_datetime: datetime, limit: int) -> int: 

1052 """Deletes older records from the jobs tables constrained by `delete_before_datetime` and `limit`""" 

1053 delete_stmt = delete(JobEntry).where( 

1054 JobEntry.name.in_( 

1055 select(JobEntry.name) 

1056 .with_for_update(skip_locked=True) 

1057 .where( 

1058 self._job_in_instance(), 

1059 JobEntry.worker_completed_timestamp <= delete_before_datetime, 

1060 ) 

1061 .limit(limit) 

1062 ), 

1063 self._job_in_instance(), 

1064 ) 

1065 

1066 with self._sql.session() as session: 

1067 options = {"synchronize_session": "fetch"} 

1068 num_rows_deleted: int = session.execute(delete_stmt, execution_options=options).rowcount # type: ignore 

1069 

1070 return num_rows_deleted 

1071 

1072 def get_or_create_client_identity_in_store( 

1073 self, session: Session, client_id: ClientIdentityEntry 

1074 ) -> ClientIdentityEntry: 

1075 """Get the ClientIdentity in the storage or create one. 

1076 This helper function essentially makes sure the `client_id` is created during the transaction 

1077 

1078 Args: 

1079 session (Session): sqlalchemy Session 

1080 client_id (ClientIdentityEntry): identity of the client that creates an operation 

1081 

1082 Returns: 

1083 ClientIdentityEntry: identity of the client that creates an operation 

1084 """ 

1085 client_id_stored: Optional[ClientIdentityEntry] = ( 

1086 session.query(ClientIdentityEntry) 

1087 .where(ClientIdentityEntry.instance == client_id.instance) 

1088 .where(ClientIdentityEntry.workflow == client_id.workflow) 

1089 .where(ClientIdentityEntry.actor == client_id.actor) 

1090 .where(ClientIdentityEntry.subject == client_id.subject) 

1091 .one_or_none() 

1092 ) 

1093 if client_id_stored is not None: 

1094 return client_id_stored 

1095 

1096 session.add(client_id) 

1097 session.flush() 

1098 return client_id 

1099 

1100 def add_bot_entry(self, *, bot_session_id: str, bot_session_status: int) -> str: 

1101 with self._sql.session() as session: 

1102 # Check if bot_id is already known. If yes, all leases associated with 

1103 # it are requeued and the existing record deleted. A new record is then 

1104 # created with the new bot_id/name combination, as it would in the 

1105 # unknown case. 

1106 locate_bot_stmt = ( 

1107 select(BotEntry).where(BotEntry.bot_id == bot_session_id, self._bot_in_instance()).with_for_update() 

1108 ) 

1109 self._close_bot_sessions(session, session.execute(locate_bot_stmt).scalars().all()) 

1110 

1111 bot_name = f"{self._instance_name}/{str(uuid.uuid4())}" 

1112 session.add( 

1113 BotEntry( 

1114 name=bot_name, 

1115 bot_id=bot_session_id, 

1116 last_update_timestamp=datetime.utcnow(), 

1117 lease_id=None, 

1118 bot_status=bot_session_status, 

1119 instance_name=self._instance_name, 

1120 expiry_time=datetime.utcnow() + timedelta(seconds=self.bot_session_keepalive_timeout), 

1121 ) 

1122 ) 

1123 return bot_name 

1124 

1125 def close_bot_sessions(self, bot_name: str) -> None: 

1126 with self._sql.session() as session: 

1127 locate_bot_stmt = ( 

1128 select(BotEntry).where(BotEntry.name == bot_name, self._bot_in_instance()).with_for_update() 

1129 ) 

1130 self._close_bot_sessions(session, session.execute(locate_bot_stmt).scalars().all()) 

1131 

1132 def _close_bot_sessions(self, session: Session, bots: List[BotEntry]) -> None: 

1133 for bot in bots: 

1134 log_tags = f" instance_name=[{self._instance_name}] request.bot_name=[{bot.name}]" 

1135 log_tags += f" request.bot_id=[{bot.bot_id}] request.bot_status=[{bot.bot_status}]" 

1136 

1137 LOGGER.debug(f"Closing bot session. {log_tags}") 

1138 if bot.lease_id: 

1139 if job := self._get_job(bot.lease_id, session, with_for_update=True): 

1140 for db_lease in job.active_leases: 

1141 lease_tags = log_tags + f" db.lease_id=[{job.name}] db.lease_state=[{db_lease.state}]" 

1142 LOGGER.debug(f"Reassigning lease for bot session. {lease_tags}") 

1143 self._retry_job_lease(session, job, db_lease) 

1144 self._notify_job_updated(job.name, session) 

1145 session.delete(bot) 

1146 

1147 def reap_expired_sessions(self) -> bool: 

1148 """ 

1149 Find and close expired bot sessions. Returns True if sessions were closed. 

1150 Only closes a few sessions to minimize time in transaction. 

1151 """ 

1152 

1153 with self._sql.session() as session: 

1154 locate_bot_stmt = ( 

1155 select(BotEntry) 

1156 .where(BotEntry.expiry_time < datetime.utcnow(), self._bot_in_instance()) 

1157 .order_by(BotEntry.expiry_time.desc()) 

1158 .with_for_update(skip_locked=True) 

1159 .limit(5) 

1160 ) 

1161 if bots := session.execute(locate_bot_stmt).scalars().all(): 

1162 for bot in bots: 

1163 LOGGER.warning( 

1164 f"BotSession name=[{bot.name}] for bot_id=[{bot.bot_id}] " 

1165 f"with deadline=[{bot.expiry_time}] has expired." 

1166 ) 

1167 self._close_bot_sessions(session, bots) 

1168 return True 

1169 return False 

1170 

1171 @DurationMetric(SCHEDULER_UPDATE_LEASE_TIME_METRIC_NAME, instanced=True) 

1172 def synchronize_bot_lease( 

1173 self, bot_name: str, bot_id: str, bot_status: int, session_lease: Optional[Lease] 

1174 ) -> Optional[Lease]: 

1175 log_tags = f"instance_name=[{self._instance_name}]" 

1176 log_tags += f" request.bot_name=[{bot_name}] request.bot_id=[{bot_id}] request.bot_status=[{bot_status}]" 

1177 

1178 if session_lease: 

1179 log_tags += f" request.lease_id=[{session_lease.id}] request.lease_state=[{session_lease.state}]" 

1180 else: 

1181 log_tags += " request.lease_id=[] request.lease_state=[]" 

1182 

1183 with self._sql.session(exceptions_to_not_raise_on=[Exception]) as session: 

1184 locate_bot_stmt = ( 

1185 select(BotEntry).where(BotEntry.bot_id == bot_id, self._bot_in_instance()).with_for_update() 

1186 ) 

1187 bots: List[BotEntry] = session.execute(locate_bot_stmt).scalars().all() 

1188 if not bots: 

1189 raise InvalidArgumentError(f"Bot does not exist while validating leases. {log_tags}") 

1190 

1191 # This is a tricky case. This case happens when a new bot session is created while an older 

1192 # session for a bot id is waiting on leases. This can happen when a worker reboots but the 

1193 # connection context takes a long time to close. In this case, we DO NOT want to update anything 

1194 # in the database, because the work/lease has already been re-assigned to a new session. 

1195 # Closing anything in the database at this point would cause the newly restarted worker 

1196 # to get cancelled prematurely. 

1197 if len(bots) == 1 and bots[0].name != bot_name: 

1198 raise BotSessionMismatchError( 

1199 "Mismatch between client supplied bot_id/bot_name and buildgrid database record. " 

1200 f"db.bot_name=[{bots[0].name}] {log_tags}" 

1201 ) 

1202 

1203 # Everything at this point is wrapped in try/catch, so we can raise BotSessionMismatchError or 

1204 # BotSessionClosedError and have the session be closed if preconditions from here out fail. 

1205 try: 

1206 # There should never be time when two bot sessions exist for the same bot id. We have logic to 

1207 # assert that old database entries for a given bot id are closed and deleted prior to making a 

1208 # new one. If this case happens shut everything down, so we can hopefully recover. 

1209 if len(bots) > 1: 

1210 raise BotSessionMismatchError( 

1211 "Bot id is registered to more than one bot session. " 

1212 f"names=[{', '.join(bot.name for bot in bots)}] {log_tags}" 

1213 ) 

1214 

1215 bot = bots[0] 

1216 log_tags += f" db.lease_id=[{bot.lease_id}]" 

1217 

1218 # Validate that the lease_id matches the client and database if both are supplied. 

1219 if (session_lease and session_lease.id and bot.lease_id) and (session_lease.id != bot.lease_id): 

1220 raise BotSessionMismatchError( 

1221 f"Mismatch between client supplied lease_id and buildgrid database record. {log_tags}" 

1222 ) 

1223 

1224 # Update the expiry time. 

1225 bot.expiry_time = datetime.utcnow() + timedelta(seconds=self.bot_session_keepalive_timeout) 

1226 bot.last_update_timestamp = datetime.utcnow() 

1227 bot.bot_status = bot_status 

1228 

1229 # Validate the cases where the database doesn't know about any leases. 

1230 if bot.lease_id is None: 

1231 # If there's no lease in the database or session, we have nothing to update! 

1232 if not session_lease: 

1233 LOGGER.debug(f"No lease in session or database. Skipping. {log_tags}") 

1234 return None 

1235 

1236 # If the database has no lease, but the work is completed, we probably timed out the last call. 

1237 if session_lease.state == LeaseState.COMPLETED.value: 

1238 LOGGER.debug(f"No lease in database, but session lease is completed. Skipping. {log_tags}") 

1239 return None 

1240 

1241 # Otherwise, the bot session has a lease that the server doesn't know about. Bad bad bad. 

1242 raise BotSessionClosedError(f"Bot session lease id does not match the database. {log_tags}") 

1243 

1244 # Let's now lock the job so no more state transitions occur while we perform our updates. 

1245 job = self._get_job(bot.lease_id, session, with_for_update=True) 

1246 if not job: 

1247 raise BotSessionClosedError(f"Bot session lease id points to non-existent job. {log_tags}") 

1248 

1249 # If we don't have any leases assigned to the job now, someone interrupted us before locking. 

1250 # Disconnect our bot from mutating this job. 

1251 if not job.leases: 

1252 raise BotSessionClosedError(f"Leases were changed while job was being locked. {log_tags}") 

1253 

1254 db_lease = job.leases[0] 

1255 log_tags += f" db.lease_state=[{db_lease.state}]" 

1256 

1257 # Assign: 

1258 # 

1259 # If the lease is in the PENDING state, this means that it is a new lease for the worker, which 

1260 # it must acknowledge (the next time it calls UpdateBotSession) by changing the state to ACTIVE. 

1261 # 

1262 # Leases contain a “payload,” which is an Any proto that must be understandable to the bot. 

1263 # 

1264 # If at any time the bot issues a call to UpdateBotSession that is inconsistent with what the service 

1265 # expects, the service can take appropriate action. For example, the service may have assigned a 

1266 # lease to a bot, but the call gets interrupted before the bot receives the message, perhaps because 

1267 # the UpdateBotSession call times out. As a result, the next call to UpdateBotSession from the bot 

1268 # will not include the lease, and the service can immediately conclude that the lease needs to be 

1269 # reassigned. 

1270 # 

1271 if not session_lease: 

1272 if db_lease.state != LeaseState.PENDING.value: 

1273 raise BotSessionClosedError( 

1274 f"Session has no lease and database entry not in pending state. {log_tags}" 

1275 ) 

1276 

1277 job.stage = OperationStage.EXECUTING.value 

1278 if self.logstream_channel and self.logstream_instance is not None: 

1279 try: 

1280 action_digest = string_to_digest(job.action_digest) 

1281 parent_base = f"{action_digest.hash}_{action_digest.size_bytes}_{int(time())}" 

1282 with logstream_client(self.logstream_channel, self.logstream_instance) as ls_client: 

1283 stdout_stream = ls_client.create(f"{parent_base}_stdout") 

1284 stderr_stream = ls_client.create(f"{parent_base}_stderr") 

1285 job.stdout_stream_name = stdout_stream.name 

1286 job.stdout_stream_write_name = stdout_stream.write_resource_name 

1287 job.stderr_stream_name = stderr_stream.name 

1288 job.stderr_stream_write_name = stderr_stream.write_resource_name 

1289 except Exception as e: 

1290 LOGGER.warning(f"Failed to create log stream. {log_tags}", exc_info=e) 

1291 

1292 self._notify_job_updated(job.name, session) 

1293 LOGGER.debug(f"Pending lease sent to bot for ack. {log_tags}") 

1294 return db_lease.to_protobuf() 

1295 

1296 # At this point, we know that there's a lease both in the bot session and in the database. 

1297 

1298 # Accept: 

1299 # 

1300 # If the lease is in the PENDING state, this means that it is a new lease for the worker, 

1301 # which it must acknowledge (the next time it calls UpdateBotSession) by changing the state to ACTIVE 

1302 # 

1303 if session_lease.state == LeaseState.ACTIVE.value and db_lease.state == LeaseState.PENDING.value: 

1304 db_lease.state = LeaseState.ACTIVE.value 

1305 self._notify_job_updated(job.name, session) 

1306 LOGGER.debug(f"Bot acked pending lease. {log_tags}") 

1307 return session_lease 

1308 

1309 # Complete: 

1310 # 

1311 # Once the assignment is complete - either because it finishes or because it times out - the bot 

1312 # calls Bots.UpdateBotSession again, this time updating the state of the lease from accepted to 

1313 # complete, and optionally by also populating the lease’s results field, which is another Any proto. 

1314 # The service can then assign it new work (removing any completed leases). 

1315 # 

1316 # A successfully completed lease may go directly from PENDING to COMPLETED if, for example, the 

1317 # lease was completed before the bot has had the opportunity to transition to ACTIVE, or if the 

1318 # update transitioning the lease to the ACTIVE state was lost. 

1319 # 

1320 if session_lease.state == LeaseState.COMPLETED.value and db_lease.state in ( 

1321 LeaseState.PENDING.value, 

1322 LeaseState.ACTIVE.value, 

1323 ): 

1324 log_tags += f" request.lease_status_code=[{session_lease.status.code}]" 

1325 log_tags += f" request.lease_status_message=[{session_lease.status.message}]" 

1326 log_tags += f" db.n_tries=[{job.n_tries}]" 

1327 

1328 bot.lease_id = None 

1329 if ( 

1330 session_lease.status.code in self.RETRYABLE_STATUS_CODES 

1331 and (job.n_tries or 1) < self.max_job_attempts 

1332 ): 

1333 LOGGER.debug(f"Retrying bot lease. {log_tags}") 

1334 self._retry_job_lease(session, job, db_lease) 

1335 else: 

1336 LOGGER.debug(f"Bot completed lease. {log_tags}") 

1337 self._complete_lease(session, job, db_lease, session_lease.status, session_lease.result) 

1338 

1339 self._notify_job_updated(job.name, session) 

1340 return None 

1341 

1342 # Cancel: 

1343 # 

1344 # At any time, the service may change the state of a lease from PENDING or ACTIVE to CANCELLED; 

1345 # the bot may not change to this state. The service then waits for the bot to acknowledge the 

1346 # change by updating its own status to CANCELLED as well. Once both the service and the bot agree, 

1347 # the service may remove it from the list of leases. 

1348 # 

1349 if session_lease.state == db_lease.state == LeaseState.CANCELLED.value: 

1350 bot.lease_id = None 

1351 LOGGER.debug(f"Bot acked cancelled lease. {log_tags}") 

1352 return None 

1353 

1354 if db_lease.state == LeaseState.CANCELLED.value: 

1355 session_lease.state = LeaseState.CANCELLED.value 

1356 LOGGER.debug(f"Cancelled lease sent to bot for ack. {log_tags}") 

1357 return session_lease 

1358 

1359 if session_lease.state == LeaseState.CANCELLED.value: 

1360 raise BotSessionClosedError(f"Illegal attempt from session to set state as cancelled. {log_tags}") 

1361 

1362 # Keepalive: 

1363 # 

1364 # The Bot periodically calls Bots.UpdateBotSession, either if there’s a genuine change (for example, 

1365 # an attached phone has died) or simply to let the service know that it’s alive and ready to receive 

1366 # work. If the bot doesn’t call back on time, the service considers it to have died, and all work 

1367 # from the bot to be lost. 

1368 # 

1369 if session_lease.state == db_lease.state: 

1370 LOGGER.debug(f"Bot heartbeat acked. {log_tags}") 

1371 return session_lease 

1372 

1373 # Any other transition should really never happen... cover it anyways. 

1374 raise BotSessionClosedError(f"Unsupported lease state transition. {log_tags}") 

1375 # TODO allow creating a session with manual commit logic. 

1376 # For now... Sneak the exception past the context manager. 

1377 except (BotSessionMismatchError, BotSessionClosedError) as e: 

1378 self._close_bot_sessions(session, bots) 

1379 err = e 

1380 raise err 

1381 

1382 def _retry_job_lease(self, session: Session, job: JobEntry, lease: LeaseEntry) -> None: 

1383 # If the job was mutated before we could lock it, exit fast on terminal states. 

1384 if job.cancelled or job.stage == OperationStage.COMPLETED.value: 

1385 return 

1386 

1387 job.n_tries = job.n_tries or 1 

1388 if job.n_tries >= self.max_job_attempts: 

1389 status = status_pb2.Status( 

1390 code=code_pb2.ABORTED, message=f"Job was retried {job.n_tries} unsuccessfully. Aborting." 

1391 ) 

1392 self._complete_lease(session, job, lease, status=status) 

1393 return 

1394 

1395 job.stage = OperationStage.QUEUED.value 

1396 job.assigned = False 

1397 job.n_tries += 1 

1398 

1399 lease.state = LeaseState.PENDING.value 

1400 lease.status = None 

1401 lease.worker_name = None 

1402 

1403 def _complete_lease( 

1404 self, session: Session, job: JobEntry, lease: LeaseEntry, status: Status, result: Optional[ProtoAny] = None 

1405 ) -> None: 

1406 lease.state = LeaseState.COMPLETED.value 

1407 lease.status = status.code 

1408 

1409 job.stage = OperationStage.COMPLETED.value 

1410 job.status_code = status.code 

1411 if not job.do_not_cache: 

1412 job.do_not_cache = status.code != code_pb2.OK 

1413 job.worker_completed_timestamp = datetime.utcnow() 

1414 

1415 action_result = ActionResult() 

1416 if result is not None and result.Is(action_result.DESCRIPTOR): 

1417 result.Unpack(action_result) 

1418 now = datetime.utcnow() 

1419 action_result.execution_metadata.queued_timestamp.FromDatetime(job.queued_timestamp or now) 

1420 action_result.execution_metadata.worker_start_timestamp.FromDatetime(job.worker_start_timestamp or now) 

1421 action_result.execution_metadata.worker_completed_timestamp.FromDatetime(job.worker_completed_timestamp or now) 

1422 response = ExecuteResponse(result=action_result, cached_result=False, status=status) 

1423 

1424 job.result = digest_to_string(self.storage.put_message(response)) 

1425 

1426 if self.action_cache and result and not job.do_not_cache: 

1427 action_digest = string_to_digest(job.action_digest) 

1428 try: 

1429 self.action_cache.update_action_result(action_digest, action_result) 

1430 LOGGER.debug( 

1431 f"Stored action result=[{action_result}] for action_digest=[{action_digest}] in ActionCache" 

1432 ) 

1433 except UpdateNotAllowedError: 

1434 # The configuration doesn't allow updating the old result 

1435 LOGGER.exception( 

1436 "ActionCache is not configured to allow updates, ActionResult for action_digest=" 

1437 f"[{action_digest.hash}/{action_digest.size_bytes}] wasn't updated." 

1438 ) 

1439 except Exception: 

1440 LOGGER.exception( 

1441 "Unable to update ActionCache for action " 

1442 f"[{action_digest.hash}/{action_digest.size_bytes}], " 

1443 "results will not be stored in the ActionCache" 

1444 ) 

1445 

1446 # Update retentions 

1447 self._update_action_retention( 

1448 Action.FromString(job.action), 

1449 string_to_digest(job.action_digest), 

1450 retention_hours=self.completed_action_retention_hours, 

1451 ) 

1452 if action_result.ByteSize() > 0: 

1453 self._update_action_result_retention(action_result, retention_hours=self.action_result_retention_hours) 

1454 

1455 self._publish_execution_stats(session, job.name, action_result.execution_metadata) 

1456 

1457 def count_bots(self) -> int: 

1458 """Count the number of bots""" 

1459 with self._sql.session() as session: 

1460 return session.query(BotEntry).filter(self._bot_in_instance()).count() 

1461 

1462 def count_bots_by_status(self, status: BotStatus) -> int: 

1463 """Count the number of bots with a particular status""" 

1464 with self._sql.session() as session: 

1465 return session.query(BotEntry).filter(self._bot_in_instance(), BotEntry.bot_status == status.value).count() 

1466 

1467 def refresh_bot_expiry_time(self, bot_name: str, bot_id: str) -> datetime: 

1468 """ 

1469 This update is done out-of-band from the main synchronize_bot_lease transaction, as there 

1470 are cases where we will skip calling the synchronization, but still want the session to be 

1471 updated such that it does not get reaped. This slightly duplicates the update happening in 

1472 synchronize_bot_lease, however, that update is still required to not have the job reaped 

1473 during its job assignment waiting period. 

1474 

1475 This method should be called at the end of the update and create bot session methods. 

1476 The returned datetime should be assigned to the deadline within the returned session proto. 

1477 """ 

1478 

1479 locate_bot_stmt = ( 

1480 select(BotEntry) 

1481 .where(BotEntry.name == bot_name, BotEntry.bot_id == bot_id, self._bot_in_instance()) 

1482 .with_for_update() 

1483 ) 

1484 with self._sql.session() as session: 

1485 if bot := session.execute(locate_bot_stmt).scalar(): 

1486 now = datetime.utcnow() 

1487 bot.last_update_timestamp = now 

1488 bot.expiry_time = now + timedelta(seconds=self.bot_session_keepalive_timeout) 

1489 return bot.expiry_time 

1490 raise BotSessionClosedError(f"Bot not found to fetch expiry. {bot_name=} {bot_id=}") 

1491 

1492 def get_metadata_for_leases(self, leases: Iterable[Lease]) -> List[Tuple[str, bytes]]: 

1493 """Return a list of Job metadata for a given list of leases. 

1494 

1495 Args: 

1496 leases (list): List of leases to get Job metadata for. 

1497 

1498 Returns: 

1499 List of tuples of the form 

1500 ``('executeoperationmetadata-bin': serialized_metadata)``. 

1501 

1502 """ 

1503 metadata = [] 

1504 with self._sql_ro.session() as session: 

1505 for lease in leases: 

1506 job = self._get_job(lease.id, session) 

1507 if job is not None: 

1508 job_metadata = ExecuteOperationMetadata( 

1509 stage=job.stage, # type: ignore[arg-type] 

1510 action_digest=string_to_digest(job.action_digest), 

1511 stderr_stream_name=job.stderr_stream_write_name or "", 

1512 stdout_stream_name=job.stdout_stream_write_name or "", 

1513 ) 

1514 metadata.append(("executeoperationmetadata-bin", job_metadata.SerializeToString())) 

1515 

1516 return metadata 

1517 

1518 def _fetch_execution_stats( 

1519 self, auxiliary_metadata: RepeatedCompositeFieldContainer[ProtoAny] 

1520 ) -> Optional[ExecutionStatistics]: 

1521 """Fetch ExecutionStatistics from Storage 

1522 ProtoAny[Digest] -> ProtoAny[ExecutionStatistics] 

1523 """ 

1524 for aux_metadata_any in auxiliary_metadata: 

1525 # Get the wrapped digest 

1526 if not aux_metadata_any.Is(Digest.DESCRIPTOR): 

1527 continue 

1528 aux_metadata_digest = Digest() 

1529 try: 

1530 aux_metadata_any.Unpack(aux_metadata_digest) 

1531 # Get the blob from CAS 

1532 execution_stats_any = self.storage.get_message(aux_metadata_digest, ProtoAny) 

1533 # Get the wrapped ExecutionStatistics 

1534 if execution_stats_any and execution_stats_any.Is(ExecutionStatistics.DESCRIPTOR): 

1535 execution_stats = ExecutionStatistics() 

1536 execution_stats_any.Unpack(execution_stats) 

1537 return execution_stats 

1538 except Exception as exc: 

1539 LOGGER.exception( 

1540 "Cannot fetch ExecutionStatistics from storage, auxiliary_metadata=%s", 

1541 aux_metadata_digest.SerializeToString(), 

1542 exc_info=exc, 

1543 ) 

1544 return None 

1545 return None 

1546 

1547 def publish_execution_stats(self, job_name: str, execution_metadata: ExecutedActionMetadata) -> None: 

1548 with self._sql_ro.session(expire_on_commit=False) as session: 

1549 self._publish_execution_stats(session, job_name, execution_metadata) 

1550 

1551 def _publish_execution_stats( 

1552 self, session: Session, job_name: str, execution_metadata: ExecutedActionMetadata 

1553 ) -> None: 

1554 """Publish resource usage of the job""" 

1555 context_metadata = {"instance-name": self._instance_name} if self._instance_name else None 

1556 

1557 queued = execution_metadata.queued_timestamp.ToDatetime() 

1558 worker_start = execution_metadata.worker_start_timestamp.ToDatetime() 

1559 worker_completed = execution_metadata.worker_completed_timestamp.ToDatetime() 

1560 fetch_start = execution_metadata.input_fetch_start_timestamp.ToDatetime() 

1561 fetch_completed = execution_metadata.input_fetch_completed_timestamp.ToDatetime() 

1562 execution_start = execution_metadata.execution_start_timestamp.ToDatetime() 

1563 execution_completed = execution_metadata.execution_completed_timestamp.ToDatetime() 

1564 upload_start = execution_metadata.output_upload_start_timestamp.ToDatetime() 

1565 upload_completed = execution_metadata.output_upload_completed_timestamp.ToDatetime() 

1566 

1567 # Emit build inputs fetching time record: 

1568 input_fetch_time = fetch_completed - fetch_start 

1569 publish_timer_metric(INPUTS_FETCHING_TIME_METRIC_NAME, input_fetch_time, metadata=context_metadata) 

1570 

1571 # Emit build execution time record: 

1572 execution_time = execution_completed - execution_start 

1573 publish_timer_metric(EXECUTION_TIME_METRIC_NAME, execution_time, metadata=context_metadata) 

1574 

1575 # Emit build outputs uploading time record: 

1576 output_upload_time = upload_completed - upload_start 

1577 publish_timer_metric(OUTPUTS_UPLOADING_TIME_METRIC_NAME, output_upload_time, metadata=context_metadata) 

1578 

1579 # Emit total queued time record: 

1580 # This calculates the queue time based purely on 

1581 # values set in the ActionResult's ExecutedActionMetadata, 

1582 # which may be ever so slightly different than what 

1583 # the job object's queued_time is. 

1584 total_queued_time = worker_start - queued 

1585 publish_timer_metric(QUEUED_TIME_METRIC_NAME, total_queued_time, metadata=context_metadata) 

1586 

1587 # Emit total time spent in worker 

1588 total_worker_time = worker_completed - worker_start 

1589 publish_timer_metric(WORKER_HANDLING_TIME_METRIC_NAME, total_worker_time, metadata=context_metadata) 

1590 

1591 # Emit total build handling time record: 

1592 total_handling_time = worker_completed - queued 

1593 publish_timer_metric(TOTAL_HANDLING_TIME_METRIC_NAME, total_handling_time, metadata=context_metadata) 

1594 

1595 if self.metering_client is None or len(execution_metadata.auxiliary_metadata) == 0: 

1596 return 

1597 

1598 execution_stats = self._fetch_execution_stats(execution_metadata.auxiliary_metadata) 

1599 if execution_stats is None: 

1600 return 

1601 usage = Usage( 

1602 computing=ComputingUsage( 

1603 utime=execution_stats.command_rusage.utime.ToMilliseconds(), 

1604 stime=execution_stats.command_rusage.stime.ToMilliseconds(), 

1605 maxrss=execution_stats.command_rusage.maxrss, 

1606 inblock=execution_stats.command_rusage.inblock, 

1607 oublock=execution_stats.command_rusage.oublock, 

1608 ) 

1609 ) 

1610 

1611 try: 

1612 operations = ( 

1613 session.query(OperationEntry) 

1614 .where(OperationEntry.job_name == job_name) 

1615 .options(joinedload(OperationEntry.client_identity)) 

1616 .all() 

1617 ) 

1618 for op in operations: 

1619 if op.client_identity is None: 

1620 continue 

1621 client_id = Identity( 

1622 instance=op.client_identity.instance, 

1623 workflow=op.client_identity.workflow, 

1624 actor=op.client_identity.actor, 

1625 subject=op.client_identity.subject, 

1626 ) 

1627 self.metering_client.put_usage(identity=client_id, operation_name=op.name, usage=usage) 

1628 except Exception as exc: 

1629 LOGGER.exception("Cannot publish resource usage, job_name=%s", job_name, exc_info=exc) 

1630 

1631 def _update_action_retention( 

1632 self, action: Action, action_digest: Digest, retention_hours: Optional[float] 

1633 ) -> None: 

1634 if not self.asset_client or not retention_hours: 

1635 return 

1636 digest_str = digest_to_string(action_digest) 

1637 uri = DIGEST_URI_TEMPLATE.format(digest_hash=action_digest.hash) 

1638 qualifier = {"resource_type": PROTOBUF_MEDIA_TYPE} 

1639 expire_at = datetime.now() + timedelta(hours=retention_hours) 

1640 referenced_blobs = [action.command_digest] 

1641 referenced_directories = [action.input_root_digest] 

1642 

1643 try: 

1644 self.asset_client.push_blob( 

1645 uris=[uri], 

1646 qualifiers=qualifier, 

1647 blob_digest=action_digest, 

1648 expire_at=expire_at, 

1649 referenced_blobs=referenced_blobs, 

1650 referenced_directories=referenced_directories, 

1651 ) 

1652 LOGGER.debug(f"Extended the retention of action {digest_str} for {retention_hours} hours") 

1653 except Exception: 

1654 LOGGER.exception(f"Failed to push action as an asset: {digest_str}") 

1655 # Not a fatal path, don't reraise here 

1656 

1657 def _update_action_result_retention(self, action_result: ActionResult, retention_hours: Optional[float]) -> None: 

1658 if not self.asset_client or not retention_hours: 

1659 return 

1660 try: 

1661 # BuildGrid doesn't store action_result in CAS, but if we push it as an asset 

1662 # we need it to be accessible 

1663 digest = self.storage.put_message(action_result) 

1664 digest_str = digest_to_string(digest) 

1665 

1666 uri = DIGEST_URI_TEMPLATE.format(digest_hash=digest.hash) 

1667 qualifier = {"resource_type": PROTOBUF_MEDIA_TYPE} 

1668 expire_at = datetime.now() + timedelta(hours=retention_hours) 

1669 

1670 referenced_blobs: List[Digest] = [] 

1671 referenced_directories: List[Digest] = [] 

1672 

1673 for file in action_result.output_files: 

1674 referenced_blobs.append(file.digest) 

1675 for dir in action_result.output_directories: 

1676 # Caveat: the underlying directories referenced by this `Tree` message are not referenced by this asset. 

1677 # For clients who need to keep all referenced outputs, 

1678 # consider setting `Action.output_directory_format` as `DIRECTORY_ONLY` or `TREE_AND_DIRECTORY`. 

1679 if dir.tree_digest.ByteSize() != 0: 

1680 referenced_blobs.append(dir.tree_digest) 

1681 if dir.root_directory_digest.ByteSize() != 0: 

1682 referenced_directories.append(dir.root_directory_digest) 

1683 

1684 if action_result.stdout_digest.ByteSize() != 0: 

1685 referenced_blobs.append(action_result.stdout_digest) 

1686 if action_result.stderr_digest.ByteSize() != 0: 

1687 referenced_blobs.append(action_result.stderr_digest) 

1688 

1689 self.asset_client.push_blob( 

1690 uris=[uri], 

1691 qualifiers=qualifier, 

1692 blob_digest=digest, 

1693 expire_at=expire_at, 

1694 referenced_blobs=referenced_blobs, 

1695 referenced_directories=referenced_directories, 

1696 ) 

1697 LOGGER.debug(f"Extended the retention of action_result {digest_str} for {retention_hours} hours") 

1698 

1699 except Exception as e: 

1700 LOGGER.exception(f"Failed to push action_result as an asset: {digest_str}", exc_info=e) 

1701 # Not a fatal path, don't reraise here