Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/index/redis.py: 96.43%
196 statements
coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
# Copyright (C) 2023 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
A storage provider that uses redis to maintain existence and expiry metadata
for a storage.
"""

import time
from datetime import datetime, timedelta
from typing import IO, Dict, Iterator, List, Optional, Set, Tuple

import redis

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid._protos.google.rpc import code_pb2
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid.server.cas.storage.index.index_abc import IndexABC
from buildgrid.server.cas.storage.storage_abc import StorageABC
from buildgrid.server.decorators import timed
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.metrics_names import METRIC
from buildgrid.server.metrics_utils import publish_gauge_metric
from buildgrid.server.redis.provider import RedisProvider
from buildgrid.server.utils.digests import validate_digest_data

LOGGER = buildgrid_logger(__name__)
class RedisIndex(IndexABC):
    TYPE = "RedisIndex"

    def __init__(self, redis: RedisProvider, storage: StorageABC) -> None:
        self._redis = redis
        self._storage = storage
        # TODO: implement redis notification based cleanup, make this configurable, and lower the default
        self._ttl = timedelta(days=365)

        # Keep track of the last returned scan cursor
        # to not start at the beginning for each call to `delete_n_bytes`
        self._delete_n_bytes_cursor = 0

    def start(self) -> None:
        self._storage.start()

    def stop(self) -> None:
        self._storage.stop()
    def _construct_key(self, digest: Digest) -> str:
        """Helper to get the redis key name for a particular digest"""
        # The tag prefix serves to distinguish between our keys and
        # actual blobs if the same redis is used for both index and storage
        return "A:" + digest.hash + "_" + str(digest.size_bytes)

    def _deconstruct_key(self, keystr: str) -> Optional[Digest]:
        """Helper to attempt to recover a Digest from a redis key"""
        try:
            tag, rest = keystr.split(":", 1)
            if tag != "A":
                return None
            hash, size_bytes = rest.rsplit("_", 1)
            return Digest(hash=hash, size_bytes=int(size_bytes))
        except ValueError:
            return None
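    # Illustrative example (added commentary, not part of the original source):
    # index keys take the form "A:<hash>_<size_bytes>", so the two helpers above
    # are inverses of each other. The digest values below are made up.
    #
    #   digest = Digest(hash="abc123", size_bytes=42)
    #   key = self._construct_key(digest)       # -> "A:abc123_42"
    #   self._deconstruct_key(key) == digest    # -> True
    #   self._deconstruct_key("B:abc123_42")    # -> None (unrecognised tag prefix)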
    @timed(METRIC.STORAGE.STAT_DURATION, type=TYPE)
    def has_blob(self, digest: Digest) -> bool:
        # Redis is authoritative for existence, no need to check storage.
        return bool(self._redis.execute_ro(lambda r: r.exists(self._construct_key(digest))))

    @timed(METRIC.STORAGE.READ_DURATION, type=TYPE)
    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        if blob := self._storage.get_blob(digest):
            return blob

        deleted_index_digests = self._bulk_delete_from_index([digest])
        for digest in deleted_index_digests:
            LOGGER.warning("Blob was indexed but not in storage. Deleted from the index.", tags=dict(digest=digest))

        return None
    @timed(METRIC.STORAGE.DELETE_DURATION, type=TYPE)
    def delete_blob(self, digest: Digest) -> None:
        # If the initial delete doesn't delete anything due to the key not existing,
        # don't do anything else.
        if self._redis.execute_rw(lambda r: r.delete(self._construct_key(digest))):
            self._storage.delete_blob(digest)

            # If we race with a blob being re-added we might have just deleted the
            # storage out from under it. We don't want the index to end up with
            # keys for things that are not present in storage since we consider
            # the index authoritative for existence. So we delete the keys again
            # after deleting from storage; this way, if they do get out of sync it
            # will be in the direction of leaking objects in storage that the
            # index doesn't know about.
            def delete_from_index(r: "redis.Redis[bytes]") -> None:
                pipe = r.pipeline()
                pipe.delete(self._construct_key(digest))
                pipe.decrby("total_size", digest.size_bytes)
                pipe.execute()

            self._redis.execute_rw(delete_from_index)
    @timed(METRIC.STORAGE.WRITE_DURATION, type=TYPE)
    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        self._storage.commit_write(digest, write_session)

        def set_ttl(r: "redis.Redis[bytes]") -> None:
            key = self._construct_key(digest)
            # Only increment total_size if this key is new.
            # Use a dummy value of 1; we only care about existence and expiry.
            if r.set(key, 1, ex=self._ttl, nx=True) is not None:
                r.incrby("total_size", digest.size_bytes)

        self._redis.execute_rw(set_ttl)
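    # Note (added commentary, not part of the original source): with nx=True, SET
    # returns None when the key already exists, so re-committing an indexed blob
    # neither double-counts its size in "total_size" nor resets its TTL; an
    # existing key's TTL is instead refreshed by `missing_blobs` below.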
    def _bulk_delete_from_index(self, digests: List[Digest]) -> List[Digest]:
        def delete_from_index(r: "redis.Redis[bytes]") -> List[Digest]:
            pipe = r.pipeline()
            bytes_deleted = 0
            for digest in digests:
                pipe.delete(self._construct_key(digest))
            results = pipe.execute()
            # Go through the delete calls and only decrement total_size for the keys
            # which were actually removed.
            successful_deletes = []
            for result, digest in zip(results, digests):
                if result:
                    bytes_deleted += digest.size_bytes
                    successful_deletes.append(digest)
            r.decrby("total_size", bytes_deleted)
            return successful_deletes

        successful_deletes = self._redis.execute_rw(delete_from_index)
        return successful_deletes
    @timed(METRIC.STORAGE.BULK_DELETE_DURATION, type=TYPE)
    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        # Delete from the index and then delete from the backing storage.
        successful_deletes = self._bulk_delete_from_index(digests)
        failed_deletes = self._storage.bulk_delete(successful_deletes)
        return failed_deletes
    @timed(METRIC.STORAGE.BULK_STAT_DURATION, type=TYPE)
    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
        # We hit the RW node for every FMB call to extend all the TTLs.
        # This could try to take advantage of RO replicas by only hitting the
        # RW node for blobs that do not have enough TTL left, if any.
        # We currently rely on the always-updated TTL to determine if a blob
        # should be protected in mark_n_bytes_as_deleted. If we allow some
        # slop before updating the RW node here we need to account for it
        # there too.
        def extend_ttls(r: "redis.Redis[bytes]") -> List[int]:
            pipe = r.pipeline(transaction=False)
            for digest in digests:
                pipe.expire(name=self._construct_key(digest), time=self._ttl)
            return pipe.execute()

        extend_results = self._redis.execute_rw(extend_ttls)

        return [digest for digest, result in zip(digests, extend_results) if result != 1]
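    # Note (added commentary, not part of the original source): EXPIRE returns 1
    # when the key exists (and its TTL was refreshed) and 0 when it does not, so
    # the comprehension above reports exactly the digests that have no index entry
    # as missing.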
    @timed(METRIC.STORAGE.BULK_WRITE_DURATION, type=TYPE)
    def bulk_update_blobs(self, blobs: List[Tuple[Digest, bytes]]) -> List[Status]:
        result_map: Dict[str, Status] = {}
        missing_blob_pairs: List[Tuple[Digest, bytes]] = []
        missing_blobs = self.missing_blobs([digest for digest, _ in blobs])
        for digest, blob in blobs:
            if digest not in missing_blobs:
                if validate_digest_data(digest, blob):
                    result_map[digest.hash] = Status(code=code_pb2.OK)
                else:
                    result_map[digest.hash] = Status(code=code_pb2.INVALID_ARGUMENT, message="Data doesn't match hash")
            else:
                missing_blob_pairs.append((digest, blob))
        results = self._storage.bulk_update_blobs(missing_blob_pairs)

        def set_ttls(r: "redis.Redis[bytes]") -> None:
            pipe = r.pipeline()
            bytes_added = 0
            for digest, result in zip(missing_blobs, results):
                result_map[digest.hash] = result
                if result.code == code_pb2.OK:
                    key = self._construct_key(digest)
                    # Use a dummy value of 1; we only care about existence and expiry.
                    pipe.set(key, 1, ex=self._ttl, nx=True)
            redis_results = pipe.execute()
            # Only update total_size for brand new keys.
            for result, digest in zip(redis_results, missing_blobs):
                if result is not None:
                    bytes_added += digest.size_bytes
            r.incrby("total_size", bytes_added)

        self._redis.execute_rw(set_ttls)
        return [result_map[digest.hash] for digest, _ in blobs]
    @timed(METRIC.STORAGE.BULK_READ_DURATION, type=TYPE)
    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
        fetched_digests = self._storage.bulk_read_blobs(digests)

        fetched_digest_hashes = set(digest_hash for (digest_hash, _) in fetched_digests.items())
        digests_not_in_storage: List[Digest] = []
        for expected_digest in digests:
            if expected_digest.hash not in fetched_digest_hashes:
                digests_not_in_storage.append(expected_digest)

        if digests_not_in_storage:
            deleted_index_digests = self._bulk_delete_from_index(digests_not_in_storage)
            for digest in deleted_index_digests:
                LOGGER.warning(
                    "Blob was indexed but not in storage. Deleted from the index.", tags=dict(digest=digest)
                )

        return fetched_digests
    def least_recent_digests(self) -> Iterator[Digest]:
        """Generator to iterate through the digests in LRU order"""
        # This is not an LRU index; this method is used only from tests.
        raise NotImplementedError()

    def get_total_size(self) -> int:
        """
        Return the sum of the sizes of all blobs within the index
        """
        # The total_size represents what we have stored in the underlying
        # storage. However, if some redis notifications for expiring keys
        # are missed we won't actually have keys to account for all the size.
        # The expectation is that a "janitor" process will locate orphaned
        # blobs in storage eventually, and when it does so it will call our
        # delete_blob, which will finally decrby the total_size.
        total_size = self._redis.execute_ro(lambda r: r.get("total_size"))
        if total_size:
            return int(total_size)
        else:
            return 0
    def delete_n_bytes(
        self, n_bytes: int, dry_run: bool = False, protect_blobs_after: Optional[datetime] = None
    ) -> int:
        """
        Iterate through the Redis index using 'SCAN' and delete any entries older than
        'protect_blobs_after'. The ordering of the deletes is undefined and can't be assumed
        to be LRU.
        """
        now = datetime.utcnow()

        if protect_blobs_after:
            threshold_time = protect_blobs_after
        else:
            threshold_time = now

        seen: Set[str] = set()
        bytes_deleted = 0

        while n_bytes > 0:
            # Used for metric publishing
            delete_start_time = time.time()

            # Maybe count should be configurable or somehow self-tuning
            # based on how many deletable keys we're actually getting
            # back per-request.
            # We could also choose random prefixes for the scan so that
            # multiple cleanup processes are less likely to contend.
            rawkeys: List[bytes]
            previous_cursor = self._delete_n_bytes_cursor
            self._delete_n_bytes_cursor, rawkeys = self._redis.execute_ro(
                lambda r: r.scan(match="A:*", cursor=self._delete_n_bytes_cursor, count=1000)
            )
            keys = [key.decode() for key in rawkeys]

            def get_ttls(r: "redis.Redis[bytes]") -> List[bytes]:
                pipe = r.pipeline(transaction=False)
                for key in keys:
                    pipe.ttl(key)
                return pipe.execute()

            raw_ttls = self._redis.execute_ro(get_ttls)
            ttls = [int(x) for x in raw_ttls]

            LOGGER.debug("Scan returned.", tags=dict(key_count=len(ttls)))
            digests_to_delete: List[Digest] = []
            failed_deletes: List[str] = []
            for key, ttl in zip(keys, ttls):
                # Since FMB sets the ttl to self._ttl on every call we can
                # use the time remaining to figure out when the last FMB
                # call for that blob was.
                blob_time = now - (self._ttl - timedelta(seconds=ttl))
                if n_bytes <= 0:
                    # Reset the scan cursor to its previous value so we don't skip
                    # the digests we didn't get to.
                    self._delete_n_bytes_cursor = previous_cursor
                    break

                digest = self._deconstruct_key(key)
                if digest and (blob_time <= threshold_time) and (digest.hash not in seen):
                    n_bytes -= digest.size_bytes
                    digests_to_delete.append(digest)

            if digests_to_delete:
                if dry_run:
                    LOGGER.debug("Detected deletable digests.", tags=dict(digest_count=len(digests_to_delete)))
                    for digest in digests_to_delete:
                        if digest not in failed_deletes:
                            bytes_deleted += digest.size_bytes
                else:
                    LOGGER.debug("Deleting digests.", tags=dict(digest_count=len(digests_to_delete)))
                    failed_deletes = self.bulk_delete(digests_to_delete)
                    blobs_deleted = 0
                    for digest in digests_to_delete:
                        if digest not in failed_deletes:
                            blobs_deleted += 1
                            bytes_deleted += digest.size_bytes

                    batch_duration = time.time() - delete_start_time
                    blobs_deleted_per_second = blobs_deleted / batch_duration
                    publish_gauge_metric(METRIC.CLEANUP.BLOBS_DELETED_PER_SECOND, blobs_deleted_per_second)

            if self._delete_n_bytes_cursor == 0:  # scan finished
                LOGGER.debug("Cursor exhausted.")
                break

        return bytes_deleted
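
# Usage sketch (illustrative, not part of the original source). It assumes an
# already-configured RedisProvider (`provider`) and any StorageABC implementation
# (`backend`); both names are placeholders, as is the digest hash.
#
#   index = RedisIndex(redis=provider, storage=backend)
#   index.start()
#   digest = Digest(hash="<hex digest>", size_bytes=1024)
#   if not index.missing_blobs([digest]):
#       data = index.bulk_read_blobs([digest]).get(digest.hash)
#   index.stop()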