Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

# Copyright (C) 2018 Bloomberg LP 

# 

# Licensed under the Apache License, Version 2.0 (the "License"); 

# you may not use this file except in compliance with the License. 

# You may obtain a copy of the License at 

# 

# <http://www.apache.org/licenses/LICENSE-2.0> 

# 

# Unless required by applicable law or agreed to in writing, software 

# distributed under the License is distributed on an "AS IS" BASIS, 

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

# See the License for the specific language governing permissions and 

# limitations under the License. 

 

 

""" 

WithCacheStorage 

================== 

 

A storage provider that first checks a cache, then tries a slower 

fallback provider. 

 

To ensure clients can reliably store blobs in CAS, only `get_blob` 

calls are cached -- `has_blob` and `missing_blobs` will always query 

the fallback. 

""" 

 

import io 

import logging 

 

from .storage_abc import StorageABC 

 

 

class _OutputTee(io.BufferedIOBase): 

"""A file-like object that writes data to two file-like objects. 

 

The files should be in blocking mode; non-blocking mode is unsupported. 

""" 

 

def __init__(self, file_a, file_b): 

super().__init__() 

 

self._original_a = file_a 

if isinstance(file_a, io.BufferedIOBase): 

self._a = file_a 

else: 

self._a = io.BufferedWriter(file_a) 

 

self._original_b = file_b 

if isinstance(file_b, io.BufferedIOBase): 

self._b = file_b 

else: 

self._b = io.BufferedWriter(file_b) 

 

def close(self): 

super().close() 

self._a.close() 

self._b.close() 

 

def flush(self): 

self._a.flush() 

self._b.flush() 

 

def readable(self): 

return False 

 

def seekable(self): 

return False 

 

def write(self, b): 

self._a.write(b) 

return self._b.write(b) 

 

def writable(self): 

return True 

 

 

class _CachingTee(io.RawIOBase): 

"""A file-like object that wraps a 'fallback' file, and when it's 

read, writes the resulting data to a 'cache' storage provider. 

 

Does not support non-blocking mode. 

""" 

 

def __init__(self, fallback_file, digest, cache): 

super().__init__() 

 

self._file = fallback_file 

self._digest = digest 

self._cache = cache 

self._cache_session = cache.begin_write(digest) 

 

def close(self): 

super().close() 

self._cache_session.write(self._file.read()) 

self._cache.commit_write(self._digest, self._cache_session) 

self._file.close() 

 

def readable(self): 

return True 

 

def seekable(self): 

return False 

 

def writable(self): 

return False 

 

def readall(self): 

data = self._file.read() 

self._cache_session.write(data) 

return data 

 

def readinto(self, b): 

bytes_read = self._file.readinto(b) 

self._cache_session.write(b[:bytes_read]) 

return bytes_read 

 

 

class WithCacheStorage(StorageABC): 

 

def __init__(self, cache, fallback): 

self.__logger = logging.getLogger(__name__) 

 

self._cache = cache 

self._fallback = fallback 

 

def has_blob(self, digest): 

return self._fallback.has_blob(digest) 

 

def get_blob(self, digest): 

cache_result = self._cache.get_blob(digest) 

if cache_result is not None: 

return cache_result 

fallback_result = self._fallback.get_blob(digest) 

if fallback_result is None: 

return None 

return _CachingTee(fallback_result, digest, self._cache) 

 

def delete_blob(self, digest): 

self._fallback.delete_blob(digest) 

self._cache.delete_blob(digest) 

 

def begin_write(self, digest): 

return _OutputTee(self._cache.begin_write(digest), self._fallback.begin_write(digest)) 

 

def commit_write(self, digest, write_session): 

write_session.flush() 

self._cache.commit_write(digest, write_session._original_a) 

self._fallback.commit_write(digest, write_session._original_b) 

 

def missing_blobs(self, blobs): 

return self._fallback.missing_blobs(blobs) 

 

def bulk_update_blobs(self, blobs): 

self._cache.bulk_update_blobs(blobs) 

return self._fallback.bulk_update_blobs(blobs)