Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/index/sql_dialect_delegates/postgresqldelegate.py: 100.00%

18 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2025-05-28 16:48 +0000

1# Copyright (C) 2020 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16PostgreSQLDelegate 

17================== 

18 

19Extra functionality for the SQL index when using a PostgreSQL backend. 

20 

21""" 

22from datetime import datetime 

23from typing import cast 

24 

25from sqlalchemy import Table 

26from sqlalchemy.dialects.postgresql import insert 

27from sqlalchemy.orm.session import Session as SessionType 

28from sqlalchemy.sql.functions import coalesce 

29 

30from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest 

31from buildgrid.server.sql.models import IndexEntry 

32 

33 

34class PostgreSQLDelegate: 

35 @staticmethod 

36 def _save_digests_to_index( 

37 digest_blob_pairs: list[tuple[Digest, bytes | None]], session: SessionType, max_inline_blob_size: int 

38 ) -> None: 

39 # See discussion of __table__ typing in https://github.com/sqlalchemy/sqlalchemy/issues/9130 

40 index_table = cast(Table, IndexEntry.__table__) 

41 update_time = datetime.utcnow() 

42 new_rows = [ 

43 { 

44 "digest_hash": digest.hash, 

45 "digest_size_bytes": digest.size_bytes, 

46 "accessed_timestamp": update_time, 

47 "inline_blob": (blob if digest.size_bytes <= max_inline_blob_size else None), 

48 "deleted": False, 

49 } 

50 for (digest, blob) in digest_blob_pairs 

51 ] 

52 

53 base_insert_stmt = insert(index_table).values(new_rows) 

54 

55 update_stmt = base_insert_stmt.on_conflict_do_update( 

56 index_elements=["digest_hash"], 

57 set_={ 

58 "accessed_timestamp": update_time, 

59 "inline_blob": coalesce(base_insert_stmt.excluded.inline_blob, index_table.c.inline_blob), 

60 "deleted": False, 

61 }, 

62 ) 

63 

64 session.execute(update_stmt)