Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/server.py: 83.23%

310 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-06-11 15:37 +0000

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16import logging 

17import logging.handlers 

18import os 

19import sys 

20import threading 

21import time 

22import traceback 

23from collections import defaultdict 

24from contextlib import ExitStack 

25from datetime import datetime 

26from queue import Empty, Queue 

27from typing import Any, Dict, Iterable, List, Optional, Set, Tuple 

28 

29import grpc 

30from grpc_reflection.v1alpha import reflection 

31 

32from buildgrid._enums import BotStatus, LeaseState, LogRecordLevel, MetricCategories, OperationStage 

33from buildgrid._exceptions import PermissionDeniedError 

34from buildgrid._protos.buildgrid.v2.monitoring_pb2 import LogRecord, MetricRecord 

35from buildgrid._types import OnServerStartCallback, PortAssignedCallback 

36from buildgrid.server.actioncache.service import ActionCacheService 

37from buildgrid.server.auth.config import InstanceAuthorizationConfig 

38from buildgrid.server.auth.enums import AuthMetadataAlgorithm, AuthMetadataMethod 

39from buildgrid.server.auth.manager import HeadersAuthManager, JWTAuthManager, set_auth_manager 

40from buildgrid.server.bots.service import BotsService 

41from buildgrid.server.build_events.service import PublishBuildEventService, QueryBuildEventsService 

42from buildgrid.server.capabilities.instance import CapabilitiesInstance 

43from buildgrid.server.capabilities.service import CapabilitiesService 

44from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService 

45from buildgrid.server.controller import ExecutionController 

46from buildgrid.server.execution.service import ExecutionService 

47from buildgrid.server.metrics_names import ( 

48 BOT_COUNT_METRIC_NAME, 

49 CLIENT_COUNT_METRIC_NAME, 

50 JOB_COUNT_METRIC_NAME, 

51 LEASE_COUNT_METRIC_NAME, 

52) 

53from buildgrid.server.metrics_utils import create_gauge_record 

54from buildgrid.server.monitoring import ( 

55 MonitoringBus, 

56 MonitoringOutputFormat, 

57 MonitoringOutputType, 

58 StatsDTagFormat, 

59 get_monitoring_bus, 

60 set_monitoring_bus, 

61) 

62from buildgrid.server.operations.service import OperationsService 

63from buildgrid.server.persistence.sql.impl import DataStoreMetrics, SQLDataStore 

64from buildgrid.server.servicer import Instance, InstancedServicer 

65from buildgrid.server.threading import ContextThreadPoolExecutor, ContextWorker 

66from buildgrid.settings import ( 

67 DEFAULT_JWKS_REFETCH_INTERVAL_MINUTES, 

68 LOG_RECORD_FORMAT, 

69 MIN_THREAD_POOL_SIZE, 

70 MONITORING_PERIOD, 

71) 

72from buildgrid.utils import read_file 

73 

74LOGGER = logging.getLogger(__name__) 

75 

76 

def load_tls_server_credentials(
    server_key: Optional[str] = None, server_cert: Optional[str] = None, client_certs: Optional[str] = None
) -> "Optional[grpc.ServerCredentials]":
    """Look up and load TLS gRPC server credentials.

    All private keys and certificates are expected to be PEM-encoded files.

    Args:
        server_key(str): private server key file path.
        server_cert(str): public server certificate file path.
        client_certs(str): public client certificates file path, enabling
            mutual TLS (client authentication) when present.

    Returns:
        :obj:`ServerCredentials`: The credentials for use for a
        TLS-encrypted gRPC server channel, or ``None`` if either the
        server key or the server certificate is unset or missing on disk.
    """
    # The private key and the server certificate are both mandatory.
    for required_path in (server_key, server_cert):
        if not required_path or not os.path.exists(required_path):
            return None

    server_key_pem = read_file(server_key)
    server_cert_pem = read_file(server_cert)

    # Client certificates are optional; normalise both the PEM data and the
    # path to None when the bundle is unset or missing so that client
    # authentication is only required when certificates were actually loaded.
    if client_certs and os.path.exists(client_certs):
        client_certs_pem = read_file(client_certs)
    else:
        client_certs_pem = None
        client_certs = None

    credentials = grpc.ssl_server_credentials(
        [(server_key_pem, server_cert_pem)], root_certificates=client_certs_pem, require_client_auth=bool(client_certs)
    )

    # TODO: Fix this (missing stubs?) "ServerCredentials" has no attribute
    credentials.server_key = server_key  # type: ignore[attr-defined]
    credentials.server_cert = server_cert  # type: ignore[attr-defined]
    credentials.client_certs = client_certs  # type: ignore[attr-defined]

    return credentials

