Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/monitoring.py: 40.81%

272 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-04-15 14:01 +0000

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16import asyncio 

17import ctypes 

18import logging 

19import os 

20import socket 

21import sys 

22import threading 

23import time 

24from contextvars import ContextVar 

25from enum import Enum 

26from multiprocessing import Event, Process, Queue 

27from queue import Empty 

28from typing import IO, TYPE_CHECKING, Any, List, Optional, Sequence, Union, cast 

29 

30from google.protobuf import json_format 

31 

32from buildgrid._exceptions import InvalidArgumentError 

33from buildgrid._protos.buildgrid.v2.monitoring_pb2 import BusMessage, LogRecord, MetricRecord 

34 

35if TYPE_CHECKING: 

36 from asyncio.streams import StreamWriter 

37 

38LOGGER = logging.getLogger(__name__) 

39 

40 

class MonitoringOutputType(Enum):
    """Kinds of sink a monitoring bus can publish records to."""

    STDOUT = "stdout"  # Standard output stream.
    FILE = "file"  # On-disk file.
    SOCKET = "socket"  # UNIX domain socket.
    UDP = "udp"  # UDP IP:port.
    SILENT = "silent"  # Discard everything.

52 

53 

class MonitoringOutputFormat(Enum):
    """Serialisation styles for published records."""

    BINARY = "binary"  # Protobuf binary format.
    JSON = "json"  # JSON format.
    STATSD = "statsd"  # StatsD format; metric records only, logs are dropped.

61 

62 

class StatsDTagFormat(Enum):
    """Tagging dialects understood by different statsd-compatible backends."""

    NONE = "none"
    INFLUX_STATSD = "influx-statsd"
    DOG_STATSD = "dogstatsd"
    GRAPHITE = "graphite"

68 

69 

class UdpWrapper:
    """Wraps socket sendto() in write() so it can be used polymorphically.

    This lets a UDP endpoint stand in for the file-like objects the
    streaming worker otherwise writes to.
    """

    def __init__(self, endpoint_location: str) -> None:
        # The location must look like "address:port"; both a missing colon
        # and a non-integer port surface as the same ValueError.
        try:
            host, port_text = endpoint_location.split(":")
            self._addr = host
            self._port = int(port_text)
        except ValueError as e:
            raise ValueError(
                f"udp endpoint-location {endpoint_location} does not have the form address:port"
            ) from e
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def write(self, message: bytes) -> None:
        """Send ``message`` as a single datagram to the configured endpoint."""
        self._socket.sendto(message, (self._addr, self._port))

    def close(self) -> None:
        """Close the underlying UDP socket."""
        return self._socket.close()

87 

88 

# Union of all handle types the streaming worker can write records to:
# plain file objects, the UDP wrapper above, or an asyncio StreamWriter.
MonitoringEndpoint = Union[IO, UdpWrapper, "StreamWriter"]


# Metadata keys consumed by the bus itself (instance prefixing, statsd
# bucketing) and therefore excluded from the statsd tag list.
EXCLUDED_METADATA_KEYS = ["instance-name", "statsd-bucket"]

93 

94 

