Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/storage_abc.py: 78.02%
91 statements
« prev ^ index » next coverage.py v6.4.1, created at 2022-06-22 21:04 +0000
1# Copyright (C) 2018 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16"""
17StorageABC
18==================
20The abstract base class for storage providers.
21"""
23import abc
24import logging
25from typing import List, Optional
27from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import (
28 CacheCapabilities, Digest, SymlinkAbsolutePathStrategy
29)
30from buildgrid._protos.google.rpc.status_pb2 import Status
31from buildgrid._protos.google.rpc import code_pb2
32from buildgrid.utils import get_hash_type
34from ....settings import HASH, MAX_REQUEST_SIZE
class StorageABC(abc.ABC):
    """Abstract base class for CAS (Content Addressable Storage) providers.

    Concrete subclasses implement the five abstract primitives
    (``has_blob``, ``get_blob``, ``delete_blob``, ``begin_write``,
    ``commit_write``); the remaining methods are default implementations
    built on top of those primitives and may be overridden for efficiency.
    """

    def setup_grpc(self):
        """Hook for storage backends that need to create gRPC channels.

        No-op by default; subclasses backed by remote services override this.
        """
        pass

    @abc.abstractmethod
    def has_blob(self, digest):
        """Return True if the blob with the given instance/digest exists."""
        raise NotImplementedError()

    @abc.abstractmethod
    def get_blob(self, digest):
        """Return a file-like object containing the blob. Most implementations
        will read the entire file into memory and return a `BytesIO` object.
        Eventually this should be corrected to handle files which cannot fit
        into memory.

        The file-like object must be readable. To use this with an index, it must
        be seekable as well.

        If the blob isn't present in storage, return None.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def delete_blob(self, digest):
        """Delete the blob from storage if it's present."""
        # Raise for consistency with the other abstract methods; subclasses
        # must provide a real implementation.
        raise NotImplementedError()

    @abc.abstractmethod
    def begin_write(self, digest):
        """Return a file-like object to which a blob's contents could be
        written.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def commit_write(self, digest, write_session):
        """Commit the write operation. `write_session` must be an object
        returned by `begin_write`.

        The storage object is not responsible for verifying that the data
        written to the write_session actually matches the digest. The caller
        must do that.
        """
        raise NotImplementedError()

    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        """Delete a list of blobs from storage.

        Returns a list of ``"hash/size_bytes"`` strings for the digests whose
        deletion failed. Deletion is best-effort: an exception from
        ``delete_blob`` is logged and recorded as a failure rather than
        propagated, so one bad digest does not abort the whole batch.
        """
        failed_deletions = []
        for digest in digests:
            try:
                self.delete_blob(digest)
            except Exception:
                # If deletion threw an exception, assume deletion failed. More specific implementations
                # with more information can return if a blob was missing instead
                logging.getLogger(__name__).warning(
                    f"Unable to clean up digest [{digest.hash}/{digest.size_bytes}]", exc_info=True)
                failed_deletions.append(f'{digest.hash}/{digest.size_bytes}')
        return failed_deletions

    def missing_blobs(self, digests):
        """Return a container containing the blobs not present in CAS."""
        return [digest for digest in digests if not self.has_blob(digest)]

    def bulk_update_blobs(self, blobs):
        """Given a container of (digest, value) tuples, add all the blobs
        to CAS. Return a list of Status objects corresponding to the
        result of uploading each of the blobs.

        Unlike in `commit_write`, the storage object will verify that each of
        the digests matches the provided data.
        """
        result = []
        for digest, data in blobs:
            # Verify both size and hash before writing; a mismatch means the
            # client sent inconsistent data.
            if len(data) != digest.size_bytes or HASH(data).hexdigest() != digest.hash:
                result.append(
                    Status(
                        code=code_pb2.INVALID_ARGUMENT,
                        message="Data doesn't match hash",
                    ))
            else:
                try:
                    write_session = self.begin_write(digest)
                    write_session.write(data)
                    self.commit_write(digest, write_session)
                except IOError as ex:
                    result.append(
                        Status(code=code_pb2.UNKNOWN, message=str(ex)))
                else:
                    result.append(Status(code=code_pb2.OK))
        return result

    def bulk_read_blobs(self, digests):
        """ Given an iterable container of digests, return a
        {hash: file-like object} dictionary corresponding to the blobs
        represented by the input digests.

        Each file-like object must be readable. To use this with an index,
        it must be seekable as well.

        Digests not present in storage are silently omitted from the result.
        """
        blobmap = {}
        for digest in digests:
            blob = self.get_blob(digest)
            if blob is not None:
                blobmap[digest.hash] = blob
        return blobmap

    def put_message(self, message):
        """Store the given Protobuf message in CAS, returning its digest."""
        message_blob = message.SerializeToString()
        digest = Digest(hash=HASH(message_blob).hexdigest(),
                        size_bytes=len(message_blob))
        session = self.begin_write(digest)
        session.write(message_blob)
        self.commit_write(digest, session)
        return digest

    def get_message(self, digest, message_type):
        """Retrieve the Protobuf message with the given digest and type from
        CAS. If the blob is not present, returns None.
        """
        message_blob = self.get_blob(digest)
        if message_blob is None:
            return None
        try:
            return message_type.FromString(message_blob.read())
        finally:
            # Always release the underlying file-like object, even when
            # parsing raises — previously a parse error leaked the handle.
            message_blob.close()

    def is_cleanup_enabled(self):
        """Return True if this storage supports cleanup; False by default."""
        return False

    @property
    def instance_name(self) -> Optional[str]:
        # Returns None until `set_instance_name` has been called.
        if hasattr(self, '_instance_name'):
            return self._instance_name
        return None

    def set_instance_name(self, instance_name: str) -> None:
        """Record the instance name this storage serves."""
        # This method should always get called, so there's no benefit in
        # adding an __init__ to this abstract class (therefore adding the
        # need for subclasses to call `super()`) just to define a null
        # value for this.
        # pylint: disable=attribute-defined-outside-init
        self._instance_name: Optional[str] = instance_name

    def hash_type(self):
        """Return the digest function used by this deployment."""
        return get_hash_type()

    def max_batch_total_size_bytes(self):
        """Return the maximum total size of a batch request in bytes."""
        return MAX_REQUEST_SIZE

    def symlink_absolute_path_strategy(self):
        # Currently this strategy is hardcoded into BuildGrid
        # With no setting to reference
        return SymlinkAbsolutePathStrategy.DISALLOWED

    def get_capabilities(self):
        """Assemble and return the CacheCapabilities advertised by this storage."""
        capabilities = CacheCapabilities()
        capabilities.digest_function.extend([self.hash_type()])
        capabilities.max_batch_total_size_bytes = self.max_batch_total_size_bytes()
        capabilities.symlink_absolute_path_strategy = self.symlink_absolute_path_strategy()
        return capabilities