Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/metrics_utils.py: 95.07%

223 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-06-11 15:37 +0000

1# Copyright (C) 2020 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15import functools 

16import logging 

17import threading 

18import time 

19from datetime import timedelta 

20from typing import Any, Callable, Dict, Optional, Tuple, Type, TypeVar, cast 

21 

22from buildgrid._enums import MetricRecordType 

23from buildgrid._exceptions import BgdError 

24from buildgrid._protos.buildgrid.v2.monitoring_pb2 import MetricRecord 

25from buildgrid.server.monitoring import get_monitoring_bus 

26 

27 

def create_counter_record(name: str, count: float, metadata: Optional[Dict[str, str]] = None) -> MetricRecord:
    """Build a ``MetricRecord`` of type ``COUNTER``.

    Args:
        name: Name stored on the record.
        count: Number written to the record's ``count`` field.
        metadata: Optional key/value pairs merged into the record's metadata.

    Returns:
        The populated record, with its creation timestamp set to "now".
    """
    record = MetricRecord()
    record.creation_timestamp.GetCurrentTime()

    record.type = MetricRecordType.COUNTER.value
    record.name = name
    record.count = count

    # Metadata is optional; leave the record's metadata untouched when absent.
    if metadata is None:
        return record

    record.metadata.update(metadata)
    return record

39 

40 

def create_gauge_record(name: str, value: float, metadata: Optional[Dict[str, str]] = None) -> MetricRecord:
    """Build a ``MetricRecord`` of type ``GAUGE``.

    Args:
        name: Name stored on the record.
        value: Number written to the record's ``value`` field.
        metadata: Optional key/value pairs merged into the record's metadata.

    Returns:
        The populated record, with its creation timestamp set to "now".
    """
    record = MetricRecord()
    record.creation_timestamp.GetCurrentTime()

    record.type = MetricRecordType.GAUGE.value
    record.name = name
    record.value = value

    # Metadata is optional; leave the record's metadata untouched when absent.
    if metadata is None:
        return record

    record.metadata.update(metadata)
    return record

52 

53 

def create_timer_record(name: str, duration: timedelta, metadata: Optional[Dict[str, str]] = None) -> MetricRecord:
    """Build a ``MetricRecord`` of type ``TIMER``.

    Args:
        name: Name stored on the record.
        duration: Elapsed time, converted into the record's ``duration`` field.
        metadata: Optional key/value pairs merged into the record's metadata.

    Returns:
        The populated record, with its creation timestamp set to "now".
    """
    record = MetricRecord()
    record.creation_timestamp.GetCurrentTime()

    record.type = MetricRecordType.TIMER.value
    record.name = name
    record.duration.FromTimedelta(duration)

    # Metadata is optional; leave the record's metadata untouched when absent.
    if metadata is None:
        return record

    record.metadata.update(metadata)
    return record

65 

66 

def create_distribution_record(name: str, value: float, metadata: Optional[Dict[str, str]] = None) -> MetricRecord:
    """Build a ``MetricRecord`` of type ``DISTRIBUTION``.

    Args:
        name: Name stored on the record.
        value: Number written to the record's ``count`` field.
        metadata: Optional key/value pairs merged into the record's metadata.

    Returns:
        The populated record, with its creation timestamp set to "now".
    """
    record = MetricRecord()
    record.creation_timestamp.GetCurrentTime()

    record.type = MetricRecordType.DISTRIBUTION.value
    record.name = name
    # The sample is carried in the ``count`` field of the record.
    record.count = value

    # Metadata is optional; leave the record's metadata untouched when absent.
    if metadata is None:
        return record

    record.metadata.update(metadata)
    return record

78 

79 

def publish_counter_metric(name: str, count: float, metadata: Optional[Dict[str, str]] = None) -> None:
    """Create a counter record and hand it to the monitoring bus without waiting.

    Args:
        name: Metric name.
        count: Counter value to publish.
        metadata: Optional key/value pairs attached to the record.
    """
    # Build the record first, then look up the bus and send.
    counter_record = create_counter_record(name, count, metadata)
    get_monitoring_bus().send_record_nowait(counter_record)

84 

85 

def publish_gauge_metric(name: str, value: float, metadata: Optional[Dict[str, str]] = None) -> None:
    """Create a gauge record and hand it to the monitoring bus without waiting.

    Args:
        name: Metric name.
        value: Gauge value to publish.
        metadata: Optional key/value pairs attached to the record.
    """
    # Build the record first, then look up the bus and send.
    gauge_record = create_gauge_record(name, value, metadata)
    get_monitoring_bus().send_record_nowait(gauge_record)

