Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/actioncache/caches/redis_cache.py: 98.39%

62 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

1# Copyright (C) 2020 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16from typing import Optional 

17 

18from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ActionResult, Digest 

19from buildgrid.server.actioncache.caches.action_cache_abc import ActionCacheABC 

20from buildgrid.server.cas.storage.redis import redis_client_exception_wrapper 

21from buildgrid.server.cas.storage.storage_abc import StorageABC 

22from buildgrid.server.enums import ActionCacheEntryType 

23from buildgrid.server.exceptions import NotFoundError 

24from buildgrid.server.logging import buildgrid_logger 

25from buildgrid.server.redis.provider import RedisProvider 

26 

# Module-level logger shared by every RedisActionCache instance in this file.
LOGGER = buildgrid_logger(__name__)

28 

29 

class RedisActionCache(ActionCacheABC):
    """Action cache whose index lives in Redis.

    Every entry is stored under a key derived from the action digest (see
    `_get_key`). The value is either a serialized `ActionResult` or the
    serialized `Digest` of an `ActionResult` held in the storage backend,
    depending on the configured `entry_type`.
    """

    def __init__(
        self,
        storage: StorageABC,
        redis: RedisProvider,
        allow_updates: bool = True,
        cache_failed_actions: bool = True,
        entry_type: Optional[ActionCacheEntryType] = ActionCacheEntryType.ACTION_RESULT_DIGEST,
        migrate_entries: Optional[bool] = False,
    ) -> None:
        """Initialises a new ActionCache instance using Redis.

        Depending on `entry_type`, the Redis value is either the serialized
        `ActionResult` itself or the serialized `Digest` of the `ActionResult`.

        Args:
            storage (StorageABC): storage backend instance to be used to store ActionResults.
            redis (RedisProvider): Redis connection provider
            allow_updates (bool): allow the client to write to storage
            cache_failed_actions (bool): whether to store failed actions in the Action Cache
            entry_type (ActionCacheEntryType): whether to store ActionResults or their digests.
            migrate_entries (bool): if set, migrate entries that contain a value with
                a different `ActionCacheEntryType` to `entry_type` as they are accessed
                (False by default).
        """
        super().__init__(storage=storage, allow_updates=allow_updates)

        self._redis = redis
        self._cache_failed_actions = cache_failed_actions
        self._entry_type = entry_type
        self._migrate_entries = migrate_entries

    @redis_client_exception_wrapper
    def get_action_result(self, action_digest: Digest) -> ActionResult:
        """Return the cached `ActionResult` for `action_digest`.

        Args:
            action_digest (Digest): digest of the action to look up.

        Returns:
            The cached `ActionResult`, provided `referenced_blobs_still_exist`
            accepts it.

        Raises:
            NotFoundError: if there is no usable entry for the action. When an
                entry is found but rejected by `referenced_blobs_still_exist`
                and updates are allowed, the Redis key is deleted first.
        """
        key = self._get_key(action_digest)
        action_result = self._get_action_result(key, action_digest)
        if action_result is not None:
            if self.referenced_blobs_still_exist(action_digest, action_result):
                return action_result

            # The entry is stale: drop it, but only if this cache is writable.
            if self._allow_updates:
                LOGGER.debug(
                    "Removing digest from cache due to missing blobs in CAS.", tags=dict(digest=action_digest)
                )
                self._redis.execute_rw(lambda r: r.delete(key))

        raise NotFoundError(f"Key not found: [{key}]")

    @redis_client_exception_wrapper
    def update_action_result(self, action_digest: Digest, action_result: ActionResult) -> None:
        """Store `action_result` as the cached result for `action_digest`.

        The result is always written to the storage backend; the Redis value is
        then either the digest returned by storage or the serialized result,
        according to `entry_type`. Results with a non-zero exit code are
        skipped unless `cache_failed_actions` is set.

        Args:
            action_digest (Digest): digest of the action the result belongs to.
            action_result (ActionResult): result to cache.

        Raises:
            NotImplementedError: if this cache was created with updates disabled.
        """
        if not self._allow_updates:
            raise NotImplementedError("Updating cache not allowed")

        if self._cache_failed_actions or action_result.exit_code == 0:
            assert self._storage, "Storage used before initialization"
            action_result_digest = self._storage.put_message(action_result)

            cache_key = self._get_key(action_digest)

            # Serialize once, outside the callback handed to the Redis provider.
            if self._entry_type == ActionCacheEntryType.ACTION_RESULT_DIGEST:
                payload = action_result_digest.SerializeToString()
            else:
                payload = action_result.SerializeToString()
            self._redis.execute_rw(lambda r: r.set(cache_key, payload))

            LOGGER.info("Result cached for action.", tags=dict(digest=action_digest))

    def _get_key(self, action_digest: Digest) -> str:
        """Build the Redis key for `action_digest` from its hash and size."""
        return f"action-cache.{action_digest.hash}_{action_digest.size_bytes}"

    def _get_action_result(self, key: str, action_digest: Digest) -> Optional[ActionResult]:
        """Fetch and decode the entry stored under `key`.

        The raw value may be a serialized `Digest` or a serialized
        `ActionResult`. It is treated as a `Digest` when, parsed as one, its
        hash has the same length as `action_digest.hash`. If `migrate_entries`
        is set and the stored form differs from `entry_type`, the entry is
        rewritten in the configured form.

        Args:
            key (str): Redis key, as produced by `_get_key`.
            action_digest (Digest): digest of the action being looked up.

        Returns:
            The decoded `ActionResult`, or None when Redis has no value for
            `key` or the storage backend no longer returns the referenced
            message.
        """
        value_in_cache = self._redis.execute_ro(lambda r: r.get(key))

        if value_in_cache is None:
            return None

        # Attempting to parse the entry as a `Digest` first:
        action_result_digest = Digest.FromString(value_in_cache)
        if len(action_result_digest.hash) == len(action_digest.hash):
            # The cache contains the `Digest` of the `ActionResult`:
            assert self._storage, "Storage used before initialization"
            action_result = self._storage.get_message(action_result_digest, ActionResult)

            if action_result is None:
                # Presumably the blob is no longer in storage. Report a miss
                # rather than failing while trying to migrate a missing message.
                return None

            # If configured, update the entry to contain an `ActionResult`:
            if self._entry_type == ActionCacheEntryType.ACTION_RESULT and self._migrate_entries:
                LOGGER.debug("Converting entry from Digest to ActionResult.", tags=dict(digest=action_digest))
                serialized_result = action_result.SerializeToString()
                self._redis.execute_rw(lambda r: r.set(key, serialized_result))

            return action_result

        # The cache contains the serialized `ActionResult` itself:
        action_result = ActionResult.FromString(value_in_cache)

        # If configured, update the entry to contain a `Digest`:
        if self._entry_type == ActionCacheEntryType.ACTION_RESULT_DIGEST and self._migrate_entries:
            LOGGER.debug("Converting entry from ActionResult to Digest.", tags=dict(digest=action_digest))
            assert self._storage, "Storage used before initialization"
            stored_digest = self._storage.put_message(action_result)
            serialized_digest = stored_digest.SerializeToString()
            self._redis.execute_rw(lambda r: r.set(key, serialized_digest))

        return action_result