Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cleanup/janitor/s3.py: 83.33%

108 statements  

coverage.py v7.4.1, created at 2025-02-11 15:07 +0000

# Copyright (C) 2024 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import random
import re
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from itertools import product
from threading import Event
from typing import Any, Iterator

from buildgrid.server.cleanup.janitor.config import S3Config
from buildgrid.server.cleanup.janitor.index import IndexLookup
from buildgrid.server.cleanup.janitor.utils import check_bucket_versioning, get_s3_client
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.metrics_names import METRIC
from buildgrid.server.metrics_tags import tag_blob_age, tag_blob_size
from buildgrid.server.metrics_utils import publish_distribution_metric
from buildgrid.server.threading import ContextWorker

LOGGER = buildgrid_logger(__name__)


@dataclass(frozen=True)
class ListObjectResult:
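    """Metadata for a single S3 object (or object version) returned by the list paginators."""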

    key: str
    version_id: str
    last_modified: datetime
    size: int


def publish_s3_object_metrics(s3_objects: set[ListObjectResult]) -> None:
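    """Publish age and size distribution metrics for each listed S3 object."""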

    for obj in s3_objects:
        age = datetime.now(tz=timezone.utc) - obj.last_modified
        age_in_ms = age / timedelta(milliseconds=1)
        age_range = tag_blob_age(age_in_ms)
        size_range = tag_blob_size(obj.size)
        publish_distribution_metric(
            METRIC.CLEANUP.JANITOR.BLOB_AGE, age_in_ms, objectAgeRange=age_range, objectSizeRange=size_range
        )
        publish_distribution_metric(
            METRIC.CLEANUP.JANITOR.BLOB_BYTES, obj.size, objectAgeRange=age_range, objectSizeRange=size_range
        )


class S3Janitor:
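    """Background worker that scans S3 buckets and deletes blobs no longer referenced in the index."""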

    def __init__(self, s3Config: S3Config, index: IndexLookup):
        self._bucket_regex = re.compile(s3Config.bucket_regex)
        self._index = index
        self._path_prefix = s3Config.path_prefix
        self._s3 = get_s3_client(s3Config)
        self._sleep_interval = s3Config.sleep_interval
        self._hash_prefix_size = s3Config.hash_prefix_size

        self._stop_requested = Event()
        self._worker = ContextWorker(target=self.run, name="Janitor", on_shutdown_requested=self._stop_requested.set)

    def enumerate_versioned_bucket(self, bucket: str, prefix: str) -> Iterator[set[ListObjectResult]]:
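        """Yield one set of ListObjectResult per page of object versions under the given prefix."""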

        pages = self._s3.get_paginator("list_object_versions").paginate(Bucket=bucket, Prefix=prefix)
        for page in pages:
            if "Versions" not in page:
                continue

            list_objects = {
                ListObjectResult(
                    key=item["Key"],
                    version_id=item["VersionId"],
                    last_modified=item["LastModified"],
                    size=item["Size"],
                )
                for item in page["Versions"]
            }
            yield list_objects

    def enumerate_unversioned_bucket(self, bucket: str, prefix: str) -> Iterator[set[ListObjectResult]]:
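        """Yield one set of ListObjectResult per page of objects under the given prefix, with empty version IDs."""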

        pages = self._s3.get_paginator("list_objects").paginate(Bucket=bucket, Prefix=prefix)
        for page in pages:
            if "Contents" not in page:
                continue

            list_objects = {
                ListObjectResult(
                    key=item["Key"],
                    version_id="",
                    last_modified=item["LastModified"],
                    size=item["Size"],
                )
                for item in page["Contents"]
            }
            yield list_objects

    def delete_s3_entries(self, bucket: str, missing_objects: list[ListObjectResult]) -> list[str]:
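        """Bulk-delete the given objects from the bucket and return the keys that S3 reports as deleted."""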

        LOGGER.info("Deleting orphaned blobs from S3.", tags=dict(digest_count=len(missing_objects)))
        response = self._s3.delete_objects(
            Bucket=bucket,
            Delete={
                "Objects": [{"Key": obj.key, "VersionId": obj.version_id} for obj in missing_objects],
                "Quiet": False,
            },
        )
        return [
            deleted_object["Key"] for deleted_object in response.get("Deleted", []) if "Key" in deleted_object.keys()
        ]

    def get_buckets(self) -> list[str]:
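        """Return the names of all buckets matching the configured bucket regex."""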

        response = self._s3.list_buckets()
        return [
            bucket["Name"] for bucket in response["Buckets"] if self._bucket_regex.search(bucket["Name"]) is not None
        ]

    # Generate all the hash prefixes and shuffle them to reduce the likelihood of
    # two janitors cleaning the same hash prefix
    def generate_prefixes(self) -> list[str]:
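        """Return the list of key prefixes to scan, shuffled when hash prefixes are in use."""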

        if self._hash_prefix_size:
            prefixes = [
                (self._path_prefix + "/" if self._path_prefix else "") + "".join(x)
                for x in product("0123456789abcdef", repeat=self._hash_prefix_size)
            ]
            random.shuffle(prefixes)
        else:
            prefixes = [self._path_prefix]
        return prefixes

    def cleanup_bucket(self, bucket: str) -> int:
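        """Scan a single bucket prefix by prefix, delete blobs missing from the index, and return the deleted count."""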

        LOGGER.info("Cleaning up bucket.", tags=dict(bucket=bucket))

        deleted_count = 0
        if check_bucket_versioning(self._s3, bucket):
            enumeration = self.enumerate_versioned_bucket
        else:
            enumeration = self.enumerate_unversioned_bucket

        for prefix in self.generate_prefixes():
            deleted_count_for_prefix = 0
            for page in enumeration(bucket, prefix):
                # Create a mapping between a digest as stored in S3 and a digest as stored in the index
                # by stripping off any prefix and removing all '/' used by hash_prefix_size
                digest_map = {obj.key: obj.key.replace(self._path_prefix, "").replace("/", "") for obj in page}
                publish_s3_object_metrics(page)
                missing_digests = self._index.get_missing_digests(set(digest_map.values()))
                missing_objects = [obj for obj in page if digest_map[obj.key] in missing_digests]
                if missing_objects:
                    self.delete_s3_entries(bucket, missing_objects)
                    deleted_count_for_prefix += len(missing_objects)
            LOGGER.info(
                "Deleted blobs from bucket prefix.",
                tags=dict(digest_count=deleted_count_for_prefix, bucket=bucket, prefix=prefix),
            )
            deleted_count += deleted_count_for_prefix

        LOGGER.info("Deleted blobs total from bucket.", tags=dict(digest_count=deleted_count, bucket=bucket))
        return deleted_count

    def __enter__(self) -> "S3Janitor":
        self.start()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        self.stop()

    def start(self) -> None:
        self._worker.start()

    def stop(self, *args: Any, **kwargs: Any) -> None:
        self._worker.stop()

    def run(self, stop_requested: Event) -> None:
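        """Main janitor loop: repeatedly clean up all matching buckets until shutdown is requested."""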

        random.seed()
        while not stop_requested.is_set():
            try:
                bucket_names = self.get_buckets()

                # Shuffle the bucket names to reduce the likelihood of two janitors
                # concurrently cleaning the same bucket.
                random.shuffle(bucket_names)

                for bucket in bucket_names:
                    self.cleanup_bucket(bucket)
            except Exception:
                LOGGER.exception("Exception while cleaning up S3 storage with janitor")
                continue

            stop_requested.wait(timeout=self._sleep_interval)