Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/actioncache/caches/sharded_cache.py: 86.11%

36 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2025-07-10 13:10 +0000

1# Copyright (C) 2025 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16from contextlib import ExitStack 

17 

18import mmh3 

19 

20from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ActionResult, Digest 

21from buildgrid.server.actioncache.caches.action_cache_abc import ActionCacheABC 

22from buildgrid.server.logging import buildgrid_logger 

23 

24LOGGER = buildgrid_logger(__name__) 

25 

26 

27class ShardedActionCache(ActionCacheABC): 

28 """ 

29 

30 This is a wrapper intended to be used to compose an Action Cache of 

31 multiple other Action Caches, e.g. for sharding a Redis Action Cache. 

32 

33 Requests are forwarded on to a specific shard based on the shard name, 

34 instance name, and Action Digest. 

35 

36 """ 

37 

38 def __init__( 

39 self, 

40 shards: dict[str, ActionCacheABC], 

41 allow_updates: bool = True, 

42 cache_failed_actions: bool = True, 

43 cache_key_salt: str | None = None, 

44 ) -> None: 

45 """Initialise a new sharded Action Cache. 

46 

47 Args: 

48 shards (dict[str, ActionCacheABC]): Mapping of shard name to cache shard. 

49 

50 allow_updates (bool): Allow updates to be pushed to the Action Cache. 

51 Individual shards may specify this separately, which will effectively 

52 override the value defined here for that shard. Defaults to ``True``. 

53 

54 cache_failed_actions (bool): Whether to store failed (non-zero exit 

55 code) actions. Individual shards may specify this separately, which will 

56 effectively override the value defined here for that shard. Default to 

57 ``True``. 

58 

59 cache_key_salt (str): If provided, included in the Redis key for cache entries. Use 

60 to isolate or share specific chunks of a shared Redis cache. 

61 

62 """ 

63 super().__init__(allow_updates=allow_updates) 

64 self._stack = ExitStack() 

65 self._cache_failed_actions = cache_failed_actions 

66 self._cache_key_salt = cache_key_salt 

67 self._shards = shards 

68 

69 def start(self) -> None: 

70 for shard in self._shards.values(): 

71 self._stack.enter_context(shard) 

72 

73 def stop(self) -> None: 

74 self._stack.close() 

75 

76 def _cache_from_digest(self, digest: Digest) -> ActionCacheABC: 

77 def _score(shard_name: str, digest: Digest) -> int: 

78 key = self._get_key(digest) 

79 hash = mmh3.hash(f"{shard_name}\t{key}", signed=False) 

80 return hash 

81 

82 shard_name = min(self._shards.keys(), key=lambda name: _score(name, digest)) 

83 return self._shards[shard_name] 

84 

85 def get_action_result(self, action_digest: Digest) -> ActionResult: 

86 """Retrieves the cached result for an Action. 

87 

88 Determines the expected shard, and attempts to retrieve the ActionResult from 

89 that shard. If the result is not found, a NotFoundError is raised. 

90 

91 Args: 

92 action_digest (Digest): The digest of the Action to retrieve the 

93 cached result of. 

94 

95 """ 

96 return self._cache_from_digest(action_digest).get_action_result(action_digest) 

97 

98 def update_action_result(self, action_digest: Digest, action_result: ActionResult) -> None: 

99 """Stores a result for an Action in the cache. 

100 

101 Determines which shard a result should be stored in, and attempts to store 

102 it. If the result has a non-zero exit code and `cache_failed_actions` is False 

103 for either this shard or the whole cache, the result is not cached. 

104 

105 Args: 

106 action_digest (Digest): The digest of the Action whose result is 

107 being cached. 

108 

109 action_result (ActionResult): The result to cache for the given 

110 Action digest. 

111 

112 """ 

113 if not self._allow_updates: 

114 raise NotImplementedError("Updating cache not allowed") 

115 

116 if self._cache_failed_actions or action_result.exit_code == 0: 

117 self._cache_from_digest(action_digest).update_action_result(action_digest, action_result) 

118 

119 def _get_key(self, action_digest: Digest) -> str: 

120 if self._cache_key_salt is None: 

121 return action_digest.hash 

122 return f"{self._cache_key_salt}\t{action_digest.hash}"