Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/scheduler.py: 90.71%


280 statements  

# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#  <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

16""" 

17Scheduler 

18========= 

19Schedules jobs. 

20""" 

21 

from datetime import timedelta
import logging
from time import time
from threading import Lock
from typing import List, Optional, Tuple

from buildgrid._protos.google.longrunning import operations_pb2
from buildgrid._protos.google.rpc import code_pb2, status_pb2
from buildgrid._enums import LeaseState, OperationStage
from buildgrid._exceptions import NotFoundError
from buildgrid.client.channel import setup_channel
from buildgrid.client.logstream import logstream_client
from buildgrid.server.job import Job
from buildgrid.server.metrics_names import (
    QUEUED_TIME_METRIC_NAME,
    WORKER_HANDLING_TIME_METRIC_NAME,
    INPUTS_FETCHING_TIME_METRIC_NAME,
    OUTPUTS_UPLOADING_TIME_METRIC_NAME,
    EXECUTION_TIME_METRIC_NAME,
    TOTAL_HANDLING_TIME_METRIC_NAME,
    SCHEDULER_CANCEL_OPERATION_TIME_METRIC_NAME,
    SCHEDULER_QUEUE_ACTION_TIME_METRIC_NAME,
    SCHEDULER_UPDATE_LEASE_TIME_METRIC_NAME,
)
from buildgrid.utils import acquire_lock_or_timeout
from buildgrid.server.metrics_utils import (
    DurationMetric,
    publish_timer_metric,
)
from buildgrid.server.operations.filtering import OperationFilter


class Scheduler:

    MAX_N_TRIES = 5

    def __init__(self, data_store, action_cache=None, action_browser_url=False,
                 monitor=False, max_execution_timeout=None, logstream_url=None,
                 logstream_credentials=None, logstream_instance_name=None):
        self.__logger = logging.getLogger(__name__)

        self._instance_name = None
        self._max_execution_timeout = max_execution_timeout

        self.__queue_time_average = None
        self.__retries_count = 0

        self._action_cache = action_cache
        self._action_browser_url = action_browser_url

        self._logstream_instance_name = logstream_instance_name
        self._logstream_url = logstream_url
        if logstream_credentials is None:
            logstream_credentials = {}
        self._logstream_credentials = logstream_credentials
        self._logstream_channel = None

        self.__operation_lock = Lock()  # Lock protecting deletion, addition and updating of jobs

        self.data_store = data_store
        if self._action_browser_url:
            self.data_store.set_action_browser_url(self._action_browser_url)

        self._is_instrumented = False
        if monitor:
            self.activate_monitoring()
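
    # Construction sketch (illustrative only): ``InMemoryDataStore`` is a
    # hypothetical stand-in for a concrete BuildGrid data store implementation.
    #
    #     data_store = InMemoryDataStore()
    #     scheduler = Scheduler(data_store, max_execution_timeout=7200)
    #     scheduler.set_instance_name("main")
    #     scheduler.setup_grpc()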

    # --- Public API ---

    @property
    def instance_name(self):
        return self._instance_name

    def set_instance_name(self, instance_name):
        if not self._instance_name:
            self._instance_name = instance_name
            self.data_store.set_instance_name(instance_name)

    def setup_grpc(self):
        self.data_store.setup_grpc()
        # The action cache is optional, so only set up its channel if present:
        if self._action_cache is not None:
            self._action_cache.setup_grpc()

        if self._logstream_channel is None and self._logstream_url is not None:
            self._logstream_channel, _ = setup_channel(
                self._logstream_url,
                auth_token=None,
                client_key=self._logstream_credentials.get("tls-client-key"),
                client_cert=self._logstream_credentials.get("tls-client-cert"),
                server_cert=self._logstream_credentials.get("tls-server-cert"),
            )
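
    # Credentials sketch (illustrative): the ``logstream_credentials`` mapping
    # is read with the keys used above; the paths are hypothetical.
    #
    #     credentials = {
    #         "tls-client-key": "/path/to/client.key",
    #         "tls-client-cert": "/path/to/client.crt",
    #         "tls-server-cert": "/path/to/server.crt",
    #     }
    #     scheduler = Scheduler(data_store, logstream_url="logs.example:50051",
    #                           logstream_credentials=credentials)
    #     scheduler.setup_grpc()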

    # --- Public API: REAPI ---

    def register_job_peer(self, job_name, peer, request_metadata=None):
        """Subscribes to the job's :class:`Operation` stage changes.

        Args:
            job_name (str): name of the job to subscribe to.
            peer (str): a unique string identifying the client.
            request_metadata (RequestMetadata): optional metadata about the
                request, stored alongside the new operation.

        Returns:
            str: The name of the subscribed :class:`Operation`.

        Raises:
            NotFoundError: If no job with `job_name` exists.
            TimeoutError: If the operation lock cannot be acquired within a short period of time.
        """
        with acquire_lock_or_timeout(self.__operation_lock):
            job = self.data_store.get_job_by_name(job_name, max_execution_timeout=self._max_execution_timeout)

            if job is None:
                raise NotFoundError(f"Job name does not exist: [{job_name}]")

            operation_name = job.register_new_operation(
                data_store=self.data_store, request_metadata=request_metadata)

            self.data_store.watch_job(job, operation_name, peer)

            return operation_name

    def register_job_operation_peer(self, operation_name, peer):
        """Subscribes to an existing job's :class:`Operation` stage changes.

        Args:
            operation_name (str): name of the operation to subscribe to.
            peer (str): a unique string identifying the client.

        Raises:
            NotFoundError: If no operation with `operation_name` exists.
            TimeoutError: If the operation lock cannot be acquired within a short period of time.
        """
        with acquire_lock_or_timeout(self.__operation_lock):
            job = self.data_store.get_job_by_operation(operation_name,
                                                       max_execution_timeout=self._max_execution_timeout)

            if job is None:
                raise NotFoundError(f"Operation name does not exist: [{operation_name}]")

            self.data_store.watch_job(job, operation_name, peer)

    def stream_operation_updates(self, operation_name, context):
        yield from self.data_store.stream_operation_updates(operation_name, context)

    def unregister_job_operation_peer(self, operation_name, peer, discard_unwatched_jobs: bool = False):
        """Unsubscribes from one of the job's :class:`Operation` stage changes.

        Args:
            operation_name (str): name of the operation to unsubscribe from.
            peer (str): a unique string identifying the client.
            discard_unwatched_jobs (bool): if True, delete the operation once
                no more peers are watching it.

        Raises:
            NotFoundError: If no operation with `operation_name` exists.
            TimeoutError: If the operation lock cannot be acquired within a short period of time.
        """
        with acquire_lock_or_timeout(self.__operation_lock):
            job = self.data_store.get_job_by_operation(operation_name)

            if job is None:
                raise NotFoundError(f"Operation name does not exist: [{operation_name}]")

            self.data_store.stop_watching_operation(job, operation_name, peer)

            if not job.n_peers_for_operation(operation_name, self.data_store.watched_jobs.get(job.name)):
                if discard_unwatched_jobs:
                    self.__logger.info(f"No peers watching the operation, removing: {operation_name}")
                    self.data_store.delete_operation(operation_name)

            if not job.n_peers(self.data_store.watched_jobs.get(job.name)) and job.done and not job.lease:
                self.data_store.delete_job(job.name)

    @DurationMetric(SCHEDULER_QUEUE_ACTION_TIME_METRIC_NAME, instanced=True)
    def queue_job_action(self, action, action_digest, platform_requirements=None,
                         priority=0, skip_cache_lookup=False):
        """Inserts a newly created job into the execution queue.

        Warning:
            Priority is handled like POSIX ``nice`` values: a higher value
            means a lower priority, with 0 being the default.

        Args:
            action (Action): the given action to queue for execution.
            action_digest (Digest): the digest of the given action.
            platform_requirements (dict(set)): platform attributes that a worker
                must satisfy in order to be assigned the job. (Each key can
                have multiple values.)
            priority (int): the execution job's priority.
            skip_cache_lookup (bool): whether or not to look for a pre-computed
                result for the given action.

        Returns:
            str: the newly created job's name.
        """
        if platform_requirements is None:
            platform_requirements = {}

        job = self.data_store.get_job_by_action(action_digest,
                                                max_execution_timeout=self._max_execution_timeout)

        if job is not None and not action.do_not_cache:
            # If the existing job has been cancelled or isn't
            # cacheable, create a new one.
            if not job.cancelled and not job.do_not_cache:
                # Reschedule if the new priority is higher (numerically lower):
                if priority < job.priority:
                    job.set_priority(priority, data_store=self.data_store)

                    if job.operation_stage == OperationStage.QUEUED:
                        self.data_store.queue_job(job.name)

                    self.__logger.debug(
                        f"Job deduplicated for action [{action_digest.hash[:8]}]: [{job.name}] "
                        f"with new priority: [{priority}]")
                else:
                    self.__logger.debug(
                        f"Job deduplicated for action [{action_digest.hash[:8]}]: [{job.name}]")

                return job.name

        job = Job(do_not_cache=action.do_not_cache,
                  action=action, action_digest=action_digest,
                  platform_requirements=platform_requirements,
                  priority=priority)
        self.data_store.create_job(job)

        self.__logger.debug(
            f"Job created for action [{action_digest.hash[:8]}]: "
            f"[{job.name} requiring: {job.platform_requirements}, priority: {priority}]")

        operation_stage = None

        if self._action_cache is not None and not skip_cache_lookup:
            try:
                action_result = self._action_cache.get_action_result(job.action_digest)

                self.__logger.debug(
                    f"Job cache hit for action [{action_digest.hash[:8]}]: [{job.name}]")

                operation_stage = OperationStage.COMPLETED
                job.set_cached_result(action_result, self.data_store)

            except NotFoundError:
                operation_stage = OperationStage.QUEUED
                self.data_store.queue_job(job.name)
            except Exception:
                self.__logger.exception("Checking ActionCache for action "
                                        f"[{action_digest.hash}/{action_digest.size_bytes}] "
                                        "failed.")
                operation_stage = OperationStage.QUEUED
                self.data_store.queue_job(job.name)
        else:
            operation_stage = OperationStage.QUEUED
            self.data_store.queue_job(job.name)

        self._update_job_operation_stage(job.name, operation_stage)

        return job.name
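
    # Usage sketch (illustrative; ``action`` and ``action_digest`` are assumed
    # to be pre-built REAPI protos). Note the POSIX-``nice``-style priority:
    # -1 outranks the default 0, so re-submitting a deduplicated action with a
    # numerically lower value re-queues it ahead of the original request.
    #
    #     job_name = scheduler.queue_job_action(
    #         action, action_digest,
    #         platform_requirements={"OSFamily": {"linux"}},
    #         priority=-1)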

    def get_job_operation(self, operation_name):
        """Retrieves a job's :class:`Operation` by name.

        Args:
            operation_name (str): name of the operation to query.

        Raises:
            NotFoundError: If no operation with `operation_name` exists.
        """
        job = self.data_store.get_job_by_operation(operation_name,
                                                   max_execution_timeout=self._max_execution_timeout)

        if job is None:
            raise NotFoundError(f"Operation name does not exist: [{operation_name}]")

        return job.get_operation(operation_name)

    @DurationMetric(SCHEDULER_CANCEL_OPERATION_TIME_METRIC_NAME, instanced=True)
    def cancel_job_operation(self, operation_name):
        """Cancels a job's :class:`Operation` by name.

        Args:
            operation_name (str): name of the operation to cancel.

        Raises:
            NotFoundError: If no operation with `operation_name` exists.
        """
        job = self.data_store.get_job_by_operation(operation_name)

        if job is None:
            raise NotFoundError(f"Operation name does not exist: [{operation_name}]")

        job.cancel_operation(operation_name, data_store=self.data_store)

    def list_operations(self,
                        operation_filters: Optional[List[OperationFilter]] = None,
                        page_size: Optional[int] = None,
                        page_token: Optional[str] = None) -> Tuple[List[operations_pb2.Operation], str]:
        operations, next_token = self.data_store.list_operations(
            operation_filters,
            page_size,
            page_token,
            max_execution_timeout=self._max_execution_timeout)
        return operations, next_token
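
    # Pagination sketch (illustrative): fetch pages by feeding each returned
    # token back in; the loop assumes an empty token marks the last page.
    #
    #     operations, token = scheduler.list_operations(page_size=50)
    #     while token:
    #         page, token = scheduler.list_operations(page_size=50,
    #                                                 page_token=token)
    #         operations.extend(page)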

    # --- Public API: RWAPI ---

    def request_job_leases(self, worker_capabilities, timeout=None, worker_name=None, bot_id=None):
        """Generates a list of the highest priority leases to be run.

        Args:
            worker_capabilities (dict): a set of key-value pairs describing the
                worker properties, configuration and state at the time of the
                request.
            timeout (int): time to block waiting on the job queue, capped if
                longer than MAX_JOB_BLOCK_TIME.
            worker_name (str): name of the worker requesting the leases.
            bot_id (str): identifier of the bot requesting the leases.
        """
        def assign_lease(job):
            self.__logger.info(f"Job scheduled to run: [{job.name}]")

            lease = job.lease

            if not lease:
                # For now, one lease at a time:
                lease = job.create_lease(worker_name, bot_id, data_store=self.data_store)

            if lease:
                job.mark_worker_started()
                return [lease]
            return []

        leases = self.data_store.assign_lease_for_next_job(
            worker_capabilities, assign_lease, timeout=timeout)
        if leases:
            # Update the leases outside of the callback to avoid nested data_store operations
            for lease in leases:
                self._create_log_stream(lease)
                # The lease id and the job name are the same, so use that as the job name
                self._update_job_operation_stage(lease.id, OperationStage.EXECUTING)
        return leases
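
    # Worker-side sketch (illustrative): a bot asks for work by describing its
    # properties; the capability keys and values shown are hypothetical.
    #
    #     leases = scheduler.request_job_leases(
    #         {"OSFamily": ["linux"], "ISA": ["x86-64"]},
    #         timeout=30, worker_name="worker-0", bot_id="bot-0")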

    @DurationMetric(SCHEDULER_UPDATE_LEASE_TIME_METRIC_NAME, instanced=True)
    def update_job_lease_state(self, job_name, lease):
        """Requests a state transition for a job's current :class:`Lease`.

        Note:
            This may trigger a job's :class:`Operation` stage transition.

        Args:
            job_name (str): name of the job to update lease state from.
            lease (Lease): the lease holding the new state.

        Raises:
            NotFoundError: If no job with `job_name` exists.
        """
        job = self.data_store.get_job_by_name(job_name)

        if job is None:
            raise NotFoundError(f"Job name does not exist: [{job_name}]")

        lease_state = LeaseState(lease.state)

        operation_stage = None
        if lease_state == LeaseState.PENDING:
            job.update_lease_state(LeaseState.PENDING, data_store=self.data_store)
            operation_stage = OperationStage.QUEUED

        elif lease_state == LeaseState.ACTIVE:
            job.update_lease_state(LeaseState.ACTIVE, data_store=self.data_store)
            operation_stage = OperationStage.EXECUTING

        elif lease_state == LeaseState.COMPLETED:
            # Update the lease state to COMPLETED and store the result in CAS.
            # Also store the mapping in the ActionCache, if the job is cacheable.
            job.update_lease_state(LeaseState.COMPLETED,
                                   status=lease.status, result=lease.result,
                                   action_cache=self._action_cache,
                                   data_store=self.data_store,
                                   skip_notify=True)
            operation_stage = OperationStage.COMPLETED

        self._update_job_operation_stage(job_name, operation_stage)
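
    # Bot-session sketch (illustrative): each reported lease state maps onto
    # an operation stage (PENDING -> QUEUED, ACTIVE -> EXECUTING, COMPLETED ->
    # COMPLETED); the lease id doubles as the job name.
    #
    #     lease.state = LeaseState.ACTIVE.value
    #     scheduler.update_job_lease_state(lease.id, lease)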

    def retry_job_lease(self, job_name):
        """Re-queues a job on lease execution failure.

        Note:
            This may trigger a job's :class:`Operation` stage transition.

        Args:
            job_name (str): name of the job to retry the lease from.

        Raises:
            NotFoundError: If no job with `job_name` exists.
        """
        job = self.data_store.get_job_by_name(job_name)

        if job is None:
            raise NotFoundError(f"Job name does not exist: [{job_name}]")

        updated_operation_stage = None
        if job.n_tries >= self.MAX_N_TRIES:
            updated_operation_stage = OperationStage.COMPLETED
            status = status_pb2.Status(code=code_pb2.ABORTED,
                                       message=f"Job was retried {job.n_tries} times unsuccessfully. Aborting.")
            job.update_lease_state(LeaseState.COMPLETED, status=status, data_store=self.data_store)

        elif not job.cancelled:
            if job.done:
                self.__logger.info(f"Attempted to re-queue job name=[{job_name}] "
                                   "but it was already completed.")
                return

            updated_operation_stage = OperationStage.QUEUED
            self.data_store.queue_job(job.name)

            job.update_lease_state(LeaseState.PENDING, data_store=self.data_store)

            if self._is_instrumented:
                self.__retries_count += 1

        if updated_operation_stage:
            self._update_job_operation_stage(job_name, updated_operation_stage)

    def get_job_lease(self, job_name):
        """Returns the lease associated with the job, if one has been issued.

        Args:
            job_name (str): name of the job to query the lease from.

        Raises:
            NotFoundError: If no job with `job_name` exists.
        """
        job = self.data_store.get_job_by_name(job_name)

        if job is None:
            raise NotFoundError(f"Job name does not exist: [{job_name}]")

        return job.lease

    def delete_job_lease(self, job_name):
        """Discards the lease associated with a job.

        Args:
            job_name (str): name of the job to delete the lease from.

        Raises:
            NotFoundError: If no job with `job_name` exists.
            TimeoutError: If the operation lock cannot be acquired within a short period of time.
        """
        with acquire_lock_or_timeout(self.__operation_lock):
            job = self.data_store.get_job_by_name(job_name)

            if job is None:
                raise NotFoundError(f"Job name does not exist: [{job_name}]")

            job.delete_lease()

            if not job.n_peers(self.data_store.watched_jobs.get(job.name)) and job.done:
                self.data_store.delete_job(job.name)

    def get_operation_request_metadata(self, operation_name):
        return self.data_store.get_operation_request_metadata_by_name(operation_name)

    def get_metadata_for_leases(self, leases, writeable_streams=False):
        """Return a list of Job metadata for a given list of leases.

        Args:
            leases (list): List of leases to get Job metadata for.
            writeable_streams (bool): passed through to each job's
                ``get_metadata``.

        Returns:
            List of tuples of the form
            ``('executeoperationmetadata-bin', serialized_metadata)``.

        """
        metadata = []
        for lease in leases:
            job = self.data_store.get_job_by_name(lease.id)
            job_metadata = job.get_metadata(writeable_streams=writeable_streams)
            metadata.append(
                ('executeoperationmetadata-bin', job_metadata.SerializeToString()))

        return metadata
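
    # Sketch (illustrative): the returned pairs can be attached as gRPC
    # trailing metadata on a bot-session response.
    #
    #     metadata = scheduler.get_metadata_for_leases(bot_session.leases)
    #     context.set_trailing_metadata(metadata)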

    # --- Public API: Monitoring ---

    @property
    def is_instrumented(self):
        return self._is_instrumented

    def activate_monitoring(self):
        """Activates jobs monitoring."""
        if self._is_instrumented:
            return

        self.__queue_time_average = 0, timedelta()
        self.__retries_count = 0

        self._is_instrumented = True

        self.data_store.activate_monitoring()

    def deactivate_monitoring(self):
        """Deactivates jobs monitoring."""
        if not self._is_instrumented:
            return

        self._is_instrumented = False

        self.__queue_time_average = None
        self.__retries_count = 0

        self.data_store.deactivate_monitoring()

    def get_metrics(self):
        return self.data_store.get_metrics()

    def query_n_retries(self):
        return self.__retries_count

    def query_am_queue_time(self):
        if self.__queue_time_average is not None:
            return self.__queue_time_average[1]
        return timedelta()

    # --- Private API ---

    def _create_log_stream(self, lease):
        if not self._logstream_channel:
            return

        job = self.data_store.get_job_by_name(lease.id)
        parent_base = f"{job.action_digest.hash}_{job.action_digest.size_bytes}_{time()}"
        stdout_parent = f"{parent_base}_stdout"
        stderr_parent = f"{parent_base}_stderr"
        with logstream_client(self._logstream_channel,
                              self._logstream_instance_name) as ls_client:
            stdout_stream = ls_client.create(stdout_parent)
            stderr_stream = ls_client.create(stderr_parent)
            job.set_stdout_stream(stdout_stream, data_store=self.data_store)
            job.set_stderr_stream(stderr_stream, data_store=self.data_store)

    def _update_job_operation_stage(self, job_name, operation_stage):
        """Requests a stage transition for the job's :class:`Operation` objects.

        Args:
            job_name (str): name of the job to query.
            operation_stage (OperationStage): the stage to transition to.

        Raises:
            TimeoutError: If the operation lock cannot be acquired within a short period of time.
        """
        with acquire_lock_or_timeout(self.__operation_lock):
            job = self.data_store.get_job_by_name(job_name)

            if operation_stage == OperationStage.CACHE_CHECK:
                job.update_operation_stage(OperationStage.CACHE_CHECK,
                                           data_store=self.data_store)

            elif operation_stage == OperationStage.QUEUED:
                job.update_operation_stage(OperationStage.QUEUED,
                                           data_store=self.data_store)

            elif operation_stage == OperationStage.EXECUTING:
                job.update_operation_stage(OperationStage.EXECUTING,
                                           data_store=self.data_store)

            elif operation_stage == OperationStage.COMPLETED:
                job.update_operation_stage(OperationStage.COMPLETED,
                                           data_store=self.data_store)

                if self._is_instrumented:
                    average_order, average_time = self.__queue_time_average

                    # Incremental running mean: avg_n = avg_{n-1} + (x_n - avg_{n-1}) / n
                    average_order += 1
                    if average_order <= 1:
                        average_time = job.query_queue_time()
                    else:
                        queue_time = job.query_queue_time()
                        average_time = average_time + ((queue_time - average_time) / average_order)

                    self.__queue_time_average = average_order, average_time

                    if not job.holds_cached_result:
                        execution_metadata = job.action_result.execution_metadata
                        context_metadata = {'instance-name': self.instance_name} if self.instance_name else None

                        queued = execution_metadata.queued_timestamp.ToDatetime()
                        worker_start = execution_metadata.worker_start_timestamp.ToDatetime()
                        worker_completed = execution_metadata.worker_completed_timestamp.ToDatetime()
                        fetch_start = execution_metadata.input_fetch_start_timestamp.ToDatetime()
                        fetch_completed = execution_metadata.input_fetch_completed_timestamp.ToDatetime()
                        execution_start = execution_metadata.execution_start_timestamp.ToDatetime()
                        execution_completed = execution_metadata.execution_completed_timestamp.ToDatetime()
                        upload_start = execution_metadata.output_upload_start_timestamp.ToDatetime()
                        upload_completed = execution_metadata.output_upload_completed_timestamp.ToDatetime()

                        # Emit build inputs fetching time record:
                        input_fetch_time = fetch_completed - fetch_start
                        publish_timer_metric(
                            INPUTS_FETCHING_TIME_METRIC_NAME,
                            input_fetch_time,
                            metadata=context_metadata)

                        # Emit build execution time record:
                        execution_time = execution_completed - execution_start
                        publish_timer_metric(
                            EXECUTION_TIME_METRIC_NAME, execution_time,
                            metadata=context_metadata)

                        # Emit build outputs uploading time record:
                        output_upload_time = upload_completed - upload_start
                        publish_timer_metric(
                            OUTPUTS_UPLOADING_TIME_METRIC_NAME, output_upload_time,
                            metadata=context_metadata)

                        # Emit total queued time record:
                        # This calculates the queue time purely from the values
                        # set in the ActionResult's ExecutedActionMetadata,
                        # which may differ very slightly from the job object's
                        # queued_time.
                        total_queued_time = worker_start - queued
                        publish_timer_metric(
                            QUEUED_TIME_METRIC_NAME, total_queued_time,
                            metadata=context_metadata)

                        # Emit total time spent in the worker:
                        total_worker_time = worker_completed - worker_start
                        publish_timer_metric(
                            WORKER_HANDLING_TIME_METRIC_NAME, total_worker_time,
                            metadata=context_metadata)

                        # Emit total build handling time record:
                        total_handling_time = worker_completed - queued
                        publish_timer_metric(
                            TOTAL_HANDLING_TIME_METRIC_NAME, total_handling_time,
                            metadata=context_metadata)