Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/size_differentiated.py: 95.65%
46 statements
« prev ^ index » next coverage.py v6.4.1, created at 2022-06-22 21:04 +0000
« prev ^ index » next coverage.py v6.4.1, created at 2022-06-22 21:04 +0000
1# Copyright (C) 2020 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16"""
17SizeDifferentiatedStorage
18=========================
20A storage provider which passes requests to other storage providers
21based on the size of the blob being requested.
23"""
25import io
26import logging
27from typing import List, NamedTuple
29# TODO: Use the standard library version of this when we drop support
30# for Python 3.7
31from typing_extensions import TypedDict
33from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
34from .storage_abc import StorageABC
37class SizeLimitedStorageType(TypedDict):
38 max_size: int
39 storage: StorageABC
42# NOTE: This exists separately to the TypedDict to allow attribute-based access
43# at runtime, rather than relying on string-based access to dictionary keys.
44class _SizeLimitedStorage(NamedTuple):
45 max_size: int
46 storage: StorageABC
49class SizeDifferentiatedStorage(StorageABC):
51 def __init__(self, storages: List[SizeLimitedStorageType], fallback: StorageABC):
52 self._logger = logging.getLogger(__name__)
53 self._fallback_storage = fallback
54 self._storages = [_SizeLimitedStorage(**storage) for storage in storages]
55 self._storages.sort(key=lambda storage: storage.max_size)
57 def _storage_from_digest(self, digest: Digest) -> StorageABC:
58 for storage in self._storages:
59 if digest.size_bytes < storage.max_size:
60 return storage.storage
61 # If the blob is too big for any of the size-limited storages,
62 # put it in the fallback.
63 return self._fallback_storage
65 def setup_grpc(self):
66 for storage_tuple in self._storages:
67 storage_tuple.storage.setup_grpc()
69 def has_blob(self, digest: Digest) -> bool:
70 self._logger.debug(f"Checking for blob: [{digest}]")
71 storage = self._storage_from_digest(digest)
72 return storage.has_blob(digest)
74 def get_blob(self, digest: Digest) -> io.IOBase:
75 self._logger.debug(f"Getting blob: [{digest}]")
76 storage = self._storage_from_digest(digest)
77 return storage.get_blob(digest)
79 def delete_blob(self, digest: Digest):
80 self._logger.debug(f"Deleting blob: [{digest}]")
81 storage = self._storage_from_digest(digest)
82 storage.delete_blob(digest)
84 def begin_write(self, digest: Digest) -> io.IOBase:
85 storage = self._storage_from_digest(digest)
86 return storage.begin_write(digest)
88 def commit_write(self, digest: Digest, write_session: io.IOBase):
89 self._logger.debug(f"Writing blob: [{digest}]")
90 storage = self._storage_from_digest(digest)
91 storage.commit_write(digest, write_session)