Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/rabbitmq/bots/instance.py: 85.31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

245 statements  

1# Copyright (C) 2021 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17Bots instance 

18============= 

19 

20The Bots service is responsible for assigning work to the various worker 

21(sometimes "bot") machines connected to the grid. These workers communicate 

22with the Bots service using the `Remote Workers API`_. 

23 

24Like the other BuildGrid services, the Bots service uses the concept of 

25instance names to contain specific sets of workers. These names could refer 

26to (e.g.) a project-specific BuildGrid instance as part of a wider managed 

27deployment. 

28 

29These instance names are mapped to instances of the ``BotsInstance`` class, 

30which implements the actual functionality of the Bots service. It's 

31responsible for selecting work and assigning it to a connected capable 

32worker, as well as constructing state updates based on messages from the 

33workers and publishing those updates for the other grid services to consume. 

34 

35The ``BotsInstance`` is also responsible for keeping track of work that 

36should be cancelled, and making sure that any workers it finds doing that 

37work are informed of the cancellation. 

38 

39.. _Remote Workers API: https://github.com/googleapis/googleapis/tree/master/google/devtools/remoteworkers/v1test2 

40 

41""" 

42 

43 

44from datetime import datetime, timedelta 

45import logging 

46from queue import Queue 

47from threading import Event, Lock, TIMEOUT_MAX 

48from typing import Callable, Dict, Optional, Set, TYPE_CHECKING, Tuple 

49import uuid 

50 

51from google.protobuf.any_pb2 import Any 

52from google.protobuf.timestamp_pb2 import Timestamp 

53import grpc 

54import pika # type: ignore 

55 

56from buildgrid._enums import LeaseState, OperationStage 

57from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ( 

58 Action, ActionResult, Digest, ExecuteOperationMetadata, ExecuteResponse) 

59from buildgrid._protos.buildgrid.v2.messaging_pb2 import BotStatus, Job, RetryableJob, UpdateOperations 

60from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import BotSession, Lease 

61from buildgrid._protos.google.longrunning.operations_pb2 import Operation 

62from buildgrid._types import GrpcTrailingMetadata 

63from buildgrid.client.channel import setup_channel 

64from buildgrid.server.lru_inmemory_cache import LruInMemoryCache 

65from buildgrid.server.rabbitmq._enums import Exchanges 

66from buildgrid.server.rabbitmq.pika_consumer import (RetryingPikaConsumer, QueueBinding) 

67from buildgrid.server.rabbitmq.pika_publisher import RetryingPikaPublisher 

68from buildgrid.server.rabbitmq.utils import MessageSpec 

69from buildgrid.settings import NETWORK_TIMEOUT 

70from buildgrid.utils import combinations_with_unique_keys, create_digest, flatten_capabilities 

71 

72 

73if TYPE_CHECKING: 

74 from buildgrid._app.settings.rmq_parser import RabbitMqConnection 

75 from buildgrid.server.rabbitmq.server import RMQServer 

76 

# Map each worker-side LeaseState onto the OperationStage that clients of the
# Execution/Operations services observe for the corresponding Operation.
_state_to_stage: Dict[LeaseState, OperationStage] = {}
_state_to_stage[LeaseState.UNSPECIFIED] = OperationStage.UNKNOWN
_state_to_stage[LeaseState.PENDING] = OperationStage.EXECUTING
_state_to_stage[LeaseState.ACTIVE] = OperationStage.EXECUTING
_state_to_stage[LeaseState.COMPLETED] = OperationStage.COMPLETED
_state_to_stage[LeaseState.CANCELLED] = OperationStage.COMPLETED

84 

85 

86class _JobAssignmentRequest: 

87 

88 """Class to orchestrate a worker requesting a Job. 

89 

90 Used to coordinate passing a Job from a PikaConsumer callback to a 

91 gRPC thread trying to assign work to a BotSession. 

92 

