# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#  <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
WithCacheStorage
================

A storage provider that first checks a cache, then tries a slower
fallback provider.

To ensure clients can reliably store blobs in CAS, only `get_blob`
calls are cached -- `has_blob` and `missing_blobs` will always query
the fallback. (The one exception: when fallback writes are deferred,
`has_blob` will also trust a cache hit, since a blob may reach the
cache before its deferred write to the fallback completes.)
"""

from concurrent.futures import ThreadPoolExecutor
import io
import logging
from typing import List

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid.server.metrics_names import (
    CAS_CACHE_BULK_READ_HIT_COUNT_NAME,
    CAS_CACHE_BULK_READ_HIT_PERCENTAGE_NAME,
    CAS_CACHE_BULK_READ_MISS_COUNT_NAME,
    CAS_CACHE_GET_BLOB_HIT_COUNT_NAME,
    CAS_CACHE_GET_BLOB_MISS_COUNT_NAME
)
from buildgrid.server.metrics_utils import Distribution
from .storage_abc import StorageABC

class _OutputTee(io.BufferedIOBase):
    """A file-like object that writes data to two file-like objects.

    The files should be in blocking mode; non-blocking mode is unsupported.
    """

    def __init__(self, file_a, file_b):
        super().__init__()

        self.__logger = logging.getLogger(__name__)

        self._original_a = file_a
        if isinstance(file_a, io.BufferedIOBase):
            self._a = file_a
        else:
            self._a = io.BufferedWriter(file_a)

        self._original_b = file_b
        if isinstance(file_b, io.BufferedIOBase):
            self._b = file_b
        else:
            self._b = io.BufferedWriter(file_b)

    def close(self):
        super().close()
        if self._a:
            self._a.close()
        self._b.close()

    def flush(self):
        try:
            if self._a:
                self._a.flush()
        except Exception:
            self.__logger.warning("Failed to flush write session to cache storage", exc_info=True)
            self._a.close()
            self._a = None
        self._b.flush()

    def readable(self):
        return False

    def seekable(self):
        return False

    def write(self, b):
        try:
            if self._a:
                self._a.write(b)
        except Exception:
            self.__logger.warning("Failed to write to cache storage", exc_info=True)
            self._a.close()
            self._a = None

        return self._b.write(b)

    def writable(self):
        return True
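# A quick illustration of the tee behaviour (a sketch using in-memory
# buffers; in practice both files are write sessions returned by
# StorageABC.begin_write()):
#
#     cache_file, fallback_file = io.BytesIO(), io.BytesIO()
#     tee = _OutputTee(cache_file, fallback_file)
#     tee.write(b"data")
#     tee.flush()
#     assert cache_file.getvalue() == fallback_file.getvalue() == b"data"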

class _CachingTee(io.RawIOBase):
    """A file-like object that wraps a 'fallback' file and, when it is
    read, writes the resulting data to a 'cache' storage provider.

    You can call seek() with this class, but only after the data has been
    written into the caching provider.

    Does not support non-blocking mode.
    """

    def __init__(self, fallback_file, digest, cache):
        super().__init__()

        self.__logger = logging.getLogger(__name__)

        self._file = fallback_file
        self._digest = digest
        self._cache = cache
        self._cache_session = cache.begin_write(digest)

        self._read_attempted = False

    def close(self):
        super().close()
        try:
            if self._cache_session:
                self._cache_session.write(self._file.read())
                self._cache.commit_write(self._digest, self._cache_session)
        except Exception:
            self.__logger.warning("Failed to write to cache storage after reading blob", exc_info=True)
            self._cache_session = None
        self._file.close()

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return False

    def readall(self):
        data = self._file.read()
        try:
            if self._cache_session and not self._read_attempted:
                self._cache_session.write(data)
        except Exception:
            self.__logger.warning("Failed to write to cache storage after reading blob", exc_info=True)
            self._cache_session = None
        self._read_attempted = True
        return data

    def readinto(self, b):
        bytes_read = self._file.readinto(b)
        try:
            if self._cache_session and not self._read_attempted:
                self._cache_session.write(b[:bytes_read])
        except Exception:
            self.__logger.warning("Failed to write to cache storage after reading blob", exc_info=True)
            self._cache_session = None
        self._read_attempted = True
        return bytes_read

    def seek(self, offset, whence=io.SEEK_SET):
        if self._read_attempted:
            return self._file.seek(offset, whence)
        else:
            raise io.UnsupportedOperation("_CachingTee: Cannot seek before read")
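# Read-through caching in a nutshell (a sketch; ``fallback_file`` is the
# file-like object returned by the fallback provider's get_blob()):
#
#     tee = _CachingTee(fallback_file, digest, cache)
#     data = tee.read()  # bytes come from the fallback...
#     tee.close()        # ...and are committed to the cache on close()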

class WithCacheStorage(StorageABC):

    def __init__(self, cache, fallback, defer_fallback_writes=False,
                 fallback_writer_threads=20):
        self.__logger = logging.getLogger(__name__)

        self._instance_name = None
        self._cache = cache
        self._fallback = fallback
        self._defer_fallback_writes = defer_fallback_writes
        if self._defer_fallback_writes:
            # pylint: disable=consider-using-with
            self._executor = ThreadPoolExecutor(
                max_workers=fallback_writer_threads,
                thread_name_prefix="WithCacheFallbackWriter"
            )

    def __del__(self):
        # If we have a fallback writer, wait until all work has finished to exit safely
        if self._defer_fallback_writes:
            self.__logger.warning(
                "Shutting down WithCacheStorage with deferred writes. Finishing up deferred writes."
            )
            self._executor.shutdown(wait=True)
            self.__logger.info(
                "WithCacheFallbackWriter finished deferred writes and pool was shut down."
            )

    def setup_grpc(self):
        self._cache.setup_grpc()
        self._fallback.setup_grpc()

    def has_blob(self, digest):
        try:
            # When fallback writes are deferred, a blob may exist only in the
            # cache until its deferred write lands, so trust a cache hit.
            if self._defer_fallback_writes and self._cache.has_blob(digest):
                return True
        except Exception:
            self.__logger.warning(f"Failed to check existence of [{digest}] in cache storage", exc_info=True)

        return self._fallback.has_blob(digest)

    def get_blob(self, digest):
        try:
            cache_result = self._cache.get_blob(digest)
            if cache_result is not None:
                with Distribution(CAS_CACHE_GET_BLOB_HIT_COUNT_NAME) as metric_cache_hit:
                    metric_cache_hit.count = 1
                return cache_result
        except Exception:
            self.__logger.warning(f"Failed to read [{digest}] from cache storage", exc_info=True)

        fallback_result = self._fallback.get_blob(digest)
        if fallback_result is None:
            return None

        with Distribution(CAS_CACHE_GET_BLOB_MISS_COUNT_NAME) as metric_cache_miss:
            metric_cache_miss.count = 1
        return _CachingTee(fallback_result, digest, self._cache)

    def delete_blob(self, digest):
        self._fallback.delete_blob(digest)
        try:
            self._cache.delete_blob(digest)
        except Exception:
            self.__logger.warning(f"Failed to delete [{digest}] from cache storage", exc_info=True)

    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        # Only report back failures from the fallback
        try:
            cache_failures = self._cache.bulk_delete(digests)
            for failure in cache_failures:
                self.__logger.warning(f"Failed to delete [{failure}] from cache storage")
        except Exception:
            self.__logger.warning("Failed to bulk delete blobs from cache storage", exc_info=True)

        return self._fallback.bulk_delete(digests)

    def begin_write(self, digest):
        return _OutputTee(self._cache.begin_write(digest), self._fallback.begin_write(digest))

    def _commit_fallback_write(self, digest, write_session):
        self._fallback.commit_write(digest, write_session._original_b)

    def commit_write(self, digest, write_session):
        write_session.flush()
        written_to_cache = False
        try:
            if write_session._a:
                self._cache.commit_write(digest, write_session._original_a)
                written_to_cache = True
        except Exception:
            self.__logger.warning(f"Failed to commit write of [{digest}] to cache storage", exc_info=True)

        if written_to_cache and self._defer_fallback_writes:
            self._executor.submit(
                self._commit_fallback_write,
                digest,
                write_session
            )
        else:
            self._fallback.commit_write(digest, write_session._original_b)
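    # With ``defer_fallback_writes=True`` the commit returns once the cache
    # write has landed; the fallback commit is queued on the thread pool
    # instead (a sketch of the resulting flow, not an additional API):
    #
    #     storage = WithCacheStorage(cache, fallback, defer_fallback_writes=True)
    #     session = storage.begin_write(digest)
    #     session.write(blob_data)
    #     storage.commit_write(digest, session)  # fallback commit is async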

    def missing_blobs(self, digests):
        return self._fallback.missing_blobs(digests)

    def bulk_update_blobs(self, blobs):
        try:
            self._cache.bulk_update_blobs(blobs)
        except Exception:
            self.__logger.warning("Failed to bulk update blobs in cache storage", exc_info=True)

        return self._fallback.bulk_update_blobs(blobs)

    def bulk_read_blobs(self, digests):
        try:
            cache_blobs = self._cache.bulk_read_blobs(digests)
        except Exception:
            self.__logger.warning("Failed to bulk read blobs from cache storage", exc_info=True)
            cache_blobs = {}

        with Distribution(CAS_CACHE_BULK_READ_HIT_COUNT_NAME) as metric_cache_hit:
            metric_cache_hit.count = len(cache_blobs)

        uncached_digests = [digest for digest in digests if cache_blobs.get(digest.hash, None) is None]

        with Distribution(CAS_CACHE_BULK_READ_MISS_COUNT_NAME) as metric_cache_miss:
            metric_cache_miss.count = len(digests) - len(cache_blobs)

        with Distribution(CAS_CACHE_BULK_READ_HIT_PERCENTAGE_NAME) as metric_cache_percent:
            if len(digests) == 0:
                metric_cache_percent.count = 0
            else:
                metric_cache_percent.count = len(cache_blobs) / len(digests) * 100

        fallback_blobs = self._fallback.bulk_read_blobs(uncached_digests)
        cache_blobs.update(fallback_blobs)

        uncached_blobs = []
        for digest in uncached_digests:
            blob = fallback_blobs.get(digest.hash)
            if blob is not None:
                uncached_blobs.append((digest, blob.read()))
                blob.seek(0)

        try:
            self._cache.bulk_update_blobs(uncached_blobs)
        except Exception:
            self.__logger.warning("Failed to add blobs to cache storage after bulk read", exc_info=True)

        return cache_blobs
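    # Bulk reads follow the same read-through pattern as get_blob: cache hits
    # are returned directly, misses are fetched from the fallback in one call
    # and written back to the cache in one bulk update (a sketch):
    #
    #     blobs = storage.bulk_read_blobs(digests)  # dict: hash -> file-like
    #     missing = storage.missing_blobs(digests)  # always asks the fallback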

    def is_cleanup_enabled(self):
        return self._fallback.is_cleanup_enabled()

    def set_instance_name(self, instance_name):
        self._instance_name = instance_name
        self._fallback.set_instance_name(instance_name)
        self._cache.set_instance_name(instance_name)

    def get_capabilities(self):
        return self._fallback.get_capabilities()