Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/lru_memory_cache.py: 93.94%
66 statements
coverage.py v7.4.1, created at 2024-03-27 12:56 +0000
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
LRUMemoryCache
==============

A storage provider that stores data in memory. When the size limit
is reached, items are deleted from the cache, with the least recently
used item being deleted first.
"""
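
# A minimal usage sketch (illustrative only; the hash below is a placeholder
# rather than a real SHA-256 hex digest):
#
#     cache = LRUMemoryCache(limit=1024)
#     digest = Digest(hash="aaaa", size_bytes=3)
#     cache.commit_write(digest, io.BytesIO(b"abc"))
#     blob = cache.get_blob(digest)  # an io.BytesIO wrapping b"abc"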

import collections
import io
import logging
import threading
from typing import IO, Any, Optional, Tuple

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest

from .storage_abc import StorageABC

LOGGER = logging.getLogger(__name__)


class _NullBytesIO(io.BufferedIOBase):
    """A file-like object that discards all data written to it."""

    def writable(self) -> bool:
        return True

    # TODO how to type an override here? __buffer: bytes | bytearray | memoryview | array | mmap
    def write(self, b: Any) -> int:
        return len(b)
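
# A minimal sketch of how _NullBytesIO might be used, e.g. to drain a stream
# while counting its bytes (illustrative only, not part of the original module):
#
#     sink = _NullBytesIO()
#     written = sink.write(b"some payload")  # returns 12; the bytes are discarded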


class LRUMemoryCache(StorageABC):
    def __init__(self, limit: int) -> None:
        self._limit = limit
        self._storage: "collections.OrderedDict[Tuple[str, int], bytes]" = collections.OrderedDict()
        self._bytes_stored = 0
        self._lock = threading.Lock()
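
    # The OrderedDict doubles as the LRU index: its iteration order runs from
    # least to most recently used, so move_to_end() marks an entry as fresh
    # and popitem(last=False) evicts the coldest one.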

    def has_blob(self, digest: Digest) -> bool:
        LOGGER.debug(f"Checking for blob: [{digest}]")
        with self._lock:
            key = (digest.hash, digest.size_bytes)
            result = key in self._storage
            if result:
                self._storage.move_to_end(key)
            return result

    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        LOGGER.debug(f"Getting blob: [{digest}]")
        with self._lock:
            key = (digest.hash, digest.size_bytes)
            if key in self._storage:
                self._storage.move_to_end(key)
                return io.BytesIO(self._storage[key])
            return None

    def delete_blob(self, digest: Digest) -> None:
        LOGGER.debug(f"Deleting blob: [{digest}]")
        key = (digest.hash, digest.size_bytes)
        with self._lock:
            deleted_blob = self._storage.pop(key, None)
            # Compare against None explicitly: an empty blob (b"") is falsy
            # but still counts as a successful deletion.
            if deleted_blob is not None:
                self._bytes_stored -= digest.size_bytes

    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        LOGGER.debug(f"Writing blob: [{digest}]")
        if digest.size_bytes > self._limit:
            # We can't cache this object, so return without doing anything.
            return
        with self._lock:
            key = (digest.hash, digest.size_bytes)
            if key in self._storage:
                # Digest already in cache, mark it as recently used
                self._storage.move_to_end(key)
                return

            size_after_write = self._bytes_stored + digest.size_bytes
            if size_after_write > self._limit:
                # Delete stuff until there's enough space to write this blob
                LOGGER.debug(
                    f"LRU cleanup triggered. current_size=[{self._bytes_stored}], "
                    f"limit=[{self._limit}], additional_bytes=[{digest.size_bytes}]"
                )
                while size_after_write > self._limit:
                    deleted_key = self._storage.popitem(last=False)[0]
                    self._bytes_stored -= deleted_key[1]
                    size_after_write -= deleted_key[1]
                LOGGER.debug(f"LRU cleanup finished, current_size=[{self._bytes_stored}]")
            elif size_after_write < 0:
                # This should never happen
                LOGGER.error(
                    f"Overflow: writing additional_bytes=[{digest.size_bytes}] "
                    f"causes the current_size=[{self._bytes_stored}] to become "
                    f"size_after_write=[{size_after_write}]"
                )
                raise OverflowError()

            write_session.seek(0)
            self._storage[key] = write_session.read()
            self._bytes_stored += digest.size_bytes
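

if __name__ == "__main__":
    # A minimal sketch exercising the eviction path (illustrative only, not
    # part of the original module; the hashes are placeholders, not real
    # SHA-256 digests).
    cache = LRUMemoryCache(limit=10)

    small = Digest(hash="aaaa", size_bytes=6)
    cache.commit_write(small, io.BytesIO(b"foobar"))
    assert cache.has_blob(small)

    # Writing 8 more bytes pushes the total past the 10-byte limit, so the
    # least recently used entry (small) is evicted to make room.
    big = Digest(hash="bbbb", size_bytes=8)
    cache.commit_write(big, io.BytesIO(b"12345678"))
    assert not cache.has_blob(small)
    assert cache.has_blob(big)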