Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/bots/service.py: 70.34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

118 statements  

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17BotsService 

18================= 

19 

20""" 

21 

22import logging 

23 

24import grpc 

25 

26from google.protobuf import empty_pb2, timestamp_pb2 

27 

28 

29from buildgrid._enums import BotStatus 

30from buildgrid._exceptions import ( 

31 InvalidArgumentError, BotSessionClosedError, 

32 UnknownBotSessionError, BotSessionMismatchError, 

33 RetriableError 

34) 

35from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2 

36from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc 

37from buildgrid.server._authentication import AuthContext, authorize 

38from buildgrid.server.metrics_names import ( 

39 BOTS_CREATE_BOT_SESSION_TIME_METRIC_NAME, 

40 BOTS_UPDATE_BOT_SESSION_TIME_METRIC_NAME 

41) 

42from buildgrid.server.metrics_utils import DurationMetric 

43 

44 

class BotsService(bots_pb2_grpc.BotsServicer):
    """gRPC servicer for the Bots service.

    Incoming RPCs are dispatched to the instance that was registered, via
    :meth:`add_instance`, under the instance name carried by the request.
    """

    def __init__(self, server, monitor=False):
        """Creates the servicer and registers it with a gRPC server.

        Args:
            server: The gRPC server this servicer is added to.
            monitor (bool): Whether the monitoring queries
                (``query_n_bots*``) are enabled.
        """
        self.__logger = logging.getLogger(__name__)

        self._instances = {}
        self._is_instrumented = monitor

        # Register last, so that the servicer is fully initialised before
        # the server holds a reference to it.
        bots_pb2_grpc.add_BotsServicer_to_server(self, server)

    # --- Public API ---

    def add_instance(self, instance_name, instance):
        """Registers a new servicer instance.

        An instance already registered under `instance_name` is replaced.

        Args:
            instance_name (str): The new instance's name.
            instance (BotsInterface): The new instance itself.
        """
        self._instances[instance_name] = instance

    def get_scheduler(self, instance_name):
        """Retrieves a reference to the scheduler for an instance.

        Args:
            instance_name (str): The name of the instance to query.

        Returns:
            Scheduler: A reference to the scheduler for `instance_name`.

        Raises:
            InvalidArgumentError: If no instance named `instance_name` exists.
        """
        return self._get_instance(instance_name).scheduler

    # --- Public API: Servicer ---

    @authorize(AuthContext)
    @DurationMetric(BOTS_CREATE_BOT_SESSION_TIME_METRIC_NAME)
    def CreateBotSession(self, request, context):
        """Handles CreateBotSessionRequest messages.

        Args:
            request (CreateBotSessionRequest): The incoming RPC request.
            context (grpc.ServicerContext): Context for the RPC call.

        Returns:
            BotSession: The session returned by the target instance, or an
            empty ``BotSession`` when the request failed (the failure is
            reported through the status code set on `context`).
        """
        self.__logger.info(f"CreateBotSession request from [{context.peer()}]")

        instance_name = request.parent
        bot_id = request.bot_session.bot_id

        try:
            instance = self._get_instance(instance_name)
            return instance.create_bot_session(instance_name,
                                               request.bot_session)

        except Exception as e:
            self._report_rpc_error("CreateBotSession", e, request, bot_id, context)

        return bots_pb2.BotSession()

    @authorize(AuthContext)
    @DurationMetric(BOTS_UPDATE_BOT_SESSION_TIME_METRIC_NAME)
    def UpdateBotSession(self, request, context):
        """Handles UpdateBotSessionRequest messages.

        Args:
            request (UpdateBotSessionRequest): The incoming RPC request.
            context (grpc.ServicerContext): Context for the RPC call.

        Returns:
            BotSession: The session returned by the target instance, or an
            empty ``BotSession`` when the request failed (the failure is
            reported through the status code set on `context`).
        """
        self.__logger.debug(f"UpdateBotSession request from [{context.peer()}]")

        # The instance name is everything in front of the last '/' of the
        # session name (empty when the name contains no '/').
        instance_name = request.name.rpartition("/")[0]
        bot_id = request.bot_session.bot_id

        try:
            instance = self._get_instance(instance_name)
            bot_session, metadata = instance.update_bot_session(
                request.name,
                request.bot_session,
                deadline=context.time_remaining())

            context.set_trailing_metadata(metadata)

            return bot_session

        except Exception as e:
            self._report_rpc_error("UpdateBotSession", e, request, bot_id, context)

        return bots_pb2.BotSession()

    @authorize(AuthContext)
    def PostBotEventTemp(self, request, context):
        """Handles PostBotEventTempRequest messages.

        This RPC is not supported: the call is logged and answered with
        ``UNIMPLEMENTED``.

        Args:
            request (PostBotEventTempRequest): The incoming RPC request.
            context (grpc.ServicerContext): Context for the RPC call.
        """
        self.__logger.info(f"PostBotEventTemp request from [{context.peer()}]")

        context.set_code(grpc.StatusCode.UNIMPLEMENTED)

        return empty_pb2.Empty()

    # --- Public API: Monitoring ---

    @property
    def is_instrumented(self):
        """bool: Whether the monitoring queries are enabled."""
        return self._is_instrumented

    def query_n_bots(self) -> int:
        """Returns the sum of ``count_bots()`` over all registered instances.

        Raises:
            RuntimeError: If instrumentation is not enabled.
        """
        if not self.is_instrumented:
            raise RuntimeError("BuildGrid instrumentation is not enabled.")

        return sum(instance.count_bots() for instance in self._instances.values())

    def query_n_bots_for_instance(self, instance_name: str) -> int:
        """Returns ``count_bots()`` of the instance named `instance_name`.

        Raises:
            RuntimeError: If instrumentation is not enabled.
            InvalidArgumentError: If no instance named `instance_name` exists.
        """
        if not self.is_instrumented:
            raise RuntimeError("BuildGrid instrumentation is not enabled.")

        # Only the lookup is guarded: a KeyError raised from within
        # count_bots() must not be reported as a missing instance.
        try:
            instance = self._instances[instance_name]
        except KeyError:
            raise InvalidArgumentError(f"Instance doesn't exist on server: [{instance_name}].") from None

        return instance.count_bots()

    def query_n_bots_for_status(self, bot_status: BotStatus) -> int:
        """Returns the sum of ``count_bots_by_status(bot_status)`` over all
        registered instances.

        Raises:
            RuntimeError: If instrumentation is not enabled.
            InvalidArgumentError: If an instance raises ``KeyError`` for
                `bot_status`.
        """
        if not self.is_instrumented:
            raise RuntimeError("BuildGrid instrumentation is not enabled.")

        try:
            return sum(instance.count_bots_by_status(bot_status)
                       for instance in self._instances.values())
        except KeyError as e:
            raise InvalidArgumentError(f"Bot Status: [{bot_status}] is not monitored.") from e

    # --- Private API ---

    def _get_instance(self, name):
        """Returns the instance registered under `name`.

        Raises:
            InvalidArgumentError: If no instance named `name` exists.
        """
        try:
            return self._instances[name]

        except KeyError:
            raise InvalidArgumentError(f"Instance doesn't exist on server: [{name}]") from None

    def _report_rpc_error(self, rpc_name, error, request, bot_id, context):
        """Logs a failed bot-session RPC and records its status on `context`.

        Must be called while `error` is being handled (from an ``except``
        block), so that the traceback of unexpected errors can be logged.
        The error types are tested in a fixed order; the first match wins.

        Args:
            rpc_name (str): Name of the RPC, used in the log message for
                unexpected errors.
            error (Exception): The exception raised while serving the RPC.
            request: The incoming RPC request, logged for unexpected errors.
            bot_id (str): The bot ID taken from the request, logged for
                unexpected errors.
            context (grpc.ServicerContext): Context for the RPC call.
        """
        if isinstance(error, (InvalidArgumentError, BotSessionMismatchError, UnknownBotSessionError)):
            self.__logger.info(error)
            context.set_details(str(error))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

        elif isinstance(error, BotSessionClosedError):  # Leases re-queued, not an error
            self.__logger.debug(error)
            context.set_details(str(error))
            context.set_code(grpc.StatusCode.DATA_LOSS)

        elif isinstance(error, NotImplementedError):
            self.__logger.info(error)
            context.set_details(str(error))
            context.set_code(grpc.StatusCode.UNIMPLEMENTED)

        elif isinstance(error, RetriableError):
            self.__logger.info(f"Retriable error, client should retry in: {error.retry_info.retry_delay}")
            # abort_with_status() terminates the RPC by raising, so the
            # caller's fallback return is not reached on this path.
            context.abort_with_status(error.error_status)

        else:
            self.__logger.exception(
                f"Unexpected error in {rpc_name}; bot_id=[{bot_id}]: "
                f"request=[{request}]"
            )
            context.set_code(grpc.StatusCode.INTERNAL)