Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/bots/instance.py: 94.05%

84 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16from contextlib import ExitStack 

17from typing import Any, Dict, List, Optional, Tuple 

18 

19from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import DESCRIPTOR as BOTS_DESCRIPTOR 

20from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import BotSession, Lease 

21from buildgrid.server.context import current_instance 

22from buildgrid.server.enums import BotStatus 

23from buildgrid.server.exceptions import InvalidArgumentError 

24from buildgrid.server.logging import buildgrid_logger 

25from buildgrid.server.scheduler import Scheduler 

26from buildgrid.server.servicer import Instance 

27from buildgrid.server.settings import MAX_WORKER_TTL, NETWORK_TIMEOUT 

28from buildgrid.server.utils.cancellation import CancellationContext 

29 

# Module-level logger. Calls in this module pass structured context through the
# ``tags=`` keyword (built by BotsInterface._bot_log_tags).
LOGGER = buildgrid_logger(__name__)

31 

32 

class BotsInterface(Instance):
    """Bots service instance: bot session creation/update and lease hand-out.

    Bot registration, lease synchronization, job assignment and expiry
    bookkeeping are delegated to the wrapped :class:`Scheduler`. This class
    validates requests, coordinates the long-poll wait for new work and stamps
    each returned session with its expiry time.
    """

    SERVICE_NAME = BOTS_DESCRIPTOR.services_by_name["Bots"].full_name

    def __init__(self, scheduler: Scheduler) -> None:
        # Owns the context managers entered by start(); unwound by stop().
        self._stack = ExitStack()
        self.scheduler = scheduler

    def start(self) -> None:
        """Enter the scheduler, its job assigner and its session expiry timer.

        The three context managers are entered on a temporary stack and handed
        over to ``self._stack`` only once all of them were entered successfully.
        If any of them raises while being entered, the ones already entered are
        exited before the exception propagates, rather than being left entered
        until ``stop()`` is eventually called.
        """
        with ExitStack() as startup:
            startup.enter_context(self.scheduler)
            startup.enter_context(self.scheduler.job_assigner)
            startup.enter_context(self.scheduler.session_expiry_timer)
            # All entered: transfer the exit callbacks to the long-lived stack.
            # They still unwind in reverse order (timer, assigner, scheduler).
            self._stack.enter_context(startup.pop_all())

    def stop(self) -> None:
        """Exit everything entered by start(), in reverse order, then log."""
        self._stack.close()
        LOGGER.info("Stopped Bots.")

    def create_bot_session(
        self, bot_session: BotSession, context: CancellationContext, deadline: Optional[float] = None
    ) -> BotSession:
        """Register a new bot session and try to hand it a lease.

        The scheduler's ``add_bot_entry`` registers the bot and returns the
        server-assigned session name, which is written onto ``bot_session.name``.
        Handling of a ``bot_id`` that is already registered is not done in this
        method. Per the original design note, that handling is requeueing the
        leases previously assigned to it and replacing its stored name.
        NOTE(review): presumably ``add_bot_entry`` does this — confirm.

        Args:
            bot_session: Session sent by the bot. It is mutated in place
                (name, leases, expire_time) and returned.
            context: Cancellation context of the incoming request. Cancelling
                it ends the lease long-poll early.
            deadline: Optional request deadline. It is passed through to
                ``_request_leases``, which falls back to ``MAX_WORKER_TTL``
                when it is ``None``. Presumably in seconds, since it bounds an
                event wait — TODO confirm.

        Returns:
            The same ``bot_session`` with its name, any assigned lease and its
            expiry time populated.

        Raises:
            InvalidArgumentError: If ``bot_session.bot_id`` is empty.
        """
        if not bot_session.bot_id:
            raise InvalidArgumentError("Bot's id must be set by client.")

        # Create new record; the scheduler decides the session name.
        bot_session.name = self.scheduler.add_bot_entry(
            bot_session_id=bot_session.bot_id, bot_session_status=bot_session.status
        )

        LOGGER.info("Created new BotSession. Requesting leases.", tags=self._bot_log_tags(bot_session))
        self._request_leases(bot_session, context, deadline=deadline)
        self._assign_deadline_for_botsession(bot_session)

        LOGGER.debug("Completed CreateBotSession.", tags=self._bot_log_tags(bot_session))
        return bot_session

    def update_bot_session(
        self, bot_session: BotSession, context: CancellationContext, deadline: Optional[float] = None
    ) -> Tuple[BotSession, List[Tuple[str, bytes]]]:
        """Apply a bot's reported lease state and possibly assign new work.

        The lease reported by the bot (if any) is synchronized with the
        scheduler, and the scheduler's view replaces it on the session. New
        work is only requested when the bot reported no lease and the scheduler
        has none for it either.

        Args:
            bot_session: Session sent by the bot. It is mutated in place and
                returned.
            context: Cancellation context of the incoming request. Cancelling
                it ends the lease long-poll early.
            deadline: Optional request deadline, passed through to
                ``_request_leases``.

        Returns:
            A tuple of the updated ``bot_session`` and the metadata pairs the
            scheduler returns for the session's leases.
        """
        LOGGER.debug("Beginning initial lease synchronization.", tags=self._bot_log_tags(bot_session))

        # Only the last reported lease is synchronized (one lease per session is
        # assumed). It is removed here and replaced by the scheduler's result.
        orig_lease: Optional[Lease] = None
        if bot_session.leases:
            orig_lease = bot_session.leases.pop()

        if updated_lease := self.scheduler.synchronize_bot_lease(
            bot_session.name, bot_session.bot_id, bot_session.status, orig_lease
        ):
            bot_session.leases.append(updated_lease)

        LOGGER.debug("Completed initial lease synchronization.", tags=self._bot_log_tags(bot_session))

        # Don't request new leases if a lease was removed. This mitigates situations where the scheduler
        # is updated with the new state of the lease, but a fault thereafter causes the worker to retry
        # the old UpdateBotSession call
        if not orig_lease and not updated_lease:
            self._request_leases(bot_session, context, deadline=deadline)

        metadata = self.scheduler.get_metadata_for_leases(bot_session.leases)
        self._assign_deadline_for_botsession(bot_session)

        LOGGER.debug("Completed UpdateBotSession.", tags=self._bot_log_tags(bot_session))
        return bot_session, metadata

    def count_bots_by_status(self) -> Dict[BotStatus, int]:
        """Return the scheduler's per-status bot counts."""
        return self.scheduler.count_bots_by_status()

    def _assign_deadline_for_botsession(self, bot_session: BotSession) -> None:
        """Refresh this bot's expiry in the scheduler and stamp it on the session."""
        bot_session.expire_time.FromDatetime(
            self.scheduler.refresh_bot_expiry_time(bot_session.name, bot_session.bot_id)
        )

    def _request_leases(
        self,
        bot_session: BotSession,
        context: CancellationContext,
        deadline: Optional[float] = None,
    ) -> None:
        """Long-poll the job assigner and attach a newly assigned lease, if any.

        Returns without waiting in three cases:

        * the session status is not ``BotStatus.OK``;
        * the session already carries a lease;
        * the wait budget is smaller than ``NETWORK_TIMEOUT``. The budget is
          80% of ``min(deadline or MAX_WORKER_TTL, bot_session_keepalive_timeout)``.

        Otherwise it waits up to that budget for the job assigner's event.
        After a timeout or a request cancellation it returns without touching
        the session. After a signalled assignment it re-synchronizes with the
        scheduler and appends the resulting lease, if there is one.
        """
        # We do not assign new leases if we are not in the OK state.
        if bot_session.status != BotStatus.OK.value:
            LOGGER.debug("BotSession not healthy. Skipping lease request.", tags=self._bot_log_tags(bot_session))
            return

        # Only send one lease at a time currently. If any leases are set we can abort the request.
        if bot_session.leases:
            LOGGER.debug("BotSession already assigned. Skipping lease request.", tags=self._bot_log_tags(bot_session))
            return

        # If no deadline is set default to the max we allow workers to long-poll for work
        if deadline is None:
            deadline = MAX_WORKER_TTL

        # If the specified bot session keepalive timeout is greater than the
        # deadline it can result in active bot sessions being reaped
        deadline = min(deadline, self.scheduler.bot_session_keepalive_timeout)

        # Use 80% of the given deadline to give time to respond but no less than NETWORK_TIMEOUT
        ttl = deadline * 0.8
        if ttl < NETWORK_TIMEOUT:
            LOGGER.info(
                "BotSession expires in less time than timeout. No leases will be assigned.",
                tags={**self._bot_log_tags(bot_session), "network_timeout": NETWORK_TIMEOUT},
            )
            return

        # Wait for an update to the bot session and then resynchronize the lease.
        LOGGER.debug("Waiting for job assignment.", tags={**self._bot_log_tags(bot_session), "deadline": deadline})
        with self.scheduler.job_assigner.assignment_context(bot_session) as event:
            # Cancelling the request also wakes the wait below.
            context.on_cancel(event.set)
            event.wait(ttl)

        # This is a best-effort check to see if the original request is still alive. Depending on
        # network and proxy configurations, this status may not accurately reflect the state of the
        # client connection. If we know for certain that the request is no longer being monitored,
        # we can exit now to avoid state changes not being acked by the bot.
        if context.is_cancelled():
            LOGGER.debug(
                "Bot request cancelled. Skipping lease synchronization.", tags=self._bot_log_tags(bot_session)
            )
            return

        # In the case that we had a timeout, we can return without post lease synchronization. This
        # helps deal with the case of uncommunicated cancellations from the bot request. If the bot
        # is actually still waiting on work, this will be immediately followed up by a new request
        # from the worker, where the initial synchronization will begin a bot ack for the pending
        # job. In the case that the request has been abandoned, it avoids competing updates to the
        # database records in the corresponding bots session.
        if not event.is_set():
            LOGGER.debug(
                "Bot assignment timeout. Skipping lease synchronization.", tags=self._bot_log_tags(bot_session)
            )
            return

        # Synchronize the lease again to pick up db changes.
        LOGGER.debug("Synchronizing leases after job assignment wait.", tags=self._bot_log_tags(bot_session))
        if lease := self.scheduler.synchronize_bot_lease(
            bot_session.name, bot_session.bot_id, bot_session.status, None
        ):
            bot_session.leases.append(lease)

    def _bot_log_tags(self, bot_session: BotSession) -> Dict[str, Any]:
        """Build structured log tags describing the session and its first lease.

        The lease id and state are ``None`` when the session carries no lease.
        """
        lease_id, lease_state = None, None
        if bot_session.leases:
            lease_id, lease_state = bot_session.leases[0].id, bot_session.leases[0].state
        return {
            "instance_name": current_instance(),
            "request.bot_name": bot_session.name,
            "request.bot_id": bot_session.bot_id,
            "request.bot_status": bot_session.status,
            "request.lease_id": lease_id,
            "request.lease_state": lease_state,
        }