Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/scheduler/assigner.py: 95.51%
89 statements
coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
1import random
2import threading
3import uuid
4from contextlib import contextmanager
5from typing import TYPE_CHECKING, Any, Dict, Iterator, Optional
7from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import BotSession
8from buildgrid.server.context import current_instance, instance_context
9from buildgrid.server.logging import buildgrid_logger
10from buildgrid.server.threading import ContextWorker
12from .properties import PropertySet, hash_from_dict
14if TYPE_CHECKING:
15 # Avoid circular import
16 from .impl import Scheduler
# Module-level logger; the job assignment thread reports per-instance errors through it.
LOGGER = buildgrid_logger(__name__)
class JobAssigner:
    """Background loop that asks the scheduler to hand leases to waiting bots.

    Bot sessions register through :meth:`assignment_context`, which files a
    :class:`threading.Event` under
    ``instance name -> worker capability hash -> bot name -> unique key``.
    A ``ContextWorker`` running :meth:`begin` periodically walks the registered
    capability hashes and passes each bucket's bot names to the scheduler. It
    then sets the events of every bot name the scheduler reports as assigned.
    """

    def __init__(
        self,
        scheduler: "Scheduler",
        property_set: PropertySet,
        job_assignment_interval: float = 1.0,
        priority_percentage: int = 100,
    ):
        """
        Args:
            scheduler: Scheduler asked to assign leases to the waiting bots.
            property_set: Used to derive the worker capability hashes of a bot session.
            job_assignment_interval: Maximum number of seconds the assignment loop waits
                between passes. A pass starts sooner when a bot registers or shutdown
                is requested.
            priority_percentage: Chance, in percent, that a pass assigns leases by
                priority; the remaining passes assign by age (oldest first). With the
                default of ``100``, every pass uses priority ordering.
        """
        self._lock = threading.Lock()
        # Dict[Instance, Dict[Hash, Dict[BotName, Dict[Key, Event]]]]
        self._events: Dict[str, Dict[str, Dict[str, Dict[str, threading.Event]]]] = {}
        self._scheduler = scheduler
        self._property_set = property_set
        # Here we allow immediately starting a new assignment if a bot is added to the lookup.
        # It is also set on shutdown so the loop's wait returns promptly.
        self._new_bots_added = threading.Event()
        self.assigner = ContextWorker(
            target=self.begin, name="JobAssignment", on_shutdown_requested=self._new_bots_added.set
        )
        self.job_assignment_interval = job_assignment_interval
        self.priority_percentage = priority_percentage

    def __enter__(self) -> "JobAssigner":
        """Start the assignment worker and return ``self``."""
        self.start()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Stop the assignment worker."""
        self.stop()

    def start(self) -> None:
        """Start the worker that runs :meth:`begin`."""
        self.assigner.start()

    def stop(self) -> None:
        """Stop the worker that runs :meth:`begin`."""
        self.assigner.stop()

    def listener_count(self, instance_name: Optional[str] = None) -> int:
        """Return the number of distinct bot names currently registered for work.

        Args:
            instance_name: Only count bots registered under this instance. When
                ``None``, every instance is counted; a bot name registered under
                several instances (or several capability hashes) is counted once.
        """
        with self._lock:
            return len(
                {
                    bot_name
                    for event_instance_name, instance_events in self._events.items()
                    if instance_name is None or instance_name == event_instance_name
                    for bot_events in instance_events.values()
                    for bot_name in bot_events
                }
            )

    @contextmanager
    def assignment_context(self, bot_session: BotSession) -> Iterator[threading.Event]:
        """Register ``bot_session`` as waiting for work for the duration of the context.

        The yielded event is filed under the current instance for every capability
        hash derived from the bot session's worker properties. The assignment loop
        sets it when the scheduler reports a lease assigned to this bot's name. The
        registration is removed on exit, even if the body raises.

        Yields:
            The event that is set when a lease is assigned to this bot.
        """
        key = str(uuid.uuid4())
        event = threading.Event()
        worker_hashes = set(map(hash_from_dict, self._property_set.worker_properties(bot_session)))
        instance_name = current_instance()
        bot_name = bot_session.name
        try:
            with self._lock:
                instance_events = self._events.setdefault(instance_name, {})
                for worker_hash in worker_hashes:
                    bot_events = instance_events.setdefault(worker_hash, {}).setdefault(bot_name, {})
                    bot_events[key] = event
            # Wake the assignment loop so the new registration is served without waiting.
            self._new_bots_added.set()
            yield event
        finally:
            with self._lock:
                # Use lookups that tolerate missing entries. If this registration had
                # no worker hashes, it only ensured the instance entry existed. A
                # concurrent context for the same instance may already have pruned
                # that (then empty) entry. Strict indexing here would raise KeyError
                # and mask any exception propagating out of the ``with`` body.
                instance_events = self._events.get(instance_name)
                if instance_events is not None:
                    for worker_hash in worker_hashes:
                        hash_events = instance_events.get(worker_hash)
                        if hash_events is None:
                            continue
                        bot_events = hash_events.get(bot_name)
                        if bot_events is not None:
                            bot_events.pop(key, None)
                            if not bot_events:
                                del hash_events[bot_name]
                        if not hash_events:
                            del instance_events[worker_hash]
                    if not instance_events:
                        del self._events[instance_name]

    def assign_jobs(self, shutdown_requested: threading.Event, instance_name: str, oldest_first: bool = False) -> None:
        """Assign jobs to the currently connected workers of one instance.

        Iterates over the capability-hash buckets of the connected workers in a
        random order. For each bucket, requests job assignments from the scheduler
        for the bucket's bot names. Empty buckets are skipped. The events of every
        bot name the scheduler returns as assigned are set.

        The lock is only held while reading or signalling the registrations, never
        across the scheduler calls.

        Args:
            shutdown_requested: Checked before each bucket; the method returns early once set.
            instance_name: Instance whose registered workers are served.
            oldest_first: Use ``assign_n_leases_by_age`` instead of
                ``assign_n_leases_by_priority``.
        """
        with self._lock:
            worker_hashes = list(self._events.get(instance_name, {}).keys())

        # Random order so no capability bucket is consistently served first.
        random.shuffle(worker_hashes)
        for worker_hash in worker_hashes:
            if shutdown_requested.is_set():
                return

            with self._lock:
                bot_names = list(self._events.get(instance_name, {}).get(worker_hash, {}))

            if not bot_names:
                continue

            if oldest_first:
                assigned_bot_names = self._scheduler.assign_n_leases_by_age(
                    capability_hash=worker_hash, bot_names=bot_names
                )
            else:
                assigned_bot_names = self._scheduler.assign_n_leases_by_priority(
                    capability_hash=worker_hash, bot_names=bot_names
                )

            with self._lock:
                # Registrations may have been removed while the scheduler was running,
                # so every level is looked up with a default.
                for name in assigned_bot_names:
                    for event in self._events.get(instance_name, {}).get(worker_hash, {}).get(name, {}).values():
                        event.set()

    def begin(self, shutdown_requested: threading.Event) -> None:
        """Run assignment passes until shutdown is requested.

        Each pass chooses the ordering once (see ``priority_percentage``). It then
        runs :meth:`assign_jobs` for every instance with registrations, inside that
        instance's context. Errors are logged per instance, so one failing instance
        does not stop the pass. Between passes the loop waits up to
        ``job_assignment_interval`` seconds, returning early when a bot registers
        or shutdown is requested.
        """
        while not shutdown_requested.is_set():
            oldest_first = random.randint(1, 100) > self.priority_percentage

            with self._lock:
                instance_names = list(self._events)

            for instance_name in instance_names:
                try:
                    with instance_context(instance_name):
                        self.assign_jobs(shutdown_requested, instance_name, oldest_first=oldest_first)
                except Exception:
                    LOGGER.exception(
                        "Error in job assignment thread.", tags=dict(instance_name=instance_name), exc_info=True
                    )

            self._new_bots_added.wait(timeout=self.job_assignment_interval)
            # Clearing after the wait loses nothing. A registration writes to
            # ``self._events`` before setting the event, so the next pass sees it.
            self._new_bots_added.clear()