# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
16"""
17StorageABC
18==================
20The abstract base class for storage providers.
21"""

import abc
import io
from tempfile import TemporaryFile
from typing import IO, Any, Dict, Iterator, List, Optional, Tuple, Type, TypeVar

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest, Directory
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid.server.exceptions import NotFoundError
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.metrics_names import METRIC
from buildgrid.server.metrics_utils import timer
from buildgrid.server.settings import HASH, MAX_IN_MEMORY_BLOB_SIZE_BYTES
from buildgrid.server.types import MessageType

LOGGER = buildgrid_logger(__name__)

M = TypeVar("M", bound=MessageType)


def create_write_session(digest: Digest) -> IO[bytes]:
    """
    Return a file-like object to which a blob's contents can be written.

    For large blobs the data is spooled to a temporary file to avoid excess
    memory usage; small blobs are kept in memory for performance.
    """
    if digest.size_bytes > MAX_IN_MEMORY_BLOB_SIZE_BYTES:
        return TemporaryFile()
    return io.BytesIO()
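
# Usage sketch (illustrative only): hash the data, write it through a session,
# then hand the session to a concrete storage's commit_write, mirroring what
# put_message below does. ``storage`` is assumed to be some StorageABC
# implementation.
#
#     data = b"example blob"
#     digest = Digest(hash=HASH(data).hexdigest(), size_bytes=len(data))
#     with create_write_session(digest) as session:
#         session.write(data)
#         storage.commit_write(digest, session)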


T = TypeVar("T", bound="StorageABC")


class StorageABC(abc.ABC):
    TYPE: str

    def __enter__(self: T) -> T:
        self.start()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        self.stop()

    def start(self) -> None:
        pass

    def stop(self) -> None:
        LOGGER.info(f"Stopped {type(self).__name__}")
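
    # Because of __enter__/__exit__ above, a concrete storage can be driven as
    # a context manager so that start() and stop() are always paired. A sketch,
    # with ``DiskStorage`` standing in for any hypothetical implementation:
    #
    #     with DiskStorage(...) as storage:
    #         storage.has_blob(some_digest)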

    @abc.abstractmethod
    def has_blob(self, digest: Digest) -> bool:
        """Return True if the blob with the given digest exists in storage."""

    @abc.abstractmethod
    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        """Return a file-like object containing the blob. Most implementations
        will read the entire file into memory and return a `BytesIO` object.
        Eventually this should be corrected to handle files which cannot fit
        into memory.

        The file-like object must be readable and seekable.

        If the blob isn't present in storage, return None.
        """

    @abc.abstractmethod
    def delete_blob(self, digest: Digest) -> None:
        """Delete the blob from storage if it's present."""

    @abc.abstractmethod
    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        """Store the contents for a digest.

        The storage object is not responsible for verifying that the data
        written to the write_session actually matches the digest. The caller
        must do that.
        """

    @abc.abstractmethod
    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        """Delete a list of blobs from storage. Return a list of strings
        describing any blobs which failed to be deleted.
        """

    @abc.abstractmethod
    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
        """Return the digests from the given list whose blobs are not
        present in CAS.
        """

    @abc.abstractmethod
    def bulk_update_blobs(self, blobs: List[Tuple[Digest, bytes]]) -> List[Status]:
        """Given a list of (digest, value) tuples, add all the blobs
        to CAS. Return a list of Status objects corresponding to the
        result of uploading each of the blobs.

        Unlike `commit_write`, the storage object will verify that each of
        the digests matches the provided data.
        """

    @abc.abstractmethod
    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
        """Given a list of digests, return a {hash: contents} dictionary
        mapping each digest's hash to the bytes of the corresponding
        blob. Digests whose blobs are not present in storage are simply
        omitted from the result.
        """
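
    # Sketch of how the bulk methods compose (``storage`` assumed to be a
    # concrete implementation, ``pairs`` a hypothetical list of
    # (Digest, bytes) tuples): upload only what is missing, then read the
    # blobs back by digest hash.
    #
    #     missing = storage.missing_blobs([d for d, _ in pairs])
    #     statuses = storage.bulk_update_blobs([p for p in pairs if p[0] in missing])
    #     contents = storage.bulk_read_blobs([d for d, _ in pairs])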

    def put_message(self, message: MessageType) -> Digest:
        """Store the given Protobuf message in CAS, returning its digest."""
        message_blob = message.SerializeToString()
        digest = Digest(hash=HASH(message_blob).hexdigest(), size_bytes=len(message_blob))
        with create_write_session(digest) as session:
            session.write(message_blob)
            self.commit_write(digest, session)
        return digest

    def get_message(self, digest: Digest, message_type: Type[M]) -> Optional[M]:
        """Retrieve the Protobuf message with the given digest and type from
        CAS. If the blob is not present, returns None.
        """
        message_blob = self.get_blob(digest)
        if message_blob is None:
            return None
        try:
            return message_type.FromString(message_blob.read())
        finally:
            message_blob.close()
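
    # Round-trip sketch for the two message helpers above, using the imported
    # Directory type (``storage`` assumed to be a concrete implementation):
    #
    #     digest = storage.put_message(Directory())
    #     restored = storage.get_message(digest, Directory)  # None if evicted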

    def get_tree(self, root_digest: Digest, raise_on_missing_subdir: bool = False) -> Iterator[Directory]:
        # From the spec, a NotFound response only occurs if the root directory is missing.
        with timer(METRIC.STORAGE.GET_TREE_DURATION, type=self.TYPE):
            root_directory = self.get_message(root_digest, Directory)
            if root_directory is None:
                raise NotFoundError(f"Root digest not found: {root_digest.hash}/{root_digest.size_bytes}")
            yield root_directory

            queue = [subdir.digest for subdir in root_directory.directories]
            while queue:
                blobs = self.bulk_read_blobs(queue)

                # GetTree allows for missing subtrees, but knowing some digests
                # are missing without scanning the result on the caller side
                # makes certain usages more efficient.
                if raise_on_missing_subdir and len(blobs) < len(queue):
                    raise NotFoundError(
                        f"Missing entries under root directory: {root_digest.hash}/{root_digest.size_bytes}"
                    )

                directories = [Directory.FromString(b) for b in blobs.values()]
                queue = [subdir.digest for d in directories for subdir in d.directories]

                if len(directories) > 0:
                    yield from directories
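
# Traversal sketch for get_tree (illustrative; ``storage`` assumed to be a
# concrete implementation): the generator yields the root Directory first and
# then each level of subdirectories, silently skipping missing subtrees unless
# raise_on_missing_subdir is set.
#
#     for directory in storage.get_tree(root_digest, raise_on_missing_subdir=True):
#         for file_node in directory.files:
#             print(file_node.name, file_node.digest.hash)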