Coverage for /builds/BuildGrid/buildgrid/buildgrid/client/cas.py: 87.15% (459 statements)

# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import os
import uuid
from collections import namedtuple
from contextlib import contextmanager
from io import SEEK_END, BytesIO
from typing import IO, BinaryIO, Dict, Iterator, List, Optional, Set, Tuple, TypeVar

import grpc
from google.protobuf.message import Message

import buildgrid.server.context as context_module
from buildgrid._exceptions import NotFoundError
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import (
    BatchReadBlobsRequest,
    BatchUpdateBlobsRequest,
    Digest,
    Directory,
    DirectoryNode,
    GetTreeRequest,
    Tree,
)
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2_grpc import ContentAddressableStorageStub
from buildgrid._protos.google.bytestream.bytestream_pb2 import ReadRequest, WriteRequest
from buildgrid._protos.google.bytestream.bytestream_pb2_grpc import ByteStreamStub
from buildgrid._protos.google.rpc import code_pb2
from buildgrid._types import MessageType
from buildgrid.client.capabilities import CapabilitiesInterface
from buildgrid.client.retrier import GrpcRetrier
from buildgrid.settings import BATCH_REQUEST_SIZE_THRESHOLD, HASH, MAX_REQUEST_COUNT, MAX_REQUEST_SIZE
from buildgrid.utils import create_digest_from_file, merkle_tree_maker

_FileRequest = namedtuple("_FileRequest", ["digest", "output_paths"])


class _CallCache:
    """Per remote grpc.StatusCode.UNIMPLEMENTED call cache."""

    __calls: Dict[grpc.Channel, Set[str]] = {}

    @classmethod
    def mark_unimplemented(cls, channel: grpc.Channel, name: str) -> None:
        if channel not in cls.__calls:
            cls.__calls[channel] = set()
        cls.__calls[channel].add(name)

    @classmethod
    def unimplemented(cls, channel: grpc.Channel, name: str) -> bool:
        if channel not in cls.__calls:
            return False
        return name in cls.__calls[channel]


class _CasBatchRequestSizesCache:
    """Cache that stores, for each remote, the limit of bytes that can
    be transferred using batches as well as a threshold that determines
    when a file can be fetched as part of a batch request.
    """

    __cas_max_batch_transfer_size: Dict[grpc.Channel, Dict[str, int]] = {}
    __cas_batch_request_size_threshold: Dict[grpc.Channel, Dict[str, int]] = {}

    @classmethod
    def max_effective_batch_size_bytes(cls, channel: grpc.Channel, instance_name: str) -> int:
        """Returns the maximum number of bytes that can be transferred
        using batch methods for the given remote.
        """
        if channel not in cls.__cas_max_batch_transfer_size:
            cls.__cas_max_batch_transfer_size[channel] = {}

        if instance_name not in cls.__cas_max_batch_transfer_size[channel]:
            max_batch_size = cls._get_server_max_batch_total_size_bytes(channel, instance_name)

            cls.__cas_max_batch_transfer_size[channel][instance_name] = max_batch_size

        return cls.__cas_max_batch_transfer_size[channel][instance_name]

    @classmethod
    def batch_request_size_threshold(cls, channel: grpc.Channel, instance_name: str) -> int:
        if channel not in cls.__cas_batch_request_size_threshold:
            cls.__cas_batch_request_size_threshold[channel] = {}

        if instance_name not in cls.__cas_batch_request_size_threshold[channel]:
            # Computing the threshold:
            max_batch_size = cls.max_effective_batch_size_bytes(channel, instance_name)
            threshold = int(BATCH_REQUEST_SIZE_THRESHOLD * max_batch_size)

            cls.__cas_batch_request_size_threshold[channel][instance_name] = threshold

        return cls.__cas_batch_request_size_threshold[channel][instance_name]

    @classmethod
    def _get_server_max_batch_total_size_bytes(cls, channel: grpc.Channel, instance_name: str) -> int:
        """Returns the maximum number of bytes that can be effectively
        transferred using batches, considering the limits imposed by
        the server's configuration and by gRPC.
        """
        try:
            capabilities_interface = CapabilitiesInterface(channel)
            server_capabilities = capabilities_interface.get_capabilities(instance_name)

            cache_capabilities = server_capabilities.cache_capabilities

            max_batch_total_size = cache_capabilities.max_batch_total_size_bytes
            # The server could set this value to 0 (no limit set).
            if max_batch_total_size:
                return min(max_batch_total_size, MAX_REQUEST_SIZE)
        except ConnectionError:
            pass

        return MAX_REQUEST_SIZE
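

# Illustrative sketch (not executed): how the two cached values above relate.
# The concrete numbers below are assumptions for the sake of the example, not
# the actual values from ``buildgrid.settings``.
#
#     # Suppose the server advertises max_batch_total_size_bytes of 4 MiB and
#     # the client-side MAX_REQUEST_SIZE is 2 MiB; the effective batch size is
#     # the smaller of the two:
#     max_batch_size = min(4 * 1024 * 1024, 2 * 1024 * 1024)  # -> 2 MiB
#
#     # With a hypothetical BATCH_REQUEST_SIZE_THRESHOLD of 0.8, a blob is
#     # only queued for batching when it is at most:
#     threshold = int(0.8 * max_batch_size)  # -> ~1.6 MiB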


T = TypeVar("T", bound=MessageType)


class Downloader:
    """Remote CAS files, directories and messages download helper.

    The :class:`Downloader` class comes with a generator factory function that
    can be used together with the `with` statement for context management::

        from buildgrid.client.cas import download

        with download(channel, instance='build') as downloader:
            downloader.get_message(message_digest)
    """

    def __init__(
        self,
        channel: grpc.Channel,
        instance: Optional[str] = None,
        retries: int = 0,
        max_backoff: int = 64,
        should_backoff: bool = True,
    ):
        """Initializes a new :class:`Downloader` instance.

        Args:
            channel (grpc.Channel): A gRPC channel to the CAS endpoint.
            instance (str, optional): the targeted instance's name.
        """
        self.channel = channel

        self.instance_name = instance or ""

        self._grpc_retrier = GrpcRetrier(retries=retries, max_backoff=max_backoff, should_backoff=should_backoff)

        self.__bytestream_stub: Optional[ByteStreamStub] = ByteStreamStub(self.channel)
        self.__cas_stub: Optional[ContentAddressableStorageStub] = ContentAddressableStorageStub(self.channel)

        self.__file_requests: Dict[str, _FileRequest] = {}
        self.__file_request_count = 0
        self.__file_request_size = 0
        self.__file_response_size = 0

    # --- Public API ---

    def get_blob(self, digest: Digest) -> Optional[bytearray]:
        """Retrieves a blob from the remote CAS server.

        Args:
            digest (:obj:`Digest`): the blob's digest to fetch.

        Returns:
            bytearray: the fetched blob data or None if not found.
        """
        try:
            blob = self._grpc_retrier.retry(self._fetch_blob, digest)
        except NotFoundError:
            return None

        return blob

    def get_blobs(self, digests: List[Digest]) -> List[bytes]:
        """Retrieves a list of blobs from the remote CAS server.

        Args:
            digests (list): list of :obj:`Digest`s for the blobs to fetch.

        Returns:
            list: the fetched blob data list.

        Raises:
            NotFoundError: if a blob is not present in the remote CAS server.
        """
        # _fetch_blob_batch returns (data, digest) pairs.
        # We only want the data.
        return [result[0] for result in self._grpc_retrier.retry(self._fetch_blob_batch, digests)]

    def get_available_blobs(self, digests: List[Digest]) -> List[Tuple[bytes, Digest]]:
        """Retrieves a list of blobs from the remote CAS server.

        Skips blobs not found on the server without raising an error.

        Args:
            digests (list): list of :obj:`Digest`s for the blobs to fetch.

        Returns:
            list: the fetched blob data list as (data, digest) pairs
        """
        return self._grpc_retrier.retry(self._fetch_blob_batch, digests, skip_unavailable=True)
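
    # Illustrative sketch (not executed): the difference between get_blobs()
    # and get_available_blobs(). Names like `downloader` and `digests` are
    # assumed to exist in the caller's scope.
    #
    #     data_list = downloader.get_blobs(digests)        # raises NotFoundError on any miss
    #     pairs = downloader.get_available_blobs(digests)  # silently skips missing blobs
    #     found_hashes = {digest.hash for _, digest in pairs}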

    def get_message(self, digest: Digest, message: T) -> T:
        """Retrieves a :obj:`Message` from the remote CAS server.

        Args:
            digest (:obj:`Digest`): the message's digest to fetch.
            message (:obj:`Message`): an empty message to fill.

        Returns:
            :obj:`Message`: `message` filled or emptied if not found.
        """
        try:
            message_blob = self._grpc_retrier.retry(self._fetch_blob, digest)
        except NotFoundError:
            message_blob = None

        if message_blob is not None:
            message.ParseFromString(bytes(message_blob))
        else:
            message.Clear()

        return message
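
    # Illustrative sketch (not executed): filling a protobuf message by digest.
    # `downloader` and `action_digest` are assumed to exist in the caller's
    # scope; Action comes from remote_execution_pb2.
    #
    #     from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
    #
    #     action = downloader.get_message(action_digest, Action())
    #     if not action.ByteSize():
    #         ...  # the digest was not present in the remote CAS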

    def get_messages(self, digests: List[Digest], messages: List[Message]) -> List[Message]:
        """Retrieves a list of :obj:`Message`s from the remote CAS server.

        Note:
            The `digests` and `messages` list **must** contain the same number
            of elements.

        Args:
            digests (list): list of :obj:`Digest`s for the messages to fetch.
            messages (list): list of empty :obj:`Message`s to fill.

        Returns:
            list: the fetched and filled message list.
        """
        assert len(digests) == len(messages)

        # The individual empty messages might be of differing types, so we need
        # to set up a mapping
        digest_message_map = {digest.hash: message for (digest, message) in zip(digests, messages)}

        batch_response = self._grpc_retrier.retry(self._fetch_blob_batch, digests)

        messages = []
        for message_blob, message_digest in batch_response:
            message = digest_message_map[message_digest.hash]
            message.ParseFromString(message_blob)
            messages.append(message)

        return messages

    def download_file(self, digest: Digest, file_path: str, is_executable: bool = False, queue: bool = True) -> None:
        """Retrieves a file from the remote CAS server.

        If queuing is allowed (`queue=True`), the download request **may** be
        deferred. An explicit call to :func:`~flush` can force the request to
        be sent immediately (along with the rest of the queued batch).

        Args:
            digest (:obj:`Digest`): the file's digest to fetch.
            file_path (str): absolute or relative path to the local file to write.
            is_executable (bool): whether the file is executable or not.
            queue (bool, optional): whether or not the download request may be
                queued and submitted as part of a batch download request. Defaults
                to True.

        Raises:
            NotFoundError: if `digest` is not present in the remote CAS server.
            OSError: if `file_path` does not exist or is not readable.
        """
        if not os.path.isabs(file_path):
            file_path = os.path.abspath(file_path)

        if not queue or digest.size_bytes > self._queueable_file_size_threshold():
            self._grpc_retrier.retry(self._fetch_file, digest, file_path, is_executable)
        else:
            self._queue_file(digest, file_path, is_executable=is_executable)
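
    # Illustrative sketch (not executed): queueing several small files and
    # forcing the queued batch out with flush(). `downloader` and the
    # (digest, path) pairs are assumed to exist in the caller's scope.
    #
    #     for digest, path in wanted_files:
    #         downloader.download_file(digest, path, queue=True)
    #     downloader.flush()  # sends any requests still sitting in the queue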

    def download_directory(self, digest: Digest, directory_path: str) -> None:
        """Retrieves a :obj:`Directory` from the remote CAS server.

        Args:
            digest (:obj:`Digest`): the directory's digest to fetch.
            directory_path (str): the path to download to

        Raises:
            NotFoundError: if `digest` is not present in the remote CAS server.
            FileExistsError: if `directory_path` already contains parts of the
                fetched directory's content.
        """
        if not os.path.isabs(directory_path):
            directory_path = os.path.abspath(directory_path)

        self._grpc_retrier.retry(self._fetch_directory, digest, directory_path)

    def flush(self) -> None:
        """Ensures any queued request gets sent."""
        if self.__file_requests:
            self._grpc_retrier.retry(self._fetch_file_batch, self.__file_requests)

            self.__file_requests.clear()
            self.__file_request_count = 0
            self.__file_request_size = 0
            self.__file_response_size = 0

    def close(self) -> None:
        """Closes the underlying connection stubs.

        Note:
            This will always send pending requests before closing connections,
            if any.
        """
        self.flush()

        self.__bytestream_stub = None
        self.__cas_stub = None

    # --- Private API ---

    def _fetch_blob(self, digest: Digest) -> bytearray:
        """Fetches a blob using ByteStream.Read()"""

        assert self.__bytestream_stub, "Downloader used after closing"

        if self.instance_name:
            resource_name = "/".join([self.instance_name, "blobs", digest.hash, str(digest.size_bytes)])
        else:
            resource_name = "/".join(["blobs", digest.hash, str(digest.size_bytes)])

        read_blob = bytearray()
        read_request = ReadRequest()
        read_request.resource_name = resource_name
        read_request.read_offset = 0

        for read_response in self.__bytestream_stub.Read(read_request, metadata=context_module.metadata_list()):
            read_blob += read_response.data

        assert len(read_blob) == digest.size_bytes
        return read_blob

    def _fetch_blob_batch(
        self, digests: List[Digest], *, skip_unavailable: bool = False
    ) -> List[Tuple[bytes, Digest]]:
        """Fetches blobs using ContentAddressableStorage.BatchReadBlobs()
        Returns (data, digest) pairs"""

        assert self.__cas_stub, "Downloader used after closing"

        batch_fetched = False
        read_blobs = []

        # First, try BatchReadBlobs(), if it is not already known to be unimplemented:
        if not _CallCache.unimplemented(self.channel, "BatchReadBlobs"):
            batch_request = BatchReadBlobsRequest()
            batch_request.digests.extend(digests)
            if self.instance_name is not None:
                batch_request.instance_name = self.instance_name

            try:
                batch_response = self.__cas_stub.BatchReadBlobs(batch_request, metadata=context_module.metadata_list())

                for response in batch_response.responses:
                    assert response.digest in digests

                    if response.status.code == code_pb2.OK:
                        read_blobs.append((response.data, response.digest))
                    elif response.status.code == code_pb2.NOT_FOUND:
                        if not skip_unavailable:
                            raise NotFoundError("Requested blob does not exist on the remote.")
                    else:
                        raise ConnectionError("Error in CAS reply while fetching blob.")

                batch_fetched = True

            except grpc.RpcError as e:
                status_code = e.code()
                if status_code == grpc.StatusCode.UNIMPLEMENTED:
                    _CallCache.mark_unimplemented(self.channel, "BatchReadBlobs")
                elif status_code == grpc.StatusCode.INVALID_ARGUMENT:
                    read_blobs.clear()
                else:
                    raise

        # Fallback to Read() if no BatchReadBlobs():
        if not batch_fetched:
            for digest in digests:
                blob = self._grpc_retrier.retry(self._fetch_blob, digest)
                read_blobs.append((blob, digest))

        return read_blobs

    def _fetch_file(self, digest: Digest, file_path: str, is_executable: bool = False) -> None:
        """Fetches a file using ByteStream.Read()"""

        assert self.__bytestream_stub, "Downloader used after closing"

        if self.instance_name:
            resource_name = "/".join([self.instance_name, "blobs", digest.hash, str(digest.size_bytes)])
        else:
            resource_name = "/".join(["blobs", digest.hash, str(digest.size_bytes)])

        os.makedirs(os.path.dirname(file_path), exist_ok=True)

        read_request = ReadRequest()
        read_request.resource_name = resource_name
        read_request.read_offset = 0

        with open(file_path, "wb") as byte_file:
            for read_response in self.__bytestream_stub.Read(read_request, metadata=context_module.metadata_list()):
                byte_file.write(read_response.data)

            assert byte_file.tell() == digest.size_bytes

        if is_executable:
            os.chmod(file_path, 0o755)  # rwxr-xr-x / 755

    def _queue_file(self, digest: Digest, file_path: str, is_executable: bool = False) -> None:
        """Queues a file for later batch download"""
        batch_size_limit = self._max_effective_batch_size_bytes()

        if self.__file_request_size + digest.ByteSize() > batch_size_limit:
            self.flush()
        elif self.__file_response_size + digest.size_bytes > batch_size_limit:
            self.flush()
        elif self.__file_request_count >= MAX_REQUEST_COUNT:
            self.flush()

        output_path = (file_path, is_executable)

        # When queueing a file we take into account the cases where
        # we might want to download the same digest to different paths.
        if digest.hash not in self.__file_requests:
            request = _FileRequest(digest=digest, output_paths=[output_path])
            self.__file_requests[digest.hash] = request

            self.__file_request_count += 1
            self.__file_request_size += digest.ByteSize()
            self.__file_response_size += digest.size_bytes
        else:
            # We already have that hash queued; we'll fetch the blob
            # once and write copies of it:
            self.__file_requests[digest.hash].output_paths.append(output_path)

    def _fetch_file_batch(self, requests: Dict[str, _FileRequest]) -> None:
        """Sends queued data using ContentAddressableStorage.BatchReadBlobs().

        Takes a dictionary (digest.hash, _FileRequest) as input.
        """
        batch_digests = [request.digest for request in requests.values()]
        batch_responses = self._fetch_blob_batch(batch_digests)

        for file_blob, file_digest in batch_responses:
            output_paths = requests[file_digest.hash].output_paths

            for file_path, is_executable in output_paths:
                os.makedirs(os.path.dirname(file_path), exist_ok=True)

                with open(file_path, "wb") as byte_file:
                    byte_file.write(file_blob)

                if is_executable:
                    os.chmod(file_path, 0o755)  # rwxr-xr-x / 755

    def _fetch_directory(self, digest: Digest, directory_path: str) -> None:
        """Fetches a directory using ContentAddressableStorage.GetTree()"""
        # Better fail early if the local root path cannot be created:

        assert self.__cas_stub, "Downloader used after closing"

        os.makedirs(directory_path, exist_ok=True)

        directories = {}
        directory_fetched = False
        # First, try GetTree() if not known to be unimplemented yet:
        if not _CallCache.unimplemented(self.channel, "GetTree"):
            tree_request = GetTreeRequest()
            tree_request.root_digest.CopyFrom(digest)
            tree_request.page_size = MAX_REQUEST_COUNT
            if self.instance_name is not None:
                tree_request.instance_name = self.instance_name

            try:
                for tree_response in self.__cas_stub.GetTree(tree_request):
                    for directory in tree_response.directories:
                        directory_blob = directory.SerializeToString()
                        directory_hash = HASH(directory_blob).hexdigest()

                        directories[directory_hash] = directory

                assert digest.hash in directories

                directory = directories[digest.hash]
                self._write_directory(directory, directory_path, directories=directories)

                directory_fetched = True
            except grpc.RpcError as e:
                status_code = e.code()
                if status_code == grpc.StatusCode.UNIMPLEMENTED:
                    _CallCache.mark_unimplemented(self.channel, "GetTree")

                else:
                    raise

        # If no GetTree(), _write_directory() will use BatchReadBlobs()
        # if available or Read() if not.
        if not directory_fetched:
            directory = Directory()
            directory.ParseFromString(self._grpc_retrier.retry(self._fetch_blob, digest))

            self._write_directory(directory, directory_path)

    def _write_directory(
        self, root_directory: Directory, root_path: str, directories: Optional[Dict[str, Directory]] = None
    ) -> None:
        """Generates a local directory structure"""
        os.makedirs(root_path, exist_ok=True)
        self._write_directory_recursively(root_directory, root_path, directories=directories)

    def _write_directory_recursively(
        self, root_directory: Directory, root_path: str, directories: Optional[Dict[str, Directory]] = None
    ) -> None:
        """Generate local directory recursively"""
        # i) Files:
        for file_node in root_directory.files:
            file_path = os.path.join(root_path, file_node.name)

            if os.path.lexists(file_path):
                raise FileExistsError(f"'{file_path}' already exists")

            self.download_file(file_node.digest, file_path, is_executable=file_node.is_executable)
        self.flush()

        # ii) Directories:
        pending_directory_digests = []
        pending_directory_paths = {}
        for directory_node in root_directory.directories:
            directory_hash = directory_node.digest.hash

            # FIXME: Guard against ../
            directory_path = os.path.join(root_path, directory_node.name)
            os.mkdir(directory_path)

            if directories and directory_node.digest.hash in directories:
                # We already have the directory; just write it:
                directory = directories[directory_hash]

                self._write_directory_recursively(directory, directory_path, directories=directories)
            else:
                # Gather all the directories that we need, so we can try
                # fetching them in a single batch request:
                if directory_hash not in pending_directory_paths:
                    pending_directory_digests.append(directory_node.digest)
                    pending_directory_paths[directory_hash] = [directory_path]
                else:
                    pending_directory_paths[directory_hash].append(directory_path)

        if pending_directory_paths:
            fetched_blobs = self._grpc_retrier.retry(self._fetch_blob_batch, pending_directory_digests)

            for directory_blob, directory_digest in fetched_blobs:
                directory = Directory()
                directory.ParseFromString(directory_blob)

                # Assuming that the server might not return the blobs in
                # the same order as they were requested, we read
                # the hashes of the returned blobs:
                # Guarantees for the reply orderings might change in
                # the specification at some point.
                # See: github.com/bazelbuild/remote-apis/issues/52

                for directory_path in pending_directory_paths[directory_digest.hash]:
                    self._write_directory(directory, directory_path, directories=directories)

        # iii) Symlinks:
        for symlink_node in root_directory.symlinks:
            symlink_path = os.path.join(root_path, symlink_node.name)
            os.symlink(symlink_node.target, symlink_path)

    def _max_effective_batch_size_bytes(self) -> int:
        """Returns the effective maximum number of bytes that can be
        transferred using batches, considering gRPC maximum message size.
        """
        return _CasBatchRequestSizesCache.max_effective_batch_size_bytes(self.channel, self.instance_name)

    def _queueable_file_size_threshold(self) -> int:
        """Returns the size limit up until which files can be queued to
        be requested in a batch.
        """
        return _CasBatchRequestSizesCache.batch_request_size_threshold(self.channel, self.instance_name)


@contextmanager
def download(
    channel: grpc.Channel,
    instance: Optional[str] = None,
    u_uid: Optional[str] = None,
    retries: int = 0,
    max_backoff: int = 64,
    should_backoff: bool = True,
) -> Iterator[Downloader]:
    """Context manager generator for the :class:`Downloader` class."""
    downloader = Downloader(
        channel, instance=instance, retries=retries, max_backoff=max_backoff, should_backoff=should_backoff
    )
    try:
        yield downloader
    finally:
        downloader.close()
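

# Illustrative sketch (not executed): typical use of the download() context
# manager above. `channel`, `blob_digest` and `tree_digest` are assumed to
# exist in the caller's scope.
#
#     with download(channel, instance="build", retries=3) as downloader:
#         data = downloader.get_blob(blob_digest)
#         downloader.download_directory(tree_digest, "/tmp/checkout")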


class Uploader:
    """Remote CAS files, directories and messages upload helper.

    The :class:`Uploader` class comes with a generator factory function that can
    be used together with the `with` statement for context management::

        from buildgrid.client.cas import upload

        with upload(channel, instance='build') as uploader:
            uploader.upload_file('/path/to/local/file')
    """

    def __init__(
        self,
        channel: grpc.Channel,
        instance: Optional[str] = None,
        u_uid: Optional[str] = None,
        retries: int = 0,
        max_backoff: int = 64,
        should_backoff: bool = True,
    ):
        """Initializes a new :class:`Uploader` instance.

        Args:
            channel (grpc.Channel): A gRPC channel to the CAS endpoint.
            instance (str, optional): the targeted instance's name.
            u_uid (str, optional): a UUID for CAS transactions.
        """
        self.channel = channel

        self.instance_name = instance or ""
        if u_uid is not None:
            self.u_uid = u_uid
        else:
            self.u_uid = str(uuid.uuid4())

        self._grpc_retrier = GrpcRetrier(retries=retries, max_backoff=max_backoff, should_backoff=should_backoff)

        self.__bytestream_stub: Optional[ByteStreamStub] = ByteStreamStub(self.channel)
        self.__cas_stub: Optional[ContentAddressableStorageStub] = ContentAddressableStorageStub(self.channel)

        self.__requests: Dict[str, Tuple[bytes, Digest]] = {}
        self.__request_count = 0
        self.__request_size = 0

    # --- Public API ---

    def put_blob(
        self, blob: IO[bytes], digest: Optional[Digest] = None, queue: bool = False, length: Optional[int] = None
    ) -> Digest:
        """Stores a blob into the remote CAS server.

        If queuing is allowed (`queue=True`), the upload request **may** be
        deferred. An explicit call to :func:`~flush` can force the request to
        be sent immediately (along with the rest of the queued batch).

        The caller should set at least one of ``digest`` or ``length`` to
        allow the uploader to skip determining the size of the blob using
        multiple seeks.

        Args:
            blob (IO[bytes]): a file-like object containing the blob.
            digest (:obj:`Digest`, optional): the blob's digest.
            queue (bool, optional): whether or not the upload request may be
                queued and submitted as part of a batch upload request. Defaults
                to False.
            length (int, optional): The size of the blob, in bytes. If ``digest``
                is also set, this is ignored in favour of ``digest.size_bytes``.

        Returns:
            :obj:`Digest`: the sent blob's digest.
        """
        if digest is not None:
            length = digest.size_bytes
        elif length is None:
            # If neither the digest nor the length were set, fall back to
            # seeking to the end of the object to find the length
            blob.seek(0, SEEK_END)
            length = blob.tell()
            blob.seek(0)

        if not queue or length > self._queueable_file_size_threshold():
            blob_digest = self._grpc_retrier.retry(self._send_blob, blob, digest)
        else:
            blob_digest = self._queue_blob(blob.read(), digest=digest)

        return blob_digest
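
    # Illustrative sketch (not executed): storing an in-memory payload with
    # put_blob(). `uploader` is assumed to exist in the caller's scope.
    #
    #     payload = b"some bytes worth caching"
    #     digest = uploader.put_blob(BytesIO(payload), length=len(payload), queue=True)
    #     uploader.flush()  # make sure the queued batch actually goes out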

    def put_message(self, message: Message, digest: Optional[Digest] = None, queue: bool = False) -> Digest:
        """Stores a message into the remote CAS server.

        If queuing is allowed (`queue=True`), the upload request **may** be
        deferred. An explicit call to :func:`~flush` can force the request to
        be sent immediately (along with the rest of the queued batch).

        Args:
            message (:obj:`Message`): the message object.
            digest (:obj:`Digest`, optional): the message's digest.
            queue (bool, optional): whether or not the upload request may be
                queued and submitted as part of a batch upload request. Defaults
                to False.

        Returns:
            :obj:`Digest`: the sent message's digest.
        """
        message_blob = message.SerializeToString()

        if not queue or len(message_blob) > self._queueable_file_size_threshold():
            message_digest = self._grpc_retrier.retry(self._send_blob, BytesIO(message_blob), digest)
        else:
            message_digest = self._queue_blob(message_blob, digest=digest)

        return message_digest

    def upload_file(self, file_path: str, queue: bool = True) -> Digest:
        """Stores a local file into the remote CAS storage.

        If queuing is allowed (`queue=True`), the upload request **may** be
        deferred. An explicit call to :func:`~flush` can force the request to
        be sent immediately (along with the rest of the queued batch).

        Args:
            file_path (str): absolute or relative path to a local file.
            queue (bool, optional): whether or not the upload request may be
                queued and submitted as part of a batch upload request. Defaults
                to True.

        Returns:
            :obj:`Digest`: The digest of the file's content.

        Raises:
            FileNotFoundError: If `file_path` does not exist.
            PermissionError: If `file_path` is not readable.
        """
        if not os.path.isabs(file_path):
            file_path = os.path.abspath(file_path)

        with open(file_path, "rb") as file_object:
            if not queue or os.path.getsize(file_path) > self._queueable_file_size_threshold():
                file_digest = self._grpc_retrier.retry(self._send_blob, file_object)
            else:
                file_digest = self._queue_blob(file_object.read())

        return file_digest

    def upload_directory(self, directory_path: str, queue: bool = True) -> Digest:
        """Stores a local folder into the remote CAS storage.

        If queuing is allowed (`queue=True`), the upload request **may** be
        deferred. An explicit call to :func:`~flush` can force the request to
        be sent immediately (along with the rest of the queued batch).

        Args:
            directory_path (str): absolute or relative path to a local folder.
            queue (bool, optional): whether or not the upload requests may be
                queued and submitted as part of a batch upload request. Defaults
                to True.

        Returns:
            :obj:`Digest`: The digest of the top :obj:`Directory`.

        Raises:
            FileNotFoundError: If `directory_path` does not exist.
            PermissionError: If `directory_path` is not readable.
        """
        if not os.path.isabs(directory_path):
            directory_path = os.path.abspath(directory_path)

        if not queue:
            for node, blob, _ in merkle_tree_maker(directory_path):
                if node.DESCRIPTOR is DirectoryNode.DESCRIPTOR:
                    last_directory_node = node

                self._grpc_retrier.retry(self._send_blob, blob, node.digest)

        else:
            for node, blob, _ in merkle_tree_maker(directory_path):
                if node.DESCRIPTOR is DirectoryNode.DESCRIPTOR:
                    last_directory_node = node

                if node.digest.size_bytes > self._queueable_file_size_threshold():
                    self._grpc_retrier.retry(self._send_blob, blob, node.digest)
                else:
                    self._queue_blob(blob.read(), digest=node.digest)

        return last_directory_node.digest
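
    # Illustrative sketch (not executed): pushing a local output tree and
    # keeping the returned root digest for later reference. `uploader` and
    # `output_dir` are assumed to exist in the caller's scope.
    #
    #     root_digest = uploader.upload_directory(output_dir, queue=True)
    #     uploader.flush()
    #     print(root_digest.hash, root_digest.size_bytes)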

    def upload_tree(self, directory_path: str, queue: bool = True) -> Digest:
        """Stores a local folder into the remote CAS storage as a :obj:`Tree`.

        If queuing is allowed (`queue=True`), the upload request **may** be
        deferred. An explicit call to :func:`~flush` can force the request to
        be sent immediately (along with the rest of the queued batch).

        Args:
            directory_path (str): absolute or relative path to a local folder.
            queue (bool, optional): whether or not the upload requests may be
                queued and submitted as part of a batch upload request. Defaults
                to True.

        Returns:
            :obj:`Digest`: The digest of the :obj:`Tree`.

        Raises:
            FileNotFoundError: If `directory_path` does not exist.
            PermissionError: If `directory_path` is not readable.
        """
        if not os.path.isabs(directory_path):
            directory_path = os.path.abspath(directory_path)

        directories = []

        if not queue:
            for node, blob, _ in merkle_tree_maker(directory_path):
                if node.DESCRIPTOR is DirectoryNode.DESCRIPTOR:
                    # TODO: Get the Directory object from merkle_tree_maker():
                    directory = Directory()
                    directory.ParseFromString(blob.read())
                    directories.append(directory)

                self._grpc_retrier.retry(self._send_blob, blob, node.digest)

        else:
            for node, blob, _ in merkle_tree_maker(directory_path):
                if node.DESCRIPTOR is DirectoryNode.DESCRIPTOR:
                    # TODO: Get the Directory object from merkle_tree_maker():
                    directory = Directory()
                    directory.ParseFromString(blob.read())
                    directories.append(directory)

                if node.digest.size_bytes > self._queueable_file_size_threshold():
                    self._grpc_retrier.retry(self._send_blob, blob, node.digest)
                else:
                    self._queue_blob(blob.read(), digest=node.digest)

        tree = Tree()
        tree.root.CopyFrom(directories[-1])
        tree.children.extend(directories[:-1])

        return self.put_message(tree, queue=queue)

    def flush(self) -> None:
        """Ensures any queued request gets sent."""
        if self.__requests:
            self._grpc_retrier.retry(self._send_blob_batch, self.__requests)

            self.__requests.clear()
            self.__request_count = 0
            self.__request_size = 0

    def close(self) -> None:
        """Closes the underlying connection stubs.

        Note:
            This will always send pending requests before closing connections,
            if any.
        """
        self.flush()

        self.__bytestream_stub = None
        self.__cas_stub = None

    # --- Private API ---

    def _send_blob(self, blob: BinaryIO, digest: Optional[Digest] = None) -> Digest:
        """Sends a memory block using ByteStream.Write()"""

        assert self.__bytestream_stub, "Uploader used after closing"

        blob.seek(0)
        blob_digest = Digest()
        if digest is not None:
            blob_digest.CopyFrom(digest)
        else:
            blob_digest = create_digest_from_file(blob)

        if self.instance_name:
            resource_name = "/".join(
                [self.instance_name, "uploads", self.u_uid, "blobs", blob_digest.hash, str(blob_digest.size_bytes)]
            )
        else:
            resource_name = "/".join(["uploads", self.u_uid, "blobs", blob_digest.hash, str(blob_digest.size_bytes)])

        def __write_request_stream(resource: str, content: BinaryIO) -> Iterator[WriteRequest]:
            offset = 0
            finished = False
            remaining = blob_digest.size_bytes - offset
            while not finished:
                chunk_size = min(remaining, MAX_REQUEST_SIZE)
                remaining -= chunk_size

                request = WriteRequest()
                request.resource_name = resource
                request.data = content.read(chunk_size)
                request.write_offset = offset
                request.finish_write = remaining <= 0

                yield request

                offset += chunk_size
                finished = request.finish_write

        write_requests = __write_request_stream(resource_name, blob)

        write_response = self.__bytestream_stub.Write(write_requests, metadata=context_module.metadata_list())

        assert write_response.committed_size == blob_digest.size_bytes

        return blob_digest
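
    # Illustrative sketch (not executed): how __write_request_stream() above
    # chunks a blob. The 2 MiB figure is an assumption standing in for
    # MAX_REQUEST_SIZE, purely for the sake of the arithmetic.
    #
    #     # A 5 MiB blob with a 2 MiB chunk limit yields three WriteRequests:
    #     #   offset=0,      2 MiB of data, finish_write=False
    #     #   offset=2 MiB,  2 MiB of data, finish_write=False
    #     #   offset=4 MiB,  1 MiB of data, finish_write=True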

    def _queue_blob(self, blob: bytes, digest: Optional[Digest] = None) -> Digest:
        """Queues a memory block for later batch upload"""
        blob_digest = Digest()
        if digest is not None:
            blob_digest.CopyFrom(digest)
        else:
            blob_digest.hash = HASH(blob).hexdigest()
            blob_digest.size_bytes = len(blob)

        # If we are here queueing a file we know that its size is
        # smaller than gRPC's message size limit.
        # We'll make a single batch request as big as the server allows.
        batch_size_limit = self._max_effective_batch_size_bytes()

        if self.__request_size + blob_digest.size_bytes > batch_size_limit:
            self.flush()
        elif self.__request_count >= MAX_REQUEST_COUNT:
            self.flush()

        self.__requests[blob_digest.hash] = (blob, blob_digest)
        self.__request_count += 1
        self.__request_size += blob_digest.size_bytes

        return blob_digest

    def _send_blob_batch(self, batch: Dict[str, Tuple[bytes, Digest]]) -> List[Digest]:
        """Sends queued data using ContentAddressableStorage.BatchUpdateBlobs()"""

        assert self.__cas_stub, "Uploader used after closing"

        batch_fetched = False
        written_digests = []

        # First, try BatchUpdateBlobs(), if it is not already known to be unimplemented:
        if not _CallCache.unimplemented(self.channel, "BatchUpdateBlobs"):
            batch_request = BatchUpdateBlobsRequest()
            if self.instance_name is not None:
                batch_request.instance_name = self.instance_name

            for blob, digest in batch.values():
                request = batch_request.requests.add()
                request.digest.CopyFrom(digest)
                request.data = blob

            try:
                batch_response = self.__cas_stub.BatchUpdateBlobs(
                    batch_request, metadata=context_module.metadata_list()
                )

                for response in batch_response.responses:
                    assert response.digest.hash in batch

                    written_digests.append(response.digest)
                    if response.status.code != code_pb2.OK:
                        response.digest.Clear()

                batch_fetched = True

            except grpc.RpcError as e:
                status_code = e.code()
                if status_code == grpc.StatusCode.UNIMPLEMENTED:
                    _CallCache.mark_unimplemented(self.channel, "BatchUpdateBlobs")

                elif status_code == grpc.StatusCode.INVALID_ARGUMENT:
                    written_digests.clear()
                    batch_fetched = False

                else:
                    raise

        # Fallback to Write() if no BatchUpdateBlobs():
        if not batch_fetched:
            for blob, digest in batch.values():
                written_digests.append(self._send_blob(BytesIO(blob)))

        return written_digests

    def _max_effective_batch_size_bytes(self) -> int:
        """Returns the effective maximum number of bytes that can be
        transferred using batches, considering gRPC maximum message size.
        """
        return _CasBatchRequestSizesCache.max_effective_batch_size_bytes(self.channel, self.instance_name)

    def _queueable_file_size_threshold(self) -> int:
        """Returns the size limit up until which files can be queued to
        be requested in a batch.
        """
        return _CasBatchRequestSizesCache.batch_request_size_threshold(self.channel, self.instance_name)


@contextmanager
def upload(
    channel: grpc.Channel,
    instance: Optional[str] = None,
    u_uid: Optional[str] = None,
    retries: int = 0,
    max_backoff: int = 64,
    should_backoff: bool = True,
) -> Iterator[Uploader]:
    """Context manager generator for the :class:`Uploader` class."""
    uploader = Uploader(
        channel,
        instance=instance,
        u_uid=u_uid,
        retries=retries,
        max_backoff=max_backoff,
        should_backoff=should_backoff,
    )
    try:
        yield uploader
    finally:
        uploader.close()
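

# Illustrative end-to-end sketch (not executed): pushing a directory with the
# upload() context manager, then restoring it elsewhere with download(). The
# channel target below is an assumption for the example only.
#
#     channel = grpc.insecure_channel("localhost:50051")
#
#     with upload(channel, instance="build") as uploader:
#         root_digest = uploader.upload_directory("/path/to/outputs")
#
#     with download(channel, instance="build") as downloader:
#         downloader.download_directory(root_digest, "/path/to/restore")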