Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/redis.py: 97.50%
80 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-03-28 16:20 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-03-28 16:20 +0000
1# Copyright (C) 2019 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16"""
17RedisStorage
18==================
20A storage provider that stores data in a persistent redis store.
21https://redis.io/
23Redis client: redis-py
24https://github.com/andymccurdy/redis-py
26"""
27import functools
28import io
29import logging
30from typing import IO, Any, Callable, Dict, List, Optional, Tuple, TypeVar, cast
32import redis
34from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
35from buildgrid._protos.google.rpc import code_pb2
36from buildgrid._protos.google.rpc.status_pb2 import Status
37from buildgrid.server.redis.provider import RedisProvider
38from buildgrid.settings import HASH
40from .storage_abc import StorageABC
LOGGER = logging.getLogger(__name__)

# Type variable so the decorator below is typed as returning the same
# callable type it was given.
Func = TypeVar("Func", bound=Callable)  # type: ignore[type-arg]


def redis_client_exception_wrapper(func: Func) -> Func:
    """Wrap ``func`` so redis client errors surface as ``RuntimeError``.

    A ``redis.RedisError`` raised by ``func`` is logged together with its
    traceback and re-raised as a ``RuntimeError`` that carries the logged
    message and is chained to the original error. Any other exception
    propagates unchanged.

    Args:
        func: The callable to wrap.

    Returns:
        A wrapper that forwards all arguments to ``func`` and keeps its
        metadata (name, docstring) via ``functools.wraps``.

    Raises:
        RuntimeError: From the wrapper, when ``func`` raises
            ``redis.RedisError``.
    """

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            return func(*args, **kwargs)
        except redis.RedisError as error:
            message = f"Redis Exception in [{func.__name__}]"
            LOGGER.exception(message)
            # Chain explicitly so the redis error is reported as the direct
            # cause, and give the RuntimeError a message callers can read.
            raise RuntimeError(message) from error

    return cast(Func, wrapper)
class RedisStorage(StorageABC):
    """Interface for communicating with a redis store.

    Each blob is stored under its own key of the form
    ``<hash>_<size_bytes>``. Every public method is wrapped with
    ``redis_client_exception_wrapper``.
    """

    @redis_client_exception_wrapper
    def __init__(self, redis: RedisProvider) -> None:
        """Keep a reference to the provider used to run redis commands.

        Args:
            redis: Provider whose ``execute_ro`` / ``execute_rw`` methods
                are called with a callback that receives a redis client.
        """
        self._redis = redis

    def _construct_key(self, digest: Digest) -> str:
        """Helper to get the redis key name for a particular digest."""
        return f"{digest.hash}_{digest.size_bytes}"

    @redis_client_exception_wrapper
    def has_blob(self, digest: Digest) -> bool:
        """Return True when a key for ``digest`` exists in redis."""
        LOGGER.debug("Checking for blob: [%s]", digest)
        key = self._construct_key(digest)
        return bool(self._redis.execute_ro(lambda r: r.exists(key)))

    @redis_client_exception_wrapper
    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        """Return the blob for ``digest`` as an in-memory stream.

        Returns:
            A ``BytesIO`` over the stored value, or ``None`` when redis
            returns no value for the key.
        """
        LOGGER.debug("Getting blob: [%s]", digest)
        key = self._construct_key(digest)
        blob = self._redis.execute_ro(lambda r: r.get(key))
        if blob is None:
            return None
        return io.BytesIO(blob)

    @redis_client_exception_wrapper
    def delete_blob(self, digest: Digest) -> None:
        """Delete the key for ``digest``."""
        LOGGER.debug("Deleting blob: [%s]", digest)
        key = self._construct_key(digest)
        self._redis.execute_rw(lambda r: r.delete(key))

    @redis_client_exception_wrapper
    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        """Store the full contents of ``write_session`` under ``digest``.

        Args:
            digest: Digest the data is stored under.
            write_session: Seekable stream; it is rewound and read to the end.
        """
        LOGGER.debug("Writing blob: [%s]", digest)
        write_session.seek(0)
        # Read the stream once, outside the callback. A read inside the
        # callback would return b"" on any second invocation and store an
        # empty blob.
        # NOTE(review): presumably the provider may invoke the callback more
        # than once (e.g. on retry) - confirm against RedisProvider.
        data = write_session.read()
        key = self._construct_key(digest)
        self._redis.execute_rw(lambda r: r.set(key, data))

    @redis_client_exception_wrapper
    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
        """Return the digests from ``digests`` that have no key in redis.

        The result preserves the order of ``digests``.
        """
        # Exist takes multiple keys, but only returns the number of keys which
        # exist, not which keys do/don't exist. Instead pipeline N exist
        # calls, which allows distinguishing which keys do/don't exist.
        keys = [self._construct_key(digest) for digest in digests]

        def validate_digests(r: "redis.Redis[bytes]") -> List[int]:
            pipe = r.pipeline()
            for key in keys:
                pipe.exists(key)
            return pipe.execute()

        results = self._redis.execute_ro(validate_digests)
        return [digest for digest, found in zip(digests, results) if not found]

    @redis_client_exception_wrapper
    def bulk_update_blobs(self, blobs: List[Tuple[Digest, bytes]]) -> List[Status]:
        """Validate and store several blobs with a single MSET.

        Args:
            blobs: ``(digest, data)`` pairs to store.

        Returns:
            One ``Status`` per input pair, in input order:
            ``INVALID_ARGUMENT`` when the data's length or hash does not
            match its digest, ``OK`` otherwise.
        """
        keymap: Dict[str, bytes] = {}
        results: List[Status] = []
        for digest, data in blobs:
            if len(data) != digest.size_bytes or HASH(data).hexdigest() != digest.hash:
                results.append(
                    Status(
                        code=code_pb2.INVALID_ARGUMENT,
                        message="Data doesn't match hash",
                    )
                )
            else:
                results.append(Status(code=code_pb2.OK))
                keymap[self._construct_key(digest)] = data
        # MSET needs at least one key/value pair. With nothing valid to write
        # (empty input, or every blob rejected above) skip the call so the
        # per-blob statuses are still returned instead of a redis error.
        if keymap:
            self._redis.execute_rw(lambda r: r.mset(keymap))  # type: ignore[arg-type]
        # mset can't fail according to the documentation so return OK for all remaining digests
        return results

    @redis_client_exception_wrapper
    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
        """Fetch several blobs with a single MGET.

        Returns:
            Mapping from ``digest.hash`` to blob data for every digest
            whose key returned a value; absent blobs are omitted.
        """
        redis_keys = [self._construct_key(x) for x in digests]
        found_blobs = self._redis.execute_ro(lambda r: r.mget(redis_keys))
        return {
            digest.hash: blob
            for digest, blob in zip(digests, found_blobs)
            if blob is not None
        }