93 """ 

94 

95 def __init__(self, ttl: float=0): 

96 """Instantiate a new _JobAssignmentRequest. 

97 

98 Args: 

99 ttl (float): The time-to-live of this assignment request. The 

100 request will expire ``ttl`` seconds after instantiation. 

101 If this TTL is set to 0 then the request will never expire. 

102 """ 

103 self._event = Event() 

104 self._expired = False 

105 self._init_time = datetime.utcnow() 

106 self._job: Optional[Job] = None 

107 self._lock = Lock() 

108 ttl = min(TIMEOUT_MAX, ttl) 

109 self._ttl = timedelta(seconds=ttl) 

110 

111 def _is_expired(self) -> bool: 

112 """Return whether or not this assignment request has expired. 

113 

114 An assignment request has expired if a number of seconds equivalent to 

115 the request's TTL have passed since it was instantiated. 

116 

117 If the request's TTL is set to ``0``, then the request never expires. 

118 

119 Returns: 

120 ``True`` if the assignment request has expired, 

121 ``False`` otherwise. 

122 

123 """ 

124 ttl_expired = False 

125 if self._ttl.seconds > 0: 

126 ttl_expired = datetime.utcnow() > self._init_time + self._ttl 

127 return ttl_expired or self._expired 

128 

129 def expire(self) -> None: 

130 """Explicitly expire this assignment request, regardless of TTL. 

131 

132 This function marks the assignment request as expired, preventing 

133 assignment of work even within the TTL defined at construction time. 

134 

135 """ 

136 self._expired = True 

137 

138 def assign_job(self, job: Job) -> bool: 

139 """Attempt to notify a worker that it has been assigned the given Job. 

140 

141 This method attempts to set ``self._job`` to the given Job and then 

142 set the internal event to wake up any threads that have called 

143 ``self.wait_for_assignment`` and provide the assigned Job to them. 

144 

145 If this method has already been called (ie. a Job has already been 

146 assigned to the worker) or the assignment request has expired then 

147 nothing happens. 

148 

149 This method returns True if it was able to assign the Job, and False 

150 otherwise. 

151 

152 Args: 

153 job (Job): The Job message to assign to the worker. 

154 

155 Returns: 

156 ``True`` if ``job`` was successfully assigned, 

157 ``False`` otherwise. 

158 

159 """ 

160 with self._lock: 

161 if self._job is None and not self._is_expired(): 

162 self._job = job 

163 self._event.set() 

164 return True 

165 return False 

166 

167 def wait_for_assignment(self, timeout: Optional[float]=None) -> Optional[Job]: 

168 """Wait for a Job to be assigned, and return the assigned Job. 

169 

170 Args: 

171 timeout (float): The number of seconds to wait for a Job 

172 assignment to occur. Capped to ``threading.TIMEOUT_MAX``. 

173 

174 Returns: 

175 The ``Job`` message if one was assigned before the timeout. 

176 ``None`` otherwise. 

177 

