Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/index/sql.py: 92.79%
416 statements
coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
1# Copyright (C) 2019 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16"""
17SQLIndex
18==================
20A SQL index implementation. This can be pointed to either a remote SQL server
21or a local SQLite database.
23"""
24import io
25import itertools
26import time
27from collections import deque
28from datetime import datetime, timedelta
29from typing import IO, Any, AnyStr, Deque, Dict, Iterator, List, Optional, Sequence, Tuple, cast
31from sqlalchemy import Column, and_, delete, func, not_, select, text
32from sqlalchemy.exc import DBAPIError
33from sqlalchemy.orm import Session, load_only
34from sqlalchemy.orm.exc import StaleDataError
35from sqlalchemy.orm.query import Query
36from sqlalchemy.orm.session import Session as SessionType
37from sqlalchemy.sql.elements import BooleanClauseList
39from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
40from buildgrid._protos.google.rpc import code_pb2
41from buildgrid._protos.google.rpc.status_pb2 import Status
42from buildgrid.server.decorators import timed
43from buildgrid.server.exceptions import StorageFullError
44from buildgrid.server.logging import buildgrid_logger
45from buildgrid.server.metrics_names import METRIC
46from buildgrid.server.metrics_utils import publish_counter_metric, publish_gauge_metric, publish_timer_metric
47from buildgrid.server.sql.models import IndexEntry
48from buildgrid.server.sql.provider import SqlProvider
49from buildgrid.server.utils.digests import validate_digest_data
51from ..storage_abc import StorageABC
52from .index_abc import IndexABC
53from .sql_dialect_delegates import PostgreSQLDelegate, SQLiteDelegate
55LOGGER = buildgrid_logger(__name__)
56DIALECT_DELEGATES = {"postgresql": PostgreSQLDelegate, "sqlite": SQLiteDelegate}
58INLINE_BLOB_SIZE_HARD_MAXIMUM = 1000000000
61def read_and_rewind(read_head: IO[AnyStr]) -> Optional[AnyStr]:
62 """Reads from an IO object and returns the data found there
63 after rewinding the object to the beginning.
65 Args:
66 read_head (IO): a readable IO object.
68 Returns:
69 Optional[AnyStr]: the content read from `read_head`, or None.
70 """
71 if not read_head:
72 return None
74 data = read_head.read()
75 read_head.seek(0)
76 return data
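# Editor-added usage sketch (not part of the module): read_and_rewind drains a
# seekable stream and leaves the cursor back at the start, so a later reader of
# the same object still sees the full contents.
def _example_read_and_rewind() -> None:
    stream = io.BytesIO(b"blob-bytes")
    data = read_and_rewind(stream)
    assert data == b"blob-bytes"
    # The stream was rewound, so it can be read again in full.
    assert stream.read() == b"blob-bytes"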
79class SQLIndex(IndexABC):
80 TYPE = "SQLIndex"
82 def __init__(
83 self,
84 sql_provider: SqlProvider,
85 storage: StorageABC,
86 *,
87 window_size: int = 1000,
88 inclause_limit: int = -1,
89 max_inline_blob_size: int = 0,
90 refresh_accesstime_older_than: int = 0,
91 **kwargs: Any,
92 ) -> None:
93 base_argnames = ["fallback_on_get"]
94 base_args = {}
95 for arg in base_argnames:
96 if arg in kwargs:
97 base_args[arg] = kwargs.pop(arg)
98 super().__init__(**base_args)
100 self._sql = sql_provider
101 self._storage = storage
103 if max_inline_blob_size > INLINE_BLOB_SIZE_HARD_MAXIMUM:
104 raise ValueError(
105 f"Max inline blob size is [{max_inline_blob_size}], "
106 f"but must be less than [{INLINE_BLOB_SIZE_HARD_MAXIMUM}]."
107 )
108 if max_inline_blob_size >= 0:
109 self._max_inline_blob_size = max_inline_blob_size
110 else:
111 raise ValueError(f"Max inline blob size is [{max_inline_blob_size}], but must be nonnegative.")
113 if refresh_accesstime_older_than >= 0:
114 # Measured in seconds. Helps reduce the frequency of timestamp updates during heavy read load.
115 self.refresh_accesstime_older_than = refresh_accesstime_older_than
116 else:
117 raise ValueError(
118 f"'refresh_accesstime_older_than' must be nonnegative. It is {refresh_accesstime_older_than}"
119 )
121 # Max # of rows to fetch while iterating over all blobs
122 # (e.g. in least_recent_digests)
123 self._all_blobs_window_size = window_size
125 # This variable stores the list of whereclauses (SQLAlchemy BooleanClauseList objects)
126 # generated by _column_windows() using a time-expensive SQL query.
127 # These whereclauses are used to construct the final SQL query
128 # during cleanup in order to fetch blobs by time windows.
129 #
130 # Inside _column_windows(), a list of timestamp borders is obtained:
131 # intervals = [t1, t2, t3, ...]
132 # The generated whereclauses can then be read semantically as, for example:
133 # self._queue_of_whereclauses = [
134 # "WHERE t1 <= IndexEntry.accessed_timestamp < t2",
135 # "WHERE t2 <= IndexEntry.accessed_timestamp < t3",
136 # "WHERE t3 <= IndexEntry.accessed_timestamp < t4",
137 # ... and so on]
138 # Note the number of entries in each window is determined by
139 # the instance variable "_all_blobs_window_size".
140 self._queue_of_whereclauses: Deque[BooleanClauseList[Any]] = deque()
142 # Whether entries with deleted=True should be considered by delete_n_bytes.
143 # This is useful to catch any half-finished deletes where a cleanup process
144 # may have exited half-way through deletion. Once all premarked blobs have been
145 # deleted this becomes False, and it is only reset after a full scan of the database.
146 self._delete_premarked_blobs: bool = True
148 # Only pass known kwargs to db session
149 available_options = {"pool_size", "max_overflow", "pool_timeout", "pool_pre_ping", "pool_recycle"}
150 kwargs_keys = kwargs.keys()
151 if not kwargs_keys <= available_options:
152 unknown_args = kwargs_keys - available_options
153 raise TypeError(f"Unknown keyword arguments: [{unknown_args}]")
155 # Dialect-specific initialization
156 self._dialect_delegate = DIALECT_DELEGATES.get(self._sql.dialect)
158 if inclause_limit > 0:
159 if inclause_limit > window_size:
160 LOGGER.warning(
161 "Configured inclause limit is greater than window size.",
162 tags=dict(inclause_limit=inclause_limit, window_size=window_size),
163 )
164 self._inclause_limit = inclause_limit
165 else:
166 # If the inclause limit isn't explicitly set, we use a default that
167 # respects both the window size and the db implementation's
168 # default limit.
169 self._inclause_limit = min(window_size, self._sql.default_inlimit)
170 LOGGER.debug("SQL index: using default inclause limit.", tags=dict(inclause_limit=self._inclause_limit))
172 # Make a test query against the database to ensure the connection is valid
173 with self._sql.scoped_session() as session:
174 session.query(IndexEntry).first()
175 self._sql.remove_scoped_session()
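# Editor-added construction sketch. The SqlProvider keyword ("connection_string")
# and the LRUMemoryCache backing store are assumptions for illustration only;
# real deployments configure these via the BuildGrid server configuration.
def _example_build_index() -> "SQLIndex":
    from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache

    sql_provider = SqlProvider(connection_string="sqlite:///cas_index.db")
    storage = LRUMemoryCache(limit=512 * 1024 * 1024)  # 512 MiB backing store
    return SQLIndex(
        sql_provider,
        storage,
        window_size=1000,  # rows fetched per cleanup window
        max_inline_blob_size=64 * 1024,  # blobs up to 64 KiB are inlined in the index
        refresh_accesstime_older_than=300,  # skip refreshes for blobs read in the last 5 minutes
    )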
177 def start(self) -> None:
178 self._storage.start()
180 def stop(self) -> None:
181 self._storage.stop()
183 @timed(METRIC.STORAGE.STAT_DURATION, type=TYPE)
184 def has_blob(self, digest: Digest) -> bool:
185 with self._sql.scoped_session() as session:
186 statement = select(func.count(IndexEntry.digest_hash)).where(IndexEntry.digest_hash == digest.hash)
188 num_entries = session.execute(statement).scalar()
189 if num_entries is None:
190 num_entries = 0
192 if num_entries == 1:
193 return True
194 elif num_entries < 1:
195 return False
196 else:
197 raise RuntimeError(f"Multiple results found for blob [{digest}]. The index is in a bad state.")
199 @timed(METRIC.STORAGE.READ_DURATION, type=TYPE)
200 def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
201 """Get a blob from the index or the backing storage. Optionally fallback and repair index"""
203 # Check the index for the blob and return if found.
204 with self._sql.scoped_session() as session:
205 if entry := session.query(IndexEntry).filter(IndexEntry.digest_hash == digest.hash).first():
206 if entry.inline_blob is not None:
207 return io.BytesIO(entry.inline_blob)
208 elif blob := self._storage.get_blob(digest):
209 # Fix any blobs that should have been inlined.
210 if digest.size_bytes <= self._max_inline_blob_size:
211 self._save_digests_to_index([(digest, read_and_rewind(blob))], session)
212 session.commit()
213 return blob
214 LOGGER.warning(
215 "Blob was indexed but not in storage. Deleting from the index.", tags=dict(digest=digest)
216 )
217 self._bulk_delete_from_index([digest], session)
219 # Check the storage for the blob and repair the index if found.
220 if self._fallback_on_get:
221 if blob := self._storage.get_blob(digest):
222 with self._sql.scoped_session() as session:
223 if digest.size_bytes <= self._max_inline_blob_size:
224 self._save_digests_to_index([(digest, read_and_rewind(blob))], session)
225 else:
226 self._save_digests_to_index([(digest, None)], session)
227 session.commit()
228 return blob
230 # Blob was not found in index or storage
231 return None
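# Editor-added usage sketch for get_blob: the blob is addressed by its digest and
# returned as a readable stream (or None if it is in neither the index nor the
# backing storage). The payload here is hypothetical.
def _example_get_blob(index: "SQLIndex") -> Optional[bytes]:
    import hashlib

    payload = b"hello cas"
    digest = Digest(hash=hashlib.sha256(payload).hexdigest(), size_bytes=len(payload))
    blob = index.get_blob(digest)
    return blob.read() if blob is not None else None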
233 @timed(METRIC.STORAGE.DELETE_DURATION, type=TYPE)
234 def delete_blob(self, digest: Digest) -> None:
235 statement = delete(IndexEntry).where(IndexEntry.digest_hash == digest.hash)
236 options = {"synchronize_session": False}
238 with self._sql.scoped_session() as session:
239 session.execute(statement, execution_options=options)
241 self._storage.delete_blob(digest)
243 @timed(METRIC.STORAGE.WRITE_DURATION, type=TYPE)
244 def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
245 inline_blob = None
246 if digest.size_bytes > self._max_inline_blob_size:
247 self._storage.commit_write(digest, write_session)
248 else:
249 write_session.seek(0)
250 inline_blob = write_session.read()
251 try:
252 with self._sql.scoped_session() as session:
253 self._save_digests_to_index([(digest, inline_blob)], session)
254 except DBAPIError as error:
255 # Error has pgcode attribute (Postgres only)
256 if hasattr(error.orig, "pgcode"):
257 # imported here to avoid global dependency on psycopg2
258 from psycopg2.errors import DiskFull, OutOfMemory
260 # DiskFull == SQLSTATE 53100, OutOfMemory == SQLSTATE 53200
261 if isinstance(error.orig, (DiskFull, OutOfMemory)):
262 raise StorageFullError(f"Postgres Error: {error.orig.pgcode}") from error
263 raise error
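# Editor-added sketch for commit_write: blobs no larger than max_inline_blob_size
# are written inline into the index, larger ones go to the backing storage. The
# payload is hypothetical.
def _example_commit_write(index: "SQLIndex") -> Digest:
    import hashlib

    payload = b"small enough to inline"
    digest = Digest(hash=hashlib.sha256(payload).hexdigest(), size_bytes=len(payload))
    with io.BytesIO(payload) as write_session:
        index.commit_write(digest, write_session)
    return digest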
265 def _partitioned_hashes(self, digests: Sequence[Digest]) -> Iterator[Iterator[str]]:
266 """Given a long list of digests, split it into parts no larger than
267 _inclause_limit and yield the hashes in each part.
268 """
269 for part_start in range(0, len(digests), self._inclause_limit):
270 part_end = min(len(digests), part_start + self._inclause_limit)
271 part_digests = itertools.islice(digests, part_start, part_end)
272 yield map(lambda digest: digest.hash, part_digests)
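# Editor-added illustration of the partitioning behaviour, assuming an index
# configured with inclause_limit=2: seven digests are split into IN-clause-sized
# groups of hashes (2, 2, 2 and 1 entries).
def _example_partitioned_hashes(index: "SQLIndex") -> List[List[str]]:
    digests = [Digest(hash=f"{i:02x}" * 32, size_bytes=1) for i in range(7)]
    return [list(part) for part in index._partitioned_hashes(digests)]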
274 def _bulk_select_digests(
275 self, digests: Sequence[Digest], fetch_blobs: bool = False, fetch_deleted: bool = True
276 ) -> Iterator[IndexEntry]:
277 """Generator that selects all rows matching a digest list.
279 SQLAlchemy Core is used for this because the ORM has problems with
280 large numbers of bind variables for WHERE IN clauses.
282 We only select on the digest hash (not hash and size) to allow for
283 index-only queries on db backends that support them.
284 """
285 index_table = IndexEntry.__table__
286 with self._sql.scoped_session() as session:
287 columns = [index_table.c.digest_hash]
288 if fetch_blobs:
289 columns.append(index_table.c.inline_blob)
290 for part in self._partitioned_hashes(digests):
291 stmt = select(columns).where(index_table.c.digest_hash.in_(part))
292 if not fetch_deleted:
293 stmt = stmt.where(not_(index_table.c.deleted))
294 entries = session.execute(stmt)
295 yield from entries # type: ignore
297 @timed(METRIC.STORAGE.SQL_INDEX.UPDATE_TIMESTAMP_DURATION)
298 def _bulk_refresh_timestamps(
299 self, digests: Sequence[Digest], session: SessionType, update_time: Optional[datetime] = None
300 ) -> None:
301 """Refresh all timestamps of the input digests.
303 SQLAlchemy Core is used for this because the ORM is not suitable for
304 bulk inserts and updates.
306 https://docs.sqlalchemy.org/en/13/faq/performance.html#i-m-inserting-400-000-rows-with-the-orm-and-it-s-really-slow
307 """
308 index_table = IndexEntry.__table__
309 current_time = datetime.utcnow()
311 # If a timestamp was passed in, use it and always refresh (no threshold).
312 if update_time:
313 timestamp = update_time
314 last_accessed_threshold = current_time
315 # Otherwise, a digest's timestamp is only refreshed if it was last accessed before this threshold.
316 else:
317 timestamp = current_time
318 last_accessed_threshold = current_time - timedelta(seconds=self.refresh_accesstime_older_than)
320 for part in self._partitioned_hashes(digests):
321 # Generate the SQL Statement:
322 # UPDATE index SET accessed_timestamp=<timestamp>
323 # WHERE index.digest_hash IN
324 # (SELECT index.digest_hash FROM index
325 # WHERE index.digest_hash IN <part> AND index.accessed_timestamp < <last_accessed_threshold>
326 # FOR UPDATE SKIP LOCKED)
327 stmt = (
328 index_table.update()
329 .where(
330 index_table.c.digest_hash.in_(
331 select(index_table.c.digest_hash)
332 .where(index_table.c.digest_hash.in_(part))
333 .where(index_table.c.accessed_timestamp < last_accessed_threshold)
334 .with_for_update(skip_locked=True)
335 )
336 )
337 .values(accessed_timestamp=timestamp)
338 )
339 session.execute(stmt)
340 session.commit()
342 @timed(METRIC.STORAGE.BULK_STAT_DURATION, type=TYPE)
343 def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
344 # Blobs marked as deleted are considered missing
345 entries = self._bulk_select_digests(digests, fetch_deleted=False)
347 found_hashes = {entry.digest_hash for entry in entries}
349 # Split the digests into two found/missing lists
350 found_digests, missing_digests = [], []
351 for digest in digests:
352 if digest.hash in found_hashes:
353 found_digests.append(digest)
354 else:
355 missing_digests.append(digest)
357 # Update all timestamps for blobs which were found
358 with self._sql.scoped_session() as session:
359 self._bulk_refresh_timestamps(found_digests, session)
361 return missing_digests
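# Editor-added sketch: a FindMissingBlobs-style check. Digests that are found get
# their accessed timestamps refreshed as a side effect; only the digests reported
# missing need to be uploaded afterwards. Payloads are hypothetical.
def _example_missing_blobs(index: "SQLIndex") -> List[Digest]:
    import hashlib

    payloads = [b"first", b"second", b"third"]
    candidates = [Digest(hash=hashlib.sha256(p).hexdigest(), size_bytes=len(p)) for p in payloads]
    return index.missing_blobs(candidates)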
363 @timed(METRIC.STORAGE.SQL_INDEX.SAVE_DIGESTS_DURATION)
364 def _save_digests_to_index(
365 self, digest_blob_pairs: List[Tuple[Digest, Optional[bytes]]], session: SessionType
366 ) -> None:
367 """Helper to persist a list of digest/blob pairs to the index.
369 Digests already present in the index are updated, and new digests are inserted along with their inline blobs (if provided).
370 Only blobs with size at most max_inline_blob_size are stored directly in the index.
371 """
372 if not digest_blob_pairs:
373 return
375 digest_blob_pairs = sorted(digest_blob_pairs, key=lambda pair: (pair[0].hash, pair[0].size_bytes))
377 if self._dialect_delegate:
378 try:
379 self._dialect_delegate._save_digests_to_index( # type: ignore
380 digest_blob_pairs, session, self._max_inline_blob_size
381 )
382 return
383 except AttributeError:
384 pass
386 update_time = datetime.utcnow()
387 # Figure out which digests we can just update
388 digests = [digest for (digest, blob) in digest_blob_pairs]
389 entries = self._bulk_select_digests(digests)
390 # Map digests to new entries
391 entries_not_present = {
392 digest.hash: {
393 "digest_hash": digest.hash,
394 "digest_size_bytes": digest.size_bytes,
395 "accessed_timestamp": update_time,
396 "inline_blob": (blob if digest.size_bytes <= self._max_inline_blob_size else None),
397 "deleted": False,
398 }
399 for (digest, blob) in digest_blob_pairs
400 }
402 entries_present = {}
403 for entry in entries:
404 entries_present[entry.digest_hash] = entries_not_present[entry.digest_hash]
405 del entries_not_present[entry.digest_hash]
407 if entries_not_present:
408 session.bulk_insert_mappings(IndexEntry, entries_not_present.values()) # type: ignore
409 if entries_present:
410 session.bulk_update_mappings(IndexEntry, entries_present.values()) # type: ignore
412 @timed(METRIC.STORAGE.BULK_WRITE_DURATION, type=TYPE)
413 def bulk_update_blobs( # pylint: disable=arguments-renamed
414 self, digest_blob_pairs: List[Tuple[Digest, bytes]]
415 ) -> List[Status]:
416 """Implement the StorageABC's bulk_update_blobs method.
418 The StorageABC interface takes in a list of digest/blob pairs and
419 returns a list of results. The list of results MUST be ordered to
420 correspond with the order of the input list."""
421 pairs_to_store = []
422 result_map = {}
424 # For each blob, determine whether to store it in the backing storage or inline it
425 for digest, blob in digest_blob_pairs:
426 if validate_digest_data(digest, blob):
427 if digest.size_bytes > self._max_inline_blob_size:
428 pairs_to_store.append((digest, blob))
429 else:
430 result_map[digest.hash] = Status(code=code_pb2.OK)
431 else:
432 result_map[digest.hash] = Status(code=code_pb2.INVALID_ARGUMENT, message="Data doesn't match hash")
433 missing_blobs = self.missing_blobs([digest for digest, _ in pairs_to_store])
434 missing_blob_pairs = []
435 for digest, blob in pairs_to_store:
436 if digest not in missing_blobs:
437 result_map[digest.hash] = Status(code=code_pb2.OK)
438 else:
439 missing_blob_pairs.append((digest, blob))
441 backup_results = self._storage.bulk_update_blobs(missing_blob_pairs)
443 for digest, result in zip(missing_blobs, backup_results):
444 if digest.hash in result_map:
445 # ERROR: blob was both inlined and backed up
446 raise RuntimeError("Blob was both inlined and backed up.")
447 result_map[digest.hash] = result
449 # Generate the final list of results
450 pairs_to_inline: List[Tuple[Digest, Optional[bytes]]] = []
451 results = []
452 for digest, blob in digest_blob_pairs:
453 status = result_map.get(
454 digest.hash,
455 Status(code=code_pb2.UNKNOWN, message="SQL Index: unable to determine the status of this blob"),
456 )
457 results.append(status)
458 if status.code == code_pb2.OK:
459 pairs_to_inline.append((digest, blob))
461 with self._sql.scoped_session() as session:
462 self._save_digests_to_index(pairs_to_inline, session)
464 return results
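# Editor-added sketch for bulk_update_blobs: the returned Status list is ordered
# to match the input pairs, so it can be zipped back onto the digests. Payloads
# are hypothetical.
def _example_bulk_update(index: "SQLIndex") -> Dict[str, int]:
    import hashlib

    payloads = [b"alpha", b"beta" * 100000]  # one inlinable blob, one large blob
    pairs = [(Digest(hash=hashlib.sha256(p).hexdigest(), size_bytes=len(p)), p) for p in payloads]
    statuses = index.bulk_update_blobs(pairs)
    return {digest.hash: status.code for (digest, _), status in zip(pairs, statuses)}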
466 def _bulk_read_blobs_with_fallback(self, digests: List[Digest]) -> Dict[str, bytes]:
467 hash_to_digest: Dict[str, Digest] = {digest.hash: digest for digest in digests}
468 results: Dict[str, bytes] = {}
470 expected_storage_digests: List[Digest] = []
471 # Fetch inlined blobs directly from the index
472 entries = self._bulk_select_digests(digests, fetch_blobs=True)
473 for e in entries:
474 blob, digest_hash, digest = e.inline_blob, e.digest_hash, hash_to_digest[e.digest_hash]
475 if blob is not None:
476 results[digest_hash] = blob
477 hash_to_digest.pop(digest_hash)
478 else:
479 # If a blob is not inlined then the blob is expected to be in storage
480 expected_storage_digests.append(digest)
482 # Fetch everything that wasn't inlined from the backing storage
483 fetched_digests = self._storage.bulk_read_blobs(list(hash_to_digest.values()))
485 # Save everything fetched from storage, inlining the blobs if they're small enough
486 digest_pairs_to_save: List[Tuple[Digest, Optional[bytes]]] = []
487 for digest_hash, blob_data in fetched_digests.items():
488 if blob_data is not None:
489 digest = hash_to_digest[digest_hash]
490 if digest.size_bytes <= self._max_inline_blob_size:
491 digest_pairs_to_save.append((digest, blob_data))
492 else:
493 digest_pairs_to_save.append((digest, None))
494 results[digest_hash] = blob_data
496 # List of digests found in storage
497 actual_storage_digest_hashes = set(
498 digest_hash for (digest_hash, blob_data) in fetched_digests.items() if blob_data is not None
499 )
500 # Get a list of all the digests that were in the index but not found in storage
501 digests_expected_not_in_storage: List[Digest] = []
502 for expected_digest in expected_storage_digests:
503 if expected_digest.hash not in actual_storage_digest_hashes:
504 LOGGER.warning(
505 "Blob was indexed but not in storage. Deleting from the index.", tags=dict(digest=expected_digest)
506 )
507 digests_expected_not_in_storage.append(expected_digest)
509 with self._sql.scoped_session() as session:
510 self._save_digests_to_index(digest_pairs_to_save, session)
511 if digests_expected_not_in_storage:
512 self._bulk_delete_from_index(digests_expected_not_in_storage, session)
513 session.commit()
515 return results
517 @timed(METRIC.STORAGE.BULK_READ_DURATION, type=TYPE)
518 def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
519 if self._fallback_on_get:
520 return self._bulk_read_blobs_with_fallback(digests)
522 # If fallback is disabled, query the index first and only
523 # query the storage for blobs that weren't inlined there
525 hash_to_digest = {digest.hash: digest for digest in digests} # hash -> digest map
526 results: Dict[str, bytes] = {} # The final list of results (return value)
527 digests_to_fetch: List[Digest] = [] # Digests that need to be fetched from storage
528 digest_pairs_to_save: List[Tuple[Digest, Optional[bytes]]] = [] # Digests that need to be updated in the index
530 # Fetch all of the digests in the database
531 # Anything that wasn't already inlined needs to be fetched
532 entries = self._bulk_select_digests(digests, fetch_blobs=True)
533 for index_entry in entries:
534 digest = hash_to_digest[index_entry.digest_hash]
535 if index_entry.inline_blob is not None:
536 results[index_entry.digest_hash] = index_entry.inline_blob
537 else:
538 digests_to_fetch.append(digest)
540 # Caution: digests whose blobs cannot be found in storage will be dropped.
541 if digests_to_fetch:
542 fetched_digests = self._storage.bulk_read_blobs(digests_to_fetch)
543 else:
544 fetched_digests = {}
546 # Generate the list of inputs for _save_digests_to_index
547 #
548 # We only need to send blob data for small blobs fetched
549 # from the storage since everything else is either too
550 # big or already inlined
551 for digest in digests_to_fetch:
552 if blob_data := fetched_digests.get(digest.hash):
553 if digest.size_bytes <= self._max_inline_blob_size:
554 digest_pairs_to_save.append((digest, blob_data))
556 actual_storage_digests = set(digest_hash for (digest_hash, _) in fetched_digests.items())
557 # Get a list of all the digests that were in the index but not found in storage
558 digests_expected_not_in_storage: List[Digest] = []
559 for expected_digest in digests_to_fetch:
560 if expected_digest.hash not in actual_storage_digests:
561 LOGGER.warning(
562 "Blob was indexed but not in storage. Deleting from the index.", tags=dict(digest=expected_digest)
563 )
564 digests_expected_not_in_storage.append(expected_digest)
566 # Update any blobs which need to be inlined
567 with self._sql.scoped_session() as session:
568 self._save_digests_to_index(digest_pairs_to_save, session)
569 if digests_expected_not_in_storage:
570 self._bulk_delete_from_index(digests_expected_not_in_storage, session)
571 session.commit()
573 results.update(fetched_digests)
574 return results
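# Editor-added sketch for bulk_read_blobs: the result maps digest hashes to blob
# bytes, and digests that could not be found are simply absent from the mapping.
def _example_bulk_read(index: "SQLIndex", digests: List[Digest]) -> List[Optional[bytes]]:
    found = index.bulk_read_blobs(digests)
    # Preserve the caller's ordering, using None for anything that was not found.
    return [found.get(digest.hash) for digest in digests]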
576 def _column_windows(self, session: SessionType, column: Column[Any]) -> Iterator[BooleanClauseList[Any]]:
577 """Adapted from the sqlalchemy WindowedRangeQuery recipe.
578 https://github.com/sqlalchemy/sqlalchemy/wiki/WindowedRangeQuery
580 This method breaks the timestamp range into windows and yields
581 the borders of these windows to the callee. For example, the borders
582 yielded by this might look something like
583 ('2019-10-08 18:25:03.699863', '2019-10-08 18:25:03.751018')
584 ('2019-10-08 18:25:03.751018', '2019-10-08 18:25:03.807867')
585 ('2019-10-08 18:25:03.807867', '2019-10-08 18:25:03.862192')
586 ('2019-10-08 18:25:03.862192',)
588 _windowed_lru_digests uses these borders to form WHERE clauses for its
589 SELECTs. In doing so, we make sure to repeatedly query the database for
590 live updates, striking a balance between loading the entire resultset
591 into memory and querying each row individually, both of which are
592 inefficient in the context of a large index.
594 The window size is a parameter and can be configured. A larger window
595 size will yield better performance (fewer SQL queries) at the cost of
596 memory (holding on to the results of the query) and accuracy (blobs
597 may get updated while you're working on them), and vice versa for a
598 smaller window size.
599 """
601 def int_for_range(start_id: Any, end_id: Any) -> BooleanClauseList[Any]:
602 if end_id:
603 return and_(column >= start_id, column < end_id)
604 else:
605 return column >= start_id # type: ignore[no-any-return]
607 # Constructs a query that:
608 # 1. Gets all the timestamps in sorted order.
609 # 2. Assigns a row number to each entry.
610 # 3. Keeps only every Nth timestamp, where N = "_all_blobs_window_size".
611 # SELECT
612 # anon_1.index_accessed_timestamp AS anon_1_index_accessed_timestamp
613 # FROM (
614 # SELECT
615 # index.accessed_timestamp AS index_accessed_timestamp,
616 # row_number() OVER (ORDER BY index.accessed_timestamp) AS rownum
617 # FROM index
618 # )
619 # AS anon_1
620 # WHERE rownum % 1000=1
621 #
622 # Note:
623 # - This query can be slow due to checking each entry with "WHERE rownum % 1000=1".
624 # - These timestamps will be the basis for constructing the SQL "WHERE" clauses later.
625 q = session.query(column, func.row_number().over(order_by=column).label("rownum")).from_self(column)
626 if self._all_blobs_window_size > 1:
627 q = q.filter(text(f"rownum % {self._all_blobs_window_size}=1"))
629 # Execute the underlying query against the database.
630 # Ex: intervals = [t1, t1001, t2001, ...], q = [(t1, ), (t1001, ), (t2001, ), ...]
631 intervals = [id for id, in q]
633 # Generate the whereclauses
634 while intervals:
635 start = intervals.pop(0)
636 if intervals:
637 end = intervals[0]
638 else:
639 end = None
640 # Ex: yield "WHERE IndexEntry.accessed_timestamp >= start AND IndexEntry.accessed_timestamp < end"
641 yield int_for_range(start, end)
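# Editor-added, SQL-free sketch of the same windowing idea: keep every Nth value
# of a sorted sequence as a border, then pair adjacent borders into half-open
# ranges, leaving the final range open-ended. Purely illustrative.
def _example_windows(sorted_timestamps: List[datetime], n: int) -> List[Tuple[datetime, Optional[datetime]]]:
    borders = sorted_timestamps[::n]  # analogous to "WHERE rownum % n = 1"
    windows: List[Tuple[datetime, Optional[datetime]]] = []
    for i, start in enumerate(borders):
        end = borders[i + 1] if i + 1 < len(borders) else None
        windows.append((start, end))
    return windows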
643 def _windowed_lru_digests(self, q: "Query[Any]", column: Column[Any]) -> Iterator[Tuple[IndexEntry, bool]]:
644 """Generate a query for each window produced by _column_windows
645 and yield the results one by one.
646 """
647 # Determine whether the conditions are met to make an SQL call to get new windows.
648 msg = "Using stored LRU windows"
649 if len(self._queue_of_whereclauses) == 0:
650 msg = "Requesting new LRU windows."
651 self._queue_of_whereclauses = deque(self._column_windows(q.session, column)) # type: ignore
652 self._delete_premarked_blobs = True
654 msg += f" Number of windows remaining: {len(self._queue_of_whereclauses)}"
655 LOGGER.debug(msg)
657 while self._queue_of_whereclauses:
658 whereclause = self._queue_of_whereclauses[0]
659 window = q.filter(whereclause).order_by(column.asc())
660 yield from window
662 # If the yield from window doesn't reach this point, the cleanup exited
663 # before consuming all the content of the current whereclause.
664 # Otherwise, the whereclause is exhausted and can be discarded.
665 self._queue_of_whereclauses.popleft()
667 def least_recent_digests(self) -> Iterator[Digest]:
668 with self._sql.scoped_session() as session:
669 q = session.query(IndexEntry)
670 for entry in self._windowed_lru_digests(q, IndexEntry.accessed_timestamp):
671 # TODO: make this generic, or delete this method since it is only used by tests.
672 index_entry = cast(IndexEntry, entry)
673 assert isinstance(index_entry.digest_hash, str)
674 assert isinstance(index_entry.digest_size_bytes, int)
675 yield Digest(hash=index_entry.digest_hash, size_bytes=index_entry.digest_size_bytes)
677 @timed(METRIC.STORAGE.SQL_INDEX.SIZE_CALCULATION_DURATION)
678 def get_total_size(self) -> int:
679 statement = select(func.sum(IndexEntry.digest_size_bytes))
680 with self._sql.scoped_session() as session:
681 result = session.execute(statement).scalar()
682 if result is None:
683 result = 0
684 return result
686 @timed(METRIC.STORAGE.SQL_INDEX.DELETE_N_BYTES_DURATION)
687 def delete_n_bytes(
688 self, n_bytes: int, dry_run: bool = False, protect_blobs_after: Optional[datetime] = None
689 ) -> int:
690 """
691 When using a SQL Index, entries with a delete marker are "in the process of being deleted".
692 This is required because storage operations can't be safely tied to the SQL index transaction
693 (one may fail independently of the other, leaving the index and storage inconsistent).
695 The workflow is roughly as follows:
696 - Start a SQL transaction.
697 - Lock and mark the indexed items you want to delete.
698 - Close the SQL transaction.
699 - Perform the storage deletes.
700 - Start a SQL transaction.
701 - Actually delete the index entries.
702 - Close the SQL transaction.
704 This means anything with deleted=False will always be present in the backing store. If it is marked
705 deleted=True, and the process gets killed when deleting from the backing storage, only
706 some of the items might actually be gone.
708 The next time the cleaner starts up, it can try to do that delete again (ignoring 404s).
709 Eventually that will succeed and the item will actually be removed from the DB. Only during
710 the first run of batches do we consider already-marked items. This prevents multiple cleanup
711 daemons from competing with each other on every batch.
712 """
713 if protect_blobs_after is None:
714 protect_blobs_after = datetime.utcnow()
716 # Used for metric publishing
717 delete_start_time = time.time()
719 storage_digests: List[Digest] = []
720 marked_digests: List[Digest] = []
721 collected_bytes = 0
723 with self._sql.scoped_session(exceptions_to_not_rollback_on=[StaleDataError]) as session:
724 base_query = session.query(IndexEntry, IndexEntry.inline_blob != None).options( # noqa
725 load_only("digest_hash", "digest_size_bytes")
726 )
728 if self._delete_premarked_blobs:
729 LOGGER.info("Starting to gather pre-marked records.")
730 premarked_query = base_query.filter_by(deleted=True)
731 for [entry, is_inline] in premarked_query.all():
732 digest = Digest(hash=entry.digest_hash, size_bytes=entry.digest_size_bytes)
733 marked_digests.append(digest)
734 if not is_inline:
735 storage_digests.append(digest)
736 collected_bytes += entry.digest_size_bytes
738 if not dry_run:
739 publish_counter_metric(METRIC.STORAGE.SQL_INDEX.PREMARKED_DELETED_COUNT, len(marked_digests))
740 LOGGER.info(
741 "Gathered pre-marked bytes.", tags=dict(collected_bytes=collected_bytes, max_bytes=n_bytes)
742 )
743 self._delete_premarked_blobs = False
745 if collected_bytes < n_bytes:
746 LOGGER.info("Searching for records to mark deleted.")
747 unmarked_query = (
748 base_query.filter_by(deleted=False)
749 .filter(IndexEntry.accessed_timestamp < protect_blobs_after)
750 .with_for_update(skip_locked=True)
751 )
752 window = self._windowed_lru_digests(unmarked_query, IndexEntry.accessed_timestamp)
753 mark_deleted_start = time.perf_counter()
754 for [entry, is_inline] in window:
755 digest = Digest(hash=entry.digest_hash, size_bytes=entry.digest_size_bytes)
756 marked_digests.append(digest)
757 if not is_inline:
758 storage_digests.append(digest)
759 collected_bytes += entry.digest_size_bytes
760 if not dry_run:
761 entry.deleted = True
762 if collected_bytes >= n_bytes:
763 break
764 mark_deleted_duration = timedelta(seconds=time.perf_counter() - mark_deleted_start)
765 if not dry_run:
766 publish_timer_metric(METRIC.STORAGE.SQL_INDEX.MARK_DELETED_DURATION, mark_deleted_duration)
767 LOGGER.info("Gathered bytes.", tags=dict(collected_bytes=collected_bytes, max_bytes=n_bytes))
769 if dry_run:
770 return collected_bytes
772 failed_deletes = self._storage.bulk_delete(storage_digests)
773 digests_to_delete = [x for x in marked_digests if f"{x.hash}/{x.size_bytes}" not in failed_deletes]
775 with self._sql.scoped_session(exceptions_to_not_rollback_on=[StaleDataError]) as session:
776 failed_deletes.extend(self._bulk_delete_from_index(digests_to_delete, session))
777 for digest in digests_to_delete:
778 if digest in failed_deletes:
779 collected_bytes -= digest.size_bytes
781 batch_duration = time.time() - delete_start_time
782 blobs_deleted_per_second = (len(digests_to_delete) - len(failed_deletes)) / batch_duration
783 publish_gauge_metric(METRIC.CLEANUP.BLOBS_DELETED_PER_SECOND, blobs_deleted_per_second)
784 return collected_bytes
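# Editor-added sketch: a cleanup pass that first sizes the work with dry_run=True
# and then deletes for real, protecting anything accessed in the last hour. The
# one-hour threshold is hypothetical.
def _example_cleanup(index: "SQLIndex", bytes_to_free: int) -> int:
    cutoff = datetime.utcnow() - timedelta(hours=1)
    would_free = index.delete_n_bytes(bytes_to_free, dry_run=True, protect_blobs_after=cutoff)
    LOGGER.info("Cleanup estimate.", tags=dict(would_free=would_free))
    return index.delete_n_bytes(bytes_to_free, protect_blobs_after=cutoff)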
786 @timed(METRIC.STORAGE.BULK_DELETE_DURATION, type=TYPE)
787 def bulk_delete(self, digests: List[Digest]) -> List[str]:
788 # Delete from the index and then delete from the backing storage.
789 with self._sql.scoped_session(exceptions_to_not_rollback_on=[StaleDataError]) as session:
790 failed_deletes = self._bulk_delete_from_index(digests, session)
792 digests_to_delete = [x for x in digests if f"{x.hash}/{x.size_bytes}" not in failed_deletes]
793 failed_deletes.extend(self._storage.bulk_delete(digests_to_delete))
794 return failed_deletes
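# Editor-added sketch for bulk_delete: failures are reported as "hash/size_bytes"
# strings, so callers can map them back onto the original digests.
def _example_bulk_delete(index: "SQLIndex", digests: List[Digest]) -> List[Digest]:
    failed = set(index.bulk_delete(digests))
    return [d for d in digests if f"{d.hash}/{d.size_bytes}" in failed]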
796 @timed(METRIC.STORAGE.SQL_INDEX.BULK_DELETE_INDEX_DURATION)
797 def _bulk_delete_from_index(self, digests: List[Digest], session: Session) -> List[str]:
798 LOGGER.info("Deleting digests from the index.", tags=dict(digest_count=len(digests)))
799 index_table = IndexEntry.__table__
800 hashes = [x.hash for x in digests]
802 # Make sure we don't exceed the maximum size of an IN clause
803 n = self._inclause_limit
804 hash_chunks = [hashes[i : i + n] for i in range(0, len(hashes), n)]
806 # We will not raise, rollback, or log on StaleDataErrors.
807 # These errors occur when we delete fewer rows than we were expecting.
808 # This is fine, since the missing rows will get deleted eventually.
809 # When running bulk_deletes concurrently, StaleDataErrors
810 # occur too often to log.
811 num_blobs_deleted = 0
812 for chunk in hash_chunks:
813 # Do not wait for locks when deleting rows. Skip locked rows to
814 # avoid deadlocks.
815 stmt = index_table.delete().where(
816 index_table.c.digest_hash.in_(
817 select([index_table.c.digest_hash], index_table.c.digest_hash.in_(chunk)).with_for_update(
818 skip_locked=True
819 )
820 )
821 )
822 num_blobs_deleted += session.execute(stmt).rowcount # type: ignore
823 LOGGER.info("Blobs deleted from the index.", tags=dict(deleted_count=num_blobs_deleted, digest_count=len(digests)))
825 # bulk_delete is typically expected to return the digests that were not deleted,
826 # but delete only returns the number of rows deleted and not what was/wasn't
827 # deleted. Getting this info would require extra queries, so assume that
828 # everything was either deleted or already deleted. Failures will continue to raise exceptions.
829 return []