Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/with_cache.py: 87.59%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

137 statements  

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

"""
WithCacheStorage
==================

A storage provider that first checks a cache, then tries a slower
fallback provider.

Reads: `get_blob` and `bulk_read_blobs` consult the cache first and only
go to the fallback for blobs the cache does not return. Only `get_blob`
populates the cache with blobs it had to fetch from the fallback;
`bulk_read_blobs` does not write fallback results back into the cache.

Existence checks: to ensure clients can reliably store blobs in CAS,
`missing_blobs` always queries the fallback. `has_blob` also queries the
fallback, except when fallback writes are deferred: then a blob already
present in the cache is reported as present, since its fallback commit
may still be pending on the background writer pool.

Writes go to both the cache and the fallback. With deferred fallback
writes enabled, the fallback commit runs on a background thread pool.
"""

27 

28from concurrent.futures import ThreadPoolExecutor 

29import io 

30import logging 

31from typing import List 

32 

33from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest 

34from buildgrid.server.metrics_names import ( 

35 CAS_CACHE_BULK_READ_HIT_COUNT_NAME, 

36 CAS_CACHE_BULK_READ_HIT_PERCENTAGE_NAME, 

37 CAS_CACHE_BULK_READ_MISS_COUNT_NAME, 

38 CAS_CACHE_GET_BLOB_HIT_COUNT_NAME, 

39 CAS_CACHE_GET_BLOB_MISS_COUNT_NAME 

40) 

41from buildgrid.server.metrics_utils import Distribution 

42from .storage_abc import StorageABC 

43 

44 

class _OutputTee(io.BufferedIOBase):
    """A file-like object that writes data to two file-like objects.

    Objects that are not already ``io.BufferedIOBase`` instances are wrapped
    in an ``io.BufferedWriter``. The unwrapped objects are kept in
    ``_original_a`` / ``_original_b`` so callers can hand them back to the
    storage providers that created them once the tee has been flushed.

    The files should be in blocking mode; non-blocking mode is unsupported.
    """

    def __init__(self, file_a, file_b):
        super().__init__()

        self._original_a = file_a
        if isinstance(file_a, io.BufferedIOBase):
            self._a = file_a
        else:
            self._a = io.BufferedWriter(file_a)

        self._original_b = file_b
        if isinstance(file_b, io.BufferedIOBase):
            self._b = file_b
        else:
            self._b = io.BufferedWriter(file_b)

    def close(self):
        """Close the tee and both underlying files.

        Both files are closed even if flushing the tee or closing the other
        file raises; the first exception is still propagated.
        """
        try:
            # IOBase.close() flushes (both files) before marking us closed.
            super().close()
        finally:
            try:
                self._a.close()
            finally:
                self._b.close()

    def flush(self):
        """Push any buffered data through to both underlying files."""
        self._a.flush()
        self._b.flush()

    def readable(self):
        return False

    def seekable(self):
        return False

    def write(self, b):
        """Write ``b`` to both files.

        Returns:
            The byte count reported by the second file's ``write``.
        """
        self._a.write(b)
        return self._b.write(b)

    def writable(self):
        return True

87 

88 

