Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/storage_abc.py: 92.86%

126 statements  

coverage.py v7.4.1, created at 2024-06-11 15:37 +0000

# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
StorageABC
==================

The abstract base class for storage providers.
"""

import abc
import io
import logging
from tempfile import TemporaryFile
from typing import IO, Any, Dict, Iterator, List, Optional, Tuple, Type, TypeVar

from buildgrid._exceptions import NotFoundError
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import (
    CacheCapabilities,
    Digest,
    Directory,
    SymlinkAbsolutePathStrategy,
)
from buildgrid._protos.google.rpc import code_pb2
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid._types import MessageType
from buildgrid.settings import HASH, MAX_IN_MEMORY_BLOB_SIZE_BYTES, MAX_REQUEST_SIZE
from buildgrid.utils import get_hash_type

LOGGER = logging.getLogger(__name__)

M = TypeVar("M", bound=MessageType)

def create_write_session(digest: Digest) -> IO[bytes]:
    """
    Return a file-like object to which a blob's contents can be written.

    Large blobs are written to a temporary file to avoid excessive memory
    usage; small blobs are kept in memory for performance.
    """

    if digest.size_bytes > MAX_IN_MEMORY_BLOB_SIZE_BYTES:
        return TemporaryFile()
    return io.BytesIO()
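
# Illustrative note (not part of the original module): the write-session
# pattern assumed throughout this file is to write the raw blob bytes into
# the session returned above and then pass the still-open session to
# `StorageABC.commit_write`, e.g. (with `storage` being some concrete
# StorageABC instance and `data` being the blob contents):
#
#     digest = Digest(hash=HASH(data).hexdigest(), size_bytes=len(data))
#     with create_write_session(digest) as session:
#         session.write(data)
#         storage.commit_write(digest, session)
#
# `put_message` and `bulk_update_blobs` below follow this pattern.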

T = TypeVar("T", bound="StorageABC")


class StorageABC(abc.ABC):
    def __enter__(self: T) -> T:
        self.start()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        self.stop()

    def start(self) -> None:
        pass

    def stop(self) -> None:
        LOGGER.info(f"Stopped {type(self).__name__}")

    @abc.abstractmethod
    def has_blob(self, digest: Digest) -> bool:
        """Return True if the blob with the given instance/digest exists."""
        raise NotImplementedError()

    @abc.abstractmethod
    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        """Return a file-like object containing the blob. Most implementations
        will read the entire file into memory and return a `BytesIO` object.
        Eventually this should be corrected to handle files which cannot fit
        into memory.

        The file-like object must be readable and seekable.

        If the blob isn't present in storage, return None.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def delete_blob(self, digest: Digest) -> None:
        """Delete the blob from storage if it's present."""

    @abc.abstractmethod
    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        """Store the contents for a digest.

        The storage object is not responsible for verifying that the data
        written to the write_session actually matches the digest. The caller
        must do that.
        """
        raise NotImplementedError()

    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        """Delete a list of blobs from storage."""
        failed_deletions = []
        for digest in digests:
            try:
                self.delete_blob(digest)
            except Exception:
                # If deletion threw an exception, assume it failed. More specific
                # implementations with more information can instead report the
                # blob as missing.
                LOGGER.warning(f"Unable to clean up digest [{digest.hash}/{digest.size_bytes}]", exc_info=True)
                failed_deletions.append(f"{digest.hash}/{digest.size_bytes}")
        return failed_deletions

    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
        """Return a list containing the digests of blobs not present in CAS."""
        result = []
        for digest in digests:
            if not self.has_blob(digest):
                result.append(digest)
        return result

    def bulk_update_blobs(self, blobs: List[Tuple[Digest, bytes]]) -> List[Status]:
        """Given a container of (digest, value) tuples, add all the blobs
        to CAS. Return a list of Status objects corresponding to the
        result of uploading each of the blobs.

        Unlike in `commit_write`, the storage object will verify that each of
        the digests matches the provided data.
        """
        result = []
        for digest, data in blobs:
            if len(data) != digest.size_bytes or HASH(data).hexdigest() != digest.hash:
                result.append(Status(code=code_pb2.INVALID_ARGUMENT, message="Data doesn't match hash"))
                continue
            try:
                with create_write_session(digest) as write_session:
                    write_session.write(data)
                    self.commit_write(digest, write_session)
                result.append(Status(code=code_pb2.OK))
            except IOError as ex:
                result.append(Status(code=code_pb2.UNKNOWN, message=str(ex)))
        return result

    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
        """Given an iterable container of digests, return a
        {hash: blob contents} dictionary corresponding to the blobs
        represented by the input digests. Digests not present in storage
        are omitted from the result.
        """

        blobmap = {}
        for digest in digests:
            blob = self.get_blob(digest)
            if blob is not None:
                with blob as b:
                    blobmap[digest.hash] = b.read()
        return blobmap

    def put_message(self, message: MessageType) -> Digest:
        """Store the given Protobuf message in CAS, returning its digest."""
        message_blob = message.SerializeToString()
        digest = Digest(hash=HASH(message_blob).hexdigest(), size_bytes=len(message_blob))
        with create_write_session(digest) as session:
            session.write(message_blob)
            self.commit_write(digest, session)
        return digest

    def get_message(self, digest: Digest, message_type: Type[M]) -> Optional[M]:
        """Retrieve the Protobuf message with the given digest and type from
        CAS. If the blob is not present, returns None.
        """
        message_blob = self.get_blob(digest)
        if message_blob is None:
            return None
        try:
            return message_type.FromString(message_blob.read())
        finally:
            message_blob.close()

    def get_tree(self, root_digest: Digest, raise_on_missing_subdir: bool = False) -> Iterator[Directory]:
        # From the spec, a NotFound response only occurs if the root directory is missing.
        root_directory = self.get_message(root_digest, Directory)
        if root_directory is None:
            raise NotFoundError(f"Root digest not found: {root_digest.hash}/{root_digest.size_bytes}")
        yield root_directory

        queue = [subdir.digest for subdir in root_directory.directories]
        while queue:
            blobs = self.bulk_read_blobs(queue)

            # GetTree allows for missing subtrees, but knowing some digests
            # are missing without scanning the result on the caller side
            # makes certain usages more efficient.
            if raise_on_missing_subdir and len(blobs) < len(queue):
                raise NotFoundError(
                    f"Missing entries under root directory: {root_digest.hash}/{root_digest.size_bytes}"
                )

            directories = [Directory.FromString(b) for b in blobs.values()]
            queue = [subdir.digest for d in directories for subdir in d.directories]

            if len(directories) > 0:
                yield from directories

    @property
    def instance_name(self) -> Optional[str]:
        if hasattr(self, "_instance_name"):
            return self._instance_name
        return None

    def set_instance_name(self, instance_name: str) -> None:
        # This method should always get called, so there's no benefit in
        # adding an __init__ to this abstract class (therefore adding the
        # need for subclasses to call `super()`) just to define a null
        # value for this.
        self._instance_name: Optional[str] = instance_name

    def hash_type(self) -> "remote_execution_pb2.DigestFunction.Value.ValueType":
        return get_hash_type()

    def max_batch_total_size_bytes(self) -> int:
        return MAX_REQUEST_SIZE

    def symlink_absolute_path_strategy(self) -> "SymlinkAbsolutePathStrategy.Value.ValueType":
        # Currently this strategy is hardcoded into BuildGrid,
        # with no setting to reference.
        return SymlinkAbsolutePathStrategy.DISALLOWED

    def get_capabilities(self) -> CacheCapabilities:
        capabilities = CacheCapabilities()
        capabilities.digest_functions.extend([self.hash_type()])
        capabilities.max_batch_total_size_bytes = self.max_batch_total_size_bytes()
        capabilities.symlink_absolute_path_strategy = self.symlink_absolute_path_strategy()
        return capabilities
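

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): a minimal in-memory
# StorageABC implementation, showing the four primitives a concrete provider
# must supply (`has_blob`, `get_blob`, `delete_blob`, `commit_write`) and how
# the inherited helpers (`put_message`, `get_message`, `missing_blobs`, ...)
# are built on top of them. The class and variable names below are
# hypothetical and chosen only for this example.
class _ExampleInMemoryStorage(StorageABC):
    def __init__(self) -> None:
        # Raw blob bytes keyed by digest hash.
        self._blobs: Dict[str, bytes] = {}

    def has_blob(self, digest: Digest) -> bool:
        return digest.hash in self._blobs

    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        data = self._blobs.get(digest.hash)
        if data is None:
            return None
        # A BytesIO copy is readable and seekable, as `get_blob` requires.
        return io.BytesIO(data)

    def delete_blob(self, digest: Digest) -> None:
        self._blobs.pop(digest.hash, None)

    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        # Rewind the still-open session and keep its contents. Verifying the
        # digest is the caller's responsibility, per the `commit_write` docs.
        write_session.seek(0)
        self._blobs[digest.hash] = write_session.read()


if __name__ == "__main__":
    # Hypothetical usage: round-trip a Directory message through the sketch
    # above using the helpers inherited from StorageABC.
    with _ExampleInMemoryStorage() as storage:
        directory = Directory(files=[remote_execution_pb2.FileNode(name="example.txt")])
        digest = storage.put_message(directory)
        assert storage.has_blob(digest)
        assert storage.missing_blobs([digest]) == []
        assert storage.get_message(digest, Directory) == directory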