Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16from datetime import datetime 

17import logging 

18import uuid 

19from typing import List, Optional 

20 

21from google.protobuf.duration_pb2 import Duration 

22from google.protobuf.timestamp_pb2 import Timestamp 

23 

24from buildgrid._enums import LeaseState, OperationStage, BotStatus 

25from buildgrid._exceptions import CancelledError, NotFoundError 

26from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 

27from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2 

28from buildgrid._protos.google.longrunning import operations_pb2 

29from buildgrid._protos.google.rpc import code_pb2, status_pb2 

30 

31 

class Job:
    """A named job together with its operations, optional lease and results.

    A job wraps an ``ExecuteOperationMetadata`` message (action digest and
    stage), an ``ExecuteResponse`` message, a name-to-``Operation`` mapping
    and, optionally, a single ``Lease``.

    Jobs are ordered by :attr:`priority` and compared for equality (and
    hashed) by :attr:`name`.
    """

    def __init__(self, do_not_cache, action_digest, platform_requirements=None, priority=0,
                 name=None, operations=(), cancelled_operations=None, lease=None,
                 stage=OperationStage.UNKNOWN.value, cancelled=False,
                 queued_timestamp=None, queued_time_duration=None,
                 worker_start_timestamp=None, worker_completed_timestamp=None,
                 result=None, worker_name=None, n_tries=0, status_code=None):
        """Initialises a job, optionally restoring previously saved state.

        Args:
            do_not_cache (bool): value exposed through :attr:`do_not_cache`.
            action_digest (Digest): copied into the job's operation metadata.
            platform_requirements (dict, optional): exposed through
                :attr:`platform_requirements`; an empty dict is used when
                omitted or empty.
            priority (int): value used by the ordering comparisons.
            name (str, optional): the job's name; a random UUID4 string is
                generated when omitted or empty.
            operations (iterable): existing :class:`Operation` messages,
                indexed by their ``name``.
            cancelled_operations (set, optional): names of the operations
                already cancelled. When given, the set is stored as-is (not
                copied); when omitted, a new empty set is created for this
                instance.
            lease (Lease, optional): an already existing lease for the job.
            stage (int): initial operation stage value.
            cancelled (bool): initial value for both the job-cancelled and
                the lease-cancelled flags.
            queued_timestamp (Timestamp, optional): copied if provided.
            queued_time_duration (Duration, optional): copied if provided.
            worker_start_timestamp (Timestamp, optional): copied if provided.
            worker_completed_timestamp (Timestamp, optional): copied if
                provided.
            result (ExecuteResponse, optional): existing execute response; an
                empty one is created when omitted.
            worker_name (str, optional): stored in :attr:`worker_name`.
            n_tries (int): initial value of the tries counter.
            status_code (int, optional): when set and different from ``OK``,
                packed operations report it in their ``error`` field.
        """
        self.__logger = logging.getLogger(__name__)

        self._name = name or str(uuid.uuid4())
        self._priority = priority
        self._lease = lease

        self.__execute_response = result
        if result is None:
            self.__execute_response = remote_execution_pb2.ExecuteResponse()
        self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()

        self.__queued_timestamp = Timestamp()
        if queued_timestamp is not None:
            self.__queued_timestamp.CopyFrom(queued_timestamp)

        self.__queued_time_duration = Duration()
        if queued_time_duration is not None:
            self.__queued_time_duration.CopyFrom(queued_time_duration)

        self.__worker_start_timestamp = Timestamp()
        if worker_start_timestamp is not None:
            self.__worker_start_timestamp.CopyFrom(worker_start_timestamp)

        self.__worker_completed_timestamp = Timestamp()
        if worker_completed_timestamp is not None:
            self.__worker_completed_timestamp.CopyFrom(worker_completed_timestamp)

        self.__operations_by_name = {op.name: op for op in operations}  # Name to Operation 1:1 mapping
        # A fresh set per instance: a `set()` default in the signature would
        # be shared (and mutated by `cancel_operation`) across all jobs.
        if cancelled_operations is None:
            cancelled_operations = set()
        self.__operations_cancelled = cancelled_operations
        self.__lease_cancelled = cancelled
        self.__job_cancelled = cancelled

        self.__operation_metadata.action_digest.CopyFrom(action_digest)
        self.__operation_metadata.stage = stage

        self._do_not_cache = do_not_cache
        self._n_tries = n_tries
        self._status_code = status_code

        self._platform_requirements = platform_requirements \
            if platform_requirements else dict()

        self.worker_name = worker_name

    def __str__(self):
        return (f"Job: name=[{self.name}], action_digest="
                f"[{self.action_digest.hash}/{self.action_digest.size_bytes}]")

    # Ordering comparisons use the priority only; objects without a
    # `priority` attribute are not comparable (NotImplemented).

    def __lt__(self, other):
        try:
            return self.priority < other.priority
        except AttributeError:
            return NotImplemented

    def __le__(self, other):
        try:
            return self.priority <= other.priority
        except AttributeError:
            return NotImplemented

    def __eq__(self, other):
        # Equality is by name, independently of the priority.
        if isinstance(other, Job):
            return self.name == other.name
        return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        # Defining __eq__ alone would make instances unhashable; hash on the
        # same key that __eq__ compares.
        return hash(self.name)

    def __gt__(self, other):
        try:
            return self.priority > other.priority
        except AttributeError:
            return NotImplemented

    def __ge__(self, other):
        try:
            return self.priority >= other.priority
        except AttributeError:
            return NotImplemented

    # --- Public API ---

    @property
    def name(self):
        """str: The job's name."""
        return self._name

    @property
    def cancelled(self):
        """bool: Whether the job as a whole is flagged as cancelled."""
        return self.__job_cancelled

    @property
    def priority(self):
        """The job's priority, as used by the ordering comparisons."""
        return self._priority

    def set_priority(self, new_priority, *, data_store):
        """Changes the job's priority and records it in the data store.

        Args:
            new_priority: the new priority value.
            data_store: object whose ``update_job`` is called with the job's
                name and the change.
        """
        self._priority = new_priority
        data_store.update_job(self.name, {'priority': new_priority})

    @property
    def done(self):
        """bool: Whether the operation stage is ``COMPLETED``."""
        return self.operation_stage == OperationStage.COMPLETED

    # --- Public API: REAPI ---

    @property
    def platform_requirements(self):
        """dict: The platform requirements given at construction."""
        return self._platform_requirements

    @property
    def do_not_cache(self):
        """The do-not-cache flag (forced to ``True`` by a non-OK completion)."""
        return self._do_not_cache

    @property
    def action_digest(self):
        """The action digest held in the operation metadata."""
        return self.__operation_metadata.action_digest

    @property
    def operation_stage(self):
        """OperationStage: The current stage from the operation metadata."""
        return OperationStage(self.__operation_metadata.stage)

    @property
    def action_result(self):
        """The ``result`` of the execute response, or None without response."""
        if self.__execute_response is not None:
            return self.__execute_response.result
        else:
            return None

    @property
    def execute_response(self):
        """The job's ``ExecuteResponse`` message."""
        return self.__execute_response

    @execute_response.setter
    def execute_response(self, response):
        self.__execute_response = response
        # Keep every stored operation's packed response in sync.
        for operation in self.__operations_by_name.values():
            operation.response.Pack(self.__execute_response)

    @property
    def holds_cached_result(self):
        """The response's ``cached_result`` flag, or False without response."""
        if self.__execute_response is not None:
            return self.__execute_response.cached_result
        else:
            return False

    @property
    def queued_timestamp(self) -> Timestamp:
        """Timestamp: The queued timestamp message (possibly empty)."""
        return self.__queued_timestamp

    @property
    def queued_timestamp_as_datetime(self) -> Optional[datetime]:
        """The queued timestamp as a datetime, or None if the message is empty."""
        if self.__queued_timestamp.ByteSize():
            return self.__queued_timestamp.ToDatetime()
        return None

    @property
    def queued_time_duration(self):
        """Duration: The queued-time duration message (possibly empty)."""
        return self.__queued_time_duration

    @property
    def worker_start_timestamp(self) -> Timestamp:
        """Timestamp: The worker start timestamp message (possibly empty)."""
        return self.__worker_start_timestamp

    @property
    def worker_start_timestamp_as_datetime(self) -> Optional[datetime]:
        """The worker start time as a datetime, or None if the message is empty."""
        if self.__worker_start_timestamp.ByteSize():
            return self.__worker_start_timestamp.ToDatetime()
        return None

    @property
    def worker_completed_timestamp(self) -> Timestamp:
        """Timestamp: The worker completed timestamp message (possibly empty)."""
        return self.__worker_completed_timestamp

    @property
    def worker_completed_timestamp_as_datetime(self) -> Optional[datetime]:
        """The worker completion time as a datetime, or None if the message is empty."""
        if self.__worker_completed_timestamp.ByteSize():
            return self.__worker_completed_timestamp.ToDatetime()
        return None

    def mark_worker_started(self):
        """Sets the worker start timestamp to the current time."""
        self.__worker_start_timestamp.GetCurrentTime()

    def set_action_url(self, url):
        """Generates a CAS browser URL for the job's action."""
        if url.for_message('action', self.__operation_metadata.action_digest):
            self.__execute_response.message = url.generate()

    def set_cached_result(self, action_result, data_store):
        """Allows specifying an action result from the action cache for the job.

        Note:
            This won't trigger any :class:`Operation` stage transition.

        Args:
            action_result (ActionResult): The result from cache.
            data_store: object whose ``store_response`` is called with the job.
        """
        self.__execute_response.result.CopyFrom(action_result)
        self.__execute_response.cached_result = True
        data_store.store_response(self)

    def n_peers(self, watch_spec):
        """Counts the peers over all operations listed in `watch_spec`.

        Returns:
            int: the summed peer count, or 0 if `watch_spec` is None.
        """
        if watch_spec is None:
            return 0
        return sum(self.n_peers_for_operation(operation_name, watch_spec)
                   for operation_name in watch_spec.operations)

    def n_peers_for_operation(self, operation_name, watch_spec):
        """Counts the peers `watch_spec` reports for one operation.

        Returns:
            int: the peer count, or 0 if `watch_spec` is None.
        """
        if watch_spec is None:
            return 0
        return len(watch_spec.peers_for_operation(operation_name))

    def register_new_operation(self, *, data_store):
        """Subscribes to a new job's :class:`Operation` stage changes.

        Args:
            data_store: object used to persist the new operation
                (``create_operation``) and any resulting operation updates.

        Returns:
            str: The name of the subscribed :class:`Operation`.
        """
        new_operation = operations_pb2.Operation()
        # Copy state from first existing and non cancelled operation:
        for operation in self.__operations_by_name.values():
            if operation.name not in self.__operations_cancelled:
                new_operation.CopyFrom(operation)
                break

        new_operation.name = str(uuid.uuid4())

        self.__logger.debug(f"Operation created for job [{self._name}]: [{new_operation.name}]")

        self.__operations_by_name[new_operation.name] = new_operation

        data_store.create_operation(new_operation, self._name)

        self._update_operations(data_store=data_store)

        return new_operation.name

    def get_all_operations(self) -> List[operations_pb2.Operation]:
        """Gets all :class:`Operation` objects related to a job.

        Returns:
            list: A list of :class:`Operation` objects (packed copies).
        """
        return [
            self.get_operation(operation_name) for operation_name in self.__operations_by_name
        ]

    def get_operation(self, operation_name):
        """Returns a copy of the job's :class:`Operation`.

        Args:
            operation_name (str): the operation's name.

        Raises:
            NotFoundError: If no operation with `operation_name` exists.
        """
        try:
            operation = self.__operations_by_name[operation_name]

        except KeyError:
            raise NotFoundError(f"Operation name does not exist: [{operation_name}]") from None

        return self._get_packed_operation(operation)

    def update_operation_stage(self, stage, *, data_store):
        """Operates a stage transition for the job's :class:`Operation`.

        Does nothing if the job is already in `stage`. On a transition to
        ``QUEUED`` the tries counter is incremented and the queued timestamp
        is set if it was still empty; on a transition to ``EXECUTING`` the
        time spent queued is computed.

        Args:
            stage (OperationStage): the operation stage to transition to.
            data_store: object used to persist the job changes
                (``update_job``) and any resulting operation updates.
        """
        if stage.value == self.__operation_metadata.stage:
            return

        changes = {}

        self.__operation_metadata.stage = stage.value
        changes["stage"] = stage.value

        self.__logger.debug(
            f"Stage changed for job [{self._name}]: [{stage.name}] (operation)")

        if self.__operation_metadata.stage == OperationStage.QUEUED.value:
            # Only the first queueing sets the timestamp; re-queueing keeps it.
            if self.__queued_timestamp.ByteSize() == 0:
                self.__queued_timestamp.GetCurrentTime()
                changes["queued_timestamp"] = self.queued_timestamp_as_datetime
            self._n_tries += 1
            changes["n_tries"] = self._n_tries

        elif self.__operation_metadata.stage == OperationStage.EXECUTING.value:
            # `ToDatetime()` yields a naive datetime, hence the naive `utcnow()`.
            queue_in, queue_out = self.queued_timestamp_as_datetime, datetime.utcnow()
            if queue_in:
                self.__queued_time_duration.FromTimedelta(queue_out - queue_in)
                changes["queued_time_duration"] = self.__queued_time_duration.seconds
            else:
                self.__logger.warning("Tried to calculate `queued_time_duration` "
                                      "but initial queue time wasn't set.")

        data_store.update_job(self.name, changes)

        self._update_operations(data_store=data_store)

    def cancel_all_operations(self, *, data_store):
        """Cancels every :class:`Operation` of the job, one by one."""
        for operation in self.get_all_operations():
            self.cancel_operation(operation.name, data_store=data_store)

    def cancel_operation(self, operation_name, *, data_store):
        """Triggers a job's :class:`Operation` cancellation.

        This may cancel any job's :class:`Lease` that may have been issued.

        Args:
            operation_name (str): the operation's name.
            data_store: object used to persist the resulting job, lease and
                operation changes.

        Raises:
            NotFoundError: If no operation with `operation_name` exists.
        """
        try:
            operation = self.__operations_by_name[operation_name]

        except KeyError:
            raise NotFoundError(f"Operation name does not exist: [{operation_name}]") from None

        self.__operations_cancelled.add(operation.name)

        self.__logger.debug(
            f"Operation cancelled for job [{self._name}]: [{operation.name}]")

        ongoing_operations = set(self.__operations_by_name.keys())
        # Job is cancelled if all the operation are:
        self.__job_cancelled = ongoing_operations.issubset(self.__operations_cancelled)

        if self.__job_cancelled:
            self.__operation_metadata.stage = OperationStage.COMPLETED.value
            changes = {
                "stage": OperationStage.COMPLETED.value,
                "cancelled": True
            }
            data_store.update_job(self.name, changes)
            if self._lease is not None:
                self.cancel_lease(data_store=data_store)

        self._update_operations(data_store=data_store)

    # --- Public API: RWAPI ---

    @property
    def lease(self):
        """The job's current :class:`Lease`, or None."""
        return self._lease

    @property
    def lease_state(self):
        """LeaseState: The current lease's state, or None without a lease."""
        if self._lease is not None:
            return LeaseState(self._lease.state)
        else:
            return None

    @property
    def lease_cancelled(self):
        """bool: Whether the lease is flagged as cancelled."""
        return self.__lease_cancelled

    @property
    def n_tries(self):
        """int: The tries counter (incremented on each transition to QUEUED)."""
        return self._n_tries

    def create_lease(self, worker_name, bot_id=None, *, data_store):
        """Emits a new :class:`Lease` for the job.

        Only one :class:`Lease` can be emitted for a given job. This method
        should only be used once, any further calls are ignored.

        Args:
            worker_name (string): The name of the worker this lease is for.
            bot_id (string): The name of the corresponding bot for this job's worker.
            data_store: object used to persist the job changes caused by the
                transition to ``PENDING``.

        Returns:
            Lease: the existing lease if there is one, the newly created
            lease otherwise, or None if the job is cancelled.
        """
        if self._lease is not None:
            return self._lease
        elif self.__job_cancelled:
            return None

        self._lease = bots_pb2.Lease()
        self._lease.id = self._name
        self._lease.payload.Pack(self.__operation_metadata.action_digest)
        self._lease.state = LeaseState.UNSPECIFIED.value

        if bot_id is None:
            bot_id = "UNKNOWN"
        self.__logger.debug(
            f"Lease created for job [{self._name}]: [{self._lease.id}] (assigned to bot [{bot_id}])")

        self.update_lease_state(LeaseState.PENDING, skip_lease_persistence=True, data_store=data_store)

        self.worker_name = worker_name

        return self._lease

    def update_lease_state(self, state, status=None, result=None,
                           skip_lease_persistence=False, *, data_store):
        """Operates a state transition for the job's current :class:`Lease`.

        Does nothing if the lease is already in `state`. The job must
        currently hold a lease.

        Args:
            state (LeaseState): the lease state to transition to.
            status (google.rpc.Status, optional): the lease execution status,
                only required if `state` is `COMPLETED`.
            result (google.protobuf.Any, optional): the lease execution result,
                only required if `state` is `COMPLETED`.
            skip_lease_persistence (bool): when True, the lease changes are
                not sent to ``data_store.update_lease``.
            data_store: object used to persist the job changes
                (``update_job``) and, unless skipped, the lease changes.
        """
        if state.value == self._lease.state:
            return

        job_changes = {}
        lease_changes = {}

        self._lease.state = state.value
        lease_changes["state"] = state.value

        self.__logger.debug(f"State changed for job [{self._name}]: [{state.name}] (lease)")

        if self._lease.state == LeaseState.PENDING.value:
            # Back to pending: forget any previous execution attempt.
            self.__worker_start_timestamp.Clear()
            self.__worker_completed_timestamp.Clear()
            job_changes["worker_start_timestamp"] = self.worker_start_timestamp_as_datetime
            job_changes["worker_completed_timestamp"] = self.worker_completed_timestamp_as_datetime

            self._lease.status.Clear()
            self._lease.result.Clear()
            lease_changes["status"] = self._lease.status.code

        elif self._lease.state == LeaseState.COMPLETED.value:
            self.__worker_completed_timestamp.GetCurrentTime()
            job_changes["worker_completed_timestamp"] = self.worker_completed_timestamp_as_datetime

            action_result = remote_execution_pb2.ActionResult()

            # TODO: Make a distinction between build and bot failures!
            if status.code != code_pb2.OK:
                self._do_not_cache = True
                job_changes["do_not_cache"] = True

            lease_changes["status"] = status.code

            # An absent or differently-typed result leaves `action_result` empty.
            if result is not None and result.Is(action_result.DESCRIPTOR):
                result.Unpack(action_result)

            action_metadata = action_result.execution_metadata
            action_metadata.queued_timestamp.CopyFrom(self.__queued_timestamp)
            action_metadata.worker_start_timestamp.CopyFrom(self.__worker_start_timestamp)
            action_metadata.worker_completed_timestamp.CopyFrom(self.__worker_completed_timestamp)

            self.__execute_response.result.CopyFrom(action_result)
            self.__execute_response.cached_result = False
            self.__execute_response.status.CopyFrom(status)

        data_store.update_job(self.name, job_changes)
        if not skip_lease_persistence:
            data_store.update_lease(self.name, lease_changes)

    def cancel_lease(self, *, data_store):
        """Triggers a job's :class:`Lease` cancellation.

        The lease-cancelled flag is always raised; the state transition to
        ``CANCELLED`` only happens if the job currently holds a lease.

        Note:
            This will not cancel the job's :class:`Operation`.

        Args:
            data_store: object used to persist the lease state change.
        """
        self.__lease_cancelled = True

        # Check for the lease before reading its id for the log message.
        if self._lease is None:
            return

        self.__logger.debug(f"Lease cancelled for job [{self._name}]: [{self._lease.id}]")

        self.update_lease_state(LeaseState.CANCELLED, data_store=data_store)

    def delete_lease(self):
        """Discard the job's :class:`Lease`.

        Also clears the worker start and completed timestamps. Does nothing
        if the job holds no lease.

        Note:
            This will not cancel the job's :class:`Operation`.
        """
        if self._lease is not None:
            self.__worker_start_timestamp.Clear()
            self.__worker_completed_timestamp.Clear()

            self.__logger.debug(f"Lease deleted for job [{self._name}]: [{self._lease.id}]")

            self._lease = None

    # --- Public API: Monitoring ---

    def query_queue_time(self):
        """Returns the queued-time duration as a ``timedelta``."""
        return self.__queued_time_duration.ToTimedelta()

    def query_n_retries(self):
        """Returns the number of tries beyond the first one (never negative)."""
        return self._n_tries - 1 if self._n_tries > 0 else 0

    # --- Private API ---

    def _get_packed_operation(self, operation: operations_pb2.Operation):
        """Return a copy of the operation with the metadata and response packed.

        Args:
            operation (:class:`buildgrid._protos.google.longrunning.operations_pb2.Operation`):
                The operation to copy.
        """
        cancelled = operation.name in self.__operations_cancelled

        # Create a copy of the operation
        new_operation = operations_pb2.Operation()
        new_operation.CopyFrom(operation)

        # Pack the metadata, setting the stage to COMPLETED if the operation
        # is cancelled.
        operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
        operation_metadata.CopyFrom(self.__operation_metadata)
        if cancelled:
            operation_metadata.stage = OperationStage.COMPLETED.value
        new_operation.metadata.Pack(operation_metadata)

        if cancelled:
            new_operation.error.CopyFrom(status_pb2.Status(code=code_pb2.CANCELLED))
        elif self._status_code is not None and self._status_code != code_pb2.OK:
            # If there was an error, populate the error field.
            new_operation.error.CopyFrom(status_pb2.Status(code=self._status_code))
        else:
            # Otherwise, pack the response.
            execute_response = remote_execution_pb2.ExecuteResponse()
            execute_response.CopyFrom(self.__execute_response)
            new_operation.response.Pack(execute_response)

        # Set whether or not the operation is done, based on the state of
        # the job and whether or not the operation is cancelled (cancelled
        # operations are always also done).
        new_operation.done = self.done or cancelled

        return new_operation

    def _update_operations(self, *, data_store):
        """Update done/cancelled state of operations in the data store."""

        for operation in self.__operations_by_name.values():
            changes = {}
            cancelled = operation.name in self.__operations_cancelled
            if cancelled:
                operation.done = True
                changes["cancelled"] = True

            elif self.done and not operation.done:
                # Only updated in memory: no change is recorded for the store.
                operation.done = True

            if changes:
                data_store.update_operation(operation.name, changes)

    def get_operation_update(self, operation_name):
        """Get an operation update message tuple.

        The returned tuple is of the form

            (error, operation_state)

        where `error` is a :class:`CancelledError` if the operation has been
        cancelled and None otherwise, and `operation_state` is a packed copy
        of the operation.

        Raises:
            KeyError: If no operation with `operation_name` exists.
        """
        operation = self.__operations_by_name[operation_name]
        if operation_name in self.__operations_cancelled:
            message = (CancelledError("Operation has been cancelled"),
                       self._get_packed_operation(operation))
        else:
            message = (None, self._get_packed_operation(operation))
        return message