Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/index/sql.py: 92.79%

416 statements  


1# Copyright (C) 2019 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17SQLIndex 

18================== 

19 

20A SQL index implementation. This can be pointed to either a remote SQL server 

21or a local SQLite database. 

22 

23""" 

24import io 

25import itertools 

26import time 

27from collections import deque 

28from datetime import datetime, timedelta 

29from typing import IO, Any, AnyStr, Deque, Dict, Iterator, List, Optional, Sequence, Tuple, cast 

30 

31from sqlalchemy import Column, and_, delete, func, not_, select, text 

32from sqlalchemy.exc import DBAPIError 

33from sqlalchemy.orm import Session, load_only 

34from sqlalchemy.orm.exc import StaleDataError 

35from sqlalchemy.orm.query import Query 

36from sqlalchemy.orm.session import Session as SessionType 

37from sqlalchemy.sql.elements import BooleanClauseList 

38 

39from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest 

40from buildgrid._protos.google.rpc import code_pb2 

41from buildgrid._protos.google.rpc.status_pb2 import Status 

42from buildgrid.server.decorators import timed 

43from buildgrid.server.exceptions import StorageFullError 

44from buildgrid.server.logging import buildgrid_logger 

45from buildgrid.server.metrics_names import METRIC 

46from buildgrid.server.metrics_utils import publish_counter_metric, publish_gauge_metric, publish_timer_metric 

47from buildgrid.server.sql.models import IndexEntry 

48from buildgrid.server.sql.provider import SqlProvider 

49from buildgrid.server.utils.digests import validate_digest_data 

50 

51from ..storage_abc import StorageABC 

52from .index_abc import IndexABC 

53from .sql_dialect_delegates import PostgreSQLDelegate, SQLiteDelegate 

54 

55LOGGER = buildgrid_logger(__name__) 

56DIALECT_DELEGATES = {"postgresql": PostgreSQLDelegate, "sqlite": SQLiteDelegate} 

57 

58INLINE_BLOB_SIZE_HARD_MAXIMUM = 1000000000 

59 

60 

61def read_and_rewind(read_head: IO[AnyStr]) -> Optional[AnyStr]: 

62 """Reads from an IO object and returns the data found there 

63 after rewinding the object to the beginning. 

64 

65 Args: 

66 read_head (IO): readable IO head 

67 

68 Returns: 

69 AnyStr: readable content from `read_head`. 

70 """ 

71 if not read_head: 

72 return None 

73 

74 data = read_head.read() 

75 read_head.seek(0) 

76 return data 
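
For illustration, a minimal usage sketch of the helper above, using an in-memory stream (the sample bytes are invented):

    stream = io.BytesIO(b"example blob data")
    stream.read(7)                     # advance the read head past "example"
    data = read_and_rewind(stream)     # returns the remaining bytes ...
    assert data == b" blob data"
    assert stream.tell() == 0          # ... and leaves the head back at the start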

77 

78 

79class SQLIndex(IndexABC): 

80 TYPE = "SQLIndex" 

81 

82 def __init__( 

83 self, 

84 sql_provider: SqlProvider, 

85 storage: StorageABC, 

86 *, 

87 window_size: int = 1000, 

88 inclause_limit: int = -1, 

89 max_inline_blob_size: int = 0, 

90 refresh_accesstime_older_than: int = 0, 

91 **kwargs: Any, 

92 ) -> None: 

93 base_argnames = ["fallback_on_get"] 

94 base_args = {} 

95 for arg in base_argnames: 

96 if arg in kwargs: 

97 base_args[arg] = kwargs.pop(arg) 

98 super().__init__(**base_args) 

99 

100 self._sql = sql_provider 

101 self._storage = storage 

102 

103 if max_inline_blob_size > INLINE_BLOB_SIZE_HARD_MAXIMUM: 

104 raise ValueError( 

105 f"Max inline blob size is [{max_inline_blob_size}], " 

106 f"but must be less than [{INLINE_BLOB_SIZE_HARD_MAXIMUM}]." 

107 ) 

108 if max_inline_blob_size >= 0: 

109 self._max_inline_blob_size = max_inline_blob_size 

110 else: 

111 raise ValueError(f"Max inline blob size is [{max_inline_blob_size}], but must be nonnegative.") 

112 

113 if refresh_accesstime_older_than >= 0: 

114 # Measured in seconds. Helps reduce the frequency of timestamp updates during heavy read traffic. 

115 self.refresh_accesstime_older_than = refresh_accesstime_older_than 

116 else: 

117 raise ValueError( 

118 f"'refresh_accesstime_older_than' must be nonnegative. It is {refresh_accesstime_older_than}" 

119 ) 

120 

121 # Max # of rows to fetch while iterating over all blobs 

122 # (e.g. in least_recent_digests) 

123 self._all_blobs_window_size = window_size 

124 

125 # This deque stores the whereclauses (SQLAlchemy BooleanClauseList objects) 

126 # generated by _column_windows(), which runs a time-expensive SQL query. 

127 # These whereclauses are used to construct the final SQL queries 

128 # during cleanup in order to fetch blobs by time windows. 

129 # 

130 # Inside _column_windows() a list of timestamp borders is obtained: 

131 # intervals = [t1, t2, t3, ...] 

132 # The generated whereclauses can then be read, semantically, as: 

133 # self._queue_of_whereclauses = [ 

134 # "WHERE t1 <= IndexEntry.accessed_timestamp < t2", 

135 # "WHERE t2 <= IndexEntry.accessed_timestamp < t3", 

136 # "WHERE t3 <= IndexEntry.accessed_timestamp < t4", 

137 # ... and so on] 

138 # Note the number of entries in each window is determined by 

139 # the instance variable "_all_blobs_window_size". 

140 self._queue_of_whereclauses: Deque[BooleanClauseList[Any]] = deque() 

141 

142 # Whether entries with deleted=True should be considered by delete_n_bytes. 

143 # This is useful to catch any half-finished deletes where a cleanup process 

144 # may have exited half-way through deletion. Once all premarked blobs have been 

145 # deleted this becomes False, and it is only reset after a full scan of the database. 

146 self._delete_premarked_blobs: bool = True 

147 

148 # Only pass known kwargs to db session 

149 available_options = {"pool_size", "max_overflow", "pool_timeout", "pool_pre_ping", "pool_recycle"} 

150 kwargs_keys = kwargs.keys() 

151 unknown_args = kwargs_keys - available_options 

152 if unknown_args: 

153 raise TypeError(f"Unknown keyword arguments: [{unknown_args}]") 

154 

155 # Dialect-specific initialization 

156 self._dialect_delegate = DIALECT_DELEGATES.get(self._sql.dialect) 

157 

158 if inclause_limit > 0: 

159 if inclause_limit > window_size: 

160 LOGGER.warning( 

161 "Configured inclause limit is greater than window size.", 

162 tags=dict(inclause_limit=inclause_limit, window_size=window_size), 

163 ) 

164 self._inclause_limit = inclause_limit 

165 else: 

166 # If the inlimit isn't explicitly set, we use a default that 

167 # respects both the window size and the db implementation's 

168 # inlimit. 

169 self._inclause_limit = min(window_size, self._sql.default_inlimit) 

170 LOGGER.debug("SQL index: using default inclause limit.", tags=dict(inclause_limit=self._inclause_limit)) 

171 

172 # Make a test query against the database to ensure the connection is valid 

173 with self._sql.scoped_session() as session: 

174 session.query(IndexEntry).first() 

175 self._sql.remove_scoped_session() 
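
A rough wiring sketch for the constructor above; the SqlProvider connection-string argument and the backing-storage placeholder are assumptions for illustration, not a verified configuration.

    # Hypothetical wiring (argument names are assumptions, not verified API):
    provider = SqlProvider(connection_string="sqlite:///cas-index.db")
    index = SQLIndex(
        provider,
        backing_storage,                     # any StorageABC implementation
        window_size=1000,                    # rows fetched per window when iterating the index
        max_inline_blob_size=64 * 1024,      # blobs up to 64 KiB are stored inline in the index
        refresh_accesstime_older_than=3600,  # skip timestamp refreshes for blobs read in the last hour
    )
    index.start()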

176 

177 def start(self) -> None: 

178 self._storage.start() 

179 

180 def stop(self) -> None: 

181 self._storage.stop() 

182 

183 @timed(METRIC.STORAGE.STAT_DURATION, type=TYPE) 

184 def has_blob(self, digest: Digest) -> bool: 

185 with self._sql.scoped_session() as session: 

186 statement = select(func.count(IndexEntry.digest_hash)).where(IndexEntry.digest_hash == digest.hash) 

187 

188 num_entries = session.execute(statement).scalar() 

189 if num_entries is None: 

190 num_entries = 0 

191 

192 if num_entries == 1: 

193 return True 

194 elif num_entries < 1: 

195 return False 

196 else: 

197 raise RuntimeError(f"Multiple results found for blob [{digest}]. The index is in a bad state.") 

198 

199 @timed(METRIC.STORAGE.READ_DURATION, type=TYPE) 

200 def get_blob(self, digest: Digest) -> Optional[IO[bytes]]: 

201 """Get a blob from the index or the backing storage. Optionally fallback and repair index""" 

202 

203 # Check the index for the blob and return if found. 

204 with self._sql.scoped_session() as session: 

205 if entry := session.query(IndexEntry).filter(IndexEntry.digest_hash == digest.hash).first(): 

206 if entry.inline_blob is not None: 

207 return io.BytesIO(entry.inline_blob) 

208 elif blob := self._storage.get_blob(digest): 

209 # Fix any blobs that should have been inlined. 

210 if digest.size_bytes <= self._max_inline_blob_size: 

211 self._save_digests_to_index([(digest, read_and_rewind(blob))], session) 

212 session.commit() 

213 return blob 

214 LOGGER.warning( 

215 "Blob was indexed but not in storage. Deleting from the index.", tags=dict(digest=digest) 

216 ) 

217 self._bulk_delete_from_index([digest], session) 

218 

219 # Check the storage for the blob and repair the index if found. 

220 if self._fallback_on_get: 

221 if blob := self._storage.get_blob(digest): 

222 with self._sql.scoped_session() as session: 

223 if digest.size_bytes <= self._max_inline_blob_size: 

224 self._save_digests_to_index([(digest, read_and_rewind(blob))], session) 

225 else: 

226 self._save_digests_to_index([(digest, None)], session) 

227 session.commit() 

228 return blob 

229 

230 # Blob was not found in index or storage 

231 return None 
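
A hedged sketch of how a caller might consume the handle returned by get_blob; `index` stands for an SQLIndex instance and the digest values are invented.

    digest = Digest(hash="ab" * 32, size_bytes=11)
    reader = index.get_blob(digest)
    if reader is not None:
        payload = reader.read()   # inline blobs arrive as io.BytesIO, larger ones from the backing storage
        reader.close()
    else:
        ...  # blob is in neither the index nor the backing storage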

232 

233 @timed(METRIC.STORAGE.DELETE_DURATION, type=TYPE) 

234 def delete_blob(self, digest: Digest) -> None: 

235 statement = delete(IndexEntry).where(IndexEntry.digest_hash == digest.hash) 

236 options = {"synchronize_session": False} 

237 

238 with self._sql.scoped_session() as session: 

239 session.execute(statement, execution_options=options) 

240 

241 self._storage.delete_blob(digest) 

242 

243 @timed(METRIC.STORAGE.WRITE_DURATION, type=TYPE) 

244 def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None: 

245 inline_blob = None 

246 if digest.size_bytes > self._max_inline_blob_size: 

247 self._storage.commit_write(digest, write_session) 

248 else: 

249 write_session.seek(0) 

250 inline_blob = write_session.read() 

251 try: 

252 with self._sql.scoped_session() as session: 

253 self._save_digests_to_index([(digest, inline_blob)], session) 

254 except DBAPIError as error: 

255 # Error has pgcode attribute (Postgres only) 

256 if hasattr(error.orig, "pgcode"): 

257 # imported here to avoid global dependency on psycopg2 

258 from psycopg2.errors import DiskFull, OutOfMemory 

259 

260 # 53100 == DiskFull && 53200 == OutOfMemory 

261 if isinstance(error.orig, (DiskFull, OutOfMemory)): 

262 raise StorageFullError(f"Postgres Error: {error.orig.pgcode}") from error 

263 raise error 
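
commit_write above converts Postgres disk/memory exhaustion into StorageFullError; a sketch of how a hypothetical caller could react (the blob, digest, and `index` instance are invented):

    blob = b"some blob contents"
    digest = Digest(hash="cd" * 32, size_bytes=len(blob))
    try:
        index.commit_write(digest, io.BytesIO(blob))
    except StorageFullError:
        # Surface the condition to the client (e.g. as RESOURCE_EXHAUSTED) instead of retrying blindly.
        raise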

264 

265 def _partitioned_hashes(self, digests: Sequence[Digest]) -> Iterator[Iterator[str]]: 

266 """Given a long list of digests, split it into parts no larger than 

267 _inclause_limit and yield the hashes in each part. 

268 """ 

269 for part_start in range(0, len(digests), self._inclause_limit): 

270 part_end = min(len(digests), part_start + self._inclause_limit) 

271 part_digests = itertools.islice(digests, part_start, part_end) 

272 yield map(lambda digest: digest.hash, part_digests) 
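
A standalone sketch of the same partitioning scheme on plain strings, assuming an IN-clause limit of 2; it mirrors _partitioned_hashes without touching the class.

    import itertools

    hashes = ["h1", "h2", "h3", "h4", "h5"]
    limit = 2
    parts = [
        list(itertools.islice(hashes, start, min(len(hashes), start + limit)))
        for start in range(0, len(hashes), limit)
    ]
    assert parts == [["h1", "h2"], ["h3", "h4"], ["h5"]]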

273 

274 def _bulk_select_digests( 

275 self, digests: Sequence[Digest], fetch_blobs: bool = False, fetch_deleted: bool = True 

276 ) -> Iterator[IndexEntry]: 

277 """Generator that selects all rows matching a digest list. 

278 

279 SQLAlchemy Core is used for this because the ORM has problems with 

280 large numbers of bind variables for WHERE IN clauses. 

281 

282 We only select on the digest hash (not hash and size) to allow for 

283 index-only queries on db backends that support them. 

284 """ 

285 index_table = IndexEntry.__table__ 

286 with self._sql.scoped_session() as session: 

287 columns = [index_table.c.digest_hash] 

288 if fetch_blobs: 

289 columns.append(index_table.c.inline_blob) 

290 for part in self._partitioned_hashes(digests): 

291 stmt = select(columns).where(index_table.c.digest_hash.in_(part)) 

292 if not fetch_deleted: 

293 stmt = stmt.where(not_(index_table.c.deleted)) 

294 entries = session.execute(stmt) 

295 yield from entries # type: ignore 

296 

297 @timed(METRIC.STORAGE.SQL_INDEX.UPDATE_TIMESTAMP_DURATION) 

298 def _bulk_refresh_timestamps( 

299 self, digests: Sequence[Digest], session: SessionType, update_time: Optional[datetime] = None 

300 ) -> None: 

301 """Refresh all timestamps of the input digests. 

302 

303 SQLAlchemy Core is used for this because the ORM is not suitable for 

304 bulk inserts and updates. 

305 

306 https://docs.sqlalchemy.org/en/13/faq/performance.html#i-m-inserting-400-000-rows-with-the-orm-and-it-s-really-slow 

307 """ 

308 index_table = IndexEntry.__table__ 

309 current_time = datetime.utcnow() 

310 

311 # If a timestamp was passed in, use it and always refresh (no threshold). 

312 if update_time: 

313 timestamp = update_time 

314 last_accessed_threshold = current_time 

315 # Otherwise, only refresh a digest's timestamp if it was last accessed before this threshold. 

316 else: 

317 timestamp = current_time 

318 last_accessed_threshold = current_time - timedelta(seconds=self.refresh_accesstime_older_than) 

319 

320 for part in self._partitioned_hashes(digests): 

321 # Generate the SQL Statement: 

322 # UPDATE index SET accessed_timestamp=<timestamp> 

323 # WHERE index.digest_hash IN 

324 # (SELECT index.digest_hash FROM index 

325 # WHERE index.digest_hash IN <part> AND index.accessed_timestamp < <last_accessed_threshold> 

326 # FOR UPDATE SKIP LOCKED) 

327 stmt = ( 

328 index_table.update() 

329 .where( 

330 index_table.c.digest_hash.in_( 

331 select(index_table.c.digest_hash) 

332 .where(index_table.c.digest_hash.in_(part)) 

333 .where(index_table.c.accessed_timestamp < last_accessed_threshold) 

334 .with_for_update(skip_locked=True) 

335 ) 

336 ) 

337 .values(accessed_timestamp=timestamp) 

338 ) 

339 session.execute(stmt) 

340 session.commit() 

341 

342 @timed(METRIC.STORAGE.BULK_STAT_DURATION, type=TYPE) 

343 def missing_blobs(self, digests: List[Digest]) -> List[Digest]: 

344 # Blobs marked as deleted are considered missing 

345 entries = self._bulk_select_digests(digests, fetch_deleted=False) 

346 

347 found_hashes = {entry.digest_hash for entry in entries} 

348 

349 # Split the digests into two found/missing lists 

350 found_digests, missing_digests = [], [] 

351 for digest in digests: 

352 if digest.hash in found_hashes: 

353 found_digests.append(digest) 

354 else: 

355 missing_digests.append(digest) 

356 

357 # Update all timestamps for blobs which were found 

358 with self._sql.scoped_session() as session: 

359 self._bulk_refresh_timestamps(found_digests, session) 

360 

361 return missing_digests 
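
A hedged usage sketch of the method above: only the digests absent from (or marked deleted in) the index come back, and access timestamps are refreshed for the rest. The digests are invented and `index` is assumed to be an SQLIndex instance.

    candidates = [
        Digest(hash="aa" * 32, size_bytes=10),
        Digest(hash="bb" * 32, size_bytes=20),
    ]
    to_upload = index.missing_blobs(candidates)  # subset of `candidates` that still needs uploading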

362 

363 @timed(METRIC.STORAGE.SQL_INDEX.SAVE_DIGESTS_DURATION) 

364 def _save_digests_to_index( 

365 self, digest_blob_pairs: List[Tuple[Digest, Optional[bytes]]], session: SessionType 

366 ) -> None: 

367 """Helper to persist a list of digest/blob pairs to the index. 

368 

369 Any digests present are updated, and new digests are inserted along with their inline blobs (if provided). 

370 Only blobs with size less than or equal to the max_inline_blob_size are inserted directly into the index. 

371 """ 

372 if not digest_blob_pairs: 

373 return 

374 

375 digest_blob_pairs = sorted(digest_blob_pairs, key=lambda pair: (pair[0].hash, pair[0].size_bytes)) 

376 

377 if self._dialect_delegate: 

378 try: 

379 self._dialect_delegate._save_digests_to_index( # type: ignore 

380 digest_blob_pairs, session, self._max_inline_blob_size 

381 ) 

382 return 

383 except AttributeError: 

384 pass 

385 

386 update_time = datetime.utcnow() 

387 # Figure out which digests we can just update 

388 digests = [digest for (digest, blob) in digest_blob_pairs] 

389 entries = self._bulk_select_digests(digests) 

390 # Map digests to new entries 

391 entries_not_present = { 

392 digest.hash: { 

393 "digest_hash": digest.hash, 

394 "digest_size_bytes": digest.size_bytes, 

395 "accessed_timestamp": update_time, 

396 "inline_blob": (blob if digest.size_bytes <= self._max_inline_blob_size else None), 

397 "deleted": False, 

398 } 

399 for (digest, blob) in digest_blob_pairs 

400 } 

401 

402 entries_present = {} 

403 for entry in entries: 

404 entries_present[entry.digest_hash] = entries_not_present[entry.digest_hash] 

405 del entries_not_present[entry.digest_hash] 

406 

407 if entries_not_present: 

408 session.bulk_insert_mappings(IndexEntry, entries_not_present.values()) # type: ignore 

409 if entries_present: 

410 session.bulk_update_mappings(IndexEntry, entries_present.values()) # type: ignore 

411 

412 @timed(METRIC.STORAGE.BULK_WRITE_DURATION, type=TYPE) 

413 def bulk_update_blobs( # pylint: disable=arguments-renamed 

414 self, digest_blob_pairs: List[Tuple[Digest, bytes]] 

415 ) -> List[Status]: 

416 """Implement the StorageABC's bulk_update_blobs method. 

417 

418 The StorageABC interface takes in a list of digest/blob pairs and 

419 returns a list of results. The list of results MUST be ordered to 

420 correspond with the order of the input list.""" 

421 pairs_to_store = [] 

422 result_map = {} 

423 

424 # For each blob, determine whether to store it in the backing storage or inline it 

425 for digest, blob in digest_blob_pairs: 

426 if validate_digest_data(digest, blob): 

427 if digest.size_bytes > self._max_inline_blob_size: 

428 pairs_to_store.append((digest, blob)) 

429 else: 

430 result_map[digest.hash] = Status(code=code_pb2.OK) 

431 else: 

432 result_map[digest.hash] = Status(code=code_pb2.INVALID_ARGUMENT, message="Data doesn't match hash") 

433 missing_blobs = self.missing_blobs([digest for digest, _ in pairs_to_store]) 

434 missing_blob_pairs = [] 

435 for digest, blob in pairs_to_store: 

436 if digest not in missing_blobs: 

437 result_map[digest.hash] = Status(code=code_pb2.OK) 

438 else: 

439 missing_blob_pairs.append((digest, blob)) 

440 

441 backup_results = self._storage.bulk_update_blobs(missing_blob_pairs) 

442 

443 for digest, result in zip(missing_blobs, backup_results): 

444 if digest.hash in result_map: 

445 # ERROR: blob was both inlined and backed up 

446 raise RuntimeError("Blob was both inlined and backed up.") 

447 result_map[digest.hash] = result 

448 

449 # Generate the final list of results 

450 pairs_to_inline: List[Tuple[Digest, Optional[bytes]]] = [] 

451 results = [] 

452 for digest, blob in digest_blob_pairs: 

453 status = result_map.get( 

454 digest.hash, 

455 Status(code=code_pb2.UNKNOWN, message="SQL Index: unable to determine the status of this blob"), 

456 ) 

457 results.append(status) 

458 if status.code == code_pb2.OK: 

459 pairs_to_inline.append((digest, blob)) 

460 

461 with self._sql.scoped_session() as session: 

462 self._save_digests_to_index(pairs_to_inline, session) 

463 

464 return results 
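
A sketch of the ordering contract described in the docstring above: statuses line up index-for-index with the input pairs. The digests, blobs, and `index` instance are invented.

    # Invented digests; real callers compute hash/size_bytes from the blob contents.
    pairs = [
        (Digest(hash="aa" * 32, size_bytes=3), b"foo"),
        (Digest(hash="bb" * 32, size_bytes=3), b"bar"),
    ]
    statuses = index.bulk_update_blobs(pairs)
    for (digest, _), status in zip(pairs, statuses):
        if status.code != code_pb2.OK:
            LOGGER.warning("Blob rejected.", tags=dict(digest=digest, reason=status.message))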

465 

466 def _bulk_read_blobs_with_fallback(self, digests: List[Digest]) -> Dict[str, bytes]: 

467 hash_to_digest: Dict[str, Digest] = {digest.hash: digest for digest in digests} 

468 results: Dict[str, bytes] = {} 

469 

470 expected_storage_digests: List[Digest] = [] 

471 # Fetch inlined blobs directly from the index 

472 entries = self._bulk_select_digests(digests, fetch_blobs=True) 

473 for e in entries: 

474 blob, digest_hash, digest = e.inline_blob, e.digest_hash, hash_to_digest[e.digest_hash] 

475 if blob is not None: 

476 results[digest_hash] = blob 

477 hash_to_digest.pop(digest_hash) 

478 else: 

479 # If a blob is not inlined then the blob is expected to be in storage 

480 expected_storage_digests.append(digest) 

481 

482 # Fetch everything that wasn't inlined from the backing storage 

483 fetched_digests = self._storage.bulk_read_blobs(list(hash_to_digest.values())) 

484 

485 # Save everything fetched from storage, inlining the blobs if they're small enough 

486 digest_pairs_to_save: List[Tuple[Digest, Optional[bytes]]] = [] 

487 for digest_hash, blob_data in fetched_digests.items(): 

488 if blob_data is not None: 

489 digest = hash_to_digest[digest_hash] 

490 if digest.size_bytes <= self._max_inline_blob_size: 

491 digest_pairs_to_save.append((digest, blob_data)) 

492 else: 

493 digest_pairs_to_save.append((digest, None)) 

494 results[digest_hash] = blob_data 

495 

496 # List of digests found in storage 

497 actual_storage_digest_hashes = set( 

498 digest_hash for (digest_hash, blob_data) in fetched_digests.items() if blob_data is not None 

499 ) 

500 # Get a list of all the digests that were in the index but not found in storage 

501 digests_expected_not_in_storage: List[Digest] = [] 

502 for expected_digest in expected_storage_digests: 

503 if expected_digest.hash not in actual_storage_digest_hashes: 

504 LOGGER.warning( 

505 "Blob was indexed but not in storage. Deleting from the index.", tags=dict(digest=digest) 

506 ) 

507 digests_expected_not_in_storage.append(expected_digest) 

508 

509 with self._sql.scoped_session() as session: 

510 self._save_digests_to_index(digest_pairs_to_save, session) 

511 if digests_expected_not_in_storage: 

512 self._bulk_delete_from_index(digests_expected_not_in_storage, session) 

513 session.commit() 

514 

515 return results 

516 

517 @timed(METRIC.STORAGE.BULK_READ_DURATION, type=TYPE) 

518 def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]: 

519 if self._fallback_on_get: 

520 return self._bulk_read_blobs_with_fallback(digests) 

521 

522 # If fallback is disabled, query the index first and only 

523 # query the storage for blobs that weren't inlined there 

524 

525 hash_to_digest = {digest.hash: digest for digest in digests} # hash -> digest map 

526 results: Dict[str, bytes] = {} # The final list of results (return value) 

527 digests_to_fetch: List[Digest] = [] # Digests that need to be fetched from storage 

528 digest_pairs_to_save: List[Tuple[Digest, Optional[bytes]]] = [] # Digests that need to be updated in the index 

529 

530 # Fetch all of the digests in the database 

531 # Anything that wasn't already inlined needs to be fetched 

532 entries = self._bulk_select_digests(digests, fetch_blobs=True) 

533 for index_entry in entries: 

534 digest = hash_to_digest[index_entry.digest_hash] 

535 if index_entry.inline_blob is not None: 

536 results[index_entry.digest_hash] = index_entry.inline_blob 

537 else: 

538 digests_to_fetch.append(digest) 

539 

540 # Caution: digests whose blobs cannot be found in storage will be dropped from the results. 

541 if digests_to_fetch: 

542 fetched_digests = self._storage.bulk_read_blobs(digests_to_fetch) 

543 else: 

544 fetched_digests = {} 

545 

546 # Generate the list of inputs for _save_digests_to_index 

547 # 

548 # We only need to send blob data for small blobs fetched 

549 # from the storage since everything else is either too 

550 # big or already inlined 

551 for digest in digests_to_fetch: 

552 if blob_data := fetched_digests.get(digest.hash): 

553 if digest.size_bytes <= self._max_inline_blob_size: 

554 digest_pairs_to_save.append((digest, blob_data)) 

555 

556 actual_storage_digests = set(digest_hash for (digest_hash, _) in fetched_digests.items()) 

557 # Get a list of all the digests that were in the index but not found in storage 

558 digests_expected_not_in_storage: List[Digest] = [] 

559 for expected_digest in digests_to_fetch: 

560 if expected_digest.hash not in actual_storage_digests: 

561 LOGGER.warning( 

562 "Blob was indexed but not in storage. Deleting from the index.", tags=dict(digest=digest) 

563 ) 

564 digests_expected_not_in_storage.append(expected_digest) 

565 

566 # Update any blobs which need to be inlined 

567 with self._sql.scoped_session() as session: 

568 self._save_digests_to_index(digest_pairs_to_save, session) 

569 if digests_expected_not_in_storage: 

570 self._bulk_delete_from_index(digests_expected_not_in_storage, session) 

571 session.commit() 

572 

573 results.update(fetched_digests) 

574 return results 
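
As the caution inside the method above notes, anything that cannot be found is simply absent from the returned mapping; a small hypothetical example of handling that (`index` and `candidates` are invented):

    blob_map = index.bulk_read_blobs(candidates)
    for digest in candidates:
        data = blob_map.get(digest.hash)
        if data is None:
            ...  # blob was neither inlined in the index nor present in the backing storage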

575 

576 def _column_windows(self, session: SessionType, column: Column[Any]) -> Iterator[BooleanClauseList[Any]]: 

577 """Adapted from the sqlalchemy WindowedRangeQuery recipe. 

578 https://github.com/sqlalchemy/sqlalchemy/wiki/WindowedRangeQuery 

579 

580 This method breaks the timestamp range into windows and yields 

581 the borders of these windows to the callee. For example, the borders 

582 yielded by this might look something like 

583 ('2019-10-08 18:25:03.699863', '2019-10-08 18:25:03.751018') 

584 ('2019-10-08 18:25:03.751018', '2019-10-08 18:25:03.807867') 

585 ('2019-10-08 18:25:03.807867', '2019-10-08 18:25:03.862192') 

586 ('2019-10-08 18:25:03.862192',) 

587 

588 _windowed_lru_digests uses these borders to form WHERE clauses for its 

589 SELECTs. In doing so, we make sure to repeatedly query the database for 

590 live updates, striking a balance between loading the entire resultset 

591 into memory and querying each row individually, both of which are 

592 inefficient in the context of a large index. 

593 

594 The window size is a parameter and can be configured. A larger window 

595 size will yield better performance (fewer SQL queries) at the cost of 

596 memory (holding on to the results of the query) and accuracy (blobs 

597 may get updated while you're working on them), and vice versa for a 

598 smaller window size. 

599 """ 

600 

601 def int_for_range(start_id: Any, end_id: Any) -> BooleanClauseList[Any]: 

602 if end_id: 

603 return and_(column >= start_id, column < end_id) 

604 else: 

605 return column >= start_id # type: ignore[no-any-return] 

606 

607 # Constructs a query that: 

608 # 1. Get all the timestamps in sorted order. 

609 # 2. Assign a row number to each entry. 

610 # 3. Keep only every Nth timestamp, where N = "_all_blobs_window_size". 

611 # SELECT 

612 # anon_1.index_accessed_timestamp AS anon_1_index_accessed_timestamp 

613 # FROM ( 

614 # SELECT 

615 # index.accessed_timestamp AS index_accessed_timestamp, 

616 # row_number() OVER (ORDER BY index.accessed_timestamp) AS rownum 

617 # FROM index 

618 # ) 

619 # AS anon_1 

620 # WHERE rownum % 1000=1 

621 # 

622 # Note: 

623 # - This query can be slow due to checking each entry with "WHERE rownum % 1000=1". 

624 # - These timestamps will be the basis for constructing the SQL "WHERE" clauses later. 

625 q = session.query(column, func.row_number().over(order_by=column).label("rownum")).from_self(column) 

626 if self._all_blobs_window_size > 1: 

627 q = q.filter(text(f"rownum % {self._all_blobs_window_size}=1")) 

628 

629 # Execute the underlying query against the database. 

630 # Ex: intervals = [t1, t1001, t2001, ...], q = [(t1, ), (t1001, ), (t2001, ), ...] 

631 intervals = [id for id, in q] 

632 

633 # Generate the whereclauses 

634 while intervals: 

635 start = intervals.pop(0) 

636 if intervals: 

637 end = intervals[0] 

638 else: 

639 end = None 

640 # Ex: yield "WHERE IndexEntry.accessed_timestamp >= start AND IndexEntry.accessed_timestamp < end" 

641 yield int_for_range(start, end) 
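
A simplified, self-contained sketch of the same windowing idea over an in-memory list of timestamps, assuming a window size of 3; it mirrors the border generation above without a database.

    timestamps = list(range(1, 11))      # stand-ins for sorted accessed_timestamp values
    window_size = 3
    borders = timestamps[::window_size]  # equivalent to keeping rows where rownum % 3 == 1
    windows = []
    while borders:
        start = borders.pop(0)
        end = borders[0] if borders else None
        windows.append((start, end))     # (start, None) means "everything from start onwards"
    assert windows == [(1, 4), (4, 7), (7, 10), (10, None)]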

642 

643 def _windowed_lru_digests(self, q: "Query[Any]", column: Column[Any]) -> Iterator[Tuple[IndexEntry, bool]]: 

644 """Generate a query for each window produced by _column_windows 

645 and yield the results one by one. 

646 """ 

647 # Determine whether the conditions are met to make an SQL call to get new windows. 

648 msg = "Using stored LRU windows" 

649 if len(self._queue_of_whereclauses) == 0: 

650 msg = "Requesting new LRU windows." 

651 self._queue_of_whereclauses = deque(self._column_windows(q.session, column)) # type: ignore 

652 self._delete_premarked_blobs = True 

653 

654 msg += f" Number of windows remaining: {len(self._queue_of_whereclauses)}" 

655 LOGGER.debug(msg) 

656 

657 while self._queue_of_whereclauses: 

658 whereclause = self._queue_of_whereclauses[0] 

659 window = q.filter(whereclause).order_by(column.asc()) 

660 yield from window 

661 

662 # If the `yield from window` above never reaches this point, the cleanup 

663 # exited before consuming all of the rows covered by this whereclause. 

664 # Otherwise, the whereclause is exhausted and can be discarded. 

665 self._queue_of_whereclauses.popleft() 

666 

667 def least_recent_digests(self) -> Iterator[Digest]: 

668 with self._sql.scoped_session() as session: 

669 q = session.query(IndexEntry) 

670 for entry in self._windowed_lru_digests(q, IndexEntry.accessed_timestamp): 

671 # TODO: make this generic, or delete this method since it is only used by tests. 

672 index_entry = cast(IndexEntry, entry) 

673 assert isinstance(index_entry.digest_hash, str) 

674 assert isinstance(index_entry.digest_size_bytes, int) 

675 yield Digest(hash=index_entry.digest_hash, size_bytes=index_entry.digest_size_bytes) 

676 

677 @timed(METRIC.STORAGE.SQL_INDEX.SIZE_CALCULATION_DURATION) 

678 def get_total_size(self) -> int: 

679 statement = select(func.sum(IndexEntry.digest_size_bytes)) 

680 with self._sql.scoped_session() as session: 

681 result = session.execute(statement).scalar() 

682 if result is None: 

683 result = 0 

684 return result 

685 

686 @timed(METRIC.STORAGE.SQL_INDEX.DELETE_N_BYTES_DURATION) 

687 def delete_n_bytes( 

688 self, n_bytes: int, dry_run: bool = False, protect_blobs_after: Optional[datetime] = None 

689 ) -> int: 

690 """ 

691 When using a SQL Index, entries with a delete marker are "in the process of being deleted". 

692 This is required because storage operations can't be safely tied to the SQL index transaction 

693 (one may fail independently of the other, and you end up inconsistent). 

694 

695 The workflow is roughly as follows: 

696 - Start a SQL transaction. 

697 - Lock and mark the indexed items you want to delete. 

698 - Close the SQL transaction. 

699 - Perform the storage deletes. 

700 - Start a SQL transaction. 

701 - Actually delete the index entries. 

702 - Close the SQL transaction. 

703 

704 This means anything with deleted=False will always be present in the backing store. If it is marked 

705 deleted=True, and the process gets killed when deleting from the backing storage, only 

706 some of the items might actually be gone. 

707 

708 The next time the cleaner starts up, it can try to do that delete again (ignoring 404s). 

709 Eventually that will succeed and the item will actually be removed from the DB. Only during 

710 the first run of batches do we consider already-marked items. This prevents multiple cleanup 

711 daemons from competing with each other on every batch. 

712 """ 

713 if protect_blobs_after is None: 

714 protect_blobs_after = datetime.utcnow() 

715 

716 # Used for metric publishing 

717 delete_start_time = time.time() 

718 

719 storage_digests: List[Digest] = [] 

720 marked_digests: List[Digest] = [] 

721 collected_bytes = 0 

722 

723 with self._sql.scoped_session(exceptions_to_not_rollback_on=[StaleDataError]) as session: 

724 base_query = session.query(IndexEntry, IndexEntry.inline_blob != None).options( # noqa 

725 load_only("digest_hash", "digest_size_bytes") 

726 ) 

727 

728 if self._delete_premarked_blobs: 

729 LOGGER.info("Starting to gather pre-marked records.") 

730 premarked_query = base_query.filter_by(deleted=True) 

731 for [entry, is_inline] in premarked_query.all(): 

732 digest = Digest(hash=entry.digest_hash, size_bytes=entry.digest_size_bytes) 

733 marked_digests.append(digest) 

734 if not is_inline: 

735 storage_digests.append(digest) 

736 collected_bytes += entry.digest_size_bytes 

737 

738 if not dry_run: 

739 publish_counter_metric(METRIC.STORAGE.SQL_INDEX.PREMARKED_DELETED_COUNT, len(marked_digests)) 

740 LOGGER.info( 

741 "Gathered pre-marked bytes.", tags=dict(collected_bytes=collected_bytes, max_bytes=n_bytes) 

742 ) 

743 self._delete_premarked_blobs = False 

744 

745 if collected_bytes < n_bytes: 

746 LOGGER.info("Searching for records to mark deleted.") 

747 unmarked_query = ( 

748 base_query.filter_by(deleted=False) 

749 .filter(IndexEntry.accessed_timestamp < protect_blobs_after) 

750 .with_for_update(skip_locked=True) 

751 ) 

752 window = self._windowed_lru_digests(unmarked_query, IndexEntry.accessed_timestamp) 

753 mark_deleted_start = time.perf_counter() 

754 for [entry, is_inline] in window: 

755 digest = Digest(hash=entry.digest_hash, size_bytes=entry.digest_size_bytes) 

756 marked_digests.append(digest) 

757 if not is_inline: 

758 storage_digests.append(digest) 

759 collected_bytes += entry.digest_size_bytes 

760 if not dry_run: 

761 entry.deleted = True 

762 if collected_bytes >= n_bytes: 

763 break 

764 mark_deleted_duration = timedelta(seconds=time.perf_counter() - mark_deleted_start) 

765 if not dry_run: 

766 publish_timer_metric(METRIC.STORAGE.SQL_INDEX.MARK_DELETED_DURATION, mark_deleted_duration) 

767 LOGGER.info("Gathered bytes.", tags=dict(collected_bytes=collected_bytes, max_bytes=n_bytes)) 

768 

769 if dry_run: 

770 return collected_bytes 

771 

772 failed_deletes = self._storage.bulk_delete(storage_digests) 

773 digests_to_delete = [x for x in marked_digests if f"{x.hash}/{x.size_bytes}" not in failed_deletes] 

774 

775 with self._sql.scoped_session(exceptions_to_not_rollback_on=[StaleDataError]) as session: 

776 failed_deletes.extend(self._bulk_delete_from_index(digests_to_delete, session)) 

777 for digest in digests_to_delete: 

778 if digest in failed_deletes: 

779 collected_bytes -= digest.size_bytes 

780 

781 batch_duration = time.time() - delete_start_time 

782 blobs_deleted_per_second = (len(digests_to_delete) - len(failed_deletes)) / batch_duration 

783 publish_gauge_metric(METRIC.CLEANUP.BLOBS_DELETED_PER_SECOND, blobs_deleted_per_second) 

784 return collected_bytes 
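
A condensed, hypothetical sketch of the mark / storage-delete / index-delete workflow described in the docstring above; the helper names are invented, and the real method additionally handles premarked rows, locking, dry runs, and metrics.

    def two_phase_delete_sketch(session_factory, storage, entries):
        # Phase 1: mark the chosen index entries inside one SQL transaction.
        with session_factory() as session:
            for entry in entries:
                entry.deleted = True
            session.commit()

        # Phase 2: delete the blobs from the backing storage (retries tolerate 404s).
        digests = [Digest(hash=e.digest_hash, size_bytes=e.digest_size_bytes) for e in entries]
        failed = set(storage.bulk_delete(digests))

        # Phase 3: drop the index rows whose storage deletes succeeded.
        with session_factory() as session:
            survivors = [d for d in digests if f"{d.hash}/{d.size_bytes}" not in failed]
            ...  # e.g. delete via _bulk_delete_from_index(survivors, session)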

785 

786 @timed(METRIC.STORAGE.BULK_DELETE_DURATION, type=TYPE) 

787 def bulk_delete(self, digests: List[Digest]) -> List[str]: 

788 # Delete from the index and then delete from the backing storage. 

789 with self._sql.scoped_session(exceptions_to_not_rollback_on=[StaleDataError]) as session: 

790 failed_deletes = self._bulk_delete_from_index(digests, session) 

791 

792 digests_to_delete = [x for x in digests if f"{x.hash}/{x.size_bytes}" not in failed_deletes] 

793 failed_deletes.extend(self._storage.bulk_delete(digests_to_delete)) 

794 return failed_deletes 

795 

796 @timed(METRIC.STORAGE.SQL_INDEX.BULK_DELETE_INDEX_DURATION) 

797 def _bulk_delete_from_index(self, digests: List[Digest], session: Session) -> List[str]: 

798 LOGGER.info("Deleting digests from the index.", tags=dict(digest_count=len(digests))) 

799 index_table = IndexEntry.__table__ 

800 hashes = [x.hash for x in digests] 

801 

802 # Make sure we don't exceed maximum size of an IN clause 

803 n = self._inclause_limit 

804 hash_chunks = [hashes[i : i + n] for i in range(0, len(hashes), n)] 

805 

806 # We will not raise, rollback, or log on StaleDataErrors. 

807 # These errors occur when we delete fewer rows than we were expecting. 

808 # This is fine, since the missing rows will get deleted eventually. 

809 # When running bulk_deletes concurrently, StaleDataErrors 

810 # occur too often to log. 

811 num_blobs_deleted = 0 

812 for chunk in hash_chunks: 

813 # Do not wait for locks when deleting rows. Skip locked rows to 

814 # avoid deadlocks. 

815 stmt = index_table.delete().where( 

816 index_table.c.digest_hash.in_( 

817 select([index_table.c.digest_hash], index_table.c.digest_hash.in_(chunk)).with_for_update( 

818 skip_locked=True 

819 ) 

820 ) 

821 ) 

822 num_blobs_deleted += session.execute(stmt).rowcount # type: ignore 

823 LOGGER.info("Blobs deleted from the index.", tags=dict(deleted_count=num_blobs_deleted, digest_count=digests)) 

824 

825 # bulk_delete is typically expected to return the digests that were not deleted, 

826 # but delete only returns the number of rows deleted and not what was/wasn't 

827 # deleted. Getting this info would require extra queries, so assume that 

828 # everything was either deleted or already deleted. Failures will continue to raise exceptions. 

829 return []
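
For reference, a rough illustration of the statement built for each chunk above; the exact SQL is dialect-dependent (SQLite, for instance, ignores FOR UPDATE), and the placeholder names are illustrative only.

    # Roughly the SQL emitted per chunk (dialect-dependent; placeholders are illustrative):
    CHUNKED_DELETE_SQL = """
    DELETE FROM index
    WHERE index.digest_hash IN (
        SELECT index.digest_hash FROM index
        WHERE index.digest_hash IN (:hash_1, :hash_2)
        FOR UPDATE SKIP LOCKED
    )
    """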