Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor events publisher #1818

Merged
merged 10 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 45 additions & 155 deletions safe_transaction_service/events/services/queue_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,44 @@
logger = logging.getLogger(__name__)


class QueueServiceProvider:
def __new__(cls):
if not hasattr(cls, "instance"):
if settings.EVENTS_QUEUE_URL:
if settings.EVENTS_QUEUE_ASYNC_CONNECTION:
cls.instance = AsyncQueueService()
else:
cls.instance = SyncQueueService()
else:
# Mock send_event to not configured host us is not mandatory configure a queue for events
cls.instance = MockedQueueService()
logger.warning("MockedQueueService is used")
return cls.instance
def getQueueService():
if settings.EVENTS_QUEUE_URL:
return SyncQueueService()
else:
# Mock send_event to not configured host us is not mandatory configure a queue for events
return MockedQueueService()
logger.warning("MockedQueueService is used")


class QueueServicePool:
"""
Context manager to get a QueueService connection from the pool or create a new one and append it to the pool if all the
instances are taken. Very useful for gevent, as it is not safe to share one Pika connection across threads.
https://pika.readthedocs.io/en/stable/faq.html
# To avoid leave unsent messages in QueueService instances the queue service pool is rotating every QueueService in the list.
Use:
```
with QueueServicePool() as queue_service:
queue_service...
```
"""

queue_service_pool = []

def __init__(self):
self.instance: QueueService

@classmethod
def del_singleton(cls):
if hasattr(cls, "instance"):
del cls.instance
def __enter__(self):
if self.queue_service_pool:
# If there are elements on the pool, take them
self.instance = self.queue_service_pool.pop()
else:
# If not, get a new client
self.instance = getQueueService()
return self.instance

def __exit__(self, exc_type, exc_val, exc_tb):
self.queue_service_pool.insert(0, self.instance)


