Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/persistence/interface.py: 100.00%


# Copyright (C) 2019 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from abc import ABC, abstractmethod
import logging
from threading import Lock
from typing import Any, Callable, Dict, Generator, List, Mapping, Optional, Set, Tuple

from grpc import RpcContext

from buildgrid._enums import JobEventType, OperationStage
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import Lease
from buildgrid._protos.google.longrunning.operations_pb2 import Operation
from buildgrid.server.job import Job
from buildgrid.server.operations.filtering import OperationFilter
from buildgrid.utils import JobWatchSpec


Message = Tuple[Optional[Exception], str]


class DataStoreInterface(ABC):  # pragma: no cover

    """Abstract class defining an interface to a data store for the scheduler.

    The ``DataStoreInterface`` defines the interface used by the scheduler to
    manage storage of its internal state. It also provides some of the
    infrastructure for streaming messages triggered by changes in job and
    operation state, which can be used (via the
    :class:`buildgrid.server.scheduler.Scheduler` itself) to stream progress
    updates back to clients.

    Implementations of the interface are required to implement all the
    abstract methods in this class. However, there is no requirement about
    the internal details of those implementations; it may be beneficial for
    certain implementations to make some of these methods a no-op, for
    example.

    Implementations must also implement some way to set the events that are
    used in ``stream_operation_updates``, which live in the ``watched_jobs``
    dictionary.

    """

    def __init__(self, storage):
        self.logger = logging.getLogger(__file__)
        self.storage = storage
        self.watched_jobs = {}
        self.watched_jobs_lock = Lock()
        self._action_browser_url = None
        self._instance_name = None

    def setup_grpc(self):
        self.storage.setup_grpc()

    # Class Properties To Set

    def set_instance_name(self, instance_name):
        self._instance_name = instance_name

    def set_action_browser_url(self, url):
        self._action_browser_url = url

    # Monitoring/metrics methods

    @abstractmethod
    def activate_monitoring(self) -> None:
        """Enable the monitoring features of the data store."""
        raise NotImplementedError()

    @abstractmethod
    def deactivate_monitoring(self) -> None:
        """Disable the monitoring features of the data store.

        This method also performs any necessary cleanup of stored metrics.

        """
        raise NotImplementedError()

    @abstractmethod
    def get_metrics(self) -> Dict[str, Dict[int, int]]:
        """Return a dictionary of metrics for jobs, operations, and leases.

        The returned dictionary is keyed by
        :class:`buildgrid._enums.MetricCategories` values, and the values
        are dictionaries of counts per operation stage (or lease state, in
        the case of leases).
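
        An illustrative read of the result (``data_store`` is a hypothetical
        implementation instance):

        .. code-block:: python

            metrics = data_store.get_metrics()
            job_counts = metrics[MetricCategories.JOBS.value]
            num_queued = job_counts[OperationStage.QUEUED.value]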

        """
        raise NotImplementedError()

    # Job API

    @abstractmethod
    def create_job(self, job: Job) -> None:
        """Add a new job to the data store.

        NOTE: This method just stores the job in the data store. In order to
        enqueue the job and make it available for scheduling, the
        ``queue_job`` method should also be called.

        Args:
            job (buildgrid.server.job.Job): The job to be stored.

        """
        raise NotImplementedError()

    @abstractmethod
    def queue_job(self, job_name: str) -> None:
        """Add an existing job to the queue of jobs waiting to be assigned.

        This method adds a job with the given name to the queue of jobs. If
        the job is already in the queue, then this method ensures that the
        position of the job in the queue is correct.

        """
        raise NotImplementedError()

    @abstractmethod
    def store_response(self, job: Job, commit_changes: bool) -> None:
        """Store the job's ExecuteResponse in the data store.

        This method stores the response message for the job in the data
        store, in order to allow it to be retrieved when getting jobs
        in the future.

        This is separate from ``update_job`` as implementations will
        likely need to always have a special case for handling
        persistence of the response message.

        Args:
            job (buildgrid.server.job.Job): The job to store the response
                message of.

        """
        raise NotImplementedError()

    @abstractmethod
    def get_job_by_action(self, action_digest: Digest, *,
                          max_execution_timeout: Optional[int]=None) -> Optional[Job]:
        """Return the job corresponding to an Action digest.

        This method looks for a job object corresponding to the given
        Action digest in the data store. If a job is found it is returned,
        otherwise None is returned.

        Args:
            action_digest (Digest): The digest of the Action to find the
                corresponding job for.
            max_execution_timeout (int, Optional): The max execution timeout.

        Returns:
            buildgrid.server.job.Job or None:
                The job with the given Action digest, if it exists.
                Otherwise None.

        """
        raise NotImplementedError()

    @abstractmethod
    def get_job_by_name(self, name: str, *,
                        max_execution_timeout: Optional[int]=None) -> Optional[Job]:
        """Return the job with the given name.

        This method looks for a job with the specified name in the data
        store. If there is a matching Job it is returned, otherwise this
        returns None.

        Args:
            name (str): The name of the job to return.
            max_execution_timeout (int, Optional): The max execution timeout.

        Returns:
            buildgrid.server.job.Job or None:
                The job with the given name, if it exists. Otherwise None.

        """
        raise NotImplementedError()

    @abstractmethod
    def get_job_by_operation(self, operation_name: str, *,
                             max_execution_timeout: Optional[int]=None) -> Optional[Job]:
        """Return the Job for a given Operation.

        This method takes an Operation name, and returns the Job which
        corresponds to that Operation. If the Operation isn't found,
        or if the data store doesn't contain a corresponding job, this
        returns None.

        Args:
            operation_name (str): Name of the Operation whose corresponding
                Job is to be returned.
            max_execution_timeout (int, Optional): The max execution timeout.

        Returns:
            buildgrid.server.job.Job or None:
                The job related to the given operation, if it exists.
                Otherwise None.

        """
        raise NotImplementedError()

    @abstractmethod
    def get_all_jobs(self) -> List[Job]:
        """Return a list of all incomplete jobs in the data store.

        Returns:
            list: List of all incomplete jobs in the data store.

        """
        raise NotImplementedError()

    @abstractmethod
    def get_jobs_by_stage(self, operation_stage: OperationStage) -> List[Job]:
        """Return a list of jobs in the given stage.

        This method returns a list of all jobs in a specific operation stage.

        Args:
            operation_stage (OperationStage): The stage that the returned
                list of jobs should all be in.

        Returns:
            list: List of all jobs in the specified operation stage.

        """
        raise NotImplementedError()

    @abstractmethod
    def update_job(self, job_name: str, changes: Mapping[str, Any], skip_notify: bool) -> None:
        """Update a job in the data store.

        This method takes a job name and a dictionary of changes to apply to
        the job in the data store, and updates the job with those changes.
        The dictionary should be keyed by the attribute names which need to
        be updated, with the values being the new values for the attributes.
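
        For example, a sketch moving a job to the COMPLETED stage (the
        attribute name is illustrative; valid keys depend on the
        implementation):

        .. code-block:: python

            data_store.update_job(
                "job-name",
                {"stage": OperationStage.COMPLETED.value},
                skip_notify=False)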

        Args:
            job_name (str): The name of the job that is being updated.
            changes (dict): The dictionary of changes.
            skip_notify (bool): Whether notifying about job changes should
                be skipped.

        """
        raise NotImplementedError()

    @abstractmethod
    def delete_job(self, job_name: str) -> None:
        """Delete a job from the data store.

        This method removes a job from the data store.

        Args:
            job_name (str): The name of the job to be removed.

        """
        raise NotImplementedError()

    def watch_job(self, job: Job, operation_name: str, peer: str) -> None:
        """Start watching a job and operation for changes.

        If the given job is already being watched, then this method finds
        (or adds) the operation in the job's entry in ``watched_jobs``, and
        adds the peer to the list of peers for that operation.

        Otherwise, it creates a whole new entry in ``watched_jobs`` for the
        given job, operation, and peer.

        This method runs in a thread spawned by gRPC handling a connected
        peer.

        Args:
            job (buildgrid.server.job.Job): The job to watch.
            operation_name (str): The name of the specific operation to
                watch.
            peer (str): The peer that is requesting to watch the job.

        """
        with self.watched_jobs_lock:
            spec = self.watched_jobs.get(job.name)
            if spec is None:
                self.watched_jobs[job.name] = spec = JobWatchSpec(job)
            spec.add_peer(operation_name, peer)
        self.logger.debug(
            f"Registered peer [{peer}] to watch operation [{operation_name}] "
            f"of job [{job.name}]")

    def stream_operation_updates(self, operation_name: str,
                                 context: RpcContext) -> Generator[Message, None, None]:
        """Stream update messages for a given operation.

        This is a generator which yields tuples of the form

        .. code-block:: python

            (error, operation)

        where ``error`` is None unless the job is cancelled, in which case
        ``error`` is a :class:`buildgrid._exceptions.CancelledError`.

        This method runs in a thread spawned by gRPC handling a connected
        peer, and should spend most of its time blocked waiting on an event
        which is set by either the thread which watches the data store for
        job updates or the main thread handling the gRPC termination
        callback.

        Iteration finishes either when the provided gRPC context becomes
        inactive, or when the job owning the operation being watched is
        deleted from the data store.
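
        A minimal consumption sketch (``data_store`` and ``handle_update``
        are hypothetical; ``context`` is an active gRPC servicer context):

        .. code-block:: python

            for error, operation in data_store.stream_operation_updates(
                    operation_name, context):
                if error is not None:
                    raise error
                handle_update(operation)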

321 

322 Args: 

323 operation_name (str): The name of the operation to stream 

324 updates for. 

325 context (grpc.ServicerContext): The RPC context for the peer 

326 that is requesting a stream of events. 

327 

328 """ 

329 # Send an initial update as soon as we start watching, to provide the 

330 # peer with the initial state of the operation. This is done outside 

331 # the loop to simplify the logic for handling events without sending 

332 # unnecessary messages to peers. 

333 job = self.get_job_by_operation(operation_name) 

334 if job is None: 

335 return 

336 message = job.get_operation_update(operation_name) 

337 yield message 

338 

339 self.logger.debug("Waiting for events") 

340 

341 # Wait for events whilst the context is active. Events are set by the 

342 # thread which is watching the state data store for job updates. 

343 with self.watched_jobs_lock: 

344 watched_job = self.watched_jobs.get(job.name) 

345 if watched_job is None: 

346 self.logger.error(f"Unable to find job with name: [{job.name}] in watched_jobs dictionary.") 

347 return 

348 event = watched_job.event 

349 last_event = None 

350 while context.is_active(): 

351 last_event, event_type = event.wait(last_event) 

352 self.logger.debug( 

353 f"Received event #{last_event} for operation [{operation_name}] " 

354 f"with type [{event_type}].") 

355 # A `JobEventType.STOP` event means that a peer watching this job 

356 # has disconnected and its termination callback has executed on 

357 # the thread gRPC creates for callbacks. In this case we don't 

358 # want to send a message, so we use `continue` to evaluate whether 

359 # or not to continue iteration. 

360 if event_type == JobEventType.STOP: 

361 continue 

362 

363 job = self.get_job_by_operation(operation_name) 

364 if job is None: 

365 self.logger.debug( 

366 f"Job for operation [{operation_name}] has gone away, stopped streaming updates.") 

367 return 

368 message = job.get_operation_update(operation_name) 

369 yield message 

370 

371 self.logger.debug("Context became inactive, stopped streaming updates.") 

372 

373 def stop_watching_operation(self, job: Job, operation_name: str, peer: str) -> None: 

374 """Remove the given peer from the list of peers watching the given job. 

375 

376 If the given job is being watched, this method triggers a 

377 ``JobEventType.STOP`` for it to cause the waiting threads to check 

378 whether their context is still active. It then removes the given peer 

379 from the list of peers watching the given operation name. If this 

380 leaves no peers then the entire entry for the operation in the tracked 

381 job is removed. 

382 

383 If this process leaves the job with no operations being watched, the 

384 job itself is removed from the `watched_jobs` dictionary, and it will 

385 no longer be checked for updates. 

386 

387 This runs in the main thread as part of the RPC termination callback 

388 for ``Execute`` and ``WaitExecution`` requests. 

389 

390 Args: 

391 job (buildgrid.server.job.Job): The job to stop watching. 

392 operation_name (str): The name of the specific operation to 

393 stop watching. 

394 peer (str): The peer that is requesting to stop watching the job. 

395 

396 """ 

397 with self.watched_jobs_lock: 

398 spec = self.watched_jobs.get(job.name) 

399 if spec is None: 

400 self.logger.debug( 

401 f"Peer [{peer}] attempted to stop watching job [{job.name}] and operation " 

402 f"[{operation_name}], but no record of that job being watched was found.") 

403 return 

404 spec.event.notify_stop() 

405 spec.remove_peer(operation_name, peer) 

406 if not spec.peers: 

407 self.logger.debug( 

408 f"No peers remain watching job [{job.name}], removing it from the " 

409 "dictionary of jobs being watched.") 

410 self.watched_jobs.pop(job.name) 

411 

412 # Operation API 

413 

414 @abstractmethod 

415 def create_operation(self, operation_name: str, job_name: str) -> None: 

416 """Add a new operation to the data store. 

417 

418 Args: 

419 operation_name (str): The name of the Operation to create in the 

420 data store. 

421 job_name (str): The name of the Job representing the execution of 

422 this operation. 

423 

424 """ 

425 raise NotImplementedError() 

426 

427 # NOTE: This method is badly named, the current implementations return a 

428 # set of *job* names rather than *operation* names. 

429 # TODO: Fix or remove this. 

430 @abstractmethod 

431 def get_operations_by_stage(self, operation_stage: OperationStage) -> Set[str]: 

432 """Return a set of Job names in a specific operation stage. 

433 

434 Find the operations in a given stage and return a set containing the 

435 names of the Jobs related to those operations. 

436 

437 Args: 

438 operation_stage (OperationStage): The stage that the operations 

439 should be in. 

440 

441 Returns: 

442 set: Set of all job names with operations in the specified state. 

443 

444 """ 

445 raise NotImplementedError() 

446 

447 @abstractmethod 

448 def list_operations(self, 

449 operation_filters: List[OperationFilter], 

450 page_size: int=None, 

451 page_token: str=None, 

452 max_execution_timeout: int=None) -> Tuple[List[Operation], str]: 

453 """Return all operations matching the filter. 

454 

455 Returns: 

456 list: A page of matching operations in the data store. 

457 str: If nonempty, a token to be submitted by the requester for the next page of results. 
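
        A pagination sketch (``data_store`` and ``filters`` are
        illustrative):

        .. code-block:: python

            operations, token = data_store.list_operations(filters, page_size=50)
            while token:
                page, token = data_store.list_operations(
                    filters, page_size=50, page_token=token)
                operations.extend(page)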

458 """ 

459 raise NotImplementedError() 

460 

461 @abstractmethod 

462 def update_operation(self, operation_name: str, changes: Mapping[str, Any]) -> None: 

463 """Update an operation in the data store. 

464 

465 This method takes an operation name and a dictionary of changes to 

466 apply to the operation in the data store, and updates the operation 

467 with those changes. The dictionary should be keyed by the attribute 

468 names which need to be updated, with the values being the new values 

469 for the attributes. 

470 

471 Args: 

472 operation_name (str): The name of the operation that is being updated. 

473 changes: (dict): The dictionary of changes to be applied. 

474 

475 """ 

476 raise NotImplementedError() 

477 

478 @abstractmethod 

479 def delete_operation(self, operation_name: str) -> None: 

480 """Delete a operation from the data store. 

481 

482 This method removes a operation from the data store. 

483 

484 Args: 

485 operation_name (str): The name of the operation to be removed. 

486 

487 """ 

488 raise NotImplementedError() 

489 

490 # Lease API 

491 

492 @abstractmethod 

493 def create_lease(self, lease: Lease) -> None: 

494 """Add a new lease to the data store. 

495 

496 Args: 

497 lease (Lease): The Lease protobuf object representing the lease 

498 to be added to the data store. 

499 

500 """ 

501 raise NotImplementedError() 

502 

503 @abstractmethod 

504 def get_leases_by_state(self, lease_state) -> Set[str]: 

505 """Return the set of IDs of leases in a given state. 

506 

507 Args: 

508 lease_state (LeaseState): The state that the leases should 

509 be in. 

510 

511 Returns: 

512 set: Set of strings containing IDs of leases in the given state. 

513 

514 """ 

515 raise NotImplementedError() 

516 

517 @abstractmethod 

518 def update_lease(self, job_name: str, changes: Mapping[str, Any]) -> None: 

519 """Update a lease in the data store. 

520 

521 This method takes a job name and a dictionary of changes to 

522 apply to the lease for that job in the data store, and updates the 

523 lease with those changes. The dictionary should be keyed by the 

524 attribute names which need to be updated, with the values being the 

525 new values for the attributes. 

526 

527 The job name is used as leases have no unique identifier; instead 

528 there should always be at most one active lease for the job. It is 

529 the responsibility of data store implementations to ensure this. 

530 

531 Args: 

532 job_name (str): The name of the job whose lease is being updated. 

533 changes: (dict): The dictionary of changes to be applied. 

534 

535 """ 

536 raise NotImplementedError() 

537 

538 # NOTE: We use a callback to create the leases in this method so that the 

539 # data store doesn't need to know anything about the bots, or other lease 

540 # metadata. 

541 @abstractmethod 

542 def assign_lease_for_next_job(self, capabilities: Mapping[str, Set[str]], 

543 callback: Callable[[Job], List[Lease]], 

544 timeout: Optional[int] = None) -> List[Lease]: 

545 """Return a list of leases for a worker to run. 

546 

547 NOTE: Currently the list only ever has one or zero leases. 

548 

549 Inspect the list of jobs in the data store and return a list containing 

550 the lease for the highest priority job whose requirements match the 

551 given worker capabilities. If there are no matching jobs, return the 

552 empty list. 

553 

554 The list containing the lease must be created by the callback passed to 

555 this method. The callback is passed the job selected as suitable for 

556 execution by the specific worker. 
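
        A sketch of worker-side usage (capability names and the bare
        ``Lease`` construction are illustrative; a real callback would also
        populate the lease payload from the job):

        .. code-block:: python

            def next_leases(job):
                return [Lease(id=job.name)]

            leases = data_store.assign_lease_for_next_job(
                capabilities={"OSFamily": {"Linux"}},
                callback=next_leases,
                timeout=30)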

557 

558 Setting a timeout will cause this method to block up to ``timeout`` 

559 seconds, returning the empty list if no matching jobs become available 

560 before the timeout is reached. 

561 

562 Args: 

563 capabilities (dict): Dictionary of worker capabilities to compare 

564 with job requirements when finding a job. 

565 callback (callable): Function to run on the next runnable job, 

566 should return a list of leases. 

567 timeout (int): time to wait for new jobs, caps if longer 

568 than MAX_JOB_BLOCK_TIME. 

569 

570 Returns: 

571 list: List of leases for the worker to run. 

572 """ 

573 raise NotImplementedError() 

574 

575 @abstractmethod 

576 def get_operation_request_metadata_by_name(self, operation_name): 

577 """Return a dictionary containing metadata information that was 

578 sent by a client as part of a ``remote_execution_pb2.RequestMetadata`` 

579 message. 

580 

581 It contains the following keys: 

582 ``{'tool-name', 'tool-version', 'invocation-id', 'correlated-invocations-id'}``. 
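
        An illustrative return value (all values are placeholders):

        .. code-block:: python

            {
                'tool-name': 'example-client',
                'tool-version': '1.0',
                'invocation-id': 'invocation-uuid',
                'correlated-invocations-id': 'correlated-uuid'
            }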

583 """ 

584 raise NotImplementedError()
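

# A minimal in-memory sketch of a ``DataStoreInterface`` implementation,
# included for illustration only. ``InMemoryDataStore`` is hypothetical
# (not part of BuildGrid); every method body is the simplest assumption
# that satisfies the signature, and it assumes ``Job`` exposes the
# ``action_digest`` and ``operation_stage`` attributes.
class InMemoryDataStore(DataStoreInterface):

    def __init__(self, storage):
        super().__init__(storage)
        self._jobs: Dict[str, Job] = {}
        self._operations: Dict[str, str] = {}  # operation name -> job name
        self._queue: List[str] = []            # job names awaiting assignment
        self._monitoring = False

    def activate_monitoring(self) -> None:
        self._monitoring = True

    def deactivate_monitoring(self) -> None:
        self._monitoring = False

    def get_metrics(self) -> Dict[str, Dict[int, int]]:
        return {}

    def create_job(self, job: Job) -> None:
        self._jobs[job.name] = job

    def queue_job(self, job_name: str) -> None:
        if job_name not in self._queue:
            self._queue.append(job_name)

    def store_response(self, job: Job, commit_changes: bool) -> None:
        self._jobs[job.name] = job

    def get_job_by_action(self, action_digest, *, max_execution_timeout=None):
        return next((job for job in self._jobs.values()
                     if job.action_digest == action_digest), None)

    def get_job_by_name(self, name, *, max_execution_timeout=None):
        return self._jobs.get(name)

    def get_job_by_operation(self, operation_name, *, max_execution_timeout=None):
        job_name = self._operations.get(operation_name)
        return self._jobs.get(job_name) if job_name is not None else None

    def get_all_jobs(self) -> List[Job]:
        return list(self._jobs.values())

    def get_jobs_by_stage(self, operation_stage: OperationStage) -> List[Job]:
        return [job for job in self._jobs.values()
                if job.operation_stage == operation_stage]

    def update_job(self, job_name, changes, skip_notify) -> None:
        job = self._jobs.get(job_name)
        if job is not None:
            for attribute, value in changes.items():
                setattr(job, attribute, value)

    def delete_job(self, job_name: str) -> None:
        self._jobs.pop(job_name, None)

    def create_operation(self, operation_name: str, job_name: str) -> None:
        self._operations[operation_name] = job_name

    def get_operations_by_stage(self, operation_stage: OperationStage) -> Set[str]:
        return {job.name for job in self.get_jobs_by_stage(operation_stage)}

    def list_operations(self, operation_filters, page_size=None,
                        page_token=None, max_execution_timeout=None):
        return [], ""

    def update_operation(self, operation_name, changes) -> None:
        pass

    def delete_operation(self, operation_name: str) -> None:
        self._operations.pop(operation_name, None)

    def create_lease(self, lease: Lease) -> None:
        pass

    def get_leases_by_state(self, lease_state) -> Set[str]:
        return set()

    def update_lease(self, job_name, changes) -> None:
        pass

    def assign_lease_for_next_job(self, capabilities, callback, timeout=None):
        # Ignores capabilities and timeout in this sketch; real
        # implementations match job requirements against the capabilities.
        if not self._queue:
            return []
        return callback(self._jobs[self._queue.pop(0)])

    def get_operation_request_metadata_by_name(self, operation_name):
        return {}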