Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/sql/provider.py: 89.76%

127 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-06-11 15:37 +0000

1# Copyright (C) 2023 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15import logging 

16from contextlib import contextmanager 

17from datetime import timedelta 

18from tempfile import NamedTemporaryFile 

19from typing import Any, Dict, Generator, Iterator, List, Optional, Type, Union, cast 

20 

21from alembic import command 

22from alembic.config import Config 

23from importlib_resources import files 

24from sqlalchemy import create_engine, event, text 

25from sqlalchemy.engine import Engine 

26from sqlalchemy.orm import Session, scoped_session, sessionmaker 

27from sqlalchemy.pool import NullPool 

28 

29from buildgrid._exceptions import DatabaseError, RetriableDatabaseError 

30from buildgrid.server.sql import sqlutils 

31from buildgrid.settings import ( 

32 COOLDOWN_TIME_AFTER_POOL_DISPOSE_SECONDS, 

33 COOLDOWN_TIME_JITTER_BASE, 

34 MIN_TIME_BETWEEN_SQL_POOL_DISPOSE_MINUTES, 

35) 

36 

# Module-level logger for this provider module.
LOGGER = logging.getLogger(__name__)

# Each dialect has a limit on the number of bind parameters allowed. This
# matters because it determines how large we can allow our IN clauses to get.
#
# SQLite: 1000 https://www.sqlite.org/limits.html#max_variable_number
# PostgreSQL: 32767 (Int16.MAX_VALUE) https://www.postgresql.org/docs/9.4/protocol-message-formats.html
#
# We'll refer to this as the "inlimit" in the code. The inlimits are
# set to 75% of the bind parameter limit of the implementation.
DIALECT_INLIMIT_MAP: Dict[str, int] = {"postgresql": 24000, "sqlite": 750}
# Conservative fallback inlimit used for dialects not listed above.
DEFAULT_INLIMIT: int = 100

49 

50 

51# NOTE: Obviously these type annotations are useless, but sadly they're what 

52# is in the upstream sqlalchemy2-stubs[0]. 

53# 

54# Once we upgrade to SQLAlchemy 2.0 we can make these more useful, as that 

55# version of SQLAlchemy has sensible type annotations[1]. 

56# 

57# [0]: https://github.com/sqlalchemy/sqlalchemy2-stubs/blob/main/sqlalchemy-stubs/pool/events.pyi#L9 

58# [1]: https://github.com/sqlalchemy/sqlalchemy/blob/main/lib/sqlalchemy/pool/events.py#L96-L100 

59def _sqlite_on_connect(conn: Any, record: Any) -> None: 

60 """SQLite ``PRAGMA`` statements to execute immediately upon connection. 

61 

62 These statements configure the behaviour of the database, and are specific 

63 to SQLite. 

64 

65 See https://www.sqlite.org/pragma.html for details. 

66 

67 Args: 

68 conn (DBAPIConnection): The DBAPI connection that was just connected. 

69 record (_ConnectionRecord): The connection record which contains the 

70 DBAPI connection. 

71 

72 """ 

73 # Use journal_mode=WAL to allow read/write concurrency, as well as the 

74 # performance improvements it brings. 

75 conn.execute("PRAGMA journal_mode=WAL") 

76 conn.execute("PRAGMA synchronous=NORMAL") 

77 

78 

