Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/sql/provider.py: 89.68%
126 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
1# Copyright (C) 2023 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16from contextlib import contextmanager
17from datetime import timedelta
18from tempfile import NamedTemporaryFile
19from typing import Any, Dict, Generator, Iterator, List, Optional, Type, Union, cast
21from alembic import command
22from alembic.config import Config
23from importlib_resources import files
24from sqlalchemy import create_engine, event, text
25from sqlalchemy.engine import Engine
26from sqlalchemy.orm import Session, scoped_session, sessionmaker
27from sqlalchemy.pool import NullPool
29from buildgrid.server.exceptions import DatabaseError, RetriableDatabaseError
30from buildgrid.server.logging import buildgrid_logger
31from buildgrid.server.settings import (
32 COOLDOWN_TIME_AFTER_POOL_DISPOSE_SECONDS,
33 COOLDOWN_TIME_JITTER_BASE,
34 MIN_TIME_BETWEEN_SQL_POOL_DISPOSE_MINUTES,
35)
37from .utils import (
38 SQLPoolDisposeHelper,
39 is_psycopg2_connection_string,
40 is_sqlite_connection_string,
41 is_sqlite_inmemory_connection_string,
42)
# Module-level logger for this provider.
LOGGER = buildgrid_logger(__name__)


# Each dialect has a limit on the number of bind parameters allowed. This
# matters because it determines how large we can allow our IN clauses to get.
#
# SQLite: 1000 https://www.sqlite.org/limits.html#max_variable_number
# PostgreSQL: 32767 (Int16.MAX_VALUE) https://www.postgresql.org/docs/9.4/protocol-message-formats.html
#
# We'll refer to this as the "inlimit" in the code. The inlimits are
# set to 75% of the bind parameter limit of the implementation.
DIALECT_INLIMIT_MAP = {"postgresql": 24000, "sqlite": 750}
# Conservative fallback inlimit for dialects not listed above.
DEFAULT_INLIMIT = 100


