Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/rabbitmq/bots/service.py: 27.78%

90 statements  

« prev     ^ index     » next       coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

1# Copyright (C) 2021 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17Bots service 

18============ 

19 

20The Bots service is responsible for assigning work to the various worker 

21(sometimes "bot") machines connected to the grid. These workers communicate 

22with the Bots service using the `Remote Workers API`_. 

23 

24Like the other BuildGrid services, the Bots service uses the concept of 

25instance names to contain specific sets of workers. These names could refer 

26to (e.g.) a project-specific BuildGrid instance as part of a wider managed 

27deployment. 

28 

29Workers specify an instance name as part of the request messages in the Remote 

30Workers API, and the gRPC servicer for the Bots service routes the requests 

31to the correct **instance**. The actual functionality for assigning work to 

32workers and handling progress updates from workers lives in the 

33``BotsInstance`` instances which get registered with the gRPC servicer. 

34 

35.. _Remote Workers API: https://github.com/googleapis/googleapis/tree/master/google/devtools/remoteworkers/v1test2 

36 

37""" 

38 

39 

40from functools import partial 

41import logging 

42from typing import Dict 

43 

44import grpc 

45 

46from buildgrid._exceptions import ( 

47 InvalidArgumentError, BotSessionClosedError, 

48 UnknownBotSessionError, BotSessionMismatchError, 

49 RetriableError 

50) 

51from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import ( 

52 BotSession, 

53 CreateBotSessionRequest, 

54 UpdateBotSessionRequest 

55) 

56from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc 

57from buildgrid.server._authentication import AuthContext, authorize 

58from buildgrid.server.metrics_names import ( 

59 BOTS_CREATE_BOT_SESSION_TIME_METRIC_NAME, 

60 BOTS_UPDATE_BOT_SESSION_TIME_METRIC_NAME 

61) 

62from buildgrid.server.metrics_utils import DurationMetric 

63from buildgrid.server.rabbitmq.bots.instance import BotsInstance 

64 

65 

class BotsService(bots_pb2_grpc.BotsServicer):
    """gRPC servicer class for the Bots service.

    This is the servicer which gets attached to a gRPC server and given
    incoming requests to handle.

    Those requests are routed to the ``BotsInstances`` registered with
    this servicer using the ``parent`` field in ``CreateBotSession``
    requests and the prefix of the server-assigned bot session name in
    ``UpdateBotSession`` requests.

    The actual ``BotSession`` management is done on a per-instance basis
    in the ``BotsInstance`` class.

    """

    def __init__(self, server: grpc.Server, enable_metrics: bool = False):
        """Instantiate a new Bots service.

        Args:
            server (grpc.Server): The gRPC server to add this request
                servicer to.
            enable_metrics (bool): Whether or not to enable metrics
                publishing for this Bots service.

        """
        self._logger = logging.getLogger(__name__)
        self._metrics_enabled = enable_metrics
        # Registered instances, keyed by instance name.
        self._instances: Dict[str, BotsInstance] = {}

        bots_pb2_grpc.add_BotsServicer_to_server(self, server)

    def add_instance(self, name: str, instance: BotsInstance) -> None:
        """Register a new instance with the Bots service.

        Takes a name and a ``BotsInstance``, and registers the instance
        with the Bots service using the given name. This allows the
        Bots service to handle incoming requests specifying the given
        name as an instance name. Registering a name a second time
        replaces the instance previously stored under it.

        Args:
            name (str): The name of the instance to be registered.
            instance (BotsInstance): The instance to be registered.

        """
        self._instances[name] = instance

    def _get_instance(self, name: str) -> BotsInstance:
        """Look up a registered instance by name.

        Args:
            name (str): The instance name to look up.

        Returns:
            The ``BotsInstance`` registered under ``name``.

        Raises:
            InvalidArgumentError: If no instance is registered under
                ``name``.

        """
        try:
            return self._instances[name]
        except KeyError:
            # The KeyError is an implementation detail of the lookup; do not
            # attach it as context to the error reported to the caller.
            raise InvalidArgumentError(f"Instance doesn't exist on server: [{name}]") from None

    def start(self) -> None:
        """Call ``start()`` on every registered instance."""
        for instance in self._instances.values():
            instance.start()

    def stop(self) -> None:
        """Call ``stop()`` on every registered instance."""
        for instance in self._instances.values():
            instance.stop()

    @authorize(AuthContext)
    @DurationMetric(BOTS_CREATE_BOT_SESSION_TIME_METRIC_NAME)
    def CreateBotSession(
        self,
        request: CreateBotSessionRequest,
        context: grpc.ServicerContext
    ) -> BotSession:
        """Handler for CreateBotSession requests.

        This method takes a request to create a new ``BotSession`` in a
        specific instance of the Bots service. This request is handed to
        the specified instance (if it exists). The instance returns the newly
        registered ``BotSession`` or raises an error if the client-provided
        session details were unacceptable.

        Errors are reported to the client through the gRPC status code and
        details set on ``context``; in that case an empty ``BotSession`` is
        returned.

        Args:
            request (CreateBotSessionRequest): The gRPC request to create a
                new ``BotSession``.
            context (grpc.ServicerContext): Context for this gRPC request.

        Returns:
            A ``BotSession`` gRPC message with the server-assigned fields set.

        """
        self._logger.info(f"CreateBotSession request from [{context.peer()}]")

        instance_name = request.parent
        bot_id = request.bot_session.bot_id

        try:
            # Validate the instance before registering the termination
            # callback, since the callback looks the instance up again.
            instance = self._get_instance(instance_name)
            # NOTE(review): registered before calling into the instance,
            # presumably because that call may wait up to the RPC deadline
            # -- confirm against BotsInstance.create_bot_session.
            context.add_callback(partial(self._on_disconnect_callback,
                                         instance_name, request.bot_session))

            bot_session = instance.create_bot_session(instance_name,
                                                      request.bot_session,
                                                      context.time_remaining())

            return bot_session

        except (InvalidArgumentError, BotSessionMismatchError, UnknownBotSessionError) as e:
            self._logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

        except BotSessionClosedError as e:  # Leases re-queued, not an error
            self._logger.debug(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.DATA_LOSS)

        except RetriableError as e:
            self._logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
            # Per the gRPC documentation, abort_with_status() raises to
            # terminate the RPC, so the fallback return below is not reached.
            context.abort_with_status(e.error_status)

        except Exception as e:
            self._logger.exception(
                f"Unexpected error in CreateBotSession; bot_id=[{bot_id}]: "
                f"request=[{request}]"
            )
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INTERNAL)

        # Only reached after a handled error: reply with an empty message,
        # the status code/details set above carry the failure.
        return BotSession()

    @authorize(AuthContext)
    @DurationMetric(BOTS_UPDATE_BOT_SESSION_TIME_METRIC_NAME)
    def UpdateBotSession(
        self,
        request: UpdateBotSessionRequest,
        context: grpc.ServicerContext
    ) -> BotSession:
        """Handler for UpdateBotSession requests.

        This method takes a request to update an existing ``BotSession``. The
        server-assigned name is parsed to determine the instance the session
        belongs to, and then the bot-supplied ``BotSession`` state is given
        to that instance to handle.

        Once the instance has handled any updates in either direction, the
        (potentially modified) ``BotSession`` is returned to the connected
        bot.

        Errors are reported to the client through the gRPC status code and
        details set on ``context``; in that case an empty ``BotSession`` is
        returned.

        Args:
            request (UpdateBotSessionRequest): The incoming gRPC request to
                update a ``BotSession``.
            context (grpc.ServicerContext): Context for this gRPC request.

        Returns:
            ``BotSession`` containing potentially updated server-assigned
            fields.
        """
        self._logger.debug(f"UpdateBotSession request from [{context.peer()}]")

        # The instance name is everything before the final "/" of the
        # session name (empty when the name contains no "/").
        instance_name = request.name.rpartition("/")[0]
        bot_id = request.bot_session.bot_id

        try:
            # Resolve the instance *before* registering the termination
            # callback: the callback looks the instance up again, so
            # registering it for an unknown instance would make it raise
            # InvalidArgumentError when the RPC terminates, outside of this
            # handler's error handling.
            instance = self._get_instance(instance_name)
            context.add_callback(partial(self._on_disconnect_callback,
                                         instance_name, request.bot_session))

            bot_session, metadata = instance.update_bot_session(
                request.name,
                request.bot_session,
                time_remaining=context.time_remaining())

            context.set_trailing_metadata(metadata)

            return bot_session

        except (InvalidArgumentError, BotSessionMismatchError, UnknownBotSessionError) as e:
            self._logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

        except BotSessionClosedError as e:  # Leases re-queued, not an error
            self._logger.debug(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.DATA_LOSS)

        except RetriableError as e:
            self._logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
            # Per the gRPC documentation, abort_with_status() raises to
            # terminate the RPC, so the fallback return below is not reached.
            context.abort_with_status(e.error_status)

        except Exception as e:
            self._logger.exception(
                f"Unexpected error in UpdateBotSession; bot_id=[{bot_id}]: "
                f"request=[{request}]"
            )
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INTERNAL)

        # Only reached after a handled error: reply with an empty message,
        # the status code/details set above carry the failure.
        return BotSession()

    def _on_disconnect_callback(self, instance_name: str, bot_session: BotSession) -> None:
        """Callback to run when an RPC connection is terminated.

        Asks the named instance to expire the assignment for the bot
        session's name.

        Args:
            instance_name (str): The instance name that the terminated
                connection was using.
            bot_session (BotSession): The BotSession being used in the
                terminated RPC connection.

        Raises:
            InvalidArgumentError: If no instance is registered under
                ``instance_name``.

        """
        instance = self._get_instance(instance_name)
        instance.expire_assignment_for_bot_name(bot_session.name)

274 instance.expire_assignment_for_bot_name(bot_session.name)