class _CachingTee(io.RawIOBase):
    """A file-like object that wraps a 'fallback' file, and when it's
    read, writes the resulting data to a 'cache' storage provider.

    Every byte handed to the reader is also appended to a cache write
    session opened in ``__init__``. On ``close()`` any bytes the reader did
    not consume are drained from the fallback file into that session, the
    session is committed to the cache, and the fallback file is closed.

    Does not support non-blocking mode.
    """

    def __init__(self, fallback_file, digest, cache):
        super().__init__()

        self._file = fallback_file
        self._digest = digest
        self._cache = cache
        # Opened eagerly so every read can be mirrored into it.
        self._cache_session = cache.begin_write(digest)

    def close(self):
        """Finish populating the cache, commit it, and close the fallback file.

        Calling ``close()`` on an already-closed tee is a no-op.
        """
        # IOBase.close() must have no effect on a closed stream; without this
        # guard a second call would read the (already closed) fallback file
        # and commit to the cache again.
        if self.closed:
            return
        super().close()
        try:
            # Drain whatever the reader didn't consume so the cache always
            # receives the complete blob.
            self._cache_session.write(self._file.read())
            self._cache.commit_write(self._digest, self._cache_session)
        finally:
            # Release the fallback file even if caching failed.
            self._file.close()

    def readable(self):
        return True

    def seekable(self):
        return False

    def writable(self):
        return False

    def readall(self):
        """Read the rest of the fallback file, mirroring it into the cache."""
        data = self._file.read()
        self._cache_session.write(data)
        return data

    def readinto(self, b):
        """Fill ``b`` from the fallback file, mirroring the bytes read into the cache."""
        bytes_read = self._file.readinto(b)
        self._cache_session.write(b[:bytes_read])
        return bytes_read

128 

129 

