Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/persistence/mem/impl.py: 79.09%
220 statements
« prev ^ index » next coverage.py v6.4.1, created at 2022-06-22 21:04 +0000
« prev ^ index » next coverage.py v6.4.1, created at 2022-06-22 21:04 +0000
1# Copyright (C) 2019 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16import bisect
17import logging
18from multiprocessing import Queue
19from threading import Condition, Lock, Thread
20from typing import List, Tuple
21from datetime import datetime
22import time
24from buildgrid._protos.google.longrunning import operations_pb2
25from buildgrid._enums import LeaseState, MetricCategories, OperationStage
26from buildgrid._exceptions import InvalidArgumentError
27from buildgrid.utils import JobState, BrowserURL
28from buildgrid.settings import MAX_JOB_BLOCK_TIME
29from buildgrid.server.metrics_names import (
30 BOTS_ASSIGN_JOB_LEASES_TIME_METRIC_NAME,
31 DATA_STORE_CHECK_FOR_UPDATE_TIME_METRIC_NAME,
32 DATA_STORE_CREATE_JOB_TIME_METRIC_NAME,
33 DATA_STORE_CREATE_LEASE_TIME_METRIC_NAME,
34 DATA_STORE_CREATE_OPERATION_TIME_METRIC_NAME,
35 DATA_STORE_GET_JOB_BY_DIGEST_TIME_METRIC_NAME,
36 DATA_STORE_GET_JOB_BY_NAME_TIME_METRIC_NAME,
37 DATA_STORE_GET_JOB_BY_OPERATION_TIME_METRIC_NAME,
38 DATA_STORE_LIST_OPERATIONS_TIME_METRIC_NAME,
39 DATA_STORE_QUEUE_JOB_TIME_METRIC_NAME,
40 DATA_STORE_UPDATE_JOB_TIME_METRIC_NAME,
41 DATA_STORE_UPDATE_LEASE_TIME_METRIC_NAME,
42 DATA_STORE_UPDATE_OPERATION_TIME_METRIC_NAME
43)
44from buildgrid.server.metrics_utils import DurationMetric
45from buildgrid.server.operations.filtering import OperationFilter, DEFAULT_OPERATION_FILTERS
46from buildgrid.server.persistence.interface import DataStoreInterface
49class MemoryDataStore(DataStoreInterface):
    def __init__(self, storage):
        """Create an in-memory scheduler data store backed by *storage*.

        Sets up the job queue and its synchronisation primitives, the job
        lookup indices, the (initially inactive) monitoring sets, and a
        daemon thread that consumes job-update events.

        :param storage: Storage backend handed to the base
            ``DataStoreInterface`` constructor.
        """
        super().__init__(storage)
        # NOTE(review): getLogger(__file__) keys the logger on the module's
        # file path; getLogger(__name__) is the usual idiom — confirm whether
        # any logging config depends on the path-based name before changing.
        self.logger = logging.getLogger(__file__)
        self.logger.info("Creating in-memory scheduler")

        # Priority-ordered job queue, guarded by queue_lock.  queue_condition
        # wakes workers blocked in assign_lease_for_next_job() when new work
        # arrives (see queue_job()).
        self.queue = []
        self.queue_lock = Lock()
        self.queue_condition = Condition(lock=self.queue_lock)

        # Lookup indices: action-digest hash -> job, operation name -> job,
        # and job name -> job.
        self.jobs_by_action = {}
        self.jobs_by_operation = {}
        self.jobs_by_name = {}

        # Monitoring state; the per-stage/per-state sets are only populated
        # while is_instrumented is True (see activate_monitoring()).
        self.operations_by_stage = {}
        self.leases_by_state = {}
        self.is_instrumented = False

        # Job-update events are funnelled through this queue to the daemon
        # watcher thread (wait_for_job_updates), which notifies watchers of
        # stage/cancellation changes.
        self.update_event_queue = Queue()
        self.watcher = Thread(name="JobWatcher", target=self.wait_for_job_updates, daemon=True)
        self.watcher_keep_running = True
        self.watcher.start()
