Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/actioncache/caches/redis_cache.py: 98.39%
62 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
1# Copyright (C) 2020 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16from typing import Optional
18from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ActionResult, Digest
19from buildgrid.server.actioncache.caches.action_cache_abc import ActionCacheABC
20from buildgrid.server.cas.storage.redis import redis_client_exception_wrapper
21from buildgrid.server.cas.storage.storage_abc import StorageABC
22from buildgrid.server.enums import ActionCacheEntryType
23from buildgrid.server.exceptions import NotFoundError
24from buildgrid.server.logging import buildgrid_logger
25from buildgrid.server.redis.provider import RedisProvider
27LOGGER = buildgrid_logger(__name__)
30class RedisActionCache(ActionCacheABC):
31 def __init__(
32 self,
33 storage: StorageABC,
34 redis: RedisProvider,
35 allow_updates: bool = True,
36 cache_failed_actions: bool = True,
37 entry_type: Optional[ActionCacheEntryType] = ActionCacheEntryType.ACTION_RESULT_DIGEST,
38 migrate_entries: Optional[bool] = False,
39 ) -> None:
40 """Initialises a new ActionCache instance using Redis.
41 Stores the `ActionResult` message as a value.
43 Args:
44 storage (StorageABC): storage backend instance to be used to store ActionResults.
45 redis (RedisProvider): Redis connection provider
46 allow_updates (bool): allow the client to write to storage
47 cache_failed_actions (bool): whether to store failed actions in the Action Cache
48 entry_type (ActionCacheEntryType): whether to store ActionResults or their digests.
49 migrate_entries (bool): if set, migrate entries that contain a value with
50 a different `ActionCacheEntryType` to `entry_type` as they are accessed
51 (False by default).
52 """
53 super().__init__(storage=storage, allow_updates=allow_updates)
55 self._redis = redis
56 self._cache_failed_actions = cache_failed_actions
57 self._entry_type = entry_type
58 self._migrate_entries = migrate_entries
60 @redis_client_exception_wrapper
61 def get_action_result(self, action_digest: Digest) -> ActionResult:
62 key = self._get_key(action_digest)
63 action_result = self._get_action_result(key, action_digest)
64 if action_result is not None:
65 if self.referenced_blobs_still_exist(action_digest, action_result):
66 return action_result
68 if self._allow_updates:
69 LOGGER.debug(
70 "Removing digest from cache due to missing blobs in CAS.", tags=dict(digest=action_digest)
71 )
72 self._redis.execute_rw(lambda r: r.delete(key))
74 raise NotFoundError(f"Key not found: [{key}]")
76 @redis_client_exception_wrapper
77 def update_action_result(self, action_digest: Digest, action_result: ActionResult) -> None:
78 if not self._allow_updates:
79 raise NotImplementedError("Updating cache not allowed")
81 if self._cache_failed_actions or action_result.exit_code == 0:
82 assert self._storage, "Storage used before initialization"
83 action_result_digest = self._storage.put_message(action_result)
85 cache_key = self._get_key(action_digest)
87 if self._entry_type == ActionCacheEntryType.ACTION_RESULT_DIGEST:
88 self._redis.execute_rw(lambda r: r.set(cache_key, action_result_digest.SerializeToString()))
89 else:
90 self._redis.execute_rw(lambda r: r.set(cache_key, action_result.SerializeToString()))
92 LOGGER.info("Result cached for action.", tags=dict(digest=action_digest))
94 def _get_key(self, action_digest: Digest) -> str:
95 return f"action-cache.{action_digest.hash}_{action_digest.size_bytes}"
97 def _get_action_result(self, key: str, action_digest: Digest) -> Optional[ActionResult]:
98 value_in_cache = self._redis.execute_ro(lambda r: r.get(key))
100 if value_in_cache is None:
101 return None
103 # Attempting to parse the entry as a `Digest` first:
104 action_result_digest = Digest.FromString(value_in_cache)
105 if len(action_result_digest.hash) == len(action_digest.hash):
106 # The cache contains the `Digest` of the `ActionResult`:
107 assert self._storage, "Storage used before initialization"
108 action_result = self._storage.get_message(action_result_digest, ActionResult)
110 # If configured, update the entry to contain an `ActionResult`:
111 if self._entry_type == ActionCacheEntryType.ACTION_RESULT and self._migrate_entries:
112 LOGGER.debug("Converting entry from Digest to ActionResult.", tags=dict(digest=action_digest))
113 assert action_result, "Returned result was none"
114 result = action_result.SerializeToString()
115 self._redis.execute_rw(lambda r: r.set(key, result))
116 else:
117 action_result = ActionResult.FromString(value_in_cache)
119 # If configured, update the entry to contain a `Digest`:
120 if self._entry_type == ActionCacheEntryType.ACTION_RESULT_DIGEST and self._migrate_entries:
121 LOGGER.debug("Converting entry from ActionResult to Digest.", tags=dict(digest=action_digest))
122 assert self._storage, "Storage used before initialization"
123 action_result_digest = self._storage.put_message(action_result)
124 self._redis.execute_rw(lambda r: r.set(key, action_result_digest.SerializeToString()))
126 return action_result