Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/s3/s3utils.py: 84.21%

304 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-04-15 14:01 +0000

1# Copyright (C) 2021 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15import io 

16import os 

17import random 

18import threading 

19import time 

20from typing import IO, TYPE_CHECKING, Any, BinaryIO, Callable, Dict, List, Mapping, Optional, Sequence, Union, cast 

21 

22import botocore 

23import pycurl 

24 

25from buildgrid.settings import ( 

26 S3_MAX_RETRIES, 

27 S3_MULTIPART_MAX_CONCURRENT_PARTS, 

28 S3_MULTIPART_PART_SIZE, 

29 S3_TIMEOUT_CONNECT, 

30 S3_TIMEOUT_READ, 

31 S3_USERAGENT_NAME, 

32) 

33 

34# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html 

35_RETRIABLE_HTTP_STATUS_CODES = (408, 429, 500, 502, 503, 504, 509) 

36_RETRIABLE_S3_ERROR_CODES = ( 

37 "Throttling", 

38 "ThrottlingException", 

39 "ThrottledException", 

40 "RequestThrottledException", 

41 "ProvisionedThroughputExceededException", 

42) 

43# Maximum backoff in seconds 

44_MAX_BACKOFF = 20 

45# Maximum requests to run in parallel via CurlMulti 

46_MAX_CURLMULTI_CONNECTIONS = 10 

47 

48if TYPE_CHECKING: 

49 from mypy_boto3_s3 import Client as S3Client 

50 

51 

52class _CurlLocal(threading.local): 

53 def __init__(self) -> None: 

54 self.curlmulti = pycurl.CurlMulti() 

55 self.curlmulti.setopt(pycurl.M_MAX_TOTAL_CONNECTIONS, _MAX_CURLMULTI_CONNECTIONS) 

56 

57 

58_curlLocal = _CurlLocal() 

59 

60 

61class S3Object: 

62 def __init__(self, bucket: str, key: str) -> None: 

63 self.bucket = bucket 

64 self.key = key 

65 self.fileobj: Optional[IO[bytes]] = None 

66 self.filesize: Optional[int] = None 

67 self.error: Optional[Exception] = None 

68 self.status_code: Optional[int] = None 

69 self._method: str = "" 

70 self._errfileobj: Optional[IO[bytes]] = None 

71 self._response_headers: Dict[str, str] = {} 

72 

73 @property 

74 def response_headers(self) -> Dict[str, str]: 

75 return self._response_headers 

76 

77 # Function to process HTTP response headers 

78 def _header_function(self, header_line: bytes) -> None: 

79 header = header_line.decode("ascii") 

80 

81 # Skip status line 

82 if ":" not in header: 

83 return 

84 

85 name, value = header.split(":", maxsplit=1) 

86 name = name.strip().lower() 

87 value = value.strip() 

88 

89 self._response_headers[name] = value 

90 

91 

92class UploadPart(io.BufferedIOBase): 

93 def __init__(self, upload_id: str, number: int, file: IO[bytes], eof: int, size: int, offset: int) -> None: 

94 super().__init__() 

95 self._upload_id = upload_id 

96 self._number = number 

97 self._response = io.BytesIO() 

98 self._file = file 

99 self._content = None 

100 try: 

101 self._fd: Optional[int] = file.fileno() 

102 except OSError: 

103 # The "file" doesn't have a file descriptor, its probably a BytesIO. 

104 # Read our part now so that we don't need to cope with thread safety 

105 # when `UploadPart.read` is called. 

106 self._fd = None 

107 old_position = file.tell() 

108 file.seek(offset) 

109 self._content = file.read(size) 

110 file.seek(old_position) 

111 

112 self._size = size 

113 self._start = offset 

114 self._end = min(eof, offset + size) 

115 self._read_offset = 0 

116 

117 @property 

118 def upload_id(self) -> str: 

119 return self._upload_id 

120 

121 @property 

122 def number(self) -> int: 

123 return self._number 

124 

125 @property 

126 def response(self) -> BinaryIO: 

127 return self._response 

128 

129 def __len__(self) -> int: 

130 return self._end - self._start 

131 

132 def readable(self) -> bool: 

133 return True 

134 

135 def seekable(self) -> bool: 

136 return False 

137 

138 def writable(self) -> bool: 

139 return True 

140 

141 def read(self, size: Optional[int] = -1) -> bytes: 

