Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/rabbitmq/pika_publisher.py: 45.22%

115 statements  

« prev     ^ index     » next       coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

1# Copyright (C) 2021 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15import logging 

16from queue import Empty, Queue 

17from time import sleep 

18from threading import Thread 

19from typing import Dict, Optional 

20 

21import pika # type: ignore 

22 

23from buildgrid.server.rabbitmq.utils import MessageSpec 

24from buildgrid.settings import RABBITMQ_PUBLISHER_CREATION_RETRIES 

25from buildgrid.utils import retry_delay 

26 

27 

# Type alias: maps an exchange name to the pika exchange type used when declaring it.
28ExchangesDict = Dict[str, pika.exchange_type.ExchangeType] 

29 

30 

class PikaPublisher:
    """Helper class to send messages to a RabbitMQ exchange.

    Opens a blocking pika connection and a single channel with delivery
    confirmations enabled, optionally declaring a set of durable exchanges.
    The connection is closed when the instance is finalized.
    """

    def __init__(self, connection_parameters: pika.ConnectionParameters,
                 exchanges: Optional[ExchangesDict] = None):
        """
        Args:
            connection_parameters: pika server's connection information
            exchanges (dict(str, pika.exchange_type.ExchangeType)):
                optional mapping of exchanges to be declared (as durable)
                during initialization

        Raises:
            Any exception raised by pika while connecting, opening the
            channel, enabling delivery confirmations or declaring the
            exchanges. If the failure happens after the connection was
            opened, the connection is closed before re-raising.
        """
        self._logger = logging.getLogger(__name__)

        # Unlike `PikaConsumer`, this class uses pika's `BlockingConnection`
        # adapter.
        # That is because it enables `send()` to be implemented as a single
        # call to `pika.channel.Channel.basic_publish()`, which provides
        # error-handling by raising detailed exception types.
        # It also allows to enable delivery confirmations for the channel,
        # which makes `basic_publish()` raise `UnroutableError` exceptions
        # for messages that fail to be delivered.
        # Therefore, users of this class can simply write a single try/except
        # block around `send()` (which is also blocking) rather than requiring
        # the involvement of a pair of callbacks.
        # Note that the cost for code simplicity might be a loss in performance,
        # so in the future it could be worth slightly changing the API of the
        # Publisher and using pika's `SelectConnection` adapter.
        self._connection = pika.BlockingConnection(parameters=connection_parameters)

        try:
            self._channel = self._connection.channel()
            # Configure channel to confirm that messages were delivered to
            # the broker:
            self._channel.confirm_delivery()

            if exchanges:
                for exchange_name, exchange_type in exchanges.items():
                    self._channel.exchange_declare(exchange_name,
                                                   exchange_type=exchange_type,
                                                   durable=True)
        except Exception:
            # Setup failed after the connection was opened: close it now
            # rather than leaving it open until this half-initialized object
            # happens to be garbage collected.
            try:
                self._connection.close()
            except Exception:
                self._logger.debug(
                    "Failed to close connection after PikaPublisher setup error",
                    exc_info=True)
            raise

    def __del__(self):
        # `__del__` also runs for instances whose `__init__` raised before
        # `_connection` was assigned (e.g. the broker was unreachable), so the
        # attribute cannot be assumed to exist.
        connection = getattr(self, "_connection", None)
        if connection is None or connection.is_closed:
            return
        try:
            connection.close()
            self._logger.debug("Connection closed after deleting PikaPublisher.")
        except Exception as e:
            self._logger.debug(f"Failed to close connection while deleting PikaPublisher: {e}")

    def send(self, exchange: str, routing_key: str, body: bytes):
        """Send a message to an exchange (blocking call).

        The message is published with ``mandatory=True`` on a channel that
        has delivery confirmations enabled.

        Args:
            exchange (str): name of the destination exchange
            routing_key (str): routing key to bind on
            body (bytes): payload of message

        Note that, in addition to `pika.exceptions` related to connection
        or channel issues, this method may raise:
          * `UnroutableError`: if the message is returned by the broker

          * `NackError`: if the message is negatively-acknowledged by the broker
        """
        self._channel.basic_publish(exchange=exchange,
                                    routing_key=routing_key,
                                    mandatory=True,
                                    body=body)

