Coverage for /builds/BuildGrid/buildgrid/buildgrid/client/asset.py: 79.07%

43 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-06-11 15:37 +0000

1# Copyright (C) 2024 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15import logging 

16from datetime import datetime 

17from typing import Any, Iterable, Mapping, Optional 

18 

19import grpc 

20from google.protobuf.timestamp_pb2 import Timestamp 

21 

22import buildgrid.server.context as context_module 

23from buildgrid._protos.build.bazel.remote.asset.v1.remote_asset_pb2 import ( 

24 PushBlobRequest, 

25 PushBlobResponse, 

26 PushDirectoryRequest, 

27 PushDirectoryResponse, 

28 Qualifier, 

29) 

30from buildgrid._protos.build.bazel.remote.asset.v1.remote_asset_pb2_grpc import FetchStub, PushStub 

31from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest 

32from buildgrid.client.retrier import GrpcRetrier 

33 

34LOGGER = logging.getLogger(__name__) 

35 

36 

37class AssetClient: 

38 """Client for Fetch and Push services defined in remote_asset protocol""" 

39 

40 def __init__( 

41 self, 

42 channel: grpc.Channel, 

43 instance_name: str, 

44 retries: int = 0, 

45 max_backoff: int = 64, 

46 should_backoff: bool = True, 

47 ) -> None: 

48 self._channel = channel 

49 self._instance_name = instance_name 

50 self._push_stub = PushStub(channel) 

51 self._fetch_stub = FetchStub(channel) 

52 self._retrier = GrpcRetrier(retries=retries, max_backoff=max_backoff, should_backoff=should_backoff) 

53 

54 def __enter__(self) -> "AssetClient": 

55 return self 

56 

57 def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: 

58 self._channel.close() 

59 LOGGER.info("Stopped AssetClient") 

60 

61 def push_blob( 

62 self, 

63 *, 

64 uris: Iterable[str], 

65 qualifiers: Mapping[str, str], 

66 blob_digest: Digest, 

67 expire_at: Optional[datetime] = None, 

68 referenced_blobs: Iterable[Digest] = [], 

69 referenced_directories: Iterable[Digest] = [], 

70 ) -> PushBlobResponse: 

71 def _push_blob() -> PushBlobResponse: 

72 qualifiers_pb = [Qualifier(name=name, value=value) for name, value in qualifiers.items()] 

73 expire_at_pb: Optional[Timestamp] = None 

74 if expire_at is not None: 

75 expire_at_pb = Timestamp() 

76 expire_at_pb.FromDatetime(expire_at) 

77 

78 request = PushBlobRequest( 

79 instance_name=self._instance_name, 

80 uris=uris, 

81 qualifiers=qualifiers_pb, 

82 expire_at=expire_at_pb, 

83 blob_digest=blob_digest, 

84 references_blobs=referenced_blobs, 

85 references_directories=referenced_directories, 

86 ) 

87 return self._push_stub.PushBlob(request=request, metadata=context_module.metadata_list()) 

88 

89 return self._retrier.retry(_push_blob) 

90 

91 def push_directory( 

92 self, 

93 *, 

94 uris: Iterable[str], 

95 qualifiers: Mapping[str, str], 

96 root_directory_digest: Digest, 

97 expire_at: Optional[datetime] = None, 

98 referenced_blobs: Iterable[Digest] = [], 

99 referenced_directories: Iterable[Digest] = [], 

100 ) -> PushDirectoryResponse: 

101 def _push_directory() -> PushDirectoryResponse: 

102 qualifiers_pb = [Qualifier(name=name, value=value) for name, value in qualifiers.items()] 

103 expire_at_pb: Optional[Timestamp] = None 

104 if expire_at is not None: 

105 expire_at_pb = Timestamp() 

106 expire_at_pb.FromDatetime(expire_at) 

107 

108 request = PushDirectoryRequest( 

109 instance_name=self._instance_name, 

110 uris=uris, 

111 qualifiers=qualifiers_pb, 

112 expire_at=expire_at_pb, 

113 root_directory_digest=root_directory_digest, 

114 references_blobs=referenced_blobs, 

115 references_directories=referenced_directories, 

116 ) 

117 return self._push_stub.PushDirectory(request=request, metadata=context_module.metadata_list()) 

118 

119 return self._retrier.retry(_push_directory)