Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/redis/provider.py: 58.11%

74 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-06-11 15:37 +0000

1# Copyright (C) 2023 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15import logging 

16from time import sleep 

17from typing import Callable, List, Optional, Tuple, TypeVar 

18 

19import dns.resolver 

20import redis 

21from redis.backoff import EqualJitterBackoff 

22from redis.exceptions import ConnectionError as RedisConnectionError 

23from redis.exceptions import TimeoutError as RedisTimeoutError 

24from redis.sentinel import Sentinel 

25 

LOGGER = logging.getLogger(__name__)

# Return type of the callables passed to ``execute_rw`` / ``execute_ro``.
T = TypeVar("T")


class RedisProvider:
    """Provides and manages a Redis connection

    This class manages the connection to a Redis cache.

    The connection can be configured by specifying host/port or by specifying
    a DNS SRV record to use to discover the host/port. When both are given,
    the hosts discovered through the SRV record take precedence.

    If a sentinel master name is provided then it is assumed the connection is
    to a Redis sentinel and the master and replica clients will be obtained
    from the sentinel.

    Args:
        host (str | None): The hostname of the Redis server to use.
        port (int | None): The port that Redis is served on.
        password (str | None): The Redis database password to use.
        db (int | None): The Redis database number to use.
        dns_srv_record (str | None): Domain name of SRV record used to discover host/port
        sentinel_master_name (str | None): Service name of Redis master instance, used
            in a Redis sentinel configuration
        retries (int): Max number of times to retry (default 3). The sleep between
            retries is computed from the attempt number by redis-py's
            ``EqualJitterBackoff`` using its default settings.

    Raises:
        RuntimeError: when unable to resolve a host/port to connect to

    """

    def __init__(
        self,
        *,
        host: Optional[str] = None,
        port: Optional[int] = None,
        password: Optional[str] = None,
        db: Optional[int] = None,
        dns_srv_record: Optional[str] = None,
        sentinel_master_name: Optional[str] = None,
        retries: int = 3,
    ) -> None:
        self._password = password
        self._db = db
        self._dns_srv_record = dns_srv_record
        self._sentinel_master_name = sentinel_master_name
        self._retries = retries
        self._socket_timeout = 1.0

        self._host_infos: List[Tuple[str, int]] = []
        self._sentinel: Optional[Sentinel] = None

        # A static host/port pair is only used when both halves are present.
        if host is not None and port is not None:
            self._host_infos = [(host, port)]

        # Hosts discovered through DNS replace any static host/port.
        if self._dns_srv_record is not None:
            self._host_infos = self._perform_dns_srv_request(self._dns_srv_record)

        # Fail fast on an unusable configuration, before any client is built.
        if not self._host_infos:
            raise RuntimeError("Either host/port or dns_srv_record must be specified")

        # Errors that trigger a re-resolve/reconnect and a retry in execute_*.
        self._retriable_errors = (RedisConnectionError, RedisTimeoutError)

        self._conns = self._connect()

    def _perform_dns_srv_request(self, domain_name: str) -> List[Tuple[str, int]]:
        """Resolve ``domain_name`` as an SRV record into ``(host, port)`` pairs.

        Raises:
            RuntimeError: if the lookup fails (the underlying error is chained
                as ``__cause__``) or if it yields no records.
        """
        try:
            srv_records = dns.resolver.resolve(domain_name, "SRV")
        except Exception as e:
            LOGGER.debug("Unable to resolve DNS name %r", domain_name, exc_info=True)
            # Keep the RuntimeError type callers rely on, but say what failed
            # and preserve the original error for diagnosis.
            raise RuntimeError(f"Unable to resolve DNS SRV record {domain_name!r}") from e

        # SRV targets are rendered with a trailing root dot, which is stripped.
        srv_list = [(str(srv.target).rstrip("."), srv.port) for srv in srv_records]

        if not srv_list:
            raise RuntimeError("Host/port not resolvable from DNS SRV record")

        return srv_list

    def _connect(self) -> Tuple["redis.Redis[bytes]", "redis.Redis[bytes]"]:
        """Build the ``(read/write, read-only)`` client pair.

        Without a sentinel master name a single client for the first known
        host is returned in both positions. Otherwise the master and replica
        clients are obtained from a (cached) ``Sentinel``.
        """
        if self._sentinel_master_name is None:
            client = redis.Redis(
                host=self._host_infos[0][0],
                port=self._host_infos[0][1],
                socket_timeout=self._socket_timeout,
                db=self._db,  # type: ignore
                password=self._password,
            )
            return (client, client)

        if not self._sentinel:
            self._sentinel = Sentinel(
                self._host_infos,
                socket_timeout=self._socket_timeout,
                db=self._db,
                password=self._password,
            )
        return (
            self._sentinel.master_for(self._sentinel_master_name, socket_timeout=self._socket_timeout),
            self._sentinel.slave_for(self._sentinel_master_name, socket_timeout=self._socket_timeout),
        )

    def _reresolve_reconnect(self) -> None:
        """Re-run DNS discovery (when configured) and rebuild the clients."""
        if self._dns_srv_record:
            resolved = self._perform_dns_srv_request(self._dns_srv_record)
            if set(resolved) != set(self._host_infos):
                # The cached Sentinel was built from the old addresses; drop it
                # so that _connect() creates one for the newly resolved hosts.
                self._sentinel = None
            self._host_infos = resolved
        self._conns = self._connect()

    def _execute(self, conn_index: int, func: Callable[["redis.Redis[bytes]"], T]) -> T:
        """Call ``func`` with ``self._conns[conn_index]``, retrying on retriable errors.

        After each retriable failure the provider sleeps, re-resolves and
        reconnects, then calls ``func`` again. Once more than ``retries``
        failures have occurred the last error is re-raised. Any other
        exception propagates immediately.
        """
        failures = 0
        backoff: Optional[EqualJitterBackoff] = None
        while True:
            try:
                return func(self._conns[conn_index])
            except self._retriable_errors:
                failures += 1
                if failures > self._retries:
                    raise
                # Built on first use so the successful path allocates nothing.
                if backoff is None:
                    backoff = EqualJitterBackoff()
                sleep(backoff.compute(failures))
                self._reresolve_reconnect()

    def execute_rw(self, func: Callable[["redis.Redis[bytes]"], T]) -> T:
        """Calls ``func`` with the redis read/write client as argument.

        The ``func`` may be called more than once if the host has changed.
        """
        return self._execute(0, func)

    def execute_ro(self, func: Callable[["redis.Redis[bytes]"], T]) -> T:
        """Calls ``func`` with a redis read-only client as argument.

        The ``func`` may be called more than once if the host has changed.
        """
        return self._execute(1, func)