Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/actioncache/caches/action_cache_abc.py: 94.12%

85 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

# Copyright (C) 2020 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

14 

15 

16from abc import ABC, abstractmethod 

17from typing import Any, Optional, Set, TypeVar 

18 

19from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action, ActionResult, Digest, Tree 

20from buildgrid.server.cas.instance import EMPTY_BLOB_DIGEST 

21from buildgrid.server.cas.storage.storage_abc import StorageABC 

22from buildgrid.server.exceptions import NotFoundError 

23from buildgrid.server.logging import buildgrid_logger 

24from buildgrid.server.metrics_names import METRIC 

25from buildgrid.server.metrics_utils import publish_counter_metric 

26from buildgrid.server.utils.digests import HashableDigest 

27 

# Module-level logger, named after this module per the project convention.
LOGGER = buildgrid_logger(__name__)


# Type variable bound to ActionCacheABC so that __enter__ can return the
# concrete subclass type rather than the base class.
T = TypeVar("T", bound="ActionCacheABC")

32 

33 

class ActionCacheABC(ABC):
    """Abstract base class for ActionCache implementations.

    Provides shared lifecycle handling for an optional CAS storage backend
    and a helper to validate that all blobs referenced by a cached
    ``ActionResult`` still exist in CAS before the result is served.
    Subclasses must implement :meth:`get_action_result` and
    :meth:`update_action_result`.
    """

    def __init__(self, allow_updates: bool = False, storage: Optional[StorageABC] = None):
        # Whether clients may write results into this cache (UpdateActionResult).
        self._allow_updates = allow_updates
        # Optional CAS storage used to validate referenced blobs; when None,
        # blob validation is skipped entirely.
        self._storage = storage

    @property
    def allow_updates(self) -> bool:
        """Whether this cache accepts updates to stored action results."""
        return self._allow_updates

    def __enter__(self: T) -> T:
        self.start()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        self.stop()

    def start(self) -> None:
        """Start the underlying storage backend, if one is configured."""
        if self._storage is not None:
            self._storage.start()

    def stop(self) -> None:
        """Stop the underlying storage backend, if one is configured."""
        if self._storage is not None:
            self._storage.stop()
        LOGGER.info("Stopped ActionCache.")

    @abstractmethod
    def get_action_result(self, action_digest: Digest) -> ActionResult:
        """Return the cached ``ActionResult`` for ``action_digest``.

        Implementations raise ``NotFoundError`` (or a subclass) when no
        result is cached for the digest.
        """
        raise NotImplementedError()

    @abstractmethod
    def update_action_result(self, action_digest: Digest, action_result: ActionResult) -> None:
        """Store ``action_result`` in the cache keyed by ``action_digest``."""
        raise NotImplementedError()

    def referenced_blobs_still_exist(self, action_digest: Digest, action_result: ActionResult) -> bool:
        """Validate a cached result's blobs, publishing a metric on failure.

        Thin wrapper around :meth:`_referenced_blobs_still_exist` which
        increments the invalid-cache counter whenever validation fails.
        """
        if self._referenced_blobs_still_exist(action_digest, action_result):
            return True

        publish_counter_metric(METRIC.ACTION_CACHE.INVALID_CACHE_COUNT, 1)
        return False

    def _referenced_blobs_still_exist(self, action_digest: Digest, action_result: ActionResult) -> bool:
        """Checks CAS for Action and ActionResult output blobs existence.

        Args:
            action_digest (Digest): Digest for the Action whose top level digests
                will be searched for.
            action_result (ActionResult): ActionResult to search referenced
                output blobs for.

        Returns:
            True if all referenced blobs are present in CAS, False otherwise.
        """
        # Without a storage backend there is nothing to validate against.
        if not self._storage:
            return True

        blobs_needed: Set[HashableDigest] = set()

        for output_file in action_result.output_files:
            blobs_needed.add(HashableDigest(output_file.digest.hash, output_file.digest.size_bytes))

        for output_directory in action_result.output_directories:
            if not self._collect_output_directory_blobs(output_directory, blobs_needed):
                return False

        # stdout/stderr only need their blobs when the raw bytes are not
        # inlined in the ActionResult itself.
        if action_result.stdout_digest.hash and not action_result.stdout_raw:
            blobs_needed.add(HashableDigest(action_result.stdout_digest.hash, action_result.stdout_digest.size_bytes))

        if action_result.stderr_digest.hash and not action_result.stderr_raw:
            blobs_needed.add(HashableDigest(action_result.stderr_digest.hash, action_result.stderr_digest.size_bytes))

        # Additionally refresh the TTL of the ActionDigest and the top level digests
        # contained within. This will keep the Action around for use cases like bgd-browser
        # where you want to look at both the Action and its ActionResult, but with minimal
        # overhead.
        action_blobs = self._action_blobs(action_digest)

        blobs_to_check: Set[HashableDigest] = blobs_needed | action_blobs
        # No need to check the underlying storage for the empty blob as it is a special
        # case blob which always exists, and may not actually be present in the
        # underlying storage.
        blobs_to_check.discard(HashableDigest(EMPTY_BLOB_DIGEST.hash, EMPTY_BLOB_DIGEST.size_bytes))
        missing = self._storage.missing_blobs([blob.to_digest() for blob in blobs_to_check])
        # Only blobs_needed must exist; the Action blobs are looked up purely as a
        # best-effort TTL refresh and their absence does not invalidate the result.
        required_missing = [blob for blob in missing if HashableDigest(blob.hash, blob.size_bytes) in blobs_needed]
        if len(required_missing) != 0:
            LOGGER.debug(
                "Missing blobs.", tags=dict(required_missing=len(required_missing), blobs_needed=len(blobs_needed))
            )
            return False
        return True

    def _collect_output_directory_blobs(self, output_directory: Any, blobs_needed: Set[HashableDigest]) -> bool:
        """Add the blob digests referenced by one OutputDirectory to ``blobs_needed``.

        Handles both the ``tree_digest`` and ``root_directory_digest`` encodings.

        Returns:
            False if a referenced Tree message or subdirectory is already
            missing from CAS, True otherwise.
        """
        assert self._storage is not None  # only called after the storage check above
        if output_directory.HasField("tree_digest"):
            blobs_needed.add(
                HashableDigest(output_directory.tree_digest.hash, output_directory.tree_digest.size_bytes)
            )
            tree = self._storage.get_message(output_directory.tree_digest, Tree)
            if tree is None:
                return False

            # A Tree inlines every directory, so only the file digests it
            # references need to be checked individually.
            for file_node in tree.root.files:
                blobs_needed.add(HashableDigest(file_node.digest.hash, file_node.digest.size_bytes))

            for child in tree.children:
                for file_node in child.files:
                    blobs_needed.add(HashableDigest(file_node.digest.hash, file_node.digest.size_bytes))
        elif output_directory.HasField("root_directory_digest"):
            blobs_needed.add(
                HashableDigest(
                    output_directory.root_directory_digest.hash, output_directory.root_directory_digest.size_bytes
                )
            )
            try:
                # Walk the directory tree, requiring every subdirectory to be
                # fetchable; a missing one invalidates the cached result.
                for directory in self._storage.get_tree(
                    output_directory.root_directory_digest, raise_on_missing_subdir=True
                ):
                    blobs_needed.update(
                        HashableDigest(file_node.digest.hash, file_node.digest.size_bytes)
                        for file_node in directory.files
                    )
                    blobs_needed.update(
                        HashableDigest(dir_node.digest.hash, dir_node.digest.size_bytes)
                        for dir_node in directory.directories
                    )
            except NotFoundError:
                return False
        return True

    def _action_blobs(self, action_digest: Digest) -> Set[HashableDigest]:
        """Best-effort digests for the Action message and its top-level references.

        These are included in the existence check purely to refresh their
        TTLs; they are not required for the cached result to remain valid.
        """
        assert self._storage is not None  # only called after the storage check above
        action = self._storage.get_message(action_digest, Action)
        action_blobs: Set[HashableDigest] = set()
        if action:
            action_blobs.add(HashableDigest(action_digest.hash, action_digest.size_bytes))
            action_blobs.add(HashableDigest(action.command_digest.hash, action.command_digest.size_bytes))
            action_blobs.add(HashableDigest(action.input_root_digest.hash, action.input_root_digest.size_bytes))
        return action_blobs