Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17Reference Cache 

18================== 

19 

20Implements an in-memory reference cache. 

21 

22For a given key, it 

23""" 

24 

25import collections 

26import logging 

27 

28from buildgrid._exceptions import NotFoundError 

29from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 

30 

31 

32class ReferenceCache: 

33 

34 def __init__(self, storage, max_cached_refs, allow_updates=True): 

35 """ Initialises a new ReferenceCache instance. 

36 

37 Args: 

38 storage (StorageABC): storage backend instance to be used. 

39 max_cached_refs (int): maximum number of entries to be stored. 

40 allow_updates (bool): allow the client to write to storage 

41 """ 

42 self.__logger = logging.getLogger(__name__) 

43 

44 self._instance_name = None 

45 

46 self.__storage = storage 

47 

48 self._allow_updates = allow_updates 

49 self._max_cached_refs = max_cached_refs 

50 self._digest_map = collections.OrderedDict() 

51 

52 # --- Public API --- 

53 

54 @property 

55 def instance_name(self): 

56 return self._instance_name 

57 

58 def register_instance_with_server(self, instance_name, server): 

59 """Names and registers the refs instance with a given server.""" 

60 if self._instance_name is None: 

61 server.add_reference_storage_instance(self, instance_name) 

62 

63 self._instance_name = instance_name 

64 

65 else: 

66 raise AssertionError("Instance already registered") 

67 

68 @property 

69 def allow_updates(self): 

70 return self._allow_updates 

71 

72 def get_digest_reference(self, key): 

73 """Retrieves the cached Digest for the given key. 

74 

75 Args: 

76 key: key for Digest to query. 

77 

78 Returns: 

79 The cached Digest matching the given key or raises 

80 NotFoundError. 

81 """ 

82 if key in self._digest_map: 

83 reference_result = self.__storage.get_message(self._digest_map[key], 

84 remote_execution_pb2.Digest) 

85 

86 if reference_result is not None: 

87 return reference_result 

88 

89 del self._digest_map[key] 

90 

91 raise NotFoundError(f"Key not found: {key}") 

92 

93 def get_action_reference(self, key): 

94 """Retrieves the cached ActionResult for the given Action digest. 

95 

96 Args: 

97 key: key for ActionResult to query. 

98 

99 Returns: 

100 The cached ActionResult matching the given key or raises 

101 NotFoundError. 

102 """ 

103 if key in self._digest_map: 

104 reference_result = self.__storage.get_message(self._digest_map[key], 

105 remote_execution_pb2.ActionResult) 

106 

107 if reference_result is not None: 

108 if self._action_result_blobs_still_exist(reference_result): 

109 self._digest_map.move_to_end(key) 

110 return reference_result 

111 

112 del self._digest_map[key] 

113 

114 raise NotFoundError(f"Key not found: {key}") 

115 

116 def update_reference(self, key, result): 

117 """Stores the result in cache for the given key. 

118 

119 If the cache size limit has been reached, the oldest cache entries will 

120 be dropped before insertion so that the cache size never exceeds the 

121 maximum numbers of entries allowed. 

122 

123 Args: 

124 key: key to store result. 

125 result (Digest): result digest to store. 

126 """ 

127 if not self._allow_updates: 

128 raise NotImplementedError("Updating cache not allowed") 

129 

130 if self._max_cached_refs == 0: 

131 return 

132 

133 while len(self._digest_map) >= self._max_cached_refs: 

134 self._digest_map.popitem(last=False) 

135 

136 result_digest = self.__storage.put_message(result) 

137 self._digest_map[key] = result_digest 

138 

139 # --- Private API --- 

140 

141 def _action_result_blobs_still_exist(self, action_result): 

142 """Checks CAS for ActionResult output blobs existance. 

143 

144 Args: 

145 action_result (ActionResult): ActionResult to search referenced 

146 output blobs for. 

147 

148 Returns: 

149 True if all referenced blobs are present in CAS, False otherwise. 

150 """ 

151 blobs_needed = [] 

152 

153 for output_file in action_result.output_files: 

154 blobs_needed.append(output_file.digest) 

155 

156 for output_directory in action_result.output_directories: 

157 blobs_needed.append(output_directory.tree_digest) 

158 tree = self.__storage.get_message(output_directory.tree_digest, 

159 remote_execution_pb2.Tree) 

160 if tree is None: 

161 return False 

162 

163 for file_node in tree.root.files: 

164 blobs_needed.append(file_node.digest) 

165 

166 for child in tree.children: 

167 for file_node in child.files: 

168 blobs_needed.append(file_node.digest) 

169 

170 if action_result.stdout_digest.hash and not action_result.stdout_raw: 

171 blobs_needed.append(action_result.stdout_digest) 

172 

173 if action_result.stderr_digest.hash and not action_result.stderr_raw: 

174 blobs_needed.append(action_result.stderr_digest) 

175 

176 missing = self.__storage.missing_blobs(blobs_needed) 

177 return len(missing) == 0