Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/monitoring.py: 34.65%

254 statements  

« prev     ^ index     » next       coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16import asyncio 

17import ctypes 

18from enum import Enum 

19import logging 

20from multiprocessing import Event, Process, Queue 

21from queue import Empty 

22import sys 

23import socket 

24import time 

25import threading 

26from typing import IO, TYPE_CHECKING, List, Optional, Union 

27 

28from google.protobuf import json_format 

29 

30from buildgrid._exceptions import InvalidArgumentError 

31from buildgrid._protos.buildgrid.v2.monitoring_pb2 import LogRecord, MetricRecord, BusMessage 

32 

33if TYPE_CHECKING: 

34 from asyncio.streams import StreamWriter 

35 

36 

class MonitoringOutputType(Enum):
    """Kinds of destination a monitoring bus can publish records to."""

    STDOUT = 'stdout'  # Standard output stream.
    FILE = 'file'  # On-disk file.
    SOCKET = 'socket'  # UNIX domain socket.
    UDP = 'udp'  # UDP IP:port
    SILENT = 'silent'  # Silent: nothing is published.

48 

49 

class MonitoringOutputFormat(Enum):
    """Serialisation formats a monitoring bus can emit records in."""

    BINARY = 'binary'  # Protobuf binary format.
    JSON = 'json'  # JSON format.
    STATSD = 'statsd'  # StatsD format. Only metrics are kept - logs are dropped.

57 

58 

class UdpWrapper:
    """Adapt an IPv4 UDP socket to a ``write()``/``close()`` interface.

    ``sendto()`` is hidden behind ``write()`` so a UDP endpoint can be used
    interchangeably with the other monitoring output writers.
    """

    def __init__(self, endpoint_location):
        """Parse the destination and open the UDP socket.

        Args:
            endpoint_location (str): Destination in ``address:port`` form.

        Raises:
            ValueError: if ``endpoint_location`` does not split into exactly
                two ``:``-separated parts, or the port is not an integer.
        """
        try:
            host, raw_port = endpoint_location.split(":")
            port = int(raw_port)
        except ValueError as e:
            error_msg = f"udp endpoint-location {endpoint_location} does not have the form address:port"
            raise ValueError(error_msg) from e
        self._addr = host
        self._port = port
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def write(self, message):
        """Send ``message`` (bytes) as a single datagram to the endpoint."""
        destination = (self._addr, self._port)
        self._socket.sendto(message, destination)

    def close(self):
        """Close the underlying socket."""
        return self._socket.close()

76 

77 

# Any writer the monitoring worker publishes serialised records to: a binary
# file-like object (an on-disk file opened 'wb', or sys.stdout.buffer), a
# UdpWrapper, or an asyncio StreamWriter for a connected UNIX-socket client.
# The worker only calls write() and close() on these.
MonitoringEndpoint = Union[IO, UdpWrapper, "StreamWriter"]

79 

80 

