Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/bots/instance.py: 83.38%

331 statements  

« prev     ^ index     » next       coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17BotsInterface 

18================= 

19 

20Instance of the Remote Workers interface. 

21""" 

22import asyncio 

23from collections import OrderedDict 

24from datetime import datetime, timedelta 

25import logging 

26from threading import Lock 

27from typing import Dict, Optional, Set, Tuple 

28import uuid 

29 

30from buildgrid._enums import BotStatus, LeaseState 

31from buildgrid._exceptions import ( 

32 InvalidArgumentError, NotFoundError, BotSessionClosedError, 

33 UnknownBotSessionError, BotSessionMismatchError, DuplicateBotSessionError 

34) 

35from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2 

36from buildgrid.server.metrics_names import ( 

37 BOTS_CREATE_BOT_SESSION_TIME_METRIC_NAME, 

38 BOTS_UPDATE_BOT_SESSION_TIME_METRIC_NAME 

39) 

40from buildgrid.server.metrics_utils import DurationMetric 

41from buildgrid.server.persistence.interface import DataStoreInterface 

42from buildgrid.server.scheduler import Scheduler 

43from buildgrid.settings import NETWORK_TIMEOUT 

44 

45 

class BotsInterface:
    """Instance of the Remote Workers (Bots) interface.

    Tracks open BotSessions and hands out job leases obtained from the
    ``Scheduler``. When ``bot_session_keepalive_timeout`` is set, it also
    schedules an asyncio task that closes BotSessions whose bots did not
    call back before their announced deadline, re-queuing their leases.
    """

    def __init__(self, data_store: DataStoreInterface, *, action_cache=None,
                 bot_session_keepalive_timeout: Optional[int] = None,
                 permissive_bot_session: Optional[bool] = None,
                 logstream_url: Optional[str] = None,
                 logstream_credentials=None,
                 logstream_instance_name: Optional[str] = None):
        self.__logger = logging.getLogger(__name__)

        self._scheduler = Scheduler(data_store, action_cache,
                                    logstream_url=logstream_url,
                                    logstream_credentials=logstream_credentials,
                                    logstream_instance_name=logstream_instance_name)
        self._instance_name: Optional[str] = None

        # Mapping of bot_session.name -> bot_session.bot_id
        self._bot_ids: Dict[str, str] = {}

        # Mapping of bot_session.status -> set(bot_session.name, ...)
        self._bot_name_by_status: Dict[BotStatus, Set[str]] = {bot_status: set() for bot_status in BotStatus}
        self._bot_name_by_status_lock = Lock()

        # Mapping of bot_session.name -> set(lease.id, ...) assigned to that BotSession
        self._assigned_leases: Dict[str, Set[str]] = {}
        self._assigned_leases_lock = Lock()

        self._bot_session_keepalive_timeout = bot_session_keepalive_timeout
        self._permissive_bot_session = permissive_bot_session

        # Ordered mapping of bot_session_name: string -> last_expire_time_we_assigned: datetime
        # NOTE: This works because the bot_session_keepalive_timeout is the same for all bots
        # and thus always increases with time (e.g. inserting at the end keeps them sorted because
        # of this property, otherwise we may have had to insert 'in the middle')
        self._ordered_expire_times_by_botsession: Dict[str, datetime] = OrderedDict()
        # The "minimum" expire_time we have coming up
        self._next_expire_time: Optional[datetime] = None
        # The Event to set when we learn about a new expire time that is at a different point in the
        # future than what we knew (e.g. whenever we reset the value of self._next_expire_time).
        # This is mostly useful when we end up with a `next_expire_time` closer to the future than we
        # initially thought (e.g. tracking the first BotSession expiry since all BotSessions are assigned
        # the same keepalive_timeout).
        # NOTE: asyncio.Event() is NOT thread-safe.
        # However, here we .set() it from the ThreadPool threads handling RPC requests
        # and only clear it from the asyncio event loop which runs the `reaper_loop`.
        self._deadline_event = asyncio.Event()

        # Remembering the last n evicted_bot_sessions so that we can present the appropriate
        # messages if they ever get back. (See additional notes in `_close_bot_session`).
        self._remember_last_n_evicted_bot_sessions = 1000
        # Maps bot_session_name: string to (eviction_time: datetime, reason: string), with a maximum size
        # of approx `_remember_last_n_evicted_bot_sessions`.
        self._evicted_bot_sessions: Dict[str, Tuple[datetime, str]] = OrderedDict()

        # Only schedule the reaper once every attribute it reads has been initialised.
        bot_session_reaper_started = self._setup_bot_session_reaper_loop()
        if bot_session_reaper_started and permissive_bot_session:
            self.__logger.warning(
                "Both BotSession reaper and Permissive BotSession mode are enabled. "
                "If the DNS configuration is not resulting in 'sticky sessions' with "
                "bots talking to the same BuildGrid process (unless unhealthy), "
                "the BotSession reaper from other processes may cancel and re-queue "
                "ongoing leases. Please refer to the documentation for more information."
            )

    # --- Public API ---

    @property
    def instance_name(self):
        return self._instance_name

    @property
    def scheduler(self):
        return self._scheduler

    def setup_grpc(self):
        self._scheduler.setup_grpc()

    def register_instance_with_server(self, instance_name, server):
        """Names and registers the bots interface with a given server.

        Raises:
            AssertionError: if this interface was already registered.
        """
        if self._instance_name is not None:
            raise AssertionError("Instance already registered")

        server.add_bots_interface(self, instance_name)

        self._instance_name = instance_name
        if self._scheduler is not None:
            self._scheduler.set_instance_name(instance_name)

    @DurationMetric(BOTS_CREATE_BOT_SESSION_TIME_METRIC_NAME, instanced=True)
    def create_bot_session(self, parent, bot_session):
        """ Creates a new bot session. Server should assign a unique
        name to the session. If a bot with the same bot id tries to
        register with the service, the old one should be closed along
        with all its jobs.

        Raises:
            InvalidArgumentError: if the client did not set ``bot_session.bot_id``.
        """
        if not bot_session.bot_id:
            raise InvalidArgumentError("Bot's id must be set by client.")

        try:
            # Closes any previous BotSession held by this bot_id (re-queuing its leases).
            self._check_bot_ids(bot_session.bot_id)
        except DuplicateBotSessionError:
            pass

        # Bot session name, selected by the server
        name = f"{parent}/{str(uuid.uuid4())}"
        bot_session.name = name

        self._track_bot_session(name, bot_session.bot_id)

        self._request_leases(bot_session, name=name)
        self._assign_deadline_for_botsession(bot_session, name)

        self.__logger.info(
            f"Opened BotSession name=[{bot_session.name}] for bot_id=[{bot_session.bot_id}].")
        leases = ",".join(lease.id[:8] for lease in bot_session.leases)
        self.__logger.debug(f"Leases assigned to newly opened BotSession name=[{bot_session.name}] "
                            f"for bot_id=[{bot_session.bot_id}]: [{leases}].")

        self._update_status_count_for_bot_session(bot_session)
        return bot_session

    def _track_bot_session(self, bot_session_name, bot_id, leases=()):
        """Starts tracking ``bot_session_name`` for ``bot_id`` with the given lease ids."""
        self.__logger.debug(
            f"Now tracking BotSession name=[{bot_session_name}] "
            f"for bot_id=[{bot_id}] with leases=[{leases}]")

        self._bot_ids[bot_session_name] = bot_id
        # We want to keep a copy of lease ids we have assigned
        leases_set = set(leases)
        with self._assigned_leases_lock:
            self._assigned_leases[bot_session_name] = leases_set

    @DurationMetric(BOTS_UPDATE_BOT_SESSION_TIME_METRIC_NAME, instanced=True)
    def update_bot_session(self, name, bot_session, deadline=None):
        """ Client updates the server. Any changes in state to the Lease should be
        registered server side. Assigns available leases with work.

        Returns:
            A ``(bot_session, metadata)`` tuple, where ``metadata`` comes from
            ``Scheduler.get_metadata_for_leases`` for the leases sent back.

        Raises:
            UnknownBotSessionError / BotSessionClosedError: if ``name`` is not
                tracked and permissive BotSession mode is disabled.
            BotSessionMismatchError: if ``bot_session.bot_id`` does not match our record.
        """
        try:
            self._check_bot_ids(bot_session.bot_id, name)
        except (UnknownBotSessionError, BotSessionClosedError):
            if not self._permissive_bot_session:
                # Default behavior is to raise those exceptions
                # and handle them accordingly at a higher level
                self.__logger.warning(
                    f"Unknown bot with bot_name=[{bot_session.name}] and "
                    f"bot_id=[{bot_session.bot_id}]; permissive BotSession mode disabled.")
                raise

            # If in permissive mode, implicitly reopen botsession
            self.__logger.info(
                f"BotSession with bot_name=[{bot_session.name}] and bot_id=[{bot_session.bot_id}] "
                "is now talking to this server instance. Confirming lease accuracy..."
            )

            lease_ids = [lease.id for lease in bot_session.leases]
            self._track_bot_session(name, bot_session.bot_id, lease_ids)

            self.__logger.info(
                f"Successfully relocated BotSession with bot_name=[{bot_session.name}] "
                f"and bot_id=[{bot_session.bot_id}] with leases=[{lease_ids}].")

        self._check_assigned_leases(bot_session)

        # Stop tracking the prior deadline since we have heard back
        # by the deadline we had announced, now we're going to prepare
        # a new BotSession for the bot and once done assign a new deadline.
        self._untrack_deadline_for_botsession(bot_session.name)

        # Go over all the leases the bot sent us in this UpdateBotSession request
        lease_removed = False
        for lease in list(bot_session.leases):
            # See if we need to tell the bot about a lease update (e.g. execution-service side cancellation)
            # and whether we need to update our datastore (e.g. bot finished the lease)
            lease_to_send_to_bot, update_datastore_from_lease = self._check_lease_state(lease)

            if update_datastore_from_lease:
                try:
                    self._scheduler.update_job_lease_state(lease.id, lease)
                except NotFoundError:
                    # Lease no longer exists or is already completed, be sure it gets removed from the bot
                    self.__logger.debug("Lease Id not found when updating job lease state, removing from bot.")
                    lease_to_send_to_bot = None

            if lease_to_send_to_bot:
                # Replace old lease with updated lease (e.g. communicating cancellation)
                # only if there are changes (otherwise the previous lease/state is kept)
                if lease_to_send_to_bot != lease:
                    bot_session.leases.remove(lease)
                    bot_session.leases.append(lease_to_send_to_bot)
            else:
                # We want to remove the lease from this bot and update our records accordingly
                # For example the bot has completed the lease (and results were stored above), or,
                # the bot had to cancel
                with self._assigned_leases_lock:
                    if name in self._assigned_leases:
                        try:
                            self._assigned_leases[name].remove(lease.id)
                            self.__logger.debug(f"Removed lease id=[{lease.id}] from bot=[{name}]")
                        except KeyError:
                            self.__logger.info(f"Lease id=[{lease.id}] already removed from bot=[{name}]")
                bot_session.leases.remove(lease)
                lease_removed = True

        # Don't set a deadline if leases were removed from this bot session
        #
        # Setting a "None" deadline has the effect of only assigning work if
        # it is available immediately, rather than waiting a period of time for
        # work to become available
        #
        # This mitigates situations where the scheduler is updated with the new
        # state of the lease, but a fault thereafter causes the worker to retry
        # the old UpdateBotSession call
        if lease_removed:
            deadline = None

        self._request_leases(bot_session, deadline, name)

        metadata = self._scheduler.get_metadata_for_leases(
            bot_session.leases, writeable_streams=True)

        # Assign a new deadline to the BotSession
        self._assign_deadline_for_botsession(bot_session, name)

        leases = ",".join(lease.id[:8] for lease in bot_session.leases)
        self.__logger.debug(f"Sending BotSession update for name=[{bot_session.name}], "
                            f"bot_id=[{bot_session.bot_id}]: leases=[{leases}].")

        self._update_status_count_for_bot_session(bot_session)
        return bot_session, metadata

    def count_bots(self) -> int:
        """Returns the number of BotSessions currently tracked."""
        return len(self._bot_ids)

    def count_bots_by_status(self, status: BotStatus) -> int:
        """Returns the number of tracked BotSessions last seen with ``status``."""
        return len(self._bot_name_by_status[status])

    # --- Private API ---
    def _request_leases(self, bot_session, deadline=None, name=None):
        """Asks the scheduler for work if the bot is OK and holds no lease.

        ``deadline`` (seconds) is reduced by NETWORK_TIMEOUT to bound long polling;
        deadlines not above NETWORK_TIMEOUT disable long polling entirely.

        Raises:
            BotSessionClosedError: if the BotSession was closed while leases were
                being assigned (the leases are re-queued first).
        """
        # Only send one lease at a time currently.
        if bot_session.status != BotStatus.OK.value or bot_session.leases:
            return

        worker_capabilities = {}

        # TODO? Fail if there are no devices in the worker?
        if bot_session.worker.devices:
            # According to the spec:
            #   "The first device in the worker is the "primary device" -
            #   that is, the device running a bot and which is
            #   responsible for actually executing commands."
            primary_device = bot_session.worker.devices[0]

            for device_property in primary_device.properties:
                if device_property.key not in worker_capabilities:
                    worker_capabilities[device_property.key] = set()
                worker_capabilities[device_property.key].add(device_property.value)
            self.__logger.debug(f"New Workers Capabilities: {worker_capabilities}")

        # If the client specified deadline is less than NETWORK_TIMEOUT,
        # the response shouldn't long poll for work.
        if deadline and (deadline > NETWORK_TIMEOUT):
            deadline = deadline - NETWORK_TIMEOUT
        else:
            deadline = None

        leases = self._scheduler.request_job_leases(
            worker_capabilities,
            timeout=deadline,
            worker_name=name,
            bot_id=self._bot_ids.get(name))

        if leases:
            with self._assigned_leases_lock:
                if bot_session.name in self._assigned_leases:
                    for lease in leases:
                        self._assigned_leases[bot_session.name].add(lease.id)
                    bot_session.leases.extend(leases)
                else:
                    # The BotSession may no longer exist, make sure the leases are re-queued!
                    for lease in leases:
                        self._scheduler.retry_job_lease(lease.id)
                    self.__logger.info(f'BotSession name=[{name}] closed while trying to assign leases. '
                                       f'Re-queued n=[{len(leases)}] leases=[{leases}].')
                    raise BotSessionClosedError('BotSession closed while assigning leases. Re-queued leases.')

    def _check_lease_state(self, lease: bots_pb2.Lease) -> Tuple[Optional[bots_pb2.Lease], bool]:
        """Compares the bot's view of ``lease`` with the scheduler's.

        Returns:
            ``(lease_to_send_to_bot, update_datastore_from_lease)``: a ``None``
            lease means it should be removed from the bot.
        """
        lease_state = LeaseState(lease.state)

        try:
            current_lease = self._scheduler.get_job_lease(lease.id)
        except NotFoundError:
            # Job does not exist, remove lease from bot
            return (None, False)

        # get_job_lease will only return active leases in sql
        # data-store, so handle if no lease was returned
        if current_lease is None:
            return (None, False)

        # If the lease was marked cancelled on the buildgrid side
        # inform the bot (update lease, no need to update bgd datastore)
        if current_lease.state == LeaseState.CANCELLED.value:
            lease.state = LeaseState.CANCELLED.value
            return (lease, False)

        # If the lease is already completed on the buildgrid side
        # remove it from the bot
        if current_lease.state == LeaseState.COMPLETED.value:
            return (None, False)

        if lease_state == LeaseState.COMPLETED:
            return (None, True)

        return (lease, True)

    def _get_bot_id_from_bot_name_or_raise(self, name):
        """ Returns the bot_id corresponding to the passed `name`.
        Raises BotSessionClosedError if the botsession was recently closed.
        Raises UnknownBotSessionError if there is no such known BotSession.
        """
        bot_id = self._bot_ids.get(name)
        if bot_id is None:
            eviction_record = self._evicted_bot_sessions.get(name)
            if eviction_record:
                raise BotSessionClosedError(f'Server has recently evicted the BotSession name=[{name}] at '
                                            f'timestamp=[{eviction_record[0]}], reason=[{eviction_record[1]}]')
            raise UnknownBotSessionError('Unknown BotSession. BuildGrid has not seen a '
                                         f'BotSession with name=[{name}] recently.')
        return bot_id

    def _check_bot_ids(self, bot_id, name=None):
        """ Checks whether the ID and the name of the bot match,
        otherwise closes the bot sessions with that name or ID.

        Raises:
            BotSessionMismatchError: ``name`` is tracked under a different bot_id.
            DuplicateBotSessionError: ``name`` is None and ``bot_id`` already had a session.
            BotSessionClosedError / UnknownBotSessionError: ``name`` is not tracked.
        """
        if name is not None:
            _bot_id = self._get_bot_id_from_bot_name_or_raise(name)
            if _bot_id != bot_id:
                self._close_bot_session(name, reason="bot_id mismatch between worker and bgd")
                raise BotSessionMismatchError(
                    f'Mismatch between client supplied client_bot_id=[{bot_id}] and '
                    f'buildgrid record of bgd_bot_id=[{_bot_id}] for BotSession with name=[{name}].')
        else:
            # Iterate over a snapshot: RPC threads may add/remove sessions concurrently.
            for _name, _bot_id in list(self._bot_ids.items()):
                if bot_id == _bot_id:
                    self._close_bot_session(_name,
                                            reason="Bot with same ID trying to create a new BotSession")
                    raise DuplicateBotSessionError(
                        f'Bot ID bot_id=[{bot_id}] already registered and given bgd_bot_name=[{_name}].')

    def _assign_deadline_for_botsession(self, bot_session, bot_session_name):
        """ Assigns a deadline to the BotSession if bgd was configured to do so
        """
        # Specify bot keepalive expiry time if timeout is set
        if self._bot_session_keepalive_timeout:
            # Calculate expire time
            expire_time_python = datetime.utcnow() + timedelta(seconds=self._bot_session_keepalive_timeout)

            # Set it in the bot_session
            bot_session.expire_time.FromDatetime(expire_time_python)

            # Keep track internally for the botsession reaper
            self._track_deadline_for_bot_session(bot_session_name, expire_time_python)

    def _untrack_deadline_for_botsession(self, bot_session_name):
        """ Un-assigns the session reaper tracked deadline of the BotSession
        if bgd was configured to do so
        """
        if self._bot_session_keepalive_timeout:
            self._track_deadline_for_bot_session(bot_session_name, None)

    def _track_deadline_for_bot_session(self, bot_session_name, new_deadline):
        """ Updates the data structures keeping track of the last deadline
        we had assigned to this BotSession by name.
        When `new_deadline` is set to None, the deadline is unassigned.
        """
        # Keep track of the next expire time to inform the watcher
        updated_next_expire_time = False

        if new_deadline:
            # Since we're re-setting the update time for this bot, make sure to move it
            # to the end of the OrderedDict (if it was already tracked in the OrderedDict)
            try:
                self._ordered_expire_times_by_botsession.move_to_end(bot_session_name)
            except KeyError:
                pass

            self._ordered_expire_times_by_botsession[bot_session_name] = new_deadline
            updated_next_expire_time = True
        else:
            try:
                if self._ordered_expire_times_by_botsession.pop(bot_session_name):
                    updated_next_expire_time = True
            except KeyError:
                self.__logger.debug("Tried to un-assign deadline for bot_session_name="
                                    f"[{bot_session_name}] but it had no deadline to begin with.")

        # Make the botsession reaper thread look at the current new_deadline
        # (if it's nearer in the future) compared to the previously known `next_expire_time`.
        if updated_next_expire_time:
            if self._update_next_expire_time(compare_to=new_deadline):
                self._deadline_event.set()

    def _check_assigned_leases(self, bot_session):
        """Makes sure that all the leases we knew of that were assigned to the bot
        are there, and automatically retries leases the bot may have dropped.

        Raises:
            BotSessionClosedError: if the BotSession is no longer tracked.
        """
        session_lease_ids = {lease.id for lease in bot_session.leases}

        with self._assigned_leases_lock:
            if bot_session.name not in self._assigned_leases:
                raise BotSessionClosedError(f"BotSession name=[{bot_session.name}] for bot_id="
                                            f"[{bot_session.bot_id}] closed while checking leases.")

            # Iterate over a copy so entries can be removed while looping
            for lease_id in list(self._assigned_leases[bot_session.name]):
                if lease_id in session_lease_ids:
                    continue
                self.__logger.warning(f"Assigned lease id=[{lease_id}], "
                                      f"not found on bot with name=[{bot_session.name}] and "
                                      f"id=[{bot_session.bot_id}]. Retrying job. "
                                      "Did the bot crash and restart?")
                # Un-assign job from this botsession and let the scheduler handle it
                try:
                    self._scheduler.retry_job_lease(lease_id)
                except NotFoundError:
                    pass
                self._assigned_leases[bot_session.name].discard(lease_id)

    def _truncate_eviction_history(self):
        # Make sure we're only keeping the last N evicted sessions
        # NOTE: there could be some rare race conditions when the length of the OrderedDict is
        # only 1 below the limit; Multiple threads could check the size simultaneously before
        # they get to add their items in the OrderedDict, resulting in a size bigger than initially intended
        # (with a very unlikely upper bound of:
        #   O(n) = `remember_last_n_evicted_bot_sessions`
        #             + min(number_of_threads, number_of_concurrent_threads_cpu_can_handle)).
        #   The size being only 1 below the limit could also happen when the OrderedDict contains
        #   exactly `n` items and a thread trying to insert sees the limit has been reached and makes
        #   just enough space to add its own item.
        # The cost of locking vs using a bit more memory for a few more items in-memory is high, thus
        # we opt for the unlikely event of the OrderedDict growing a bit more and
        # make the next thread which tries to insert an item, clean up `while len > n`.
        while len(self._evicted_bot_sessions) > self._remember_last_n_evicted_bot_sessions:
            self._evicted_bot_sessions.popitem(last=False)

    def _close_bot_session(self, name, *, reason):
        """ Before removing the session, close any leases and
        requeue with high priority.

        Records the eviction (so a returning bot gets a meaningful error),
        forgets the name -> bot_id mapping and clears its status counts.
        Safe to call for a name that is no longer tracked.
        """
        # If we had assigned an expire_time for this botsession, make sure to
        # clean up, regardless of the reason we end up closing this BotSession
        self._untrack_deadline_for_botsession(name)

        retried_leases = 0
        total_leases = 0
        with self._assigned_leases_lock:
            if name in self._assigned_leases:
                total_leases = len(self._assigned_leases[name])
                for lease_id in self._assigned_leases[name]:
                    try:
                        self._scheduler.retry_job_lease(lease_id)
                    except NotFoundError:
                        pass
                    else:
                        retried_leases += 1
                self._assigned_leases.pop(name)

        self._truncate_eviction_history()
        # Record this eviction
        self._evicted_bot_sessions[name] = (datetime.utcnow(), reason)

        # Look the bot_id up directly: `_get_bot_id_from_bot_name_or_raise` would raise
        # BotSessionClosedError here (the eviction was just recorded) and skip the cleanup below.
        bot_id = self._bot_ids.pop(name, None)
        if bot_id is None:
            self.__logger.warning('Unable to identify `bot_id` associated with BotSession '
                                  f'while closing the BotSession name=[{name}]: it is no longer tracked.')
            bot_id = 'unknown'

        self._clear_status_count_for_bot_name(name)

        self.__logger.info(f'Closed BotSession bot_id=[{bot_id}], name=[{name}], reason=[{reason}] '
                           f'and sucessfully requeued [{retried_leases}]/[{total_leases}] leases.')

    def _update_next_expire_time(self, compare_to=None):
        """
        If we don't have any more bot_session deadlines, clear out this variable
        to avoid busy-waiting. Otherwise, populate it with the next known expiry time
        either from the queue or by comparing to the optional argument `compare_to`.
        This method returns True/False indicating whether the `next_expire_time`
        was updated.
        """
        if compare_to:
            # If we pass in a time earlier than the already known `next_expire_time`
            # or this is the first expire time we know of... set it to `compare_to`

            # NOTE: We could end up in a race condition here, where threads could
            # update the `_next_expire_time` to their own value of `compare_to`
            # if at the time they checked that their time was "earlier" than the
            # shared `_next_expire_time`.
            # For the purpose this is used, this is an OK behavior since:
            #   1. If this method is called around the same time on different threads,
            #      the expiry time should be very close (`delta`).
            #   2. We may end up waiting for an additional `delta` time to expire the first
            #      session in the OrderedDict, and then rapidly close all the subsequent
            #      sessions with expire_time < now.
            # This approach allows for potentially "lazy session expiry" (after an additional minimal `delta`),
            # giving priority to all the other work buildgrid needs to do, instead of using the overhead of
            # locking this and blocking up multiple threads to update this with each rpc.
            # TL;DR Approximation of the `next_expire_time` here is good enough for this purpose.
            if not self._next_expire_time or compare_to < self._next_expire_time:
                self._next_expire_time = compare_to
                return True
        else:
            _, next_expire_time_in_queue = self._get_next_botsession_expiry()
            # It is likely that the expire time we knew of is no longer in the OrderedDict
            # (e.g. we assigned a new one to that BotSession), thus this could be either
            # before or after the previously known `next_expire_time`
            if self._next_expire_time != next_expire_time_in_queue:
                self._next_expire_time = next_expire_time_in_queue
                return True

        return False

    def _next_expire_time_occurs_in(self):
        """Seconds until the next known expiry.

        Returns ``None`` if no expiry is known, ``0`` if it already passed, and
        otherwise the remaining time padded by 0.1s and rounded to milliseconds.
        """
        if self._next_expire_time:
            next_expire_time = (self._next_expire_time - datetime.utcnow()).total_seconds()
            # Check if this is in the future (> 0, negative values means expiry happened already!)
            if next_expire_time > 0:
                # Pad this with 0.1 second so that the expiry actually happens when we try to reap
                return round(next_expire_time + 0.1, 3)
            return 0

        return None

    def _get_next_botsession_expiry(self):
        """Peeks the first ``(botsession_name, expire_time)`` entry, or ``(None, None)``."""
        botsession_name = None
        expire_time = None
        # We want to `peek` the first entry of the OrderedDict here
        # We do this by:
        #     1. Popping the first item (if any)
        #     2. Inserting that key-value pair again (goes to the end with the OrderedDict)
        #     3. Moving that newly re-inserted entry to the beginning (to preserve the order)
        # This should work exactly as a `peek` since we only pop the first item in the asyncio event loop,
        # and we know that all other items we add in this OrderedDict must be >= the current first in
        # terms of expiry (Thus re-adding it and moving it to first should still maintain the sorted order).
        try:
            botsession_name, expire_time = self._ordered_expire_times_by_botsession.popitem(last=False)
        except KeyError:
            pass  # OrderedDict is empty, no BotSessions to check at this instant
        else:
            self._ordered_expire_times_by_botsession[botsession_name] = expire_time
            self._ordered_expire_times_by_botsession.move_to_end(botsession_name, last=False)

        return (botsession_name, expire_time)

    def _reap_expired_sessions(self):
        """Closes every tracked BotSession whose deadline has passed."""
        self.__logger.debug("Checking for expired BotSessions to reap...")
        next_botsession_name_to_expire, next_botsession_expire_time = self._get_next_botsession_expiry()
        while next_botsession_expire_time and next_botsession_expire_time <= datetime.utcnow():
            # This is the last deadline we have communicated with this bot...
            # It has expired.
            # If there is no bot_id -> bot_name mapping anymore, Bot may have opened a new BotSession
            bot_id = self._bot_ids.get(next_botsession_name_to_expire)

            self.__logger.warning(
                f"BotSession name=[{next_botsession_name_to_expire}] for bot_id=[{bot_id}] "
                f"with deadline=[{next_botsession_expire_time}] has expired.")
            # `_close_bot_session` untracks this deadline, so the next peek moves forward.
            self._close_bot_session(next_botsession_name_to_expire, reason="expired")

            next_botsession_name_to_expire, next_botsession_expire_time = self._get_next_botsession_expiry()

        self._update_next_expire_time()

    async def _reap_expired_sessions_loop(self):
        """Waits for the next deadline (or an earlier-deadline event) and reaps, forever."""
        try:
            self.__logger.info(
                "Starting BotSession reaper, bot_session_keepalive_timeout="
                f"[{self._bot_session_keepalive_timeout}].")
            while True:
                try:
                    # None -> nothing tracked; 0 -> something already expired
                    expires_in = self._next_expire_time_occurs_in()
                    if expires_in is None:
                        self.__logger.debug("No more BotSessions to watch for expiry, waiting for new BotSessions.")
                    else:
                        self.__logger.debug(
                            f"Waiting for an event indicating earlier expiry or wait=[{expires_in}] "
                            "for the next BotSession to expire.")
                    await asyncio.wait_for(self._deadline_event.wait(), timeout=expires_in)
                    self._deadline_event.clear()
                except asyncio.TimeoutError:
                    pass

                self._reap_expired_sessions()
        except asyncio.CancelledError:
            self.__logger.info("Cancelled reaper task.")
        except Exception as exception:
            self.__logger.exception(exception)
            raise

    def _setup_bot_session_reaper_loop(self):
        """Schedules the reaper task if a keepalive timeout is configured.

        Returns:
            True if the reaper was scheduled, False otherwise.

        Raises:
            InvalidArgumentError: if the configured timeout is negative.
        """
        if self._bot_session_keepalive_timeout:
            if self._bot_session_keepalive_timeout <= 0:
                raise InvalidArgumentError(
                    f"[bot_session_keepalive_timeout] set to [{self._bot_session_keepalive_timeout}], "
                    "must be > 0, in seconds")

            # Add the expired session reaper in the event loop
            main_loop = asyncio.get_event_loop()
            main_loop.create_task(self._reap_expired_sessions_loop())
            return True
        return False

    def _clear_status_count_for_bot_name(self, bot_name: str) -> None:
        """Removes ``bot_name`` from every status bucket."""
        with self._bot_name_by_status_lock:
            for names in self._bot_name_by_status.values():
                names.discard(bot_name)

    def _update_status_count_for_bot_session(self, bot_session: bots_pb2.BotSession) -> None:
        """Files the BotSession under its current status (and no other)."""
        bot_name = bot_session.name
        bot_status = BotStatus(bot_session.status)

        # Clear and re-add under a single lock acquisition so readers never
        # observe the bot as belonging to no status at all.
        with self._bot_name_by_status_lock:
            for names in self._bot_name_by_status.values():
                names.discard(bot_name)
            names_for_status = self._bot_name_by_status.get(bot_status)
            if names_for_status is not None:
                names_for_status.add(bot_name)
            # else: we are not tracking this bot status, this is a no-op