Add the RabbitMQ consumer
falvaradorodriguez committed Dec 12, 2024
1 parent 4bf14f7 commit a3c935b
Showing 17 changed files with 365 additions and 7 deletions.
3 changes: 2 additions & 1 deletion .env.docker
@@ -1,2 +1,3 @@
REDIS_URL=redis://redis:6379/0
DATABASE_URL=psql://postgres:postgres@db:5432/postgres
DATABASE_URL=postgresql+asyncpg://postgres:postgres@db:5432/postgres
RABBITMQ_AMPQ_URL=amqp://guest:guest@rabbitmq:5672/
3 changes: 2 additions & 1 deletion .env.sample
@@ -1,2 +1,3 @@
REDIS_URL=redis://redis:6379/0
DATABASE_URL=postgresql+asyncpg://postgres:postgres@db:5432/postgres
DATABASE_URL=postgresql+asyncpg://postgres:postgres@db:5432/postgres
RABBITMQ_AMPQ_URL=amqp://guest:guest@rabbitmq:5672/
1 change: 1 addition & 0 deletions .env.test
@@ -2,3 +2,4 @@ TEST=True
REDIS_URL=redis://localhost:6379/0
DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/postgres
DATABASE_POOL_CLASS=NullPool
RABBITMQ_AMPQ_URL=amqp://guest:guest@localhost:5672/
9 changes: 9 additions & 0 deletions .github/workflows/ci.yml
@@ -53,6 +53,15 @@ jobs:
--health-retries 5
ports:
- 5432:5432
rabbitmq:
image: rabbitmq:alpine
ports:
- 5672:5672
options: >-
--health-cmd "rabbitmqctl await_startup"
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
5 changes: 4 additions & 1 deletion app/config.py
@@ -14,11 +14,14 @@ class Settings(BaseSettings):
extra="allow",
case_sensitive=True,
)
TEST: bool = False
REDIS_URL: str = "redis://"
DATABASE_URL: str = "psql://postgres:"
DATABASE_POOL_CLASS: str = "AsyncAdaptedQueuePool"
DATABASE_POOL_SIZE: int = 10
TEST: bool = False
RABBITMQ_AMPQ_URL: str = "amqp://guest:guest@"
RABBITMQ_AMQP_EXCHANGE: str = "safe-transaction-service-events"
RABBITMQ_DECODER_EVENTS_QUEUE_NAME: str = "safe-decoder-service"


settings = Settings()
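
For context, a minimal sketch of how the new RabbitMQ settings resolve, assuming the Settings class is backed by pydantic-settings and reads environment variables when instantiated (the override value below is illustrative only; without it the defaults declared above apply):

import os

# Illustrative override; real deployments set this in .env / the container environment
os.environ["RABBITMQ_AMPQ_URL"] = "amqp://guest:guest@localhost:5672/"

from app.config import Settings

settings = Settings()
assert settings.RABBITMQ_AMPQ_URL == "amqp://guest:guest@localhost:5672/"
assert settings.RABBITMQ_AMQP_EXCHANGE == "safe-transaction-service-events"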
Empty file.
136 changes: 136 additions & 0 deletions app/datasources/queue/queue_provider.py
@@ -0,0 +1,136 @@
from asyncio import AbstractEventLoop
from typing import Any, Callable

import aio_pika
from aio_pika.abc import (
AbstractExchange,
AbstractIncomingMessage,
AbstractQueue,
AbstractRobustConnection,
ConsumerTag,
ExchangeType,
)

from app.config import settings


class QueueProviderException(Exception):
"""
Generic exception for QueueProvider errors.
"""

pass


class QueueProviderUnableToConnectException(QueueProviderException):
"""
Raised when a connection to RabbitMQ cannot be established.
"""

pass


class QueueProviderNotConnectedException(QueueProviderException):
"""
Raised when no connection is established.
"""

pass


class QueueProvider:

_connection: AbstractRobustConnection | None
_exchange: AbstractExchange | None
_events_queue: AbstractQueue | None

def __init__(self) -> None:
"""
Initializes the QueueProvider instance with default values.
"""
self._connection = None
self._exchange = None
self._events_queue = None

async def _connect(self, loop: AbstractEventLoop) -> None:
"""
Establishes a connection to RabbitMQ and sets up the exchange and queue.
:param loop: The asyncio event loop used for the connection.
:return:
"""
try:
self._connection = await aio_pika.connect_robust(
url=settings.RABBITMQ_AMPQ_URL, loop=loop
)
except aio_pika.exceptions.AMQPConnectionError as e:
raise QueueProviderUnableToConnectException(e)

channel = await self._connection.channel()
self._exchange = await channel.declare_exchange(
settings.RABBITMQ_AMQP_EXCHANGE, ExchangeType.FANOUT
)
self._events_queue = await channel.declare_queue(
settings.RABBITMQ_DECODER_EVENTS_QUEUE_NAME, durable=True
)
if self._events_queue:
await self._events_queue.bind(self._exchange)

async def connect(self, loop: AbstractEventLoop) -> None:
"""
Ensures that the RabbitMQ connection is established.
:param loop: The asyncio event loop used to establish the connection.
:return:
"""
if not self._connection:
await self._connect(loop)

def is_connected(self) -> bool:
"""
Verifies if the connection to RabbitMQ is established.
:return: `True` if the connection is established, `False` otherwise.
"""
return self._connection is not None

async def disconnect(self) -> None:
"""
Safely closes the RabbitMQ connection and cleans up resources.
:return:
"""
if self._connection:
if self._events_queue and self._exchange:
await self._events_queue.unbind(exchange=self._exchange)
await self._events_queue.delete(if_unused=False, if_empty=False)
await self._connection.close()
self._exchange = None
self._connection = None
self._events_queue = None

async def consume(self, callback: Callable[[str], Any]) -> ConsumerTag:
"""
Starts consuming messages from the declared queue.
- Each message is processed using the provided callback function.
:param callback: A function to process incoming messages.
:return: A tag identifying the active consumer.
:raises QueueProviderNotConnectedException: if no connection or queue is initialized.
"""
if not self._connection or not self._events_queue:
raise QueueProviderNotConnectedException()

async def wrapped_callback(message: AbstractIncomingMessage) -> None:
"""
Wrapper for processing the message and handling ACKs.
:param message: The incoming RabbitMQ message.
"""
await message.ack()
body = message.body
if body:
callback(body.decode("utf-8"))

return await self._events_queue.consume(wrapped_callback)
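
For orientation, a minimal usage sketch of the provider above; it assumes a broker reachable at the configured RABBITMQ_AMPQ_URL, and the print callback and ten-second consume window are illustrative only:

import asyncio

from app.datasources.queue.queue_provider import QueueProvider


async def main() -> None:
    provider = QueueProvider()
    # connect() declares the fanout exchange and the durable queue, then binds them
    await provider.connect(asyncio.get_running_loop())
    # consume() acks each message and passes its UTF-8 decoded body to the callback
    await provider.consume(lambda body: print(body))
    await asyncio.sleep(10)
    # disconnect() unbinds and deletes the queue, then closes the connection
    await provider.disconnect()


asyncio.run(main())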
37 changes: 37 additions & 0 deletions app/main.py
@@ -1,14 +1,51 @@
import asyncio
import logging
from contextlib import asynccontextmanager

from fastapi import APIRouter, FastAPI

from . import VERSION
from .datasources.queue.queue_provider import (
QueueProvider,
QueueProviderUnableToConnectException,
)
from .routers import about, contracts, default
from .services.events import EventsService


@asynccontextmanager
async def lifespan(app: FastAPI):
"""
Define the lifespan of the application:
- Connects to the QueueProvider at startup.
- Disconnects from the QueueProvider at shutdown.
"""
queue_provider = QueueProvider()
consume_task = None
try:
loop = asyncio.get_running_loop()
try:
await queue_provider.connect(loop)
except QueueProviderUnableToConnectException as e:
logging.error(f"Unable to connect to Queue Provider: {e}")
if queue_provider.is_connected():
consume_task = asyncio.create_task(
queue_provider.consume(EventsService.process_event)
)
yield
finally:
if consume_task:
consume_task.cancel()
await queue_provider.disconnect()


app = FastAPI(
title="Safe Decoder Service",
description="Safe Core{API} decoder service",
version=VERSION,
docs_url=None,
redoc_url=None,
lifespan=lifespan,
)

# Router configuration
38 changes: 38 additions & 0 deletions app/services/events.py
@@ -0,0 +1,38 @@
import json
import logging
from typing import Dict


class EventsService:

@classmethod
def process_event(cls, message: str) -> None:
"""
Processes the incoming event message.
:param message: The incoming message to process, expected to be a JSON string.
"""
try:
tx_service_event = json.loads(message)

if cls.is_event_valid(tx_service_event):
# TODO: process event!
pass
else:
logging.error(
f"Unsupported message. A valid message should have at least 'chainId' and 'type': {message}"
)
except json.JSONDecodeError:
logging.error(f"Unsupported message. Cannot parse as JSON: {message}")

@staticmethod
def is_event_valid(tx_service_event: Dict) -> bool:
"""
Validates if the event has the required fields 'chainId' and 'type' as strings.
:param tx_service_event: The event object to validate.
:return: True if the event is valid (both 'chainId' and 'type' are strings), False otherwise.
"""
return isinstance(tx_service_event.get("chainId"), str) and isinstance(
tx_service_event.get("type"), str
)
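
As a quick illustration of the validation rules (the payloads are made-up examples, not a documented event schema):

from app.services.events import EventsService

# Valid: both 'chainId' and 'type' are strings, so the (currently TODO) processing branch is taken
EventsService.process_event('{"chainId": "1", "type": "NEW_CONFIRMATION"}')

# Invalid: 'chainId' is an integer, so an "Unsupported message" error is logged
EventsService.process_event('{"chainId": 1, "type": "NEW_CONFIRMATION"}')

# Invalid: not JSON at all, so a "Cannot parse as JSON" error is logged
EventsService.process_event("not-json")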
Empty file.
Empty file.
59 changes: 59 additions & 0 deletions app/tests/datasources/queue/test_queue_provider.py
@@ -0,0 +1,59 @@
import asyncio
import unittest
from unittest.mock import patch

import aio_pika
from aio_pika.abc import AbstractRobustConnection

from app.config import settings
from app.datasources.queue.queue_provider import (
QueueProvider,
QueueProviderUnableToConnectException,
)


class TestQueueProviderIntegration(unittest.IsolatedAsyncioTestCase):
async def asyncSetUp(self):
self.provider = QueueProvider()
self.loop = asyncio.get_event_loop()

async def test_connect_success(self):
self.assertFalse(self.provider.is_connected())
await self.provider.connect(self.loop)
self.assertTrue(self.provider.is_connected())
await self.provider.disconnect()
self.assertFalse(self.provider.is_connected())

async def test_connect_failure(self):
provider = QueueProvider()

with patch("app.config.settings.RABBITMQ_AMPQ_URL", "amqp://invalid-url"):
with self.assertRaises(QueueProviderUnableToConnectException):
await provider.connect(self.loop)

async def test_consume(self):
await self.provider.connect(self.loop)
assert isinstance(self.provider._connection, AbstractRobustConnection)
message = "Test message"
channel = await self.provider._connection.channel()
exchange = await channel.declare_exchange(
settings.RABBITMQ_AMQP_EXCHANGE, aio_pika.ExchangeType.FANOUT
)

await exchange.publish(
aio_pika.Message(body=message.encode("utf-8")),
routing_key="",
)

received_messages = []

def callback(message: str):
received_messages.append(message)

await self.provider.consume(callback)

# Wait to make sure the message is consumed.
await asyncio.sleep(1)

self.assertIn(message, received_messages)
await self.provider.disconnect()
Empty file added app/tests/services/__init__.py
Empty file.
50 changes: 50 additions & 0 deletions app/tests/services/test_events.py
@@ -0,0 +1,50 @@
import unittest
from unittest.mock import patch

from app.services.events import EventsService


class TestEventsService(unittest.TestCase):

def test_is_event_valid(self):
valid_event = {"chainId": "123", "type": "transaction"}
self.assertTrue(EventsService.is_event_valid(valid_event))

def test_is_event_invalid(self):
invalid_event_missing_chain_id = {"type": "transaction"}
self.assertFalse(EventsService.is_event_valid(invalid_event_missing_chain_id))

invalid_event_missing_type = {"chainId": "123"}
self.assertFalse(EventsService.is_event_valid(invalid_event_missing_type))

invalid_event_invalid_chain_id = {"chainId": 123, "type": "transaction"}
self.assertFalse(EventsService.is_event_valid(invalid_event_invalid_chain_id))

invalid_event_invalid_type = {"chainId": "123", "type": 123}
self.assertFalse(EventsService.is_event_valid(invalid_event_invalid_type))

@patch("logging.error")
def test_process_event_valid_message(self, mock_log):
valid_message = '{"chainId": "123", "type": "transaction"}'

EventsService.process_event(valid_message)

mock_log.assert_not_called()

@patch("logging.error")
def test_process_event_invalid_json(self, mock_log):
invalid_message = '{"chainId": "123", "type": "transaction"'

EventsService.process_event(invalid_message)

mock_log.assert_called_with(
'Unsupported message. Cannot parse as JSON: {"chainId": "123", "type": "transaction"'
)

invalid_message_invalid_type = '{"chainId": "123", "type": 123}'

EventsService.process_event(invalid_message_invalid_type)

mock_log.assert_called_with(
'Unsupported message. A valid message should have at least \'chainId\' and \'type\': {"chainId": "123", "type": 123}'
)