Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/rabbitmq/pika_publisher.py: 45.22%
115 statements
« prev ^ index » next coverage.py v6.4.1, created at 2022-06-22 21:04 +0000
« prev ^ index » next coverage.py v6.4.1, created at 2022-06-22 21:04 +0000
1# Copyright (C) 2021 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15import logging
16from queue import Empty, Queue
17from time import sleep
18from threading import Thread
19from typing import Dict, Optional
21import pika # type: ignore
23from buildgrid.server.rabbitmq.utils import MessageSpec
24from buildgrid.settings import RABBITMQ_PUBLISHER_CREATION_RETRIES
25from buildgrid.utils import retry_delay
# Type alias: maps an exchange name to the pika exchange type used when
# declaring that exchange.
ExchangesDict = Dict[str, pika.exchange_type.ExchangeType]
class PikaPublisher:
    """Helper class to send messages to a RabbitMQ exchange.

    The publisher owns a single blocking connection and one channel with
    delivery confirmations enabled. The connection is closed when the
    instance is garbage-collected.
    """

    def __init__(self, connection_parameters: pika.ConnectionParameters,
                 exchanges: Optional[ExchangesDict]=None):
        """
        Args:
            connection_parameters: pika server's connection information
            exchanges (dict(str, pika.exchange_type.ExchangeType)):
                optional mapping of exchange names to exchange types, each
                of which is declared (as durable) during initialization

        Any exception raised by pika while connecting, opening the channel
        or declaring the exchanges propagates to the caller.
        """
        self._logger = logging.getLogger(__name__)

        self._connection = pika.BlockingConnection(parameters=connection_parameters)
        # Unlike `PikaConsumer`, this class uses pika's `BlockingConnection`
        # adapter.
        # That is because it enables `send()` to be implemented as a single
        # call to `pika.channel.Channel.basic_publish()`, which provides
        # error-handling by raising detailed exception types.
        # It also allows enabling delivery confirmations for the channel,
        # which makes `basic_publish()` raise `UnroutableError` exceptions
        # for messages that fail to be delivered.
        # Therefore, users of this class can simply write a single try/except
        # block around `send()` (which is also blocking) rather than requiring
        # the involvement of a pair of callbacks.
        # Note that the cost for code simplicity might be a loss in performance,
        # so in the future it could be worth slightly changing the API of the
        # Publisher and using pika's `SelectConnection` adapter.

        self._channel = self._connection.channel()
        # Configure channel to confirm that messages were delivered to
        # the broker:
        self._channel.confirm_delivery()

        if exchanges:
            for exchange_name, exchange_type in exchanges.items():
                self._channel.exchange_declare(exchange_name,
                                               exchange_type=exchange_type,
                                               durable=True)

    def __del__(self):
        """Close the connection (best effort) when the instance is finalised."""
        # `__del__` also runs for instances whose `__init__` raised part-way
        # through (e.g. the broker was unreachable), in which case
        # `_connection` was never assigned. Look attributes up defensively so
        # finalising such an instance does not raise `AttributeError`.
        connection = getattr(self, "_connection", None)
        if connection is None or connection.is_closed:
            return

        logger = getattr(self, "_logger", None) or logging.getLogger(__name__)
        try:
            connection.close()
            logger.debug("Connection closed after deleting PikaPublisher.")
        except Exception as e:
            # Never let a failure to close escape from a finaliser.
            logger.debug(f"Failed to close connection while deleting PikaPublisher: {e}")

    def send(self, exchange: str, routing_key: str, body: bytes):
        """Send a message to an exchange (blocking call).

        The message is published with ``mandatory=True`` on a channel that
        has delivery confirmations enabled.

        Args:
            exchange (str): name of the destination exchange
            routing_key (str): routing key to bind on
            body (bytes): payload of message

        Note that, in addition to `pika.exceptions` related to connection
        or channel issues, this method may raise:
            * `UnroutableError`: if the message is returned by the broker
            * `NackError`: if the message is negatively-acknowledged by the broker
        """
        self._channel.basic_publish(exchange=exchange,
                                    routing_key=routing_key,
                                    mandatory=True,
                                    body=body)
