Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/client/asset.py: 79.07%
43 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
1# Copyright (C) 2024 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16from datetime import datetime
17from typing import Any, Iterable, Mapping, Optional
19import grpc
20from google.protobuf.timestamp_pb2 import Timestamp
22from buildgrid._protos.build.bazel.remote.asset.v1.remote_asset_pb2 import (
23 PushBlobRequest,
24 PushBlobResponse,
25 PushDirectoryRequest,
26 PushDirectoryResponse,
27 Qualifier,
28)
29from buildgrid._protos.build.bazel.remote.asset.v1.remote_asset_pb2_grpc import FetchStub, PushStub
30from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
31from buildgrid.server.client.retrier import GrpcRetrier
32from buildgrid.server.logging import buildgrid_logger
33from buildgrid.server.metadata import metadata_list
# Logger for this module, obtained from BuildGrid's logging helper and keyed by the module name.
LOGGER = buildgrid_logger(__name__)
class AssetClient:
    """Client for the Fetch and Push services defined in the remote_asset protocol.

    The client can be used as a context manager; leaving the ``with`` block
    closes the gRPC channel that was handed to the constructor.

    Note: a ``FetchStub`` is created alongside the ``PushStub``, but only the
    Push RPCs are wrapped by methods of this class.
    """

    def __init__(
        self,
        channel: grpc.Channel,
        instance_name: str,
        retries: int = 0,
        max_backoff: int = 64,
        should_backoff: bool = True,
    ) -> None:
        """Create the service stubs and the retrier used for every RPC.

        Args:
            channel: gRPC channel used by both stubs; closed in ``__exit__``.
            instance_name: Instance name placed in every request that is sent.
            retries: Forwarded to ``GrpcRetrier`` as ``retries``.
            max_backoff: Forwarded to ``GrpcRetrier`` as ``max_backoff``.
            should_backoff: Forwarded to ``GrpcRetrier`` as ``should_backoff``.
        """
        self._channel = channel
        self._instance_name = instance_name
        self._push_stub = PushStub(channel)
        self._fetch_stub = FetchStub(channel)
        self._retrier = GrpcRetrier(retries=retries, max_backoff=max_backoff, should_backoff=should_backoff)

    def __enter__(self) -> "AssetClient":
        """Return the client itself for use inside a ``with`` block."""
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Close the underlying channel; exceptions from the block are not suppressed."""
        self._channel.close()
        LOGGER.info("Stopped AssetClient.")

    @staticmethod
    def _qualifiers_to_pb(qualifiers: Mapping[str, str]) -> Iterable[Qualifier]:
        """Convert a name -> value mapping into a list of ``Qualifier`` messages."""
        return [Qualifier(name=name, value=value) for name, value in qualifiers.items()]

    @staticmethod
    def _expire_at_to_pb(expire_at: Optional[datetime]) -> Optional[Timestamp]:
        """Convert an optional ``datetime`` into a protobuf ``Timestamp``.

        Returns ``None`` when no expiry was given, so the request field is
        passed as ``None`` exactly as before.
        """
        if expire_at is None:
            return None
        expire_at_pb = Timestamp()
        expire_at_pb.FromDatetime(expire_at)
        return expire_at_pb

    def push_blob(
        self,
        *,
        uris: Iterable[str],
        qualifiers: Mapping[str, str],
        blob_digest: Digest,
        expire_at: Optional[datetime] = None,
        referenced_blobs: Iterable[Digest] = (),
        referenced_directories: Iterable[Digest] = (),
    ) -> PushBlobResponse:
        """Send a ``PushBlob`` request through the retrier.

        Args:
            uris: URIs placed in the request's ``uris`` field.
            qualifiers: Name -> value pairs, sent as ``Qualifier`` messages.
            blob_digest: Digest placed in the request's ``blob_digest`` field.
            expire_at: Optional expiry, converted to a protobuf ``Timestamp``.
            referenced_blobs: Digests sent as ``references_blobs``.
            referenced_directories: Digests sent as ``references_directories``.

        Returns:
            Whatever ``GrpcRetrier.retry`` returns for the ``PushBlob`` call.
        """
        # Build the request once, outside the retried callable: the iterable
        # arguments may be one-shot (e.g. generators) and would otherwise be
        # empty if the retrier invokes the callable a second time.
        request = PushBlobRequest(
            instance_name=self._instance_name,
            uris=uris,
            qualifiers=self._qualifiers_to_pb(qualifiers),
            expire_at=self._expire_at_to_pb(expire_at),
            blob_digest=blob_digest,
            references_blobs=referenced_blobs,
            references_directories=referenced_directories,
        )

        def _push_blob() -> PushBlobResponse:
            # Metadata is still produced per attempt, as in the original code.
            return self._push_stub.PushBlob(request=request, metadata=metadata_list())

        return self._retrier.retry(_push_blob)

    def push_directory(
        self,
        *,
        uris: Iterable[str],
        qualifiers: Mapping[str, str],
        root_directory_digest: Digest,
        expire_at: Optional[datetime] = None,
        referenced_blobs: Iterable[Digest] = (),
        referenced_directories: Iterable[Digest] = (),
    ) -> PushDirectoryResponse:
        """Send a ``PushDirectory`` request through the retrier.

        Args:
            uris: URIs placed in the request's ``uris`` field.
            qualifiers: Name -> value pairs, sent as ``Qualifier`` messages.
            root_directory_digest: Digest placed in ``root_directory_digest``.
            expire_at: Optional expiry, converted to a protobuf ``Timestamp``.
            referenced_blobs: Digests sent as ``references_blobs``.
            referenced_directories: Digests sent as ``references_directories``.

        Returns:
            Whatever ``GrpcRetrier.retry`` returns for the ``PushDirectory`` call.
        """
        # Built once before retrying, for the same reason as in push_blob:
        # one-shot iterables must not be re-consumed on a later attempt.
        request = PushDirectoryRequest(
            instance_name=self._instance_name,
            uris=uris,
            qualifiers=self._qualifiers_to_pb(qualifiers),
            expire_at=self._expire_at_to_pb(expire_at),
            root_directory_digest=root_directory_digest,
            references_blobs=referenced_blobs,
            references_directories=referenced_directories,
        )

        def _push_directory() -> PushDirectoryResponse:
            # Metadata is still produced per attempt, as in the original code.
            return self._push_stub.PushDirectory(request=request, metadata=metadata_list())

        return self._retrier.retry(_push_directory)