Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/monitoring.py: 40.07%

267 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2025-05-21 15:45 +0000

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16import asyncio 

17import ctypes 

18import os 

19import socket 

20import sys 

21import threading 

22import time 

23from contextvars import ContextVar 

24from enum import Enum 

25from multiprocessing import Event, Process, Queue 

26from queue import Empty 

27from typing import IO, TYPE_CHECKING, Any, Sequence, Union, cast 

28 

29from google.protobuf import json_format 

30 

31from buildgrid._protos.buildgrid.v2.monitoring_pb2 import BusMessage, LogRecord, MetricRecord 

32from buildgrid.server.exceptions import InvalidArgumentError 

33from buildgrid.server.logging import buildgrid_logger 

34 

35if TYPE_CHECKING: 

36 from asyncio.streams import StreamWriter 

37 

# Module-level logger for the monitoring subsystem.
LOGGER = buildgrid_logger(__name__)

39 

40 

class MonitoringOutputType(Enum):
    """Kinds of endpoint a monitoring bus can publish records to."""

    # Write records to the standard output stream.
    STDOUT = "stdout"
    # Write records to an on-disk file.
    FILE = "file"
    # Serve records over a UNIX domain socket.
    SOCKET = "socket"
    # Send records as datagrams to a UDP "IP:port" endpoint.
    UDP = "udp"
    # Discard all records (monitoring disabled).
    SILENT = "silent"

52 

53 

class MonitoringOutputFormat(Enum):
    """Wire formats used when serialising monitoring records."""

    # Protobuf binary encoding of the BusMessage.
    BINARY = "binary"
    # JSON encoding of the BusMessage.
    JSON = "json"
    # StatsD line protocol. Only metric records are emitted; log records
    # are dropped in this format.
    STATSD = "statsd"

61 

62 

class StatsDTagFormat(Enum):
    """Dialects for attaching tags to StatsD metric lines."""

    # Emit no tags at all.
    NONE = "none"
    # InfluxDB-style tags: "name,tag=value:value|type".
    INFLUX_STATSD = "influx-statsd"
    # Datadog-style tags: "name:value|type|#tag:value".
    DOG_STATSD = "dogstatsd"
    # Graphite-style tags: "name;tag=value:value|type".
    GRAPHITE = "graphite"

68 

69 

class UdpWrapper:
    """Adapter exposing a UDP socket through a file-like ``write()`` method.

    This allows a UDP endpoint to be used interchangeably with file objects
    and stream writers by the monitoring bus's publishing loop.
    """

    def __init__(self, endpoint_location: str) -> None:
        """Parse ``endpoint_location`` ("address:port") and open a UDP socket.

        Raises:
            ValueError: If ``endpoint_location`` is not of the form
                ``address:port`` with an integer port.
        """
        try:
            # Both the split and the int() conversion are covered by the
            # same except clause, so a non-numeric port also yields the
            # descriptive error below.
            host, port = endpoint_location.split(":")
            self._addr = host
            self._port = int(port)
        except ValueError as e:
            error_msg = f"udp endpoint-location {endpoint_location} does not have the form address:port"
            raise ValueError(error_msg) from e
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def write(self, message: bytes) -> None:
        """Send ``message`` as a single datagram to the configured endpoint."""
        self._socket.sendto(message, (self._addr, self._port))

    def close(self) -> None:
        """Close the underlying UDP socket."""
        return self._socket.close()

87 

88 

# Anything the streaming worker can call write() on: a binary file object,
# a UDP wrapper, or an asyncio StreamWriter.
MonitoringEndpoint = Union[IO[bytes], UdpWrapper, "StreamWriter"]

90 

91 

