Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/job.py: 88.48%

382 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-05 15:37 +0000

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16import logging 

17import uuid 

18from datetime import datetime, timedelta 

19from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple 

20 

21from google.protobuf.any_pb2 import Any as AnyPB 

22from google.protobuf.duration_pb2 import Duration 

23from google.protobuf.timestamp_pb2 import Timestamp 

24 

25from buildgrid._enums import LeaseState, OperationStage 

26from buildgrid._exceptions import CancelledError, NotFoundError, UpdateNotAllowedError 

27from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 

28from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import RequestMetadata 

29from buildgrid._protos.build.bazel.remote.logstream.v1.remote_logstream_pb2 import LogStream 

30from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2 

31from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import Lease 

32from buildgrid._protos.google.longrunning import operations_pb2 

33from buildgrid._protos.google.rpc import code_pb2, status_pb2 

34from buildgrid._protos.google.rpc.status_pb2 import Status 

35from buildgrid.server.actioncache.caches.action_cache_abc import ActionCacheABC 

36from buildgrid.server.persistence.interface import DataStoreInterface 

37from buildgrid.utils import BrowserURL, JobWatchSpec 

38 

39# Avoiding a circular import here... gross. 

40if TYPE_CHECKING: 

41 from buildgrid.server.persistence.sql.models import ClientIdentityEntry 

42 

# Module-level logger, named after this module so output can be filtered
# per-component through the standard `logging` hierarchy.
LOGGER = logging.getLogger(__name__)

44 

45 

