Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/monitoring.py: 40.81%
272 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-04-15 14:01 +0000
1# Copyright (C) 2018 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16import asyncio
17import ctypes
18import logging
19import os
20import socket
21import sys
22import threading
23import time
24from contextvars import ContextVar
25from enum import Enum
26from multiprocessing import Event, Process, Queue
27from queue import Empty
28from typing import IO, TYPE_CHECKING, Any, List, Optional, Sequence, Union, cast
30from google.protobuf import json_format
32from buildgrid._exceptions import InvalidArgumentError
33from buildgrid._protos.buildgrid.v2.monitoring_pb2 import BusMessage, LogRecord, MetricRecord
35if TYPE_CHECKING:
36 from asyncio.streams import StreamWriter
38LOGGER = logging.getLogger(__name__)
class MonitoringOutputType(Enum):
    """Supported destinations for published monitoring records."""

    # Standard output stream.
    STDOUT = "stdout"
    # On-disk file.
    FILE = "file"
    # UNIX domain socket.
    SOCKET = "socket"
    # UDP IP:port
    UDP = "udp"
    # Silent
    SILENT = "silent"
class MonitoringOutputFormat(Enum):
    """Supported serialisation formats for published records."""

    # Protobuf binary format.
    BINARY = "binary"
    # JSON format.
    JSON = "json"
    # StatsD format. Only metrics are kept - logs are dropped.
    STATSD = "statsd"
class StatsDTagFormat(Enum):
    """Tag serialisation dialects used when emitting StatsD metrics."""

    # Tags are dropped entirely.
    NONE = "none"
    # Influx style: tags joined with "," and appended to the bucket name.
    INFLUX_STATSD = "influx-statsd"
    # DataDog style: tags appended after the metric type, prefixed with "#".
    DOG_STATSD = "dogstatsd"
    # Graphite style: tags joined with ";" and appended to the bucket name.
    GRAPHITE = "graphite"
class UdpWrapper:
    """Expose UDP ``sendto()`` through a file-like ``write()`` method.

    This lets a UDP socket be used interchangeably with file objects and
    stream writers by the monitoring publisher.
    """

    def __init__(self, endpoint_location: str) -> None:
        """Parse an ``address:port`` string and open a UDP socket.

        Raises:
            ValueError: If *endpoint_location* is not of the form
                ``address:port`` with an integer port.
        """
        try:
            host, port_text = endpoint_location.split(":")
            self._addr = host
            self._port = int(port_text)
        except ValueError as e:
            error_msg = f"udp endpoint-location {endpoint_location} does not have the form address:port"
            raise ValueError(error_msg) from e
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def write(self, message: bytes) -> None:
        """Send *message* as a single datagram to the configured endpoint."""
        self._socket.sendto(message, (self._addr, self._port))

    def close(self) -> None:
        """Close the underlying UDP socket."""
        return self._socket.close()
# Any object a serialised record can be written to via write().
MonitoringEndpoint = Union[IO, UdpWrapper, "StreamWriter"]


