Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/bots/service.py: 65.83%

120 statements  

« prev     ^ index     » next       coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

"""
BotsService
=================

gRPC servicer for the Remote Workers ``Bots`` API (``v1test2``). Incoming
``CreateBotSession`` and ``UpdateBotSession`` calls are dispatched to the
``BotsInterface`` registered under the instance name carried in the request.
"""

21 

22import logging 

23 

24import grpc 

25 

26from google.protobuf import empty_pb2, timestamp_pb2 

27 

28 

29from buildgrid._enums import BotStatus 

30from buildgrid._exceptions import ( 

31 InvalidArgumentError, BotSessionClosedError, 

32 UnknownBotSessionError, BotSessionMismatchError, 

33 RetriableError 

34) 

35from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2 

36from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc 

37from buildgrid.server._authentication import AuthContext, authorize 

38from buildgrid.server.metrics_names import ( 

39 BOTS_CREATE_BOT_SESSION_TIME_METRIC_NAME, 

40 BOTS_UPDATE_BOT_SESSION_TIME_METRIC_NAME 

41) 

42from buildgrid.server.metrics_utils import DurationMetric 

43 

44 

class BotsService(bots_pb2_grpc.BotsServicer):
    """gRPC servicer for the Remote Workers ``Bots`` API.

    Requests are routed to per-instance ``BotsInterface`` objects registered
    through :meth:`add_instance`, keyed by instance name.
    """

    def __init__(self, server, monitor=False):
        """Creates the servicer and registers it with ``server``.

        Args:
            server: The gRPC server this servicer is added to.
            monitor (bool): Whether the instrumentation queries
                (``query_n_bots*``) are enabled. Defaults to False.
        """
        self.__logger = logging.getLogger(__name__)

        self._instances = {}

        bots_pb2_grpc.add_BotsServicer_to_server(self, server)

        self._is_instrumented = monitor

    # --- Public API ---

    def add_instance(self, instance_name, instance):
        """Registers a new servicer instance.

        Registering a name that is already present replaces the previous
        instance.

        Args:
            instance_name (str): The new instance's name.
            instance (BotsInterface): The new instance itself.
        """
        self._instances[instance_name] = instance

    def get_scheduler(self, instance_name):
        """Retrieves a reference to the scheduler for an instance.

        Args:
            instance_name (str): The name of the instance to query.

        Returns:
            Scheduler: A reference to the scheduler for `instance_name`.

        Raises:
            InvalidArgumentError: If no instance named `instance_name` exists.
        """
        instance = self._get_instance(instance_name)

        return instance.scheduler

    # --- Public API: Servicer ---

    @authorize(AuthContext)
    @DurationMetric(BOTS_CREATE_BOT_SESSION_TIME_METRIC_NAME)
    def CreateBotSession(self, request, context):
        """Handles CreateBotSessionRequest messages.

        Args:
            request (CreateBotSessionRequest): The incoming RPC request.
            context (grpc.ServicerContext): Context for the RPC call.

        Returns:
            BotSession: The session created by the instance named in
            ``request.parent``, or an empty ``BotSession`` when the request
            failed (the failure status is reported through ``context``).
        """
        self.__logger.info("CreateBotSession request from [%s]", context.peer())

        instance_name = request.parent
        bot_id = request.bot_session.bot_id

        try:
            instance = self._get_instance(instance_name)
            return instance.create_bot_session(instance_name, request.bot_session)

        except Exception as e:
            self._handle_bot_session_error(e, context, "CreateBotSession", bot_id, request)

        return bots_pb2.BotSession()

    @authorize(AuthContext)
    @DurationMetric(BOTS_UPDATE_BOT_SESSION_TIME_METRIC_NAME)
    def UpdateBotSession(self, request, context):
        """Handles UpdateBotSessionRequest messages.

        Args:
            request (UpdateBotSessionRequest): The incoming RPC request.
            context (grpc.ServicerContext): Context for the RPC call.

        Returns:
            BotSession: The session returned by the owning instance, or an
            empty ``BotSession`` when the request failed (the failure status
            is reported through ``context``).
        """
        # Lazy %-formatting: this handler runs on every bot update, and the
        # message is only rendered when debug logging is enabled.
        self.__logger.debug("UpdateBotSession request from [%s]", context.peer())

        bot_id = request.bot_session.bot_id

        try:
            # Everything before the final '/' of the session name is the
            # instance name ('' when the name contains no '/').
            instance_name = request.name.rpartition("/")[0]

            instance = self._get_instance(instance_name)
            bot_session, metadata = instance.update_bot_session(
                request.name,
                request.bot_session,
                deadline=context.time_remaining())

            context.set_trailing_metadata(metadata)

            return bot_session

        except Exception as e:
            self._handle_bot_session_error(e, context, "UpdateBotSession", bot_id, request)

        return bots_pb2.BotSession()

    @authorize(AuthContext)
    def PostBotEventTemp(self, request, context):
        """Handles PostBotEventTempRequest messages.

        This RPC is not implemented: it always answers ``UNIMPLEMENTED``.

        Args:
            request (PostBotEventTempRequest): The incoming RPC request.
            context (grpc.ServicerContext): Context for the RPC call.
        """
        self.__logger.info("PostBotEventTemp request from [%s]", context.peer())

        context.set_code(grpc.StatusCode.UNIMPLEMENTED)

        return empty_pb2.Empty()

    # --- Public API: Monitoring ---

    @property
    def is_instrumented(self):
        """bool: Whether the instrumentation queries are enabled."""
        return self._is_instrumented

    def query_n_bots(self) -> int:
        """Returns the number of bots summed over all registered instances.

        Raises:
            RuntimeError: If instrumentation is not enabled.
        """
        self._check_instrumented()
        return sum(instance.count_bots() for instance in self._instances.values())

    def query_n_bots_for_instance(self, instance_name: str) -> int:
        """Returns the number of bots known to one instance.

        Args:
            instance_name (str): The name of the instance to query.

        Raises:
            RuntimeError: If instrumentation is not enabled.
            InvalidArgumentError: If no instance named `instance_name` exists.
        """
        self._check_instrumented()
        try:
            instance = self._instances[instance_name]
        except KeyError:
            raise InvalidArgumentError(
                f"Instance doesn't exist on server: [{instance_name}].") from None
        # Kept outside the try so that a KeyError raised by count_bots()
        # itself is not misreported as a missing instance.
        return instance.count_bots()

    def query_n_bots_for_status(self, bot_status: BotStatus) -> int:
        """Returns the number of bots in `bot_status`, summed over all instances.

        Args:
            bot_status (BotStatus): The status to count.

        Raises:
            RuntimeError: If instrumentation is not enabled.
            InvalidArgumentError: If an instance raises KeyError for
                `bot_status`, which is reported as the status not being
                monitored.
        """
        self._check_instrumented()
        try:
            return sum(instance.count_bots_by_status(bot_status)
                       for instance in self._instances.values())
        except KeyError as e:
            raise InvalidArgumentError(f"Bot Status: [{bot_status}] is not monitored.") from e

    # --- Private API ---

    def _check_instrumented(self):
        """Raises RuntimeError unless instrumentation is enabled."""
        if not self.is_instrumented:
            raise RuntimeError("BuildGrid instrumentation is not enabled.")

    def _handle_bot_session_error(self, error, context, rpc_name, bot_id, request):
        """Reports an exception raised while serving a bot-session RPC via ``context``.

        Must be called from inside the ``except`` block that caught ``error``,
        so that ``logger.exception`` can attach the active traceback. The
        isinstance checks are ordered like the original ``except`` clauses.

        Args:
            error (Exception): The exception that was caught.
            context (grpc.ServicerContext): Context for the RPC call.
            rpc_name (str): RPC name used in the unexpected-error log message.
            bot_id (str): Bot identifier from the request, for logging.
            request: The incoming RPC request, for logging.

        Raises:
            Whatever ``context.abort_with_status`` raises, for ``RetriableError``.
        """
        if isinstance(error, (InvalidArgumentError, BotSessionMismatchError,
                              UnknownBotSessionError)):
            self.__logger.info(error)
            context.set_details(str(error))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

        elif isinstance(error, BotSessionClosedError):
            # Leases re-queued, not an error: logged at debug level only.
            self.__logger.debug(error)
            context.set_details(str(error))
            context.set_code(grpc.StatusCode.DATA_LOSS)

        elif isinstance(error, NotImplementedError):
            self.__logger.info(error)
            context.set_details(str(error))
            context.set_code(grpc.StatusCode.UNIMPLEMENTED)

        elif isinstance(error, RetriableError):
            self.__logger.info("Retriable error, client should retry in: %s",
                               error.retry_info.retry_delay)
            # abort_with_status raises, terminating the RPC with this status.
            context.abort_with_status(error.error_status)

        else:
            self.__logger.exception(
                "Unexpected error in %s; bot_id=[%s]: request=[%s]",
                rpc_name, bot_id, request)
            context.set_details(str(error))
            context.set_code(grpc.StatusCode.INTERNAL)

    def _get_instance(self, name):
        """Looks up a registered instance by name.

        Args:
            name (str): The instance name.

        Returns:
            BotsInterface: The instance registered under `name`.

        Raises:
            InvalidArgumentError: If no instance named `name` is registered.
        """
        try:
            return self._instances[name]
        except KeyError:
            raise InvalidArgumentError(f"Instance doesn't exist on server: [{name}]") from None