Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# Copyright (C) 2019 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16from abc import ABC, abstractmethod 

17import logging 

18from threading import Lock 

19from typing import Any, Callable, Dict, Generator, List, Mapping, Optional, Set, Tuple 

20 

21from grpc import RpcContext 

22 

23from buildgrid._enums import JobEventType, OperationStage 

24from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest 

25from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import Lease 

26from buildgrid._protos.google.longrunning.operations_pb2 import Operation 

27from buildgrid.server.job import Job 

28from buildgrid.utils import JobWatchSpec 

29 

30 

# An update message streamed to a watching peer: ``(error, operation)`` where
# ``error`` is ``None`` unless the watched job was cancelled.
Message = Tuple[Optional[Exception], str]

32 

33 

class DataStoreInterface(ABC):  # pragma: no cover

    """Abstract class defining an interface to a data store for the scheduler.

    The ``DataStoreInterface`` defines the interface used by the scheduler to
    manage storage of its internal state. It also provides some of the
    infrastructure for streaming messages triggered by changes in job and
    operation state, which can be used (via the
    :class:`buildgrid.server.scheduler.Scheduler` itself) to stream progress
    updates back to clients.

    Implementations of the interface are required to implement all the abstract
    methods in this class. However, there is no requirement about the internal
    details of those implementations; it may be beneficial for certain
    implementations to make some of these methods a noop for example.

    Implementations must also implement some way to set the events that are
    used in ``stream_operation_updates``, which live in the ``watched_jobs``
    dictionary.

    """

    def __init__(self):
        # Use the module name rather than ``__file__`` so the logger joins the
        # standard dotted-name logger hierarchy (``__file__`` is a filesystem
        # path, which defeats hierarchical logging configuration).
        self.logger = logging.getLogger(__name__)
        # Jobs currently being watched by at least one connected peer, keyed
        # by job name.
        self.watched_jobs: Dict[str, JobWatchSpec] = {}
        # Guards all access to ``watched_jobs``. Taken both by gRPC handler
        # threads (watch/stream) and by RPC termination callbacks (stop).
        self.watched_jobs_lock = Lock()

    # Monitoring/metrics methods

    @abstractmethod
    def activate_monitoring(self) -> None:
        """Enable the monitoring features of the data store."""
        raise NotImplementedError()

    @abstractmethod
    def deactivate_monitoring(self) -> None:
        """Disable the monitoring features of the data store.

        This method also performs any necessary cleanup of stored metrics.

        """
        raise NotImplementedError()

    @abstractmethod
    def get_metrics(self) -> Dict[str, Dict[int, int]]:
        """Return a dictionary of metrics for jobs, operations, and leases.

        The returned dictionary is keyed by :class:`buildgrid._enums.MetricCategories`
        values, and the values are dictionaries of counts per operation stage
        (or lease state, in the case of leases).

        """
        raise NotImplementedError()

    # Job API

    @abstractmethod
    def create_job(self, job: Job) -> None:
        """Add a new job to the data store.

        NOTE: This method just stores the job in the data store. In order to
        enqueue the job to make it available for scheduling execution, the
        ``queue_job`` method should also be called.

        Args:
            job (buildgrid.server.job.Job): The job to be stored.

        """
        raise NotImplementedError()

    @abstractmethod
    def queue_job(self, job_name: str) -> None:
        """Add an existing job to the queue of jobs waiting to be assigned.

        This method adds a job with the given name to the queue of jobs. If
        the job is already in the queue, then this method ensures that the
        position of the job in the queue is correct.

        """
        raise NotImplementedError()

    @abstractmethod
    def store_response(self, job: Job) -> None:
        """Store the job's ExecuteResponse in the data store.

        This method stores the response message for the job in the data
        store, in order to allow it to be retrieved when getting jobs
        in the future.

        This is separate from ``update_job`` as implementations will
        likely need to always have a special case for handling
        persistence of the response message.

        Args:
            job (buildgrid.server.job.Job): The job to store the response
                message of.

        """
        raise NotImplementedError()

    @abstractmethod
    def get_job_by_action(self, action_digest: Digest, *,
                          max_execution_timeout: Optional[int]=None) -> Optional[Job]:
        """Return the job corresponding to an Action digest.

        This method looks for a job object corresponding to the given
        Action digest in the data store. If a job is found it is returned,
        otherwise None is returned.

        Args:
            action_digest (Digest): The digest of the Action to find the
                corresponding job for.
            max_execution_timeout (int, Optional): The max execution timeout.

        Returns:
            buildgrid.server.job.Job or None:
                The job with the given Action digest, if it exists.
                Otherwise None.

        """
        raise NotImplementedError()

    @abstractmethod
    def get_job_by_name(self, name: str, *,
                        max_execution_timeout: Optional[int]=None) -> Optional[Job]:
        """Return the job with the given name.

        This method looks for a job with the specified name in the data
        store. If there is a matching Job it is returned, otherwise this
        returns None.

        Args:
            name (str): The name of the job to return.
            max_execution_timeout (int, Optional): The max execution timeout.

        Returns:
            buildgrid.server.job.Job or None:
                The job with the given name, if it exists. Otherwise None.

        """
        raise NotImplementedError()

    @abstractmethod
    def get_job_by_operation(self, operation: str, *,
                             max_execution_timeout: Optional[int]=None) -> Optional[Job]:
        """Return the Job for a given Operation.

        This method takes an Operation name, and returns the Job which
        corresponds to that Operation. If the Operation isn't found,
        or if the data store doesn't contain a corresponding job, this
        returns None.

        Args:
            operation (str): Name of the Operation whose corresponding
                Job is to be returned.
            max_execution_timeout (int, Optional): The max execution timeout.

        Returns:
            buildgrid.server.job.Job or None:
                The job related to the given operation, if it exists.
                Otherwise None.

        """
        raise NotImplementedError()

    @abstractmethod
    def get_all_jobs(self) -> List[Job]:
        """Return a list of all jobs in the data store.

        This method returns a list of all incomplete jobs in the data
        store.

        Returns:
            list: List of all incomplete jobs in the data store.

        """
        raise NotImplementedError()

    @abstractmethod
    def get_jobs_by_stage(self, operation_stage: OperationStage) -> List[Job]:
        """Return a list of jobs in the given stage.

        This method returns a list of all jobs in a specific operation stage.

        Args:
            operation_stage (OperationStage): The stage that the returned list
                of jobs should all be in.

        Returns:
            list: List of all jobs in the specified operation stage.

        """
        raise NotImplementedError()

    @abstractmethod
    def update_job(self, job_name: str, changes: Mapping[str, Any]) -> None:
        """Update a job in the data store.

        This method takes a job name and a dictionary of changes to apply to
        the job in the data store, and updates the job with those changes.
        The dictionary should be keyed by the attribute names which need to
        be updated, with the values being the new values for the attributes.

        Args:
            job_name (str): The name of the job that is being updated.
            changes (dict): The dictionary of changes.

        """
        raise NotImplementedError()

    @abstractmethod
    def delete_job(self, job_name: str) -> None:
        """Delete a job from the data store.

        This method removes a job from the data store.

        Args:
            job_name (str): The name of the job to be removed.

        """
        raise NotImplementedError()

    def watch_job(self, job: Job, operation_name: str, peer: str) -> None:
        """Start watching a job and operation for changes.

        If the given job is already being watched, then this method finds (or adds)
        the operation in the job's entry in ``watched_jobs``, and adds the peer to
        the list of peers for that operation.

        Otherwise, it creates a whole new entry in ``watched_jobs`` for the given
        job, operation, and peer.

        This method runs in a thread spawned by gRPC handling a connected peer.

        Args:
            job (buildgrid.server.job.Job): The job to watch.
            operation_name (str): The name of the specific operation to
                watch.
            peer (str): The peer that is requesting to watch the job.

        """
        with self.watched_jobs_lock:
            spec = self.watched_jobs.get(job.name)
            if spec is None:
                # First watcher for this job: create the tracking entry.
                self.watched_jobs[job.name] = spec = JobWatchSpec(job)
            spec.add_peer(operation_name, peer)
            self.logger.debug(
                f"Registered peer [{peer}] to watch operation [{operation_name}] of job [{job.name}]")

    def stream_operation_updates(self, operation_name: str,
                                 context: RpcContext) -> Generator[Message, None, None]:
        """Stream update messages for a given operation.

        This is a generator which yields tuples of the form

        .. code-block ::

            (error, operation)

        where ``error`` is None unless the job is cancelled, in which case
        ``error`` is a :class:`buildgrid._exceptions.CancelledError`.

        This method runs in a thread spawned by gRPC handling a connected
        peer, and should spend most of its time blocked waiting on an event
        which is set by either the thread which watches the data store for
        job updates or the main thread handling the gRPC termination
        callback.

        Iteration finishes either when the provided gRPC context becomes
        inactive, or when the job owning the operation being watched is
        deleted from the data store.

        Args:
            operation_name (str): The name of the operation to stream
                updates for.
            context (grpc.ServicerContext): The RPC context for the peer
                that is requesting a stream of events.

        """
        # Send an initial update as soon as we start watching, to provide the
        # peer with the initial state of the operation. This is done outside
        # the loop to simplify the logic for handling events without sending
        # unnecessary messages to peers.
        job = self.get_job_by_operation(operation_name)
        if job is None:
            return
        message = job.get_operation_update(operation_name)
        yield message

        self.logger.debug("Waiting for events")

        # Wait for events whilst the context is active. Events are set by the
        # thread which is watching the state data store for job updates.
        # NOTE(review): this assumes ``watch_job`` has already registered the
        # job in ``watched_jobs``; a missing entry would raise KeyError here.
        with self.watched_jobs_lock:
            event = self.watched_jobs[job.name].event
        last_event = None
        while context.is_active():
            last_event, event_type = event.wait(last_event)
            self.logger.debug(
                f"Received event #{last_event} for operation [{operation_name}] "
                f"with type [{event_type}].")
            # A `JobEventType.STOP` event means that a peer watching this job
            # has disconnected and its termination callback has executed on
            # the thread gRPC creates for callbacks. In this case we don't
            # want to send a message, so we use `continue` to evaluate whether
            # or not to continue iteration.
            if event_type == JobEventType.STOP:
                continue

            job = self.get_job_by_operation(operation_name)
            if job is None:
                self.logger.debug(
                    f"Job for operation [{operation_name}] has gone away, stopped streaming updates.")
                return
            message = job.get_operation_update(operation_name)
            yield message

        self.logger.debug("Context became inactive, stopped streaming updates.")

    def stop_watching_operation(self, job: Job, operation_name: str, peer: str) -> None:
        """Remove the given peer from the list of peers watching the given job.

        If the given job is being watched, this method triggers a
        ``JobEventType.STOP`` for it to cause the waiting threads to check
        whether their context is still active. It then removes the given peer
        from the list of peers watching the given operation name. If this
        leaves no peers then the entire entry for the operation in the tracked
        job is removed.

        If this process leaves the job with no operations being watched, the
        job itself is removed from the `watched_jobs` dictionary, and it will
        no longer be checked for updates.

        This runs in the main thread as part of the RPC termination callback
        for ``Execute`` and ``WaitExecution`` requests.

        Args:
            job (buildgrid.server.job.Job): The job to stop watching.
            operation_name (str): The name of the specific operation to
                stop watching.
            peer (str): The peer that is requesting to stop watching the job.

        """
        with self.watched_jobs_lock:
            spec = self.watched_jobs.get(job.name)
            if spec is None:
                self.logger.debug(
                    f"Peer [{peer}] attempted to stop watching job [{job.name}] and operation "
                    f"[{operation_name}], but no record of that job being watched was found.")
                return
            # Wake up any threads blocked in ``stream_operation_updates`` so
            # they re-check whether their gRPC context is still active.
            spec.event.notify_stop()
            spec.remove_peer(operation_name, peer)
            if not spec.peers:
                self.logger.debug(
                    f"No peers remain watching job [{job.name}], removing it from the "
                    "dictionary of jobs being watched.")
                self.watched_jobs.pop(job.name)

    # Operation API

    @abstractmethod
    def create_operation(self, operation: Operation, job_name: str) -> None:
        """Add a new operation to the data store.

        Args:
            operation (Operation): The Operation protobuf object representing
                the operation to add to the data store.
            job_name (str): The name of the Job representing the execution of
                this operation.

        """
        raise NotImplementedError()

    # NOTE: This method is badly named, the current implementations return a
    # set of *job* names rather than *operation* names.
    # TODO: Fix or remove this.
    @abstractmethod
    def get_operations_by_stage(self, operation_stage: OperationStage) -> Set[str]:
        """Return a set of Job names in a specific operation stage.

        Find the operations in a given stage and return a set containing the
        names of the Jobs related to those operations.

        Args:
            operation_stage (OperationStage): The stage that the operations
                should be in.

        Returns:
            set: Set of all job names with operations in the specified state.

        """
        raise NotImplementedError()

    @abstractmethod
    def get_all_operations(self) -> List[Operation]:
        """Return all incomplete operations.

        Returns:
            list: List of all incomplete operations in the data store.

        """
        raise NotImplementedError()

    @abstractmethod
    def update_operation(self, operation_name: str, changes: Mapping[str, Any]) -> None:
        """Update an operation in the data store.

        This method takes an operation name and a dictionary of changes to
        apply to the operation in the data store, and updates the operation
        with those changes. The dictionary should be keyed by the attribute
        names which need to be updated, with the values being the new values
        for the attributes.

        Args:
            operation_name (str): The name of the operation that is being updated.
            changes (dict): The dictionary of changes to be applied.

        """
        raise NotImplementedError()

    @abstractmethod
    def delete_operation(self, operation_name: str) -> None:
        """Delete an operation from the data store.

        This method removes an operation from the data store.

        Args:
            operation_name (str): The name of the operation to be removed.

        """
        raise NotImplementedError()

    # Lease API

    @abstractmethod
    def create_lease(self, lease: Lease) -> None:
        """Add a new lease to the data store.

        Args:
            lease (Lease): The Lease protobuf object representing the lease
                to be added to the data store.

        """
        raise NotImplementedError()

    @abstractmethod
    def get_leases_by_state(self, lease_state) -> Set[str]:
        """Return the set of IDs of leases in a given state.

        Args:
            lease_state (LeaseState): The state that the leases should
                be in.

        Returns:
            set: Set of strings containing IDs of leases in the given state.

        """
        raise NotImplementedError()

    @abstractmethod
    def update_lease(self, job_name: str, changes: Mapping[str, Any]) -> None:
        """Update a lease in the data store.

        This method takes a job name and a dictionary of changes to
        apply to the lease for that job in the data store, and updates the
        lease with those changes. The dictionary should be keyed by the
        attribute names which need to be updated, with the values being the
        new values for the attributes.

        The job name is used as leases have no unique identifier; instead
        there should always be at most one active lease for the job. It is
        the responsibility of data store implementations to ensure this.

        Args:
            job_name (str): The name of the job whose lease is being updated.
            changes (dict): The dictionary of changes to be applied.

        """
        raise NotImplementedError()

    # NOTE: We use a callback to create the leases in this method so that the
    # data store doesn't need to know anything about the bots, or other lease
    # metadata.
    @abstractmethod
    def assign_lease_for_next_job(self, capabilities: Mapping[str, Set[str]],
                                  callback: Callable[[Job], List[Lease]],
                                  timeout: Optional[int] = None) -> List[Lease]:
        """Return a list of leases for a worker to run.

        NOTE: Currently the list only ever has one or zero leases.

        Inspect the list of jobs in the data store and return a list containing
        the lease for the highest priority job whose requirements match the
        given worker capabilities. If there are no matching jobs, return the
        empty list.

        The list containing the lease must be created by the callback passed to
        this method. The callback is passed the job selected as suitable for
        execution by the specific worker.

        Setting a timeout will cause this method to block up to ``timeout``
        seconds, returning the empty list if no matching jobs become available
        before the timeout is reached.

        Args:
            capabilities (dict): Dictionary of worker capabilities to compare
                with job requirements when finding a job.
            callback (callable): Function to run on the next runnable job,
                should return a list of leases.
            timeout (int): time to wait for new jobs, caps if longer
                than MAX_JOB_BLOCK_TIME.

        Returns:
            list: List of leases for the worker to run.
        """
        raise NotImplementedError()