90 

91 

def publish_timer_metric(name: str, duration: timedelta, metadata: Optional[Dict[str, str]] = None) -> None:
    """Create a timer record and hand it to the monitoring bus without waiting.

    Args:
        name: Metric name.
        duration: Elapsed time to publish.
        metadata: Optional key/value pairs attached to the record.
    """
    # Build the record first, then look up the bus and send.
    timer_record = create_timer_record(name, duration, metadata)
    get_monitoring_bus().send_record_nowait(timer_record)

96 

97 

# Module-level logger used by the metric helpers to report publishing failures.
LOGGER = logging.getLogger(__name__)

# Type variable used by the decorators in this module so that a decorated
# callable keeps its original type for static checkers.
Func = TypeVar("Func", bound=Callable)  # type: ignore[type-arg]

101 

102 

class DurationMetric:
    """Provides a decorator and a context manager to measure execution duration.

    As a decorator it wraps a method (the first positional argument is treated
    as the owning object) and publishes a timer record after the method
    returns. As a context manager it publishes a timer record on exit.

    Failures while timing or publishing are logged and never propagated, so
    instrumented code is not broken by the metrics machinery.

    Args:
        metric_name: Name under which timer records are published.
        instance_name: Value for the ``instance-name`` metadata entry.
        instanced: Whether to attach the ``instance-name`` metadata entry.
    """

    def __init__(self, metric_name: str, instance_name: str = "", instanced: bool = False) -> None:
        self._metric_name = metric_name
        self._instance_name = instance_name
        self._instanced = instanced

        # Set by ``__enter__`` and cleared by ``__exit__``; ``None`` means no
        # context-manager timing is currently in progress.
        self._start_time: Optional[float] = None

    @property
    def instanced(self) -> bool:
        """Whether published records carry the ``instance-name`` metadata."""
        return self._instanced

    @instanced.setter
    def instanced(self, value: bool) -> None:
        self._instanced = value

    @property
    def instance_name(self) -> str:
        """Instance name attached to published records when ``instanced``."""
        return self._instance_name

    @instance_name.setter
    def instance_name(self, value: str) -> None:
        self._instance_name = value

    def __call__(self, func: Func) -> Func:
        """Decorate ``func`` so that each successful call publishes its duration.

        If ``func`` raises, the exception propagates and no record is published.
        """

        @functools.wraps(func)
        def _timer_wrapper(obj: Any, *args: Any, **kwargs: Any) -> Any:
            if self._instanced:
                # Use getattr so that an owner without ``_instance_name`` does
                # not make the decorated call fail with AttributeError.
                owner_instance_name = getattr(obj, "_instance_name", None)
                if owner_instance_name is not None:
                    self._instance_name = owner_instance_name
            try:
                start_time = time.perf_counter()
            except Exception:
                LOGGER.exception(f"Error raised while starting timing metric [{self._metric_name}]")

            value = func(obj, *args, **kwargs)

            try:
                # If starting the timer failed above, ``start_time`` is unbound
                # and the resulting error is logged here rather than raised.
                self._stop_timer_and_submit(start_time)
            except Exception:
                LOGGER.exception(f"Error raised while timing metric [{self._metric_name}]")
            return value

        return cast(Func, _timer_wrapper)

    def __enter__(self) -> "DurationMetric":
        try:
            self._start_time = time.perf_counter()
        except Exception:
            LOGGER.exception(f"Error raised while entering timing metric [{self._metric_name}]")
        return self

    def __exit__(
        self, exception_type: Optional[Type[BaseException]], exception_value: Optional[BaseException], traceback: Any
    ) -> None:
        try:
            # Explicit check instead of ``assert``: asserts vanish under ``-O``
            # and a truthiness test would also reject a legitimate 0.0 reading.
            if self._start_time is None:
                raise RuntimeError("DurationMetric exited without a recorded start time")
            self._stop_timer_and_submit(self._start_time)
        except Exception:
            LOGGER.exception(f"Error raised while stopping timing metric [{self._metric_name}] in exit")
        finally:
            self._start_time = None

    def _stop_timer_and_submit(self, start_time: float) -> None:
        """Compute the elapsed time since ``start_time`` and publish a timer record."""
        monitoring_bus = get_monitoring_bus()
        # An instanced metric without a name cannot be labelled; fall back to
        # publishing without metadata from now on.
        if self._instanced and self._instance_name is None:
            self._instanced = False

        run_time = timedelta(seconds=time.perf_counter() - start_time)

        metadata = None
        if self._instanced:
            metadata = {"instance-name": self._instance_name}
        record = create_timer_record(self._metric_name, run_time, metadata)
        monitoring_bus.send_record_nowait(record)