117 

118 

class Server:
    """Creates a BuildGrid server instance.

    The :class:`Server` class binds together all the gRPC services.
    """

    def __init__(
        self,
        max_workers: Optional[int] = None,
        monitor: bool = False,
        mon_endpoint_type: MonitoringOutputType = MonitoringOutputType.STDOUT,
        mon_endpoint_location: Optional[str] = None,
        mon_serialisation_format: MonitoringOutputFormat = MonitoringOutputFormat.JSON,
        mon_metric_prefix: str = "",
        mon_tag_format: StatsDTagFormat = StatsDTagFormat.NONE,
        auth_method: AuthMetadataMethod = AuthMetadataMethod.NONE,
        auth_secret: Optional[str] = None,
        auth_jwks_url: Optional[str] = None,
        auth_audience: Optional[str] = None,
        auth_jwks_fetch_minutes: Optional[int] = None,
        auth_algorithm: AuthMetadataAlgorithm = AuthMetadataAlgorithm.UNSPECIFIED,
        auth_acl_config: Optional[Dict[str, InstanceAuthorizationConfig]] = None,
        auth_allow_unauthorized_instances: Optional[List[str]] = None,
        enable_server_reflection: bool = True,
        grpc_compression: grpc.Compression = grpc.Compression.NoCompression,
        monitoring_period: float = MONITORING_PERIOD,
    ):
        """Initializes a new :class:`Server` instance.

        Args:
            max_workers (int, optional): A pool of max worker threads.
            monitor (bool, optional): Whether to globally activate server
                monitoring. Defaults to ``False``.
            auth_method (AuthMetadataMethod, optional): Authentication method to
                be used for request authorization. Defaults to ``NONE``.
            auth_secret (str, optional): The secret or key to be used for
                authorizing requests using `auth_method`. Defaults to ``None``.
            auth_jwks_url (str, optional): The URL to fetch the JWKs from.
                Either secret or this field must be specified. Defaults to ``None``.
            auth_audience (str): The audience used to validate JWT tokens against.
                The tokens must have an audience field.
            auth_jwks_fetch_minutes (int): The number of minutes to wait before
                refetching the JWKS set. Default: 60 minutes.
            auth_algorithm (AuthMetadataAlgorithm, optional): The cryptographic
                algorithm to be used in combination with `auth_secret` for
                authorizing requests using `auth_method`. Defaults to
                ``UNSPECIFIED``.
            auth_acl_config (Dict[str, InstanceAuthorizationConfig], optional): The ACL
                rules configuring in what context a request is authorized.
            auth_allow_unauthorized_instances (List[str], optional): List of instances
                that should be allowed to have unauthenticated access.
            enable_server_reflection (bool): Whether to enable gRPC reflection for
                registered services. Defaults to ``True``.
            grpc_compression (grpc.Compression): The transport compression algorithm
                to use for sending responses. Defaults to NoCompression.
        """
        self._stack = ExitStack()

        self._is_instrumented = monitor

        # One servicer container per supported gRPC service type.
        self._action_cache_service = ActionCacheService()
        self._bots_service = BotsService()
        self._bytestream_service = ByteStreamService()
        self._capabilities_service = CapabilitiesService()
        self._cas_service = ContentAddressableStorageService()
        self._execution_service = ExecutionService()
        self._operations_service = OperationsService()

        # Special cases: the build-events services have no instance names.
        self._build_events_service = PublishBuildEventService()
        self._query_build_events_service = QueryBuildEventsService()

        self._schedulers: Dict[str, Set[SQLDataStore]] = defaultdict(set)

        self._ports: List[Tuple[str, Optional[Dict[str, str]]]] = []
        self._port_map: Dict[str, int] = {}

        self._server_reflection = enable_server_reflection
        self._grpc_compression = grpc_compression

        self._logging_queue: Queue[Any] = Queue()
        self._monitoring_period = monitoring_period

        if max_workers is None:
            # Mirror the default thread count used by Python 3.4+ executors.
            max_workers = max(MIN_THREAD_POOL_SIZE, (os.cpu_count() or 1) * 5)
        elif max_workers < MIN_THREAD_POOL_SIZE:
            LOGGER.warning(
                f"Specified thread-limit=[{max_workers}] is too small, bumping it to [{MIN_THREAD_POOL_SIZE}]"
            )
            # Enforce a lower bound on the worker pool size.
            max_workers = MIN_THREAD_POOL_SIZE

        self._max_grpc_workers = max_workers

        if self._is_instrumented:
            monitoring_bus = MonitoringBus(
                endpoint_type=mon_endpoint_type,
                endpoint_location=mon_endpoint_location,
                metric_prefix=mon_metric_prefix,
                serialisation_format=mon_serialisation_format,
                tag_format=mon_tag_format,
            )
            set_monitoring_bus(monitoring_bus)

        # Optional set of instances exempt from authentication, shared by
        # both auth manager flavours.
        if auth_allow_unauthorized_instances:
            unauthorized_instances: Optional[Set[str]] = set(auth_allow_unauthorized_instances)
        else:
            unauthorized_instances = None

        if auth_method == AuthMetadataMethod.JWT:
            if auth_jwks_fetch_minutes is None:
                auth_jwks_fetch_minutes = DEFAULT_JWKS_REFETCH_INTERVAL_MINUTES
            set_auth_manager(
                JWTAuthManager(
                    secret=auth_secret,
                    algorithm=auth_algorithm,
                    jwks_url=auth_jwks_url,
                    audience=auth_audience,
                    jwks_fetch_minutes=auth_jwks_fetch_minutes,
                    acls=auth_acl_config,
                    allow_unauthorized_instances=unauthorized_instances,
                )
            )
        elif auth_method == AuthMetadataMethod.HEADERS:
            set_auth_manager(
                HeadersAuthManager(
                    acls=auth_acl_config,
                    allow_unauthorized_instances=unauthorized_instances,
                )
            )

        LOGGER.debug(f"Setting up gRPC server with thread-limit=[{max_workers}]")

