Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/actioncache/caches/mirrored_cache.py: 86.96%
46 statements
« prev ^ index » next coverage.py v7.4.1, created at 2025-02-11 15:07 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2025-02-11 15:07 +0000
1# Copyright (C) 2024 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ActionResult, Digest
16from buildgrid.server.actioncache.caches.action_cache_abc import ActionCacheABC
17from buildgrid.server.exceptions import NotFoundError
18from buildgrid.server.logging import buildgrid_logger
19from buildgrid.server.metrics_names import METRIC
20from buildgrid.server.metrics_utils import publish_counter_metric
# Module-level logger obtained from buildgrid_logger for this module's name.
22LOGGER = buildgrid_logger(__name__)
class MirroredCache(ActionCacheABC):
    """Synchronize two mirrored action-caches to the same state.

    Writes are applied to both underlying caches. Reads query both caches;
    when only one of them holds the result, it is copied into the other
    before being returned.
    """

    def __init__(
        self,
        first: ActionCacheABC,
        second: ActionCacheABC,
    ):
        """Wrap two action caches that should mirror each other.

        Args:
            first: The cache whose result is preferred when both caches hold
                an entry for the same digest.
            second: The cache kept in sync with ``first``.
        """
        # Don't pass a storage object to super class
        # as blob existence check will be performed by each object.
        # On the other hand, the storages of these two caches should point to the same one
        # or also mirrored with each other.
        super().__init__(allow_updates=True, storage=None)
        self._first = first
        self._second = second

    def start(self) -> None:
        """Start the first cache, then the second."""
        self._first.start()
        self._second.start()

    def stop(self) -> None:
        """Stop both caches, in the reverse of their start order.

        The first cache is stopped even when stopping the second one raises,
        so a failure in one backend cannot leave the other running. The
        exception still propagates to the caller once both stops have been
        attempted.
        """
        try:
            self._second.stop()
        finally:
            self._first.stop()

    def update_action_result(self, action_digest: Digest, action_result: ActionResult) -> None:
        """Write ``action_result`` for ``action_digest`` to both caches.

        The first cache is written before the second; an exception from
        either write propagates to the caller.
        """
        self._first.update_action_result(action_digest, action_result)
        self._second.update_action_result(action_digest, action_result)
        LOGGER.info("Finished dual write to both action-caches.", tags=dict(digest=action_digest))

    def get_action_result(self, action_digest: Digest) -> ActionResult:
        """Look up ``action_digest`` in both caches and reconcile a one-sided miss.

        Returns:
            The result from the first cache when it has one (including when
            the two caches disagree), otherwise the result from the second.

        Raises:
            NotFoundError: If neither cache holds a result for the digest.
        """
        # Both caches are always queried, even on a hit in the first one,
        # so that a missing or differing entry in either cache is detected.
        first_result = _try_get_action_result(self._first, action_digest)
        second_result = _try_get_action_result(self._second, action_digest)

        if first_result is None and second_result is None:
            raise NotFoundError(f"Action result not found: {action_digest.hash}/{action_digest.size_bytes}")

        if first_result is None:
            # Only the second cache has the entry: backfill the first.
            publish_counter_metric(METRIC.ACTION_CACHE.MIRRORED_MISMATCH_COUNT, 1)
            self._first.update_action_result(action_digest, second_result)  # type: ignore[arg-type]
            return second_result  # type: ignore[return-value]

        if second_result is None:
            # Only the first cache has the entry: backfill the second.
            publish_counter_metric(METRIC.ACTION_CACHE.MIRRORED_MISMATCH_COUNT, 1)
            self._second.update_action_result(action_digest, first_result)
            return first_result

        if first_result != second_result:
            # Both caches have an entry but they differ. Neither cache is
            # overwritten here; the first cache's result is returned.
            publish_counter_metric(METRIC.ACTION_CACHE.MIRRORED_MISMATCH_COUNT, 1)
            LOGGER.warning(
                "Different action results in mirrored caches.",
                tags=dict(first_result=first_result, second_result=second_result),
            )
            return first_result

        publish_counter_metric(METRIC.ACTION_CACHE.MIRRORED_MATCH_COUNT, 1)
        return first_result
def _try_get_action_result(cache: ActionCacheABC, digest: Digest) -> ActionResult | None:
    """Fetch ``digest`` from ``cache``, mapping a ``NotFoundError`` to ``None``.

    Any other exception raised by the cache lookup propagates unchanged.
    """
    try:
        found = cache.get_action_result(digest)
    except NotFoundError:
        found = None
    return found