Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/logstream/instance.py : 72.73%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (C) 2020 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16import logging
17from typing import TYPE_CHECKING, Optional
19from buildgrid._protos.build.bazel.remote.logstream.v1.remote_logstream_pb2 import LogStream
20from buildgrid.server.cas.logstream.stream_storage.stream_storage_abc import StreamStorageABC
21from buildgrid.server.cas.logstream.stream_storage.memory import MemoryStreamStorage
22if TYPE_CHECKING:
23 from buildgrid.server.server import Server
26class LogStreamInstance:
28 def __init__(self, prefix: str, storage: Optional[StreamStorageABC]=None):
29 self._logger = logging.getLogger(__name__)
30 self.instance_name: Optional[str] = None
31 self._prefix = prefix
32 if storage is None:
33 storage = MemoryStreamStorage()
34 self._stream_store = storage
36 def register_instance_with_server(self, instance_name: str, server: 'Server') -> None:
37 if self.instance_name is None:
38 server.add_logstream_instance(self, instance_name)
39 self.instance_name = instance_name
40 else:
41 raise AssertionError("Instance already registered")
43 def create_logstream(self, parent: str) -> LogStream:
44 return LogStream(**self._stream_store.create_stream(parent, self._prefix)._asdict())