253 

254 def register_instance(self, instance_name: str, instance: Instance) -> None: 

255 """ 

256 Register an instance with the server. Handled the logic of mapping instances to the 

257 correct servicer container. 

258 

259 Args: 

260 instance_name (str): The name of the instance. 

261 

262 instance (Instance): The instance implementation. 

263 """ 

264 

265 # Special case to handle the ExecutionController which combines the service interfaces. 

266 if isinstance(instance, ExecutionController): 

267 if bots_interface := instance.bots_interface: 

268 self.register_instance(instance_name, bots_interface) 

269 if execution_instance := instance.execution_instance: 

270 self.register_instance(instance_name, execution_instance) 

271 if operations_instance := instance.operations_instance: 

272 self.register_instance(instance_name, operations_instance) 

273 

274 elif action_instance := self._action_cache_service.cast(instance): 

275 self._action_cache_service.add_instance(instance_name, action_instance) 

276 instance.set_instance_name(instance_name) 

277 capabilities = self._capabilities_service.instances.setdefault(instance_name, CapabilitiesInstance()) 

278 capabilities.add_action_cache_instance(action_instance) 

279 

280 elif bots_instance := self._bots_service.cast(instance): 

281 self._bots_service.add_instance(instance_name, bots_instance) 

282 instance.set_instance_name(instance_name) 

283 self._schedulers[instance_name].add(bots_instance.scheduler) 

284 

285 elif bytestream_instance := self._bytestream_service.cast(instance): 

286 self._bytestream_service.add_instance(instance_name, bytestream_instance) 

287 instance.set_instance_name(instance_name) 

288 

289 elif cas_instance := self._cas_service.cast(instance): 

290 self._cas_service.add_instance(instance_name, cas_instance) 

291 instance.set_instance_name(instance_name) 

292 capabilities = self._capabilities_service.instances.setdefault(instance_name, CapabilitiesInstance()) 

293 capabilities.add_cas_instance(cas_instance) 

294 

295 elif execution_instance := self._execution_service.cast(instance): 

296 self._execution_service.add_instance(instance_name, execution_instance) 

297 instance.set_instance_name(instance_name) 

298 self._schedulers[instance_name].add(execution_instance.scheduler) 

299 capabilities = self._capabilities_service.instances.setdefault(instance_name, CapabilitiesInstance()) 

300 capabilities.add_execution_instance(execution_instance) 

