Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/monitoring.py: 40.07%
267 statements
« prev ^ index » next coverage.py v7.4.1, created at 2025-05-21 15:45 +0000
1# Copyright (C) 2018 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16import asyncio
17import ctypes
18import os
19import socket
20import sys
21import threading
22import time
23from contextvars import ContextVar
24from enum import Enum
25from multiprocessing import Event, Process, Queue
26from queue import Empty
27from typing import IO, TYPE_CHECKING, Any, Sequence, Union, cast
29from google.protobuf import json_format
31from buildgrid._protos.buildgrid.v2.monitoring_pb2 import BusMessage, LogRecord, MetricRecord
32from buildgrid.server.exceptions import InvalidArgumentError
33from buildgrid.server.logging import buildgrid_logger
35if TYPE_CHECKING:
36 from asyncio.streams import StreamWriter
# Module-level logger, created via the project's logging helper.
LOGGER = buildgrid_logger(__name__)
class MonitoringOutputType(Enum):
    """Supported destinations for published monitoring records."""

    #: Write records to the standard output stream.
    STDOUT = "stdout"
    #: Write records to an on-disk file.
    FILE = "file"
    #: Serve records over a UNIX domain socket.
    SOCKET = "socket"
    #: Send records as UDP datagrams to an ``IP:port`` endpoint.
    UDP = "udp"
    #: Discard all records (monitoring disabled).
    SILENT = "silent"
class MonitoringOutputFormat(Enum):
    """Supported wire formats for serialising monitoring records."""

    #: Protobuf binary format.
    BINARY = "binary"
    #: JSON rendering of the protobuf messages.
    JSON = "json"
    #: StatsD line format. Only metrics are kept — log records are dropped.
    STATSD = "statsd"
class StatsDTagFormat(Enum):
    """Tagging dialects for the StatsD output format."""

    #: Emit no tags at all.
    NONE = "none"
    #: InfluxDB-style tags appended to the metric name with commas.
    INFLUX_STATSD = "influx-statsd"
    #: DogStatsD-style tags appended after the metric type with ``|#``.
    DOG_STATSD = "dogstatsd"
    #: Graphite-style tags joined to the metric name with semicolons.
    GRAPHITE = "graphite"
class UdpWrapper:
    """Adapter giving a UDP socket a file-like ``write()`` interface.

    This lets a UDP endpoint be used interchangeably with file and stream
    endpoints that expose ``write(bytes)``.
    """

    def __init__(self, endpoint_location: str) -> None:
        """Parse an ``address:port`` string and open a UDP socket.

        Raises:
            ValueError: If *endpoint_location* is not of the form
                ``address:port`` with an integer port.
        """
        try:
            host, port_text = endpoint_location.split(":")
            self._addr = host
            self._port = int(port_text)
        except ValueError as e:
            error_msg = f"udp endpoint-location {endpoint_location} does not have the form address:port"
            raise ValueError(error_msg) from e
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def write(self, message: bytes) -> None:
        """Send *message* to the configured endpoint as one UDP datagram."""
        destination = (self._addr, self._port)
        self._socket.sendto(message, destination)

    def close(self) -> None:
        """Close the underlying socket."""
        return self._socket.close()
