Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/s3/s3utils.py: 89.61%


# Copyright (C) 2021 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import io
import os
import random
import threading
import time
from tempfile import TemporaryFile
from typing import IO, TYPE_CHECKING, Any, BinaryIO, Callable, Iterator, Mapping, Sequence, cast

import botocore
import pycurl

from buildgrid.server.settings import (
    S3_MAX_RETRIES,
    S3_MULTIPART_MAX_CONCURRENT_PARTS,
    S3_MULTIPART_PART_SIZE,
    S3_TIMEOUT_CONNECT,
    S3_TIMEOUT_READ,
    S3_USERAGENT_NAME,
)

# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html
_RETRIABLE_HTTP_STATUS_CODES = (408, 429, 500, 502, 503, 504, 509)
_RETRIABLE_S3_ERROR_CODES = (
    "Throttling",
    "ThrottlingException",
    "ThrottledException",
    "RequestThrottledException",
    "ProvisionedThroughputExceededException",
)
# Maximum backoff in seconds
_MAX_BACKOFF = 20
# Maximum requests to run in parallel via CurlMulti
_MAX_CURLMULTI_CONNECTIONS = 10

if TYPE_CHECKING:
    from mypy_boto3_s3 import Client as S3Client


class _CurlLocal(threading.local):
    def __init__(self) -> None:
        self.curlmulti = pycurl.CurlMulti()
        self.curlmulti.setopt(pycurl.M_MAX_TOTAL_CONNECTIONS, _MAX_CURLMULTI_CONNECTIONS)


_curlLocal = _CurlLocal()


class S3Object:
    def __init__(self, bucket: str, key: str) -> None:
        self.bucket = bucket
        self.key = key
        self.fileobj: IO[bytes] | None = None
        self.filesize: int | None = None
        self.error: Exception | None = None
        self.status_code: int | None = None
        self._method: str = ""
        self._errfileobj: IO[bytes] | None = None
        self._response_headers: dict[str, str] = {}

    @property
    def response_headers(self) -> dict[str, str]:
        return self._response_headers

    # Function to process HTTP response headers
    def _header_function(self, header_line: bytes) -> None:
        header = header_line.decode("ascii")

        # Skip status line
        if ":" not in header:
            return

        name, value = header.split(":", maxsplit=1)
        name = name.strip().lower()
        value = value.strip()

        self._response_headers[name] = value

class UploadPart(io.BufferedIOBase):
    def __init__(self, upload_id: str, number: int, file: IO[bytes], eof: int, size: int, offset: int) -> None:
        super().__init__()
        self._upload_id = upload_id
        self._number = number
        self._response = io.BytesIO()
        self._file = file
        self._content = None
        try:
            self._fd: int | None = file.fileno()
        except OSError:
            # The "file" doesn't have a file descriptor; it's probably a BytesIO.
            # Read our part now so that we don't need to cope with thread safety
            # when `UploadPart.read` is called.
            self._fd = None
            old_position = file.tell()
            file.seek(offset)
            self._content = file.read(size)
            file.seek(old_position)

        self._size = size
        self._start = offset
        self._end = min(eof, offset + size)
        self._read_offset = 0

    @property
    def upload_id(self) -> str:
        return self._upload_id

    @property
    def number(self) -> int:
        return self._number

    @property
    def response(self) -> BinaryIO:
        return self._response

    def __len__(self) -> int:
        return self._end - self._start

    def readable(self) -> bool:
        return True

    def seekable(self) -> bool:
        return False

    def writable(self) -> bool:
        return True

    def read(self, size: int | None = -1) -> bytes:
        # If we have a real file underlying this part, then we want to do an
        # `os.pread` for just the part that is relevant.
        if self._fd is not None:
            if size is None or size == -1:
                size = self._size

            # Calculate the actual read offset and make sure we're within our
            # section of the file.
            offset = self._start + self._read_offset
            if offset >= self._end:
                return b""

            # Make sure we only read up to the end of our section of the file,
            # in case the size requested is larger than the number of bytes
            # remaining in our section.
            size = min(size, self._end - offset)
            content = os.pread(self._fd, size, offset)
            self._read_offset += size
            return content

        # Otherwise we can just return our pre-determined slice of the actual
        # contents. This case should only be reached when MAX_IN_MEMORY_BLOB_SIZE_BYTES
        # is the same as or larger than S3_MAX_UPLOAD_SIZE, which should ideally
        # never be the case.
        else:
            if self._content is None:
                raise ValueError(
                    f"Part {self._number} of upload {self._upload_id} is backed "
                    "by a BytesIO but the content couldn't be read when the part "
                    "was instantiated."
                )
            return self._content

    def write(self, b: bytes) -> int:  # type: ignore[override]
        return self._response.write(b)
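# Illustrative sketch (not part of the original module): how UploadPart slices a
# source file into byte ranges. The buffer contents and sizes below are
# hypothetical; a BytesIO has no file descriptor, so each part snapshots its
# slice up front, while a real file would be read lazily via os.pread.
def _example_upload_part_slicing() -> None:
    data = io.BytesIO(b"abcdefghij")  # 10 bytes of example data
    first = UploadPart(upload_id="example-upload", number=1, file=data, eof=10, size=6, offset=0)
    second = UploadPart(upload_id="example-upload", number=2, file=data, eof=10, size=6, offset=6)
    assert first.read() == b"abcdef" and len(first) == 6
    assert second.read() == b"ghij" and len(second) == 4  # truncated at eof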



def _curl_handle_for_s3(
    s3: "S3Client",
    method: str,
    s3object: S3Object,
    extra_params: Mapping[str, str | int] | None = None,
    headers: Mapping[str, str] | None = None,
) -> pycurl.Curl:
    if extra_params is None:
        extra_params = {}
    s3object._method = method
    params: dict[str, str | int] = {"Bucket": s3object.bucket, "Key": s3object.key, **extra_params}
    url = s3.generate_presigned_url(method, Params=params, ExpiresIn=3600)
    c = pycurl.Curl()
    c.s3object = s3object  # type: ignore
    c.setopt(pycurl.USERAGENT, S3_USERAGENT_NAME)
    c.setopt(pycurl.CONNECTTIMEOUT, S3_TIMEOUT_CONNECT)
    c.setopt(pycurl.TIMEOUT, S3_TIMEOUT_READ)
    c.setopt(pycurl.FAILONERROR, True)
    c.setopt(pycurl.URL, url)
    c.setopt(pycurl.HEADERFUNCTION, s3object._header_function)
    if headers:
        header_strs = [f"{k}: {v}" for k, v in headers.items()]
        c.setopt(pycurl.HTTPHEADER, header_strs)
    return c



# TODO Don't put random attributes on curl like this??
def c_s3ojb(c: pycurl.Curl) -> S3Object:
    return c.s3object  # type: ignore


def _curl_should_retry(c: pycurl.Curl, errno: int) -> bool:
    if errno in (
        pycurl.E_COULDNT_CONNECT,
        pycurl.E_SEND_ERROR,
        pycurl.E_RECV_ERROR,
        pycurl.E_OPERATION_TIMEDOUT,
        pycurl.E_PARTIAL_FILE,
    ):
        # Retry on network and timeout errors
        return True

    if errno == pycurl.E_HTTP_RETURNED_ERROR:
        s3obj = c_s3ojb(c)
        if s3obj.status_code in _RETRIABLE_HTTP_STATUS_CODES:
            # Retry on 'Request Timeout', 'Too Many Requests' and transient server errors
            return True

        if error_response := getattr(s3obj.error, "response", None):
            if error_response["Error"]["Code"] in _RETRIABLE_S3_ERROR_CODES:
                return True

    return False



def _curl_multi_run(
    objects: Sequence[S3Object], curl_handle_func: Callable[[S3Object], pycurl.Curl], attempt: int = 1
) -> None:
    m = _curlLocal.curlmulti
    for s3object in objects:
        c = curl_handle_func(s3object)
        m.add_handle(c)

    while True:
        ret, active_handles = m.perform()
        if ret == pycurl.E_CALL_MULTI_PERFORM:
            # More processing required
            continue

        if active_handles:
            # Wait for next event
            m.select(15.0)
        else:
            # All operations complete
            break

    num_q, ok_list, err_list = m.info_read()
    assert num_q == 0

    retry_objects = []
    for c in ok_list:
        s3obj = c_s3ojb(c)
        s3obj.status_code = c.getinfo(pycurl.HTTP_CODE)  # type: ignore[no-untyped-call]
        m.remove_handle(c)
        c.close()
    for c, errno, errmsg in err_list:
        s3obj = c_s3ojb(c)
        if errno == pycurl.E_HTTP_RETURNED_ERROR:
            s3obj.status_code = c.getinfo(pycurl.HTTP_CODE)  # type: ignore[no-untyped-call]
            response: dict[str, Any] = {}
            response["status_code"] = s3obj.status_code
            response["headers"] = s3obj._response_headers
            if (errfileobj := s3obj._errfileobj) is None:
                response["body"] = b""
            else:
                errfileobj.seek(0)
                response["body"] = errfileobj.read()
                errfileobj.truncate(0)
            parser = botocore.parsers.RestXMLParser()
            # TODO: botocore safely handles `None` being passed here, but it is
            # probably best to rework this to get the correct `Shape` to match
            # the type hints from boto3-stubs
            parsed_response = parser.parse(response, None)  # type: ignore[arg-type]
            s3obj.error = botocore.exceptions.ClientError(parsed_response, s3obj._method)
        else:
            s3obj.error = pycurl.error(errmsg)

        if attempt < S3_MAX_RETRIES and _curl_should_retry(c, errno):
            s3obj.status_code = None
            s3obj.error = None
            retry_objects.append(s3obj)

        m.remove_handle(c)
        c.close()

    if retry_objects and attempt < S3_MAX_RETRIES:
        # Wait between attempts, using truncated exponential backoff with jitter
        exp_backoff = 2 ** (attempt - 1)
        exp_backoff_with_jitter = random.random() * exp_backoff
        time.sleep(min(exp_backoff_with_jitter, _MAX_BACKOFF))

        _curl_multi_run(retry_objects, curl_handle_func, attempt=attempt + 1)
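# Illustrative sketch (not part of the original module): the retry delay chosen
# above for attempt N is a random value in [0, 2 ** (N - 1)) seconds, truncated
# at _MAX_BACKOFF. The helper below just reproduces that arithmetic so the
# schedule is easy to see; the cap only starts to matter once 2 ** (N - 1)
# exceeds 20, i.e. from the sixth attempt onwards.
def _example_backoff_bounds(max_attempts: int) -> list[int]:
    return [min(2 ** (attempt - 1), _MAX_BACKOFF) for attempt in range(1, max_attempts + 1)]


# _example_backoff_bounds(6) == [1, 2, 4, 8, 16, 20]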



def head_objects(s3: "S3Client", objects: Sequence[S3Object]) -> None:
    def curl_handle_func(s3object: S3Object) -> pycurl.Curl:
        c = _curl_handle_for_s3(s3, "head_object", s3object)
        c.setopt(pycurl.NOBODY, True)
        return c

    _curl_multi_run(objects, curl_handle_func)


def head_object(s3: "S3Client", s3object: S3Object) -> None:
    head_objects(s3, [s3object])
    if s3object.error is not None:
        raise s3object.error
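# Usage sketch (not part of the original module): checking whether a key exists
# and reading its size from the response headers. The boto3 client, bucket and
# key names are hypothetical.
def _example_head_object(s3: "S3Client") -> int | None:
    obj = S3Object("example-bucket", "example-key")
    try:
        head_object(s3, obj)
    except Exception:
        # Missing objects surface as the error raised from obj.error
        return None
    content_length = obj.response_headers.get("content-length")
    return int(content_length) if content_length is not None else None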



def set_s3_timeout(
    c: pycurl.Curl,
    s3object: S3Object,
    timeout_seconds_per_kilobyte: float | None,
    timeout_min_seconds: float,
) -> None:
    timeout = timeout_min_seconds
    if s3object.filesize is not None and timeout_seconds_per_kilobyte is not None:
        timeout = max(timeout, s3object.filesize * timeout_seconds_per_kilobyte / 1024)
    c.setopt(pycurl.TIMEOUT, int(timeout))
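# Worked example (not part of the original module): with a hypothetical
# timeout_seconds_per_kilobyte of 0.01 and timeout_min_seconds of 120, a
# 100 MiB object gets max(120, 104857600 * 0.01 / 1024) = 1024 seconds, while a
# 1 MiB object stays at the 120-second floor since 1048576 * 0.01 / 1024 ≈ 10.24.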



def get_objects(
    s3: "S3Client",
    objects: Sequence[S3Object],
    timeout_seconds_per_kilobyte: float | None = None,
    timeout_min_seconds: float = S3_TIMEOUT_READ,
    headers: Mapping[str, str] | None = None,
) -> None:
    def curl_handle_func(s3object: S3Object) -> pycurl.Curl:
        c = _curl_handle_for_s3(s3, "get_object", s3object, headers=headers)
        c.setopt(pycurl.WRITEDATA, s3object.fileobj)
        set_s3_timeout(c, s3object, timeout_seconds_per_kilobyte, timeout_min_seconds)
        s3object._errfileobj = s3object.fileobj
        return c

    _curl_multi_run(objects, curl_handle_func)


def get_object(
    s3: "S3Client",
    s3object: S3Object,
    timeout_seconds_per_kilobyte: float | None = None,
    timeout_min_seconds: float = S3_TIMEOUT_READ,
    headers: Mapping[str, str] | None = None,
) -> None:
    get_objects(s3, [s3object], timeout_seconds_per_kilobyte, timeout_min_seconds, headers=headers)
    if s3object.error is not None:
        raise s3object.error
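# Usage sketch (not part of the original module): downloading a blob into memory.
# The object must be given a writable fileobj before the call, since curl writes
# the response body into it. Bucket and key names are hypothetical.
def _example_get_object(s3: "S3Client") -> bytes:
    obj = S3Object("example-bucket", "example-key")
    obj.fileobj = io.BytesIO()
    get_object(s3, obj)
    obj.fileobj.seek(0)
    return obj.fileobj.read()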



def put_objects(
    s3: "S3Client",
    objects: Sequence[S3Object],
    timeout_seconds_per_kilobyte: float | None = None,
    timeout_min_seconds: float = S3_TIMEOUT_READ,
) -> None:
    def curl_handle_func(s3object: S3Object) -> pycurl.Curl:
        c = _curl_handle_for_s3(s3, "put_object", s3object)
        c.setopt(pycurl.READDATA, s3object.fileobj)
        c.setopt(pycurl.INFILESIZE_LARGE, s3object.filesize)
        c.setopt(pycurl.UPLOAD, 1)
        set_s3_timeout(c, s3object, timeout_seconds_per_kilobyte, timeout_min_seconds)
        s3object._errfileobj = io.BytesIO()
        c.setopt(pycurl.WRITEDATA, s3object._errfileobj)
        return c

    _curl_multi_run(objects, curl_handle_func)


def put_object(
    s3: "S3Client",
    s3object: S3Object,
    timeout_seconds_per_kilobyte: float | None = None,
    timeout_min_seconds: float = S3_TIMEOUT_READ,
) -> None:
    put_objects(s3, [s3object], timeout_seconds_per_kilobyte, timeout_min_seconds)
    if s3object.error is not None:
        raise s3object.error
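# Usage sketch (not part of the original module): uploading a small in-memory
# payload. Both fileobj and filesize must be set, since curl reads READDATA up
# to INFILESIZE_LARGE bytes. Bucket and key names are hypothetical.
def _example_put_object(s3: "S3Client", payload: bytes) -> None:
    obj = S3Object("example-bucket", "example-key")
    obj.fileobj = io.BytesIO(payload)
    obj.filesize = len(payload)
    put_object(s3, obj)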



ParsedResponse = dict[str, str | dict[str, str | int | dict[str, str]]]


def _parse_s3_response(s3: "S3Client", response: BinaryIO, s3object: S3Object, shape_name: str) -> ParsedResponse:
    response_dict: dict[str, int | None | dict[str, str] | bytes] = {}
    response_dict["status_code"] = s3object.status_code
    response_dict["headers"] = s3object.response_headers
    response.seek(0)
    response_dict["body"] = response.read()

    parser = botocore.parsers.RestXMLParser()
    shape = s3.meta.service_model.shape_for(shape_name)
    return cast(ParsedResponse, parser.parse(response_dict, shape))


def start_multipart_upload(s3: "S3Client", s3object: S3Object) -> str:
    response = io.BytesIO()

    def curl_handle_func(_s3object: S3Object) -> pycurl.Curl:
        c = _curl_handle_for_s3(s3, "create_multipart_upload", _s3object)
        c.setopt(pycurl.WRITEDATA, response)
        c.setopt(pycurl.POST, 1)
        return c

    _curl_multi_run([s3object], curl_handle_func)
    if s3object.error is not None:
        raise s3object.error

    parsed_response = _parse_s3_response(s3, response, s3object, "CreateMultipartUploadOutput")
    return parsed_response["UploadId"]  # type: ignore



def upload_parts(s3: "S3Client", s3object: S3Object, parts: Sequence[UploadPart]) -> dict[int, str]:
    s3object_map = {S3Object(s3object.bucket, s3object.key): part for part in parts}

    def curl_handle_func(_s3object: S3Object) -> pycurl.Curl:
        part = s3object_map[_s3object]
        params: dict[str, int | str] = {"UploadId": part.upload_id, "PartNumber": part.number}
        c = _curl_handle_for_s3(s3, "upload_part", _s3object, extra_params=params)
        c.setopt(pycurl.READDATA, part)
        c.setopt(pycurl.UPLOAD, 1)
        c.setopt(pycurl.INFILESIZE_LARGE, len(part))
        c.setopt(pycurl.WRITEDATA, part)
        return c

    _curl_multi_run(list(s3object_map.keys()), curl_handle_func)

    errors: list[Exception] = [_s3object.error for _s3object in s3object_map.keys() if _s3object.error is not None]
    if len(errors) > 0:
        raise errors[0]

    uploaded: dict[int, str] = {}
    for _s3object, part in s3object_map.items():
        response = _parse_s3_response(s3, part.response, _s3object, "UploadPartOutput")
        uploaded[part.number] = response["ResponseMetadata"]["HTTPHeaders"]["etag"]  # type: ignore
    return uploaded



def complete_multipart_upload(
    s3: "S3Client", s3object: S3Object, upload_id: str, parts: Mapping[int, str]
) -> ParsedResponse:
    # Use the boto3 client directly here, rather than a presigned URL. This is
    # necessary because boto3's URL presigning is broken for `complete_multipart_upload`
    # when using s3v4 auth.
    #
    # See https://github.com/boto/boto3/issues/2192
    return cast(
        ParsedResponse,
        s3.complete_multipart_upload(
            Bucket=s3object.bucket,
            Key=s3object.key,
            UploadId=upload_id,
            MultipartUpload={"Parts": [{"ETag": tag, "PartNumber": number} for number, tag in parts.items()]},
        ),
    )


def _list_multipart_parts(s3: "S3Client", s3object: S3Object, upload_id: str) -> ParsedResponse:
    response = io.BytesIO()

    def curl_handle_func(_s3object: S3Object) -> pycurl.Curl:
        params = {"UploadId": upload_id}
        c = _curl_handle_for_s3(s3, "list_parts", _s3object, extra_params=params)
        c.setopt(pycurl.WRITEDATA, response)
        return c

    _curl_multi_run([s3object], curl_handle_func)
    if s3object.error is not None:
        raise s3object.error
    return _parse_s3_response(s3, response, s3object, "ListPartsOutput")



def abort_multipart_upload(s3: "S3Client", s3object: S3Object, upload_id: str) -> None:
    def curl_handle_func(_s3object: S3Object) -> pycurl.Curl:
        params = {"UploadId": upload_id}
        c = _curl_handle_for_s3(s3, "abort_multipart_upload", _s3object, extra_params=params)
        c.setopt(pycurl.CUSTOMREQUEST, "DELETE")
        return c

    parts = _list_multipart_parts(s3, s3object, upload_id)

    # We need to iterate here in case any part uploads slip through in a race
    # against the AbortMultipartUpload call.
    #
    # See https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html
    while len(parts.get("Parts", [])) > 0:
        _curl_multi_run([s3object], curl_handle_func)
        try:
            parts = _list_multipart_parts(s3, s3object, upload_id)
        except botocore.exceptions.ClientError as e:
            code = e.response.get("Error", {}).get("Code")
            if code == "404":
                # 404 error here means that the multipart upload is properly aborted.
                break
            raise e



def multipart_upload(s3: "S3Client", s3object: S3Object) -> None:
    if s3object.fileobj is None or s3object.filesize is None:
        raise TypeError("S3Object provided to multipart upload didn't contain a file.")

    upload_id = start_multipart_upload(s3, s3object)

    try:
        part_number = 1
        parts: dict[int, str] = {}
        queue: list[UploadPart] = []
        while (part_number - 1) * S3_MULTIPART_PART_SIZE < s3object.filesize:
            part = UploadPart(
                upload_id=upload_id,
                number=part_number,
                file=s3object.fileobj,
                eof=s3object.filesize,
                size=S3_MULTIPART_PART_SIZE,
                offset=(part_number - 1) * S3_MULTIPART_PART_SIZE,
            )
            queue.append(part)

            part_number += 1

            if len(queue) >= S3_MULTIPART_MAX_CONCURRENT_PARTS:
                uploaded = upload_parts(s3, s3object, queue)
                parts.update(uploaded)
                queue = []
        uploaded = upload_parts(s3, s3object, queue)
        parts.update(uploaded)

        complete_multipart_upload(s3, s3object, upload_id, parts)
    except Exception as e:
        abort_multipart_upload(s3, s3object, upload_id)
        raise e
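# Usage sketch (not part of the original module): uploading a local file in
# S3_MULTIPART_PART_SIZE chunks, with a fallback to a plain put_object for small
# payloads. The threshold choice, bucket and key names are illustrative only.
def _example_multipart_upload(s3: "S3Client", path: str) -> None:
    obj = S3Object("example-bucket", "example-key")
    with open(path, "rb") as f:
        obj.fileobj = f
        obj.filesize = os.fstat(f.fileno()).st_size
        if obj.filesize > S3_MULTIPART_PART_SIZE:
            multipart_upload(s3, obj)
        else:
            put_object(s3, obj)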



def stream_multipart_upload(s3: "S3Client", s3object: S3Object, chunks: Iterator[bytes]) -> None:
    upload_id = start_multipart_upload(s3, s3object)

    try:
        part_number = 1
        parts: dict[int, str] = {}

        with TemporaryFile() as buffer:
            for chunk in chunks:
                buffer.write(chunk)
                if buffer.tell() < S3_MULTIPART_PART_SIZE:
                    continue
                size = buffer.tell()
                buffer.seek(0)
                part = UploadPart(
                    upload_id=upload_id,
                    number=part_number,
                    file=buffer,
                    eof=size,
                    size=size,
                    offset=0,
                )

                uploaded = upload_parts(s3, s3object, [part])
                parts.update(uploaded)
                part_number += 1
                # Reset the buffer
                buffer.truncate(0)
                buffer.seek(0)

            if buffer.tell() > 0:
                size = buffer.tell()
                buffer.seek(0)
                part = UploadPart(
                    upload_id=upload_id,
                    number=part_number,
                    file=buffer,
                    eof=size,
                    size=size,
                    offset=0,
                )
                uploaded = upload_parts(s3, s3object, [part])
                parts.update(uploaded)

        complete_multipart_upload(s3, s3object, upload_id, parts)
    except Exception as e:
        abort_multipart_upload(s3, s3object, upload_id)
        raise e
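# Usage sketch (not part of the original module): streaming an upload from an
# iterator of byte chunks, so the full payload never has to be materialised in
# memory at once. The chunk source, bucket and key names are hypothetical.
def _example_stream_multipart_upload(s3: "S3Client", chunks: Iterator[bytes]) -> None:
    obj = S3Object("example-bucket", "example-key")
    stream_multipart_upload(s3, obj, chunks)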