Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/s3/s3utils.py: 89.61%
337 statements
« prev ^ index » next coverage.py v7.4.1, created at 2025-05-21 15:45 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2025-05-21 15:45 +0000
1# Copyright (C) 2021 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15import io
16import os
17import random
18import threading
19import time
20from tempfile import TemporaryFile
21from typing import IO, TYPE_CHECKING, Any, BinaryIO, Callable, Iterator, Mapping, Sequence, cast
23import botocore
24import pycurl
26from buildgrid.server.settings import (
27 S3_MAX_RETRIES,
28 S3_MULTIPART_MAX_CONCURRENT_PARTS,
29 S3_MULTIPART_PART_SIZE,
30 S3_TIMEOUT_CONNECT,
31 S3_TIMEOUT_READ,
32 S3_USERAGENT_NAME,
33)
# HTTP status codes treated as transient, mirroring boto3's retry behaviour:
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html
_RETRIABLE_HTTP_STATUS_CODES = (408, 429, 500, 502, 503, 504, 509)
# S3 error codes that indicate throttling; such requests are safe to retry.
_RETRIABLE_S3_ERROR_CODES = (
    "Throttling",
    "ThrottlingException",
    "ThrottledException",
    "RequestThrottledException",
    "ProvisionedThroughputExceededException",
)
# Maximum backoff in seconds between retry attempts
_MAX_BACKOFF = 20
# Maximum requests to run in parallel via CurlMulti
_MAX_CURLMULTI_CONNECTIONS = 10

# Type-checking-only import: mypy_boto3_s3 is not a runtime dependency.
if TYPE_CHECKING:
    from mypy_boto3_s3 import Client as S3Client
class _CurlLocal(threading.local):
    """Thread-local storage giving each thread its own ``CurlMulti`` handle.

    pycurl handles are not safe to share across threads, so each thread
    lazily builds one on first access via the ``threading.local`` protocol.
    """

    def __init__(self) -> None:
        multi = pycurl.CurlMulti()
        # Cap the number of simultaneous transfers per multi handle.
        multi.setopt(pycurl.M_MAX_TOTAL_CONNECTIONS, _MAX_CURLMULTI_CONNECTIONS)
        self.curlmulti = multi
59_curlLocal = _CurlLocal()
class S3Object:
    """State holder for one S3 request against a single bucket/key pair.

    Transfer code records the outcome directly on the instance: the HTTP
    ``status_code``, any ``error`` raised, and the response headers captured
    by the pycurl header callback.
    """

    def __init__(self, bucket: str, key: str) -> None:
        self.bucket = bucket
        self.key = key
        # Payload source (uploads) or sink (downloads), when applicable.
        self.fileobj: IO[bytes] | None = None
        self.filesize: int | None = None
        # Populated by the transfer machinery after the request runs.
        self.error: Exception | None = None
        self.status_code: int | None = None
        self._method: str = ""
        # Destination for an HTTP error body, so it can be parsed later.
        self._errfileobj: IO[bytes] | None = None
        self._response_headers: dict[str, str] = {}

    @property
    def response_headers(self) -> dict[str, str]:
        """Response headers received so far, keyed by lowercased header name."""
        return self._response_headers

    def _header_function(self, header_line: bytes) -> None:
        """pycurl HEADERFUNCTION callback: record a single response header."""
        decoded = header_line.decode("ascii")

        # Lines without a colon (e.g. the HTTP status line) carry no header.
        if ":" not in decoded:
            return

        name, _, raw_value = decoded.partition(":")
        self._response_headers[name.strip().lower()] = raw_value.strip()