178 """ 

179 if timeout is not None: 

180 timeout = min(TIMEOUT_MAX, timeout) 

181 self._event.wait(timeout=timeout) 

182 return self._job 

183 

184 

class BotsInstance:

    """An instance of the Bots service.

    This class handles all the BotSession management specified in the Remote
    Workers API, getting work from the queues and handing it to capable workers,
    and relaying updates on the state of Jobs from workers back to the other
    BuildGrid services.

    """

    def __init__(
        self,
        rabbitmq: "RabbitMqConnection",
        platform_queues: Dict[str, Set[str]],
        logstream_url: Optional[str]=None,
        logstream_credentials: Optional[grpc.ChannelCredentials]=None,
        logstream_instance_name: Optional[str]=None,
        max_publish_attempts: int=0,
        max_connection_attempts: int=0,
        max_cancellation_cache_capacity: int=100000
    ):
        """Instantiate a new BotsInstance.

        Args:
            rabbitmq (RabbitMqConnection): The RabbitMQ connection information
                that this BotsInstance should use.
            platform_queues (dict): Mapping of platform properties
                (represented by a semicolon-separated string) to a
                list of RabbitMQ queues that can contain work meant
                to be executed on such a platform.
            logstream_url (str): The URL of the remote LogStream service to
                use when creating a new LogStream for streaming stdout and
                stderr for a Job.
            logstream_credentials: TLS credentials for the LogStream channel.
                NOTE(review): annotated as ``grpc.ChannelCredentials`` but
                ``setup_grpc`` calls ``.get("tls-client-key")`` etc. on it,
                so it appears to actually be a mapping of TLS file paths —
                confirm the intended type.
            logstream_instance_name (str): The instance name of the remote
                LogStream service.
            max_publish_attempts (int): The maximum number of times to attempt
                to publish a RabbitMQ message before giving up.
            max_connection_attempts (int): The maximum number of times to attempt
                to reestablish a Consumer connection
            max_cancellation_cache_capacity (int): The maximum number of jobID cache entries to store

        """
        # Platform queues this instance is currently consuming from, guarded
        # by _active_queues_lock (see _ensure_consuming_from_queues and the
        # assignment callbacks, which subscribe/unsubscribe concurrently).
        self._active_queues: Set[str] = set()
        self._active_queues_lock = Lock()
        self._logger = logging.getLogger(__name__)
        self._logstream_url = logstream_url
        self._logstream_credentials = logstream_credentials
        self._logstream_instance_name = logstream_instance_name
        # Created lazily in setup_grpc once the instance starts.
        self._logstream_channel = None
        self._rabbitmq_connection_info = rabbitmq
        self._platform_queues = platform_queues
        # LRU cache of job IDs that have been cancelled; consulted in
        # update_bot_session to flip matching leases to CANCELLED.
        self._cancellation_cache = LruInMemoryCache(max_cancellation_cache_capacity)
        self._stopped = False
        # Maps platform queue name -> queue of pending _JobAssignmentRequests
        # from workers able to serve that platform.
        self._worker_map: Dict[str, "Queue[_JobAssignmentRequest]"] = {}
        self._bot_name_to_assignment_request: Dict[str, _JobAssignmentRequest] = {}

        # Set when this instance is registered with a server.
        self._instance_name = None

        # Connection parameters shared by the publisher and the consumer.
        params = pika.ConnectionParameters(
            self._rabbitmq_connection_info.address,
            self._rabbitmq_connection_info.port,
            self._rabbitmq_connection_info.virtual_host
        )
        if self._rabbitmq_connection_info.credentials:
            params.credentials = self._rabbitmq_connection_info.credentials
        self._publisher = RetryingPikaPublisher(
            params,
            thread_name="BotsInstancePublisher",
            max_publish_attempts=max_publish_attempts,
            exchanges={
                Exchanges.BOT_STATUS.value.name: Exchanges.BOT_STATUS.value.type
            }
        )

        # Set up a rabbitmq consumer for cancellation messages. The queue
        # name is unique per instance so every BotsInstance sees every
        # cancellation (fan-out), and auto-deletes when we disconnect.
        self._cancellation_queue_name = f"cancellation-queue-{uuid.uuid4()}"
        exchange_dict = {
            Exchanges.JOB_CANCELLATION.value.name: Exchanges.JOB_CANCELLATION.value.type,
            Exchanges.JOBS.value.name: Exchanges.JOBS.value.type
        }
        self._bindings = set([
            QueueBinding(
                queue=self._cancellation_queue_name,
                exchange=Exchanges.JOB_CANCELLATION.value.name,
                routing_key='',
                auto_delete_queue=True
            )
        ])
        # Bind every configured platform queue to the JOBS exchange, keyed by
        # its platform string, and give each queue a worker-request queue.
        for platform, queues in self._platform_queues.items():
            for queue in queues:
                self._bindings.add(QueueBinding(
                    queue=queue,
                    exchange=Exchanges.JOBS.value.name,
                    routing_key=platform,
                    auto_delete_queue=False
                ))
                if queue not in self._worker_map:
                    self._worker_map[queue] = Queue()

        self._consumer = RetryingPikaConsumer(params,
                                              exchanges=exchange_dict,
                                              bindings=self._bindings,
                                              max_connection_attempts=max_connection_attempts,
                                              retry_delay_base=1)

        # Subscribe after the bindings above exist so cancellation messages
        # are routed to this instance's queue from the start.
        self._consumer.subscribe(self._cancellation_queue_name, self._cancel_msg_callback)
        self._logger.info(f"Subscribed to cancellation queue [{self._cancellation_queue_name}]")

292 

293 def register_instance_with_server( 

294 self, 

295 instance_name: str, 

296 server: 'RMQServer' 

297 ) -> None: 

298 """Names and registers the bots interface with a given server. 

