Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/metrics_utils.py: 89.37%

207 statements  

« prev     ^ index     » next       coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

1# Copyright (C) 2020 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15import functools 

16import logging 

17import threading 

18import time 

19from typing import Dict, Optional, Tuple, Type 

20from datetime import timedelta 

21 

22from buildgrid._enums import MetricRecordType 

23from buildgrid._protos.buildgrid.v2.monitoring_pb2 import MetricRecord 

24from buildgrid.server.monitoring import get_monitoring_bus 

25from buildgrid._exceptions import BgdError 

26 

27 

def create_counter_record(name: str, count: float, metadata: Dict = None) -> MetricRecord:
    """Build a ``MetricRecord`` of type COUNTER.

    Args:
        name: Metric name stored on the record.
        count: Value stored in the record's ``count`` field.
        metadata: Optional mapping merged into the record's ``metadata``;
            nothing is merged when it is ``None``.

    Returns:
        The populated record, with ``creation_timestamp`` set through
        ``GetCurrentTime()``.
    """
    record = MetricRecord()
    record.creation_timestamp.GetCurrentTime()

    record.type = MetricRecordType.COUNTER.value
    record.name = name
    record.count = count

    if metadata is None:
        return record

    record.metadata.update(metadata)
    return record

39 

40 

def create_gauge_record(name: str, value: float, metadata: Dict = None) -> MetricRecord:
    """Build a ``MetricRecord`` of type GAUGE.

    Args:
        name: Metric name stored on the record.
        value: Value stored in the record's ``value`` field.
        metadata: Optional mapping merged into the record's ``metadata``;
            nothing is merged when it is ``None``.

    Returns:
        The populated record, with ``creation_timestamp`` set through
        ``GetCurrentTime()``.
    """
    record = MetricRecord()
    record.creation_timestamp.GetCurrentTime()

    record.type = MetricRecordType.GAUGE.value
    record.name = name
    record.value = value

    if metadata is None:
        return record

    record.metadata.update(metadata)
    return record

52 

53 

def create_timer_record(name: str, duration: timedelta, metadata=None) -> MetricRecord:
    """Build a ``MetricRecord`` of type TIMER.

    Args:
        name: Metric name stored on the record.
        duration: Elapsed time, written to the record's ``duration`` field
            through ``FromTimedelta``.
        metadata: Optional mapping merged into the record's ``metadata``;
            nothing is merged when it is ``None``.

    Returns:
        The populated record, with ``creation_timestamp`` set through
        ``GetCurrentTime()``.
    """
    record = MetricRecord()
    record.creation_timestamp.GetCurrentTime()

    record.type = MetricRecordType.TIMER.value
    record.name = name
    record.duration.FromTimedelta(duration)

    if metadata is None:
        return record

    record.metadata.update(metadata)
    return record

65 

66 

def create_distribution_record(name: str, value: float, metadata=None) -> MetricRecord:
    """Build a ``MetricRecord`` of type DISTRIBUTION.

    Args:
        name: Metric name stored on the record.
        value: Sample value for the distribution.
        metadata: Optional mapping merged into the record's ``metadata``;
            nothing is merged when it is ``None``.

    Returns:
        The populated record, with ``creation_timestamp`` set through
        ``GetCurrentTime()``.
    """
    record = MetricRecord()
    record.creation_timestamp.GetCurrentTime()

    record.type = MetricRecordType.DISTRIBUTION.value
    record.name = name
    # NOTE(review): the sample is stored in ``count`` (not ``value``) --
    # presumably what the record consumers expect; confirm before changing.
    record.count = value

    if metadata is None:
        return record

    record.metadata.update(metadata)
    return record

78 

79 

def publish_counter_metric(name: str, count: float, metadata: Dict = None) -> None:
    """Create a counter record and hand it to the monitoring bus.

    Args:
        name: Metric name.
        count: Counter value to report.
        metadata: Optional mapping attached to the record.
    """
    counter_record = create_counter_record(name, count, metadata)
    get_monitoring_bus().send_record_nowait(counter_record)

84 

85 

def publish_gauge_metric(name: str, value: float, metadata: Dict = None) -> None:
    """Create a gauge record and hand it to the monitoring bus.

    Args:
        name: Metric name.
        value: Gauge value to report.
        metadata: Optional mapping attached to the record.
    """
    gauge_record = create_gauge_record(name, value, metadata)
    get_monitoring_bus().send_record_nowait(gauge_record)

90 

91 

def publish_timer_metric(name: str, duration: timedelta, metadata: Dict = None) -> None:
    """Create a timer record and hand it to the monitoring bus.

    Args:
        name: Metric name.
        duration: Elapsed time to report.
        metadata: Optional mapping attached to the record.
    """
    timer_record = create_timer_record(name, duration, metadata)
    get_monitoring_bus().send_record_nowait(timer_record)

96 

97 

