Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/actioncache/service.py: 65.33%

75 statements  

« prev     ^ index     » next       coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17ActionCacheService 

18================== 

19 

20Allows clients to manually query/update the action cache. 

21""" 

22 

23import logging 

24 

25import grpc 

26 

27import buildgrid.server.context as context_module 

28 

29from buildgrid._exceptions import InvalidArgumentError, NotFoundError, StorageFullError, RetriableError 

30from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 

31from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc 

32from buildgrid.server._authentication import AuthContext, authorize 

33from buildgrid.server.metrics_names import ( 

34 AC_GET_ACTION_RESULT_TIME_METRIC_NAME, 

35 AC_UPDATE_ACTION_RESULT_TIME_METRIC_NAME, 

36 AC_CACHE_HITS_METRIC_NAME, 

37 AC_CACHE_MISSES_METRIC_NAME 

38) 

39from buildgrid.server.metrics_utils import publish_counter_metric, DurationMetric 

40from buildgrid.server.request_metadata_utils import printable_request_metadata 

41 

42 

43class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer): 

44 

45 def __init__(self, server): 

46 self.__logger = logging.getLogger(__name__) 

47 

48 self._instances = {} 

49 

50 remote_execution_pb2_grpc.add_ActionCacheServicer_to_server(self, server) 

51 

52 # --- Public API --- 

53 

54 def add_instance(self, name, instance): 

55 self._instances[name] = instance 

56 

57 # --- Public API: Servicer --- 

58 

59 @context_module.metadatacontext() 

60 @authorize(AuthContext) 

61 def GetActionResult(self, request, context): 

62 self.__logger.info(f"GetActionResult request from [{context.peer()}] " 

63 f"([{printable_request_metadata(context.invocation_metadata())}])") 

64 

65 try: 

66 instance = self._get_instance(request.instance_name) 

67 with DurationMetric(AC_GET_ACTION_RESULT_TIME_METRIC_NAME, 

68 request.instance_name, 

69 instanced=True): 

70 action_result = instance.get_action_result(request.action_digest) 

71 publish_counter_metric( 

72 AC_CACHE_HITS_METRIC_NAME, 

73 1, 

74 {"instance-name": request.instance_name} 

75 ) 

76 return action_result 

77 

78 except InvalidArgumentError as e: 

79 self.__logger.info(e) 

80 context.set_details(str(e)) 

81 context.set_code(grpc.StatusCode.INVALID_ARGUMENT) 

82 

83 except NotFoundError as e: 

84 self.__logger.debug(e) 

85 context.set_code(grpc.StatusCode.NOT_FOUND) 

86 publish_counter_metric( 

87 AC_CACHE_MISSES_METRIC_NAME, 

88 1, 

89 {"instance-name": request.instance_name} 

90 ) 

91 

92 except RetriableError as e: 

93 self.__logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}") 

94 context.abort_with_status(e.error_status) 

95 

96 except Exception as e: 

97 self.__logger.exception( 

98 f"Unexpected error in GetActionResult; request=[{request}]" 

99 ) 

100 context.set_details(str(e)) 

101 context.set_code(grpc.StatusCode.INTERNAL) 

102 

103 return remote_execution_pb2.ActionResult() 

104 

105 @context_module.metadatacontext() 

106 @authorize(AuthContext) 

107 def UpdateActionResult(self, request, context): 

108 self.__logger.info(f"UpdateActionResult request from [{context.peer()}] " 

109 f"([{printable_request_metadata(context.invocation_metadata())}])") 

110 

111 try: 

112 instance = self._get_instance(request.instance_name) 

113 with DurationMetric(AC_UPDATE_ACTION_RESULT_TIME_METRIC_NAME, 

114 request.instance_name, 

115 instanced=True): 

116 instance.update_action_result(request.action_digest, request.action_result) 

117 return request.action_result 

118 

119 except InvalidArgumentError as e: 

120 self.__logger.info(e) 

121 context.set_details(str(e)) 

122 context.set_code(grpc.StatusCode.INVALID_ARGUMENT) 

123 

124 except NotImplementedError as e: 

125 self.__logger.info(e) 

126 context.set_code(grpc.StatusCode.UNIMPLEMENTED) 

127 

128 except StorageFullError as e: 

129 self.__logger.exception(e) 

130 context.set_details(str(e)) 

131 context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED) 

132 

133 except RetriableError as e: 

134 self.__logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}") 

135 context.abort_with_status(e.error_status) 

136 

137 except Exception as e: 

138 self.__logger.exception( 

139 f"Unexpected error in UpdateActionResult; request=[{request}]" 

140 ) 

141 context.set_details(str(e)) 

142 context.set_code(grpc.StatusCode.INTERNAL) 

143 

144 return remote_execution_pb2.ActionResult() 

145 

146 # --- Private API --- 

147 

148 def _get_instance(self, instance_name): 

149 try: 

150 return self._instances[instance_name] 

151 

152 except KeyError: 

153 raise InvalidArgumentError(f"Invalid instance name: [{instance_name}]")