Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/scheduler.py: 83.96%

293 statements  

coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
Scheduler
=========
Schedules jobs.
"""

from datetime import timedelta
import logging
from time import time
from threading import Lock
from typing import List, Tuple

from buildgrid._protos.google.longrunning import operations_pb2
from buildgrid._protos.google.rpc import code_pb2, status_pb2
from buildgrid._enums import LeaseState, OperationStage
from buildgrid._exceptions import NotFoundError
from buildgrid.client.channel import setup_channel
from buildgrid.client.logstream import logstream_client
from buildgrid.server.job import Job
from buildgrid.server.metrics_names import (
    QUEUED_TIME_METRIC_NAME,
    WORKER_HANDLING_TIME_METRIC_NAME,
    INPUTS_FETCHING_TIME_METRIC_NAME,
    OUTPUTS_UPLOADING_TIME_METRIC_NAME,
    EXECUTION_TIME_METRIC_NAME,
    TOTAL_HANDLING_TIME_METRIC_NAME,
    SCHEDULER_CANCEL_OPERATION_TIME_METRIC_NAME,
    SCHEDULER_QUEUE_ACTION_TIME_METRIC_NAME,
    SCHEDULER_UPDATE_LEASE_TIME_METRIC_NAME
)
from buildgrid.utils import acquire_lock_or_timeout
from buildgrid.server.metrics_utils import (
    DurationMetric,
    publish_timer_metric
)
from buildgrid.server.operations.filtering import OperationFilter


class Scheduler:

    MAX_N_TRIES = 5
    RETRYABLE_STATUS_CODES = (code_pb2.INTERNAL, code_pb2.UNAVAILABLE)
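    # Leases that finish with one of these status codes are re-queued via
    # retry_job_lease() until the job has been attempted MAX_N_TRIES times,
    # after which it is aborted (see update_job_lease_state / retry_job_lease).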

    def __init__(self, data_store, action_cache=None, action_browser_url=False,
                 monitor=False, max_execution_timeout=None, logstream_url=None,
                 logstream_credentials=None, logstream_instance_name=None):
        self.__logger = logging.getLogger(__name__)

        self._instance_name = None
        self._max_execution_timeout = max_execution_timeout

        self.__queue_time_average = None
        self.__retries_count = 0

        self._action_cache = action_cache
        self._action_browser_url = action_browser_url

        self._logstream_instance_name = logstream_instance_name
        self._logstream_url = logstream_url
        if logstream_credentials is None:
            logstream_credentials = {}
        self._logstream_credentials = logstream_credentials
        self._logstream_channel = None

        self.__operation_lock = Lock()  # Lock protecting deletion, addition and updating of jobs

        self.data_store = data_store
        if self._action_browser_url:
            self.data_store.set_action_browser_url(self._action_browser_url)

        self._is_instrumented = False
        if monitor:
            self.activate_monitoring()

    # --- Public API ---
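    # Rough client-side construction sketch (illustrative only; `data_store`
    # stands in for a configured data store backend and is not defined here):
    #
    #     scheduler = Scheduler(data_store, monitor=True, max_execution_timeout=7200)
    #     scheduler.set_instance_name("main")   # "main" is an arbitrary example name
    #     scheduler.setup_grpc()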

    @property
    def instance_name(self):
        return self._instance_name

    def set_instance_name(self, instance_name):
        if not self._instance_name:
            self._instance_name = instance_name
            self.data_store.set_instance_name(instance_name)

    def setup_grpc(self):
        self.data_store.setup_grpc()

        if self._action_cache is not None:
            self._action_cache.setup_grpc()

        if self._logstream_channel is None and self._logstream_url is not None:
            self._logstream_channel, _ = setup_channel(
                self._logstream_url,
                auth_token=None,
                client_key=self._logstream_credentials.get("tls-client-key"),
                client_cert=self._logstream_credentials.get("tls-client-cert"),
                server_cert=self._logstream_credentials.get("tls-server-cert")
            )

    # --- Public API: REAPI ---

    def register_job_peer(self, job_name, peer, request_metadata=None):
        """Subscribes to the job's :class:`Operation` stage changes.

        Args:
            job_name (str): name of the job to subscribe to.
            peer (str): a unique string identifying the client.
            request_metadata (RequestMetadata): optional metadata about the
                originating execution request, stored with the new operation.

        Returns:
            str: The name of the subscribed :class:`Operation`.

        Raises:
            NotFoundError: If no job with `job_name` exists.
            TimeoutError: If the operation lock cannot be acquired within a short period of time.
        """
        with acquire_lock_or_timeout(self.__operation_lock):
            job = self.data_store.get_job_by_name(job_name, max_execution_timeout=self._max_execution_timeout)

            if job is None:
                raise NotFoundError(f"Job name does not exist: [{job_name}]")

            operation_name = job.register_new_operation(
                data_store=self.data_store, request_metadata=request_metadata)

            self.data_store.watch_job(job, operation_name, peer)

        return operation_name

    def register_job_operation_peer(self, operation_name, peer):
        """Subscribes to an existing job's :class:`Operation` stage changes.

        Args:
            operation_name (str): name of the operation to subscribe to.
            peer (str): a unique string identifying the client.

        Returns:
            str: The name of the subscribed :class:`Operation`.

        Raises:
            NotFoundError: If no operation with `operation_name` exists.
            TimeoutError: If the operation lock cannot be acquired within a short period of time.
        """
        with acquire_lock_or_timeout(self.__operation_lock):
            job = self.data_store.get_job_by_operation(operation_name,
                                                       max_execution_timeout=self._max_execution_timeout)

            if job is None:
                raise NotFoundError(f"Operation name does not exist: [{operation_name}]")

            self.data_store.watch_job(job, operation_name, peer)

    def stream_operation_updates(self, operation_name, context):
        yield from self.data_store.stream_operation_updates(operation_name, context)

    def unregister_job_operation_peer(self, operation_name, peer, discard_unwatched_jobs: bool=False):
        """Unsubscribes from one of the job's :class:`Operation` stage changes.

        Args:
            operation_name (str): name of the operation to unsubscribe from.
            peer (str): a unique string identifying the client.
            discard_unwatched_jobs (bool): if True, discard the operation once no
                peers are subscribed to it any more.

        Raises:
            NotFoundError: If no operation with `operation_name` exists.
            TimeoutError: If the operation lock cannot be acquired within a short period of time.
        """
        with acquire_lock_or_timeout(self.__operation_lock):
            job = self.data_store.get_job_by_operation(operation_name)

            if job is None:
                raise NotFoundError(f"Operation name does not exist: [{operation_name}]")

            self.data_store.stop_watching_operation(job, operation_name, peer)
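            # With this peer gone, tidy up: optionally discard the operation when
            # nothing watches it any more, and drop the job itself once it is
            # done, lease-free and has no watchers left at all.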

            if not job.n_peers_for_operation(operation_name, self.data_store.watched_jobs.get(job.name)):
                if discard_unwatched_jobs:
                    self.__logger.info(f"No peers watching the operation, removing: {operation_name}")
                    self.data_store.delete_operation(operation_name)

            if not job.n_peers(self.data_store.watched_jobs.get(job.name)) and job.done and not job.lease:
                self.data_store.delete_job(job.name)

    @DurationMetric(SCHEDULER_QUEUE_ACTION_TIME_METRIC_NAME, instanced=True)
    def queue_job_action(self, action, action_digest, platform_requirements=None,
                         priority=0, skip_cache_lookup=False):
        """Inserts a newly created job into the execution queue.

        Warning:
            Priority is handled like a POSIX ``nice`` value: a higher value
            means a lower priority, with 0 being the default priority.

        Args:
            action (Action): the given action to queue for execution.
            action_digest (Digest): the digest of the given action.
            platform_requirements (dict(set)): platform attributes that a worker
                must satisfy in order to be assigned the job. (Each key can
                have multiple values.)
            priority (int): the execution job's priority.
            skip_cache_lookup (bool): whether or not to look for a pre-computed
                result for the given action.

        Returns:
            str: the newly created job's name.
        """
        if platform_requirements is None:
            platform_requirements = {}

        job = self.data_store.get_job_by_action(action_digest,
                                                max_execution_timeout=self._max_execution_timeout)

        if job is not None and not action.do_not_cache:
            # If existing job has been cancelled or isn't
            # cacheable, create a new one.
            if not job.cancelled and not job.do_not_cache:
                # Reschedule if priority is now greater:
                if priority < job.priority:
                    job.set_priority(priority, data_store=self.data_store)

                    if job.operation_stage == OperationStage.QUEUED:
                        self.data_store.queue_job(job.name)

                    self.__logger.debug(
                        f"Job deduplicated for action [{action_digest.hash[:8]}]: [{job.name}] "
                        f"with new priority: [{priority}]")
                else:
                    self.__logger.debug(
                        f"Job deduplicated for action [{action_digest.hash[:8]}]: [{job.name}]")

                return job.name

        job = Job(do_not_cache=action.do_not_cache,
                  action=action, action_digest=action_digest,
                  platform_requirements=platform_requirements,
                  priority=priority)
        self.data_store.create_job(job)

        self.__logger.debug(
            f"Job created for action [{action_digest.hash[:8]}]: "
            f"[{job.name} requiring: {job.platform_requirements}, priority: {priority}]")

        operation_stage = None

        if self._action_cache is not None and not skip_cache_lookup:
            try:
                action_result = self._action_cache.get_action_result(job.action_digest)

                self.__logger.debug(
                    f"Job cache hit for action [{action_digest.hash[:8]}]: [{job.name}]")

                operation_stage = OperationStage.COMPLETED
                job.set_cached_result(action_result, self.data_store)

            except NotFoundError:
                operation_stage = OperationStage.QUEUED
                self.data_store.queue_job(job.name)
            except Exception:
                self.__logger.exception("Checking ActionCache for action "
                                        f"[{action_digest.hash}/{action_digest.size_bytes}] "
                                        "failed.")
                operation_stage = OperationStage.QUEUED
                self.data_store.queue_job(job.name)

        else:
            operation_stage = OperationStage.QUEUED
            self.data_store.queue_job(job.name)

        self._update_job_operation_stage(job.name, operation_stage)

        return job.name
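    # Illustrative sketch of how an Execute handler might combine this with
    # register_job_peer (the values shown, e.g. the "OSFamily" requirement and
    # peer name, are assumptions, not part of this module):
    #
    #     job_name = scheduler.queue_job_action(action, action_digest,
    #                                           platform_requirements={"OSFamily": {"linux"}},
    #                                           priority=0)
    #     operation_name = scheduler.register_job_peer(job_name, peer="client-1")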

    def get_job_operation(self, operation_name):
        """Retrieves a job's :class:`Operation` by name.

        Args:
            operation_name (str): name of the operation to query.

        Raises:
            NotFoundError: If no operation with `operation_name` exists.
        """
        job = self.data_store.get_job_by_operation(operation_name,
                                                   max_execution_timeout=self._max_execution_timeout)

        if job is None:
            raise NotFoundError(f"Operation name does not exist: [{operation_name}]")

        return job.get_operation(operation_name)

    @DurationMetric(SCHEDULER_CANCEL_OPERATION_TIME_METRIC_NAME, instanced=True)
    def cancel_job_operation(self, operation_name):
        """Cancels a job's :class:`Operation` by name.

        Args:
            operation_name (str): name of the operation to cancel.

        Raises:
            NotFoundError: If no operation with `operation_name` exists.
        """
        job = self.data_store.get_job_by_operation(operation_name)

        if job is None:
            raise NotFoundError(f"Operation name does not exist: [{operation_name}]")

        job.cancel_operation(operation_name, data_store=self.data_store)

    def list_operations(self,
                        operation_filters: List[OperationFilter]=None,
                        page_size: int=None,
                        page_token: str=None) -> Tuple[List[operations_pb2.Operation], str]:
        operations, next_token = self.data_store.list_operations(
            operation_filters,
            page_size,
            page_token,
            max_execution_timeout=self._max_execution_timeout)
        return operations, next_token
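    # Pagination sketch (illustrative): a first call returns at most `page_size`
    # operations plus a continuation token, which a follow-up call can pass back:
    #
    #     ops, token = scheduler.list_operations(page_size=50)
    #     more, _ = scheduler.list_operations(page_size=50, page_token=token)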

    # --- Public API: RWAPI ---

    def request_job_leases(self, worker_capabilities, timeout=None, worker_name=None, bot_id=None):
        """Generates a list of the highest priority leases to be run.

        Args:
            worker_capabilities (dict): a set of key-value pairs describing the
                worker properties, configuration and state at the time of the
                request.
            timeout (int): time to block waiting on the job queue, capped at
                MAX_JOB_BLOCK_TIME if longer.
            worker_name (string): name of the worker requesting the leases.
            bot_id (string): ID of the bot requesting the leases.
        """
        def assign_lease(job):
            self.__logger.info(f"Job scheduled to run: [{job.name}]")

            lease = job.lease

            if not lease:
                # For now, one lease at a time:
                lease = job.create_lease(worker_name, bot_id, data_store=self.data_store)
            else:
                # Update the job with the new worker name assigned to it
                job.worker_name = worker_name

            if lease:
                job.mark_worker_started()
                return [lease]
            return []
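        # assign_lease is handed to the data store, which is expected to invoke it
        # with the next queued job compatible with worker_capabilities (blocking up
        # to `timeout` seconds for one) and return whatever the callback produces.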

        leases = self.data_store.assign_lease_for_next_job(
            worker_capabilities, assign_lease, timeout=timeout)
        if leases:
            # Update the leases outside of the callback to avoid nested data_store operations
            for lease in leases:
                self._create_log_stream(lease)
                # The lease id and job names are the same, so use that as the job name
                self._update_job_operation_stage(lease.id, OperationStage.EXECUTING)
        return leases

    @DurationMetric(SCHEDULER_UPDATE_LEASE_TIME_METRIC_NAME, instanced=True)
    def update_job_lease_state(self, job_name, lease):
        """Requests a state transition for a job's current :class:`Lease`.

        Note:
            This may trigger a job's :class:`Operation` stage transition.

        Args:
            job_name (str): name of the job to update lease state from.
            lease (Lease): the lease holding the new state.

        Raises:
            NotFoundError: If no job with `job_name` exists.
        """
        job = self.data_store.get_job_by_name(job_name)

        if job is None:
            raise NotFoundError(f"Job name does not exist: [{job_name}]")

        lease_state = LeaseState(lease.state)
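        # The reported lease state maps onto the operation stage that clients see:
        #   PENDING   -> OperationStage.QUEUED
        #   ACTIVE    -> OperationStage.EXECUTING
        #   COMPLETED -> OperationStage.COMPLETED (unless the lease failed with a
        #                retryable status code and the job is re-queued instead)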

        operation_stage = None
        if lease_state == LeaseState.PENDING:
            job.update_lease_state(LeaseState.PENDING, data_store=self.data_store)
            operation_stage = OperationStage.QUEUED

        elif lease_state == LeaseState.ACTIVE:
            job.update_lease_state(LeaseState.ACTIVE, data_store=self.data_store)
            operation_stage = OperationStage.EXECUTING

        elif lease_state == LeaseState.COMPLETED:
            # Check the lease status to determine if the job should be retried
            lease_status = lease.status.code
            if lease_status in self.RETRYABLE_STATUS_CODES and job.n_tries < self.MAX_N_TRIES:
                self.__logger.info(f"Job {job_name} completed with a non-OK retryable status code "
                                   f"{lease_status}, retrying")
                self.retry_job_lease(job_name)
            else:
                # Update lease state to COMPLETED and store result in CAS
                # Also store mapping in ActionResult, if job is cacheable
                job.update_lease_state(LeaseState.COMPLETED,
                                       status=lease.status, result=lease.result,
                                       action_cache=self._action_cache,
                                       data_store=self.data_store,
                                       skip_notify=True)
                try:
                    self.delete_job_lease(job_name)
                except NotFoundError:
                    # Job already deleted
                    pass
                except TimeoutError:
                    self.__logger.warning(f"Could not delete job lease_id=[{lease.id}] due to timeout.",
                                          exc_info=True)

                operation_stage = OperationStage.COMPLETED

        self._update_job_operation_stage(job_name, operation_stage)

    def retry_job_lease(self, job_name):
        """Re-queues a job on lease execution failure.

        Note:
            This may trigger a job's :class:`Operation` stage transition.

        Args:
            job_name (str): name of the job to retry the lease from.

        Raises:
            NotFoundError: If no job with `job_name` exists.
        """
        job = self.data_store.get_job_by_name(job_name)

        if job is None:
            raise NotFoundError(f"Job name does not exist: [{job_name}]")

        updated_operation_stage = None
        if job.n_tries >= self.MAX_N_TRIES:
            updated_operation_stage = OperationStage.COMPLETED
            status = status_pb2.Status(code=code_pb2.ABORTED,
                                       message=f"Job was retried {job.n_tries} times unsuccessfully. Aborting.")
            job.update_lease_state(LeaseState.COMPLETED, status=status, data_store=self.data_store)

        elif not job.cancelled:
            if job.done:
                self.__logger.info(f"Attempted to re-queue job name=[{job_name}] "
                                   f"but it was already completed.")
                return

            updated_operation_stage = OperationStage.QUEUED
            self.data_store.queue_job(job.name)

            job.update_lease_state(LeaseState.PENDING, data_store=self.data_store)

            if self._is_instrumented:
                self.__retries_count += 1

        if updated_operation_stage:
            self._update_job_operation_stage(job_name, updated_operation_stage)

    def get_job_lease(self, job_name):
        """Returns the lease associated with the job, if any has been emitted yet.

        Args:
            job_name (str): name of the job to query the lease from.

        Raises:
            NotFoundError: If no job with `job_name` exists.
        """
        job = self.data_store.get_job_by_name(job_name)

        if job is None:
            raise NotFoundError(f"Job name does not exist: [{job_name}]")

        return job.lease

    def delete_job_lease(self, job_name):
        """Discards the lease associated with a job.

        Args:
            job_name (str): name of the job to delete the lease from.

        Raises:
            NotFoundError: If no job with `job_name` exists.
            TimeoutError: If the operation lock cannot be acquired within a short period of time.
        """
        with acquire_lock_or_timeout(self.__operation_lock):
            job = self.data_store.get_job_by_name(job_name)

            if job is None:
                raise NotFoundError(f"Job name does not exist: [{job_name}]")

            job.delete_lease()

            if not job.n_peers(self.data_store.watched_jobs.get(job.name)) and job.done:
                self.data_store.delete_job(job.name)

    def get_operation_request_metadata(self, operation_name):
        return self.data_store.get_operation_request_metadata_by_name(operation_name)

    def get_metadata_for_leases(self, leases, writeable_streams=False):
        """Return a list of Job metadata for a given list of leases.

        Args:
            leases (list): List of leases to get Job metadata for.

        Returns:
            List of tuples of the form
            ``('executeoperationmetadata-bin', serialized_metadata)``.

        """
        metadata = []
        for lease in leases:
            job = self.data_store.get_job_by_name(lease.id)
            job_metadata = job.get_metadata(writeable_streams=True)
            metadata.append(
                ('executeoperationmetadata-bin', job_metadata.SerializeToString()))

        return metadata
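    # The ('executeoperationmetadata-bin', bytes) pairs returned above are shaped
    # like gRPC binary metadata; one possible (illustrative, not taken from this
    # module) use is attaching them to a bots-service response:
    #
    #     context.set_trailing_metadata(scheduler.get_metadata_for_leases(leases))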

    # --- Public API: Monitoring ---

    @property
    def is_instrumented(self):
        return self._is_instrumented

    def activate_monitoring(self):
        """Activates jobs monitoring."""
        if self._is_instrumented:
            return

        self.__queue_time_average = 0, timedelta()
        self.__retries_count = 0

        self._is_instrumented = True

        self.data_store.activate_monitoring()

    def deactivate_monitoring(self):
        """Deactivates jobs monitoring."""
        if not self._is_instrumented:
            return

        self._is_instrumented = False

        self.__queue_time_average = None
        self.__retries_count = 0

        self.data_store.deactivate_monitoring()

    def get_metrics(self):
        return self.data_store.get_metrics()

    def query_n_retries(self):
        return self.__retries_count

    def query_am_queue_time(self):
        if self.__queue_time_average is not None:
            return self.__queue_time_average[1]
        return timedelta()

    # --- Private API ---

    def _create_log_stream(self, lease):
        if not self._logstream_channel:
            return

        job = self.data_store.get_job_by_name(lease.id)
        parent_base = f"{job.action_digest.hash}_{job.action_digest.size_bytes}_{time()}"
        stdout_parent = f"{parent_base}_stdout"
        stderr_parent = f"{parent_base}_stderr"
        with logstream_client(self._logstream_channel,
                              self._logstream_instance_name) as ls_client:
            stdout_stream = ls_client.create(stdout_parent)
            stderr_stream = ls_client.create(stderr_parent)
            job.set_stdout_stream(stdout_stream, data_store=self.data_store)
            job.set_stderr_stream(stderr_stream, data_store=self.data_store)

    def _update_job_operation_stage(self, job_name, operation_stage):
        """Requests a stage transition for the job's :class:`Operation` objects.

        Args:
            job_name (str): name of the job to query.
            operation_stage (OperationStage): the stage to transition to.

        Raises:
            TimeoutError: If the operation lock cannot be acquired within a short period of time.
        """
        with acquire_lock_or_timeout(self.__operation_lock):
            job = self.data_store.get_job_by_name(job_name)

            if operation_stage == OperationStage.CACHE_CHECK:
                job.update_operation_stage(OperationStage.CACHE_CHECK,
                                           data_store=self.data_store)

            elif operation_stage == OperationStage.QUEUED:
                job.update_operation_stage(OperationStage.QUEUED,
                                           data_store=self.data_store)

            elif operation_stage == OperationStage.EXECUTING:
                job.update_operation_stage(OperationStage.EXECUTING,
                                           data_store=self.data_store)

            elif operation_stage == OperationStage.COMPLETED:
                job.update_operation_stage(OperationStage.COMPLETED,
                                           data_store=self.data_store)

                if self._is_instrumented:
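                    # Monitoring keeps the running average of queue times as the
                    # pair (sample count, mean). The mean is updated incrementally,
                    # mean += (new_sample - mean) / count, so no history is stored.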

                    average_order, average_time = self.__queue_time_average

                    average_order += 1
                    if average_order <= 1:
                        average_time = job.query_queue_time()
                    else:
                        queue_time = job.query_queue_time()
                        average_time = average_time + ((queue_time - average_time) / average_order)

                    self.__queue_time_average = average_order, average_time
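                    # For jobs that actually executed (i.e. no cached result), the
                    # worker-reported ExecutedActionMetadata timestamps below follow
                    # the order queued -> worker_start -> input_fetch -> execution ->
                    # output_upload -> worker_completed; the differences between them
                    # feed the timing metrics published underneath.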

                    if not job.holds_cached_result:
                        execution_metadata = job.action_result.execution_metadata
                        context_metadata = {'instance-name': self.instance_name} if self.instance_name else None

                        queued = execution_metadata.queued_timestamp.ToDatetime()
                        worker_start = execution_metadata.worker_start_timestamp.ToDatetime()
                        worker_completed = execution_metadata.worker_completed_timestamp.ToDatetime()
                        fetch_start = execution_metadata.input_fetch_start_timestamp.ToDatetime()
                        fetch_completed = execution_metadata.input_fetch_completed_timestamp.ToDatetime()
                        execution_start = execution_metadata.execution_start_timestamp.ToDatetime()
                        execution_completed = execution_metadata.execution_completed_timestamp.ToDatetime()
                        upload_start = execution_metadata.output_upload_start_timestamp.ToDatetime()
                        upload_completed = execution_metadata.output_upload_completed_timestamp.ToDatetime()

                        # Emit build inputs fetching time record:
                        input_fetch_time = fetch_completed - fetch_start
                        publish_timer_metric(
                            INPUTS_FETCHING_TIME_METRIC_NAME,
                            input_fetch_time,
                            metadata=context_metadata)

                        # Emit build execution time record:
                        execution_time = execution_completed - execution_start
                        publish_timer_metric(
                            EXECUTION_TIME_METRIC_NAME, execution_time,
                            metadata=context_metadata)

                        # Emit build outputs uploading time record:
                        output_upload_time = upload_completed - upload_start
                        publish_timer_metric(
                            OUTPUTS_UPLOADING_TIME_METRIC_NAME, output_upload_time,
                            metadata=context_metadata)

                        # Emit total queued time record:
                        # This calculates the queue time based purely on values set
                        # in the ActionResult's ExecutedActionMetadata, which may
                        # differ slightly from the job object's queued_time.
                        total_queued_time = worker_start - queued
                        publish_timer_metric(
                            QUEUED_TIME_METRIC_NAME, total_queued_time,
                            metadata=context_metadata)

                        # Emit total time spent in worker
                        total_worker_time = worker_completed - worker_start
                        publish_timer_metric(
                            WORKER_HANDLING_TIME_METRIC_NAME, total_worker_time,
                            metadata=context_metadata)

                        # Emit total build handling time record:
                        total_handling_time = worker_completed - queued
                        publish_timer_metric(
                            TOTAL_HANDLING_TIME_METRIC_NAME, total_handling_time,
                            metadata=context_metadata)
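
# Worker-side lifecycle sketch (illustrative; the real driver lives in the bots
# service, and the objects below are assumptions rather than this module's API):
#
#     leases = scheduler.request_job_leases({"OSFamily": ["linux"]},
#                                           worker_name="worker-0", bot_id="bot-0")
#     for lease in leases:
#         ...  # run the action, then fill in lease.result and lease.status
#         lease.state = LeaseState.COMPLETED.value
#         scheduler.update_job_lease_state(lease.id, lease)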