Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/index/sql.py: 89.13%
423 statements
# Copyright (C) 2019 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#  <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
SQLIndex
==================

A SQL index implementation. This can be pointed to either a remote SQL server
or a local SQLite database.

"""

from contextlib import contextmanager
from datetime import datetime, timedelta
import logging
import os
import itertools
from typing import Any, BinaryIO, Dict, Iterator, List, Optional, Sequence, Tuple, Type, Union
from io import BytesIO

from alembic import command
from alembic.config import Config
from sqlalchemy import and_, create_engine, delete, event, func, select, text, Column, update
from sqlalchemy.sql.elements import BooleanClauseList
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm.query import Query
from sqlalchemy.orm.session import sessionmaker, Session as SessionType
from sqlalchemy.orm.exc import StaleDataError
from sqlalchemy.exc import SQLAlchemyError, DBAPIError

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid._protos.google.rpc import code_pb2
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid.server.sql import sqlutils
from buildgrid.server.monitoring import get_monitoring_bus
from buildgrid.server.metrics_names import (
    CAS_INDEX_BULK_SELECT_DIGEST_TIME_METRIC_NAME,
    CAS_INDEX_BULK_TIMESTAMP_UPDATE_TIME_METRIC_NAME,
    CAS_INDEX_GET_BLOB_TIME_METRIC_NAME,
    CAS_INDEX_SAVE_DIGESTS_TIME_METRIC_NAME,
    CAS_INDEX_SIZE_CALCULATION_TIME_METRIC_NAME,
    CLEANUP_INDEX_PREMARKED_BLOBS_METRIC_NAME,
)
from buildgrid.server.metrics_utils import (
    DurationMetric,
    generator_method_duration_metric,
    publish_counter_metric
)
from buildgrid.server.persistence.sql.impl import sqlite_on_connect
from buildgrid.server.persistence.sql.models import IndexEntry
from buildgrid.settings import MIN_TIME_BETWEEN_SQL_POOL_DISPOSE_MINUTES, COOLDOWN_TIME_AFTER_POOL_DISPOSE_SECONDS
from buildgrid._exceptions import DatabaseError, StorageFullError, RetriableDatabaseError
from buildgrid.utils import read_and_rewind, validate_digest_data
from ..storage_abc import StorageABC
from .index_abc import IndexABC
from .sql_dialect_delegates import PostgreSQLDelegate, SQLiteDelegate


# Each dialect has a limit on the number of bind parameters allowed. This
# matters because it determines how large we can allow our IN clauses to get.
#
# SQLite: 1000 https://www.sqlite.org/limits.html#max_variable_number
# PostgreSQL: 32767 (Int16.MAX_VALUE) https://www.postgresql.org/docs/9.4/protocol-message-formats.html
#
# We'll refer to this as the "inlimit" in the code. The inlimits are
# set to roughly 75% of the bind parameter limit of the implementation.
DIALECT_INLIMIT_MAP = {
    "postgresql": 24000,
    "sqlite": 750
}
DEFAULT_INLIMIT = 100

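# A quick sanity check on the numbers above (illustrative only): PostgreSQL's
# protocol allows 32767 bind parameters, and 32767 * 0.75 ~= 24575, which the
# map rounds down to 24000; SQLite allows 1000, and 1000 * 0.75 = 750. The
# remaining headroom presumably leaves room for any other bind parameters
# used in the same statement.
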
DIALECT_DELEGATES = {
    "postgresql": PostgreSQLDelegate,
    "sqlite": SQLiteDelegate
}

INLINE_BLOB_SIZE_HARD_MAXIMUM = 1000000000


class SQLIndex(IndexABC):

    def _get_default_inlimit(self, dialect) -> int:
        """ Map a SQL dialect to its default inlimit. """
        if dialect not in DIALECT_INLIMIT_MAP:
            self.__logger.warning(
                f"The SQL dialect [{dialect}] is unsupported, and "
                f"errors may occur. Supported dialects are {list(DIALECT_INLIMIT_MAP.keys())}. "
                f"Using default inclause limit of {DEFAULT_INLIMIT}.")
            return DEFAULT_INLIMIT

        return DIALECT_INLIMIT_MAP[dialect]

    def __init__(self, storage: StorageABC, connection_string: str,
                 automigrate: bool=False, window_size: int=1000,
                 inclause_limit: int=-1, connection_timeout: int=5,
                 max_inline_blob_size: int=0, **kwargs):
        base_argnames = ['fallback_on_get']
        base_args = {}
        for arg in base_argnames:
            if arg in kwargs:
                base_args[arg] = kwargs.pop(arg)
        super().__init__(**base_args)
        self.__logger = logging.getLogger(__name__)

        self._storage = storage
        self._instance_name = None

        if max_inline_blob_size > INLINE_BLOB_SIZE_HARD_MAXIMUM:
            raise ValueError(
                f"Max inline blob size is [{max_inline_blob_size}], "
                f"but must be at most [{INLINE_BLOB_SIZE_HARD_MAXIMUM}].")
        if max_inline_blob_size >= 0:
            self._max_inline_blob_size = max_inline_blob_size
        else:
            raise ValueError(
                f"Max inline blob size is [{max_inline_blob_size}], but must be nonnegative.")

        # Max # of rows to fetch while iterating over all blobs
        # (e.g. in least_recent_digests)
        self._all_blobs_window_size = window_size

        # Only pass known kwargs to db session
        available_options = {'pool_size', 'max_overflow', 'pool_timeout',
                             'pool_pre_ping', 'pool_recycle'}
        kwargs_keys = kwargs.keys()
        if not kwargs_keys <= available_options:
            unknown_args = kwargs_keys - available_options
            raise TypeError(f"Unknown keyword arguments: [{unknown_args}]")

        self._create_sqlalchemy_engine(
            connection_string, automigrate, connection_timeout, **kwargs)

        self._sql_pool_dispose_helper = sqlutils.SQLPoolDisposeHelper(COOLDOWN_TIME_AFTER_POOL_DISPOSE_SECONDS,
                                                                      MIN_TIME_BETWEEN_SQL_POOL_DISPOSE_MINUTES,
                                                                      self._engine)

        # Dialect-specific initialization
        dialect = self._engine.dialect.name
        self._dialect_delegate = DIALECT_DELEGATES.get(dialect)

        if inclause_limit > 0:
            if inclause_limit > window_size:
                self.__logger.warning(
                    f"Configured inclause_limit [{inclause_limit}] "
                    f"is greater than window_size [{window_size}]")
            self._inclause_limit = inclause_limit
        else:
            # If the inlimit isn't explicitly set, we use a default that
            # respects both the window size and the db implementation's
            # inlimit.
            self._inclause_limit = min(
                window_size,
                self._get_default_inlimit(dialect))
            self.__logger.debug("SQL index: using default inclause limit "
                                f"of {self._inclause_limit}")

        session_factory = sessionmaker(future=True)
        self.Session = scoped_session(session_factory)
        self.Session.configure(bind=self._engine)

        # Make a test query against the database to ensure the connection is valid
        with self.session() as session:
            session.query(IndexEntry).first()

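    # Illustrative usage sketch (not part of this module): assuming some
    # StorageABC implementation `disk_storage`, an index backed by a local
    # SQLite file could be constructed roughly like this:
    #
    #     index = SQLIndex(disk_storage, "sqlite:///cas-index.db",
    #                      automigrate=True, max_inline_blob_size=1024)
    #     index.setup_grpc()
    #
    # Blobs of 1024 bytes or fewer would then be stored inline in the index,
    # while larger blobs fall through to `disk_storage`.
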
    def _create_or_migrate_db(self, connection_string: str) -> None:
        self.__logger.warning(
            "Will attempt migration to latest version if needed.")

        config = Config()

        config.set_main_option("script_location",
                               os.path.join(
                                   os.path.dirname(__file__),
                                   "../../../persistence/sql/alembic"))

        with self._engine.begin() as connection:
            config.attributes['connection'] = connection
            command.upgrade(config, "head")

    def _create_sqlalchemy_engine(self, connection_string, automigrate, connection_timeout, **kwargs):
        self.automigrate = automigrate

        # Disallow sqlite in-memory because multi-threaded access to it is
        # complex and potentially problematic at best
        # ref: https://docs.sqlalchemy.org/en/13/dialects/sqlite.html#threading-pooling-behavior
        if sqlutils.is_sqlite_inmemory_connection_string(connection_string):
            raise ValueError("Cannot use SQLite in-memory with BuildGrid (connection_string="
                             f"[{connection_string}]). Use a file or leave the connection_string "
                             "empty for a tempfile.")

        if connection_timeout is not None:
            if "connect_args" not in kwargs:
                kwargs["connect_args"] = {}
            if sqlutils.is_sqlite_connection_string(connection_string):
                kwargs["connect_args"]["timeout"] = connection_timeout
            elif sqlutils.is_psycopg2_connection_string(connection_string):
                kwargs["connect_args"]["connect_timeout"] = connection_timeout
                # Additional Postgres-specific timeouts, passed as libpq options.
                # Note that lock_timeout is in milliseconds (hence * 1000).
                kwargs["connect_args"]["options"] = f'-c lock_timeout={connection_timeout * 1000}'

        # Only pass the (known) kwargs that have been explicitly set by the user
        available_options = set([
            'pool_size', 'max_overflow', 'pool_timeout', 'pool_pre_ping',
            'pool_recycle', 'connect_args'
        ])
        kwargs_keys = set(kwargs.keys())
        if not kwargs_keys.issubset(available_options):
            unknown_options = kwargs_keys - available_options
            raise TypeError(f"Unknown keyword arguments: [{unknown_options}]")

        self.__logger.debug(f"SQLAlchemy additional kwargs: [{kwargs}]")

        self._engine = create_engine(connection_string, echo=False, future=True, **kwargs)

        self.__logger.info(f"Using SQL backend for index at connection [{repr(self._engine.url)}] "
                           f"using additional SQL options {kwargs}")

        if self._engine.dialect.name == "sqlite":
            event.listen(self._engine, "connect", sqlite_on_connect)

        if self.automigrate:
            self._create_or_migrate_db(connection_string)

    @contextmanager
    def session(self, *, sqlite_lock_immediately: bool=False,
                exceptions_to_not_raise_on: Optional[List[Type[SQLAlchemyError]]]=None,
                exceptions_to_not_rollback_on: Optional[List[Type[SQLAlchemyError]]]=None) -> Any:
        """ Context manager for convenient use of sessions. Automatically
        commits when the context ends and rolls back failed transactions.

        Setting sqlite_lock_immediately will only yield a session once the
        SQLite database has been locked for exclusive use.

        exceptions_to_not_raise_on is a list of exceptions which the session will not re-raise on.
        However, the session will still rollback on these errors.

        exceptions_to_not_rollback_on is a list of exceptions which the session manager will not
        re-raise or rollback on. Be very careful when specifying this option. It should only be used
        in cases where you know the exception will not put the database in a bad state.
        """
        # If we recently disposed of the SQL pool due to connection issues
        # allow for some cooldown period before we attempt more SQL
        self._sql_pool_dispose_helper.wait_if_cooldown_in_effect()

        # Try to obtain a session
        try:
            session = self.Session()
            if sqlite_lock_immediately and session.bind.name == "sqlite":  # type: ignore
                session.execute(text("BEGIN IMMEDIATE"))
        except Exception as e:
            self.__logger.error("Unable to obtain an Index database session.", exc_info=True)
            raise DatabaseError("Unable to obtain an Index database session.") from e

        # Yield the session and catch exceptions that occur while using it
        # to roll-back if needed
        try:
            yield session
            session.commit()
        except Exception as e:
            transient_dberr = self._sql_pool_dispose_helper.check_dispose_pool(session, e)
            if exceptions_to_not_rollback_on and type(e) in exceptions_to_not_rollback_on:
                pass
            else:
                session.rollback()
                if transient_dberr:
                    self.__logger.warning("Rolling back Index database session due to transient database error.",
                                          exc_info=True)
                else:
                    self.__logger.error("Error committing Index database session. Rolling back.", exc_info=True)
                if exceptions_to_not_raise_on is None or type(e) not in exceptions_to_not_raise_on:
                    if transient_dberr:
                        raise RetriableDatabaseError(
                            "Database connection was temporarily interrupted, please retry",
                            timedelta(seconds=COOLDOWN_TIME_AFTER_POOL_DISPOSE_SECONDS)) from e
                    raise
        finally:
            session.close()

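    # A minimal usage sketch of this context manager (illustrative; see the
    # methods below for the real call sites):
    #
    #     with self.session(exceptions_to_not_rollback_on=[StaleDataError]) as session:
    #         session.execute(stmt)
    #
    # The commit happens when the block exits normally; any other exception
    # triggers a rollback and is re-raised (or converted to a
    # RetriableDatabaseError for transient connection problems).
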
    def setup_grpc(self):
        self._storage.setup_grpc()

    def has_blob(self, digest: Digest) -> bool:
        with self.session() as session:
            statement = select(func.count(IndexEntry.digest_hash)).where(
                IndexEntry.digest_hash == digest.hash
            )

            num_entries = session.execute(statement).scalar()
            if num_entries == 1:
                return True
            elif num_entries < 1:
                return False
            else:
                raise RuntimeError(
                    f"Multiple results found for blob [{digest}]. "
                    "The index is in a bad state.")

    def _fetch_one_entry(self, digest: Digest, session: SessionType) -> Optional[IndexEntry]:
        """ Fetch an IndexEntry by its corresponding digest. Returns None if not found. """
        statement = select(IndexEntry).where(
            IndexEntry.digest_hash == digest.hash
        )

        result = session.execute(statement)
        return result.scalar()  # Raises MultipleResultsFound if multiple rows found

    def _backfill_digest_and_return_reader(self, digest: Digest, session: SessionType) -> Optional[BinaryIO]:
        blob_read_head = self._storage.get_blob(digest)
        if blob_read_head:
            blob_data = read_and_rewind(blob_read_head)
            self._save_digests_to_index([(digest, blob_data)], session)
            return blob_read_head
        else:
            self.__logger.warning(f"Unable to find blob [{digest.hash}] in storage")
            return None

    def _get_small_blob(self, digest: Digest) -> Optional[BinaryIO]:
        blob_read_head = None
        with self.session() as session:
            entry = self._fetch_one_entry(digest, session)

            if entry is None:
                # No blob was found
                # If fallback is enabled, try to fetch the blob and add it to the index
                if self._fallback_on_get:
                    return self._backfill_digest_and_return_reader(digest, session)

                return None
            else:
                # Found the entry, now try to find the blob inline
                self._update_blob_timestamp(digest, session)

                if entry.inline_blob is not None:
                    return BytesIO(entry.inline_blob)
                else:
                    blob_read_head = self._backfill_digest_and_return_reader(digest, session)
                    if blob_read_head is None:
                        self.__logger.warning(f"Blob [{digest.hash}] was indexed but not in storage")

        return blob_read_head

    def _update_blob_timestamp(self, digest: Digest, session: SessionType,
                               sync_mode: Union[bool, str]=False) -> int:
        """ Refresh a blob's timestamp and return the number of rows updated """
        statement = update(IndexEntry).where(
            IndexEntry.digest_hash == digest.hash
        ).values(accessed_timestamp=datetime.utcnow())

        options = {"synchronize_session": sync_mode}
        num_rows_updated = session.execute(statement, execution_options=options).rowcount  # type: ignore
        return num_rows_updated

    def _get_large_blob(self, digest: Digest) -> Optional[BinaryIO]:
        with self.session() as session:
            num_rows_updated = self._update_blob_timestamp(digest, session)

        if num_rows_updated > 1:
            raise RuntimeError(
                f"Multiple rows found for blob [{digest.hash}]. "
                "The index is in a bad state.")

        if num_rows_updated == 0:
            # If fallback is enabled, try to fetch the blob and add it
            # to the index
            if self._fallback_on_get:
                blob = self._storage.get_blob(digest)
                if blob:
                    with self.session() as session:
                        self._save_digests_to_index([(digest, None)], session)
                return blob
            else:
                # Otherwise just skip the storage entirely
                return None
        else:
            # If exactly one row in the index was updated, grab the blob
            blob = self._storage.get_blob(digest)
            if not blob:
                self.__logger.warning(f"Unable to find blob [{digest.hash}] in storage")
            return blob

    @DurationMetric(CAS_INDEX_GET_BLOB_TIME_METRIC_NAME, instanced=True)
    def get_blob(self, digest: Digest) -> Optional[BinaryIO]:
        if digest.size_bytes <= self._max_inline_blob_size:
            return self._get_small_blob(digest)
        else:
            return self._get_large_blob(digest)

    def delete_blob(self, digest: Digest) -> None:
        statement = delete(IndexEntry).where(
            IndexEntry.digest_hash == digest.hash
        )
        options = {"synchronize_session": False}

        with self.session() as session:
            session.execute(statement, execution_options=options)

        self._storage.delete_blob(digest)

    def begin_write(self, digest: Digest) -> BinaryIO:
        if digest.size_bytes <= self._max_inline_blob_size:
            return BytesIO()
        else:
            return self._storage.begin_write(digest)

    def commit_write(self, digest: Digest, write_session: BinaryIO) -> None:
        # pylint: disable=no-name-in-module,import-outside-toplevel
        inline_blob = None
        if digest.size_bytes > self._max_inline_blob_size:
            self._storage.commit_write(digest, write_session)
        else:
            write_session.seek(0)
            inline_blob = write_session.read()
        try:
            with self.session() as session:
                self._save_digests_to_index([(digest, inline_blob)], session)
        except DBAPIError as error:
            # Error has pgcode attribute (Postgres only)
            if hasattr(error.orig, 'pgcode'):
                # imported here to avoid global dependency on psycopg2
                from psycopg2.errors import DiskFull, OutOfMemory  # type: ignore
                # 53100 == DiskFull && 53200 == OutOfMemory
                if isinstance(error.orig, (DiskFull, OutOfMemory)):
                    raise StorageFullError(f"Postgres Error: {error.orig.pgcode}") from error
            raise error

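    # Write-path summary (illustrative): for a digest whose size_bytes is at
    # or below _max_inline_blob_size, begin_write() hands back an in-memory
    # BytesIO and commit_write() stores its contents in the IndexEntry's
    # inline_blob column; for anything larger, both calls are forwarded to the
    # backing storage and only the metadata/timestamp row is kept here.
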
    def _partitioned_hashes(self, digests: Sequence[Digest]) -> Iterator[Iterator[str]]:
        """ Given a long list of digests, split it into parts no larger than
        _inclause_limit and yield the hashes in each part.
        """
        for part_start in range(0, len(digests), self._inclause_limit):
            part_end = min(len(digests), part_start + self._inclause_limit)
            part_digests = itertools.islice(digests, part_start, part_end)
            yield map(lambda digest: digest.hash, part_digests)

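    # For example (illustrative), with self._inclause_limit == 2 and five
    # digests d1..d5, this yields three iterators of hashes:
    # (d1.hash, d2.hash), (d3.hash, d4.hash), (d5.hash,).
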
    @generator_method_duration_metric(CAS_INDEX_BULK_SELECT_DIGEST_TIME_METRIC_NAME)
    def _bulk_select_digests(self, digests: Sequence[Digest], fetch_blobs: bool = False) -> Iterator[IndexEntry]:
        """ Generator that selects all rows matching a digest list.

        SQLAlchemy Core is used for this because the ORM has problems with
        large numbers of bind variables for WHERE IN clauses.

        We only select on the digest hash (not hash and size) to allow for
        index-only queries on db backends that support them.
        """
        index_table = IndexEntry.__table__
        with self.session() as session:
            columns = [index_table.c.digest_hash]
            if fetch_blobs:
                columns.append(index_table.c.inline_blob)
            for part in self._partitioned_hashes(digests):
                stmt = select(
                    columns
                ).where(
                    index_table.c.digest_hash.in_(part)
                )
                entries = session.execute(stmt)
                yield from entries

    @DurationMetric(CAS_INDEX_BULK_TIMESTAMP_UPDATE_TIME_METRIC_NAME, instanced=True)
    def _bulk_refresh_timestamps(self, digests: Sequence[Digest], update_time: Optional[datetime]=None):
        """ Refresh all timestamps of the input digests.

        SQLAlchemy Core is used for this because the ORM is not suitable for
        bulk inserts and updates.

        https://docs.sqlalchemy.org/en/13/faq/performance.html#i-m-inserting-400-000-rows-with-the-orm-and-it-s-really-slow
        """
        index_table = IndexEntry.__table__
        # If a timestamp was passed in, use it
        if update_time:
            timestamp = update_time
        else:
            timestamp = datetime.utcnow()
        with self.session() as session:
            for part in self._partitioned_hashes(digests):
                stmt = index_table.update().where(
                    index_table.c.digest_hash.in_(
                        select([index_table.c.digest_hash], index_table.c.digest_hash.in_(part)
                               ).with_for_update(skip_locked=True))
                ).values(
                    accessed_timestamp=timestamp
                )
                session.execute(stmt)

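    # The statement built above corresponds roughly to the following SQL
    # (illustrative sketch; table and parameter names are simplified and the
    # exact rendering depends on the dialect):
    #
    #     UPDATE index_table
    #     SET accessed_timestamp = :timestamp
    #     WHERE digest_hash IN (
    #         SELECT digest_hash FROM index_table
    #         WHERE digest_hash IN (:hash_1, ..., :hash_n)
    #         FOR UPDATE SKIP LOCKED)
    #
    # SKIP LOCKED lets concurrent refreshes skip rows that another
    # transaction already holds locked rather than blocking on them.
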
    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
        entries = self._bulk_select_digests(digests)

        found_hashes = {entry.digest_hash for entry in entries}

        # Update all timestamps
        self._bulk_refresh_timestamps(digests)

        return [digest for digest in digests if digest.hash not in found_hashes]

    @DurationMetric(CAS_INDEX_SAVE_DIGESTS_TIME_METRIC_NAME, instanced=True)
    def _save_digests_to_index(self,
                               digest_blob_pairs: List[Tuple[Digest, Optional[bytes]]],
                               session: SessionType) -> None:
        """ Helper to persist a list of digest/blob pairs to the index.

        Any digests present are updated, and new digests are inserted along with their inline blobs (if provided).
        Only blobs with size less than or equal to the max_inline_blob_size are inserted directly into the index.
        """
        if not digest_blob_pairs:
            return

        if self._dialect_delegate:
            try:
                self._dialect_delegate._save_digests_to_index(  # type: ignore
                    digest_blob_pairs, session, self._max_inline_blob_size)
                return
            except AttributeError:
                pass

        update_time = datetime.utcnow()
        # Figure out which digests we can just update
        digests = [digest for (digest, blob) in digest_blob_pairs]
        entries = self._bulk_select_digests(digests)
        # Map digests to new entries
        entries_not_present = {
            digest.hash: {
                'digest_hash': digest.hash,
                'digest_size_bytes': digest.size_bytes,
                'accessed_timestamp': update_time,
                'inline_blob': (blob if digest.size_bytes <= self._max_inline_blob_size else None)
            }
            for (digest, blob) in digest_blob_pairs
        }

        entries_present = {}
        for entry in entries:
            entries_present[entry.digest_hash] = entries_not_present[entry.digest_hash]
            del entries_not_present[entry.digest_hash]

        if entries_not_present:
            session.bulk_insert_mappings(IndexEntry, entries_not_present.values())  # type: ignore
        if entries_present:
            session.bulk_update_mappings(IndexEntry, entries_present.values())  # type: ignore

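    # Illustrative example: given pairs [(d1, b"abc"), (d2, None)] where d1 is
    # not yet indexed and d2 already has a row, d1 becomes a bulk-inserted
    # mapping (with inline_blob=b"abc" if it fits under the inline limit) and
    # d2 becomes a bulk-updated mapping that refreshes its accessed_timestamp.
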
    def bulk_update_blobs(self, digest_blob_pairs: List[Tuple[Digest, bytes]]) -> List[Status]:
        """ Implement the StorageABC's bulk_update_blobs method.

        The StorageABC interface takes in a list of digest/blob pairs and
        returns a list of results. The list of results MUST be ordered to
        correspond with the order of the input list. """
        pairs_to_store = []
        result_map = {}

        # For each blob, determine whether to store it in the backing storage or inline it
        for (digest, blob) in digest_blob_pairs:
            if digest.size_bytes > self._max_inline_blob_size:
                pairs_to_store.append((digest, blob))
            else:
                if validate_digest_data(digest, blob):
                    result_map[digest.hash] = Status(code=code_pb2.OK)
                else:
                    result_map[digest.hash] = Status(
                        code=code_pb2.INVALID_ARGUMENT,
                        message="Data doesn't match hash"
                    )
        backup_results = self._storage.bulk_update_blobs(pairs_to_store)

        for (digest, blob), result in zip(pairs_to_store, backup_results):
            if digest.hash in result_map:
                # ERROR: blob was both inlined and backed up
                raise RuntimeError(
                    "Blob was both inlined and backed up.")
            result_map[digest.hash] = result

        # Generate the final list of results
        pairs_to_inline = []
        results = []
        for (digest, blob) in digest_blob_pairs:
            status = result_map.get(
                digest.hash,
                Status(code=code_pb2.UNKNOWN, message="SQL Index: unable to determine the status of this blob")
            )
            results.append(status)
            if status.code == code_pb2.OK:
                pairs_to_inline.append((digest, blob))

        with self.session() as session:
            self._save_digests_to_index(pairs_to_inline, session)

        return results

    def _bulk_read_blobs_with_fallback(self, digests: List[Digest]) -> Dict[str, BinaryIO]:
        hash_to_digest = {digest.hash: digest for digest in digests}
        results: Dict[str, BinaryIO] = {}
        # First fetch inlined blobs
        entries = self._bulk_select_digests(digests, fetch_blobs=True)
        for index_entry in entries:
            if index_entry.inline_blob is not None:
                results[index_entry.digest_hash] = BytesIO(index_entry.inline_blob)
                hash_to_digest.pop(index_entry.digest_hash)

        # Fetch everything that wasn't inlined
        fetched_digests = self._storage.bulk_read_blobs(hash_to_digest.values())

        # Save everything that was fetched, inlining the blobs if they're small enough
        digest_pairs_to_save = []
        for digest_hash, blob_read_head in fetched_digests.items():
            if blob_read_head is not None:
                digest = hash_to_digest[digest_hash]
                blob_data = None
                if digest.size_bytes <= self._max_inline_blob_size:
                    blob_data = read_and_rewind(blob_read_head)
                digest_pairs_to_save.append((digest, blob_data))
                results[digest_hash] = blob_read_head

        with self.session() as session:
            self._save_digests_to_index(digest_pairs_to_save, session)

        return results

    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, BinaryIO]:
        if self._fallback_on_get:
            return self._bulk_read_blobs_with_fallback(digests)

        # If fallback is disabled, query the index first and only
        # query the storage for blobs that weren't inlined there

        hash_to_digest = {digest.hash: digest for digest in digests}  # hash -> digest map
        results: Dict[str, BinaryIO] = {}  # The final list of results (return value)
        digests_to_fetch = []  # Digests that need to be fetched from storage
        digest_pairs_to_save = []  # Digests that need to be updated in the index

        # Fetch all of the digests in the database
        # Anything that wasn't already inlined needs to be fetched
        entries = self._bulk_select_digests(digests, fetch_blobs=True)
        for index_entry in entries:
            digest = hash_to_digest[index_entry.digest_hash]
            if index_entry.inline_blob is not None:
                results[index_entry.digest_hash] = BytesIO(index_entry.inline_blob)
                digest_pairs_to_save.append((digest, index_entry.inline_blob))
            else:
                digests_to_fetch.append(digest)

        fetched_digests = {}
        if digests_to_fetch:
            fetched_digests = self._storage.bulk_read_blobs(digests_to_fetch)

        # Generate the list of inputs for _save_digests_to_index
        #
        # We only need to send blob data for small blobs fetched
        # from the storage since everything else is either too
        # big or already inlined
        for digest in digests_to_fetch:
            blob_data = None
            if (digest.size_bytes <= self._max_inline_blob_size and
                    fetched_digests.get(digest.hash) is not None):
                blob_data = read_and_rewind(fetched_digests[digest.hash])
            digest_pairs_to_save.append((digest, blob_data))

        # Update timestamps and inline blobs
        with self.session() as session:
            self._save_digests_to_index(digest_pairs_to_save, session)

        results.update(fetched_digests)
        return results

    def _column_windows(self, session: SessionType, column: Column) -> Iterator[BooleanClauseList]:
        """ Adapted from the sqlalchemy WindowedRangeQuery recipe.
        https://github.com/sqlalchemy/sqlalchemy/wiki/WindowedRangeQuery

        This method breaks the timestamp range into windows and yields
        the borders of these windows to the caller. For example, the borders
        yielded by this might look something like
        ('2019-10-08 18:25:03.699863', '2019-10-08 18:25:03.751018')
        ('2019-10-08 18:25:03.751018', '2019-10-08 18:25:03.807867')
        ('2019-10-08 18:25:03.807867', '2019-10-08 18:25:03.862192')
        ('2019-10-08 18:25:03.862192',)

        _windowed_lru_digests uses these borders to form WHERE clauses for its
        SELECTs. In doing so, we make sure to repeatedly query the database for
        live updates, striking a balance between loading the entire resultset
        into memory and querying each row individually, both of which are
        inefficient in the context of a large index.

        The window size is a parameter and can be configured. A larger window
        size will yield better performance (fewer SQL queries) at the cost of
        memory (holding on to the results of the query) and accuracy (blobs
        may get updated while you're working on them), and vice versa for a
        smaller window size.
        """

        def int_for_range(start_id: Any, end_id: Any) -> BooleanClauseList:
            if end_id:
                return and_(
                    column >= start_id,
                    column < end_id
                )
            else:
                return column >= start_id

        q = session.query(
            column,
            func.row_number()
                .over(order_by=column)
                .label('rownum')
        ).from_self(column)

        if self._all_blobs_window_size > 1:
            q = q.filter(text(f"rownum % {self._all_blobs_window_size}=1"))

        intervals = [id for id, in q]

        while intervals:
            start = intervals.pop(0)
            if intervals:
                end = intervals[0]
            else:
                end = None
            yield int_for_range(start, end)

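    # Each yielded clause is later applied as a filter, so a window pair like
    # ('2019-10-08 18:25:03.699863', '2019-10-08 18:25:03.751018') becomes,
    # roughly, "accessed_timestamp >= :start AND accessed_timestamp < :end"
    # when _windowed_lru_digests runs its per-window query (illustrative).
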
    def _windowed_lru_digests(self, q: Query, column: Column) -> Iterator[IndexEntry]:
        """ Generate a query for each window produced by _column_windows
        and yield the results one by one.
        """
        for whereclause in self._column_windows(q.session, column):  # type: ignore
            window = q.filter(whereclause).order_by(column.asc())
            yield from window

    def least_recent_digests(self) -> Iterator[Digest]:
        with self.session() as session:
            q = session.query(IndexEntry)
            for index_entry in self._windowed_lru_digests(q, IndexEntry.accessed_timestamp):
                yield Digest(hash=index_entry.digest_hash, size_bytes=index_entry.digest_size_bytes)

    @DurationMetric(CAS_INDEX_SIZE_CALCULATION_TIME_METRIC_NAME)
    def get_total_size(self) -> int:
        statement = select(func.sum(IndexEntry.digest_size_bytes))
        with self.session() as session:
            return session.execute(statement).scalar()

    def mark_n_bytes_as_deleted(self, n_bytes: int, dry_run: bool=False,
                                protect_blobs_after: Optional[datetime]=None) -> List[Digest]:
        # pylint: disable=singleton-comparison
        if protect_blobs_after is None:
            protect_blobs_after = datetime.utcnow()
        gathered = 0
        with self.session(exceptions_to_not_rollback_on=[StaleDataError]) as session:
            size_query = session.query(func.sum(IndexEntry.digest_size_bytes))
            premarked_size = size_query.filter_by(deleted=True).scalar()
            if premarked_size is None:
                premarked_size = 0

            q = session.query(IndexEntry)
            entries = q.filter_by(deleted=True).order_by(IndexEntry.accessed_timestamp).all()
            monitoring_bus = get_monitoring_bus()
            if monitoring_bus.is_enabled:
                metadata = {}
                if self._instance_name:
                    metadata["instance-name"] = self._instance_name
                publish_counter_metric(
                    CLEANUP_INDEX_PREMARKED_BLOBS_METRIC_NAME,
                    len(entries),
                    metadata
                )

            if premarked_size < n_bytes:
                gathered = premarked_size
                q = q.filter(IndexEntry.deleted == False, IndexEntry.accessed_timestamp <
                             protect_blobs_after).with_for_update(skip_locked=True)
                iterator = self._windowed_lru_digests(q, IndexEntry.accessed_timestamp)
                while gathered < n_bytes:
                    try:
                        index_entry = next(iterator)
                    except StopIteration:
                        break
                    gathered += index_entry.digest_size_bytes  # type: ignore
                    self.__logger.debug(f"Gathering {gathered} out of {n_bytes} bytes(max)")
                    entries.append(index_entry)
                    if not dry_run:
                        index_entry.deleted = True

            return [
                Digest(hash=entry.digest_hash, size_bytes=entry.digest_size_bytes)
                for entry in entries
            ]

    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        self.__logger.debug(f"Deleting {len(digests)} digests from the index")
        index_table = IndexEntry.__table__
        hashes = [x.hash for x in digests]

        # Make sure we don't exceed maximum size of an IN clause
        n = self._inclause_limit
        hash_chunks = [hashes[i:i + n] for i in range(0, len(hashes), n)]

        # We will not raise, rollback, or log on StaleDataErrors.
        # These errors occur when we delete fewer rows than we were expecting.
        # This is fine, since the missing rows will get deleted eventually.
        # When running bulk_deletes concurrently, StaleDataErrors
        # occur too often to log.
        num_blobs_deleted = 0
        with self.session(exceptions_to_not_rollback_on=[StaleDataError]) as session:
            for chunk in hash_chunks:
                # Do not wait for locks when deleting rows. Skip locked rows to
                # avoid deadlocks.
                stmt = index_table.delete().where(
                    index_table.c.digest_hash.in_(select(
                        [index_table.c.digest_hash], index_table.c.digest_hash.in_(chunk)
                    ).with_for_update(skip_locked=True))
                )
                num_blobs_deleted += session.execute(stmt).rowcount
        self.__logger.debug(f"{num_blobs_deleted}/{len(digests)} blobs deleted from the index")

        # bulk_delete is typically expected to return the digests that were not deleted,
        # but delete only returns the number of rows deleted and not what was/wasn't
        # deleted. Getting this info would require extra queries, so assume that
        # everything was either deleted or already deleted. Failures will continue to throw
        return []