Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17StorageABC 

18================== 

19 

20The abstract base class for storage providers. 

21""" 

22 

23import abc 

24from typing import Optional 

25 

26from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest 

27from buildgrid._protos.google.rpc.status_pb2 import Status 

28from buildgrid._protos.google.rpc import code_pb2 

29from buildgrid.server.monitoring import get_monitoring_bus 

30 

31from ....settings import HASH 

32 

33 

class StorageABC(abc.ABC):
    """Abstract base class for storage providers.

    Concrete providers implement the abstract primitives (`has_blob`,
    `get_blob`, `delete_blob`, `begin_write` and `commit_write`). The bulk
    and Protobuf-message helpers defined here are built purely on top of
    those primitives, so providers may override them but do not have to.
    """

    @abc.abstractmethod
    def has_blob(self, digest):
        """Return True if the blob with the given instance/digest exists."""
        raise NotImplementedError()

    @abc.abstractmethod
    def get_blob(self, digest):
        """Return a file-like object containing the blob. Most implementations
        will read the entire file into memory and return a `BytesIO` object.
        Eventually this should be corrected to handle files which cannot fit
        into memory.

        If the blob isn't present in storage, return None.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def delete_blob(self, digest):
        """Delete the blob from storage if it's present."""

    @abc.abstractmethod
    def begin_write(self, digest):
        """Return a file-like object to which a blob's contents could be
        written.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def commit_write(self, digest, write_session):
        """Commit the write operation. `write_session` must be an object
        returned by `begin_write`.

        The storage object is not responsible for verifying that the data
        written to the write_session actually matches the digest. The caller
        must do that.
        """
        raise NotImplementedError()

    def bulk_delete(self, digests):
        """Delete a list of blobs from storage.

        Calls `delete_blob` once per digest, in iteration order.
        """
        for digest in digests:
            self.delete_blob(digest)

    def missing_blobs(self, digests):
        """Return a list of the given digests whose blobs are not present
        in CAS, as reported by `has_blob`. Input order is preserved.
        """
        return [digest for digest in digests if not self.has_blob(digest)]

    def bulk_update_blobs(self, blobs):
        """Given a container of (digest, value) tuples, add all the blobs
        to CAS. Return a list of Status objects corresponding to the
        result of uploading each of the blobs.

        Unlike in `commit_write`, the storage object will verify that each of
        the digests matches the provided data. A mismatch yields an
        INVALID_ARGUMENT status, an `IOError` raised while writing yields an
        UNKNOWN status carrying the error text, and success yields OK.
        """
        result = []
        for digest, data in blobs:
            # Validate before touching storage: cheap size check first, then
            # the hash comparison.
            if len(data) != digest.size_bytes or HASH(data).hexdigest() != digest.hash:
                result.append(
                    Status(
                        code=code_pb2.INVALID_ARGUMENT,
                        message="Data doesn't match hash",
                    ))
                continue

            try:
                write_session = self.begin_write(digest)
                write_session.write(data)
                self.commit_write(digest, write_session)
            except IOError as ex:
                # One failed blob must not abort the rest of the batch.
                result.append(
                    Status(code=code_pb2.UNKNOWN, message=str(ex)))
            else:
                result.append(Status(code=code_pb2.OK))
        return result

    def bulk_read_blobs(self, digests):
        """Given an iterable container of digests, return a
        {hash: file-like object} dictionary corresponding to the blobs
        represented by the input digests.

        Digests for which `get_blob` returns None are omitted from the
        returned dictionary.
        """
        blobmap = {}
        for digest in digests:
            blob = self.get_blob(digest)
            if blob is not None:
                blobmap[digest.hash] = blob
        return blobmap

    def put_message(self, message):
        """Store the given Protobuf message in CAS, returning its digest.

        The digest is computed here from the serialized message (hash via
        `HASH`, size from the serialized length).
        """
        message_blob = message.SerializeToString()
        digest = Digest(hash=HASH(message_blob).hexdigest(),
                        size_bytes=len(message_blob))
        session = self.begin_write(digest)
        session.write(message_blob)
        self.commit_write(digest, session)
        return digest

    def get_message(self, digest, message_type):
        """Retrieve the Protobuf message with the given digest and type from
        CAS. If the blob is not present, returns None.

        Any exception raised while reading or parsing the blob propagates to
        the caller; the blob is closed in either case.
        """
        message_blob = self.get_blob(digest)
        if message_blob is None:
            return None
        try:
            return message_type.FromString(message_blob.read())
        finally:
            # Always release the blob, even when parsing fails; otherwise a
            # corrupt or mistyped blob would leak the file-like object.
            message_blob.close()

    def is_cleanup_enabled(self):
        """Return False: this base implementation reports cleanup as
        disabled.
        """
        return False

    def set_instance_name(self, instance_name: str) -> None:
        """Record `instance_name` on this object as `_instance_name`."""
        self._instance_name: Optional[str] = instance_name