Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/actioncache/caches/remote_cache.py: 31.48%

54 statements  

« prev     ^ index     » next       coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

1# Copyright (C) 2019, 2020 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17Remote Action Cache 

18=================== 

19 

20Provides an interface to a remote Action Cache. This can be used by other 

21services (e.g. an Execution service) to communicate with a remote cache. 

22 

23It provides the same API as any other Action Cache instance backend. 

24 

25""" 

26 

27 

28import logging 

29from typing import Dict, Optional, Tuple 

30 

31from buildgrid._exceptions import GrpcUninitializedError, NotFoundError 

32from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ( 

33 ActionResult, 

34 Digest 

35) 

36from buildgrid.client.actioncache import ActionCacheClient 

37from buildgrid.client.capabilities import CapabilitiesInterface 

38from buildgrid.client.channel import setup_channel 

39from buildgrid.server.actioncache.caches.action_cache_abc import ActionCacheABC 

40 

41 

class RemoteActionCache(ActionCacheABC):
    """Action Cache backend that forwards requests to a remote ActionCache.

    The gRPC channel and the ``ActionCacheClient`` are not created by the
    constructor; :meth:`setup_grpc` creates them. Until it has been called,
    :attr:`allow_updates`, :meth:`get_action_result` and
    :meth:`update_action_result` raise ``GrpcUninitializedError``.
    """

    def __init__(
        self,
        remote: str,
        instance_name: str,
        retries=0,
        max_backoff=64,
        request_timeout: Optional[float] = None,
        channel_options: Optional[Tuple[Tuple[str, str], ...]] = None,
        tls_credentials: Optional[Dict[str, str]] = None
    ):
        """Initialises a new RemoteActionCache instance.

        No network connection is made here; see :meth:`setup_grpc`.

        Args:
            remote (str): URL of the remote ActionCache service to open a
                channel to.
            instance_name (str): The instance name of the remote ActionCache
                service.
            retries: Forwarded unchanged to ``ActionCacheClient`` when the
                client is created.
            max_backoff: Forwarded unchanged to ``ActionCacheClient`` when the
                client is created.
            request_timeout (float): Optional timeout value. It is stored on
                the instance as ``_request_timeout``.
                NOTE(review): nothing in this class applies it to the channel
                or the client yet -- confirm how it should be wired in.
            channel_options (tuple): Optional tuple of channel options. It is
                stored on the instance as ``_channel_options``.
                NOTE(review): nothing in this class reads it back, so it is
                not passed to ``setup_channel`` -- confirm whether it should
                be.
            tls_credentials (dict): Optional credentials to use when opening
                the gRPC channel. The keys read are ``tls-client-key``,
                ``tls-client-cert`` and ``tls-server-cert``. When omitted an
                empty dict is used, so all three are passed as ``None``.

        """
        super().__init__()
        self._logger = logging.getLogger(__name__)

        # Where to connect and which remote instance to address.
        self._remote = remote
        self._remote_instance_name = instance_name

        # Connection settings.
        self._channel_options = channel_options
        self._tls_credentials = {} if tls_credentials is None else tls_credentials
        # Previously this argument was dropped on the floor; keep it so the
        # configured value is at least not lost.
        self._request_timeout = request_timeout
        self._retries = retries
        self._max_backoff = max_backoff

        # Populated lazily: the channel and client by setup_grpc(), the
        # update capability by the first read of ``allow_updates``.
        self._channel = None
        self._action_cache = None
        self._allow_updates = None

    def setup_grpc(self):
        """Create the gRPC channel and the ActionCache client if missing.

        Each of the two objects is only created while its attribute is still
        ``None``, so calling this again once both exist does nothing.
        """
        if self._channel is None:
            credentials = self._tls_credentials
            # setup_channel returns a pair; only the first element is kept.
            self._channel, _ = setup_channel(
                self._remote,
                auth_token=None,
                client_key=credentials.get("tls-client-key"),
                client_cert=credentials.get("tls-client-cert"),
                server_cert=credentials.get("tls-server-cert")
            )

        if self._action_cache is None:
            self._action_cache = ActionCacheClient(
                self._channel,
                self._remote_instance_name,
                self._retries,
                self._max_backoff
            )

    @property
    def allow_updates(self) -> bool:
        """Whether the remote cache reports that updates are enabled.

        The answer is fetched from the remote's capabilities the first time
        this property is read and is reused afterwards.

        Raises:
            GrpcUninitializedError: If the channel has not been created yet.

        """
        if self._channel is None:
            raise GrpcUninitializedError("Remote cache used before gRPC initialization.")

        # The capabilities are queried on first use rather than on
        # instantiation because the remote cache may not be running when this
        # object is instantiated.
        if self._allow_updates is None:
            interface = CapabilitiesInterface(self._channel)
            capabilities = interface.get_capabilities(self._remote_instance_name)
            update_capabilities = (
                capabilities.cache_capabilities.action_cache_update_capabilities)
            self._allow_updates = update_capabilities.update_enabled
        return self._allow_updates

    def get_action_result(self, action_digest: Digest) -> ActionResult:
        """Retrieves the cached result for an Action.

        Queries the remote ActionCache service to retrieve the cached
        result for a given Action digest.

        Args:
            action_digest (Digest): The digest of the Action to retrieve the
                cached result of.

        Returns:
            ActionResult: The result returned by the remote cache client.

        Raises:
            GrpcUninitializedError: If the client has not been created yet.
            NotFoundError: If the client returns ``None`` for the digest,
                i.e. there is no cached result to hand back.

        """
        if self._action_cache is None:
            raise GrpcUninitializedError("Remote cache used before gRPC initialization.")

        action_result = self._action_cache.get(action_digest)
        if action_result is None:
            key = self._get_key(action_digest)
            raise NotFoundError(f"Key not found: {key}")
        return action_result

    def update_action_result(self, action_digest: Digest,
                             action_result: ActionResult) -> None:
        """Stores a result for an Action in the remote cache.

        Hands the digest and result to the remote cache client's ``update``
        call, provided the remote reports that updates are enabled.

        Args:
            action_digest (Digest): The digest of the Action whose result is
                being cached.
            action_result (ActionResult): The result to cache for the given
                Action digest.

        Raises:
            GrpcUninitializedError: If the client has not been created yet.
            NotImplementedError: If the remote cache doesn't allow updates.

        """
        if self._action_cache is None:
            raise GrpcUninitializedError("Remote cache used before gRPC initialization.")

        if not self.allow_updates:
            raise NotImplementedError("Updating cache not allowed")
        # Whatever the client's update() returns is passed straight through.
        return self._action_cache.update(action_digest, action_result)

    def _get_key(self, action_digest: Digest) -> Tuple[str, int]:
        """Get a hashable cache key from a given Action digest.

        Args:
            action_digest (Digest): The digest to produce a cache key for.

        Returns:
            tuple: The pair ``(action_digest.hash, action_digest.size_bytes)``.

        """
        return (action_digest.hash, action_digest.size_bytes)