Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/s3/s3utils.py: 91.18%

136 statements  

coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

# Copyright (C) 2021 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import io
import random
import time
import threading
from typing import BinaryIO, Dict, Optional, Sequence

import botocore
import pycurl

from buildgrid.settings import (
    S3_USERAGENT_NAME,
    S3_MAX_RETRIES,
    S3_TIMEOUT_CONNECT,
    S3_TIMEOUT_READ
)


# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html
_RETRIABLE_HTTP_STATUS_CODES = (408, 429, 500, 502, 503, 504, 509)
_RETRIABLE_S3_ERROR_CODES = ('Throttling', 'ThrottlingException', 'ThrottledException',
                             'RequestThrottledException', 'ProvisionedThroughputExceededException')
# Maximum backoff in seconds
_MAX_BACKOFF = 20
# Maximum requests to run in parallel via CurlMulti
_MAX_CURLMULTI_CONNECTIONS = 10

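# Note: pycurl/libcurl handles are not safe to share across threads, so each
# thread keeps its own CurlMulti instance via threading.local, limited to
# _MAX_CURLMULTI_CONNECTIONS transfers in parallel.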
class _CurlLocal(threading.local):
    def __init__(self):
        self.curlmulti = pycurl.CurlMulti()
        self.curlmulti.setopt(pycurl.M_MAX_TOTAL_CONNECTIONS, _MAX_CURLMULTI_CONNECTIONS)

_curlLocal = _CurlLocal()


class S3Object:
    def __init__(self, bucket: str, key: str):
        self.bucket = bucket
        self.key = key
        self.fileobj = None
        self.filesize = None
        self.error = None
        self.status_code = None
        self._method = None  # type: Optional[str]
        self._errfileobj = None  # type: Optional[BinaryIO]
        self._response_headers = {}  # type: Dict[str, str]

    # Function to process HTTP response headers
    def _header_function(self, header_line):
        header_line = header_line.decode('ascii')

        # Skip status line
        if ':' not in header_line:
            return

        name, value = header_line.split(':', maxsplit=1)
        name = name.strip().lower()
        value = value.strip()

        self._response_headers[name] = value

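# Build a pycurl easy handle for a single S3 request. Authentication is
# handled by the presigned URL generated through the boto3 client, so curl
# itself needs no AWS credentials or request-signing logic.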
def _curl_handle_for_s3(s3, method: str, s3object: S3Object):
    s3object._method = method
    url = s3.generate_presigned_url(method, Params={'Bucket': s3object.bucket,
                                                    'Key': s3object.key}, ExpiresIn=3600)
    c = pycurl.Curl()
    c.s3object = s3object  # type: ignore
    c.setopt(pycurl.USERAGENT, S3_USERAGENT_NAME)
    c.setopt(pycurl.CONNECTTIMEOUT, S3_TIMEOUT_CONNECT)
    c.setopt(pycurl.TIMEOUT, S3_TIMEOUT_READ)
    c.setopt(pycurl.FAILONERROR, True)
    c.setopt(pycurl.URL, url)
    c.setopt(pycurl.HEADERFUNCTION, s3object._header_function)
    return c


def _curl_should_retry(c, errno):
    if errno in (pycurl.E_COULDNT_CONNECT, pycurl.E_SEND_ERROR, pycurl.E_RECV_ERROR, pycurl.E_OPERATION_TIMEDOUT):
        # Retry on network and timeout errors
        return True

    if errno == pycurl.E_HTTP_RETURNED_ERROR:
        if c.s3object.status_code in _RETRIABLE_HTTP_STATUS_CODES:
            # Retry on 'Request Timeout', 'Too Many Requests' and transient server errors
            return True

        if c.s3object.error.response['Error']['Code'] in _RETRIABLE_S3_ERROR_CODES:
            return True

    return False

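# Run a batch of transfers concurrently on the thread-local CurlMulti handle.
# Transfers that fail with a transient-looking error (network failures,
# throttling, retriable HTTP status codes) are retried recursively, up to
# S3_MAX_RETRIES attempts in total.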
def _curl_multi_run(objects: Sequence[S3Object], curl_handle_func, attempt=1):
    m = _curlLocal.curlmulti
    for s3object in objects:
        c = curl_handle_func(s3object)
        m.add_handle(c)

    while True:
        ret, active_handles = m.perform()
        if ret == pycurl.E_CALL_MULTI_PERFORM:
            # More processing required
            continue

        if active_handles:
            # Wait for next event
            m.select(15.0)
        else:
            # All operations complete
            break

    num_q, ok_list, err_list = m.info_read()
    assert num_q == 0

    retry_objects = []
    for c in ok_list:
        m.remove_handle(c)
        c.close()
    for c, errno, errmsg in err_list:
        if errno == pycurl.E_HTTP_RETURNED_ERROR:
            c.s3object.status_code = c.getinfo(pycurl.HTTP_CODE)
            response = {}
            response["status_code"] = c.s3object.status_code
            response["headers"] = c.s3object._response_headers
            if c.s3object._errfileobj is None:
                response["body"] = ""
            else:
                c.s3object._errfileobj.seek(0)
                response["body"] = c.s3object._errfileobj.read()
                c.s3object._errfileobj.truncate(0)
            parser = botocore.parsers.RestXMLParser()
            parsed_response = parser.parse(response, None)
            c.s3object.error = botocore.exceptions.ClientError(parsed_response, c.s3object._method)
        else:
            c.s3object.error = pycurl.error(errmsg)

        if attempt < S3_MAX_RETRIES and _curl_should_retry(c, errno):
            c.s3object.status_code = None
            c.s3object.error = None
            retry_objects.append(c.s3object)

        m.remove_handle(c)
        c.close()

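    # With exp_backoff = 2 ** (attempt - 1), attempt 1 sleeps up to 1s,
    # attempt 2 up to 2s, attempt 3 up to 4s, and so on, scaled by a random
    # jitter factor and capped at _MAX_BACKOFF seconds.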
    if retry_objects and attempt < S3_MAX_RETRIES:
        # Wait between attempts with truncated exponential backoff with jitter
        exp_backoff = 2 ** (attempt - 1)
        exp_backoff_with_jitter = random.random() * exp_backoff
        time.sleep(min(exp_backoff_with_jitter, _MAX_BACKOFF))

        _curl_multi_run(retry_objects, curl_handle_func, attempt=attempt + 1)

def head_objects(s3, objects: Sequence[S3Object]):
    def curl_handle_func(s3object: S3Object):
        c = _curl_handle_for_s3(s3, 'head_object', s3object)
        c.setopt(pycurl.NOBODY, True)
        return c

    _curl_multi_run(objects, curl_handle_func)


def head_object(s3, s3object: S3Object):
    head_objects(s3, [s3object])
    if s3object.error is not None:
        raise s3object.error


def get_objects(s3, objects: Sequence[S3Object]):
    def curl_handle_func(s3object: S3Object):
        c = _curl_handle_for_s3(s3, 'get_object', s3object)
        c.setopt(pycurl.WRITEDATA, s3object.fileobj)
        s3object._errfileobj = s3object.fileobj
        return c

    _curl_multi_run(objects, curl_handle_func)


def get_object(s3, s3object: S3Object):
    get_objects(s3, [s3object])
    if s3object.error is not None:
        raise s3object.error


def put_objects(s3, objects: Sequence[S3Object]):
    def curl_handle_func(s3object: S3Object):
        c = _curl_handle_for_s3(s3, 'put_object', s3object)
        c.setopt(pycurl.READDATA, s3object.fileobj)
        c.setopt(pycurl.INFILESIZE_LARGE, s3object.filesize)
        c.setopt(pycurl.UPLOAD, 1)
        s3object._errfileobj = io.BytesIO()
        c.setopt(pycurl.WRITEDATA, s3object._errfileobj)
        return c

    _curl_multi_run(objects, curl_handle_func)


def put_object(s3, s3object: S3Object):
    put_objects(s3, [s3object])
    if s3object.error is not None:
        raise s3object.error
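
Example usage (not part of the covered module): a minimal sketch of driving these helpers from a boto3 S3 client. The bucket name, key, and import path are placeholder assumptions for illustration, not values taken from BuildGrid.

import io

import boto3

from buildgrid.server.s3 import s3utils

# Assumed boto3 client; credentials come from the usual boto3 configuration.
s3 = boto3.client('s3')

# Upload a small payload: put_object() reads from fileobj and needs filesize.
payload = io.BytesIO(b'hello')
upload = s3utils.S3Object('example-bucket', 'example/key')  # placeholder names
upload.fileobj = payload
upload.filesize = len(payload.getvalue())
s3utils.put_object(s3, upload)  # raises upload.error if the PUT failed

# Download it again: get_object() writes the response body into fileobj.
download = s3utils.S3Object('example-bucket', 'example/key')
download.fileobj = io.BytesIO()
s3utils.get_object(s3, download)
assert download.fileobj.getvalue() == b'hello'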