Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/client/asset.py: 79.07%

43 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

1# Copyright (C) 2024 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16from datetime import datetime 

17from typing import Any, Iterable, Mapping, Optional 

18 

19import grpc 

20from google.protobuf.timestamp_pb2 import Timestamp 

21 

22from buildgrid._protos.build.bazel.remote.asset.v1.remote_asset_pb2 import ( 

23 PushBlobRequest, 

24 PushBlobResponse, 

25 PushDirectoryRequest, 

26 PushDirectoryResponse, 

27 Qualifier, 

28) 

29from buildgrid._protos.build.bazel.remote.asset.v1.remote_asset_pb2_grpc import FetchStub, PushStub 

30from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest 

31from buildgrid.server.client.retrier import GrpcRetrier 

32from buildgrid.server.logging import buildgrid_logger 

33from buildgrid.server.metadata import metadata_list 

34 

35LOGGER = buildgrid_logger(__name__) 

36 

37 

38class AssetClient: 

39 """Client for Fetch and Push services defined in remote_asset protocol""" 

40 

41 def __init__( 

42 self, 

43 channel: grpc.Channel, 

44 instance_name: str, 

45 retries: int = 0, 

46 max_backoff: int = 64, 

47 should_backoff: bool = True, 

48 ) -> None: 

49 self._channel = channel 

50 self._instance_name = instance_name 

51 self._push_stub = PushStub(channel) 

52 self._fetch_stub = FetchStub(channel) 

53 self._retrier = GrpcRetrier(retries=retries, max_backoff=max_backoff, should_backoff=should_backoff) 

54 

55 def __enter__(self) -> "AssetClient": 

56 return self 

57 

58 def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: 

59 self._channel.close() 

60 LOGGER.info("Stopped AssetClient.") 

61 

62 def push_blob( 

63 self, 

64 *, 

65 uris: Iterable[str], 

66 qualifiers: Mapping[str, str], 

67 blob_digest: Digest, 

68 expire_at: Optional[datetime] = None, 

69 referenced_blobs: Iterable[Digest] = [], 

70 referenced_directories: Iterable[Digest] = [], 

71 ) -> PushBlobResponse: 

72 def _push_blob() -> PushBlobResponse: 

73 qualifiers_pb = [Qualifier(name=name, value=value) for name, value in qualifiers.items()] 

74 expire_at_pb: Optional[Timestamp] = None 

75 if expire_at is not None: 

76 expire_at_pb = Timestamp() 

77 expire_at_pb.FromDatetime(expire_at) 

78 

79 request = PushBlobRequest( 

80 instance_name=self._instance_name, 

81 uris=uris, 

82 qualifiers=qualifiers_pb, 

83 expire_at=expire_at_pb, 

84 blob_digest=blob_digest, 

85 references_blobs=referenced_blobs, 

86 references_directories=referenced_directories, 

87 ) 

88 return self._push_stub.PushBlob(request=request, metadata=metadata_list()) 

89 

90 return self._retrier.retry(_push_blob) 

91 

92 def push_directory( 

93 self, 

94 *, 

95 uris: Iterable[str], 

96 qualifiers: Mapping[str, str], 

97 root_directory_digest: Digest, 

98 expire_at: Optional[datetime] = None, 

99 referenced_blobs: Iterable[Digest] = [], 

100 referenced_directories: Iterable[Digest] = [], 

101 ) -> PushDirectoryResponse: 

102 def _push_directory() -> PushDirectoryResponse: 

103 qualifiers_pb = [Qualifier(name=name, value=value) for name, value in qualifiers.items()] 

104 expire_at_pb: Optional[Timestamp] = None 

105 if expire_at is not None: 

106 expire_at_pb = Timestamp() 

107 expire_at_pb.FromDatetime(expire_at) 

108 

109 request = PushDirectoryRequest( 

110 instance_name=self._instance_name, 

111 uris=uris, 

112 qualifiers=qualifiers_pb, 

113 expire_at=expire_at_pb, 

114 root_directory_digest=root_directory_digest, 

115 references_blobs=referenced_blobs, 

116 references_directories=referenced_directories, 

117 ) 

118 return self._push_stub.PushDirectory(request=request, metadata=metadata_list()) 

119 

120 return self._retrier.retry(_push_directory)