Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/size_differentiated.py: 100.00%

127 statements  

coverage.py v7.4.1, created at 2025-05-28 16:48 +0000

# Copyright (C) 2020 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
SizeDifferentiatedStorage
=========================

A storage provider which passes requests to other storage providers
based on the size of the blob being requested.

"""
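
# Example configuration (an illustrative sketch; the three storage instances
# named below are hypothetical placeholders, not part of this module):
#
#     storage = SizeDifferentiatedStorage(
#         storages=[
#             {"max_size": 1024, "storage": small_blob_storage},
#             {"max_size": 1024 * 1024, "storage": medium_blob_storage},
#         ],
#         fallback=large_blob_storage,
#         thread_pool_size=4,
#     )
#
# With this configuration, blobs smaller than 1 KiB are routed to
# small_blob_storage, blobs smaller than 1 MiB to medium_blob_storage, and
# anything larger to large_blob_storage.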

from collections import defaultdict
from contextlib import ExitStack
from typing import IO, Callable, Iterable, Iterator, NamedTuple, TypedDict, TypeVar

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid._protos.google.rpc import code_pb2
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid.server.decorators import timed
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.metrics_names import METRIC
from buildgrid.server.metrics_utils import publish_counter_metric
from buildgrid.server.threading import ContextThreadPoolExecutor

from .storage_abc import StorageABC

LOGGER = buildgrid_logger(__name__)


class SizeLimitedStorageType(TypedDict):
    max_size: int
    storage: StorageABC


# NOTE: This exists separately to the TypedDict to allow attribute-based access
# at runtime, rather than relying on string-based access to dictionary keys.
class _SizeLimitedStorage(NamedTuple):
    max_size: int
    storage: StorageABC


_T = TypeVar("_T")
_R = TypeVar("_R")


# wrapper functions for the bulk StorageABC interfaces
def _bulk_delete_for_storage(storage_digests: tuple[StorageABC, list[Digest]]) -> list[str]:
    storage, digests = storage_digests
    return storage.bulk_delete(digests)


def _fmb_for_storage(storage_digests: tuple[StorageABC, list[Digest]]) -> list[Digest]:
    storage, digests = storage_digests
    return storage.missing_blobs(digests)


def _bulk_update_for_storage(
    storage_digests: tuple[StorageABC, list[tuple[Digest, bytes]]],
) -> tuple[StorageABC, list[Status]]:
    storage, digest_tuples = storage_digests
    return storage, storage.bulk_update_blobs(digest_tuples)


def _bulk_read_for_storage(storage_digests: tuple[StorageABC, list[Digest]]) -> dict[str, bytes]:
    storage, digests = storage_digests
    return storage.bulk_read_blobs(digests)


class SizeDifferentiatedStorage(StorageABC):
    TYPE = "SizeDifferentiated"

    def __init__(
        self, storages: list[SizeLimitedStorageType], fallback: StorageABC, thread_pool_size: int | None = None
    ) -> None:
        self._stack = ExitStack()
        self._fallback_storage = fallback
        self._storages = [_SizeLimitedStorage(**storage) for storage in storages]
        self._storages.sort(key=lambda storage: storage.max_size)
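        # The storages are kept sorted by ascending max_size so that
        # _storage_from_digest returns the smallest storage able to hold a blob.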

        self._threadpool: ContextThreadPoolExecutor | None = None
        if thread_pool_size:
            self._threadpool = ContextThreadPoolExecutor(thread_pool_size, "size-differentiated-storage")

    def _storage_from_digest(self, digest: Digest) -> StorageABC:
        for storage in self._storages:
            if digest.size_bytes < storage.max_size:
                return storage.storage
        # If the blob is too big for any of the size-limited storages,
        # put it in the fallback.
        return self._fallback_storage

    def _partition_digests(self, digests: list[Digest]) -> dict[StorageABC, list[Digest]]:
        partition: dict[StorageABC, list[Digest]] = defaultdict(list)
        for digest in digests:
            storage = self._storage_from_digest(digest)
            partition[storage].append(digest)
        return partition
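    # Fan per-storage batches out over the thread pool when one is configured,
    # falling back to a plain sequential map otherwise.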

    def _map(self, fn: Callable[[_T], _R], args: Iterable[_T]) -> Iterator[_R]:
        if self._threadpool:
            return self._threadpool.map(fn, args)
        else:
            return map(fn, args)

    def start(self) -> None:
        if self._threadpool:
            self._stack.enter_context(self._threadpool)
        for storage_tuple in self._storages:
            self._stack.enter_context(storage_tuple.storage)

    def stop(self) -> None:
        self._stack.close()
        LOGGER.info(f"Stopped {type(self).__name__}")

    @timed(METRIC.STORAGE.STAT_DURATION, type=TYPE)
    def has_blob(self, digest: Digest) -> bool:
        LOGGER.debug("Checking for blob.", tags=dict(digest=digest))
        storage = self._storage_from_digest(digest)
        return storage.has_blob(digest)

    @timed(METRIC.STORAGE.READ_DURATION, type=TYPE)
    def get_blob(self, digest: Digest) -> IO[bytes] | None:
        LOGGER.debug("Getting blob.", tags=dict(digest=digest))
        storage = self._storage_from_digest(digest)
        return storage.get_blob(digest)

    @timed(METRIC.STORAGE.STREAM_READ_DURATION, type=TYPE)
    def stream_read_blob(self, digest: Digest, chunk_size: int, offset: int = 0, limit: int = 0) -> Iterator[bytes]:
        LOGGER.debug("Streaming blob.", tags=dict(digest=digest))
        storage = self._storage_from_digest(digest)
        yield from storage.stream_read_blob(digest, chunk_size, offset, limit)

    @timed(METRIC.STORAGE.STREAM_WRITE_DURATION, type=TYPE)
    def stream_write_blob(self, digest: Digest, chunks: Iterator[bytes]) -> None:
        self._storage_from_digest(digest).stream_write_blob(digest, chunks)

    @timed(METRIC.STORAGE.DELETE_DURATION, type=TYPE)
    def delete_blob(self, digest: Digest) -> None:
        LOGGER.debug("Deleting blob.", tags=dict(digest=digest))
        storage = self._storage_from_digest(digest)
        storage.delete_blob(digest)

    @timed(METRIC.STORAGE.WRITE_DURATION, type=TYPE)
    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        LOGGER.debug("Writing blob.", tags=dict(digest=digest))
        storage = self._storage_from_digest(digest)
        storage.commit_write(digest, write_session)

    @timed(METRIC.STORAGE.BULK_DELETE_DURATION, type=TYPE)
    def bulk_delete(self, digests: list[Digest]) -> list[str]:
        failed_deletions: list[str] = []
        for result in self._map(_bulk_delete_for_storage, self._partition_digests(digests).items()):
            failed_deletions.extend(result)

        publish_counter_metric(METRIC.STORAGE.DELETE_ERRORS_COUNT, len(failed_deletions), type=self.TYPE)
        return failed_deletions

    @timed(METRIC.STORAGE.BULK_STAT_DURATION, type=TYPE)
    def missing_blobs(self, digests: list[Digest]) -> list[Digest]:
        missing_blobs: list[Digest] = []

        for result in self._map(_fmb_for_storage, self._partition_digests(digests).items()):
            missing_blobs.extend(result)

        return missing_blobs

    @timed(METRIC.STORAGE.BULK_WRITE_DURATION, type=TYPE)
    def bulk_update_blobs(self, blobs: list[tuple[Digest, bytes]]) -> list[Status]:
        partitioned_digests: dict[StorageABC, list[tuple[Digest, bytes]]] = defaultdict(list)
        idx_map: dict[StorageABC, list[int]] = defaultdict(list)
        for orig_idx, digest_tuple in enumerate(blobs):
            storage = self._storage_from_digest(digest_tuple[0])
            partitioned_digests[storage].append(digest_tuple)
            idx_map[storage].append(orig_idx)
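        # idx_map remembers each blob's original index so that the per-storage
        # results gathered below can be written back in the caller's order.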

        results: list[Status] = [Status(code=code_pb2.INTERNAL, message="inconsistent batch results")] * len(blobs)
        for storage, statuses in self._map(_bulk_update_for_storage, partitioned_digests.items()):
            for status_idx, status in enumerate(statuses):
                results[idx_map[storage][status_idx]] = status
        return results

    @timed(METRIC.STORAGE.BULK_READ_DURATION, type=TYPE)
    def bulk_read_blobs(self, digests: list[Digest]) -> dict[str, bytes]:
        bulk_read_results: dict[str, bytes] = {}
        for result in self._map(_bulk_read_for_storage, self._partition_digests(digests).items()):
            bulk_read_results.update(result)

        return bulk_read_results