Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/scheduler/impl.py: 91.42%

886 statements  

coverage.py v7.4.1, created at 2025-03-13 15:36 +0000

1# Copyright (C) 2019 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16import threading 

17import uuid 

18from collections import defaultdict 

19from contextlib import ExitStack 

20from datetime import datetime, timedelta 

21from time import time 

22from typing import Any, Iterable, NamedTuple, Sequence, TypedDict, TypeVar, cast 

23 

24from buildgrid_metering.client import SyncMeteringServiceClient 

25from buildgrid_metering.models.dataclasses import ComputingUsage, Identity, Usage 

26from google.protobuf.any_pb2 import Any as ProtoAny 

27from google.protobuf.internal.containers import RepeatedCompositeFieldContainer 

28from google.protobuf.timestamp_pb2 import Timestamp 

29from grpc import Channel 

30from sqlalchemy import ColumnExpressionArgument, and_, delete, func, insert, or_, select, text, update 

31from sqlalchemy.dialects import postgresql, sqlite 

32from sqlalchemy.exc import IntegrityError 

33from sqlalchemy.orm import Session, joinedload, selectinload 

34from sqlalchemy.sql.expression import Insert, Select 

35 

36from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 

37from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ( 

38 Action, 

39 ActionResult, 

40 Command, 

41 Digest, 

42 ExecutedActionMetadata, 

43 ExecuteOperationMetadata, 

44 ExecuteResponse, 

45 RequestMetadata, 

46 ToolDetails, 

47) 

48from buildgrid._protos.build.buildbox.execution_stats_pb2 import ExecutionStatistics 

49from buildgrid._protos.buildgrid.v2.identity_pb2 import ClientIdentity 

50from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import Lease 

51from buildgrid._protos.google.longrunning import operations_pb2 

52from buildgrid._protos.google.longrunning.operations_pb2 import Operation 

53from buildgrid._protos.google.rpc import code_pb2, status_pb2 

54from buildgrid._protos.google.rpc.status_pb2 import Status 

55from buildgrid.server.actioncache.caches.action_cache_abc import ActionCacheABC 

56from buildgrid.server.cas.storage.storage_abc import StorageABC 

57from buildgrid.server.client.asset import AssetClient 

58from buildgrid.server.client.logstream import logstream_client 

59from buildgrid.server.context import current_instance, instance_context, try_current_instance 

60from buildgrid.server.decorators import timed 

61from buildgrid.server.enums import BotStatus, LeaseState, MeteringThrottleAction, OperationStage 

62from buildgrid.server.exceptions import ( 

63 BotSessionClosedError, 

64 BotSessionMismatchError, 

65 CancelledError, 

66 DatabaseError, 

67 InvalidArgumentError, 

68 NotFoundError, 

69 ResourceExhaustedError, 

70 UpdateNotAllowedError, 

71) 

72from buildgrid.server.logging import buildgrid_logger 

73from buildgrid.server.metrics_names import METRIC 

74from buildgrid.server.metrics_utils import publish_counter_metric, publish_timer_metric, timer 

75from buildgrid.server.operations.filtering import DEFAULT_SORT_KEYS, OperationFilter, SortKey 

76from buildgrid.server.settings import DEFAULT_MAX_EXECUTION_TIMEOUT, SQL_SCHEDULER_METRICS_PUBLISH_INTERVAL_SECONDS 

77from buildgrid.server.sql.models import Base as OrmBase 

78from buildgrid.server.sql.models import ( 

79 BotEntry, 

80 ClientIdentityEntry, 

81 JobEntry, 

82 LeaseEntry, 

83 OperationEntry, 

84 PlatformEntry, 

85 PropertyLabelEntry, 

86 RequestMetadataEntry, 

87 digest_to_string, 

88 job_platform_association, 

89 string_to_digest, 

90) 

91from buildgrid.server.sql.provider import SqlProvider 

92from buildgrid.server.sql.utils import ( 

93 build_custom_filters, 

94 build_page_filter, 

95 build_page_token, 

96 build_sort_column_list, 

97 extract_sort_keys, 

98) 

99from buildgrid.server.threading import ContextWorker 

100from buildgrid.server.utils.digests import create_digest 

101 

102from .assigner import JobAssigner 

103from .notifier import OperationsNotifier 

104from .properties import PropertySet, hash_from_dict 

105 

106LOGGER = buildgrid_logger(__name__) 

107 

108 

109PROTOBUF_MEDIA_TYPE = "application/x-protobuf" 

110DIGEST_URI_TEMPLATE = "nih:sha-256;{digest_hash}" 

111 
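# Illustrative sketch (the variable name and hash below are made-up examples):
# how the nih URI template is expected to be filled in for an action digest.
_example_digest_uri = DIGEST_URI_TEMPLATE.format(digest_hash="a" * 64)
# -> "nih:sha-256;" followed by the 64-character hex hash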

112 

113class SchedulerMetrics(TypedDict, total=False): 

114 # dict[tuple[stage_name: str, property_label: str], number_of_jobs: int] 

115 jobs: dict[tuple[str, str], int] 

116 

117 

118class BotMetrics(TypedDict, total=False): 

119 # dict[bot_status: BotStatus, number_of_bots: int]

120 bots_total: dict[BotStatus, int] 

121 

122 # dict[tuple[bot_status: BotStatus, property_label: str], number_of_bots: int] 

123 bots_per_property_label: dict[tuple[BotStatus, str], int] 

124 
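# Illustrative sketch of the shapes these TypedDicts describe; the stages,
# labels, and counts below are made-up example values, and BotStatus.OK is
# used purely for demonstration.
_example_scheduler_metrics: SchedulerMetrics = {
    "jobs": {("QUEUED", "unknown"): 3, ("EXECUTING", "unknown"): 1},
}
_example_bot_metrics: BotMetrics = {
    "bots_total": {BotStatus.OK: 4},
    "bots_per_property_label": {(BotStatus.OK, "unknown"): 4},
}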

125 

126class AgedJobHandlerOptions(NamedTuple): 

127 job_max_age: timedelta = timedelta(days=30) 

128 handling_period: timedelta = timedelta(minutes=5) 

129 max_handling_window: int = 10000 

130 

131 @staticmethod 

132 def from_config( 

133 job_max_age_cfg: dict[str, float], 

134 handling_period_cfg: dict[str, float] | None = None, 

135 max_handling_window_cfg: int | None = None, 

136 ) -> "AgedJobHandlerOptions": 

137 """Helper method for creating ``AgedJobHandlerOptions`` objects 

138 If input configs are None, assign defaults""" 

139 

140 def _dict_to_timedelta(config: dict[str, float]) -> timedelta: 

141 return timedelta( 

142 weeks=config.get("weeks", 0), 

143 days=config.get("days", 0), 

144 hours=config.get("hours", 0), 

145 minutes=config.get("minutes", 0), 

146 seconds=config.get("seconds", 0), 

147 ) 

148 

149 return AgedJobHandlerOptions( 

150 job_max_age=_dict_to_timedelta(job_max_age_cfg) if job_max_age_cfg else timedelta(days=30), 

151 handling_period=_dict_to_timedelta(handling_period_cfg) if handling_period_cfg else timedelta(minutes=5), 

152 max_handling_window=max_handling_window_cfg if max_handling_window_cfg else 10000, 

153 ) 

154 
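# Illustrative sketch of how a YAML-style configuration maps onto
# AgedJobHandlerOptions via from_config; all values are made-up examples.
_example_pruning_options = AgedJobHandlerOptions.from_config(
    job_max_age_cfg={"days": 7, "hours": 12},
    handling_period_cfg={"minutes": 10},
    max_handling_window_cfg=5000,
)
# _example_pruning_options.job_max_age == timedelta(days=7, hours=12)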

155 

156T = TypeVar("T", bound="Scheduler") 

157 

158 

159class Scheduler: 

160 RETRYABLE_STATUS_CODES = (code_pb2.INTERNAL, code_pb2.UNAVAILABLE) 

161 

162 def __init__( 

163 self, 

164 sql_provider: SqlProvider, 

165 storage: StorageABC, 

166 *, 

167 sql_ro_provider: SqlProvider | None = None, 

168 sql_notifier_provider: SqlProvider | None = None, 

169 property_set: PropertySet, 

170 action_cache: ActionCacheABC | None = None, 

171 action_browser_url: str | None = None, 

172 max_execution_timeout: int = DEFAULT_MAX_EXECUTION_TIMEOUT, 

173 metering_client: SyncMeteringServiceClient | None = None, 

174 metering_throttle_action: MeteringThrottleAction | None = None, 

175 bot_session_keepalive_timeout: int = 600, 

176 logstream_channel: Channel | None = None, 

177 logstream_instance: str | None = None, 

178 asset_client: AssetClient | None = None, 

179 queued_action_retention_hours: float | None = None, 

180 completed_action_retention_hours: float | None = None, 

181 action_result_retention_hours: float | None = None, 

182 enable_job_watcher: bool = False, 

183 poll_interval: float = 1, 

184 pruning_options: AgedJobHandlerOptions | None = None, 

185 queue_timeout_options: AgedJobHandlerOptions | None = None, 

186 max_job_attempts: int = 5, 

187 job_assignment_interval: float = 1.0, 

188 priority_assignment_percentage: int = 100, 

189 max_queue_size: int | None = None, 

190 execution_timer_interval: float = 60.0, 

191 session_expiry_timer_interval: float = 10.0, 

192 ) -> None: 

193 self._stack = ExitStack() 

194 

195 self.storage = storage 

196 

197 self.poll_interval = poll_interval 

198 self.execution_timer_interval = execution_timer_interval 

199 self.session_expiry_interval = session_expiry_timer_interval 

200 self.pruning_options = pruning_options 

201 self.queue_timeout_options = queue_timeout_options 

202 self.max_job_attempts = max_job_attempts 

203 

204 self._sql = sql_provider 

205 self._sql_ro = sql_ro_provider or sql_provider 

206 self._sql_notifier = sql_notifier_provider or sql_provider 

207 

208 self.property_set = property_set 

209 

210 self.action_cache = action_cache 

211 self.action_browser_url = (action_browser_url or "").rstrip("/") 

212 self.max_execution_timeout = max_execution_timeout 

213 self.enable_job_watcher = enable_job_watcher 

214 self.metering_client = metering_client 

215 self.metering_throttle_action = metering_throttle_action or MeteringThrottleAction.DEPRIORITIZE 

216 self.bot_session_keepalive_timeout = bot_session_keepalive_timeout 

217 self.logstream_channel = logstream_channel 

218 self.logstream_instance = logstream_instance 

219 self.asset_client = asset_client 

220 self.queued_action_retention_hours = queued_action_retention_hours 

221 self.completed_action_retention_hours = completed_action_retention_hours 

222 self.action_result_retention_hours = action_result_retention_hours 

223 self.max_queue_size = max_queue_size 

224 

225 # Overall Scheduler Metrics (totals of jobs/leases in each state).

226 # Publish these metrics less frequently, since the SQL queries

227 # required to gather them can become expensive.

228 self._last_scheduler_metrics_publish_time: datetime | None = None 

229 self._scheduler_metrics_publish_interval = timedelta(seconds=SQL_SCHEDULER_METRICS_PUBLISH_INTERVAL_SECONDS) 

230 

231 self.ops_notifier = OperationsNotifier(self._sql_notifier, self.poll_interval) 

232 self.prune_timer = ContextWorker(name="JobPruner", target=self.prune_timer_loop) 

233 self.queue_timer = ContextWorker(name="QueueTimeout", target=self.queue_timer_loop) 

234 self.execution_timer = ContextWorker(name="ExecutionTimeout", target=self.execution_timer_loop) 

235 self.session_expiry_timer = ContextWorker(self.session_expiry_timer_loop, "BotReaper") 

236 self.job_assigner = JobAssigner( 

237 self, 

238 property_set=property_set, 

239 job_assignment_interval=job_assignment_interval, 

240 priority_percentage=priority_assignment_percentage, 

241 ) 

242 

243 def __repr__(self) -> str: 

244 return f"Scheduler for `{repr(self._sql._engine.url)}`" 

245 

246 def __enter__(self: T) -> T: 

247 self.start() 

248 return self 

249 

250 def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: 

251 self.stop() 

252 

253 def start(self) -> None: 

254 self._stack.enter_context(self.storage) 

255 if self.action_cache: 

256 self._stack.enter_context(self.action_cache) 

257 

258 if self.logstream_channel: 

259 self._stack.enter_context(self.logstream_channel) 

260 if self.asset_client: 

261 self._stack.enter_context(self.asset_client) 

262 # Pruning configuration parameters 

263 if self.pruning_options is not None: 

264 LOGGER.info(f"Scheduler pruning enabled: {self.pruning_options}") 

265 self._stack.enter_context(self.prune_timer) 

266 else: 

267 LOGGER.info("Scheduler pruning not enabled.") 

268 

269 # Queue timeout thread 

270 if self.queue_timeout_options is not None: 

271 LOGGER.info(f"Job queue timeout enabled: {self.queue_timeout_options}") 

272 self._stack.enter_context(self.queue_timer) 

273 else: 

274 LOGGER.info("Job queue timeout not enabled.") 

275 

276 if self.execution_timer_interval > 0: 

277 self._stack.enter_context(self.execution_timer) 

278 if self.poll_interval > 0: 

279 self._stack.enter_context(self.ops_notifier) 

280 

281 def stop(self) -> None: 

282 self._stack.close() 

283 LOGGER.info("Stopped Scheduler.") 

284 

285 def _job_in_instance(self) -> ColumnExpressionArgument[bool]: 

286 return JobEntry.instance_name == current_instance() 

287 

288 def _bot_in_instance(self) -> ColumnExpressionArgument[bool]: 

289 return BotEntry.instance_name == current_instance() 

290 

291 def queue_job_action( 

292 self, 

293 *, 

294 action: Action, 

295 action_digest: Digest, 

296 command: Command, 

297 platform_requirements: dict[str, list[str]], 

298 property_label: str, 

299 priority: int, 

300 skip_cache_lookup: bool, 

301 request_metadata: RequestMetadata | None = None, 

302 client_identity: ClientIdentityEntry | None = None, 

303 ) -> str: 

304 """ 

305 De-duplicates or inserts a newly created job into the execution queue. 

306 Returns an operation name associated with this job. 

307 """ 

308 if self.max_execution_timeout and action.timeout.seconds > self.max_execution_timeout: 

309 raise InvalidArgumentError("Action timeout is larger than the server's maximum execution timeout.") 

310 

311 if not action.do_not_cache: 

312 if operation_name := self.create_operation_for_existing_job( 

313 action_digest=action_digest, 

314 priority=priority, 

315 request_metadata=request_metadata, 

316 client_identity=client_identity, 

317 ): 

318 return operation_name 

319 

320 # If no existing job matched, check the action cache for a result now.

321 # A cached result lets us create the new job in an already-completed state!

322 execute_response: ExecuteResponse | None = None 

323 if self.action_cache and not action.do_not_cache and not skip_cache_lookup: 

324 try: 

325 action_result = self.action_cache.get_action_result(action_digest) 

326 LOGGER.info("Job cache hit for action.", tags=dict(digest=action_digest)) 

327 execute_response = ExecuteResponse() 

328 execute_response.result.CopyFrom(action_result) 

329 execute_response.cached_result = True 

330 except NotFoundError: 

331 pass 

332 except Exception: 

333 LOGGER.exception("Checking ActionCache for action failed.", tags=dict(digest=action_digest)) 

334 

335 # Extend retention for action 

336 self._update_action_retention(action, action_digest, self.queued_action_retention_hours) 

337 

338 return self.create_operation_for_new_job( 

339 action=action, 

340 action_digest=action_digest, 

341 command=command, 

342 execute_response=execute_response, 

343 platform_requirements=platform_requirements, 

344 property_label=property_label, 

345 priority=priority, 

346 request_metadata=request_metadata, 

347 client_identity=client_identity, 

348 ) 

349 
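# Illustrative usage sketch (all values below are made-up examples; real callers
# pass protos parsed from the incoming Execute request):
#
#     operation_name = scheduler.queue_job_action(
#         action=action,
#         action_digest=action_digest,
#         command=command,
#         platform_requirements={"OSFamily": ["linux"]},
#         property_label="unknown",
#         priority=0,
#         skip_cache_lookup=False,
#     )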

350 def create_operation_for_existing_job( 

351 self, 

352 *, 

353 action_digest: Digest, 

354 priority: int, 

355 request_metadata: RequestMetadata | None, 

356 client_identity: ClientIdentityEntry | None, 

357 ) -> str | None: 

358 # Find a job with a matching action that isn't completed or cancelled and that can be cached. 

359 find_existing_stmt = ( 

360 select(JobEntry) 

361 .where( 

362 JobEntry.action_digest == digest_to_string(action_digest), 

363 JobEntry.stage != OperationStage.COMPLETED.value, 

364 JobEntry.cancelled != True, # noqa: E712 

365 JobEntry.do_not_cache != True, # noqa: E712 

366 self._job_in_instance(), 

367 ) 

368 .with_for_update() 

369 ) 

370 

371 with self._sql.session(exceptions_to_not_raise_on=[Exception]) as session: 

372 if not (job := session.execute(find_existing_stmt).scalars().first()): 

373 return None 

374 

375 # Reschedule if the new request has a higher priority (a lower value) and the job hasn't started yet.

376 if priority < job.priority and job.stage == OperationStage.QUEUED.value: 

377 LOGGER.info("Job assigned a new priority.", tags=dict(job_name=job.name, priority=priority)) 

378 job.priority = priority 

379 job.assigned = False 

380 

381 return self._create_operation( 

382 session, 

383 job_name=job.name, 

384 request_metadata=request_metadata, 

385 client_identity=client_identity, 

386 ) 

387 

388 def create_operation_for_new_job( 

389 self, 

390 *, 

391 action: Action, 

392 action_digest: Digest, 

393 command: Command, 

394 execute_response: ExecuteResponse | None, 

395 platform_requirements: dict[str, list[str]], 

396 property_label: str, 

397 priority: int, 

398 request_metadata: RequestMetadata | None = None, 

399 client_identity: ClientIdentityEntry | None = None, 

400 ) -> str: 

401 if execute_response is None and self.max_queue_size is not None: 

402 # Using func.count here to avoid generating a subquery in the WHERE 

403 # clause of the resulting query. 

404 # https://docs.sqlalchemy.org/en/14/orm/query.html#sqlalchemy.orm.query.Query.count 

405 queue_count_statement = select(func.count(JobEntry.name)).where( 

406 JobEntry.assigned != True, # noqa: E712 

407 self._job_in_instance(), 

408 JobEntry.property_label == property_label, 

409 JobEntry.stage == OperationStage.QUEUED.value, 

410 ) 

411 else: 

412 queue_count_statement = None 

413 

414 with self._sql.session(exceptions_to_not_raise_on=[Exception]) as session: 

415 if queue_count_statement is not None: 

416 queue_size = session.execute(queue_count_statement).scalar_one() 

417 if self.max_queue_size is not None and queue_size >= self.max_queue_size: 

418 raise ResourceExhaustedError(f"The platform's job queue is full: {property_label=}") 

419 

420 now = datetime.utcnow() 

421 job = JobEntry( 

422 instance_name=current_instance(), 

423 name=str(uuid.uuid4()), 

424 action=action.SerializeToString(), 

425 action_digest=digest_to_string(action_digest), 

426 do_not_cache=action.do_not_cache, 

427 priority=priority, 

428 stage=OperationStage.QUEUED.value, 

429 create_timestamp=now, 

430 queued_timestamp=now, 

431 command=" ".join(command.arguments), 

432 platform_requirements=hash_from_dict(platform_requirements), 

433 platform=self._populate_platform_requirements(session, platform_requirements), 

434 property_label=property_label, 

435 n_tries=1, 

436 ) 

437 if execute_response: 

438 job.stage = OperationStage.COMPLETED.value 

439 job.result = digest_to_string(self.storage.put_message(execute_response)) 

440 job.status_code = execute_response.status.code 

441 job.worker_completed_timestamp = datetime.utcnow() 

442 

443 session.add(job) 

444 

445 return self._create_operation( 

446 session, 

447 job_name=job.name, 

448 request_metadata=request_metadata, 

449 client_identity=client_identity, 

450 ) 

451 

452 def _populate_platform_requirements( 

453 self, session: Session, platform_requirements: dict[str, list[str]] 

454 ) -> list[PlatformEntry]: 

455 if not platform_requirements: 

456 return [] 

457 

458 required_entries = {(k, v) for k, values in platform_requirements.items() for v in values} 

459 conditions = [and_(PlatformEntry.key == k, PlatformEntry.value == v) for k, v in required_entries] 

460 statement = select(PlatformEntry.key, PlatformEntry.value).where(or_(*conditions)) 

461 

462 while missing := required_entries - {(k, v) for [k, v] in session.execute(statement).all()}: 

463 try: 

464 session.execute(insert(PlatformEntry), [{"key": k, "value": v} for k, v in missing]) 

465 session.commit() 

466 except IntegrityError: 

467 session.rollback() 

468 

469 return list(session.execute(select(PlatformEntry).where(or_(*conditions))).scalars()) 

470 
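# Illustrative sketch of the expansion performed above, with made-up requirements:
#
#     platform_requirements = {"OSFamily": ["linux"], "ISA": ["x86-64", "avx2"]}
#     required_entries == {("OSFamily", "linux"), ("ISA", "x86-64"), ("ISA", "avx2")}
#
# Any missing pair is inserted as a PlatformEntry row; an IntegrityError from a
# concurrent writer is rolled back and the loop re-checks until every pair exists.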

471 def create_operation( 

472 self, 

473 job_name: str, 

474 *, 

475 request_metadata: RequestMetadata | None = None, 

476 client_identity: ClientIdentityEntry | None = None, 

477 ) -> str: 

478 with self._sql.session(exceptions_to_not_raise_on=[Exception]) as session: 

479 if not (job := self._get_job(job_name, session, with_for_update=True)): 

480 raise NotFoundError(f"Job name does not exist: [{job_name}]") 

481 

482 if job.cancelled: 

483 raise CancelledError(f"Job {job_name} is cancelled") 

484 

485 return self._create_operation( 

486 session, job_name=job_name, request_metadata=request_metadata, client_identity=client_identity 

487 ) 

488 

489 def _create_operation( 

490 self, 

491 session: Session, 

492 *, 

493 job_name: str, 

494 request_metadata: RequestMetadata | None, 

495 client_identity: ClientIdentityEntry | None, 

496 ) -> str: 

497 

498 client_identity_id: int | None = None 

499 if client_identity: 

500 client_identity_id = self.get_or_create_client_identity_in_store(session, client_identity).id 

501 

502 request_metadata_id: int | None = None 

503 if request_metadata: 

504 request_metadata_id = self.get_or_create_request_metadata_in_store(session, request_metadata).id 

505 

506 request_metadata = request_metadata or RequestMetadata() 

507 operation = OperationEntry( 

508 name=str(uuid.uuid4()), 

509 job_name=job_name, 

510 client_identity_id=client_identity_id, 

511 request_metadata_id=request_metadata_id, 

512 ) 

513 session.add(operation) 

514 return operation.name 

515 

516 def load_operation(self, operation_name: str) -> Operation: 

517 statement = ( 

518 select(OperationEntry).join(JobEntry).where(OperationEntry.name == operation_name, self._job_in_instance()) 

519 ) 

520 with self._sql_ro.session(exceptions_to_not_raise_on=[Exception]) as session: 

521 if op := session.execute(statement).scalars().first(): 

522 return self._load_operation(op) 

523 

524 raise NotFoundError(f"Operation name does not exist: [{operation_name}]") 

525 

526 def _load_operation(self, op: OperationEntry) -> Operation: 

527 job: JobEntry = op.job 

528 

529 operation = operations_pb2.Operation( 

530 name=op.name, 

531 done=job.stage == OperationStage.COMPLETED.value or op.cancelled or job.cancelled, 

532 ) 

533 metadata = ExecuteOperationMetadata( 

534 stage=OperationStage.COMPLETED.value if operation.done else job.stage, # type: ignore[arg-type] 

535 action_digest=string_to_digest(job.action_digest), 

536 stderr_stream_name=job.stderr_stream_name or "", 

537 stdout_stream_name=job.stdout_stream_name or "", 

538 partial_execution_metadata=self.get_execute_action_metadata(job), 

539 ) 

540 operation.metadata.Pack(metadata) 

541 

542 if job.cancelled or op.cancelled: 

543 operation.error.CopyFrom(status_pb2.Status(code=code_pb2.CANCELLED)) 

544 elif job.status_code is not None and job.status_code != code_pb2.OK: 

545 operation.error.CopyFrom(status_pb2.Status(code=job.status_code)) 

546 

547 execute_response: ExecuteResponse | None = None 

548 if job.result: 

549 result_digest = string_to_digest(job.result) 

550 execute_response = self.storage.get_message(result_digest, ExecuteResponse) 

551 if not execute_response: 

552 operation.error.CopyFrom(status_pb2.Status(code=code_pb2.DATA_LOSS)) 

553 elif job.cancelled: 

554 execute_response = ExecuteResponse( 

555 status=status_pb2.Status(code=code_pb2.CANCELLED, message="Execution cancelled") 

556 ) 

557 

558 if execute_response: 

559 if self.action_browser_url: 

560 execute_response.message = f"{self.action_browser_url}/action/{job.action_digest}/" 

561 operation.response.Pack(execute_response) 

562 

563 return operation 

564 

565 def _get_job(self, job_name: str, session: Session, with_for_update: bool = False) -> JobEntry | None: 

566 statement = select(JobEntry).where(JobEntry.name == job_name, self._job_in_instance()) 

567 if with_for_update: 

568 statement = statement.with_for_update() 

569 

570 job: JobEntry | None = session.execute(statement).scalars().first() 

571 if job: 

572 LOGGER.debug( 

573 "Loaded job from db.", 

574 tags=dict(job_name=job_name, job_stage=job.stage, result=job.result, instance_name=job.instance_name), 

575 ) 

576 

577 return job 

578 

579 def get_operation_job_name(self, operation_name: str) -> str | None: 

580 with self._sql_ro.session(exceptions_to_not_raise_on=[Exception]) as session: 

581 if operation := self._get_operation(operation_name, session): 

582 return operation.job_name 

583 return None 

584 

585 def get_operation_request_metadata_by_name(self, operation_name: str) -> RequestMetadata | None: 

586 with self._sql_ro.session(exceptions_to_not_raise_on=[Exception]) as session: 

587 operation = self._get_operation(operation_name, session) 

588 if not operation or not operation.request_metadata: 

589 return None 

590 

591 metadata = RequestMetadata( 

592 tool_details=ToolDetails( 

593 tool_name=operation.request_metadata.tool_name or "", 

594 tool_version=operation.request_metadata.tool_version or "", 

595 ), 

596 action_id=operation.job.action_digest, 

597 correlated_invocations_id=operation.request_metadata.correlated_invocations_id or "", 

598 tool_invocation_id=operation.request_metadata.invocation_id or "", 

599 action_mnemonic=operation.request_metadata.action_mnemonic or "", 

600 configuration_id=operation.request_metadata.configuration_id or "", 

601 target_id=operation.request_metadata.target_id or "", 

602 ) 

603 

604 return metadata 

605 

606 def get_client_identity_by_operation(self, operation_name: str) -> ClientIdentity | None: 

607 with self._sql_ro.session(exceptions_to_not_raise_on=[Exception]) as session: 

608 operation = self._get_operation(operation_name, session) 

609 if not operation or not operation.client_identity: 

610 return None 

611 

612 return ClientIdentity( 

613 actor=operation.client_identity.actor or "", 

614 subject=operation.client_identity.subject or "", 

615 workflow=operation.client_identity.workflow or "", 

616 ) 

617 

618 def _notify_job_updated(self, job_names: str | list[str], session: Session) -> None: 

619 if self._sql.dialect == "postgresql": 

620 if isinstance(job_names, str): 

621 job_names = [job_names] 

622 for job_name in job_names: 

623 session.execute(text(f"NOTIFY job_updated, '{job_name}';")) 

624 

625 def _get_operation(self, operation_name: str, session: Session) -> OperationEntry | None: 

626 statement = ( 

627 select(OperationEntry).join(JobEntry).where(OperationEntry.name == operation_name, self._job_in_instance()) 

628 ) 

629 return session.execute(statement).scalars().first() 

630 

631 def _batch_timeout_jobs(self, job_select_stmt: Select[Any], status_code: int, message: str) -> int: 

632 """Timeout all jobs selected by a query""" 

633 with self._sql.session(sqlite_lock_immediately=True, exceptions_to_not_raise_on=[Exception]) as session: 

634 # Get the full list of jobs to timeout 

635 jobs = [job.name for job in session.execute(job_select_stmt).scalars().all()] 

636 

637 if jobs: 

638 # Put response binary 

639 response = remote_execution_pb2.ExecuteResponse( 

640 status=status_pb2.Status(code=status_code, message=message) 

641 ) 

642 response_binary = response.SerializeToString() 

643 response_digest = create_digest(response_binary) 

644 self.storage.bulk_update_blobs([(response_digest, response_binary)]) 

645 

646 # Update response 

647 stmt_timeout_jobs = ( 

648 update(JobEntry) 

649 .where(JobEntry.name.in_(jobs)) 

650 .values( 

651 stage=OperationStage.COMPLETED.value, 

652 status_code=status_code, 

653 result=digest_to_string(response_digest), 

654 ) 

655 ) 

656 session.execute(stmt_timeout_jobs) 

657 

658 # Notify all jobs updated 

659 self._notify_job_updated(jobs, session) 

660 return len(jobs) 

661 

662 def execution_timer_loop(self, shutdown_requested: threading.Event) -> None: 

663 """Periodically timeout aged executing jobs""" 

664 while not shutdown_requested.is_set(): 

665 try: 

666 self.cancel_jobs_exceeding_execution_timeout(self.max_execution_timeout) 

667 except Exception as e: 

668 LOGGER.exception("Failed to timeout aged executing jobs.", exc_info=e) 

669 shutdown_requested.wait(timeout=self.execution_timer_interval) 

670 

671 @timed(METRIC.SCHEDULER.EXECUTION_TIMEOUT_DURATION) 

672 def cancel_jobs_exceeding_execution_timeout(self, max_execution_timeout: int | None = None) -> None: 

673 if not max_execution_timeout: 

674 return 

675 

676 # Get the full list of jobs exceeding execution timeout 

677 stale_jobs_statement = ( 

678 select(JobEntry) 

679 .where( 

680 JobEntry.stage == OperationStage.EXECUTING.value, 

681 JobEntry.worker_start_timestamp <= datetime.utcnow() - timedelta(seconds=max_execution_timeout), 

682 ) 

683 .with_for_update(skip_locked=True) 

684 ) 

685 with self._sql.session(sqlite_lock_immediately=True, exceptions_to_not_raise_on=[Exception]) as session: 

686 jobs = session.execute(stale_jobs_statement).scalars().all() 

687 if not jobs: 

688 return 

689 

690 response = remote_execution_pb2.ExecuteResponse( 

691 status=status_pb2.Status( 

692 code=code_pb2.DEADLINE_EXCEEDED, 

693 message="Execution didn't finish within timeout threshold", 

694 ) 

695 ) 

696 response_binary = response.SerializeToString() 

697 response_digest = create_digest(response_binary) 

698 

699 # When running with a proxying client, we might need to specify instance. 

700 with instance_context(jobs[0].instance_name): 

701 self.storage.bulk_update_blobs([(response_digest, response_binary)]) 

702 

703 for job in jobs: 

704 executing_duration = datetime.utcnow() - (job.worker_start_timestamp or datetime.utcnow()) 

705 LOGGER.warning( 

706 "Job has been executing for too long. Cancelling.", 

707 tags=dict( 

708 job_name=job.name, 

709 executing_duration=executing_duration, 

710 max_execution_timeout=max_execution_timeout, 

711 ), 

712 ) 

713 for op in job.operations: 

714 op.cancelled = True 

715 for lease in job.active_leases: 

716 lease.state = LeaseState.CANCELLED.value 

717 job.worker_completed_timestamp = datetime.utcnow() 

718 job.stage = OperationStage.COMPLETED.value 

719 job.cancelled = True 

720 job.result = digest_to_string(response_digest) 

721 

722 for job in jobs: 

723 self._notify_job_updated(job.name, session) 

724 

725 publish_counter_metric(METRIC.SCHEDULER.EXECUTION_TIMEOUT_COUNT, len(jobs)) 

726 

727 def cancel_operation(self, operation_name: str) -> None: 

728 statement = ( 

729 select(JobEntry) 

730 .join(OperationEntry) 

731 .where(OperationEntry.name == operation_name, self._job_in_instance()) 

732 .with_for_update() 

733 ) 

734 with self._sql.session() as session: 

735 if not (job := session.execute(statement).scalars().first()): 

736 raise NotFoundError(f"Operation name does not exist: [{operation_name}]") 

737 

738 if job.stage == OperationStage.COMPLETED.value or job.cancelled: 

739 return 

740 

741 for op in job.operations: 

742 if op.name == operation_name: 

743 if op.cancelled: 

744 return 

745 op.cancelled = True 

746 

747 if all(op.cancelled for op in job.operations): 

748 for lease in job.active_leases: 

749 lease.state = LeaseState.CANCELLED.value 

750 job.worker_completed_timestamp = datetime.utcnow() 

751 job.stage = OperationStage.COMPLETED.value 

752 job.cancelled = True 

753 

754 self._notify_job_updated(job.name, session) 

755 

756 def list_operations( 

757 self, 

758 operation_filters: list[OperationFilter] | None = None, 

759 page_size: int | None = None, 

760 page_token: str | None = None, 

761 ) -> tuple[list[operations_pb2.Operation], str]: 

762 # Build filters and sort order 

763 sort_keys = DEFAULT_SORT_KEYS 

764 custom_filters = None 

765 platform_filters = [] 

766 if operation_filters: 

767 # Extract custom sort order (if present) 

768 specified_sort_keys, non_sort_filters = extract_sort_keys(operation_filters) 

769 

770 # Only override sort_keys if there were sort keys actually present in the filter string 

771 if specified_sort_keys: 

772 sort_keys = specified_sort_keys 

773 # Attach the operation name as a sort key for a deterministic order 

774 # This will ensure that the ordering of results is consistent between queries 

775 if not any(sort_key.name == "name" for sort_key in sort_keys): 

776 sort_keys.append(SortKey(name="name", descending=False)) 

777 

778 # Finally, compile the non-sort filters into a filter list 

779 custom_filters = build_custom_filters(non_sort_filters) 

780 platform_filters = [f for f in non_sort_filters if f.parameter == "platform"] 

781 

782 sort_columns = build_sort_column_list(sort_keys) 

783 

784 with self._sql_ro.session(exceptions_to_not_raise_on=[Exception]) as session: 

785 statement = ( 

786 select(OperationEntry) 

787 .join(JobEntry, OperationEntry.job_name == JobEntry.name) 

788 .outerjoin(RequestMetadataEntry) 

789 .outerjoin(ClientIdentityEntry) 

790 ) 

791 statement = statement.filter(self._job_in_instance()) 

792 

793 # If we're filtering by platform, filter using a subquery containing job names 

794 # which match the specified platform properties. 

795 # 

796 # NOTE: A platform filter using `!=` will return only jobs which set that platform 

797 # property to an explicitly different value; jobs which don't set the property are 

798 # filtered out. 

799 if platform_filters: 

800 platform_clauses = [] 

801 for platform_filter in platform_filters: 

802 key, value = platform_filter.value.split(":", 1) 

803 platform_clauses.append( 

804 and_(PlatformEntry.key == key, platform_filter.operator(PlatformEntry.value, value)) 

805 ) 

806 

807 job_name_subquery = ( 

808 select(job_platform_association.c.job_name) 

809 .filter( 

810 job_platform_association.c.platform_id.in_( 

811 select(PlatformEntry.id).filter(or_(*platform_clauses)) 

812 ) 

813 ) 

814 .group_by(job_platform_association.c.job_name) 

815 .having(func.count() == len(platform_filters)) 

816 ) 

817 statement = statement.filter(JobEntry.name.in_(job_name_subquery)) 

818 

819 # Apply custom filters (if present) 

820 if custom_filters: 

821 statement = statement.filter(*custom_filters) 

822 

823 # Apply sort order 

824 statement = statement.order_by(*sort_columns) 

825 

826 # Apply pagination filter 

827 if page_token: 

828 page_filter = build_page_filter(page_token, sort_keys) 

829 statement = statement.filter(page_filter) 

830 if page_size: 

831 # We limit the number of operations we fetch to the page_size. However, we 

832 # fetch an extra operation to determine whether we need to provide a 

833 # next_page_token. 

834 statement = statement.limit(page_size + 1) 

835 

836 operations = list(session.execute(statement).scalars().all()) 

837 

838 if not page_size or not operations: 

839 next_page_token = "" 

840 

841 # If the number of results we got is less than or equal to our page_size, 

842 # we're done with the operations listing and don't need to provide another 

843 # page token 

844 elif len(operations) <= page_size: 

845 next_page_token = "" 

846 else: 

847 # Drop the last operation since we have an extra 

848 operations.pop() 

849 # Our page token will be the last row of our set 

850 next_page_token = build_page_token(operations[-1], sort_keys) 

851 return [self._load_operation(operation) for operation in operations], next_page_token 

852 

853 def list_workers(self, name_filter: str, page_number: int, page_size: int) -> tuple[list[BotEntry], int]: 

854 stmt = select(BotEntry, func.count().over().label("total")) 

855 stmt = stmt.options(selectinload(BotEntry.job).selectinload(JobEntry.operations)) 

856 stmt = stmt.where( 

857 or_( 

858 BotEntry.name.ilike(f"%{name_filter}%"), 

859 BotEntry.bot_id.ilike(f"%{name_filter}%"), 

860 ), 

861 BotEntry.instance_name == current_instance(), 

862 ) 

863 stmt = stmt.order_by(BotEntry.bot_id) 

864 

865 if page_size: 

866 stmt = stmt.limit(page_size) 

867 if page_number > 1: 

868 stmt = stmt.offset((page_number - 1) * page_size) 

869 

870 with self._sql.scoped_session() as session: 

871 results = session.execute(stmt).all() 

872 count = cast(int, results[0].total) if results else 0 

873 session.expunge_all() 

874 

875 return [r[0] for r in results], count 

876 
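# Illustrative usage sketch for paginated worker listings (the filter and page
# sizes are made-up examples):
#
#     workers, total = scheduler.list_workers(name_filter="", page_number=1, page_size=50)
#     # `total` is the overall match count; `workers` holds at most 50 BotEntry rows.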

877 def get_metrics(self) -> SchedulerMetrics | None: 

878 # Skip publishing overall scheduler metrics if we have recently published them 

879 last_publish_time = self._last_scheduler_metrics_publish_time 

880 time_since_publish = None 

881 if last_publish_time: 

882 time_since_publish = datetime.utcnow() - last_publish_time 

883 if time_since_publish and time_since_publish < self._scheduler_metrics_publish_interval: 

884 # Published too recently, skip 

885 return None 

886 

887 metrics: SchedulerMetrics = {} 

888 # metrics to gather: (category_name, function_returning_query, callback_function) 

889 

890 try: 

891 with self._sql_ro.session(exceptions_to_not_raise_on=[Exception]) as session: 

892 # To utilize "ix_jobs_stage_property_label" B-tree index we query 

893 # `stage < COMPLETED.value` rather than `stage != COMPLETED.value`. 

894 results = session.execute( 

895 select( 

896 JobEntry.stage.label("job_stage"), 

897 JobEntry.property_label.label("property_label"), 

898 func.count(JobEntry.name).label("job_count"), 

899 ) 

900 .where(JobEntry.stage < OperationStage.COMPLETED.value) 

901 .group_by(JobEntry.stage, JobEntry.property_label), 

902 ).all() 

903 

904 jobs_metrics = {} 

905 for stage in OperationStage: 

906 if stage != OperationStage.COMPLETED: 

907 jobs_metrics[stage.name, "unknown"] = 0 

908 

909 for job_stage, property_label, job_count in results: 

910 jobs_metrics[OperationStage(job_stage).name, property_label] = cast(int, job_count) 

911 

912 metrics["jobs"] = jobs_metrics 

913 except DatabaseError: 

914 LOGGER.warning("Unable to gather metrics due to a Database Error.") 

915 return {} 

916 

917 # This is only updated within the metrics asyncio loop, so there are no race conditions.

918 self._last_scheduler_metrics_publish_time = datetime.utcnow() 

919 

920 return metrics 

921 

922 def _queued_jobs_by_capability(self, capability_hash: str) -> Select[Any]: 

923 return ( 

924 select(JobEntry) 

925 .with_for_update(skip_locked=True) 

926 .where( 

927 JobEntry.assigned != True, # noqa: E712 

928 self._job_in_instance(), 

929 JobEntry.platform_requirements == capability_hash, 

930 JobEntry.stage == OperationStage.QUEUED.value, 

931 ) 

932 ) 

933 

934 def assign_n_leases_by_priority( 

935 self, 

936 *, 

937 capability_hash: str, 

938 bot_names: list[str], 

939 ) -> list[str]: 

940 job_statement = self._queued_jobs_by_capability(capability_hash).order_by( 

941 JobEntry.priority, JobEntry.queued_timestamp 

942 ) 

943 return self._assign_n_leases(job_statement=job_statement, bot_names=bot_names) 

944 

945 def assign_n_leases_by_age( 

946 self, 

947 *, 

948 capability_hash: str, 

949 bot_names: list[str], 

950 ) -> list[str]: 

951 job_statement = self._queued_jobs_by_capability(capability_hash).order_by(JobEntry.queued_timestamp) 

952 return self._assign_n_leases(job_statement=job_statement, bot_names=bot_names) 

953 

954 @timed(METRIC.SCHEDULER.ASSIGNMENT_DURATION) 

955 def _assign_n_leases(self, *, job_statement: Select[Any], bot_names: list[str]) -> list[str]: 

956 bot_statement = ( 

957 select(BotEntry) 

958 .with_for_update(skip_locked=True) 

959 .where( 

960 BotEntry.lease_id.is_(None), 

961 self._bot_in_instance(), 

962 BotEntry.name.in_(bot_names), 

963 BotEntry.expiry_time > datetime.utcnow(), 

964 ) 

965 ) 

966 

967 try: 

968 with self._sql.session(sqlite_lock_immediately=True, exceptions_to_not_raise_on=[Exception]) as session: 

969 jobs = session.execute(job_statement.limit(len(bot_names))).scalars().all() 

970 bots = session.execute(bot_statement.limit(len(jobs))).scalars().all() 

971 

972 assigned_bot_names: list[str] = [] 

973 for job, bot in zip(jobs, bots): 

974 job.assigned = True 

975 job.queued_time_duration = int((datetime.utcnow() - job.queued_timestamp).total_seconds()) 

976 job.worker_start_timestamp = datetime.utcnow() 

977 job.worker_completed_timestamp = None 

978 bot.lease_id = job.name 

979 bot.last_update_timestamp = datetime.utcnow() 

980 if job.active_leases: 

981 lease = job.active_leases[0] 

982 LOGGER.debug( 

983 "Reassigned existing lease.", 

984 tags=dict( 

985 job_name=job.name, 

986 bot_id=bot.bot_id, 

987 bot_name=bot.name, 

988 prev_lease_state=lease.state, 

989 prev_lease_status=lease.status, 

990 prev_bot_id=lease.worker_name, 

991 ), 

992 ) 

993 lease.state = LeaseState.PENDING.value 

994 lease.status = None 

995 lease.worker_name = bot.bot_id 

996 else: 

997 LOGGER.debug( 

998 "Assigned new lease.", tags=dict(job_name=job.name, bot_id=bot.bot_id, bot_name=bot.name) 

999 ) 

1000 session.add( 

1001 LeaseEntry( 

1002 job_name=job.name, 

1003 state=LeaseState.PENDING.value, 

1004 status=None, 

1005 worker_name=bot.bot_id, 

1006 ) 

1007 ) 

1008 assigned_bot_names.append(bot.name) 

1009 

1010 return assigned_bot_names 

1011 except DatabaseError: 

1012 LOGGER.warning("Will not assign any leases this time due to a Database Error.") 

1013 return [] 

1014 

1015 def queue_timer_loop(self, shutdown_requested: threading.Event) -> None: 

1016 """Periodically timeout aged queued jobs""" 

1017 

1018 if not (opts := self.queue_timeout_options): 

1019 return 

1020 

1021 job_max_age = opts.job_max_age 

1022 period = opts.handling_period 

1023 limit = opts.max_handling_window 

1024 

1025 last_timeout_time = datetime.utcnow() 

1026 while not shutdown_requested.is_set(): 

1027 now = datetime.utcnow() 

1028 if now - last_timeout_time < period: 

1029 LOGGER.info(f"Job queue timeout thread sleeping for {period} seconds") 

1030 shutdown_requested.wait(timeout=period.total_seconds()) 

1031 continue 

1032 

1033 timeout_jobs_scheduled_before = now - job_max_age 

1034 try: 

1035 with timer(METRIC.SCHEDULER.QUEUE_TIMEOUT_DURATION): 

1036 num_timeout = self._timeout_queued_jobs_scheduled_before(timeout_jobs_scheduled_before, limit) 

1037 LOGGER.info(f"Timed-out {num_timeout} queued jobs scheduled before {timeout_jobs_scheduled_before}") 

1038 if num_timeout > 0: 

1039 publish_counter_metric(METRIC.SCHEDULER.QUEUE_TIMEOUT_COUNT, num_timeout) 

1040 

1041 except Exception as e: 

1042 LOGGER.exception("Failed to timeout aged queued jobs.", exc_info=e) 

1043 finally: 

1044 last_timeout_time = now 

1045 

1046 def _timeout_queued_jobs_scheduled_before(self, dt: datetime, limit: int) -> int: 

1047 jobs_to_timeout_stmt = ( 

1048 select(JobEntry) 

1049 .where(JobEntry.stage == OperationStage.QUEUED.value) 

1050 .where(JobEntry.queued_timestamp < dt) 

1051 .limit(limit) 

1052 ) 

1053 return self._batch_timeout_jobs( 

1054 jobs_to_timeout_stmt, code_pb2.UNAVAILABLE, "Operation has been queued for too long" 

1055 ) 

1056 

1057 def prune_timer_loop(self, shutdown_requested: threading.Event) -> None: 

1058 """Running in a background thread, this method wakes up periodically and deletes older records 

1059 from the jobs tables using configurable parameters""" 

1060 

1061 if not (opts := self.pruning_options): 

1062 return 

1063 

1064 job_max_age = opts.job_max_age 

1065 pruning_period = opts.handling_period 

1066 limit = opts.max_handling_window 

1067 

1068 utc_last_prune_time = datetime.utcnow() 

1069 while not shutdown_requested.is_set(): 

1070 utcnow = datetime.utcnow() 

1071 if (utcnow - pruning_period) < utc_last_prune_time: 

1072 LOGGER.info(f"Pruner thread sleeping for {pruning_period}(until {utcnow + pruning_period})") 

1073 shutdown_requested.wait(timeout=pruning_period.total_seconds()) 

1074 continue 

1075 

1076 delete_before_datetime = utcnow - job_max_age 

1077 try: 

1078 num_rows = self._delete_jobs_prior_to(delete_before_datetime, limit) 

1079 LOGGER.info(f"Pruned {num_rows} row(s) from the jobs table older than {delete_before_datetime}") 

1080 except Exception: 

1081 LOGGER.exception("Caught exception while deleting jobs records.") 

1082 finally: 

1083 # Update even if error occurred to avoid potentially infinitely retrying 

1084 utc_last_prune_time = utcnow 

1085 

1086 LOGGER.info("Exiting pruner thread.") 

1087 

1088 @timed(METRIC.SCHEDULER.PRUNE_DURATION) 

1089 def _delete_jobs_prior_to(self, delete_before_datetime: datetime, limit: int) -> int: 

1090 """Deletes older records from the jobs tables constrained by `delete_before_datetime` and `limit`""" 

1091 delete_stmt = delete(JobEntry).where( 

1092 JobEntry.name.in_( 

1093 select(JobEntry.name) 

1094 .with_for_update(skip_locked=True) 

1095 .where(JobEntry.worker_completed_timestamp <= delete_before_datetime) 

1096 .limit(limit) 

1097 ), 

1098 ) 

1099 

1100 with self._sql.session() as session: 

1101 options = {"synchronize_session": "fetch"} 

1102 num_rows_deleted: int = session.execute(delete_stmt, execution_options=options).rowcount 

1103 

1104 if num_rows_deleted: 

1105 publish_counter_metric(METRIC.SCHEDULER.PRUNE_COUNT, num_rows_deleted) 

1106 

1107 return num_rows_deleted 

1108 

1109 def _insert_on_conflict_do_nothing(self, model: type[OrmBase]) -> Insert: 

1110 # `Insert.on_conflict_do_nothing` is a SQLAlchemy "generative method": it

1111 # returns a modified copy of the statement it is called on. For 

1112 # some reason mypy can't understand this, so the errors are ignored here. 

1113 if self._sql.dialect == "sqlite": 

1114 sqlite_insert: sqlite.Insert = sqlite.insert(model) 

1115 return sqlite_insert.on_conflict_do_nothing() 

1116 

1117 elif self._sql.dialect == "postgresql": 

1118 insertion: postgresql.Insert = postgresql.insert(model) 

1119 return insertion.on_conflict_do_nothing() 

1120 

1121 else: 

1122 # Fall back to the non-specific insert implementation. This doesn't 

1123 # support `ON CONFLICT DO NOTHING`, so callers need to be careful to 

1124 # still catch IntegrityErrors if other database backends are possible. 

1125 return insert(model) 

1126 
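# Illustrative sketch of how the dialect-specific statement is used (the values
# are made-up examples). On PostgreSQL and SQLite duplicate rows are skipped
# silently; on other backends the caller must catch IntegrityError instead:
#
#     stmt = self._insert_on_conflict_do_nothing(RequestMetadataEntry)
#     session.execute(stmt.values({"tool_name": "example-tool", "tool_version": "1.0"}))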

1127 def get_or_create_client_identity_in_store( 

1128 self, session: Session, client_id: ClientIdentityEntry 

1129 ) -> ClientIdentityEntry: 

1130 """Get the ClientIdentity in the storage or create one. 

1131 This helper function essentially makes sure the `client_id` is created during the transaction 

1132 

1133 Args: 

1134 session (Session): sqlalchemy Session 

1135 client_id (ClientIdentityEntry): identity of the client that creates an operation 

1136 

1137 Returns: 

1138 ClientIdentityEntry: identity of the client that creates an operation 

1139 """ 

1140 insertion = self._insert_on_conflict_do_nothing(ClientIdentityEntry) 

1141 insertion = insertion.values( 

1142 { 

1143 "instance": client_id.instance, 

1144 "workflow": client_id.workflow, 

1145 "actor": client_id.actor, 

1146 "subject": client_id.subject, 

1147 } 

1148 ) 

1149 try: 

1150 session.execute(insertion) 

1151 

1152 # Handle unique constraint violation when using an unsupported database (i.e. not PostgreSQL or SQLite)

1153 except IntegrityError: 

1154 LOGGER.debug("Handled IntegrityError when inserting client identity.") 

1155 

1156 stmt = ( 

1157 select(ClientIdentityEntry) 

1158 .where(ClientIdentityEntry.instance == client_id.instance) 

1159 .where(ClientIdentityEntry.workflow == client_id.workflow) 

1160 .where(ClientIdentityEntry.actor == client_id.actor) 

1161 .where(ClientIdentityEntry.subject == client_id.subject) 

1162 ) 

1163 

1164 result: ClientIdentityEntry = session.execute(stmt).scalar_one() 

1165 return result 

1166 

1167 def get_or_create_request_metadata_in_store( 

1168 self, session: Session, request_metadata: RequestMetadata 

1169 ) -> RequestMetadataEntry: 

1170 insertion = self._insert_on_conflict_do_nothing(RequestMetadataEntry) 

1171 insertion = insertion.values( 

1172 { 

1173 "action_mnemonic": request_metadata.action_mnemonic, 

1174 "configuration_id": request_metadata.configuration_id, 

1175 "correlated_invocations_id": request_metadata.correlated_invocations_id, 

1176 "invocation_id": request_metadata.tool_invocation_id, 

1177 "target_id": request_metadata.target_id, 

1178 "tool_name": request_metadata.tool_details.tool_name, 

1179 "tool_version": request_metadata.tool_details.tool_version, 

1180 } 

1181 ) 

1182 try: 

1183 session.execute(insertion) 

1184 

1185 # Handle unique constraint violation when using an unsupported database (i.e. not PostgreSQL or SQLite)

1186 except IntegrityError: 

1187 LOGGER.debug("Handled IntegrityError when inserting request metadata.") 

1188 

1189 stmt = ( 

1190 select(RequestMetadataEntry) 

1191 .where(RequestMetadataEntry.action_mnemonic == request_metadata.action_mnemonic) 

1192 .where(RequestMetadataEntry.configuration_id == request_metadata.configuration_id) 

1193 .where(RequestMetadataEntry.correlated_invocations_id == request_metadata.correlated_invocations_id) 

1194 .where(RequestMetadataEntry.invocation_id == request_metadata.tool_invocation_id) 

1195 .where(RequestMetadataEntry.target_id == request_metadata.target_id) 

1196 .where(RequestMetadataEntry.tool_name == request_metadata.tool_details.tool_name) 

1197 .where(RequestMetadataEntry.tool_version == request_metadata.tool_details.tool_version) 

1198 ) 

1199 

1200 result: RequestMetadataEntry = session.execute(stmt).scalar_one() 

1201 return result 

1202 

1203 def add_bot_entry( 

1204 self, *, bot_session_id: str, bot_session_status: int, bot_property_labels: list[str] = [] 

1205 ) -> str: 

1206 if not bot_property_labels: 

1207 bot_property_labels = ["unknown"] 

1208 

1209 with self._sql.session() as session: 

1210 # Check if bot_id is already known. If yes, all leases associated with 

1211 # it are requeued and the existing record deleted. A new record is then 

1212 # created with the new bot_id/name combination, just as it would be if

1213 # the bot_id were unknown.

1214 locate_bot_stmt = ( 

1215 select(BotEntry).where(BotEntry.bot_id == bot_session_id, self._bot_in_instance()).with_for_update() 

1216 ) 

1217 self._close_bot_sessions(session, session.execute(locate_bot_stmt).scalars().all()) 

1218 

1219 bot_name = f"{current_instance()}/{uuid.uuid4()}"

1220 session.add( 

1221 BotEntry( 

1222 name=bot_name, 

1223 bot_id=bot_session_id, 

1224 last_update_timestamp=datetime.utcnow(), 

1225 lease_id=None, 

1226 bot_status=bot_session_status, 

1227 property_labels=[], 

1228 instance_name=current_instance(), 

1229 expiry_time=datetime.utcnow() + timedelta(seconds=self.bot_session_keepalive_timeout), 

1230 ) 

1231 ) 

1232 

1233 for label in bot_property_labels: 

1234 session.add(PropertyLabelEntry(property_label=label, bot_name=bot_name)) 

1235 

1236 return bot_name 

1237 
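# Illustrative usage sketch (the session id and labels are made-up examples, and
# BotStatus.OK is assumed to be the healthy status value):
#
#     bot_name = scheduler.add_bot_entry(
#         bot_session_id="worker-01",
#         bot_session_status=BotStatus.OK.value,
#         bot_property_labels=["unknown"],
#     )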

1238 def close_bot_sessions(self, bot_name: str) -> None: 

1239 with self._sql.session() as session: 

1240 locate_bot_stmt = ( 

1241 select(BotEntry).where(BotEntry.name == bot_name, self._bot_in_instance()).with_for_update() 

1242 ) 

1243 self._close_bot_sessions(session, session.execute(locate_bot_stmt).scalars().all()) 

1244 

1245 def _close_bot_sessions(self, session: Session, bots: Sequence[BotEntry]) -> None: 

1246 for bot in bots: 

1247 log_tags = { 

1248 "instance_name": try_current_instance(), 

1249 "request.bot_name": bot.name, 

1250 "request.bot_id": bot.bot_id, 

1251 "request.bot_status": bot.bot_status, 

1252 } 

1253 LOGGER.debug("Closing bot session.", tags=log_tags) 

1254 if bot.lease_id: 

1255 if job := self._get_job(bot.lease_id, session, with_for_update=True): 

1256 for db_lease in job.active_leases: 

1257 lease_tags = {**log_tags, "db.lease_id": job.name, "db.lease_state": db_lease.state} 

1258 LOGGER.debug("Reassigning lease for bot session.", tags=lease_tags) 

1259 self._retry_job_lease(session, job, db_lease) 

1260 self._notify_job_updated(job.name, session) 

1261 session.delete(bot) 

1262 

1263 def session_expiry_timer_loop(self, shutdown_requested: threading.Event) -> None: 

1264 LOGGER.info("Starting BotSession reaper.", tags=dict(keepalive_timeout=self.bot_session_keepalive_timeout)) 

1265 while not shutdown_requested.is_set(): 

1266 try: 

1267 while self.reap_expired_sessions(): 

1268 if shutdown_requested.is_set(): 

1269 break 

1270 except Exception as exception: 

1271 LOGGER.exception(exception) 

1272 shutdown_requested.wait(timeout=self.session_expiry_interval) 

1273 

1274 def reap_expired_sessions(self) -> bool: 

1275 """ 

1276 Find and close expired bot sessions. Returns True if sessions were closed. 

1277 Only closes a few sessions to minimize time in transaction. 

1278 """ 

1279 

1280 with self._sql.session() as session: 

1281 locate_bot_stmt = ( 

1282 select(BotEntry) 

1283 .where(BotEntry.expiry_time < datetime.utcnow()) 

1284 .order_by(BotEntry.expiry_time.desc()) 

1285 .with_for_update(skip_locked=True) 

1286 .limit(5) 

1287 ) 

1288 if bots := cast(list[BotEntry], session.execute(locate_bot_stmt).scalars().all()): 

1289 bots_by_instance: dict[str, list[BotEntry]] = defaultdict(list) 

1290 for bot in bots: 

1291 LOGGER.warning( 

1292 "BotSession has expired.", 

1293 tags=dict( 

1294 name=bot.name, bot_id=bot.bot_id, instance_name=bot.instance_name, deadline=bot.expiry_time 

1295 ), 

1296 ) 

1297 bots_by_instance[bot.instance_name].append(bot) 

1298 for instance_name, instance_bots in bots_by_instance.items(): 

1299 with instance_context(instance_name): 

1300 self._close_bot_sessions(session, instance_bots) 

1301 return True 

1302 return False 

1303 

1304 def _publish_job_duration( 

1305 self, start: Timestamp | None, end: Timestamp | None, state: str, property_label: str 

1306 ) -> None: 

1307 start_set = start is not None and (start.seconds > 0 or start.nanos > 0) 

1308 end_set = end is not None and (end.seconds > 0 or end.nanos > 0) 

1309 if start_set and end_set: 

1310 publish_timer_metric( 

1311 METRIC.JOB.DURATION, 

1312 end.ToDatetime() - start.ToDatetime(), # type: ignore[union-attr] 

1313 state=state, 

1314 propertyLabel=property_label, 

1315 ) 

1316 
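# Illustrative sketch with made-up timestamps: a duration metric is only
# published when both protobuf Timestamps have been set to a non-zero value.
#
#     start, end = Timestamp(), Timestamp()
#     start.FromDatetime(datetime(2025, 1, 1, 12, 0, 0))
#     end.FromDatetime(datetime(2025, 1, 1, 12, 0, 42))
#     self._publish_job_duration(start, end, state="COMPLETED", property_label="unknown")
#     # publishes a 42-second METRIC.JOB.DURATION timer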

1317 @timed(METRIC.SCHEDULER.SYNCHRONIZE_DURATION) 

1318 def synchronize_bot_lease( 

1319 self, 

1320 bot_name: str, 

1321 bot_id: str, 

1322 bot_status: int, 

1323 session_lease: Lease | None, 

1324 partial_execution_metadata: dict[str, ExecutedActionMetadata] | None = None, 

1325 ) -> Lease | None: 

1326 log_tags = { 

1327 "instance_name": try_current_instance(), 

1328 "request.bot_id": bot_id, 

1329 "request.bot_status": bot_status, 

1330 "request.bot_name": bot_name, 

1331 "request.lease_id": session_lease.id if session_lease else "", 

1332 "request.lease_state": session_lease.state if session_lease else "", 

1333 } 

1334 

1335 with self._sql.session(exceptions_to_not_raise_on=[Exception]) as session: 

1336 locate_bot_stmt = ( 

1337 select(BotEntry).where(BotEntry.bot_id == bot_id, self._bot_in_instance()).with_for_update() 

1338 ) 

1339 bots: Sequence[BotEntry] = session.execute(locate_bot_stmt).scalars().all() 

1340 if not bots: 

1341 raise InvalidArgumentError(f"Bot does not exist while validating leases. {log_tags}") 

1342 

1343 # This is a tricky case. This case happens when a new bot session is created while an older 

1344 # session for a bot id is waiting on leases. This can happen when a worker reboots but the 

1345 # connection context takes a long time to close. In this case, we DO NOT want to update anything 

1346 # in the database, because the work/lease has already been re-assigned to a new session. 

1347 # Closing anything in the database at this point would cause the newly restarted worker 

1348 # to get cancelled prematurely. 

1349 if len(bots) == 1 and bots[0].name != bot_name: 

1350 raise BotSessionMismatchError( 

1351 "Mismatch between client supplied bot_id/bot_name and buildgrid database record. " 

1352 f"db.bot_name=[{bots[0].name}] {log_tags}" 

1353 ) 

1354 

1355 # Everything at this point is wrapped in try/catch, so we can raise BotSessionMismatchError or 

1356 # BotSessionClosedError and have the session be closed if preconditions from here out fail. 

1357 try: 

1358 # There should never be time when two bot sessions exist for the same bot id. We have logic to 

1359 # assert that old database entries for a given bot id are closed and deleted prior to making a 

1360 # new one. If this case happens shut everything down, so we can hopefully recover. 

1361 if len(bots) > 1: 

1362 raise BotSessionMismatchError( 

1363 "Bot id is registered to more than one bot session. " 

1364 f"names=[{', '.join(bot.name for bot in bots)}] {log_tags}" 

1365 ) 

1366 

1367 bot = bots[0] 

1368 log_tags["db.lease_id"] = bot.lease_id 

1369 

1370 # Validate that the lease_id matches the client and database if both are supplied. 

1371 if (session_lease and session_lease.id and bot.lease_id) and (session_lease.id != bot.lease_id): 

1372 raise BotSessionMismatchError( 

1373 f"Mismatch between client supplied lease_id and buildgrid database record. {log_tags}" 

1374 ) 

1375 

1376 # Update the expiry time. 

1377 bot.expiry_time = datetime.utcnow() + timedelta(seconds=self.bot_session_keepalive_timeout) 

1378 bot.last_update_timestamp = datetime.utcnow() 

1379 bot.bot_status = bot_status 

1380 

1381 # Validate the cases where the database doesn't know about any leases. 

1382 if bot.lease_id is None: 

1383 # If there's no lease in the database or session, we have nothing to update! 

1384 if not session_lease: 

1385 LOGGER.debug("No lease in session or database. Skipping.", tags=log_tags) 

1386 return None 

1387 

1388 # If the database has no lease, but the work is completed, we probably timed out the last call. 

1389 if session_lease.state == LeaseState.COMPLETED.value: 

1390 LOGGER.debug("No lease in database, but session lease is completed. Skipping.", tags=log_tags) 

1391 return None 

1392 

1393 # Otherwise, the bot session has a lease that the server doesn't know about. Bad bad bad. 

1394 raise BotSessionClosedError(f"Bot session lease id does not match the database. {log_tags}") 

1395 

1396 # Let's now lock the job so no more state transitions occur while we perform our updates. 

1397 job = self._get_job(bot.lease_id, session, with_for_update=True) 

1398 if not job: 

1399 raise BotSessionClosedError(f"Bot session lease id points to non-existent job. {log_tags}") 

1400 

1401 # If we don't have any leases assigned to the job now, someone interrupted us before locking. 

1402 # Disconnect our bot from mutating this job. 

1403 if not job.leases: 

1404 raise BotSessionClosedError(f"Leases were changed while job was being locked. {log_tags}") 

1405 

1406 db_lease = job.leases[0] 

1407 log_tags["db.lease_state"] = db_lease.state 

1408 

1409 # Update Partial Execution Metadata: 

1410 # 

1411 # Update the job table in the database with the partial execution metadata from the worker. 

1412 # This is included in the UpdateBotSession GRPC call and should contain partial execution metadata 

1413 # for each lease. The job.name is the same as the lease_id. 

1414 

1415 if partial_execution_metadata: 

1416 if metadata := partial_execution_metadata.get(job.name): 

1417 if metadata.HasField("input_fetch_start_timestamp"): 

1418 job.input_fetch_start_timestamp = metadata.input_fetch_start_timestamp.ToDatetime() 

1419 if metadata.HasField("input_fetch_completed_timestamp"): 

1420 job.input_fetch_completed_timestamp = metadata.input_fetch_completed_timestamp.ToDatetime() 

1421 if metadata.HasField("output_upload_start_timestamp"): 

1422 job.output_upload_start_timestamp = metadata.output_upload_start_timestamp.ToDatetime() 

1423 if metadata.HasField("output_upload_completed_timestamp"): 

1424 job.output_upload_completed_timestamp = ( 

1425 metadata.output_upload_completed_timestamp.ToDatetime() 

1426 ) 

1427 if metadata.HasField("execution_start_timestamp"): 

1428 job.execution_start_timestamp = metadata.execution_start_timestamp.ToDatetime() 

1429 if metadata.HasField("execution_completed_timestamp"): 

1430 job.execution_completed_timestamp = metadata.execution_completed_timestamp.ToDatetime() 

1431 

1432 # Assign: 

1433 # 

1434 # If the lease is in the PENDING state, this means that it is a new lease for the worker, which 

1435 # it must acknowledge (the next time it calls UpdateBotSession) by changing the state to ACTIVE. 

1436 # 

1437 # Leases contain a “payload,” which is an Any proto that must be understandable to the bot. 

1438 # 

1439 # If at any time the bot issues a call to UpdateBotSession that is inconsistent with what the service 

1440 # expects, the service can take appropriate action. For example, the service may have assigned a 

1441 # lease to a bot, but the call gets interrupted before the bot receives the message, perhaps because 

1442 # the UpdateBotSession call times out. As a result, the next call to UpdateBotSession from the bot 

1443 # will not include the lease, and the service can immediately conclude that the lease needs to be 

1444 # reassigned. 

1445 # 

1446 if not session_lease: 

1447 if db_lease.state != LeaseState.PENDING.value: 

1448 raise BotSessionClosedError( 

1449 f"Session has no lease and database entry not in pending state. {log_tags}" 

1450 ) 

1451 

1452 job.stage = OperationStage.EXECUTING.value 

1453 if self.logstream_channel and self.logstream_instance is not None: 

1454 try: 

1455 action_digest = string_to_digest(job.action_digest) 

1456 parent_base = f"{action_digest.hash}_{action_digest.size_bytes}_{int(time())}" 

1457 with logstream_client(self.logstream_channel, self.logstream_instance) as ls_client: 

1458 stdout_stream = ls_client.create(f"{parent_base}_stdout") 

1459 stderr_stream = ls_client.create(f"{parent_base}_stderr") 

1460 job.stdout_stream_name = stdout_stream.name 

1461 job.stdout_stream_write_name = stdout_stream.write_resource_name 

1462 job.stderr_stream_name = stderr_stream.name 

1463 job.stderr_stream_write_name = stderr_stream.write_resource_name 

1464 except Exception as e: 

1465 LOGGER.warning("Failed to create log stream.", tags=log_tags, exc_info=e) 

1466 

1467 self._notify_job_updated(job.name, session) 

1468 LOGGER.debug("Pending lease sent to bot for ack.", tags=log_tags) 

1469 return db_lease.to_protobuf() 

1470 

1471 # At this point, we know that there's a lease both in the bot session and in the database. 

1472 

1473 # Accept: 

1474 # 

1475 # If the lease is in the PENDING state, this means that it is a new lease for the worker, 

1476 # which it must acknowledge (the next time it calls UpdateBotSession) by changing the state to ACTIVE 

1477 # 

1478 if session_lease.state == LeaseState.ACTIVE.value and db_lease.state == LeaseState.PENDING.value: 

1479 db_lease.state = LeaseState.ACTIVE.value 

1480 self._notify_job_updated(job.name, session) 

1481 LOGGER.debug("Bot acked pending lease.", tags=log_tags) 

1482 

1483 # Now that the job has been accepted by a worker, the time this job has spent in the queue can be 

1484 # calculated and posted to the metrics. 

1485 job_metadata = self.get_execute_action_metadata(job) 

1486 queued = job_metadata.queued_timestamp 

1487 worker_start = job_metadata.worker_start_timestamp 

1488 self._publish_job_duration(queued, worker_start, "Queued", job.property_label) 

1489 

1490 return session_lease 

1491 

1492 # Complete: 

1493 # 

1494 # Once the assignment is complete - either because it finishes or because it times out - the bot 

1495 # calls Bots.UpdateBotSession again, this time updating the state of the lease from accepted to 

1496 # complete, and optionally by also populating the lease’s results field, which is another Any proto. 

1497 # The service can then assign it new work (removing any completed leases). 

1498 # 

1499 # A successfully completed lease may go directly from PENDING to COMPLETED if, for example, the 

1500 # lease was completed before the bot has had the opportunity to transition to ACTIVE, or if the 

1501 # update transitioning the lease to the ACTIVE state was lost. 

1502 # 

1503 if session_lease.state == LeaseState.COMPLETED.value and db_lease.state in ( 

1504 LeaseState.PENDING.value, 

1505 LeaseState.ACTIVE.value, 

1506 ): 

1507 log_tags["request.lease_status_code"] = session_lease.status.code 

1508 log_tags["request.lease_status_message"] = session_lease.status.message 

1509 log_tags["db.n_tries"] = job.n_tries 

1510 

1511 bot.lease_id = None 

1512 if ( 

1513 session_lease.status.code in self.RETRYABLE_STATUS_CODES 

1514 and job.n_tries < self.max_job_attempts 

1515 ): 

1516 LOGGER.debug("Retrying bot lease.", tags=log_tags) 

1517 self._retry_job_lease(session, job, db_lease) 

1518 else: 

1519 LOGGER.debug("Bot completed lease.", tags=log_tags) 

1520 self._complete_lease(session, job, db_lease, session_lease.status, session_lease.result) 

1521 

1522 self._notify_job_updated(job.name, session) 

1523 return None 

1524 

1525 # Cancel: 

1526 # 

1527 # At any time, the service may change the state of a lease from PENDING or ACTIVE to CANCELLED; 

1528 # the bot may not change to this state. The service then waits for the bot to acknowledge the 

1529 # change by updating its own status to CANCELLED as well. Once both the service and the bot agree, 

1530 # the service may remove it from the list of leases. 

1531 # 

1532 if session_lease.state == db_lease.state == LeaseState.CANCELLED.value: 

1533 bot.lease_id = None 

1534 LOGGER.debug("Bot acked cancelled lease.", tags=log_tags) 

1535 return None 

1536 

1537 if db_lease.state == LeaseState.CANCELLED.value: 

1538 session_lease.state = LeaseState.CANCELLED.value 

1539 LOGGER.debug("Cancelled lease sent to bot for ack.", tags=log_tags) 

1540 return session_lease 

1541 

1542 if session_lease.state == LeaseState.CANCELLED.value: 

1543 raise BotSessionClosedError(f"Illegal attempt from session to set state as cancelled. {log_tags}") 

1544 

1545 # Keepalive: 

1546 # 

1547 # The Bot periodically calls Bots.UpdateBotSession, either if there’s a genuine change (for example, 

1548 # an attached phone has died) or simply to let the service know that it’s alive and ready to receive 

1549 # work. If the bot doesn’t call back on time, the service considers it to have died, and all work 

1550 # from the bot to be lost. 

1551 # 

1552 if session_lease.state == db_lease.state: 

1553 LOGGER.debug("Bot heartbeat acked.", tags=log_tags) 

1554 return session_lease 

1555 

1556 # Any other transition should really never happen... cover it anyway. 

1557 raise BotSessionClosedError(f"Unsupported lease state transition. {log_tags}") 

1558 # TODO allow creating a session with manual commit logic. 

1559 # For now... Sneak the exception past the context manager. 

1560 except (BotSessionMismatchError, BotSessionClosedError) as e: 

1561 self._close_bot_sessions(session, bots) 

1562 err = e 

1563 raise err 

1564 
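# Note: the branches above implement the server side of the RWAPI lease state machine.
# A minimal sketch of the bot-side acknowledgements this method expects, assuming a
# hypothetical worker loop that receives `session_lease` from UpdateBotSession (this is
# not BuildGrid's actual bot code):
#
#     from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import Lease
#     from buildgrid.server.enums import LeaseState
#
#     def acknowledge(session_lease: Lease) -> Lease:
#         if session_lease.state == LeaseState.PENDING.value:
#             # "Accept": the bot acks a newly assigned lease by flipping it to ACTIVE.
#             session_lease.state = LeaseState.ACTIVE.value
#         elif session_lease.state == LeaseState.CANCELLED.value:
#             # "Cancel": the bot echoes the CANCELLED state so the server can drop the lease.
#             pass
#         return session_lease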

1565 def _retry_job_lease(self, session: Session, job: JobEntry, lease: LeaseEntry) -> None: 

1566 # If the job was mutated before we could lock it, exit fast on terminal states. 

1567 if job.cancelled or job.stage == OperationStage.COMPLETED.value: 

1568 return 

1569 

1570 if job.n_tries >= self.max_job_attempts: 

1571 status = status_pb2.Status( 

1572 code=code_pb2.ABORTED, message=f"Job was retried {job.n_tries} times unsuccessfully. Aborting." 

1573 ) 

1574 self._complete_lease(session, job, lease, status=status) 

1575 return 

1576 

1577 job.stage = OperationStage.QUEUED.value 

1578 job.assigned = False 

1579 job.n_tries += 1 

1580 

1581 lease.state = LeaseState.PENDING.value 

1582 lease.status = None 

1583 lease.worker_name = None 

1584 

1585 def _complete_lease( 

1586 self, session: Session, job: JobEntry, lease: LeaseEntry, status: Status, result: ProtoAny | None = None 

1587 ) -> None: 

1588 lease.state = LeaseState.COMPLETED.value 

1589 lease.status = status.code 

1590 

1591 job.stage = OperationStage.COMPLETED.value 

1592 job.status_code = status.code 

1593 if not job.do_not_cache: 

1594 job.do_not_cache = status.code != code_pb2.OK 

1595 job.worker_completed_timestamp = datetime.utcnow() 

1596 

1597 action_result = ActionResult() 

1598 if result is not None and result.Is(action_result.DESCRIPTOR): 

1599 result.Unpack(action_result) 

1600 now = datetime.utcnow() 

1601 action_result.execution_metadata.queued_timestamp.FromDatetime(job.queued_timestamp) 

1602 action_result.execution_metadata.worker_start_timestamp.FromDatetime(job.worker_start_timestamp or now) 

1603 action_result.execution_metadata.worker_completed_timestamp.FromDatetime(job.worker_completed_timestamp or now) 

1604 response = ExecuteResponse(result=action_result, cached_result=False, status=status) 

1605 

1606 job.result = digest_to_string(self.storage.put_message(response)) 

1607 

1608 if self.action_cache and result and not job.do_not_cache: 

1609 action_digest = string_to_digest(job.action_digest) 

1610 try: 

1611 self.action_cache.update_action_result(action_digest, action_result) 

1612 LOGGER.debug( 

1613 "Stored action result in ActionCache.", 

1614 tags=dict(action_result=action_result, digest=action_digest), 

1615 ) 

1616 except UpdateNotAllowedError: 

1617 # The configuration doesn't allow updating the old result 

1618 LOGGER.exception( 

1619 "ActionCache is not configured to allow updates, ActionResult wasn't updated.", 

1620 tags=dict(digest=action_digest), 

1621 ) 

1622 except Exception: 

1623 LOGGER.exception( 

1624 "Unable to update ActionCache, results will not be stored in the ActionCache.", 

1625 tags=dict(digest=action_digest), 

1626 ) 

1627 

1628 # Update retentions 

1629 self._update_action_retention( 

1630 Action.FromString(job.action), 

1631 string_to_digest(job.action_digest), 

1632 retention_hours=self.completed_action_retention_hours, 

1633 ) 

1634 if action_result.ByteSize() > 0: 

1635 self._update_action_result_retention(action_result, retention_hours=self.action_result_retention_hours) 

1636 

1637 self._publish_execution_stats(session, job.name, action_result.execution_metadata, job.property_label) 

1638 
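# Note: the `result` unpacked in _complete_lease above is an Any wrapping an ActionResult.
# A worker-side sketch of producing such a lease result (names are illustrative, not
# BuildGrid's bot implementation):
#
#     from google.protobuf.any_pb2 import Any as ProtoAny
#     from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ActionResult
#     from buildgrid._protos.google.rpc import code_pb2
#
#     result_any = ProtoAny()
#     result_any.Pack(ActionResult(exit_code=0))
#     session_lease.result.CopyFrom(result_any)
#     session_lease.status.code = code_pb2.OK
#     session_lease.state = LeaseState.COMPLETED.value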

1639 def get_bot_status_metrics(self) -> BotMetrics: 

1640 """Count the number of bots with a particular status and property_label""" 

1641 with self._sql.session() as session: 

1642 metrics: BotMetrics = {"bots_total": {}, "bots_per_property_label": {}} 

1643 

1644 # bot count by status only 

1645 query_total = ( 

1646 session.query(BotEntry.bot_status, func.count(BotEntry.bot_status)) 

1647 .group_by(BotEntry.bot_status) 

1648 .filter(self._bot_in_instance()) 

1649 ) 

1650 for status in BotStatus: 

1651 metrics["bots_total"][status] = 0 

1652 for [bot_status, count] in query_total.all(): 

1653 metrics["bots_total"][BotStatus(bot_status)] = cast(int, count) 

1654 

1655 # bot count by status for each property label 

1656 query_per_label = ( 

1657 session.query(BotEntry.bot_status, PropertyLabelEntry.property_label, func.count(BotEntry.bot_status)) 

1658 .join(BotEntry, BotEntry.name == PropertyLabelEntry.bot_name, isouter=True) 

1659 .group_by(BotEntry.bot_status, PropertyLabelEntry.property_label) 

1660 .filter(self._bot_in_instance()) 

1661 ) 

1662 for status in BotStatus: 

1663 metrics["bots_per_property_label"][status, "unknown"] = 0 

1664 for [bot_status, property_label, count] in query_per_label.all(): 

1665 metrics["bots_per_property_label"][BotStatus(bot_status), property_label] = cast(int, count) 

1666 

1667 return metrics 

1668 
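# Note: the mapping returned above keys "bots_total" by BotStatus and
# "bots_per_property_label" by (BotStatus, property_label) tuples. A minimal sketch of
# consuming it (`scheduler` and `publish_gauge` are hypothetical names):
#
#     metrics = scheduler.get_bot_status_metrics()
#     for status, count in metrics["bots_total"].items():
#         publish_gauge(f"bots.{status.name}", count)
#     for (status, label), count in metrics["bots_per_property_label"].items():
#         publish_gauge(f"bots.{status.name}.{label}", count)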

1669 def refresh_bot_expiry_time(self, bot_name: str, bot_id: str) -> datetime: 

1670 """ 

1671 This update is done out-of-band from the main synchronize_bot_lease transaction, as there 

1672 are cases where we will skip calling the synchronization, but still want the session to be 

1673 updated such that it does not get reaped. This slightly duplicates the update happening in 

1674 synchronize_bot_lease; however, that update is still required so that the job is not reaped 

1675 during its job assignment waiting period. 

1676 

1677 This method should be called at the end of the update and create bot session methods. 

1678 The returned datetime should be assigned to the deadline within the returned session proto. 

1679 """ 

1680 

1681 locate_bot_stmt = ( 

1682 select(BotEntry) 

1683 .where(BotEntry.name == bot_name, BotEntry.bot_id == bot_id, self._bot_in_instance()) 

1684 .with_for_update() 

1685 ) 

1686 with self._sql.session() as session: 

1687 if bot := session.execute(locate_bot_stmt).scalar(): 

1688 now = datetime.utcnow() 

1689 bot.last_update_timestamp = now 

1690 bot.expiry_time = now + timedelta(seconds=self.bot_session_keepalive_timeout) 

1691 return bot.expiry_time 

1692 raise BotSessionClosedError(f"Bot not found to fetch expiry. {bot_name=} {bot_id=}") 

1693 
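# Note: per the docstring above, the returned datetime becomes the deadline on the
# session proto handed back to the bot. A minimal sketch, assuming a BotSession message
# from the remoteworkers v1test2 protos and a hypothetical `scheduler` instance:
#
#     expiry = scheduler.refresh_bot_expiry_time(bot_session.name, bot_session.bot_id)
#     bot_session.expire_time.FromDatetime(expiry)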

1694 def get_metadata_for_leases(self, leases: Iterable[Lease]) -> list[tuple[str, bytes]]: 

1695 """Return a list of Job metadata for a given list of leases. 

1696 

1697 Args: 

1698 leases (list): List of leases to get Job metadata for. 

1699 

1700 Returns: 

1701 List of tuples of the form 

1702 ``('executeoperationmetadata-bin': serialized_metadata)``. 

1703 

1704 """ 

1705 metadata = [] 

1706 with self._sql_ro.session() as session: 

1707 for lease in leases: 

1708 job = self._get_job(lease.id, session) 

1709 if job is not None: 

1710 job_metadata = ExecuteOperationMetadata( 

1711 stage=job.stage, # type: ignore[arg-type] 

1712 action_digest=string_to_digest(job.action_digest), 

1713 stderr_stream_name=job.stderr_stream_write_name or "", 

1714 stdout_stream_name=job.stdout_stream_write_name or "", 

1715 partial_execution_metadata=self.get_execute_action_metadata(job), 

1716 ) 

1717 metadata.append(("executeoperationmetadata-bin", job_metadata.SerializeToString())) 

1718 

1719 return metadata 

1720 
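# Note: the tuples returned above are shaped like gRPC metadata entries; the "-bin"
# suffix marks the value as binary. A minimal sketch of decoding such an entry,
# assuming `metadata` is the list returned by get_metadata_for_leases:
#
#     for key, value in metadata:
#         if key == "executeoperationmetadata-bin":
#             op_metadata = ExecuteOperationMetadata.FromString(value)
#             print(op_metadata.stage, op_metadata.action_digest.hash)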

1721 def get_execute_action_metadata(self, job: JobEntry) -> ExecutedActionMetadata: 

1722 worker_name = "" 

1723 if job.leases: 

1724 worker_name = job.leases[-1].worker_name or "" 

1725 

1726 metadata = ExecutedActionMetadata(worker=worker_name) 

1727 

1728 def assign_timestamp(field: Timestamp, timestamp: datetime | None) -> None: 

1729 if timestamp is not None: 

1730 field.FromDatetime(timestamp) 

1731 

1732 assign_timestamp(metadata.queued_timestamp, job.queued_timestamp) 

1733 assign_timestamp(metadata.worker_start_timestamp, job.worker_start_timestamp) 

1734 assign_timestamp(metadata.worker_completed_timestamp, job.worker_completed_timestamp) 

1735 assign_timestamp(metadata.input_fetch_start_timestamp, job.input_fetch_start_timestamp) 

1736 assign_timestamp(metadata.input_fetch_completed_timestamp, job.input_fetch_completed_timestamp) 

1737 assign_timestamp(metadata.output_upload_start_timestamp, job.output_upload_start_timestamp) 

1738 assign_timestamp(metadata.output_upload_completed_timestamp, job.output_upload_completed_timestamp) 

1739 assign_timestamp(metadata.execution_start_timestamp, job.execution_start_timestamp) 

1740 assign_timestamp(metadata.execution_completed_timestamp, job.execution_completed_timestamp) 

1741 

1742 return metadata 

1743 

1744 def _fetch_execution_stats( 

1745 self, auxiliary_metadata: RepeatedCompositeFieldContainer[ProtoAny] 

1746 ) -> ExecutionStatistics | None: 

1747 """Fetch ExecutionStatistics from Storage 

1748 ProtoAny[Digest] -> ProtoAny[ExecutionStatistics] 

1749 """ 

1750 for aux_metadata_any in auxiliary_metadata: 

1751 # Get the wrapped digest 

1752 if not aux_metadata_any.Is(Digest.DESCRIPTOR): 

1753 continue 

1754 aux_metadata_digest = Digest() 

1755 try: 

1756 aux_metadata_any.Unpack(aux_metadata_digest) 

1757 # Get the blob from CAS 

1758 execution_stats_any = self.storage.get_message(aux_metadata_digest, ProtoAny) 

1759 # Get the wrapped ExecutionStatistics 

1760 if execution_stats_any and execution_stats_any.Is(ExecutionStatistics.DESCRIPTOR): 

1761 execution_stats = ExecutionStatistics() 

1762 execution_stats_any.Unpack(execution_stats) 

1763 return execution_stats 

1764 except Exception as exc: 

1765 LOGGER.exception( 

1766 "Cannot fetch ExecutionStatistics from storage.", 

1767 tags=dict(auxiliary_metadata=aux_metadata_digest), 

1768 exc_info=exc, 

1769 ) 

1770 return None 

1771 return None 

1772 
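# Note: _fetch_execution_stats undoes a double wrapping: auxiliary_metadata holds an Any
# wrapping a Digest, and the CAS blob at that Digest is itself an Any wrapping
# ExecutionStatistics. A sketch of the corresponding packing side, under the assumption
# that a worker records its stats this way (`storage` is a hypothetical StorageABC
# instance, `execution_stats` a populated ExecutionStatistics message):
#
#     stats_any = ProtoAny()
#     stats_any.Pack(execution_stats)                      # ExecutionStatistics -> Any
#     digest = storage.put_message(stats_any)              # Any stored in CAS
#     digest_any = ProtoAny()
#     digest_any.Pack(digest)                              # Digest -> Any
#     execution_metadata.auxiliary_metadata.append(digest_any)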

1773 def publish_execution_stats( 

1774 self, job_name: str, execution_metadata: ExecutedActionMetadata, property_label: str = "unknown" 

1775 ) -> None: 

1776 with self._sql_ro.session(expire_on_commit=False) as session: 

1777 self._publish_execution_stats(session, job_name, execution_metadata, property_label) 

1778 

1779 def _publish_execution_stats( 

1780 self, session: Session, job_name: str, execution_metadata: ExecutedActionMetadata, property_label: str 

1781 ) -> None: 

1782 """Publish resource usage of the job""" 

1783 queued = execution_metadata.queued_timestamp 

1784 worker_start = execution_metadata.worker_start_timestamp 

1785 worker_completed = execution_metadata.worker_completed_timestamp 

1786 fetch_start = execution_metadata.input_fetch_start_timestamp 

1787 fetch_completed = execution_metadata.input_fetch_completed_timestamp 

1788 execution_start = execution_metadata.execution_start_timestamp 

1789 execution_completed = execution_metadata.execution_completed_timestamp 

1790 upload_start = execution_metadata.output_upload_start_timestamp 

1791 upload_completed = execution_metadata.output_upload_completed_timestamp 

1792 

1793 self._publish_job_duration(queued, worker_completed, "Total", property_label) 

1794 # The Queued time is missing here as it's posted as soon as the worker has accepted the job. 

1795 self._publish_job_duration(worker_start, worker_completed, "Worker", property_label) 

1796 self._publish_job_duration(fetch_start, fetch_completed, "Fetch", property_label) 

1797 self._publish_job_duration(execution_start, execution_completed, "Execution", property_label) 

1798 self._publish_job_duration(upload_start, upload_completed, "Upload", property_label) 

1799 

1800 if self.metering_client is None or len(execution_metadata.auxiliary_metadata) == 0: 

1801 return 

1802 

1803 execution_stats = self._fetch_execution_stats(execution_metadata.auxiliary_metadata) 

1804 if execution_stats is None: 

1805 return 

1806 usage = Usage( 

1807 computing=ComputingUsage( 

1808 utime=execution_stats.command_rusage.utime.ToMilliseconds(), 

1809 stime=execution_stats.command_rusage.stime.ToMilliseconds(), 

1810 maxrss=execution_stats.command_rusage.maxrss, 

1811 inblock=execution_stats.command_rusage.inblock, 

1812 oublock=execution_stats.command_rusage.oublock, 

1813 ) 

1814 ) 

1815 

1816 try: 

1817 operations = ( 

1818 session.query(OperationEntry) 

1819 .where(OperationEntry.job_name == job_name) 

1820 .options(joinedload(OperationEntry.client_identity)) 

1821 .all() 

1822 ) 

1823 for op in operations: 

1824 if op.client_identity is None: 

1825 continue 

1826 client_id = Identity( 

1827 instance=op.client_identity.instance, 

1828 workflow=op.client_identity.workflow, 

1829 actor=op.client_identity.actor, 

1830 subject=op.client_identity.subject, 

1831 ) 

1832 self.metering_client.put_usage(identity=client_id, operation_name=op.name, usage=usage) 

1833 except Exception as exc: 

1834 LOGGER.exception("Cannot publish resource usage.", tags=dict(job_name=job_name), exc_info=exc) 

1835 

1836 def _update_action_retention(self, action: Action, action_digest: Digest, retention_hours: float | None) -> None: 

1837 if not self.asset_client or not retention_hours: 

1838 return 

1839 uri = DIGEST_URI_TEMPLATE.format(digest_hash=action_digest.hash) 

1840 qualifier = {"resource_type": PROTOBUF_MEDIA_TYPE} 

1841 expire_at = datetime.now() + timedelta(hours=retention_hours) 

1842 referenced_blobs = [action.command_digest] 

1843 referenced_directories = [action.input_root_digest] 

1844 

1845 try: 

1846 self.asset_client.push_blob( 

1847 uris=[uri], 

1848 qualifiers=qualifier, 

1849 blob_digest=action_digest, 

1850 expire_at=expire_at, 

1851 referenced_blobs=referenced_blobs, 

1852 referenced_directories=referenced_directories, 

1853 ) 

1854 LOGGER.debug( 

1855 "Extended the retention of action.", tags=dict(digest=action_digest, retention_hours=retention_hours) 

1856 ) 

1857 except Exception: 

1858 LOGGER.exception("Failed to push action as an asset.", tags=dict(digest=action_digest)) 

1859 # Not a fatal path, don't reraise here 

1860 

1861 def _update_action_result_retention(self, action_result: ActionResult, retention_hours: float | None) -> None: 

1862 if not self.asset_client or not retention_hours: 

1863 return 

1864 digest = None 

1865 try: 

1866 # BuildGrid doesn't store action_result in CAS, but if we push it as an asset 

1867 # we need it to be accessible 

1868 digest = self.storage.put_message(action_result) 

1869 

1870 uri = DIGEST_URI_TEMPLATE.format(digest_hash=digest.hash) 

1871 qualifier = {"resource_type": PROTOBUF_MEDIA_TYPE} 

1872 expire_at = datetime.now() + timedelta(hours=retention_hours) 

1873 

1874 referenced_blobs: list[Digest] = [] 

1875 referenced_directories: list[Digest] = [] 

1876 

1877 for file in action_result.output_files: 

1878 referenced_blobs.append(file.digest) 

1879 for dir in action_result.output_directories: 

1880 # Caveat: the underlying directories referenced by this `Tree` message are not referenced by this asset. 

1881 # For clients who need to keep all referenced outputs, 

1882 # consider setting `Action.output_directory_format` to `DIRECTORY_ONLY` or `TREE_AND_DIRECTORY`. 

1883 if dir.tree_digest.ByteSize() != 0: 

1884 referenced_blobs.append(dir.tree_digest) 

1885 if dir.root_directory_digest.ByteSize() != 0: 

1886 referenced_directories.append(dir.root_directory_digest) 

1887 

1888 if action_result.stdout_digest.ByteSize() != 0: 

1889 referenced_blobs.append(action_result.stdout_digest) 

1890 if action_result.stderr_digest.ByteSize() != 0: 

1891 referenced_blobs.append(action_result.stderr_digest) 

1892 

1893 self.asset_client.push_blob( 

1894 uris=[uri], 

1895 qualifiers=qualifier, 

1896 blob_digest=digest, 

1897 expire_at=expire_at, 

1898 referenced_blobs=referenced_blobs, 

1899 referenced_directories=referenced_directories, 

1900 ) 

1901 LOGGER.debug( 

1902 "Extended the retention of action result.", tags=dict(digest=digest, retention_hours=retention_hours) 

1903 ) 

1904 

1905 except Exception as e: 

1906 LOGGER.exception("Failed to push action_result as an asset.", tags=dict(digest=digest), exc_info=e) 

1907 # Not a fatal path, don't reraise here