Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/execution/instance.py: 78.57%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

98 statements  

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17ExecutionInstance 

18================= 

19An instance of the Remote Execution Service. 

20""" 

21 

22import logging 

23 

24from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, NotFoundError 

25from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ( 

26 Action, ActionCacheUpdateCapabilities, Command 

27) 

28from buildgrid.utils import get_hash_type 

29 

30 

class ExecutionInstance:
    """An instance of the Remote Execution Service.

    Validates the platform properties of incoming actions and delegates
    queueing, peer registration and operation streaming to the scheduler
    it was constructed with.
    """

    def __init__(self, scheduler, storage, property_keys, match_properties,
                 discard_unwatched_jobs: bool = False, action_cache=None):
        """Create an execution instance.

        Args:
            scheduler: Object that jobs are queued on and that peers and
                operations are registered with.
            storage: Object used to fetch ``Action``/``Command`` messages
                (``get_message``) and to report capabilities.
            property_keys: Container of platform property names accepted
                by :meth:`execute`; any other name is rejected.
            match_properties: Container of platform property names whose
                values are forwarded to the scheduler as requirements.
            discard_unwatched_jobs: Flag forwarded to the scheduler when
                an operation peer is unregistered.
            action_cache: Optional action cache; ``None`` means this
                instance has no action cache.
        """
        self.__logger = logging.getLogger(__name__)

        self._scheduler = scheduler
        self._instance_name = None
        self._discard_unwatched_jobs = discard_unwatched_jobs

        self._property_keys = property_keys
        self._match_properties = match_properties
        # Those keys only allow one value to be set
        self._unique_keys = {"OSFamily"}

        self.__storage = storage
        self._action_cache = action_cache
        # Filled in on the first call to get_storage_capabilities().
        self._storage_capabilities = None

    # --- Public API ---

    @property
    def instance_name(self):
        """Name given at registration, or ``None`` if not yet registered."""
        return self._instance_name

    @property
    def discard_unwatched_jobs(self) -> bool:
        """Flag passed to the scheduler when unregistering operation peers."""
        return self._discard_unwatched_jobs

    @property
    def scheduler(self):
        """The scheduler this instance delegates to."""
        return self._scheduler

    def setup_grpc(self):
        """Call ``setup_grpc()`` on the scheduler, storage and action cache.

        The action cache is optional (``action_cache=None`` is the
        constructor default), so it is skipped when absent rather than
        failing with ``AttributeError``.
        """
        self._scheduler.setup_grpc()
        self.__storage.setup_grpc()
        if self._action_cache is not None:
            self._action_cache.setup_grpc()

    def register_instance_with_server(self, instance_name, server):
        """Names and registers the execution instance with a given server.

        Raises:
            AssertionError: If this instance already has a name, i.e. it
                was registered before.
        """
        if self._instance_name is not None:
            raise AssertionError("Instance already registered")

        server.add_execution_instance(self, instance_name)

        self._instance_name = instance_name
        if self._scheduler is not None:
            self._scheduler.set_instance_name(instance_name)

    def hash_type(self):
        """Return the result of ``get_hash_type()``."""
        return get_hash_type()

    def execute(self, action_digest, skip_cache_lookup, priority=0):
        """Sends a job for execution.

        Queues an action and creates an Operation instance to be associated
        with this action.

        The platform is read from the ``Action`` when its ``platform`` field
        is set; otherwise the ``Command`` referenced by the action is
        fetched and its platform is used instead.

        Args:
            action_digest: Digest used to fetch the ``Action`` from storage.
            skip_cache_lookup: Forwarded to the scheduler unchanged.
            priority: Forwarded to the scheduler unchanged.

        Returns:
            Whatever the scheduler's ``queue_job_action`` returns.

        Raises:
            FailedPreconditionError: If the action or command cannot be
                fetched, a property name is not in ``property_keys``, or a
                unique property is given more than once.
        """
        action = self.__storage.get_message(action_digest, Action)
        if not action:
            raise FailedPreconditionError("Could not get action from storage.")

        if action.HasField('platform'):
            platform = action.platform
        else:
            self.__logger.debug("Platform not set in Action, fetching command.")
            command = self.__storage.get_message(action.command_digest, Command)
            if not command:
                raise FailedPreconditionError(
                    "Could not get command from storage.")

            platform = command.platform

        platform_requirements = {}
        for platform_property in platform.properties:
            name = platform_property.name
            if name not in self._property_keys:
                raise FailedPreconditionError(
                    f"Unregistered platform property [{name}]. "
                    f"Known properties are: [{self._property_keys}]")

            # Only names in match_properties are recorded below, so a repeat
            # of a unique key is detected only when it is also matched on.
            if name in self._unique_keys and name in platform_requirements:
                raise FailedPreconditionError(
                    f"Platform property [{name}] can only be set once.")

            if name in self._match_properties:
                platform_requirements.setdefault(name, set()).add(
                    platform_property.value)

        return self._scheduler.queue_job_action(
            action, action_digest,
            platform_requirements=platform_requirements,
            skip_cache_lookup=skip_cache_lookup, priority=priority)

    def register_job_peer(self, job_name, peer, request_metadata=None):
        """Register ``peer`` for ``job_name`` and return the scheduler's result.

        Raises:
            InvalidArgumentError: If the scheduler raises ``NotFoundError``
                for ``job_name``.
        """
        try:
            return self._scheduler.register_job_peer(
                job_name, peer, request_metadata=request_metadata)

        except NotFoundError as e:
            # Chain the cause so the scheduler-side failure stays in the traceback.
            raise InvalidArgumentError(f"Job name does not exist: [{job_name}]") from e

    def register_operation_peer(self, operation_name, peer):
        """Register ``peer`` as watching ``operation_name``.

        Raises:
            InvalidArgumentError: If the scheduler raises ``NotFoundError``
                for ``operation_name``.
        """
        try:
            self._scheduler.register_job_operation_peer(operation_name, peer)

        except NotFoundError as e:
            raise InvalidArgumentError(
                f"Operation name does not exist: [{operation_name}]") from e

    def unregister_operation_peer(self, operation_name, peer):
        """Stop ``peer`` watching ``operation_name``; best effort.

        ``NotFoundError`` is ignored and ``TimeoutError`` is logged as a
        warning; neither is propagated to the caller.
        """
        try:
            self._scheduler.unregister_job_operation_peer(
                operation_name, peer, self._discard_unwatched_jobs)

        except NotFoundError:
            # Operation already unregistered due to being finished/cancelled
            pass
        except TimeoutError:
            self.__logger.warning(
                "Could not unregister_operation_peer for "
                "operation_name=[%s], peer=[%s] due to timeout.",
                operation_name, peer, exc_info=True)

    def stream_operation_updates(self, operation_name, context):
        """Yield operations from the scheduler's update stream.

        The scheduler yields ``(error, operation)`` pairs. A non-``None``
        error is raised after the operation has been attached to it as
        ``last_response``. Iteration stops once ``context`` is no longer
        active or the yielded operation is done.
        """
        for error, operation in self._scheduler.stream_operation_updates(operation_name, context):
            if error is not None:
                error.last_response = operation
                raise error

            yield operation

            if not context.is_active() or operation.done:
                return

    def get_storage_capabilities(self):
        """Return the storage capabilities, fetching them on first use."""
        if self._storage_capabilities is None:
            self._storage_capabilities = self.__storage.get_capabilities()
        return self._storage_capabilities

    def get_action_cache_capabilities(self):
        """Return ``(hash_type, capabilities)`` for the action cache.

        Both elements are ``None`` when no action cache is configured.
        """
        if self._action_cache is None:
            return None, None

        capabilities = ActionCacheUpdateCapabilities()
        hash_type = self._action_cache.hash_type()
        capabilities.update_enabled = self._action_cache.allow_updates
        return hash_type, capabilities

170 hash_type = None 

171 capabilities = None 

172 if self._action_cache is not None: 

173 capabilities = ActionCacheUpdateCapabilities() 

174 hash_type = self._action_cache.hash_type() 

175 capabilities.update_enabled = self._action_cache.allow_updates 

176 return hash_type, capabilities