Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/actioncache/caches/remote_cache.py: 31.48%
54 statements
« prev ^ index » next coverage.py v6.4.1, created at 2022-06-22 21:04 +0000
« prev ^ index » next coverage.py v6.4.1, created at 2022-06-22 21:04 +0000
1# Copyright (C) 2019, 2020 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16"""
17Remote Action Cache
18===================
20Provides an interface to a remote Action Cache. This can be used by other
21services (e.g. an Execution service) to communicate with a remote cache.
23It provides the same API as any other Action Cache instance backend.
25"""
28import logging
29from typing import Dict, Optional, Tuple
31from buildgrid._exceptions import GrpcUninitializedError, NotFoundError
32from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import (
33 ActionResult,
34 Digest
35)
36from buildgrid.client.actioncache import ActionCacheClient
37from buildgrid.client.capabilities import CapabilitiesInterface
38from buildgrid.client.channel import setup_channel
39from buildgrid.server.actioncache.caches.action_cache_abc import ActionCacheABC
42class RemoteActionCache(ActionCacheABC):
44 def __init__(
45 self,
46 remote: str,
47 instance_name: str,
48 retries=0,
49 max_backoff=64,
50 request_timeout: Optional[float] = None,
51 channel_options: Optional[Tuple[Tuple[str, str], ...]]=None,
52 tls_credentials: Optional[Dict[str, str]]=None
53 ):
54 """Initialises a new RemoteActionCache instance.
56 Args:
57 remote (str): URL of the remote ActionCache service to open a
58 channel to.
59 instance_name (str): The instance name of the remote ActionCache
60 service.
61 channel_options (tuple): Optional tuple of channel options to set
62 when opening the gRPC channel to the remote.
63 tls_credentials (dict): Optional credentials to use when opening
64 the gRPC channel. If unset then an insecure channel will be
65 used.
67 """
68 ActionCacheABC.__init__(self)
69 self._logger = logging.getLogger(__name__)
70 self._remote_instance_name = instance_name
71 self._remote = remote
72 self._channel_options = channel_options
73 if tls_credentials is None:
74 tls_credentials = {}
75 self._tls_credentials = tls_credentials
76 self._channel = None
77 self._allow_updates = None
78 self._retries = retries
79 self._max_backoff = max_backoff
80 self._action_cache = None
82 def setup_grpc(self):
83 if self._channel is None:
84 self._channel, _ = setup_channel(
85 self._remote,
86 auth_token=None,
87 client_key=self._tls_credentials.get("tls-client-key"),
88 client_cert=self._tls_credentials.get("tls-client-cert"),
89 server_cert=self._tls_credentials.get("tls-server-cert")
90 )
91 if self._action_cache is None:
92 self._action_cache = ActionCacheClient(
93 self._channel, self._remote_instance_name,
94 self._retries, self._max_backoff)
96 @property
97 def allow_updates(self) -> bool:
98 if self._channel is None:
99 raise GrpcUninitializedError("Remote cache used before gRPC initialization.")
101 # Check if updates are allowed if we haven't already.
102 # This is done the first time update_action_result is called rather
103 # than on instantiation because the remote cache may not be running
104 # when this object is instantiated.
105 if self._allow_updates is None:
106 interface = CapabilitiesInterface(self._channel)
107 capabilities = interface.get_capabilities(self._remote_instance_name)
108 self._allow_updates = (capabilities
109 .cache_capabilities
110 .action_cache_update_capabilities
111 .update_enabled)
112 return self._allow_updates
114 def get_action_result(self, action_digest: Digest) -> ActionResult:
115 """Retrieves the cached result for an Action.
117 Queries the remote ActionCache service to retrieve the cached
118 result for a given Action digest. If the remote cache doesn't
119 contain a result for the Action, then ``None`` is returned.
121 Args:
122 action_digest (Digest): The digest of the Action to retrieve the
123 cached result of.
125 """
126 if self._action_cache is None:
127 raise GrpcUninitializedError("Remote cache used before gRPC initialization.")
129 action_result = self._action_cache.get(action_digest)
131 if action_result is None:
132 key = self._get_key(action_digest)
133 raise NotFoundError(f"Key not found: {key}")
134 return action_result
136 def update_action_result(self, action_digest: Digest,
137 action_result: ActionResult) -> None:
138 """Stores a result for an Action in the remote cache.
140 Sends an ``UpdateActionResult`` request to the remote ActionCache
141 service, to store the result in the remote cache.
143 If the remote cache doesn't allow updates, then this raises a
144 ``NotImplementedError``.
146 Args:
147 action_digest (Digest): The digest of the Action whose result is
148 being cached.
149 action_result (ActionResult): The result to cache for the given
150 Action digest.
152 """
153 if self._action_cache is None:
154 raise GrpcUninitializedError("Remote cache used before gRPC initialization.")
156 if not self.allow_updates:
157 raise NotImplementedError("Updating cache not allowed")
158 return self._action_cache.update(action_digest, action_result)
160 def _get_key(self, action_digest):
161 """Get a hashable cache key from a given Action digest.
163 Args:
164 action_digest (Digest): The digest to produce a cache key for.
166 """
167 return (action_digest.hash, action_digest.size_bytes)