class MonitoringBus:
    """Publish monitoring records (metrics and logs) from a worker subprocess.

    Records are enqueued on a ``multiprocessing.Queue`` by the application
    (``send_record_nowait``) and drained by a separate ``Process`` launched
    by ``start()``. The worker serialises each record in the configured
    format (binary protobuf, JSON, or StatsD) and writes it to the
    configured output (stdout, a file, a UNIX domain socket, or a UDP
    endpoint). A "silent" bus never starts a worker and publishes nothing.
    """

    def __init__(
        self,
        endpoint_type: MonitoringOutputType = MonitoringOutputType.SOCKET,
        endpoint_location: str | None = None,
        metric_prefix: str = "",
        serialisation_format: MonitoringOutputFormat = MonitoringOutputFormat.STATSD,
        tag_format: StatsDTagFormat = StatsDTagFormat.INFLUX_STATSD,
        additional_tags: dict[str, str] | None = None,
    ) -> None:
        """Configure the bus. No subprocess is started until ``start()``.

        Args:
            endpoint_type: Kind of output endpoint to publish to.
            endpoint_location: Path (file/socket) or ``address:port`` (udp)
                for the endpoint; unused for stdout and silent outputs.
            metric_prefix: Prefix prepended to every metric record's name.
            serialisation_format: Wire format for published records; any
                value other than JSON or STATSD results in binary output.
            tag_format: StatsD tag dialect used for statsd output.
            additional_tags: Tags merged into every metric's metadata,
                unless the record already carries that key.

        Raises:
            InvalidArgumentError: If ``endpoint_type`` is not a recognised
                ``MonitoringOutputType``.
        """
        self.__event_loop: asyncio.AbstractEventLoop | None = None
        self._streaming_process: Process | None = None
        # Set by stop() to tell the worker subprocess to shut down.
        self._stop_streaming_worker = Event()
        # PID of the parent, recorded at start() time so the worker can
        # detect that it has been orphaned.
        self._streaming_process_ppid: int | None = None

        self.__message_queue: "Queue[Any]" = Queue()
        # Sequence number stamped onto each published BusMessage; only
        # incremented inside the worker subprocess.
        self.__sequence_number = 1

        # Exactly one of the output-mode flags below is selected based on
        # endpoint_type.
        self.__output_location: str | None = None
        self.__async_output = False
        self.__json_output = False
        self.__statsd_output = False
        self.__print_output = False
        self.__udp_output = False
        self.__is_silent = False

        if endpoint_type == MonitoringOutputType.FILE:
            self.__output_location = endpoint_location

        elif endpoint_type == MonitoringOutputType.SOCKET:
            self.__output_location = endpoint_location
            self.__async_output = True

        elif endpoint_type == MonitoringOutputType.STDOUT:
            self.__print_output = True

        elif endpoint_type == MonitoringOutputType.UDP:
            self.__output_location = endpoint_location
            self.__udp_output = True

        elif endpoint_type == MonitoringOutputType.SILENT:
            self.__is_silent = True

        else:
            raise InvalidArgumentError(f"Invalid endpoint output type: [{endpoint_type}]")

        self.__metric_prefix = metric_prefix

        # Neither flag set means binary protobuf output (the default branch
        # in the streaming worker).
        if serialisation_format == MonitoringOutputFormat.JSON:
            self.__json_output = True
        elif serialisation_format == MonitoringOutputFormat.STATSD:
            self.__statsd_output = True

        self.__tag_format = tag_format
        self._additional_tags = additional_tags or {}

    # --- Public API ---

    @property
    def is_enabled(self) -> bool:
        """Whether monitoring is enabled.

        The send_record methods perform this check so clients don't need to
        check this before sending a record to the monitoring bus, but it is
        provided for convenience."""
        return self._streaming_process is not None and self._streaming_process.is_alive()

    @property
    def prints_records(self) -> bool:
        """Whether or not messages are printed to standard output."""
        return self.__print_output

    @property
    def is_silent(self) -> bool:
        """Whether or not this is a silent monitoring bus."""
        return self.__is_silent

    def process_should_exit(self) -> bool:
        """Whether the streaming worker process should exit.

        The streaming worker process should exit if explicitly told to by the
        _stop_streaming_worker event or if it has been orphaned by the parent.
        """

        try:
            assert self._streaming_process_ppid, "Streaming process pid access before initialization"
            # Signal 0 performs error checking only: it raises
            # ProcessLookupError if the parent process no longer exists.
            os.kill(self._streaming_process_ppid, 0)
            return self._stop_streaming_worker.is_set()
        except ProcessLookupError:
            LOGGER.info("Monitoring bus process was orphaned, exiting.")
            return True
        except Exception as e:
            LOGGER.info(f"Exception when checking if parent process of monitoring bus is still alive, exiting: {e}")
            return True

    def __enter__(self) -> "MonitoringBus":
        self.start()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        self.stop()

    def start(self) -> None:
        """Starts the monitoring bus worker task.

        No-op for a silent bus or if the worker subprocess already exists.
        """
        if self.__is_silent or self._streaming_process is not None:
            return

        self._streaming_process = Process(target=self._streaming_worker)
        # Record our own PID before forking so the child can poll it.
        self._streaming_process_ppid = os.getpid()
        self._streaming_process.start()

    def stop(self) -> None:
        """Cancels the monitoring bus worker task.

        Signals the worker subprocess to exit and blocks until it has
        terminated. No-op for a silent or never-started bus.
        """
        if self.__is_silent or self._streaming_process is None:
            return

        self._stop_streaming_worker.set()
        self._streaming_process.join()

    def prefix_record_nowait(self, record: MetricRecord) -> MetricRecord:
        """Prefix the record's metric name. This is the same as prefix_record, but called synchronously.

        See the prefix_record docstring for notes on the prefixing rules.

        Args:
            record (Message): The record to prefix.
        """
        record.name = f"{self.__metric_prefix}{record.name}"
        return record

    def send_record_nowait(self, record: MetricRecord | LogRecord) -> None:
        """Publishes a record onto the bus synchronously.

        Silently dropped when the bus is disabled (no live worker process).

        Args:
            record (Message): The record to send.
        """
        if not self.is_enabled:
            return

        # Identity comparison on the protobuf descriptor distinguishes
        # MetricRecord from LogRecord; only metrics get the name prefix.
        if record.DESCRIPTOR is MetricRecord.DESCRIPTOR:
            record = self.prefix_record_nowait(cast(MetricRecord, record))

        self.__message_queue.put_nowait(record)

    # --- Private API ---
    def _format_statsd_with_tags(self, name: str, tags: list[str], value: int | float, metric_type: str) -> str:
        """Render one StatsD line ("name:value|type") in the configured tag dialect.

        ``tags`` are pre-formatted "key<sep>value" strings; when empty, or
        when the tag format is NONE (or unrecognised), an untagged line is
        produced. Each returned line is newline-terminated.
        """
        if not tags or self.__tag_format == StatsDTagFormat.NONE:
            return f"{name}:{value}|{metric_type}\n"

        if self.__tag_format == StatsDTagFormat.INFLUX_STATSD:
            tag_string = ",".join(tags)
            return f"{name},{tag_string}:{value}|{metric_type}\n"

        elif self.__tag_format == StatsDTagFormat.DOG_STATSD:
            tag_string = ",".join(tags)
            return f"{name}:{value}|{metric_type}|#{tag_string}\n"

        elif self.__tag_format == StatsDTagFormat.GRAPHITE:
            tag_string = ";".join(tags)
            return f"{name};{tag_string}:{value}|{metric_type}\n"

        else:
            return f"{name}:{value}|{metric_type}\n"

    def _format_record_as_statsd_string(self, record: MetricRecord) -> str:
        """Helper function to convert metrics to a string in the statsd format.

        See https://github.com/statsd/statsd/blob/master/docs/metric_types.md for valid metric types.

        Note that BuildGrid currently only supports Counters, Timers, and Gauges, and it has the custom
        Distribution type as an alias for Timers.

        Args:
            record (Message): The record to convert.

        Raises:
            ValueError: If the record's type is not one of the supported
                metric types, or a required field is missing.
        """

        # DogStatsD separates tag keys from values with ":"; the other
        # dialects use "=".
        tag_assignment_symbol = "="
        if self.__tag_format == StatsDTagFormat.DOG_STATSD:
            tag_assignment_symbol = ":"

        # Merge in the bus-wide tags; record-level metadata wins on conflict.
        for key, value in self._additional_tags.items():
            if key not in record.metadata:
                record.metadata[key] = value

        # Sorted for a deterministic tag order; empty values are dropped.
        tags = [
            f"{key}{tag_assignment_symbol}{record.metadata[key]}"
            for key in sorted(record.metadata.keys())
            if str(record.metadata[key]) != ""
        ]

        # NOTE(review): proto3 scalar fields default to 0/default-message
        # rather than None, so these `is None` guards look unreachable —
        # confirm against the MetricRecord proto definition.
        if record.type == MetricRecord.COUNTER:
            if record.count is None:
                raise ValueError(f"COUNTER record {record.name} is missing a count")
            return self._format_statsd_with_tags(record.name, tags, record.count, "c")
        elif record.type is MetricRecord.TIMER:
            if record.duration is None:
                raise ValueError(f"TIMER record {record.name} is missing a duration")
            return self._format_statsd_with_tags(record.name, tags, record.duration.ToMilliseconds(), "ms")
        elif record.type is MetricRecord.DISTRIBUTION:
            if record.count is None:
                raise ValueError(f"DISTRIBUTION record {record.name} is missing a count")
            return self._format_statsd_with_tags(record.name, tags, record.count, "ms")
        elif record.type is MetricRecord.GAUGE:
            if record.value is None:
                raise ValueError(f"GAUGE record {record.name} is missing a value")
            return self._format_statsd_with_tags(record.name, tags, record.value, "g")
        raise ValueError("Unknown record type.")

    def _streaming_worker(self) -> None:
        """Fetch records from the monitoring queue, and publish them.

        This method loops until the `self._stop_streaming_worker` event is set.
        Intended to run in a subprocess, it fetches messages from the message
        queue in this class, formats the record appropriately, and publishes
        them to whatever output endpoints were specified in the configuration
        passed to this monitoring bus.

        This method won't exit immediately when `self._stop_streaming_worker`
        is set. It may be waiting to fetch a message from the queue, which
        blocks for up to a second. It also needs to do some cleanup of the
        output endpoints once looping has finished.

        """

        def __streaming_worker(end_points: Sequence[MonitoringEndpoint]) -> bool:
            """Get a LogRecord or a MetricRecord, and publish it.

            This function fetches the next record from the internal queue,
            formats it in the configured output style, and writes it to
            the endpoints provided in `end_points`.

            If there is no record available within 1 second, or the record
            received wasn't a LogRecord or MetricRecord protobuf message,
            then this function returns False. Otherwise this returns True
            if publishing was successful (an exception will be raised if
            publishing goes wrong for some reason).

            Args:
                end_points (List): The list of output endpoints to write
                    formatted records to.

            Returns:
                bool, indicating whether or not a record was written.

            """
            try:
                # The 1s timeout bounds how long shutdown can be delayed
                # while waiting for a message.
                record = self.__message_queue.get(timeout=1)
            except Empty:
                return False

            message = BusMessage()
            message.sequence_number = self.__sequence_number

            if record.DESCRIPTOR is LogRecord.DESCRIPTOR:
                message.log_record.CopyFrom(record)

            elif record.DESCRIPTOR is MetricRecord.DESCRIPTOR:
                message.metric_record.CopyFrom(record)

            else:
                return False

            if self.__json_output:
                blob_message = json_format.MessageToJson(message).encode()

                for end_point in end_points:
                    end_point.write(blob_message)

            elif self.__statsd_output:
                # StatsD has no representation for log records: they are
                # dropped here (the function still returns True below).
                if record.DESCRIPTOR is MetricRecord.DESCRIPTOR:
                    statsd_message = self._format_record_as_statsd_string(record)
                    for end_point in end_points:
                        end_point.write(statsd_message.encode())

            else:
                # Binary output: a 4-byte little-endian length prefix
                # (via ctypes.c_uint32) followed by the serialised message.
                blob_size = ctypes.c_uint32(message.ByteSize())
                blob_message = message.SerializeToString()

                for end_point in end_points:
                    end_point.write(bytes(blob_size))
                    end_point.write(blob_message)

            return True

        # TODO clean this up. Way too much happening to understand it well.
        output_writers: list[Any] = []
        output_file: Any = None

        async def __client_connected_callback(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
            # Each client connected to the UNIX socket becomes another
            # output writer for subsequent records.
            output_writers.append(writer)

        # TODO clean this up. Total mess of what type this should or can be.
        async def _wait_closed(event: threading.Event, writer: Any) -> None:
            try:
                await writer.wait_closed()
            finally:
                event.set()

        self.__event_loop = asyncio.new_event_loop()

        # In good circumstances we stay in the first iteration of this loop forever.
        # The loop exists so that the subprocess is more resilient to temporary
        # failures (e.g. failing to connect to a socket immediately on startup)
        while not self.process_should_exit():
            try:
                if self.__async_output and self.__output_location:
                    async_done = threading.Event()

                    async def _async_output() -> None:
                        # NOTE(review): the ``loop`` keyword argument to
                        # asyncio.start_unix_server was removed in Python
                        # 3.10 — confirm the supported interpreter range.
                        # NOTE(review): self.__event_loop is created but
                        # never run (no run_forever/run_until_complete is
                        # visible here), so verify this coroutine is ever
                        # actually executed.
                        await asyncio.start_unix_server(
                            __client_connected_callback, path=self.__output_location, loop=self.__event_loop
                        )

                        while not self.process_should_exit():
                            try:
                                if __streaming_worker(output_writers):
                                    self.__sequence_number += 1

                                    for writer in output_writers:
                                        await writer.drain()
                            except asyncio.CancelledError:
                                raise
                            except Exception:
                                LOGGER.warning("Caught exception when publishing metric.", exc_info=True)
                        async_done.set()

                    asyncio.ensure_future(_async_output(), loop=self.__event_loop)
                    async_done.wait()

                elif self.__udp_output and self.__output_location:
                    output_writers.append(UdpWrapper(self.__output_location))
                    while not self.process_should_exit():
                        try:
                            if __streaming_worker(output_writers):
                                self.__sequence_number += 1
                        except Exception:
                            LOGGER.warning("Caught exception when publishing metric.", exc_info=True)

                elif self.__output_location:
                    with open(self.__output_location, mode="wb") as output_file:
                        output_writers.append(output_file)

                        while not self.process_should_exit():
                            try:
                                if __streaming_worker([output_file]):
                                    self.__sequence_number += 1

                                    # Flush after each record so metrics are
                                    # visible in the file promptly.
                                    output_file.flush()
                            except Exception:
                                LOGGER.warning("Caught exception when publishing metric.", exc_info=True)

                elif self.__print_output:
                    output_writers.append(sys.stdout.buffer)

                    while not self.process_should_exit():
                        try:
                            if __streaming_worker(output_writers):
                                self.__sequence_number += 1
                        except Exception:
                            LOGGER.warning("Caught exception when publishing metric.", exc_info=True)
                else:
                    LOGGER.error(
                        "Unsupported monitoring configuration, metrics won't be published.",
                        tags=dict(
                            output_location=self.__output_location,
                            async_output=self.__async_output,
                            udp_output=self.__udp_output,
                            print_output=self.__print_output,
                        ),
                        exc_info=True,
                    )
                    raise InvalidArgumentError("Unsupported monitoring configuration")

            except Exception:
                LOGGER.warning(
                    "Caught exception in metrics publisher loop, sleeping for 5s before retrying.", exc_info=True
                )
                time.sleep(5)

        # We exited the publishing loop, which means we've been told to shutdown
        # by the parent process. Clean up the output writers.
        if output_file is not None:
            output_file.close()

        elif output_writers:
            for writer in output_writers:
                writer.close()
                if self.__async_output and self.__output_location:
                    # Socket writers close asynchronously; block until each
                    # one has actually finished closing.
                    async_closed = threading.Event()
                    asyncio.ensure_future(_wait_closed(async_closed, writer))
                    async_closed.wait()

483 

484 

# Context-local monitoring bus. Defaults to a silent bus so code that
# records metrics works even before set_monitoring_bus() has been called.
MonitoringContext: "ContextVar[MonitoringBus]" = ContextVar(
    "MonitoringBus", default=MonitoringBus(MonitoringOutputType.SILENT)
)

488 

489 

def set_monitoring_bus(monitoring_bus: MonitoringBus) -> None:
    """Install ``monitoring_bus`` as the bus for the current context.

    Subsequent calls to :func:`get_monitoring_bus` within this context
    return the given instance.
    """
    MonitoringContext.set(monitoring_bus)

492 

493 

def get_monitoring_bus() -> MonitoringBus:
    """Return the monitoring bus installed for the current context.

    Falls back to the module-level default (a silent bus) when none has
    been installed via :func:`set_monitoring_bus`.
    """
    return MonitoringContext.get()