Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cleanup/cleanup.py: 71.93%
114 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
1# Copyright (C) 2020 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16import threading
17import time
18from contextlib import ExitStack
19from datetime import datetime, timedelta
20from typing import Any, Dict, List, Optional
22from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
23from buildgrid.server.cas.storage.index.index_abc import IndexABC
24from buildgrid.server.context import instance_context
25from buildgrid.server.logging import buildgrid_logger
26from buildgrid.server.metrics_names import METRIC
27from buildgrid.server.metrics_utils import publish_counter_metric, publish_gauge_metric, timer
28from buildgrid.server.monitoring import get_monitoring_bus
29from buildgrid.server.threading import ContextThreadPoolExecutor, ContextWorker
# Structured logger shared by all cleanup code in this module.
LOGGER = buildgrid_logger(__name__)
34def _digests_str(digests: List[Digest]) -> str:
35 return f"{len(digests)} digests ({sum(d.size_bytes for d in digests)} bytes)"
class CASCleanUp:
    """Watermark-based cleanup service for CAS indexes.

    One cleanup worker runs per configured instance.  When an instance's total
    stored size reaches ``high_watermark``, the worker deletes blobs (oldest
    access first, via the index) in batches until total size drops below
    ``low_watermark``.  In ``dry_run`` mode the service only reports what
    would be deleted and then returns.
    """

    # Cap (seconds) for the exponential retry backoff: 1.6 ** attempts is
    # unbounded, and an uncapped delay would effectively stop retrying forever
    # after a long run of failures.
    _MAX_RETRY_DELAY_SECONDS = 60.0

    def __init__(
        self,
        dry_run: bool,
        high_watermark: int,
        low_watermark: int,
        sleep_interval: int,
        batch_size: int,
        only_if_unused_for: timedelta,
        indexes: Dict[str, IndexABC],
        monitor: bool,
    ) -> None:
        """
        Args:
            dry_run: If True, only log what cleanup would delete; never delete.
            high_watermark: Total size (bytes) at which cleanup is triggered.
            low_watermark: Target size (bytes) that cleanup deletes down to.
            sleep_interval: Seconds to wait between cleanup checks (and when a
                batch cannot make progress).
            batch_size: Maximum number of bytes to delete per cleanup batch.
            only_if_unused_for: Protect blobs accessed within this window.
            indexes: Mapping from instance name to the index to clean.
            monitor: Whether to publish metrics to the monitoring bus.
        """
        self._stack = ExitStack()

        self._dry_run = dry_run

        self._high_watermark = high_watermark
        self._low_watermark = low_watermark
        self._batch_size = batch_size
        self._only_if_unused_for = only_if_unused_for

        self._indexes = indexes

        self._is_instrumented = monitor

        self._sleep_interval = sleep_interval

    # --- Public API ---

    def start(self, timeout: Optional[float] = None) -> None:
        """Start the cleanup service and block until it finishes or times out.

        Args:
            timeout: Maximum seconds to wait for the worker; None waits forever.
        """
        if self._is_instrumented:
            self._stack.enter_context(get_monitoring_bus())
        worker = self._stack.enter_context(ContextWorker(self._begin_cleanup, "CleanUpLauncher"))
        worker.wait(timeout=timeout)

    def stop(self, *args: Any, **kwargs: Any) -> None:
        """Stop the cleanup service, closing the worker and monitoring bus."""
        LOGGER.info("Stopping Cleanup Service.")
        self._stack.close()

    def _begin_cleanup(self, stop_requested: threading.Event) -> None:
        """Run cleanup for all instances, retrying with capped backoff on failure.

        Args:
            stop_requested: Event set when the service is shutting down.
        """
        if self._dry_run:
            for instance_name in self._indexes.keys():
                self._calculate_cleanup(instance_name)
            return

        if not self._indexes:
            # ThreadPoolExecutor raises ValueError for max_workers=0, and with
            # no indexes configured there is nothing to clean anyway.
            LOGGER.warning("No indexes configured for cleanup.")
            return

        attempts = 0
        with ContextThreadPoolExecutor(max_workers=len(self._indexes)) as ex:
            while True:
                futures = {
                    instance_name: ex.submit(self._cleanup_worker, instance_name, stop_requested)
                    for instance_name in self._indexes.keys()
                }

                failed = False
                for instance_name, future in futures.items():
                    try:
                        future.result()
                        LOGGER.info("Cleanup completed.", tags=dict(instance_name=instance_name))
                    except Exception:
                        LOGGER.exception("Cleanup failed.", tags=dict(instance_name=instance_name))
                        failed = True

                if not failed:
                    break

                # Exponential backoff before retrying, capped so the delay
                # stays bounded even after many consecutive failures.
                sleep_time = min(1.6**attempts, self._MAX_RETRY_DELAY_SECONDS)
                LOGGER.info("Retrying Cleanup after delay...", tags=dict(sleep_time_seconds=sleep_time))
                stop_requested.wait(timeout=sleep_time)
                attempts += 1

    def _calculate_cleanup(self, instance_name: str) -> None:
        """Work out which blobs will be deleted by the cleanup command.

        Dry-run only: logs the CAS size and how many bytes a real cleanup
        would reclaim, without deleting anything.
        """
        with instance_context(instance_name):
            LOGGER.info("Cleanup dry run.", tags=dict(instance_name=instance_name))
            index = self._indexes[instance_name]
            only_delete_before = self._get_last_accessed_threshold()
            total_size = index.get_total_size()
            LOGGER.info(
                "Calculated CAS size.",
                tags=dict(
                    total_size=total_size,
                    high_watermark_bytes=self._high_watermark,
                    low_watermark_bytes=self._low_watermark,
                ),
            )
            if total_size >= self._high_watermark:
                required_space = total_size - self._low_watermark
                cleared_space = index.delete_n_bytes(
                    required_space, dry_run=True, protect_blobs_after=only_delete_before
                )
                LOGGER.info(f"Determined {cleared_space} of the requested {required_space} bytes would be deleted.")
            else:
                LOGGER.info(f"Total size {total_size} is less than the high water mark, nothing will be deleted.")

    def _do_cleanup_batch(
        self,
        instance_name: str,
        index: IndexABC,
        only_delete_before: datetime,
        total_size: int,
        stop_requested: threading.Event,
    ) -> None:
        """Delete up to one batch of bytes from the index and publish metrics.

        Args:
            instance_name: Instance being cleaned (currently unused here; kept
                for call-site symmetry with the other per-instance methods).
            index: Index to delete from.
            only_delete_before: Blobs accessed after this time are protected.
            total_size: Current total CAS size, used only to decide log severity
                when no progress can be made.
            stop_requested: Shutdown event, used to sleep interruptibly when the
                batch deletes nothing.
        """
        batch_start_time = time.time()

        LOGGER.info("Deleting bytes from the index.", tags=dict(batch_size=self._batch_size))
        bytes_deleted = index.delete_n_bytes(self._batch_size, protect_blobs_after=only_delete_before)

        if not bytes_deleted:
            err = (
                "Marked 0 digests for deletion, even though cleanup was triggered. "
                "This may be because the remaining digests have been accessed within "
                f"{only_delete_before}."
            )
            if total_size >= self._high_watermark:
                # Still above the trigger threshold and unable to delete anything:
                # this is an error condition worth surfacing loudly.
                LOGGER.error(f"{err} Total size still remains greater than high watermark!")
            else:
                LOGGER.warning(err)
            stop_requested.wait(timeout=self._sleep_interval)  # Avoid a busy loop when we can't make progress
            return

        LOGGER.info("Bulk deleted bytes from index.", tags=dict(bytes_deleted=bytes_deleted))

        if self._is_instrumented:
            batch_duration = time.time() - batch_start_time
            bytes_deleted_per_second = bytes_deleted / batch_duration
            publish_gauge_metric(METRIC.CLEANUP.BYTES_DELETED_PER_SECOND, bytes_deleted_per_second)
            publish_counter_metric(METRIC.CLEANUP.BYTES_DELETED_COUNT, bytes_deleted)

    def _cleanup_worker(self, instance_name: str, stop_requested: threading.Event) -> None:
        """Per-instance cleanup loop: delete down to the low watermark when full.

        Runs until ``stop_requested`` is set, checking the total size every
        ``sleep_interval`` seconds.
        """
        with instance_context(instance_name):
            index = self._indexes[instance_name]
            LOGGER.info("Cleanup started.", tags=dict(instance_name=instance_name))

            while not stop_requested.is_set():
                # When first starting a loop, we will also include any remaining delete markers as part of
                # the total size.
                total_size = index.get_total_size()
                self.publish_total_size_metric(total_size)

                if total_size >= self._high_watermark:
                    to_delete = total_size - self._low_watermark
                    LOGGER.info(
                        "High watermark exceeded. Deleting items from storage/index.",
                        tags=dict(total_bytes=total_size, min_bytes_to_delete=to_delete, instance_name=instance_name),
                    )

                    with timer(METRIC.CLEANUP.DURATION):
                        while not stop_requested.is_set() and total_size > self._low_watermark:
                            # Recompute the protection threshold per batch so a
                            # long-running cleanup keeps a fresh access window.
                            only_delete_before = self._get_last_accessed_threshold()
                            with timer(METRIC.CLEANUP.BATCH_DURATION):
                                self._do_cleanup_batch(
                                    instance_name=instance_name,
                                    index=index,
                                    only_delete_before=only_delete_before,
                                    total_size=total_size,
                                    stop_requested=stop_requested,
                                )
                            total_size = index.get_total_size()
                            self.publish_total_size_metric(total_size)
                            LOGGER.info("Finished cleanup batch.", tags=dict(non_stale_total_bytes=total_size))
                    LOGGER.info("Finished cleanup.", tags=dict(total_bytes=total_size))

                stop_requested.wait(timeout=self._sleep_interval)

    def _get_last_accessed_threshold(self) -> datetime:
        """Return the naive-UTC cutoff before which blobs may be deleted."""
        # NOTE(review): datetime.utcnow() is deprecated since Python 3.12; the
        # naive result presumably matches naive timestamps stored by the index,
        # so switching to timezone-aware datetimes needs a coordinated change.
        return datetime.utcnow() - self._only_if_unused_for

    def publish_total_size_metric(self, total_size: int) -> None:
        """Publish the current total size and both watermarks, if instrumented."""
        if self._is_instrumented:
            publish_gauge_metric(METRIC.CLEANUP.TOTAL_BYTES_COUNT, total_size)
            publish_gauge_metric(METRIC.CLEANUP.LOW_WATERMARK_BYTES_COUNT, self._low_watermark)
            publish_gauge_metric(METRIC.CLEANUP.HIGH_WATERMARK_BYTES_COUNT, self._high_watermark)