Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/operations/service.py: 93.88%

49 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17OperationsService 

18================= 

19 

20""" 

21 

22from typing import cast 

23 

24import grpc 

25from google.protobuf.empty_pb2 import Empty 

26 

27from buildgrid._protos.google.longrunning.operations_pb2 import DESCRIPTOR as OPS_DESCRIPTOR 

28from buildgrid._protos.google.longrunning.operations_pb2 import ( 

29 CancelOperationRequest, 

30 DeleteOperationRequest, 

31 GetOperationRequest, 

32 ListOperationsRequest, 

33 ListOperationsResponse, 

34 Operation, 

35) 

36from buildgrid._protos.google.longrunning.operations_pb2_grpc import ( 

37 OperationsServicer, 

38 add_OperationsServicer_to_server, 

39) 

40from buildgrid.server.decorators import rpc 

41from buildgrid.server.operations.instance import OperationsInstance 

42from buildgrid.server.servicer import InstancedServicer 

43from buildgrid.server.settings import CLIENT_IDENTITY_HEADER_NAME, REQUEST_METADATA_HEADER_NAME 

44 

45 

46def _parse_instance_name(operation_name: str) -> str: 

47 names = operation_name.split("/") 

48 return "/".join(names[:-1]) if len(names) > 1 else "" 

49 

50 

51def _parse_operation_name(name: str) -> str: 

52 names = name.split("/") 

53 return names[-1] if len(names) > 1 else name 

54 

55 

class OperationsService(OperationsServicer, InstancedServicer[OperationsInstance]):
    """gRPC servicer for the ``google.longrunning`` Operations API.

    For Get/Delete/Cancel the request name is split at its last ``/``: the
    leading part is given to the ``rpc`` decorator via ``instance_getter`` and
    the trailing part is the operation name passed to ``self.current_instance``.
    For List the whole request name is used as the instance name.

    NOTE(review): ``self.current_instance`` is presumably supplied by
    ``InstancedServicer`` based on ``instance_getter`` -- confirm against
    ``buildgrid.server.servicer``.
    """

    SERVICE_NAME = "Operations"
    REGISTER_METHOD = add_OperationsServicer_to_server
    FULL_NAME = OPS_DESCRIPTOR.services_by_name[SERVICE_NAME].full_name

    @rpc(instance_getter=lambda req: _parse_instance_name(req.name))
    def GetOperation(self, request: GetOperationRequest, context: grpc.ServicerContext) -> Operation:
        """Fetch a single operation and return it under the requested name.

        Any request metadata and client identity returned alongside the
        operation are serialized and attached to the call as trailing metadata.
        """
        operation, metadata, client_identity = self.current_instance.get_operation(
            _parse_operation_name(request.name)
        )
        # Report the operation under the full name the client used.
        operation.name = request.name

        # Keep only the headers whose message is actually present; order is
        # request metadata first, then client identity.
        trailing_metadata = [
            (header_name, message.SerializeToString())
            for header_name, message in (
                (REQUEST_METADATA_HEADER_NAME, metadata),
                (CLIENT_IDENTITY_HEADER_NAME, client_identity),
            )
            if message is not None
        ]
        if trailing_metadata:
            context.set_trailing_metadata(trailing_metadata)  # type: ignore[arg-type]  # tricky covariance issue

        return operation

    @rpc(instance_getter=lambda req: cast(str, req.name))
    def ListOperations(self, request: ListOperationsRequest, context: grpc.ServicerContext) -> ListOperationsResponse:
        """List operations, prefixing each returned name with the request name."""
        # The request name should be the collection name. In our case, this is just the instance name
        response = self.current_instance.list_operations(request.filter, request.page_size, request.page_token)

        collection_name = request.name
        for listed_operation in response.operations:
            listed_operation.name = f"{collection_name}/{listed_operation.name}"

        return response

    @rpc(instance_getter=lambda req: _parse_instance_name(req.name))
    def DeleteOperation(self, request: DeleteOperationRequest, context: grpc.ServicerContext) -> Empty:
        """Reject the call: set an UNIMPLEMENTED status and return an empty message."""
        context.set_details("BuildGrid does not support DeleteOperation.")
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        return Empty()

    @rpc(instance_getter=lambda req: _parse_instance_name(req.name))
    def CancelOperation(self, request: CancelOperationRequest, context: grpc.ServicerContext) -> Empty:
        """Ask the current instance to cancel the named operation; return an empty message."""
        operation_name = _parse_operation_name(request.name)
        self.current_instance.cancel_operation(operation_name)
        return Empty()