Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/rabbitmq/server.py: 73.55%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

155 statements  

1# Copyright (C) 2021 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15import asyncio 

16from concurrent import futures 

17import logging 

18import logging.handlers 

19import os 

20import signal 

21import sys 

22from typing import Any, Callable, Dict, List, Optional, TYPE_CHECKING, Tuple 

23 

24import grpc 

25import janus 

26 

27from buildgrid._exceptions import PermissionDeniedError 

28from buildgrid.server.rabbitmq.bots.service import BotsService 

29from buildgrid.settings import ( 

30 LOG_RECORD_FORMAT, 

31 MIN_THREAD_POOL_SIZE 

32) 

33 

34if TYPE_CHECKING: 

35 from buildgrid.server.rabbitmq.bots.instance import BotsInstance 

36 

37 

class RMQServer:
    """Creates a BuildGrid server instance that is powered by RabbitMQ.

    The :class:`RMQServer` class binds together all the gRPC services.
    Instances and ports are registered through the ``add_*`` methods and are
    only attached to a gRPC server when :meth:`start` is called.
    """

    def __init__(self, *, max_workers: Optional[int]=None, enable_metrics: bool=False, **kwargs):
        """Initializes a new :class:`RMQServer` instance.

        Args:
            max_workers (int, optional): Size of the gRPC thread pool. When
                ``None``, a default derived from the CPU count is used. Values
                below ``MIN_THREAD_POOL_SIZE`` are bumped up to that minimum.
            enable_metrics (bool): Forwarded to the services created by this
                server.
            **kwargs: Accepted for call-site compatibility; currently unused.
        """
        self._logger = logging.getLogger(__name__)

        self._main_loop = asyncio.get_event_loop()

        # Log records are pushed onto this queue from any thread (sync side)
        # and drained by a coroutine on the main loop (async side).
        self._logging_queue: janus.Queue = janus.Queue(loop=self._main_loop)
        self._logging_handler = logging.handlers.QueueHandler(self._logging_queue.sync_q)  # type: ignore
        self._logging_formatter = logging.Formatter(fmt=LOG_RECORD_FORMAT)
        self._logging_task: Optional[asyncio.Future] = None

        self._bots_service = None

        self._action_cache_instances: Dict[str, Any] = {}
        self._bots_instances: Dict[str, "BotsInstance"] = {}
        self._execution_instances: Dict[str, Any] = {}
        self._operations_instances: Dict[str, Any] = {}
        self._cas_instances: Dict[str, Any] = {}
        self._bytestream_instances: Dict[str, Any] = {}
        self._logstream_instances: Dict[str, Any] = {}

        self._ports: List[Tuple[str, grpc.ServerCredentials]] = []
        self._port_map: Dict[Tuple[str, grpc.ServerCredentials], int] = {}

        self._enable_metrics = enable_metrics

        # Setup the main logging handler:
        self._setup_logging()

        # Always assign `_max_workers`, including when the caller passes a
        # valid explicit value (previously that case left it unset).
        self._max_workers = self._resolve_max_workers(max_workers, MIN_THREAD_POOL_SIZE)

        # `thread_name_prefix` needs Python >= 3.6, which this module already
        # requires (f-strings, variable annotations), so no fallback is needed.
        # pylint: disable=consider-using-with
        self._grpc_executor = futures.ThreadPoolExecutor(
            self._max_workers, thread_name_prefix="gRPC_Executor")

        self._grpc_server: Optional[grpc.Server] = None

    def _resolve_max_workers(self, max_workers: Optional[int], minimum: int) -> int:
        """Return the effective thread-pool size for a requested ``max_workers``.

        Args:
            max_workers (int, optional): Requested pool size, or ``None`` to
                use the default.
            minimum (int): Smallest pool size that is allowed.

        Returns:
            int: ``max_workers`` when it is at least ``minimum``; ``minimum``
            when it is smaller (a warning is logged); otherwise the larger of
            ``minimum`` and five times the CPU count.
        """
        if max_workers is None:
            # Use max_workers default from Python 3.4+
            return max(minimum, (os.cpu_count() or 1) * 5)

        if max_workers < minimum:
            self._logger.warning(f"Specified thread-limit=[{max_workers}] is too small, "
                                 f"bumping it to [{minimum}]")
            # Enforce a minimum for max_workers
            return minimum

        return max_workers

    def _setup_logging(self) -> None:
        """Route everything logged through the root logger to the logging queue.

        Filters on the root logger are moved onto the queue handler, every
        existing root handler is removed, and the queue handler becomes the
        only handler on the root logger.
        """
        root_logger = logging.getLogger()

        # Iterate over copies: the underlying lists are mutated in the loops.
        for log_filter in root_logger.filters[:]:
            self._logging_handler.addFilter(log_filter)
            root_logger.removeFilter(log_filter)

        for log_handler in root_logger.handlers[:]:
            root_logger.removeHandler(log_handler)
        root_logger.addHandler(self._logging_handler)

    async def _logging_worker(self) -> None:
        """Worker to read messages off the logging queue and write them to stdout.

        Runs until it is cancelled. Any other exception raised while handling
        a record is logged and the loop continues.
        """
        async def _handle_next_record() -> None:
            log_record = await self._logging_queue.async_q.get()
            record = self._logging_formatter.format(log_record)

            # TODO: Investigate if async write would be worth here.
            sys.stdout.write(f'{record}\n')
            sys.stdout.flush()

            # TODO: publish log records to a monitoring bus

        while True:
            try:
                await _handle_next_record()

            except asyncio.CancelledError:
                break
            except Exception:
                # The worker shouldn't exit on exceptions, but log at a severe enough level
                # that it doesn't get lost in logs
                self._logger.exception("Exception in logging worker")

    def _start_logging(self) -> None:
        """Start the logging coroutine."""
        self._logging_task = asyncio.ensure_future(
            self._logging_worker(), loop=self._main_loop)

    def _instantiate_grpc(self) -> None:
        """Instantiate the gRPC objects.

        This creates the gRPC server, and causes the instances attached to
        this server to instantiate any gRPC channels they need. This also
        sets up the services which route to those instances, and binds the
        ports registered through :meth:`add_port`.

        Raises:
            PermissionDeniedError: If a requested port could not be bound.
        """
        # NOTE: maximum_concurrent_rpcs is set equal to max_workers to avoid
        # deadlocking remote execution when multiple services being run in
        # one process. eg. A server running CAS, Execution, and Bots services
        # can get into a state where all the executor threads are handling
        # long-lived execution connections. Allowing more concurrent RPCs
        # will queue CAS or Bots service requests until they timeout in this
        # scenario, rather than failing fast.
        self._grpc_server = grpc.server(
            self._grpc_executor,
            maximum_concurrent_rpcs=self._max_workers
        )

        # TODO: Capabilities service

        # (registered instances, method attaching one instance to its service),
        # processed in this order.
        registrations = (
            (self._execution_instances, self._add_execution_instance),
            (self._operations_instances, self._add_operations_instance),
            (self._bots_instances, self._add_bots_instance),
            (self._cas_instances, self._add_cas_instance),
            (self._bytestream_instances, self._add_bytestream_instance),
            (self._logstream_instances, self._add_logstream_instance),
            (self._action_cache_instances, self._add_action_cache_instance),
        )
        for instances, register in registrations:
            for instance_name, instance in instances.items():
                instance.setup_grpc()
                register(instance, instance_name)

        # Add the requested ports to the gRPC server
        for address, credentials in self._ports:
            if credentials is not None:
                self._logger.info(f"Adding secure connection on: [{address}]")
                port_number = self._grpc_server.add_secure_port(address, credentials)

            else:
                self._logger.info(f"Adding insecure connection on [{address}]")
                port_number = self._grpc_server.add_insecure_port(address)
            self._port_map[(address, credentials)] = port_number

            # A falsy port number means the port could not be bound.
            if not port_number:
                raise PermissionDeniedError("Unable to configure socket")

        # TODO: Server reflection

    def _start_services(self):
        """Start the gRPC services that have been created, if any."""
        if self._bots_service is not None:
            self._bots_service.start()

    # --- Public API ---
    def start(
        self,
        *,
        on_server_start_cb: Optional[Callable]=None,
        port_assigned_callback: Optional[Callable]=None
    ) -> None:
        """Starts the BuildGrid server.

        BuildGrid server startup consists of 3 stages,

        1. Starting logging and monitoring

        This step starts up the logging coroutine, the periodic status metrics
        coroutine, and the monitoring bus' publishing subprocess. Since this
        step involves forking, anything not fork-safe needs to be done *after*
        this step.

        2. Instantiate gRPC

        This step instantiates the gRPC server. It is also responsible for
        creating the various service objects and connecting them to the server
        and the instances. This step starts up any background threads needed
        for the gRPC servicers, and tells the instances attached to those
        servicers to instantiate their gRPC objects (such as channels to
        remote servers).

        After this step, gRPC core is running and it's no longer safe to fork
        the process.

        3. Start the gRPC server

        The final step is starting up the gRPC server. The callback passed in
        via ``on_server_start_cb`` is executed in this step once the server
        has started. After this point BuildGrid is ready to serve requests.

        The final thing done by this method is adding a ``SIGTERM`` handler
        which calls the ``Server.stop`` method to the event loop, and then
        that loop is started up using ``run_forever()``.

        Args:
            on_server_start_cb (Callable): Callback function to execute once
                the gRPC server has started up.
            port_assigned_callback (Callable): Callback function to execute
                once the gRPC server has started up. The mapping of addresses
                to ports is passed to this callback as the ``port_map``
                keyword argument.

        """
        # 1. Start logging and monitoring
        self._start_logging()
        # TODO: Start the monitoring bus

        # 2. Instantiate gRPC objects
        self._instantiate_grpc()

        # Call the start method of the gRPC services, so they can start up any
        # background threads they need
        self._start_services()

        # 3. Start the gRPC server
        if self._grpc_server is not None:
            self._grpc_server.start()
            if on_server_start_cb:
                on_server_start_cb()
            if port_assigned_callback:
                port_assigned_callback(port_map=self._port_map)

        # Add the stop handler and start the event loop
        self._main_loop.add_signal_handler(signal.SIGTERM, self.stop)
        self._main_loop.run_forever()

    def stop(self):
        """Stops the BuildGrid server.

        Safe to call when parts of the server were never created (for
        example, no bots instance was registered, or :meth:`start` was never
        reached): components that are still ``None`` are skipped.
        """
        # TODO: Stop the monitoring bus tasks
        if self._logging_task is not None:
            self._logging_task.cancel()

        # Stop the gRPC server to prevent new incoming requests
        if self._grpc_server is not None:
            self._grpc_server.stop(None)

        # Stop the gRPC services, to cleanly shut down background threads
        if self._bots_service is not None:
            self._bots_service.stop()

        self._main_loop.stop()

    def add_port(self, address, credentials):
        """Adds a port to the server.

        Must be called before the server starts. If a credentials object exists,
        it will make a secure port; with ``None`` an insecure port is added.

        Args:
            address (str): The address with port number.
            credentials (:obj:`grpc.ServerCredentials`): Credentials object,
                or ``None`` for an insecure port.

        """
        self._ports.append((address, credentials))

    def add_execution_instance(self, instance, instance_name):
        """Adds an :obj:`ExecutionInstance` to the service.

        The instance is only recorded here; it is attached to a service when
        the server starts.

        Args:
            instance (:obj:`ExecutionInstance`): Instance to add.
            instance_name (str): Instance name.
        """
        self._execution_instances[instance_name] = instance

    def _add_execution_instance(self, instance, instance_name):
        """Attach an execution instance to its service (not implemented yet)."""

    def add_bots_instance(self, instance, instance_name):
        """Adds a :obj:`BotsInstance` to the service.

        The instance is only recorded here; the service is created, if
        needed, when the server starts.

        Args:
            instance (:obj:`BotsInstance`): Instance to add.
            instance_name (str): Instance name.
        """
        self._bots_instances[instance_name] = instance

    def _add_bots_instance(self, instance, instance_name):
        """Attach a bots instance to the bots service, creating it on first use."""
        if self._bots_service is None:
            self._bots_service = BotsService(self._grpc_server, self._enable_metrics)
        self._bots_service.add_instance(instance_name, instance)

    def add_operations_instance(self, instance, instance_name):
        """Adds an :obj:`OperationsInstance` to the service.

        The instance is only recorded here; it is attached to a service when
        the server starts.

        Args:
            instance (:obj:`OperationsInstance`): Instance to add.
            instance_name (str): Instance name.
        """
        self._operations_instances[instance_name] = instance

    def _add_operations_instance(self, instance, instance_name):
        """Attach an operations instance to its service (not implemented yet)."""

    def add_action_cache_instance(self, instance, instance_name):
        """Adds a :obj:`ReferenceCache` to the service.

        The instance is only recorded here; it is attached to a service when
        the server starts.

        Args:
            instance (:obj:`ReferenceCache`): Instance to add.
            instance_name (str): Instance name.
        """
        self._action_cache_instances[instance_name] = instance

    def _add_action_cache_instance(self, instance, instance_name):
        """Attach an action-cache instance to its service (not implemented yet)."""

    def add_cas_instance(self, instance, instance_name):
        """Adds a :obj:`ContentAddressableStorageInstance` to the service.

        The instance is only recorded here; it is attached to a service when
        the server starts.

        Args:
            instance (:obj:`ContentAddressableStorageInstance`): Instance to add.
            instance_name (str): Instance name.
        """
        self._cas_instances[instance_name] = instance

    def _add_cas_instance(self, instance, instance_name):
        """Attach a CAS instance to its service (not implemented yet)."""

    def add_bytestream_instance(self, instance, instance_name):
        """Adds a :obj:`ByteStreamInstance` to the service.

        The instance is only recorded here; it is attached to a service when
        the server starts.

        Args:
            instance (:obj:`ByteStreamInstance`): Instance to add.
            instance_name (str): Instance name.
        """
        self._bytestream_instances[instance_name] = instance

    def _add_bytestream_instance(self, instance, instance_name):
        """Attach a ByteStream instance to its service (not implemented yet)."""

    def add_logstream_instance(self, instance, instance_name):
        """Adds a :obj:`LogStreamInstance` to the service.

        The instance is only recorded here; it is attached to a service when
        the server starts.

        Args:
            instance (:obj:`LogStreamInstance`): Instance to add.
            instance_name (str): The name of the instance being added.

        """
        self._logstream_instances[instance_name] = instance

    def _add_logstream_instance(self, instance, instance_name):
        """Attach a LogStream instance to its service (not implemented yet)."""

    # --- Public API: Monitoring ---
    @property
    def is_instrumented(self):
        """Placeholder property; currently always evaluates to ``None``."""
        return None