301 

302 elif operations_instance := self._operations_service.cast(instance): 

303 self._operations_service.add_instance(instance_name, operations_instance) 

304 instance.set_instance_name(instance_name) 

305 

306 # The Build Events Services have no support for instance names, so this 

307 # is a bit of a special case where the storage backend itself is the 

308 # trigger for creating the gRPC services. 

309 elif instance.SERVICE_NAME == "BuildEvents": 

310 self._build_events_service.add_instance("", instance) # type: ignore[arg-type] 

311 self._query_build_events_service.add_instance("", instance) # type: ignore[arg-type] 

312 instance.set_instance_name("") 

313 

314 else: 

315 raise ValueError(f"Instance of type {type(instance)} not supported by {type(self)}") 

316 

317 @property 

318 def _services(self) -> Iterable[InstancedServicer[Any]]: 

319 return ( 

320 self._action_cache_service, 

321 self._bots_service, 

322 self._bytestream_service, 

323 self._capabilities_service, 

324 self._cas_service, 

325 self._execution_service, 

326 self._operations_service, 

327 # Special cases 

328 self._build_events_service, 

329 self._query_build_events_service, 

330 ) 

331 

332 def add_port(self, address: str, credentials: Optional[Dict[str, str]]) -> None: 

333 """Adds a port to the server. 

334 

335 Must be called before the server starts. If a credentials object exists, 

336 it will make a secure port. 

337 

338 Args: 

339 address (str): The address with port number. 

340 credentials (:obj:`grpc.ChannelCredentials`): Credentials object. 

341 """ 

342 self._ports.append((address, credentials)) 

343 

344 @property 

345 def is_instrumented(self) -> bool: 

346 return self._is_instrumented 

347 

348 def start( 

349 self, 

350 *, 

351 on_server_start_cb: Optional[OnServerStartCallback] = None, 

352 port_assigned_callback: Optional[PortAssignedCallback] = None, 

353 run_forever: bool = True, 

354 ) -> None: 

355 """Starts the BuildGrid server. 

356 

357 BuildGrid server startup consists of 3 stages, 

358 

359 1. Starting logging and monitoring 

360 

361 This step starts up the logging coroutine, the periodic status metrics 

362 coroutine, and the monitoring bus' publishing subprocess. Since this 

363 step involves forking, anything not fork-safe needs to be done *after* 

364 this step. 

365 

366 2. Instantiate gRPC 

367 

368 This step instantiates the gRPC server, and tells all the instances 

369 which have been attached to the server to instantiate their gRPC 

370 objects. It is also responsible for creating the various service 

371 objects and connecting them to the server and the instances. 

372 

373 After this step, gRPC core is running and its no longer safe to fork 

374 the process. 

375 

376 3. Start instances 

377 

378 Several of BuildGrid's services use background threads that need to 

379 be explicitly started when BuildGrid starts up. Rather than doing 

380 this at configuration parsing time, this step provides a hook for 

381 services to start up in a more organised fashion. 

382 

383 4. Start the gRPC server 

384 

385 The final step is starting up the gRPC server. The callback passed in 

386 via ``on_server_start_cb`` is executed in this step once the server 

387 has started. After this point BuildGrid is ready to serve requests. 

388 

389 The final thing done by this method is adding a ``SIGTERM`` handler 

390 which calls the ``Server.stop`` method to the event loop, and then 

391 that loop is started up using ``run_forever()``. 

392 

393 Args: 

394 on_server_start_cb (Callable): Callback function to execute once 

395 the gRPC server has started up. 

396 port_assigned_callback (Callable): Callback function to execute 

397 once the gRPC server has started up. The mapping of addresses 

398 to ports is passed to this callback. 

399 

400 """ 

401 

402 # 1. Start logging and monitoring 

403 self._stack.enter_context( 

404 ContextWorker( 

405 self._logging_worker, 

406 "ServerLogger", 

407 # Add a dummy value to the queue to unblock the get call. 

408 on_shutdown_requested=lambda: self._logging_queue.put(None), 

409 ) 

410 ) 

411 if self._is_instrumented: 

412 self._stack.enter_context(get_monitoring_bus()) 

413 self._stack.enter_context(ContextWorker(self._state_monitoring_worker, "ServerMonitor")) 

414 

415 # 2. Instantiate gRPC objects 

416 grpc_server = self.setup_grpc() 

417 

