Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/decorators/rpc.py: 100.00%

58 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

1# Copyright (C) 2024 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15import functools 

16import inspect 

17from typing import Any, Callable, Iterator, Optional, TypeVar, cast 

18 

19import grpc 

20 

21from buildgrid.server.context import current_instance, current_method, current_service, method_context, service_context 

22from buildgrid.server.logging import buildgrid_logger 

23from buildgrid.server.metadata import extract_client_identity_dict, extract_request_metadata_dict 

24from buildgrid.server.metrics_names import METRIC 

25 

26from .authorize import authorize 

27from .errors import handle_errors 

28from .instance import instanced 

29from .io import network_io 

30from .metadata import metadatacontext 

31from .requestid import track_request_id 

32from .time import timed 

33 

34Func = TypeVar("Func", bound=Callable) # type: ignore[type-arg] 

35LOGGER = buildgrid_logger(__name__) 

36 

37 

38def log_rpc(f: Func) -> Func: 

39 @functools.wraps(f) 

40 def server_stream_wrapper(self: Any, message: Any, context: grpc.ServicerContext) -> Iterator[Any]: 

41 tags = dict( 

42 service=current_service(), 

43 method=current_method(), 

44 peer=context.peer(), 

45 **extract_request_metadata_dict(context.invocation_metadata()), 

46 **extract_client_identity_dict(current_instance(), context.invocation_metadata()), 

47 ) 

48 LOGGER.info("Received request.", tags=tags) 

49 yield from f(self, message, context) 

50 

51 @functools.wraps(f) 

52 def server_unary_wrapper(self: Any, message: Any, context: grpc.ServicerContext) -> Any: 

53 tags = dict( 

54 service=current_service(), 

55 method=current_method(), 

56 peer=context.peer(), 

57 **extract_request_metadata_dict(context.invocation_metadata()), 

58 **extract_client_identity_dict(current_instance(), context.invocation_metadata()), 

59 ) 

60 LOGGER.info("Received request.", tags=tags) 

61 return f(self, message, context) 

62 

63 if inspect.isgeneratorfunction(f): 

64 return cast(Func, server_stream_wrapper) 

65 return cast(Func, server_unary_wrapper) 

66 

67 

68def named_rpc(f: Func) -> Func: 

69 @functools.wraps(f) 

70 def server_stream_wrapper(self: Any, message: Any, context: grpc.ServicerContext) -> Iterator[Any]: 

71 with service_context(self.SERVICE_NAME), method_context(f.__name__): 

72 yield from f(self, message, context) 

73 

74 @functools.wraps(f) 

75 def server_unary_wrapper(self: Any, message: Any, context: grpc.ServicerContext) -> Any: 

76 with service_context(self.SERVICE_NAME), method_context(f.__name__): 

77 return f(self, message, context) 

78 

79 if inspect.isgeneratorfunction(f): 

80 return cast(Func, server_stream_wrapper) 

81 return cast(Func, server_unary_wrapper) 

82 

83 

84def rpc( 

85 *, 

86 instance_getter: Optional[Callable[[Any], str]] = None, 

87 request_formatter: Callable[[Any], Any] = lambda r: str(r), 

88) -> Callable[[Func], Func]: 

89 """ 

90 The RPC decorator provides common functionality to all buildgrid servicer methods. 

91 This decorator should be attached to all endpoints in the application. 

92 

93 All endpoints will produce the following metrics, with a tag "code" for grpc status values, 

94 a tag "service" for the service name of the RPC, and a tag "method" for the RPC method name. 

95 * ``rpc.duration.ms``: The time in milliseconds spent on the method. 

96 * ``rpc.input_bytes.count``: The number of message bytes sent from client to server. 

97 * ``rpc.output_bytes.count``: The number of message bytes sent from server to client. 

98 

99 All other metrics produced during an RPC will have the tags "service" and "method" attached. 

100 

101 Args: 

102 instance_getter (Callable[[Any], str]): Determines how to fetch the instance name 

103 from the request payload. If provided, the tag "instance" will also be applied for all 

104 metrics, and the RPC will be enrolled in authentication/authorization. 

105 

106 request_formatter (Callable[[Any], str]): Determines how to format the request payloads in logs. 

107 """ 

108 

109 def decorator(func: Func) -> Func: 

110 # Note: decorators are applied in reverse order from a normal decorator pattern. 

111 # All decorators that apply context vars are invoked earlier in the call chain 

112 # such that the context vars remain available for logging and metrics population. 

113 func = log_rpc(func) 

114 if instance_getter: 

115 func = authorize(func) 

116 func = handle_errors(request_formatter)(func) 

117 func = network_io(func) 

118 func = timed(METRIC.RPC.DURATION)(func) 

119 func = metadatacontext(func) 

120 func = track_request_id(func) 

121 func = named_rpc(func) 

122 if instance_getter: 

123 func = instanced(instance_getter)(func) 

124 

125 return func 

126 

127 return decorator