Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/scheduler/impl.py: 91.42%
886 statements
« prev ^ index » next coverage.py v7.4.1, created at 2025-03-13 15:36 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2025-03-13 15:36 +0000
1# Copyright (C) 2019 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16import threading
17import uuid
18from collections import defaultdict
19from contextlib import ExitStack
20from datetime import datetime, timedelta
21from time import time
22from typing import Any, Iterable, NamedTuple, Sequence, TypedDict, TypeVar, cast
24from buildgrid_metering.client import SyncMeteringServiceClient
25from buildgrid_metering.models.dataclasses import ComputingUsage, Identity, Usage
26from google.protobuf.any_pb2 import Any as ProtoAny
27from google.protobuf.internal.containers import RepeatedCompositeFieldContainer
28from google.protobuf.timestamp_pb2 import Timestamp
29from grpc import Channel
30from sqlalchemy import ColumnExpressionArgument, and_, delete, func, insert, or_, select, text, update
31from sqlalchemy.dialects import postgresql, sqlite
32from sqlalchemy.exc import IntegrityError
33from sqlalchemy.orm import Session, joinedload, selectinload
34from sqlalchemy.sql.expression import Insert, Select
36from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
37from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import (
38 Action,
39 ActionResult,
40 Command,
41 Digest,
42 ExecutedActionMetadata,
43 ExecuteOperationMetadata,
44 ExecuteResponse,
45 RequestMetadata,
46 ToolDetails,
47)
48from buildgrid._protos.build.buildbox.execution_stats_pb2 import ExecutionStatistics
49from buildgrid._protos.buildgrid.v2.identity_pb2 import ClientIdentity
50from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import Lease
51from buildgrid._protos.google.longrunning import operations_pb2
52from buildgrid._protos.google.longrunning.operations_pb2 import Operation
53from buildgrid._protos.google.rpc import code_pb2, status_pb2
54from buildgrid._protos.google.rpc.status_pb2 import Status
55from buildgrid.server.actioncache.caches.action_cache_abc import ActionCacheABC
56from buildgrid.server.cas.storage.storage_abc import StorageABC
57from buildgrid.server.client.asset import AssetClient
58from buildgrid.server.client.logstream import logstream_client
59from buildgrid.server.context import current_instance, instance_context, try_current_instance
60from buildgrid.server.decorators import timed
61from buildgrid.server.enums import BotStatus, LeaseState, MeteringThrottleAction, OperationStage
62from buildgrid.server.exceptions import (
63 BotSessionClosedError,
64 BotSessionMismatchError,
65 CancelledError,
66 DatabaseError,
67 InvalidArgumentError,
68 NotFoundError,
69 ResourceExhaustedError,
70 UpdateNotAllowedError,
71)
72from buildgrid.server.logging import buildgrid_logger
73from buildgrid.server.metrics_names import METRIC
74from buildgrid.server.metrics_utils import publish_counter_metric, publish_timer_metric, timer
75from buildgrid.server.operations.filtering import DEFAULT_SORT_KEYS, OperationFilter, SortKey
76from buildgrid.server.settings import DEFAULT_MAX_EXECUTION_TIMEOUT, SQL_SCHEDULER_METRICS_PUBLISH_INTERVAL_SECONDS
77from buildgrid.server.sql.models import Base as OrmBase
78from buildgrid.server.sql.models import (
79 BotEntry,
80 ClientIdentityEntry,
81 JobEntry,
82 LeaseEntry,
83 OperationEntry,
84 PlatformEntry,
85 PropertyLabelEntry,
86 RequestMetadataEntry,
87 digest_to_string,
88 job_platform_association,
89 string_to_digest,
90)
91from buildgrid.server.sql.provider import SqlProvider
92from buildgrid.server.sql.utils import (
93 build_custom_filters,
94 build_page_filter,
95 build_page_token,
96 build_sort_column_list,
97 extract_sort_keys,
98)
99from buildgrid.server.threading import ContextWorker
100from buildgrid.server.utils.digests import create_digest
102from .assigner import JobAssigner
103from .notifier import OperationsNotifier
104from .properties import PropertySet, hash_from_dict
106LOGGER = buildgrid_logger(__name__)
109PROTOBUF_MEDIA_TYPE = "application/x-protobuf"
110DIGEST_URI_TEMPLATE = "nih:sha-256;{digest_hash}"
113class SchedulerMetrics(TypedDict, total=False):
114 # dict[tuple[stage_name: str, property_label: str], number_of_jobs: int]
115 jobs: dict[tuple[str, str], int]
118class BotMetrics(TypedDict, total=False):
119 # dict[tuple[bot_status: BotStatus], number_of_bots: int]
120 bots_total: dict[BotStatus, int]
122 # dict[tuple[bot_status: BotStatus, property_label: str], number_of_bots: int]
123 bots_per_property_label: dict[tuple[BotStatus, str], int]
126class AgedJobHandlerOptions(NamedTuple):
127 job_max_age: timedelta = timedelta(days=30)
128 handling_period: timedelta = timedelta(minutes=5)
129 max_handling_window: int = 10000
131 @staticmethod
132 def from_config(
133 job_max_age_cfg: dict[str, float],
134 handling_period_cfg: dict[str, float] | None = None,
135 max_handling_window_cfg: int | None = None,
136 ) -> "AgedJobHandlerOptions":
137 """Helper method for creating ``AgedJobHandlerOptions`` objects
138 If input configs are None, assign defaults"""
140 def _dict_to_timedelta(config: dict[str, float]) -> timedelta:
141 return timedelta(
142 weeks=config.get("weeks", 0),
143 days=config.get("days", 0),
144 hours=config.get("hours", 0),
145 minutes=config.get("minutes", 0),
146 seconds=config.get("seconds", 0),
147 )
149 return AgedJobHandlerOptions(
150 job_max_age=_dict_to_timedelta(job_max_age_cfg) if job_max_age_cfg else timedelta(days=30),
151 handling_period=_dict_to_timedelta(handling_period_cfg) if handling_period_cfg else timedelta(minutes=5),
152 max_handling_window=max_handling_window_cfg if max_handling_window_cfg else 10000,
153 )
156T = TypeVar("T", bound="Scheduler")
159class Scheduler:
160 RETRYABLE_STATUS_CODES = (code_pb2.INTERNAL, code_pb2.UNAVAILABLE)
162 def __init__(
163 self,
164 sql_provider: SqlProvider,
165 storage: StorageABC,
166 *,
167 sql_ro_provider: SqlProvider | None = None,
168 sql_notifier_provider: SqlProvider | None = None,
169 property_set: PropertySet,
170 action_cache: ActionCacheABC | None = None,
171 action_browser_url: str | None = None,
172 max_execution_timeout: int = DEFAULT_MAX_EXECUTION_TIMEOUT,
173 metering_client: SyncMeteringServiceClient | None = None,
174 metering_throttle_action: MeteringThrottleAction | None = None,
175 bot_session_keepalive_timeout: int = 600,
176 logstream_channel: Channel | None = None,
177 logstream_instance: str | None = None,
178 asset_client: AssetClient | None = None,
179 queued_action_retention_hours: float | None = None,
180 completed_action_retention_hours: float | None = None,
181 action_result_retention_hours: float | None = None,
182 enable_job_watcher: bool = False,
183 poll_interval: float = 1,
184 pruning_options: AgedJobHandlerOptions | None = None,
185 queue_timeout_options: AgedJobHandlerOptions | None = None,
186 max_job_attempts: int = 5,
187 job_assignment_interval: float = 1.0,
188 priority_assignment_percentage: int = 100,
189 max_queue_size: int | None = None,
190 execution_timer_interval: float = 60.0,
191 session_expiry_timer_interval: float = 10.0,
192 ) -> None:
193 self._stack = ExitStack()
195 self.storage = storage
197 self.poll_interval = poll_interval
198 self.execution_timer_interval = execution_timer_interval
199 self.session_expiry_interval = session_expiry_timer_interval
200 self.pruning_options = pruning_options
201 self.queue_timeout_options = queue_timeout_options
202 self.max_job_attempts = max_job_attempts
204 self._sql = sql_provider
205 self._sql_ro = sql_ro_provider or sql_provider
206 self._sql_notifier = sql_notifier_provider or sql_provider
208 self.property_set = property_set
210 self.action_cache = action_cache
211 self.action_browser_url = (action_browser_url or "").rstrip("/")
212 self.max_execution_timeout = max_execution_timeout
213 self.enable_job_watcher = enable_job_watcher
214 self.metering_client = metering_client
215 self.metering_throttle_action = metering_throttle_action or MeteringThrottleAction.DEPRIORITIZE
216 self.bot_session_keepalive_timeout = bot_session_keepalive_timeout
217 self.logstream_channel = logstream_channel
218 self.logstream_instance = logstream_instance
219 self.asset_client = asset_client
220 self.queued_action_retention_hours = queued_action_retention_hours
221 self.completed_action_retention_hours = completed_action_retention_hours
222 self.action_result_retention_hours = action_result_retention_hours
223 self.max_queue_size = max_queue_size
225 # Overall Scheduler Metrics (totals of jobs/leases in each state)
226 # Publish those metrics a bit more sparsely since the SQL requests
227 # required to gather them can become expensive
228 self._last_scheduler_metrics_publish_time: datetime | None = None
229 self._scheduler_metrics_publish_interval = timedelta(seconds=SQL_SCHEDULER_METRICS_PUBLISH_INTERVAL_SECONDS)
231 self.ops_notifier = OperationsNotifier(self._sql_notifier, self.poll_interval)
232 self.prune_timer = ContextWorker(name="JobPruner", target=self.prune_timer_loop)
233 self.queue_timer = ContextWorker(name="QueueTimeout", target=self.queue_timer_loop)
234 self.execution_timer = ContextWorker(name="ExecutionTimeout", target=self.execution_timer_loop)
235 self.session_expiry_timer = ContextWorker(self.session_expiry_timer_loop, "BotReaper")
236 self.job_assigner = JobAssigner(
237 self,
238 property_set=property_set,
239 job_assignment_interval=job_assignment_interval,
240 priority_percentage=priority_assignment_percentage,
241 )
243 def __repr__(self) -> str:
244 return f"Scheduler for `{repr(self._sql._engine.url)}`"
246 def __enter__(self: T) -> T:
247 self.start()
248 return self
250 def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
251 self.stop()
253 def start(self) -> None:
254 self._stack.enter_context(self.storage)
255 if self.action_cache:
256 self._stack.enter_context(self.action_cache)
258 if self.logstream_channel:
259 self._stack.enter_context(self.logstream_channel)
260 if self.asset_client:
261 self._stack.enter_context(self.asset_client)
262 # Pruning configuration parameters
263 if self.pruning_options is not None:
264 LOGGER.info(f"Scheduler pruning enabled: {self.pruning_options}")
265 self._stack.enter_context(self.prune_timer)
266 else:
267 LOGGER.info("Scheduler pruning not enabled.")
269 # Queue timeout thread
270 if self.queue_timeout_options is not None:
271 LOGGER.info(f"Job queue timeout enabled: {self.queue_timeout_options}")
272 self._stack.enter_context(self.queue_timer)
273 else:
274 LOGGER.info("Job queue timeout not enabled.")
276 if self.execution_timer_interval > 0:
277 self._stack.enter_context(self.execution_timer)
278 if self.poll_interval > 0:
279 self._stack.enter_context(self.ops_notifier)
281 def stop(self) -> None:
282 self._stack.close()
283 LOGGER.info("Stopped Scheduler.")
285 def _job_in_instance(self) -> ColumnExpressionArgument[bool]:
286 return JobEntry.instance_name == current_instance()
288 def _bot_in_instance(self) -> ColumnExpressionArgument[bool]:
289 return BotEntry.instance_name == current_instance()
291 def queue_job_action(
292 self,
293 *,
294 action: Action,
295 action_digest: Digest,
296 command: Command,
297 platform_requirements: dict[str, list[str]],
298 property_label: str,
299 priority: int,
300 skip_cache_lookup: bool,
301 request_metadata: RequestMetadata | None = None,
302 client_identity: ClientIdentityEntry | None = None,
303 ) -> str:
304 """
305 De-duplicates or inserts a newly created job into the execution queue.
306 Returns an operation name associated with this job.
307 """
308 if self.max_execution_timeout and action.timeout.seconds > self.max_execution_timeout:
309 raise InvalidArgumentError("Action timeout is larger than the server's maximum execution timeout.")
311 if not action.do_not_cache:
312 if operation_name := self.create_operation_for_existing_job(
313 action_digest=action_digest,
314 priority=priority,
315 request_metadata=request_metadata,
316 client_identity=client_identity,
317 ):
318 return operation_name
320 # If there was another job already in the action cache, we can check now.
321 # We can use this entry to create a job and create it already completed!
322 execute_response: ExecuteResponse | None = None
323 if self.action_cache and not action.do_not_cache and not skip_cache_lookup:
324 try:
325 action_result = self.action_cache.get_action_result(action_digest)
326 LOGGER.info("Job cache hit for action.", tags=dict(digest=action_digest))
327 execute_response = ExecuteResponse()
328 execute_response.result.CopyFrom(action_result)
329 execute_response.cached_result = True
330 except NotFoundError:
331 pass
332 except Exception:
333 LOGGER.exception("Checking ActionCache for action failed.", tags=dict(digest=action_digest))
335 # Extend retention for action
336 self._update_action_retention(action, action_digest, self.queued_action_retention_hours)
338 return self.create_operation_for_new_job(
339 action=action,
340 action_digest=action_digest,
341 command=command,
342 execute_response=execute_response,
343 platform_requirements=platform_requirements,
344 property_label=property_label,
345 priority=priority,
346 request_metadata=request_metadata,
347 client_identity=client_identity,
348 )
350 def create_operation_for_existing_job(
351 self,
352 *,
353 action_digest: Digest,
354 priority: int,
355 request_metadata: RequestMetadata | None,
356 client_identity: ClientIdentityEntry | None,
357 ) -> str | None:
358 # Find a job with a matching action that isn't completed or cancelled and that can be cached.
359 find_existing_stmt = (
360 select(JobEntry)
361 .where(
362 JobEntry.action_digest == digest_to_string(action_digest),
363 JobEntry.stage != OperationStage.COMPLETED.value,
364 JobEntry.cancelled != True, # noqa: E712
365 JobEntry.do_not_cache != True, # noqa: E712
366 self._job_in_instance(),
367 )
368 .with_for_update()
369 )
371 with self._sql.session(exceptions_to_not_raise_on=[Exception]) as session:
372 if not (job := session.execute(find_existing_stmt).scalars().first()):
373 return None
375 # Reschedule if priority is now greater, and we're still waiting on it to start.
376 if priority < job.priority and job.stage == OperationStage.QUEUED.value:
377 LOGGER.info("Job assigned a new priority.", tags=dict(job_name=job.name, priority=priority))
378 job.priority = priority
379 job.assigned = False
381 return self._create_operation(
382 session,
383 job_name=job.name,
384 request_metadata=request_metadata,
385 client_identity=client_identity,
386 )
388 def create_operation_for_new_job(
389 self,
390 *,
391 action: Action,
392 action_digest: Digest,
393 command: Command,
394 execute_response: ExecuteResponse | None,
395 platform_requirements: dict[str, list[str]],
396 property_label: str,
397 priority: int,
398 request_metadata: RequestMetadata | None = None,
399 client_identity: ClientIdentityEntry | None = None,
400 ) -> str:
401 if execute_response is None and self.max_queue_size is not None:
402 # Using func.count here to avoid generating a subquery in the WHERE
403 # clause of the resulting query.
404 # https://docs.sqlalchemy.org/en/14/orm/query.html#sqlalchemy.orm.query.Query.count
405 queue_count_statement = select(func.count(JobEntry.name)).where(
406 JobEntry.assigned != True, # noqa: E712
407 self._job_in_instance(),
408 JobEntry.property_label == property_label,
409 JobEntry.stage == OperationStage.QUEUED.value,
410 )
411 else:
412 queue_count_statement = None
414 with self._sql.session(exceptions_to_not_raise_on=[Exception]) as session:
415 if queue_count_statement is not None:
416 queue_size = session.execute(queue_count_statement).scalar_one()
417 if self.max_queue_size is not None and queue_size >= self.max_queue_size:
418 raise ResourceExhaustedError(f"The platform's job queue is full: {property_label=}")
420 now = datetime.utcnow()
421 job = JobEntry(
422 instance_name=current_instance(),
423 name=str(uuid.uuid4()),
424 action=action.SerializeToString(),
425 action_digest=digest_to_string(action_digest),
426 do_not_cache=action.do_not_cache,
427 priority=priority,
428 stage=OperationStage.QUEUED.value,
429 create_timestamp=now,
430 queued_timestamp=now,
431 command=" ".join(command.arguments),
432 platform_requirements=hash_from_dict(platform_requirements),
433 platform=self._populate_platform_requirements(session, platform_requirements),
434 property_label=property_label,
435 n_tries=1,
436 )
437 if execute_response:
438 job.stage = OperationStage.COMPLETED.value
439 job.result = digest_to_string(self.storage.put_message(execute_response))
440 job.status_code = execute_response.status.code
441 job.worker_completed_timestamp = datetime.utcnow()
443 session.add(job)
445 return self._create_operation(
446 session,
447 job_name=job.name,
448 request_metadata=request_metadata,
449 client_identity=client_identity,
450 )
452 def _populate_platform_requirements(
453 self, session: Session, platform_requirements: dict[str, list[str]]
454 ) -> list[PlatformEntry]:
455 if not platform_requirements:
456 return []
458 required_entries = {(k, v) for k, values in platform_requirements.items() for v in values}
459 conditions = [and_(PlatformEntry.key == k, PlatformEntry.value == v) for k, v in required_entries]
460 statement = select(PlatformEntry.key, PlatformEntry.value).where(or_(*conditions))
462 while missing := required_entries - {(k, v) for [k, v] in session.execute(statement).all()}:
463 try:
464 session.execute(insert(PlatformEntry), [{"key": k, "value": v} for k, v in missing])
465 session.commit()
466 except IntegrityError:
467 session.rollback()
469 return list(session.execute(select(PlatformEntry).where(or_(*conditions))).scalars())
471 def create_operation(
472 self,
473 job_name: str,
474 *,
475 request_metadata: RequestMetadata | None = None,
476 client_identity: ClientIdentityEntry | None = None,
477 ) -> str:
478 with self._sql.session(exceptions_to_not_raise_on=[Exception]) as session:
479 if not (job := self._get_job(job_name, session, with_for_update=True)):
480 raise NotFoundError(f"Job name does not exist: [{job_name}]")
482 if job.cancelled:
483 raise CancelledError(f"Job {job_name} is cancelled")
485 return self._create_operation(
486 session, job_name=job_name, request_metadata=request_metadata, client_identity=client_identity
487 )
489 def _create_operation(
490 self,
491 session: Session,
492 *,
493 job_name: str,
494 request_metadata: RequestMetadata | None,
495 client_identity: ClientIdentityEntry | None,
496 ) -> str:
498 client_identity_id: int | None = None
499 if client_identity:
500 client_identity_id = self.get_or_create_client_identity_in_store(session, client_identity).id
502 request_metadata_id: int | None = None
503 if request_metadata:
504 request_metadata_id = self.get_or_create_request_metadata_in_store(session, request_metadata).id
506 request_metadata = request_metadata or RequestMetadata()
507 operation = OperationEntry(
508 name=str(uuid.uuid4()),
509 job_name=job_name,
510 client_identity_id=client_identity_id,
511 request_metadata_id=request_metadata_id,
512 )
513 session.add(operation)
514 return operation.name
516 def load_operation(self, operation_name: str) -> Operation:
517 statement = (
518 select(OperationEntry).join(JobEntry).where(OperationEntry.name == operation_name, self._job_in_instance())
519 )
520 with self._sql_ro.session(exceptions_to_not_raise_on=[Exception]) as session:
521 if op := session.execute(statement).scalars().first():
522 return self._load_operation(op)
524 raise NotFoundError(f"Operation name does not exist: [{operation_name}]")
526 def _load_operation(self, op: OperationEntry) -> Operation:
527 job: JobEntry = op.job
529 operation = operations_pb2.Operation(
530 name=op.name,
531 done=job.stage == OperationStage.COMPLETED.value or op.cancelled or job.cancelled,
532 )
533 metadata = ExecuteOperationMetadata(
534 stage=OperationStage.COMPLETED.value if operation.done else job.stage, # type: ignore[arg-type]
535 action_digest=string_to_digest(job.action_digest),
536 stderr_stream_name=job.stderr_stream_name or "",
537 stdout_stream_name=job.stdout_stream_name or "",
538 partial_execution_metadata=self.get_execute_action_metadata(job),
539 )
540 operation.metadata.Pack(metadata)
542 if job.cancelled or op.cancelled:
543 operation.error.CopyFrom(status_pb2.Status(code=code_pb2.CANCELLED))
544 elif job.status_code is not None and job.status_code != code_pb2.OK:
545 operation.error.CopyFrom(status_pb2.Status(code=job.status_code))
547 execute_response: ExecuteResponse | None = None
548 if job.result:
549 result_digest = string_to_digest(job.result)
550 execute_response = self.storage.get_message(result_digest, ExecuteResponse)
551 if not execute_response:
552 operation.error.CopyFrom(status_pb2.Status(code=code_pb2.DATA_LOSS))
553 elif job.cancelled:
554 execute_response = ExecuteResponse(
555 status=status_pb2.Status(code=code_pb2.CANCELLED, message="Execution cancelled")
556 )
558 if execute_response:
559 if self.action_browser_url:
560 execute_response.message = f"{self.action_browser_url}/action/{job.action_digest}/"
561 operation.response.Pack(execute_response)
563 return operation
565 def _get_job(self, job_name: str, session: Session, with_for_update: bool = False) -> JobEntry | None:
566 statement = select(JobEntry).where(JobEntry.name == job_name, self._job_in_instance())
567 if with_for_update:
568 statement = statement.with_for_update()
570 job: JobEntry | None = session.execute(statement).scalars().first()
571 if job:
572 LOGGER.debug(
573 "Loaded job from db.",
574 tags=dict(job_name=job_name, job_stage=job.stage, result=job.result, instance_name=job.instance_name),
575 )
577 return job
579 def get_operation_job_name(self, operation_name: str) -> str | None:
580 with self._sql_ro.session(exceptions_to_not_raise_on=[Exception]) as session:
581 if operation := self._get_operation(operation_name, session):
582 return operation.job_name
583 return None
585 def get_operation_request_metadata_by_name(self, operation_name: str) -> RequestMetadata | None:
586 with self._sql_ro.session(exceptions_to_not_raise_on=[Exception]) as session:
587 operation = self._get_operation(operation_name, session)
588 if not operation or not operation.request_metadata:
589 return None
591 metadata = RequestMetadata(
592 tool_details=ToolDetails(
593 tool_name=operation.request_metadata.tool_name or "",
594 tool_version=operation.request_metadata.tool_version or "",
595 ),
596 action_id=operation.job.action_digest,
597 correlated_invocations_id=operation.request_metadata.correlated_invocations_id or "",
598 tool_invocation_id=operation.request_metadata.invocation_id or "",
599 action_mnemonic=operation.request_metadata.action_mnemonic or "",
600 configuration_id=operation.request_metadata.configuration_id or "",
601 target_id=operation.request_metadata.target_id or "",
602 )
604 return metadata
606 def get_client_identity_by_operation(self, operation_name: str) -> ClientIdentity | None:
607 with self._sql_ro.session(exceptions_to_not_raise_on=[Exception]) as session:
608 operation = self._get_operation(operation_name, session)
609 if not operation or not operation.client_identity:
610 return None
612 return ClientIdentity(
613 actor=operation.client_identity.actor or "",
614 subject=operation.client_identity.subject or "",
615 workflow=operation.client_identity.workflow or "",
616 )
618 def _notify_job_updated(self, job_names: str | list[str], session: Session) -> None:
619 if self._sql.dialect == "postgresql":
620 if isinstance(job_names, str):
621 job_names = [job_names]
622 for job_name in job_names:
623 session.execute(text(f"NOTIFY job_updated, '{job_name}';"))
625 def _get_operation(self, operation_name: str, session: Session) -> OperationEntry | None:
626 statement = (
627 select(OperationEntry).join(JobEntry).where(OperationEntry.name == operation_name, self._job_in_instance())
628 )
629 return session.execute(statement).scalars().first()
631 def _batch_timeout_jobs(self, job_select_stmt: Select[Any], status_code: int, message: str) -> int:
632 """Timeout all jobs selected by a query"""
633 with self._sql.session(sqlite_lock_immediately=True, exceptions_to_not_raise_on=[Exception]) as session:
634 # Get the full list of jobs to timeout
635 jobs = [job.name for job in session.execute(job_select_stmt).scalars().all()]
637 if jobs:
638 # Put response binary
639 response = remote_execution_pb2.ExecuteResponse(
640 status=status_pb2.Status(code=status_code, message=message)
641 )
642 response_binary = response.SerializeToString()
643 response_digest = create_digest(response_binary)
644 self.storage.bulk_update_blobs([(response_digest, response_binary)])
646 # Update response
647 stmt_timeout_jobs = (
648 update(JobEntry)
649 .where(JobEntry.name.in_(jobs))
650 .values(
651 stage=OperationStage.COMPLETED.value,
652 status_code=status_code,
653 result=digest_to_string(response_digest),
654 )
655 )
656 session.execute(stmt_timeout_jobs)
658 # Notify all jobs updated
659 self._notify_job_updated(jobs, session)
660 return len(jobs)
662 def execution_timer_loop(self, shutdown_requested: threading.Event) -> None:
663 """Periodically timeout aged executing jobs"""
664 while not shutdown_requested.is_set():
665 try:
666 self.cancel_jobs_exceeding_execution_timeout(self.max_execution_timeout)
667 except Exception as e:
668 LOGGER.exception("Failed to timeout aged executing jobs.", exc_info=e)
669 shutdown_requested.wait(timeout=self.execution_timer_interval)
671 @timed(METRIC.SCHEDULER.EXECUTION_TIMEOUT_DURATION)
672 def cancel_jobs_exceeding_execution_timeout(self, max_execution_timeout: int | None = None) -> None:
673 if not max_execution_timeout:
674 return
676 # Get the full list of jobs exceeding execution timeout
677 stale_jobs_statement = (
678 select(JobEntry)
679 .where(
680 JobEntry.stage == OperationStage.EXECUTING.value,
681 JobEntry.worker_start_timestamp <= datetime.utcnow() - timedelta(seconds=max_execution_timeout),
682 )
683 .with_for_update(skip_locked=True)
684 )
685 with self._sql.session(sqlite_lock_immediately=True, exceptions_to_not_raise_on=[Exception]) as session:
686 jobs = session.execute(stale_jobs_statement).scalars().all()
687 if not jobs:
688 return
690 response = remote_execution_pb2.ExecuteResponse(
691 status=status_pb2.Status(
692 code=code_pb2.DEADLINE_EXCEEDED,
693 message="Execution didn't finish within timeout threshold",
694 )
695 )
696 response_binary = response.SerializeToString()
697 response_digest = create_digest(response_binary)
699 # When running with a proxying client, we might need to specify instance.
700 with instance_context(jobs[0].instance_name):
701 self.storage.bulk_update_blobs([(response_digest, response_binary)])
703 for job in jobs:
704 executing_duration = datetime.utcnow() - (job.worker_start_timestamp or datetime.utcnow())
705 LOGGER.warning(
706 "Job has been executing for too long. Cancelling.",
707 tags=dict(
708 job_name=job.name,
709 executing_duration=executing_duration,
710 max_execution_timeout=max_execution_timeout,
711 ),
712 )
713 for op in job.operations:
714 op.cancelled = True
715 for lease in job.active_leases:
716 lease.state = LeaseState.CANCELLED.value
717 job.worker_completed_timestamp = datetime.utcnow()
718 job.stage = OperationStage.COMPLETED.value
719 job.cancelled = True
720 job.result = digest_to_string(response_digest)
722 for job in jobs:
723 self._notify_job_updated(job.name, session)
725 publish_counter_metric(METRIC.SCHEDULER.EXECUTION_TIMEOUT_COUNT, len(jobs))
727 def cancel_operation(self, operation_name: str) -> None:
728 statement = (
729 select(JobEntry)
730 .join(OperationEntry)
731 .where(OperationEntry.name == operation_name, self._job_in_instance())
732 .with_for_update()
733 )
734 with self._sql.session() as session:
735 if not (job := session.execute(statement).scalars().first()):
736 raise NotFoundError(f"Operation name does not exist: [{operation_name}]")
738 if job.stage == OperationStage.COMPLETED.value or job.cancelled:
739 return
741 for op in job.operations:
742 if op.name == operation_name:
743 if op.cancelled:
744 return
745 op.cancelled = True
747 if all(op.cancelled for op in job.operations):
748 for lease in job.active_leases:
749 lease.state = LeaseState.CANCELLED.value
750 job.worker_completed_timestamp = datetime.utcnow()
751 job.stage = OperationStage.COMPLETED.value
752 job.cancelled = True
754 self._notify_job_updated(job.name, session)
756 def list_operations(
757 self,
758 operation_filters: list[OperationFilter] | None = None,
759 page_size: int | None = None,
760 page_token: str | None = None,
761 ) -> tuple[list[operations_pb2.Operation], str]:
762 # Build filters and sort order
763 sort_keys = DEFAULT_SORT_KEYS
764 custom_filters = None
765 platform_filters = []
766 if operation_filters:
767 # Extract custom sort order (if present)
768 specified_sort_keys, non_sort_filters = extract_sort_keys(operation_filters)
770 # Only override sort_keys if there were sort keys actually present in the filter string
771 if specified_sort_keys:
772 sort_keys = specified_sort_keys
773 # Attach the operation name as a sort key for a deterministic order
774 # This will ensure that the ordering of results is consistent between queries
775 if not any(sort_key.name == "name" for sort_key in sort_keys):
776 sort_keys.append(SortKey(name="name", descending=False))
778 # Finally, compile the non-sort filters into a filter list
779 custom_filters = build_custom_filters(non_sort_filters)
780 platform_filters = [f for f in non_sort_filters if f.parameter == "platform"]
782 sort_columns = build_sort_column_list(sort_keys)
784 with self._sql_ro.session(exceptions_to_not_raise_on=[Exception]) as session:
785 statement = (
786 select(OperationEntry)
787 .join(JobEntry, OperationEntry.job_name == JobEntry.name)
788 .outerjoin(RequestMetadataEntry)
789 .outerjoin(ClientIdentityEntry)
790 )
791 statement = statement.filter(self._job_in_instance())
793 # If we're filtering by platform, filter using a subquery containing job names
794 # which match the specified platform properties.
795 #
796 # NOTE: A platform filter using `!=` will return only jobs which set that platform
797 # property to an explicitly different value; jobs which don't set the property are
798 # filtered out.
799 if platform_filters:
800 platform_clauses = []
801 for platform_filter in platform_filters:
802 key, value = platform_filter.value.split(":", 1)
803 platform_clauses.append(
804 and_(PlatformEntry.key == key, platform_filter.operator(PlatformEntry.value, value))
805 )
807 job_name_subquery = (
808 select(job_platform_association.c.job_name)
809 .filter(
810 job_platform_association.c.platform_id.in_(
811 select(PlatformEntry.id).filter(or_(*platform_clauses))
812 )
813 )
814 .group_by(job_platform_association.c.job_name)
815 .having(func.count() == len(platform_filters))
816 )
817 statement = statement.filter(JobEntry.name.in_(job_name_subquery))
819 # Apply custom filters (if present)
820 if custom_filters:
821 statement = statement.filter(*custom_filters)
823 # Apply sort order
824 statement = statement.order_by(*sort_columns)
826 # Apply pagination filter
827 if page_token:
828 page_filter = build_page_filter(page_token, sort_keys)
829 statement = statement.filter(page_filter)
830 if page_size:
831 # We limit the number of operations we fetch to the page_size. However, we
832 # fetch an extra operation to determine whether we need to provide a
833 # next_page_token.
834 statement = statement.limit(page_size + 1)
836 operations = list(session.execute(statement).scalars().all())
838 if not page_size or not operations:
839 next_page_token = ""
841 # If the number of results we got is less than or equal to our page_size,
842 # we're done with the operations listing and don't need to provide another
843 # page token
844 elif len(operations) <= page_size:
845 next_page_token = ""
846 else:
847 # Drop the last operation since we have an extra
848 operations.pop()
849 # Our page token will be the last row of our set
850 next_page_token = build_page_token(operations[-1], sort_keys)
851 return [self._load_operation(operation) for operation in operations], next_page_token
853 def list_workers(self, name_filter: str, page_number: int, page_size: int) -> tuple[list[BotEntry], int]:
854 stmt = select(BotEntry, func.count().over().label("total"))
855 stmt = stmt.options(selectinload(BotEntry.job).selectinload(JobEntry.operations))
856 stmt = stmt.where(
857 or_(
858 BotEntry.name.ilike(f"%{name_filter}%"),
859 BotEntry.bot_id.ilike(f"%{name_filter}%"),
860 ),
861 BotEntry.instance_name == current_instance(),
862 )
863 stmt = stmt.order_by(BotEntry.bot_id)
865 if page_size:
866 stmt = stmt.limit(page_size)
867 if page_number > 1:
868 stmt = stmt.offset((page_number - 1) * page_size)
870 with self._sql.scoped_session() as session:
871 results = session.execute(stmt).all()
872 count = cast(int, results[0].total) if results else 0
873 session.expunge_all()
875 return [r[0] for r in results], count
877 def get_metrics(self) -> SchedulerMetrics | None:
878 # Skip publishing overall scheduler metrics if we have recently published them
879 last_publish_time = self._last_scheduler_metrics_publish_time
880 time_since_publish = None
881 if last_publish_time:
882 time_since_publish = datetime.utcnow() - last_publish_time
883 if time_since_publish and time_since_publish < self._scheduler_metrics_publish_interval:
884 # Published too recently, skip
885 return None
887 metrics: SchedulerMetrics = {}
888 # metrics to gather: (category_name, function_returning_query, callback_function)
890 try:
891 with self._sql_ro.session(exceptions_to_not_raise_on=[Exception]) as session:
892 # To utilize "ix_jobs_stage_property_label" B-tree index we query
893 # `stage < COMPLETED.value` rather than `stage != COMPLETED.value`.
894 results = session.execute(
895 select(
896 JobEntry.stage.label("job_stage"),
897 JobEntry.property_label.label("property_label"),
898 func.count(JobEntry.name).label("job_count"),
899 )
900 .where(JobEntry.stage < OperationStage.COMPLETED.value)
901 .group_by(JobEntry.stage, JobEntry.property_label),
902 ).all()
904 jobs_metrics = {}
905 for stage in OperationStage:
906 if stage != OperationStage.COMPLETED:
907 jobs_metrics[stage.name, "unknown"] = 0
909 for job_stage, property_label, job_count in results:
910 jobs_metrics[OperationStage(job_stage).name, property_label] = cast(int, job_count)
912 metrics["jobs"] = jobs_metrics
913 except DatabaseError:
914 LOGGER.warning("Unable to gather metrics due to a Database Error.")
915 return {}
917 # This is only updated within the metrics asyncio loop; no race conditions
918 self._last_scheduler_metrics_publish_time = datetime.utcnow()
920 return metrics
922 def _queued_jobs_by_capability(self, capability_hash: str) -> Select[Any]:
923 return (
924 select(JobEntry)
925 .with_for_update(skip_locked=True)
926 .where(
927 JobEntry.assigned != True, # noqa: E712
928 self._job_in_instance(),
929 JobEntry.platform_requirements == capability_hash,
930 JobEntry.stage == OperationStage.QUEUED.value,
931 )
932 )
934 def assign_n_leases_by_priority(
935 self,
936 *,
937 capability_hash: str,
938 bot_names: list[str],
939 ) -> list[str]:
940 job_statement = self._queued_jobs_by_capability(capability_hash).order_by(
941 JobEntry.priority, JobEntry.queued_timestamp
942 )
943 return self._assign_n_leases(job_statement=job_statement, bot_names=bot_names)
945 def assign_n_leases_by_age(
946 self,
947 *,
948 capability_hash: str,
949 bot_names: list[str],
950 ) -> list[str]:
951 job_statement = self._queued_jobs_by_capability(capability_hash).order_by(JobEntry.queued_timestamp)
952 return self._assign_n_leases(job_statement=job_statement, bot_names=bot_names)
954 @timed(METRIC.SCHEDULER.ASSIGNMENT_DURATION)
955 def _assign_n_leases(self, *, job_statement: Select[Any], bot_names: list[str]) -> list[str]:
956 bot_statement = (
957 select(BotEntry)
958 .with_for_update(skip_locked=True)
959 .where(
960 BotEntry.lease_id.is_(None),
961 self._bot_in_instance(),
962 BotEntry.name.in_(bot_names),
963 BotEntry.expiry_time > datetime.utcnow(),
964 )
965 )
967 try:
968 with self._sql.session(sqlite_lock_immediately=True, exceptions_to_not_raise_on=[Exception]) as session:
969 jobs = session.execute(job_statement.limit(len(bot_names))).scalars().all()
970 bots = session.execute(bot_statement.limit(len(jobs))).scalars().all()
972 assigned_bot_names: list[str] = []
973 for job, bot in zip(jobs, bots):
974 job.assigned = True
975 job.queued_time_duration = int((datetime.utcnow() - job.queued_timestamp).total_seconds())
976 job.worker_start_timestamp = datetime.utcnow()
977 job.worker_completed_timestamp = None
978 bot.lease_id = job.name
979 bot.last_update_timestamp = datetime.utcnow()
980 if job.active_leases:
981 lease = job.active_leases[0]
982 LOGGER.debug(
983 "Reassigned existing lease.",
984 tags=dict(
985 job_name=job.name,
986 bot_id=bot.bot_id,
987 bot_name=bot.name,
988 prev_lease_state=lease.state,
989 prev_lease_status=lease.status,
990 prev_bot_id=lease.worker_name,
991 ),
992 )
993 lease.state = LeaseState.PENDING.value
994 lease.status = None
995 lease.worker_name = bot.bot_id
996 else:
997 LOGGER.debug(
998 "Assigned new lease.", tags=dict(job_name=job.name, bot_id=bot.bot_id, bot_name=bot.name)
999 )
1000 session.add(
1001 LeaseEntry(
1002 job_name=job.name,
1003 state=LeaseState.PENDING.value,
1004 status=None,
1005 worker_name=bot.bot_id,
1006 )
1007 )
1008 assigned_bot_names.append(bot.name)
1010 return assigned_bot_names
1011 except DatabaseError:
1012 LOGGER.warning("Will not assign any leases this time due to a Database Error.")
1013 return []
1015 def queue_timer_loop(self, shutdown_requested: threading.Event) -> None:
1016 """Periodically timeout aged queued jobs"""
1018 if not (opts := self.queue_timeout_options):
1019 return
1021 job_max_age = opts.job_max_age
1022 period = opts.handling_period
1023 limit = opts.max_handling_window
1025 last_timeout_time = datetime.utcnow()
1026 while not shutdown_requested.is_set():
1027 now = datetime.utcnow()
1028 if now - last_timeout_time < period:
1029 LOGGER.info(f"Job queue timeout thread sleeping for {period} seconds")
1030 shutdown_requested.wait(timeout=period.total_seconds())
1031 continue
1033 timeout_jobs_scheduled_before = now - job_max_age
1034 try:
1035 with timer(METRIC.SCHEDULER.QUEUE_TIMEOUT_DURATION):
1036 num_timeout = self._timeout_queued_jobs_scheduled_before(timeout_jobs_scheduled_before, limit)
1037 LOGGER.info(f"Timed-out {num_timeout} queued jobs scheduled before {timeout_jobs_scheduled_before}")
1038 if num_timeout > 0:
1039 publish_counter_metric(METRIC.SCHEDULER.QUEUE_TIMEOUT_COUNT, num_timeout)
1041 except Exception as e:
1042 LOGGER.exception("Failed to timeout aged queued jobs.", exc_info=e)
1043 finally:
1044 last_timeout_time = now
1046 def _timeout_queued_jobs_scheduled_before(self, dt: datetime, limit: int) -> int:
1047 jobs_to_timeout_stmt = (
1048 select(JobEntry)
1049 .where(JobEntry.stage == OperationStage.QUEUED.value)
1050 .where(JobEntry.queued_timestamp < dt)
1051 .limit(limit)
1052 )
1053 return self._batch_timeout_jobs(
1054 jobs_to_timeout_stmt, code_pb2.UNAVAILABLE, "Operation has been queued for too long"
1055 )
1057 def prune_timer_loop(self, shutdown_requested: threading.Event) -> None:
1058 """Running in a background thread, this method wakes up periodically and deletes older records
1059 from the jobs tables using configurable parameters"""
1061 if not (opts := self.pruning_options):
1062 return
1064 job_max_age = opts.job_max_age
1065 pruning_period = opts.handling_period
1066 limit = opts.max_handling_window
1068 utc_last_prune_time = datetime.utcnow()
1069 while not shutdown_requested.is_set():
1070 utcnow = datetime.utcnow()
1071 if (utcnow - pruning_period) < utc_last_prune_time:
1072 LOGGER.info(f"Pruner thread sleeping for {pruning_period}(until {utcnow + pruning_period})")
1073 shutdown_requested.wait(timeout=pruning_period.total_seconds())
1074 continue
1076 delete_before_datetime = utcnow - job_max_age
1077 try:
1078 num_rows = self._delete_jobs_prior_to(delete_before_datetime, limit)
1079 LOGGER.info(f"Pruned {num_rows} row(s) from the jobs table older than {delete_before_datetime}")
1080 except Exception:
1081 LOGGER.exception("Caught exception while deleting jobs records.")
1082 finally:
1083 # Update even if error occurred to avoid potentially infinitely retrying
1084 utc_last_prune_time = utcnow
1086 LOGGER.info("Exiting pruner thread.")
1088 @timed(METRIC.SCHEDULER.PRUNE_DURATION)
1089 def _delete_jobs_prior_to(self, delete_before_datetime: datetime, limit: int) -> int:
1090 """Deletes older records from the jobs tables constrained by `delete_before_datetime` and `limit`"""
1091 delete_stmt = delete(JobEntry).where(
1092 JobEntry.name.in_(
1093 select(JobEntry.name)
1094 .with_for_update(skip_locked=True)
1095 .where(JobEntry.worker_completed_timestamp <= delete_before_datetime)
1096 .limit(limit)
1097 ),
1098 )
1100 with self._sql.session() as session:
1101 options = {"synchronize_session": "fetch"}
1102 num_rows_deleted: int = session.execute(delete_stmt, execution_options=options).rowcount
1104 if num_rows_deleted:
1105 publish_counter_metric(METRIC.SCHEDULER.PRUNE_COUNT, num_rows_deleted)
1107 return num_rows_deleted
1109 def _insert_on_conflict_do_nothing(self, model: type[OrmBase]) -> Insert:
1110 # `Insert.on_conflict_do_nothing` is a SQLAlchemy "generative method", it
1111 # returns a modified copy of the statement it is called on. For
1112 # some reason mypy can't understand this, so the errors are ignored here.
1113 if self._sql.dialect == "sqlite":
1114 sqlite_insert: sqlite.Insert = sqlite.insert(model)
1115 return sqlite_insert.on_conflict_do_nothing()
1117 elif self._sql.dialect == "postgresql":
1118 insertion: postgresql.Insert = postgresql.insert(model)
1119 return insertion.on_conflict_do_nothing()
1121 else:
1122 # Fall back to the non-specific insert implementation. This doesn't
1123 # support `ON CONFLICT DO NOTHING`, so callers need to be careful to
1124 # still catch IntegrityErrors if other database backends are possible.
1125 return insert(model)
1127 def get_or_create_client_identity_in_store(
1128 self, session: Session, client_id: ClientIdentityEntry
1129 ) -> ClientIdentityEntry:
1130 """Get the ClientIdentity in the storage or create one.
1131 This helper function essentially makes sure the `client_id` is created during the transaction
1133 Args:
1134 session (Session): sqlalchemy Session
1135 client_id (ClientIdentityEntry): identity of the client that creates an operation
1137 Returns:
1138 ClientIdentityEntry: identity of the client that creates an operation
1139 """
1140 insertion = self._insert_on_conflict_do_nothing(ClientIdentityEntry)
1141 insertion = insertion.values(
1142 {
1143 "instance": client_id.instance,
1144 "workflow": client_id.workflow,
1145 "actor": client_id.actor,
1146 "subject": client_id.subject,
1147 }
1148 )
1149 try:
1150 session.execute(insertion)
1152 # Handle unique constraint violation when using an unsupported database (ie. not PostgreSQL or SQLite)
1153 except IntegrityError:
1154 LOGGER.debug("Handled IntegrityError when inserting client identity.")
1156 stmt = (
1157 select(ClientIdentityEntry)
1158 .where(ClientIdentityEntry.instance == client_id.instance)
1159 .where(ClientIdentityEntry.workflow == client_id.workflow)
1160 .where(ClientIdentityEntry.actor == client_id.actor)
1161 .where(ClientIdentityEntry.subject == client_id.subject)
1162 )
1164 result: ClientIdentityEntry = session.execute(stmt).scalar_one()
1165 return result
1167 def get_or_create_request_metadata_in_store(
1168 self, session: Session, request_metadata: RequestMetadata
1169 ) -> RequestMetadataEntry:
1170 insertion = self._insert_on_conflict_do_nothing(RequestMetadataEntry)
1171 insertion = insertion.values(
1172 {
1173 "action_mnemonic": request_metadata.action_mnemonic,
1174 "configuration_id": request_metadata.configuration_id,
1175 "correlated_invocations_id": request_metadata.correlated_invocations_id,
1176 "invocation_id": request_metadata.tool_invocation_id,
1177 "target_id": request_metadata.target_id,
1178 "tool_name": request_metadata.tool_details.tool_name,
1179 "tool_version": request_metadata.tool_details.tool_version,
1180 }
1181 )
1182 try:
1183 session.execute(insertion)
1185 # Handle unique constraint violation when using an unsupported database (ie. not PostgreSQL or SQLite)
1186 except IntegrityError:
1187 LOGGER.debug("Handled IntegrityError when inserting request metadata.")
1189 stmt = (
1190 select(RequestMetadataEntry)
1191 .where(RequestMetadataEntry.action_mnemonic == request_metadata.action_mnemonic)
1192 .where(RequestMetadataEntry.configuration_id == request_metadata.configuration_id)
1193 .where(RequestMetadataEntry.correlated_invocations_id == request_metadata.correlated_invocations_id)
1194 .where(RequestMetadataEntry.invocation_id == request_metadata.tool_invocation_id)
1195 .where(RequestMetadataEntry.target_id == request_metadata.target_id)
1196 .where(RequestMetadataEntry.tool_name == request_metadata.tool_details.tool_name)
1197 .where(RequestMetadataEntry.tool_version == request_metadata.tool_details.tool_version)
1198 )
1200 result: RequestMetadataEntry = session.execute(stmt).scalar_one()
1201 return result
1203 def add_bot_entry(
1204 self, *, bot_session_id: str, bot_session_status: int, bot_property_labels: list[str] = []
1205 ) -> str:
1206 if not bot_property_labels:
1207 bot_property_labels = ["unknown"]
1209 with self._sql.session() as session:
1210 # Check if bot_id is already known. If yes, all leases associated with
1211 # it are requeued and the existing record deleted. A new record is then
1212 # created with the new bot_id/name combination, as it would in the
1213 # unknown case.
1214 locate_bot_stmt = (
1215 select(BotEntry).where(BotEntry.bot_id == bot_session_id, self._bot_in_instance()).with_for_update()
1216 )
1217 self._close_bot_sessions(session, session.execute(locate_bot_stmt).scalars().all())
1219 bot_name = f"{current_instance()}/{str(uuid.uuid4())}"
1220 session.add(
1221 BotEntry(
1222 name=bot_name,
1223 bot_id=bot_session_id,
1224 last_update_timestamp=datetime.utcnow(),
1225 lease_id=None,
1226 bot_status=bot_session_status,
1227 property_labels=[],
1228 instance_name=current_instance(),
1229 expiry_time=datetime.utcnow() + timedelta(seconds=self.bot_session_keepalive_timeout),
1230 )
1231 )
1233 for label in bot_property_labels:
1234 session.add(PropertyLabelEntry(property_label=label, bot_name=bot_name))
1236 return bot_name
1238 def close_bot_sessions(self, bot_name: str) -> None:
1239 with self._sql.session() as session:
1240 locate_bot_stmt = (
1241 select(BotEntry).where(BotEntry.name == bot_name, self._bot_in_instance()).with_for_update()
1242 )
1243 self._close_bot_sessions(session, session.execute(locate_bot_stmt).scalars().all())
1245 def _close_bot_sessions(self, session: Session, bots: Sequence[BotEntry]) -> None:
1246 for bot in bots:
1247 log_tags = {
1248 "instance_name": try_current_instance(),
1249 "request.bot_name": bot.name,
1250 "request.bot_id": bot.bot_id,
1251 "request.bot_status": bot.bot_status,
1252 }
1253 LOGGER.debug("Closing bot session.", tags=log_tags)
1254 if bot.lease_id:
1255 if job := self._get_job(bot.lease_id, session, with_for_update=True):
1256 for db_lease in job.active_leases:
1257 lease_tags = {**log_tags, "db.lease_id": job.name, "db.lease_state": db_lease.state}
1258 LOGGER.debug("Reassigning lease for bot session.", tags=lease_tags)
1259 self._retry_job_lease(session, job, db_lease)
1260 self._notify_job_updated(job.name, session)
1261 session.delete(bot)
1263 def session_expiry_timer_loop(self, shutdown_requested: threading.Event) -> None:
1264 LOGGER.info("Starting BotSession reaper.", tags=dict(keepalive_timeout=self.bot_session_keepalive_timeout))
1265 while not shutdown_requested.is_set():
1266 try:
1267 while self.reap_expired_sessions():
1268 if shutdown_requested.is_set():
1269 break
1270 except Exception as exception:
1271 LOGGER.exception(exception)
1272 shutdown_requested.wait(timeout=self.session_expiry_interval)
1274 def reap_expired_sessions(self) -> bool:
1275 """
1276 Find and close expired bot sessions. Returns True if sessions were closed.
1277 Only closes a few sessions to minimize time in transaction.
1278 """
1280 with self._sql.session() as session:
1281 locate_bot_stmt = (
1282 select(BotEntry)
1283 .where(BotEntry.expiry_time < datetime.utcnow())
1284 .order_by(BotEntry.expiry_time.desc())
1285 .with_for_update(skip_locked=True)
1286 .limit(5)
1287 )
1288 if bots := cast(list[BotEntry], session.execute(locate_bot_stmt).scalars().all()):
1289 bots_by_instance: dict[str, list[BotEntry]] = defaultdict(list)
1290 for bot in bots:
1291 LOGGER.warning(
1292 "BotSession has expired.",
1293 tags=dict(
1294 name=bot.name, bot_id=bot.bot_id, instance_name=bot.instance_name, deadline=bot.expiry_time
1295 ),
1296 )
1297 bots_by_instance[bot.instance_name].append(bot)
1298 for instance_name, instance_bots in bots_by_instance.items():
1299 with instance_context(instance_name):
1300 self._close_bot_sessions(session, instance_bots)
1301 return True
1302 return False
1304 def _publish_job_duration(
1305 self, start: Timestamp | None, end: Timestamp | None, state: str, property_label: str
1306 ) -> None:
1307 start_set = start is not None and (start.seconds > 0 or start.nanos > 0)
1308 end_set = end is not None and (end.seconds > 0 or end.nanos > 0)
1309 if start_set and end_set:
1310 publish_timer_metric(
1311 METRIC.JOB.DURATION,
1312 end.ToDatetime() - start.ToDatetime(), # type: ignore[union-attr]
1313 state=state,
1314 propertyLabel=property_label,
1315 )
1317 @timed(METRIC.SCHEDULER.SYNCHRONIZE_DURATION)
1318 def synchronize_bot_lease(
1319 self,
1320 bot_name: str,
1321 bot_id: str,
1322 bot_status: int,
1323 session_lease: Lease | None,
1324 partial_execution_metadata: dict[str, ExecutedActionMetadata] | None = None,
1325 ) -> Lease | None:
1326 log_tags = {
1327 "instance_name": try_current_instance(),
1328 "request.bot_id": bot_id,
1329 "request.bot_status": bot_status,
1330 "request.bot_name": bot_name,
1331 "request.lease_id": session_lease.id if session_lease else "",
1332 "request.lease_state": session_lease.state if session_lease else "",
1333 }
1335 with self._sql.session(exceptions_to_not_raise_on=[Exception]) as session:
1336 locate_bot_stmt = (
1337 select(BotEntry).where(BotEntry.bot_id == bot_id, self._bot_in_instance()).with_for_update()
1338 )
1339 bots: Sequence[BotEntry] = session.execute(locate_bot_stmt).scalars().all()
1340 if not bots:
1341 raise InvalidArgumentError(f"Bot does not exist while validating leases. {log_tags}")
1343 # This is a tricky case. This case happens when a new bot session is created while an older
1344 # session for a bot id is waiting on leases. This can happen when a worker reboots but the
1345 # connection context takes a long time to close. In this case, we DO NOT want to update anything
1346 # in the database, because the work/lease has already been re-assigned to a new session.
1347 # Closing anything in the database at this point would cause the newly restarted worker
1348 # to get cancelled prematurely.
1349 if len(bots) == 1 and bots[0].name != bot_name:
1350 raise BotSessionMismatchError(
1351 "Mismatch between client supplied bot_id/bot_name and buildgrid database record. "
1352 f"db.bot_name=[{bots[0].name}] {log_tags}"
1353 )
1355 # Everything at this point is wrapped in try/catch, so we can raise BotSessionMismatchError or
1356 # BotSessionClosedError and have the session be closed if preconditions from here out fail.
1357 try:
1358 # There should never be time when two bot sessions exist for the same bot id. We have logic to
1359 # assert that old database entries for a given bot id are closed and deleted prior to making a
1360 # new one. If this case happens shut everything down, so we can hopefully recover.
1361 if len(bots) > 1:
1362 raise BotSessionMismatchError(
1363 "Bot id is registered to more than one bot session. "
1364 f"names=[{', '.join(bot.name for bot in bots)}] {log_tags}"
1365 )
1367 bot = bots[0]
1368 log_tags["db.lease_id"] = bot.lease_id
1370 # Validate that the lease_id matches the client and database if both are supplied.
1371 if (session_lease and session_lease.id and bot.lease_id) and (session_lease.id != bot.lease_id):
1372 raise BotSessionMismatchError(
1373 f"Mismatch between client supplied lease_id and buildgrid database record. {log_tags}"
1374 )
1376 # Update the expiry time.
1377 bot.expiry_time = datetime.utcnow() + timedelta(seconds=self.bot_session_keepalive_timeout)
1378 bot.last_update_timestamp = datetime.utcnow()
1379 bot.bot_status = bot_status
1381 # Validate the cases where the database doesn't know about any leases.
1382 if bot.lease_id is None:
1383 # If there's no lease in the database or session, we have nothing to update!
1384 if not session_lease:
1385 LOGGER.debug("No lease in session or database. Skipping.", tags=log_tags)
1386 return None
1388 # If the database has no lease, but the work is completed, we probably timed out the last call.
1389 if session_lease.state == LeaseState.COMPLETED.value:
1390 LOGGER.debug("No lease in database, but session lease is completed. Skipping.", tags=log_tags)
1391 return None
1393 # Otherwise, the bot session has a lease that the server doesn't know about. Bad bad bad.
1394 raise BotSessionClosedError(f"Bot session lease id does not match the database. {log_tags}")
1396 # Let's now lock the job so no more state transitions occur while we perform our updates.
1397 job = self._get_job(bot.lease_id, session, with_for_update=True)
1398 if not job:
1399 raise BotSessionClosedError(f"Bot session lease id points to non-existent job. {log_tags}")
1401 # If we don't have any leases assigned to the job now, someone interrupted us before locking.
1402 # Disconnect our bot from mutating this job.
1403 if not job.leases:
1404 raise BotSessionClosedError(f"Leases were changed while job was being locked. {log_tags}")
1406 db_lease = job.leases[0]
1407 log_tags["db.lease_state"] = db_lease.state
1409 # Update Partial Execution Metadata:
1410 #
1411 # Update the job table in the database with the partial execution metadata from the worker.
1412 # This is included in the UpdateBotSession GRPC call and should contain partial execution metadata
1413 # for each lease. The job.name is the same as the lease_id.
1415 if partial_execution_metadata:
1416 if metadata := partial_execution_metadata.get(job.name):
1417 if metadata.HasField("input_fetch_start_timestamp"):
1418 job.input_fetch_start_timestamp = metadata.input_fetch_start_timestamp.ToDatetime()
1419 if metadata.HasField("input_fetch_completed_timestamp"):
1420 job.input_fetch_completed_timestamp = metadata.input_fetch_completed_timestamp.ToDatetime()
1421 if metadata.HasField("output_upload_start_timestamp"):
1422 job.output_upload_start_timestamp = metadata.output_upload_start_timestamp.ToDatetime()
1423 if metadata.HasField("output_upload_completed_timestamp"):
1424 job.output_upload_completed_timestamp = (
1425 metadata.output_upload_completed_timestamp.ToDatetime()
1426 )
1427 if metadata.HasField("execution_start_timestamp"):
1428 job.execution_start_timestamp = metadata.execution_start_timestamp.ToDatetime()
1429 if metadata.HasField("execution_completed_timestamp"):
1430 job.execution_completed_timestamp = metadata.execution_completed_timestamp.ToDatetime()
1432 # Assign:
1433 #
1434 # If the lease is in the PENDING state, this means that it is a new lease for the worker, which
1435 # it must acknowledge (the next time it calls UpdateBotSession) by changing the state to ACTIVE.
1436 #
1437 # Leases contain a “payload,” which is an Any proto that must be understandable to the bot.
1438 #
1439 # If at any time the bot issues a call to UpdateBotSession that is inconsistent with what the service
1440 # expects, the service can take appropriate action. For example, the service may have assigned a
1441 # lease to a bot, but the call gets interrupted before the bot receives the message, perhaps because
1442 # the UpdateBotSession call times out. As a result, the next call to UpdateBotSession from the bot
1443 # will not include the lease, and the service can immediately conclude that the lease needs to be
1444 # reassigned.
1445 #
1446 if not session_lease:
1447 if db_lease.state != LeaseState.PENDING.value:
1448 raise BotSessionClosedError(
1449 f"Session has no lease and database entry not in pending state. {log_tags}"
1450 )
1452 job.stage = OperationStage.EXECUTING.value
1453 if self.logstream_channel and self.logstream_instance is not None:
1454 try:
1455 action_digest = string_to_digest(job.action_digest)
1456 parent_base = f"{action_digest.hash}_{action_digest.size_bytes}_{int(time())}"
1457 with logstream_client(self.logstream_channel, self.logstream_instance) as ls_client:
1458 stdout_stream = ls_client.create(f"{parent_base}_stdout")
1459 stderr_stream = ls_client.create(f"{parent_base}_stderr")
1460 job.stdout_stream_name = stdout_stream.name
1461 job.stdout_stream_write_name = stdout_stream.write_resource_name
1462 job.stderr_stream_name = stderr_stream.name
1463 job.stderr_stream_write_name = stderr_stream.write_resource_name
1464 except Exception as e:
1465 LOGGER.warning("Failed to create log stream.", tags=log_tags, exc_info=e)
1467 self._notify_job_updated(job.name, session)
1468 LOGGER.debug("Pending lease sent to bot for ack.", tags=log_tags)
1469 return db_lease.to_protobuf()
1471 # At this point, we know that there's a lease both in the bot session and in the database.
1473 # Accept:
1474 #
1475 # If the lease is in the PENDING state, this means that it is a new lease for the worker,
1476 # which it must acknowledge (the next time it calls UpdateBotSession) by changing the state to ACTIVE
1477 #
1478 if session_lease.state == LeaseState.ACTIVE.value and db_lease.state == LeaseState.PENDING.value:
1479 db_lease.state = LeaseState.ACTIVE.value
1480 self._notify_job_updated(job.name, session)
1481 LOGGER.debug("Bot acked pending lease.", tags=log_tags)
1483 # Now the job has been accepted by a worker the time this job has spent in the queue can be
1484 # calculated and posted to the metrics.
1485 job_metadata = self.get_execute_action_metadata(job)
1486 queued = job_metadata.queued_timestamp
1487 worker_start = job_metadata.worker_start_timestamp
1488 self._publish_job_duration(queued, worker_start, "Queued", job.property_label)
1490 return session_lease
1492 # Complete:
1493 #
1494 # Once the assignment is complete - either because it finishes or because it times out - the bot
1495 # calls Bots.UpdateBotSession again, this time updating the state of the lease from accepted to
1496 # complete, and optionally by also populating the lease’s results field, which is another Any proto.
1497 # The service can then assign it new work (removing any completed leases).
1498 #
1499 # A successfully completed lease may go directly from PENDING to COMPLETED if, for example, the
1500 # lease was completed before the bot has had the opportunity to transition to ACTIVE, or if the
1501 # update transitioning the lease to the ACTIVE state was lost.
1502 #
1503 if session_lease.state == LeaseState.COMPLETED.value and db_lease.state in (
1504 LeaseState.PENDING.value,
1505 LeaseState.ACTIVE.value,
1506 ):
1507 log_tags["request.lease_status_code"] = session_lease.status.code
1508 log_tags["request.lease_status_message"] = session_lease.status.message
1509 log_tags["db.n_tries"] = job.n_tries
1511 bot.lease_id = None
1512 if (
1513 session_lease.status.code in self.RETRYABLE_STATUS_CODES
1514 and job.n_tries < self.max_job_attempts
1515 ):
1516 LOGGER.debug("Retrying bot lease.", tags=log_tags)
1517 self._retry_job_lease(session, job, db_lease)
1518 else:
1519 LOGGER.debug("Bot completed lease.", tags=log_tags)
1520 self._complete_lease(session, job, db_lease, session_lease.status, session_lease.result)
1522 self._notify_job_updated(job.name, session)
1523 return None
1525 # Cancel:
1526 #
1527 # At any time, the service may change the state of a lease from PENDING or ACTIVE to CANCELLED;
1528 # the bot may not change to this state. The service then waits for the bot to acknowledge the
1529 # change by updating its own status to CANCELLED as well. Once both the service and the bot agree,
1530 # the service may remove it from the list of leases.
1531 #
1532 if session_lease.state == db_lease.state == LeaseState.CANCELLED.value:
1533 bot.lease_id = None
1534 LOGGER.debug("Bot acked cancelled lease.", tags=log_tags)
1535 return None
1537 if db_lease.state == LeaseState.CANCELLED.value:
1538 session_lease.state = LeaseState.CANCELLED.value
1539 LOGGER.debug("Cancelled lease sent to bot for ack.", tags=log_tags)
1540 return session_lease
1542 if session_lease.state == LeaseState.CANCELLED.value:
1543 raise BotSessionClosedError(f"Illegal attempt from session to set state as cancelled. {log_tags}")
1545 # Keepalive:
1546 #
1547 # The Bot periodically calls Bots.UpdateBotSession, either if there’s a genuine change (for example,
1548 # an attached phone has died) or simply to let the service know that it’s alive and ready to receive
1549 # work. If the bot doesn’t call back on time, the service considers it to have died, and all work
1550 # from the bot to be lost.
1551 #
1552 if session_lease.state == db_lease.state:
1553 LOGGER.debug("Bot heartbeat acked.", tags=log_tags)
1554 return session_lease
1556 # Any other transition should really never happen... cover it anyways.
1557 raise BotSessionClosedError(f"Unsupported lease state transition. {log_tags}")
1558 # TODO allow creating a session with manual commit logic.
1559 # For now... Sneak the exception past the context manager.
1560 except (BotSessionMismatchError, BotSessionClosedError) as e:
1561 self._close_bot_sessions(session, bots)
1562 err = e
1563 raise err
1565 def _retry_job_lease(self, session: Session, job: JobEntry, lease: LeaseEntry) -> None:
1566 # If the job was mutated before we could lock it, exit fast on terminal states.
1567 if job.cancelled or job.stage == OperationStage.COMPLETED.value:
1568 return
1570 if job.n_tries >= self.max_job_attempts:
1571 status = status_pb2.Status(
1572 code=code_pb2.ABORTED, message=f"Job was retried {job.n_tries} unsuccessfully. Aborting."
1573 )
1574 self._complete_lease(session, job, lease, status=status)
1575 return
1577 job.stage = OperationStage.QUEUED.value
1578 job.assigned = False
1579 job.n_tries += 1
1581 lease.state = LeaseState.PENDING.value
1582 lease.status = None
1583 lease.worker_name = None
1585 def _complete_lease(
1586 self, session: Session, job: JobEntry, lease: LeaseEntry, status: Status, result: ProtoAny | None = None
1587 ) -> None:
1588 lease.state = LeaseState.COMPLETED.value
1589 lease.status = status.code
1591 job.stage = OperationStage.COMPLETED.value
1592 job.status_code = status.code
1593 if not job.do_not_cache:
1594 job.do_not_cache = status.code != code_pb2.OK
1595 job.worker_completed_timestamp = datetime.utcnow()
1597 action_result = ActionResult()
1598 if result is not None and result.Is(action_result.DESCRIPTOR):
1599 result.Unpack(action_result)
1600 now = datetime.utcnow()
1601 action_result.execution_metadata.queued_timestamp.FromDatetime(job.queued_timestamp)
1602 action_result.execution_metadata.worker_start_timestamp.FromDatetime(job.worker_start_timestamp or now)
1603 action_result.execution_metadata.worker_completed_timestamp.FromDatetime(job.worker_completed_timestamp or now)
1604 response = ExecuteResponse(result=action_result, cached_result=False, status=status)
1606 job.result = digest_to_string(self.storage.put_message(response))
1608 if self.action_cache and result and not job.do_not_cache:
1609 action_digest = string_to_digest(job.action_digest)
1610 try:
1611 self.action_cache.update_action_result(action_digest, action_result)
1612 LOGGER.debug(
1613 "Stored action result in ActionCache.",
1614 tags=dict(action_result=action_result, digest=action_digest),
1615 )
1616 except UpdateNotAllowedError:
1617 # The configuration doesn't allow updating the old result
1618 LOGGER.exception(
1619 "ActionCache is not configured to allow updates, ActionResult wasn't updated.",
1620 tags=dict(digest=action_digest),
1621 )
1622 except Exception:
1623 LOGGER.exception(
1624 "Unable to update ActionCache, results will not be stored in the ActionCache.",
1625 tags=dict(digest=action_digest),
1626 )
1628 # Update retentions
1629 self._update_action_retention(
1630 Action.FromString(job.action),
1631 string_to_digest(job.action_digest),
1632 retention_hours=self.completed_action_retention_hours,
1633 )
1634 if action_result.ByteSize() > 0:
1635 self._update_action_result_retention(action_result, retention_hours=self.action_result_retention_hours)
1637 self._publish_execution_stats(session, job.name, action_result.execution_metadata, job.property_label)
1639 def get_bot_status_metrics(self) -> BotMetrics:
1640 """Count the number of bots with a particular status and property_label"""
1641 with self._sql.session() as session:
1642 metrics: BotMetrics = {"bots_total": {}, "bots_per_property_label": {}}
1644 # bot count by status only
1645 query_total = (
1646 session.query(BotEntry.bot_status, func.count(BotEntry.bot_status))
1647 .group_by(BotEntry.bot_status)
1648 .filter(self._bot_in_instance())
1649 )
1650 for status in BotStatus:
1651 metrics["bots_total"][status] = 0
1652 for [bot_status, count] in query_total.all():
1653 metrics["bots_total"][BotStatus(bot_status)] = cast(int, count)
1655 # bot count by status for each property label
1656 query_per_label = (
1657 session.query(BotEntry.bot_status, PropertyLabelEntry.property_label, func.count(BotEntry.bot_status))
1658 .join(BotEntry, BotEntry.name == PropertyLabelEntry.bot_name, isouter=True)
1659 .group_by(BotEntry.bot_status, PropertyLabelEntry.property_label)
1660 .filter(self._bot_in_instance())
1661 )
1662 for status in BotStatus:
1663 metrics["bots_per_property_label"][status, "unknown"] = 0
1664 for [bot_status, property_label, count] in query_per_label.all():
1665 metrics["bots_per_property_label"][BotStatus(bot_status), property_label] = cast(int, count)
1667 return metrics
1669 def refresh_bot_expiry_time(self, bot_name: str, bot_id: str) -> datetime:
1670 """
1671 This update is done out-of-band from the main synchronize_bot_lease transaction, as there
1672 are cases where we will skip calling the synchronization, but still want the session to be
1673 updated such that it does not get reaped. This slightly duplicates the update happening in
1674 synchronize_bot_lease, however, that update is still required to not have the job reaped
1675 during its job assignment waiting period.
1677 This method should be called at the end of the update and create bot session methods.
1678 The returned datetime should be assigned to the deadline within the returned session proto.
1679 """
1681 locate_bot_stmt = (
1682 select(BotEntry)
1683 .where(BotEntry.name == bot_name, BotEntry.bot_id == bot_id, self._bot_in_instance())
1684 .with_for_update()
1685 )
1686 with self._sql.session() as session:
1687 if bot := session.execute(locate_bot_stmt).scalar():
1688 now = datetime.utcnow()
1689 bot.last_update_timestamp = now
1690 bot.expiry_time = now + timedelta(seconds=self.bot_session_keepalive_timeout)
1691 return bot.expiry_time
1692 raise BotSessionClosedError("Bot not found to fetch expiry. {bot_name=} {bot_id=}")
1694 def get_metadata_for_leases(self, leases: Iterable[Lease]) -> list[tuple[str, bytes]]:
1695 """Return a list of Job metadata for a given list of leases.
1697 Args:
1698 leases (list): List of leases to get Job metadata for.
1700 Returns:
1701 List of tuples of the form
1702 ``('executeoperationmetadata-bin': serialized_metadata)``.
1704 """
1705 metadata = []
1706 with self._sql_ro.session() as session:
1707 for lease in leases:
1708 job = self._get_job(lease.id, session)
1709 if job is not None:
1710 job_metadata = ExecuteOperationMetadata(
1711 stage=job.stage, # type: ignore[arg-type]
1712 action_digest=string_to_digest(job.action_digest),
1713 stderr_stream_name=job.stderr_stream_write_name or "",
1714 stdout_stream_name=job.stdout_stream_write_name or "",
1715 partial_execution_metadata=self.get_execute_action_metadata(job),
1716 )
1717 metadata.append(("executeoperationmetadata-bin", job_metadata.SerializeToString()))
1719 return metadata
1721 def get_execute_action_metadata(self, job: JobEntry) -> ExecutedActionMetadata:
1722 worker_name = ""
1723 if job.leases:
1724 worker_name = job.leases[-1].worker_name or ""
1726 metadata = ExecutedActionMetadata(worker=worker_name)
1728 def assign_timestamp(field: Timestamp, timestamp: datetime | None) -> None:
1729 if timestamp is not None:
1730 field.FromDatetime(timestamp)
1732 assign_timestamp(metadata.queued_timestamp, job.queued_timestamp)
1733 assign_timestamp(metadata.worker_start_timestamp, job.worker_start_timestamp)
1734 assign_timestamp(metadata.worker_completed_timestamp, job.worker_completed_timestamp)
1735 assign_timestamp(metadata.input_fetch_start_timestamp, job.input_fetch_start_timestamp)
1736 assign_timestamp(metadata.input_fetch_completed_timestamp, job.input_fetch_completed_timestamp)
1737 assign_timestamp(metadata.output_upload_start_timestamp, job.output_upload_start_timestamp)
1738 assign_timestamp(metadata.output_upload_completed_timestamp, job.output_upload_completed_timestamp)
1739 assign_timestamp(metadata.execution_start_timestamp, job.execution_start_timestamp)
1740 assign_timestamp(metadata.execution_completed_timestamp, job.execution_completed_timestamp)
1742 return metadata
1744 def _fetch_execution_stats(
1745 self, auxiliary_metadata: RepeatedCompositeFieldContainer[ProtoAny]
1746 ) -> ExecutionStatistics | None:
1747 """Fetch ExecutionStatistics from Storage
1748 ProtoAny[Digest] -> ProtoAny[ExecutionStatistics]
1749 """
1750 for aux_metadata_any in auxiliary_metadata:
1751 # Get the wrapped digest
1752 if not aux_metadata_any.Is(Digest.DESCRIPTOR):
1753 continue
1754 aux_metadata_digest = Digest()
1755 try:
1756 aux_metadata_any.Unpack(aux_metadata_digest)
1757 # Get the blob from CAS
1758 execution_stats_any = self.storage.get_message(aux_metadata_digest, ProtoAny)
1759 # Get the wrapped ExecutionStatistics
1760 if execution_stats_any and execution_stats_any.Is(ExecutionStatistics.DESCRIPTOR):
1761 execution_stats = ExecutionStatistics()
1762 execution_stats_any.Unpack(execution_stats)
1763 return execution_stats
1764 except Exception as exc:
1765 LOGGER.exception(
1766 "Cannot fetch ExecutionStatistics from storage.",
1767 tags=dict(auxiliary_metadata=aux_metadata_digest),
1768 exc_info=exc,
1769 )
1770 return None
1771 return None
1773 def publish_execution_stats(
1774 self, job_name: str, execution_metadata: ExecutedActionMetadata, property_label: str = "unknown"
1775 ) -> None:
1776 with self._sql_ro.session(expire_on_commit=False) as session:
1777 self._publish_execution_stats(session, job_name, execution_metadata, property_label)
1779 def _publish_execution_stats(
1780 self, session: Session, job_name: str, execution_metadata: ExecutedActionMetadata, property_label: str
1781 ) -> None:
1782 """Publish resource usage of the job"""
1783 queued = execution_metadata.queued_timestamp
1784 worker_start = execution_metadata.worker_start_timestamp
1785 worker_completed = execution_metadata.worker_completed_timestamp
1786 fetch_start = execution_metadata.input_fetch_start_timestamp
1787 fetch_completed = execution_metadata.input_fetch_completed_timestamp
1788 execution_start = execution_metadata.execution_start_timestamp
1789 execution_completed = execution_metadata.execution_completed_timestamp
1790 upload_start = execution_metadata.output_upload_start_timestamp
1791 upload_completed = execution_metadata.output_upload_completed_timestamp
1793 self._publish_job_duration(queued, worker_completed, "Total", property_label)
1794 # The Queued time is missing here as it's posted as soon as worker has accepted the job.
1795 self._publish_job_duration(worker_start, worker_completed, "Worker", property_label)
1796 self._publish_job_duration(fetch_start, fetch_completed, "Fetch", property_label)
1797 self._publish_job_duration(execution_start, execution_completed, "Execution", property_label)
1798 self._publish_job_duration(upload_start, upload_completed, "Upload", property_label)
1800 if self.metering_client is None or len(execution_metadata.auxiliary_metadata) == 0:
1801 return
1803 execution_stats = self._fetch_execution_stats(execution_metadata.auxiliary_metadata)
1804 if execution_stats is None:
1805 return
1806 usage = Usage(
1807 computing=ComputingUsage(
1808 utime=execution_stats.command_rusage.utime.ToMilliseconds(),
1809 stime=execution_stats.command_rusage.stime.ToMilliseconds(),
1810 maxrss=execution_stats.command_rusage.maxrss,
1811 inblock=execution_stats.command_rusage.inblock,
1812 oublock=execution_stats.command_rusage.oublock,
1813 )
1814 )
1816 try:
1817 operations = (
1818 session.query(OperationEntry)
1819 .where(OperationEntry.job_name == job_name)
1820 .options(joinedload(OperationEntry.client_identity))
1821 .all()
1822 )
1823 for op in operations:
1824 if op.client_identity is None:
1825 continue
1826 client_id = Identity(
1827 instance=op.client_identity.instance,
1828 workflow=op.client_identity.workflow,
1829 actor=op.client_identity.actor,
1830 subject=op.client_identity.subject,
1831 )
1832 self.metering_client.put_usage(identity=client_id, operation_name=op.name, usage=usage)
1833 except Exception as exc:
1834 LOGGER.exception("Cannot publish resource usage.", tags=dict(job_name=job_name), exc_info=exc)
1836 def _update_action_retention(self, action: Action, action_digest: Digest, retention_hours: float | None) -> None:
1837 if not self.asset_client or not retention_hours:
1838 return
1839 uri = DIGEST_URI_TEMPLATE.format(digest_hash=action_digest.hash)
1840 qualifier = {"resource_type": PROTOBUF_MEDIA_TYPE}
1841 expire_at = datetime.now() + timedelta(hours=retention_hours)
1842 referenced_blobs = [action.command_digest]
1843 referenced_directories = [action.input_root_digest]
1845 try:
1846 self.asset_client.push_blob(
1847 uris=[uri],
1848 qualifiers=qualifier,
1849 blob_digest=action_digest,
1850 expire_at=expire_at,
1851 referenced_blobs=referenced_blobs,
1852 referenced_directories=referenced_directories,
1853 )
1854 LOGGER.debug(
1855 "Extended the retention of action.", tags=dict(digest=action_digest, retention_hours=retention_hours)
1856 )
1857 except Exception:
1858 LOGGER.exception("Failed to push action as an asset.", tags=dict(digest=action_digest))
1859 # Not a fatal path, don't reraise here
1861 def _update_action_result_retention(self, action_result: ActionResult, retention_hours: float | None) -> None:
1862 if not self.asset_client or not retention_hours:
1863 return
1864 digest = None
1865 try:
1866 # BuildGrid doesn't store action_result in CAS, but if we push it as an asset
1867 # we need it to be accessible
1868 digest = self.storage.put_message(action_result)
1870 uri = DIGEST_URI_TEMPLATE.format(digest_hash=digest.hash)
1871 qualifier = {"resource_type": PROTOBUF_MEDIA_TYPE}
1872 expire_at = datetime.now() + timedelta(hours=retention_hours)
1874 referenced_blobs: list[Digest] = []
1875 referenced_directories: list[Digest] = []
1877 for file in action_result.output_files:
1878 referenced_blobs.append(file.digest)
1879 for dir in action_result.output_directories:
1880 # Caveat: the underlying directories referenced by this `Tree` message are not referenced by this asset.
1881 # For clients who need to keep all referenced outputs,
1882 # consider setting `Action.output_directory_format` as `DIRECTORY_ONLY` or `TREE_AND_DIRECTORY`.
1883 if dir.tree_digest.ByteSize() != 0:
1884 referenced_blobs.append(dir.tree_digest)
1885 if dir.root_directory_digest.ByteSize() != 0:
1886 referenced_directories.append(dir.root_directory_digest)
1888 if action_result.stdout_digest.ByteSize() != 0:
1889 referenced_blobs.append(action_result.stdout_digest)
1890 if action_result.stderr_digest.ByteSize() != 0:
1891 referenced_blobs.append(action_result.stderr_digest)
1893 self.asset_client.push_blob(
1894 uris=[uri],
1895 qualifiers=qualifier,
1896 blob_digest=digest,
1897 expire_at=expire_at,
1898 referenced_blobs=referenced_blobs,
1899 referenced_directories=referenced_directories,
1900 )
1901 LOGGER.debug(
1902 "Extended the retention of action result.", tags=dict(digest=digest, retention_hours=retention_hours)
1903 )
1905 except Exception as e:
1906 LOGGER.exception("Failed to push action_result as an asset.", tags=dict(digest=digest), exc_info=e)
1907 # Not a fatal path, don't reraise here