Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cleanup/janitor/s3.py: 84.81% (79 statements)
coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
# Copyright (C) 2024 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import random
import re
from itertools import product
from threading import Event
from typing import Any, Iterator, List, Set, Tuple

from buildgrid.server.cleanup.janitor.config import JanitorConfig
from buildgrid.server.cleanup.janitor.index import IndexLookup
from buildgrid.server.cleanup.janitor.utils import check_bucket_versioning, get_s3_client
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.threading import ContextWorker

LOGGER = buildgrid_logger(__name__)


class S3Janitor:

    def __init__(self, config: JanitorConfig, index: IndexLookup):
        self._bucket_regex = re.compile(config.s3.bucket_regex)
        self._index = index
        self._path_prefix = config.s3.path_prefix
        self._s3 = get_s3_client(config.s3)
        self._sleep_interval = config.sleep_interval
        self._hash_prefix_size = config.s3.hash_prefix_size

        self._stop_requested = Event()
        self._worker = ContextWorker(target=self.run, name="Janitor", on_shutdown_requested=self._stop_requested.set)

    def enumerate_versioned_bucket(self, bucket: str, prefix: str) -> Iterator[Set[Tuple[str, str]]]:
        pages = self._s3.get_paginator("list_object_versions").paginate(Bucket=bucket, Prefix=prefix)
        for page in pages:
            if "Versions" not in page:
                continue

            digest_version_pairs = {(item["Key"], item["VersionId"]) for item in page["Versions"]}
            yield digest_version_pairs

    def enumerate_unversioned_bucket(self, bucket: str, prefix: str) -> Iterator[Set[Tuple[str, str]]]:
        pages = self._s3.get_paginator("list_objects").paginate(Bucket=bucket, Prefix=prefix)
        for page in pages:
            if "Contents" not in page:
                continue

            # Unversioned buckets have no VersionId, so use an empty string as a placeholder.
            digest_version_pairs = {(item["Key"], "") for item in page["Contents"]}
            yield digest_version_pairs

    def delete_s3_entries(self, bucket: str, digest_versions: Set[Tuple[str, str]]) -> List[str]:
        LOGGER.info("Deleting orphaned blobs from S3.", tags=dict(digest_count=len(digest_versions)))
        response = self._s3.delete_objects(
            Bucket=bucket,
            Delete={
                "Objects": [{"Key": digest, "VersionId": version} for digest, version in digest_versions],
                "Quiet": False,
            },
        )
        return [key["Key"] for key in response.get("Deleted", [])]

    def get_buckets(self) -> List[str]:
        response = self._s3.list_buckets()
        return [
            bucket["Name"] for bucket in response["Buckets"] if self._bucket_regex.search(bucket["Name"]) is not None
        ]

    # Generate all the hash prefixes and shuffle them to reduce the likelihood of
    # two janitors cleaning the same hash prefix.
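    # For example, with path_prefix="cas" and hash_prefix_size=1 this yields the 16
    # prefixes "cas/0" through "cas/f" in shuffled order; with no hash_prefix_size
    # configured it yields just ["cas"].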
    def generate_prefixes(self) -> List[str]:
        if self._hash_prefix_size:
            prefixes = [
                (self._path_prefix + "/" if self._path_prefix else "") + "".join(x)
                for x in product("0123456789abcdef", repeat=self._hash_prefix_size)
            ]
            random.shuffle(prefixes)
        else:
            prefixes = [self._path_prefix]
        return prefixes

    def cleanup_bucket(self, bucket: str) -> int:
        LOGGER.info("Cleaning up bucket.", tags=dict(bucket=bucket))

        deleted_count = 0
        if check_bucket_versioning(self._s3, bucket):
            enumeration = self.enumerate_versioned_bucket
        else:
            enumeration = self.enumerate_unversioned_bucket

        for prefix in self.generate_prefixes():
            deleted_count_for_prefix = 0
            for page in enumeration(bucket, prefix):
                # Create a mapping between a digest as stored in S3 and a digest as stored in the index
                # by stripping off any prefix and removing all '/' used by hash_prefix_size.
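                # (For example, with path_prefix="cas" this transformation maps the S3 key
                # "cas/ab/cdef_9" to the index digest "abcdef_9"; the exact key layout
                # depends on the storage configuration.)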
                digest_map = {digest: digest.replace(self._path_prefix, "").replace("/", "") for digest, _ in page}

                # Query the index once per page rather than once per object.
                missing_blobs = self._index.get_missing_digests(set(digest_map.values()))
                missing_digest_versions = {
                    digest_version for digest_version in page if digest_map[digest_version[0]] in missing_blobs
                }
                if missing_digest_versions:
                    self.delete_s3_entries(bucket, missing_digest_versions)
                    deleted_count_for_prefix += len(missing_digest_versions)
            LOGGER.info(
                "Deleted blobs from bucket prefix.",
                tags=dict(digest_count=deleted_count_for_prefix, bucket=bucket, prefix=prefix),
            )
            deleted_count += deleted_count_for_prefix

        LOGGER.info("Deleted blobs total from bucket.", tags=dict(digest_count=deleted_count, bucket=bucket))
        return deleted_count

    def start(self) -> None:
        self._worker.start()
        self._worker.wait()

    def stop(self, *args: Any, **kwargs: Any) -> None:
        self._worker.stop()

    def run(self, stop_requested: Event) -> None:
        random.seed()
        while not stop_requested.is_set():
            bucket_names = self.get_buckets()

            # Shuffle the bucket names to reduce the likelihood of two janitors
            # concurrently cleaning the same bucket.
            random.shuffle(bucket_names)

            for bucket in bucket_names:
                self.cleanup_bucket(bucket)

            stop_requested.wait(timeout=self._sleep_interval)
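

# Usage sketch: this module does not show how the janitor is wired up. A caller that
# already has a JanitorConfig and an IndexLookup (the types imported above) would run
# it roughly as follows; this is illustrative only, not BuildGrid's actual entry point.
#
#     janitor = S3Janitor(config, index)
#     janitor.start()  # starts the worker thread and blocks until shutdown is requested
#     ...
#     janitor.stop()   # e.g. from a signal handler in another thread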