Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/instance.py: 91.67% (228 statements)
coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
Storage Instances
=================
Instances of CAS and ByteStream
"""

from datetime import timedelta
from typing import Iterable, Iterator, List, Optional, Sequence, Set, Tuple

from cachetools import TTLCache
from grpc import RpcError

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import DESCRIPTOR as RE_DESCRIPTOR
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import (
    BatchReadBlobsResponse,
    BatchUpdateBlobsRequest,
    BatchUpdateBlobsResponse,
    Digest,
    Directory,
    FindMissingBlobsResponse,
    GetTreeRequest,
    GetTreeResponse,
    Tree,
)
from buildgrid._protos.google.bytestream import bytestream_pb2 as bs_pb2
from buildgrid._protos.google.rpc import code_pb2, status_pb2
from buildgrid.server.cas.storage.storage_abc import StorageABC, create_write_session
from buildgrid.server.exceptions import (
    IncompleteReadError,
    InvalidArgumentError,
    NotFoundError,
    OutOfRangeError,
    PermissionDeniedError,
    RetriableError,
)
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.metrics_names import METRIC
from buildgrid.server.metrics_utils import publish_counter_metric, publish_distribution_metric
from buildgrid.server.servicer import Instance
from buildgrid.server.settings import HASH, HASH_LENGTH, MAX_REQUEST_COUNT, MAX_REQUEST_SIZE, STREAM_ERROR_RETRY_PERIOD
from buildgrid.server.utils.digests import create_digest

LOGGER = buildgrid_logger(__name__)

EMPTY_BLOB = b""
EMPTY_BLOB_DIGEST: Digest = create_digest(EMPTY_BLOB)


class ContentAddressableStorageInstance(Instance):
    SERVICE_NAME = RE_DESCRIPTOR.services_by_name["ContentAddressableStorage"].full_name

    def __init__(
        self,
        storage: StorageABC,
        read_only: bool = False,
        tree_cache_size: Optional[int] = None,
        tree_cache_ttl_minutes: float = 60,
    ) -> None:
        self._storage = storage
        self.__read_only = read_only

        self._tree_cache: Optional[TTLCache[Tuple[str, int], Digest]] = None
        if tree_cache_size:
            self._tree_cache = TTLCache(tree_cache_size, tree_cache_ttl_minutes * 60)

    def start(self) -> None:
        self._storage.start()

    def stop(self) -> None:
        self._storage.stop()
        LOGGER.info("Stopped CAS.")

    def find_missing_blobs(self, blob_digests: Sequence[Digest]) -> FindMissingBlobsResponse:
        deduplicated_digests: List[Digest] = []
        seen: Set[str] = set()
        for digest in blob_digests:
            if digest.hash in seen:
                continue
            seen.add(digest.hash)
            deduplicated_digests.append(digest)
        blob_digests = deduplicated_digests

        missing_blobs = self._storage.missing_blobs(blob_digests)

        num_blobs_in_request = len(blob_digests)
        if num_blobs_in_request > 0:
            num_blobs_missing = len(missing_blobs)
            percent_missing = float((num_blobs_missing / num_blobs_in_request) * 100)

            publish_distribution_metric(METRIC.CAS.BLOBS_COUNT, num_blobs_in_request)
            publish_distribution_metric(METRIC.CAS.BLOBS_MISSING_COUNT, num_blobs_missing)
            publish_distribution_metric(METRIC.CAS.BLOBS_MISSING_PERCENT, percent_missing)

        for digest in blob_digests:
            publish_distribution_metric(METRIC.CAS.BLOB_BYTES, digest.size_bytes)

        return FindMissingBlobsResponse(missing_blob_digests=missing_blobs)

    def batch_update_blobs(self, requests: Sequence[BatchUpdateBlobsRequest.Request]) -> BatchUpdateBlobsResponse:
        if self.__read_only:
            raise PermissionDeniedError("CAS is read-only")

        if len(requests) > 0:
            publish_distribution_metric(METRIC.CAS.BLOBS_COUNT, len(requests))

        storage = self._storage
        store = []
        seen: Set[str] = set()
        for request_proto in requests:
            if request_proto.digest.hash in seen:
                continue
            seen.add(request_proto.digest.hash)
            store.append((request_proto.digest, request_proto.data))
            publish_distribution_metric(METRIC.CAS.BLOB_BYTES, request_proto.digest.size_bytes)

        response = BatchUpdateBlobsResponse()
        statuses = storage.bulk_update_blobs(store)

        for (digest, _), status in zip(store, statuses):
            response_proto = response.responses.add()
            response_proto.digest.CopyFrom(digest)
            response_proto.status.CopyFrom(status)

        return response

    def batch_read_blobs(self, digests: Sequence[Digest]) -> BatchReadBlobsResponse:
        storage = self._storage

        if len(digests) > 0:
            publish_distribution_metric(METRIC.CAS.BLOBS_COUNT, len(digests))

        # Only process unique digests
        good_digests = []
        bad_digests = []
        seen: Set[str] = set()
        requested_bytes = 0
        for digest in digests:
            if digest.hash in seen:
                continue
            seen.add(digest.hash)

            if len(digest.hash) != HASH_LENGTH:
                bad_digests.append(digest)
            else:
                good_digests.append(digest)
                requested_bytes += digest.size_bytes

        if requested_bytes > MAX_REQUEST_SIZE:
            raise InvalidArgumentError(
                "Combined total size of blobs exceeds "
                "server limit. "
                f"({requested_bytes} > {MAX_REQUEST_SIZE} [byte])"
            )

        if len(good_digests) > 0:
            blobs_read = storage.bulk_read_blobs(good_digests)
        else:
            blobs_read = {}

        response = BatchReadBlobsResponse()

        for digest in good_digests:
            response_proto = response.responses.add()
            response_proto.digest.CopyFrom(digest)

            if digest.hash in blobs_read and blobs_read[digest.hash] is not None:
                response_proto.data = blobs_read[digest.hash]
                status_code = code_pb2.OK

                publish_distribution_metric(METRIC.CAS.BLOB_BYTES, digest.size_bytes)
            else:
                status_code = code_pb2.NOT_FOUND
                LOGGER.info("Blob not found from BatchReadBlobs.", tags=dict(digest=digest))

            response_proto.status.CopyFrom(status_pb2.Status(code=status_code))

        for digest in bad_digests:
            response_proto = response.responses.add()
            response_proto.digest.CopyFrom(digest)
            status_code = code_pb2.INVALID_ARGUMENT
            response_proto.status.CopyFrom(status_pb2.Status(code=status_code))

        return response

    def lookup_tree_cache(self, root_digest: Digest) -> Optional[Tree]:
        """Find the full Tree for a root digest from the cache, if present."""
        if self._tree_cache is None:
            return None
        tree = None
        if response_digest := self._tree_cache.get((root_digest.hash, root_digest.size_bytes)):
            tree = self._storage.get_message(response_digest, Tree)
            if tree is None:
                self._tree_cache.pop((root_digest.hash, root_digest.size_bytes))

        publish_counter_metric(METRIC.CAS.TREE_CACHE_HIT_COUNT, 1 if tree else 0)
        return tree

    def put_tree_cache(self, root_digest: Digest, root: Directory, children: Iterable[Directory]) -> None:
        """Store a Tree with its full list of directories in CAS and cache its digest."""
        if self._tree_cache is None:
            return
        tree = Tree(root=root, children=children)
        tree_digest = self._storage.put_message(tree)
        self._tree_cache[(root_digest.hash, root_digest.size_bytes)] = tree_digest

    def get_tree(self, request: GetTreeRequest) -> Iterator[GetTreeResponse]:
        storage = self._storage

        if not request.page_size:
            request.page_size = MAX_REQUEST_COUNT

        if tree := self.lookup_tree_cache(request.root_digest):
            # Cache hit, yield responses based on page size
            directories = [tree.root]
            directories.extend(tree.children)
            yield from (
                GetTreeResponse(directories=directories[start : start + request.page_size])
                for start in range(0, len(directories), request.page_size)
            )
            return

        results = []
        response = GetTreeResponse()

        for directory in storage.get_tree(request.root_digest):
            response.directories.append(directory)
            results.append(directory)
            if len(response.directories) >= request.page_size:
                yield response
                response.Clear()

        if response.directories:
            yield response
        if results:
            self.put_tree_cache(request.root_digest, results[0], results[1:])
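

# Editor's note, illustrative only: a minimal sketch of how a caller might drive
# this instance, assuming an already-constructed instance and in-memory data.
# The names `cas_instance` and `payload` are hypothetical and not part of this module.
#
#     payload = b"example data"
#     digest = create_digest(payload)
#     missing = cas_instance.find_missing_blobs([digest]).missing_blob_digests
#     if missing:
#         update = BatchUpdateBlobsRequest.Request(digest=digest, data=payload)
#         cas_instance.batch_update_blobs([update])
#     read_back = cas_instance.batch_read_blobs([digest])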


class ByteStreamInstance(Instance):
    SERVICE_NAME = bs_pb2.DESCRIPTOR.services_by_name["ByteStream"].full_name

    BLOCK_SIZE = 1 * 1024 * 1024  # 1 MB block size

    def __init__(
        self,
        storage: StorageABC,
        read_only: bool = False,
        disable_overwrite_early_return: bool = False,
    ) -> None:
        self._storage = storage
        self._query_activity_timeout = 30

        self.__read_only = read_only

        # If set, prevents `ByteStream.Write()` from returning without
        # reading all the client's `WriteRequests` for a digest that is
        # already in storage (i.e. not following the REAPI-specified
        # behavior).
        # (Should only be used to work around issues with implementations
        # that treat the server half-closing its end of the gRPC stream
        # as an HTTP/2 stream error.)
        self.__disable_overwrite_early_return = disable_overwrite_early_return

    def start(self) -> None:
        self._storage.start()

    def stop(self) -> None:
        self._storage.stop()
        LOGGER.info("Stopped ByteStream.")

    def read_cas_blob(self, digest: Digest, read_offset: int, read_limit: int) -> Iterator[bs_pb2.ReadResponse]:
        digest_str = f"'{digest.hash}/{digest.size_bytes}'"
        # Check the given read offset and limit.
        if read_offset < 0 or read_offset > digest.size_bytes:
            raise OutOfRangeError(f"Read offset out of range for {digest_str}: {read_offset=}")

        if read_limit < 0:
            raise InvalidArgumentError(f"Read limit out of range for {digest_str}: {read_limit=}")

        bytes_requested = digest.size_bytes - read_offset
        if read_limit:
            bytes_requested = min(read_limit, bytes_requested)

        if bytes_requested == 0:
            yield bs_pb2.ReadResponse(data=b"")
            return

        bytes_remaining = bytes_requested

        # Read the blob from storage and send its contents to the client.
        result = self._storage.get_blob(digest)
        if result is None:
            raise NotFoundError(f"Blob not found for {digest_str}")

        try:
            if read_offset > 0:
                result.seek(read_offset)

            publish_distribution_metric(METRIC.CAS.BLOB_BYTES, digest.size_bytes)

            # https://docs.python.org/3/library/io.html#io.RawIOBase.read
            # If 0 bytes are returned, and size was not 0, this indicates end of file.
            while block_data := result.read(min(self.BLOCK_SIZE, bytes_remaining)):
                bytes_remaining -= len(block_data)
                yield bs_pb2.ReadResponse(data=block_data)
        finally:
            result.close()

        if bytes_remaining != 0:
            raise IncompleteReadError(
                f"Blob incomplete: {digest_str}, from Bytestream.Read. "
                f"Only read {bytes_requested - bytes_remaining} bytes out of "
                f"requested {bytes_requested} bytes. {read_offset=} {read_limit=}"
            )
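
    # Editor's note, illustrative only (hypothetical values): for a 10-byte blob,
    # read_offset=4 with read_limit=0 streams bytes 4..9 (6 bytes); read_offset=4
    # with read_limit=3 streams bytes 4..6 (3 bytes); read_offset=11 raises
    # OutOfRangeError because it exceeds digest.size_bytes.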

    def write_cas_blob(
        self, digest_hash: str, digest_size: str, requests: Iterator[bs_pb2.WriteRequest]
    ) -> bs_pb2.WriteResponse:
        if self.__read_only:
            raise PermissionDeniedError("ByteStream is read-only")

        if len(digest_hash) != HASH_LENGTH or not digest_size.isdigit():
            raise InvalidArgumentError(f"Invalid digest [{digest_hash}/{digest_size}]")

        digest = Digest(hash=digest_hash, size_bytes=int(digest_size))

        publish_distribution_metric(METRIC.CAS.BLOB_BYTES, digest.size_bytes)

        if self._storage.has_blob(digest):
            # According to the REAPI specification:
            # "When attempting an upload, if another client has already
            # completed the upload (which may occur in the middle of a single
            # upload if another client uploads the same blob concurrently),
            # the request will terminate immediately [...]".
            #
            # However, half-closing the stream can be problematic with some
            # intermediaries like HAProxy.
            # (https://github.com/haproxy/haproxy/issues/1219)
            #
            # If half-closing the stream is not allowed, we read and drop
            # all the client's messages before returning, still saving
            # the cost of a write to storage.
            if self.__disable_overwrite_early_return:
                try:
                    for request in requests:
                        if request.finish_write:
                            break
                        continue
                except RpcError:
                    msg = "ByteStream client disconnected whilst streaming requests, upload cancelled."
                    LOGGER.debug(msg)
                    raise RetriableError(msg, retry_period=timedelta(seconds=STREAM_ERROR_RETRY_PERIOD))

            return bs_pb2.WriteResponse(committed_size=digest.size_bytes)

        # Start the write session and stream the client's data into it.
        with create_write_session(digest) as write_session:
            computed_hash = HASH()

            # Handle the client's write requests.
            bytes_count = 0
            try:
                for request in requests:
                    write_session.write(request.data)

                    computed_hash.update(request.data)
                    bytes_count += len(request.data)

                    if request.finish_write:
                        break
            except RpcError:
                write_session.close()
                msg = "ByteStream client disconnected whilst streaming requests, upload cancelled."
                LOGGER.debug(msg)
                raise RetriableError(msg, retry_period=timedelta(seconds=STREAM_ERROR_RETRY_PERIOD))

            # Check that the data matches the provided digest.
            if bytes_count != digest.size_bytes:
                raise NotImplementedError(
                    "Cannot close stream before finishing write, "
                    f"got {bytes_count} bytes but expected {digest.size_bytes}"
                )

            if computed_hash.hexdigest() != digest.hash:
                raise InvalidArgumentError("Data does not match hash")

            self._storage.commit_write(digest, write_session)
            return bs_pb2.WriteResponse(committed_size=bytes_count)
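

# Editor's note, illustrative only: a minimal sketch of the client-side chunking
# that `write_cas_blob` expects, assuming a hypothetical `bytestream_instance`
# and an in-memory `payload`. Each chunk becomes one WriteRequest, with
# `finish_write` set on the last chunk.
#
#     def chunk_requests(payload: bytes, block_size: int = ByteStreamInstance.BLOCK_SIZE):
#         for start in range(0, len(payload), block_size):
#             end = start + block_size
#             yield bs_pb2.WriteRequest(data=payload[start:end], finish_write=end >= len(payload))
#
#     payload = b"example data"
#     digest = create_digest(payload)
#     response = bytestream_instance.write_cas_blob(
#         digest.hash, str(digest.size_bytes), chunk_requests(payload)
#     )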