Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/s3.py: 89.70%
165 statements
coverage.py v6.4.1, created at 2022-06-22 21:04 +0000
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#  <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
S3Storage
==================

A storage provider that stores data in an Amazon S3 bucket.
"""
import io
import logging
from typing import Dict, List
from tempfile import TemporaryFile

import boto3
from botocore.exceptions import ClientError

from buildgrid._exceptions import StorageFullError
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid._protos.google.rpc import code_pb2
from buildgrid.server.metrics_names import S3_DELETE_ERROR_CHECK_METRIC_NAME
from buildgrid.server.metrics_utils import DurationMetric
from buildgrid.server.s3 import s3utils
from buildgrid.settings import HASH, MAX_IN_MEMORY_BLOB_SIZE_BYTES
from .storage_abc import StorageABC


class S3Storage(StorageABC):

    def __init__(self, bucket, page_size=1000, **kwargs):
        self.__logger = logging.getLogger(__name__)

        self._bucket_template = bucket
        self._page_size = page_size

        # Boto logs can be very verbose, restrict to WARNING
        for boto_logger_name in [
                'boto3', 'botocore',
                's3transfer', 'urllib3'
        ]:
            boto_logger = logging.getLogger(boto_logger_name)
            boto_logger.setLevel(max(boto_logger.level, logging.WARNING))

        self._s3 = boto3.client('s3', **kwargs)

        self._instance_name = None

    def _get_bucket_name(self, digest):
        try:
            return self._bucket_template.format(digest=digest)
        except IndexError:
            self.__logger.error(f"Could not calculate bucket name for digest=[{digest}]. This "
                                "is either a misconfiguration in the BuildGrid S3 bucket "
                                "configuration, or a badly formed request.")
            raise

    def _construct_key(self, digest):
        return digest.hash + '_' + str(digest.size_bytes)

    def _get_s3object(self, digest):
        return s3utils.S3Object(self._get_bucket_name(digest.hash), self._construct_key(digest))

    def _deconstruct_key(self, key):
        parts = key.split('_')
        size_bytes = int(parts[-1])
        # This isn't as simple as just "the first part of the split" because
        # the hash part of the key itself might contain an underscore.
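        # Illustrative round-trip (hypothetical hash value): _construct_key
        # yields '<hash>_<size_bytes>', e.g. 'ab12..._42', which deconstructs
        # back to ('ab12...', 42).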
        digest_hash = '_'.join(parts[0:-1])
        return digest_hash, size_bytes

    def _multi_delete_blobs(self, bucket_name, digests):
        response = self._s3.delete_objects(Bucket=bucket_name, Delete={'Objects': digests})
        return_failed = []
        failed_deletions = response.get('Errors', [])
        with DurationMetric(S3_DELETE_ERROR_CHECK_METRIC_NAME,
                            self._instance_name,
                            instanced=True):
            for failed_key in failed_deletions:
                digest_hash, size_bytes = self._deconstruct_key(failed_key['Key'])
                return_failed.append(f'{digest_hash}/{size_bytes}')
        return return_failed

    def has_blob(self, digest):
        self.__logger.debug(f"Checking for blob: [{digest}]")
        try:
            s3utils.head_object(self._s3, self._get_s3object(digest))
        except ClientError as e:
            if e.response['Error']['Code'] not in ['404', 'NoSuchKey']:
                raise
            return False
        return True

    def get_blob(self, digest):
        self.__logger.debug(f"Getting blob: [{digest}]")
        try:
            s3object = self._get_s3object(digest)

            if digest.size_bytes > MAX_IN_MEMORY_BLOB_SIZE_BYTES:
                # To avoid storing the whole file in memory, download to a
                # temporary file.
                ret = TemporaryFile()  # pylint: disable=consider-using-with
            else:
                # But, to maximize performance, keep blobs that are small
                # enough in-memory.
                ret = io.BytesIO()

            s3object.fileobj = ret
            s3utils.get_object(self._s3, s3object)
            ret.seek(0)
            return ret
        except ClientError as e:
            if e.response['Error']['Code'] not in ['404', 'NoSuchKey']:
                raise
            return None

    def delete_blob(self, digest):
        self.__logger.debug(f"Deleting blob: [{digest}]")
        try:
            self._s3.delete_object(Bucket=self._get_bucket_name(digest.hash),
                                   Key=self._construct_key(digest))
        except ClientError as e:
            if e.response['Error']['Code'] not in ['404', 'NoSuchKey']:
                raise

    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        self.__logger.debug(f"Deleting {len(digests)} digests from S3 storage: [{digests}]")
        buckets_to_digest_lists: Dict[str, List[Dict[str, str]]] = {}
        failed_deletions = []
        for digest in digests:
            bucket = self._get_bucket_name(digest.hash)
            if bucket in buckets_to_digest_lists:
                buckets_to_digest_lists[bucket].append({'Key': self._construct_key(digest)})
            else:
                buckets_to_digest_lists[bucket] = [{'Key': self._construct_key(digest)}]
            if len(buckets_to_digest_lists[bucket]) >= self._page_size:
                # delete items for this bucket, hit page limit
                failed_deletions += self._multi_delete_blobs(bucket,
                                                             buckets_to_digest_lists.pop(bucket))
        # flush remaining items
        for bucket, digest_list in buckets_to_digest_lists.items():
            failed_deletions += self._multi_delete_blobs(bucket, digest_list)
        return failed_deletions

    def begin_write(self, digest):
        if digest.size_bytes > MAX_IN_MEMORY_BLOB_SIZE_BYTES:
            # To avoid storing the whole file in memory, upload to a
            # temporary file.
            write_session = TemporaryFile()  # pylint: disable=consider-using-with
        else:
            # But, to maximize performance, keep blobs that are small
            # enough in-memory.
            write_session = io.BytesIO()
        return write_session
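
    # Illustrative write flow (assumed caller code, not part of this class):
    # the session returned by begin_write() is an in-memory or temporary file
    # object that commit_write() uploads and then closes.
    #
    #     session = storage.begin_write(digest)
    #     session.write(data)
    #     storage.commit_write(digest, session)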
    def commit_write(self, digest, write_session):
        self.__logger.debug(f"Writing blob: [{digest}]")
        write_session.seek(0)
        try:
            s3object = self._get_s3object(digest)
            s3object.fileobj = write_session
            s3object.filesize = digest.size_bytes
            s3utils.put_object(self._s3, s3object)
        except ClientError as error:
            if error.response['Error']['Code'] == 'QuotaExceededException':
                raise StorageFullError("S3 Quota Exceeded.") from error
            raise error
        finally:
            write_session.close()

    def is_cleanup_enabled(self):
        return True

    def missing_blobs(self, digests):
        result = []
        s3objects = []
        for digest in digests:
            s3object = self._get_s3object(digest)
            s3objects.append(s3object)
        s3utils.head_objects(self._s3, s3objects)
        for digest, s3object in zip(digests, s3objects):
            if s3object.error is not None:
                result.append(digest)
        return result

    def bulk_update_blobs(self, blobs):
        s3object_status_list = []
        s3objects = []
        for digest, data in blobs:
            if len(data) != digest.size_bytes or HASH(data).hexdigest() != digest.hash:
                status = Status(
                    code=code_pb2.INVALID_ARGUMENT,
                    message="Data doesn't match hash",
                )
                s3object_status_list.append((None, status))
            else:
                write_session = self.begin_write(digest)
                write_session.write(data)
                write_session.seek(0)
                s3object = self._get_s3object(digest)
                s3object.fileobj = write_session
                s3object.filesize = digest.size_bytes
                s3objects.append(s3object)
                s3object_status_list.append((s3object, None))

        s3utils.put_objects(self._s3, s3objects)

        result = []
        for s3object, status in s3object_status_list:
            if status is not None:
                # Failed check before S3 object creation
                result.append(status)
            elif s3object.error is None:
                # PUT was successful
                result.append(Status(code=code_pb2.OK))
            else:
                result.append(Status(code=code_pb2.UNKNOWN, message=str(s3object.error)))
        return result

    def bulk_read_blobs(self, digests):
        s3objects = []
        blobmap = {}
        for digest in digests:
            s3object = self._get_s3object(digest)
            s3object.fileobj = io.BytesIO()
            s3objects.append(s3object)

        s3utils.get_objects(self._s3, s3objects)

        for digest, s3object in zip(digests, s3objects):
            if s3object.error is None:
                s3object.fileobj.seek(0)
                blobmap[digest.hash] = s3object.fileobj
            elif s3object.status_code != 404:
                raise s3object.error
        return blobmap