Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/actioncache/caches/mirrored_cache.py: 87.23%

47 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

1# Copyright (C) 2024 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15from typing import Optional 

16 

17from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ActionResult, Digest 

18from buildgrid.server.actioncache.caches.action_cache_abc import ActionCacheABC 

19from buildgrid.server.exceptions import NotFoundError 

20from buildgrid.server.logging import buildgrid_logger 

21from buildgrid.server.metrics_names import METRIC 

22from buildgrid.server.metrics_utils import publish_counter_metric 

23 

24LOGGER = buildgrid_logger(__name__) 

25 

26 

27class MirroredCache(ActionCacheABC): 

28 """Synchronize two mirrored action-caches to the same state""" 

29 

30 def __init__( 

31 self, 

32 first: ActionCacheABC, 

33 second: ActionCacheABC, 

34 ): 

35 # Don't pass a storage object to super class 

36 # as blob existence check will be performed by each object. 

37 # On the other hand, the storages of these two caches should point to the same one 

38 # or also mirrored with each other. 

39 super().__init__(allow_updates=True, storage=None) 

40 self._first = first 

41 self._second = second 

42 

43 def start(self) -> None: 

44 self._first.start() 

45 self._second.start() 

46 

47 def stop(self) -> None: 

48 self._second.stop() 

49 self._first.stop() 

50 

51 def update_action_result(self, action_digest: Digest, action_result: ActionResult) -> None: 

52 self._first.update_action_result(action_digest, action_result) 

53 self._second.update_action_result(action_digest, action_result) 

54 LOGGER.info("Finished dual write to both action-caches.", tags=dict(digest=action_digest)) 

55 

56 def get_action_result(self, action_digest: Digest) -> ActionResult: 

57 first_result = _try_get_action_result(self._first, action_digest) 

58 second_result = _try_get_action_result(self._second, action_digest) 

59 

60 if first_result is None and second_result is None: 

61 raise NotFoundError(f"Action result not found: {action_digest.hash}/{action_digest.size_bytes}") 

62 

63 if first_result is None: 

64 publish_counter_metric(METRIC.ACTION_CACHE.MIRRORED_MISMATCH_COUNT, 1) 

65 self._first.update_action_result(action_digest, second_result) # type: ignore[arg-type] 

66 return second_result # type: ignore[return-value] 

67 

68 if second_result is None: 

69 publish_counter_metric(METRIC.ACTION_CACHE.MIRRORED_MISMATCH_COUNT, 1) 

70 self._second.update_action_result(action_digest, first_result) 

71 return first_result 

72 

73 if first_result != second_result: 

74 publish_counter_metric(METRIC.ACTION_CACHE.MIRRORED_MISMATCH_COUNT, 1) 

75 LOGGER.warning( 

76 "Different action results in mirrored caches.", 

77 tags=dict(first_result=first_result, second_result=second_result), 

78 ) 

79 return first_result 

80 

81 publish_counter_metric(METRIC.ACTION_CACHE.MIRRORED_MATCH_COUNT, 1) 

82 return first_result 

83 

84 

85def _try_get_action_result(cache: ActionCacheABC, digest: Digest) -> Optional[ActionResult]: 

86 try: 

87 return cache.get_action_result(digest) 

88 except NotFoundError: 

89 return None