Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cleanup/janitor/sql.py: 74.36%
78 statements
coverage.py v7.4.1, created at 2025-05-28 16:48 +0000
# Copyright (C) 2024 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import random
from functools import lru_cache
from threading import Event
from typing import Any

from sqlalchemy import delete, select
from sqlalchemy.orm.exc import StaleDataError

from buildgrid.server.cleanup.janitor.config import SQLStorageConfig
from buildgrid.server.cleanup.janitor.index import IndexLookup
from buildgrid.server.exceptions import FailedPreconditionError
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.settings import HASH
from buildgrid.server.sql.models import BlobEntry
from buildgrid.server.threading import ContextWorker

LOGGER = buildgrid_logger(__name__)

@lru_cache(maxsize=1)
def get_sha256_buckets() -> list[tuple[str, str]]:
    # This function creates bucket boundaries for 256 buckets based on sha256 hash values.
    # Each bucket represents a range of hash values. For example, the first bucket will include
    # all hashes starting with "00" and ending before "01". This is because in string comparison:
    # "00" < "0000..." < "00ffffff" < "01".
    # The last bucket will include hashes starting with "ff" and ending before "fg":
    # "ff" < "ff000000..." < "ffffff..." < "fg".

    if HASH().name != "sha256":
        LOGGER.error("SQL Janitor only supports sha256 hashing.")
        raise FailedPreconditionError("SQL Janitor only supports sha256 hashing.")

    # This creates the first 255 buckets. For example, the first bucket is ("00", "01"),
    # the second bucket is ("01", "02"), and so on up to ("fe", "ff").
    buckets = [(f"{i:02x}", f"{i + 1:02x}") for i in range(255)]

    # The last bucket is a special case because the last value is "ff" and the next value is "fg".
    buckets.append(("ff", "fg"))
    return buckets
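
# For orientation only (nothing in this module relies on these literals): the
# boundaries produced above look like
#   get_sha256_buckets()[:2] == [("00", "01"), ("01", "02")]
#   get_sha256_buckets()[-1] == ("ff", "fg")
# and string comparison of hex digests against these bounds is what partitions
# the sha256 keyspace into 256 ranges.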


class SQLJanitor:

    def __init__(self, sqlStorageConfig: SQLStorageConfig, index: IndexLookup):
        self._index = index
        self._sql = sqlStorageConfig.sql
        self._sql_ro = sqlStorageConfig.sql_ro
        self._sleep_interval = sqlStorageConfig.sleep_interval
        self._batch_size = sqlStorageConfig.batch_size
        self._batch_sleep_interval = sqlStorageConfig.batch_sleep_interval

        self._stop_requested = Event()
        self._worker = ContextWorker(target=self.run, name="Janitor", on_shutdown_requested=self._stop_requested.set)

    def delete_digests(self, digests: set[str]) -> int:
        # We will not raise, rollback, or log on StaleDataErrors.
        # These errors occur when we delete fewer rows than we were expecting.
        with self._sql.session(exceptions_to_not_rollback_on=[StaleDataError]) as session:
            stmt = delete(BlobEntry).where(
                BlobEntry.digest_hash.in_(
                    select(BlobEntry.digest_hash)
                    .where(BlobEntry.digest_hash.in_(digests))
                    .with_for_update(skip_locked=True)
                )
            )
            # Set synchronize_session to False as we don't have any local session objects
            # to keep in sync.
            return session.execute(stmt, execution_options={"synchronize_session": False}).rowcount
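
    # A note on the delete above: with_for_update(skip_locked=True) emits
    # SELECT ... FOR UPDATE SKIP LOCKED on backends that support it (e.g. PostgreSQL),
    # so rows already locked by a concurrent transaction, such as another janitor
    # instance, are skipped rather than waited on. Any skipped rows are expected to
    # be picked up again on a later sweep of the same bucket.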

    def cleanup_bucket(self, bucket: tuple[str, str]) -> None:
        current_end = bucket[0]
        num_deleted = 0
        while True:
            if self._stop_requested.is_set():
                break
            statement = (
                select(BlobEntry.digest_hash, BlobEntry.digest_size_bytes)
                .where(BlobEntry.digest_hash > current_end, BlobEntry.digest_hash < bucket[1])
                .order_by(BlobEntry.digest_hash)
                .limit(self._batch_size)
            )
            with self._sql_ro.session() as session:
                results = session.execute(statement).fetchall()

            if not results:
                break

            # Get the digest of the last blob, so the next batch resumes after it.
            current_end = results[-1][0]

            # Create a map from "digest_size" keys back to the digest hash.
            digest_map = {f"{digest}_{size}": digest for digest, size in results}
            missing_digests = self._index.get_missing_digests(set(digest_map.keys()))
            missing_hashes = set([digest_map[key] for key in missing_digests])
            num_deleted += self.delete_digests(missing_hashes)

            if self._batch_sleep_interval:
                self._stop_requested.wait(timeout=self._batch_sleep_interval)

            if len(results) < self._batch_size:
                break

        LOGGER.info(f"Deleted {num_deleted} blobs from sql storage between {bucket[0]}-{bucket[1]}")

    def __enter__(self) -> "SQLJanitor":
        self.start()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        self.stop()

    def start(self) -> None:
        self._worker.start()

    def stop(self, *args: Any, **kwargs: Any) -> None:
        self._worker.stop()

    def run(self, stop_requested: Event) -> None:
        random.seed()
        while not stop_requested.is_set():
            try:
                bucket_boundaries = get_sha256_buckets()

                # Shuffle the bucket names to reduce the likelihood of two janitors
                # concurrently cleaning the same bucket.
                random.shuffle(bucket_boundaries)

                for bucket in bucket_boundaries:
                    self.cleanup_bucket(bucket)
            except Exception:
                LOGGER.exception("Exception while cleaning up SQL storage with janitor")
                continue

            stop_requested.wait(timeout=self._sleep_interval)
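

# A minimal usage sketch (assumed wiring; the SQLStorageConfig and IndexLookup
# instances would normally be built from the janitor's configuration):
#
#     with SQLJanitor(sql_storage_config, index_lookup) as janitor:
#         ...  # the background worker sweeps each sha256 bucket until shutdown
#
# Entering the context starts the ContextWorker thread, and exiting it requests
# shutdown via the Event wired to on_shutdown_requested.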