Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/sql/provider.py: 89.68%

126 statements  

coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

# Copyright (C) 2023 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from contextlib import contextmanager
from datetime import timedelta
from tempfile import NamedTemporaryFile
from typing import Any, Dict, Generator, Iterator, List, Optional, Type, Union, cast

from alembic import command
from alembic.config import Config
from importlib_resources import files
from sqlalchemy import create_engine, event, text
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session, scoped_session, sessionmaker
from sqlalchemy.pool import NullPool

from buildgrid.server.exceptions import DatabaseError, RetriableDatabaseError
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.settings import (
    COOLDOWN_TIME_AFTER_POOL_DISPOSE_SECONDS,
    COOLDOWN_TIME_JITTER_BASE,
    MIN_TIME_BETWEEN_SQL_POOL_DISPOSE_MINUTES,
)

from .utils import (
    SQLPoolDisposeHelper,
    is_psycopg2_connection_string,
    is_sqlite_connection_string,
    is_sqlite_inmemory_connection_string,
)

LOGGER = buildgrid_logger(__name__)

# Each dialect has a limit on the number of bind parameters allowed. This
# matters because it determines how large we can allow our IN clauses to get.
#
# SQLite: 1000 https://www.sqlite.org/limits.html#max_variable_number
# PostgreSQL: 32767 (Int16.MAX_VALUE) https://www.postgresql.org/docs/9.4/protocol-message-formats.html
#
# We'll refer to this as the "inlimit" in the code. The inlimits are
# set to roughly 75% of the bind parameter limit of the implementation.
DIALECT_INLIMIT_MAP = {"postgresql": 24000, "sqlite": 750}
DEFAULT_INLIMIT = 100
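

# Illustrative sketch (not part of the original module): the "inlimit" above exists
# to keep IN clauses under the dialect's bind parameter limit, which callers do by
# batching large collections of values. The helper below is a hypothetical example
# of how a caller might consume ``SqlProvider.default_inlimit``; the name
# ``_chunked`` and its usage here are assumptions for illustration only, e.g.
# ``for batch in _chunked(ids, provider.default_inlimit): ...``
def _chunked(values: List[Any], limit: int) -> Iterator[List[Any]]:
    """Yield successive slices of ``values`` no longer than ``limit`` items."""
    for start in range(0, len(values), limit):
        yield values[start : start + limit]
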

# NOTE: Obviously these type annotations are useless, but sadly they're what
# is in the upstream sqlalchemy2-stubs[0].
#
# Once we upgrade to SQLAlchemy 2.0 we can make these more useful, as that
# version of SQLAlchemy has sensible type annotations[1].
#
# [0]: https://github.com/sqlalchemy/sqlalchemy2-stubs/blob/main/sqlalchemy-stubs/pool/events.pyi#L9
# [1]: https://github.com/sqlalchemy/sqlalchemy/blob/main/lib/sqlalchemy/pool/events.py#L96-L100
def _sqlite_on_connect(conn: Any, record: Any) -> None:
    """SQLite ``PRAGMA`` statements to execute immediately upon connection.

    These statements configure the behaviour of the database, and are specific
    to SQLite.

    See https://www.sqlite.org/pragma.html for details.

    Args:
        conn (DBAPIConnection): The DBAPI connection that was just connected.
        record (_ConnectionRecord): The connection record which contains the
            DBAPI connection.

    """
    # Use journal_mode=WAL to allow read/write concurrency, as well as the
    # performance improvements it brings.
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA synchronous=NORMAL")


class SqlProvider:
    """Class which provides an interface for interacting with an SQL database.

    This class is used to allow configuration of a per-process SQL connection
    pool, which can then be shared across multiple components of BuildGrid
    which require an SQL connection.

    Args:
        automigrate (bool): Whether or not to ensure the database is fully
            migrated when starting up. Defaults to ``False``, meaning the
            database is assumed to be up to date already.

        connection_string (str | None): The connection string to use when
            creating a database connection. If ``None`` then a temporary
            SQLite database will be created for the lifetime of this
            object.

        connection_timeout (int): The timeout to use when attempting to
            connect to the database, in seconds. Defaults to 5 seconds if
            unset.

        lock_timeout (int): The timeout to use when the connection
            holds a lock in the database. This is supported only if the
            database backend is PostgreSQL.

        connect_args (Dict[str, Any] | None): Dictionary of DBAPI
            connection arguments to pass to the engine. See the
            SQLAlchemy `docs`_ for details.

        max_overflow (int | None): The number of connections to allow
            as "overflow" connections in the connection pool. This is
            the number of connections which will be able to be opened
            above the value of ``pool_size``.

        pool_pre_ping (bool | None): Whether or not to test connections
            for liveness on checkout from the connection pool.

        pool_recycle (int | None): The number of seconds after which to
            recycle database connections. If ``None`` (the default) then
            connections won't be recycled (the SQLAlchemy default value
            of -1 will be used).

        pool_size (int | None): The number of connections to keep open
            inside the engine's connection pool.

        pool_timeout (int | None): The number of seconds to wait when
            attempting to get a connection from the connection pool.

    Raises:
        ValueError: when ``connection_string`` specifies an in-memory SQLite
            database.

    .. _docs: https://docs.sqlalchemy.org/en/14/core/engines.html#use-the-connect-args-dictionary-parameter

    """

    def __init__(
        self,
        *,
        automigrate: bool = False,
        connection_string: Optional[str] = None,
        connection_timeout: int = 5,
        lock_timeout: int = 5,
        connect_args: Optional[Dict[Any, Any]] = None,
        max_overflow: Optional[int] = None,
        pool_pre_ping: Optional[bool] = None,
        pool_recycle: Optional[int] = None,
        pool_size: Optional[int] = None,
        pool_timeout: Optional[int] = None,
    ):
        """Initialize an SqlProvider."""
        self._database_tempfile = None
        # If we don't have a connection string, we'll make a tempfile to use
        # as an SQLite database. This tempfile needs to exist for the lifetime
        # of the SqlProvider.
        if not connection_string:
            self._database_tempfile = NamedTemporaryFile(prefix="bgd-", suffix=".db")
            LOGGER.warning(
                "No connection string specified for the SQL provider, will use SQLite with tempfile.",
                tags=dict(tempfile=self._database_tempfile.name),
            )
            automigrate = True  # since this is a temporary database, we always need to create it
            connection_string = f"sqlite:///{self._database_tempfile.name}"

        # Set up database connection
        self._session_factory = sessionmaker(future=True)
        self._scoped_session_factory = scoped_session(self._session_factory)

        self._engine = self._create_sqlalchemy_engine(
            connection_string,
            connection_timeout,
            lock_timeout=lock_timeout,
            connect_args=connect_args,
            max_overflow=max_overflow,
            pool_pre_ping=pool_pre_ping,
            pool_recycle=pool_recycle,
            pool_size=pool_size,
            pool_timeout=pool_timeout,
        )

        LOGGER.info("Created SQL provider.", tags=dict(automigrate=automigrate, connection=self._engine.url))

        if automigrate:
            self._create_or_migrate_db()

        self._sql_pool_dispose_helper = SQLPoolDisposeHelper(
            COOLDOWN_TIME_AFTER_POOL_DISPOSE_SECONDS,
            COOLDOWN_TIME_JITTER_BASE,
            MIN_TIME_BETWEEN_SQL_POOL_DISPOSE_MINUTES,
            self._engine,
        )

    def _create_sqlalchemy_engine(
        self,
        connection_string: str,
        connection_timeout: int,
        lock_timeout: int,
        *,
        connect_args: Optional[Dict[Any, Any]] = None,
        max_overflow: Optional[int] = None,
        pool_pre_ping: Optional[bool] = None,
        pool_recycle: Optional[int] = None,
        pool_size: Optional[int] = None,
        pool_timeout: Optional[int] = None,
    ) -> Engine:
        """Create the SQLAlchemy Engine.

        Args:
            connection_string: The connection string to use when
                creating the ``Engine``.

            connection_timeout: The timeout to use for database
                connections, in seconds. If set to 0, no timeout
                is applied.

            lock_timeout: The timeout to use when the connection
                holds a lock in the database. This is supported only if the
                database backend is PostgreSQL.

            connect_args: Dictionary of DBAPI
                connection arguments to pass to the engine. See the
                SQLAlchemy `docs`_ for details.

            max_overflow: The number of connections to allow
                as "overflow" connections in the connection pool. This is
                the number of connections which will be able to be opened
                above the value of ``pool_size``.

            pool_pre_ping: Whether or not to test connections
                for liveness on checkout from the connection pool.

            pool_recycle: The number of seconds after which to
                recycle database connections. If ``None`` (the default) then
                connections won't be recycled (the SQLAlchemy default value
                of -1 will be used).

            pool_size: The number of connections to keep open
                inside the engine's connection pool.
                If set to zero, no connection pool is created
                and the other pool_* parameters are ignored.

            pool_timeout: The number of seconds to wait when
                attempting to get a connection from the connection pool.

        Returns:
            A :class:`sqlalchemy.engine.Engine` set up to connect to the
            database defined by ``connection_string``.

        Raises:
            ValueError: when attempting to connect to an in-memory SQLite
                database.

        .. _docs: https://docs.sqlalchemy.org/en/14/core/engines.html#use-the-connect-args-dictionary-parameter

        """
        # Disallow sqlite in-memory because multi-threaded access to it is
        # complex and potentially problematic at best
        # ref: https://docs.sqlalchemy.org/en/14/dialects/sqlite.html#threading-pooling-behavior
        if is_sqlite_inmemory_connection_string(connection_string):
            raise ValueError(
                "Cannot use SQLite in-memory with BuildGrid "
                f"(connection_string=[{connection_string}]). Use a file or "
                "leave the connection_string empty for a tempfile."
            )

        extra_engine_args: Dict[str, Any] = {}
        if connect_args is not None:
            extra_engine_args["connect_args"] = connect_args
        else:
            extra_engine_args["connect_args"] = {}

        if connection_timeout > 0:
            if is_sqlite_connection_string(connection_string):
                extra_engine_args["connect_args"]["timeout"] = connection_timeout
            elif is_psycopg2_connection_string(connection_string):
                extra_engine_args["connect_args"]["connect_timeout"] = connection_timeout
        if lock_timeout > 0 and is_psycopg2_connection_string(connection_string):
            # Additional PostgreSQL-specific timeouts, passed as extra libpq options.
            # Note that these timeouts are in milliseconds (hence the *1000).
            # The user might have set options explicitly, so do not override them in that case.
            extra_engine_args["connect_args"].setdefault("options", f"-c lock_timeout={lock_timeout * 1000}")

        if pool_size is not None and pool_size == 0:
            LOGGER.debug("No connection pool is created.")
            extra_engine_args["poolclass"] = NullPool
        else:
            if max_overflow is not None:
                extra_engine_args["max_overflow"] = max_overflow
            if pool_pre_ping is not None:
                extra_engine_args["pool_pre_ping"] = pool_pre_ping
            if pool_recycle is not None:
                extra_engine_args["pool_recycle"] = pool_recycle
            if pool_size is not None:
                extra_engine_args["pool_size"] = pool_size
            if pool_timeout is not None:
                extra_engine_args["pool_timeout"] = pool_timeout

        LOGGER.debug(f"Additional SQLAlchemy Engine args: [{extra_engine_args}]")

        engine = create_engine(connection_string, echo=False, future=True, **extra_engine_args)
        self._session_factory.configure(bind=engine)

        # Register sqlite-specific connection callback.
        if engine.dialect.name == "sqlite":
            event.listen(engine, "connect", _sqlite_on_connect)

        return cast(Engine, engine)

    def _create_or_migrate_db(self) -> None:
        """Ensure that the database schema is up to date.

        This method runs the Alembic ``upgrade`` command to ensure that all of
        the database migrations have been run and the schema is up to date.

        .. warning::

            Do not run this method concurrently against a single database, at
            risk of migrations conflicting with themselves and causing at least
            one of the callers to fail.

        """
        LOGGER.warning("Will attempt migration to latest version if needed.")

        config: Config = Config()
        config.set_main_option("script_location", str(files("buildgrid.server.sql").joinpath("alembic")))

        with self._engine.begin() as connection:
            # NOTE: pylint doesn't like this for some reason, but it is the
            # documented way to set the connection.
            # https://alembic.sqlalchemy.org/en/latest/api/config.html#alembic.config.Config
            config.attributes["connection"] = connection
            command.upgrade(config, "head")

    @property
    def dialect(self) -> str:
        """The SQL dialect in use by the configured SQL engine."""
        return self._engine.dialect.name

    @property
    def default_inlimit(self) -> int:
        """Return the default inlimit size based on the current SQL dialect."""
        return DIALECT_INLIMIT_MAP.get(self.dialect, DEFAULT_INLIMIT)

    @contextmanager
    def session(
        self,
        *,
        scoped: bool = False,
        sqlite_lock_immediately: bool = False,
        exceptions_to_not_raise_on: Optional[List[Type[Exception]]] = None,
        exceptions_to_not_rollback_on: Optional[List[Type[Exception]]] = None,
        expire_on_commit: bool = True,
    ) -> Iterator[Session]:
        """ContextManager yielding an ORM ``Session`` for the configured database.

        The :class:`sqlalchemy.orm.Session` lives for the duration of the
        managed context, and any open transaction is committed upon exiting
        the context.

        This method can potentially block for a short while before yielding
        if the underlying connection pool has recently been disposed of and
        refreshed due to connectivity issues.

        When ``sqlite_lock_immediately`` is ``True``, the Session will not
        yield until the database has been locked by entering a write
        transaction when using SQLite.

        If an Exception is raised whilst in the managed context, the ongoing
        database transaction is rolled back, and the Exception is reraised.
        Some Exceptions which suggest a transient connection issue with the
        database lead to a ``RetriableDatabaseError`` being raised from the
        Exception instead.

        ``exceptions_to_not_raise_on`` defines a list of SQLAlchemyError types
        which should be suppressed instead of re-raised when occurring within
        the managed context.

        Similarly, ``exceptions_to_not_rollback_on`` defines a list of
        SQLAlchemyError types which will not trigger a transaction rollback
        when occurring within the managed context. Instead, the open transaction
        will be committed and the session closed.

        Args:
            scoped: If true, use a ``scoped_session`` factory to create the
                session. This results in reuse of the underlying Session object
                in a given thread.

            sqlite_lock_immediately: If true, execute a ``BEGIN IMMEDIATE``
                statement as soon as the session is created when using SQLite.
                This allows locking for the lifetime of the ``Session`` within
                this ContextManager, enabling similar behaviour to
                ``SELECT ... FOR UPDATE`` in other dialects. Defaults to
                ``False``.

            exceptions_to_not_raise_on: The list of error types to be suppressed
                within the context rather than re-raised. Defaults to ``None``,
                meaning all SQLAlchemyErrors will be re-raised.

            exceptions_to_not_rollback_on: The list of error types which
                shouldn't trigger a transaction rollback. Defaults to ``None``,
                meaning all SQLAlchemyErrors will trigger rollback of the
                transaction.

            expire_on_commit: Defaults to True. When True, all instances will
                be fully expired after each commit(), so that all attribute/object
                access subsequent to a completed transaction will load from
                the most recent database state. This flag is ignored if
                ``scoped == True``.

        Yields:
            A :class:`sqlalchemy.orm.Session` object.

        Raises:
            DatabaseError: when a database session cannot be obtained.

            RetriableDatabaseError: when the database connection is temporarily
                interrupted, but can be expected to recover.

            Exception: Any Exception raised within the context will be re-raised
                unless its type is included in the ``exceptions_to_not_raise_on``
                parameter.

        """
        if exceptions_to_not_raise_on is None:
            exceptions_to_not_raise_on = []
        if exceptions_to_not_rollback_on is None:
            exceptions_to_not_rollback_on = []

        factory: "Union[scoped_session, sessionmaker[Session]]" = self._session_factory
        if scoped:
            factory = self._scoped_session_factory

        # If we recently disposed of the SQL pool due to connection issues,
        # ask the client to try again when it's expected to be working again.
        time_til_retry = self._sql_pool_dispose_helper.time_until_active_pool()
        if time_til_retry > timedelta(seconds=0):
            raise RetriableDatabaseError(
                "Database connection was temporarily interrupted, please retry", time_til_retry
            )

        # Try to obtain a session
        try:
            session = factory() if scoped else factory(expire_on_commit=expire_on_commit)
            if sqlite_lock_immediately and session.bind.name == "sqlite":  # type: ignore
                session.execute(text("BEGIN IMMEDIATE"))
        except Exception as e:
            LOGGER.error("Unable to obtain a database session.", exc_info=True)
            raise DatabaseError("Unable to obtain a database session.") from e

        # Yield the session and catch exceptions that occur while using it,
        # to roll back if needed
        try:
            yield session
            session.commit()
        except Exception as e:
            transient_dberr = self._sql_pool_dispose_helper.check_dispose_pool(session, e)
            if type(e) in exceptions_to_not_rollback_on:
                try:
                    session.commit()
                except Exception:
                    pass
            else:
                session.rollback()
                if transient_dberr:
                    LOGGER.warning("Rolling back database session due to transient database error.", exc_info=True)
                else:
                    LOGGER.error("Error committing database session. Rolling back.", exc_info=True)
                if type(e) not in exceptions_to_not_raise_on:
                    if transient_dberr:
                        # Ask the client to retry when the pool is expected to be healthy again
                        raise RetriableDatabaseError(
                            "Database connection was temporarily interrupted, please retry",
                            self._sql_pool_dispose_helper.time_until_active_pool(),
                        ) from e
                    raise
        finally:
            session.close()

    @contextmanager
    def scoped_session(
        self,
        *,
        sqlite_lock_immediately: bool = False,
        exceptions_to_not_raise_on: Optional[List[Type[Exception]]] = None,
        exceptions_to_not_rollback_on: Optional[List[Type[Exception]]] = None,
    ) -> Generator[Session, None, None]:
        """ContextManager providing a thread-local ORM session for the database.

        This is a shorthand for ``SqlProvider.session(scoped=True)``.

        This ContextManager provides a reusable thread-local
        :class:`sqlalchemy.orm.Session` object. Once the ``Session`` has been
        created by the initial call, subsequent calls to this method from
        within a given thread will return the same ``Session`` object until
        :meth:`SqlProvider.remove_scoped_session` is called.

        Args:
            See :meth:`SqlProvider.session` for further details.

        Yields:
            A persistent thread-local :class:`sqlalchemy.orm.Session`.

        """
        with self.session(
            scoped=True,
            sqlite_lock_immediately=sqlite_lock_immediately,
            exceptions_to_not_raise_on=exceptions_to_not_raise_on,
            exceptions_to_not_rollback_on=exceptions_to_not_rollback_on,
        ) as session:
            yield session

    def remove_scoped_session(self) -> None:
        """Remove the thread-local session, if any."""
        self._scoped_session_factory.remove()
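

# Illustrative usage sketch (not part of the original module). It walks through the
# flow described in the docstrings above: constructing an SqlProvider with no
# connection string falls back to a tempfile-backed SQLite database and automigrates
# it, and queries run inside the ``session()`` ContextManager get commit-on-exit and
# rollback-on-exception behaviour automatically. The guard keeps this out of normal
# imports; the specific statements run here are assumptions chosen for the demo.
if __name__ == "__main__":
    provider = SqlProvider()  # no connection_string: tempfile SQLite + automigrate
    print("dialect:", provider.dialect, "inlimit:", provider.default_inlimit)

    # A clean exit from the block commits the transaction; an exception raised
    # inside it would roll the transaction back and be re-raised.
    with provider.session() as session:
        session.execute(text("SELECT 1"))

    # Thread-local variant; drop the cached session once the thread is done with it.
    with provider.scoped_session() as session:
        session.execute(text("SELECT 1"))
    provider.remove_scoped_session()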