Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/remote.py: 89.13%

92 statements  

# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

16""" 

17RemoteStorage 

18================== 

19 

20Forwwards storage requests to a remote storage. 

21""" 

import io
import logging
from tempfile import NamedTemporaryFile

from buildgrid._exceptions import GrpcUninitializedError
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from buildgrid._protos.google.rpc import code_pb2
from buildgrid._protos.google.rpc import status_pb2
from buildgrid.client.capabilities import CapabilitiesInterface
from buildgrid.client.cas import download, upload
from buildgrid.client.channel import setup_channel
from buildgrid.settings import HASH, MAX_READ_FROM_MEMORY_BLOB_SIZE_BYTES

from .storage_abc import StorageABC


class RemoteStorage(StorageABC):
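    """Storage backend that forwards requests to a remote CAS server.

    The constructor only stores the configuration; ``setup_grpc()`` must be
    called before any blob operation, otherwise ``GrpcUninitializedError``
    is raised.
    """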

    def __init__(self, remote, instance_name, channel_options=None, tls_credentials=None, retries=0, max_backoff=64):

        self.__logger = logging.getLogger(__name__)

        self.remote_instance_name = instance_name
        self._remote = remote
        self._channel_options = channel_options
        if tls_credentials is None:
            tls_credentials = {}
        self._tls_credentials = tls_credentials
        self.retries = retries
        self.max_backoff = max_backoff

        self._stub_cas = None
        self.channel = None

    def setup_grpc(self):
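        """Open the gRPC channel to the remote and create the CAS stub."""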

        self.channel, _ = setup_channel(
            self._remote,
            auth_token=None,
            client_key=self._tls_credentials.get("tls-client-key"),
            client_cert=self._tls_credentials.get("tls-client-cert"),
            server_cert=self._tls_credentials.get("tls-server-cert")
        )

        self._stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)

    def get_capabilities(self):
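        """Fetch the cache capabilities of the remote server."""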

        if self.channel is None:
            raise GrpcUninitializedError("Remote CAS backend used before gRPC initialization.")

        interface = CapabilitiesInterface(self.channel)
        capabilities = interface.get_capabilities(self.remote_instance_name)
        return capabilities.cache_capabilities

    def has_blob(self, digest):
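        """Return True if the remote CAS has the blob for the given digest."""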

        self.__logger.debug(f"Checking for blob: [{digest}]")
        return not self.missing_blobs([digest])

    def get_blob(self, digest):
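        """Return a file-like object with the blob's contents, or None if
        the blob is not available remotely.
        """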

        if self.channel is None:
            raise GrpcUninitializedError("Remote CAS backend used before gRPC initialization.")

        self.__logger.debug(f"Getting blob: [{digest}]")
        with download(self.channel,
                      instance=self.remote_instance_name,
                      retries=self.retries,
                      max_backoff=self.max_backoff) as downloader:
            if digest.size_bytes > MAX_READ_FROM_MEMORY_BLOB_SIZE_BYTES:
                # Avoid storing the large blob completely in memory.
                temp_file = NamedTemporaryFile(delete=True)  # pylint: disable=consider-using-with
                try:
                    downloader.download_file(digest, temp_file.name, queue=False)
                except FileNotFoundError:
                    return None
                reader = io.BufferedReader(temp_file)
                reader.seek(0)
                return reader
            else:
                blob = downloader.get_blob(digest)
                if blob is not None:
                    return io.BytesIO(blob)
                else:
                    return None

    def delete_blob(self, digest):
        """The REAPI doesn't have a deletion method, so we can't support
        deletion for remote storage.
        """
        raise NotImplementedError(
            "Deletion is not supported for remote storage!")

    def bulk_delete(self, digests):
        """The REAPI doesn't have a deletion method, so we can't support
        bulk deletion for remote storage.
        """
        raise NotImplementedError(
            "Bulk deletion is not supported for remote storage!")

    def begin_write(self, digest):
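        """Return a new in-memory write session for the given digest."""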

        return io.BytesIO()

    def commit_write(self, digest, write_session):
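        """Upload the contents of ``write_session`` to the remote CAS."""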

        if self.channel is None:
            raise GrpcUninitializedError("Remote CAS backend used before gRPC initialization.")

        self.__logger.debug(f"Writing blob: [{digest}]")
        with upload(self.channel,
                    instance=self.remote_instance_name,
                    retries=self.retries,
                    max_backoff=self.max_backoff) as uploader:
            uploader.put_blob(write_session.getvalue())

    def missing_blobs(self, digests):
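        """Return the subset of ``digests`` not present in the remote CAS."""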

        if self._stub_cas is None:
            raise GrpcUninitializedError("Remote CAS backend used before gRPC initialization.")

        if len(digests) > 100:
            self.__logger.debug(
                f"Missing blobs request for: {digests[:100]} (truncated)")
        else:
            self.__logger.debug(f"Missing blobs request for: {digests}")
        request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=self.remote_instance_name)

        for blob in digests:
            request_digest = request.blob_digests.add()
            request_digest.hash = blob.hash
            request_digest.size_bytes = blob.size_bytes

        response = self._stub_cas.FindMissingBlobs(request)

        return response.missing_blob_digests

    def bulk_update_blobs(self, blobs):
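        """Upload many blobs in one session, returning a Status per blob.

        A blob whose size or hash does not match its digest is skipped and
        reported with a non-OK status.
        """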

        if self._stub_cas is None:
            raise GrpcUninitializedError("Remote CAS backend used before gRPC initialization.")

        sent_digests = []
        with upload(self.channel,
                    instance=self.remote_instance_name,
                    retries=self.retries,
                    max_backoff=self.max_backoff) as uploader:
            for digest, blob in blobs:
                if len(blob) != digest.size_bytes or HASH(blob).hexdigest() != digest.hash:
                    # Mismatched blob: record an empty Digest so this entry is
                    # reported as failed below, and skip the upload entirely.
                    sent_digests.append(remote_execution_pb2.Digest())
                else:
                    sent_digests.append(uploader.put_blob(blob, digest=digest, queue=True))

        assert len(sent_digests) == len(blobs)

        return [status_pb2.Status(code=code_pb2.OK) if d.ByteSize() > 0
                else status_pb2.Status(code=code_pb2.UNKNOWN) for d in sent_digests]