Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/index/sql.py: 89.13%

423 statements  

coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

1# Copyright (C) 2019 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17SQLIndex 

18================== 

19 

20A SQL index implementation. This can be pointed to either a remote SQL server 

21or a local SQLite database. 

22 

23""" 

24 

25from contextlib import contextmanager 

26from datetime import datetime, timedelta 

27import logging 

28import os 

29import itertools 

30from typing import Any, BinaryIO, Dict, Iterator, List, Optional, Sequence, Tuple, Type, Union 

31from io import BytesIO 

32 

33from alembic import command 

34from alembic.config import Config 

35from sqlalchemy import and_, create_engine, delete, event, func, select, text, Column, update 

36from sqlalchemy.sql.elements import BooleanClauseList 

37from sqlalchemy.orm import scoped_session 

38from sqlalchemy.orm.query import Query 

39from sqlalchemy.orm.session import sessionmaker, Session as SessionType 

40from sqlalchemy.orm.exc import StaleDataError 

41from sqlalchemy.exc import SQLAlchemyError, DBAPIError 

42 

43from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest 

44from buildgrid._protos.google.rpc import code_pb2 

45from buildgrid._protos.google.rpc.status_pb2 import Status 

46from buildgrid.server.sql import sqlutils 

47from buildgrid.server.monitoring import get_monitoring_bus 

48from buildgrid.server.metrics_names import ( 

49 CAS_INDEX_BULK_SELECT_DIGEST_TIME_METRIC_NAME, 

50 CAS_INDEX_BULK_TIMESTAMP_UPDATE_TIME_METRIC_NAME, 

51 CAS_INDEX_GET_BLOB_TIME_METRIC_NAME, 

52 CAS_INDEX_SAVE_DIGESTS_TIME_METRIC_NAME, 

53 CAS_INDEX_SIZE_CALCULATION_TIME_METRIC_NAME, 

54 CLEANUP_INDEX_PREMARKED_BLOBS_METRIC_NAME, 

55) 

56from buildgrid.server.metrics_utils import ( 

57 DurationMetric, 

58 generator_method_duration_metric, 

59 publish_counter_metric 

60) 

61from buildgrid.server.persistence.sql.impl import sqlite_on_connect 

62from buildgrid.server.persistence.sql.models import IndexEntry 

63from buildgrid.settings import MIN_TIME_BETWEEN_SQL_POOL_DISPOSE_MINUTES, COOLDOWN_TIME_AFTER_POOL_DISPOSE_SECONDS 

64from buildgrid._exceptions import DatabaseError, StorageFullError, RetriableDatabaseError 

65from buildgrid.utils import read_and_rewind, validate_digest_data 

66from ..storage_abc import StorageABC 

67from .index_abc import IndexABC 

68from .sql_dialect_delegates import PostgreSQLDelegate, SQLiteDelegate 

69 

70 

71# Each dialect has a limit on the number of bind parameters allowed. This 

72# matters because it determines how large we can allow our IN clauses to get. 

73# 

74# SQLite: 1000 https://www.sqlite.org/limits.html#max_variable_number 

75# PostgreSQL: 32767 (Int16.MAX_VALUE) https://www.postgresql.org/docs/9.4/protocol-message-formats.html 

76# 

77# We'll refer to this as the "inlimit" in the code. The inlimits are

78# set to roughly 75% of the bind parameter limit of each implementation.

79DIALECT_INLIMIT_MAP = { 

80 "postgresql": 24000, 

81 "sqlite": 750 

82} 

83DEFAULT_INLIMIT = 100 

84 

85DIALECT_DELEGATES = { 

86 "postgresql": PostgreSQLDelegate, 

87 "sqlite": SQLiteDelegate 

88} 

89 

90INLINE_BLOB_SIZE_HARD_MAXIMUM = 1000000000 

91 

92 

93class SQLIndex(IndexABC): 

94 

95 def _get_default_inlimit(self, dialect) -> int: 

96 """ Map a connection string to its inlimit. """ 

97 if dialect not in DIALECT_INLIMIT_MAP: 

98 self.__logger.warning( 

99 f"The SQL dialect [{dialect}] is unsupported, and " 

100 f"errors may occur. Supported dialects are {list(DIALECT_INLIMIT_MAP.keys())}. " 

101 f"Using default inclause limit of {DEFAULT_INLIMIT}.") 

102 return DEFAULT_INLIMIT 

103 

104 return DIALECT_INLIMIT_MAP[dialect] 

105 

106 def __init__(self, storage: StorageABC, connection_string: str, 

107 automigrate: bool=False, window_size: int=1000, 

108 inclause_limit: int=-1, connection_timeout: int=5, 

109 max_inline_blob_size: int=0, **kwargs): 

110 base_argnames = ['fallback_on_get'] 

111 base_args = {} 

112 for arg in base_argnames: 

113 if arg in kwargs: 

114 base_args[arg] = kwargs.pop(arg) 

115 super().__init__(**base_args) 

116 self.__logger = logging.getLogger(__name__) 

117 

118 self._storage = storage 

119 self._instance_name = None 

120 

121 if max_inline_blob_size > INLINE_BLOB_SIZE_HARD_MAXIMUM: 

122 raise ValueError( 

123 f"Max inline blob size is [{max_inline_blob_size}], " 

124 f"but must be less than [{INLINE_BLOB_SIZE_HARD_MAXIMUM}].") 

125 if max_inline_blob_size >= 0: 

126 self._max_inline_blob_size = max_inline_blob_size 

127 else: 

128 raise ValueError( 

129 f"Max inline blob size is [{max_inline_blob_size}], but must be nonnegative.") 

130 

131 # Max # of rows to fetch while iterating over all blobs 

132 # (e.g. in least_recent_digests) 

133 self._all_blobs_window_size = window_size 

134 

135 # Only pass known kwargs to db session 

136 available_options = {'pool_size', 'max_overflow', 'pool_timeout', 

137 'pool_pre_ping', 'pool_recycle'} 

138 kwargs_keys = kwargs.keys() 

139 if not kwargs_keys <= available_options: 

140 unknown_args = kwargs_keys - available_options 

141 raise TypeError(f"Unknown keyword arguments: [{unknown_args}]") 

142 

143 self._create_sqlalchemy_engine( 

144 connection_string, automigrate, connection_timeout, **kwargs) 

145 

146 self._sql_pool_dispose_helper = sqlutils.SQLPoolDisposeHelper(COOLDOWN_TIME_AFTER_POOL_DISPOSE_SECONDS, 

147 MIN_TIME_BETWEEN_SQL_POOL_DISPOSE_MINUTES, 

148 self._engine) 

149 

150 # Dialect-specific initialization 

151 dialect = self._engine.dialect.name 

152 self._dialect_delegate = DIALECT_DELEGATES.get(dialect) 

153 

154 if inclause_limit > 0: 

155 if inclause_limit > window_size: 

156 self.__logger.warning( 

157 f"Configured inclause_limit [{inclause_limit}] " 

158 f"is greater than window_size [{window_size}]") 

159 self._inclause_limit = inclause_limit 

160 else: 

161 # If the inlimit isn't explicitly set, we use a default that 

162 # respects both the window size and the db implementation's 

163 # inlimit. 

164 self._inclause_limit = min( 

165 window_size, 

166 self._get_default_inlimit(dialect)) 

167 self.__logger.debug("SQL index: using default inclause limit " 

168 f"of {self._inclause_limit}") 

169 

170 session_factory = sessionmaker(future=True) 

171 self.Session = scoped_session(session_factory) 

172 self.Session.configure(bind=self._engine) 

173 

174 # Make a test query against the database to ensure the connection is valid 

175 with self.session() as session: 

176 session.query(IndexEntry).first() 

177 

178 def _create_or_migrate_db(self, connection_string: str) -> None: 

179 self.__logger.warning( 

180 "Will attempt migration to latest version if needed.") 

181 

182 config = Config() 

183 

184 config.set_main_option("script_location", 

185 os.path.join( 

186 os.path.dirname(__file__), 

187 "../../../persistence/sql/alembic")) 

188 

189 with self._engine.begin() as connection: 

190 config.attributes['connection'] = connection 

191 command.upgrade(config, "head") 

192 

193 def _create_sqlalchemy_engine(self, connection_string, automigrate, connection_timeout, **kwargs): 

194 self.automigrate = automigrate 

195 

196 # Disallow sqlite in-memory because multi-threaded access to it is 

197 # complex and potentially problematic at best 

198 # ref: https://docs.sqlalchemy.org/en/13/dialects/sqlite.html#threading-pooling-behavior 

199 if sqlutils.is_sqlite_inmemory_connection_string(connection_string): 

200 raise ValueError("Cannot use SQLite in-memory with BuildGrid (connection_string=" 

201 f"[{connection_string}]). Use a file or leave the connection_string " 

202 "empty for a tempfile.") 

203 

204 if connection_timeout is not None: 

205 if "connect_args" not in kwargs: 

206 kwargs["connect_args"] = {} 

207 if sqlutils.is_sqlite_connection_string(connection_string): 

208 kwargs["connect_args"]["timeout"] = connection_timeout 

209 elif sqlutils.is_psycopg2_connection_string(connection_string): 

210 kwargs["connect_args"]["connect_timeout"] = connection_timeout 

211 # Additional PostgreSQL-specific timeouts, 

212 # passed to libpq as additional options 

213 # Note that these timeouts are in milliseconds (hence the *1000) 

214 kwargs["connect_args"]["options"] = f'-c lock_timeout={connection_timeout * 1000}' 

215 

216 # Only pass the (known) kwargs that have been explicitly set by the user 

217 available_options = set([ 

218 'pool_size', 'max_overflow', 'pool_timeout', 'pool_pre_ping', 

219 'pool_recycle', 'connect_args' 

220 ]) 

221 kwargs_keys = set(kwargs.keys()) 

222 if not kwargs_keys.issubset(available_options): 

223 unknown_options = kwargs_keys - available_options 

224 raise TypeError(f"Unknown keyword arguments: [{unknown_options}]") 

225 

226 self.__logger.debug(f"SQLAlchemy additional kwargs: [{kwargs}]") 

227 

228 self._engine = create_engine(connection_string, echo=False, future=True, **kwargs) 

229 

230 self.__logger.info(f"Using SQL backend for index at connection [{repr(self._engine.url)}] " 

231 f"using additional SQL options {kwargs}") 

232 

233 if self._engine.dialect.name == "sqlite": 

234 event.listen(self._engine, "connect", sqlite_on_connect) 

235 

236 if self.automigrate: 

237 self._create_or_migrate_db(connection_string) 

238 

239 @contextmanager 

240 def session(self, *, sqlite_lock_immediately: bool=False, 

241 exceptions_to_not_raise_on: Optional[List[Type[SQLAlchemyError]]]=None, 

242 exceptions_to_not_rollback_on: Optional[List[Type[SQLAlchemyError]]]=None) -> Any: 

243 """ Context manager for convenience use of sessions. Automatically 

244 commits when the context ends and rolls back failed transactions. 

245 

246 Setting sqlite_lock_immediately will only yield a session once the 

247 SQLite database has been locked for exclusive use. 

248 

249 exceptions_to_not_raise_on is a list of exceptions which the session will not re-raise on. 

250 However, the session will still roll back on these errors. 

251 

252 exceptions_to_not_rollback_on is a list of exceptions which the session manager will not 

253 re-raise or roll back on. Be very careful when specifying this option. It should only be used 

254 in cases where you know the exception will not put the database in a bad state. 

255 """ 

256 # If we recently disposed of the SQL pool due to connection issues, 

257 # allow for a cooldown period before we attempt more SQL operations 

258 self._sql_pool_dispose_helper.wait_if_cooldown_in_effect() 

259 

260 # Try to obtain a session 

261 try: 

262 session = self.Session() 

263 if sqlite_lock_immediately and session.bind.name == "sqlite": # type: ignore 

264 session.execute(text("BEGIN IMMEDIATE")) 

265 except Exception as e: 

266 self.__logger.error("Unable to obtain an Index database session.", exc_info=True) 

267 raise DatabaseError("Unable to obtain an Index database session.") from e 

268 

269 # Yield the session and catch exceptions that occur while using it 

270 # to roll-back if needed 

271 try: 

272 yield session 

273 session.commit() 

274 except Exception as e: 

275 transient_dberr = self._sql_pool_dispose_helper.check_dispose_pool(session, e) 

276 if exceptions_to_not_rollback_on and type(e) in exceptions_to_not_rollback_on: 

277 pass 

278 else: 

279 session.rollback() 

280 if transient_dberr: 

281 self.__logger.warning("Rolling back Index database session due to transient database error.", 

282 exc_info=True) 

283 else: 

284 self.__logger.error("Error committing Index database session. Rolling back.", exc_info=True) 

285 if exceptions_to_not_raise_on is None or type(e) not in exceptions_to_not_raise_on: 

286 if transient_dberr: 

287 raise RetriableDatabaseError( 

288 "Database connection was temporarily interrupted, please retry", 

289 timedelta(seconds=COOLDOWN_TIME_AFTER_POOL_DISPOSE_SECONDS)) from e 

290 raise 

291 finally: 

292 session.close() 
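A minimal sketch of how this context manager is consumed inside the class (mirroring bulk_delete further down); `stmt` stands in for an arbitrary SQLAlchemy statement and is an assumption here:

    # inside a method of SQLIndex:
    with self.session(exceptions_to_not_rollback_on=[StaleDataError]) as session:
        session.execute(stmt)
    # the commit happens automatically when the block exits cleanly; other exceptions
    # roll the transaction back and are re-raised unless explicitly excluded above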

293 

294 def setup_grpc(self): 

295 self._storage.setup_grpc() 

296 

297 def has_blob(self, digest: Digest) -> bool: 

298 with self.session() as session: 

299 statement = select(func.count(IndexEntry.digest_hash)).where( 

300 IndexEntry.digest_hash == digest.hash 

301 ) 

302 

303 num_entries = session.execute(statement).scalar() 

304 if num_entries == 1: 

305 return True 

306 elif num_entries < 1: 

307 return False 

308 else: 

309 raise RuntimeError( 

310 f"Multiple results found for blob [{digest}]. " 

311 "The index is in a bad state.") 

312 

313 def _fetch_one_entry(self, digest: Digest, session: SessionType) -> Optional[IndexEntry]: 

314 """ Fetch an IndexEntry by its corresponding digest. Returns None if not found. """ 

315 statement = select(IndexEntry).where( 

316 IndexEntry.digest_hash == digest.hash 

317 ) 

318 

319 result = session.execute(statement) 

320 return result.scalar() # Raises MultipleResultsFound if multiple rows found 

321 

322 def _backfill_digest_and_return_reader(self, digest: Digest, session: SessionType) -> Optional[BinaryIO]: 

323 blob_read_head = self._storage.get_blob(digest) 

324 if blob_read_head: 

325 blob_data = read_and_rewind(blob_read_head) 

326 self._save_digests_to_index([(digest, blob_data)], session) 

327 return blob_read_head 

328 else: 

329 self.__logger.warning(f"Unable to find blob [{digest.hash}] in storage") 

330 return None 

331 

332 def _get_small_blob(self, digest: Digest) -> Optional[BinaryIO]: 

333 blob_read_head = None 

334 with self.session() as session: 

335 entry = self._fetch_one_entry(digest, session) 

336 

337 if entry is None: 

338 # No blob was found 

339 # If fallback is enabled, try to fetch the blob and add it to the index 

340 if self._fallback_on_get: 

341 return self._backfill_digest_and_return_reader(digest, session) 

342 

343 return None 

344 else: 

345 # Found the entry, now try to find the blob inline 

346 self._update_blob_timestamp(digest, session) 

347 

348 if entry.inline_blob is not None: 

349 return BytesIO(entry.inline_blob) 

350 else: 

351 blob_read_head = self._backfill_digest_and_return_reader(digest, session) 

352 if blob_read_head is None: 

353 self.__logger.warning(f"Blob [{digest.hash}] was indexed but not in storage") 

354 

355 return blob_read_head 

356 

357 def _update_blob_timestamp(self, digest: Digest, session: SessionType, 

358 sync_mode: Union[bool, str]=False) -> int: 

359 """ Refresh a blob's timestamp and return the number of rows updated """ 

360 statement = update(IndexEntry).where( 

361 IndexEntry.digest_hash == digest.hash 

362 ).values(accessed_timestamp=datetime.utcnow()) 

363 

364 options = {"synchronize_session": sync_mode} 

365 num_rows_updated = session.execute(statement, execution_options=options).rowcount # type: ignore 

366 return num_rows_updated 

367 

368 def _get_large_blob(self, digest: Digest) -> Optional[BinaryIO]: 

369 with self.session() as session: 

370 num_rows_updated = self._update_blob_timestamp(digest, session) 

371 

372 if num_rows_updated > 1: 

373 raise RuntimeError( 

374 f"Multiple rows found for blob [{digest.hash}]. " 

375 "The index is in a bad state.") 

376 

377 if num_rows_updated == 0: 

378 # If fallback is enabled, try to fetch the blob and add it 

379 # to the index 

380 if self._fallback_on_get: 

381 blob = self._storage.get_blob(digest) 

382 if blob: 

383 with self.session() as session: 

384 self._save_digests_to_index([(digest, None)], session) 

385 return blob 

386 else: 

387 # Otherwise just skip the storage entirely 

388 return None 

389 else: 

390 # If exactly one row in the index was updated, grab the blob 

391 blob = self._storage.get_blob(digest) 

392 if not blob: 

393 self.__logger.warning(f"Unable to find blob [{digest.hash}] in storage") 

394 return blob 

395 

396 @DurationMetric(CAS_INDEX_GET_BLOB_TIME_METRIC_NAME, instanced=True) 

397 def get_blob(self, digest: Digest) -> Optional[BinaryIO]: 

398 if digest.size_bytes <= self._max_inline_blob_size: 

399 return self._get_small_blob(digest) 

400 else: 

401 return self._get_large_blob(digest) 

402 

403 def delete_blob(self, digest: Digest) -> None: 

404 statement = delete(IndexEntry).where( 

405 IndexEntry.digest_hash == digest.hash 

406 ) 

407 options = {"synchronize_session": False} 

408 

409 with self.session() as session: 

410 session.execute(statement, execution_options=options) 

411 

412 self._storage.delete_blob(digest) 

413 

414 def begin_write(self, digest: Digest) -> BinaryIO: 

415 if digest.size_bytes <= self._max_inline_blob_size: 

416 return BytesIO() 

417 else: 

418 return self._storage.begin_write(digest) 

419 

420 def commit_write(self, digest: Digest, write_session: BinaryIO) -> None: 

421 # pylint: disable=no-name-in-module,import-outside-toplevel 

422 inline_blob = None 

423 if digest.size_bytes > self._max_inline_blob_size: 

424 self._storage.commit_write(digest, write_session) 

425 else: 

426 write_session.seek(0) 

427 inline_blob = write_session.read() 

428 try: 

429 with self.session() as session: 

430 self._save_digests_to_index([(digest, inline_blob)], session) 

431 except DBAPIError as error: 

432 # Error has pgcode attribute (Postgres only) 

433 if hasattr(error.orig, 'pgcode'): 

434 # imported here to avoid global dependency on psycopg2 

435 from psycopg2.errors import DiskFull, OutOfMemory # type: ignore 

436 # 53100 == DiskFull && 53200 == OutOfMemory 

437 if isinstance(error.orig, (DiskFull, OutOfMemory)): 

438 raise StorageFullError(f"Postgres Error: {error.orig.pgcode}") from error 

439 raise error 
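A sketch of the resulting write flow, assuming `index` is an SQLIndex instance and `digest`/`data` describe the blob being stored:

    write_session = index.begin_write(digest)   # BytesIO if the blob is small enough to inline
    write_session.write(data)
    index.commit_write(digest, write_session)   # large blobs go to the backing storage,
                                                # small ones are inlined into the index row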

440 

441 def _partitioned_hashes(self, digests: Sequence[Digest]) -> Iterator[Iterator[str]]: 

442 """ Given a long list of digests, split it into parts no larger than 

443 _inclause_limit and yield the hashes in each part. 

444 """ 

445 for part_start in range(0, len(digests), self._inclause_limit): 

446 part_end = min(len(digests), part_start + self._inclause_limit) 

447 part_digests = itertools.islice(digests, part_start, part_end) 

448 yield map(lambda digest: digest.hash, part_digests) 
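A worked example of the partitioning, using a hypothetical limit of 2 and five digests d1..d5 (names assumed for illustration):

    # assuming index._inclause_limit == 2:
    parts = [list(part) for part in index._partitioned_hashes([d1, d2, d3, d4, d5])]
    # parts == [[d1.hash, d2.hash], [d3.hash, d4.hash], [d5.hash]]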

449 

450 @generator_method_duration_metric(CAS_INDEX_BULK_SELECT_DIGEST_TIME_METRIC_NAME) 

451 def _bulk_select_digests(self, digests: Sequence[Digest], fetch_blobs: bool = False) -> Iterator[IndexEntry]: 

452 """ Generator that selects all rows matching a digest list. 

453 

454 SQLAlchemy Core is used for this because the ORM has problems with 

455 large numbers of bind variables for WHERE IN clauses. 

456 

457 We only select on the digest hash (not hash and size) to allow for 

458 index-only queries on db backends that support them. 

459 """ 

460 index_table = IndexEntry.__table__ 

461 with self.session() as session: 

462 columns = [index_table.c.digest_hash] 

463 if fetch_blobs: 

464 columns.append(index_table.c.inline_blob) 

465 for part in self._partitioned_hashes(digests): 

466 stmt = select( 

467 columns 

468 ).where( 

469 index_table.c.digest_hash.in_(part) 

470 ) 

471 entries = session.execute(stmt) 

472 yield from entries 

473 

474 @DurationMetric(CAS_INDEX_BULK_TIMESTAMP_UPDATE_TIME_METRIC_NAME, instanced=True) 

475 def _bulk_refresh_timestamps(self, digests: Sequence[Digest], update_time: Optional[datetime]=None): 

476 """ Refresh all timestamps of the input digests. 

477 

478 SQLAlchemy Core is used for this because the ORM is not suitable for 

479 bulk inserts and updates. 

480 

481 https://docs.sqlalchemy.org/en/13/faq/performance.html#i-m-inserting-400-000-rows-with-the-orm-and-it-s-really-slow 

482 """ 

483 index_table = IndexEntry.__table__ 

484 # If a timestamp was passed in, use it 

485 if update_time: 

486 timestamp = update_time 

487 else: 

488 timestamp = datetime.utcnow() 

489 with self.session() as session: 

490 for part in self._partitioned_hashes(digests): 

491 stmt = index_table.update().where( 

492 index_table.c.digest_hash.in_( 

493 select([index_table.c.digest_hash], index_table.c.digest_hash.in_(part) 

494 ).with_for_update(skip_locked=True)) 

495 ).values( 

496 accessed_timestamp=timestamp 

497 ) 

498 session.execute(stmt) 

499 

500 def missing_blobs(self, digests: List[Digest]) -> List[Digest]: 

501 entries = self._bulk_select_digests(digests) 

502 

503 found_hashes = {entry.digest_hash for entry in entries} 

504 

505 # Update all timestamps 

506 self._bulk_refresh_timestamps(digests) 

507 

508 return [digest for digest in digests if digest.hash not in found_hashes] 
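A short usage sketch, assuming `index` is an SQLIndex and `digests` is a list of Digest messages:

    missing = index.missing_blobs(digests)
    # `missing` preserves the input order, restricted to digests absent from the index;
    # as a side effect, the accessed timestamps of the entries that were found are refreshed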

509 

510 @DurationMetric(CAS_INDEX_SAVE_DIGESTS_TIME_METRIC_NAME, instanced=True) 

511 def _save_digests_to_index(self, 

512 digest_blob_pairs: List[Tuple[Digest, Optional[bytes]]], 

513 session: SessionType) -> None: 

514 """ Helper to persist a list of digest/blob pairs to the index. 

515 

516 Any digests present are updated, and new digests are inserted along with their inline blobs (if provided). 

517 Only blobs with size less than or equal to the max_inline_blob_size are inserted directly into the index. 

518 """ 

519 if not digest_blob_pairs: 

520 return 

521 

522 if self._dialect_delegate: 

523 try: 

524 self._dialect_delegate._save_digests_to_index( # type: ignore 

525 digest_blob_pairs, session, self._max_inline_blob_size) 

526 return 

527 except AttributeError: 

528 pass 

529 

530 update_time = datetime.utcnow() 

531 # Figure out which digests we can just update 

532 digests = [digest for (digest, blob) in digest_blob_pairs] 

533 entries = self._bulk_select_digests(digests) 

534 # Map digests to new entries 

535 entries_not_present = { 

536 digest.hash: { 

537 'digest_hash': digest.hash, 

538 'digest_size_bytes': digest.size_bytes, 

539 'accessed_timestamp': update_time, 

540 'inline_blob': (blob if digest.size_bytes <= self._max_inline_blob_size else None) 

541 } 

542 for (digest, blob) in digest_blob_pairs 

543 } 

544 

545 entries_present = {} 

546 for entry in entries: 

547 entries_present[entry.digest_hash] = entries_not_present[entry.digest_hash] 

548 del entries_not_present[entry.digest_hash] 

549 

550 if entries_not_present: 

551 session.bulk_insert_mappings(IndexEntry, entries_not_present.values()) # type: ignore 

552 if entries_present: 

553 session.bulk_update_mappings(IndexEntry, entries_present.values()) # type: ignore 

554 

555 def bulk_update_blobs(self, digest_blob_pairs: List[Tuple[Digest, bytes]]) -> List[Status]: 

556 """ Implement the StorageABC's bulk_update_blobs method. 

557 

558 The StorageABC interface takes in a list of digest/blob pairs and 

559 returns a list of results. The list of results MUST be ordered to 

560 correspond with the order of the input list. """ 

561 pairs_to_store = [] 

562 result_map = {} 

563 

564 # For each blob, determine whether to store it in the backing storage or inline it 

565 for (digest, blob) in digest_blob_pairs: 

566 if digest.size_bytes > self._max_inline_blob_size: 

567 pairs_to_store.append((digest, blob)) 

568 else: 

569 if validate_digest_data(digest, blob): 

570 result_map[digest.hash] = Status(code=code_pb2.OK) 

571 else: 

572 result_map[digest.hash] = Status( 

573 code=code_pb2.INVALID_ARGUMENT, 

574 message="Data doesn't match hash" 

575 ) 

576 backup_results = self._storage.bulk_update_blobs(pairs_to_store) 

577 

578 for (digest, blob), result in zip(pairs_to_store, backup_results): 

579 if digest.hash in result_map: 

580 # ERROR: blob was both inlined and backed up 

581 raise RuntimeError( 

582 "Blob was both inlined and backed up.") 

583 result_map[digest.hash] = result 

584 

585 # Generate the final list of results 

586 pairs_to_inline = [] 

587 results = [] 

588 for (digest, blob) in digest_blob_pairs: 

589 status = result_map.get( 

590 digest.hash, 

591 Status(code=code_pb2.UNKNOWN, message="SQL Index: unable to determine the status of this blob") 

592 ) 

593 results.append(status) 

594 if status.code == code_pb2.OK: 

595 pairs_to_inline.append((digest, blob)) 

596 

597 with self.session() as session: 

598 self._save_digests_to_index(pairs_to_inline, session) 

599 

600 return results 

601 

602 def _bulk_read_blobs_with_fallback(self, digests: List[Digest]) -> Dict[str, BinaryIO]: 

603 hash_to_digest = {digest.hash: digest for digest in digests} 

604 results: Dict[str, BinaryIO] = {} 

605 # First fetch inlined blobs 

606 entries = self._bulk_select_digests(digests, fetch_blobs=True) 

607 for index_entry in entries: 

608 if index_entry.inline_blob is not None: 

609 results[index_entry.digest_hash] = BytesIO(index_entry.inline_blob) 

610 hash_to_digest.pop(index_entry.digest_hash) 

611 

612 # Fetch everything that wasn't inlined 

613 fetched_digests = self._storage.bulk_read_blobs(hash_to_digest.values()) 

614 

615 # Save everything that was fetched, inlining the blobs if they're small enough 

616 digest_pairs_to_save = [] 

617 for digest_hash, blob_read_head in fetched_digests.items(): 

618 if blob_read_head is not None: 

619 digest = hash_to_digest[digest_hash] 

620 blob_data = None 

621 if digest.size_bytes <= self._max_inline_blob_size: 

622 blob_data = read_and_rewind(blob_read_head) 

623 digest_pairs_to_save.append((digest, blob_data)) 

624 results[digest_hash] = blob_read_head 

625 

626 with self.session() as session: 

627 self._save_digests_to_index(digest_pairs_to_save, session) 

628 

629 return results 

630 

631 def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, BinaryIO]: 

632 if self._fallback_on_get: 

633 return self._bulk_read_blobs_with_fallback(digests) 

634 

635 # If fallback is disabled, query the index first and only 

636 # query the storage for blobs that weren't inlined there 

637 

638 hash_to_digest = {digest.hash: digest for digest in digests} # hash -> digest map 

639 results: Dict[str, BinaryIO] = {} # The final list of results (return value) 

640 digests_to_fetch = [] # Digests that need to be fetched from storage 

641 digest_pairs_to_save = [] # Digests that need to be updated in the index 

642 

643 # Fetch all of the digests in the database 

644 # Anything that wasn't already inlined needs to be fetched 

645 entries = self._bulk_select_digests(digests, fetch_blobs=True) 

646 for index_entry in entries: 

647 digest = hash_to_digest[index_entry.digest_hash] 

648 if index_entry.inline_blob is not None: 

649 results[index_entry.digest_hash] = BytesIO(index_entry.inline_blob) 

650 digest_pairs_to_save.append((digest, index_entry.inline_blob)) 

651 else: 

652 digests_to_fetch.append(digest) 

653 

654 fetched_digests = {} 

655 if digests_to_fetch: 

656 fetched_digests = self._storage.bulk_read_blobs(digests_to_fetch) 

657 

658 # Generate the list of inputs for _save_digests_to_index 

659 # 

660 # We only need to send blob data for small blobs fetched 

661 # from the storage since everything else is either too 

662 # big or already inlined 

663 for digest in digests_to_fetch: 

664 blob_data = None 

665 if (digest.size_bytes <= self._max_inline_blob_size and 

666 fetched_digests.get(digest.hash) is not None): 

667 blob_data = read_and_rewind(fetched_digests[digest.hash]) 

668 digest_pairs_to_save.append((digest, blob_data)) 

669 

670 # Update timestamps and inline blobs 

671 with self.session() as session: 

672 self._save_digests_to_index(digest_pairs_to_save, session) 

673 

674 results.update(fetched_digests) 

675 return results 

676 

677 def _column_windows(self, session: SessionType, column: Column) -> Iterator[BooleanClauseList]: 

678 """ Adapted from the sqlalchemy WindowedRangeQuery recipe. 

679 https://github.com/sqlalchemy/sqlalchemy/wiki/WindowedRangeQuery 

680 

681 This method breaks the timestamp range into windows and yields 

682 the borders of these windows to the caller. For example, the borders 

683 yielded by this might look something like 

684 ('2019-10-08 18:25:03.699863', '2019-10-08 18:25:03.751018') 

685 ('2019-10-08 18:25:03.751018', '2019-10-08 18:25:03.807867') 

686 ('2019-10-08 18:25:03.807867', '2019-10-08 18:25:03.862192') 

687 ('2019-10-08 18:25:03.862192',) 

688 

689 _windowed_lru_digests uses these borders to form WHERE clauses for its 

690 SELECTs. In doing so, we make sure to repeatedly query the database for 

691 live updates, striking a balance between loading the entire resultset 

692 into memory and querying each row individually, both of which are 

693 inefficient in the context of a large index. 

694 

695 The window size is a parameter and can be configured. A larger window 

696 size will yield better performance (fewer SQL queries) at the cost of 

697 memory (holding on to the results of the query) and accuracy (blobs 

698 may get updated while you're working on them), and vice versa for a 

699 smaller window size. 

700 """ 

701 

702 def int_for_range(start_id: Any, end_id: Any) -> BooleanClauseList: 

703 if end_id: 

704 return and_( 

705 column >= start_id, 

706 column < end_id 

707 ) 

708 else: 

709 return column >= start_id 

710 

711 q = session.query( 

712 column, 

713 func.row_number() 

714 .over(order_by=column) 

715 .label('rownum') 

716 ).from_self(column) 

717 

718 if self._all_blobs_window_size > 1: 

719 q = q.filter(text(f"rownum % {self._all_blobs_window_size}=1")) 

720 

721 intervals = [id for id, in q] 

722 

723 while intervals: 

724 start = intervals.pop(0) 

725 if intervals: 

726 end = intervals[0] 

727 else: 

728 end = None 

729 yield int_for_range(start, end) 

730 

731 def _windowed_lru_digests(self, q: Query, column: Column) -> Iterator[IndexEntry]: 

732 """ Generate a query for each window produced by _column_windows 

733 and yield the results one by one. 

734 """ 

735 for whereclause in self._column_windows(q.session, column): # type: ignore 

736 window = q.filter(whereclause).order_by(column.asc()) 

737 yield from window 

738 

739 def least_recent_digests(self) -> Iterator[Digest]: 

740 with self.session() as session: 

741 q = session.query(IndexEntry) 

742 for index_entry in self._windowed_lru_digests(q, IndexEntry.accessed_timestamp): 

743 yield Digest(hash=index_entry.digest_hash, size_bytes=index_entry.digest_size_bytes) 

744 

745 @DurationMetric(CAS_INDEX_SIZE_CALCULATION_TIME_METRIC_NAME) 

746 def get_total_size(self) -> int: 

747 statement = select(func.sum(IndexEntry.digest_size_bytes)) 

748 with self.session() as session: 

749 return session.execute(statement).scalar() 

750 

751 def mark_n_bytes_as_deleted(self, n_bytes: int, dry_run: bool=False, 

752 protect_blobs_after: Optional[datetime]=None) -> List[Digest]: 

753 # pylint: disable=singleton-comparison 

754 if protect_blobs_after is None: 

755 protect_blobs_after = datetime.utcnow() 

756 gathered = 0 

757 with self.session(exceptions_to_not_rollback_on=[StaleDataError]) as session: 

758 size_query = session.query(func.sum(IndexEntry.digest_size_bytes)) 

759 premarked_size = size_query.filter_by(deleted=True).scalar() 

760 if premarked_size is None: 

761 premarked_size = 0 

762 

763 q = session.query(IndexEntry) 

764 entries = q.filter_by(deleted=True).order_by(IndexEntry.accessed_timestamp).all() 

765 monitoring_bus = get_monitoring_bus() 

766 if monitoring_bus.is_enabled: 

767 metadata = {} 

768 if self._instance_name: 

769 metadata["instance-name"] = self._instance_name 

770 publish_counter_metric( 

771 CLEANUP_INDEX_PREMARKED_BLOBS_METRIC_NAME, 

772 len(entries), 

773 metadata 

774 ) 

775 

776 if premarked_size < n_bytes: 

777 gathered = premarked_size 

778 q = q.filter(IndexEntry.deleted == False, IndexEntry.accessed_timestamp < 

779 protect_blobs_after).with_for_update(skip_locked=True) 

780 iterator = self._windowed_lru_digests(q, IndexEntry.accessed_timestamp) 

781 while gathered < n_bytes: 

782 try: 

783 index_entry = next(iterator) 

784 except StopIteration: 

785 break 

786 gathered += index_entry.digest_size_bytes # type: ignore 

787 self.__logger.debug(f"Gathering {gathered} out of {n_bytes} bytes(max)") 

788 entries.append(index_entry) 

789 if not dry_run: 

790 index_entry.deleted = True 

791 

792 return [ 

793 Digest(hash=entry.digest_hash, size_bytes=entry.digest_size_bytes) 

794 for entry in entries 

795 ] 
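An illustrative dry-run sketch, assuming `index` is an SQLIndex; the byte count is a placeholder:

    # Preview which blobs would be reclaimed to free roughly 512 MiB, without marking them:
    candidates = index.mark_n_bytes_as_deleted(n_bytes=512 * 1024 * 1024, dry_run=True)
    # A real pass (dry_run=False) marks the rows; the returned digests can then be passed
    # to bulk_delete() below once the blobs have been removed from the backing storage.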

796 

797 def bulk_delete(self, digests: List[Digest]) -> List[str]: 

798 self.__logger.debug(f"Deleting {len(digests)} digests from the index") 

799 index_table = IndexEntry.__table__ 

800 hashes = [x.hash for x in digests] 

801 

802 # Make sure we don't exceed maximum size of an IN clause 

803 n = self._inclause_limit 

804 hash_chunks = [hashes[i:i + n] for i in range(0, len(hashes), n)] 

805 

806 # We will not raise, rollback, or log on StaleDataErrors. 

807 # These errors occur when we delete fewer rows than we were expecting. 

808 # This is fine, since the missing rows will get deleted eventually. 

809 # When running bulk_deletes concurrently, StaleDataErrors 

810 # occur too often to log. 

811 num_blobs_deleted = 0 

812 with self.session(exceptions_to_not_rollback_on=[StaleDataError]) as session: 

813 for chunk in hash_chunks: 

814 # Do not wait for locks when deleting rows. Skip locked rows to 

815 # avoid deadlocks. 

816 stmt = index_table.delete().where( 

817 index_table.c.digest_hash.in_(select( 

818 [index_table.c.digest_hash], index_table.c.digest_hash.in_(chunk) 

819 ).with_for_update(skip_locked=True)) 

820 ) 

821 num_blobs_deleted += session.execute(stmt).rowcount 

822 self.__logger.debug(f"{num_blobs_deleted}/{len(digests)} blobs deleted from the index") 

823 

824 # bulk_delete is typically expected to return the digests that were not deleted, 

825 # but delete only returns the number of rows deleted and not what was/wasn't 

826 # deleted. Getting this info would require extra queries, so assume that 

827 # everything was either deleted or already deleted. Failures will still raise exceptions. 

828 return []