Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/index/sql.py: 92.97%
427 statements
coverage.py v7.4.1, created at 2025-05-21 15:45 +0000
1# Copyright (C) 2019 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16"""
17SQLIndex
18==================
20A SQL index implementation. This can be pointed to either a remote SQL server
21or a local SQLite database.
23"""
24import io
25import itertools
26import time
27from collections import deque
28from datetime import datetime, timedelta
29from typing import IO, Any, AnyStr, Deque, Iterator, Sequence, cast
31from sqlalchemy import ColumnElement, Table, and_, delete, func, not_, select
32from sqlalchemy.exc import DBAPIError
33from sqlalchemy.orm import InstrumentedAttribute, Session, load_only
34from sqlalchemy.orm.exc import StaleDataError
35from sqlalchemy.orm.query import Query
36from sqlalchemy.orm.session import Session as SessionType
38from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
39from buildgrid._protos.google.rpc import code_pb2
40from buildgrid._protos.google.rpc.status_pb2 import Status
41from buildgrid.server.decorators import timed
42from buildgrid.server.exceptions import StorageFullError
43from buildgrid.server.logging import buildgrid_logger
44from buildgrid.server.metrics_names import METRIC
45from buildgrid.server.metrics_utils import publish_counter_metric, publish_gauge_metric, publish_timer_metric
46from buildgrid.server.sql.models import IndexEntry
47from buildgrid.server.sql.provider import SqlProvider
48from buildgrid.server.utils.digests import validate_digest_data
50from ..storage_abc import StorageABC
51from .index_abc import IndexABC
52from .sql_dialect_delegates import PostgreSQLDelegate, SQLiteDelegate
54LOGGER = buildgrid_logger(__name__)
55DIALECT_DELEGATES = {"postgresql": PostgreSQLDelegate, "sqlite": SQLiteDelegate}
57INLINE_BLOB_SIZE_HARD_MAXIMUM = 1000000000
60def read_and_rewind(read_head: IO[AnyStr]) -> AnyStr | None:
61 """Reads from an IO object and returns the data found there
62 after rewinding the object to the beginning.
64 Args:
65 read_head (IO): readable IO head
67 Returns:
68 AnyStr | None: the content read from `read_head`, or None if no readable object was given.
69 """
70 if not read_head:
71 return None
73 data = read_head.read()
74 read_head.seek(0)
75 return data
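# Illustrative sketch (not part of this module): a hypothetical usage of read_and_rewind,
# showing that the full contents are returned and the stream is left back at position 0 so a
# later consumer can read the same bytes again.
def _example_read_and_rewind_usage() -> None:
    head = io.BytesIO(b"blob-bytes")
    data = read_and_rewind(head)
    assert data == b"blob-bytes"
    assert head.read() == b"blob-bytes"  # the stream was rewound, so a second read sees the same bytes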
78class SQLIndex(IndexABC):
79 TYPE = "SQLIndex"
81 def __init__(
82 self,
83 sql_provider: SqlProvider,
84 storage: StorageABC,
85 *,
86 window_size: int = 1000,
87 inclause_limit: int = -1,
88 max_inline_blob_size: int = 0,
89 refresh_accesstime_older_than: int = 0,
90 **kwargs: Any,
91 ) -> None:
92 base_argnames = ["fallback_on_get"]
93 base_args = {}
94 for arg in base_argnames:
95 if arg in kwargs:
96 base_args[arg] = kwargs.pop(arg)
97 super().__init__(**base_args)
99 self._sql = sql_provider
100 self._storage = storage
102 if max_inline_blob_size > INLINE_BLOB_SIZE_HARD_MAXIMUM:
103 raise ValueError(
104 f"Max inline blob size is [{max_inline_blob_size}], "
105 f"but must be less than [{INLINE_BLOB_SIZE_HARD_MAXIMUM}]."
106 )
107 if max_inline_blob_size >= 0:
108 self._max_inline_blob_size = max_inline_blob_size
109 else:
110 raise ValueError(f"Max inline blob size is [{max_inline_blob_size}], but must be nonnegative.")
112 if refresh_accesstime_older_than >= 0:
113 # Measured in seconds. Helps reduce the frequency of timestamp updates under heavy read load.
114 self.refresh_accesstime_older_than = refresh_accesstime_older_than
115 else:
116 raise ValueError(
117 f"'refresh_accesstime_older_than' must be nonnegative. It is {refresh_accesstime_older_than}"
118 )
120 # Max # of rows to fetch while iterating over all blobs
121 # (e.g. in least_recent_digests)
122 self._all_blobs_window_size = window_size
124 # This variable stores the list of whereclauses (SQLAlchemy BooleanClauseList objects)
125 # generated from the _column_windows() using time-expensive SQL query.
126 # These whereclauses are used to construct the final SQL query
127 # during cleanup in order to fetch blobs by time windows.
128 #
129 # Inside _column_windows() a list of timestamp borders is obtained:
130 # intervals = [t1, t2, t3, ...]
131 # The generated whereclauses can then be read semantically as, for example:
132 # self._queue_of_whereclauses = [
133 # "WHERE t1 <= IndexEntry.accessed_timestamp < t2",
134 # "WHERE t2 <= IndexEntry.accessed_timestamp < t3",
135 # "WHERE t3 <= IndexEntry.accessed_timestamp < t4",
136 # ... and so on]
137 # Note the number of entries in each window is determined by
138 # the instance variable "_all_blobs_window_size".
139 self._queue_of_whereclauses: Deque[ColumnElement[bool]] = deque()
141 # Whether entries with deleted=True should be considered by delete_n_bytes.
142 # This is useful to catch any half-finished deletes where a cleanup process
143 # may have exited half-way through deletion. Once all premarked blobs have been
144 # deleted this becomes False, and it is only reset after a full scan of the database.
145 self._delete_premarked_blobs: bool = True
147 # Only pass known kwargs to db session
148 available_options = {"pool_size", "max_overflow", "pool_timeout", "pool_pre_ping", "pool_recycle"}
149 kwargs_keys = kwargs.keys()
150 if not kwargs_keys <= available_options:
151 unknown_args = kwargs_keys - available_options
152 raise TypeError(f"Unknown keyword arguments: [{unknown_args}]")
154 # Dialect-specific initialization
155 self._dialect_delegate = DIALECT_DELEGATES.get(self._sql.dialect)
157 if inclause_limit > 0:
158 if inclause_limit > window_size:
159 LOGGER.warning(
160 "Configured inclause limit is greater than window size.",
161 tags=dict(inclause_limit=inclause_limit, window_size=window_size),
162 )
163 self._inclause_limit = inclause_limit
164 else:
165 # If the inlimit isn't explicitly set, we use a default that
166 # respects both the window size and the db implementation's
167 # inlimit.
168 self._inclause_limit = min(window_size, self._sql.default_inlimit)
169 LOGGER.debug("SQL index: using default inclause limit.", tags=dict(inclause_limit=self._inclause_limit))
171 # Make a test query against the database to ensure the connection is valid
172 with self._sql.scoped_session() as session:
173 session.query(IndexEntry).first()
174 self._sql.remove_scoped_session()
176 def start(self) -> None:
177 self._storage.start()
179 def stop(self) -> None:
180 self._storage.stop()
182 @timed(METRIC.STORAGE.STAT_DURATION, type=TYPE)
183 def has_blob(self, digest: Digest) -> bool:
184 with self._sql.scoped_session() as session:
185 statement = select(func.count(IndexEntry.digest_hash)).where(IndexEntry.digest_hash == digest.hash)
187 num_entries = session.execute(statement).scalar()
188 if num_entries is None:
189 num_entries = 0
191 if num_entries == 1:
192 return True
193 elif num_entries < 1:
194 return False
195 else:
196 raise RuntimeError(f"Multiple results found for blob [{digest}]. The index is in a bad state.")
198 @timed(METRIC.STORAGE.READ_DURATION, type=TYPE)
199 def get_blob(self, digest: Digest) -> IO[bytes] | None:
200 """Get a blob from the index or the backing storage. Optionally fallback and repair index"""
202 # Check the index for the blob and return if found.
203 with self._sql.scoped_session() as session:
204 if entry := session.query(IndexEntry).filter(IndexEntry.digest_hash == digest.hash).first():
205 if entry.inline_blob is not None:
206 return io.BytesIO(entry.inline_blob)
207 elif blob := self._storage.get_blob(digest):
208 # Fix any blobs that should have been inlined.
209 if digest.size_bytes <= self._max_inline_blob_size:
210 self._save_digests_to_index([(digest, read_and_rewind(blob))], session)
211 session.commit()
212 return blob
213 LOGGER.warning(
214 "Blob was indexed but not in storage. Deleting from the index.", tags=dict(digest=digest)
215 )
216 self._bulk_delete_from_index([digest], session)
218 # Check the storage for the blob and repair the index if found.
219 if self._fallback_on_get:
220 if blob := self._storage.get_blob(digest):
221 with self._sql.scoped_session() as session:
222 if digest.size_bytes <= self._max_inline_blob_size:
223 self._save_digests_to_index([(digest, read_and_rewind(blob))], session)
224 else:
225 self._save_digests_to_index([(digest, None)], session)
226 session.commit()
227 return blob
229 # Blob was not found in index or storage
230 return None
232 @timed(METRIC.STORAGE.DELETE_DURATION, type=TYPE)
233 def delete_blob(self, digest: Digest) -> None:
234 statement = delete(IndexEntry).where(IndexEntry.digest_hash == digest.hash)
235 options = {"synchronize_session": False}
237 with self._sql.scoped_session() as session:
238 session.execute(statement, execution_options=options)
240 self._storage.delete_blob(digest)
242 @timed(METRIC.STORAGE.WRITE_DURATION, type=TYPE)
243 def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
244 inline_blob = None
245 if digest.size_bytes > self._max_inline_blob_size:
246 self._storage.commit_write(digest, write_session)
247 else:
248 write_session.seek(0)
249 inline_blob = write_session.read()
250 try:
251 with self._sql.scoped_session() as session:
252 self._save_digests_to_index([(digest, inline_blob)], session)
253 except DBAPIError as error:
254 # Error has pgcode attribute (Postgres only)
255 if hasattr(error.orig, "pgcode"):
256 # imported here to avoid global dependency on psycopg2
257 from psycopg2.errors import DiskFull, Error, OutOfMemory
259 # 53100 == DiskFull && 53200 == OutOfMemory
260 original_exception = cast(Error, error.orig)
261 if isinstance(original_exception, (DiskFull, OutOfMemory)):
262 raise StorageFullError(
263 f"Postgres Error: {original_exception.pgerror} ({original_exception.pgcode}"
264 ) from error
265 raise error
267 def _partitioned_hashes(self, digests: Sequence[Digest]) -> Iterator[Iterator[str]]:
268 """Given a long list of digests, split it into parts no larger than
269 _inclause_limit and yield the hashes in each part.
270 """
271 for part_start in range(0, len(digests), self._inclause_limit):
272 part_end = min(len(digests), part_start + self._inclause_limit)
273 part_digests = itertools.islice(digests, part_start, part_end)
274 yield map(lambda digest: digest.hash, part_digests)
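# Illustrative sketch (not part of this module): the same chunking idea with plain lists --
# split the hashes so that no chunk is larger than `limit`, keeping every IN clause within the
# backend's limit. The helper name is hypothetical.
def _example_partition_hashes(hashes: list[str], limit: int) -> list[list[str]]:
    # e.g. _example_partition_hashes(["a", "b", "c", "d", "e"], 2) -> [["a", "b"], ["c", "d"], ["e"]]
    return [hashes[start : start + limit] for start in range(0, len(hashes), limit)]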
276 def _bulk_select_digests(
277 self, digests: Sequence[Digest], fetch_blobs: bool = False, fetch_deleted: bool = True
278 ) -> Iterator[IndexEntry]:
279 """Generator that selects all rows matching a digest list.
281 SQLAlchemy Core is used for this because the ORM has problems with
282 large numbers of bind variables for WHERE IN clauses.
284 We only select on the digest hash (not hash and size) to allow for
285 index-only queries on db backends that support them.
286 """
287 index_table = IndexEntry.__table__
288 with self._sql.scoped_session() as session:
289 columns = [index_table.c.digest_hash]
290 if fetch_blobs:
291 columns.append(index_table.c.inline_blob)
292 for part in self._partitioned_hashes(digests):
293 stmt = select(*columns).where(index_table.c.digest_hash.in_(part))
294 if not fetch_deleted:
295 stmt = stmt.where(not_(index_table.c.deleted))
296 entries = session.execute(stmt)
297 yield from entries # type: ignore
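# Illustrative sketch (not part of this module): a self-contained, Core-level version of the
# chunked "SELECT ... WHERE digest_hash IN (...)" pattern used above, run against a throwaway
# in-memory SQLite table. The table, column, and helper names here are hypothetical.
def _example_core_select_by_hash_chunks(hashes: list[str], chunk_size: int = 2) -> list[str]:
    from sqlalchemy import Column, MetaData, String, create_engine

    metadata = MetaData()
    example_index = Table("example_index", metadata, Column("digest_hash", String, primary_key=True))
    engine = create_engine("sqlite:///:memory:")
    metadata.create_all(engine)

    found: list[str] = []
    with engine.connect() as conn:
        if hashes:
            conn.execute(example_index.insert(), [{"digest_hash": h} for h in hashes])
        # Query in chunks so that no single statement exceeds the backend's IN-clause limit.
        for start in range(0, len(hashes), chunk_size):
            chunk = hashes[start : start + chunk_size]
            stmt = select(example_index.c.digest_hash).where(example_index.c.digest_hash.in_(chunk))
            found.extend(row.digest_hash for row in conn.execute(stmt))
    return found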
299 @timed(METRIC.STORAGE.SQL_INDEX.UPDATE_TIMESTAMP_DURATION)
300 def _bulk_refresh_timestamps(
301 self, digests: Sequence[Digest], session: SessionType, update_time: datetime | None = None
302 ) -> None:
303 """Refresh all timestamps of the input digests.
305 SQLAlchemy Core is used for this because the ORM is not suitable for
306 bulk inserts and updates.
308 https://docs.sqlalchemy.org/en/13/faq/performance.html#i-m-inserting-400-000-rows-with-the-orm-and-it-s-really-slow
309 """
310 # See discussion of __table__ typing in https://github.com/sqlalchemy/sqlalchemy/issues/9130
311 index_table = cast(Table, IndexEntry.__table__)
312 current_time = datetime.utcnow()
314 # If a timestamp was passed in, use it and always refresh (no threshold).
315 if update_time:
316 timestamp = update_time
317 last_accessed_threshold = current_time
318 # Otherwise only refresh a digest's timestamp if it was last accessed before this threshold.
319 else:
320 timestamp = current_time
321 last_accessed_threshold = current_time - timedelta(seconds=self.refresh_accesstime_older_than)
323 for part in self._partitioned_hashes(digests):
324 # Generate the SQL Statement:
325 # UPDATE index SET accessed_timestamp=<timestamp>
326 # WHERE index.digest_hash IN
327 # (SELECT index.digest_hash FROM index
328 # WHERE index.digest_hash IN <part> AND index.accessed_timestamp < <last_accessed_threshold>
329 # FOR UPDATE SKIP LOCKED)
330 stmt = (
331 index_table.update()
332 .where(
333 index_table.c.digest_hash.in_(
334 select(index_table.c.digest_hash)
335 .where(index_table.c.digest_hash.in_(part))
336 .where(index_table.c.accessed_timestamp < last_accessed_threshold)
337 .with_for_update(skip_locked=True)
338 )
339 )
340 .values(accessed_timestamp=timestamp)
341 )
342 session.execute(stmt)
343 session.commit()
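# Illustrative sketch (not part of this module): the access-time threshold decision in
# isolation. With refresh_accesstime_older_than=3600, an entry touched within the last hour
# keeps its timestamp, while older entries get refreshed. The helper name is hypothetical.
def _example_should_refresh(last_accessed: datetime, refresh_accesstime_older_than: int) -> bool:
    threshold = datetime.utcnow() - timedelta(seconds=refresh_accesstime_older_than)
    # Mirrors the "accessed_timestamp < last_accessed_threshold" filter in the UPDATE above.
    return last_accessed < threshold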
345 @timed(METRIC.STORAGE.BULK_STAT_DURATION, type=TYPE)
346 def missing_blobs(self, digests: list[Digest]) -> list[Digest]:
347 # Blobs marked as deleted are considered missing
348 entries = self._bulk_select_digests(digests, fetch_deleted=False)
350 found_hashes = {entry.digest_hash for entry in entries}
352 # Split the digests into two found/missing lists
353 found_digests, missing_digests = [], []
354 for digest in digests:
355 if digest.hash in found_hashes:
356 found_digests.append(digest)
357 else:
358 missing_digests.append(digest)
360 # Update all timestamps for blobs which were found
361 with self._sql.scoped_session() as session:
362 self._bulk_refresh_timestamps(found_digests, session)
364 return missing_digests
366 @timed(METRIC.STORAGE.SQL_INDEX.SAVE_DIGESTS_DURATION)
367 def _save_digests_to_index(
368 self, digest_blob_pairs: list[tuple[Digest, bytes | None]], session: SessionType
369 ) -> None:
370 """Helper to persist a list of digest/blob pairs to the index.
372 Any digests present are updated, and new digests are inserted along with their inline blobs (if provided).
373 Only blobs with size less than or equal to the max_inline_blob_size are inserted directly into the index.
374 """
375 if not digest_blob_pairs:
376 return
378 digest_blob_pairs = sorted(digest_blob_pairs, key=lambda pair: (pair[0].hash, pair[0].size_bytes))
380 if self._dialect_delegate:
381 try:
382 self._dialect_delegate._save_digests_to_index( # type: ignore
383 digest_blob_pairs, session, self._max_inline_blob_size
384 )
385 return
386 except AttributeError:
387 pass
389 update_time = datetime.utcnow()
390 # Figure out which digests we can just update
391 digests = [digest for (digest, blob) in digest_blob_pairs]
392 entries = self._bulk_select_digests(digests)
393 # Map digests to new entries
394 entries_not_present = {
395 digest.hash: {
396 "digest_hash": digest.hash,
397 "digest_size_bytes": digest.size_bytes,
398 "accessed_timestamp": update_time,
399 "inline_blob": (blob if digest.size_bytes <= self._max_inline_blob_size else None),
400 "deleted": False,
401 }
402 for (digest, blob) in digest_blob_pairs
403 }
405 entries_present = {}
406 for entry in entries:
407 entries_present[entry.digest_hash] = entries_not_present[entry.digest_hash]
408 del entries_not_present[entry.digest_hash]
410 if entries_not_present:
411 session.bulk_insert_mappings(IndexEntry, entries_not_present.values()) # type: ignore
412 if entries_present:
413 session.bulk_update_mappings(IndexEntry, entries_present.values()) # type: ignore
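# Illustrative sketch (not part of this module): the insert-vs-update split above, expressed
# with plain collections. Pairs whose hash is already indexed become bulk updates; the rest
# become bulk inserts. The helper name is hypothetical.
def _example_split_upserts(
    pairs: list[tuple[Digest, bytes | None]], existing_hashes: set[str]
) -> tuple[list[Digest], list[Digest]]:
    to_insert = [digest for digest, _ in pairs if digest.hash not in existing_hashes]
    to_update = [digest for digest, _ in pairs if digest.hash in existing_hashes]
    return to_insert, to_update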
415 @timed(METRIC.STORAGE.BULK_WRITE_DURATION, type=TYPE)
416 def bulk_update_blobs( # pylint: disable=arguments-renamed
417 self, digest_blob_pairs: list[tuple[Digest, bytes]]
418 ) -> list[Status]:
419 """Implement the StorageABC's bulk_update_blobs method.
421 The StorageABC interface takes in a list of digest/blob pairs and
422 returns a list of results. The list of results MUST be ordered to
423 correspond with the order of the input list."""
424 pairs_to_store = []
425 result_map = {}
427 # For each blob, determine whether to store it in the backing storage or inline it
428 for digest, blob in digest_blob_pairs:
429 if validate_digest_data(digest, blob):
430 if digest.size_bytes > self._max_inline_blob_size:
431 pairs_to_store.append((digest, blob))
432 else:
433 result_map[digest.hash] = Status(code=code_pb2.OK)
434 else:
435 result_map[digest.hash] = Status(code=code_pb2.INVALID_ARGUMENT, message="Data doesn't match hash")
436 missing_blobs = self.missing_blobs([digest for digest, _ in pairs_to_store])
437 missing_blob_pairs = []
438 for digest, blob in pairs_to_store:
439 if digest not in missing_blobs:
440 result_map[digest.hash] = Status(code=code_pb2.OK)
441 else:
442 missing_blob_pairs.append((digest, blob))
444 backup_results = self._storage.bulk_update_blobs(missing_blob_pairs)
446 for digest, result in zip(missing_blobs, backup_results):
447 if digest.hash in result_map:
448 # ERROR: blob was both inlined and backed up
449 raise RuntimeError("Blob was both inlined and backed up.")
450 result_map[digest.hash] = result
452 # Generate the final list of results
453 pairs_to_inline: list[tuple[Digest, bytes | None]] = []
454 results = []
455 for digest, blob in digest_blob_pairs:
456 status = result_map.get(
457 digest.hash,
458 Status(code=code_pb2.UNKNOWN, message="SQL Index: unable to determine the status of this blob"),
459 )
460 results.append(status)
461 if status.code == code_pb2.OK:
462 pairs_to_inline.append((digest, blob))
464 with self._sql.scoped_session() as session:
465 self._save_digests_to_index(pairs_to_inline, session)
467 return results
469 def _bulk_read_blobs_with_fallback(self, digests: list[Digest]) -> dict[str, bytes]:
470 hash_to_digest: dict[str, Digest] = {digest.hash: digest for digest in digests}
471 results: dict[str, bytes] = {}
473 expected_storage_digests: list[Digest] = []
474 # Fetch inlined blobs directly from the index
475 entries = self._bulk_select_digests(digests, fetch_blobs=True)
476 for e in entries:
477 blob, digest_hash, digest = e.inline_blob, e.digest_hash, hash_to_digest[e.digest_hash]
478 if blob is not None:
479 results[digest_hash] = blob
480 hash_to_digest.pop(digest_hash)
481 else:
482 # If a blob is not inlined then the blob is expected to be in storage
483 expected_storage_digests.append(digest)
485 # Fetch everything that wasn't inlined from the backing storage
486 fetched_digests = self._storage.bulk_read_blobs(list(hash_to_digest.values()))
488 # Save everything fetched from storage, inlining the blobs if they're small enough
489 digest_pairs_to_save: list[tuple[Digest, bytes | None]] = []
490 for digest_hash, blob_data in fetched_digests.items():
491 if blob_data is not None:
492 digest = hash_to_digest[digest_hash]
493 if digest.size_bytes <= self._max_inline_blob_size:
494 digest_pairs_to_save.append((digest, blob_data))
495 else:
496 digest_pairs_to_save.append((digest, None))
497 results[digest_hash] = blob_data
499 # Set of digest hashes found in storage
500 actual_storage_digest_hashes = set(
501 digest_hash for (digest_hash, blob_data) in fetched_digests.items() if blob_data is not None
502 )
503 # Get a list of all the digests that were in the index but not found in storage
504 digests_expected_not_in_storage: list[Digest] = []
505 for expected_digest in expected_storage_digests:
506 if expected_digest.hash not in actual_storage_digest_hashes:
507 LOGGER.warning(
508 "Blob was indexed but not in storage. Deleting from the index.", tags=dict(digest=digest)
509 )
510 digests_expected_not_in_storage.append(expected_digest)
512 with self._sql.scoped_session() as session:
513 self._save_digests_to_index(digest_pairs_to_save, session)
514 if digests_expected_not_in_storage:
515 self._bulk_delete_from_index(digests_expected_not_in_storage, session)
516 session.commit()
518 return results
520 @timed(METRIC.STORAGE.BULK_READ_DURATION, type=TYPE)
521 def bulk_read_blobs(self, digests: list[Digest]) -> dict[str, bytes]:
522 if self._fallback_on_get:
523 return self._bulk_read_blobs_with_fallback(digests)
525 # If fallback is disabled, query the index first and only
526 # query the storage for blobs that weren't inlined there
528 hash_to_digest = {digest.hash: digest for digest in digests} # hash -> digest map
529 results: dict[str, bytes] = {} # The final list of results (return value)
530 digests_to_fetch: list[Digest] = [] # Digests that need to be fetched from storage
531 digest_pairs_to_save: list[tuple[Digest, bytes | None]] = [] # Digests that need to be updated in the index
533 # Fetch all of the digests in the database
534 # Anything that wasn't already inlined needs to be fetched
535 entries = self._bulk_select_digests(digests, fetch_blobs=True)
536 for index_entry in entries:
537 digest = hash_to_digest[index_entry.digest_hash]
538 if index_entry.inline_blob is not None:
539 results[index_entry.digest_hash] = index_entry.inline_blob
540 else:
541 digests_to_fetch.append(digest)
543 # Caution: digests whose blobs cannot be found in storage will be dropped.
544 if digests_to_fetch:
545 fetched_digests = self._storage.bulk_read_blobs(digests_to_fetch)
546 else:
547 fetched_digests = {}
549 # Generate the list of inputs for _save_digests_to_index
550 #
551 # We only need to send blob data for small blobs fetched
552 # from the storage since everything else is either too
553 # big or already inlined
554 for digest in digests_to_fetch:
555 if blob_data := fetched_digests.get(digest.hash):
556 if digest.size_bytes <= self._max_inline_blob_size:
557 digest_pairs_to_save.append((digest, blob_data))
559 actual_storage_digests = set(digest_hash for (digest_hash, _) in fetched_digests.items())
560 # Get a list of all the digests that were in the index but not found in storage
561 digests_expected_not_in_storage: list[Digest] = []
562 for expected_digest in digests_to_fetch:
563 if expected_digest.hash not in actual_storage_digests:
564 LOGGER.warning(
565 "Blob was indexed but not in storage. Deleting from the index.", tags=dict(digest=expected_digest)
566 )
567 digests_expected_not_in_storage.append(expected_digest)
569 # Update any blobs which need to be inlined
570 with self._sql.scoped_session() as session:
571 self._save_digests_to_index(digest_pairs_to_save, session)
572 if digests_expected_not_in_storage:
573 self._bulk_delete_from_index(digests_expected_not_in_storage, session)
574 session.commit()
576 results.update(fetched_digests)
577 return results
579 def _column_windows(
580 self, session: SessionType, column: InstrumentedAttribute[Any]
581 ) -> Iterator[ColumnElement[bool]]:
582 """Adapted from the sqlalchemy WindowedRangeQuery recipe.
583 https://github.com/sqlalchemy/sqlalchemy/wiki/WindowedRangeQuery
585 This method breaks the timestamp range into windows and yields
586 the borders of these windows to the caller. For example, the borders
587 yielded by this might look something like
588 ('2019-10-08 18:25:03.699863', '2019-10-08 18:25:03.751018')
589 ('2019-10-08 18:25:03.751018', '2019-10-08 18:25:03.807867')
590 ('2019-10-08 18:25:03.807867', '2019-10-08 18:25:03.862192')
591 ('2019-10-08 18:25:03.862192',)
593 _windowed_lru_digests uses these borders to form WHERE clauses for its
594 SELECTs. In doing so, we make sure to repeatedly query the database for
595 live updates, striking a balance between loading the entire resultset
596 into memory and querying each row individually, both of which are
597 inefficient in the context of a large index.
599 The window size is a parameter and can be configured. A larger window
600 size will yield better performance (fewer SQL queries) at the cost of
601 memory (holding on to the results of the query) and accuracy (blobs
602 may get updated while you're working on them), and vice versa for a
603 smaller window size.
604 """
606 def int_for_range(start_id: Any, end_id: Any) -> ColumnElement[bool]:
607 if end_id:
608 return and_(column >= start_id, column < end_id)
609 else:
610 return column >= start_id # type: ignore[no-any-return]
612 # Constructs a query that:
613 # 1. Gets all the timestamps in sorted order.
614 # 2. Assigns a row number to each entry.
615 # 3. Only keeps timestamps whose row number is 1 modulo N, i.e. one timestamp every N rows, where N="_all_blobs_window_size".
616 # SELECT
617 # anon_1.index_accessed_timestamp AS anon_1_index_accessed_timestamp
618 # FROM (
619 # SELECT
620 # index.accessed_timestamp AS index_accessed_timestamp,
621 # row_number() OVER (ORDER BY index.accessed_timestamp) AS rownum
622 # FROM index
623 # )
624 # AS anon_1
625 # WHERE rownum % 1000=1
626 #
627 # Note:
628 # - This query can be slow due to checking each entry with "WHERE rownum % 1000=1".
629 # - These timestamps will be the basis for constructing the SQL "WHERE" clauses later.
630 rownum = func.row_number().over(order_by=column).label("rownum")
631 subq = select(column, rownum).subquery()
633 # The upstream recipe noted in the docstring uses `subq.corresponding_column` here. That
634 # method takes a KeyedColumnElement, which the ORM InstrumentedAttributes are not instances
635 # of. Rather than switching to passing actual columns here, we can take advantage of controlling
636 # the initial `select` to instead use the subquery columns directly and avoid ever calling this
637 # method.
638 #
639 # See https://github.com/sqlalchemy/sqlalchemy/discussions/10325#discussioncomment-6952547.
640 target_column = subq.columns[0]
641 rownum_column = subq.columns[1]
643 stmt = select(target_column)
644 if self._all_blobs_window_size > 1:
645 stmt = stmt.filter(rownum_column % self._all_blobs_window_size == 1)
647 # Execute the underlying query against the database.
648 # Ex: intervals = [t1, t1001, t2001, ...], q = [(t1, ), (t1001, ), (t2001, ), ...]
649 intervals = list(session.scalars(stmt))
651 # Generate the whereclauses
652 while intervals:
653 start = intervals.pop(0)
654 if intervals:
655 end = intervals[0]
656 else:
657 end = None
658 # Ex: yield "WHERE IndexEntry.accessed_timestamp >= start AND IndexEntry.accessed_timestamp < end"
659 yield int_for_range(start, end)
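# Illustrative sketch (not part of this module): the same windowing idea with a plain,
# already-sorted list instead of a SQL window function. Every N-th timestamp (1-based row
# number, rownum % N == 1) becomes a border, and consecutive borders form the half-open
# ranges that get turned into WHERE clauses. Names here are hypothetical.
def _example_window_borders(
    sorted_timestamps: list[datetime], window_size: int
) -> list[tuple[datetime, datetime | None]]:
    if window_size > 1:
        borders = [ts for rownum, ts in enumerate(sorted_timestamps, start=1) if rownum % window_size == 1]
    else:
        borders = list(sorted_timestamps)
    # Pair each border with the next one; the final range has no upper bound (None).
    return [(start, borders[i + 1] if i + 1 < len(borders) else None) for i, start in enumerate(borders)]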
661 def _windowed_lru_digests(
662 self, q: "Query[Any]", column: InstrumentedAttribute[Any]
663 ) -> Iterator[tuple[IndexEntry, bool]]:
664 """Generate a query for each window produced by _column_windows
665 and yield the results one by one.
666 """
667 # Determine whether the conditions are met to make an SQL call to get new windows.
668 msg = "Using stored LRU windows"
669 if len(self._queue_of_whereclauses) == 0:
670 msg = "Requesting new LRU windows."
671 self._queue_of_whereclauses = deque(self._column_windows(q.session, column))
672 self._delete_premarked_blobs = True
674 msg += f" Number of windows remaining: {len(self._queue_of_whereclauses)}"
675 LOGGER.debug(msg)
677 while self._queue_of_whereclauses:
678 whereclause = self._queue_of_whereclauses[0]
679 window = q.filter(whereclause).order_by(column.asc())
680 yield from window
682 # If the `yield from` above never reaches this point, the cleanup exited
683 # before consuming all of the content matched by this whereclause.
684 # Otherwise, the whereclause is exhausted and can be discarded.
685 self._queue_of_whereclauses.popleft()
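# Illustrative sketch (not part of this module): the "resume where we left off" behaviour with
# a plain deque of work lists. A window is only popped once it has been fully drained, so an
# interrupted cleanup run restarts on the same window next time. Names are hypothetical.
def _example_resumable_windows(windows: Deque[list[int]]) -> Iterator[int]:
    while windows:
        yield from windows[0]
        # Only reached when the current window was fully consumed; otherwise it stays queued.
        windows.popleft()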
687 def least_recent_digests(self) -> Iterator[Digest]:
688 with self._sql.scoped_session() as session:
689 # TODO: session.query is legacy, we should replace this with the `select` construct
690 # as we do elsewhere.
691 q = session.query(IndexEntry)
692 for entry in self._windowed_lru_digests(q, IndexEntry.accessed_timestamp):
693 # TODO: make this generic, or delete this method since it is only used by tests.
694 index_entry = cast(IndexEntry, entry)
695 assert isinstance(index_entry.digest_hash, str)
696 assert isinstance(index_entry.digest_size_bytes, int)
697 yield Digest(hash=index_entry.digest_hash, size_bytes=index_entry.digest_size_bytes)
699 @timed(METRIC.STORAGE.SQL_INDEX.SIZE_CALCULATION_DURATION)
700 def get_total_size(self) -> int:
701 statement = select(func.sum(IndexEntry.digest_size_bytes))
702 with self._sql.scoped_session() as session:
703 result = session.execute(statement).scalar()
704 if result is None:
705 result = 0
706 return result
708 def get_blob_count(self) -> int:
709 with self._sql.scoped_session() as session:
710 statement = select(func.count(IndexEntry.digest_hash))
711 return session.execute(statement).scalar() or 0
713 @timed(METRIC.STORAGE.SQL_INDEX.DELETE_N_BYTES_DURATION)
714 def delete_n_bytes(
715 self,
716 n_bytes: int,
717 dry_run: bool = False,
718 protect_blobs_after: datetime | None = None,
719 large_blob_threshold: int | None = None,
720 large_blob_lifetime: datetime | None = None,
721 ) -> int:
722 """
723 When using a SQL Index, entries with a delete marker are "in the process of being deleted".
724 This is required because storage operations can't be safely tied to the SQL index transaction
725 (one may fail independently of the other, and you end up inconsistent).
727 The workflow is roughly as follows:
728 - Start a SQL transaction.
729 - Lock and mark the indexed items you want to delete.
730 - Close the SQL transaction.
731 - Perform the storage deletes
732 - Start a SQL transaction.
733 - Actually delete the index entries.
734 - Close the SQL transaction.
736 This means anything with deleted=False will always be present in the backing store. If it is marked
737 deleted=True, and the process gets killed when deleting from the backing storage, only
738 some of the items might actually be gone.
740 The next time the cleaner starts up, it can try to do that delete again (ignoring 404s).
741 Eventually that will succeed and the item will actually be removed from the DB. Only during
742 the first run of batches do we consider already-marked items. This prevents multiple cleanup
743 daemons from competing with each other on every batch (a condensed sketch of this flow follows the method).
744 """
745 if protect_blobs_after is None:
746 protect_blobs_after = datetime.utcnow()
748 # Used for metric publishing
749 delete_start_time = time.time()
751 storage_digests: list[Digest] = []
752 marked_digests: list[Digest] = []
753 collected_bytes = 0
755 with self._sql.scoped_session(exceptions_to_not_rollback_on=[StaleDataError]) as session:
756 base_query = session.query(IndexEntry, IndexEntry.inline_blob != None).options( # noqa
757 load_only(IndexEntry.digest_hash, IndexEntry.digest_size_bytes)
758 )
760 if self._delete_premarked_blobs:
761 LOGGER.info("Starting to gather pre-marked records.")
762 premarked_query = base_query.filter_by(deleted=True)
763 for [entry, is_inline] in premarked_query.all():
764 digest = Digest(hash=entry.digest_hash, size_bytes=entry.digest_size_bytes)
765 marked_digests.append(digest)
766 if not is_inline:
767 storage_digests.append(digest)
768 collected_bytes += entry.digest_size_bytes
770 if not dry_run:
771 publish_counter_metric(METRIC.STORAGE.SQL_INDEX.PREMARKED_DELETED_COUNT, len(marked_digests))
772 LOGGER.info(
773 "Gathered pre-marked bytes.", tags=dict(collected_bytes=collected_bytes, max_bytes=n_bytes)
774 )
775 self._delete_premarked_blobs = False
777 if collected_bytes < n_bytes:
778 LOGGER.info("Searching for records to mark deleted.")
779 unmarked_query = base_query.filter_by(deleted=False).with_for_update(skip_locked=True)
780 if large_blob_lifetime and large_blob_threshold:
781 unmarked_query = unmarked_query.filter(
782 (IndexEntry.accessed_timestamp < protect_blobs_after)
783 | (
784 (IndexEntry.digest_size_bytes > large_blob_threshold)
785 & (IndexEntry.accessed_timestamp < large_blob_lifetime)
786 )
787 )
788 else:
789 unmarked_query = unmarked_query.filter(IndexEntry.accessed_timestamp < protect_blobs_after)
790 window = self._windowed_lru_digests(unmarked_query, IndexEntry.accessed_timestamp)
791 mark_deleted_start = time.perf_counter()
792 for [entry, is_inline] in window:
793 digest = Digest(hash=entry.digest_hash, size_bytes=entry.digest_size_bytes)
794 marked_digests.append(digest)
795 if not is_inline:
796 storage_digests.append(digest)
797 collected_bytes += entry.digest_size_bytes
798 if not dry_run:
799 entry.deleted = True
800 if collected_bytes >= n_bytes:
801 break
802 mark_deleted_duration = timedelta(seconds=time.perf_counter() - mark_deleted_start)
803 if not dry_run:
804 publish_timer_metric(METRIC.STORAGE.SQL_INDEX.MARK_DELETED_DURATION, mark_deleted_duration)
805 LOGGER.info("Gathered bytes.", tags=dict(collected_bytes=collected_bytes, max_bytes=n_bytes))
807 if dry_run:
808 return collected_bytes
810 failed_deletes = self._storage.bulk_delete(storage_digests)
811 digests_to_delete = [x for x in marked_digests if f"{x.hash}/{x.size_bytes}" not in failed_deletes]
813 with self._sql.scoped_session(exceptions_to_not_rollback_on=[StaleDataError]) as session:
814 failed_deletes.extend(self._bulk_delete_from_index(digests_to_delete, session))
815 for digest in digests_to_delete:
816 if digest in failed_deletes:
817 collected_bytes -= digest.size_bytes
819 batch_duration = time.time() - delete_start_time
820 blobs_deleted_per_second = (len(digests_to_delete) - len(failed_deletes)) / batch_duration
821 publish_gauge_metric(METRIC.CLEANUP.BLOBS_DELETED_PER_SECOND, blobs_deleted_per_second)
822 return collected_bytes
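# Illustrative sketch (not part of this module): the mark -> storage-delete -> index-delete
# flow of delete_n_bytes, condensed onto plain dictionaries. Rows are marked deleted=True
# first, then removed from "storage", and only then removed from the "index", so a crash in
# the middle leaves marked rows that a later run can safely retry. All names are hypothetical.
def _example_two_phase_delete(
    index: dict[str, dict[str, Any]], storage: dict[str, bytes], n_bytes: int
) -> int:
    collected_bytes = 0
    marked: list[str] = []
    # Phase 1: mark candidates in the index until enough bytes have been gathered.
    for digest_hash, row in index.items():
        if collected_bytes >= n_bytes:
            break
        row["deleted"] = True
        marked.append(digest_hash)
        collected_bytes += row["size_bytes"]
    # Phase 2: delete the blobs from the backing storage (failed deletes would be retried later).
    for digest_hash in marked:
        storage.pop(digest_hash, None)
    # Phase 3: only now drop the rows from the index itself.
    for digest_hash in marked:
        index.pop(digest_hash, None)
    return collected_bytes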
824 @timed(METRIC.STORAGE.BULK_DELETE_DURATION, type=TYPE)
825 def bulk_delete(self, digests: list[Digest]) -> list[str]:
826 # Delete from the index and then delete from the backing storage.
827 with self._sql.scoped_session(exceptions_to_not_rollback_on=[StaleDataError]) as session:
828 failed_deletes = self._bulk_delete_from_index(digests, session)
830 digests_to_delete = [x for x in digests if f"{x.hash}/{x.size_bytes}" not in failed_deletes]
831 failed_deletes.extend(self._storage.bulk_delete(digests_to_delete))
832 return failed_deletes
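# Illustrative sketch (not part of this module): bulk deletes report failures as
# "hash/size_bytes" strings, so filtering the failed digests out of a list means rebuilding
# that key. The helper name is hypothetical.
def _example_filter_failed_deletes(digests: list[Digest], failed_keys: list[str]) -> list[Digest]:
    failed = set(failed_keys)
    return [digest for digest in digests if f"{digest.hash}/{digest.size_bytes}" not in failed]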
834 @timed(METRIC.STORAGE.SQL_INDEX.BULK_DELETE_INDEX_DURATION)
835 def _bulk_delete_from_index(self, digests: list[Digest], session: Session) -> list[str]:
836 LOGGER.info("Deleting digests from the index.", tags=dict(digest_count=len(digests)))
837 # See discussion of __table__ typing in https://github.com/sqlalchemy/sqlalchemy/issues/9130
838 index_table = cast(Table, IndexEntry.__table__)
839 hashes = [x.hash for x in digests]
841 # Make sure we don't exceed maximum size of an IN clause
842 n = self._inclause_limit
843 hash_chunks = [hashes[i : i + n] for i in range(0, len(hashes), n)]
845 # We will not raise, rollback, or log on StaleDataErrors.
846 # These errors occur when we delete fewer rows than we were expecting.
847 # This is fine, since the missing rows will get deleted eventually.
848 # When running bulk_deletes concurrently, StaleDataErrors
849 # occur too often to log.
850 num_blobs_deleted = 0
851 for chunk in hash_chunks:
852 # Do not wait for locks when deleting rows. Skip locked rows to
853 # avoid deadlocks.
854 stmt = index_table.delete().where(
855 index_table.c.digest_hash.in_(
856 select(index_table.c.digest_hash)
857 .where(index_table.c.digest_hash.in_(chunk))
858 .with_for_update(skip_locked=True)
859 )
860 )
861 num_blobs_deleted += session.execute(stmt).rowcount
862 LOGGER.info("Blobs deleted from the index.", tags=dict(deleted_count=num_blobs_deleted, digest_count=digests))
864 # bulk_delete is typically expected to return the digests that were not deleted,
865 # but delete only returns the number of rows deleted and not what was/wasn't
866 # deleted. Getting this info would require extra queries, so assume that
867 # everything was either deleted or already deleted. Failures will continue to raise exceptions.
868 return []