Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/bots/instance.py: 94.32%
88 statements
« prev ^ index » next coverage.py v7.4.1, created at 2025-03-13 15:36 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2025-03-13 15:36 +0000
1# Copyright (C) 2018 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16from contextlib import ExitStack
17from typing import Any
19from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecutedActionMetadata
20from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import DESCRIPTOR as BOTS_DESCRIPTOR
21from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import BotSession, Lease
22from buildgrid.server.context import current_instance
23from buildgrid.server.enums import BotStatus
24from buildgrid.server.exceptions import InvalidArgumentError
25from buildgrid.server.logging import buildgrid_logger
26from buildgrid.server.scheduler import Scheduler
27from buildgrid.server.scheduler.impl import BotMetrics
28from buildgrid.server.servicer import Instance
29from buildgrid.server.settings import MAX_WORKER_TTL, NETWORK_TIMEOUT
30from buildgrid.server.utils.cancellation import CancellationContext
32LOGGER = buildgrid_logger(__name__)
class BotsInterface(Instance):
    """Bots service instance that delegates session and lease bookkeeping to a ``Scheduler``.

    The instance itself holds no bot state: every create/update request is
    translated into calls on ``self.scheduler``, optionally followed by a
    bounded wait for a job to be assigned to the bot.
    """

    SERVICE_NAME = BOTS_DESCRIPTOR.services_by_name["Bots"].full_name

    def __init__(self, scheduler: Scheduler) -> None:
        """Keep a reference to ``scheduler`` and create the stack used by ``start``/``stop``."""
        self.scheduler = scheduler
        self._stack = ExitStack()

    def start(self) -> None:
        """Enter the scheduler, its job assigner and, when enabled, its session expiry timer.

        Each context is registered on the internal ``ExitStack`` so that ``stop``
        can unwind all of them together.
        """
        scheduler = self.scheduler
        enter = self._stack.enter_context

        enter(scheduler)
        enter(scheduler.job_assigner)
        # The expiry timer is only started for a positive expiry interval.
        if scheduler.session_expiry_interval > 0:
            enter(scheduler.session_expiry_timer)

    def stop(self) -> None:
        """Close every context entered by ``start`` and log that the service stopped."""
        self._stack.close()
        LOGGER.info("Stopped Bots.")

    def create_bot_session(
        self, bot_session: BotSession, context: CancellationContext, deadline: float | None = None
    ) -> BotSession:
        """Register a new bot session and try to hand it a lease.

        The session name is chosen by the scheduler (``add_bot_entry``) and written
        back onto ``bot_session``. The scheduler is expected to deal with a
        ``bot_id`` it already knows (requeueing its previous leases and replacing
        the stored name) -- that logic is not visible here; verify in the scheduler.

        Args:
            bot_session: Session sent by the bot; mutated in place.
            context: Cancellation handle for the originating request.
            deadline: Optional upper bound, in seconds, for the lease wait.

        Returns:
            The same ``bot_session`` object, with name, leases and expiry updated.

        Raises:
            InvalidArgumentError: If the client did not set ``bot_id``.
        """
        if not bot_session.bot_id:
            raise InvalidArgumentError("Bot's id must be set by client.")

        scheduler = self.scheduler
        property_labels = scheduler.property_set.bot_property_labels(bot_session)

        # Record the bot with the scheduler, which returns the session name.
        session_name = scheduler.add_bot_entry(
            bot_session_id=bot_session.bot_id,
            bot_session_status=bot_session.status,
            bot_property_labels=property_labels,
        )
        bot_session.name = session_name

        LOGGER.info("Created new BotSession. Requesting leases.", tags=self._bot_log_tags(bot_session))
        self._request_leases(bot_session, context, deadline=deadline)
        self._assign_deadline_for_botsession(bot_session)

        LOGGER.debug("Completed CreateBotSession.", tags=self._bot_log_tags(bot_session))
        return bot_session

    def update_bot_session(
        self,
        bot_session: BotSession,
        context: CancellationContext,
        deadline: float | None = None,
        partial_execution_metadata: dict[str, ExecutedActionMetadata] | None = None,
    ) -> tuple[BotSession, list[tuple[str, bytes]]]:
        """Synchronize the bot's reported lease with the scheduler and maybe assign new work.

        Args:
            bot_session: Session sent by the bot; its ``leases`` are rewritten in place.
            context: Cancellation handle for the originating request.
            deadline: Optional upper bound, in seconds, for the lease wait.
            partial_execution_metadata: Forwarded untouched to the scheduler's
                lease synchronization.

        Returns:
            The updated ``bot_session`` and the metadata the scheduler reports
            for the leases it now carries.
        """
        LOGGER.debug("Beginning initial lease synchronization.", tags=self._bot_log_tags(bot_session))

        # At most one lease is taken off the session; the scheduler decides what replaces it.
        orig_lease: Lease | None = bot_session.leases.pop() if bot_session.leases else None

        updated_lease = self.scheduler.synchronize_bot_lease(
            bot_session.name, bot_session.bot_id, bot_session.status, orig_lease, partial_execution_metadata
        )
        if updated_lease:
            bot_session.leases.append(updated_lease)

        LOGGER.debug("Completed initial lease synchronization.", tags=self._bot_log_tags(bot_session))

        # New work is only requested when the bot arrived without a lease and the
        # scheduler returned none. If a lease was just removed we deliberately do not
        # hand out another one: should a fault make the worker retry this same
        # UpdateBotSession call, the scheduler has already recorded the new lease state.
        if not (orig_lease or updated_lease):
            self._request_leases(bot_session, context, deadline=deadline)

        lease_metadata = self.scheduler.get_metadata_for_leases(bot_session.leases)
        self._assign_deadline_for_botsession(bot_session)

        LOGGER.debug("Completed UpdateBotSession.", tags=self._bot_log_tags(bot_session))
        return bot_session, lease_metadata

    def get_bot_status_metrics(self) -> BotMetrics:
        """Return the bot status metrics reported by the scheduler."""
        return self.scheduler.get_bot_status_metrics()

    def _assign_deadline_for_botsession(self, bot_session: BotSession) -> None:
        """Refresh the session's expiry in the scheduler and copy it onto ``expire_time``."""
        refreshed_expiry = self.scheduler.refresh_bot_expiry_time(bot_session.name, bot_session.bot_id)
        bot_session.expire_time.FromDatetime(refreshed_expiry)

    def _request_leases(
        self,
        bot_session: BotSession,
        context: CancellationContext,
        deadline: float | None = None,
    ) -> None:
        """Wait (bounded) for a job assignment and attach the resulting lease, if any.

        Returns without touching ``bot_session.leases`` when the bot is not in the
        OK state, already holds a lease, has too little time left to wait, the
        request was cancelled, or the wait timed out.

        Args:
            bot_session: Session to assign work to; a lease may be appended in place.
            context: Cancellation handle; cancelling it wakes the wait early.
            deadline: Optional upper bound in seconds; defaults to ``MAX_WORKER_TTL``.
        """
        # Bots that are not in the OK state never receive new leases.
        if bot_session.status != BotStatus.OK.value:
            LOGGER.debug("BotSession not healthy. Skipping lease request.", tags=self._bot_log_tags(bot_session))
            return

        # Only one lease is handed out at a time, so an existing lease ends the request.
        if bot_session.leases:
            LOGGER.debug("BotSession already assigned. Skipping lease request.", tags=self._bot_log_tags(bot_session))
            return

        # Without an explicit deadline, fall back to the longest long-poll we allow
        # workers. Either way the wait is capped at the bot session keepalive timeout:
        # waiting longer than that can result in active bot sessions being reaped.
        deadline = min(
            MAX_WORKER_TTL if deadline is None else deadline,
            self.scheduler.bot_session_keepalive_timeout,
        )

        # Wait for 80% of the deadline so there is time left to respond. If that
        # leaves less than NETWORK_TIMEOUT, no lease is assigned at all.
        wait_seconds = deadline * 0.8
        if wait_seconds < NETWORK_TIMEOUT:
            LOGGER.info(
                "BotSession expires in less time than timeout. No leases will be assigned.",
                tags=self._bot_log_tags(bot_session) | {"network_timeout": NETWORK_TIMEOUT},
            )
            return

        # Block until the job assigner signals this bot, the request is cancelled, or time runs out.
        LOGGER.debug("Waiting for job assignment.", tags=self._bot_log_tags(bot_session) | {"deadline": deadline})
        with self.scheduler.job_assigner.assignment_context(bot_session) as assigned:
            context.on_cancel(assigned.set)
            assigned.wait(wait_seconds)

        # Best-effort check to see whether the original request is still alive. Depending on
        # network and proxy configuration this may not reflect the real state of the client
        # connection, but when we know nobody is listening any more we leave now so that no
        # state change goes un-acked by the bot.
        if context.is_cancelled():
            LOGGER.debug(
                "Bot request cancelled. Skipping lease synchronization.", tags=self._bot_log_tags(bot_session)
            )
            return

        # On a timeout we return without the post-wait synchronization. This covers
        # cancellations the bot never communicated: a bot that is still waiting will
        # immediately send a new request, whose initial synchronization starts the ack
        # for any pending job, while an abandoned request no longer competes for
        # updates to the bot session's database records.
        if not assigned.is_set():
            LOGGER.debug(
                "Bot assignment timeout. Skipping lease synchronization.", tags=self._bot_log_tags(bot_session)
            )
            return

        # The event fired: synchronize once more to pick up the database changes.
        LOGGER.debug("Synchronizing leases after job assignment wait.", tags=self._bot_log_tags(bot_session))
        assigned_lease = self.scheduler.synchronize_bot_lease(
            bot_session.name, bot_session.bot_id, bot_session.status, None
        )
        if assigned_lease:
            bot_session.leases.append(assigned_lease)

    def _bot_log_tags(self, bot_session: BotSession) -> dict[str, Any]:
        """Build the structured-log tags describing ``bot_session`` and its first lease.

        The lease tags are ``None`` when the session currently carries no lease.
        """
        leases = bot_session.leases
        lease_id = leases[0].id if leases else None
        lease_state = leases[0].state if leases else None
        return {
            "instance_name": current_instance(),
            "request.bot_name": bot_session.name,
            "request.bot_id": bot_session.bot_id,
            "request.bot_status": bot_session.status,
            "request.lease_id": lease_id,
            "request.lease_state": lease_state,
        }