Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/scheduler/notifier.py: 96.74% (92 statements)

Generated by coverage.py v7.4.1, created at 2025-04-14 16:27 +0000

1# Copyright (C) 2024 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16import select 

17import uuid 

18from contextlib import contextmanager 

19from threading import Event, Lock 

20from typing import Any, Iterator, cast 

21 

22from sqlalchemy import select as sql_select 

23from sqlalchemy.orm import Session 

24 

25from buildgrid.server.logging import buildgrid_logger 

26from buildgrid.server.sql.models import JobEntry 

27from buildgrid.server.sql.provider import SqlProvider 

28from buildgrid.server.threading import ContextWorker 

29 

30LOGGER = buildgrid_logger(__name__) 

31 

32 

class OperationsNotifier:
    def __init__(self, sql_provider: SqlProvider, poll_interval: float = 1) -> None:
        """
        Creates a notifier for changes to jobs, used by observers of related operations.

        Note: jobs have a one-to-many relationship with operations, and for each operation
        there can be multiple clients listening for updates.

        Args:
            sql_provider: Provider of database sessions used to watch for job updates.
            poll_interval: Seconds to wait between polls (non-PostgreSQL backends), as the
                timeout while waiting for a PostgreSQL NOTIFY, and as the retry delay after
                a database error.
        """

        self._sql = sql_provider
        self._lock = Lock()
        # Maps job name -> {subscription key -> Event}. Guarded by self._lock.
        self._listeners: dict[str, dict[str, Event]] = {}
        self.poll_interval = poll_interval
        self.worker = ContextWorker(name="OperationsNotifier", target=self.begin)

    def __enter__(self: "OperationsNotifier") -> "OperationsNotifier":
        self.start()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        self.stop()

    def start(self) -> None:
        """Start the background notifier thread."""
        self.worker.start()

    def stop(self) -> None:
        """Stop the background notifier thread."""
        self.worker.stop()

    def listener_count(self) -> int:
        """Used for reporting job metrics about the scheduling."""
        with self._lock:
            return sum(len(events) for events in self._listeners.values())

    def begin(self, shutdown_requested: Event) -> None:
        """
        Run the notifier loop until shutdown is requested.

        Uses PostgreSQL LISTEN/NOTIFY when available, otherwise falls back to
        polling the job table. On any exception the loop logs a warning, waits
        ``poll_interval`` seconds, and retries with a fresh session.

        Args:
            shutdown_requested: Event set by the owning ContextWorker to request shutdown.
        """
        LOGGER.info("Starting job notifier thread.")

        while not shutdown_requested.is_set():
            try:
                with self._sql.session() as session:
                    if self._sql.dialect == "postgresql":
                        self._listen_for_updates(shutdown_requested, session)
                    else:
                        self._poll_for_updates(shutdown_requested, session)
            except Exception as e:
                LOGGER.warning(
                    f"OperationsNotifier encountered exception: {e}.",
                    tags=dict(retry_delay_seconds=self.poll_interval),
                )
                # Sleep for a bit so that we give enough time for the
                # database to potentially recover
                shutdown_requested.wait(timeout=self.poll_interval)

    def _listen_for_updates(self, shutdown_requested: Event, session: Session) -> None:
        """
        Wait for PostgreSQL ``NOTIFY job_updated`` messages and fan them out to
        subscribers. The notification payload is expected to be the job name.

        Raises:
            Exception: Re-raises any error encountered while issuing the LISTEN
                command; the caller (``begin``) handles retrying.
        """
        # In our `LISTEN` call, we want to *bypass the ORM* and *use the underlying Engine connection directly*.
        # This is because using a `session.execute()` will implicitly create a SQL transaction, causing
        # notifications to only be delivered when that transaction is committed.
        from psycopg2.extensions import connection

        try:
            pool_connection = session.connection().connection
            # Close the cursor once LISTEN has been issued; previously it was
            # leaked, leaving a dangling cursor on the pooled connection.
            cursor = pool_connection.cursor()
            try:
                cursor.execute("LISTEN job_updated;")
            finally:
                cursor.close()
            pool_connection.commit()
        except Exception:
            LOGGER.warning("Could not start listening to DB for job updates.", exc_info=True)
            raise

        while not shutdown_requested.is_set() and pool_connection.dbapi_connection is not None:
            # If we're in this method, we know we have a psycopg2 connection object here.
            dbapi_connection = cast(connection, pool_connection.dbapi_connection)

            # Wait until the connection is ready for reading. Timeout and try again if there was nothing to read.
            # If the connection becomes readable, collect the notifications it has received and handle them.
            #
            # See https://www.psycopg.org/docs/advanced.html#asynchronous-notifications
            if select.select([dbapi_connection], [], [], self.poll_interval) == ([], [], []):
                continue

            dbapi_connection.poll()
            while dbapi_connection.notifies:
                notify = dbapi_connection.notifies.pop()
                self.notify(notify.payload)

    def _poll_for_updates(self, shutdown_requested: Event, session: Session) -> None:
        """
        Fallback notification strategy for backends without LISTEN/NOTIFY.

        Repeatedly queries the cancelled/stage state of every job that currently
        has subscribers and notifies on any change since the previous iteration.
        A job seen for the first time is always notified.
        """
        prev_data: dict[str, tuple[bool, int]] = {}
        while not shutdown_requested.is_set():
            with self._lock:
                names = list(self._listeners)

            # Only query for the minimal amount of data required.
            # The subscribers can choose how they want to act (e.g. by querying the full job data).
            statement = sql_select(JobEntry.name, JobEntry.cancelled, JobEntry.stage).where(JobEntry.name.in_(names))
            next_data: dict[str, tuple[bool, int]] = {}
            for [name, cancelled, stage] in session.execute(statement).all():
                next_data[name] = (cancelled, stage)

            for name in next_data:
                if name not in prev_data or prev_data[name] != next_data[name]:
                    self.notify(name)

            prev_data = next_data
            shutdown_requested.wait(timeout=self.poll_interval)

    def notify(self, job_name: str) -> None:
        """Set the Event of every subscriber currently registered for ``job_name``."""
        with self._lock:
            if job_name in self._listeners:
                for event in self._listeners[job_name].values():
                    event.set()

    @contextmanager
    def subscription(self, job_name: str) -> Iterator[Event]:
        """
        Register a threading.Event object which is triggered each time the associated job_name updates
        its cancelled or stage status. After waiting for an event, the caller should immediately call
        event.clear() if they wish to re-use the event again, otherwise the event object will remain set.
        """

        # Create a unique key for the subscription which is deleted when the job is no longer monitored.
        key = str(uuid.uuid4())
        event = Event()
        try:
            with self._lock:
                if job_name not in self._listeners:
                    self._listeners[job_name] = {}
                self._listeners[job_name][key] = event
            yield event
        finally:
            with self._lock:
                del self._listeners[job_name][key]
                if len(self._listeners[job_name]) == 0:
                    del self._listeners[job_name]