Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17Scheduler 

18========= 

19Schedules jobs. 

20""" 

21 

22from datetime import timedelta 

23import logging 

24from threading import Lock 

25from typing import List 

26 

27from buildgrid._protos.google.longrunning import operations_pb2 

28from buildgrid._enums import LeaseState, OperationStage 

29from buildgrid._exceptions import NotFoundError, UpdateNotAllowedError 

30from buildgrid.server.job import Job 

31from buildgrid.utils import BrowserURL 

32 

33 

class Scheduler:
    """Schedules jobs and tracks their operations, leases and watching peers."""

    # Maximum number of lease attempts before a job stops being re-queued.
    MAX_N_TRIES = 5

37 

38 def __init__(self, data_store, action_cache=None, 

39 action_browser_url=False, monitor=False, max_execution_timeout=None): 

40 self.__logger = logging.getLogger(__name__) 

41 

42 self._instance_name = None 

43 self._max_execution_timeout = max_execution_timeout 

44 

45 self.__build_metadata_queues = None 

46 

47 self.__queue_time_average = None 

48 self.__retries_count = 0 

49 

50 self._action_cache = action_cache 

51 self._action_browser_url = action_browser_url 

52 

53 self.__operation_lock = Lock() # Lock protecting deletion, addition and updating of jobs 

54 

55 self.data_store = data_store 

56 

57 self._is_instrumented = False 

58 if monitor: 

59 self.activate_monitoring() 

60 

61 # --- Public API --- 

62 

63 @property 

64 def instance_name(self): 

65 return self._instance_name 

66 

67 def set_instance_name(self, instance_name): 

68 if not self._instance_name: 

69 self._instance_name = instance_name 

70 

71 def list_current_jobs(self): 

72 """Returns a list of the :class:`Job` names currently managed.""" 

73 jobs = self.data_store.get_all_jobs() 

74 return [job.name for job in jobs] 

75 

76 # --- Public API: REAPI --- 

77 

78 def register_job_peer(self, job_name, peer): 

79 """Subscribes to the job's :class:`Operation` stage changes. 

80 

81 Args: 

82 job_name (str): name of the job to subscribe to. 

83 peer (str): a unique string identifying the client. 

84 

85 Returns: 

86 str: The name of the subscribed :class:`Operation`. 

87 

88 Raises: 

89 NotFoundError: If no job with `job_name` exists. 

90 """ 

91 with self.__operation_lock: 

92 job = self.data_store.get_job_by_name(job_name, max_execution_timeout=self._max_execution_timeout) 

93 

94 if job is None: 

95 raise NotFoundError(f"Job name does not exist: [{job_name}]") 

96 

97 operation_name = job.register_new_operation(data_store=self.data_store) 

98 

99 self.data_store.watch_job(job, operation_name, peer) 

100 

101 return operation_name 

102 

103 def register_job_operation_peer(self, operation_name, peer): 

104 """Subscribes to an existing the job's :class:`Operation` stage changes. 

105 

106 Args: 

107 operation_name (str): name of the operation to subscribe to. 

108 peer (str): a unique string identifying the client. 

109 

110 Returns: 

111 str: The name of the subscribed :class:`Operation`. 

112 

113 Raises: 

114 NotFoundError: If no operation with `operation_name` exists. 

115 """ 

116 with self.__operation_lock: 

117 job = self.data_store.get_job_by_operation(operation_name, 

118 max_execution_timeout=self._max_execution_timeout) 

119 

120 if job is None: 

121 raise NotFoundError(f"Operation name does not exist: [{operation_name}]") 

122 

123 self.data_store.watch_job(job, operation_name, peer) 

124 

125 def stream_operation_updates(self, operation_name, context): 

126 yield from self.data_store.stream_operation_updates(operation_name, context) 

127 

128 def unregister_job_operation_peer(self, operation_name, peer, discard_unwatched_jobs: bool=False): 

129 """Unsubscribes to one of the job's :class:`Operation` stage change. 