418 # 3. Start background threads 

419 for service in self._services: 

420 self._stack.enter_context(service) 

421 

422 # 4. Start the gRPC server. 

423 grpc_server.start() 

424 self._stack.callback(grpc_server.stop, None) 

425 

426 if on_server_start_cb: 

427 on_server_start_cb() 

428 if port_assigned_callback: 

429 port_assigned_callback(port_map=self._port_map) 

430 

431 # Add the stop handler and run the event loop 

432 if run_forever: 

433 grpc_server.wait_for_termination() 

434 

435 def setup_grpc(self) -> grpc.Server: 

436 """Instantiate the gRPC objects. 

437 

438 This creates the gRPC server, and causes the instances attached to 

439 this server to instantiate any gRPC channels they need. This also 

440 sets up the services which route to those instances, and sets up 

441 gRPC reflection. 

442 

443 """ 

444 grpc_server = grpc.server( 

445 ContextThreadPoolExecutor(self._max_grpc_workers, "gRPC_Executor", immediate_copy=True), 

446 maximum_concurrent_rpcs=self._max_grpc_workers, 

447 compression=self._grpc_compression, 

448 ) 

449 

450 # Add the requested ports to the gRPC server 

451 for address, credentials in self._ports: 

452 port_number = 0 

453 if credentials is not None: 

454 LOGGER.info(f"Adding secure connection on: [{address}]") 

455 server_key = credentials.get("tls-server-key") 

456 server_cert = credentials.get("tls-server-cert") 

457 client_certs = credentials.get("tls-client-certs") 

458 server_credentials = load_tls_server_credentials( 

459 server_cert=server_cert, server_key=server_key, client_certs=client_certs 

460 ) 

461 # TODO should this error out?? 

462 if server_credentials: 

463 port_number = grpc_server.add_secure_port(address, server_credentials) 

464 

465 else: 

466 LOGGER.info(f"Adding insecure connection on [{address}]") 

467 port_number = grpc_server.add_insecure_port(address) 

468 

469 if not port_number: 

470 raise PermissionDeniedError("Unable to configure socket") 

471 

472 self._port_map[address] = port_number 

473 

474 for service in self._services: 

475 service.setup_grpc(grpc_server) 

476 

477 if self._server_reflection: 

478 reflection_services = [service.FULL_NAME for service in self._services if service.enabled] 

479 LOGGER.info(f"Server reflection is enabled for the following services: {reflection_services}") 

480 reflection.enable_server_reflection([reflection.SERVICE_NAME] + reflection_services, grpc_server) 

481 else: 

482 LOGGER.info("Server reflection is not enabled.") 

483 

484 return grpc_server 

485 

486 def stop(self, *args: Any, **kwargs: Any) -> None: 

487 LOGGER.info("Stopping BuildGrid server") 

488 self._stack.close() 

489 

490 def _logging_worker(self, shutdown_requested: threading.Event) -> None: 

491 """Publishes log records to the monitoring bus.""" 

492 

493 logging_formatter = logging.Formatter(fmt=LOG_RECORD_FORMAT) 

494 logging_handler = logging.handlers.QueueHandler(self._logging_queue) 

495 

496 # Setup the main logging handler: 

497 root_logger = logging.getLogger() 

498 

499 for log_filter in root_logger.filters[:]: 

500 logging_handler.addFilter(log_filter) 

501 root_logger.removeFilter(log_filter) 

502 

503 for log_handler in root_logger.handlers[:]: 

504 for log_filter in log_handler.filters[:]: 

505 logging_handler.addFilter(log_filter) 

506 root_logger.removeHandler(log_handler) 

507 root_logger.addHandler(logging_handler) 

508 

509 def logging_worker() -> None: 

510 monitoring_bus = get_monitoring_bus() 

511 

512 try: 

513 log_record = self._logging_queue.get(timeout=self._monitoring_period) 

514 except Empty: 

515 return 

516 if log_record is None: 

517 return 

518 

519 # Print log records to stdout, if required: 

520 if not self._is_instrumented or not monitoring_bus.prints_records: 

521 record = logging_formatter.format(log_record) 

522 # TODO: Investigate if async write would be worth here. 

523 sys.stdout.write(f"{record}\n") 

524 sys.stdout.flush() 

525 

526 # Emit a log record if server is instrumented: 

527 if self._is_instrumented: 

