Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/index/sql_dialect_delegates/postgresqldelegate.py: 100.00%

17 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-03-28 16:20 +0000

1# Copyright (C) 2020 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16PostgreSQLDelegate 

17================== 

18 

19Extra functionality for the SQL index when using a PostgreSQL backend. 

20 

21""" 

22from datetime import datetime 

23from typing import List, Optional, Tuple 

24 

25from sqlalchemy.dialects.postgresql import insert 

26from sqlalchemy.orm.session import Session as SessionType 

27from sqlalchemy.sql.functions import coalesce 

28 

29from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest 

30from buildgrid.server.persistence.sql.models import IndexEntry 

31 

32 

class PostgreSQLDelegate:
    """PostgreSQL-specific operations used by the SQL index."""

    @staticmethod
    def _save_digests_to_index(
        digest_blob_pairs: List[Tuple[Digest, Optional[bytes]]], session: SessionType, max_inline_blob_size: int
    ) -> None:
        """Upsert index rows for the given digests in a single statement.

        Each digest becomes a row in the ``IndexEntry`` table. When a row with
        the same ``digest_hash`` already exists, its ``accessed_timestamp`` is
        refreshed and its ``inline_blob`` is replaced only if the incoming row
        carries a blob; otherwise the stored blob is kept.

        Args:
            digest_blob_pairs: ``(digest, blob)`` pairs to record. ``blob`` may
                be ``None`` when there is no content to inline.
            session: Session on which the statement is executed. This method
                does not commit.
            max_inline_blob_size: Blobs belonging to digests whose
                ``size_bytes`` exceeds this value are not stored inline.

        Returns:
            None. Nothing is executed when there are no rows to write.
        """
        # Naive UTC timestamp shared by every row written in this call.
        update_time = datetime.utcnow()

        # Collapse repeated hashes into one row each: PostgreSQL rejects an
        # INSERT ... ON CONFLICT DO UPDATE that would touch the same row twice
        # within a single command.
        rows_by_hash = {}
        for digest, blob in digest_blob_pairs:
            inline_blob = blob if digest.size_bytes <= max_inline_blob_size else None
            previous_row = rows_by_hash.get(digest.hash)
            if inline_blob is None and previous_row is not None:
                # Keep a blob supplied by an earlier duplicate, mirroring the
                # COALESCE used for rows that already exist in the table.
                inline_blob = previous_row["inline_blob"]
            rows_by_hash[digest.hash] = {
                "digest_hash": digest.hash,
                "digest_size_bytes": digest.size_bytes,
                "accessed_timestamp": update_time,
                "inline_blob": inline_blob,
            }

        new_rows = list(rows_by_hash.values())
        if not new_rows:
            # Nothing to save; avoid issuing an INSERT with no rows.
            return

        index_table = IndexEntry.__table__
        base_insert_stmt = insert(index_table).values(new_rows)

        update_stmt = base_insert_stmt.on_conflict_do_update(
            index_elements=["digest_hash"],
            set_={
                "accessed_timestamp": update_time,
                # Prefer the newly proposed blob; fall back to the stored one
                # when the proposed value is NULL.
                "inline_blob": coalesce(base_insert_stmt.excluded.inline_blob, index_table.c.inline_blob),
            },
        )

        session.execute(update_stmt)