Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/actioncache/caches/redis_cache.py: 98.25%
57 statements
« prev ^ index » next coverage.py v6.4.1, created at 2022-06-22 21:04 +0000
« prev ^ index » next coverage.py v6.4.1, created at 2022-06-22 21:04 +0000
1# Copyright (C) 2020 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16import logging
17from typing import Optional
19import redis
21from buildgrid._enums import ActionCacheEntryType
22from buildgrid._exceptions import NotFoundError
23from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import (
24 ActionResult,
25 Digest
26)
27from buildgrid.server.actioncache.caches.action_cache_abc import ActionCacheABC
28from buildgrid.server.cas.storage.redis import redis_client_exception_wrapper
29from buildgrid.server.cas.storage.storage_abc import StorageABC
32class RedisActionCache(ActionCacheABC):
34 def __init__(self, storage: StorageABC, allow_updates: bool=True,
35 cache_failed_actions: bool=True,
36 entry_type: Optional[ActionCacheEntryType] = ActionCacheEntryType.ACTION_RESULT_DIGEST,
37 migrate_entries: Optional[bool] = False,
38 host: str='localhost',
39 port: int=6379, password: Optional[str]=None, db: int=0):
40 """ Initialises a new ActionCache instance using Redis.
41 Stores the `ActionResult` message as a value.
43 Args:
44 storage (StorageABC): storage backend instance to be used to store ActionResults.
45 allow_updates (bool): allow the client to write to storage
46 cache_failed_actions (bool): whether to store failed actions in the Action Cache
47 entry_type (ActionCacheEntryType): whether to store ActionResults or their digests.
48 migrate_entries (bool): if set, migrate entries that contain a value with
49 a different `ActionCacheEntryType` to `entry_type` as they are accessed
50 (False by default).
51 host (str): Redis host (default: localhost)
52 port (int): Redis port (default: 6379)
53 password (str): Redis password
54 db (int): Redis database number
55 """
56 ActionCacheABC.__init__(self, storage=storage, allow_updates=allow_updates)
58 self._logger = logging.getLogger(__name__)
59 # Setting decode_responses=False fixes the folowing mypy error:
60 # Argument 1 to "FromString" of "Message" has incompatible type "str"; expected "ByteString"
61 self._client = redis.Redis(host=host, port=port, password=password, db=db, decode_responses=False)
62 self._cache_failed_actions = cache_failed_actions
64 self._entry_type = entry_type
65 self._migrate_entries = migrate_entries
67 @redis_client_exception_wrapper
68 def get_action_result(self, action_digest: Digest) -> ActionResult:
69 key = self._get_key(action_digest)
70 action_result = self._get_action_result(key, action_digest)
71 if action_result is not None:
72 if self._action_result_blobs_still_exist(action_result):
73 return action_result
75 if self._allow_updates:
76 self._logger.debug(f"Removing {action_digest.hash}/{action_digest.size_bytes}"
77 "from cache due to missing blobs in CAS")
78 self._client.delete(key)
80 raise NotFoundError(f"Key not found: [{key}]")
82 @redis_client_exception_wrapper
83 def update_action_result(self, action_digest: Digest,
84 action_result: ActionResult) -> None:
85 if not self._allow_updates:
86 raise NotImplementedError("Updating cache not allowed")
88 if self._cache_failed_actions or action_result.exit_code == 0:
89 action_result_digest = self._storage.put_message(action_result)
91 cache_key = self._get_key(action_digest)
92 if self._entry_type == ActionCacheEntryType.ACTION_RESULT_DIGEST:
93 self._client.set(cache_key, action_result_digest.SerializeToString())
94 else:
95 self._client.set(cache_key, action_result.SerializeToString())
97 self._logger.info(
98 f"Result cached for action [{action_digest.hash}/{action_digest.size_bytes}]")
100 def _get_key(self, action_digest: Digest) -> str:
101 return f'action-cache.{action_digest.hash}_{action_digest.size_bytes}'
103 def _get_action_result(self, key: str, action_digest: Digest) -> Optional[ActionResult]:
104 value_in_cache = self._client.get(key)
105 if value_in_cache is None:
106 return None
108 # Attempting to parse the entry as a `Digest` first:
109 action_result_digest = Digest.FromString(value_in_cache)
110 if len(action_result_digest.hash) == len(action_digest.hash):
111 # The cache contains the `Digest` of the `ActionResult`:
112 action_result = self._storage.get_message(action_result_digest,
113 ActionResult)
115 # If configured, update the entry to contain an `ActionResult`:
116 if self._entry_type == ActionCacheEntryType.ACTION_RESULT and self._migrate_entries:
117 self._logger.debug(f"Converting entry for {action_digest.hash}/{action_digest.size_bytes} "
118 "from Digest to ActionResult")
119 self._client.set(key, action_result.SerializeToString())
120 else:
121 action_result = ActionResult.FromString(value_in_cache)
123 # If configured, update the entry to contain a `Digest`:
124 if self._entry_type == ActionCacheEntryType.ACTION_RESULT_DIGEST and self._migrate_entries:
125 self._logger.debug(f"Converting entry for {action_digest.hash}/{action_digest.size_bytes} "
126 "from ActionResult to Digest")
127 action_result_digest = self._storage.put_message(action_result)
128 self._client.set(key, action_result_digest.SerializeToString())
130 return action_result