Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/index/sql.py: 92.91%

409 statements  

coverage.py v7.4.1, created at 2024-06-11 15:37 +0000

1# Copyright (C) 2019 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17SQLIndex 

18================== 

19 

20A SQL index implementation. This can be pointed to either a remote SQL server 

21or a local SQLite database. 

22 

23""" 

24import io 

25import itertools 

26import logging 

27import time 

28from collections import deque 

29from datetime import datetime, timedelta 

30from typing import IO, Any, Deque, Dict, Iterator, List, Optional, Sequence, Tuple, cast 

31 

32from sqlalchemy import Column, and_, delete, func, not_, select, text 

33from sqlalchemy.exc import DBAPIError 

34from sqlalchemy.orm import Session, load_only 

35from sqlalchemy.orm.exc import StaleDataError 

36from sqlalchemy.orm.query import Query 

37from sqlalchemy.orm.session import Session as SessionType 

38from sqlalchemy.sql.elements import BooleanClauseList 

39 

40from buildgrid._exceptions import StorageFullError 

41from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest 

42from buildgrid._protos.google.rpc import code_pb2 

43from buildgrid._protos.google.rpc.status_pb2 import Status 

44from buildgrid.server.metrics_names import ( 

45 CAS_INDEX_BULK_SELECT_DIGEST_TIME_METRIC_NAME, 

46 CAS_INDEX_BULK_TIMESTAMP_UPDATE_TIME_METRIC_NAME, 

47 CAS_INDEX_GET_BLOB_TIME_METRIC_NAME, 

48 CAS_INDEX_SAVE_DIGESTS_TIME_METRIC_NAME, 

49 CAS_INDEX_SIZE_CALCULATION_TIME_METRIC_NAME, 

50 CLEANUP_BLOBS_DELETION_RATE_METRIC_NAME, 

51 CLEANUP_INDEX_BULK_DELETE_METRIC_NAME, 

52 CLEANUP_INDEX_MARK_DELETED_METRIC_NAME, 

53 CLEANUP_INDEX_PREMARKED_BLOBS_METRIC_NAME, 

54 CLEANUP_STORAGE_BULK_DELETE_METRIC_NAME, 

55 CLEANUP_STORAGE_DELETION_FAILURES_METRIC_NAME, 

56) 

57from buildgrid.server.metrics_utils import ( 

58 DurationMetric, 

59 generator_method_duration_metric, 

60 publish_counter_metric, 

61 publish_gauge_metric, 

62 publish_timer_metric, 

63) 

64from buildgrid.server.persistence.sql.models import IndexEntry 

65from buildgrid.server.sql.provider import SqlProvider 

66from buildgrid.utils import read_and_rewind, validate_digest_data 

67 

68from ..storage_abc import StorageABC 

69from .index_abc import IndexABC 

70from .sql_dialect_delegates import PostgreSQLDelegate, SQLiteDelegate 

71 

72LOGGER = logging.getLogger(__name__) 

73DIALECT_DELEGATES = {"postgresql": PostgreSQLDelegate, "sqlite": SQLiteDelegate} 

74 

75INLINE_BLOB_SIZE_HARD_MAXIMUM = 1000000000 

76 

77 

78class SQLIndex(IndexABC): 

79 def __init__( 

80 self, 

81 sql_provider: SqlProvider, 

82 storage: StorageABC, 

83 *, 

84 window_size: int = 1000, 

85 inclause_limit: int = -1, 

86 max_inline_blob_size: int = 0, 

87 refresh_accesstime_older_than: int = 0, 

88 **kwargs: Any, 

89 ) -> None: 

90 base_argnames = ["fallback_on_get"] 

91 base_args = {} 

92 for arg in base_argnames: 

93 if arg in kwargs: 

94 base_args[arg] = kwargs.pop(arg) 

95 super().__init__(**base_args) 

96 

97 self._sql = sql_provider 

98 self._storage = storage 

99 self._instance_name = None 

100 

101 if max_inline_blob_size > INLINE_BLOB_SIZE_HARD_MAXIMUM: 

102 raise ValueError( 

103 f"Max inline blob size is [{max_inline_blob_size}], " 

104 f"but must be less than [{INLINE_BLOB_SIZE_HARD_MAXIMUM}]." 

105 ) 

106 if max_inline_blob_size >= 0: 

107 self._max_inline_blob_size = max_inline_blob_size 

108 else: 

109 raise ValueError(f"Max inline blob size is [{max_inline_blob_size}], but must be nonnegative.") 

110 

111 if refresh_accesstime_older_than >= 0: 

112 # Measured in seconds. Helps reduce the frequency of timestamp updates under heavy read load. 

113 self.refresh_accesstime_older_than = refresh_accesstime_older_than 

114 else: 

115 raise ValueError( 

116 f"'refresh_accesstime_older_than' must be nonnegative. It is {refresh_accesstime_older_than}" 

117 ) 

118 

119 # Max # of rows to fetch while iterating over all blobs 

120 # (e.g. in least_recent_digests) 

121 self._all_blobs_window_size = window_size 

122 

123 # This variable stores the list of whereclauses (SQLAlchemy BooleanClauseList objects) 

124 # generated by _column_windows() using a time-expensive SQL query. 

125 # These whereclauses are used to construct the final SQL query 

126 # during cleanup in order to fetch blobs by time windows. 

127 # 

128 # Inside _column_windows(), a list of timestamp borders is obtained: 

129 # intervals = [t1, t2, t3, ...] 

130 # The generated whereclauses can then be represented semantically as, for example: 

131 # self._queue_of_whereclauses = [ 

132 # "WHERE t1 <= IndexEntry.accessed_timestamp < t2", 

133 # "WHERE t2 <= IndexEntry.accessed_timestamp < t3", 

134 # "WHERE t3 <= IndexEntry.accessed_timestamp < t4", 

135 # ... and so on] 

136 # Note the number of entries in each window is determined by 

137 # the instance variable "_all_blobs_window_size". 

138 self._queue_of_whereclauses: Deque[BooleanClauseList[Any]] = deque() 

139 

140 # Whether entries with deleted=True should be considered by delete_n_bytes. 

141 # This is useful to catch any half-finished deletes where a cleanup process 

142 # may have exited half-way through deletion. Once all premarked blobs have been 

143 # deleted, this becomes False; it is only reset after a full scan of the database. 

144 self._delete_premarked_blobs: bool = True 

145 

146 # Only pass known kwargs to db session 

147 available_options = {"pool_size", "max_overflow", "pool_timeout", "pool_pre_ping", "pool_recycle"} 

148 kwargs_keys = kwargs.keys() 

149 if not kwargs_keys <= available_options: 

150 unknown_args = kwargs_keys - available_options 

151 raise TypeError(f"Unknown keyword arguments: [{unknown_args}]") 

152 

153 # Dialect-specific initialization 

154 self._dialect_delegate = DIALECT_DELEGATES.get(self._sql.dialect) 

155 

156 if inclause_limit > 0: 

157 if inclause_limit > window_size: 

158 LOGGER.warning( 

159 f"Configured inclause_limit [{inclause_limit}] is greater than window_size [{window_size}]" 

160 ) 

161 self._inclause_limit = inclause_limit 

162 else: 

163 # If the inlimit isn't explicitly set, we use a default that 

164 # respects both the window size and the db implementation's 

165 # inlimit. 

166 self._inclause_limit = min(window_size, self._sql.default_inlimit) 

167 LOGGER.debug(f"SQL index: using default inclause limit of {self._inclause_limit}") 

168 

169 # Make a test query against the database to ensure the connection is valid 

170 with self._sql.scoped_session() as session: 

171 session.query(IndexEntry).first() 

172 self._sql.remove_scoped_session() 

173 

174 def start(self) -> None: 

175 self._storage.start() 

176 

177 def stop(self) -> None: 

178 self._storage.stop() 

179 

180 def has_blob(self, digest: Digest) -> bool: 

181 with self._sql.scoped_session() as session: 

182 statement = select(func.count(IndexEntry.digest_hash)).where(IndexEntry.digest_hash == digest.hash) 

183 

184 num_entries = session.execute(statement).scalar() 

185 if num_entries is None: 

186 num_entries = 0 

187 

188 if num_entries == 1: 

189 return True 

190 elif num_entries < 1: 

191 return False 

192 else: 

193 raise RuntimeError(f"Multiple results found for blob [{digest}]. The index is in a bad state.") 

194 

195 @DurationMetric(CAS_INDEX_GET_BLOB_TIME_METRIC_NAME, instanced=True) 

196 def get_blob(self, digest: Digest) -> Optional[IO[bytes]]: 

197 """Get a blob from the index or the backing storage. Optionally fallback and repair index""" 

198 

199 # Check the index for the blob and return if found. 

200 with self._sql.scoped_session() as session: 

201 if entry := session.query(IndexEntry).filter(IndexEntry.digest_hash == digest.hash).first(): 

202 if entry.inline_blob is not None: 

203 return io.BytesIO(entry.inline_blob) 

204 elif blob := self._storage.get_blob(digest): 

205 # Fix any blobs that should have been inlined. 

206 if digest.size_bytes <= self._max_inline_blob_size: 

207 self._save_digests_to_index([(digest, read_and_rewind(blob))], session) 

208 session.commit() 

209 return blob 

210 LOGGER.warning( 

211 f"Blob [{digest.hash}/{digest.size_bytes}] was indexed but not in storage. Deleting from the index" 

212 ) 

213 self._bulk_delete_from_index([digest], session) 

214 

215 # Check the storage for the blob and repair the index if found. 

216 if self._fallback_on_get: 

217 if blob := self._storage.get_blob(digest): 

218 with self._sql.scoped_session() as session: 

219 if digest.size_bytes <= self._max_inline_blob_size: 

220 self._save_digests_to_index([(digest, read_and_rewind(blob))], session) 

221 else: 

222 self._save_digests_to_index([(digest, None)], session) 

223 session.commit() 

224 return blob 

225 

226 # Blob was not found in index or storage 

227 return None 

228 

229 def delete_blob(self, digest: Digest) -> None: 

230 statement = delete(IndexEntry).where(IndexEntry.digest_hash == digest.hash) 

231 options = {"synchronize_session": False} 

232 

233 with self._sql.scoped_session() as session: 

234 session.execute(statement, execution_options=options) 

235 

236 self._storage.delete_blob(digest) 

237 

238 def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None: 

239 inline_blob = None 

240 if digest.size_bytes > self._max_inline_blob_size: 

241 self._storage.commit_write(digest, write_session) 

242 else: 

243 write_session.seek(0) 

244 inline_blob = write_session.read() 

245 try: 

246 with self._sql.scoped_session() as session: 

247 self._save_digests_to_index([(digest, inline_blob)], session) 

248 except DBAPIError as error: 

249 # Error has pgcode attribute (Postgres only) 

250 if hasattr(error.orig, "pgcode"): 

251 # imported here to avoid global dependency on psycopg2 

252 from psycopg2.errors import DiskFull, OutOfMemory 

253 

254 # 53100 == DiskFull && 53200 == OutOfMemory 

255 if isinstance(error.orig, (DiskFull, OutOfMemory)): 

256 raise StorageFullError(f"Postgres Error: {error.orig.pgcode}") from error 

257 raise error 
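# Caller-side sketch of commit_write (a hypothetical helper, assuming the
# instance is configured for SHA-256 digests):

import hashlib
import io


def write_blob(index: "SQLIndex", data: bytes) -> Digest:
    digest = Digest(hash=hashlib.sha256(data).hexdigest(), size_bytes=len(data))
    # commit_write() either inlines the blob into the index row or hands it
    # off to the backing storage, depending on max_inline_blob_size.
    index.commit_write(digest, io.BytesIO(data))
    return digest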

258 

259 def _partitioned_hashes(self, digests: Sequence[Digest]) -> Iterator[Iterator[str]]: 

260 """Given a long list of digests, split it into parts no larger than 

261 _inclause_limit and yield the hashes in each part. 

262 """ 

263 for part_start in range(0, len(digests), self._inclause_limit): 

264 part_end = min(len(digests), part_start + self._inclause_limit) 

265 part_digests = itertools.islice(digests, part_start, part_end) 

266 yield map(lambda digest: digest.hash, part_digests) 
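# Standalone illustration of the partitioning above, using plain strings in
# place of Digest objects (the values and the limit of 2 are arbitrary):

def partitioned(hashes: List[str], limit: int) -> Iterator[List[str]]:
    # Yield successive slices of at most `limit` hashes each.
    for start in range(0, len(hashes), limit):
        yield hashes[start:start + limit]

# list(partitioned(["h1", "h2", "h3"], 2)) == [["h1", "h2"], ["h3"]]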

267 

268 @generator_method_duration_metric(CAS_INDEX_BULK_SELECT_DIGEST_TIME_METRIC_NAME) 

269 def _bulk_select_digests( 

270 self, digests: Sequence[Digest], fetch_blobs: bool = False, fetch_deleted: bool = True 

271 ) -> Iterator[IndexEntry]: 

272 """Generator that selects all rows matching a digest list. 

273 

274 SQLAlchemy Core is used for this because the ORM has problems with 

275 large numbers of bind variables for WHERE IN clauses. 

276 

277 We only select on the digest hash (not hash and size) to allow for 

278 index-only queries on db backends that support them. 

279 """ 

280 index_table = IndexEntry.__table__ 

281 with self._sql.scoped_session() as session: 

282 columns = [index_table.c.digest_hash] 

283 if fetch_blobs: 

284 columns.append(index_table.c.inline_blob) 

285 for part in self._partitioned_hashes(digests): 

286 stmt = select(columns).where(index_table.c.digest_hash.in_(part)) 

287 if not fetch_deleted: 

288 stmt = stmt.where(not_(index_table.c.deleted)) 

289 entries = session.execute(stmt) 

290 yield from entries # type: ignore 

291 

292 @DurationMetric(CAS_INDEX_BULK_TIMESTAMP_UPDATE_TIME_METRIC_NAME, instanced=True) 

293 def _bulk_refresh_timestamps( 

294 self, digests: Sequence[Digest], session: SessionType, update_time: Optional[datetime] = None 

295 ) -> None: 

296 """Refresh all timestamps of the input digests. 

297 

298 SQLAlchemy Core is used for this because the ORM is not suitable for 

299 bulk inserts and updates. 

300 

301 https://docs.sqlalchemy.org/en/13/faq/performance.html#i-m-inserting-400-000-rows-with-the-orm-and-it-s-really-slow 

302 """ 

303 index_table = IndexEntry.__table__ 

304 current_time = datetime.utcnow() 

305 

306 # If a timestamp was passed in, use it and always refresh (no threshold). 

307 if update_time: 

308 timestamp = update_time 

309 last_accessed_threshold = current_time 

310 # Otherwise, only refresh a digest's timestamp if it was last accessed before this threshold. 

311 else: 

312 timestamp = current_time 

313 last_accessed_threshold = current_time - timedelta(seconds=self.refresh_accesstime_older_than) 

314 

315 for part in self._partitioned_hashes(digests): 

316 # Generate the SQL Statement: 

317 # UPDATE index SET accessed_timestamp=<timestamp> 

318 # WHERE index.digest_hash IN 

319 # (SELECT index.digest_hash FROM index 

320 # WHERE index.digest_hash IN <part> AND index.accessed_timestamp < <last_accessed_threshold> 

321 # FOR UPDATE SKIP LOCKED) 

322 stmt = ( 

323 index_table.update() 

324 .where( 

325 index_table.c.digest_hash.in_( 

326 select(index_table.c.digest_hash) 

327 .where(index_table.c.digest_hash.in_(part)) 

328 .where(index_table.c.accessed_timestamp < last_accessed_threshold) 

329 .with_for_update(skip_locked=True) 

330 ) 

331 ) 

332 .values(accessed_timestamp=timestamp) 

333 ) 

334 session.execute(stmt) 

335 session.commit() 
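# Illustration of the refresh threshold (times are arbitrary): with
# refresh_accesstime_older_than=3600, only rows last accessed before
# `now - 1 hour` get their accessed_timestamp rewritten.

_now = datetime(2024, 6, 11, 12, 0, 0)
_threshold = _now - timedelta(seconds=3600)
assert datetime(2024, 6, 11, 11, 30) >= _threshold  # read recently: left alone
assert datetime(2024, 6, 11, 10, 30) < _threshold   # stale: bumped to `_now`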

336 

337 def missing_blobs(self, digests: List[Digest]) -> List[Digest]: 

338 # Blobs marked as deleted are considered missing 

339 entries = self._bulk_select_digests(digests, fetch_deleted=False) 

340 

341 found_hashes = {entry.digest_hash for entry in entries} 

342 

343 # Split the digests into two found/missing lists 

344 found_digests, missing_digests = [], [] 

345 for digest in digests: 

346 if digest.hash in found_hashes: 

347 found_digests.append(digest) 

348 else: 

349 missing_digests.append(digest) 

350 

351 # Update all timestamps for blobs which were found 

352 with self._sql.scoped_session() as session: 

353 self._bulk_refresh_timestamps(found_digests, session) 

354 

355 return missing_digests 
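# Usage sketch: a typical upload path only sends what the index reports as
# missing (`candidate_pairs`, a list of (Digest, bytes) pairs, is hypothetical):

def upload_missing(index: "SQLIndex", candidate_pairs: List[Tuple[Digest, bytes]]) -> List[Status]:
    by_hash = {digest.hash: blob for digest, blob in candidate_pairs}
    missing = index.missing_blobs([digest for digest, _ in candidate_pairs])
    return index.bulk_update_blobs([(digest, by_hash[digest.hash]) for digest in missing])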

356 

357 @DurationMetric(CAS_INDEX_SAVE_DIGESTS_TIME_METRIC_NAME, instanced=True) 

358 def _save_digests_to_index( 

359 self, digest_blob_pairs: List[Tuple[Digest, Optional[bytes]]], session: SessionType 

360 ) -> None: 

361 """Helper to persist a list of digest/blob pairs to the index. 

362 

363 Any digests present are updated, and new digests are inserted along with their inline blobs (if provided). 

364 Only blobs with size less than or equal to the max_inline_blob_size are inserted directly into the index. 

365 """ 

366 if not digest_blob_pairs: 

367 return 

368 

369 digest_blob_pairs = sorted(digest_blob_pairs, key=lambda pair: (pair[0].hash, pair[0].size_bytes)) 

370 

371 if self._dialect_delegate: 

372 try: 

373 self._dialect_delegate._save_digests_to_index( # type: ignore 

374 digest_blob_pairs, session, self._max_inline_blob_size 

375 ) 

376 return 

377 except AttributeError: 

378 pass 

379 

380 update_time = datetime.utcnow() 

381 # Figure out which digests we can just update 

382 digests = [digest for (digest, blob) in digest_blob_pairs] 

383 entries = self._bulk_select_digests(digests) 

384 # Map digests to new entries 

385 entries_not_present = { 

386 digest.hash: { 

387 "digest_hash": digest.hash, 

388 "digest_size_bytes": digest.size_bytes, 

389 "accessed_timestamp": update_time, 

390 "inline_blob": (blob if digest.size_bytes <= self._max_inline_blob_size else None), 

391 "deleted": False, 

392 } 

393 for (digest, blob) in digest_blob_pairs 

394 } 

395 

396 entries_present = {} 

397 for entry in entries: 

398 entries_present[entry.digest_hash] = entries_not_present[entry.digest_hash] 

399 del entries_not_present[entry.digest_hash] 

400 

401 if entries_not_present: 

402 session.bulk_insert_mappings(IndexEntry, entries_not_present.values()) # type: ignore 

403 if entries_present: 

404 session.bulk_update_mappings(IndexEntry, entries_present.values()) # type: ignore 

405 

406 def bulk_update_blobs( # pylint: disable=arguments-renamed 

407 self, digest_blob_pairs: List[Tuple[Digest, bytes]] 

408 ) -> List[Status]: 

409 """Implement the StorageABC's bulk_update_blobs method. 

410 

411 The StorageABC interface takes in a list of digest/blob pairs and 

412 returns a list of results. The list of results MUST be ordered to 

413 correspond with the order of the input list.""" 

414 pairs_to_store = [] 

415 result_map = {} 

416 

417 # For each blob, determine whether to store it in the backing storage or inline it 

418 for digest, blob in digest_blob_pairs: 

419 if validate_digest_data(digest, blob): 

420 if digest.size_bytes > self._max_inline_blob_size: 

421 pairs_to_store.append((digest, blob)) 

422 else: 

423 result_map[digest.hash] = Status(code=code_pb2.OK) 

424 else: 

425 result_map[digest.hash] = Status(code=code_pb2.INVALID_ARGUMENT, message="Data doesn't match hash") 

426 missing_blobs = self.missing_blobs([digest for digest, _ in pairs_to_store]) 

427 missing_blob_pairs = [] 

428 for digest, blob in pairs_to_store: 

429 if digest not in missing_blobs: 

430 result_map[digest.hash] = Status(code=code_pb2.OK) 

431 else: 

432 missing_blob_pairs.append((digest, blob)) 

433 

434 backup_results = self._storage.bulk_update_blobs(missing_blob_pairs) 

435 

436 for digest, result in zip(missing_blobs, backup_results): 

437 if digest.hash in result_map: 

438 # ERROR: blob was both inlined and backed up 

439 raise RuntimeError("Blob was both inlined and backed up.") 

440 result_map[digest.hash] = result 

441 

442 # Generate the final list of results 

443 pairs_to_inline: List[Tuple[Digest, Optional[bytes]]] = [] 

444 results = [] 

445 for digest, blob in digest_blob_pairs: 

446 status = result_map.get( 

447 digest.hash, 

448 Status(code=code_pb2.UNKNOWN, message="SQL Index: unable to determine the status of this blob"), 

449 ) 

450 results.append(status) 

451 if status.code == code_pb2.OK: 

452 pairs_to_inline.append((digest, blob)) 

453 

454 with self._sql.scoped_session() as session: 

455 self._save_digests_to_index(pairs_to_inline, session) 

456 

457 return results 

458 

459 def _bulk_read_blobs_with_fallback(self, digests: List[Digest]) -> Dict[str, bytes]: 

460 hash_to_digest: Dict[str, Digest] = {digest.hash: digest for digest in digests} 

461 results: Dict[str, bytes] = {} 

462 

463 expected_storage_digests: List[Digest] = [] 

464 # Fetch inlined blobs directly from the index 

465 entries = self._bulk_select_digests(digests, fetch_blobs=True) 

466 for e in entries: 

467 blob, digest_hash, digest = e.inline_blob, e.digest_hash, hash_to_digest[e.digest_hash] 

468 if blob is not None: 

469 results[digest_hash] = blob 

470 hash_to_digest.pop(digest_hash) 

471 else: 

472 # If a blob is not inlined then the blob is expected to be in storage 

473 expected_storage_digests.append(digest) 

474 

475 # Fetch everything that wasn't inlined from the backing storage 

476 fetched_digests = self._storage.bulk_read_blobs(list(hash_to_digest.values())) 

477 

478 # Save everything fetched from storage, inlining the blobs if they're small enough 

479 digest_pairs_to_save: List[Tuple[Digest, Optional[bytes]]] = [] 

480 for digest_hash, blob_data in fetched_digests.items(): 

481 if blob_data is not None: 

482 digest = hash_to_digest[digest_hash] 

483 if digest.size_bytes <= self._max_inline_blob_size: 

484 digest_pairs_to_save.append((digest, blob_data)) 

485 else: 

486 digest_pairs_to_save.append((digest, None)) 

487 results[digest_hash] = blob_data 

488 

489 # List of digests found in storage 

490 actual_storage_digest_hashes = set( 

491 digest_hash for (digest_hash, blob_data) in fetched_digests.items() if blob_data is not None 

492 ) 

493 # Get a list of all the digests that were in the index but not found in storage 

494 digests_expected_not_in_storage: List[Digest] = [] 

495 for expected_digest in expected_storage_digests: 

496 if expected_digest.hash not in actual_storage_digest_hashes: 

497 LOGGER.warning(f"Blob [{expected_digest}] was indexed but not in storage. Deleting from the index") 

498 digests_expected_not_in_storage.append(expected_digest) 

499 

500 with self._sql.scoped_session() as session: 

501 self._save_digests_to_index(digest_pairs_to_save, session) 

502 if digests_expected_not_in_storage: 

503 self._bulk_delete_from_index(digests_expected_not_in_storage, session) 

504 session.commit() 

505 

506 return results 

507 

508 def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]: 

509 if self._fallback_on_get: 

510 return self._bulk_read_blobs_with_fallback(digests) 

511 

512 # If fallback is disabled, query the index first and only 

513 # query the storage for blobs that weren't inlined there 

514 

515 hash_to_digest = {digest.hash: digest for digest in digests} # hash -> digest map 

516 results: Dict[str, bytes] = {} # The final list of results (return value) 

517 digests_to_fetch: List[Digest] = [] # Digests that need to be fetched from storage 

518 digest_pairs_to_save: List[Tuple[Digest, Optional[bytes]]] = [] # Digests that need to be updated in the index 

519 

520 # Fetch all of the digests in the database 

521 # Anything that wasn't already inlined needs to be fetched 

522 entries = self._bulk_select_digests(digests, fetch_blobs=True) 

523 for index_entry in entries: 

524 digest = hash_to_digest[index_entry.digest_hash] 

525 if index_entry.inline_blob is not None: 

526 results[index_entry.digest_hash] = index_entry.inline_blob 

527 else: 

528 digests_to_fetch.append(digest) 

529 

530 # Caution: digests whose blobs cannot be found in storage will be dropped from the results. 

531 if digests_to_fetch: 

532 fetched_digests = self._storage.bulk_read_blobs(digests_to_fetch) 

533 else: 

534 fetched_digests = {} 

535 

536 # Generate the list of inputs for _save_digests_to_index 

537 # 

538 # We only need to send blob data for small blobs fetched 

539 # from the storage since everything else is either too 

540 # big or already inlined 

541 for digest in digests_to_fetch: 

542 if (blob_data := fetched_digests.get(digest.hash)) is not None: 

543 if digest.size_bytes <= self._max_inline_blob_size: 

544 digest_pairs_to_save.append((digest, blob_data)) 

545 

546 actual_storage_digests = set(fetched_digests.keys()) 

547 # Get a list of all the digests that were in the index but not found in storage 

548 digests_expected_not_in_storage: List[Digest] = [] 

549 for expected_digest in digests_to_fetch: 

550 if expected_digest.hash not in actual_storage_digests: 

551 LOGGER.warning( 

552 f"Blob [{expected_digest.hash}/{expected_digest.size_bytes}] " 

553 "was indexed but not in storage. Deleting from the index" 

554 ) 

555 digests_expected_not_in_storage.append(expected_digest) 

556 

557 # Update any blobs which need to be inlined 

558 with self._sql.scoped_session() as session: 

559 self._save_digests_to_index(digest_pairs_to_save, session) 

560 if digests_expected_not_in_storage: 

561 self._bulk_delete_from_index(digests_expected_not_in_storage, session) 

562 session.commit() 

563 

564 results.update(fetched_digests) 

565 return results 
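# Usage sketch: the returned dictionary is keyed by digest hash, and digests
# whose blobs could not be found in the index or storage are simply absent
# (the two digests here are hypothetical):

def read_pair(index: "SQLIndex", digest_a: Digest, digest_b: Digest) -> Tuple[Optional[bytes], Optional[bytes]]:
    blobs = index.bulk_read_blobs([digest_a, digest_b])
    return blobs.get(digest_a.hash), blobs.get(digest_b.hash)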

566 

567 def _column_windows(self, session: SessionType, column: Column[Any]) -> Iterator[BooleanClauseList[Any]]: 

568 """Adapted from the sqlalchemy WindowedRangeQuery recipe. 

569 https://github.com/sqlalchemy/sqlalchemy/wiki/WindowedRangeQuery 

570 

571 This method breaks the timestamp range into windows and yields 

572 the borders of these windows to the caller. For example, the borders 

573 yielded by this might look something like 

574 ('2019-10-08 18:25:03.699863', '2019-10-08 18:25:03.751018') 

575 ('2019-10-08 18:25:03.751018', '2019-10-08 18:25:03.807867') 

576 ('2019-10-08 18:25:03.807867', '2019-10-08 18:25:03.862192') 

577 ('2019-10-08 18:25:03.862192',) 

578 

579 _windowed_lru_digests uses these borders to form WHERE clauses for its 

580 SELECTs. In doing so, we make sure to repeatedly query the database for 

581 live updates, striking a balance between loading the entire resultset 

582 into memory and querying each row individually, both of which are 

583 inefficient in the context of a large index. 

584 

585 The window size is a parameter and can be configured. A larger window 

586 size will yield better performance (fewer SQL queries) at the cost of 

587 memory (holding on to the results of the query) and accuracy (blobs 

588 may get updated while you're working on them), and vice versa for a 

589 smaller window size. 

590 """ 

591 

592 def int_for_range(start_id: Any, end_id: Any) -> BooleanClauseList[Any]: 

593 if end_id: 

594 return and_(column >= start_id, column < end_id) 

595 else: 

596 return column >= start_id # type: ignore[no-any-return] 

597 

598 # Constructs a query that: 

599 # 1. Get all the timestamps in sorted order. 

600 # 2. Assign a row number to each entry. 

601 # 3. Keep only every Nth timestamp, where N = "_all_blobs_window_size". 

602 # SELECT 

603 # anon_1.index_accessed_timestamp AS anon_1_index_accessed_timestamp 

604 # FROM ( 

605 # SELECT 

606 # index.accessed_timestamp AS index_accessed_timestamp, 

607 # row_number() OVER (ORDER BY index.accessed_timestamp) AS rownum 

608 # FROM index 

609 # ) 

610 # AS anon_1 

611 # WHERE rownum % 1000=1 

612 # 

613 # Note: 

614 # - This query can be slow due to checking each entry with "WHERE rownum % 1000=1". 

615 # - These timestamps will be the basis for constructing the SQL "WHERE" clauses later. 

616 q = session.query(column, func.row_number().over(order_by=column).label("rownum")).from_self(column) 

617 if self._all_blobs_window_size > 1: 

618 q = q.filter(text(f"rownum % {self._all_blobs_window_size}=1")) 

619 

620 # Execute the underlying query against the database. 

621 # Ex: intervals = [t1, t1001, t2001, ...], q = [(t1, ), (t1001, ), (t2001, ), ...] 

622 intervals = [id for id, in q] 

623 

624 # Generate the whereclauses 

625 while intervals: 

626 start = intervals.pop(0) 

627 if intervals: 

628 end = intervals[0] 

629 else: 

630 end = None 

631 # Ex: yield "WHERE IndexEntry.accessed_timestamp >= start AND IndexEntry.accessed_timestamp < end" 

632 yield int_for_range(start, end) 
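# Pure-Python illustration of how the yielded clauses tile the timestamp
# range: each border is paired with the next one, and the final window is
# open-ended (the border values are arbitrary placeholders):

def windows(borders: List[str]) -> Iterator[Tuple[str, Optional[str]]]:
    for i, start in enumerate(borders):
        end = borders[i + 1] if i + 1 < len(borders) else None
        yield (start, end)

# list(windows(["t1", "t2", "t3"])) == [("t1", "t2"), ("t2", "t3"), ("t3", None)]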

633 

634 def _windowed_lru_digests(self, q: "Query[Any]", column: Column[Any]) -> Iterator[Tuple[IndexEntry, bool]]: 

635 """Generate a query for each window produced by _column_windows 

636 and yield the results one by one. 

637 """ 

638 # Determine whether the conditions are met to make an SQL call to get new windows. 

639 msg = "Using stored LRU windows" 

640 if len(self._queue_of_whereclauses) == 0: 

641 msg = "Requesting new LRU windows." 

642 self._queue_of_whereclauses = deque(self._column_windows(q.session, column)) # type: ignore 

643 self._delete_premarked_blobs = True 

644 

645 msg += f" Number of windows remaining: {len(self._queue_of_whereclauses)}" 

646 LOGGER.debug(msg) 

647 

648 while self._queue_of_whereclauses: 

649 whereclause = self._queue_of_whereclauses[0] 

650 window = q.filter(whereclause).order_by(column.asc()) 

651 yield from window 

652 

653 # If the 'yield from window' above never reaches this point, the cleanup 

654 # exited before consuming all the content for this whereclause. 

655 # Otherwise, the whereclause is exhausted and can be discarded. 

656 self._queue_of_whereclauses.popleft() 

657 

658 def least_recent_digests(self) -> Iterator[Digest]: 

659 with self._sql.scoped_session() as session: 

660 q = session.query(IndexEntry) 

661 for entry in self._windowed_lru_digests(q, IndexEntry.accessed_timestamp): 

662 # TODO: make this generic, or delete this method since it is only used by tests. 

663 index_entry = cast(IndexEntry, entry) 

664 assert isinstance(index_entry.digest_hash, str) 

665 assert isinstance(index_entry.digest_size_bytes, int) 

666 yield Digest(hash=index_entry.digest_hash, size_bytes=index_entry.digest_size_bytes) 

667 

668 @DurationMetric(CAS_INDEX_SIZE_CALCULATION_TIME_METRIC_NAME) 

669 def get_total_size(self, include_marked: bool = True) -> int: 

670 statement = select(func.sum(IndexEntry.digest_size_bytes)) 

671 if not include_marked: 

672 statement = statement.filter_by(deleted=False) 

673 

674 with self._sql.scoped_session() as session: 

675 result = session.execute(statement).scalar() 

676 if result is None: 

677 result = 0 

678 return result 

679 

680 def delete_n_bytes( 

681 self, n_bytes: int, dry_run: bool = False, protect_blobs_after: Optional[datetime] = None 

682 ) -> int: 

683 """ 

684 When using a SQL Index, entries with a delete marker are "in the process of being deleted". 

685 This is required because storage operations can't be safely tied to the SQL index transaction 

686 (one may fail independently of the other, and you end up inconsistent). 

687 

688 The workflow is roughly as follows (a condensed sketch follows this method): 

689 - Start a SQL transaction. 

690 - Lock and mark the indexed items you want to delete. 

691 - Close the SQL transaction. 

692 - Perform the storage deletes. 

693 - Start a SQL transaction. 

694 - Actually delete the index entries. 

695 - Close the SQL transaction. 

696 

697 This means anything with deleted=False will always be present in the backing store. If it is marked 

698 deleted=True, and the process gets killed when deleting from the backing storage, only 

699 some of the items might actually be gone. 

700 

701 The next time the cleaner starts up, it can try to do that delete again (ignoring 404s). 

702 Eventually that will succeed and the item will actually be removed from the DB. Only during 

703 the first run of batches do we consider already marked items. This avoids multiple cleanup 

704 daemons from competing with each other on every batch. 

705 """ 

706 if protect_blobs_after is None: 

707 protect_blobs_after = datetime.utcnow() 

708 

709 # Used for metric publishing 

710 delete_start_time = time.time() 

711 metadata = {} 

712 if self._instance_name: 

713 metadata["instance-name"] = self._instance_name 

714 

715 storage_digests: List[Digest] = [] 

716 marked_digests: List[Digest] = [] 

717 collected_bytes = 0 

718 

719 with self._sql.scoped_session(exceptions_to_not_rollback_on=[StaleDataError]) as session: 

720 base_query = session.query(IndexEntry, IndexEntry.inline_blob != None).options( # noqa 

721 load_only("digest_hash", "digest_size_bytes") 

722 ) 

723 

724 if self._delete_premarked_blobs: 

725 LOGGER.info("Starting to gather pre-marked records") 

726 premarked_query = base_query.filter_by(deleted=True) 

727 for [entry, is_inline] in premarked_query.all(): 

728 digest = Digest(hash=entry.digest_hash, size_bytes=entry.digest_size_bytes) 

729 marked_digests.append(digest) 

730 if not is_inline: 

731 storage_digests.append(digest) 

732 collected_bytes += entry.digest_size_bytes 

733 

734 if not dry_run: 

735 publish_counter_metric(CLEANUP_INDEX_PREMARKED_BLOBS_METRIC_NAME, len(marked_digests), metadata) 

736 LOGGER.info(f"Gathered pre-marked {collected_bytes} out of {n_bytes} bytes(max)") 

737 self._delete_premarked_blobs = False 

738 

739 if collected_bytes < n_bytes: 

740 LOGGER.info("Searching for records to mark deleted") 

741 unmarked_query = ( 

742 base_query.filter_by(deleted=False) 

743 .filter(IndexEntry.accessed_timestamp < protect_blobs_after) 

744 .with_for_update(skip_locked=True) 

745 ) 

746 window = self._windowed_lru_digests(unmarked_query, IndexEntry.accessed_timestamp) 

747 mark_deleted_start = time.perf_counter() 

748 for [entry, is_inline] in window: 

749 digest = Digest(hash=entry.digest_hash, size_bytes=entry.digest_size_bytes) 

750 marked_digests.append(digest) 

751 if not is_inline: 

752 storage_digests.append(digest) 

753 collected_bytes += entry.digest_size_bytes 

754 if not dry_run: 

755 entry.deleted = True 

756 if collected_bytes >= n_bytes: 

757 break 

758 mark_deleted_duration = timedelta(seconds=time.perf_counter() - mark_deleted_start) 

759 if not dry_run: 

760 publish_timer_metric(CLEANUP_INDEX_MARK_DELETED_METRIC_NAME, mark_deleted_duration, metadata) 

761 LOGGER.info(f"Gathered {collected_bytes} out of {n_bytes} bytes(max)") 

762 

763 if dry_run: 

764 return collected_bytes 

765 

766 with DurationMetric(CLEANUP_STORAGE_BULK_DELETE_METRIC_NAME, instanced=True): 

767 failed_deletes = self._storage.bulk_delete(storage_digests) 

768 

769 publish_counter_metric(CLEANUP_STORAGE_DELETION_FAILURES_METRIC_NAME, len(failed_deletes), metadata) 

770 digests_to_delete = [x for x in marked_digests if f"{x.hash}/{x.size_bytes}" not in failed_deletes] 

771 

772 with self._sql.scoped_session(exceptions_to_not_rollback_on=[StaleDataError]) as session: 

773 with DurationMetric(CLEANUP_INDEX_BULK_DELETE_METRIC_NAME, instanced=True): 

774 failed_deletes.extend(self._bulk_delete_from_index(digests_to_delete, session)) 

775 for digest in digests_to_delete: 

776 if f"{digest.hash}/{digest.size_bytes}" in failed_deletes: 

777 collected_bytes -= digest.size_bytes 

778 

779 batch_duration = time.time() - delete_start_time 

780 blobs_deleted_per_second = (len(digests_to_delete) - len(failed_deletes)) / batch_duration 

781 publish_gauge_metric(CLEANUP_BLOBS_DELETION_RATE_METRIC_NAME, blobs_deleted_per_second, metadata) 

782 return collected_bytes 
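# Condensed sketch of the two-phase cleanup performed by delete_n_bytes above.
# The mark_deleted_in_index/remove_index_entries helpers are hypothetical
# stand-ins for the inline marking and _bulk_delete_from_index steps:
#
#   def two_phase_delete(index, storage, digests):
#       mark_deleted_in_index(index, digests)   # phase 1: SQL only, deleted=True
#       failed = storage.bulk_delete(digests)   # phase 2: storage, may partially fail
#       gone = [d for d in digests if f"{d.hash}/{d.size_bytes}" not in failed]
#       remove_index_entries(index, gone)       # phase 3: drop rows that are really gone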

783 

784 def bulk_delete(self, digests: List[Digest]) -> List[str]: 

785 # Delete from the index and then delete from the backing storage. 

786 with self._sql.scoped_session(exceptions_to_not_rollback_on=[StaleDataError]) as session: 

787 failed_deletes = self._bulk_delete_from_index(digests, session) 

788 

789 digests_to_delete = [x for x in digests if f"{x.hash}/{x.size_bytes}" not in failed_deletes] 

790 failed_deletes.extend(self._storage.bulk_delete(digests_to_delete)) 

791 return failed_deletes 

792 

793 def _bulk_delete_from_index(self, digests: List[Digest], session: Session) -> List[str]: 

794 LOGGER.info(f"Deleting {len(digests)} digests from the index") 

795 index_table = IndexEntry.__table__ 

796 hashes = [x.hash for x in digests] 

797 

798 # Make sure we don't exceed maximum size of an IN clause 

799 n = self._inclause_limit 

800 hash_chunks = [hashes[i : i + n] for i in range(0, len(hashes), n)] 

801 

802 # We will not raise, rollback, or log on StaleDataErrors. 

803 # These errors occur when we delete fewer rows than we were expecting. 

804 # This is fine, since the missing rows will get deleted eventually. 

805 # When running bulk_deletes concurrently, StaleDataErrors 

806 # occur too often to log. 

807 num_blobs_deleted = 0 

808 for chunk in hash_chunks: 

809 # Do not wait for locks when deleting rows. Skip locked rows to 

810 # avoid deadlocks. 

811 stmt = index_table.delete().where( 

812 index_table.c.digest_hash.in_( 

813 select([index_table.c.digest_hash], index_table.c.digest_hash.in_(chunk)).with_for_update( 

814 skip_locked=True 

815 ) 

816 ) 

817 ) 

818 num_blobs_deleted += session.execute(stmt).rowcount # type: ignore 

819 LOGGER.info(f"{num_blobs_deleted}/{len(digests)} blobs deleted from the index") 

820 

821 # bulk_delete is typically expected to return the digests that were not deleted, 

822 # but delete only returns the number of rows deleted and not what was/wasn't 

823 # deleted. Getting this info would require extra queries, so assume that 

824 # everything was either deleted or already deleted. Failures will continue to raise exceptions. 

825 return []