528 log_record_level = LogRecordLevel(int(log_record.levelno / 10)) 

529 log_record_creation_time = datetime.fromtimestamp(log_record.created) 

530 # logging.LogRecord.extra must be a str to str dict: 

531 if "extra" in log_record.__dict__ and log_record.extra: 

532 log_record_metadata = log_record.extra 

533 else: 

534 log_record_metadata = None 

535 forged_record = self._forge_log_record( 

536 domain=log_record.name, 

537 level=log_record_level, 

538 message=log_record.message, 

539 creation_time=log_record_creation_time, 

540 metadata=log_record_metadata, 

541 ) 

542 monitoring_bus.send_record_nowait(forged_record) 

543 

544 while not shutdown_requested.is_set(): 

545 try: 

546 logging_worker() 

547 except Exception: 

548 # The thread shouldn't exit on exceptions, but output the exception so that 

549 # it can be found in the logs. 

550 # 

551 # Note, we DO NOT use `LOGGER` here, because we don't want to write 

552 # anything new to the logging queue in case the Exception isn't some transient 

553 # issue. 

554 try: 

555 sys.stdout.write("Exception in logging worker\n") 

556 sys.stdout.flush() 

557 traceback.print_exc() 

558 except Exception: 

559 # There's not a lot we can do at this point really. 

560 pass 

561 

562 if shutdown_requested.is_set(): 

563 # Reset logging, so any logging after shutting down the logging worker 

564 # still gets written to stdout and the queue doesn't get any more logs 

565 stream_handler = logging.StreamHandler(stream=sys.stdout) 

566 stream_handler.setFormatter(logging_formatter) 

567 root_logger = logging.getLogger() 

568 

569 for log_filter in root_logger.filters[:]: 

570 stream_handler.addFilter(log_filter) 

571 root_logger.removeFilter(log_filter) 

572 

573 for log_handler in root_logger.handlers[:]: 

574 for log_filter in log_handler.filters[:]: 

575 stream_handler.addFilter(log_filter) 

576 root_logger.removeHandler(log_handler) 

577 root_logger.addHandler(stream_handler) 

578 

579 # Drain the log message queue 

580 while self._logging_queue.qsize() > 0: 

581 logging_worker() 

582 

583 def _forge_log_record( 

584 self, 

585 *, 

586 domain: str, 

587 level: LogRecordLevel, 

588 message: str, 

589 creation_time: datetime, 

590 metadata: Optional[Dict[str, str]] = None, 

591 ) -> LogRecord: 

592 log_record = LogRecord() 

593 

594 log_record.creation_timestamp.FromDatetime(creation_time) 

595 log_record.domain = domain 

596 log_record.level = level.value 

597 log_record.message = message 

598 if metadata is not None: 

599 log_record.metadata.update(metadata) 

600 

601 return log_record 

602 

603 def _state_monitoring_worker(self, shutdown_requested: threading.Event) -> None: 

604 """Periodically publishes state metrics to the monitoring bus.""" 

605 

606 def __state_monitoring_worker() -> None: 

607 monitoring_bus = get_monitoring_bus() 

608 # Execution metrics 

609 if self._execution_service.enabled: 

610 # Emit total clients count record: 

611 _, record = self._query_n_clients() 

612 monitoring_bus.send_record_nowait(record) 

613 

614 for instance_name in self._execution_service.instances: 

615 # Emit instance clients count record: 

616 _, record = self._query_n_clients_for_instance(instance_name) 

617 monitoring_bus.send_record_nowait(record) 

618 

619 if self._bots_service.enabled: 

620 # Emit total bots count record: 

621 _, record = self._query_n_bots() 

622 monitoring_bus.send_record_nowait(record) 

623 

624 for instance_name in self._bots_service.instances: 

625 # Emit instance bots count record: 

626 _, record = self._query_n_bots_for_instance(instance_name) 

627 monitoring_bus.send_record_nowait(record) 

628 

629 # Emits records by bot status: 

630 for bot_status in (botstatus for botstatus in BotStatus): 

631 # Emit status bots count record: 

632 _, record = self._query_n_bots_for_status(bot_status) 

633 monitoring_bus.send_record_nowait(record) 

634 

635 if self._schedulers: 

636 for instance_name in self._schedulers: 

637 if scheduler_metrics := self._query_scheduler_metrics_for_instance(instance_name): 

638 for _, record in self._records_from_scheduler_metrics(scheduler_metrics, instance_name): 

