Coverage for /builds/BuildGrid/buildgrid/buildgrid/cleanup/janitor/s3.py: 84.81%

79 statements  

coverage.py v7.4.1, created at 2024-06-11 15:37 +0000

# Copyright (C) 2024 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import random
import re
from itertools import product
from threading import Event
from typing import Any, Iterator, List, Set, Tuple

from buildgrid.cleanup.janitor.config import JanitorConfig
from buildgrid.cleanup.janitor.index import IndexLookup
from buildgrid.cleanup.janitor.utils import check_bucket_versioning, get_s3_client
from buildgrid.server.threading import ContextWorker

LOGGER = logging.getLogger(__name__)


class S3Janitor:

    def __init__(self, config: JanitorConfig, index: IndexLookup):
        self._bucket_regex = re.compile(config.s3.bucket_regex)
        self._index = index
        self._path_prefix = config.s3.path_prefix
        self._s3 = get_s3_client(config.s3)
        self._sleep_interval = config.sleep_interval
        self._hash_prefix_size = config.s3.hash_prefix_size

        self._stop_requested = Event()
        self._worker = ContextWorker(target=self.run, name="Janitor", on_shutdown_requested=self._stop_requested.set)

    def enumerate_versioned_bucket(self, bucket: str, prefix: str) -> Iterator[Set[Tuple[str, str]]]:
        pages = self._s3.get_paginator("list_object_versions").paginate(Bucket=bucket, Prefix=prefix)
        for page in pages:
            if "Versions" not in page:
                continue

            digest_version_pairs = {(item["Key"], item["VersionId"]) for item in page["Versions"]}
            yield digest_version_pairs

    def enumerate_unversioned_bucket(self, bucket: str, prefix: str) -> Iterator[Set[Tuple[str, str]]]:
        pages = self._s3.get_paginator("list_objects").paginate(Bucket=bucket, Prefix=prefix)
        for page in pages:
            if "Contents" not in page:
                continue

            digest_version_pairs = {(item["Key"], "") for item in page["Contents"]}
            yield digest_version_pairs

    def delete_s3_entries(self, bucket: str, digest_versions: Set[Tuple[str, str]]) -> List[str]:
        LOGGER.info(f"Deleting {len(digest_versions)} orphaned blobs from S3")
        response = self._s3.delete_objects(
            Bucket=bucket,
            Delete={
                "Objects": [{"Key": digest, "VersionId": version} for digest, version in digest_versions],
                "Quiet": False,
            },
        )
        return [key["Key"] for key in response.get("Deleted", [])]

    def get_buckets(self) -> List[str]:
        response = self._s3.list_buckets()
        return [
            bucket["Name"] for bucket in response["Buckets"] if self._bucket_regex.search(bucket["Name"]) is not None
        ]

    # Generate all the hash prefixes and shuffle them to reduce the likelihood of
    # two janitors cleaning the same hash prefix
    def generate_prefixes(self) -> List[str]:
        if self._hash_prefix_size:
            prefixes = [
                (self._path_prefix + "/" if self._path_prefix else "") + "".join(x)
                for x in product("0123456789abcdef", repeat=self._hash_prefix_size)
            ]
            random.shuffle(prefixes)
        else:
            prefixes = [self._path_prefix]
        return prefixes

    def cleanup_bucket(self, bucket: str) -> int:
        LOGGER.info(f"Cleaning up bucket: '{bucket}'")

        deleted_count = 0
        if check_bucket_versioning(self._s3, bucket):
            enumeration = self.enumerate_versioned_bucket
        else:
            enumeration = self.enumerate_unversioned_bucket

        for prefix in self.generate_prefixes():
            deleted_count_for_prefix = 0
            for page in enumeration(bucket, prefix):
                # Create a mapping between a digest as stored in S3 and a digest as stored in the index
                # by stripping off any prefix and removing all '/' used by hash_prefix_size
                digest_map = {digest: digest.replace(self._path_prefix, "").replace("/", "") for digest, _ in page}

                missing_digest_versions = set(
                    digest_version
                    for digest_version in page
                    if digest_map[digest_version[0]] in self._index.get_missing_digests(set(digest_map.values()))
                )
                if missing_digest_versions:
                    self.delete_s3_entries(bucket, missing_digest_versions)
                    deleted_count_for_prefix += len(missing_digest_versions)
            LOGGER.info(f"Deleted {deleted_count_for_prefix} blobs from '{bucket}/{prefix}'")
            deleted_count += deleted_count_for_prefix

        LOGGER.info(f"Deleted {deleted_count} blobs total from bucket '{bucket}'")
        return deleted_count

    def start(self) -> None:
        self._worker.start()
        self._worker.wait()

    def stop(self, *args: Any, **kwargs: Any) -> None:
        self._worker.stop()

    def run(self, stop_requested: Event) -> None:
        random.seed()
        while not stop_requested.is_set():
            bucket_names = self.get_buckets()

            # Shuffle the bucket names to reduce the likelihood of two janitors
            # concurrently cleaning the same bucket.
            random.shuffle(bucket_names)

            for bucket in bucket_names:
                self.cleanup_bucket(bucket)

            stop_requested.wait(timeout=self._sleep_interval)
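
As a worked example of the prefix handling above: with hash_prefix_size set to 2 and a path_prefix of, say, "cas", generate_prefixes() yields the 256 prefixes "cas/00" through "cas/ff" in shuffled order, and cleanup_bucket() maps each listed key back to an index digest by stripping the path prefix and every '/'.

The sketch below shows one way the class might be driven. The run_s3_janitor wrapper and the way the JanitorConfig and IndexLookup instances are obtained are illustrative assumptions, not part of this module.

# Illustrative usage sketch (assumed wiring): build the janitor from a parsed
# JanitorConfig and an IndexLookup, then let it run until shutdown is requested.
from buildgrid.cleanup.janitor.config import JanitorConfig
from buildgrid.cleanup.janitor.index import IndexLookup
from buildgrid.cleanup.janitor.s3 import S3Janitor


def run_s3_janitor(config: JanitorConfig, index: IndexLookup) -> None:
    # start() launches the underlying ContextWorker (whose target is run()) and then
    # waits on it; stop(), e.g. from a signal handler, shuts the worker down and ends
    # the cleanup loop.
    janitor = S3Janitor(config, index)
    try:
        janitor.start()
    finally:
        janitor.stop()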