Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/persistence/interface.py: 100.00% (13 statements)

# Copyright (C) 2019 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from abc import ABC, abstractmethod
import logging
from threading import Lock
from typing import Any, Callable, Dict, Generator, List, Mapping, Optional, Set, Tuple

from grpc import RpcContext

from buildgrid._enums import JobEventType, OperationStage
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import Lease
from buildgrid._protos.google.longrunning.operations_pb2 import Operation
from buildgrid.server.job import Job
from buildgrid.server.operations.filtering import OperationFilter
from buildgrid.utils import JobWatchSpec


Message = Tuple[Optional[Exception], str]


class DataStoreInterface(ABC):  # pragma: no cover
37 """Abstract class defining an interface to a data store for the scheduler.
39 The ``DataStoreInterface`` defines the interface used by the scheduler to
40 manage storage of its internal state. It also provides some of the
41 infrastructure for streaming messages triggered by changes in job and
42 operation state, which can be used (via the
43 :class:`buildgrid.server.scheduler.Scheduler` itself) to stream progress
44 updates back to clients.
46 Implementations of the interface are required to implement all the abstract
47 methods in this class. However, there is no requirement about the internal
48 details of those implementations; it may be beneficial for certain
49 implementations to make some of these methods a noop for example.
51 Implementations must also implement some way to set the events that are
52 used in ``stream_operations_updates``, which live in the ``watched_jobs``
53 dictionary.
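
    A minimal sketch of what a subclass looks like (hypothetical; the concrete
    implementations live elsewhere in ``buildgrid.server.persistence``, and a
    real subclass must implement every abstract method below):

    .. code-block:: python

        class InMemoryDataStore(DataStoreInterface):

            def activate_monitoring(self) -> None:
                pass  # monitoring is a no-op in this sketch

            def create_job(self, job: Job) -> None:
                self._jobs[job.name] = job  # assumes a dict created in __init__

            # ...and so on for the remaining abstract methods.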
55 """

    def __init__(self, storage):
        self.logger = logging.getLogger(__file__)
        self.storage = storage
        self.watched_jobs = {}
        self.watched_jobs_lock = Lock()
        self._action_browser_url = None
        self._instance_name = None

    def setup_grpc(self):
        self.storage.setup_grpc()

    # Class Properties To Set

    def set_instance_name(self, instance_name):
        self._instance_name = instance_name

    def set_action_browser_url(self, url):
        self._action_browser_url = url

    # Monitoring/metrics methods

    @abstractmethod
    def activate_monitoring(self) -> None:
        """Enable the monitoring features of the data store."""
        raise NotImplementedError()

    @abstractmethod
    def deactivate_monitoring(self) -> None:
        """Disable the monitoring features of the data store.

        This method also performs any necessary cleanup of stored metrics.

        """
        raise NotImplementedError()

    @abstractmethod
    def get_metrics(self) -> Dict[str, Dict[int, int]]:
        """Return a dictionary of metrics for jobs, operations, and leases.

        The returned dictionary is keyed by :class:`buildgrid._enums.MetricCategories`
        values, and the values are dictionaries of counts per operation stage
        (or lease state, in the case of leases).
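
        For example (an illustrative shape with made-up counts; the enum
        names are assumed to come from ``buildgrid._enums``):

        .. code-block:: python

            {
                MetricCategories.JOBS.value: {
                    OperationStage.QUEUED.value: 3,
                    OperationStage.EXECUTING.value: 1,
                },
                MetricCategories.LEASES.value: {
                    LeaseState.ACTIVE.value: 1,
                },
            }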
100 """
101 raise NotImplementedError()

    # Job API

    @abstractmethod
    def create_job(self, job: Job) -> None:
        """Add a new job to the data store.

        NOTE: This method just stores the job in the data store. To enqueue
        the job so that it can be scheduled for execution, the ``queue_job``
        method should also be called.

        Args:
            job (buildgrid.server.job.Job): The job to be stored.
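
        For example, the usual two-step pattern (``data_store`` and ``job``
        are assumed to already exist):

        .. code-block:: python

            data_store.create_job(job)
            data_store.queue_job(job.name)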
116 """
117 raise NotImplementedError()

    @abstractmethod
    def queue_job(self, job_name: str) -> None:
        """Add an existing job to the queue of jobs waiting to be assigned.

        This method adds a job with the given name to the queue of jobs. If
        the job is already in the queue, then this method ensures that the
        position of the job in the queue is correct.

        """
        raise NotImplementedError()

    @abstractmethod
    def store_response(self, job: Job, commit_changes: bool) -> None:
        """Store the job's ExecuteResponse in the data store.

        This method stores the response message for the job in the data
        store, in order to allow it to be retrieved when getting jobs
        in the future.

        This is separate from ``update_job`` as implementations will likely
        always need special-case handling for persisting the response
        message.

        Args:
            job (buildgrid.server.job.Job): The job to store the response
                message of.

        """
        raise NotImplementedError()

    @abstractmethod
    def get_job_by_action(self, action_digest: Digest, *,
                          max_execution_timeout: Optional[int]=None) -> Optional[Job]:
        """Return the job corresponding to an Action digest.

        This method looks for a job object corresponding to the given
        Action digest in the data store. If a job is found it is returned,
        otherwise None is returned.

        Args:
            action_digest (Digest): The digest of the Action to find the
                corresponding job for.
            max_execution_timeout (int, Optional): The max execution timeout.

        Returns:
            buildgrid.server.job.Job or None:
                The job with the given Action digest, if it exists.
                Otherwise None.

        """
        raise NotImplementedError()

    @abstractmethod
    def get_job_by_name(self, name: str, *,
                        max_execution_timeout: Optional[int]=None) -> Optional[Job]:
        """Return the job with the given name.

        This method looks for a job with the specified name in the data
        store. If there is a matching Job it is returned, otherwise this
        returns None.

        Args:
            name (str): The name of the job to return.
            max_execution_timeout (int, Optional): The max execution timeout.

        Returns:
            buildgrid.server.job.Job or None:
                The job with the given name, if it exists. Otherwise None.

        """
        raise NotImplementedError()

    @abstractmethod
    def get_job_by_operation(self, operation_name: str, *,
                             max_execution_timeout: Optional[int]=None) -> Optional[Job]:
        """Return the Job for a given Operation.

        This method takes an Operation name, and returns the Job which
        corresponds to that Operation. If the Operation isn't found,
        or if the data store doesn't contain a corresponding job, this
        returns None.

        Args:
            operation_name (str): Name of the Operation whose corresponding
                Job is to be returned.
            max_execution_timeout (int, Optional): The max execution timeout.

        Returns:
            buildgrid.server.job.Job or None:
                The job related to the given operation, if it exists.
                Otherwise None.

        """
        raise NotImplementedError()

    @abstractmethod
    def get_all_jobs(self) -> List[Job]:
        """Return a list of all incomplete jobs in the data store.

        This method returns a list of all incomplete jobs in the data
        store.

        Returns:
            list: List of all incomplete jobs in the data store.

        """
        raise NotImplementedError()

    @abstractmethod
    def get_jobs_by_stage(self, operation_stage: OperationStage) -> List[Job]:
        """Return a list of jobs in the given stage.

        This method returns a list of all jobs in a specific operation stage.

        Args:
            operation_stage (OperationStage): The stage that the returned list
                of jobs should all be in.

        Returns:
            list: List of all jobs in the specified operation stage.

        """
        raise NotImplementedError()

    @abstractmethod
    def update_job(self, job_name: str, changes: Mapping[str, Any], skip_notify: bool) -> None:
        """Update a job in the data store.

        This method takes a job name and a dictionary of changes to apply to
        the job in the data store, and updates the job with those changes.
        The dictionary should be keyed by the attribute names which need to
        be updated, with the values being the new values for the attributes.

        Args:
            job_name (str): The name of the job that is being updated.
            changes (dict): The dictionary of changes to be applied.
            skip_notify (bool): Whether notifying about the job changes should be skipped.
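
        For example (the attribute name here is illustrative; valid keys
        depend on the concrete data store implementation):

        .. code-block:: python

            data_store.update_job(
                job.name,
                {"stage": OperationStage.COMPLETED.value},
                skip_notify=False,
            )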
257 """
258 raise NotImplementedError()

    @abstractmethod
    def delete_job(self, job_name: str) -> None:
        """Delete a job from the data store.

        This method removes a job from the data store.

        Args:
            job_name (str): The name of the job to be removed.

        """
        raise NotImplementedError()

    def watch_job(self, job: Job, operation_name: str, peer: str) -> None:
        """Start watching a job and operation for changes.

        If the given job is already being watched, then this method finds (or adds)
        the operation in the job's entry in ``watched_jobs``, and adds the peer to
        the list of peers for that operation.

        Otherwise, it creates a whole new entry in ``watched_jobs`` for the given
        job, operation, and peer.

        This method runs in a thread spawned by gRPC handling a connected peer.

        Args:
            job (buildgrid.server.job.Job): The job to watch.
            operation_name (str): The name of the specific operation to
                watch.
            peer (str): The peer that is requesting to watch the job.
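
        For example, the full watch/stream/stop lifecycle as a peer handler
        might run it (a sketch; ``context`` is the peer's gRPC context and
        ``send_to_peer`` is a hypothetical helper):

        .. code-block:: python

            data_store.watch_job(job, operation_name, peer)
            try:
                for message in data_store.stream_operation_updates(
                        operation_name, context):
                    send_to_peer(message)  # hypothetical helper
            finally:
                data_store.stop_watching_operation(job, operation_name, peer)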
290 """
291 with self.watched_jobs_lock:
292 spec = self.watched_jobs.get(job.name)
293 if spec is None:
294 self.watched_jobs[job.name] = spec = JobWatchSpec(job)
295 spec.add_peer(operation_name, peer)
296 self.logger.debug(
297 f"Registered peer [{peer}] to watch operation [{operation_name}] of job [{job.name}]")

    def stream_operation_updates(self, operation_name: str,
                                 context: RpcContext) -> Generator[Message, None, None]:
        """Stream update messages for a given operation.

        This is a generator which yields tuples of the form

        .. code-block:: python

            (error, operation)

        where ``error`` is None unless the job is cancelled, in which case
        ``error`` is a :class:`buildgrid._exceptions.CancelledError`.

        This method runs in a thread spawned by gRPC handling a connected
        peer, and should spend most of its time blocked waiting on an event
        which is set by either the thread which watches the data store for
        job updates or the main thread handling the gRPC termination
        callback.

        Iteration finishes either when the provided gRPC context becomes
        inactive, or when the job owning the operation being watched is
        deleted from the data store.

        Args:
            operation_name (str): The name of the operation to stream
                updates for.
            context (grpc.ServicerContext): The RPC context for the peer
                that is requesting a stream of events.
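
        For example, a (hypothetical) consumer draining the stream and
        surfacing cancellation:

        .. code-block:: python

            for error, operation in data_store.stream_operation_updates(
                    operation_name, context):
                if error is not None:
                    raise error  # e.g. CancelledError if the job was cancelled
                handle_update(operation)  # hypothetical helper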
328 """
329 # Send an initial update as soon as we start watching, to provide the
330 # peer with the initial state of the operation. This is done outside
331 # the loop to simplify the logic for handling events without sending
332 # unnecessary messages to peers.
333 job = self.get_job_by_operation(operation_name)
334 if job is None:
335 return
336 message = job.get_operation_update(operation_name)
337 yield message
339 self.logger.debug("Waiting for events")

        # Wait for events whilst the context is active. Events are set by the
        # thread which is watching the state data store for job updates.
        with self.watched_jobs_lock:
            watched_job = self.watched_jobs.get(job.name)
            if watched_job is None:
                self.logger.error(f"Unable to find job with name: [{job.name}] in watched_jobs dictionary.")
                return
            event = watched_job.event

        last_event = None
        while context.is_active():
            last_event, event_type = event.wait(last_event)
            self.logger.debug(
                f"Received event #{last_event} for operation [{operation_name}] "
                f"with type [{event_type}].")

            # A `JobEventType.STOP` event means that a peer watching this job
            # has disconnected and its termination callback has executed on
            # the thread gRPC creates for callbacks. In this case we don't
            # want to send a message, so we use `continue` to re-evaluate the
            # loop condition and decide whether to keep iterating.
            if event_type == JobEventType.STOP:
                continue

            job = self.get_job_by_operation(operation_name)
            if job is None:
                self.logger.debug(
                    f"Job for operation [{operation_name}] has gone away, stopped streaming updates.")
                return
            message = job.get_operation_update(operation_name)
            yield message

        self.logger.debug("Context became inactive, stopped streaming updates.")

    def stop_watching_operation(self, job: Job, operation_name: str, peer: str) -> None:
        """Remove the given peer from the list of peers watching the given job.

        If the given job is being watched, this method triggers a
        ``JobEventType.STOP`` for it to cause the waiting threads to check
        whether their context is still active. It then removes the given peer
        from the list of peers watching the given operation name. If this
        leaves no peers then the entire entry for the operation in the tracked
        job is removed.

        If this process leaves the job with no operations being watched, the
        job itself is removed from the ``watched_jobs`` dictionary, and it will
        no longer be checked for updates.

        This runs in the main thread as part of the RPC termination callback
        for ``Execute`` and ``WaitExecution`` requests.

        Args:
            job (buildgrid.server.job.Job): The job to stop watching.
            operation_name (str): The name of the specific operation to
                stop watching.
            peer (str): The peer that is requesting to stop watching the job.
396 """
397 with self.watched_jobs_lock:
398 spec = self.watched_jobs.get(job.name)
399 if spec is None:
400 self.logger.debug(
401 f"Peer [{peer}] attempted to stop watching job [{job.name}] and operation "
402 f"[{operation_name}], but no record of that job being watched was found.")
403 return
404 spec.event.notify_stop()
405 spec.remove_peer(operation_name, peer)
406 if not spec.peers:
407 self.logger.debug(
408 f"No peers remain watching job [{job.name}], removing it from the "
409 "dictionary of jobs being watched.")
410 self.watched_jobs.pop(job.name)

    # Operation API

    @abstractmethod
    def create_operation(self, operation_name: str, job_name: str) -> None:
        """Add a new operation to the data store.

        Args:
            operation_name (str): The name of the Operation to create in the
                data store.
            job_name (str): The name of the Job representing the execution of
                this operation.

        """
        raise NotImplementedError()

    # NOTE: This method is badly named, the current implementations return a
    # set of *job* names rather than *operation* names.
    # TODO: Fix or remove this.
    @abstractmethod
    def get_operations_by_stage(self, operation_stage: OperationStage) -> Set[str]:
        """Return a set of Job names in a specific operation stage.

        Find the operations in a given stage and return a set containing the
        names of the Jobs related to those operations.

        Args:
            operation_stage (OperationStage): The stage that the operations
                should be in.

        Returns:
            set: Set of all job names with operations in the specified state.

        """
        raise NotImplementedError()

    @abstractmethod
    def list_operations(self,
                        operation_filters: List[OperationFilter],
                        page_size: Optional[int]=None,
                        page_token: Optional[str]=None,
                        max_execution_timeout: Optional[int]=None) -> Tuple[List[Operation], str]:
        """Return all operations matching the filter.

        Returns:
            list: A page of matching operations in the data store.
            str: If nonempty, a token to be submitted by the requester for the next page of results.
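
        For example, paging through every matching operation (a sketch;
        ``operation_filters`` is assumed to be a prepared list of
        :class:`OperationFilter` objects):

        .. code-block:: python

            page_token = None
            while True:
                page, page_token = data_store.list_operations(
                    operation_filters, page_size=50, page_token=page_token)
                for operation in page:
                    ...  # process each Operation
                if not page_token:
                    break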
458 """
459 raise NotImplementedError()

    @abstractmethod
    def update_operation(self, operation_name: str, changes: Mapping[str, Any]) -> None:
        """Update an operation in the data store.

        This method takes an operation name and a dictionary of changes to
        apply to the operation in the data store, and updates the operation
        with those changes. The dictionary should be keyed by the attribute
        names which need to be updated, with the values being the new values
        for the attributes.

        Args:
            operation_name (str): The name of the operation that is being updated.
            changes (dict): The dictionary of changes to be applied.

        """
        raise NotImplementedError()

    @abstractmethod
    def delete_operation(self, operation_name: str) -> None:
        """Delete an operation from the data store.

        This method removes an operation from the data store.

        Args:
            operation_name (str): The name of the operation to be removed.

        """
        raise NotImplementedError()

    # Lease API

    @abstractmethod
    def create_lease(self, lease: Lease) -> None:
        """Add a new lease to the data store.

        Args:
            lease (Lease): The Lease protobuf object representing the lease
                to be added to the data store.

        """
        raise NotImplementedError()

    @abstractmethod
    def get_leases_by_state(self, lease_state) -> Set[str]:
        """Return the set of IDs of leases in a given state.

        Args:
            lease_state (LeaseState): The state that the leases should
                be in.

        Returns:
            set: Set of strings containing IDs of leases in the given state.

        """
        raise NotImplementedError()

    @abstractmethod
    def update_lease(self, job_name: str, changes: Mapping[str, Any]) -> None:
        """Update a lease in the data store.

        This method takes a job name and a dictionary of changes to
        apply to the lease for that job in the data store, and updates the
        lease with those changes. The dictionary should be keyed by the
        attribute names which need to be updated, with the values being the
        new values for the attributes.

        The job name is used as leases have no unique identifier; instead
        there should always be at most one active lease for the job. It is
        the responsibility of data store implementations to ensure this.

        Args:
            job_name (str): The name of the job whose lease is being updated.
            changes (dict): The dictionary of changes to be applied.
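
        For example (the attribute name and enum value are illustrative;
        valid keys depend on the implementation's lease model):

        .. code-block:: python

            data_store.update_lease(
                job.name, {"state": LeaseState.COMPLETED.value})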
535 """
536 raise NotImplementedError()

    # NOTE: We use a callback to create the leases in this method so that the
    # data store doesn't need to know anything about the bots, or other lease
    # metadata.
    @abstractmethod
    def assign_lease_for_next_job(self, capabilities: Mapping[str, Set[str]],
                                  callback: Callable[[Job], List[Lease]],
                                  timeout: Optional[int] = None) -> List[Lease]:
        """Return a list of leases for a worker to run.

        NOTE: Currently the list only ever contains zero or one leases.

        Inspect the list of jobs in the data store and return a list containing
        the lease for the highest priority job whose requirements match the
        given worker capabilities. If there are no matching jobs, return the
        empty list.

        The list containing the lease must be created by the callback passed to
        this method. The callback is passed the job selected as suitable for
        execution by the specific worker.

        Setting a timeout will cause this method to block up to ``timeout``
        seconds, returning the empty list if no matching jobs become available
        before the timeout is reached.

        Args:
            capabilities (dict): Dictionary of worker capabilities to compare
                with job requirements when finding a job.
            callback (callable): Function to run on the next runnable job,
                which should return a list of leases.
            timeout (int): Time to wait for new jobs, capped at
                MAX_JOB_BLOCK_TIME if longer.

        Returns:
            list: List of leases for the worker to run.
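
        For example, a caller might pass a callback that wraps the job in a
        single lease (a sketch; ``make_lease_for`` is a hypothetical helper,
        and the capability keys are illustrative):

        .. code-block:: python

            def callback(job):
                return [make_lease_for(job)]  # hypothetical lease construction

            leases = data_store.assign_lease_for_next_job(
                {"OSFamily": {"linux"}, "ISA": {"x86-64"}},
                callback,
                timeout=30,
            )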
572 """
573 raise NotImplementedError()

    @abstractmethod
    def get_operation_request_metadata_by_name(self, operation_name):
        """Return a dictionary containing metadata information that was
        sent by a client as part of a ``remote_execution_pb2.RequestMetadata``
        message.

        It contains the following keys:
        ``{'tool-name', 'tool-version', 'invocation-id', 'correlated-invocations-id'}``.
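
        For example (values are illustrative):

        .. code-block:: python

            {
                'tool-name': 'bazel',
                'tool-version': '5.2.0',
                'invocation-id': '...',
                'correlated-invocations-id': '...'
            }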
583 """
584 raise NotImplementedError()