From 322a49a75260d72ba1eadf3f731f4e97b1005668 Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Wed, 18 Dec 2024 00:01:27 +0300 Subject: [PATCH] docs: add FastStream integration --- docs/index.md | 1 + docs/introduction/fastapi.md | 2 +- docs/introduction/faststream.md | 42 ++++++++++++++++ pyproject.toml | 8 +-- tests/integrations/faststream/__init__.py | 5 ++ .../faststream/test_faststream_di.py | 46 +++++++++++++++++ .../test_faststream_di_pass_message.py | 50 +++++++++++++++++++ 7 files changed, 150 insertions(+), 4 deletions(-) create mode 100644 docs/introduction/faststream.md create mode 100644 tests/integrations/faststream/__init__.py create mode 100644 tests/integrations/faststream/test_faststream_di.py create mode 100644 tests/integrations/faststream/test_faststream_di_pass_message.py diff --git a/docs/index.md b/docs/index.md index 4d128ce..c31d7fe 100644 --- a/docs/index.md +++ b/docs/index.md @@ -9,6 +9,7 @@ introduction/ioc-container introduction/fastapi introduction/litestar + introduction/faststream introduction/inject-factories introduction/multiple-containers introduction/dynamic-container diff --git a/docs/introduction/fastapi.md b/docs/introduction/fastapi.md index 0d1075f..d709418 100644 --- a/docs/introduction/fastapi.md +++ b/docs/introduction/fastapi.md @@ -32,7 +32,7 @@ async def read_root( container.DependentFactory, fastapi.Depends(container.DIContainer.dependent_factory), ], -) -> datetime.datetime: +) -> str: return some_dependency.async_resource diff --git a/docs/introduction/faststream.md b/docs/introduction/faststream.md new file mode 100644 index 0000000..6332469 --- /dev/null +++ b/docs/introduction/faststream.md @@ -0,0 +1,42 @@ +# Usage with `FastStream` + +```python +import datetime +import contextlib +import typing + +from faststream import FastStream, Depends, Logger +from faststream.rabbit import RabbitBroker + +from tests import container + + +@contextlib.asynccontextmanager +async def lifespan_manager() -> typing.AsyncIterator[None]: + try: + yield + finally: + await container.DIContainer.tear_down() + + +broker = RabbitBroker() +app = FastStream(broker, lifespan=lifespan_manager) + + +@broker.subscriber("in") +async def read_root( + logger: Logger, + some_dependency: typing.Annotated[ + container.DependentFactory, + Depends(container.DIContainer.dependent_factory) + ], +) -> datetime.datetime: + startup_time = some_dependency.async_resource + logger.info(startup_time) + return startup_time + + +@app.after_startup +async def t() -> None: + await broker.publish(None, "in") +``` diff --git a/pyproject.toml b/pyproject.toml index dacf98f..370ebaa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,8 +27,6 @@ docs = "https://that-depends.readthedocs.io" [dependency-groups] dev = [ - "fastapi", - "litestar", "httpx", "pytest", "pytest-cov", @@ -37,7 +35,11 @@ dev = [ "ruff", "mypy", "typing-extensions", - "pre-commit" + "pre-commit", + # integrations tests + "fastapi", + "litestar", + "faststream[nats]", ] [build-system] diff --git a/tests/integrations/faststream/__init__.py b/tests/integrations/faststream/__init__.py new file mode 100644 index 0000000..9c7848a --- /dev/null +++ b/tests/integrations/faststream/__init__.py @@ -0,0 +1,5 @@ +import pytest + + +pytest.importorskip("faststream") +pytest.importorskip("nats") diff --git a/tests/integrations/faststream/test_faststream_di.py b/tests/integrations/faststream/test_faststream_di.py new file mode 100644 index 0000000..67c9864 --- /dev/null +++ b/tests/integrations/faststream/test_faststream_di.py @@ -0,0 +1,46 @@ +import datetime +import typing + +from faststream import Depends +from faststream.nats import NatsBroker, TestNatsBroker + +from tests import container + + +broker = NatsBroker() + +TEST_SUBJECT = "test" + + +@broker.subscriber(TEST_SUBJECT) +async def index_subscruber( + dependency: typing.Annotated[ + container.DependentFactory, + Depends(container.DIContainer.dependent_factory), + ], + free_dependency: typing.Annotated[ + container.FreeFactory, + Depends(container.DIContainer.resolver(container.FreeFactory)), + ], + singleton: typing.Annotated[ + container.SingletonFactory, + Depends(container.DIContainer.singleton), + ], + singleton_attribute: typing.Annotated[bool, Depends(container.DIContainer.singleton.dep1)], +) -> datetime.datetime: + assert dependency.sync_resource == free_dependency.dependent_factory.sync_resource + assert dependency.async_resource == free_dependency.dependent_factory.async_resource + assert singleton.dep1 is True + assert singleton_attribute is True + return dependency.async_resource + + +async def test_read_main() -> None: + async with TestNatsBroker(broker) as br: + result = await br.request(None, TEST_SUBJECT) + + result_str = typing.cast(str, await result.decode()) + assert ( + datetime.datetime.fromisoformat(result_str.replace("Z", "+00:00")) + == await container.DIContainer.async_resource() + ) diff --git a/tests/integrations/faststream/test_faststream_di_pass_message.py b/tests/integrations/faststream/test_faststream_di_pass_message.py new file mode 100644 index 0000000..fc07835 --- /dev/null +++ b/tests/integrations/faststream/test_faststream_di_pass_message.py @@ -0,0 +1,50 @@ +import typing + +from faststream import BaseMiddleware, Context, Depends +from faststream.broker.message import StreamMessage +from faststream.nats import NatsBroker, TestNatsBroker +from faststream.nats.message import NatsMessage + +from that_depends import BaseContainer, container_context, fetch_context_item, providers + + +class ContextMiddleware(BaseMiddleware): + async def consume_scope( + self, + call_next: typing.Callable[..., typing.Awaitable[typing.Any]], + msg: StreamMessage[typing.Any], + ) -> typing.Any: # noqa: ANN401 + async with container_context({"request": msg}): + return await call_next(msg) + + +broker = NatsBroker(middlewares=(ContextMiddleware,), validate=False) + +TEST_SUBJECT = "test" + + +class DIContainer(BaseContainer): + context_request = providers.Factory( + lambda: fetch_context_item("request"), + ) + + +@broker.subscriber(TEST_SUBJECT) +async def index_subscruber( + context_request: typing.Annotated[ + NatsMessage, + Depends(DIContainer.context_request, cast=False), + ], + message: typing.Annotated[ + NatsMessage, + Context(), + ], +) -> bool: + return message is context_request + + +async def test_read_main() -> None: + async with TestNatsBroker(broker) as br: + result = await br.request(None, TEST_SUBJECT) + + assert (await result.decode()) is True