142 # If we have a real file underlying this part, then we want to do an 

143 # `os.pread` for just the part that is relevant. 

144 if self._fd is not None: 

145 if size is None or size == -1: 

146 size = self._size 

147 

148 # Calculate the actual read offset and make sure we're within our 

149 # section of the file. 

150 offset = self._start + self._read_offset 

151 if offset >= self._end: 

152 return b"" 

153 

154 # Make sure we only read up to the end of our section of the file, 

155 # in case the size requested is larger than the number of bytes 

156 # remaining in our section 

157 size = min(size, self._end - offset) 

158 content = os.pread(self._fd, size, offset) 

159 self._read_offset += size 

160 return content 

161 

162 # Otherwise we can just return our pre-determined slice of the actual 

163 # contents. This case should only be reached when MAX_IN_MEMORY_BLOB_SIZE_BYTES 

164 # is the same as or larger than S3_MAX_UPLOAD_SIZE, which should ideally 

165 # never be the case. 

166 else: 

167 if self._content is None: 

168 raise ValueError( 

169 f"Part {self._number} of upload {self._upload_id} is backed " 

170 "by a BytesIO but the content couldn't be read when the part " 

171 "was instantiated." 

172 ) 

173 return self._content 

174 

175 def write(self, b: bytes) -> int: # type: ignore[override] 

176 return self._response.write(b) 

177 

178 

179def _curl_handle_for_s3( 

180 s3: "S3Client", method: str, s3object: S3Object, extra_params: Optional[Mapping[str, Union[str, int]]] = None 

181) -> pycurl.Curl: 

182 if extra_params is None: 

183 extra_params = {} 

184 s3object._method = method 

185 params: Dict[str, Union[str, int]] = {"Bucket": s3object.bucket, "Key": s3object.key, **extra_params} 

186 url = s3.generate_presigned_url(method, Params=params, ExpiresIn=3600) 

187 c = pycurl.Curl() 

188 c.s3object = s3object # type: ignore 

189 c.setopt(pycurl.USERAGENT, S3_USERAGENT_NAME) 

190 c.setopt(pycurl.CONNECTTIMEOUT, S3_TIMEOUT_CONNECT) 

191 c.setopt(pycurl.TIMEOUT, S3_TIMEOUT_READ) 

192 c.setopt(pycurl.FAILONERROR, True) 

193 c.setopt(pycurl.URL, url) 

194 c.setopt(pycurl.HEADERFUNCTION, s3object._header_function) 

195 return c 

196 

197 

198# TODO Don't put random attributes on curl like this?? 

199def c_s3ojb(c: pycurl.Curl) -> S3Object: 

200 return c.s3object # type: ignore 

201 

202 

203def _curl_should_retry(c: pycurl.Curl, errno: int) -> bool: 

204 if errno in ( 

205 pycurl.E_COULDNT_CONNECT, 

206 pycurl.E_SEND_ERROR, 

207 pycurl.E_RECV_ERROR, 

208 pycurl.E_OPERATION_TIMEDOUT, 

209 pycurl.E_PARTIAL_FILE, 

210 ): 

211 # Retry on network and timeout errors 

212 return True 

213 

214 if errno == pycurl.E_HTTP_RETURNED_ERROR: 

215 s3obj = c_s3ojb(c) 

216 if s3obj.status_code in _RETRIABLE_HTTP_STATUS_CODES: 

217 # Retry on 'Request Timeout', 'Too Many Requests' and transient server errors 

218 return True 

219 

220 if error_response := getattr(s3obj.error, "response", None): 

221 if error_response["Error"]["Code"] in _RETRIABLE_S3_ERROR_CODES: 

222 return True 

223 

224 return False 

225 

226 

227def _curl_multi_run( 

228 objects: Sequence[S3Object], curl_handle_func: Callable[[S3Object], pycurl.Curl], attempt: int = 1 

229) -> None: 

230 m = _curlLocal.curlmulti 

231 for s3object in objects: 

232 c = curl_handle_func(s3object) 

233 m.add_handle(c) 

234 

235 while True: 

236 ret, active_handles = m.perform() 

237 if ret == pycurl.E_CALL_MULTI_PERFORM: 

238 # More processing required 

239 continue 

240 

241 if active_handles: 

242 # Wait for next event 

243 m.select(15.0) 

244 else: 

245 # All operations complete 

246 break 

247 

248 num_q, ok_list, err_list = m.info_read() 

