Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cleanup/janitor/s3.py: 84.81%

79 statements  

coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

# Copyright (C) 2024 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import random
import re
from itertools import product
from threading import Event
from typing import Any, Iterator, List, Set, Tuple

from buildgrid.server.cleanup.janitor.config import JanitorConfig
from buildgrid.server.cleanup.janitor.index import IndexLookup
from buildgrid.server.cleanup.janitor.utils import check_bucket_versioning, get_s3_client
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.threading import ContextWorker

LOGGER = buildgrid_logger(__name__)


class S3Janitor:
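    # Deletes orphaned blobs from S3: objects in buckets matching the configured
    # regex whose digests are no longer present in the index. Runs as a background
    # worker on a fixed sleep interval.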

    def __init__(self, config: JanitorConfig, index: IndexLookup):
        self._bucket_regex = re.compile(config.s3.bucket_regex)
        self._index = index
        self._path_prefix = config.s3.path_prefix
        self._s3 = get_s3_client(config.s3)
        self._sleep_interval = config.sleep_interval
        self._hash_prefix_size = config.s3.hash_prefix_size

        self._stop_requested = Event()
        self._worker = ContextWorker(target=self.run, name="Janitor", on_shutdown_requested=self._stop_requested.set)

    def enumerate_versioned_bucket(self, bucket: str, prefix: str) -> Iterator[Set[Tuple[str, str]]]:
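        # Page through the bucket's object versions under the given prefix and
        # yield one set of (key, version-id) pairs per page.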

        pages = self._s3.get_paginator("list_object_versions").paginate(Bucket=bucket, Prefix=prefix)
        for page in pages:
            if "Versions" not in page:
                continue

            digest_version_pairs = {(item["Key"], item["VersionId"]) for item in page["Versions"]}
            yield digest_version_pairs

    def enumerate_unversioned_bucket(self, bucket: str, prefix: str) -> Iterator[Set[Tuple[str, str]]]:
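        # The unversioned counterpart: yield one set of (key, "") pairs per page,
        # using an empty string where no version ID is available.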

        pages = self._s3.get_paginator("list_objects").paginate(Bucket=bucket, Prefix=prefix)
        for page in pages:
            if "Contents" not in page:
                continue

            digest_version_pairs = {(item["Key"], "") for item in page["Contents"]}
            yield digest_version_pairs

    def delete_s3_entries(self, bucket: str, digest_versions: Set[Tuple[str, str]]) -> List[str]:
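        # Issue a bulk delete for the given (key, version) pairs and return the
        # keys that S3 reports as deleted.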

        LOGGER.info("Deleting orphaned blobs from S3.", tags=dict(digest_count=len(digest_versions)))
        response = self._s3.delete_objects(
            Bucket=bucket,
            Delete={
                "Objects": [{"Key": digest, "VersionId": version} for digest, version in digest_versions],
                "Quiet": False,
            },
        )
        return [key["Key"] for key in response.get("Deleted", [])]

    def get_buckets(self) -> List[str]:
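        # List every bucket visible to the S3 client and keep only those whose
        # names match the configured bucket regex.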

        response = self._s3.list_buckets()
        return [
            bucket["Name"] for bucket in response["Buckets"] if self._bucket_regex.search(bucket["Name"]) is not None
        ]

    # Generate all the hash prefixes and shuffle them to reduce the likelihood of
    # two janitors cleaning the same hash prefix.
    def generate_prefixes(self) -> List[str]:
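        # For example, with a path_prefix of "cas" and a hash_prefix_size of 1
        # (example values), this yields "cas/0" through "cas/f" in a random order.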

        if self._hash_prefix_size:
            prefixes = [
                (self._path_prefix + "/" if self._path_prefix else "") + "".join(x)
                for x in product("0123456789abcdef", repeat=self._hash_prefix_size)
            ]
            random.shuffle(prefixes)
        else:
            prefixes = [self._path_prefix]
        return prefixes

    def cleanup_bucket(self, bucket: str) -> int:
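        # Enumerate the bucket prefix by prefix, ask the index which of the listed
        # digests it no longer knows about, and delete those objects from S3.
        # Returns the number of blobs removed.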

        LOGGER.info("Cleaning up bucket.", tags=dict(bucket=bucket))

        deleted_count = 0
        if check_bucket_versioning(self._s3, bucket):
            enumeration = self.enumerate_versioned_bucket
        else:
            enumeration = self.enumerate_unversioned_bucket

        for prefix in self.generate_prefixes():
            deleted_count_for_prefix = 0
            for page in enumeration(bucket, prefix):
                # Create a mapping between a digest as stored in S3 and a digest as stored in the index
                # by stripping off any prefix and removing all '/' used by hash_prefix_size.
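                # For example, with a path_prefix of "cas" (example value), a key
                # like "cas/ab/cdef0123" maps back to the index digest "abcdef0123".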

                digest_map = {digest: digest.replace(self._path_prefix, "").replace("/", "") for digest, _ in page}

                missing_digest_versions = set(
                    digest_version
                    for digest_version in page
                    if digest_map[digest_version[0]] in self._index.get_missing_digests(set(digest_map.values()))
                )
                if missing_digest_versions:
                    self.delete_s3_entries(bucket, missing_digest_versions)
                    deleted_count_for_prefix += len(missing_digest_versions)
            LOGGER.info(
                "Deleted blobs from bucket prefix.",
                tags=dict(digest_count=deleted_count_for_prefix, bucket=bucket, prefix=prefix),
            )
            deleted_count += deleted_count_for_prefix

        LOGGER.info("Deleted blobs total from bucket.", tags=dict(digest_count=deleted_count, bucket=bucket))
        return deleted_count

    def start(self) -> None:
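        # Start the janitor's worker thread and block until it finishes.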

        self._worker.start()
        self._worker.wait()

    def stop(self, *args: Any, **kwargs: Any) -> None:
        self._worker.stop()

    def run(self, stop_requested: Event) -> None:
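        # Worker loop: repeatedly clean every matching bucket, then wait for the
        # configured sleep interval or an earlier shutdown request.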

        random.seed()
        while not stop_requested.is_set():
            bucket_names = self.get_buckets()

            # Shuffle the bucket names to reduce the likelihood of two janitors
            # concurrently cleaning the same bucket.
            random.shuffle(bucket_names)

            for bucket in bucket_names:
                self.cleanup_bucket(bucket)

            stop_requested.wait(timeout=self._sleep_interval)