93class UploadPart(io.BufferedIOBase):
94 def __init__(self, upload_id: str, number: int, file: IO[bytes], eof: int, size: int, offset: int) -> None:
95 super().__init__()
96 self._upload_id = upload_id
97 self._number = number
98 self._response = io.BytesIO()
99 self._file = file
100 self._content = None
101 try:
102 self._fd: int | None = file.fileno()
103 except OSError:
104 # The "file" doesn't have a file descriptor, its probably a BytesIO.
105 # Read our part now so that we don't need to cope with thread safety
106 # when `UploadPart.read` is called.
107 self._fd = None
108 old_position = file.tell()
109 file.seek(offset)
110 self._content = file.read(size)
111 file.seek(old_position)
113 self._size = size
114 self._start = offset
115 self._end = min(eof, offset + size)
116 self._read_offset = 0
118 @property
119 def upload_id(self) -> str:
120 return self._upload_id
122 @property
123 def number(self) -> int:
124 return self._number
126 @property
127 def response(self) -> BinaryIO:
128 return self._response
130 def __len__(self) -> int:
131 return self._end - self._start
133 def readable(self) -> bool:
134 return True
136 def seekable(self) -> bool:
137 return False
139 def writable(self) -> bool:
140 return True
142 def read(self, size: int | None = -1) -> bytes:
143 # If we have a real file underlying this part, then we want to do an
144 # `os.pread` for just the part that is relevant.
145 if self._fd is not None:
146 if size is None or size == -1:
147 size = self._size
149 # Calculate the actual read offset and make sure we're within our
150 # section of the file.
151 offset = self._start + self._read_offset
152 if offset >= self._end:
153 return b""
155 # Make sure we only read up to the end of our section of the file,
156 # in case the size requested is larger than the number of bytes
157 # remaining in our section
158 size = min(size, self._end - offset)
159 content = os.pread(self._fd, size, offset)
160 self._read_offset += size
161 return content
163 # Otherwise we can just return our pre-determined slice of the actual
164 # contents. This case should only be reached when MAX_IN_MEMORY_BLOB_SIZE_BYTES
165 # is the same as or larger than S3_MAX_UPLOAD_SIZE, which should ideally
166 # never be the case.
167 else:
168 if self._content is None:
169 raise ValueError(
170 f"Part {self._number} of upload {self._upload_id} is backed "
171 "by a BytesIO but the content couldn't be read when the part "
172 "was instantiated."
173 )
174 return self._content
176 def write(self, b: bytes) -> int: # type: ignore[override]
177 return self._response.write(b)
def _curl_handle_for_s3(
    s3: "S3Client",
    method: str,
    s3object: S3Object,
    extra_params: Mapping[str, str | int] | None = None,
    headers: Mapping[str, str] | None = None,
) -> pycurl.Curl:
    """Build a pycurl handle performing ``method`` against ``s3object``.

    The request targets a presigned URL generated via the boto3 client, so no
    auth handling is needed on the curl side. The S3Object is stashed on the
    handle so results can be routed back after the transfer completes.

    Args:
        s3: boto3 S3 client used only to presign the request URL.
        method: boto3 client method name, e.g. ``"get_object"``.
        s3object: Target bucket/key; also receives response headers.
        extra_params: Additional presign parameters (e.g. UploadId).
        headers: Extra HTTP headers to send with the request.
    """
    s3object._method = method
    params: dict[str, str | int] = {"Bucket": s3object.bucket, "Key": s3object.key}
    if extra_params is not None:
        params.update(extra_params)
    presigned_url = s3.generate_presigned_url(method, Params=params, ExpiresIn=3600)

    handle = pycurl.Curl()
    handle.s3object = s3object  # type: ignore
    handle.setopt(pycurl.USERAGENT, S3_USERAGENT_NAME)
    handle.setopt(pycurl.CONNECTTIMEOUT, S3_TIMEOUT_CONNECT)
    handle.setopt(pycurl.TIMEOUT, S3_TIMEOUT_READ)
    # Make HTTP >= 400 surface as a curl error rather than a "success".
    handle.setopt(pycurl.FAILONERROR, True)
    handle.setopt(pycurl.URL, presigned_url)
    handle.setopt(pycurl.HEADERFUNCTION, s3object._header_function)
    if headers:
        handle.setopt(pycurl.HTTPHEADER, [f"{name}: {value}" for name, value in headers.items()])
    return handle