# Metadata keys consumed internally (metric prefixing and bucket naming);
# they must not be emitted as StatsD tags.
EXCLUDED_METADATA_KEYS = ["instance-name", "statsd-bucket"]
class MonitoringBus:
    """Publish ``LogRecord`` and ``MetricRecord`` messages to an endpoint.

    Records are placed on a multiprocessing queue and streamed by a worker
    subprocess to the configured output (file, UNIX socket, UDP, or stdout),
    serialised as length-prefixed binary protobuf, JSON, or StatsD strings.
    """

    def __init__(
        self,
        endpoint_type: MonitoringOutputType = MonitoringOutputType.SOCKET,
        endpoint_location: Optional[str] = None,
        metric_prefix: str = "",
        serialisation_format: MonitoringOutputFormat = MonitoringOutputFormat.STATSD,
        tag_format: StatsDTagFormat = StatsDTagFormat.NONE,
    ) -> None:
        """Configure the bus. No worker process is started until start().

        Args:
            endpoint_type: Kind of output endpoint to publish to.
            endpoint_location: Path, socket path, or ``address:port`` for
                the endpoint; unused for stdout and silent outputs.
            metric_prefix: String prepended to every metric name.
            serialisation_format: Wire format used when publishing records.
            tag_format: StatsD tag dialect (only used for StatsD output).

        Raises:
            InvalidArgumentError: If *endpoint_type* is not a recognised
                MonitoringOutputType.
        """
        self.__event_loop: Optional[asyncio.AbstractEventLoop] = None
        self._streaming_process: Optional[Process] = None
        self._stop_streaming_worker = Event()
        self._streaming_process_ppid: Optional[int] = None

        self.__message_queue: "Queue[Any]" = Queue()
        self.__sequence_number = 1

        self.__output_location = None
        self.__async_output = False
        self.__json_output = False
        self.__statsd_output = False
        self.__print_output = False
        self.__udp_output = False
        self.__is_silent = False

        if endpoint_type == MonitoringOutputType.FILE:
            self.__output_location = endpoint_location

        elif endpoint_type == MonitoringOutputType.SOCKET:
            self.__output_location = endpoint_location
            self.__async_output = True

        elif endpoint_type == MonitoringOutputType.STDOUT:
            self.__print_output = True

        elif endpoint_type == MonitoringOutputType.UDP:
            self.__output_location = endpoint_location
            self.__udp_output = True

        elif endpoint_type == MonitoringOutputType.SILENT:
            self.__is_silent = True

        else:
            raise InvalidArgumentError(f"Invalid endpoint output type: [{endpoint_type}]")

        self.__metric_prefix = metric_prefix

        if serialisation_format == MonitoringOutputFormat.JSON:
            self.__json_output = True
        elif serialisation_format == MonitoringOutputFormat.STATSD:
            self.__statsd_output = True

        self.__tag_format = tag_format

    # --- Public API ---

    @property
    def is_enabled(self) -> bool:
        """Whether monitoring is enabled.

        The send_record methods perform this check so clients don't need to
        check this before sending a record to the monitoring bus, but it is
        provided for convenience."""
        return self._streaming_process is not None and self._streaming_process.is_alive()

    @property
    def prints_records(self) -> bool:
        """Whether or not messages are printed to standard output."""
        return self.__print_output

    @property
    def is_silent(self) -> bool:
        """Whether or not this is a silent monitoring bus."""
        return self.__is_silent

    def process_should_exit(self) -> bool:
        """Whether the streaming worker process should exit.

        The streaming worker process should exit if explicitly told to by the
        _stop_streaming_worker event or if it has been orphaned by the parent.
        """
        try:
            assert self._streaming_process_ppid, "Streaming process pid access before initialization"
            # Signal 0 performs error checking only: it raises
            # ProcessLookupError if the parent process no longer exists.
            os.kill(self._streaming_process_ppid, 0)
            return self._stop_streaming_worker.is_set()
        except ProcessLookupError:
            LOGGER.info("Monitoring bus process was orphaned, exiting")
            return True
        except Exception as e:
            LOGGER.info(f"Exception when checking if parent process of monitoring bus is still alive, exiting: {e}")
            return True

    def __enter__(self) -> "MonitoringBus":
        self.start()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        self.stop()

    def start(self) -> None:
        """Starts the monitoring bus worker task."""
        if self.__is_silent or self._streaming_process is not None:
            return

        self._streaming_process = Process(target=self._streaming_worker)
        # Remember our own pid so the child can detect orphaning.
        self._streaming_process_ppid = os.getpid()
        self._streaming_process.start()

    def stop(self) -> None:
        """Cancels the monitoring bus worker task."""
        if self.__is_silent or self._streaming_process is None:
            return

        self._stop_streaming_worker.set()
        self._streaming_process.join()

    def prefix_record_nowait(self, record: MetricRecord) -> MetricRecord:
        """Prefix the record's metric name. This is the same as prefix_record, but called synchronously.

        See the prefix_record docstring for notes on the prefixing rules.

        Args:
            record (Message): The record to prefix.
        """
        instance_name = record.metadata.get("instance-name")
        if instance_name is not None:
            # This is an instance metric, so we'll add the instance name
            # to the prefix if it isn't empty
            if instance_name:
                instance_name = instance_name + "."

            # Prefix the metric with "instance_name.instance." if the instance
            # name isn't empty, otherwise just "instance."
            record.name = f"{self.__metric_prefix}{instance_name}instance.{record.name}"

        else:
            # Not an instance metric
            record.name = f"{self.__metric_prefix}{record.name}"

        return record

    def send_record_nowait(self, record: Union[MetricRecord, LogRecord]) -> None:
        """Publishes a record onto the bus synchronously.

        Args:
            record (Message): The record to send.
        """
        if not self.is_enabled:
            return

        if record.DESCRIPTOR is MetricRecord.DESCRIPTOR:
            record = self.prefix_record_nowait(cast(MetricRecord, record))

        self.__message_queue.put_nowait(record)

    # --- Private API ---

    def _format_statsd_with_tags(self, name: str, tags: List[str], value: Union[int, float], metric_type: str) -> str:
        """Render one StatsD line, appending *tags* in the configured dialect."""
        if not tags or self.__tag_format == StatsDTagFormat.NONE:
            return f"{name}:{value}|{metric_type}\n"

        if self.__tag_format == StatsDTagFormat.INFLUX_STATSD:
            tag_string = ",".join(tags)
            return f"{name},{tag_string}:{value}|{metric_type}\n"

        elif self.__tag_format == StatsDTagFormat.DOG_STATSD:
            tag_string = ",".join(tags)
            return f"{name}:{value}|{metric_type}|#{tag_string}\n"

        elif self.__tag_format == StatsDTagFormat.GRAPHITE:
            tag_string = ";".join(tags)
            return f"{name};{tag_string}:{value}|{metric_type}\n"

        else:
            return f"{name}:{value}|{metric_type}\n"

    def _format_record_as_statsd_string(self, record: MetricRecord) -> str:
        """Helper function to convert metrics to a string in the statsd format.

        See https://github.com/statsd/statsd/blob/master/docs/metric_types.md for valid metric types.

        Note that BuildGrid currently only supports Counters, Timers, and Gauges, and it has the custom
        Distribution type as an alias for Timers.

        Args:
            record (Message): The record to convert.

        Raises:
            ValueError: If the record's type is unknown or its required
                value field (count, duration, or value) is missing.
        """
        bucket = record.metadata.get("statsd-bucket")

        tag_assignment_symbol = "="
        if self.__tag_format == StatsDTagFormat.DOG_STATSD:
            tag_assignment_symbol = ":"
        tags = [
            f"{key}{tag_assignment_symbol}{value}"
            for key, value in record.metadata.items()
            if key not in EXCLUDED_METADATA_KEYS and str(value) != ""
        ]

        if bucket:
            record.name = f"{record.name}.{bucket}"
        # NOTE: record.type is a protobuf enum (an int), so the comparisons
        # below use `==` rather than identity, which is only reliable for
        # interned small integers.
        if record.type == MetricRecord.COUNTER:
            if record.count is None:
                raise ValueError(f"COUNTER record {record.name} is missing a count")
            return self._format_statsd_with_tags(record.name, tags, record.count, "c")
        elif record.type == MetricRecord.TIMER:
            if record.duration is None:
                raise ValueError(f"TIMER record {record.name} is missing a duration")
            return self._format_statsd_with_tags(record.name, tags, record.duration.ToMilliseconds(), "ms")
        elif record.type == MetricRecord.DISTRIBUTION:
            if record.count is None:
                raise ValueError(f"DISTRIBUTION record {record.name} is missing a count")
            return self._format_statsd_with_tags(record.name, tags, record.count, "ms")
        elif record.type == MetricRecord.GAUGE:
            if record.value is None:
                raise ValueError(f"GAUGE record {record.name} is missing a value")
            return self._format_statsd_with_tags(record.name, tags, record.value, "g")
        raise ValueError("Unknown record type.")

    def _streaming_worker(self) -> None:
        """Fetch records from the monitoring queue, and publish them.

        This method loops until the `self._stop_streaming_worker` event is set.
        Intended to run in a subprocess, it fetches messages from the message
        queue in this class, formats the record appropriately, and publishes
        them to whatever output endpoints were specified in the configuration
        passed to this monitoring bus.

        This method won't exit immediately when `self._stop_streaming_worker`
        is set. It may be waiting to fetch a message from the queue, which
        blocks for up to a second. It also needs to do some cleanup of the
        output endpoints once looping has finished.
        """

        def __streaming_worker(end_points: Sequence[MonitoringEndpoint]) -> bool:
            """Get a LogRecord or a MetricRecord, and publish it.

            This function fetches the next record from the internal queue,
            formats it in the configured output style, and writes it to
            the endpoints provided in `end_points`.

            If there is no record available within 1 second, or the record
            received wasn't a LogRecord or MetricRecord protobuf message,
            then this function returns False. Otherwise this returns True
            if publishing was successful (an exception will be raised if
            publishing goes wrong for some reason).

            Args:
                end_points (List): The list of output endpoints to write
                    formatted records to.

            Returns:
                bool, indicating whether or not a record was written.
            """
            try:
                record = self.__message_queue.get(timeout=1)
            except Empty:
                return False

            message = BusMessage()
            message.sequence_number = self.__sequence_number

            if record.DESCRIPTOR is LogRecord.DESCRIPTOR:
                message.log_record.CopyFrom(record)
            elif record.DESCRIPTOR is MetricRecord.DESCRIPTOR:
                message.metric_record.CopyFrom(record)
            else:
                return False

            if self.__json_output:
                blob_message = json_format.MessageToJson(message).encode()

                for end_point in end_points:
                    end_point.write(blob_message)

            elif self.__statsd_output:
                # StatsD only carries metrics; log records are dropped.
                if record.DESCRIPTOR is MetricRecord.DESCRIPTOR:
                    statsd_message = self._format_record_as_statsd_string(record)
                    for end_point in end_points:
                        end_point.write(statsd_message.encode())

            else:
                # Binary output is length-prefixed with a native uint32.
                blob_size = ctypes.c_uint32(message.ByteSize())
                blob_message = message.SerializeToString()

                for end_point in end_points:
                    end_point.write(bytes(blob_size))
                    end_point.write(blob_message)

            return True

        output_writers: List[Any] = []
        output_file: Any = None

        async def __client_connected_callback(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
            # Every client that connects to the UNIX socket receives records.
            output_writers.append(writer)

        self.__event_loop = asyncio.new_event_loop()

        # In good circumstances we stay in the first iteration of this loop forever.
        # The loop exists so that the subprocess is more resilient to temporary
        # failures (e.g. failing to connect to a socket immediately on startup)
        while not self.process_should_exit():
            try:
                if self.__async_output and self.__output_location:

                    async def _async_output() -> None:
                        # NOTE: the explicit `loop=` argument was removed here;
                        # it was deleted from asyncio.start_unix_server in
                        # Python 3.10, and the running loop is used instead.
                        await asyncio.start_unix_server(
                            __client_connected_callback, path=self.__output_location
                        )

                        while not self.process_should_exit():
                            try:
                                if __streaming_worker(output_writers):
                                    self.__sequence_number += 1

                                    for writer in output_writers:
                                        await writer.drain()
                            except asyncio.CancelledError:
                                raise
                            except Exception:
                                LOGGER.warning("Caught exception when publishing metric", exc_info=True)

                    # Fix: the coroutine was previously only scheduled with
                    # asyncio.ensure_future() and the loop was never run, so
                    # nothing was ever published and the worker blocked on an
                    # Event forever. Run the coroutine to completion instead.
                    self.__event_loop.run_until_complete(_async_output())

                elif self.__udp_output and self.__output_location:
                    output_writers.append(UdpWrapper(self.__output_location))
                    while not self.process_should_exit():
                        try:
                            if __streaming_worker(output_writers):
                                self.__sequence_number += 1
                        except Exception:
                            LOGGER.warning("Caught exception when publishing metric", exc_info=True)

                elif self.__output_location:
                    with open(self.__output_location, mode="wb") as output_file:
                        output_writers.append(output_file)

                        while not self.process_should_exit():
                            try:
                                if __streaming_worker([output_file]):
                                    self.__sequence_number += 1
                                    output_file.flush()
                            except Exception:
                                LOGGER.warning("Caught exception when publishing metric", exc_info=True)

                elif self.__print_output:
                    output_writers.append(sys.stdout.buffer)

                    while not self.process_should_exit():
                        try:
                            if __streaming_worker(output_writers):
                                self.__sequence_number += 1
                        except Exception:
                            LOGGER.warning("Caught exception when publishing metric", exc_info=True)
                else:
                    # Fix: these f-strings were previously separated by commas,
                    # which passed them to LOGGER.error() as stray positional
                    # (%-formatting) arguments instead of including them in
                    # the message text.
                    LOGGER.error(
                        "Unsupported monitoring configuration, metrics won't be published."
                        f"output_location={self.__output_location}, "
                        f"async_output={self.__async_output}, "
                        f"udp_output={self.__udp_output}, "
                        f"print_output={self.__print_output}",
                        exc_info=True,
                    )
                    raise InvalidArgumentError("Unsupported monitoring configuration")

            except Exception:
                LOGGER.warning(
                    "Caught exception in metrics publisher loop, sleeping for 5s before retrying", exc_info=True
                )
                time.sleep(5)

        # We exited the publishing loop, which means we've been told to shutdown
        # by the parent process. Clean up the output writers.
        if output_file is not None:
            output_file.close()

        elif output_writers:
            for writer in output_writers:
                writer.close()
                if self.__async_output and self.__output_location:
                    # StreamWriter.close() is asynchronous: wait for it on the
                    # worker's event loop. (Previously the wait coroutine was
                    # only scheduled and the loop never ran, so the cleanup
                    # Event could never be set.)
                    self.__event_loop.run_until_complete(writer.wait_closed())
# Context-local holder for the process-wide monitoring bus. It defaults to a
# silent bus so metric-sending code works before set_monitoring_bus() is called.
MonitoringContext: "ContextVar[MonitoringBus]" = ContextVar(
    "MonitoringBus", default=MonitoringBus(MonitoringOutputType.SILENT)
)
def set_monitoring_bus(monitoring_bus: MonitoringBus) -> None:
    """Install *monitoring_bus* as the context-local monitoring bus."""
    MonitoringContext.set(monitoring_bus)
def get_monitoring_bus() -> MonitoringBus:
    """Return the context-local monitoring bus (a silent bus by default)."""
    return MonitoringContext.get()