class DurationMetric:

    """ Provides a decorator and a context manager to measure execution duration.

    As a decorator it wraps a method (the first positional argument is the
    owning object) and publishes a timer record after the method returns.
    When ``instanced`` is true the record is tagged with the owning object's
    ``_instance_name``, falling back to the ``instance_name`` given here when
    the object has none.

    As a context manager it times the ``with`` body and publishes on exit,
    using the ``instanced`` / ``instance_name`` values currently set on this
    object.

    Errors raised while building or sending the record are logged and never
    propagated to the measured code. No record is published by the decorator
    when the wrapped method raises.
    """

    def __init__(self, metric_name: str,
                 instance_name: str = '',
                 instanced: bool = False):
        self._logger = logging.getLogger(__name__)

        self._metric_name = metric_name
        self._instance_name = instance_name
        self._instanced = instanced

        # Only used by the context-manager protocol.
        self._start_time = None

    @property
    def instanced(self):
        return self._instanced

    @instanced.setter
    def instanced(self, value: bool):
        self._instanced = value

    @property
    def instance_name(self):
        return self._instance_name

    @instance_name.setter
    def instance_name(self, value: str):
        self._instance_name = value

    def __call__(self, func):
        @functools.wraps(func)
        def _timer_wrapper(obj, *args, **kwargs):
            # One decorator object is shared by every call of the decorated
            # method, so the per-call instance name is kept in a local instead
            # of being written back to ``self``; otherwise one caller's name
            # could be reported for another caller.
            instanced = self._instanced
            instance_name = self._instance_name
            if instanced:
                obj_instance_name = getattr(obj, '_instance_name', None)
                if obj_instance_name is not None:
                    instance_name = obj_instance_name

            start_time = time.perf_counter()

            value = func(obj, *args, **kwargs)

            try:
                self._submit(start_time, instanced, instance_name)
            except Exception:
                self._logger.exception(f"Error raised while timing metric [{self._metric_name}]")
            return value
        return _timer_wrapper

    def __enter__(self):
        self._start_time = time.perf_counter()
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        try:
            self._stop_timer_and_submit(self._start_time)
        except Exception:
            self._logger.exception(f"Error raised while stopping timing metric [{self._metric_name}] in exit")
        finally:
            self._start_time = None

    def _stop_timer_and_submit(self, start_time):
        """Publish the time elapsed since ``start_time`` using this object's settings."""
        self._submit(start_time, self._instanced, self._instance_name)

    def _submit(self, start_time, instanced, instance_name):
        """Build a timer record for the elapsed time and send it to the monitoring bus.

        ``instance-name`` metadata is attached only when ``instanced`` is true
        and ``instance_name`` is not ``None``. Nothing on ``self`` is modified.
        """
        monitoring_bus = get_monitoring_bus()

        run_time = timedelta(seconds=time.perf_counter() - start_time)

        metadata = None
        if instanced and instance_name is not None:
            metadata = {'instance-name': instance_name}
        record = create_timer_record(
            self._metric_name, run_time, metadata)
        monitoring_bus.send_record_nowait(record)

177 

178 

def generator_method_duration_metric(name: str):
    """Return a decorator that times a **generator method** and publishes the duration.

    The timer runs inside the generator body: it starts when the decorated
    generator is first advanced and the record is published when the
    generator finishes, is closed, or raises.

    When the owning object has a non-``None`` ``_instance_name`` attribute the
    published record is tagged with it.

    This is kept apart from ``DurationMetric.__call__`` so that the plain-method
    and generator-method cases stay individually readable.

    Usage example

    .. code:: python

        class ExampleInstance:

            @generator_method_duration_metric(EXAMPLE_METHOD_DURATION_NAME)
            def example_method(self, digests, context):
                for digest in digests:
                    yield self._do_something(digest)

    Args:
        name (str): The metric name to publish the method duration under.

    """
    def decorator(func):
        @functools.wraps(func)
        def wrapped_generator_method(obj, *args, **kwargs):
            owner_instance = getattr(obj, '_instance_name', None)
            recorder = DurationMetric(name)
            with recorder:
                if owner_instance is not None:
                    recorder.instanced = True
                    recorder.instance_name = owner_instance
                yield from func(obj, *args, **kwargs)
        return wrapped_generator_method
    return decorator

214 

215 

class Counter:
    """ Generic metric counter, best used as a context manager.

    The accumulated count is published as a counter record when the ``with``
    block exits without an exception.

    Example Usage:

        with Counter("count-size") as size_counter:
            for i in range(10):
                size_counter.increment(i)
    """

    def __init__(self, metric_name: str, instance_name: Optional[str] = None):
        self._counter_lock = threading.Lock()
        self._metric_name = metric_name
        self._instance_name = instance_name
        self._count = 0.0

    @property
    def count(self) -> float:
        return self._count

    @count.setter
    def count(self, value: float) -> None:
        with self._counter_lock:
            self._count = value

    @property
    def metric_name(self) -> str:
        return self._metric_name

    @property
    def instance_name(self) -> Optional[str]:
        return self._instance_name

    @instance_name.setter
    def instance_name(self, name: str) -> None:
        with self._counter_lock:
            self._instance_name = name

    def __enter__(self):
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        # Nothing is published when the block raised.
        if exception_type is not None:
            return
        with self._counter_lock:
            self.publish()

    def increment(self, value: float = 1.0) -> None:
        """Add ``value`` to the current count."""
        with self._counter_lock:
            self._count += value

    def publish(self, reset_counter=True) -> None:
        """Send the current count to the monitoring bus.

        ``instance-name`` metadata is attached when an instance name is set
        (i.e. is not ``None``). The count goes back to zero afterwards unless
        ``reset_counter`` is false. This method does not take the counter lock
        itself.
        """
        bus = get_monitoring_bus()

        name = self._instance_name
        metadata = {'instance-name': name} if name is not None else None

        bus.send_record_nowait(
            create_counter_record(self._metric_name, self._count, metadata))
        if reset_counter:
            self._count = 0.0

