Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cleanup/janitor/s3.py: 81.74%

115 statements  

coverage.py v7.4.1, created at 2025-07-10 13:10 +0000

# Copyright (C) 2024 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import random
import re
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from itertools import product
from threading import Event
from typing import Any, Iterator

from buildgrid.server.cleanup.janitor.config import S3Config
from buildgrid.server.cleanup.janitor.index import IndexLookup
from buildgrid.server.cleanup.janitor.utils import check_bucket_versioning, get_s3_client
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.metrics_names import METRIC
from buildgrid.server.metrics_tags import tag_blob_age, tag_blob_size
from buildgrid.server.metrics_utils import publish_distribution_metric
from buildgrid.server.threading import ContextWorker

LOGGER = buildgrid_logger(__name__)

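# Minimal record of a single S3 object (or object version) as returned by the
# enumeration helpers below; version_id is left empty for unversioned buckets.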
@dataclass(frozen=True)
class ListObjectResult:
    key: str
    version_id: str
    last_modified: datetime
    size: int

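# Publish blob age and size distribution metrics for every object seen during
# enumeration, tagged with the corresponding age and size range labels.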
def publish_s3_object_metrics(s3_objects: set[ListObjectResult]) -> None:
    for obj in s3_objects:
        age = datetime.now(tz=timezone.utc) - obj.last_modified
        age_in_ms = age / timedelta(milliseconds=1)
        age_range = tag_blob_age(age_in_ms)
        size_range = tag_blob_size(obj.size)
        publish_distribution_metric(
            METRIC.CLEANUP.JANITOR.BLOB_AGE, age_in_ms, objectAgeRange=age_range, objectSizeRange=size_range
        )
        publish_distribution_metric(
            METRIC.CLEANUP.JANITOR.BLOB_BYTES, obj.size, objectAgeRange=age_range, objectSizeRange=size_range
        )

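# Background worker that periodically walks the configured S3 buckets and
# deletes blobs whose digests are missing from the IndexLookup index.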
class S3Janitor:

    def __init__(self, s3Config: S3Config, index: IndexLookup):
        self._bucket_regex = re.compile(s3Config.bucket_regex)
        self._index = index
        self._path_prefix = s3Config.path_prefix
        self._s3 = get_s3_client(s3Config)
        self._sleep_interval = s3Config.sleep_interval
        self._hash_prefix_size = s3Config.hash_prefix_size
        self._max_batch_size = s3Config.max_batch_size
        self._batch_sleep_interval = s3Config.batch_sleep_interval

        self._stop_requested = Event()
        self._worker = ContextWorker(target=self.run, name="Janitor", on_shutdown_requested=self._stop_requested.set)

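    # Yield one set of ListObjectResults per page of 'list_object_versions'
    # results, so the caller can process and delete objects one page at a time.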
    def enumerate_versioned_bucket(self, bucket: str, prefix: str) -> Iterator[set[ListObjectResult]]:
        pages = self._s3.get_paginator("list_object_versions").paginate(
            Bucket=bucket, Prefix=prefix, PaginationConfig={"PageSize": self._max_batch_size}
        )
        for page in pages:
            if "Versions" not in page:
                continue

            list_objects = {
                ListObjectResult(
                    key=item["Key"],
                    version_id=item["VersionId"],
                    last_modified=item["LastModified"],
                    size=item["Size"],
                )
                for item in page["Versions"]
            }
            yield list_objects

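    # Same as enumerate_versioned_bucket, but for buckets without versioning:
    # pages come from 'list_objects' and version_id is left empty.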
    def enumerate_unversioned_bucket(self, bucket: str, prefix: str) -> Iterator[set[ListObjectResult]]:
        pages = self._s3.get_paginator("list_objects").paginate(
            Bucket=bucket, Prefix=prefix, PaginationConfig={"PageSize": self._max_batch_size}
        )
        for page in pages:
            if "Contents" not in page:
                continue

            list_objects = {
                ListObjectResult(
                    key=item["Key"],
                    version_id="",
                    last_modified=item["LastModified"],
                    size=item["Size"],
                )
                for item in page["Contents"]
            }
            yield list_objects

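    # Bulk-delete the given objects and return the keys that S3 reports as
    # actually deleted.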
    def delete_s3_entries(self, bucket: str, missing_objects: list[ListObjectResult]) -> list[str]:
        LOGGER.info("Deleting orphaned blobs from S3.", tags=dict(digest_count=len(missing_objects)))
        response = self._s3.delete_objects(
            Bucket=bucket,
            Delete={
                "Objects": [{"Key": obj.key, "VersionId": obj.version_id} for obj in missing_objects],
                "Quiet": False,
            },
        )
        return [
            deleted_object["Key"] for deleted_object in response.get("Deleted", []) if "Key" in deleted_object.keys()
        ]

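    # List all buckets visible to the client and keep only those whose names
    # match the configured bucket regex.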
    def get_buckets(self) -> list[str]:
        response = self._s3.list_buckets()
        return [
            bucket["Name"] for bucket in response["Buckets"] if self._bucket_regex.search(bucket["Name"]) is not None
        ]

    # Generate all the hash prefixes and shuffle them to reduce the likelihood of
    # two janitors cleaning the same hash prefix.
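    # For example, with hash_prefix_size=2 and path_prefix="cas" this yields the
    # 256 prefixes "cas/00" through "cas/ff"; with no hash_prefix_size configured
    # it falls back to a single prefix, the bare path_prefix.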
    def generate_prefixes(self) -> list[str]:
        if self._hash_prefix_size:
            prefixes = [
                (self._path_prefix + "/" if self._path_prefix else "") + "".join(x)
                for x in product("0123456789abcdef", repeat=self._hash_prefix_size)
            ]
            random.shuffle(prefixes)
        else:
            prefixes = [self._path_prefix]
        return prefixes

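    # Clean a single bucket: enumerate objects prefix by prefix, look up which
    # digests are missing from the index, delete those objects, and return the
    # number of deletions issued. Checks for a requested stop between pages.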
    def cleanup_bucket(self, bucket: str) -> int:
        LOGGER.info("Cleaning up bucket.", tags=dict(bucket=bucket))

        deleted_count = 0
        if check_bucket_versioning(self._s3, bucket):
            enumeration = self.enumerate_versioned_bucket
        else:
            enumeration = self.enumerate_unversioned_bucket

        for prefix in self.generate_prefixes():
            deleted_count_for_prefix = 0
            for page in enumeration(bucket, prefix):
                if self._stop_requested.is_set():
                    LOGGER.info("Janitor stop requested.")
                    return deleted_count
                # Create a mapping between a digest as stored in S3 and a digest as stored in the index
                # by stripping off any prefix and removing all '/' used by hash_prefix_size
                digest_map = {obj.key: obj.key.replace(self._path_prefix, "").replace("/", "") for obj in page}
                publish_s3_object_metrics(page)
                missing_digests = self._index.get_missing_digests(set(digest_map.values()))
                missing_objects = [obj for obj in page if digest_map[obj.key] in missing_digests]
                if missing_objects:
                    self.delete_s3_entries(bucket, missing_objects)
                    deleted_count_for_prefix += len(missing_objects)
                if self._batch_sleep_interval:
                    self._stop_requested.wait(timeout=self._batch_sleep_interval)
            LOGGER.info(
                "Deleted blobs from bucket prefix.",
                tags=dict(digest_count=deleted_count_for_prefix, bucket=bucket, prefix=prefix),
            )
            deleted_count += deleted_count_for_prefix

        LOGGER.info("Deleted blobs total from bucket.", tags=dict(digest_count=deleted_count, bucket=bucket))
        return deleted_count

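    # Context-manager support: entering starts the background ContextWorker,
    # exiting stops it.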
    def __enter__(self) -> "S3Janitor":
        self.start()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        self.stop()

    def start(self) -> None:
        self._worker.start()

    def stop(self, *args: Any, **kwargs: Any) -> None:
        self._worker.stop()

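    # Main worker loop: repeatedly clean all matching buckets, then sleep for
    # the configured interval, until a stop is requested. Exceptions are logged
    # and the loop carries on.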
    def run(self, stop_requested: Event) -> None:
        random.seed()
        while not stop_requested.is_set():
            try:
                bucket_names = self.get_buckets()

                # Shuffle the bucket names to reduce the likelihood of two janitors
                # concurrently cleaning the same bucket.
                random.shuffle(bucket_names)

                for bucket in bucket_names:
                    self.cleanup_bucket(bucket)
            except Exception:
                LOGGER.exception("Exception while cleaning up S3 storage with janitor")
                continue

            stop_requested.wait(timeout=self._sleep_interval)
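
# Usage sketch (illustrative only, not part of this module): the janitor is
# designed to be driven as a context manager, with the ContextWorker running
# run() in the background until shutdown. The names s3_config, index and
# shutdown_event below are placeholders; how S3Config and IndexLookup instances
# are built depends on the deployment configuration.
#
#     with S3Janitor(s3_config, index) as janitor:
#         shutdown_event.wait()  # block until the service is asked to stop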