Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17RemoteStorage 

18================== 

19 

20Forwards storage requests to a remote storage. 

21""" 

22 

23import io 

24import logging 

25 

26from buildgrid.client.cas import download, upload 

27from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc 

28from buildgrid._protos.google.rpc import code_pb2 

29from buildgrid._protos.google.rpc import status_pb2 

30from buildgrid.settings import HASH 

31 

32from .storage_abc import StorageABC 

33 

34 

class RemoteStorage(StorageABC):
    """Storage provider that forwards every request to a remote CAS server.

    Blob presence checks and bulk uploads go through the REAPI
    ContentAddressableStorage stub on the given gRPC channel; individual
    blob reads/writes go through the ByteStream-based ``download`` /
    ``upload`` helper context managers.
    """

    def __init__(self, channel, instance_name):
        """Initialize the remote storage backend.

        Args:
            channel: gRPC channel connected to the remote CAS server.
            instance_name (str): REAPI instance name to address on the remote.
        """
        self.__logger = logging.getLogger(__name__)

        self.instance_name = instance_name
        self.channel = channel

        self._stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)

    def has_blob(self, digest):
        """Return True if the remote CAS already has the blob for `digest`."""
        self.__logger.debug(f"Checking for blob: [{digest}]")
        # The blob exists iff FindMissingBlobs does not report it missing.
        return not self.missing_blobs([digest])

    def get_blob(self, digest):
        """Fetch a blob from the remote.

        Args:
            digest: Digest of the blob to fetch.

        Returns:
            io.BytesIO wrapping the blob's contents, or None if the remote
            does not have the blob.
        """
        self.__logger.debug(f"Getting blob: [{digest}]")
        with download(self.channel, instance=self.instance_name) as downloader:
            blob = downloader.get_blob(digest)
            if blob is not None:
                return io.BytesIO(blob)
            return None

    def delete_blob(self, digest):
        """The REAPI doesn't have a deletion method, so we can't support
        deletion for remote storage.
        """
        raise NotImplementedError(
            "Deletion is not supported for remote storage!")

    def bulk_delete(self, digests):
        """The REAPI doesn't have a deletion method, so we can't support
        bulk deletion for remote storage.
        """
        raise NotImplementedError(
            "Bulk deletion is not supported for remote storage!")

    def begin_write(self, digest):
        """Return an in-memory buffer for accumulating a blob's contents.

        The buffer is later handed back to :meth:`commit_write`, which
        performs the actual upload.
        """
        return io.BytesIO()

    def commit_write(self, digest, write_session):
        """Upload the contents buffered in `write_session` to the remote."""
        self.__logger.debug(f"Writing blob: [{digest}]")
        with upload(self.channel, instance=self.instance_name) as uploader:
            uploader.put_blob(write_session.getvalue())

    def missing_blobs(self, blobs):
        """Ask the remote which of `blobs` it does not have.

        Args:
            blobs (list): Digest messages to check.

        Returns:
            list: the Digest messages the remote reports as missing.
        """
        if len(blobs) > 100:
            # Avoid dumping an unbounded digest list into the debug log.
            self.__logger.debug(
                f"Missing blobs request for: {blobs[:100]} (truncated)")
        else:
            self.__logger.debug(f"Missing blobs request for: {blobs}")
        request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=self.instance_name)

        for blob in blobs:
            request_digest = request.blob_digests.add()
            request_digest.hash = blob.hash
            request_digest.size_bytes = blob.size_bytes

        response = self._stub_cas.FindMissingBlobs(request)

        return list(response.missing_blob_digests)

    def bulk_update_blobs(self, blobs):
        """Upload a batch of blobs to the remote.

        Args:
            blobs: sequence of ``(digest, data)`` pairs, where ``data`` are
                the raw bytes claimed to match ``digest``.

        Returns:
            list: one ``Status`` per input pair, in order — ``OK`` for blobs
            that were queued for upload, ``UNKNOWN`` for blobs whose size or
            hash did not match their digest.
        """
        sent_digests = []
        with upload(self.channel, instance=self.instance_name) as uploader:
            for digest, blob in blobs:
                # Validate the contents against the claimed digest before
                # uploading; an empty Digest placeholder marks the failure
                # positionally so results line up with the inputs.
                if len(blob) != digest.size_bytes or HASH(blob).hexdigest() != digest.hash:
                    sent_digests.append(remote_execution_pb2.Digest())
                else:
                    sent_digests.append(uploader.put_blob(blob, digest=digest, queue=True))

        assert len(sent_digests) == len(blobs)

        # A zero-byte Digest (ByteSize() == 0) is the failure placeholder
        # appended above; everything else was successfully queued.
        return [status_pb2.Status(code=code_pb2.OK) if d.ByteSize() > 0
                else status_pb2.Status(code=code_pb2.UNKNOWN) for d in sent_digests]