Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/instance.py: 91.67%

228 statements  

coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
Storage Instances
=================
Instances of CAS and ByteStream
"""

from datetime import timedelta
from typing import Iterable, Iterator, List, Optional, Sequence, Set, Tuple

from cachetools import TTLCache
from grpc import RpcError

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import DESCRIPTOR as RE_DESCRIPTOR
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import (
    BatchReadBlobsResponse,
    BatchUpdateBlobsRequest,
    BatchUpdateBlobsResponse,
    Digest,
    Directory,
    FindMissingBlobsResponse,
    GetTreeRequest,
    GetTreeResponse,
    Tree,
)
from buildgrid._protos.google.bytestream import bytestream_pb2 as bs_pb2
from buildgrid._protos.google.rpc import code_pb2, status_pb2
from buildgrid.server.cas.storage.storage_abc import StorageABC, create_write_session
from buildgrid.server.exceptions import (
    IncompleteReadError,
    InvalidArgumentError,
    NotFoundError,
    OutOfRangeError,
    PermissionDeniedError,
    RetriableError,
)
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.metrics_names import METRIC
from buildgrid.server.metrics_utils import publish_counter_metric, publish_distribution_metric
from buildgrid.server.servicer import Instance
from buildgrid.server.settings import HASH, HASH_LENGTH, MAX_REQUEST_COUNT, MAX_REQUEST_SIZE, STREAM_ERROR_RETRY_PERIOD
from buildgrid.server.utils.digests import create_digest

LOGGER = buildgrid_logger(__name__)

EMPTY_BLOB = b""
EMPTY_BLOB_DIGEST: Digest = create_digest(EMPTY_BLOB)


class ContentAddressableStorageInstance(Instance):
    SERVICE_NAME = RE_DESCRIPTOR.services_by_name["ContentAddressableStorage"].full_name

    def __init__(
        self,
        storage: StorageABC,
        read_only: bool = False,
        tree_cache_size: Optional[int] = None,
        tree_cache_ttl_minutes: float = 60,
    ) -> None:
        self._storage = storage
        self.__read_only = read_only

        self._tree_cache: Optional[TTLCache[Tuple[str, int], Digest]] = None
        if tree_cache_size:
            self._tree_cache = TTLCache(tree_cache_size, tree_cache_ttl_minutes * 60)

    def start(self) -> None:
        self._storage.start()

    def stop(self) -> None:
        self._storage.stop()
        LOGGER.info("Stopped CAS.")

    def find_missing_blobs(self, blob_digests: Sequence[Digest]) -> FindMissingBlobsResponse:
        deduplicated_digests: List[Digest] = []
        seen: Set[str] = set()
        for digest in blob_digests:
            if digest.hash in seen:
                continue
            seen.add(digest.hash)
            deduplicated_digests.append(digest)
        blob_digests = deduplicated_digests

        missing_blobs = self._storage.missing_blobs(blob_digests)

        num_blobs_in_request = len(blob_digests)
        if num_blobs_in_request > 0:
            num_blobs_missing = len(missing_blobs)
            percent_missing = float((num_blobs_missing / num_blobs_in_request) * 100)

            publish_distribution_metric(METRIC.CAS.BLOBS_COUNT, num_blobs_in_request)
            publish_distribution_metric(METRIC.CAS.BLOBS_MISSING_COUNT, num_blobs_missing)
            publish_distribution_metric(METRIC.CAS.BLOBS_MISSING_PERCENT, percent_missing)

        for digest in blob_digests:
            publish_distribution_metric(METRIC.CAS.BLOB_BYTES, digest.size_bytes)

        return FindMissingBlobsResponse(missing_blob_digests=missing_blobs)

    def batch_update_blobs(self, requests: Sequence[BatchUpdateBlobsRequest.Request]) -> BatchUpdateBlobsResponse:
        if self.__read_only:
            raise PermissionDeniedError("CAS is read-only")

        if len(requests) > 0:
            publish_distribution_metric(METRIC.CAS.BLOBS_COUNT, len(requests))

        storage = self._storage
        store = []
        seen: Set[str] = set()
        for request_proto in requests:
            if request_proto.digest.hash in seen:
                continue
            seen.add(request_proto.digest.hash)
            store.append((request_proto.digest, request_proto.data))
            publish_distribution_metric(METRIC.CAS.BLOB_BYTES, request_proto.digest.size_bytes)

        response = BatchUpdateBlobsResponse()
        statuses = storage.bulk_update_blobs(store)

        for (digest, _), status in zip(store, statuses):
            response_proto = response.responses.add()
            response_proto.digest.CopyFrom(digest)
            response_proto.status.CopyFrom(status)

        return response

    def batch_read_blobs(self, digests: Sequence[Digest]) -> BatchReadBlobsResponse:
        storage = self._storage

        if len(digests) > 0:
            publish_distribution_metric(METRIC.CAS.BLOBS_COUNT, len(digests))

        # Only process unique digests
        good_digests = []
        bad_digests = []
        seen: Set[str] = set()
        requested_bytes = 0
        for digest in digests:
            if digest.hash in seen:
                continue
            seen.add(digest.hash)

            if len(digest.hash) != HASH_LENGTH:
                bad_digests.append(digest)
            else:
                good_digests.append(digest)
                requested_bytes += digest.size_bytes

        if requested_bytes > MAX_REQUEST_SIZE:
            raise InvalidArgumentError(
                "Combined total size of blobs exceeds "
                "server limit. "
                f"({requested_bytes} > {MAX_REQUEST_SIZE} [byte])"
            )

        if len(good_digests) > 0:
            blobs_read = storage.bulk_read_blobs(good_digests)
        else:
            blobs_read = {}

        response = BatchReadBlobsResponse()

        for digest in good_digests:
            response_proto = response.responses.add()
            response_proto.digest.CopyFrom(digest)

            if digest.hash in blobs_read and blobs_read[digest.hash] is not None:
                response_proto.data = blobs_read[digest.hash]
                status_code = code_pb2.OK

                publish_distribution_metric(METRIC.CAS.BLOB_BYTES, digest.size_bytes)
            else:
                status_code = code_pb2.NOT_FOUND
                LOGGER.info("Blob not found from BatchReadBlobs.", tags=dict(digest=digest))

            response_proto.status.CopyFrom(status_pb2.Status(code=status_code))

        for digest in bad_digests:
            response_proto = response.responses.add()
            response_proto.digest.CopyFrom(digest)
            status_code = code_pb2.INVALID_ARGUMENT
            response_proto.status.CopyFrom(status_pb2.Status(code=status_code))

        return response

    def lookup_tree_cache(self, root_digest: Digest) -> Optional[Tree]:
        """Find full Tree from cache"""
        if self._tree_cache is None:
            return None
        tree = None
        if response_digest := self._tree_cache.get((root_digest.hash, root_digest.size_bytes)):
            tree = self._storage.get_message(response_digest, Tree)
            if tree is None:
                self._tree_cache.pop((root_digest.hash, root_digest.size_bytes))

        publish_counter_metric(METRIC.CAS.TREE_CACHE_HIT_COUNT, 1 if tree else 0)
        return tree

    def put_tree_cache(self, root_digest: Digest, root: Directory, children: Iterable[Directory]) -> None:
        """Put Tree with a full list of directories into CAS"""
        if self._tree_cache is None:
            return
        tree = Tree(root=root, children=children)
        tree_digest = self._storage.put_message(tree)
        self._tree_cache[(root_digest.hash, root_digest.size_bytes)] = tree_digest

    def get_tree(self, request: GetTreeRequest) -> Iterator[GetTreeResponse]:
        storage = self._storage

        if not request.page_size:
            request.page_size = MAX_REQUEST_COUNT

        if tree := self.lookup_tree_cache(request.root_digest):
            # Cache hit, yield responses based on page size
            directories = [tree.root]
            directories.extend(tree.children)
            yield from (
                GetTreeResponse(directories=directories[start : start + request.page_size])
                for start in range(0, len(directories), request.page_size)
            )
            return

        results = []
        response = GetTreeResponse()

        for dir in storage.get_tree(request.root_digest):
            response.directories.append(dir)
            results.append(dir)
            if len(response.directories) >= request.page_size:
                yield response
                response.Clear()

        if response.directories:
            yield response
        if results:
            self.put_tree_cache(request.root_digest, results[0], results[1:])


class ByteStreamInstance(Instance):
    SERVICE_NAME = bs_pb2.DESCRIPTOR.services_by_name["ByteStream"].full_name

    BLOCK_SIZE = 1 * 1024 * 1024  # 1 MB block size

    def __init__(
        self,
        storage: StorageABC,
        read_only: bool = False,
        disable_overwrite_early_return: bool = False,
    ) -> None:
        self._storage = storage
        self._query_activity_timeout = 30

        self.__read_only = read_only

        # If set, prevents `ByteStream.Write()` from returning without
        # reading all the client's `WriteRequests` for a digest that is
        # already in storage (i.e. not follow the REAPI-specified
        # behavior).
        self.__disable_overwrite_early_return = disable_overwrite_early_return
        # (Should only be used to work around issues with implementations
        # that treat the server half-closing its end of the gRPC stream
        # as a HTTP/2 stream error.)

    def start(self) -> None:
        self._storage.start()

    def stop(self) -> None:
        self._storage.stop()
        LOGGER.info("Stopped ByteStream.")

    def read_cas_blob(self, digest: Digest, read_offset: int, read_limit: int) -> Iterator[bs_pb2.ReadResponse]:
        digest_str = f"'{digest.hash}/{digest.size_bytes}'"
        # Check the given read offset and limit.
        if read_offset < 0 or read_offset > digest.size_bytes:
            raise OutOfRangeError(f"Read offset out of range for {digest_str}: {read_offset=}")

        if read_limit < 0:
            raise InvalidArgumentError(f"Read limit out of range for {digest_str}: {read_limit=}")

        bytes_requested = digest.size_bytes - read_offset
        if read_limit:
            bytes_requested = min(read_limit, bytes_requested)

        if bytes_requested == 0:
            yield bs_pb2.ReadResponse(data=b"")
            return

        bytes_remaining = bytes_requested

        # Read the blob from storage and send its contents to the client.
        result = self._storage.get_blob(digest)
        if result is None:
            raise NotFoundError(f"Blob not found for {digest_str}")

        try:
            if read_offset > 0:
                result.seek(read_offset)

            publish_distribution_metric(METRIC.CAS.BLOB_BYTES, digest.size_bytes)

            # https://docs.python.org/3/library/io.html#io.RawIOBase.read
            # If 0 bytes are returned, and size was not 0, this indicates end of file.
            while block_data := result.read(min(self.BLOCK_SIZE, bytes_remaining)):
                bytes_remaining -= len(block_data)
                yield bs_pb2.ReadResponse(data=block_data)
        finally:
            result.close()

        if bytes_remaining != 0:
            raise IncompleteReadError(
                f"Blob incomplete: {digest_str}, from Bytestream.Read. "
                f"Only read {bytes_requested - bytes_remaining} bytes out of "
                f"requested {bytes_requested} bytes. {read_offset=} {read_limit=}"
            )

    def write_cas_blob(
        self, digest_hash: str, digest_size: str, requests: Iterator[bs_pb2.WriteRequest]
    ) -> bs_pb2.WriteResponse:
        if self.__read_only:
            raise PermissionDeniedError("ByteStream is read-only")

        if len(digest_hash) != HASH_LENGTH or not digest_size.isdigit():
            raise InvalidArgumentError(f"Invalid digest [{digest_hash}/{digest_size}]")

        digest = Digest(hash=digest_hash, size_bytes=int(digest_size))

        publish_distribution_metric(METRIC.CAS.BLOB_BYTES, digest.size_bytes)

        if self._storage.has_blob(digest):
            # According to the REAPI specification:
            # "When attempting an upload, if another client has already
            # completed the upload (which may occur in the middle of a single
            # upload if another client uploads the same blob concurrently),
            # the request will terminate immediately [...]".
            #
            # However, half-closing the stream can be problematic with some
            # intermediaries like HAProxy.
            # (https://github.com/haproxy/haproxy/issues/1219)
            #
            # If half-closing the stream is not allowed, we read and drop
            # all the client's messages before returning, still saving
            # the cost of a write to storage.
            if self.__disable_overwrite_early_return:
                try:
                    for request in requests:
                        if request.finish_write:
                            break
                        continue
                except RpcError:
                    msg = "ByteStream client disconnected whilst streaming requests, upload cancelled."
                    LOGGER.debug(msg)
                    raise RetriableError(msg, retry_period=timedelta(seconds=STREAM_ERROR_RETRY_PERIOD))

            return bs_pb2.WriteResponse(committed_size=digest.size_bytes)

        # Start the write session and write the first request's data.

        with create_write_session(digest) as write_session:
            computed_hash = HASH()

            # Handle subsequent write requests.
            bytes_count = 0
            try:
                for request in requests:
                    write_session.write(request.data)

                    computed_hash.update(request.data)
                    bytes_count += len(request.data)

                    if request.finish_write:
                        break
            except RpcError:
                write_session.close()
                msg = "ByteStream client disconnected whilst streaming requests, upload cancelled."
                LOGGER.debug(msg)
                raise RetriableError(msg, retry_period=timedelta(seconds=STREAM_ERROR_RETRY_PERIOD))

            # Check that the data matches the provided digest.
            if bytes_count != digest.size_bytes:
                raise NotImplementedError(
                    "Cannot close stream before finishing write, "
                    f"got {bytes_count} bytes but expected {digest.size_bytes}"
                )

            if computed_hash.hexdigest() != digest.hash:
                raise InvalidArgumentError("Data does not match hash")

            self._storage.commit_write(digest, write_session)
            return bs_pb2.WriteResponse(committed_size=bytes_count)
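
A brief usage sketch of the classes listed above, for orientation only; it is not part of instance.py. The `storage` variable is a placeholder for any concrete StorageABC backend (its construction is omitted here); the class, method, and helper names are the ones defined in this module.

# Illustrative sketch only (not part of instance.py). Assumes `storage` is a
# concrete StorageABC implementation; its construction is omitted.
from buildgrid.server.cas.instance import ContentAddressableStorageInstance
from buildgrid.server.utils.digests import create_digest

cas = ContentAddressableStorageInstance(storage, read_only=False, tree_cache_size=1024)
cas.start()
try:
    digest = create_digest(b"example blob")
    # FindMissingBlobs semantics: digests absent from storage are echoed back.
    missing = cas.find_missing_blobs([digest]).missing_blob_digests
    print(f"blob already in CAS: {len(missing) == 0}")
finally:
    cas.stop()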