Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cleanup/janitor/index.py: 100.00%

44 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

# Copyright (C) 2024 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

14 

15 

from abc import ABC, abstractmethod
from typing import Set

from redis import Redis
from sqlalchemy import select

from buildgrid.server.cleanup.janitor.config import RedisConfig
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.redis.provider import RedisProvider
from buildgrid.server.sql.models import IndexEntry
from buildgrid.server.sql.provider import SqlProvider

LOGGER = buildgrid_logger(__name__)

29 

30 

class IndexLookup(ABC):
    """Interface for querying which digests are present in a CAS index."""

    @abstractmethod
    def get_missing_digests(self, digests: Set[str]) -> Set[str]:
        """Return the subset of ``digests`` which is not in the index."""

37 

class RedisIndexLookup(IndexLookup):
    """Digest lookup backed by a Redis-based CAS index."""

    def __init__(self, config: RedisConfig):
        LOGGER.info("Creating a Redis CAS Janitor.")

        # How many EXISTS checks to pipeline per round trip.
        self._batch_size = config.key_batch_size
        # Prefix prepended to every digest when forming a Redis key.
        self._index_prefix = config.index_prefix
        self._redis = RedisProvider(
            host=config.host,
            port=config.port,
            password=config.password,
            db=config.db,
            dns_srv_record=config.dns_srv_record,
            sentinel_master_name=config.sentinel_master_name,
        )

    def get_missing_digests(self, digests: Set[str]) -> Set[str]:
        """Return the subset of ``digests`` which is not in the Redis index."""

        def _lookup(redis: "Redis[bytes]") -> Set[str]:
            # NOTE: Iterate the digests in a fixed (sorted) order so the
            # pipeline results can be zipped back to the keys that
            # produced them.
            ordered = sorted(digests)
            present: Set[str] = set()
            for start in range(0, len(ordered), self._batch_size):
                chunk = ordered[start : start + self._batch_size]
                pipeline = redis.pipeline()
                for key in chunk:
                    pipeline.exists(f"{self._index_prefix}{key}")
                hits = pipeline.execute()
                present.update(d for hit, d in zip(hits, chunk) if hit > 0)
            return digests - present

        return self._redis.execute_ro(_lookup)

73 

74 

class SqlIndexLookup(IndexLookup):
    """Digest lookup backed by the SQL CAS index table."""

    def __init__(self, connection_string: str):
        LOGGER.info("Creating an SQL CAS Janitor.")
        self._sql = SqlProvider(connection_string=connection_string)

    def get_missing_digests(self, digests: Set[str]) -> Set[str]:
        """Return the subset of ``digests`` which is not in the SQL index.

        Digests are ``"<hash>_<size>"`` strings; the index is queried by
        hash alone, and the (hash, size) pairs found are re-joined into
        the same string form for the final set difference.
        """
        unique_hashes = {d.split("_", 1)[0] for d in digests}
        with self._sql.scoped_session() as session:
            query = select(IndexEntry.digest_hash, IndexEntry.digest_size_bytes).filter(
                IndexEntry.digest_hash.in_(unique_hashes)
            )
            stored: Set[str] = set()
            for digest_hash, size_bytes in session.execute(query):
                stored.add(f"{digest_hash}_{size_bytes}")
            return digests - stored