180 

181 

def generator_method_duration_metric(name: str) -> Callable[[Func], Func]:
    """Return a decorator that times a **generator method** and publishes the result.

    The duration covers the full iteration of the decorated generator method.
    When the owning object has a non-``None`` ``_instance_name`` attribute, the
    published metric is tagged with it.

    This lives apart from ``DurationMetric.__call__`` so that the regular and
    generator variants stay individually readable and clearly distinct.

    Usage example
    .. code:: python

        class ExampleInstance:

            @generator_method_duration_metric(EXAMPLE_METHOD_DURATION_NAME)
            def example_method(self, digests, context):
                for digest in digests:
                    yield self._do_something(digest)

    Args:
        name (str): The metric name to publish the method duration under.

    """

    def decorator(func: Func) -> Func:
        @functools.wraps(func)
        def wrapped_generator_method(obj: Any, *args: Any, **kwargs: Any) -> Any:
            owner_name = getattr(obj, "_instance_name", None)
            if owner_name is None:
                # No instance name available: publish an untagged duration.
                recorder = DurationMetric(name)
            else:
                recorder = DurationMetric(name, instance_name=owner_name, instanced=True)

            with recorder:
                yield from func(obj, *args, **kwargs)

        return cast(Func, wrapped_generator_method)

    return decorator

220 

221 

class Counter:
    """A generic metric counter, best used as a context manager.

    Leaving the ``with`` block without an exception publishes the accumulated
    count (and resets it); leaving it with an exception publishes nothing.

    Example Usage:

        with Counter("count-size") as size_counter:
            for i in range(10):
                size_counter.increment(i)
    """

    def __init__(self, metric_name: str, instance_name: Optional[str] = None) -> None:
        self._counter_lock = threading.Lock()
        self._count = 0.0
        self._metric_name = metric_name
        self._instance_name = instance_name

    @property
    def count(self) -> float:
        """Current accumulated value."""
        return self._count

    @count.setter
    def count(self, value: float) -> None:
        with self._counter_lock:
            self._count = value

    @property
    def metric_name(self) -> str:
        """Name under which the counter is published."""
        return self._metric_name

    @property
    def instance_name(self) -> Optional[str]:
        """Instance name attached as metadata when not ``None``."""
        return self._instance_name

    @instance_name.setter
    def instance_name(self, name: str) -> None:
        with self._counter_lock:
            self._instance_name = name

    def __enter__(self) -> "Counter":
        return self

    def __exit__(
        self, exception_type: Optional[Type[BaseException]], exception_value: Optional[BaseException], traceback: Any
    ) -> None:
        # Only a clean exit publishes the counter.
        if exception_type is not None:
            return
        with self._counter_lock:
            self.publish()

    def increment(self, value: float = 1.0) -> None:
        """Add ``value`` (default 1.0) to the counter under the lock."""
        with self._counter_lock:
            self._count = self._count + value

    def publish(self, reset_counter: bool = True) -> None:
        """Send the current count to the monitoring bus.

        Args:
            reset_counter: When true, the count is set back to 0.0 after sending.
        """
        bus = get_monitoring_bus()

        labels = None if self._instance_name is None else {"instance-name": self._instance_name}
        bus.send_record_nowait(create_counter_record(self._metric_name, self._count, labels))

        if not reset_counter:
            return
        self._count = 0.0

284 

285 

