Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cleanup/cleanup.py: 68.31%
142 statements
# Copyright (C) 2020 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import threading
import time
from contextlib import ExitStack
from datetime import datetime, timezone
from typing import Any

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid.server.app.settings.config import CleanupConfig
from buildgrid.server.cas.storage.index.index_abc import IndexABC
from buildgrid.server.context import instance_context
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.metrics_names import METRIC
from buildgrid.server.metrics_utils import publish_counter_metric, publish_gauge_metric, timer
from buildgrid.server.monitoring import get_monitoring_bus
from buildgrid.server.threading import ContextThreadPoolExecutor, ContextWorker

LOGGER = buildgrid_logger(__name__)
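

# Summarises a list of digests for log messages as a count plus the total
# payload size in bytes, e.g. "3 digests (4096 bytes)".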
def _digests_str(digests: list[Digest]) -> str:
    return f"{len(digests)} digests ({sum(d.size_bytes for d in digests)} bytes)"


class CASCleanUp:
    """Creates a CAS cleanup service."""

    def __init__(
        self,
        dry_run: bool,
        sleep_interval: int,
        cleanup_configs: list[CleanupConfig],
        monitor: bool,
    ) -> None:
        self._stack = ExitStack()

        self._dry_run = dry_run
        self._sleep_interval = sleep_interval

        self._cleanup_configs = cleanup_configs

        self._is_instrumented = monitor

    # --- Public API ---

    def start(self, timeout: float | None = None) -> None:
        """Start the cleanup service."""
        if self._is_instrumented:
            self._stack.enter_context(get_monitoring_bus())
        worker = self._stack.enter_context(ContextWorker(self._begin_cleanup, "CleanUpLauncher"))
        worker.wait(timeout=timeout)
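
    # Note: ``ContextWorker`` is expected to run ``_begin_cleanup`` on a
    # background thread owned by the ExitStack; ``wait()`` then blocks the
    # caller until cleanup exits or the timeout elapses.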

    def stop(self, *args: Any, **kwargs: Any) -> None:
        """Stop the cleanup service."""
        LOGGER.info("Stopping Cleanup Service.")
        self._stack.close()

    def _begin_cleanup(self, stop_requested: threading.Event) -> None:
        if self._dry_run:
            for options in self._cleanup_configs:
                self._calculate_cleanup(options)
            return

        with ContextThreadPoolExecutor(max_workers=len(self._cleanup_configs)) as ex:
            futures = {
                config.name: ex.submit(self._retry_cleanup, config, stop_requested) for config in self._cleanup_configs
            }

            for instance_name, future in futures.items():
                try:
                    future.result()
                except Exception:
                    LOGGER.error("Cleanup exceeded retry limit.", tags=dict(instance_name=instance_name))
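
    # ``future.result()`` re-raises any exception from an instance's cleanup
    # worker; catching it per-future means one failing instance is logged
    # without aborting cleanup for the others.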

    def _retry_cleanup(self, config: CleanupConfig, stop_requested: threading.Event) -> None:
        attempts = 0
        while attempts <= config.retry_limit:
            try:
                self._cleanup_worker(config, stop_requested)
                LOGGER.info("Cleanup completed.", tags=dict(instance_name=config.name))
                break
            except Exception:
                LOGGER.exception("Cleanup failed.", tags=dict(instance_name=config.name))

                # Exponential backoff before retrying
                sleep_time = 1.6**attempts
                LOGGER.info("Retrying Cleanup after delay...", tags=dict(sleep_time_seconds=sleep_time))
                stop_requested.wait(timeout=sleep_time)
                attempts += 1
                continue
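
    # Backoff sketch: ``1.6**attempts`` yields delays of roughly 1.0, 1.6,
    # 2.56, 4.10, 6.55, ... seconds between successive attempts.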

    def _calculate_cleanup(self, config: CleanupConfig) -> None:
        """Work out which blobs will be deleted by the cleanup command."""
        instance_name = config.name
        with instance_context(instance_name):
            LOGGER.info("Cleanup dry run.", tags=dict(instance_name=instance_name))
            index = config.index
            only_delete_before = datetime.now(timezone.utc) - config.only_if_unused_for
            large_blob_only_delete_before = (
                datetime.now(timezone.utc) - config.large_blob_lifetime if config.large_blob_lifetime else None
            )
            total_size = index.get_total_size()
            LOGGER.info(
                "Calculated CAS size.",
                tags=dict(
                    total_size=total_size,
                    high_watermark_bytes=config.high_watermark,
                    low_watermark_bytes=config.low_watermark,
                ),
            )
            if total_size >= config.high_watermark:
                required_space = total_size - config.low_watermark
                cleared_space = index.delete_n_bytes(
                    required_space,
                    dry_run=True,
                    protect_blobs_after=only_delete_before,
                    large_blob_threshold=config.large_blob_threshold,
                    large_blob_lifetime=large_blob_only_delete_before,
                )
                LOGGER.info(f"Determined {cleared_space} of the requested {required_space} bytes would be deleted.")
            else:
                LOGGER.info(f"Total size {total_size} is less than the high water mark, nothing will be deleted.")
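
    # Watermark arithmetic sketch: with a 100 GiB high watermark, an 80 GiB
    # low watermark, and 110 GiB currently stored, cleanup triggers and asks
    # the index to clear 110 - 80 = 30 GiB (illustrative numbers only).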

    def _do_cleanup_batch(
        self,
        index: IndexABC,
        only_delete_before: datetime,
        batch_size: int,
        large_blob_only_delete_before: datetime | None,
        large_blob_threshold: int | None,
    ) -> int:
        batch_start_time = time.time()

        LOGGER.info("Deleting bytes from the index.", tags=dict(batch_size=batch_size))
        bytes_deleted = index.delete_n_bytes(
            batch_size,
            protect_blobs_after=only_delete_before,
            large_blob_threshold=large_blob_threshold,
            large_blob_lifetime=large_blob_only_delete_before,
        )

        LOGGER.info("Bulk deleted bytes from index.", tags=dict(bytes_deleted=bytes_deleted))

        if self._is_instrumented and bytes_deleted > 0:
            batch_duration = time.time() - batch_start_time
            bytes_deleted_per_second = bytes_deleted / batch_duration
            publish_gauge_metric(METRIC.CLEANUP.BYTES_DELETED_PER_SECOND, bytes_deleted_per_second)
            publish_counter_metric(METRIC.CLEANUP.BYTES_DELETED_COUNT, bytes_deleted)
        return bytes_deleted
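
    # Metric sketch: deleting a 512 MiB batch in 4 seconds would publish a
    # BYTES_DELETED_PER_SECOND gauge of 128 MiB/s alongside the cumulative
    # BYTES_DELETED_COUNT counter (illustrative numbers only).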

    def _cleanup_worker(self, cleanup_config: CleanupConfig, stop_requested: threading.Event) -> None:
        """Clean up the CAS when it gets too full."""
        instance_name = cleanup_config.name
        with instance_context(instance_name):
            index = cleanup_config.index
            LOGGER.info("Cleanup started.", tags=dict(instance_name=instance_name))

            while not stop_requested.is_set():
                # When the loop first starts, any remaining delete markers are also
                # included in the total size.
                total_size = index.get_total_size()
                self.publish_total_size_metric(total_size, cleanup_config.high_watermark, cleanup_config.low_watermark)

                blob_count = index.get_blob_count()
                blob_count_range: range | None = None
                if (
                    cleanup_config.high_blob_count_watermark is not None
                    and cleanup_config.low_blob_count_watermark is not None
                ):
                    blob_count_range = range(
                        cleanup_config.low_blob_count_watermark,
                        cleanup_config.high_blob_count_watermark,
                    )
                self.publish_blob_count_metric(blob_count, blob_count_range)

                if total_size >= cleanup_config.high_watermark or (
                    blob_count_range is not None and blob_count >= blob_count_range.stop
                ):
                    bytes_to_delete = total_size - cleanup_config.low_watermark
                    if bytes_to_delete > 0:
                        LOGGER.info(
                            "High bytes watermark exceeded. Deleting items from storage/index.",
                            tags=dict(
                                total_bytes=total_size,
                                min_bytes_to_delete=bytes_to_delete,
                                instance_name=instance_name,
                            ),
                        )
                    if blob_count_range is not None:
                        blobs_to_delete = blob_count - blob_count_range.start
                        if blobs_to_delete > 0:
                            LOGGER.info(
                                "High blob count watermark exceeded. Deleting items from storage/index.",
                                tags=dict(
                                    total_blobs=blob_count,
                                    min_blobs_to_delete=blobs_to_delete,
                                    instance_name=instance_name,
                                ),
                            )

                    with timer(METRIC.CLEANUP.DURATION):
                        while not stop_requested.is_set() and (
                            total_size > cleanup_config.low_watermark
                            or (blob_count_range is not None and blob_count > blob_count_range.start)
                        ):
                            only_delete_before = datetime.now(timezone.utc) - cleanup_config.only_if_unused_for
                            large_blob_only_delete_before = (
                                datetime.now(timezone.utc) - cleanup_config.large_blob_lifetime
                                if cleanup_config.large_blob_lifetime is not None
                                else None
                            )
                            with timer(METRIC.CLEANUP.BATCH_DURATION):
                                bytes_deleted = self._do_cleanup_batch(
                                    index=index,
                                    only_delete_before=only_delete_before,
                                    batch_size=cleanup_config.batch_size,
                                    large_blob_threshold=cleanup_config.large_blob_threshold,
                                    large_blob_only_delete_before=large_blob_only_delete_before,
                                )
                            if not bytes_deleted:
                                err = "Marked 0 digests for deletion, even though cleanup was triggered."
                                if total_size >= cleanup_config.high_watermark:
                                    LOGGER.error(f"{err} Total size still remains greater than high watermark!")
                                elif blob_count_range is not None and blob_count >= blob_count_range.stop:
                                    LOGGER.error(f"{err} Blob count remains greater than high watermark!")
                                else:
                                    LOGGER.warning(err)
                                stop_requested.wait(
                                    timeout=self._sleep_interval
                                )  # Avoid a busy loop when we can't make progress
                            total_size = index.get_total_size()
                            blob_count = index.get_blob_count()
                            self.publish_total_size_metric(
                                total_size, cleanup_config.high_watermark, cleanup_config.low_watermark
                            )
                            self.publish_blob_count_metric(blob_count, blob_count_range)
                            LOGGER.info("Finished cleanup batch.", tags=dict(non_stale_total_bytes=total_size))
                    LOGGER.info("Finished cleanup.", tags=dict(total_bytes=total_size))

                stop_requested.wait(timeout=self._sleep_interval)
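
    # Control-flow note: the outer loop wakes every ``sleep_interval`` seconds
    # to poll the index, and once a high watermark is crossed the inner loop
    # keeps deleting batches until both the total size and the blob count drop
    # back to their low watermarks (or a stop is requested).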

    def publish_total_size_metric(self, total_size: int, high_watermark: int, low_watermark: int) -> None:
        if self._is_instrumented:
            high_watermark_percentage = float((total_size / high_watermark) * 100) if high_watermark > 0 else 0
            publish_gauge_metric(METRIC.CLEANUP.TOTAL_BYTES_COUNT, total_size)
            publish_gauge_metric(METRIC.CLEANUP.LOW_WATERMARK_BYTES_COUNT, low_watermark)
            publish_gauge_metric(METRIC.CLEANUP.HIGH_WATERMARK_BYTES_COUNT, high_watermark)
            publish_gauge_metric(METRIC.CLEANUP.TOTAL_BYTES_WATERMARK_PERCENT, high_watermark_percentage)
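
    # Example: 90 GiB stored against a 100 GiB high watermark publishes a
    # TOTAL_BYTES_WATERMARK_PERCENT gauge of 90.0 (illustrative numbers only).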

    def publish_blob_count_metric(self, blob_count: int, blob_count_range: range | None) -> None:
        if self._is_instrumented:
            publish_gauge_metric(METRIC.CLEANUP.TOTAL_BLOBS_COUNT, blob_count)
            if blob_count_range is not None:
                if blob_count_range.stop > 0:
                    high_watermark_percentage = float((blob_count / blob_count_range.stop) * 100)
                else:
                    high_watermark_percentage = 0
                publish_gauge_metric(METRIC.CLEANUP.LOW_WATERMARK_BLOBS_COUNT, blob_count_range.start)
                publish_gauge_metric(METRIC.CLEANUP.HIGH_WATERMARK_BLOBS_COUNT, blob_count_range.stop)
                publish_gauge_metric(METRIC.CLEANUP.TOTAL_BLOBS_WATERMARK_PERCENT, high_watermark_percentage)
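

# A minimal usage sketch (not part of the original module), assuming a
# ``CleanupConfig`` has already been built from the server settings in
# buildgrid.server.app.settings.config; all values are illustrative:
#
#     cleanup = CASCleanUp(
#         dry_run=False,
#         sleep_interval=30,
#         cleanup_configs=[config],
#         monitor=False,
#     )
#     try:
#         cleanup.start()
#     finally:
#         cleanup.stop()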