Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/replicated.py: 94.37% (142 statements)

coverage.py v7.4.1, created at 2024-06-11 15:37 +0000

# Copyright (C) 2024 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
ReplicatedStorage
=========================

A storage provider which stores data in multiple storages, replicating
any data missing in some but present in others.

"""

import logging
from contextlib import ExitStack
from typing import IO, Dict, List, Optional, Set, Tuple

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid._protos.google.rpc import code_pb2
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid.server.metrics_names import (
    CAS_REPLICATED_STORAGE_BLOBS_NEED_REPLICATING_COUNT_METRIC_NAME,
    CAS_REPLICATED_STORAGE_ERRORED_BLOBS_COUNT_METRIC_NAME,
    CAS_REPLICATED_STORAGE_REPLICATED_BLOBS_COUNT_METRIC_NAME,
)
from buildgrid.server.metrics_utils import publish_counter_metric
from buildgrid.utils import HashableDigest

from .storage_abc import StorageABC

LOGGER = logging.getLogger(__name__)


class ReplicatedStorage(StorageABC):
    def __init__(
        self,
        storages: List[StorageABC],
    ) -> None:
        self._stack = ExitStack()
        self._storages = dict(enumerate(storages))
        self._instance_name = None

    def set_instance_name(self, instance_name: str) -> None:
        super().set_instance_name(instance_name)
        for storage in self._storages.values():
            storage.set_instance_name(instance_name)

    def start(self) -> None:
        for storage in self._storages.values():
            self._stack.enter_context(storage)

    def stop(self) -> None:
        self._stack.close()

    def has_blob(self, digest: Digest) -> bool:
        has_blob: Set[int] = set(i for i in self._storages if self._storages[i].has_blob(digest))
        missing_blob = set(self._storages.keys()) - has_blob
        if len(missing_blob) < len(self._storages):
            publish_counter_metric(
                CAS_REPLICATED_STORAGE_BLOBS_NEED_REPLICATING_COUNT_METRIC_NAME,
                len(missing_blob),
                {"instance_name": self._instance_name or ""},
            )
        return len(has_blob) > 0

    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        has_blob: Set[int] = set(i for i in self._storages if self._storages[i].has_blob(digest))
        missing_blob = set(self._storages.keys()) - has_blob
        blob = None
        failed_writes = 0
        for idx in has_blob:
            if blob := self._storages[idx].get_blob(digest):
                break
            LOGGER.error(f"Storage #{idx} reported {digest} exists but downloading failed")
            missing_blob.add(idx)
        if len(missing_blob) < len(self._storages):
            assert blob is not None
            for idx in missing_blob:
                try:
                    self._storages[idx].commit_write(digest, blob)
                    LOGGER.debug(f"Replicated {digest.hash}/{digest.size_bytes} to storage #{idx}")
                except Exception as e:
                    LOGGER.warning(f"Failed to replicate {digest.hash}/{digest.size_bytes} to storage #{idx}: {e}")
                    failed_writes += 1
                blob.seek(0)

        publish_counter_metric(
            CAS_REPLICATED_STORAGE_ERRORED_BLOBS_COUNT_METRIC_NAME,
            failed_writes,
            {"instance_name": self._instance_name or ""},
        )
        publish_counter_metric(
            CAS_REPLICATED_STORAGE_REPLICATED_BLOBS_COUNT_METRIC_NAME,
            len(missing_blob) - failed_writes,
            {"instance_name": self._instance_name or ""},
        )
        return blob

    def delete_blob(self, digest: Digest) -> None:
        for storage in self._storages.values():
            storage.delete_blob(digest)

    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        failed_writes = 0
        error_msgs: List[str] = []
        for idx, storage in self._storages.items():
            try:
                storage.commit_write(digest, write_session)
            except Exception as error:
                LOGGER.warning(f"Failed to write {digest.hash}/{digest.size_bytes} to storage #{idx}: {error}")
                error_msgs.append(str(error))
                failed_writes += 1
            write_session.seek(0)

        publish_counter_metric(
            CAS_REPLICATED_STORAGE_ERRORED_BLOBS_COUNT_METRIC_NAME,
            failed_writes,
            {"instance_name": self._instance_name or ""},
        )
        if failed_writes == len(self._storages):
            error_string = "Writes to all storages failed with the following errors:\n"
            error_string += "\n".join(error_msgs)
            LOGGER.error(error_string)
            raise RuntimeError(error_string)

    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        failed_deletions_set: Set[str] = set()
        for storage in self._storages.values():
            # Accumulate failed deletions in-place; set.union() would discard its result
            failed_deletions_set.update(storage.bulk_delete(digests))

        return list(failed_deletions_set)

    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
        """Call missing_blobs on each storage and only report a blob as missing if it is absent
        from all of the storages. The number of blobs missing from some storages but present in
        others is logged and published as a metric (see the worked example after this method).
        """
        digest_set: Set[HashableDigest] = {HashableDigest(digest.hash, digest.size_bytes) for digest in digests}
        missing_from_all_storages = digest_set.copy()
        missing_for_storage: Dict[int, Set[HashableDigest]] = {}

        blobs_needing_replication_count = 0
        for idx, storage in self._storages.items():
            response = storage.missing_blobs(digests)
            missing_for_storage[idx] = {HashableDigest(digest.hash, digest.size_bytes) for digest in response}

        missing_from_all_storages = set.intersection(*missing_for_storage.values())
        digests_in_at_least_one_storage: Set[HashableDigest] = digest_set - missing_from_all_storages
        # Figure out which blobs are present in some storages but not all, to report metrics
        for idx, missing_digests in missing_for_storage.items():
            inconsistent_digests_for_storage = missing_digests & digests_in_at_least_one_storage
            if inconsistent_digests_for_storage:
                LOGGER.info(
                    f"Storage #{idx} has {len(inconsistent_digests_for_storage)} blobs which need to be replicated"
                )
                blobs_needing_replication_count += len(inconsistent_digests_for_storage)

        publish_counter_metric(
            CAS_REPLICATED_STORAGE_BLOBS_NEED_REPLICATING_COUNT_METRIC_NAME,
            blobs_needing_replication_count,
            {"instance_name": self._instance_name or ""},
        )
        missing_blobs_list = [hdigest.to_digest() for hdigest in missing_from_all_storages]
        return missing_blobs_list
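
    # Worked example for missing_blobs (illustrative, with hypothetical blobs X and Y): given
    # backends #0 and #1, suppose X is missing from #0 only and Y is missing from both. The
    # per-storage missing sets are {X, Y} for #0 and {Y} for #1; their intersection is {Y}, so
    # only Y is reported back to the caller, while X is counted once (against #0) towards the
    # blobs-needing-replication metric.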

    # Bulk write to all storages. Errors are not fatal as long as at least one storage
    # is successfully written to (see the illustrative example after this method).
    def bulk_update_blobs(self, blobs: List[Tuple[Digest, bytes]]) -> List[Status]:
        digest_result: Dict[HashableDigest, Status] = {}
        errored_blobs_total = 0
        for idx, storage in self._storages.items():
            errored_blobs_for_storage = 0
            results = storage.bulk_update_blobs(blobs)
            for digest_blob_tuple, result in zip(blobs, results):
                digest, _ = digest_blob_tuple
                hdigest = HashableDigest(hash=digest.hash, size_bytes=digest.size_bytes)

                if result.code != code_pb2.OK:
                    errored_blobs_for_storage += 1

                # Keep track of the status code for this digest, preferring OK over errors
                if hdigest not in digest_result or digest_result[hdigest].code != code_pb2.OK:
                    digest_result[hdigest] = result

            if errored_blobs_for_storage > 0:
                LOGGER.warning(f"Failed to write {errored_blobs_for_storage}/{len(results)} digests to storage #{idx}")
            errored_blobs_total += errored_blobs_for_storage

        publish_counter_metric(
            CAS_REPLICATED_STORAGE_ERRORED_BLOBS_COUNT_METRIC_NAME,
            errored_blobs_total,
            {"instance_name": self._instance_name or ""},
        )
        return [digest_result[hdigest] for hdigest in digest_result]
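
    # Illustrative example for bulk_update_blobs (hypothetical blob): if a digest is written
    # OK to storage #0 but fails on storage #1, the OK status is what is returned to the caller
    # for that digest, the failure is logged against storage #1, and one blob is added to the
    # errored-blobs metric.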

    # Read blobs from all storages, replicating any blobs that are missing from some storages
    # but available in others (see the illustrative example at the end of this file).
    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
        digest_set = set(HashableDigest(hash=digest.hash, size_bytes=digest.size_bytes) for digest in digests)
        missing_blobs: Dict[int, Set[HashableDigest]] = {}
        bulk_read_results: Dict[str, bytes] = {}
        # Find which blobs are missing from this storage and read whatever is available
        for idx, storage in self._storages.items():
            missing_blobs[idx] = set(
                [
                    HashableDigest(hash=digest.hash, size_bytes=digest.size_bytes)
                    for digest in storage.missing_blobs(digests)
                ]
            )
            present_blobs = digest_set - missing_blobs[idx]
            blobs_to_read = [x.to_digest() for x in present_blobs if x.hash not in bulk_read_results]
            bulk_read_results.update(self._storages[idx].bulk_read_blobs(blobs_to_read))

        replicated_blobs_count = 0
        errored_blobs_count = 0
        for idx, missing in missing_blobs.items():
            # Write any blobs that exist in other storages but are missing from this storage
            write_batch: List[Tuple[Digest, bytes]] = []
            for digest in missing:
                if digest.hash in bulk_read_results:
                    write_batch.append((digest.to_digest(), bulk_read_results[digest.hash]))
            if write_batch:
                update_results = self._storages[idx].bulk_update_blobs(write_batch)
                for result in update_results:
                    if result.code != code_pb2.OK:
                        errored_blobs_count += 1
                    else:
                        replicated_blobs_count += 1
                LOGGER.debug(f"Replicated {len(write_batch)} blobs to storage #{idx}")

        publish_counter_metric(
            CAS_REPLICATED_STORAGE_REPLICATED_BLOBS_COUNT_METRIC_NAME,
            replicated_blobs_count,
            {"instance_name": self._instance_name or ""},
        )
        publish_counter_metric(
            CAS_REPLICATED_STORAGE_ERRORED_BLOBS_COUNT_METRIC_NAME,
            errored_blobs_count,
            {"instance_name": self._instance_name or ""},
        )
        return bulk_read_results
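
    # Illustrative example for bulk_read_blobs (hypothetical blob): with backends #0 and #1, a
    # blob present only in #0 is read from #0, returned to the caller, and written back to #1.
    # A successful write-back increments the replicated-blobs metric; a failed one increments
    # the errored-blobs metric.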