Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/index/sql_dialect_delegates/postgresqldelegate.py: 100.00%
18 statements
« prev ^ index » next coverage.py v7.4.1, created at 2025-05-28 16:48 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2025-05-28 16:48 +0000
1# Copyright (C) 2020 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""
16PostgreSQLDelegate
17==================
19Extra functionality for the SQL index when using a PostgreSQL backend.
21"""
22from datetime import datetime
23from typing import cast
25from sqlalchemy import Table
26from sqlalchemy.dialects.postgresql import insert
27from sqlalchemy.orm.session import Session as SessionType
28from sqlalchemy.sql.functions import coalesce
30from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
31from buildgrid.server.sql.models import IndexEntry
34class PostgreSQLDelegate:
35 @staticmethod
36 def _save_digests_to_index(
37 digest_blob_pairs: list[tuple[Digest, bytes | None]], session: SessionType, max_inline_blob_size: int
38 ) -> None:
39 # See discussion of __table__ typing in https://github.com/sqlalchemy/sqlalchemy/issues/9130
40 index_table = cast(Table, IndexEntry.__table__)
41 update_time = datetime.utcnow()
42 new_rows = [
43 {
44 "digest_hash": digest.hash,
45 "digest_size_bytes": digest.size_bytes,
46 "accessed_timestamp": update_time,
47 "inline_blob": (blob if digest.size_bytes <= max_inline_blob_size else None),
48 "deleted": False,
49 }
50 for (digest, blob) in digest_blob_pairs
51 ]
53 base_insert_stmt = insert(index_table).values(new_rows)
55 update_stmt = base_insert_stmt.on_conflict_do_update(
56 index_elements=["digest_hash"],
57 set_={
58 "accessed_timestamp": update_time,
59 "inline_blob": coalesce(base_insert_stmt.excluded.inline_blob, index_table.c.inline_blob),
60 "deleted": False,
61 },
62 )
64 session.execute(update_stmt)