Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/scheduler/notifier.py: 96.74%

92 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

1# Copyright (C) 2024 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16import select 

17import uuid 

18from contextlib import contextmanager 

19from threading import Event, Lock 

20from typing import Any, Dict, Iterator, Tuple 

21 

22from sqlalchemy import select as sql_select 

23from sqlalchemy.orm import Session 

24 

25from buildgrid.server.logging import buildgrid_logger 

26from buildgrid.server.sql.models import JobEntry 

27from buildgrid.server.sql.provider import SqlProvider 

28from buildgrid.server.threading import ContextWorker 

29 

30LOGGER = buildgrid_logger(__name__) 

31 

32 

class OperationsNotifier:
    def __init__(self, sql_provider: SqlProvider, poll_interval: float = 1) -> None:
        """
        Creates a notifier for changes to jobs, used by observers of related operations.

        Note: jobs have a one-to-many relationship with operations, and for each operation
        there can be multiple clients listening for updates.

        Args:
            sql_provider: Provider of SQL sessions used to watch for job changes.
            poll_interval: Seconds between polls on non-PostgreSQL backends, and the
                timeout used when waiting on the PostgreSQL notification socket. Also
                used as the retry delay after a database error.
        """
        self._sql = sql_provider
        self._lock = Lock()
        # Maps job name -> {subscription key -> Event}. Guarded by self._lock.
        self._listeners: Dict[str, Dict[str, Event]] = {}
        self.poll_interval = poll_interval
        self.worker = ContextWorker(name="OperationsNotifier", target=self.begin)

    def __enter__(self: "OperationsNotifier") -> "OperationsNotifier":
        self.start()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        self.stop()

    def start(self) -> None:
        """Start the background notifier thread."""
        self.worker.start()

    def stop(self) -> None:
        """Stop the background notifier thread."""
        self.worker.stop()

    def listener_count(self) -> int:
        """Used for reporting job metrics about the scheduling."""
        with self._lock:
            return sum(len(events) for events in self._listeners.values())

    def begin(self, shutdown_requested: Event) -> None:
        """Main loop of the notifier thread: watch the database for job updates.

        Uses PostgreSQL LISTEN/NOTIFY when the dialect supports it, otherwise falls
        back to periodic polling. On any exception the loop logs a warning, waits
        ``poll_interval`` seconds, and retries until shutdown is requested.
        """
        LOGGER.info("Starting job notifier thread.")

        while not shutdown_requested.is_set():
            try:
                with self._sql.session() as session:
                    if self._sql.dialect == "postgresql":
                        self._listen_for_updates(shutdown_requested, session)
                    else:
                        self._poll_for_updates(shutdown_requested, session)
            except Exception as e:
                LOGGER.warning(
                    f"OperationsNotifier encountered exception: {e}.",
                    tags=dict(retry_delay_seconds=self.poll_interval),
                )
                # Sleep for a bit so that we give enough time for the
                # database to potentially recover
                shutdown_requested.wait(timeout=self.poll_interval)

    def _listen_for_updates(self, shutdown_requested: Event, session: Session) -> None:
        """Block on PostgreSQL LISTEN/NOTIFY and dispatch job-update notifications.

        Runs until ``shutdown_requested`` is set, or raises if the LISTEN
        subscription cannot be established (letting ``begin`` retry).
        """
        # In our `LISTEN` call, we want to *bypass the ORM* and *use the underlying Engine connection directly*.
        # This is because using a `session.execute()` will implicitly create a SQL transaction, causing
        # notifications to only be delivered when that transaction is committed.
        engine_conn = session.connection()

        try:
            connection_fairy = engine_conn.connection
            # Close the cursor as soon as LISTEN has been issued; only the
            # underlying connection is needed to receive notifications.
            cursor = connection_fairy.cursor()  # type: ignore
            try:
                cursor.execute("LISTEN job_updated;")
            finally:
                cursor.close()
            connection_fairy.commit()
        except Exception:
            LOGGER.warning("Could not start listening to DB for job updates.", exc_info=True)
            raise

        while not shutdown_requested.is_set():
            dbapi_connection = connection_fairy.dbapi_connection  # type: ignore
            # Wait until the connection is ready for reading. Timeout and try again if there was nothing to read.
            # If the connection becomes readable, collect the notifications it has received and handle them.
            #
            # See https://www.psycopg.org/docs/advanced.html#asynchronous-notifications
            if select.select([dbapi_connection], [], [], self.poll_interval) == ([], [], []):
                continue

            dbapi_connection.poll()
            while dbapi_connection.notifies:
                notify = dbapi_connection.notifies.pop()
                self.notify(notify.payload)

    def _poll_for_updates(self, shutdown_requested: Event, session: Session) -> None:
        """Fallback for databases without LISTEN/NOTIFY: poll for job state changes.

        Tracks the (cancelled, stage) pair per subscribed job and notifies
        subscribers whenever the pair changes (or appears) between polls.
        """
        prev_data: Dict[str, Tuple[bool, int]] = {}
        while not shutdown_requested.is_set():
            with self._lock:
                names = list(self._listeners)

            next_data: Dict[str, Tuple[bool, int]] = {}
            # Skip the database round-trip entirely when nobody is subscribed.
            if names:
                # Only query for the minimal amount of data required.
                # The subscribers can choose how they want to act (e.g. by querying the full job data).
                statement = sql_select(JobEntry.name, JobEntry.cancelled, JobEntry.stage).where(
                    JobEntry.name.in_(names)
                )
                for [name, cancelled, stage] in session.execute(statement).all():
                    next_data[name] = (cancelled, stage)

            for name in next_data:
                if name not in prev_data or prev_data[name] != next_data[name]:
                    self.notify(name)

            prev_data = next_data
            shutdown_requested.wait(timeout=self.poll_interval)

    def notify(self, job_name: str) -> None:
        """Set the Event of every subscriber currently registered for ``job_name``."""
        with self._lock:
            if job_name in self._listeners:
                for event in self._listeners[job_name].values():
                    event.set()

    @contextmanager
    def subscription(self, job_name: str) -> Iterator[Event]:
        """
        Register a threading.Event object which is triggered each time the associated job_name updates
        its cancelled or stage status. After waiting for an event, the caller should immediately call
        event.clear() if they wish to re-use the event again, otherwise the event object will remain set.
        """
        # Create a unique key for the subscription which is deleted when the job is no longer monitored.
        key = str(uuid.uuid4())
        event = Event()
        try:
            with self._lock:
                self._listeners.setdefault(job_name, {})[key] = event
            yield event
        finally:
            with self._lock:
                del self._listeners[job_name][key]
                if not self._listeners[job_name]:
                    del self._listeners[job_name]