299 

300 Args: 

301 instance_name (str): The instance name to set for this instance when 

302 registering it with the server. 

303 server (RMQServer): The server to register this instance with. 

304 

305 """ 

306 if self._instance_name is None: 

307 server.add_bots_instance(self, instance_name) 

308 self._instance_name = instance_name 

309 else: 

310 raise AssertionError("Instance already registered") 

311 

312 def setup_grpc(self): 

313 if self._logstream_channel is None and self._logstream_url is not None: 

314 self._logstream_channel, _ = setup_channel( 

315 self._logstream_url, 

316 auth_token=None, 

317 client_key=self._logstream_credentials.get("tls-client-key"), 

318 client_cert=self._logstream_credentials.get("tls-client-cert"), 

319 server_cert=self._logstream_credentials.get("tls-server-cert") 

320 ) 

321 

322 def _publish_bot_status(self, bot_session: BotSession) -> None: 

323 """Send a BotStatus message to the RabbitMQ publisher thread. 

324 

325 This method generates a BotStatus message to publish on the BotStatus 

326 exchange. This message informs the services that consume it that the 

327 given ``BotSession`` has been used in a Create/UpdateBotSession request 

328 and is therefore still active. 

329 

330 This message gets put on the BotsInstance's internal queue that the 

331 publisher thread uses to get messages to publish to RabbitMQ. 

332 

333 Args: 

334 bot_session (BotSession): The BotSession to send a status update for. 

335 

336 """ 

337 assignments = [] 

338 for lease in bot_session.leases: 

339 action = Action() 

340 lease.payload.Unpack(action) 

341 

342 # TODO: Set routing_key here, this only needs to be set on the 

343 # first update for a Job, so can maybe be special-cased at job assignment 

344 # time rather than needing to duplicate any of the Execution service 

345 # config for wildcard properties 

346 assignments.append(RetryableJob(job_id=lease.id, action=action)) # type: ignore 

347 

348 now = Timestamp() 

349 now.GetCurrentTime() 

350 bot_status = BotStatus( 

351 bot_name=bot_session.name, 

352 assignments=assignments, 

353 connection_timestamp=now 

354 ) 

355 spec = MessageSpec( 

356 exchange=Exchanges.BOT_STATUS.value, 

357 payload=bot_status.SerializeToString() 

358 ) 

359 self._publisher.send(spec) 

360 

361 def _publish_operation_status(self, operation: Operation, 

362 operation_name: str, 

363 cacheable: bool, 

364 stage: OperationStage) -> None: 

365 """Send an UpdateOperations message to the RabbitMQ publisher 

366 thread. 

367 

368 This method generates an UpdateOperations message to publish on 

369 the Operations exchange. This will allow Execution services to 

370 relay relevant updates to clients, ActionCaches to store the 

371 results of completed work, and Operations services to persist 

372 the change of state. 

373 

374 This message gets put on the BotsInstance's internal queue that the 

375 publisher thread uses to get messages to publish to RabbitMQ. 

