Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/decorators/rpc.py: 100.00%
58 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
1# Copyright (C) 2024 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15import functools
16import inspect
17from typing import Any, Callable, Iterator, Optional, TypeVar, cast
19import grpc
21from buildgrid.server.context import current_instance, current_method, current_service, method_context, service_context
22from buildgrid.server.logging import buildgrid_logger
23from buildgrid.server.metadata import extract_client_identity_dict, extract_request_metadata_dict
24from buildgrid.server.metrics_names import METRIC
26from .authorize import authorize
27from .errors import handle_errors
28from .instance import instanced
29from .io import network_io
30from .metadata import metadatacontext
31from .requestid import track_request_id
32from .time import timed
34Func = TypeVar("Func", bound=Callable) # type: ignore[type-arg]
35LOGGER = buildgrid_logger(__name__)
38def log_rpc(f: Func) -> Func:
39 @functools.wraps(f)
40 def server_stream_wrapper(self: Any, message: Any, context: grpc.ServicerContext) -> Iterator[Any]:
41 tags = dict(
42 service=current_service(),
43 method=current_method(),
44 peer=context.peer(),
45 **extract_request_metadata_dict(context.invocation_metadata()),
46 **extract_client_identity_dict(current_instance(), context.invocation_metadata()),
47 )
48 LOGGER.info("Received request.", tags=tags)
49 yield from f(self, message, context)
51 @functools.wraps(f)
52 def server_unary_wrapper(self: Any, message: Any, context: grpc.ServicerContext) -> Any:
53 tags = dict(
54 service=current_service(),
55 method=current_method(),
56 peer=context.peer(),
57 **extract_request_metadata_dict(context.invocation_metadata()),
58 **extract_client_identity_dict(current_instance(), context.invocation_metadata()),
59 )
60 LOGGER.info("Received request.", tags=tags)
61 return f(self, message, context)
63 if inspect.isgeneratorfunction(f):
64 return cast(Func, server_stream_wrapper)
65 return cast(Func, server_unary_wrapper)
68def named_rpc(f: Func) -> Func:
69 @functools.wraps(f)
70 def server_stream_wrapper(self: Any, message: Any, context: grpc.ServicerContext) -> Iterator[Any]:
71 with service_context(self.SERVICE_NAME), method_context(f.__name__):
72 yield from f(self, message, context)
74 @functools.wraps(f)
75 def server_unary_wrapper(self: Any, message: Any, context: grpc.ServicerContext) -> Any:
76 with service_context(self.SERVICE_NAME), method_context(f.__name__):
77 return f(self, message, context)
79 if inspect.isgeneratorfunction(f):
80 return cast(Func, server_stream_wrapper)
81 return cast(Func, server_unary_wrapper)
84def rpc(
85 *,
86 instance_getter: Optional[Callable[[Any], str]] = None,
87 request_formatter: Callable[[Any], Any] = lambda r: str(r),
88) -> Callable[[Func], Func]:
89 """
90 The RPC decorator provides common functionality to all buildgrid servicer methods.
91 This decorator should be attached to all endpoints in the application.
93 All endpoints will produce the following metrics, with a tag "code" for grpc status values,
94 a tag "service" for the service name of the RPC, and a tag "method" for the RPC method name.
95 * ``rpc.duration.ms``: The time in milliseconds spent on the method.
96 * ``rpc.input_bytes.count``: The number of message bytes sent from client to server.
97 * ``rpc.output_bytes.count``: The number of message bytes sent from server to client.
99 All other metrics produced during an RPC will have the tags "service" and "method" attached.
101 Args:
102 instance_getter (Callable[[Any], str]): Determines how to fetch the instance name
103 from the request payload. If provided, the tag "instance" will also be applied for all
104 metrics, and the RPC will be enrolled in authentication/authorization.
106 request_formatter (Callable[[Any], str]): Determines how to format the request payloads in logs.
107 """
109 def decorator(func: Func) -> Func:
110 # Note: decorators are applied in reverse order from a normal decorator pattern.
111 # All decorators that apply context vars are invoked earlier in the call chain
112 # such that the context vars remain available for logging and metrics population.
113 func = log_rpc(func)
114 if instance_getter:
115 func = authorize(func)
116 func = handle_errors(request_formatter)(func)
117 func = network_io(func)
118 func = timed(METRIC.RPC.DURATION)(func)
119 func = metadatacontext(func)
120 func = track_request_id(func)
121 func = named_rpc(func)
122 if instance_getter:
123 func = instanced(instance_getter)(func)
125 return func
127 return decorator