Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/redis/provider.py: 58.11%
74 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
1# Copyright (C) 2023 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16from time import sleep
17from typing import Callable, List, Optional, Tuple, TypeVar
19import dns.resolver
20import redis
21from redis.backoff import EqualJitterBackoff
22from redis.exceptions import ConnectionError as RedisConnectionError
23from redis.exceptions import TimeoutError as RedisTimeoutError
24from redis.sentinel import Sentinel
26from buildgrid.server.logging import buildgrid_logger
28LOGGER = buildgrid_logger(__name__)
30T = TypeVar("T")
33class RedisProvider:
34 """Provides and manages a Redis connection
36 This class manages the connection to a Redis cache.
38 The connection can be configured by specifying host/port or by specifying
39 a DNS SRV record to use to discover the host/port.
41 If a sentinel master name is provided then it is assumed the connection is
42 to a Redis sentinel and the master and replica clients will be obtained
43 from the sentinel.
45 Args:
46 host (str | None): The hostname of the Redis server to use.
47 port (int | None): The port that Redis is served on.
48 password (str | None): The Redis database password to use.
49 db (int): The Redis database number to use.
50 dns-srv-record (str): Domain name of SRV record used to discover host/port
51 sentinel-master-name (str): Service name of Redis master instance, used
52 in a Redis sentinel configuration
53 retries (int): Max number of times to retry (default 3). Backoff between retries is about 2^(N-1),
54 where N is the number of attempts
56 Raises:
57 RuntimeError: when unable to resolve a host/port to connect to
59 """
61 def __init__(
62 self,
63 *,
64 host: Optional[str] = None,
65 port: Optional[int] = None,
66 password: Optional[str] = None,
67 db: Optional[int] = None,
68 dns_srv_record: Optional[str] = None,
69 sentinel_master_name: Optional[str] = None,
70 retries: int = 3,
71 ) -> None:
72 self._password = password
73 self._db = db
74 self._dns_srv_record = dns_srv_record
75 self._sentinel_master_name = sentinel_master_name
76 self._retries = retries
77 self._retriable_errors = (RedisConnectionError, RedisTimeoutError)
79 self._host_infos: List[Tuple[str, int]] = []
80 self._sentinel: Optional[Sentinel] = None
82 self._socket_timeout = 1.0
84 if host is not None and port is not None:
85 self._host_infos = [(host, port)]
87 if self._dns_srv_record is not None:
88 self._host_infos = self._perform_dns_srv_request(self._dns_srv_record)
90 if not self._host_infos:
91 raise RuntimeError("Either host/port or dns_srv_record must be specified")
93 self._conns = self._connect()
95 def _perform_dns_srv_request(self, domain_name: str) -> List[Tuple[str, int]]:
96 srv_list: List[Tuple[str, int]] = []
98 try:
99 srv_records = dns.resolver.resolve(domain_name, "SRV")
100 except Exception:
101 LOGGER.debug("Unable to resolve DNS name.")
102 raise RuntimeError
104 for srv in srv_records:
105 srv_list.append((str(srv.target).rstrip("."), srv.port))
107 if not srv_list:
108 raise RuntimeError("Host/port not resolvable from DNS SRV record")
110 return srv_list
112 def _connect(self) -> Tuple["redis.Redis[bytes]", "redis.Redis[bytes]"]:
113 if self._sentinel_master_name is None:
114 r = redis.Redis(
115 host=self._host_infos[0][0],
116 port=self._host_infos[0][1],
117 socket_timeout=self._socket_timeout,
118 db=self._db, # type: ignore
119 password=self._password,
120 )
121 return (r, r)
122 else:
123 if not self._sentinel:
124 self._sentinel = Sentinel(
125 self._host_infos,
126 socket_timeout=self._socket_timeout,
127 db=self._db,
128 password=self._password,
129 )
130 return (
131 self._sentinel.master_for(self._sentinel_master_name, socket_timeout=self._socket_timeout),
132 self._sentinel.slave_for(self._sentinel_master_name, socket_timeout=self._socket_timeout),
133 )
135 def _reresolve_reconnect(self) -> None:
136 if self._dns_srv_record:
137 self._host_infos = self._perform_dns_srv_request(self._dns_srv_record)
138 self._conns = self._connect()
140 def execute_rw(self, func: Callable[["redis.Redis[bytes]"], T]) -> T:
141 """Calls ``func`` with the redis read/write client as argument.
143 The ``func`` may be called more than once if the host has changed.
144 """
145 retry_num = 0
146 backoff = EqualJitterBackoff()
147 while True:
148 try:
149 return func(self._conns[0])
150 except self._retriable_errors:
151 retry_num += 1
152 if retry_num > self._retries:
153 raise
154 sleep(backoff.compute(retry_num))
155 self._reresolve_reconnect()
157 def execute_ro(self, func: Callable[["redis.Redis[bytes]"], T]) -> T:
158 """Calls ``func`` with a redis read-only client as argument.
160 The ``func`` may be called more than once if the host has changed.
161 """
162 retry_num = 0
163 backoff = EqualJitterBackoff()
164 while True:
165 try:
166 return func(self._conns[1])
167 except self._retriable_errors:
168 retry_num += 1
169 if retry_num > self._retries:
170 raise
171 sleep(backoff.compute(retry_num))
172 self._reresolve_reconnect()