Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/actioncache/caches/s3_cache.py: 92.86%
112 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
1# Copyright (C) 2019 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16"""
17S3 Action Cache
18==================
20Implements an Action Cache using S3 to store cache entries.
22"""
24import io
25import logging
26from typing import Any, Dict, Optional
28import boto3
29from botocore.config import Config as BotoConfig
30from botocore.exceptions import ClientError
32from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ActionResult, Digest
33from buildgrid.server.actioncache.caches.action_cache_abc import ActionCacheABC
34from buildgrid.server.cas.storage.storage_abc import StorageABC
35from buildgrid.server.enums import ActionCacheEntryType
36from buildgrid.server.exceptions import NotFoundError, StorageFullError
37from buildgrid.server.logging import buildgrid_logger
38from buildgrid.server.s3 import s3utils
# Module-level logger obtained via buildgrid's logging wrapper (supports
# structured `tags=` keyword arguments used throughout this module).
LOGGER = buildgrid_logger(__name__)
class S3ActionCache(ActionCacheABC):
    """Action Cache implementation which persists entries in S3.

    Each cache entry is an S3 object keyed on the Action digest. Depending on
    ``entry_type``, the object body is either a serialized ``ActionResult`` or
    the serialized ``Digest`` of an ``ActionResult`` stored in the CAS storage
    backend. Entries of the "other" representation can optionally be migrated
    to the configured representation as they are read (``migrate_entries``).
    """

    def __init__(
        self,
        storage: StorageABC,
        allow_updates: bool = True,
        cache_failed_actions: bool = True,
        entry_type: Optional[ActionCacheEntryType] = ActionCacheEntryType.ACTION_RESULT_DIGEST,
        migrate_entries: Optional[bool] = False,
        bucket: Optional[str] = None,
        endpoint_url: Optional[str] = None,
        aws_access_key_id: Optional[str] = None,
        aws_secret_access_key: Optional[str] = None,
        config: Optional[BotoConfig] = None,
    ):
        """Initialises a new ActionCache instance using S3 to persist the action cache.

        Args:
            storage (StorageABC): storage backend instance to be used to store ActionResults.
            allow_updates (bool): allow the client to write to storage
            cache_failed_actions (bool): whether to store failed actions in the Action Cache
            entry_type (ActionCacheEntryType): whether to store ActionResults or their digests.
            migrate_entries (bool): if set, migrate entries that contain a value with
                a different `EntryType` to `entry_type` as they are accessed
                (False by default).
            bucket (str): Name of bucket (may be a template interpolating ``digest``).
            endpoint_url (str): URL of endpoint.
            aws_access_key_id (str): S3-ACCESS-KEY
            aws_secret_access_key (str): S3-SECRET-KEY
            config (BotoConfig): optional botocore client configuration. When
                omitted, a fresh default ``BotoConfig`` is created per instance.

        Raises:
            ValueError: if ``bucket`` is not provided.
        """
        super().__init__(storage=storage, allow_updates=allow_updates)

        self._entry_type = entry_type
        self._migrate_entries = migrate_entries
        self._cache_failed_actions = cache_failed_actions

        # Validate with an explicit exception rather than `assert`: asserts
        # are stripped under `python -O`, which would let a misconfigured
        # cache continue with a None bucket template.
        if bucket is None:
            raise ValueError("S3ActionCache requires a bucket name")
        self._bucket_template = bucket

        # Boto logs can be very verbose, restrict to WARNING
        for boto_logger_name in ["boto3", "botocore", "s3transfer", "urllib3"]:
            boto_logger = logging.getLogger(boto_logger_name)
            boto_logger.setLevel(max(boto_logger.level, logging.WARNING))

        # Only pass arguments with a value to support testing with moto_server
        client_kwargs: Dict[str, Any] = {}
        if endpoint_url is not None:
            client_kwargs["endpoint_url"] = endpoint_url
        if aws_access_key_id is not None:
            client_kwargs["aws_access_key_id"] = aws_access_key_id
        if aws_secret_access_key is not None:
            client_kwargs["aws_secret_access_key"] = aws_secret_access_key
        # Construct the default config lazily: a `BotoConfig()` default
        # argument would be a single instance shared by every cache created
        # with the default (mutable default argument pitfall).
        client_kwargs["config"] = config if config is not None else BotoConfig()

        self._s3cache = boto3.client("s3", **client_kwargs)

    # --- Public API ---
    @property
    def allow_updates(self) -> bool:
        """Whether clients are allowed to write to this cache."""
        return self._allow_updates

    def get_action_result(self, action_digest: Digest) -> ActionResult:
        """Retrieves the cached ActionResult for the given Action digest.

        Args:
            action_digest: The digest to get the result for

        Returns:
            The cached ActionResult matching the given key or raises
            NotFoundError.
        """
        action_result = self._get_action_result(action_digest)
        if action_result is not None:
            if self.referenced_blobs_still_exist(action_digest, action_result):
                return action_result

            # Stale entry: its referenced blobs have been evicted from CAS.
            # Drop it (when writes are permitted) so clients do not keep
            # receiving a result whose outputs cannot be fetched.
            if self._allow_updates:
                LOGGER.debug(
                    "Removing action from cache due to missing blobs in CAS.",
                    tags=dict(digest=action_digest),
                )
                self._delete_key_from_cache(action_digest)

        raise NotFoundError(f"Key not found: {action_digest.hash}/{action_digest.size_bytes}")

    def update_action_result(self, action_digest: Digest, action_result: ActionResult) -> None:
        """Stores the result in cache for the given key.

        Args:
            action_digest (Digest): digest of Action to update
            action_result (ActionResult): ActionResult to store.

        Raises:
            NotImplementedError: if the cache is configured read-only.
        """
        if not self._allow_updates:
            raise NotImplementedError("Updating cache not allowed")

        # Failed actions (non-zero exit code) are only cached when configured.
        if self._cache_failed_actions or action_result.exit_code == 0:
            assert self._storage, "Storage used before initialization"
            # The full ActionResult always goes to CAS; the S3 entry holds
            # either the result itself or just its digest, per entry_type.
            action_result_digest = self._storage.put_message(action_result)

            if self._entry_type == ActionCacheEntryType.ACTION_RESULT_DIGEST:
                self._update_cache_key(action_digest, action_result_digest.SerializeToString())
            else:
                self._update_cache_key(action_digest, action_result.SerializeToString())

            LOGGER.info("Result cached for action.", tags=dict(digest=action_digest))

    # --- Private API ---
    def _get_action_result(self, digest: Digest) -> Optional[ActionResult]:
        """Get an `ActionResult` from the cache.

        If present, returns the `ActionResult` corresponding to the given digest.
        Otherwise returns None.

        When configured to do so, if the value stored in the entry in S3 contains
        a different type, it will convert it to `entry_type`.

        Args:
            digest: Action digest to get the associated ActionResult digest for

        Returns:
            The `ActionResult` or None if the digest is not present
        """
        value_in_cache = self._get_value_from_cache(digest)
        if not value_in_cache:
            return None

        # Attempting to parse the entry as a `Digest` first; a hash of the
        # same length as the request's hash is taken to mean the entry holds
        # a Digest rather than a full ActionResult.
        action_result_digest = Digest.FromString(value_in_cache)
        if len(action_result_digest.hash) == len(digest.hash):
            # The cache contains the `Digest` of the `ActionResult`:
            assert self._storage, "Storage used before initialization"
            action_result = self._storage.get_message(action_result_digest, ActionResult)

            # If configured, update the entry to contain an `ActionResult`:
            if self._entry_type == ActionCacheEntryType.ACTION_RESULT and self._migrate_entries:
                LOGGER.debug("Converting entry from Digest to ActionResult.", tags=dict(digest=digest))
                assert action_result, "Returned result was none"
                self._update_cache_key(digest, action_result.SerializeToString())
        else:
            action_result = ActionResult.FromString(value_in_cache)

            # If configured, update the entry to contain a `Digest`:
            if self._entry_type == ActionCacheEntryType.ACTION_RESULT_DIGEST and self._migrate_entries:
                LOGGER.debug("Converting entry from ActionResult to Digest.", tags=dict(digest=digest))
                assert self._storage, "Storage used before initialization"
                action_result_digest = self._storage.put_message(action_result)
                self._update_cache_key(digest, action_result_digest.SerializeToString())

        return action_result

    def _get_value_from_cache(self, digest: Digest) -> Optional[bytes]:
        """Get the bytes stored in cache for the given Digest.

        Args:
            digest: Action digest to get the associated bytes in S3

        Returns:
            bytes or None if the digest is not present
        """
        try:
            s3object = self._get_s3object(digest)
            s3object.fileobj = io.BytesIO()
            s3utils.get_object(self._s3cache, s3object)
            s3object.fileobj.seek(0)
            return s3object.fileobj.read()
        except ClientError as e:
            # A missing key is an expected cache miss; anything else is a
            # genuine S3 failure and must propagate.
            if e.response["Error"]["Code"] not in ["404", "NoSuchKey"]:
                raise
            return None

    def _update_cache_key(self, digest: Digest, value: bytes) -> None:
        """Write ``value`` to the S3 entry for ``digest``.

        Raises:
            StorageFullError: if S3 reports the quota as exceeded.
        """
        try:
            s3object = self._get_s3object(digest)
            s3object.fileobj = io.BytesIO(value)
            s3object.filesize = len(value)
            s3utils.put_object(self._s3cache, s3object)
        except ClientError as error:
            if error.response["Error"]["Code"] == "QuotaExceededException":
                raise StorageFullError("ActionCache S3 Quota Exceeded.") from error
            raise error

    def _delete_key_from_cache(self, digest: Digest) -> None:
        """Remove an entry from the ActionCache

        Args:
            digest: entry to remove from the ActionCache

        Returns:
            None

        Raises:
            NotImplementedError: if the cache is configured read-only.
        """
        if not self._allow_updates:
            raise NotImplementedError("Updating cache not allowed")

        self._s3cache.delete_object(Bucket=self._get_bucket_name(digest), Key=self._get_key(digest))

    @staticmethod
    def _get_key(digest: Digest) -> str:
        """
        Given a `Digest`, returns the key used to store its
        corresponding entry in S3.
        """
        return f"{digest.hash}_{digest.size_bytes}"

    def _get_bucket_name(self, digest: Digest) -> str:
        """Return the formatted bucket name for a given digest.

        This formats the bucket template defined in the configuration file
        using a digest to be stored in the cache. This allows the cache
        contents to be sharded across multiple S3 buckets, allowing us to
        cache more Actions than can be stored in a single S3 bucket.

        Currently the only variable interpolated into the template is
        ``digest``, which contains the hash part of the Digest. A template
        string which includes undefined variables will result in a
        non-functional ActionCache.

        Args:
            digest (Digest): The digest to get the bucket name for.

        Returns:
            str: The bucket name corresponding to the given Digest.
        """
        try:
            return self._bucket_template.format(digest=digest.hash)
        except IndexError:
            LOGGER.error(
                (
                    "Could not generate bucket name for digest. "
                    "This is either a misconfiguration in the BuildGrid S3 "
                    "ActionCache bucket configuration, or a badly formed request."
                ),
                tags=dict(digest=digest),
            )
            raise

    def _get_s3object(self, digest: Digest) -> s3utils.S3Object:
        """Build an ``S3Object`` locator (bucket + key) for ``digest``."""
        return s3utils.S3Object(self._get_bucket_name(digest), self._get_key(digest))