# Copyright (C) 2024 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

16""" 

17ReplicatedStorage 

18========================= 

19 

20A storage provider which stores data in multiple storages, replicating 

21any data missing in some but present in others. 

22 
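
A minimal usage sketch (illustrative only): ``primary`` and ``secondary``
stand in for any two configured ``StorageABC`` backends, ``some_digest`` for a
``Digest`` to fetch, and the queue size is an arbitrary example value::

    storage = ReplicatedStorage([primary, secondary], replication_queue_size=1024)
    storage.start()
    try:
        # A successful read also copies the blob to any shard that was missing it.
        blob = storage.get_blob(some_digest)
    finally:
        storage.stop()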

23""" 

24import queue 

25import threading 

26from contextlib import ExitStack 

27from typing import IO, Any, List 

28 

29from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest 

30from buildgrid._protos.google.rpc import code_pb2 

31from buildgrid._protos.google.rpc.status_pb2 import Status 

32from buildgrid.server.decorators import timed 

33from buildgrid.server.logging import buildgrid_logger 

34from buildgrid.server.metrics_names import METRIC 

35from buildgrid.server.metrics_utils import publish_counter_metric 

36from buildgrid.server.settings import MAX_REQUEST_COUNT, MAX_REQUEST_SIZE 

37from buildgrid.server.threading import ContextWorker 

38from buildgrid.server.utils.digests import HashableDigest 

39 

40from .storage_abc import StorageABC 

41 

42LOGGER = buildgrid_logger(__name__) 

43 

44 

class ReplicatedStorage(StorageABC):
    TYPE = "Replicated"

    def __init__(self, storages: list[StorageABC], replication_queue_size: int = 0) -> None:
        self._stack = ExitStack()
        self._storages = dict(enumerate(storages))
        self._enable_replicator_thread = False

        self._replica_queue: queue.Queue[Any] = queue.Queue(maxsize=replication_queue_size)
        self.replication_worker = ContextWorker(name="BlobReplicator", target=self.replication_loop)
        if replication_queue_size > 0:
            self._enable_replicator_thread = True

    def start(self) -> None:
        for storage in self._storages.values():
            self._stack.enter_context(storage)
        if self._enable_replicator_thread:
            self._stack.enter_context(self.replication_worker)

    def stop(self) -> None:
        self._stack.close()

    def replication_loop(self, shutdown_requested: threading.Event) -> None:
        # Go through all items in the replication queue and call either
        # get_blob or bulk_read_blobs on them to kick off replication
        # to all necessary storages
        while not shutdown_requested.is_set():
            try:
                digests_to_replicate: List[Digest] = self._replica_queue.get(timeout=1)
            except queue.Empty:
                continue
            try:
                # Go through the digests to replicate and batch them where possible
                batch: List[Digest] = []
                batch_size = 0
                for digest in digests_to_replicate:
                    if digest.size_bytes > MAX_REQUEST_SIZE:
                        _ = self.get_blob(digest)
                        continue

                    if len(batch) + 1 > MAX_REQUEST_COUNT or batch_size + digest.size_bytes > MAX_REQUEST_SIZE:
                        _ = self.bulk_read_blobs(batch)
                        batch = []
                        batch_size = 0

                    batch.append(digest)
                    batch_size += digest.size_bytes

                if len(batch) > 0:
                    _ = self.bulk_read_blobs(batch)
            except Exception:
                LOGGER.exception(f"Caught exception while replicating {digests_to_replicate}")
                shutdown_requested.wait(timeout=1)
            self._replica_queue.task_done()

    @timed(METRIC.STORAGE.STAT_DURATION, type=TYPE)
    def has_blob(self, digest: Digest) -> bool:
        has_blob: set[int] = set(i for i in self._storages if self._storages[i].has_blob(digest))
        missing_blob = set(self._storages.keys()) - has_blob
        if len(missing_blob) < len(self._storages):
            publish_counter_metric(METRIC.STORAGE.REPLICATED.REQUIRED_REPLICATION_COUNT, len(missing_blob))
        return len(has_blob) > 0

    @timed(METRIC.STORAGE.READ_DURATION, type=TYPE)
    def get_blob(self, digest: Digest) -> IO[bytes] | None:
        has_blob: set[int] = set(i for i in self._storages if self._storages[i].has_blob(digest))
        missing_blob = set(self._storages.keys()) - has_blob
        blob = None
        failed_writes = 0
        for idx in has_blob:
            if blob := self._storages[idx].get_blob(digest):
                break
            LOGGER.error(
                "Storage shard reported digest exists but downloading failed.",
                tags=dict(shard_index=idx, digest=digest),
            )
            missing_blob.add(idx)
        if len(missing_blob) < len(self._storages):
            assert blob is not None
            for idx in missing_blob:
                try:
                    self._storages[idx].commit_write(digest, blob)
                    LOGGER.debug("Replicated digest to storage shard.", tags=dict(shard_index=idx, digest=digest))
                except Exception as e:
                    LOGGER.warning(
                        f"Failed to replicate digest to storage shard: {e}.", tags=dict(shard_index=idx, digest=digest)
                    )
                    failed_writes += 1
                blob.seek(0)

        publish_counter_metric(METRIC.STORAGE.REPLICATED.REPLICATION_ERROR_COUNT, failed_writes)
        publish_counter_metric(METRIC.STORAGE.REPLICATED.REPLICATION_COUNT, len(missing_blob) - failed_writes)
        return blob

    @timed(METRIC.STORAGE.DELETE_DURATION, type=TYPE)
    def delete_blob(self, digest: Digest) -> None:
        for storage in self._storages.values():
            storage.delete_blob(digest)

    @timed(METRIC.STORAGE.WRITE_DURATION, type=TYPE)
    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        failed_writes = 0
        error_msgs: list[str] = []
        for idx, storage in self._storages.items():
            try:
                storage.commit_write(digest, write_session)
            except Exception as error:
                LOGGER.warning(
                    f"Failed to write digest to storage shard: {error}", tags=dict(shard_index=idx, digest=digest)
                )
                error_msgs.append(str(error))
                failed_writes += 1
            write_session.seek(0)

        publish_counter_metric(METRIC.STORAGE.REPLICATED.REPLICATION_ERROR_COUNT, failed_writes)
        if failed_writes == len(self._storages):
            error_string = "Writes to all storages failed with the following errors:\n"
            error_string += "\n".join(error_msgs)
            LOGGER.error(error_string)
            raise RuntimeError(error_string)

    @timed(METRIC.STORAGE.BULK_DELETE_DURATION, type=TYPE)
    def bulk_delete(self, digests: list[Digest]) -> list[str]:
        failed_deletions_set: set[str] = set()
        for storage in self._storages.values():
            failed_deletions_set.update(storage.bulk_delete(digests))
        publish_counter_metric(METRIC.STORAGE.DELETE_ERRORS_COUNT, len(failed_deletions_set), type=self.TYPE)
        return list(failed_deletions_set)

    @timed(METRIC.STORAGE.BULK_STAT_DURATION, type=TYPE)
    def missing_blobs(self, digests: list[Digest]) -> list[Digest]:
        """Call missing_blobs on each storage and only report a blob as missing if it is absent from all
        of the storages. The number of blobs missing from some storages but present in others is logged
        and published as a metric.
        """
        missing_for_storage: dict[int, set[HashableDigest]] = {}
        for idx, storage in self._storages.items():
            response = storage.missing_blobs(digests)
            missing_for_storage[idx] = {HashableDigest(digest.hash, digest.size_bytes) for digest in response}

        # Find the set of inconsistent digests, defined as digests which are missing in some, but not all, storages.
        missing_from_all_storages = set.intersection(*missing_for_storage.values())
        inconsistent_digests: set[HashableDigest] = (
            set.union(*missing_for_storage.values()) - missing_from_all_storages
        )
        for idx, missing_digests in missing_for_storage.items():
            inconsistent_digests_for_storage = missing_digests & inconsistent_digests
            if inconsistent_digests_for_storage:
                LOGGER.info(
                    "Storage shard has blobs which need to be replicated.",
                    tags=dict(shard_index=idx, digest_count=len(inconsistent_digests_for_storage)),
                )

        if self._enable_replicator_thread and len(inconsistent_digests) > 0:
            try:
                self._replica_queue.put_nowait([x.to_digest() for x in inconsistent_digests])
            except queue.Full:
                LOGGER.warning(
                    "Digests to be replicated were skipped due to full replication queue.",
                    tags=dict(skipped_digests=len(inconsistent_digests)),
                )
                publish_counter_metric(
                    METRIC.STORAGE.REPLICATED.REPLICATION_QUEUE_FULL_COUNT, len(inconsistent_digests)
                )
        publish_counter_metric(METRIC.STORAGE.REPLICATED.REQUIRED_REPLICATION_COUNT, len(inconsistent_digests))
        missing_blobs_list = [hdigest.to_digest() for hdigest in missing_from_all_storages]
        return missing_blobs_list

    # Bulk write to all storages. Errors are not fatal as long as one storage is
    # successfully written to
    @timed(METRIC.STORAGE.BULK_WRITE_DURATION, type=TYPE)
    def bulk_update_blobs(self, blobs: list[tuple[Digest, bytes]]) -> list[Status]:
        digest_result: dict[HashableDigest, Status] = {}
        errored_blobs_total = 0
        for idx, storage in self._storages.items():
            errored_blobs_for_storage = 0
            results = storage.bulk_update_blobs(blobs)
            for digest_blob_tuple, result in zip(blobs, results):
                digest, _ = digest_blob_tuple
                hdigest = HashableDigest(hash=digest.hash, size_bytes=digest.size_bytes)

                if result.code != code_pb2.OK:
                    errored_blobs_for_storage += 1

                # Keep track of the status code for this digest, preferring OK over errors
                if hdigest not in digest_result or digest_result[hdigest].code != code_pb2.OK:
                    digest_result[hdigest] = result

            if errored_blobs_for_storage > 0:
                LOGGER.warning(
                    "Failed to write all digests to storage shard.",
                    tags=dict(shard_index=idx, digest_count=len(results), error_count=errored_blobs_for_storage),
                )
            errored_blobs_total += errored_blobs_for_storage

        publish_counter_metric(METRIC.STORAGE.REPLICATED.REPLICATION_ERROR_COUNT, errored_blobs_total)
        return [digest_result[hdigest] for hdigest in digest_result]

    # Read blobs from all storages, replicating any blobs which are missing from
    # some storages using the copies read from the storages that have them
    @timed(METRIC.STORAGE.BULK_READ_DURATION, type=TYPE)
    def bulk_read_blobs(self, digests: list[Digest]) -> dict[str, bytes]:
        digest_set = set(HashableDigest(hash=digest.hash, size_bytes=digest.size_bytes) for digest in digests)
        missing_blobs: dict[int, set[HashableDigest]] = {}
        bulk_read_results: dict[str, bytes] = {}
        # Find which blobs are missing from each storage and read what's available
        for idx, storage in self._storages.items():
            missing_blobs[idx] = set(
                [
                    HashableDigest(hash=digest.hash, size_bytes=digest.size_bytes)
                    for digest in storage.missing_blobs(digests)
                ]
            )
            present_blobs = digest_set - missing_blobs[idx]
            blobs_to_read = [x.to_digest() for x in present_blobs if x.hash not in bulk_read_results]
            bulk_read_results.update(self._storages[idx].bulk_read_blobs(blobs_to_read))

        replicated_blobs_count = 0
        errored_blobs_count = 0
        for idx, missing in missing_blobs.items():
            # Write any blobs that exist in other storages which are missing from this storage
            write_batch: list[tuple[Digest, bytes]] = []
            for digest in missing:
                if digest.hash in bulk_read_results:
                    write_batch.append((digest.to_digest(), bulk_read_results[digest.hash]))
            if write_batch:
                update_results = self._storages[idx].bulk_update_blobs(write_batch)
                for result in update_results:
                    if result.code != code_pb2.OK:
                        errored_blobs_count += 1
                    else:
                        replicated_blobs_count += 1
                LOGGER.debug(
                    "Replicated blobs to storage shard.", tags=dict(shard_index=idx, digest_count=len(write_batch))
                )
        publish_counter_metric(METRIC.STORAGE.REPLICATED.REPLICATION_COUNT, replicated_blobs_count)
        publish_counter_metric(METRIC.STORAGE.REPLICATED.REPLICATION_ERROR_COUNT, errored_blobs_count)
        return bulk_read_results