Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/index/sql.py: 92.68%


# Copyright (C) 2019 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#  <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
SQLIndex
==================

A SQL index implementation. This can be pointed to either a remote SQL server
or a local SQLite database.

"""

from contextlib import contextmanager
from datetime import datetime, timedelta
import logging
import os
import itertools
from typing import Any, BinaryIO, Dict, Iterator, List, Optional, Sequence, Tuple, Union, Type

from alembic import command
from alembic.config import Config
from sqlalchemy import and_, create_engine, event, func, text, Column
from sqlalchemy.sql import select
from sqlalchemy.sql.elements import BooleanClauseList
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm.query import Query
from sqlalchemy.orm.session import sessionmaker, Session as SessionType
from sqlalchemy.orm.exc import StaleDataError
from sqlalchemy.exc import SQLAlchemyError, DBAPIError

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid._protos.google.rpc import code_pb2
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid.server.sql import sqlutils
from buildgrid.server.monitoring import get_monitoring_bus
from buildgrid.server.metrics_names import (
    CAS_INDEX_BLOB_TIMESTAMP_UPDATE_TIME_METRIC_NAME,
    CAS_INDEX_BULK_SELECT_DIGEST_TIME_METRIC_NAME,
    CAS_INDEX_BULK_TIMESTAMP_UPDATE_TIME_METRIC_NAME,
    CAS_INDEX_GET_BLOB_TIME_METRIC_NAME,
    CAS_INDEX_SAVE_DIGESTS_TIME_METRIC_NAME,
    CAS_INDEX_SIZE_CALCULATION_TIME_METRIC_NAME,
    CLEANUP_INDEX_PREMARKED_BLOBS_METRIC_NAME,
)
from buildgrid.server.metrics_utils import (
    DurationMetric,
    generator_method_duration_metric,
    publish_counter_metric
)
from buildgrid.server.persistence.sql.impl import sqlite_on_connect
from buildgrid.server.persistence.sql.models import IndexEntry
from buildgrid.settings import MIN_TIME_BETWEEN_SQL_POOL_DISPOSE_MINUTES, COOLDOWN_TIME_AFTER_POOL_DISPOSE_SECONDS
from buildgrid._exceptions import DatabaseError, StorageFullError, RetriableDatabaseError
from ..storage_abc import StorageABC
from .index_abc import IndexABC
from .sql_dialect_delegates import PostgreSQLDelegate


# Each dialect has a limit on the number of bind parameters allowed. This
# matters because it determines how large we can allow our IN clauses to get.
#
# SQLite: 1000 https://www.sqlite.org/limits.html#max_variable_number
# PostgreSQL: 32767 (Int16.MAX_VALUE) https://www.postgresql.org/docs/9.4/protocol-message-formats.html
#
# We'll refer to this as the "inlimit" in the code. The inlimits are
# set to 75% of the bind parameter limit of the implementation.
DIALECT_INLIMIT_MAP = {
    "sqlite": 750,
    "postgresql": 24000
}
DEFAULT_INLIMIT = 100

DIALECT_DELEGATES = {
    "postgresql": PostgreSQLDelegate
}

class SQLIndex(IndexABC):

    def _get_default_inlimit(self, dialect) -> int:
        """ Map a dialect to its default inlimit. """
        if dialect not in DIALECT_INLIMIT_MAP:
            self.__logger.warning(
                f"The SQL dialect [{dialect}] is unsupported, and "
                f"errors may occur. Supported dialects are {list(DIALECT_INLIMIT_MAP.keys())}. "
                f"Using default inclause limit of {DEFAULT_INLIMIT}.")
            return DEFAULT_INLIMIT

        return DIALECT_INLIMIT_MAP[dialect]

    def __init__(self, storage: StorageABC, connection_string: str,
                 automigrate: bool=False, window_size: int=1000,
                 inclause_limit: int=-1, connection_timeout: int=5,
                 **kwargs):
        base_argnames = ['fallback_on_get']
        base_args = {}
        for arg in base_argnames:
            if arg in kwargs:
                base_args[arg] = kwargs.pop(arg)
        super().__init__(**base_args)
        self.__logger = logging.getLogger(__name__)

        self._storage = storage
        self._instance_name = None

        # Max # of rows to fetch while iterating over all blobs
        # (e.g. in least_recent_digests)
        self._all_blobs_window_size = window_size

        # Only pass known kwargs to db session
        available_options = {'pool_size', 'max_overflow', 'pool_timeout',
                             'pool_pre_ping', 'pool_recycle'}
        kwargs_keys = set(kwargs.keys())
        if not kwargs_keys.issubset(available_options):
            unknown_args = kwargs_keys - available_options
            raise TypeError(f"Unknown keyword arguments: [{unknown_args}]")

        self._create_sqlalchemy_engine(
            connection_string, automigrate, connection_timeout, **kwargs)

        self._sql_pool_dispose_helper = sqlutils.SQLPoolDisposeHelper(COOLDOWN_TIME_AFTER_POOL_DISPOSE_SECONDS,
                                                                      MIN_TIME_BETWEEN_SQL_POOL_DISPOSE_MINUTES,
                                                                      self._engine)

        # Dialect-specific initialization
        dialect = self._engine.dialect.name
        self._dialect_delegate = DIALECT_DELEGATES.get(dialect)

        if inclause_limit > 0:
            if inclause_limit > window_size:
                self.__logger.warning(
                    f"Configured inclause_limit [{inclause_limit}] "
                    f"is greater than window_size [{window_size}]")
            self._inclause_limit = inclause_limit
        else:
            # If the inlimit isn't explicitly set, we use a default that
            # respects both the window size and the db implementation's
            # inlimit.
            self._inclause_limit = min(
                window_size,
                self._get_default_inlimit(dialect))
            self.__logger.debug("SQL index: using default inclause limit "
                                f"of {self._inclause_limit}")

        session_factory = sessionmaker()
        self.Session = scoped_session(session_factory)
        self.Session.configure(bind=self._engine)

        # Make a test query against the database to ensure the connection is valid
        with self.session() as session:
            session.query(IndexEntry).first()

    def _create_or_migrate_db(self, connection_string: str) -> None:
        self.__logger.warning(
            "Will attempt migration to latest version if needed.")

        config = Config()

        config.set_main_option("script_location",
                               os.path.join(
                                   os.path.dirname(__file__),
                                   "../../../persistence/sql/alembic"))

        with self._engine.begin() as connection:
            config.attributes['connection'] = connection
            command.upgrade(config, "head")

    def _create_sqlalchemy_engine(self, connection_string, automigrate, connection_timeout, **kwargs):
        self.automigrate = automigrate

        # Disallow sqlite in-memory because multi-threaded access to it is
        # complex and potentially problematic at best
        # ref: https://docs.sqlalchemy.org/en/13/dialects/sqlite.html#threading-pooling-behavior
        if sqlutils.is_sqlite_inmemory_connection_string(connection_string):
            raise ValueError("Cannot use SQLite in-memory with BuildGrid (connection_string="
                             f"[{connection_string}]). Use a file or leave the connection_string "
                             "empty for a tempfile.")

        if connection_timeout is not None:
            if "connect_args" not in kwargs:
                kwargs["connect_args"] = {}
            if sqlutils.is_sqlite_connection_string(connection_string):
                kwargs["connect_args"]["timeout"] = connection_timeout
            elif sqlutils.is_psycopg2_connection_string(connection_string):
                kwargs["connect_args"]["connect_timeout"] = connection_timeout
                # Additional PostgreSQL-specific timeout, passed as a libpq option.
                # Note that lock_timeout is in milliseconds (hence * 1000).
                kwargs["connect_args"]["options"] = f'-c lock_timeout={connection_timeout * 1000}'

        # Only pass the (known) kwargs that have been explicitly set by the user
        available_options = set([
            'pool_size', 'max_overflow', 'pool_timeout', 'pool_pre_ping',
            'pool_recycle', 'connect_args'
        ])
        kwargs_keys = set(kwargs.keys())
        if not kwargs_keys.issubset(available_options):
            unknown_options = kwargs_keys - available_options
            raise TypeError(f"Unknown keyword arguments: [{unknown_options}]")

        self.__logger.debug(f"SQLAlchemy additional kwargs: [{kwargs}]")

        self._engine = create_engine(connection_string, echo=False, **kwargs)

        self.__logger.info(f"Using SQL backend for index at connection [{repr(self._engine.url)}] "
                           f"using additional SQL options {kwargs}")

        if self._engine.dialect.name == "sqlite":
            event.listen(self._engine, "connect", sqlite_on_connect)

        if self.automigrate:
            self._create_or_migrate_db(connection_string)

    @contextmanager
    def session(self, *, sqlite_lock_immediately: bool=False,
                exceptions_to_not_raise_on: Optional[List[Type[SQLAlchemyError]]]=None,
                exceptions_to_not_rollback_on: Optional[List[Type[SQLAlchemyError]]]=None) -> Any:
        """ Context manager for convenient use of sessions. Automatically
        commits when the context ends and rolls back failed transactions.

        Setting sqlite_lock_immediately will only yield a session once the
        SQLite database has been locked for exclusive use.

        exceptions_to_not_raise_on is a list of exceptions which the session
        will not re-raise. However, the session will still roll back on these
        errors.

        exceptions_to_not_rollback_on is a list of exceptions which the session
        manager will neither re-raise nor roll back on. Be very careful when
        specifying this option. It should only be used in cases where you know
        the exception will not put the database in a bad state.
        """
        # If we recently disposed of the SQL pool due to connection issues
        # allow for some cooldown period before we attempt more SQL
        self._sql_pool_dispose_helper.wait_if_cooldown_in_effect()

        # Try to obtain a session
        try:
            session = self.Session()
            if sqlite_lock_immediately and session.bind.name == "sqlite":
                session.execute("BEGIN IMMEDIATE")
        except Exception as e:
            self.__logger.error("Unable to obtain an Index database session.", exc_info=True)
            raise DatabaseError("Unable to obtain an Index database session.") from e

        # Yield the session and catch exceptions that occur while using it
        # to roll back if needed
        try:
            yield session
            session.commit()
        except Exception as e:
            transient_dberr = self._sql_pool_dispose_helper.check_dispose_pool(session, e)
            if exceptions_to_not_rollback_on and type(e) in exceptions_to_not_rollback_on:
                pass
            else:
                session.rollback()
                if transient_dberr:
                    self.__logger.warning("Rolling back Index database session due to transient database error.",
                                          exc_info=True)
                else:
                    self.__logger.error("Error committing Index database session. Rolling back.", exc_info=True)
                if exceptions_to_not_raise_on is None or type(e) not in exceptions_to_not_raise_on:
                    if transient_dberr:
                        raise RetriableDatabaseError(
                            "Database connection was temporarily interrupted, please retry",
                            timedelta(seconds=COOLDOWN_TIME_AFTER_POOL_DISPOSE_SECONDS)) from e
                    raise
        finally:
            session.close()
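
    # A minimal usage sketch for the session() context manager above
    # (illustrative only, not part of the original file); this is the same
    # pattern the methods further down use, e.g. mark_n_bytes_as_deleted and
    # bulk_delete:
    #
    #     with self.session(exceptions_to_not_rollback_on=[StaleDataError]) as session:
    #         session.query(IndexEntry).first()
    #
    # The commit happens automatically when the block exits cleanly; any other
    # exception triggers a rollback and is re-raised unless excluded via the
    # keyword arguments documented above.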

    def _query_for_entry_by_digest(self, digest: Digest,
                                   session: SessionType) -> Query:
        """ Helper method to query for a blob """
        return session.query(IndexEntry).filter(
            IndexEntry.digest_hash == digest.hash
        )

    def has_blob(self, digest: Digest) -> bool:
        with self.session() as session:
            num_entries = self._query_for_entry_by_digest(
                digest, session).count()
            if num_entries == 1:
                return True
            elif num_entries < 1:
                return False
            else:
                raise RuntimeError(
                    f"Multiple results found for blob [{digest}]. "
                    "The index is in a bad state.")

    @DurationMetric(CAS_INDEX_BLOB_TIMESTAMP_UPDATE_TIME_METRIC_NAME, instanced=True)
    def _update_blob_timestamp(self, digest: Digest, session: SessionType,
                               sync_mode: Union[bool, str]=False) -> int:
        """ Refresh a blob's timestamp and return the number of rows updated """
        entry = self._query_for_entry_by_digest(digest, session)
        num_rows_updated = entry.update({
            "accessed_timestamp": datetime.utcnow()
        }, synchronize_session=sync_mode)
        return num_rows_updated

    @DurationMetric(CAS_INDEX_GET_BLOB_TIME_METRIC_NAME, instanced=True)
    def get_blob(self, digest: Digest) -> Optional[BinaryIO]:
        num_rows_updated = 0
        with self.session() as session:
            num_rows_updated = self._update_blob_timestamp(digest, session)

        if num_rows_updated > 1:
            raise RuntimeError(
                f"Multiple rows found for blob [{digest.hash}]. "
                "The index is in a bad state.")

        if num_rows_updated == 0:
            # If fallback is enabled, try to fetch the blob and add it
            # to the index
            if self._fallback_on_get:
                blob = self._storage.get_blob(digest)
                if blob:
                    self._save_digests_to_index([digest])
                return blob
            else:
                # Otherwise just skip the storage entirely
                return None
        else:
            # If exactly one row in the index was updated, grab the blob
            blob = self._storage.get_blob(digest)
            if not blob:
                self.__logger.warning(f"Unable to find blob [{digest.hash}] in storage")
            return blob

    def delete_blob(self, digest: Digest) -> None:
        with self.session() as session:
            entry = self._query_for_entry_by_digest(digest, session)
            entry.delete(synchronize_session=False)
        self._storage.delete_blob(digest)

    def begin_write(self, digest: Digest) -> BinaryIO:
        return self._storage.begin_write(digest)

    def commit_write(self, digest: Digest, write_session: BinaryIO) -> None:
        # pylint: disable=no-name-in-module,import-outside-toplevel
        self._storage.commit_write(digest, write_session)
        try:
            self._save_digests_to_index([digest])
        except DBAPIError as error:
            # The error has a pgcode attribute on Postgres only
            if hasattr(error.orig, 'pgcode'):
                # Imported here to avoid a global dependency on psycopg2
                from psycopg2.errors import DiskFull, OutOfMemory  # type: ignore
                # 53100 == DiskFull && 53200 == OutOfMemory
                if isinstance(error.orig, (DiskFull, OutOfMemory)):
                    raise StorageFullError(f"Postgres Error: {error.orig.pgcode}") from error
            raise error

    def _partitioned_hashes(self, digests: Sequence[Digest]) -> Iterator[Iterator[str]]:
        """ Given a long list of digests, split it into parts no larger than
        _inclause_limit and yield the hashes in each part.
        """
        for part_start in range(0, len(digests), self._inclause_limit):
            part_end = min(len(digests), part_start + self._inclause_limit)
            part_digests = itertools.islice(digests, part_start, part_end)
            yield map(lambda digest: digest.hash, part_digests)
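
    # Illustrative example (not part of the original file): with an
    # _inclause_limit of 2, a list of five digests is partitioned into hash
    # groups of sizes 2, 2 and 1, e.g.
    #
    #     parts = [list(hashes) for hashes in self._partitioned_hashes(digests)]
    #     # -> [['hash1', 'hash2'], ['hash3', 'hash4'], ['hash5']]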

    @generator_method_duration_metric(CAS_INDEX_BULK_SELECT_DIGEST_TIME_METRIC_NAME)
    def _bulk_select_digests(self, digests: Sequence[Digest]) -> Iterator[IndexEntry]:
        """ Generator that selects all rows matching a digest list.

        SQLAlchemy Core is used for this because the ORM has problems with
        large numbers of bind variables for WHERE IN clauses.

        We only select on the digest hash (not hash and size) to allow for
        index-only queries on db backends that support them.
        """
        index_table = IndexEntry.__table__
        with self.session() as session:
            for part in self._partitioned_hashes(digests):
                stmt = select(
                    [index_table.c.digest_hash]
                ).where(
                    index_table.c.digest_hash.in_(part)
                )
                entries = session.execute(stmt)
                yield from entries

    @DurationMetric(CAS_INDEX_BULK_TIMESTAMP_UPDATE_TIME_METRIC_NAME, instanced=True)
    def _bulk_refresh_timestamps(self, digests: Sequence[Digest], update_time: Optional[datetime]=None):
        """ Refresh all timestamps of the input digests.

        SQLAlchemy Core is used for this because the ORM is not suitable for
        bulk inserts and updates.

        https://docs.sqlalchemy.org/en/13/faq/performance.html#i-m-inserting-400-000-rows-with-the-orm-and-it-s-really-slow
        """
        index_table = IndexEntry.__table__
        # If a timestamp was passed in, use it
        if update_time:
            timestamp = update_time
        else:
            timestamp = datetime.utcnow()
        with self.session() as session:
            for part in self._partitioned_hashes(digests):
                stmt = index_table.update().where(
                    index_table.c.digest_hash.in_(
                        select([index_table.c.digest_hash], index_table.c.digest_hash.in_(part)
                               ).with_for_update(skip_locked=True))
                ).values(
                    accessed_timestamp=timestamp
                )
                session.execute(stmt)
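
    # Roughly, each partition above is issued as an UPDATE of the following
    # shape (a sketch; the real statement is generated by SQLAlchemy, and the
    # FOR UPDATE SKIP LOCKED clause is only rendered on backends that
    # support it):
    #
    #     UPDATE <index table>
    #     SET accessed_timestamp = :timestamp
    #     WHERE digest_hash IN (
    #         SELECT digest_hash FROM <index table>
    #         WHERE digest_hash IN (:hash_1, ..., :hash_n)
    #         FOR UPDATE SKIP LOCKED
    #     )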

    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
        entries = self._bulk_select_digests(digests)

        found_hashes = {entry.digest_hash for entry in entries}

        # Update all timestamps
        self._bulk_refresh_timestamps(digests)

        return [digest for digest in digests if digest.hash not in found_hashes]

    @DurationMetric(CAS_INDEX_SAVE_DIGESTS_TIME_METRIC_NAME, instanced=True)
    def _save_digests_to_index(self, digests: List[Digest]) -> None:
        """ Persist a list of digests to the index.

        Any digests present are updated, and new digests are inserted.
        """
        if not digests:
            return

        if self._dialect_delegate:
            try:
                with self.session() as session:
                    self._dialect_delegate._save_digests_to_index(digests, session)
                return
            except AttributeError:
                pass

        update_time = datetime.utcnow()

        # Update existing digests
        self._bulk_refresh_timestamps(digests, update_time)

        # Figure out which digests need to be inserted
        entries = self._bulk_select_digests(digests)

        # Map digests to new entries
        entries_not_present = {
            digest.hash: IndexEntry(
                digest_hash=digest.hash,
                digest_size_bytes=digest.size_bytes,
                accessed_timestamp=update_time
            )
            for digest in digests
        }
        for entry in entries:
            del entries_not_present[entry.digest_hash]

        # Add new digests
        with self.session() as session:
            session.bulk_save_objects(entries_not_present.values())

    def bulk_update_blobs(self, blobs: List[Tuple[Digest, bytes]]) -> List[Status]:
        results = self._storage.bulk_update_blobs(blobs)
        digests_to_save = [
            digest for ((digest, data), result) in zip(blobs, results)
            if result.code == code_pb2.OK
        ]
        self._save_digests_to_index(digests_to_save)
        return results

    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, BinaryIO]:
        hash_to_digest = {digest.hash: digest for digest in digests}
        if self._fallback_on_get:
            # If fallback is enabled, query the backend first and update
            # the index with each blob found there
            results = self._storage.bulk_read_blobs(digests)
            # Save only the results that were fetched
            digests_to_save = [
                hash_to_digest[digest_hash] for digest_hash in results
                if results[digest_hash] is not None
            ]
            self._save_digests_to_index(digests_to_save)
            return results
        else:
            # If fallback is disabled, query the index first and only
            # query the storage for blobs found there
            entries = self._bulk_select_digests(digests)
            digests_to_fetch = [hash_to_digest[entry.digest_hash]
                                for entry in entries]
            # Update timestamps
            self._bulk_refresh_timestamps(digests_to_fetch)
            return self._storage.bulk_read_blobs(digests_to_fetch)

    def _column_windows(self, session: SessionType, column: Column) -> Iterator[BooleanClauseList]:
        """ Adapted from the sqlalchemy WindowedRangeQuery recipe.
        https://github.com/sqlalchemy/sqlalchemy/wiki/WindowedRangeQuery

        This method breaks the timestamp range into windows and yields
        the borders of these windows to the caller. For example, the borders
        yielded by this might look something like

        ('2019-10-08 18:25:03.699863', '2019-10-08 18:25:03.751018')
        ('2019-10-08 18:25:03.751018', '2019-10-08 18:25:03.807867')
        ('2019-10-08 18:25:03.807867', '2019-10-08 18:25:03.862192')
        ('2019-10-08 18:25:03.862192',)

        _windowed_lru_digests uses these borders to form WHERE clauses for its
        SELECTs. In doing so, we make sure to repeatedly query the database for
        live updates, striking a balance between loading the entire resultset
        into memory and querying each row individually, both of which are
        inefficient in the context of a large index.

        The window size is a parameter and can be configured. A larger window
        size will yield better performance (fewer SQL queries) at the cost of
        memory (holding on to the results of the query) and accuracy (blobs
        may get updated while you're working on them), and vice versa for a
        smaller window size.
        """

        def int_for_range(start_id: Any, end_id: Any) -> BooleanClauseList:
            if end_id:
                return and_(
                    column >= start_id,
                    column < end_id
                )
            else:
                return column >= start_id

        q = session.query(
            column,
            func.row_number()
                .over(order_by=column)
                .label('rownum')
        ).from_self(column)

        if self._all_blobs_window_size > 1:
            q = q.filter(text(f"rownum % {self._all_blobs_window_size}=1"))

        intervals = [id for id, in q]

        while intervals:
            start = intervals.pop(0)
            if intervals:
                end = intervals[0]
            else:
                end = None
            yield int_for_range(start, end)
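
    # Roughly, the border query above corresponds to SQL of this shape (a
    # sketch; the real statement is generated by SQLAlchemy): every
    # _all_blobs_window_size-th row, ordered by the column, becomes a window
    # border.
    #
    #     SELECT accessed_timestamp FROM (
    #         SELECT accessed_timestamp,
    #                row_number() OVER (ORDER BY accessed_timestamp) AS rownum
    #         FROM <index table>
    #     ) AS windowed
    #     WHERE rownum % :window_size = 1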

    def _windowed_lru_digests(self, q: Query, column: Column) -> Iterator[IndexEntry]:
        """ Generate a query for each window produced by _column_windows
        and yield the results one by one.
        """
        for whereclause in self._column_windows(q.session, column):
            window = q.filter(whereclause).order_by(column.asc())
            yield from window

    def least_recent_digests(self) -> Iterator[Digest]:
        with self.session() as session:
            q = session.query(IndexEntry)
            for index_entry in self._windowed_lru_digests(q, IndexEntry.accessed_timestamp):
                yield Digest(hash=index_entry.digest_hash, size_bytes=index_entry.digest_size_bytes)

    @DurationMetric(CAS_INDEX_SIZE_CALCULATION_TIME_METRIC_NAME)
    def get_total_size(self) -> int:
        with self.session() as session:
            return session.query(func.sum(IndexEntry.digest_size_bytes)).scalar()

    def mark_n_bytes_as_deleted(self, n_bytes: int, dry_run: bool=False,
                                protect_blobs_after: Optional[datetime]=None) -> List[Digest]:
        # pylint: disable=singleton-comparison
        if protect_blobs_after is None:
            protect_blobs_after = datetime.utcnow()
        gathered = 0
        with self.session(exceptions_to_not_rollback_on=[StaleDataError]) as session:
            size_query = session.query(func.sum(IndexEntry.digest_size_bytes))
            premarked_size = size_query.filter_by(deleted=True).scalar()
            if premarked_size is None:
                premarked_size = 0

            q = session.query(IndexEntry)
            entries = q.filter_by(deleted=True).order_by(IndexEntry.accessed_timestamp).all()
            monitoring_bus = get_monitoring_bus()
            if monitoring_bus.is_enabled:
                metadata = {}
                if self._instance_name:
                    metadata["instance-name"] = self._instance_name
                publish_counter_metric(
                    CLEANUP_INDEX_PREMARKED_BLOBS_METRIC_NAME,
                    len(entries),
                    metadata
                )

            if premarked_size < n_bytes:
                gathered = premarked_size
                q = q.filter(IndexEntry.deleted == False, IndexEntry.accessed_timestamp <
                             protect_blobs_after).with_for_update(skip_locked=True)
                iterator = self._windowed_lru_digests(q, IndexEntry.accessed_timestamp)
                while gathered < n_bytes:
                    try:
                        index_entry = next(iterator)
                    except StopIteration:
                        break
                    gathered += index_entry.digest_size_bytes
                    self.__logger.debug(f"Gathering {gathered} out of {n_bytes} bytes (max)")
                    entries.append(index_entry)
                    if not dry_run:
                        index_entry.deleted = True

            return [
                Digest(hash=entry.digest_hash, size_bytes=entry.digest_size_bytes)
                for entry in entries
            ]

    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        self.__logger.debug(f"Deleting {len(digests)} digests from the index")
        index_table = IndexEntry.__table__
        hashes = [x.hash for x in digests]

        # Make sure we don't exceed maximum size of an IN clause
        n = self._inclause_limit
        hash_chunks = [hashes[i:i + n] for i in range(0, len(hashes), n)]

        # We will not raise, rollback, or log on StaleDataErrors.
        # These errors occur when we delete fewer rows than we were expecting.
        # This is fine, since the missing rows will get deleted eventually.
        # When running bulk_deletes concurrently, StaleDataErrors
        # occur too often to log.
        num_blobs_deleted = 0
        with self.session(exceptions_to_not_rollback_on=[StaleDataError]) as session:
            for chunk in hash_chunks:
                # Do not wait for locks when deleting rows. Skip locked rows to
                # avoid deadlocks.
                stmt = index_table.delete().where(
                    index_table.c.digest_hash.in_(select(
                        [index_table.c.digest_hash], index_table.c.digest_hash.in_(chunk)
                    ).with_for_update(skip_locked=True))
                )
                num_blobs_deleted += session.execute(stmt).rowcount
        self.__logger.debug(f"{num_blobs_deleted}/{len(digests)} blobs deleted from the index")

        # bulk_delete is typically expected to return the digests that were not deleted,
        # but delete only returns the number of rows deleted and not what was/wasn't
        # deleted. Getting this info would require extra queries, so assume that
        # everything was either deleted or already deleted. Failures will continue to throw
        return []
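

# Illustrative usage sketch (not part of the original file). The names
# `storage`, `digest` and `blob_data` are assumptions: `storage` is any
# concrete StorageABC implementation and `digest` describes `blob_data`.
#
#     index = SQLIndex(storage, "sqlite:///cas-index.db", automigrate=True)
#
#     write_session = index.begin_write(digest)
#     write_session.write(blob_data)
#     index.commit_write(digest, write_session)   # stores the blob and indexes it
#
#     index.missing_blobs([digest])               # -> [] once the blob is indexed
#     index.get_blob(digest)                      # refreshes the accessed timestamp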