Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/sharded.py: 99.06% (106 statements)

# Copyright (C) 2023 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import defaultdict
from contextlib import ExitStack
from typing import IO, Callable, Dict, Iterable, Iterator, List, Optional, Tuple, TypeVar

import mmh3

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid._protos.google.rpc import code_pb2
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid.server.decorators import timed
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.metrics_names import METRIC
from buildgrid.server.metrics_utils import publish_counter_metric
from buildgrid.server.threading import ContextThreadPoolExecutor

from .storage_abc import StorageABC

LOGGER = buildgrid_logger(__name__)

_T = TypeVar("_T")
_R = TypeVar("_R")


# wrapper functions for the bulk StorageABC interfaces
def _bulk_delete_for_storage(storage_digests: Tuple[StorageABC, List[Digest]]) -> List[str]:
    storage, digests = storage_digests
    return storage.bulk_delete(digests)


def _fmb_for_storage(storage_digests: Tuple[StorageABC, List[Digest]]) -> List[Digest]:
    storage, digests = storage_digests
    return storage.missing_blobs(digests)


def _bulk_update_for_storage(
    storage_digests: Tuple[StorageABC, List[Tuple[Digest, bytes]]]
) -> Tuple[StorageABC, List[Status]]:
    storage, digest_tuples = storage_digests
    return storage, storage.bulk_update_blobs(digest_tuples)


def _bulk_read_for_storage(storage_digests: Tuple[StorageABC, List[Digest]]) -> Dict[str, bytes]:
    storage, digests = storage_digests
    return storage.bulk_read_blobs(digests)
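

# The wrappers above take their (storage, payload) pair as a single tuple so
# that each can be applied unchanged by ShardedStorage._map below, whether
# that fans out over the thread pool via Executor.map or falls back to the
# serial builtin map.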


class ShardedStorage(StorageABC):
    TYPE = "Sharded"

    def __init__(self, storages: Dict[str, StorageABC], thread_pool_size: Optional[int] = None):
        self._stack = ExitStack()
        if not storages:
            raise ValueError("ShardedStorage requires at least one shard")
        self._storages = storages
        self._threadpool: Optional[ContextThreadPoolExecutor] = None
        if thread_pool_size:
            self._threadpool = ContextThreadPoolExecutor(thread_pool_size, "sharded-storage")

    def start(self) -> None:
        if self._threadpool:
            self._stack.enter_context(self._threadpool)
        for storage in self._storages.values():
            self._stack.enter_context(storage)

    def stop(self) -> None:
        self._stack.close()
        LOGGER.info(f"Stopped {type(self).__name__}")

    def _storage_from_digest(self, digest: Digest) -> StorageABC:
        def _score(shard_name: str, digest: Digest) -> int:
            # Unsigned murmur3 hash of the shard name paired with the digest
            # hash, used as the rendezvous score for this (shard, digest) pair.
            return mmh3.hash(f"{shard_name}\t{digest.hash}", signed=False)

        shard_name = min(self._storages.keys(), key=lambda name: _score(name, digest))
        return self._storages[shard_name]
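
    # The method above is rendezvous (highest-random-weight) hashing, taking
    # the minimum score rather than the conventional maximum, which is
    # equivalent: every digest is scored against every shard name and the
    # extremal score wins. The mapping is deterministic and needs no shared
    # state, and when a shard is added or removed only the digests whose
    # winning shard changes are remapped (about 1/n of them on average),
    # unlike modulo-based sharding where almost all digests move.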

    def _partition_digests(self, digests: List[Digest]) -> Dict[StorageABC, List[Digest]]:
        partition: Dict[StorageABC, List[Digest]] = defaultdict(list)
        for digest in digests:
            storage = self._storage_from_digest(digest)
            partition[storage].append(digest)
        return partition

    def _map(self, fn: Callable[[_T], _R], args: Iterable[_T]) -> Iterator[_R]:
        # Fan the per-shard calls out across the thread pool when one was
        # configured; otherwise evaluate them serially.
        if self._threadpool:
            return self._threadpool.map(fn, args)
        else:
            return map(fn, args)

    @timed(METRIC.STORAGE.STAT_DURATION, type=TYPE)
    def has_blob(self, digest: Digest) -> bool:
        return self._storage_from_digest(digest).has_blob(digest)

    @timed(METRIC.STORAGE.READ_DURATION, type=TYPE)
    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        return self._storage_from_digest(digest).get_blob(digest)

    @timed(METRIC.STORAGE.DELETE_DURATION, type=TYPE)
    def delete_blob(self, digest: Digest) -> None:
        self._storage_from_digest(digest).delete_blob(digest)

    @timed(METRIC.STORAGE.WRITE_DURATION, type=TYPE)
    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        self._storage_from_digest(digest).commit_write(digest, write_session)

    @timed(METRIC.STORAGE.DELETE_DURATION, type=TYPE)
    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        failed_deletions: List[str] = []
        for result in self._map(_bulk_delete_for_storage, self._partition_digests(digests).items()):
            failed_deletions.extend(result)

        publish_counter_metric(METRIC.STORAGE.DELETE_ERRORS_COUNT, len(failed_deletions), type=self.TYPE)
        return failed_deletions

    @timed(METRIC.STORAGE.BULK_STAT_DURATION, type=TYPE)
    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
        missing_blobs: List[Digest] = []

        for result in self._map(_fmb_for_storage, self._partition_digests(digests).items()):
            missing_blobs.extend(result)

        return missing_blobs

    @timed(METRIC.STORAGE.BULK_WRITE_DURATION, type=TYPE)
    def bulk_update_blobs(self, blobs: List[Tuple[Digest, bytes]]) -> List[Status]:
        # Partition the blobs by shard, remembering each blob's position in
        # the original request so the per-shard statuses can be reassembled
        # in the caller's order.
        partitioned_digests: Dict[StorageABC, List[Tuple[Digest, bytes]]] = defaultdict(list)
        idx_map: Dict[StorageABC, List[int]] = defaultdict(list)
        for orig_idx, digest_tuple in enumerate(blobs):
            storage = self._storage_from_digest(digest_tuple[0])
            partitioned_digests[storage].append(digest_tuple)
            idx_map[storage].append(orig_idx)

        # Pre-fill with INTERNAL errors: any slot a shard fails to report on
        # keeps this sentinel status rather than a spurious success.
        results: List[Status] = [Status(code=code_pb2.INTERNAL, message="inconsistent batch results")] * len(blobs)
        for storage, statuses in self._map(_bulk_update_for_storage, partitioned_digests.items()):
            for status_idx, status in enumerate(statuses):
                results[idx_map[storage][status_idx]] = status
        return results

    @timed(METRIC.STORAGE.BULK_READ_DURATION, type=TYPE)
    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
        bulk_read_results: Dict[str, bytes] = {}
        for result in self._map(_bulk_read_for_storage, self._partition_digests(digests).items()):
            bulk_read_results.update(result)

        return bulk_read_results
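

# A minimal usage sketch, not part of this module. It assumes LRUMemoryCache
# (one StorageABC implementation, from
# buildgrid.server.cas.storage.lru_memory_cache) takes a byte-size limit, and
# that StorageABC's context-manager protocol wraps start()/stop() as the
# ExitStack usage above suggests; any mapping of shard names to StorageABC
# instances would work the same way:
#
#     storages = {
#         "shard-a": LRUMemoryCache(limit=1024 * 1024),
#         "shard-b": LRUMemoryCache(limit=1024 * 1024),
#     }
#     with ShardedStorage(storages, thread_pool_size=4) as storage:
#         missing = storage.missing_blobs(digests)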