Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/s3/s3utils.py: 84.21%
304 statements
coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
# Copyright (C) 2021 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import io
import os
import random
import threading
import time
from typing import IO, TYPE_CHECKING, Any, BinaryIO, Callable, Dict, List, Mapping, Optional, Sequence, Union, cast

# Import the botocore submodules explicitly: `botocore.parsers` and
# `botocore.exceptions` are used below, and importing the top-level
# package alone does not guarantee they are loaded.
import botocore.exceptions
import botocore.parsers
import pycurl

from buildgrid.server.settings import (
    S3_MAX_RETRIES,
    S3_MULTIPART_MAX_CONCURRENT_PARTS,
    S3_MULTIPART_PART_SIZE,
    S3_TIMEOUT_CONNECT,
    S3_TIMEOUT_READ,
    S3_USERAGENT_NAME,
)

# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html
_RETRIABLE_HTTP_STATUS_CODES = (408, 429, 500, 502, 503, 504, 509)
_RETRIABLE_S3_ERROR_CODES = (
    "Throttling",
    "ThrottlingException",
    "ThrottledException",
    "RequestThrottledException",
    "ProvisionedThroughputExceededException",
)
# Maximum backoff in seconds
_MAX_BACKOFF = 20
# Maximum requests to run in parallel via CurlMulti
_MAX_CURLMULTI_CONNECTIONS = 10

if TYPE_CHECKING:
    from mypy_boto3_s3 import Client as S3Client


class _CurlLocal(threading.local):
    def __init__(self) -> None:
        self.curlmulti = pycurl.CurlMulti()
        self.curlmulti.setopt(pycurl.M_MAX_TOTAL_CONNECTIONS, _MAX_CURLMULTI_CONNECTIONS)


_curlLocal = _CurlLocal()


class S3Object:
    def __init__(self, bucket: str, key: str) -> None:
        self.bucket = bucket
        self.key = key
        self.fileobj: Optional[IO[bytes]] = None
        self.filesize: Optional[int] = None
        self.error: Optional[Exception] = None
        self.status_code: Optional[int] = None
        self._method: str = ""
        self._errfileobj: Optional[IO[bytes]] = None
        self._response_headers: Dict[str, str] = {}

    @property
    def response_headers(self) -> Dict[str, str]:
        return self._response_headers

    # Callback used by pycurl to process HTTP response headers
    def _header_function(self, header_line: bytes) -> None:
        header = header_line.decode("ascii")

        # Skip the status line
        if ":" not in header:
            return

        name, value = header.split(":", maxsplit=1)
        name = name.strip().lower()
        value = value.strip()

        self._response_headers[name] = value
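
# Usage sketch (illustrative, not part of the module): an S3Object pairs a
# bucket/key with per-transfer state. Callers set `fileobj` (and `filesize`
# for uploads) before calling the helpers below; results land in
# `status_code`, `response_headers` and `error`.
#
#     obj = S3Object("my-bucket", "some/key")  # hypothetical bucket and key
#     obj.fileobj = io.BytesIO()  # destination buffer for a download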


class UploadPart(io.BufferedIOBase):
    def __init__(self, upload_id: str, number: int, file: IO[bytes], eof: int, size: int, offset: int) -> None:
        super().__init__()
        self._upload_id = upload_id
        self._number = number
        self._response = io.BytesIO()
        self._file = file
        self._content: Optional[bytes] = None
        try:
            self._fd: Optional[int] = file.fileno()
        except OSError:
            # The "file" doesn't have a file descriptor; it's probably a BytesIO.
            # Read our part now so that we don't need to cope with thread safety
            # when `UploadPart.read` is called.
            self._fd = None
            old_position = file.tell()
            file.seek(offset)
            self._content = file.read(size)
            file.seek(old_position)

        self._size = size
        self._start = offset
        self._end = min(eof, offset + size)
        self._read_offset = 0

    @property
    def upload_id(self) -> str:
        return self._upload_id

    @property
    def number(self) -> int:
        return self._number

    @property
    def response(self) -> BinaryIO:
        return self._response

    def __len__(self) -> int:
        return self._end - self._start

    def readable(self) -> bool:
        return True

    def seekable(self) -> bool:
        return False

    def writable(self) -> bool:
        return True

    def read(self, size: Optional[int] = -1) -> bytes:
        # If we have a real file underlying this part, then we want to do an
        # `os.pread` for just the part that is relevant.
        if self._fd is not None:
            if size is None or size == -1:
                size = self._size

            # Calculate the actual read offset and make sure we're within our
            # section of the file.
            offset = self._start + self._read_offset
            if offset >= self._end:
                return b""

            # Make sure we only read up to the end of our section of the file,
            # in case the size requested is larger than the number of bytes
            # remaining in our section.
            size = min(size, self._end - offset)
            content = os.pread(self._fd, size, offset)
            self._read_offset += size
            return content

        # Otherwise we can just return our pre-determined slice of the actual
        # contents. This case should only be reached when MAX_IN_MEMORY_BLOB_SIZE_BYTES
        # is the same as or larger than S3_MAX_UPLOAD_SIZE, which should ideally
        # never be the case.
        else:
            if self._content is None:
                raise ValueError(
                    f"Part {self._number} of upload {self._upload_id} is backed "
                    "by a BytesIO but the content couldn't be read when the part "
                    "was instantiated."
                )
            return self._content

    def write(self, b: bytes) -> int:  # type: ignore[override]
        # pycurl writes the HTTP response body for this part here; `upload_parts`
        # sets WRITEDATA to the part itself so the response can be parsed later.
        return self._response.write(b)
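
# Illustrative example: parts tile the underlying file in fixed-size chunks,
# and the final part is truncated at `eof`. With a (hypothetical) 100-byte
# file and a 40-byte part size, part 3 covers bytes 80..99:
#
#     part = UploadPart(upload_id="u1", number=3, file=f, eof=100, size=40, offset=80)
#     assert len(part) == 20  # min(eof, offset + size) - offset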


def _curl_handle_for_s3(
    s3: "S3Client", method: str, s3object: S3Object, extra_params: Optional[Mapping[str, Union[str, int]]] = None
) -> pycurl.Curl:
    if extra_params is None:
        extra_params = {}
    s3object._method = method
    params: Dict[str, Union[str, int]] = {"Bucket": s3object.bucket, "Key": s3object.key, **extra_params}
    url = s3.generate_presigned_url(method, Params=params, ExpiresIn=3600)
    c = pycurl.Curl()
    c.s3object = s3object  # type: ignore
    c.setopt(pycurl.USERAGENT, S3_USERAGENT_NAME)
    c.setopt(pycurl.CONNECTTIMEOUT, S3_TIMEOUT_CONNECT)
    c.setopt(pycurl.TIMEOUT, S3_TIMEOUT_READ)
    c.setopt(pycurl.FAILONERROR, True)
    c.setopt(pycurl.URL, url)
    c.setopt(pycurl.HEADERFUNCTION, s3object._header_function)
    return c


# TODO: don't put random attributes on the curl handle like this
def c_s3obj(c: pycurl.Curl) -> S3Object:
    return c.s3object  # type: ignore


def _curl_should_retry(c: pycurl.Curl, errno: int) -> bool:
    if errno in (
        pycurl.E_COULDNT_CONNECT,
        pycurl.E_SEND_ERROR,
        pycurl.E_RECV_ERROR,
        pycurl.E_OPERATION_TIMEDOUT,
        pycurl.E_PARTIAL_FILE,
    ):
        # Retry on network and timeout errors
        return True

    if errno == pycurl.E_HTTP_RETURNED_ERROR:
        s3obj = c_s3obj(c)
        if s3obj.status_code in _RETRIABLE_HTTP_STATUS_CODES:
            # Retry on 'Request Timeout', 'Too Many Requests' and transient server errors
            return True

        if error_response := getattr(s3obj.error, "response", None):
            if error_response["Error"]["Code"] in _RETRIABLE_S3_ERROR_CODES:
                return True

    return False


def _curl_multi_run(
    objects: Sequence[S3Object], curl_handle_func: Callable[[S3Object], pycurl.Curl], attempt: int = 1
) -> None:
    m = _curlLocal.curlmulti
    for s3object in objects:
        c = curl_handle_func(s3object)
        m.add_handle(c)

    while True:
        ret, active_handles = m.perform()
        if ret == pycurl.E_CALL_MULTI_PERFORM:
            # More processing required
            continue

        if active_handles:
            # Wait for the next event
            m.select(15.0)
        else:
            # All operations complete
            break

    num_q, ok_list, err_list = m.info_read()
    assert num_q == 0

    retry_objects = []
    for c in ok_list:
        s3obj = c_s3obj(c)
        s3obj.status_code = c.getinfo(pycurl.HTTP_CODE)  # type: ignore[no-untyped-call]
        m.remove_handle(c)
        c.close()
    for c, errno, errmsg in err_list:
        s3obj = c_s3obj(c)
        if errno == pycurl.E_HTTP_RETURNED_ERROR:
            s3obj.status_code = c.getinfo(pycurl.HTTP_CODE)  # type: ignore[no-untyped-call]
            response: Dict[str, Any] = {}
            response["status_code"] = s3obj.status_code
            response["headers"] = s3obj._response_headers
            if (errfileobj := s3obj._errfileobj) is None:
                response["body"] = b""
            else:
                errfileobj.seek(0)
                response["body"] = errfileobj.read()
                errfileobj.truncate(0)
            parser = botocore.parsers.RestXMLParser()
            # TODO: botocore safely handles `None` being passed here, but it is
            # probably best to rework this to get the correct `Shape` to match
            # the type hints from boto3-stubs
            parsed_response = parser.parse(response, None)  # type: ignore[arg-type]
            s3obj.error = botocore.exceptions.ClientError(parsed_response, s3obj._method)
        else:
            s3obj.error = pycurl.error(errmsg)

        if attempt < S3_MAX_RETRIES and _curl_should_retry(c, errno):
            s3obj.status_code = None
            s3obj.error = None
            retry_objects.append(s3obj)

        m.remove_handle(c)
        c.close()

    if retry_objects and attempt < S3_MAX_RETRIES:
        # Wait between attempts, using truncated exponential backoff with jitter
        exp_backoff = 2 ** (attempt - 1)
        exp_backoff_with_jitter = random.random() * exp_backoff
        time.sleep(min(exp_backoff_with_jitter, _MAX_BACKOFF))

        _curl_multi_run(retry_objects, curl_handle_func, attempt=attempt + 1)
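
# For illustration: each retry sleeps for a uniformly random fraction of
# 2 ** (attempt - 1) seconds, truncated at _MAX_BACKOFF. Maximum sleep per
# attempt: 1s, 2s, 4s, 8s, 16s, then capped at 20s from attempt 6 onwards.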


def head_objects(s3: "S3Client", objects: Sequence[S3Object]) -> None:
    def curl_handle_func(s3object: S3Object) -> pycurl.Curl:
        c = _curl_handle_for_s3(s3, "head_object", s3object)
        c.setopt(pycurl.NOBODY, True)
        return c

    _curl_multi_run(objects, curl_handle_func)


def head_object(s3: "S3Client", s3object: S3Object) -> None:
    head_objects(s3, [s3object])
    if s3object.error is not None:
        raise s3object.error
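
# Example (a sketch, assuming a boto3 client created elsewhere):
#
#     import boto3
#     s3 = boto3.client("s3")
#     obj = S3Object("my-bucket", "some/key")  # hypothetical bucket and key
#     head_object(s3, obj)  # raises on failure
#     print(obj.status_code, obj.response_headers.get("content-length"))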


def set_s3_timeout(
    c: pycurl.Curl,
    s3object: S3Object,
    timeout_seconds_per_kilobyte: Optional[float],
    timeout_min_seconds: float,
) -> None:
    timeout = timeout_min_seconds
    if s3object.filesize is not None and timeout_seconds_per_kilobyte is not None:
        timeout = max(timeout, s3object.filesize * timeout_seconds_per_kilobyte / 1024)
    c.setopt(pycurl.TIMEOUT, int(timeout))
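
# Worked example (illustrative values): with timeout_min_seconds=120 and
# timeout_seconds_per_kilobyte=0.01, a 64 MiB object yields
# max(120, 67108864 * 0.01 / 1024) = max(120, 655.36), so a 655 second timeout.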


def get_objects(
    s3: "S3Client",
    objects: Sequence[S3Object],
    timeout_seconds_per_kilobyte: Optional[float] = None,
    timeout_min_seconds: float = S3_TIMEOUT_READ,
) -> None:
    def curl_handle_func(s3object: S3Object) -> pycurl.Curl:
        c = _curl_handle_for_s3(s3, "get_object", s3object)
        c.setopt(pycurl.WRITEDATA, s3object.fileobj)
        set_s3_timeout(c, s3object, timeout_seconds_per_kilobyte, timeout_min_seconds)
        s3object._errfileobj = s3object.fileobj
        return c

    _curl_multi_run(objects, curl_handle_func)


def get_object(
    s3: "S3Client",
    s3object: S3Object,
    timeout_seconds_per_kilobyte: Optional[float] = None,
    timeout_min_seconds: float = S3_TIMEOUT_READ,
) -> None:
    get_objects(s3, [s3object], timeout_seconds_per_kilobyte, timeout_min_seconds)
    if s3object.error is not None:
        raise s3object.error
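
# Example (a sketch): downloading a blob into memory. The response body is
# written to `fileobj` via pycurl's WRITEDATA, so it must be a writable
# binary file object.
#
#     obj = S3Object("my-bucket", "some/key")  # hypothetical bucket and key
#     obj.fileobj = io.BytesIO()
#     get_object(s3, obj)  # raises on failure
#     data = obj.fileobj.getvalue()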


def put_objects(
    s3: "S3Client",
    objects: Sequence[S3Object],
    timeout_seconds_per_kilobyte: Optional[float] = None,
    timeout_min_seconds: float = S3_TIMEOUT_READ,
) -> None:
    def curl_handle_func(s3object: S3Object) -> pycurl.Curl:
        c = _curl_handle_for_s3(s3, "put_object", s3object)
        c.setopt(pycurl.READDATA, s3object.fileobj)
        c.setopt(pycurl.INFILESIZE_LARGE, s3object.filesize)
        c.setopt(pycurl.UPLOAD, 1)
        set_s3_timeout(c, s3object, timeout_seconds_per_kilobyte, timeout_min_seconds)
        s3object._errfileobj = io.BytesIO()
        c.setopt(pycurl.WRITEDATA, s3object._errfileobj)
        return c

    _curl_multi_run(objects, curl_handle_func)


def put_object(
    s3: "S3Client",
    s3object: S3Object,
    timeout_seconds_per_kilobyte: Optional[float] = None,
    timeout_min_seconds: float = S3_TIMEOUT_READ,
) -> None:
    put_objects(s3, [s3object], timeout_seconds_per_kilobyte, timeout_min_seconds)
    if s3object.error is not None:
        raise s3object.error
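
# Example (a sketch): uploading an in-memory blob. `filesize` must be set so
# that INFILESIZE_LARGE can be passed to pycurl, and `fileobj` is read via
# READDATA.
#
#     data = b"hello"
#     obj = S3Object("my-bucket", "some/key")  # hypothetical bucket and key
#     obj.fileobj = io.BytesIO(data)
#     obj.filesize = len(data)
#     put_object(s3, obj)  # raises on failure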


ParsedResponse = Dict[str, Union[str, Dict[str, Union[str, int, Dict[str, str]]]]]


def _parse_s3_response(s3: "S3Client", response: BinaryIO, s3object: S3Object, shape_name: str) -> ParsedResponse:
    response_dict: Dict[str, Union[Optional[int], Dict[str, str], bytes]] = {}
    response_dict["status_code"] = s3object.status_code
    response_dict["headers"] = s3object.response_headers
    response.seek(0)
    response_dict["body"] = response.read()

    parser = botocore.parsers.RestXMLParser()
    shape = s3.meta.service_model.shape_for(shape_name)
    return cast(ParsedResponse, parser.parse(response_dict, shape))


def start_multipart_upload(s3: "S3Client", s3object: S3Object) -> str:
    response = io.BytesIO()

    def curl_handle_func(_s3object: S3Object) -> pycurl.Curl:
        c = _curl_handle_for_s3(s3, "create_multipart_upload", _s3object)
        c.setopt(pycurl.WRITEDATA, response)
        c.setopt(pycurl.POST, 1)
        return c

    _curl_multi_run([s3object], curl_handle_func)
    if s3object.error is not None:
        raise s3object.error

    parsed_response = _parse_s3_response(s3, response, s3object, "CreateMultipartUploadOutput")
    return parsed_response["UploadId"]  # type: ignore


def upload_parts(s3: "S3Client", s3object: S3Object, parts: Sequence[UploadPart]) -> Dict[int, str]:
    s3object_map = {S3Object(s3object.bucket, s3object.key): part for part in parts}

    def curl_handle_func(_s3object: S3Object) -> pycurl.Curl:
        part = s3object_map[_s3object]
        params: Dict[str, Union[int, str]] = {"UploadId": part.upload_id, "PartNumber": part.number}
        c = _curl_handle_for_s3(s3, "upload_part", _s3object, extra_params=params)
        c.setopt(pycurl.READDATA, part)
        c.setopt(pycurl.UPLOAD, 1)
        c.setopt(pycurl.INFILESIZE_LARGE, len(part))
        c.setopt(pycurl.WRITEDATA, part)
        return c

    _curl_multi_run(list(s3object_map.keys()), curl_handle_func)

    errors: List[Exception] = [_s3object.error for _s3object in s3object_map.keys() if _s3object.error is not None]
    if len(errors) > 0:
        raise errors[0]

    uploaded: Dict[int, str] = {}
    for _s3object, part in s3object_map.items():
        response = _parse_s3_response(s3, part.response, _s3object, "UploadPartOutput")
        uploaded[part.number] = response["ResponseMetadata"]["HTTPHeaders"]["etag"]  # type: ignore
    return uploaded


def complete_multipart_upload(
    s3: "S3Client", s3object: S3Object, upload_id: str, parts: Mapping[int, str]
) -> ParsedResponse:
    # Use the boto3 client directly here, rather than a presigned URL. This is
    # necessary because boto3's URL presigning is broken for `complete_multipart_upload`
    # when using s3v4 auth.
    #
    # See https://github.com/boto/boto3/issues/2192
    return cast(
        ParsedResponse,
        s3.complete_multipart_upload(
            Bucket=s3object.bucket,
            Key=s3object.key,
            UploadId=upload_id,
            MultipartUpload={"Parts": [{"ETag": tag, "PartNumber": number} for number, tag in parts.items()]},
        ),
    )


def _list_multipart_parts(s3: "S3Client", s3object: S3Object, upload_id: str) -> ParsedResponse:
    response = io.BytesIO()

    def curl_handle_func(_s3object: S3Object) -> pycurl.Curl:
        params = {"UploadId": upload_id}
        c = _curl_handle_for_s3(s3, "list_parts", _s3object, extra_params=params)
        c.setopt(pycurl.WRITEDATA, response)
        return c

    _curl_multi_run([s3object], curl_handle_func)
    if s3object.error is not None:
        raise s3object.error
    return _parse_s3_response(s3, response, s3object, "ListPartsOutput")


def abort_multipart_upload(s3: "S3Client", s3object: S3Object, upload_id: str) -> None:
    def curl_handle_func(_s3object: S3Object) -> pycurl.Curl:
        params = {"UploadId": upload_id}
        c = _curl_handle_for_s3(s3, "abort_multipart_upload", _s3object, extra_params=params)
        c.setopt(pycurl.CUSTOMREQUEST, "DELETE")
        return c

    parts = _list_multipart_parts(s3, s3object, upload_id)

    # We need to iterate here in case any part uploads slip through in a race
    # against the AbortMultipartUpload call.
    #
    # See https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html
    while len(parts.get("Parts", [])) > 0:
        _curl_multi_run([s3object], curl_handle_func)
        try:
            parts = _list_multipart_parts(s3, s3object, upload_id)
        except botocore.exceptions.ClientError as e:
            code = e.response.get("Error", {}).get("Code")
            if code == "404":
                # A 404 here means the multipart upload has been properly aborted.
                break
            raise


def multipart_upload(s3: "S3Client", s3object: S3Object) -> None:
    if s3object.fileobj is None or s3object.filesize is None:
        raise TypeError("S3Object provided to multipart upload didn't contain a file.")

    upload_id = start_multipart_upload(s3, s3object)

    try:
        part_number = 1
        parts: Dict[int, str] = {}
        queue: List[UploadPart] = []
        while (part_number - 1) * S3_MULTIPART_PART_SIZE < s3object.filesize:
            part = UploadPart(
                upload_id=upload_id,
                number=part_number,
                file=s3object.fileobj,
                eof=s3object.filesize,
                size=S3_MULTIPART_PART_SIZE,
                offset=(part_number - 1) * S3_MULTIPART_PART_SIZE,
            )
            queue.append(part)

            part_number += 1

            if len(queue) >= S3_MULTIPART_MAX_CONCURRENT_PARTS:
                uploaded = upload_parts(s3, s3object, queue)
                parts.update(uploaded)
                queue = []

        # Upload whatever is left in the final, partially-filled batch
        uploaded = upload_parts(s3, s3object, queue)
        parts.update(uploaded)

        complete_multipart_upload(s3, s3object, upload_id, parts)
    except Exception:
        abort_multipart_upload(s3, s3object, upload_id)
        raise
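
# Example (a sketch): multipart-uploading a large file from disk. The upload
# is aborted (and the exception re-raised) if any step fails.
#
#     with open("/path/to/big.bin", "rb") as f:  # hypothetical path
#         obj = S3Object("my-bucket", "some/key")
#         obj.fileobj = f
#         obj.filesize = os.fstat(f.fileno()).st_size
#         multipart_upload(s3, obj)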