Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/monitoring.py: 40.49%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

247 statements  

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16import asyncio 

17import ctypes 

18from enum import Enum 

19import logging 

20from multiprocessing import Event, Process, Queue 

21from queue import Empty 

22import sys 

23import socket 

24import time 

25import threading 

26from typing import IO, TYPE_CHECKING, List, Optional, Union 

27 

28from google.protobuf import json_format 

29 

30from buildgrid._exceptions import InvalidArgumentError 

31from buildgrid._protos.buildgrid.v2.monitoring_pb2 import LogRecord, MetricRecord, BusMessage 

32 

33if TYPE_CHECKING: 

34 from asyncio.streams import StreamWriter 

35 

36 

class MonitoringOutputType(Enum):
    """Kinds of endpoint the monitoring bus can publish records to."""

    STDOUT = 'stdout'  # The process's standard output stream.
    FILE = 'file'  # A file on disk.
    SOCKET = 'socket'  # A UNIX domain socket.
    UDP = 'udp'  # A UDP endpoint given as IP:port.
    SILENT = 'silent'  # Silent bus.

48 

49 

class MonitoringOutputFormat(Enum):
    """Serialisation formats the monitoring bus can emit records in."""

    BINARY = 'binary'  # Protobuf binary encoding.
    JSON = 'json'  # JSON encoding.
    STATSD = 'statsd'  # StatsD lines; metrics only, log records are dropped.

57 

58 

class UdpWrapper:
    """Wraps socket sendto() in write() so it can be used polymorphically.

    The wrapper exposes the ``write()``/``close()`` pair that file-like
    endpoints offer, so code that publishes to and then closes a list of
    endpoints can treat a UDP destination like any other writer.
    """

    def __init__(self, endpoint_location):
        """Parse the destination and open a UDP (IPv4 datagram) socket.

        Args:
            endpoint_location (str): Destination in the form ``address:port``.

        Raises:
            ValueError: If ``endpoint_location`` is not exactly
                ``address:port`` or the port is not an integer.
        """
        # Parse into locals first so a malformed location never leaves the
        # instance with a half-initialised address/port pair.
        try:
            address, port_text = endpoint_location.split(":")
            port = int(port_text)
        except ValueError as e:
            error_msg = f"udp endpoint-location {endpoint_location} does not have the form address:port"
            raise ValueError(error_msg) from e

        self._addr = address
        self._port = port
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def write(self, message):
        """Send ``message`` (bytes) as a single datagram to the destination."""
        self._socket.sendto(message, (self._addr, self._port))

    def close(self):
        """Release the underlying socket.

        Provided so that shutdown code which calls ``close()`` on every
        output endpoint also works for UDP endpoints.
        """
        self._socket.close()

73 

74 

# Type of an output endpoint that formatted records are written to.
# "StreamWriter" is quoted because it is only imported under TYPE_CHECKING.
MonitoringEndpoint = Union[IO, UdpWrapper, "StreamWriter"]

76 

77 