376 

377 Args: 

378 operation (google.longrunning.operations_pb2): The 

379 Operation to send a status update for. 

380 cacheable (bool): whether the job can be cached. 

381 stage (OperationStage): operation's new stage. 

382 

383 

384 """ 

385 update_operations = UpdateOperations(job_id=operation_name, 

386 operation_state=operation, 

387 cacheable=cacheable) # type: ignore 

388 any_wrapper = Any() 

389 any_wrapper.Pack(update_operations) 

390 

391 if self._instance_name is None: 

392 raise AssertionError("Instance is unnamed") 

393 

394 if self._instance_name: 

395 routing_key = f"{stage.value}.{self._instance_name}" 

396 else: 

397 routing_key = f"{stage.value}" 

398 

399 spec = MessageSpec( 

400 exchange=Exchanges.OPERATION_UPDATES.value, 

401 payload=any_wrapper.SerializeToString(), 

402 routing_key=routing_key 

403 ) 

404 self._logger.debug(f"Publishing update for operation '{operation_name}' " 

405 f"(stage={stage}, cacheable={cacheable}, " 

406 f"routing key='{routing_key}'") 

407 self._publisher.send(spec) 

408 

409 def start(self): 

410 """Prepare the BotsInstance for handling incoming requests. 

411 

412 This starts up the background threads needed for publishing/consuming 

413 RabbitMQ messages. 

414 

415 """ 

416 # If for some reason it wasn't already done, set up any gRPC objects. 

417 # `setup_grpc` is idempotent so its safe to do this more than once. 

418 self.setup_grpc() 

419 self._publisher.start() 

420 

    def stop(self):
        """Shutdown the BotsInstance cleanly.

        This sets flags to tell the background threads to stop running, and waits
        for them to finish.

        """
        # Set the flag first so __del__ won't call stop() a second time.
        self._stopped = True

        self._publisher.stop()
        self._consumer.stop()

432 

433 def __del__(self): 

434 if not self._stopped: 

435 self.stop() 

436 

437 def _make_assignment_callback_for_queue( 

438 self, 

439 queue_name: str 

440 ) -> Callable: 

441 """Return a callback to handle an incoming Job message. 

442 

443 This callback is intended to handle Job messages consumed from the 

444 platform queues. It gets the list of workers available for the 

445 given queue, and attempts to assign the Job message being handled 

446 to one of them. 

447 

448 If the Job can't be assigned to any of the workers (e.g. there are 

449 no workers in the list for this queue, or all the workers already 

450 had work assigned by a different callback), then the Job message 

451 is NACKed and we unsubscribe from the queue. 

452 

453 Args: 

454 queue_name (str): The name of the queue that this callback is 

455 for. This is needed so that the callback can determine 

456 which workers can execute the Job. 

457 

458 Returns: 

459 A function which handles assigning a Job message to a worker. 

460 

461 """ 

462 def _assignment_callback(body: bytes, delivery_tag: str) -> None: 

463 job = Job() 

464 job.ParseFromString(body) 

465 assignment_queue = self._worker_map.get(queue_name, Queue()) 

466 assigned = False 

467 assignment_request = assignment_queue.get_nowait() 

468 while assignment_request is not None and not assigned: 

469 assigned = assignment_request.assign_job(job) 

470 if assigned: 

471 self._consumer.ack_message(delivery_tag) 

472 return 

473 assignment_request = assignment_queue.get_nowait() 

474 

475 # There are no workers left for this platform so we can't handle 

476 # this message (or subsequent messages) anymore. 

477 self._consumer.nack_message(delivery_tag) 

478 with self._active_queues_lock: 

479 # NOTE: This call can block for a while in situations where 

480 # RabbitMQ connectivity is bad. However, it needs to be inside 

481 # this lock to avoid a race condition between unsubscribe/subscribe 

482 # and/or discard/add calls. 

483 self._consumer.unsubscribe(queue_name) 

484 self._active_queues.discard(queue_name) 

485 

486 return _assignment_callback 

487 

488 def _get_queues_for_botsession(self, bot_session: BotSession) -> Set[str]: 

489 """Return the set of queue names that are usable by this BotSession. 

