Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/bots/instance.py: 94.32%

88 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2025-03-13 15:36 +0000

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16from contextlib import ExitStack 

17from typing import Any 

18 

19from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecutedActionMetadata 

20from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import DESCRIPTOR as BOTS_DESCRIPTOR 

21from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import BotSession, Lease 

22from buildgrid.server.context import current_instance 

23from buildgrid.server.enums import BotStatus 

24from buildgrid.server.exceptions import InvalidArgumentError 

25from buildgrid.server.logging import buildgrid_logger 

26from buildgrid.server.scheduler import Scheduler 

27from buildgrid.server.scheduler.impl import BotMetrics 

28from buildgrid.server.servicer import Instance 

29from buildgrid.server.settings import MAX_WORKER_TTL, NETWORK_TIMEOUT 

30from buildgrid.server.utils.cancellation import CancellationContext 

31 

# Module-level logger for the Bots instance; call sites in this module attach
# structured context via the ``tags`` keyword argument.
LOGGER = buildgrid_logger(__name__)

33 

34 

class BotsInterface(Instance):
    """Bots service instance that delegates bot/lease bookkeeping to a :class:`Scheduler`.

    This class validates incoming bot sessions, long-polls the scheduler's job
    assigner for work, and stamps the session expiry time on every response.
    All persisted state lives behind ``self.scheduler``.
    """

    SERVICE_NAME = BOTS_DESCRIPTOR.services_by_name["Bots"].full_name

    def __init__(self, scheduler: Scheduler) -> None:
        # Context managers entered by start() are collected here so that
        # stop() can release all of them with a single close().
        self._stack = ExitStack()
        self.scheduler = scheduler

    def start(self) -> None:
        """Enter the scheduler, its job assigner and, optionally, its expiry timer.

        The session expiry timer is only entered when
        ``scheduler.session_expiry_interval`` is positive.
        """
        enter = self._stack.enter_context
        enter(self.scheduler)
        enter(self.scheduler.job_assigner)
        if self.scheduler.session_expiry_interval > 0:
            enter(self.scheduler.session_expiry_timer)

    def stop(self) -> None:
        """Exit every context entered by :meth:`start` (ExitStack unwinds them LIFO)."""
        self._stack.close()
        LOGGER.info("Stopped Bots.")

    def create_bot_session(
        self, bot_session: BotSession, context: CancellationContext, deadline: float | None = None
    ) -> BotSession:
        """Register a new bot session with the scheduler and try to hand it work.

        The scheduler records the session through ``add_bot_entry`` and returns
        the server-assigned session name. Any handling of a ``bot_id`` that
        already has a record is done inside the scheduler, not here.

        Args:
            bot_session: Session sent by the bot. It is mutated in place and returned.
            context: Cancellation context of the originating request.
            deadline: Optional wait budget forwarded to ``_request_leases``.
                Presumably seconds remaining on the RPC; TODO confirm with the servicer.

        Returns:
            The same ``bot_session``, with ``name``, any newly assigned lease and
            ``expire_time`` filled in.

        Raises:
            InvalidArgumentError: If the client did not set ``bot_id``.
        """
        if not bot_session.bot_id:
            raise InvalidArgumentError("Bot's id must be set by client.")

        property_labels = self.scheduler.property_set.bot_property_labels(bot_session)
        assigned_name = self.scheduler.add_bot_entry(
            bot_session_id=bot_session.bot_id,
            bot_session_status=bot_session.status,
            bot_property_labels=property_labels,
        )
        bot_session.name = assigned_name

        LOGGER.info("Created new BotSession. Requesting leases.", tags=self._bot_log_tags(bot_session))
        self._request_leases(bot_session, context, deadline=deadline)
        self._assign_deadline_for_botsession(bot_session)

        LOGGER.debug("Completed CreateBotSession.", tags=self._bot_log_tags(bot_session))
        return bot_session

    def update_bot_session(
        self,
        bot_session: BotSession,
        context: CancellationContext,
        deadline: float | None = None,
        partial_execution_metadata: dict[str, ExecutedActionMetadata] | None = None,
    ) -> tuple[BotSession, list[tuple[str, bytes]]]:
        """Reconcile the bot's reported lease with the scheduler and maybe assign new work.

        Only the last lease in ``bot_session.leases`` is taken out and synchronized
        with the scheduler. Whatever lease the scheduler hands back, if any, is
        put back on the session.

        Args:
            bot_session: Session reported by the bot. It is mutated in place and returned.
            context: Cancellation context of the originating request.
            deadline: Optional wait budget forwarded to ``_request_leases``.
            partial_execution_metadata: Passed through unchanged to
                ``scheduler.synchronize_bot_lease``.

        Returns:
            The updated ``bot_session`` and the scheduler-provided metadata
            for its current leases.
        """
        LOGGER.debug("Beginning initial lease synchronization.", tags=self._bot_log_tags(bot_session))

        orig_lease: Lease | None = bot_session.leases.pop() if bot_session.leases else None
        updated_lease = self.scheduler.synchronize_bot_lease(
            bot_session.name, bot_session.bot_id, bot_session.status, orig_lease, partial_execution_metadata
        )
        if updated_lease:
            bot_session.leases.append(updated_lease)

        LOGGER.debug("Completed initial lease synchronization.", tags=self._bot_log_tags(bot_session))

        # Only ask for new work when no lease was involved at all. If the bot
        # reported a lease that the scheduler has since dropped, this may be a
        # retry of an UpdateBotSession whose state change was already applied
        # before a fault; handing out a fresh job here would be premature.
        if not (orig_lease or updated_lease):
            self._request_leases(bot_session, context, deadline=deadline)

        lease_metadata = self.scheduler.get_metadata_for_leases(bot_session.leases)
        self._assign_deadline_for_botsession(bot_session)

        LOGGER.debug("Completed UpdateBotSession.", tags=self._bot_log_tags(bot_session))
        return bot_session, lease_metadata

    def get_bot_status_metrics(self) -> BotMetrics:
        """Return the scheduler's bot status metrics."""
        return self.scheduler.get_bot_status_metrics()

    def _assign_deadline_for_botsession(self, bot_session: BotSession) -> None:
        """Refresh the session's expiry in the scheduler and copy it into ``expire_time``."""
        new_expiry = self.scheduler.refresh_bot_expiry_time(bot_session.name, bot_session.bot_id)
        bot_session.expire_time.FromDatetime(new_expiry)

    def _request_leases(
        self,
        bot_session: BotSession,
        context: CancellationContext,
        deadline: float | None = None,
    ) -> None:
        """Long-poll the job assigner and attach a lease to ``bot_session`` if one arrives.

        Nothing is requested in any of these cases:
        - The bot is not in the OK state.
        - The session already carries a lease.
        - The usable wait window is shorter than ``NETWORK_TIMEOUT``.

        The wait is bounded by 80% of ``deadline``. When ``deadline`` is None,
        ``MAX_WORKER_TTL`` is used instead. Either way the budget is capped at
        the scheduler's ``bot_session_keepalive_timeout``.
        """
        # Unhealthy bots are never handed new work.
        if bot_session.status != BotStatus.OK.value:
            LOGGER.debug("BotSession not healthy. Skipping lease request.", tags=self._bot_log_tags(bot_session))
            return

        # A single lease per session is supported; an existing one means we are done.
        if bot_session.leases:
            LOGGER.debug("BotSession already assigned. Skipping lease request.", tags=self._bot_log_tags(bot_session))
            return

        # Default to the longest long-poll workers may do. Never exceed the
        # keepalive timeout, or a bot that is still waiting could be reaped as inactive.
        wait_limit = min(
            MAX_WORKER_TTL if deadline is None else deadline,
            self.scheduler.bot_session_keepalive_timeout,
        )

        # Keep 20% of the budget in reserve for sending the response.
        ttl = wait_limit * 0.8
        if ttl < NETWORK_TIMEOUT:
            LOGGER.info(
                "BotSession expires in less time than timeout. No leases will be assigned.",
                tags={**self._bot_log_tags(bot_session), "network_timeout": NETWORK_TIMEOUT},
            )
            return

        LOGGER.debug("Waiting for job assignment.", tags={**self._bot_log_tags(bot_session), "deadline": wait_limit})
        with self.scheduler.job_assigner.assignment_context(bot_session) as event:
            # A cancelled request wakes the waiter early instead of waiting out the ttl.
            context.on_cancel(event.set)
            event.wait(ttl)

        # Best-effort check to see whether the original request is still being
        # served. Proxies and network setups can hide a dropped client, but when
        # we know for sure nobody is listening, bail out so the bot is not
        # assigned state changes it will never acknowledge.
        if context.is_cancelled():
            LOGGER.debug(
                "Bot request cancelled. Skipping lease synchronization.", tags=self._bot_log_tags(bot_session)
            )
            return

        # On a plain timeout, skip the post-wait synchronization. A bot that is
        # still alive will immediately send another request, and that request's
        # initial synchronization picks up any pending job. An abandoned request
        # therefore cannot race a live one on the bot session's database records.
        if not event.is_set():
            LOGGER.debug(
                "Bot assignment timeout. Skipping lease synchronization.", tags=self._bot_log_tags(bot_session)
            )
            return

        # The assigner signalled us; re-read the lease so the response reflects the database.
        LOGGER.debug("Synchronizing leases after job assignment wait.", tags=self._bot_log_tags(bot_session))
        fresh_lease = self.scheduler.synchronize_bot_lease(
            bot_session.name, bot_session.bot_id, bot_session.status, None
        )
        if fresh_lease:
            bot_session.leases.append(fresh_lease)

    def _bot_log_tags(self, bot_session: BotSession) -> dict[str, Any]:
        """Build structured logging tags describing ``bot_session`` and its first lease.

        The lease id and state are None when the session carries no lease.
        """
        first_lease = bot_session.leases[0] if bot_session.leases else None
        return {
            "instance_name": current_instance(),
            "request.bot_name": bot_session.name,
            "request.bot_id": bot_session.bot_id,
            "request.bot_status": bot_session.status,
            "request.lease_id": None if first_lease is None else first_lease.id,
            "request.lease_state": None if first_lease is None else first_lease.state,
        }