Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17WithCacheStorage 

18================== 

19 

20A storage provider that first checks a cache, then tries a slower 

21fallback provider. 

22 

23To ensure clients can reliably store blobs in CAS, only `get_blob` 

24calls are cached -- `has_blob` and `missing_blobs` will always query 

25the fallback. 

26""" 

27 

28import io 

29import logging 

30 

31from .storage_abc import StorageABC 

32 

33 

class _OutputTee(io.BufferedIOBase):
    """A write-only file-like object that duplicates writes to two files.

    Each target that is not already an ``io.BufferedIOBase`` is wrapped in
    an ``io.BufferedWriter``. The objects originally passed in stay
    reachable through ``_original_a`` and ``_original_b``.

    The files should be in blocking mode; non-blocking mode is unsupported.
    """

    def __init__(self, file_a, file_b):
        """Create a tee writing to both ``file_a`` and ``file_b``."""
        super().__init__()

        self._original_a = file_a
        self._a = self._buffered(file_a)

        self._original_b = file_b
        self._b = self._buffered(file_b)

    @staticmethod
    def _buffered(target):
        """Return ``target`` itself if already buffered, else wrap it."""
        if isinstance(target, io.BufferedIOBase):
            return target
        return io.BufferedWriter(target)

    def close(self):
        """Close the tee and both underlying files.

        Both targets are closed even when an earlier step raises, so a
        failure on one side cannot leak the other side's file.
        """
        try:
            super().close()
        finally:
            try:
                self._a.close()
            finally:
                self._b.close()

    def flush(self):
        """Flush both underlying files."""
        self._a.flush()
        self._b.flush()

    def readable(self):
        """Return False; the tee is write-only."""
        return False

    def seekable(self):
        """Return False; the tee cannot seek."""
        return False

    def write(self, b):
        """Write ``b`` to both files and return the second file's count."""
        self._a.write(b)
        return self._b.write(b)

    def writable(self):
        """Return True; the tee accepts writes."""
        return True

76 

77 

class _CachingTee(io.RawIOBase):
    """A read-only file-like object that wraps a 'fallback' file and, as
    it is read, writes the resulting data to a 'cache' storage provider.

    A cache write session for ``digest`` is opened on construction. On
    close, whatever is left unread in the fallback file is drained into
    that session and the session is committed to the cache.

    Does not support non-blocking mode.
    """

    def __init__(self, fallback_file, digest, cache):
        """Wrap ``fallback_file`` and begin a cache write for ``digest``."""
        super().__init__()

        self._file = fallback_file
        self._digest = digest
        self._cache = cache
        self._cache_session = cache.begin_write(digest)

    def close(self):
        """Drain the fallback file into the cache, commit, and close.

        Only the first call has any effect, so a repeated close (explicit
        or from the finalizer) neither reads from the closed fallback file
        nor commits the cache entry a second time.
        """
        if self.closed:
            return
        super().close()
        try:
            # Copy the unread remainder so the session holds all bytes
            # from the fallback file, not just what the caller consumed.
            self._cache_session.write(self._file.read())
            self._cache.commit_write(self._digest, self._cache_session)
        finally:
            # Release the fallback file even if draining/committing fails.
            self._file.close()

    def readable(self):
        """Return True; the tee supports reading."""
        return True

    def seekable(self):
        """Return False; the tee cannot seek."""
        return False

    def writable(self):
        """Return False; the tee is read-only."""
        return False

    def readall(self):
        """Read the rest of the fallback file, mirroring it to the cache."""
        data = self._file.read()
        self._cache_session.write(data)
        return data

    def readinto(self, b):
        """Fill ``b`` from the fallback file and mirror the bytes read.

        Returns whatever the fallback file's ``readinto`` returned.
        """
        bytes_read = self._file.readinto(b)
        if bytes_read:
            # Only the filled prefix of the buffer is real data.
            self._cache_session.write(b[:bytes_read])
        return bytes_read

117 

118 

class WithCacheStorage(StorageABC):
    """Storage provider that layers a cache in front of a fallback.

    Reads consult the cache first and go to the fallback on a miss.
    Writes and deletes are applied to both providers. Existence queries
    (``has_blob`` and ``missing_blobs``) are answered by the fallback only.
    """

    def __init__(self, cache, fallback):
        """Store the ``cache`` and ``fallback`` storage providers."""
        self.__logger = logging.getLogger(__name__)

        self._cache = cache
        self._fallback = fallback

    def has_blob(self, digest):
        """Return the fallback's answer for ``digest``; cache is skipped."""
        return self._fallback.has_blob(digest)

    def get_blob(self, digest):
        """Return a readable object for ``digest``, or None if not found.

        A cache hit is returned directly. On a miss the fallback's result
        is wrapped in a ``_CachingTee`` so the data is written to the cache
        as it is read.
        """
        cache_result = self._cache.get_blob(digest)
        if cache_result is not None:
            return cache_result
        fallback_result = self._fallback.get_blob(digest)
        if fallback_result is None:
            return None
        return _CachingTee(fallback_result, digest, self._cache)

    def delete_blob(self, digest):
        """Delete ``digest`` from the fallback, then from the cache."""
        self._fallback.delete_blob(digest)
        self._cache.delete_blob(digest)

    def bulk_delete(self, digests):
        """Delete ``digests`` from the cache, then from the fallback."""
        self._cache.bulk_delete(digests)
        self._fallback.bulk_delete(digests)

    def begin_write(self, digest):
        """Return a write session that feeds both cache and fallback."""
        return _OutputTee(self._cache.begin_write(digest), self._fallback.begin_write(digest))

    def commit_write(self, digest, write_session):
        """Flush ``write_session`` and commit it to both providers.

        ``write_session`` must be the object returned by ``begin_write``:
        each provider is handed back the session it originally created
        (``_original_a`` for the cache, ``_original_b`` for the fallback).
        """
        write_session.flush()
        self._cache.commit_write(digest, write_session._original_a)
        self._fallback.commit_write(digest, write_session._original_b)

    def missing_blobs(self, blobs):
        """Return the fallback's ``missing_blobs`` result; cache is skipped."""
        return self._fallback.missing_blobs(blobs)

    def bulk_update_blobs(self, blobs):
        """Store ``blobs`` in both providers; return the fallback's result."""
        self._cache.bulk_update_blobs(blobs)
        return self._fallback.bulk_update_blobs(blobs)

    def bulk_read_blobs(self, digests):
        """Read many blobs, using the fallback for anything not cached.

        Returns the mapping produced by the cache's ``bulk_read_blobs``,
        updated with the fallback's results for every digest whose
        ``hash`` key was absent or None in the cache's mapping.
        """
        # ``digests`` is iterated twice (cache lookup, then miss filter),
        # so materialise it once: a one-shot iterable would otherwise be
        # exhausted by the first pass and nothing would reach the fallback.
        digests = list(digests)
        cache_blobs = self._cache.bulk_read_blobs(digests)
        # Pass a concrete list rather than a lazy filter object so the
        # fallback may iterate or measure it as often as it needs.
        uncached_digests = [
            digest for digest in digests
            if cache_blobs.get(digest.hash) is None
        ]
        fallback_blobs = self._fallback.bulk_read_blobs(uncached_digests)
        cache_blobs.update(fallback_blobs)
        return cache_blobs

165 lambda digest: cache_blobs.get(digest.hash, None) is None, 

166 digests 

167 ) 

168 fallback_blobs = self._fallback.bulk_read_blobs(uncached_digests) 

169 cache_blobs.update(fallback_blobs) 

170 return cache_blobs