# TODO Don't put random attributes on curl like this??
def c_s3ojb(c: pycurl.Curl) -> S3Object:
    """Retrieve the S3Object stashed on a curl handle by ``_curl_handle_for_s3``.

    NOTE(review): the name looks like a typo of ``c_s3obj``; kept as-is since
    callers in this module depend on it.
    """
    return cast(S3Object, c.s3object)  # attribute attached in _curl_handle_for_s3
def _curl_should_retry(c: pycurl.Curl, errno: int) -> bool:
    """Decide whether a failed curl transfer is worth retrying.

    Network/timeout failures always retry; HTTP failures retry only for
    transient status codes or throttling-style S3 error codes.
    """
    transient_curl_errors = (
        pycurl.E_COULDNT_CONNECT,
        pycurl.E_SEND_ERROR,
        pycurl.E_RECV_ERROR,
        pycurl.E_OPERATION_TIMEDOUT,
        pycurl.E_PARTIAL_FILE,
    )
    # Retry on network and timeout errors
    if errno in transient_curl_errors:
        return True

    if errno != pycurl.E_HTTP_RETURNED_ERROR:
        return False

    s3obj = c_s3ojb(c)
    # Retry on 'Request Timeout', 'Too Many Requests' and transient server errors
    if s3obj.status_code in _RETRIABLE_HTTP_STATUS_CODES:
        return True

    error_response = getattr(s3obj.error, "response", None)
    if error_response and error_response["Error"]["Code"] in _RETRIABLE_S3_ERROR_CODES:
        return True

    return False
def _curl_multi_run(
    objects: Sequence[S3Object], curl_handle_func: Callable[[S3Object], pycurl.Curl], attempt: int = 1
) -> None:
    """Run one curl transfer per S3Object in parallel, retrying transient failures.

    Outcomes are recorded on the S3Objects themselves: ``status_code`` on any
    HTTP response, and ``error`` (a botocore ClientError for HTTP failures, a
    pycurl.error otherwise) on failure. Retries recurse with an incremented
    ``attempt``, backing off exponentially with jitter up to ``_MAX_BACKOFF``
    seconds and at most ``S3_MAX_RETRIES`` attempts.

    Args:
        objects: Objects to transfer; a handle is built for each via
            ``curl_handle_func``.
        curl_handle_func: Factory returning a fully configured pycurl handle
            for a given S3Object.
        attempt: 1-indexed attempt counter used for retry bookkeeping.
    """
    m = _curlLocal.curlmulti
    for s3object in objects:
        c = curl_handle_func(s3object)
        m.add_handle(c)

    # Drive the multi handle until every transfer has finished.
    while True:
        ret, active_handles = m.perform()
        if ret == pycurl.E_CALL_MULTI_PERFORM:
            # More processing required
            continue

        if active_handles:
            # Wait for next event
            m.select(15.0)
        else:
            # All operations complete
            break

    num_q, ok_list, err_list = m.info_read()
    assert num_q == 0

    retry_objects = []
    for c in ok_list:
        s3obj = c_s3ojb(c)
        s3obj.status_code = c.getinfo(pycurl.HTTP_CODE)  # type: ignore[no-untyped-call]
        m.remove_handle(c)
        c.close()
    for c, errno, errmsg in err_list:
        s3obj = c_s3ojb(c)
        if errno == pycurl.E_HTTP_RETURNED_ERROR:
            # HTTP-level failure: reconstruct a botocore ClientError from the
            # captured status/headers/body so callers get the familiar type.
            s3obj.status_code = c.getinfo(pycurl.HTTP_CODE)  # type: ignore[no-untyped-call]
            response: dict[str, Any] = {}
            response["status_code"] = s3obj.status_code
            response["headers"] = s3obj._response_headers
            if (errfileobj := s3obj._errfileobj) is None:
                response["body"] = b""
            else:
                errfileobj.seek(0)
                response["body"] = errfileobj.read()
                # Clear the buffer in case this object is retried below.
                errfileobj.truncate(0)
            parser = botocore.parsers.RestXMLParser()
            # TODO: botocore safely handles `None` being passed here, but it is
            # probably best to rework this to get the correct `Shape` to match
            # the type hints from boto3-stubs
            parsed_response = parser.parse(response, None)  # type: ignore[arg-type]
            s3obj.error = botocore.exceptions.ClientError(parsed_response, s3obj._method)
        else:
            s3obj.error = pycurl.error(errmsg)

        if attempt < S3_MAX_RETRIES and _curl_should_retry(c, errno):
            # Reset recorded state so the retry starts from a clean slate.
            s3obj.status_code = None
            s3obj.error = None
            retry_objects.append(s3obj)

        m.remove_handle(c)
        c.close()

    if retry_objects and attempt < S3_MAX_RETRIES:
        # Wait between attempts with truncated exponential backoff with jitter
        exp_backoff = 2 ** (attempt - 1)
        exp_backoff_with_jitter = random.random() * exp_backoff
        time.sleep(min(exp_backoff_with_jitter, _MAX_BACKOFF))

        _curl_multi_run(retry_objects, curl_handle_func, attempt=attempt + 1)
