Coverage for /builds/BuildGrid/buildgrid/buildgrid/client/interceptors.py: 96.23%

53 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-06-11 15:37 +0000

1# Copyright (C) 2019 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16from typing import Any, Protocol 

17 

18import grpc 

19from grpc import aio 

20 

21 

class InterceptorFunc(Protocol):
    """Structural type for objects that can rewrite gRPC client call details.

    Any object exposing a compatible ``amend_call_details`` method satisfies
    this protocol; no inheritance is required.
    """

    def amend_call_details(  # type: ignore[no-untyped-def]
        self, client_call_details, grpc_call_details_class: Any
    ) -> Any:
        """Return the call details to use in place of ``client_call_details``.

        Args:
            client_call_details: The call details handed to the interceptor.
            grpc_call_details_class: The call-details class supplied by the
                calling interceptor (``grpc.ClientCallDetails`` or
                ``aio.ClientCallDetails`` in this module).

        Returns:
            The value the interceptor forwards to its continuation.
        """
        ...

26 

27 

class SyncUnaryUnaryInterceptor(grpc.UnaryUnaryClientInterceptor):  # type: ignore[type-arg]
    """Unary-unary client interceptor that delegates call-details rewriting to ``func``."""

    def __init__(self, func: InterceptorFunc):
        """Store the object whose ``amend_call_details`` is applied to each call."""
        self.func = func

    def intercept_unary_unary(self, continuation, client_call_details, request):  # type: ignore[no-untyped-def]
        """Amend the call details, then hand the request to ``continuation``.

        Returns whatever ``continuation`` returns for the amended details.
        """
        new_details = self.func.amend_call_details(client_call_details, grpc.ClientCallDetails)
        return continuation(new_details, request)

35 

36 

class SyncUnaryStreamInterceptor(grpc.UnaryStreamClientInterceptor):  # type: ignore[type-arg]
    """Unary-stream client interceptor that delegates call-details rewriting to ``func``."""

    def __init__(self, func: InterceptorFunc):
        """Store the object whose ``amend_call_details`` is applied to each call."""
        self.func = func

    def intercept_unary_stream(self, continuation, client_call_details, request):  # type: ignore[no-untyped-def]
        """Amend the call details, then hand the request to ``continuation``.

        Returns whatever ``continuation`` returns for the amended details.
        """
        new_details = self.func.amend_call_details(client_call_details, grpc.ClientCallDetails)
        return continuation(new_details, request)

44 

45 

class SyncStreamUnaryInterceptor(grpc.StreamUnaryClientInterceptor):  # type: ignore[type-arg]
    """Stream-unary client interceptor that delegates call-details rewriting to ``func``."""

    def __init__(self, func: InterceptorFunc):
        """Store the object whose ``amend_call_details`` is applied to each call."""
        self.func = func

    def intercept_stream_unary(  # type: ignore[no-untyped-def]
        self, continuation, client_call_details, request_iterator
    ):
        """Amend the call details, then hand the request iterator to ``continuation``.

        Returns whatever ``continuation`` returns for the amended details.
        """
        new_details = self.func.amend_call_details(client_call_details, grpc.ClientCallDetails)
        return continuation(new_details, request_iterator)

55 

56 

class SyncStreamStreamInterceptor(grpc.StreamStreamClientInterceptor):  # type: ignore[type-arg]
    """Stream-stream client interceptor that delegates call-details rewriting to ``func``."""

    def __init__(self, func: InterceptorFunc):
        """Store the object whose ``amend_call_details`` is applied to each call."""
        self.func = func

    def intercept_stream_stream(  # type: ignore[no-untyped-def]
        self, continuation, client_call_details, request_iterator
    ):
        """Amend the call details, then hand the request iterator to ``continuation``.

        Returns whatever ``continuation`` returns for the amended details.
        """
        new_details = self.func.amend_call_details(client_call_details, grpc.ClientCallDetails)
        return continuation(new_details, request_iterator)

66 

67 

class AsyncUnaryUnaryInterceptor(aio.UnaryUnaryClientInterceptor):  # type: ignore[type-arg]
    """Asyncio unary-unary client interceptor that delegates call-details rewriting to ``func``."""

    def __init__(self, func: InterceptorFunc):
        """Store the object whose ``amend_call_details`` is applied to each call."""
        self.func = func

    async def intercept_unary_unary(self, continuation, client_call_details, request):  # type: ignore[no-untyped-def]
        """Amend the call details, then await ``continuation`` with the request.

        Returns the awaited result of ``continuation`` for the amended details.
        """
        new_details = self.func.amend_call_details(client_call_details, aio.ClientCallDetails)
        return await continuation(new_details, request)

75 

76 

class AsyncUnaryStreamInterceptor(aio.UnaryStreamClientInterceptor):  # type: ignore[type-arg]
    """Asyncio unary-stream client interceptor that delegates call-details rewriting to ``func``."""

    def __init__(self, func: InterceptorFunc):
        """Store the object whose ``amend_call_details`` is applied to each call."""
        self.func = func

    async def intercept_unary_stream(self, continuation, client_call_details, request):  # type: ignore[no-untyped-def]
        """Amend the call details, then await ``continuation`` with the request.

        Returns the awaited result of ``continuation`` for the amended details.
        """
        new_details = self.func.amend_call_details(client_call_details, aio.ClientCallDetails)
        return await continuation(new_details, request)

84 

85 

class AsyncStreamUnaryInterceptor(aio.StreamUnaryClientInterceptor):  # type: ignore[type-arg]
    """Asyncio stream-unary client interceptor that delegates call-details rewriting to ``func``."""

    def __init__(self, func: InterceptorFunc):
        """Store the object whose ``amend_call_details`` is applied to each call."""
        self.func = func

    async def intercept_stream_unary(  # type: ignore[no-untyped-def]
        self, continuation, client_call_details, request_iterator
    ):
        """Amend the call details, then await ``continuation`` with the request iterator.

        Returns the awaited result of ``continuation`` for the amended details.
        """
        new_details = self.func.amend_call_details(client_call_details, aio.ClientCallDetails)
        return await continuation(new_details, request_iterator)

95 

96 

class AsyncStreamStreamInterceptor(aio.StreamStreamClientInterceptor):  # type: ignore[type-arg]
    """Asyncio stream-stream client interceptor that delegates call-details rewriting to ``func``."""

    def __init__(self, func: InterceptorFunc):
        """Store the object whose ``amend_call_details`` is applied to each call."""
        self.func = func

    async def intercept_stream_stream(  # type: ignore[no-untyped-def]
        self, continuation, client_call_details, request_iterator
    ):
        """Amend the call details, then await ``continuation`` with the request iterator.

        Returns the awaited result of ``continuation`` for the amended details.
        """
        new_details = self.func.amend_call_details(client_call_details, aio.ClientCallDetails)
        return await continuation(new_details, request_iterator)