Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/scheduler.py: 83.96%
293 statements

# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
Scheduler
=========
Schedules jobs.
"""

from datetime import timedelta
import logging
from time import time
from threading import Lock
from typing import List, Tuple

from buildgrid._protos.google.longrunning import operations_pb2
from buildgrid._protos.google.rpc import code_pb2, status_pb2
from buildgrid._enums import LeaseState, OperationStage
from buildgrid._exceptions import NotFoundError
from buildgrid.client.channel import setup_channel
from buildgrid.client.logstream import logstream_client
from buildgrid.server.job import Job
from buildgrid.server.metrics_names import (
    QUEUED_TIME_METRIC_NAME,
    WORKER_HANDLING_TIME_METRIC_NAME,
    INPUTS_FETCHING_TIME_METRIC_NAME,
    OUTPUTS_UPLOADING_TIME_METRIC_NAME,
    EXECUTION_TIME_METRIC_NAME,
    TOTAL_HANDLING_TIME_METRIC_NAME,
    SCHEDULER_CANCEL_OPERATION_TIME_METRIC_NAME,
    SCHEDULER_QUEUE_ACTION_TIME_METRIC_NAME,
    SCHEDULER_UPDATE_LEASE_TIME_METRIC_NAME
)
from buildgrid.utils import acquire_lock_or_timeout
from buildgrid.server.metrics_utils import (
    DurationMetric,
    publish_timer_metric
)
from buildgrid.server.operations.filtering import OperationFilter


class Scheduler:

    MAX_N_TRIES = 5
    RETRYABLE_STATUS_CODES = (code_pb2.INTERNAL, code_pb2.UNAVAILABLE)
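
    # Leases completing with one of RETRYABLE_STATUS_CODES are treated as
    # transient failures: update_job_lease_state() re-queues the job via
    # retry_job_lease(), up to MAX_N_TRIES attempts before aborting it.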

    def __init__(self, data_store, action_cache=None, action_browser_url=False,
                 monitor=False, max_execution_timeout=None, logstream_url=None,
                 logstream_credentials=None, logstream_instance_name=None):
        self.__logger = logging.getLogger(__name__)

        self._instance_name = None
        self._max_execution_timeout = max_execution_timeout

        self.__queue_time_average = None
        self.__retries_count = 0

        self._action_cache = action_cache
        self._action_browser_url = action_browser_url

        self._logstream_instance_name = logstream_instance_name
        self._logstream_url = logstream_url
        if logstream_credentials is None:
            logstream_credentials = {}
        self._logstream_credentials = logstream_credentials
        self._logstream_channel = None

        self.__operation_lock = Lock()  # Lock protecting deletion, addition and updating of jobs

        self.data_store = data_store
        if self._action_browser_url:
            self.data_store.set_action_browser_url(self._action_browser_url)

        self._is_instrumented = False
        if monitor:
            self.activate_monitoring()
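
    # Minimal construction sketch (as a comment, since data-store wiring is
    # deployment-specific; `data_store` stands in for any configured store):
    #
    #     scheduler = Scheduler(data_store, monitor=True,
    #                           max_execution_timeout=7200)
    #     scheduler.set_instance_name("")
    #     scheduler.setup_grpc()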

    # --- Public API ---

    @property
    def instance_name(self):
        return self._instance_name

    def set_instance_name(self, instance_name):
        if not self._instance_name:
            self._instance_name = instance_name
            self.data_store.set_instance_name(instance_name)

    def setup_grpc(self):
        self.data_store.setup_grpc()

        if self._action_cache is not None:
            self._action_cache.setup_grpc()

        if self._logstream_channel is None and self._logstream_url is not None:
            self._logstream_channel, _ = setup_channel(
                self._logstream_url,
                auth_token=None,
                client_key=self._logstream_credentials.get("tls-client-key"),
                client_cert=self._logstream_credentials.get("tls-client-cert"),
                server_cert=self._logstream_credentials.get("tls-server-cert")
            )
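
    # The logstream channel is created at most once, and only when a logstream
    # URL was configured, so repeated setup_grpc() calls are safe.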

    # --- Public API: REAPI ---

    def register_job_peer(self, job_name, peer, request_metadata=None):
        """Subscribes to the job's :class:`Operation` stage changes.

        Args:
            job_name (str): name of the job to subscribe to.
            peer (str): a unique string identifying the client.

        Returns:
            str: The name of the subscribed :class:`Operation`.

        Raises:
            NotFoundError: If no job with `job_name` exists.
            TimeoutError: If the operation lock cannot be acquired within a short period of time.
        """
        with acquire_lock_or_timeout(self.__operation_lock):
            job = self.data_store.get_job_by_name(job_name, max_execution_timeout=self._max_execution_timeout)

            if job is None:
                raise NotFoundError(f"Job name does not exist: [{job_name}]")

            operation_name = job.register_new_operation(
                data_store=self.data_store, request_metadata=request_metadata)

            self.data_store.watch_job(job, operation_name, peer)

        return operation_name

    def register_job_operation_peer(self, operation_name, peer):
        """Subscribes to an existing job's :class:`Operation` stage changes.

        Args:
            operation_name (str): name of the operation to subscribe to.
            peer (str): a unique string identifying the client.

        Returns:
            str: The name of the subscribed :class:`Operation`.

        Raises:
            NotFoundError: If no operation with `operation_name` exists.
            TimeoutError: If the operation lock cannot be acquired within a short period of time.
        """
        with acquire_lock_or_timeout(self.__operation_lock):
            job = self.data_store.get_job_by_operation(operation_name,
                                                       max_execution_timeout=self._max_execution_timeout)

            if job is None:
                raise NotFoundError(f"Operation name does not exist: [{operation_name}]")

            self.data_store.watch_job(job, operation_name, peer)

    def stream_operation_updates(self, operation_name, context):
        yield from self.data_store.stream_operation_updates(operation_name, context)

    def unregister_job_operation_peer(self, operation_name, peer, discard_unwatched_jobs: bool = False):
        """Unsubscribes from one of the job's :class:`Operation` stage changes.

        Args:
            operation_name (str): name of the operation to unsubscribe from.
            peer (str): a unique string identifying the client.
            discard_unwatched_jobs (bool): if True, delete the operation once no
                peers are left watching it.

        Raises:
            NotFoundError: If no operation with `operation_name` exists.
            TimeoutError: If the operation lock cannot be acquired within a short period of time.
        """
        with acquire_lock_or_timeout(self.__operation_lock):
            job = self.data_store.get_job_by_operation(operation_name)

            if job is None:
                raise NotFoundError(f"Operation name does not exist: [{operation_name}]")

            self.data_store.stop_watching_operation(job, operation_name, peer)

            if not job.n_peers_for_operation(operation_name, self.data_store.watched_jobs.get(job.name)):
                if discard_unwatched_jobs:
                    self.__logger.info(f"No peers watching the operation, removing: {operation_name}")
                    self.data_store.delete_operation(operation_name)

            if not job.n_peers(self.data_store.watched_jobs.get(job.name)) and job.done and not job.lease:
                self.data_store.delete_job(job.name)

    @DurationMetric(SCHEDULER_QUEUE_ACTION_TIME_METRIC_NAME, instanced=True)
    def queue_job_action(self, action, action_digest, platform_requirements=None,
                         priority=0, skip_cache_lookup=False):
        """Inserts a newly created job into the execution queue.

        Warning:
            Priority is handled like a POSIX ``nice`` value: a higher value
            means a lower priority, with 0 being the default priority.

        Args:
            action (Action): the given action to queue for execution.
            action_digest (Digest): the digest of the given action.
            platform_requirements (dict(set)): platform attributes that a worker
                must satisfy in order to be assigned the job. (Each key can
                have multiple values.)
            priority (int): the execution job's priority.
            skip_cache_lookup (bool): whether or not to look for a pre-computed
                result for the given action.

        Returns:
            str: the newly created job's name.
        """
        if platform_requirements is None:
            platform_requirements = {}

        job = self.data_store.get_job_by_action(action_digest,
                                                max_execution_timeout=self._max_execution_timeout)

        if job is not None and not action.do_not_cache:
            # If existing job has been cancelled or isn't
            # cacheable, create a new one.
            if not job.cancelled and not job.do_not_cache:
                # Reschedule if the new request has a higher priority (lower value):
                if priority < job.priority:
                    job.set_priority(priority, data_store=self.data_store)

                    if job.operation_stage == OperationStage.QUEUED:
                        self.data_store.queue_job(job.name)

                    self.__logger.debug(
                        f"Job deduplicated for action [{action_digest.hash[:8]}]: [{job.name}] "
                        f"with new priority: [{priority}]")
                else:
                    self.__logger.debug(
                        f"Job deduplicated for action [{action_digest.hash[:8]}]: [{job.name}]")

                return job.name

        job = Job(do_not_cache=action.do_not_cache,
                  action=action, action_digest=action_digest,
                  platform_requirements=platform_requirements,
                  priority=priority)
        self.data_store.create_job(job)

        self.__logger.debug(
            f"Job created for action [{action_digest.hash[:8]}]: "
            f"[{job.name} requiring: {job.platform_requirements}, priority: {priority}]")

        operation_stage = None

        if self._action_cache is not None and not skip_cache_lookup:
            try:
                action_result = self._action_cache.get_action_result(job.action_digest)

                self.__logger.debug(
                    f"Job cache hit for action [{action_digest.hash[:8]}]: [{job.name}]")

                operation_stage = OperationStage.COMPLETED
                job.set_cached_result(action_result, self.data_store)

            except NotFoundError:
                operation_stage = OperationStage.QUEUED
                self.data_store.queue_job(job.name)
            except Exception:
                self.__logger.exception("Checking ActionCache for action "
                                        f"[{action_digest.hash}/{action_digest.size_bytes}] "
                                        "failed.")
                operation_stage = OperationStage.QUEUED
                self.data_store.queue_job(job.name)

        else:
            operation_stage = OperationStage.QUEUED
            self.data_store.queue_job(job.name)

        self._update_job_operation_stage(job.name, operation_stage)

        return job.name
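
    # Illustrative call from an Execute request handler (variable names here
    # are hypothetical):
    #
    #     job_name = scheduler.queue_job_action(
    #         action, action_digest,
    #         platform_requirements={"OSFamily": {"linux"}},
    #         priority=0, skip_cache_lookup=False)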

    def get_job_operation(self, operation_name):
        """Retrieves a job's :class:`Operation` by name.

        Args:
            operation_name (str): name of the operation to query.

        Raises:
            NotFoundError: If no operation with `operation_name` exists.
        """
        job = self.data_store.get_job_by_operation(operation_name,
                                                   max_execution_timeout=self._max_execution_timeout)

        if job is None:
            raise NotFoundError(f"Operation name does not exist: [{operation_name}]")

        return job.get_operation(operation_name)

    @DurationMetric(SCHEDULER_CANCEL_OPERATION_TIME_METRIC_NAME, instanced=True)
    def cancel_job_operation(self, operation_name):
        """Cancels a job's :class:`Operation` by name.

        Args:
            operation_name (str): name of the operation to cancel.

        Raises:
            NotFoundError: If no operation with `operation_name` exists.
        """
        job = self.data_store.get_job_by_operation(operation_name)

        if job is None:
            raise NotFoundError(f"Operation name does not exist: [{operation_name}]")

        job.cancel_operation(operation_name, data_store=self.data_store)

    def list_operations(self,
                        operation_filters: List[OperationFilter] = None,
                        page_size: int = None,
                        page_token: str = None) -> Tuple[List[operations_pb2.Operation], str]:
        operations, next_token = self.data_store.list_operations(
            operation_filters,
            page_size,
            page_token,
            max_execution_timeout=self._max_execution_timeout)
        return operations, next_token

    # --- Public API: RWAPI ---

    def request_job_leases(self, worker_capabilities, timeout=None, worker_name=None, bot_id=None):
        """Generates a list of the highest priority leases to be run.

        Args:
            worker_capabilities (dict): a set of key-value pairs describing the
                worker properties, configuration and state at the time of the
                request.
            timeout (int): time to block waiting on the job queue; capped at
                MAX_JOB_BLOCK_TIME if longer.
            worker_name (str): name of the worker requesting the leases.
            bot_id (str): ID of the bot requesting the leases.
        """
        def assign_lease(job):
            self.__logger.info(f"Job scheduled to run: [{job.name}]")

            lease = job.lease

            if not lease:
                # For now, one lease at a time:
                lease = job.create_lease(worker_name, bot_id, data_store=self.data_store)
            else:
                # Update the job with the new worker name assigned to it
                job.worker_name = worker_name

            if lease:
                job.mark_worker_started()
                return [lease]
            return []

        leases = self.data_store.assign_lease_for_next_job(
            worker_capabilities, assign_lease, timeout=timeout)
        if leases:
            # Update the leases outside of the callback to avoid nested data_store operations
            for lease in leases:
                self._create_log_stream(lease)
                # The lease id and the job name are the same, so use it as the job name
                self._update_job_operation_stage(lease.id, OperationStage.EXECUTING)
        return leases
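
    # Illustrative call from a bot session (the capability keys shown are
    # examples; actual keys depend on worker configuration):
    #
    #     leases = scheduler.request_job_leases(
    #         {"OSFamily": {"linux"}, "ISA": {"x86-64"}},
    #         timeout=30, worker_name="worker-1", bot_id="bot-1")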

    @DurationMetric(SCHEDULER_UPDATE_LEASE_TIME_METRIC_NAME, instanced=True)
    def update_job_lease_state(self, job_name, lease):
        """Requests a state transition for a job's current :class:`Lease`.

        Note:
            This may trigger a job's :class:`Operation` stage transition.

        Args:
            job_name (str): name of the job to update the lease state of.
            lease (Lease): the lease holding the new state.

        Raises:
            NotFoundError: If no job with `job_name` exists.
        """
        job = self.data_store.get_job_by_name(job_name)

        if job is None:
            raise NotFoundError(f"Job name does not exist: [{job_name}]")

        lease_state = LeaseState(lease.state)
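
        # Lease-state to operation-stage mapping: PENDING -> QUEUED,
        # ACTIVE -> EXECUTING, COMPLETED -> COMPLETED (or a retry).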
        operation_stage = None
        if lease_state == LeaseState.PENDING:
            job.update_lease_state(LeaseState.PENDING, data_store=self.data_store)
            operation_stage = OperationStage.QUEUED

        elif lease_state == LeaseState.ACTIVE:
            job.update_lease_state(LeaseState.ACTIVE, data_store=self.data_store)
            operation_stage = OperationStage.EXECUTING

        elif lease_state == LeaseState.COMPLETED:
            # Check the lease status to determine if the job should be retried
            lease_status = lease.status.code
            if lease_status in self.RETRYABLE_STATUS_CODES and job.n_tries < self.MAX_N_TRIES:
                self.__logger.info(f"Job {job_name} completed with a non-OK retryable status code "
                                   f"{lease_status}, retrying")
                self.retry_job_lease(job_name)
            else:
                # Update lease state to COMPLETED and store result in CAS
                # Also store mapping in ActionResult, if job is cacheable
                job.update_lease_state(LeaseState.COMPLETED,
                                       status=lease.status, result=lease.result,
                                       action_cache=self._action_cache,
                                       data_store=self.data_store,
                                       skip_notify=True)
                try:
                    self.delete_job_lease(job_name)
                except NotFoundError:
                    # Job already deleted
                    pass
                except TimeoutError:
                    self.__logger.warning(f"Could not delete job lease_id=[{lease.id}] due to timeout.",
                                          exc_info=True)

                operation_stage = OperationStage.COMPLETED

        self._update_job_operation_stage(job_name, operation_stage)

    def retry_job_lease(self, job_name):
        """Re-queues a job on lease execution failure.

        Note:
            This may trigger a job's :class:`Operation` stage transition.

        Args:
            job_name (str): name of the job whose lease should be retried.

        Raises:
            NotFoundError: If no job with `job_name` exists.
        """
        job = self.data_store.get_job_by_name(job_name)

        if job is None:
            raise NotFoundError(f"Job name does not exist: [{job_name}]")

        updated_operation_stage = None
        if job.n_tries >= self.MAX_N_TRIES:
            updated_operation_stage = OperationStage.COMPLETED
            status = status_pb2.Status(code=code_pb2.ABORTED,
                                       message=f"Job was retried {job.n_tries} times unsuccessfully. Aborting.")
            job.update_lease_state(LeaseState.COMPLETED, status=status, data_store=self.data_store)

        elif not job.cancelled:
            if job.done:
                self.__logger.info(f"Attempted to re-queue job name=[{job_name}] "
                                   "but it was already completed.")
                return

            updated_operation_stage = OperationStage.QUEUED
            self.data_store.queue_job(job.name)

            job.update_lease_state(LeaseState.PENDING, data_store=self.data_store)

            if self._is_instrumented:
                self.__retries_count += 1

        if updated_operation_stage:
            self._update_job_operation_stage(job_name, updated_operation_stage)

    def get_job_lease(self, job_name):
        """Returns the lease associated with the job, if one has been emitted yet.

        Args:
            job_name (str): name of the job to query the lease from.

        Raises:
            NotFoundError: If no job with `job_name` exists.
        """
        job = self.data_store.get_job_by_name(job_name)

        if job is None:
            raise NotFoundError(f"Job name does not exist: [{job_name}]")

        return job.lease

    def delete_job_lease(self, job_name):
        """Discards the lease associated with a job.

        Args:
            job_name (str): name of the job to delete the lease from.

        Raises:
            NotFoundError: If no job with `job_name` exists.
            TimeoutError: If the operation lock cannot be acquired within a short period of time.
        """
        with acquire_lock_or_timeout(self.__operation_lock):
            job = self.data_store.get_job_by_name(job_name)

            if job is None:
                raise NotFoundError(f"Job name does not exist: [{job_name}]")

            job.delete_lease()

            if not job.n_peers(self.data_store.watched_jobs.get(job.name)) and job.done:
                self.data_store.delete_job(job.name)

    def get_operation_request_metadata(self, operation_name):
        return self.data_store.get_operation_request_metadata_by_name(operation_name)

    def get_metadata_for_leases(self, leases, writeable_streams=False):
        """Return a list of Job metadata for a given list of leases.

        Args:
            leases (list): List of leases to get Job metadata for.

        Returns:
            List of tuples of the form
            ``('executeoperationmetadata-bin', serialized_metadata)``.
        """
        metadata = []
        for lease in leases:
            job = self.data_store.get_job_by_name(lease.id)
            job_metadata = job.get_metadata(writeable_streams=writeable_streams)
            metadata.append(
                ('executeoperationmetadata-bin', job_metadata.SerializeToString()))

        return metadata
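
    # The returned pairs are gRPC binary metadata entries, e.g. suitable for
    # attaching to a response (sketch; `context` here is a hypothetical
    # grpc.ServicerContext):
    #
    #     context.set_trailing_metadata(
    #         scheduler.get_metadata_for_leases(leases))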

    # --- Public API: Monitoring ---

    @property
    def is_instrumented(self):
        return self._is_instrumented

    def activate_monitoring(self):
        """Activates jobs monitoring."""
        if self._is_instrumented:
            return

        self.__queue_time_average = 0, timedelta()
        self.__retries_count = 0

        self._is_instrumented = True

        self.data_store.activate_monitoring()

    def deactivate_monitoring(self):
        """Deactivates jobs monitoring."""
        if not self._is_instrumented:
            return

        self._is_instrumented = False

        self.__queue_time_average = None
        self.__retries_count = 0

        self.data_store.deactivate_monitoring()

    def get_metrics(self):
        return self.data_store.get_metrics()

    def query_n_retries(self):
        return self.__retries_count

    def query_am_queue_time(self):
        if self.__queue_time_average is not None:
            return self.__queue_time_average[1]
        return timedelta()

    # --- Private API ---

    def _create_log_stream(self, lease):
        if not self._logstream_channel:
            return

        job = self.data_store.get_job_by_name(lease.id)
        parent_base = f"{job.action_digest.hash}_{job.action_digest.size_bytes}_{time()}"
        stdout_parent = f"{parent_base}_stdout"
        stderr_parent = f"{parent_base}_stderr"
        with logstream_client(self._logstream_channel,
                              self._logstream_instance_name) as ls_client:
            stdout_stream = ls_client.create(stdout_parent)
            stderr_stream = ls_client.create(stderr_parent)
        job.set_stdout_stream(stdout_stream, data_store=self.data_store)
        job.set_stderr_stream(stderr_stream, data_store=self.data_store)

    def _update_job_operation_stage(self, job_name, operation_stage):
        """Requests a stage transition for the job's :class:`Operation` objects.

        Args:
            job_name (str): name of the job to query.
            operation_stage (OperationStage): the stage to transition to.

        Raises:
            TimeoutError: If the operation lock cannot be acquired within a short period of time.
        """
        with acquire_lock_or_timeout(self.__operation_lock):
            job = self.data_store.get_job_by_name(job_name)

            if operation_stage == OperationStage.CACHE_CHECK:
                job.update_operation_stage(OperationStage.CACHE_CHECK,
                                           data_store=self.data_store)

            elif operation_stage == OperationStage.QUEUED:
                job.update_operation_stage(OperationStage.QUEUED,
                                           data_store=self.data_store)

            elif operation_stage == OperationStage.EXECUTING:
                job.update_operation_stage(OperationStage.EXECUTING,
                                           data_store=self.data_store)

            elif operation_stage == OperationStage.COMPLETED:
                job.update_operation_stage(OperationStage.COMPLETED,
                                           data_store=self.data_store)

                if self._is_instrumented:
                    average_order, average_time = self.__queue_time_average
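
                    # Running mean over queue times:
                    # avg_n = avg_{n-1} + (x_n - avg_{n-1}) / n,
                    # so no per-job history needs to be stored.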
                    average_order += 1
                    if average_order <= 1:
                        average_time = job.query_queue_time()
                    else:
                        queue_time = job.query_queue_time()
                        average_time = average_time + ((queue_time - average_time) / average_order)

                    self.__queue_time_average = average_order, average_time

                    if not job.holds_cached_result:
                        execution_metadata = job.action_result.execution_metadata
                        context_metadata = {'instance-name': self.instance_name} if self.instance_name else None

                        queued = execution_metadata.queued_timestamp.ToDatetime()
                        worker_start = execution_metadata.worker_start_timestamp.ToDatetime()
                        worker_completed = execution_metadata.worker_completed_timestamp.ToDatetime()
                        fetch_start = execution_metadata.input_fetch_start_timestamp.ToDatetime()
                        fetch_completed = execution_metadata.input_fetch_completed_timestamp.ToDatetime()
                        execution_start = execution_metadata.execution_start_timestamp.ToDatetime()
                        execution_completed = execution_metadata.execution_completed_timestamp.ToDatetime()
                        upload_start = execution_metadata.output_upload_start_timestamp.ToDatetime()
                        upload_completed = execution_metadata.output_upload_completed_timestamp.ToDatetime()
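
                        # Timeline: queued -> worker_start -> input fetch ->
                        # execution -> output upload -> worker_completed; each
                        # interval below is published as a separate timer metric.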

                        # Emit build inputs fetching time record:
                        input_fetch_time = fetch_completed - fetch_start
                        publish_timer_metric(
                            INPUTS_FETCHING_TIME_METRIC_NAME,
                            input_fetch_time,
                            metadata=context_metadata)

                        # Emit build execution time record:
                        execution_time = execution_completed - execution_start
                        publish_timer_metric(
                            EXECUTION_TIME_METRIC_NAME, execution_time,
                            metadata=context_metadata)

                        # Emit build outputs uploading time record:
                        output_upload_time = upload_completed - upload_start
                        publish_timer_metric(
                            OUTPUTS_UPLOADING_TIME_METRIC_NAME, output_upload_time,
                            metadata=context_metadata)

                        # Emit total queued time record:
                        # This computes the queue time purely from values set in
                        # the ActionResult's ExecutedActionMetadata, which may
                        # differ very slightly from the job object's queued_time.
                        total_queued_time = worker_start - queued
                        publish_timer_metric(
                            QUEUED_TIME_METRIC_NAME, total_queued_time,
                            metadata=context_metadata)

                        # Emit total time spent in worker
                        total_worker_time = worker_completed - worker_start
                        publish_timer_metric(
                            WORKER_HANDLING_TIME_METRIC_NAME, total_worker_time,
                            metadata=context_metadata)

                        # Emit total build handling time record:
                        total_handling_time = worker_completed - queued
                        publish_timer_metric(
                            TOTAL_HANDLING_TIME_METRIC_NAME, total_handling_time,
                            metadata=context_metadata)