def head_objects(s3: "S3Client", objects: Sequence[S3Object]) -> None:
    """Issue parallel HEAD requests; outcomes are recorded on each S3Object."""

    def make_handle(s3object: S3Object) -> pycurl.Curl:
        handle = _curl_handle_for_s3(s3, "head_object", s3object)
        # HEAD request: only status and headers are wanted, never a body.
        handle.setopt(pycurl.NOBODY, True)
        return handle

    _curl_multi_run(objects, make_handle)
def head_object(s3: "S3Client", s3object: S3Object) -> None:
    """HEAD a single object, raising the recorded error on failure."""
    head_objects(s3, [s3object])
    error = s3object.error
    if error is not None:
        raise error
def set_s3_timeout(
    c: pycurl.Curl,
    s3object: S3Object,
    timeout_seconds_per_kilobyte: float | None,
    timeout_min_seconds: float,
) -> None:
    """Set the curl transfer timeout, scaling with object size when known.

    The timeout is ``filesize * timeout_seconds_per_kilobyte / 1024`` bounded
    below by ``timeout_min_seconds``; the floor alone is used when either the
    size or the per-kilobyte rate is unavailable.
    """
    effective = timeout_min_seconds
    filesize = s3object.filesize
    if filesize is not None and timeout_seconds_per_kilobyte is not None:
        effective = max(effective, filesize * timeout_seconds_per_kilobyte / 1024)
    c.setopt(pycurl.TIMEOUT, int(effective))
def get_objects(
    s3: "S3Client",
    objects: Sequence[S3Object],
    timeout_seconds_per_kilobyte: float | None = None,
    timeout_min_seconds: float = S3_TIMEOUT_READ,
    headers: Mapping[str, str] | None = None,
) -> None:
    """Download objects in parallel into each object's ``fileobj``."""

    def make_handle(s3object: S3Object) -> pycurl.Curl:
        handle = _curl_handle_for_s3(s3, "get_object", s3object, headers=headers)
        handle.setopt(pycurl.WRITEDATA, s3object.fileobj)
        set_s3_timeout(handle, s3object, timeout_seconds_per_kilobyte, timeout_min_seconds)
        # On HTTP errors the body is S3's XML error document; route it to the
        # same file object so it can be parsed into a ClientError.
        s3object._errfileobj = s3object.fileobj
        return handle

    _curl_multi_run(objects, make_handle)
