Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/redis/provider.py: 58.11%

74 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

1# Copyright (C) 2023 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16from time import sleep 

17from typing import Callable, List, Optional, Tuple, TypeVar 

18 

19import dns.resolver 

20import redis 

21from redis.backoff import EqualJitterBackoff 

22from redis.exceptions import ConnectionError as RedisConnectionError 

23from redis.exceptions import TimeoutError as RedisTimeoutError 

24from redis.sentinel import Sentinel 

25 

26from buildgrid.server.logging import buildgrid_logger 

27 

28LOGGER = buildgrid_logger(__name__) 

29 

30T = TypeVar("T") 

31 

32 

class RedisProvider:
    """Provide and manage clients for a Redis server or a Redis Sentinel group.

    The server location is configured either with an explicit ``host``/``port``
    pair or with a DNS SRV record that is resolved to one or more host/port
    pairs. When both are given, the hosts resolved from ``dns_srv_record``
    replace the explicit ``host``/``port``.

    If ``sentinel_master_name`` is provided, the resolved hosts are treated as
    Redis sentinels and the master (read/write) and replica (read-only) clients
    are obtained from the sentinel. Otherwise a single client connected to the
    first resolved host is used for both reads and writes.

    Args:
        host (str | None): The hostname of the Redis server to use.
        port (int | None): The port that Redis is served on.
        password (str | None): The Redis database password to use.
        db (int | None): The Redis database number to use.
        dns_srv_record (str | None): Domain name of the SRV record used to
            discover host/port.
        sentinel_master_name (str | None): Service name of the Redis master
            instance, used in a Redis sentinel configuration.
        retries (int): Max number of times to retry a failed call (default 3).
            Retries are separated by a jittered backoff computed by redis-py's
            ``EqualJitterBackoff`` with its default settings.

    Raises:
        RuntimeError: when unable to resolve a host/port to connect to.
    """

    def __init__(
        self,
        *,
        host: Optional[str] = None,
        port: Optional[int] = None,
        password: Optional[str] = None,
        db: Optional[int] = None,
        dns_srv_record: Optional[str] = None,
        sentinel_master_name: Optional[str] = None,
        retries: int = 3,
    ) -> None:
        self._password = password
        self._db = db
        self._dns_srv_record = dns_srv_record
        self._sentinel_master_name = sentinel_master_name
        self._retries = retries
        # Only these errors trigger the re-resolve/reconnect/retry cycle;
        # anything else raised by the user callback propagates immediately.
        self._retriable_errors = (RedisConnectionError, RedisTimeoutError)

        self._host_infos: List[Tuple[str, int]] = []
        self._sentinel: Optional[Sentinel] = None

        # Passed as ``socket_timeout`` to every client created by ``_connect``.
        self._socket_timeout = 1.0

        if host is not None and port is not None:
            self._host_infos = [(host, port)]

        # A DNS SRV record, when configured, takes precedence over host/port.
        if self._dns_srv_record is not None:
            self._host_infos = self._perform_dns_srv_request(self._dns_srv_record)

        if not self._host_infos:
            raise RuntimeError("Either host/port or dns_srv_record must be specified")

        # Pair of (read/write client, read-only client).
        self._conns = self._connect()

    def _perform_dns_srv_request(self, domain_name: str) -> List[Tuple[str, int]]:
        """Resolve ``domain_name`` as an SRV record into ``(host, port)`` pairs.

        Raises:
            RuntimeError: if the lookup fails or yields no records.
        """
        try:
            srv_records = dns.resolver.resolve(domain_name, "SRV")
        except Exception as err:
            LOGGER.debug("Unable to resolve DNS name.")
            # Keep the exception type callers already expect, but say what
            # failed and preserve the underlying resolver error as the cause.
            raise RuntimeError(f"Unable to resolve DNS SRV record '{domain_name}'") from err

        # SRV targets are rendered with a trailing dot; strip it so the value
        # can be used directly as a hostname.
        srv_list = [(str(srv.target).rstrip("."), srv.port) for srv in srv_records]

        if not srv_list:
            raise RuntimeError("Host/port not resolvable from DNS SRV record")

        return srv_list

    def _connect(self) -> Tuple["redis.Redis[bytes]", "redis.Redis[bytes]"]:
        """Build the ``(read/write, read-only)`` client pair.

        Without a sentinel master name, the same client (connected to the first
        known host) is returned in both positions. With one, the master and
        replica clients are requested from the sentinel, which is created on
        first use from the full host list.
        """
        if self._sentinel_master_name is None:
            r = redis.Redis(
                host=self._host_infos[0][0],
                port=self._host_infos[0][1],
                socket_timeout=self._socket_timeout,
                db=self._db,  # type: ignore
                password=self._password,
            )
            return (r, r)

        if self._sentinel is None:
            self._sentinel = Sentinel(
                self._host_infos,
                socket_timeout=self._socket_timeout,
                db=self._db,
                password=self._password,
            )
        return (
            self._sentinel.master_for(self._sentinel_master_name, socket_timeout=self._socket_timeout),
            self._sentinel.slave_for(self._sentinel_master_name, socket_timeout=self._socket_timeout),
        )

    def _reresolve_reconnect(self) -> None:
        """Refresh the host list from DNS (if configured) and rebuild the clients.

        Raises:
            RuntimeError: if the DNS SRV record can no longer be resolved.
        """
        if self._dns_srv_record:
            resolved = self._perform_dns_srv_request(self._dns_srv_record)
            # The cached Sentinel was built from the previous host list. If the
            # set of hosts changed, drop it so ``_connect`` builds a new one
            # against the hosts we just resolved. Compare order-insensitively
            # so that a mere reordering of records does not force a rebuild.
            if sorted(resolved) != sorted(self._host_infos):
                self._sentinel = None
            self._host_infos = resolved
        self._conns = self._connect()

    def _execute(self, func: Callable[["redis.Redis[bytes]"], T], client_index: int) -> T:
        """Call ``func`` with ``self._conns[client_index]``, retrying on connection errors.

        On a retriable error the call is retried up to ``self._retries`` times.
        Before each retry this sleeps for a jittered backoff and then
        re-resolves and reconnects. Once the retries are exhausted, the last
        retriable error is re-raised.
        """
        retry_num = 0
        backoff = EqualJitterBackoff()
        while True:
            try:
                # Look the client up on every attempt: reconnecting replaces
                # ``self._conns``.
                return func(self._conns[client_index])
            except self._retriable_errors:
                retry_num += 1
                if retry_num > self._retries:
                    raise
                sleep(backoff.compute(retry_num))
                self._reresolve_reconnect()

    def execute_rw(self, func: Callable[["redis.Redis[bytes]"], T]) -> T:
        """Calls ``func`` with the redis read/write client as argument.

        The ``func`` may be called more than once if the host has changed.
        """
        return self._execute(func, 0)

    def execute_ro(self, func: Callable[["redis.Redis[bytes]"], T]) -> T:
        """Calls ``func`` with a redis read-only client as argument.

        The ``func`` may be called more than once if the host has changed.
        """
        return self._execute(func, 1)