Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/index/redis.py: 96.43%

196 statements  

coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

# Copyright (C) 2023 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
A storage provider that uses redis to maintain existence and expiry metadata
for a storage.
"""


import time
from datetime import datetime, timedelta
from typing import IO, Dict, Iterator, List, Optional, Set, Tuple

import redis

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid._protos.google.rpc import code_pb2
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid.server.cas.storage.index.index_abc import IndexABC
from buildgrid.server.cas.storage.storage_abc import StorageABC
from buildgrid.server.decorators import timed
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.metrics_names import METRIC
from buildgrid.server.metrics_utils import publish_gauge_metric
from buildgrid.server.redis.provider import RedisProvider
from buildgrid.server.utils.digests import validate_digest_data

LOGGER = buildgrid_logger(__name__)

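# Design overview (editorial summary of this module): each indexed blob gets a Redis key
# of the form "A:<hash>_<size>" whose only purpose is existence and expiry tracking (the
# stored value is a dummy "1" with a TTL), a "total_size" counter tracks the bytes the
# index believes are in storage, and the actual blob data lives in the wrapped StorageABC.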

class RedisIndex(IndexABC):
    TYPE = "RedisIndex"

    def __init__(self, redis: RedisProvider, storage: StorageABC) -> None:
        self._redis = redis
        self._storage = storage
        # TODO: implement redis notification based cleanup, make this configurable, and lower the default
        self._ttl = timedelta(days=365)

        # Keep track of the last returned scan cursor
        # to not start at the beginning for each call to `delete_n_bytes`
        self._delete_n_bytes_cursor = 0

    def start(self) -> None:
        self._storage.start()

    def stop(self) -> None:
        self._storage.stop()

    def _construct_key(self, digest: Digest) -> str:
        """Helper to get the redis key name for a particular digest"""
        # The tag prefix serves to distinguish between our keys and
        # actual blobs if the same redis is used for both index and storage
        return "A:" + digest.hash + "_" + str(digest.size_bytes)

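    # Editorial example (illustrative values): _construct_key(Digest(hash="abc123", size_bytes=42))
    # yields "A:abc123_42", and _deconstruct_key("A:abc123_42") recovers the same Digest.
    # Keys without the "A:" tag or a parseable "_<size>" suffix deconstruct to None.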

    def _deconstruct_key(self, keystr: str) -> Optional[Digest]:
        """Helper to attempt to recover a Digest from a redis key"""

        try:
            tag, rest = keystr.split(":", 1)
            if tag != "A":
                return None
            hash, size_bytes = rest.rsplit("_", 1)
            return Digest(hash=hash, size_bytes=int(size_bytes))
        except ValueError:
            return None

    @timed(METRIC.STORAGE.STAT_DURATION, type=TYPE)
    def has_blob(self, digest: Digest) -> bool:
        # Redis is authoritative for existence, no need to check storage.
        return bool(self._redis.execute_ro(lambda r: r.exists(self._construct_key(digest))))

    @timed(METRIC.STORAGE.READ_DURATION, type=TYPE)
    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        if blob := self._storage.get_blob(digest):
            return blob

        deleted_index_digests = self._bulk_delete_from_index([digest])
        for digest in deleted_index_digests:
            LOGGER.warning("Blob was indexed but not in storage. Deleted from the index.", tags=dict(digest=digest))

        return None


    @timed(METRIC.STORAGE.DELETE_DURATION, type=TYPE)
    def delete_blob(self, digest: Digest) -> None:
        # If the initial delete doesn't delete anything due to the key not existing,
        # don't do anything else
        if self._redis.execute_rw(lambda r: r.delete(self._construct_key(digest))):
            self._storage.delete_blob(digest)

            # If we race with a blob being re-added we might have just deleted the
            # storage out from under it. We don't want the index to end up with
            # keys for things that are not present in storage since we consider
            # the index authoritative for existence. So we delete the keys again
            # after deleting from storage; this way, if they do get out of sync it
            # will be in the direction of leaking objects in storage that the
            # index doesn't know about.
            def delete_from_index(r: "redis.Redis[bytes]") -> None:
                pipe = r.pipeline()
                pipe.delete(self._construct_key(digest))
                pipe.decrby("total_size", digest.size_bytes)
                pipe.execute()

            self._redis.execute_rw(delete_from_index)

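    # Editorial note: commit_write below stores the blob first and only then records the
    # index key with SET ... NX EX. redis-py returns None when the key already existed,
    # which is why "total_size" is only incremented for keys that are genuinely new.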

    @timed(METRIC.STORAGE.WRITE_DURATION, type=TYPE)
    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        self._storage.commit_write(digest, write_session)

        def set_ttl(r: "redis.Redis[bytes]") -> None:
            key = self._construct_key(digest)
            # Only increment total_size if this key is new
            # Use a dummy value of 1. We only care about existence and expiry
            if r.set(key, 1, ex=self._ttl, nx=True) is not None:
                r.incrby("total_size", digest.size_bytes)

        self._redis.execute_rw(set_ttl)


    def _bulk_delete_from_index(self, digests: List[Digest]) -> List[Digest]:
        def delete_from_index(r: "redis.Redis[bytes]") -> List[Digest]:
            pipe = r.pipeline()
            bytes_deleted = 0
            for digest in digests:
                pipe.delete(self._construct_key(digest))
            results = pipe.execute()
            # Go through the delete calls and only decrement total_size for the keys
            # which were actually removed
            successful_deletes = []
            for result, digest in zip(results, digests):
                if result:
                    bytes_deleted += digest.size_bytes
                    successful_deletes.append(digest)
            r.decrby("total_size", bytes_deleted)
            return successful_deletes

        successful_deletes = self._redis.execute_rw(delete_from_index)
        return successful_deletes

    @timed(METRIC.STORAGE.BULK_DELETE_DURATION, type=TYPE)
    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        # Delete from the index and then delete from the backing storage.
        successful_deletes = self._bulk_delete_from_index(digests)
        failed_deletes = self._storage.bulk_delete(successful_deletes)
        return failed_deletes

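    # Editorial note: missing_blobs doubles as the TTL refresh for FindMissingBlobs.
    # Redis EXPIRE returns 1 when the key exists (and its TTL was reset) and 0 when it
    # does not, so any digest whose EXPIRE call did not return 1 is reported as missing.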

    @timed(METRIC.STORAGE.BULK_STAT_DURATION, type=TYPE)
    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
        # We hit the RW node for every FMB call to extend all the TTLs.
        # This could try to take advantage of RO replicas by only hitting the
        # RW node for blobs that do not have enough TTL left, if any.
        # We currently rely on the always-updated TTL to determine if a blob
        # should be protected in mark_n_bytes_as_deleted. If we allow some
        # slop before updating the RW node here we need to account for it
        # there too.
        def extend_ttls(r: "redis.Redis[bytes]") -> List[int]:
            pipe = r.pipeline(transaction=False)
            for digest in digests:
                pipe.expire(name=self._construct_key(digest), time=self._ttl)
            return pipe.execute()

        extend_results = self._redis.execute_rw(extend_ttls)

        return [digest for digest, result in zip(digests, extend_results) if result != 1]

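    # Editorial note: bulk_update_blobs validates blobs the index already knows about
    # without touching storage, writes only the missing ones through to the backing
    # storage, and then registers index keys (and updates "total_size") for the writes
    # that succeeded.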

    @timed(METRIC.STORAGE.BULK_WRITE_DURATION, type=TYPE)
    def bulk_update_blobs(self, blobs: List[Tuple[Digest, bytes]]) -> List[Status]:
        result_map: Dict[str, Status] = {}
        missing_blob_pairs: List[Tuple[Digest, bytes]] = []
        missing_blobs = self.missing_blobs([digest for digest, _ in blobs])
        for digest, blob in blobs:
            if digest not in missing_blobs:
                if validate_digest_data(digest, blob):
                    result_map[digest.hash] = Status(code=code_pb2.OK)
                else:
                    result_map[digest.hash] = Status(code=code_pb2.INVALID_ARGUMENT, message="Data doesn't match hash")
            else:
                missing_blob_pairs.append((digest, blob))
        results = self._storage.bulk_update_blobs(missing_blob_pairs)

        def set_ttls(r: "redis.Redis[bytes]") -> None:
            pipe = r.pipeline()
            bytes_added = 0
            for digest, result in zip(missing_blobs, results):
                result_map[digest.hash] = result
                if result.code == code_pb2.OK:
                    key = self._construct_key(digest)
                    # Use a dummy value of 1. We only care about existence and expiry
                    pipe.set(key, 1, ex=self._ttl, nx=True)
            redis_results = pipe.execute()
            # only update total_size for brand new keys
            for result, digest in zip(redis_results, missing_blobs):
                if result is not None:
                    bytes_added += digest.size_bytes
            r.incrby("total_size", bytes_added)

        self._redis.execute_rw(set_ttls)
        return [result_map[digest.hash] for digest, _ in blobs]


    @timed(METRIC.STORAGE.BULK_READ_DURATION, type=TYPE)
    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
        fetched_digests = self._storage.bulk_read_blobs(digests)

        fetched_digest_hashes = set(digest_hash for (digest_hash, _) in fetched_digests.items())
        digests_not_in_storage: List[Digest] = []
        for expected_digest in digests:
            if expected_digest.hash not in fetched_digest_hashes:
                digests_not_in_storage.append(expected_digest)

        if digests_not_in_storage:
            deleted_index_digests = self._bulk_delete_from_index(digests_not_in_storage)
            for digest in deleted_index_digests:
                LOGGER.warning(
                    "Blob was indexed but not in storage. Deleted from the index.", tags=dict(digest=digest)
                )

        return fetched_digests


    def least_recent_digests(self) -> Iterator[Digest]:
        """Generator to iterate through the digests in LRU order"""
        # This is not an LRU index; this method is only used by tests.
        raise NotImplementedError()

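    # Editorial note: "total_size" is a plain Redis counter key, incremented with INCRBY
    # when new index keys are created and decremented with DECRBY when keys are deleted,
    # so reading it is cheap compared to scanning the keyspace.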

    def get_total_size(self) -> int:
        """
        Return the sum of the size of all blobs within the index
        """

        # The total_size represents what we have stored in the underlying
        # storage. However, if some redis notifications for expiring keys
        # are missed we won't actually have keys to account for all the size.
        # The expectation is that a "janitor" process will locate orphaned
        # blobs in storage eventually and when it does so it will call our
        # delete_blob which will finally decrby the total_size.
        total_size = self._redis.execute_ro(lambda r: r.get("total_size"))
        if total_size:
            return int(total_size)
        else:
            return 0

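    # Editorial example (illustrative numbers): with the default self._ttl of 365 days,
    # a key whose remaining TTL is 364 days was last refreshed roughly one day ago, so
    # blob_time in delete_n_bytes works out to about now - 1 day. The blob is only
    # considered deletable if that time is at or before protect_blobs_after (or now,
    # when no protection threshold is given).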

    def delete_n_bytes(
        self, n_bytes: int, dry_run: bool = False, protect_blobs_after: Optional[datetime] = None
    ) -> int:
        """
        Iterate through the Redis Index using 'SCAN' and delete any entries older than
        'protect_blobs_after'. The ordering of the deletes is undefined and can't be assumed
        to be LRU.
        """
        now = datetime.utcnow()

        if protect_blobs_after:
            threshold_time = protect_blobs_after
        else:
            threshold_time = now

        seen: Set[str] = set()
        bytes_deleted = 0

        while n_bytes > 0:
            # Used for metric publishing
            delete_start_time = time.time()

            # Maybe count should be configurable or somehow self-tuning
            # based on how many deletable keys we're actually getting
            # back per-request.
            # We could also choose random prefixes for the scan so that
            # multiple cleanup processes are less likely to contend.
            rawkeys: List[bytes]
            previous_cursor = self._delete_n_bytes_cursor
            self._delete_n_bytes_cursor, rawkeys = self._redis.execute_ro(
                lambda r: r.scan(match="A:*", cursor=self._delete_n_bytes_cursor, count=1000)
            )
            keys = [key.decode() for key in rawkeys]

            def get_ttls(r: "redis.Redis[bytes]") -> List[bytes]:
                pipe = r.pipeline(transaction=False)
                for key in keys:
                    pipe.ttl(key)
                return pipe.execute()

            raw_ttls = self._redis.execute_ro(get_ttls)
            ttls = [int(x) for x in raw_ttls]

            LOGGER.debug("Scan returned.", tags=dict(key_count=len(ttls)))
            digests_to_delete: List[Digest] = []
            failed_deletes: List[str] = []
            for key, ttl in zip(keys, ttls):
                # Since FMB sets the ttl to self._ttl on every call we can
                # use the time remaining to figure out when the last FMB
                # call for that blob was.
                blob_time = now - (self._ttl - timedelta(seconds=ttl))
                if n_bytes <= 0:
                    # Reset the scan cursor to its previous value so we don't skip
                    # the digests we didn't get to
                    self._delete_n_bytes_cursor = previous_cursor
                    break

                digest = self._deconstruct_key(key)
                if digest and (blob_time <= threshold_time) and (digest.hash not in seen):
                    n_bytes -= digest.size_bytes
                    digests_to_delete.append(digest)

            if digests_to_delete:
                if dry_run:
                    LOGGER.debug("Detected deletable digests.", tags=dict(digest_count=len(digests_to_delete)))
                    for digest in digests_to_delete:
                        if digest not in failed_deletes:
                            bytes_deleted += digest.size_bytes
                else:
                    LOGGER.debug("Deleting digests.", tags=dict(digest_count=len(digests_to_delete)))
                    failed_deletes = self.bulk_delete(digests_to_delete)
                    blobs_deleted = 0
                    for digest in digests_to_delete:
                        if digest not in failed_deletes:
                            blobs_deleted += 1
                            bytes_deleted += digest.size_bytes

                    batch_duration = time.time() - delete_start_time
                    blobs_deleted_per_second = blobs_deleted / batch_duration
                    publish_gauge_metric(METRIC.CLEANUP.BLOBS_DELETED_PER_SECOND, blobs_deleted_per_second)

            if self._delete_n_bytes_cursor == 0:  # scan finished
                LOGGER.debug("Cursor exhausted.")
                break

        return bytes_deleted
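
For orientation, here is a minimal usage sketch of this index. The wiring is illustrative only: the RedisProvider and backing storage would normally come from BuildGrid's configuration, so make_redis_provider, make_backing_storage, and the digest hash value below are placeholders, not real BuildGrid APIs or data.

from io import BytesIO

# Hypothetical wiring; in practice `provider` and `backing_storage` are built from
# BuildGrid configuration rather than by hand.
provider: RedisProvider = make_redis_provider()        # placeholder factory
backing_storage: StorageABC = make_backing_storage()   # placeholder factory

index = RedisIndex(redis=provider, storage=backing_storage)
index.start()

# Writing a blob registers an "A:<hash>_<size>" key with a TTL and bumps "total_size".
data = b"hello"
example_digest = Digest(hash="<sha256 of data>", size_bytes=len(data))  # illustrative digest
index.commit_write(example_digest, BytesIO(data))

# Existence checks and FindMissingBlobs consult (and refresh) the Redis index.
assert index.has_blob(example_digest)
assert index.missing_blobs([example_digest]) == []

print(index.get_total_size())      # indexed bytes, read from the "total_size" counter
index.delete_n_bytes(1024 * 1024)  # best-effort cleanup of up to 1 MiB of old entries

index.stop()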