Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17ExecutionInstance 

18================= 

19An instance of the Remote Execution Service. 

20""" 

21 

22import logging 

23 

24from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, NotFoundError 

25from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action, Command 

26from buildgrid.utils import get_hash_type 

27 

28 

29class ExecutionInstance: 

30 

31 def __init__(self, scheduler, storage, property_keys, discard_unwatched_jobs: bool = False): 

32 self.__logger = logging.getLogger(__name__) 

33 

34 self._scheduler = scheduler 

35 self._instance_name = None 

36 self._discard_unwatched_jobs = discard_unwatched_jobs 

37 

38 self._property_keys = property_keys 

39 self._unique_keys = set(["OSFamily"]) # Those keys only allow one value to be set 

40 

41 self.__storage = storage 

42 

43 # --- Public API --- 

44 

45 @property 

46 def instance_name(self): 

47 return self._instance_name 

48 

49 @property 

50 def discard_unwatched_jobs(self) -> bool: 

51 return self._discard_unwatched_jobs 

52 

53 @property 

54 def scheduler(self): 

55 return self._scheduler 

56 

57 def register_instance_with_server(self, instance_name, server): 

58 """Names and registers the execution instance with a given server.""" 

59 if self._instance_name is None: 

60 server.add_execution_instance(self, instance_name) 

61 

62 self._instance_name = instance_name 

63 if self._scheduler is not None: 

64 self._scheduler.set_instance_name(instance_name) 

65 

66 else: 

67 raise AssertionError("Instance already registered") 

68 

69 def hash_type(self): 

70 return get_hash_type() 

71 

72 def execute(self, action_digest, skip_cache_lookup): 

73 """ Sends a job for execution. 

74 Queues an action and creates an Operation instance to be associated with 

75 this action. 

76 """ 

77 action = self.__storage.get_message(action_digest, Action) 

78 

79 if not action: 

80 raise FailedPreconditionError("Could not get action from storage.") 

81 

82 command = self.__storage.get_message(action.command_digest, Command) 

83 

84 if not command: 

85 raise FailedPreconditionError( 

86 "Could not get command from storage.") 

87 

88 platform_requirements = {} 

89 for platform_property in command.platform.properties: 

90 if platform_property.name not in self._property_keys: 

91 raise FailedPreconditionError( 

92 f"Unregistered platform property [{platform_property.name}]. " 

93 f"Known properties are: [{self._property_keys}]") 

94 

95 elif platform_property.name in self._unique_keys: 

96 if platform_property.name in platform_requirements: 

97 raise FailedPreconditionError( 

98 f"Platform property [{platform_property.name}] can only be set once.") 

99 if platform_property.name not in platform_requirements: 

100 platform_requirements[platform_property.name] = set() 

101 platform_requirements[platform_property.name].add( 

102 platform_property.value) 

103 

104 return self._scheduler.queue_job_action(action, action_digest, 

105 platform_requirements=platform_requirements, 

106 skip_cache_lookup=skip_cache_lookup) 

107 

108 def register_job_peer(self, job_name, peer): 

109 try: 

110 return self._scheduler.register_job_peer(job_name, peer) 

111 

112 except NotFoundError: 

113 raise InvalidArgumentError(f"Job name does not exist: [{job_name}]") 

114 

115 def register_operation_peer(self, operation_name, peer): 

116 try: 

117 self._scheduler.register_job_operation_peer(operation_name, peer) 

118 

119 except NotFoundError: 

120 raise InvalidArgumentError(f"Operation name does not exist: [{operation_name}]") 

121 

122 def unregister_operation_peer(self, operation_name, peer): 

123 try: 

124 self._scheduler.unregister_job_operation_peer(operation_name, peer, self._discard_unwatched_jobs) 

125 

126 except NotFoundError: 

127 # Operation already unregistered due to being finished/cancelled 

128 pass 

129 

130 def stream_operation_updates(self, operation_name, context): 

131 for error, operation in self._scheduler.stream_operation_updates(operation_name, context): 

132 if error is not None: 

133 error.last_response = operation 

134 raise error 

135 

136 yield operation 

137 

138 if not context.is_active() or operation.done: 

139 return 

140 return