Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/rabbitmq/utils.py: 92.86%
14 statements
« prev ^ index » next coverage.py v6.4.1, created at 2022-06-22 21:04 +0000
« prev ^ index » next coverage.py v6.4.1, created at 2022-06-22 21:04 +0000
1# Copyright (C) 2021 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License' is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16"""
17RabbitMQ-related utilities
18==========================
20This module contains common utilities related to using RabbitMQ and the
21related internal publishing/consuming thread mechanisms.
23"""
26from threading import Event
28from buildgrid.server.rabbitmq._enums import Exchange
31class MessageSpec:
33 """Information about a message to be published on a RabbitMQ exchange.
35 This class is a simple wrapper around a message payload, intended to be
36 used to communicate a request to publish a RabbitMQ message from a gRPC
37 handler thread to the RabbitMQ publisher thread.
39 It contains all the data a publisher thread needs to publish the message
40 on a RabbitMQ exchange.
42 """
44 def __init__(self, *, exchange: Exchange, payload: bytes, routing_key: str=''):
45 """Instantiate a new MessageSpec.
47 Args:
48 exchange (Exchange): The name of the RabbitMQ exchange that this
49 message should be published to.
50 payload (bytes): The desired message payload, in bytes. This
51 could be a serialized proto for example.
52 routing_key (str): The key to use when routing this message
53 in RabbitMQ. Optional, defaults to ''.
55 """
56 self._event = Event()
57 self.error = None
58 self.exchange = exchange
59 self.payload = payload
60 self.routing_key = routing_key
62 def get_completion_event(self):
63 """Return the ``threading.Event`` which indicates publish completion.
65 This event gets set by the publisher once either the message has been
66 published successfully or the publisher has run out of retry attempts
67 to publish the message.
69 """
70 return self._event
72 def set_completion_event(self):
73 """Set the ``threading.Event`` to mark publishing completed.
75 This should be called by the publisher thread once either the message
76 has been published successfully or the publishing has failed and run
77 out of retries.
79 Calling this method will wake up any threads that are waiting for
80 the publishing of a message to be completed.
82 """
83 self._event.set()