From bdb401d04b91875def63f820f362ad2118f41949 Mon Sep 17 00:00:00 2001 From: moisses89 <7888669+moisses89@users.noreply.github.com> Date: Tue, 2 Jan 2024 12:33:48 +0100 Subject: [PATCH] Remove asynchronous class --- .../events/services/queue_service.py | 160 +----------------- 1 file changed, 5 insertions(+), 155 deletions(-) diff --git a/safe_transaction_service/events/services/queue_service.py b/safe_transaction_service/events/services/queue_service.py index 2de4beb3d..8bf6af2d3 100644 --- a/safe_transaction_service/events/services/queue_service.py +++ b/safe_transaction_service/events/services/queue_service.py @@ -17,10 +17,7 @@ @lru_cache() def getQueueService(): if settings.EVENTS_QUEUE_URL: - if settings.EVENTS_QUEUE_ASYNC_CONNECTION: - return AsyncQueueService() - else: - return SyncQueueService() + return SyncQueueService() else: # Mock send_event to not configured host us is not mandatory configure a queue for events return MockedQueueService() @@ -49,11 +46,10 @@ def send_event( if self._channel is None or not self._channel.is_open: logger.warning("Connection is still not initialized") if fail_retry: - logger.info("Adding %s to unsent messages", payload) + logger.debug("Adding %s to unsent messages", payload) self.unsent_events.append(payload) - if not self.is_async(): - # Try to reconnect - self.connect() + # Try to reconnect + self.connect() return False try: @@ -62,7 +58,7 @@ def send_event( exchange=self.exchange_name, routing_key="", body=event ) return True - except pika.exceptions.ConnectionClosedByBroker: + except pika.exceptions.AMQPConnectionError: logger.warning("Event can not be sent due to there is no channel opened") if fail_retry: logger.info("Adding %s to unsent messages", payload) @@ -91,148 +87,6 @@ def remove_unsent_events(self): self.unsent_events = [] -class AsyncQueueService(QueueService): - # Singleton class definition - def __init__(self): - super().__init__() - self.connect() - - @property - def is_async(self): - return True - - def connect(self) -> GeventConnection: - """ - This method connects to RabbitMq. - When the connection is established, the on_connection_open method - will be invoked by pika. - - :return: GeventConnection - """ - return GeventConnection( - self._connection_parameters, - on_open_callback=self.on_connection_open, - on_open_error_callback=self.on_connection_open_error, - on_close_callback=self.on_connection_closed, - ) - - def on_connection_open(self, connection: GeventConnection): - """ - This method is called by pika once the connection to RabbitMQ has - been established. It passes the handle to the connection object. - - :param GeventConnection connection: The connection - """ - - logger.info("Connection opened with %s", self._connection_parameters.host) - self._connection = connection - self.open_channel() - - def on_connection_open_error(self, connection: GeventConnection, err: Exception): - """ - This method is called by pika if the connection to RabbitMQ - can't be established. Connection object is paased if were necessary - Always retry the reconnection every 5 seconds. - - :param GeventConnection: The connection - :param Exception err: The error - """ - logger.error( - "Connection open failed with %s, retrying in 5 seconds: %s", - self._connection_parameters.host, - err, - ) - connection.ioloop.call_later(5, self.connect) - - def on_connection_closed(self, connection: GeventConnection, reason: Exception): - """ - This method is invoked by pika when the connection to RabbitMQ is - closed unexpectedly. Since it is unexpected, we will reconnect to - RabbitMQ if it disconnects. - - :param GeventConnection: The closed connection obj - :param Exception reason: exception representing reason for loss of - connection. - """ - self._connection = connection - self._channel = None - logger.error( - "Connection closed with %s, reopening in 5 seconds: %s", - self._connection_parameters.host, - reason, - ) - connection.ioloop.call_later(5, self.connect) - - def open_channel(self): - """ - This method will open a new channel with RabbitMQ by issuing the - Channel.Open RPC command. When RabbitMQ confirms the channel is open - by sending the Channel.OpenOK RPC reply, the on_channel_open method - will be invoked. - """ - logger.info("Opening a new channel") - self._connection.channel(on_open_callback=self.on_channel_open) - - def on_channel_open(self, channel: Channel): - """ - This method is invoked by pika when the channel has been opened. - The channel object is passed in so we can make use of it. - - :param pika.channel.Channel channel: The channel object - """ - logger.info("Channel with number %i opened", channel.channel_number) - self._channel = channel - self._channel.add_on_close_callback(self.on_channel_closed) - self.setup_exchange() - - def on_channel_closed(self, channel: Channel, reason: Exception): - """ - Invoked by pika when RabbitMQ unexpectedly closes the channel. - Channels are usually closed if you attempt to do something that - violates the protocol. - In this method we retry to open a new channel with rabbitMQ if the connection is still open. - - :param Channel channel: The closed channel - :param Exception reason: why the channel was closed - """ - logger.warning("Channel %i was closed: %s", channel.channel_number, reason) - self._channel = None - if self._connection and self._connection.is_open: - # If channel was closed and connection is still active we try to reopen the channel - logger.error( - "Connection is opened retry to open channel in 5 seconds: %s", - self._connection_parameters.host, - reason, - ) - self._connection.ioloop.call_later(5, self.open_channel()) - - def setup_exchange(self): - """ - Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC - command. When it is complete, the on_exchange_declareok method will - be invoked by pika. - """ - logger.info("Declaring exchange %s", self.exchange_name) - - self._channel.exchange_declare( - exchange=self.exchange_name, - exchange_type=ExchangeType.fanout, - durable=True, - callback=self.on_exchange_declareok, - ) - - def on_exchange_declareok(self, _unused_frame): - """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC - command. - Send unsent messages that cannot be sent as due connection errors. - - :param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame - """ - - logger.info("Exchange declared: %s", self.exchange_name) - self.send_unsent_events() - - class SyncQueueService(QueueService): """ Synchronous connection with test purpose as we cannot test using gevent connection @@ -242,10 +96,6 @@ def __init__(self): super().__init__() self.connect() - @property - def is_async(self): - return False - def connect(self) -> BlockingConnection: """ This method connects to RabbitMq using Blockingconnection.