Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/redis.py: 97.94%
97 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
1# Copyright (C) 2019 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16"""
17RedisStorage
18==================
20A storage provider that stores data in a persistent redis store.
21https://redis.io/
23Redis client: redis-py
24https://github.com/andymccurdy/redis-py
26"""
27import functools
28import io
29from typing import IO, Any, Callable, Dict, List, Optional, Tuple, TypeVar, cast
31import redis
33from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
34from buildgrid._protos.google.rpc import code_pb2
35from buildgrid._protos.google.rpc.status_pb2 import Status
36from buildgrid.server.decorators import timed
37from buildgrid.server.logging import buildgrid_logger
38from buildgrid.server.metrics_names import METRIC
39from buildgrid.server.redis.provider import RedisProvider
40from buildgrid.server.settings import HASH
42from .storage_abc import StorageABC
44LOGGER = buildgrid_logger(__name__)
Func = TypeVar("Func", bound=Callable)  # type: ignore[type-arg]


def redis_client_exception_wrapper(func: Func) -> Func:
    """Decorator for handling redis client exceptions.

    Any :class:`redis.RedisError` raised by ``func`` is logged and re-raised
    as a ``RuntimeError`` chained to the original error, so callers only need
    to handle one exception type while the root cause remains visible in the
    traceback.
    """

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            return func(*args, **kwargs)
        except redis.RedisError as e:
            LOGGER.exception("Redis Exception.", tags=dict(func_name=func.__name__))
            # Chain the original redis error instead of discarding it, so the
            # underlying cause is preserved for debugging.
            raise RuntimeError from e

    return cast(Func, wrapper)
class RedisStorage(StorageABC):
    """CAS storage provider backed by a redis store.

    Blobs are stored as plain redis string values keyed by
    ``"<hash>_<size_bytes>"`` (see :meth:`_construct_key`). All redis access
    goes through a :class:`RedisProvider`, using its read-only
    (``execute_ro``) or read-write (``execute_rw``) execution helpers.
    """

    TYPE = "Redis"

    @redis_client_exception_wrapper
    def __init__(self, redis: RedisProvider) -> None:
        self._redis = redis

    def _construct_key(self, digest: Digest) -> str:
        """Helper to get the redis key name for a particular digest"""
        return digest.hash + "_" + str(digest.size_bytes)

    @timed(METRIC.STORAGE.STAT_DURATION, type=TYPE)
    @redis_client_exception_wrapper
    def has_blob(self, digest: Digest) -> bool:
        """Return True if a blob with the given digest is stored."""
        LOGGER.debug("Checking for blob.", tags=dict(digest=digest))
        return bool(self._redis.execute_ro(lambda r: r.exists(self._construct_key(digest))))

    @timed(METRIC.STORAGE.READ_DURATION, type=TYPE)
    @redis_client_exception_wrapper
    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        """Return a readable file object with the blob's contents, or None if absent."""
        LOGGER.debug("Getting blob.", tags=dict(digest=digest))
        blob = self._redis.execute_ro(lambda r: r.get(self._construct_key(digest)))
        return None if blob is None else io.BytesIO(blob)

    @timed(METRIC.STORAGE.DELETE_DURATION, type=TYPE)
    @redis_client_exception_wrapper
    def delete_blob(self, digest: Digest) -> None:
        """Delete the blob for ``digest``; a no-op if it is not stored."""
        LOGGER.debug("Deleting blob.", tags=dict(digest=digest))
        self._redis.execute_rw(lambda r: r.delete(self._construct_key(digest)))

    @timed(METRIC.STORAGE.DELETE_DURATION, type=TYPE)
    @redis_client_exception_wrapper
    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        """Delete several blobs in one round-trip.

        Returns the list of failed deletions; redis DEL cannot partially
        fail, so this is always empty.
        """
        if not digests:
            # DEL requires at least one key; skip the round-trip entirely.
            return []
        keys = [self._construct_key(digest) for digest in digests]
        self._redis.execute_rw(lambda r: r.delete(*keys))
        return []

    @timed(METRIC.STORAGE.WRITE_DURATION, type=TYPE)
    @redis_client_exception_wrapper
    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        """Store the full contents of ``write_session`` under ``digest``'s key."""
        LOGGER.debug("Writing blob.", tags=dict(digest=digest))
        # Rewind first: the session may have been written to (or read) already.
        write_session.seek(0)
        self._redis.execute_rw(lambda r: r.set(self._construct_key(digest), write_session.read()))

    @timed(METRIC.STORAGE.BULK_STAT_DURATION, type=TYPE)
    @redis_client_exception_wrapper
    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
        """Return the subset of ``digests`` which are not present in the store."""
        # Exist takes multiple keys, but only returns the number of keys which
        # exist, not which keys do/don't exist. Instead pipeline N exist
        # calls, which allows distinguishing which keys do/don't exist.

        def validate_digests(r: "redis.Redis[bytes]") -> List[int]:
            pipe = r.pipeline()
            for digest in digests:
                pipe.exists(self._construct_key(digest))
            return pipe.execute()

        results = self._redis.execute_ro(validate_digests)
        # Pipeline results come back in submission order, so zip pairs each
        # digest with its own EXISTS result.
        return [digest for digest, result in zip(digests, results) if not result]

    @timed(METRIC.STORAGE.BULK_WRITE_DURATION, type=TYPE)
    @redis_client_exception_wrapper
    def bulk_update_blobs(self, blobs: List[Tuple[Digest, bytes]]) -> List[Status]:
        """Store several blobs, returning one Status per blob in input order.

        Blobs whose data does not match their digest (size or hash) are
        rejected with INVALID_ARGUMENT and are not written.
        """
        keymap: Dict[str, bytes] = {}
        results: List[Status] = []
        for digest, data in blobs:
            if len(data) != digest.size_bytes or HASH(data).hexdigest() != digest.hash:
                results.append(Status(code=code_pb2.INVALID_ARGUMENT, message="Data doesn't match hash"))
                continue
            results.append(Status(code=code_pb2.OK))
            keymap[self._construct_key(digest)] = data
        if keymap:
            # MSET with no key/value pairs is an error in redis; only issue the
            # write when at least one blob passed validation.
            self._redis.execute_rw(lambda r: r.mset(keymap))  # type: ignore[arg-type]
        # mset can't fail according to the documentation so return OK for all remaining digests
        return results

    @timed(METRIC.STORAGE.BULK_READ_DURATION, type=TYPE)
    @redis_client_exception_wrapper
    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
        """Fetch several blobs at once; returns a map of digest hash to data.

        Digests not found in the store are simply omitted from the result.
        """
        if not digests:
            # MGET requires at least one key; avoid the error and the round-trip.
            return {}
        redis_keys = [self._construct_key(x) for x in digests]
        found_blobs = self._redis.execute_ro(lambda r: r.mget(redis_keys))
        result_map: Dict[str, bytes] = {}
        for digest, blob in zip(digests, found_blobs):
            if blob is not None:
                result_map[digest.hash] = blob
        return result_map