Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/client/actioncache.py: 86.36%
44 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
1# Copyright (C) 2019 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16from contextlib import contextmanager
17from typing import Iterator, Optional
19import grpc
21from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import (
22 ActionResult,
23 Digest,
24 GetActionResultRequest,
25 UpdateActionResultRequest,
26)
27from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2_grpc import ActionCacheStub
28from buildgrid.server.client.retrier import GrpcRetrier
29from buildgrid.server.exceptions import NotFoundError
30from buildgrid.server.metadata import metadata_list
class ActionCacheClient:
    """Remote ActionCache service client helper.

    Wraps an :class:`ActionCacheStub` bound to a caller-supplied gRPC channel
    and routes every request through a :class:`GrpcRetrier`.

    The module-level :func:`query` factory can be used together with the
    `with` statement so that :meth:`close` is called on exit::

        from buildgrid.server.client.actioncache import query

        with query(channel) as action_cache:
            action_result = action_cache.get('build', action_digest)
    """

    def __init__(
        self,
        channel: grpc.Channel,
        retries: int = 0,
        max_backoff: int = 64,
        should_backoff: bool = True,
    ) -> None:
        """Initializes a new :class:`ActionCacheClient` instance.

        Args:
            channel (grpc.Channel): a gRPC channel to the ActionCache endpoint.
            retries (int): forwarded to :class:`GrpcRetrier` as ``retries``.
            max_backoff (int): forwarded to :class:`GrpcRetrier` as
                ``max_backoff``.
            should_backoff (bool): forwarded to :class:`GrpcRetrier` as
                ``should_backoff``.
        """
        self._grpc_retrier = GrpcRetrier(retries=retries, max_backoff=max_backoff, should_backoff=should_backoff)
        self.channel = channel
        self._stub = ActionCacheStub(self.channel)

    def get(self, instance_name: Optional[str], action_digest: Digest) -> Optional[ActionResult]:
        """Attempt to retrieve the cached :obj:`ActionResult` for a given :obj:`Action`.

        Args:
            instance_name (str): the instance name of the action cache; a
                falsy value is sent as an empty string.
            action_digest (:obj:`Digest`): the action's digest to query.

        Returns:
            :obj:`ActionResult`: the result of the retried lookup, or None
            when the call raises :class:`NotFoundError`.
        """
        try:
            return self._grpc_retrier.retry(self._get, instance_name, action_digest)
        except NotFoundError:
            # A cache miss is reported to callers as None, not as an error.
            return None

    def update(
        self, instance_name: Optional[str], action_digest: Digest, action_result: ActionResult
    ) -> Optional[ActionResult]:
        """Attempt to map in cache an :obj:`Action` to an :obj:`ActionResult`.

        Args:
            instance_name (str): the instance name of the action cache; a
                falsy value is sent as an empty string.
            action_digest (:obj:`Digest`): the action's digest to update.
            action_result (:obj:`ActionResult`): the action's result.

        Returns:
            :obj:`ActionResult`: the result of the retried update, or None
            when the call raises :class:`NotFoundError`.
        """
        try:
            return self._grpc_retrier.retry(self._update, instance_name, action_digest, action_result)
        except NotFoundError:
            return None

    def _get(self, instance_name: Optional[str], action_digest: Digest) -> ActionResult:
        """Retrieves the cached :obj:`ActionResult` for a given :obj:`Action`.

        This is the single-attempt call that :meth:`get` hands to the retrier.

        Args:
            instance_name (str): the instance name of the action cache.
            action_digest (:obj:`Digest`): the action's digest to query.

        Returns:
            :obj:`ActionResult`: the response of the ``GetActionResult`` call.

        Raises:
            grpc.RpcError: on any network or remote service error.
        """
        request = GetActionResultRequest()
        # The request field is a plain string, so None is sent as "".
        request.instance_name = instance_name or ""
        request.action_digest.CopyFrom(action_digest)

        return self._stub.GetActionResult(request, metadata=metadata_list())

    def _update(
        self, instance_name: Optional[str], action_digest: Digest, action_result: ActionResult
    ) -> ActionResult:
        """Maps in cache an :obj:`Action` to an :obj:`ActionResult`.

        This is the single-attempt call that :meth:`update` hands to the
        retrier.

        Args:
            instance_name (str): the instance name of the action cache.
            action_digest (:obj:`Digest`): the action's digest to update.
            action_result (:obj:`ActionResult`): the action's result.

        Returns:
            :obj:`ActionResult`: the response of the ``UpdateActionResult``
            call.

        Raises:
            grpc.RpcError: on any network or remote service error.
        """
        request = UpdateActionResultRequest()
        # The request field is a plain string, so None is sent as "".
        request.instance_name = instance_name or ""
        request.action_digest.CopyFrom(action_digest)
        request.action_result.CopyFrom(action_result)

        return self._stub.UpdateActionResult(request, metadata=metadata_list())

    def close(self) -> None:
        """Closes the gRPC channel this client was constructed with."""
        # NOTE: this is the caller-supplied channel object, not a private copy.
        self.channel.close()
@contextmanager
def query(
    channel: grpc.Channel,
    retries: int = 0,
    max_backoff: int = 64,
    should_backoff: bool = True,
) -> Iterator[ActionCacheClient]:
    """Yield an :class:`ActionCacheClient` and close it when the block ends.

    Args:
        channel (grpc.Channel): passed to :class:`ActionCacheClient`.
        retries (int): passed to :class:`ActionCacheClient` as ``retries``.
        max_backoff (int): passed to :class:`ActionCacheClient` as
            ``max_backoff``.
        should_backoff (bool): passed to :class:`ActionCacheClient` as
            ``should_backoff``.

    Yields:
        ActionCacheClient: the client built from the arguments above.
    """
    retry_options = {
        "retries": retries,
        "max_backoff": max_backoff,
        "should_backoff": should_backoff,
    }
    action_cache = ActionCacheClient(channel, **retry_options)
    try:
        yield action_cache
    finally:
        # Runs on normal exit and when the ``with`` body raises.
        action_cache.close()