class _MonitoringBus:
    """Class representing a singleton monitoring bus. Only one should exist at a time (as _instance).

    Records passed to ``send_record``/``send_record_nowait`` are placed on a
    multiprocessing queue. A worker subprocess started by ``start()`` drains
    that queue, serialises each record (JSON, StatsD, or length-prefixed
    protobuf binary) and writes it to the configured output endpoint.
    """
    # The annotation is quoted because the class is still being defined here.
    _instance: Optional["_MonitoringBus"] = None

    def __init__(self, endpoint_type=MonitoringOutputType.SOCKET,
                 endpoint_location=None,
                 metric_prefix="",
                 serialisation_format=MonitoringOutputFormat.STATSD):
        """Configure the bus. Nothing is published until ``start()`` is called.

        Args:
            endpoint_type (MonitoringOutputType): Where records are written.
            endpoint_location: Target for FILE, SOCKET and UDP endpoints
                (UDP expects "address:port"). Not used for STDOUT or SILENT.
            metric_prefix (str): Text prepended as-is to every metric name.
            serialisation_format (MonitoringOutputFormat): Output format. Any
                value other than JSON or STATSD selects protobuf binary.

        Raises:
            InvalidArgumentError: If ``endpoint_type`` is not a known type.
        """
        self._logger = logging.getLogger(__name__)
        self.__event_loop = None
        self._streaming_process = None
        self._stop_streaming_worker = Event()

        self.__message_queue = Queue()
        self.__sequence_number = 1

        self.__output_location = None
        self.__async_output = False
        self.__json_output = False
        self.__statsd_output = False
        self.__print_output = False
        self.__udp_output = False
        self.__is_silent = False

        if endpoint_type == MonitoringOutputType.FILE:
            self.__output_location = endpoint_location

        elif endpoint_type == MonitoringOutputType.SOCKET:
            self.__output_location = endpoint_location
            self.__async_output = True

        elif endpoint_type == MonitoringOutputType.STDOUT:
            self.__print_output = True

        elif endpoint_type == MonitoringOutputType.UDP:
            self.__output_location = endpoint_location
            self.__udp_output = True

        elif endpoint_type == MonitoringOutputType.SILENT:
            self.__is_silent = True

        else:
            raise InvalidArgumentError(
                f"Invalid endpoint output type: [{endpoint_type}]")

        self.__metric_prefix = metric_prefix

        # With neither flag set the worker falls back to binary output.
        if serialisation_format == MonitoringOutputFormat.JSON:
            self.__json_output = True
        elif serialisation_format == MonitoringOutputFormat.STATSD:
            self.__statsd_output = True

    # --- Public API ---

    @property
    def is_enabled(self) -> bool:
        """Whether monitoring is enabled.

        The send_record methods perform this check so clients don't need to
        check this before sending a record to the monitoring bus, but it is
        provided for convenience. """
        return self._streaming_process is not None and self._streaming_process.is_alive()

    @property
    def prints_records(self) -> bool:
        """Whether or not messages are printed to standard output."""
        return self.__print_output

    @property
    def is_silent(self) -> bool:
        """Whether or not this is a silent monitoring bus."""
        return self.__is_silent

    def start(self) -> None:
        """Starts the monitoring bus worker subprocess.

        Does nothing for a silent bus or if a worker was already created.
        """
        if self.__is_silent or self._streaming_process is not None:
            return

        self._streaming_process = Process(target=self._streaming_worker)
        self._streaming_process.start()

    def stop(self) -> None:
        """Signals the worker subprocess to stop and waits for it to exit.

        Does nothing for a silent bus or if the worker was never started.
        """
        if self.__is_silent or self._streaming_process is None:
            return

        self._stop_streaming_worker.set()
        self._streaming_process.join()

    async def prefix_record(self, record: MetricRecord) -> MetricRecord:
        """ Prefix the record's metric name.

        The new metric name is built from the following elements, in order:

        1. The custom prefix specified in the configuration, if any. It is
           prepended as-is; no separator is inserted by this method, so the
           prefix is presumably expected to already end with a dot (verify
           against the code that loads the configuration).
        2. If this is an instanced metric with a nonempty instance name, the
           instance name followed by a dot.
        3. If this is an instanced metric (even with an empty instance name),
           "instance.".
        4. The original record name.

        For example, consider the "widgets-built" metric with a custom prefix of "mycompany.".

        If "widgets-built" is not an instanced metric: "mycompany.widgets-built".
        If "widgets-built" is an instanced metric in an empty instance: "mycompany.instance.widgets-built".
        If "widgets-built" is an instanced metric in an instance named "dev": "mycompany.dev.instance.widgets-built".

        Args:
            record (Message): The record to prefix. It is modified in place.

        Returns:
            The same record, with its name prefixed.
        """
        # The prefixing itself never awaits, so share the synchronous
        # implementation rather than keeping two copies in step.
        return self.prefix_record_nowait(record)

    async def send_record(self, record: MetricRecord) -> None:
        """Publishes a record onto the bus asynchronously.

        The record is dropped if the worker subprocess is not running.

        Args:
            record (Message): The record to send.
        """
        if not self.is_enabled:
            return

        if record.DESCRIPTOR is MetricRecord.DESCRIPTOR:
            record = await self.prefix_record(record)

        self.__message_queue.put(record)

    def prefix_record_nowait(self, record: MetricRecord) -> MetricRecord:
        """ Prefix the record's metric name. This is the same as prefix_record, but called synchronously.

        See the prefix_record docstring for notes on the prefixing rules.

        Args:
            record (Message): The record to prefix. It is modified in place.

        Returns:
            The same record, with its name prefixed.
        """
        instance_name = record.metadata.get('instance-name')
        if instance_name is None:
            # Not an instance metric
            record.name = f"{self.__metric_prefix}{record.name}"
            return record

        # This is an instance metric: prefix with "instance_name.instance."
        # if the instance name isn't empty, otherwise just "instance."
        if instance_name:
            instance_name = instance_name + "."
        record.name = f"{self.__metric_prefix}{instance_name}instance.{record.name}"
        return record

    def send_record_nowait(self, record: MetricRecord) -> None:
        """Publishes a record onto the bus synchronously.

        The record is dropped if the worker subprocess is not running.

        Args:
            record (Message): The record to send.
        """
        if not self.is_enabled:
            return

        if record.DESCRIPTOR is MetricRecord.DESCRIPTOR:
            record = self.prefix_record_nowait(record)

        self.__message_queue.put_nowait(record)

    # --- Private API ---

    @staticmethod
    def _format_record_as_stasd_string(record: MetricRecord) -> str:
        """ Helper function to convert metrics to a string in the statsd format.

        See https://github.com/statsd/statsd/blob/master/docs/metric_types.md for valid metric types.

        Note that BuildGrid currently only supports Counters, Timers, and Gauges, and it has the custom
        Distribution type as an alias for Timers.

        If the record's metadata has a "statsd-bucket" entry, it is appended
        to the metric name in the output. The record itself is not modified.

        Args:
            record (Message): The record to convert.

        Returns:
            A newline-terminated statsd line.

        Raises:
            ValueError: If the record type is unknown or the value field for
                its type is missing.
        """
        name = record.name
        bucket = record.metadata.get("statsd-bucket")
        if bucket:
            name = f"{name}.{bucket}"

        # Enum values are plain integers, so compare by equality, not identity.
        if record.type == MetricRecord.COUNTER:
            if record.count is None:
                raise ValueError(
                    f"COUNTER record {name} is missing a count")
            return f"{name}:{record.count}|c\n"
        elif record.type == MetricRecord.TIMER:
            if record.duration is None:
                raise ValueError(
                    f"TIMER record {name} is missing a duration")
            return f"{name}:{record.duration.ToMilliseconds()}|ms\n"
        elif record.type == MetricRecord.DISTRIBUTION:
            if record.count is None:
                raise ValueError(
                    f"DISTRIBUTION record {name} is missing a count")
            return f"{name}:{record.count}|ms\n"
        elif record.type == MetricRecord.GAUGE:
            if record.value is None:
                raise ValueError(
                    f"GAUGE record {name} is missing a value")
            return f"{name}:{record.value}|g\n"
        raise ValueError("Unknown record type.")

    def _fetch_record(self):
        """Wait up to one second for the next queued record.

        Returns:
            The next record from the message queue, or None if none arrived
            within the timeout.
        """
        try:
            return self.__message_queue.get(timeout=1)
        except Empty:
            return None

    def _publish_record(self, record, end_points: List[MonitoringEndpoint]) -> bool:
        """Format a LogRecord or MetricRecord and write it to the endpoints.

        The record is wrapped in a BusMessage carrying the current sequence
        number, serialised in the configured output style, and written to
        every endpoint in `end_points`. On success the sequence number is
        advanced. In StatsD mode only metric records are written.

        Args:
            record: The record to publish, or None if nothing was fetched.
            end_points (List): The output endpoints to write to.

        Returns:
            bool, False if `record` is None or is neither a LogRecord nor a
            MetricRecord, True otherwise. Write errors propagate to the
            caller and leave the sequence number unchanged.
        """
        if record is None:
            return False

        message = BusMessage()
        message.sequence_number = self.__sequence_number

        if record.DESCRIPTOR is LogRecord.DESCRIPTOR:
            message.log_record.CopyFrom(record)
        elif record.DESCRIPTOR is MetricRecord.DESCRIPTOR:
            message.metric_record.CopyFrom(record)
        else:
            return False

        if self.__json_output:
            blob_message = json_format.MessageToJson(message).encode()
            for end_point in end_points:
                end_point.write(blob_message)

        elif self.__statsd_output:
            if record.DESCRIPTOR is MetricRecord.DESCRIPTOR:
                statsd_message = _MonitoringBus._format_record_as_stasd_string(
                    record)
                for end_point in end_points:
                    end_point.write(statsd_message.encode())

        else:
            # Binary output: each message is preceded by its size as a
            # 4-byte unsigned integer.
            blob_size = ctypes.c_uint32(message.ByteSize())
            blob_message = message.SerializeToString()
            for end_point in end_points:
                end_point.write(bytes(blob_size))  # type: ignore
                end_point.write(blob_message)

        self.__sequence_number += 1
        return True

    def _publish_until_stopped(self, end_points: List[MonitoringEndpoint], flush=None) -> None:
        """Publish queued records to `end_points` until told to stop.

        Errors raised while publishing are logged and do not end the loop.

        Args:
            end_points (List): The output endpoints to write to.
            flush: Optional callable invoked after every iteration, whether
                or not a record was written.
        """
        while not self._stop_streaming_worker.is_set():
            try:
                self._publish_record(self._fetch_record(), end_points)
                if flush is not None:
                    flush()
            except Exception:
                self._logger.warning(
                    "Caught exception when publishing metric", exc_info=True)

    def _streaming_worker(self) -> None:
        """Fetch records from the monitoring queue, and publish them.

        This method loops until the `self._stop_streaming_worker` event is set.
        Intended to run in a subprocess, it fetches messages from the message
        queue in this class, formats the record appropriately, and publishes
        them to whatever output endpoints were specified in the configuration
        passed to this monitoring bus.

        This method won't exit immediately when `self._stop_streaming_worker`
        is set. It may be waiting to fetch a message from the queue, which
        blocks for up to a second. It also needs to do some cleanup of the
        output endpoints once looping has finished.

        """
        output_writers: List[MonitoringEndpoint] = []

        async def _on_client_connected(reader, writer) -> None:
            # Every client that connects to the UNIX socket becomes a writer.
            output_writers.append(writer)

        async def _publish_to_socket_clients() -> None:
            server = await asyncio.start_unix_server(
                _on_client_connected, path=self.__output_location)
            try:
                while not self._stop_streaming_worker.is_set():
                    try:
                        # The queue read blocks for up to a second, so run it
                        # in the executor; this keeps the event loop free to
                        # accept client connections in the meantime.
                        record = await self.__event_loop.run_in_executor(
                            None, self._fetch_record)
                        self._publish_record(record, output_writers)

                        for writer in output_writers:
                            await writer.drain()
                    except asyncio.CancelledError:
                        raise
                    except Exception:
                        self._logger.warning(
                            "Caught exception when publishing metric", exc_info=True)
            finally:
                server.close()

        self.__event_loop = asyncio.new_event_loop()

        # In good circumstances we stay in the first iteration of this loop forever.
        # The loop exists so that the subprocess is more resilient to temporary
        # failures (e.g. failing to connect to a socket immediately on startup)
        while not self._stop_streaming_worker.is_set():
            try:
                if self.__async_output and self.__output_location:
                    # The coroutine only makes progress while the loop runs,
                    # so drive the loop here until the coroutine finishes.
                    self.__event_loop.run_until_complete(_publish_to_socket_clients())

                elif self.__udp_output and self.__output_location:
                    output_writers.append(UdpWrapper(self.__output_location))
                    self._publish_until_stopped(output_writers)

                elif self.__output_location:
                    # The `with` block closes the file, so it is not added to
                    # `output_writers` for later cleanup.
                    with open(self.__output_location, mode='wb') as output_file:
                        self._publish_until_stopped([output_file], flush=output_file.flush)

                elif self.__print_output:
                    output_writers.append(sys.stdout.buffer)
                    self._publish_until_stopped(output_writers)

                else:
                    self._logger.error(
                        "Unsupported monitoring configuration, metrics won't be published. "
                        "output_location=%s, async_output=%s, udp_output=%s, print_output=%s",
                        self.__output_location, self.__async_output,
                        self.__udp_output, self.__print_output)
                    raise InvalidArgumentError("Unsupported monitoring configuration")

            except Exception:
                self._logger.warning(
                    "Caught exception in metrics publisher loop, sleeping for 5s before retrying",
                    exc_info=True)
                # Wait on the stop event instead of sleeping so that a stop
                # request during the back-off is honoured straight away.
                self._stop_streaming_worker.wait(5)

        # We exited the publishing loop, which means we've been told to shutdown
        # by the parent process. Clean up the output writers.
        for writer in output_writers:
            close = getattr(writer, "close", None)
            if close is None:
                # This endpoint has no close() method, so there is nothing to release.
                continue
            try:
                close()
                if self.__async_output and self.__output_location:
                    self.__event_loop.run_until_complete(writer.wait_closed())
            except Exception:
                self._logger.warning(
                    "Caught exception when closing monitoring output", exc_info=True)

        self.__event_loop.close()

