Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/replicated.py: 93.01%
186 statements
coverage.py v7.4.1, created at 2025-04-14 16:27 +0000
# Copyright (C) 2024 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

16"""
17ReplicatedStorage
18=========================
20A storage provider which stores data in multiple storages, replicating
21any data missing in some but present in others.
23"""
import queue
import threading
from contextlib import ExitStack
from typing import IO, Any, List

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid._protos.google.rpc import code_pb2
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid.server.decorators import timed
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.metrics_names import METRIC
from buildgrid.server.metrics_utils import publish_counter_metric
from buildgrid.server.settings import MAX_REQUEST_COUNT, MAX_REQUEST_SIZE
from buildgrid.server.threading import ContextWorker
from buildgrid.server.utils.digests import HashableDigest

from .storage_abc import StorageABC

LOGGER = buildgrid_logger(__name__)

class ReplicatedStorage(StorageABC):
    TYPE = "Replicated"

    def __init__(self, storages: list[StorageABC], replication_queue_size: int = 0) -> None:
        self._stack = ExitStack()
        self._storages = dict(enumerate(storages))
        self._enable_replicator_thread = False

        self._replica_queue: queue.Queue[Any] = queue.Queue(maxsize=replication_queue_size)
        self.replication_worker = ContextWorker(name="BlobReplicator", target=self.replication_loop)
        if replication_queue_size > 0:
            self._enable_replicator_thread = True

    def start(self) -> None:
        for storage in self._storages.values():
            self._stack.enter_context(storage)
        if self._enable_replicator_thread:
            self._stack.enter_context(self.replication_worker)

    def stop(self) -> None:
        self._stack.close()

    def replication_loop(self, shutdown_requested: threading.Event) -> None:
        # Go through all items in the replication queue and call either
        # get_blob or bulk_read_blobs on them to kick off replication
        # to all necessary storages.
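        # Blobs larger than MAX_REQUEST_SIZE are fetched one at a time via
        # get_blob; smaller blobs are grouped into bulk_read_blobs batches capped
        # at MAX_REQUEST_COUNT digests and MAX_REQUEST_SIZE total bytes.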
        while not shutdown_requested.is_set():
            try:
                digests_to_replicate: List[Digest] = self._replica_queue.get(timeout=1)
            except queue.Empty:
                continue
            try:
                # Go through the digests to replicate and batch them where possible
                batch: List[Digest] = []
                batch_size = 0
                for digest in digests_to_replicate:
                    if digest.size_bytes > MAX_REQUEST_SIZE:
                        _ = self.get_blob(digest)
                        continue

                    if len(batch) + 1 > MAX_REQUEST_COUNT or batch_size + digest.size_bytes > MAX_REQUEST_SIZE:
                        _ = self.bulk_read_blobs(batch)
                        batch = []
                        batch_size = 0

                    batch.append(digest)
                    batch_size += digest.size_bytes

                if len(batch) > 0:
                    _ = self.bulk_read_blobs(batch)
            except Exception:
                LOGGER.exception(f"Caught exception while replicating {digests_to_replicate}")
                shutdown_requested.wait(timeout=1)
            self._replica_queue.task_done()

    @timed(METRIC.STORAGE.STAT_DURATION, type=TYPE)
    def has_blob(self, digest: Digest) -> bool:
        has_blob: set[int] = set(i for i in self._storages if self._storages[i].has_blob(digest))
        missing_blob = set(self._storages.keys()) - has_blob
        if len(missing_blob) < len(self._storages):
            publish_counter_metric(METRIC.STORAGE.REPLICATED.REQUIRED_REPLICATION_COUNT, len(missing_blob))
        return len(has_blob) > 0

    @timed(METRIC.STORAGE.READ_DURATION, type=TYPE)
    def get_blob(self, digest: Digest) -> IO[bytes] | None:
        has_blob: set[int] = set(i for i in self._storages if self._storages[i].has_blob(digest))
        missing_blob = set(self._storages.keys()) - has_blob
        blob = None
        failed_writes = 0
        for idx in has_blob:
            if blob := self._storages[idx].get_blob(digest):
                break
            LOGGER.error(
                "Storage shard reported digest exists but downloading failed.",
                tags=dict(shard_index=idx, digest=digest),
            )
            missing_blob.add(idx)
        if len(missing_blob) < len(self._storages):
            assert blob is not None
            for idx in missing_blob:
                try:
                    self._storages[idx].commit_write(digest, blob)
                    LOGGER.debug("Replicated digest to storage shard.", tags=dict(shard_index=idx, digest=digest))
                except Exception as e:
                    LOGGER.warning(
                        f"Failed to replicate digest to storage shard: {e}.", tags=dict(shard_index=idx, digest=digest)
                    )
                    failed_writes += 1
                blob.seek(0)

        publish_counter_metric(METRIC.STORAGE.REPLICATED.REPLICATION_ERROR_COUNT, failed_writes)
        publish_counter_metric(METRIC.STORAGE.REPLICATED.REPLICATION_COUNT, len(missing_blob) - failed_writes)
        return blob

    @timed(METRIC.STORAGE.DELETE_DURATION, type=TYPE)
    def delete_blob(self, digest: Digest) -> None:
        for storage in self._storages.values():
            storage.delete_blob(digest)

    @timed(METRIC.STORAGE.WRITE_DURATION, type=TYPE)
    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        failed_writes = 0
        error_msgs: list[str] = []
        for idx, storage in self._storages.items():
            try:
                storage.commit_write(digest, write_session)
            except Exception as error:
                LOGGER.warning(
                    f"Failed to write digest to storage shard: {error}", tags=dict(shard_index=idx, digest=digest)
                )
                error_msgs.append(str(error))
                failed_writes += 1
            write_session.seek(0)

        publish_counter_metric(METRIC.STORAGE.REPLICATED.REPLICATION_ERROR_COUNT, failed_writes)
        if failed_writes == len(self._storages):
            error_string = "Writes to all storages failed with the following errors:\n"
            error_string += "\n".join(error_msgs)
            LOGGER.error(error_string)
            raise RuntimeError(error_string)

    @timed(METRIC.STORAGE.BULK_DELETE_DURATION, type=TYPE)
    def bulk_delete(self, digests: list[Digest]) -> list[str]:
        failed_deletions_set: set[str] = set()
        for storage in self._storages.values():
            # Accumulate failed deletions across all storages
            failed_deletions_set.update(storage.bulk_delete(digests))
        publish_counter_metric(METRIC.STORAGE.DELETE_ERRORS_COUNT, len(failed_deletions_set), type=self.TYPE)
        return list(failed_deletions_set)

    @timed(METRIC.STORAGE.BULK_STAT_DURATION, type=TYPE)
    def missing_blobs(self, digests: list[Digest]) -> list[Digest]:
        """Call missing_blobs on each storage and only report a blob as missing if it's in none of the
        storages. The number of blobs missing from a storage but present in others is logged and published
        as a metric.
        """
        missing_for_storage: dict[int, set[HashableDigest]] = {}
        for idx, storage in self._storages.items():
            response = storage.missing_blobs(digests)
            missing_for_storage[idx] = {HashableDigest(digest.hash, digest.size_bytes) for digest in response}

        # Find the set of inconsistent digests, defined as digests which are missing in some, but not all, storages.
        missing_from_all_storages = set.intersection(*missing_for_storage.values())
        inconsistent_digests: set[HashableDigest] = (
            set.union(*missing_for_storage.values()) - missing_from_all_storages
        )
        for idx, missing_digests in missing_for_storage.items():
            inconsistent_digests_for_storage = missing_digests & inconsistent_digests
            if inconsistent_digests_for_storage:
                LOGGER.info(
                    "Storage shard has blobs which need to be replicated.",
                    tags=dict(shard_index=idx, digest_count=len(inconsistent_digests_for_storage)),
                )

        if self._enable_replicator_thread and len(inconsistent_digests) > 0:
            try:
                self._replica_queue.put_nowait([x.to_digest() for x in inconsistent_digests])
            except queue.Full:
                LOGGER.warning(
                    "Digests to be replicated were skipped due to full replication queue.",
                    tags=dict(skipped_digests=len(inconsistent_digests)),
                )
                publish_counter_metric(
                    METRIC.STORAGE.REPLICATED.REPLICATION_QUEUE_FULL_COUNT, len(inconsistent_digests)
                )
        publish_counter_metric(METRIC.STORAGE.REPLICATED.REQUIRED_REPLICATION_COUNT, len(inconsistent_digests))
        missing_blobs_list = [hdigest.to_digest() for hdigest in missing_from_all_storages]
        return missing_blobs_list

    # Bulk write to all storages. Errors are not fatal as long as one storage is
    # successfully written to.
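    # The status returned for each digest prefers an OK result from any shard,
    # so a blob only reports a write error if it could not be written anywhere.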
    @timed(METRIC.STORAGE.BULK_WRITE_DURATION, type=TYPE)
    def bulk_update_blobs(self, blobs: list[tuple[Digest, bytes]]) -> list[Status]:
        digest_result: dict[HashableDigest, Status] = {}
        errored_blobs_total = 0
        for idx, storage in self._storages.items():
            errored_blobs_for_storage = 0
            results = storage.bulk_update_blobs(blobs)
            for digest_blob_tuple, result in zip(blobs, results):
                digest, _ = digest_blob_tuple
                hdigest = HashableDigest(hash=digest.hash, size_bytes=digest.size_bytes)

                if result.code != code_pb2.OK:
                    errored_blobs_for_storage += 1

                # Keep track of the status code for this digest, preferring OK over errors
                if hdigest not in digest_result or digest_result[hdigest].code != code_pb2.OK:
                    digest_result[hdigest] = result

            if errored_blobs_for_storage > 0:
                LOGGER.warning(
                    "Failed to write all digests to storage shard.",
                    tags=dict(shard_index=idx, digest_count=len(results), error_count=errored_blobs_for_storage),
                )
            errored_blobs_total += errored_blobs_for_storage

        publish_counter_metric(METRIC.STORAGE.REPLICATED.REPLICATION_ERROR_COUNT, errored_blobs_total)
        return [digest_result[hdigest] for hdigest in digest_result]

    # Read blobs from all storages, copying any blobs that are missing from some
    # storages out of the storages which do have them.
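    # Phase one below collects each shard's missing set while reading every
    # requested digest from the first shard that has it; phase two bulk-writes
    # the gathered bytes into the shards that were missing them.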
    @timed(METRIC.STORAGE.BULK_READ_DURATION, type=TYPE)
    def bulk_read_blobs(self, digests: list[Digest]) -> dict[str, bytes]:
        digest_set = set(HashableDigest(hash=digest.hash, size_bytes=digest.size_bytes) for digest in digests)
        missing_blobs: dict[int, set[HashableDigest]] = {}
        bulk_read_results: dict[str, bytes] = {}
        # Find what blobs are missing for this storage and read what's available
        for idx, storage in self._storages.items():
            missing_blobs[idx] = set(
                [
                    HashableDigest(hash=digest.hash, size_bytes=digest.size_bytes)
                    for digest in storage.missing_blobs(digests)
                ]
            )
            present_blobs = digest_set - missing_blobs[idx]
            blobs_to_read = [x.to_digest() for x in present_blobs if x.hash not in bulk_read_results]
            bulk_read_results.update(self._storages[idx].bulk_read_blobs(blobs_to_read))

        replicated_blobs_count = 0
        errored_blobs_count = 0
        for idx, missing in missing_blobs.items():
            # Write any blobs that exist in other storages which are missing from this storage
            write_batch: list[tuple[Digest, bytes]] = []
            for digest in missing:
                if digest.hash in bulk_read_results:
                    write_batch.append((digest.to_digest(), bulk_read_results[digest.hash]))
            if write_batch:
                update_results = self._storages[idx].bulk_update_blobs(write_batch)
                for result in update_results:
                    if result.code != code_pb2.OK:
                        errored_blobs_count += 1
                    else:
                        replicated_blobs_count += 1
                LOGGER.debug(
                    "Replicated blobs to storage shard.", tags=dict(shard_index=idx, digest_count=len(write_batch))
                )
        publish_counter_metric(METRIC.STORAGE.REPLICATED.REPLICATION_COUNT, replicated_blobs_count)
        publish_counter_metric(METRIC.STORAGE.REPLICATED.REPLICATION_ERROR_COUNT, errored_blobs_count)
        return bulk_read_results