Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/sql/provider.py: 92.25%

129 statements  

coverage.py v7.4.1, created at 2025-05-28 16:48 +0000

# Copyright (C) 2023 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from contextlib import contextmanager
from datetime import timedelta
from tempfile import NamedTemporaryFile
from threading import Lock
from typing import Any, Generator, Iterator

from sqlalchemy import create_engine, event, text
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session, scoped_session, sessionmaker
from sqlalchemy.pool import NullPool

from buildgrid.server.exceptions import DatabaseError, RetriableDatabaseError
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.metrics_names import METRIC
from buildgrid.server.metrics_utils import publish_counter_metric, publish_gauge_metric
from buildgrid.server.settings import (
    COOLDOWN_TIME_AFTER_POOL_DISPOSE_SECONDS,
    COOLDOWN_TIME_JITTER_BASE,
    MIN_TIME_BETWEEN_SQL_POOL_DISPOSE_MINUTES,
)
from buildgrid.server.sql.models import Base

from .utils import (
    SQLPoolDisposeHelper,
    is_psycopg2_connection_string,
    is_sqlite_connection_string,
    is_sqlite_inmemory_connection_string,
)

LOGGER = buildgrid_logger(__name__)

# Each dialect has a limit on the number of bind parameters allowed. This
# matters because it determines how large we can allow our IN clauses to get.
#
# SQLite: 1000 https://www.sqlite.org/limits.html#max_variable_number
# PostgreSQL: 32767 (Int16.MAX_VALUE) https://www.postgresql.org/docs/9.4/protocol-message-formats.html
#
# We'll refer to this as the "inlimit" in the code. The inlimits are
# set to 75% of the bind parameter limit of the implementation.
DIALECT_INLIMIT_MAP = {"postgresql": 24000, "sqlite": 750}
DEFAULT_INLIMIT = 100
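

# Illustrative sketch (not part of this module's API): how a caller might use
# the inlimit to keep an ``IN`` clause within the dialect's bind-parameter
# budget. The helper name and the ``values`` parameter are hypothetical.
def _batch_for_in_clause(values: list[Any], inlimit: int) -> Iterator[list[Any]]:
    """Yield successive slices of ``values``, each no larger than ``inlimit``."""
    for start in range(0, len(values), inlimit):
        yield values[start : start + inlimit]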


# NOTE: Obviously these type annotations are useless, but sadly they're what
# is in the upstream sqlalchemy2-stubs[0].
#
# Once we upgrade to SQLAlchemy 2.0 we can make these more useful, as that
# version of SQLAlchemy has sensible type annotations[1].
#
# [0]: https://github.com/sqlalchemy/sqlalchemy2-stubs/blob/main/sqlalchemy-stubs/pool/events.pyi#L9
# [1]: https://github.com/sqlalchemy/sqlalchemy/blob/main/lib/sqlalchemy/pool/events.py#L96-L100
def _sqlite_on_connect(conn: Any, record: Any) -> None:
    """SQLite ``PRAGMA`` statements to execute immediately upon connection.

    These statements configure the behaviour of the database, and are specific
    to SQLite.

    See https://www.sqlite.org/pragma.html for details.

    Args:
        conn (DBAPIConnection): The DBAPI connection that was just connected.
        record (_ConnectionRecord): The connection record which contains the
            DBAPI connection.

    """
    # Use journal_mode=WAL to allow read/write concurrency, as well as the
    # performance improvements it brings.
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA synchronous=NORMAL")
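

# Illustrative sketch (an addition for this listing, not used by BuildGrid):
# wiring the listener onto a standalone SQLite engine and checking that WAL
# journaling took effect. The database path is hypothetical.
def _example_check_sqlite_wal(db_path: str = "/tmp/example.db") -> str:
    """Return the journal mode reported by a freshly configured SQLite engine."""
    engine = create_engine(f"sqlite:///{db_path}", future=True)
    event.listen(engine, "connect", _sqlite_on_connect)
    with engine.connect() as conn:
        # Expected to report "wal" once the PRAGMA above has run.
        return conn.execute(text("PRAGMA journal_mode")).scalar_one()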

class SqlProvider:
    """Class which provides an interface for interacting with an SQL database.

    This class is used to allow configuration of a per-process SQL connection
    pool, which can then be shared across multiple components of BuildGrid
    which require an SQL connection.

    Args:
        connection_string (str | None): The connection string to use when
            creating a database connection. If ``None`` then a temporary
            SQLite database will be created for the lifetime of this
            object.

        connection_timeout (int): The timeout to use when attempting to
            connect to the database, in seconds. Defaults to 5 seconds if
            unset.

        lock_timeout (int): The maximum time, in seconds, that a statement
            may wait to acquire a lock in the database before being aborted.
            This is supported only if the database backend is PostgreSQL.

        connect_args (dict[str, Any] | None): Dictionary of DBAPI
            connection arguments to pass to the engine. See the
            SQLAlchemy `docs`_ for details.

        max_overflow (int | None): The number of connections to allow
            as "overflow" connections in the connection pool. This is
            the number of connections which will be able to be opened
            above the value of ``pool_size``.

        pool_pre_ping (bool | None): Whether or not to test connections
            for liveness on checkout from the connection pool.

        pool_recycle (int | None): The number of seconds after which to
            recycle database connections. If ``None`` (the default) then
            connections won't be recycled (the SQLAlchemy default value
            of -1 will be used).

        pool_size (int | None): The number of connections to keep open
            inside the engine's connection pool.

        pool_timeout (int | None): The number of seconds to wait when
            attempting to get a connection from the connection pool.

        name (str): Name of the SQLProvider, which is used for metric
            publishing.

    Raises:
        ValueError: when ``connection_string`` specifies an in-memory SQLite
            database.

    .. _docs: https://docs.sqlalchemy.org/en/14/core/engines.html#use-the-connect-args-dictionary-parameter

    """

    def __init__(
        self,
        *,
        connection_string: str | None = None,
        connection_timeout: int = 5,
        lock_timeout: int = 5,
        connect_args: dict[Any, Any] | None = None,
        max_overflow: int | None = None,
        pool_pre_ping: bool | None = None,
        pool_recycle: int | None = None,
        pool_size: int | None = None,
        pool_timeout: int | None = None,
        name: str = "sql-provider",
    ):
        """Initialize an SqlProvider."""
        self._database_tempfile = None
        # If we don't have a connection string, we'll make a tempfile to use
        # as an SQLite database. This tempfile needs to exist for the lifetime
        # of the SqlProvider.
        if not connection_string:
            self._database_tempfile = NamedTemporaryFile(prefix="bgd-", suffix=".db")
            LOGGER.warning(
                "No connection string specified for the SQL provider, will use SQLite with tempfile.",
                tags=dict(tempfile=self._database_tempfile.name),
            )
            connection_string = f"sqlite:///{self._database_tempfile.name}"

        # Set up database connection
        self._session_factory = sessionmaker(future=True)
        self._scoped_session_factory = scoped_session(self._session_factory)

        self._engine = self._create_sqlalchemy_engine(
            connection_string,
            connection_timeout,
            lock_timeout=lock_timeout,
            connect_args=connect_args,
            max_overflow=max_overflow,
            pool_pre_ping=pool_pre_ping,
            pool_recycle=pool_recycle,
            pool_size=pool_size,
            pool_timeout=pool_timeout,
        )

        # If we're using a temporary file for the database, we need to create the
        # tables before we can actually use it.
        if self._database_tempfile is not None:
            Base.metadata.create_all(self._engine)

        LOGGER.info("Created SQL provider.", tags=dict(connection=self._engine.url))

        self._sql_pool_dispose_helper = SQLPoolDisposeHelper(
            COOLDOWN_TIME_AFTER_POOL_DISPOSE_SECONDS,
            COOLDOWN_TIME_JITTER_BASE,
            MIN_TIME_BETWEEN_SQL_POOL_DISPOSE_MINUTES,
            self._engine,
        )

        self._name = name
        self._num_sessions = 0
        self._lock = Lock()

    def _create_sqlalchemy_engine(
        self,
        connection_string: str,
        connection_timeout: int,
        lock_timeout: int,
        *,
        connect_args: dict[Any, Any] | None = None,
        max_overflow: int | None = None,
        pool_pre_ping: bool | None = None,
        pool_recycle: int | None = None,
        pool_size: int | None = None,
        pool_timeout: int | None = None,
    ) -> Engine:
        """Create the SQLAlchemy Engine.

        Args:
            connection_string: The connection string to use when
                creating the ``Engine``.

            connection_timeout: The timeout to use for database
                connections, in seconds. If set to 0, no timeout
                is applied.

            lock_timeout: The maximum time, in seconds, that a statement
                may wait to acquire a lock in the database before being
                aborted. This is supported only if the database backend
                is PostgreSQL.

            connect_args: Dictionary of DBAPI
                connection arguments to pass to the engine. See the
                SQLAlchemy `docs`_ for details.

            max_overflow: The number of connections to allow
                as "overflow" connections in the connection pool. This is
                the number of connections which will be able to be opened
                above the value of ``pool_size``.

            pool_pre_ping: Whether or not to test connections
                for liveness on checkout from the connection pool.

            pool_recycle: The number of seconds after which to
                recycle database connections. If ``None`` (the default) then
                connections won't be recycled (the SQLAlchemy default value
                of -1 will be used).

            pool_size: The number of connections to keep open
                inside the engine's connection pool.
                If set to zero, no connection pool is created
                and the other pool_* parameters are ignored.

            pool_timeout: The number of seconds to wait when
                attempting to get a connection from the connection pool.

        Returns:
            A :class:`sqlalchemy.engine.Engine` set up to connect to the
            database defined by ``connection_string``.

        Raises:
            ValueError: when attempting to connect to an in-memory SQLite
                database.

        .. _docs: https://docs.sqlalchemy.org/en/14/core/engines.html#use-the-connect-args-dictionary-parameter

        """
        # Disallow sqlite in-memory because multi-threaded access to it is
        # complex and potentially problematic at best
        # ref: https://docs.sqlalchemy.org/en/14/dialects/sqlite.html#threading-pooling-behavior
        if is_sqlite_inmemory_connection_string(connection_string):
            raise ValueError(
                "Cannot use SQLite in-memory with BuildGrid "
                f"(connection_string=[{connection_string}]). Use a file or "
                "leave the connection_string empty for a tempfile."
            )

        extra_engine_args: dict[str, Any] = {}
        if connect_args is not None:
            extra_engine_args["connect_args"] = connect_args
        else:
            extra_engine_args["connect_args"] = {}

        if connection_timeout > 0:
            if is_sqlite_connection_string(connection_string):
                extra_engine_args["connect_args"]["timeout"] = connection_timeout
            elif is_psycopg2_connection_string(connection_string):
                extra_engine_args["connect_args"]["connect_timeout"] = connection_timeout
        if lock_timeout > 0 and is_psycopg2_connection_string(connection_string):
            # Additional PostgreSQL-specific timeouts, passed as libpq options.
            # Note that these timeouts are in milliseconds (hence the *1000).
            # The user might have set "options" explicitly; do not override
            # them in that case.
            extra_engine_args["connect_args"].setdefault("options", f"-c lock_timeout={lock_timeout * 1000}")

        if pool_size is not None and pool_size == 0:
            LOGGER.debug("No connection pool is created.")
            extra_engine_args["poolclass"] = NullPool
        else:
            if max_overflow is not None:
                extra_engine_args["max_overflow"] = max_overflow
            if pool_pre_ping is not None:
                extra_engine_args["pool_pre_ping"] = pool_pre_ping
            if pool_recycle is not None:
                extra_engine_args["pool_recycle"] = pool_recycle
            if pool_size is not None:
                extra_engine_args["pool_size"] = pool_size
            if pool_timeout is not None:
                extra_engine_args["pool_timeout"] = pool_timeout

        LOGGER.debug(f"Additional SQLAlchemy Engine args: [{extra_engine_args}]")

        engine = create_engine(connection_string, echo=False, future=True, **extra_engine_args)
        self._session_factory.configure(bind=engine)

        # Register sqlite-specific connection callback.
        if engine.dialect.name == "sqlite":
            event.listen(engine, "connect", _sqlite_on_connect)

        return engine
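
    # Worked example derived from the logic above (illustrative only, not
    # exercised by this class): with a psycopg2 connection string and the
    # default connection_timeout=5 and lock_timeout=5, the engine is created
    # with
    #
    #     connect_args={"connect_timeout": 5, "options": "-c lock_timeout=5000"}
    #
    # i.e. libpq gives up connecting after 5 seconds, and PostgreSQL aborts any
    # statement that waits more than 5000 ms to acquire a lock. Passing
    # pool_size=0 instead swaps the connection pool for ``NullPool``.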

    @property
    def dialect(self) -> str:
        """The SQL dialect in use by the configured SQL engine."""
        return self._engine.dialect.name

    @property
    def default_inlimit(self) -> int:
        """Return the default inlimit size based on the current SQL dialect"""
        return DIALECT_INLIMIT_MAP.get(self.dialect, DEFAULT_INLIMIT)

    @contextmanager
    def session(
        self,
        *,
        scoped: bool = False,
        sqlite_lock_immediately: bool = False,
        exceptions_to_not_raise_on: list[type[Exception]] | None = None,
        exceptions_to_not_rollback_on: list[type[Exception]] | None = None,
        expire_on_commit: bool = True,
    ) -> Iterator[Session]:
        """ContextManager yielding an ORM ``Session`` for the configured database.

        The :class:`sqlalchemy.orm.Session` lives for the duration of the
        managed context, and any open transaction is committed upon exiting
        the context.

        This method can potentially block for a short while before yielding
        if the underlying connection pool has recently been disposed of and
        refreshed due to connectivity issues.

        When ``sqlite_lock_immediately`` is ``True``, the Session will not
        yield until the database has been locked by entering into a write
        transaction when using SQLite.

        If an Exception is raised whilst in the managed context, the ongoing
        database transaction is rolled back, and the Exception is reraised.
        Some Exceptions which suggest a transient connection issue with the
        database lead to a ``RetriableDatabaseError`` being raised from the
        Exception instead.

        ``exceptions_to_not_raise_on`` defines a list of SQLAlchemyError types
        which should be suppressed instead of re-raised when occurring within
        the managed context.

        Similarly, ``exceptions_to_not_rollback_on`` defines a list of
        SQLAlchemyError types which will not trigger a transaction rollback
        when occurring within the managed context. Instead, the open transaction
        will be committed and the session closed.

        Args:
            scoped: If true, use a ``scoped_session`` factory to create the
                session. This results in reuse of the underlying Session object
                in a given thread.

            sqlite_lock_immediately: If true, execute a ``BEGIN IMMEDIATE``
                statement as soon as the session is created when using SQLite.
                This allows locking for the lifetime of the ``Session`` within
                this ContextManager, enabling similar behaviour to
                ``SELECT ... FOR UPDATE`` in other dialects. Defaults to
                ``False``.

            exceptions_to_not_raise_on: The list of error types to be suppressed
                within the context rather than re-raised. Defaults to ``None``,
                meaning all SQLAlchemyErrors will be re-raised.

            exceptions_to_not_rollback_on: The list of error types which
                shouldn't trigger a transaction rollback. Defaults to ``None``,
                meaning all SQLAlchemyErrors will trigger rollback of the
                transaction.

            expire_on_commit: Defaults to True. When True, all instances will
                be fully expired after each commit(), so that all attribute/object
                access subsequent to a completed transaction will load from
                the most recent database state. This flag is ignored if
                ``scoped == True``.

        Yields:
            A :class:`sqlalchemy.orm.Session` object.

        Raises:
            DatabaseError: when a database session cannot be obtained.

            RetriableDatabaseError: when the database connection is temporarily
                interrupted, but can be expected to recover.

            Exception: Any Exception raised within the context will be re-raised
                unless its type is included in the ``exceptions_to_not_raise_on``
                parameter.

        """
        if exceptions_to_not_raise_on is None:
            exceptions_to_not_raise_on = []
        if exceptions_to_not_rollback_on is None:
            exceptions_to_not_rollback_on = []

        factory: "scoped_session[Session] | sessionmaker[Session]" = self._session_factory
        if scoped:
            factory = self._scoped_session_factory

        # If we recently disposed of the SQL pool due to connection issues
        # ask the client to try again when it's expected to be working again
        time_til_retry = self._sql_pool_dispose_helper.time_until_active_pool()
        if time_til_retry > timedelta(seconds=0):
            raise RetriableDatabaseError(
                "Database connection was temporarily interrupted, please retry", time_til_retry
            )

        # Try to obtain a session
        try:
            session = factory() if scoped else factory(expire_on_commit=expire_on_commit)
            if sqlite_lock_immediately and session.bind.name == "sqlite":  # type: ignore
                session.execute(text("BEGIN IMMEDIATE"))
        except Exception as e:
            LOGGER.error("Unable to obtain a database session.", exc_info=True)
            raise DatabaseError("Unable to obtain a database session.") from e

        # Yield the session and catch exceptions that occur while using it
        # to roll-back if needed
        try:
            with self._lock:
                self._num_sessions += 1
                num_sessions = self._num_sessions
            publish_gauge_metric(METRIC.SQL.SQL_ACTIVE_SESSION_GAUGE_TEMPLATE.format(name=self._name), num_sessions)
            publish_counter_metric(METRIC.SQL.SQL_SESSION_COUNT_TEMPLATE.format(name=self._name), 1)

            yield session
            session.commit()
        except Exception as e:
            transient_dberr = self._sql_pool_dispose_helper.check_dispose_pool(session, e)
            if type(e) in exceptions_to_not_rollback_on:
                try:
                    session.commit()
                except Exception:
                    pass
            else:
                session.rollback()
                if transient_dberr:
                    LOGGER.warning("Rolling back database session due to transient database error.", exc_info=True)
                else:
                    LOGGER.error("Error committing database session. Rolling back.", exc_info=True)
                if type(e) not in exceptions_to_not_raise_on:
                    if transient_dberr:
                        # Ask the client to retry when the pool is expected to be healthy again
                        raise RetriableDatabaseError(
                            "Database connection was temporarily interrupted, please retry",
                            self._sql_pool_dispose_helper.time_until_active_pool(),
                        ) from e
                    raise
        finally:
            with self._lock:
                self._num_sessions -= 1
            session.close()
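
    # Typical call site (illustrative sketch; ``sql_provider`` and
    # ``SomeExpectedError`` are assumed names, not part of this module):
    #
    #     with sql_provider.session(
    #         exceptions_to_not_rollback_on=[SomeExpectedError],
    #     ) as session:
    #         session.execute(text("SELECT 1"))
    #
    # The transaction commits when the block exits cleanly, rolls back on
    # unexpected errors, and transient connectivity failures surface as
    # ``RetriableDatabaseError`` so that callers can retry.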

    @contextmanager
    def scoped_session(
        self,
        *,
        sqlite_lock_immediately: bool = False,
        exceptions_to_not_raise_on: list[type[Exception]] | None = None,
        exceptions_to_not_rollback_on: list[type[Exception]] | None = None,
    ) -> Generator[Session, None, None]:
        """ContextManager providing a thread-local ORM session for the database.

        This is a shorthand for ``SqlProvider.session(scoped=True)``.

        This ContextManager provides a reusable thread-local
        :class:`sqlalchemy.orm.Session` object. Once the ``Session`` has been
        created by the initial call, subsequent calls to this method from
        within a given thread will return the same ``Session`` object until
        :meth:`SqlProvider.remove_scoped_session` is called.

        Args:
            See :meth:`SqlProvider.session` for further details.

        Yields:
            A persistent thread-local :class:`sqlalchemy.orm.Session`.

        """
        with self.session(
            scoped=True,
            sqlite_lock_immediately=sqlite_lock_immediately,
            exceptions_to_not_raise_on=exceptions_to_not_raise_on,
            exceptions_to_not_rollback_on=exceptions_to_not_rollback_on,
        ) as session:
            yield session

    def remove_scoped_session(self) -> None:
        """Remove the thread-local session, if any."""
        self._scoped_session_factory.remove()
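

# Minimal usage sketch (an illustrative addition; this function is not called
# anywhere in BuildGrid). With no connection string the provider falls back to
# a throwaway SQLite tempfile, so the example is self-contained.
def _example_provider_usage(connection_string: str | None = None) -> None:
    provider = SqlProvider(connection_string=connection_string, pool_pre_ping=True)

    # One-off work: session() commits on clean exit and rolls back on error.
    with provider.session() as session:
        session.execute(text("SELECT 1"))

    # Thread-local reuse: scoped_session() hands back the same Session per
    # thread until remove_scoped_session() is called.
    with provider.scoped_session() as session:
        session.execute(text("SELECT 1"))
    provider.remove_scoped_session()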