# NOTE: Obviously these type annotations are useless, but sadly they're what
# is in the upstream sqlalchemy2-stubs[0].
#
# Once we upgrade to SQLAlchemy 2.0 we can make these more useful, as that
# version of SQLAlchemy has sensible type annotations[1].
#
# [0]: https://github.com/sqlalchemy/sqlalchemy2-stubs/blob/main/sqlalchemy-stubs/pool/events.pyi#L9
# [1]: https://github.com/sqlalchemy/sqlalchemy/blob/main/lib/sqlalchemy/pool/events.py#L96-L100
66def _sqlite_on_connect(conn: Any, record: Any) -> None:
67 """SQLite ``PRAGMA`` statements to execute immediately upon connection.
69 These statements configure the behaviour of the database, and are specific
70 to SQLite.
72 See https://www.sqlite.org/pragma.html for details.
74 Args:
75 conn (DBAPIConnection): The DBAPI connection that was just connected.
76 record (_ConnectionRecord): The connection record which contains the
77 DBAPI connection.
79 """
80 # Use journal_mode=WAL to allow read/write concurrency, as well as the
81 # performance improvements it brings.
82 conn.execute("PRAGMA journal_mode=WAL")
83 conn.execute("PRAGMA synchronous=NORMAL")
class SqlProvider:
    """Class which provides an interface for interacting with an SQL database.

    This class is used to allow configuration of a per-process SQL connection
    pool, which can then be shared across multiple components of BuildGrid
    which require an SQL connection.

    Args:
        automigrate (bool): Whether or not to ensure the database is fully
            migrated when starting up. Defaults to ``False``, meaning the
            database is assumed to be up to date already.

        connection_string (str | None): The connection string to use when
            creating a database connection. If ``None`` then a temporary
            SQLite database will be created for the lifetime of this
            object.

        connection_timeout (int): The timeout to use when attempting to
            connect to the database, in seconds. Defaults to 5 seconds if
            unset.

        lock_timeout (int): The timeout to use when the connection
            holds a lock in the database. This is supported only if the database
            backend is PostgreSQL.

        connect_args (Dict[str, Any] | None): Dictionary of DBAPI
            connection arguments to pass to the engine. See the
            SQLAlchemy `docs`_ for details.

        max_overflow (int | None): The number of connections to allow
            as "overflow" connections in the connection pool. This is
            the number of connections which will be able to be opened
            above the value of ``pool_size``.

        pool_pre_ping (bool | None): Whether or not to test connections
            for liveness on checkout from the connection pool.

        pool_recycle (int | None): The number of seconds after which to
            recycle database connections. If ``None`` (the default) then
            connections won't be recycled (the SQLAlchemy default value
            of -1 will be used).

        pool_size (int | None): The number of connections to keep open
            inside the engine's connection pool.

        pool_timeout (int | None): The number of seconds to wait when
            attempting to get a connection from the connection pool.

    Raises:
        ValueError: when ``connection_string`` specifies an in-memory SQLite
            database.

    .. _docs: https://docs.sqlalchemy.org/en/14/core/engines.html#use-the-connect-args-dictionary-parameter

    """

    def __init__(
        self,
        *,
        automigrate: bool = False,
        connection_string: Optional[str] = None,
        connection_timeout: int = 5,
        lock_timeout: int = 5,
        connect_args: Optional[Dict[Any, Any]] = None,
        max_overflow: Optional[int] = None,
        pool_pre_ping: Optional[bool] = None,
        pool_recycle: Optional[int] = None,
        pool_size: Optional[int] = None,
        pool_timeout: Optional[int] = None,
    ):
        """Initialize an SqlProvider."""
        # Holds the NamedTemporaryFile backing a generated SQLite database,
        # or None when an explicit connection string was supplied.
        self._database_tempfile = None
        # If we don't have a connection string, we'll make a tempfile to use
        # as an SQLite database. This tempfile needs to exist for the lifetime
        # of the SqlProvider.
        if not connection_string:
            self._database_tempfile = NamedTemporaryFile(prefix="bgd-", suffix=".db")
            LOGGER.warning(
                "No connection string specified for the SQL provider, will use SQLite with tempfile.",
                tags=dict(tempfile=self._database_tempfile.name),
            )
            automigrate = True  # since this is a temporary database, we always need to create it
            connection_string = f"sqlite:///{self._database_tempfile.name}"

        # Set up database connection. The session factories are created first
        # so that _create_sqlalchemy_engine can bind the engine to them.
        self._session_factory = sessionmaker(future=True)
        self._scoped_session_factory = scoped_session(self._session_factory)

        self._engine = self._create_sqlalchemy_engine(
            connection_string,
            connection_timeout,
            lock_timeout=lock_timeout,
            connect_args=connect_args,
            max_overflow=max_overflow,
            pool_pre_ping=pool_pre_ping,
            pool_recycle=pool_recycle,
            pool_size=pool_size,
            pool_timeout=pool_timeout,
        )

        LOGGER.info("Created SQL provider.", tags=dict(automigrate=automigrate, connection=self._engine.url))

        if automigrate:
            self._create_or_migrate_db()

        # Helper which disposes of (and cools down) the connection pool when
        # transient connectivity errors are detected in session().
        self._sql_pool_dispose_helper = SQLPoolDisposeHelper(
            COOLDOWN_TIME_AFTER_POOL_DISPOSE_SECONDS,
            COOLDOWN_TIME_JITTER_BASE,
            MIN_TIME_BETWEEN_SQL_POOL_DISPOSE_MINUTES,
            self._engine,
        )

    def _create_sqlalchemy_engine(
        self,
        connection_string: str,
        connection_timeout: int,
        lock_timeout: int,
        *,
        connect_args: Optional[Dict[Any, Any]] = None,
        max_overflow: Optional[int] = None,
        pool_pre_ping: Optional[bool] = None,
        pool_recycle: Optional[int] = None,
        pool_size: Optional[int] = None,
        pool_timeout: Optional[int] = None,
    ) -> Engine:
        """Create the SQLAlchemy Engine.

        Args:
            connection_string: The connection string to use when
                creating the ``Engine``.

            connection_timeout: The timeout to use for database
                connections, in seconds. If set as 0, no timeout
                is applied.

            lock_timeout (int): The timeout to use when the connection
                holds a lock in the database. This is supported only if the database
                backend is PostgreSQL.

            connect_args: Dictionary of DBAPI
                connection arguments to pass to the engine. See the
                SQLAlchemy `docs`_ for details.

            max_overflow: The number of connections to allow
                as "overflow" connections in the connection pool. This is
                the number of connections which will be able to be opened
                above the value of ``pool_size``.

            pool_pre_ping: Whether or not to test connections
                for liveness on checkout from the connection pool.

            pool_recycle: The number of seconds after which to
                recycle database connections. If ``None`` (the default) then
                connections won't be recycled (the SQLAlchemy default value
                of -1 will be used).

            pool_size: The number of connections to keep open
                inside the engine's connection pool.
                If set as zero, no connection pool is created
                and other pool_* parameters are ignored.

            pool_timeout: The number of seconds to wait when
                attempting to get a connection from the connection pool.

        Returns:
            A :class:`sqlalchemy.engine.Engine` set up to connect to the
            database defined by ``connection_string``.

        Raises:
            ValueError: when attempting to connect to an in-memory SQLite
                database.

        .. _docs: https://docs.sqlalchemy.org/en/14/core/engines.html#use-the-connect-args-dictionary-parameter

        """
        # Disallow sqlite in-memory because multi-threaded access to it is
        # complex and potentially problematic at best
        # ref: https://docs.sqlalchemy.org/en/14/dialects/sqlite.html#threading-pooling-behavior
        if is_sqlite_inmemory_connection_string(connection_string):
            raise ValueError(
                "Cannot use SQLite in-memory with BuildGrid "
                f"(connection_string=[{connection_string}]). Use a file or "
                "leave the connection_string empty for a tempfile."
            )

        extra_engine_args: Dict[str, Any] = {}
        if connect_args is not None:
            extra_engine_args["connect_args"] = connect_args
        else:
            extra_engine_args["connect_args"] = {}

        # The DBAPI-level timeout keyword differs per driver: SQLite uses
        # "timeout", psycopg2 uses "connect_timeout".
        if connection_timeout > 0:
            if is_sqlite_connection_string(connection_string):
                extra_engine_args["connect_args"]["timeout"] = connection_timeout
            elif is_psycopg2_connection_string(connection_string):
                extra_engine_args["connect_args"]["connect_timeout"] = connection_timeout
        if lock_timeout > 0 and is_psycopg2_connection_string(connection_string):
            # Additional postgres specific timeouts
            # Additional libpg options
            # Note that those timeouts are in milliseconds (so *1000)
            # User might specifically set options... do not override in this case.
            extra_engine_args["connect_args"].setdefault("options", f"-c lock_timeout={lock_timeout * 1000}")

        if pool_size is not None and pool_size == 0:
            # pool_size == 0 explicitly disables pooling; the other pool_*
            # parameters are then meaningless and deliberately ignored.
            LOGGER.debug("No connection pool is created.")
            extra_engine_args["poolclass"] = NullPool
        else:
            if max_overflow is not None:
                extra_engine_args["max_overflow"] = max_overflow
            if pool_pre_ping is not None:
                extra_engine_args["pool_pre_ping"] = pool_pre_ping
            if pool_recycle is not None:
                extra_engine_args["pool_recycle"] = pool_recycle
            if pool_size is not None:
                extra_engine_args["pool_size"] = pool_size
            if pool_timeout is not None:
                extra_engine_args["pool_timeout"] = pool_timeout

        LOGGER.debug(f"Additional SQLAlchemy Engine args: [{extra_engine_args}]")

        engine = create_engine(connection_string, echo=False, future=True, **extra_engine_args)
        self._session_factory.configure(bind=engine)

        # Register sqlite-specific connection callback.
        if engine.dialect.name == "sqlite":
            event.listen(engine, "connect", _sqlite_on_connect)

        return cast(Engine, engine)

    def _create_or_migrate_db(self) -> None:
        """Ensure that the database schema is up to date.

        This method runs the Alembic ``upgrade`` command to ensure that all of
        the database migrations have been run and the schema is up to date.

        .. warning::

            Do not run this method concurrently against a single database, at
            risk of migrations conflicting with themselves and causing at least
            one of the callers to fail.

        """
        LOGGER.warning("Will attempt migration to latest version if needed.")

        config: Config = Config()
        # Point Alembic at the migration scripts bundled with the package.
        config.set_main_option("script_location", str(files("buildgrid.server.sql").joinpath("alembic")))

        with self._engine.begin() as connection:
            # NOTE: pylint doesn't like this for some reason, but it is the
            # documented way to set the connection.
            # https://alembic.sqlalchemy.org/en/latest/api/config.html#alembic.config.Config
            config.attributes["connection"] = connection
            command.upgrade(config, "head")

    @property
    def dialect(self) -> str:
        """The SQL dialect in use by the configured SQL engine."""
        return self._engine.dialect.name

    @property
    def default_inlimit(self) -> int:
        """Return the default inlimit size based on the current SQL dialect"""
        return DIALECT_INLIMIT_MAP.get(self.dialect, DEFAULT_INLIMIT)

    @contextmanager
    def session(
        self,
        *,
        scoped: bool = False,
        sqlite_lock_immediately: bool = False,
        exceptions_to_not_raise_on: Optional[List[Type[Exception]]] = None,
        exceptions_to_not_rollback_on: Optional[List[Type[Exception]]] = None,
        expire_on_commit: bool = True,
    ) -> Iterator[Session]:
        """ContextManager yielding an ORM ``Session`` for the configured database.

        The :class:`sqlalchemy.orm.Session` lives for the duration of the
        managed context, and any open transaction is committed upon exiting
        the context.

        This method can potentially block for a short while before yielding
        if the underlying connection pool has recently been disposed of and
        refreshed due to connectivity issues.

        When ``sqlite_lock_immediately`` is ``True``, the Session will not
        yield until the database has been locked by entering into a write
        transaction when using SQLite.

        If an Exception is raised whilst in the managed context, the ongoing
        database transaction is rolled back, and the Exception is reraised.
        Some Exceptions which suggest a transient connection issue with the
        database lead to a ``RetriableDatabaseError`` being raised from the
        Exception instead.

        ``exceptions_to_not_raise_on`` defines a list of SQLAlchemyError types
        which should be suppressed instead of re-raised when occurring within
        the managed context.

        Similarly, ``exceptions_to_not_rollback_on`` defines a list of
        SQLAlchemyError types which will not trigger a transaction rollback
        when occurring within the managed context. Instead, the open transaction
        will be committed and the session closed.

        Args:
            scoped: If true, use a ``scoped_session`` factory to create the
                session. This results in reuse of the underlying Session object
                in a given thread.

            sqlite_lock_immediately: If true, execute a ``BEGIN IMMEDIATE``
                statement as soon as the session is created when using SQLite.
                This allows locking for the lifetime of the ``Session`` within
                this ContextManager, enabling similar behaviour to
                ``SELECT ... FOR UPDATE`` in other dialects. Defaults to
                ``False``.

            exceptions_to_not_raise_on: The list of error types to be suppressed
                within the context rather than re-raised. Defaults to ``None``,
                meaning all SQLAlchemyErrors will be re-raised.

            exceptions_to_not_rollback_on: The list
                of error types which shouldn't trigger a transaction rollback.
                Defaults to ``None``, meaning all SQLAlchemyErrors will trigger
                rollback of the transaction.

            expire_on_commit: Defaults to True. When True, all instances will
                be fully expired after each commit(), so that all attribute/object
                access subsequent to a completed transaction will load from
                the most recent database state. This flag is ignored if
                ``scoped == True``

        Yields:
            A :class:`sqlalchemy.orm.Session` object.

        Raises:
            DatabaseError: when a database session cannot be obtained.

            RetriableDatabaseError: when the database connection is temporarily
                interrupted, but can be expected to recover.

            Exception: Any Exception raised within the context will be re-raised
                unless its type is included in the ``exceptions_to_not_raise_on``
                parameter.

        """
        if exceptions_to_not_raise_on is None:
            exceptions_to_not_raise_on = []
        if exceptions_to_not_rollback_on is None:
            exceptions_to_not_rollback_on = []

        factory: "Union[scoped_session, sessionmaker[Session]]" = self._session_factory
        if scoped:
            factory = self._scoped_session_factory

        # If we recently disposed of the SQL pool due to connection issues
        # ask the client to try again when it's expected to be working again
        time_til_retry = self._sql_pool_dispose_helper.time_until_active_pool()
        if time_til_retry > timedelta(seconds=0):
            raise RetriableDatabaseError(
                "Database connection was temporarily interrupted, please retry", time_til_retry
            )

        # Try to obtain a session
        try:
            # NOTE: scoped_session factories don't accept expire_on_commit,
            # which is why the flag is ignored when scoped == True.
            session = factory() if scoped else factory(expire_on_commit=expire_on_commit)
            if sqlite_lock_immediately and session.bind.name == "sqlite":  # type: ignore
                session.execute(text("BEGIN IMMEDIATE"))
        except Exception as e:
            LOGGER.error("Unable to obtain a database session.", exc_info=True)
            raise DatabaseError("Unable to obtain a database session.") from e

        # Yield the session and catch exceptions that occur while using it
        # to roll-back if needed
        try:
            yield session
            session.commit()
        except Exception as e:
            # check_dispose_pool returns whether the error looks transient;
            # it may also dispose of the connection pool as a side effect.
            transient_dberr = self._sql_pool_dispose_helper.check_dispose_pool(session, e)
            # NOTE: exact type match (not isinstance) is intentional here, so
            # subclasses of listed exceptions still get the default handling.
            if type(e) in exceptions_to_not_rollback_on:
                try:
                    session.commit()
                except Exception:
                    pass
            else:
                session.rollback()
                if transient_dberr:
                    LOGGER.warning("Rolling back database session due to transient database error.", exc_info=True)
                else:
                    LOGGER.error("Error committing database session. Rolling back.", exc_info=True)
                if type(e) not in exceptions_to_not_raise_on:
                    if transient_dberr:
                        # Ask the client to retry when the pool is expected to be healthy again
                        raise RetriableDatabaseError(
                            "Database connection was temporarily interrupted, please retry",
                            self._sql_pool_dispose_helper.time_until_active_pool(),
                        ) from e
                    raise
        finally:
            session.close()

    @contextmanager
    def scoped_session(
        self,
        *,
        sqlite_lock_immediately: bool = False,
        exceptions_to_not_raise_on: Optional[List[Type[Exception]]] = None,
        exceptions_to_not_rollback_on: Optional[List[Type[Exception]]] = None,
    ) -> Generator[Session, None, None]:
        """ContextManager providing a thread-local ORM session for the database.

        This is a shorthand for ``SqlProvider.session(scoped=True)``.

        This ContextManager provides a reusable thread-local
        :class:`sqlalchemy.orm.Session` object. Once the ``Session`` has been
        created by the initial call, subsequent calls to this method from
        within a given thread will return the same ``Session`` object until
        :meth:`SqlProvider.remove_scoped_session` is called.

        Args:
            See :meth:`SqlProvider.session` for further details.

        Yields:
            A persistent thread-local :class:`sqlalchemy.orm.Session`.

        """
        with self.session(
            scoped=True,
            sqlite_lock_immediately=sqlite_lock_immediately,
            exceptions_to_not_raise_on=exceptions_to_not_raise_on,
            exceptions_to_not_rollback_on=exceptions_to_not_rollback_on,
        ) as session:
            yield session

    def remove_scoped_session(self) -> None:
        """Remove the thread-local session, if any."""
        self._scoped_session_factory.remove()