Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/client/logstream.py: 48.39%
31 statements
« prev ^ index » next coverage.py v7.4.1, created at 2025-01-03 18:01 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2025-01-03 18:01 +0000
1# Copyright (C) 2020 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16from contextlib import contextmanager
17from typing import Iterator
19import grpc
20from grpc import RpcError
22from buildgrid._protos.build.bazel.remote.logstream.v1.remote_logstream_pb2 import CreateLogStreamRequest, LogStream
23from buildgrid._protos.build.bazel.remote.logstream.v1.remote_logstream_pb2_grpc import LogStreamServiceStub
24from buildgrid.server.app.commands.cmd_logstream import instanced_resource_name
25from buildgrid.server.logging import buildgrid_logger
# Module-level logger, created by buildgrid_logger() for this module's name.
27LOGGER = buildgrid_logger(__name__)
class LogStreamClient:
    """Small client wrapper around the LogStream service's ``CreateLogStream`` RPC.

    A ``LogStreamServiceStub`` is built on the supplied channel when the
    client is constructed and is dropped again by :meth:`close`.
    """

    def __init__(self, channel: grpc.Channel, instance_name: str = "") -> None:
        """Create the service stub on ``channel`` and remember ``instance_name``.

        Args:
            channel: gRPC channel the ``LogStreamServiceStub`` is created on.
            instance_name: Value handed to ``instanced_resource_name`` when
                building the parent resource name in :meth:`create`.
                Defaults to the empty string.
        """
        self._channel = channel
        self._instance_name = instance_name
        # close() resets this to None, hence the optional annotation.
        self._logstream_stub: LogStreamServiceStub | None = LogStreamServiceStub(self._channel)

    def create(self, parent: str) -> LogStream:
        """Send a ``CreateLogStreamRequest`` and return the resulting ``LogStream``.

        Args:
            parent: Parent resource name; it is combined with the client's
                instance name by ``instanced_resource_name`` before being
                sent (presumably prefixing the instance name -- confirm
                against that helper).

        Returns:
            The ``LogStream`` returned by the ``CreateLogStream`` RPC.

        Raises:
            AssertionError: If the client has already been closed (only
                while assertions are enabled).
            ConnectionError: If the RPC raises ``grpc.RpcError``. The message
                is the error's ``details()`` and the original ``RpcError`` is
                attached as ``__cause__``.
        """
        # Also narrows the optional stub for type checkers.
        assert self._logstream_stub, "LogStreamClient used after close"

        parent = instanced_resource_name(self._instance_name, parent)
        request = CreateLogStreamRequest(parent=parent)
        try:
            return self._logstream_stub.CreateLogStream(request)
        except RpcError as e:
            LOGGER.exception(f"Error creating a LogStream: {e.details()}")
            # Chain explicitly so the gRPC error is reported as the direct
            # cause rather than as an error raised while handling another.
            raise ConnectionError(e.details()) from e

    def close(self) -> None:
        """Drop the service stub; :meth:`create` must not be called afterwards."""
        self._logstream_stub = None
@contextmanager
def logstream_client(channel: grpc.Channel, instance_name: str) -> Iterator[LogStreamClient]:
    """Context manager yielding a ``LogStreamClient`` that is closed on exit.

    Args:
        channel: gRPC channel passed through to ``LogStreamClient``.
        instance_name: Instance name passed through to ``LogStreamClient``.

    Yields:
        The newly constructed ``LogStreamClient``.
    """
    managed = LogStreamClient(channel, instance_name)
    try:
        yield managed
    finally:
        # Runs both on normal exit and when the with-body raises.
        managed.close()