Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/server.py: 84.48%


393 statements  

# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#  <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import asyncio
from concurrent import futures
from datetime import datetime, timedelta
import logging
import logging.handlers
import os
import signal
import sys
import time
import traceback
from typing import Callable, Dict, Optional

import grpc
from grpc_reflection.v1alpha import reflection
import janus

from buildgrid._enums import (
    BotStatus, LeaseState, LogRecordLevel, MetricCategories, OperationStage)
from buildgrid._exceptions import PermissionDeniedError
from buildgrid._protos.buildgrid.v2 import monitoring_pb2
from buildgrid.server.actioncache.service import ActionCacheService
from buildgrid.server._authentication import AuthMetadataMethod, AuthMetadataAlgorithm
from buildgrid.server._authentication import AuthContext, AuthMetadataServerInterceptor
from buildgrid.server.bots.service import BotsService
from buildgrid.server.capabilities.instance import CapabilitiesInstance
from buildgrid.server.capabilities.service import CapabilitiesService
from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
from buildgrid.server.execution.service import ExecutionService
from buildgrid.server.cas.logstream.service import LogStreamService
from buildgrid.server.metrics_utils import create_gauge_record, create_timer_record
from buildgrid.server.metrics_names import (
    AVERAGE_QUEUE_TIME_METRIC_NAME,
    CLIENT_COUNT_METRIC_NAME,
    BOT_COUNT_METRIC_NAME,
    LEASE_COUNT_METRIC_NAME,
    JOB_COUNT_METRIC_NAME
)
from buildgrid.server.monitoring import (
    get_monitoring_bus,
    MonitoringOutputType,
    MonitoringOutputFormat,
    setup_monitoring_bus
)
from buildgrid.server.operations.service import OperationsService
from buildgrid.server.referencestorage.service import ReferenceStorageService
from buildgrid.server._resources import ExecContext
from buildgrid.settings import (
    LOG_RECORD_FORMAT, MIN_THREAD_POOL_SIZE, MONITORING_PERIOD, DEFAULT_JWKS_REFETCH_INTERVAL_MINUTES
)
from buildgrid.utils import read_file

# Need protos here, to enable server reflection.
from buildgrid._protos.google.longrunning import operations_pb2
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2



def load_tls_server_credentials(server_key=None, server_cert=None, client_certs=None):
    """Looks up and loads TLS server gRPC credentials.

    All private and public keys are expected to be PEM-encoded.

    Args:
        server_key(str): private server key file path.
        server_cert(str): public server certificate file path.
        client_certs(str): public client certificates file path.

    Returns:
        :obj:`ServerCredentials`: The credentials for use for a
        TLS-encrypted gRPC server channel.
    """
    if not server_key or not os.path.exists(server_key):
        return None

    if not server_cert or not os.path.exists(server_cert):
        return None

    server_key_pem = read_file(server_key)
    server_cert_pem = read_file(server_cert)
    if client_certs and os.path.exists(client_certs):
        client_certs_pem = read_file(client_certs)
    else:
        client_certs_pem = None
        client_certs = None

    credentials = grpc.ssl_server_credentials([(server_key_pem, server_cert_pem)],
                                              root_certificates=client_certs_pem,
                                              require_client_auth=bool(client_certs))

    credentials.server_key = server_key
    credentials.server_cert = server_cert
    credentials.client_certs = client_certs

    return credentials
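# Illustrative sketch (not part of the module): loading credentials for a
# secure port. The file paths below are hypothetical; the function returns
# ``None`` when the key or certificate file is missing, so callers should be
# prepared to fall back to an insecure port.
#
#     credentials = load_tls_server_credentials(
#         server_key='/etc/buildgrid/server.key',
#         server_cert='/etc/buildgrid/server.crt',
#         client_certs='/etc/buildgrid/clients.crt')
#     if credentials is None:
#         print('TLS credentials unavailable, serving insecurely')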



class Server:
    """Creates a BuildGrid server instance.

    The :class:`Server` class binds together all the gRPC services.
    """

    def __init__(self,
                 max_workers=None, monitor=False,
                 mon_endpoint_type=MonitoringOutputType.STDOUT,
                 mon_endpoint_location=None,
                 mon_serialisation_format=MonitoringOutputFormat.JSON,
                 mon_metric_prefix="",
                 auth_method=AuthMetadataMethod.NONE,
                 auth_secret=None,
                 auth_jwks_url=None,
                 auth_audience=None,
                 auth_jwks_fetch_minutes=None,
                 auth_algorithm=AuthMetadataAlgorithm.UNSPECIFIED,
                 enable_server_reflection=True):
        """Initializes a new :class:`Server` instance.

        Args:
            max_workers (int, optional): A pool of max worker threads.
            monitor (bool, optional): Whether or not to globally activate server
                monitoring. Defaults to ``False``.
            auth_method (AuthMetadataMethod, optional): Authentication method to
                be used for request authorization. Defaults to ``NONE``.
            auth_secret (str, optional): The secret or key to be used for
                authorizing requests using `auth_method`. Defaults to ``None``.
            auth_jwks_url (str, optional): The URL to fetch the JWKs from.
                Either this or `auth_secret` must be specified. Defaults to ``None``.
            auth_audience (str): The audience to validate JWT tokens against.
                The tokens must have an audience field.
            auth_jwks_fetch_minutes (int): The number of minutes to wait before
                refetching the JWKs set. Default: 60 minutes.
            auth_algorithm (AuthMetadataAlgorithm, optional): The cryptographic
                algorithm to be used in combination with `auth_secret` for
                authorizing requests using `auth_method`. Defaults to
                ``UNSPECIFIED``.
        """

        self.__logger = logging.getLogger(__name__)

        self.__main_loop = asyncio.get_event_loop()

        self.__logging_queue = janus.Queue(loop=self.__main_loop)
        self.__logging_handler = logging.handlers.QueueHandler(self.__logging_queue.sync_q)
        self.__logging_formatter = logging.Formatter(fmt=LOG_RECORD_FORMAT)
        self.__print_log_records = True

        self.__build_metadata_queues = None

        self.__state_monitoring_task = None
        self.__build_monitoring_tasks = None
        self.__logging_task = None

        self._capabilities_service = None
        self._execution_service = None
        self._bots_service = None
        self._operations_service = None
        self._reference_storage_service = None
        self._action_cache_service = None
        self._cas_service = None
        self._bytestream_service = None
        self._logstream_service = None

        self._schedulers = {}

        self._instances = set()
        self._execution_instances = {}
        self._operations_instances = {}
        self._bots_instances = {}
        self._cas_instances = {}
        self._bytestream_instances = {}
        self._logstream_instances = {}
        self._action_cache_instances = {}
        self._reference_storage_instances = {}

        self._ports = []
        self._port_map = {}

        self._server_reflection = enable_server_reflection
        self._reflection_services = [reflection.SERVICE_NAME]

        self._is_instrumented = monitor

        if self._is_instrumented:
            monitoring_bus = setup_monitoring_bus(endpoint_type=mon_endpoint_type,
                                                  endpoint_location=mon_endpoint_location,
                                                  metric_prefix=mon_metric_prefix,
                                                  serialisation_format=mon_serialisation_format)

            self.__build_monitoring_tasks = []

        # Setup the main logging handler:
        root_logger = logging.getLogger()

        for log_filter in root_logger.filters[:]:
            self.__logging_handler.addFilter(log_filter)
            root_logger.removeFilter(log_filter)

        for log_handler in root_logger.handlers[:]:
            root_logger.removeHandler(log_handler)
        root_logger.addHandler(self.__logging_handler)

        if self._is_instrumented and monitoring_bus.prints_records:
            self.__print_log_records = False

        if max_workers is None:
            # Use max_workers default from Python 3.4+
            max_workers = max(MIN_THREAD_POOL_SIZE, (os.cpu_count() or 1) * 5)

        elif max_workers < MIN_THREAD_POOL_SIZE:
            self.__logger.warning(f"Specified thread-limit=[{max_workers}] is too small, "
                                  f"bumping it to [{MIN_THREAD_POOL_SIZE}]")
            # Enforce a minimum for max_workers
            max_workers = MIN_THREAD_POOL_SIZE

        self._max_grpc_workers = max_workers

        ExecContext.init(max_workers)

        self.__grpc_auth_interceptor = None

        if auth_method != AuthMetadataMethod.NONE:
            if auth_jwks_fetch_minutes is None:
                auth_jwks_fetch_minutes = DEFAULT_JWKS_REFETCH_INTERVAL_MINUTES
            self.__grpc_auth_interceptor = AuthMetadataServerInterceptor(
                method=auth_method, secret=auth_secret, algorithm=auth_algorithm,
                jwks_url=auth_jwks_url, audience=auth_audience, jwks_fetch_minutes=auth_jwks_fetch_minutes)

        AuthContext.interceptor = self.__grpc_auth_interceptor

        try:
            # pylint: disable=consider-using-with
            self.__grpc_executor = futures.ThreadPoolExecutor(
                max_workers, thread_name_prefix="gRPC_Executor")
        except TypeError:
            # We need python >= 3.6 to support `thread_name_prefix`, so fallback
            # to ugly thread names if that didn't work

            # pylint: disable=consider-using-with
            self.__grpc_executor = futures.ThreadPoolExecutor(max_workers)
        self.__grpc_server = None

        self.__logger.debug(f"Setting up gRPC server with thread-limit=[{max_workers}]")
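    # Illustrative sketch (not part of the module): constructing a monitored
    # server instance. The parameter values here are hypothetical.
    #
    #     server = Server(
    #         max_workers=10,
    #         monitor=True,
    #         mon_endpoint_type=MonitoringOutputType.STDOUT,
    #         auth_method=AuthMetadataMethod.NONE)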


    # --- Public API ---


    def _start_logging(self) -> None:
        """Start the logging coroutine."""
        self.__logging_task = asyncio.ensure_future(
            self._logging_worker(), loop=self.__main_loop)

    def _start_monitoring(self) -> None:
        """Start the monitoring functionality.

        This starts up the monitoring bus subprocess, and also starts the
        periodic status monitoring coroutine.

        """
        monitoring_bus = get_monitoring_bus()
        monitoring_bus.start()

        self.__state_monitoring_task = asyncio.ensure_future(
            self._state_monitoring_worker(period=MONITORING_PERIOD),
            loop=self.__main_loop
        )

    def _instantiate_grpc(self) -> None:
        """Instantiate the gRPC objects.

        This creates the gRPC server, and causes the instances attached to
        this server to instantiate any gRPC channels they need. This also
        sets up the services which route to those instances, and sets up
        gRPC reflection.

        """
        self.__grpc_server = grpc.server(
            self.__grpc_executor,
            maximum_concurrent_rpcs=self._max_grpc_workers
        )

        # We always want a capabilities service
        self._capabilities_service = CapabilitiesService(self.__grpc_server)

        for instance_name, instance in self._execution_instances.items():
            instance.setup_grpc()
            self._add_execution_instance(instance, instance_name)

        for instance_name, instance in self._operations_instances.items():
            instance.setup_grpc()
            self._add_operations_instance(instance, instance_name)

        for instance_name, instance in self._bots_instances.items():
            instance.setup_grpc()
            self._add_bots_instance(instance, instance_name)

        for instance_name, instance in self._cas_instances.items():
            instance.setup_grpc()
            self._add_cas_instance(instance, instance_name)

        for instance_name, instance in self._bytestream_instances.items():
            instance.setup_grpc()
            self._add_bytestream_instance(instance, instance_name)

        for instance_name, instance in self._logstream_instances.items():
            instance.setup_grpc()
            self._add_logstream_instance(instance, instance_name)

        for instance_name, instance in self._action_cache_instances.items():
            instance.setup_grpc()
            self._add_action_cache_instance(instance, instance_name)

        for instance_name, instance in self._reference_storage_instances.items():
            instance.setup_grpc()
            self._add_reference_storage_instance(instance, instance_name)

        # Add the requested ports to the gRPC server
        for address, credentials in self._ports:
            if credentials is not None:
                self.__logger.info(f"Adding secure connection on: [{address}]")
                server_key = credentials.get('tls-server-key')
                server_cert = credentials.get('tls-server-cert')
                client_certs = credentials.get('tls-client-certs')
                credentials = load_tls_server_credentials(
                    server_cert=server_cert,
                    server_key=server_key,
                    client_certs=client_certs
                )
                port_number = self.__grpc_server.add_secure_port(address, credentials)

            else:
                self.__logger.info(f"Adding insecure connection on [{address}]")
                port_number = self.__grpc_server.add_insecure_port(address)
            self._port_map[address] = port_number

            if not port_number:
                raise PermissionDeniedError("Unable to configure socket")

        self.__enable_server_reflection()


    def start(
        self,
        *,
        on_server_start_cb: Optional[Callable]=None,
        port_assigned_callback: Optional[Callable]=None
    ) -> None:
        """Starts the BuildGrid server.

        BuildGrid server startup consists of 3 stages,

        1. Starting logging and monitoring

        This step starts up the logging coroutine, the periodic status metrics
        coroutine, and the monitoring bus' publishing subprocess. Since this
        step involves forking, anything not fork-safe needs to be done *after*
        this step.

        2. Instantiate gRPC

        This step instantiates the gRPC server, and tells all the instances
        which have been attached to the server to instantiate their gRPC
        objects. It is also responsible for creating the various service
        objects and connecting them to the server and the instances.

        After this step, gRPC core is running and it's no longer safe to fork
        the process.

        3. Start the gRPC server

        The final step is starting up the gRPC server. The callback passed in
        via ``on_server_start_cb`` is executed in this step once the server
        has started. After this point BuildGrid is ready to serve requests.

        The final thing done by this method is adding a ``SIGTERM`` handler
        which calls the ``Server.stop`` method to the event loop, and then
        that loop is started up using ``run_forever()``.

        Args:
            on_server_start_cb (Callable): Callback function to execute once
                the gRPC server has started up.
            port_assigned_callback (Callable): Callback function to execute
                once the gRPC server has started up. The mapping of addresses
                to ports is passed to this callback.

        """
        # 1. Start logging and monitoring
        self._start_logging()
        if self._is_instrumented:
            self._start_monitoring()

        # 2. Instantiate gRPC objects
        self._instantiate_grpc()

        # 3. Start the gRPC server
        self.__grpc_server.start()
        if on_server_start_cb:
            on_server_start_cb()
        if port_assigned_callback:
            port_assigned_callback(port_map=self._port_map)

        # Add the stop handler and run the event loop
        self.__main_loop.add_signal_handler(signal.SIGTERM, self.stop)
        self.__main_loop.run_forever()
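    # Illustrative sketch (not part of the module): a typical server lifecycle.
    # The instance object, instance name and address are hypothetical.
    #
    #     server = Server()
    #     server.add_port('localhost:50051', None)
    #     server.add_execution_instance(execution_instance, '')
    #     server.start(
    #         on_server_start_cb=lambda: print('server up'),
    #         port_assigned_callback=lambda port_map: print(port_map))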


    def stop(self):
        """Stops the BuildGrid server."""
        if self._is_instrumented:
            if self.__state_monitoring_task is not None:
                self.__state_monitoring_task.cancel()

            monitoring_bus = get_monitoring_bus()
            monitoring_bus.stop()

        if self.__logging_task is not None:
            self.__logging_task.cancel()

        self.__main_loop.stop()

        if self.__grpc_server is not None:
            self.__grpc_server.stop(None)


    def add_port(self, address, credentials):
        """Adds a port to the server.

        Must be called before the server starts. If a credentials object exists,
        it will make a secure port.

        Args:
            address (str): The address with port number.
            credentials (dict): A dictionary of TLS credential file paths, with
                keys ``tls-server-key``, ``tls-server-cert`` and
                ``tls-client-certs``, or ``None`` for an insecure port.

        Raises:
            PermissionDeniedError: If socket binding fails when the server
                starts.

        """
        self._ports.append((address, credentials))
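    # Illustrative sketch (not part of the module): registering a secure port.
    # The actual binding (and any PermissionDeniedError) happens later, when
    # ``start()`` instantiates the gRPC server. The paths are hypothetical.
    #
    #     server.add_port('[::]:50052', {
    #         'tls-server-key': '/etc/buildgrid/server.key',
    #         'tls-server-cert': '/etc/buildgrid/server.crt',
    #         'tls-client-certs': '/etc/buildgrid/clients.crt',
    #     })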


    def add_execution_instance(self, instance, instance_name):
        """Adds an :obj:`ExecutionInstance` to the service.

        If no service exists, it creates one.

        Args:
            instance (:obj:`ExecutionInstance`): Instance to add.
            instance_name (str): Instance name.
        """
        self._execution_instances[instance_name] = instance
        self._instances.add(instance_name)

    def _add_execution_instance(self, instance, instance_name):
        if self._execution_service is None:
            self._execution_service = ExecutionService(
                self.__grpc_server, monitor=self._is_instrumented)

        self._execution_service.add_instance(instance_name, instance)
        self._add_capabilities_instance(instance_name, execution_instance=instance)

        self._schedulers.setdefault(instance_name, set()).add(instance.scheduler)

        if self._is_instrumented:
            instance.scheduler.activate_monitoring()
        self._reflection_services.append(remote_execution_pb2.DESCRIPTOR.services_by_name['Execution'].full_name)


    def add_bots_interface(self, instance, instance_name):
        """Adds a :obj:`BotsInterface` to the service.

        If no service exists, it creates one.

        Args:
            instance (:obj:`BotsInterface`): Instance to add.
            instance_name (str): Instance name.
        """
        self._bots_instances[instance_name] = instance
        self._instances.add(instance_name)

    def _add_bots_instance(self, instance, instance_name):
        if self._bots_service is None:
            self._bots_service = BotsService(
                self.__grpc_server, monitor=self._is_instrumented)

        self._bots_service.add_instance(instance_name, instance)

        self._schedulers.setdefault(instance_name, set()).add(instance.scheduler)

        if self._is_instrumented:
            instance.scheduler.activate_monitoring()

    def add_operations_instance(self, instance, instance_name):
        """Adds an :obj:`OperationsInstance` to the service.

        If no service exists, it creates one.

        Args:
            instance (:obj:`OperationsInstance`): Instance to add.
            instance_name (str): Instance name.
        """
        self._operations_instances[instance_name] = instance

    def _add_operations_instance(self, instance, instance_name):
        if self._operations_service is None:
            self._operations_service = OperationsService(self.__grpc_server)

        self._operations_service.add_instance(instance_name, instance)
        self._reflection_services.append(operations_pb2.DESCRIPTOR.services_by_name['Operations'].full_name)

    def add_reference_storage_instance(self, instance, instance_name):
        """Adds a :obj:`ReferenceCache` to the service.

        If no service exists, it creates one.

        Args:
            instance (:obj:`ReferenceCache`): Instance to add.
            instance_name (str): Instance name.
        """
        self._reference_storage_instances[instance_name] = instance

    def _add_reference_storage_instance(self, instance, instance_name):
        if self._reference_storage_service is None:
            self._reference_storage_service = ReferenceStorageService(self.__grpc_server)

        self._reference_storage_service.add_instance(instance_name, instance)


    def add_action_cache_instance(self, instance, instance_name):
        """Adds an :obj:`ActionCache` to the service.

        If no service exists, it creates one.

        Args:
            instance (:obj:`ActionCache`): Instance to add.
            instance_name (str): Instance name.
        """
        self._action_cache_instances[instance_name] = instance

    def _add_action_cache_instance(self, instance, instance_name):
        if self._action_cache_service is None:
            self._action_cache_service = ActionCacheService(self.__grpc_server)

        self._action_cache_service.add_instance(instance_name, instance)
        self._add_capabilities_instance(instance_name, action_cache_instance=instance)
        self._reflection_services.append(remote_execution_pb2.DESCRIPTOR.services_by_name['ActionCache'].full_name)


    def add_cas_instance(self, instance, instance_name):
        """Adds a :obj:`ContentAddressableStorageInstance` to the service.

        If no service exists, it creates one.

        Args:
            instance (:obj:`ContentAddressableStorageInstance`): Instance to add.
            instance_name (str): Instance name.
        """
        self._cas_instances[instance_name] = instance

    def _add_cas_instance(self, instance, instance_name):
        if self._cas_service is None:
            self._cas_service = ContentAddressableStorageService(self.__grpc_server)

        self._cas_service.add_instance(instance_name, instance)
        self._add_capabilities_instance(instance_name, cas_instance=instance)
        self._reflection_services.append(
            remote_execution_pb2.DESCRIPTOR.services_by_name['ContentAddressableStorage'].full_name)


    def add_bytestream_instance(self, instance, instance_name):
        """Adds a :obj:`ByteStreamInstance` to the service.

        If no service exists, it creates one.

        Args:
            instance (:obj:`ByteStreamInstance`): Instance to add.
            instance_name (str): Instance name.
        """
        self._bytestream_instances[instance_name] = instance

    def _add_bytestream_instance(self, instance, instance_name):
        if self._bytestream_service is None:
            self._bytestream_service = ByteStreamService(self.__grpc_server)

        self._bytestream_service.add_instance(instance_name, instance)

    def add_logstream_instance(self, instance, instance_name):
        """Adds a :obj:`LogStreamInstance` to the service.

        If no service exists, it creates one.

        Args:
            instance (:obj:`LogStreamInstance`): Instance to add.
            instance_name (str): The name of the instance being added.

        """
        self._logstream_instances[instance_name] = instance

    def _add_logstream_instance(self, instance, instance_name):
        if self._logstream_service is None:
            self._logstream_service = LogStreamService(self.__grpc_server)

        self._logstream_service.add_instance(instance_name, instance)

    # --- Public API: Monitoring ---

    @property
    def is_instrumented(self):
        return self._is_instrumented


    # --- Private API ---

    def _add_capabilities_instance(self, instance_name,
                                   cas_instance=None,
                                   action_cache_instance=None,
                                   execution_instance=None):
        """Adds a :obj:`CapabilitiesInstance` to the service.

        Args:
            instance_name (str): Instance name.
            cas_instance (:obj:`ContentAddressableStorageInstance`, optional):
                A CAS instance to add to the capabilities.
            action_cache_instance (:obj:`ActionCache`, optional): An action
                cache instance to add to the capabilities.
            execution_instance (:obj:`ExecutionInstance`, optional): An
                execution instance to add to the capabilities.
        """

        try:
            if cas_instance:
                self._capabilities_service.add_cas_instance(instance_name, cas_instance)
            if action_cache_instance:
                self._capabilities_service.add_action_cache_instance(instance_name, action_cache_instance)
            if execution_instance:
                self._capabilities_service.add_execution_instance(instance_name, execution_instance)

        except KeyError:
            capabilities_instance = CapabilitiesInstance(cas_instance,
                                                         action_cache_instance,
                                                         execution_instance)
            self._capabilities_service.add_instance(instance_name, capabilities_instance)


    async def _logging_worker(self):
        """Publishes log records to the monitoring bus."""
        async def __logging_worker():
            monitoring_bus = get_monitoring_bus()
            log_record = await self.__logging_queue.async_q.get()

            # Print log records to stdout, if required:
            if self.__print_log_records:
                record = self.__logging_formatter.format(log_record)

                # TODO: Investigate if async write would be worth here.
                sys.stdout.write(f'{record}\n')
                sys.stdout.flush()

            # Emit a log record if server is instrumented:
            if self._is_instrumented:
                log_record_level = LogRecordLevel(int(log_record.levelno / 10))
                log_record_creation_time = datetime.fromtimestamp(log_record.created)
                # logging.LogRecord.extra must be a str to str dict:
                if 'extra' in log_record.__dict__ and log_record.extra:
                    log_record_metadata = log_record.extra
                else:
                    log_record_metadata = None
                record = self._forge_log_record(
                    domain=log_record.name, level=log_record_level, message=log_record.message,
                    creation_time=log_record_creation_time, metadata=log_record_metadata)

                await monitoring_bus.send_record(record)

        while True:
            try:
                await __logging_worker()

            except asyncio.CancelledError:
                break
            except Exception:
                # The thread shouldn't exit on exceptions, but output the exception so that
                # it can be found in the logs.
                #
                # Note, we DO NOT use `self.__logger` here, because we don't want to write
                # anything new to the logging queue in case the Exception isn't some transient
                # issue.
                try:
                    sys.stdout.write("Exception in logging worker\n")
                    sys.stdout.flush()
                    traceback.print_exc()
                except Exception:
                    # There's not a lot we can do at this point really.
                    pass
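    # Illustrative sketch (not part of the module): the pattern used above.
    # A ``janus.Queue`` exposes a thread-safe ``sync_q`` (fed here by a
    # ``logging.handlers.QueueHandler``) and an awaitable ``async_q``, letting
    # synchronous log calls from gRPC worker threads reach this coroutine.
    #
    #     queue = janus.Queue()
    #     logging.getLogger().addHandler(
    #         logging.handlers.QueueHandler(queue.sync_q))
    #     record = await queue.async_q.get()  # inside a coroutine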


    def _forge_log_record(self, *,
                          domain: str,
                          level: LogRecordLevel,
                          message: str,
                          creation_time: datetime,
                          metadata: Optional[Dict[str, str]]=None):
        log_record = monitoring_pb2.LogRecord()

        log_record.creation_timestamp.FromDatetime(creation_time)
        log_record.domain = domain
        log_record.level = level.value
        log_record.message = message
        if metadata is not None:
            log_record.metadata.update(metadata)

        return log_record
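    # Illustrative sketch (not part of the module): forging a monitoring-bus
    # record from a Python log record's fields. The values are hypothetical.
    #
    #     record = server._forge_log_record(
    #         domain='buildgrid.server',
    #         level=LogRecordLevel.INFO,
    #         message='server started',
    #         creation_time=datetime.now(),
    #         metadata={'instance-name': ''})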


    async def _state_monitoring_worker(self, period=1.0):
        """Periodically publishes state metrics to the monitoring bus."""
        async def __state_monitoring_worker():
            monitoring_bus = get_monitoring_bus()
            # Execution metrics
            if self._execution_service:
                # Emit total clients count record:
                _, record = self._query_n_clients()
                await monitoring_bus.send_record(record)

                for instance_name in self._instances:
                    # Emit instance clients count record:
                    _, record = self._query_n_clients_for_instance(instance_name)
                    await monitoring_bus.send_record(record)

            if self._bots_service:
                # Emit total bots count record:
                _, record = self._query_n_bots()
                await monitoring_bus.send_record(record)

                for instance_name in self._instances:
                    # Emit instance bots count record:
                    _, record = self._query_n_bots_for_instance(instance_name)
                    await monitoring_bus.send_record(record)

                # Emit records by bot status:
                for bot_status in BotStatus:
                    # Emit status bots count record:
                    _, record = self._query_n_bots_for_status(bot_status)
                    await monitoring_bus.send_record(record)

            if self._schedulers:
                queue_times = []
                # Emit records by instance:
                for instance_name in self._instances:
                    # Emit instance average queue time record:
                    queue_time, record = self._query_am_queue_time_for_instance(instance_name)
                    await monitoring_bus.send_record(record)
                    if queue_time:
                        queue_times.append(queue_time)

                    scheduler_metrics = self._query_scheduler_metrics_for_instance(instance_name)
                    # This will be skipped if there were no stage changes
                    if scheduler_metrics:
                        for _, record in self._records_from_scheduler_metrics(scheduler_metrics, instance_name):
                            await monitoring_bus.send_record(record)

                # Emit overall average queue time record:
                if queue_times:
                    am_queue_time = sum(queue_times, timedelta()) / len(queue_times)
                else:
                    am_queue_time = timedelta()
                record = create_timer_record(
                    AVERAGE_QUEUE_TIME_METRIC_NAME,
                    am_queue_time)

                await monitoring_bus.send_record(record)

        while True:
            start = time.time()
            try:
                await __state_monitoring_worker()

            except asyncio.CancelledError:
                break
            except Exception:
                # The thread shouldn't exit on exceptions, but log at a severe enough level
                # that it doesn't get lost in logs
                self.__logger.exception("Exception while gathering state metrics")

            end = time.time()
            await asyncio.sleep(period - (end - start))


    # --- Private API: Monitoring ---

    def _query_n_clients(self):
        """Queries the number of clients connected."""
        n_clients = self._execution_service.query_n_clients()
        gauge_record = create_gauge_record(
            CLIENT_COUNT_METRIC_NAME, n_clients)

        return n_clients, gauge_record


    def _query_n_clients_for_instance(self, instance_name):
        """Queries the number of clients connected for a given instance."""
        n_clients = self._execution_service.query_n_clients_for_instance(instance_name)
        gauge_record = create_gauge_record(
            CLIENT_COUNT_METRIC_NAME, n_clients,
            metadata={'instance-name': instance_name or ''})

        return n_clients, gauge_record

    def _query_n_bots(self):
        """Queries the number of bots connected."""
        n_bots = self._bots_service.query_n_bots()
        gauge_record = create_gauge_record(
            BOT_COUNT_METRIC_NAME, n_bots)

        return n_bots, gauge_record

    def _query_n_bots_for_instance(self, instance_name):
        """Queries the number of bots connected for a given instance."""
        n_bots = self._bots_service.query_n_bots_for_instance(instance_name)
        gauge_record = create_gauge_record(
            BOT_COUNT_METRIC_NAME, n_bots,
            metadata={'instance-name': instance_name or ''})

        return n_bots, gauge_record

    def _query_n_bots_for_status(self, bot_status):
        """Queries the number of bots connected for a given health status."""
        n_bots = self._bots_service.query_n_bots_for_status(bot_status)
        gauge_record = create_gauge_record(BOT_COUNT_METRIC_NAME, n_bots,
                                           metadata={'bot-status': bot_status.name, 'statsd-bucket': bot_status.name})

        return n_bots, gauge_record


    def _query_am_queue_time_for_instance(self, instance_name):
        """Queries the average job's queue time for a given instance."""
        # Multiple schedulers may be active for this instance, but only
        # one (the one associated with the BotsInterface) actually keeps
        # track of this metric. So publish the first one that has a
        # non-default value.
        for scheduler in self._schedulers[instance_name]:
            am_queue_time = scheduler.query_am_queue_time()
            if am_queue_time != timedelta():
                break
        timer_record = create_timer_record(
            AVERAGE_QUEUE_TIME_METRIC_NAME, am_queue_time,
            metadata={'instance-name': instance_name or ''})

        return am_queue_time, timer_record

    def _query_scheduler_metrics_for_instance(self, instance_name):
        # Since multiple schedulers may be active for this instance, but should
        # be using the same data-store, just use the first one.
        for scheduler in self._schedulers[instance_name]:
            return scheduler.get_metrics()


    def _records_from_scheduler_metrics(self, scheduler_metrics, instance_name):
        # Jobs
        for stage, n_jobs in scheduler_metrics[MetricCategories.JOBS.value].items():
            stage = OperationStage(stage)
            gauge_record = create_gauge_record(
                JOB_COUNT_METRIC_NAME, n_jobs,
                metadata={'instance-name': instance_name or '',
                          'operation-stage': stage.name,
                          'statsd-bucket': stage.name})
            yield n_jobs, gauge_record
        # Leases
        for state, n_leases in scheduler_metrics[MetricCategories.LEASES.value].items():
            state = LeaseState(state)
            gauge_record = create_gauge_record(
                LEASE_COUNT_METRIC_NAME, n_leases,
                metadata={'instance-name': instance_name or '',
                          'lease-state': state.name,
                          'statsd-bucket': state.name})
            yield n_leases, gauge_record
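    # Illustrative sketch (not part of the module): the ``scheduler_metrics``
    # mapping consumed above, as inferred from this method. Counts are keyed
    # by metric category, then by the enum member's integer value; the numbers
    # here are hypothetical.
    #
    #     scheduler_metrics = {
    #         MetricCategories.JOBS.value: {
    #             OperationStage.QUEUED.value: 4,
    #             OperationStage.EXECUTING.value: 2,
    #         },
    #         MetricCategories.LEASES.value: {
    #             LeaseState.ACTIVE.value: 2,
    #         },
    #     }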


    # --- Private API ---

    def __enable_server_reflection(self) -> None:
        if self._server_reflection:
            services = ', '.join(self._reflection_services[1:])
            self.__logger.info(
                f"Server reflection is enabled for the following services: {services}")
            reflection.enable_server_reflection(self._reflection_services, self.__grpc_server)
        else:
            self.__logger.info("Server reflection is not enabled.")