Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/sql/sqlutils.py: 90.41%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

73 statements  

1# Copyright (C) 2021 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15import logging 

16import time 

17 

18from typing import Any, Tuple, Optional 

19 

20from threading import Lock, Event 

21from datetime import datetime, timedelta 

22 

23from sqlalchemy.orm.session import Session as SessionType 

24 

25 

26def is_sqlite_connection_string(connection_string: str) -> bool: 

27 if connection_string: 

28 return connection_string.startswith("sqlite") 

29 return False 

30 

31 

32def is_psycopg2_connection_string(connection_string: str) -> bool: 

33 if connection_string: 

34 if connection_string.startswith("postgresql:"): 

35 return True 

36 if connection_string.startswith("postgresql+psycopg2:"): 

37 return True 

38 return False 

39 

40 

41def is_sqlite_inmemory_connection_string(full_connection_string: str) -> bool: 

42 if is_sqlite_connection_string(full_connection_string): 

43 # Valid connection_strings for in-memory SQLite which we don't support could look like: 

44 # "sqlite:///file:memdb1?option=value&cache=shared&mode=memory", 

45 # "sqlite:///file:memdb1?mode=memory&cache=shared", 

46 # "sqlite:///file:memdb1?cache=shared&mode=memory", 

47 # "sqlite:///file::memory:?cache=shared", 

48 # "sqlite:///file::memory:", 

49 # "sqlite:///:memory:", 

50 # "sqlite:///", 

51 # "sqlite://" 

52 # ref: https://www.sqlite.org/inmemorydb.html 

53 # Note that a user can also specify drivers, so prefix could become 'sqlite+driver:///' 

54 connection_string = full_connection_string 

55 

56 uri_split_index = connection_string.find("?") 

57 if uri_split_index != -1: 

58 connection_string = connection_string[0:uri_split_index] 

59 

60 if connection_string.endswith((":memory:", ":///", "://")): 

61 return True 

62 elif uri_split_index != -1: 

63 opts = full_connection_string[uri_split_index + 1:].split("&") 

64 if "mode=memory" in opts: 

65 return True 

66 

67 return False 

68 

69 

70class SQLPoolDisposeHelper(): 

71 """ Helper class for disposing of SQL session connections """ 

72 

73 def __init__(self, cooldown_time_in_secs: int, 

74 min_time_between_dispose_in_minutes: int, 

75 sql_engine) -> None: 

76 self._logger = logging.getLogger(__name__) 

77 

78 self._cooldown_time_in_secs = cooldown_time_in_secs 

79 self._min_time_between_dispose_in_minutes = min_time_between_dispose_in_minutes 

80 self._last_pool_dispose_time: Optional[datetime] = None 

81 self._last_pool_dispose_time_lock = Lock() 

82 # When this event is set, SQL queries are executed right away. 

83 # During pool disposal, this event is unset for a period of time and new SQL queries 

84 # wait until it's set (or a timeout) before they attempt to execute the query. 

85 self._pool_dispose_cooldown_event = Event() 

86 self._pool_dispose_cooldown_event.set() 

87 self._sql_engine = sql_engine 

88 self._dispose_pool_on_exceptions: Tuple[Any, ...] = tuple() 

89 if self._sql_engine.dialect.name == 'postgresql': 

90 import psycopg2 # pylint: disable=import-outside-toplevel 

91 self._dispose_pool_on_exceptions = (psycopg2.errors.ReadOnlySqlTransaction, psycopg2.errors.AdminShutdown) 

92 

93 def check_dispose_pool(self, session: SessionType, e: Exception) -> bool: 

94 """ For selected exceptions invalidate the SQL session and then sleep 

95 - returns True when a transient sql connection error is detected 

96 - returns False otherwise 

97 """ 

98 

99 # Only do this if the config is relevant 

100 if not self._dispose_pool_on_exceptions: 

101 return False 

102 

103 # Make sure we have a SQL-related cause to check, otherwise skip 

104 if e.__cause__ and not isinstance(e.__cause__, Exception): 

105 return False 

106 

107 cause_type = type(e.__cause__) 

108 # Let's see if this exception is related to known disconnect exceptions 

109 is_connection_error = cause_type in self._dispose_pool_on_exceptions 

110 if not is_connection_error: 

111 return False 

112 

113 # Make sure this connection will not be re-used 

114 session.invalidate() 

115 self._logger.info( 

116 f'Detected a SQL exception=[{cause_type.__name__}] related to the connection. ' 

117 'Invalidating this connection.' 

118 ) 

119 

120 # Only allow disposal every self.__min_time_between_dispose_in_minutes 

121 now = datetime.utcnow() 

122 only_if_after = None 

123 

124 # Check if we should dispose the rest of the checked in connections 

125 with self._last_pool_dispose_time_lock: 

126 if self._last_pool_dispose_time: 

127 only_if_after = self._last_pool_dispose_time + \ 

128 timedelta(minutes=self._min_time_between_dispose_in_minutes) 

129 if only_if_after and now < only_if_after: 

130 return True 

131 

132 # OK, we haven't disposed the pool recently 

133 self._last_pool_dispose_time = now 

134 self._logger.info('Disposing connection pool and will make new requests wait until ' 

135 f'{self._cooldown_time_in_secs}s from now before attempting' 

136 ' to reconnect. This will give new requests a fresh SQL connection.') 

137 self._sql_engine.dispose() 

138 

139 # Clear the cooldown event to inform all subsequent SQL sessions to wait a bit 

140 self._pool_dispose_cooldown_event.clear() 

141 time.sleep(self._cooldown_time_in_secs) 

142 self._pool_dispose_cooldown_event.set() 

143 

144 return True 

145 

146 def wait_if_cooldown_in_effect(self): 

147 # If we recently disposed of the SQL pool due to connection issues 

148 # allow for some cooldown period before we attempt more SQL 

149 if not self._pool_dispose_cooldown_event.is_set(): 

150 self._pool_dispose_cooldown_event.wait(timeout=self._cooldown_time_in_secs) 

151 # Set this anyway, just in case the thread responsible for this failed 

152 self._pool_dispose_cooldown_event.set()