490 

491 Args: 

492 bot_session (BotSession): The BotSession whose capabilities should 

493 be matched to platform queues. 

494 

495 Returns: 

496 Set containing the queues which this BotSession is capable of 

497 consuming work from. 

498 

499 """ 

500 queues: Set[str] = set() 

501 capabilities: Dict[str, Set[str]] = {} 

502 if bot_session.worker.devices: 

503 primary_device = bot_session.worker.devices[0] 

504 for prop in primary_device.properties: 

505 if prop.key not in capabilities: 

506 capabilities[prop.key] = set() 

507 capabilities[prop.key].add(prop.value) 

508 

509 flattened_capabilities = flatten_capabilities(capabilities) 

510 for combination in combinations_with_unique_keys(flattened_capabilities, len(capabilities)): 

511 platforms = [f'{key}={value}' for key, value in sorted(combination, key=lambda i: i[0])] 

512 platform_string = ';'.join(platforms) 

513 queues.update(self._platform_queues.get(platform_string, [])) 

514 

515 return queues 

516 

517 def _ensure_consuming_from_queues(self, queues: Set[str]) -> None: 

518 """Ensure that the consumer is subscribed to the given queues. 

519 

520 This method checks that the consumer is subscribed to the given set 

521 of queues, and creates a callback and subscribes to any queues in 

522 the set that the consumer isn't subscribed to yet. 

523 

524 Args: 

525 queues (set): The queues that the consumer should be subscribed to. 

526 

527 """ 

528 for queue in queues: 

529 with self._active_queues_lock: 

530 if queue not in self._active_queues: 

531 callback = self._make_assignment_callback_for_queue(queue) 

532 self._active_queues.add(queue) 

533 # NOTE: This call can block for a while in situations where 

534 # RabbitMQ connectivity is bad. However, it needs to be inside 

535 # this lock to avoid a race condition between 

536 # unsubscribe/subscribe and/or discard/add calls. 

537 self._consumer.subscribe(queue, callback) 

538 

539 def _assign_work_to_bot_session( 

540 self, 

541 bot_session: BotSession, 

542 timeout: Optional[float]=None 

543 ) -> None: 

544 """Attempt to add a new Lease to the given BotSession. 

545 

546 This method creates a request to assign work to the provided 

547 BotSession, and puts that request into the internal data structure 

548 which maps platform/Job queues to work requests. 

549 

550 This method also ensures that the BotsInstance is consuming from all 

551 of the queues that can be serviced by the given BotSession, before 

552 waiting for up to ``timeout`` seconds for work to be assigned to the 

553 request. 

554 

555 If the request for work gets given a Job, then a Lease is created 

556 for that Job and appended to the list of Leases in the BotSession. 

557 

558 Args: 

559 bot_session (BotSession): The BotSession to attempt to assign 

560 work to. This BotSession's ``leases`` attribute will be 

561 modified in-place if work is assigned. 

562 timeout (float): Maximum number of seconds to wait for a Job 

563 to be assigned to the BotSession. 

564 

