Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/index/sql.py: 92.97%

427 statements  

coverage.py v7.4.1, created at 2025-05-21 15:45 +0000

1# Copyright (C) 2019 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17SQLIndex 

18================== 

19 

20A SQL index implementation. This can be pointed to either a remote SQL server 

21or a local SQLite database. 

22 

23""" 

24import io 

25import itertools 

26import time 

27from collections import deque 

28from datetime import datetime, timedelta 

29from typing import IO, Any, AnyStr, Deque, Iterator, Sequence, cast 

30 

31from sqlalchemy import ColumnElement, Table, and_, delete, func, not_, select 

32from sqlalchemy.exc import DBAPIError 

33from sqlalchemy.orm import InstrumentedAttribute, Session, load_only 

34from sqlalchemy.orm.exc import StaleDataError 

35from sqlalchemy.orm.query import Query 

36from sqlalchemy.orm.session import Session as SessionType 

37 

38from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest 

39from buildgrid._protos.google.rpc import code_pb2 

40from buildgrid._protos.google.rpc.status_pb2 import Status 

41from buildgrid.server.decorators import timed 

42from buildgrid.server.exceptions import StorageFullError 

43from buildgrid.server.logging import buildgrid_logger 

44from buildgrid.server.metrics_names import METRIC 

45from buildgrid.server.metrics_utils import publish_counter_metric, publish_gauge_metric, publish_timer_metric 

46from buildgrid.server.sql.models import IndexEntry 

47from buildgrid.server.sql.provider import SqlProvider 

48from buildgrid.server.utils.digests import validate_digest_data 

49 

50from ..storage_abc import StorageABC 

51from .index_abc import IndexABC 

52from .sql_dialect_delegates import PostgreSQLDelegate, SQLiteDelegate 

53 

54LOGGER = buildgrid_logger(__name__) 

55DIALECT_DELEGATES = {"postgresql": PostgreSQLDelegate, "sqlite": SQLiteDelegate} 

56 

57INLINE_BLOB_SIZE_HARD_MAXIMUM = 1000000000 

58 

59 

60def read_and_rewind(read_head: IO[AnyStr]) -> AnyStr | None: 

61 """Reads from an IO object and returns the data found there 

62 after rewinding the object to the beginning. 

63 

64 Args: 

65 read_head (IO): readable IO head 

66 

67 Returns: 

68 AnyStr: readable content from `read_head`. 

69 """ 

70 if not read_head: 

71 return None 

72 

73 data = read_head.read() 

74 read_head.seek(0) 

75 return data 

76 
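
A quick illustration of the helper above: the data is returned and the stream is left at position 0 so the caller can read it again.

    stream = io.BytesIO(b"blob-bytes")
    assert read_and_rewind(stream) == b"blob-bytes"
    assert stream.tell() == 0  # stream rewound for the next reader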

77 

78class SQLIndex(IndexABC): 

79 TYPE = "SQLIndex" 

80 

81 def __init__( 

82 self, 

83 sql_provider: SqlProvider, 

84 storage: StorageABC, 

85 *, 

86 window_size: int = 1000, 

87 inclause_limit: int = -1, 

88 max_inline_blob_size: int = 0, 

89 refresh_accesstime_older_than: int = 0, 

90 **kwargs: Any, 

91 ) -> None: 

92 base_argnames = ["fallback_on_get"] 

93 base_args = {} 

94 for arg in base_argnames: 

95 if arg in kwargs: 

96 base_args[arg] = kwargs.pop(arg) 

97 super().__init__(**base_args) 

98 

99 self._sql = sql_provider 

100 self._storage = storage 

101 

102 if max_inline_blob_size > INLINE_BLOB_SIZE_HARD_MAXIMUM: 

103 raise ValueError( 

104 f"Max inline blob size is [{max_inline_blob_size}], " 

105 f"but must be less than [{INLINE_BLOB_SIZE_HARD_MAXIMUM}]." 

106 ) 

107 if max_inline_blob_size >= 0: 

108 self._max_inline_blob_size = max_inline_blob_size 

109 else: 

110 raise ValueError(f"Max inline blob size is [{max_inline_blob_size}], but must be nonnegative.") 

111 

112 if refresh_accesstime_older_than >= 0: 

113 # Measured in seconds. Helps reduce the frequency of timestamp updates under heavy read load.

114 self.refresh_accesstime_older_than = refresh_accesstime_older_than 

115 else: 

116 raise ValueError( 

117 f"'refresh_accesstime_older_than' must be nonnegative. It is {refresh_accesstime_older_than}" 

118 ) 

119 

120 # Max # of rows to fetch while iterating over all blobs 

121 # (e.g. in least_recent_digests) 

122 self._all_blobs_window_size = window_size 

123 

124 # This variable stores the list of whereclauses (SQLAlchemy BooleanClauseList objects) 

125 # generated by _column_windows() using a time-expensive SQL query.

126 # These whereclauses are used to construct the final SQL query 

127 # during cleanup in order to fetch blobs by time windows. 

128 # 

129 # Inside _column_windows(), a list of timestamp borders is obtained:

130 # intervals = [t1, t2, t3, ...] 

131 # The generated whereclauses can then be represented semantically as, for example:

132 # self._queue_of_whereclauses = [ 

133 # "WHERE t1 <= IndexEntry.accessed_timestamp < t2", 

134 # "WHERE t2 <= IndexEntry.accessed_timestamp < t3", 

135 # "WHERE t3 <= IndexEntry.accessed_timestamp < t4", 

136 # ... and so on] 

137 # Note the number of entries in each window is determined by 

138 # the instance variable "_all_blobs_window_size". 

139 self._queue_of_whereclauses: Deque[ColumnElement[bool]] = deque() 

140 

141 # Whether entries with deleted=True should be considered by delete_n_bytes.

142 # This is useful to catch any half-finished deletes where a cleanup process 

143 # may have exited half-way through deletion. Once all premarked blobs have been 

144 # deleted, this becomes False; it is only reset after a full scan of the database.

145 self._delete_premarked_blobs: bool = True 

146 

147 # Only pass known kwargs to db session 

148 available_options = {"pool_size", "max_overflow", "pool_timeout", "pool_pre_ping", "pool_recycle"} 

149 kwargs_keys = kwargs.keys() 

150 if not kwargs_keys <= available_options:

151 unknown_args = kwargs_keys - available_options 

152 raise TypeError(f"Unknown keyword arguments: [{unknown_args}]") 

153 

154 # Dialect-specific initialization 

155 self._dialect_delegate = DIALECT_DELEGATES.get(self._sql.dialect) 

156 

157 if inclause_limit > 0: 

158 if inclause_limit > window_size: 

159 LOGGER.warning( 

160 "Configured inclause limit is greater than window size.", 

161 tags=dict(inclause_limit=inclause_limit, window_size=window_size), 

162 ) 

163 self._inclause_limit = inclause_limit 

164 else: 

165 # If the inlimit isn't explicitly set, we use a default that 

166 # respects both the window size and the db implementation's 

167 # inlimit. 

168 self._inclause_limit = min(window_size, self._sql.default_inlimit) 

169 LOGGER.debug("SQL index: using default inclause limit.", tags=dict(inclause_limit=self._inclause_limit)) 

170 

171 # Make a test query against the database to ensure the connection is valid 

172 with self._sql.scoped_session() as session: 

173 session.query(IndexEntry).first() 

174 self._sql.remove_scoped_session() 

175 
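
A hedged instantiation sketch based on the constructor above. The sql_provider and storage objects are assumed to be built elsewhere (any SqlProvider plus any StorageABC implementation), and the keyword values are illustrative, not recommendations.

    # sql_provider: SqlProvider, storage: StorageABC -- assumed to exist already
    index = SQLIndex(
        sql_provider,
        storage,
        window_size=1000,                   # rows fetched per LRU window
        max_inline_blob_size=1024,          # blobs <= 1 KiB are stored inline in the index
        refresh_accesstime_older_than=300,  # skip refreshes for blobs read in the last 5 minutes
    )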

176 def start(self) -> None: 

177 self._storage.start() 

178 

179 def stop(self) -> None: 

180 self._storage.stop() 

181 

182 @timed(METRIC.STORAGE.STAT_DURATION, type=TYPE) 

183 def has_blob(self, digest: Digest) -> bool: 

184 with self._sql.scoped_session() as session: 

185 statement = select(func.count(IndexEntry.digest_hash)).where(IndexEntry.digest_hash == digest.hash) 

186 

187 num_entries = session.execute(statement).scalar() 

188 if num_entries is None: 

189 num_entries = 0 

190 

191 if num_entries == 1: 

192 return True 

193 elif num_entries < 1: 

194 return False 

195 else: 

196 raise RuntimeError(f"Multiple results found for blob [{digest}]. The index is in a bad state.") 

197 

198 @timed(METRIC.STORAGE.READ_DURATION, type=TYPE) 

199 def get_blob(self, digest: Digest) -> IO[bytes] | None: 

200 """Get a blob from the index or the backing storage. Optionally fallback and repair index""" 

201 

202 # Check the index for the blob and return if found. 

203 with self._sql.scoped_session() as session: 

204 if entry := session.query(IndexEntry).filter(IndexEntry.digest_hash == digest.hash).first(): 

205 if entry.inline_blob is not None: 

206 return io.BytesIO(entry.inline_blob) 

207 elif blob := self._storage.get_blob(digest): 

208 # Fix any blobs that should have been inlined. 

209 if digest.size_bytes <= self._max_inline_blob_size: 

210 self._save_digests_to_index([(digest, read_and_rewind(blob))], session) 

211 session.commit() 

212 return blob 

213 LOGGER.warning( 

214 "Blob was indexed but not in storage. Deleting from the index.", tags=dict(digest=digest) 

215 ) 

216 self._bulk_delete_from_index([digest], session) 

217 

218 # Check the storage for the blob and repair the index if found. 

219 if self._fallback_on_get: 

220 if blob := self._storage.get_blob(digest): 

221 with self._sql.scoped_session() as session: 

222 if digest.size_bytes <= self._max_inline_blob_size: 

223 self._save_digests_to_index([(digest, read_and_rewind(blob))], session) 

224 else: 

225 self._save_digests_to_index([(digest, None)], session) 

226 session.commit() 

227 return blob 

228 

229 # Blob was not found in index or storage 

230 return None 

231 
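
A usage sketch for the read path. The SHA-256 digest function and the `index` instance are assumptions made for illustration; get_blob returns a readable stream or None.

    import hashlib

    data = b"hello"
    digest = Digest(hash=hashlib.sha256(data).hexdigest(), size_bytes=len(data))

    if (blob := index.get_blob(digest)) is not None:
        contents = blob.read()  # inlined blobs come back as io.BytesIO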

232 @timed(METRIC.STORAGE.DELETE_DURATION, type=TYPE) 

233 def delete_blob(self, digest: Digest) -> None: 

234 statement = delete(IndexEntry).where(IndexEntry.digest_hash == digest.hash) 

235 options = {"synchronize_session": False} 

236 

237 with self._sql.scoped_session() as session: 

238 session.execute(statement, execution_options=options) 

239 

240 self._storage.delete_blob(digest) 

241 

242 @timed(METRIC.STORAGE.WRITE_DURATION, type=TYPE) 

243 def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None: 

244 inline_blob = None 

245 if digest.size_bytes > self._max_inline_blob_size: 

246 self._storage.commit_write(digest, write_session) 

247 else: 

248 write_session.seek(0) 

249 inline_blob = write_session.read() 

250 try: 

251 with self._sql.scoped_session() as session: 

252 self._save_digests_to_index([(digest, inline_blob)], session) 

253 except DBAPIError as error: 

254 # Error has pgcode attribute (Postgres only) 

255 if hasattr(error.orig, "pgcode"): 

256 # imported here to avoid global dependency on psycopg2 

257 from psycopg2.errors import DiskFull, Error, OutOfMemory 

258 

259 # 53100 == DiskFull && 53200 == OutOfMemory 

260 original_exception = cast(Error, error.orig) 

261 if isinstance(original_exception, (DiskFull, OutOfMemory)): 

262 raise StorageFullError( 

263 f"Postgres Error: {original_exception.pgerror} ({original_exception.pgcode}" 

264 ) from error 

265 raise error 

266 

267 def _partitioned_hashes(self, digests: Sequence[Digest]) -> Iterator[Iterator[str]]: 

268 """Given a long list of digests, split it into parts no larger than 

269 _inclause_limit and yield the hashes in each part. 

270 """ 

271 for part_start in range(0, len(digests), self._inclause_limit): 

272 part_end = min(len(digests), part_start + self._inclause_limit) 

273 part_digests = itertools.islice(digests, part_start, part_end) 

274 yield map(lambda digest: digest.hash, part_digests) 

275 
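
A small worked example of the partitioning above, assuming an index configured with an inclause limit of 2; the digests are placeholders.

    digests = [Digest(hash=f"h{i}", size_bytes=1) for i in range(5)]
    for part in index._partitioned_hashes(digests):
        print(list(part))
    # With _inclause_limit == 2 this prints:
    #   ['h0', 'h1']
    #   ['h2', 'h3']
    #   ['h4']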

276 def _bulk_select_digests( 

277 self, digests: Sequence[Digest], fetch_blobs: bool = False, fetch_deleted: bool = True 

278 ) -> Iterator[IndexEntry]: 

279 """Generator that selects all rows matching a digest list. 

280 

281 SQLAlchemy Core is used for this because the ORM has problems with 

282 large numbers of bind variables for WHERE IN clauses. 

283 

284 We only select on the digest hash (not hash and size) to allow for 

285 index-only queries on db backends that support them. 

286 """ 

287 index_table = IndexEntry.__table__ 

288 with self._sql.scoped_session() as session: 

289 columns = [index_table.c.digest_hash] 

290 if fetch_blobs: 

291 columns.append(index_table.c.inline_blob) 

292 for part in self._partitioned_hashes(digests): 

293 stmt = select(*columns).where(index_table.c.digest_hash.in_(part)) 

294 if not fetch_deleted: 

295 stmt = stmt.where(not_(index_table.c.deleted)) 

296 entries = session.execute(stmt) 

297 yield from entries # type: ignore 

298 

299 @timed(METRIC.STORAGE.SQL_INDEX.UPDATE_TIMESTAMP_DURATION) 

300 def _bulk_refresh_timestamps( 

301 self, digests: Sequence[Digest], session: SessionType, update_time: datetime | None = None 

302 ) -> None: 

303 """Refresh all timestamps of the input digests. 

304 

305 SQLAlchemy Core is used for this because the ORM is not suitable for 

306 bulk inserts and updates. 

307 

308 https://docs.sqlalchemy.org/en/13/faq/performance.html#i-m-inserting-400-000-rows-with-the-orm-and-it-s-really-slow 

309 """ 

310 # See discussion of __table__ typing in https://github.com/sqlalchemy/sqlalchemy/issues/9130 

311 index_table = cast(Table, IndexEntry.__table__) 

312 current_time = datetime.utcnow() 

313 

314 # If a timestamp was passed in, use it and always refresh (no threshold).

315 if update_time: 

316 timestamp = update_time 

317 last_accessed_threshold = current_time 

318 # Otherwise, a digest's timestamp is not refreshed if it was last accessed more recently than this threshold.

319 else: 

320 timestamp = current_time 

321 last_accessed_threshold = current_time - timedelta(seconds=self.refresh_accesstime_older_than) 

322 

323 for part in self._partitioned_hashes(digests): 

324 # Generate the SQL Statement: 

325 # UPDATE index SET accessed_timestamp=<timestamp> 

326 # WHERE index.digest_hash IN 

327 # (SELECT index.digest_hash FROM index 

328 # WHERE index.digest_hash IN <part> AND index.accessed_timestamp < <last_accessed_threshold>

329 # FOR UPDATE SKIP LOCKED) 

330 stmt = ( 

331 index_table.update() 

332 .where( 

333 index_table.c.digest_hash.in_( 

334 select(index_table.c.digest_hash) 

335 .where(index_table.c.digest_hash.in_(part)) 

336 .where(index_table.c.accessed_timestamp < last_accessed_threshold) 

337 .with_for_update(skip_locked=True) 

338 ) 

339 ) 

340 .values(accessed_timestamp=timestamp) 

341 ) 

342 session.execute(stmt) 

343 session.commit() 

344 

345 @timed(METRIC.STORAGE.BULK_STAT_DURATION, type=TYPE) 

346 def missing_blobs(self, digests: list[Digest]) -> list[Digest]: 

347 # Blobs marked as deleted are considered missing

348 entries = self._bulk_select_digests(digests, fetch_deleted=False) 

349 

350 found_hashes = {entry.digest_hash for entry in entries} 

351 

352 # Split the digests into two found/missing lists 

353 found_digests, missing_digests = [], [] 

354 for digest in digests: 

355 if digest.hash in found_hashes: 

356 found_digests.append(digest) 

357 else: 

358 missing_digests.append(digest) 

359 

360 # Update all timestamps for blobs which were found 

361 with self._sql.scoped_session() as session: 

362 self._bulk_refresh_timestamps(found_digests, session) 

363 

364 return missing_digests 

365 

366 @timed(METRIC.STORAGE.SQL_INDEX.SAVE_DIGESTS_DURATION) 

367 def _save_digests_to_index( 

368 self, digest_blob_pairs: list[tuple[Digest, bytes | None]], session: SessionType 

369 ) -> None: 

370 """Helper to persist a list of digest/blob pairs to the index. 

371 

372 Any digests present are updated, and new digests are inserted along with their inline blobs (if provided). 

373 Only blobs with size less than or equal to the max_inline_blob_size are inserted directly into the index. 

374 """ 

375 if not digest_blob_pairs: 

376 return 

377 

378 digest_blob_pairs = sorted(digest_blob_pairs, key=lambda pair: (pair[0].hash, pair[0].size_bytes)) 

379 

380 if self._dialect_delegate: 

381 try: 

382 self._dialect_delegate._save_digests_to_index( # type: ignore 

383 digest_blob_pairs, session, self._max_inline_blob_size 

384 ) 

385 return 

386 except AttributeError: 

387 pass 

388 

389 update_time = datetime.utcnow() 

390 # Figure out which digests we can just update 

391 digests = [digest for (digest, blob) in digest_blob_pairs] 

392 entries = self._bulk_select_digests(digests) 

393 # Map digests to new entries 

394 entries_not_present = { 

395 digest.hash: { 

396 "digest_hash": digest.hash, 

397 "digest_size_bytes": digest.size_bytes, 

398 "accessed_timestamp": update_time, 

399 "inline_blob": (blob if digest.size_bytes <= self._max_inline_blob_size else None), 

400 "deleted": False, 

401 } 

402 for (digest, blob) in digest_blob_pairs 

403 } 

404 

405 entries_present = {} 

406 for entry in entries: 

407 entries_present[entry.digest_hash] = entries_not_present[entry.digest_hash] 

408 del entries_not_present[entry.digest_hash] 

409 

410 if entries_not_present: 

411 session.bulk_insert_mappings(IndexEntry, entries_not_present.values()) # type: ignore 

412 if entries_present: 

413 session.bulk_update_mappings(IndexEntry, entries_present.values()) # type: ignore 

414 

415 @timed(METRIC.STORAGE.BULK_WRITE_DURATION, type=TYPE) 

416 def bulk_update_blobs( # pylint: disable=arguments-renamed 

417 self, digest_blob_pairs: list[tuple[Digest, bytes]] 

418 ) -> list[Status]: 

419 """Implement the StorageABC's bulk_update_blobs method. 

420 

421 The StorageABC interface takes in a list of digest/blob pairs and 

422 returns a list of results. The list of results MUST be ordered to 

423 correspond with the order of the input list.""" 

424 pairs_to_store = [] 

425 result_map = {} 

426 

427 # For each blob, determine whether to store it in the backing storage or inline it 

428 for digest, blob in digest_blob_pairs: 

429 if validate_digest_data(digest, blob): 

430 if digest.size_bytes > self._max_inline_blob_size: 

431 pairs_to_store.append((digest, blob)) 

432 else: 

433 result_map[digest.hash] = Status(code=code_pb2.OK) 

434 else: 

435 result_map[digest.hash] = Status(code=code_pb2.INVALID_ARGUMENT, message="Data doesn't match hash") 

436 missing_blobs = self.missing_blobs([digest for digest, _ in pairs_to_store]) 

437 missing_blob_pairs = [] 

438 for digest, blob in pairs_to_store: 

439 if digest not in missing_blobs: 

440 result_map[digest.hash] = Status(code=code_pb2.OK) 

441 else: 

442 missing_blob_pairs.append((digest, blob)) 

443 

444 backup_results = self._storage.bulk_update_blobs(missing_blob_pairs) 

445 

446 for digest, result in zip(missing_blobs, backup_results): 

447 if digest.hash in result_map: 

448 # ERROR: blob was both inlined and backed up 

449 raise RuntimeError("Blob was both inlined and backed up.") 

450 result_map[digest.hash] = result 

451 

452 # Generate the final list of results 

453 pairs_to_inline: list[tuple[Digest, bytes | None]] = [] 

454 results = [] 

455 for digest, blob in digest_blob_pairs: 

456 status = result_map.get( 

457 digest.hash, 

458 Status(code=code_pb2.UNKNOWN, message="SQL Index: unable to determine the status of this blob"), 

459 ) 

460 results.append(status) 

461 if status.code == code_pb2.OK: 

462 pairs_to_inline.append((digest, blob)) 

463 

464 with self._sql.scoped_session() as session: 

465 self._save_digests_to_index(pairs_to_inline, session) 

466 

467 return results 

468 
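
A sketch of the ordering contract described in the docstring above; digest_a, digest_b and their blobs are hypothetical.

    statuses = index.bulk_update_blobs([(digest_a, blob_a), (digest_b, b"wrong-bytes")])
    # statuses[0] corresponds to digest_a and statuses[1] to digest_b, in input order.
    # A blob that does not match its digest yields code_pb2.INVALID_ARGUMENT.
    assert len(statuses) == 2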

469 def _bulk_read_blobs_with_fallback(self, digests: list[Digest]) -> dict[str, bytes]: 

470 hash_to_digest: dict[str, Digest] = {digest.hash: digest for digest in digests} 

471 results: dict[str, bytes] = {} 

472 

473 expected_storage_digests: list[Digest] = [] 

474 # Fetch inlined blobs directly from the index 

475 entries = self._bulk_select_digests(digests, fetch_blobs=True) 

476 for e in entries: 

477 blob, digest_hash, digest = e.inline_blob, e.digest_hash, hash_to_digest[e.digest_hash] 

478 if blob is not None: 

479 results[digest_hash] = blob 

480 hash_to_digest.pop(digest_hash) 

481 else: 

482 # If a blob is not inlined then the blob is expected to be in storage 

483 expected_storage_digests.append(digest) 

484 

485 # Fetch everything that wasn't inlined from the backing storage 

486 fetched_digests = self._storage.bulk_read_blobs(list(hash_to_digest.values())) 

487 

488 # Save everything fetched from storage, inlining the blobs if they're small enough 

489 digest_pairs_to_save: list[tuple[Digest, bytes | None]] = [] 

490 for digest_hash, blob_data in fetched_digests.items(): 

491 if blob_data is not None: 

492 digest = hash_to_digest[digest_hash] 

493 if digest.size_bytes <= self._max_inline_blob_size: 

494 digest_pairs_to_save.append((digest, blob_data)) 

495 else: 

496 digest_pairs_to_save.append((digest, None)) 

497 results[digest_hash] = blob_data 

498 

499 # List of digests found in storage 

500 actual_storage_digest_hashes = set(

501 digest_hash for (digest_hash, blob_data) in fetched_digests.items() if blob_data is not None 

502 ) 

503 # Get a list of all the digests that were in the index but not found in storage 

504 digests_expected_not_in_storage: list[Digest] = [] 

505 for expected_digest in expected_storage_digests: 

506 if expected_digest.hash not in actual_storage_digest_hashes:

507 LOGGER.warning( 

508 "Blob was indexed but not in storage. Deleting from the index.", tags=dict(digest=digest) 

509 ) 

510 digests_expected_not_in_storage.append(expected_digest) 

511 

512 with self._sql.scoped_session() as session: 

513 self._save_digests_to_index(digest_pairs_to_save, session) 

514 if digests_expected_not_in_storage: 

515 self._bulk_delete_from_index(digests_expected_not_in_storage, session) 

516 session.commit() 

517 

518 return results 

519 

520 @timed(METRIC.STORAGE.BULK_READ_DURATION, type=TYPE) 

521 def bulk_read_blobs(self, digests: list[Digest]) -> dict[str, bytes]: 

522 if self._fallback_on_get: 

523 return self._bulk_read_blobs_with_fallback(digests) 

524 

525 # If fallback is disabled, query the index first and only 

526 # query the storage for blobs that weren't inlined there 

527 

528 hash_to_digest = {digest.hash: digest for digest in digests} # hash -> digest map 

529 results: dict[str, bytes] = {} # The final list of results (return value) 

530 digests_to_fetch: list[Digest] = [] # Digests that need to be fetched from storage 

531 digest_pairs_to_save: list[tuple[Digest, bytes | None]] = [] # Digests that need to be updated in the index 

532 

533 # Fetch all of the digests in the database 

534 # Anything that wasn't already inlined needs to be fetched 

535 entries = self._bulk_select_digests(digests, fetch_blobs=True) 

536 for index_entry in entries: 

537 digest = hash_to_digest[index_entry.digest_hash] 

538 if index_entry.inline_blob is not None: 

539 results[index_entry.digest_hash] = index_entry.inline_blob 

540 else: 

541 digests_to_fetch.append(digest) 

542 

543 # Caution: digests whose blobs cannot be found in storage will be dropped.

544 if digests_to_fetch: 

545 fetched_digests = self._storage.bulk_read_blobs(digests_to_fetch) 

546 else: 

547 fetched_digests = {} 

548 

549 # Generate the list of inputs for _save_digests_to_index 

550 # 

551 # We only need to send blob data for small blobs fetched 

552 # from the storage since everything else is either too 

553 # big or already inlined 

554 for digest in digests_to_fetch: 

555 if blob_data := fetched_digests.get(digest.hash): 

556 if digest.size_bytes <= self._max_inline_blob_size: 

557 digest_pairs_to_save.append((digest, blob_data)) 

558 

559 actual_storage_digests = set(digest_hash for (digest_hash, _) in fetched_digests.items())

560 # Get a list of all the digests that were in the index but not found in storage 

561 digests_expected_not_in_storage: list[Digest] = [] 

562 for expected_digest in digests_to_fetch: 

563 if expected_digest.hash not in actual_storage_digests:

564 LOGGER.warning( 

565 "Blob was indexed but not in storage. Deleting from the index.", tags=dict(digest=digest) 

566 ) 

567 digests_expected_not_in_storage.append(expected_digest) 

568 

569 # Update any blobs which need to be inlined 

570 with self._sql.scoped_session() as session: 

571 self._save_digests_to_index(digest_pairs_to_save, session) 

572 if digests_expected_not_in_storage: 

573 self._bulk_delete_from_index(digests_expected_not_in_storage, session) 

574 session.commit() 

575 

576 results.update(fetched_digests) 

577 return results 

578 
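
A usage sketch for the bulk read path; digest_a and digest_b are hypothetical.

    found = index.bulk_read_blobs([digest_a, digest_b])
    # `found` maps digest hashes to blob bytes. Digests whose blobs are missing from
    # both the index and the backing storage are simply absent from the result.
    blob_a = found.get(digest_a.hash)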

579 def _column_windows( 

580 self, session: SessionType, column: InstrumentedAttribute[Any] 

581 ) -> Iterator[ColumnElement[bool]]: 

582 """Adapted from the sqlalchemy WindowedRangeQuery recipe. 

583 https://github.com/sqlalchemy/sqlalchemy/wiki/WindowedRangeQuery 

584 

585 This method breaks the timestamp range into windows and yields 

586 the borders of these windows to the callee. For example, the borders 

587 yielded by this might look something like 

588 ('2019-10-08 18:25:03.699863', '2019-10-08 18:25:03.751018') 

589 ('2019-10-08 18:25:03.751018', '2019-10-08 18:25:03.807867') 

590 ('2019-10-08 18:25:03.807867', '2019-10-08 18:25:03.862192') 

591 ('2019-10-08 18:25:03.862192',) 

592 

593 _windowed_lru_digests uses these borders to form WHERE clauses for its 

594 SELECTs. In doing so, we make sure to repeatedly query the database for 

595 live updates, striking a balance between loading the entire resultset 

596 into memory and querying each row individually, both of which are 

597 inefficient in the context of a large index. 

598 

599 The window size is a parameter and can be configured. A larger window 

600 size will yield better performance (fewer SQL queries) at the cost of 

601 memory (holding on to the results of the query) and accuracy (blobs 

602 may get updated while you're working on them), and vice versa for a 

603 smaller window size. 

604 """ 

605 

606 def int_for_range(start_id: Any, end_id: Any) -> ColumnElement[bool]: 

607 if end_id: 

608 return and_(column >= start_id, column < end_id) 

609 else: 

610 return column >= start_id # type: ignore[no-any-return] 

611 

612 # Constructs a query that: 

613 # 1. Gets all the timestamps in sorted order. 

614 # 2. Assign a row number to each entry. 

615 # 3. Only keep timestamps whose row numbers are N apart, where N = "_all_blobs_window_size".

616 # SELECT 

617 # anon_1.index_accessed_timestamp AS anon_1_index_accessed_timestamp 

618 # FROM ( 

619 # SELECT 

620 # index.accessed_timestamp AS index_accessed_timestamp, 

621 # row_number() OVER (ORDER BY index.accessed_timestamp) AS rownum 

622 # FROM index 

623 # ) 

624 # AS anon_1 

625 # WHERE rownum % 1000=1 

626 # 

627 # Note: 

628 # - This query can be slow due to checking each entry with "WHERE rownum % 1000=1". 

629 # - These timestamps will be the basis for constructing the SQL "WHERE" clauses later. 

630 rownum = func.row_number().over(order_by=column).label("rownum") 

631 subq = select(column, rownum).subquery() 

632 

633 # The upstream recipe noted in the docstring uses `subq.corresponding_column` here. That 

634 # method takes a KeyedColumnElement, which the ORM InstrumentedAttributes are not instances 

635 # of. Rather than switching to passing actual columns here, we can take advantage of controlling 

636 # the initial `select` to instead use the subquery columns directly and avoid ever calling this 

637 # method. 

638 # 

639 # See https://github.com/sqlalchemy/sqlalchemy/discussions/10325#discussioncomment-6952547. 

640 target_column = subq.columns[0] 

641 rownum_column = subq.columns[1] 

642 

643 stmt = select(target_column) 

644 if self._all_blobs_window_size > 1: 

645 stmt = stmt.filter(rownum_column % self._all_blobs_window_size == 1) 

646 

647 # Execute the underlying query against the database. 

648 # Ex: intervals = [t1, t1001, t2001, ...], q = [(t1, ), (t1001, ), (t2001, ), ...] 

649 intervals = list(session.scalars(stmt)) 

650 

651 # Generate the whereclauses 

652 while intervals: 

653 start = intervals.pop(0) 

654 if intervals: 

655 end = intervals[0] 

656 else: 

657 end = None 

658 # Ex: yield "WHERE IndexEntry.accessed_timestamp >= start AND IndexEntry.accessed_timestamp < end" 

659 yield int_for_range(start, end) 

660 
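
A standalone sketch of how the yielded borders pair into half-open ranges; the string timestamps stand in for real accessed_timestamp values.

    intervals = ["t1", "t1001", "t2001"]  # one border per window of _all_blobs_window_size rows
    while intervals:
        start = intervals.pop(0)
        end = intervals[0] if intervals else None
        clause = f"accessed_timestamp >= {start}"
        if end:
            clause += f" AND accessed_timestamp < {end}"
        print(clause)
    # accessed_timestamp >= t1 AND accessed_timestamp < t1001
    # accessed_timestamp >= t1001 AND accessed_timestamp < t2001
    # accessed_timestamp >= t2001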

661 def _windowed_lru_digests( 

662 self, q: "Query[Any]", column: InstrumentedAttribute[Any] 

663 ) -> Iterator[tuple[IndexEntry, bool]]: 

664 """Generate a query for each window produced by _column_windows 

665 and yield the results one by one. 

666 """ 

667 # Determine whether the conditions are met to make an SQL call to get new windows. 

668 msg = "Using stored LRU windows" 

669 if len(self._queue_of_whereclauses) == 0: 

670 msg = "Requesting new LRU windows." 

671 self._queue_of_whereclauses = deque(self._column_windows(q.session, column)) 

672 self._delete_premarked_blobs = True 

673 

674 msg += f" Number of windows remaining: {len(self._queue_of_whereclauses)}" 

675 LOGGER.debug(msg) 

676 

677 while self._queue_of_whereclauses: 

678 whereclause = self._queue_of_whereclauses[0] 

679 window = q.filter(whereclause).order_by(column.asc()) 

680 yield from window 

681 

682 # If the `yield from window` above doesn't reach this point, the cleanup

683 # exited before consuming all of the content for this whereclause.

684 # Otherwise, the whereclause is exhausted and can be discarded.

685 self._queue_of_whereclauses.popleft() 

686 

687 def least_recent_digests(self) -> Iterator[Digest]: 

688 with self._sql.scoped_session() as session: 

689 # TODO: session.query is legacy, we should replace this with the `select` construct 

690 # as we do elsewhere. 

691 q = session.query(IndexEntry) 

692 for entry in self._windowed_lru_digests(q, IndexEntry.accessed_timestamp): 

693 # TODO make this generic or delete this method only used by tests. 

694 index_entry = cast(IndexEntry, entry) 

695 assert isinstance(index_entry.digest_hash, str) 

696 assert isinstance(index_entry.digest_size_bytes, int) 

697 yield Digest(hash=index_entry.digest_hash, size_bytes=index_entry.digest_size_bytes) 

698 

699 @timed(METRIC.STORAGE.SQL_INDEX.SIZE_CALCULATION_DURATION) 

700 def get_total_size(self) -> int: 

701 statement = select(func.sum(IndexEntry.digest_size_bytes)) 

702 with self._sql.scoped_session() as session: 

703 result = session.execute(statement).scalar() 

704 if result is None: 

705 result = 0 

706 return result 

707 

708 def get_blob_count(self) -> int: 

709 with self._sql.scoped_session() as session: 

710 statement = select(func.count(IndexEntry.digest_hash)) 

711 return session.execute(statement).scalar() or 0 

712 

713 @timed(METRIC.STORAGE.SQL_INDEX.DELETE_N_BYTES_DURATION) 

714 def delete_n_bytes( 

715 self, 

716 n_bytes: int, 

717 dry_run: bool = False, 

718 protect_blobs_after: datetime | None = None, 

719 large_blob_threshold: int | None = None, 

720 large_blob_lifetime: datetime | None = None, 

721 ) -> int: 

722 """ 

723 When using a SQL Index, entries with a delete marker are "in the process of being deleted". 

724 This is required because storage operations can't be safely tied to the SQL index transaction 

725 (one may fail independently of the other, and you end up inconsistent). 

726 

727 The workflow is roughly as follows: 

728 - Start a SQL transaction. 

729 - Lock and mark the indexed items you want to delete. 

730 - Close the SQL transaction. 

731 - Perform the storage deletes.

732 - Start a SQL transaction. 

733 - Actually delete the index entries. 

734 - Close the SQL transaction. 

735 

736 This means anything with deleted=False will always be present in the backing store. If it is marked 

737 deleted=True, and the process gets killed when deleting from the backing storage, only 

738 some of the items might actually be gone. 

739 

740 The next time the cleaner starts up, it can try to do that delete again (ignoring 404s). 

741 Eventually that will succeed and the item will actually be removed from the DB. Only during 

742 the first run of batches do we consider already marked items. This avoids multiple cleanup 

743 daemons competing with each other on every batch.

744 """ 

745 if protect_blobs_after is None: 

746 protect_blobs_after = datetime.utcnow() 

747 

748 # Used for metric publishing 

749 delete_start_time = time.time() 

750 

751 storage_digests: list[Digest] = [] 

752 marked_digests: list[Digest] = [] 

753 collected_bytes = 0 

754 

755 with self._sql.scoped_session(exceptions_to_not_rollback_on=[StaleDataError]) as session: 

756 base_query = session.query(IndexEntry, IndexEntry.inline_blob != None).options( # noqa 

757 load_only(IndexEntry.digest_hash, IndexEntry.digest_size_bytes) 

758 ) 

759 

760 if self._delete_premarked_blobs: 

761 LOGGER.info("Starting to gather pre-marked records.") 

762 premarked_query = base_query.filter_by(deleted=True) 

763 for [entry, is_inline] in premarked_query.all(): 

764 digest = Digest(hash=entry.digest_hash, size_bytes=entry.digest_size_bytes) 

765 marked_digests.append(digest) 

766 if not is_inline: 

767 storage_digests.append(digest) 

768 collected_bytes += entry.digest_size_bytes 

769 

770 if not dry_run: 

771 publish_counter_metric(METRIC.STORAGE.SQL_INDEX.PREMARKED_DELETED_COUNT, len(marked_digests)) 

772 LOGGER.info( 

773 "Gathered pre-marked bytes.", tags=dict(collected_bytes=collected_bytes, max_bytes=n_bytes) 

774 ) 

775 self._delete_premarked_blobs = False 

776 

777 if collected_bytes < n_bytes: 

778 LOGGER.info("Searching for records to mark deleted.") 

779 unmarked_query = base_query.filter_by(deleted=False).with_for_update(skip_locked=True) 

780 if large_blob_lifetime and large_blob_threshold: 

781 unmarked_query = unmarked_query.filter( 

782 (IndexEntry.accessed_timestamp < protect_blobs_after) 

783 | ( 

784 (IndexEntry.digest_size_bytes > large_blob_threshold) 

785 & (IndexEntry.accessed_timestamp < large_blob_lifetime) 

786 ) 

787 ) 

788 else: 

789 unmarked_query = unmarked_query.filter(IndexEntry.accessed_timestamp < protect_blobs_after) 

790 window = self._windowed_lru_digests(unmarked_query, IndexEntry.accessed_timestamp) 

791 mark_deleted_start = time.perf_counter() 

792 for [entry, is_inline] in window: 

793 digest = Digest(hash=entry.digest_hash, size_bytes=entry.digest_size_bytes) 

794 marked_digests.append(digest) 

795 if not is_inline: 

796 storage_digests.append(digest) 

797 collected_bytes += entry.digest_size_bytes 

798 if not dry_run: 

799 entry.deleted = True 

800 if collected_bytes >= n_bytes: 

801 break 

802 mark_deleted_duration = timedelta(seconds=time.perf_counter() - mark_deleted_start) 

803 if not dry_run: 

804 publish_timer_metric(METRIC.STORAGE.SQL_INDEX.MARK_DELETED_DURATION, mark_deleted_duration) 

805 LOGGER.info("Gathered bytes.", tags=dict(collected_bytes=collected_bytes, max_bytes=n_bytes)) 

806 

807 if dry_run: 

808 return collected_bytes 

809 

810 failed_deletes = self._storage.bulk_delete(storage_digests) 

811 digests_to_delete = [x for x in marked_digests if f"{x.hash}/{x.size_bytes}" not in failed_deletes] 

812 

813 with self._sql.scoped_session(exceptions_to_not_rollback_on=[StaleDataError]) as session: 

814 failed_deletes.extend(self._bulk_delete_from_index(digests_to_delete, session)) 

815 for digest in digests_to_delete: 

816 if digest in failed_deletes: 

817 collected_bytes -= digest.size_bytes 

818 

819 batch_duration = time.time() - delete_start_time 

820 blobs_deleted_per_second = (len(digests_to_delete) - len(failed_deletes)) / batch_duration 

821 publish_gauge_metric(METRIC.CLEANUP.BLOBS_DELETED_PER_SECOND, blobs_deleted_per_second) 

822 return collected_bytes 

823 
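
A usage sketch of the cleanup workflow described above: size the batch with a dry run, then do the real pass. The one-hour protection window is an arbitrary example.

    cutoff = datetime.utcnow() - timedelta(hours=1)
    would_free = index.delete_n_bytes(2**30, dry_run=True, protect_blobs_after=cutoff)
    if would_free:
        freed = index.delete_n_bytes(2**30, protect_blobs_after=cutoff)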

824 @timed(METRIC.STORAGE.BULK_DELETE_DURATION, type=TYPE) 

825 def bulk_delete(self, digests: list[Digest]) -> list[str]: 

826 # Delete from the index and then delete from the backing storage. 

827 with self._sql.scoped_session(exceptions_to_not_rollback_on=[StaleDataError]) as session: 

828 failed_deletes = self._bulk_delete_from_index(digests, session) 

829 

830 digests_to_delete = [x for x in digests if f"{x.hash}/{x.size_bytes}" not in failed_deletes] 

831 failed_deletes.extend(self._storage.bulk_delete(digests_to_delete)) 

832 return failed_deletes 

833 
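
A short sketch of the failure reporting used by the deletion methods; the digests are hypothetical.

    failed = index.bulk_delete([digest_a, digest_b])
    # Entries that could not be deleted are reported as "<hash>/<size_bytes>" strings,
    # matching the f"{x.hash}/{x.size_bytes}" format used above.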

834 @timed(METRIC.STORAGE.SQL_INDEX.BULK_DELETE_INDEX_DURATION) 

835 def _bulk_delete_from_index(self, digests: list[Digest], session: Session) -> list[str]: 

836 LOGGER.info("Deleting digests from the index.", tags=dict(digest_count=len(digests))) 

837 # See discussion of __table__ typing in https://github.com/sqlalchemy/sqlalchemy/issues/9130 

838 index_table = cast(Table, IndexEntry.__table__) 

839 hashes = [x.hash for x in digests] 

840 

841 # Make sure we don't exceed maximum size of an IN clause 

842 n = self._inclause_limit 

843 hash_chunks = [hashes[i : i + n] for i in range(0, len(hashes), n)] 

844 

845 # We will not raise, rollback, or log on StaleDataErrors. 

846 # These errors occur when we delete fewer rows than we were expecting. 

847 # This is fine, since the missing rows will get deleted eventually. 

848 # When running bulk_deletes concurrently, StaleDataErrors 

849 # occur too often to log. 

850 num_blobs_deleted = 0 

851 for chunk in hash_chunks: 

852 # Do not wait for locks when deleting rows. Skip locked rows to 

853 # avoid deadlocks. 

854 stmt = index_table.delete().where( 

855 index_table.c.digest_hash.in_( 

856 select(index_table.c.digest_hash) 

857 .where(index_table.c.digest_hash.in_(chunk)) 

858 .with_for_update(skip_locked=True) 

859 ) 

860 ) 

861 num_blobs_deleted += session.execute(stmt).rowcount 

862 LOGGER.info("Blobs deleted from the index.", tags=dict(deleted_count=num_blobs_deleted, digest_count=digests)) 

863 

864 # bulk_delete is typically expected to return the digests that were not deleted, 

865 # but delete only returns the number of rows deleted and not what was/wasn't 

866 # deleted. Getting this info would require extra queries, so assume that 

867 # everything was either deleted or already deleted. Failures will continue to raise exceptions.

868 return []