130 

131 Args: 

132 operation_name (str): name of the operation to unsubscribe from. 

133 peer (str): a unique string identifying the client. 

134 discard_unwatched_jobs (bool): don't remove operation when client rpc is terminated. 

135 

136 Raises: 

137 NotFoundError: If no operation with `operation_name` exists. 

138 """ 

139 with self.__operation_lock: 

140 job = self.data_store.get_job_by_operation(operation_name) 

141 

142 if job is None: 

143 raise NotFoundError(f"Operation name does not exist: [{operation_name}]") 

144 

145 self.data_store.stop_watching_operation(job, operation_name, peer) 

146 

147 if not job.n_peers_for_operation(operation_name, self.data_store.watched_jobs.get(job.name)): 

148 if discard_unwatched_jobs: 

149 self.__logger.info(f"No peers watching the operation, removing: {operation_name}") 

150 self.data_store.delete_operation(operation_name) 

151 

152 if not job.n_peers(self.data_store.watched_jobs.get(job.name)) and job.done and not job.lease: 

153 self.data_store.delete_job(job.name) 

154 

155 def queue_job_action(self, action, action_digest, platform_requirements={}, 

156 priority=0, skip_cache_lookup=False): 

157 """Inserts a newly created job into the execution queue. 

158 

159 Warning: 

160 Priority is handle like a POSIX ``nice`` values: a higher value 

161 means a low priority, 0 being default priority. 

162 

163 Args: 

164 action (Action): the given action to queue for execution. 

165 action_digest (Digest): the digest of the given action. 

166 platform_requirements (dict(set)): platform attributes that a worker 

167 must satisfy in order to be assigned the job. (Each key can 

168 have multiple values.) 

169 priority (int): the execution job's priority. 

170 skip_cache_lookup (bool): whether or not to look for pre-computed 

171 result for the given action. 

172 

173 Returns: 

174 str: the newly created job's name. 

175 """ 

176 job = self.data_store.get_job_by_action(action_digest, 

177 max_execution_timeout=self._max_execution_timeout) 

178 

179 if job is not None and not action.do_not_cache: 

180 # If existing job has been cancelled or isn't 

181 # cacheable, create a new one. 

182 if not job.cancelled and not job.do_not_cache: 

183 # Reschedule if priority is now greater: 

184 if priority < job.priority: 

185 job.set_priority(priority, data_store=self.data_store) 

186 

187 if job.operation_stage == OperationStage.QUEUED: 

188 self.data_store.queue_job(job.name) 

189 

190 self.__logger.debug( 

191 f"Job deduplicated for action [{action_digest.hash[:8]}]: [{job.name}]") 

192 

193 return job.name 

194 

195 job = Job(action.do_not_cache, action_digest, 

196 platform_requirements=platform_requirements, 

197 priority=priority) 

198 self.data_store.create_job(job) 

199 

200 if self._action_browser_url: 

201 job.set_action_url( 

202 BrowserURL(self._action_browser_url, self._instance_name)) 

203 

204 self.__logger.debug( 

205 f"Job created for action [{action_digest.hash[:8]}]: [{job.name} requiring: {job.platform_requirements}]") 

206 

207 operation_stage = None 

208 

209 if self._action_cache is not None and not skip_cache_lookup: 

210 try: 

211 action_result = self._action_cache.get_action_result(job.action_digest) 

212 

213 self.__logger.debug( 

214 f"Job cache hit for action [{action_digest.hash[:8]}]: [{job.name}]") 

215 

216 operation_stage = OperationStage.COMPLETED 

217 job.set_cached_result(action_result, self.data_store) 

218 

219 except NotFoundError: 

220 operation_stage = OperationStage.QUEUED 

221 self.data_store.queue_job(job.name) 

222 

223 else: 

224 operation_stage = OperationStage.QUEUED 

225 self.data_store.queue_job(job.name) 

226 

227 self._update_job_operation_stage(job.name, operation_stage) 

228 

229 return job.name 

230 

231 def get_job_operation(self, operation_name): 

232 """Retrieves a job's :class:`Operation` by name. 

