Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/bots/instance.py: 94.05%
84 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
1# Copyright (C) 2018 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16from contextlib import ExitStack
17from typing import Any, Dict, List, Optional, Tuple
19from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import DESCRIPTOR as BOTS_DESCRIPTOR
20from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import BotSession, Lease
21from buildgrid.server.context import current_instance
22from buildgrid.server.enums import BotStatus
23from buildgrid.server.exceptions import InvalidArgumentError
24from buildgrid.server.logging import buildgrid_logger
25from buildgrid.server.scheduler import Scheduler
26from buildgrid.server.servicer import Instance
27from buildgrid.server.settings import MAX_WORKER_TTL, NETWORK_TIMEOUT
28from buildgrid.server.utils.cancellation import CancellationContext
30LOGGER = buildgrid_logger(__name__)
class BotsInterface(Instance):
    """Instance backing the ``Bots`` service.

    All state changes are delegated to the ``Scheduler`` supplied at
    construction. This class only sequences the scheduler calls needed to
    create and update bot sessions and to long-poll for a job assignment.
    """

    SERVICE_NAME = BOTS_DESCRIPTOR.services_by_name["Bots"].full_name

    def __init__(self, scheduler: Scheduler) -> None:
        """Keep a reference to ``scheduler`` and create the stack used by ``start``/``stop``."""
        self.scheduler = scheduler
        self._stack = ExitStack()

    def start(self) -> None:
        """Enter the scheduler, then its job assigner, then its session expiry timer."""
        enter = self._stack.enter_context
        enter(self.scheduler)
        enter(self.scheduler.job_assigner)
        enter(self.scheduler.session_expiry_timer)

    def stop(self) -> None:
        """Exit every context entered by ``start`` and log that the service stopped."""
        self._stack.close()
        LOGGER.info("Stopped Bots.")

    def create_bot_session(
        self, bot_session: BotSession, context: CancellationContext, deadline: Optional[float] = None
    ) -> BotSession:
        """Create a new bot session and try to hand it a lease.

        The server assigns the session a unique name. If the bot_id already
        exists in the database, any leases already assigned to that id are
        requeued (via close_bot_session) and the name previously associated
        with the bot_id is replaced by the new one. If the bot_id is not in
        the database, a new record is created.

        Args:
            bot_session: Session sent by the bot; it is updated in place.
            context: Cancellation state of the originating request.
            deadline: Optional bound on how long to wait for work.

        Returns:
            The same ``bot_session`` with its name, leases and expiry time set.

        Raises:
            InvalidArgumentError: If ``bot_session.bot_id`` is empty.
        """
        if not bot_session.bot_id:
            raise InvalidArgumentError("Bot's id must be set by client.")

        # The scheduler records the bot and returns the session name.
        session_name = self.scheduler.add_bot_entry(
            bot_session_id=bot_session.bot_id,
            bot_session_status=bot_session.status,
        )
        bot_session.name = session_name

        LOGGER.info("Created new BotSession. Requesting leases.", tags=self._bot_log_tags(bot_session))
        self._request_leases(bot_session, context, deadline=deadline)
        self._assign_deadline_for_botsession(bot_session)

        LOGGER.debug("Completed CreateBotSession.", tags=self._bot_log_tags(bot_session))
        return bot_session

    def update_bot_session(
        self, bot_session: BotSession, context: CancellationContext, deadline: Optional[float] = None
    ) -> Tuple[BotSession, List[Tuple[str, bytes]]]:
        """Synchronize the bot's lease with the scheduler and offer new work.

        Any lease state change reported by the bot is registered server side.
        When the bot reported no lease and the scheduler has none for it, the
        call waits for a job assignment.

        Args:
            bot_session: Session sent by the bot; it is updated in place.
            context: Cancellation state of the originating request.
            deadline: Optional bound on how long to wait for work.

        Returns:
            The updated ``bot_session`` and the metadata the scheduler
            reports for its leases.
        """
        LOGGER.debug("Beginning initial lease synchronization.", tags=self._bot_log_tags(bot_session))

        # Take the reported lease (if any) off the session; whatever the
        # scheduler hands back is what gets returned to the bot.
        reported_lease: Optional[Lease] = bot_session.leases.pop() if bot_session.leases else None

        synced_lease = self.scheduler.synchronize_bot_lease(
            bot_session.name, bot_session.bot_id, bot_session.status, reported_lease
        )
        if synced_lease:
            bot_session.leases.append(synced_lease)

        LOGGER.debug("Completed initial lease synchronization.", tags=self._bot_log_tags(bot_session))

        # Don't request new leases if a lease was removed. This mitigates situations where the
        # scheduler is updated with the new state of the lease, but a fault thereafter causes the
        # worker to retry the old UpdateBotSession call.
        if not (reported_lease or synced_lease):
            self._request_leases(bot_session, context, deadline=deadline)

        lease_metadata = self.scheduler.get_metadata_for_leases(bot_session.leases)
        self._assign_deadline_for_botsession(bot_session)

        LOGGER.debug("Completed UpdateBotSession.", tags=self._bot_log_tags(bot_session))
        return bot_session, lease_metadata

    def count_bots_by_status(self) -> Dict[BotStatus, int]:
        """Return the scheduler's count of bots for each status."""
        counts = self.scheduler.count_bots_by_status()
        return counts

    def _assign_deadline_for_botsession(self, bot_session: BotSession) -> None:
        """Refresh the session's expiry in the scheduler and copy it onto ``bot_session``."""
        expiry = self.scheduler.refresh_bot_expiry_time(bot_session.name, bot_session.bot_id)
        bot_session.expire_time.FromDatetime(expiry)

    def _request_leases(
        self,
        bot_session: BotSession,
        context: CancellationContext,
        deadline: Optional[float] = None,
    ) -> None:
        """Wait for a job assignment and attach the resulting lease, if any.

        Returns without waiting when the session is not in the OK state, when
        it already carries a lease, or when 80% of the effective deadline is
        shorter than ``NETWORK_TIMEOUT``. After waiting, returns without
        synchronizing when the request was cancelled or the wait timed out.

        Args:
            bot_session: Session to assign work to; it is updated in place.
            context: Cancellation state of the originating request.
            deadline: Optional bound on the wait; ``MAX_WORKER_TTL`` when unset.
        """
        # We do not assign new leases if we are not in the OK state.
        if bot_session.status != BotStatus.OK.value:
            LOGGER.debug("BotSession not healthy. Skipping lease request.", tags=self._bot_log_tags(bot_session))
            return

        # Only send one lease at a time currently. If any leases are set we can abort the request.
        if bot_session.leases:
            LOGGER.debug("BotSession already assigned. Skipping lease request.", tags=self._bot_log_tags(bot_session))
            return

        # With no deadline given, fall back to the longest long-poll we allow workers.
        requested = MAX_WORKER_TTL if deadline is None else deadline

        # Never wait longer than the bot session keepalive timeout: a wait that
        # outlasts it can result in active bot sessions being reaped.
        wait_budget = min(requested, self.scheduler.bot_session_keepalive_timeout)

        # Use 80% of the budget to leave time to respond, but no less than NETWORK_TIMEOUT.
        ttl = wait_budget * 0.8
        if ttl < NETWORK_TIMEOUT:
            LOGGER.info(
                "BotSession expires in less time than timeout. No leases will be assigned.",
                tags=dict(self._bot_log_tags(bot_session), network_timeout=NETWORK_TIMEOUT),
            )
            return

        # Wait for an update to the bot session and then resynchronize the lease.
        LOGGER.debug("Waiting for job assignment.", tags=dict(self._bot_log_tags(bot_session), deadline=wait_budget))
        with self.scheduler.job_assigner.assignment_context(bot_session) as assigned:
            # Cancelling the request sets the event so the wait ends early.
            context.on_cancel(assigned.set)
            assigned.wait(ttl)

        # This is a best-effort check to see if the original request is still alive. Depending on
        # network and proxy configurations, this status may not accurately reflect the state of the
        # client connection. If we know for certain that the request is no longer being monitored,
        # we can exit now to avoid state changes not being acked by the bot.
        if context.is_cancelled():
            LOGGER.debug(
                "Bot request cancelled. Skipping lease synchronization.", tags=self._bot_log_tags(bot_session)
            )
            return

        # In the case that we had a timeout, we can return without post lease synchronization. This
        # helps deal with the case of uncommunicated cancellations from the bot request. If the bot
        # is actually still waiting on work, this will be immediately followed up by a new request
        # from the worker, where the initial synchronization will begin a bot ack for the pending
        # job. In the case that the request has been abandoned, it avoids competing updates to the
        # database records in the corresponding bots session.
        if not assigned.is_set():
            LOGGER.debug(
                "Bot assignment timeout. Skipping lease synchronization.", tags=self._bot_log_tags(bot_session)
            )
            return

        # Synchronize the lease again to pick up db changes.
        LOGGER.debug("Synchronizing leases after job assignment wait.", tags=self._bot_log_tags(bot_session))
        new_lease = self.scheduler.synchronize_bot_lease(
            bot_session.name, bot_session.bot_id, bot_session.status, None
        )
        if new_lease:
            bot_session.leases.append(new_lease)

    def _bot_log_tags(self, bot_session: BotSession) -> Dict[str, Any]:
        """Build the structured log tags for ``bot_session``.

        The lease id and state come from the first lease on the session and
        are ``None`` when the session has no lease.
        """
        tags: Dict[str, Any] = {
            "instance_name": current_instance(),
            "request.bot_name": bot_session.name,
            "request.bot_id": bot_session.bot_id,
            "request.bot_status": bot_session.status,
            "request.lease_id": None,
            "request.lease_state": None,
        }
        if bot_session.leases:
            first_lease = bot_session.leases[0]
            tags["request.lease_id"] = first_lease.id
            tags["request.lease_state"] = first_lease.state
        return tags