Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/size_differentiated.py: 100.00%

119 statements  

coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

# Copyright (C) 2020 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
SizeDifferentiatedStorage
=========================

A storage provider which passes requests to other storage providers
based on the size of the blob being requested.

"""

from collections import defaultdict
from contextlib import ExitStack
from typing import IO, Callable, Dict, Iterable, Iterator, List, NamedTuple, Optional, Tuple, TypedDict, TypeVar

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid._protos.google.rpc import code_pb2
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid.server.decorators import timed
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.metrics_names import METRIC
from buildgrid.server.metrics_utils import publish_counter_metric
from buildgrid.server.threading import ContextThreadPoolExecutor

from .storage_abc import StorageABC

LOGGER = buildgrid_logger(__name__)

class SizeLimitedStorageType(TypedDict):
    max_size: int
    storage: StorageABC


# NOTE: This exists separately from the TypedDict to allow attribute-based access
# at runtime, rather than relying on string-based access to dictionary keys.
class _SizeLimitedStorage(NamedTuple):
    max_size: int
    storage: StorageABC
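# For illustration: configuration parsing produces plain dicts matching
# SizeLimitedStorageType (read as entry["max_size"]), which __init__ below
# converts into _SizeLimitedStorage tuples (read as entry.max_size).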

_T = TypeVar("_T")
_R = TypeVar("_R")

# Wrapper functions for the bulk StorageABC interfaces.
def _bulk_delete_for_storage(storage_digests: Tuple[StorageABC, List[Digest]]) -> List[str]:
    storage, digests = storage_digests
    return storage.bulk_delete(digests)


def _fmb_for_storage(storage_digests: Tuple[StorageABC, List[Digest]]) -> List[Digest]:
    storage, digests = storage_digests
    return storage.missing_blobs(digests)


def _bulk_update_for_storage(
    storage_digests: Tuple[StorageABC, List[Tuple[Digest, bytes]]]
) -> Tuple[StorageABC, List[Status]]:
    storage, digest_tuples = storage_digests
    return storage, storage.bulk_update_blobs(digest_tuples)


def _bulk_read_for_storage(storage_digests: Tuple[StorageABC, List[Digest]]) -> Dict[str, bytes]:
    storage, digests = storage_digests
    return storage.bulk_read_blobs(digests)

class SizeDifferentiatedStorage(StorageABC):
    TYPE = "SizeDifferentiated"

    def __init__(
        self, storages: List[SizeLimitedStorageType], fallback: StorageABC, thread_pool_size: Optional[int] = None
    ) -> None:
        self._stack = ExitStack()
        self._fallback_storage = fallback
        self._storages = [_SizeLimitedStorage(**storage) for storage in storages]
        self._storages.sort(key=lambda storage: storage.max_size)
        self._threadpool: Optional[ContextThreadPoolExecutor] = None
        if thread_pool_size:
            self._threadpool = ContextThreadPoolExecutor(thread_pool_size, "size-differentiated-storage")

    def _storage_from_digest(self, digest: Digest) -> StorageABC:
        for storage in self._storages:
            if digest.size_bytes < storage.max_size:
                return storage.storage
        # If the blob is too big for any of the size-limited storages,
        # put it in the fallback.
        return self._fallback_storage
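    # Routing sketch with hypothetical limits: given storages sorted as
    # [max_size=1024, max_size=1048576], a 200-byte blob matches the first
    # storage (200 < 1024), a 4000-byte blob matches the second, and a
    # 5 MiB blob exceeds every max_size and is routed to the fallback.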

    def _partition_digests(self, digests: List[Digest]) -> Dict[StorageABC, List[Digest]]:
        partition: Dict[StorageABC, List[Digest]] = defaultdict(list)
        for digest in digests:
            storage = self._storage_from_digest(digest)
            partition[storage].append(digest)
        return partition
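    # Under the same hypothetical configuration, a mixed list of digests is
    # grouped as {small_storage: [...], large_storage: [...], fallback: [...]},
    # so each backend receives at most one bulk call per request.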

    def _map(self, fn: Callable[[_T], _R], args: Iterable[_T]) -> Iterator[_R]:
        if self._threadpool:
            return self._threadpool.map(fn, args)
        else:
            return map(fn, args)
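    # Note: both branches return an iterator, but (assuming the standard
    # ThreadPoolExecutor.map semantics) the thread pool dispatches every
    # call up front and yields results in order, while the built-in map is
    # lazy and runs each call sequentially as the caller iterates.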

    def start(self) -> None:
        if self._threadpool:
            self._stack.enter_context(self._threadpool)
        for storage_tuple in self._storages:
            self._stack.enter_context(storage_tuple.storage)

    def stop(self) -> None:
        self._stack.close()
        LOGGER.info(f"Stopped {type(self).__name__}")

    @timed(METRIC.STORAGE.STAT_DURATION, type=TYPE)
    def has_blob(self, digest: Digest) -> bool:
        LOGGER.debug("Checking for blob.", tags=dict(digest=digest))
        storage = self._storage_from_digest(digest)
        return storage.has_blob(digest)

    @timed(METRIC.STORAGE.READ_DURATION, type=TYPE)
    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        LOGGER.debug("Getting blob.", tags=dict(digest=digest))
        storage = self._storage_from_digest(digest)
        return storage.get_blob(digest)

    @timed(METRIC.STORAGE.DELETE_DURATION, type=TYPE)
    def delete_blob(self, digest: Digest) -> None:
        LOGGER.debug("Deleting blob.", tags=dict(digest=digest))
        storage = self._storage_from_digest(digest)
        storage.delete_blob(digest)

    @timed(METRIC.STORAGE.WRITE_DURATION, type=TYPE)
    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        LOGGER.debug("Writing blob.", tags=dict(digest=digest))
        storage = self._storage_from_digest(digest)
        storage.commit_write(digest, write_session)

    @timed(METRIC.STORAGE.BULK_DELETE_DURATION, type=TYPE)
    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        failed_deletions: List[str] = []
        for result in self._map(_bulk_delete_for_storage, self._partition_digests(digests).items()):
            failed_deletions.extend(result)

        publish_counter_metric(METRIC.STORAGE.DELETE_ERRORS_COUNT, len(failed_deletions), type=self.TYPE)
        return failed_deletions

    @timed(METRIC.STORAGE.BULK_STAT_DURATION, type=TYPE)
    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
        missing_blobs: List[Digest] = []

        for result in self._map(_fmb_for_storage, self._partition_digests(digests).items()):
            missing_blobs.extend(result)

        return missing_blobs

    @timed(METRIC.STORAGE.BULK_WRITE_DURATION, type=TYPE)
    def bulk_update_blobs(self, blobs: List[Tuple[Digest, bytes]]) -> List[Status]:
        partitioned_digests: Dict[StorageABC, List[Tuple[Digest, bytes]]] = defaultdict(list)
        # Remember each blob's position in the caller's list so the statuses
        # returned by each backend can be written back in the original order.
        idx_map: Dict[StorageABC, List[int]] = defaultdict(list)
        for orig_idx, digest_tuple in enumerate(blobs):
            storage = self._storage_from_digest(digest_tuple[0])
            partitioned_digests[storage].append(digest_tuple)
            idx_map[storage].append(orig_idx)

        # Pre-fill with INTERNAL placeholders; any entry a backend fails to
        # report a status for stays marked as an inconsistent batch result.
        results: List[Status] = [Status(code=code_pb2.INTERNAL, message="inconsistent batch results")] * len(blobs)
        for storage, statuses in self._map(_bulk_update_for_storage, partitioned_digests.items()):
            for status_idx, status in enumerate(statuses):
                results[idx_map[storage][status_idx]] = status
        return results
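    # Index-mapping sketch: for blobs [b0 (small), b1 (large), b2 (small)],
    # idx_map would be {small_storage: [0, 2], large_storage: [1]}; statuses
    # from the small storage are written back to results[0] and results[2],
    # preserving the caller's ordering.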

    @timed(METRIC.STORAGE.BULK_READ_DURATION, type=TYPE)
    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
        bulk_read_results: Dict[str, bytes] = {}
        for result in self._map(_bulk_read_for_storage, self._partition_digests(digests).items()):
            bulk_read_results.update(result)

        return bulk_read_results