Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/client/actioncache.py: 86.36%

44 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

1# Copyright (C) 2019 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16from contextlib import contextmanager 

17from typing import Iterator, Optional 

18 

19import grpc 

20 

21from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ( 

22 ActionResult, 

23 Digest, 

24 GetActionResultRequest, 

25 UpdateActionResultRequest, 

26) 

27from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2_grpc import ActionCacheStub 

28from buildgrid.server.client.retrier import GrpcRetrier 

29from buildgrid.server.exceptions import NotFoundError 

30from buildgrid.server.metadata import metadata_list 

31 

32 

class ActionCacheClient:
    """Thin client wrapper around a remote ActionCache gRPC service.

    Every lookup and update is dispatched through a :class:`GrpcRetrier`
    that is configured when the client is constructed. The module-level
    :func:`query` factory wraps an instance in a context manager that closes
    the client on exit::

        from buildgrid.server.client.actioncache import query

        with query(channel) as action_cache:
            action_result = action_cache.get("build", action_digest)
    """

    def __init__(
        self,
        channel: grpc.Channel,
        retries: int = 0,
        max_backoff: int = 64,
        should_backoff: bool = True,
    ) -> None:
        """Creates a client bound to ``channel``.

        Args:
            channel (grpc.Channel): a gRPC channel to the ActionCache endpoint.
            retries (int): passed to :class:`GrpcRetrier` as ``retries``.
            max_backoff (int): passed to :class:`GrpcRetrier` as ``max_backoff``.
            should_backoff (bool): passed to :class:`GrpcRetrier` as
                ``should_backoff``.
        """
        self._grpc_retrier = GrpcRetrier(
            retries=retries,
            max_backoff=max_backoff,
            should_backoff=should_backoff,
        )
        self.channel = channel
        self._stub = ActionCacheStub(channel)

    def get(self, instance_name: Optional[str], action_digest: Digest) -> Optional[ActionResult]:
        """Looks up the cached :obj:`ActionResult` for a given :obj:`Action`.

        The request is issued through the retrier. A :class:`NotFoundError`
        is reported to the caller as ``None`` instead of being propagated.
        """
        try:
            cached = self._grpc_retrier.retry(self._get, instance_name, action_digest)
        except NotFoundError:
            return None
        return cached

    def update(
        self, instance_name: Optional[str], action_digest: Digest, action_result: ActionResult
    ) -> Optional[ActionResult]:
        """Stores an :obj:`ActionResult` in the cache for a given :obj:`Action`.

        The request is issued through the retrier. A :class:`NotFoundError`
        is reported to the caller as ``None`` instead of being propagated.
        """
        try:
            stored = self._grpc_retrier.retry(self._update, instance_name, action_digest, action_result)
        except NotFoundError:
            return None
        return stored

    def _get(self, instance_name: Optional[str], action_digest: Digest) -> ActionResult:
        """Sends a single ``GetActionResult`` request to the remote service.

        Args:
            instance_name (str): the instance name of the action cache; a
                falsy value (``None`` or ``""``) is sent as the empty string.
            action_digest (:obj:`Digest`): the action's digest to query.

        Returns:
            :obj:`ActionResult`: the response returned by the stub.

        Raises:
            grpc.RpcError: on any network or remote service error.
        """
        request = GetActionResultRequest(instance_name=instance_name or "")
        request.action_digest.CopyFrom(action_digest)

        # Call metadata is rebuilt for each request via metadata_list().
        return self._stub.GetActionResult(request, metadata=metadata_list())

    def _update(
        self, instance_name: Optional[str], action_digest: Digest, action_result: ActionResult
    ) -> ActionResult:
        """Sends a single ``UpdateActionResult`` request to the remote service.

        Args:
            instance_name (str): the instance name of the action cache; a
                falsy value (``None`` or ``""``) is sent as the empty string.
            action_digest (:obj:`Digest`): the action's digest to update.
            action_result (:obj:`ActionResult`): the action's result.

        Returns:
            :obj:`ActionResult`: the response returned by the stub.

        Raises:
            grpc.RpcError: on any network or remote service error.
        """
        request = UpdateActionResultRequest(instance_name=instance_name or "")
        request.action_digest.CopyFrom(action_digest)
        request.action_result.CopyFrom(action_result)

        # Call metadata is rebuilt for each request via metadata_list().
        return self._stub.UpdateActionResult(request, metadata=metadata_list())

    def close(self) -> None:
        """Closes the gRPC channel this client was constructed with."""
        self.channel.close()

126 

127 

@contextmanager
def query(
    channel: grpc.Channel,
    retries: int = 0,
    max_backoff: int = 64,
    should_backoff: bool = True,
) -> Iterator[ActionCacheClient]:
    """Context manager factory yielding an :class:`ActionCacheClient`.

    A client is built from ``channel`` and the retry settings, yielded to the
    ``with`` body, and closed when the body exits, whether normally or through
    an exception. Closing the client closes ``channel`` itself.

    Args:
        channel (grpc.Channel): a gRPC channel to the ActionCache endpoint.
        retries (int): passed through to :class:`ActionCacheClient`.
        max_backoff (int): passed through to :class:`ActionCacheClient`.
        should_backoff (bool): passed through to :class:`ActionCacheClient`.

    Yields:
        ActionCacheClient: the client bound to ``channel``.
    """
    action_cache = ActionCacheClient(
        channel,
        retries=retries,
        max_backoff=max_backoff,
        should_backoff=should_backoff,
    )
    try:
        yield action_cache
    finally:
        # Runs on every exit path so the channel is always released.
        action_cache.close()