# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

16""" 

17StorageABC 

18================== 

19 

20The abstract base class for storage providers. 

21""" 

22 

import abc
import logging
from typing import List, Optional

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import (
    CacheCapabilities, Digest, SymlinkAbsolutePathStrategy
)
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid._protos.google.rpc import code_pb2
from buildgrid.utils import get_hash_type

from ....settings import HASH, MAX_REQUEST_SIZE


class StorageABC(abc.ABC):

    def setup_grpc(self):
        pass

    @abc.abstractmethod
    def has_blob(self, digest):
        """Return True if the blob with the given digest exists in storage."""
        raise NotImplementedError()

    @abc.abstractmethod
    def get_blob(self, digest):
        """Return a file-like object containing the blob. Most implementations
        will read the entire file into memory and return a `BytesIO` object.
        Eventually this should be corrected to handle files which cannot fit
        into memory.

        The file-like object must be readable. To use this with an index, it
        must be seekable as well.

        If the blob isn't present in storage, return None.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def delete_blob(self, digest):
        """Delete the blob from storage if it's present."""

    @abc.abstractmethod
    def begin_write(self, digest):
        """Return a file-like object to which a blob's contents can be
        written.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def commit_write(self, digest, write_session):
        """Commit the write operation. `write_session` must be an object
        returned by `begin_write`.

        The storage object is not responsible for verifying that the data
        written to the write_session actually matches the digest. The caller
        must do that.
        """
        raise NotImplementedError()
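
    # For illustration, the expected client-side write flow (a sketch that
    # assumes `storage` is a concrete StorageABC implementation and
    # `data: bytes` is the blob to store; `put_message` below follows the
    # same pattern):
    #
    #     digest = Digest(hash=HASH(data).hexdigest(), size_bytes=len(data))
    #     session = storage.begin_write(digest)
    #     session.write(data)
    #     storage.commit_write(digest, session)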

    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        """Delete a list of blobs from storage. Return a list of
        "hash/size_bytes" strings describing the blobs which could not be
        deleted.
        """
        failed_deletions = []
        for digest in digests:
            try:
                self.delete_blob(digest)
            except Exception:
                # If deletion threw an exception, assume it failed. Implementations
                # with more information can instead report that a blob was already
                # missing.
                logging.getLogger(__name__).warning(
                    f"Unable to clean up digest [{digest.hash}/{digest.size_bytes}]", exc_info=True)
                failed_deletions.append(f'{digest.hash}/{digest.size_bytes}')
        return failed_deletions

    def missing_blobs(self, digests):
        """Return a list of the digests whose blobs are not present in CAS."""
        result = []
        for digest in digests:
            if not self.has_blob(digest):
                result.append(digest)
        return result

    def bulk_update_blobs(self, blobs):
        """Given a container of (digest, value) tuples, add all the blobs
        to CAS. Return a list of Status objects corresponding to the
        result of uploading each of the blobs.

        Unlike `commit_write`, this method verifies that each digest
        matches the provided data before writing it.
        """
        result = []
        for digest, data in blobs:
            # Reject the blob up front if its size or hash doesn't match
            # the claimed digest.
            if len(data) != digest.size_bytes or HASH(data).hexdigest() != digest.hash:
                result.append(
                    Status(
                        code=code_pb2.INVALID_ARGUMENT,
                        message="Data doesn't match hash",
                    ))
            else:
                try:
                    write_session = self.begin_write(digest)
                    write_session.write(data)
                    self.commit_write(digest, write_session)
                except IOError as ex:
                    result.append(
                        Status(code=code_pb2.UNKNOWN, message=str(ex)))
                else:
                    result.append(Status(code=code_pb2.OK))
        return result
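
    # Example call shape, for illustration (`d1` and `d2` are hypothetical
    # Digest messages whose hash/size_bytes describe the paired bytes):
    #
    #     statuses = storage.bulk_update_blobs([(d1, b"foo"), (d2, b"bar")])
    #     succeeded = [s for s in statuses if s.code == code_pb2.OK]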

    def bulk_read_blobs(self, digests):
        """Given an iterable container of digests, return a
        {hash: file-like object} dictionary corresponding to the blobs
        represented by the input digests. Digests whose blobs are missing
        are simply absent from the result.

        Each file-like object must be readable. To use this with an index,
        it must be seekable as well.
        """
        blobmap = {}
        for digest in digests:
            blob = self.get_blob(digest)
            if blob is not None:
                blobmap[digest.hash] = blob
        return blobmap

    def put_message(self, message):
        """Store the given Protobuf message in CAS, returning its digest."""
        message_blob = message.SerializeToString()
        digest = Digest(hash=HASH(message_blob).hexdigest(),
                        size_bytes=len(message_blob))
        session = self.begin_write(digest)
        session.write(message_blob)
        self.commit_write(digest, session)
        return digest

    def get_message(self, digest, message_type):
        """Retrieve the Protobuf message with the given digest and type from
        CAS. If the blob is not present, returns None.
        """
        message_blob = self.get_blob(digest)
        if message_blob is None:
            return None
        result = message_type.FromString(message_blob.read())
        message_blob.close()
        return result
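
    # Round-trip example, for illustration (`action` is a hypothetical
    # protobuf message instance, e.g. a remote_execution_pb2.Action):
    #
    #     digest = storage.put_message(action)
    #     same_action = storage.get_message(digest, type(action))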

    def is_cleanup_enabled(self):
        return False

    @property
    def instance_name(self) -> Optional[str]:
        if hasattr(self, '_instance_name'):
            return self._instance_name
        return None

    def set_instance_name(self, instance_name: str) -> None:
        # This method should always get called, so there's no benefit in
        # adding an __init__ to this abstract class (therefore adding the
        # need for subclasses to call `super()`) just to define a null
        # value for this.
        # pylint: disable=attribute-defined-outside-init
        self._instance_name: Optional[str] = instance_name

    def hash_type(self):
        return get_hash_type()

    def max_batch_total_size_bytes(self):
        return MAX_REQUEST_SIZE

    def symlink_absolute_path_strategy(self):
        # Currently this strategy is hardcoded into BuildGrid,
        # with no setting to reference.
        return SymlinkAbsolutePathStrategy.DISALLOWED

    def get_capabilities(self):
        capabilities = CacheCapabilities()
        capabilities.digest_function.extend([self.hash_type()])
        capabilities.max_batch_total_size_bytes = self.max_batch_total_size_bytes()
        capabilities.symlink_absolute_path_strategy = self.symlink_absolute_path_strategy()
        return capabilities
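
# ---------------------------------------------------------------------------
# Illustrative only: a minimal in-memory StorageABC implementation, sketching
# what the abstract methods above are expected to do. The class name and the
# dict-backed store are hypothetical, not part of BuildGrid; a real backend
# would persist blobs durably and stream large blobs instead of buffering.
#
#     import io
#
#     class InMemoryStorage(StorageABC):
#
#         def __init__(self):
#             self._blobs = {}  # digest.hash -> bytes
#
#         def has_blob(self, digest):
#             return digest.hash in self._blobs
#
#         def get_blob(self, digest):
#             data = self._blobs.get(digest.hash)
#             return io.BytesIO(data) if data is not None else None
#
#         def delete_blob(self, digest):
#             self._blobs.pop(digest.hash, None)
#
#         def begin_write(self, digest):
#             return io.BytesIO()
#
#         def commit_write(self, digest, write_session):
#             # Per the contract above, the caller (not the storage) verifies
#             # that the written data matches `digest`.
#             self._blobs[digest.hash] = write_session.getvalue()
# ---------------------------------------------------------------------------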