Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/bots/instance.py: 94.68%
94 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-04-15 14:01 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-04-15 14:01 +0000
1# Copyright (C) 2018 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16"""
17BotsInterface
18=================
20Instance of the Remote Workers interface.
21"""
22import logging
23import threading
24from contextlib import ExitStack
25from typing import List, Optional, Tuple
27from buildgrid._enums import BotStatus
28from buildgrid._exceptions import InvalidArgumentError, RetriableError
29from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import DESCRIPTOR as BOTS_DESCRIPTOR
30from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import BotSession, Lease
31from buildgrid.server.bots.job_assigner import JobAssigner
32from buildgrid.server.metrics_names import (
33 BOTS_CREATE_BOT_SESSION_EXCEPTION_COUNT_METRIC_NAME,
34 BOTS_CREATE_BOT_SESSION_TIME_METRIC_NAME,
35 BOTS_UPDATE_BOT_SESSION_EXCEPTION_COUNT_METRIC_NAME,
36 BOTS_UPDATE_BOT_SESSION_TIME_METRIC_NAME,
37)
38from buildgrid.server.metrics_utils import DurationMetric, ExceptionCounter
39from buildgrid.server.persistence.sql.impl import SQLDataStore
40from buildgrid.server.servicer import Instance
41from buildgrid.server.threading import ContextWorker
42from buildgrid.server.utils.context import CancellationContext
43from buildgrid.settings import MAX_WORKER_TTL, NETWORK_TIMEOUT
# Module-wide logger, named after this module.
LOGGER = logging.getLogger(name=__name__)
class BotsInterface(Instance):
    """Server-side implementation of the Remote Workers API ``Bots`` service.

    Bot session state is read and written through ``scheduler`` (an
    ``SQLDataStore``). Long-polling bots wait on an event obtained from the
    :class:`JobAssigner`'s ``assignment_context``, and a background
    ``BotReaper`` worker repeatedly asks the scheduler to reap expired sessions.
    """

    SERVICE_NAME = BOTS_DESCRIPTOR.services_by_name["Bots"].full_name

    def __init__(
        self,
        scheduler: SQLDataStore,
        *,
        job_assignment_interval: float = 1.0,
    ) -> None:
        """
        Args:
            scheduler: Data store used for bot sessions and leases.
            job_assignment_interval: Forwarded to :class:`JobAssigner`
                (presumably its polling period in seconds — confirm there).
        """
        # Owns the lifetimes of the components started in ``start()``.
        self._stack = ExitStack()

        self._job_assigner = JobAssigner(scheduler, job_assignment_interval=job_assignment_interval)
        self.scheduler = scheduler

        # Set the deadline event on worker stop to allow fast exit
        self.reaper = ContextWorker(self._reap_expired_sessions_loop, "BotReaper")

    def start(self) -> None:
        """Start the scheduler, the job assigner and the reaper, in that order.

        If any component fails to start, the ones already started are stopped
        again before the exception propagates. A failed ``start()`` therefore
        does not leave background workers running.
        """
        with ExitStack() as starting:
            starting.enter_context(self.scheduler)
            starting.enter_context(self._job_assigner)
            starting.enter_context(self.reaper)
            # Everything started: transfer ownership to the long-lived stack so
            # ``stop()`` tears the components down in reverse start order.
            self._stack.enter_context(starting.pop_all())

    def stop(self) -> None:
        """Stop every component started by ``start()``, in reverse order."""
        self._stack.close()

    def set_instance_name(self, instance_name: str) -> None:
        """Set the instance name on this servicer instance and on the scheduler."""
        super().set_instance_name(instance_name)
        self.scheduler.set_instance_name(instance_name)

    # RetriableError is not counted as a CreateBotSession failure.
    create_bot_session_ignored_exceptions = (RetriableError,)

    @DurationMetric(BOTS_CREATE_BOT_SESSION_TIME_METRIC_NAME, instanced=True)
    @ExceptionCounter(
        BOTS_CREATE_BOT_SESSION_EXCEPTION_COUNT_METRIC_NAME, ignored_exceptions=create_bot_session_ignored_exceptions
    )
    def create_bot_session(
        self, bot_session: BotSession, context: CancellationContext, deadline: Optional[float] = None
    ) -> BotSession:
        """Register a new bot session and try to hand it a lease.

        ``scheduler.add_bot_entry`` records the bot and returns the session
        name, which is written into ``bot_session.name``. The call then
        long-polls (bounded by ``deadline``) for a lease and finally sets
        ``bot_session.expire_time``.

        NOTE(review): earlier docs said that leases already held by an existing
        ``bot_id`` are requeued and the stored name replaced. That is presumably
        done inside ``add_bot_entry`` — confirm there.

        Args:
            bot_session: Session sent by the bot; ``bot_id`` must be set.
                It is modified in place.
            context: Cancellation context. Cancelling it ends the lease wait early.
            deadline: Remaining time for the request in seconds, or ``None``
                to fall back to ``MAX_WORKER_TTL``.

        Returns:
            The same ``bot_session`` object, updated in place.

        Raises:
            InvalidArgumentError: If ``bot_session.bot_id`` is empty.
        """
        if not bot_session.bot_id:
            raise InvalidArgumentError("Bot's id must be set by client.")

        # Create new record
        bot_session.name = self.scheduler.add_bot_entry(
            bot_session_id=bot_session.bot_id, bot_session_status=bot_session.status
        )

        LOGGER.info(f"Opened BotSession name=[{bot_session.name}] for bot_id=[{bot_session.bot_id}].")

        self._request_leases(bot_session, context, deadline=deadline)

        # Building the lease summary is only worthwhile when it will be emitted.
        if LOGGER.isEnabledFor(logging.DEBUG):
            LOGGER.debug(
                f"Leases assigned to newly opened BotSession name=[{bot_session.name}] "
                f"for bot_id=[{bot_session.bot_id}]: [{self._format_lease_ids(bot_session)}]."
            )

        # Update status for bot session
        self._assign_deadline_for_botsession(bot_session)

        return bot_session

    # RetriableError is not counted as an UpdateBotSession failure.
    update_bot_session_ignored_exceptions = (RetriableError,)

    @DurationMetric(BOTS_UPDATE_BOT_SESSION_TIME_METRIC_NAME, instanced=True)
    @ExceptionCounter(
        BOTS_UPDATE_BOT_SESSION_EXCEPTION_COUNT_METRIC_NAME, ignored_exceptions=update_bot_session_ignored_exceptions
    )
    def update_bot_session(
        self, bot_session: BotSession, context: CancellationContext, deadline: Optional[float] = None
    ) -> Tuple[BotSession, List[Tuple[str, bytes]]]:
        """Synchronize the bot's reported state with the scheduler and assign work.

        At most one reported lease is synchronized: the last lease is popped
        and passed to ``scheduler.synchronize_bot_lease``. Whatever lease the
        scheduler returns, if any, is appended back. A new lease is requested
        only when the bot reported no lease and the scheduler returned none.

        Args:
            bot_session: Session sent by the bot; modified in place.
            context: Cancellation context. Cancelling it ends the lease wait early.
            deadline: Remaining time for the request in seconds, or ``None``.

        Returns:
            ``(bot_session, metadata)``, where ``metadata`` comes from
            ``scheduler.get_metadata_for_leases`` for the session's leases.
        """
        orig_lease: Optional[Lease] = None
        if bot_session.leases:
            orig_lease = bot_session.leases.pop()

        if updated_lease := self.scheduler.synchronize_bot_lease(
            bot_session.name, bot_session.bot_id, bot_session.status, orig_lease
        ):
            bot_session.leases.append(updated_lease)

        # Don't request new leases if a lease was removed. This mitigates situations where the scheduler
        # is updated with the new state of the lease, but a fault thereafter causes the worker to retry
        # the old UpdateBotSession call
        if not orig_lease and not updated_lease:
            self._request_leases(bot_session, context, deadline=deadline)

        metadata = self.scheduler.get_metadata_for_leases(bot_session.leases)

        # This runs on every bot poll: skip building the lease summary unless DEBUG is on.
        if LOGGER.isEnabledFor(logging.DEBUG):
            LOGGER.debug(
                f"Sending BotSession update for name=[{bot_session.name}], "
                f"bot_id=[{bot_session.bot_id}]: leases=[{self._format_lease_ids(bot_session)}]."
            )

        self._assign_deadline_for_botsession(bot_session)

        return bot_session, metadata

    def count_bots(self) -> int:
        """Return the bot count reported by the scheduler."""
        return self.scheduler.count_bots()

    def count_bots_by_status(self, status: BotStatus) -> int:
        """Return the scheduler's count of bots with the given ``status``."""
        return self.scheduler.count_bots_by_status(status)

    @staticmethod
    def _format_lease_ids(bot_session: BotSession) -> str:
        """Return the first 8 characters of each lease id, comma-separated (for logs)."""
        return ",".join(lease.id[:8] for lease in bot_session.leases)

    def _assign_deadline_for_botsession(self, bot_session: BotSession) -> None:
        """Set ``bot_session.expire_time`` from the scheduler's stored expiry time."""
        bot_session.expire_time.FromDatetime(self.scheduler.get_bot_expiry_time(bot_session.name, bot_session.bot_id))

    def _request_leases(
        self,
        bot_session: BotSession,
        context: CancellationContext,
        deadline: Optional[float] = None,
    ) -> None:
        """Long-poll for a single lease and append it to ``bot_session.leases``.

        This does nothing unless the bot reports ``BotStatus.OK`` and holds no
        lease. The wait lasts at most 80% of ``deadline``, where ``deadline``
        is first capped at the scheduler's keepalive timeout. The wait is
        skipped entirely when that budget is below ``NETWORK_TIMEOUT``.
        """
        # Only send one lease at a time currently.
        if bot_session.status != BotStatus.OK.value or bot_session.leases:
            return

        # If no deadline is set default to the max we allow workers to
        # long-poll for work
        if deadline is None:
            deadline = MAX_WORKER_TTL

        # If the specified bot session keepalive timeout is greater than the
        # deadline it can result in active bot sessions being reaped
        deadline = min(deadline, self.scheduler.bot_session_keepalive_timeout)

        # Use 80% of the given deadline to give time to respond
        # but no less than NETWORK_TIMEOUT
        ttl = deadline * 0.8
        if ttl < NETWORK_TIMEOUT:
            LOGGER.info(
                f"BotSession name=[{bot_session.name}] expires in less time than "
                f"NETWORK_TIMEOUT=[{NETWORK_TIMEOUT}], no leases will be assigned"
            )
            return

        # Wait for an update to the bot session and then resynchronize the lease.
        with self._job_assigner.assignment_context(bot_session) as event:
            # Cancelling the request wakes the wait immediately.
            context.on_cancel(event.set)
            event.wait(ttl)

        # Synchronize the lease again to pick up db changes.
        if lease := self.scheduler.synchronize_bot_lease(
            bot_session.name, bot_session.bot_id, bot_session.status, None
        ):
            bot_session.leases.append(lease)

    def _reap_expired_sessions_loop(self, shutdown_requested: threading.Event) -> None:
        """Body of the ``BotReaper`` worker: reap expired sessions until shutdown.

        Each pass calls ``scheduler.reap_expired_sessions()`` repeatedly while
        it returns a truthy value. The loop then sleeps for
        ``scheduler.poll_interval``, or until shutdown is requested. Any
        exception is logged and the loop carries on.
        """
        LOGGER.info(
            "Starting BotSession reaper, "
            f"bot_session_keepalive_timeout=[{self.scheduler.bot_session_keepalive_timeout}]"
        )
        while not shutdown_requested.is_set():
            try:
                while self.scheduler.reap_expired_sessions():
                    if shutdown_requested.is_set():
                        break
            except Exception as exception:
                # Best-effort background task: a failed pass must not kill the reaper.
                LOGGER.exception(exception)
            shutdown_requested.wait(timeout=self.scheduler.poll_interval)