diff --git a/.env.docker b/.env.docker index 4897451..001f745 100644 --- a/.env.docker +++ b/.env.docker @@ -1,2 +1,3 @@ REDIS_URL=redis://redis:6379/0 -DATABASE_URL=psql://postgres:postgres@db:5432/postgres +DATABASE_URL=postgresql+asyncpg://postgres:postgres@db:5432/postgres +RABBITMQ_AMPQ_URL=amqp://guest:guest@rabbitmq:5672/ \ No newline at end of file diff --git a/.env.sample b/.env.sample index 00898e9..001f745 100644 --- a/.env.sample +++ b/.env.sample @@ -1,2 +1,3 @@ REDIS_URL=redis://redis:6379/0 -DATABASE_URL=postgresql+asyncpg://postgres:postgres@db:5432/postgres \ No newline at end of file +DATABASE_URL=postgresql+asyncpg://postgres:postgres@db:5432/postgres +RABBITMQ_AMPQ_URL=amqp://guest:guest@rabbitmq:5672/ \ No newline at end of file diff --git a/.env.test b/.env.test index d31da29..020e890 100644 --- a/.env.test +++ b/.env.test @@ -2,3 +2,4 @@ TEST=True REDIS_URL=redis://localhost:6379/0 DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/postgres DATABASE_POOL_CLASS=NullPool +RABBITMQ_AMPQ_URL=amqp://guest:guest@localhost:5672/ \ No newline at end of file diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1365220..b6391b3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -53,6 +53,15 @@ jobs: --health-retries 5 ports: - 5432:5432 + rabbitmq: + image: rabbitmq:alpine + ports: + - 5672:5672 + options: >- + --health-cmd "rabbitmqctl await_startup" + --health-interval 10s + --health-timeout 5s + --health-retries 5 steps: - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} diff --git a/app/config.py b/app/config.py index efbe4d4..5e79b96 100644 --- a/app/config.py +++ b/app/config.py @@ -14,11 +14,14 @@ class Settings(BaseSettings): extra="allow", case_sensitive=True, ) + TEST: bool = False REDIS_URL: str = "redis://" DATABASE_URL: str = "psql://postgres:" DATABASE_POOL_CLASS: str = "AsyncAdaptedQueuePool" DATABASE_POOL_SIZE: int = 10 - TEST: bool = False + RABBITMQ_AMPQ_URL: str = "amqp://guest:guest@" + RABBITMQ_AMQP_EXCHANGE: str = "safe-transaction-service-events" + RABBITMQ_DECODER_EVENTS_QUEUE_NAME: str = "safe-decoder-service" settings = Settings() diff --git a/app/datasources/queue/__init__.py b/app/datasources/queue/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/datasources/queue/queue_provider.py b/app/datasources/queue/queue_provider.py new file mode 100644 index 0000000..bf267f1 --- /dev/null +++ b/app/datasources/queue/queue_provider.py @@ -0,0 +1,136 @@ +from asyncio import AbstractEventLoop +from typing import Any, Callable + +import aio_pika +from aio_pika.abc import ( + AbstractExchange, + AbstractIncomingMessage, + AbstractQueue, + AbstractRobustConnection, + ConsumerTag, + ExchangeType, +) + +from app.config import settings + + +class QueueProviderException(Exception): + """ + Generic exception for QueueProvider errors. + """ + + pass + + +class QueueProviderUnableToConnectException(QueueProviderException): + """ + Raised when a connection to RabbitMQ cannot be established. + """ + + pass + + +class QueueProviderNotConnectedException(QueueProviderException): + """ + Raised when no connection is established. + """ + + pass + + +class QueueProvider: + + _connection: AbstractRobustConnection | None + _exchange: AbstractExchange | None + _events_queue: AbstractQueue | None + + def __init__(self) -> None: + """ + Initializes the QueueProvider instance with default values. + """ + self._connection = None + self._exchange = None + self._events_queue = None + + async def _connect(self, loop: AbstractEventLoop) -> None: + """ + Establishes a connection to RabbitMQ and sets up the exchange and queue. + + :param loop: The asyncio event loop used for the connection. + :return: + """ + try: + self._connection = await aio_pika.connect_robust( + url=settings.RABBITMQ_AMPQ_URL, loop=loop + ) + except aio_pika.exceptions.AMQPConnectionError as e: + raise QueueProviderUnableToConnectException(e) + + channel = await self._connection.channel() + self._exchange = await channel.declare_exchange( + settings.RABBITMQ_AMQP_EXCHANGE, ExchangeType.FANOUT + ) + self._events_queue = await channel.declare_queue( + settings.RABBITMQ_DECODER_EVENTS_QUEUE_NAME, durable=True + ) + if self._events_queue: + await self._events_queue.bind(self._exchange) + + async def connect(self, loop: AbstractEventLoop) -> None: + """ + Ensures that the RabbitMQ connection is established. + + :param loop: The asyncio event loop used to establish the connection. + :return: + """ + if not self._connection: + await self._connect(loop) + + def is_connected(self) -> bool: + """ + Verifies if the connection to RabbitMQ is established. + + :return: True` if the connection is established, `False` otherwise. + """ + return self._connection is not None + + async def disconnect(self) -> None: + """ + Safely closes the RabbitMQ connection and cleans up resources. + + :return: + """ + if self._connection: + if self._events_queue and self._exchange: + await self._events_queue.unbind(exchange=self._exchange) + await self._events_queue.delete(if_unused=False, if_empty=False) + await self._connection.close() + self._exchange = None + self._connection = None + self._events_queue = None + + async def consume(self, callback: Callable[[str], Any]) -> ConsumerTag: + """ + Starts consuming messages from the declared queue. + + - Each message is processed using the provided callback function. + + :param callback: A function to process incoming messages. + :return: A tag identifying the active consumer. + :raises QueueProviderNotConnectedException: if no connection or queue is initialized. + """ + if not self._connection or not self._events_queue: + raise QueueProviderNotConnectedException() + + async def wrapped_callback(message: AbstractIncomingMessage) -> None: + """ + Wrapper for processing the message and handling ACKs. + + :param message: The incoming RabbitMQ message. + """ + await message.ack() + body = message.body + if body: + callback(body.decode("utf-8")) + + return await self._events_queue.consume(wrapped_callback) diff --git a/app/main.py b/app/main.py index 5b2e5a1..8179c1d 100644 --- a/app/main.py +++ b/app/main.py @@ -1,7 +1,43 @@ +import asyncio +import logging +from contextlib import asynccontextmanager + from fastapi import APIRouter, FastAPI from . import VERSION +from .datasources.queue.queue_provider import ( + QueueProvider, + QueueProviderUnableToConnectException, +) from .routers import about, contracts, default +from .services.events import EventsService + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """ + Define the lifespan of the application: + - Connects to the QueueProvider at startup. + - Disconnects from the QueueProvider at shutdown. + """ + queue_provider = QueueProvider() + consume_task = None + try: + loop = asyncio.get_running_loop() + try: + await queue_provider.connect(loop) + except QueueProviderUnableToConnectException as e: + logging.error(f"Unable to connect to Queue Provider: {e}") + if queue_provider.is_connected(): + consume_task = asyncio.create_task( + queue_provider.consume(EventsService.process_event) + ) + yield + finally: + if consume_task: + consume_task.cancel() + await queue_provider.disconnect() + app = FastAPI( title="Safe Decoder Service", @@ -9,6 +45,7 @@ version=VERSION, docs_url=None, redoc_url=None, + lifespan=lifespan, ) # Router configuration diff --git a/app/services/events.py b/app/services/events.py new file mode 100644 index 0000000..47b0a72 --- /dev/null +++ b/app/services/events.py @@ -0,0 +1,38 @@ +import json +import logging +from typing import Dict + + +class EventsService: + + @classmethod + def process_event(cls, message: str) -> None: + """ + Processes the incoming event message. + + :param message: The incoming message to process, expected to be a JSON string. + """ + try: + tx_service_event = json.loads(message) + + if cls.is_event_valid(tx_service_event): + # TODO: process event! + pass + else: + logging.error( + f"Unsupported message. A valid message should have at least 'chainId' and 'type': {message}" + ) + except json.JSONDecodeError: + logging.error(f"Unsupported message. Cannot parse as JSON: {message}") + + @staticmethod + def is_event_valid(tx_service_event: Dict) -> bool: + """ + Validates if the event has the required fields 'chainId' and 'type' as strings. + + :param tx_service_event: The event object to validate. + :return: True if the event is valid (both 'chainId' and 'type' are strings), False otherwise. + """ + return isinstance(tx_service_event.get("chainId"), str) and isinstance( + tx_service_event.get("type"), str + ) diff --git a/app/tests/datasources/__init__.py b/app/tests/datasources/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/tests/datasources/queue/__init__.py b/app/tests/datasources/queue/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/tests/datasources/queue/test_queue_provider.py b/app/tests/datasources/queue/test_queue_provider.py new file mode 100644 index 0000000..1ee68e7 --- /dev/null +++ b/app/tests/datasources/queue/test_queue_provider.py @@ -0,0 +1,59 @@ +import asyncio +import unittest +from unittest.mock import patch + +import aio_pika +from aio_pika.abc import AbstractRobustConnection + +from app.config import settings +from app.datasources.queue.queue_provider import ( + QueueProvider, + QueueProviderUnableToConnectException, +) + + +class TestQueueProviderIntegration(unittest.IsolatedAsyncioTestCase): + async def asyncSetUp(self): + self.provider = QueueProvider() + self.loop = asyncio.get_event_loop() + + async def test_connect_success(self): + self.assertFalse(self.provider.is_connected()) + await self.provider.connect(self.loop) + self.assertTrue(self.provider.is_connected()) + await self.provider.disconnect() + self.assertFalse(self.provider.is_connected()) + + async def test_connect_failure(self): + provider = QueueProvider() + + with patch("app.config.settings.RABBITMQ_AMPQ_URL", "amqp://invalid-url"): + with self.assertRaises(QueueProviderUnableToConnectException): + await provider.connect(self.loop) + + async def test_consume(self): + await self.provider.connect(self.loop) + assert isinstance(self.provider._connection, AbstractRobustConnection) + message = "Test message" + channel = await self.provider._connection.channel() + exchange = await channel.declare_exchange( + settings.RABBITMQ_AMQP_EXCHANGE, aio_pika.ExchangeType.FANOUT + ) + + await exchange.publish( + aio_pika.Message(body=message.encode("utf-8")), + routing_key="", + ) + + received_messages = [] + + def callback(message: str): + received_messages.append(message) + + await self.provider.consume(callback) + + # Wait to make sure the message is consumed. + await asyncio.sleep(1) + + self.assertIn(message, received_messages) + await self.provider.disconnect() diff --git a/app/tests/services/__init__.py b/app/tests/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/tests/services/test_events.py b/app/tests/services/test_events.py new file mode 100644 index 0000000..154716d --- /dev/null +++ b/app/tests/services/test_events.py @@ -0,0 +1,50 @@ +import unittest +from unittest.mock import patch + +from app.services.events import EventsService + + +class TestEventsService(unittest.TestCase): + + def test_is_event_valid(self): + valid_event = {"chainId": "123", "type": "transaction"} + self.assertTrue(EventsService.is_event_valid(valid_event)) + + def test_is_event_invalid(self): + invalid_event_missing_chain_id = {"type": "transaction"} + self.assertFalse(EventsService.is_event_valid(invalid_event_missing_chain_id)) + + invalid_event_missing_type = {"chainId": "123"} + self.assertFalse(EventsService.is_event_valid(invalid_event_missing_type)) + + invalid_event_invalid_chain_id = {"chainId": 123, "type": "transaction"} + self.assertFalse(EventsService.is_event_valid(invalid_event_invalid_chain_id)) + + invalid_event_invalid_type = {"chainId": "123", "type": 123} + self.assertFalse(EventsService.is_event_valid(invalid_event_invalid_type)) + + @patch("logging.error") + def test_process_event_valid_message(self, mock_log): + valid_message = '{"chainId": "123", "type": "transaction"}' + + EventsService.process_event(valid_message) + + mock_log.assert_not_called() + + @patch("logging.error") + def test_process_event_invalid_json(self, mock_log): + invalid_message = '{"chainId": "123", "type": "transaction"' + + EventsService.process_event(invalid_message) + + mock_log.assert_called_with( + 'Unsupported message. Cannot parse as JSON: {"chainId": "123", "type": "transaction"' + ) + + invalid_message_invalid_type = '{"chainId": "123", "type": 123}' + + EventsService.process_event(invalid_message_invalid_type) + + mock_log.assert_called_with( + 'Unsupported message. A valid message should have at least \'chainId\' and \'type\': {"chainId": "123", "type": 123}' + ) diff --git a/docker-compose.yml b/docker-compose.yml index 0d99882..bea154b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -25,12 +25,27 @@ services: timeout: 5s retries: 3 + # Only for development purposes + rabbitmq: + image: rabbitmq:alpine + ports: + - "5672:5672" + healthcheck: + test: rabbitmq-diagnostics -q ping + interval: 10s + timeout: 30s + retries: 5 + redis: image: redis:alpine ports: - "6379:6379" command: - --appendonly yes + healthcheck: + test: [ "CMD", "redis-cli", "ping" ] + timeout: 5s + retries: 3 web: build: @@ -43,6 +58,13 @@ services: - "8888:8888" volumes: - nginx-shared:/nginx + depends_on: + redis: + condition: service_healthy + db: + condition: service_healthy + rabbitmq: + condition: service_healthy command: docker/web/run_web.sh dramatiq-worker: diff --git a/requirements/prod.txt b/requirements/prod.txt index 2214cb0..9a9dad3 100644 --- a/requirements/prod.txt +++ b/requirements/prod.txt @@ -1,3 +1,4 @@ +aio-pika==9.5.1 alembic==1.14.0 asyncpg==0.30.0 dramatiq[redis, watch]==1.17.1 diff --git a/run_tests.sh b/run_tests.sh index aa515a4..f01b733 100755 --- a/run_tests.sh +++ b/run_tests.sh @@ -4,10 +4,10 @@ set -euo pipefail export ENV_FILE=.env.test export DB_NAME=testdb # Test in different database -docker compose -f docker-compose.yml build --force-rm redis db -docker compose -f docker-compose.yml up --no-start redis db -docker compose -f docker-compose.yml start redis db +docker compose -f docker-compose.yml build --force-rm redis db rabbitmq +docker compose -f docker-compose.yml up --no-start redis db rabbitmq +docker compose -f docker-compose.yml start redis db rabbitmq -# sleep 10 +sleep 10 pytest -rxXs