Skip to content

Commit

Permalink
feat(libs): add rabbitmq support
Browse files Browse the repository at this point in the history
  • Loading branch information
wangxin688 committed Jun 14, 2024
1 parent ded9e49 commit 7f5cda2
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 11 deletions.
34 changes: 26 additions & 8 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
name = "fastapi-enterprise-template"
version = "0.1.0"
description = "cookiecutter enterprise template for fastapi"
authors = [
{ name = "jeffry", email = "[email protected]" }
]
authors = [{ name = "jeffry", email = "[email protected]" }]
dependencies = [
"fastapi>=0.108.0",
"alembic>=1.13.1",
Expand All @@ -13,7 +11,7 @@ dependencies = [
"uvicorn[standard]>=0.25.0",
"pyjwt>=2.8.0",
"pandas>=2.1.4",
"redis>=5.0.1",
"redis>=5.0.6",
"sentry_sdk>=1.39.2",
"httpx>=0.26.0",
"pydantic_extra_types>=2.4.1",
Expand All @@ -23,6 +21,7 @@ dependencies = [
"cryptography>=42.0.7",
"bcrypt>=4.1.3",
"gunicorn>=22.0.0",
"aio-pika>=9.4.1",
]
readme = "README.md"
requires-python = ">= 3.12"
Expand Down Expand Up @@ -61,7 +60,24 @@ target-version = "py312"

[tool.ruff.lint]
select = ["ALL"]
ignore = ["D", "G002", "DTZ003", "ANN401", "ANN101", "ANN102", "EM101", "PD901", "COM812", "ISC001", "FBT", "A003", "PLR0913", "G004", "E501"]
ignore = [
"D",
"G002",
"DTZ003",
"ANN401",
"ANN101",
"ANN102",
"EM101",
"PD901",
"COM812",
"ISC001",
"FBT",
"A003",
"PLR0913",
"G004",
"E501",
"TRY003",
]
fixable = ["ALL"]


Expand All @@ -73,12 +89,12 @@ fixable = ["ALL"]
"api.py" = ["A002", "B008"]
"deps.py" = ["B008"]
"src/internal/api.py" = ["ARG001"]
"src/auth/schemas.py" = ["N815"] # frontend menu
"src/auth/schemas.py" = ["N815"] # frontend menu
"alembic/*.py" = ["INP001", "UP007"]
"__init__.py" = ["F403"]

[tool.ruff.lint.flake8-bugbear]
extend-immutable-calls=[
extend-immutable-calls = [
"fastapi.Depends",
"fastapi.Query",
"fastapi.params_functions.Form",
Expand Down Expand Up @@ -115,7 +131,7 @@ exclude_lines = [
]

[tool.coverage.run]
concurrency=["thread", "greenlet"]
concurrency = ["thread", "greenlet"]

[tool.mypy]
exclude = "^tools/.*"
Expand All @@ -141,3 +157,5 @@ warn_untyped_fields = true

[tool.pyright]
include = ["src", "tests", "examples"]
reportIncompatibleVariableOverride = false
pythonVersion = "3.12"
14 changes: 13 additions & 1 deletion requirements-dev.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
# with-sources: false

-e file:.
aio-pika==9.4.1
# via fastapi-enterprise-template
aiormq==6.8.0
# via aio-pika
alembic==1.13.1
# via fastapi-enterprise-template
annotated-types==0.6.0
Expand Down Expand Up @@ -64,6 +68,7 @@ identify==2.5.33
idna==3.6
# via anyio
# via httpx
# via yarl
iniconfig==2.0.0
# via pytest
jinja2==3.1.3
Expand All @@ -74,6 +79,8 @@ markupsafe==2.1.3
# via jinja2
# via mako
# via wtforms
multidict==6.0.5
# via yarl
mypy==1.8.0
mypy-extensions==1.0.0
# via black
Expand All @@ -86,6 +93,8 @@ packaging==23.2
# via black
# via gunicorn
# via pytest
pamqp==3.3.0
# via aiormq
pandas==2.1.4
# via fastapi-enterprise-template
pathspec==0.12.1
Expand Down Expand Up @@ -131,7 +140,7 @@ pytz==2023.3.post1
pyyaml==6.0.1
# via pre-commit
# via uvicorn
redis==5.0.1
redis==5.0.6
# via fastapi-enterprise-template
ruff==0.1.11
sentry-sdk==1.39.2
Expand Down Expand Up @@ -183,3 +192,6 @@ websockets==12.0
# via uvicorn
wtforms==3.1.2
# via sqladmin
yarl==1.9.4
# via aio-pika
# via aiormq
14 changes: 13 additions & 1 deletion requirements.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
# with-sources: false

-e file:.
aio-pika==9.4.1
# via fastapi-enterprise-template
aiormq==6.8.0
# via aio-pika
alembic==1.13.1
# via fastapi-enterprise-template
annotated-types==0.6.0
Expand Down Expand Up @@ -48,6 +52,7 @@ httpx==0.26.0
idna==3.6
# via anyio
# via httpx
# via yarl
jinja2==3.1.3
# via sqladmin
mako==1.3.0
Expand All @@ -56,10 +61,14 @@ markupsafe==2.1.3
# via jinja2
# via mako
# via wtforms
multidict==6.0.5
# via yarl
numpy==1.26.3
# via pandas
packaging==24.1
# via gunicorn
pamqp==3.3.0
# via aiormq
pandas==2.1.4
# via fastapi-enterprise-template
phonenumbers==8.13.27
Expand Down Expand Up @@ -89,7 +98,7 @@ pytz==2023.3.post1
# via pandas
pyyaml==6.0.1
# via uvicorn
redis==5.0.1
redis==5.0.6
# via fastapi-enterprise-template
sentry-sdk==1.39.2
# via fastapi-enterprise-template
Expand Down Expand Up @@ -127,3 +136,6 @@ websockets==12.0
# via uvicorn
wtforms==3.1.2
# via sqladmin
yarl==1.9.4
# via aio-pika
# via aiormq
14 changes: 13 additions & 1 deletion src/features/admin/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from src.core.repositories import BaseRepository
from src.core.utils.context import locale_ctx
from src.features.admin import schemas
from src.features.admin.consts import ReservedRoleSlug
from src.features.admin.models import Group, Menu, Permission, Role, User
from src.features.admin.security import verify_password

Expand Down Expand Up @@ -65,7 +66,18 @@ class GroupRepo(BaseRepository[Group, schemas.GroupCreate, schemas.GroupUpdate,


class RoleRepo(BaseRepository[Role, schemas.RoleCreate, schemas.RoleUpdate, schemas.RoleQuery]):
...
async def create(
self,
session: AsyncSession,
obj_in: schemas.RoleCreate,
excludes: set[str] | None = None,
exclude_unset: bool = False,
exclude_none: bool = False,
commit: bool | None = True,
) -> Role:
if obj_in.slug == ReservedRoleSlug.ADMIN:
raise PermissionDenyError("Admin role can't be created again")
return await super().create(session, obj_in, excludes, exclude_unset, exclude_none, commit)


user_repo = UserRepo(User)
Expand Down
Empty file added src/libs/rabbitmq/__init__.py
Empty file.
45 changes: 45 additions & 0 deletions src/libs/rabbitmq/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from typing import TYPE_CHECKING, ClassVar

if TYPE_CHECKING:
from httpx import AsyncClient, Response


class RabbitMQClient:
headers: ClassVar[dict[str, str]] = {"content-type": "application/json"}

def __init__(self, url: str, username: str, password: str) -> None:
self.url: str = url
self.auth: tuple[str, str] = (username, password)

async def create_vhost(self, client: "AsyncClient", vhost: str) -> "Response":
return await client.put(f"{self.url}/api/vhosts/{vhost}", auth=self.auth, headers=self.headers)

async def delete_vhost(self, client: "AsyncClient", vhost: str) -> "Response":
return await client.delete(f"{self.url}/api/vhosts/{vhost}", auth=self.auth, headers=self.headers)

async def get_vhosts(self, client: "AsyncClient") -> "Response":
return await client.get(f"{self.url}/api/vhosts", auth=self.auth, headers=self.headers)

async def get_users(self, client: "AsyncClient") -> "Response":
return await client.get(f"{self.url}/api/users", auth=self.auth, headers=self.headers)

async def create_user(self, client: "AsyncClient", username: str, password: str, tags: str = "") -> "Response":
return await client.put(
f"{self.url}/api/users/{username}",
auth=self.auth,
headers=self.headers,
json={"username": username, "password": password, "tags": tags},
)

async def delete_user(self, client: "AsyncClient", username: str) -> "Response":
return await client.delete(f"{self.url}/api/users/{username}", auth=self.auth, headers=self.headers)

async def create_vhost_permission(
self, client: "AsyncClient", vhost: str, username: str, configure: str, write: str, read: str
) -> "Response":
return await client.put(
f"{self.url}/api/permissions/{vhost}/{username}",
auth=self.auth,
headers=self.headers,
json={"configure": configure, "write": write, "read": read, "username": username, "vhost": vhost},
)
61 changes: 61 additions & 0 deletions src/libs/rabbitmq/session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import json
import logging
from dataclasses import dataclass

from aio_pika import Message, connect_robust
from aio_pika.abc import AbstractRobustChannel, AbstractRobustConnection

logger = logging.getLogger(__name__)


@dataclass
class RabbitMQSession:
connection: AbstractRobustConnection | None = None
channel: AbstractRobustChannel | None = None

@property
async def _is_open(self) -> bool:
"""Check if the session is open."""
return (
self.connection is not None
and not self.connection.is_closed
and self.channel is not None
and not self.channel.is_closed
)

async def _close(self) -> None:
if self.channel and not self.channel.is_closed:
await self.channel.close()
if self.connection and not self.connection.is_closed:
await self.connection.close()

self.connection = None
self.channel = None

async def connect(self, ampq_url: str, publisher_confirms: bool) -> None:
if self._is_open:
return
try:
self.connection = await connect_robust(url=ampq_url)
self.channel = await self.connection.channel(publisher_confirms=publisher_confirms) # type: ignore # noqa: PGH003
logger.info("Connected to RabbitMQ successfully.")
except Exception:
await self._close()
logger.exception("Failed to connect to RabbitMQ.")
raise

async def disconnect(self) -> None:
if not self._is_open:
return
await self._close()

async def publish(self, message: Message | dict, routing_key: str, expiration: int | None = None) -> None:
if not self.channel or self.channel.is_closed:
raise RuntimeError("RabbitMQ channel is not open.")
if not isinstance(message, Message):
message = Message(body=json.dumps(message, default=str).encode(), expiration=expiration)
async with self.channel.transaction():
await self.channel.default_exchange.publish(message, routing_key)


mq = RabbitMQSession()

0 comments on commit 7f5cda2

Please sign in to comment.