Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/service.py: 94.44%

108 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2025-05-21 15:45 +0000

# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

14 

15 

16""" 

17CAS services 

18================== 

19 

20Implements the Content Addressable Storage API and ByteStream API. 

21""" 

22 

23import itertools 

24import re 

25from typing import Any, Iterator, cast 

26 

27import grpc 

28 

29from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import DESCRIPTOR as RE_DESCRIPTOR 

30from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ( 

31 BatchReadBlobsRequest, 

32 BatchReadBlobsResponse, 

33 BatchUpdateBlobsRequest, 

34 BatchUpdateBlobsResponse, 

35 Digest, 

36 FindMissingBlobsRequest, 

37 FindMissingBlobsResponse, 

38 GetTreeRequest, 

39 GetTreeResponse, 

40) 

41from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2_grpc import ( 

42 ContentAddressableStorageServicer, 

43 add_ContentAddressableStorageServicer_to_server, 

44) 

45from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc 

46from buildgrid._protos.google.bytestream.bytestream_pb2 import ( 

47 QueryWriteStatusRequest, 

48 QueryWriteStatusResponse, 

49 ReadRequest, 

50 ReadResponse, 

51 WriteRequest, 

52 WriteResponse, 

53) 

54from buildgrid._protos.google.rpc import code_pb2, status_pb2 

55from buildgrid.server.cas.instance import ( 

56 EMPTY_BLOB, 

57 EMPTY_BLOB_DIGEST, 

58 ByteStreamInstance, 

59 ContentAddressableStorageInstance, 

60) 

61from buildgrid.server.decorators import rpc 

62from buildgrid.server.enums import ByteStreamResourceType 

63from buildgrid.server.exceptions import InvalidArgumentError 

64from buildgrid.server.logging import buildgrid_logger 

65from buildgrid.server.servicer import InstancedServicer 

66from buildgrid.server.settings import HASH_LENGTH 

67 

# Module-level structured logger shared by both servicers in this file.
LOGGER = buildgrid_logger(__name__)

69 

70 

71def _printable_batch_update_blobs_request(request: BatchUpdateBlobsRequest) -> dict[str, Any]: 

72 # Log the digests but not the data 

73 return { 

74 "instance_name": request.instance_name, 

75 "digests": [r.digest for r in request.requests], 

76 } 

77 

78 

class ContentAddressableStorageService(
    ContentAddressableStorageServicer, InstancedServicer[ContentAddressableStorageInstance]
):
    """gRPC servicer for the REAPI ContentAddressableStorage service.

    Each RPC is routed to a per-instance backend via ``get_instance``; the
    empty blob is special-cased so it never has to touch storage.
    """

    SERVICE_NAME = "ContentAddressableStorage"
    REGISTER_METHOD = add_ContentAddressableStorageServicer_to_server
    FULL_NAME = RE_DESCRIPTOR.services_by_name[SERVICE_NAME].full_name

    @rpc(instance_getter=lambda r: cast(str, r.instance_name))
    def FindMissingBlobs(
        self, request: FindMissingBlobsRequest, context: grpc.ServicerContext
    ) -> FindMissingBlobsResponse:
        """Report which of the requested digests are absent from storage."""
        # The empty blob is implicitly present in every CAS, so it is never
        # reported missing and we skip looking it up entirely.
        non_empty = [d for d in request.blob_digests if d != EMPTY_BLOB_DIGEST]
        instance = self.get_instance(request.instance_name)
        return instance.find_missing_blobs(non_empty)

    @rpc(instance_getter=lambda r: cast(str, r.instance_name), request_formatter=_printable_batch_update_blobs_request)
    def BatchUpdateBlobs(
        self, request: BatchUpdateBlobsRequest, context: grpc.ServicerContext
    ) -> BatchUpdateBlobsResponse:
        """Store a batch of blobs in the requested instance's CAS."""
        instance = self.get_instance(request.instance_name)
        return instance.batch_update_blobs(request.requests)

    @rpc(instance_getter=lambda r: cast(str, r.instance_name))
    def BatchReadBlobs(self, request: BatchReadBlobsRequest, context: grpc.ServicerContext) -> BatchReadBlobsResponse:
        """Fetch a batch of blobs, serving empty-blob requests locally."""
        # The empty blob is always available; answer those entries here
        # instead of asking the storage backend for them.
        real_digests = [d for d in request.digests if d != EMPTY_BLOB_DIGEST]
        n_empty = len(request.digests) - len(real_digests)

        instance = self.get_instance(request.instance_name)
        response = instance.batch_read_blobs(real_digests)

        # Synthesize an OK entry for every empty-blob digest that was asked for.
        for _ in range(n_empty):
            entry = response.responses.add()
            entry.digest.CopyFrom(EMPTY_BLOB_DIGEST)
            entry.data = EMPTY_BLOB
            entry.status.CopyFrom(status_pb2.Status(code=code_pb2.OK))

        return response

    @rpc(instance_getter=lambda r: cast(str, r.instance_name))
    def GetTree(self, request: GetTreeRequest, context: grpc.ServicerContext) -> Iterator[GetTreeResponse]:
        """Stream the directory tree rooted at the requested digest."""
        instance = self.get_instance(request.instance_name)
        for page in instance.get_tree(request):
            yield page

