# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
S3Storage
==================

A storage provider that stores data in an Amazon S3 bucket.
"""
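
# The ``bucket`` argument may be a Python format string which is expanded
# per-blob with ``digest`` bound to the blob's hash (see ``_get_bucket_name``),
# typically to shard blobs across buckets. A minimal instantiation sketch;
# the template and endpoint URL below are illustrative assumptions, not
# defaults:
#
#     storage = S3Storage('cas-{digest[0]}{digest[1]}',
#                         endpoint_url='http://localhost:9000')
#
# Any extra keyword arguments are passed straight through to
# ``boto3.resource('s3', ...)``.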

import io
import logging

import boto3
from botocore.exceptions import ClientError

from buildgrid._enums import CleanupFailure, MetricRecordDomain
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid.server.monitoring import get_monitoring_bus
from buildgrid.server.metrics_names import S3_DELETE_ERROR_CHECK_METRIC_NAME
from buildgrid.server.metrics_utils import DurationMetric
from buildgrid.utils import Failure
from .storage_abc import StorageABC


class S3Storage(StorageABC):

    def __init__(self, bucket, page_size=1000, **kwargs):
        self.__logger = logging.getLogger(__name__)

        self._bucket_template = bucket
        self._page_size = page_size
        self._s3 = boto3.resource('s3', **kwargs)

        self._instance_name = None

    def _get_bucket_name(self, digest):
        try:
            return self._bucket_template.format(digest=digest)
        except IndexError:
            self.__logger.error(f"Could not calculate bucket name for digest=[{digest}]. This "
                                "is either a misconfiguration in the BuildGrid S3 bucket "
                                "configuration, or a badly formed request.")
            raise
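
    # For example, with the illustrative template 'cas-{digest[0]}{digest[1]}'
    # from above, a hash beginning 'ab...' maps to the bucket 'cas-ab'.
    # '{digest[0]}' indexes into the hash string, so a template that indexes
    # past the end of a malformed, too-short hash raises the IndexError
    # handled here.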

    def _construct_key(self, digest):
        return digest.hash + '_' + str(digest.size_bytes)

    def _deconstruct_key(self, key):
        parts = key.split('_')
        size_bytes = int(parts[-1])
        # This isn't as simple as just "the first part of the split" because
        # the hash part of the key itself might contain an underscore.
        hash = '_'.join(parts[0:-1])
        return hash, size_bytes
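
    # Round-trip example: a Digest with hash='ab_cd' and size_bytes=42 is
    # stored under the key 'ab_cd_42', and _deconstruct_key('ab_cd_42')
    # returns ('ab_cd', 42).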

    def _multi_delete_blobs(self, bucket_name, digests):
        bucket = self._s3.Bucket(bucket_name)
        response = bucket.delete_objects(Delete={'Objects': digests})
        return_failed = []
        # S3's DeleteObjects call reports per-key failures under 'Errors';
        # each entry is a dict carrying at least 'Key', 'Code' and 'Message'.
        failed_deletions = response.get('Errors', [])
        with DurationMetric(S3_DELETE_ERROR_CHECK_METRIC_NAME,
                            MetricRecordDomain.CAS,
                            self._instance_name,
                            instanced=True):
            for failed_key in failed_deletions:
                hash, size_bytes = self._deconstruct_key(failed_key['Key'])
                if self.has_blob(Digest(hash=hash, size_bytes=size_bytes)):
                    fail_type = CleanupFailure.FAILURE
                else:
                    fail_type = CleanupFailure.MISSING
                return_failed.append(Failure(hash, fail_type))
        return return_failed

    def has_blob(self, digest):
        self.__logger.debug(f"Checking for blob: [{digest}]")
        try:
            self._s3.Object(self._get_bucket_name(digest.hash),
                            self._construct_key(digest)).load()
        except ClientError as e:
            # Missing objects surface as 404/NoSuchKey; anything else is a
            # genuine error and is re-raised.
            if e.response['Error']['Code'] not in ['404', 'NoSuchKey']:
                raise
            return False
        return True

    def get_blob(self, digest):
        self.__logger.debug(f"Getting blob: [{digest}]")
        try:
            obj = self._s3.Object(self._get_bucket_name(digest.hash),
                                  self._construct_key(digest))
            return io.BytesIO(obj.get()['Body'].read())
        except ClientError as e:
            if e.response['Error']['Code'] not in ['404', 'NoSuchKey']:
                raise
            return None

    def delete_blob(self, digest):
        self.__logger.debug(f"Deleting blob: [{digest}]")
        try:
            self._s3.Object(self._get_bucket_name(digest.hash),
                            self._construct_key(digest)).delete()
        except ClientError as e:
            if e.response['Error']['Code'] not in ['404', 'NoSuchKey']:
                raise

    def bulk_delete(self, digests):
        self.__logger.debug(f"Deleting blobs: [{digests}]")
        buckets_to_digest_lists = {}
        failed_deletions = []
        for digest in digests:
            bucket = self._get_bucket_name(digest.hash)
            if bucket in buckets_to_digest_lists:
                buckets_to_digest_lists[bucket].append({'Key': self._construct_key(digest)})
            else:
                buckets_to_digest_lists[bucket] = [{'Key': self._construct_key(digest)}]
            if len(buckets_to_digest_lists[bucket]) >= self._page_size:
                # Hit the page limit for this bucket; delete the items
                # gathered so far and start a fresh list for it.
                failed_deletions += self._multi_delete_blobs(bucket,
                                                             buckets_to_digest_lists.pop(bucket))
        # Flush the remaining partial pages, one request per bucket.
        for bucket in buckets_to_digest_lists:
            failed_deletions += self._multi_delete_blobs(bucket, buckets_to_digest_lists[bucket])
        return failed_deletions
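
    # Usage sketch (``storage`` and the digests are illustrative names):
    #
    #     failures = storage.bulk_delete([digest_a, digest_b])
    #
    # Each returned Failure pairs a blob's hash with a CleanupFailure value:
    # MISSING when the blob was already absent, FAILURE otherwise.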

    def begin_write(self, _digest):
        # TODO use multipart API for large blobs?
        return io.BytesIO()

    def commit_write(self, digest, write_session):
        self.__logger.debug(f"Writing blob: [{digest}]")
        write_session.seek(0)
        self._s3.Bucket(self._get_bucket_name(digest.hash)) \
            .upload_fileobj(write_session,
                            self._construct_key(digest))
        write_session.close()
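
    # Typical write flow, as a sketch (``storage``, ``digest`` and ``data``
    # are illustrative names):
    #
    #     session = storage.begin_write(digest)
    #     session.write(data)
    #     storage.commit_write(digest, session)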

    def is_cleanup_enabled(self):
        return True