73 def __repr__(self):
74 return "In-memory data store interface"
76 def activate_monitoring(self):
77 if self.is_instrumented:
78 return
80 self.operations_by_stage = {
81 stage: set() for stage in OperationStage
82 }
83 self.leases_by_state = {
84 state: set() for state in LeaseState
85 }
86 self.is_instrumented = True
88 def deactivate_monitoring(self):
89 if not self.is_instrumented:
90 return
92 self.operations_by_stage = {}
93 self.leases_by_state = {}
94 self.is_instrumented = False
96 def _check_job_timeout(self, job_internal, *, max_execution_timeout=None):
97 """ Do a lazy check of maximum allowed job timeouts when clients try to retrieve
98 an existing job.
99 Cancel the job and related operations/leases, if we detect they have
100 exceeded timeouts on access.
102 Returns the `buildgrid.server.Job` object, possibly updated with `cancelled=True`.
103 """
104 if job_internal and max_execution_timeout and job_internal.worker_start_timestamp_as_datetime:
105 if job_internal.operation_stage == OperationStage.EXECUTING:
106 executing_duration = datetime.utcnow() - job_internal.worker_start_timestamp_as_datetime
107 if executing_duration.total_seconds() >= max_execution_timeout:
108 self.logger.warning(f"Job=[{job_internal}] has been executing for "
109 f"executing_duration=[{executing_duration}]. "
110 f"max_execution_timeout=[{max_execution_timeout}] "
111 "Cancelling.")
112 job_internal.cancel_all_operations(data_store=self)
113 self.logger.info(f"Job=[{job_internal}] has been cancelled.")
114 return job_internal
116 @DurationMetric(DATA_STORE_GET_JOB_BY_NAME_TIME_METRIC_NAME, instanced=True)
117 def get_job_by_name(self, name, *, max_execution_timeout=None):
118 job = self.jobs_by_name.get(name)
119 return self._check_job_timeout(job, max_execution_timeout=max_execution_timeout)
121 @DurationMetric(DATA_STORE_GET_JOB_BY_DIGEST_TIME_METRIC_NAME, instanced=True)
122 def get_job_by_action(self, action_digest, *, max_execution_timeout=None):
123 job = self.jobs_by_action.get(action_digest.hash)
124 return self._check_job_timeout(job, max_execution_timeout=max_execution_timeout)
126 @DurationMetric(DATA_STORE_GET_JOB_BY_OPERATION_TIME_METRIC_NAME, instanced=True)
127 def get_job_by_operation(self, operation_name, *, max_execution_timeout=None):
128 job = self.jobs_by_operation.get(operation_name)
129 return self._check_job_timeout(job, max_execution_timeout=max_execution_timeout)
131 def get_all_jobs(self):
132 return [job for job in self.jobs_by_name.values()
133 if job.operation_stage != OperationStage.COMPLETED]
135 def get_jobs_by_stage(self, operation_stage):
136 return [job for job in self.jobs_by_name.values()
137 if job.operation_stage == operation_stage]
139 def _get_job_count_by_stage(self):
140 results = []
141 for stage in OperationStage:
142 results.append((stage, len(self.get_jobs_by_stage(stage))))
143 return results
145 @DurationMetric(DATA_STORE_CREATE_JOB_TIME_METRIC_NAME, instanced=True)
146 def create_job(self, job):
147 self.jobs_by_action[job.action_digest.hash] = job
148 self.jobs_by_name[job.name] = job
149 if self._action_browser_url is not None:
150 job.set_action_url(BrowserURL(self._action_browser_url, self._instance_name))
    @DurationMetric(DATA_STORE_QUEUE_JOB_TIME_METRIC_NAME, instanced=True)
    def queue_job(self, job_name):
        """Place the named job on the scheduling queue in priority order.

        Holds the queue condition lock throughout; wakes every worker
        blocked in ``assign_lease_for_next_job`` after inserting.

        :param job_name: Name of a job already registered via
            ``create_job``; raises ``KeyError`` if unknown.
        """
        job = self.jobs_by_name[job_name]
        with self.queue_condition:
            if job.operation_stage != OperationStage.QUEUED:
                # Insert at the position that keeps the queue sorted by the
                # jobs' own ordering (bisect relies on Job comparison).
                bisect.insort(self.queue, job)
                self.logger.info(f"Job queued: [{job.name}]")
                # Wake all waiters as not all waiters may have the required capabilities
                self.queue_condition.notify_all()
            else:
                self.logger.info(f"Job already queued: [{job.name}]")
                # NOTE(review): re-sorting here presumably restores priority
                # order after an in-place change to the already-queued job —
                # confirm against callers that re-queue with a new priority.
                self.queue.sort()
    @DurationMetric(DATA_STORE_UPDATE_JOB_TIME_METRIC_NAME, instanced=True)
    def update_job(self, job_name, changes, skip_notify=False):
        """Publish a job-update event for the watcher thread to process.

        :param job_name: Name of the job that changed.
        :param changes: Mapping of changed field names; the watcher only
            reacts to "cancelled" and "stage" keys.
        :param skip_notify: When True, watchers are not notified of the
            change (see ``wait_for_job_updates``).
        """
        # With this implementation, there's no need to actually make
        # changes to the stored job, since its a reference to the
        # in-memory job that caused this method to be called.
        self.update_event_queue.put((job_name, changes, skip_notify))
