Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/client/authentication.py: 56.00%

50 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16import base64 

17import os 

18from collections import namedtuple 

19from typing import Any, Optional, Tuple, TypedDict 

20 

21import grpc 

22 

23from buildgrid.server.client.auth_token_loader import AuthTokenLoader 

24from buildgrid.server.exceptions import InvalidArgumentError 

25 

# Typed mapping of client credential settings.
#
# The functional ``TypedDict`` syntax is required here: the keys contain
# hyphens, so they are not valid identifiers and cannot be declared with the
# class-based syntax. ``total=False`` makes every key optional.
#
# NOTE(review): the ``tls-*`` values look like file paths corresponding to the
# arguments of ``load_tls_channel_credentials`` -- confirm against callers.
ClientCredentials = TypedDict(
    "ClientCredentials",
    {
        "auth-token": str,
        "tls-client-key": str,
        "tls-client-cert": str,
        "tls-server-cert": str,
        "token-refresh-seconds": int,
    },
    total=False,
)

37 

38 

def load_tls_channel_credentials(
    client_key: Optional[str] = None, client_cert: Optional[str] = None, server_cert: Optional[str] = None
) -> Tuple[grpc.ChannelCredentials, Tuple[Optional[str], Optional[str], Optional[str]]]:
    """Builds TLS gRPC client channel credentials from PEM files on disk.

    A path is only read when it is set and exists on disk; otherwise the
    matching credential is left out and that path is reported back as
    ``None``. The client certificate chain is only read when a non-empty
    client key was read first.

    Args:
        client_key(str, optional): Client private key file path.
        client_cert(str, optional): Client certificate chain file path.
        server_cert(str, optional): Server root certificate file path.

    Returns:
        tuple: The ``ChannelCredentials`` to be used for a TLS-encrypted gRPC
            client channel, followed by a ``(client_key, client_cert,
            server_cert)`` tuple of paths in which every entry that was not
            loaded is ``None``.
    """

    def _read_if_present(path: Optional[str]) -> Optional[bytes]:
        # An unset or non-existent path yields None instead of raising.
        if not path or not os.path.exists(path):
            return None
        with open(path, "rb") as pem_file:
            return pem_file.read()

    root_pem = _read_if_present(server_cert)
    key_pem = _read_if_present(client_key)
    # Only look at the certificate chain when a non-empty key was read.
    chain_pem = _read_if_present(client_cert) if key_pem else None

    # Report back only the paths whose contents were actually loaded.
    loaded_paths = (
        client_key if key_pem is not None else None,
        client_cert if chain_pem is not None else None,
        server_cert if root_pem is not None else None,
    )

    channel_credentials = grpc.ssl_channel_credentials(
        root_certificates=root_pem, private_key=key_pem, certificate_chain=chain_pem
    )
    return channel_credentials, loaded_paths

83 

84 

class AuthMetadataClientInterceptorBase:
    """Shared logic for adding an ``authorization`` entry to gRPC client call metadata."""

    def __init__(
        self,
        auth_token_loader: Optional[AuthTokenLoader] = None,
        auth_secret: Optional[bytes] = None,
    ) -> None:
        """Initialises a new :class:`AuthMetadataClientInterceptorBase`.

        Important:
            One of `auth_token_loader` or `auth_secret` must be provided. When
            both are given the loader wins and the secret is ignored.

        Args:
            auth_token_loader (AuthTokenLoader, optional): Loader whose
                ``get_token()`` is asked for the token on every call.
            auth_secret (bytes, optional): Authorization secret as bytes. It is
                stripped and base64-encoded once, at construction time.

        Raises:
            InvalidArgumentError: If neither `auth_token_loader` nor
                `auth_secret` is provided.
        """
        self._auth_token_loader: Optional[AuthTokenLoader] = None
        self.__secret: Optional[str] = None

        if auth_token_loader:
            self._auth_token_loader = auth_token_loader
        elif auth_secret:
            stripped_secret = auth_secret.strip()
            self.__secret = base64.b64encode(stripped_secret).decode()
        else:
            raise InvalidArgumentError("A secret or token must be provided")

        self.__header_field_name = "authorization"

    def _get_secret(self) -> str:
        """Returns the ``Bearer <token>`` value for the authorization field.

        The token comes from the loader when one is configured, otherwise from
        the secret encoded at construction time.
        """
        loader = self._auth_token_loader
        if not loader:
            assert self.__secret is not None
            return f"Bearer {self.__secret}"
        return f"Bearer {loader.get_token()}"

    def amend_call_details(  # type: ignore[no-untyped-def] # wait for client lib updates here
        self, client_call_details, grpc_call_details_class: Any
    ):
        """Appends an authorization field to given client call details.

        Args:
            client_call_details: Call details to copy. Its ``method``,
                ``timeout``, ``credentials``, ``metadata`` and
                ``wait_for_ready`` attributes are read.
            grpc_call_details_class: Class used as a base of the returned
                details type (presumably gRPC's client call details class --
                confirm against callers).

        Returns:
            A new call-details object carrying the original metadata (if any)
            followed by the authorization entry. The given object is not
            modified.
        """
        existing_metadata = client_call_details.metadata
        # Copy so the caller's metadata sequence is left untouched.
        amended_metadata = [] if existing_metadata is None else list(existing_metadata)
        auth_entry = (self.__header_field_name, self._get_secret())
        amended_metadata.append(auth_entry)

        class _ClientCallDetails(
            namedtuple("_ClientCallDetails", "method timeout credentials metadata wait_for_ready"),
            grpc_call_details_class,  # type: ignore
        ):
            pass

        return _ClientCallDetails(
            method=client_call_details.method,
            timeout=client_call_details.timeout,
            credentials=client_call_details.credentials,
            metadata=amended_metadata,
            wait_for_ready=client_call_details.wait_for_ready,
        )