Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/sql/sqlutils.py: 90.41%
73 statements
# Copyright (C) 2021 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time

from typing import Any, Tuple, Optional

from threading import Lock, Event
from datetime import datetime, timedelta

from sqlalchemy.orm.session import Session as SessionType

def is_sqlite_connection_string(connection_string: str) -> bool:
    if connection_string:
        return connection_string.startswith("sqlite")
    return False

def is_psycopg2_connection_string(connection_string: str) -> bool:
    if connection_string:
        if connection_string.startswith("postgresql:"):
            return True
        if connection_string.startswith("postgresql+psycopg2:"):
            return True
    return False
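
# Illustrative only (not part of the original module): given the startswith()
# checks above, one would expect, for example:
#   is_psycopg2_connection_string("postgresql://user@host/db")           -> True
#   is_psycopg2_connection_string("postgresql+psycopg2://user@host/db")  -> True
#   is_psycopg2_connection_string("postgresql+asyncpg://user@host/db")   -> False  (other drivers are not matched)
#   is_psycopg2_connection_string("sqlite:///example.db")                -> False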

def is_sqlite_inmemory_connection_string(full_connection_string: str) -> bool:
    if is_sqlite_connection_string(full_connection_string):
        # Valid connection strings for in-memory SQLite (which we don't support) could look like:
        # "sqlite:///file:memdb1?option=value&cache=shared&mode=memory",
        # "sqlite:///file:memdb1?mode=memory&cache=shared",
        # "sqlite:///file:memdb1?cache=shared&mode=memory",
        # "sqlite:///file::memory:?cache=shared",
        # "sqlite:///file::memory:",
        # "sqlite:///:memory:",
        # "sqlite:///",
        # "sqlite://"
        # ref: https://www.sqlite.org/inmemorydb.html
        # Note that a user can also specify drivers, so the prefix could become 'sqlite+driver:///'.
        connection_string = full_connection_string

        uri_split_index = connection_string.find("?")
        if uri_split_index != -1:
            connection_string = connection_string[0:uri_split_index]

        if connection_string.endswith((":memory:", ":///", "://")):
            return True
        elif uri_split_index != -1:
            opts = full_connection_string[uri_split_index + 1:].split("&")
            if "mode=memory" in opts:
                return True

    return False
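
# Illustrative only (not part of the original module): expected results, based on
# the parsing above and the example connection strings listed in the comment block
# (the file paths are hypothetical):
#   is_sqlite_inmemory_connection_string("sqlite:///:memory:")                             -> True
#   is_sqlite_inmemory_connection_string("sqlite://")                                      -> True
#   is_sqlite_inmemory_connection_string("sqlite:///file:memdb1?mode=memory&cache=shared") -> True
#   is_sqlite_inmemory_connection_string("sqlite:///./buildgrid.db")                       -> False
#   is_sqlite_inmemory_connection_string("postgresql://user@host/db")                      -> False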

class SQLPoolDisposeHelper():
    """ Helper class for disposing of SQL session connections """

    def __init__(self, cooldown_time_in_secs: int,
                 min_time_between_dispose_in_minutes: int,
                 sql_engine) -> None:
        self._logger = logging.getLogger(__name__)

        self._cooldown_time_in_secs = cooldown_time_in_secs
        self._min_time_between_dispose_in_minutes = min_time_between_dispose_in_minutes
        self._last_pool_dispose_time: Optional[datetime] = None
        self._last_pool_dispose_time_lock = Lock()
        # When this event is set, SQL queries are executed right away.
        # During pool disposal, this event is cleared for a period of time and new SQL queries
        # wait until it is set again (or a timeout expires) before they attempt to execute.
        self._pool_dispose_cooldown_event = Event()
        self._pool_dispose_cooldown_event.set()
        self._sql_engine = sql_engine
        self._dispose_pool_on_exceptions: Tuple[Any, ...] = tuple()
        if self._sql_engine.dialect.name == 'postgresql':
            import psycopg2  # pylint: disable=import-outside-toplevel
            self._dispose_pool_on_exceptions = (psycopg2.errors.ReadOnlySqlTransaction, psycopg2.errors.AdminShutdown)

    def check_dispose_pool(self, session: SessionType, e: Exception) -> bool:
        """ For selected exceptions, invalidate the SQL session and then sleep.

        - returns True when a transient SQL connection error is detected
        - returns False otherwise
        """
        # Only do this if the config is relevant
        if not self._dispose_pool_on_exceptions:
            return False

        # Make sure we have a SQL-related cause to check, otherwise skip
        if e.__cause__ and not isinstance(e.__cause__, Exception):
            return False

        cause_type = type(e.__cause__)
        # Let's see if this exception is related to known disconnect exceptions
        is_connection_error = cause_type in self._dispose_pool_on_exceptions
        if not is_connection_error:
            return False

        # Make sure this connection will not be re-used
        session.invalidate()
        self._logger.info(
            f'Detected a SQL exception=[{cause_type.__name__}] related to the connection. '
            'Invalidating this connection.'
        )

        # Only allow disposal every self._min_time_between_dispose_in_minutes
        now = datetime.utcnow()
        only_if_after = None

        # Check if we should dispose the rest of the checked-in connections
        with self._last_pool_dispose_time_lock:
            if self._last_pool_dispose_time:
                only_if_after = self._last_pool_dispose_time + \
                    timedelta(minutes=self._min_time_between_dispose_in_minutes)
            if only_if_after and now < only_if_after:
                return True

            # OK, we haven't disposed the pool recently
            self._last_pool_dispose_time = now
            self._logger.info('Disposing connection pool and will make new requests wait until '
                              f'{self._cooldown_time_in_secs}s from now before attempting'
                              ' to reconnect. This will give new requests a fresh SQL connection.')
            self._sql_engine.dispose()

            # Clear the cooldown event to inform all subsequent SQL sessions to wait a bit
            self._pool_dispose_cooldown_event.clear()
            time.sleep(self._cooldown_time_in_secs)
            self._pool_dispose_cooldown_event.set()

        return True

    def wait_if_cooldown_in_effect(self) -> None:
        # If we recently disposed of the SQL pool due to connection issues,
        # allow for some cooldown period before we attempt more SQL
        if not self._pool_dispose_cooldown_event.is_set():
            self._pool_dispose_cooldown_event.wait(timeout=self._cooldown_time_in_secs)
            # Set this anyway, just in case the thread responsible for this failed
            self._pool_dispose_cooldown_event.set()
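
# ---------------------------------------------------------------------------
# Usage sketch (not part of the original module). It shows one way the helper
# could be wired around query execution: call wait_if_cooldown_in_effect()
# before touching the database and check_dispose_pool() when a SQLAlchemy
# error bubbles up. The engine URL, session factory and retry loop below are
# illustrative assumptions, not BuildGrid's actual integration.
# ---------------------------------------------------------------------------
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import sessionmaker

engine = create_engine("postgresql+psycopg2://user@localhost/buildgrid")  # hypothetical URL
Session = sessionmaker(bind=engine)
dispose_helper = SQLPoolDisposeHelper(cooldown_time_in_secs=30,
                                      min_time_between_dispose_in_minutes=1,
                                      sql_engine=engine)


def run_select_with_retry(query: str, max_attempts: int = 2):
    """Run a SELECT, retrying once if the pool was disposed after a transient connection error."""
    for attempt in range(max_attempts):
        # Respect any cooldown triggered by a recent pool disposal
        dispose_helper.wait_if_cooldown_in_effect()
        session = Session()
        try:
            return session.execute(text(query)).fetchall()
        except SQLAlchemyError as e:
            # True when the underlying cause is a known transient connection
            # error; the session is invalidated and the pool disposed
            if dispose_helper.check_dispose_pool(session, e) and attempt + 1 < max_attempts:
                continue
            raise
        finally:
            session.close()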