Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/index/redis.py: 96.39%

166 statements  

coverage.py v7.4.1, created at 2024-06-11 15:37 +0000

# Copyright (C) 2023 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
A storage provider that uses redis to maintain existence and expiry metadata
for a storage.
"""

import logging
import time
from datetime import datetime, timedelta
from typing import IO, Dict, Iterator, List, Optional, Set, Tuple

import redis

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid._protos.google.rpc import code_pb2
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid.server.cas.storage.index.index_abc import IndexABC
from buildgrid.server.cas.storage.storage_abc import StorageABC
from buildgrid.server.metrics_names import (
    CLEANUP_BLOBS_DELETION_RATE_METRIC_NAME,
    CLEANUP_STORAGE_DELETION_FAILURES_METRIC_NAME,
)
from buildgrid.server.metrics_utils import publish_counter_metric, publish_gauge_metric
from buildgrid.server.redis.provider import RedisProvider
from buildgrid.utils import validate_digest_data

LOGGER = logging.getLogger(__name__)


class RedisIndex(IndexABC):
    def __init__(self, redis: RedisProvider, storage: StorageABC) -> None:
        self._redis = redis
        self._storage = storage
        # TODO: implement redis notification based cleanup, make this configurable, and lower the default
        self._ttl = timedelta(days=365)

    def start(self) -> None:
        self._storage.start()

    def stop(self) -> None:
        self._storage.stop()

    def _construct_key(self, digest: Digest) -> str:
        """Helper to get the redis key name for a particular digest"""
        # The tag prefix serves to distinguish between our keys and
        # actual blobs if the same redis is used for both index and storage
        return "A:" + digest.hash + "_" + str(digest.size_bytes)

    def _deconstruct_key(self, keystr: str) -> Optional[Digest]:
        """Helper to attempt to recover a Digest from a redis key"""

        try:
            tag, rest = keystr.split(":", 1)
            if tag != "A":
                return None
            hash, size_bytes = rest.rsplit("_", 1)
            return Digest(hash=hash, size_bytes=int(size_bytes))
        except ValueError:
            return None
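
    # Illustrative example of the key scheme used by the two helpers above,
    # with a hypothetical digest: a digest with hash "abc123" and size_bytes 42
    # maps to the key "A:abc123_42", and _deconstruct_key("A:abc123_42")
    # recovers Digest(hash="abc123", size_bytes=42). Keys without the "A:" tag,
    # or with a non-integer size suffix, deconstruct to None.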

    def has_blob(self, digest: Digest) -> bool:
        # Redis is authoritative for existence, no need to check storage.
        return bool(self._redis.execute_ro(lambda r: r.exists(self._construct_key(digest))))

    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        # If storage has the blob that's enough, no need to check the index.
        # We aren't expecting to typically "discover" blobs this way, and a
        # get request does not extend the ttl, so we don't update the index.
        return self._storage.get_blob(digest)

    def delete_blob(self, digest: Digest) -> None:
        # If the initial delete doesn't remove anything because the key
        # doesn't exist, don't do anything else.
        if self._redis.execute_rw(lambda r: r.delete(self._construct_key(digest))):
            self._storage.delete_blob(digest)

            # If we race with a blob being re-added we might have just deleted the
            # storage out from under it. We don't want the index to end up with
            # keys for things that are not present in storage, since we consider
            # the index authoritative for existence. So we delete the keys again
            # after deleting from storage; this way, if they do get out of sync it
            # will be in the direction of leaking objects in storage that the
            # index doesn't know about.
            def delete_from_index(r: "redis.Redis[bytes]") -> None:
                pipe = r.pipeline()
                pipe.delete(self._construct_key(digest))
                pipe.decrby("total_size", digest.size_bytes)
                pipe.execute()

            self._redis.execute_rw(delete_from_index)

    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        self._storage.commit_write(digest, write_session)

        def set_ttl(r: "redis.Redis[bytes]") -> None:
            key = self._construct_key(digest)
            # Only increment total_size if this key is new
            # Use a dummy value of 1. We only care about existence and expiry
            if r.set(key, 1, ex=self._ttl, nx=True) is not None:
                r.incrby("total_size", digest.size_bytes)

        self._redis.execute_rw(set_ttl)
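
    # Note on the nx=True write above: with nx=True, redis-py's set() returns
    # None when the key already exists and a truthy result when the key was
    # created, so total_size is only incremented for brand-new index entries.
    # bulk_update_blobs below relies on the same behaviour.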

    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        # Delete from the index and then delete from the backing storage.
        def delete_from_index(r: "redis.Redis[bytes]") -> List[Digest]:
            pipe = r.pipeline()
            bytes_deleted = 0
            for digest in digests:
                pipe.delete(self._construct_key(digest))
            results = pipe.execute()
            # Go through the delete calls and only decrement total_size for the keys
            # which were actually removed
            successful_deletes = []
            for result, digest in zip(results, digests):
                if result:
                    bytes_deleted += digest.size_bytes
                    successful_deletes.append(digest)
            r.decrby("total_size", bytes_deleted)
            return successful_deletes

        successful_deletes = self._redis.execute_rw(delete_from_index)
        failed_deletes = self._storage.bulk_delete(successful_deletes)
        return failed_deletes

    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
        # We hit the RW node for every FMB call to extend all the TTLs.
        # This could try to take advantage of RO replicas by only hitting the
        # RW node for blobs that do not have enough TTL left, if any.
        # We currently rely on the always-updated TTL to determine if a blob
        # should be protected in mark_n_bytes_as_deleted. If we allow some
        # slop before updating the RW node here, we need to account for it
        # there too.
        def extend_ttls(r: "redis.Redis[bytes]") -> List[int]:
            pipe = r.pipeline(transaction=False)
            for digest in digests:
                pipe.expire(name=self._construct_key(digest), time=self._ttl)
            return pipe.execute()

        extend_results = self._redis.execute_rw(extend_ttls)

        return [digest for digest, result in zip(digests, extend_results) if result != 1]
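
    # Note on the result check above: EXPIRE only refreshes the TTL of keys
    # that exist, returning a truthy result for keys it touched and a falsy
    # one for missing keys, so any digest whose expire() result is not 1 is
    # reported as missing.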

    def bulk_update_blobs(self, blobs: List[Tuple[Digest, bytes]]) -> List[Status]:
        result_map: Dict[str, Status] = {}
        missing_blob_pairs: List[Tuple[Digest, bytes]] = []
        missing_blobs = self.missing_blobs([digest for digest, _ in blobs])
        for digest, blob in blobs:
            if digest not in missing_blobs:
                if validate_digest_data(digest, blob):
                    result_map[digest.hash] = Status(code=code_pb2.OK)
                else:
                    result_map[digest.hash] = Status(code=code_pb2.INVALID_ARGUMENT, message="Data doesn't match hash")
            else:
                missing_blob_pairs.append((digest, blob))
        results = self._storage.bulk_update_blobs(missing_blob_pairs)

        def set_ttls(r: "redis.Redis[bytes]") -> None:
            pipe = r.pipeline()
            bytes_added = 0
            for digest, result in zip(missing_blobs, results):
                result_map[digest.hash] = result
                if result.code == code_pb2.OK:
                    key = self._construct_key(digest)
                    # Use a dummy value of 1. We only care about existence and expiry
                    pipe.set(key, 1, ex=self._ttl, nx=True)
            redis_results = pipe.execute()
            # only update total_size for brand new keys
            for result, digest in zip(redis_results, missing_blobs):
                if result is not None:
                    bytes_added += digest.size_bytes
            r.incrby("total_size", bytes_added)

        self._redis.execute_rw(set_ttls)
        return [result_map[digest.hash] for digest, _ in blobs]

    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
        # If storage has the blob that's enough, no need to check the index.
        # We aren't expecting to typically "discover" blobs this way, and a
        # get request does not extend the ttl, so we don't update the index.
        return self._storage.bulk_read_blobs(digests)

    def least_recent_digests(self) -> Iterator[Digest]:
        """Generator to iterate through the digests in LRU order"""
        # This is not an LRU index; this method is used only from tests.
        raise NotImplementedError()

    def get_total_size(self, include_marked: bool = True) -> int:
        """
        Return the sum of the size of all blobs within the index

        For the redis index, include_marked does not apply.
        """

        # The total_size represents what we have stored in the underlying
        # storage. However, if some redis notifications for expiring keys
        # are missed we won't actually have keys to account for all the size.
        # The expectation is that a "janitor" process will eventually locate
        # orphaned blobs in storage, and when it does so it will call our
        # delete_blob, which will finally decrby the total_size.
        total_size = self._redis.execute_ro(lambda r: r.get("total_size"))
        if total_size:
            return int(total_size)
        else:
            return 0

    def delete_n_bytes(
        self, n_bytes: int, dry_run: bool = False, protect_blobs_after: Optional[datetime] = None
    ) -> int:
        """
        Iterate through the Redis index using 'SCAN' and delete any entries older than
        'protect_blobs_after'. The ordering of the deletes is undefined and can't be assumed
        to be LRU.
        """
        now = datetime.utcnow()

        if protect_blobs_after:
            threshold_time = protect_blobs_after
        else:
            threshold_time = now

        # Used for metric publishing
        delete_start_time = time.time()
        metadata = {}
        if self.instance_name:
            metadata["instance-name"] = self.instance_name

        seen: Set[str] = set()
        cursor = 0
        bytes_deleted = 0

        while n_bytes > 0:
            # Maybe count should be configurable or somehow self-tuning
            # based on how many deletable keys we're actually getting
            # back per-request.
            # We could also choose random prefixes for the scan so that
            # multiple cleanup processes are less likely to contend.
            rawkeys: List[bytes]
            cursor, rawkeys = self._redis.execute_ro(lambda r: r.scan(match="A:*", cursor=cursor, count=1000))
            keys = [key.decode() for key in rawkeys]

            def get_ttls(r: "redis.Redis[bytes]") -> List[bytes]:
                pipe = r.pipeline(transaction=False)
                for key in keys:
                    pipe.ttl(key)
                return pipe.execute()

            raw_ttls = self._redis.execute_ro(get_ttls)
            ttls = [int(x) for x in raw_ttls]

            LOGGER.debug(f"scan returned {len(ttls)} keys")
            digests_to_delete: List[Digest] = []
            failed_deletes: List[str] = []
            for key, ttl in zip(keys, ttls):
                # Since FMB sets the ttl to self._ttl on every call we can
                # use the time remaining to figure out when the last FMB
                # call for that blob was.
                blob_time = now - (self._ttl - timedelta(seconds=ttl))
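                # For example, with the default 365-day self._ttl, a key whose
                # remaining ttl is 364 days was last refreshed by FMB roughly
                # one day ago, so blob_time lands about a day in the past.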

                if n_bytes <= 0:
                    break

                digest = self._deconstruct_key(key)
                if digest and (blob_time <= threshold_time) and (digest.hash not in seen):
                    n_bytes -= digest.size_bytes
                    digests_to_delete.append(digest)

            if digests_to_delete:
                if dry_run:
                    LOGGER.debug(f"Would delete {len(digests_to_delete)} digests")
                else:
                    LOGGER.debug(f"Deleting {len(digests_to_delete)} digests")
                    failed_deletes = self.bulk_delete(digests_to_delete)
                    publish_counter_metric(
                        CLEANUP_STORAGE_DELETION_FAILURES_METRIC_NAME, len(failed_deletes), metadata
                    )
                for digest in digests_to_delete:
                    if digest not in failed_deletes:
                        bytes_deleted += digest.size_bytes

            if cursor == 0:  # scan finished
                LOGGER.debug("cursor exhausted")
                break

        batch_duration = time.time() - delete_start_time
        blobs_deleted_per_second = (len(digests_to_delete) - len(failed_deletes)) / batch_duration
        if not dry_run:
            publish_gauge_metric(CLEANUP_BLOBS_DELETION_RATE_METRIC_NAME, blobs_deleted_per_second, metadata)
        return bytes_deleted
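

# Minimal usage sketch (illustrative only): wires a RedisIndex in front of a
# backing storage and exercises a few of the methods above against a local
# redis. The RedisProvider and LRUMemoryCache constructor arguments shown
# here are assumptions made for the example, not taken from this module.
if __name__ == "__main__":
    from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache  # assumed backend choice

    provider = RedisProvider(host="localhost", port=6379)  # assumed constructor arguments
    index = RedisIndex(redis=provider, storage=LRUMemoryCache(1024 * 1024))  # assumed size-limit argument
    index.start()
    try:
        digest = Digest(hash="a" * 64, size_bytes=5)
        print(index.has_blob(digest))  # False until a blob with this digest is committed
        print(index.get_total_size())  # Sum of blob sizes tracked under the "total_size" key
    finally:
        index.stop()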