97 

98 

def get_publisher(
    params: pika.ConnectionParameters,
    exchanges: Optional[ExchangesDict]
) -> Optional[PikaPublisher]:
    """Create a PikaPublisher, retrying on connection failure.

    This function makes up to
    :const:`buildgrid.settings.RABBITMQ_PUBLISHER_CREATION_RETRIES` attempts
    (always at least one), sleeping for ``retry_delay(attempt)`` seconds
    between consecutive attempts. There is no sleep after the final failed
    attempt; the exception from that attempt is re-raised immediately.

    Args:
        params (pika.ConnectionParameters): The Pika connection parameters to
            use when opening the connection to RabbitMQ.
        exchanges (dict): Map of exchange names to types. This should contain
            all the exchanges that this publisher will publish messages on,
            so they can be declared before use.

    Returns:
        :class:``PikaPublisher`` once one has been created. The ``Optional``
        return annotation is kept for backwards compatibility with callers
        that check for ``None``.

    Raises:
        Exception: whatever the last failed attempt to construct the
            :class:``PikaPublisher`` raised.

    """
    # Guarantee at least one attempt even if the setting is 0 or negative;
    # otherwise callers that loop until they get a publisher would spin.
    max_attempts = max(1, RABBITMQ_PUBLISHER_CREATION_RETRIES)

    for attempt in range(1, max_attempts + 1):
        try:
            return PikaPublisher(params, exchanges=exchanges)
        except Exception:
            if attempt >= max_attempts:
                # Out of attempts: propagate with the original traceback and
                # without a pointless final back-off sleep.
                raise
            sleep(retry_delay(attempt))

    # Not reachable (the loop either returns or raises); present so every
    # code path has an explicit return matching the annotation.
    return None

131 

132 

