Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/logstream/stream_storage/memory.py: 80.46% (174 statements)

# Copyright (C) 2020 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#  <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
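
"""
MemoryStreamStorage
===================

In-memory implementation of LogStream storage: each stream buffers its data
in a ``bytearray`` and supports blocking and non-blocking chunked reads, with
LRU cleanup of unused streams once a configurable limit is reached.
"""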

from collections import OrderedDict
from itertools import islice
import logging
from threading import Event, Lock
from typing import Dict, Optional

from buildgrid.settings import MAX_LOGSTREAMS_BEFORE_CLEANUP
from buildgrid._exceptions import NotFoundError, StreamAlreadyExistsError, StreamFinishedError, StreamWritePendingError
from buildgrid.server.cas.logstream.stream_storage.stream_storage_abc import StreamStorageABC, \
    StreamHandle, StreamLength, StreamChunk


class _MemoryStream:
    """A single in-memory log stream with thread-safe appends and reads."""

    def __init__(self, stream_handle: StreamHandle):
        self._stream_handle = stream_handle
        self._stream_length = StreamLength(length=0, finished=False)

        self._read_ready = Event()
        self._writer_lock = Lock()
        self._streamlength_snapshot_lock = Lock()
        self._data: bytearray = bytearray()

    @property
    def finished(self) -> bool:
        return self._stream_length.finished

    @property
    def streamlength(self) -> StreamLength:
        # Return a consistent snapshot of (length, finished) taken under the lock
        with self._streamlength_snapshot_lock:
            return StreamLength(**self._stream_length._asdict())

    @property
    def name(self) -> str:
        return self._stream_handle.name

    @property
    def write_name(self) -> str:
        return self._stream_handle.write_resource_name

    def append(self, message: Optional[bytes] = None, mark_finished: bool = False):
        if self.streamlength.finished:
            raise StreamFinishedError(
                f'The LogStream with the parent=[{self.write_name}] '
                'has already been marked as finished, cannot append')

        # Append data and update length/finished
        with self._writer_lock:
            if message:
                self._data.extend(message)
            with self._streamlength_snapshot_lock:
                new_length = self._stream_length.length
                if message:
                    new_length += len(message)
                new_finished = bool(mark_finished)

                self._stream_length = StreamLength(new_length, new_finished)

            # Inform anyone that was waiting on data to check again
            self._read_ready.set()
            self._read_ready.clear()

    def _get_chunk(self,
                   streamlength_snapshot: StreamLength,
                   offset: int,
                   end_offset: Optional[int] = None) -> Optional[StreamChunk]:
        if streamlength_snapshot.length >= offset:
            # If we have data starting at `offset`, return
            # data[offset:end_offset]. When `end_offset` exceeds the
            # stream length, this returns data up to the stream length.
            data_chunk = bytes(self._data[offset:end_offset])
            return StreamChunk(data_chunk,
                               len(data_chunk),
                               streamlength_snapshot.finished)
        elif streamlength_snapshot.finished:
            # If the stream has finished and we still didn't reach `offset`,
            # the requested bytes will never arrive
            raise StreamFinishedError(
                f'The LogStream with the parent=[{self.write_name}] '
                'has already been marked as finished and its length is '
                f'length=[{streamlength_snapshot.length}], '
                f'cannot read message at offset=[{offset}].')
        else:
            return None

    def read_chunk(self, offset: int = 0, limit: Optional[int] = None) -> StreamChunk:
        end_offset = None
        if limit:
            end_offset = offset + limit

        streamlength_snapshot = self.streamlength
        chunk = self._get_chunk(streamlength_snapshot, offset, end_offset)

        if chunk:
            return chunk

        raise StreamWritePendingError(
            f'Only length=[{streamlength_snapshot.length}] bytes '
            f'were committed to the LogStream with the parent=[{self.write_name}] '
            f'so far, writes starting at offset=[{offset}] are still pending.'
        )

    def read_chunk_blocking(self,
                            offset: int = 0,
                            limit: Optional[int] = None) -> StreamChunk:
        max_end_offset = None
        if limit:
            max_end_offset = offset + limit

        streamlength_snapshot = self.streamlength
        give_up = False
        while not give_up:
            # If the snapshot was already marked finished, this is the
            # final pass through the loop
            give_up = streamlength_snapshot.finished
            chunk = self._get_chunk(streamlength_snapshot, offset, max_end_offset)
            if chunk:
                return chunk
            if not self._read_ready.wait(timeout=600):
                break
            # New data was appended; refresh the snapshot before retrying
            streamlength_snapshot = self.streamlength

        return StreamChunk(data=bytes(),
                           chunk_length=streamlength_snapshot.length,
                           finished=streamlength_snapshot.finished)
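
# A minimal illustrative sketch of the append/read handshake implemented by
# _MemoryStream (not part of the module; assumes StreamHandle can be built
# from just `name` and `write_resource_name`):
#
#     handle = StreamHandle(name='parent/logStreams/1',
#                           write_resource_name='parent/logStreams/1/write')
#     stream = _MemoryStream(handle)
#     stream.append(b'hello ')
#     stream.append(b'world', mark_finished=True)
#     stream.read_chunk(offset=0)    # -> StreamChunk(b'hello world', 11, True)
#     stream.read_chunk(offset=20)   # raises StreamFinishedError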


class MemoryStreamStorage(StreamStorageABC):
    """In-memory StreamStorage backend with LRU cleanup of unused streams."""

    def __init__(self, max_streams_count: int = MAX_LOGSTREAMS_BEFORE_CLEANUP):
        self._logger = logging.getLogger(__name__)
        self._streams: Dict[str, _MemoryStream] = {}
        self._streams_readers: OrderedDict[str, int] = OrderedDict()
        self._streams_readers_lock = Lock()
        self._streams_readers_changed_event = Event()
        self._max_stream_count = max_streams_count
        self._cleanup_max_batch_size = max_streams_count // 2

    def _lru_cleanup(self):
        if len(self._streams_readers) < self._max_stream_count:
            return

        self._logger.info(f"Running LRU cleanup of unused LogStreams; "
                          f"max_stream_count=[{self._max_stream_count}], "
                          f"cleanup_max_batch_size=[{self._cleanup_max_batch_size}]")

        streams_to_remove = []
        changes = False
        with self._streams_readers_lock:
            # Figure out which streams to clean up by iterating through;
            # the dictionary must not be mutated during iteration, so
            # collect the names first
            stream_iterator = islice(self._streams_readers.items(), 0, None)
            while len(streams_to_remove) < self._cleanup_max_batch_size:
                try:
                    stream_name, reader_count = next(stream_iterator)
                except StopIteration:
                    break

                if reader_count == 0:
                    streams_to_remove.append(stream_name)

            # Now loop through the stream names that were marked for
            # removal and actually clean up
            for stream_name in streams_to_remove:
                self._delete_stream(stream_name, delete_readers=True)
                changes = True

        if len(streams_to_remove) == 0:
            self._logger.info("Cleaned up n=[0] LogStreams, "
                              "all LogStreams in use.")
        else:
            self._logger.info(f"Cleaned up n=[{len(streams_to_remove)}] LogStreams.")

        if changes:
            self._streams_readers_changed_event.set()
            self._streams_readers_changed_event.clear()

    def _create_stream(self, stream_handle: StreamHandle):
        if stream_handle.name in self._streams:
            raise StreamAlreadyExistsError(
                f'A LogStream with the parent [{stream_handle.name}] already exists, '
                'and should be used instead of creating a new stream.')
        self._lru_cleanup()
        self._streams[stream_handle.name] = _MemoryStream(stream_handle)
        self._streams_readers[stream_handle.name] = 0
        self._logger.debug(f'Stream created: {stream_handle}')

    def stream_exists(self, stream_name: str) -> bool:
        return stream_name in self._streams

    def new_client_streaming(self, stream_name: str):
        if stream_name in self._streams:
            with self._streams_readers_lock:
                self._streams_readers[stream_name] += 1
                self._streams_readers_changed_event.set()
                self._streams_readers_changed_event.clear()
        else:
            raise NotFoundError(
                f'A LogStream with the parent [{stream_name}] doesn\'t exist.')

    def streaming_client_left(self, stream_name: str):
        if stream_name in self._streams:
            with self._streams_readers_lock:
                self._streams_readers[stream_name] -= 1
                self._streams_readers_changed_event.set()
                self._streams_readers_changed_event.clear()
        else:
            raise NotFoundError(
                f'A LogStream with the parent [{stream_name}] doesn\'t exist.')

    def stream_finished(self, stream_name: str) -> bool:
        if stream_name in self._streams:
            return self._streams[stream_name].finished
        raise NotFoundError(
            f'A LogStream with the parent [{stream_name}] doesn\'t exist')

    def stream_length(self, stream_name: str) -> StreamLength:
        if stream_name in self._streams:
            return self._streams[stream_name].streamlength
        raise NotFoundError(
            f'A LogStream with the parent [{stream_name}] doesn\'t exist')

    def _append_to_stream(self, stream_name: str, write_resource_name: str,
                          message: Optional[bytes] = None,
                          *, mark_finished: bool = False) -> None:
        if stream_name not in self._streams or \
                self._streams[stream_name].write_name != write_resource_name:
            raise NotFoundError(
                f'A LogStream with the parent [{write_resource_name}] doesn\'t exist'
            )
        self._streams[stream_name].append(message, mark_finished)

    def read_stream_chunk_blocking(
            self, stream_name: str, read_offset_bytes: int,
            read_limit_bytes: Optional[int] = None) -> StreamChunk:
        if stream_name not in self._streams:
            raise NotFoundError(
                f'A LogStream with the parent [{stream_name}] doesn\'t exist')
        return self._streams[stream_name].read_chunk_blocking(
            read_offset_bytes, read_limit_bytes)

    def read_stream_chunk(self, stream_name: str, read_offset_bytes: int,
                          read_limit_bytes: Optional[int] = None) -> StreamChunk:
        if stream_name not in self._streams:
            raise NotFoundError(
                f'A LogStream with the parent [{stream_name}] doesn\'t exist')
        return self._streams[stream_name].read_chunk(read_offset_bytes, read_limit_bytes)

    def _delete_stream(self, stream_name: str, delete_readers: bool = True):
        # Evict the actual stream from memory
        self._logger.debug(f"Deleting stream: {stream_name}")
        try:
            del self._streams[stream_name]
        except KeyError:
            # Already deleted
            pass
        # Evict the reader counter as well
        if delete_readers:
            try:
                del self._streams_readers[stream_name]
            except KeyError:
                pass

    def _wait_for_streaming_clients(self, stream_name: str, timeout: int) -> bool:
        if stream_name in self._streams_readers:
            n_streaming = self._streams_readers.get(stream_name, 0)
            if n_streaming < 1:
                # Wait for the reader counts to change, then re-check
                self._streams_readers_changed_event.wait(timeout=timeout)
                try:
                    n_streaming = self._streams_readers[stream_name]
                except KeyError:
                    raise NotFoundError(
                        f'The LogStream with parent [{stream_name}] was deleted')
            return bool(n_streaming)
        raise NotFoundError(
            f'A LogStream with the parent [{stream_name}] doesn\'t exist')

    def delete_stream(self, write_resource_name: str):
        raise NotImplementedError()
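

# A minimal, hypothetical usage sketch of the storage above. StreamHandle is
# assumed to be constructible from just `name` and `write_resource_name`;
# production callers would go through the public StreamStorageABC interface
# rather than the private helpers used here for illustration.
if __name__ == '__main__':
    storage = MemoryStreamStorage(max_streams_count=4)
    handle = StreamHandle(name='parent/logStreams/1',
                          write_resource_name='parent/logStreams/1/write')
    storage._create_stream(handle)
    storage._append_to_stream(handle.name, handle.write_resource_name,
                              b'build output', mark_finished=True)
    chunk = storage.read_stream_chunk(handle.name, read_offset_bytes=0)
    assert chunk.finished and chunk.data == b'build output'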