Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/client/retrier.py: 73.17%
41 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
1# Copyright (C) 2021-2022 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16import random
17import time
18from typing import Any, Callable, TypeVar
20import grpc
22from buildgrid.server.exceptions import (
23 CancelledError,
24 FailedPreconditionError,
25 InvalidArgumentError,
26 NotFoundError,
27 PermissionDeniedError,
28 StorageFullError,
29)
# Generic return type of the callable passed to ``GrpcRetrier.retry``.
T = TypeVar("T")


class GrpcRetrier:
    """Invoke a callable, retrying transient gRPC failures with exponential backoff.

    Calls failing with ``UNAVAILABLE`` or ``ABORTED`` are retried. A handful of
    other well-known status codes are translated into Python / BuildGrid
    exceptions, and any remaining ``grpc.RpcError`` is re-raised unchanged.
    """

    def __init__(self, retries: int, max_backoff: int = 64, should_backoff: bool = True):
        """Initializes a new :class:`GrpcRetrier`.

        Args:
            retries (int): The maximum number of *retries* for each RPC call after
                the initial attempt, i.e. a call is made at most ``retries + 1``
                times. ``0`` disables retrying.
            max_backoff (int): The maximum time, in seconds, to wait between retries.
            should_backoff (bool): Whether to backoff at all. Always set this to True except in tests.
        """
        self._retries = retries
        self._max_backoff = max_backoff
        self._should_backoff = should_backoff

    def retry(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
        """Call ``func(*args, **kwargs)``, retrying on transient gRPC errors.

        Args:
            func: The callable to invoke. Any ``grpc.RpcError`` it raises is
                handled as described under *Raises*.
            *args: Positional arguments forwarded to ``func``.
            **kwargs: Keyword arguments forwarded to ``func``.

        Returns:
            The value returned by the first successful call to ``func``.

        Raises:
            ConnectionError: ``UNAVAILABLE`` / ``ABORTED`` persisted for more
                than ``retries`` retries.
            CancelledError: the call failed with ``CANCELLED``.
            InvalidArgumentError: the call failed with ``INVALID_ARGUMENT``.
            TimeoutError: the call failed with ``DEADLINE_EXCEEDED``.
            NotFoundError: the call failed with ``NOT_FOUND``.
            PermissionDeniedError: the call failed with ``PERMISSION_DENIED``.
            StorageFullError: the call failed with ``RESOURCE_EXHAUSTED``.
            FailedPreconditionError: the call failed with ``FAILED_PRECONDITION``.
            NotImplementedError: the call failed with ``UNIMPLEMENTED``.
            grpc.RpcError: any other status code is re-raised unchanged.
        """
        # Number of retryable (UNAVAILABLE/ABORTED) failures seen so far.
        attempts = 0
        while True:
            try:
                return func(*args, **kwargs)
            except grpc.RpcError as e:
                status_code = e.code()

                # Retry only on UNAVAILABLE and ABORTED
                if status_code in (grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.ABORTED):
                    attempts += 1
                    if attempts > self._retries:
                        raise ConnectionError(e.details()) from e
                    if self._should_backoff:
                        # Sleep for min(2^(N-1) + jitter, max_backoff) seconds, where N is
                        # the number of failed attempts so far. The power of two is clamped
                        # to max_backoff *before* adding the float jitter: for N > 1024 the
                        # int 2^(N-1) cannot be converted to float and the addition would
                        # raise OverflowError. Clamping first yields the same delay.
                        jitter = random.uniform(0, 1)
                        base_delay = min(pow(2, attempts - 1), self._max_backoff)
                        time.sleep(min(base_delay + jitter, self._max_backoff))

                elif status_code == grpc.StatusCode.CANCELLED:
                    raise CancelledError(e.details()) from e

                elif status_code == grpc.StatusCode.INVALID_ARGUMENT:
                    raise InvalidArgumentError(e.details()) from e

                elif status_code == grpc.StatusCode.DEADLINE_EXCEEDED:
                    raise TimeoutError(e.details()) from e

                elif status_code == grpc.StatusCode.NOT_FOUND:
                    raise NotFoundError("Requested data does not exist on remote") from e

                elif status_code == grpc.StatusCode.PERMISSION_DENIED:
                    raise PermissionDeniedError(e.details()) from e

                elif status_code == grpc.StatusCode.RESOURCE_EXHAUSTED:
                    raise StorageFullError(e.details()) from e

                elif status_code == grpc.StatusCode.FAILED_PRECONDITION:
                    raise FailedPreconditionError(e.details()) from e

                elif status_code == grpc.StatusCode.UNIMPLEMENTED:
                    raise NotImplementedError(e.details()) from e

                else:
                    raise