class MonitoringBus:
    """Publishes log and metric records to a configurable endpoint.

    Records are pushed onto a multiprocessing queue and consumed by a
    worker subprocess, which serialises them (binary protobuf, JSON, or
    statsd) and writes them to the configured sink (stdout, file, UNIX
    socket, UDP, or nothing at all for a silent bus).
    """

    def __init__(
        self,
        endpoint_type: MonitoringOutputType = MonitoringOutputType.SOCKET,
        endpoint_location: Optional[str] = None,
        metric_prefix: str = "",
        serialisation_format: MonitoringOutputFormat = MonitoringOutputFormat.STATSD,
        tag_format: StatsDTagFormat = StatsDTagFormat.NONE,
    ) -> None:
        self.__event_loop: Optional[asyncio.AbstractEventLoop] = None
        self._streaming_process: Optional[Process] = None
        self._stop_streaming_worker = Event()
        self._streaming_process_ppid: Optional[int] = None

        self.__message_queue: "Queue[Any]" = Queue()
        self.__sequence_number = 1

        # Output-mode flags; exactly one combination is enabled below based
        # on the endpoint type.
        self.__output_location = None
        self.__async_output = False
        self.__json_output = False
        self.__statsd_output = False
        self.__print_output = False
        self.__udp_output = False
        self.__is_silent = False

        if endpoint_type == MonitoringOutputType.FILE:
            self.__output_location = endpoint_location

        elif endpoint_type == MonitoringOutputType.SOCKET:
            self.__output_location = endpoint_location
            self.__async_output = True

        elif endpoint_type == MonitoringOutputType.STDOUT:
            self.__print_output = True

        elif endpoint_type == MonitoringOutputType.UDP:
            self.__output_location = endpoint_location
            self.__udp_output = True

        elif endpoint_type == MonitoringOutputType.SILENT:
            self.__is_silent = True

        else:
            raise InvalidArgumentError(f"Invalid endpoint output type: [{endpoint_type}]")

        self.__metric_prefix = metric_prefix

        if serialisation_format == MonitoringOutputFormat.JSON:
            self.__json_output = True
        elif serialisation_format == MonitoringOutputFormat.STATSD:
            self.__statsd_output = True

        self.__tag_format = tag_format

    # --- Public API ---

    @property
    def is_enabled(self) -> bool:
        """Whether monitoring is enabled.

        The send_record methods perform this check so clients don't need to
        check this before sending a record to the monitoring bus, but it is
        provided for convenience."""
        return self._streaming_process is not None and self._streaming_process.is_alive()

    @property
    def prints_records(self) -> bool:
        """Whether or not messages are printed to standard output."""
        return self.__print_output

    @property
    def is_silent(self) -> bool:
        """Whether or not this is a silent monitoring bus."""
        return self.__is_silent

    def process_should_exit(self) -> bool:
        """Whether the streaming worker process should exit.

        The streaming worker process should exit if explicitly told to by the
        _stop_streaming_worker event or if it has been orphaned by the parent.
        """

        try:
            assert self._streaming_process_ppid, "Streaming process pid access before initialization"
            # Signal 0 performs an existence check on the parent without
            # actually delivering a signal.
            os.kill(self._streaming_process_ppid, 0)
            return self._stop_streaming_worker.is_set()
        except ProcessLookupError:
            LOGGER.info("Monitoring bus process was orphaned, exiting")
            return True
        except Exception as e:
            LOGGER.info(f"Exception when checking if parent process of monitoring bus is still alive, exiting: {e}")
            return True

    def __enter__(self) -> "MonitoringBus":
        self.start()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        self.stop()

    def start(self) -> None:
        """Starts the monitoring bus worker task."""
        if self.__is_silent or self._streaming_process is not None:
            return

        self._streaming_process = Process(target=self._streaming_worker)
        # Record our own pid so the child can detect orphaning.
        self._streaming_process_ppid = os.getpid()
        self._streaming_process.start()

    def stop(self) -> None:
        """Cancels the monitoring bus worker task."""
        if self.__is_silent or self._streaming_process is None:
            return

        self._stop_streaming_worker.set()
        self._streaming_process.join()

    def prefix_record_nowait(self, record: MetricRecord) -> MetricRecord:
        """Prefix the record's metric name. This is the same as prefix_record, but called synchronously.

        See the prefix_record docstring for notes on the prefixing rules.

        Args:
            record (Message): The record to prefix.
        """
        instance_name = record.metadata.get("instance-name")
        if instance_name is not None:
            # This is an instance metric, so we'll add the instance name
            # to the prefix if it isn't empty
            if instance_name:
                instance_name = instance_name + "."

            # Prefix the metric with "instance_name.instance." if the instance
            # name isn't empty, otherwise just "instance."
            record.name = f"{self.__metric_prefix}{instance_name}instance.{record.name}"

        else:
            # Not an instance metric
            record.name = f"{self.__metric_prefix}{record.name}"

        return record

    def send_record_nowait(self, record: Union[MetricRecord, LogRecord]) -> None:
        """Publishes a record onto the bus synchronously.

        Records sent while the bus is disabled are silently dropped.

        Args:
            record (Message): The record to send.
        """
        if not self.is_enabled:
            return

        if record.DESCRIPTOR is MetricRecord.DESCRIPTOR:
            record = self.prefix_record_nowait(cast(MetricRecord, record))

        self.__message_queue.put_nowait(record)

    # --- Private API ---
    def _format_statsd_with_tags(self, name: str, tags: List[str], value: Union[int, float], metric_type: str) -> str:
        """Render one statsd line, placing ``tags`` per the configured dialect."""
        if not tags or self.__tag_format == StatsDTagFormat.NONE:
            return f"{name}:{value}|{metric_type}\n"

        if self.__tag_format == StatsDTagFormat.INFLUX_STATSD:
            tag_string = ",".join(tags)
            return f"{name},{tag_string}:{value}|{metric_type}\n"

        elif self.__tag_format == StatsDTagFormat.DOG_STATSD:
            tag_string = ",".join(tags)
            return f"{name}:{value}|{metric_type}|#{tag_string}\n"

        elif self.__tag_format == StatsDTagFormat.GRAPHITE:
            tag_string = ";".join(tags)
            return f"{name};{tag_string}:{value}|{metric_type}\n"

        else:
            return f"{name}:{value}|{metric_type}\n"

    def _format_record_as_statsd_string(self, record: MetricRecord) -> str:
        """Helper function to convert metrics to a string in the statsd format.

        See https://github.com/statsd/statsd/blob/master/docs/metric_types.md for valid metric types.

        Note that BuildGrid currently only supports Counters, Timers, and Gauges, and it has the custom
        Distribution type as an alias for Timers.

        Args:
            record (Message): The record to convert.

        Raises:
            ValueError: If the record's type is unknown or its value field is unset.
        """
        bucket = record.metadata.get("statsd-bucket")

        tag_assignment_symbol = "="
        if self.__tag_format == StatsDTagFormat.DOG_STATSD:
            tag_assignment_symbol = ":"
        tags = [
            f"{key}{tag_assignment_symbol}{value}"
            for key, value in record.metadata.items()
            if key not in EXCLUDED_METADATA_KEYS and str(value) != ""
        ]

        if bucket:
            record.name = f"{record.name}.{bucket}"
        # Use ``==`` rather than ``is`` for the protobuf enum values: they are
        # plain ints, and identity comparison only works by virtue of CPython's
        # small-int caching.
        if record.type == MetricRecord.COUNTER:
            if record.count is None:
                raise ValueError(f"COUNTER record {record.name} is missing a count")
            return self._format_statsd_with_tags(record.name, tags, record.count, "c")
        elif record.type == MetricRecord.TIMER:
            if record.duration is None:
                raise ValueError(f"TIMER record {record.name} is missing a duration")
            return self._format_statsd_with_tags(record.name, tags, record.duration.ToMilliseconds(), "ms")
        elif record.type == MetricRecord.DISTRIBUTION:
            if record.count is None:
                raise ValueError(f"DISTRIBUTION record {record.name} is missing a count")
            return self._format_statsd_with_tags(record.name, tags, record.count, "ms")
        elif record.type == MetricRecord.GAUGE:
            if record.value is None:
                raise ValueError(f"GAUGE record {record.name} is missing a value")
            return self._format_statsd_with_tags(record.name, tags, record.value, "g")
        raise ValueError("Unknown record type.")

    def _streaming_worker(self) -> None:
        """Fetch records from the monitoring queue, and publish them.

        This method loops until the `self._stop_streaming_worker` event is set.
        Intended to run in a subprocess, it fetches messages from the message
        queue in this class, formats the record appropriately, and publishes
        them to whatever output endpoints were specified in the configuration
        passed to this monitoring bus.

        This method won't exit immediately when `self._stop_streaming_worker`
        is set. It may be waiting to fetch a message from the queue, which
        blocks for up to a second. It also needs to do some cleanup of the
        output endpoints once looping has finished.

        """

        def __streaming_worker(end_points: Sequence[MonitoringEndpoint]) -> bool:
            """Get a LogRecord or a MetricRecord, and publish it.

            This function fetches the next record from the internal queue,
            formats it in the configured output style, and writes it to
            the endpoints provided in `end_points`.

            If there is no record available within 1 second, or the record
            received wasn't a LogRecord or MetricRecord protobuf message,
            then this function returns False. Otherwise this returns True
            if publishing was successful (an exception will be raised if
            publishing goes wrong for some reason).

            Args:
                end_points (List): The list of output endpoints to write
                    formatted records to.

            Returns:
                bool, indicating whether or not a record was written.

            """
            try:
                record = self.__message_queue.get(timeout=1)
            except Empty:
                return False

            message = BusMessage()
            message.sequence_number = self.__sequence_number

            if record.DESCRIPTOR is LogRecord.DESCRIPTOR:
                message.log_record.CopyFrom(record)

            elif record.DESCRIPTOR is MetricRecord.DESCRIPTOR:
                message.metric_record.CopyFrom(record)

            else:
                return False

            if self.__json_output:
                blob_message = json_format.MessageToJson(message).encode()

                for end_point in end_points:
                    end_point.write(blob_message)

            elif self.__statsd_output:
                # statsd only carries metrics; log records are dropped here.
                if record.DESCRIPTOR is MetricRecord.DESCRIPTOR:
                    statsd_message = self._format_record_as_statsd_string(record)
                    for end_point in end_points:
                        end_point.write(statsd_message.encode())

            else:
                # Binary protobuf output: a little-endian uint32 length
                # header followed by the serialized message.
                blob_size = ctypes.c_uint32(message.ByteSize())
                blob_message = message.SerializeToString()

                for end_point in end_points:
                    end_point.write(bytes(blob_size))
                    end_point.write(blob_message)

            return True

        # TODO clean this up. Way too much happening to understand it well.
        output_writers: List[Any] = []
        output_file: Any = None

        async def __client_connected_callback(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
            output_writers.append(writer)

        # TODO clean this up. Total mess of what type this should or can be.
        async def _wait_closed(event: threading.Event, writer: Any) -> None:
            try:
                await writer.wait_closed()
            finally:
                event.set()

        self.__event_loop = asyncio.new_event_loop()

        # In good circumstances we stay in the first iteration of this loop forever.
        # The loop exists so that the subprocess is more resilient to temporary
        # failures (e.g. failing to connect to a socket immediately on startup)
        while not self.process_should_exit():
            try:
                if self.__async_output and self.__output_location:
                    async_done = threading.Event()

                    async def _async_output() -> None:
                        # NOTE(review): the explicit ``loop`` kwarg was removed
                        # from asyncio.start_unix_server in Python 3.10 —
                        # confirm the supported interpreter range.
                        await asyncio.start_unix_server(
                            __client_connected_callback, path=self.__output_location, loop=self.__event_loop
                        )

                        while not self.process_should_exit():
                            try:
                                if __streaming_worker(output_writers):
                                    self.__sequence_number += 1

                                    for writer in output_writers:
                                        await writer.drain()
                            except asyncio.CancelledError:
                                raise
                            except Exception:
                                LOGGER.warning("Caught exception when publishing metric", exc_info=True)
                        async_done.set()

                    asyncio.ensure_future(_async_output(), loop=self.__event_loop)
                    async_done.wait()

                elif self.__udp_output and self.__output_location:
                    output_writers.append(UdpWrapper(self.__output_location))
                    while not self.process_should_exit():
                        try:
                            if __streaming_worker(output_writers):
                                self.__sequence_number += 1
                        except Exception:
                            LOGGER.warning("Caught exception when publishing metric", exc_info=True)

                elif self.__output_location:
                    with open(self.__output_location, mode="wb") as output_file:
                        output_writers.append(output_file)

                        while not self.process_should_exit():
                            try:
                                if __streaming_worker([output_file]):
                                    self.__sequence_number += 1

                                    output_file.flush()
                            except Exception:
                                LOGGER.warning("Caught exception when publishing metric", exc_info=True)

                elif self.__print_output:
                    output_writers.append(sys.stdout.buffer)

                    while not self.process_should_exit():
                        try:
                            if __streaming_worker(output_writers):
                                self.__sequence_number += 1
                        except Exception:
                            LOGGER.warning("Caught exception when publishing metric", exc_info=True)
                else:
                    # Single message string: the original passed the
                    # print_output fragment as a separate positional argument,
                    # which logging treated as a %-format arg with no
                    # placeholder (formatting error, value dropped).
                    LOGGER.error(
                        "Unsupported monitoring configuration, metrics won't be published. "
                        f"output_location={self.__output_location}, "
                        f"async_output={self.__async_output}, "
                        f"udp_output={self.__udp_output}, "
                        f"print_output={self.__print_output}"
                    )
                    raise InvalidArgumentError("Unsupported monitoring configuration")

            except Exception:
                LOGGER.warning(
                    "Caught exception in metrics publisher loop, sleeping for 5s before retrying", exc_info=True
                )
                time.sleep(5)

        # We exited the publishing loop, which means we've been told to shutdown
        # by the parent process. Clean up the output writers.
        if output_file is not None:
            output_file.close()

        elif output_writers:
            for writer in output_writers:
                writer.close()
                if self.__async_output and self.__output_location:
                    async_closed = threading.Event()
                    asyncio.ensure_future(_wait_closed(async_closed, writer))
                    async_closed.wait()

495 

# Context-local default monitoring bus. Defaults to a silent bus so callers
# can always publish records without first checking whether monitoring was
# configured.
MonitoringContext: "ContextVar[MonitoringBus]" = ContextVar(
    "MonitoringBus", default=MonitoringBus(MonitoringOutputType.SILENT)
)

499 

500 

def set_monitoring_bus(monitoring_bus: MonitoringBus) -> None:
    """Install ``monitoring_bus`` as the bus for the current context."""
    MonitoringContext.set(monitoring_bus)

503 

504 

def get_monitoring_bus() -> MonitoringBus:
    """Return the monitoring bus installed in the current context."""
    return MonitoringContext.get()