Coverage for /builds/BuildGrid/buildgrid/buildgrid/cleanup/cleanup.py: 73.83%
107 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-04-15 14:01 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-04-15 14:01 +0000
1# Copyright (C) 2020 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16import logging
17import threading
18import time
19from contextlib import ExitStack
20from datetime import datetime, timedelta
21from typing import Any, Dict, List, Optional
23from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
24from buildgrid.server.cas.storage.index.index_abc import IndexABC
25from buildgrid.server.metrics_names import CLEANUP_BYTES_DELETION_RATE_METRIC_NAME, CLEANUP_RUNTIME_METRIC_NAME
26from buildgrid.server.metrics_utils import DurationMetric, publish_gauge_metric
27from buildgrid.server.monitoring import (
28 MonitoringBus,
29 MonitoringOutputFormat,
30 MonitoringOutputType,
31 get_monitoring_bus,
32 set_monitoring_bus,
33)
34from buildgrid.server.threading import ContextThreadPoolExecutor, ContextWorker
36LOGGER = logging.getLogger(__name__)
39def _digests_str(digests: List[Digest]) -> str:
40 return f"{len(digests)} digests ({sum(d.size_bytes for d in digests)} bytes)"
class CASCleanUp:
    """Creates a CAS cleanup service.

    Watches the total size of each configured index and, whenever it
    exceeds ``high_watermark`` bytes, deletes least-recently-used blobs
    in batches of ``batch_size`` bytes until the size drops below
    ``low_watermark``. Blobs accessed within ``only_if_unused_for`` are
    never deleted. In dry-run mode the service only reports what would
    be deleted and exits.
    """

    def __init__(
        self,
        dry_run: bool,
        high_watermark: int,
        low_watermark: int,
        sleep_interval: int,
        batch_size: int,
        only_if_unused_for: timedelta,
        indexes: Dict[str, IndexABC],
        monitor: bool,
        mon_endpoint_type: Optional[MonitoringOutputType] = None,
        mon_endpoint_location: Optional[str] = None,
        mon_serialisation_format: Optional[MonitoringOutputFormat] = None,
        mon_metric_prefix: Optional[str] = None,
    ) -> None:
        """Instantiate a new cleanup service.

        Args:
            dry_run: If ``True``, only report what would be deleted.
            high_watermark: CAS size in bytes which triggers a cleanup.
            low_watermark: Size in bytes cleanup aims to reduce the CAS to.
            sleep_interval: Seconds to wait between size checks (and when a
                batch makes no progress).
            batch_size: Number of bytes to delete per batch.
            only_if_unused_for: Protect blobs accessed within this window.
            indexes: Mapping of instance name to the index to clean.
            monitor: Whether to publish metrics to a monitoring bus.
            mon_endpoint_type: Monitoring endpoint type; defaults to SOCKET.
            mon_endpoint_location: Monitoring endpoint location.
            mon_serialisation_format: Metric wire format; defaults to STATSD.
            mon_metric_prefix: Prefix applied to published metric names.
        """
        self._stack = ExitStack()

        self._dry_run = dry_run

        self._high_watermark = high_watermark
        self._low_watermark = low_watermark
        self._batch_size = batch_size
        self._only_if_unused_for = only_if_unused_for

        self._indexes = indexes

        self._is_instrumented = monitor

        self._sleep_interval = sleep_interval

        if self._is_instrumented:
            # Install a process-global monitoring bus so DurationMetric and
            # publish_gauge_metric calls below have somewhere to publish to.
            set_monitoring_bus(
                MonitoringBus(
                    endpoint_type=mon_endpoint_type or MonitoringOutputType.SOCKET,
                    endpoint_location=mon_endpoint_location,
                    metric_prefix=mon_metric_prefix or "",
                    serialisation_format=mon_serialisation_format or MonitoringOutputFormat.STATSD,
                )
            )

    # --- Public API ---

    def start(self, timeout: Optional[float] = None) -> None:
        """Start the cleanup service and block until it finishes.

        Args:
            timeout: Maximum seconds to wait for the launcher worker;
                ``None`` waits indefinitely.
        """
        if self._is_instrumented:
            self._stack.enter_context(get_monitoring_bus())
        worker = self._stack.enter_context(ContextWorker(self._begin_cleanup, "CleanUpLauncher"))
        worker.wait(timeout=timeout)

    def stop(self, *args: Any, **kwargs: Any) -> None:
        """Stop the cleanup service.

        Extra positional/keyword arguments are accepted (and ignored) so
        this can be registered directly as a signal handler.
        """
        LOGGER.info("Stopping Cleanup Service")
        # Closing the stack stops the ContextWorker (setting its stop event)
        # and shuts down the monitoring bus, in reverse order of entry.
        self._stack.close()

    def _begin_cleanup(self, stop_requested: threading.Event) -> None:
        """Run cleanup for all configured instances until stopped.

        In dry-run mode this only reports the blobs that would be deleted,
        then returns. Otherwise one worker per instance runs concurrently;
        if any worker raises, all workers are restarted after an
        exponential (but capped) backoff.
        """
        if self._dry_run:
            for instance_name in self._indexes.keys():
                self._calculate_cleanup(instance_name)
            return

        # Cap the retry backoff: an unbounded 1.6**attempts quickly grows to
        # hours/days, which would effectively stop retrying after persistent
        # transient failures (e.g. a database outage).
        max_backoff = 60.0

        attempts = 0
        with ContextThreadPoolExecutor(max_workers=len(self._indexes)) as ex:
            while True:
                futures = {
                    instance_name: ex.submit(self._cleanup_worker, instance_name, stop_requested)
                    for instance_name in self._indexes.keys()
                }

                failed = False
                for instance_name, future in futures.items():
                    try:
                        future.result()
                        LOGGER.info(f"CleanUp for {instance_name} completed")
                    except Exception:
                        LOGGER.exception(f"CleanUp for {instance_name} failed")
                        failed = True

                if not failed:
                    break

                # Exponential backoff before retrying
                sleep_time = min(1.6**attempts, max_backoff)
                LOGGER.info(f"Retrying Cleanup in {sleep_time} seconds")
                stop_requested.wait(timeout=sleep_time)
                attempts += 1

    def _calculate_cleanup(self, instance_name: str) -> None:
        """Work out which blobs will be deleted by the cleanup command."""

        LOGGER.info(f"Cleanup dry run for instance '{instance_name}'")
        index = self._indexes[instance_name]
        only_delete_before = self._get_last_accessed_threshold()
        total_size = index.get_total_size()
        LOGGER.info(
            f"CAS size is {total_size} bytes, compared with a high water mark of "
            f"{self._high_watermark} bytes and a low water mark of {self._low_watermark} bytes."
        )
        if total_size >= self._high_watermark:
            required_space = total_size - self._low_watermark
            # dry_run=True: the index reports what it would delete without
            # actually marking or removing anything.
            cleared_space = index.delete_n_bytes(required_space, dry_run=True, protect_blobs_after=only_delete_before)
            LOGGER.info(f"{cleared_space} of the requested {required_space} bytes would be deleted.")
        else:
            LOGGER.info(f"Total size {total_size} is less than the high water mark, nothing will be deleted.")

    def _do_cleanup_batch(
        self,
        instance_name: str,
        index: IndexABC,
        only_delete_before: datetime,
        total_size: int,
        stop_requested: threading.Event,
    ) -> None:
        """Delete one batch of bytes from *index* and publish metrics.

        Args:
            instance_name: Instance the batch is being deleted for (used in
                metric tags and log messages).
            index: The index to delete from.
            only_delete_before: Blobs accessed at or after this time are
                protected from deletion.
            total_size: Current CAS size, used only to decide the log level
                when no progress can be made.
            stop_requested: Waited on (instead of sleeping) when no bytes
                could be deleted, so shutdown stays responsive.
        """
        batch_start_time = time.time()

        LOGGER.info(f"Deleting {self._batch_size} bytes from the index")
        bytes_deleted = index.delete_n_bytes(self._batch_size, protect_blobs_after=only_delete_before)

        if not bytes_deleted:
            err = (
                "Marked 0 digests for deletion, even though cleanup was triggered. "
                "This may be because the remaining digests have been accessed within "
                f"{only_delete_before}."
            )
            # Still over the high watermark with nothing deletable is an
            # operational problem worth an error; otherwise just a warning.
            if total_size >= self._high_watermark:
                LOGGER.error(f"{err} Total size still remains greater than high watermark!")
            else:
                LOGGER.warning(err)
            stop_requested.wait(timeout=self._sleep_interval)  # Avoid a busy loop when we can't make progress
            return

        LOGGER.info(f"Bulk deleted {bytes_deleted} bytes from index")

        if self._is_instrumented:
            batch_duration = time.time() - batch_start_time
            bytes_deleted_per_second = bytes_deleted / batch_duration
            publish_gauge_metric(
                CLEANUP_BYTES_DELETION_RATE_METRIC_NAME, bytes_deleted_per_second, {"instance-name": instance_name}
            )

    def _cleanup_worker(self, instance_name: str, stop_requested: threading.Event) -> None:
        """Cleanup when full.

        Loops until *stop_requested* is set: checks the index size every
        ``sleep_interval`` seconds and, when it reaches the high watermark,
        deletes batches until the size drops to the low watermark.
        """
        index = self._indexes[instance_name]
        index.set_instance_name(instance_name)
        LOGGER.info(f"Cleanup for instance '{instance_name}' started.")

        while not stop_requested.is_set():
            # When first starting a loop, we will also include any remaining delete markers as part of
            # the total size.
            total_size = index.get_total_size(include_marked=True)
            if total_size >= self._high_watermark:
                to_delete = total_size - self._low_watermark
                LOGGER.info(
                    f"CAS size for instance '{instance_name}' is {total_size} bytes, at least "
                    f"{to_delete} bytes will be cleared."
                )
                LOGGER.info(f"Deleting items from storage/index for instance '{instance_name}'.")

                with DurationMetric(CLEANUP_RUNTIME_METRIC_NAME, instance_name, instanced=True):
                    while not stop_requested.is_set() and total_size > self._low_watermark:
                        # Recompute the threshold each batch so long-running
                        # cleanups keep honouring only_if_unused_for.
                        only_delete_before = self._get_last_accessed_threshold()
                        self._do_cleanup_batch(
                            instance_name=instance_name,
                            index=index,
                            only_delete_before=only_delete_before,
                            total_size=total_size,
                            stop_requested=stop_requested,
                        )
                        # Here we should have already deleted any remaining delete markers,
                        # Our cleanup target should be based on finding new entries.
                        total_size = index.get_total_size(include_marked=False)
                        LOGGER.info(f"After batch, the non-stale total size is {total_size} bytes.")

                # Report the final size including the marked deleted items.
                total_size = index.get_total_size(include_marked=True)
                LOGGER.info(f"Finished cleanup. CAS size is now {total_size} bytes.")

            stop_requested.wait(timeout=self._sleep_interval)

    def _get_last_accessed_threshold(self) -> datetime:
        """Return the cutoff: blobs accessed before this may be deleted.

        NOTE(review): ``datetime.utcnow()`` is deprecated since Python 3.12
        and returns a *naive* datetime; kept as-is because the tz-awareness
        expected by ``IndexABC.delete_n_bytes(protect_blobs_after=...)`` is
        not visible here — confirm before switching to
        ``datetime.now(timezone.utc)``.
        """
        return datetime.utcnow() - self._only_if_unused_for