Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cleanup/cleanup.py: 68.31%

142 statements  

coverage.py v7.4.1, created at 2025-05-28 16:48 +0000

# Copyright (C) 2020 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import threading
import time
from contextlib import ExitStack
from datetime import datetime, timezone
from typing import Any

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid.server.app.settings.config import CleanupConfig
from buildgrid.server.cas.storage.index.index_abc import IndexABC
from buildgrid.server.context import instance_context
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.metrics_names import METRIC
from buildgrid.server.metrics_utils import publish_counter_metric, publish_gauge_metric, timer
from buildgrid.server.monitoring import get_monitoring_bus
from buildgrid.server.threading import ContextThreadPoolExecutor, ContextWorker

LOGGER = buildgrid_logger(__name__)


def _digests_str(digests: list[Digest]) -> str:
    return f"{len(digests)} digests ({sum(d.size_bytes for d in digests)} bytes)"


class CASCleanUp:
    """Creates a CAS cleanup service."""

    def __init__(
        self,
        dry_run: bool,
        sleep_interval: int,
        cleanup_configs: list[CleanupConfig],
        monitor: bool,
    ) -> None:
        self._stack = ExitStack()
        self._dry_run = dry_run
        self._sleep_interval = sleep_interval

        self._cleanup_configs = cleanup_configs

        self._is_instrumented = monitor

    # --- Public API ---

    def start(self, timeout: float | None = None) -> None:
        """Start cleanup service"""
        if self._is_instrumented:
            self._stack.enter_context(get_monitoring_bus())
        worker = self._stack.enter_context(ContextWorker(self._begin_cleanup, "CleanUpLauncher"))
        worker.wait(timeout=timeout)

    def stop(self, *args: Any, **kwargs: Any) -> None:
        """Stops the cleanup service"""
        LOGGER.info("Stopping Cleanup Service.")
        self._stack.close()

    def _begin_cleanup(self, stop_requested: threading.Event) -> None:
        if self._dry_run:
            for options in self._cleanup_configs:
                self._calculate_cleanup(options)
            return

        with ContextThreadPoolExecutor(max_workers=len(self._cleanup_configs)) as ex:
            futures = {
                config.name: ex.submit(self._retry_cleanup, config, stop_requested) for config in self._cleanup_configs
            }

            for instance_name, future in futures.items():
                try:
                    future.result()
                except Exception:
                    LOGGER.error("Cleanup exceeded retry limit.", tags=dict(instance_name=instance_name))

    def _retry_cleanup(self, config: CleanupConfig, stop_requested: threading.Event) -> None:
        attempts = 0
        while attempts <= config.retry_limit:
            try:
                self._cleanup_worker(config, stop_requested)
                LOGGER.info("Cleanup completed.", tags=dict(instance_name=config.name))
                break
            except Exception:
                LOGGER.exception("Cleanup failed.", tags=dict(instance_name=config.name))

                # Exponential backoff before retrying
                sleep_time = 1.6**attempts
                LOGGER.info("Retrying Cleanup after delay...", tags=dict(sleep_time_seconds=sleep_time))
                stop_requested.wait(timeout=sleep_time)
                attempts += 1
                continue

    def _calculate_cleanup(self, config: CleanupConfig) -> None:
        """Work out which blobs will be deleted by the cleanup command."""
        instance_name = config.name
        with instance_context(instance_name):
            LOGGER.info("Cleanup dry run.", tags=dict(instance_name=instance_name))
            index = config.index
            only_delete_before = datetime.now(timezone.utc) - config.only_if_unused_for
            large_blob_only_delete_before = (
                datetime.now(timezone.utc) - config.large_blob_lifetime if config.large_blob_lifetime else None
            )
            total_size = index.get_total_size()
            LOGGER.info(
                "Calculated CAS size.",
                tags=dict(
                    total_size=total_size,
                    high_watermark_bytes=config.high_watermark,
                    low_watermark_bytes=config.low_watermark,
                ),
            )
            if total_size >= config.high_watermark:
                required_space = total_size - config.low_watermark
                cleared_space = index.delete_n_bytes(
                    required_space,
                    dry_run=True,
                    protect_blobs_after=only_delete_before,
                    large_blob_threshold=config.large_blob_threshold,
                    large_blob_lifetime=large_blob_only_delete_before,
                )
                LOGGER.info(f"Determined {cleared_space} of the requested {required_space} bytes would be deleted.")
            else:
                LOGGER.info(f"Total size {total_size} is less than the high water mark, nothing will be deleted.")

    def _do_cleanup_batch(
        self,
        index: IndexABC,
        only_delete_before: datetime,
        batch_size: int,
        large_blob_only_delete_before: datetime | None,
        large_blob_threshold: int | None,
    ) -> int:
        batch_start_time = time.time()

        LOGGER.info("Deleting bytes from the index.", tags=dict(batch_size=batch_size))
        bytes_deleted = index.delete_n_bytes(
            batch_size,
            protect_blobs_after=only_delete_before,
            large_blob_threshold=large_blob_threshold,
            large_blob_lifetime=large_blob_only_delete_before,
        )

        LOGGER.info("Bulk deleted bytes from index.", tags=dict(bytes_deleted=bytes_deleted))

        if self._is_instrumented and bytes_deleted > 0:
            batch_duration = time.time() - batch_start_time
            bytes_deleted_per_second = bytes_deleted / batch_duration
            publish_gauge_metric(METRIC.CLEANUP.BYTES_DELETED_PER_SECOND, bytes_deleted_per_second)
            publish_counter_metric(METRIC.CLEANUP.BYTES_DELETED_COUNT, bytes_deleted)
        return bytes_deleted

    def _cleanup_worker(self, cleanup_config: CleanupConfig, stop_requested: threading.Event) -> None:
        """Cleanup when full"""
        instance_name = cleanup_config.name
        with instance_context(instance_name):
            index = cleanup_config.index
            LOGGER.info("Cleanup started.", tags=dict(instance_name=instance_name))

            while not stop_requested.is_set():
                # When first starting a loop, we will also include any remaining delete markers as part of
                # the total size.
                total_size = index.get_total_size()
                self.publish_total_size_metric(total_size, cleanup_config.high_watermark, cleanup_config.low_watermark)

                blob_count = index.get_blob_count()
                blob_count_range: range | None = None
                if (
                    cleanup_config.high_blob_count_watermark is not None
                    and cleanup_config.low_blob_count_watermark is not None
                ):
                    blob_count_range = range(
                        cleanup_config.low_blob_count_watermark,
                        cleanup_config.high_blob_count_watermark,
                    )
                self.publish_blob_count_metric(blob_count, blob_count_range)

                if total_size >= cleanup_config.high_watermark or (
                    blob_count_range is not None and blob_count >= blob_count_range.stop
                ):
                    bytes_to_delete = total_size - cleanup_config.low_watermark
                    if bytes_to_delete > 0:
                        LOGGER.info(
                            "High bytes watermark exceeded. Deleting items from storage/index.",
                            tags=dict(
                                total_bytes=total_size,
                                min_bytes_to_delete=bytes_to_delete,
                                instance_name=instance_name,
                            ),
                        )
                    if blob_count_range is not None:
                        blobs_to_delete = blob_count - blob_count_range.start
                        if blobs_to_delete > 0:
                            LOGGER.info(
                                "High blob count watermark exceeded. Deleting items from storage/index.",
                                tags=dict(
                                    total_blobs=blob_count,
                                    min_blobs_to_delete=blobs_to_delete,
                                    instance_name=instance_name,
                                ),
                            )

                    with timer(METRIC.CLEANUP.DURATION):
                        while not stop_requested.is_set() and (
                            total_size > cleanup_config.low_watermark
                            or (blob_count_range is not None and blob_count > blob_count_range.start)
                        ):
                            only_delete_before = datetime.now(timezone.utc) - cleanup_config.only_if_unused_for
                            large_blob_only_delete_before = (
                                datetime.now(timezone.utc) - cleanup_config.large_blob_lifetime
                                if cleanup_config.large_blob_lifetime is not None
                                else None
                            )
                            with timer(METRIC.CLEANUP.BATCH_DURATION):
                                bytes_deleted = self._do_cleanup_batch(
                                    index=index,
                                    only_delete_before=only_delete_before,
                                    batch_size=cleanup_config.batch_size,
                                    large_blob_threshold=cleanup_config.large_blob_threshold,
                                    large_blob_only_delete_before=large_blob_only_delete_before,
                                )
                            if not bytes_deleted:
                                err = "Marked 0 digests for deletion, even though cleanup was triggered."
                                if total_size >= cleanup_config.high_watermark:
                                    LOGGER.error(f"{err} Total size still remains greater than high watermark!")
                                elif blob_count_range is not None and blob_count >= blob_count_range.stop:
                                    LOGGER.error(f"{err} Blob count remains greater than high watermark!")
                                else:
                                    LOGGER.warning(err)
                                stop_requested.wait(
                                    timeout=self._sleep_interval
                                )  # Avoid a busy loop when we can't make progress
                            total_size = index.get_total_size()
                            blob_count = index.get_blob_count()
                            self.publish_total_size_metric(
                                total_size, cleanup_config.high_watermark, cleanup_config.low_watermark
                            )
                            self.publish_blob_count_metric(blob_count, blob_count_range)
                            LOGGER.info("Finished cleanup batch.", tags=dict(non_stale_total_bytes=total_size))

                    LOGGER.info("Finished cleanup.", tags=dict(total_bytes=total_size))

                stop_requested.wait(timeout=self._sleep_interval)

    def publish_total_size_metric(self, total_size: int, high_watermark: int, low_watermark: int) -> None:
        if self._is_instrumented:
            high_watermark_percentage = float((total_size / high_watermark) * 100) if high_watermark > 0 else 0
            publish_gauge_metric(METRIC.CLEANUP.TOTAL_BYTES_COUNT, total_size)
            publish_gauge_metric(METRIC.CLEANUP.LOW_WATERMARK_BYTES_COUNT, low_watermark)
            publish_gauge_metric(METRIC.CLEANUP.HIGH_WATERMARK_BYTES_COUNT, high_watermark)
            publish_gauge_metric(METRIC.CLEANUP.TOTAL_BYTES_WATERMARK_PERCENT, high_watermark_percentage)

    def publish_blob_count_metric(self, blob_count: int, blob_count_range: range | None) -> None:
        if self._is_instrumented:
            publish_gauge_metric(METRIC.CLEANUP.TOTAL_BLOBS_COUNT, blob_count)
            if blob_count_range is not None:
                if blob_count_range.stop > 0:
                    high_watermark_percentage = float((blob_count / blob_count_range.stop) * 100)
                else:
                    high_watermark_percentage = 0
                publish_gauge_metric(METRIC.CLEANUP.LOW_WATERMARK_BLOBS_COUNT, blob_count_range.start)
                publish_gauge_metric(METRIC.CLEANUP.HIGH_WATERMARK_BLOBS_COUNT, blob_count_range.stop)
                publish_gauge_metric(METRIC.CLEANUP.TOTAL_BLOBS_WATERMARK_PERCENT, high_watermark_percentage)
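
For reference, below is a minimal usage sketch of the public API shown above (the constructor arguments plus start/stop). The run_cleanup helper and the literal argument values are illustrative assumptions, not part of this module; in a real deployment the CleanupConfig objects come from the BuildGrid settings, not hand-built code.

# Hypothetical usage sketch, not part of cleanup.py.
import signal

from buildgrid.server.app.settings.config import CleanupConfig
from buildgrid.server.cleanup.cleanup import CASCleanUp


def run_cleanup(configs: list[CleanupConfig]) -> None:
    service = CASCleanUp(
        dry_run=False,       # actually delete blobs instead of only reporting what would be removed
        sleep_interval=60,   # seconds to wait between cleanup passes (illustrative value)
        cleanup_configs=configs,
        monitor=False,       # set True to publish cleanup metrics via the monitoring bus
    )
    # stop() accepts *args/**kwargs, so it can be registered directly as a signal handler;
    # it closes the service's ExitStack, which shuts the cleanup worker down.
    signal.signal(signal.SIGTERM, service.stop)
    signal.signal(signal.SIGINT, service.stop)
    service.start()  # blocks until the cleanup worker exits or stop() is called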