Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/actioncache/caches/remote_cache.py: 94.92%
59 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-04-15 14:01 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-04-15 14:01 +0000
1# Copyright (C) 2019, 2020 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16"""
17Remote Action Cache
18===================
20Provides an interface to a remote Action Cache. This can be used by other
21services (e.g. an Execution service) to communicate with a remote cache.
23It provides the same API as any other Action Cache instance backend.
25"""
28import logging
29from typing import Optional, Tuple
31import grpc
33from buildgrid._exceptions import GrpcUninitializedError, NotFoundError
34from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ActionResult, Digest
35from buildgrid.client.actioncache import ActionCacheClient
36from buildgrid.client.authentication import ClientCredentials
37from buildgrid.client.capabilities import CapabilitiesInterface
38from buildgrid.client.channel import setup_channel
39from buildgrid.server.actioncache.caches.action_cache_abc import ActionCacheABC
41LOGGER = logging.getLogger(__name__)
class RemoteActionCache(ActionCacheABC):
    """Action Cache backend that forwards requests to a remote ActionCache service.

    No gRPC channel is opened when the object is constructed; ``start()`` must
    be called before any cache operation, otherwise the accessors raise
    ``GrpcUninitializedError``.
    """

    def __init__(
        self,
        remote: str,
        instance_name: str,
        retries: int = 0,
        max_backoff: int = 64,
        request_timeout: Optional[float] = None,
        channel_options: Optional[Tuple[Tuple[str, str], ...]] = None,
        credentials: Optional[ClientCredentials] = None,
    ) -> None:
        """Initialises a new RemoteActionCache instance.

        Args:
            remote (str): URL of the remote ActionCache service to open a
                channel to.
            instance_name (str): The instance name of the remote ActionCache
                service.
            retries (int): Forwarded to the ``ActionCacheClient`` created in
                ``start()``.
            max_backoff (int): Forwarded to the ``ActionCacheClient`` created
                in ``start()``.
            request_timeout (float): Accepted for interface compatibility.
                NOTE(review): this value is not read anywhere in this class.
            channel_options (tuple): Optional tuple of channel options to set
                when opening the gRPC channel to the remote. Stored on the
                instance; NOTE(review): not passed to ``setup_channel`` here.
            credentials (dict): Optional credentials to use when opening
                the gRPC channel. If unset then an insecure channel will be
                used.

        """
        super().__init__()
        self._remote_instance_name = instance_name
        self._remote = remote
        self._channel_options = channel_options
        # Normalise "no credentials" to an empty mapping so that start() can
        # call ``.get(...)`` on it unconditionally.
        if credentials is None:
            credentials = {}
        self._credentials = credentials
        self._channel: Optional[grpc.Channel] = None
        # ``None`` means "capabilities not queried yet"; see ``allow_updates``.
        self._allow_updates = None  # type: ignore # TODO STOP THIS
        self._retries = retries
        self._max_backoff = max_backoff
        self._action_cache: Optional[ActionCacheClient] = None

    def start(self) -> None:
        """Opens the gRPC channel and creates the ActionCache client.

        The channel and the client are each created only when currently
        unset, so calling this on an already-started instance does nothing.
        """
        if self._channel is None:
            self._channel, _ = setup_channel(
                self._remote,
                auth_token=self._credentials.get("auth-token"),
                auth_token_refresh_seconds=self._credentials.get("token-refresh-seconds"),
                client_key=self._credentials.get("tls-client-key"),
                client_cert=self._credentials.get("tls-client-cert"),
                server_cert=self._credentials.get("tls-server-cert"),
            )
        if self._action_cache is None:
            self._action_cache = ActionCacheClient(
                self._channel, self._remote_instance_name, self._retries, self._max_backoff
            )

    def stop(self) -> None:
        """Closes the gRPC channel and returns the instance to its unstarted state.

        The channel, the client built on top of it and the cached capability
        answer are all dropped after closing. Without this, a later
        ``start()`` would see a non-``None`` (but closed) channel and skip
        re-creating it, and cache calls would be issued on the closed channel
        instead of raising ``GrpcUninitializedError``.
        """
        if self._channel is not None:
            self._channel.close()
        self._channel = None
        self._action_cache = None
        self._allow_updates = None  # type: ignore

    @property
    def allow_updates(self) -> bool:
        """Whether the remote cache reports that ActionResult updates are enabled.

        The answer is fetched from the remote's capabilities on first access
        and cached on the instance afterwards.

        Raises:
            GrpcUninitializedError: If the gRPC channel has not been opened.

        """
        if self._channel is None:
            raise GrpcUninitializedError("Remote cache used before gRPC initialization.")

        # Check if updates are allowed if we haven't already.
        # This is done the first time update_action_result is called rather
        # than on instantiation because the remote cache may not be running
        # when this object is instantiated.
        if self._allow_updates is None:
            interface = CapabilitiesInterface(self._channel)
            capabilities = interface.get_capabilities(self._remote_instance_name)
            self._allow_updates = capabilities.cache_capabilities.action_cache_update_capabilities.update_enabled
        return self._allow_updates

    def get_action_result(self, action_digest: Digest) -> ActionResult:
        """Retrieves the cached result for an Action.

        Queries the remote ActionCache service to retrieve the cached
        result for a given Action digest.

        Args:
            action_digest (Digest): The digest of the Action to retrieve the
                cached result of.

        Returns:
            ActionResult: The result stored in the remote cache.

        Raises:
            GrpcUninitializedError: If the ActionCache client has not been
                created yet.
            NotFoundError: If the remote client returns ``None`` for the
                given Action digest.

        """
        if self._action_cache is None:
            raise GrpcUninitializedError("Remote cache used before gRPC initialization.")

        action_result = self._action_cache.get(action_digest)

        # The client signals a miss with ``None``; this backend reports a
        # miss by raising instead.
        if action_result is None:
            key = self._get_key(action_digest)
            raise NotFoundError(f"Key not found: {key}")
        return action_result

    def update_action_result(self, action_digest: Digest, action_result: ActionResult) -> None:
        """Stores a result for an Action in the remote cache.

        Sends an ``UpdateActionResult`` request to the remote ActionCache
        service, to store the result in the remote cache.

        If the remote cache doesn't allow updates, then this raises a
        ``NotImplementedError``.

        Args:
            action_digest (Digest): The digest of the Action whose result is
                being cached.
            action_result (ActionResult): The result to cache for the given
                Action digest.

        Raises:
            GrpcUninitializedError: If the ActionCache client has not been
                created yet.
            NotImplementedError: If ``allow_updates`` is false.

        """
        if self._action_cache is None:
            raise GrpcUninitializedError("Remote cache used before gRPC initialization.")

        if not self.allow_updates:
            raise NotImplementedError("Updating cache not allowed")

        self._action_cache.update(action_digest, action_result)

    def _get_key(self, action_digest: Digest) -> Tuple[str, int]:
        """Get a hashable cache key from a given Action digest.

        Args:
            action_digest (Digest): The digest to produce a cache key for.

        Returns:
            tuple: The ``(hash, size_bytes)`` pair of the digest.

        """
        return (action_digest.hash, action_digest.size_bytes)