233 

234 Args: 

235 operation_name (str): name of the operation to query. 

236 

237 Raises: 

238 NotFoundError: If no operation with `operation_name` exists. 

239 """ 

240 job = self.data_store.get_job_by_operation(operation_name, 

241 max_execution_timeout=self._max_execution_timeout) 

242 

243 if job is None: 

244 raise NotFoundError(f"Operation name does not exist: [{operation_name}]") 

245 

246 return job.get_operation(operation_name) 

247 

248 def cancel_job_operation(self, operation_name): 

249 """"Cancels a job's :class:`Operation` by name. 

250 

251 Args: 

252 operation_name (str): name of the operation to cancel. 

253 

254 Raises: 

255 NotFoundError: If no operation with `operation_name` exists. 

256 """ 

257 job = self.data_store.get_job_by_operation(operation_name) 

258 

259 if job is None: 

260 raise NotFoundError(f"Operation name does not exist: [{operation_name}]") 

261 

262 job.cancel_operation(operation_name, data_store=self.data_store) 

263 

264 def delete_job_operation(self, operation_name): 

265 """"Removes a job. 

266 

267 Args: 

268 operation_name (str): name of the operation to delete. 

269 

270 Raises: 

271 NotFoundError: If no operation with `operation_name` exists. 

272 """ 

273 with self.__operation_lock: 

274 job = self.data_store.get_job_by_operation(operation_name) 

275 

276 if job is None: 

277 raise NotFoundError(f"Operation name does not exist: [{operation_name}]") 

278 if not job.n_peers(self.data_store.watched_jobs.get(job.name)) and job.done and not job.lease: 

279 self.data_store.delete_job(job.name) 

280 

281 def get_all_operations(self) -> List[operations_pb2.Operation]: 

282 return self.data_store.get_all_operations() 

283 

284 # --- Public API: RWAPI --- 

285 

286 def request_job_leases(self, worker_capabilities, timeout=None, worker_name=None, bot_id=None): 

287 """Generates a list of the highest priority leases to be run. 

288 

289 Args: 

290 worker_capabilities (dict): a set of key-value pairs describing the 

291 worker properties, configuration and state at the time of the 

292 request. 

293 timeout (int): time to block waiting on job queue, caps if longer 

294 than MAX_JOB_BLOCK_TIME 

295 worker_name (string): name of the worker requesting the leases. 

296 """ 

297 def assign_lease(job): 

298 self.__logger.info(f"Job scheduled to run: [{job.name}]") 

299 

300 lease = job.lease 

301 

302 if not lease: 

303 # For now, one lease at a time: 

304 lease = job.create_lease(worker_name, bot_id, data_store=self.data_store) 

305 

306 if lease: 

307 job.mark_worker_started() 

308 return [lease] 

309 return [] 

310 

311 leases = self.data_store.assign_lease_for_next_job( 

312 worker_capabilities, assign_lease, timeout=timeout) 

313 if leases: 

314 # Update the leases outside of the callback to avoid nested data_store operations 

315 for lease in leases: 

316 # The lease id and job names are the same, so use that as the job name 

317 self._update_job_operation_stage(lease.id, OperationStage.EXECUTING) 

318 return leases 

319 

320 def update_job_lease_state(self, job_name, lease): 

321 """Requests a state transition for a job's current :class:Lease. 

322 

323 Note: 

324 This may trigger a job's :class:`Operation` stage transition. 

325 

326 Args: 

327 job_name (str): name of the job to update lease state from. 

328 lease (Lease): the lease holding the new state. 

329 

330 Raises: 

331 NotFoundError: If no job with `job_name` exists. 

