Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/s3.py: 89.51%

162 statements  

# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
S3Storage
==================

A storage provider that stores data in an Amazon S3 bucket.
"""

import io
import logging
from typing import Dict, List
from tempfile import TemporaryFile

import boto3
from botocore.exceptions import ClientError

from buildgrid._exceptions import StorageFullError
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid._protos.google.rpc import code_pb2
from buildgrid.server.metrics_names import S3_DELETE_ERROR_CHECK_METRIC_NAME
from buildgrid.server.metrics_utils import DurationMetric
from buildgrid.server.s3 import s3utils
from buildgrid.settings import HASH, MAX_READ_FROM_MEMORY_BLOB_SIZE_BYTES
from .storage_abc import StorageABC


class S3Storage(StorageABC):

    def __init__(self, bucket, page_size=1000, **kwargs):
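        """Instantiate a new S3Storage.

        ``bucket`` is either a literal bucket name or a format template that is
        expanded with the blob's digest hash (see the note above). ``page_size``
        caps how many objects are deleted per bucket in a single request during
        bulk deletion. Any remaining keyword arguments are forwarded to
        ``boto3.client('s3', ...)``.
        """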

        self.__logger = logging.getLogger(__name__)

        self._bucket_template = bucket
        self._page_size = page_size

        # Boto logs can be very verbose, restrict to WARNING
        for boto_logger_name in [
                'boto3', 'botocore',
                's3transfer', 'urllib3'
        ]:
            boto_logger = logging.getLogger(boto_logger_name)
            boto_logger.setLevel(max(boto_logger.level, logging.WARNING))

        self._s3 = boto3.client('s3', **kwargs)

        self._instance_name = None

    def _get_bucket_name(self, digest):
        try:
            return self._bucket_template.format(digest=digest)
        except IndexError:
            self.__logger.error(f"Could not calculate bucket name for digest=[{digest}]. This "
                                "is either a misconfiguration in the BuildGrid S3 bucket "
                                "configuration, or a badly formed request.")
            raise

    def _construct_key(self, digest):
        return digest.hash + '_' + str(digest.size_bytes)

    def _get_s3object(self, digest):
        return s3utils.S3Object(self._get_bucket_name(digest.hash), self._construct_key(digest))

    def _deconstruct_key(self, key):
        parts = key.split('_')
        size_bytes = int(parts[-1])
        # This isn't as simple as just "the first part of the split" because
        # the hash part of the key itself might contain an underscore.
        digest_hash = '_'.join(parts[0:-1])
        return digest_hash, size_bytes

    def _multi_delete_blobs(self, bucket_name, digests):
        response = self._s3.delete_objects(Bucket=bucket_name, Delete={'Objects': digests})
        return_failed = []
        failed_deletions = response.get('Errors', [])
        with DurationMetric(S3_DELETE_ERROR_CHECK_METRIC_NAME,
                            self._instance_name,
                            instanced=True):
            for failed_key in failed_deletions:
                digest_hash, size_bytes = self._deconstruct_key(failed_key['Key'])
                return_failed.append(f'{digest_hash}/{size_bytes}')
        return return_failed

    def has_blob(self, digest):
        self.__logger.debug(f"Checking for blob: [{digest}]")
        try:
            s3utils.head_object(self._s3, self._get_s3object(digest))
        except ClientError as e:
            if e.response['Error']['Code'] not in ['404', 'NoSuchKey']:
                raise
            return False
        return True

    def get_blob(self, digest):
        self.__logger.debug(f"Getting blob: [{digest}]")
        try:
            s3object = self._get_s3object(digest)

            if digest.size_bytes > MAX_READ_FROM_MEMORY_BLOB_SIZE_BYTES:
                # To avoid storing the whole file in memory, download to a
                # temporary file.
                ret = TemporaryFile()  # pylint: disable=consider-using-with
            else:
                # But, to maximize performance, keep blobs that are small
                # enough in-memory.
                ret = io.BytesIO()

            s3object.fileobj = ret
            s3utils.get_object(self._s3, s3object)
            ret.seek(0)
            return ret
        except ClientError as e:
            if e.response['Error']['Code'] not in ['404', 'NoSuchKey']:
                raise
            return None

    def delete_blob(self, digest):
        self.__logger.debug(f"Deleting blob: [{digest}]")
        try:
            self._s3.delete_object(Bucket=self._get_bucket_name(digest.hash),
                                   Key=self._construct_key(digest))
        except ClientError as e:
            if e.response['Error']['Code'] not in ['404', 'NoSuchKey']:
                raise

    def bulk_delete(self, digests: List[Digest]) -> List[str]:
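        """Delete multiple blobs from S3.

        Deletions are grouped by bucket and flushed whenever a bucket's batch
        reaches ``page_size``. Returns a list of ``"<hash>/<size_bytes>"`` strings
        describing any deletions that S3 reported as failed.
        """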

        self.__logger.debug(f"Deleting {len(digests)} digests from S3 storage: [{digests}]")
        buckets_to_digest_lists: Dict[str, List[Dict[str, str]]] = {}
        failed_deletions = []
        for digest in digests:
            bucket = self._get_bucket_name(digest.hash)
            if bucket in buckets_to_digest_lists:
                buckets_to_digest_lists[bucket].append({'Key': self._construct_key(digest)})
            else:
                buckets_to_digest_lists[bucket] = [{'Key': self._construct_key(digest)}]
            if len(buckets_to_digest_lists[bucket]) >= self._page_size:
                # delete items for this bucket, hit page limit
                failed_deletions += self._multi_delete_blobs(bucket,
                                                             buckets_to_digest_lists.pop(bucket))
        # flush remaining items
        for bucket, digest_list in buckets_to_digest_lists.items():
            failed_deletions += self._multi_delete_blobs(bucket, digest_list)
        return failed_deletions

    def begin_write(self, _digest):
        return io.BytesIO()

    def commit_write(self, digest, write_session):
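        """Upload the contents of ``write_session`` to S3 under the given digest.

        Raises ``StorageFullError`` if S3 reports an exceeded quota. The write
        session is always closed, whether or not the upload succeeds.
        """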

        self.__logger.debug(f"Writing blob: [{digest}]")
        write_session.seek(0)
        try:
            s3object = self._get_s3object(digest)
            s3object.fileobj = write_session
            s3object.filesize = digest.size_bytes
            s3utils.put_object(self._s3, s3object)
        except ClientError as error:
            if error.response['Error']['Code'] == 'QuotaExceededException':
                raise StorageFullError("S3 Quota Exceeded.") from error
            raise error
        finally:
            write_session.close()

    def is_cleanup_enabled(self):
        return True

    def missing_blobs(self, digests):
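        """Return the subset of ``digests`` not present in S3, determined by
        sending a HEAD request for each corresponding object."""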

        result = []
        s3objects = []
        for digest in digests:
            s3object = self._get_s3object(digest)
            s3objects.append(s3object)
        s3utils.head_objects(self._s3, s3objects)
        for digest, s3object in zip(digests, s3objects):
            if s3object.error is not None:
                result.append(digest)
        return result

    def bulk_update_blobs(self, blobs):
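        """Store multiple ``(digest, data)`` pairs in S3 in a single pass.

        Each blob is validated against its digest's size and hash before upload.
        Returns one ``google.rpc.Status`` per input blob, in order:
        ``INVALID_ARGUMENT`` for a failed check, ``OK`` for a successful upload,
        and ``UNKNOWN`` (with the error message) for a failed upload.
        """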

        s3object_status_list = []
        s3objects = []
        for digest, data in blobs:
            if len(data) != digest.size_bytes or HASH(data).hexdigest() != digest.hash:
                status = Status(
                    code=code_pb2.INVALID_ARGUMENT,
                    message="Data doesn't match hash",
                )
                s3object_status_list.append((None, status))
            else:
                write_session = self.begin_write(digest)
                write_session.write(data)
                write_session.seek(0)
                s3object = self._get_s3object(digest)
                s3object.fileobj = write_session
                s3object.filesize = digest.size_bytes
                s3objects.append(s3object)
                s3object_status_list.append((s3object, None))

        s3utils.put_objects(self._s3, s3objects)

        result = []
        for s3object, status in s3object_status_list:
            if status is not None:
                # Failed check before S3 object creation
                result.append(status)
            elif s3object.error is None:
                # PUT was successful
                result.append(Status(code=code_pb2.OK))
            else:
                result.append(Status(code=code_pb2.UNKNOWN, message=str(s3object.error)))
        return result

    def bulk_read_blobs(self, digests):
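        """Fetch multiple blobs from S3.

        Returns a dictionary mapping each found digest's hash to an in-memory
        file object with its contents. Blobs that are missing (404) are omitted
        from the result; any other S3 error is raised.
        """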

        s3objects = []
        blobmap = {}
        for digest in digests:
            s3object = self._get_s3object(digest)
            s3object.fileobj = io.BytesIO()
            s3objects.append(s3object)

        s3utils.get_objects(self._s3, s3objects)

        for digest, s3object in zip(digests, s3objects):
            if s3object.error is None:
                s3object.fileobj.seek(0)
                blobmap[digest.hash] = s3object.fileobj
            elif s3object.status_code != 404:
                raise s3object.error
        return blobmap
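
As a rough usage sketch (not part of the module above): the snippet below assumes an S3 endpoint reachable at http://localhost:9000 with credentials already available to boto3, and a pre-created bucket named 'example-cas' (both illustrative values), and round-trips a small blob through the storage.

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid.server.cas.storage.s3 import S3Storage
from buildgrid.settings import HASH

# Extra keyword arguments are forwarded to boto3.client('s3', ...), so a custom
# endpoint (for example a local MinIO instance) can be targeted this way.
storage = S3Storage('example-cas', endpoint_url='http://localhost:9000')

data = b'example blob'
digest = Digest(hash=HASH(data).hexdigest(), size_bytes=len(data))

# Write: begin_write() hands back an in-memory session which commit_write() uploads.
session = storage.begin_write(digest)
session.write(data)
storage.commit_write(digest, session)

# Read it back: get_blob() returns a file-like object, or None if the blob is missing.
assert storage.has_blob(digest)
print(storage.get_blob(digest).read())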