Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/actioncache/service.py: 65.71%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

70 statements  

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17ActionCacheService 

18================== 

19 

20Allows clients to manually query/update the action cache. 

21""" 

22 

23import logging 

24 

25import grpc 

26 

27from buildgrid._exceptions import InvalidArgumentError, NotFoundError, StorageFullError, RetriableError 

28from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 

29from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc 

30from buildgrid.server._authentication import AuthContext, authorize 

31from buildgrid.server.metrics_names import ( 

32 AC_GET_ACTION_RESULT_TIME_METRIC_NAME, 

33 AC_UPDATE_ACTION_RESULT_TIME_METRIC_NAME, 

34 AC_CACHE_HITS_METRIC_NAME, 

35 AC_CACHE_MISSES_METRIC_NAME 

36) 

37from buildgrid.server.metrics_utils import publish_counter_metric, DurationMetric 

38from buildgrid.server.request_metadata_utils import printable_request_metadata 

39 

40 

41class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer): 

42 

43 def __init__(self, server): 

44 self.__logger = logging.getLogger(__name__) 

45 

46 self._instances = {} 

47 

48 remote_execution_pb2_grpc.add_ActionCacheServicer_to_server(self, server) 

49 

50 # --- Public API --- 

51 

52 def add_instance(self, name, instance): 

53 self._instances[name] = instance 

54 

55 # --- Public API: Servicer --- 

56 

57 @authorize(AuthContext) 

58 def GetActionResult(self, request, context): 

59 self.__logger.info(f"GetActionResult request from [{context.peer()}] " 

60 f"([{printable_request_metadata(context.invocation_metadata())}])") 

61 

62 try: 

63 instance = self._get_instance(request.instance_name) 

64 with DurationMetric(AC_GET_ACTION_RESULT_TIME_METRIC_NAME, 

65 request.instance_name, 

66 instanced=True): 

67 action_result = instance.get_action_result(request.action_digest) 

68 publish_counter_metric( 

69 AC_CACHE_HITS_METRIC_NAME, 

70 1, 

71 {"instance-name": request.instance_name} 

72 ) 

73 return action_result 

74 

75 except InvalidArgumentError as e: 

76 self.__logger.info(e) 

77 context.set_details(str(e)) 

78 context.set_code(grpc.StatusCode.INVALID_ARGUMENT) 

79 

80 except NotFoundError as e: 

81 self.__logger.debug(e) 

82 context.set_code(grpc.StatusCode.NOT_FOUND) 

83 publish_counter_metric( 

84 AC_CACHE_MISSES_METRIC_NAME, 

85 1, 

86 {"instance-name": request.instance_name} 

87 ) 

88 

89 except RetriableError as e: 

90 self.__logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}") 

91 context.abort_with_status(e.error_status) 

92 

93 except Exception as e: 

94 self.__logger.exception( 

95 f"Unexpected error in GetActionResult; request=[{request}]" 

96 ) 

97 context.set_code(grpc.StatusCode.INTERNAL) 

98 

99 return remote_execution_pb2.ActionResult() 

100 

101 @authorize(AuthContext) 

102 def UpdateActionResult(self, request, context): 

103 self.__logger.info(f"UpdateActionResult request from [{context.peer()}] " 

104 f"([{printable_request_metadata(context.invocation_metadata())}])") 

105 

106 try: 

107 instance = self._get_instance(request.instance_name) 

108 with DurationMetric(AC_UPDATE_ACTION_RESULT_TIME_METRIC_NAME, 

109 request.instance_name, 

110 instanced=True): 

111 instance.update_action_result(request.action_digest, request.action_result) 

112 return request.action_result 

113 

114 except InvalidArgumentError as e: 

115 self.__logger.info(e) 

116 context.set_details(str(e)) 

117 context.set_code(grpc.StatusCode.INVALID_ARGUMENT) 

118 

119 except NotImplementedError as e: 

120 self.__logger.info(e) 

121 context.set_code(grpc.StatusCode.UNIMPLEMENTED) 

122 

123 except StorageFullError as e: 

124 self.__logger.exception(e) 

125 context.set_details(str(e)) 

126 context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED) 

127 

128 except RetriableError as e: 

129 self.__logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}") 

130 context.abort_with_status(e.error_status) 

131 

132 except Exception as e: 

133 self.__logger.exception( 

134 f"Unexpected error in UpdateActionResult; request=[{request}]" 

135 ) 

136 context.set_code(grpc.StatusCode.INTERNAL) 

137 

138 return remote_execution_pb2.ActionResult() 

139 

140 # --- Private API --- 

141 

142 def _get_instance(self, instance_name): 

143 try: 

144 return self._instances[instance_name] 

145 

146 except KeyError: 

147 raise InvalidArgumentError(f"Invalid instance name: [{instance_name}]")