Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the RabbitMQ consumer #24

Merged
merged 5 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .env.docker
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
REDIS_URL=redis://redis:6379/0
DATABASE_URL=psql://postgres:postgres@db:5432/postgres
DATABASE_URL=postgresql+asyncpg://postgres:postgres@db:5432/postgres
RABBITMQ_AMPQ_URL=amqp://guest:guest@rabbitmq:5672/
3 changes: 2 additions & 1 deletion .env.sample
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
REDIS_URL=redis://redis:6379/0
DATABASE_URL=postgresql+asyncpg://postgres:postgres@db:5432/postgres
DATABASE_URL=postgresql+asyncpg://postgres:postgres@db:5432/postgres
RABBITMQ_AMPQ_URL=amqp://guest:guest@rabbitmq:5672/
1 change: 1 addition & 0 deletions .env.test
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ TEST=True
REDIS_URL=redis://localhost:6379/0
DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/postgres
DATABASE_POOL_CLASS=NullPool
RABBITMQ_AMQP_URL=amqp://guest:guest@localhost:5672/
9 changes: 9 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ jobs:
--health-retries 5
ports:
- 5432:5432
rabbitmq:
image: rabbitmq:alpine
ports:
- 5672:5672
options: >-
--health-cmd "rabbitmqctl await_startup"
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
Expand Down
5 changes: 4 additions & 1 deletion app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ class Settings(BaseSettings):
extra="allow",
case_sensitive=True,
)
TEST: bool = False
REDIS_URL: str = "redis://"
DATABASE_URL: str = "psql://postgres:"
DATABASE_POOL_CLASS: str = "AsyncAdaptedQueuePool"
DATABASE_POOL_SIZE: int = 10
TEST: bool = False
RABBITMQ_AMPQ_URL: str = "amqp://guest:guest@"
RABBITMQ_AMQP_EXCHANGE: str = "safe-transaction-service-events"
RABBITMQ_DECODER_EVENTS_QUEUE_NAME: str = "safe-decoder-service"


settings = Settings()
Empty file.
22 changes: 22 additions & 0 deletions app/datasources/queue/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
class QueueProviderException(Exception):
"""
Generic exception for QueueProvider errors.
"""

pass


class QueueProviderUnableToConnectException(QueueProviderException):
"""
Raised when a connection to RabbitMQ cannot be established.
"""

pass


class QueueProviderNotConnectedException(QueueProviderException):
"""
Raised when no connection is established.
"""

pass
117 changes: 117 additions & 0 deletions app/datasources/queue/queue_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
from asyncio import AbstractEventLoop
from typing import Any, Callable

import aio_pika
from aio_pika.abc import (
AbstractExchange,
AbstractIncomingMessage,
AbstractQueue,
AbstractRobustConnection,
ConsumerTag,
ExchangeType,
)

from app.config import settings

from .exceptions import (
QueueProviderNotConnectedException,
QueueProviderUnableToConnectException,
)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would move the exceptions to exceptions.py inside of the queue package.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Moved here


class QueueProvider:

_connection: AbstractRobustConnection | None
_exchange: AbstractExchange | None
_events_queue: AbstractQueue | None

def __init__(self) -> None:
"""
Initializes the QueueProvider instance with default values.
"""
self._connection = None
self._exchange = None
self._events_queue = None

async def _connect(self, loop: AbstractEventLoop) -> None:
"""
Establishes a connection to RabbitMQ and sets up the exchange and queue.

:param loop: The asyncio event loop used for the connection.
:return:
"""
try:
self._connection = await aio_pika.connect_robust(
url=settings.RABBITMQ_AMPQ_URL, loop=loop
)
except aio_pika.exceptions.AMQPConnectionError as e:
raise QueueProviderUnableToConnectException(e)

channel = await self._connection.channel()
self._exchange = await channel.declare_exchange(
settings.RABBITMQ_AMQP_EXCHANGE, ExchangeType.FANOUT
)
self._events_queue = await channel.declare_queue(
settings.RABBITMQ_DECODER_EVENTS_QUEUE_NAME, durable=True
)
if self._events_queue:
await self._events_queue.bind(self._exchange)

async def connect(self, loop: AbstractEventLoop) -> None:
"""
Ensures that the RabbitMQ connection is established.

:param loop: The asyncio event loop used to establish the connection.
:return:
"""
if not self._connection:
await self._connect(loop)

def is_connected(self) -> bool:
"""
Verifies if the connection to RabbitMQ is established.

:return: True` if the connection is established, `False` otherwise.
"""
return self._connection is not None

async def disconnect(self) -> None:
"""
Safely closes the RabbitMQ connection and cleans up resources.

:return:
"""
if self._connection:
if self._events_queue and self._exchange:
await self._events_queue.unbind(exchange=self._exchange)
await self._events_queue.delete(if_unused=False, if_empty=False)
await self._connection.close()
self._exchange = None
self._connection = None
self._events_queue = None

async def consume(self, callback: Callable[[str], Any]) -> ConsumerTag:
"""
Starts consuming messages from the declared queue.

- Each message is processed using the provided callback function.

:param callback: A function to process incoming messages.
:return: A tag identifying the active consumer.
:raises QueueProviderNotConnectedException: if no connection or queue is initialized.
"""
if not self._connection or not self._events_queue:
raise QueueProviderNotConnectedException()

async def wrapped_callback(message: AbstractIncomingMessage) -> None:
"""
Wrapper for processing the message and handling ACKs.

:param message: The incoming RabbitMQ message.
"""
await message.ack()
body = message.body
if body:
callback(body.decode("utf-8"))

return await self._events_queue.consume(wrapped_callback)
36 changes: 36 additions & 0 deletions app/main.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,50 @@
import asyncio
import logging
from contextlib import asynccontextmanager

