Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/lru_memory_cache.py: 93.94% (66 statements)

# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
LRUMemoryCache
==============

A storage provider that stores data in memory. When the size limit
is reached, items are deleted from the cache with the least recently
used item being deleted first.
"""

import collections
import io
import logging
import threading
from typing import IO, Any, Optional, Tuple

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest

from .storage_abc import StorageABC

LOGGER = logging.getLogger(__name__)


class _NullBytesIO(io.BufferedIOBase):
    """A file-like object that discards all data written to it."""

    def writable(self) -> bool:
        return True

    # TODO how to type an override here? __buffer: bytes | bytearray | memoryview | array | mmap
    def write(self, b: Any) -> int:
        return len(b)


class LRUMemoryCache(StorageABC):
    def __init__(self, limit: int) -> None:
        self._limit = limit
        self._storage: "collections.OrderedDict[Tuple[str, int], bytes]" = collections.OrderedDict()
        self._bytes_stored = 0
        self._lock = threading.Lock()

    def has_blob(self, digest: Digest) -> bool:
        LOGGER.debug(f"Checking for blob: [{digest}]")
        with self._lock:
            key = (digest.hash, digest.size_bytes)
            result = key in self._storage
            if result:
                self._storage.move_to_end(key)
            return result

    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        LOGGER.debug(f"Getting blob: [{digest}]")
        with self._lock:
            key = (digest.hash, digest.size_bytes)
            if key in self._storage:
                self._storage.move_to_end(key)
                return io.BytesIO(self._storage[key])
            return None

    def delete_blob(self, digest: Digest) -> None:
        LOGGER.debug(f"Deleting blob: [{digest}]")
        key = (digest.hash, digest.size_bytes)
        with self._lock:
            deleted_blob = self._storage.pop(key, None)
            # Compare against None explicitly: an empty blob (b"") is falsy
            # but its removal should still be treated as a successful delete.
            if deleted_blob is not None:
                self._bytes_stored -= digest.size_bytes

    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        LOGGER.debug(f"Writing blob: [{digest}]")
        if digest.size_bytes > self._limit:
            # We can't cache this object, so return without doing anything.
            return
        with self._lock:
            key = (digest.hash, digest.size_bytes)
            if key in self._storage:
                # Digest already in cache, mark it as recently used
                self._storage.move_to_end(key)
                return

            size_after_write = self._bytes_stored + digest.size_bytes
            if size_after_write > self._limit:
                # Delete stuff until there's enough space to write this blob
                LOGGER.debug(
                    f"LRU cleanup triggered. current_size=[{self._bytes_stored}], "
                    f"limit=[{self._limit}], additional_bytes=[{digest.size_bytes}]"
                )
                while size_after_write > self._limit:
                    # Evict the least recently used entry; the second element
                    # of its key records the blob's size in bytes.
                    deleted_key = self._storage.popitem(last=False)[0]
                    self._bytes_stored -= deleted_key[1]
                    size_after_write -= deleted_key[1]
                LOGGER.debug(f"LRU cleanup finished, current_size=[{self._bytes_stored}]")
            elif size_after_write < 0:
                # This should never happen
                LOGGER.error(
                    f"Overflow: writing additional_bytes=[{digest.size_bytes}] "
                    f"causes the current_size=[{self._bytes_stored}] to become "
                    f"size_after_write=[{size_after_write}]"
                )
                raise OverflowError()

            write_session.seek(0)
            self._storage[key] = write_session.read()
            self._bytes_stored += digest.size_bytes
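

# A minimal, hedged usage sketch (not part of the library) exercising the LRU
# eviction behaviour described in the module docstring. The `_digest` helper
# is an illustrative assumption; only `Digest.hash` and `Digest.size_bytes`
# come from the REAPI protobuf imported above.
if __name__ == "__main__":
    import hashlib

    def _digest(data: bytes) -> Digest:
        # Hypothetical helper for this sketch: build a Digest from raw bytes.
        return Digest(hash=hashlib.sha256(data).hexdigest(), size_bytes=len(data))

    cache = LRUMemoryCache(limit=8)
    first, second, third = b"aaaa", b"bbbb", b"cccc"
    cache.commit_write(_digest(first), io.BytesIO(first))
    cache.commit_write(_digest(second), io.BytesIO(second))

    # Touching `first` makes `second` the least recently used entry.
    assert cache.has_blob(_digest(first))

    # A third 4-byte blob exceeds the 8-byte limit, so `second` is evicted.
    cache.commit_write(_digest(third), io.BytesIO(third))
    assert not cache.has_blob(_digest(second))
    assert cache.has_blob(_digest(first)) and cache.has_blob(_digest(third))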