def get_object(
    s3: "S3Client",
    s3object: S3Object,
    timeout_seconds_per_kilobyte: float | None = None,
    timeout_min_seconds: float = S3_TIMEOUT_READ,
    headers: Mapping[str, str] | None = None,
) -> None:
    """Download a single object, raising the recorded error on failure."""
    get_objects(s3, [s3object], timeout_seconds_per_kilobyte, timeout_min_seconds, headers=headers)
    error = s3object.error
    if error is not None:
        raise error
def put_objects(
    s3: "S3Client",
    objects: Sequence[S3Object],
    timeout_seconds_per_kilobyte: float | None = None,
    timeout_min_seconds: float = S3_TIMEOUT_READ,
) -> None:
    """Upload objects in parallel from each object's ``fileobj``."""

    def make_handle(s3object: S3Object) -> pycurl.Curl:
        handle = _curl_handle_for_s3(s3, "put_object", s3object)
        handle.setopt(pycurl.READDATA, s3object.fileobj)
        handle.setopt(pycurl.INFILESIZE_LARGE, s3object.filesize)
        handle.setopt(pycurl.UPLOAD, 1)
        set_s3_timeout(handle, s3object, timeout_seconds_per_kilobyte, timeout_min_seconds)
        # Capture any XML error document separately so it can be parsed into
        # a ClientError without touching the upload source.
        s3object._errfileobj = io.BytesIO()
        handle.setopt(pycurl.WRITEDATA, s3object._errfileobj)
        return handle

    _curl_multi_run(objects, make_handle)
def put_object(
    s3: "S3Client",
    s3object: S3Object,
    timeout_seconds_per_kilobyte: float | None = None,
    timeout_min_seconds: float = S3_TIMEOUT_READ,
) -> None:
    """Upload a single object, raising the recorded error on failure."""
    put_objects(s3, [s3object], timeout_seconds_per_kilobyte, timeout_min_seconds)
    error = s3object.error
    if error is not None:
        raise error
ParsedResponse = dict[str, str | dict[str, str | int | dict[str, str]]]


def _parse_s3_response(s3: "S3Client", response: BinaryIO, s3object: S3Object, shape_name: str) -> ParsedResponse:
    """Parse a raw S3 XML response body into a botocore-style response dict.

    Args:
        s3: boto3 S3 client, used to look up the output shape definition.
        response: Buffer containing the raw response body; read from offset 0.
        s3object: Source of the recorded status code and response headers.
        shape_name: Name of the botocore output shape, e.g. "ListPartsOutput".
    """
    response.seek(0)
    raw: dict[str, int | None | dict[str, str] | bytes] = {
        "status_code": s3object.status_code,
        "headers": s3object.response_headers,
        "body": response.read(),
    }
    shape = s3.meta.service_model.shape_for(shape_name)
    return cast(ParsedResponse, botocore.parsers.RestXMLParser().parse(raw, shape))
def start_multipart_upload(s3: "S3Client", s3object: S3Object) -> str:
    """Initiate a multipart upload for ``s3object``, returning the UploadId.

    Raises:
        The recorded transfer error if the CreateMultipartUpload call failed.
    """
    body = io.BytesIO()

    def make_handle(_s3object: S3Object) -> pycurl.Curl:
        handle = _curl_handle_for_s3(s3, "create_multipart_upload", _s3object)
        handle.setopt(pycurl.WRITEDATA, body)
        handle.setopt(pycurl.POST, 1)
        return handle

    _curl_multi_run([s3object], make_handle)
    if s3object.error is not None:
        raise s3object.error

    parsed = _parse_s3_response(s3, body, s3object, "CreateMultipartUploadOutput")
    return parsed["UploadId"]  # type: ignore
