Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

# Copyright (C) 2018 Bloomberg LP 

# 

# Licensed under the Apache License, Version 2.0 (the "License"); 

# you may not use this file except in compliance with the License. 

# You may obtain a copy of the License at 

# 

# <http://www.apache.org/licenses/LICENSE-2.0> 

# 

# Unless required by applicable law or agreed to in writing, software 

# distributed under the License is distributed on an "AS IS" BASIS, 

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

# See the License for the specific language governing permissions and 

# limitations under the License. 

 

 

""" 

ExecutionInstance 

================= 

An instance of the Remote Execution Service. 

""" 

 

import logging 

import queue 

 

from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, NotFoundError 

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action, Command 

from buildgrid.utils import get_hash_type 

 

 

class ExecutionInstance: 

 

def __init__(self, scheduler, storage, property_keys): 

self.__logger = logging.getLogger(__name__) 

 

self._scheduler = scheduler 

self._instance_name = None 

self._standard_keys = set(["ISA", "OSFamily"]) 

self._property_keys = set(property_keys) if property_keys else set() 

self.__storage = storage 

 

# --- Public API --- 

 

@property 

def instance_name(self): 

return self._instance_name 

 

@property 

def scheduler(self): 

return self._scheduler 

 

def register_instance_with_server(self, instance_name, server): 

"""Names and registers the execution instance with a given server.""" 

if self._instance_name is None: 

server.add_execution_instance(self, instance_name) 

 

self._instance_name = instance_name 

if self._scheduler is not None: 

self._scheduler.set_instance_name(instance_name) 

 

else: 

raise AssertionError("Instance already registered") 

 

def hash_type(self): 

return get_hash_type() 

 

def execute(self, action_digest, skip_cache_lookup): 

""" Sends a job for execution. 

Queues an action and creates an Operation instance to be associated with 

this action. 

""" 

action = self.__storage.get_message(action_digest, Action) 

 

if not action: 

raise FailedPreconditionError("Could not get action from storage.") 

 

command = self.__storage.get_message(action.command_digest, Command) 

 

if not command: 

raise FailedPreconditionError( 

"Could not get command from storage.") 

 

platform_requirements = {} 

for platform_property in command.platform.properties: 

if platform_property.name not in self._standard_keys: 

if platform_property.name not in self._property_keys: 

raise FailedPreconditionError( 

"Unregistered platform property.") 

 

elif platform_property.name == "OSFamily": 

if platform_property.name in platform_requirements: 

raise FailedPreconditionError( 

"Multiple OSFamilies specified.") 

 

if platform_property.name not in platform_requirements: 

platform_requirements[platform_property.name] = set() 

platform_requirements[platform_property.name].add( 

platform_property.value) 

 

return self._scheduler.queue_job_action(action, action_digest, 

platform_requirements=platform_requirements, 

skip_cache_lookup=skip_cache_lookup) 

 

def register_job_peer(self, job_name, peer, message_queue): 

try: 

return self._scheduler.register_job_peer(job_name, 

peer, message_queue) 

 

except NotFoundError: 

raise InvalidArgumentError("Job name does not exist: [{}]" 

.format(job_name)) 

 

def register_operation_peer(self, operation_name, peer, message_queue): 

try: 

self._scheduler.register_job_operation_peer(operation_name, 

peer, message_queue) 

 

except NotFoundError: 

raise InvalidArgumentError("Operation name does not exist: [{}]" 

.format(operation_name)) 

 

def unregister_operation_peer(self, operation_name, peer): 

try: 

self._scheduler.unregister_job_operation_peer(operation_name, peer) 

 

except NotFoundError: 

# Operation already unregistered due to being finished/cancelled 

pass 

 

def get_next_operation_update(self, message_queue, timeout): 

error, operation = message_queue.get(timeout=timeout) 

if error is not None: 

raise error 

return operation 

 

def stream_operation_updates_while_context_is_active(self, message_queue, context, timeout=5): 

while context.is_active(): 

try: 

operation = self.get_next_operation_update(message_queue, timeout) 

while not operation.done: 

yield operation 

operation = self.get_next_operation_update(message_queue, timeout) 

if not context.is_active(): 

return # Raises StopIteration 

 

yield operation 

 

return # Raises StopIteration 

except queue.Empty: 

pass