class RetryingPikaPublisher:

    """Class to handle message publishing with retry handling.

    This class runs a "publisher thread" which creates a :class:`PikaPublisher`
    and uses it to publish messages to RabbitMQ. This thread also handles any
    Exceptions which get raised in the process of publishing the messages,
    retrying the publishing a configurable number of times (or indefinitely).

    The :meth:`RetryingPikaPublisher.send` method handles handing messages that
    need to be sent through to the publisher thread, which allows clean and
    thread-safe message publishing using ``RetryingPikaPublisher`` across
    multiple threads (e.g. gRPC request handlers).

    """

    def __init__(
        self,
        connection_parameters: pika.ConnectionParameters,
        thread_name: Optional[str] = None,
        max_publish_attempts: int = 0,
        exchanges: Optional[ExchangesDict] = None,
        retry_delay_base: int = 1
    ):
        """Instantiate a new RetryingPikaPublisher.

        The publisher thread is created here but not started; call
        :meth:`RetryingPikaPublisher.start` to begin publishing.

        Args:
            connection_parameters (pika.ConnectionParameters): The Pika connection
                parameters to use when opening the connection to RabbitMQ.
            thread_name (str): Optional name to use for the publisher thread. If
                unset then the thread will be named according to Python's default
                thread naming scheme.
            max_publish_attempts (int): The number of times to attempt to publish
                a specific message before giving up. If set to 0 (the default) then
                publishing will be retried indefinitely until successful.
            exchanges (dict): Map of exchange names to types. This should contain
                all the exchanges that this publisher will publish messages on,
                so they can be declared before use.
            retry_delay_base (int): The base multiplier to use when calculating delay
                between retry attempts. Defaults to 1.

        """
        self._logger = logging.getLogger(__name__)
        self._connection_parameters = connection_parameters
        self._exchanges = exchanges
        self._max_publish_attempts = max_publish_attempts
        self._publisher_thread = Thread(target=self._publish_from_queue, name=thread_name)
        self._publish_queue: Queue = Queue()
        self._retry_delay_base = retry_delay_base
        # Flag polled by the publisher thread; set by start(), cleared by stop().
        self._run_publisher_thread = False

    def _attempts_remaining(self, attempts: int) -> bool:
        """Return whether another publish attempt is allowed.

        Args:
            attempts (int): Number of attempts already made for the message.

        Returns:
            ``True`` if ``max_publish_attempts`` is 0 (unlimited) or if
            ``attempts`` is still below it, ``False`` otherwise.

        """
        if self._max_publish_attempts == 0:
            return True
        return attempts < self._max_publish_attempts

    def _get_publisher(self) -> Optional[PikaPublisher]:
        """Attempt to get a publisher until success or no longer needed.

        This method repeatedly calls ``get_publisher``, logging (at DEBUG
        level) each time that helper raises after exhausting its own
        ``RABBITMQ_PUBLISHER_CREATION_RETRIES`` attempts.

        Unlike the ``get_publisher`` helper, this method only returns
        when either a publisher is obtained or the ``RetryingPikaPublisher``
        is shutting down.

        .. warning::
            This method is not thread-safe, so should only be called from
            within the publisher thread itself.

        Returns:
            A :class:`PikaPublisher` if one can be created successfully.
            ``None`` if the internal flag for checking whether to run the
            publisher thread is set to ``False``.

        """
        publisher = None
        while self._run_publisher_thread and not publisher:
            try:
                publisher = get_publisher(self._connection_parameters, self._exchanges)
            except Exception:
                self._logger.debug(
                    "Failed to construct a RabbitMQ publisher in "
                    f"{RABBITMQ_PUBLISHER_CREATION_RETRIES} attempts",
                    exc_info=True
                )
        return publisher

    def _publish_from_queue(self) -> None:
        """Publish messages from the internal publisher queue to RabbitMQ exchanges.

        This method runs in a separate thread to allow reuse of a single
        RabbitMQ connection to publish all the messages that the owner of this
        RetryingPikaPublisher needs to publish.

        The messages to be published are read from the internal publishing queue
        and then sent on to RabbitMQ by this method. For every message taken
        from the queue the completion event is set once handling finishes; if
        the message was not published and an exception was seen while trying,
        that exception is stored in ``message_spec.error`` first.

        """
        self._logger.debug("Starting RabbitMQ publisher thread")
        publisher = self._get_publisher()

        while self._run_publisher_thread:
            try:
                # Get the next message to publish, with a timeout.
                # We set a timeout here to allow relatively responsive
                # shutdown of this thread when `self._run_publisher_thread`
                # becomes false.
                message_spec = self._publish_queue.get(timeout=5)
            except Empty:
                continue

            published = False
            attempts = 0
            # Most recent failure seen for this message. It is only reported
            # if the message ends up unpublished, so an error followed by a
            # successful retry is not surfaced to the sender.
            last_error: Optional[Exception] = None

            # NOTE: Checking `self._run_publisher_thread` serves to both allow
            # early shutdown if `RetryingPikaPublisher.stop` is called whilst
            # we're in a retry loop, and also guard against `publisher` being
            # `None` rather than a `PikaPublisher` instance.
            while (not published
                   and self._attempts_remaining(attempts)
                   and self._run_publisher_thread):
                attempts += 1
                try:
                    if publisher is not None:
                        publisher.send(
                            message_spec.exchange.name,
                            message_spec.routing_key,
                            message_spec.payload
                        )
                        published = True
                    else:
                        publisher = self._get_publisher()
                except (pika.exceptions.UnroutableError, pika.exceptions.NackError) as e:
                    # Not much point attempting to retry these errors
                    self._logger.warning("Message could not be routed", exc_info=True)
                    last_error = e
                    break
                except (
                    pika.exceptions.ChannelClosed,
                    pika.exceptions.ChannelWrongStateError,
                    pika.exceptions.StreamLostError
                ) as e:
                    # Throw away the publisher and try again. Remember the
                    # error so that running out of attempts here is reported
                    # instead of silently dropping the message.
                    last_error = e
                    publisher = self._get_publisher()
                except Exception as e:
                    self._logger.debug("Message publishing failed", exc_info=True)
                    last_error = e
                    if self._attempts_remaining(attempts):
                        delay = retry_delay(attempts, self._retry_delay_base)
                        self._logger.debug(f"Retrying message publishing in {delay} seconds")
                        sleep(delay)

            if not published and last_error is not None:
                # We're done retrying (or shutting down), set the error and give up
                message_spec.error = last_error
            message_spec.set_completion_event()

        # Drop the thread's reference to the publisher now that the loop has
        # ended; PikaPublisher closes its connection when finalized.
        del publisher

    def start(self):
        """Start the publisher thread to get ready to publish to RabbitMQ.

        This method sets the internal flag that determines whether to keep the
        publisher thread running, and starts the thread. This should be called
        before calling :meth:`RetryingPikaPublisher.send` to avoid a backlog of
        messages on startup, although messages sent before calling this method
        will still be published.

        """
        self._run_publisher_thread = True
        self._publisher_thread.start()

    def stop(self):
        """Cleanly stop the RabbitMQ publisher thread.

        This method sets the internal flag that determines whether to keep the
        publisher thread running to ``False``, and then blocks until that thread
        finishes. Calling it when the thread is not running (never started, or
        already finished) is a no-op rather than an error.

        """
        self._run_publisher_thread = False
        # `Thread.join()` raises RuntimeError on a thread that was never
        # started, so only join a thread that is actually running.
        if self._publisher_thread.is_alive():
            self._publisher_thread.join()

    def send(
        self,
        message_spec: MessageSpec,
        reraise_exceptions: bool = False,
        wait_for_delivery: bool = False,
        wait_for_delivery_timeout: Optional[float] = None
    ) -> bool:
        """Send a message to be published to a RabbitMQ exchange.

        This method puts the given :class:`MessageSpec` onto an internal queue
        which is consumed by the publisher thread. The message will get processed
        by that thread at some point in the future assuming RabbitMQ is accessible.

        Setting ``wait_for_delivery`` to ``True`` will make this method block
        until the publisher thread has finished handling the message (or
        until ``wait_for_delivery_timeout`` elapses).

        Setting ``reraise_exceptions`` to ``True`` will additionally re-raise
        the exception recorded on the message when it fails to be published
        to RabbitMQ.

        .. warning::
            If using an unlimited number of retries, then setting
            ``reraise_exceptions`` can lead to this method blocking for an
            indefinite period of time.

        Args:
            message_spec (MessageSpec): The specification of the message that
                needs to be published to RabbitMQ. This includes the exchange
                to publish to and the message content, as well as providing
                a way of communicating publish success/failure from the
                publisher thread back to the caller.
            reraise_exceptions (bool): If ``True`` and an error has been
                recorded on ``message_spec`` by the time waiting ends, that
                error is raised. If ``wait_for_delivery`` is not also set,
                waiting is enabled implicitly and
                ``wait_for_delivery_timeout`` is ignored (wait indefinitely).
            wait_for_delivery (bool): Whether or not to block until the
                publisher thread has finished handling the message.
            wait_for_delivery_timeout (float): Time to wait for the
                completion event if ``wait_for_delivery`` is set. If ``None``
                (the default), then wait indefinitely.

        Returns:
            ``False`` if no waiting was requested. Otherwise the result of
            waiting on the message's completion event, which (assuming it
            behaves like ``threading.Event`` - confirm against
            ``MessageSpec``) is ``True`` when the event was set before the
            timeout and ``False`` when the wait timed out.

        Raises:
            Exception: the error stored on ``message_spec`` when
                ``reraise_exceptions`` is ``True`` and an error was recorded.

        """
        self._publish_queue.put(message_spec)

        if reraise_exceptions and not wait_for_delivery:
            self._logger.debug(
                "Publisher got a request to reraise exceptions for "
                "unroutable messages without wait_for_delivery being "
                "explicitly set. Waiting for delivery confirmation "
                "anyway."
            )
            wait_for_delivery = True
            wait_for_delivery_timeout = None

        completion_event = message_spec.get_completion_event()
        # Result of waiting on the completion event; stays False when the
        # caller did not ask to wait.
        completed = False
        if wait_for_delivery:
            completed = completion_event.wait(wait_for_delivery_timeout)
        if reraise_exceptions and message_spec.error is not None:
            raise message_spec.error
        return completed