Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/operations/service.py: 100.00%

89 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-06-11 15:37 +0000

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17OperationsService 

18================= 

19 

20""" 

21 

22import logging 

23from typing import Dict, Tuple, cast 

24 

25import grpc 

26from google.protobuf.empty_pb2 import Empty 

27 

28from buildgrid._exceptions import InvalidArgumentError 

29from buildgrid._protos.google.longrunning.operations_pb2 import DESCRIPTOR as OPS_DESCRIPTOR 

30from buildgrid._protos.google.longrunning.operations_pb2 import ( 

31 CancelOperationRequest, 

32 DeleteOperationRequest, 

33 GetOperationRequest, 

34 ListOperationsRequest, 

35 ListOperationsResponse, 

36 Operation, 

37) 

38from buildgrid._protos.google.longrunning.operations_pb2_grpc import ( 

39 OperationsServicer, 

40 add_OperationsServicer_to_server, 

41) 

42from buildgrid.server.auth.manager import authorize 

43from buildgrid.server.instance import instanced 

44from buildgrid.server.metrics_names import ( 

45 OPERATIONS_CANCEL_OPERATION_TIME_METRIC_NAME, 

46 OPERATIONS_DELETE_OPERATION_TIME_METRIC_NAME, 

47 OPERATIONS_GET_OPERATION_TIME_METRIC_NAME, 

48 OPERATIONS_LIST_OPERATIONS_TIME_METRIC_NAME, 

49) 

50from buildgrid.server.metrics_utils import DurationMetric 

51from buildgrid.server.operations.instance import OperationsInstance 

52from buildgrid.server.request_metadata_utils import request_metadata_from_scheduler_dict 

53from buildgrid.server.servicer import InstancedServicer 

54from buildgrid.server.utils.decorators import handle_errors_unary_unary, track_request_id 

55from buildgrid.settings import REQUEST_METADATA_HEADER_NAME 

56 

57LOGGER = logging.getLogger(__name__) 

58 

59 

60def _parse_instance_name(operation_name: str) -> str: 

61 names = operation_name.split("/") 

62 return "/".join(names[:-1]) if len(names) > 1 else "" 

63 

64 

65def _parse_operation_name(name: str) -> str: 

66 names = name.split("/") 

67 return names[-1] if len(names) > 1 else name 

68 

69 

class OperationsService(OperationsServicer, InstancedServicer[OperationsInstance]):
    """gRPC servicer for the long-running ``Operations`` service.

    Each request is dispatched to the ``OperationsInstance`` registered under
    the instance name carried in ``request.name``. For ``GetOperation``,
    ``DeleteOperation`` and ``CancelOperation`` the instance name is the part
    of ``request.name`` before its last ``/``; for ``ListOperations`` the
    whole of ``request.name`` is the instance name.
    """

    REGISTER_METHOD = add_OperationsServicer_to_server
    FULL_NAME = OPS_DESCRIPTOR.services_by_name["Operations"].full_name

    @instanced(lambda r: _parse_instance_name(r.name))
    @authorize
    @track_request_id
    @DurationMetric(OPERATIONS_GET_OPERATION_TIME_METRIC_NAME)
    @handle_errors_unary_unary(Operation)
    def GetOperation(self, request: GetOperationRequest, context: grpc.ServicerContext) -> Operation:
        """Look up a single operation by its fully qualified name.

        Args:
            request: Request whose ``name`` is ``instance_name/operation_name``.
            context: gRPC context; receives trailing metadata when the
                instance returns metadata for the operation.

        Returns:
            A copy of the stored operation whose ``name`` is set to
            ``request.name``.

        Raises:
            InvalidArgumentError: From ``get_instance`` when the instance is
                not registered. How this reaches the client depends on the
                decorators, which are defined outside this file.
        """
        LOGGER.info(f"GetOperation request from [{context.peer()}]")

        instance_name = _parse_instance_name(request.name)
        instance = self.get_instance(instance_name)

        operation_name = _parse_operation_name(request.name)
        operation, metadata = instance.get_operation(operation_name)

        # Work on a copy so that overwriting the name below does not modify
        # the message object handed back by the instance.
        op = Operation()
        op.CopyFrom(operation)
        op.name = request.name

        if metadata is not None:
            metadata_entry = self._operation_request_metadata_entry(metadata)
            context.set_trailing_metadata([metadata_entry])  # type: ignore[arg-type]  # tricky covariance issue

        return op

    @instanced(lambda r: cast(str, r.name))
    @authorize
    @track_request_id
    @DurationMetric(OPERATIONS_LIST_OPERATIONS_TIME_METRIC_NAME)
    @handle_errors_unary_unary(ListOperationsResponse)
    def ListOperations(self, request: ListOperationsRequest, context: grpc.ServicerContext) -> ListOperationsResponse:
        """List the operations of one instance.

        Args:
            request: Request whose ``name`` is the instance name and whose
                ``filter``, ``page_size`` and ``page_token`` are forwarded to
                the instance unchanged.
            context: gRPC context, used here only to log the peer.

        Returns:
            The response produced by the instance, with every operation name
            rewritten to ``<request.name>/<operation name>``.

        Raises:
            InvalidArgumentError: From ``get_instance`` when the instance is
                not registered.
        """
        LOGGER.info(f"ListOperations request from [{context.peer()}]")

        # The request name should be the collection name
        # In our case, this is just the instance name
        instance = self.get_instance(request.name)
        result = instance.list_operations(request.filter, request.page_size, request.page_token)

        # Prefix each operation name with the instance name (modifies the
        # response in place) so it has the "instance_name/operation_name"
        # form that the other methods parse.
        for operation in result.operations:
            operation.name = f"{request.name}/{operation.name}"

        return result

    @instanced(lambda r: _parse_instance_name(r.name))
    @authorize
    @track_request_id
    @DurationMetric(OPERATIONS_DELETE_OPERATION_TIME_METRIC_NAME)
    @handle_errors_unary_unary(Empty)
    def DeleteOperation(self, request: DeleteOperationRequest, context: grpc.ServicerContext) -> Empty:
        """Reject the request: deleting operations is not supported.

        Sets the status code ``UNIMPLEMENTED`` and an explanatory detail
        message on ``context``, then returns an empty response. No instance
        is looked up inside this method body.
        """
        LOGGER.info(f"DeleteOperation request from [{context.peer()}]")

        context.set_details("BuildGrid does not support DeleteOperation.")
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        return Empty()

    @instanced(lambda r: _parse_instance_name(r.name))
    @authorize
    @track_request_id
    @DurationMetric(OPERATIONS_CANCEL_OPERATION_TIME_METRIC_NAME)
    @handle_errors_unary_unary(Empty)
    def CancelOperation(self, request: CancelOperationRequest, context: grpc.ServicerContext) -> Empty:
        """Ask the owning instance to cancel an operation.

        Args:
            request: Request whose ``name`` is ``instance_name/operation_name``.
            context: gRPC context, used here only to log the peer.

        Returns:
            An empty response once ``cancel_operation`` has returned.

        Raises:
            InvalidArgumentError: From ``get_instance`` when the instance is
                not registered.
        """
        LOGGER.info(f"CancelOperation request from [{context.peer()}]")

        operation_name = _parse_operation_name(request.name)
        instance_name = _parse_instance_name(request.name)
        instance = self.get_instance(instance_name)
        instance.cancel_operation(operation_name)

        return Empty()

    # --- Private API ---

    def get_instance(self, instance_name: str) -> "OperationsInstance":
        """Return the instance registered under ``instance_name``.

        Args:
            instance_name: Key to look up in ``self.instances``.

        Raises:
            InvalidArgumentError: If no instance is registered under that name.
        """
        try:
            return self.instances[instance_name]
        except KeyError:
            # The KeyError carries nothing the message below does not already
            # say, so drop it from the chain instead of showing a misleading
            # "during handling of the above exception" traceback.
            raise InvalidArgumentError(
                f"Instance doesn't exist on server: [{instance_name}] "
                '(operation ids have the form "instance_name/operation_uuid")'
            ) from None

    @staticmethod
    def _operation_request_metadata_entry(operation_metadata: Dict[str, str]) -> Tuple[str, bytes]:
        """Build the trailing-metadata entry for an operation's request metadata.

        Args:
            operation_metadata: Metadata dict passed to
                ``request_metadata_from_scheduler_dict``.

        Returns:
            A ``(REQUEST_METADATA_HEADER_NAME, serialized_message)`` pair.
        """
        request_metadata = request_metadata_from_scheduler_dict(operation_metadata)
        return REQUEST_METADATA_HEADER_NAME, request_metadata.SerializeToString()