Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/replicated.py: 96.62%

148 statements  

coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

# Copyright (C) 2024 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
ReplicatedStorage
=========================

A storage provider which stores data in multiple storages, replicating
any data missing in some but present in others.

"""


from contextlib import ExitStack
from typing import IO, Dict, List, Optional, Set, Tuple

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid._protos.google.rpc import code_pb2
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid.server.decorators import timed
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.metrics_names import METRIC
from buildgrid.server.metrics_utils import publish_counter_metric
from buildgrid.server.utils.digests import HashableDigest

from .storage_abc import StorageABC

LOGGER = buildgrid_logger(__name__)


class ReplicatedStorage(StorageABC):
    TYPE = "Replicated"

    def __init__(self, storages: List[StorageABC]) -> None:
        self._stack = ExitStack()
        self._storages = dict(enumerate(storages))

    def start(self) -> None:
        for storage in self._storages.values():
            self._stack.enter_context(storage)

    def stop(self) -> None:
        self._stack.close()

    @timed(METRIC.STORAGE.STAT_DURATION, type=TYPE)
    def has_blob(self, digest: Digest) -> bool:
        has_blob: Set[int] = set(i for i in self._storages if self._storages[i].has_blob(digest))
        missing_blob = set(self._storages.keys()) - has_blob
        if len(missing_blob) < len(self._storages):
            publish_counter_metric(METRIC.STORAGE.REPLICATED.REQUIRED_REPLICATION_COUNT, len(missing_blob))
        return len(has_blob) > 0

    @timed(METRIC.STORAGE.READ_DURATION, type=TYPE)
    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        has_blob: Set[int] = set(i for i in self._storages if self._storages[i].has_blob(digest))
        missing_blob = set(self._storages.keys()) - has_blob
        blob = None
        failed_writes = 0
        for idx in has_blob:
            if blob := self._storages[idx].get_blob(digest):
                break
            LOGGER.error(
                "Storage shard reported digest exists but downloading failed.",
                tags=dict(shard_index=idx, digest=digest),
            )
            missing_blob.add(idx)
        if len(missing_blob) < len(self._storages):
            assert blob is not None
            for idx in missing_blob:
                try:
                    self._storages[idx].commit_write(digest, blob)
                    LOGGER.debug("Replicated digest to storage shard.", tags=dict(shard_index=idx, digest=digest))
                except Exception as e:
                    LOGGER.warning(
                        f"Failed to replicate digest to storage shard: {e}.", tags=dict(shard_index=idx, digest=digest)
                    )
                    failed_writes += 1
                blob.seek(0)

        publish_counter_metric(METRIC.STORAGE.REPLICATED.REPLICATION_ERROR_COUNT, failed_writes)
        publish_counter_metric(METRIC.STORAGE.REPLICATED.REPLICATION_COUNT, len(missing_blob) - failed_writes)
        return blob

    @timed(METRIC.STORAGE.DELETE_DURATION, type=TYPE)
    def delete_blob(self, digest: Digest) -> None:
        for storage in self._storages.values():
            storage.delete_blob(digest)

    @timed(METRIC.STORAGE.WRITE_DURATION, type=TYPE)
    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        failed_writes = 0
        error_msgs: List[str] = []
        for idx, storage in self._storages.items():
            try:
                storage.commit_write(digest, write_session)
            except Exception as error:
                LOGGER.warning(
                    f"Failed to write digest to storage shard: {error}", tags=dict(shard_index=idx, digest=digest)
                )
                error_msgs.append(str(error))
                failed_writes += 1
            write_session.seek(0)

        publish_counter_metric(METRIC.STORAGE.REPLICATED.REPLICATION_ERROR_COUNT, failed_writes)
        if failed_writes == len(self._storages):
            error_string = "Writes to all storages failed with the following errors:\n"
            error_string += "\n".join(error_msgs)
            LOGGER.error(error_string)
            raise RuntimeError(error_string)

    @timed(METRIC.STORAGE.BULK_DELETE_DURATION, type=TYPE)
    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        failed_deletions_set: Set[str] = set()
        for storage in self._storages.values():
            # Accumulate failures in place; set.union would return a new set and discard it.
            failed_deletions_set.update(storage.bulk_delete(digests))
        publish_counter_metric(METRIC.STORAGE.DELETE_ERRORS_COUNT, len(failed_deletions_set), type=self.TYPE)
        return list(failed_deletions_set)

    @timed(METRIC.STORAGE.BULK_STAT_DURATION, type=TYPE)
    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
        """Call missing_blobs on each storage and only report a blob as missing if it's in none of the
        storages. The number of blobs missing from a storage but present in others is logged and published
        as a metric.
        """
        digest_set: Set[HashableDigest] = {HashableDigest(digest.hash, digest.size_bytes) for digest in digests}
        missing_from_all_storages = digest_set.copy()
        missing_for_storage: Dict[int, Set[HashableDigest]] = {}

        blobs_needing_replication_count = 0
        for idx, storage in self._storages.items():
            response = storage.missing_blobs(digests)
            missing_for_storage[idx] = {HashableDigest(digest.hash, digest.size_bytes) for digest in response}

        missing_from_all_storages = set.intersection(*missing_for_storage.values())
        digests_in_at_least_one_storage: Set[HashableDigest] = digest_set - missing_from_all_storages
        # Figure out which blobs are present in some storages but not all, to report metrics
        for idx, missing_digests in missing_for_storage.items():
            inconsistent_digests_for_storage = missing_digests & digests_in_at_least_one_storage
            if inconsistent_digests_for_storage:
                LOGGER.info(
                    "Storage shard has blobs which need to be replicated.",
                    tags=dict(shard_index=idx, digest_count=len(inconsistent_digests_for_storage)),
                )
                blobs_needing_replication_count += len(inconsistent_digests_for_storage)

        publish_counter_metric(METRIC.STORAGE.REPLICATED.REQUIRED_REPLICATION_COUNT, blobs_needing_replication_count)
        missing_blobs_list = [hdigest.to_digest() for hdigest in missing_from_all_storages]
        return missing_blobs_list

    # Bulk write to all storages. Errors are not fatal as long as one storage is
    # successfully written to
    @timed(METRIC.STORAGE.BULK_WRITE_DURATION, type=TYPE)
    def bulk_update_blobs(self, blobs: List[Tuple[Digest, bytes]]) -> List[Status]:
        digest_result: Dict[HashableDigest, Status] = {}
        errored_blobs_total = 0
        for idx, storage in self._storages.items():
            errored_blobs_for_storage = 0
            results = storage.bulk_update_blobs(blobs)
            for digest_blob_tuple, result in zip(blobs, results):
                digest, _ = digest_blob_tuple
                hdigest = HashableDigest(hash=digest.hash, size_bytes=digest.size_bytes)

                if result.code != code_pb2.OK:
                    errored_blobs_for_storage += 1

                # Keep track of the status code for this digest, preferring OK over errors
                if hdigest not in digest_result or digest_result[hdigest].code != code_pb2.OK:
                    digest_result[hdigest] = result

            if errored_blobs_for_storage > 0:
                LOGGER.warning(
                    "Failed to write all digests to storage shard.",
                    tags=dict(shard_index=idx, digest_count=len(results), error_count=errored_blobs_for_storage),
                )
            errored_blobs_total += errored_blobs_for_storage

        publish_counter_metric(METRIC.STORAGE.REPLICATED.REPLICATION_ERROR_COUNT, errored_blobs_total)
        return [digest_result[hdigest] for hdigest in digest_result]

    # Read blobs from all storages, copying any blobs that are missing from some
    # storages into them from the storages that do have them
    @timed(METRIC.STORAGE.BULK_READ_DURATION, type=TYPE)
    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
        digest_set = set(HashableDigest(hash=digest.hash, size_bytes=digest.size_bytes) for digest in digests)
        missing_blobs: Dict[int, Set[HashableDigest]] = {}
        bulk_read_results: Dict[str, bytes] = {}
        # Find which blobs are missing from this storage and read what's available
        for idx, storage in self._storages.items():
            missing_blobs[idx] = set(
                [
                    HashableDigest(hash=digest.hash, size_bytes=digest.size_bytes)
                    for digest in storage.missing_blobs(digests)
                ]
            )
            present_blobs = digest_set - missing_blobs[idx]
            blobs_to_read = [x.to_digest() for x in present_blobs if x.hash not in bulk_read_results]
            bulk_read_results.update(self._storages[idx].bulk_read_blobs(blobs_to_read))

        replicated_blobs_count = 0
        errored_blobs_count = 0
        for idx, missing in missing_blobs.items():
            # Write any blobs that exist in other storages but are missing from this storage
            write_batch: List[Tuple[Digest, bytes]] = []
            for digest in missing:
                if digest.hash in bulk_read_results:
                    write_batch.append((digest.to_digest(), bulk_read_results[digest.hash]))
            if write_batch:
                update_results = self._storages[idx].bulk_update_blobs(write_batch)
                for result in update_results:
                    if result.code != code_pb2.OK:
                        errored_blobs_count += 1
                    else:
                        replicated_blobs_count += 1
                LOGGER.debug(
                    "Replicated blobs to storage shard.", tags=dict(shard_index=idx, digest_count=len(write_batch))
                )

        publish_counter_metric(METRIC.STORAGE.REPLICATED.REPLICATION_COUNT, replicated_blobs_count)
        publish_counter_metric(METRIC.STORAGE.REPLICATED.REPLICATION_ERROR_COUNT, errored_blobs_count)
        return bulk_read_results
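
The class above is driven entirely through the StorageABC interface, so a rough usage sketch looks like the following. The primary and secondary names are hypothetical placeholders for whatever concrete StorageABC backends a deployment actually configures; only the ReplicatedStorage calls themselves come from the code above.

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid.server.cas.storage.replicated import ReplicatedStorage

# primary / secondary are hypothetical placeholders for any two concrete
# StorageABC implementations configured elsewhere in the deployment.
storage = ReplicatedStorage([primary, secondary])

storage.start()  # enters each underlying storage's context via the ExitStack
try:
    digest = Digest(hash="a" * 64, size_bytes=11)
    # Reported as missing only if the blob is absent from every underlying storage.
    missing = storage.missing_blobs([digest])
    # Reads from the first shard that has the blob and replicates it to shards that don't.
    blob = storage.get_blob(digest)
finally:
    storage.stop()  # closes the ExitStack, stopping every underlying storage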