Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/persistence/sql/notifier.py: 96.74%

92 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-06-11 15:37 +0000

1import logging 

2import select 

3import uuid 

4from contextlib import contextmanager 

5from threading import Event, Lock 

6from typing import Any, Dict, Iterator, Tuple 

7 

8from sqlalchemy import select as sql_select 

9from sqlalchemy.orm import Session 

10 

11from buildgrid.server.persistence.sql.models import JobEntry 

12from buildgrid.server.sql.provider import SqlProvider 

13from buildgrid.server.threading import ContextWorker 

14 

15LOGGER = logging.getLogger(__name__) 

16 

17 

class OperationsNotifier:
    def __init__(self, sql_provider: SqlProvider, poll_interval: float = 1.0) -> None:
        """
        Creates a notifier for changes to jobs, used by observers of related operations.

        Note: jobs have a one-to-many relationship with operations, and for each operation
        there can be multiple clients listening for updates.

        Args:
            sql_provider: Provides database sessions/connections used to watch for
                job updates.
            poll_interval: Seconds between polling queries (non-PostgreSQL dialects),
                the ``select()`` timeout while listening (PostgreSQL), and the back-off
                delay after a database error. Defaults to 1.0.
        """

        self._sql = sql_provider
        self._lock = Lock()
        # Maps job name -> {subscription key -> Event}. All access is guarded by
        # self._lock; events are set when the job's cancelled/stage state changes.
        self._listeners: Dict[str, Dict[str, Event]] = {}
        self.poll_interval = poll_interval
        self.worker = ContextWorker(name="OperationsNotifier", target=self.begin)

    def __enter__(self: "OperationsNotifier") -> "OperationsNotifier":
        self.start()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        self.stop()

    def start(self) -> None:
        """Start the background notifier thread."""
        self.worker.start()

    def stop(self) -> None:
        """Stop the background notifier thread."""
        self.worker.stop()

    def listener_count(self) -> int:
        """Used for reporting job metrics about the scheduling."""
        with self._lock:
            return sum(len(events) for events in self._listeners.values())

    def begin(self, shutdown_requested: Event) -> None:
        """
        Main loop of the notifier thread.

        Uses PostgreSQL LISTEN/NOTIFY when available, falling back to polling for
        other dialects. On any database error the loop logs a warning, waits
        ``poll_interval`` seconds, and reconnects, until ``shutdown_requested``
        is set.
        """
        LOGGER.info("Starting job notifier thread")

        while not shutdown_requested.is_set():
            try:
                with self._sql.session() as session:
                    if self._sql.dialect == "postgresql":
                        self._listen_for_updates(shutdown_requested, session)
                    else:
                        self._poll_for_updates(shutdown_requested, session)
            except Exception as e:
                # Note the trailing space inside the first fragment: adjacent
                # f-strings are concatenated with no implicit separator.
                LOGGER.warning(
                    f"OperationsNotifier encountered exception: [{e}]; "
                    f"Retrying in poll_interval=[{self.poll_interval}] seconds."
                )
                # Sleep for a bit so that we give enough time for the
                # database to potentially recover
                shutdown_requested.wait(timeout=self.poll_interval)

    def _listen_for_updates(self, shutdown_requested: Event, session: Session) -> None:
        """
        Receive job-update notifications via PostgreSQL LISTEN/NOTIFY.

        Runs until ``shutdown_requested`` is set or the connection fails
        (the caller's loop handles reconnection).
        """
        # In our `LISTEN` call, we want to *bypass the ORM* and *use the underlying Engine connection directly*.
        # This is because using a `session.execute()` will implicitly create a SQL transaction, causing
        # notifications to only be delivered when that transaction is committed.
        engine_conn = session.connection()

        try:
            connection_fairy = engine_conn.connection
            connection_fairy.cursor().execute("LISTEN job_updated;")  # type: ignore
            connection_fairy.commit()
        except Exception:
            LOGGER.warning("Could not start listening to DB for job updates", exc_info=True)
            raise

        while not shutdown_requested.is_set():
            dbapi_connection = connection_fairy.dbapi_connection  # type: ignore
            # Wait until the connection is ready for reading. Timeout and try again if there was nothing to read.
            # If the connection becomes readable, collect the notifications it has received and handle them.
            #
            # See https://www.psycopg.org/docs/advanced.html#asynchronous-notifications
            if select.select([dbapi_connection], [], [], self.poll_interval) == ([], [], []):
                continue

            dbapi_connection.poll()
            while dbapi_connection.notifies:
                notify = dbapi_connection.notifies.pop()
                # The NOTIFY payload is expected to be the job name.
                self.notify(notify.payload)

    def _poll_for_updates(self, shutdown_requested: Event, session: Session) -> None:
        """
        Periodically query the jobs table and notify subscribers on state changes.

        Used for dialects without LISTEN/NOTIFY support. Tracks the last seen
        (cancelled, stage) pair per subscribed job and notifies whenever a job is
        newly observed or either value changes.
        """
        prev_data: Dict[str, Tuple[bool, int]] = {}
        while not shutdown_requested.is_set():
            with self._lock:
                names = list(self._listeners)

            # No subscribers: skip the database round-trip. Resetting prev_data
            # matches the result an empty IN() query would have produced, so a
            # later re-subscription is still notified on its first poll.
            if not names:
                prev_data = {}
                shutdown_requested.wait(timeout=self.poll_interval)
                continue

            # Only query for the minimal amount of data required.
            # The subscribers can choose how they want to act (e.g. by querying the full job data).
            statement = sql_select(JobEntry.name, JobEntry.cancelled, JobEntry.stage).where(JobEntry.name.in_(names))
            next_data: Dict[str, Tuple[bool, int]] = {}
            for [name, cancelled, stage] in session.execute(statement).all():
                next_data[name] = (cancelled, stage)

            # Notify for jobs that are newly seen or whose state changed since
            # the previous poll. (Values are tuples, never None, so the .get()
            # default cannot collide with a real value.)
            for name in next_data:
                if prev_data.get(name) != next_data[name]:
                    self.notify(name)

            prev_data = next_data
            shutdown_requested.wait(timeout=self.poll_interval)

    def notify(self, job_name: str) -> None:
        """Set the Event of every subscriber of ``job_name``; no-op if there are none."""
        with self._lock:
            for event in self._listeners.get(job_name, {}).values():
                event.set()

    @contextmanager
    def subscription(self, job_name: str) -> Iterator[Event]:
        """
        Register a threading.Event object which is triggered each time the associated job_name updates
        its cancelled or stage status. After waiting for an event, the caller should immediately call
        event.clear() if they wish to re-use the event again, otherwise the event object will remain set.
        """

        # Create a unique key for the subscription which is deleted when the job is no longer monitored.
        key = str(uuid.uuid4())
        event = Event()
        try:
            with self._lock:
                self._listeners.setdefault(job_name, {})[key] = event
            yield event
        finally:
            # Always unregister, and drop the per-job dict once it is empty so
            # that _poll_for_updates stops querying for this job.
            with self._lock:
                del self._listeners[job_name][key]
                if not self._listeners[job_name]:
                    del self._listeners[job_name]