Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/sql/sqlutils.py: 97.22%

72 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-06-11 15:37 +0000

1# Copyright (C) 2021 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

import logging
import random
from datetime import datetime, timedelta, timezone
from threading import Lock
from typing import Any, Optional, Tuple

from sqlalchemy.engine import Engine
from sqlalchemy.orm.session import Session as SessionType

23 

# Module-level logger, named after this module per the stdlib logging convention.
LOGGER = logging.getLogger(__name__)

25 

26 

def is_sqlite_connection_string(connection_string: str) -> bool:
    """Return True when *connection_string* targets an SQLite database.

    Any scheme beginning with ``sqlite`` counts (this includes driver
    variants such as ``sqlite+driver://``). Empty/falsy input yields False.
    """
    return bool(connection_string) and connection_string.startswith("sqlite")

31 

32 

def is_psycopg2_connection_string(connection_string: str) -> bool:
    """Return True when *connection_string* selects PostgreSQL via psycopg2.

    Matches the bare ``postgresql:`` scheme (whose default driver is
    psycopg2) as well as the explicit ``postgresql+psycopg2:`` scheme.
    Empty/falsy input yields False.
    """
    if not connection_string:
        return False
    # str.startswith accepts a tuple of prefixes: one call covers both schemes.
    return connection_string.startswith(("postgresql:", "postgresql+psycopg2:"))

40 

41 

def is_sqlite_inmemory_connection_string(full_connection_string: str) -> bool:
    """Return True when the string selects an in-memory SQLite database.

    Valid connection strings for in-memory SQLite which we don't support
    could look like:
      "sqlite:///file:memdb1?option=value&cache=shared&mode=memory"
      "sqlite:///file:memdb1?mode=memory&cache=shared"
      "sqlite:///file:memdb1?cache=shared&mode=memory"
      "sqlite:///file::memory:?cache=shared"
      "sqlite:///file::memory:"
      "sqlite:///:memory:"
      "sqlite:///"
      "sqlite://"
    ref: https://www.sqlite.org/inmemorydb.html
    Note that a user can also specify drivers, so the prefix could become
    'sqlite+driver:///'.
    """
    if not is_sqlite_connection_string(full_connection_string):
        return False

    # Split off the URI query string (if any) before inspecting the path part.
    base, question_mark, query = full_connection_string.partition("?")
    if base.endswith((":memory:", ":///", "://")):
        return True
    # An in-memory database can also be requested via the "mode=memory" option.
    return bool(question_mark) and "mode=memory" in query.split("&")

69 

70 

class SQLPoolDisposeHelper:
    """Helper class for disposing of SQL session connections.

    When a known transient connection-level error is detected (currently
    only for PostgreSQL backends), the offending session is invalidated
    and, at most once per configured interval, the whole engine pool is
    disposed so subsequent requests obtain fresh connections.
    """

    def __init__(
        self,
        cooldown_time_in_secs: int,
        cooldown_jitter_base_in_secs: int,
        min_time_between_dispose_in_minutes: int,
        sql_engine: Engine,
    ) -> None:
        """Store cooldown/jitter configuration and the engine to manage.

        Args:
            cooldown_time_in_secs: how long clients should retry after a disposal.
            cooldown_jitter_base_in_secs: jitter amplitude added to the cooldown.
            min_time_between_dispose_in_minutes: minimum interval between pool disposals.
            sql_engine: the SQLAlchemy engine whose pool may be disposed.
        """
        self._cooldown_time_in_secs = cooldown_time_in_secs
        self._cooldown_jitter_base_in_secs = cooldown_jitter_base_in_secs
        self._min_time_between_dispose_in_minutes = min_time_between_dispose_in_minutes
        # Timestamp (UTC) of the most recent pool disposal; None until the first one.
        self._last_pool_dispose_time: Optional[datetime] = None
        # Serializes reads/writes of _last_pool_dispose_time across threads.
        self._last_pool_dispose_time_lock = Lock()
        self._sql_engine = sql_engine
        # Exception types (seen as __cause__ of a SQLAlchemy error) that signal
        # a connection-level failure; stays empty for non-PostgreSQL backends,
        # which disables check_dispose_pool entirely.
        self._dispose_pool_on_exceptions: Tuple[Any, ...] = tuple()
        if self._sql_engine.dialect.name == "postgresql":
            import psycopg2

            self._dispose_pool_on_exceptions = (psycopg2.errors.ReadOnlySqlTransaction, psycopg2.errors.AdminShutdown)

    def check_dispose_pool(self, session: SessionType, e: Exception) -> bool:
        """For selected exceptions invalidate the SQL session.

        - returns True when a transient sql connection error is detected
        - returns False otherwise
        """

        # Only do this if the config is relevant (i.e. a PostgreSQL backend)
        if not self._dispose_pool_on_exceptions:
            return False

        # Make sure we have a SQL-related cause to check, otherwise skip
        if e.__cause__ and not isinstance(e.__cause__, Exception):
            return False

        cause_type = type(e.__cause__)
        # Let's see if this exception is related to known disconnect exceptions
        is_connection_error = cause_type in self._dispose_pool_on_exceptions
        if not is_connection_error:
            return False

        # Make sure this connection will not be re-used
        session.invalidate()
        LOGGER.info(
            "Detected a SQL exception=[%s] related to the connection. Invalidating this connection.",
            cause_type.__name__,
        )

        # Only allow disposal every self._min_time_between_dispose_in_minutes
        now = datetime.now(timezone.utc)
        only_if_after = None

        # Check if we should dispose the rest of the checked in connections
        with self._last_pool_dispose_time_lock:
            if self._last_pool_dispose_time:
                only_if_after = self._last_pool_dispose_time + timedelta(
                    minutes=self._min_time_between_dispose_in_minutes
                )
            if only_if_after and now < only_if_after:
                # Disposed recently enough; just report the transient error.
                return True

            # OK, we haven't disposed the pool recently
            self._last_pool_dispose_time = now
            LOGGER.info(
                "Disposing connection pool and will ask clients to retry until %ss from now. "
                "This will give new requests a fresh SQL connection.",
                self._cooldown_time_in_secs,
            )
            self._sql_engine.dispose()

        return True

    def time_until_active_pool(self) -> timedelta:
        """The time at which the pool is expected to become
        active after a pool disposal. This adds small amounts of jitter
        to help spread out load due to retrying clients.
        """
        if self._last_pool_dispose_time:
            time_til_active = self._last_pool_dispose_time + timedelta(seconds=self._cooldown_time_in_secs)
            if datetime.now(timezone.utc) < time_til_active:
                # Still cooling down: report the full cooldown plus random
                # jitter so retrying clients do not stampede back at once.
                return timedelta(
                    seconds=self._cooldown_time_in_secs
                    + random.uniform(-self._cooldown_jitter_base_in_secs, self._cooldown_jitter_base_in_secs)
                )
        return timedelta(seconds=0)