Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17OperationsInstance 

18================== 

19An instance of the LongRunningOperations Service. 

20""" 

21 

22import logging 

23 

24from buildgrid._exceptions import InvalidArgumentError, NotFoundError 

25from buildgrid._protos.google.longrunning import operations_pb2 

26 

27 

class OperationsInstance:
    """Instance of the LongRunningOperations service backed by a scheduler.

    Every operation query or mutation is delegated to the scheduler handed
    to the constructor. A ``NotFoundError`` raised by the scheduler is
    translated into an ``InvalidArgumentError`` that names the offending
    operation, with the original error kept as its ``__cause__``.
    """

    def __init__(self, scheduler):
        """Create an unregistered instance that delegates to *scheduler*."""
        self.__logger = logging.getLogger(__name__)

        self._scheduler = scheduler
        # Stays None until register_instance_with_server() succeeds.
        self._instance_name = None

    # --- Public API ---

    @property
    def instance_name(self):
        """Name given at registration, or ``None`` if not registered yet."""
        return self._instance_name

    @property
    def scheduler(self):
        """The scheduler this instance delegates to."""
        return self._scheduler

    def register_instance_with_server(self, instance_name, server):
        """Names and registers the operations instance with a given server.

        Args:
            instance_name: name to register this instance under.
            server: object exposing ``add_operations_instance(instance, name)``.

        Raises:
            AssertionError: if this instance has already been given a name.
        """
        if self._instance_name is not None:
            raise AssertionError("Instance already registered")

        # Record the name only after the server accepted the registration,
        # so a failing server call leaves the instance unregistered.
        server.add_operations_instance(self, instance_name)
        self._instance_name = instance_name

    def get_operation(self, job_name):
        """Return the operation the scheduler holds for *job_name*.

        Raises:
            InvalidArgumentError: if the scheduler raises ``NotFoundError``.
        """
        try:
            return self._scheduler.get_job_operation(job_name)

        except NotFoundError as err:
            raise InvalidArgumentError(
                f"Operation name does not exist: [{job_name}]") from err

    def list_operations(self, list_filter, page_size, page_token):
        """Return a ``ListOperationsResponse`` holding every known operation.

        ``list_filter``, ``page_size`` and ``page_token`` are accepted but
        currently ignored: the response always carries everything returned
        by the scheduler's ``get_all_operations()``.
        """
        # TODO: Pages
        # Spec says number of pages and length of a page are optional
        response = operations_pb2.ListOperationsResponse()
        response.operations.extend(self._scheduler.get_all_operations())

        return response

    def delete_operation(self, job_name):
        """Ask the scheduler to delete the operation named *job_name*.

        Raises:
            InvalidArgumentError: if the scheduler raises ``NotFoundError``.
        """
        try:
            self._scheduler.delete_job_operation(job_name)

        except NotFoundError as err:
            raise InvalidArgumentError(
                f"Operation name does not exist: [{job_name}]") from err

    def cancel_operation(self, job_name):
        """Ask the scheduler to cancel the operation named *job_name*.

        Raises:
            InvalidArgumentError: if the scheduler raises ``NotFoundError``.
        """
        try:
            self._scheduler.cancel_job_operation(job_name)

        except NotFoundError as err:
            raise InvalidArgumentError(
                f"Operation name does not exist: [{job_name}]") from err