Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/actioncache/caches/redis_cache.py: 98.44%

64 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2025-05-21 15:45 +0000

1# Copyright (C) 2020 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ActionResult, Digest 

17from buildgrid.server.actioncache.caches.action_cache_abc import ActionCacheABC 

18from buildgrid.server.cas.storage.redis import redis_client_exception_wrapper 

19from buildgrid.server.cas.storage.storage_abc import StorageABC 

20from buildgrid.server.enums import ActionCacheEntryType 

21from buildgrid.server.exceptions import NotFoundError 

22from buildgrid.server.logging import buildgrid_logger 

23from buildgrid.server.redis.provider import RedisProvider 

24 

# Module-level logger, obtained from `buildgrid_logger` using this module's name.
LOGGER = buildgrid_logger(__name__)

26 

27 

class RedisActionCache(ActionCacheABC):
    """Action cache whose index lives in Redis.

    Every entry maps a key derived from an `Action` digest to one of two
    serialized protobuf values, selected by `entry_type`:

    * the `Digest` of an `ActionResult` held in the storage backend, or
    * the `ActionResult` message itself.

    Entries of either kind are understood on read regardless of the configured
    `entry_type`; see `_get_action_result` for how they are told apart.
    """

    def __init__(
        self,
        storage: StorageABC,
        redis: RedisProvider,
        allow_updates: bool = True,
        cache_failed_actions: bool = True,
        entry_type: ActionCacheEntryType | None = ActionCacheEntryType.ACTION_RESULT_DIGEST,
        migrate_entries: bool | None = False,
        cache_key_salt: str | None = None,
    ) -> None:
        """Initialises a new ActionCache instance using Redis.

        Depending on `entry_type`, the value stored in Redis is either the
        serialized `Digest` of the `ActionResult` (the default) or the
        serialized `ActionResult` message itself.

        Args:
            storage (StorageABC): storage backend instance to be used to store ActionResults.
            redis (RedisProvider): Redis connection provider
            allow_updates (bool): allow the client to write to storage
            cache_failed_actions (bool): whether to store failed actions in the Action Cache
            entry_type (ActionCacheEntryType): whether to store ActionResults or their digests.
            migrate_entries (bool): if set, migrate entries that contain a value with
                a different `ActionCacheEntryType` to `entry_type` as they are accessed
                (False by default).
            cache_key_salt (str): if provided, included in the Redis key for cache entries. Use
                to isolate or share specific chunks of a shared Redis cache.
        """
        super().__init__(storage=storage, allow_updates=allow_updates)

        self._redis = redis
        self._cache_failed_actions = cache_failed_actions
        self._cache_key_salt = cache_key_salt
        self._entry_type = entry_type
        self._migrate_entries = migrate_entries

    @redis_client_exception_wrapper
    def get_action_result(self, action_digest: Digest) -> ActionResult:
        """Look up the cached `ActionResult` for an action.

        Args:
            action_digest (Digest): digest of the `Action` to look up.

        Returns:
            The cached `ActionResult`, provided `referenced_blobs_still_exist`
            accepts it.

        Raises:
            NotFoundError: if there is no usable entry for `action_digest`.
        """
        key = self._get_key(action_digest)
        action_result = self._get_action_result(key, action_digest)
        if action_result is not None:
            if self.referenced_blobs_still_exist(action_digest, action_result):
                return action_result

            # The entry exists but failed the blob check: drop it so later
            # lookups miss straight away. Only done when writes are permitted.
            if self._allow_updates:
                LOGGER.debug(
                    "Removing digest from cache due to missing blobs in CAS.", tags=dict(digest=action_digest)
                )
                self._redis.execute_rw(lambda r: r.delete(key))

        raise NotFoundError(f"Key not found: [{key}]")

    @redis_client_exception_wrapper
    def update_action_result(self, action_digest: Digest, action_result: ActionResult) -> None:
        """Store `action_result` as the cached result for `action_digest`.

        Nothing is written when the result has a non-zero exit code and
        `cache_failed_actions` is disabled.

        Args:
            action_digest (Digest): digest of the `Action` that produced the result.
            action_result (ActionResult): the result to cache.

        Raises:
            NotImplementedError: if this cache was created with `allow_updates=False`.
        """
        if not self._allow_updates:
            raise NotImplementedError("Updating cache not allowed")

        if self._cache_failed_actions or action_result.exit_code == 0:
            assert self._storage, "Storage used before initialization"
            # The message is written to storage for both entry types; only the
            # value placed in Redis differs.
            action_result_digest = self._storage.put_message(action_result)

            cache_key = self._get_key(action_digest)

            if self._entry_type == ActionCacheEntryType.ACTION_RESULT_DIGEST:
                self._redis.execute_rw(lambda r: r.set(cache_key, action_result_digest.SerializeToString()))
            else:
                self._redis.execute_rw(lambda r: r.set(cache_key, action_result.SerializeToString()))

            LOGGER.info("Result cached for action.", tags=dict(digest=action_digest))

    def _get_key(self, action_digest: Digest) -> str:
        """Build the Redis key for `action_digest`.

        The key is `action-cache.<hash>_<size_bytes>`, with the salt inserted
        after the `action-cache.` prefix when one is configured. A salt of
        `None` or the empty string means no salt.
        """
        if not self._cache_key_salt:
            return f"action-cache.{action_digest.hash}_{action_digest.size_bytes}"
        return f"action-cache.{self._cache_key_salt}.{action_digest.hash}_{action_digest.size_bytes}"

    def _get_action_result(self, key: str, action_digest: Digest) -> ActionResult | None:
        """Fetch and decode the entry stored under `key`.

        The stored bytes are first parsed as a `Digest`. When the parsed hash
        has the same length as `action_digest.hash` the entry is treated as the
        digest of an `ActionResult` and the message is fetched from storage;
        otherwise the bytes are parsed as an `ActionResult` directly.

        When `migrate_entries` is set and the entry is not of the configured
        `entry_type`, the Redis value is rewritten in the configured form.

        Args:
            key (str): Redis key of the entry, as produced by `_get_key`.
            action_digest (Digest): digest of the `Action` being looked up.

        Returns:
            The decoded `ActionResult`, or `None` if Redis has no value for
            `key` or storage returns no message for the stored digest.
        """
        value_in_cache = self._redis.execute_ro(lambda r: r.get(key))

        if value_in_cache is None:
            return None

        # Attempting to parse the entry as a `Digest` first:
        action_result_digest = Digest.FromString(value_in_cache)
        if len(action_result_digest.hash) == len(action_digest.hash):
            # The cache contains the `Digest` of the `ActionResult`:
            assert self._storage, "Storage used before initialization"
            action_result = self._storage.get_message(action_result_digest, ActionResult)

            if action_result is None:
                # Storage returned nothing for the stored digest. Report a miss
                # rather than asserting: without migration this case already
                # yields `None`, so migration must not turn it into a crash.
                return None

            # If configured, update the entry to contain an `ActionResult`:
            if self._entry_type == ActionCacheEntryType.ACTION_RESULT and self._migrate_entries:
                LOGGER.debug("Converting entry from Digest to ActionResult.", tags=dict(digest=action_digest))
                result = action_result.SerializeToString()
                self._redis.execute_rw(lambda r: r.set(key, result))
        else:
            action_result = ActionResult.FromString(value_in_cache)

            # If configured, update the entry to contain a `Digest`:
            if self._entry_type == ActionCacheEntryType.ACTION_RESULT_DIGEST and self._migrate_entries:
                LOGGER.debug("Converting entry from ActionResult to Digest.", tags=dict(digest=action_digest))
                assert self._storage, "Storage used before initialization"
                action_result_digest = self._storage.put_message(action_result)
                self._redis.execute_rw(lambda r: r.set(key, action_result_digest.SerializeToString()))

        return action_result