Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/actioncache/caches/mirrored_cache.py: 87.23%
47 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
1# Copyright (C) 2024 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15from typing import Optional
17from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ActionResult, Digest
18from buildgrid.server.actioncache.caches.action_cache_abc import ActionCacheABC
19from buildgrid.server.exceptions import NotFoundError
20from buildgrid.server.logging import buildgrid_logger
21from buildgrid.server.metrics_names import METRIC
22from buildgrid.server.metrics_utils import publish_counter_metric
24LOGGER = buildgrid_logger(__name__)
class MirroredCache(ActionCacheABC):
    """Synchronize two mirrored action-caches to the same state.

    Writes are applied to both underlying caches. Reads query both caches,
    copy a result into whichever cache is missing it, and publish a counter
    metric recording whether the two caches agreed.
    """

    def __init__(
        self,
        first: ActionCacheABC,
        second: ActionCacheABC,
    ):
        """Wrap two action-caches so they can be used as a single cache.

        Args:
            first: Cache whose result is returned when both caches hold an
                entry for the requested digest.
            second: Cache mirrored alongside ``first``.
        """
        # Don't pass a storage object to the super class, as the blob
        # existence check will be performed by each underlying cache.
        # On the other hand, the storages of these two caches should point
        # to the same one, or themselves be mirrored with each other.
        super().__init__(allow_updates=True, storage=None)
        self._first = first
        self._second = second

    def start(self) -> None:
        """Start the first cache, then the second."""
        self._first.start()
        self._second.start()

    def stop(self) -> None:
        """Stop the second cache, then the first (reverse of start order).

        The first cache is stopped even when stopping the second raises, so
        a failure in one cache does not leave the other running. The
        exception still propagates to the caller.
        """
        try:
            self._second.stop()
        finally:
            self._first.stop()

    def update_action_result(self, action_digest: Digest, action_result: ActionResult) -> None:
        """Write ``action_result`` under ``action_digest`` to both caches.

        The first cache is written before the second; an exception raised by
        either write propagates, and the log line is emitted only after both
        writes have returned.
        """
        self._first.update_action_result(action_digest, action_result)
        self._second.update_action_result(action_digest, action_result)
        LOGGER.info("Finished dual write to both action-caches.", tags=dict(digest=action_digest))

    def get_action_result(self, action_digest: Digest) -> ActionResult:
        """Return the result for ``action_digest``, repairing a missing mirror.

        Both caches are always queried. The outcome depends on what they hold:

        * neither has the entry: ``NotFoundError`` is raised;
        * only one has it: a mismatch is counted, the entry is written into
          the cache that lacked it, and the entry is returned;
        * both have it but the entries differ: a mismatch is counted, a
          warning is logged, and the first cache's entry is returned;
        * both have equal entries: a match is counted and the entry returned.

        Raises:
            NotFoundError: If neither cache holds a result for the digest.
        """
        first_result = _try_get_action_result(self._first, action_digest)
        second_result = _try_get_action_result(self._second, action_digest)

        if first_result is None:
            # Nesting the second check here lets the type checker narrow
            # ``second_result`` to ``ActionResult`` below without suppressions.
            if second_result is None:
                raise NotFoundError(f"Action result not found: {action_digest.hash}/{action_digest.size_bytes}")

            # Only the second cache has the entry: backfill the first.
            publish_counter_metric(METRIC.ACTION_CACHE.MIRRORED_MISMATCH_COUNT, 1)
            self._first.update_action_result(action_digest, second_result)
            return second_result

        if second_result is None:
            # Only the first cache has the entry: backfill the second.
            publish_counter_metric(METRIC.ACTION_CACHE.MIRRORED_MISMATCH_COUNT, 1)
            self._second.update_action_result(action_digest, first_result)
            return first_result

        if first_result != second_result:
            # Both caches answered but disagree; neither is rewritten here,
            # the first cache's entry is simply preferred.
            publish_counter_metric(METRIC.ACTION_CACHE.MIRRORED_MISMATCH_COUNT, 1)
            LOGGER.warning(
                "Different action results in mirrored caches.",
                tags=dict(first_result=first_result, second_result=second_result),
            )
            return first_result

        publish_counter_metric(METRIC.ACTION_CACHE.MIRRORED_MATCH_COUNT, 1)
        return first_result
def _try_get_action_result(cache: ActionCacheABC, digest: Digest) -> Optional[ActionResult]:
    """Look up ``digest`` in ``cache``, turning a cache miss into ``None``.

    Only ``NotFoundError`` is translated; any other exception raised by the
    cache propagates to the caller.
    """
    try:
        found = cache.get_action_result(digest)
    except NotFoundError:
        return None
    else:
        return found