Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/size_differentiated.py: 100.00%
127 statements
coverage.py v7.4.1, created at 2025-05-28 16:48 +0000
# Copyright (C) 2020 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
16"""
17SizeDifferentiatedStorage
18=========================
20A storage provider which passes requests to other storage providers
21based on the size of the blob being requested.
23"""

from collections import defaultdict
from contextlib import ExitStack
from typing import IO, Callable, Iterable, Iterator, NamedTuple, TypedDict, TypeVar

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid._protos.google.rpc import code_pb2
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid.server.decorators import timed
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.metrics_names import METRIC
from buildgrid.server.metrics_utils import publish_counter_metric
from buildgrid.server.threading import ContextThreadPoolExecutor

from .storage_abc import StorageABC

LOGGER = buildgrid_logger(__name__)


class SizeLimitedStorageType(TypedDict):
    max_size: int
    storage: StorageABC


# NOTE: This exists separately from the TypedDict to allow attribute-based access
# at runtime, rather than relying on string-based access to dictionary keys.
class _SizeLimitedStorage(NamedTuple):
    max_size: int
    storage: StorageABC


_T = TypeVar("_T")
_R = TypeVar("_R")


# Wrapper functions for the bulk StorageABC interfaces.
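# Each takes a single (storage, digests) tuple rather than two arguments so that it
# can be passed directly to ``map``/``Executor.map`` over the items of a partition
# produced by ``SizeDifferentiatedStorage._partition_digests``.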
def _bulk_delete_for_storage(storage_digests: tuple[StorageABC, list[Digest]]) -> list[str]:
    storage, digests = storage_digests
    return storage.bulk_delete(digests)


def _fmb_for_storage(storage_digests: tuple[StorageABC, list[Digest]]) -> list[Digest]:
    storage, digests = storage_digests
    return storage.missing_blobs(digests)


def _bulk_update_for_storage(
    storage_digests: tuple[StorageABC, list[tuple[Digest, bytes]]],
) -> tuple[StorageABC, list[Status]]:
    storage, digest_tuples = storage_digests
    return storage, storage.bulk_update_blobs(digest_tuples)


def _bulk_read_for_storage(storage_digests: tuple[StorageABC, list[Digest]]) -> dict[str, bytes]:
    storage, digests = storage_digests
    return storage.bulk_read_blobs(digests)


class SizeDifferentiatedStorage(StorageABC):
    TYPE = "SizeDifferentiated"

    def __init__(
        self, storages: list[SizeLimitedStorageType], fallback: StorageABC, thread_pool_size: int | None = None
    ) -> None:
        self._stack = ExitStack()
        self._fallback_storage = fallback
        self._storages = [_SizeLimitedStorage(**storage) for storage in storages]
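        # Sort ascending by max_size so that _storage_from_digest always picks the
        # smallest bucket that can hold a given blob.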
        self._storages.sort(key=lambda storage: storage.max_size)
        self._threadpool: ContextThreadPoolExecutor | None = None
        if thread_pool_size:
            self._threadpool = ContextThreadPoolExecutor(thread_pool_size, "size-differentiated-storage")

    def _storage_from_digest(self, digest: Digest) -> StorageABC:
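        # Route to the first (smallest) bucket whose max_size is strictly greater than
        # the blob's size; e.g. with 1 KiB and 1 MiB buckets, a 512-byte blob lands in
        # the 1 KiB bucket and a 4 MiB blob falls through to the fallback.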
        for storage in self._storages:
            if digest.size_bytes < storage.max_size:
                return storage.storage
        # If the blob is too big for any of the size-limited storages,
        # put it in the fallback.
        return self._fallback_storage

    def _partition_digests(self, digests: list[Digest]) -> dict[StorageABC, list[Digest]]:
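        # Group digests by the backend that owns them, so each bulk operation below can
        # be dispatched once per backend rather than once per digest.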
        partition: dict[StorageABC, list[Digest]] = defaultdict(list)
        for digest in digests:
            storage = self._storage_from_digest(digest)
            partition[storage].append(digest)
        return partition

    def _map(self, fn: Callable[[_T], _R], args: Iterable[_T]) -> Iterator[_R]:
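        # Fan per-backend calls out across the thread pool when one was configured,
        # otherwise fall back to a plain serial map.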
        if self._threadpool:
            return self._threadpool.map(fn, args)
        else:
            return map(fn, args)

    def start(self) -> None:
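        # Register the thread pool and each size-limited backend with the ExitStack,
        # so stop() tears them all down again in reverse order.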
        if self._threadpool:
            self._stack.enter_context(self._threadpool)
        for storage_tuple in self._storages:
            self._stack.enter_context(storage_tuple.storage)

    def stop(self) -> None:
        self._stack.close()
        LOGGER.info(f"Stopped {type(self).__name__}")

    @timed(METRIC.STORAGE.STAT_DURATION, type=TYPE)
    def has_blob(self, digest: Digest) -> bool:
        LOGGER.debug("Checking for blob.", tags=dict(digest=digest))
        storage = self._storage_from_digest(digest)
        return storage.has_blob(digest)

    @timed(METRIC.STORAGE.READ_DURATION, type=TYPE)
    def get_blob(self, digest: Digest) -> IO[bytes] | None:
        LOGGER.debug("Getting blob.", tags=dict(digest=digest))
        storage = self._storage_from_digest(digest)
        return storage.get_blob(digest)

    @timed(METRIC.STORAGE.STREAM_READ_DURATION, type=TYPE)
    def stream_read_blob(self, digest: Digest, chunk_size: int, offset: int = 0, limit: int = 0) -> Iterator[bytes]:
        LOGGER.debug("Streaming blob.", tags=dict(digest=digest))
        storage = self._storage_from_digest(digest)
        yield from storage.stream_read_blob(digest, chunk_size, offset, limit)

    @timed(METRIC.STORAGE.STREAM_WRITE_DURATION, type=TYPE)
    def stream_write_blob(self, digest: Digest, chunks: Iterator[bytes]) -> None:
        self._storage_from_digest(digest).stream_write_blob(digest, chunks)

    @timed(METRIC.STORAGE.DELETE_DURATION, type=TYPE)
    def delete_blob(self, digest: Digest) -> None:
        LOGGER.debug("Deleting blob.", tags=dict(digest=digest))
        storage = self._storage_from_digest(digest)
        storage.delete_blob(digest)

    @timed(METRIC.STORAGE.WRITE_DURATION, type=TYPE)
    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        LOGGER.debug("Writing blob.", tags=dict(digest=digest))
        storage = self._storage_from_digest(digest)
        storage.commit_write(digest, write_session)

    @timed(METRIC.STORAGE.BULK_DELETE_DURATION, type=TYPE)
    def bulk_delete(self, digests: list[Digest]) -> list[str]:
        failed_deletions: list[str] = []
        for result in self._map(_bulk_delete_for_storage, self._partition_digests(digests).items()):
            failed_deletions.extend(result)

        publish_counter_metric(METRIC.STORAGE.DELETE_ERRORS_COUNT, len(failed_deletions), type=self.TYPE)
        return failed_deletions

    @timed(METRIC.STORAGE.BULK_STAT_DURATION, type=TYPE)
    def missing_blobs(self, digests: list[Digest]) -> list[Digest]:
        missing_blobs: list[Digest] = []

        for result in self._map(_fmb_for_storage, self._partition_digests(digests).items()):
            missing_blobs.extend(result)

        return missing_blobs

    @timed(METRIC.STORAGE.BULK_WRITE_DURATION, type=TYPE)
    def bulk_update_blobs(self, blobs: list[tuple[Digest, bytes]]) -> list[Status]:
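        # The response must contain one Status per input blob, in input order. Blobs are
        # partitioned per backend, and idx_map remembers each blob's original position so
        # that the per-backend status lists can be scattered back into a single result list.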
        partitioned_digests: dict[StorageABC, list[tuple[Digest, bytes]]] = defaultdict(list)
        idx_map: dict[StorageABC, list[int]] = defaultdict(list)
        for orig_idx, digest_tuple in enumerate(blobs):
            storage = self._storage_from_digest(digest_tuple[0])
            partitioned_digests[storage].append(digest_tuple)
            idx_map[storage].append(orig_idx)
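        # Pre-fill every slot with an INTERNAL error so that a backend returning fewer
        # statuses than expected cannot leave holes in the response.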
        results: list[Status] = [Status(code=code_pb2.INTERNAL, message="inconsistent batch results")] * len(blobs)
        for storage, statuses in self._map(_bulk_update_for_storage, partitioned_digests.items()):
            for status_idx, status in enumerate(statuses):
                results[idx_map[storage][status_idx]] = status
        return results

    @timed(METRIC.STORAGE.BULK_READ_DURATION, type=TYPE)
    def bulk_read_blobs(self, digests: list[Digest]) -> dict[str, bytes]:
        bulk_read_results: dict[str, bytes] = {}
        for result in self._map(_bulk_read_for_storage, self._partition_digests(digests).items()):
            bulk_read_results.update(result)

        return bulk_read_results
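
# Illustrative note: given the configuration sketched near the top of this file, a
# bulk_read_blobs() call covering a 100-byte digest and a 10 MiB digest is split into
# two batches, one served by ``small_store`` and one by ``fallback_store``, and the
# per-backend result dictionaries are merged before returning.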