480 

481 

def setup_monitoring_bus(
        endpoint_type: MonitoringOutputType = MonitoringOutputType.SOCKET,
        endpoint_location: Optional[str] = None,
        metric_prefix: str = "",
        serialisation_format: MonitoringOutputFormat = MonitoringOutputFormat.STATSD) -> _MonitoringBus:
    """ Sets up the monitoring bus.

    Creates a new bus, stores it as the singleton instance, and returns it.
    The bus is not started here.

    Args:
        endpoint_type: Kind of endpoint records are written to.
        endpoint_location: Location of the endpoint, where one is needed.
        metric_prefix: Text prepended to every metric name.
        serialisation_format: Format records are serialised in.

    Returns:
        The newly created monitoring bus.

    Raises:
        ValueError: If a non-silent monitoring bus has already been set up
            by a previous call to setup_monitoring_bus.
    """

    # If we currently have a silent monitoring bus, there's no harm in reconfiguring to use a real one.
    if _MonitoringBus._instance is not None and not _MonitoringBus._instance.is_silent:
        # Each fragment ends with a space: adjacent literals are joined verbatim.
        raise ValueError("A monitoring bus was already created, either with setup_monitoring_bus or "
                         "implicitly with get_monitoring_bus. Please ensure that you are not attempting "
                         "to call setup_monitoring_bus after calling either of those functions.")
    _MonitoringBus._instance = _MonitoringBus(
        endpoint_type, endpoint_location, metric_prefix, serialisation_format)
    return _MonitoringBus._instance

500 

501 

def get_monitoring_bus() -> _MonitoringBus:
    """ Return the current monitoring bus.

    When no bus has been set up yet, a silent one is created through
    setup_monitoring_bus and returned, so no metrics will be published. """
    existing_bus = _MonitoringBus._instance
    if existing_bus is not None:
        return existing_bus

    logging.getLogger(__name__).info(
        "get_monitoring_bus() was called before setup_monitoring_bus; no metrics will be published.")
    return setup_monitoring_bus(MonitoringOutputType.SILENT)