Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17BotsInterface 

18================= 

19 

20Instance of the Remote Workers interface. 

21""" 

22from datetime import datetime, timedelta 

23from collections import OrderedDict 

24from threading import Lock 

25import asyncio 

26import logging 

27import uuid 

28 

29from buildgrid._exceptions import ( 

30 InvalidArgumentError, NotFoundError, BotSessionClosedError, 

31 UnknownBotSessionError, BotSessionMismatchError, DuplicateBotSessionError 

32) 

33from buildgrid.server.scheduler import Scheduler 

34from buildgrid.settings import NETWORK_TIMEOUT 

35 

36from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2 

37 

38from ..job import LeaseState, BotStatus 

39 

40 

class BotsInterface:
    """Server-side implementation of the Remote Workers ``Bots`` interface.

    Tracks open BotSessions (name -> bot_id), hands out job leases obtained from the
    :class:`Scheduler`, keeps per-status counts of BotSessions and, when a keepalive
    timeout is configured, runs an asyncio task that closes BotSessions whose last
    announced deadline has passed.
    """

    def __init__(self, data_store, *, action_cache=None,
                 bot_session_keepalive_timeout=None, permissive_bot_session=None):
        """Create the bots interface.

        Args:
            data_store: job store handed to the :class:`Scheduler`.
            action_cache: optional action cache handed to the :class:`Scheduler`.
            bot_session_keepalive_timeout: seconds after which a silent BotSession is
                reaped. A falsy value disables the reaper.
            permissive_bot_session: when truthy, an unknown or recently closed BotSession
                whose leases are all known to the scheduler is implicitly re-opened.

        Raises:
            InvalidArgumentError: if ``bot_session_keepalive_timeout`` is negative.
        """
        self.__logger = logging.getLogger(__name__)

        self._scheduler = Scheduler(data_store, action_cache)
        self._instance_name = None

        # Mapping of bot_session.name -> bot_session.bot_id
        self._bot_ids = {}

        # Mapping of BotStatus -> set(bot_session.name, ...)
        self._bot_name_by_status = {bot_status: set() for bot_status in BotStatus}
        self._bot_name_by_status_lock = Lock()

        # Mapping of bot_session.name -> set(lease.id, ...) of leases handed to that session
        self._assigned_leases = {}
        self._assigned_leases_lock = Lock()

        self._bot_session_keepalive_timeout = bot_session_keepalive_timeout
        self._permissive_bot_session = permissive_bot_session

        # Ordered mapping of bot_session_name: string -> last_expire_time_we_assigned: datetime
        # NOTE: This works because the bot_session_keepalive_timeout is the same for all bots
        # and thus always increases with time (e.g. inserting at the end keeps them sorted because
        # of this property, otherwise we may have had to insert 'in the middle')
        self._ordered_expire_times_by_botsession = OrderedDict()
        # The "minimum" expire_time we have coming up
        self._next_expire_time = None
        # Event set whenever `_next_expire_time` changes, so the reaper re-evaluates how long
        # to sleep (mostly useful when a nearer deadline appears than the one it waits on).
        # asyncio.Event is NOT thread-safe: it is only ever set via
        # `_notify_reaper_of_deadline_change` and cleared from the reaper coroutine.
        self._deadline_event = asyncio.Event()
        # Event loop (and task) running the reaper; populated by `_setup_bot_session_reaper_loop`.
        self._reaper_loop = None
        self._reaper_task = None

        # Remembering the last n evicted_bot_sessions so that we can present the appropriate
        # messages if they ever get back. (See additional notes in `_close_bot_session`).
        self._remember_last_n_evicted_bot_sessions = 1000
        # Maps bot_session_name: string to (eviction_time: datetime, reason: string), with a maximum
        # size of approx `_remember_last_n_evicted_bot_sessions`.
        self._evicted_bot_sessions = OrderedDict()

        # Start the reaper only once every attribute it reads has been initialised.
        bot_session_reaper_started = self._setup_bot_session_reaper_loop()
        if bot_session_reaper_started and permissive_bot_session:
            self.__logger.warning(
                "Both BotSession reaper and Permissive BotSession mode are enabled. "
                "If the DNS configuration is not resulting in 'sticky sessions' with "
                "bots talking to the same BuildGrid process (unless unhealthy), "
                "the BotSession reaper from other processes may cancel and re-queue "
                "ongoing leases. Please refer to the documentation for more information."
            )

    # --- Public API ---

    @property
    def instance_name(self):
        """Name this interface was registered under, or None before registration."""
        return self._instance_name

    @property
    def scheduler(self):
        """The :class:`Scheduler` this interface requests and updates leases through."""
        return self._scheduler

    def register_instance_with_server(self, instance_name, server):
        """Names and registers the bots interface with a given server.

        Raises:
            AssertionError: if this interface has already been registered.
        """
        if self._instance_name is not None:
            raise AssertionError("Instance already registered")

        server.add_bots_interface(self, instance_name)
        self._instance_name = instance_name

    def create_bot_session(self, parent, bot_session):
        """ Creates a new bot session. Server should assign a unique
        name to the session. If a bot with the same bot id tries to
        register with the service, the old one should be closed along
        with all its jobs.

        Args:
            parent: prefix for the generated BotSession name (``{parent}/{uuid4}``).
            bot_session: the client's BotSession message; it is filled in and returned.

        Raises:
            InvalidArgumentError: if the client did not set ``bot_id``.
            BotSessionClosedError: if the session was closed while leases were assigned.
        """
        if not bot_session.bot_id:
            raise InvalidArgumentError("Bot's id must be set by client.")

        try:
            self._check_bot_ids(bot_session.bot_id)
        except DuplicateBotSessionError:
            # The previous BotSession for this bot_id has just been closed; carry on.
            pass

        # Bot session name, selected by the server
        name = f"{parent}/{str(uuid.uuid4())}"
        bot_session.name = name

        self._track_bot_session(name, bot_session.bot_id)

        self._request_leases(bot_session, name=name)
        self._assign_deadline_for_botsession(bot_session, name)

        self.__logger.info(
            f"Opened BotSession name=[{bot_session.name}] for bot_id=[{bot_session.bot_id}].")
        leases = ",".join(lease.id[:8] for lease in bot_session.leases)
        self.__logger.debug(f"Leases assigned to newly opened BotSession name=[{bot_session.name}] "
                            f"for bot_id=[{bot_session.bot_id}]: [{leases}].")

        self._update_status_count_for_bot_session(bot_session)
        return bot_session

    def _track_bot_session(self, bot_session_name, bot_id, leases=()):
        """Start tracking a BotSession name -> bot_id and its assigned lease ids."""
        self.__logger.debug(
            f"Now tracking BotSession name=[{bot_session_name}] "
            f"for bot_id=[{bot_id}] with leases=[{leases}]")

        self._bot_ids[bot_session_name] = bot_id
        # We want to keep a copy of lease ids we have assigned
        leases_set = set(leases)
        with self._assigned_leases_lock:
            self._assigned_leases[bot_session_name] = leases_set

    def _check_bot_leases_exist_in_backend_or_raise(self, bot_session):
        """Verify every lease reported by a relocating bot is known to the scheduler.

        Raises:
            UnknownBotSessionError: if the BotSession carries no leases, or if any of its
                leases is unknown to the scheduler.
        """
        if bot_session and bot_session.leases:
            for client_lease in bot_session.leases:
                try:
                    # Raises NotFoundError if the scheduler has no record of the lease.
                    self._scheduler.get_job_lease(client_lease.id)
                except NotFoundError:
                    raise UnknownBotSessionError(
                        f"Could not verify lease accuracy for relocating bot with "
                        f"bot_name=[{bot_session.name}] and bot_id=[{bot_session.bot_id}]: "
                        f"no record for lease id=[{client_lease.id}].")
        else:
            # We don't have a record of any leases for this botsession, just reject
            raise UnknownBotSessionError(
                f"Relocating bot with bot_name=[{bot_session.name}] and "
                f"bot_id=[{bot_session.bot_id}] doesn't have any leases thus "
                f"rejecting the request.")

    def update_bot_session(self, name, bot_session, deadline=None):
        """ Client updates the server. Any changes in state to the Lease should be
        registered server side. Assigns available leases with work.

        Args:
            name: the BotSession name from the request.
            bot_session: the client's BotSession message; it is updated and returned.
            deadline: optional client deadline in seconds, used to bound long-polling.

        Raises:
            UnknownBotSessionError / BotSessionClosedError: if the session is not tracked
                and permissive mode is disabled (or the relocation check fails).
            BotSessionMismatchError: if ``bot_id`` does not match our record for ``name``.
        """
        try:
            self._check_bot_ids(bot_session.bot_id, name)
        except (UnknownBotSessionError, BotSessionClosedError):
            if self._permissive_bot_session:
                # If in permissive mode, implicitly reopen botsession
                self.__logger.info(
                    f"BotSession with bot_name=[{bot_session.name}] and bot_id=[{bot_session.bot_id}] "
                    f"is now talking to this server instance. Confirming lease accuracy..."
                )

                self._check_bot_leases_exist_in_backend_or_raise(bot_session)

                lease_ids = [lease.id for lease in bot_session.leases]
                self._track_bot_session(name, bot_session.bot_id, lease_ids)

                self.__logger.info(
                    f"Successfully relocated BotSession with bot_name=[{bot_session.name}] "
                    f"and bot_id=[{bot_session.bot_id}] with leases=[{lease_ids}].")
            else:
                # Default behavior is to raise those exceptions
                # and handle them accordingly at a higher level
                self.__logger.warning(
                    f"Unknown bot with bot_name=[{bot_session.name}] and "
                    f"bot_id=[{bot_session.bot_id}]; permissive BotSession mode disabled.")
                raise

        self._check_assigned_leases(bot_session)

        # Stop tracking the prior deadline since we have heard back
        # by the deadline we had announced, now we're going to prepare
        # a new BotSession for the bot and once done assign a new deadline.
        self._untrack_deadline_for_botsession(bot_session.name)

        for lease in list(bot_session.leases):
            checked_lease = self._check_lease_state(lease)
            if not checked_lease:
                # The lease is finished (completed/cancelled/unknown): forget it and let the
                # scheduler drop it. `discard` because the bot may report a lease we never
                # recorded for this session.
                with self._assigned_leases_lock:
                    if name in self._assigned_leases:
                        self._assigned_leases[name].discard(lease.id)
                try:
                    self._scheduler.delete_job_lease(lease.id)
                except NotFoundError:
                    # Job already dropped from scheduler
                    pass

                bot_session.leases.remove(lease)

        self._request_leases(bot_session, deadline, name)
        # Assign a new deadline to the BotSession
        self._assign_deadline_for_botsession(bot_session, name)

        leases = ",".join(lease.id[:8] for lease in bot_session.leases)
        self.__logger.debug(f"Sending BotSession update for name=[{bot_session.name}], "
                            f"bot_id=[{bot_session.bot_id}]: leases=[{leases}].")

        self._update_status_count_for_bot_session(bot_session)
        return bot_session

    def count_bots(self) -> int:
        """Number of BotSessions currently tracked."""
        return len(self._bot_ids)

    def count_bots_by_status(self, status: "BotStatus") -> int:
        """Number of tracked BotSessions last seen with ``status``."""
        return len(self._bot_name_by_status[status])

    # --- Private API ---
    def _request_leases(self, bot_session, deadline=None, name=None):
        """Ask the scheduler for work when the bot is OK and holds no leases.

        Only one lease is handed out at a time. Capabilities are read from the
        primary (first) device's properties. If the BotSession was closed while
        leases were being obtained, they are re-queued and BotSessionClosedError
        is raised.
        """
        if bot_session.status == BotStatus.OK.value and not bot_session.leases:
            worker_capabilities = {}

            # TODO? Fail if there are no devices in the worker?
            if bot_session.worker.devices:
                # According to the spec:
                #   "The first device in the worker is the "primary device" -
                #   that is, the device running a bot and which is
                #   responsible for actually executing commands."
                primary_device = bot_session.worker.devices[0]

                for device_property in primary_device.properties:
                    worker_capabilities.setdefault(device_property.key, set()).add(device_property.value)
                self.__logger.debug(f"New Workers Capabilities: {worker_capabilities}")

            # If the client specified deadline is less than NETWORK_TIMEOUT,
            # the response shouldn't long poll for work.
            if deadline and (deadline > NETWORK_TIMEOUT):
                deadline = deadline - NETWORK_TIMEOUT
            else:
                deadline = None

            leases = self._scheduler.request_job_leases(
                worker_capabilities,
                timeout=deadline,
                worker_name=name,
                bot_id=self._bot_ids.get(name))

            if leases:
                with self._assigned_leases_lock:
                    if bot_session.name in self._assigned_leases:
                        for lease in leases:
                            self._assigned_leases[bot_session.name].add(lease.id)
                        bot_session.leases.extend(leases)
                    else:
                        # The BotSession may no longer exist, make sure the leases are re-queued!
                        for lease in leases:
                            self._scheduler.retry_job_lease(lease.id)
                        self.__logger.info(f'BotSession name=[{name}] closed while trying to assign leases. '
                                           f'Re-queued n=[{len(leases)}] leases=[{leases}].')
                        raise BotSessionClosedError('BotSession closed while assigning leases. Re-queued leases.')

    def _check_lease_state(self, lease):
        """Reconcile a bot-reported lease with the scheduler.

        Returns:
            The lease to keep on the BotSession (possibly marked CANCELLED so the bot
            learns about a server-side cancellation), or None when it should be dropped:
            the bot reported it CANCELLED or COMPLETED, or the job no longer exists.
        """
        # careful here
        # should store bot name in scheduler
        lease_state = LeaseState(lease.state)

        # Lease has replied with cancelled, remove
        if lease_state == LeaseState.CANCELLED:
            return None

        try:
            if self._scheduler.get_job_lease_cancelled(lease.id):
                lease.state = LeaseState.CANCELLED.value
                return lease
        except (KeyError, NotFoundError):
            # Job does not exist, remove from bot.
            return None

        self._scheduler.update_job_lease_state(lease.id, lease)

        if lease_state == LeaseState.COMPLETED:
            return None

        return lease

    def _get_bot_id_from_bot_name_or_raise(self, name):
        """ Returns the bot_id corresponding to the passed `name`.
        Raises BotSessionClosedError if the botsession was recently closed.
        Raises UnknownBotSessionError if there is no such known BotSession.
        """
        bot_id = self._bot_ids.get(name)
        if bot_id is None:
            eviction_record = self._evicted_bot_sessions.get(name)
            if eviction_record:
                raise BotSessionClosedError(f'Server has recently evicted the BotSession name=[{name}] at '
                                            f'timestamp=[{eviction_record[0]}], reason=[{eviction_record[1]}]')
            raise UnknownBotSessionError('Unknown BotSession. BuildGrid has not seen a '
                                         f'BotSession with name=[{name}] recently.')
        return bot_id

    def _check_bot_ids(self, bot_id, name=None):
        """ Checks whether the ID and the name of the bot match,
        otherwise closes the bot sessions with that name or ID.

        Raises:
            BotSessionMismatchError: ``name`` is tracked under a different bot_id.
            DuplicateBotSessionError: (no ``name``) a session for ``bot_id`` already existed.
            BotSessionClosedError / UnknownBotSessionError: ``name`` is not tracked.
        """
        if name is not None:
            _bot_id = self._get_bot_id_from_bot_name_or_raise(name)
            if _bot_id != bot_id:
                self._close_bot_session(name, reason="bot_id mismatch between worker and bgd")
                raise BotSessionMismatchError(
                    f'Mismatch between client supplied client_bot_id=[{bot_id}] and '
                    f'buildgrid record of bgd_bot_id=[{_bot_id}] for BotSession with name=[{name}].')
        else:
            # Iterate over a snapshot: other RPC threads may add/remove sessions meanwhile.
            for _name, _bot_id in list(self._bot_ids.items()):
                if bot_id == _bot_id:
                    self._close_bot_session(_name,
                                            reason="Bot with same ID trying to create a new BotSession")
                    raise DuplicateBotSessionError(
                        f'Bot ID bot_id=[{bot_id}] already registered and given bgd_bot_name=[{_name}].')

    def _assign_deadline_for_botsession(self, bot_session, bot_session_name):
        """ Assigns a deadline to the BotSession if bgd was configured to do so
        """
        # Specify bot keepalive expiry time if timeout is set
        if self._bot_session_keepalive_timeout:
            # Calculate expire time
            expire_time_python = datetime.utcnow() + timedelta(seconds=self._bot_session_keepalive_timeout)

            # Set it in the bot_session
            bot_session.expire_time.FromDatetime(expire_time_python)

            # Keep track internally for the botsession reaper
            self._track_deadline_for_bot_session(bot_session_name, expire_time_python)

    def _untrack_deadline_for_botsession(self, bot_session_name):
        """ Un-assigns the session reaper tracked deadline of the BotSession
        if bgd was configured to do so
        """
        # Only tracked when a keepalive timeout is configured
        if self._bot_session_keepalive_timeout:
            self._track_deadline_for_bot_session(bot_session_name, None)

    def _track_deadline_for_bot_session(self, bot_session_name, new_deadline):
        """ Updates the data structures keeping track of the last deadline
        we had assigned to this BotSession by name.
        When `new_deadline` is set to None, the deadline is unassigned.
        """
        # Keep track of the next expire time to inform the watcher
        updated_next_expire_time = False

        if new_deadline:
            # Since we're re-setting the update time for this bot, make sure to move it
            # to the end of the OrderedDict (if it was already tracked in the OrderedDict)
            try:
                self._ordered_expire_times_by_botsession.move_to_end(bot_session_name)
            except KeyError:
                pass

            self._ordered_expire_times_by_botsession[bot_session_name] = new_deadline
            updated_next_expire_time = True
        elif self._ordered_expire_times_by_botsession.pop(bot_session_name, None) is not None:
            updated_next_expire_time = True
        else:
            self.__logger.debug("Tried to un-assign deadline for bot_session_name="
                                f"[{bot_session_name}] but it had no deadline to begin with.")

        # Make the botsession reaper thread look at the current new_deadline
        # (if it's nearer in the future) compared to the previously known `next_expire_time`.
        if updated_next_expire_time:
            if self._update_next_expire_time(compare_to=new_deadline):
                self._notify_reaper_of_deadline_change()

    def _notify_reaper_of_deadline_change(self):
        """Set `_deadline_event` so the reaper re-reads `_next_expire_time`.

        This is reached from RPC thread-pool threads, and asyncio.Event is not
        thread-safe, so when a reaper loop exists the `set()` is scheduled on that
        loop with `call_soon_threadsafe` instead of being called directly.
        """
        loop = self._reaper_loop
        if loop is None:
            # No reaper task was scheduled, so nothing waits on the event.
            self._deadline_event.set()
            return
        try:
            loop.call_soon_threadsafe(self._deadline_event.set)
        except RuntimeError:
            # The loop has been closed (e.g. during shutdown): the reaper is gone.
            self.__logger.debug("BotSession reaper event loop is closed; skipping deadline notification.")

    def _check_assigned_leases(self, bot_session):
        """Re-queue leases we assigned to this BotSession that the bot no longer reports.

        Such leases are retried once and then forgotten, so that later updates (or
        closing the session) do not retry a job that may belong to another bot by then.

        Raises:
            BotSessionClosedError: if the BotSession is no longer tracked.
        """
        session_lease_ids = {lease.id for lease in bot_session.leases}

        with self._assigned_leases_lock:
            if bot_session.name not in self._assigned_leases:
                raise BotSessionClosedError(f"BotSession name=[{bot_session.name}] for bot_id="
                                            f"[{bot_session.bot_id}] closed while checking leases.")

            assigned = self._assigned_leases[bot_session.name]
            missing_lease_ids = [lease_id for lease_id in assigned if lease_id not in session_lease_ids]
            for lease_id in missing_lease_ids:
                self.__logger.error(f"Assigned lease id=[{lease_id}], "
                                    f"not found on bot with name=[{bot_session.name}] and "
                                    f"id=[{bot_session.bot_id}]. Retrying job.")
                try:
                    self._scheduler.retry_job_lease(lease_id)
                except NotFoundError:
                    pass
                assigned.discard(lease_id)

    def _truncate_eviction_history(self):
        """Drop the oldest eviction records so that, once the caller inserts its own
        record, at most `_remember_last_n_evicted_bot_sessions` remain.

        NOTE: this is deliberately lock-free. Several threads may check the size at the
        same time before inserting, so the OrderedDict can briefly exceed the limit by
        roughly the number of concurrently closing threads; the next caller cleans up.
        Locking every eviction was judged more costly than a few extra records.
        """
        limit = self._remember_last_n_evicted_bot_sessions
        while self._evicted_bot_sessions and len(self._evicted_bot_sessions) >= limit:
            self._evicted_bot_sessions.popitem(last=False)

    def _close_bot_session(self, name, *, reason):
        """ Before removing the session, close any leases and
        requeue with high priority.

        Stops tracking the session's deadline, retries its assigned leases, records the
        eviction (so a returning bot gets an informative error) and forgets the session.
        Closing a session that is no longer tracked logs a warning and reports
        ``bot_id='unknown'`` instead of raising.
        """
        # If we had assigned an expire_time for this botsession, make sure to
        # clean up, regardless of the reason we end up closing this BotSession
        self._untrack_deadline_for_botsession(name)

        retried_leases = 0
        total_leases = 0
        with self._assigned_leases_lock:
            lease_ids = self._assigned_leases.pop(name, None)
            if lease_ids is not None:
                total_leases = len(lease_ids)
                for lease_id in lease_ids:
                    try:
                        self._scheduler.retry_job_lease(lease_id)
                    except NotFoundError:
                        pass
                    else:
                        retried_leases += 1

        self._truncate_eviction_history()
        # Record this eviction
        self._evicted_bot_sessions[name] = (datetime.utcnow(), reason)

        bot_id = self._bot_ids.pop(name, None)
        if bot_id is None:
            self.__logger.warning('Unable to identify `bot_id` associated with BotSession '
                                  f'while closing the BotSession name=[{name}]: it is not tracked.')
            bot_id = 'unknown'

        self._clear_status_count_for_bot_name(name)

        self.__logger.info(f'Closed BotSession bot_id=[{bot_id}], name=[{name}], reason=[{reason}] '
                           f'and sucessfully requeued [{retried_leases}]/[{total_leases}] leases.')

    def _update_next_expire_time(self, compare_to=None):
        """
        If we don't have any more bot_session deadlines, clear out this variable
        to avoid busy-waiting. Otherwise, populate it with the next known expiry time
        either from the queue or by comparing to the optional argument `compare_to`.
        This method returns True/False indicating whether the `next_expire_time`
        was updated.
        """
        if compare_to:
            # If we pass in a time earlier than the already known `next_expire_time`
            # or this is the first expire time we know of... set it to `compare_to`

            # NOTE: This is racy by design: concurrent threads may each store their own
            # `compare_to`. Calls made around the same time carry expiry times that differ
            # only by a small `delta`, so at worst the reaper wakes `delta` late and then
            # quickly closes every session that has expired since. Avoiding a lock on
            # every RPC is worth this "lazy" approximation.
            if not self._next_expire_time or compare_to < self._next_expire_time:
                self._next_expire_time = compare_to
                return True
        else:
            _, next_expire_time_in_queue = self._get_next_botsession_expiry()
            # It is likely that the expire time we knew of is no longer in the OrderedDict
            # (e.g. we assigned a new one to that BotSession), thus this could be either
            # before or after the previously known `next_expire_time`
            if self._next_expire_time != next_expire_time_in_queue:
                self._next_expire_time = next_expire_time_in_queue
                return True

        return False

    def _next_expire_time_occurs_in(self):
        """Seconds until `_next_expire_time` (padded by 0.1s), 0 if it already passed,
        or None if no deadline is tracked."""
        if self._next_expire_time:
            next_expire_time = (self._next_expire_time - datetime.utcnow()).total_seconds()
            # Check if this is in the future (> 0, negative values means expiry happened already!)
            if next_expire_time > 0:
                # Pad this with 0.1 second so that the expiry actually happens when we try to reap
                return round(next_expire_time + 0.1, 3)
            return 0

        return None

    def _get_next_botsession_expiry(self):
        """Peek at the BotSession with the earliest tracked deadline.

        Returns:
            ``(bot_session_name, expire_time)`` for the first entry of
            `_ordered_expire_times_by_botsession`, or ``(None, None)`` when it is empty.
        """
        # A true, non-mutating peek. Popping the head and re-inserting it would let an RPC
        # thread that re-tracks the same BotSession in between have its newer deadline
        # overwritten by the stale one (and the live session reaped).
        while True:
            try:
                return next(iter(self._ordered_expire_times_by_botsession.items()), (None, None))
            except RuntimeError:
                # Another thread mutated the OrderedDict between creating the iterator and
                # reading its first entry; retry with a fresh iterator.
                continue

    def _reap_next_expired_session(self):
        """Close the earliest-deadline BotSession if its deadline has passed, then
        refresh `_next_expire_time` from the queue."""
        self.__logger.debug("Checking for next BotSession to reap...")
        next_botsession_name_to_expire, next_botsession_expire_time = self._get_next_botsession_expiry()

        if next_botsession_expire_time and next_botsession_expire_time <= datetime.utcnow():
            # This is the last deadline we have communicated with this bot...
            # It has expired.
            # If there is no bot_id -> bot_name mapping anymore, Bot may have opened a new BotSession
            bot_id = self._bot_ids.get(next_botsession_name_to_expire)

            self.__logger.warning(
                f"BotSession name=[{next_botsession_name_to_expire}] for bot_id=[{bot_id}] "
                f"with deadline=[{next_botsession_expire_time}] has expired.")
            try:
                self._close_bot_session(next_botsession_name_to_expire, reason="expired")
            except BotSessionClosedError:
                self.__logger.warning(
                    f"Expired BotSession name=[{next_botsession_name_to_expire}] "
                    f"for bot_id=[{self._bot_ids.get(next_botsession_name_to_expire)}] "
                    f"with deadline=[{next_botsession_expire_time}] was already closed.")

        # Always resynchronise with the queue, so a stale past `_next_expire_time`
        # cannot make the reaper spin with a zero timeout.
        self._update_next_expire_time()

    async def _reap_expired_sessions_loop(self):
        """Reaper coroutine: sleep until the next deadline (or a deadline-change event),
        then reap at most one expired BotSession, forever."""
        try:
            self.__logger.info(
                "Starting BotSession reaper, bot_session_keepalive_timeout="
                f"[{self._bot_session_keepalive_timeout}].")
            while True:
                try:
                    # None: nothing tracked (wait for an event); 0: something already expired
                    expires_in = self._next_expire_time_occurs_in()
                    if expires_in is None:
                        self.__logger.debug("No more BotSessions to watch for expiry, waiting for new BotSessions.")
                    else:
                        self.__logger.debug(
                            f"Waiting for an event indicating earlier expiry or wait=[{expires_in}] "
                            "for the next BotSession to expire.")
                    await asyncio.wait_for(self._deadline_event.wait(), timeout=expires_in)
                    self._deadline_event.clear()
                except asyncio.TimeoutError:
                    pass

                self._reap_next_expired_session()
        except asyncio.CancelledError:
            self.__logger.info("Cancelled reaper task.")
        except Exception as exception:
            self.__logger.exception(exception)
            raise

    def _setup_bot_session_reaper_loop(self):
        """Schedule the reaper on the current event loop when a keepalive timeout is set.

        Returns:
            True if the reaper was scheduled, False if no timeout is configured.

        Raises:
            InvalidArgumentError: if the configured timeout is negative.
        """
        if self._bot_session_keepalive_timeout:
            if self._bot_session_keepalive_timeout <= 0:
                raise InvalidArgumentError(
                    f"[bot_session_keepalive_timeout] set to [{self._bot_session_keepalive_timeout}], "
                    "must be > 0, in seconds")

            # Add the expired session reaper in the event loop. Keep references to both the
            # loop (for thread-safe notifications) and the task (the loop only holds it weakly).
            main_loop = asyncio.get_event_loop()
            self._reaper_loop = main_loop
            self._reaper_task = main_loop.create_task(self._reap_expired_sessions_loop())
            return True
        return False

    def _clear_status_count_for_bot_name(self, bot_name: str) -> None:
        """Remove ``bot_name`` from every per-status set."""
        with self._bot_name_by_status_lock:
            for status in self._bot_name_by_status:
                self._bot_name_by_status[status].discard(bot_name)

    def _update_status_count_for_bot_session(self, bot_session: "bots_pb2.BotSession") -> None:
        """Record the BotSession under its current status (and under no other status)."""
        bot_name = bot_session.name
        bot_status = BotStatus(bot_session.status)

        # Clear and re-add under one lock acquisition so the bot is never briefly uncounted.
        with self._bot_name_by_status_lock:
            for bot_names in self._bot_name_by_status.values():
                bot_names.discard(bot_name)
            try:
                self._bot_name_by_status[bot_status].add(bot_name)
            except KeyError:
                # We are not tracking this bot status, this will be a no-op
                pass
640 pass