Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/bots/instance.py: 94.68%

94 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-04-15 14:01 +0000

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17BotsInterface 

18================= 

19 

20Instance of the Remote Workers interface. 

21""" 

22import logging 

23import threading 

24from contextlib import ExitStack 

25from typing import List, Optional, Tuple 

26 

27from buildgrid._enums import BotStatus 

28from buildgrid._exceptions import InvalidArgumentError, RetriableError 

29from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import DESCRIPTOR as BOTS_DESCRIPTOR 

30from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import BotSession, Lease 

31from buildgrid.server.bots.job_assigner import JobAssigner 

32from buildgrid.server.metrics_names import ( 

33 BOTS_CREATE_BOT_SESSION_EXCEPTION_COUNT_METRIC_NAME, 

34 BOTS_CREATE_BOT_SESSION_TIME_METRIC_NAME, 

35 BOTS_UPDATE_BOT_SESSION_EXCEPTION_COUNT_METRIC_NAME, 

36 BOTS_UPDATE_BOT_SESSION_TIME_METRIC_NAME, 

37) 

38from buildgrid.server.metrics_utils import DurationMetric, ExceptionCounter 

39from buildgrid.server.persistence.sql.impl import SQLDataStore 

40from buildgrid.server.servicer import Instance 

41from buildgrid.server.threading import ContextWorker 

42from buildgrid.server.utils.context import CancellationContext 

43from buildgrid.settings import MAX_WORKER_TTL, NETWORK_TIMEOUT 

44 

45LOGGER = logging.getLogger(__name__) 

46 

47 

class BotsInterface(Instance):
    """Instance-level implementation of the Remote Workers ``Bots`` service.

    The instance delegates all persistence to ``scheduler`` and owns two
    helpers whose lifetimes it manages through an ``ExitStack``: a
    ``JobAssigner`` used to wait for work on behalf of a bot, and a
    ``ContextWorker`` that runs the expired-session reaper loop.
    """

    SERVICE_NAME = BOTS_DESCRIPTOR.services_by_name["Bots"].full_name

    def __init__(
        self,
        scheduler: SQLDataStore,
        *,
        job_assignment_interval: float = 1.0,
    ) -> None:
        """Build the instance around ``scheduler``; nothing is started here.

        Args:
            scheduler: Data store used for bot sessions and leases.
            job_assignment_interval: Forwarded unchanged to ``JobAssigner``.
        """
        self._stack = ExitStack()

        self.scheduler = scheduler
        self._job_assigner = JobAssigner(scheduler, job_assignment_interval=job_assignment_interval)

        # Worker wrapping the reaper loop. The loop takes a ``threading.Event``
        # argument (presumably supplied and set by ContextWorker on stop --
        # confirm against ContextWorker).
        self.reaper = ContextWorker(self._reap_expired_sessions_loop, "BotReaper")

    def start(self) -> None:
        """Enter the scheduler, job assigner and reaper contexts, in that order."""
        for managed in (self.scheduler, self._job_assigner, self.reaper):
            self._stack.enter_context(managed)

    def stop(self) -> None:
        """Close the exit stack, unwinding every context entered by ``start``."""
        self._stack.close()

    def set_instance_name(self, instance_name: str) -> None:
        """Record ``instance_name`` on this instance and on the scheduler."""
        super().set_instance_name(instance_name)
        self.scheduler.set_instance_name(instance_name)

    @staticmethod
    def _abbreviated_lease_ids(bot_session: BotSession) -> str:
        """Return the first 8 characters of each lease id, comma-separated."""
        return ",".join(lease.id[:8] for lease in bot_session.leases)

    create_bot_session_ignored_exceptions = (RetriableError,)

    @DurationMetric(BOTS_CREATE_BOT_SESSION_TIME_METRIC_NAME, instanced=True)
    @ExceptionCounter(
        BOTS_CREATE_BOT_SESSION_EXCEPTION_COUNT_METRIC_NAME, ignored_exceptions=create_bot_session_ignored_exceptions
    )
    def create_bot_session(
        self, bot_session: BotSession, context: CancellationContext, deadline: Optional[float] = None
    ) -> BotSession:
        """Open a new bot session and return it, mutated in place.

        The session name is whatever ``scheduler.add_bot_entry`` returns; how
        an already-known ``bot_id`` is handled is decided by that call and is
        not visible here. After registration the method may block waiting for
        a lease (see ``_request_leases``) and finally stamps the session's
        ``expire_time``.

        Args:
            bot_session: Session sent by the client; ``bot_id`` must be set.
            context: Cancellation context used to interrupt the lease wait.
            deadline: Optional upper bound, in seconds, for the lease wait.

        Raises:
            InvalidArgumentError: If ``bot_session.bot_id`` is empty.
        """
        if not bot_session.bot_id:
            raise InvalidArgumentError("Bot's id must be set by client.")

        # Register the bot and adopt the name handed back by the scheduler.
        assigned_name = self.scheduler.add_bot_entry(
            bot_session_id=bot_session.bot_id, bot_session_status=bot_session.status
        )
        bot_session.name = assigned_name

        LOGGER.info(f"Opened BotSession name=[{bot_session.name}] for bot_id=[{bot_session.bot_id}].")

        self._request_leases(bot_session, context, deadline=deadline)

        leases = self._abbreviated_lease_ids(bot_session)
        LOGGER.debug(
            f"Leases assigned to newly opened BotSession name=[{bot_session.name}] "
            f"for bot_id=[{bot_session.bot_id}]: [{leases}]."
        )

        # Refresh the expiry reported back to the client.
        self._assign_deadline_for_botsession(bot_session)

        return bot_session

    update_bot_session_ignored_exceptions = (RetriableError,)

    @DurationMetric(BOTS_UPDATE_BOT_SESSION_TIME_METRIC_NAME, instanced=True)
    @ExceptionCounter(
        BOTS_UPDATE_BOT_SESSION_EXCEPTION_COUNT_METRIC_NAME, ignored_exceptions=update_bot_session_ignored_exceptions
    )
    def update_bot_session(
        self, bot_session: BotSession, context: CancellationContext, deadline: Optional[float] = None
    ) -> Tuple[BotSession, List[Tuple[str, bytes]]]:
        """Synchronize a client-reported session with the scheduler.

        At most one lease is taken from the incoming session (the last one),
        handed to ``scheduler.synchronize_bot_lease``, and replaced by the
        lease that call returns, if any. New work is only requested when the
        bot neither reported nor received a lease.

        Args:
            bot_session: Session sent by the client; mutated in place.
            context: Cancellation context used to interrupt the lease wait.
            deadline: Optional upper bound, in seconds, for the lease wait.

        Returns:
            The updated session together with the metadata returned by
            ``scheduler.get_metadata_for_leases`` for its leases.
        """
        orig_lease: Optional[Lease] = bot_session.leases.pop() if bot_session.leases else None

        updated_lease = self.scheduler.synchronize_bot_lease(
            bot_session.name, bot_session.bot_id, bot_session.status, orig_lease
        )
        if updated_lease:
            bot_session.leases.append(updated_lease)

        # Skip requesting work when a lease was reported, even if it has now
        # been removed: if the scheduler already recorded the lease's new state
        # and a later fault makes the worker retry the same UpdateBotSession
        # call, it must not be handed fresh work by the retry.
        if not (orig_lease or updated_lease):
            self._request_leases(bot_session, context, deadline=deadline)

        metadata = self.scheduler.get_metadata_for_leases(bot_session.leases)

        leases = self._abbreviated_lease_ids(bot_session)
        LOGGER.debug(
            f"Sending BotSession update for name=[{bot_session.name}], "
            f"bot_id=[{bot_session.bot_id}]: leases=[{leases}]."
        )

        self._assign_deadline_for_botsession(bot_session)

        return bot_session, metadata

    def count_bots(self) -> int:
        """Return the scheduler's bot count."""
        return self.scheduler.count_bots()

    def count_bots_by_status(self, status: BotStatus) -> int:
        """Return the scheduler's bot count for the given ``status``."""
        return self.scheduler.count_bots_by_status(status)

    def _assign_deadline_for_botsession(self, bot_session: BotSession) -> None:
        """Copy the scheduler's expiry time for this bot into ``expire_time``."""
        expiry = self.scheduler.get_bot_expiry_time(bot_session.name, bot_session.bot_id)
        bot_session.expire_time.FromDatetime(expiry)

    def _request_leases(
        self,
        bot_session: BotSession,
        context: CancellationContext,
        deadline: Optional[float] = None,
    ) -> None:
        """Wait for work for ``bot_session`` and attach the resulting lease.

        Returns immediately when the bot is not in the OK state, already holds
        a lease, or the usable waiting time is below ``NETWORK_TIMEOUT``.
        Otherwise blocks on the job assigner's event for up to 80% of the
        effective deadline, then asks the scheduler for the current lease.

        Args:
            bot_session: Session to assign work to; mutated in place.
            context: Cancellation context; cancelling sets the wait event.
            deadline: Seconds the caller can wait; ``None`` means
                ``MAX_WORKER_TTL``.
        """
        # Only one lease is handed out at a time currently.
        eligible = bot_session.status == BotStatus.OK.value and not bot_session.leases
        if not eligible:
            return

        # With no explicit deadline, fall back to the longest long-poll allowed.
        requested = MAX_WORKER_TTL if deadline is None else deadline

        # Never wait longer than the keepalive timeout: a deadline above it can
        # result in an active bot session being reaped while it is waiting.
        effective_deadline = min(requested, self.scheduler.bot_session_keepalive_timeout)

        # Keep 20% of the deadline in reserve for sending the response, and
        # refuse to wait at all when less than NETWORK_TIMEOUT remains.
        ttl = effective_deadline * 0.8
        if ttl < NETWORK_TIMEOUT:
            LOGGER.info(
                f"BotSession name=[{bot_session.name}] expires in less time than "
                f"NETWORK_TIMEOUT=[{NETWORK_TIMEOUT}], no leases will be assigned"
            )
            return

        # Block until the event is set (by the assigner or by cancellation)
        # or the ttl elapses.
        with self._job_assigner.assignment_context(bot_session) as event:
            context.on_cancel(event.set)
            event.wait(ttl)

        # Re-read the lease from the scheduler to pick up any database change.
        lease = self.scheduler.synchronize_bot_lease(bot_session.name, bot_session.bot_id, bot_session.status, None)
        if lease:
            bot_session.leases.append(lease)

    def _reap_expired_sessions_loop(self, shutdown_requested: threading.Event) -> None:
        """Repeatedly reap expired sessions until ``shutdown_requested`` is set.

        Each pass calls ``scheduler.reap_expired_sessions`` for as long as it
        returns a truthy value, logs (and survives) any exception, then sleeps
        on the shutdown event for ``scheduler.poll_interval``.
        """
        LOGGER.info(
            "Starting BotSession reaper, "
            f"bot_session_keepalive_timeout=[{self.scheduler.bot_session_keepalive_timeout}]"
        )
        while not shutdown_requested.is_set():
            try:
                # Keep reaping while the scheduler reports progress, but give
                # up early once shutdown has been requested.
                while self.scheduler.reap_expired_sessions():
                    if shutdown_requested.is_set():
                        break
            except Exception as exception:
                # A failed pass must not kill the reaper thread.
                LOGGER.exception(exception)
            # Waiting on the event (not time.sleep) lets shutdown interrupt the pause.
            shutdown_requested.wait(timeout=self.scheduler.poll_interval)

214 except Exception as exception: 

215 LOGGER.exception(exception) 

216 shutdown_requested.wait(timeout=self.scheduler.poll_interval)