Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cleanup/janitor/s3.py: 81.74% (115 statements)
# Copyright (C) 2024 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import random
import re
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from itertools import product
from threading import Event
from typing import Any, Iterator

from buildgrid.server.cleanup.janitor.config import S3Config
from buildgrid.server.cleanup.janitor.index import IndexLookup
from buildgrid.server.cleanup.janitor.utils import check_bucket_versioning, get_s3_client
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.metrics_names import METRIC
from buildgrid.server.metrics_tags import tag_blob_age, tag_blob_size
from buildgrid.server.metrics_utils import publish_distribution_metric
from buildgrid.server.threading import ContextWorker

LOGGER = buildgrid_logger(__name__)


@dataclass(frozen=True)
class ListObjectResult:
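    """Metadata for a single S3 object (or object version) returned by bucket enumeration."""
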
    key: str
    version_id: str
    last_modified: datetime
    size: int


def publish_s3_object_metrics(s3_objects: set[ListObjectResult]) -> None:
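    """Publish blob age and size distribution metrics for each listed S3 object."""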
    for obj in s3_objects:
        age = datetime.now(tz=timezone.utc) - obj.last_modified
        age_in_ms = age / timedelta(milliseconds=1)
        age_range = tag_blob_age(age_in_ms)
        size_range = tag_blob_size(obj.size)
        publish_distribution_metric(
            METRIC.CLEANUP.JANITOR.BLOB_AGE, age_in_ms, objectAgeRange=age_range, objectSizeRange=size_range
        )
        publish_distribution_metric(
            METRIC.CLEANUP.JANITOR.BLOB_BYTES, obj.size, objectAgeRange=age_range, objectSizeRange=size_range
        )


class S3Janitor:
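    """Background worker which removes orphaned blobs from S3 buckets.

    The janitor enumerates every bucket matching the configured regex,
    compares each object's digest against the index, and deletes objects
    which the index no longer references.

    Minimal usage sketch, assuming an ``S3Config`` and ``IndexLookup`` have
    already been constructed elsewhere::

        with S3Janitor(s3_config, index_lookup) as janitor:
            ...  # cleanup runs on the janitor's worker thread until stopped
    """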

    def __init__(self, s3Config: S3Config, index: IndexLookup):
        self._bucket_regex = re.compile(s3Config.bucket_regex)
        self._index = index
        self._path_prefix = s3Config.path_prefix
        self._s3 = get_s3_client(s3Config)
        self._sleep_interval = s3Config.sleep_interval
        self._hash_prefix_size = s3Config.hash_prefix_size
        self._max_batch_size = s3Config.max_batch_size
        self._batch_sleep_interval = s3Config.batch_sleep_interval

        self._stop_requested = Event()
        self._worker = ContextWorker(target=self.run, name="Janitor", on_shutdown_requested=self._stop_requested.set)

    def enumerate_versioned_bucket(self, bucket: str, prefix: str) -> Iterator[set[ListObjectResult]]:
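        """Yield pages of object versions found under the given prefix in a versioned bucket."""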
        pages = self._s3.get_paginator("list_object_versions").paginate(
            Bucket=bucket, Prefix=prefix, PaginationConfig={"PageSize": self._max_batch_size}
        )
        for page in pages:
            if "Versions" not in page:
                continue

            list_objects = {
                ListObjectResult(
                    key=item["Key"],
                    version_id=item["VersionId"],
                    last_modified=item["LastModified"],
                    size=item["Size"],
                )
                for item in page["Versions"]
            }
            yield list_objects

    def enumerate_unversioned_bucket(self, bucket: str, prefix: str) -> Iterator[set[ListObjectResult]]:
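        """Yield pages of objects found under the given prefix in an unversioned bucket."""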
        pages = self._s3.get_paginator("list_objects").paginate(
            Bucket=bucket, Prefix=prefix, PaginationConfig={"PageSize": self._max_batch_size}
        )
        for page in pages:
            if "Contents" not in page:
                continue

            list_objects = {
                ListObjectResult(
                    key=item["Key"],
                    version_id="",
                    last_modified=item["LastModified"],
                    size=item["Size"],
                )
                for item in page["Contents"]
            }
            yield list_objects

    def delete_s3_entries(self, bucket: str, missing_objects: list[ListObjectResult]) -> list[str]:
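        """Delete the given objects from the bucket and return the keys that S3 reports as deleted."""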
        LOGGER.info("Deleting orphaned blobs from S3.", tags=dict(digest_count=len(missing_objects)))
        response = self._s3.delete_objects(
            Bucket=bucket,
            Delete={
                "Objects": [{"Key": obj.key, "VersionId": obj.version_id} for obj in missing_objects],
                "Quiet": False,
            },
        )
        return [
            deleted_object["Key"] for deleted_object in response.get("Deleted", []) if "Key" in deleted_object
        ]

    def get_buckets(self) -> list[str]:
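        """Return the names of all buckets matching the configured bucket regex."""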
        response = self._s3.list_buckets()
        return [
            bucket["Name"] for bucket in response["Buckets"] if self._bucket_regex.search(bucket["Name"]) is not None
        ]

    def generate_prefixes(self) -> list[str]:
        """Generate all the hash prefixes and shuffle them to reduce the likelihood
        of two janitors cleaning the same hash prefix."""
        if self._hash_prefix_size:
            prefixes = [
                (self._path_prefix + "/" if self._path_prefix else "") + "".join(x)
                for x in product("0123456789abcdef", repeat=self._hash_prefix_size)
            ]
            random.shuffle(prefixes)
        else:
            prefixes = [self._path_prefix]
        return prefixes

    def cleanup_bucket(self, bucket: str) -> int:
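        """Remove all orphaned blobs found under the bucket's prefixes and return the number of objects deleted."""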
        LOGGER.info("Cleaning up bucket.", tags=dict(bucket=bucket))

        deleted_count = 0
        if check_bucket_versioning(self._s3, bucket):
            enumeration = self.enumerate_versioned_bucket
        else:
            enumeration = self.enumerate_unversioned_bucket

        for prefix in self.generate_prefixes():
            deleted_count_for_prefix = 0
            for page in enumeration(bucket, prefix):
                if self._stop_requested.is_set():
                    LOGGER.info("Janitor stop requested.")
                    return deleted_count
                # Create a mapping between a digest as stored in S3 and a digest as stored in the index
                # by stripping off any prefix and removing all '/' used by hash_prefix_size
                digest_map = {obj.key: obj.key.replace(self._path_prefix, "").replace("/", "") for obj in page}
                publish_s3_object_metrics(page)
                missing_digests = self._index.get_missing_digests(set(digest_map.values()))
                missing_objects = [obj for obj in page if digest_map[obj.key] in missing_digests]
                if missing_objects:
                    self.delete_s3_entries(bucket, missing_objects)
                    deleted_count_for_prefix += len(missing_objects)
                if self._batch_sleep_interval:
                    self._stop_requested.wait(timeout=self._batch_sleep_interval)
            LOGGER.info(
                "Deleted blobs from bucket prefix.",
                tags=dict(digest_count=deleted_count_for_prefix, bucket=bucket, prefix=prefix),
            )
            deleted_count += deleted_count_for_prefix

        LOGGER.info("Deleted blobs total from bucket.", tags=dict(digest_count=deleted_count, bucket=bucket))
        return deleted_count

    def __enter__(self) -> "S3Janitor":
        self.start()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        self.stop()

    def start(self) -> None:
        self._worker.start()

    def stop(self, *args: Any, **kwargs: Any) -> None:
        self._worker.stop()

    def run(self, stop_requested: Event) -> None:
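        """Repeatedly clean all matching buckets, sleeping between passes, until a stop is requested."""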
        random.seed()
        while not stop_requested.is_set():
            try:
                bucket_names = self.get_buckets()

                # Shuffle the bucket names to reduce the likelihood of two janitors
                # concurrently cleaning the same bucket.
                random.shuffle(bucket_names)

                for bucket in bucket_names:
                    self.cleanup_bucket(bucket)
            except Exception:
                LOGGER.exception("Exception while cleaning up S3 storage with janitor")
                continue

            stop_requested.wait(timeout=self._sleep_interval)