Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# Copyright (C) 2020 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16import logging 

17from typing import TYPE_CHECKING, Optional 

18 

19from buildgrid._protos.build.bazel.remote.logstream.v1.remote_logstream_pb2 import LogStream 

20from buildgrid.server.cas.logstream.stream_storage.stream_storage_abc import StreamStorageABC 

21from buildgrid.server.cas.logstream.stream_storage.memory import MemoryStreamStorage 

22if TYPE_CHECKING: 

23 from buildgrid.server.server import Server 

24 

25 

26class LogStreamInstance: 

27 

28 def __init__(self, prefix: str, storage: Optional[StreamStorageABC]=None): 

29 self._logger = logging.getLogger(__name__) 

30 self.instance_name: Optional[str] = None 

31 self._prefix = prefix 

32 if storage is None: 

33 storage = MemoryStreamStorage() 

34 self._stream_store = storage 

35 

36 def register_instance_with_server(self, instance_name: str, server: 'Server') -> None: 

37 if self.instance_name is None: 

38 server.add_logstream_instance(self, instance_name) 

39 self.instance_name = instance_name 

40 else: 

41 raise AssertionError("Instance already registered") 

42 

43 def create_logstream(self, parent: str) -> LogStream: 

44 return LogStream(**self._stream_store.create_stream(parent, self._prefix)._asdict())