Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/index/redis.py: 95.35% (215 statements)
# Copyright (C) 2023 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
15"""
16A storage provider that uses redis to maintain existence and expiry metadata
17for a storage.
18"""

import time
from datetime import datetime, timedelta, timezone
from typing import IO, Iterator, Optional

import redis

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid._protos.google.rpc import code_pb2
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid.server.cas.storage.index.index_abc import IndexABC
from buildgrid.server.cas.storage.storage_abc import StorageABC
from buildgrid.server.decorators import timed
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.metrics_names import METRIC
from buildgrid.server.metrics_utils import publish_gauge_metric
from buildgrid.server.redis.provider import RedisProvider
from buildgrid.server.utils.digests import validate_digest_data

LOGGER = buildgrid_logger(__name__)
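
# Illustrative usage (a sketch, not taken from this module: `redis_provider`,
# `disk_storage`, and `digest` are placeholders for a configured RedisProvider,
# a backing StorageABC, and a Digest respectively):
#
#   index = RedisIndex(redis=redis_provider, storage=disk_storage, prefix="cas")
#   index.start()
#   if not index.has_blob(digest):
#       ...  # upload via commit_write() or bulk_update_blobs()
#   index.stop()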


class RedisIndex(IndexABC):
    TYPE = "RedisIndex"

    def __init__(self, redis: RedisProvider, storage: StorageABC, prefix: Optional[str] = None) -> None:
        self._redis = redis
        self._storage = storage
        self._prefix = "A"
        self._total_size_key = "total_size"
        if prefix == "A":
            LOGGER.error("Prefix 'A' is reserved as the default prefix and cannot be used")
            raise ValueError("Prefix 'A' is reserved as the default prefix and cannot be used")
        elif prefix:
            self._prefix = prefix
            self._total_size_key = self._prefix + ":" + self._total_size_key
        # TODO: make this configurable, and lower the default
        self._ttl = timedelta(days=365)

        # Keep track of the last returned scan cursor
        # to not start at the beginning for each call to `delete_n_bytes`
        self._delete_n_bytes_cursor = 0

    def start(self) -> None:
        self._storage.start()

    def stop(self) -> None:
        self._storage.stop()

    def _construct_key(self, digest: Digest) -> str:
        """Helper to get the redis key name for a particular digest"""
        # The tag prefix serves to distinguish between our keys and
        # actual blobs if the same redis is used for both index and storage
        return self._prefix + ":" + digest.hash + "_" + str(digest.size_bytes)
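
    # For illustration (the digest values are made up): under the default prefix "A",
    # a blob with hash "abc123" and size_bytes 42 is indexed under the key
    # "A:abc123_42"; with a custom prefix "cas" it would be "cas:abc123_42".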

    def _deconstruct_key(self, keystr: str) -> Digest | None:
        """Helper to attempt to recover a Digest from a redis key"""

        try:
            tag, rest = keystr.split(":", 1)
            if tag != self._prefix:
                return None
            hash, size_bytes = rest.rsplit("_", 1)
            return Digest(hash=hash, size_bytes=int(size_bytes))
        except ValueError:
            return None

    @timed(METRIC.STORAGE.STAT_DURATION, type=TYPE)
    def has_blob(self, digest: Digest) -> bool:
        # Redis is authoritative for existence, no need to check storage.
        return bool(self._redis.execute_ro(lambda r: r.exists(self._construct_key(digest))))

    @timed(METRIC.STORAGE.READ_DURATION, type=TYPE)
    def get_blob(self, digest: Digest) -> IO[bytes] | None:
        if blob := self._storage.get_blob(digest):
            return blob

        deleted_index_digests = self._bulk_delete_from_index([digest])
        for digest in deleted_index_digests:
            LOGGER.warning("Blob was indexed but not in storage. Deleted from the index.", tags=dict(digest=digest))

        return None

    @timed(METRIC.STORAGE.DELETE_DURATION, type=TYPE)
    def delete_blob(self, digest: Digest) -> None:
        # If the initial delete doesn't delete anything due to the key not existing
        # don't do anything else
        if self._redis.execute_rw(lambda r: r.delete(self._construct_key(digest))):
            self._storage.delete_blob(digest)

            # If we race with a blob being re-added we might have just deleted the
            # storage out from under it. We don't want the index to end up with
            # keys for things that are not present in storage since we consider
            # the index authoritative for existence. So we delete the keys again
            # after deleting from storage, this way if they do get out of sync it
            # will be in the direction of leaking objects in storage that the
            # index doesn't know about.
            def delete_from_index(r: "redis.Redis[bytes]") -> None:
                pipe = r.pipeline()
                pipe.delete(self._construct_key(digest))
                pipe.decrby(self._total_size_key, digest.size_bytes)
                pipe.execute()

            self._redis.execute_rw(delete_from_index)
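
    # A concrete interleaving the double delete above guards against (illustrative;
    # clients A and B are hypothetical):
    #   1. A deletes the index key and proceeds to delete the blob from storage.
    #   2. B re-uploads the blob: storage write first, then the index key is re-set.
    #   3. A finishes the storage delete and then deletes the index key once more.
    # Without step 3 the index could keep advertising a blob whose bytes A removed;
    # with it, any divergence is limited to unreferenced bytes left in storage.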

    @timed(METRIC.STORAGE.WRITE_DURATION, type=TYPE)
    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        self._storage.commit_write(digest, write_session)

        def set_ttl(r: "redis.Redis[bytes]") -> None:
            key = self._construct_key(digest)
            # Only increment total_size if this key is new
            # Use a dummy value of 1. We only care about existence and expiry
            if r.set(key, 1, ex=self._ttl, nx=True) is not None:
                r.incrby(self._total_size_key, digest.size_bytes)

        self._redis.execute_rw(set_ttl)

    def _bulk_delete_from_index(self, digests: list[Digest]) -> list[Digest]:
        def delete_from_index(r: "redis.Redis[bytes]") -> list[Digest]:
            pipe = r.pipeline()
            bytes_deleted = 0
            for digest in digests:
                pipe.delete(self._construct_key(digest))
            results = pipe.execute()
            # Go through the delete calls and only decrement total_size for the keys
            # which were actually removed
            successful_deletes = []
            for result, digest in zip(results, digests):
                if result:
                    bytes_deleted += digest.size_bytes
                    successful_deletes.append(digest)
            r.decrby(self._total_size_key, bytes_deleted)
            return successful_deletes

        successful_deletes = self._redis.execute_rw(delete_from_index)
        return successful_deletes

    @timed(METRIC.STORAGE.BULK_DELETE_DURATION, type=TYPE)
    def bulk_delete(self, digests: list[Digest]) -> list[str]:
        # Delete from the index and then delete from the backing storage.
        successful_deletes = self._bulk_delete_from_index(digests)
        failed_deletes = self._storage.bulk_delete(successful_deletes)
        return failed_deletes

    @timed(METRIC.STORAGE.BULK_STAT_DURATION, type=TYPE)
    def missing_blobs(self, digests: list[Digest]) -> list[Digest]:
        # We hit the RW node for every FMB call to extend all the TTLs.
        # This could try to take advantage of RO replicas by only hitting the
        # RW node for blobs that do not have enough TTL left, if any.
        # We currently rely on the always-updated TTL to determine if a blob
        # should be protected in mark_n_bytes_as_deleted. If we allow some
        # slop before updating the RW node here we need to account for it
        # there too.
        def extend_ttls(r: "redis.Redis[bytes]") -> list[int]:
            pipe = r.pipeline(transaction=False)
            for digest in digests:
                pipe.expire(name=self._construct_key(digest), time=self._ttl)
            return pipe.execute()

        extend_results = self._redis.execute_rw(extend_ttls)

        return [digest for digest, result in zip(digests, extend_results) if result != 1]
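
    # Worked example of the TTL bookkeeping above (numbers are illustrative): with
    # self._ttl = 365 days, a key whose remaining TTL is 364 days and 18 hours was
    # last refreshed by FindMissingBlobs about 6 hours ago; delete_n_bytes() below
    # recovers that last-access time as
    #   blob_time = now - (self._ttl - timedelta(seconds=remaining_ttl))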

    @timed(METRIC.STORAGE.BULK_WRITE_DURATION, type=TYPE)
    def bulk_update_blobs(self, blobs: list[tuple[Digest, bytes]]) -> list[Status]:
        result_map: dict[str, Status] = {}
        missing_blob_pairs: list[tuple[Digest, bytes]] = []
        missing_blobs = self.missing_blobs([digest for digest, _ in blobs])
        for digest, blob in blobs:
            if digest not in missing_blobs:
                if validate_digest_data(digest, blob):
                    result_map[digest.hash] = Status(code=code_pb2.OK)
                else:
                    result_map[digest.hash] = Status(code=code_pb2.INVALID_ARGUMENT, message="Data doesn't match hash")
            else:
                missing_blob_pairs.append((digest, blob))
        results = self._storage.bulk_update_blobs(missing_blob_pairs)

        def set_ttls(r: "redis.Redis[bytes]") -> None:
            pipe = r.pipeline()
            bytes_added = 0
            for digest, result in zip(missing_blobs, results):
                result_map[digest.hash] = result
                if result.code == code_pb2.OK:
                    key = self._construct_key(digest)
                    # Use a dummy value of 1. We only care about existence and expiry
                    pipe.set(key, 1, ex=self._ttl, nx=True)
            redis_results = pipe.execute()
            # only update total_size for brand new keys
            for result, digest in zip(redis_results, missing_blobs):
                if result is not None:
                    bytes_added += digest.size_bytes
            r.incrby(self._total_size_key, bytes_added)

        self._redis.execute_rw(set_ttls)
        return [result_map[digest.hash] for digest, _ in blobs]

    @timed(METRIC.STORAGE.BULK_READ_DURATION, type=TYPE)
    def bulk_read_blobs(self, digests: list[Digest]) -> dict[str, bytes]:
        fetched_digests = self._storage.bulk_read_blobs(digests)

        fetched_digest_hashes = set(digest_hash for (digest_hash, _) in fetched_digests.items())
        digests_not_in_storage: list[Digest] = []
        for expected_digest in digests:
            if expected_digest.hash not in fetched_digest_hashes:
                digests_not_in_storage.append(expected_digest)

        if digests_not_in_storage:
            deleted_index_digests = self._bulk_delete_from_index(digests_not_in_storage)
            for digest in deleted_index_digests:
                LOGGER.warning(
                    "Blob was indexed but not in storage. Deleted from the index.", tags=dict(digest=digest)
                )

        return fetched_digests

    def least_recent_digests(self) -> Iterator[Digest]:
        """Generator to iterate through the digests in LRU order"""
        # This is not a LRU index, this method is used only from tests.
        raise NotImplementedError()

    def get_total_size(self) -> int:
        """
        Return the sum of the size of all blobs within the index
        """

        # The total_size represents what we have stored in the underlying
        # storage. However, if some redis notifications for expiring keys
        # are missed we won't actually have keys to account for all the size.
        # The expectation is that a "janitor" process will locate orphaned
        # blobs in storage eventually and when it does so it will call our
        # delete_blob which will finally decrby the total_size.
        total_size = self._redis.execute_ro(lambda r: r.get(self._total_size_key))
        if total_size:
            return int(total_size)
        else:
            return 0

    def get_blob_count(self) -> int:
        key_count = int(self._redis.execute_ro(lambda r: r.dbsize()))
        # Subtract 1 to not count the `total_size` key
        # but never return a negative count
        return max(0, key_count - 1)

    def delete_n_bytes(
        self,
        n_bytes: int,
        dry_run: bool = False,
        protect_blobs_after: datetime | None = None,
        large_blob_threshold: int | None = None,
        large_blob_lifetime: datetime | None = None,
    ) -> int:
        """
        Iterate through the Redis Index using 'SCAN' and delete any entries older than
        'protect_blobs_after'. The ordering of the deletes is undefined and can't be assumed
        to be LRU. Large blobs can optionally be configured to have a separate lifetime.
        """
        now = datetime.now(timezone.utc)

        if protect_blobs_after:
            threshold_time = protect_blobs_after
        else:
            threshold_time = now

        seen: set[str] = set()
        bytes_deleted = 0

        while n_bytes > 0:
            # Used for metric publishing
            delete_start_time = time.time()

            # Maybe count should be configurable or somehow self-tuning
            # based on how many deletable keys we're actually getting
            # back per-request.
            # We could also choose random prefixes for the scan so that
            # multiple cleanup processes are less likely to contend
            rawkeys: list[bytes]
            previous_cursor = self._delete_n_bytes_cursor
            self._delete_n_bytes_cursor, rawkeys = self._redis.execute_ro(
                lambda r: r.scan(match=f"{self._prefix}:*", cursor=self._delete_n_bytes_cursor, count=1000)
            )
            keys = [key.decode() for key in rawkeys if key != b""]

            def get_ttls(r: "redis.Redis[bytes]") -> list[bytes]:
                pipe = r.pipeline(transaction=False)
                for key in keys:
                    # Skip over any total_size keys
                    if key.split(":")[-1] == "total_size":
                        continue
                    pipe.ttl(key)
                return pipe.execute()

            raw_ttls = self._redis.execute_ro(get_ttls)
            ttls = [int(x) for x in raw_ttls]

            LOGGER.debug("Scan returned.", tags=dict(key_count=len(ttls)))
            digests_to_delete: list[Digest] = []
            failed_deletes: list[str] = []
            new_blob_bytes = 0
            for key, ttl in zip(keys, ttls):
                digest = self._deconstruct_key(key)
                if digest and digest.hash not in seen:
                    seen.add(digest.hash)
                    # Since FMB sets the ttl to self._ttl on every call we can
                    # use the time remaining to figure out when the last FMB
                    # call for that blob was.
                    blob_time = now - (self._ttl - timedelta(seconds=ttl))
                    if n_bytes <= 0:
                        # Reset scan cursor to previous value to not skip
                        # the digests we didn't get to
                        self._delete_n_bytes_cursor = previous_cursor
                        break

                    if (blob_time <= threshold_time) or (
                        large_blob_threshold
                        and large_blob_lifetime
                        and digest.size_bytes > large_blob_threshold
                        and blob_time <= large_blob_lifetime
                    ):
                        n_bytes -= digest.size_bytes
                        digests_to_delete.append(digest)
                    else:
                        new_blob_bytes += digest.size_bytes

            if digests_to_delete:
                if dry_run:
                    LOGGER.debug("Detected deletable digests.", tags=dict(digest_count=len(digests_to_delete)))
                    for digest in digests_to_delete:
                        if digest not in failed_deletes:
                            bytes_deleted += digest.size_bytes
                else:
                    LOGGER.debug("Deleting digests.", tags=dict(digest_count=len(digests_to_delete)))
                    failed_deletes = self.bulk_delete(digests_to_delete)
                    blobs_deleted = 0
                    for digest in digests_to_delete:
                        if digest not in failed_deletes:
                            blobs_deleted += 1
                            bytes_deleted += digest.size_bytes

                    batch_duration = time.time() - delete_start_time
                    blobs_deleted_per_second = blobs_deleted / batch_duration
                    publish_gauge_metric(METRIC.CLEANUP.BLOBS_DELETED_PER_SECOND, blobs_deleted_per_second)
            elif new_blob_bytes > 0:
                LOGGER.error(
                    "All remaining digests have been accessed within the time threshold",
                    tags=dict(new_blob_bytes=new_blob_bytes, threshold_time=protect_blobs_after),
                )

            if self._delete_n_bytes_cursor == 0:  # scan finished
                LOGGER.debug("Cursor exhausted.")
                break

        return bytes_deleted