Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/storage_abc.py: 81.32% (91 statements)

# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
StorageABC
==========

The abstract base class for storage providers.
"""

import abc
import logging
from typing import List, Optional

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import (
    CacheCapabilities, Digest, SymlinkAbsolutePathStrategy
)
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid._protos.google.rpc import code_pb2
from buildgrid.utils import get_hash_type

from ....settings import HASH, MAX_REQUEST_SIZE

class StorageABC(abc.ABC):

    def setup_grpc(self):
        pass

    @abc.abstractmethod
    def has_blob(self, digest):
        """Return True if the blob with the given instance/digest exists."""
        raise NotImplementedError()

    @abc.abstractmethod
    def get_blob(self, digest):
        """Return a file-like object containing the blob. Most implementations
        will read the entire file into memory and return a `BytesIO` object.
        Eventually this should be corrected to handle files which cannot fit
        into memory.

        If the blob isn't present in storage, return None.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def delete_blob(self, digest):
        """Delete the blob from storage if it's present."""

    @abc.abstractmethod
    def begin_write(self, digest):
        """Return a file-like object to which a blob's contents could be
        written.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def commit_write(self, digest, write_session):
        """Commit the write operation. `write_session` must be an object
        returned by `begin_write`.

        The storage object is not responsible for verifying that the data
        written to the write_session actually matches the digest. The caller
        must do that.
        """
        raise NotImplementedError()

    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        """Delete a list of blobs from storage."""
        failed_deletions = []
        for digest in digests:
            try:
                self.delete_blob(digest)
            except Exception:
                # If deletion raised an exception, assume it failed. Implementations
                # with more information can instead report that a blob was simply missing.
                logging.getLogger(__name__).warning(
                    f"Unable to clean up digest [{digest.hash}/{digest.size_bytes}]", exc_info=True)
                failed_deletions.append(f'{digest.hash}/{digest.size_bytes}')
        return failed_deletions

    def missing_blobs(self, digests):
        """Return a container of the digests of the blobs not present in CAS."""
        result = []
        for digest in digests:
            if not self.has_blob(digest):
                result.append(digest)
        return result

    def bulk_update_blobs(self, blobs):
        """Given a container of (digest, value) tuples, add all the blobs
        to CAS. Return a list of Status objects corresponding to the
        result of uploading each of the blobs.

        Unlike in `commit_write`, the storage object will verify that each of
        the digests matches the provided data.
        """
        result = []
        for digest, data in blobs:
            if len(data) != digest.size_bytes or HASH(data).hexdigest() != digest.hash:
                result.append(
                    Status(
                        code=code_pb2.INVALID_ARGUMENT,
                        message="Data doesn't match hash",
                    ))
            else:
                try:
                    write_session = self.begin_write(digest)
                    write_session.write(data)
                    self.commit_write(digest, write_session)
                except IOError as ex:
                    result.append(
                        Status(code=code_pb2.UNKNOWN, message=str(ex)))
                else:
                    result.append(Status(code=code_pb2.OK))
        return result

    def bulk_read_blobs(self, digests):
        """Given an iterable container of digests, return a
        {hash: file-like object} dictionary corresponding to the blobs
        represented by the input digests.
        """
        blobmap = {}
        for digest in digests:
            blob = self.get_blob(digest)
            if blob is not None:
                blobmap[digest.hash] = blob
        return blobmap

    def put_message(self, message):
        """Store the given Protobuf message in CAS, returning its digest."""
        message_blob = message.SerializeToString()
        digest = Digest(hash=HASH(message_blob).hexdigest(),
                        size_bytes=len(message_blob))
        session = self.begin_write(digest)
        session.write(message_blob)
        self.commit_write(digest, session)
        return digest

    def get_message(self, digest, message_type):
        """Retrieve the Protobuf message with the given digest and type from
        CAS. If the blob is not present, returns None.
        """
        message_blob = self.get_blob(digest)
        if message_blob is None:
            return None
        result = message_type.FromString(message_blob.read())
        message_blob.close()
        return result

    def is_cleanup_enabled(self):
        return False

    @property
    def instance_name(self) -> Optional[str]:
        if hasattr(self, '_instance_name'):
            return self._instance_name
        return None

    def set_instance_name(self, instance_name: str) -> None:
        # This method should always get called, so there's no benefit in
        # adding an __init__ to this abstract class (therefore adding the
        # need for subclasses to call `super()`) just to define a null
        # value for this.
        # pylint: disable=attribute-defined-outside-init
        self._instance_name: Optional[str] = instance_name

    def hash_type(self):
        return get_hash_type()

    def max_batch_total_size_bytes(self):
        return MAX_REQUEST_SIZE

    def symlink_absolute_path_strategy(self):
        # Currently this strategy is hardcoded into BuildGrid,
        # with no setting to reference.
        return SymlinkAbsolutePathStrategy.DISALLOWED

    def get_capabilities(self):
        capabilities = CacheCapabilities()
        capabilities.digest_function.extend([self.hash_type()])
        capabilities.max_batch_total_size_bytes = self.max_batch_total_size_bytes()
        capabilities.symlink_absolute_path_strategy = self.symlink_absolute_path_strategy()
        return capabilities
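
For illustration, a minimal sketch of a concrete subclass follows, assuming the buildgrid package is importable. `InMemoryStorage` and its `_key` helper are hypothetical names introduced for this example, not part of BuildGrid; the sketch keeps blobs in a plain dict and shows how the five abstract methods combine with the `put_message`/`get_message` helpers defined above.

# Hypothetical example, not part of BuildGrid: an in-memory StorageABC
# subclass used only to illustrate the abstract interface's contract.
import io

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid.server.cas.storage.storage_abc import StorageABC


class InMemoryStorage(StorageABC):

    def __init__(self):
        self._blobs = {}  # maps "hash/size_bytes" keys to bytes

    def _key(self, digest):
        return f'{digest.hash}/{digest.size_bytes}'

    def has_blob(self, digest):
        return self._key(digest) in self._blobs

    def get_blob(self, digest):
        data = self._blobs.get(self._key(digest))
        return io.BytesIO(data) if data is not None else None

    def delete_blob(self, digest):
        self._blobs.pop(self._key(digest), None)

    def begin_write(self, digest):
        # The write session is just an in-memory buffer; commit_write
        # publishes its contents under the digest's key.
        return io.BytesIO()

    def commit_write(self, digest, write_session):
        self._blobs[self._key(digest)] = write_session.getvalue()


# Round-trip a protobuf message through the base class's helpers. Any
# message type works; a Digest is used here purely as an example payload.
storage = InMemoryStorage()
message = Digest(hash='abc', size_bytes=3)
stored_digest = storage.put_message(message)
assert storage.get_message(stored_digest, Digest) == message

Note that `commit_write` here trusts the caller, matching the documented contract: verification of data against the digest is the caller's job, except in `bulk_update_blobs`, where the base class performs the check itself.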