Coverage for /builds/BuildGrid/buildgrid/buildgrid/client/cas.py: 16.12%

459 statements  

coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15# pylint: disable=anomalous-backslash-in-string 

16 

17 

18from collections import namedtuple 

19from contextlib import contextmanager 

20import os 

21import random 

22import time 

23from typing import Dict, List, Optional, Set, Tuple 

24import uuid 

25 

26import grpc 

27from google.protobuf.message import Message 

28 

29import buildgrid.server.context as context_module 

30from buildgrid._exceptions import NotFoundError 

31from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc 

32from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ( 

33 Digest, 

34 Directory 

35) 

36from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc 

37from buildgrid._protos.google.rpc import code_pb2 

38from buildgrid.client.capabilities import CapabilitiesInterface 

39from buildgrid.settings import HASH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT, BATCH_REQUEST_SIZE_THRESHOLD 

40from buildgrid.utils import merkle_tree_maker 

41 

42 

43_FileRequest = namedtuple('_FileRequest', ['digest', 'output_paths']) 

44 

45 

46class _CallCache: 

47 """Per remote grpc.StatusCode.UNIMPLEMENTED call cache.""" 

48 __calls: Dict[grpc.Channel, Set[str]] = {} 

49 

50 @classmethod 

51 def mark_unimplemented(cls, channel, name): 

52 if channel not in cls.__calls: 

53 cls.__calls[channel] = set() 

54 cls.__calls[channel].add(name) 

55 

56 @classmethod 

57 def unimplemented(cls, channel, name): 

58 if channel not in cls.__calls: 

59 return False 

60 return name in cls.__calls[channel] 

61 

62 
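# Illustrative sketch: how the UNIMPLEMENTED cache above is typically consulted
# before attempting an optional batch RPC (this mirrors the pattern used further
# down in this module):
#
#     if not _CallCache.unimplemented(channel, 'BatchReadBlobs'):
#         ...  # try BatchReadBlobs(); on UNIMPLEMENTED, mark it and fall back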

63class _CasBatchRequestSizesCache: 

64 """Cache that stores, for each remote, the limit of bytes that can 

65 be transferred using batches as well as a threshold that determines 

66 when a file can be fetched as part of a batch request. 

67 """ 

68 __cas_max_batch_transfer_size: Dict[grpc.Channel, Dict[str, int]] = {} 

69 __cas_batch_request_size_threshold: Dict[grpc.Channel, Dict[str, int]] = {} 

70 

71 @classmethod 

72 def max_effective_batch_size_bytes(cls, channel: grpc.Channel, instance_name: str) -> int: 

73 """Returns the maximum number of bytes that can be transferred 

74 using batch methods for the given remote. 

75 """ 

76 if channel not in cls.__cas_max_batch_transfer_size: 

77 cls.__cas_max_batch_transfer_size[channel] = {} 

78 

79 if instance_name not in cls.__cas_max_batch_transfer_size[channel]: 

80 max_batch_size = cls._get_server_max_batch_total_size_bytes(channel, 

81 instance_name) 

82 

83 cls.__cas_max_batch_transfer_size[channel][instance_name] = max_batch_size 

84 

85 return cls.__cas_max_batch_transfer_size[channel][instance_name] 

86 

87 @classmethod 

88 def batch_request_size_threshold(cls, channel: grpc.Channel, instance_name: str) -> int: 

89 if channel not in cls.__cas_batch_request_size_threshold: 

90 cls.__cas_batch_request_size_threshold[channel] = {} 

91 

92 if instance_name not in cls.__cas_batch_request_size_threshold[channel]: 

93 # Computing the threshold: 

94 max_batch_size = cls.max_effective_batch_size_bytes(channel, 

95 instance_name) 

96 threshold = int(BATCH_REQUEST_SIZE_THRESHOLD * max_batch_size) 

97 

98 cls.__cas_batch_request_size_threshold[channel][instance_name] = threshold 

99 

100 return cls.__cas_batch_request_size_threshold[channel][instance_name] 

101 

102 @classmethod 

103 def _get_server_max_batch_total_size_bytes(cls, channel: grpc.Channel, instance_name: str) -> int: 

104 """Returns the maximum number of bytes that can be effectively 

105 transferred using batches, considering the limits imposed by 

106 the server's configuration and by gRPC. 

107 """ 

108 try: 

109 capabilities_interface = CapabilitiesInterface(channel) 

110 server_capabilities = capabilities_interface.get_capabilities(instance_name) 

111 

112 cache_capabilities = server_capabilities.cache_capabilities 

113 

114 max_batch_total_size = cache_capabilities.max_batch_total_size_bytes 

115 # The server could set this value to 0 (no limit set). 

116 if max_batch_total_size: 

117 return min(max_batch_total_size, MAX_REQUEST_SIZE) 

118 except ConnectionError: 

119 pass 

120 

121 return MAX_REQUEST_SIZE 

122 

123 
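# Illustrative sketch of how the two cached figures above relate (the numbers
# are assumptions, not values taken from buildgrid.settings): if a remote
# advertises a 4 MiB max_batch_total_size_bytes and BATCH_REQUEST_SIZE_THRESHOLD
# were 0.8, the queueing threshold would be about 3.2 MiB, and larger blobs
# would be transferred individually over ByteStream instead of in a batch.
def _example_is_batchable(channel: grpc.Channel, instance_name: str, blob_size: int) -> bool:
    threshold = _CasBatchRequestSizesCache.batch_request_size_threshold(channel, instance_name)
    return blob_size <= threshold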

124class GrpcRetrier: 

125 def __init__(self, retries: int, max_backoff: int = 64, should_backoff: bool = True): 

126 """Initializes a new :class:`GrpcRetrier`. 

127 

128 Args: 

129 retries (int): The maximum number of attempts for each RPC call. 

130 max_backoff (int): The maximum time to wait between retries. 

131 should_backoff (bool): Whether to back off at all. Always set this to True except in tests. 

132 """ 

133 

134 self._retries = retries 

135 self._max_backoff = max_backoff 

136 self._should_backoff = should_backoff 

137 

138 def retry(self, func, *args, **kwargs): 

139 attempts = 0 

140 while True: 

141 try: 

142 return func(*args, **kwargs) 

143 except grpc.RpcError as e: 

144 status_code = e.code() 

145 

146 # Retry only on UNAVAILABLE and ABORTED 

147 if status_code in (grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.ABORTED): 

148 attempts += 1 

149 if attempts > self._retries: 

150 raise e 

151 if self._should_backoff: 

152 # Sleep for 2^(N-1) + jitter seconds, where N is # of attempts 

153 jitter = random.uniform(0, 1) 

154 time.sleep(pow(2, attempts - 1) + jitter) 

155 

156 elif status_code == grpc.StatusCode.NOT_FOUND: 

157 raise NotFoundError("Requested data does not exist on remote") 

158 

159 else: 

160 raise ConnectionError(e.details()) 

161 

162 
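# Illustrative sketch of GrpcRetrier in use, mirroring how the Downloader below
# wraps its ByteStream reads (the resource-name layout matches _fetch_blob):
def _example_retried_read(channel: grpc.Channel, digest: Digest) -> bytes:
    stub = bytestream_pb2_grpc.ByteStreamStub(channel)
    retrier = GrpcRetrier(retries=3, max_backoff=64)

    def read_once() -> bytes:
        data = bytearray()
        request = bytestream_pb2.ReadRequest(
            resource_name='/'.join(['blobs', digest.hash, str(digest.size_bytes)]),
            read_offset=0)
        for response in stub.Read(request):
            data += response.data
        return bytes(data)

    # Retries only on UNAVAILABLE/ABORTED, with exponential backoff plus jitter.
    return retrier.retry(read_once)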

163@contextmanager 

164def download(channel, 

165 instance: str = None, 

166 u_uid: str = None, 

167 retries: int = 0, 

168 max_backoff: int = 64, 

169 should_backoff: bool = True): 

170 """Context manager generator for the :class:`Downloader` class.""" 

171 downloader = Downloader(channel, 

172 instance=instance, 

173 retries=retries, 

174 max_backoff=max_backoff, 

175 should_backoff=should_backoff) 

176 try: 

177 yield downloader 

178 finally: 

179 downloader.close() 

180 

181 

182class Downloader: 

183 """Remote CAS files, directories and messages download helper. 

184 

185 The :class:`Downloader` class comes with a generator factory function that 

186 can be used together with the `with` statement for context management:: 

187 

188 from buildgrid.client.cas import download 

189 

190 with download(channel, instance='build') as downloader: 

191 downloader.get_message(message_digest) 

192 """ 

193 

194 def __init__(self, 

195 channel: grpc.Channel, 

196 instance: str = None, 

197 retries: int = 0, 

198 max_backoff: int = 64, 

199 should_backoff: bool = True): 

200 """Initializes a new :class:`Downloader` instance. 

201 

202 Args: 

203 channel (grpc.Channel): A gRPC channel to the CAS endpoint. 

204 instance (str, optional): the targeted instance's name. 

205 """ 

206 self.channel = channel 

207 

208 self.instance_name = instance 

209 

210 self._grpc_retrier = GrpcRetrier(retries=retries, max_backoff=max_backoff, should_backoff=should_backoff) 

211 

212 self.__bytestream_stub = bytestream_pb2_grpc.ByteStreamStub(self.channel) 

213 self.__cas_stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel) 

214 

215 self.__file_requests: Dict[str, _FileRequest] = {} 

216 self.__file_request_count = 0 

217 self.__file_request_size = 0 

218 self.__file_response_size = 0 

219 

220 # --- Public API --- 

221 

222 def get_blob(self, digest: Digest) -> Optional[str]: 

223 """Retrieves a blob from the remote CAS server. 

224 

225 Args: 

226 digest (:obj:`Digest`): the blob's digest to fetch. 

227 

228 Returns: 

229 bytearray: the fetched blob data or None if not found. 

230 """ 

231 try: 

232 blob = self._grpc_retrier.retry(self._fetch_blob, digest) 

233 except NotFoundError: 

234 return None 

235 

236 return blob 

237 

238 def get_blobs(self, digests: List[Digest]) -> List[str]: 

239 """Retrieves a list of blobs from the remote CAS server. 

240 

241 Args: 

242 digests (list): list of :obj:`Digest`\ s for the blobs to fetch. 

243 

244 Returns: 

245 list: the fetched blob data list. 

246 

247 Raises: 

248 NotFoundError: if a blob is not present in the remote CAS server. 

249 """ 

250 # _fetch_blob_batch returns (data, digest) pairs. 

251 # We only want the data. 

252 return [result[0] for result in self._grpc_retrier.retry(self._fetch_blob_batch, digests)] 

253 

254 def get_available_blobs(self, digests: List[Digest]) -> List[Tuple[str, Digest]]: 

255 """Retrieves a list of blobs from the remote CAS server. 

256 

257 Skips blobs not found on the server without raising an error. 

258 

259 Args: 

260 digests (list): list of :obj:`Digest`\ s for the blobs to fetch. 

261 

262 Returns: 

263 list: the fetched blob data list as (data, digest) pairs 

264 """ 

265 return self._grpc_retrier.retry(self._fetch_blob_batch, digests, skip_unavailable=True) 

266 

267 def get_message(self, digest: Digest, message: Message) -> Message: 

268 """Retrieves a :obj:`Message` from the remote CAS server. 

269 

270 Args: 

271 digest (:obj:`Digest`): the message's digest to fetch. 

272 message (:obj:`Message`): an empty message to fill. 

273 

274 Returns: 

275 :obj:`Message`: `message` filled or emptied if not found. 

276 """ 

277 try: 

278 message_blob = self._grpc_retrier.retry(self._fetch_blob, digest) 

279 except NotFoundError: 

280 message_blob = None 

281 

282 if message_blob is not None: 

283 message.ParseFromString(message_blob) 

284 else: 

285 message.Clear() 

286 

287 return message 

288 

289 def get_messages(self, digests: List[Digest], messages: List[Message]) -> List[Message]: 

290 """Retrieves a list of :obj:`Message`\ s from the remote CAS server. 

291 

292 Note: 

293 The `digests` and `messages` list **must** contain the same number 

294 of elements. 

295 

296 Args: 

297 digests (list): list of :obj:`Digest`\ s for the messages to fetch. 

298 messages (list): list of empty :obj:`Message`\ s to fill. 

299 

300 Returns: 

301 list: the fetched and filled message list. 

302 """ 

303 assert len(digests) == len(messages) 

304 

305 # The individual empty messages might be of differing types, so we need 

306 # to set up a mapping 

307 digest_message_map = {digest.hash: message for (digest, message) in zip(digests, messages)} 

308 

309 batch_response = self._grpc_retrier.retry(self._fetch_blob_batch, digests) 

310 

311 messages = [] 

312 for message_blob, message_digest in batch_response: 

313 message = digest_message_map[message_digest.hash] 

314 message.ParseFromString(message_blob) 

315 messages.append(message) 

316 

317 return messages 

318 

319 def download_file(self, digest: Digest, file_path: str, is_executable: bool = False, queue: bool = True): 

320 """Retrieves a file from the remote CAS server. 

321 

322 If queuing is allowed (`queue=True`), the download request **may** be 

323 deferred. An explicit call to :func:`~flush` can force the request to be 

324 sent immediately (along with the rest of the queued batch). 

325 

326 Args: 

327 digest (:obj:`Digest`): the file's digest to fetch. 

328 file_path (str): absolute or relative path to the local file to write. 

329 is_executable (bool): whether the file is executable or not. 

330 queue (bool, optional): whether or not the download request may be 

331 queued and submitted as part of a batch upload request. Defaults 

332 to True. 

333 

334 Raises: 

335 NotFoundError: if `digest` is not present in the remote CAS server. 

336 OSError: if `file_path` cannot be created or written. 

337 """ 

338 if not os.path.isabs(file_path): 

339 file_path = os.path.abspath(file_path) 

340 

341 if not queue or digest.size_bytes > self._queueable_file_size_threshold(): 

342 self._grpc_retrier.retry(self._fetch_file, digest, file_path, is_executable) 

343 else: 

344 self._queue_file(digest, file_path, is_executable=is_executable) 

345 

346 def download_directory(self, digest: Digest, directory_path: str): 

347 """Retrieves a :obj:`Directory` from the remote CAS server. 

348 

349 Args: 

350 digest (:obj:`Digest`): the directory's digest to fetch. 

351 directory_path (str): the path to download to 

352 

353 Raises: 

354 NotFoundError: if `digest` is not present in the remote CAS server. 

355 FileExistsError: if `directory_path` already contains parts of the 

356 fetched directory's content. 

357 """ 

358 if not os.path.isabs(directory_path): 

359 directory_path = os.path.abspath(directory_path) 

360 

361 self._grpc_retrier.retry(self._fetch_directory, digest, directory_path) 

362 

363 def flush(self): 

364 """Ensures any queued request gets sent.""" 

365 if self.__file_requests: 

366 self._grpc_retrier.retry(self._fetch_file_batch, self.__file_requests) 

367 

368 self.__file_requests.clear() 

369 self.__file_request_count = 0 

370 self.__file_request_size = 0 

371 self.__file_response_size = 0 

372 

373 def close(self): 

374 """Closes the underlying connection stubs. 

375 

376 Note: 

377 This will always send pending requests before closing connections, 

378 if any. 

379 """ 

380 self.flush() 

381 

382 self.__bytestream_stub = None 

383 self.__cas_stub = None 

384 

385 # --- Private API --- 

386 

387 def _fetch_blob(self, digest: Digest) -> bytearray: 

388 """Fetches a blob using ByteStream.Read()""" 

389 

390 if self.instance_name: 

391 resource_name = '/'.join([self.instance_name, 'blobs', 

392 digest.hash, str(digest.size_bytes)]) 

393 else: 

394 resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)]) 

395 

396 read_blob = bytearray() 

397 read_request = bytestream_pb2.ReadRequest() 

398 read_request.resource_name = resource_name 

399 read_request.read_offset = 0 

400 

401 for read_response in (self.__bytestream_stub.Read(read_request, 

402 metadata=context_module.metadata_list())): 

403 read_blob += read_response.data 

404 

405 assert len(read_blob) == digest.size_bytes 

406 return read_blob 

407 

408 def _fetch_blob_batch(self, digests: List[Digest], *, skip_unavailable=False) -> List[Tuple[str, Digest]]: 

409 """Fetches blobs using ContentAddressableStorage.BatchReadBlobs() 

410 Returns (data, digest) pairs""" 

411 batch_fetched = False 

412 read_blobs = [] 

413 

414 # First, try BatchReadBlobs(), unless it is already known to be unimplemented: 

415 if not _CallCache.unimplemented(self.channel, 'BatchReadBlobs'): 

416 batch_request = remote_execution_pb2.BatchReadBlobsRequest() 

417 batch_request.digests.extend(digests) 

418 if self.instance_name is not None: 

419 batch_request.instance_name = self.instance_name 

420 

421 try: 

422 batch_response = (self.__cas_stub.BatchReadBlobs(batch_request, 

423 metadata=context_module.metadata_list())) 

424 

425 for response in batch_response.responses: 

426 assert response.digest in digests 

427 

428 if response.status.code == code_pb2.OK: 

429 read_blobs.append((response.data, response.digest)) 

430 elif response.status.code == code_pb2.NOT_FOUND: 

431 if not skip_unavailable: 

432 raise NotFoundError('Requested blob does not exist ' 

433 'on the remote.') 

434 else: 

435 raise ConnectionError('Error in CAS reply while fetching blob.') 

436 

437 batch_fetched = True 

438 

439 except grpc.RpcError as e: 

440 status_code = e.code() 

441 if status_code == grpc.StatusCode.UNIMPLEMENTED: 

442 _CallCache.mark_unimplemented(self.channel, 'BatchReadBlobs') 

443 elif status_code == grpc.StatusCode.INVALID_ARGUMENT: 

444 read_blobs.clear() 

445 else: 

446 raise 

447 

448 # Fall back to Read() if no BatchReadBlobs(): 

449 if not batch_fetched: 

450 for digest in digests: 

451 blob = self._grpc_retrier.retry(self._fetch_blob, digest) 

452 read_blobs.append((blob, digest)) 

453 

454 return read_blobs 

455 

456 def _fetch_file(self, digest: Digest, file_path: str, is_executable: bool = False): 

457 """Fetches a file using ByteStream.Read()""" 

458 if self.instance_name: 

459 resource_name = '/'.join([self.instance_name, 'blobs', 

460 digest.hash, str(digest.size_bytes)]) 

461 else: 

462 resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)]) 

463 

464 os.makedirs(os.path.dirname(file_path), exist_ok=True) 

465 

466 read_request = bytestream_pb2.ReadRequest() 

467 read_request.resource_name = resource_name 

468 read_request.read_offset = 0 

469 

470 with open(file_path, 'wb') as byte_file: 

471 for read_response in (self.__bytestream_stub.Read(read_request, 

472 metadata=context_module.metadata_list())): 

473 byte_file.write(read_response.data) 

474 

475 assert byte_file.tell() == digest.size_bytes 

476 

477 if is_executable: 

478 os.chmod(file_path, 0o755) # rwxr-xr-x / 755 

479 

480 def _queue_file(self, digest: Digest, file_path: str, is_executable: bool = False): 

481 """Queues a file for later batch download""" 

482 batch_size_limit = self._max_effective_batch_size_bytes() 

483 

484 if self.__file_request_size + digest.ByteSize() > batch_size_limit: 

485 self.flush() 

486 elif self.__file_response_size + digest.size_bytes > batch_size_limit: 

487 self.flush() 

488 elif self.__file_request_count >= MAX_REQUEST_COUNT: 

489 self.flush() 

490 

491 output_path = (file_path, is_executable) 

492 

493 # When queueing a file we take into account the cases where 

494 # we might want to download the same digest to different paths. 

495 if digest.hash not in self.__file_requests: 

496 request = _FileRequest(digest=digest, output_paths=[output_path]) 

497 self.__file_requests[digest.hash] = request 

498 

499 self.__file_request_count += 1 

500 self.__file_request_size += digest.ByteSize() 

501 self.__file_response_size += digest.size_bytes 

502 else: 

503 # We already have that hash queued; we'll fetch the blob 

504 # once and write copies of it: 

505 self.__file_requests[digest.hash].output_paths.append(output_path) 

506 

507 def _fetch_file_batch(self, requests): 

508 """Sends queued data using ContentAddressableStorage.BatchReadBlobs(). 

509 

510 Takes a dictionary (digest.hash, _FileRequest) as input. 

511 """ 

512 batch_digests = [request.digest for request in requests.values()] 

513 batch_responses = self._fetch_blob_batch(batch_digests) 

514 

515 for file_blob, file_digest in batch_responses: 

516 output_paths = requests[file_digest.hash].output_paths 

517 

518 for file_path, is_executable in output_paths: 

519 os.makedirs(os.path.dirname(file_path), exist_ok=True) 

520 

521 with open(file_path, 'wb') as byte_file: 

522 byte_file.write(file_blob) 

523 

524 if is_executable: 

525 os.chmod(file_path, 0o755) # rwxr-xr-x / 755 

526 

527 def _fetch_directory(self, digest: Digest, directory_path: str): 

528 """Fetches a file using ByteStream.GetTree()""" 

529 # Better fail early if the local root path cannot be created: 

530 os.makedirs(directory_path, exist_ok=True) 

531 

532 directories = {} 

533 directory_fetched = False 

534 # First, try GetTree() if not known to be unimplemented yet: 

535 if not _CallCache.unimplemented(self.channel, 'GetTree'): 

536 tree_request = remote_execution_pb2.GetTreeRequest() 

537 tree_request.root_digest.CopyFrom(digest) 

538 tree_request.page_size = MAX_REQUEST_COUNT 

539 if self.instance_name is not None: 

540 tree_request.instance_name = self.instance_name 

541 

542 try: 

543 for tree_response in self.__cas_stub.GetTree(tree_request): 

544 for directory in tree_response.directories: 

545 directory_blob = directory.SerializeToString() 

546 directory_hash = HASH(directory_blob).hexdigest() 

547 

548 directories[directory_hash] = directory 

549 

550 assert digest.hash in directories 

551 

552 directory = directories[digest.hash] 

553 self._write_directory(directory, directory_path, directories=directories) 

554 

555 directory_fetched = True 

556 except grpc.RpcError as e: 

557 status_code = e.code() 

558 if status_code == grpc.StatusCode.UNIMPLEMENTED: 

559 _CallCache.mark_unimplemented(self.channel, 'GetTree') 

560 

561 else: 

562 raise 

563 

564 # If no GetTree(), _write_directory() will use BatchReadBlobs() 

565 # if available or Read() if not. 

566 if not directory_fetched: 

567 directory = remote_execution_pb2.Directory() 

568 directory.ParseFromString(self._grpc_retrier.retry(self._fetch_blob, digest)) 

569 

570 self._write_directory(directory, directory_path) 

571 

572 def _write_directory(self, root_directory: Directory, root_path: str, directories: Dict[str, Directory] = None): 

573 """Generates a local directory structure""" 

574 os.makedirs(root_path, exist_ok=True) 

575 self._write_directory_recursively(root_directory, root_path, directories=directories) 

576 

577 def _write_directory_recursively(self, 

578 root_directory: Directory, 

579 root_path: str, 

580 directories: Dict[str, Directory] = None): 

581 """Generate local directory recursively""" 

582 # i) Files: 

583 for file_node in root_directory.files: 

584 file_path = os.path.join(root_path, file_node.name) 

585 

586 if os.path.lexists(file_path): 

587 raise FileExistsError(f"'{file_path}' already exists") 

588 

589 self.download_file(file_node.digest, file_path, 

590 is_executable=file_node.is_executable) 

591 self.flush() 

592 

593 # ii) Directories: 

594 pending_directory_digests = [] 

595 pending_directory_paths = {} 

596 for directory_node in root_directory.directories: 

597 directory_hash = directory_node.digest.hash 

598 

599 # FIXME: Guard against ../ 

600 directory_path = os.path.join(root_path, directory_node.name) 

601 os.mkdir(directory_path) 

602 

603 if directories and directory_node.digest.hash in directories: 

604 # We already have the directory; just write it: 

605 directory = directories[directory_hash] 

606 

607 self._write_directory_recursively(directory, directory_path, 

608 directories=directories) 

609 else: 

610 # Gather all the directories that we need to get to 

611 # try fetching them in a single batch request: 

612 pending_directory_digests.append(directory_node.digest) 

613 pending_directory_paths[directory_hash] = directory_path 

614 

615 if pending_directory_paths: 

616 fetched_blobs = self._grpc_retrier.retry(self._fetch_blob_batch, pending_directory_digests) 

617 

618 for (directory_blob, directory_digest) in fetched_blobs: 

619 directory = remote_execution_pb2.Directory() 

620 directory.ParseFromString(directory_blob) 

621 

622 # Assuming that the server might not return the blobs in 

623 # the same order as they were asked for, we read 

624 # the hashes of the returned blobs: 

625 # Guarantees for the reply orderings might change in 

626 # the specification at some point. 

627 # See: github.com/bazelbuild/remote-apis/issues/52 

628 

629 directory_path = pending_directory_paths[directory_digest.hash] 

630 

631 self._write_directory(directory, directory_path, 

632 directories=directories) 

633 

634 # iii) Symlinks: 

635 for symlink_node in root_directory.symlinks: 

636 symlink_path = os.path.join(root_path, symlink_node.name) 

637 os.symlink(symlink_node.target, symlink_path) 

638 

639 def _max_effective_batch_size_bytes(self): 

640 """Returns the effective maximum number of bytes that can be 

641 transferred using batches, considering gRPC maximum message size. 

642 """ 

643 return _CasBatchRequestSizesCache.max_effective_batch_size_bytes(self.channel, 

644 self.instance_name) 

645 

646 def _queueable_file_size_threshold(self): 

647 """Returns the size limit up until which files can be queued to 

648 be requested in a batch. 

649 """ 

650 return _CasBatchRequestSizesCache.batch_request_size_threshold(self.channel, 

651 self.instance_name) 

652 

653 
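# Illustrative sketch of a download session using the helpers above (the paths
# and the instance name are assumptions):
def _example_download_session(channel: grpc.Channel, file_digest: Digest, dir_digest: Digest) -> None:
    with download(channel, instance='build', retries=3) as downloader:
        # Small files may be queued and fetched together via BatchReadBlobs():
        downloader.download_file(file_digest, '/tmp/cas-out/file.bin', queue=True)
        downloader.flush()
        # Directories are reassembled recursively, preferring GetTree() when
        # the remote implements it:
        downloader.download_directory(dir_digest, '/tmp/cas-out/tree')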

654@contextmanager 

655def upload(channel: grpc.Channel, 

656 instance: str = None, 

657 u_uid: str = None, 

658 retries: int = 0, 

659 max_backoff: int = 64, 

660 should_backoff: bool = True): 

661 """Context manager generator for the :class:`Uploader` class.""" 

662 uploader = Uploader(channel, 

663 instance=instance, 

664 u_uid=u_uid, 

665 retries=retries, 

666 max_backoff=max_backoff, 

667 should_backoff=should_backoff) 

668 try: 

669 yield uploader 

670 finally: 

671 uploader.close() 

672 

673 

674class Uploader: 

675 """Remote CAS files, directories and messages upload helper. 

676 

677 The :class:`Uploader` class comes with a generator factory function that can 

678 be used together with the `with` statement for context management:: 

679 

680 from buildgrid.client.cas import upload 

681 

682 with upload(channel, instance='build') as uploader: 

683 uploader.upload_file('/path/to/local/file') 

684 """ 

685 

686 def __init__(self, 

687 channel: grpc.Channel, 

688 instance: str = None, 

689 u_uid: str = None, 

690 retries: int = 0, 

691 max_backoff: int = 64, 

692 should_backoff: bool = True): 

693 """Initializes a new :class:`Uploader` instance. 

694 

695 Args: 

696 channel (grpc.Channel): A gRPC channel to the CAS endpoint. 

697 instance (str, optional): the targeted instance's name. 

698 u_uid (str, optional): a UUID for CAS transactions. 

699 """ 

700 self.channel = channel 

701 

702 self.instance_name = instance 

703 if u_uid is not None: 

704 self.u_uid = u_uid 

705 else: 

706 self.u_uid = str(uuid.uuid4()) 

707 

708 self._grpc_retrier = GrpcRetrier(retries=retries, max_backoff=max_backoff, should_backoff=should_backoff) 

709 

710 self.__bytestream_stub = bytestream_pb2_grpc.ByteStreamStub(self.channel) 

711 self.__cas_stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel) 

712 

713 self.__requests: Dict[str, Tuple[bytes, Digest]] = {} 

714 self.__request_count = 0 

715 self.__request_size = 0 

716 

717 # --- Public API --- 

718 

719 def put_blob(self, blob: bytes, digest: Digest = None, queue: bool = False) -> Digest: 

720 """Stores a blob into the remote CAS server. 

721 

722 If queuing is allowed (`queue=True`), the upload request **may** be 

723 deferred. An explicit call to :func:`~flush` can force the request to be 

724 sent immediately (along with the rest of the queued batch). 

725 

726 Args: 

727 blob (bytes): the blob's data. 

728 digest (:obj:`Digest`, optional): the blob's digest. 

729 queue (bool, optional): whether or not the upload request may be 

730 queued and submitted as part of a batch upload request. Defaults 

731 to False. 

732 

733 Returns: 

734 :obj:`Digest`: the sent blob's digest. 

735 """ 

736 

737 if not queue or len(blob) > self._queueable_file_size_threshold(): 

738 blob_digest = self._grpc_retrier.retry(self._send_blob, blob, digest) 

739 else: 

740 blob_digest = self._queue_blob(blob, digest=digest) 

741 

742 return blob_digest 

743 

744 def put_message(self, message: Message, digest: Digest = None, queue: bool = False) -> Digest: 

745 """Stores a message into the remote CAS server. 

746 

747 If queuing is allowed (`queue=True`), the upload request **may** be 

748 deferred. An explicit call to :func:`~flush` can force the request to be 

749 sent immediately (along with the rest of the queued batch). 

750 

751 Args: 

752 message (:obj:`Message`): the message object. 

753 digest (:obj:`Digest`, optional): the message's digest. 

754 queue (bool, optional): whether or not the upload request may be 

755 queued and submitted as part of a batch upload request. Defaults 

756 to False. 

757 

758 Returns: 

759 :obj:`Digest`: the sent message's digest. 

760 """ 

761 message_blob = message.SerializeToString() 

762 

763 if not queue or len(message_blob) > self._queueable_file_size_threshold(): 

764 message_digest = self._grpc_retrier.retry(self._send_blob, message_blob, digest) 

765 else: 

766 message_digest = self._queue_blob(message_blob, digest=digest) 

767 

768 return message_digest 

769 

770 def upload_file(self, file_path: str, queue: bool = True) -> Digest: 

771 """Stores a local file into the remote CAS storage. 

772 

773 If queuing is allowed (`queue=True`), the upload request **may** be 

774 deferred. An explicit call to :func:`~flush` can force the request to be 

775 sent immediately (along with the rest of the queued batch). 

776 

777 Args: 

778 file_path (str): absolute or relative path to a local file. 

779 queue (bool, optional): whether or not the upload request may be 

780 queued and submitted as part of a batch upload request. Defaults 

781 to True. 

782 

783 Returns: 

784 :obj:`Digest`: The digest of the file's content. 

785 

786 Raises: 

787 FileNotFoundError: If `file_path` does not exist. 

788 PermissionError: If `file_path` is not readable. 

789 """ 

790 if not os.path.isabs(file_path): 

791 file_path = os.path.abspath(file_path) 

792 

793 with open(file_path, 'rb') as bytes_stream: 

794 file_bytes = bytes_stream.read() 

795 

796 if not queue or len(file_bytes) > self._queueable_file_size_threshold(): 

797 file_digest = self._grpc_retrier.retry(self._send_blob, file_bytes) 

798 else: 

799 file_digest = self._queue_blob(file_bytes) 

800 

801 return file_digest 

802 

803 def upload_directory(self, directory_path: str, queue: bool = True) -> Digest: 

804 """Stores a local folder into the remote CAS storage. 

805 

806 If queuing is allowed (`queue=True`), the upload request **may** be 

807 deferred. An explicit call to :func:`~flush` can force the request to be 

808 sent immediately (along with the rest of the queued batch). 

809 

810 Args: 

811 directory_path (str): absolute or relative path to a local folder. 

812 queue (bool, optional): whether or not the upload requests may be 

813 queued and submitted as part of a batch upload request. Defaults 

814 to True. 

815 

816 Returns: 

817 :obj:`Digest`: The digest of the top :obj:`Directory`. 

818 

819 Raises: 

820 FileNotFoundError: If `directory_path` does not exist. 

821 PermissionError: If `directory_path` is not readable. 

822 """ 

823 if not os.path.isabs(directory_path): 

824 directory_path = os.path.abspath(directory_path) 

825 

826 if not queue: 

827 for node, blob, _ in merkle_tree_maker(directory_path): 

828 if node.DESCRIPTOR is remote_execution_pb2.DirectoryNode.DESCRIPTOR: 

829 last_directory_node = node 

830 

831 self._grpc_retrier.retry(self._send_blob, blob, node.digest) 

832 

833 else: 

834 for node, blob, _ in merkle_tree_maker(directory_path): 

835 if node.DESCRIPTOR is remote_execution_pb2.DirectoryNode.DESCRIPTOR: 

836 last_directory_node = node 

837 

838 self._queue_blob(blob, digest=node.digest) 

839 

840 return last_directory_node.digest 

841 

842 def upload_tree(self, directory_path: str, queue: bool = True) -> Digest: 

843 """Stores a local folder into the remote CAS storage as a :obj:`Tree`. 

844 

845 If queuing is allowed (`queue=True`), the upload request **may** be 

846 deferred. An explicit call to :func:`~flush` can force the request to be 

847 sent immediately (along with the rest of the queued batch). 

848 

849 Args: 

850 directory_path (str): absolute or relative path to a local folder. 

851 queue (bool, optional): whether or not the upload requests may be 

852 queued and submitted as part of a batch upload request. Defaults 

853 to True. 

854 

855 Returns: 

856 :obj:`Digest`: The digest of the :obj:`Tree`. 

857 

858 Raises: 

859 FileNotFoundError: If `directory_path` does not exist. 

860 PermissionError: If `directory_path` is not readable. 

861 """ 

862 if not os.path.isabs(directory_path): 

863 directory_path = os.path.abspath(directory_path) 

864 

865 directories = [] 

866 

867 if not queue: 

868 for node, blob, _ in merkle_tree_maker(directory_path): 

869 if node.DESCRIPTOR is remote_execution_pb2.DirectoryNode.DESCRIPTOR: 

870 # TODO: Get the Directory object from merkle_tree_maker(): 

871 directory = remote_execution_pb2.Directory() 

872 directory.ParseFromString(blob) 

873 directories.append(directory) 

874 

875 self._grpc_retrier.retry(self._send_blob, blob, node.digest) 

876 

877 else: 

878 for node, blob, _ in merkle_tree_maker(directory_path): 

879 if node.DESCRIPTOR is remote_execution_pb2.DirectoryNode.DESCRIPTOR: 

880 # TODO: Get the Directory object from merkle_tree_maker(): 

881 directory = remote_execution_pb2.Directory() 

882 directory.ParseFromString(blob) 

883 directories.append(directory) 

884 

885 self._queue_blob(blob, digest=node.digest) 

886 

887 tree = remote_execution_pb2.Tree() 

888 tree.root.CopyFrom(directories[-1]) 

889 tree.children.extend(directories[:-1]) 

890 

891 return self.put_message(tree, queue=queue) 

892 

893 def flush(self): 

894 """Ensures any queued request gets sent.""" 

895 if self.__requests: 

896 self._grpc_retrier.retry(self._send_blob_batch, self.__requests) 

897 

898 self.__requests.clear() 

899 self.__request_count = 0 

900 self.__request_size = 0 

901 

902 def close(self): 

903 """Closes the underlying connection stubs. 

904 

905 Note: 

906 This will always send pending requests before closing connections, 

907 if any. 

908 """ 

909 self.flush() 

910 

911 self.__bytestream_stub = None 

912 self.__cas_stub = None 

913 

914 # --- Private API --- 

915 

916 def _send_blob(self, blob: bytes, digest: Digest = None) -> Digest: 

917 """Sends a memory block using ByteStream.Write()""" 

918 blob_digest = Digest() 

919 if digest is not None: 

920 blob_digest.CopyFrom(digest) 

921 else: 

922 blob_digest.hash = HASH(blob).hexdigest() 

923 blob_digest.size_bytes = len(blob) 

924 if self.instance_name: 

925 resource_name = '/'.join([self.instance_name, 'uploads', self.u_uid, 'blobs', 

926 blob_digest.hash, str(blob_digest.size_bytes)]) 

927 else: 

928 resource_name = '/'.join(['uploads', self.u_uid, 'blobs', 

929 blob_digest.hash, str(blob_digest.size_bytes)]) 

930 

931 def __write_request_stream(resource, content): 

932 offset = 0 

933 finished = False 

934 remaining = len(content) 

935 while not finished: 

936 chunk_size = min(remaining, MAX_REQUEST_SIZE) 

937 remaining -= chunk_size 

938 

939 request = bytestream_pb2.WriteRequest() 

940 request.resource_name = resource 

941 request.data = content[offset:offset + chunk_size] 

942 request.write_offset = offset 

943 request.finish_write = remaining <= 0 

944 

945 yield request 

946 

947 offset += chunk_size 

948 finished = request.finish_write 

949 

950 write_requests = __write_request_stream(resource_name, blob) 

951 

952 write_response = (self.__bytestream_stub.Write(write_requests, 

953 metadata=context_module.metadata_list())) 

954 

955 assert write_response.committed_size == blob_digest.size_bytes 

956 

957 return blob_digest 

958 
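# Worked example for the chunking above (the MAX_REQUEST_SIZE value is an
# assumption): with a 2 MiB MAX_REQUEST_SIZE, a 5 MiB blob is streamed as three
# WriteRequests with write_offset 0, 2 MiB and 4 MiB, and only the last request
# sets finish_write=True.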

959 def _queue_blob(self, blob: bytes, digest: Digest = None) -> Digest: 

960 """Queues a memory block for later batch upload""" 

961 blob_digest = Digest() 

962 if digest is not None: 

963 blob_digest.CopyFrom(digest) 

964 else: 

965 blob_digest.hash = HASH(blob).hexdigest() 

966 blob_digest.size_bytes = len(blob) 

967 

968 # If we are here queueing a file, we know that its size is 

969 # smaller than gRPC's message size limit. 

970 # We'll make a single batch request as big as the server allows. 

971 batch_size_limit = self._max_effective_batch_size_bytes() 

972 

973 if self.__request_size + blob_digest.size_bytes > batch_size_limit: 

974 self.flush() 

975 elif self.__request_count >= MAX_REQUEST_COUNT: 

976 self.flush() 

977 

978 self.__requests[blob_digest.hash] = (blob, blob_digest) 

979 self.__request_count += 1 

980 self.__request_size += blob_digest.size_bytes 

981 

982 return blob_digest 

983 

984 def _send_blob_batch(self, batch: Dict[str, Tuple[bytes, Digest]]) -> List[str]: 

985 """Sends queued data using ContentAddressableStorage.BatchUpdateBlobs()""" 

986 batch_fetched = False 

987 written_digests = [] 

988 

989 # First, try BatchUpdateBlobs(), unless it is already known to be unimplemented: 

990 if not _CallCache.unimplemented(self.channel, 'BatchUpdateBlobs'): 

991 batch_request = remote_execution_pb2.BatchUpdateBlobsRequest() 

992 if self.instance_name is not None: 

993 batch_request.instance_name = self.instance_name 

994 

995 for blob, digest in batch.values(): 

996 request = batch_request.requests.add() 

997 request.digest.CopyFrom(digest) 

998 request.data = blob 

999 

1000 try: 

1001 batch_response = (self.__cas_stub.BatchUpdateBlobs(batch_request, 

1002 metadata=context_module.metadata_list())) 

1003 

1004 for response in batch_response.responses: 

1005 assert response.digest.hash in batch 

1006 

1007 written_digests.append(response.digest) 

1008 if response.status.code != code_pb2.OK: 

1009 response.digest.Clear() 

1010 

1011 batch_fetched = True 

1012 

1013 except grpc.RpcError as e: 

1014 status_code = e.code() 

1015 if status_code == grpc.StatusCode.UNIMPLEMENTED: 

1016 _CallCache.mark_unimplemented(self.channel, 'BatchUpdateBlobs') 

1017 

1018 elif status_code == grpc.StatusCode.INVALID_ARGUMENT: 

1019 written_digests.clear() 

1020 batch_fetched = False 

1021 

1022 else: 

1023 raise 

1024 

1025 # Fall back to Write() if no BatchUpdateBlobs(): 

1026 if not batch_fetched: 

1027 for blob, digest in batch.values(): 

1028 written_digests.append(self._send_blob(blob)) 

1029 

1030 return written_digests 

1031 

1032 def _max_effective_batch_size_bytes(self): 

1033 """Returns the effective maximum number of bytes that can be 

1034 transferred using batches, considering gRPC maximum message size. 

1035 """ 

1036 return _CasBatchRequestSizesCache.max_effective_batch_size_bytes(self.channel, 

1037 self.instance_name) 

1038 

1039 def _queueable_file_size_threshold(self): 

1040 """Returns the size limit up until which files can be queued to 

1041 be requested in a batch. 

1042 """ 

1043 return _CasBatchRequestSizesCache.batch_request_size_threshold(self.channel, 

1044 self.instance_name)
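# Illustrative sketch of an upload session using the helpers above (the paths
# and the instance name are assumptions):
def _example_upload_session(channel: grpc.Channel) -> Digest:
    with upload(channel, instance='build', retries=3) as uploader:
        uploader.upload_file('/tmp/cas-in/file.bin', queue=True)
        directory_digest = uploader.upload_directory('/tmp/cas-in/tree', queue=True)
    # Leaving the context flushes any queued BatchUpdateBlobs() requests.
    return directory_digest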