Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/actioncache/caches/remote_cache.py: 95.16%
62 statements
« prev ^ index » next coverage.py v7.4.1, created at 2025-05-21 15:45 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2025-05-21 15:45 +0000
1# Copyright (C) 2019, 2020 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16"""
17Remote Action Cache
18===================
20Provides an interface to a remote Action Cache. This can be used by other
21services (e.g. an Execution service) to communicate with a remote cache.
23It provides the same API as any other Action Cache instance backend.
25"""
27import grpc
29from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ActionResult, Digest
30from buildgrid.server.actioncache.caches.action_cache_abc import ActionCacheABC
31from buildgrid.server.client.actioncache import ActionCacheClient
32from buildgrid.server.client.authentication import ClientCredentials
33from buildgrid.server.client.capabilities import CapabilitiesInterface
34from buildgrid.server.client.channel import setup_channel
35from buildgrid.server.context import current_instance
36from buildgrid.server.exceptions import GrpcUninitializedError, NotFoundError
class RemoteActionCache(ActionCacheABC):
    """Action Cache backend that proxies requests to a remote ActionCache service.

    The gRPC channel and client are created lazily by :meth:`start` and
    released by :meth:`stop`. Calling the cache methods outside of that
    window raises ``GrpcUninitializedError``.
    """

    def __init__(
        self,
        remote: str,
        instance_name: str | None = None,
        retries: int = 0,
        max_backoff: int = 64,
        request_timeout: float | None = None,
        channel_options: tuple[tuple[str, str], ...] | None = None,
        credentials: ClientCredentials | None = None,
    ) -> None:
        """Initialises a new RemoteActionCache instance.

        No network connection is opened here; see :meth:`start`.

        Args:
            remote (str): URL of the remote ActionCache service to open a
                channel to.
            instance_name (str | None): The instance name of the remote ActionCache
                service. If none, uses the current instance context.
            retries (int): Forwarded to the ``ActionCacheClient`` created in
                :meth:`start`.
            max_backoff (int): Forwarded to the ``ActionCacheClient`` created
                in :meth:`start`.
            request_timeout (float | None): Accepted for interface
                compatibility. NOTE(review): this value is not stored or used
                anywhere in this class - confirm whether it should be
                forwarded to the client.
            channel_options (tuple): Optional tuple of channel options. Stored
                on the instance; NOTE(review): not passed to ``setup_channel``
                by :meth:`start` - confirm whether that is intended.
            credentials (ClientCredentials | None): Optional credentials to
                use when opening the gRPC channel. The keys read are
                ``auth-token``, ``token-refresh-seconds``, ``tls-client-key``,
                ``tls-client-cert`` and ``tls-server-cert``. If unset, an
                empty mapping is used, so no token or TLS material is passed
                when the channel is opened.

        """
        super().__init__()
        self._remote_instance_name = instance_name
        self._remote = remote
        self._channel_options = channel_options
        if credentials is None:
            credentials = {}
        self._credentials = credentials
        self._channel: grpc.Channel | None = None
        # ``None`` means "not yet queried from the remote"; see ``allow_updates``.
        self._allow_updates = None  # type: ignore  # TODO STOP THIS
        self._retries = retries
        self._max_backoff = max_backoff
        self._action_cache: ActionCacheClient | None = None

    def start(self) -> None:
        """Open the gRPC channel and create the ActionCache client if needed.

        Each of the two resources is only created when it is currently
        ``None``, so calling this repeatedly is harmless.
        """
        if self._channel is None:
            # setup_channel returns a pair; only the channel is needed here.
            self._channel, _ = setup_channel(
                self._remote,
                auth_token=self._credentials.get("auth-token"),
                auth_token_refresh_seconds=self._credentials.get("token-refresh-seconds"),
                client_key=self._credentials.get("tls-client-key"),
                client_cert=self._credentials.get("tls-client-cert"),
                server_cert=self._credentials.get("tls-server-cert"),
            )
        if self._action_cache is None:
            self._action_cache = ActionCacheClient(self._channel, self._retries, self._max_backoff)

    def stop(self) -> None:
        """Close the gRPC channel and forget the channel and client.

        The references are cleared so that a later :meth:`start` builds a
        fresh channel and client, and so that use after ``stop`` raises
        ``GrpcUninitializedError`` rather than issuing RPCs on a closed
        channel.
        """
        if self._channel is not None:
            self._channel.close()
        self._channel = None
        # The client wraps the channel that was just closed, so drop it too.
        self._action_cache = None

    @property
    def remote_instance_name(self) -> str:
        """Instance name sent to the remote.

        Returns the name given at construction if there was one, otherwise
        the value of ``current_instance()`` at the time of access.
        """
        if self._remote_instance_name is not None:
            return self._remote_instance_name
        return current_instance()

    @property
    def allow_updates(self) -> bool:
        """Whether the remote cache reports that updates are enabled.

        Raises:
            GrpcUninitializedError: If the channel has not been opened.
        """
        if self._channel is None:
            raise GrpcUninitializedError("Remote cache used before gRPC initialization.")

        # Check if updates are allowed if we haven't already.
        # This is done the first time update_action_result is called rather
        # than on instantiation because the remote cache may not be running
        # when this object is instantiated.
        if self._allow_updates is None:
            interface = CapabilitiesInterface(self._channel)
            capabilities = interface.get_capabilities(self.remote_instance_name)
            self._allow_updates = capabilities.cache_capabilities.action_cache_update_capabilities.update_enabled
        return self._allow_updates

    def get_action_result(self, action_digest: Digest) -> ActionResult:
        """Retrieves the cached result for an Action.

        Queries the remote ActionCache service to retrieve the cached
        result for a given Action digest. If the remote cache doesn't
        contain a result for the Action, then ``NotFoundError`` is raised.

        Args:
            action_digest (Digest): The digest of the Action to retrieve the
                cached result of.

        Returns:
            ActionResult: The result returned by the remote cache.

        Raises:
            GrpcUninitializedError: If the client has not been created.
            NotFoundError: If the client returns ``None`` for the digest.

        """
        if self._action_cache is None:
            raise GrpcUninitializedError("Remote cache used before gRPC initialization.")

        action_result = self._action_cache.get(self.remote_instance_name, action_digest)

        if action_result is None:
            key = self._get_key(action_digest)
            raise NotFoundError(f"Key not found: {key}")
        return action_result

    def update_action_result(self, action_digest: Digest, action_result: ActionResult) -> None:
        """Stores a result for an Action in the remote cache.

        Sends an ``UpdateActionResult`` request to the remote ActionCache
        service, to store the result in the remote cache.

        If the remote cache doesn't allow updates, then this raises a
        ``NotImplementedError``.

        Args:
            action_digest (Digest): The digest of the Action whose result is
                being cached.
            action_result (ActionResult): The result to cache for the given
                Action digest.

        Raises:
            GrpcUninitializedError: If the client has not been created.
            NotImplementedError: If ``allow_updates`` is false.

        """
        if self._action_cache is None:
            raise GrpcUninitializedError("Remote cache used before gRPC initialization.")

        if not self.allow_updates:
            raise NotImplementedError("Updating cache not allowed")

        self._action_cache.update(self.remote_instance_name, action_digest, action_result)

    def _get_key(self, action_digest: Digest) -> tuple[str, int]:
        """Get a hashable cache key from a given Action digest.

        Args:
            action_digest (Digest): The digest to produce a cache key for.

        Returns:
            tuple[str, int]: The digest's ``hash`` and ``size_bytes``.

        """
        return (action_digest.hash, action_digest.size_bytes)