Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/actioncache/caches/remote_cache.py: 94.92%

59 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-04-15 14:01 +0000

1# Copyright (C) 2019, 2020 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17Remote Action Cache 

18=================== 

19 

20Provides an interface to a remote Action Cache. This can be used by other 

21services (e.g. an Execution service) to communicate with a remote cache. 

22 

23It provides the same API as any other Action Cache instance backend. 

24 

25""" 

26 

27 

28import logging 

29from typing import Optional, Tuple 

30 

31import grpc 

32 

33from buildgrid._exceptions import GrpcUninitializedError, NotFoundError 

34from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ActionResult, Digest 

35from buildgrid.client.actioncache import ActionCacheClient 

36from buildgrid.client.authentication import ClientCredentials 

37from buildgrid.client.capabilities import CapabilitiesInterface 

38from buildgrid.client.channel import setup_channel 

39from buildgrid.server.actioncache.caches.action_cache_abc import ActionCacheABC 

40 

41LOGGER = logging.getLogger(__name__) 

42 

43 

class RemoteActionCache(ActionCacheABC):
    """Action Cache backend that proxies requests to a remote ActionCache.

    The gRPC channel and the ``ActionCacheClient`` are created lazily by
    :meth:`start` and released by :meth:`stop`. Using the cache before
    :meth:`start` (or after :meth:`stop`) raises ``GrpcUninitializedError``.
    """

    def __init__(
        self,
        remote: str,
        instance_name: str,
        retries: int = 0,
        max_backoff: int = 64,
        request_timeout: Optional[float] = None,
        channel_options: Optional[Tuple[Tuple[str, str], ...]] = None,
        credentials: Optional[ClientCredentials] = None,
    ) -> None:
        """Initialises a new RemoteActionCache instance.

        No network activity happens here; the channel is opened in
        :meth:`start`.

        Args:
            remote (str): URL of the remote ActionCache service to open a
                channel to.
            instance_name (str): The instance name of the remote ActionCache
                service.
            retries (int): Forwarded to ``ActionCacheClient`` when it is
                created in :meth:`start`.
            max_backoff (int): Forwarded to ``ActionCacheClient`` when it is
                created in :meth:`start`.
            request_timeout (float): Accepted for interface compatibility.
                NOTE(review): this value is not currently read anywhere in
                this class.
            channel_options (tuple): Optional tuple of channel options,
                stored on the instance. NOTE(review): :meth:`start` does not
                currently forward these to ``setup_channel``.
            credentials (dict): Optional credentials to use when opening
                the gRPC channel. If unset, an empty mapping is used, so
                every credential lookup in :meth:`start` yields ``None``.

        """
        super().__init__()
        self._remote_instance_name = instance_name
        self._remote = remote
        self._channel_options = channel_options
        if credentials is None:
            credentials = {}
        self._credentials = credentials
        self._channel: Optional[grpc.Channel] = None
        # ``None`` means "not yet queried from the remote"; see allow_updates.
        self._allow_updates = None  # type: ignore  # TODO STOP THIS
        self._retries = retries
        self._max_backoff = max_backoff
        self._action_cache: Optional[ActionCacheClient] = None

    def start(self) -> None:
        """Open the gRPC channel and create the client, if not already done."""
        if self._channel is None:
            self._channel, _ = setup_channel(
                self._remote,
                auth_token=self._credentials.get("auth-token"),
                auth_token_refresh_seconds=self._credentials.get("token-refresh-seconds"),
                client_key=self._credentials.get("tls-client-key"),
                client_cert=self._credentials.get("tls-client-cert"),
                server_cert=self._credentials.get("tls-server-cert"),
            )
        if self._action_cache is None:
            self._action_cache = ActionCacheClient(
                self._channel, self._remote_instance_name, self._retries, self._max_backoff
            )

    def stop(self) -> None:
        """Close the gRPC channel and drop the references that depend on it.

        Clearing ``_channel`` and ``_action_cache`` lets a later
        :meth:`start` build a fresh channel and client instead of silently
        reusing the closed ones, and makes use-after-stop fail with
        ``GrpcUninitializedError``.
        """
        if self._channel is not None:
            self._channel.close()
        self._channel = None
        self._action_cache = None

    @property
    def allow_updates(self) -> bool:
        """Whether the remote cache reports that updates are enabled.

        Raises:
            GrpcUninitializedError: If the channel has not been set up.

        """
        if self._channel is None:
            raise GrpcUninitializedError("Remote cache used before gRPC initialization.")

        # Check if updates are allowed if we haven't already.
        # This is done lazily on first access rather than on instantiation
        # because the remote cache may not be running when this object is
        # instantiated. The answer is cached for subsequent accesses.
        if self._allow_updates is None:
            interface = CapabilitiesInterface(self._channel)
            capabilities = interface.get_capabilities(self._remote_instance_name)
            self._allow_updates = capabilities.cache_capabilities.action_cache_update_capabilities.update_enabled
        return self._allow_updates

    def get_action_result(self, action_digest: Digest) -> ActionResult:
        """Retrieves the cached result for an Action.

        Queries the remote ActionCache service to retrieve the cached
        result for a given Action digest.

        Args:
            action_digest (Digest): The digest of the Action to retrieve the
                cached result of.

        Returns:
            ActionResult: The result returned by the remote cache.

        Raises:
            GrpcUninitializedError: If the client has not been set up.
            NotFoundError: If the remote cache has no result for the Action.

        """
        if self._action_cache is None:
            raise GrpcUninitializedError("Remote cache used before gRPC initialization.")

        action_result = self._action_cache.get(action_digest)

        if action_result is None:
            key = self._get_key(action_digest)
            raise NotFoundError(f"Key not found: {key}")
        return action_result

    def update_action_result(self, action_digest: Digest, action_result: ActionResult) -> None:
        """Stores a result for an Action in the remote cache.

        Sends an ``UpdateActionResult`` request to the remote ActionCache
        service, to store the result in the remote cache.

        Args:
            action_digest (Digest): The digest of the Action whose result is
                being cached.
            action_result (ActionResult): The result to cache for the given
                Action digest.

        Raises:
            GrpcUninitializedError: If the client has not been set up.
            NotImplementedError: If the remote cache doesn't allow updates.

        """
        if self._action_cache is None:
            raise GrpcUninitializedError("Remote cache used before gRPC initialization.")

        if not self.allow_updates:
            raise NotImplementedError("Updating cache not allowed")

        self._action_cache.update(action_digest, action_result)

    def _get_key(self, action_digest: Digest) -> Tuple[str, int]:
        """Get a hashable cache key from a given Action digest.

        Args:
            action_digest (Digest): The digest to produce a cache key for.

        Returns:
            tuple: The ``(hash, size_bytes)`` pair taken from the digest.

        """
        return (action_digest.hash, action_digest.size_bytes)