Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cleanup/janitor/sql.py: 74.36%

78 statements  

coverage.py v7.4.1, created at 2025-05-28 16:48 +0000

# Copyright (C) 2024 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import random
from functools import lru_cache
from threading import Event
from typing import Any

from sqlalchemy import delete, select
from sqlalchemy.orm.exc import StaleDataError

from buildgrid.server.cleanup.janitor.config import SQLStorageConfig
from buildgrid.server.cleanup.janitor.index import IndexLookup
from buildgrid.server.exceptions import FailedPreconditionError
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.settings import HASH
from buildgrid.server.sql.models import BlobEntry
from buildgrid.server.threading import ContextWorker

LOGGER = buildgrid_logger(__name__)

@lru_cache(maxsize=1)
def get_sha256_buckets() -> list[tuple[str, str]]:
    # This function creates bucket boundaries for 256 buckets based on sha256 hash values.
    # Each bucket represents a range of hash values. For example, the first bucket will include
    # all hashes starting with "00" and ending before "01". This is because in string comparison:
    # "00" < "0000..." < "00ffffff" < "01".
    # The last bucket will include hashes starting with "ff" and ending before "fg":
    # "ff" < "ff000000..." < "ffffff..." < "fg".

    if HASH().name != "sha256":
        LOGGER.error("SQL Janitor only supports sha256 hashing.")
        raise FailedPreconditionError("SQL Janitor only supports sha256 hashing.")

    # This creates the first 255 buckets. For example, the first bucket is ("00", "01"),
    # the second bucket is ("01", "02"), and so on up to ("fe", "ff").
    buckets = [(f"{i:02x}", f"{i + 1:02x}") for i in range(255)]

    # The last bucket is a special case because the last value is "ff" and the next value is "fg".
    buckets.append(("ff", "fg"))
    return buckets

class SQLJanitor:
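    """Background worker that removes stale blobs from SQL storage.

    Each cleanup pass walks the sha256 key space bucket by bucket and deletes
    any blobs whose digests are reported missing by the IndexLookup.
    """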

    def __init__(self, sqlStorageConfig: SQLStorageConfig, index: IndexLookup):
        self._index = index
        self._sql = sqlStorageConfig.sql
        self._sql_ro = sqlStorageConfig.sql_ro
        self._sleep_interval = sqlStorageConfig.sleep_interval
        self._batch_size = sqlStorageConfig.batch_size
        self._batch_sleep_interval = sqlStorageConfig.batch_sleep_interval

        self._stop_requested = Event()
        self._worker = ContextWorker(target=self.run, name="Janitor", on_shutdown_requested=self._stop_requested.set)
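        # On shutdown the ContextWorker sets _stop_requested, which lets run() and
        # cleanup_bucket() exit their loops early instead of finishing a full pass.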

    def delete_digests(self, digests: set[str]) -> int:
        # We will not raise, rollback, or log on StaleDataErrors.
        # These errors occur when we delete fewer rows than we were expecting.
        with self._sql.session(exceptions_to_not_rollback_on=[StaleDataError]) as session:
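            # skip_locked means rows already locked by another transaction (for example,
            # a concurrent janitor) are skipped rather than waited on.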

            stmt = delete(BlobEntry).where(
                BlobEntry.digest_hash.in_(
                    select(BlobEntry.digest_hash)
                    .where(BlobEntry.digest_hash.in_(digests))
                    .with_for_update(skip_locked=True)
                )
            )
            # Set synchronize_session to false as we don't have any local session objects
            # to keep in sync.
            return session.execute(stmt, execution_options={"synchronize_session": False}).rowcount

    def cleanup_bucket(self, bucket: tuple[str, str]) -> None:
        current_end = bucket[0]
        num_deleted = 0
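        # Page through the bucket in batches of _batch_size, ordered by digest hash,
        # using the last hash seen as the cursor for the next query.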

        while True:
            if self._stop_requested.is_set():
                break
            statement = (
                select(BlobEntry.digest_hash, BlobEntry.digest_size_bytes)
                .where(BlobEntry.digest_hash > current_end, BlobEntry.digest_hash < bucket[1])
                .order_by(BlobEntry.digest_hash)
                .limit(self._batch_size)
            )
            with self._sql_ro.session() as session:
                results = session.execute(statement).fetchall()

            if not results:
                break

            # Get the digest hash of the last blob in this batch.
            current_end = results[-1][0]

            # Map each combined "{hash}_{size}" key back to its digest hash.
            digest_map = {f"{digest}_{size}": digest for digest, size in results}

            missing_digests = self._index.get_missing_digests(set(digest_map.keys()))
            missing_hashes = set([digest_map[key] for key in missing_digests])
            num_deleted += self.delete_digests(missing_hashes)

            if self._batch_sleep_interval:
                self._stop_requested.wait(timeout=self._batch_sleep_interval)

            if len(results) < self._batch_size:
                break

        LOGGER.info(f"Deleted {num_deleted} blobs from sql storage between {bucket[0]}-{bucket[1]}")

    def __enter__(self) -> "SQLJanitor":
        self.start()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        self.stop()

    def start(self) -> None:
        self._worker.start()

    def stop(self, *args: Any, **kwargs: Any) -> None:
        self._worker.stop()

    def run(self, stop_requested: Event) -> None:
        random.seed()
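        # Re-seeding the RNG here helps ensure different janitor instances shuffle
        # the buckets below in different orders.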

        while not stop_requested.is_set():
            try:
                bucket_boundaries = get_sha256_buckets()

                # Shuffle the bucket names to reduce the likelihood of two janitors
                # concurrently cleaning the same bucket.
                random.shuffle(bucket_boundaries)

                for bucket in bucket_boundaries:
                    self.cleanup_bucket(bucket)
            except Exception:
                LOGGER.exception("Exception while cleaning up SQL storage with janitor")
                continue

            stop_requested.wait(timeout=self._sleep_interval)