Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/storage_abc.py: 100.00%


# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


16""" 

17StorageABC 

18================== 

19 

20The abstract base class for storage providers. 

21""" 

22 

import abc
import io
from tempfile import TemporaryFile
from typing import IO, Any, Dict, Iterator, List, Optional, Tuple, Type, TypeVar

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest, Directory
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid.server.exceptions import NotFoundError
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.metrics_names import METRIC
from buildgrid.server.metrics_utils import timer
from buildgrid.server.settings import HASH, MAX_IN_MEMORY_BLOB_SIZE_BYTES
from buildgrid.server.types import MessageType

LOGGER = buildgrid_logger(__name__)

M = TypeVar("M", bound=MessageType)


def create_write_session(digest: Digest) -> IO[bytes]:
    """
    Return a file-like object to which a blob's contents can be written.

    For large blobs, write to a temporary file to avoid excess memory usage.
    For small blobs, work in memory for performance.
    """
    if digest.size_bytes > MAX_IN_MEMORY_BLOB_SIZE_BYTES:
        return TemporaryFile()
    return io.BytesIO()

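
# Illustrative usage (a sketch; `_write_and_commit` is a hypothetical helper,
# not part of this module): pairing a write session with `commit_write` lets
# large blobs spill to a temporary file transparently.
def _write_and_commit(storage: "StorageABC", data: bytes) -> Digest:
    digest = Digest(hash=HASH(data).hexdigest(), size_bytes=len(data))
    with create_write_session(digest) as session:
        session.write(data)
        # The session must still be open when committed; a TemporaryFile
        # is deleted as soon as it is closed.
        storage.commit_write(digest, session)
    return digest
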

T = TypeVar("T", bound="StorageABC")


class StorageABC(abc.ABC):
    TYPE: str

    def __enter__(self: T) -> T:
        self.start()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        self.stop()

    def start(self) -> None:
        pass

    def stop(self) -> None:
        LOGGER.info(f"Stopped {type(self).__name__}")
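
    # Lifecycle note (illustrative): a storage is typically used as a context
    # manager, so `with SomeStorage() as storage: ...` runs start() on entry
    # and stop() on exit.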

    @abc.abstractmethod
    def has_blob(self, digest: Digest) -> bool:
        """Return True if the blob with the given digest exists."""

    @abc.abstractmethod
    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        """Return a file-like object containing the blob. Most implementations
        will read the entire file into memory and return a `BytesIO` object.
        Eventually this should be corrected to handle files which cannot fit
        into memory.

        The file-like object must be readable and seekable.

        If the blob isn't present in storage, return None.
        """

    @abc.abstractmethod
    def delete_blob(self, digest: Digest) -> None:
        """Delete the blob from storage if it's present."""

    @abc.abstractmethod
    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        """Store the contents for a digest.

        The storage object is not responsible for verifying that the data
        written to the write_session actually matches the digest. The caller
        must do that.
        """

    @abc.abstractmethod
    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        """Delete a list of blobs from storage. Return a list of strings
        describing any blobs which could not be deleted.
        """

    @abc.abstractmethod
    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
        """Return a list of the digests which are not present in CAS."""

    @abc.abstractmethod
    def bulk_update_blobs(self, blobs: List[Tuple[Digest, bytes]]) -> List[Status]:
        """Given a list of (digest, value) tuples, add all the blobs
        to CAS. Return a list of Status objects corresponding to the
        result of uploading each of the blobs.

        Unlike in `commit_write`, the storage object will verify that each of
        the digests matches the provided data.
        """
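
    # Caller-side sketch (illustrative; assumes google.rpc.Code.OK == 0):
    #
    #     statuses = storage.bulk_update_blobs(blobs)
    #     failed = [digest for (digest, _), status in zip(blobs, statuses)
    #               if status.code != 0]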

    @abc.abstractmethod
    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
        """Given a list of digests, return a {hash: contents} dictionary
        corresponding to the blobs represented by the input digests.

        Blobs which are not present in storage are omitted from the result.
        """

    def put_message(self, message: MessageType) -> Digest:
        """Store the given Protobuf message in CAS, returning its digest."""
        message_blob = message.SerializeToString()
        digest = Digest(hash=HASH(message_blob).hexdigest(), size_bytes=len(message_blob))
        with create_write_session(digest) as session:
            session.write(message_blob)
            self.commit_write(digest, session)
        return digest

    def get_message(self, digest: Digest, message_type: Type[M]) -> Optional[M]:
        """Retrieve the Protobuf message with the given digest and type from
        CAS. If the blob is not present, return None.
        """
        message_blob = self.get_blob(digest)
        if message_blob is None:
            return None
        try:
            return message_type.FromString(message_blob.read())
        finally:
            message_blob.close()
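
    # Round-trip sketch (illustrative): put_message and get_message act as
    # inverses for any supported message type, e.g.
    #
    #     digest = storage.put_message(directory)
    #     assert storage.get_message(digest, Directory) == directory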

    def get_tree(self, root_digest: Digest, raise_on_missing_subdir: bool = False) -> Iterator[Directory]:
        # From the spec, a NotFound response only occurs if the root directory is missing.
        with timer(METRIC.STORAGE.GET_TREE_DURATION, type=self.TYPE):
            root_directory = self.get_message(root_digest, Directory)
            if root_directory is None:
                raise NotFoundError(f"Root digest not found: {root_digest.hash}/{root_digest.size_bytes}")
            yield root_directory

            queue = [subdir.digest for subdir in root_directory.directories]
            while queue:
                blobs = self.bulk_read_blobs(queue)

                # GetTree allows for missing subtrees, but raising here when
                # digests are missing saves callers from having to scan the
                # result themselves, which makes certain usages more efficient.
                if raise_on_missing_subdir and len(blobs) < len(queue):
                    raise NotFoundError(
                        f"Missing entries under root directory: {root_digest.hash}/{root_digest.size_bytes}"
                    )

                directories = [Directory.FromString(b) for b in blobs.values()]
                queue = [subdir.digest for d in directories for subdir in d.directories]

                if len(directories) > 0:
                    yield from directories
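

# ---------------------------------------------------------------------------
# Illustrative reference implementation: a minimal in-memory sketch of the
# surface a concrete subclass must provide. This class is hypothetical and is
# not part of the original module; real providers live in the sibling modules
# of this package.
# ---------------------------------------------------------------------------
class _ExampleInMemoryStorage(StorageABC):
    TYPE = "example-in-memory"

    def __init__(self) -> None:
        self._blobs: Dict[str, bytes] = {}

    def has_blob(self, digest: Digest) -> bool:
        return digest.hash in self._blobs

    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        # Readable and seekable, as the StorageABC contract requires.
        if digest.hash not in self._blobs:
            return None
        return io.BytesIO(self._blobs[digest.hash])

    def delete_blob(self, digest: Digest) -> None:
        self._blobs.pop(digest.hash, None)

    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        # Trusts the caller to have verified the data against the digest.
        write_session.seek(0)
        self._blobs[digest.hash] = write_session.read()

    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        for digest in digests:
            self._blobs.pop(digest.hash, None)
        return []  # An in-memory pop cannot fail, so no errors to report.

    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
        return [digest for digest in digests if digest.hash not in self._blobs]

    def bulk_update_blobs(self, blobs: List[Tuple[Digest, bytes]]) -> List[Status]:
        statuses = []
        for digest, data in blobs:
            # Unlike commit_write, bulk updates verify data against digests.
            if HASH(data).hexdigest() == digest.hash and len(data) == digest.size_bytes:
                self._blobs[digest.hash] = data
                statuses.append(Status(code=0))  # google.rpc.Code.OK
            else:
                statuses.append(Status(code=3))  # google.rpc.Code.INVALID_ARGUMENT
        return statuses

    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
        # Missing blobs are simply omitted, matching get_tree's expectations.
        return {d.hash: self._blobs[d.hash] for d in digests if d.hash in self._blobs}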