Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/size_differentiated.py: 97.27% (110 statements)

coverage.py v7.4.1, created at 2024-06-11 15:37 +0000

# Copyright (C) 2020 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
SizeDifferentiatedStorage
=========================

A storage provider which passes requests to other storage providers
based on the size of the blob being requested.

"""
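
# A minimal usage sketch (illustrative only; ``small_backend``, ``large_backend``
# and ``fallback_backend`` stand in for StorageABC implementations configured
# elsewhere):
#
#     storage = SizeDifferentiatedStorage(
#         storages=[
#             {"max_size": 64 * 1024, "storage": small_backend},
#             {"max_size": 64 * 1024 * 1024, "storage": large_backend},
#         ],
#         fallback=fallback_backend,
#     )
#
# With this configuration, blobs smaller than 64 KiB are routed to
# ``small_backend``, blobs smaller than 64 MiB to ``large_backend``, and
# everything else to ``fallback_backend``.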

import logging
from collections import defaultdict
from contextlib import ExitStack
from typing import IO, Callable, Dict, Iterable, Iterator, List, NamedTuple, Optional, Tuple, TypedDict, TypeVar

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid._protos.google.rpc import code_pb2
from buildgrid._protos.google.rpc.status_pb2 import Status

from ...threading import ContextThreadPoolExecutor
from .storage_abc import StorageABC

LOGGER = logging.getLogger(__name__)


class SizeLimitedStorageType(TypedDict):
    max_size: int
    storage: StorageABC


# NOTE: This exists separately to the TypedDict to allow attribute-based access
# at runtime, rather than relying on string-based access to dictionary keys.
class _SizeLimitedStorage(NamedTuple):
    max_size: int
    storage: StorageABC


_T = TypeVar("_T")
_R = TypeVar("_R")


# Wrapper functions for the bulk StorageABC interfaces, each taking its
# arguments as a single tuple so it can be dispatched through
# SizeDifferentiatedStorage._map below.
def _bulk_delete_for_storage(storage_digests: Tuple[StorageABC, List[Digest]]) -> List[str]:
    storage, digests = storage_digests
    return storage.bulk_delete(digests)


def _fmb_for_storage(storage_digests: Tuple[StorageABC, List[Digest]]) -> List[Digest]:
    storage, digests = storage_digests
    return storage.missing_blobs(digests)


def _bulk_update_for_storage(
    storage_digests: Tuple[StorageABC, List[Tuple[Digest, bytes]]]
) -> Tuple[StorageABC, List[Status]]:
    storage, digest_tuples = storage_digests
    return storage, storage.bulk_update_blobs(digest_tuples)


def _bulk_read_for_storage(storage_digests: Tuple[StorageABC, List[Digest]]) -> Dict[str, bytes]:
    storage, digests = storage_digests
    return storage.bulk_read_blobs(digests)


class SizeDifferentiatedStorage(StorageABC):
    def __init__(
        self, storages: List[SizeLimitedStorageType], fallback: StorageABC, thread_pool_size: Optional[int] = None
    ) -> None:
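        # Storage buckets are sorted by ascending max_size so that
        # _storage_from_digest always selects the smallest bucket a blob fits
        # into; blobs too large for every bucket are routed to the fallback.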

        self._stack = ExitStack()
        self._fallback_storage = fallback
        self._storages = [_SizeLimitedStorage(**storage) for storage in storages]
        self._storages.sort(key=lambda storage: storage.max_size)
        self._threadpool: Optional[ContextThreadPoolExecutor] = None
        if thread_pool_size:
            self._threadpool = ContextThreadPoolExecutor(thread_pool_size, "size-differentiated-storage")

    def _storage_from_digest(self, digest: Digest) -> StorageABC:
        for storage in self._storages:
            if digest.size_bytes < storage.max_size:
                return storage.storage
        # If the blob is too big for any of the size-limited storages,
        # put it in the fallback.
        return self._fallback_storage

    def _partition_digests(self, digests: List[Digest]) -> Dict[StorageABC, List[Digest]]:
        partition: Dict[StorageABC, List[Digest]] = defaultdict(list)
        for digest in digests:
            storage = self._storage_from_digest(digest)
            partition[storage].append(digest)
        return partition

    def _map(self, fn: Callable[[_T], _R], args: Iterable[_T]) -> Iterator[_R]:
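        # Fan the per-storage calls out to the thread pool when one is
        # configured; otherwise fall back to the builtin map(), which runs
        # them serially as the results are consumed.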

        if self._threadpool:
            return self._threadpool.map(fn, args)
        else:
            return map(fn, args)

    def start(self) -> None:
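        # Register the thread pool (if configured) and each size-limited
        # backend with the exit stack, so that stop() can tear them down again.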

        if self._threadpool:
            self._stack.enter_context(self._threadpool)
        for storage_tuple in self._storages:
            self._stack.enter_context(storage_tuple.storage)

    def stop(self) -> None:
        self._stack.close()
        LOGGER.info(f"Stopped {type(self).__name__}")

    def has_blob(self, digest: Digest) -> bool:
        LOGGER.debug(f"Checking for blob: [{digest}]")
        storage = self._storage_from_digest(digest)
        return storage.has_blob(digest)

    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        LOGGER.debug(f"Getting blob: [{digest}]")
        storage = self._storage_from_digest(digest)
        return storage.get_blob(digest)

    def delete_blob(self, digest: Digest) -> None:
        LOGGER.debug(f"Deleting blob: [{digest}]")
        storage = self._storage_from_digest(digest)
        storage.delete_blob(digest)

    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        LOGGER.debug(f"Writing blob: [{digest}]")
        storage = self._storage_from_digest(digest)
        storage.commit_write(digest, write_session)

    def bulk_delete(self, digests: List[Digest]) -> List[str]:
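        # Group the digests by target backend, run each backend's bulk_delete
        # (in parallel when a thread pool is configured), and collect the
        # failures reported by each backend.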

        failed_deletions: List[str] = []
        for result in self._map(_bulk_delete_for_storage, self._partition_digests(digests).items()):
            failed_deletions.extend(result)

        return failed_deletions

    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
        missing_blobs: List[Digest] = []

        for result in self._map(_fmb_for_storage, self._partition_digests(digests).items()):
            missing_blobs.extend(result)

        return missing_blobs

    def bulk_update_blobs(self, blobs: List[Tuple[Digest, bytes]]) -> List[Status]:
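        # Split the blobs across backends by size, remembering each blob's
        # position in the original request so that the per-backend statuses can
        # be stitched back together in order. Positions a backend fails to
        # report keep the INTERNAL placeholder status.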

        partitioned_digests: Dict[StorageABC, List[Tuple[Digest, bytes]]] = defaultdict(list)
        idx_map: Dict[StorageABC, List[int]] = defaultdict(list)
        for orig_idx, digest_tuple in enumerate(blobs):
            storage = self._storage_from_digest(digest_tuple[0])
            partitioned_digests[storage].append(digest_tuple)
            idx_map[storage].append(orig_idx)

        results: List[Status] = [Status(code=code_pb2.INTERNAL, message="inconsistent batch results")] * len(blobs)
        for storage, statuses in self._map(_bulk_update_for_storage, partitioned_digests.items()):
            for status_idx, status in enumerate(statuses):
                results[idx_map[storage][status_idx]] = status
        return results

    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
        bulk_read_results: Dict[str, bytes] = {}
        for result in self._map(_bulk_read_for_storage, self._partition_digests(digests).items()):
            bulk_read_results.update(result)

        return bulk_read_results

    def set_instance_name(self, instance_name: str) -> None:
        super().set_instance_name(instance_name)
        for storage_tuple in self._storages:
            storage_tuple.storage.set_instance_name(instance_name)