Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cleanup/janitor/index.py: 100.00%
44 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
1# Copyright (C) 2024 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16from abc import ABC, abstractmethod
17from typing import Set
19from redis import Redis
20from sqlalchemy import select
22from buildgrid.server.cleanup.janitor.config import RedisConfig
23from buildgrid.server.logging import buildgrid_logger
24from buildgrid.server.redis.provider import RedisProvider
25from buildgrid.server.sql.models import IndexEntry
26from buildgrid.server.sql.provider import SqlProvider
28LOGGER = buildgrid_logger(__name__)
class IndexLookup(ABC):
    """Interface for checking which digests are present in a CAS index."""

    @abstractmethod
    def get_missing_digests(self, digests: Set[str]) -> Set[str]:
        """Return the subset of ``digests`` which is not in the index."""
        ...
class RedisIndexLookup(IndexLookup):
    """Index lookup backed by a Redis CAS index."""

    def __init__(self, config: RedisConfig):
        """Create a Redis-backed lookup from ``config``.

        Args:
            config: Redis connection settings, including the key batch size
                and the prefix under which index keys are stored.
        """
        LOGGER.info("Creating a Redis CAS Janitor.")

        self._batch_size = config.key_batch_size
        self._index_prefix = config.index_prefix
        self._redis = RedisProvider(
            host=config.host,
            port=config.port,
            password=config.password,
            db=config.db,
            dns_srv_record=config.dns_srv_record,
            sentinel_master_name=config.sentinel_master_name,
        )

    def get_missing_digests(self, digests: Set[str]) -> Set[str]:
        """Return the subset of ``digests`` which is not in the index."""

        def _lookup(redis: "Redis[bytes]") -> Set[str]:
            # NOTE: The digests are pipelined in sorted order so that the
            # pipeline results can be zipped back with the keys they
            # correspond to.
            ordered = sorted(digests)
            present: Set[str] = set()
            for start in range(0, len(ordered), self._batch_size):
                chunk = ordered[start : start + self._batch_size]
                pipeline = redis.pipeline()
                for key in chunk:
                    pipeline.exists(f"{self._index_prefix}{key}")
                # EXISTS returns an integer count; > 0 means the key is set.
                for hit, key in zip(pipeline.execute(), chunk):
                    if hit > 0:
                        present.add(key)
            return digests - present

        return self._redis.execute_ro(_lookup)
class SqlIndexLookup(IndexLookup):
    """Index lookup backed by a SQL CAS index."""

    def __init__(self, connection_string: str):
        """Create a SQL-backed lookup connected to ``connection_string``."""
        LOGGER.info("Creating an SQL CAS Janitor.")
        self._sql = SqlProvider(connection_string=connection_string)

    def get_missing_digests(self, digests: Set[str]) -> Set[str]:
        """Return the subset of ``digests`` which is not in the index.

        Each digest is formatted as ``"<hash>_<size_bytes>"``. Only the hash
        component is used to query the index, and the full ``hash_size``
        string is rebuilt from the matching rows — so a digest whose hash is
        present but whose recorded size differs is still reported missing.
        """
        # Set comprehension rather than set(generator) (ruff C401).
        hashes = {digest.split("_", 1)[0] for digest in digests}
        with self._sql.scoped_session() as session:
            stmt = select(IndexEntry.digest_hash, IndexEntry.digest_size_bytes).filter(
                IndexEntry.digest_hash.in_(hashes)
            )
            found_digests = {f"{row[0]}_{row[1]}" for row in session.execute(stmt)}
            return digests - found_digests