249 assert num_q == 0 

250 

251 retry_objects = [] 

252 for c in ok_list: 

253 s3obj = c_s3ojb(c) 

254 s3obj.status_code = c.getinfo(pycurl.HTTP_CODE) 

255 m.remove_handle(c) 

256 c.close() 

257 for c, errno, errmsg in err_list: 

258 s3obj = c_s3ojb(c) 

259 if errno == pycurl.E_HTTP_RETURNED_ERROR: 

260 s3obj.status_code = c.getinfo(pycurl.HTTP_CODE) 

261 response: Dict[str, Any] = {} 

262 response["status_code"] = s3obj.status_code 

263 response["headers"] = s3obj._response_headers 

264 if (errfileobj := s3obj._errfileobj) is None: 

265 response["body"] = b"" 

266 else: 

267 errfileobj.seek(0) 

268 response["body"] = errfileobj.read() 

269 errfileobj.truncate(0) 

270 parser = botocore.parsers.RestXMLParser() 

271 # TODO: botocore safely handles `None` being passed here, but it is 

272 # probably best to rework this to get the correct `Shape` to match 

273 # the type hints from boto3-stubs 

274 parsed_response = parser.parse(response, None) # type: ignore[arg-type] 

275 s3obj.error = botocore.exceptions.ClientError(parsed_response, s3obj._method) 

276 else: 

277 s3obj.error = pycurl.error(errmsg) 

278 

279 if attempt < S3_MAX_RETRIES and _curl_should_retry(c, errno): 

280 s3obj.status_code = None 

281 s3obj.error = None 

282 retry_objects.append(s3obj) 

283 

284 m.remove_handle(c) 

285 c.close() 

286 

287 if retry_objects and attempt < S3_MAX_RETRIES: 

288 # Wait between attempts with truncated exponential backoff with jitter 

289 exp_backoff = 2 ** (attempt - 1) 

290 exp_backoff_with_jitter = random.random() * exp_backoff 

291 time.sleep(min(exp_backoff_with_jitter, _MAX_BACKOFF)) 

292 

293 _curl_multi_run(retry_objects, curl_handle_func, attempt=attempt + 1) 

294 

295 

296def head_objects(s3: "S3Client", objects: Sequence[S3Object]) -> None: 

297 def curl_handle_func(s3object: S3Object) -> pycurl.Curl: 

298 c = _curl_handle_for_s3(s3, "head_object", s3object) 

299 c.setopt(pycurl.NOBODY, True) 

300 return c 

301 

302 _curl_multi_run(objects, curl_handle_func) 

303 

304 

305def head_object(s3: "S3Client", s3object: S3Object) -> None: 

306 head_objects(s3, [s3object]) 

307 if s3object.error is not None: 

308 raise s3object.error 

309 

310 

311def set_s3_timeout( 

312 c: pycurl.Curl, 

313 s3object: S3Object, 

314 timeout_seconds_per_kilobyte: Optional[float], 

315 timeout_min_seconds: float, 

316) -> None: 

317 timeout = timeout_min_seconds 

318 if s3object.filesize is not None and timeout_seconds_per_kilobyte is not None: 

319 timeout = max(timeout, s3object.filesize * timeout_seconds_per_kilobyte / 1024) 

320 c.setopt(pycurl.TIMEOUT, int(timeout)) 

321 

322 

323def get_objects( 

324 s3: "S3Client", 

325 objects: Sequence[S3Object], 

326 timeout_seconds_per_kilobyte: Optional[float] = None, 

327 timeout_min_seconds: float = S3_TIMEOUT_READ, 

328) -> None: 

329 def curl_handle_func(s3object: S3Object) -> pycurl.Curl: 

330 c = _curl_handle_for_s3(s3, "get_object", s3object) 

331 c.setopt(pycurl.WRITEDATA, s3object.fileobj) 

332 set_s3_timeout(c, s3object, timeout_seconds_per_kilobyte, timeout_min_seconds) 

333 s3object._errfileobj = s3object.fileobj 

334 return c 

335 

336 _curl_multi_run(objects, curl_handle_func) 

337 

338 

339def get_object( 

340 s3: "S3Client", 

341 s3object: S3Object, 

342 timeout_seconds_per_kilobyte: Optional[float] = None, 

343 timeout_min_seconds: float = S3_TIMEOUT_READ, 

344) -> None: 

