Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/actioncache/caches/s3_cache.py: 92.98%

114 statements  

coverage.py v7.4.1, created at 2025-05-28 16:48 +0000

# Copyright (C) 2019 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
S3 Action Cache
===============

Implements an Action Cache using S3 to store cache entries.

"""

import io
import logging
from typing import Any

import boto3
from botocore.config import Config as BotoConfig
from botocore.exceptions import ClientError

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ActionResult, Digest
from buildgrid.server.actioncache.caches.action_cache_abc import ActionCacheABC
from buildgrid.server.cas.storage.storage_abc import StorageABC
from buildgrid.server.enums import ActionCacheEntryType
from buildgrid.server.exceptions import NotFoundError, StorageFullError
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.s3 import s3utils

LOGGER = buildgrid_logger(__name__)

class S3ActionCache(ActionCacheABC):
    def __init__(
        self,
        storage: StorageABC,
        allow_updates: bool = True,
        cache_failed_actions: bool = True,
        entry_type: ActionCacheEntryType | None = ActionCacheEntryType.ACTION_RESULT_DIGEST,
        migrate_entries: bool | None = False,
        bucket: str | None = None,
        endpoint_url: str | None = None,
        aws_access_key_id: str | None = None,
        aws_secret_access_key: str | None = None,
        config: BotoConfig | None = BotoConfig(),
        cache_key_salt: str | None = None,
    ):
        """Initialises a new ActionCache instance using S3 to persist the action cache.

        Args:
            storage (StorageABC): storage backend instance to be used to store ActionResults.
            allow_updates (bool): allow the client to write to storage.
            cache_failed_actions (bool): whether to store failed actions in the Action Cache.
            entry_type (ActionCacheEntryType): whether to store ActionResults or their digests.
            migrate_entries (bool): if set, migrate entries that contain a value with
                a different `EntryType` to `entry_type` as they are accessed
                (False by default).
            cache_key_salt (str): if provided, included in the S3 key for cache entries. Used
                to isolate or share specific chunks of a shared set of S3 buckets.

            bucket (str): name of the bucket; may be a template string used by
                `_get_bucket_name` to shard entries across buckets.
            endpoint_url (str): URL of the S3 endpoint.
            aws_access_key_id (str): S3 access key ID.
            aws_secret_access_key (str): S3 secret access key.
        """
        super().__init__(storage=storage, allow_updates=allow_updates)

        self._entry_type = entry_type
        self._migrate_entries = migrate_entries

        self._cache_failed_actions = cache_failed_actions
        self._cache_key_salt = cache_key_salt
        assert bucket is not None
        self._bucket_template = bucket

        # Boto logs can be very verbose, restrict to WARNING
        for boto_logger_name in ["boto3", "botocore", "s3transfer", "urllib3"]:
            boto_logger = logging.getLogger(boto_logger_name)
            boto_logger.setLevel(max(boto_logger.level, logging.WARNING))

        # Only pass arguments with a value to support testing with moto_server
        client_kwargs: dict[str, Any] = {}
        if endpoint_url is not None:
            client_kwargs["endpoint_url"] = endpoint_url
        if aws_access_key_id is not None:
            client_kwargs["aws_access_key_id"] = aws_access_key_id
        if aws_secret_access_key is not None:
            client_kwargs["aws_secret_access_key"] = aws_secret_access_key
        client_kwargs["config"] = config

        self._s3cache = boto3.client("s3", **client_kwargs)

    # --- Public API ---
    @property
    def allow_updates(self) -> bool:
        return self._allow_updates

    def get_action_result(self, action_digest: Digest) -> ActionResult:
        """Retrieves the cached ActionResult for the given Action digest.

        Args:
            action_digest: The digest to get the result for

        Returns:
            The cached ActionResult matching the given key or raises
            NotFoundError.
        """
        action_result = self._get_action_result(action_digest)
        if action_result is not None:
            if self.referenced_blobs_still_exist(action_digest, action_result):
                return action_result

            if self._allow_updates:
                LOGGER.debug(
                    "Removing action from cache due to missing blobs in CAS.",
                    tags=dict(digest=action_digest),
                )
                self._delete_key_from_cache(action_digest)

        raise NotFoundError(f"Key not found: {action_digest.hash}/{action_digest.size_bytes}")

    def update_action_result(self, action_digest: Digest, action_result: ActionResult) -> None:
        """Stores the result in cache for the given key.

        Args:
            action_digest (Digest): digest of Action to update
            action_result (ActionResult): ActionResult to store.
        """
        if not self._allow_updates:
            raise NotImplementedError("Updating cache not allowed")

        if self._cache_failed_actions or action_result.exit_code == 0:
            assert self._storage, "Storage used before initialization"
            action_result_digest = self._storage.put_message(action_result)

            if self._entry_type == ActionCacheEntryType.ACTION_RESULT_DIGEST:
                self._update_cache_key(action_digest, action_result_digest.SerializeToString())
            else:
                self._update_cache_key(action_digest, action_result.SerializeToString())

            LOGGER.info("Result cached for action.", tags=dict(digest=action_digest))

    # --- Private API ---
    def _get_action_result(self, digest: Digest) -> ActionResult | None:
        """Get an `ActionResult` from the cache.

        If present, returns the `ActionResult` corresponding to the given digest.
        Otherwise returns None.

        When configured to do so, if the value stored in the entry in S3 contains
        a different type, it will convert it to `entry_type`.

        Args:
            digest: Action digest to get the associated ActionResult digest for

        Returns:
            The `ActionResult` or None if the digest is not present
        """
        value_in_cache = self._get_value_from_cache(digest)
        if not value_in_cache:
            return None

        # Attempting to parse the entry as a `Digest` first:
        action_result_digest = Digest.FromString(value_in_cache)
        if len(action_result_digest.hash) == len(digest.hash):
            # The cache contains the `Digest` of the `ActionResult`:
            assert self._storage, "Storage used before initialization"
            action_result = self._storage.get_message(action_result_digest, ActionResult)

            # If configured, update the entry to contain an `ActionResult`:
            if self._entry_type == ActionCacheEntryType.ACTION_RESULT and self._migrate_entries:
                LOGGER.debug("Converting entry from Digest to ActionResult.", tags=dict(digest=digest))
                assert action_result, "Returned result was none"
                self._update_cache_key(digest, action_result.SerializeToString())

        else:
            action_result = ActionResult.FromString(value_in_cache)

            # If configured, update the entry to contain a `Digest`:
            if self._entry_type == ActionCacheEntryType.ACTION_RESULT_DIGEST and self._migrate_entries:
                LOGGER.debug("Converting entry from ActionResult to Digest.", tags=dict(digest=digest))
                assert self._storage, "Storage used before initialization"
                action_result_digest = self._storage.put_message(action_result)
                self._update_cache_key(digest, action_result_digest.SerializeToString())

        return action_result

    def _get_value_from_cache(self, digest: Digest) -> bytes | None:
        """Get the bytes stored in cache for the given Digest.

        Args:
            digest: Action digest to get the associated bytes in S3

        Returns:
            bytes or None if the digest is not present
        """

        try:
            s3object = self._get_s3object(digest)
            s3object.fileobj = io.BytesIO()
            s3utils.get_object(self._s3cache, s3object)
            s3object.fileobj.seek(0)
            return s3object.fileobj.read()
        except ClientError as e:
            if e.response["Error"]["Code"] not in ["404", "NoSuchKey"]:
                raise
            return None

    def _update_cache_key(self, digest: Digest, value: bytes) -> None:
        try:
            s3object = self._get_s3object(digest)
            s3object.fileobj = io.BytesIO(value)
            s3object.filesize = len(value)
            s3utils.put_object(self._s3cache, s3object)
        except ClientError as error:
            if error.response["Error"]["Code"] == "QuotaExceededException":
                raise StorageFullError("ActionCache S3 Quota Exceeded.") from error
            raise error

    def _delete_key_from_cache(self, digest: Digest) -> None:
        """Remove an entry from the ActionCache

        Args:
            digest: entry to remove from the ActionCache

        Returns:
            None
        """
        if not self._allow_updates:
            raise NotImplementedError("Updating cache not allowed")

        self._s3cache.delete_object(Bucket=self._get_bucket_name(digest), Key=self._get_key(digest))

    def _get_key(self, digest: Digest) -> str:
        """
        Given a `Digest`, returns the key used to store its
        corresponding entry in S3.
        """
        if self._cache_key_salt is None:
            return f"{digest.hash}_{digest.size_bytes}"
        return f"{self._cache_key_salt}_{digest.hash}_{digest.size_bytes}"
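    # Illustrative key shapes produced by _get_key (the hash and size values below
    # are made-up placeholders, not real digests):
    #
    #   without cache_key_salt:   "<hash>_<size_bytes>"         e.g. "abc123_142"
    #   with cache_key_salt="ci": "<salt>_<hash>_<size_bytes>"  e.g. "ci_abc123_142"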

    def _get_bucket_name(self, digest: Digest) -> str:
        """Return the formatted bucket name for a given digest.

        This formats the bucket template defined in the configuration file
        using a digest to be stored in the cache. This allows the cache
        contents to be sharded across multiple S3 buckets, allowing us to
        cache more Actions than can be stored in a single S3 bucket.

        Currently the only variable interpolated into the template is
        ``digest``, which contains the hash part of the Digest. A template
        string which includes undefined variables will result in a
        non-functional ActionCache.

        Args:
            digest (Digest): The digest to get the bucket name for.

        Returns:
            str: The bucket name corresponding to the given Digest.

        """
        try:
            return self._bucket_template.format(digest=digest.hash)
        except IndexError:
            LOGGER.error(
                (
                    "Could not generate bucket name for digest. "
                    "This is either a misconfiguration in the BuildGrid S3 "
                    "ActionCache bucket configuration, or a badly formed request."
                ),
                tags=dict(digest=digest),
            )
            raise
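    # Hypothetical sharding example (the bucket names here are illustrative, not
    # taken from any BuildGrid configuration): a template such as
    # "example-action-cache-{digest[0]}" maps a digest whose hash starts with "a"
    # to the bucket "example-action-cache-a", spreading entries across 16 buckets
    # keyed on the first hex character of the hash. A template that indexes past
    # the end of the hash raises the IndexError handled above.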

    def _get_s3object(self, digest: Digest) -> s3utils.S3Object:
        return s3utils.S3Object(self._get_bucket_name(digest), self._get_key(digest))
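
# Illustrative usage sketch: the bucket template, endpoint, salt, and the `storage`
# backend below are assumed placeholders, not values from this module; any
# configured StorageABC implementation would be passed in.
#
#     cache = S3ActionCache(
#         storage=storage,
#         allow_updates=True,
#         cache_failed_actions=True,
#         entry_type=ActionCacheEntryType.ACTION_RESULT_DIGEST,
#         bucket="example-action-cache-{digest[0]}",
#         endpoint_url="http://localhost:9000",
#         cache_key_salt="ci",
#     )
#     cache.update_action_result(action_digest, action_result)
#     result = cache.get_action_result(action_digest)  # raises NotFoundError on a miss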