Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/size_differentiated.py: 95.65%

46 statements  

« prev     ^ index     » next       coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

1# Copyright (C) 2020 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17SizeDifferentiatedStorage 

18========================= 

19 

20A storage provider which passes requests to other storage providers 

21based on the size of the blob being requested. 

22 

23""" 

24 

25import io 

26import logging 

27from typing import List, NamedTuple 

28 

29# TODO: Use the standard library version of this when we drop support 

30# for Python 3.7 

31from typing_extensions import TypedDict 

32 

33from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest 

34from .storage_abc import StorageABC 

35 

36 

37class SizeLimitedStorageType(TypedDict): 

38 max_size: int 

39 storage: StorageABC 

40 

41 

42# NOTE: This exists separately to the TypedDict to allow attribute-based access 

43# at runtime, rather than relying on string-based access to dictionary keys. 

44class _SizeLimitedStorage(NamedTuple): 

45 max_size: int 

46 storage: StorageABC 

47 

48 

49class SizeDifferentiatedStorage(StorageABC): 

50 

51 def __init__(self, storages: List[SizeLimitedStorageType], fallback: StorageABC): 

52 self._logger = logging.getLogger(__name__) 

53 self._fallback_storage = fallback 

54 self._storages = [_SizeLimitedStorage(**storage) for storage in storages] 

55 self._storages.sort(key=lambda storage: storage.max_size) 

56 

57 def _storage_from_digest(self, digest: Digest) -> StorageABC: 

58 for storage in self._storages: 

59 if digest.size_bytes < storage.max_size: 

60 return storage.storage 

61 # If the blob is too big for any of the size-limited storages, 

62 # put it in the fallback. 

63 return self._fallback_storage 

64 

65 def setup_grpc(self): 

66 for storage_tuple in self._storages: 

67 storage_tuple.storage.setup_grpc() 

68 

69 def has_blob(self, digest: Digest) -> bool: 

70 self._logger.debug(f"Checking for blob: [{digest}]") 

71 storage = self._storage_from_digest(digest) 

72 return storage.has_blob(digest) 

73 

74 def get_blob(self, digest: Digest) -> io.IOBase: 

75 self._logger.debug(f"Getting blob: [{digest}]") 

76 storage = self._storage_from_digest(digest) 

77 return storage.get_blob(digest) 

78 

79 def delete_blob(self, digest: Digest): 

80 self._logger.debug(f"Deleting blob: [{digest}]") 

81 storage = self._storage_from_digest(digest) 

82 storage.delete_blob(digest) 

83 

84 def begin_write(self, digest: Digest) -> io.IOBase: 

85 storage = self._storage_from_digest(digest) 

86 return storage.begin_write(digest) 

87 

88 def commit_write(self, digest: Digest, write_session: io.IOBase): 

89 self._logger.debug(f"Writing blob: [{digest}]") 

90 storage = self._storage_from_digest(digest) 

91 storage.commit_write(digest, write_session)