# Any sink the streaming worker can publish to: a binary file object
# (on-disk file or stdout buffer), a UdpWrapper, or an asyncio StreamWriter.
MonitoringEndpoint = Union[IO[bytes], UdpWrapper, "StreamWriter"]
class MonitoringBus:
    """Publishes monitoring records (metrics and logs) to a configured endpoint.

    Records are placed onto an internal multiprocessing queue by
    ``send_record_nowait`` and consumed by a dedicated worker subprocess
    (``_streaming_worker``), which serialises them in the configured format
    (binary protobuf, JSON, or StatsD) and writes them to the configured
    output (file, UNIX socket, UDP, stdout, or nothing when silent).
    """

    def __init__(
        self,
        endpoint_type: MonitoringOutputType = MonitoringOutputType.SOCKET,
        endpoint_location: str | None = None,
        metric_prefix: str = "",
        serialisation_format: MonitoringOutputFormat = MonitoringOutputFormat.STATSD,
        tag_format: StatsDTagFormat = StatsDTagFormat.INFLUX_STATSD,
        additional_tags: dict[str, str] | None = None,
    ) -> None:
        """Configure the bus. No worker process is started until ``start()``.

        Args:
            endpoint_type: Kind of output endpoint to publish to.
            endpoint_location: Path or address used by the file, socket and
                UDP endpoint types; ignored for stdout/silent.
            metric_prefix: String prepended to every metric record's name.
            serialisation_format: Wire format for published records.
            tag_format: StatsD tag dialect (only relevant for StatsD output).
            additional_tags: Tags merged into every metric's metadata before
                formatting (existing metadata keys win).

        Raises:
            InvalidArgumentError: If ``endpoint_type`` is not a known type.
        """
        # Event loop used by the worker subprocess for the UNIX-socket output.
        self.__event_loop: asyncio.AbstractEventLoop | None = None
        self._streaming_process: Process | None = None
        # Set to tell the worker subprocess to shut down.
        self._stop_streaming_worker = Event()
        # PID of the process that called start(); the worker polls it to
        # detect being orphaned.
        self._streaming_process_ppid: int | None = None

        self.__message_queue: "Queue[Any]" = Queue()
        # Sequence number stamped onto each BusMessage. Only ever incremented
        # inside the worker subprocess.
        self.__sequence_number = 1

        # Endpoint/format flags; exactly one endpoint flavour is selected below.
        self.__output_location = None
        self.__async_output = False
        self.__json_output = False
        self.__statsd_output = False
        self.__print_output = False
        self.__udp_output = False
        self.__is_silent = False

        if endpoint_type == MonitoringOutputType.FILE:
            self.__output_location = endpoint_location

        elif endpoint_type == MonitoringOutputType.SOCKET:
            self.__output_location = endpoint_location
            self.__async_output = True

        elif endpoint_type == MonitoringOutputType.STDOUT:
            self.__print_output = True

        elif endpoint_type == MonitoringOutputType.UDP:
            self.__output_location = endpoint_location
            self.__udp_output = True

        elif endpoint_type == MonitoringOutputType.SILENT:
            self.__is_silent = True

        else:
            raise InvalidArgumentError(f"Invalid endpoint output type: [{endpoint_type}]")

        self.__metric_prefix = metric_prefix

        # BINARY is the implicit default: neither flag set means protobuf binary.
        if serialisation_format == MonitoringOutputFormat.JSON:
            self.__json_output = True
        elif serialisation_format == MonitoringOutputFormat.STATSD:
            self.__statsd_output = True

        self.__tag_format = tag_format
        self._additional_tags = additional_tags or {}

    # --- Public API ---

    @property
    def is_enabled(self) -> bool:
        """Whether monitoring is enabled.

        The send_record methods perform this check so clients don't need to
        check this before sending a record to the monitoring bus, but it is
        provided for convenience."""
        return self._streaming_process is not None and self._streaming_process.is_alive()

    @property
    def prints_records(self) -> bool:
        """Whether or not messages are printed to standard output."""
        return self.__print_output

    @property
    def is_silent(self) -> bool:
        """Whether or not this is a silent monitoring bus."""
        return self.__is_silent

    def process_should_exit(self) -> bool:
        """Whether the streaming worker process should exit.

        The streaming worker process should exit if explicitly told to by the
        _stop_streaming_worker event or if it has been orphaned by the parent.
        """

        try:
            # NOTE(review): ``assert`` is stripped under ``python -O``; the
            # unchecked value would then flow into os.kill — confirm deployments
            # never run optimized.
            assert self._streaming_process_ppid, "Streaming process pid access before initialization"
            # Signal 0 sends nothing but performs error checking: it raises
            # ProcessLookupError if the parent process no longer exists.
            os.kill(self._streaming_process_ppid, 0)
            return self._stop_streaming_worker.is_set()
        except ProcessLookupError:
            LOGGER.info("Monitoring bus process was orphaned, exiting.")
            return True
        except Exception as e:
            LOGGER.info(f"Exception when checking if parent process of monitoring bus is still alive, exiting: {e}")
            return True

    def __enter__(self) -> "MonitoringBus":
        """Start the worker process when entering a ``with`` block."""
        self.start()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Stop the worker process when leaving a ``with`` block."""
        self.stop()

    def start(self) -> None:
        """Starts the monitoring bus worker task.

        A no-op if the bus is silent or the worker is already running.
        """
        if self.__is_silent or self._streaming_process is not None:
            return

        self._streaming_process = Process(target=self._streaming_worker)
        self._streaming_process_ppid = os.getpid()
        self._streaming_process.start()

    def stop(self) -> None:
        """Cancels the monitoring bus worker task.

        Blocks until the worker subprocess has joined.
        """
        if self.__is_silent or self._streaming_process is None:
            return

        self._stop_streaming_worker.set()
        self._streaming_process.join()

    def prefix_record_nowait(self, record: MetricRecord) -> MetricRecord:
        """Prefix the record's metric name. This is the same as prefix_record, but called synchronously.

        See the prefix_record docstring for notes on the prefixing rules.

        Args:
            record (Message): The record to prefix.
        """
        # Mutates the record in place and also returns it.
        record.name = f"{self.__metric_prefix}{record.name}"
        return record

    def send_record_nowait(self, record: MetricRecord | LogRecord) -> None:
        """Publishes a record onto the bus synchronously.

        Silently drops the record if the worker process is not running.

        Args:
            record (Message): The record to send.
        """
        if not self.is_enabled:
            return

        if record.DESCRIPTOR is MetricRecord.DESCRIPTOR:
            record = self.prefix_record_nowait(cast(MetricRecord, record))

        self.__message_queue.put_nowait(record)

    # --- Private API ---

    def _format_statsd_with_tags(self, name: str, tags: list[str], value: int | float, metric_type: str) -> str:
        """Render one StatsD line for *name*/*value* in the configured tag dialect.

        *tags* entries are already-joined ``key<sep>value`` strings; when empty
        (or tagging is disabled) a plain ``name:value|type`` line is emitted.
        The returned line is newline-terminated.
        """
        if not tags or self.__tag_format == StatsDTagFormat.NONE:
            return f"{name}:{value}|{metric_type}\n"

        if self.__tag_format == StatsDTagFormat.INFLUX_STATSD:
            tag_string = ",".join(tags)
            return f"{name},{tag_string}:{value}|{metric_type}\n"

        elif self.__tag_format == StatsDTagFormat.DOG_STATSD:
            tag_string = ",".join(tags)
            return f"{name}:{value}|{metric_type}|#{tag_string}\n"

        elif self.__tag_format == StatsDTagFormat.GRAPHITE:
            tag_string = ";".join(tags)
            return f"{name};{tag_string}:{value}|{metric_type}\n"

        else:
            # Unknown dialect: fall back to untagged output.
            return f"{name}:{value}|{metric_type}\n"

    def _format_record_as_statsd_string(self, record: MetricRecord) -> str:
        """Helper function to convert metrics to a string in the statsd format.

        See https://github.com/statsd/statsd/blob/master/docs/metric_types.md for valid metric types.

        Note that BuildGrid currently only supports Counters, Timers, and Gauges, and it has the custom
        Distribution type as an alias for Timers.

        Note: merges ``self._additional_tags`` into ``record.metadata`` in
        place (existing keys are not overwritten) before formatting.

        Args:
            record (Message): The record to convert.

        Raises:
            ValueError: If the record is missing its value field or has an
                unknown type.
        """

        # DogStatsD separates tag keys and values with ':'; the other dialects
        # use '='.
        tag_assignment_symbol = "="
        if self.__tag_format == StatsDTagFormat.DOG_STATSD:
            tag_assignment_symbol = ":"

        for key, value in self._additional_tags.items():
            if key not in record.metadata:
                record.metadata[key] = value

        # Sorted for deterministic output; tags with empty values are dropped.
        tags = [
            f"{key}{tag_assignment_symbol}{record.metadata[key]}"
            for key in sorted(record.metadata.keys())
            if str(record.metadata[key]) != ""
        ]

        if record.type == MetricRecord.COUNTER:
            if record.count is None:
                raise ValueError(f"COUNTER record {record.name} is missing a count")
            return self._format_statsd_with_tags(record.name, tags, record.count, "c")
        elif record.type is MetricRecord.TIMER:
            if record.duration is None:
                raise ValueError(f"TIMER record {record.name} is missing a duration")
            return self._format_statsd_with_tags(record.name, tags, record.duration.ToMilliseconds(), "ms")
        elif record.type is MetricRecord.DISTRIBUTION:
            # DISTRIBUTION is an alias for Timer: emitted with the "ms" type.
            if record.count is None:
                raise ValueError(f"DISTRIBUTION record {record.name} is missing a count")
            return self._format_statsd_with_tags(record.name, tags, record.count, "ms")
        elif record.type is MetricRecord.GAUGE:
            if record.value is None:
                raise ValueError(f"GAUGE record {record.name} is missing a value")
            return self._format_statsd_with_tags(record.name, tags, record.value, "g")
        raise ValueError("Unknown record type.")

    def _streaming_worker(self) -> None:
        """Fetch records from the monitoring queue, and publish them.

        This method loops until the `self._stop_streaming_worker` event is set.
        Intended to run in a subprocess, it fetches messages from the message
        queue in this class, formats the record appropriately, and publishes
        them to whatever output endpoints were specified in the configuration
        passed to this monitoring bus.

        This method won't exit immediately when `self._stop_streaming_worker`
        is set. It may be waiting to fetch a message from the queue, which
        blocks for up to a second. It also needs to do some cleanup of the
        output endpoints once looping has finished.

        """

        def __streaming_worker(end_points: Sequence[MonitoringEndpoint]) -> bool:
            """Get a LogRecord or a MetricRecord, and publish it.

            This function fetches the next record from the internal queue,
            formats it in the configured output style, and writes it to
            the endpoints provided in `end_points`.

            If there is no record available within 1 second, or the record
            received wasn't a LogRecord or MetricRecord protobuf message,
            then this function returns False. Otherwise this returns True
            if publishing was successful (an exception will be raised if
            publishing goes wrong for some reason).

            Args:
                end_points (List): The list of output endpoints to write
                    formatted records to.

            Returns:
                bool, indicating whether or not a record was written.

            """
            try:
                record = self.__message_queue.get(timeout=1)
            except Empty:
                return False

            message = BusMessage()
            message.sequence_number = self.__sequence_number

            if record.DESCRIPTOR is LogRecord.DESCRIPTOR:
                message.log_record.CopyFrom(record)

            elif record.DESCRIPTOR is MetricRecord.DESCRIPTOR:
                message.metric_record.CopyFrom(record)

            else:
                return False

            if self.__json_output:
                blob_message = json_format.MessageToJson(message).encode()

                for end_point in end_points:
                    end_point.write(blob_message)

            elif self.__statsd_output:
                # StatsD keeps metrics only; log records are silently dropped
                # (but still count as "written" for sequence numbering).
                if record.DESCRIPTOR is MetricRecord.DESCRIPTOR:
                    statsd_message = self._format_record_as_statsd_string(record)
                    for end_point in end_points:
                        end_point.write(statsd_message.encode())

            else:
                # Binary output: a little-endian uint32 length prefix followed
                # by the serialized BusMessage.
                blob_size = ctypes.c_uint32(message.ByteSize())
                blob_message = message.SerializeToString()

                for end_point in end_points:
                    end_point.write(bytes(blob_size))
                    end_point.write(blob_message)

            return True

        # TODO clean this up. Way too much happening to understand it well.
        output_writers: list[Any] = []
        output_file: Any = None

        # Invoked for each client that connects to the UNIX socket server;
        # every connected client becomes an output writer.
        async def __client_connected_callback(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
            output_writers.append(writer)

        # TODO clean this up. Total mess of what type this should or can be.
        async def _wait_closed(event: threading.Event, writer: Any) -> None:
            try:
                await writer.wait_closed()
            finally:
                event.set()

        self.__event_loop = asyncio.new_event_loop()

        # In good circumstances we stay in the first iteration of this loop forever.
        # The loop exists so that the subprocess is more resilient to temporary
        # failures (e.g. failing to connect to a socket immediately on startup)
        while not self.process_should_exit():
            try:
                if self.__async_output and self.__output_location:
                    async_done = threading.Event()

                    async def _async_output() -> None:
                        # NOTE(review): the ``loop`` keyword argument of
                        # asyncio.start_unix_server was deprecated in 3.8 and
                        # removed in Python 3.10 — confirm the supported
                        # runtime, this raises TypeError on 3.10+.
                        await asyncio.start_unix_server(
                            __client_connected_callback, path=self.__output_location, loop=self.__event_loop
                        )

                        while not self.process_should_exit():
                            try:
                                if __streaming_worker(output_writers):
                                    self.__sequence_number += 1

                                    for writer in output_writers:
                                        await writer.drain()
                            except asyncio.CancelledError:
                                raise
                            except Exception:
                                LOGGER.warning("Caught exception when publishing metric.", exc_info=True)
                        async_done.set()

                    # NOTE(review): the coroutine is scheduled on
                    # self.__event_loop, but nothing here appears to run that
                    # loop (no run_forever/run_until_complete), while
                    # async_done.wait() blocks this thread — verify the socket
                    # output path actually executes.
                    asyncio.ensure_future(_async_output(), loop=self.__event_loop)
                    async_done.wait()

                elif self.__udp_output and self.__output_location:
                    output_writers.append(UdpWrapper(self.__output_location))
                    while not self.process_should_exit():
                        try:
                            if __streaming_worker(output_writers):
                                self.__sequence_number += 1
                        except Exception:
                            LOGGER.warning("Caught exception when publishing metric.", exc_info=True)

                elif self.__output_location:
                    with open(self.__output_location, mode="wb") as output_file:
                        output_writers.append(output_file)

                        while not self.process_should_exit():
                            try:
                                if __streaming_worker([output_file]):
                                    self.__sequence_number += 1

                                    output_file.flush()
                            except Exception:
                                LOGGER.warning("Caught exception when publishing metric.", exc_info=True)

                elif self.__print_output:
                    output_writers.append(sys.stdout.buffer)

                    while not self.process_should_exit():
                        try:
                            if __streaming_worker(output_writers):
                                self.__sequence_number += 1
                        except Exception:
                            LOGGER.warning("Caught exception when publishing metric.", exc_info=True)
                else:
                    LOGGER.error(
                        "Unsupported monitoring configuration, metrics won't be published.",
                        tags=dict(
                            output_location=self.__output_location,
                            async_output=self.__async_output,
                            udp_output=self.__udp_output,
                            print_output=self.__print_output,
                        ),
                        exc_info=True,
                    )
                    raise InvalidArgumentError("Unsupported monitoring configuration")

            except Exception:
                LOGGER.warning(
                    "Caught exception in metrics publisher loop, sleeping for 5s before retrying.", exc_info=True
                )
                time.sleep(5)

        # We exited the publishing loop, which means we've been told to shutdown
        # by the parent process. Clean up the output writers.
        if output_file is not None:
            output_file.close()

        elif output_writers:
            for writer in output_writers:
                writer.close()
                if self.__async_output and self.__output_location:
                    # Async StreamWriters need their close to be awaited on the
                    # event loop; block until each one reports closed.
                    async_closed = threading.Event()
                    asyncio.ensure_future(_wait_closed(async_closed, writer))
                    async_closed.wait()
# Context-local monitoring bus. Defaults to a silent bus so code can always
# call get_monitoring_bus() safely even when monitoring was never configured.
MonitoringContext: "ContextVar[MonitoringBus]" = ContextVar(
    "MonitoringBus", default=MonitoringBus(MonitoringOutputType.SILENT)
)
def set_monitoring_bus(monitoring_bus: MonitoringBus) -> None:
    """Install *monitoring_bus* as the bus for the current context."""
    MonitoringContext.set(monitoring_bus)
def get_monitoring_bus() -> MonitoringBus:
    """Return the monitoring bus bound to the current context."""
    bus = MonitoringContext.get()
    return bus