class _MonitoringBus:
    """Singleton monitoring bus. Only one should exist at a time (as ``_instance``).

    Records are queued by the calling process and written to the configured
    endpoint by a separate worker process launched with :meth:`start`.
    """

    # The type needs quotes because it's the same as the class
    _instance: Optional["_MonitoringBus"] = None
    _instance_lock = threading.Lock()

    def __init__(self, endpoint_type=MonitoringOutputType.SOCKET,
                 endpoint_location=None,
                 metric_prefix="",
                 serialisation_format=MonitoringOutputFormat.STATSD):
        """Configure (but do not start) the monitoring bus.

        Args:
            endpoint_type (MonitoringOutputType): Where records are published.
            endpoint_location (str): File path (FILE), UNIX socket path
                (SOCKET) or ``address:port`` (UDP). Unused for STDOUT/SILENT.
            metric_prefix (str): Prepended verbatim to every metric name.
            serialisation_format (MonitoringOutputFormat): Output encoding.
                Anything other than JSON or STATSD yields length-prefixed
                binary protobuf.

        Raises:
            InvalidArgumentError: if ``endpoint_type`` is not recognised.
        """
        self._logger = logging.getLogger(__name__)
        self.__event_loop = None
        self._streaming_process = None
        self._stop_streaming_worker = Event()

        self.__message_queue = Queue()
        self.__sequence_number = 1

        self.__output_location = None
        self.__async_output = False
        self.__json_output = False
        self.__statsd_output = False
        self.__print_output = False
        self.__udp_output = False
        self.__is_silent = False

        if endpoint_type == MonitoringOutputType.FILE:
            self.__output_location = endpoint_location

        elif endpoint_type == MonitoringOutputType.SOCKET:
            self.__output_location = endpoint_location
            self.__async_output = True

        elif endpoint_type == MonitoringOutputType.STDOUT:
            self.__print_output = True

        elif endpoint_type == MonitoringOutputType.UDP:
            self.__output_location = endpoint_location
            self.__udp_output = True

        elif endpoint_type == MonitoringOutputType.SILENT:
            self.__is_silent = True

        else:
            raise InvalidArgumentError(
                f"Invalid endpoint output type: [{endpoint_type}]")

        self.__metric_prefix = metric_prefix

        if serialisation_format == MonitoringOutputFormat.JSON:
            self.__json_output = True
        elif serialisation_format == MonitoringOutputFormat.STATSD:
            self.__statsd_output = True

    # --- Public API ---

    @property
    def is_enabled(self) -> bool:
        """Whether monitoring is enabled, i.e. the worker process is alive.

        The send_record methods perform this check so clients don't need to
        check this before sending a record to the monitoring bus, but it is
        provided for convenience."""
        return self._streaming_process is not None and self._streaming_process.is_alive()

    @property
    def prints_records(self) -> bool:
        """Whether or not messages are printed to standard output."""
        return self.__print_output

    @property
    def is_silent(self) -> bool:
        """Whether or not this is a silent monitoring bus."""
        return self.__is_silent

    def start(self) -> None:
        """Start the monitoring bus worker process (no-op if silent or already started)."""
        if self.__is_silent or self._streaming_process is not None:
            return

        self._streaming_process = Process(target=self._streaming_worker)
        self._streaming_process.start()

    def stop(self) -> None:
        """Signal the worker process to stop and wait for it to exit."""
        if self.__is_silent or self._streaming_process is None:
            return

        self._stop_streaming_worker.set()
        self._streaming_process.join()

    async def prefix_record(self, record: MetricRecord) -> MetricRecord:
        """Prefix the record's metric name (asynchronous variant).

        The new metric name is built by concatenating, in order:

        1. The custom prefix specified in the configuration, if any. It is
           prepended verbatim; no "." is inserted after it, so presumably the
           configured value already ends with "." (confirm against the
           configuration parser).
        2. If this is an instanced metric (it has an ``instance-name``
           metadata entry) with a nonempty instance name, that name plus ".".
        3. If this is an instanced metric (even with an empty instance
           name), "instance.".
        4. The original record name.

        For example, with the "widgets-built" metric and a custom prefix of
        "mycompany.":

        If "widgets-built" is not an instanced metric: "mycompany.widgets-built".
        If it is an instanced metric in an empty instance: "mycompany.instance.widgets-built".
        If it is an instanced metric in an instance named "dev": "mycompany.dev.instance.widgets-built".

        Args:
            record (MetricRecord): The record to prefix. Modified in place.

        Returns:
            MetricRecord: The same record, with its name prefixed.
        """
        # Same rules as the synchronous variant; delegate to keep them in sync.
        return self.prefix_record_nowait(record)

    async def send_record(self, record: Union[LogRecord, MetricRecord]) -> None:
        """Publish a record onto the bus asynchronously.

        Does nothing if the bus is not enabled. Metric records are prefixed
        (see ``prefix_record``) before being queued.

        Args:
            record (LogRecord | MetricRecord): The record to send.
        """
        if not self.is_enabled:
            return

        if record.DESCRIPTOR is MetricRecord.DESCRIPTOR:
            record = await self.prefix_record(record)

        self.__message_queue.put(record)

    def prefix_record_nowait(self, record: MetricRecord) -> MetricRecord:
        """Prefix the record's metric name. This is the same as prefix_record, but called synchronously.

        See the prefix_record docstring for notes on the prefixing rules.

        Args:
            record (MetricRecord): The record to prefix. Modified in place.

        Returns:
            MetricRecord: The same record, with its name prefixed.
        """
        instance_name = record.metadata.get('instance-name')
        if instance_name is not None:
            # This is an instance metric, so we'll add the instance name
            # to the prefix if it isn't empty
            if instance_name:
                instance_name = instance_name + "."

            # Prefix the metric with "instance_name.instance." if the instance
            # name isn't empty, otherwise just "instance."
            record.name = f"{self.__metric_prefix}{instance_name}instance.{record.name}"

        else:
            # Not an instance metric
            record.name = f"{self.__metric_prefix}{record.name}"

        return record

    def send_record_nowait(self, record: Union[LogRecord, MetricRecord]) -> None:
        """Publish a record onto the bus synchronously.

        Does nothing if the bus is not enabled. Metric records are prefixed
        (see ``prefix_record_nowait``) before being queued.

        Args:
            record (LogRecord | MetricRecord): The record to send.
        """
        if not self.is_enabled:
            return

        if record.DESCRIPTOR is MetricRecord.DESCRIPTOR:
            record = self.prefix_record_nowait(record)

        self.__message_queue.put_nowait(record)

    # --- Private API ---

    @staticmethod
    def _format_record_as_stasd_string(record: MetricRecord) -> str:
        """Convert a metric record to a newline-terminated statsd line.

        See https://github.com/statsd/statsd/blob/master/docs/metric_types.md for valid metric types.

        Note that BuildGrid currently only supports Counters, Timers, and Gauges, and it has the custom
        Distribution type as an alias for Timers (its ``count`` is emitted with the ``ms`` type).

        If the record has a ``statsd-bucket`` metadata entry, it is appended
        to the record name (the record is modified in place).

        Args:
            record (MetricRecord): The record to convert.

        Raises:
            ValueError: if the record type is unknown.
        """
        bucket = record.metadata.get("statsd-bucket")
        if bucket:
            record.name = f"{record.name}.{bucket}"
        # NOTE(review): protobuf fields read back as their default value
        # rather than None when unset, so the `is None` guards below
        # presumably never trigger; kept as defensive checks.
        # Enum values are plain ints, so compare with ==, never `is`.
        if record.type == MetricRecord.COUNTER:
            if record.count is None:
                raise ValueError(
                    f"COUNTER record {record.name} is missing a count")
            return f"{record.name}:{record.count}|c\n"
        elif record.type == MetricRecord.TIMER:
            if record.duration is None:
                raise ValueError(
                    f"TIMER record {record.name} is missing a duration")
            return f"{record.name}:{record.duration.ToMilliseconds()}|ms\n"
        elif record.type == MetricRecord.DISTRIBUTION:
            if record.count is None:
                raise ValueError(
                    f"DISTRIBUTION record {record.name} is missing a count")
            return f"{record.name}:{record.count}|ms\n"
        elif record.type == MetricRecord.GAUGE:
            if record.value is None:
                raise ValueError(
                    f"GAUGE record {record.name} is missing a value")
            return f"{record.name}:{record.value}|g\n"
        raise ValueError("Unknown record type.")

    def _streaming_worker(self) -> None:
        """Fetch records from the monitoring queue, and publish them.

        Intended to run in a subprocess (see ``start``). It loops until
        ``self._stop_streaming_worker`` is set. On each pass it fetches
        messages from the message queue, formats them according to the
        configured serialisation format, and writes them to the configured
        output endpoint.

        If setting up the endpoint raises, the failure is logged and retried
        after 5 seconds. If the configuration names no usable endpoint, an
        error is logged and the worker exits.

        This method won't exit immediately when ``self._stop_streaming_worker``
        is set. It may be waiting up to a second to fetch a message from the
        queue, or up to 5 seconds between retries. It also closes the output
        endpoints once looping has finished.
        """

        def _publish_next(end_points: List[MonitoringEndpoint]) -> bool:
            """Get a LogRecord or a MetricRecord, and publish it.

            Fetches the next record from the internal queue, formats it in
            the configured output style, and writes it to ``end_points``.

            Returns:
                bool: False if no record arrived within 1 second or the
                record was neither a LogRecord nor a MetricRecord; True
                otherwise. Publishing errors propagate as exceptions.
            """
            try:
                record = self.__message_queue.get(timeout=1)
            except Empty:
                return False

            message = BusMessage()
            message.sequence_number = self.__sequence_number

            if record.DESCRIPTOR is LogRecord.DESCRIPTOR:
                message.log_record.CopyFrom(record)

            elif record.DESCRIPTOR is MetricRecord.DESCRIPTOR:
                message.metric_record.CopyFrom(record)

            else:
                return False

            if self.__json_output:
                blob_message = json_format.MessageToJson(message).encode()

                for end_point in end_points:
                    end_point.write(blob_message)

            elif self.__statsd_output:
                # StatsD only carries metrics; log records are dropped here.
                if record.DESCRIPTOR is MetricRecord.DESCRIPTOR:
                    statsd_message = _MonitoringBus._format_record_as_stasd_string(
                        record).encode()
                    for end_point in end_points:
                        end_point.write(statsd_message)

            else:
                # Binary framing: the raw bytes of a ctypes uint32 holding
                # the message size, followed by the serialised message.
                blob_size = ctypes.c_uint32(message.ByteSize())
                blob_message = message.SerializeToString()

                for end_point in end_points:
                    end_point.write(bytes(blob_size))  # type: ignore
                    end_point.write(blob_message)

            return True

        output_writers: List[MonitoringEndpoint] = []

        async def _client_connected(reader, writer) -> None:
            # Every client connecting to the UNIX socket receives all
            # records published after it connects.
            output_writers.append(writer)

        self.__event_loop = asyncio.new_event_loop()

        # In good circumstances we stay in the first iteration of this loop forever.
        # The loop exists so that the subprocess is more resilient to temporary
        # failures (e.g. failing to connect to a socket immediately on startup)
        while not self._stop_streaming_worker.is_set():
            try:
                if self.__async_output and self.__output_location:

                    async def _async_output():
                        # No `loop=` argument: it was removed from
                        # start_unix_server in Python 3.10, and the running
                        # loop is picked up automatically.
                        server = await asyncio.start_unix_server(
                            _client_connected, path=self.__output_location)
                        try:
                            while not self._stop_streaming_worker.is_set():
                                try:
                                    # Forget clients that have disconnected so
                                    # one dead peer doesn't break publishing.
                                    output_writers[:] = [
                                        w for w in output_writers if not w.is_closing()]
                                    if _publish_next(output_writers):
                                        self.__sequence_number += 1

                                        for writer in output_writers:
                                            await writer.drain()
                                    # _publish_next blocks the loop for up to 1s
                                    # waiting on the queue; yield so pending
                                    # connections and I/O get serviced.
                                    await asyncio.sleep(0)
                                except asyncio.CancelledError:  # pylint: disable=try-except-raise
                                    raise
                                except Exception:
                                    self._logger.warning(
                                        "Caught exception when publishing metric", exc_info=True)
                        finally:
                            server.close()

                    # The coroutine must actually be run on the loop; merely
                    # scheduling it with ensure_future never executes it.
                    self.__event_loop.run_until_complete(_async_output())

                elif self.__udp_output and self.__output_location:
                    output_writers.append(UdpWrapper(self.__output_location))
                    while not self._stop_streaming_worker.is_set():
                        try:
                            if _publish_next(output_writers):
                                self.__sequence_number += 1
                        except Exception:
                            self._logger.warning(
                                "Caught exception when publishing metric", exc_info=True)

                elif self.__output_location:
                    with open(self.__output_location, mode='wb') as output_file:

                        output_writers.append(output_file)

                        while not self._stop_streaming_worker.is_set():
                            try:
                                if _publish_next([output_file]):
                                    self.__sequence_number += 1

                                output_file.flush()
                            except Exception:
                                self._logger.warning(
                                    "Caught exception when publishing metric", exc_info=True)

                elif self.__print_output:
                    stdout_buffer = sys.stdout.buffer
                    output_writers.append(stdout_buffer)

                    while not self._stop_streaming_worker.is_set():
                        try:
                            if _publish_next(output_writers):
                                self.__sequence_number += 1
                                # Flush like the file output does, so records
                                # aren't held back in the stdout buffer.
                                stdout_buffer.flush()
                        except Exception:
                            self._logger.warning(
                                "Caught exception when publishing metric", exc_info=True)
                else:
                    self._logger.error(
                        "Unsupported monitoring configuration, metrics won't be published. "
                        f"output_location={self.__output_location}, "
                        f"async_output={self.__async_output}, "
                        f"udp_output={self.__udp_output}, "
                        f"print_output={self.__print_output}"
                    )
                    # Retrying can't fix a static misconfiguration; exit so
                    # is_enabled turns False and records stop being queued.
                    break

            except Exception:
                self._logger.warning(
                    "Caught exception in metrics publisher loop, sleeping for 5s before retrying",
                    exc_info=True)
                time.sleep(5)

        # We exited the publishing loop, which means we've been told to shutdown
        # by the parent process (or the configuration is unusable). Clean up the
        # output writers; close() on an already-closed file object is a no-op.
        for writer in output_writers:
            writer.close()
            if self.__async_output and self.__output_location:
                try:
                    self.__event_loop.run_until_complete(writer.wait_closed())
                except Exception:
                    self._logger.warning(
                        "Caught exception when closing monitoring socket client", exc_info=True)
        self.__event_loop.close()

