Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16import base64 

17from collections import namedtuple 

18import os 

19 

20import grpc 

21 

22from buildgrid._exceptions import InvalidArgumentError 

23from buildgrid.utils import read_file 

24 

25 

26def load_tls_channel_credentials(client_key=None, client_cert=None, server_cert=None): 

27 """Looks-up and loads TLS gRPC client channel credentials. 

28 

29 Args: 

30 client_key(str, optional): Client certificate chain file path. 

31 client_cert(str, optional): Client private key file path. 

32 server_cert(str, optional): Serve root certificate file path. 

33 

34 Returns: 

35 ChannelCredentials: Credentials to be used for a TLS-encrypted gRPC 

36 client channel. 

37 """ 

38 if server_cert and os.path.exists(server_cert): 

39 server_cert_pem = read_file(server_cert) 

40 else: 

41 server_cert_pem = None 

42 server_cert = None 

43 

44 if client_key and os.path.exists(client_key): 

45 client_key_pem = read_file(client_key) 

46 else: 

47 client_key_pem = None 

48 client_key = None 

49 

50 if client_key_pem and client_cert and os.path.exists(client_cert): 

51 client_cert_pem = read_file(client_cert) 

52 else: 

53 client_cert_pem = None 

54 client_cert = None 

55 

56 credentials = grpc.ssl_channel_credentials(root_certificates=server_cert_pem, 

57 private_key=client_key_pem, 

58 certificate_chain=client_cert_pem) 

59 

60 return credentials, (client_key, client_cert, server_cert,) 

61 

62 

63def load_channel_authorization_token(auth_token=None): 

64 """Looks-up and loads client authorization token. 

65 

66 Args: 

67 auth_token (str, optional): Token file path. 

68 

69 Returns: 

70 str: Encoded token string. 

71 """ 

72 if auth_token and os.path.exists(auth_token): 

73 return read_file(auth_token).decode() 

74 

75 # TODO: Try loading the token from a default location? 

76 

77 return None 

78 

79 

80class AuthMetadataClientInterceptor( 

81 grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor, 

82 grpc.StreamUnaryClientInterceptor, grpc.StreamStreamClientInterceptor): 

83 

84 def __init__(self, auth_token=None, auth_secret=None): 

85 """Initialises a new :class:`AuthMetadataClientInterceptor`. 

86 

87 Important: 

88 One of `auth_token` or `auth_secret` must be provided. 

89 

90 Args: 

91 auth_token (str, optional): Authorization token as a string. 

92 auth_secret (str, optional): Authorization secret as a string. 

93 

94 Raises: 

95 InvalidArgumentError: If neither `auth_token` or `auth_secret` are 

96 provided. 

97 """ 

98 if auth_token: 

99 self.__secret = auth_token.strip() 

100 

101 elif auth_secret: 

102 self.__secret = base64.b64encode(auth_secret.strip()) 

103 

104 else: 

105 raise InvalidArgumentError("A secret or token must be provided") 

106 

107 self.__header_field_name = 'authorization' 

108 self.__header_field_value = f'Bearer {self.__secret}' 

109 

110 # --- Public API --- 

111 

112 def intercept_unary_unary(self, continuation, client_call_details, request): 

113 new_details = self._amend_call_details(client_call_details) 

114 

115 return continuation(new_details, request) 

116 

117 def intercept_unary_stream(self, continuation, client_call_details, request): 

118 new_details = self._amend_call_details(client_call_details) 

119 

120 return continuation(new_details, request) 

121 

122 def intercept_stream_unary(self, continuation, client_call_details, request_iterator): 

123 new_details = self._amend_call_details(client_call_details) 

124 

125 return continuation(new_details, request_iterator) 

126 

127 def intercept_stream_stream(self, continuation, client_call_details, request_iterator): 

128 new_details = self._amend_call_details(client_call_details) 

129 

130 return continuation(new_details, request_iterator) 

131 

132 # --- Private API --- 

133 

134 def _amend_call_details(self, client_call_details): 

135 """Appends an authorization field to given client call details.""" 

136 if client_call_details.metadata is not None: 

137 new_metadata = list(client_call_details.metadata) 

138 else: 

139 new_metadata = [] 

140 

141 new_metadata.append((self.__header_field_name, self.__header_field_value,)) 

142 

143 class _ClientCallDetails( 

144 namedtuple('_ClientCallDetails', 

145 ('method', 'timeout', 'credentials', 'metadata',)), 

146 grpc.ClientCallDetails): 

147 pass 

148 

149 return _ClientCallDetails(client_call_details.method, 

150 client_call_details.timeout, 

151 client_call_details.credentials, 

152 new_metadata)