Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/s3.py: 89.70% (165 statements)


# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#  <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
S3Storage
=========

A storage provider that stores data in an Amazon S3 bucket.
"""
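# Example configuration (an illustrative sketch, not taken from this module):
# ``bucket`` may be a plain bucket name or a template formatted against the
# digest hash, and any extra keyword arguments are passed straight through to
# ``boto3.client('s3', ...)``.
#
#     storage = S3Storage('cas-{digest[0]}{digest[1]}',           # hypothetical bucket template
#                         page_size=500,
#                         endpoint_url='https://s3.example.com')  # standard boto3 client option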

import io
import logging
from typing import Dict, List
from tempfile import TemporaryFile

import boto3
from botocore.exceptions import ClientError

from buildgrid._exceptions import StorageFullError
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid._protos.google.rpc import code_pb2
from buildgrid.server.metrics_names import S3_DELETE_ERROR_CHECK_METRIC_NAME
from buildgrid.server.metrics_utils import DurationMetric
from buildgrid.server.s3 import s3utils
from buildgrid.settings import HASH, MAX_IN_MEMORY_BLOB_SIZE_BYTES
from .storage_abc import StorageABC


class S3Storage(StorageABC):
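    """Storage provider backed by an Amazon S3 bucket.

    Args:
        bucket (str): The S3 bucket name, or a template for one. The template
            is formatted against the digest hash, which allows sharding blobs
            across buckets (e.g. ``'cas-{digest[0]}'``; the concrete template
            shown here is illustrative, not prescribed).
        page_size (int): Maximum number of keys sent in a single
            ``DeleteObjects`` request when bulk deleting. Defaults to 1000.
        **kwargs: Additional keyword arguments, passed directly to
            ``boto3.client('s3', ...)``.
    """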

    def __init__(self, bucket, page_size=1000, **kwargs):
        self.__logger = logging.getLogger(__name__)

        self._bucket_template = bucket
        self._page_size = page_size

        # Boto logs can be very verbose, restrict to WARNING
        for boto_logger_name in [
            'boto3', 'botocore',
            's3transfer', 'urllib3'
        ]:
            boto_logger = logging.getLogger(boto_logger_name)
            boto_logger.setLevel(max(boto_logger.level, logging.WARNING))

        self._s3 = boto3.client('s3', **kwargs)

        self._instance_name = None

    def _get_bucket_name(self, digest):
        try:
            return self._bucket_template.format(digest=digest)
        except IndexError:
            self.__logger.error(f"Could not calculate bucket name for digest=[{digest}]. This "
                                "is either a misconfiguration in the BuildGrid S3 bucket "
                                "configuration, or a badly formed request.")
            raise

    def _construct_key(self, digest):
        return digest.hash + '_' + str(digest.size_bytes)

    def _get_s3object(self, digest):
        return s3utils.S3Object(self._get_bucket_name(digest.hash), self._construct_key(digest))

    def _deconstruct_key(self, key):
        parts = key.split('_')
        size_bytes = int(parts[-1])
        # This isn't as simple as just "the first part of the split" because
        # the hash part of the key itself might contain an underscore.
        digest_hash = '_'.join(parts[0:-1])
        return digest_hash, size_bytes
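    # Key layout, for illustration (hypothetical digest values, not from this
    # module): a digest with hash 'abc123' and size_bytes 42 is stored under
    # the key 'abc123_42', and _deconstruct_key('abc123_42') returns
    # ('abc123', 42).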

    def _multi_delete_blobs(self, bucket_name, digests):
        response = self._s3.delete_objects(Bucket=bucket_name, Delete={'Objects': digests})
        return_failed = []
        failed_deletions = response.get('Errors', [])
        with DurationMetric(S3_DELETE_ERROR_CHECK_METRIC_NAME,
                            self._instance_name,
                            instanced=True):
            for failed_key in failed_deletions:
                digest_hash, size_bytes = self._deconstruct_key(failed_key['Key'])
                return_failed.append(f'{digest_hash}/{size_bytes}')
        return return_failed

    def has_blob(self, digest):
        self.__logger.debug(f"Checking for blob: [{digest}]")
        try:
            s3utils.head_object(self._s3, self._get_s3object(digest))
        except ClientError as e:
            if e.response['Error']['Code'] not in ['404', 'NoSuchKey']:
                raise
            return False
        return True

    def get_blob(self, digest):
        self.__logger.debug(f"Getting blob: [{digest}]")
        try:
            s3object = self._get_s3object(digest)

            if digest.size_bytes > MAX_IN_MEMORY_BLOB_SIZE_BYTES:
                # To avoid storing the whole file in memory, download to a
                # temporary file.
                ret = TemporaryFile()  # pylint: disable=consider-using-with
            else:
                # But, to maximize performance, keep blobs that are small
                # enough in-memory.
                ret = io.BytesIO()

            s3object.fileobj = ret
            s3utils.get_object(self._s3, s3object)
            ret.seek(0)
            return ret
        except ClientError as e:
            if e.response['Error']['Code'] not in ['404', 'NoSuchKey']:
                raise
            return None

    def delete_blob(self, digest):
        self.__logger.debug(f"Deleting blob: [{digest}]")
        try:
            self._s3.delete_object(Bucket=self._get_bucket_name(digest.hash),
                                   Key=self._construct_key(digest))
        except ClientError as e:
            if e.response['Error']['Code'] not in ['404', 'NoSuchKey']:
                raise

    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        self.__logger.debug(f"Deleting {len(digests)} digests from S3 storage: [{digests}]")
        buckets_to_digest_lists: Dict[str, List[Dict[str, str]]] = {}
        failed_deletions = []
        for digest in digests:
            bucket = self._get_bucket_name(digest.hash)
            if bucket in buckets_to_digest_lists:
                buckets_to_digest_lists[bucket].append({'Key': self._construct_key(digest)})
            else:
                buckets_to_digest_lists[bucket] = [{'Key': self._construct_key(digest)}]
            if len(buckets_to_digest_lists[bucket]) >= self._page_size:
                # delete items for this bucket, hit page limit
                failed_deletions += self._multi_delete_blobs(bucket,
                                                             buckets_to_digest_lists.pop(bucket))
        # flush remaining items
        for bucket, digest_list in buckets_to_digest_lists.items():
            failed_deletions += self._multi_delete_blobs(bucket, digest_list)
        return failed_deletions

    def begin_write(self, digest):
        if digest.size_bytes > MAX_IN_MEMORY_BLOB_SIZE_BYTES:
            # To avoid storing the whole file in memory, upload to a
            # temporary file.
            write_session = TemporaryFile()  # pylint: disable=consider-using-with
        else:
            # But, to maximize performance, keep blobs that are small
            # enough in-memory.
            write_session = io.BytesIO()
        return write_session

    def commit_write(self, digest, write_session):
        self.__logger.debug(f"Writing blob: [{digest}]")
        write_session.seek(0)
        try:
            s3object = self._get_s3object(digest)
            s3object.fileobj = write_session
            s3object.filesize = digest.size_bytes
            s3utils.put_object(self._s3, s3object)
        except ClientError as error:
            if error.response['Error']['Code'] == 'QuotaExceededException':
                raise StorageFullError("S3 Quota Exceeded.") from error
            raise error
        finally:
            write_session.close()
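    # Write protocol, for illustration (a sketch using hypothetical names):
    # callers obtain a session from begin_write(), write the blob data into
    # it, and hand it back to commit_write(), which uploads the data and
    # closes the session:
    #
    #     session = storage.begin_write(digest)
    #     session.write(data)
    #     storage.commit_write(digest, session)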

    def is_cleanup_enabled(self):
        return True

    def missing_blobs(self, digests):
        result = []
        s3objects = []
        for digest in digests:
            s3object = self._get_s3object(digest)
            s3objects.append(s3object)
        s3utils.head_objects(self._s3, s3objects)
        for digest, s3object in zip(digests, s3objects):
            if s3object.error is not None:
                result.append(digest)
        return result

    def bulk_update_blobs(self, blobs):
        s3object_status_list = []
        s3objects = []
        for digest, data in blobs:
            if len(data) != digest.size_bytes or HASH(data).hexdigest() != digest.hash:
                status = Status(
                    code=code_pb2.INVALID_ARGUMENT,
                    message="Data doesn't match hash",
                )
                s3object_status_list.append((None, status))
            else:
                write_session = self.begin_write(digest)
                write_session.write(data)
                write_session.seek(0)
                s3object = self._get_s3object(digest)
                s3object.fileobj = write_session
                s3object.filesize = digest.size_bytes
                s3objects.append(s3object)
                s3object_status_list.append((s3object, None))

        s3utils.put_objects(self._s3, s3objects)

        result = []
        for s3object, status in s3object_status_list:
            if status is not None:
                # Failed check before S3 object creation
                result.append(status)
            elif s3object.error is None:
                # PUT was successful
                result.append(Status(code=code_pb2.OK))
            else:
                result.append(Status(code=code_pb2.UNKNOWN, message=str(s3object.error)))
        return result

    def bulk_read_blobs(self, digests):
        s3objects = []
        blobmap = {}
        for digest in digests:
            s3object = self._get_s3object(digest)
            s3object.fileobj = io.BytesIO()
            s3objects.append(s3object)

        s3utils.get_objects(self._s3, s3objects)

        for digest, s3object in zip(digests, s3objects):
            if s3object.error is None:
                s3object.fileobj.seek(0)
                blobmap[digest.hash] = s3object.fileobj
            elif s3object.status_code != 404:
                raise s3object.error
        return blobmap