def upload_parts(s3: "S3Client", s3object: S3Object, parts: Sequence[UploadPart]) -> dict[int, str]:
    """Upload the given parts in parallel.

    Returns:
        Mapping of part number to the ETag S3 returned for that part, as
        needed to later complete the multipart upload.

    Raises:
        The first recorded per-part error, if any part transfer failed.
    """
    # A fresh S3Object per part so each transfer records its own status/error.
    part_for_object = {S3Object(s3object.bucket, s3object.key): part for part in parts}

    def make_handle(obj: S3Object) -> pycurl.Curl:
        part = part_for_object[obj]
        query: dict[str, int | str] = {"UploadId": part.upload_id, "PartNumber": part.number}
        handle = _curl_handle_for_s3(s3, "upload_part", obj, extra_params=query)
        handle.setopt(pycurl.READDATA, part)
        handle.setopt(pycurl.UPLOAD, 1)
        handle.setopt(pycurl.INFILESIZE_LARGE, len(part))
        # The part object doubles as the response sink (see UploadPart.write).
        handle.setopt(pycurl.WRITEDATA, part)
        return handle

    _curl_multi_run(list(part_for_object), make_handle)

    for obj in part_for_object:
        if obj.error is not None:
            raise obj.error

    etags: dict[int, str] = {}
    for obj, part in part_for_object.items():
        parsed = _parse_s3_response(s3, part.response, obj, "UploadPartOutput")
        etags[part.number] = parsed["ResponseMetadata"]["HTTPHeaders"]["etag"]  # type: ignore
    return etags
def complete_multipart_upload(
    s3: "S3Client", s3object: S3Object, upload_id: str, parts: Mapping[int, str]
) -> ParsedResponse:
    """Finalize a multipart upload from a part-number -> ETag mapping.

    Uses the boto3 client directly here, rather than a presigned URL. This is
    necessary because boto3's URL presigning is broken for
    `complete_multipart_upload` when using s3v4 auth.

    See https://github.com/boto/boto3/issues/2192
    """
    part_list = [{"ETag": etag, "PartNumber": number} for number, etag in parts.items()]
    response = s3.complete_multipart_upload(
        Bucket=s3object.bucket,
        Key=s3object.key,
        UploadId=upload_id,
        MultipartUpload={"Parts": part_list},
    )
    return cast(ParsedResponse, response)
def _list_multipart_parts(s3: "S3Client", s3object: S3Object, upload_id: str) -> ParsedResponse:
    """Return the parsed ListParts response for an in-progress multipart upload.

    Raises:
        The recorded transfer error if the ListParts request failed.
    """
    body = io.BytesIO()

    def make_handle(_s3object: S3Object) -> pycurl.Curl:
        handle = _curl_handle_for_s3(s3, "list_parts", _s3object, extra_params={"UploadId": upload_id})
        handle.setopt(pycurl.WRITEDATA, body)
        return handle

    _curl_multi_run([s3object], make_handle)
    if s3object.error is not None:
        raise s3object.error
    return _parse_s3_response(s3, body, s3object, "ListPartsOutput")
def abort_multipart_upload(s3: "S3Client", s3object: S3Object, upload_id: str) -> None:
    """Abort a multipart upload and verify that no uploaded parts remain.

    Args:
        s3: boto3 S3 client used to presign requests.
        s3object: Bucket/key of the upload being aborted; transfer state is
            recorded on it.
        upload_id: ID of the multipart upload to abort.
    """

    def curl_handle_func(_s3object: S3Object) -> pycurl.Curl:
        params = {"UploadId": upload_id}
        c = _curl_handle_for_s3(s3, "abort_multipart_upload", _s3object, extra_params=params)
        # AbortMultipartUpload is an HTTP DELETE on the upload resource.
        c.setopt(pycurl.CUSTOMREQUEST, "DELETE")
        return c

    parts = _list_multipart_parts(s3, s3object, upload_id)

    # We need to iterate here in case any part uploads slip through in a race
    # against the AbortMultipartUpload call.
    #
    # See https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html
    while len(parts.get("Parts", [])) > 0:
        _curl_multi_run([s3object], curl_handle_func)
        try:
            parts = _list_multipart_parts(s3, s3object, upload_id)
        except botocore.exceptions.ClientError as e:
            code = e.response.get("Error", {}).get("Code")
            if code == "404":
                # 404 error here means that the multipart upload is properly aborted.
                break
            raise e
