Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/replicated.py: 96.62% (148 statements)
coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

# Copyright (C) 2024 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

16"""
17ReplicatedStorage
18=========================
20A storage provider which stores data in multiple storages, replicating
21any data missing in some but present in others.
23"""
from contextlib import ExitStack
from typing import IO, Dict, List, Optional, Set, Tuple

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid._protos.google.rpc import code_pb2
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid.server.decorators import timed
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.metrics_names import METRIC
from buildgrid.server.metrics_utils import publish_counter_metric
from buildgrid.server.utils.digests import HashableDigest

from .storage_abc import StorageABC

LOGGER = buildgrid_logger(__name__)


class ReplicatedStorage(StorageABC):
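    """Storage provider which reads from and writes to several backing storages.

    Blobs present in some of the configured storages but missing from others are
    replicated on access, so the backing storages converge on the same contents.
    """
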
    TYPE = "Replicated"

    def __init__(self, storages: List[StorageABC]) -> None:
        self._stack = ExitStack()
        self._storages = dict(enumerate(storages))

    def start(self) -> None:
        for storage in self._storages.values():
            self._stack.enter_context(storage)

    def stop(self) -> None:
        self._stack.close()

    @timed(METRIC.STORAGE.STAT_DURATION, type=TYPE)
    def has_blob(self, digest: Digest) -> bool:
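        """Return True if at least one backing storage has the blob.

        Also publishes a counter of how many storages are missing the blob and
        would need it replicated to them.
        """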
        has_blob: Set[int] = set(i for i in self._storages if self._storages[i].has_blob(digest))
        missing_blob = set(self._storages.keys()) - has_blob
        if len(missing_blob) < len(self._storages):
            publish_counter_metric(METRIC.STORAGE.REPLICATED.REQUIRED_REPLICATION_COUNT, len(missing_blob))
        return len(has_blob) > 0

    @timed(METRIC.STORAGE.READ_DURATION, type=TYPE)
    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
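        """Fetch the blob from the first storage able to provide it.

        Storages which are missing the blob are backfilled with the fetched copy
        before the blob is returned. Returns None if no storage has the blob.
        """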
        has_blob: Set[int] = set(i for i in self._storages if self._storages[i].has_blob(digest))
        missing_blob = set(self._storages.keys()) - has_blob
        blob = None
        failed_writes = 0
        for idx in has_blob:
            if blob := self._storages[idx].get_blob(digest):
                break
            LOGGER.error(
                "Storage shard reported digest exists but downloading failed.",
                tags=dict(shard_index=idx, digest=digest),
            )
            missing_blob.add(idx)
        if len(missing_blob) < len(self._storages):
            assert blob is not None
            for idx in missing_blob:
                try:
                    self._storages[idx].commit_write(digest, blob)
                    LOGGER.debug("Replicated digest to storage shard.", tags=dict(shard_index=idx, digest=digest))
                except Exception as e:
                    LOGGER.warning(
                        f"Failed to replicate digest to storage shard: {e}.", tags=dict(shard_index=idx, digest=digest)
                    )
                    failed_writes += 1
                blob.seek(0)

            publish_counter_metric(METRIC.STORAGE.REPLICATED.REPLICATION_ERROR_COUNT, failed_writes)
            publish_counter_metric(METRIC.STORAGE.REPLICATED.REPLICATION_COUNT, len(missing_blob) - failed_writes)
        return blob

    @timed(METRIC.STORAGE.DELETE_DURATION, type=TYPE)
    def delete_blob(self, digest: Digest) -> None:
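        """Delete the blob from every backing storage."""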
        for storage in self._storages.values():
            storage.delete_blob(digest)

    @timed(METRIC.STORAGE.WRITE_DURATION, type=TYPE)
    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
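        """Write the blob to every backing storage.

        Failures in individual storages are tolerated; a RuntimeError is raised
        only if the write fails in every storage.
        """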
        failed_writes = 0
        error_msgs: List[str] = []
        for idx, storage in self._storages.items():
            try:
                storage.commit_write(digest, write_session)
            except Exception as error:
                LOGGER.warning(
                    f"Failed to write digest to storage shard: {error}", tags=dict(shard_index=idx, digest=digest)
                )
                error_msgs.append(str(error))
                failed_writes += 1
            write_session.seek(0)

        publish_counter_metric(METRIC.STORAGE.REPLICATED.REPLICATION_ERROR_COUNT, failed_writes)
        if failed_writes == len(self._storages):
            error_string = "Writes to all storages failed with the following errors:\n"
            error_string += "\n".join(error_msgs)
            LOGGER.error(error_string)
            raise RuntimeError(error_string)

    @timed(METRIC.STORAGE.BULK_DELETE_DURATION, type=TYPE)
    def bulk_delete(self, digests: List[Digest]) -> List[str]:
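        """Delete the given digests from every backing storage.

        Returns the collected deletion failures reported by the storages.
        """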
        failed_deletions_set: Set[str] = set()
        for storage in self._storages.values():
            # Merge the failures reported by each storage into the running set
            failed_deletions_set.update(storage.bulk_delete(digests))
        publish_counter_metric(METRIC.STORAGE.DELETE_ERRORS_COUNT, len(failed_deletions_set), type=self.TYPE)
        return list(failed_deletions_set)

    @timed(METRIC.STORAGE.BULK_STAT_DURATION, type=TYPE)
    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
        """Call missing_blobs on each storage, reporting a blob as missing only if it is
        absent from all storages. The number of blobs missing from some storages but
        present in others is logged and published as a metric.
        """
        digest_set: Set[HashableDigest] = {HashableDigest(digest.hash, digest.size_bytes) for digest in digests}
        missing_from_all_storages = digest_set.copy()
        missing_for_storage: Dict[int, Set[HashableDigest]] = {}

        blobs_needing_replication_count = 0
        for idx, storage in self._storages.items():
            response = storage.missing_blobs(digests)
            missing_for_storage[idx] = {HashableDigest(digest.hash, digest.size_bytes) for digest in response}

        missing_from_all_storages = set.intersection(*missing_for_storage.values())
        digests_in_at_least_one_storage: Set[HashableDigest] = digest_set - missing_from_all_storages
        # Figure out what blobs are present in some storages but not all to report metrics
        for idx, missing_digests in missing_for_storage.items():
            inconsistent_digests_for_storage = missing_digests & digests_in_at_least_one_storage
            if inconsistent_digests_for_storage:
                LOGGER.info(
                    "Storage shard has blobs which need to be replicated.",
                    tags=dict(shard_index=idx, digest_count=len(inconsistent_digests_for_storage)),
                )
                blobs_needing_replication_count += len(inconsistent_digests_for_storage)

        publish_counter_metric(METRIC.STORAGE.REPLICATED.REQUIRED_REPLICATION_COUNT, blobs_needing_replication_count)
        missing_blobs_list = [hdigest.to_digest() for hdigest in missing_from_all_storages]
        return missing_blobs_list

    # Bulk write to all storages. Errors are not fatal as long as one storage is
    # successfully written to.
    @timed(METRIC.STORAGE.BULK_WRITE_DURATION, type=TYPE)
    def bulk_update_blobs(self, blobs: List[Tuple[Digest, bytes]]) -> List[Status]:
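        """Write each blob to every backing storage.

        Returns one Status per unique digest in the request, preferring an OK
        status from any storage over an error.
        """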
        digest_result: Dict[HashableDigest, Status] = {}
        errored_blobs_total = 0
        for idx, storage in self._storages.items():
            errored_blobs_for_storage = 0
            results = storage.bulk_update_blobs(blobs)
            for digest_blob_tuple, result in zip(blobs, results):
                digest, _ = digest_blob_tuple
                hdigest = HashableDigest(hash=digest.hash, size_bytes=digest.size_bytes)

                if result.code != code_pb2.OK:
                    errored_blobs_for_storage += 1

                # Keep track of the status code for this digest, preferring OK over errors
                if hdigest not in digest_result or digest_result[hdigest].code != code_pb2.OK:
                    digest_result[hdigest] = result

            if errored_blobs_for_storage > 0:
                LOGGER.warning(
                    "Failed to write all digests to storage shard.",
                    tags=dict(shard_index=idx, digest_count=len(results), error_count=errored_blobs_for_storage),
                )
            errored_blobs_total += errored_blobs_for_storage

        publish_counter_metric(METRIC.STORAGE.REPLICATED.REPLICATION_ERROR_COUNT, errored_blobs_total)
        return [digest_result[hdigest] for hdigest in digest_result]

    # Read blobs from all storages, replicating any blobs missing from some storages
    # using copies from the storages that have them.
    @timed(METRIC.STORAGE.BULK_READ_DURATION, type=TYPE)
    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
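        """Read the requested digests, combining results from all backing storages.

        Any storage found to be missing blobs that were read from elsewhere is
        backfilled with those blobs. Returns a mapping of blob hash to content
        for the blobs that were found.
        """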
        digest_set = set(HashableDigest(hash=digest.hash, size_bytes=digest.size_bytes) for digest in digests)
        missing_blobs: Dict[int, Set[HashableDigest]] = {}
        bulk_read_results: Dict[str, bytes] = {}
        # Find what blobs are missing for this storage and read what's available
        for idx, storage in self._storages.items():
            missing_blobs[idx] = set(
                [
                    HashableDigest(hash=digest.hash, size_bytes=digest.size_bytes)
                    for digest in storage.missing_blobs(digests)
                ]
            )
            present_blobs = digest_set - missing_blobs[idx]
            blobs_to_read = [x.to_digest() for x in present_blobs if x.hash not in bulk_read_results]
            bulk_read_results.update(self._storages[idx].bulk_read_blobs(blobs_to_read))

        replicated_blobs_count = 0
        errored_blobs_count = 0
        for idx, missing in missing_blobs.items():
            # Write any blobs that exist in other storages which are missing from this storage
            write_batch: List[Tuple[Digest, bytes]] = []
            for digest in missing:
                if digest.hash in bulk_read_results:
                    write_batch.append((digest.to_digest(), bulk_read_results[digest.hash]))
            if write_batch:
                update_results = self._storages[idx].bulk_update_blobs(write_batch)
                for result in update_results:
                    if result.code != code_pb2.OK:
                        errored_blobs_count += 1
                    else:
                        replicated_blobs_count += 1
                LOGGER.debug(
                    "Replicated blobs to storage shard.", tags=dict(shard_index=idx, digest_count=len(write_batch))
                )

        publish_counter_metric(METRIC.STORAGE.REPLICATED.REPLICATION_COUNT, replicated_blobs_count)
        publish_counter_metric(METRIC.STORAGE.REPLICATED.REPLICATION_ERROR_COUNT, errored_blobs_count)
        return bulk_read_results