Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/logstream/service.py: 93.33%

30 statements  

« prev     ^ index     » next       coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

1# Copyright (C) 2020 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16import logging 

17 

18from buildgrid._exceptions import InvalidArgumentError 

19from buildgrid._protos.build.bazel.remote.logstream.v1 import remote_logstream_pb2_grpc 

20from buildgrid._protos.build.bazel.remote.logstream.v1.remote_logstream_pb2 import ( 

21 CreateLogStreamRequest, 

22 LogStream 

23) 

24from buildgrid.server._authentication import AuthContext, authorize 

25from buildgrid.server.cas.logstream.instance import LogStreamInstance 

26from buildgrid.server.metrics_names import ( 

27 LOGSTREAM_CREATE_LOG_STREAM_TIME_METRIC_NAME 

28) 

29from buildgrid.server.metrics_utils import DurationMetric 

30 

31 

class LogStreamService(remote_logstream_pb2_grpc.LogStreamServiceServicer):
    """LogStream servicer that dispatches requests to named instances.

    Instances are registered with :meth:`add_instance` and looked up by the
    instance name embedded in each request's ``parent`` field.
    """

    def __init__(self, server):
        """Create the service and register it with ``server``.

        Args:
            server: Object passed to
                ``add_LogStreamServiceServicer_to_server`` (presumably the
                gRPC server hosting this service -- confirm with callers).
        """
        self._logger = logging.getLogger(__name__)
        # Maps instance name -> LogStreamInstance; populated by add_instance().
        self._instances = {}

        remote_logstream_pb2_grpc.add_LogStreamServiceServicer_to_server(self, server)

    def add_instance(self, name: str, instance: LogStreamInstance) -> None:
        """Register ``instance`` under ``name``.

        An instance previously registered under the same name is replaced.
        """
        self._instances[name] = instance

    @authorize(AuthContext)
    @DurationMetric(LOGSTREAM_CREATE_LOG_STREAM_TIME_METRIC_NAME)
    def CreateLogStream(self, request: CreateLogStreamRequest, context) -> LogStream:
        """Create a log stream on the instance named in ``request.parent``.

        ``request.parent`` is split at its last ``/``: the text before it is
        the instance name and the text after it is handed to the instance's
        ``create_logstream``. Without a ``/``, the instance name is the empty
        string and the whole ``parent`` value is passed through.

        Args:
            request: The incoming ``CreateLogStreamRequest``.
            context: Call context; only ``context.peer()`` is used here, for
                the debug log line.

        Returns:
            The result of ``create_logstream`` on the selected instance.

        Raises:
            InvalidArgumentError: If no instance is registered under the
                resolved instance name.
        """
        self._logger.debug(f"CreateLogStream request from [{context.peer()}]")

        instance_name = ''
        parent = request.parent
        if '/' in parent:
            # rsplit keeps any earlier slashes as part of the instance name.
            instance_name, parent = parent.rsplit("/", 1)
        instance = self._get_instance(instance_name)

        return instance.create_logstream(parent)

    def _get_instance(self, instance_name: str) -> LogStreamInstance:
        """Return the instance registered under ``instance_name``.

        Raises:
            InvalidArgumentError: If no such instance has been registered.
        """
        try:
            return self._instances[instance_name]
        except KeyError:
            # The KeyError adds nothing beyond the message below, so suppress
            # it to keep the traceback from reading like a second failure.
            raise InvalidArgumentError(f"No instance named [{instance_name}]") from None