Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/with_cache.py: 97.69%

130 statements  

coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
WithCacheStorage
================

A storage provider that first checks a cache, then tries a slower
fallback provider.

To ensure clients can reliably store blobs in CAS, only `get_blob`
calls are cached -- `has_blob` and `missing_blobs` will always query
the fallback.
"""


from contextlib import ExitStack
from typing import IO, Dict, List, Optional, Tuple

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid.server.decorators import timed
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.metrics_names import METRIC
from buildgrid.server.metrics_utils import publish_counter_metric, publish_distribution_metric
from buildgrid.server.settings import MAX_IN_MEMORY_BLOB_SIZE_BYTES
from buildgrid.server.threading import ContextThreadPoolExecutor

from .storage_abc import StorageABC, create_write_session

LOGGER = buildgrid_logger(__name__)


class WithCacheStorage(StorageABC):
    TYPE = "WithCache"

    def __init__(
        self,
        cache: StorageABC,
        fallback: StorageABC,
        defer_fallback_writes: bool = False,
        fallback_writer_threads: int = 20,
    ) -> None:
        self._stack = ExitStack()
        self._cache = cache
        self._fallback = fallback
        self._defer_fallback_writes = defer_fallback_writes
        self._fallback_writer_threads = fallback_writer_threads
        self._executor = ContextThreadPoolExecutor(self._fallback_writer_threads, "WithCacheFallbackWriter")

    def start(self) -> None:
        if self._defer_fallback_writes:
            self._stack.enter_context(self._executor)
        self._stack.enter_context(self._cache)
        self._stack.enter_context(self._fallback)

    def stop(self) -> None:
        self._stack.close()
        LOGGER.info(f"Stopped {type(self).__name__}")

    @timed(METRIC.STORAGE.STAT_DURATION, type=TYPE)
    def has_blob(self, digest: Digest) -> bool:
        try:
            # When fallback writes are deferred, the cache may hold blobs that
            # have not yet reached the fallback, so trust a cache hit here.
            if self._defer_fallback_writes and self._cache.has_blob(digest):
                return True
        except Exception:
            LOGGER.warning(
                "Failed to check existence of digest in cache storage.", tags=dict(digest=digest), exc_info=True
            )

        return self._fallback.has_blob(digest)

    @timed(METRIC.STORAGE.READ_DURATION, type=TYPE)
    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        try:
            cache_result = self._cache.get_blob(digest)
            if cache_result is not None:
                publish_distribution_metric(METRIC.STORAGE.WITH_CACHE.CACHE_HIT_COUNT, 1)
                return cache_result
        except Exception:
            LOGGER.warning("Failed to read digest from cache storage.", tags=dict(digest=digest), exc_info=True)

        fallback_result = self._fallback.get_blob(digest)
        if fallback_result is None:
            return None

        publish_distribution_metric(METRIC.STORAGE.WITH_CACHE.CACHE_MISS_COUNT, 1)

        # Repopulate the cache with the blob fetched from the fallback.
        try:
            self._cache.commit_write(digest, fallback_result)
        except Exception:
            LOGGER.warning(
                "Failed to write digest to cache storage after reading blob.", tags=dict(digest=digest), exc_info=True
            )

        fallback_result.seek(0)
        return fallback_result

    @timed(METRIC.STORAGE.DELETE_DURATION, type=TYPE)
    def delete_blob(self, digest: Digest) -> None:
        self._fallback.delete_blob(digest)
        try:
            self._cache.delete_blob(digest)
        except Exception:
            LOGGER.warning("Failed to delete digest from cache storage.", tags=dict(digest=digest), exc_info=True)

    @timed(METRIC.STORAGE.DELETE_DURATION, type=TYPE)
    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        # Only report back failures from the fallback
        try:
            cache_failures = self._cache.bulk_delete(digests)
            for failure in cache_failures:
                LOGGER.warning("Failed to delete digest from cache storage.", tags=dict(digest=failure))
            publish_counter_metric(METRIC.STORAGE.DELETE_ERRORS_COUNT, len(cache_failures), type=self.TYPE)
        except Exception:
            LOGGER.warning("Failed to bulk delete blobs from cache storage.", exc_info=True)

        fallback_failures = self._fallback.bulk_delete(digests)
        publish_counter_metric(METRIC.STORAGE.DELETE_ERRORS_COUNT, len(fallback_failures), type=self.TYPE)
        return fallback_failures

    @timed(METRIC.STORAGE.WRITE_DURATION, type=TYPE)
    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        written_to_cache = False
        try:
            self._cache.commit_write(digest, write_session)
            written_to_cache = True
        except Exception:
            LOGGER.warning(
                "Failed to commit write of digest to cache storage.", tags=dict(digest=digest), exc_info=True
            )

        if written_to_cache and self._defer_fallback_writes:
            # Copy the blob into a fresh write session so the fallback write can
            # run in the background, independently of the caller's session.
            write_session.seek(0)
            deferred_session = create_write_session(digest)
            while data := write_session.read(MAX_IN_MEMORY_BLOB_SIZE_BYTES):
                deferred_session.write(data)

            def deferred_submit() -> None:
                with deferred_session:
                    self._fallback.commit_write(digest, deferred_session)

            self._executor.submit(deferred_submit)
        else:
            self._fallback.commit_write(digest, write_session)

    @timed(METRIC.STORAGE.BULK_STAT_DURATION, type=TYPE)
    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
        return self._fallback.missing_blobs(digests)

    @timed(METRIC.STORAGE.BULK_WRITE_DURATION, type=TYPE)
    def bulk_update_blobs(self, blobs: List[Tuple[Digest, bytes]]) -> List[Status]:
        try:
            self._cache.bulk_update_blobs(blobs)
        except Exception:
            LOGGER.warning("Failed to bulk update blobs in cache storage.", exc_info=True)

        return self._fallback.bulk_update_blobs(blobs)

    @timed(METRIC.STORAGE.BULK_READ_DURATION, type=TYPE)
    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
        try:
            cache_blobs = self._cache.bulk_read_blobs(digests)
        except Exception:
            LOGGER.warning("Failed to bulk read blobs from cache storage.", exc_info=True)
            cache_blobs = {}

        publish_distribution_metric(METRIC.STORAGE.WITH_CACHE.CACHE_HIT_COUNT, len(cache_blobs))

        uncached_digests = [digest for digest in digests if cache_blobs.get(digest.hash, None) is None]

        publish_distribution_metric(METRIC.STORAGE.WITH_CACHE.CACHE_MISS_COUNT, len(digests) - len(cache_blobs))

        metric_cache_percent = 0.0
        if len(digests) > 0:
            metric_cache_percent = len(cache_blobs) / len(digests) * 100
        publish_distribution_metric(METRIC.STORAGE.WITH_CACHE.CACHE_HIT_PERCENT, metric_cache_percent)

        fallback_blobs = self._fallback.bulk_read_blobs(uncached_digests)
        cache_blobs.update(fallback_blobs)

        # Backfill the cache with any blobs that had to come from the fallback.
        uncached_blobs = []
        for digest in uncached_digests:
            blob = fallback_blobs.get(digest.hash)
            if blob is not None:
                uncached_blobs.append((digest, blob))

        try:
            self._cache.bulk_update_blobs(uncached_blobs)
        except Exception:
            LOGGER.warning("Failed to add blobs to cache storage after bulk read.", exc_info=True)

        return cache_blobs
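

# -----------------------------------------------------------------------------
# Illustrative usage sketch (added for this report; not part of the original
# module). It shows how a WithCacheStorage might be wired up and exercised.
# ``example_round_trip``, ``some_cache`` and ``some_fallback`` are hypothetical
# names: the latter two stand in for any two concrete StorageABC
# implementations (for example an in-memory LRU cache in front of a disk or
# remote CAS backend), and the blob is hashed with SHA-256 on the assumption
# that this is the deployment's digest function.
# -----------------------------------------------------------------------------
def example_round_trip(some_cache: StorageABC, some_fallback: StorageABC) -> None:
    import hashlib

    storage = WithCacheStorage(some_cache, some_fallback, defer_fallback_writes=True)
    storage.start()
    try:
        data = b"hello, cas"
        digest = Digest(hash=hashlib.sha256(data).hexdigest(), size_bytes=len(data))

        # Write: the blob lands in the cache immediately; the fallback write is
        # handed off to the background writer threads.
        with create_write_session(digest) as session:
            session.write(data)
            session.seek(0)
            storage.commit_write(digest, session)

        # Read: served from the cache when possible, otherwise fetched from the
        # fallback and written back into the cache.
        blob = storage.get_blob(digest)
        assert blob is not None and blob.read() == data
    finally:
        storage.stop()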