122 

123 

class ResourceNameRegex:
    """Regular expressions for splitting ByteStream resource names.

    In both patterns, group 1 captures the (possibly empty) instance name
    and group 2 captures the blob path that follows it.
    """

    # CAS read name format: "{instance_name}/blobs/{hash}/{size}"
    READ = "^(.*?)/?(blobs/.*/[0-9]*)$"

    # CAS write name format: "{instance_name}/uploads/{uuid}/blobs/{hash}/{size}[optional arbitrary extra content]"
    # NOTE: deliberately unanchored at the end so trailing extra content is allowed.
    WRITE = "^(.*?)/?(uploads/.*/blobs/.*/[0-9]*)"

130 

131 

def _parse_resource_name(resource_name: str, regex: str) -> tuple[str, str, "ByteStreamResourceType"]:
    """Split a ByteStream resource name into its components.

    Returns a tuple of (instance name, blob path, resource type). Raises
    ``InvalidArgumentError`` when *resource_name* does not match *regex*.
    """
    match = re.match(regex, resource_name)
    if match is None:
        raise InvalidArgumentError(f"Invalid resource name: [{resource_name}]")
    return match[1], match[2], ByteStreamResourceType.CAS

138 

139 

def _read_instance_name(resource_name: str) -> str:
    """Extract the instance name from a ByteStream read resource name."""
    instance_name, _, _ = _parse_resource_name(resource_name, ResourceNameRegex.READ)
    return instance_name

142 

143 

def _write_instance_name(resource_name: str) -> str:
    """Extract the instance name from a ByteStream write resource name."""
    instance_name, _, _ = _parse_resource_name(resource_name, ResourceNameRegex.WRITE)
    return instance_name

146 

147 

148def _printable_write_request(request: WriteRequest) -> dict[str, Any]: 

149 # Log all the fields except `data`: 

150 return { 

151 "resource_name": request.resource_name, 

152 "write_offset": request.write_offset, 

153 "finish_write": request.finish_write, 

154 } 

155 

156 

