Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/sharded.py: 95.88%

97 statements

coverage.py v7.4.1, created at 2024-06-11 15:37 +0000

# Copyright (C) 2023 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import logging
from collections import defaultdict
from contextlib import ExitStack
from typing import IO, Callable, Dict, Iterable, Iterator, List, Optional, Tuple, TypeVar

import mmh3

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid._protos.google.rpc import code_pb2
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid.server.threading import ContextThreadPoolExecutor

from .storage_abc import StorageABC

LOGGER = logging.getLogger(__name__)


_T = TypeVar("_T")
_R = TypeVar("_R")


# wrapper functions for the bulk StorageABC interfaces
def _bulk_delete_for_storage(storage_digests: Tuple[StorageABC, List[Digest]]) -> List[str]:
    storage, digests = storage_digests
    return storage.bulk_delete(digests)


def _fmb_for_storage(storage_digests: Tuple[StorageABC, List[Digest]]) -> List[Digest]:
    storage, digests = storage_digests
    return storage.missing_blobs(digests)


def _bulk_update_for_storage(
    storage_digests: Tuple[StorageABC, List[Tuple[Digest, bytes]]]
) -> Tuple[StorageABC, List[Status]]:
    storage, digest_tuples = storage_digests
    return storage, storage.bulk_update_blobs(digest_tuples)


def _bulk_read_for_storage(storage_digests: Tuple[StorageABC, List[Digest]]) -> Dict[str, bytes]:
    storage, digests = storage_digests
    return storage.bulk_read_blobs(digests)


class ShardedStorage(StorageABC):
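    """A StorageABC which distributes blobs across multiple named shards.

    Each digest is assigned to exactly one of the configured child storages
    using rendezvous hashing over the shard names, so placement is
    deterministic and stable as long as the set of shard names is unchanged.
    Bulk operations are partitioned per shard and, when a thread pool size
    is configured, dispatched to the shards in parallel.
    """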

    def __init__(self, storages: Dict[str, StorageABC], thread_pool_size: Optional[int] = None):
        self._stack = ExitStack()
        if not storages:
            raise ValueError("ShardedStorage requires at least one shard")
        self._storages = storages
        self._threadpool = None
        if thread_pool_size:
            self._threadpool = ContextThreadPoolExecutor(thread_pool_size, "sharded-storage")

    def start(self) -> None:
        if self._threadpool:
            self._stack.enter_context(self._threadpool)
        for storage in self._storages.values():
            self._stack.enter_context(storage)

    def stop(self) -> None:
        self._stack.close()
        LOGGER.info(f"Stopped {type(self).__name__}")

    def _storage_from_digest(self, digest: Digest) -> StorageABC:
        # Rendezvous (highest-random-weight) hashing: score every shard name
        # against the digest and pick the shard with the lowest score. This
        # keeps the digest-to-shard mapping deterministic without any shared
        # state, and only remaps blobs when the set of shard names changes.
        def _score(shard_name: str, digest: Digest) -> int:
            return mmh3.hash(f"{shard_name}\t{digest.hash}", signed=False)

        shard_name = min(self._storages.keys(), key=lambda name: _score(name, digest))
        return self._storages[shard_name]

    def _partition_digests(self, digests: List[Digest]) -> Dict[StorageABC, List[Digest]]:
        partition: Dict[StorageABC, List[Digest]] = defaultdict(list)
        for digest in digests:
            storage = self._storage_from_digest(digest)
            partition[storage].append(digest)
        return partition

    def _map(self, fn: Callable[[_T], _R], args: Iterable[_T]) -> Iterator[_R]:
        if self._threadpool:
            return self._threadpool.map(fn, args)
        else:
            return map(fn, args)

    def has_blob(self, digest: Digest) -> bool:
        return self._storage_from_digest(digest).has_blob(digest)

    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        return self._storage_from_digest(digest).get_blob(digest)

    def delete_blob(self, digest: Digest) -> None:
        self._storage_from_digest(digest).delete_blob(digest)

    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        self._storage_from_digest(digest).commit_write(digest, write_session)

    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        failed_deletions: List[str] = []
        for result in self._map(_bulk_delete_for_storage, self._partition_digests(digests).items()):
            failed_deletions.extend(result)

        return failed_deletions

    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
        missing_blobs: List[Digest] = []

        for result in self._map(_fmb_for_storage, self._partition_digests(digests).items()):
            missing_blobs.extend(result)

        return missing_blobs

    def bulk_update_blobs(self, blobs: List[Tuple[Digest, bytes]]) -> List[Status]:
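        """Write blobs to their shards, returning statuses in input order.

        The blobs are partitioned per shard and each shard's batch is
        written via _bulk_update_for_storage (in parallel when a thread
        pool is configured). idx_map records where each blob sat in the
        caller's list so that the per-blob Status values can be scattered
        back into the original order; any position a shard fails to report
        keeps the placeholder INTERNAL status.
        """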

        partitioned_digests: Dict[StorageABC, List[Tuple[Digest, bytes]]] = defaultdict(list)
        idx_map: Dict[StorageABC, List[int]] = defaultdict(list)
        for orig_idx, digest_tuple in enumerate(blobs):
            storage = self._storage_from_digest(digest_tuple[0])
            partitioned_digests[storage].append(digest_tuple)
            idx_map[storage].append(orig_idx)

        results: List[Status] = [Status(code=code_pb2.INTERNAL, message="inconsistent batch results")] * len(blobs)
        for storage, statuses in self._map(_bulk_update_for_storage, partitioned_digests.items()):
            for status_idx, status in enumerate(statuses):
                results[idx_map[storage][status_idx]] = status
        return results

    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
        bulk_read_results: Dict[str, bytes] = {}
        for result in self._map(_bulk_read_for_storage, self._partition_digests(digests).items()):
            bulk_read_results.update(result)

        return bulk_read_results

    def set_instance_name(self, instance_name: str) -> None:
        super().set_instance_name(instance_name)
        for storage in self._storages.values():
            storage.set_instance_name(instance_name)
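
For reference, a minimal usage sketch of ShardedStorage (not part of the module above). The two LRUMemoryCache shards and the limit constructor argument are assumptions borrowed from BuildGrid's in-memory backend; any StorageABC implementations would work the same way:

import hashlib

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
from buildgrid.server.cas.storage.sharded import ShardedStorage

# Two in-memory shards (assumed backend); thread_pool_size enables parallel
# dispatch of the bulk operations across shards.
storage = ShardedStorage(
    storages={
        "shard-a": LRUMemoryCache(limit=64 * 1024 * 1024),  # `limit` is an assumption
        "shard-b": LRUMemoryCache(limit=64 * 1024 * 1024),
    },
    thread_pool_size=4,
)

storage.start()
try:
    data = b"hello, shards"
    digest = Digest(hash=hashlib.sha256(data).hexdigest(), size_bytes=len(data))

    # Rendezvous hashing sends this digest to the same shard on every call,
    # so the write and the subsequent reads agree on placement.
    storage.bulk_update_blobs([(digest, data)])
    assert storage.has_blob(digest)
    assert storage.bulk_read_blobs([digest])[digest.hash] == data
finally:
    storage.stop()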