Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/service.py: 94.44%
108 statements
coverage.py v7.4.1, created at 2025-05-21 15:45 +0000
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#  <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
CAS services
============

Implements the Content Addressable Storage API and ByteStream API.
"""

import itertools
import re
from typing import Any, Iterator, cast

import grpc

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import DESCRIPTOR as RE_DESCRIPTOR
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import (
    BatchReadBlobsRequest,
    BatchReadBlobsResponse,
    BatchUpdateBlobsRequest,
    BatchUpdateBlobsResponse,
    Digest,
    FindMissingBlobsRequest,
    FindMissingBlobsResponse,
    GetTreeRequest,
    GetTreeResponse,
)
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2_grpc import (
    ContentAddressableStorageServicer,
    add_ContentAddressableStorageServicer_to_server,
)
from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
from buildgrid._protos.google.bytestream.bytestream_pb2 import (
    QueryWriteStatusRequest,
    QueryWriteStatusResponse,
    ReadRequest,
    ReadResponse,
    WriteRequest,
    WriteResponse,
)
from buildgrid._protos.google.rpc import code_pb2, status_pb2
from buildgrid.server.cas.instance import (
    EMPTY_BLOB,
    EMPTY_BLOB_DIGEST,
    ByteStreamInstance,
    ContentAddressableStorageInstance,
)
from buildgrid.server.decorators import rpc
from buildgrid.server.enums import ByteStreamResourceType
from buildgrid.server.exceptions import InvalidArgumentError
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.servicer import InstancedServicer
from buildgrid.server.settings import HASH_LENGTH

LOGGER = buildgrid_logger(__name__)


def _printable_batch_update_blobs_request(request: BatchUpdateBlobsRequest) -> dict[str, Any]:
    # Log the digests but not the data
    return {
        "instance_name": request.instance_name,
        "digests": [r.digest for r in request.requests],
    }


class ContentAddressableStorageService(
    ContentAddressableStorageServicer, InstancedServicer[ContentAddressableStorageInstance]
):
    SERVICE_NAME = "ContentAddressableStorage"
    REGISTER_METHOD = add_ContentAddressableStorageServicer_to_server
    FULL_NAME = RE_DESCRIPTOR.services_by_name[SERVICE_NAME].full_name

    @rpc(instance_getter=lambda r: cast(str, r.instance_name))
    def FindMissingBlobs(
        self, request: FindMissingBlobsRequest, context: grpc.ServicerContext
    ) -> FindMissingBlobsResponse:
        # No need to look the empty blob up in the CAS: it can never be missing.
        digests_to_find = [digest for digest in request.blob_digests if digest != EMPTY_BLOB_DIGEST]
        return self.get_instance(request.instance_name).find_missing_blobs(digests_to_find)

    @rpc(instance_getter=lambda r: cast(str, r.instance_name), request_formatter=_printable_batch_update_blobs_request)
    def BatchUpdateBlobs(
        self, request: BatchUpdateBlobsRequest, context: grpc.ServicerContext
    ) -> BatchUpdateBlobsResponse:
        return self.get_instance(request.instance_name).batch_update_blobs(request.requests)

    @rpc(instance_getter=lambda r: cast(str, r.instance_name))
    def BatchReadBlobs(self, request: BatchReadBlobsRequest, context: grpc.ServicerContext) -> BatchReadBlobsResponse:
        # No need to actually read the empty blob from the CAS, as it is always present.
        digests_to_read = [digest for digest in request.digests if digest != EMPTY_BLOB_DIGEST]
        empty_digest_count = len(request.digests) - len(digests_to_read)

        instance = self.get_instance(request.instance_name)
        response = instance.batch_read_blobs(digests_to_read)

        # Append the empty blobs to the response
        for _ in range(empty_digest_count):
            response_proto = response.responses.add()
            response_proto.data = EMPTY_BLOB
            response_proto.digest.CopyFrom(EMPTY_BLOB_DIGEST)
            status_code = code_pb2.OK
            response_proto.status.CopyFrom(status_pb2.Status(code=status_code))

        return response

    @rpc(instance_getter=lambda r: cast(str, r.instance_name))
    def GetTree(self, request: GetTreeRequest, context: grpc.ServicerContext) -> Iterator[GetTreeResponse]:
        yield from self.get_instance(request.instance_name).get_tree(request)
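
# Illustrative client-side usage, as a minimal sketch only: the channel target
# and instance name are assumptions, and `remote_execution_pb2_grpc` is the
# generated stub module that `ContentAddressableStorageServicer` above comes from.
#
#     channel = grpc.insecure_channel("localhost:50051")
#     stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)
#     response = stub.FindMissingBlobs(
#         FindMissingBlobsRequest(instance_name="main", blob_digests=[some_digest])
#     )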


class ResourceNameRegex:
    # CAS read name format: "{instance_name}/blobs/{hash}/{size}"
    READ = "^(.*?)/?(blobs/.*/[0-9]*)$"

    # CAS write name format: "{instance_name}/uploads/{uuid}/blobs/{hash}/{size}[optional arbitrary extra content]"
    WRITE = "^(.*?)/?(uploads/.*/blobs/.*/[0-9]*)"
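
# For example (illustrative values), the two capture groups split out as:
#     READ:  "main/blobs/abc123/42"              -> ("main", "blobs/abc123/42")
#     WRITE: "main/uploads/f00d/blobs/abc123/42" -> ("main", "uploads/f00d/blobs/abc123/42")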


def _parse_resource_name(resource_name: str, regex: str) -> tuple[str, str, "ByteStreamResourceType"]:
    cas_match = re.match(regex, resource_name)
    if cas_match:
        return cas_match[1], cas_match[2], ByteStreamResourceType.CAS
    else:
        raise InvalidArgumentError(f"Invalid resource name: [{resource_name}]")


def _read_instance_name(resource_name: str) -> str:
    return _parse_resource_name(resource_name, ResourceNameRegex.READ)[0]


def _write_instance_name(resource_name: str) -> str:
    return _parse_resource_name(resource_name, ResourceNameRegex.WRITE)[0]


def _printable_write_request(request: WriteRequest) -> dict[str, Any]:
    # Log all the fields except `data`:
    return {
        "resource_name": request.resource_name,
        "write_offset": request.write_offset,
        "finish_write": request.finish_write,
    }


class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer, InstancedServicer[ByteStreamInstance]):
    SERVICE_NAME = "ByteStream"
    REGISTER_METHOD = bytestream_pb2_grpc.add_ByteStreamServicer_to_server
    FULL_NAME = bytestream_pb2.DESCRIPTOR.services_by_name[SERVICE_NAME].full_name

    @rpc(instance_getter=lambda r: _read_instance_name(r.resource_name))
    def Read(self, request: ReadRequest, context: grpc.ServicerContext) -> Iterator[ReadResponse]:
        _, resource_name, resource_type = _parse_resource_name(request.resource_name, ResourceNameRegex.READ)
        if resource_type == ByteStreamResourceType.CAS:
            blob_details = resource_name.split("/")
            if len(blob_details[1]) != HASH_LENGTH:
                raise InvalidArgumentError(f"Invalid digest [{resource_name}]")
            try:
                digest = Digest(hash=blob_details[1], size_bytes=int(blob_details[2]))
            except ValueError:
                raise InvalidArgumentError(f"Invalid digest [{resource_name}]")

            bytes_returned = 0
            expected_bytes = digest.size_bytes - request.read_offset
            if request.read_limit:
                expected_bytes = min(expected_bytes, request.read_limit)
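
            # Track how many bytes are actually streamed, so that reads which
            # exit early (client disconnect or a server-side error) can be
            # logged on the way out of the generator.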
            try:
                if digest.size_bytes == 0:
                    if digest.hash != EMPTY_BLOB_DIGEST.hash:
                        raise InvalidArgumentError(f"Invalid digest [{digest.hash}/{digest.size_bytes}]")
                    yield bytestream_pb2.ReadResponse(data=EMPTY_BLOB)
                    return

                for blob in self.current_instance.read_cas_blob(digest, request.read_offset, request.read_limit):
                    bytes_returned += len(blob.data)
                    yield blob
            finally:
                if bytes_returned != expected_bytes:
                    LOGGER.warning(
                        "Read request exited early.",
                        tags=dict(
                            digest=digest,
                            bytes_returned=bytes_returned,
                            expected_bytes=expected_bytes,
                            read_offset=request.read_offset,
                            read_limit=request.read_limit,
                        ),
                    )
                else:
                    LOGGER.info("Read request completed.", tags=dict(digest=digest))

    @rpc(instance_getter=lambda r: _write_instance_name(r.resource_name), request_formatter=_printable_write_request)
    def Write(self, request_iterator: Iterator[WriteRequest], context: grpc.ServicerContext) -> WriteResponse:
        request = next(request_iterator)
        _, resource_name, resource_type = _parse_resource_name(
            request.resource_name,
            ResourceNameRegex.WRITE,
        )
        if resource_type == ByteStreamResourceType.CAS:
            blob_details = resource_name.split("/")
            _, hash_, size_bytes = blob_details[1], blob_details[3], blob_details[4]
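
            # The first message was consumed above to parse the resource name,
            # so chain it back onto the front of the remaining stream before
            # handing the full sequence to the storage instance.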
            write_response = self.current_instance.write_cas_blob(
                hash_, size_bytes, itertools.chain([request], request_iterator)
            )
            if write_response.committed_size == int(size_bytes):
                LOGGER.info("Write request completed.", tags=dict(digest=f"{hash_}/{size_bytes}"))
            return write_response
        return bytestream_pb2.WriteResponse()

    @rpc(instance_getter=lambda r: _write_instance_name(r.resource_name))
    def QueryWriteStatus(
        self, request: QueryWriteStatusRequest, context: grpc.ServicerContext
    ) -> QueryWriteStatusResponse:
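        # Write status tracking is not supported by this server, so reject the
        # call outright rather than report a potentially misleading status.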
        context.abort(grpc.StatusCode.UNIMPLEMENTED, "Method not implemented!")
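
# Illustrative client-side read, again only a minimal sketch (channel target
# and instance name are assumptions; `ByteStreamStub` is the client counterpart
# of the servicer registered above):
#
#     stub = bytestream_pb2_grpc.ByteStreamStub(grpc.insecure_channel("localhost:50051"))
#     resource = f"main/blobs/{digest.hash}/{digest.size_bytes}"
#     data = b"".join(r.data for r in stub.Read(ReadRequest(resource_name=resource)))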