class SqlProvider:
    """Class which provides an interface for interacting with an SQL database.

    This class is used to allow configuration of a per-process SQL connection
    pool, which can then be shared across multiple components of BuildGrid
    which require an SQL connection.

    Args:
        automigrate (bool): Whether or not to ensure the database is fully
            migrated when starting up. Defaults to ``False``, meaning the
            database is assumed to be up to date already.

        connection_string (str | None): The connection string to use when
            creating a database connection. If ``None`` then a temporary
            SQLite database will be created for the lifetime of this
            object.

        connection_timeout (int): The timeout to use when attempting to
            connect to the database, in seconds. Defaults to 5 seconds if
            unset.

        lock_timeout (int): The timeout to use when the connection
            holds a lock in the database. This is supported only if the database
            backend is PostgreSQL.

        connect_args (Dict[str, Any] | None): Dictionary of DBAPI
            connection arguments to pass to the engine. See the
            SQLAlchemy `docs`_ for details.

        max_overflow (int | None): The number of connections to allow
            as "overflow" connections in the connection pool. This is
            the number of connections which will be able to be opened
            above the value of ``pool_size``.

        pool_pre_ping (bool | None): Whether or not to test connections
            for liveness on checkout from the connection pool.

        pool_recycle (int | None): The number of seconds after which to
            recycle database connections. If ``None`` (the default) then
            connections won't be recycled (the SQLAlchemy default value
            of -1 will be used).

        pool_size (int | None): The number of connections to keep open
            inside the engine's connection pool.

        pool_timeout (int | None): The number of seconds to wait when
            attempting to get a connection from the connection pool.

    Raises:
        ValueError: when ``connection_string`` specifies an in-memory SQLite
            database.

    .. _docs: https://docs.sqlalchemy.org/en/14/core/engines.html#use-the-connect-args-dictionary-parameter

    """

    def __init__(
        self,
        *,
        automigrate: bool = False,
        connection_string: Optional[str] = None,
        connection_timeout: int = 5,
        lock_timeout: int = 5,
        connect_args: Optional[Dict[Any, Any]] = None,
        max_overflow: Optional[int] = None,
        pool_pre_ping: Optional[bool] = None,
        pool_recycle: Optional[int] = None,
        pool_size: Optional[int] = None,
        pool_timeout: Optional[int] = None,
    ):
        """Initialize an SqlProvider."""
        self._database_tempfile = None
        # If we don't have a connection string, we'll make a tempfile to use
        # as an SQLite database. This tempfile needs to exist for the lifetime
        # of the SqlProvider.
        if not connection_string:
            self._database_tempfile = NamedTemporaryFile(prefix="bgd-", suffix=".db")
            LOGGER.warning(
                "No connection string specified for the SQL provider, "
                f"will use SQLite with tempfile: [{self._database_tempfile.name}]"
            )
            automigrate = True  # since this is a temporary database, we always need to create it
            connection_string = f"sqlite:///{self._database_tempfile.name}"

        # Set up database connection
        self._session_factory = sessionmaker(future=True)
        self._scoped_session_factory = scoped_session(self._session_factory)

        self._engine = self._create_sqlalchemy_engine(
            connection_string,
            connection_timeout,
            lock_timeout=lock_timeout,
            connect_args=connect_args,
            max_overflow=max_overflow,
            pool_pre_ping=pool_pre_ping,
            pool_recycle=pool_recycle,
            pool_size=pool_size,
            pool_timeout=pool_timeout,
        )

        connection = self._engine.url
        LOGGER.info(f"Created SQL provider with: {automigrate=}, {connection=}")

        if automigrate:
            self._create_or_migrate_db()

        self._sql_pool_dispose_helper = sqlutils.SQLPoolDisposeHelper(
            COOLDOWN_TIME_AFTER_POOL_DISPOSE_SECONDS,
            COOLDOWN_TIME_JITTER_BASE,
            MIN_TIME_BETWEEN_SQL_POOL_DISPOSE_MINUTES,
            self._engine,
        )

    def _create_sqlalchemy_engine(
        self,
        connection_string: str,
        connection_timeout: int,
        lock_timeout: int,
        *,
        connect_args: Optional[Dict[Any, Any]] = None,
        max_overflow: Optional[int] = None,
        pool_pre_ping: Optional[bool] = None,
        pool_recycle: Optional[int] = None,
        pool_size: Optional[int] = None,
        pool_timeout: Optional[int] = None,
    ) -> Engine:
        """Create the SQLAlchemy Engine.

        Args:
            connection_string: The connection string to use when
                creating the ``Engine``.

            connection_timeout: The timeout to use for database
                connections, in seconds. If set as 0, no timeout
                is applied.

            lock_timeout (int): The timeout to use when the connection
                holds a lock in the database. This is supported only if the database
                backend is PostgreSQL.

            connect_args: Dictionary of DBAPI
                connection arguments to pass to the engine. See the
                SQLAlchemy `docs`_ for details.

            max_overflow: The number of connections to allow
                as "overflow" connections in the connection pool. This is
                the number of connections which will be able to be opened
                above the value of ``pool_size``.

            pool_pre_ping: Whether or not to test connections
                for liveness on checkout from the connection pool.

            pool_recycle: The number of seconds after which to
                recycle database connections. If ``None`` (the default) then
                connections won't be recycled (the SQLAlchemy default value
                of -1 will be used).

            pool_size: The number of connections to keep open
                inside the engine's connection pool.
                If set as zero, no connection pool is created
                and other pool_* parameters are ignored.

            pool_timeout: The number of seconds to wait when
                attempting to get a connection from the connection pool.

        Returns:
            A :class:`sqlalchemy.engine.Engine` set up to connect to the
            database defined by ``connection_string``.

        Raises:
            ValueError: when attempting to connect to an in-memory SQLite
                database.

        .. _docs: https://docs.sqlalchemy.org/en/14/core/engines.html#use-the-connect-args-dictionary-parameter

        """
        # Disallow sqlite in-memory because multi-threaded access to it is
        # complex and potentially problematic at best
        # ref: https://docs.sqlalchemy.org/en/14/dialects/sqlite.html#threading-pooling-behavior
        if sqlutils.is_sqlite_inmemory_connection_string(connection_string):
            raise ValueError(
                "Cannot use SQLite in-memory with BuildGrid "
                f"(connection_string=[{connection_string}]). Use a file or "
                "leave the connection_string empty for a tempfile."
            )

        extra_engine_args: Dict[str, Any] = {}
        # Copy any caller-provided connect_args so the timeout-related keys
        # added below don't mutate the caller's dictionary in place.
        if connect_args is not None:
            extra_engine_args["connect_args"] = dict(connect_args)
        else:
            extra_engine_args["connect_args"] = {}

        if connection_timeout > 0:
            # The DBAPI-level timeout key differs per driver.
            if sqlutils.is_sqlite_connection_string(connection_string):
                extra_engine_args["connect_args"]["timeout"] = connection_timeout
            elif sqlutils.is_psycopg2_connection_string(connection_string):
                extra_engine_args["connect_args"]["connect_timeout"] = connection_timeout
        if lock_timeout > 0 and sqlutils.is_psycopg2_connection_string(connection_string):
            # Additional postgres specific timeouts
            # Additional libpg options
            # Note that those timeouts are in milliseconds (so *1000)
            # User might specifically set options... do not override in this case.
            extra_engine_args["connect_args"].setdefault("options", f"-c lock_timeout={lock_timeout * 1000}")

        # NOTE: ``None == 0`` is False, so this is only True when the caller
        # explicitly requested a pool size of zero (i.e. no pooling).
        if pool_size == 0:
            LOGGER.debug("No connection pool is created")
            extra_engine_args["poolclass"] = NullPool
        else:
            if max_overflow is not None:
                extra_engine_args["max_overflow"] = max_overflow
            if pool_pre_ping is not None:
                extra_engine_args["pool_pre_ping"] = pool_pre_ping
            if pool_recycle is not None:
                extra_engine_args["pool_recycle"] = pool_recycle
            if pool_size is not None:
                extra_engine_args["pool_size"] = pool_size
            if pool_timeout is not None:
                extra_engine_args["pool_timeout"] = pool_timeout

        LOGGER.debug(f"Additional SQLAlchemy Engine args: [{extra_engine_args}]")

        engine = create_engine(connection_string, echo=False, future=True, **extra_engine_args)
        self._session_factory.configure(bind=engine)

        # Register sqlite-specific connection callback.
        if engine.dialect.name == "sqlite":
            event.listen(engine, "connect", _sqlite_on_connect)

        return cast(Engine, engine)

    def _create_or_migrate_db(self) -> None:
        """Ensure that the database schema is up to date.

        This method runs the Alembic ``upgrade`` command to ensure that all of
        the database migrations have been run and the schema is up to date.

        .. warning::

            Do not run this method concurrently against a single database, at
            risk of migrations conflicting with themselves and causing at least
            one of the callers to fail.

        """
        LOGGER.warning("Will attempt migration to latest version if needed.")

        config: Config = Config()
        config.set_main_option("script_location", str(files("buildgrid.server.persistence.sql").joinpath("alembic")))

        with self._engine.begin() as connection:
            # NOTE: pylint doesn't like this for some reason, but it is the
            # documented way to set the connection.
            # https://alembic.sqlalchemy.org/en/latest/api/config.html#alembic.config.Config
            config.attributes["connection"] = connection
            command.upgrade(config, "head")

    @property
    def dialect(self) -> str:
        """The SQL dialect in use by the configured SQL engine."""
        return self._engine.dialect.name

    @property
    def default_inlimit(self) -> int:
        """Return the default inlimit size based on the current SQL dialect"""
        return DIALECT_INLIMIT_MAP.get(self.dialect, DEFAULT_INLIMIT)

    @contextmanager
    def session(
        self,
        *,
        scoped: bool = False,
        sqlite_lock_immediately: bool = False,
        exceptions_to_not_raise_on: Optional[List[Type[Exception]]] = None,
        exceptions_to_not_rollback_on: Optional[List[Type[Exception]]] = None,
        expire_on_commit: bool = True,
    ) -> Iterator[Session]:
        """ContextManager yielding an ORM ``Session`` for the configured database.

        The :class:`sqlalchemy.orm.Session` lives for the duration of the
        managed context, and any open transaction is committed upon exiting
        the context.

        This method can potentially block for a short while before yielding
        if the underlying connection pool has recently been disposed of and
        refreshed due to connectivity issues.

        When ``sqlite_lock_immediately`` is ``True``, the Session will not
        yield until the database has been locked by entering into a write
        transaction when using SQLite.

        If an Exception is raised whilst in the managed context, the ongoing
        database transaction is rolled back, and the Exception is reraised.
        Some Exceptions which suggest a transient connection issue with the
        database lead to a ``RetriableDatabaseError`` being raised from the
        Exception instead.

        ``exceptions_to_not_raise_on`` defines a list of SQLAlchemyError types
        which should be suppressed instead of re-raised when occurring within
        the managed context.

        Similarly, ``exceptions_to_not_rollback_on`` defines a list of
        SQLAlchemyError types which will not trigger a transaction rollback
        when occurring within the managed context. Instead, the open transaction
        will be committed and the session closed.

        Args:
            scoped: If true, use a ``scoped_session`` factory to create the
                session. This results in reuse of the underlying Session object
                in a given thread.

            sqlite_lock_immediately: If true, execute a ``BEGIN IMMEDIATE``
                statement as soon as the session is created when using SQLite.
                This allows locking for the lifetime of the ``Session`` within
                this ContextManager, enabling similar behaviour to
                ``SELECT ... FOR UPDATE`` in other dialects. Defaults to
                ``False``.

            exceptions_to_not_raise_on: The list of error types to be suppressed
                within the context rather than re-raised. Defaults to ``None``,
                meaning all SQLAlchemyErrors will be re-raised.

            exceptions_to_not_rollback_on: The list
                of error types which shouldn't trigger a transaction rollback.
                Defaults to ``None``, meaning all SQLAlchemyErrors will trigger
                rollback of the transaction.

            expire_on_commit: Defaults to True. When True, all instances will
                be fully expired after each commit(), so that all attribute/object
                access subsequent to a completed transaction will load from
                the most recent database state. This flag is ignored if
                ``scoped == True``

        Yields:
            A :class:`sqlalchemy.orm.Session` object.

        Raises:
            DatabaseError: when a database session cannot be obtained.

            RetriableDatabaseError: when the database connection is temporarily
                interrupted, but can be expected to recover.

            Exception: Any Exception raised within the context will be re-raised
                unless its type is included in the ``exceptions_to_not_raise_on``
                parameter.

        """
        if exceptions_to_not_raise_on is None:
            exceptions_to_not_raise_on = []
        if exceptions_to_not_rollback_on is None:
            exceptions_to_not_rollback_on = []

        factory: "Union[scoped_session, sessionmaker[Session]]" = self._session_factory
        if scoped:
            factory = self._scoped_session_factory

        # If we recently disposed of the SQL pool due to connection issues
        # ask the client to try again when it's expected to be working again
        time_til_retry = self._sql_pool_dispose_helper.time_until_active_pool()
        if time_til_retry > timedelta(seconds=0):
            raise RetriableDatabaseError(
                "Database connection was temporarily interrupted, please retry", time_til_retry
            )

        # Try to obtain a session
        try:
            session = factory() if scoped else factory(expire_on_commit=expire_on_commit)
            if sqlite_lock_immediately and session.bind.name == "sqlite":  # type: ignore
                session.execute(text("BEGIN IMMEDIATE"))
        except Exception as e:
            LOGGER.error("Unable to obtain a database session.", exc_info=True)
            raise DatabaseError("Unable to obtain a database session.") from e

        # Yield the session and catch exceptions that occur while using it
        # to roll-back if needed
        try:
            yield session
            session.commit()
        except Exception as e:
            transient_dberr = self._sql_pool_dispose_helper.check_dispose_pool(session, e)
            if type(e) in exceptions_to_not_rollback_on:
                # Deliberately best-effort: commit what we can and move on.
                try:
                    session.commit()
                except Exception:
                    pass
            else:
                session.rollback()
                if transient_dberr:
                    LOGGER.warning("Rolling back database session due to transient database error.", exc_info=True)
                else:
                    LOGGER.error("Error committing database session. Rolling back.", exc_info=True)
                if type(e) not in exceptions_to_not_raise_on:
                    if transient_dberr:
                        # Ask the client to retry when the pool is expected to be healthy again
                        raise RetriableDatabaseError(
                            "Database connection was temporarily interrupted, please retry",
                            self._sql_pool_dispose_helper.time_until_active_pool(),
                        ) from e
                    raise
        finally:
            session.close()

    @contextmanager
    def scoped_session(
        self,
        *,
        sqlite_lock_immediately: bool = False,
        exceptions_to_not_raise_on: Optional[List[Type[Exception]]] = None,
        exceptions_to_not_rollback_on: Optional[List[Type[Exception]]] = None,
    ) -> Generator[Session, None, None]:
        """ContextManager providing a thread-local ORM session for the database.

        This is a shorthand for ``SqlProvider.session(scoped=True)``.

        This ContextManager provides a reusable thread-local
        :class:`sqlalchemy.orm.Session` object. Once the ``Session`` has been
        created by the initial call, subsequent calls to this method from
        within a given thread will return the same ``Session`` object until
        :meth:`SqlProvider.remove_scoped_session` is called.

        Args:
            See :meth:`SqlProvider.session` for further details.

        Yields:
            A persistent thread-local :class:`sqlalchemy.orm.Session`.

        """
        with self.session(
            scoped=True,
            sqlite_lock_immediately=sqlite_lock_immediately,
            exceptions_to_not_raise_on=exceptions_to_not_raise_on,
            exceptions_to_not_rollback_on=exceptions_to_not_rollback_on,
        ) as session:
            yield session

    def remove_scoped_session(self) -> None:
        """Remove the thread-local session, if any."""
        self._scoped_session_factory.remove()