Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/actioncache/caches/mirrored_cache.py: 86.96%

46 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2025-02-11 15:07 +0000

1# Copyright (C) 2024 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ActionResult, Digest 

16from buildgrid.server.actioncache.caches.action_cache_abc import ActionCacheABC 

17from buildgrid.server.exceptions import NotFoundError 

18from buildgrid.server.logging import buildgrid_logger 

19from buildgrid.server.metrics_names import METRIC 

20from buildgrid.server.metrics_utils import publish_counter_metric 

21 

22LOGGER = buildgrid_logger(__name__) 

23 

24 

25class MirroredCache(ActionCacheABC): 

26 """Synchronize two mirrored action-caches to the same state""" 

27 

28 def __init__( 

29 self, 

30 first: ActionCacheABC, 

31 second: ActionCacheABC, 

32 ): 

33 # Don't pass a storage object to super class 

34 # as blob existence check will be performed by each object. 

35 # On the other hand, the storages of these two caches should point to the same one 

36 # or also mirrored with each other. 

37 super().__init__(allow_updates=True, storage=None) 

38 self._first = first 

39 self._second = second 

40 

41 def start(self) -> None: 

42 self._first.start() 

43 self._second.start() 

44 

45 def stop(self) -> None: 

46 self._second.stop() 

47 self._first.stop() 

48 

49 def update_action_result(self, action_digest: Digest, action_result: ActionResult) -> None: 

50 self._first.update_action_result(action_digest, action_result) 

51 self._second.update_action_result(action_digest, action_result) 

52 LOGGER.info("Finished dual write to both action-caches.", tags=dict(digest=action_digest)) 

53 

54 def get_action_result(self, action_digest: Digest) -> ActionResult: 

55 first_result = _try_get_action_result(self._first, action_digest) 

56 second_result = _try_get_action_result(self._second, action_digest) 

57 

58 if first_result is None and second_result is None: 

59 raise NotFoundError(f"Action result not found: {action_digest.hash}/{action_digest.size_bytes}") 

60 

61 if first_result is None: 

62 publish_counter_metric(METRIC.ACTION_CACHE.MIRRORED_MISMATCH_COUNT, 1) 

63 self._first.update_action_result(action_digest, second_result) # type: ignore[arg-type] 

64 return second_result # type: ignore[return-value] 

65 

66 if second_result is None: 

67 publish_counter_metric(METRIC.ACTION_CACHE.MIRRORED_MISMATCH_COUNT, 1) 

68 self._second.update_action_result(action_digest, first_result) 

69 return first_result 

70 

71 if first_result != second_result: 

72 publish_counter_metric(METRIC.ACTION_CACHE.MIRRORED_MISMATCH_COUNT, 1) 

73 LOGGER.warning( 

74 "Different action results in mirrored caches.", 

75 tags=dict(first_result=first_result, second_result=second_result), 

76 ) 

77 return first_result 

78 

79 publish_counter_metric(METRIC.ACTION_CACHE.MIRRORED_MATCH_COUNT, 1) 

80 return first_result 

81 

82 

83def _try_get_action_result(cache: ActionCacheABC, digest: Digest) -> ActionResult | None: 

84 try: 

85 return cache.get_action_result(digest) 

86 except NotFoundError: 

87 return None