Coverage report for buildgrid/server/actioncache/caches/action_cache_abc.py: 94.19% of 86 statements covered.

Generated by coverage.py v7.4.1 at 2024-06-11 15:37 +0000.

1# Copyright (C) 2020 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16import logging 

17from abc import ABC, abstractmethod 

18from typing import Any, Optional, Set, TypeVar 

19 

20from buildgrid._exceptions import NotFoundError 

21from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import DESCRIPTOR as RE_DESCRIPTOR 

22from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ( 

23 Action, 

24 ActionResult, 

25 Digest, 

26 DigestFunction, 

27 Tree, 

28) 

29from buildgrid.server.cas.instance import EMPTY_BLOB_DIGEST 

30from buildgrid.server.cas.storage.storage_abc import StorageABC 

31from buildgrid.server.servicer import Instance 

32from buildgrid.utils import HashableDigest, get_hash_type 

33 

# Module-level logger, named after this module per the logging convention.
LOGGER = logging.getLogger(__name__)


# Type variable bound to ActionCacheABC so that __enter__ can be typed to
# return the concrete subclass rather than the base class.
T = TypeVar("T", bound="ActionCacheABC")

38 

39 

class ActionCacheABC(Instance, ABC):
    """Abstract base class for ActionCache implementations.

    Subclasses must implement ``get_action_result`` and
    ``update_action_result``. A ``StorageABC`` backend may optionally be
    attached; when present it is used to verify that blobs referenced by a
    cached ``ActionResult`` still exist in CAS before the result is served.
    """

    # Fully-qualified gRPC service name for the REAPI ActionCache service.
    SERVICE_NAME = RE_DESCRIPTOR.services_by_name["ActionCache"].full_name

    def __init__(self, allow_updates: bool = False, storage: Optional[StorageABC] = None):
        """Initialize the cache.

        Args:
            allow_updates: Whether clients are allowed to write results
                into this cache.
            storage: Optional CAS storage backend used to validate that
                blobs referenced by cached results are still present.
        """
        self._allow_updates = allow_updates
        self._storage = storage

    @property
    def allow_updates(self) -> bool:
        """Whether this cache accepts ``update_action_result`` calls."""
        return self._allow_updates

    def hash_type(self) -> "DigestFunction.Value.ValueType":
        """Return the digest function used by this deployment."""
        return get_hash_type()

    def __enter__(self: T) -> T:
        self.start()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        self.stop()

    def start(self) -> None:
        """Start the attached storage backend, if any."""
        if self._storage is not None:
            self._storage.start()

    def stop(self) -> None:
        """Stop the attached storage backend, if any, and log shutdown."""
        if self._storage is not None:
            self._storage.stop()
        LOGGER.info("Stopped ActionCache")

    # NOTE: This method exists for compatibility reasons. Ideally it should
    # never be used with an up-to-date configuration.
    def set_instance_name(self, instance_name: str) -> None:
        LOGGER.warning(
            "Cache instances should be defined in a 'caches' list and passed "
            "to an ActionCache service, rather than defined in the 'services' "
            "list themselves."
        )
        super().set_instance_name(instance_name)

    @abstractmethod
    def get_action_result(self, action_digest: Digest) -> ActionResult:
        """Return the cached ``ActionResult`` for ``action_digest``.

        Implementations should raise ``NotFoundError`` when no result is
        cached for the digest.
        """
        raise NotImplementedError()

    @abstractmethod
    def update_action_result(self, action_digest: Digest, action_result: ActionResult) -> None:
        """Store ``action_result`` in the cache under ``action_digest``."""
        raise NotImplementedError()

    def _referenced_blobs_still_exist(self, action_digest: Digest, action_result: ActionResult) -> bool:
        """Checks CAS for Action and ActionResult output blobs existence.

        Args:
            action_digest (Digest): Digest for the Action whose top level digests
                will be searched for.
            action_result (ActionResult): ActionResult to search referenced
                output blobs for.

        Returns:
            True if all referenced blobs are present in CAS, False otherwise.
        """
        # Without a storage backend there is nothing to validate against.
        if not self._storage:
            return True

        # Every output file's digest is required.
        blobs_needed: Set[HashableDigest] = {
            HashableDigest(output_file.digest.hash, output_file.digest.size_bytes)
            for output_file in action_result.output_files
        }

        for output_directory in action_result.output_directories:
            if output_directory.HasField("tree_digest"):
                blobs_needed.add(
                    HashableDigest(output_directory.tree_digest.hash, output_directory.tree_digest.size_bytes)
                )
                tree = self._storage.get_message(output_directory.tree_digest, Tree)
                if tree is None:
                    return False

                for file_node in tree.root.files:
                    blobs_needed.add(HashableDigest(file_node.digest.hash, file_node.digest.size_bytes))

                for child in tree.children:
                    for file_node in child.files:
                        blobs_needed.add(HashableDigest(file_node.digest.hash, file_node.digest.size_bytes))
            elif output_directory.HasField("root_directory_digest"):
                blobs_needed.add(
                    HashableDigest(
                        output_directory.root_directory_digest.hash,
                        output_directory.root_directory_digest.size_bytes,
                    )
                )
                try:
                    # Walk the whole directory tree; raise_on_missing_subdir
                    # turns a missing subdirectory into NotFoundError below.
                    for directory in self._storage.get_tree(
                        output_directory.root_directory_digest, raise_on_missing_subdir=True
                    ):
                        blobs_needed.update(
                            HashableDigest(file_node.digest.hash, file_node.digest.size_bytes)
                            for file_node in directory.files
                        )
                        blobs_needed.update(
                            HashableDigest(dir_node.digest.hash, dir_node.digest.size_bytes)
                            for dir_node in directory.directories
                        )
                except NotFoundError:
                    return False

        # stdout/stderr digests only need to exist in CAS when the raw bytes
        # were not inlined into the ActionResult.
        if action_result.stdout_digest.hash and not action_result.stdout_raw:
            blobs_needed.add(HashableDigest(action_result.stdout_digest.hash, action_result.stdout_digest.size_bytes))

        if action_result.stderr_digest.hash and not action_result.stderr_raw:
            blobs_needed.add(HashableDigest(action_result.stderr_digest.hash, action_result.stderr_digest.size_bytes))

        # Additionally refresh the TTL of the ActionDigest and the top level digests
        # contained within. This will keep the Action around for use cases like
        # bgd-browser where you want to look at both the Action and its
        # ActionResult, but with minimal overhead. These blobs are checked (and
        # hence touched) below, but are not *required* to be present.
        action = self._storage.get_message(action_digest, Action)
        action_blobs: Set[HashableDigest] = set()
        if action:
            action_blobs.add(HashableDigest(action_digest.hash, action_digest.size_bytes))
            action_blobs.add(HashableDigest(action.command_digest.hash, action.command_digest.size_bytes))
            action_blobs.add(HashableDigest(action.input_root_digest.hash, action.input_root_digest.size_bytes))

        blobs_to_check: Set[HashableDigest] = blobs_needed | action_blobs
        # No need to check the underlying storage for the empty blob as it is a
        # special case blob which always exists. It is possible that the empty
        # blob is not actually present in the underlying storage.
        blobs_to_check.discard(HashableDigest(EMPTY_BLOB_DIGEST.hash, EMPTY_BLOB_DIGEST.size_bytes))
        missing = self._storage.missing_blobs([blob.to_digest() for blob in blobs_to_check])
        # Only blobs the result actually references are required; missing
        # Action blobs just mean there was nothing to TTL-refresh.
        required_missing = [blob for blob in missing if HashableDigest(blob.hash, blob.size_bytes) in blobs_needed]
        if required_missing:
            # Lazy %-style args avoid formatting cost when DEBUG is disabled;
            # rendered message is identical to the previous f-string.
            LOGGER.debug("Missing %s/%s", len(required_missing), len(blobs_needed))
            return False
        return True