Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/actioncache/caches/s3_cache.py: 87.85%

107 statements  

« prev     ^ index     » next       coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

1# Copyright (C) 2019 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17S3 Action Cache 

18================== 

19 

20Implements an Action Cache using S3 to store cache entries. 

21 

22""" 

23 

24import io 

25import logging 

26from typing import Any, Dict, Optional 

27 

28import boto3 

29from botocore.config import Config as BotoConfig 

30from botocore.exceptions import ClientError 

31 

32from buildgrid._exceptions import ( 

33 NotFoundError, 

34 StorageFullError 

35) 

36from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ( 

37 ActionResult, 

38 Digest, 

39) 

40from buildgrid.server.actioncache.caches.action_cache_abc import ActionCacheABC 

41from buildgrid.server.cas.storage.storage_abc import StorageABC 

42from buildgrid.server.s3 import s3utils 

43from buildgrid._enums import ActionCacheEntryType 

44 

45 

class S3ActionCache(ActionCacheABC):
    """Action Cache backed by S3.

    Stores cache entries as S3 objects keyed by the Action digest. Depending
    on configuration, each entry's value is either a serialized
    ``ActionResult`` or the serialized ``Digest`` of an ``ActionResult``
    stored in CAS.
    """

    def __init__(self, storage: StorageABC, allow_updates: bool=True,
                 cache_failed_actions: bool=True,
                 entry_type: Optional[ActionCacheEntryType]=ActionCacheEntryType.ACTION_RESULT_DIGEST,
                 migrate_entries: Optional[bool]=False,
                 bucket: Optional[str]=None,
                 endpoint: Optional[str]=None, access_key: Optional[str]=None,
                 secret_key: Optional[str]=None,
                 config: Optional[BotoConfig]=BotoConfig()):
        """ Initialises a new ActionCache instance using S3 to persist the action cache.

        Args:
            storage (StorageABC): storage backend instance to be used to store ActionResults.
            allow_updates (bool): allow the client to write to storage
            cache_failed_actions (bool): whether to store failed actions in the Action Cache
            entry_type (ActionCacheEntryType): whether to store ActionResults or their digests.
            migrate_entries (bool): if set, migrate entries that contain a value with
                a different `EntryType` to `entry_type` as they are accessed
                (False by default).

            bucket (str): Name of bucket
            endpoint (str): URL of endpoint.
            access-key (str): S3-ACCESS-KEY
            secret-key (str): S3-SECRET-KEY

        Raises:
            ValueError: if ``bucket`` is not provided.
        """
        ActionCacheABC.__init__(self, storage=storage, allow_updates=allow_updates)
        self._logger = logging.getLogger(__name__)

        self._entry_type = entry_type
        self._migrate_entries = migrate_entries

        self._cache_failed_actions = cache_failed_actions
        # Validate explicitly rather than with `assert`: assertions are
        # stripped under `python -O`, which would let a missing bucket slip
        # through and fail obscurely later in `_get_bucket_name`.
        if bucket is None:
            raise ValueError("S3ActionCache requires a bucket name template")
        self._bucket_template = bucket

        # Boto logs can be very verbose, restrict to WARNING
        for boto_logger_name in [
                'boto3', 'botocore',
                's3transfer', 'urllib3'
        ]:
            boto_logger = logging.getLogger(boto_logger_name)
            boto_logger.setLevel(max(boto_logger.level, logging.WARNING))

        # Only pass arguments with a value to support testing with moto_server
        client_kwargs = {}  # type: Dict[str, Any]
        if endpoint is not None:
            client_kwargs["endpoint_url"] = endpoint
        if access_key is not None:
            client_kwargs["aws_access_key_id"] = access_key
        if secret_key is not None:
            client_kwargs["aws_secret_access_key"] = secret_key
        client_kwargs["config"] = config

        self._s3cache = boto3.client('s3', **client_kwargs)

    # --- Public API ---
    @property
    def allow_updates(self) -> bool:
        # Whether clients may write to (and this cache may purge from) storage.
        return self._allow_updates

    def get_action_result(self, action_digest: Digest) -> ActionResult:
        """Retrieves the cached ActionResult for the given Action digest.

        Args:
            action_digest: The digest to get the result for

        Returns:
            The cached ActionResult matching the given key or raises
            NotFoundError.
        """
        action_result = self._get_action_result(action_digest)
        if action_result is not None:
            if self._action_result_blobs_still_exist(action_result):
                return action_result

            # The entry references blobs that have been evicted from CAS, so
            # the cached result is unusable; drop it if we are allowed to.
            if self._allow_updates:
                self._logger.debug(f"Removing {action_digest.hash}/{action_digest.size_bytes} "
                                   "from cache due to missing blobs in CAS")
                self._delete_key_from_cache(action_digest)

        raise NotFoundError(f"Key not found: {action_digest.hash}/{action_digest.size_bytes}")

    def update_action_result(self, action_digest: Digest,
                             action_result: ActionResult) -> None:
        """Stores the result in cache for the given key.

        Args:
            action_digest (Digest): digest of Action to update
            action_result (ActionResult): ActionResult to store.

        Raises:
            NotImplementedError: if this cache is configured read-only.
        """
        if not self._allow_updates:
            raise NotImplementedError("Updating cache not allowed")

        if self._cache_failed_actions or action_result.exit_code == 0:
            # The full ActionResult always goes to CAS; the S3 entry holds
            # either its digest or the full message, per `entry_type`.
            action_result_digest = self._storage.put_message(action_result)

            if self._entry_type == ActionCacheEntryType.ACTION_RESULT_DIGEST:
                self._update_cache_key(action_digest, action_result_digest.SerializeToString())
            else:
                self._update_cache_key(action_digest, action_result.SerializeToString())

            self._logger.info(
                f"Result cached for action [{action_digest.hash}/{action_digest.size_bytes}]")

    # --- Private API ---
    def _get_action_result(self, digest: Digest) -> Optional[ActionResult]:
        """Get an `ActionResult` from the cache.

        If present, returns the `ActionResult` corresponding to the given digest.
        Otherwise returns None.

        When configured to do so, if the value stored in the entry in S3 contains
        a different type, it will convert it to `entry_type`.

        Args:
            digest: Action digest to get the associated ActionResult digest for

        Returns:
            The `ActionResult` or None if the digest is not present
        """
        value_in_cache = self._get_value_from_cache(digest)
        if not value_in_cache:
            return None

        # Attempting to parse the entry as a `Digest` first:
        # NOTE(review): this is a heuristic — a stored value whose parsed hash
        # happens to match the action digest's hash length is treated as a
        # Digest entry rather than an ActionResult entry.
        action_result_digest = Digest.FromString(value_in_cache)
        if len(action_result_digest.hash) == len(digest.hash):
            # The cache contains the `Digest` of the `ActionResult`:
            action_result = self._storage.get_message(action_result_digest,
                                                      ActionResult)

            # If configured, update the entry to contain an `ActionResult`:
            if self._entry_type == ActionCacheEntryType.ACTION_RESULT and self._migrate_entries:
                self._logger.debug(f"Converting entry for {digest.hash}/{digest.size_bytes} "
                                   "from Digest to ActionResult")
                self._update_cache_key(digest, action_result.SerializeToString())

        else:
            action_result = ActionResult.FromString(value_in_cache)

            # If configured, update the entry to contain a `Digest`:
            if self._entry_type == ActionCacheEntryType.ACTION_RESULT_DIGEST and self._migrate_entries:
                self._logger.debug(f"Converting entry for {digest.hash}/{digest.size_bytes} "
                                   "from ActionResult to Digest")
                action_result_digest = self._storage.put_message(action_result)
                self._update_cache_key(digest, action_result_digest.SerializeToString())

        return action_result

    def _get_value_from_cache(self, digest: Digest) -> Optional[bytes]:
        """Get the bytes stored in cache for the given Digest.

        Args:
            digest: Action digest to get the associated bytes in S3

        Returns:
            bytes or None if the digest is not present
        """

        try:
            s3object = self._get_s3object(digest)
            s3object.fileobj = io.BytesIO()
            s3utils.get_object(self._s3cache, s3object)
            s3object.fileobj.seek(0)
            return s3object.fileobj.read()
        except ClientError as e:
            # A missing key is an expected cache miss; anything else is a
            # genuine S3 failure and must propagate.
            if e.response['Error']['Code'] not in ['404', 'NoSuchKey']:
                raise
            return None

    def _update_cache_key(self, digest: Digest, value: bytes) -> None:
        """Write `value` to the S3 entry keyed by `digest`.

        Raises:
            StorageFullError: if the S3 quota is exceeded.
        """
        try:
            s3object = self._get_s3object(digest)
            s3object.fileobj = io.BytesIO(value)
            s3object.filesize = len(value)
            s3utils.put_object(self._s3cache, s3object)
        except ClientError as error:
            if error.response['Error']['Code'] == 'QuotaExceededException':
                raise StorageFullError("ActionCache S3 Quota Exceeded.") from error
            raise error

    def _delete_key_from_cache(self, digest: Digest) -> None:
        """Remove an entry from the ActionCache

        Args:
            digest: entry to remove from the ActionCache

        Returns:
            None

        Raises:
            NotImplementedError: if this cache is configured read-only.
        """
        if not self._allow_updates:
            raise NotImplementedError("Updating cache not allowed")

        self._s3cache.delete_object(Bucket=self._get_bucket_name(digest), Key=self._get_key(digest))

    @staticmethod
    def _get_key(digest: Digest) -> str:
        """
        Given a `Digest`, returns the key used to store its
        corresponding entry in S3.
        """
        return f'{digest.hash}_{digest.size_bytes}'

    def _get_bucket_name(self, digest: Digest) -> str:
        """Return the formatted bucket name for a given digest.

        This formats the bucket template defined in the configuration file
        using a digest to be stored in the cache. This allows the cache
        contents to be sharded across multiple S3 buckets, allowing us to
        cache more Actions than can be stored in a single S3 bucket.

        Currently the only variable interpolated into the template is
        ``digest``, which contains the hash part of the Digest. A template
        string which includes undefined variables will result in a
        non-functional ActionCache.

        Args:
            digest (Digest): The digest to get the bucket name for.

        Returns:
            str: The bucket name corresponding to the given Digest.

        """
        try:
            return self._bucket_template.format(digest=digest.hash)
        except IndexError:
            self._logger.error(
                f"Could not generate bucket name for digest=[{digest.hash}]. "
                "This is either a misconfiguration in the BuildGrid S3 "
                "ActionCache bucket configuration, or a badly formed request."
            )
            raise

    def _get_s3object(self, digest: Digest):
        # Build the (bucket, key) handle used by the s3utils helpers.
        return s3utils.S3Object(self._get_bucket_name(digest), self._get_key(digest))