Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/job.py: 84.57%

350 statements  

« prev     ^ index     » next       coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16from datetime import datetime 

17import logging 

18import uuid 

19from typing import Dict, List, Optional, Set 

20 

21from google.protobuf.duration_pb2 import Duration 

22from google.protobuf.timestamp_pb2 import Timestamp 

23 

24from buildgrid._enums import LeaseState, OperationStage 

25from buildgrid._exceptions import CancelledError, NotFoundError, UpdateNotAllowedError 

26from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 

27from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2 

28from buildgrid._protos.google.longrunning import operations_pb2 

29from buildgrid._protos.google.rpc import code_pb2, status_pb2 

30 

31 

class Job:
    """Internal representation of a single remote-execution job.

    A ``Job`` ties together the REAPI :class:`Action` being executed, the
    long-running ``Operation`` names that clients watch, the ``Lease``
    handed out to a worker bot, timing metadata, and the final
    ``ExecuteResponse``. Mutating methods mirror their state changes into
    the ``data_store`` they are given.
    """

    def __init__(self, *, do_not_cache: bool,
                 action: remote_execution_pb2.Action,
                 action_digest: remote_execution_pb2.Digest,
                 platform_requirements: Optional[Dict[str, Set[str]]]=None,
                 priority: int=0,
                 name: Optional[str]=None,
                 operation_names: Optional[List[str]]=None,
                 cancelled_operation_names: Optional[List[str]]=None,
                 lease: Optional[bots_pb2.Lease]=None,
                 stage: OperationStage=OperationStage.UNKNOWN,
                 cancelled: bool=False,
                 queued_timestamp: Optional[Timestamp]=None,
                 queued_time_duration: Optional[Duration]=None,
                 worker_start_timestamp: Optional[Timestamp]=None,
                 worker_completed_timestamp: Optional[Timestamp]=None,
                 result: Optional[remote_execution_pb2.ExecuteResponse]=None,
                 worker_name: Optional[str]=None,
                 n_tries: int=0,
                 status_code: Optional[int]=None,
                 stdout_stream_name: Optional[str]=None,
                 stdout_stream_write_name: Optional[str]=None,
                 stderr_stream_name: Optional[str]=None,
                 stderr_stream_write_name: Optional[str]=None):
        self.__logger = logging.getLogger(__name__)

        # A job with no explicit name gets a fresh UUID.
        self._name = name or str(uuid.uuid4())
        self._priority = priority
        self._lease = lease

        self.__execute_response = result or remote_execution_pb2.ExecuteResponse()

        # Timestamps and durations are copied defensively so callers cannot
        # mutate this job's internal protobuf state through their own
        # references after construction.
        self.__queued_timestamp = Timestamp()
        if queued_timestamp is not None:
            self.__queued_timestamp.CopyFrom(queued_timestamp)

        self.__queued_time_duration = Duration()
        if queued_time_duration is not None:
            self.__queued_time_duration.CopyFrom(queued_time_duration)

        self.__worker_start_timestamp = Timestamp()
        if worker_start_timestamp is not None:
            self.__worker_start_timestamp.CopyFrom(worker_start_timestamp)

        self.__worker_completed_timestamp = Timestamp()
        if worker_completed_timestamp is not None:
            self.__worker_completed_timestamp.CopyFrom(worker_completed_timestamp)

        self._action = action
        self._action_digest = action_digest

        # Keep two lists of operations: all and cancelled
        # The fields (other than cancelled) will be populated with the job
        # details when an operation is accessed through `get_operation(operation_name)`
        if operation_names is None:
            operation_names = []
        if cancelled_operation_names is None:
            cancelled_operation_names = []
        self.__operations_all = set(operation_names)
        self.__operations_cancelled = set(cancelled_operation_names)
        self.__lease_cancelled = cancelled
        self.__job_cancelled = cancelled

        self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
        self.__operation_metadata.action_digest.CopyFrom(action_digest)
        self.__operation_metadata.stage = stage.value
        # The read-only stream names live in the operation metadata; the
        # writeable variants are only handed out via `get_metadata(...)`.
        if stdout_stream_name is not None:
            self.__operation_metadata.stdout_stream_name = stdout_stream_name
        self._stdout_stream_write_name = stdout_stream_write_name
        if stderr_stream_name is not None:
            self.__operation_metadata.stderr_stream_name = stderr_stream_name
        self._stderr_stream_write_name = stderr_stream_write_name

        self._do_not_cache = do_not_cache
        self._n_tries = n_tries
        self._status_code = status_code

        self._platform_requirements = platform_requirements \
            if platform_requirements else {}

        self.worker_name = worker_name

    def __str__(self):
        return (f"Job: name=[{self.name}], action_digest="
                f"[{self.action_digest.hash}/{self.action_digest.size_bytes}]")

    # Jobs order by priority so they can sit directly in a priority queue;
    # comparisons against objects without a `priority` fall back to
    # NotImplemented so Python can try the reflected operation.
    def __lt__(self, other):
        try:
            return self.priority < other.priority
        except AttributeError:
            return NotImplemented

    def __le__(self, other):
        try:
            return self.priority <= other.priority
        except AttributeError:
            return NotImplemented

    # Equality is identity-by-name, not priority; note this makes Job
    # unhashable (defining __eq__ sets __hash__ to None).
    def __eq__(self, other):
        if isinstance(other, Job):
            return self.name == other.name
        return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def __gt__(self, other):
        try:
            return self.priority > other.priority
        except AttributeError:
            return NotImplemented

    def __ge__(self, other):
        try:
            return self.priority >= other.priority
        except AttributeError:
            return NotImplemented

    # --- Public API ---

    @property
    def name(self):
        return self._name

    @property
    def cancelled(self):
        return self.__job_cancelled

    @property
    def priority(self):
        return self._priority

    def set_priority(self, new_priority, *, data_store):
        """Sets the job's priority and persists it to the data store."""
        self._priority = new_priority
        data_store.update_job(self.name, {'priority': new_priority})

    @property
    def done(self):
        """Whether the job has reached the COMPLETED operation stage."""
        return self.operation_stage == OperationStage.COMPLETED

    # --- Public API: REAPI ---

    @property
    def platform_requirements(self):
        return self._platform_requirements

    @property
    def do_not_cache(self):
        return self._do_not_cache

    @property
    def action_digest(self):
        return self._action_digest

    @property
    def action(self):
        return self._action

    @property
    def operation_stage(self):
        return OperationStage(self.__operation_metadata.stage)

    @property
    def operation_metadata(self):
        """Client-facing metadata (read-only stream names only)."""
        return self.get_metadata(writeable_streams=False)

    @property
    def action_result(self):
        if self.__execute_response is not None:
            return self.__execute_response.result
        else:
            return None

    @property
    def execute_response(self):
        return self.__execute_response

    @execute_response.setter
    def execute_response(self, response):
        self.__execute_response = response

    @property
    def holds_cached_result(self):
        if self.__execute_response is not None:
            return self.__execute_response.cached_result
        else:
            return False

    @property
    def queued_timestamp(self) -> Timestamp:
        return self.__queued_timestamp

    @property
    def queued_timestamp_as_datetime(self) -> Optional[datetime]:
        # ByteSize() == 0 means the timestamp was never set.
        if self.__queued_timestamp.ByteSize():
            return self.__queued_timestamp.ToDatetime()
        return None

    @property
    def queued_time_duration(self):
        return self.__queued_time_duration

    @property
    def worker_start_timestamp(self) -> Timestamp:
        return self.__worker_start_timestamp

    @property
    def worker_start_timestamp_as_datetime(self) -> Optional[datetime]:
        if self.__worker_start_timestamp.ByteSize():
            return self.__worker_start_timestamp.ToDatetime()
        return None

    @property
    def worker_completed_timestamp(self) -> Timestamp:
        return self.__worker_completed_timestamp

    @property
    def worker_completed_timestamp_as_datetime(self) -> Optional[datetime]:
        if self.__worker_completed_timestamp.ByteSize():
            return self.__worker_completed_timestamp.ToDatetime()
        return None

    def mark_worker_started(self):
        """Records the current time as the worker start timestamp."""
        self.__worker_start_timestamp.GetCurrentTime()

    def get_metadata(self, writeable_streams=False):
        """Returns a copy of the job's operation metadata.

        Args:
            writeable_streams (bool): If True, substitute the writeable
                stdout/stderr stream names where available.
        """
        operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
        operation_metadata.CopyFrom(self.__operation_metadata)
        if writeable_streams and self._stdout_stream_write_name:
            operation_metadata.stdout_stream_name = self._stdout_stream_write_name
        if writeable_streams and self._stderr_stream_write_name:
            operation_metadata.stderr_stream_name = self._stderr_stream_write_name
        # A cancelled job always reports COMPLETED, regardless of the
        # stage stored in the metadata.
        if self.__job_cancelled:
            operation_metadata.stage = OperationStage.COMPLETED.value
        return operation_metadata

    def set_action_url(self, url):
        """Generates a CAS browser URL for the job's action."""
        if url.for_message('action', self.action_digest):
            self.__execute_response.message = url.generate()

    def set_cached_result(self, action_result, data_store):
        """Allows specifying an action result from the action cache for the job.

        Note:
            This won't trigger any :class:`Operation` stage transition.

        Args:
            action_result (ActionResult): The result from cache.
        """
        self.__execute_response.result.CopyFrom(action_result)
        self.__execute_response.cached_result = True
        data_store.store_response(self)

    def n_peers(self, watch_spec):
        """Number of peers watching any operation of this job."""
        if watch_spec is None:
            return 0
        return len(watch_spec.peers)

    def n_peers_for_operation(self, operation_name, watch_spec):
        """Number of peers watching a specific operation of this job."""
        if watch_spec is None:
            return 0
        return len(watch_spec.peers_for_operation(operation_name))

    def register_new_operation(self, *, data_store, request_metadata=None):
        """Subscribes to a new job's :class:`Operation` stage changes.

        Returns:
            str: The name of the subscribed :class:`Operation`.
        """
        new_operation_name = str(uuid.uuid4())
        self.__operations_all.add(new_operation_name)

        data_store.create_operation(
            operation_name=new_operation_name,
            job_name=self._name,
            request_metadata=request_metadata)
        self.__logger.debug(f"Operation created for job [{self._name}]: [{new_operation_name}]")

        return new_operation_name

    def get_all_operations(self) -> List[operations_pb2.Operation]:
        """Gets all :class:`Operation` objects related to a job.

        Returns:
            list: A list of :class:`Operation` objects.
        """
        return [
            self.get_operation(operation_name) for operation_name in self.__operations_all
        ]

    def get_operation(self, operation_name: str) -> operations_pb2.Operation:
        """Returns a copy of the the job's :class:`Operation`
        with all the fields populated from the job fields

        Args:
            operation_name (str): the operation's name.

        Raises:
            NotFoundError: If no operation with `operation_name` exists.
        """
        self.__logger.debug(f"get_operation({operation_name})")
        if operation_name not in self.__operations_all:
            raise NotFoundError(f"Operation does not exist for job: operation_name=[{operation_name}], "
                                f"job_name=[{self._name}]")

        cancelled = operation_name in self.__operations_cancelled

        # Create new proto and populate fields
        operation_proto = operations_pb2.Operation()
        operation_proto.name = operation_name
        operation_proto.done = self.done or cancelled

        # Pack the metadata from the job
        operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
        operation_metadata.CopyFrom(self.operation_metadata)
        # Set the stage to COMPLETED if the operation is cancelled
        if operation_name in self.__operations_cancelled:
            operation_metadata.stage = OperationStage.COMPLETED.value
        operation_proto.metadata.Pack(operation_metadata)

        # Set the status code/execute response as appropriate
        if cancelled:
            operation_proto.error.CopyFrom(status_pb2.Status(code=code_pb2.CANCELLED))
        elif self._status_code is not None and self._status_code != code_pb2.OK:
            # If there was an error, populate the error field.
            operation_proto.error.CopyFrom(status_pb2.Status(code=self._status_code))
        else:
            # Otherwise, pack the response.
            execute_response = self.execute_response
            operation_proto.response.Pack(execute_response)

        return operation_proto

    def update_operation_stage(self, stage, *, data_store):
        """Operates a stage transition for the job's :class:`Operation`.

        Args:
            stage (OperationStage): the operation stage to transition to.
        """
        # No-op when the job is already in the requested stage.
        if stage.value == self.__operation_metadata.stage:
            return

        changes = {}

        self.__operation_metadata.stage = stage.value
        changes["stage"] = stage.value

        self.__logger.debug(
            f"Stage changed for job [{self._name}]: [{stage.name}] (operation)")

        if self.__operation_metadata.stage == OperationStage.QUEUED.value:
            # Only stamp the queue time on the first transition to QUEUED;
            # a re-queue (retry) keeps the original timestamp.
            if self.__queued_timestamp.ByteSize() == 0:
                self.__queued_timestamp.GetCurrentTime()
                changes["queued_timestamp"] = self.queued_timestamp_as_datetime
            self._n_tries += 1
            changes["n_tries"] = self._n_tries

        elif self.__operation_metadata.stage == OperationStage.EXECUTING.value:
            queue_in, queue_out = self.queued_timestamp_as_datetime, datetime.utcnow()
            if queue_in:
                self.__queued_time_duration.FromTimedelta(queue_out - queue_in)
                changes["queued_time_duration"] = self.__queued_time_duration.seconds
            else:
                self.__logger.warning("Tried to calculate `queued_time_duration` "
                                      "but initial queue time wasn't set.")

        data_store.update_job(self.name, changes)

    def cancel_all_operations(self, *, data_store):
        """Cancels every operation registered for this job."""
        for operation_name in self.__operations_all:
            self.cancel_operation(operation_name, data_store=data_store)

    def cancel_operation(self, operation_name, *, data_store):
        """Triggers a job's :class:`Operation` cancellation.

        This may cancel any job's :class:`Lease` that may have been issued.

        Args:
            operation_name (str): the operation's name.

        Raises:
            NotFoundError: If no operation with `operation_name` exists.
        """
        # NOTE: This assumes that operations weren't created in a sql backend
        # from a different buildgrid
        if operation_name not in self.__operations_all:
            raise NotFoundError(f"Operation name does not exist: [{operation_name}]")

        if operation_name in self.__operations_cancelled:
            self.__logger.debug(
                f"Tried to cancel operation for job [{self._name}]: [{operation_name}] but it was already cancelled.")
        else:
            # Mark operation as cancelled and update in data_store
            self.__operations_cancelled.add(operation_name)
            data_store.update_operation(operation_name, {"cancelled": True})

            self.__logger.debug(
                f"Operation cancelled for job [{self._name}]: [{operation_name}]")

            self._mark_job_as_cancelled_if_all_operations_cancelled(data_store=data_store)

    def _mark_job_as_cancelled_if_all_operations_cancelled(self, *, data_store):
        # If all the operations are cancelled, then mark the job as cancelled too

        # NOTE: In the case of multiple buildgrid instances we may want to
        # check with the data_store first, or, handle that there as there may be more
        # operations created from different buildgrid instances

        self.__job_cancelled = self.__operations_all.issubset(self.__operations_cancelled)

        if self.__job_cancelled:
            self.__operation_metadata.stage = OperationStage.COMPLETED.value
            self.__worker_completed_timestamp.GetCurrentTime()
            changes = {
                "stage": OperationStage.COMPLETED.value,
                "cancelled": True,
                "worker_completed_timestamp": self.worker_completed_timestamp_as_datetime
            }

            data_store.update_job(self.name, changes)
            if self._lease is not None:
                self.cancel_lease(data_store=data_store)

    # --- Public API: RWAPI ---

    @property
    def lease(self):
        return self._lease

    @property
    def lease_state(self):
        if self._lease is not None:
            return LeaseState(self._lease.state)
        else:
            return None

    @property
    def lease_cancelled(self):
        return self.__lease_cancelled

    @property
    def n_tries(self):
        return self._n_tries

    @property
    def status_code(self):
        return self._status_code

    def create_lease(self, worker_name, bot_id=None, *, data_store):
        """Emits a new :class:`Lease` for the job.

        Only one :class:`Lease` can be emitted for a given job. This method
        should only be used once, any further calls are ignored.

        Args:
            worker_name (string): The name of the worker this lease is for.
            bot_id (string): The name of the corresponding bot for this job's worker.
        """
        if self._lease is not None:
            return self._lease
        elif self.__job_cancelled:
            return None

        self._lease = bots_pb2.Lease()
        self._lease.id = self._name

        if self.action is not None:
            self._lease.payload.Pack(self.action)
        else:
            self._lease.payload.Pack(self.action_digest)

        self._lease.state = LeaseState.UNSPECIFIED.value

        if bot_id is None:
            bot_id = "UNKNOWN"
        self.__logger.debug(
            f"Lease created for job [{self._name}]: [{self._lease.id}] (assigned to bot [{bot_id}])")

        self.update_lease_state(LeaseState.PENDING, skip_lease_persistence=True, data_store=data_store)

        self.worker_name = worker_name

        return self._lease

    def update_lease_state(self, state, status=None, result=None,
                           action_cache=None, *, data_store,
                           skip_lease_persistence=False, skip_notify=False):
        """Operates a state transition for the job's current :class:`Lease`.

        Args:
            state (LeaseState): the lease state to transition to.
            status (google.rpc.Status, optional): the lease execution status,
                only required if `state` is `COMPLETED`.
            result (google.protobuf.Any, optional): the lease execution result,
                only required if `state` is `COMPLETED`.
            action_cache (ActionCache) : The ActionCache object,
                only needed to store results
            skip_lease_persistence (bool) : Whether to skip storing the lease
                in the datastore for now
            skip_notify (bool) : Whether to skip notifying of job changes

        Raises:
            NotFoundError: If the job has no lease.
        """
        if self._lease is None:
            msg = f"Called update_lease_state on job=[{self._name}] but job has no lease"
            self.__logger.debug(msg)
            raise NotFoundError(msg)

        if state.value == self._lease.state:
            return

        job_changes = {}
        lease_changes = {}

        self._lease.state = state.value
        lease_changes["state"] = state.value

        self.__logger.debug(f"State changed for job [{self._name}]: [{state.name}] (lease)")

        if self._lease.state == LeaseState.PENDING.value:
            # Back to PENDING (e.g. a retry): reset all worker timing and
            # any previous status/result carried by the lease.
            self.__worker_start_timestamp.Clear()
            self.__worker_completed_timestamp.Clear()
            job_changes["worker_start_timestamp"] = self.worker_start_timestamp_as_datetime
            job_changes["worker_completed_timestamp"] = self.worker_completed_timestamp_as_datetime

            self._lease.status.Clear()
            self._lease.result.Clear()
            lease_changes["status"] = self._lease.status.code

        elif self._lease.state == LeaseState.COMPLETED.value:
            self.__worker_completed_timestamp.GetCurrentTime()
            job_changes["worker_completed_timestamp"] = self.worker_completed_timestamp_as_datetime

            action_result = remote_execution_pb2.ActionResult()

            # TODO: Make a distinction between build and bot failures!
            if status.code != code_pb2.OK:
                self._do_not_cache = True
                job_changes["do_not_cache"] = True

            self._status_code = status.code
            lease_changes["status"] = status.code

            if result is not None and result.Is(action_result.DESCRIPTOR):
                result.Unpack(action_result)

            action_metadata = action_result.execution_metadata
            action_metadata.queued_timestamp.CopyFrom(self.__queued_timestamp)
            action_metadata.worker_start_timestamp.CopyFrom(self.__worker_start_timestamp)
            action_metadata.worker_completed_timestamp.CopyFrom(self.__worker_completed_timestamp)

            self.__execute_response.result.CopyFrom(action_result)
            self.__execute_response.cached_result = False
            self.__execute_response.status.CopyFrom(status)

            response_changes = data_store.store_response(self, commit_changes=False)
            if response_changes:
                job_changes.update(response_changes)

            if (action_cache is not None and
                    action_cache.allow_updates and not self.do_not_cache):
                try:
                    action_cache.update_action_result(self.action_digest, self.action_result)
                    self.__logger.debug(f"Stored action result=[{self.action_result}] for "
                                        f"action_digest=[{self.action_digest}] in ActionCache")
                except UpdateNotAllowedError:
                    # The configuration doesn't allow updating the old result
                    self.__logger.exception(
                        "ActionCache is not configured to allow updates, ActionResult for action_digest="
                        f"[{self.action_digest.hash}/{self.action_digest.size_bytes}] wasn't updated.")
                except Exception:
                    self.__logger.exception("Unable to update ActionCache for action "
                                            f"[{self.action_digest.hash}/{self.action_digest.size_bytes}], "
                                            "results will not be stored in the ActionCache")

        data_store.update_job(self.name, job_changes, skip_notify=skip_notify)
        if not skip_lease_persistence:
            data_store.update_lease(self.name, lease_changes)

    def cancel_lease(self, *, data_store):
        """Triggers a job's :class:`Lease` cancellation.

        Note:
            This will not cancel the job's :class:`Operation`.
        """
        self.__lease_cancelled = True

        if self._lease is not None:
            # FIX: the debug log used to dereference `self._lease.id` before
            # this None check, raising AttributeError for jobs without a lease.
            self.__logger.debug(f"Lease cancelled for job [{self._name}]: [{self._lease.id}]")
            self.update_lease_state(LeaseState.CANCELLED, data_store=data_store)

    def delete_lease(self):
        """Discard the job's :class:`Lease`.

        Note:
            This will not cancel the job's :class:`Operation`.
        """
        if self._lease is not None:
            self.__worker_start_timestamp.Clear()
            self.__worker_completed_timestamp.Clear()

            self.__logger.debug(f"Lease deleted for job [{self._name}]: [{self._lease.id}]")

            self._lease = None

    def set_stdout_stream(self, logstream, *, data_store):
        """Attaches a stdout LogStream to the job and persists its names."""
        self.__operation_metadata.stdout_stream_name = logstream.name
        self._stdout_stream_write_name = logstream.write_resource_name

        data_store.update_job(self._name, {
            "stdout_stream_name": logstream.name,
            "stdout_stream_write_name": logstream.write_resource_name
        })

    def set_stderr_stream(self, logstream, *, data_store):
        """Attaches a stderr LogStream to the job and persists its names."""
        self.__operation_metadata.stderr_stream_name = logstream.name
        self._stderr_stream_write_name = logstream.write_resource_name

        data_store.update_job(self._name, {
            "stderr_stream_name": logstream.name,
            "stderr_stream_write_name": logstream.write_resource_name
        })

    # --- Public API: Monitoring ---

    def query_queue_time(self):
        """Returns the time spent queued, as a `timedelta`."""
        return self.__queued_time_duration.ToTimedelta()

    def query_n_retries(self):
        """Returns the number of retries (tries beyond the first)."""
        return self._n_tries - 1 if self._n_tries > 0 else 0

    # --- Private API ---

    def get_operation_update(self, operation_name):
        """Get an operation update message tuple.

        The returned tuple is of the form

        (error, operation)

        """
        error = None
        if operation_name in self.__operations_cancelled:
            error = CancelledError("Operation has been cancelled")
        operation = self.get_operation(operation_name)

        return (error, operation)