565 """ 

566 # Parse the worker config in the BotSession to determine the set of 

567 # queues that we can consume Jobs from for this worker. 

568 queues = self._get_queues_for_botsession(bot_session) 

569 

570 # Add the assignment Event wrapper to _worker_map for all the queues 

571 # served by this BotSession. 

572 assignment_ttl = 0.0 

573 if timeout is not None: 

574 assignment_ttl = timeout 

575 assignment = _JobAssignmentRequest(ttl=assignment_ttl) 

576 self._bot_name_to_assignment_request[bot_session.name] = assignment 

577 for queue in queues: 

578 self._worker_map[queue].put(assignment) 

579 

580 # Register callbacks for any queues we aren't already consuming 

581 # from. This needs to be done **after** we've put the 

582 # _JobAssignmentRequest in the _worker_map for the queue, so that 

583 # the on_message callback can actually assign work. 

584 self._ensure_consuming_from_queues(queues) 

585 

586 # Get the Job that has been assigned, waiting for up to ``timeout`` 

587 # seconds for the assignment to happen. If we get a Job, create a 

588 # Lease to represent it in a way the worker can understand, and add 

589 # it to the BotSession. 

590 job = assignment.wait_for_assignment(timeout) 

591 if job is not None: 

592 lease = Lease( 

593 id=job.job_id, 

594 state=LeaseState.PENDING.value 

595 ) 

596 lease.payload.Pack(job.action) 

597 bot_session.leases.append(lease) 

598 

599 def expire_assignment_for_bot_name(self, bot_name: str): 

600 """Expire any pending Job request for the given bot name. 

601 

602 This should be called when a worker disconnects for whatever reason, 

603 to ensure that we don't accidentally assign a Job to a worker who 

604 isn't around to accept the work. 

605 

606 Args: 

607 bot_name (str): The server-assigned BotSession name that 

608 needs its assignment request expiring. 

609 

610 """ 

611 assignment = self._bot_name_to_assignment_request.get(bot_name) 

612 if assignment is not None: 

613 assignment.expire() 

614 

615 def create_bot_session( 

616 self, 

617 parent: str, 

618 bot_session: BotSession, 

619 time_remaining: float 

620 ) -> BotSession: 

621 """Start a new BotSession with this instance. 

622 

623 This method takes a ``BotSession`` and sets the initial server-assigned 

624 fields, notably the server-assigned BotSession name. It also attempts to 

625 assign a Job to the worker in the process. 

626 

627 Args: 

628 parent (str): The parent part of the server-assigned name. This 

629 should be the instance name of this ``BotsInstance``. 

630 bot_session (BotSession): The initial state of the ``BotSession`` 

631 to be created. 

632 

633 """ 

634 name = f"{parent}/{str(uuid.uuid4())}" 

635 bot_session.name = name 

636 

637 # TODO: 

638 # 1. If work was assigned, publish an UpdateOperations message 

639 

640 assignment_timeout = time_remaining - NETWORK_TIMEOUT 

641 self._assign_work_to_bot_session(bot_session, timeout=assignment_timeout) 

642 self._publish_bot_status(bot_session) 

643 

644 return bot_session 

645 

646 def update_bot_session( 

647 self, 

648 name: str, 

649 bot_session: BotSession, 

650 time_remaining: float 

651 ) -> Tuple[BotSession, GrpcTrailingMetadata]: 

652 """Update an existing BotSession. 

653 

654 This method serves a few purposes. It both handles updates that the 

655 worker has made to the BotSession, as well as updating the relevant 

656 server-assigned fields to inform the worker of changes. These changes 

657 are things like the Job being cancelled, or new work being assigned 

658 to a worker which has spare capacity. 

659 

660 This method is also used to decide worker health, with a message 

661 being published whenever this method is called to announce that a 

662 specific BotSession has been seen. 

663 

664 Args: 

665 name (str): The name of the BotSession being updated. 

666 bot_session (BotSession): The BotSession which needs to be 

667 updated and have any worker-assigned updates handled. 

668 time_remaining (float): How long is left to handle this 

669 BotSession before the gRPC connection times out. This 

670 is used to decide how long to wait when looking for new 

671 work to assign. 

672 

