Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17BotsService 

18================= 

19 

20""" 

21 

22import logging 

23 

24import grpc 

25 

26from google.protobuf import empty_pb2, timestamp_pb2 

27 

28from buildgrid._enums import BotStatus 

29from buildgrid._exceptions import ( 

30 InvalidArgumentError, BotSessionClosedError, 

31 UnknownBotSessionError, BotSessionMismatchError 

32) 

33from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2 

34from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc 

35from buildgrid.server._authentication import AuthContext, authorize 

36 

37 

38class BotsService(bots_pb2_grpc.BotsServicer): 

39 

40 def __init__(self, server, monitor=False): 

41 self.__logger = logging.getLogger(__name__) 

42 

43 self._instances = {} 

44 

45 bots_pb2_grpc.add_BotsServicer_to_server(self, server) 

46 

47 self._is_instrumented = monitor 

48 

49 # --- Public API --- 

50 

51 def add_instance(self, instance_name, instance): 

52 """Registers a new servicer instance. 

53 

54 Args: 

55 instance_name (str): The new instance's name. 

56 instance (BotsInterface): The new instance itself. 

57 """ 

58 self._instances[instance_name] = instance 

59 

60 def get_scheduler(self, instance_name): 

61 """Retrieves a reference to the scheduler for an instance. 

62 

63 Args: 

64 instance_name (str): The name of the instance to query. 

65 

66 Returns: 

67 Scheduler: A reference to the scheduler for `instance_name`. 

68 

69 Raises: 

70 InvalidArgumentError: If no instance named `instance_name` exists. 

71 """ 

72 instance = self._get_instance(instance_name) 

73 

74 return instance.scheduler 

75 

76 # --- Public API: Servicer --- 

77 

78 @authorize(AuthContext) 

79 def CreateBotSession(self, request, context): 

80 """Handles CreateBotSessionRequest messages. 

81 

82 Args: 

83 request (CreateBotSessionRequest): The incoming RPC request. 

84 context (grpc.ServicerContext): Context for the RPC call. 

85 """ 

86 self.__logger.debug(f"CreateBotSession request from [{context.peer()}]") 

87 

88 instance_name = request.parent 

89 bot_status = BotStatus(request.bot_session.status) 

90 bot_id = request.bot_session.bot_id 

91 

92 try: 

93 instance = self._get_instance(instance_name) 

94 bot_session = instance.create_bot_session(instance_name, 

95 request.bot_session) 

96 now = timestamp_pb2.Timestamp() 

97 now.GetCurrentTime() 

98 

99 return bot_session 

100 

101 except (InvalidArgumentError, BotSessionMismatchError, UnknownBotSessionError) as e: 

102 self.__logger.error(e) 

103 context.set_details(str(e)) 

104 context.set_code(grpc.StatusCode.INVALID_ARGUMENT) 

105 

106 except BotSessionClosedError as e: 

107 self.__logger.error(e) 

108 context.set_details(str(e)) 

109 context.set_code(grpc.StatusCode.DATA_LOSS) 

110 

111 except NotImplementedError as e: 

112 self.__logger.error(e) 

113 context.set_details(str(e)) 

114 context.set_code(grpc.StatusCode.UNIMPLEMENTED) 

115 

116 return bots_pb2.BotSession() 

117 

118 @authorize(AuthContext) 

119 def UpdateBotSession(self, request, context): 

120 """Handles UpdateBotSessionRequest messages. 

121 

122 Args: 

123 request (UpdateBotSessionRequest): The incoming RPC request. 

124 context (grpc.ServicerContext): Context for the RPC call. 

125 """ 

126 self.__logger.debug(f"UpdateBotSession request from [{context.peer()}]") 

127 

128 names = request.name.split("/") 

129 bot_status = BotStatus(request.bot_session.status) 

130 bot_id = request.bot_session.bot_id 

131 

132 try: 

133 instance_name = '/'.join(names[:-1]) 

134 

135 instance = self._get_instance(instance_name) 

136 bot_session = instance.update_bot_session( 

137 request.name, 

138 request.bot_session, 

139 deadline=context.time_remaining()) 

140 

141 return bot_session 

142 

143 except (InvalidArgumentError, BotSessionMismatchError, UnknownBotSessionError) as e: 

144 self.__logger.error(e) 

145 context.set_details(str(e)) 

146 context.set_code(grpc.StatusCode.INVALID_ARGUMENT) 

147 

148 except BotSessionClosedError as e: 

149 self.__logger.error(e) 

150 context.set_details(str(e)) 

151 context.set_code(grpc.StatusCode.DATA_LOSS) 

152 

153 except NotImplementedError as e: 

154 self.__logger.error(e) 

155 context.set_details(str(e)) 

156 context.set_code(grpc.StatusCode.UNIMPLEMENTED) 

157 

158 return bots_pb2.BotSession() 

159 

160 @authorize(AuthContext) 

161 def PostBotEventTemp(self, request, context): 

162 """Handles PostBotEventTempRequest messages. 

163 

164 Args: 

165 request (PostBotEventTempRequest): The incoming RPC request. 

166 context (grpc.ServicerContext): Context for the RPC call. 

167 """ 

168 self.__logger.debug(f"PostBotEventTemp request from [{context.peer()}]") 

169 

170 context.set_code(grpc.StatusCode.UNIMPLEMENTED) 

171 

172 return empty_pb2.Empty() 

173 

174 # --- Public API: Monitoring --- 

175 

176 @property 

177 def is_instrumented(self): 

178 return self._is_instrumented 

179 

180 def query_n_bots(self) -> int: 

181 if self.is_instrumented: 

182 total = 0 

183 for instance in self._instances.values(): 

184 total += instance.count_bots() 

185 return total 

186 raise RuntimeError("BuildGrid instrumentation is not enabled.") 

187 

188 def query_n_bots_for_instance(self, instance_name: str) -> int: 

189 if self.is_instrumented: 

190 try: 

191 instance = self._instances[instance_name] 

192 return instance.count_bots() 

193 except KeyError: 

194 raise InvalidArgumentError(f"Instance doesn't exist on server: [{instance_name}].") 

195 raise RuntimeError("BuildGrid instrumentation is not enabled.") 

196 

197 def query_n_bots_for_status(self, bot_status: BotStatus) -> int: 

198 if self.is_instrumented: 

199 try: 

200 total = 0 

201 for instance in self._instances.values(): 

202 total += instance.count_bots_by_status(bot_status) 

203 return total 

204 except KeyError: 

205 raise InvalidArgumentError(f"Bot Status: [{bot_status}] is not monitored.") 

206 raise RuntimeError("BuildGrid instrumentation is not enabled.") 

207 

208 # --- Private API --- 

209 

210 def _get_instance(self, name): 

211 try: 

212 return self._instances[name] 

213 

214 except KeyError: 

215 raise InvalidArgumentError(f"Instance doesn't exist on server: [{name}]")