Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/lru_memory_cache.py: 94.64%

112 statements  

coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
LRUMemoryCache
==============

A storage provider that stores data in memory. When the size limit
is reached, items are deleted from the cache with the least recently
used item being deleted first.
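
A minimal usage sketch (illustrative only; ``hashlib.sha256`` is assumed to
match the configured ``HASH`` and the blob contents are made up)::

    import hashlib
    import io

    from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
    from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache

    cache = LRUMemoryCache(limit=1024)

    data = b"hello"
    digest = Digest(hash=hashlib.sha256(data).hexdigest(), size_bytes=len(data))

    cache.commit_write(digest, io.BytesIO(data))
    assert cache.has_blob(digest)

    blob = cache.get_blob(digest)
    assert blob is not None and blob.read() == data

Writes that would exceed ``limit`` evict the least recently used blobs first.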

23""" 

24 

25import collections 

26import io 

27import threading 

28from typing import IO, Any, Dict, List, Optional, Tuple 

29 

30from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest 

31from buildgrid._protos.google.rpc import code_pb2 

32from buildgrid._protos.google.rpc.status_pb2 import Status 

33from buildgrid.server.decorators import timed 

34from buildgrid.server.logging import buildgrid_logger 

35from buildgrid.server.metrics_names import METRIC 

36from buildgrid.server.settings import HASH 

37 

38from .storage_abc import StorageABC 

39 

40LOGGER = buildgrid_logger(__name__) 

41 

42 

43class _NullBytesIO(io.BufferedIOBase): 

44 """A file-like object that discards all data written to it.""" 

45 

46 def writable(self) -> bool: 

47 return True 

48 

49 # TODO how to type an override here? __buffer: bytes | bytearray | memoryview | array | mmap 

50 def write(self, b: Any) -> int: 

51 return len(b) 
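
# Illustrative only: _NullBytesIO can act as a throwaway sink, for example to
# count how many bytes a writer produces without keeping them in memory:
#
#     sink = _NullBytesIO()
#     written = sink.write(b"some data")  # returns 9; nothing is stored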


class LRUMemoryCache(StorageABC):
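    """An in-memory LRU storage provider for CAS blobs, keyed by ``(hash, size_bytes)``.

    An ``OrderedDict`` provides the LRU ordering: reads and writes call
    ``move_to_end()`` on the key they touch, and eviction pops entries from
    the opposite end with ``popitem(last=False)``.
    """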

    TYPE = "LRU"

    def __init__(self, limit: int) -> None:
        self._limit = limit
        self._storage: "collections.OrderedDict[Tuple[str, int], bytes]" = collections.OrderedDict()
        self._bytes_stored = 0
        self._lock = threading.Lock()

    @timed(METRIC.STORAGE.STAT_DURATION, type=TYPE)
    def has_blob(self, digest: Digest) -> bool:
        LOGGER.debug("Checking for blob.", tags=dict(digest=digest))
        with self._lock:
            return self._has_blob(digest)

    @timed(METRIC.STORAGE.BULK_STAT_DURATION, type=TYPE)
    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
        with self._lock:
            return [digest for digest in digests if not self._has_blob(digest)]

    def _has_blob(self, digest: Digest) -> bool:
        key = (digest.hash, digest.size_bytes)
        if key in self._storage:
            self._storage.move_to_end(key)
            return True
        return False

    @timed(METRIC.STORAGE.READ_DURATION, type=TYPE)
    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        LOGGER.debug("Getting blob.", tags=dict(digest=digest))
        with self._lock:
            if (result := self._get_blob(digest)) is not None:
                return io.BytesIO(result)
            return None

    @timed(METRIC.STORAGE.BULK_READ_DURATION, type=TYPE)
    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
        with self._lock:
            return {digest.hash: result for digest in digests if (result := self._get_blob(digest)) is not None}

    def _get_blob(self, digest: Digest) -> Optional[bytes]:
        key = (digest.hash, digest.size_bytes)
        if key in self._storage:
            self._storage.move_to_end(key)
            return self._storage[key]
        return None

    @timed(METRIC.STORAGE.DELETE_DURATION, type=TYPE)
    def delete_blob(self, digest: Digest) -> None:
        LOGGER.debug("Deleting blob.", tags=dict(digest=digest))
        with self._lock:
            self._delete_blob(digest)

    @timed(METRIC.STORAGE.BULK_DELETE_DURATION, type=TYPE)
    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        with self._lock:
            for digest in digests:
                self._delete_blob(digest)
            return []

    def _delete_blob(self, digest: Digest) -> None:
        if self._storage.pop((digest.hash, digest.size_bytes), None):
            self._bytes_stored -= digest.size_bytes

    @timed(METRIC.STORAGE.WRITE_DURATION, type=TYPE)
    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        with self._lock:
            self._commit_write(digest, write_session)

    @timed(METRIC.STORAGE.BULK_WRITE_DURATION, type=TYPE)
    def bulk_update_blobs(self, blobs: List[Tuple[Digest, bytes]]) -> List[Status]:
        with self._lock:
            result = []
            for digest, data in blobs:
                if len(data) != digest.size_bytes or HASH(data).hexdigest() != digest.hash:
                    result.append(Status(code=code_pb2.INVALID_ARGUMENT, message="Data doesn't match hash"))
                    continue
                try:
                    self._commit_write(digest, io.BytesIO(data))
                    result.append(Status(code=code_pb2.OK))
                except Exception as e:
                    result.append(Status(code=code_pb2.UNKNOWN, message=str(e)))
            return result

    def _commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        LOGGER.debug("Writing blob.", tags=dict(digest=digest))
        if digest.size_bytes > self._limit:
            # We can't cache this object, so return without doing anything.
            return

        key = (digest.hash, digest.size_bytes)
        if key in self._storage:
            # Digest already in cache, mark it as recently used
            self._storage.move_to_end(key)
            return

        size_after_write = self._bytes_stored + digest.size_bytes
        if size_after_write > self._limit:
            # Delete stuff until there's enough space to write this blob
            LOGGER.debug(
                "LRU cleanup triggered.",
                tags=dict(current_size=self._bytes_stored, limit=self._limit, additional_bytes=digest.size_bytes),
            )
            while size_after_write > self._limit:
                deleted_key = self._storage.popitem(last=False)[0]
                self._bytes_stored -= deleted_key[1]
                size_after_write -= deleted_key[1]
            LOGGER.debug("LRU cleanup finished.", tags=dict(current_size=self._bytes_stored))
        elif size_after_write < 0:
            # This should never happen

            LOGGER.error(
                "LRU overflow writing additional bytes.",
                tags=dict(
                    digest=digest,
                    additional_bytes=digest.size_bytes,
                    current_size=self._bytes_stored,
                    size_after_write=size_after_write,
                ),
            )
            raise OverflowError()


        write_session.seek(0)
        self._storage[key] = write_session.read()
        self._bytes_stored += digest.size_bytes
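

if __name__ == "__main__":
    # Illustrative smoke test sketch: exercises the LRU eviction path with a
    # deliberately tiny limit. hashlib.sha256 is assumed to match the
    # configured HASH; the blob contents are made up.
    import hashlib

    def _example_digest(data: bytes) -> Digest:
        return Digest(hash=hashlib.sha256(data).hexdigest(), size_bytes=len(data))

    cache = LRUMemoryCache(limit=10)
    first, second, third = b"aaaa", b"bbbb", b"cccc"

    cache.commit_write(_example_digest(first), io.BytesIO(first))
    cache.commit_write(_example_digest(second), io.BytesIO(second))

    # Touching the first blob makes the second blob the least recently used.
    assert cache.has_blob(_example_digest(first))

    # Writing the third blob pushes the total to 12 bytes, past the 10 byte
    # limit, so the second blob is evicted first.
    cache.commit_write(_example_digest(third), io.BytesIO(third))
    assert cache.has_blob(_example_digest(first))
    assert not cache.has_blob(_example_digest(second))
    assert cache.has_blob(_example_digest(third))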

177 self._bytes_stored += digest.size_bytes