345 get_objects(s3, [s3object], timeout_seconds_per_kilobyte, timeout_min_seconds) 

346 if s3object.error is not None: 

347 raise s3object.error 

348 

349 

350def put_objects( 

351 s3: "S3Client", 

352 objects: Sequence[S3Object], 

353 timeout_seconds_per_kilobyte: Optional[float] = None, 

354 timeout_min_seconds: float = S3_TIMEOUT_READ, 

355) -> None: 

356 def curl_handle_func(s3object: S3Object) -> pycurl.Curl: 

357 c = _curl_handle_for_s3(s3, "put_object", s3object) 

358 c.setopt(pycurl.READDATA, s3object.fileobj) 

359 c.setopt(pycurl.INFILESIZE_LARGE, s3object.filesize) 

360 c.setopt(pycurl.UPLOAD, 1) 

361 set_s3_timeout(c, s3object, timeout_seconds_per_kilobyte, timeout_min_seconds) 

362 s3object._errfileobj = io.BytesIO() 

363 c.setopt(pycurl.WRITEDATA, s3object._errfileobj) 

364 return c 

365 

366 _curl_multi_run(objects, curl_handle_func) 

367 

368 

369def put_object( 

370 s3: "S3Client", 

371 s3object: S3Object, 

372 timeout_seconds_per_kilobyte: Optional[float] = None, 

373 timeout_min_seconds: float = S3_TIMEOUT_READ, 

374) -> None: 

375 put_objects(s3, [s3object], timeout_seconds_per_kilobyte, timeout_min_seconds) 

376 if s3object.error is not None: 

377 raise s3object.error 

378 

379 

380ParsedResponse = Dict[str, Union[str, Dict[str, Union[str, int, Dict[str, str]]]]] 

381 

382 

383def _parse_s3_response(s3: "S3Client", response: BinaryIO, s3object: S3Object, shape_name: str) -> ParsedResponse: 

384 response_dict: Dict[str, Union[Optional[int], Dict[str, str], bytes]] = {} 

385 response_dict["status_code"] = s3object.status_code 

386 response_dict["headers"] = s3object.response_headers 

387 response.seek(0) 

388 response_dict["body"] = response.read() 

389 

390 parser = botocore.parsers.RestXMLParser() 

391 shape = s3.meta.service_model.shape_for(shape_name) 

392 return cast(ParsedResponse, parser.parse(response_dict, shape)) 

393 

394 

395def start_multipart_upload(s3: "S3Client", s3object: S3Object) -> str: 

396 response = io.BytesIO() 

397 

398 def curl_handle_func(_s3object: S3Object) -> pycurl.Curl: 

399 c = _curl_handle_for_s3(s3, "create_multipart_upload", _s3object) 

400 c.setopt(pycurl.WRITEDATA, response) 

401 c.setopt(pycurl.POST, 1) 

402 return c 

403 

404 _curl_multi_run([s3object], curl_handle_func) 

405 if s3object.error is not None: 

406 raise s3object.error 

407 

408 parsed_response = _parse_s3_response(s3, response, s3object, "CreateMultipartUploadOutput") 

409 return parsed_response["UploadId"] # type: ignore 

410 

411 

412def upload_parts(s3: "S3Client", s3object: S3Object, parts: Sequence[UploadPart]) -> Dict[int, str]: 

413 s3object_map = {S3Object(s3object.bucket, s3object.key): part for part in parts} 

414 

415 def curl_handle_func(_s3object: S3Object) -> pycurl.Curl: 

416 part = s3object_map[_s3object] 

417 params: Dict[str, Union[int, str]] = {"UploadId": part.upload_id, "PartNumber": part.number} 

418 c = _curl_handle_for_s3(s3, "upload_part", _s3object, extra_params=params) 

419 c.setopt(pycurl.READDATA, part) 

420 c.setopt(pycurl.UPLOAD, 1) 

421 c.setopt(pycurl.INFILESIZE_LARGE, len(part)) 

422 c.setopt(pycurl.WRITEDATA, part) 

423 return c 

424 

425 _curl_multi_run(list(s3object_map.keys()), curl_handle_func) 

426 

427 errors: List[Exception] = [_s3object.error for _s3object in s3object_map.keys() if _s3object.error is not None] 

428 if len(errors) > 0: 

429 raise errors[0] 

430 

431 uploaded: Dict[int, str] = {} 

