Coverage for /builds/BuildGrid/buildgrid/buildgrid/client/actioncache.py: 87.23%

47 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-06-11 15:37 +0000

1# Copyright (C) 2019 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16from contextlib import contextmanager 

17from typing import Iterator, Optional 

18 

19import grpc 

20 

21import buildgrid.server.context as context_module 

22from buildgrid._exceptions import NotFoundError 

23from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ( 

24 ActionResult, 

25 Digest, 

26 GetActionResultRequest, 

27 UpdateActionResultRequest, 

28) 

29from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2_grpc import ActionCacheStub 

30from buildgrid.client.retrier import GrpcRetrier 

31 

32 

class ActionCacheClient:
    """Client-side helper for talking to a remote ActionCache service.

    Instances are usually obtained through the :func:`query` context manager
    defined in this module, which closes the client when the ``with`` block
    exits::

        from buildgrid.client.actioncache import query

        with query(channel, instance='build') as action_cache:
            action_result = action_cache.get(action_digest)
    """

    def __init__(
        self,
        channel: grpc.Channel,
        instance: Optional[str] = None,
        retries: int = 0,
        max_backoff: int = 64,
        should_backoff: bool = True,
    ) -> None:
        """Initializes a new :class:`ActionCacheClient` instance.

        Args:
            channel (grpc.Channel): a gRPC channel to the ActionCache endpoint.
            instance (str, optional): the targeted instance's name. When unset
                or empty, requests are sent without an instance name.
            retries (int): forwarded unchanged to :class:`GrpcRetrier`.
            max_backoff (int): forwarded unchanged to :class:`GrpcRetrier`.
            should_backoff (bool): forwarded unchanged to :class:`GrpcRetrier`.
        """
        self.channel = channel
        self.instance_name = instance

        # Every RPC issued by the public API is routed through this retrier.
        self._grpc_retrier = GrpcRetrier(
            retries=retries,
            max_backoff=max_backoff,
            should_backoff=should_backoff,
        )

        self.__actioncache_stub = ActionCacheStub(self.channel)

    # --- Public API ---

    def get(self, action_digest: Digest) -> Optional[ActionResult]:
        """Attempt to retrieve the cached :obj:`ActionResult` for a given :obj:`Action`.

        Args:
            action_digest (:obj:`Digest`): the action's digest to query.

        Returns:
            :obj:`ActionResult`: the cached result, or None when the retried
            call raises :class:`NotFoundError`.
        """
        cached: Optional[ActionResult]
        try:
            cached = self._grpc_retrier.retry(self._get, action_digest)
        except NotFoundError:
            cached = None
        return cached

    def update(self, action_digest: Digest, action_result: ActionResult) -> Optional[ActionResult]:
        """Attempt to map in cache an :obj:`Action` to an :obj:`ActionResult`.

        Args:
            action_digest (:obj:`Digest`): the action's digest to update.
            action_result (:obj:`ActionResult`): the action's result.

        Returns:
            :obj:`ActionResult`: the result returned by the service, or None
            when the retried call raises :class:`NotFoundError`.
        """
        stored: Optional[ActionResult]
        try:
            stored = self._grpc_retrier.retry(self._update, action_digest, action_result)
        except NotFoundError:
            stored = None
        return stored

    def _get(self, action_digest: Digest) -> ActionResult:
        """Issues a single ``GetActionResult`` call for the given digest.

        Args:
            action_digest (:obj:`Digest`): the action's digest to query.

        Returns:
            :obj:`ActionResult`: the response returned by the stub.

        Raises:
            grpc.RpcError: on any network or remote service error.
        """
        request = GetActionResultRequest()
        request.action_digest.CopyFrom(action_digest)
        # Only set the instance name when one was configured (non-empty).
        if self.instance_name:
            request.instance_name = self.instance_name

        return self.__actioncache_stub.GetActionResult(
            request,
            metadata=context_module.metadata_list(),
        )

    def _update(self, action_digest: Digest, action_result: ActionResult) -> ActionResult:
        """Issues a single ``UpdateActionResult`` call for the given digest.

        Args:
            action_digest (:obj:`Digest`): the action's digest to update.
            action_result (:obj:`ActionResult`): the action's result.

        Returns:
            :obj:`ActionResult`: the response returned by the stub.

        Raises:
            grpc.RpcError: on any network or remote service error.
        """
        request = UpdateActionResultRequest()
        request.action_digest.CopyFrom(action_digest)
        request.action_result.CopyFrom(action_result)
        # Only set the instance name when one was configured (non-empty).
        if self.instance_name:
            request.instance_name = self.instance_name

        return self.__actioncache_stub.UpdateActionResult(
            request,
            metadata=context_module.metadata_list(),
        )

    def close(self) -> None:
        """Closes the gRPC channel this client was constructed with."""
        self.channel.close()

129 

130 

@contextmanager
def query(
    channel: grpc.Channel,
    instance: Optional[str] = None,
    retries: int = 0,
    max_backoff: int = 64,
    should_backoff: bool = True,
) -> Iterator[ActionCacheClient]:
    """Context manager yielding an :class:`ActionCacheClient`.

    All arguments are passed straight through to the
    :class:`ActionCacheClient` constructor.

    Args:
        channel (grpc.Channel): a gRPC channel to the ActionCache endpoint.
        instance (str, optional): the targeted instance's name.
        retries (int): passed to the client constructor.
        max_backoff (int): passed to the client constructor.
        should_backoff (bool): passed to the client constructor.

    Yields:
        :class:`ActionCacheClient`: a client bound to ``channel``. Its
        ``close()`` method is called when the ``with`` block exits.
    """
    action_cache = ActionCacheClient(
        channel,
        instance=instance,
        retries=retries,
        max_backoff=max_backoff,
        should_backoff=should_backoff,
    )
    try:
        yield action_cache
    finally:
        # Runs on normal exit and when the with-body raises.
        action_cache.close()