def get_publisher(
    params: pika.ConnectionParameters,
    exchanges: Optional[ExchangesDict]
) -> Optional[PikaPublisher]:
    """Create a PikaPublisher, retrying on connection failure.

    This method makes up to
    :const:`buildgrid.settings.RABBITMQ_PUBLISHER_CREATION_RETRIES` attempts
    to construct a publisher, sleeping for ``retry_delay(attempt)`` seconds
    after every failed attempt.

    Args:
        params (pika.ConnectionParameters): The Pika connection parameters to
            use when opening the connection to RabbitMQ.
        exchanges (dict): Map of exchange names to types. This should contain
            all the exchanges that this publisher will publish messages on,
            so they can be declared before use.

    Returns:
        :class:``PikaPublisher`` if one can be created.
        ``None`` only if the configured number of attempts is not positive,
        in which case no attempt is made.

    Raises:
        Exception: the exception raised by the final failed attempt, once
            all attempts have been used up.

    """
    # Attempts are numbered from 1 up to and including the configured limit;
    # the previous `attempt < limit` loop stopped one attempt short.
    for attempt in range(1, RABBITMQ_PUBLISHER_CREATION_RETRIES + 1):
        try:
            return PikaPublisher(params, exchanges=exchanges)
        except Exception:
            # The delay is applied after every failure, including the last
            # one, so that callers which immediately call this helper again
            # in a loop are still paced between rounds.
            sleep(retry_delay(attempt))
            if attempt >= RABBITMQ_PUBLISHER_CREATION_RETRIES:
                raise
    return None
