Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/rabbitmq/bots/service.py: 38.64%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

88 statements  

1# Copyright (C) 2021 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17Bots service 

18============ 

19 

20The Bots service is responsible for assigning work to the various worker 

21(sometimes "bot") machines connected to the grid. These workers communicate 

22with the Bots service using the `Remote Workers API`_. 

23 

24Like the other BuildGrid services, the Bots service uses the concept of 

25instance names to contain specific sets of workers. These names could refer 

26to (e.g.) a project-specific BuildGrid instance as part of a wider managed 

27deployment. 

28 

29Workers specify an instance name as part of the request messages in the Remote 

30Workers API, and the gRPC servicer for the Bots service routes the requests 

31to the correct **instance**. The actual functionality for assigning work to 

32workers and handling progress updates from workers lives in the 

33``BotsInstance`` instances which get registered with the gRPC servicer. 

34 

35.. _Remote Workers API: https://github.com/googleapis/googleapis/tree/master/google/devtools/remoteworkers/v1test2 

36 

37""" 

38 

39 

40from functools import partial 

41import logging 

42from typing import Dict 

43 

44import grpc 

45 

46from buildgrid._exceptions import ( 

47 InvalidArgumentError, BotSessionClosedError, 

48 UnknownBotSessionError, BotSessionMismatchError, 

49 RetriableError 

50) 

51from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import ( 

52 BotSession, 

53 CreateBotSessionRequest, 

54 UpdateBotSessionRequest 

55) 

56from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc 

57from buildgrid.server._authentication import AuthContext, authorize 

58from buildgrid.server.metrics_names import ( 

59 BOTS_CREATE_BOT_SESSION_TIME_METRIC_NAME, 

60 BOTS_UPDATE_BOT_SESSION_TIME_METRIC_NAME 

61) 

62from buildgrid.server.metrics_utils import DurationMetric 

63from buildgrid.server.rabbitmq.bots.instance import BotsInstance 

64 

65 

class BotsService(bots_pb2_grpc.BotsServicer):
    """gRPC servicer class for the Bots service.

    This is the servicer which gets attached to a gRPC server and given
    incoming requests to handle.

    Those requests are routed to the ``BotsInstance`` objects registered with
    this servicer using the ``parent`` field in ``CreateBotSession``
    requests and the prefix of the server-assigned bot session name in
    ``UpdateBotSession`` requests.

    The actual ``BotSession`` management is done on a per-instance basis
    in the ``BotsInstance`` class.
    """

    def __init__(self, server: grpc.Server, enable_metrics: bool = False) -> None:
        """Instantiate a new Bots service and attach it to ``server``.

        Args:
            server (grpc.Server): The gRPC server to add this request
                servicer to.
            enable_metrics (bool): Whether or not to enable metrics
                publishing for this Bots service.
        """
        self._logger = logging.getLogger(__name__)
        self._metrics_enabled = enable_metrics
        # Maps instance name -> the BotsInstance that serves it.
        self._instances: Dict[str, BotsInstance] = {}

        bots_pb2_grpc.add_BotsServicer_to_server(self, server)

    def add_instance(self, name: str, instance: BotsInstance) -> None:
        """Register a new instance with the Bots service.

        Takes a name and a ``BotsInstance``, and registers the instance
        with the Bots service using the given name. This allows the
        Bots service to handle incoming requests specifying the given
        name as an instance name. Registering the same name twice replaces
        the previously registered instance.

        Args:
            name (str): The name of the instance to be registered.
            instance (BotsInstance): The instance to be registered.
        """
        self._instances[name] = instance

    def _get_instance(self, name: str) -> BotsInstance:
        """Return the instance registered under ``name``.

        Raises:
            InvalidArgumentError: If no instance is registered under ``name``.
        """
        try:
            return self._instances[name]
        except KeyError:
            # The KeyError adds nothing beyond this message.
            raise InvalidArgumentError(f"Instance doesn't exist on server: [{name}]") from None

    def start(self) -> None:
        """Call ``start()`` on every registered instance."""
        for instance in self._instances.values():
            instance.start()

    def stop(self) -> None:
        """Call ``stop()`` on every registered instance."""
        for instance in self._instances.values():
            instance.stop()

    @authorize(AuthContext)
    @DurationMetric(BOTS_CREATE_BOT_SESSION_TIME_METRIC_NAME)
    def CreateBotSession(
        self,
        request: CreateBotSessionRequest,
        context: grpc.ServicerContext
    ) -> BotSession:
        """Handler for CreateBotSession requests.

        This method takes a request to create a new ``BotSession`` in a
        specific instance of the Bots service. This request is handed to
        the specified instance (if it exists). The instance returns the newly
        registered ``BotSession`` or raises an error if the client-provided
        session details were unacceptable.

        Args:
            request (CreateBotSessionRequest): The gRPC request to create a
                new ``BotSession``.
            context (grpc.ServicerContext): Context for this gRPC request.

        Returns:
            A ``BotSession`` gRPC message with the server-assigned fields set,
            or an empty ``BotSession`` after setting an error status on
            ``context``.
        """
        self._logger.info(f"CreateBotSession request from [{context.peer()}]")

        instance_name = request.parent
        bot_id = request.bot_session.bot_id

        try:
            instance = self._get_instance(instance_name)
            # Registered before the (possibly blocking) create call so the
            # cleanup runs however this RPC terminates.
            context.add_callback(partial(self._on_disconnect_callback,
                                         instance_name, request.bot_session))

            bot_session = instance.create_bot_session(instance_name,
                                                      request.bot_session,
                                                      context.time_remaining())

            return bot_session

        except (InvalidArgumentError, BotSessionMismatchError, UnknownBotSessionError) as e:
            self._logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

        except BotSessionClosedError as e:  # Leases re-queued, not an error
            self._logger.debug(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.DATA_LOSS)

        except RetriableError as e:
            self._logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
            # abort_with_status raises, terminating the RPC with e's status.
            context.abort_with_status(e.error_status)

        except Exception:
            self._logger.exception(
                f"Unexpected error in CreateBotSession; bot_id=[{bot_id}]: "
                f"request=[{request}]"
            )
            context.set_code(grpc.StatusCode.INTERNAL)

        return BotSession()

    @authorize(AuthContext)
    @DurationMetric(BOTS_UPDATE_BOT_SESSION_TIME_METRIC_NAME)
    def UpdateBotSession(
        self,
        request: UpdateBotSessionRequest,
        context: grpc.ServicerContext
    ) -> BotSession:
        """Handler for UpdateBotSession requests.

        This method takes a request to update an existing ``BotSession``. The
        server-assigned name is parsed to determine the instance the session
        belongs to, and then the bot-supplied ``BotSession`` state is given
        to that instance to handle.

        Once the instance has handled any updates in either direction, the
        (potentially modified) ``BotSession`` is returned to the connected
        bot, and any metadata returned by the instance is sent as trailing
        metadata.

        Args:
            request (UpdateBotSessionRequest): The incoming gRPC request to
                update a ``BotSession``.
            context (grpc.ServicerContext): Context for this gRPC request.

        Returns:
            ``BotSession`` containing potentially updated server-assigned
            fields, or an empty ``BotSession`` after setting an error status
            on ``context``.
        """
        self._logger.debug(f"UpdateBotSession request from [{context.peer()}]")

        # The instance name is everything before the last "/" of the
        # server-assigned session name ("" if the name contains no "/").
        instance_name, _, _ = request.name.rpartition("/")
        bot_id = request.bot_session.bot_id

        try:
            # Resolve the instance *before* registering the termination
            # callback: the callback looks the instance up again, so
            # registering it for an unknown instance would only make it fail
            # later inside gRPC's callback machinery.
            instance = self._get_instance(instance_name)
            context.add_callback(partial(self._on_disconnect_callback,
                                         instance_name, request.bot_session))

            bot_session, metadata = instance.update_bot_session(
                request.name,
                request.bot_session,
                time_remaining=context.time_remaining())

            context.set_trailing_metadata(metadata)

            return bot_session

        except (InvalidArgumentError, BotSessionMismatchError, UnknownBotSessionError) as e:
            self._logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

        except BotSessionClosedError as e:  # Leases re-queued, not an error
            self._logger.debug(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.DATA_LOSS)

        except RetriableError as e:
            self._logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
            # abort_with_status raises, terminating the RPC with e's status.
            context.abort_with_status(e.error_status)

        except Exception:
            self._logger.exception(
                f"Unexpected error in UpdateBotSession; bot_id=[{bot_id}]: "
                f"request=[{request}]"
            )
            context.set_code(grpc.StatusCode.INTERNAL)

        return BotSession()

    def _on_disconnect_callback(self, instance_name: str, bot_session: BotSession) -> None:
        """Callback to run when an RPC connection is terminated.

        Registered with ``context.add_callback``, so gRPC invokes it when the
        RPC terminates. It asks the owning instance to expire the assignment
        for ``bot_session.name``.

        NOTE(review): for CreateBotSession this receives the client-supplied
        session, whose ``name`` may not be server-assigned yet. Confirm that
        ``expire_assignment_for_bot_name`` handles that case.

        Args:
            instance_name (str): The instance name that the terminated
                connection was using.
            bot_session (BotSession): The BotSession being used in the
                terminated RPC connection.
        """
        try:
            instance = self._get_instance(instance_name)
        except InvalidArgumentError:
            # Nothing to clean up for an unregistered instance; don't let the
            # error escape into gRPC's callback runner.
            self._logger.warning(
                f"RPC terminated for unknown instance [{instance_name}]; "
                f"cannot expire assignment for bot [{bot_session.name}]"
            )
            return
        instance.expire_assignment_for_bot_name(bot_session.name)