class ExceptionCounter(Counter):
    """Provides a decorator and context manager in order to count exceptions thrown in a function/method body.

    This class inherits from Counter and publishes a value of 1 for every
    counted exception, using the base class's methods. The count is never
    reset, so one instance can be reused for any number of calls or ``with``
    blocks. Exceptions are always re-raised to the caller; failures while
    publishing are logged and do not replace the original exception.

    Example Usage:

        with ExceptionCounter("test", exceptions=(RuntimeError,), ignored_exceptions=(NotFoundError,)) as ec:
            ret_val = do_work()

    Args:
        metric_name: Name under which the counter is published.
        exceptions: Exception types to count. Defaults to ``(BgdError,)``.
        ignored_exceptions: Exception types that are never counted, even when
            they also match ``exceptions``.
        *args, **kwargs: Forwarded to ``Counter.__init__``.
    """

    def __init__(
        self,
        metric_name: str,
        *args: Any,
        exceptions: Tuple[Type[Exception], ...] = (BgdError,),
        ignored_exceptions: Optional[Tuple[Type[Exception], ...]] = None,
        **kwargs: Any,
    ):
        super().__init__(metric_name, *args, **kwargs)

        self._exceptions = exceptions
        self._ignored_exceptions: Tuple[Type[Exception], ...] = ignored_exceptions or ()

        # Increment the counter to 1, publishing will occur on every exception caught.
        self.increment()

    def __exit__(
        self, exception_type: Optional[Type[BaseException]], exception_value: Optional[BaseException], traceback: Any
    ) -> None:
        if exception_value is None:
            return
        # Ignored types take precedence over counted ones.
        if isinstance(exception_value, self._ignored_exceptions):
            return
        if isinstance(exception_value, self._exceptions):
            try:
                # Keep the count at 1 so a reused instance keeps publishing 1,
                # matching the behaviour of the decorator form.
                self.publish(reset_counter=False)
            except Exception:
                # A metrics failure must not mask the exception being counted.
                LOGGER.exception(
                    f"Exception raised when publishing exception metric of type: {type(exception_value)}."
                )

    def __call__(self, func: Func) -> Func:
        """Decorate ``func`` so that counted exceptions raised by it are published."""

        @functools.wraps(func)
        def _exception_wrapper(obj: Any, *args: Any, **kwargs: Any) -> Any:
            try:
                return func(obj, *args, **kwargs)
            except self._ignored_exceptions:
                # Checked first so ignored types are never counted.
                raise
            except self._exceptions as e:
                with self._counter_lock:
                    # Tag the metric with the owner's instance name when it has one.
                    if hasattr(obj, "_instance_name"):
                        self._instance_name = obj._instance_name
                    try:
                        self.publish(reset_counter=False)
                    except Exception:
                        LOGGER.exception(f"Exception raised when publishing exception metric of type: {type(e)}.")
                raise

        return cast(Func, _exception_wrapper)

347 

348 

def generator_method_exception_counter(
    name: str,
    # Variadic tuple: any number of exception types, matching ExceptionCounter.
    exceptions: Tuple[Type[Exception], ...] = (BgdError,),
    ignored_exceptions: Optional[Tuple[Type[Exception], ...]] = None,
) -> Callable[[Func], Func]:
    """Helper function to publish a counter when an exception is raised by a generator method.

    This returns a decorator which publishes a counter metric which measures the
    number of exceptions raised by the decorated **generator method**. A fresh
    ``ExceptionCounter`` is created for every call of the decorated method and
    wraps the whole iteration of the generator.

    This is separate from the ``__call__`` method of ``ExceptionCounter`` to keep the
    code in that method a bit more readable whilst still having acceptable return
    values, as well as to make the difference between the two approaches clear.

    Usage example
    .. code:: python

        class ExampleInstance:

            @generator_method_exception_counter(EXAMPLE_METHOD_EXCEPTION_COUNT_NAME)
            def example_method(self, digests, context):
                for digest in digests:
                    yield self._do_something(digest)

    Args:
        name (str): The metric name to publish the exception count under.
        exceptions (tuple): Tuple of Exception types to count. Defaults to ``BgdError``.
        ignored_exceptions (tuple): Tuple of Exception types to ignore counting of. Defaults to ``None``.

    """

    def decorator(func: Func) -> Func:
        @functools.wraps(func)
        def wrapped_generator_method(obj: Any, *args: Any, **kwargs: Any) -> Any:
            with ExceptionCounter(name, exceptions=exceptions, ignored_exceptions=ignored_exceptions):
                yield from func(obj, *args, **kwargs)

        return cast(Func, wrapped_generator_method)

    return decorator

389 

390 

class Distribution(Counter):
    """A ``Counter`` variant whose value is published as a distribution record."""

    def __init__(self, metric_name: str, instance_name: str = "") -> None:
        super().__init__(metric_name, instance_name)

    def publish(self, reset_counter: bool = True) -> None:
        """Send the current value to the monitoring bus as a distribution record.

        Args:
            reset_counter: When true, the value is set back to 0.0 after sending.
        """
        bus = get_monitoring_bus()

        # An empty or missing instance name means no metadata is attached.
        labels = None
        if self._instance_name:
            labels = {"instance-name": self._instance_name}

        bus.send_record_nowait(create_distribution_record(self._metric_name, self._count, labels))

        if not reset_counter:
            return
        self._count = 0.0