Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/index/sql.py: 92.87% (407 statements)
# Copyright (C) 2019 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
16"""
17SQLIndex
18==================
20A SQL index implementation. This can be pointed to either a remote SQL server
21or a local SQLite database.
23"""
import io
import itertools
import logging
import time
from collections import deque
from datetime import datetime, timedelta
from typing import IO, Any, Deque, Dict, Iterator, List, Optional, Sequence, Tuple, cast

from sqlalchemy import Column, and_, delete, func, select, text
from sqlalchemy.exc import DBAPIError
from sqlalchemy.orm import Session, load_only
from sqlalchemy.orm.exc import StaleDataError
from sqlalchemy.orm.query import Query
from sqlalchemy.orm.session import Session as SessionType
from sqlalchemy.sql.elements import BooleanClauseList

from buildgrid._exceptions import StorageFullError
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid._protos.google.rpc import code_pb2
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid.server.metrics_names import (
    CAS_INDEX_BULK_SELECT_DIGEST_TIME_METRIC_NAME,
    CAS_INDEX_BULK_TIMESTAMP_UPDATE_TIME_METRIC_NAME,
    CAS_INDEX_GET_BLOB_TIME_METRIC_NAME,
    CAS_INDEX_SAVE_DIGESTS_TIME_METRIC_NAME,
    CAS_INDEX_SIZE_CALCULATION_TIME_METRIC_NAME,
    CLEANUP_BLOBS_DELETION_RATE_METRIC_NAME,
    CLEANUP_INDEX_BULK_DELETE_METRIC_NAME,
    CLEANUP_INDEX_MARK_DELETED_METRIC_NAME,
    CLEANUP_INDEX_PREMARKED_BLOBS_METRIC_NAME,
    CLEANUP_STORAGE_BULK_DELETE_METRIC_NAME,
    CLEANUP_STORAGE_DELETION_FAILURES_METRIC_NAME,
)
from buildgrid.server.metrics_utils import (
    DurationMetric,
    generator_method_duration_metric,
    publish_counter_metric,
    publish_gauge_metric,
    publish_timer_metric,
)
from buildgrid.server.persistence.sql.models import IndexEntry
from buildgrid.server.sql.provider import SqlProvider
from buildgrid.utils import read_and_rewind, validate_digest_data

from ..storage_abc import StorageABC
from .index_abc import IndexABC
from .sql_dialect_delegates import PostgreSQLDelegate, SQLiteDelegate

LOGGER = logging.getLogger(__name__)

DIALECT_DELEGATES = {"postgresql": PostgreSQLDelegate, "sqlite": SQLiteDelegate}

INLINE_BLOB_SIZE_HARD_MAXIMUM = 1000000000


class SQLIndex(IndexABC):
    def __init__(
        self,
        sql_provider: SqlProvider,
        storage: StorageABC,
        *,
        window_size: int = 1000,
        inclause_limit: int = -1,
        max_inline_blob_size: int = 0,
        refresh_accesstime_older_than: int = 0,
        **kwargs: Any,
    ) -> None:
        base_argnames = ["fallback_on_get"]
        base_args = {}
        for arg in base_argnames:
            if arg in kwargs:
                base_args[arg] = kwargs.pop(arg)
        super().__init__(**base_args)

        self._sql = sql_provider
        self._storage = storage
        self._instance_name = None

        if max_inline_blob_size > INLINE_BLOB_SIZE_HARD_MAXIMUM:
            raise ValueError(
                f"Max inline blob size is [{max_inline_blob_size}], "
                f"but must not exceed [{INLINE_BLOB_SIZE_HARD_MAXIMUM}]."
            )
        if max_inline_blob_size >= 0:
            self._max_inline_blob_size = max_inline_blob_size
        else:
            raise ValueError(f"Max inline blob size is [{max_inline_blob_size}], but must be nonnegative.")

        if refresh_accesstime_older_than >= 0:
            # Measured in seconds. Helps reduce the frequency of timestamp updates under heavy read load.
            self.refresh_accesstime_older_than = refresh_accesstime_older_than
        else:
            raise ValueError(
                f"'refresh_accesstime_older_than' must be nonnegative. It is {refresh_accesstime_older_than}"
            )

        # Max # of rows to fetch while iterating over all blobs
        # (e.g. in least_recent_digests)
        self._all_blobs_window_size = window_size

        # This variable stores the list of whereclauses (SQLAlchemy BooleanClauseList objects)
        # generated from _column_windows() using a time-expensive SQL query.
        # These whereclauses are used to construct the final SQL query
        # during cleanup in order to fetch blobs by time windows.
        #
        # Inside _column_windows() a list of timestamp borders is obtained:
        #   intervals = [t1, t2, t3, ...]
        # The generated whereclauses can then be read, semantically, as:
        #   self._queue_of_whereclauses = [
        #     "WHERE t1 <= IndexEntry.accessed_timestamp < t2",
        #     "WHERE t2 <= IndexEntry.accessed_timestamp < t3",
        #     "WHERE t3 <= IndexEntry.accessed_timestamp < t4",
        #     ... and so on]
        # Note that the number of entries in each window is determined by
        # the instance variable "_all_blobs_window_size".
        self._queue_of_whereclauses: Deque[BooleanClauseList[Any]] = deque()

        # Whether entries with deleted=True should be considered by mark_n_bytes.
        # This is useful to catch any half-finished deletes where a cleanup process
        # may have exited halfway through deletion. Once all premarked blobs have been
        # deleted this becomes False, and it is only reset after a full scan of the database.
        self._delete_premarked_blobs: bool = True

        # Only pass known kwargs to db session
        available_options = {"pool_size", "max_overflow", "pool_timeout", "pool_pre_ping", "pool_recycle"}
        kwargs_keys = kwargs.keys()
        if not kwargs_keys <= available_options:
            unknown_args = kwargs_keys - available_options
            raise TypeError(f"Unknown keyword arguments: [{unknown_args}]")

        # Dialect-specific initialization
        self._dialect_delegate = DIALECT_DELEGATES.get(self._sql.dialect)

        if inclause_limit > 0:
            if inclause_limit > window_size:
                LOGGER.warning(
                    f"Configured inclause_limit [{inclause_limit}] is greater than window_size [{window_size}]"
                )
            self._inclause_limit = inclause_limit
        else:
            # If the inclause_limit isn't explicitly set, we use a default that
            # respects both the window size and the db implementation's
            # inclause limit.
            self._inclause_limit = min(window_size, self._sql.default_inlimit)
            LOGGER.debug(f"SQL index: using default inclause limit of {self._inclause_limit}")

        # Make a test query against the database to ensure the connection is valid
        with self._sql.scoped_session() as session:
            session.query(IndexEntry).first()
        self._sql.remove_scoped_session()

    def start(self) -> None:
        self._storage.start()

    def stop(self) -> None:
        self._storage.stop()

    def has_blob(self, digest: Digest) -> bool:
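        """Return True if the given digest is present in the index."""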
        with self._sql.scoped_session() as session:
            statement = select(func.count(IndexEntry.digest_hash)).where(IndexEntry.digest_hash == digest.hash)

            num_entries = session.execute(statement).scalar()
            if num_entries is None:
                num_entries = 0

            if num_entries == 1:
                return True
            elif num_entries < 1:
                return False
            else:
                raise RuntimeError(f"Multiple results found for blob [{digest}]. The index is in a bad state.")

    @DurationMetric(CAS_INDEX_GET_BLOB_TIME_METRIC_NAME, instanced=True)
    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        """Get a blob from the index or the backing storage. Optionally fall back and repair the index."""

        # Check the index for the blob and return it if found.
        with self._sql.scoped_session() as session:
            if entry := session.query(IndexEntry).filter(IndexEntry.digest_hash == digest.hash).first():
                if entry.inline_blob is not None:
                    return io.BytesIO(entry.inline_blob)
                elif blob := self._storage.get_blob(digest):
                    # Fix any blobs that should have been inlined.
                    if digest.size_bytes <= self._max_inline_blob_size:
                        self._save_digests_to_index([(digest, read_and_rewind(blob))], session)
                        session.commit()
                    return blob
                LOGGER.warning(f"Blob [{digest.hash}] was indexed but not in storage. Deleting from the index")
                self._bulk_delete_from_index([digest], session)

        # Check the storage for the blob and repair the index if found.
        if self._fallback_on_get:
            if blob := self._storage.get_blob(digest):
                with self._sql.scoped_session() as session:
                    if digest.size_bytes <= self._max_inline_blob_size:
                        self._save_digests_to_index([(digest, read_and_rewind(blob))], session)
                    else:
                        self._save_digests_to_index([(digest, None)], session)
                    session.commit()
                return blob

        # Blob was not found in index or storage
        return None

    def delete_blob(self, digest: Digest) -> None:
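        """Delete a blob from the index and then from the backing storage."""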
        statement = delete(IndexEntry).where(IndexEntry.digest_hash == digest.hash)
        options = {"synchronize_session": False}

        with self._sql.scoped_session() as session:
            session.execute(statement, execution_options=options)

        self._storage.delete_blob(digest)

    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
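        """Persist a written blob and record it in the index.

        Blobs larger than ``max_inline_blob_size`` are committed to the backing
        storage; anything at or below that size is stored inline in the index entry.
        """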
        inline_blob = None
        if digest.size_bytes > self._max_inline_blob_size:
            self._storage.commit_write(digest, write_session)
        else:
            write_session.seek(0)
            inline_blob = write_session.read()
        try:
            with self._sql.scoped_session() as session:
                self._save_digests_to_index([(digest, inline_blob)], session)
        except DBAPIError as error:
            # Error has pgcode attribute (Postgres only)
            if hasattr(error.orig, "pgcode"):
                # Imported here to avoid a global dependency on psycopg2
                from psycopg2.errors import DiskFull, OutOfMemory

                # 53100 == DiskFull && 53200 == OutOfMemory
                if isinstance(error.orig, (DiskFull, OutOfMemory)):
                    raise StorageFullError(f"Postgres Error: {error.orig.pgcode}") from error
            raise error

    def _partitioned_hashes(self, digests: Sequence[Digest]) -> Iterator[Iterator[str]]:
        """Given a long list of digests, split it into parts no larger than
        _inclause_limit and yield the hashes in each part.
        """
        for part_start in range(0, len(digests), self._inclause_limit):
            part_end = min(len(digests), part_start + self._inclause_limit)
            part_digests = itertools.islice(digests, part_start, part_end)
            yield map(lambda digest: digest.hash, part_digests)

    @generator_method_duration_metric(CAS_INDEX_BULK_SELECT_DIGEST_TIME_METRIC_NAME)
    def _bulk_select_digests(self, digests: Sequence[Digest], fetch_blobs: bool = False) -> Iterator[IndexEntry]:
        """Generator that selects all rows matching a digest list.

        SQLAlchemy Core is used for this because the ORM has problems with
        large numbers of bind variables for WHERE IN clauses.

        We only select on the digest hash (not hash and size) to allow for
        index-only queries on db backends that support them.
        """
        index_table = IndexEntry.__table__
        with self._sql.scoped_session() as session:
            columns = [index_table.c.digest_hash]
            if fetch_blobs:
                columns.append(index_table.c.inline_blob)
            for part in self._partitioned_hashes(digests):
                stmt = select(columns).where(index_table.c.digest_hash.in_(part))
                entries = session.execute(stmt)
                yield from entries  # type: ignore

    @DurationMetric(CAS_INDEX_BULK_TIMESTAMP_UPDATE_TIME_METRIC_NAME, instanced=True)
    def _bulk_refresh_timestamps(
        self, digests: Sequence[Digest], session: SessionType, update_time: Optional[datetime] = None
    ) -> None:
        """Refresh all timestamps of the input digests.

        SQLAlchemy Core is used for this because the ORM is not suitable for
        bulk inserts and updates.

        https://docs.sqlalchemy.org/en/13/faq/performance.html#i-m-inserting-400-000-rows-with-the-orm-and-it-s-really-slow
        """
        index_table = IndexEntry.__table__
        current_time = datetime.utcnow()

        # If a timestamp was passed in, use it and always refresh (no threshold).
        if update_time:
            timestamp = update_time
            last_accessed_threshold = current_time
        # Otherwise, only refresh a digest's timestamp if it was last accessed before this threshold.
        else:
            timestamp = current_time
            last_accessed_threshold = current_time - timedelta(seconds=self.refresh_accesstime_older_than)

        for part in self._partitioned_hashes(digests):
            # Generate the SQL statement:
            #   UPDATE index SET accessed_timestamp=<timestamp>
            #   WHERE index.digest_hash IN
            #     (SELECT index.digest_hash FROM index
            #      WHERE index.digest_hash IN <part> AND index.accessed_timestamp < <last_accessed_threshold>
            #      FOR UPDATE SKIP LOCKED)
            stmt = (
                index_table.update()
                .where(
                    index_table.c.digest_hash.in_(
                        select(index_table.c.digest_hash)
                        .where(index_table.c.digest_hash.in_(part))
                        .where(index_table.c.accessed_timestamp < last_accessed_threshold)
                        .with_for_update(skip_locked=True)
                    )
                )
                .values(accessed_timestamp=timestamp)
            )
            session.execute(stmt)
            session.commit()

    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
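        """Return the digests from the input list that are not present in the index.

        As a side effect, the access timestamps of the digests that are present
        get refreshed.
        """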
        entries = self._bulk_select_digests(digests)

        found_hashes = {entry.digest_hash for entry in entries}

        # Split the digests into two found/missing lists
        found_digests, missing_digests = [], []
        for digest in digests:
            if digest.hash in found_hashes:
                found_digests.append(digest)
            else:
                missing_digests.append(digest)

        # Update all timestamps for blobs which were found
        with self._sql.scoped_session() as session:
            self._bulk_refresh_timestamps(found_digests, session)

        return missing_digests

    @DurationMetric(CAS_INDEX_SAVE_DIGESTS_TIME_METRIC_NAME, instanced=True)
    def _save_digests_to_index(
        self, digest_blob_pairs: List[Tuple[Digest, Optional[bytes]]], session: SessionType
    ) -> None:
        """Helper to persist a list of digest/blob pairs to the index.

        Any digests already present are updated, and new digests are inserted along with
        their inline blobs (if provided). Only blobs with size less than or equal to
        max_inline_blob_size are inserted directly into the index.
        """
        if not digest_blob_pairs:
            return

        digest_blob_pairs = sorted(digest_blob_pairs, key=lambda pair: (pair[0].hash, pair[0].size_bytes))

        if self._dialect_delegate:
            try:
                self._dialect_delegate._save_digests_to_index(  # type: ignore
                    digest_blob_pairs, session, self._max_inline_blob_size
                )
                return
            except AttributeError:
                pass

        update_time = datetime.utcnow()
        # Figure out which digests we can just update
        digests = [digest for (digest, blob) in digest_blob_pairs]
        entries = self._bulk_select_digests(digests)
        # Map digests to new entries
        entries_not_present = {
            digest.hash: {
                "digest_hash": digest.hash,
                "digest_size_bytes": digest.size_bytes,
                "accessed_timestamp": update_time,
                "inline_blob": (blob if digest.size_bytes <= self._max_inline_blob_size else None),
            }
            for (digest, blob) in digest_blob_pairs
        }

        entries_present = {}
        for entry in entries:
            entries_present[entry.digest_hash] = entries_not_present[entry.digest_hash]
            del entries_not_present[entry.digest_hash]

        if entries_not_present:
            session.bulk_insert_mappings(IndexEntry, entries_not_present.values())  # type: ignore
        if entries_present:
            session.bulk_update_mappings(IndexEntry, entries_present.values())  # type: ignore

    def bulk_update_blobs(  # pylint: disable=arguments-renamed
        self, digest_blob_pairs: List[Tuple[Digest, bytes]]
    ) -> List[Status]:
        """Implement the StorageABC's bulk_update_blobs method.

        The StorageABC interface takes in a list of digest/blob pairs and
        returns a list of results. The list of results MUST be ordered to
        correspond with the order of the input list."""
        pairs_to_store = []
        result_map = {}

        # For each blob, determine whether to store it in the backing storage or inline it
        for digest, blob in digest_blob_pairs:
            if validate_digest_data(digest, blob):
                if digest.size_bytes > self._max_inline_blob_size:
                    pairs_to_store.append((digest, blob))
                else:
                    result_map[digest.hash] = Status(code=code_pb2.OK)
            else:
                result_map[digest.hash] = Status(code=code_pb2.INVALID_ARGUMENT, message="Data doesn't match hash")

        missing_blobs = self.missing_blobs([digest for digest, _ in pairs_to_store])
        missing_blob_pairs = []
        for digest, blob in pairs_to_store:
            if digest not in missing_blobs:
                result_map[digest.hash] = Status(code=code_pb2.OK)
            else:
                missing_blob_pairs.append((digest, blob))

        backup_results = self._storage.bulk_update_blobs(missing_blob_pairs)

        for digest, result in zip(missing_blobs, backup_results):
            if digest.hash in result_map:
                # ERROR: blob was both inlined and backed up
                raise RuntimeError("Blob was both inlined and backed up.")
            result_map[digest.hash] = result

        # Generate the final list of results
        pairs_to_inline: List[Tuple[Digest, Optional[bytes]]] = []
        results = []
        for digest, blob in digest_blob_pairs:
            status = result_map.get(
                digest.hash,
                Status(code=code_pb2.UNKNOWN, message="SQL Index: unable to determine the status of this blob"),
            )
            results.append(status)
            if status.code == code_pb2.OK:
                pairs_to_inline.append((digest, blob))

        with self._sql.scoped_session() as session:
            self._save_digests_to_index(pairs_to_inline, session)

        return results

    def _bulk_read_blobs_with_fallback(self, digests: List[Digest]) -> Dict[str, bytes]:
        hash_to_digest: Dict[str, Digest] = {digest.hash: digest for digest in digests}
        results: Dict[str, bytes] = {}

        expected_storage_digests: List[Digest] = []
        # Fetch inlined blobs directly from the index
        entries = self._bulk_select_digests(digests, fetch_blobs=True)
        for e in entries:
            blob, digest_hash, digest = e.inline_blob, e.digest_hash, hash_to_digest[e.digest_hash]
            if blob is not None:
                results[digest_hash] = blob
                hash_to_digest.pop(digest_hash)
            else:
                # If a blob is not inlined then it is expected to be in storage
                expected_storage_digests.append(digest)

        # Fetch everything that wasn't inlined from the backing storage
        fetched_digests = self._storage.bulk_read_blobs(list(hash_to_digest.values()))

        # Save everything fetched from storage, inlining the blobs if they're small enough
        digest_pairs_to_save: List[Tuple[Digest, Optional[bytes]]] = []
        for digest_hash, blob_data in fetched_digests.items():
            if blob_data is not None:
                digest = hash_to_digest[digest_hash]
                if digest.size_bytes <= self._max_inline_blob_size:
                    digest_pairs_to_save.append((digest, blob_data))
                else:
                    digest_pairs_to_save.append((digest, None))
                results[digest_hash] = blob_data

        # Set of digest hashes actually found in storage
        actual_storage_digest_hashes = set(
            digest_hash for (digest_hash, blob_data) in fetched_digests.items() if blob_data is not None
        )
        # Get a list of all the digests that were in the index but not found in storage
        digests_expected_not_in_storage: List[Digest] = []
        for expected_digest in expected_storage_digests:
            if expected_digest.hash not in actual_storage_digest_hashes:
                LOGGER.warning(f"Blob [{expected_digest}] was indexed but not in storage. Deleting from the index")
                digests_expected_not_in_storage.append(expected_digest)

        with self._sql.scoped_session() as session:
            self._save_digests_to_index(digest_pairs_to_save, session)
            if digests_expected_not_in_storage:
                self._bulk_delete_from_index(digests_expected_not_in_storage, session)
            session.commit()

        return results

    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
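        """Read a batch of blobs, preferring inlined copies in the index.

        Blobs that are not inlined are fetched from the backing storage; digests
        whose blobs cannot be found anywhere are omitted from the result.
        """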
        if self._fallback_on_get:
            return self._bulk_read_blobs_with_fallback(digests)

        # If fallback is disabled, query the index first and only
        # query the storage for blobs that weren't inlined there

        hash_to_digest = {digest.hash: digest for digest in digests}  # hash -> digest map
        results: Dict[str, bytes] = {}  # The final dict of results (return value)
        digests_to_fetch: List[Digest] = []  # Digests that need to be fetched from storage
        digest_pairs_to_save: List[Tuple[Digest, Optional[bytes]]] = []  # Digests that need to be updated in the index

        # Fetch all of the digests in the database
        # Anything that wasn't already inlined needs to be fetched
        entries = self._bulk_select_digests(digests, fetch_blobs=True)
        for index_entry in entries:
            digest = hash_to_digest[index_entry.digest_hash]
            if index_entry.inline_blob is not None:
                results[index_entry.digest_hash] = index_entry.inline_blob
            else:
                digests_to_fetch.append(digest)

        # Caution: digests whose blobs cannot be found in storage will be dropped.
        if digests_to_fetch:
            fetched_digests = self._storage.bulk_read_blobs(digests_to_fetch)
        else:
            fetched_digests = {}

        # Generate the list of inputs for _save_digests_to_index
        #
        # We only need to send blob data for small blobs fetched
        # from the storage since everything else is either too
        # big or already inlined
        for digest in digests_to_fetch:
            if blob_data := fetched_digests.get(digest.hash):
                if digest.size_bytes <= self._max_inline_blob_size:
                    digest_pairs_to_save.append((digest, blob_data))

        actual_storage_digests = set(digest_hash for (digest_hash, _) in fetched_digests.items())
        # Get a list of all the digests that were in the index but not found in storage
        digests_expected_not_in_storage: List[Digest] = []
        for expected_digest in digests_to_fetch:
            if expected_digest.hash not in actual_storage_digests:
                LOGGER.warning(f"Blob [{expected_digest}] was indexed but not in storage. Deleting from the index")
                digests_expected_not_in_storage.append(expected_digest)

        # Update any blobs which need to be inlined
        with self._sql.scoped_session() as session:
            self._save_digests_to_index(digest_pairs_to_save, session)
            if digests_expected_not_in_storage:
                self._bulk_delete_from_index(digests_expected_not_in_storage, session)
            session.commit()

        results.update(fetched_digests)
        return results

    def _column_windows(self, session: SessionType, column: Column[Any]) -> Iterator[BooleanClauseList[Any]]:
        """Adapted from the sqlalchemy WindowedRangeQuery recipe.
        https://github.com/sqlalchemy/sqlalchemy/wiki/WindowedRangeQuery

        This method breaks the timestamp range into windows and yields
        the borders of these windows to the caller. For example, the borders
        yielded by this might look something like
        ('2019-10-08 18:25:03.699863', '2019-10-08 18:25:03.751018')
        ('2019-10-08 18:25:03.751018', '2019-10-08 18:25:03.807867')
        ('2019-10-08 18:25:03.807867', '2019-10-08 18:25:03.862192')
        ('2019-10-08 18:25:03.862192',)

        _windowed_lru_digests uses these borders to form WHERE clauses for its
        SELECTs. In doing so, we make sure to repeatedly query the database for
        live updates, striking a balance between loading the entire resultset
        into memory and querying each row individually, both of which are
        inefficient in the context of a large index.

        The window size is a parameter and can be configured. A larger window
        size will yield better performance (fewer SQL queries) at the cost of
        memory (holding on to the results of the query) and accuracy (blobs
        may get updated while you're working on them), and vice versa for a
        smaller window size.
        """

        def int_for_range(start_id: Any, end_id: Any) -> BooleanClauseList[Any]:
            if end_id:
                return and_(column >= start_id, column < end_id)
            else:
                return column >= start_id  # type: ignore[no-any-return]

        # Constructs a query that:
        # 1. Gets all the timestamps in sorted order.
        # 2. Assigns a row number to each entry.
        # 3. Only keeps timestamps whose row numbers are N rows apart, where N="_all_blobs_window_size".
        #   SELECT
        #     anon_1.index_accessed_timestamp AS anon_1_index_accessed_timestamp
        #   FROM (
        #     SELECT
        #       index.accessed_timestamp AS index_accessed_timestamp,
        #       row_number() OVER (ORDER BY index.accessed_timestamp) AS rownum
        #     FROM index
        #   )
        #   AS anon_1
        #   WHERE rownum % 1000=1
        #
        # Note:
        # - This query can be slow due to checking each entry with "WHERE rownum % 1000=1".
        # - These timestamps will be the basis for constructing the SQL "WHERE" clauses later.
        q = session.query(column, func.row_number().over(order_by=column).label("rownum")).from_self(column)
        if self._all_blobs_window_size > 1:
            q = q.filter(text(f"rownum % {self._all_blobs_window_size}=1"))

        # Execute the underlying query against the database.
        # Ex: intervals = [t1, t1001, t2001, ...], q = [(t1, ), (t1001, ), (t2001, ), ...]
        intervals = [id for id, in q]

        # Generate the whereclauses
        while intervals:
            start = intervals.pop(0)
            if intervals:
                end = intervals[0]
            else:
                end = None
            # Ex: yield "WHERE IndexEntry.accessed_timestamp >= start AND IndexEntry.accessed_timestamp < end"
            yield int_for_range(start, end)

    def _windowed_lru_digests(self, q: "Query[Any]", column: Column[Any]) -> Iterator[Tuple[IndexEntry, bool]]:
        """Generate a query for each window produced by _column_windows
        and yield the results one by one.
        """
        # Determine whether the conditions are met to make an SQL call to get new windows.
        msg = "Using stored LRU windows"
        if len(self._queue_of_whereclauses) == 0:
            msg = "Requesting new LRU windows."
            self._queue_of_whereclauses = deque(self._column_windows(q.session, column))  # type: ignore
            self._delete_premarked_blobs = True

        msg += f" Number of windows remaining: {len(self._queue_of_whereclauses)}"
        LOGGER.debug(msg)

        while self._queue_of_whereclauses:
            whereclause = self._queue_of_whereclauses[0]
            window = q.filter(whereclause).order_by(column.asc())
            yield from window

            # If the "yield from" above does not run to completion, the cleanup exited
            # before consuming the whole window, and the whereclause is kept for next time.
            # Otherwise the whereclause is exhausted and can be discarded.
            self._queue_of_whereclauses.popleft()

    def least_recent_digests(self) -> Iterator[Digest]:
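        """Yield the digests in the index, least recently accessed first."""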
        with self._sql.scoped_session() as session:
            q = session.query(IndexEntry)
            for entry in self._windowed_lru_digests(q, IndexEntry.accessed_timestamp):
                # TODO: make this generic, or delete this method since it is only used by tests.
                index_entry = cast(IndexEntry, entry)
                assert isinstance(index_entry.digest_hash, str)
                assert isinstance(index_entry.digest_size_bytes, int)
                yield Digest(hash=index_entry.digest_hash, size_bytes=index_entry.digest_size_bytes)

    @DurationMetric(CAS_INDEX_SIZE_CALCULATION_TIME_METRIC_NAME)
    def get_total_size(self, include_marked: bool = True) -> int:
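        """Return the total size, in bytes, of the indexed blobs.

        If ``include_marked`` is False, entries already marked as deleted are excluded.
        """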
        statement = select(func.sum(IndexEntry.digest_size_bytes))
        if not include_marked:
            statement = statement.filter_by(deleted=False)

        with self._sql.scoped_session() as session:
            result = session.execute(statement).scalar()
            if result is None:
                result = 0
            return result

    def delete_n_bytes(
        self, n_bytes: int, dry_run: bool = False, protect_blobs_after: Optional[datetime] = None
    ) -> int:
        """
        When using a SQL Index, entries with a delete marker are "in the process of being deleted".
        This is required because storage operations can't be safely tied to the SQL index transaction
        (one may fail independently of the other, leaving the two inconsistent).

        The workflow is roughly as follows:
        - Start a SQL transaction.
        - Lock and mark the indexed items you want to delete.
        - Close the SQL transaction.
        - Perform the storage deletes.
        - Start a SQL transaction.
        - Actually delete the index entries.
        - Close the SQL transaction.

        This means anything with deleted=False will always be present in the backing store. If an entry is
        marked deleted=True and the process gets killed while deleting from the backing storage, only
        some of the items might actually be gone.

        The next time the cleaner starts up, it can try that delete again (ignoring 404s).
        Eventually that will succeed and the item will actually be removed from the DB. Only during
        the first run of batches do we consider already-marked items; this prevents multiple cleanup
        daemons from competing with each other on every batch.
        """
        if protect_blobs_after is None:
            protect_blobs_after = datetime.utcnow()

        # Used for metric publishing
        delete_start_time = time.time()
        metadata = {}
        if self._instance_name:
            metadata["instance-name"] = self._instance_name

        storage_digests: List[Digest] = []
        marked_digests: List[Digest] = []
        collected_bytes = 0

        with self._sql.scoped_session(exceptions_to_not_rollback_on=[StaleDataError]) as session:
            base_query = session.query(IndexEntry, IndexEntry.inline_blob != None).options(  # noqa
                load_only("digest_hash", "digest_size_bytes")
            )

            if self._delete_premarked_blobs:
                LOGGER.info("Starting to gather pre-marked records")
                premarked_query = base_query.filter_by(deleted=True)
                for [entry, is_inline] in premarked_query.all():
                    digest = Digest(hash=entry.digest_hash, size_bytes=entry.digest_size_bytes)
                    marked_digests.append(digest)
                    if not is_inline:
                        storage_digests.append(digest)
                    collected_bytes += entry.digest_size_bytes
                if not dry_run:
                    publish_counter_metric(CLEANUP_INDEX_PREMARKED_BLOBS_METRIC_NAME, len(marked_digests), metadata)
                LOGGER.info(f"Gathered pre-marked {collected_bytes} out of {n_bytes} bytes (max)")
                self._delete_premarked_blobs = False

            if collected_bytes < n_bytes:
                LOGGER.info("Searching for records to mark deleted")
                unmarked_query = (
                    base_query.filter_by(deleted=False)
                    .filter(IndexEntry.accessed_timestamp < protect_blobs_after)
                    .with_for_update(skip_locked=True)
                )
                window = self._windowed_lru_digests(unmarked_query, IndexEntry.accessed_timestamp)
                mark_deleted_start = time.perf_counter()
                for [entry, is_inline] in window:
                    digest = Digest(hash=entry.digest_hash, size_bytes=entry.digest_size_bytes)
                    marked_digests.append(digest)
                    if not is_inline:
                        storage_digests.append(digest)
                    collected_bytes += entry.digest_size_bytes
                    if not dry_run:
                        entry.deleted = True
                    if collected_bytes >= n_bytes:
                        break
                mark_deleted_duration = timedelta(seconds=time.perf_counter() - mark_deleted_start)
                if not dry_run:
                    publish_timer_metric(CLEANUP_INDEX_MARK_DELETED_METRIC_NAME, mark_deleted_duration, metadata)
                LOGGER.info(f"Gathered {collected_bytes} out of {n_bytes} bytes (max)")

        if dry_run:
            return collected_bytes

        with DurationMetric(CLEANUP_STORAGE_BULK_DELETE_METRIC_NAME, instanced=True):
            failed_deletes = self._storage.bulk_delete(storage_digests)

        publish_counter_metric(CLEANUP_STORAGE_DELETION_FAILURES_METRIC_NAME, len(failed_deletes), metadata)
        digests_to_delete = [x for x in marked_digests if x.hash not in failed_deletes]

        with self._sql.scoped_session(exceptions_to_not_rollback_on=[StaleDataError]) as session:
            with DurationMetric(CLEANUP_INDEX_BULK_DELETE_METRIC_NAME, instanced=True):
                failed_deletes.extend(self._bulk_delete_from_index(digests_to_delete, session))
            for digest in digests_to_delete:
                if digest.hash in failed_deletes:
                    collected_bytes -= digest.size_bytes

        batch_duration = time.time() - delete_start_time
        blobs_deleted_per_second = (len(digests_to_delete) - len(failed_deletes)) / batch_duration
        publish_gauge_metric(CLEANUP_BLOBS_DELETION_RATE_METRIC_NAME, blobs_deleted_per_second, metadata)
        return collected_bytes

    def bulk_delete(self, digests: List[Digest]) -> List[str]:
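        """Delete a batch of blobs from the index and the backing storage.

        Returns the identifiers of the blobs that could not be deleted.
        """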
        # Delete from the index and then delete from the backing storage.
        with self._sql.scoped_session(exceptions_to_not_rollback_on=[StaleDataError]) as session:
            failed_deletes = self._bulk_delete_from_index(digests, session)

        digests_to_delete = [x for x in digests if x.hash not in failed_deletes]
        failed_deletes.extend(self._storage.bulk_delete(digests_to_delete))
        return failed_deletes

    def _bulk_delete_from_index(self, digests: List[Digest], session: Session) -> List[str]:
        LOGGER.info(f"Deleting {len(digests)} digests from the index")
        index_table = IndexEntry.__table__
        hashes = [x.hash for x in digests]

        # Make sure we don't exceed the maximum size of an IN clause
        n = self._inclause_limit
        hash_chunks = [hashes[i : i + n] for i in range(0, len(hashes), n)]

        # We will not raise, rollback, or log on StaleDataErrors.
        # These errors occur when we delete fewer rows than we were expecting.
        # This is fine, since the missing rows will get deleted eventually.
        # When running bulk_deletes concurrently, StaleDataErrors
        # occur too often to log.
        num_blobs_deleted = 0
        for chunk in hash_chunks:
            # Do not wait for locks when deleting rows. Skip locked rows to
            # avoid deadlocks.
            stmt = index_table.delete().where(
                index_table.c.digest_hash.in_(
                    select([index_table.c.digest_hash], index_table.c.digest_hash.in_(chunk)).with_for_update(
                        skip_locked=True
                    )
                )
            )
            num_blobs_deleted += session.execute(stmt).rowcount  # type: ignore
        LOGGER.info(f"{num_blobs_deleted}/{len(digests)} blobs deleted from the index")

        # bulk_delete is typically expected to return the digests that were not deleted,
        # but delete only returns the number of rows deleted, not which rows were or
        # weren't deleted. Getting this info would require extra queries, so assume that
        # everything was either deleted or already deleted. Failures will continue to throw.
        return []