Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/referencestorage/storage.py: 68.12%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

69 statements  

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17Reference Cache 

18================== 

19 

20Implements an in-memory reference cache. 

21 

22For a given key, it keeps the digest of the associated result, which is itself written to the storage backend. 

23""" 

24 

25import collections 

26import logging 

27 

28from buildgrid._exceptions import NotFoundError 

29from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 

30 

31 

class ReferenceCache:
    """Bounded in-memory map from reference keys to digests held in storage.

    The cache itself only remembers ``key -> digest``; the referenced
    messages are written to and read back from the storage backend given at
    construction time. Entries are kept in an ``OrderedDict`` so that the
    entry at the front is the first one evicted when the limit is reached.
    """

    def __init__(self, storage, max_cached_refs, allow_updates=True):
        """ Initialises a new ReferenceCache instance.

        Args:
            storage (StorageABC): storage backend instance to be used.
            max_cached_refs (int): maximum number of entries to be stored.
                A value of 0 disables storing altogether.
            allow_updates (bool): allow the client to write to storage
        """
        self.__logger = logging.getLogger(__name__)

        # Set once by register_instance_with_server().
        self._instance_name = None

        self.__storage = storage

        self._allow_updates = allow_updates
        self._max_cached_refs = max_cached_refs
        # key -> digest of the stored message; front of the dict is evicted first.
        self._digest_map = collections.OrderedDict()

    # --- Public API ---

    @property
    def instance_name(self):
        """Name this instance was registered under, or None if unregistered."""
        return self._instance_name

    def setup_grpc(self):
        """Forwards gRPC set-up to the storage backend."""
        self.__storage.setup_grpc()

    def register_instance_with_server(self, instance_name, server):
        """Names and registers the refs instance with a given server.

        Args:
            instance_name: name to register this instance under.
            server: object exposing ``add_reference_storage_instance()``.

        Raises:
            AssertionError: if this instance has already been registered.
        """
        if self._instance_name is not None:
            raise AssertionError("Instance already registered")

        server.add_reference_storage_instance(self, instance_name)
        self._instance_name = instance_name

    @property
    def allow_updates(self):
        """Whether update_reference() is permitted on this instance."""
        return self._allow_updates

    def get_digest_reference(self, key):
        """Retrieves the cached Digest for the given key.

        If the key is known but the referenced message can no longer be read
        from storage, the stale key is dropped from the cache.

        Args:
            key: key for Digest to query.

        Returns:
            The cached Digest matching the given key.

        Raises:
            NotFoundError: if the key is unknown or its message is gone.
        """
        if key in self._digest_map:
            reference_result = self.__storage.get_message(self._digest_map[key],
                                                          remote_execution_pb2.Digest)

            if reference_result is not None:
                # NOTE(review): unlike get_action_reference(), a hit here does
                # not refresh the entry's position in the eviction order.
                # Confirm whether that asymmetry is intended.
                return reference_result

            # The backing message has disappeared: forget the dangling key.
            del self._digest_map[key]

        raise NotFoundError(f"Key not found: {key}")

    def get_action_reference(self, key):
        """Retrieves the cached ActionResult for the given Action digest.

        A hit moves the entry to the back of the eviction order. The entry is
        dropped instead if the ActionResult itself, or any blob it references,
        is no longer available from storage.

        Args:
            key: key for ActionResult to query.

        Returns:
            The cached ActionResult matching the given key.

        Raises:
            NotFoundError: if the key is unknown or the result is incomplete.
        """
        if key in self._digest_map:
            reference_result = self.__storage.get_message(self._digest_map[key],
                                                          remote_execution_pb2.ActionResult)

            if reference_result is not None:
                if self._action_result_blobs_still_exist(reference_result):
                    self._digest_map.move_to_end(key)
                    return reference_result

            # Either the result or some of its outputs are gone.
            del self._digest_map[key]

        raise NotFoundError(f"Key not found: {key}")

    def update_reference(self, key, result):
        """Stores the result in cache for the given key.

        If the cache size limit has been reached, the oldest cache entries will
        be dropped before insertion so that the cache size never exceeds the
        maximum numbers of entries allowed. Overwriting a key that is already
        cached never evicts another entry, since the cache does not grow; the
        overwritten key becomes the most recent entry.

        Args:
            key: key to store result.
            result: message to store; it is written to the storage backend
                and the digest returned by the backend is cached under key.

        Raises:
            NotImplementedError: if updates are not allowed on this instance.
        """
        if not self._allow_updates:
            raise NotImplementedError("Updating cache not allowed")

        if self._max_cached_refs == 0:
            return

        # Write to storage first so that a failing write cannot cost the
        # cache an entry that was evicted to make room for it.
        result_digest = self.__storage.put_message(result)

        if key in self._digest_map:
            # Replacing an existing entry does not change the cache size, so
            # nothing needs to be evicted; just refresh its position.
            self._digest_map[key] = result_digest
            self._digest_map.move_to_end(key)
            return

        while len(self._digest_map) >= self._max_cached_refs:
            self._digest_map.popitem(last=False)

        self._digest_map[key] = result_digest

    # --- Private API ---

    def _action_result_blobs_still_exist(self, action_result):
        """Checks CAS for ActionResult output blobs existence.

        Collects the digests of the output files, of every output directory
        tree (plus the files listed in the tree's root and children), and of
        stdout/stderr when they are referenced by digest rather than inlined,
        then asks the storage backend which of them are missing.

        Args:
            action_result (ActionResult): ActionResult to search referenced
                output blobs for.

        Returns:
            True if all referenced blobs are present in CAS, False otherwise.
        """
        blobs_needed = [output_file.digest
                        for output_file in action_result.output_files]

        for output_directory in action_result.output_directories:
            blobs_needed.append(output_directory.tree_digest)
            tree = self.__storage.get_message(output_directory.tree_digest,
                                              remote_execution_pb2.Tree)
            if tree is None:
                # The tree itself is gone, no need to look any further.
                return False

            blobs_needed.extend(file_node.digest for file_node in tree.root.files)

            for child in tree.children:
                blobs_needed.extend(file_node.digest for file_node in child.files)

        # stdout/stderr only need a blob when they are not carried inline.
        if action_result.stdout_digest.hash and not action_result.stdout_raw:
            blobs_needed.append(action_result.stdout_digest)

        if action_result.stderr_digest.hash and not action_result.stderr_raw:
            blobs_needed.append(action_result.stderr_digest)

        missing = self.__storage.missing_blobs(blobs_needed)
        return len(missing) == 0

180 return len(missing) == 0