Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/introspection/service.py: 100.00%
18 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
1# Copyright (C) 2024 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""
16IntrospectionService
17====================
19Definition of an IntrospectionService used for introspection and querying
20of BuildGrid's internal state.
22"""
25from typing import cast
27import grpc
29from buildgrid._protos.buildgrid.v2.introspection_pb2 import (
30 DESCRIPTOR,
31 GetOperationFiltersRequest,
32 ListWorkersRequest,
33 ListWorkersResponse,
34 OperationFilters,
35)
36from buildgrid._protos.buildgrid.v2.introspection_pb2_grpc import (
37 IntrospectionServicer,
38 add_IntrospectionServicer_to_server,
39)
40from buildgrid.server.decorators import rpc
41from buildgrid.server.introspection.instance import IntrospectionInstance
42from buildgrid.server.servicer import InstancedServicer
class IntrospectionService(IntrospectionServicer, InstancedServicer[IntrospectionInstance]):
    """gRPC servicer for BuildGrid's ``Introspection`` service.

    Every RPC defined here hands its work to ``self.current_instance``.
    The ``rpc`` decorator is given a getter that reads ``instance_name`` from
    the incoming request; presumably the decorator uses that name to pick the
    current instance -- confirm against ``buildgrid.server.decorators``.
    """

    # Service identity and the hook used to register this servicer.
    SERVICE_NAME = "Introspection"
    REGISTER_METHOD = add_IntrospectionServicer_to_server
    FULL_NAME = DESCRIPTOR.services_by_name["Introspection"].full_name

    @rpc(instance_getter=lambda req: cast(str, req.instance_name))
    def ListWorkers(
        self,
        request: ListWorkersRequest,
        context: grpc.ServicerContext,
    ) -> ListWorkersResponse:
        """Return the current instance's ``list_workers`` result for *request*."""
        instance = self.current_instance
        return instance.list_workers(request)

    @rpc(instance_getter=lambda req: cast(str, req.instance_name))
    def GetOperationFilters(
        self,
        request: GetOperationFiltersRequest,
        context: grpc.ServicerContext,
    ) -> OperationFilters:
        """Return the current instance's operation filters.

        The request itself is not passed on to the instance.
        """
        instance = self.current_instance
        return instance.get_operation_filters()