Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/server.py: 19.21%

406 statements  

coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#  <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import asyncio
from concurrent import futures
from datetime import datetime, timedelta
import logging
import logging.handlers
import os
import signal
import sys
import time
import traceback
from typing import Callable, Dict, Optional

import grpc
from grpc_reflection.v1alpha import reflection
import janus

from buildgrid._enums import (
    BotStatus, LeaseState, LogRecordLevel, MetricCategories, OperationStage)
from buildgrid._exceptions import PermissionDeniedError
from buildgrid._protos.buildgrid.v2 import monitoring_pb2
from buildgrid.server.actioncache.service import ActionCacheService
from buildgrid.server._authentication import AuthMetadataMethod, AuthMetadataAlgorithm
from buildgrid.server._authentication import AuthContext, AuthMetadataServerInterceptor
from buildgrid.server.bots.service import BotsService
from buildgrid.server.build_events.service import PublishBuildEventService, QueryBuildEventsService
from buildgrid.server.capabilities.instance import CapabilitiesInstance
from buildgrid.server.capabilities.service import CapabilitiesService
from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
from buildgrid.server.execution.service import ExecutionService
from buildgrid.server.cas.logstream.service import LogStreamService
from buildgrid.server.metrics_utils import create_gauge_record, create_timer_record
from buildgrid.server.metrics_names import (
    AVERAGE_QUEUE_TIME_METRIC_NAME,
    CLIENT_COUNT_METRIC_NAME,
    BOT_COUNT_METRIC_NAME,
    LEASE_COUNT_METRIC_NAME,
    JOB_COUNT_METRIC_NAME
)
from buildgrid.server.monitoring import (
    get_monitoring_bus,
    MonitoringOutputType,
    MonitoringOutputFormat,
    setup_monitoring_bus
)
from buildgrid.server.operations.service import OperationsService
from buildgrid.server.referencestorage.service import ReferenceStorageService
from buildgrid.server._resources import ExecContext
from buildgrid.settings import (
    LOG_RECORD_FORMAT, MIN_THREAD_POOL_SIZE, MONITORING_PERIOD, DEFAULT_JWKS_REFETCH_INTERVAL_MINUTES
)
from buildgrid.utils import read_file

# Need protos here, to enable server reflection.
from buildgrid._protos.google.longrunning import operations_pb2
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2


def load_tls_server_credentials(server_key=None, server_cert=None, client_certs=None):
    """Looks up and loads TLS server gRPC credentials.

    All private and public keys are expected to be PEM-encoded.

    Args:
        server_key(str): private server key file path.
        server_cert(str): public server certificate file path.
        client_certs(str): public client certificates file path.

    Returns:
        :obj:`ServerCredentials`: The credentials for use in a
        TLS-encrypted gRPC server channel.
    """
    if not server_key or not os.path.exists(server_key):
        return None

    if not server_cert or not os.path.exists(server_cert):
        return None

    server_key_pem = read_file(server_key)
    server_cert_pem = read_file(server_cert)
    if client_certs and os.path.exists(client_certs):
        client_certs_pem = read_file(client_certs)
    else:
        client_certs_pem = None
        client_certs = None

    credentials = grpc.ssl_server_credentials([(server_key_pem, server_cert_pem)],
                                              root_certificates=client_certs_pem,
                                              require_client_auth=bool(client_certs))

    credentials.server_key = server_key
    credentials.server_cert = server_cert
    credentials.client_certs = client_certs

    return credentials

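# Example usage (a minimal sketch; the PEM file paths below are hypothetical):
#
#     credentials = load_tls_server_credentials(
#         server_key='/etc/buildgrid/server-key.pem',
#         server_cert='/etc/buildgrid/server-cert.pem',
#         client_certs='/etc/buildgrid/client-certs.pem')  # optional, enables mutual TLS
#     if credentials is None:
#         ...  # a key or certificate was missing on disk; fall back to an insecure port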

class Server:
    """Creates a BuildGrid server instance.

    The :class:`Server` class binds together all the gRPC services.
    """

    def __init__(self,
                 max_workers=None, monitor=False,
                 mon_endpoint_type=MonitoringOutputType.STDOUT,
                 mon_endpoint_location=None,
                 mon_serialisation_format=MonitoringOutputFormat.JSON,
                 mon_metric_prefix="",
                 auth_method=AuthMetadataMethod.NONE,
                 auth_secret=None,
                 auth_jwks_url=None,
                 auth_audience=None,
                 auth_jwks_fetch_minutes=None,
                 auth_algorithm=AuthMetadataAlgorithm.UNSPECIFIED,
                 enable_server_reflection=True):
        """Initializes a new :class:`Server` instance.

        Args:
            max_workers (int, optional): Maximum number of worker threads
                in the gRPC thread pool.
            monitor (bool, optional): Whether or not to globally activate server
                monitoring. Defaults to ``False``.
            auth_method (AuthMetadataMethod, optional): Authentication method to
                be used for request authorization. Defaults to ``NONE``.
            auth_secret (str, optional): The secret or key to be used for
                authorizing requests using `auth_method`. Defaults to ``None``.
            auth_jwks_url (str, optional): The URL to fetch the JWKS from.
                Either a secret or this field must be specified. Defaults to ``None``.
            auth_audience (str): The audience to validate JWT tokens against.
                The tokens must have an audience field.
            auth_jwks_fetch_minutes (int): The number of minutes to wait before
                refetching the JWKS. Defaults to 60 minutes.
            auth_algorithm (AuthMetadataAlgorithm, optional): The cryptographic
                algorithm to be used in combination with `auth_secret` for
                authorizing requests using `auth_method`. Defaults to
                ``UNSPECIFIED``.
        """

        self.__logger = logging.getLogger(__name__)

        self.__main_loop = asyncio.get_event_loop()

        self.__logging_queue = None  # type: Optional[janus.Queue]
        self.__logging_handler = None  # type: Optional[logging.handlers.QueueHandler]
        self.__logging_formatter = logging.Formatter(fmt=LOG_RECORD_FORMAT)
        self.__print_log_records = True

        self.__build_metadata_queues = None

        self.__state_monitoring_task = None
        self.__build_monitoring_tasks = None
        self.__logging_task = None

        self._capabilities_service = None
        self._execution_service = None
        self._bots_service = None
        self._operations_service = None
        self._reference_storage_service = None
        self._action_cache_service = None
        self._cas_service = None
        self._bytestream_service = None
        self._logstream_service = None
        self._build_events_service = None
        self._build_events_storage_backend = None
        self._query_build_events_service = None

        self._schedulers = {}

        self._instances = set()
        self._execution_instances = {}
        self._operations_instances = {}
        self._bots_instances = {}
        self._cas_instances = {}
        self._bytestream_instances = {}
        self._logstream_instances = {}
        self._action_cache_instances = {}
        self._reference_storage_instances = {}

        self._ports = []
        self._port_map = {}

        self._server_reflection = enable_server_reflection
        self._reflection_services = [reflection.SERVICE_NAME]

        self._is_instrumented = monitor

        if self._is_instrumented:
            monitoring_bus = setup_monitoring_bus(endpoint_type=mon_endpoint_type,
                                                  endpoint_location=mon_endpoint_location,
                                                  metric_prefix=mon_metric_prefix,
                                                  serialisation_format=mon_serialisation_format)

            self.__build_monitoring_tasks = []

        if self._is_instrumented and monitoring_bus.prints_records:
            self.__print_log_records = False

        if max_workers is None:
            # Use the max_workers default from Python 3.4+
            max_workers = max(MIN_THREAD_POOL_SIZE, (os.cpu_count() or 1) * 5)

        elif max_workers < MIN_THREAD_POOL_SIZE:
            self.__logger.warning(f"Specified thread-limit=[{max_workers}] is too small, "
                                  f"bumping it to [{MIN_THREAD_POOL_SIZE}]")
            # Enforce a minimum for max_workers
            max_workers = MIN_THREAD_POOL_SIZE

        self._max_grpc_workers = max_workers

        ExecContext.init(max_workers)

        self.__grpc_auth_interceptor = None

        if auth_method != AuthMetadataMethod.NONE:
            if auth_jwks_fetch_minutes is None:
                auth_jwks_fetch_minutes = DEFAULT_JWKS_REFETCH_INTERVAL_MINUTES
            self.__grpc_auth_interceptor = AuthMetadataServerInterceptor(
                method=auth_method, secret=auth_secret, algorithm=auth_algorithm,
                jwks_url=auth_jwks_url, audience=auth_audience, jwks_fetch_minutes=auth_jwks_fetch_minutes)

        AuthContext.interceptor = self.__grpc_auth_interceptor

        try:
            # pylint: disable=consider-using-with
            self.__grpc_executor = futures.ThreadPoolExecutor(
                max_workers, thread_name_prefix="gRPC_Executor")
        except TypeError:
            # We need Python >= 3.6 to support `thread_name_prefix`, so fall back
            # to ugly thread names if that didn't work

            # pylint: disable=consider-using-with
            self.__grpc_executor = futures.ThreadPoolExecutor(max_workers)
        self.__grpc_server = None

        self.__logger.debug(f"Setting up gRPC server with thread-limit=[{max_workers}]")
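    # A minimal construction sketch (the worker count and metric prefix below
    # are illustrative, not recommendations):
    #
    #     server = Server(max_workers=10,
    #                     monitor=True,
    #                     mon_metric_prefix='buildgrid')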

    # --- Public API ---

    def _start_logging(self) -> None:
        """Start the logging coroutine."""
        self.__logging_task = asyncio.ensure_future(
            self._logging_worker(), loop=self.__main_loop)

    def _start_monitoring(self) -> None:
        """Start the monitoring functionality.

        This starts up the monitoring bus subprocess, and also starts the
        periodic status monitoring coroutine.

        """
        monitoring_bus = get_monitoring_bus()
        monitoring_bus.start()

        self.__state_monitoring_task = asyncio.ensure_future(
            self._state_monitoring_worker(period=MONITORING_PERIOD),
            loop=self.__main_loop
        )

    def _instantiate_grpc(self) -> None:
        """Instantiate the gRPC objects.

        This creates the gRPC server, and causes the instances attached to
        this server to instantiate any gRPC channels they need. This also
        sets up the services which route to those instances, and sets up
        gRPC reflection.

        """
        self.__grpc_server = grpc.server(
            self.__grpc_executor,
            maximum_concurrent_rpcs=self._max_grpc_workers
        )

        # We always want a capabilities service
        self._capabilities_service = CapabilitiesService(self.__grpc_server)

        for instance_name, instance in self._execution_instances.items():
            instance.setup_grpc()
            self._add_execution_instance(instance, instance_name)

        for instance_name, instance in self._operations_instances.items():
            instance.setup_grpc()
            self._add_operations_instance(instance, instance_name)

        for instance_name, instance in self._bots_instances.items():
            instance.setup_grpc()
            self._add_bots_instance(instance, instance_name)

        for instance_name, instance in self._cas_instances.items():
            instance.setup_grpc()
            self._add_cas_instance(instance, instance_name)

        for instance_name, instance in self._bytestream_instances.items():
            instance.setup_grpc()
            self._add_bytestream_instance(instance, instance_name)

        for instance_name, instance in self._logstream_instances.items():
            instance.setup_grpc()
            self._add_logstream_instance(instance, instance_name)

        for instance_name, instance in self._action_cache_instances.items():
            instance.setup_grpc()
            self._add_action_cache_instance(instance, instance_name)

        for instance_name, instance in self._reference_storage_instances.items():
            instance.setup_grpc()
            self._add_reference_storage_instance(instance, instance_name)

        # The Build Events Services have no support for instance names, so this
        # is a bit of a special case where the storage backend itself is the
        # trigger for creating the gRPC services.
        if self._build_events_storage_backend is not None:
            self._add_build_events_services()

        # Add the requested ports to the gRPC server
        for address, credentials in self._ports:
            if credentials is not None:
                self.__logger.info(f"Adding secure connection on: [{address}]")
                server_key = credentials.get('tls-server-key')
                server_cert = credentials.get('tls-server-cert')
                client_certs = credentials.get('tls-client-certs')
                credentials = load_tls_server_credentials(
                    server_cert=server_cert,
                    server_key=server_key,
                    client_certs=client_certs
                )
                port_number = self.__grpc_server.add_secure_port(address, credentials)

            else:
                self.__logger.info(f"Adding insecure connection on [{address}]")
                port_number = self.__grpc_server.add_insecure_port(address)
            self._port_map[address] = port_number

            if not port_number:
                raise PermissionDeniedError("Unable to configure socket")

        self.__enable_server_reflection()

    def start(
        self,
        *,
        on_server_start_cb: Optional[Callable] = None,
        port_assigned_callback: Optional[Callable] = None
    ) -> None:
        """Starts the BuildGrid server.

        BuildGrid server startup consists of 3 stages:

        1. Starting logging and monitoring

        This step starts up the logging coroutine, the periodic status metrics
        coroutine, and the monitoring bus' publishing subprocess. Since this
        step involves forking, anything not fork-safe needs to be done *after*
        this step.

        2. Instantiate gRPC

        This step instantiates the gRPC server, and tells all the instances
        which have been attached to the server to instantiate their gRPC
        objects. It is also responsible for creating the various service
        objects and connecting them to the server and the instances.

        After this step, gRPC core is running and it's no longer safe to fork
        the process.

        3. Start the gRPC server

        The final step is starting up the gRPC server. The callback passed in
        via ``on_server_start_cb`` is executed in this step once the server
        has started. After this point BuildGrid is ready to serve requests.

        The final thing done by this method is registering a ``SIGTERM``
        handler on the event loop which calls ``Server.stop``, and then
        that loop is started up using ``run_forever()``.

        Args:
            on_server_start_cb (Callable): Callback function to execute once
                the gRPC server has started up.
            port_assigned_callback (Callable): Callback function to execute
                once the gRPC server has started up. The mapping of addresses
                to ports is passed to this callback.

        """
        # 1. Start logging and monitoring
        self._start_logging()
        if self._is_instrumented:
            self._start_monitoring()

        # 2. Instantiate gRPC objects
        self._instantiate_grpc()

        # 3. Start the gRPC server
        self.__grpc_server.start()
        if on_server_start_cb:
            on_server_start_cb()
        if port_assigned_callback:
            port_assigned_callback(port_map=self._port_map)

        # Add the stop handler and run the event loop
        self.__main_loop.add_signal_handler(signal.SIGTERM, self.stop)
        self.__main_loop.run_forever()
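    # Lifecycle sketch (a hypothetical embedding; assumes `server` was built
    # as above and at least one port and instance were attached beforehand):
    #
    #     server.add_port('[::]:50051', None)  # insecure port
    #
    #     def _on_ports(port_map):
    #         print(f"Bound addresses: {port_map}")
    #
    #     # Blocks in run_forever() until SIGTERM triggers Server.stop()
    #     server.start(port_assigned_callback=_on_ports)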

    def stop(self):
        """Stops the BuildGrid server."""
        if self._is_instrumented:
            if self.__state_monitoring_task is not None:
                self.__state_monitoring_task.cancel()

            monitoring_bus = get_monitoring_bus()
            monitoring_bus.stop()

        if self.__logging_task is not None:
            self.__logging_task.cancel()

        self.__main_loop.stop()

        if self.__grpc_server is not None:
            self.__grpc_server.stop(None)

    def add_port(self, address, credentials):
        """Adds a port to the server.

        Must be called before the server starts. If a credentials object exists,
        it will make a secure port.

        The port is only bound once the server starts; if binding fails at
        that point, a ``PermissionDeniedError`` is raised.

        Args:
            address (str): The address with port number.
            credentials (:obj:`grpc.ChannelCredentials`): Credentials object.

        """
        self._ports.append((address, credentials))
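    # Note that `_instantiate_grpc` reads the TLS paths from this credentials
    # object via `.get(...)`, so in practice a mapping with the keys below is
    # what gets passed in. A sketch with hypothetical paths:
    #
    #     server.add_port('[::]:50052', {
    #         'tls-server-key': '/etc/buildgrid/server-key.pem',
    #         'tls-server-cert': '/etc/buildgrid/server-cert.pem',
    #         'tls-client-certs': '/etc/buildgrid/client-certs.pem',
    #     })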

    def add_execution_instance(self, instance, instance_name):
        """Adds an :obj:`ExecutionInstance` to the service.

        If no service exists, it creates one.

        Args:
            instance (:obj:`ExecutionInstance`): Instance to add.
            instance_name (str): Instance name.
        """
        self._execution_instances[instance_name] = instance
        self._instances.add(instance_name)

    def _add_execution_instance(self, instance, instance_name):
        if self._execution_service is None:
            self._execution_service = ExecutionService(
                self.__grpc_server, monitor=self._is_instrumented)

        self._execution_service.add_instance(instance_name, instance)
        self._add_capabilities_instance(instance_name, execution_instance=instance)

        self._schedulers.setdefault(instance_name, set()).add(instance.scheduler)

        if self._is_instrumented:
            instance.scheduler.activate_monitoring()
        self._reflection_services.append(remote_execution_pb2.DESCRIPTOR.services_by_name['Execution'].full_name)

    def add_bots_interface(self, instance, instance_name):
        """Adds a :obj:`BotsInterface` to the service.

        If no service exists, it creates one.

        Args:
            instance (:obj:`BotsInterface`): Instance to add.
            instance_name (str): Instance name.
        """
        self._bots_instances[instance_name] = instance
        self._instances.add(instance_name)

    def _add_bots_instance(self, instance, instance_name):
        if self._bots_service is None:
            self._bots_service = BotsService(
                self.__grpc_server, monitor=self._is_instrumented)

        self._bots_service.add_instance(instance_name, instance)

        self._schedulers.setdefault(instance_name, set()).add(instance.scheduler)

        if self._is_instrumented:
            instance.scheduler.activate_monitoring()

    def add_operations_instance(self, instance, instance_name):
        """Adds an :obj:`OperationsInstance` to the service.

        If no service exists, it creates one.

        Args:
            instance (:obj:`OperationsInstance`): Instance to add.
            instance_name (str): Instance name.
        """
        self._operations_instances[instance_name] = instance

    def _add_operations_instance(self, instance, instance_name):
        if self._operations_service is None:
            self._operations_service = OperationsService(self.__grpc_server)

        self._operations_service.add_instance(instance_name, instance)
        self._reflection_services.append(operations_pb2.DESCRIPTOR.services_by_name['Operations'].full_name)

    def add_reference_storage_instance(self, instance, instance_name):
        """Adds a :obj:`ReferenceCache` to the service.

        If no service exists, it creates one.

        Args:
            instance (:obj:`ReferenceCache`): Instance to add.
            instance_name (str): Instance name.
        """
        self._reference_storage_instances[instance_name] = instance

    def _add_reference_storage_instance(self, instance, instance_name):
        if self._reference_storage_service is None:
            self._reference_storage_service = ReferenceStorageService(self.__grpc_server)

        self._reference_storage_service.add_instance(instance_name, instance)

    def add_action_cache_instance(self, instance, instance_name):
        """Adds an action cache instance to the service.

        If no service exists, it creates one.

        Args:
            instance: Action cache instance to add.
            instance_name (str): Instance name.
        """
        self._action_cache_instances[instance_name] = instance

    def _add_action_cache_instance(self, instance, instance_name):
        if self._action_cache_service is None:
            self._action_cache_service = ActionCacheService(self.__grpc_server)

        self._action_cache_service.add_instance(instance_name, instance)
        self._add_capabilities_instance(instance_name, action_cache_instance=instance)
        self._reflection_services.append(remote_execution_pb2.DESCRIPTOR.services_by_name['ActionCache'].full_name)

    def add_cas_instance(self, instance, instance_name):
        """Adds a :obj:`ContentAddressableStorageInstance` to the service.

        If no service exists, it creates one.

        Args:
            instance (:obj:`ContentAddressableStorageInstance`): Instance to add.
            instance_name (str): Instance name.
        """
        self._cas_instances[instance_name] = instance

    def _add_cas_instance(self, instance, instance_name):
        if self._cas_service is None:
            self._cas_service = ContentAddressableStorageService(self.__grpc_server)

        self._cas_service.add_instance(instance_name, instance)
        self._add_capabilities_instance(instance_name, cas_instance=instance)
        self._reflection_services.append(
            remote_execution_pb2.DESCRIPTOR.services_by_name['ContentAddressableStorage'].full_name)

    def add_bytestream_instance(self, instance, instance_name):
        """Adds a :obj:`ByteStreamInstance` to the service.

        If no service exists, it creates one.

        Args:
            instance (:obj:`ByteStreamInstance`): Instance to add.
            instance_name (str): Instance name.
        """
        self._bytestream_instances[instance_name] = instance

    def _add_bytestream_instance(self, instance, instance_name):
        if self._bytestream_service is None:
            self._bytestream_service = ByteStreamService(self.__grpc_server)

        self._bytestream_service.add_instance(instance_name, instance)

    def add_logstream_instance(self, instance, instance_name):
        """Adds a :obj:`LogStreamInstance` to the service.

        If no service exists, it creates one.

        Args:
            instance (:obj:`LogStreamInstance`): Instance to add.
            instance_name (str): The name of the instance being added.

        """
        self._logstream_instances[instance_name] = instance

    def _add_logstream_instance(self, instance, instance_name):
        if self._logstream_service is None:
            self._logstream_service = LogStreamService(self.__grpc_server)

        self._logstream_service.add_instance(instance_name, instance)

    def add_build_events_storage(self, storage_backend):
        """Adds a :obj:`BuildEventStreamStorage` to the server.

        This is used to decide whether to create the Build Events services in
        the server. No instance name is passed in since the Build Events
        protocol has no support for instance names.

        """
        self._build_events_storage_backend = storage_backend

    def _add_build_events_services(self):
        self._build_events_service = PublishBuildEventService(
            self.__grpc_server, self._build_events_storage_backend)
        self._query_build_events_service = QueryBuildEventsService(
            self.__grpc_server, self._build_events_storage_backend)
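    # Attachment sketch (`execution_instance`, `bots_interface` and
    # `cas_instance` stand in for configured instance objects; attachment
    # happens before `start()`, and the gRPC services themselves are only
    # created later, in `_instantiate_grpc`):
    #
    #     server.add_execution_instance(execution_instance, 'main')
    #     server.add_bots_interface(bots_interface, 'main')
    #     server.add_cas_instance(cas_instance, 'main')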

    # --- Public API: Monitoring ---

    @property
    def is_instrumented(self):
        return self._is_instrumented

    # --- Private API ---

    def _add_capabilities_instance(self, instance_name,
                                   cas_instance=None,
                                   action_cache_instance=None,
                                   execution_instance=None):
        """Adds a :obj:`CapabilitiesInstance` to the service.

        Args:
            instance_name (str): Instance name.
            cas_instance (optional): CAS instance to advertise.
            action_cache_instance (optional): Action cache instance to advertise.
            execution_instance (optional): Execution instance to advertise.
        """

        try:
            if cas_instance:
                self._capabilities_service.add_cas_instance(instance_name, cas_instance)
            if action_cache_instance:
                self._capabilities_service.add_action_cache_instance(instance_name, action_cache_instance)
            if execution_instance:
                self._capabilities_service.add_execution_instance(instance_name, execution_instance)

        except KeyError:
            capabilities_instance = CapabilitiesInstance(cas_instance,
                                                         action_cache_instance,
                                                         execution_instance)
            self._capabilities_service.add_instance(instance_name, capabilities_instance)

    async def _logging_worker(self):
        """Publishes log records to the monitoring bus."""
        self.__logging_queue = janus.Queue()
        self.__logging_handler = logging.handlers.QueueHandler(self.__logging_queue.sync_q)

        # Set up the main logging handler:
        root_logger = logging.getLogger()

        for log_filter in root_logger.filters[:]:
            self.__logging_handler.addFilter(log_filter)
            root_logger.removeFilter(log_filter)

        for log_handler in root_logger.handlers[:]:
            root_logger.removeHandler(log_handler)
        root_logger.addHandler(self.__logging_handler)

        async def __logging_worker():
            monitoring_bus = get_monitoring_bus()
            log_record = await self.__logging_queue.async_q.get()

            # Print log records to stdout, if required:
            if self.__print_log_records:
                record = self.__logging_formatter.format(log_record)

                # TODO: Investigate whether an async write would be worthwhile here.
                sys.stdout.write(f'{record}\n')
                sys.stdout.flush()

            # Emit a log record if the server is instrumented:
            if self._is_instrumented:
                log_record_level = LogRecordLevel(int(log_record.levelno / 10))
                log_record_creation_time = datetime.fromtimestamp(log_record.created)
                # logging.LogRecord.extra must be a str to str dict:
                if 'extra' in log_record.__dict__ and log_record.extra:
                    log_record_metadata = log_record.extra
                else:
                    log_record_metadata = None
                record = self._forge_log_record(
                    domain=log_record.name, level=log_record_level, message=log_record.message,
                    creation_time=log_record_creation_time, metadata=log_record_metadata)

                await monitoring_bus.send_record(record)

        while True:
            try:
                await __logging_worker()

            except asyncio.CancelledError:
                break
            except Exception:
                # The thread shouldn't exit on exceptions, but output the exception so that
                # it can be found in the logs.
                #
                # Note, we DO NOT use `self.__logger` here, because we don't want to write
                # anything new to the logging queue in case the Exception isn't some transient
                # issue.
                try:
                    sys.stdout.write("Exception in logging worker\n")
                    sys.stdout.flush()
                    traceback.print_exc()
                except Exception:
                    # There's not a lot we can do at this point really.
                    pass
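    # Note on the level mapping in `_logging_worker` above: standard `logging`
    # levels are multiples of ten, so `int(log_record.levelno / 10)` maps e.g.
    # logging.INFO (20) to LogRecordLevel(2) and logging.ERROR (40) to
    # LogRecordLevel(4).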

    def _forge_log_record(self, *,
                          domain: str,
                          level: LogRecordLevel,
                          message: str,
                          creation_time: datetime,
                          metadata: Optional[Dict[str, str]] = None):
        log_record = monitoring_pb2.LogRecord()

        log_record.creation_timestamp.FromDatetime(creation_time)
        log_record.domain = domain
        log_record.level = level.value
        log_record.message = message
        if metadata is not None:
            log_record.metadata.update(metadata)

        return log_record

    async def _state_monitoring_worker(self, period=1.0):
        """Periodically publishes state metrics to the monitoring bus."""
        async def __state_monitoring_worker():
            monitoring_bus = get_monitoring_bus()
            # Execution metrics
            if self._execution_service:
                # Emit total clients count record:
                _, record = self._query_n_clients()
                await monitoring_bus.send_record(record)

                for instance_name in self._instances:
                    # Emit instance clients count record:
                    _, record = self._query_n_clients_for_instance(instance_name)
                    await monitoring_bus.send_record(record)

            if self._bots_service:
                # Emit total bots count record:
                _, record = self._query_n_bots()
                await monitoring_bus.send_record(record)

                for instance_name in self._instances:
                    # Emit instance bots count record:
                    _, record = self._query_n_bots_for_instance(instance_name)
                    await monitoring_bus.send_record(record)

                # Emits records by bot status:
                for bot_status in BotStatus:
                    # Emit status bots count record:
                    _, record = self._query_n_bots_for_status(bot_status)
                    await monitoring_bus.send_record(record)

            if self._schedulers:
                queue_times = []
                # Emits records by instance:
                for instance_name in self._instances:
                    # Emit instance average queue time record:
                    queue_time, record = self._query_am_queue_time_for_instance(instance_name)
                    await monitoring_bus.send_record(record)
                    if queue_time:
                        queue_times.append(queue_time)

                    scheduler_metrics = self._query_scheduler_metrics_for_instance(instance_name)
                    # This will be skipped if there were no stage changes
                    if scheduler_metrics:
                        for _, record in self._records_from_scheduler_metrics(scheduler_metrics, instance_name):
                            await monitoring_bus.send_record(record)

                # Emit overall average queue time record:
                if queue_times:
                    am_queue_time = sum(queue_times, timedelta()) / len(queue_times)
                else:
                    am_queue_time = timedelta()
                record = create_timer_record(
                    AVERAGE_QUEUE_TIME_METRIC_NAME,
                    am_queue_time)

                await monitoring_bus.send_record(record)

        while True:
            start = time.time()
            try:
                await __state_monitoring_worker()

            except asyncio.CancelledError:
                break
            except Exception:
                # The thread shouldn't exit on exceptions, but log at a severe enough level
                # that it doesn't get lost in logs
                self.__logger.exception("Exception while gathering state metrics")

            end = time.time()
            await asyncio.sleep(period - (end - start))

    # --- Private API: Monitoring ---

    def _query_n_clients(self):
        """Queries the number of clients connected."""
        n_clients = self._execution_service.query_n_clients()
        gauge_record = create_gauge_record(
            CLIENT_COUNT_METRIC_NAME, n_clients)

        return n_clients, gauge_record

    def _query_n_clients_for_instance(self, instance_name):
        """Queries the number of clients connected for a given instance."""
        n_clients = self._execution_service.query_n_clients_for_instance(instance_name)
        gauge_record = create_gauge_record(
            CLIENT_COUNT_METRIC_NAME, n_clients,
            metadata={'instance-name': instance_name or ''})

        return n_clients, gauge_record

    def _query_n_bots(self):
        """Queries the number of bots connected."""
        n_bots = self._bots_service.query_n_bots()
        gauge_record = create_gauge_record(
            BOT_COUNT_METRIC_NAME, n_bots)

        return n_bots, gauge_record

    def _query_n_bots_for_instance(self, instance_name):
        """Queries the number of bots connected for a given instance."""
        n_bots = self._bots_service.query_n_bots_for_instance(instance_name)
        gauge_record = create_gauge_record(
            BOT_COUNT_METRIC_NAME, n_bots,
            metadata={'instance-name': instance_name or ''})

        return n_bots, gauge_record

    def _query_n_bots_for_status(self, bot_status):
        """Queries the number of bots connected for a given health status."""
        n_bots = self._bots_service.query_n_bots_for_status(bot_status)
        gauge_record = create_gauge_record(BOT_COUNT_METRIC_NAME, n_bots,
                                           metadata={'bot-status': bot_status.name,
                                                     'statsd-bucket': bot_status.name})

        return n_bots, gauge_record

    def _query_am_queue_time_for_instance(self, instance_name):
        """Queries the average job queue time for a given instance."""
        # Multiple schedulers may be active for this instance, but only
        # one (the one associated with the BotsInterface) actually keeps
        # track of this metric. So publish the first one that has a
        # non-default value.
        for scheduler in self._schedulers[instance_name]:
            am_queue_time = scheduler.query_am_queue_time()
            if am_queue_time != timedelta():
                break
        timer_record = create_timer_record(
            AVERAGE_QUEUE_TIME_METRIC_NAME, am_queue_time,
            metadata={'instance-name': instance_name or ''})

        return am_queue_time, timer_record

    def _query_scheduler_metrics_for_instance(self, instance_name):
        # Multiple schedulers may be active for this instance, but they should
        # all be using the same data-store, so just use the first one.
        for scheduler in self._schedulers[instance_name]:
            return scheduler.get_metrics()

    def _records_from_scheduler_metrics(self, scheduler_metrics, instance_name):
        # Jobs
        for stage, n_jobs in scheduler_metrics[MetricCategories.JOBS.value].items():
            stage = OperationStage(stage)
            gauge_record = create_gauge_record(
                JOB_COUNT_METRIC_NAME, n_jobs,
                metadata={'instance-name': instance_name or '',
                          'operation-stage': stage.name,
                          'statsd-bucket': stage.name})
            yield n_jobs, gauge_record
        # Leases
        for state, n_leases in scheduler_metrics[MetricCategories.LEASES.value].items():
            state = LeaseState(state)
            gauge_record = create_gauge_record(
                LEASE_COUNT_METRIC_NAME, n_leases,
                metadata={'instance-name': instance_name or '',
                          'lease-state': state.name,
                          'statsd-bucket': state.name})
            yield n_leases, gauge_record
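    # For reference, the `scheduler_metrics` mapping consumed above is assumed
    # (from how it is indexed here) to be shaped like:
    #
    #     {
    #         MetricCategories.JOBS.value: {<stage value>: <job count>, ...},
    #         MetricCategories.LEASES.value: {<state value>: <lease count>, ...},
    #     }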

    # --- Private API ---

    def __enable_server_reflection(self) -> None:
        if self._server_reflection:
            services = ', '.join(self._reflection_services[1:])
            self.__logger.info(
                f"Server reflection is enabled for the following services: {services}")
            reflection.enable_server_reflection(self._reflection_services, self.__grpc_server)
        else:
            self.__logger.info("Server reflection is not enabled.")