class Job:
    """In-memory representation of a single execution job.

    A ``Job`` tracks the lifecycle of one REAPI ``Execute`` request: the
    action to run, the set of client-facing ``Operation`` names attached to
    it, timing metadata, the (at most one) RWAPI ``Lease`` handed to a
    worker, and the final ``ExecuteResponse``.

    Jobs compare by ``priority`` for ordering (scheduling queues) and by
    ``name`` for equality.
    """

    def __init__(
        self,
        *,
        do_not_cache: bool,
        action: remote_execution_pb2.Action,
        action_digest: remote_execution_pb2.Digest,
        platform_requirements: Optional[Dict[str, Set[str]]] = None,
        priority: int = 0,
        name: Optional[str] = None,
        operation_names: Optional[Set[str]] = None,
        cancelled_operation_names: Optional[Set[str]] = None,
        lease: Optional[bots_pb2.Lease] = None,
        stage: OperationStage = OperationStage.UNKNOWN,
        cancelled: bool = False,
        queued_timestamp: Optional[Timestamp] = None,
        queued_time_duration: Optional[Duration] = None,
        worker_start_timestamp: Optional[Timestamp] = None,
        worker_completed_timestamp: Optional[Timestamp] = None,
        result: Optional[remote_execution_pb2.ExecuteResponse] = None,
        worker_name: Optional[str] = None,
        n_tries: int = 0,
        status_code: Optional[int] = None,
        stdout_stream_name: Optional[str] = None,
        stdout_stream_write_name: Optional[str] = None,
        stderr_stream_name: Optional[str] = None,
        stderr_stream_write_name: Optional[str] = None,
    ):
        # A job with no explicit name gets a fresh UUID; the name is the
        # identity used by __eq__/__hash__ and by the data store.
        self._name = name or str(uuid.uuid4())
        self._priority = priority
        self._lease = lease

        self.__execute_response = result or remote_execution_pb2.ExecuteResponse()

        # Timing fields are always-present protos; CopyFrom only when the
        # caller supplied a value, otherwise they stay zero/unset
        # (ByteSize() == 0), which the *_as_datetime properties rely on.
        self.__queued_timestamp = Timestamp()
        if queued_timestamp is not None:
            self.__queued_timestamp.CopyFrom(queued_timestamp)

        self.__queued_time_duration = Duration()
        if queued_time_duration is not None:
            self.__queued_time_duration.CopyFrom(queued_time_duration)

        self.__worker_start_timestamp = Timestamp()
        if worker_start_timestamp is not None:
            self.__worker_start_timestamp.CopyFrom(worker_start_timestamp)

        self.__worker_completed_timestamp = Timestamp()
        if worker_completed_timestamp is not None:
            self.__worker_completed_timestamp.CopyFrom(worker_completed_timestamp)

        self._action = action
        self._action_digest = action_digest

        # Keep two lists of operations: all and cancelled.
        # The fields (other than cancelled) will be populated with the job
        # details when an operation is accessed through
        # `get_operation(operation_name)`.
        if operation_names is None:
            operation_names = set()
        if cancelled_operation_names is None:
            cancelled_operation_names = set()
        self.operations_all = set(operation_names)
        self.__operations_cancelled = set(cancelled_operation_names)
        self.__lease_cancelled = cancelled
        self.__job_cancelled = cancelled

        self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
        self.__operation_metadata.action_digest.CopyFrom(action_digest)
        self.__operation_metadata.stage = stage.value
        if stdout_stream_name is not None:
            self.__operation_metadata.stdout_stream_name = stdout_stream_name
        self._stdout_stream_write_name = stdout_stream_write_name
        if stderr_stream_name is not None:
            self.__operation_metadata.stderr_stream_name = stderr_stream_name
        self._stderr_stream_write_name = stderr_stream_write_name

        self._do_not_cache = do_not_cache
        self._n_tries = n_tries
        self._status_code = status_code

        self._platform_requirements: Dict[str, Set[str]] = platform_requirements if platform_requirements else {}

        self.worker_name = worker_name

    def __str__(self) -> str:
        return (
            f"Job: name=[{self.name}], action_digest=" f"[{self.action_digest.hash}/{self.action_digest.size_bytes}]"
        )

    # Rich comparisons follow the data-model convention: if `other` does not
    # expose the attribute we need, return NotImplemented so Python can try
    # the reflected operation (previously these raised NotImplementedError,
    # which is the abstract-method error, not the comparison sentinel).

    def __lt__(self, other: Any) -> bool:
        try:
            return bool(self.priority < other.priority)
        except AttributeError:
            return NotImplemented

    def __le__(self, other: Any) -> bool:
        try:
            return bool(self.priority <= other.priority)
        except AttributeError:
            return NotImplemented

    def __eq__(self, other: Any) -> bool:
        try:
            return bool(self.name == other.name)
        except AttributeError:
            return NotImplemented

    def __ne__(self, other: Any) -> bool:
        # Propagate the NotImplemented sentinel rather than negating it.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    # Equality is based on the (immutable) job name, so hashing by name keeps
    # the __eq__/__hash__ contract and makes jobs usable in sets/dicts.
    # (Defining __eq__ alone would implicitly set __hash__ = None.)
    def __hash__(self) -> int:
        return hash(self._name)

    def __gt__(self, other: Any) -> bool:
        try:
            return bool(self.priority > other.priority)
        except AttributeError:
            return NotImplemented

    def __ge__(self, other: Any) -> bool:
        try:
            return bool(self.priority >= other.priority)
        except AttributeError:
            return NotImplemented

    # --- Public API ---

    @property
    def name(self) -> str:
        return self._name

    @property
    def cancelled(self) -> bool:
        return self.__job_cancelled

    @property
    def priority(self) -> int:
        return self._priority

    def set_priority(self, new_priority: int, *, data_store: DataStoreInterface) -> None:
        """Update the job's priority both in memory and in the data store."""
        self._priority = new_priority
        data_store.update_job(self.name, {"priority": new_priority})

    @property
    def done(self) -> bool:
        return self.operation_stage == OperationStage.COMPLETED

    # --- Public API: REAPI ---

    @property
    def platform_requirements(self) -> Dict[str, Set[str]]:
        return self._platform_requirements

    @property
    def do_not_cache(self) -> bool:
        return self._do_not_cache

    @property
    def action_digest(self) -> remote_execution_pb2.Digest:
        return self._action_digest

    @property
    def action(self) -> remote_execution_pb2.Action:
        return self._action

    @property
    def operation_stage(self) -> OperationStage:
        return OperationStage(self.__operation_metadata.stage)

    @property
    def operation_metadata(self) -> remote_execution_pb2.ExecuteOperationMetadata:
        # Client-facing copy: never exposes the writeable stream names.
        return self.get_metadata(writeable_streams=False)

    @property
    def action_result(self) -> Optional[remote_execution_pb2.ActionResult]:
        if self.__execute_response is not None:
            return self.__execute_response.result
        else:
            return None

    @property
    def execute_response(self) -> remote_execution_pb2.ExecuteResponse:
        return self.__execute_response

    @execute_response.setter
    def execute_response(self, response: remote_execution_pb2.ExecuteResponse) -> None:
        self.__execute_response = response

    @property
    def holds_cached_result(self) -> bool:
        if self.__execute_response is not None:
            return self.__execute_response.cached_result
        else:
            return False

    @property
    def queued_timestamp(self) -> Timestamp:
        return self.__queued_timestamp

    @property
    def queued_timestamp_as_datetime(self) -> Optional[datetime]:
        # ByteSize() == 0 means the timestamp was never set.
        if self.__queued_timestamp.ByteSize():
            return self.__queued_timestamp.ToDatetime()
        return None

    @property
    def queued_time_duration(self) -> Duration:
        return self.__queued_time_duration

    @property
    def worker_start_timestamp(self) -> Timestamp:
        return self.__worker_start_timestamp

    @property
    def worker_start_timestamp_as_datetime(self) -> Optional[datetime]:
        if self.__worker_start_timestamp.ByteSize():
            return self.__worker_start_timestamp.ToDatetime()
        return None

    @property
    def worker_completed_timestamp(self) -> Timestamp:
        return self.__worker_completed_timestamp

    @property
    def worker_completed_timestamp_as_datetime(self) -> Optional[datetime]:
        if self.__worker_completed_timestamp.ByteSize():
            return self.__worker_completed_timestamp.ToDatetime()
        return None

    def mark_worker_started(self) -> None:
        """Record "now" as the moment a worker picked up the job."""
        self.__worker_start_timestamp.GetCurrentTime()

    def get_metadata(self, writeable_streams: bool = False) -> remote_execution_pb2.ExecuteOperationMetadata:
        """Return a copy of the operation metadata.

        Args:
            writeable_streams (bool): when True, substitute the write
                resource names for the stdout/stderr streams (for the worker
                side, which appends to the streams).
        """
        operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
        operation_metadata.CopyFrom(self.__operation_metadata)
        if writeable_streams and self._stdout_stream_write_name:
            operation_metadata.stdout_stream_name = self._stdout_stream_write_name
        if writeable_streams and self._stderr_stream_write_name:
            operation_metadata.stderr_stream_name = self._stderr_stream_write_name
        # A cancelled job is reported as COMPLETED regardless of the stored
        # stage, so clients stop waiting on it.
        if self.__job_cancelled:
            operation_metadata.stage = OperationStage.COMPLETED.value
        return operation_metadata

    def set_action_url(self, url: BrowserURL) -> None:
        """Generates a CAS browser URL for the job's action."""
        if url.for_message("action", self.action_digest):
            # TODO Should we cast here to be safe?
            self.__execute_response.message = url.generate()  # type: ignore[assignment]

    def set_cached_result(
        self, action_result: remote_execution_pb2.ActionResult, data_store: DataStoreInterface
    ) -> None:
        """Allows specifying an action result from the action cache for the job.

        Note:
            This won't trigger any :class:`Operation` stage transition.

        Args:
            action_result (ActionResult): The result from cache.
        """
        self.__execute_response.result.CopyFrom(action_result)
        self.__execute_response.cached_result = True

        data_store.store_response(self)

    def n_peers(self, watch_spec: Optional[JobWatchSpec]) -> int:
        """Number of peers watching this job (0 when nothing is watched)."""
        if watch_spec is None:
            return 0
        return len(watch_spec.peers)

    def n_peers_for_operation(self, operation_name: str, watch_spec: Optional[JobWatchSpec]) -> int:
        """Number of peers watching a specific operation of this job."""
        if watch_spec is None:
            return 0
        return len(watch_spec.peers_for_operation(operation_name))

    def register_new_operation(
        self,
        *,
        data_store: DataStoreInterface,
        request_metadata: Optional[RequestMetadata] = None,
        client_identity: Optional["ClientIdentityEntry"] = None,
    ) -> str:
        """Subscribes to a new job's :class:`Operation` stage changes.

        Returns:
            str: The name of the subscribed :class:`Operation`.
        """
        new_operation_name = str(uuid.uuid4())
        self.operations_all.add(new_operation_name)

        data_store.create_operation(
            operation_name=new_operation_name,
            job_name=self._name,
            request_metadata=request_metadata,
            client_identity=client_identity,
        )
        LOGGER.debug(f"Operation created for job [{self._name}]: [{new_operation_name}]")

        return new_operation_name

    def get_all_operations(self) -> List[operations_pb2.Operation]:
        """Gets all :class:`Operation` objects related to a job.

        Returns:
            list: A list of :class:`Operation` objects.
        """
        return [self.get_operation(operation_name) for operation_name in self.operations_all]

    def get_operation(self, operation_name: str) -> operations_pb2.Operation:
        """Returns a copy of the the job's :class:`Operation`
        with all the fields populated from the job fields

        Args:
            operation_name (str): the operation's name.

        Raises:
            NotFoundError: If no operation with `operation_name` exists.
        """
        LOGGER.debug(f"get_operation({operation_name})")
        if operation_name not in self.operations_all:
            raise NotFoundError(
                f"Operation does not exist for job: operation_name=[{operation_name}], " f"job_name=[{self._name}]"
            )

        cancelled = operation_name in self.__operations_cancelled

        # Create new proto and populate fields
        operation_proto = operations_pb2.Operation()
        operation_proto.name = operation_name
        operation_proto.done = self.done or cancelled

        # Pack the metadata from the job
        operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
        operation_metadata.CopyFrom(self.operation_metadata)
        # Set the stage to COMPLETED if the operation is cancelled
        if operation_name in self.__operations_cancelled:
            operation_metadata.stage = OperationStage.COMPLETED.value
        operation_proto.metadata.Pack(operation_metadata)

        # Set the status code/execute response as appropriate
        if cancelled:
            operation_proto.error.CopyFrom(status_pb2.Status(code=code_pb2.CANCELLED))
        elif self._status_code is not None and self._status_code != code_pb2.OK:
            # If there was an error, populate the error field.
            operation_proto.error.CopyFrom(status_pb2.Status(code=self._status_code))
        else:
            # Otherwise, pack the response.
            execute_response = self.execute_response
            operation_proto.response.Pack(execute_response)

        return operation_proto

    def update_operation_stage(self, stage: OperationStage, *, data_store: DataStoreInterface) -> None:
        """Operates a stage transition for the job's :class:`Operation`.

        Args:
            stage (OperationStage): the operation stage to transition to.
        """
        # No-op when the stage is already current.
        if stage.value == self.__operation_metadata.stage:
            return

        changes: Dict[str, Any] = {}

        self.__operation_metadata.stage = stage.value
        changes["stage"] = stage.value

        LOGGER.debug(f"Stage changed for job [{self._name}]: [{stage.name}] (operation)")

        if self.__operation_metadata.stage == OperationStage.QUEUED.value:
            # Only stamp the queue time on the first transition to QUEUED;
            # re-queues (retries) keep the original timestamp.
            if self.__queued_timestamp.ByteSize() == 0:
                self.__queued_timestamp.GetCurrentTime()
                changes["queued_timestamp"] = self.queued_timestamp_as_datetime
            self._n_tries += 1
            changes["n_tries"] = self._n_tries

        elif self.__operation_metadata.stage == OperationStage.EXECUTING.value:
            # NOTE(review): utcnow() is naive UTC, matching the naive
            # datetimes produced by Timestamp.ToDatetime().
            queue_in, queue_out = self.queued_timestamp_as_datetime, datetime.utcnow()
            if queue_in:
                self.__queued_time_duration.FromTimedelta(queue_out - queue_in)
                changes["queued_time_duration"] = self.__queued_time_duration.seconds
            else:
                LOGGER.warning("Tried to calculate `queued_time_duration` but initial queue time wasn't set.")

        data_store.update_job(self.name, changes)

    def cancel_all_operations(self, *, data_store: DataStoreInterface) -> None:
        """Cancel every operation registered on this job."""
        for operation_name in self.operations_all:
            self.cancel_operation(operation_name, data_store=data_store)

    def cancel_operation(self, operation_name: str, *, data_store: DataStoreInterface) -> None:
        """Triggers a job's :class:`Operation` cancellation.

        This may cancel any job's :class:`Lease` that may have been issued.

        Args:
            operation_name (str): the operation's name.

        Raises:
            NotFoundError: If no operation with `operation_name` exists.
        """
        # NOTE: This assumes that operations weren't created in a sql backend
        # from a different buildgrid
        if operation_name not in self.operations_all:
            raise NotFoundError(f"Operation name does not exist: [{operation_name}]")

        if operation_name in self.__operations_cancelled:
            LOGGER.debug(
                f"Tried to cancel operation for job [{self._name}]: [{operation_name}] but it was already cancelled."
            )
        else:
            # Mark operation as cancelled and update in data_store
            self.__operations_cancelled.add(operation_name)
            data_store.update_operation(operation_name, {"cancelled": True})

            LOGGER.debug(f"Operation cancelled for job [{self._name}]: [{operation_name}]")

            self._mark_job_as_cancelled_if_all_operations_cancelled(data_store=data_store)

    def _mark_job_as_cancelled_if_all_operations_cancelled(self, *, data_store: DataStoreInterface) -> None:
        # If all the operations are cancelled, then mark the job as cancelled too

        # NOTE: In the case of multiple buildgrid instances we may want to
        # check with the data_store first, or, handle that there as there may be more
        # operations created from different buildgrid instances

        self.__job_cancelled = self.operations_all.issubset(self.__operations_cancelled)

        if self.__job_cancelled:
            self.__operation_metadata.stage = OperationStage.COMPLETED.value
            self.__worker_completed_timestamp.GetCurrentTime()
            changes = {
                "stage": OperationStage.COMPLETED.value,
                "cancelled": True,
                "worker_completed_timestamp": self.worker_completed_timestamp_as_datetime,
            }

            data_store.update_job(self.name, changes)
            if self._lease is not None:
                self.cancel_lease(data_store=data_store)

    # --- Public API: RWAPI ---

    @property
    def lease(self) -> Optional[Lease]:
        return self._lease

    @property
    def lease_state(self) -> Optional[LeaseState]:
        if self._lease is not None:
            return LeaseState(self._lease.state)
        else:
            return None

    @property
    def lease_cancelled(self) -> bool:
        return self.__lease_cancelled

    @property
    def n_tries(self) -> int:
        return self._n_tries

    @property
    def status_code(self) -> Optional[int]:
        return self._status_code

    def create_lease(
        self, worker_name: str, bot_id: Optional[str] = None, *, data_store: DataStoreInterface
    ) -> Optional[Lease]:
        """Emits a new :class:`Lease` for the job.

        Only one :class:`Lease` can be emitted for a given job. This method
        should only be used once, any further calls are ignored.

        Args:
            worker_name (string): The name of the worker this lease is for.
            bot_id (string): The name of the corresponding bot for this job's worker.
        """
        if self._lease is not None:
            return self._lease
        elif self.__job_cancelled:
            return None

        self._lease = bots_pb2.Lease()
        self._lease.id = self._name

        # Prefer the full Action as the payload; fall back to the digest.
        if self.action is not None:
            self._lease.payload.Pack(self.action)
        else:
            self._lease.payload.Pack(self.action_digest)

        self._lease.state = LeaseState.UNSPECIFIED.value

        if bot_id is None:
            bot_id = "UNKNOWN"
        LOGGER.debug(f"Lease created for job [{self._name}]: [{self._lease.id}] (assigned to bot [{bot_id}])")

        self.update_lease_state(LeaseState.PENDING, skip_lease_persistence=True, data_store=data_store)

        self.worker_name = worker_name

        return self._lease

    def update_lease_state(
        self,
        state: LeaseState,
        status: Optional[Status] = None,
        result: Optional[AnyPB] = None,
        action_cache: Optional[ActionCacheABC] = None,
        *,
        data_store: DataStoreInterface,
        skip_lease_persistence: bool = False,
        skip_notify: bool = False,
    ) -> None:
        """Operates a state transition for the job's current :class:`Lease`.

        Args:
            state (LeaseState): the lease state to transition to.
            status (google.rpc.Status, optional): the lease execution status,
                only required if `state` is `COMPLETED`.
            result (google.protobuf.Any, optional): the lease execution result,
                only required if `state` is `COMPLETED`.
            action_cache (ActionCache) : The ActionCache object,
                only needed to store results
            skip_lease_persistence (bool) : Whether to skip storing the lease
                in the datastore for now
            skip_notify (bool) : Whether to skip notifying of job changes

        Raises:
            NotFoundError: if the job has no lease.
        """
        if self._lease is None:
            msg = f"Called update_lease_state on job=[{self._name}] but job has no lease"
            LOGGER.debug(msg)
            raise NotFoundError(msg)

        # No-op when the state is already current.
        if state.value == self._lease.state:
            return

        job_changes: Dict[str, Any] = {}
        lease_changes: Dict[str, Any] = {}

        self._lease.state = state.value
        lease_changes["state"] = state.value

        LOGGER.debug(f"State changed for job [{self._name}]: [{state.name}] (lease)")

        if self._lease.state == LeaseState.PENDING.value:
            # A (re-)pending lease resets all worker-side progress.
            self.__worker_start_timestamp.Clear()
            self.__worker_completed_timestamp.Clear()
            job_changes["worker_start_timestamp"] = self.worker_start_timestamp_as_datetime
            job_changes["worker_completed_timestamp"] = self.worker_completed_timestamp_as_datetime

            self._lease.status.Clear()
            self._lease.result.Clear()
            lease_changes["status"] = self._lease.status.code

        elif self._lease.state == LeaseState.COMPLETED.value:
            assert status, "Status value not set"

            self.__worker_completed_timestamp.GetCurrentTime()
            job_changes["worker_completed_timestamp"] = self.worker_completed_timestamp_as_datetime

            action_result = remote_execution_pb2.ActionResult()

            # TODO: Make a distinction between build and bot failures!
            if status.code != code_pb2.OK:
                self._do_not_cache = True
                job_changes["do_not_cache"] = True

            self._status_code = status.code
            lease_changes["status"] = status.code

            if result is not None and result.Is(action_result.DESCRIPTOR):
                result.Unpack(action_result)

            # Stamp the job's timing metadata into the result before storing.
            action_metadata = action_result.execution_metadata
            action_metadata.queued_timestamp.CopyFrom(self.__queued_timestamp)
            action_metadata.worker_start_timestamp.CopyFrom(self.__worker_start_timestamp)
            action_metadata.worker_completed_timestamp.CopyFrom(self.__worker_completed_timestamp)

            self.__execute_response.result.CopyFrom(action_result)
            self.__execute_response.cached_result = False
            self.__execute_response.status.CopyFrom(status)

            response_changes = data_store.store_response(self, commit_changes=False)
            if response_changes:
                job_changes.update(response_changes)

            # Best-effort ActionCache update: failures are logged, never fatal.
            if action_cache is not None and action_cache.allow_updates and not self.do_not_cache:
                try:
                    # TODO needs None check?
                    action_cache.update_action_result(self.action_digest, self.action_result)  # type: ignore[arg-type]
                    LOGGER.debug(
                        f"Stored action result=[{self.action_result}] for "
                        f"action_digest=[{self.action_digest}] in ActionCache"
                    )
                except UpdateNotAllowedError:
                    # The configuration doesn't allow updating the old result
                    LOGGER.exception(
                        "ActionCache is not configured to allow updates, ActionResult for action_digest="
                        f"[{self.action_digest.hash}/{self.action_digest.size_bytes}] wasn't updated."
                    )
                except Exception:
                    LOGGER.exception(
                        "Unable to update ActionCache for action "
                        f"[{self.action_digest.hash}/{self.action_digest.size_bytes}], "
                        "results will not be stored in the ActionCache"
                    )

        data_store.update_job(self.name, job_changes, skip_notify=skip_notify)
        if not skip_lease_persistence:
            data_store.update_lease(self.name, lease_changes)

    def cancel_lease(self, *, data_store: DataStoreInterface) -> None:
        """Triggers a job's :class:`Lease` cancellation.

        Note:
            This will not cancel the job's :class:`Operation`.
        """
        self.__lease_cancelled = True

        if self._lease is not None:
            LOGGER.debug(f"Lease cancelled for job [{self._name}]: [{self._lease.id}]")
            self.update_lease_state(LeaseState.CANCELLED, data_store=data_store)

    def delete_lease(self) -> None:
        """Discard the job's :class:`Lease`.

        Note:
            This will not cancel the job's :class:`Operation`.
        """
        if self._lease is not None:
            self.__worker_start_timestamp.Clear()
            self.__worker_completed_timestamp.Clear()

            LOGGER.debug(f"Lease deleted for job [{self._name}]: [{self._lease.id}]")

            self._lease = None

    def set_stdout_stream(self, logstream: LogStream, *, data_store: DataStoreInterface) -> None:
        """Attach a stdout LogStream to the job and persist its names."""
        self.__operation_metadata.stdout_stream_name = logstream.name
        self._stdout_stream_write_name = logstream.write_resource_name

        data_store.update_job(
            self._name,
            {"stdout_stream_name": logstream.name, "stdout_stream_write_name": logstream.write_resource_name},
        )

    def set_stderr_stream(self, logstream: LogStream, *, data_store: DataStoreInterface) -> None:
        """Attach a stderr LogStream to the job and persist its names."""
        self.__operation_metadata.stderr_stream_name = logstream.name
        self._stderr_stream_write_name = logstream.write_resource_name

        data_store.update_job(
            self._name,
            {"stderr_stream_name": logstream.name, "stderr_stream_write_name": logstream.write_resource_name},
        )

    # --- Public API: Monitoring ---

    def query_queue_time(self) -> timedelta:
        """Time the job spent queued, as a :class:`timedelta`."""
        return self.__queued_time_duration.ToTimedelta()

    def query_n_retries(self) -> int:
        """Number of retries (attempts beyond the first)."""
        return self._n_tries - 1 if self._n_tries > 0 else 0

    # --- Private API ---

    def get_operation_update(self, operation_name: str) -> Tuple[Optional[CancelledError], operations_pb2.Operation]:
        """Get an operation update message tuple.

        The returned tuple is of the form

            (error, operation)

        """
        error = None
        if operation_name in self.__operations_cancelled:
            error = CancelledError("Operation has been cancelled")
        operation = self.get_operation(operation_name)

        return (error, operation)