172 def delete_job(self, job_name):
173 job = self.jobs_by_name[job_name]
175 del self.jobs_by_action[job.action_digest.hash]
176 del self.jobs_by_name[job.name]
178 self.logger.info(f"Job deleted: [{job.name}]")
180 if self.is_instrumented:
181 for stage in OperationStage:
182 self.operations_by_stage[stage].discard(job.name)
184 for state in LeaseState:
185 self.leases_by_state[state].discard(job.name)
187 def wait_for_job_updates(self):
188 self.logger.info("Starting job watcher thread")
189 while self.watcher_keep_running:
190 try:
191 job_name, changes, skip_notify = self.update_event_queue.get()
192 except EOFError:
193 continue
194 with DurationMetric(DATA_STORE_CHECK_FOR_UPDATE_TIME_METRIC_NAME):
195 with self.watched_jobs_lock:
196 if (all(field not in changes for field in ("cancelled", "stage")) or
197 job_name not in self.watched_jobs):
198 # If the stage or cancellation state haven't changed, we don't
199 # need to do anything with this event. Similarly, if we aren't
200 # watching this job, we can ignore the event.
201 continue
202 job = self.get_job_by_name(job_name)
203 spec = self.watched_jobs[job_name]
204 new_state = JobState(job)
205 if spec.last_state != new_state and not skip_notify:
206 spec.last_state = new_state
207 if not skip_notify:
208 spec.event.notify_change()
210 def store_response(self, job, commit_changes):
211 # The job is always in memory in this implementation, so there's
212 # no need to write anything to the CAS, since the job stays in
213 # memory as long as we need it
214 pass
216 def get_operations_by_stage(self, operation_stage):
217 return self.operations_by_stage.get(operation_stage, set())
219 def _get_operation_count_by_stage(self):
220 results = []
221 for stage in OperationStage:
222 results.append((stage, len(self.get_operations_by_stage(stage))))
223 return results
225 @DurationMetric(DATA_STORE_LIST_OPERATIONS_TIME_METRIC_NAME, instanced=True)
226 def list_operations(self,
227 operation_filters: List[OperationFilter]=None,
228 page_size: int=None,
229 page_token: str=None,
230 max_execution_timeout: int=None) -> Tuple[List[operations_pb2.Operation], str]:
232 if operation_filters and operation_filters != DEFAULT_OPERATION_FILTERS:
233 raise InvalidArgumentError("Filtering is not supported with the in-memory scheduler.")
235 if page_token:
236 raise InvalidArgumentError("page_token is not supported in the in-memory scheduler.")
238 # Run through all the jobs and see if any of are
239 # exceeding the execution timeout; mark those as cancelled
240 for job in self.jobs_by_name.values():
241 self._check_job_timeout(job, max_execution_timeout=max_execution_timeout)
243 # Return all operations
244 return [
245 operation for job in self.jobs_by_name.values() for operation in job.get_all_operations()
246 ], ""
248 @DurationMetric(DATA_STORE_CREATE_OPERATION_TIME_METRIC_NAME, instanced=True)
249 def create_operation(self, operation_name, job_name, request_metadata=None):
250 job = self.jobs_by_name[job_name]
251 self.jobs_by_operation[operation_name] = job
252 if self.is_instrumented:
253 self.operations_by_stage[job.operation_stage].add(job_name)
255 @DurationMetric(DATA_STORE_UPDATE_OPERATION_TIME_METRIC_NAME, instanced=True)
256 def update_operation(self, operation_name, changes):
257 if self.is_instrumented:
258 job = self.jobs_by_operation[operation_name]
259 self.operations_by_stage[job.operation_stage].add(job.name)
260 other_stages = [member for member in OperationStage if member != job.operation_stage]
261 for stage in other_stages:
262 self.operations_by_stage[stage].discard(job.name)
264 def delete_operation(self, operation_name):
265 del self.jobs_by_operation[operation_name]
267 def get_leases_by_state(self, lease_state):
268 return self.leases_by_state.get(lease_state, set())
270 def _get_lease_count_by_state(self):
271 results = []
272 for state in LeaseState:
273 results.append((state, len(self.get_leases_by_state(state))))
274 return results
276 @DurationMetric(DATA_STORE_CREATE_LEASE_TIME_METRIC_NAME, instanced=True)
277 def create_lease(self, lease):
278 if self.is_instrumented:
279 self.leases_by_state[LeaseState(lease.state)].add(lease.id)
281 @DurationMetric(DATA_STORE_UPDATE_LEASE_TIME_METRIC_NAME, instanced=True)
282 def update_lease(self, job_name, changes):
283 if self.is_instrumented:
284 job = self.jobs_by_name[job_name]
285 state = LeaseState(job.lease.state)
286 self.leases_by_state[state].add(job.lease.id)
287 other_states = [member for member in LeaseState if member != state]
288 for state in other_states:
289 self.leases_by_state[state].discard(job.lease.id)
291 def load_unfinished_jobs(self):
292 return []
294 def get_operation_request_metadata_by_name(self, operation_name):
295 return None
    @DurationMetric(BOTS_ASSIGN_JOB_LEASES_TIME_METRIC_NAME, instanced=True)
    def assign_lease_for_next_job(self, capabilities, callback, timeout=None):
        """Return the highest priority job that can be run by a worker.

        Iterate over the job queue and find the highest priority job which
        the worker can run given the provided capabilities. Takes a
        dictionary of worker capabilities to compare with job requirements.

        :param capabilities: Dictionary of worker capabilities to compare
            with job requirements when finding a job.
        :type capabilities: dict
        :param callback: Function to run on the next runnable job, should return
            a list of leases.
        :type callback: function
        :param timeout: time to block waiting on job queue, caps if longer
            than MAX_JOB_BLOCK_TIME.
        :type timeout: int
        :returns: A job

        """
        # Fast path: no blocking requested and nothing queued — return
        # immediately without taking the lock.
        if not timeout and not self.queue:
            return []

        with self.queue_condition:
            # First attempt without waiting; may already find a runnable job.
            leases = self._assign_lease(capabilities, callback)

            if timeout:
                # Cap the timeout if it's larger than MAX_JOB_BLOCK_TIME
                timeout = min(timeout, MAX_JOB_BLOCK_TIME)
                deadline = time.time() + timeout
                # Re-check after every wakeup (queue_job notifies all
                # waiters) until a lease is assigned or the deadline passes.
                while not leases and time.time() < deadline:
                    ready = self.queue_condition.wait(timeout=deadline - time.time())
                    if not ready:
                        # If we ran out of time waiting for the condition variable,
                        # give up early.
                        break
                    leases = self._assign_lease(capabilities, callback, deadline=deadline)

        return leases
