Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/actioncache/caches/redis_cache.py: 98.21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

56 statements  

1# Copyright (C) 2020 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16import logging 

17from typing import Optional 

18 

19import redis 

20 

21from buildgrid._enums import ActionCacheEntryType 

22from buildgrid._exceptions import NotFoundError 

23from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ( 

24 ActionResult, 

25 Digest 

26) 

27from buildgrid.server.actioncache.caches.action_cache_abc import ActionCacheABC 

28from buildgrid.server.cas.storage.redis import redis_client_exception_wrapper 

29from buildgrid.server.cas.storage.storage_abc import StorageABC 

30 

31 

32class RedisActionCache(ActionCacheABC): 

33 

34 def __init__(self, storage: StorageABC, allow_updates: bool=True, 

35 cache_failed_actions: bool=True, 

36 entry_type: Optional[ActionCacheEntryType] = ActionCacheEntryType.ACTION_RESULT_DIGEST, 

37 migrate_entries: Optional[bool] = False, 

38 host: str='localhost', 

39 port: int=6379, password: Optional[str]=None, db: int=0): 

40 """ Initialises a new ActionCache instance using Redis. 

41 Stores the `ActionResult` message as a value. 

42 

43 Args: 

44 storage (StorageABC): storage backend instance to be used to store ActionResults. 

45 allow_updates (bool): allow the client to write to storage 

46 cache_failed_actions (bool): whether to store failed actions in the Action Cache 

47 entry_type (ActionCacheEntryType): whether to store ActionResults or their digests. 

48 migrate_entries (bool): if set, migrate entries that contain a value with 

49 a different `ActionCacheEntryType` to `entry_type` as they are accessed 

50 (False by default). 

51 host (str): Redis host (default: localhost) 

52 port (int): Redis port (default: 6379) 

53 password (str): Redis password 

54 db (int): Redis database number 

55 """ 

56 ActionCacheABC.__init__(self, storage=storage, allow_updates=allow_updates) 

57 

58 self._logger = logging.getLogger(__name__) 

59 # Setting decode_responses=False fixes the folowing mypy error: 

60 # Argument 1 to "FromString" of "Message" has incompatible type "str"; expected "ByteString" 

61 self._client = redis.Redis(host=host, port=port, password=password, db=db, decode_responses=False) 

62 self._cache_failed_actions = cache_failed_actions 

63 

64 self._entry_type = entry_type 

65 self._migrate_entries = migrate_entries 

66 

67 @redis_client_exception_wrapper 

68 def get_action_result(self, action_digest: Digest) -> ActionResult: 

69 key = self._get_key(action_digest) 

70 action_result = self._get_action_result(key, action_digest) 

71 if action_result is not None and self._action_result_blobs_still_exist(action_result): 

72 return action_result 

73 

74 if self._allow_updates: 

75 self._logger.debug(f"Removing {action_digest.hash}/{action_digest.size_bytes}" 

76 "from cache due to missing blobs in CAS") 

77 self._client.delete(key) 

78 

79 raise NotFoundError(f"Key not found: [{key}]") 

80 

81 @redis_client_exception_wrapper 

82 def update_action_result(self, action_digest: Digest, 

83 action_result: ActionResult) -> None: 

84 if not self._allow_updates: 

85 raise NotImplementedError("Updating cache not allowed") 

86 

87 if self._cache_failed_actions or action_result.exit_code == 0: 

88 action_result_digest = self._storage.put_message(action_result) 

89 

90 cache_key = self._get_key(action_digest) 

91 if self._entry_type == ActionCacheEntryType.ACTION_RESULT_DIGEST: 

92 self._client.set(cache_key, action_result_digest.SerializeToString()) 

93 else: 

94 self._client.set(cache_key, action_result.SerializeToString()) 

95 

96 self._logger.info( 

97 f"Result cached for action [{action_digest.hash}/{action_digest.size_bytes}]") 

98 

99 def _get_key(self, action_digest: Digest) -> str: 

100 return f'action-cache.{action_digest.hash}_{action_digest.size_bytes}' 

101 

102 def _get_action_result(self, key: str, action_digest: Digest) -> Optional[ActionResult]: 

103 value_in_cache = self._client.get(key) 

104 if value_in_cache is None: 

105 return None 

106 

107 # Attempting to parse the entry as a `Digest` first: 

108 action_result_digest = Digest.FromString(value_in_cache) 

109 if len(action_result_digest.hash) == len(action_digest.hash): 

110 # The cache contains the `Digest` of the `ActionResult`: 

111 action_result = self._storage.get_message(action_result_digest, 

112 ActionResult) 

113 

114 # If configured, update the entry to contain an `ActionResult`: 

115 if self._entry_type == ActionCacheEntryType.ACTION_RESULT and self._migrate_entries: 

116 self._logger.debug(f"Converting entry for {action_digest.hash}/{action_digest.size_bytes} " 

117 "from Digest to ActionResult") 

118 self._client.set(key, action_result.SerializeToString()) 

119 else: 

120 action_result = ActionResult.FromString(value_in_cache) 

121 

122 # If configured, update the entry to contain a `Digest`: 

123 if self._entry_type == ActionCacheEntryType.ACTION_RESULT_DIGEST and self._migrate_entries: 

124 self._logger.debug(f"Converting entry for {action_digest.hash}/{action_digest.size_bytes} " 

125 "from ActionResult to Digest") 

126 action_result_digest = self._storage.put_message(action_result) 

127 self._client.set(key, action_result_digest.SerializeToString()) 

128 

129 return action_result