Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/s3.py: 87.67%

219 statements  

coverage.py v7.4.1, created at 2024-06-11 15:37 +0000

# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
S3Storage
==================

A storage provider that stores data in an Amazon S3 bucket.
"""

import io
import logging
from collections import defaultdict
from contextlib import ExitStack
from typing import IO, TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union

import boto3
from botocore.exceptions import ClientError

from buildgrid._exceptions import StorageFullError
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid._protos.google.rpc import code_pb2
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid.server.metrics_names import S3_DELETE_ERROR_CHECK_METRIC_NAME
from buildgrid.server.metrics_utils import DurationMetric
from buildgrid.server.s3 import s3utils
from buildgrid.settings import HASH, S3_MAX_UPLOAD_SIZE, S3_TIMEOUT_READ

from .storage_abc import StorageABC, create_write_session

if TYPE_CHECKING:
    from mypy_boto3_s3 import Client as S3Client

LOGGER = logging.getLogger(__name__)


class S3Storage(StorageABC):
    def __init__(
        self,
        bucket: str,
        page_size: int = 1000,
        s3_read_timeout_seconds_per_kilobyte: Optional[float] = None,
        s3_write_timeout_seconds_per_kilobyte: Optional[float] = None,
        s3_read_timeout_min_seconds: float = S3_TIMEOUT_READ,
        s3_write_timeout_min_seconds: float = S3_TIMEOUT_READ,
        s3_versioned_deletes: bool = False,
        s3_hash_prefix_size: Optional[int] = None,
        s3_path_prefix_string: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        # "bucket" is a template string: it may contain format fields that are
        # filled in with the blob hash (see _get_bucket_name), allowing blobs
        # to be sharded across multiple buckets.
        self._bucket_template = bucket
        self._page_size = page_size
        self._s3_read_timeout_seconds_per_kilobyte = s3_read_timeout_seconds_per_kilobyte
        self._s3_read_timeout_min_seconds = s3_read_timeout_min_seconds
        self._s3_write_timeout_seconds_per_kilobyte = s3_write_timeout_seconds_per_kilobyte
        self._s3_write_timeout_min_seconds = s3_write_timeout_min_seconds
        self._s3_versioned_deletes = s3_versioned_deletes
        self._s3_hash_prefix_size = s3_hash_prefix_size
        self._s3_path_prefix_string: Optional[str] = None
        if s3_path_prefix_string:
            self._s3_path_prefix_string = s3_path_prefix_string.strip("/")

        # Boto logs can be very verbose, restrict to WARNING
        for boto_logger_name in ["boto3", "botocore", "s3transfer", "urllib3"]:
            boto_logger = logging.getLogger(boto_logger_name)
            boto_logger.setLevel(max(boto_logger.level, logging.WARNING))

        # Any remaining keyword arguments are passed straight through to boto3.
        self._s3: "S3Client" = boto3.client("s3", **kwargs)

        self._instance_name = None

    def _construct_key_with_prefix(self, digest: Digest) -> str:
        if not self._s3_hash_prefix_size and not self._s3_path_prefix_string:
            return self._construct_key(digest)
        else:
            try:
                prefix = ""
                if self._s3_path_prefix_string:
                    prefix += self._s3_path_prefix_string + "/"
                if self._s3_hash_prefix_size:
                    prefix += digest.hash[0 : self._s3_hash_prefix_size] + "/"
                    remaining = digest.hash[self._s3_hash_prefix_size :]
                else:
                    remaining = digest.hash
                return f"{prefix}{remaining}_{digest.size_bytes}"
            except IndexError:
                LOGGER.error(
                    f"Could not calculate object key for digest=[{digest}]. This "
                    "is either a misconfiguration in the BuildGrid S3 bucket "
                    "configuration, or a badly formed request."
                )
                raise

    def _get_bucket_name(self, digest: str) -> str:
        try:
            return self._bucket_template.format(digest=digest)
        except IndexError:
            LOGGER.error(
                f"Could not calculate bucket name for digest=[{digest}]. This "
                "is either a misconfiguration in the BuildGrid S3 bucket "
                "configuration, or a badly formed request."
            )
            raise

    def _construct_key(self, digest: Digest) -> str:
        return digest.hash + "_" + str(digest.size_bytes)

    def _get_s3object(self, digest: Digest) -> s3utils.S3Object:
        s3object = s3utils.S3Object(self._get_bucket_name(digest.hash), self._construct_key_with_prefix(digest))
        s3object.filesize = digest.size_bytes
        return s3object

    def _remove_key_prefixes(self, key: str) -> str:
        # Only interested in last two elements if hash_prefix used
        split_key = key.split("/")
        if self._s3_hash_prefix_size:
            return "".join(split_key[-2:])
        # Only need last element if only a prefix string was used
        if self._s3_path_prefix_string:
            return split_key[-1]
        return key

    def _deconstruct_key(self, key: str) -> Tuple[str, int]:
        # Remove any prefix, returning key to hash_size_bytes format
        key = self._remove_key_prefixes(key)
        parts = key.split("_")
        size_bytes = int(parts[-1])
        # This isn't as simple as just "the first part of the split" because
        # the hash part of the key itself might contain an underscore.
        digest_hash = "_".join(parts[0:-1])
        return digest_hash, size_bytes
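
    # Key layout example (illustrative values): with s3_path_prefix_string="cas"
    # and s3_hash_prefix_size=2, a blob whose hash starts with "abcd" and whose
    # size is 1024 bytes is stored under a key of the form "cas/ab/cd..._1024".
    # _deconstruct_key() strips the prefixes again and recovers the
    # (hash, size_bytes) pair.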

    def _multi_delete_blobs(self, bucket_name: str, digests: List[Dict[str, str]]) -> List[str]:
        # TODO fix this:
        # expression has type "List[Dict[str, str]]",
        # TypedDict item "Objects" has type "Sequence[ObjectIdentifierTypeDef]"
        response = self._s3.delete_objects(
            Bucket=bucket_name, Delete={"Objects": digests}  # type: ignore[typeddict-item]
        )
        return_failed = []
        failed_deletions = response.get("Errors", [])
        # TODO fix instance name initialization
        with DurationMetric(
            S3_DELETE_ERROR_CHECK_METRIC_NAME, self._instance_name, instanced=True  # type: ignore[arg-type]
        ):
            for failed_key in failed_deletions:
                digest_hash, size_bytes = self._deconstruct_key(failed_key["Key"])
                return_failed.append(f"{digest_hash}/{size_bytes}")
        return return_failed

    def has_blob(self, digest: Digest) -> bool:
        LOGGER.debug(f"Checking for blob: [{digest}]")
        try:
            s3utils.head_object(self._s3, self._get_s3object(digest))
        except ClientError as e:
            if e.response["Error"]["Code"] not in ["404", "NoSuchKey"]:
                raise
            return False
        return True

    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        LOGGER.debug(f"Getting blob: [{digest}]")
        try:
            s3object = self._get_s3object(digest)
            s3object.fileobj = create_write_session(digest)
            s3utils.get_object(
                self._s3,
                s3object,
                timeout_seconds_per_kilobyte=self._s3_read_timeout_seconds_per_kilobyte,
                timeout_min_seconds=self._s3_read_timeout_min_seconds,
            )
            s3object.fileobj.seek(0)
            return s3object.fileobj
        except ClientError as e:
            if e.response["Error"]["Code"] not in ["404", "NoSuchKey"]:
                raise
            return None

    def _get_version_id(self, bucket: str, key: str) -> Optional[str]:
        try:
            return self._s3.head_object(Bucket=bucket, Key=key).get("VersionId")
        except ClientError as e:
            if e.response["Error"]["Code"] not in ["404", "NoSuchKey"]:
                raise
            return None

    def delete_blob(self, digest: Digest) -> None:
        LOGGER.debug(f"Deleting blob: [{digest}]")
        bucket, key = self._get_bucket_name(digest.hash), self._construct_key_with_prefix(digest)
        try:
            if self._s3_versioned_deletes and (version_id := self._get_version_id(bucket, key)):
                self._s3.delete_object(Bucket=bucket, Key=key, VersionId=version_id)
            else:
                self._s3.delete_object(Bucket=bucket, Key=key)
        except ClientError as e:
            if e.response["Error"]["Code"] not in ["404", "NoSuchKey"]:
                raise

    def _get_version_ids(self, digests: List[Digest]) -> Dict[str, Optional[str]]:
        s3objects = [self._get_s3object(digest) for digest in digests]
        s3utils.head_objects(self._s3, s3objects)
        return {
            digest.hash: s3object.response_headers.get("x-amz-version-id")
            for digest, s3object in zip(digests, s3objects)
            if s3object.error is None
        }
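
    # Deletion strategy: bulk_delete() groups the requested digests by target
    # bucket, then issues DeleteObjects calls in pages of at most page_size
    # entries. When versioned deletes are enabled, the current version of each
    # object (looked up via _get_version_ids) is deleted in addition to the
    # plain (delete-marker) delete.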

    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        LOGGER.debug(f"Deleting {len(digests)} digests from S3 storage: [{digests}]")

        if self._s3_versioned_deletes:
            version_ids = self._get_version_ids(digests)
        else:
            version_ids = {}

        bucketed_requests: Dict[str, List[Dict[str, str]]] = defaultdict(list)
        for digest in digests:
            bucket = self._get_bucket_name(digest.hash)
            key = self._construct_key_with_prefix(digest)
            bucketed_requests[bucket].append({"Key": key})
            if version_id := version_ids.get(digest.hash):
                bucketed_requests[bucket].append({"Key": key, "VersionId": version_id})

        failed_deletions = []
        for bucket, requests in bucketed_requests.items():
            for i in range(0, len(requests), self._page_size):
                try:
                    failed_deletions += self._multi_delete_blobs(bucket, requests[i : i + self._page_size])
                except ClientError as error:
                    current_failed_deletions = [
                        self._deconstruct_key(key_versions["Key"])
                        for key_versions in requests[i : i + self._page_size]
                    ]
                    failed_deletions += [
                        f"{digest_hash}/{digest_size_bytes}"
                        for digest_hash, digest_size_bytes in current_failed_deletions
                    ]
                    LOGGER.exception(error)
                    LOGGER.exception("Error encountered when trying to delete blobs from the S3 storage")

        return failed_deletions
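
    # Upload path: blobs no larger than S3_MAX_UPLOAD_SIZE are written with a
    # single put_object call; anything larger falls back to a multipart upload.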

    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        LOGGER.debug(f"Writing blob: [{digest}]")
        write_session.seek(0)
        try:
            s3object = self._get_s3object(digest)
            s3object.fileobj = write_session
            s3object.filesize = digest.size_bytes
            if digest.size_bytes <= S3_MAX_UPLOAD_SIZE:
                s3utils.put_object(
                    self._s3,
                    s3object,
                    timeout_seconds_per_kilobyte=self._s3_write_timeout_seconds_per_kilobyte,
                    timeout_min_seconds=self._s3_write_timeout_min_seconds,
                )
            else:
                s3utils.multipart_upload(self._s3, s3object)
        except ClientError as error:
            if error.response["Error"]["Code"] == "QuotaExceededException":
                raise StorageFullError("S3 Quota Exceeded.") from error
            raise error

    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
        result = []
        s3objects = []
        for digest in digests:
            s3object = self._get_s3object(digest)
            s3objects.append(s3object)
        s3utils.head_objects(self._s3, s3objects)
        for digest, s3object in zip(digests, s3objects):
            if s3object.error is not None:
                result.append(digest)
        return result
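
    # bulk_update_blobs() verifies each blob's size and hash against its digest
    # before uploading; blobs that fail the check are reported as
    # INVALID_ARGUMENT without ever being sent to S3, and upload failures are
    # surfaced as UNKNOWN statuses.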

    def bulk_update_blobs(self, blobs: List[Tuple[Digest, bytes]]) -> List[Status]:
        s3object_status_list: List[Union[Tuple[None, Status], Tuple[s3utils.S3Object, None]]] = []
        s3objects = []
        with ExitStack() as stack:
            for digest, data in blobs:
                if len(data) != digest.size_bytes or HASH(data).hexdigest() != digest.hash:
                    status = Status(
                        code=code_pb2.INVALID_ARGUMENT,
                        message="Data doesn't match hash",
                    )
                    s3object_status_list.append((None, status))
                else:
                    write_session = stack.enter_context(create_write_session(digest))
                    write_session.write(data)
                    write_session.seek(0)
                    s3object = self._get_s3object(digest)
                    s3object.fileobj = write_session
                    s3object.filesize = digest.size_bytes
                    s3objects.append(s3object)
                    s3object_status_list.append((s3object, None))

            s3utils.put_objects(
                self._s3,
                s3objects,
                timeout_seconds_per_kilobyte=self._s3_write_timeout_seconds_per_kilobyte,
                timeout_min_seconds=self._s3_write_timeout_min_seconds,
            )

        result = []
        for res_s3object, res_status in s3object_status_list:
            if res_status:
                # Failed check before S3 object creation
                result.append(res_status)
            elif res_s3object:
                if res_s3object.error is None:
                    # PUT was successful
                    result.append(Status(code=code_pb2.OK))
                else:
                    result.append(Status(code=code_pb2.UNKNOWN, message=str(res_s3object.error)))

        return result
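
    # Blobs that are missing from S3 (404) are simply omitted from the map
    # returned by bulk_read_blobs(); any other per-object error is re-raised.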

    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
        s3objects: List[s3utils.S3Object] = []
        for digest in digests:
            s3object = self._get_s3object(digest)
            s3object.fileobj = io.BytesIO()
            s3objects.append(s3object)

        s3utils.get_objects(
            self._s3,
            s3objects,
            timeout_seconds_per_kilobyte=self._s3_read_timeout_seconds_per_kilobyte,
            timeout_min_seconds=self._s3_read_timeout_min_seconds,
        )

        blobmap: Dict[str, bytes] = {}
        for digest, s3object in zip(digests, s3objects):
            if not s3object.error:
                if s3object.fileobj:
                    s3object.fileobj.seek(0)
                    blobmap[digest.hash] = s3object.fileobj.read()
            elif s3object.status_code != 404:
                raise s3object.error
        return blobmap