Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/bots/job_assigner.py: 91.74% (109 statements)

coverage.py v7.4.1, created at 2024-06-11 15:37 +0000

# Copyright (C) 2022 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import random
import threading
import uuid
from contextlib import contextmanager
from itertools import chain, combinations
from threading import Event, Lock
from typing import Any, Dict, Iterable, Iterator, List, Set, TypeVar

from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import BotSession
from buildgrid.server.persistence.sql.impl import SQLDataStore
from buildgrid.server.threading import ContextWorker
from buildgrid.utils import convert_values_to_sorted_lists, flatten_capabilities, hash_from_dict

LOGGER = logging.getLogger(__name__)


T = TypeVar("T", bound="JobAssigner")


class JobAssigner:
    def __init__(self, data_store: SQLDataStore, job_assignment_interval: float = 1.0, priority_percentage: int = 100):
        self._lock = Lock()
        # Dict[Hash, Dict[BotName, Dict[Key, Event]]]
        self._events: Dict[str, Dict[str, Dict[str, Event]]] = {}
        self._data_store = data_store
        # Used to immediately trigger a new assignment round when a bot is added to the lookup.
        self._new_bots_added = Event()
        self._job_assignment_worker = ContextWorker(
            target=self.begin, name="JobAssignment", on_shutdown_requested=self._new_bots_added.set
        )
        self._job_assignment_interval = job_assignment_interval
        self._priority_percentage = priority_percentage
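        # For illustration only (hypothetical values), a populated lookup might look like:
        #     {"<capabilities-hash>": {"bot-1": {"<uuid4-key>": <Event>}}}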

    def __enter__(self: T) -> T:
        self.start()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        self.stop()

    def start(self) -> None:
        self._job_assignment_worker.start()

    def stop(self) -> None:
        self._job_assignment_worker.stop()

    def listener_count(self) -> int:
        with self._lock:
            return len({bot_name for bot_events in self._events.values() for bot_name in bot_events})

    @contextmanager
    def assignment_context(self, bot_session: BotSession) -> Iterator[threading.Event]:
        key = str(uuid.uuid4())
        event = Event()
        worker_hashes = get_partial_capabilities_hashes(bot_capabilities(bot_session))
        try:
            with self._lock:
                for worker_hash in worker_hashes:
                    if worker_hash not in self._events:
                        self._events[worker_hash] = {}
                    if bot_session.name not in self._events[worker_hash]:
                        self._events[worker_hash][bot_session.name] = {}
                    self._events[worker_hash][bot_session.name][key] = event
            self._new_bots_added.set()
            yield event
        finally:
            with self._lock:
                for worker_hash in worker_hashes:
                    del self._events[worker_hash][bot_session.name][key]
                    if len(self._events[worker_hash][bot_session.name]) == 0:
                        del self._events[worker_hash][bot_session.name]
                    if len(self._events[worker_hash]) == 0:
                        del self._events[worker_hash]
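
    # Illustrative usage only (not part of the original module): given a
    # JobAssigner instance "assigner" (hypothetical name), a bots service might
    # hold this context open while a worker waits for work:
    #
    #     with assigner.assignment_context(bot_session) as event:
    #         if event.wait(timeout=30):
    #             ...  # a job was assigned; re-read the session's leases
    #
    # The same Event is registered under every partial capabilities hash, so an
    # assignment in any matching bucket wakes this worker.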

    def assign_jobs(self, shutdown_requested: threading.Event, oldest_first: bool = False) -> None:
        """Assign jobs to the currently connected workers.

        This method iterates over the buckets of currently connected workers,
        and requests a number of job assignments from the data store to cover
        the number of workers in each bucket. Empty buckets are skipped.
        """

        with self._lock:
            worker_hashes = list(self._events.keys())

        random.shuffle(worker_hashes)
        for worker_hash in worker_hashes:
            if shutdown_requested.is_set():
                return

            with self._lock:
                bot_names = list(self._events.get(worker_hash, {}))

            if bot_names:
                if oldest_first:
                    assigned_bot_names = self._data_store.assign_n_leases_by_age(
                        capability_hash=worker_hash, bot_names=bot_names
                    )
                else:
                    assigned_bot_names = self._data_store.assign_n_leases_by_priority(
                        capability_hash=worker_hash, bot_names=bot_names
                    )
                with self._lock:
                    for name in assigned_bot_names:
                        for event in self._events.get(worker_hash, {}).get(name, {}).values():
                            event.set()
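
    # Note: assign_n_leases_by_age / assign_n_leases_by_priority are expected to
    # return the names of the bots that actually received a lease; setting every
    # Event registered for those names wakes the threads blocked in
    # assignment_context for the corresponding sessions.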

    def begin(self, shutdown_requested: threading.Event) -> None:
        while not shutdown_requested.is_set():
            try:
                oldest_first = random.randint(1, 100) > self._priority_percentage
                self.assign_jobs(shutdown_requested, oldest_first=oldest_first)
            except Exception:
                LOGGER.exception("Error in job assignment thread")
            self._new_bots_added.wait(timeout=self._job_assignment_interval)
            self._new_bots_added.clear()
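
    # For example: with the default priority_percentage of 100, randint(1, 100)
    # can never exceed it, so every round assigns by priority; a value of 80
    # would make results 81-100 (roughly 20% of rounds) use oldest-first order.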


def get_partial_capabilities(capabilities: Dict[str, List[str]]) -> Iterable[Dict[str, List[str]]]:
    """
    Given a capabilities dictionary with all values as lists, yield all partial capabilities dictionaries.
    """

    CAPABILITIES_WARNING_THRESHOLD = 10

    caps_flat = flatten_capabilities(capabilities)

    if len(caps_flat) > CAPABILITIES_WARNING_THRESHOLD:
        LOGGER.warning(
            "A worker with a large capabilities dictionary has been connected. "
            f"Processing its capabilities may take a while. Capabilities: {capabilities}"
        )

    # Using the itertools powerset recipe, construct the powerset of the (key, value) tuples
    capabilities_powerset = chain.from_iterable(combinations(caps_flat, r) for r in range(len(caps_flat) + 1))
    for partial_capability_tuples in capabilities_powerset:
        partial_dict: Dict[str, List[str]] = {}

        for tup in partial_capability_tuples:
            partial_dict.setdefault(tup[0], []).append(tup[1])
        yield partial_dict
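
# Example (illustrative; assuming flatten_capabilities yields the (key, value)
# pairs in insertion order):
#
#     >>> list(get_partial_capabilities({"os": ["linux"], "isa": ["x86"]}))
#     [{}, {'os': ['linux']}, {'isa': ['x86']}, {'os': ['linux'], 'isa': ['x86']}]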


def get_partial_capabilities_hashes(capabilities: Dict[str, Set[str]]) -> List[str]:
    """
    Given a capabilities dictionary, generate every partial capabilities dictionary
    and return the list of their hashes.
    """

    # Convert requirement values to sorted lists to make them JSON-serializable
    normalized_capabilities = convert_values_to_sorted_lists(capabilities)

    capabilities_list = []
    for partial_capability in get_partial_capabilities(normalized_capabilities):
        capabilities_list.append(hash_from_dict(partial_capability))
    return capabilities_list
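
# A worker with n flattened capability pairs yields 2**n partial dictionaries,
# and therefore 2**n hashes; the warning threshold of 10 above corresponds to
# more than 1024 subsets.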


def bot_capabilities(bot_session: BotSession) -> Dict[str, Set[str]]:
    worker_capabilities: Dict[str, Set[str]] = {}
    if bot_session.worker.devices:
        # According to the spec:
        #   "The first device in the worker is the "primary device" -
        #   that is, the device running a bot and which is
        #   responsible for actually executing commands."
        primary_device = bot_session.worker.devices[0]

        for device_property in primary_device.properties:
            if device_property.key not in worker_capabilities:
                worker_capabilities[device_property.key] = set()
            worker_capabilities[device_property.key].add(device_property.value)
    return worker_capabilities
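

# Example (illustrative; assumes the standard v1test2 Worker/Device proto fields):
#
#     >>> session = BotSession(name="bot-1")
#     >>> device = session.worker.devices.add()
#     >>> prop = device.properties.add()
#     >>> prop.key = "os"
#     >>> prop.value = "linux"
#     >>> bot_capabilities(session)
#     {'os': {'linux'}}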