Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/client/retrier.py: 73.17%

41 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

1# Copyright (C) 2021-2022 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16import random 

17import time 

18from typing import Any, Callable, TypeVar 

19 

20import grpc 

21 

22from buildgrid.server.exceptions import ( 

23 CancelledError, 

24 FailedPreconditionError, 

25 InvalidArgumentError, 

26 NotFoundError, 

27 PermissionDeniedError, 

28 StorageFullError, 

29) 

30 

# Return type of the callable wrapped by GrpcRetrier.retry.
T = TypeVar("T")


class GrpcRetrier:
    """Call a function, retrying transient gRPC failures with exponential backoff."""

    def __init__(self, retries: int, max_backoff: int = 64, should_backoff: bool = True):
        """Initializes a new :class:`GrpcRetrier`.

        Args:
            retries (int): The maximum number of retries after the initial
                attempt; the wrapped function is called at most
                ``retries + 1`` times.
            max_backoff (int): The maximum time, in seconds, to wait between retries.
            should_backoff (bool): Whether to backoff at all. Always set this to True except in tests.
        """

        self._retries = retries
        self._max_backoff = max_backoff
        self._should_backoff = should_backoff

    def _backoff_delay(self, attempts: int) -> float:
        """Return the number of seconds to sleep after ``attempts`` failed calls.

        The delay is ``2^(attempts - 1)`` plus up to one second of random
        jitter, capped at ``max_backoff``.
        """
        base = pow(2, attempts - 1)
        # Compare the integer base against the cap before adding the float
        # jitter: for very large attempt counts ``base + jitter`` would raise
        # OverflowError (int too large to convert to float), whereas an
        # int comparison is always safe.
        if base >= self._max_backoff:
            return self._max_backoff
        return min(base + random.uniform(0, 1), self._max_backoff)

    def retry(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
        """Call ``func(*args, **kwargs)``, retrying on transient gRPC errors.

        Only ``UNAVAILABLE`` and ``ABORTED`` status codes are retried. Other
        known status codes are translated into the matching exception, and
        anything else is re-raised unchanged.

        Args:
            func: The callable to invoke.
            *args: Positional arguments forwarded to ``func``.
            **kwargs: Keyword arguments forwarded to ``func``.

        Returns:
            Whatever ``func`` returns on its first successful call.

        Raises:
            ConnectionError: The retry budget was exhausted.
            CancelledError: The call failed with ``CANCELLED``.
            InvalidArgumentError: The call failed with ``INVALID_ARGUMENT``.
            TimeoutError: The call failed with ``DEADLINE_EXCEEDED``.
            NotFoundError: The call failed with ``NOT_FOUND``.
            PermissionDeniedError: The call failed with ``PERMISSION_DENIED``.
            StorageFullError: The call failed with ``RESOURCE_EXHAUSTED``.
            FailedPreconditionError: The call failed with ``FAILED_PRECONDITION``.
            NotImplementedError: The call failed with ``UNIMPLEMENTED``.
            grpc.RpcError: The call failed with any other status code.
        """
        attempts = 0
        while True:
            try:
                return func(*args, **kwargs)
            except grpc.RpcError as e:
                status_code = e.code()

                # Retry only on UNAVAILABLE and ABORTED
                if status_code in (grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.ABORTED):
                    attempts += 1
                    if attempts > self._retries:
                        raise ConnectionError(e.details()) from e
                    if self._should_backoff:
                        time.sleep(self._backoff_delay(attempts))

                elif status_code == grpc.StatusCode.CANCELLED:
                    raise CancelledError(e.details()) from e

                elif status_code == grpc.StatusCode.INVALID_ARGUMENT:
                    raise InvalidArgumentError(e.details()) from e

                elif status_code == grpc.StatusCode.DEADLINE_EXCEEDED:
                    raise TimeoutError(e.details()) from e

                elif status_code == grpc.StatusCode.NOT_FOUND:
                    raise NotFoundError("Requested data does not exist on remote") from e

                elif status_code == grpc.StatusCode.PERMISSION_DENIED:
                    raise PermissionDeniedError(e.details()) from e

                elif status_code == grpc.StatusCode.RESOURCE_EXHAUSTED:
                    raise StorageFullError(e.details()) from e

                elif status_code == grpc.StatusCode.FAILED_PRECONDITION:
                    raise FailedPreconditionError(e.details()) from e

                elif status_code == grpc.StatusCode.UNIMPLEMENTED:
                    raise NotImplementedError(e.details()) from e

                else:
                    # Unmapped status code: propagate the original RpcError.
                    raise