Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/remote.py: 37.86%

103 statements  

coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
RemoteStorage
=============

Forwards storage requests to a remote storage.
"""
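# A minimal usage sketch (the endpoint, instance name, digest and data below
# are hypothetical; real configuration comes from the BuildGrid server setup):
#
#     storage = RemoteStorage("http://cas.example.com:50051", "main")
#     storage.setup_grpc()
#     if storage.has_blob(digest):
#         stream = storage.get_blob(digest)
#     else:
#         session = storage.begin_write(digest)
#         session.write(data)
#         storage.commit_write(digest, session)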

import io
import logging
from tempfile import NamedTemporaryFile

import buildgrid.server.context as context_module
from buildgrid._exceptions import GrpcUninitializedError, NotFoundError
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from buildgrid._protos.google.rpc import code_pb2
from buildgrid._protos.google.rpc import status_pb2
from buildgrid.client.capabilities import CapabilitiesInterface
from buildgrid.client.cas import download, upload
from buildgrid.client.channel import setup_channel
from buildgrid.settings import HASH, MAX_IN_MEMORY_BLOB_SIZE_BYTES
from .storage_abc import StorageABC

class RemoteStorage(StorageABC):

    def __init__(self, remote, instance_name, channel_options=None, tls_credentials=None, retries=0, max_backoff=64,
                 request_timeout=None):
        self.__logger = logging.getLogger(__name__)

        self.remote_instance_name = instance_name
        self._remote = remote
        self._channel_options = channel_options
        if tls_credentials is None:
            tls_credentials = {}
        self._tls_credentials = tls_credentials
        self.retries = retries
        self.max_backoff = max_backoff
        self._request_timeout = request_timeout

        self._stub_cas = None
        self.channel = None

    def setup_grpc(self):
        if self.channel is None:
            self.channel, _ = setup_channel(
                self._remote,
                auth_token=None,
                client_key=self._tls_credentials.get("tls-client-key"),
                client_cert=self._tls_credentials.get("tls-client-cert"),
                server_cert=self._tls_credentials.get("tls-server-cert"),
                timeout=self._request_timeout
            )

        if self._stub_cas is None:
            self._stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)

    def get_capabilities(self):
        interface = CapabilitiesInterface(self.channel)
        capabilities = interface.get_capabilities(self.remote_instance_name)
        return capabilities.cache_capabilities

    def has_blob(self, digest):
        self.__logger.debug(f"Checking for blob: [{digest}]")
        if not self.missing_blobs([digest]):
            return True
        return False

    def get_blob(self, digest):
        if self.channel is None:
            raise GrpcUninitializedError("Remote CAS backend used before gRPC initialization.")

        self.__logger.debug(f"Getting blob: [{digest}]")
        with download(self.channel,
                      instance=self.remote_instance_name,
                      retries=self.retries,
                      max_backoff=self.max_backoff) as downloader:
            if digest.size_bytes > MAX_IN_MEMORY_BLOB_SIZE_BYTES:
                # Avoid storing the large blob completely in memory.
                temp_file = NamedTemporaryFile(delete=True)  # pylint: disable=consider-using-with
                try:
                    downloader.download_file(digest, temp_file.name, queue=False)
                except NotFoundError:
                    return None
                reader = io.BufferedReader(temp_file)
                reader.seek(0)
                return reader
            else:
                blob = downloader.get_blob(digest)
                if blob is not None:
                    return io.BytesIO(blob)
                else:
                    return None
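    # Note: get_blob() above returns a readable binary stream (an in-memory
    # BytesIO for small blobs, a temporary-file-backed reader for large ones),
    # or None when the digest is not stored remotely. A caller sketch:
    #
    #     stream = storage.get_blob(digest)
    #     if stream is not None:
    #         data = stream.read()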

    def delete_blob(self, digest):
        """The REAPI doesn't have a deletion method, so we can't support
        deletion for remote storage.
        """
        raise NotImplementedError(
            "Deletion is not supported for remote storage!")

    def bulk_delete(self, digests):
        """The REAPI doesn't have a deletion method, so we can't support
        bulk deletion for remote storage.
        """
        raise NotImplementedError(
            "Bulk deletion is not supported for remote storage!")

    def begin_write(self, digest):
        return io.BytesIO()

    def commit_write(self, digest, write_session):
        if self.channel is None:
            raise GrpcUninitializedError("Remote CAS backend used before gRPC initialization.")

        self.__logger.debug(f"Writing blob: [{digest}]")
        with upload(self.channel,
                    instance=self.remote_instance_name,
                    retries=self.retries,
                    max_backoff=self.max_backoff) as uploader:
            uploader.put_blob(write_session.getvalue())

    def missing_blobs(self, digests):
        if self._stub_cas is None:
            raise GrpcUninitializedError("Remote CAS backend used before gRPC initialization.")

        if len(digests) > 100:
            self.__logger.debug(
                f"Missing blobs request for: {digests[:100]} (truncated)")
        else:
            self.__logger.debug(f"Missing blobs request for: {digests}")
        request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=self.remote_instance_name)

        for blob in digests:
            request_digest = request.blob_digests.add()
            request_digest.hash = blob.hash
            request_digest.size_bytes = blob.size_bytes

        response = self._stub_cas.FindMissingBlobs(request,
                                                   metadata=context_module.metadata_list())

        return response.missing_blob_digests
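    # A hedged sketch of the bulk round trip (digest_a, digest_b and
    # blobs_by_hash are hypothetical names, not part of this module):
    #
    #     missing = storage.missing_blobs([digest_a, digest_b])
    #     statuses = storage.bulk_update_blobs([(d, blobs_by_hash[d.hash]) for d in missing])
    #     streams = storage.bulk_read_blobs([digest_a, digest_b])  # {hash: io.BytesIO}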

    def bulk_update_blobs(self, blobs):
        if self._stub_cas is None:
            raise GrpcUninitializedError("Remote CAS backend used before gRPC initialization.")

        sent_digests = []
        with upload(self.channel,
                    instance=self.remote_instance_name,
                    retries=self.retries,
                    max_backoff=self.max_backoff) as uploader:
            for digest, blob in blobs:
                # Skip blobs whose size or hash doesn't match the provided digest;
                # an empty Digest is recorded so they are reported as UNKNOWN below.
                if len(blob) != digest.size_bytes or HASH(blob).hexdigest() != digest.hash:
                    sent_digests.append(remote_execution_pb2.Digest())
                else:
                    sent_digests.append(uploader.put_blob(blob, digest=digest, queue=True))

        assert len(sent_digests) == len(blobs)

        return [status_pb2.Status(code=code_pb2.OK) if d.ByteSize() > 0
                else status_pb2.Status(code=code_pb2.UNKNOWN) for d in sent_digests]

    def bulk_read_blobs(self, digests):
        if self._stub_cas is None:
            raise GrpcUninitializedError("Remote CAS backend used before gRPC initialization.")

        self.__logger.debug(f"Bulk read blobs request for: {digests}")
        with download(self.channel,
                      instance=self.remote_instance_name,
                      retries=self.retries,
                      max_backoff=self.max_backoff) as downloader:
            results = downloader.get_available_blobs(digests)
            # Transform the list of (data, digest) pairs into the expected hash-to-blob map.
            return {result[1].hash: io.BytesIO(result[0]) for result in results}