Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/rabbitmq/utils.py: 92.86%

14 statements  

« prev     ^ index     » next       coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

1# Copyright (C) 2021 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17RabbitMQ-related utilities 

18========================== 

19 

20This module contains common utilities related to using RabbitMQ and the 

21related internal publishing/consuming thread mechanisms. 

22 

23""" 

24 

25 

26from threading import Event 

27 

28from buildgrid.server.rabbitmq._enums import Exchange 

29 

30 

class MessageSpec:
    """Description of one message awaiting publication on a RabbitMQ exchange.

    A thin wrapper around a message payload. It is the hand-off object a
    gRPC handler thread uses to ask the RabbitMQ publisher thread to
    publish a message, and it carries everything the publisher needs to
    do so: the target exchange, the payload and the routing key.

    Each instance also owns a ``threading.Event`` that signals when the
    publish attempt has finished, and an ``error`` attribute that starts
    out as ``None``.
    """

    def __init__(
        self,
        *,
        exchange: Exchange,
        payload: bytes,
        routing_key: str = '',
    ):
        """Build a MessageSpec; every argument is keyword-only.

        Args:
            exchange (Exchange): The RabbitMQ exchange this message should
                be published to.
            payload (bytes): The message body as bytes, for example a
                serialized proto.
            routing_key (str): The key used to route the message within
                RabbitMQ. Optional, defaults to ''.

        """
        # What to publish and where to send it.
        self.exchange = exchange
        self.routing_key = routing_key
        self.payload = payload

        # Completion bookkeeping.
        # NOTE(review): ``error`` is presumably filled in by the publisher
        # when publishing fails -- confirm against the publisher code.
        self.error = None
        self._event = Event()

    def get_completion_event(self):
        """Return the ``threading.Event`` that signals publish completion.

        The publisher sets this event once the message has either been
        published successfully or the publisher has exhausted its retry
        attempts.

        """
        completion_event = self._event
        return completion_event

    def set_completion_event(self):
        """Flag publishing as finished by setting the ``threading.Event``.

        Intended to be called by the publisher thread once the message has
        either been published successfully or publishing has failed and
        run out of retries.

        Setting the event wakes every thread that is waiting for the
        publishing of this message to complete.

        """
        completion_event = self._event
        completion_event.set()