337 def _assign_lease(self, worker_capabilities, callback, deadline=None):
338 for index, job in enumerate(self.queue):
339 if deadline is not None and time.time() >= deadline:
340 break
341 # Don't queue a cancelled job, it would be unable to get a lease anyway
342 if job.cancelled:
343 self.logger.debug(f"Dropping cancelled job: [{job.name}] from queue")
344 del self.queue[index]
345 continue
347 if self._worker_is_capable(worker_capabilities, job):
348 leases = callback(job)
349 if leases:
350 del self.queue[index]
351 return leases
352 return []
354 def _worker_is_capable(self, worker_capabilities, job):
355 """Returns whether the worker is suitable to run the job."""
356 # TODO: Replace this with the logic defined in the Platform msg. standard.
358 job_requirements = job.platform_requirements
359 # For now we'll only check OS and ISA properties.
361 if not job_requirements:
362 return True
364 for req, matches in job_requirements.items():
365 if not matches <= worker_capabilities.get(req, set()):
366 return False
367 return True
369 def get_metrics(self):
370 metrics = {}
371 metrics[MetricCategories.JOBS.value] = {
372 stage.value: count for stage, count in self._get_job_count_by_stage()
373 }
374 metrics[MetricCategories.LEASES.value] = {
375 state.value: count for state, count in self._get_lease_count_by_state()
376 }
378 return metrics