Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/redis.py: 97.50%

80 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-03-28 16:20 +0000

1# Copyright (C) 2019 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17RedisStorage 

18================== 

19 

20A storage provider that stores data in a persistent redis store. 

21https://redis.io/ 

22 

23Redis client: redis-py 

24https://github.com/andymccurdy/redis-py 

25 

26""" 

27import functools 

28import io 

29import logging 

30from typing import IO, Any, Callable, Dict, List, Optional, Tuple, TypeVar, cast 

31 

32import redis 

33 

34from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest 

35from buildgrid._protos.google.rpc import code_pb2 

36from buildgrid._protos.google.rpc.status_pb2 import Status 

37from buildgrid.server.redis.provider import RedisProvider 

38from buildgrid.settings import HASH 

39 

40from .storage_abc import StorageABC 

41 

LOGGER = logging.getLogger(__name__)

# Type variable that lets the decorator below return the same callable type it
# was given, so decorated methods keep their signatures for type checkers.
Func = TypeVar("Func", bound=Callable)  # type: ignore[type-arg]


def redis_client_exception_wrapper(func: Func) -> Func:
    """Wrapper for handling redis client exceptions.

    Calls ``func`` and, when it raises ``redis.RedisError``, logs the error
    with its traceback and raises ``RuntimeError`` in its place. Any other
    exception propagates unchanged.

    Args:
        func: The callable to wrap.

    Returns:
        The wrapping callable, cast back to the type of ``func``.

    Raises:
        RuntimeError: When the wrapped call raises ``redis.RedisError``. The
            original error is attached as ``__cause__``.
    """

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            return func(*args, **kwargs)
        except redis.RedisError as error:
            message = f"Redis Exception in [{func.__name__}]"
            LOGGER.exception(message)
            # Carry the message and the explicit cause so callers that only
            # see the RuntimeError can still tell which operation failed.
            raise RuntimeError(message) from error

    return cast(Func, wrapper)

59 

60 

class RedisStorage(StorageABC):
    """Interface for communicating with a redis store.

    Each blob is stored as a single redis value under the key
    ``<digest.hash>_<digest.size_bytes>``. Every public operation is wrapped
    by ``redis_client_exception_wrapper``.
    """

    @redis_client_exception_wrapper
    def __init__(self, redis: RedisProvider) -> None:
        """Keep a reference to the provider used for every redis call.

        Args:
            redis: Provider exposing ``execute_ro`` and ``execute_rw``.
        """
        self._redis = redis

    def _construct_key(self, digest: Digest) -> str:
        """Helper to get the redis key name for a particular digest"""
        return digest.hash + "_" + str(digest.size_bytes)

    @redis_client_exception_wrapper
    def has_blob(self, digest: Digest) -> bool:
        """Return whether a value exists under the key for ``digest``."""
        LOGGER.debug(f"Checking for blob: [{digest}]")
        key = self._construct_key(digest)
        return bool(self._redis.execute_ro(lambda r: r.exists(key)))

    @redis_client_exception_wrapper
    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        """Return the blob for ``digest`` as an in-memory stream.

        Returns:
            A ``BytesIO`` over the stored value, or ``None`` when the key
            holds no value.
        """
        LOGGER.debug(f"Getting blob: [{digest}]")
        key = self._construct_key(digest)
        blob = self._redis.execute_ro(lambda r: r.get(key))
        return None if blob is None else io.BytesIO(blob)

    @redis_client_exception_wrapper
    def delete_blob(self, digest: Digest) -> None:
        """Delete the key for ``digest``."""
        LOGGER.debug(f"Deleting blob: [{digest}]")
        key = self._construct_key(digest)
        self._redis.execute_rw(lambda r: r.delete(key))

    @redis_client_exception_wrapper
    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        """Store the full contents of ``write_session`` under the key for ``digest``.

        Args:
            digest: Digest that determines the redis key.
            write_session: Seekable stream; it is rewound and read to the end.
        """
        LOGGER.debug(f"Writing blob: [{digest}]")
        write_session.seek(0)
        # Read the payload once, before handing a callback to the provider.
        # Reading inside the callback would yield b"" on any second invocation
        # (the stream would already be at EOF) and silently store an empty
        # blob. NOTE(review): whether execute_rw can re-invoke its callback is
        # not visible from this file; reading up front is safe either way.
        data = write_session.read()
        key = self._construct_key(digest)
        self._redis.execute_rw(lambda r: r.set(key, data))

    @redis_client_exception_wrapper
    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
        """Return the digests from ``digests`` whose keys do not exist.

        The returned list keeps the order of ``digests``.
        """
        if not digests:
            return []

        # Exist takes multiple keys, but only returns the number of keys which
        # exist, not which keys do/don't exist. Instead pipeline N exist
        # calls, which allows distinguishing which keys do/don't exist.
        keys = [self._construct_key(digest) for digest in digests]

        def validate_digests(r: "redis.Redis[bytes]") -> List[int]:
            pipe = r.pipeline()
            for key in keys:
                pipe.exists(key)
            return pipe.execute()

        results = self._redis.execute_ro(validate_digests)
        return [digest for digest, result in zip(digests, results) if not result]

    @redis_client_exception_wrapper
    def bulk_update_blobs(self, blobs: List[Tuple[Digest, bytes]]) -> List[Status]:
        """Validate and store several blobs with a single ``mset``.

        A blob is rejected when its length differs from ``digest.size_bytes``
        or its ``HASH`` hex digest differs from ``digest.hash``.

        Args:
            blobs: ``(digest, data)`` pairs to store.

        Returns:
            One ``Status`` per input pair, in input order: ``INVALID_ARGUMENT``
            for rejected blobs and ``OK`` for the rest.
        """
        keymap: Dict[str, bytes] = {}
        results: List[Status] = []
        for digest, data in blobs:
            if len(data) != digest.size_bytes or HASH(data).hexdigest() != digest.hash:
                results.append(
                    Status(
                        code=code_pb2.INVALID_ARGUMENT,
                        message="Data doesn't match hash",
                    )
                )
            else:
                results.append(Status(code=code_pb2.OK))
                keymap[self._construct_key(digest)] = data
        # MSET with no key/value pairs is rejected by redis, so only issue the
        # write when at least one blob passed validation. Otherwise the
        # statuses collected above are the complete answer.
        if keymap:
            # mset can't fail according to the documentation, so every blob
            # that passed validation keeps its OK status.
            self._redis.execute_rw(lambda r: r.mset(keymap))  # type: ignore[arg-type]
        return results

    @redis_client_exception_wrapper
    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
        """Fetch several blobs with a single ``mget``.

        Returns:
            A mapping from ``digest.hash`` to blob data, containing only the
            digests for which a value was found.
        """
        if not digests:
            return {}
        redis_keys = [self._construct_key(x) for x in digests]
        found_blobs = self._redis.execute_ro(lambda r: r.mget(redis_keys))
        return {digest.hash: blob for digest, blob in zip(digests, found_blobs) if blob is not None}