Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17ActionCacheService 

18================== 

19 

20Allows clients to manually query/update the action cache. 

21""" 

22 

23import logging 

24 

25import grpc 

26 

27from buildgrid._exceptions import InvalidArgumentError, NotFoundError, StorageFullError 

28from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 

29from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc 

30from buildgrid.server._authentication import AuthContext, authorize 

31from buildgrid.server.metrics_names import ( 

32 AC_GET_ACTION_RESULT_TIME_METRIC_NAME, 

33 AC_UPDATE_ACTION_RESULT_TIME_METRIC_NAME, 

34 AC_CACHE_HITS_METRIC_NAME, 

35 AC_CACHE_MISSES_METRIC_NAME 

36) 

37from buildgrid.server.metrics_utils import publish_counter_metric, DurationMetric 

38from buildgrid.server.request_metadata_utils import printable_request_metadata 

39 

40 

class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):
    """gRPC servicer that routes ActionCache requests to named instances.

    Each request is dispatched on ``request.instance_name`` to an object
    previously registered with :meth:`add_instance`. Exceptions raised while
    serving a request are not propagated to gRPC; they are translated into a
    status code on the call context and an empty ``ActionResult`` is returned.
    """

    def __init__(self, server):
        """Create the service and attach it to ``server``.

        Args:
            server: The gRPC server this servicer is added to through
                ``add_ActionCacheServicer_to_server``.
        """
        self.__logger = logging.getLogger(__name__)

        # Registered action cache instances, keyed by instance name.
        self._instances = {}

        remote_execution_pb2_grpc.add_ActionCacheServicer_to_server(self, server)

    # --- Public API ---

    def add_instance(self, name, instance):
        """Register ``instance`` under ``name``.

        A later registration under the same name replaces the earlier one.

        Args:
            name: Instance name that requests use to select this instance.
            instance: Object providing ``get_action_result`` and
                ``update_action_result``.
        """
        self._instances[name] = instance

    # --- Public API: Servicer ---

    @authorize(AuthContext)
    def GetActionResult(self, request, context):
        """Look up the cached result for ``request.action_digest``.

        Publishes a cache-hit counter on success and a cache-miss counter
        when the instance raises ``NotFoundError``.

        Args:
            request: Request carrying ``instance_name`` and ``action_digest``.
            context: gRPC call context; receives the status code (and, for
                invalid arguments, the details) on failure.

        Returns:
            The value returned by the instance's ``get_action_result`` on
            success, otherwise an empty ``ActionResult`` with the status code
            set on ``context``: ``INVALID_ARGUMENT``, ``NOT_FOUND`` or
            ``INTERNAL``.
        """
        self.__logger.info(f"GetActionResult request from [{context.peer()}] "
                           f"([{printable_request_metadata(context)}])")

        try:
            instance = self._get_instance(request.instance_name)
            # Only the instance lookup itself is timed.
            with DurationMetric(AC_GET_ACTION_RESULT_TIME_METRIC_NAME,
                                request.instance_name,
                                instanced=True):
                action_result = instance.get_action_result(request.action_digest)
            publish_counter_metric(
                AC_CACHE_HITS_METRIC_NAME,
                1,
                {"instance-name": request.instance_name}
            )
            return action_result

        except InvalidArgumentError as e:
            self.__logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

        except NotFoundError as e:
            # A miss is an expected outcome, hence only debug-level logging.
            self.__logger.debug(e)
            context.set_code(grpc.StatusCode.NOT_FOUND)
            publish_counter_metric(
                AC_CACHE_MISSES_METRIC_NAME,
                1,
                {"instance-name": request.instance_name}
            )

        except Exception:
            # logger.exception() records the active traceback itself, so the
            # exception object does not need to be bound.
            self.__logger.exception(
                f"Unexpected error in GetActionResult; request=[{request}]"
            )
            context.set_code(grpc.StatusCode.INTERNAL)

        # Every failure path falls through to an empty result.
        return remote_execution_pb2.ActionResult()

    @authorize(AuthContext)
    def UpdateActionResult(self, request, context):
        """Store ``request.action_result`` under ``request.action_digest``.

        Args:
            request: Request carrying ``instance_name``, ``action_digest``
                and ``action_result``.
            context: gRPC call context; receives the status code and, where
                an error message is available, the details on failure.

        Returns:
            ``request.action_result`` on success, otherwise an empty
            ``ActionResult`` with the status code set on ``context``:
            ``INVALID_ARGUMENT``, ``UNIMPLEMENTED``, ``RESOURCE_EXHAUSTED``
            or ``INTERNAL``.
        """
        self.__logger.info(f"UpdateActionResult request from [{context.peer()}] "
                           f"([{printable_request_metadata(context)}])")

        try:
            instance = self._get_instance(request.instance_name)
            # Only the instance update itself is timed.
            with DurationMetric(AC_UPDATE_ACTION_RESULT_TIME_METRIC_NAME,
                                request.instance_name,
                                instanced=True):
                instance.update_action_result(request.action_digest, request.action_result)
            return request.action_result

        except InvalidArgumentError as e:
            self.__logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

        except NotImplementedError as e:
            self.__logger.info(e)
            # Report the reason to the client, as the other handled errors do.
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.UNIMPLEMENTED)

        except StorageFullError as e:
            self.__logger.exception(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)

        except Exception:
            # logger.exception() records the active traceback itself, so the
            # exception object does not need to be bound.
            self.__logger.exception(
                f"Unexpected error in UpdateActionResult; request=[{request}]"
            )
            context.set_code(grpc.StatusCode.INTERNAL)

        # Every failure path falls through to an empty result.
        return remote_execution_pb2.ActionResult()

    # --- Private API ---

    def _get_instance(self, instance_name):
        """Return the instance registered under ``instance_name``.

        Args:
            instance_name: Name used when the instance was registered.

        Returns:
            The registered instance object.

        Raises:
            InvalidArgumentError: If no instance is registered under
                ``instance_name``.
        """
        try:
            return self._instances[instance_name]

        except KeyError:
            # The KeyError adds nothing beyond the message below, so drop it
            # from the exception context instead of chaining it implicitly.
            raise InvalidArgumentError(f"Invalid instance name: [{instance_name}]") from None