Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cleanup/janitor/s3.py: 83.33% (108 statements)
# Copyright (C) 2024 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import random
import re
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from itertools import product
from threading import Event
from typing import Any, Iterator

from buildgrid.server.cleanup.janitor.config import S3Config
from buildgrid.server.cleanup.janitor.index import IndexLookup
from buildgrid.server.cleanup.janitor.utils import check_bucket_versioning, get_s3_client
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.metrics_names import METRIC
from buildgrid.server.metrics_tags import tag_blob_age, tag_blob_size
from buildgrid.server.metrics_utils import publish_distribution_metric
from buildgrid.server.threading import ContextWorker

LOGGER = buildgrid_logger(__name__)


@dataclass(frozen=True)
class ListObjectResult:
    key: str
    version_id: str
    last_modified: datetime
    size: int


def publish_s3_object_metrics(s3_objects: set[ListObjectResult]) -> None:
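    """Publish blob age and size distribution metrics for each listed S3 object."""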
    for obj in s3_objects:
        age = datetime.now(tz=timezone.utc) - obj.last_modified
        age_in_ms = age / timedelta(milliseconds=1)
        age_range = tag_blob_age(age_in_ms)
        size_range = tag_blob_size(obj.size)
        publish_distribution_metric(
            METRIC.CLEANUP.JANITOR.BLOB_AGE, age_in_ms, objectAgeRange=age_range, objectSizeRange=size_range
        )
        publish_distribution_metric(
            METRIC.CLEANUP.JANITOR.BLOB_BYTES, obj.size, objectAgeRange=age_range, objectSizeRange=size_range
        )


class S3Janitor:

    def __init__(self, s3Config: S3Config, index: IndexLookup):
        self._bucket_regex = re.compile(s3Config.bucket_regex)
        self._index = index
        self._path_prefix = s3Config.path_prefix
        self._s3 = get_s3_client(s3Config)
        self._sleep_interval = s3Config.sleep_interval
        self._hash_prefix_size = s3Config.hash_prefix_size

        self._stop_requested = Event()
        self._worker = ContextWorker(target=self.run, name="Janitor", on_shutdown_requested=self._stop_requested.set)

    def enumerate_versioned_bucket(self, bucket: str, prefix: str) -> Iterator[set[ListObjectResult]]:
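        """Yield one set of ListObjectResults per page of object versions under the given prefix."""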
        pages = self._s3.get_paginator("list_object_versions").paginate(Bucket=bucket, Prefix=prefix)
        for page in pages:
            if "Versions" not in page:
                continue

            list_objects = {
                ListObjectResult(
                    key=item["Key"],
                    version_id=item["VersionId"],
                    last_modified=item["LastModified"],
                    size=item["Size"],
                )
                for item in page["Versions"]
            }
            yield list_objects

    def enumerate_unversioned_bucket(self, bucket: str, prefix: str) -> Iterator[set[ListObjectResult]]:
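        """Yield one set of ListObjectResults per page of objects under the given prefix; version IDs are left empty."""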
        pages = self._s3.get_paginator("list_objects").paginate(Bucket=bucket, Prefix=prefix)
        for page in pages:
            if "Contents" not in page:
                continue

            list_objects = {
                ListObjectResult(
                    key=item["Key"],
                    version_id="",
                    last_modified=item["LastModified"],
                    size=item["Size"],
                )
                for item in page["Contents"]
            }
            yield list_objects

    def delete_s3_entries(self, bucket: str, missing_objects: list[ListObjectResult]) -> list[str]:
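        """Bulk-delete the given objects (and object versions) from the bucket, returning the keys S3 reports as deleted."""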
        LOGGER.info("Deleting orphaned blobs from S3.", tags=dict(digest_count=len(missing_objects)))
        response = self._s3.delete_objects(
            Bucket=bucket,
            Delete={
                "Objects": [{"Key": obj.key, "VersionId": obj.version_id} for obj in missing_objects],
                "Quiet": False,
            },
        )
        return [
            deleted_object["Key"] for deleted_object in response.get("Deleted", []) if "Key" in deleted_object.keys()
        ]

    def get_buckets(self) -> list[str]:
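        """Return the names of all buckets whose names match the configured bucket regex."""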
        response = self._s3.list_buckets()
        return [
            bucket["Name"] for bucket in response["Buckets"] if self._bucket_regex.search(bucket["Name"]) is not None
        ]

    # Generate all the hash prefixes and shuffle them to reduce the likelihood of
    # two janitors cleaning the same hash prefix.
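    # For example, with hash_prefix_size=2 this yields the 256 hex prefixes "00" through "ff",
    # each prepended with the configured path prefix when one is set.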
    def generate_prefixes(self) -> list[str]:
        if self._hash_prefix_size:
            prefixes = [
                (self._path_prefix + "/" if self._path_prefix else "") + "".join(x)
                for x in product("0123456789abcdef", repeat=self._hash_prefix_size)
            ]
            random.shuffle(prefixes)
        else:
            prefixes = [self._path_prefix]
        return prefixes

    def cleanup_bucket(self, bucket: str) -> int:
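        """Delete every blob in the bucket that has no matching entry in the index, returning the number deleted."""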
        LOGGER.info("Cleaning up bucket.", tags=dict(bucket=bucket))

        deleted_count = 0
        if check_bucket_versioning(self._s3, bucket):
            enumeration = self.enumerate_versioned_bucket
        else:
            enumeration = self.enumerate_unversioned_bucket

        for prefix in self.generate_prefixes():
            deleted_count_for_prefix = 0
            for page in enumeration(bucket, prefix):
                # Create a mapping between a digest as stored in S3 and a digest as stored in the index
                # by stripping off any prefix and removing all '/' used by hash_prefix_size
                digest_map = {obj.key: obj.key.replace(self._path_prefix, "").replace("/", "") for obj in page}
                publish_s3_object_metrics(page)
                missing_digests = self._index.get_missing_digests(set(digest_map.values()))
                missing_objects = [obj for obj in page if digest_map[obj.key] in missing_digests]
                if missing_objects:
                    self.delete_s3_entries(bucket, missing_objects)
                    deleted_count_for_prefix += len(missing_objects)
            LOGGER.info(
                "Deleted blobs from bucket prefix.",
                tags=dict(digest_count=deleted_count_for_prefix, bucket=bucket, prefix=prefix),
            )
            deleted_count += deleted_count_for_prefix

        LOGGER.info("Deleted blobs total from bucket.", tags=dict(digest_count=deleted_count, bucket=bucket))
        return deleted_count

    def __enter__(self) -> "S3Janitor":
        self.start()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        self.stop()

    def start(self) -> None:
        self._worker.start()

    def stop(self, *args: Any, **kwargs: Any) -> None:
        self._worker.stop()

    def run(self, stop_requested: Event) -> None:
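        """Run the janitor loop: clean every matching bucket, then sleep, until shutdown is requested."""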
        random.seed()
        while not stop_requested.is_set():
            try:
                bucket_names = self.get_buckets()

                # Shuffle the bucket names to reduce the likelihood of two janitors
                # concurrently cleaning the same bucket.
                random.shuffle(bucket_names)

                for bucket in bucket_names:
                    self.cleanup_bucket(bucket)
            except Exception:
                LOGGER.exception("Exception while cleaning up S3 storage with janitor")
                continue

            stop_requested.wait(timeout=self._sleep_interval)
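

# A minimal usage sketch (illustrative only): the S3Config field values below are
# hypothetical, and in a real deployment the janitor is constructed from the cleanup
# service configuration rather than by hand.
#
#     config = S3Config(
#         bucket_regex="^cas-.*",
#         path_prefix="cas",
#         hash_prefix_size=2,
#         sleep_interval=300,
#     )
#     janitor = S3Janitor(config, IndexLookup(...))
#     with janitor:
#         ...  # the janitor cleans matching buckets on its background worker until the context exits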