Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/actioncache/caches/mirrored_cache.py: 89.36%

47 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-06-11 15:37 +0000

1# Copyright (C) 2024 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15import logging 

16from typing import Optional 

17 

18from google.protobuf import text_format 

19 

20from buildgrid._exceptions import NotFoundError 

21from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ActionResult, Digest 

22from buildgrid.server.actioncache.caches.action_cache_abc import ActionCacheABC 

23from buildgrid.server.metrics_names import AC_MIRRORED_MATCH_COUNT_METRIC_NAME, AC_MIRRORED_MISMATCH_COUNT_METRIC_NAME 

24from buildgrid.server.metrics_utils import publish_counter_metric 

25 

26LOGGER = logging.getLogger(__name__) 

27 

28 

class MirroredCache(ActionCacheABC):
    """Action cache that keeps two underlying action caches mirrored.

    Writes are sent to both caches. Reads query both caches; when only one of
    them holds the result, that result is copied into the other cache before
    being returned. A match or mismatch counter is published for every read
    that finds a result.
    """

    def __init__(
        self,
        first: ActionCacheABC,
        second: ActionCacheABC,
    ):
        """Wrap two action caches that should hold the same contents.

        Args:
            first: The cache whose result is returned when both caches hold
                a result for the requested digest.
            second: The cache mirrored against ``first``.
        """
        # Don't pass a storage object to super class
        # as blob existence check will be performed by each object.
        # On the other hand, the storages of these two caches should point to the same one
        # or also mirrored with each other.
        super().__init__(allow_updates=True, storage=None)
        self._first = first
        self._second = second

    def start(self) -> None:
        """Start the first cache, then the second."""
        self._first.start()
        self._second.start()

    def stop(self) -> None:
        """Stop both caches, in the reverse of their start order.

        The first cache is stopped even when stopping the second one raises;
        the exception from the second cache still propagates to the caller.
        """
        try:
            self._second.stop()
        finally:
            self._first.stop()

    def update_action_result(self, action_digest: Digest, action_result: ActionResult) -> None:
        """Write ``action_result`` for ``action_digest`` to both caches.

        The first cache is written before the second; an exception raised by
        either write propagates to the caller.
        """
        self._first.update_action_result(action_digest, action_result)
        self._second.update_action_result(action_digest, action_result)
        LOGGER.info(f"Finished dual write to both action-caches: {text_format.MessageToString(action_digest)}")

    def get_action_result(self, action_digest: Digest) -> ActionResult:
        """Return the result stored for ``action_digest``.

        Both caches are always queried. If exactly one of them has the result,
        it is written to the other cache and returned. If both have a result,
        the one from the first cache is returned; differing results are only
        logged and counted, not reconciled.

        Raises:
            NotFoundError: If neither cache holds a result for the digest.
        """
        first_result = self._try_get_action_result(self._first, action_digest)
        second_result = self._try_get_action_result(self._second, action_digest)

        if first_result is None:
            if second_result is None:
                raise NotFoundError(f"Action result not found: {action_digest.hash}/{action_digest.size_bytes}")
            # Only the second cache has the entry: backfill the first one.
            self._publish_mirror_metric(AC_MIRRORED_MISMATCH_COUNT_METRIC_NAME)
            self._first.update_action_result(action_digest, second_result)
            return second_result

        if second_result is None:
            # Only the first cache has the entry: backfill the second one.
            self._publish_mirror_metric(AC_MIRRORED_MISMATCH_COUNT_METRIC_NAME)
            self._second.update_action_result(action_digest, first_result)
            return first_result

        if first_result != second_result:
            self._publish_mirror_metric(AC_MIRRORED_MISMATCH_COUNT_METRIC_NAME)
            LOGGER.warning(
                f"Different action results in mirrored caches, first: {text_format.MessageToString(first_result)}, "
                f"second: {text_format.MessageToString(second_result)}"
            )
        else:
            self._publish_mirror_metric(AC_MIRRORED_MATCH_COUNT_METRIC_NAME)
        return first_result

    def _publish_mirror_metric(self, metric_name: str) -> None:
        """Publish a count of 1 for ``metric_name``, tagged with the instance name."""
        # NOTE(review): `_instance_name` is not set in this class; presumably
        # it is provided by ActionCacheABC -- confirm.
        publish_counter_metric(metric_name, 1, {"instance_name": self._instance_name or ""})

    @staticmethod
    def _try_get_action_result(cache: ActionCacheABC, digest: Digest) -> Optional[ActionResult]:
        """Look up ``digest`` in ``cache``, returning ``None`` instead of raising ``NotFoundError``."""
        try:
            return cache.get_action_result(digest)
        except NotFoundError:
            return None