Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/lru_memory_cache.py: 94.64%
112 statements
coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
LRUMemoryCache
==================

A storage provider that stores data in memory. When the size limit
is reached, items are deleted from the cache with the least recently
used item being deleted first.
"""
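
# Illustrative usage sketch (not part of the module; the limit value and the
# digest construction below are assumptions made for the example):
#
#     data = b"hello"
#     digest = Digest(hash=HASH(data).hexdigest(), size_bytes=len(data))
#     cache = LRUMemoryCache(limit=1024 * 1024)  # capacity in bytes
#     cache.commit_write(digest, io.BytesIO(data))
#     assert cache.has_blob(digest)
#     blob = cache.get_blob(digest)  # io.BytesIO with the stored bytes, or None
#
# Once the total stored bytes would exceed the limit, the least recently used
# entries are evicted before the new blob is stored.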

import collections
import io
import threading
from typing import IO, Any, Dict, List, Optional, Tuple

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid._protos.google.rpc import code_pb2
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid.server.decorators import timed
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.metrics_names import METRIC
from buildgrid.server.settings import HASH

from .storage_abc import StorageABC

LOGGER = buildgrid_logger(__name__)


class _NullBytesIO(io.BufferedIOBase):
    """A file-like object that discards all data written to it."""

    def writable(self) -> bool:
        return True

    # TODO how to type an override here? __buffer: bytes | bytearray | memoryview | array | mmap
    def write(self, b: Any) -> int:
        return len(b)
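
# Illustrative note (not from this module): _NullBytesIO can stand in wherever
# a writable file object is required but the written data itself is not needed,
# for example:
#
#     sink = _NullBytesIO()
#     sink.write(b"discarded")  # returns 9; nothing is stored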


class LRUMemoryCache(StorageABC):
    TYPE = "LRU"

    def __init__(self, limit: int) -> None:
        self._limit = limit
        self._storage: "collections.OrderedDict[Tuple[str, int], bytes]" = collections.OrderedDict()
        self._bytes_stored = 0
        self._lock = threading.Lock()

    @timed(METRIC.STORAGE.STAT_DURATION, type=TYPE)
    def has_blob(self, digest: Digest) -> bool:
        LOGGER.debug("Checking for blob.", tags=dict(digest=digest))
        with self._lock:
            return self._has_blob(digest)

    @timed(METRIC.STORAGE.BULK_STAT_DURATION, type=TYPE)
    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
        with self._lock:
            return [digest for digest in digests if not self._has_blob(digest)]

    def _has_blob(self, digest: Digest) -> bool:
        key = (digest.hash, digest.size_bytes)
        if key in self._storage:
            self._storage.move_to_end(key)
            return True
        return False

    @timed(METRIC.STORAGE.READ_DURATION, type=TYPE)
    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        LOGGER.debug("Getting blob.", tags=dict(digest=digest))
        with self._lock:
            if (result := self._get_blob(digest)) is not None:
                return io.BytesIO(result)
            return None

    @timed(METRIC.STORAGE.BULK_READ_DURATION, type=TYPE)
    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
        with self._lock:
            return {digest.hash: result for digest in digests if (result := self._get_blob(digest)) is not None}

    def _get_blob(self, digest: Digest) -> Optional[bytes]:
        key = (digest.hash, digest.size_bytes)
        if key in self._storage:
            self._storage.move_to_end(key)
            return self._storage[key]
        return None

    @timed(METRIC.STORAGE.DELETE_DURATION, type=TYPE)
    def delete_blob(self, digest: Digest) -> None:
        LOGGER.debug("Deleting blob.", tags=dict(digest=digest))
        with self._lock:
            self._delete_blob(digest)

    @timed(METRIC.STORAGE.BULK_DELETE_DURATION, type=TYPE)
    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        with self._lock:
            for digest in digests:
                self._delete_blob(digest)
            return []

    def _delete_blob(self, digest: Digest) -> None:
        if self._storage.pop((digest.hash, digest.size_bytes), None):
            self._bytes_stored -= digest.size_bytes

    @timed(METRIC.STORAGE.WRITE_DURATION, type=TYPE)
    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        with self._lock:
            self._commit_write(digest, write_session)

    @timed(METRIC.STORAGE.BULK_WRITE_DURATION, type=TYPE)
    def bulk_update_blobs(self, blobs: List[Tuple[Digest, bytes]]) -> List[Status]:
        with self._lock:
            result = []
            for digest, data in blobs:
                if len(data) != digest.size_bytes or HASH(data).hexdigest() != digest.hash:
                    result.append(Status(code=code_pb2.INVALID_ARGUMENT, message="Data doesn't match hash"))
                    continue
                try:
                    self._commit_write(digest, io.BytesIO(data))
                    result.append(Status(code=code_pb2.OK))
                except Exception as e:
                    result.append(Status(code=code_pb2.UNKNOWN, message=str(e)))
            return result

    def _commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        LOGGER.debug("Writing blob.", tags=dict(digest=digest))
        if digest.size_bytes > self._limit:
            # We can't cache this object, so return without doing anything.
            return

        key = (digest.hash, digest.size_bytes)
        if key in self._storage:
            # Digest already in cache, mark it as recently used
            self._storage.move_to_end(key)
            return

        size_after_write = self._bytes_stored + digest.size_bytes
        if size_after_write > self._limit:
            # Evict least recently used entries until there's enough space to write this blob
            LOGGER.debug(
                "LRU cleanup triggered.",
                tags=dict(current_size=self._bytes_stored, limit=self._limit, additional_bytes=digest.size_bytes),
            )
            while size_after_write > self._limit:
                deleted_key = self._storage.popitem(last=False)[0]
                self._bytes_stored -= deleted_key[1]
                size_after_write -= deleted_key[1]
            LOGGER.debug("LRU cleanup finished.", tags=dict(current_size=self._bytes_stored))
        elif size_after_write < 0:
            # This should never happen
            LOGGER.error(
                "LRU overflow when writing additional bytes.",
                tags=dict(
                    digest=digest,
                    additional_bytes=digest.size_bytes,
                    current_size=self._bytes_stored,
                    size_after_write=size_after_write,
                ),
            )
            raise OverflowError()

        write_session.seek(0)
        self._storage[key] = write_session.read()
        self._bytes_stored += digest.size_bytes
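
# Sketch of the eviction order (the digests and 10-byte blobs below are
# hypothetical placeholders, not defined in this module; limit is assumed):
#
#     cache = LRUMemoryCache(limit=20)
#     cache.commit_write(digest_a, io.BytesIO(blob_a))  # 10 bytes stored
#     cache.commit_write(digest_b, io.BytesIO(blob_b))  # 20 bytes stored
#     cache.get_blob(digest_a)                          # marks a as most recently used
#     cache.commit_write(digest_c, io.BytesIO(blob_c))  # needs 10 more bytes: b, now the
#                                                       # least recently used entry, is evicted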