class QueueService:
Expand All @@ -55,7 +75,10 @@ def send_event(
if self._channel is None or not self._channel.is_open:
logger.warning("Connection is still not initialized")
if fail_retry:
logger.debug("Adding %s to unsent messages", payload)
self.unsent_events.append(payload)
# Try to reconnect
self.connect()
return False

try:
Expand All @@ -64,9 +87,10 @@ def send_event(
exchange=self.exchange_name, routing_key="", body=event
)
return True
except pika.exceptions.ConnectionClosedByBroker:
except pika.exceptions.AMQPConnectionError:
logger.warning("Event can not be sent due to there is no channel opened")
if fail_retry:
logger.info("Adding %s to unsent messages", payload)
self.unsent_events.append(payload)
return False

Expand All @@ -92,143 +116,6 @@ def remove_unsent_events(self):
self.unsent_events = []


class AsyncQueueService(QueueService):
# Singleton class definition
def __init__(self):
super().__init__()
self.connect()

def connect(self) -> GeventConnection:
"""
This method connects to RabbitMq.
When the connection is established, the on_connection_open method
will be invoked by pika.

:return: GeventConnection
"""
return GeventConnection(
self._connection_parameters,
on_open_callback=self.on_connection_open,
on_open_error_callback=self.on_connection_open_error,
on_close_callback=self.on_connection_closed,
)

def on_connection_open(self, connection: GeventConnection):
"""
This method is called by pika once the connection to RabbitMQ has
been established. It passes the handle to the connection object.

:param GeventConnection connection: The connection
"""

logger.info("Connection opened with %s", self._connection_parameters.host)
self._connection = connection
self.open_channel()

def on_connection_open_error(self, connection: GeventConnection, err: Exception):
"""
This method is called by pika if the connection to RabbitMQ
can't be established. Connection object is paased if were necessary
Always retry the reconnection every 5 seconds.

:param GeventConnection: The connection
:param Exception err: The error
"""
logger.error(
"Connection open failed with %s, retrying in 5 seconds: %s",
self._connection_parameters.host,
err,
)
connection.ioloop.call_later(5, self.connect)

def on_connection_closed(self, connection: GeventConnection, reason: Exception):
"""
This method is invoked by pika when the connection to RabbitMQ is
closed unexpectedly. Since it is unexpected, we will reconnect to
RabbitMQ if it disconnects.

:param GeventConnection: The closed connection obj
:param Exception reason: exception representing reason for loss of
connection.
"""
self._channel = None
logger.error(
"Connection closed with %s, reopening in 5 seconds: %s",
self._connection_parameters.host,
reason,
)
connection.ioloop.call_later(5, self.connect)

def open_channel(self):
"""
This method will open a new channel with RabbitMQ by issuing the
Channel.Open RPC command. When RabbitMQ confirms the channel is open
by sending the Channel.OpenOK RPC reply, the on_channel_open method
will be invoked.
"""
logger.info("Opening a new channel")
self._connection.channel(on_open_callback=self.on_channel_open)

def on_channel_open(self, channel: Channel):
"""
This method is invoked by pika when the channel has been opened.
The channel object is passed in so we can make use of it.

:param pika.channel.Channel channel: The channel object
"""
logger.info("Channel with number %i opened", channel.channel_number)
self._channel = channel
self._channel.add_on_close_callback(self.on_channel_closed)
self.setup_exchange()

def on_channel_closed(self, channel: Channel, reason: Exception):
"""
Invoked by pika when RabbitMQ unexpectedly closes the channel.
Channels are usually closed if you attempt to do something that
violates the protocol.
In this method we retry to open a new channel with rabbitMQ if the connection is still open.

:param Channel channel: The closed channel
:param Exception reason: why the channel was closed
"""
logger.warning("Channel %i was closed: %s", channel.channel_number, reason)
self._channel = None
if self._connection and self._connection.is_open:
# If channel was closed and connection is still active we try to reopen the channel
logger.error(
"Connection is opened retry to open channel in 5 seconds: %s",
self._connection_parameters.host,
reason,
)
self._connection.ioloop.call_later(5, self.open_channel())

def setup_exchange(self):
"""
Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC
command. When it is complete, the on_exchange_declareok method will
be invoked by pika.
"""
logger.info("Declaring exchange %s", self.exchange_name)

self._channel.exchange_declare(
exchange=self.exchange_name,
exchange_type=ExchangeType.fanout,
durable=True,
callback=self.on_exchange_declareok,
)

def on_exchange_declareok(self, _unused_frame):
"""Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
command.
Send unsent messages that cannot be sent as due connection errors.

:param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame
"""

logger.info("Exchange declared: %s", self.exchange_name)
self.send_unsent_events()


class SyncQueueService(QueueService):
"""
Synchronous connection with test purpose as we cannot test using gevent connection
Expand All @@ -248,7 +135,10 @@ def connect(self) -> BlockingConnection:
try:
self._connection = BlockingConnection(self._connection_parameters)
self._channel = self.open_channel()
self._channel.confirm_delivery()
self.setup_exchange()
# Send messages if there was any missing
self.send_unsent_events()
return self._connection
except pika.exceptions.AMQPConnectionError:
logger.error("Cannot open connection, retrying")
Expand Down
14 changes: 0 additions & 14 deletions safe_transaction_service/events/tasks.py

This file was deleted.

26 changes: 23 additions & 3 deletions safe_transaction_service/events/tests/test_queue_service.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
import json
from unittest import mock
from unittest.mock import MagicMock

from django.test import TestCase

from pika.channel import Channel
from pika.exceptions import ConnectionClosedByBroker

from safe_transaction_service.events.services.queue_service import QueueServiceProvider
from safe_transaction_service.events.services.queue_service import (
QueueServicePool,
getQueueService,
)


class TestQueueService(TestCase):
def setUp(self):
self.queue_service = QueueServiceProvider()
self.queue_service = getQueueService()
# Create queue for test
self.queue = "test_queue"
self.queue_service._channel.queue_declare(self.queue)
Expand All @@ -22,7 +26,7 @@ def setUp(self):
self.queue_service._channel.queue_purge(self.queue)

def test_send_unsent_messages(self):
queue_service = QueueServiceProvider()
queue_service = getQueueService()
messages_to_send = 10
queue_service.remove_unsent_events()
with mock.patch.object(
Expand Down Expand Up @@ -55,3 +59,19 @@ def test_send_event_to_queue(self):
# Check if message was written to the queue
_, _, body = self.queue_service._channel.basic_get(self.queue, auto_ack=True)
self.assertEquals(json.loads(body), payload)

@mock.patch(
"safe_transaction_service.events.services.queue_service.getQueueService"
)
def test_queue_service_pool(self, mock_get_queue_service: MagicMock):
queue_service = getQueueService()
QueueServicePool.queue_service_pool = [queue_service]
with QueueServicePool() as queue_service:
self.assertEqual(queue_service, queue_service)
mock_get_queue_service.assert_not_called()

QueueServicePool.queue_service_pool = []
mock_get_queue_service.return_value = queue_service
with QueueServicePool() as queue_service:
self.assertEqual(queue_service, queue_service)
mock_get_queue_service.assert_called_once()
38 changes: 0 additions & 38 deletions safe_transaction_service/events/tests/test_tasks.py

This file was deleted.

5 changes: 3 additions & 2 deletions safe_transaction_service/history/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
from django.dispatch import receiver
from django.utils import timezone

from safe_transaction_service.events.tasks import send_event_to_queue_task
from safe_transaction_service.notifications.tasks import send_notification_task

from ..events.services.queue_service import QueueServicePool
from .models import (
ERC20Transfer,
ERC721Transfer,
Expand Down Expand Up @@ -168,7 +168,8 @@ def process_webhook(
countdown=5,
priority=2, # Almost lowest priority
)
send_event_to_queue_task.delay(payload)
with QueueServicePool() as queue_service:
queue_service.send_event(payload)
else:
logger.debug(
"Notification will not be sent for created=%s object=%s",
Expand Down
Loading