Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/size_differentiated.py: 97.25%
109 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-03-28 16:20 +0000
1# Copyright (C) 2020 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16"""
17SizeDifferentiatedStorage
18=========================
20A storage provider which passes requests to other storage providers
21based on the size of the blob being requested.
23"""
25import logging
26from collections import defaultdict
27from contextlib import ExitStack
28from typing import IO, Callable, Dict, Iterable, Iterator, List, NamedTuple, Optional, Tuple, TypedDict, TypeVar
30from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
31from buildgrid._protos.google.rpc import code_pb2
32from buildgrid._protos.google.rpc.status_pb2 import Status
34from ...threading import ContextThreadPoolExecutor
35from .storage_abc import StorageABC
# Module-level logger, named after this module so it participates in
# the application's hierarchical logging configuration.
LOGGER = logging.getLogger(__name__)
class SizeLimitedStorageType(TypedDict):
    """Configuration entry pairing a storage provider with a maximum blob size.

    Blobs strictly smaller than ``max_size`` (in bytes) are eligible to be
    stored in ``storage``.
    """

    max_size: int
    storage: StorageABC
# NOTE: This exists separately to the TypedDict to allow attribute-based access
# at runtime, rather than relying on string-based access to dictionary keys.
class _SizeLimitedStorage(NamedTuple):
    # Upper bound (exclusive, in bytes) on blob sizes routed to ``storage``.
    max_size: int
    # Backend holding blobs smaller than ``max_size``.
    storage: StorageABC
# Generic argument/result type variables used by the ``_map`` helper on
# SizeDifferentiatedStorage to stay agnostic of the mapped function's types.
_T = TypeVar("_T")
_R = TypeVar("_R")
# wrapper functions for the bulk StorageABC interfaces
def _bulk_delete_for_storage(storage_digests: Tuple[StorageABC, List[Digest]]) -> List[str]:
    """Delete the given digests from the given storage.

    Takes a single ``(storage, digests)`` pair so it can be used directly
    with ``map``/``Executor.map``. Returns the storage's list of failed
    deletions.
    """
    target, to_delete = storage_digests
    return target.bulk_delete(to_delete)
def _fmb_for_storage(storage_digests: Tuple[StorageABC, List[Digest]]) -> List[Digest]:
    """Run a FindMissingBlobs check against a single storage.

    Takes a single ``(storage, digests)`` pair for use with ``map``-style
    dispatch, and returns the digests missing from that storage.
    """
    return storage_digests[0].missing_blobs(storage_digests[1])
def _bulk_update_for_storage(
    storage_digests: Tuple[StorageABC, List[Tuple[Digest, bytes]]]
) -> Tuple[StorageABC, List[Status]]:
    """Write a batch of blobs into a single storage.

    Takes a single ``(storage, blob_pairs)`` argument for use with
    ``map``-style dispatch. Returns the storage alongside the per-blob
    statuses so the caller can map results back to its own bookkeeping.
    """
    target = storage_digests[0]
    statuses = target.bulk_update_blobs(storage_digests[1])
    return target, statuses
def _bulk_read_for_storage(storage_digests: Tuple[StorageABC, List[Digest]]) -> Dict[str, bytes]:
    """Read a batch of blobs from a single storage.

    Takes a single ``(storage, digests)`` argument for use with
    ``map``-style dispatch. Returns that storage's mapping of digest hash
    to blob contents.
    """
    return storage_digests[0].bulk_read_blobs(storage_digests[1])
class SizeDifferentiatedStorage(StorageABC):
    """Storage provider which routes requests by blob size.

    Each configured size-limited storage serves blobs strictly smaller than
    its ``max_size``; blobs too large for all of them go to the fallback
    storage. Bulk operations are partitioned per target storage and, when a
    thread pool size is configured, dispatched concurrently.
    """

    def __init__(
        self, storages: List[SizeLimitedStorageType], fallback: StorageABC, thread_pool_size: Optional[int] = None
    ) -> None:
        """Instantiate a new SizeDifferentiatedStorage.

        Args:
            storages: Size-limited storage configurations.
            fallback: Storage used for blobs larger than every ``max_size``.
            thread_pool_size: If set, bulk operations spanning multiple
                storages run on a pool of this many threads; otherwise they
                run sequentially.
        """
        self._stack = ExitStack()
        self._fallback_storage = fallback
        self._storages = [_SizeLimitedStorage(**storage) for storage in storages]
        # Sort ascending by max_size so _storage_from_digest selects the
        # smallest storage whose limit accommodates a blob.
        self._storages.sort(key=lambda storage: storage.max_size)
        self._threadpool: Optional[ContextThreadPoolExecutor] = None
        if thread_pool_size:
            self._threadpool = ContextThreadPoolExecutor(thread_pool_size, "size-differentiated-storage")

    def _storage_from_digest(self, digest: Digest) -> StorageABC:
        """Return the storage responsible for a blob of the digest's size."""
        # _storages is sorted ascending, so the first match is the smallest
        # storage able to hold the blob.
        for storage in self._storages:
            if digest.size_bytes < storage.max_size:
                return storage.storage
        # If the blob is too big for any of the size-limited storages,
        # put it in the fallback.
        return self._fallback_storage

    def _partition_digests(self, digests: List[Digest]) -> Dict[StorageABC, List[Digest]]:
        """Group digests by the storage which should serve them."""
        partition: Dict[StorageABC, List[Digest]] = defaultdict(list)
        for digest in digests:
            storage = self._storage_from_digest(digest)
            partition[storage].append(digest)
        return partition

    def _map(self, fn: Callable[[_T], _R], args: Iterable[_T]) -> Iterator[_R]:
        """Apply ``fn`` over ``args``, concurrently if a thread pool exists."""
        if self._threadpool:
            return self._threadpool.map(fn, args)
        else:
            return map(fn, args)

    def start(self) -> None:
        """Start the thread pool and all underlying storages."""
        if self._threadpool:
            self._stack.enter_context(self._threadpool)
        for storage_tuple in self._storages:
            self._stack.enter_context(storage_tuple.storage)
        # FIX: manage the fallback storage's lifecycle too; previously it was
        # never entered into the stack, so it was never started or stopped
        # alongside the size-limited storages.
        self._stack.enter_context(self._fallback_storage)

    def stop(self) -> None:
        """Stop everything entered into the exit stack, in reverse order."""
        self._stack.close()

    def has_blob(self, digest: Digest) -> bool:
        """Return whether the responsible storage contains the blob."""
        LOGGER.debug(f"Checking for blob: [{digest}]")
        storage = self._storage_from_digest(digest)
        return storage.has_blob(digest)

    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        """Fetch the blob from the responsible storage, or None if absent."""
        LOGGER.debug(f"Getting blob: [{digest}]")
        storage = self._storage_from_digest(digest)
        return storage.get_blob(digest)

    def delete_blob(self, digest: Digest) -> None:
        """Delete the blob from the responsible storage."""
        LOGGER.debug(f"Deleting blob: [{digest}]")
        storage = self._storage_from_digest(digest)
        storage.delete_blob(digest)

    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        """Commit a write session's contents to the responsible storage."""
        LOGGER.debug(f"Writing blob: [{digest}]")
        storage = self._storage_from_digest(digest)
        storage.commit_write(digest, write_session)

    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        """Delete blobs across storages; return the failed deletions."""
        failed_deletions: List[str] = []
        for result in self._map(_bulk_delete_for_storage, self._partition_digests(digests).items()):
            failed_deletions.extend(result)
        return failed_deletions

    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
        """Return the digests missing from their responsible storages."""
        missing_blobs: List[Digest] = []
        for result in self._map(_fmb_for_storage, self._partition_digests(digests).items()):
            missing_blobs.extend(result)
        return missing_blobs

    def bulk_update_blobs(self, blobs: List[Tuple[Digest, bytes]]) -> List[Status]:
        """Write blobs to their responsible storages.

        Returns one Status per input blob, in the caller's original order.
        """
        partitioned_digests: Dict[StorageABC, List[Tuple[Digest, bytes]]] = defaultdict(list)
        idx_map: Dict[StorageABC, List[int]] = defaultdict(list)
        for orig_idx, digest_tuple in enumerate(blobs):
            storage = self._storage_from_digest(digest_tuple[0])
            partitioned_digests[storage].append(digest_tuple)
            # Remember each blob's position in the original request so the
            # per-storage results can be re-ordered for the caller.
            idx_map[storage].append(orig_idx)
        # Pre-fill with an INTERNAL error so that any result a storage fails
        # to report is surfaced rather than silently treated as a success.
        results: List[Status] = [Status(code=code_pb2.INTERNAL, message="inconsistent batch results")] * len(blobs)
        for storage, statuses in self._map(_bulk_update_for_storage, partitioned_digests.items()):
            for status_idx, status in enumerate(statuses):
                results[idx_map[storage][status_idx]] = status
        return results

    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
        """Read blobs from their responsible storages, merged into one dict."""
        bulk_read_results: Dict[str, bytes] = {}
        for result in self._map(_bulk_read_for_storage, self._partition_digests(digests).items()):
            bulk_read_results.update(result)
        return bulk_read_results

    def set_instance_name(self, instance_name: str) -> None:
        """Propagate the instance name to this and all underlying storages."""
        super().set_instance_name(instance_name)
        for storage_tuple in self._storages:
            storage_tuple.storage.set_instance_name(instance_name)
        # FIX: propagate to the fallback storage as well; previously blobs
        # routed to the fallback operated without the instance name set.
        self._fallback_storage.set_instance_name(instance_name)