Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/index/redis.py: 95.35%

215 statements  

coverage.py v7.4.1, created at 2025-07-10 13:10 +0000

# Copyright (C) 2023 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
A storage provider that uses Redis to maintain existence and expiry metadata
for a storage backend.
"""

import time
from datetime import datetime, timedelta, timezone
from typing import IO, Iterator, Optional

import redis

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid._protos.google.rpc import code_pb2
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid.server.cas.storage.index.index_abc import IndexABC
from buildgrid.server.cas.storage.storage_abc import StorageABC
from buildgrid.server.decorators import timed
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.metrics_names import METRIC
from buildgrid.server.metrics_utils import publish_gauge_metric
from buildgrid.server.redis.provider import RedisProvider
from buildgrid.server.utils.digests import validate_digest_data

LOGGER = buildgrid_logger(__name__)


class RedisIndex(IndexABC):
    TYPE = "RedisIndex"

    def __init__(self, redis: RedisProvider, storage: StorageABC, prefix: Optional[str] = None) -> None:
        self._redis = redis
        self._storage = storage
        self._prefix = "A"
        self._total_size_key = "total_size"
        if prefix == "A":
            LOGGER.error("Prefix 'A' is reserved as the default prefix and cannot be used")
            raise ValueError("Prefix 'A' is reserved as the default prefix and cannot be used")
        elif prefix:
            self._prefix = prefix
            self._total_size_key = self._prefix + ":" + self._total_size_key
        # TODO: make this configurable, and lower the default
        self._ttl = timedelta(days=365)

        # Keep track of the last returned scan cursor
        # to not start at the beginning for each call to `delete_n_bytes`
        self._delete_n_bytes_cursor = 0

    def start(self) -> None:
        self._storage.start()

    def stop(self) -> None:
        self._storage.stop()

    def _construct_key(self, digest: Digest) -> str:
        """Helper to get the redis key name for a particular digest"""
        # The tag prefix serves to distinguish between our keys and
        # actual blobs if the same redis is used for both index and storage
        return self._prefix + ":" + digest.hash + "_" + str(digest.size_bytes)

    def _deconstruct_key(self, keystr: str) -> Digest | None:
        """Helper to attempt to recover a Digest from a redis key"""

        try:
            tag, rest = keystr.split(":", 1)
            if tag != self._prefix:
                return None
            hash, size_bytes = rest.rsplit("_", 1)
            return Digest(hash=hash, size_bytes=int(size_bytes))
        except ValueError:
            return None
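
    # Key layout (derived from the helpers above): with the default prefix "A",
    # Digest(hash="abc123", size_bytes=42) maps to the Redis key "A:abc123_42",
    # and _deconstruct_key("A:abc123_42") recovers the same Digest. Keys with a
    # different prefix, or with a malformed size suffix, deconstruct to None.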

    @timed(METRIC.STORAGE.STAT_DURATION, type=TYPE)
    def has_blob(self, digest: Digest) -> bool:
        # Redis is authoritative for existence, no need to check storage.
        return bool(self._redis.execute_ro(lambda r: r.exists(self._construct_key(digest))))

    @timed(METRIC.STORAGE.READ_DURATION, type=TYPE)
    def get_blob(self, digest: Digest) -> IO[bytes] | None:
        if blob := self._storage.get_blob(digest):
            return blob

        deleted_index_digests = self._bulk_delete_from_index([digest])
        for digest in deleted_index_digests:
            LOGGER.warning("Blob was indexed but not in storage. Deleted from the index.", tags=dict(digest=digest))

        return None

    @timed(METRIC.STORAGE.DELETE_DURATION, type=TYPE)
    def delete_blob(self, digest: Digest) -> None:
        # If the initial delete doesn't delete anything due to the key not existing,
        # don't do anything else.
        if self._redis.execute_rw(lambda r: r.delete(self._construct_key(digest))):
            self._storage.delete_blob(digest)

            # If we race with a blob being re-added we might have just deleted the
            # storage out from under it. We don't want the index to end up with
            # keys for things that are not present in storage since we consider
            # the index authoritative for existence. So we delete the keys again
            # after deleting from storage; this way, if they do get out of sync it
            # will be in the direction of leaking objects in storage that the
            # index doesn't know about.
            def delete_from_index(r: "redis.Redis[bytes]") -> None:
                pipe = r.pipeline()
                pipe.delete(self._construct_key(digest))
                pipe.decrby(self._total_size_key, digest.size_bytes)
                pipe.execute()

            self._redis.execute_rw(delete_from_index)

    @timed(METRIC.STORAGE.WRITE_DURATION, type=TYPE)
    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        self._storage.commit_write(digest, write_session)

        def set_ttl(r: "redis.Redis[bytes]") -> None:
            key = self._construct_key(digest)
            # Only increment total_size if this key is new
            # Use a dummy value of 1. We only care about existence and expiry
            if r.set(key, 1, ex=self._ttl, nx=True) is not None:
                r.incrby(self._total_size_key, digest.size_bytes)

        self._redis.execute_rw(set_ttl)
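
    # Accounting behaviour relied on above: the first commit of a digest creates
    # its key, so SET ... NX returns a non-None value and total_size grows by
    # digest.size_bytes; committing the same digest again finds the key already
    # present, SET ... NX returns None, and total_size is left unchanged.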

    def _bulk_delete_from_index(self, digests: list[Digest]) -> list[Digest]:
        def delete_from_index(r: "redis.Redis[bytes]") -> list[Digest]:
            pipe = r.pipeline()
            bytes_deleted = 0
            for digest in digests:
                pipe.delete(self._construct_key(digest))
            results = pipe.execute()
            # Go through the delete calls and only decrement total_size for the keys
            # which were actually removed
            successful_deletes = []
            for result, digest in zip(results, digests):
                if result:
                    bytes_deleted += digest.size_bytes
                    successful_deletes.append(digest)
            r.decrby(self._total_size_key, bytes_deleted)
            return successful_deletes

        successful_deletes = self._redis.execute_rw(delete_from_index)
        return successful_deletes

    @timed(METRIC.STORAGE.BULK_DELETE_DURATION, type=TYPE)
    def bulk_delete(self, digests: list[Digest]) -> list[str]:
        # Delete from the index and then delete from the backing storage.
        successful_deletes = self._bulk_delete_from_index(digests)
        failed_deletes = self._storage.bulk_delete(successful_deletes)
        return failed_deletes

    @timed(METRIC.STORAGE.BULK_STAT_DURATION, type=TYPE)
    def missing_blobs(self, digests: list[Digest]) -> list[Digest]:
        # We hit the RW node for every FMB call to extend all the TTLs.
        # This could try to take advantage of RO replicas by only hitting the
        # RW node for blobs that do not have enough TTL left, if any.
        # We currently rely on the always-updated TTL to determine if a blob
        # should be protected in delete_n_bytes. If we allow some
        # slop before updating the RW node here we need to account for it
        # there too.
        def extend_ttls(r: "redis.Redis[bytes]") -> list[int]:
            pipe = r.pipeline(transaction=False)
            for digest in digests:
                pipe.expire(name=self._construct_key(digest), time=self._ttl)
            return pipe.execute()

        extend_results = self._redis.execute_rw(extend_ttls)
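        # EXPIRE only succeeds for keys that already exist, so any digest whose
        # result is not 1 is reported as missing. For example, results of [1, 0]
        # for digests [d1, d2] would report d2 as missing (d1/d2 are
        # illustrative names).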

        return [digest for digest, result in zip(digests, extend_results) if result != 1]

    @timed(METRIC.STORAGE.BULK_WRITE_DURATION, type=TYPE)
    def bulk_update_blobs(self, blobs: list[tuple[Digest, bytes]]) -> list[Status]:
        result_map: dict[str, Status] = {}
        missing_blob_pairs: list[tuple[Digest, bytes]] = []
        missing_blobs = self.missing_blobs([digest for digest, _ in blobs])
        for digest, blob in blobs:
            if digest not in missing_blobs:
                if validate_digest_data(digest, blob):
                    result_map[digest.hash] = Status(code=code_pb2.OK)
                else:
                    result_map[digest.hash] = Status(code=code_pb2.INVALID_ARGUMENT, message="Data doesn't match hash")
            else:
                missing_blob_pairs.append((digest, blob))
        results = self._storage.bulk_update_blobs(missing_blob_pairs)

        def set_ttls(r: "redis.Redis[bytes]") -> None:
            pipe = r.pipeline()
            bytes_added = 0
            for digest, result in zip(missing_blobs, results):
                result_map[digest.hash] = result
                if result.code == code_pb2.OK:
                    key = self._construct_key(digest)
                    # Use a dummy value of 1. We only care about existence and expiry
                    pipe.set(key, 1, ex=self._ttl, nx=True)
            redis_results = pipe.execute()
            # Only update total_size for brand new keys
            for result, digest in zip(redis_results, missing_blobs):
                if result is not None:
                    bytes_added += digest.size_bytes
            r.incrby(self._total_size_key, bytes_added)

        self._redis.execute_rw(set_ttls)
        return [result_map[digest.hash] for digest, _ in blobs]

    @timed(METRIC.STORAGE.BULK_READ_DURATION, type=TYPE)
    def bulk_read_blobs(self, digests: list[Digest]) -> dict[str, bytes]:
        fetched_digests = self._storage.bulk_read_blobs(digests)

        fetched_digest_hashes = set(digest_hash for (digest_hash, _) in fetched_digests.items())
        digests_not_in_storage: list[Digest] = []
        for expected_digest in digests:
            if expected_digest.hash not in fetched_digest_hashes:
                digests_not_in_storage.append(expected_digest)

        if digests_not_in_storage:
            deleted_index_digests = self._bulk_delete_from_index(digests_not_in_storage)
            for digest in deleted_index_digests:
                LOGGER.warning(
                    "Blob was indexed but not in storage. Deleted from the index.", tags=dict(digest=digest)
                )

        return fetched_digests

    def least_recent_digests(self) -> Iterator[Digest]:
        """Generator to iterate through the digests in LRU order"""
        # This is not an LRU index; this method is only used by tests.
        raise NotImplementedError()

    def get_total_size(self) -> int:
        """
        Return the sum of the size of all blobs within the index
        """

        # The total_size represents what we have stored in the underlying
        # storage. However, if some redis notifications for expiring keys
        # are missed we won't actually have keys to account for all the size.
        # The expectation is that a "janitor" process will locate orphaned
        # blobs in storage eventually, and when it does so it will call our
        # delete_blob, which will finally decrby the total_size.
        total_size = self._redis.execute_ro(lambda r: r.get(self._total_size_key))
        if total_size:
            return int(total_size)
        else:
            return 0

    def get_blob_count(self) -> int:
        key_count = int(self._redis.execute_ro(lambda r: r.dbsize()))
        # Subtract 1 to not count the `total_size` key,
        # but never return a negative count.
        return max(0, key_count - 1)

    def delete_n_bytes(
        self,
        n_bytes: int,
        dry_run: bool = False,
        protect_blobs_after: datetime | None = None,
        large_blob_threshold: int | None = None,
        large_blob_lifetime: datetime | None = None,
    ) -> int:
        """
        Iterate through the Redis index using 'SCAN' and delete any entries older than
        'protect_blobs_after'. The ordering of the deletes is undefined and can't be assumed
        to be LRU. Large blobs can optionally be configured to have a separate lifetime.
        """

        now = datetime.now(timezone.utc)

        if protect_blobs_after:
            threshold_time = protect_blobs_after
        else:
            threshold_time = now

        seen: set[str] = set()
        bytes_deleted = 0

        while n_bytes > 0:
            # Used for metric publishing
            delete_start_time = time.time()

            # Maybe count should be configurable or somehow self-tuning
            # based on how many deletable keys we're actually getting
            # back per-request.
            # We could also choose random prefixes for the scan so that
            # multiple cleanup processes are less likely to contend.
            rawkeys: list[bytes]
            previous_cursor = self._delete_n_bytes_cursor
            self._delete_n_bytes_cursor, rawkeys = self._redis.execute_ro(
                lambda r: r.scan(match=f"{self._prefix}:*", cursor=self._delete_n_bytes_cursor, count=1000)
            )
            keys = [key.decode() for key in rawkeys if key != b""]

            def get_ttls(r: "redis.Redis[bytes]") -> list[bytes]:
                pipe = r.pipeline(transaction=False)
                for key in keys:
                    # Skip over any total_size keys
                    if key.split(":")[-1] == "total_size":
                        continue
                    pipe.ttl(key)
                return pipe.execute()

            raw_ttls = self._redis.execute_ro(get_ttls)
            ttls = [int(x) for x in raw_ttls]

            LOGGER.debug("Scan returned.", tags=dict(key_count=len(ttls)))
            digests_to_delete: list[Digest] = []
            failed_deletes: list[str] = []
            new_blob_bytes = 0
            for key, ttl in zip(keys, ttls):
                digest = self._deconstruct_key(key)
                if digest and digest.hash not in seen:
                    seen.add(digest.hash)
                    # Since FMB sets the ttl to self._ttl on every call we can
                    # use the time remaining to figure out when the last FMB
                    # call for that blob was.
                    blob_time = now - (self._ttl - timedelta(seconds=ttl))
                    if n_bytes <= 0:
                        # Reset the scan cursor to its previous value to not skip
                        # the digests we didn't get to
                        self._delete_n_bytes_cursor = previous_cursor
                        break

                    if (blob_time <= threshold_time) or (
                        large_blob_threshold
                        and large_blob_lifetime
                        and digest.size_bytes > large_blob_threshold
                        and blob_time <= large_blob_lifetime
                    ):
                        n_bytes -= digest.size_bytes
                        digests_to_delete.append(digest)
                    else:
                        new_blob_bytes += digest.size_bytes

            if digests_to_delete:
                if dry_run:
                    LOGGER.debug("Detected deletable digests.", tags=dict(digest_count=len(digests_to_delete)))
                    for digest in digests_to_delete:
                        if digest not in failed_deletes:
                            bytes_deleted += digest.size_bytes
                else:
                    LOGGER.debug("Deleting digests.", tags=dict(digest_count=len(digests_to_delete)))
                    failed_deletes = self.bulk_delete(digests_to_delete)
                    blobs_deleted = 0
                    for digest in digests_to_delete:
                        if digest not in failed_deletes:
                            blobs_deleted += 1
                            bytes_deleted += digest.size_bytes

                    batch_duration = time.time() - delete_start_time
                    blobs_deleted_per_second = blobs_deleted / batch_duration
                    publish_gauge_metric(METRIC.CLEANUP.BLOBS_DELETED_PER_SECOND, blobs_deleted_per_second)
            elif new_blob_bytes > 0:
                LOGGER.error(
                    "All remaining digests have been accessed within the time threshold",
                    tags=dict(new_blob_bytes=new_blob_bytes, threshold_time=protect_blobs_after),
                )

            if self._delete_n_bytes_cursor == 0:  # scan finished
                LOGGER.debug("Cursor exhausted.")
                break

        return bytes_deleted