Coverage for /builds/BuildGrid/buildgrid/buildgrid/client/cas.py: 84.33%

453 statements  

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15# pylint: disable=anomalous-backslash-in-string 

16 

17 

18from collections import namedtuple 

19from contextlib import contextmanager 

20import os 

21import random 

22import time 

23from typing import Dict, List, Optional, Set, Tuple 

24import uuid 

25 

26import grpc 

27from google.protobuf.message import Message 

28 

29from buildgrid._exceptions import NotFoundError 

30from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc 

31from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ( 

32 Digest, 

33 Directory 

34) 

35from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc 

36from buildgrid._protos.google.rpc import code_pb2 

37from buildgrid.client.capabilities import CapabilitiesInterface 

38from buildgrid.settings import HASH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT, BATCH_REQUEST_SIZE_THRESHOLD 

39from buildgrid.utils import merkle_tree_maker 

40 

41 

42_FileRequest = namedtuple('_FileRequest', ['digest', 'output_paths']) 

43 

44 

45class _CallCache: 

46 """Per remote grpc.StatusCode.UNIMPLEMENTED call cache.""" 

47 __calls: Dict[grpc.Channel, Set[str]] = {} 

48 

49 @classmethod 

50 def mark_unimplemented(cls, channel, name): 

51 if channel not in cls.__calls: 

52 cls.__calls[channel] = set() 

53 cls.__calls[channel].add(name) 

54 

55 @classmethod 

56 def unimplemented(cls, channel, name): 

57 if channel not in cls.__calls: 

58 return False 

59 return name in cls.__calls[channel] 
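
For illustration only, a minimal sketch of the pattern this cache supports; the endpoint and the RPC name are placeholders:

    import grpc

    channel = grpc.insecure_channel('localhost:50051')  # placeholder endpoint

    if not _CallCache.unimplemented(channel, 'GetTree'):
        try:
            pass  # issue the optional RPC here
        except grpc.RpcError as e:
            if e.code() == grpc.StatusCode.UNIMPLEMENTED:
                # Remember the failure so the RPC is skipped for this channel from now on:
                _CallCache.mark_unimplemented(channel, 'GetTree')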

60 

61 

62class _CasBatchRequestSizesCache: 

63 """Cache that stores, for each remote, the limit of bytes that can 

64 be transferred using batches as well as a threshold that determines 

65 when a file can be fetched as part of a batch request. 

66 """ 

67 __cas_max_batch_transfer_size: Dict[grpc.Channel, Dict[str, int]] = {} 

68 __cas_batch_request_size_threshold: Dict[grpc.Channel, Dict[str, int]] = {} 

69 

70 @classmethod 

71 def max_effective_batch_size_bytes(cls, channel: grpc.Channel, instance_name: str) -> int: 

72 """Returns the maximum number of bytes that can be transferred 

73 using batch methods for the given remote. 

74 """ 

75 if channel not in cls.__cas_max_batch_transfer_size: 

76 cls.__cas_max_batch_transfer_size[channel] = {} 

77 

78 if instance_name not in cls.__cas_max_batch_transfer_size[channel]: 

79 max_batch_size = cls._get_server_max_batch_total_size_bytes(channel, 

80 instance_name) 

81 

82 cls.__cas_max_batch_transfer_size[channel][instance_name] = max_batch_size 

83 

84 return cls.__cas_max_batch_transfer_size[channel][instance_name] 

85 

86 @classmethod 

87 def batch_request_size_threshold(cls, channel: grpc.Channel, instance_name: str) -> int: 

88 if channel not in cls.__cas_batch_request_size_threshold: 

89 cls.__cas_batch_request_size_threshold[channel] = {} 

90 

91 if instance_name not in cls.__cas_batch_request_size_threshold[channel]: 

92 # Computing the threshold: 

93 max_batch_size = cls.max_effective_batch_size_bytes(channel, 

94 instance_name) 

95 threshold = int(BATCH_REQUEST_SIZE_THRESHOLD * max_batch_size) 

96 

97 cls.__cas_batch_request_size_threshold[channel][instance_name] = threshold 

98 

99 return cls.__cas_batch_request_size_threshold[channel][instance_name] 

100 

101 @classmethod 

102 def _get_server_max_batch_total_size_bytes(cls, channel: grpc.Channel, instance_name: str) -> int: 

103 """Returns the maximum number of bytes that can be effectively 

104 transferred using batches, considering the limits imposed by 

105 the server's configuration and by gRPC. 

106 """ 

107 try: 

108 capabilities_interface = CapabilitiesInterface(channel) 

109 server_capabilities = capabilities_interface.get_capabilities(instance_name) 

110 

111 cache_capabilities = server_capabilities.cache_capabilities 

112 

113 max_batch_total_size = cache_capabilities.max_batch_total_size_bytes 

114 # The server could set this value to 0 (no limit set). 

115 if max_batch_total_size: 

116 return min(max_batch_total_size, MAX_REQUEST_SIZE) 

117 except ConnectionError: 

118 pass 

119 

120 return MAX_REQUEST_SIZE 
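
As a hedged usage sketch (the endpoint and instance name below are placeholders), the two cached values are typically queried together to decide between a batched request and a streamed transfer:

    import grpc

    channel = grpc.insecure_channel('localhost:50051')
    instance_name = 'build'

    max_batch = _CasBatchRequestSizesCache.max_effective_batch_size_bytes(channel, instance_name)
    threshold = _CasBatchRequestSizesCache.batch_request_size_threshold(channel, instance_name)

    blob_size = 512 * 1024
    use_batch = blob_size <= threshold  # small blobs go into a batch, larger ones are streamed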

121 

122 

123class GrpcRetrier: 

124 def __init__(self, retries: int, max_backoff: int = 64, should_backoff: bool = True): 

125 """Initializes a new :class:`GrpcRetrier`. 

126 

127 Args: 

128 retries (int): The maximum number of attempts for each RPC call. 

129 max_backoff (int): The maximum time to wait between retries. 

130 should_backoff (bool): Whether to back off at all. Always set this to True except in tests. 

131 """ 

132 

133 self._retries = retries 

134 self._max_backoff = max_backoff 

135 self._should_backoff = should_backoff 

136 

137 def retry(self, func, *args): 

138 attempts = 0 

139 while attempts <= self._retries: 

140 try: 

141 return func(*args) 

142 except grpc.RpcError as e: 

143 status_code = e.code() 

144 

145 # Retry only on UNAVAILABLE and ABORTED 

146 if status_code in (grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.ABORTED): 

147 attempts += 1 

148 if self._should_backoff: 

149 # Sleep for 2^(N-1) + jitter seconds (capped at max_backoff), where N is the number of attempts 

150 jitter = random.uniform(0, 1) 

151 time.sleep(min(pow(2, attempts - 1), self._max_backoff) + jitter) 

152 

153 elif status_code == grpc.StatusCode.NOT_FOUND: 

154 raise FileNotFoundError("Requested data does not exist on remote") 

155 

156 else: 

157 raise ConnectionError(e.details()) 
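
A brief, non-authoritative sketch of how the retrier wraps an RPC-backed callable; `stub` and `request` are placeholders:

    retrier = GrpcRetrier(retries=3)

    def unary_call(request):
        # Placeholder: any callable that may raise grpc.RpcError
        return stub.SomeUnaryCall(request)

    # UNAVAILABLE and ABORTED are retried with exponential backoff plus jitter;
    # NOT_FOUND surfaces as FileNotFoundError, other errors as ConnectionError.
    result = retrier.retry(unary_call, request)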

158 

159 

160@contextmanager 

161def download(channel, 

162 instance: str = None, 

163 u_uid: str = None, 

164 retries: int = 0, 

165 max_backoff: int = 64, 

166 should_backoff: bool = True): 

167 """Context manager generator for the :class:`Downloader` class.""" 

168 downloader = Downloader(channel, 

169 instance=instance, 

170 retries=retries, 

171 max_backoff=max_backoff, 

172 should_backoff=should_backoff) 

173 try: 

174 yield downloader 

175 finally: 

176 downloader.close() 
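
A short, hedged example of the context manager in use; the endpoint, instance name and digest values are placeholders:

    import grpc

    channel = grpc.insecure_channel('controller.example.com:50051')
    blob_digest = Digest(hash='0' * 64, size_bytes=42)

    with download(channel, instance='build', retries=3) as downloader:
        data = downloader.get_blob(blob_digest)                    # blob bytes, or None if missing
        downloader.download_file(blob_digest, '/tmp/output.bin')   # written to disk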

177 

178 

179class Downloader: 

180 """Remote CAS files, directories and messages download helper. 

181 

182 The :class:`Downloader` class comes with a generator factory function that 

183 can be used together with the `with` statement for context management:: 

184 

185 from buildgrid.client.cas import download 

186 

187 with download(channel, instance='build') as downloader: 

188 downloader.get_message(message_digest) 

189 """ 

190 

191 def __init__(self, 

192 channel: grpc.Channel, 

193 instance: str = None, 

194 retries: int = 0, 

195 max_backoff: int = 64, 

196 should_backoff: bool = True): 

197 """Initializes a new :class:`Downloader` instance. 

198 

199 Args: 

200 channel (grpc.Channel): A gRPC channel to the CAS endpoint. 

201 instance (str, optional): the targeted instance's name. 

202 """ 

203 self.channel = channel 

204 

205 self.instance_name = instance 

206 

207 self._grpc_retrier = GrpcRetrier(retries=retries, max_backoff=max_backoff, should_backoff=should_backoff) 

208 

209 self.__bytestream_stub = bytestream_pb2_grpc.ByteStreamStub(self.channel) 

210 self.__cas_stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel) 

211 

212 self.__file_requests: Dict[str, _FileRequest] = {} 

213 self.__file_request_count = 0 

214 self.__file_request_size = 0 

215 self.__file_response_size = 0 

216 

217 # --- Public API --- 

218 

219 def get_blob(self, digest: Digest) -> Optional[str]: 

220 """Retrieves a blob from the remote CAS server. 

221 

222 Args: 

223 digest (:obj:`Digest`): the blob's digest to fetch. 

224 

225 Returns: 

226 bytearray: the fetched blob data or None if not found. 

227 """ 

228 try: 

229 blob = self._grpc_retrier.retry(self._fetch_blob, digest) 

230 except FileNotFoundError: 

231 return None 

232 

233 return blob 

234 

235 def get_blobs(self, digests: List[Digest]) -> List[str]: 

236 """Retrieves a list of blobs from the remote CAS server. 

237 

238 Args: 

239 digests (list): list of :obj:`Digest`\ s for the blobs to fetch. 

240 

241 Returns: 

242 list: the fetched blob data list. 

243 """ 

244 # _fetch_blob_batch returns (data, digest) pairs. 

245 # We only want the data. 

246 return [result[0] for result in self._grpc_retrier.retry(self._fetch_blob_batch, digests)] 

247 

248 def get_message(self, digest: Digest, message: Message) -> Message: 

249 """Retrieves a :obj:`Message` from the remote CAS server. 

250 

251 Args: 

252 digest (:obj:`Digest`): the message's digest to fetch. 

253 message (:obj:`Message`): an empty message to fill. 

254 

255 Returns: 

256 :obj:`Message`: the given `message`, filled with the fetched data, or cleared if not found. 

257 """ 

258 try: 

259 message_blob = self._grpc_retrier.retry(self._fetch_blob, digest) 

260 except NotFoundError: 

261 message_blob = None 

262 

263 if message_blob is not None: 

264 message.ParseFromString(message_blob) 

265 else: 

266 message.Clear() 

267 

268 return message 
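
A hedged sketch of fetching and deserialising a stored message in place; `channel` and `result_digest` are placeholders:

    action_result = remote_execution_pb2.ActionResult()  # an empty message of the expected type

    with download(channel, instance='build') as downloader:
        downloader.get_message(result_digest, action_result)  # fills, or clears, action_result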

269 

270 def get_messages(self, digests: List[Digest], messages: List[Message]) -> List[Message]: 

271 """Retrieves a list of :obj:`Message`\ s from the remote CAS server. 

272 

273 Note: 

274 The `digests` and `messages` list **must** contain the same number 

275 of elements. 

276 

277 Args: 

278 digests (list): list of :obj:`Digest`\ s for the messages to fetch. 

279 messages (list): list of empty :obj:`Message`\ s to fill. 

280 

281 Returns: 

282 list: the fetched and filled message list. 

283 """ 

284 assert len(digests) == len(messages) 

285 

286 # The individual empty messages might be of differing types, so we need 

287 # to set up a mapping 

288 digest_message_map = {digest.hash: message for (digest, message) in zip(digests, messages)} 

289 

290 batch_response = self._grpc_retrier.retry(self._fetch_blob_batch, digests) 

291 

292 messages = [] 

293 for message_blob, message_digest in batch_response: 

294 message = digest_message_map[message_digest.hash] 

295 message.ParseFromString(message_blob) 

296 messages.append(message) 

297 

298 return messages 

299 

300 def download_file(self, digest: Digest, file_path: str, is_executable: bool = False, queue: bool = True): 

301 """Retrieves a file from the remote CAS server. 

302 

303 If queuing is allowed (`queue=True`), the download request **may** be 

304 deferred. An explicit call to :func:`~flush` can force the request to be 

305 sent immediately (along with the rest of the queued batch). 

306 

307 Args: 

308 digest (:obj:`Digest`): the file's digest to fetch. 

309 file_path (str): absolute or relative path to the local file to write. 

310 is_executable (bool): whether the file is executable or not. 

311 queue (bool, optional): whether or not the download request may be 

312 queued and submitted as part of a batch upload request. Defaults 

313 to True. 

314 

315 Raises: 

316 FileNotFoundError: if `digest` is not present in the remote CAS server. 

317 OSError: if `file_path` cannot be created or written to. 

318 """ 

319 if not os.path.isabs(file_path): 

320 file_path = os.path.abspath(file_path) 

321 

322 if not queue or digest.size_bytes > self._queueable_file_size_threshold(): 

323 self._grpc_retrier.retry(self._fetch_file, digest, file_path, is_executable) 

324 else: 

325 self._queue_file(digest, file_path, is_executable=is_executable) 
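
To illustrate the queueing behaviour described above (the channel and digests are placeholders), small files may be queued and fetched in one batch, while queue=False forces an immediate ByteStream read:

    with download(channel, instance='build') as downloader:
        downloader.download_file(small_digest, '/tmp/a.txt')                 # queued
        downloader.download_file(small_digest, '/tmp/a-copy.txt')            # same blob, second path
        downloader.download_file(large_digest, '/tmp/big.bin', queue=False)  # fetched immediately
        downloader.flush()  # sends the queued batch; close() on exit would also do this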

326 

327 def download_directory(self, digest: Digest, directory_path: str): 

328 """Retrieves a :obj:`Directory` from the remote CAS server. 

329 

330 Args: 

331 digest (:obj:`Digest`): the directory's digest to fetch. 

332 directory_path (str): the local path to download the directory to. 

333 

334 Raises: 

335 NotFoundError: if `digest` is not present in the remote CAS server. 

336 FileExistsError: if `directory_path` already contains parts of the 

337 fetched directory's content. 

338 """ 

339 if not os.path.isabs(directory_path): 

340 directory_path = os.path.abspath(directory_path) 

341 

342 self._grpc_retrier.retry(self._fetch_directory, digest, directory_path) 

343 

344 def flush(self): 

345 """Ensures any queued request gets sent.""" 

346 if self.__file_requests: 

347 self._grpc_retrier.retry(self._fetch_file_batch, self.__file_requests) 

348 

349 self.__file_requests.clear() 

350 self.__file_request_count = 0 

351 self.__file_request_size = 0 

352 self.__file_response_size = 0 

353 

354 def close(self): 

355 """Closes the underlying connection stubs. 

356 

357 Note: 

358 This will always send pending requests before closing connections, 

359 if any. 

360 """ 

361 self.flush() 

362 

363 self.__bytestream_stub = None 

364 self.__cas_stub = None 

365 

366 # --- Private API --- 

367 

368 def _fetch_blob(self, digest: Digest) -> bytearray: 

369 """Fetches a blob using ByteStream.Read()""" 

370 

371 if self.instance_name: 

372 resource_name = '/'.join([self.instance_name, 'blobs', 

373 digest.hash, str(digest.size_bytes)]) 

374 else: 

375 resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)]) 

376 

377 read_blob = bytearray() 

378 read_request = bytestream_pb2.ReadRequest() 

379 read_request.resource_name = resource_name 

380 read_request.read_offset = 0 

381 

382 for read_response in self.__bytestream_stub.Read(read_request): 

383 read_blob += read_response.data 

384 

385 assert len(read_blob) == digest.size_bytes 

386 return read_blob 

387 

388 def _fetch_blob_batch(self, digests: List[Digest]) -> List[Tuple[str, Digest]]: 

389 """Fetches blobs using ContentAddressableStorage.BatchReadBlobs() 

390 Returns (data, digest) pairs""" 

391 batch_fetched = False 

392 read_blobs = [] 

393 

394 # First, try BatchReadBlobs(), unless it is already known to be unimplemented: 

395 if not _CallCache.unimplemented(self.channel, 'BatchReadBlobs'): 

396 batch_request = remote_execution_pb2.BatchReadBlobsRequest() 

397 batch_request.digests.extend(digests) 

398 if self.instance_name is not None: 

399 batch_request.instance_name = self.instance_name 

400 

401 try: 

402 batch_response = self.__cas_stub.BatchReadBlobs(batch_request) 

403 for response in batch_response.responses: 

404 assert response.digest in digests 

405 

406 read_blobs.append((response.data, response.digest)) 

407 

408 if response.status.code == code_pb2.NOT_FOUND: 

409 raise FileNotFoundError('Requested blob does not exist ' 

410 'on the remote.') 

411 if response.status.code != code_pb2.OK: 

412 raise ConnectionError('Error in CAS reply while fetching blob.') 

413 

414 batch_fetched = True 

415 

416 except grpc.RpcError as e: 

417 status_code = e.code() 

418 if status_code == grpc.StatusCode.UNIMPLEMENTED: 

419 _CallCache.mark_unimplemented(self.channel, 'BatchReadBlobs') 

420 elif status_code == grpc.StatusCode.INVALID_ARGUMENT: 

421 read_blobs.clear() 

422 else: 

423 raise 

424 

425 # Fallback to Read() if no BatchReadBlobs(): 

426 if not batch_fetched: 

427 for digest in digests: 

428 blob = self._grpc_retrier.retry(self._fetch_blob, digest) 

429 read_blobs.append((blob, digest)) 

430 

431 return read_blobs 

432 

433 def _fetch_file(self, digest: Digest, file_path: str, is_executable: bool = False): 

434 """Fetches a file using ByteStream.Read()""" 

435 if self.instance_name: 

436 resource_name = '/'.join([self.instance_name, 'blobs', 

437 digest.hash, str(digest.size_bytes)]) 

438 else: 

439 resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)]) 

440 

441 os.makedirs(os.path.dirname(file_path), exist_ok=True) 

442 

443 read_request = bytestream_pb2.ReadRequest() 

444 read_request.resource_name = resource_name 

445 read_request.read_offset = 0 

446 

447 with open(file_path, 'wb') as byte_file: 

448 for read_response in self.__bytestream_stub.Read(read_request): 

449 byte_file.write(read_response.data) 

450 

451 assert byte_file.tell() == digest.size_bytes 

452 

453 if is_executable: 

454 os.chmod(file_path, 0o755) # rwxr-xr-x / 755 

455 

456 def _queue_file(self, digest: Digest, file_path: str, is_executable: bool = False): 

457 """Queues a file for later batch download""" 

458 batch_size_limit = self._max_effective_batch_size_bytes() 

459 

460 if self.__file_request_size + digest.ByteSize() > batch_size_limit: 

461 self.flush() 

462 elif self.__file_response_size + digest.size_bytes > batch_size_limit: 

463 self.flush() 

464 elif self.__file_request_count >= MAX_REQUEST_COUNT: 

465 self.flush() 

466 

467 output_path = (file_path, is_executable) 

468 

469 # When queueing a file we take into account the cases where 

470 # we might want to download the same digest to different paths. 

471 if digest.hash not in self.__file_requests: 

472 request = _FileRequest(digest=digest, output_paths=[output_path]) 

473 self.__file_requests[digest.hash] = request 

474 

475 self.__file_request_count += 1 

476 self.__file_request_size += digest.ByteSize() 

477 self.__file_response_size += digest.size_bytes 

478 else: 

479 # We already have that hash queued; we'll fetch the blob 

480 # once and write copies of it: 

481 self.__file_requests[digest.hash].output_paths.append(output_path) 

482 

483 def _fetch_file_batch(self, requests): 

484 """Sends queued data using ContentAddressableStorage.BatchReadBlobs(). 

485 

486 Takes a dictionary mapping digest.hash to _FileRequest as input. 

487 """ 

488 batch_digests = [request.digest for request in requests.values()] 

489 batch_responses = self._fetch_blob_batch(batch_digests) 

490 

491 for file_blob, file_digest in batch_responses: 

492 output_paths = requests[file_digest.hash].output_paths 

493 

494 for file_path, is_executable in output_paths: 

495 os.makedirs(os.path.dirname(file_path), exist_ok=True) 

496 

497 with open(file_path, 'wb') as byte_file: 

498 byte_file.write(file_blob) 

499 

500 if is_executable: 

501 os.chmod(file_path, 0o755) # rwxr-xr-x / 755 

502 

503 def _fetch_directory(self, digest: Digest, directory_path: str): 

504 """Fetches a file using ByteStream.GetTree()""" 

505 # Better fail early if the local root path cannot be created: 

506 os.makedirs(directory_path, exist_ok=True) 

507 

508 directories = {} 

509 directory_fetched = False 

510 # First, try GetTree() if not known to be unimplemented yet: 

511 if not _CallCache.unimplemented(self.channel, 'GetTree'): 

512 tree_request = remote_execution_pb2.GetTreeRequest() 

513 tree_request.root_digest.CopyFrom(digest) 

514 tree_request.page_size = MAX_REQUEST_COUNT 

515 if self.instance_name is not None: 

516 tree_request.instance_name = self.instance_name 

517 

518 try: 

519 for tree_response in self.__cas_stub.GetTree(tree_request): 

520 for directory in tree_response.directories: 

521 directory_blob = directory.SerializeToString() 

522 directory_hash = HASH(directory_blob).hexdigest() 

523 

524 directories[directory_hash] = directory 

525 

526 assert digest.hash in directories 

527 

528 directory = directories[digest.hash] 

529 self._write_directory(directory, directory_path, directories=directories) 

530 

531 directory_fetched = True 

532 except grpc.RpcError as e: 

533 status_code = e.code() 

534 if status_code == grpc.StatusCode.UNIMPLEMENTED: 

535 _CallCache.mark_unimplemented(self.channel, 'GetTree') 

536 

537 else: 

538 raise 

539 

540 # If no GetTree(), _write_directory() will use BatchReadBlobs() 

541 # if available or Read() if not. 

542 if not directory_fetched: 

543 directory = remote_execution_pb2.Directory() 

544 directory.ParseFromString(self._grpc_retrier.retry(self._fetch_blob, digest)) 

545 

546 self._write_directory(directory, directory_path) 

547 

548 def _write_directory(self, root_directory: Directory, root_path: str, directories: Dict[str, Directory] = None): 

549 """Generates a local directory structure""" 

550 os.makedirs(root_path, exist_ok=True) 

551 self._write_directory_recursively(root_directory, root_path, directories=directories) 

552 

553 def _write_directory_recursively(self, 

554 root_directory: Directory, 

555 root_path: str, 

556 directories: Dict[str, Directory] = None): 

557 """Generate local directory recursively""" 

558 # i) Files: 

559 for file_node in root_directory.files: 

560 file_path = os.path.join(root_path, file_node.name) 

561 

562 if os.path.lexists(file_path): 

563 raise FileExistsError(f"'{file_path}' already exists") 

564 

565 self.download_file(file_node.digest, file_path, 

566 is_executable=file_node.is_executable) 

567 self.flush() 

568 

569 # ii) Directories: 

570 pending_directory_digests = [] 

571 pending_directory_paths = {} 

572 for directory_node in root_directory.directories: 

573 directory_hash = directory_node.digest.hash 

574 

575 # FIXME: Guard against ../ 

576 directory_path = os.path.join(root_path, directory_node.name) 

577 os.mkdir(directory_path) 

578 

579 if directories and directory_node.digest.hash in directories: 

580 # We already have the directory; just write it: 

581 directory = directories[directory_hash] 

582 

583 self._write_directory_recursively(directory, directory_path, 

584 directories=directories) 

585 else: 

586 # Gather all the directories that we need to get to 

587 # try fetching them in a single batch request: 

588 pending_directory_digests.append(directory_node.digest) 

589 pending_directory_paths[directory_hash] = directory_path 

590 

591 if pending_directory_paths: 

592 fetched_blobs = self._grpc_retrier.retry(self._fetch_blob_batch, pending_directory_digests) 

593 

594 for (directory_blob, directory_digest) in fetched_blobs: 

595 directory = remote_execution_pb2.Directory() 

596 directory.ParseFromString(directory_blob) 

597 

598 # Since the server might not return the blobs in the same 

599 # order as they were requested, we look up each returned 

600 # blob by the hash of its digest: 

601 # Guarantees for the reply orderings might change in 

602 # the specification at some point. 

603 # See: github.com/bazelbuild/remote-apis/issues/52 

604 

605 directory_path = pending_directory_paths[directory_digest.hash] 

606 

607 self._write_directory(directory, directory_path, 

608 directories=directories) 

609 

610 # iii) Symlinks: 

611 for symlink_node in root_directory.symlinks: 

612 symlink_path = os.path.join(root_path, symlink_node.name) 

613 os.symlink(symlink_node.target, symlink_path) 

614 

615 def _max_effective_batch_size_bytes(self): 

616 """Returns the effective maximum number of bytes that can be 

617 transferred using batches, considering gRPC maximum message size. 

618 """ 

619 return _CasBatchRequestSizesCache.max_effective_batch_size_bytes(self.channel, 

620 self.instance_name) 

621 

622 def _queueable_file_size_threshold(self): 

623 """Returns the size limit up until which files can be queued to 

624 be requested in a batch. 

625 """ 

626 return _CasBatchRequestSizesCache.batch_request_size_threshold(self.channel, 

627 self.instance_name) 

628 

629 

630@contextmanager 

631def upload(channel: grpc.Channel, 

632 instance: str = None, 

633 u_uid: str = None, 

634 retries: int = 0, 

635 max_backoff: int = 64, 

636 should_backoff: bool = True): 

637 """Context manager generator for the :class:`Uploader` class.""" 

638 uploader = Uploader(channel, 

639 instance=instance, 

640 u_uid=u_uid, 

641 retries=retries, 

642 max_backoff=max_backoff, 

643 should_backoff=should_backoff) 

644 try: 

645 yield uploader 

646 finally: 

647 uploader.close() 
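
A short, hedged usage example mirroring the Downloader one above; the endpoint and paths are placeholders:

    import grpc

    channel = grpc.insecure_channel('controller.example.com:50051')

    with upload(channel, instance='build') as uploader:
        file_digest = uploader.upload_file('/tmp/artifact.bin')   # a single file
        root_digest = uploader.upload_directory('/tmp/outputs')   # a whole directory tree
    # Leaving the context flushes any queued batch before closing the stubs.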

648 

649 

650class Uploader: 

651 """Remote CAS files, directories and messages upload helper. 

652 

653 The :class:`Uploader` class comes with a generator factory function that can 

654 be used together with the `with` statement for context management:: 

655 

656 from buildgrid.client.cas import upload 

657 

658 with upload(channel, instance='build') as uploader: 

659 uploader.upload_file('/path/to/local/file') 

660 """ 

661 

662 def __init__(self, 

663 channel: grpc.Channel, 

664 instance: str = None, 

665 u_uid: str = None, 

666 retries: int = 0, 

667 max_backoff: int = 64, 

668 should_backoff: bool = True): 

669 """Initializes a new :class:`Uploader` instance. 

670 

671 Args: 

672 channel (grpc.Channel): A gRPC channel to the CAS endpoint. 

673 instance (str, optional): the targeted instance's name. 

674 u_uid (str, optional): a UUID for CAS transactions. 

675 """ 

676 self.channel = channel 

677 

678 self.instance_name = instance 

679 if u_uid is not None: 

680 self.u_uid = u_uid 

681 else: 

682 self.u_uid = str(uuid.uuid4()) 

683 

684 self._grpc_retrier = GrpcRetrier(retries=retries, max_backoff=max_backoff, should_backoff=should_backoff) 

685 

686 self.__bytestream_stub = bytestream_pb2_grpc.ByteStreamStub(self.channel) 

687 self.__cas_stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel) 

688 

689 self.__requests: Dict[str, Tuple[bytes, Digest]] = {} 

690 self.__request_count = 0 

691 self.__request_size = 0 

692 

693 # --- Public API --- 

694 

695 def put_blob(self, blob: bytes, digest: Digest = None, queue: bool = False) -> Digest: 

696 """Stores a blob into the remote CAS server. 

697 

698 If queuing is allowed (`queue=True`), the upload request **may** be 

699 deferred. An explicit call to :func:`~flush` can force the request to be 

700 sent immediately (along with the rest of the queued batch). 

701 

702 Args: 

703 blob (bytes): the blob's data. 

704 digest (:obj:`Digest`, optional): the blob's digest. 

705 queue (bool, optional): whether or not the upload request may be 

706 queued and submitted as part of a batch upload request. Defaults 

707 to False. 

708 

709 Returns: 

710 :obj:`Digest`: the sent blob's digest. 

711 """ 

712 

713 if not queue or len(blob) > self._queueable_file_size_threshold(): 

714 blob_digest = self._grpc_retrier.retry(self._send_blob, blob, digest) 

715 else: 

716 blob_digest = self._queue_blob(blob, digest=digest) 

717 

718 return blob_digest 
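
To illustrate the queueing contract (the channel and payloads are placeholders), small blobs can be queued and flushed as a single batch, while oversized or non-queued blobs are written immediately:

    with upload(channel, instance='build') as uploader:
        digest_a = uploader.put_blob(b'small payload', queue=True)    # queued for a batch
        digest_b = uploader.put_blob(b'another payload', queue=True)  # queued for a batch
        uploader.flush()                                              # one BatchUpdateBlobs() call
        digest_c = uploader.put_blob(large_payload)                   # sent immediately via ByteStream.Write()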

719 

720 def put_message(self, message: Message, digest: Digest = None, queue: bool = False) -> Digest: 

721 """Stores a message into the remote CAS server. 

722 

723 If queuing is allowed (`queue=True`), the upload request **may** be 

724 deferred. An explicit call to :func:`~flush` can force the request to be 

725 sent immediately (along with the rest of the queued batch). 

726 

727 Args: 

728 message (:obj:`Message`): the message object. 

729 digest (:obj:`Digest`, optional): the message's digest. 

730 queue (bool, optional): whether or not the upload request may be 

731 queued and submitted as part of a batch upload request. Defaults 

732 to False. 

733 

734 Returns: 

735 :obj:`Digest`: the sent message's digest. 

736 """ 

737 message_blob = message.SerializeToString() 

738 

739 if not queue or len(message_blob) > self._queueable_file_size_threshold(): 

740 message_digest = self._grpc_retrier.retry(self._send_blob, message_blob, digest) 

741 else: 

742 message_digest = self._queue_blob(message_blob, digest=digest) 

743 

744 return message_digest 

745 

746 def upload_file(self, file_path: str, queue: bool = True) -> Digest: 

747 """Stores a local file into the remote CAS storage. 

748 

749 If queuing is allowed (`queue=True`), the upload request **may** be 

750 deferred. An explicit call to :func:`~flush` can force the request to be 

751 sent immediately (along with the rest of the queued batch). 

752 

753 Args: 

754 file_path (str): absolute or relative path to a local file. 

755 queue (bool, optional): whether or not the upload request may be 

756 queued and submitted as part of a batch upload request. Defaults 

757 to True. 

758 

759 Returns: 

760 :obj:`Digest`: The digest of the file's content. 

761 

762 Raises: 

763 FileNotFoundError: If `file_path` does not exist. 

764 PermissionError: If `file_path` is not readable. 

765 """ 

766 if not os.path.isabs(file_path): 

767 file_path = os.path.abspath(file_path) 

768 

769 with open(file_path, 'rb') as bytes_stream: 

770 file_bytes = bytes_stream.read() 

771 

772 if not queue or len(file_bytes) > self._queueable_file_size_threshold(): 

773 file_digest = self._grpc_retrier.retry(self._send_blob, file_bytes) 

774 else: 

775 file_digest = self._queue_blob(file_bytes) 

776 

777 return file_digest 

778 

779 def upload_directory(self, directory_path: str, queue: bool = True) -> Digest: 

780 """Stores a local folder into the remote CAS storage. 

781 

782 If queuing is allowed (`queue=True`), the upload request **may** be 

783 deferred. An explicit call to :func:`~flush` can force the request to be 

784 sent immediately (along with the rest of the queued batch). 

785 

786 Args: 

787 directory_path (str): absolute or relative path to a local folder. 

788 queue (bool, optional): whether or not the upload requests may be 

789 queued and submitted as part of a batch upload request. Defaults 

790 to True. 

791 

792 Returns: 

793 :obj:`Digest`: The digest of the top :obj:`Directory`. 

794 

795 Raises: 

796 FileNotFoundError: If `directory_path` does not exist. 

797 PermissionError: If `directory_path` is not readable. 

798 """ 

799 if not os.path.isabs(directory_path): 

800 directory_path = os.path.abspath(directory_path) 

801 

802 if not queue: 

803 for node, blob, _ in merkle_tree_maker(directory_path): 

804 if node.DESCRIPTOR is remote_execution_pb2.DirectoryNode.DESCRIPTOR: 

805 last_directory_node = node 

806 

807 self._grpc_retrier.retry(self._send_blob, blob, node.digest) 

808 

809 else: 

810 for node, blob, _ in merkle_tree_maker(directory_path): 

811 if node.DESCRIPTOR is remote_execution_pb2.DirectoryNode.DESCRIPTOR: 

812 last_directory_node = node 

813 

814 self._queue_blob(blob, digest=node.digest) 

815 

816 return last_directory_node.digest 
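
A hedged sketch contrasting upload_directory() with upload_tree() (defined just below); the path and channel are placeholders. The former returns the digest of the root Directory, the latter the digest of a Tree message wrapping all of the directories:

    with upload(channel, instance='build') as uploader:
        root_directory_digest = uploader.upload_directory('/tmp/outputs')
        tree_digest = uploader.upload_tree('/tmp/outputs')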

817 

818 def upload_tree(self, directory_path: str, queue: bool = True) -> Digest: 

819 """Stores a local folder into the remote CAS storage as a :obj:`Tree`. 

820 

821 If queuing is allowed (`queue=True`), the upload request **may** be 

822 deferred. An explicit call to :func:`~flush` can force the request to be 

823 sent immediately (along with the rest of the queued batch). 

824 

825 Args: 

826 directory_path (str): absolute or relative path to a local folder. 

827 queue (bool, optional): whether or not the upload requests may be 

828 queued and submitted as part of a batch upload request. Defaults 

829 to True. 

830 

831 Returns: 

832 :obj:`Digest`: The digest of the :obj:`Tree`. 

833 

834 Raises: 

835 FileNotFoundError: If `directory_path` does not exist. 

836 PermissionError: If `directory_path` is not readable. 

837 """ 

838 if not os.path.isabs(directory_path): 

839 directory_path = os.path.abspath(directory_path) 

840 

841 directories = [] 

842 

843 if not queue: 

844 for node, blob, _ in merkle_tree_maker(directory_path): 

845 if node.DESCRIPTOR is remote_execution_pb2.DirectoryNode.DESCRIPTOR: 

846 # TODO: Get the Directory object from merkle_tree_maker(): 

847 directory = remote_execution_pb2.Directory() 

848 directory.ParseFromString(blob) 

849 directories.append(directory) 

850 

851 self._grpc_retrier.retry(self._send_blob, blob, node.digest) 

852 

853 else: 

854 for node, blob, _ in merkle_tree_maker(directory_path): 

855 if node.DESCRIPTOR is remote_execution_pb2.DirectoryNode.DESCRIPTOR: 

856 # TODO: Get the Directory object from merkle_tree_maker(): 

857 directory = remote_execution_pb2.Directory() 

858 directory.ParseFromString(blob) 

859 directories.append(directory) 

860 

861 self._queue_blob(blob, digest=node.digest) 

862 

863 tree = remote_execution_pb2.Tree() 

864 tree.root.CopyFrom(directories[-1]) 

865 tree.children.extend(directories[:-1]) 

866 

867 return self.put_message(tree, queue=queue) 

868 

869 def flush(self): 

870 """Ensures any queued request gets sent.""" 

871 if self.__requests: 

872 self._grpc_retrier.retry(self._send_blob_batch, self.__requests) 

873 

874 self.__requests.clear() 

875 self.__request_count = 0 

876 self.__request_size = 0 

877 

878 def close(self): 

879 """Closes the underlying connection stubs. 

880 

881 Note: 

882 This will always send pending requests before closing connections, 

883 if any. 

884 """ 

885 self.flush() 

886 

887 self.__bytestream_stub = None 

888 self.__cas_stub = None 

889 

890 # --- Private API --- 

891 

892 def _send_blob(self, blob: bytes, digest: Digest = None) -> Digest: 

893 """Sends a memory block using ByteStream.Write()""" 

894 blob_digest = Digest() 

895 if digest is not None: 

896 blob_digest.CopyFrom(digest) 

897 else: 

898 blob_digest.hash = HASH(blob).hexdigest() 

899 blob_digest.size_bytes = len(blob) 

900 if self.instance_name: 

901 resource_name = '/'.join([self.instance_name, 'uploads', self.u_uid, 'blobs', 

902 blob_digest.hash, str(blob_digest.size_bytes)]) 

903 else: 

904 resource_name = '/'.join(['uploads', self.u_uid, 'blobs', 

905 blob_digest.hash, str(blob_digest.size_bytes)]) 

906 

907 def __write_request_stream(resource, content): 

908 offset = 0 

909 finished = False 

910 remaining = len(content) 

911 while not finished: 

912 chunk_size = min(remaining, MAX_REQUEST_SIZE) 

913 remaining -= chunk_size 

914 

915 request = bytestream_pb2.WriteRequest() 

916 request.resource_name = resource 

917 request.data = content[offset:offset + chunk_size] 

918 request.write_offset = offset 

919 request.finish_write = remaining <= 0 

920 

921 yield request 

922 

923 offset += chunk_size 

924 finished = request.finish_write 

925 

926 write_requests = __write_request_stream(resource_name, blob) 

927 write_response = self.__bytestream_stub.Write(write_requests) 

928 assert write_response.committed_size == blob_digest.size_bytes 

929 

930 return blob_digest 
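
As a small illustration of the chunking performed by __write_request_stream above (the content length is arbitrary), a blob larger than MAX_REQUEST_SIZE is split into consecutive write requests whose offsets advance by the chunk size, with finish_write set only on the last one:

    content_length = int(2.5 * MAX_REQUEST_SIZE)
    offset, remaining, chunks = 0, content_length, []
    while remaining > 0:
        chunk_size = min(remaining, MAX_REQUEST_SIZE)
        remaining -= chunk_size
        chunks.append((offset, chunk_size, remaining <= 0))  # (write_offset, len(data), finish_write)
        offset += chunk_size
    # chunks[-1][2] is True: only the final WriteRequest carries finish_write.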

931 

932 def _queue_blob(self, blob: bytes, digest: Digest = None) -> Digest: 

933 """Queues a memory block for later batch upload""" 

934 blob_digest = Digest() 

935 if digest is not None: 

936 blob_digest.CopyFrom(digest) 

937 else: 

938 blob_digest.hash = HASH(blob).hexdigest() 

939 blob_digest.size_bytes = len(blob) 

940 

941 # If we get here, we know the queued blob's size is 

942 # smaller than gRPC's message size limit. 

943 # We'll make a single batch request as big as the server allows. 

944 batch_size_limit = self._max_effective_batch_size_bytes() 

945 

946 if self.__request_size + blob_digest.size_bytes > batch_size_limit: 

947 self.flush() 

948 elif self.__request_count >= MAX_REQUEST_COUNT: 

949 self.flush() 

950 

951 self.__requests[blob_digest.hash] = (blob, blob_digest) 

952 self.__request_count += 1 

953 self.__request_size += blob_digest.size_bytes 

954 

955 return blob_digest 

956 

957 def _send_blob_batch(self, batch: Dict[str, Tuple[bytes, Digest]]) -> List[str]: 

958 """Sends queued data using ContentAddressableStorage.BatchUpdateBlobs()""" 

959 batch_fetched = False 

960 written_digests = [] 

961 

962 # First, try BatchUpdateBlobs(), unless it is already known to be unimplemented: 

963 if not _CallCache.unimplemented(self.channel, 'BatchUpdateBlobs'): 

964 batch_request = remote_execution_pb2.BatchUpdateBlobsRequest() 

965 if self.instance_name is not None: 

966 batch_request.instance_name = self.instance_name 

967 

968 for blob, digest in batch.values(): 

969 request = batch_request.requests.add() 

970 request.digest.CopyFrom(digest) 

971 request.data = blob 

972 

973 try: 

974 batch_response = self.__cas_stub.BatchUpdateBlobs(batch_request) 

975 for response in batch_response.responses: 

976 assert response.digest.hash in batch 

977 

978 written_digests.append(response.digest) 

979 if response.status.code != code_pb2.OK: 

980 response.digest.Clear() 

981 

982 batch_fetched = True 

983 

984 except grpc.RpcError as e: 

985 status_code = e.code() 

986 if status_code == grpc.StatusCode.UNIMPLEMENTED: 

987 _CallCache.mark_unimplemented(self.channel, 'BatchUpdateBlobs') 

988 

989 elif status_code == grpc.StatusCode.INVALID_ARGUMENT: 

990 written_digests.clear() 

991 batch_fetched = False 

992 

993 else: 

994 raise 

995 

996 # Fallback to Write() if no BatchUpdateBlobs(): 

997 if not batch_fetched: 

998 for blob, digest in batch.values(): 

999 written_digests.append(self._send_blob(blob)) 

1000 

1001 return written_digests 

1002 

1003 def _max_effective_batch_size_bytes(self): 

1004 """Returns the effective maximum number of bytes that can be 

1005 transferred using batches, considering gRPC maximum message size. 

1006 """ 

1007 return _CasBatchRequestSizesCache.max_effective_batch_size_bytes(self.channel, 

1008 self.instance_name) 

1009 

1010 def _queueable_file_size_threshold(self): 

1011 """Returns the size limit up until which files can be queued to 

1012 be requested in a batch. 

1013 """ 

1014 return _CasBatchRequestSizesCache.batch_request_size_threshold(self.channel, 

1015 self.instance_name)