332 """ 

333 job = self.data_store.get_job_by_name(job_name) 

334 

335 if job is None: 

336 raise NotFoundError(f"Job name does not exist: [{job_name}]") 

337 

338 lease_state = LeaseState(lease.state) 

339 

340 operation_stage = None 

341 if lease_state == LeaseState.PENDING: 

342 job.update_lease_state(LeaseState.PENDING, data_store=self.data_store) 

343 operation_stage = OperationStage.QUEUED 

344 

345 elif lease_state == LeaseState.ACTIVE: 

346 job.update_lease_state(LeaseState.ACTIVE, data_store=self.data_store) 

347 operation_stage = OperationStage.EXECUTING 

348 

349 elif lease_state == LeaseState.COMPLETED: 

350 job.update_lease_state(LeaseState.COMPLETED, 

351 status=lease.status, result=lease.result, 

352 data_store=self.data_store) 

353 

354 if (self._action_cache is not None and 

355 self._action_cache.allow_updates and not job.do_not_cache): 

356 try: 

357 self._action_cache.update_action_result(job.action_digest, job.action_result) 

358 except UpdateNotAllowedError: 

359 # The configuration doesn't allow updating the old result 

360 self.__logger.exception( 

361 "ActionResult for action_digest=" 

362 f"[{job.action_digest.hash}/{job.action_digest.size_bytes}] wasn't updated.", 

363 exc_info=True) 

364 pass 

365 self.data_store.store_response(job) 

366 

367 operation_stage = OperationStage.COMPLETED 

368 

369 self._update_job_operation_stage(job_name, operation_stage) 

370 

371 def retry_job_lease(self, job_name): 

372 """Re-queues a job on lease execution failure. 

373 

374 Note: 

375 This may trigger a job's :class:`Operation` stage transition. 

376 

377 Args: 

378 job_name (str): name of the job to retry the lease from. 

379 

380 Raises: 

381 NotFoundError: If no job with `job_name` exists. 

382 """ 

383 job = self.data_store.get_job_by_name(job_name) 

384 

385 if job is None: 

386 raise NotFoundError(f"Job name does not exist: [{job_name}]") 

387 

388 updated_operation_stage = None 

389 if job.n_tries >= self.MAX_N_TRIES: 

390 # TODO: Decide what to do with these jobs 

391 updated_operation_stage = OperationStage.COMPLETED 

392 # TODO: Mark these jobs as done 

393 

394 elif not job.cancelled: 

395 updated_operation_stage = OperationStage.QUEUED 

396 self.data_store.queue_job(job.name) 

397 

398 job.update_lease_state(LeaseState.PENDING, data_store=self.data_store) 

399 

400 if self._is_instrumented: 

401 self.__retries_count += 1 

402 

403 if updated_operation_stage: 

404 self._update_job_operation_stage(job_name, updated_operation_stage) 

405 

406 def get_job_lease(self, job_name): 

407 """Returns the lease associated to job, if any have been emitted yet. 

408 

409 Args: 

410 job_name (str): name of the job to query the lease from. 

411 

412 Raises: 

413 NotFoundError: If no job with `job_name` exists. 

414 """ 

415 job = self.data_store.get_job_by_name(job_name) 

416 

417 if job is None: 

418 raise NotFoundError(f"Job name does not exist: [{job_name}]") 

419 

420 return job.lease 

421 

422 def delete_job_lease(self, job_name): 

423 """Discards the lease associated with a job. 

424 

425 Args: 

426 job_name (str): name of the job to delete the lease from. 

427 

428 Raises: 

429 NotFoundError: If no job with `job_name` exists. 