class RetryingPikaPublisher:

    """Class to handle message publishing with retry handling.

    This class runs a "publisher thread" which creates a :class:`PikaPublisher`
    and uses it to publish messages to RabbitMQ. This thread also handles any
    Exceptions which get raised in the process of publishing the messages,
    retrying the publishing a configurable number of times (or indefinitely).

    The :meth:`RetryingPikaPublisher.send` method handles handing messages that
    need to be sent through to the publisher thread, which allows clean and
    thread-safe message publishing using ``RetryingPikaPublisher`` across
    multiple threads (e.g. gRPC request handlers).

    """

    def __init__(
        self,
        connection_parameters: pika.ConnectionParameters,
        thread_name: Optional[str]=None,
        max_publish_attempts: int=0,
        exchanges: Optional[ExchangesDict]=None,
        retry_delay_base: int=1
    ):
        """Instantiate a new RetryingPikaPublisher.

        The publisher thread is created here but not started; call
        :meth:`RetryingPikaPublisher.start` to start it.

        Args:
            connection_parameters (pika.ConnectionParameters): The Pika connection
                parameters to use when opening the connection to RabbitMQ.
            thread_name (str): Optional name to use for the publisher thread. If
                unset then the thread will be named according to Python's default
                thread naming scheme.
            max_publish_attempts (int): The number of times to attempt to publish
                a specific message before giving up. If set to 0 (the default) then
                publishing will be retried indefinitely until successful.
            exchanges (dict): Map of exchange names to types. This should contain
                all the exchanges that this publisher will publish messages on,
                so they can be declared before use.
            retry_delay_base (int): The base multiplier to use when calculating delay
                between retry attempts. Defaults to 1.

        """
        self._logger = logging.getLogger(__name__)
        self._connection_parameters = connection_parameters
        self._exchanges = exchanges
        self._max_publish_attempts = max_publish_attempts
        self._publisher_thread = Thread(target=self._publish_from_queue, name=thread_name)
        self._publish_queue: Queue = Queue()
        self._retry_delay_base = retry_delay_base
        # Set by `start()` and cleared by `stop()`; polled by the publisher
        # thread to decide whether to keep running.
        self._run_publisher_thread = False

    def _get_publisher(self) -> Optional[PikaPublisher]:
        """Attempt to get a publisher until success or no longer needed.

        This method repeatedly attempts to construct a new PikaPublisher,
        logging an error (at DEBUG level) after every
        ``RABBITMQ_PUBLISHER_CREATION_RETRIES`` consecutive failures.

        Unlike the ``get_publisher`` helper, this method only returns
        when either a publisher is obtained or the ``RetryingPikaPublisher``
        is shutting down.

        .. warning::
            This method is not thread-safe, so should only be called from
            within the publisher thread itself.

        Returns:
            A :class:`PikaPublisher` if one can be created successfully.
            ``None`` if the internal flag for checking whether to run the
            publisher thread is set to ``False``.

        """
        publisher = None
        # The loop condition re-checks the run flag after every round, so a
        # `stop()` issued while we are retrying ends the loop.
        while self._run_publisher_thread and not publisher:
            try:
                publisher = get_publisher(self._connection_parameters, self._exchanges)
            except Exception:
                self._logger.debug(
                    "Failed to construct a RabbitMQ publisher in "
                    f"{RABBITMQ_PUBLISHER_CREATION_RETRIES} attempts",
                    exc_info=True
                )
        return publisher

    def _publish_from_queue(self) -> None:
        """Publish messages from the internal publisher queue to RabbitMQ exchanges.

        This method runs in a separate thread to allow reuse of a single
        RabbitMQ connection to publish all the messages that the owner of this
        RetryingPikaPublisher needs to publish.

        The messages to be published are read from the internal publishing queue
        and then sent on to RabbitMQ by this method. Once a message has been
        handled (published, failed, or abandoned because of shutdown) its
        completion event is set; on failure the causing exception is stored
        in ``message_spec.error``.

        Messages still on the queue when the thread is told to stop are not
        processed and their completion events are not set.

        """
        self._logger.debug("Starting RabbitMQ publisher thread")
        publisher = self._get_publisher()

        while self._run_publisher_thread:
            try:
                # Get the next message to publish, with a timeout.
                # We set a timeout here to allow relatively responsive
                # shutdown of this thread when `self._run_publisher_thread`
                # becomes false.
                message_spec = self._publish_queue.get(timeout=5)
            except Empty:
                continue

            published = False
            attempts = 0
            # Most recent exception seen for this message, kept so that a
            # message which is given up on never completes without an error.
            last_error: Optional[Exception] = None

            def attempts_remaining():
                # A limit of 0 means "retry forever".
                if self._max_publish_attempts == 0:
                    return True
                return attempts < self._max_publish_attempts

            # NOTE: Checking `self._run_publisher_thread` serves to both allow
            # early shutdown if `RetryingPikaPublisher.stop` is called whilst
            # we're in a retry loop, and also guard against `publisher` being
            # `None` rather than a `PikaPublisher` instance.
            while not published and attempts_remaining() and self._run_publisher_thread:
                attempts += 1
                try:
                    if publisher is not None:
                        publisher.send(
                            message_spec.exchange.name,
                            message_spec.routing_key,
                            message_spec.payload
                        )
                        published = True
                    else:
                        publisher = self._get_publisher()
                except (pika.exceptions.UnroutableError, pika.exceptions.NackError) as e:
                    # Not much point attempting to retry these errors
                    self._logger.warning("Message could not be routed", exc_info=True)
                    message_spec.error = e
                    break
                except (
                    pika.exceptions.ChannelClosed,
                    pika.exceptions.ChannelWrongStateError,
                    pika.exceptions.StreamLostError
                ) as e:
                    # Throw away the publisher and try again
                    last_error = e
                    publisher = self._get_publisher()
                except Exception as e:
                    last_error = e
                    self._logger.debug("Message publishing failed", exc_info=True)
                    if attempts_remaining():
                        delay = retry_delay(attempts, self._retry_delay_base)
                        self._logger.debug(f"Retrying message publishing in {delay} seconds")
                        sleep(delay)
                    else:
                        # We're done retrying, set the error and give up
                        message_spec.error = e

            # The retry loop can also end without success because attempts
            # ran out on a connection-level error, or because shutdown was
            # requested mid-retry. Report the last failure in those cases
            # rather than signalling completion with no error recorded.
            if not published and message_spec.error is None and last_error is not None:
                message_spec.error = last_error
            message_spec.set_completion_event()
        # Drop our reference to the publisher now that the thread is exiting.
        del publisher

    def start(self):
        """Start the publisher thread to get ready to publish to RabbitMQ.

        This method sets the internal flag that determines whether to keep the
        publisher thread running, and starts the thread. This should be called
        before calling :meth:`RetryingPikaPublisher.send` to avoid a backlog of
        messages on startup, although messages sent before calling this method
        will still be published.

        """
        self._run_publisher_thread = True
        self._publisher_thread.start()

    def stop(self):
        """Cleanly stop the RabbitMQ publisher thread.

        This method sets the internal flag that determines whether to keep the
        publisher thread running to ``False``, and then blocks until that thread
        finishes. Calling it when the thread is not running (never started, or
        already finished) is a no-op beyond clearing the flag.

        """
        self._run_publisher_thread = False
        # `Thread.join()` raises `RuntimeError` for a thread that was never
        # started, so only join a thread that is actually running.
        if self._publisher_thread.is_alive():
            self._publisher_thread.join()

    def send(
        self,
        message_spec: MessageSpec,
        reraise_exceptions: bool=False,
        wait_for_delivery: bool=False,
        wait_for_delivery_timeout: Optional[float]=None
    ) -> bool:
        """Send a message to be published to a RabbitMQ exchange.

        This method puts the given :class:`MessageSpec` onto an internal queue
        which is consumed by the publisher thread. The message will get processed
        by that thread at some point in the future assuming RabbitMQ is accessible.

        Setting ``wait_for_delivery`` to ``True`` will make this method block
        until the publisher thread has finished handling the message, or
        until ``wait_for_delivery_timeout`` expires.

        Setting ``reraise_exceptions`` to ``True`` will additionally re-raise
        the final exception after running out of retries when a message fails
        to be published to RabbitMQ.

        .. warning::
            If using an unlimited number of retries, then setting
            ``reraise_exceptions`` can lead to this method blocking for an
            indefinite period of time.

        Args:
            message_spec (MessageSpec): The specification of the message that
                needs to be published to RabbitMQ. This includes the exchange
                to publish to and the message content, as well as providing
                a way of communicating publish success/failure from the
                publisher thread back to the caller.
            reraise_exceptions (bool): If the message can't be published and
                the publisher runs out of retry attempts, the Exception that
                caused the failure will be re-raised if this is ``True``.
                Note, if ``wait_for_delivery`` is not also set, setting this
                to ``True`` will make this method block until the publisher
                thread finishes trying to publish the message, ignoring
                ``wait_for_delivery_timeout``.
            wait_for_delivery (bool): Whether or not to block until the
                publisher thread has finished handling the message.
            wait_for_delivery_timeout (float): Time to wait for delivery
                confirmation if ``wait_for_delivery`` is set. If ``None``
                (the default), then wait indefinitely.

        Returns:
            bool: ``True`` if this method waited and the publisher thread
            finished handling the message before the timeout expired.
            ``False`` if the wait timed out, or if no wait was requested.
            Note that ``True`` means handling completed, not that publishing
            succeeded; check ``message_spec.error`` or use
            ``reraise_exceptions`` to detect failures.

        Raises:
            Exception: the error recorded on ``message_spec`` by the publisher
                thread, if ``reraise_exceptions`` is ``True`` and one is set.

        """
        self._publish_queue.put(message_spec)

        if reraise_exceptions and not wait_for_delivery:
            self._logger.debug(
                "Publisher got a request to reraise exceptions for "
                "unroutable messages without wait_for_delivery being "
                "explicitly set. Waiting for delivery confirmation "
                "anyway."
            )
            wait_for_delivery = True
            wait_for_delivery_timeout = None

        completion_event = message_spec.get_completion_event()
        # `Event.wait()` returns True when the event was set and False on
        # timeout, so this flag means "completed", not "timed out".
        completed = False
        if wait_for_delivery:
            completed = completion_event.wait(wait_for_delivery_timeout)
        if reraise_exceptions and message_spec.error is not None:
            raise message_spec.error
        return completed