Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/with_cache.py: 93.75%

128 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-06-11 15:37 +0000

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17WithCacheStorage 

18================== 

19 

20A storage provider that first checks a cache, then tries a slower 

21fallback provider. 

22 

23To ensure clients can reliably store blobs in CAS, only `get_blob` 

24calls are cached -- `has_blob` and `missing_blobs` will always query 

25the fallback. 

26""" 

27 

28import logging 

29from contextlib import ExitStack 

30from typing import IO, Dict, List, Optional, Tuple 

31 

32from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import CacheCapabilities, Digest 

33from buildgrid._protos.google.rpc.status_pb2 import Status 

34from buildgrid.server.metrics_names import ( 

35 CAS_CACHE_BULK_READ_HIT_COUNT_NAME, 

36 CAS_CACHE_BULK_READ_HIT_PERCENTAGE_NAME, 

37 CAS_CACHE_BULK_READ_MISS_COUNT_NAME, 

38 CAS_CACHE_GET_BLOB_HIT_COUNT_NAME, 

39 CAS_CACHE_GET_BLOB_MISS_COUNT_NAME, 

40) 

41from buildgrid.server.metrics_utils import Distribution 

42from buildgrid.settings import MAX_IN_MEMORY_BLOB_SIZE_BYTES 

43 

44from ...threading import ContextThreadPoolExecutor 

45from .storage_abc import StorageABC, create_write_session 

46 

47LOGGER = logging.getLogger(__name__) 

48 

49 

class WithCacheStorage(StorageABC):
    """Storage provider layering a fast cache in front of a slower fallback.

    Reads (``get_blob`` / ``bulk_read_blobs``) try the cache first and
    backfill it from the fallback on a miss.  Existence checks
    (``has_blob`` / ``missing_blobs``) always consult the fallback, except
    that ``has_blob`` may short-circuit on a cache hit when deferred
    fallback writes are enabled.  Every cache interaction is best-effort:
    cache failures are logged at warning level and the fallback result is
    used instead, so cache outages degrade performance but not correctness.
    """

    def __init__(
        self,
        cache: StorageABC,
        fallback: StorageABC,
        defer_fallback_writes: bool = False,
        fallback_writer_threads: int = 20,
    ) -> None:
        """Instantiate a new ``WithCacheStorage``.

        Args:
            cache: Fast storage backend, queried first on reads.
            fallback: Authoritative (slower) storage backend.
            defer_fallback_writes: If ``True``, ``commit_write`` returns as
                soon as the cache write succeeds and performs the fallback
                write asynchronously on a thread pool.
            fallback_writer_threads: Worker count for that thread pool.
        """
        # Owns the lifecycle of the child storages (and, when deferring,
        # the executor); closed in stop().
        self._stack = ExitStack()
        # Set later via set_instance_name() and propagated to both backends.
        self._instance_name: Optional[str] = None
        self._cache = cache
        self._fallback = fallback
        self._defer_fallback_writes = defer_fallback_writes
        self._fallback_writer_threads = fallback_writer_threads
        # NOTE(review): the executor is created even when deferred writes are
        # disabled; it is only entered (and therefore shut down) via the
        # ExitStack in start() when the defer flag is set — confirm that an
        # unused, never-entered executor is harmless here.
        self._executor = ContextThreadPoolExecutor(self._fallback_writer_threads, "WithCacheFallbackWriter")

    def start(self) -> None:
        """Start both child storages (and the writer pool when deferring)."""
        if self._defer_fallback_writes:
            self._executor.enter_context = self._stack.enter_context(self._executor) if False else None  # pragma: no cover
        if self._defer_fallback_writes:
            self._stack.enter_context(self._executor)
        self._stack.enter_context(self._cache)
        self._stack.enter_context(self._fallback)

    def stop(self) -> None:
        """Tear down everything registered in start(), in reverse order."""
        self._stack.close()
        LOGGER.info(f"Stopped {type(self).__name__}")

    def has_blob(self, digest: Digest) -> bool:
        """Return whether ``digest`` exists.

        The cache is only trusted as a positive signal when deferred
        fallback writes are enabled — presumably because a deferred blob may
        not have reached the fallback yet (TODO confirm).  In every other
        case the fallback is authoritative.
        """
        try:
            if self._defer_fallback_writes and self._cache.has_blob(digest):
                return True
        except Exception:
            LOGGER.warning(f"Failed to check existence of [{digest}] in cache storage", exc_info=True)

        return self._fallback.has_blob(digest)

    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        """Read a blob, preferring the cache and backfilling it on a miss.

        Emits a hit metric on a cache hit, a miss metric otherwise.
        Returns ``None`` when the blob exists in neither backend.
        """
        try:
            cache_result = self._cache.get_blob(digest)
            if cache_result is not None:
                with Distribution(CAS_CACHE_GET_BLOB_HIT_COUNT_NAME) as metric_cache_hit:
                    metric_cache_hit.count = 1
                return cache_result
        except Exception:
            LOGGER.warning(f"Failed to read [{digest}] from cache storage", exc_info=True)

        fallback_result = self._fallback.get_blob(digest)
        if fallback_result is None:
            return None

        with Distribution(CAS_CACHE_GET_BLOB_MISS_COUNT_NAME) as metric_cache_miss:
            metric_cache_miss.count = 1

        # Best-effort cache population; the caller still gets the blob even
        # if this fails.
        try:
            self._cache.commit_write(digest, fallback_result)
        except Exception:
            LOGGER.warning(f"Failed to write [{digest}] to cache storage after reading blob", exc_info=True)

        # commit_write consumed the stream; rewind before handing it back.
        fallback_result.seek(0)
        return fallback_result

    def delete_blob(self, digest: Digest) -> None:
        """Delete from the fallback, then best-effort delete from the cache."""
        self._fallback.delete_blob(digest)
        try:
            self._cache.delete_blob(digest)
        except Exception:
            LOGGER.warning(f"Failed to delete [{digest}] from cache storage", exc_info=True)

    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        """Delete many blobs; only fallback failures are reported to the caller."""
        # Only report back failures from the fallback
        try:
            cache_failures = self._cache.bulk_delete(digests)
            for failure in cache_failures:
                LOGGER.warning(f"Failed to delete [{failure}] from cache storage")
        except Exception:
            LOGGER.warning("Failed to bulk delete blobs from cache storage", exc_info=True)

        return self._fallback.bulk_delete(digests)

    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        """Write a blob to the cache and the fallback.

        With deferred writes enabled and a successful cache write, the
        session contents are copied into a fresh write session and the
        fallback write happens asynchronously on the thread pool; otherwise
        (deferral off, or the cache write failed) the fallback write is
        synchronous, guaranteeing the blob still lands in the fallback.
        """
        written_to_cache = False
        try:
            self._cache.commit_write(digest, write_session)
            written_to_cache = True
        except Exception:
            LOGGER.warning(f"Failed to commit write of [{digest}] to cache storage", exc_info=True)

        if written_to_cache and self._defer_fallback_writes:
            # The cache write consumed the stream; rewind before copying.
            write_session.seek(0)
            # Copy into an independent session so the caller's stream can be
            # closed before the deferred write runs; chunked to bound memory.
            deferred_session = create_write_session(digest)
            while data := write_session.read(MAX_IN_MEMORY_BLOB_SIZE_BYTES):
                deferred_session.write(data)

            # NOTE(review): deferred_session is not rewound with seek(0)
            # before the fallback write — presumably commit_write
            # implementations rewind internally; confirm against StorageABC.
            def deferred_submit() -> None:
                with deferred_session:
                    self._fallback.commit_write(digest, deferred_session)

            self._executor.submit(deferred_submit)
        else:
            self._fallback.commit_write(digest, write_session)

    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
        """Return which digests are missing; the fallback is authoritative."""
        return self._fallback.missing_blobs(digests)

    def bulk_update_blobs(self, blobs: List[Tuple[Digest, bytes]]) -> List[Status]:
        """Store many blobs; cache write is best-effort, fallback statuses returned."""
        try:
            self._cache.bulk_update_blobs(blobs)
        except Exception:
            LOGGER.warning("Failed to bulk update blobs in cache storage", exc_info=True)

        return self._fallback.bulk_update_blobs(blobs)

    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
        """Read many blobs, merging cache hits with fallback reads.

        Emits hit-count, miss-count and hit-percentage metrics, then
        best-effort backfills the cache with the blobs only the fallback had.
        Returns a mapping of digest hash to blob payload.
        """
        try:
            cache_blobs = self._cache.bulk_read_blobs(digests)
        except Exception:
            LOGGER.warning("Failed to bulk read blobs from cache storage", exc_info=True)
            # A cache failure is treated as a total miss.
            cache_blobs = {}

        with Distribution(CAS_CACHE_BULK_READ_HIT_COUNT_NAME) as metric_cache_hit:
            metric_cache_hit.count = len(cache_blobs)

        uncached_digests = [digest for digest in digests if cache_blobs.get(digest.hash, None) is None]

        with Distribution(CAS_CACHE_BULK_READ_MISS_COUNT_NAME) as metric_cache_miss:
            metric_cache_miss.count = len(digests) - len(cache_blobs)

        with Distribution(CAS_CACHE_BULK_READ_HIT_PERCENTAGE_NAME) as metric_cache_percent:
            # Guard the division for an empty request.
            if len(digests) == 0:
                metric_cache_percent.count = 0
            else:
                metric_cache_percent.count = len(cache_blobs) / len(digests) * 100

        fallback_blobs = self._fallback.bulk_read_blobs(uncached_digests)
        cache_blobs.update(fallback_blobs)

        # Collect (digest, blob) pairs the fallback served so the cache can
        # be backfilled; blobs missing from both backends are skipped.
        uncached_blobs = []
        for digest in uncached_digests:
            blob = fallback_blobs.get(digest.hash)
            if blob is not None:
                uncached_blobs.append((digest, blob))

        try:
            self._cache.bulk_update_blobs(uncached_blobs)
        except Exception:
            LOGGER.warning("Failed to add blobs to cache storage after bulk read", exc_info=True)

        return cache_blobs

    def set_instance_name(self, instance_name: str) -> None:
        """Record the instance name and propagate it to both backends."""
        self._instance_name = instance_name
        self._fallback.set_instance_name(instance_name)
        self._cache.set_instance_name(instance_name)

    def get_capabilities(self) -> CacheCapabilities:
        """Report capabilities of the authoritative (fallback) storage."""
        return self._fallback.get_capabilities()