430 """ 

431 with self.__operation_lock: 

432 job = self.data_store.get_job_by_name(job_name) 

433 

434 if job is None: 

435 raise NotFoundError(f"Job name does not exist: [{job_name}]") 

436 

437 job.delete_lease() 

438 

439 if not job.n_peers(self.data_store.watched_jobs.get(job.name)) and job.done: 

440 self.data_store.delete_job(job.name) 

441 

442 def get_job_lease_cancelled(self, job_name): 

443 """Returns true if the lease is cancelled. 

444 

445 Args: 

446 job_name (str): name of the job to query the lease state from. 

447 

448 Raises: 

449 NotFoundError: If no job with `job_name` exists. 

450 """ 

451 job = self.data_store.get_job_by_name(job_name) 

452 

453 if job is None: 

454 raise NotFoundError(f"Job name does not exist: [{job_name}]") 

455 

456 return job.lease_cancelled 

457 

458 # --- Public API: Monitoring --- 

459 

460 @property 

461 def is_instrumented(self): 

462 return self._is_instrumented 

463 

464 def activate_monitoring(self): 

465 """Activated jobs monitoring.""" 

466 if self._is_instrumented: 

467 return 

468 

469 self.__build_metadata_queues = [] 

470 

471 self.__queue_time_average = 0, timedelta() 

472 self.__retries_count = 0 

473 

474 self._is_instrumented = True 

475 

476 self.data_store.activate_monitoring() 

477 

478 def deactivate_monitoring(self): 

479 """Deactivated jobs monitoring.""" 

480 if not self._is_instrumented: 

481 return 

482 

483 self._is_instrumented = False 

484 

485 self.__build_metadata_queues = None 

486 

487 self.__queue_time_average = None 

488 self.__retries_count = 0 

489 

490 self.data_store.deactivate_monitoring() 

491 

492 def register_build_metadata_watcher(self, message_queue): 

493 if self.__build_metadata_queues is not None: 

494 self.__build_metadata_queues.append(message_queue) 

495 

496 def get_metrics(self): 

497 return self.data_store.get_metrics() 

498 

499 def query_n_retries(self): 

500 return self.__retries_count 

501 

502 def query_am_queue_time(self): 

503 if self.__queue_time_average is not None: 

504 return self.__queue_time_average[1] 

505 return timedelta() 

506 

507 # --- Private API --- 

508 

509 def _update_job_operation_stage(self, job_name, operation_stage): 

510 """Requests a stage transition for the job's :class:Operations. 

511 

512 Args: 

513 job_name (str): name of the job to query. 

514 operation_stage (OperationStage): the stage to transition to. 

515 """ 

516 with self.__operation_lock: 

517 job = self.data_store.get_job_by_name(job_name) 

518 

519 if operation_stage == OperationStage.CACHE_CHECK: 

520 job.update_operation_stage(OperationStage.CACHE_CHECK, 

521 data_store=self.data_store) 

522 

523 elif operation_stage == OperationStage.QUEUED: 

524 job.update_operation_stage(OperationStage.QUEUED, 

525 data_store=self.data_store) 

526 

527 elif operation_stage == OperationStage.EXECUTING: 

528 job.update_operation_stage(OperationStage.EXECUTING, 

529 data_store=self.data_store) 

530 

531 elif operation_stage == OperationStage.COMPLETED: 

532 job.update_operation_stage(OperationStage.COMPLETED, 

533 data_store=self.data_store) 

534 

535 if self._is_instrumented: 

536 average_order, average_time = self.__queue_time_average 

537 

538 average_order += 1 

539 if average_order <= 1: 

540 average_time = job.query_queue_time() 

541 else: 

542 queue_time = job.query_queue_time() 

543 average_time = average_time + ((queue_time - average_time) / average_order) 

544 

545 self.__queue_time_average = average_order, average_time 

546 

547 if not job.holds_cached_result: 

548 execution_metadata = job.action_result.execution_metadata 

549 context_metadata = {'job-is': job.name} 

550 

551 message = (execution_metadata, context_metadata,) 

552 

553 for message_queue in self.__build_metadata_queues: 

554 message_queue.put(message)