class WithCacheStorage(StorageABC):
    """Storage provider that layers a ``cache`` provider over a ``fallback``.

    Reads try the cache first; writes go to both providers. When
    ``defer_fallback_writes`` is enabled, commits to the fallback run on a
    background thread pool, so ``commit_write`` returns once the cache commit
    has completed and the fallback commit has been submitted.
    """

    def __init__(self, cache, fallback, defer_fallback_writes=False,
                 fallback_writer_threads=20):
        """
        Args:
            cache: Storage provider consulted first on reads.
            fallback: Storage provider used on cache misses and for
                existence checks.
            defer_fallback_writes (bool): If True, ``commit_write`` commits
                to the fallback asynchronously on a thread pool.
            fallback_writer_threads (int): Maximum number of worker threads
                used for deferred fallback writes.
        """
        self.__logger = logging.getLogger(__name__)

        self._instance_name = None
        self._cache = cache
        self._fallback = fallback
        self._defer_fallback_writes = defer_fallback_writes
        self._executor = None
        if self._defer_fallback_writes:
            # pylint: disable=consider-using-with
            self._executor = ThreadPoolExecutor(
                max_workers=fallback_writer_threads,
                thread_name_prefix="WithCacheFallbackWriter"
            )

    def __del__(self):
        # If we have a fallback writer, wait until all work has finished to exit safely.
        # getattr() keeps the finalizer from raising AttributeError when
        # __init__ failed before the executor was set up.
        executor = getattr(self, "_executor", None)
        if executor is not None:
            self.__logger.warning(
                "Shutting down WithCacheStorage with deferred writes. Finishing up deferred writes."
            )
            executor.shutdown(wait=True)
            self.__logger.info(
                "WithCacheFallbackWriter finished deferred writes and pool was shut down."
            )

    def has_blob(self, digest):
        """Return whether ``digest`` is stored.

        With deferred fallback writes, a freshly committed blob may so far
        only be in the cache, so the cache is checked first in that mode.
        """
        if self._defer_fallback_writes and self._cache.has_blob(digest):
            return True
        return self._fallback.has_blob(digest)

    def get_blob(self, digest):
        """Return a readable file for ``digest``, or None if it is not stored.

        On a cache miss the fallback's file is wrapped so that reading (and
        closing) it also populates the cache.
        """
        cache_result = self._cache.get_blob(digest)
        if cache_result is not None:
            with Distribution(CAS_CACHE_GET_BLOB_HIT_COUNT_NAME) as metric_cache_hit:
                metric_cache_hit.count = 1
            return cache_result
        fallback_result = self._fallback.get_blob(digest)
        if fallback_result is None:
            return None

        with Distribution(CAS_CACHE_GET_BLOB_MISS_COUNT_NAME) as metric_cache_miss:
            metric_cache_miss.count = 1
        return _CachingTee(fallback_result, digest, self._cache)

    def delete_blob(self, digest):
        """Delete ``digest`` from the fallback, then from the cache."""
        self._fallback.delete_blob(digest)
        self._cache.delete_blob(digest)

    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        """Delete ``digests`` from both providers.

        Cache failures are only logged; the return value is the failure list
        reported by the fallback.
        """
        # Only report back failures from the fallback
        cache_failures = self._cache.bulk_delete(digests)
        for failure in cache_failures:
            self.__logger.warning(f"Failed to delete [{failure}] from cache storage")

        return self._fallback.bulk_delete(digests)

    def begin_write(self, digest):
        """Open a write session that tees data into both providers."""
        return _OutputTee(self._cache.begin_write(digest), self._fallback.begin_write(digest))

    def _commit_fallback_write(self, digest, write_session):
        """Commit ``write_session`` to the fallback on a background writer thread.

        Nothing reads the returned future, so failures are logged here;
        otherwise they would be silently lost. The exception is re-raised so
        the future still records it.
        """
        try:
            self._fallback.commit_write(digest, write_session._original_b)
        except Exception:
            self.__logger.exception(
                f"Deferred commit of blob [{digest.hash}] to fallback storage failed"
            )
            raise

    def commit_write(self, digest, write_session):
        """Commit a session from ``begin_write`` to the cache, then the fallback."""
        # Push buffered bytes through to the providers' own session objects.
        write_session.flush()
        self._cache.commit_write(digest, write_session._original_a)

        if self._defer_fallback_writes:
            self._executor.submit(
                self._commit_fallback_write,
                digest,
                write_session
            )
        else:
            self._fallback.commit_write(digest, write_session._original_b)

    def missing_blobs(self, digests):
        """Return the subset of ``digests`` the fallback reports as missing.

        NOTE(review): with deferred fallback writes, a blob whose fallback
        commit is still pending may be reported missing here even though it
        is already in the cache.
        """
        return self._fallback.missing_blobs(digests)

    def bulk_update_blobs(self, blobs):
        """Write ``blobs`` to both providers; return the fallback's result."""
        self._cache.bulk_update_blobs(blobs)
        return self._fallback.bulk_update_blobs(blobs)

    def bulk_read_blobs(self, digests):
        """Read ``digests``, serving what the cache can and the rest from the fallback.

        Returns:
            The cache's result dict, keyed by digest hash, updated with the
            fallback's results for the digests the cache could not serve.
            Fallback results are not written back into the cache.
        """
        cache_blobs = self._cache.bulk_read_blobs(digests)

        # A concrete list, not a lazy filter: the fallback may take its len()
        # or iterate it more than once.
        uncached_digests = [
            digest for digest in digests
            if cache_blobs.get(digest.hash, None) is None
        ]
        # Derive both metrics from the same classification used to pick the
        # fallback requests, so repeated digests or None-valued cache entries
        # cannot make the hit/miss counts disagree with what was fetched.
        hit_count = len(digests) - len(uncached_digests)

        with Distribution(CAS_CACHE_BULK_READ_HIT_COUNT_NAME) as metric_cache_hit:
            metric_cache_hit.count = hit_count

        with Distribution(CAS_CACHE_BULK_READ_MISS_COUNT_NAME) as metric_cache_miss:
            metric_cache_miss.count = len(uncached_digests)

        with Distribution(CAS_CACHE_BULK_READ_HIT_PERCENTAGE_NAME) as metric_cache_percent:
            if len(digests) == 0:
                metric_cache_percent.count = 0
            else:
                metric_cache_percent.count = hit_count / len(digests) * 100

        # Skip the (slow) fallback round trip when the cache served everything.
        if uncached_digests:
            fallback_blobs = self._fallback.bulk_read_blobs(uncached_digests)
            cache_blobs.update(fallback_blobs)

        return cache_blobs

    def is_cleanup_enabled(self):
        return self._fallback.is_cleanup_enabled()

    def set_instance_name(self, instance_name):
        """Record ``instance_name`` and propagate it to both providers."""
        self._instance_name = instance_name
        self._fallback.set_instance_name(instance_name)
        self._cache.set_instance_name(instance_name)

    def get_capabilities(self):
        return self._fallback.get_capabilities()