Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/actioncache/caches/redis_cache.py: 98.44%
64 statements
« prev ^ index » next coverage.py v7.4.1, created at 2025-05-21 15:45 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2025-05-21 15:45 +0000
1# Copyright (C) 2020 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ActionResult, Digest
17from buildgrid.server.actioncache.caches.action_cache_abc import ActionCacheABC
18from buildgrid.server.cas.storage.redis import redis_client_exception_wrapper
19from buildgrid.server.cas.storage.storage_abc import StorageABC
20from buildgrid.server.enums import ActionCacheEntryType
21from buildgrid.server.exceptions import NotFoundError
22from buildgrid.server.logging import buildgrid_logger
23from buildgrid.server.redis.provider import RedisProvider
25LOGGER = buildgrid_logger(__name__)
28class RedisActionCache(ActionCacheABC):
29 def __init__(
30 self,
31 storage: StorageABC,
32 redis: RedisProvider,
33 allow_updates: bool = True,
34 cache_failed_actions: bool = True,
35 entry_type: ActionCacheEntryType | None = ActionCacheEntryType.ACTION_RESULT_DIGEST,
36 migrate_entries: bool | None = False,
37 cache_key_salt: str | None = None,
38 ) -> None:
39 """Initialises a new ActionCache instance using Redis.
40 Stores the `ActionResult` message as a value.
42 Args:
43 storage (StorageABC): storage backend instance to be used to store ActionResults.
44 redis (RedisProvider): Redis connection provider
45 allow_updates (bool): allow the client to write to storage
46 cache_failed_actions (bool): whether to store failed actions in the Action Cache
47 entry_type (ActionCacheEntryType): whether to store ActionResults or their digests.
48 migrate_entries (bool): if set, migrate entries that contain a value with
49 a different `ActionCacheEntryType` to `entry_type` as they are accessed
50 (False by default).
51 cache_key_salt (str): if provided, included in the Redis key for cache entries. Use
52 to isolate or share specific chunks of a shared Redis cache.
53 """
54 super().__init__(storage=storage, allow_updates=allow_updates)
56 self._redis = redis
57 self._cache_failed_actions = cache_failed_actions
58 self._cache_key_salt = cache_key_salt
59 self._entry_type = entry_type
60 self._migrate_entries = migrate_entries
62 @redis_client_exception_wrapper
63 def get_action_result(self, action_digest: Digest) -> ActionResult:
64 key = self._get_key(action_digest)
65 action_result = self._get_action_result(key, action_digest)
66 if action_result is not None:
67 if self.referenced_blobs_still_exist(action_digest, action_result):
68 return action_result
70 if self._allow_updates:
71 LOGGER.debug(
72 "Removing digest from cache due to missing blobs in CAS.", tags=dict(digest=action_digest)
73 )
74 self._redis.execute_rw(lambda r: r.delete(key))
76 raise NotFoundError(f"Key not found: [{key}]")
78 @redis_client_exception_wrapper
79 def update_action_result(self, action_digest: Digest, action_result: ActionResult) -> None:
80 if not self._allow_updates:
81 raise NotImplementedError("Updating cache not allowed")
83 if self._cache_failed_actions or action_result.exit_code == 0:
84 assert self._storage, "Storage used before initialization"
85 action_result_digest = self._storage.put_message(action_result)
87 cache_key = self._get_key(action_digest)
89 if self._entry_type == ActionCacheEntryType.ACTION_RESULT_DIGEST:
90 self._redis.execute_rw(lambda r: r.set(cache_key, action_result_digest.SerializeToString()))
91 else:
92 self._redis.execute_rw(lambda r: r.set(cache_key, action_result.SerializeToString()))
94 LOGGER.info("Result cached for action.", tags=dict(digest=action_digest))
96 def _get_key(self, action_digest: Digest) -> str:
97 if not self._cache_key_salt:
98 return f"action-cache.{action_digest.hash}_{action_digest.size_bytes}"
99 return f"action-cache.{self._cache_key_salt}.{action_digest.hash}_{action_digest.size_bytes}"
101 def _get_action_result(self, key: str, action_digest: Digest) -> ActionResult | None:
102 value_in_cache = self._redis.execute_ro(lambda r: r.get(key))
104 if value_in_cache is None:
105 return None
107 # Attempting to parse the entry as a `Digest` first:
108 action_result_digest = Digest.FromString(value_in_cache)
109 if len(action_result_digest.hash) == len(action_digest.hash):
110 # The cache contains the `Digest` of the `ActionResult`:
111 assert self._storage, "Storage used before initialization"
112 action_result = self._storage.get_message(action_result_digest, ActionResult)
114 # If configured, update the entry to contain an `ActionResult`:
115 if self._entry_type == ActionCacheEntryType.ACTION_RESULT and self._migrate_entries:
116 LOGGER.debug("Converting entry from Digest to ActionResult.", tags=dict(digest=action_digest))
117 assert action_result, "Returned result was none"
118 result = action_result.SerializeToString()
119 self._redis.execute_rw(lambda r: r.set(key, result))
120 else:
121 action_result = ActionResult.FromString(value_in_cache)
123 # If configured, update the entry to contain a `Digest`:
124 if self._entry_type == ActionCacheEntryType.ACTION_RESULT_DIGEST and self._migrate_entries:
125 LOGGER.debug("Converting entry from ActionResult to Digest.", tags=dict(digest=action_digest))
126 assert self._storage, "Storage used before initialization"
127 action_result_digest = self._storage.put_message(action_result)
128 self._redis.execute_rw(lambda r: r.set(key, action_result_digest.SerializeToString()))
130 return action_result