Coverage for /builds/BuildGrid/buildgrid/buildgrid/client/authentication.py: 60.42%

48 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-06-11 15:37 +0000

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16import base64 

17import os 

18from collections import namedtuple 

19from typing import Any, Optional, Tuple, TypedDict 

20 

21import grpc 

22 

23from buildgrid._exceptions import InvalidArgumentError 

24from buildgrid.client.auth_token_loader import AuthTokenLoader 

25from buildgrid.utils import read_file 

26 

# Typed mapping of client credential settings.
#
# The functional ``TypedDict`` form is used because the keys contain hyphens
# and so cannot be written as class attributes. ``total=False`` makes every
# key optional.
ClientCredentials = TypedDict(
    "ClientCredentials",
    {
        # Token-based authentication settings.
        "auth-token": str,
        "token-refresh-seconds": int,
        # TLS settings (presumably file paths -- confirm against the callers).
        "tls-client-key": str,
        "tls-client-cert": str,
        "tls-server-cert": str,
    },
    total=False,
)

38 

39 

def load_tls_channel_credentials(
    client_key: Optional[str] = None, client_cert: Optional[str] = None, server_cert: Optional[str] = None
) -> Tuple[grpc.ChannelCredentials, Tuple[Optional[str], Optional[str], Optional[str]]]:
    """Looks-up and loads TLS gRPC client channel credentials.

    A path that is unset or does not exist on disk is ignored. The client
    private key and the client certificate chain are only ever used as a
    pair: if either of them is unavailable (or empty), neither is passed on
    to gRPC and both are reported as ``None``.

    Args:
        client_key(str, optional): Client private key file path.
        client_cert(str, optional): Client certificate chain file path.
        server_cert(str, optional): Server root certificate file path.

    Returns:
        tuple: A two-element tuple made of the ``ChannelCredentials`` to be
        used for a TLS-encrypted gRPC client channel, and a
        ``(client_key, client_cert, server_cert)`` tuple holding the paths
        that were actually loaded (``None`` for every one that was skipped).
    """
    if server_cert and os.path.exists(server_cert):
        server_cert_pem = read_file(server_cert)
    else:
        server_cert_pem = None
        server_cert = None

    client_key_pem = None
    client_cert_pem = None
    if client_key and client_cert and os.path.exists(client_key) and os.path.exists(client_cert):
        client_key_pem = read_file(client_key)
        client_cert_pem = read_file(client_cert)

    if not (client_key_pem and client_cert_pem):
        # A private key without its certificate chain (or the reverse) is not
        # a usable client identity, so drop both halves rather than handing
        # gRPC an incomplete pair.
        client_key_pem = None
        client_cert_pem = None
        client_key = None
        client_cert = None

    credentials = grpc.ssl_channel_credentials(
        root_certificates=server_cert_pem, private_key=client_key_pem, certificate_chain=client_cert_pem
    )

    return credentials, (
        client_key,
        client_cert,
        server_cert,
    )

81 

82 

class AuthMetadataClientInterceptorBase:
    """Base helper that adds an ``authorization`` entry to client call metadata.

    The value is ``Bearer <token>``, where the token comes either from an
    :class:`AuthTokenLoader` or from a base64-encoded static secret.
    """

    def __init__(
        self,
        auth_token_loader: Optional[AuthTokenLoader] = None,
        auth_secret: Optional[bytes] = None,
    ) -> None:
        """Initialises a new :class:`AuthMetadataClientInterceptorBase`.

        Important:
            One of `auth_token_loader` or `auth_secret` must be provided. When
            both are given, the loader takes precedence and the secret is
            ignored.

        Args:
            auth_token_loader (AuthTokenLoader, optional): Auth token loader
                that fetches and passes the token.
            auth_secret (bytes, optional): Authorization secret as bytes. It
                is stripped of surrounding whitespace and base64-encoded
                before being stored.

        Raises:
            InvalidArgumentError: If neither `auth_token_loader` nor
                `auth_secret` is provided.
        """
        self._auth_token_loader: Optional[AuthTokenLoader] = auth_token_loader if auth_token_loader else None
        self.__secret: Optional[str] = None

        if self._auth_token_loader is None:
            if not auth_secret:
                raise InvalidArgumentError("A secret or token must be provided")
            encoded_secret = base64.b64encode(auth_secret.strip())
            self.__secret = encoded_secret.decode()

        self.__header_field_name = "authorization"

    def _get_secret(self) -> str:
        """Returns the ``Bearer ...`` value to send in the authorization field.

        The token is requested from the loader on every call when a loader
        was configured; otherwise the stored static secret is used.
        """
        loader = self._auth_token_loader
        if loader:
            return f"Bearer {loader.get_token()}"

        # Without a loader, __init__ guarantees that a secret was stored.
        assert self.__secret is not None
        return f"Bearer {self.__secret}"

    def amend_call_details(  # type: ignore[no-untyped-def]  # wait for client lib updates here
        self, client_call_details, grpc_call_details_class: Any
    ):
        """Returns a copy of the given call details with an authorization field appended.

        Args:
            client_call_details: Call details exposing `method`, `timeout`,
                `credentials`, `metadata` and `wait_for_ready` attributes.
            grpc_call_details_class: Class the returned object must also be
                an instance of; it is mixed into the returned type.

        Returns:
            A new call details object carrying the original values, with the
            authorization entry added at the end of its metadata list.
        """
        existing_metadata = client_call_details.metadata
        new_metadata = [] if existing_metadata is None else list(existing_metadata)
        new_metadata.append((self.__header_field_name, self._get_secret()))

        # Named tuple holding the call details, also deriving from the
        # caller-supplied call details class.
        class _ClientCallDetails(
            namedtuple("_ClientCallDetails", ("method", "timeout", "credentials", "metadata", "wait_for_ready")),
            grpc_call_details_class,  # type: ignore
        ):
            pass

        return _ClientCallDetails(
            method=client_call_details.method,
            timeout=client_call_details.timeout,
            credentials=client_call_details.credentials,
            metadata=new_metadata,
            wait_for_ready=client_call_details.wait_for_ready,
        )