Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/actioncache/caches/redis_cache.py: 98.46%
65 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-03-28 16:20 +0000
1# Copyright (C) 2020 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15import logging
16from typing import Optional
18from buildgrid._enums import ActionCacheEntryType
19from buildgrid._exceptions import NotFoundError
20from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ActionResult, Digest
21from buildgrid.server.actioncache.caches.action_cache_abc import ActionCacheABC
22from buildgrid.server.cas.storage.redis import redis_client_exception_wrapper
23from buildgrid.server.cas.storage.storage_abc import StorageABC
24from buildgrid.server.metrics_names import AC_UNUSABLE_CACHE_HITS_METRIC_NAME
25from buildgrid.server.metrics_utils import publish_counter_metric
26from buildgrid.server.redis.provider import RedisProvider
28LOGGER = logging.getLogger(__name__)
class RedisActionCache(ActionCacheABC):
    """Action cache whose index lives in Redis.

    Each Redis key maps an action digest to either the serialized
    `ActionResult` itself or the serialized `Digest` of an `ActionResult`
    stored in the storage backend, depending on the configured entry type.
    """

    def __init__(
        self,
        storage: StorageABC,
        redis: RedisProvider,
        allow_updates: bool = True,
        cache_failed_actions: bool = True,
        entry_type: Optional[ActionCacheEntryType] = ActionCacheEntryType.ACTION_RESULT_DIGEST,
        migrate_entries: Optional[bool] = False,
    ) -> None:
        """Initialises a new ActionCache instance using Redis.

        Depending on `entry_type`, the value written to Redis is either the
        `ActionResult` message or the `Digest` of that message (the default).

        Args:
            storage (StorageABC): storage backend instance to be used to store ActionResults.
            redis (RedisProvider): Redis connection provider
            allow_updates (bool): allow the client to write to storage
            cache_failed_actions (bool): whether to store failed actions in the Action Cache
            entry_type (ActionCacheEntryType): whether to store ActionResults or their digests.
            migrate_entries (bool): if set, migrate entries that contain a value with
                a different `ActionCacheEntryType` to `entry_type` as they are accessed
                (False by default).
        """
        super().__init__(storage=storage, allow_updates=allow_updates)

        self._redis = redis
        self._cache_failed_actions = cache_failed_actions
        self._entry_type = entry_type
        self._migrate_entries = migrate_entries

    @redis_client_exception_wrapper
    def get_action_result(self, action_digest: Digest) -> ActionResult:
        """Return the cached `ActionResult` for `action_digest`.

        An entry is only returned when
        `_action_result_blobs_still_exist` accepts it. Otherwise the
        unusable-hit metric is published and, if updates are allowed, the
        stale key is deleted from Redis.

        Raises:
            NotFoundError: if there is no entry, or the entry is unusable.
        """
        key = self._get_key(action_digest)
        action_result = self._get_action_result(key, action_digest)
        if action_result is not None:
            if self._action_result_blobs_still_exist(action_result):
                return action_result

            publish_counter_metric(AC_UNUSABLE_CACHE_HITS_METRIC_NAME, 1, {"instance_name": self._instance_name})

            if self._allow_updates:
                # The trailing space keeps the two literals from running
                # together ("<size>from cache") in the emitted message.
                LOGGER.debug(
                    f"Removing {action_digest.hash}/{action_digest.size_bytes} "
                    "from cache due to missing blobs in CAS"
                )
                self._redis.execute_rw(lambda r: r.delete(key))

        raise NotFoundError(f"Key not found: [{key}]")

    @redis_client_exception_wrapper
    def update_action_result(self, action_digest: Digest, action_result: ActionResult) -> None:
        """Store `action_result` under the key derived from `action_digest`.

        Results with a non-zero exit code are skipped unless
        `cache_failed_actions` was enabled. The message is always written to
        the storage backend; the Redis value is its digest or the message
        itself, according to the configured entry type.

        Raises:
            NotImplementedError: if this cache was created with updates disabled.
        """
        if not self._allow_updates:
            raise NotImplementedError("Updating cache not allowed")

        if self._cache_failed_actions or action_result.exit_code == 0:
            assert self._storage, "Storage used before initialization"
            action_result_digest = self._storage.put_message(action_result)

            cache_key = self._get_key(action_digest)

            if self._entry_type == ActionCacheEntryType.ACTION_RESULT_DIGEST:
                self._redis.execute_rw(lambda r: r.set(cache_key, action_result_digest.SerializeToString()))
            else:
                self._redis.execute_rw(lambda r: r.set(cache_key, action_result.SerializeToString()))

            LOGGER.info(f"Result cached for action [{action_digest.hash}/{action_digest.size_bytes}]")

    def _get_key(self, action_digest: Digest) -> str:
        """Build the Redis key for an action digest from its hash and size."""
        return f"action-cache.{action_digest.hash}_{action_digest.size_bytes}"

    def _get_action_result(self, key: str, action_digest: Digest) -> Optional[ActionResult]:
        """Read the entry at `key` and resolve it to an `ActionResult`.

        The stored bytes may be a `Digest` or an `ActionResult`. When
        `migrate_entries` is set and the stored form differs from the
        configured entry type, the entry is rewritten in the configured form.

        Returns:
            The `ActionResult`, or None when Redis has no value for `key` or
            the referenced message could not be fetched from storage.
        """
        value_in_cache = self._redis.execute_ro(lambda r: r.get(key))

        if value_in_cache is None:
            return None

        # Attempting to parse the entry as a `Digest` first. The value is
        # treated as a digest when the parsed hash has the same length as the
        # hash of the requested action digest.
        action_result_digest = Digest.FromString(value_in_cache)
        if len(action_result_digest.hash) == len(action_digest.hash):
            # The cache contains the `Digest` of the `ActionResult`:
            assert self._storage, "Storage used before initialization"
            action_result = self._storage.get_message(action_result_digest, ActionResult)

            # NOTE(review): presumably `get_message` returns None when the
            # blob is no longer in storage (the previous code asserted on
            # it). Report that as a miss instead of failing the migration
            # below with an AssertionError / AttributeError.
            if action_result is None:
                return None

            # If configured, update the entry to contain an `ActionResult`:
            if self._entry_type == ActionCacheEntryType.ACTION_RESULT and self._migrate_entries:
                LOGGER.debug(
                    f"Converting entry for {action_digest.hash}/{action_digest.size_bytes} "
                    "from Digest to ActionResult"
                )
                result = action_result.SerializeToString()
                self._redis.execute_rw(lambda r: r.set(key, result))
        else:
            action_result = ActionResult.FromString(value_in_cache)

            # If configured, update the entry to contain a `Digest`:
            if self._entry_type == ActionCacheEntryType.ACTION_RESULT_DIGEST and self._migrate_entries:
                LOGGER.debug(
                    f"Converting entry for {action_digest.hash}/{action_digest.size_bytes} "
                    "from ActionResult to Digest"
                )
                assert self._storage, "Storage used before initialization"
                action_result_digest = self._storage.put_message(action_result)
                self._redis.execute_rw(lambda r: r.set(key, action_result_digest.SerializeToString()))

        return action_result