Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/index/sql.py: 92.87%

# Copyright (C) 2019 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#  <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
SQLIndex
==================

A SQL index implementation. This can be pointed to either a remote SQL server
or a local SQLite database.

"""

import io
import itertools
import logging
import time
from collections import deque
from datetime import datetime, timedelta
from typing import IO, Any, Deque, Dict, Iterator, List, Optional, Sequence, Tuple, cast

from sqlalchemy import Column, and_, delete, func, select, text
from sqlalchemy.exc import DBAPIError
from sqlalchemy.orm import Session, load_only
from sqlalchemy.orm.exc import StaleDataError
from sqlalchemy.orm.query import Query
from sqlalchemy.orm.session import Session as SessionType
from sqlalchemy.sql.elements import BooleanClauseList

from buildgrid._exceptions import StorageFullError
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid._protos.google.rpc import code_pb2
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid.server.metrics_names import (
    CAS_INDEX_BULK_SELECT_DIGEST_TIME_METRIC_NAME,
    CAS_INDEX_BULK_TIMESTAMP_UPDATE_TIME_METRIC_NAME,
    CAS_INDEX_GET_BLOB_TIME_METRIC_NAME,
    CAS_INDEX_SAVE_DIGESTS_TIME_METRIC_NAME,
    CAS_INDEX_SIZE_CALCULATION_TIME_METRIC_NAME,
    CLEANUP_BLOBS_DELETION_RATE_METRIC_NAME,
    CLEANUP_INDEX_BULK_DELETE_METRIC_NAME,
    CLEANUP_INDEX_MARK_DELETED_METRIC_NAME,
    CLEANUP_INDEX_PREMARKED_BLOBS_METRIC_NAME,
    CLEANUP_STORAGE_BULK_DELETE_METRIC_NAME,
    CLEANUP_STORAGE_DELETION_FAILURES_METRIC_NAME,
)
from buildgrid.server.metrics_utils import (
    DurationMetric,
    generator_method_duration_metric,
    publish_counter_metric,
    publish_gauge_metric,
    publish_timer_metric,
)
from buildgrid.server.persistence.sql.models import IndexEntry
from buildgrid.server.sql.provider import SqlProvider
from buildgrid.utils import read_and_rewind, validate_digest_data

from ..storage_abc import StorageABC
from .index_abc import IndexABC
from .sql_dialect_delegates import PostgreSQLDelegate, SQLiteDelegate

LOGGER = logging.getLogger(__name__)
DIALECT_DELEGATES = {"postgresql": PostgreSQLDelegate, "sqlite": SQLiteDelegate}

INLINE_BLOB_SIZE_HARD_MAXIMUM = 1000000000


class SQLIndex(IndexABC):
    def __init__(
        self,
        sql_provider: SqlProvider,
        storage: StorageABC,
        *,
        window_size: int = 1000,
        inclause_limit: int = -1,
        max_inline_blob_size: int = 0,
        refresh_accesstime_older_than: int = 0,
        **kwargs: Any,
    ) -> None:
        base_argnames = ["fallback_on_get"]
        base_args = {}
        for arg in base_argnames:
            if arg in kwargs:
                base_args[arg] = kwargs.pop(arg)
        super().__init__(**base_args)

        self._sql = sql_provider
        self._storage = storage
        self._instance_name = None

        if max_inline_blob_size > INLINE_BLOB_SIZE_HARD_MAXIMUM:
            raise ValueError(
                f"Max inline blob size is [{max_inline_blob_size}], "
                f"but must be less than [{INLINE_BLOB_SIZE_HARD_MAXIMUM}]."
            )
        if max_inline_blob_size >= 0:
            self._max_inline_blob_size = max_inline_blob_size
        else:
            raise ValueError(f"Max inline blob size is [{max_inline_blob_size}], but must be nonnegative.")

        if refresh_accesstime_older_than >= 0:
            # Measured in seconds. Helps reduce the frequency of timestamp updates during heavy read load.
            self.refresh_accesstime_older_than = refresh_accesstime_older_than
        else:
            raise ValueError(
                f"'refresh_accesstime_older_than' must be nonnegative. It is {refresh_accesstime_older_than}"
            )

        # Max number of rows to fetch while iterating over all blobs
        # (e.g. in least_recent_digests)
        self._all_blobs_window_size = window_size

        # This variable stores the list of whereclauses (SQLAlchemy BooleanClauseList objects)
        # generated from _column_windows() using a time-expensive SQL query.
        # These whereclauses are used to construct the final SQL query
        # during cleanup in order to fetch blobs by time windows.
        #
        # Inside _column_windows() a list of timestamp borders is obtained:
        #   intervals = [t1, t2, t3, ...]
        # The generated whereclauses can then be read as, for example:
        #   self._queue_of_whereclauses = [
        #       "WHERE t1 <= IndexEntry.accessed_timestamp < t2",
        #       "WHERE t2 <= IndexEntry.accessed_timestamp < t3",
        #       "WHERE t3 <= IndexEntry.accessed_timestamp < t4",
        #       ... and so on]
        # Note that the number of entries in each window is determined by
        # the instance variable "_all_blobs_window_size".
        self._queue_of_whereclauses: Deque[BooleanClauseList[Any]] = deque()

        # Whether entries with deleted=True should be considered by mark_n_bytes.
        # This is useful to catch any half-finished deletes where a cleanup process
        # may have exited half-way through deletion. Once all premarked blobs have been
        # deleted this becomes False and is only reset after a full scan of the database.
        self._delete_premarked_blobs: bool = True

        # Only pass known kwargs to the db session
        available_options = {"pool_size", "max_overflow", "pool_timeout", "pool_pre_ping", "pool_recycle"}
        kwargs_keys = kwargs.keys()
        if not kwargs_keys <= available_options:
            unknown_args = kwargs_keys - available_options
            raise TypeError(f"Unknown keyword arguments: [{unknown_args}]")

        # Dialect-specific initialization
        self._dialect_delegate = DIALECT_DELEGATES.get(self._sql.dialect)

        if inclause_limit > 0:
            if inclause_limit > window_size:
                LOGGER.warning(
                    f"Configured inclause_limit [{inclause_limit}] is greater than window_size [{window_size}]"
                )
            self._inclause_limit = inclause_limit
        else:
            # If the inclause limit isn't explicitly set, we use a default that
            # respects both the window size and the db implementation's
            # inlimit.
            self._inclause_limit = min(window_size, self._sql.default_inlimit)
            LOGGER.debug(f"SQL index: using default inclause limit of {self._inclause_limit}")

        # Make a test query against the database to ensure the connection is valid
        with self._sql.scoped_session() as session:
            session.query(IndexEntry).first()
        self._sql.remove_scoped_session()

    def start(self) -> None:
        self._storage.start()

    def stop(self) -> None:
        self._storage.stop()

    def has_blob(self, digest: Digest) -> bool:
        with self._sql.scoped_session() as session:
            statement = select(func.count(IndexEntry.digest_hash)).where(IndexEntry.digest_hash == digest.hash)

            num_entries = session.execute(statement).scalar()
            if num_entries is None:
                num_entries = 0

            if num_entries == 1:
                return True
            elif num_entries < 1:
                return False
            else:
                raise RuntimeError(f"Multiple results found for blob [{digest}]. The index is in a bad state.")

    @DurationMetric(CAS_INDEX_GET_BLOB_TIME_METRIC_NAME, instanced=True)
    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        """Get a blob from the index or the backing storage. Optionally fall back and repair the index."""

        # Check the index for the blob and return if found.
        with self._sql.scoped_session() as session:
            if entry := session.query(IndexEntry).filter(IndexEntry.digest_hash == digest.hash).first():
                if entry.inline_blob is not None:
                    return io.BytesIO(entry.inline_blob)
                elif blob := self._storage.get_blob(digest):
                    # Fix any blobs that should have been inlined.
                    if digest.size_bytes <= self._max_inline_blob_size:
                        self._save_digests_to_index([(digest, read_and_rewind(blob))], session)
                        session.commit()
                    return blob
                LOGGER.warning(f"Blob [{digest.hash}] was indexed but not in storage. Deleting from the index")
                self._bulk_delete_from_index([digest], session)

        # Check the storage for the blob and repair the index if found.
        if self._fallback_on_get:
            if blob := self._storage.get_blob(digest):
                with self._sql.scoped_session() as session:
                    if digest.size_bytes <= self._max_inline_blob_size:
                        self._save_digests_to_index([(digest, read_and_rewind(blob))], session)
                    else:
                        self._save_digests_to_index([(digest, None)], session)
                    session.commit()
                return blob

        # Blob was not found in index or storage
        return None

    def delete_blob(self, digest: Digest) -> None:
        statement = delete(IndexEntry).where(IndexEntry.digest_hash == digest.hash)
        options = {"synchronize_session": False}

        with self._sql.scoped_session() as session:
            session.execute(statement, execution_options=options)

        self._storage.delete_blob(digest)

    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        inline_blob = None
        if digest.size_bytes > self._max_inline_blob_size:
            self._storage.commit_write(digest, write_session)
        else:
            write_session.seek(0)
            inline_blob = write_session.read()
        try:
            with self._sql.scoped_session() as session:
                self._save_digests_to_index([(digest, inline_blob)], session)
        except DBAPIError as error:
            # The error only has a pgcode attribute when it comes from Postgres
            if hasattr(error.orig, "pgcode"):
                # Imported here to avoid a global dependency on psycopg2
                from psycopg2.errors import DiskFull, OutOfMemory

                # 53100 == DiskFull && 53200 == OutOfMemory
                if isinstance(error.orig, (DiskFull, OutOfMemory)):
                    raise StorageFullError(f"Postgres Error: {error.orig.pgcode}") from error
            raise error

    def _partitioned_hashes(self, digests: Sequence[Digest]) -> Iterator[Iterator[str]]:
        """Given a long list of digests, split it into parts no larger than
        _inclause_limit and yield the hashes in each part.
        """
        for part_start in range(0, len(digests), self._inclause_limit):
            part_end = min(len(digests), part_start + self._inclause_limit)
            part_digests = itertools.islice(digests, part_start, part_end)
            yield map(lambda digest: digest.hash, part_digests)
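    # Worked example for _partitioned_hashes above (illustrative, with
    # hypothetical digests d1..d5): if self._inclause_limit is 2 and five
    # digests are passed in, three parts of hashes are yielded:
    #   [d1.hash, d2.hash], [d3.hash, d4.hash], [d5.hash]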

    @generator_method_duration_metric(CAS_INDEX_BULK_SELECT_DIGEST_TIME_METRIC_NAME)
    def _bulk_select_digests(self, digests: Sequence[Digest], fetch_blobs: bool = False) -> Iterator[IndexEntry]:
        """Generator that selects all rows matching a digest list.

        SQLAlchemy Core is used for this because the ORM has problems with
        large numbers of bind variables for WHERE IN clauses.

        We only select on the digest hash (not hash and size) to allow for
        index-only queries on db backends that support them.
        """
        index_table = IndexEntry.__table__
        with self._sql.scoped_session() as session:
            columns = [index_table.c.digest_hash]
            if fetch_blobs:
                columns.append(index_table.c.inline_blob)
            for part in self._partitioned_hashes(digests):
                stmt = select(columns).where(index_table.c.digest_hash.in_(part))
                entries = session.execute(stmt)
                yield from entries  # type: ignore
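    # Illustrative shape of each statement built in _bulk_select_digests above,
    # one per partition of hashes (roughly):
    #   SELECT index.digest_hash [, index.inline_blob]
    #   FROM index
    #   WHERE index.digest_hash IN (<part>)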

    @DurationMetric(CAS_INDEX_BULK_TIMESTAMP_UPDATE_TIME_METRIC_NAME, instanced=True)
    def _bulk_refresh_timestamps(
        self, digests: Sequence[Digest], session: SessionType, update_time: Optional[datetime] = None
    ) -> None:
        """Refresh all timestamps of the input digests.

        SQLAlchemy Core is used for this because the ORM is not suitable for
        bulk inserts and updates.

        https://docs.sqlalchemy.org/en/13/faq/performance.html#i-m-inserting-400-000-rows-with-the-orm-and-it-s-really-slow
        """
        index_table = IndexEntry.__table__
        current_time = datetime.utcnow()

        # If a timestamp was passed in, use it, and always refresh (no threshold).
        if update_time:
            timestamp = update_time
            last_accessed_threshold = current_time
        # Otherwise, only refresh a digest's timestamp if it was last accessed earlier than this threshold.
        else:
            timestamp = current_time
            last_accessed_threshold = current_time - timedelta(seconds=self.refresh_accesstime_older_than)

        for part in self._partitioned_hashes(digests):
            # Generate the SQL statement:
            #   UPDATE index SET accessed_timestamp=<timestamp>
            #   WHERE index.digest_hash IN
            #       (SELECT index.digest_hash FROM index
            #        WHERE index.digest_hash IN <part>
            #        AND index.accessed_timestamp < <last_accessed_threshold>
            #        FOR UPDATE SKIP LOCKED)
            stmt = (
                index_table.update()
                .where(
                    index_table.c.digest_hash.in_(
                        select(index_table.c.digest_hash)
                        .where(index_table.c.digest_hash.in_(part))
                        .where(index_table.c.accessed_timestamp < last_accessed_threshold)
                        .with_for_update(skip_locked=True)
                    )
                )
                .values(accessed_timestamp=timestamp)
            )
            session.execute(stmt)
            session.commit()

    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
        entries = self._bulk_select_digests(digests)

        found_hashes = {entry.digest_hash for entry in entries}

        # Split the digests into two found/missing lists
        found_digests, missing_digests = [], []
        for digest in digests:
            if digest.hash in found_hashes:
                found_digests.append(digest)
            else:
                missing_digests.append(digest)

        # Update all timestamps for blobs which were found
        with self._sql.scoped_session() as session:
            self._bulk_refresh_timestamps(found_digests, session)

        return missing_digests

    @DurationMetric(CAS_INDEX_SAVE_DIGESTS_TIME_METRIC_NAME, instanced=True)
    def _save_digests_to_index(
        self, digest_blob_pairs: List[Tuple[Digest, Optional[bytes]]], session: SessionType
    ) -> None:
        """Helper to persist a list of digest/blob pairs to the index.

        Any digests present are updated, and new digests are inserted along with their inline blobs (if provided).
        Only blobs with size less than or equal to the max_inline_blob_size are inserted directly into the index.
        """
        if not digest_blob_pairs:
            return

        digest_blob_pairs = sorted(digest_blob_pairs, key=lambda pair: (pair[0].hash, pair[0].size_bytes))

        if self._dialect_delegate:
            try:
                self._dialect_delegate._save_digests_to_index(  # type: ignore
                    digest_blob_pairs, session, self._max_inline_blob_size
                )
                return
            except AttributeError:
                pass

        update_time = datetime.utcnow()
        # Figure out which digests we can just update
        digests = [digest for (digest, blob) in digest_blob_pairs]
        entries = self._bulk_select_digests(digests)
        # Map digests to new entries
        entries_not_present = {
            digest.hash: {
                "digest_hash": digest.hash,
                "digest_size_bytes": digest.size_bytes,
                "accessed_timestamp": update_time,
                "inline_blob": (blob if digest.size_bytes <= self._max_inline_blob_size else None),
            }
            for (digest, blob) in digest_blob_pairs
        }

        entries_present = {}
        for entry in entries:
            entries_present[entry.digest_hash] = entries_not_present[entry.digest_hash]
            del entries_not_present[entry.digest_hash]

        if entries_not_present:
            session.bulk_insert_mappings(IndexEntry, entries_not_present.values())  # type: ignore
        if entries_present:
            session.bulk_update_mappings(IndexEntry, entries_present.values())  # type: ignore

    def bulk_update_blobs(  # pylint: disable=arguments-renamed
        self, digest_blob_pairs: List[Tuple[Digest, bytes]]
    ) -> List[Status]:
        """Implement the StorageABC's bulk_update_blobs method.

        The StorageABC interface takes in a list of digest/blob pairs and
        returns a list of results. The list of results MUST be ordered to
        correspond with the order of the input list."""
        pairs_to_store = []
        result_map = {}

        # For each blob, determine whether to store it in the backing storage or inline it
        for digest, blob in digest_blob_pairs:
            if validate_digest_data(digest, blob):
                if digest.size_bytes > self._max_inline_blob_size:
                    pairs_to_store.append((digest, blob))
                else:
                    result_map[digest.hash] = Status(code=code_pb2.OK)
            else:
                result_map[digest.hash] = Status(code=code_pb2.INVALID_ARGUMENT, message="Data doesn't match hash")
        missing_blobs = self.missing_blobs([digest for digest, _ in pairs_to_store])
        missing_blob_pairs = []
        for digest, blob in pairs_to_store:
            if digest not in missing_blobs:
                result_map[digest.hash] = Status(code=code_pb2.OK)
            else:
                missing_blob_pairs.append((digest, blob))

        backup_results = self._storage.bulk_update_blobs(missing_blob_pairs)

        for digest, result in zip(missing_blobs, backup_results):
            if digest.hash in result_map:
                # ERROR: blob was both inlined and backed up
                raise RuntimeError("Blob was both inlined and backed up.")
            result_map[digest.hash] = result

        # Generate the final list of results
        pairs_to_inline: List[Tuple[Digest, Optional[bytes]]] = []
        results = []
        for digest, blob in digest_blob_pairs:
            status = result_map.get(
                digest.hash,
                Status(code=code_pb2.UNKNOWN, message="SQL Index: unable to determine the status of this blob"),
            )
            results.append(status)
            if status.code == code_pb2.OK:
                pairs_to_inline.append((digest, blob))

        with self._sql.scoped_session() as session:
            self._save_digests_to_index(pairs_to_inline, session)

        return results
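    # Illustrative note on bulk_update_blobs above (hypothetical digests d1, d2):
    # given input pairs [(d1, valid_blob), (d2, corrupted_blob)], the return value
    # is [Status(OK), Status(INVALID_ARGUMENT)] -- one Status per input pair, in
    # the same order as the input list.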

    def _bulk_read_blobs_with_fallback(self, digests: List[Digest]) -> Dict[str, bytes]:
        hash_to_digest: Dict[str, Digest] = {digest.hash: digest for digest in digests}
        results: Dict[str, bytes] = {}

        expected_storage_digests: List[Digest] = []
        # Fetch inlined blobs directly from the index
        entries = self._bulk_select_digests(digests, fetch_blobs=True)
        for e in entries:
            blob, digest_hash, digest = e.inline_blob, e.digest_hash, hash_to_digest[e.digest_hash]
            if blob is not None:
                results[digest_hash] = blob
                hash_to_digest.pop(digest_hash)
            else:
                # If a blob is not inlined then the blob is expected to be in storage
                expected_storage_digests.append(digest)

        # Fetch everything that wasn't inlined from the backing storage
        fetched_digests = self._storage.bulk_read_blobs(list(hash_to_digest.values()))

        # Save everything fetched from storage, inlining the blobs if they're small enough
        digest_pairs_to_save: List[Tuple[Digest, Optional[bytes]]] = []
        for digest_hash, blob_data in fetched_digests.items():
            if blob_data is not None:
                digest = hash_to_digest[digest_hash]
                if digest.size_bytes <= self._max_inline_blob_size:
                    digest_pairs_to_save.append((digest, blob_data))
                else:
                    digest_pairs_to_save.append((digest, None))
                results[digest_hash] = blob_data

        # Set of digest hashes found in storage
        actual_storage_digest_hashes = set(
            digest_hash for (digest_hash, blob_data) in fetched_digests.items() if blob_data is not None
        )
        # Get a list of all the digests that were in the index but not found in storage
        digests_expected_not_in_storage: List[Digest] = []
        for expected_digest in expected_storage_digests:
            if expected_digest.hash not in actual_storage_digest_hashes:
                LOGGER.warning(f"Blob [{expected_digest}] was indexed but not in storage. Deleting from the index")
                digests_expected_not_in_storage.append(expected_digest)

        with self._sql.scoped_session() as session:
            self._save_digests_to_index(digest_pairs_to_save, session)
            if digests_expected_not_in_storage:
                self._bulk_delete_from_index(digests_expected_not_in_storage, session)
            session.commit()

        return results

    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
        if self._fallback_on_get:
            return self._bulk_read_blobs_with_fallback(digests)

        # If fallback is disabled, query the index first and only
        # query the storage for blobs that weren't inlined there

        hash_to_digest = {digest.hash: digest for digest in digests}  # hash -> digest map
        results: Dict[str, bytes] = {}  # The final list of results (return value)
        digests_to_fetch: List[Digest] = []  # Digests that need to be fetched from storage
        digest_pairs_to_save: List[Tuple[Digest, Optional[bytes]]] = []  # Digests that need to be updated in the index

        # Fetch all of the digests in the database
        # Anything that wasn't already inlined needs to be fetched
        entries = self._bulk_select_digests(digests, fetch_blobs=True)
        for index_entry in entries:
            digest = hash_to_digest[index_entry.digest_hash]
            if index_entry.inline_blob is not None:
                results[index_entry.digest_hash] = index_entry.inline_blob
            else:
                digests_to_fetch.append(digest)

        # Caution: digests whose blobs cannot be found in storage will be dropped.
        if digests_to_fetch:
            fetched_digests = self._storage.bulk_read_blobs(digests_to_fetch)
        else:
            fetched_digests = {}

        # Generate the list of inputs for _save_digests_to_index
        #
        # We only need to send blob data for small blobs fetched
        # from the storage since everything else is either too
        # big or already inlined
        for digest in digests_to_fetch:
            if blob_data := fetched_digests.get(digest.hash):
                if digest.size_bytes <= self._max_inline_blob_size:
                    digest_pairs_to_save.append((digest, blob_data))

        actual_storage_digests = set(digest_hash for (digest_hash, _) in fetched_digests.items())
        # Get a list of all the digests that were in the index but not found in storage
        digests_expected_not_in_storage: List[Digest] = []
        for expected_digest in digests_to_fetch:
            if expected_digest.hash not in actual_storage_digests:
                LOGGER.warning(f"Blob [{expected_digest}] was indexed but not in storage. Deleting from the index")
                digests_expected_not_in_storage.append(expected_digest)

        # Update any blobs which need to be inlined
        with self._sql.scoped_session() as session:
            self._save_digests_to_index(digest_pairs_to_save, session)
            if digests_expected_not_in_storage:
                self._bulk_delete_from_index(digests_expected_not_in_storage, session)
            session.commit()

        results.update(fetched_digests)
        return results

    def _column_windows(self, session: SessionType, column: Column[Any]) -> Iterator[BooleanClauseList[Any]]:
        """Adapted from the sqlalchemy WindowedRangeQuery recipe.
        https://github.com/sqlalchemy/sqlalchemy/wiki/WindowedRangeQuery

        This method breaks the timestamp range into windows and yields
        the borders of these windows to the caller. For example, the borders
        yielded by this might look something like
        ('2019-10-08 18:25:03.699863', '2019-10-08 18:25:03.751018')
        ('2019-10-08 18:25:03.751018', '2019-10-08 18:25:03.807867')
        ('2019-10-08 18:25:03.807867', '2019-10-08 18:25:03.862192')
        ('2019-10-08 18:25:03.862192',)

        _windowed_lru_digests uses these borders to form WHERE clauses for its
        SELECTs. In doing so, we make sure to repeatedly query the database for
        live updates, striking a balance between loading the entire resultset
        into memory and querying each row individually, both of which are
        inefficient in the context of a large index.

        The window size is a parameter and can be configured. A larger window
        size will yield better performance (fewer SQL queries) at the cost of
        memory (holding on to the results of the query) and accuracy (blobs
        may get updated while you're working on them), and vice versa for a
        smaller window size.
        """

        def int_for_range(start_id: Any, end_id: Any) -> BooleanClauseList[Any]:
            if end_id:
                return and_(column >= start_id, column < end_id)
            else:
                return column >= start_id  # type: ignore[no-any-return]

        # Constructs a query that:
        #   1. Gets all the timestamps in sorted order.
        #   2. Assigns a row number to each entry.
        #   3. Only keeps timestamps whose row numbers are N apart, where N = "_all_blobs_window_size".
        # SELECT
        #   anon_1.index_accessed_timestamp AS anon_1_index_accessed_timestamp
        # FROM (
        #   SELECT
        #     index.accessed_timestamp AS index_accessed_timestamp,
        #     row_number() OVER (ORDER BY index.accessed_timestamp) AS rownum
        #   FROM index
        # )
        # AS anon_1
        # WHERE rownum % 1000=1
        #
        # Note:
        # - This query can be slow due to checking each entry with "WHERE rownum % 1000=1".
        # - These timestamps will be the basis for constructing the SQL "WHERE" clauses later.
        q = session.query(column, func.row_number().over(order_by=column).label("rownum")).from_self(column)
        if self._all_blobs_window_size > 1:
            q = q.filter(text(f"rownum % {self._all_blobs_window_size}=1"))

        # Execute the underlying query against the database.
        # Ex: intervals = [t1, t1001, t2001, ...], q = [(t1, ), (t1001, ), (t2001, ), ...]
        intervals = [id for id, in q]

        # Generate the whereclauses
        while intervals:
            start = intervals.pop(0)
            if intervals:
                end = intervals[0]
            else:
                end = None
            # Ex: yield "WHERE IndexEntry.accessed_timestamp >= start AND IndexEntry.accessed_timestamp < end"
            yield int_for_range(start, end)

    def _windowed_lru_digests(self, q: "Query[Any]", column: Column[Any]) -> Iterator[Tuple[IndexEntry, bool]]:
        """Generate a query for each window produced by _column_windows
        and yield the results one by one.
        """
        # Determine whether the conditions are met to make an SQL call to get new windows.
        msg = "Using stored LRU windows"
        if len(self._queue_of_whereclauses) == 0:
            msg = "Requesting new LRU windows."
            self._queue_of_whereclauses = deque(self._column_windows(q.session, column))  # type: ignore
            self._delete_premarked_blobs = True

        msg += f" Number of windows remaining: {len(self._queue_of_whereclauses)}"
        LOGGER.debug(msg)

        while self._queue_of_whereclauses:
            whereclause = self._queue_of_whereclauses[0]
            window = q.filter(whereclause).order_by(column.asc())
            yield from window

            # If the "yield from" above is abandoned before the window is exhausted,
            # execution never reaches this point and the whereclause stays queued.
            # Otherwise the whereclause is exhausted and can be discarded.
            self._queue_of_whereclauses.popleft()

    def least_recent_digests(self) -> Iterator[Digest]:
        with self._sql.scoped_session() as session:
            q = session.query(IndexEntry)
            for entry in self._windowed_lru_digests(q, IndexEntry.accessed_timestamp):
                # TODO: make this generic, or delete this method since it is only used by tests.
                index_entry = cast(IndexEntry, entry)
                assert isinstance(index_entry.digest_hash, str)
                assert isinstance(index_entry.digest_size_bytes, int)
                yield Digest(hash=index_entry.digest_hash, size_bytes=index_entry.digest_size_bytes)

    @DurationMetric(CAS_INDEX_SIZE_CALCULATION_TIME_METRIC_NAME)
    def get_total_size(self, include_marked: bool = True) -> int:
        statement = select(func.sum(IndexEntry.digest_size_bytes))
        if not include_marked:
            statement = statement.filter_by(deleted=False)

        with self._sql.scoped_session() as session:
            result = session.execute(statement).scalar()
            if result is None:
                result = 0
            return result
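    # The statement built in get_total_size above is roughly equivalent to
    # (illustrative):
    #   SELECT sum(index.digest_size_bytes) FROM index
    #   [WHERE index.deleted = false]   -- only when include_marked=False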

    def delete_n_bytes(
        self, n_bytes: int, dry_run: bool = False, protect_blobs_after: Optional[datetime] = None
    ) -> int:
        """
        When using a SQL Index, entries with a delete marker are "in the process of being deleted".
        This is required because storage operations can't be safely tied to the SQL index transaction
        (one may fail independently of the other, leaving you in an inconsistent state).

        The workflow is roughly as follows:
        - Start a SQL transaction.
        - Lock and mark the indexed items you want to delete.
        - Close the SQL transaction.
        - Perform the storage deletes.
        - Start a SQL transaction.
        - Actually delete the index entries.
        - Close the SQL transaction.

        This means anything with deleted=False will always be present in the backing store. If it is marked
        deleted=True, and the process gets killed when deleting from the backing storage, only
        some of the items might actually be gone.

        The next time the cleaner starts up, it can try to do that delete again (ignoring 404s).
        Eventually that will succeed and the item will actually be removed from the DB. Only during
        the first run of batches do we consider already-marked items. This prevents multiple cleanup
        daemons from competing with each other on every batch.
        """
        if protect_blobs_after is None:
            protect_blobs_after = datetime.utcnow()

        # Used for metric publishing
        delete_start_time = time.time()
        metadata = {}
        if self._instance_name:
            metadata["instance-name"] = self._instance_name

        storage_digests: List[Digest] = []
        marked_digests: List[Digest] = []
        collected_bytes = 0

        with self._sql.scoped_session(exceptions_to_not_rollback_on=[StaleDataError]) as session:
            base_query = session.query(IndexEntry, IndexEntry.inline_blob != None).options(  # noqa
                load_only("digest_hash", "digest_size_bytes")
            )

            if self._delete_premarked_blobs:
                LOGGER.info("Starting to gather pre-marked records")
                premarked_query = base_query.filter_by(deleted=True)
                for [entry, is_inline] in premarked_query.all():
                    digest = Digest(hash=entry.digest_hash, size_bytes=entry.digest_size_bytes)
                    marked_digests.append(digest)
                    if not is_inline:
                        storage_digests.append(digest)
                    collected_bytes += entry.digest_size_bytes

                if not dry_run:
                    publish_counter_metric(CLEANUP_INDEX_PREMARKED_BLOBS_METRIC_NAME, len(marked_digests), metadata)
                LOGGER.info(f"Gathered {collected_bytes} pre-marked bytes out of {n_bytes} bytes (max)")
                self._delete_premarked_blobs = False

            if collected_bytes < n_bytes:
                LOGGER.info("Searching for records to mark deleted")
                unmarked_query = (
                    base_query.filter_by(deleted=False)
                    .filter(IndexEntry.accessed_timestamp < protect_blobs_after)
                    .with_for_update(skip_locked=True)
                )
                window = self._windowed_lru_digests(unmarked_query, IndexEntry.accessed_timestamp)
                mark_deleted_start = time.perf_counter()
                for [entry, is_inline] in window:
                    digest = Digest(hash=entry.digest_hash, size_bytes=entry.digest_size_bytes)
                    marked_digests.append(digest)
                    if not is_inline:
                        storage_digests.append(digest)
                    collected_bytes += entry.digest_size_bytes
                    if not dry_run:
                        entry.deleted = True
                    if collected_bytes >= n_bytes:
                        break
                mark_deleted_duration = timedelta(seconds=time.perf_counter() - mark_deleted_start)
                if not dry_run:
                    publish_timer_metric(CLEANUP_INDEX_MARK_DELETED_METRIC_NAME, mark_deleted_duration, metadata)
                LOGGER.info(f"Gathered {collected_bytes} out of {n_bytes} bytes (max)")

        if dry_run:
            return collected_bytes

        with DurationMetric(CLEANUP_STORAGE_BULK_DELETE_METRIC_NAME, instanced=True):
            failed_deletes = self._storage.bulk_delete(storage_digests)

        publish_counter_metric(CLEANUP_STORAGE_DELETION_FAILURES_METRIC_NAME, len(failed_deletes), metadata)
        digests_to_delete = [x for x in marked_digests if x.hash not in failed_deletes]

        with self._sql.scoped_session(exceptions_to_not_rollback_on=[StaleDataError]) as session:
            with DurationMetric(CLEANUP_INDEX_BULK_DELETE_METRIC_NAME, instanced=True):
                failed_deletes.extend(self._bulk_delete_from_index(digests_to_delete, session))
            for digest in digests_to_delete:
                if digest.hash in failed_deletes:
                    collected_bytes -= digest.size_bytes

        batch_duration = time.time() - delete_start_time
        blobs_deleted_per_second = (len(digests_to_delete) - len(failed_deletes)) / batch_duration
        publish_gauge_metric(CLEANUP_BLOBS_DELETION_RATE_METRIC_NAME, blobs_deleted_per_second, metadata)
        return collected_bytes
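    # Illustrative usage sketch for delete_n_bytes above (an assumption, not part
    # of this module): a cleanup daemon might call
    #   index.delete_n_bytes(
    #       n_bytes=10 * 1024**3,
    #       protect_blobs_after=datetime.utcnow() - timedelta(hours=1),
    #   )
    # to reclaim roughly 10 GiB while leaving blobs accessed in the last hour untouched.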

    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        # Delete from the index and then delete from the backing storage.
        with self._sql.scoped_session(exceptions_to_not_rollback_on=[StaleDataError]) as session:
            failed_deletes = self._bulk_delete_from_index(digests, session)

        digests_to_delete = [x for x in digests if x.hash not in failed_deletes]
        failed_deletes.extend(self._storage.bulk_delete(digests_to_delete))
        return failed_deletes

    def _bulk_delete_from_index(self, digests: List[Digest], session: Session) -> List[str]:
        LOGGER.info(f"Deleting {len(digests)} digests from the index")
        index_table = IndexEntry.__table__
        hashes = [x.hash for x in digests]

        # Make sure we don't exceed maximum size of an IN clause
        n = self._inclause_limit
        hash_chunks = [hashes[i : i + n] for i in range(0, len(hashes), n)]

        # We will not raise, rollback, or log on StaleDataErrors.
        # These errors occur when we delete fewer rows than we were expecting.
        # This is fine, since the missing rows will get deleted eventually.
        # When running bulk_deletes concurrently, StaleDataErrors
        # occur too often to log.
        num_blobs_deleted = 0
        for chunk in hash_chunks:
            # Do not wait for locks when deleting rows. Skip locked rows to
            # avoid deadlocks.
            stmt = index_table.delete().where(
                index_table.c.digest_hash.in_(
                    select([index_table.c.digest_hash], index_table.c.digest_hash.in_(chunk)).with_for_update(
                        skip_locked=True
                    )
                )
            )
            num_blobs_deleted += session.execute(stmt).rowcount  # type: ignore
        LOGGER.info(f"{num_blobs_deleted}/{len(digests)} blobs deleted from the index")

        # bulk_delete is typically expected to return the digests that were not deleted,
        # but delete only returns the number of rows deleted and not what was/wasn't
        # deleted. Getting this info would require extra queries, so assume that
        # everything was either deleted or already deleted. Failures will continue to throw
        return []
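    # Illustrative shape of the statement built in _bulk_delete_from_index above
    # (roughly, per hash chunk):
    #   DELETE FROM index
    #   WHERE index.digest_hash IN
    #       (SELECT index.digest_hash FROM index
    #        WHERE index.digest_hash IN (<chunk>)
    #        FOR UPDATE SKIP LOCKED)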