Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/bots/instance.py: 86.43%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

339 statements  

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17BotsInterface 

18================= 

19 

20Instance of the Remote Workers interface. 

21""" 

22import asyncio 

23from collections import OrderedDict 

24from datetime import datetime, timedelta 

25import logging 

26from threading import Lock 

27from typing import Dict, Optional, Set, Tuple 

28import uuid 

29 

30from buildgrid._enums import BotStatus, LeaseState 

31from buildgrid._exceptions import ( 

32 InvalidArgumentError, NotFoundError, BotSessionClosedError, 

33 UnknownBotSessionError, BotSessionMismatchError, DuplicateBotSessionError 

34) 

35from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2 

36from buildgrid.server.metrics_names import ( 

37 BOTS_CREATE_BOT_SESSION_TIME_METRIC_NAME, 

38 BOTS_UPDATE_BOT_SESSION_TIME_METRIC_NAME 

39) 

40from buildgrid.server.metrics_utils import DurationMetric 

41from buildgrid.server.persistence.interface import DataStoreInterface 

42from buildgrid.server.scheduler import Scheduler 

43from buildgrid.settings import NETWORK_TIMEOUT 

44 

45 

class BotsInterface:
    """Server-side instance of the Remote Workers ``Bots`` interface.

    Keeps track of the BotSessions opened against this process, exchanges
    leases between the scheduler and the bots, and, when a keepalive timeout
    is configured, reaps BotSessions that did not report back in time.
    """

    def __init__(self, data_store: DataStoreInterface, *, action_cache=None,
                 bot_session_keepalive_timeout: Optional[int]=None,
                 permissive_bot_session: Optional[bool]=None,
                 logstream_url: Optional[str]=None,
                 logstream_credentials=None,
                 logstream_instance_name: Optional[str]=None):
        """Builds the scheduler and the BotSession bookkeeping structures.

        Args:
            data_store: persistence backend handed to the ``Scheduler``.
            action_cache: optional action cache handed to the ``Scheduler``.
            bot_session_keepalive_timeout: seconds a bot has to report back
                before its session is reaped; a falsy value disables the reaper.
            permissive_bot_session: when truthy, ``update_bot_session`` adopts
                sessions this process does not know about instead of failing.
            logstream_url: forwarded to the ``Scheduler``.
            logstream_credentials: forwarded to the ``Scheduler``.
            logstream_instance_name: forwarded to the ``Scheduler``.

        Raises:
            InvalidArgumentError: if the keepalive timeout is negative.
        """
        self.__logger = logging.getLogger(__name__)

        self._scheduler = Scheduler(data_store, action_cache,
                                    logstream_url=logstream_url,
                                    logstream_credentials=logstream_credentials,
                                    logstream_instance_name=logstream_instance_name)
        self._instance_name = None

        # Mapping of bot_session.name -> bot_session.bot_id
        self._bot_ids: Dict[str, str] = {}

        # Mapping of bot_session.status -> set(bot_session.name, ...)
        self._bot_name_by_status: Dict[BotStatus, Set[str]] = {bot_status: set() for bot_status in BotStatus}
        self._bot_name_by_status_lock = Lock()

        # Mapping of bot_session.name -> set(lease.id, ...) currently assigned to that session
        self._assigned_leases: Dict[str, Set[str]] = {}
        self._assigned_leases_lock = Lock()

        self._bot_session_keepalive_timeout = bot_session_keepalive_timeout
        # Only schedules the reaper coroutine on the event loop; the structures
        # it reads are initialised further down in this constructor.
        bot_session_reaper_started = self._setup_bot_session_reaper_loop()

        self._permissive_bot_session = permissive_bot_session
        if bot_session_reaper_started and permissive_bot_session:
            self.__logger.warning(
                "Both BotSession reaper and Permissive BotSession mode are enabled. "
                "If the DNS configuration is not resulting in 'sticky sessions' with "
                "bots talking to the same BuildGrid process (unless unhealthy), "
                "the BotSession reaper from other processes may cancel and re-queue "
                "ongoing leases. Please refer to the documentation for more information."
            )

        # Ordered mapping of bot_session_name: string -> last_expire_time_we_assigned: datetime
        # NOTE: This works because the bot_session_keepalive_timeout is the same for all bots
        # and thus always increases with time (e.g. inserting at the end keeps them sorted because
        # of this property, otherwise we may have had to insert 'in the middle')
        self._ordered_expire_times_by_botsession: Dict[str, datetime] = OrderedDict()
        # The "minimum" expire_time we have coming up
        self._next_expire_time = None
        # The Event to set when we learn about a new expire time that is at a different point in the
        # future than what we knew (e.g. whenever we reset the value of self._next_expire_time)
        # This is mostly useful when we end up with a `next_expire_time` closer to the future than we
        # initially thought (e.g. tracking the first BotSession expiry since all BotSessions are assigned
        # the same keepalive_timeout).
        # NOTE: asyncio.Event() is NOT thread-safe.
        # However, here we .set() it from the ThreadPool threads handling RPC requests
        # and only clear it from the asyncio event loop which runs the `reaper_loop`.
        self._deadline_event = asyncio.Event()

        # Remembering the last n evicted_bot_sessions so that we can present the appropriate
        # messages if they ever get back. (See additional notes in `_close_bot_session`).
        self._remember_last_n_evicted_bot_sessions = 1000
        # Maps bot_session_name: string to (eviction_time: datetime, reason: string), with a maximum size
        # of approx `_remember_last_n_evicted_bot_sessions`.
        self._evicted_bot_sessions: Dict[str, Tuple[datetime, str]] = OrderedDict()

    # --- Public API ---

    @property
    def instance_name(self):
        """Name this instance was registered under, or None if unregistered."""
        return self._instance_name

    @property
    def scheduler(self):
        """The ``Scheduler`` created by this interface."""
        return self._scheduler

    def setup_grpc(self):
        """Delegates gRPC setup to the scheduler."""
        self._scheduler.setup_grpc()

    def register_instance_with_server(self, instance_name, server):
        """Names and registers the bots interface with a given server.

        Raises:
            AssertionError: if this interface has already been registered.
        """
        if self._instance_name is not None:
            raise AssertionError("Instance already registered")

        server.add_bots_interface(self, instance_name)

        self._instance_name = instance_name
        if self._scheduler is not None:
            self._scheduler.set_instance_name(instance_name)

    @DurationMetric(BOTS_CREATE_BOT_SESSION_TIME_METRIC_NAME, instanced=True)
    def create_bot_session(self, parent, bot_session):
        """ Creates a new bot session. Server should assign a unique
        name to the session. If a bot with the same bot id tries to
        register with the service, the old one should be closed along
        with all its jobs.

        Returns the same `bot_session` message, updated in place with its
        server-assigned name, any leases handed out and (if configured)
        its expire time.

        Raises InvalidArgumentError if `bot_session.bot_id` is empty.
        """
        if not bot_session.bot_id:
            raise InvalidArgumentError("Bot's id must be set by client.")

        try:
            self._check_bot_ids(bot_session.bot_id)
        except DuplicateBotSessionError:
            # `_check_bot_ids` has already closed the older session with this
            # bot_id before raising, so we can carry on opening the new one.
            pass

        # Bot session name, selected by the server
        name = f"{parent}/{str(uuid.uuid4())}"
        bot_session.name = name

        self._track_bot_session(name, bot_session.bot_id)

        self._request_leases(bot_session, name=name)
        self._assign_deadline_for_botsession(bot_session, name)

        self.__logger.info(
            f"Opened BotSession name=[{bot_session.name}] for bot_id=[{bot_session.bot_id}].")
        leases = ",".join(lease.id[:8] for lease in bot_session.leases)
        self.__logger.debug(f"Leases assigned to newly opened BotSession name=[{bot_session.name}] "
                            f"for bot_id=[{bot_session.bot_id}]: [{leases}].")

        self._update_status_count_for_bot_session(bot_session)
        return bot_session

    def _track_bot_session(self, bot_session_name, bot_id, leases=()):
        """Records the name -> bot_id mapping and the lease ids owned by the session.

        Any lease set previously recorded under `bot_session_name` is replaced.
        """
        self.__logger.debug(
            f"Now tracking BotSession name=[{bot_session_name}] "
            f"for bot_id=[{bot_id}] with leases=[{leases}]")

        self._bot_ids[bot_session_name] = bot_id
        # We want to keep a copy of lease ids we have assigned
        leases_set = set(leases)
        with self._assigned_leases_lock:
            self._assigned_leases[bot_session_name] = leases_set

    @DurationMetric(BOTS_UPDATE_BOT_SESSION_TIME_METRIC_NAME, instanced=True)
    def update_bot_session(self, name, bot_session, deadline=None):
        """ Client updates the server. Any changes in state to the Lease should be
        registered server side. Assigns available leases with work.

        Returns a `(bot_session, metadata)` tuple, where `metadata` is what the
        scheduler reports for the leases left on the session.

        Unless permissive mode is enabled, UnknownBotSessionError and
        BotSessionClosedError from the name/bot_id check are re-raised;
        BotSessionMismatchError always propagates.
        """
        try:
            self._check_bot_ids(bot_session.bot_id, name)
        except (UnknownBotSessionError, BotSessionClosedError):
            if self._permissive_bot_session:
                # If in permissive mode, implicitly reopen botsession
                self.__logger.info(
                    f"BotSession with bot_name=[{bot_session.name}] and bot_id=[{bot_session.bot_id}] "
                    f"is now talking to this server instance. Confirming lease accuracy..."
                )

                lease_ids = [lease.id for lease in bot_session.leases]
                self._track_bot_session(name, bot_session.bot_id, lease_ids)

                self.__logger.info(
                    f"Successfully relocated BotSession with bot_name=[{bot_session.name}] "
                    f"and bot_id=[{bot_session.bot_id}] with leases=[{lease_ids}].")
            else:
                # Default behavior is to raise those exceptions
                # and handle them accordingly at a higher level
                self.__logger.warning(
                    f"Unknown bot with bot_name=[{bot_session.name}] and "
                    f"bot_id=[{bot_session.bot_id}]; permissive BotSession mode disabled.")
                raise

        self._check_assigned_leases(bot_session)

        # Stop tracking the prior deadline since we have heard back
        # by the deadline we had announced, now we're going to prepare
        # a new BotSession for the bot and once done assign a new deadline.
        self._untrack_deadline_for_botsession(bot_session.name)

        # Go over all the leases the bot sent us in this UpdateBotSession request
        # (iterating over a copy, since leases are removed/appended below)
        lease_removed = False
        for lease in list(bot_session.leases):
            # See if we need to tell the bot about a lease update (e.g. execution-service side cancellation)
            # and whether we need to update our datastore (e.g. bot finished the lease)
            lease_to_send_to_bot, update_datastore_from_lease = self._check_lease_state(lease)

            if update_datastore_from_lease:
                try:
                    self._scheduler.update_job_lease_state(lease.id, lease)
                except NotFoundError:
                    # Lease no longer exists or is already completed, be sure it gets removed from the bot
                    self.__logger.debug("Lease Id not found when updating job lease state, removing from bot.")
                    lease_to_send_to_bot = None

            if lease_to_send_to_bot:
                # Replace old lease with updated lease (e.g. communicating cancellation)
                # only if there are changes (otherwise the previous lease/state is kept)
                if lease_to_send_to_bot != lease:
                    bot_session.leases.remove(lease)
                    bot_session.leases.append(lease_to_send_to_bot)
            else:
                # We want to remove the lease from this bot and update our records accordingly
                # For example the bot has completed the lease (and results were stored above), or,
                # the bot had to cancel
                with self._assigned_leases_lock:
                    if name in self._assigned_leases:
                        try:
                            self._assigned_leases[name].remove(lease.id)
                            self.__logger.debug(f"Removed lease id=[{lease.id}] from bot=[{name}]")
                        except KeyError:
                            self.__logger.info(f"Lease id=[{lease.id}] already removed from bot=[{name}]")
                try:
                    self._scheduler.delete_job_lease(lease.id)
                except NotFoundError:
                    # Job already dropped from scheduler
                    pass
                except TimeoutError:
                    self.__logger.warning(f"Could not delete job lease_id=[{lease.id}] due to timeout.",
                                          exc_info=True)
                bot_session.leases.remove(lease)
                lease_removed = True

        # Don't set a deadline if leases were removed from this bot session
        #
        # Setting a "None" deadline has the effect of only assigning work if
        # it is available immediately, rather than waiting a period of time for
        # work to become available
        #
        # This mitigates situations where the scheduler is updated with the new
        # state of the lease, but a fault thereafter causes the worker to retry
        # the old UpdateBotSession call
        if lease_removed:
            deadline = None

        self._request_leases(bot_session, deadline, name)

        metadata = self._scheduler.get_metadata_for_leases(
            bot_session.leases, writeable_streams=True)

        # Assign a new deadline to the BotSession
        self._assign_deadline_for_botsession(bot_session, name)

        leases = ",".join(lease.id[:8] for lease in bot_session.leases)
        self.__logger.debug(f"Sending BotSession update for name=[{bot_session.name}], "
                            f"bot_id=[{bot_session.bot_id}]: leases=[{leases}].")

        self._update_status_count_for_bot_session(bot_session)
        return bot_session, metadata

    def count_bots(self) -> int:
        """Number of BotSessions currently tracked by this instance."""
        return len(self._bot_ids)

    def count_bots_by_status(self, status: BotStatus) -> int:
        """Number of tracked BotSession names last seen with `status`."""
        return len(self._bot_name_by_status[status])

    # --- Private API ---
    def _request_leases(self, bot_session, deadline=None, name=None):
        """Asks the scheduler for work if the bot is OK and holds no lease.

        Leases handed out are appended to `bot_session.leases` and recorded
        in `_assigned_leases`.

        Raises BotSessionClosedError (after re-queueing the leases) if the
        session stopped being tracked while the scheduler was queried.
        """
        # Only send one lease at a time currently.
        if bot_session.status == BotStatus.OK.value and not bot_session.leases:
            worker_capabilities = {}

            # TODO? Fail if there are no devices in the worker?
            if bot_session.worker.devices:
                # According to the spec:
                #   "The first device in the worker is the "primary device" -
                #   that is, the device running a bot and which is
                #   responsible for actually executing commands."
                primary_device = bot_session.worker.devices[0]

                for device_property in primary_device.properties:
                    worker_capabilities.setdefault(device_property.key, set()).add(device_property.value)
                # NOTE(review): the nesting level of this debug log could not be recovered
                # from the flattened source; confirm it belongs inside this branch.
                self.__logger.debug(f"New Workers Capabilities: {worker_capabilities}")

            # If the client specified deadline is less than NETWORK_TIMEOUT,
            # the response shouldn't long poll for work.
            if deadline and (deadline > NETWORK_TIMEOUT):
                deadline = deadline - NETWORK_TIMEOUT
            else:
                deadline = None

            leases = self._scheduler.request_job_leases(
                worker_capabilities,
                timeout=deadline,
                worker_name=name,
                bot_id=self._bot_ids.get(name))

            if leases:
                with self._assigned_leases_lock:
                    if bot_session.name in self._assigned_leases:
                        for lease in leases:
                            self._assigned_leases[bot_session.name].add(lease.id)
                        bot_session.leases.extend(leases)
                    else:
                        # The BotSession may no longer exist, make sure the leases are re-queued!
                        for lease in leases:
                            self._scheduler.retry_job_lease(lease.id)
                        self.__logger.info(f'BotSession name=[{name}] closed while trying to assign leases. '
                                           f'Re-queued n=[{len(leases)}] leases=[{leases}].')
                        raise BotSessionClosedError('BotSession closed while assigning leases. Re-queued leases.')

    # Returns a Tuple[Optional[Lease], bool]:
    #   (lease_to_send_to_bot, update_datastore_from_lease)
    def _check_lease_state(self, lease: bots_pb2.Lease) -> Tuple[Optional[bots_pb2.Lease], bool]:
        """Decides what to do with a lease reported by a bot.

        The first element is the lease to keep on the bot (None means "remove
        it from the bot"); the second tells the caller whether the scheduler
        should be updated from the bot-reported lease.
        """
        lease_state = LeaseState(lease.state)

        # Bot has updated lease to cancelled, remove lease;
        # don't keep any other lease updates
        if lease_state == LeaseState.CANCELLED:
            return (None, False)

        try:
            current_lease = self._scheduler.get_job_lease(lease.id)
            # get_job_lease will only return active leases in sql
            # data-store, so handle if no lease was returned
            if current_lease is None:
                return (None, False)
        except NotFoundError:
            # Job does not exist, remove lease from bot
            return (None, False)

        # If the lease was marked cancelled on the buildgrid side
        # inform the bot (update lease, no need to update bgd datastore)
        if current_lease.state == LeaseState.CANCELLED.value:
            lease.state = LeaseState.CANCELLED.value
            return (lease, False)

        # If the lease is already completed on the buildgrid side
        # remove it from the bot
        if current_lease.state == LeaseState.COMPLETED.value:
            return (None, False)

        # Bot finished the lease: store the result, then drop it from the bot
        if lease_state == LeaseState.COMPLETED:
            return (None, True)

        return (lease, True)

    def _get_bot_id_from_bot_name_or_raise(self, name):
        """ Returns the bot_id corresponding to the passed `name`.
        Raises BotSessionClosedError if the botsession was recently closed.
        Raises UnknownBotSessionError if there is no such known BotSession.
        """
        bot_id = self._bot_ids.get(name)
        if bot_id is None:
            eviction_record = self._evicted_bot_sessions.get(name)
            if eviction_record:
                raise BotSessionClosedError(f'Server has recently evicted the BotSession name=[{name}] at '
                                            f'timestamp=[{eviction_record[0]}], reason=[{eviction_record[1]}]')
            raise UnknownBotSessionError('Unknown BotSession. BuildGrid has not seen a '
                                         f'BotSession with name=[{name}] recently.')
        return bot_id

    def _check_bot_ids(self, bot_id, name=None):
        """ Checks whether the ID and the name of the bot match,
        otherwise closes the bot sessions with that name or ID

        With `name` given: raises BotSessionMismatchError if the recorded
        bot_id differs (plus whatever the name lookup raises).
        Without `name`: raises DuplicateBotSessionError if some tracked
        session already uses `bot_id`.
        """
        if name is not None:
            _bot_id = self._get_bot_id_from_bot_name_or_raise(name)
            if _bot_id != bot_id:
                self._close_bot_session(name, reason="bot_id mismatch between worker and bgd")
                raise BotSessionMismatchError(
                    f'Mismatch between client supplied client_bot_id=[{bot_id}] and '
                    f'buildgrid record of bgd_bot_id=[{_bot_id}] for BotSession with name=[{name}].')
        else:
            # Iterate over a snapshot: `_bot_ids` is mutated by other RPC threads
            # (and by `_close_bot_session` below), and iterating a dict whose size
            # changes raises RuntimeError.
            for _name, _bot_id in list(self._bot_ids.items()):
                if bot_id == _bot_id:
                    self._close_bot_session(_name,
                                            reason="Bot with same ID trying to create a new BotSession")
                    raise DuplicateBotSessionError(
                        f'Bot ID bot_id=[{bot_id}] already registered and given bgd_bot_name=[{_name}].')

    def _assign_deadline_for_botsession(self, bot_session, bot_session_name):
        """ Assigns a deadline to the BotSession if bgd was configured to do so
        """
        # Specify bot keepalive expiry time if timeout is set
        if self._bot_session_keepalive_timeout:
            # Calculate expire time
            expire_time_python = datetime.utcnow() + timedelta(seconds=self._bot_session_keepalive_timeout)

            # Set it in the bot_session
            bot_session.expire_time.FromDatetime(expire_time_python)

            # Keep track internally for the botsession reaper
            self._track_deadline_for_bot_session(bot_session_name, expire_time_python)

    def _untrack_deadline_for_botsession(self, bot_session_name):
        """ Un-assigns the session reaper tracked deadline of the BotSession
        if bgd was configured to do so
        """
        # Nothing is tracked unless a keepalive timeout is set
        if self._bot_session_keepalive_timeout:
            self._track_deadline_for_bot_session(bot_session_name, None)

    def _track_deadline_for_bot_session(self, bot_session_name, new_deadline):
        """ Updates the data structures keeping track of the last deadline
        we had assigned to this BotSession by name.
        When `new_deadline` is set to None, the deadline is unassigned.
        """
        # Keep track of the next expire time to inform the watcher
        updated_next_expire_time = False

        if new_deadline:
            # Since we're re-setting the update time for this bot, make sure to move it
            # to the end of the OrderedDict (if it was already tracked in the OrderedDict)
            try:
                self._ordered_expire_times_by_botsession.move_to_end(bot_session_name)
            except KeyError:
                pass

            self._ordered_expire_times_by_botsession[bot_session_name] = new_deadline
            updated_next_expire_time = True
        else:
            try:
                if self._ordered_expire_times_by_botsession.pop(bot_session_name):
                    updated_next_expire_time = True
            except KeyError:
                self.__logger.debug("Tried to un-assign deadline for bot_session_name="
                                    f"[{bot_session_name}] but it had no deadline to begin with.")

        # Make the botsession reaper thread look at the current new_deadline
        # (if it's nearer in the future) compared to the previously known `next_expire_time`.
        if updated_next_expire_time:
            if self._update_next_expire_time(compare_to=new_deadline):
                self._deadline_event.set()

    def _check_assigned_leases(self, bot_session):
        """Makes sure that all the leases we knew of that were assigned to the bot
        are there, and automatically retries leases the bot may have dropped.

        Raises BotSessionClosedError if the session is no longer tracked.
        """
        # A set gives O(1) membership checks in the loop below
        session_lease_ids = {lease.id for lease in bot_session.leases}

        with self._assigned_leases_lock:
            if bot_session.name in self._assigned_leases:
                # In order to be able to remove leases while iterating
                # we need to get the values as a list and iterate over that
                # (python3 doesn't allow modifying containers during iteration)
                for lease_id in list(self._assigned_leases[bot_session.name]):
                    if lease_id not in session_lease_ids:
                        self.__logger.warning(f"Assigned lease id=[{lease_id}], "
                                              f"not found on bot with name=[{bot_session.name}] and "
                                              f"id=[{bot_session.bot_id}]. Retrying job. "
                                              "Did the bot crash and restart?")
                        # Un-assign job from this botsession and let the scheduler handle it
                        try:
                            self._scheduler.retry_job_lease(lease_id)
                        except NotFoundError:
                            pass
                        try:
                            self._assigned_leases[bot_session.name].remove(lease_id)
                        except KeyError:
                            pass
            else:
                raise BotSessionClosedError(f"BotSession name=[{bot_session.name}] for bot_id="
                                            f"[{bot_session.bot_id}] closed while checking leases.")

    def _truncate_eviction_history(self):
        """Drops the oldest eviction records while more than the configured maximum are kept."""
        # Make sure we're only keeping the last N evicted sessions
        # NOTE: there could be some rare race conditions when the length of the OrderedDict is
        # only 1 below the limit; Multiple threads could check the size simultaneously before
        # they get to add their items in the OrderedDict, resulting in a size bigger than initially intended
        # (with a very unlikely upper bound of:
        #   O(n) = `remember_last_n_evicted_bot_sessions`
        #             + min(number_of_threads, number_of_concurrent_threads_cpu_can_handle)).
        #   The size being only 1 below the limit could also happen when the OrderedDict contains
        #   exactly `n` items and a thread trying to insert sees the limit has been reached and makes
        #   just enough space to add its own item.
        # The cost of locking vs using a bit more memory for a few more items in-memory is high, thus
        # we opt for the unlikely event of the OrderedDict growing a bit more and
        # make the next thread which tries to insert an item, clean up `while len > n`.
        while len(self._evicted_bot_sessions) > self._remember_last_n_evicted_bot_sessions:
            self._evicted_bot_sessions.popitem(last=False)

    def _close_bot_session(self, name, *, reason):
        """ Before removing the session, close any leases and
        requeue with high priority.

        Also records the eviction (time and `reason`) so that a bot coming
        back with this session name can be told why it was closed.
        """
        # If we had assigned an expire_time for this botsession, make sure to
        # clean up, regardless of the reason we end up closing this BotSession
        self._untrack_deadline_for_botsession(name)

        retried_leases = 0
        total_leases = 0
        with self._assigned_leases_lock:
            if name in self._assigned_leases:
                total_leases = len(self._assigned_leases[name])
                for lease_id in self._assigned_leases[name]:
                    try:
                        self._scheduler.retry_job_lease(lease_id)
                    except NotFoundError:
                        pass
                    else:
                        retried_leases += 1
                self._assigned_leases.pop(name)

        self._truncate_eviction_history()
        # Record this eviction
        self._evicted_bot_sessions[name] = (datetime.utcnow(), reason)

        # NOTE(review): the lookup below raises BotSessionClosedError or
        # UnknownBotSessionError, neither of which is caught here, so an untracked
        # `name` makes this method raise (and `_reap_expired_sessions` relies on
        # catching BotSessionClosedError). Confirm whether the except clause
        # was meant to list those exception types instead.
        try:
            bot_id = self._get_bot_id_from_bot_name_or_raise(name)
            self._bot_ids.pop(name)
        except (BotSessionMismatchError, DuplicateBotSessionError) as e:
            self.__logger.warning('Unable to identify `bot_id` associated with BotSession '
                                  f'while closing the BotSession name=[{name}]: {e}')
            bot_id = 'unknown'

        self._clear_status_count_for_bot_name(name)

        self.__logger.info(f'Closed BotSession bot_id=[{bot_id}], name=[{name}], reason=[{reason}] '
                           f'and sucessfully requeued [{retried_leases}]/[{total_leases}] leases.')

    def _update_next_expire_time(self, compare_to=None):
        """
             If we don't have any more bot_session deadlines, clear out this variable
         to avoid busy-waiting. Otherwise, populate it with the next known expiry time
         either from the queue or by comparing to the optional argument `compare_to`.
             This method returns True/False indicating whether the `next_expire_time`
         was updated.
        """
        if compare_to:
            # If we pass in a time earlier than the already known `next_expire_time`
            # or this is the first expire time we know of... set it to `compare_to`

            # NOTE: We could end up in a race condition here, where threads could
            # update the `_next_expire_time` to their own value of `compare_to`
            # if at the time they checked that their time was "earlier" than the
            # shared `_next_expire_time`.
            #   For the purpose this is used, this is an OK behavior since:
            #     1. If this method is called around the same time on different threads,
            #      the expiry time should be very close (`delta`).
            #     2. We may end up waiting for an additional `delta` time to expire the first
            #      session in the OrderedDict, and then rapidly close all the subsequent
            #      sessions with expire_time < now.
            #   This approach allows for potentially "lazy session expiry" (after an additional minimal `delta`),
            #   giving priority to all the other work buildgrid needs to do, instead of using the overhead of
            #   locking this and blocking up multiple threads to update this with each rpc.
            # TL;DR Approximation of the `next_expire_time` here is good enough for this purpose.
            if not self._next_expire_time or compare_to < self._next_expire_time:
                self._next_expire_time = compare_to
                return True
        else:
            _, next_expire_time_in_queue = self._get_next_botsession_expiry()
            # It is likely that the expire time we knew of is no longer in the OrderedDict
            # (e.g. we assigned a new one to that BotSession), thus this could be either
            # before or after the previously known `next_expire_time`
            if self._next_expire_time != next_expire_time_in_queue:
                self._next_expire_time = next_expire_time_in_queue
                return True

        return False

    def _next_expire_time_occurs_in(self):
        """Seconds until the next known expiry.

        Returns None when no expiry is known, 0 when it is already due, and
        otherwise the remaining seconds padded by 0.1 and rounded to 3 decimals.
        """
        if self._next_expire_time:
            next_expire_time = (self._next_expire_time - datetime.utcnow()).total_seconds()
            # Check if this is in the future (> 0, negative values means expiry happened already!)
            if next_expire_time > 0:
                # Pad this with 0.1 second so that the expiry actually happens when we try to reap
                return round(next_expire_time + 0.1, 3)
            return 0

        return None

    def _get_next_botsession_expiry(self):
        """Peeks at the oldest tracked deadline.

        Returns `(botsession_name, expire_time)`, or `(None, None)` when no
        deadline is tracked.
        """
        botsession_name = None
        expire_time = None
        # We want to `peek` the first entry of the OrderedDict here
        # We do this by:
        #   1. Popping the first item (if any)
        #   2. Inserting that key-value pair again (goes to the end with the OrderedDict)
        #   3. Moving that newly re-inserted entry to the beginning (to preserve the order)
        # This should work exactly as a `peek` since we only pop the first item in the asyncio event loop,
        # and we know that all other items we add in this OrderedDict must be >= the current first in
        # terms of expiry (Thus re-adding it and moving it to first should still maintain the sorted order).
        try:
            botsession_name, expire_time = self._ordered_expire_times_by_botsession.popitem(last=False)
        except KeyError:
            pass  # OrderedDict is empty, no BotSessions to check at this instant
        else:
            self._ordered_expire_times_by_botsession[botsession_name] = expire_time
            self._ordered_expire_times_by_botsession.move_to_end(botsession_name, last=False)

        return (botsession_name, expire_time)

    def _reap_expired_sessions(self):
        """Closes every tracked BotSession whose deadline is in the past, then
        refreshes the next known expire time."""
        self.__logger.debug("Checking for expired BotSessions to reap...")
        next_botsession_name_to_expire, next_botsession_expire_time = self._get_next_botsession_expiry()
        while next_botsession_expire_time and next_botsession_expire_time <= datetime.utcnow():
            # This is the last deadline we have communicated with this bot...
            # It has expired.
            # If there is no bot_id -> bot_name mapping anymore, Bot may have opened a new BotSession
            bot_id = self._bot_ids.get(next_botsession_name_to_expire)

            self.__logger.warning(
                f"BotSession name=[{next_botsession_name_to_expire}] for bot_id=[{bot_id}] "
                f"with deadline=[{next_botsession_expire_time}] has expired.")
            try:
                self._close_bot_session(next_botsession_name_to_expire, reason="expired")
            except BotSessionClosedError:
                self.__logger.warning(
                    f"Expired BotSession name=[{next_botsession_name_to_expire}] "
                    f"for bot_id=[{self._bot_ids.get(next_botsession_name_to_expire)}] "
                    f"with deadline=[{next_botsession_expire_time}] was already closed.")

            next_botsession_name_to_expire, next_botsession_expire_time = self._get_next_botsession_expiry()

        self._update_next_expire_time()

    async def _reap_expired_sessions_loop(self):
        """Reaper coroutine: sleeps until the next expiry (or until woken through
        `_deadline_event`) and then reaps expired BotSessions, forever."""
        try:
            self.__logger.info(
                "Starting BotSession reaper, bot_session_keepalive_timeout="
                f"[{self._bot_session_keepalive_timeout}].")
            while True:
                try:
                    # for <= 0, assume something expired already
                    expires_in = self._next_expire_time_occurs_in()
                    # `0` means "already due", only `None` means nothing is tracked
                    if expires_in is not None:
                        self.__logger.debug(
                            f"Waiting for an event indicating earlier expiry or wait=[{expires_in}] "
                            "for the next BotSession to expire.")
                    else:
                        self.__logger.debug("No more BotSessions to watch for expiry, waiting for new BotSessions.")
                    await asyncio.wait_for(self._deadline_event.wait(), timeout=expires_in)
                    self._deadline_event.clear()
                except asyncio.TimeoutError:
                    # The wait elapsed without an earlier deadline being announced
                    pass

                self._reap_expired_sessions()
        except asyncio.CancelledError:
            self.__logger.info("Cancelled reaper task.")
        except Exception as exception:
            self.__logger.exception(exception)
            raise

    def _setup_bot_session_reaper_loop(self):
        """Schedules the reaper coroutine if a keepalive timeout is configured.

        Returns True when the reaper task was created, False otherwise.
        Raises InvalidArgumentError for a negative timeout.
        """
        if self._bot_session_keepalive_timeout:
            if self._bot_session_keepalive_timeout <= 0:
                raise InvalidArgumentError(
                    f"[bot_session_keepalive_timeout] set to [{self._bot_session_keepalive_timeout}], "
                    "must be > 0, in seconds")

            # Add the expired session reaper in the event loop
            main_loop = asyncio.get_event_loop()
            main_loop.create_task(self._reap_expired_sessions_loop())
            return True
        return False

    def _clear_status_count_for_bot_name(self, bot_name: str) -> None:
        """Removes `bot_name` from every per-status set."""
        with self._bot_name_by_status_lock:
            for status in self._bot_name_by_status:
                self._bot_name_by_status[status].discard(bot_name)

    def _update_status_count_for_bot_session(self, bot_session: bots_pb2.BotSession) -> None:
        """Moves the session's name into the set matching its current status."""
        bot_name = bot_session.name
        bot_status = BotStatus(bot_session.status)

        self._clear_status_count_for_bot_name(bot_name)
        with self._bot_name_by_status_lock:
            try:
                self._bot_name_by_status[bot_status].add(bot_name)
            except KeyError:
                # We are not tracking this bot status, this will be a no-op
                pass