Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/actioncache/caches/remote_cache.py: 95.24%

63 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

1# Copyright (C) 2019, 2020 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17Remote Action Cache 

18=================== 

19 

20Provides an interface to a remote Action Cache. This can be used by other 

21services (e.g. an Execution service) to communicate with a remote cache. 

22 

23It provides the same API as any other Action Cache instance backend. 

24 

25""" 

26 

27 

28from typing import Optional, Tuple 

29 

30import grpc 

31 

32from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ActionResult, Digest 

33from buildgrid.server.actioncache.caches.action_cache_abc import ActionCacheABC 

34from buildgrid.server.client.actioncache import ActionCacheClient 

35from buildgrid.server.client.authentication import ClientCredentials 

36from buildgrid.server.client.capabilities import CapabilitiesInterface 

37from buildgrid.server.client.channel import setup_channel 

38from buildgrid.server.context import current_instance 

39from buildgrid.server.exceptions import GrpcUninitializedError, NotFoundError 

40 

41 

42class RemoteActionCache(ActionCacheABC): 

43 def __init__( 

44 self, 

45 remote: str, 

46 instance_name: Optional[str] = None, 

47 retries: int = 0, 

48 max_backoff: int = 64, 

49 request_timeout: Optional[float] = None, 

50 channel_options: Optional[Tuple[Tuple[str, str], ...]] = None, 

51 credentials: Optional[ClientCredentials] = None, 

52 ) -> None: 

53 """Initialises a new RemoteActionCache instance. 

54 

55 Args: 

56 remote (str): URL of the remote ActionCache service to open a 

57 channel to. 

58 instance_name (Optional[str]): The instance name of the remote ActionCache 

59 service. If none, uses the current instance context. 

60 channel_options (tuple): Optional tuple of channel options to set 

61 when opening the gRPC channel to the remote. 

62 credentials (dict): Optional credentials to use when opening 

63 the gRPC channel. If unset then an insecure channel will be 

64 used. 

65 

66 """ 

67 super().__init__() 

68 self._remote_instance_name = instance_name 

69 self._remote = remote 

70 self._channel_options = channel_options 

71 if credentials is None: 

72 credentials = {} 

73 self._credentials = credentials 

74 self._channel: Optional[grpc.Channel] = None 

75 self._allow_updates = None # type: ignore # TODO STOP THIS 

76 self._retries = retries 

77 self._max_backoff = max_backoff 

78 self._action_cache: Optional[ActionCacheClient] = None 

79 

80 def start(self) -> None: 

81 if self._channel is None: 

82 self._channel, _ = setup_channel( 

83 self._remote, 

84 auth_token=self._credentials.get("auth-token"), 

85 auth_token_refresh_seconds=self._credentials.get("token-refresh-seconds"), 

86 client_key=self._credentials.get("tls-client-key"), 

87 client_cert=self._credentials.get("tls-client-cert"), 

88 server_cert=self._credentials.get("tls-server-cert"), 

89 ) 

90 if self._action_cache is None: 

91 self._action_cache = ActionCacheClient(self._channel, self._retries, self._max_backoff) 

92 

93 def stop(self) -> None: 

94 if self._channel: 

95 self._channel.close() 

96 

97 @property 

98 def remote_instance_name(self) -> str: 

99 if self._remote_instance_name is not None: 

100 return self._remote_instance_name 

101 return current_instance() 

102 

103 @property 

104 def allow_updates(self) -> bool: 

105 if self._channel is None: 

106 raise GrpcUninitializedError("Remote cache used before gRPC initialization.") 

107 

108 # Check if updates are allowed if we haven't already. 

109 # This is done the first time update_action_result is called rather 

110 # than on instantiation because the remote cache may not be running 

111 # when this object is instantiated. 

112 if self._allow_updates is None: 

113 interface = CapabilitiesInterface(self._channel) 

114 capabilities = interface.get_capabilities(self.remote_instance_name) 

115 self._allow_updates = capabilities.cache_capabilities.action_cache_update_capabilities.update_enabled 

116 return self._allow_updates 

117 

118 def get_action_result(self, action_digest: Digest) -> ActionResult: 

119 """Retrieves the cached result for an Action. 

120 

121 Queries the remote ActionCache service to retrieve the cached 

122 result for a given Action digest. If the remote cache doesn't 

123 contain a result for the Action, then ``None`` is returned. 

124 

125 Args: 

126 action_digest (Digest): The digest of the Action to retrieve the 

127 cached result of. 

128 

129 """ 

130 if self._action_cache is None: 

131 raise GrpcUninitializedError("Remote cache used before gRPC initialization.") 

132 

133 action_result = self._action_cache.get(self.remote_instance_name, action_digest) 

134 

135 if action_result is None: 

136 key = self._get_key(action_digest) 

137 raise NotFoundError(f"Key not found: {key}") 

138 return action_result 

139 

140 def update_action_result(self, action_digest: Digest, action_result: ActionResult) -> None: 

141 """Stores a result for an Action in the remote cache. 

142 

143 Sends an ``UpdateActionResult`` request to the remote ActionCache 

144 service, to store the result in the remote cache. 

145 

146 If the remote cache doesn't allow updates, then this raises a 

147 ``NotImplementedError``. 

148 

149 Args: 

150 action_digest (Digest): The digest of the Action whose result is 

151 being cached. 

152 action_result (ActionResult): The result to cache for the given 

153 Action digest. 

154 

155 """ 

156 if self._action_cache is None: 

157 raise GrpcUninitializedError("Remote cache used before gRPC initialization.") 

158 

159 if not self.allow_updates: 

160 raise NotImplementedError("Updating cache not allowed") 

161 

162 self._action_cache.update(self.remote_instance_name, action_digest, action_result) 

163 

164 def _get_key(self, action_digest: Digest) -> Tuple[str, int]: 

165 """Get a hashable cache key from a given Action digest. 

166 

167 Args: 

168 action_digest (Digest): The digest to produce a cache key for. 

169 

170 """ 

171 return (action_digest.hash, action_digest.size_bytes)