Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/sql/provider.py: 92.25%
129 statements
« prev ^ index » next coverage.py v7.4.1, created at 2025-05-28 16:48 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2025-05-28 16:48 +0000
1# Copyright (C) 2023 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16from contextlib import contextmanager
17from datetime import timedelta
18from tempfile import NamedTemporaryFile
19from threading import Lock
20from typing import Any, Generator, Iterator
22from sqlalchemy import create_engine, event, text
23from sqlalchemy.engine import Engine
24from sqlalchemy.orm import Session, scoped_session, sessionmaker
25from sqlalchemy.pool import NullPool
27from buildgrid.server.exceptions import DatabaseError, RetriableDatabaseError
28from buildgrid.server.logging import buildgrid_logger
29from buildgrid.server.metrics_names import METRIC
30from buildgrid.server.metrics_utils import publish_counter_metric, publish_gauge_metric
31from buildgrid.server.settings import (
32 COOLDOWN_TIME_AFTER_POOL_DISPOSE_SECONDS,
33 COOLDOWN_TIME_JITTER_BASE,
34 MIN_TIME_BETWEEN_SQL_POOL_DISPOSE_MINUTES,
35)
36from buildgrid.server.sql.models import Base
38from .utils import (
39 SQLPoolDisposeHelper,
40 is_psycopg2_connection_string,
41 is_sqlite_connection_string,
42 is_sqlite_inmemory_connection_string,
43)
# Module-level logger, namespaced to this module.
LOGGER = buildgrid_logger(__name__)

