Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/size_differentiated.py: 100.00% (119 statements)

# Copyright (C) 2020 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
16"""
17SizeDifferentiatedStorage
18=========================
20A storage provider which passes requests to other storage providers
21based on the size of the blob being requested.
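
Example
-------

A minimal sketch; ``tiny_backend``, ``small_backend`` and ``big_backend`` are
hypothetical stand-ins for any concrete ``StorageABC`` implementations::

    storage = SizeDifferentiatedStorage(
        storages=[
            {"max_size": 1024, "storage": tiny_backend},
            {"max_size": 1024 * 1024, "storage": small_backend},
        ],
        fallback=big_backend,
    )

A 512-byte blob is routed to ``tiny_backend``, a 512 KiB blob to
``small_backend``, and a 2 MiB blob to the ``big_backend`` fallback.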
23"""


from collections import defaultdict
from contextlib import ExitStack
from typing import IO, Callable, Dict, Iterable, Iterator, List, NamedTuple, Optional, Tuple, TypedDict, TypeVar

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid._protos.google.rpc import code_pb2
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid.server.decorators import timed
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.metrics_names import METRIC
from buildgrid.server.metrics_utils import publish_counter_metric
from buildgrid.server.threading import ContextThreadPoolExecutor

from .storage_abc import StorageABC

LOGGER = buildgrid_logger(__name__)


class SizeLimitedStorageType(TypedDict):
    max_size: int
    storage: StorageABC


# NOTE: This exists separately to the TypedDict to allow attribute-based access
# at runtime, rather than relying on string-based access to dictionary keys.
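# For example: ``entry.max_size`` rather than ``entry["max_size"]``.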
class _SizeLimitedStorage(NamedTuple):
    max_size: int
    storage: StorageABC


_T = TypeVar("_T")
_R = TypeVar("_R")


# Wrapper functions for the bulk StorageABC interfaces. Each takes a single
# (storage, digests) tuple so it can be mapped over an iterable of per-storage
# partitions, optionally on a thread pool (see ``_map`` below).
def _bulk_delete_for_storage(storage_digests: Tuple[StorageABC, List[Digest]]) -> List[str]:
    storage, digests = storage_digests
    return storage.bulk_delete(digests)


def _fmb_for_storage(storage_digests: Tuple[StorageABC, List[Digest]]) -> List[Digest]:
    storage, digests = storage_digests
    return storage.missing_blobs(digests)


def _bulk_update_for_storage(
    storage_digests: Tuple[StorageABC, List[Tuple[Digest, bytes]]]
) -> Tuple[StorageABC, List[Status]]:
    storage, digest_tuples = storage_digests
    return storage, storage.bulk_update_blobs(digest_tuples)


def _bulk_read_for_storage(storage_digests: Tuple[StorageABC, List[Digest]]) -> Dict[str, bytes]:
    storage, digests = storage_digests
    return storage.bulk_read_blobs(digests)


class SizeDifferentiatedStorage(StorageABC):
    TYPE = "SizeDifferentiated"

    def __init__(
        self, storages: List[SizeLimitedStorageType], fallback: StorageABC, thread_pool_size: Optional[int] = None
    ) -> None:
        self._stack = ExitStack()
        self._fallback_storage = fallback
        self._storages = [_SizeLimitedStorage(**storage) for storage in storages]
        self._storages.sort(key=lambda storage: storage.max_size)
        self._threadpool: Optional[ContextThreadPoolExecutor] = None
        if thread_pool_size:
            self._threadpool = ContextThreadPoolExecutor(thread_pool_size, "size-differentiated-storage")

    def _storage_from_digest(self, digest: Digest) -> StorageABC:
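        # self._storages is sorted by ascending max_size in __init__, so the
        # first entry that can hold the blob is the tightest-fitting one.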
        for storage in self._storages:
            if digest.size_bytes < storage.max_size:
                return storage.storage
        # If the blob is too big for any of the size-limited storages,
        # put it in the fallback.
        return self._fallback_storage

    def _partition_digests(self, digests: List[Digest]) -> Dict[StorageABC, List[Digest]]:
        partition: Dict[StorageABC, List[Digest]] = defaultdict(list)
        for digest in digests:
            storage = self._storage_from_digest(digest)
            partition[storage].append(digest)
        return partition

    def _map(self, fn: Callable[[_T], _R], args: Iterable[_T]) -> Iterator[_R]:
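        # Fan work out across the thread pool when one was configured;
        # otherwise fall back to a plain serial map in the calling thread.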
        if self._threadpool:
            return self._threadpool.map(fn, args)
        else:
            return map(fn, args)

    def start(self) -> None:
        if self._threadpool:
            self._stack.enter_context(self._threadpool)
        for storage_tuple in self._storages:
            self._stack.enter_context(storage_tuple.storage)
        # The fallback storage is not in self._storages, so it must be
        # started (and registered for cleanup) explicitly.
        self._stack.enter_context(self._fallback_storage)

    def stop(self) -> None:
        self._stack.close()
        LOGGER.info(f"Stopped {type(self).__name__}")

    @timed(METRIC.STORAGE.STAT_DURATION, type=TYPE)
    def has_blob(self, digest: Digest) -> bool:
        LOGGER.debug("Checking for blob.", tags=dict(digest=digest))
        storage = self._storage_from_digest(digest)
        return storage.has_blob(digest)

    @timed(METRIC.STORAGE.READ_DURATION, type=TYPE)
    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        LOGGER.debug("Getting blob.", tags=dict(digest=digest))
        storage = self._storage_from_digest(digest)
        return storage.get_blob(digest)

    @timed(METRIC.STORAGE.DELETE_DURATION, type=TYPE)
    def delete_blob(self, digest: Digest) -> None:
        LOGGER.debug("Deleting blob.", tags=dict(digest=digest))
        storage = self._storage_from_digest(digest)
        storage.delete_blob(digest)

    @timed(METRIC.STORAGE.WRITE_DURATION, type=TYPE)
    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        LOGGER.debug("Writing blob.", tags=dict(digest=digest))
        storage = self._storage_from_digest(digest)
        storage.commit_write(digest, write_session)

    @timed(METRIC.STORAGE.BULK_DELETE_DURATION, type=TYPE)
    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        failed_deletions: List[str] = []
        for result in self._map(_bulk_delete_for_storage, self._partition_digests(digests).items()):
            failed_deletions.extend(result)

        publish_counter_metric(METRIC.STORAGE.DELETE_ERRORS_COUNT, len(failed_deletions), type=self.TYPE)
        return failed_deletions

    @timed(METRIC.STORAGE.BULK_STAT_DURATION, type=TYPE)
    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
        missing_blobs: List[Digest] = []

        for result in self._map(_fmb_for_storage, self._partition_digests(digests).items()):
            missing_blobs.extend(result)

        return missing_blobs

    @timed(METRIC.STORAGE.BULK_WRITE_DURATION, type=TYPE)
    def bulk_update_blobs(self, blobs: List[Tuple[Digest, bytes]]) -> List[Status]:
        partitioned_digests: Dict[StorageABC, List[Tuple[Digest, bytes]]] = defaultdict(list)
        idx_map: Dict[StorageABC, List[int]] = defaultdict(list)
        for orig_idx, digest_tuple in enumerate(blobs):
            storage = self._storage_from_digest(digest_tuple[0])
            partitioned_digests[storage].append(digest_tuple)
            idx_map[storage].append(orig_idx)
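
        # Fan the per-storage batches out, then scatter each batch's statuses
        # back to their original positions, so results[i] corresponds to blobs[i].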
        results: List[Status] = [Status(code=code_pb2.INTERNAL, message="inconsistent batch results")] * len(blobs)
        for storage, statuses in self._map(_bulk_update_for_storage, partitioned_digests.items()):
            for status_idx, status in enumerate(statuses):
                results[idx_map[storage][status_idx]] = status
        return results

    @timed(METRIC.STORAGE.BULK_READ_DURATION, type=TYPE)
    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
        bulk_read_results: Dict[str, bytes] = {}
        for result in self._map(_bulk_read_for_storage, self._partition_digests(digests).items()):
            bulk_read_results.update(result)

        return bulk_read_results
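

# Example (hypothetical names): thanks to the index bookkeeping above,
# ``bulk_update_blobs`` returns one Status per input blob, in input order,
# even though the writes are fanned out per backend:
#
#     statuses = storage.bulk_update_blobs([(small_digest, small_data), (big_digest, big_data)])
#     assert len(statuses) == 2  # statuses[i] corresponds to blobs[i]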