class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer, InstancedServicer[ByteStreamInstance]):
    """gRPC servicer implementing the Google ByteStream API over CAS storage."""

    SERVICE_NAME = "ByteStream"
    REGISTER_METHOD = bytestream_pb2_grpc.add_ByteStreamServicer_to_server
    FULL_NAME = bytestream_pb2.DESCRIPTOR.services_by_name[SERVICE_NAME].full_name

    @rpc(instance_getter=lambda r: _read_instance_name(r.resource_name))
    def Read(self, request: ReadRequest, context: grpc.ServicerContext) -> Iterator[ReadResponse]:
        """Stream the contents of a CAS blob named by ``request.resource_name``.

        The resource name must match ``ResourceNameRegex.READ``. Raises
        ``InvalidArgumentError`` (via ``_parse_resource_name`` or directly)
        for malformed names or digests.
        """
        _, resource_name, resource_type = _parse_resource_name(request.resource_name, ResourceNameRegex.READ)
        if resource_type == ByteStreamResourceType.CAS:
            # resource_name here is "blobs/{hash}/{size}", so after the split:
            # blob_details == ["blobs", hash, size]
            blob_details = resource_name.split("/")
            if len(blob_details[1]) != HASH_LENGTH:
                raise InvalidArgumentError(f"Invalid digest [{resource_name}]")
            try:
                digest = Digest(hash=blob_details[1], size_bytes=int(blob_details[2]))
            except ValueError:
                # The size component was not a valid integer.
                raise InvalidArgumentError(f"Invalid digest [{resource_name}]")

            # Track how many bytes were actually streamed so the finally
            # block can detect a client that disconnected mid-read.
            bytes_returned = 0
            expected_bytes = digest.size_bytes - request.read_offset
            if request.read_limit:
                expected_bytes = min(expected_bytes, request.read_limit)

            try:
                if digest.size_bytes == 0:
                    # Only the canonical empty-blob digest may have size 0;
                    # serve it directly without touching storage.
                    if digest.hash != EMPTY_BLOB_DIGEST.hash:
                        raise InvalidArgumentError(f"Invalid digest [{digest.hash}/{digest.size_bytes}]")
                    yield bytestream_pb2.ReadResponse(data=EMPTY_BLOB)
                    return

                for blob in self.current_instance.read_cas_blob(digest, request.read_offset, request.read_limit):
                    bytes_returned += len(blob.data)
                    yield blob
            finally:
                # Runs on normal completion AND when the generator is closed
                # early (e.g. client cancellation), so a short byte count
                # here indicates an interrupted read.
                if bytes_returned != expected_bytes:
                    LOGGER.warning(
                        "Read request exited early.",
                        tags=dict(
                            digest=digest,
                            bytes_returned=bytes_returned,
                            expected_bytes=expected_bytes,
                            read_offset=request.read_offset,
                            read_limit=request.read_limit,
                        ),
                    )
                else:
                    LOGGER.info("Read request completed.", tags=dict(digest=digest))

    @rpc(instance_getter=lambda r: _write_instance_name(r.resource_name), request_formatter=_printable_write_request)
    def Write(self, request_iterator: Iterator[WriteRequest], context: grpc.ServicerContext) -> WriteResponse:
        """Store a blob streamed by the client under its digest.

        The first request in the stream carries the resource name; it is
        consumed here and re-chained in front of the remaining requests
        before being handed to the backend.
        """
        request = next(request_iterator)
        _, resource_name, resource_type = _parse_resource_name(
            request.resource_name,
            ResourceNameRegex.WRITE,
        )
        if resource_type == ByteStreamResourceType.CAS:
            # resource_name here is "uploads/{uuid}/blobs/{hash}/{size}...",
            # so blob_details == ["uploads", uuid, "blobs", hash, size, ...];
            # the uuid (index 1) is deliberately discarded.
            blob_details = resource_name.split("/")
            _, hash_, size_bytes = blob_details[1], blob_details[3], blob_details[4]
            # NOTE: size_bytes is still a string here; write_cas_blob is
            # passed it unparsed, and int() is only applied for the
            # completion check below.
            write_response = self.current_instance.write_cas_blob(
                hash_, size_bytes, itertools.chain([request], request_iterator)
            )
            if write_response.committed_size == int(size_bytes):
                LOGGER.info("Write request completed.", tags=dict(digest=f"{hash_}/{size_bytes}"))
            return write_response
        # Non-CAS resource types are not supported; return an empty response.
        return bytestream_pb2.WriteResponse()

    @rpc(instance_getter=lambda r: _write_instance_name(r.resource_name))
    def QueryWriteStatus(
        self, request: QueryWriteStatusRequest, context: grpc.ServicerContext
    ) -> QueryWriteStatusResponse:
        """Unsupported: always aborts the RPC with UNIMPLEMENTED."""
        context.abort(grpc.StatusCode.UNIMPLEMENTED, "Method not implemented!")