Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/scheduler/notifier.py: 96.74%
92 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
1# Copyright (C) 2024 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16import select
17import uuid
18from contextlib import contextmanager
19from threading import Event, Lock
20from typing import Any, Dict, Iterator, Tuple
22from sqlalchemy import select as sql_select
23from sqlalchemy.orm import Session
25from buildgrid.server.logging import buildgrid_logger
26from buildgrid.server.sql.models import JobEntry
27from buildgrid.server.sql.provider import SqlProvider
28from buildgrid.server.threading import ContextWorker
# Module-level logger; the notifier uses it to report worker start-up and database errors.
LOGGER = buildgrid_logger(__name__)
class OperationsNotifier:
    """
    Wakes up subscribers when jobs they are watching change.

    A background worker (running ``begin``) watches the database for job changes. On PostgreSQL it
    uses ``LISTEN job_updated`` and treats each notification payload as a job name; on any other
    dialect it periodically polls the ``cancelled`` and ``stage`` columns of watched jobs.
    Subscribers obtain a ``threading.Event`` from ``subscription`` which is set on each change.
    """

    def __init__(self, sql_provider: SqlProvider, poll_interval: float = 1) -> None:
        """
        Creates a notifier for changes to jobs, used by observers of related operations.

        Note: jobs have a one-to-many relationship with operations, and for each operation
        there can be multiple clients listening for updates.

        Args:
            sql_provider: Provider of database sessions used to watch for job changes.
            poll_interval: Seconds between polls (non-PostgreSQL dialects), the ``select`` timeout
                while listening (PostgreSQL), and the back-off delay after an error.
        """
        self._sql = sql_provider
        # Guards ``_listeners``: it is mutated by subscribers and read by the worker thread.
        self._lock = Lock()
        # job name -> {unique subscription key -> event to set when the job changes}
        self._listeners: Dict[str, Dict[str, Event]] = {}
        self.poll_interval = poll_interval
        self.worker = ContextWorker(name="OperationsNotifier", target=self.begin)

    def __enter__(self: "OperationsNotifier") -> "OperationsNotifier":
        """Start the background worker and return this notifier."""
        self.start()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Stop the background worker; exceptions raised in the ``with`` body are not suppressed."""
        self.stop()

    def start(self) -> None:
        """Start the background worker that runs ``begin``."""
        self.worker.start()

    def stop(self) -> None:
        """Stop the background worker."""
        self.worker.stop()

    def listener_count(self) -> int:
        """Used for reporting job metrics about the scheduling."""
        # Total number of active subscriptions across all watched jobs.
        with self._lock:
            return sum(len(events) for events in self._listeners.values())

    def begin(self, shutdown_requested: Event) -> None:
        """
        Worker loop: watch the database for job changes until ``shutdown_requested`` is set.

        Any exception raised while watching is logged, and watching restarts with a fresh session
        after waiting ``poll_interval`` seconds (returning early if shutdown is requested meanwhile).
        """
        LOGGER.info("Starting job notifier thread.")

        while not shutdown_requested.is_set():
            try:
                with self._sql.session() as session:
                    if self._sql.dialect == "postgresql":
                        self._listen_for_updates(shutdown_requested, session)
                    else:
                        self._poll_for_updates(shutdown_requested, session)
            except Exception as e:
                LOGGER.warning(
                    f"OperationsNotifier encountered exception: {e}.",
                    tags=dict(retry_delay_seconds=self.poll_interval),
                )
                # Sleep for a bit so that we give enough time for the
                # database to potentially recover
                shutdown_requested.wait(timeout=self.poll_interval)

    def _listen_for_updates(self, shutdown_requested: Event, session: Session) -> None:
        """
        Issue ``LISTEN job_updated`` and notify subscribers of each received notification.

        Each notification payload is passed to ``notify`` as a job name. Returns once
        ``shutdown_requested`` is set; database errors propagate to the caller.
        """
        # In our `LISTEN` call, we want to *bypass the ORM* and *use the underlying Engine connection directly*.
        # This is because using a `session.execute()` will implicitly create a SQL transaction, causing
        # notifications to only be delivered when that transaction is committed.
        engine_conn = session.connection()

        try:
            connection_fairy = engine_conn.connection
            cursor = connection_fairy.cursor()
            try:
                cursor.execute("LISTEN job_updated;")  # type: ignore
            finally:
                # Release the DBAPI cursor explicitly instead of relying on garbage collection;
                # the LISTEN registration belongs to the connection, not the cursor.
                cursor.close()
            connection_fairy.commit()
        except Exception:
            LOGGER.warning("Could not start listening to DB for job updates.", exc_info=True)
            raise

        while not shutdown_requested.is_set():
            dbapi_connection = connection_fairy.dbapi_connection  # type: ignore
            # Wait until the connection is ready for reading. Timeout and try again if there was nothing to read.
            # If the connection becomes readable, collect the notifications it has received and handle them.
            #
            # See https://www.psycopg.org/docs/advanced.html#asynchronous-notifications
            readable, _, _ = select.select([dbapi_connection], [], [], self.poll_interval)
            if not readable:
                continue

            dbapi_connection.poll()
            while dbapi_connection.notifies:
                notify = dbapi_connection.notifies.pop()
                self.notify(notify.payload)

    def _poll_for_updates(self, shutdown_requested: Event, session: Session) -> None:
        """
        Periodically poll watched jobs and notify subscribers of any that changed.

        Every ``poll_interval`` seconds, the ``cancelled`` and ``stage`` values of every job that has
        at least one subscriber are read and compared with the previous poll. A job is notified when
        its values differ, or when it was absent from the previous poll (e.g. it was only just
        subscribed to), so a new subscriber may receive one initial wake-up.
        Returns once ``shutdown_requested`` is set; database errors propagate to the caller.
        """
        prev_data: Dict[str, Tuple[bool, int]] = {}
        while not shutdown_requested.is_set():
            with self._lock:
                names = list(self._listeners)

            next_data: Dict[str, Tuple[bool, int]] = {}
            # Skip the database round trip entirely when nobody is subscribed.
            if names:
                # Only query for the minimal amount of data required.
                # The subscribers can choose how they want to act (e.g. by querying the full job data).
                statement = sql_select(JobEntry.name, JobEntry.cancelled, JobEntry.stage).where(
                    JobEntry.name.in_(names)
                )
                for name, cancelled, stage in session.execute(statement).all():
                    next_data[name] = (cancelled, stage)

            for name, state in next_data.items():
                # A missing previous entry yields None, which never equals a state tuple.
                if prev_data.get(name) != state:
                    self.notify(name)

            prev_data = next_data
            shutdown_requested.wait(timeout=self.poll_interval)

    def notify(self, job_name: str) -> None:
        """Set every subscriber event registered for ``job_name``; names with no subscribers are ignored."""
        with self._lock:
            for event in self._listeners.get(job_name, {}).values():
                event.set()

    @contextmanager
    def subscription(self, job_name: str) -> Iterator[Event]:
        """
        Register a threading.Event object which is triggered each time the associated job_name updates
        its cancelled or stage status. After waiting for an event, the caller should immediately call
        event.clear() if they wish to re-use the event again, otherwise the event object will remain set.

        On PostgreSQL the event is set for every ``job_updated`` notification whose payload is this
        job name. The registration is removed when the ``with`` block exits, including on error.
        """
        # Create a unique key for the subscription which is deleted when the job is no longer monitored.
        key = str(uuid.uuid4())
        event = Event()
        # Register before entering ``try`` so cleanup only ever runs for a registration that exists.
        with self._lock:
            self._listeners.setdefault(job_name, {})[key] = event
        try:
            yield event
        finally:
            with self._lock:
                listeners = self._listeners[job_name]
                del listeners[key]
                # Drop the job once its last subscriber leaves so the poller stops querying it.
                if not listeners:
                    del self._listeners[job_name]