Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/redis.py: 97.94%

97 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

1# Copyright (C) 2019 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17RedisStorage 

18================== 

19 

20A storage provider that stores data in a persistent redis store. 

21https://redis.io/ 

22 

23Redis client: redis-py 

24https://github.com/andymccurdy/redis-py 

25 

26""" 

27import functools 

28import io 

29from typing import IO, Any, Callable, Dict, List, Optional, Tuple, TypeVar, cast 

30 

31import redis 

32 

33from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest 

34from buildgrid._protos.google.rpc import code_pb2 

35from buildgrid._protos.google.rpc.status_pb2 import Status 

36from buildgrid.server.decorators import timed 

37from buildgrid.server.logging import buildgrid_logger 

38from buildgrid.server.metrics_names import METRIC 

39from buildgrid.server.redis.provider import RedisProvider 

40from buildgrid.server.settings import HASH 

41 

42from .storage_abc import StorageABC 

43 

44LOGGER = buildgrid_logger(__name__) 

45 

46Func = TypeVar("Func", bound=Callable) # type: ignore[type-arg] 

47 

48 

49def redis_client_exception_wrapper(func: Func) -> Func: 

50 """Wrapper from handling redis client exceptions.""" 

51 

52 @functools.wraps(func) 

53 def wrapper(*args: Any, **kwargs: Any) -> Any: 

54 try: 

55 return func(*args, **kwargs) 

56 except redis.RedisError: 

57 LOGGER.exception("Redis Exception.", tags=dict(func_name=func.__name__)) 

58 raise RuntimeError 

59 

60 return cast(Func, wrapper) 

61 

62 

63class RedisStorage(StorageABC): 

64 """Interface for communicating with a redis store.""" 

65 

66 TYPE = "Redis" 

67 

68 @redis_client_exception_wrapper 

69 def __init__(self, redis: RedisProvider) -> None: 

70 self._redis = redis 

71 

72 def _construct_key(self, digest: Digest) -> str: 

73 """Helper to get the redis key name for a particular digest""" 

74 return digest.hash + "_" + str(digest.size_bytes) 

75 

76 @timed(METRIC.STORAGE.STAT_DURATION, type=TYPE) 

77 @redis_client_exception_wrapper 

78 def has_blob(self, digest: Digest) -> bool: 

79 LOGGER.debug("Checking for blob.", tags=dict(digest=digest)) 

80 return bool(self._redis.execute_ro(lambda r: r.exists(self._construct_key(digest)))) 

81 

82 @timed(METRIC.STORAGE.READ_DURATION, type=TYPE) 

83 @redis_client_exception_wrapper 

84 def get_blob(self, digest: Digest) -> Optional[IO[bytes]]: 

85 LOGGER.debug("Getting blob.", tags=dict(digest=digest)) 

86 blob = self._redis.execute_ro(lambda r: r.get(self._construct_key(digest))) 

87 return None if blob is None else io.BytesIO(blob) 

88 

89 @timed(METRIC.STORAGE.DELETE_DURATION, type=TYPE) 

90 @redis_client_exception_wrapper 

91 def delete_blob(self, digest: Digest) -> None: 

92 LOGGER.debug("Deleting blob.", tags=dict(digest=digest)) 

93 self._redis.execute_rw(lambda r: r.delete(self._construct_key(digest))) 

94 

95 @timed(METRIC.STORAGE.DELETE_DURATION, type=TYPE) 

96 @redis_client_exception_wrapper 

97 def bulk_delete(self, digests: List[Digest]) -> List[str]: 

98 keys = [self._construct_key(digest) for digest in digests] 

99 self._redis.execute_rw(lambda r: r.delete(*keys)) 

100 return [] 

101 

102 @timed(METRIC.STORAGE.WRITE_DURATION, type=TYPE) 

103 @redis_client_exception_wrapper 

104 def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None: 

105 LOGGER.debug("Writing blob.", tags=dict(digest=digest)) 

106 write_session.seek(0) 

107 self._redis.execute_rw(lambda r: r.set(self._construct_key(digest), write_session.read())) 

108 

109 @timed(METRIC.STORAGE.BULK_STAT_DURATION, type=TYPE) 

110 @redis_client_exception_wrapper 

111 def missing_blobs(self, digests: List[Digest]) -> List[Digest]: 

112 # Exist takes multiple keys, but only returns the number of keys which 

113 # exist, not which keys do/don't exist. Instead pipeline N exist 

114 # calls, which allows distinguishing which keys do/don't exist. 

115 

116 def validate_digests(r: "redis.Redis[bytes]") -> List[int]: 

117 pipe = r.pipeline() 

118 for digest in digests: 

119 pipe.exists(self._construct_key(digest)) 

120 return pipe.execute() 

121 

122 results = self._redis.execute_ro(validate_digests) 

123 

124 missing_digests: List[Digest] = [] 

125 for digest, result in zip(digests, results): 

126 if not result: 

127 missing_digests.append(digest) 

128 return missing_digests 

129 

130 @timed(METRIC.STORAGE.BULK_WRITE_DURATION, type=TYPE) 

131 @redis_client_exception_wrapper 

132 def bulk_update_blobs(self, blobs: List[Tuple[Digest, bytes]]) -> List[Status]: 

133 keymap: Dict[str, bytes] = {} 

134 results: List[Status] = [] 

135 for digest, data in blobs: 

136 if len(data) != digest.size_bytes or HASH(data).hexdigest() != digest.hash: 

137 results.append(Status(code=code_pb2.INVALID_ARGUMENT, message="Data doesn't match hash")) 

138 continue 

139 results.append(Status(code=code_pb2.OK)) 

140 keymap[self._construct_key(digest)] = data 

141 

142 self._redis.execute_rw(lambda r: r.mset(keymap)) # type: ignore[arg-type] 

143 # mset can't fail according to the documentation so return OK for all remaining digests 

144 return results 

145 

146 @timed(METRIC.STORAGE.BULK_READ_DURATION, type=TYPE) 

147 @redis_client_exception_wrapper 

148 def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]: 

149 redis_keys = [self._construct_key(x) for x in digests] 

150 found_blobs = self._redis.execute_ro(lambda r: r.mget(redis_keys)) 

151 result_map: Dict[str, bytes] = {} 

152 for digest, blob in zip(digests, found_blobs): 

153 if blob is not None: 

154 result_map[digest.hash] = blob 

155 return result_map