Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/bots/instance.py: 92.11%

114 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-06-11 15:37 +0000

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17BotsInterface 

18================= 

19 

20Instance of the Remote Workers interface. 

21""" 

22import logging 

23import threading 

24from contextlib import ExitStack 

25from typing import List, Optional, Tuple 

26 

27from buildgrid._enums import BotStatus 

28from buildgrid._exceptions import InvalidArgumentError, RetriableError 

29from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import DESCRIPTOR as BOTS_DESCRIPTOR 

30from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import BotSession, Lease 

31from buildgrid.server.bots.job_assigner import JobAssigner 

32from buildgrid.server.metrics_names import ( 

33 BOTS_CREATE_BOT_SESSION_EXCEPTION_COUNT_METRIC_NAME, 

34 BOTS_CREATE_BOT_SESSION_TIME_METRIC_NAME, 

35 BOTS_UPDATE_BOT_SESSION_EXCEPTION_COUNT_METRIC_NAME, 

36 BOTS_UPDATE_BOT_SESSION_TIME_METRIC_NAME, 

37) 

38from buildgrid.server.metrics_utils import DurationMetric, ExceptionCounter 

39from buildgrid.server.persistence.sql.impl import SQLDataStore 

40from buildgrid.server.servicer import Instance 

41from buildgrid.server.threading import ContextWorker 

42from buildgrid.server.utils.context import CancellationContext 

43from buildgrid.settings import MAX_WORKER_TTL, NETWORK_TIMEOUT 

44 

45LOGGER = logging.getLogger(__name__) 

46 

47 

class BotsInterface(Instance):
    """Bots service instance backed by a scheduler.

    Creates and updates bot sessions, waits for job assignment through a
    ``JobAssigner``, and owns a background worker that repeatedly asks the
    scheduler to reap expired sessions.
    """

    SERVICE_NAME = BOTS_DESCRIPTOR.services_by_name["Bots"].full_name

    def __init__(
        self,
        scheduler: SQLDataStore,
        *,
        job_assignment_interval: float = 1.0,
        priority_assignment_percentage: int = 100,
    ) -> None:
        """Wire up the scheduler, the job assigner and the session reaper.

        Nothing is started here; the components are entered as context
        managers in ``start()`` and released in ``stop()``.

        Args:
            scheduler: Data store used for all bot and lease bookkeeping.
            job_assignment_interval: Forwarded to ``JobAssigner``.
            priority_assignment_percentage: Forwarded to ``JobAssigner`` as
                ``priority_percentage``.
        """
        # Holds every context entered by start() so stop() can unwind them together.
        self._stack = ExitStack()
        self.scheduler = scheduler
        self._job_assigner = JobAssigner(
            scheduler,
            job_assignment_interval=job_assignment_interval,
            priority_percentage=priority_assignment_percentage,
        )

        # Worker wrapping the reaper loop; it receives the shutdown event as its argument.
        self.reaper = ContextWorker(self._reap_expired_sessions_loop, "BotReaper")

    def start(self) -> None:
        """Enter the scheduler, the job assigner and the reaper, in that order."""
        for managed in (self.scheduler, self._job_assigner, self.reaper):
            self._stack.enter_context(managed)

    def stop(self) -> None:
        """Exit everything entered by ``start()`` and log the shutdown."""
        self._stack.close()
        LOGGER.info(f"Stopped Bots instance for '{self._instance_name}'")

    def set_instance_name(self, instance_name: str) -> None:
        """Set the instance name on this instance and on the scheduler."""
        super().set_instance_name(instance_name)
        self.scheduler.set_instance_name(instance_name)

    # Exceptions that are not counted by the CreateBotSession exception metric.
    create_bot_session_ignored_exceptions = (RetriableError,)

    @DurationMetric(BOTS_CREATE_BOT_SESSION_TIME_METRIC_NAME, instanced=True)
    @ExceptionCounter(
        BOTS_CREATE_BOT_SESSION_EXCEPTION_COUNT_METRIC_NAME, ignored_exceptions=create_bot_session_ignored_exceptions
    )
    def create_bot_session(
        self, bot_session: BotSession, context: CancellationContext, deadline: Optional[float] = None
    ) -> BotSession:
        """Register a new bot session and try to hand it a lease.

        The scheduler assigns the session name via ``add_bot_entry``. How an
        already-known ``bot_id`` is handled (e.g. requeueing its previous
        leases) is decided inside the scheduler, not in this method.

        Args:
            bot_session: Session sent by the bot; mutated in place.
            context: Cancellation context of the originating request.
            deadline: Optional request deadline in seconds, used to bound
                the wait for a lease.

        Returns:
            The same ``bot_session`` with its name, leases and expiry time set.

        Raises:
            InvalidArgumentError: If the client did not set ``bot_id``.
        """
        if not bot_session.bot_id:
            raise InvalidArgumentError("Bot's id must be set by client.")

        # The scheduler creates the record and returns the server-assigned name.
        assigned_name = self.scheduler.add_bot_entry(
            bot_session_id=bot_session.bot_id, bot_session_status=bot_session.status
        )
        bot_session.name = assigned_name

        LOGGER.info(f"Created new BotSession. Requesting leases. {self._bot_log_tags(bot_session)}")
        self._request_leases(bot_session, context, deadline=deadline)
        self._assign_deadline_for_botsession(bot_session)

        LOGGER.debug(f"Completed CreateBotSession. {self._bot_log_tags(bot_session)}")
        return bot_session

    # Exceptions that are not counted by the UpdateBotSession exception metric.
    update_bot_session_ignored_exceptions = (RetriableError,)

    @DurationMetric(BOTS_UPDATE_BOT_SESSION_TIME_METRIC_NAME, instanced=True)
    @ExceptionCounter(
        BOTS_UPDATE_BOT_SESSION_EXCEPTION_COUNT_METRIC_NAME, ignored_exceptions=update_bot_session_ignored_exceptions
    )
    def update_bot_session(
        self, bot_session: BotSession, context: CancellationContext, deadline: Optional[float] = None
    ) -> Tuple[BotSession, List[Tuple[str, bytes]]]:
        """Synchronize the bot's reported lease state and assign work if idle.

        Args:
            bot_session: Session sent by the bot; mutated in place.
            context: Cancellation context of the originating request.
            deadline: Optional request deadline in seconds, used to bound
                the wait for a lease.

        Returns:
            A tuple of the updated ``bot_session`` and the metadata the
            scheduler returns for its leases.
        """
        LOGGER.debug(f"Beginning initial lease synchronization. {self._bot_log_tags(bot_session)}")

        # Take the last reported lease (if any) out of the session; the scheduler's
        # view of it is appended back below.
        orig_lease: Optional[Lease] = bot_session.leases.pop() if bot_session.leases else None

        updated_lease = self.scheduler.synchronize_bot_lease(
            bot_session.name, bot_session.bot_id, bot_session.status, orig_lease
        )
        if updated_lease:
            bot_session.leases.append(updated_lease)

        LOGGER.debug(f"Completed initial lease synchronization. {self._bot_log_tags(bot_session)}")

        # Only ask for new work when the bot reported no lease and the scheduler returned
        # none. Skipping the request after a lease was removed mitigates the case where the
        # scheduler already recorded the new lease state but a later fault makes the worker
        # retry the old UpdateBotSession call.
        if not (orig_lease or updated_lease):
            self._request_leases(bot_session, context, deadline=deadline)

        metadata = self.scheduler.get_metadata_for_leases(bot_session.leases)
        self._assign_deadline_for_botsession(bot_session)

        LOGGER.debug(f"Completed UpdateBotSession. {self._bot_log_tags(bot_session)}")
        return (bot_session, metadata)

    def count_bots(self) -> int:
        """Return the scheduler's total bot count."""
        return self.scheduler.count_bots()

    def count_bots_by_status(self, status: BotStatus) -> int:
        """Return the scheduler's bot count for the given status."""
        return self.scheduler.count_bots_by_status(status)

    def _assign_deadline_for_botsession(self, bot_session: BotSession) -> None:
        """Refresh the session's expiry in the scheduler and copy it onto the session."""
        refreshed_expiry = self.scheduler.refresh_bot_expiry_time(bot_session.name, bot_session.bot_id)
        bot_session.expire_time.FromDatetime(refreshed_expiry)

    def _request_leases(
        self,
        bot_session: BotSession,
        context: CancellationContext,
        deadline: Optional[float] = None,
    ) -> None:
        """Wait for a job assignment and attach the resulting lease, if any.

        Returns without touching ``bot_session`` when the bot is not in the OK
        state, already has a lease, has too little time left, the request was
        cancelled, or the wait timed out.

        Args:
            bot_session: Session to assign work to; mutated in place.
            context: Cancellation context of the originating request.
            deadline: Optional upper bound in seconds for the wait; defaults
                to ``MAX_WORKER_TTL``.
        """
        # Bots that are not in the OK state never receive new leases.
        if bot_session.status != BotStatus.OK.value:
            LOGGER.debug(f"BotSession not healthy. Skipping lease request. {self._bot_log_tags(bot_session)}")
            return

        # Only one lease is handed out at a time, so an existing lease ends the request.
        if bot_session.leases:
            LOGGER.debug(f"BotSession already assigned. Skipping lease request. {self._bot_log_tags(bot_session)}")
            return

        # Without an explicit deadline, fall back to the longest long-poll allowed for workers.
        effective_deadline = MAX_WORKER_TTL if deadline is None else deadline

        # Never wait longer than the session keepalive timeout; presumably a longer wait
        # would let the reaper expire a session that is still actively polling.
        effective_deadline = min(effective_deadline, self.scheduler.bot_session_keepalive_timeout)

        # Spend only 80% of the deadline waiting so there is time left to respond, and
        # refuse to wait at all when that leaves less than NETWORK_TIMEOUT.
        ttl = effective_deadline * 0.8
        if ttl < NETWORK_TIMEOUT:
            LOGGER.info(
                f"BotSession expires in less time than NETWORK_TIMEOUT=[{NETWORK_TIMEOUT}],"
                f" no leases will be assigned. {self._bot_log_tags(bot_session)}"
            )
            return

        # Block until the job assigner signals this session, the request is cancelled,
        # or the ttl elapses.
        LOGGER.debug(
            f"Waiting for job assignment. {self._bot_log_tags(bot_session)} deadline=[{effective_deadline:.2f}]"
        )
        with self._job_assigner.assignment_context(bot_session) as event:
            context.on_cancel(event.set)
            event.wait(ttl)

        # Best-effort check to see whether the original request is still alive. Depending on
        # network and proxy configuration this may not reflect the real state of the client
        # connection, but when the request is known to be abandoned we stop here so that no
        # state change is made that the bot would never acknowledge.
        if context.is_cancelled():
            LOGGER.debug(f"Bot request cancelled. Skipping lease synchronization. {self._bot_log_tags(bot_session)}")
            return

        # On timeout, return without the post-wait synchronization. This covers cancellations
        # the bot never communicated: a bot that is still waiting will immediately send a new
        # request whose initial synchronization acks any pending job, while an abandoned
        # request no longer competes for updates to the session's database records.
        if not event.is_set():
            LOGGER.debug(f"Bot assignment timeout. Skipping lease synchronization. {self._bot_log_tags(bot_session)}")
            return

        # The event fired: synchronize again to pick up what changed in the database.
        LOGGER.debug(f"Synchronizing leases after job assignment wait. {self._bot_log_tags(bot_session)}")
        assigned_lease = self.scheduler.synchronize_bot_lease(
            bot_session.name, bot_session.bot_id, bot_session.status, None
        )
        if assigned_lease:
            bot_session.leases.append(assigned_lease)

    def _reap_expired_sessions_loop(self, shutdown_requested: threading.Event) -> None:
        """Repeatedly reap expired sessions until shutdown is requested.

        Args:
            shutdown_requested: Event that ends the loop once set.
        """
        LOGGER.info(
            "Starting BotSession reaper, "
            f"bot_session_keepalive_timeout=[{self.scheduler.bot_session_keepalive_timeout}]"
        )
        while not shutdown_requested.is_set():
            try:
                # Keep reaping while the scheduler reports progress, but stop as soon
                # as shutdown is requested.
                while self.scheduler.reap_expired_sessions() and not shutdown_requested.is_set():
                    pass
            except Exception as exception:
                # Log and carry on so a single failure does not end the loop.
                LOGGER.exception(exception)
            # Sleep for one poll interval, waking early if shutdown is requested.
            shutdown_requested.wait(timeout=self.scheduler.poll_interval)

    def _bot_log_tags(self, bot_session: BotSession) -> str:
        """Build the ``key=[value]`` tag string appended to log messages for a session."""
        if bot_session.leases:
            first_lease = bot_session.leases[0]
            lease_tags = f"request.lease_id=[{first_lease.id}] request.lease_state=[{first_lease.state}]"
        else:
            lease_tags = "request.lease_id=[] request.lease_state=[]"

        return " ".join(
            (
                f"instance_name=[{self._instance_name}]",
                f"request.bot_name=[{bot_session.name}]",
                f"request.bot_id=[{bot_session.bot_id}]",
                f"request.bot_status=[{bot_session.status}]",
                lease_tags,
            )
        )