def multipart_upload(s3: "S3Client", s3object: S3Object) -> None:
    """Upload ``s3object.fileobj`` using the S3 multipart upload API.

    The file is split into parts of ``S3_MULTIPART_PART_SIZE`` bytes, uploaded
    in batches of up to ``S3_MULTIPART_MAX_CONCURRENT_PARTS`` concurrent parts.
    On any failure the upload is aborted before the exception is re-raised.

    Raises:
        TypeError: If ``s3object`` has no ``fileobj``/``filesize`` set.
    """
    if s3object.fileobj is None or s3object.filesize is None:
        raise TypeError("S3Object provided to multipart upload didn't contain a file.")

    upload_id = start_multipart_upload(s3, s3object)

    try:
        part_number = 1  # S3 part numbers are 1-indexed
        parts: dict[int, str] = {}  # part number -> ETag, needed to complete the upload
        queue: list[UploadPart] = []
        while (part_number - 1) * S3_MULTIPART_PART_SIZE < s3object.filesize:
            part = UploadPart(
                upload_id=upload_id,
                number=part_number,
                file=s3object.fileobj,
                eof=s3object.filesize,
                size=S3_MULTIPART_PART_SIZE,
                offset=(part_number - 1) * S3_MULTIPART_PART_SIZE,
            )
            queue.append(part)

            part_number += 1

            # Flush the queue once enough parts are pending to run concurrently.
            if len(queue) >= S3_MULTIPART_MAX_CONCURRENT_PARTS:
                uploaded = upload_parts(s3, s3object, queue)
                parts.update(uploaded)
                queue = []
        # Upload any remainder that didn't fill a whole batch.
        uploaded = upload_parts(s3, s3object, queue)
        parts.update(uploaded)

        complete_multipart_upload(s3, s3object, upload_id, parts)
    except Exception as e:
        # Abort so incomplete parts don't linger (and keep incurring storage).
        abort_multipart_upload(s3, s3object, upload_id)
        raise e
def stream_multipart_upload(s3: "S3Client", s3object: S3Object, chunks: Iterator[bytes]) -> None:
    """Multipart-upload a stream of byte chunks of unknown total size.

    Chunks are accumulated in a temporary file until at least
    ``S3_MULTIPART_PART_SIZE`` bytes are buffered, then uploaded as a single
    part; parts are uploaded sequentially. Any remaining buffered bytes are
    sent as a final (smaller) part. On failure the upload is aborted before
    the exception is re-raised.
    """
    upload_id = start_multipart_upload(s3, s3object)

    try:
        part_number = 1  # S3 part numbers are 1-indexed
        parts: dict[int, str] = {}  # part number -> ETag

        with TemporaryFile() as buffer:
            for chunk in chunks:
                buffer.write(chunk)
                # Keep accumulating until a full part's worth is buffered.
                if buffer.tell() < S3_MULTIPART_PART_SIZE:
                    continue
                size = buffer.tell()
                buffer.seek(0)
                part = UploadPart(
                    upload_id=upload_id,
                    number=part_number,
                    file=buffer,
                    eof=size,
                    size=size,
                    offset=0,
                )

                uploaded = upload_parts(s3, s3object, [part])
                parts.update(uploaded)
                part_number += 1
                # Reset the buffer
                buffer.truncate(0)
                buffer.seek(0)

            # Upload whatever is left in the buffer as the final part.
            if buffer.tell() > 0:
                size = buffer.tell()
                buffer.seek(0)
                part = UploadPart(
                    upload_id=upload_id,
                    number=part_number,
                    file=buffer,
                    eof=size,
                    size=size,
                    offset=0,
                )
                uploaded = upload_parts(s3, s3object, [part])
                parts.update(uploaded)

        complete_multipart_upload(s3, s3object, upload_id, parts)
    except Exception as e:
        abort_multipart_upload(s3, s3object, upload_id)
        raise e