Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/redis.py: 94.44%
36 statements
« prev ^ index » next coverage.py v6.4.1, created at 2022-06-22 21:04 +0000
« prev ^ index » next coverage.py v6.4.1, created at 2022-06-22 21:04 +0000
1# Copyright (C) 2019 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16"""
17RedisStorage
18==================
20A storage provider that stores data in a persistent redis store.
21https://redis.io/
23Redis client: redis-py
24https://github.com/andymccurdy/redis-py
26"""
27import io
28import functools
29import logging
31import redis
33from .storage_abc import StorageABC
36def redis_client_exception_wrapper(func):
37 """ Wrapper from handling redis client exceptions. """
38 @functools.wraps(func)
39 def wrapper(*args, **kwargs):
40 try:
41 return func(*args, **kwargs)
42 except redis.RedisError:
43 logging.getLogger(__name__).exception(
44 f"Redis Exception in [{func.__name__}]")
45 raise RuntimeError
46 return wrapper
49class RedisStorage(StorageABC):
50 """ Interface for communicating with a redis store. """
51 @redis_client_exception_wrapper
52 def __init__(self, **kwargs):
53 self._logger = logging.getLogger(__name__)
54 self._client = redis.Redis(**kwargs)
56 @redis_client_exception_wrapper
57 def has_blob(self, digest) -> bool:
58 self._logger.debug(f"Checking for blob: [{digest}]")
59 return bool(self._client.exists(digest.hash + '_' + str(digest.size_bytes)))
61 @redis_client_exception_wrapper
62 def get_blob(self, digest):
63 self._logger.debug(f"Getting blob: [{digest}]")
64 blob = self._client.get(digest.hash + '_' + str(digest.size_bytes))
65 return None if blob is None else io.BytesIO(blob)
67 @redis_client_exception_wrapper
68 def delete_blob(self, digest):
69 self._logger.debug(f"Deleting blob: [{digest}]")
70 self._client.delete(digest.hash + '_' + str(digest.size_bytes))
72 @redis_client_exception_wrapper
73 def begin_write(self, digest) -> io.BytesIO:
74 return io.BytesIO()
76 @redis_client_exception_wrapper
77 def commit_write(self, digest, write_session):
78 self._logger.debug(f"Writing blob: [{digest}]")
79 self._client.set(digest.hash + '_' + str(digest.size_bytes), write_session.getvalue())
80 write_session.close()