Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/client/authentication.py: 56.00%
50 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
1# Copyright (C) 2018 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16import base64
17import os
18from collections import namedtuple
19from typing import Any, Optional, Tuple, TypedDict
21import grpc
23from buildgrid.server.client.auth_token_loader import AuthTokenLoader
24from buildgrid.server.exceptions import InvalidArgumentError
# Client-side connection settings, keyed by their hyphenated option names.
# The functional TypedDict form is used because those keys are not valid
# Python identifiers; total=False makes every key optional.
ClientCredentials = TypedDict(
    "ClientCredentials",
    {
        "auth-token": str,
        "tls-client-key": str,
        "tls-client-cert": str,
        "tls-server-cert": str,
        "token-refresh-seconds": int,
    },
    total=False,
)
def _read_pem_file(path: Optional[str]) -> Optional[bytes]:
    """Returns the raw content of the file at `path`, or None if `path` is unset or does not exist."""
    if path and os.path.exists(path):
        with open(path, "rb") as f:
            return f.read()
    return None


def load_tls_channel_credentials(
    client_key: Optional[str] = None, client_cert: Optional[str] = None, server_cert: Optional[str] = None
) -> Tuple[grpc.ChannelCredentials, Tuple[Optional[str], Optional[str], Optional[str]]]:
    """Looks-up and loads TLS gRPC client channel credentials.

    Paths that are unset or do not exist on disk are ignored rather than
    treated as errors. The client private key and certificate chain are only
    used together: if either one cannot be loaded (or is empty), neither is
    handed to gRPC and both are reported as unused.

    Args:
        client_key(str, optional): Client private key file path.
        client_cert(str, optional): Client certificate chain file path.
        server_cert(str, optional): Server root certificate file path.

    Returns:
        tuple: A pair made of the ChannelCredentials to be used for a
            TLS-encrypted gRPC client channel, and a
            `(client_key, client_cert, server_cert)` tuple holding the paths
            that were actually loaded (None for each one that was not).
    """
    server_cert_pem = _read_pem_file(server_cert)
    if server_cert_pem is None:
        server_cert = None

    client_key_pem = _read_pem_file(client_key)
    # The certificate chain is only worth reading when there is a key to go with it.
    client_cert_pem = _read_pem_file(client_cert) if client_key_pem else None

    if not (client_key_pem and client_cert_pem):
        # A key without its chain (or the reverse) is not a usable client
        # identity, so drop both instead of passing half a pair to gRPC.
        client_key_pem = client_cert_pem = None
        client_key = client_cert = None

    credentials = grpc.ssl_channel_credentials(
        root_certificates=server_cert_pem, private_key=client_key_pem, certificate_chain=client_cert_pem
    )

    return credentials, (
        client_key,
        client_cert,
        server_cert,
    )
class AuthMetadataClientInterceptorBase:
    """Base for client interceptors that add an `authorization` entry to call metadata.

    The value sent is `Bearer <token>`, where the token comes either from an
    auth token loader or from a base64-encoded static secret.
    """

    # Field layout of the amended call details. Built once here instead of on
    # every call: only the thin subclass that mixes in the caller-supplied
    # call-details class has to be created per call.
    _CALL_DETAILS_FIELDS = namedtuple(
        "_ClientCallDetails",
        (
            "method",
            "timeout",
            "credentials",
            "metadata",
            "wait_for_ready",
        ),
    )

    def __init__(
        self,
        auth_token_loader: Optional[AuthTokenLoader] = None,
        auth_secret: Optional[bytes] = None,
    ) -> None:
        """Initialises a new :class:`AuthMetadataClientInterceptorBase`.

        Important:
            One of `auth_token_loader` or `auth_secret` must be provided. If
            both are given, `auth_token_loader` takes precedence.

        Args:
            auth_token_loader (AuthTokenLoader, optional): Auth token loader that fetches and passes the token
            auth_secret (bytes, optional): Authorization secret as bytes.

        Raises:
            InvalidArgumentError: If neither `auth_token_loader` nor `auth_secret` is
                provided.
        """
        self._auth_token_loader: Optional[AuthTokenLoader] = None
        self.__secret: Optional[str] = None

        if auth_token_loader:
            self._auth_token_loader = auth_token_loader

        elif auth_secret:
            # Surrounding whitespace is stripped before the secret is base64-encoded.
            self.__secret = base64.b64encode(auth_secret.strip()).decode()

        else:
            raise InvalidArgumentError("A secret or token must be provided")

        self.__header_field_name = "authorization"

    def _get_secret(self) -> str:
        """Returns the `Bearer <token>` value to send in the authorization field."""
        if self._auth_token_loader:
            # Ask the loader on every call rather than caching its result here.
            token = self._auth_token_loader.get_token()
        else:
            # __init__ guarantees a secret whenever no loader was supplied.
            assert self.__secret is not None
            token = self.__secret
        return f"Bearer {token}"

    def amend_call_details(  # type: ignore[no-untyped-def] # wait for client lib updates here
        self, client_call_details, grpc_call_details_class: Any
    ):
        """Appends an authorization field to given client call details.

        Args:
            client_call_details: Call details to copy; must expose `method`,
                `timeout`, `credentials`, `metadata` and `wait_for_ready`.
            grpc_call_details_class: Class the returned object must also
                inherit from.

        Returns:
            A new call details object carrying the original fields, with the
            authorization entry appended to a copy of the metadata. The given
            `client_call_details` is left untouched.
        """
        if client_call_details.metadata is not None:
            new_metadata = list(client_call_details.metadata)
        else:
            new_metadata = []

        new_metadata.append((self.__header_field_name, self._get_secret()))

        class _ClientCallDetails(self._CALL_DETAILS_FIELDS, grpc_call_details_class):  # type: ignore
            pass

        return _ClientCallDetails(
            client_call_details.method,
            client_call_details.timeout,
            client_call_details.credentials,
            new_metadata,
            client_call_details.wait_for_ready,
        )