Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/introspection/service.py: 100.00%

18 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

1# Copyright (C) 2024 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16IntrospectionService 

17==================== 

18 

19Definition of an IntrospectionService used for introspection and querying 

20of BuildGrid's internal state. 

21 

22""" 

23 

24 

25from typing import cast 

26 

27import grpc 

28 

29from buildgrid._protos.buildgrid.v2.introspection_pb2 import ( 

30 DESCRIPTOR, 

31 GetOperationFiltersRequest, 

32 ListWorkersRequest, 

33 ListWorkersResponse, 

34 OperationFilters, 

35) 

36from buildgrid._protos.buildgrid.v2.introspection_pb2_grpc import ( 

37 IntrospectionServicer, 

38 add_IntrospectionServicer_to_server, 

39) 

40from buildgrid.server.decorators import rpc 

41from buildgrid.server.introspection.instance import IntrospectionInstance 

42from buildgrid.server.servicer import InstancedServicer 

43 

44 

class IntrospectionService(IntrospectionServicer, InstancedServicer[IntrospectionInstance]):
    """gRPC servicer for the ``Introspection`` service.

    Every RPC defined here forwards to ``self.current_instance`` and returns
    whatever that instance returns.

    NOTE(review): each RPC is wrapped with ``rpc`` and given a getter that
    reads ``instance_name`` from the incoming request; presumably the
    decorator uses it to select ``current_instance`` -- confirm against
    ``buildgrid.server.decorators``.
    """

    SERVICE_NAME = "Introspection"
    REGISTER_METHOD = add_IntrospectionServicer_to_server
    # Resolved through SERVICE_NAME so the descriptor lookup key and the
    # advertised service name cannot drift apart.
    FULL_NAME = DESCRIPTOR.services_by_name[SERVICE_NAME].full_name

    @rpc(instance_getter=lambda r: cast(str, r.instance_name))
    def ListWorkers(
        self,
        request: ListWorkersRequest,
        context: grpc.ServicerContext,
    ) -> ListWorkersResponse:
        """Return the current instance's ``list_workers`` result for ``request``.

        ``context`` is not used in this method body.
        """
        instance = self.current_instance
        return instance.list_workers(request)

    @rpc(instance_getter=lambda r: cast(str, r.instance_name))
    def GetOperationFilters(
        self,
        request: GetOperationFiltersRequest,
        context: grpc.ServicerContext,
    ) -> OperationFilters:
        """Return the current instance's ``get_operation_filters`` result.

        Neither ``request`` nor ``context`` is read in this method body; the
        request is only handed to the decorator's ``instance_getter``.
        """
        instance = self.current_instance
        return instance.get_operation_filters()