Coverage for /builds/BuildGrid/buildgrid/buildgrid/cleanup/janitor/index.py: 100.00%

44 statements  

coverage.py v7.4.1, created at 2024-06-11 15:37 +0000

# Copyright (C) 2024 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from abc import ABC, abstractmethod
from typing import Set

from redis import Redis
from sqlalchemy import select

from buildgrid.cleanup.janitor.config import RedisConfig
from buildgrid.server.persistence.sql.models import IndexEntry
from buildgrid.server.redis.provider import RedisProvider
from buildgrid.server.sql.provider import SqlProvider

LOGGER = logging.getLogger(__name__)


class IndexLookup(ABC):

    @abstractmethod
    def get_missing_digests(self, digests: Set[str]) -> Set[str]:
        """Return the subset of ``digests`` which is not in the index.

        Digest keys are strings of the form ``"<hash>_<size_bytes>"``, the
        convention used by both implementations below.
        """
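

# Illustration only, not part of the original module: the simplest possible
# IndexLookup is a set difference against an in-memory index, which can stand
# in for the Redis or SQL variants in unit tests.
class InMemoryIndexLookup(IndexLookup):

    def __init__(self, indexed_digests: Set[str]):
        # Hypothetical helper; ``indexed_digests`` plays the role of the index.
        self._indexed_digests = indexed_digests

    def get_missing_digests(self, digests: Set[str]) -> Set[str]:
        return digests - self._indexed_digests
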

class RedisIndexLookup(IndexLookup):

    def __init__(self, config: RedisConfig):
        LOGGER.info("Creating a Redis CAS Janitor")

        self._batch_size = config.key_batch_size
        self._index_prefix = config.index_prefix
        self._redis = RedisProvider(
            host=config.host,
            port=config.port,
            password=config.password,
            db=config.db,
            dns_srv_record=config.dns_srv_record,
            sentinel_master_name=config.sentinel_master_name,
        )

    def get_missing_digests(self, digests: Set[str]) -> Set[str]:
        def _get_missing_digests(redis: "Redis[bytes]") -> Set[str]:
            # NOTE: Use a sorted list of digests here since we need to pipeline them in
            # the same order as we zip them with the pipeline results.
            sorted_digests = sorted(digests)
            found_digests: Set[str] = set()
            offset = 0
            while offset < len(sorted_digests):
                batch = sorted_digests[offset : offset + self._batch_size]
                # Pipeline the EXISTS checks so each batch costs one round trip.
                pipe = redis.pipeline()
                for digest in batch:
                    pipe.exists(f"{self._index_prefix}{digest}")
                results = pipe.execute()
                found_digests |= {digest for result, digest in zip(results, batch) if result > 0}
                offset += self._batch_size

            return digests - found_digests

        return self._redis.execute_ro(_get_missing_digests)


class SqlIndexLookup(IndexLookup):

    def __init__(self, connection_string: str):
        LOGGER.info("Creating an SQL CAS Janitor")
        self._sql = SqlProvider(connection_string=connection_string)

    def get_missing_digests(self, digests: Set[str]) -> Set[str]:
        # Match on the hash component only, then rebuild full "<hash>_<size>"
        # keys from the matching rows to compare against ``digests``.
        hashes = set(digest.split("_", 1)[0] for digest in digests)
        with self._sql.scoped_session() as session:
            stmt = select(IndexEntry.digest_hash, IndexEntry.digest_size_bytes).filter(
                IndexEntry.digest_hash.in_(hashes)
            )
            found_digests = {f"{row[0]}_{row[1]}" for row in session.execute(stmt)}
            return digests - found_digests
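
A minimal usage sketch, not part of the covered module: it assumes the ``"<hash>_<size_bytes>"`` digest convention above; the connection string and digest values are hypothetical.

    from buildgrid.cleanup.janitor.index import SqlIndexLookup

    # Hypothetical SQLAlchemy URL; any database supported by SqlProvider works.
    lookup = SqlIndexLookup("sqlite:///cas-index.db")

    # Hypothetical digest keys in the "<hash>_<size_bytes>" form.
    candidates = {"aabbccdd_1024", "11223344_2048"}

    # Anything returned here is absent from the index, i.e. a deletion candidate.
    missing = lookup.get_missing_digests(candidates)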