# Each dialect has a limit on the number of bind parameters allowed. This
# matters because it determines how large we can allow our IN clauses to get.
#
# SQLite: 1000 https://www.sqlite.org/limits.html#max_variable_number
# PostgreSQL: 32767 (Int16.MAX_VALUE) https://www.postgresql.org/docs/9.4/protocol-message-formats.html
#
# We'll refer to this as the "inlimit" in the code. The inlimits are
# set to 75% of the bind parameter limit of the implementation.
DIALECT_INLIMIT_MAP: dict[str, int] = {"postgresql": 24000, "sqlite": 750}
# Conservative fallback inlimit for dialects not listed above.
DEFAULT_INLIMIT: int = 100
59# NOTE: Obviously these type annotations are useless, but sadly they're what
60# is in the upstream sqlalchemy2-stubs[0].
61#
62# Once we upgrade to SQLAlchemy 2.0 we can make these more useful, as that
63# version of SQLAlchemy has sensible type annotations[1].
64#
65# [0]: https://github.com/sqlalchemy/sqlalchemy2-stubs/blob/main/sqlalchemy-stubs/pool/events.pyi#L9
66# [1]: https://github.com/sqlalchemy/sqlalchemy/blob/main/lib/sqlalchemy/pool/events.py#L96-L100
67def _sqlite_on_connect(conn: Any, record: Any) -> None:
68 """SQLite ``PRAGMA`` statements to execute immediately upon connection.
70 These statements configure the behaviour of the database, and are specific
71 to SQLite.
73 See https://www.sqlite.org/pragma.html for details.
75 Args:
76 conn (DBAPIConnection): The DBAPI connection that was just connected.
77 record (_ConnectionRecord): The connection record which contains the
78 DBAPI connection.
80 """
81 # Use journal_mode=WAL to allow read/write concurrency, as well as the
82 # performance improvements it brings.
83 conn.execute("PRAGMA journal_mode=WAL")
84 conn.execute("PRAGMA synchronous=NORMAL")
class SqlProvider:
    """Class which provides an interface for interacting with an SQL database.

    This class is used to allow configuration of a per-process SQL connection
    pool, which can then be shared across multiple components of BuildGrid
    which require an SQL connection.

    Args:
        connection_string (str | None): The connection string to use when
            creating a database connection. If ``None`` then a temporary
            SQLite database will be created for the lifetime of this
            object.

        connection_timeout (int): The timeout to use when attempting to
            connect to the database, in seconds. Defaults to 5 seconds if
            unset.

        lock_timeout (int): The timeout to use when the connection
            holds a lock in the database. This is supported only if the database
            backend is PostgresQL.

        connect_args (dict[str, Any] | None): Dictionary of DBAPI
            connection arguments to pass to the engine. See the
            SQLAlchemy `docs`_ for details. The given dictionary is
            never mutated by this class.

        max_overflow (int | None): The number of connections to allow
            as "overflow" connections in the connection pool. This is
            the number of connections which will be able to be opened
            above the value of ``pool_size``.

        pool_pre_ping (bool | None): Whether or not to test connections
            for liveness on checkout from the connection pool.

        pool_recycle (int | None): The number of seconds after which to
            recycle database connections. If ``None`` (the default) then
            connections won't be recycled (the SQLAlchemy default value
            of -1 will be used).

        pool_size (int | None): The number of connections to keep open
            inside the engine's connection pool.

        pool_timeout (int | None): The number of seconds to wait when
            attempting to get a connection from the connection pool.

        name (str): Name of the SQLProvider, which is used for metric
            publishing.

    Raises:
        ValueError: when ``connection_string`` specifies an in-memory SQLite
            database.

    .. _docs: https://docs.sqlalchemy.org/en/14/core/engines.html#use-the-connect-args-dictionary-parameter

    """

    def __init__(
        self,
        *,
        connection_string: str | None = None,
        connection_timeout: int = 5,
        lock_timeout: int = 5,
        connect_args: dict[Any, Any] | None = None,
        max_overflow: int | None = None,
        pool_pre_ping: bool | None = None,
        pool_recycle: int | None = None,
        pool_size: int | None = None,
        pool_timeout: int | None = None,
        name: str = "sql-provider",
    ):
        """Initialize an SqlProvider."""
        self._database_tempfile = None
        # If we don't have a connection string, we'll make a tempfile to use
        # as an SQLite database. This tempfile needs to exist for the lifetime
        # of the SqlProvider.
        if not connection_string:
            self._database_tempfile = NamedTemporaryFile(prefix="bgd-", suffix=".db")
            LOGGER.warning(
                "No connection string specified for the SQL provider, will use SQLite with tempfile.",
                tags=dict(tempfile=self._database_tempfile.name),
            )
            connection_string = f"sqlite:///{self._database_tempfile.name}"

        # Set up database connection
        self._session_factory = sessionmaker(future=True)
        self._scoped_session_factory = scoped_session(self._session_factory)

        self._engine = self._create_sqlalchemy_engine(
            connection_string,
            connection_timeout,
            lock_timeout=lock_timeout,
            connect_args=connect_args,
            max_overflow=max_overflow,
            pool_pre_ping=pool_pre_ping,
            pool_recycle=pool_recycle,
            pool_size=pool_size,
            pool_timeout=pool_timeout,
        )

        # If we're using a temporary file for the database, we need to create the
        # tables before we can actually use it.
        if self._database_tempfile is not None:
            Base.metadata.create_all(self._engine)

        LOGGER.info("Created SQL provider.", tags=dict(connection=self._engine.url))

        self._sql_pool_dispose_helper = SQLPoolDisposeHelper(
            COOLDOWN_TIME_AFTER_POOL_DISPOSE_SECONDS,
            COOLDOWN_TIME_JITTER_BASE,
            MIN_TIME_BETWEEN_SQL_POOL_DISPOSE_MINUTES,
            self._engine,
        )

        self._name = name
        # Count of currently-open sessions, guarded by ``self._lock`` and
        # published as a gauge metric when sessions are opened.
        self._num_sessions = 0
        self._lock = Lock()

    def _create_sqlalchemy_engine(
        self,
        connection_string: str,
        connection_timeout: int,
        lock_timeout: int,
        *,
        connect_args: dict[Any, Any] | None = None,
        max_overflow: int | None = None,
        pool_pre_ping: bool | None = None,
        pool_recycle: int | None = None,
        pool_size: int | None = None,
        pool_timeout: int | None = None,
    ) -> Engine:
        """Create the SQLAlchemy Engine.

        Args:
            connection_string: The connection string to use when
                creating the ``Engine``.

            connection_timeout: The timeout to use for database
                connections, in seconds. If set as 0, no timeout
                is applied.

            lock_timeout (int): The timeout to use when the connection
                holds a lock in the database. This is supported only if the database
                backend is PostgresQL.

            connect_args: Dictionary of DBAPI
                connection arguments to pass to the engine. See the
                SQLAlchemy `docs`_ for details. The dictionary is copied
                internally, so the caller's mapping is never modified.

            max_overflow: The number of connections to allow
                as "overflow" connections in the connection pool. This is
                the number of connections which will be able to be opened
                above the value of ``pool_size``.

            pool_pre_ping: Whether or not to test connections
                for liveness on checkout from the connection pool.

            pool_recycle: The number of seconds after which to
                recycle database connections. If ``None`` (the default) then
                connections won't be recycled (the SQLAlchemy default value
                of -1 will be used).

            pool_size: The number of connections to keep open
                inside the engine's connection pool.
                If set as zero, no connection pool is created
                and other pool_* parameters are ignored.

            pool_timeout: The number of seconds to wait when
                attempting to get a connection from the connection pool.

        Returns:
            A :class:`sqlalchemy.engine.Engine` set up to connect to the
            database defined by ``connection_string``.

        Raises:
            ValueError: when attempting to connect to an in-memory SQLite
                database.

        .. _docs: https://docs.sqlalchemy.org/en/14/core/engines.html#use-the-connect-args-dictionary-parameter

        """
        # Disallow sqlite in-memory because multi-threaded access to it is
        # complex and potentially problematic at best
        # ref: https://docs.sqlalchemy.org/en/14/dialects/sqlite.html#threading-pooling-behavior
        if is_sqlite_inmemory_connection_string(connection_string):
            raise ValueError(
                "Cannot use SQLite in-memory with BuildGrid "
                f"(connection_string=[{connection_string}]). Use a file or "
                "leave the connection_string empty for a tempfile."
            )

        extra_engine_args: dict[str, Any] = {}
        # Shallow-copy the caller-supplied connect_args so that the timeout
        # and lock_timeout entries added below never mutate the caller's
        # dictionary as a side effect.
        if connect_args is not None:
            extra_engine_args["connect_args"] = dict(connect_args)
        else:
            extra_engine_args["connect_args"] = {}

        if connection_timeout > 0:
            if is_sqlite_connection_string(connection_string):
                extra_engine_args["connect_args"]["timeout"] = connection_timeout
            elif is_psycopg2_connection_string(connection_string):
                extra_engine_args["connect_args"]["connect_timeout"] = connection_timeout
        if lock_timeout > 0 and is_psycopg2_connection_string(connection_string):
            # Additional postgres specific timeouts
            # Additional libpg options
            # Note that those timeouts are in milliseconds (so *1000)
            # User might specifically set options... do not override in this case.
            extra_engine_args["connect_args"].setdefault("options", f"-c lock_timeout={lock_timeout * 1000}")

        if pool_size == 0:
            # A pool_size of exactly zero disables pooling entirely; the
            # other pool_* parameters are then irrelevant.
            LOGGER.debug("No connection pool is created.")
            extra_engine_args["poolclass"] = NullPool
        else:
            if max_overflow is not None:
                extra_engine_args["max_overflow"] = max_overflow
            if pool_pre_ping is not None:
                extra_engine_args["pool_pre_ping"] = pool_pre_ping
            if pool_recycle is not None:
                extra_engine_args["pool_recycle"] = pool_recycle
            if pool_size is not None:
                extra_engine_args["pool_size"] = pool_size
            if pool_timeout is not None:
                extra_engine_args["pool_timeout"] = pool_timeout

        LOGGER.debug(f"Additional SQLAlchemy Engine args: [{extra_engine_args}]")

        engine = create_engine(connection_string, echo=False, future=True, **extra_engine_args)
        self._session_factory.configure(bind=engine)

        # Register sqlite-specific connection callback.
        if engine.dialect.name == "sqlite":
            event.listen(engine, "connect", _sqlite_on_connect)

        return engine

    @property
    def dialect(self) -> str:
        """The SQL dialect in use by the configured SQL engine."""
        return self._engine.dialect.name

    @property
    def default_inlimit(self) -> int:
        """Return the default inlimit size based on the current SQL dialect"""
        return DIALECT_INLIMIT_MAP.get(self.dialect, DEFAULT_INLIMIT)

    @contextmanager
    def session(
        self,
        *,
        scoped: bool = False,
        sqlite_lock_immediately: bool = False,
        exceptions_to_not_raise_on: list[type[Exception]] | None = None,
        exceptions_to_not_rollback_on: list[type[Exception]] | None = None,
        expire_on_commit: bool = True,
    ) -> Iterator[Session]:
        """ContextManager yielding an ORM ``Session`` for the configured database.

        The :class:`sqlalchemy.orm.Session` lives for the duration of the
        managed context, and any open transaction is committed upon exiting
        the context.

        This method can potentially block for a short while before yielding
        if the underlying connection pool has recently been disposed of and
        refreshed due to connectivity issues.

        When ``sqlite_lock_immediately`` is ``True``, the Session will not
        yield until the database has been locked by entering into a write
        transaction when using SQLite.

        If an Exception is raised whilst in the managed context, the ongoing
        database transaction is rolled back, and the Exception is reraised.
        Some Exceptions which suggest a transient connection issue with the
        database lead to a ``RetriableDatabaseError`` being raised from the
        Exception instead.

        ``exceptions_to_not_raise_on`` defines a list of SQLAlchemyError types
        which should be suppressed instead of re-raised when occurring within
        the managed context.

        Similarly, ``exceptions_to_not_rollback_on`` defines a list of
        SQLAlchemyError types which will not trigger a transaction rollback
        when occurring within the managed context. Instead, the open transaction
        will be committed and the session closed.

        Args:
            scoped: If true, use a ``scoped_session`` factory to create the
                session. This results in reuse of the underlying Session object
                in a given thread.

            sqlite_lock_immediately: If true, execute a ``BEGIN IMMEDIATE``
                statement as soon as the session is created when using SQLite.
                This allows locking for the lifetime of the ``Session`` within
                this ContextManager, enabling similar behaviour to
                ``SELECT ... FOR UPDATE`` in other dialects. Defaults to
                ``False``.

            exceptions_to_not_raise_on: The list of error types to be suppressed
                within the context rather than re-raised. Defaults to ``None``,
                meaning all SQLAlchemyErrors will be re-raised.

            exceptions_to_not_rollback_on: The list
                of error types which shouldn't trigger a transaction rollback.
                Defaults to ``None``, meaning all SQLAlchemyErrors will trigger
                rollback of the transaction.

            expire_on_commit: Defaults to True. When True, all instances will
                be fully expired after each commit(), so that all attribute/object
                access subsequent to a completed transaction will load from
                the most recent database state. This flag is ignored if
                ``scoped == True``

        Yields:
            A :class:`sqlalchemy.orm.Session` object.

        Raises:
            DatabaseError: when a database session cannot be obtained.

            RetriableDatabaseError: when the database connection is temporarily
                interrupted, but can be expected to recover.

            Exception: Any Exception raised within the context will be re-raised
                unless its type is included in the ``exceptions_to_not_raise_on``
                parameter.

        """
        if exceptions_to_not_raise_on is None:
            exceptions_to_not_raise_on = []
        if exceptions_to_not_rollback_on is None:
            exceptions_to_not_rollback_on = []

        factory: "scoped_session[Session] | sessionmaker[Session]" = self._session_factory
        if scoped:
            factory = self._scoped_session_factory

        # If we recently disposed of the SQL pool due to connection issues
        # ask the client to try again when it's expected to be working again
        time_til_retry = self._sql_pool_dispose_helper.time_until_active_pool()
        if time_til_retry > timedelta(seconds=0):
            raise RetriableDatabaseError(
                "Database connection was temporarily interrupted, please retry", time_til_retry
            )

        # Try to obtain a session
        try:
            session = factory() if scoped else factory(expire_on_commit=expire_on_commit)
            if sqlite_lock_immediately and session.bind.name == "sqlite":  # type: ignore
                session.execute(text("BEGIN IMMEDIATE"))
        except Exception as e:
            LOGGER.error("Unable to obtain a database session.", exc_info=True)
            raise DatabaseError("Unable to obtain a database session.") from e

        # Yield the session and catch exceptions that occur while using it
        # to roll-back if needed
        try:
            with self._lock:
                self._num_sessions += 1
                num_sessions = self._num_sessions
            publish_gauge_metric(METRIC.SQL.SQL_ACTIVE_SESSION_GAUGE_TEMPLATE.format(name=self._name), num_sessions)
            publish_counter_metric(METRIC.SQL.SQL_SESSION_COUNT_TEMPLATE.format(name=self._name), 1)

            yield session
            session.commit()
        except Exception as e:
            # check_dispose_pool may dispose of the pool; it returns True when
            # the error looks like a transient connectivity issue.
            transient_dberr = self._sql_pool_dispose_helper.check_dispose_pool(session, e)
            if type(e) in exceptions_to_not_rollback_on:
                try:
                    session.commit()
                except Exception:
                    pass
            else:
                session.rollback()
                if transient_dberr:
                    LOGGER.warning("Rolling back database session due to transient database error.", exc_info=True)
                else:
                    LOGGER.error("Error committing database session. Rolling back.", exc_info=True)
                if type(e) not in exceptions_to_not_raise_on:
                    if transient_dberr:
                        # Ask the client to retry when the pool is expected to be healthy again
                        raise RetriableDatabaseError(
                            "Database connection was temporarily interrupted, please retry",
                            self._sql_pool_dispose_helper.time_until_active_pool(),
                        ) from e
                    raise
        finally:
            with self._lock:
                self._num_sessions -= 1
            session.close()

    @contextmanager
    def scoped_session(
        self,
        *,
        sqlite_lock_immediately: bool = False,
        exceptions_to_not_raise_on: list[type[Exception]] | None = None,
        exceptions_to_not_rollback_on: list[type[Exception]] | None = None,
    ) -> Generator[Session, None, None]:
        """ContextManager providing a thread-local ORM session for the database.

        This is a shorthand for ``SqlProvider.session(scoped=True)``.

        This ContextManager provides a reusable thread-local
        :class:`sqlalchemy.orm.Session` object. Once the ``Session`` has been
        created by the initial call, subsequent calls to this method from
        within a given thread will return the same ``Session`` object until
        :meth:`SqlProvider.remove_scoped_session` is called.

        Args:
            See :meth:`SqlProvider.session` for further details.

        Yields:
            A persistent thread-local :class:`sqlalchemy.orm.Session`.

        """
        with self.session(
            scoped=True,
            sqlite_lock_immediately=sqlite_lock_immediately,
            exceptions_to_not_raise_on=exceptions_to_not_raise_on,
            exceptions_to_not_rollback_on=exceptions_to_not_rollback_on,
        ) as session:
            yield session

    def remove_scoped_session(self) -> None:
        """Remove the thread-local session, if any."""
        self._scoped_session_factory.remove()