from fastapi import APIRouter, FastAPI

from . import VERSION
from .datasources.queue.exceptions import QueueProviderUnableToConnectException
from .datasources.queue.queue_provider import QueueProvider
from .routers import about, contracts, default
from .services.events import EventsService


@asynccontextmanager
async def lifespan(app: FastAPI):
"""
Define the lifespan of the application:
- Connects to the QueueProvider at startup.
- Disconnects from the QueueProvider at shutdown.
"""
queue_provider = QueueProvider()
consume_task = None
try:
loop = asyncio.get_running_loop()
try:
await queue_provider.connect(loop)
except QueueProviderUnableToConnectException as e:
logging.error(f"Unable to connect to Queue Provider: {e}")
if queue_provider.is_connected():
events_service = EventsService()
consume_task = asyncio.create_task(
queue_provider.consume(events_service.process_event)
)
yield
finally:
if consume_task:
consume_task.cancel()
await queue_provider.disconnect()


app = FastAPI(
title="Safe Decoder Service",
description="Safe Core{API} decoder service",
version=VERSION,
docs_url=None,
redoc_url=None,
lifespan=lifespan,
)

# Router configuration
Expand Down
36 changes: 36 additions & 0 deletions app/services/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import json
import logging
from typing import Dict


class EventsService:

def process_event(self, message: str) -> None:
"""
Processes the incoming event message.

:param message: The incoming message to process, expected to be a JSON string.
"""
try:
tx_service_event = json.loads(message)

if self.is_event_valid(tx_service_event):
# TODO: process event!
pass
else:
logging.error(
f"Unsupported message. A valid message should have at least 'chainId' and 'type': {message}"
)
except json.JSONDecodeError:
logging.error(f"Unsupported message. Cannot parse as JSON: {message}")

def is_event_valid(self, tx_service_event: Dict) -> bool:
"""
Validates if the event has the required fields 'chainId' and 'type' as strings.

:param tx_service_event: The event object to validate.
:return: True if the event is valid (both 'chainId' and 'type' are strings), False otherwise.
"""
return isinstance(tx_service_event.get("chainId"), str) and isinstance(
tx_service_event.get("type"), str
)
Empty file.
Empty file.
57 changes: 57 additions & 0 deletions app/tests/datasources/queue/test_queue_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import asyncio
import unittest
from unittest.mock import patch

import aio_pika
from aio_pika.abc import AbstractRobustConnection

from app.config import settings
from app.datasources.queue.exceptions import QueueProviderUnableToConnectException
from app.datasources.queue.queue_provider import QueueProvider


class TestQueueProviderIntegration(unittest.IsolatedAsyncioTestCase):
async def asyncSetUp(self):
self.provider = QueueProvider()
self.loop = asyncio.get_event_loop()

async def test_connect_success(self):
self.assertFalse(self.provider.is_connected())
await self.provider.connect(self.loop)
self.assertTrue(self.provider.is_connected())
await self.provider.disconnect()
self.assertFalse(self.provider.is_connected())

async def test_connect_failure(self):
provider = QueueProvider()

with patch("app.config.settings.RABBITMQ_AMPQ_URL", "amqp://invalid-url"):
with self.assertRaises(QueueProviderUnableToConnectException):
await provider.connect(self.loop)

async def test_consume(self):
await self.provider.connect(self.loop)
assert isinstance(self.provider._connection, AbstractRobustConnection)
message = "Test message"
channel = await self.provider._connection.channel()
exchange = await channel.declare_exchange(
settings.RABBITMQ_AMQP_EXCHANGE, aio_pika.ExchangeType.FANOUT
)

await exchange.publish(
aio_pika.Message(body=message.encode("utf-8")),
routing_key="",
)

received_messages = []

def callback(message: str):
received_messages.append(message)

await self.provider.consume(callback)

# Wait to make sure the message is consumed.
await asyncio.sleep(1)

self.assertIn(message, received_messages)
await self.provider.disconnect()
Empty file.
49 changes: 49 additions & 0 deletions app/tests/services/test_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import unittest
from unittest.mock import patch

from app.services.events import EventsService


class TestEventsService(unittest.TestCase):

def test_is_event_valid(self):
valid_event = {"chainId": "123", "type": "transaction"}
self.assertTrue(EventsService().is_event_valid(valid_event))

invalid_event_missing_chain_id = {"type": "transaction"}
self.assertFalse(EventsService().is_event_valid(invalid_event_missing_chain_id))

invalid_event_missing_type = {"chainId": "123"}
self.assertFalse(EventsService().is_event_valid(invalid_event_missing_type))

invalid_event_invalid_chain_id = {"chainId": 123, "type": "transaction"}
self.assertFalse(EventsService().is_event_valid(invalid_event_invalid_chain_id))

invalid_event_invalid_type = {"chainId": "123", "type": 123}
self.assertFalse(EventsService().is_event_valid(invalid_event_invalid_type))

@patch("logging.error")
def test_process_event_valid_message(self, mock_log):
valid_message = '{"chainId": "123", "type": "transaction"}'

EventsService().process_event(valid_message)

mock_log.assert_not_called()

@patch("logging.error")
def test_process_event_invalid_json(self, mock_log):
invalid_message = '{"chainId": "123", "type": "transaction"'

EventsService().process_event(invalid_message)

mock_log.assert_called_with(
'Unsupported message. Cannot parse as JSON: {"chainId": "123", "type": "transaction"'
)

invalid_message_invalid_type = '{"chainId": "123", "type": 123}'

EventsService().process_event(invalid_message_invalid_type)

mock_log.assert_called_with(
'Unsupported message. A valid message should have at least \'chainId\' and \'type\': {"chainId": "123", "type": 123}'
)
Loading
Loading