Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/scheduler/notifier.py: 96.74% (92 statements)
# Copyright (C) 2024 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import select
import uuid
from contextlib import contextmanager
from threading import Event, Lock
from typing import Any, Iterator, cast

from sqlalchemy import select as sql_select
from sqlalchemy.orm import Session

from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.sql.models import JobEntry
from buildgrid.server.sql.provider import SqlProvider
from buildgrid.server.threading import ContextWorker

LOGGER = buildgrid_logger(__name__)


class OperationsNotifier:
    def __init__(self, sql_provider: SqlProvider, poll_interval: float = 1) -> None:
        """
        Creates a notifier for changes to jobs, used by observers of related operations.

        Note: jobs have a one-to-many relationship with operations, and for each operation
        there can be multiple clients listening for updates.
        """

        self._sql = sql_provider
        self._lock = Lock()
        # Maps job name -> subscription key -> Event, one Event per listening client.
        self._listeners: dict[str, dict[str, Event]] = {}
        self.poll_interval = poll_interval
        self.worker = ContextWorker(name="OperationsNotifier", target=self.begin)
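
    # A minimal usage sketch, assuming a configured SqlProvider instance named
    # `sql_provider` (hypothetical here): entering the notifier as a context manager
    # starts the background ContextWorker running `begin`, and exiting stops it.
    #
    #     with OperationsNotifier(sql_provider, poll_interval=1) as notifier:
    #         ...  # serve requests; clients subscribe via notifier.subscription(...)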

    def __enter__(self: "OperationsNotifier") -> "OperationsNotifier":
        self.start()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        self.stop()

    def start(self) -> None:
        self.worker.start()

    def stop(self) -> None:
        self.worker.stop()

    def listener_count(self) -> int:
        """Used for reporting job metrics about scheduling."""
        with self._lock:
            return sum(len(events) for events in self._listeners.values())

    def begin(self, shutdown_requested: Event) -> None:
        LOGGER.info("Starting job notifier thread.")

        while not shutdown_requested.is_set():
            try:
                with self._sql.session() as session:
                    if self._sql.dialect == "postgresql":
                        self._listen_for_updates(shutdown_requested, session)
                    else:
                        self._poll_for_updates(shutdown_requested, session)
            except Exception as e:
                LOGGER.warning(
                    f"OperationsNotifier encountered exception: {e}.",
                    tags=dict(retry_delay_seconds=self.poll_interval),
                )
                # Sleep for a bit so that we give enough time for the
                # database to potentially recover
                shutdown_requested.wait(timeout=self.poll_interval)

    def _listen_for_updates(self, shutdown_requested: Event, session: Session) -> None:
        # In our `LISTEN` call, we want to *bypass the ORM* and *use the underlying Engine connection directly*.
        # This is because using a `session.execute()` will implicitly create a SQL transaction, causing
        # notifications to only be delivered when that transaction is committed.
        from psycopg2.extensions import connection

        try:
            pool_connection = session.connection().connection
            pool_connection.cursor().execute("LISTEN job_updated;")
            pool_connection.commit()
        except Exception:
            LOGGER.warning("Could not start listening to DB for job updates.", exc_info=True)
            raise

        while not shutdown_requested.is_set() and pool_connection.dbapi_connection is not None:
            # If we're in this method, we know we have a psycopg2 connection object here.
            dbapi_connection = cast(connection, pool_connection.dbapi_connection)

            # Wait until the connection is ready for reading. Timeout and try again if there was nothing to read.
            # If the connection becomes readable, collect the notifications it has received and handle them.
            #
            # See https://www.psycopg.org/docs/advanced.html#asynchronous-notifications
            if select.select([dbapi_connection], [], [], self.poll_interval) == ([], [], []):
                continue

            dbapi_connection.poll()
            while dbapi_connection.notifies:
                notify = dbapi_connection.notifies.pop()
                self.notify(notify.payload)
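
    # The loop above assumes that whichever component updates job rows also emits a
    # Postgres notification on the `job_updated` channel, carrying the job name as the
    # payload (that is what `self.notify(notify.payload)` relies on). A sketch of that
    # emitting side, with the job name value hypothetical, would be:
    #
    #     SELECT pg_notify('job_updated', 'job-name-goes-here');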

    def _poll_for_updates(self, shutdown_requested: Event, session: Session) -> None:
        prev_data: dict[str, tuple[bool, int]] = {}
        while not shutdown_requested.is_set():
            with self._lock:
                names = list(self._listeners)

            # Only query for the minimal amount of data required.
            # The subscribers can choose how they want to act (e.g. by querying the full job data).
            statement = sql_select(JobEntry.name, JobEntry.cancelled, JobEntry.stage).where(JobEntry.name.in_(names))
            next_data: dict[str, tuple[bool, int]] = {}
            for [name, cancelled, stage] in session.execute(statement).all():
                next_data[name] = (cancelled, stage)

            for name in next_data:
                if name not in prev_data or prev_data[name] != next_data[name]:
                    self.notify(name)

            prev_data = next_data
            shutdown_requested.wait(timeout=self.poll_interval)

    def notify(self, job_name: str) -> None:
        with self._lock:
            if job_name in self._listeners:
                for event in self._listeners[job_name].values():
                    event.set()

    @contextmanager
    def subscription(self, job_name: str) -> Iterator[Event]:
        """
        Register a threading.Event object which is triggered each time the associated job_name updates
        its cancelled or stage status. After waiting for an event, the caller should immediately call
        event.clear() if they wish to re-use the event, otherwise the event object will remain set.
        """

        # Create a unique key for the subscription which is deleted when the job is no longer monitored.
        key = str(uuid.uuid4())
        event = Event()
        try:
            with self._lock:
                if job_name not in self._listeners:
                    self._listeners[job_name] = {}
                self._listeners[job_name][key] = event
            yield event
        finally:
            with self._lock:
                del self._listeners[job_name][key]
                if len(self._listeners[job_name]) == 0:
                    del self._listeners[job_name]
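

# A minimal subscriber sketch, assuming an OperationsNotifier instance named `notifier`
# and an illustrative job name: wait on the subscription's Event, then clear it before
# re-reading the operation state, as described in the subscription docstring.
#
#     with notifier.subscription("example-job-name") as event:
#         while event.wait(timeout=30):
#             event.clear()
#             ...  # re-query the job/operation and send the update to the client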