Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/actioncache/caches/s3_cache.py: 92.98%

114 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-06-11 15:37 +0000

1# Copyright (C) 2019 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17S3 Action Cache 

18================== 

19 

20Implements an Action Cache using S3 to store cache entries. 

21 

22""" 

23 

24import io 

25import logging 

26from typing import Any, Dict, Optional 

27 

28import boto3 

29from botocore.config import Config as BotoConfig 

30from botocore.exceptions import ClientError 

31 

32from buildgrid._enums import ActionCacheEntryType 

33from buildgrid._exceptions import NotFoundError, StorageFullError 

34from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ActionResult, Digest 

35from buildgrid.server.actioncache.caches.action_cache_abc import ActionCacheABC 

36from buildgrid.server.cas.storage.storage_abc import StorageABC 

37from buildgrid.server.metrics_names import AC_UNUSABLE_CACHE_HITS_METRIC_NAME 

38from buildgrid.server.metrics_utils import publish_counter_metric 

39from buildgrid.server.s3 import s3utils 

40 

41LOGGER = logging.getLogger(__name__) 

42 

43 

class S3ActionCache(ActionCacheABC):
    """Action Cache backed by an S3 bucket.

    Each cache entry is stored in S3 under a key derived from the Action's
    digest. Depending on ``entry_type``, the stored value is either a
    serialized ``ActionResult`` or the serialized ``Digest`` of an
    ``ActionResult`` held in the CAS storage backend.
    """

    def __init__(
        self,
        storage: StorageABC,
        allow_updates: bool = True,
        cache_failed_actions: bool = True,
        entry_type: Optional[ActionCacheEntryType] = ActionCacheEntryType.ACTION_RESULT_DIGEST,
        migrate_entries: Optional[bool] = False,
        bucket: Optional[str] = None,
        endpoint_url: Optional[str] = None,
        aws_access_key_id: Optional[str] = None,
        aws_secret_access_key: Optional[str] = None,
        config: Optional[BotoConfig] = None,
    ):
        """Initialises a new ActionCache instance using S3 to persist the action cache.

        Args:
            storage (StorageABC): storage backend instance to be used to store ActionResults.
            allow_updates (bool): allow the client to write to storage
            cache_failed_actions (bool): whether to store failed actions in the Action Cache
            entry_type (ActionCacheEntryType): whether to store ActionResults or their digests.
            migrate_entries (bool): if set, migrate entries that contain a value with
                a different `EntryType` to `entry_type` as they are accessed
                (False by default).

            bucket (str): Name of bucket
            endpoint_url (str): URL of endpoint.
            aws_access_key_id (str): S3-ACCESS-KEY
            aws_secret_access_key (str): S3-SECRET-KEY
            config (BotoConfig): botocore client configuration. A fresh
                default ``BotoConfig`` is created per instance when omitted.

        Raises:
            ValueError: if ``bucket`` is not provided.
        """
        super().__init__(storage=storage, allow_updates=allow_updates)

        self._entry_type = entry_type
        self._migrate_entries = migrate_entries

        self._cache_failed_actions = cache_failed_actions
        # `assert` is stripped under `python -O`, so validate explicitly to
        # guarantee a missing bucket is always caught at construction time.
        if bucket is None:
            raise ValueError("S3ActionCache requires a bucket name to be configured")
        self._bucket_template = bucket

        # Boto logs can be very verbose, restrict to WARNING
        for boto_logger_name in ["boto3", "botocore", "s3transfer", "urllib3"]:
            boto_logger = logging.getLogger(boto_logger_name)
            boto_logger.setLevel(max(boto_logger.level, logging.WARNING))

        # Only pass arguments with a value to support testing with moto_server
        client_kwargs: Dict[str, Any] = {}
        if endpoint_url is not None:
            client_kwargs["endpoint_url"] = endpoint_url
        if aws_access_key_id is not None:
            client_kwargs["aws_access_key_id"] = aws_access_key_id
        if aws_secret_access_key is not None:
            client_kwargs["aws_secret_access_key"] = aws_secret_access_key
        # NOTE: the default used to be a module-level `BotoConfig()` shared by
        # every instance (mutable default argument); build it per-instance.
        client_kwargs["config"] = config if config is not None else BotoConfig()

        self._s3cache = boto3.client("s3", **client_kwargs)

    # --- Public API ---
    @property
    def allow_updates(self) -> bool:
        # Whether clients may write to (and this cache may delete from) storage.
        return self._allow_updates

    def get_action_result(self, action_digest: Digest) -> ActionResult:
        """Retrieves the cached ActionResult for the given Action digest.

        A cache hit is only returned when every blob the ActionResult
        references still exists in CAS; otherwise the stale entry is
        removed (when updates are allowed) and a miss is reported.

        Args:
            action_digest: The digest to get the result for

        Returns:
            The cached ActionResult matching the given key or raises
            NotFoundError.
        """
        action_result = self._get_action_result(action_digest)
        if action_result is not None:
            if self._referenced_blobs_still_exist(action_digest, action_result):
                return action_result

            # Hit on the key, but the result is unusable: record the event.
            publish_counter_metric(AC_UNUSABLE_CACHE_HITS_METRIC_NAME, 1, {"instance_name": self._instance_name})

            if self._allow_updates:
                LOGGER.debug(
                    "Removing %s/%s from cache due to missing blobs in CAS",
                    action_digest.hash,
                    action_digest.size_bytes,
                )
                self._delete_key_from_cache(action_digest)

        raise NotFoundError(f"Key not found: {action_digest.hash}/{action_digest.size_bytes}")

    def update_action_result(self, action_digest: Digest, action_result: ActionResult) -> None:
        """Stores the result in cache for the given key.

        Args:
            action_digest (Digest): digest of Action to update
            action_result (ActionResult): ActionResult to store.

        Raises:
            NotImplementedError: if this cache is read-only.
        """
        if not self._allow_updates:
            raise NotImplementedError("Updating cache not allowed")

        # Failed actions are only cached when explicitly configured to do so.
        if self._cache_failed_actions or action_result.exit_code == 0:
            assert self._storage, "Storage used before initialization"
            action_result_digest = self._storage.put_message(action_result)

            # Store either the digest of the result (result lives in CAS) or
            # the full serialized result, depending on configuration.
            if self._entry_type == ActionCacheEntryType.ACTION_RESULT_DIGEST:
                self._update_cache_key(action_digest, action_result_digest.SerializeToString())
            else:
                self._update_cache_key(action_digest, action_result.SerializeToString())

            LOGGER.info("Result cached for action [%s/%s]", action_digest.hash, action_digest.size_bytes)

    # --- Private API ---
    def _get_action_result(self, digest: Digest) -> Optional[ActionResult]:
        """Get an `ActionResult` from the cache.

        If present, returns the `ActionResult` corresponding to the given digest.
        Otherwise returns None.

        When configured to do so, if the value stored in the entry in S3 contains
        a different type, it will convert it to `entry_type`.

        Args:
            digest: Action digest to get the associated ActionResult digest for

        Returns:
            The `ActionResult` or None if the digest is not present
        """
        value_in_cache = self._get_value_from_cache(digest)
        if not value_in_cache:
            return None

        # Attempting to parse the entry as a `Digest` first; a digest-typed
        # entry will have a hash of the same length as the lookup digest.
        action_result_digest = Digest.FromString(value_in_cache)
        if len(action_result_digest.hash) == len(digest.hash):
            # The cache contains the `Digest` of the `ActionResult`:
            assert self._storage, "Storage used before initialization"
            action_result = self._storage.get_message(action_result_digest, ActionResult)

            # If configured, update the entry to contain an `ActionResult`:
            if self._entry_type == ActionCacheEntryType.ACTION_RESULT and self._migrate_entries:
                LOGGER.debug(
                    "Converting entry for %s/%s from Digest to ActionResult", digest.hash, digest.size_bytes
                )
                assert action_result, "Returned result was none"
                self._update_cache_key(digest, action_result.SerializeToString())

        else:
            action_result = ActionResult.FromString(value_in_cache)

            # If configured, update the entry to contain a `Digest`:
            if self._entry_type == ActionCacheEntryType.ACTION_RESULT_DIGEST and self._migrate_entries:
                LOGGER.debug(
                    "Converting entry for %s/%s from ActionResult to Digest", digest.hash, digest.size_bytes
                )
                assert self._storage, "Storage used before initialization"
                action_result_digest = self._storage.put_message(action_result)
                self._update_cache_key(digest, action_result_digest.SerializeToString())

        return action_result

    def _get_value_from_cache(self, digest: Digest) -> Optional[bytes]:
        """Get the bytes stored in cache for the given Digest.

        Args:
            digest: Action digest to get the associated bytes in S3

        Returns:
            bytes or None if the digest is not present
        """

        try:
            s3object = self._get_s3object(digest)
            s3object.fileobj = io.BytesIO()
            s3utils.get_object(self._s3cache, s3object)
            s3object.fileobj.seek(0)
            return s3object.fileobj.read()
        except ClientError as e:
            # A missing key is a normal cache miss; anything else is an error.
            if e.response["Error"]["Code"] not in ["404", "NoSuchKey"]:
                raise
            return None

    def _update_cache_key(self, digest: Digest, value: bytes) -> None:
        """Write `value` to S3 under the key derived from `digest`.

        Raises:
            StorageFullError: if the S3 quota has been exceeded.
        """
        try:
            s3object = self._get_s3object(digest)
            s3object.fileobj = io.BytesIO(value)
            s3object.filesize = len(value)
            s3utils.put_object(self._s3cache, s3object)
        except ClientError as error:
            if error.response["Error"]["Code"] == "QuotaExceededException":
                raise StorageFullError("ActionCache S3 Quota Exceeded.") from error
            raise error

    def _delete_key_from_cache(self, digest: Digest) -> None:
        """Remove an entry from the ActionCache

        Args:
            digest: entry to remove from the ActionCache

        Returns:
            None

        Raises:
            NotImplementedError: if this cache is read-only.
        """
        if not self._allow_updates:
            raise NotImplementedError("Updating cache not allowed")

        self._s3cache.delete_object(Bucket=self._get_bucket_name(digest), Key=self._get_key(digest))

    @staticmethod
    def _get_key(digest: Digest) -> str:
        """
        Given a `Digest`, returns the key used to store its
        corresponding entry in S3.
        """
        return f"{digest.hash}_{digest.size_bytes}"

    def _get_bucket_name(self, digest: Digest) -> str:
        """Return the formatted bucket name for a given digest.

        This formats the bucket template defined in the configuration file
        using a digest to be stored in the cache. This allows the cache
        contents to be sharded across multiple S3 buckets, allowing us to
        cache more Actions than can be stored in a single S3 bucket.

        Currently the only variable interpolated into the template is
        ``digest``, which contains the hash part of the Digest. A template
        string which includes undefined variables will result in a
        non-functional ActionCache.

        Args:
            digest (Digest): The digest to get the bucket name for.

        Returns:
            str: The bucket name corresponding to the given Digest.

        """
        try:
            return self._bucket_template.format(digest=digest.hash)
        except IndexError:
            LOGGER.error(
                f"Could not generate bucket name for digest=[{digest.hash}]. "
                "This is either a misconfiguration in the BuildGrid S3 "
                "ActionCache bucket configuration, or a badly formed request."
            )
            raise

    def _get_s3object(self, digest: Digest) -> s3utils.S3Object:
        # Bundle bucket name and key into the helper object s3utils expects.
        return s3utils.S3Object(self._get_bucket_name(digest), self._get_key(digest))