# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
16"""
17WithCacheStorage
18==================
20A storage provider that first checks a cache, then tries a slower
21fallback provider.
23To ensure clients can reliably store blobs in CAS, only `get_blob`
24calls are cached -- `has_blob` and `missing_blobs` will always query
25the fallback.
26"""

from contextlib import ExitStack
from typing import IO, Dict, List, Optional, Tuple

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid.server.decorators import timed
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.metrics_names import METRIC
from buildgrid.server.metrics_utils import publish_counter_metric, publish_distribution_metric
from buildgrid.server.settings import MAX_IN_MEMORY_BLOB_SIZE_BYTES
from buildgrid.server.threading import ContextThreadPoolExecutor

from .storage_abc import StorageABC, create_write_session

LOGGER = buildgrid_logger(__name__)


class WithCacheStorage(StorageABC):
    TYPE = "WithCache"

    def __init__(
        self,
        cache: StorageABC,
        fallback: StorageABC,
        defer_fallback_writes: bool = False,
        fallback_writer_threads: int = 20,
    ) -> None:
        self._stack = ExitStack()
        self._cache = cache
        self._fallback = fallback
        self._defer_fallback_writes = defer_fallback_writes
        self._fallback_writer_threads = fallback_writer_threads
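        # The executor is created eagerly, but is only entered (started) in
        # start() when fallback writes are deferred.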
        self._executor = ContextThreadPoolExecutor(self._fallback_writer_threads, "WithCacheFallbackWriter")

    def start(self) -> None:
        if self._defer_fallback_writes:
            self._stack.enter_context(self._executor)
        self._stack.enter_context(self._cache)
        self._stack.enter_context(self._fallback)

    def stop(self) -> None:
        self._stack.close()
        LOGGER.info(f"Stopped {type(self).__name__}")

    @timed(METRIC.STORAGE.STAT_DURATION, type=TYPE)
    def has_blob(self, digest: Digest) -> bool:
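        # When fallback writes are deferred, a blob may exist only in the
        # cache until its background write completes, so check the cache first.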
        try:
            if self._defer_fallback_writes and self._cache.has_blob(digest):
                return True
        except Exception:
            LOGGER.warning(
                "Failed to check existence of digest in cache storage.", tags=dict(digest=digest), exc_info=True
            )

        return self._fallback.has_blob(digest)

    @timed(METRIC.STORAGE.READ_DURATION, type=TYPE)
    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        try:
            cache_result = self._cache.get_blob(digest)
            if cache_result is not None:
                publish_distribution_metric(METRIC.STORAGE.WITH_CACHE.CACHE_HIT_COUNT, 1)
                return cache_result
        except Exception:
            LOGGER.warning("Failed to read digest from cache storage.", tags=dict(digest=digest), exc_info=True)

        fallback_result = self._fallback.get_blob(digest)
        if fallback_result is None:
            return None

        publish_distribution_metric(METRIC.STORAGE.WITH_CACHE.CACHE_MISS_COUNT, 1)
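
        # Repopulate the cache from the fallback result so that subsequent
        # reads of this digest are cache hits.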
        try:
            self._cache.commit_write(digest, fallback_result)
        except Exception:
            LOGGER.warning(
                "Failed to write digest to cache storage after reading blob.", tags=dict(digest=digest), exc_info=True
            )
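
        # commit_write consumes the stream, so rewind it before handing the
        # blob back to the caller.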
        fallback_result.seek(0)
        return fallback_result

    @timed(METRIC.STORAGE.DELETE_DURATION, type=TYPE)
    def delete_blob(self, digest: Digest) -> None:
        self._fallback.delete_blob(digest)
        try:
            self._cache.delete_blob(digest)
        except Exception:
            LOGGER.warning("Failed to delete digest from cache storage.", tags=dict(digest=digest), exc_info=True)

    @timed(METRIC.STORAGE.DELETE_DURATION, type=TYPE)
    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        # Only report back failures from the fallback
        try:
            cache_failures = self._cache.bulk_delete(digests)
            for failure in cache_failures:
                LOGGER.warning("Failed to delete digest from cache storage.", tags=dict(digest=failure))
            publish_counter_metric(METRIC.STORAGE.DELETE_ERRORS_COUNT, len(cache_failures), type=self.TYPE)
        except Exception:
            LOGGER.warning("Failed to bulk delete blobs from cache storage.", exc_info=True)

        fallback_failures = self._fallback.bulk_delete(digests)
        publish_counter_metric(METRIC.STORAGE.DELETE_ERRORS_COUNT, len(fallback_failures), type=self.TYPE)
        return fallback_failures

    @timed(METRIC.STORAGE.WRITE_DURATION, type=TYPE)
    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        written_to_cache = False
        try:
            self._cache.commit_write(digest, write_session)
            written_to_cache = True
        except Exception:
            LOGGER.warning(
                "Failed to commit write of digest to cache storage.", tags=dict(digest=digest), exc_info=True
            )

        if written_to_cache and self._defer_fallback_writes:
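            # Copy the blob into a separate write session in bounded chunks,
            # so the deferred fallback write does not depend on the caller's
            # session staying open after this method returns.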
            write_session.seek(0)
            deferred_session = create_write_session(digest)
            while data := write_session.read(MAX_IN_MEMORY_BLOB_SIZE_BYTES):
                deferred_session.write(data)

            def deferred_submit() -> None:
                with deferred_session:
                    self._fallback.commit_write(digest, deferred_session)

            self._executor.submit(deferred_submit)
        else:
            self._fallback.commit_write(digest, write_session)

    @timed(METRIC.STORAGE.BULK_STAT_DURATION, type=TYPE)
    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
        return self._fallback.missing_blobs(digests)

    @timed(METRIC.STORAGE.BULK_WRITE_DURATION, type=TYPE)
    def bulk_update_blobs(self, blobs: List[Tuple[Digest, bytes]]) -> List[Status]:
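        # Cache failures are logged but not reported back; the statuses
        # returned by the fallback are the authoritative result of the write.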
        try:
            self._cache.bulk_update_blobs(blobs)
        except Exception:
            LOGGER.warning("Failed to bulk update blobs in cache storage.", exc_info=True)

        return self._fallback.bulk_update_blobs(blobs)

    @timed(METRIC.STORAGE.BULK_READ_DURATION, type=TYPE)
    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
        try:
            cache_blobs = self._cache.bulk_read_blobs(digests)
        except Exception:
            LOGGER.warning("Failed to bulk read blobs from cache storage.", exc_info=True)
            cache_blobs = {}

        publish_distribution_metric(METRIC.STORAGE.WITH_CACHE.CACHE_HIT_COUNT, len(cache_blobs))

        uncached_digests = [digest for digest in digests if cache_blobs.get(digest.hash, None) is None]

        publish_distribution_metric(METRIC.STORAGE.WITH_CACHE.CACHE_MISS_COUNT, len(digests) - len(cache_blobs))

        metric_cache_percent = 0.0
        if len(digests) > 0:
            metric_cache_percent = len(cache_blobs) / len(digests) * 100
        publish_distribution_metric(METRIC.STORAGE.WITH_CACHE.CACHE_HIT_PERCENT, metric_cache_percent)

        fallback_blobs = self._fallback.bulk_read_blobs(uncached_digests)
        cache_blobs.update(fallback_blobs)
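
        # Backfill the cache with the blobs that had to be fetched from the
        # fallback, so subsequent bulk reads of them are cache hits.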
        uncached_blobs = []
        for digest in uncached_digests:
            blob = fallback_blobs.get(digest.hash)
            if blob is not None:
                uncached_blobs.append((digest, blob))

        try:
            self._cache.bulk_update_blobs(uncached_blobs)
        except Exception:
            LOGGER.warning("Failed to add blobs to cache storage after bulk read.", exc_info=True)

        return cache_blobs