Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/execution/instance.py: 74.75%

99 statements  

« prev     ^ index     » next       coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17ExecutionInstance 

18================= 

19An instance of the Remote Execution Service. 

20""" 

21 

22import logging 

23 

24from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, NotFoundError 

25from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ( 

26 Action, ActionCacheUpdateCapabilities, Command 

27) 

28from buildgrid.utils import get_hash_type 

29 

30 

31class ExecutionInstance: 

32 

33 def __init__(self, scheduler, storage, property_keys, match_properties, 

34 discard_unwatched_jobs: bool = False, action_cache=None): 

35 self.__logger = logging.getLogger(__name__) 

36 

37 self._scheduler = scheduler 

38 self._instance_name = None 

39 self._discard_unwatched_jobs = discard_unwatched_jobs 

40 

41 self._property_keys = property_keys 

42 self._match_properties = match_properties 

43 self._unique_keys = set(["OSFamily"]) # Those keys only allow one value to be set 

44 

45 self.__storage = storage 

46 self._action_cache = action_cache 

47 self._storage_capabilities = None 

48 

49 # --- Public API --- 

50 

51 @property 

52 def instance_name(self): 

53 return self._instance_name 

54 

55 @property 

56 def discard_unwatched_jobs(self) -> bool: 

57 return self._discard_unwatched_jobs 

58 

59 @property 

60 def scheduler(self): 

61 return self._scheduler 

62 

63 def setup_grpc(self): 

64 self._scheduler.setup_grpc() 

65 self.__storage.setup_grpc() 

66 

67 if self._action_cache is not None: 

68 self._action_cache.setup_grpc() 

69 

70 def register_instance_with_server(self, instance_name, server): 

71 """Names and registers the execution instance with a given server.""" 

72 if self._instance_name is None: 

73 server.add_execution_instance(self, instance_name) 

74 

75 self._instance_name = instance_name 

76 if self._scheduler is not None: 

77 self._scheduler.set_instance_name(instance_name) 

78 

79 else: 

80 raise AssertionError("Instance already registered") 

81 

82 def hash_type(self): 

83 return get_hash_type() 

84 

85 def execute(self, action_digest, skip_cache_lookup, priority=0): 

86 """ Sends a job for execution. 

87 Queues an action and creates an Operation instance to be associated with 

88 this action. 

89 """ 

90 action = self.__storage.get_message(action_digest, Action) 

91 

92 if not action: 

93 raise FailedPreconditionError("Could not get action from storage.") 

94 

95 if action.HasField('platform'): 

96 platform = action.platform 

97 else: 

98 self.__logger.debug("Platform not set in Action, fetching command.") 

99 command = self.__storage.get_message(action.command_digest, Command) 

100 if not command: 

101 raise FailedPreconditionError( 

102 "Could not get command from storage.") 

103 

104 platform = command.platform 

105 

106 platform_requirements = {} 

107 for platform_property in platform.properties: 

108 name = platform_property.name 

109 if name not in self._property_keys: 

110 raise FailedPreconditionError( 

111 f"Unregistered platform property [{name}]. " 

112 f"Known properties are: [{self._property_keys}]") 

113 

114 if name in self._unique_keys and name in platform_requirements: 

115 raise FailedPreconditionError( 

116 f"Platform property [{name}] can only be set once.") 

117 

118 if name in self._match_properties: 

119 if name not in platform_requirements: 

120 platform_requirements[name] = set() 

121 platform_requirements[name].add(platform_property.value) 

122 

123 return self._scheduler.queue_job_action(action, action_digest, 

124 platform_requirements=platform_requirements, 

125 skip_cache_lookup=skip_cache_lookup, priority=priority) 

126 

127 def register_job_peer(self, job_name, peer, request_metadata=None): 

128 try: 

129 return self._scheduler.register_job_peer( 

130 job_name, peer, request_metadata=request_metadata) 

131 

132 except NotFoundError: 

133 raise InvalidArgumentError(f"Job name does not exist: [{job_name}]") 

134 

135 def register_operation_peer(self, operation_name, peer): 

136 try: 

137 self._scheduler.register_job_operation_peer(operation_name, peer) 

138 

139 except NotFoundError: 

140 raise InvalidArgumentError(f"Operation name does not exist: [{operation_name}]") 

141 

142 def unregister_operation_peer(self, operation_name, peer): 

143 try: 

144 self._scheduler.unregister_job_operation_peer(operation_name, peer, self._discard_unwatched_jobs) 

145 

146 except NotFoundError: 

147 # Operation already unregistered due to being finished/cancelled 

148 pass 

149 except TimeoutError: 

150 self.__logger.warning(f"Could not unregister_operation_peer for " 

151 f"operation_name=[{operation_name}], peer=[{peer}] due to" 

152 f" timeout.", exc_info=True) 

153 

154 def stream_operation_updates(self, operation_name, context): 

155 for error, operation in self._scheduler.stream_operation_updates(operation_name, context): 

156 if error is not None: 

157 error.last_response = operation 

158 raise error 

159 

160 yield operation 

161 

162 if not context.is_active() or operation.done: 

163 return 

164 return 

165 

166 def get_storage_capabilities(self): 

167 if self._storage_capabilities is None: 

168 self._storage_capabilities = self.__storage.get_capabilities() 

169 return self._storage_capabilities 

170 

171 def get_action_cache_capabilities(self): 

172 hash_type = None 

173 capabilities = None 

174 if self._action_cache is not None: 

175 capabilities = ActionCacheUpdateCapabilities() 

176 hash_type = self._action_cache.hash_type() 

177 capabilities.update_enabled = self._action_cache.allow_updates 

178 return hash_type, capabilities