Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/actioncache/caches/redis_cache.py: 98.46%

65 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-03-28 16:20 +0000

1# Copyright (C) 2020 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15import logging 

16from typing import Optional 

17 

18from buildgrid._enums import ActionCacheEntryType 

19from buildgrid._exceptions import NotFoundError 

20from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ActionResult, Digest 

21from buildgrid.server.actioncache.caches.action_cache_abc import ActionCacheABC 

22from buildgrid.server.cas.storage.redis import redis_client_exception_wrapper 

23from buildgrid.server.cas.storage.storage_abc import StorageABC 

24from buildgrid.server.metrics_names import AC_UNUSABLE_CACHE_HITS_METRIC_NAME 

25from buildgrid.server.metrics_utils import publish_counter_metric 

26from buildgrid.server.redis.provider import RedisProvider 

27 

28LOGGER = logging.getLogger(__name__) 

29 

30 

31class RedisActionCache(ActionCacheABC): 

32 def __init__( 

33 self, 

34 storage: StorageABC, 

35 redis: RedisProvider, 

36 allow_updates: bool = True, 

37 cache_failed_actions: bool = True, 

38 entry_type: Optional[ActionCacheEntryType] = ActionCacheEntryType.ACTION_RESULT_DIGEST, 

39 migrate_entries: Optional[bool] = False, 

40 ) -> None: 

41 """Initialises a new ActionCache instance using Redis. 

42 Stores the `ActionResult` message as a value. 

43 

44 Args: 

45 storage (StorageABC): storage backend instance to be used to store ActionResults. 

46 redis (RedisProvider): Redis connection provider 

47 allow_updates (bool): allow the client to write to storage 

48 cache_failed_actions (bool): whether to store failed actions in the Action Cache 

49 entry_type (ActionCacheEntryType): whether to store ActionResults or their digests. 

50 migrate_entries (bool): if set, migrate entries that contain a value with 

51 a different `ActionCacheEntryType` to `entry_type` as they are accessed 

52 (False by default). 

53 """ 

54 super().__init__(storage=storage, allow_updates=allow_updates) 

55 

56 self._redis = redis 

57 self._cache_failed_actions = cache_failed_actions 

58 self._entry_type = entry_type 

59 self._migrate_entries = migrate_entries 

60 

61 @redis_client_exception_wrapper 

62 def get_action_result(self, action_digest: Digest) -> ActionResult: 

63 key = self._get_key(action_digest) 

64 action_result = self._get_action_result(key, action_digest) 

65 if action_result is not None: 

66 if self._action_result_blobs_still_exist(action_result): 

67 return action_result 

68 

69 publish_counter_metric(AC_UNUSABLE_CACHE_HITS_METRIC_NAME, 1, {"instance_name": self._instance_name}) 

70 

71 if self._allow_updates: 

72 LOGGER.debug( 

73 f"Removing {action_digest.hash}/{action_digest.size_bytes}" 

74 "from cache due to missing blobs in CAS" 

75 ) 

76 self._redis.execute_rw(lambda r: r.delete(key)) 

77 

78 raise NotFoundError(f"Key not found: [{key}]") 

79 

80 @redis_client_exception_wrapper 

81 def update_action_result(self, action_digest: Digest, action_result: ActionResult) -> None: 

82 if not self._allow_updates: 

83 raise NotImplementedError("Updating cache not allowed") 

84 

85 if self._cache_failed_actions or action_result.exit_code == 0: 

86 assert self._storage, "Storage used before initialization" 

87 action_result_digest = self._storage.put_message(action_result) 

88 

89 cache_key = self._get_key(action_digest) 

90 

91 if self._entry_type == ActionCacheEntryType.ACTION_RESULT_DIGEST: 

92 self._redis.execute_rw(lambda r: r.set(cache_key, action_result_digest.SerializeToString())) 

93 else: 

94 self._redis.execute_rw(lambda r: r.set(cache_key, action_result.SerializeToString())) 

95 

96 LOGGER.info(f"Result cached for action [{action_digest.hash}/{action_digest.size_bytes}]") 

97 

98 def _get_key(self, action_digest: Digest) -> str: 

99 return f"action-cache.{action_digest.hash}_{action_digest.size_bytes}" 

100 

101 def _get_action_result(self, key: str, action_digest: Digest) -> Optional[ActionResult]: 

102 value_in_cache = self._redis.execute_ro(lambda r: r.get(key)) 

103 

104 if value_in_cache is None: 

105 return None 

106 

107 # Attempting to parse the entry as a `Digest` first: 

108 action_result_digest = Digest.FromString(value_in_cache) 

109 if len(action_result_digest.hash) == len(action_digest.hash): 

110 # The cache contains the `Digest` of the `ActionResult`: 

111 assert self._storage, "Storage used before initialization" 

112 action_result = self._storage.get_message(action_result_digest, ActionResult) 

113 

114 # If configured, update the entry to contain an `ActionResult`: 

115 if self._entry_type == ActionCacheEntryType.ACTION_RESULT and self._migrate_entries: 

116 LOGGER.debug( 

117 f"Converting entry for {action_digest.hash}/{action_digest.size_bytes} " 

118 "from Digest to ActionResult" 

119 ) 

120 assert action_result, "Returned result was none" 

121 result = action_result.SerializeToString() 

122 self._redis.execute_rw(lambda r: r.set(key, result)) 

123 else: 

124 action_result = ActionResult.FromString(value_in_cache) 

125 

126 # If configured, update the entry to contain a `Digest`: 

127 if self._entry_type == ActionCacheEntryType.ACTION_RESULT_DIGEST and self._migrate_entries: 

128 LOGGER.debug( 

129 f"Converting entry for {action_digest.hash}/{action_digest.size_bytes} " 

130 "from ActionResult to Digest" 

131 ) 

132 assert self._storage, "Storage used before initialization" 

133 action_result_digest = self._storage.put_message(action_result) 

134 self._redis.execute_rw(lambda r: r.set(key, action_result_digest.SerializeToString())) 

135 

136 return action_result