673 """ 

674 

675 # TODO: 

676 # 1. Publish UpdateOperations message with new job state (if any) 

677 # 2. Attempt to assign new work if the session has no jobs left 

678 # 2(b). Publish UpdateOperations message if work was assigned 

679 

680 # Lookup the lease.id in our jobID cache and if found, set the state 

681 # of the lease to CANCELLED 

682 for lease in bot_session.leases: 

683 if self._cancellation_cache.get(lease.id) is not None: 

684 self._logger.debug(f"Found job={lease.id} in cancellation cache, setting state to CANCELLED") 

685 lease.state = LeaseState.CANCELLED.value 

686 

687 # If there are no leases assigned to the worker, try to give it some 

688 # more work. 

689 if len(bot_session.leases) == 0: 

690 assignment_timeout = time_remaining - NETWORK_TIMEOUT 

691 self._assign_work_to_bot_session(bot_session, timeout=assignment_timeout) 

692 

693 self._publish_bot_status(bot_session) 

694 

695 for lease in bot_session.leases: 

696 self._send_operation_update(lease) 

697 

698 return bot_session, () 

699 

700 def _cancel_msg_callback(self, body: bytes, delivery_tag: str) -> None: 

701 self._consumer.ack_message(delivery_tag) 

702 cancelled_job_id = body.decode() 

703 

704 # Create a cache of all lease ids so we can keep track of which jobs need to be cancelled 

705 if self._cancellation_cache.get(cancelled_job_id) is None: 

706 self._cancellation_cache.update(cancelled_job_id, cancelled_job_id) 

707 

708 def _send_operation_update(self, lease: Lease): 

709 """Issue an Operation update to the 

710 ``buildgrid.server.rabbitmq._enums.Exchanges.OPERATION_UPDATES`` exchange. 

711 """ 

712 stage = _state_to_stage[LeaseState(lease.state)] 

713 operation, operation_name, cacheable = self._construct_operation(lease) 

714 # TODO: Check state transition to determine whether to send the update 

715 self._publish_operation_status(operation, operation_name, cacheable, stage) 

716 

717 def _construct_operation(self, lease: Lease) -> Tuple[Operation, str, bool]: 

718 """Get the ``Operation`` for the ``Lease``. 

719 Returns a tuple with the Operation message, the operation name, 

720 and whether the action is cacheable. 

721 """ 

722 # Unpack the Action to calculate the digest (the operation name) 

723 action = Action() 

724 lease.payload.Unpack(action) 

725 action_digest = create_digest(action.SerializeToString()) 

726 action_is_cacheable = not action.do_not_cache 

727 

728 operation_name = action_digest.hash 

729 operation = Operation(name=operation_name) 

730 

731 lease_state = LeaseState(lease.state) 

732 operation.done = lease_state in (LeaseState.COMPLETED, LeaseState.CANCELLED) 

733 

734 execute_response = self._construct_execute_response(lease) 

735 operation.response.Pack(execute_response) 

736 

737 metadata = self._construct_operation_metadata(lease, action_digest) 

738 operation.metadata.Pack(metadata) 

739 

740 return operation, operation_name, action_is_cacheable 

741 

742 def _construct_execute_response(self, lease: Lease) -> ExecuteResponse: 

743 """Get the ``ExecuteResponse`` message for the ``Lease``.""" 

744 response = ExecuteResponse() 

745 response.status.CopyFrom(lease.status) 

746 if lease.status == LeaseState.CANCELLED.value: 

747 response.status.message = "Operation cancelled by client." 

748 

749 result = ActionResult() 

750 if lease.result is not None and lease.result.Is(result.DESCRIPTOR): 

751 lease.result.Unpack(result) 

752 response.result.CopyFrom(result) 

753 

754 # TODO: timestamps 

755 

756 return response 

757 

758 def _construct_operation_metadata( 

759 self, lease: Lease, action_digest: Digest) -> ExecuteOperationMetadata: 

760 """Given a ``Lease`` and the digest of its ``Action``, return 

761 the associated ``ExecuteOperationMetadata`` message.""" 

762 metadata = ExecuteOperationMetadata() 

763 metadata.stage = _state_to_stage[LeaseState(lease.state)].value 

764 metadata.action_digest.CopyFrom(action_digest) 

765 return metadata