432 for _s3object, part in s3object_map.items(): 

433 response = _parse_s3_response(s3, part.response, _s3object, "UploadPartOutput") 

434 uploaded[part.number] = response["ResponseMetadata"]["HTTPHeaders"]["etag"] # type: ignore 

435 return uploaded 

436 

437 

438def complete_multipart_upload( 

439 s3: "S3Client", s3object: S3Object, upload_id: str, parts: Mapping[int, str] 

440) -> ParsedResponse: 

441 # Use the boto3 client directly here, rather than a presigned URL. This is 

442 # necessary because boto3's URL presigning is broken for `complete_multipart_upload` 

443 # when using s3v4 auth. 

444 # 

445 # See https://github.com/boto/boto3/issues/2192 

446 return cast( 

447 ParsedResponse, 

448 s3.complete_multipart_upload( 

449 Bucket=s3object.bucket, 

450 Key=s3object.key, 

451 UploadId=upload_id, 

452 MultipartUpload={"Parts": [{"ETag": tag, "PartNumber": number} for number, tag in parts.items()]}, 

453 ), 

454 ) 

455 

456 

457def _list_multipart_parts(s3: "S3Client", s3object: S3Object, upload_id: str) -> ParsedResponse: 

458 response = io.BytesIO() 

459 

460 def curl_handle_func(_s3object: S3Object) -> pycurl.Curl: 

461 params = {"UploadId": upload_id} 

462 c = _curl_handle_for_s3(s3, "list_parts", _s3object, extra_params=params) 

463 c.setopt(pycurl.WRITEDATA, response) 

464 return c 

465 

466 _curl_multi_run([s3object], curl_handle_func) 

467 if s3object.error is not None: 

468 raise s3object.error 

469 return _parse_s3_response(s3, response, s3object, "ListPartsOutput") 

470 

471 

472def abort_multipart_upload(s3: "S3Client", s3object: S3Object, upload_id: str) -> None: 

473 def curl_handle_func(_s3object: S3Object) -> pycurl.Curl: 

474 params = {"UploadId": upload_id} 

475 c = _curl_handle_for_s3(s3, "abort_multipart_upload", _s3object, extra_params=params) 

476 c.setopt(pycurl.CUSTOMREQUEST, "DELETE") 

477 return c 

478 

479 parts = _list_multipart_parts(s3, s3object, upload_id) 

480 

481 # We need to iterate here in case any part uploads slip through in a race 

482 # against the AbortMultipartUpload call. 

483 # 

484 # See https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html 

485 while len(parts.get("Parts", [])) > 0: 

486 _curl_multi_run([s3object], curl_handle_func) 

487 try: 

488 parts = _list_multipart_parts(s3, s3object, upload_id) 

489 except botocore.exceptions.ClientError as e: 

490 code = e.response.get("Error", {}).get("Code") 

491 if code == "404": 

492 # 404 error here means that the multipart upload is properly aborted. 

493 break 

494 raise e 

495 

496 

497def multipart_upload(s3: "S3Client", s3object: S3Object) -> None: 

498 if s3object.fileobj is None or s3object.filesize is None: 

499 raise TypeError("S3Object provided to multipart upload didn't contain a file.") 

500 

501 upload_id = start_multipart_upload(s3, s3object) 

502 

503 try: 

504 part_number = 1 

505 parts: Dict[int, str] = {} 

506 queue: List[UploadPart] = [] 

507 while (part_number - 1) * S3_MULTIPART_PART_SIZE < s3object.filesize: 

508 part = UploadPart( 

509 upload_id=upload_id, 

510 number=part_number, 

511 file=s3object.fileobj, 

512 eof=s3object.filesize, 

513 size=S3_MULTIPART_PART_SIZE, 

514 offset=(part_number - 1) * S3_MULTIPART_PART_SIZE, 

515 ) 

516 queue.append(part) 

517 

518 part_number += 1 

519 

520 if len(queue) >= S3_MULTIPART_MAX_CONCURRENT_PARTS: 

521 uploaded = upload_parts(s3, s3object, queue) 

522 parts.update(uploaded) 

523 queue = [] 

524 uploaded = upload_parts(s3, s3object, queue) 

525 parts.update(uploaded) 

526 

527 complete_multipart_upload(s3, s3object, upload_id, parts) 

528 except Exception as e: 

529 abort_multipart_upload(s3, s3object, upload_id) 

530 raise e