Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/logstream/stream_storage/stream_storage_abc.py: 82.50%
80 statements
« prev ^ index » next coverage.py v6.4.1, created at 2022-06-22 21:04 +0000
« prev ^ index » next coverage.py v6.4.1, created at 2022-06-22 21:04 +0000
1# Copyright (C) 2020 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""
15StreamStorageABC
16====================
18The abstract base class for stream storage providers.
19"""
21import abc
22from typing import Optional, Iterator, NamedTuple
23from uuid import uuid4
25from buildgrid._exceptions import OutOfRangeError
class StreamHandle(NamedTuple):
    """Pair of resource names identifying a single log stream.

    Attributes:
        name (str): The resource name used to read the stream.
        write_resource_name (str): The secret resource name used only for
            writing to the stream. It is ``name`` followed by ``/`` and an
            extra random UUID, so the read name can be recovered from it.
    """
    name: str
    write_resource_name: str

    @staticmethod
    def _create_new(parent: str = '', prefix: str = '') -> 'StreamHandle':
        """Build a new handle with fresh random UUIDs.

        The read name has the form ``{parent}/logStreams/{prefix}{uuid}`` and
        the write name appends ``/{another uuid}`` to the read name.
        """
        read_name = f'{parent}/logStreams/{prefix}{uuid4()}'
        # The second, independently generated UUID is the write secret: it
        # cannot be derived from the read name alone.
        write_name = f'{read_name}/{uuid4()}'
        return StreamHandle(read_name, write_name)

    @staticmethod
    def _read_name_from_write_name(write_name: str) -> str:
        """Return the read name embedded in ``write_name``.

        Strips the final ``/``-separated component (the write secret). If
        ``write_name`` contains no ``/``, an empty string is returned.
        """
        # rpartition() yields ('', '', write_name) when there is no separator;
        # the previous rfind()-based slice would have chopped off one
        # character instead, producing a bogus but plausible-looking name.
        return write_name.rpartition('/')[0]
class StreamLength(NamedTuple):
    """Size and completion state of a stream.

    Attributes:
        length (int): The length of the stream in bytes.
        finished (bool): Whether the stream has finished.
    """
    length: int     # stream length in bytes
    finished: bool  # True once the stream has finished
class StreamChunk(NamedTuple):
    """A piece of stream data returned by the stream read methods.

    Attributes:
        data (bytes): The bytes read from the stream.
        chunk_length (int): How many bytes this chunk covers; sequential
            readers advance their read offset by this amount.
        finished (bool): Completion flag; sequential reading stops at a
            chunk with this set. NOTE(review): confirm whether this means
            "the stream is finished" or "this chunk reaches the end".
    """
    data: bytes        # payload
    chunk_length: int  # bytes covered by this chunk
    finished: bool     # completion flag (see class docstring)
class StreamStorageABC(abc.ABC):
    """
    Abstract interface for log stream storage providers.

    Streams are identified by two names (see StreamHandle): a read name and a
    secret write resource name. Several write-side helpers in this class
    accept the write resource name, derive the read name from it, and then
    delegate to an abstract ``_``-prefixed method that subclasses implement.
    """

    def create_stream(self, parent: str, prefix: str) -> StreamHandle:
        """
        Creates a stream for a specific parent with a specific prefix and a
        random UUID, and returns a StreamHandle namedtuple containing:
            name (str): The name of the stream for read access
            write_resource_name (str): The secret name used only for writing
                to the stream
        """
        new_stream_handle = StreamHandle._create_new(parent, prefix)
        # Let the backend register the stream before the names are handed out.
        self._create_stream(new_stream_handle)
        return new_stream_handle

    @abc.abstractmethod
    def _create_stream(self, stream_handle: StreamHandle):
        """
        Creates a stream with a specific StreamHandle in the StreamStorage.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def stream_exists(self, stream_name: str) -> bool:
        """
        Returns True when there is a stream with the given instance/stream_name.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def stream_finished(self, stream_name: str) -> bool:
        """
        Returns True/False depending on whether the stream has been marked
        as completed.

        Raises:
            NotFoundError when a stream with the given instance/stream_name
                does not exist.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def stream_length(self, stream_name: str) -> StreamLength:
        """
        Returns a namedtuple of type StreamLength for the stream
        with the given instance/stream_name:
            length (int): The length of the stream in bytes
            finished (bool): A boolean indicating whether the stream has finished

        Raises:
            NotFoundError when a stream with the given instance/stream_name
                does not exist.
        """
        raise NotImplementedError()

    def writeable_stream_length(self, write_resource_name: str) -> StreamLength:
        """
        Returns a namedtuple of type StreamLength for the stream
        with the given instance/write_resource_name:
            length (int): The length of the stream in bytes
            finished (bool): A boolean indicating whether the stream has finished

        The read name is derived from `write_resource_name` and the lookup is
        delegated to `stream_length`.

        Raises:
            NotFoundError when a stream with the given
                instance/write_resource_name does not exist.
        """
        read_name = StreamHandle._read_name_from_write_name(write_resource_name)
        return self.stream_length(read_name)

    def append_to_stream(self, write_resource_name: str, message: Optional[bytes] = None, *,
                         mark_finished: Optional[bool] = False) -> None:
        """
        Appends the `message` to the stream with the passed `write_resource_name`
        as long as the stream has not been marked as finished yet, and marks the
        stream as finished when `mark_finished` is set to True.

        The read name is derived from `write_resource_name`; both names are
        passed on to `_append_to_stream`.

        Raises:
            NotFoundError when a stream with the given instance/write_resource_name
                does not exist.
            StreamFinishedError when the stream was already marked as finished and
                no further writes are allowed.
            StorageFullError when the backing storage is full.
            WriteError when the write_resource_name is correct but
                the write failed for other reasons.
        """
        read_name = StreamHandle._read_name_from_write_name(write_resource_name)
        self._append_to_stream(read_name, write_resource_name, message,
                               mark_finished=mark_finished)

    @abc.abstractmethod
    def _append_to_stream(self, stream_name: str, write_resource_name: str,
                          message: Optional[bytes] = None, *,
                          mark_finished: Optional[bool] = False) -> None:
        """
        Appends the `message` to the stream with the passed `stream_name` and
        `write_resource_name` as long as the stream has not been marked as
        finished yet, and marks the stream as finished when `mark_finished`
        is set to True.

        Raises:
            NotFoundError when a stream with the given instance/write_resource_name
                does not exist.
            StreamFinishedError when the stream was already marked as finished and
                no further writes are allowed.
            StorageFullError when the backing storage is full.
            WriteError when the write_resource_name is correct but
                the write failed for other reasons.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def read_stream_chunk_blocking(
            self, stream_name: str, read_offset_bytes: int,
            read_limit_bytes: Optional[int] = None) -> StreamChunk:
        """
        Returns a committed chunk of the stream message, or waits until it is
        available, as follows:
            chunk = message[read_offset_bytes, min(read_limit_bytes, finished_stream_length) )

        When the `read_limit_bytes` argument is not set, this method will return
        the whole message starting from the specified offset, blocking until the
        stream finishes.

        In cases in which the stream has not finished and the requested chunk
        offset and size have not been committed yet, this method will block and
        wait until the chunk is committed and/or the stream is finished,
        returning the appropriate chunk.

        Raises:
            NotFoundError when a stream with the given instance/stream_name
                does not exist.
            OutOfRangeError when a finished stream does not contain a chunk of data
                at the specified offset.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def read_stream_chunk(self, stream_name: str, read_offset_bytes: int,
                          read_limit_bytes: Optional[int] = None) -> StreamChunk:
        """
        Returns a committed chunk of the stream message, or raises if it is not
        available, as follows:
            chunk = message[read_offset_bytes, min(read_limit_bytes, finished_stream_length) )

        When the `read_limit_bytes` argument is not set, this method will return
        the whole message starting from the specified offset, raising if the
        stream hasn't finished.

        Raises:
            NotFoundError when a stream with the given instance/stream_name
                does not exist.
            StreamWritePendingError when the requested chunk is not fully committed.
            OutOfRangeError when a finished stream does not contain a chunk of data
                at the specified offset.
        """
        raise NotImplementedError()

    def read_stream_bytes_blocking_iterator(
            self,
            stream_name: str,
            max_chunk_size: int,
            offset: int = 0) -> Iterator[bytes]:
        """
        An iterator yielding committed chunks of up to `max_chunk_size` bytes
        until the end of the stream. Blocks and waits until the stream finishes.

        Streaming starts at the byte position `offset` within the stream
        (the beginning of the stream by default); each subsequent read starts
        right after the previous chunk.

        Iteration ends after yielding a chunk whose `finished` flag is set, or
        when `read_stream_chunk_blocking` raises OutOfRangeError (a finished
        stream has no data at the current offset). That OutOfRangeError is
        swallowed and is NOT propagated to the caller.

        Raises:
            NotFoundError when a stream with the given instance/stream_name
                does not exist.
        """
        while True:
            # Keep the try narrow: only the read may legitimately signal the
            # end of data via OutOfRangeError.
            try:
                chunk = self.read_stream_chunk_blocking(
                    stream_name, offset, max_chunk_size)
            except OutOfRangeError:
                return
            yield chunk.data
            if chunk.finished:
                return
            offset += chunk.chunk_length

    def new_client_streaming(self, stream_name: str):
        """
        Inform the StreamStorage backend that a new client
        is streaming the stream `stream_name`, in case
        it cares about the number of clients streaming a specific
        stream.
        This can be useful for e.g. cleaning up old streams.
        No-op by default.

        Raises:
            NotFoundError when a stream with the given instance/stream_name
                does not exist.
        """

    def streaming_client_left(self, stream_name: str):
        """
        Inform the StreamStorage backend that a client
        streaming the stream `stream_name` has left, in case
        it cares about the number of clients streaming a specific
        stream.
        This can be useful for e.g. cleaning up old streams.
        No-op by default.

        Raises:
            NotFoundError when a stream with the given instance/stream_name
                does not/no longer exist(s).
        """

    def wait_for_streaming_clients(self, write_resource_name: str, timeout: int = 300) -> bool:
        """
        Returns a boolean specifying if at least one client is streaming this stream
        within the specified timeout.

        This allows the writers to only start writing when there are readers
        interested in the stream.
        Note: Accepts `write_resource_name` and calls
        `_wait_for_streaming_clients(read_name)` on the backend.
        NOTE(review): the unit of `timeout` is defined by the backend
        (presumably seconds) — confirm.

        Raises:
            NotFoundError when a stream with the given instance/stream_name
                does not exist.
        """
        read_name = StreamHandle._read_name_from_write_name(write_resource_name)
        return self._wait_for_streaming_clients(read_name, timeout)

    @abc.abstractmethod
    def _wait_for_streaming_clients(self, stream_name: str, timeout: int) -> bool:
        """
        Returns a boolean specifying if at least one client is streaming this stream
        within the specified timeout.

        This allows the writers to only start writing when there are readers
        interested in the stream.

        Raises:
            NotFoundError when a stream with the given instance/stream_name
                does not exist.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def delete_stream(self, write_resource_name: str):
        """
        Deletes the stream addressed by the given write resource name.
        Note that, unlike the other write-side helpers, this receives the
        write resource name directly; backends are responsible for mapping it
        to the stream.
        """
        raise NotImplementedError()

    def set_instance_name(self, instance_name: str) -> None:
        """
        Stores the name of the instance this storage serves in
        ``self._instance_name``.
        """
        # This method should always get called, so there's no benefit in
        # adding an __init__ to this abstract class (therefore adding the
        # need for subclasses to call `super()`) just to define a null
        # value for this.
        # pylint: disable=attribute-defined-outside-init
        self._instance_name: Optional[str] = instance_name