Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/index/sql_dialect_delegates/postgresqldelegate.py: 100.00%
17 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-03-28 16:20 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-03-28 16:20 +0000
1# Copyright (C) 2020 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""
16PostgreSQLDelegate
17==================
19Extra functionality for the SQL index when using a PostgreSQL backend.
21"""
22from datetime import datetime
23from typing import List, Optional, Tuple
25from sqlalchemy.dialects.postgresql import insert
26from sqlalchemy.orm.session import Session as SessionType
27from sqlalchemy.sql.functions import coalesce
29from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
30from buildgrid.server.persistence.sql.models import IndexEntry
33class PostgreSQLDelegate:
34 @staticmethod
35 def _save_digests_to_index(
36 digest_blob_pairs: List[Tuple[Digest, Optional[bytes]]], session: SessionType, max_inline_blob_size: int
37 ) -> None:
38 index_table = IndexEntry.__table__
39 update_time = datetime.utcnow()
40 new_rows = [
41 {
42 "digest_hash": digest.hash,
43 "digest_size_bytes": digest.size_bytes,
44 "accessed_timestamp": update_time,
45 "inline_blob": (blob if digest.size_bytes <= max_inline_blob_size else None),
46 }
47 for (digest, blob) in digest_blob_pairs
48 ]
50 base_insert_stmt = insert(index_table).values(new_rows)
52 update_stmt = base_insert_stmt.on_conflict_do_update(
53 index_elements=["digest_hash"],
54 set_={
55 "accessed_timestamp": update_time,
56 "inline_blob": coalesce(base_insert_stmt.excluded.inline_blob, index_table.c.inline_blob),
57 },
58 )
60 session.execute(update_stmt)