Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/scheduler/assigner.py: 95.51%

89 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

1import random 

2import threading 

3import uuid 

4from contextlib import contextmanager 

5from typing import TYPE_CHECKING, Any, Dict, Iterator, Optional 

6 

7from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import BotSession 

8from buildgrid.server.context import current_instance, instance_context 

9from buildgrid.server.logging import buildgrid_logger 

10from buildgrid.server.threading import ContextWorker 

11 

12from .properties import PropertySet, hash_from_dict 

13 

14if TYPE_CHECKING: 

15 # Avoid circular import 

16 from .impl import Scheduler 

17 

18 

19LOGGER = buildgrid_logger(__name__) 

20 

21 

class JobAssigner:
    """Periodically asks the scheduler to assign leases to waiting bots.

    Bots register through :meth:`assignment_context`, which files a
    ``threading.Event`` under the current instance name, each of the bot's
    capability hashes, the bot name and a unique per-registration key. The
    assignment loop (:meth:`begin`) walks those registrations, asks the
    scheduler for assignments and sets the events of every bot name the
    scheduler reports back.
    """

    def __init__(
        self,
        scheduler: "Scheduler",
        property_set: PropertySet,
        job_assignment_interval: float = 1.0,
        priority_percentage: int = 100,
    ):
        """Store the collaborators and prepare (but do not start) the worker.

        Args:
            scheduler: Object providing ``assign_n_leases_by_age`` and
                ``assign_n_leases_by_priority``.
            property_set: Used to derive the worker property combinations
                of a bot session.
            job_assignment_interval: Maximum time passed as the ``timeout``
                of the wait between two assignment passes.
            priority_percentage: Threshold compared against a random draw in
                ``[1, 100]`` on every pass; a draw above the threshold makes
                that pass assign by age instead of by priority.
        """
        self.job_assignment_interval = job_assignment_interval
        self.priority_percentage = priority_percentage
        self._scheduler = scheduler
        self._property_set = property_set

        self._lock = threading.Lock()
        # Dict[Instance, Dict[Hash, Dict[BotName, Dict[Key, Event]]]]
        self._events: Dict[str, Dict[str, Dict[str, Dict[str, threading.Event]]]] = {}

        # Set whenever a bot registers, so a new assignment pass can start
        # immediately instead of waiting out the full interval.
        self._new_bots_added = threading.Event()
        # The same event is set on shutdown request so the wait in `begin`
        # is interrupted.
        self.assigner = ContextWorker(
            target=self.begin, name="JobAssignment", on_shutdown_requested=self._new_bots_added.set
        )

    def __enter__(self) -> "JobAssigner":
        """Start the assigner and return it for use in a ``with`` block."""
        self.start()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Stop the assigner when leaving the ``with`` block."""
        self.stop()

    def start(self) -> None:
        """Start the underlying ``ContextWorker``."""
        self.assigner.start()

    def stop(self) -> None:
        """Stop the underlying ``ContextWorker``."""
        self.assigner.stop()

    def listener_count(self, instance_name: Optional[str] = None) -> int:
        """Count the distinct bot names that currently have a registration.

        Args:
            instance_name: When given, only registrations filed under this
                instance are counted; ``None`` counts across all instances.

        Returns:
            The number of unique bot names found.
        """
        distinct_bots = set()
        with self._lock:
            for registered_instance, buckets in self._events.items():
                if instance_name is not None and instance_name != registered_instance:
                    continue
                for bots in buckets.values():
                    # Iterating a bucket yields its bot names (the keys).
                    distinct_bots.update(bots)
        return len(distinct_bots)

    @contextmanager
    def assignment_context(self, bot_session: BotSession) -> Iterator[threading.Event]:
        """Register a bot for the duration of the ``with`` block.

        An event is filed under every capability hash derived from the bot
        session and yielded to the caller; :meth:`assign_jobs` sets it when
        the scheduler reports an assignment for this bot's name. The
        registration is removed again on exit, pruning any containers that
        become empty.

        Args:
            bot_session: The session whose name and worker properties are
                used for the registration.

        Yields:
            The event that is set once work has been assigned.
        """
        key = str(uuid.uuid4())
        event = threading.Event()
        worker_hashes = {
            hash_from_dict(properties) for properties in self._property_set.worker_properties(bot_session)
        }
        instance_name = current_instance()
        try:
            with self._lock:
                instance_events = self._events.setdefault(instance_name, {})
                for worker_hash in worker_hashes:
                    bucket = instance_events.setdefault(worker_hash, {})
                    listeners = bucket.setdefault(bot_session.name, {})
                    listeners[key] = event
            # Wake the assignment loop so the new registration is considered
            # right away.
            self._new_bots_added.set()
            yield event
        finally:
            with self._lock:
                instance_events = self._events[instance_name]
                for worker_hash in worker_hashes:
                    bucket = instance_events[worker_hash]
                    listeners = bucket[bot_session.name]
                    del listeners[key]
                    # Prune each level that this removal left empty.
                    if not listeners:
                        del bucket[bot_session.name]
                    if not bucket:
                        del instance_events[worker_hash]
                if not instance_events:
                    del self._events[instance_name]

    def assign_jobs(self, shutdown_requested: threading.Event, instance_name: str, oldest_first: bool = False) -> None:
        """Assign jobs to the currently connected workers of one instance.

        The capability-hash buckets registered for ``instance_name`` are
        visited in random order. For every bucket that still has bots, the
        scheduler is asked for assignments covering those bot names, and the
        events of each bot name it returns are set. Empty buckets are skipped.

        Args:
            shutdown_requested: Checked before each bucket; once set, the
                method returns without processing further buckets.
            instance_name: The instance whose registrations are processed.
            oldest_first: Use ``assign_n_leases_by_age`` when true, otherwise
                ``assign_n_leases_by_priority``.
        """
        with self._lock:
            pending_hashes = list(self._events.get(instance_name, {}).keys())

        random.shuffle(pending_hashes)
        for worker_hash in pending_hashes:
            if shutdown_requested.is_set():
                return

            # Snapshot the bot names so the lock is not held while the
            # scheduler is called.
            with self._lock:
                bot_names = list(self._events.get(instance_name, {}).get(worker_hash, {}))

            if not bot_names:
                continue

            request_assignments = (
                self._scheduler.assign_n_leases_by_age
                if oldest_first
                else self._scheduler.assign_n_leases_by_priority
            )
            assigned_bot_names = request_assignments(capability_hash=worker_hash, bot_names=bot_names)

            with self._lock:
                # Registrations may have changed since the snapshot, so look
                # them up again and tolerate missing entries.
                bucket = self._events.get(instance_name, {}).get(worker_hash, {})
                for name in assigned_bot_names:
                    for waiting_event in bucket.get(name, {}).values():
                        waiting_event.set()

    def begin(self, shutdown_requested: threading.Event) -> None:
        """Run assignment passes until shutdown is requested.

        Each pass decides once, by random draw against
        ``priority_percentage``, whether to assign by age or by priority,
        then processes every instance that has registrations. Errors raised
        while handling one instance are logged and do not stop the loop.
        Between passes the loop waits for a new registration or for
        ``job_assignment_interval`` to elapse, whichever comes first.

        Args:
            shutdown_requested: Event that ends the loop once set.
        """
        while not shutdown_requested.is_set():
            use_age_ordering = random.randint(1, 100) > self.priority_percentage

            with self._lock:
                known_instances = list(self._events)

            for instance_name in known_instances:
                try:
                    with instance_context(instance_name):
                        self.assign_jobs(shutdown_requested, instance_name, oldest_first=use_age_ordering)
                except Exception:
                    LOGGER.exception(
                        "Error in job assignment thread.", tags=dict(instance_name=instance_name), exc_info=True
                    )

            self._new_bots_added.wait(timeout=self.job_assignment_interval)
            self._new_bots_added.clear()