Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/lru_memory_cache.py: 95.59%

68 statements  

« prev     ^ index     » next       coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17LRUMemoryCache 

18================== 

19 

20A storage provider that stores data in memory. When the size limit 

21is reached, items are deleted from the cache with the least recently 

22used item being deleted first. 

23""" 

24 

25import collections 

26import io 

27import logging 

28import threading 

29 

30from .storage_abc import StorageABC 

31 

32 

33class _NullBytesIO(io.BufferedIOBase): 

34 """A file-like object that discards all data written to it.""" 

35 

36 def writable(self): 

37 return True 

38 

39 def write(self, b): 

40 return len(b) 

41 

42 

class LRUMemoryCache(StorageABC):
    """In-memory CAS storage with least-recently-used eviction.

    Blobs are held in an ``OrderedDict`` keyed by ``(hash, size_bytes)``;
    the dict's insertion order doubles as the LRU order, with the oldest
    (least recently used) entry first.  All bookkeeping is guarded by a
    single lock, so the cache can be shared between threads.
    """

    def __init__(self, limit):
        """Initialize an empty cache.

        Args:
            limit (int): maximum total number of blob bytes to retain.
        """
        self.__logger = logging.getLogger(__name__)

        self._limit = limit
        self._storage = collections.OrderedDict()
        self._bytes_stored = 0
        self._lock = threading.Lock()
        # Public alias kept for backwards compatibility with callers that
        # reached for ``.logger``; ``getLogger(__name__)`` returns the same
        # logger object for the same name, so this is the identical logger.
        self.logger = self.__logger

    def has_blob(self, digest):
        """Return True if ``digest`` is cached, marking it recently used."""
        self.__logger.debug(f"Checking for blob: [{digest}]")
        with self._lock:
            key = (digest.hash, digest.size_bytes)
            result = key in self._storage
            if result:
                # A successful membership check counts as a "use" for LRU.
                self._storage.move_to_end(key)
            return result

    def get_blob(self, digest):
        """Return a readable file object for ``digest``, or None if absent."""
        self.__logger.debug(f"Getting blob: [{digest}]")
        with self._lock:
            key = (digest.hash, digest.size_bytes)
            if key in self._storage:
                self._storage.move_to_end(key)
                return io.BytesIO(self._storage[key])
            return None

    def delete_blob(self, digest):
        """Remove ``digest`` from the cache if present (no-op otherwise)."""
        self.__logger.debug(f"Deleting blob: [{digest}]")
        key = (digest.hash, digest.size_bytes)
        with self._lock:
            # Compare against None explicitly: a zero-byte blob is stored as
            # b"", which is falsy but is still a successful pop.
            deleted_blob = self._storage.pop(key, None)
            if deleted_blob is not None:
                self._bytes_stored -= digest.size_bytes

    def begin_write(self, digest):
        """Return a writable session object for uploading ``digest``."""
        if digest.size_bytes > self._limit:
            # Don't try to cache objects bigger than our memory limit.
            return _NullBytesIO()
        return io.BytesIO()

    def commit_write(self, digest, write_session):
        """Store the bytes written to ``write_session`` under ``digest``.

        Evicts least-recently-used entries as needed to stay within the
        configured byte limit.  Oversized writes (which received a
        ``_NullBytesIO`` session from ``begin_write``) are discarded.

        Raises:
            OverflowError: if the size accounting would go negative, which
                indicates corrupted bookkeeping and should never happen.
        """
        self.__logger.debug(f"Writing blob: [{digest}]")
        if isinstance(write_session, _NullBytesIO):
            # We can't cache this object, so return without doing anything.
            return
        with self._lock:
            key = (digest.hash, digest.size_bytes)
            if key in self._storage:
                # Digest already in cache, mark it as recently used
                self._storage.move_to_end(key)
                return

            size_after_write = self._bytes_stored + digest.size_bytes
            if size_after_write > self._limit:
                # Delete stuff until there's enough space to write this blob.
                # (Fixed: the original log message was missing the closing
                # bracket after additional_bytes.)
                self.__logger.debug(f"LRU cleanup triggered. current_size=[{self._bytes_stored}], "
                                    f"limit=[{self._limit}], additional_bytes=[{digest.size_bytes}]")
                while size_after_write > self._limit:
                    # popitem(last=False) evicts the oldest entry; the second
                    # element of its key is the evicted blob's size in bytes.
                    deleted_key = self._storage.popitem(last=False)[0]
                    self._bytes_stored -= deleted_key[1]
                    size_after_write -= deleted_key[1]
                self.__logger.debug(f"LRU cleanup finished, current_size=[{self._bytes_stored}]")
            elif size_after_write < 0:
                # This should never happen
                self.__logger.error(f"Overflow: writing a additional_bytes=[{digest.size_bytes}] "
                                    f"causes the current_size=[{self._bytes_stored}] to become "
                                    f"size_after_write=[{size_after_write}]")
                raise OverflowError()

            self._storage[key] = write_session.getvalue()
            self._bytes_stored += digest.size_bytes