639 monitoring_bus.send_record_nowait(record) 

640 

641 while not shutdown_requested.is_set(): 

642 start = time.time() 

643 try: 

644 __state_monitoring_worker() 

645 except Exception: 

646 # The thread shouldn't exit on exceptions, but log at a severe enough level 

647 # that it doesn't get lost in logs 

648 LOGGER.exception("Exception while gathering state metrics") 

649 

650 end = time.time() 

651 shutdown_requested.wait(timeout=max(0, self._monitoring_period - (end - start))) 

652 

653 # --- Private API: Monitoring --- 

654 

655 def _query_n_clients(self) -> Tuple[int, MetricRecord]: 

656 """Queries the number of clients connected.""" 

657 n_clients = self._execution_service.query_n_clients() 

658 gauge_record = create_gauge_record(CLIENT_COUNT_METRIC_NAME, n_clients) 

659 

660 return n_clients, gauge_record 

661 

662 def _query_n_clients_for_instance(self, instance_name: str) -> Tuple[int, MetricRecord]: 

663 """Queries the number of clients connected for a given instance""" 

664 n_clients = self._execution_service.query_n_clients_for_instance(instance_name) 

665 gauge_record = create_gauge_record( 

666 CLIENT_COUNT_METRIC_NAME, n_clients, metadata={"instance-name": instance_name or ""} 

667 ) 

668 

669 return n_clients, gauge_record 

670 

671 def _query_n_bots(self) -> Tuple[int, MetricRecord]: 

672 """Queries the number of bots connected.""" 

673 n_bots = self._bots_service.query_n_bots() 

674 gauge_record = create_gauge_record(BOT_COUNT_METRIC_NAME, n_bots) 

675 

676 return n_bots, gauge_record 

677 

678 def _query_n_bots_for_instance(self, instance_name: str) -> Tuple[int, MetricRecord]: 

679 """Queries the number of bots connected for a given instance.""" 

680 n_bots = self._bots_service.query_n_bots_for_instance(instance_name) 

681 gauge_record = create_gauge_record( 

682 BOT_COUNT_METRIC_NAME, n_bots, metadata={"instance-name": instance_name or ""} 

683 ) 

684 

685 return n_bots, gauge_record 

686 

687 def _query_n_bots_for_status(self, bot_status: BotStatus) -> Tuple[int, MetricRecord]: 

688 """Queries the number of bots connected for a given health status.""" 

689 n_bots = self._bots_service.query_n_bots_for_status(bot_status) 

690 gauge_record = create_gauge_record( 

691 BOT_COUNT_METRIC_NAME, n_bots, metadata={"bot-status": bot_status.name, "statsd-bucket": bot_status.name} 

692 ) 

693 

694 return n_bots, gauge_record 

695 

696 def _query_scheduler_metrics_for_instance(self, instance_name: str) -> Optional[DataStoreMetrics]: 

697 # Since multiple schedulers may be active for this instance, but should 

698 # be using the same data-store, just use the first one 

699 for scheduler in self._schedulers[instance_name]: 

700 return scheduler.get_metrics() 

701 return {"leases": {}, "jobs": {}} 

702 

703 def _records_from_scheduler_metrics( 

704 self, scheduler_metrics: DataStoreMetrics, instance_name: str 

705 ) -> Iterable[Tuple[int, MetricRecord]]: 

706 # Jobs 

707 for stage, n_jobs in scheduler_metrics[MetricCategories.JOBS.value].items(): 

708 stage = OperationStage(stage) 

709 gauge_record = create_gauge_record( 

710 JOB_COUNT_METRIC_NAME, 

711 n_jobs, 

712 metadata={ 

713 "instance-name": instance_name or "", 

714 "operation-stage": stage.name, 

715 "statsd-bucket": stage.name, 

716 }, 

717 ) 

718 yield n_jobs, gauge_record 

719 # Leases 

720 for state, n_leases in scheduler_metrics[MetricCategories.LEASES.value].items(): 

721 state = LeaseState(state) 

722 gauge_record = create_gauge_record( 

723 LEASE_COUNT_METRIC_NAME, 

724 n_leases, 

725 metadata={ 

726 "instance-name": instance_name or "", 

727 "lease-state": state.name, 

728 "statsd-bucket": state.name, 

729 }, 

730 ) 

731 yield n_leases, gauge_record