Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/operations/service.py: 93.88%
49 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
1# Copyright (C) 2018 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16"""
17OperationsService
18=================
20"""
22from typing import cast
24import grpc
25from google.protobuf.empty_pb2 import Empty
27from buildgrid._protos.google.longrunning.operations_pb2 import DESCRIPTOR as OPS_DESCRIPTOR
28from buildgrid._protos.google.longrunning.operations_pb2 import (
29 CancelOperationRequest,
30 DeleteOperationRequest,
31 GetOperationRequest,
32 ListOperationsRequest,
33 ListOperationsResponse,
34 Operation,
35)
36from buildgrid._protos.google.longrunning.operations_pb2_grpc import (
37 OperationsServicer,
38 add_OperationsServicer_to_server,
39)
40from buildgrid.server.decorators import rpc
41from buildgrid.server.operations.instance import OperationsInstance
42from buildgrid.server.servicer import InstancedServicer
43from buildgrid.server.settings import CLIENT_IDENTITY_HEADER_NAME, REQUEST_METADATA_HEADER_NAME
46def _parse_instance_name(operation_name: str) -> str:
47 names = operation_name.split("/")
48 return "/".join(names[:-1]) if len(names) > 1 else ""
51def _parse_operation_name(name: str) -> str:
52 names = name.split("/")
53 return names[-1] if len(names) > 1 else name
class OperationsService(OperationsServicer, InstancedServicer[OperationsInstance]):
    """gRPC servicer for the ``google.longrunning`` Operations API.

    For every RPC except ``ListOperations`` the request ``name`` is split on
    its final "/": the leading part selects the instance and the trailing
    part is the operation name passed to that instance.
    """

    SERVICE_NAME = "Operations"
    REGISTER_METHOD = add_OperationsServicer_to_server
    FULL_NAME = OPS_DESCRIPTOR.services_by_name[SERVICE_NAME].full_name

    @rpc(instance_getter=lambda r: _parse_instance_name(r.name))
    def GetOperation(self, request: GetOperationRequest, context: grpc.ServicerContext) -> Operation:
        """Look up a single operation and return it under the requested name.

        Request metadata and client identity returned with the operation are
        serialized into the call's trailing metadata when they are present.
        """
        fetched = self.current_instance.get_operation(_parse_operation_name(request.name))
        operation, metadata, client_identity = fetched

        # Hand back the fully qualified name the client asked for.
        operation.name = request.name

        # Keep the header order stable: request metadata first, then identity.
        trailers = [
            (header_name, message.SerializeToString())
            for header_name, message in (
                (REQUEST_METADATA_HEADER_NAME, metadata),
                (CLIENT_IDENTITY_HEADER_NAME, client_identity),
            )
            if message is not None
        ]
        if trailers:
            context.set_trailing_metadata(trailers)  # type: ignore[arg-type]  # tricky covariance issue

        return operation

    @rpc(instance_getter=lambda r: cast(str, r.name))
    def ListOperations(self, request: ListOperationsRequest, context: grpc.ServicerContext) -> ListOperationsResponse:
        """List operations, prefixing each returned name with the request name.

        The request name should be the collection name. In our case, this is
        just the instance name.
        """
        listing = self.current_instance.list_operations(
            request.filter,
            request.page_size,
            request.page_token,
        )

        for operation in listing.operations:
            operation.name = f"{request.name}/{operation.name}"

        return listing

    @rpc(instance_getter=lambda r: _parse_instance_name(r.name))
    def DeleteOperation(self, request: DeleteOperationRequest, context: grpc.ServicerContext) -> Empty:
        """Reject the call: mark the context UNIMPLEMENTED and return ``Empty``."""
        context.set_details("BuildGrid does not support DeleteOperation.")
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        return Empty()

    @rpc(instance_getter=lambda r: _parse_instance_name(r.name))
    def CancelOperation(self, request: CancelOperationRequest, context: grpc.ServicerContext) -> Empty:
        """Ask the current instance to cancel the named operation."""
        operation_name = _parse_operation_name(request.name)
        self.current_instance.cancel_operation(operation_name)
        return Empty()