Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17ExecutionService 

18================ 

19 

20Serves remote execution requests. 

21""" 

22 

23import logging 

24import queue 

25from functools import partial 

26 

27import grpc 

28 

29from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, CancelledError 

30from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc 

31from buildgrid._protos.google.longrunning import operations_pb2 

32from buildgrid.server._authentication import AuthContext, authorize 

33from buildgrid.server.peer import Peer 

34from buildgrid.server._resources import ExecContext, limit 

35from buildgrid.server.request_metadata_utils import printable_request_metadata 

36 

37 

class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    """gRPC servicer for the remote execution ``Execution`` service.

    Incoming requests are dispatched, by instance name, to the instances
    registered through :meth:`add_instance`. When created with
    ``monitor=True`` the service additionally counts the RPCs it is serving
    for each peer, so that the number of connected clients can be queried.
    """

    def __init__(self, server, monitor=False):
        """Creates the service and adds it to a gRPC server.

        Args:
            server: The gRPC server this servicer gets added to.
            monitor (bool): Whether or not to keep track of connected peers.
        """
        self.__logger = logging.getLogger(__name__)

        # Monitoring state, only populated when instrumentation is enabled:
        #  - __peers maps a peer uid to its number of open RPCs (all instances),
        #  - __peers_by_instance maps an instance name to the same kind of
        #    mapping, restricted to the RPCs opened against that instance.
        self.__peers_by_instance = None
        self.__peers = None

        self._instances = {}

        remote_execution_pb2_grpc.add_ExecutionServicer_to_server(self, server)

        self._is_instrumented = monitor

        if self._is_instrumented:
            self.__peers_by_instance = {}
            self.__peers = {}

    # --- Public API ---

    def add_instance(self, instance_name, instance):
        """Registers a new servicer instance.

        Args:
            instance_name (str): The new instance's name.
            instance (ExecutionInstance): The new instance itself.
        """
        self._instances[instance_name] = instance

        if self._is_instrumented:
            self.__peers_by_instance[instance_name] = {}

    def get_scheduler(self, instance_name):
        """Retrieves a reference to the scheduler for an instance.

        Args:
            instance_name (str): The name of the instance to query.

        Returns:
            Scheduler: A reference to the scheduler for `instance_name`.

        Raises:
            InvalidArgumentError: If no instance named `instance_name` exists.
        """
        return self._get_instance(instance_name).scheduler

    # --- Public API: Servicer ---

    @authorize(AuthContext)
    @limit(ExecContext)
    def Execute(self, request, context):
        """Handles ExecuteRequest messages.

        Submits the requested action to the targeted instance, registers the
        calling peer against the resulting job and streams the operation
        updates back to the caller.

        Args:
            request (ExecuteRequest): The incoming RPC request.
            context (grpc.ServicerContext): Context for the RPC call.

        Yields:
            Operation: Every update produced by the instance, renamed to
            ``<instance_name>/<operation_name>``. On `InvalidArgumentError`
            or `FailedPreconditionError` the matching gRPC status is set on
            `context` and an empty operation is yielded; on `CancelledError`
            the status is set to CANCELLED and the error's `last_response`
            is yielded.
        """
        self.__logger.debug(f"Execute request from [{context.peer()}] "
                            f"([{printable_request_metadata(context)}])")

        instance_name = request.instance_name
        peer_uid = context.peer()

        # The CancelledError handler below logs this name. Bind it up-front so
        # that a cancellation surfacing before the operation name is known
        # cannot turn into an UnboundLocalError.
        operation_full_name = f"{instance_name}/<unknown>"

        Peer.register_peer(uid=peer_uid, context=context)

        try:
            instance = self._get_instance(instance_name)

            job_name = instance.execute(request.action_digest,
                                        request.skip_cache_lookup)

            operation_name = instance.register_job_peer(job_name, peer_uid)
            operation_full_name = f"{instance_name}/{operation_name}"

            # Update the counters *before* registering the termination
            # callback: the callback decrements them and may fire at any time
            # once it has been added.
            self._track_peer(instance_name, peer_uid)

            context.add_callback(partial(self._rpc_termination_callback,
                                         peer_uid, instance_name, operation_name))

            self.__logger.info(
                f"Operation [{operation_full_name}] created for job [{job_name}]")

            yield from self._stream_updates(instance, instance_name,
                                            operation_name, peer_uid, context)

        except InvalidArgumentError as e:
            self.__logger.error(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            yield operations_pb2.Operation()

        except FailedPreconditionError as e:
            self.__logger.error(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
            yield operations_pb2.Operation()

        except CancelledError as e:
            self.__logger.info(f"Operation cancelled [{operation_full_name}]")
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.CANCELLED)
            yield e.last_response

    @authorize(AuthContext)
    @limit(ExecContext)
    def WaitExecution(self, request, context):
        """Handles WaitExecutionRequest messages.

        Registers the calling peer against an existing operation and streams
        that operation's updates back to the caller.

        Args:
            request (WaitExecutionRequest): The incoming RPC request. Its
                `name` is split on the last ``/``: what precedes it is the
                instance name, what follows it is the operation name.
            context (grpc.ServicerContext): Context for the RPC call.

        Yields:
            Operation: Every update produced by the instance, renamed to
            ``<instance_name>/<operation_name>``. On `InvalidArgumentError`
            the status is set to INVALID_ARGUMENT on `context` and an empty
            operation is yielded; on `CancelledError` the status is set to
            CANCELLED and the error's `last_response` is yielded.
        """
        self.__logger.debug(f"WaitExecution request from [{context.peer()}] "
                            f"([{printable_request_metadata(context)}])")

        names = request.name.split('/')
        instance_name = '/'.join(names[:-1])
        operation_name = names[-1]
        peer = context.peer()

        # Computed before the try block so the CancelledError handler can
        # always log it, wherever the cancellation is raised from.
        operation_full_name = f"{instance_name}/{operation_name}"

        Peer.register_peer(uid=peer, context=context)

        try:
            instance = self._get_instance(instance_name)

            instance.register_operation_peer(operation_name, peer)

            # See Execute(): counters first, termination callback second.
            self._track_peer(instance_name, peer)

            context.add_callback(partial(self._rpc_termination_callback,
                                         peer, instance_name, operation_name))

            yield from self._stream_updates(instance, instance_name,
                                            operation_name, peer, context)

        except InvalidArgumentError as e:
            self.__logger.error(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            yield operations_pb2.Operation()

        except CancelledError as e:
            self.__logger.info(f"Operation cancelled [{operation_full_name}]")
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.CANCELLED)
            yield e.last_response

    # --- Public API: Monitoring ---

    @property
    def is_instrumented(self):
        """bool: Whether or not peer monitoring was enabled at creation."""
        return self._is_instrumented

    def query_n_clients(self):
        """Returns the number of distinct peers with at least one open RPC.

        Always 0 when the service is not instrumented.
        """
        if self.__peers is None:
            return 0
        return len(self.__peers)

    def query_n_clients_for_instance(self, instance_name):
        """Returns the number of distinct peers with an open RPC on an instance.

        Args:
            instance_name (str): The name of the instance to query.

        Returns:
            int: The peer count; 0 when the service is not instrumented or
            when `instance_name` is unknown.
        """
        if self.__peers_by_instance is None:
            return 0
        return len(self.__peers_by_instance.get(instance_name, ()))

    # --- Private API ---

    def _stream_updates(self, instance, instance_name, operation_name, peer_uid, context):
        """Relays an operation's updates, renamed with their full name.

        Shared by Execute() and WaitExecution(). Exceptions raised by the
        instance propagate to the caller, which maps them to gRPC statuses.
        """
        operation_full_name = f"{instance_name}/{operation_name}"

        for operation in instance.stream_operation_updates(operation_name, context):
            operation.name = operation_full_name
            yield operation

        if not context.is_active():
            self.__logger.info(f"Peer peer_uid=[{peer_uid}] was holding up a thread for "
                               f"`stream_operation_updates()` for instance_name=[{instance_name}], "
                               f"operation_name=[{operation_name}], but the rpc context is not "
                               "active anymore; releasing thread.")

    def _track_peer(self, instance_name, peer_uid):
        """Counts one more open RPC for a peer; no-op when not instrumented."""
        if not self._is_instrumented:
            return

        self.__peers[peer_uid] = self.__peers.get(peer_uid, 0) + 1

        # Counted per instance as well, so a peer talking to several
        # instances is reported for (and later released from) each of them.
        instance_peers = self.__peers_by_instance.setdefault(instance_name, {})
        instance_peers[peer_uid] = instance_peers.get(peer_uid, 0) + 1

    def _untrack_peer(self, instance_name, peer_uid):
        """Counts one less open RPC for a peer, dropping it once at zero."""
        self._release(self.__peers, peer_uid)
        self._release(self.__peers_by_instance.get(instance_name, {}), peer_uid)

    @staticmethod
    def _release(counters, key):
        """Decrements `counters[key]`, removing the entry when it reaches zero.

        A missing entry is tolerated: this runs from an RPC termination
        callback, where raising would skip the remaining clean-up steps.
        """
        remaining = counters.get(key, 0) - 1
        if remaining > 0:
            counters[key] = remaining
        else:
            counters.pop(key, None)

    def _rpc_termination_callback(self, peer_uid, instance_name, operation_name):
        """Cleans up after an RPC: unregisters the peer everywhere.

        Registered on the RPC context by Execute() and WaitExecution().
        """
        self.__logger.debug(f"RPC terminated for peer_uid=[{peer_uid}], "
                            f"instance_name=[{instance_name}], "
                            f"operation_name=[{operation_name}]")

        instance = self._get_instance(instance_name)

        instance.unregister_operation_peer(operation_name, peer_uid)

        if self._is_instrumented:
            self._untrack_peer(instance_name, peer_uid)

        Peer.deregister_peer(peer_uid)

    def _get_instance(self, name):
        """Looks up a registered instance by name.

        Raises:
            InvalidArgumentError: If no instance named `name` was registered.
        """
        try:
            return self._instances[name]

        except KeyError:
            raise InvalidArgumentError(f"Instance doesn't exist on server: [{name}]")