Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/rabbitmq/operations/service.py: 0.00%
98 statements
« prev ^ index » next coverage.py v6.4.1, created at 2022-06-22 21:04 +0000
« prev ^ index » next coverage.py v6.4.1, created at 2022-06-22 21:04 +0000
1# Copyright (C) 2021 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16"""
17OperationsService
18=================
20"""
22import logging
24import grpc
26from google.protobuf.empty_pb2 import Empty
28from buildgrid._exceptions import InvalidArgumentError, RetriableError
29from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2
30from buildgrid.server._authentication import AuthContext, authorize
31from buildgrid.server.metrics_names import (
32 OPERATIONS_CANCEL_OPERATION_TIME_METRIC_NAME,
33 OPERATIONS_DELETE_OPERATION_TIME_METRIC_NAME,
34 OPERATIONS_GET_OPERATION_TIME_METRIC_NAME,
35 OPERATIONS_LIST_OPERATIONS_TIME_METRIC_NAME
36)
37from buildgrid.server.metrics_utils import DurationMetric
40class OperationsService(operations_pb2_grpc.OperationsServicer):
42 def __init__(self, server):
43 self._logger = logging.getLogger(__name__)
45 self._instances = {}
47 operations_pb2_grpc.add_OperationsServicer_to_server(self, server)
49 # --- Public API ---
51 def add_instance(self, instance_name, instance):
52 """Registers a new servicer instance.
54 Args:
55 instance_name (str): The new instance's name.
56 instance (OperationsInstance): The new instance itself.
57 """
58 self._instances[instance_name] = instance
60 # --- Public API: Servicer ---
62 @authorize(AuthContext)
63 @DurationMetric(OPERATIONS_GET_OPERATION_TIME_METRIC_NAME)
64 def GetOperation(self, request, context):
65 self._logger.info(f"GetOperation request from [{context.peer()}]")
67 try:
68 name = request.name
70 instance_name = self._parse_instance_name(name)
71 instance = self._get_instance(instance_name)
73 operation_name = self._parse_operation_name(name)
74 operation, _ = instance.get_operation(operation_name)
75 op = operations_pb2.Operation()
76 op.CopyFrom(operation)
77 op.name = name
79 # TODO: Convert the metadata returned by `get_operation()`
80 # into a `RequestMetadata` message and attach it to gRPC
81 # trailing metadata as an entry of type:
82 # ('requestmetadata-bin', RequestMetadata).
84 return op
86 except InvalidArgumentError as e:
87 self._logger.info(e)
88 context.set_details(str(e))
89 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
91 except RetriableError as e:
92 context.abort_with_status(e.error_status)
94 except Exception as e:
95 self._logger.exception(
96 f"Unexpected error in GetOperation; request=[{request}]"
97 )
98 context.set_details(str(e))
99 context.set_code(grpc.StatusCode.INTERNAL)
101 return operations_pb2.Operation()
103 @authorize(AuthContext)
104 @DurationMetric(OPERATIONS_LIST_OPERATIONS_TIME_METRIC_NAME)
105 def ListOperations(self, request, context):
106 self._logger.info(f"ListOperations request from [{context.peer()}]")
108 try:
109 # The request name should be the collection name
110 # In our case, this is just the instance_name
111 instance_name = request.name
112 instance = self._get_instance(instance_name)
114 result = instance.list_operations(request.filter,
115 request.page_size,
116 request.page_token)
118 for operation in result.operations:
119 operation.name = f"{instance_name}/{operation.name}"
121 return result
123 except InvalidArgumentError as e:
124 # This is a client error. Don't log at error on the server side
125 self._logger.info(e)
126 context.set_details(str(e))
127 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
129 except RetriableError as e:
130 context.abort_with_status(e.error_status)
132 except Exception as e:
133 self._logger.exception(
134 f"Unexpected error in ListOperations; request=[{request}]"
135 )
136 context.set_details(str(e))
137 context.set_code(grpc.StatusCode.INTERNAL)
139 return operations_pb2.ListOperationsResponse()
141 @authorize(AuthContext)
142 @DurationMetric(OPERATIONS_DELETE_OPERATION_TIME_METRIC_NAME)
143 def DeleteOperation(self, request, context):
144 self._logger.info(f"DeleteOperation request from [{context.peer()}]")
146 context.set_details("BuildGrid does not support DeleteOperation.")
147 context.set_code(grpc.StatusCode.UNIMPLEMENTED)
148 return Empty()
150 @authorize(AuthContext)
151 @DurationMetric(OPERATIONS_CANCEL_OPERATION_TIME_METRIC_NAME)
152 def CancelOperation(self, request, context):
153 self._logger.info(f"CancelOperation request from [{context.peer()}]")
155 try:
156 name = request.name
158 instance_name = self._parse_instance_name(name)
159 instance = self._get_instance(instance_name)
161 operation_name = self._parse_operation_name(name)
162 instance.cancel_operation(operation_name)
164 except InvalidArgumentError as e:
165 self._logger.info(e)
166 context.set_details(str(e))
167 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
169 except RetriableError as e:
170 context.abort_with_status(e.error_status)
172 except Exception as e:
173 self._logger.exception(
174 f"Unexpected error in CancelOperation; request=[{request}]"
175 )
176 context.set_details(str(e))
177 context.set_code(grpc.StatusCode.INTERNAL)
179 return Empty()
181 # --- Private API ---
183 @staticmethod
184 def _parse_instance_name(name):
185 """ If the instance name is not blank, 'name' will have the form
186 {instance_name}/{operation_uuid}. Otherwise, it will just be
187 {operation_uuid} """
188 names = name.split('/')
189 return '/'.join(names[:-1]) if len(names) > 1 else ''
191 @staticmethod
192 def _parse_operation_name(name):
193 names = name.split('/')
194 return names[-1] if len(names) > 1 else name
196 def _get_instance(self, name):
197 try:
198 return self._instances[name]
200 except KeyError:
201 raise InvalidArgumentError(f"Instance doesn't exist on server: [{name}]")