Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/size_differentiated.py: 97.25%

109 statements  

coverage.py v7.4.1, created at 2024-03-28 16:20 +0000

# Copyright (C) 2020 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
SizeDifferentiatedStorage
=========================

A storage provider which passes requests to other storage providers
based on the size of the blob being requested.

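A rough usage sketch (illustrative only; the backend classes and the 1 MiB
threshold here are placeholders, not provided by this module)::

    small_store = LRUMemoryCache(...)  # hypothetical in-memory backend
    large_store = DiskStorage(...)     # hypothetical on-disk backend

    storage = SizeDifferentiatedStorage(
        storages=[{"max_size": 1024 * 1024, "storage": small_store}],
        fallback=large_store,
        thread_pool_size=4,
    )

Blobs smaller than 1 MiB are written to and read from ``small_store``, while
anything at or above that size falls through to the ``fallback`` storage.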

"""

import logging
from collections import defaultdict
from contextlib import ExitStack
from typing import IO, Callable, Dict, Iterable, Iterator, List, NamedTuple, Optional, Tuple, TypedDict, TypeVar

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid._protos.google.rpc import code_pb2
from buildgrid._protos.google.rpc.status_pb2 import Status

from ...threading import ContextThreadPoolExecutor
from .storage_abc import StorageABC

LOGGER = logging.getLogger(__name__)


class SizeLimitedStorageType(TypedDict):
    max_size: int
    storage: StorageABC


# NOTE: This exists separately to the TypedDict to allow attribute-based access
# at runtime, rather than relying on string-based access to dictionary keys.
class _SizeLimitedStorage(NamedTuple):
    max_size: int
    storage: StorageABC


_T = TypeVar("_T")
_R = TypeVar("_R")


# wrapper functions for the bulk StorageABC interfaces
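# Each wrapper takes a single (storage, payload) tuple so that it can be mapped
# over the items of a per-storage partition, either via the thread pool or the
# builtin map() (see SizeDifferentiatedStorage._map below).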

def _bulk_delete_for_storage(storage_digests: Tuple[StorageABC, List[Digest]]) -> List[str]:
    storage, digests = storage_digests
    return storage.bulk_delete(digests)


def _fmb_for_storage(storage_digests: Tuple[StorageABC, List[Digest]]) -> List[Digest]:
    storage, digests = storage_digests
    return storage.missing_blobs(digests)


def _bulk_update_for_storage(
    storage_digests: Tuple[StorageABC, List[Tuple[Digest, bytes]]]
) -> Tuple[StorageABC, List[Status]]:
    storage, digest_tuples = storage_digests
    return storage, storage.bulk_update_blobs(digest_tuples)


def _bulk_read_for_storage(storage_digests: Tuple[StorageABC, List[Digest]]) -> Dict[str, bytes]:
    storage, digests = storage_digests
    return storage.bulk_read_blobs(digests)


class SizeDifferentiatedStorage(StorageABC):
    def __init__(
        self, storages: List[SizeLimitedStorageType], fallback: StorageABC, thread_pool_size: Optional[int] = None
    ) -> None:
        self._stack = ExitStack()
        self._fallback_storage = fallback
        self._storages = [_SizeLimitedStorage(**storage) for storage in storages]
        self._storages.sort(key=lambda storage: storage.max_size)
        self._threadpool: Optional[ContextThreadPoolExecutor] = None
        if thread_pool_size:
            self._threadpool = ContextThreadPoolExecutor(thread_pool_size, "size-differentiated-storage")

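    # Note on routing: self._storages is sorted by ascending max_size, so a blob
    # is placed in the smallest-bounded backend that can hold it. For example,
    # with (hypothetical) thresholds of 1 MiB and 64 MiB, a 2 MiB blob would go
    # to the 64 MiB storage and a 1 GiB blob would go to the fallback.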

    def _storage_from_digest(self, digest: Digest) -> StorageABC:
        for storage in self._storages:
            if digest.size_bytes < storage.max_size:
                return storage.storage
        # If the blob is too big for any of the size-limited storages,
        # put it in the fallback.
        return self._fallback_storage

    def _partition_digests(self, digests: List[Digest]) -> Dict[StorageABC, List[Digest]]:
        partition: Dict[StorageABC, List[Digest]] = defaultdict(list)
        for digest in digests:
            storage = self._storage_from_digest(digest)
            partition[storage].append(digest)
        return partition

    def _map(self, fn: Callable[[_T], _R], args: Iterable[_T]) -> Iterator[_R]:
        if self._threadpool:
            return self._threadpool.map(fn, args)
        else:
            return map(fn, args)

    def start(self) -> None:
        if self._threadpool:
            self._stack.enter_context(self._threadpool)
        for storage_tuple in self._storages:
            self._stack.enter_context(storage_tuple.storage)

    def stop(self) -> None:
        self._stack.close()

    def has_blob(self, digest: Digest) -> bool:
        LOGGER.debug(f"Checking for blob: [{digest}]")
        storage = self._storage_from_digest(digest)
        return storage.has_blob(digest)

    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        LOGGER.debug(f"Getting blob: [{digest}]")
        storage = self._storage_from_digest(digest)
        return storage.get_blob(digest)

    def delete_blob(self, digest: Digest) -> None:
        LOGGER.debug(f"Deleting blob: [{digest}]")
        storage = self._storage_from_digest(digest)
        storage.delete_blob(digest)

    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        LOGGER.debug(f"Writing blob: [{digest}]")
        storage = self._storage_from_digest(digest)
        storage.commit_write(digest, write_session)

    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        failed_deletions: List[str] = []
        for result in self._map(_bulk_delete_for_storage, self._partition_digests(digests).items()):
            failed_deletions.extend(result)

        return failed_deletions

    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
        missing_blobs: List[Digest] = []

        for result in self._map(_fmb_for_storage, self._partition_digests(digests).items()):
            missing_blobs.extend(result)

        return missing_blobs

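    # bulk_update_blobs partitions the blobs by backend, but the returned Status
    # list must line up with the caller's original ordering. idx_map records each
    # blob's original index so per-backend results can be scattered back into
    # place; any slot a backend fails to report is left as the INTERNAL sentinel.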

    def bulk_update_blobs(self, blobs: List[Tuple[Digest, bytes]]) -> List[Status]:
        partitioned_digests: Dict[StorageABC, List[Tuple[Digest, bytes]]] = defaultdict(list)
        idx_map: Dict[StorageABC, List[int]] = defaultdict(list)
        for orig_idx, digest_tuple in enumerate(blobs):
            storage = self._storage_from_digest(digest_tuple[0])
            partitioned_digests[storage].append(digest_tuple)
            idx_map[storage].append(orig_idx)

        results: List[Status] = [Status(code=code_pb2.INTERNAL, message="inconsistent batch results")] * len(blobs)
        for storage, statuses in self._map(_bulk_update_for_storage, partitioned_digests.items()):
            for status_idx, status in enumerate(statuses):
                results[idx_map[storage][status_idx]] = status
        return results

    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
        bulk_read_results: Dict[str, bytes] = {}
        for result in self._map(_bulk_read_for_storage, self._partition_digests(digests).items()):
            bulk_read_results.update(result)

        return bulk_read_results

    def set_instance_name(self, instance_name: str) -> None:
        super().set_instance_name(instance_name)
        for storage_tuple in self._storages:
            storage_tuple.storage.set_instance_name(instance_name)