484 

485 

def setup_monitoring_bus(
        endpoint_type: MonitoringOutputType = MonitoringOutputType.SOCKET,
        endpoint_location: Optional[str] = None,
        metric_prefix: str = "",
        serialisation_format: MonitoringOutputFormat = MonitoringOutputFormat.STATSD) -> _MonitoringBus:
    """Set up the process-wide monitoring bus and return it.

    Throws an error if a non-silent monitoring bus has already been set up
    and another non-silent one is requested. A silent monitoring bus can be
    overridden by a non-silent one. If a silent monitoring bus is requested
    while any bus already exists, nothing is overridden and the existing bus
    is returned.

    Args:
        endpoint_type (MonitoringOutputType): Where records are published.
        endpoint_location (str): Endpoint-specific location (path or address:port).
        metric_prefix (str): Prefix prepended to every metric name.
        serialisation_format (MonitoringOutputFormat): Output encoding.

    Returns:
        _MonitoringBus: The bus now stored as the singleton instance.

    Raises:
        ValueError: if a non-silent bus already exists and a non-silent one
            is requested.
        InvalidArgumentError: propagated from ``_MonitoringBus`` for an
            unrecognised ``endpoint_type``.
    """
    with _MonitoringBus._instance_lock:
        existing = _MonitoringBus._instance
        if existing is not None:
            # Don't override an existing monitoring bus with a silent one, and instead return
            # the already configured one
            if endpoint_type == MonitoringOutputType.SILENT:
                return existing
            if not existing.is_silent:
                raise ValueError("A non-silent monitoring bus has already been created and can't be overridden. "
                                 "Please ensure that you are not attempting to call setup_monitoring_bus again.")
        # If we currently have a silent monitoring bus, there's no harm in reconfiguring to use a real one.
        _MonitoringBus._instance = _MonitoringBus(
            endpoint_type, endpoint_location, metric_prefix, serialisation_format)
        return _MonitoringBus._instance

510 

511 

def get_monitoring_bus() -> _MonitoringBus:
    """Return the monitoring bus, creating a silent one if none exists yet.

    A silent bus publishes nothing; it can later be replaced by calling
    ``setup_monitoring_bus`` with a real endpoint.
    """
    if _MonitoringBus._instance is not None:
        return _MonitoringBus._instance

    logging.getLogger(__name__).info(
        "get_monitoring_bus() was called before setup_monitoring_bus; no metrics will be published.")
    return setup_monitoring_bus(MonitoringOutputType.SILENT)