276 

277 

class ExceptionCounter(Counter):
    """ Provides a decorator and context manager in order to count exceptions thrown in a function/method body.

    This class inherits from Counter. Its count is set to 1 at construction and
    is published, without being reset, every time one of the configured
    exception types is raised, so a reused instance reports 1 per exception.
    The exception itself always propagates to the caller; a failure to publish
    the metric is logged and never replaces it.

    Example Usage:

        with ExceptionCounter("test", exceptions=(RuntimeError,)) as ec:
            ret_val = do_work()

    Args:
        metric_name: Name the counter is published under.
        exceptions: Exception types to count. Defaults to ``(BgdError,)``.
        *args, **kwargs: Forwarded to ``Counter.__init__``.
    """

    def __init__(self, metric_name: str, *args,
                 exceptions: Tuple[Type[Exception], ...] = (BgdError,), **kwargs):

        super().__init__(metric_name, *args, **kwargs)

        self._exceptions = exceptions

        # Increment the counter to 1, publishing will occur on every exception caught.
        self.increment()

    def __exit__(self, exception_type, exception_value, traceback):
        if exception_value is None:
            return
        if isinstance(exception_value, tuple(self._exceptions)):
            with self._counter_lock:
                self._publish_without_reset(exception_value)

    def __call__(self, func):
        @functools.wraps(func)
        def _exception_wrapper(obj, *args, **kwargs):
            try:
                return func(obj, *args, **kwargs)
            except self._exceptions as e:
                # The lock keeps "set the instance name, then publish" atomic
                # with respect to other callers sharing this decorator.
                with self._counter_lock:
                    if hasattr(obj, '_instance_name'):
                        self._instance_name = obj._instance_name
                    self._publish_without_reset(e)
                # Bare raise keeps the original traceback intact.
                raise
        return _exception_wrapper

    def _publish_without_reset(self, error):
        """Publish the count (kept at 1), logging instead of raising on failure."""
        try:
            self.publish(reset_counter=False)
        except Exception:
            logging.getLogger(__name__).exception(
                f"Exception raised when publishing exception metric of type: {type(error)}.")

320 

321 

def generator_method_exception_counter(name: str, exceptions: Tuple[Type[Exception], ...] = (BgdError,)):
    """Helper function to publish a counter when an exception is raised by a generator method.

    This returns a decorator which publishes a counter metric which measures the
    number of exceptions raised by the decorated **generator method**. A fresh
    ``ExceptionCounter`` is created each time the decorated generator runs, and
    the exception always propagates to the caller.

    This is separate from the ``__call__`` method of ``ExceptionCounter`` to keep the
    code in that method a bit more readable whilst still having acceptable return
    values, as well as to make the difference between the two approaches clear.

    Usage example

    .. code:: python

        class ExampleInstance:

            @generator_method_exception_counter(EXAMPLE_METHOD_EXCEPTION_COUNT_NAME)
            def example_method(self, digests, context):
                for digest in digests:
                    yield self._do_something(digest)

    Args:
        name (str): The metric name to publish the exception count under.
        exceptions (tuple): Tuple of Exception types to count. Defaults to ``BgdError``.

    """
    def decorator(func):
        @functools.wraps(func)
        def wrapped_generator_method(obj, *args, **kwargs):
            # NOTE(review): unlike generator_method_duration_metric, the owning
            # object's ``_instance_name`` is not forwarded here, so the record
            # carries no instance-name metadata -- confirm this is intended.
            with ExceptionCounter(name, exceptions=exceptions):
                yield from func(obj, *args, **kwargs)
        return wrapped_generator_method
    return decorator

354 

355 

class Distribution(Counter):
    """ Generic metric published with Distribution semantics.

    Behaves like ``Counter`` except that ``publish`` emits a distribution
    record, and ``instance-name`` metadata is attached only when the instance
    name is non-empty.
    """

    def __init__(self, metric_name: str, instance_name: str = ""):
        super().__init__(metric_name, instance_name)

    def publish(self, reset_counter=True) -> None:
        """Send the current value to the monitoring bus as a distribution record.

        The value goes back to zero afterwards unless ``reset_counter`` is false.
        """
        bus = get_monitoring_bus()

        metadata = None
        if self._instance_name:
            metadata = {'instance-name': self._instance_name}

        bus.send_record_nowait(
            create_distribution_record(self._metric_name, self._count, metadata))
        if reset_counter:
            self._count = 0.0