Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/rabbitmq/server.py: 24.36%

156 statements  

« prev     ^ index     » next       coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

1# Copyright (C) 2021 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15import asyncio 

16from concurrent import futures 

17import logging 

18import logging.handlers 

19import os 

20import signal 

21import sys 

22from typing import Any, Callable, Dict, List, Optional, TYPE_CHECKING, Tuple 

23 

24import grpc 

25import janus 

26 

27from buildgrid._exceptions import PermissionDeniedError 

28from buildgrid.server.rabbitmq.bots.service import BotsService 

29from buildgrid.settings import ( 

30 LOG_RECORD_FORMAT, 

31 MIN_THREAD_POOL_SIZE 

32) 

33 

34if TYPE_CHECKING: 

35 from buildgrid.server.rabbitmq.bots.instance import BotsInstance 

36 

37 

class RMQServer:
    """Creates a BuildGrid server instance that is powered by RabbitMQ.

    The :class:`RMQServer` class binds together all the gRPC services.
    """

    def __init__(self, *, max_workers: Optional[int] = None, enable_metrics: bool = False, **kwargs):
        """Initializes a new :class:`RMQServer` instance.

        Args:
            max_workers (int): Size of the thread pool serving gRPC requests;
                also used as the gRPC server's ``maximum_concurrent_rpcs``.
                ``None`` selects ``max(MIN_THREAD_POOL_SIZE, cpu_count * 5)``;
                values below ``MIN_THREAD_POOL_SIZE`` are raised to it.
            enable_metrics (bool): Passed to the Bots service when it is created.
            **kwargs: Accepted for interface compatibility; currently unused.
        """
        self._logger = logging.getLogger(__name__)

        self._main_loop = asyncio.get_event_loop()

        self._logging_queue = None  # type: Optional[janus.Queue]
        self._logging_handler = None  # type: Optional[logging.handlers.QueueHandler]
        self._logging_formatter = logging.Formatter(fmt=LOG_RECORD_FORMAT)
        self._logging_task: Optional[asyncio.Future] = None

        # Created lazily by _add_bots_instance() once the gRPC server exists.
        self._bots_service = None

        # Instances registered via the public add_*_instance() methods, keyed
        # by instance name; they are wired up in _instantiate_grpc().
        self._action_cache_instances: Dict[str, Any] = {}
        self._bots_instances: Dict[str, "BotsInstance"] = {}
        self._execution_instances: Dict[str, Any] = {}
        self._operations_instances: Dict[str, Any] = {}
        self._cas_instances: Dict[str, Any] = {}
        self._bytestream_instances: Dict[str, Any] = {}
        self._logstream_instances: Dict[str, Any] = {}

        # Requested (address, credentials) pairs; credentials of None means an
        # insecure port. _port_map records the port number gRPC bound for each.
        self._ports: List[Tuple[str, Optional[grpc.ServerCredentials]]] = []
        self._port_map: Dict[Tuple[str, Optional[grpc.ServerCredentials]], int] = {}

        self._enable_metrics = enable_metrics

        self._max_workers = self._resolve_max_workers(max_workers, MIN_THREAD_POOL_SIZE)

        # `thread_name_prefix` is always available: this module already
        # requires Python >= 3.6 (f-strings), so no fallback is needed.
        # pylint: disable=consider-using-with
        self._grpc_executor = futures.ThreadPoolExecutor(
            self._max_workers, thread_name_prefix="gRPC_Executor")

        self._grpc_server: Optional[grpc.Server] = None

    def _resolve_max_workers(self, max_workers: Optional[int], minimum: int) -> int:
        """Return the thread-pool size to use, enforcing a lower bound.

        Args:
            max_workers: Requested pool size, or ``None`` for the default.
            minimum: Smallest pool size allowed.

        Returns:
            ``max(minimum, cpu_count * 5)`` when ``max_workers`` is ``None``
            (the older ``ThreadPoolExecutor`` default formula); ``minimum``,
            with a warning, when ``max_workers`` is below it; otherwise
            ``max_workers`` unchanged.
        """
        if max_workers is None:
            return max(minimum, (os.cpu_count() or 1) * 5)

        if max_workers < minimum:
            # Enforce a minimum for max_workers
            self._logger.warning(f"Specified thread-limit=[{max_workers}] is too small, "
                                 f"bumping it to [{minimum}]")
            return minimum

        return max_workers

    async def _logging_worker(self) -> None:
        """Route all log records through a queue and write them to stdout.

        Replaces every handler on the root logger with a
        :class:`logging.handlers.QueueHandler` (moving the root logger's
        filters onto that handler), then repeatedly pulls records off the
        queue, formats them with ``LOG_RECORD_FORMAT`` and writes them to
        stdout. Runs until the task is cancelled; any other exception is
        logged and the loop keeps going.
        """
        self._logging_queue = janus.Queue()
        self._logging_handler = logging.handlers.QueueHandler(self._logging_queue.sync_q)  # type: ignore

        # Setup the main logging handler:
        root_logger = logging.getLogger()

        # Move root-logger filters onto the queue handler.
        for log_filter in root_logger.filters[:]:
            self._logging_handler.addFilter(log_filter)
            root_logger.removeFilter(log_filter)

        # Swap all existing root handlers for the single queue handler.
        for log_handler in root_logger.handlers[:]:
            root_logger.removeHandler(log_handler)
        root_logger.addHandler(self._logging_handler)

        async def _write_next_record() -> None:
            if self._logging_queue is not None:
                log_record = await self._logging_queue.async_q.get()
                record = self._logging_formatter.format(log_record)

                # TODO: Investigate if async write would be worth here.
                sys.stdout.write(f'{record}\n')
                sys.stdout.flush()

                # TODO: publish log records to a monitoring bus

        while True:
            try:
                await _write_next_record()

            except asyncio.CancelledError:
                break
            except Exception:
                # The worker shouldn't exit on exceptions, but log at a severe
                # enough level that it doesn't get lost in logs
                self._logger.exception("Exception in logging worker")

    def _start_logging(self) -> None:
        """Schedule the logging coroutine on the main event loop."""
        self._logging_task = asyncio.ensure_future(
            self._logging_worker(), loop=self._main_loop)

    def _instantiate_grpc(self) -> None:
        """Instantiate the gRPC objects.

        Creates the gRPC server, calls ``setup_grpc()`` on every registered
        instance, passes each instance to its ``_add_*_instance`` hook, and
        binds every requested port. Server reflection and the Capabilities
        service are not set up yet (see TODOs).

        Raises:
            PermissionDeniedError: If gRPC fails to bind one of the ports
                (``add_*_port`` returned 0).
        """
        # NOTE: maximum_concurrent_rpcs is set equal to max_workers to avoid
        # deadlocking remote execution when multiple services being run in
        # one process. eg. A server running CAS, Execution, and Bots services
        # can get into a state where all the executor threads are handling
        # long-lived execution connections. Allowing more concurrent RPCs
        # will queue CAS or Bots service requests until they timeout in this
        # scenario, rather than failing fast.
        self._grpc_server = grpc.server(
            self._grpc_executor,
            maximum_concurrent_rpcs=self._max_workers
        )

        # TODO: Capabilities service

        # Order matters only insofar as it is preserved from the original
        # per-type loops.
        registrations = (
            (self._execution_instances, self._add_execution_instance),
            (self._operations_instances, self._add_operations_instance),
            (self._bots_instances, self._add_bots_instance),
            (self._cas_instances, self._add_cas_instance),
            (self._bytestream_instances, self._add_bytestream_instance),
            (self._logstream_instances, self._add_logstream_instance),
            (self._action_cache_instances, self._add_action_cache_instance),
        )
        for instances, register in registrations:
            for instance_name, instance in instances.items():
                instance.setup_grpc()
                register(instance, instance_name)

        # Add the requested ports to the gRPC server
        for address, credentials in self._ports:
            if credentials is not None:
                self._logger.info(f"Adding secure connection on: [{address}]")
                port_number = self._grpc_server.add_secure_port(address, credentials)

            else:
                self._logger.info(f"Adding insecure connection on [{address}]")
                port_number = self._grpc_server.add_insecure_port(address)
            self._port_map[(address, credentials)] = port_number

            # gRPC reports a failed bind by returning port 0.
            if not port_number:
                raise PermissionDeniedError("Unable to configure socket")

        # TODO: Server reflection

    def _start_services(self):
        """Start background threads of the services that have been created."""
        if self._bots_service is not None:
            self._bots_service.start()

    # --- Public API ---
    def start(
        self,
        *,
        on_server_start_cb: Optional[Callable] = None,
        port_assigned_callback: Optional[Callable] = None
    ) -> None:
        """Starts the BuildGrid server.

        BuildGrid server startup consists of 3 stages,

        1. Starting logging (and, in future, monitoring)

           This step schedules the logging coroutine on the main event loop.
           Starting the monitoring bus is still a TODO for this server.

        2. Instantiate gRPC

           This step creates the gRPC server, calls ``setup_grpc()`` on every
           registered instance, attaches the instances to their services,
           binds the requested ports, and then starts any background threads
           the services need.

           After this step, gRPC core is running and it is no longer safe to
           fork the process.

        3. Start the gRPC server

           The final step starts the gRPC server, then calls
           ``on_server_start_cb()`` and
           ``port_assigned_callback(port_map=...)`` if they were given. After
           this point BuildGrid is ready to serve requests.

        Finally a ``SIGTERM`` handler that calls :meth:`stop` is added to the
        event loop, and the loop is run with ``run_forever()``; this call
        blocks until the loop is stopped.

        Args:
            on_server_start_cb (Callable): Called with no arguments once the
                gRPC server has started.
            port_assigned_callback (Callable): Called once the gRPC server has
                started, with keyword argument ``port_map`` mapping each
                ``(address, credentials)`` pair to the port number bound.

        """
        # 1. Start logging and monitoring
        self._start_logging()
        # TODO: Start the monitoring bus

        # 2. Instantiate gRPC objects
        self._instantiate_grpc()

        # Call the start method of the gRPC services, so they can start up any
        # background threads they need
        self._start_services()

        # 3. Start the gRPC server
        if self._grpc_server is not None:
            self._grpc_server.start()
            if on_server_start_cb:
                on_server_start_cb()
            if port_assigned_callback:
                port_assigned_callback(port_map=self._port_map)

        # Add the stop handler and start the event loop
        self._main_loop.add_signal_handler(signal.SIGTERM, self.stop)
        self._main_loop.run_forever()

    def stop(self):
        """Stops the BuildGrid server.

        Cancels the logging task, stops the gRPC server and the Bots service
        when they exist, then stops the event loop. Safe to call when the
        server was never started or no Bots instance was registered.
        """
        # TODO: Stop the monitoring bus tasks
        if self._logging_task is not None:
            self._logging_task.cancel()

        # Stop the gRPC server to prevent new incoming requests
        if self._grpc_server is not None:
            self._grpc_server.stop(None)

        # Stop the gRPC services, to cleanly shut down background threads.
        # The Bots service only exists if a Bots instance was registered.
        if self._bots_service is not None:
            self._bots_service.stop()

        self._main_loop.stop()

    def add_port(self, address, credentials):
        """Adds a port to the server.

        Must be called before the server starts. If a credentials object exists,
        it will make a secure port; ``None`` makes an insecure port.

        Args:
            address (str): The address with port number.
            credentials (:obj:`grpc.ServerCredentials` or None): Server
                credentials object, or ``None`` for an insecure port.

        """
        self._ports.append((address, credentials))

    def add_execution_instance(self, instance, instance_name):
        """Registers an :obj:`ExecutionInstance` to be set up on start.

        Args:
            instance (:obj:`ExecutionInstance`): Instance to add.
            instance_name (str): Instance name.
        """
        self._execution_instances[instance_name] = instance

    def _add_execution_instance(self, instance, instance_name):
        # Not implemented for the RabbitMQ server yet: no service is attached.
        pass

    def add_bots_instance(self, instance, instance_name):
        """Registers a :obj:`BotsInstance` to be set up on start.

        The Bots service is created when the first instance is attached.

        Args:
            instance (:obj:`BotsInstance`): Instance to add.
            instance_name (str): Instance name.
        """
        self._bots_instances[instance_name] = instance

    def _add_bots_instance(self, instance, instance_name):
        """Attach a Bots instance, creating the Bots service on first use."""
        if self._bots_service is None:
            self._bots_service = BotsService(self._grpc_server, self._enable_metrics)
        self._bots_service.add_instance(instance_name, instance)

    def add_operations_instance(self, instance, instance_name):
        """Registers an :obj:`OperationsInstance` to be set up on start.

        Args:
            instance (:obj:`OperationsInstance`): Instance to add.
            instance_name (str): Instance name.
        """
        self._operations_instances[instance_name] = instance

    def _add_operations_instance(self, instance, instance_name):
        # Not implemented for the RabbitMQ server yet: no service is attached.
        pass

    def add_action_cache_instance(self, instance, instance_name):
        """Registers a :obj:`ReferenceCache` to be set up on start.

        Args:
            instance (:obj:`ReferenceCache`): Instance to add.
            instance_name (str): Instance name.
        """
        self._action_cache_instances[instance_name] = instance

    def _add_action_cache_instance(self, instance, instance_name):
        # Not implemented for the RabbitMQ server yet: no service is attached.
        pass

    def add_cas_instance(self, instance, instance_name):
        """Registers a :obj:`ContentAddressableStorageInstance` to be set up on start.

        Args:
            instance (:obj:`ContentAddressableStorageInstance`): Instance to add.
            instance_name (str): Instance name.
        """
        self._cas_instances[instance_name] = instance

    def _add_cas_instance(self, instance, instance_name):
        # Not implemented for the RabbitMQ server yet: no service is attached.
        pass

    def add_bytestream_instance(self, instance, instance_name):
        """Registers a :obj:`ByteStreamInstance` to be set up on start.

        Args:
            instance (:obj:`ByteStreamInstance`): Instance to add.
            instance_name (str): Instance name.
        """
        self._bytestream_instances[instance_name] = instance

    def _add_bytestream_instance(self, instance, instance_name):
        # Not implemented for the RabbitMQ server yet: no service is attached.
        pass

    def add_logstream_instance(self, instance, instance_name):
        """Registers a :obj:`LogStreamInstance` to be set up on start.

        Args:
            instance (:obj:`LogStreamInstance`): Instance to add.
            instance_name (str): The name of the instance being added.

        """
        self._logstream_instances[instance_name] = instance

    def _add_logstream_instance(self, instance, instance_name):
        # Not implemented for the RabbitMQ server yet: no service is attached.
        pass

    # --- Public API: Monitoring ---
    @property
    def is_instrumented(self):
        """Not implemented for the RabbitMQ server yet; always returns ``None``."""
        return None