Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/operations/service.py: 80.56%
108 statements
« prev ^ index » next coverage.py v6.4.1, created at 2022-06-22 21:04 +0000
« prev ^ index » next coverage.py v6.4.1, created at 2022-06-22 21:04 +0000
1# Copyright (C) 2018 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16"""
17OperationsService
18=================
20"""
22import logging
24import grpc
26from google.protobuf.empty_pb2 import Empty
28from buildgrid._exceptions import InvalidArgumentError, RetriableError
29from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2
30from buildgrid.server._authentication import AuthContext, authorize
31from buildgrid.server.metrics_names import (
32 OPERATIONS_CANCEL_OPERATION_TIME_METRIC_NAME,
33 OPERATIONS_DELETE_OPERATION_TIME_METRIC_NAME,
34 OPERATIONS_GET_OPERATION_TIME_METRIC_NAME,
35 OPERATIONS_LIST_OPERATIONS_TIME_METRIC_NAME
36)
37from buildgrid.server.metrics_utils import DurationMetric
38from buildgrid.server.request_metadata_utils import request_metadata_from_scheduler_dict
41class OperationsService(operations_pb2_grpc.OperationsServicer):
43 def __init__(self, server):
44 self.__logger = logging.getLogger(__name__)
46 self._instances = {}
48 operations_pb2_grpc.add_OperationsServicer_to_server(self, server)
50 # --- Public API ---
52 def add_instance(self, instance_name, instance):
53 """Registers a new servicer instance.
55 Args:
56 instance_name (str): The new instance's name.
57 instance (OperationsInstance): The new instance itself.
58 """
59 self._instances[instance_name] = instance
61 # --- Public API: Servicer ---
63 @authorize(AuthContext)
64 @DurationMetric(OPERATIONS_GET_OPERATION_TIME_METRIC_NAME)
65 def GetOperation(self, request, context):
66 self.__logger.info(f"GetOperation request from [{context.peer()}]")
68 try:
69 name = request.name
71 instance_name = self._parse_instance_name(name)
72 instance = self._get_instance(instance_name)
74 operation_name = self._parse_operation_name(name)
75 operation, metadata = instance.get_operation(operation_name)
76 op = operations_pb2.Operation()
77 op.CopyFrom(operation)
78 op.name = name
80 if metadata is not None:
81 metadata_entry = self._operation_request_metadata_entry(metadata)
82 context.set_trailing_metadata([metadata_entry])
84 return op
86 except InvalidArgumentError as e:
87 self.__logger.info(e)
88 context.set_details(str(e))
89 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
91 except RetriableError as e:
92 self.__logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
93 context.abort_with_status(e.error_status)
95 except Exception as e:
96 self.__logger.exception(
97 f"Unexpected error in GetOperation; request=[{request}]"
98 )
99 context.set_details(str(e))
100 context.set_code(grpc.StatusCode.INTERNAL)
102 return operations_pb2.Operation()
104 @authorize(AuthContext)
105 @DurationMetric(OPERATIONS_LIST_OPERATIONS_TIME_METRIC_NAME)
106 def ListOperations(self, request, context):
107 self.__logger.info(f"ListOperations request from [{context.peer()}]")
109 try:
110 # The request name should be the collection name
111 # In our case, this is just the instance_name
112 instance_name = request.name
113 instance = self._get_instance(instance_name)
115 result = instance.list_operations(request.filter,
116 request.page_size,
117 request.page_token)
119 for operation in result.operations:
120 operation.name = f"{instance_name}/{operation.name}"
122 return result
124 except InvalidArgumentError as e:
125 # This is a client error. Don't log at error on the server side
126 self.__logger.info(e)
127 context.set_details(str(e))
128 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
130 except RetriableError as e:
131 self.__logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
132 context.abort_with_status(e.error_status)
134 except Exception as e:
135 self.__logger.exception(
136 f"Unexpected error in ListOperations; request=[{request}]"
137 )
138 context.set_details(str(e))
139 context.set_code(grpc.StatusCode.INTERNAL)
141 return operations_pb2.ListOperationsResponse()
143 @authorize(AuthContext)
144 @DurationMetric(OPERATIONS_DELETE_OPERATION_TIME_METRIC_NAME)
145 def DeleteOperation(self, request, context):
146 self.__logger.info(f"DeleteOperation request from [{context.peer()}]")
148 context.set_details("BuildGrid does not support DeleteOperation.")
149 context.set_code(grpc.StatusCode.UNIMPLEMENTED)
150 return Empty()
152 @authorize(AuthContext)
153 @DurationMetric(OPERATIONS_CANCEL_OPERATION_TIME_METRIC_NAME)
154 def CancelOperation(self, request, context):
155 self.__logger.info(f"CancelOperation request from [{context.peer()}]")
157 try:
158 name = request.name
160 instance_name = self._parse_instance_name(name)
161 instance = self._get_instance(instance_name)
163 operation_name = self._parse_operation_name(name)
164 instance.cancel_operation(operation_name)
166 except InvalidArgumentError as e:
167 self.__logger.info(e)
168 context.set_details(str(e))
169 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
171 except RetriableError as e:
172 self.__logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
173 context.abort_with_status(e.error_status)
175 except Exception as e:
176 self.__logger.exception(
177 f"Unexpected error in CancelOperation; request=[{request}]"
178 )
179 context.set_details(str(e))
180 context.set_code(grpc.StatusCode.INTERNAL)
182 return Empty()
184 # --- Private API ---
186 def _parse_instance_name(self, name):
187 """ If the instance name is not blank, 'name' will have the form
188 {instance_name}/{operation_uuid}. Otherwise, it will just be
189 {operation_uuid} """
190 names = name.split('/')
191 return '/'.join(names[:-1]) if len(names) > 1 else ''
193 def _parse_operation_name(self, name):
194 names = name.split('/')
195 return names[-1] if len(names) > 1 else name
197 def _get_instance(self, name):
198 try:
199 return self._instances[name]
201 except KeyError:
202 raise InvalidArgumentError(f"Instance doesn't exist on server: [{name}]")
204 @staticmethod
205 def _operation_request_metadata_entry(operation_metadata):
206 request_metadata = request_metadata_from_scheduler_dict(operation_metadata)
207 return 'requestmetadata-bin', request_metadata.SerializeToString()