Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/actioncache/caches/sharded_cache.py: 86.11%
36 statements
« prev ^ index » next coverage.py v7.4.1, created at 2025-07-10 13:10 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2025-07-10 13:10 +0000
1# Copyright (C) 2025 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16from contextlib import ExitStack
18import mmh3
20from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ActionResult, Digest
21from buildgrid.server.actioncache.caches.action_cache_abc import ActionCacheABC
22from buildgrid.server.logging import buildgrid_logger
# Module-level logger, obtained from buildgrid_logger using this module's name.
24LOGGER = buildgrid_logger(__name__)
class ShardedActionCache(ActionCacheABC):
    """Action Cache that spreads entries over several underlying Action Caches.

    This is a wrapper intended to be used to compose an Action Cache of
    multiple other Action Caches, e.g. for sharding a Redis Action Cache.

    Requests are forwarded on to a specific shard. The shard is chosen from
    the shard names, the optional cache key salt, and the hash of the Action
    Digest, so the same digest is always routed to the same shard for a
    given set of shard names and salt.

    """

    def __init__(
        self,
        shards: dict[str, ActionCacheABC],
        allow_updates: bool = True,
        cache_failed_actions: bool = True,
        cache_key_salt: str | None = None,
    ) -> None:
        """Initialise a new sharded Action Cache.

        Args:
            shards (dict[str, ActionCacheABC]): Mapping of shard name to cache shard.
                The shard names take part in shard selection, so renaming a shard
                changes which digests are routed to it.

            allow_updates (bool): Allow updates to be pushed to the Action Cache.
                This is checked here before an update is forwarded; the shard that
                receives the update may additionally apply its own setting.
                Defaults to ``True``.

            cache_failed_actions (bool): Whether to store failed (non-zero exit
                code) actions. This is checked here before an update is forwarded;
                the shard that receives the update may additionally apply its own
                setting. Defaults to ``True``.

            cache_key_salt (str): If provided, prefixed to the Action Digest hash
                when building the key that is hashed for shard selection.

        """
        super().__init__(allow_updates=allow_updates)
        # Holds the entered shard contexts so that stop() can exit all of them.
        self._stack = ExitStack()
        self._cache_failed_actions = cache_failed_actions
        self._cache_key_salt = cache_key_salt
        self._shards = shards

    def start(self) -> None:
        """Enter the context of every shard, registering each on the exit stack."""
        for shard in self._shards.values():
            self._stack.enter_context(shard)

    def stop(self) -> None:
        """Exit the contexts of all shards entered by ``start``."""
        self._stack.close()

    def _cache_from_digest(self, digest: Digest) -> ActionCacheABC:
        """Return the shard responsible for the given Action Digest.

        Every shard name is scored by hashing ``"<shard name>\\t<key>"`` with
        MurmurHash3 (unsigned 32-bit), and the shard with the lowest score is
        selected (rendezvous-style hashing).

        Args:
            digest (Digest): The Action Digest to select a shard for.

        Raises:
            ValueError: If no shards are configured.

        """
        if not self._shards:
            # min() would raise an opaque ValueError here; keep the exception
            # type but make the cause obvious.
            raise ValueError("ShardedActionCache has no shards configured")

        # The key depends only on the digest, so compute it once rather than
        # once per shard.
        key = self._get_key(digest)

        def _score(shard_name: str) -> int:
            return mmh3.hash(f"{shard_name}\t{key}", signed=False)

        return self._shards[min(self._shards, key=_score)]

    def get_action_result(self, action_digest: Digest) -> ActionResult:
        """Retrieves the cached result for an Action.

        Determines the expected shard, and attempts to retrieve the ActionResult from
        that shard. Whatever the shard returns or raises is passed through
        unchanged (a shard is expected to raise a NotFoundError when it has no
        result for the digest).

        Args:
            action_digest (Digest): The digest of the Action to retrieve the
                cached result of.

        """
        return self._cache_from_digest(action_digest).get_action_result(action_digest)

    def update_action_result(self, action_digest: Digest, action_result: ActionResult) -> None:
        """Stores a result for an Action in the cache.

        Determines which shard a result should be stored in, and attempts to store
        it. If the result has a non-zero exit code and `cache_failed_actions` is False
        for this cache, the result is silently not forwarded to any shard.

        Args:
            action_digest (Digest): The digest of the Action whose result is
                being cached.

            action_result (ActionResult): The result to cache for the given
                Action digest.

        Raises:
            NotImplementedError: If updates are not allowed for this cache.

        """
        if not self._allow_updates:
            raise NotImplementedError("Updating cache not allowed")

        if self._cache_failed_actions or action_result.exit_code == 0:
            self._cache_from_digest(action_digest).update_action_result(action_digest, action_result)

    def _get_key(self, action_digest: Digest) -> str:
        """Return the key hashed for shard selection.

        This is the Action Digest hash, prefixed with the cache key salt and a
        tab character when a salt is configured.
        """
        if self._cache_key_salt is None:
            return action_digest.hash
        return f"{self._cache_key_salt}\t{action_digest.hash}"