Skip to content

Commit

Permalink
[Backport] Use send_signal to send events in postgresql server implem…
Browse files Browse the repository at this point in the history
…entation (#9677)

* Use send_signal to send events in postgresql server implementation

* Address @touilleMan's comments
  • Loading branch information
vxgmichel authored Feb 12, 2025
1 parent 96e980f commit f185589
Show file tree
Hide file tree
Showing 30 changed files with 153 additions and 195 deletions.
2 changes: 1 addition & 1 deletion server/parsec/components/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ def spy(self) -> Iterator[EventBusSpy]:
async def send(self, event: Event) -> None:
raise NotImplementedError

def send_nowait(self, event: Event) -> None:
async def test_send(self, event: Event) -> None:
raise NotImplementedError


Expand Down
4 changes: 1 addition & 3 deletions server/parsec/components/invite.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
)
from parsec.api import api
from parsec.client_context import AuthenticatedClientContext, InvitedClientContext
from parsec.components.events import EventBus
from parsec.config import BackendConfig, EmailConfig, MockedEmailConfig, SmtpEmailConfig
from parsec.logging import get_logger
from parsec.templates import get_template
Expand Down Expand Up @@ -592,8 +591,7 @@ def process_claimer_step(


class BaseInviteComponent:
def __init__(self, event_bus: EventBus, config: BackendConfig):
self._event_bus = event_bus
def __init__(self, config: BackendConfig):
self._config = config

# Used by `new_for_user` implementations
Expand Down
4 changes: 2 additions & 2 deletions server/parsec/components/memory/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ async def send(self, event: Event) -> None:
await self._send_events_channel.send(event)

@override
def send_nowait(self, event: Event) -> None:
self._send_events_channel.send_nowait(event)
async def test_send(self, event: Event) -> None:
await self.send(event)


@asynccontextmanager
Expand Down
11 changes: 7 additions & 4 deletions server/parsec/components/memory/invite.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from __future__ import annotations

from collections.abc import Buffer
from typing import Any, override
from typing import override

from parsec._parsec import (
CancelledGreetingAttemptReason,
Expand All @@ -16,6 +16,7 @@
UserID,
UserProfile,
)
from parsec.components.events import EventBus
from parsec.components.invite import (
BaseInviteComponent,
DeviceInvitation,
Expand Down Expand Up @@ -52,6 +53,7 @@
MemoryOrganization,
MemoryUser,
)
from parsec.config import BackendConfig
from parsec.events import (
EventGreetingAttemptCancelled,
EventGreetingAttemptJoined,
Expand All @@ -76,10 +78,11 @@ class MemoryInviteComponent(BaseInviteComponent):
def __init__(
self,
data: MemoryDatamodel,
*args: Any,
**kwargs: Any,
event_bus: EventBus,
config: BackendConfig,
) -> None:
super().__init__(*args, **kwargs)
super().__init__(config)
self._event_bus = event_bus
self._data = data

def _get_shamir_recovery_invitation(
Expand Down
62 changes: 29 additions & 33 deletions server/parsec/components/postgresql/events.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
# Parsec Cloud (https://parsec.cloud) Copyright (c) BUSL-1.1 2016-present Scille SAS

import math
from contextlib import asynccontextmanager
from typing import AsyncIterator, override
from typing import AsyncIterator, cast, override

import anyio
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream

from parsec._parsec import (
ActiveUsersLimit,
Expand All @@ -29,44 +27,35 @@


class PGEventBus(EventBus):
def __init__(self, send_events_channel: MemoryObjectSendStream[Event]):
super().__init__()
self._send_events_channel = send_events_channel
"""The `EventBus.send` method is not implemented for the PostgreSQL event bus.
@override
async def send(self, event: Event) -> None:
await self._send_events_channel.send(event)
Events are typically sent after a change in the database, meaning that there is
always an open transaction to commit. For better concurrency handling, it makes
more sense to send the event as part of the transaction, i.e using:
await send_signal(conn, event)
instead of using another connection such as the notification connection used
in `test_send` for testing purposes.
"""

def __init__(self, conn: AsyncpgConnection):
super().__init__()
self._conn = conn

@override
def send_nowait(self, event: Event) -> None:
self._send_events_channel.send_nowait(event)
async def test_send(self, event: Event) -> None:
await send_signal(self._conn, event)


@asynccontextmanager
async def event_bus_factory(pool: AsyncpgPool) -> AsyncIterator[PGEventBus]:
# TODO: add typing once use anyio>=4 (currently incompatible with fastapi)
send_events_channel, receive_events_channel = anyio.create_memory_object_stream(math.inf)
receive_events_channel: MemoryObjectReceiveStream[Event]

event_bus = PGEventBus(send_events_channel)
_connection_lost = False

def _on_notification_conn_termination(conn: object) -> None:
nonlocal _connection_lost
_connection_lost = True
task_group.cancel_scope.cancel()

async def _pump_events(notification_conn: AsyncpgConnection) -> None:
async for event in receive_events_channel:
logger.info_with_debug_extra(
"Received internal event",
type=event.type,
event_id=event.event_id.hex,
organization_id=event.organization_id.str,
debug_extra=event.model_dump(),
)

await send_signal(notification_conn, event)
cancel_scope.cancel()

def _on_notification(conn: object, pid: int, channel: str, payload: object) -> None:
assert isinstance(payload, str)
Expand All @@ -77,22 +66,29 @@ def _on_notification(conn: object, pid: int, channel: str, payload: object) -> N
"Invalid notif received", pid=pid, channel=channel, payload=payload, exc_info=exc
)
return
logger.info_with_debug_extra(
"Dispatching event",
type=event.type,
event_id=event.event_id.hex,
organization_id=event.organization_id.str,
debug_extra=event.model_dump(),
)
event_bus._dispatch_incoming_event(event)

try:
async with pool.acquire() as notification_conn:
async with anyio.create_task_group() as task_group:
conn = cast(AsyncpgConnection, notification_conn)
event_bus = PGEventBus(conn)

with anyio.CancelScope() as cancel_scope:
notification_conn.add_termination_listener(_on_notification_conn_termination)

await notification_conn.add_listener("app_notification", _on_notification)
try:
task_group.start_soon(_pump_events, notification_conn)
yield event_bus
finally:
await notification_conn.remove_listener("app_notification", _on_notification)

task_group.cancel_scope.cancel()

finally:
if _connection_lost:
raise ConnectionError("PostgreSQL notification query has been lost")
Expand Down
14 changes: 6 additions & 8 deletions server/parsec/components/postgresql/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,20 @@ async def components_factory(
webhooks = WebhooksComponent(config, http_client)
events = PGEventsComponent(pool=pool, config=config, event_bus=event_bus)
ping = PGPingComponent(pool=pool)
organization = PGOrganizationComponent(
pool=pool, webhooks=webhooks, config=config, event_bus=event_bus
)
organization = PGOrganizationComponent(pool=pool, webhooks=webhooks, config=config)
auth = PGAuthComponent(pool=pool, event_bus=event_bus, config=config)
invite = PGInviteComponent(pool=pool, event_bus=event_bus, config=config)
user = PGUserComponent(pool=pool, event_bus=event_bus)
vlob = PGVlobComponent(pool=pool, event_bus=event_bus, webhooks=webhooks)
realm = PGRealmComponent(pool=pool, event_bus=event_bus, webhooks=webhooks)
invite = PGInviteComponent(pool=pool, config=config)
user = PGUserComponent(pool=pool)
vlob = PGVlobComponent(pool=pool, webhooks=webhooks)
realm = PGRealmComponent(pool=pool, webhooks=webhooks)
blockstore = blockstore_factory(
config=config.blockstore_config, postgresql_pool=pool
)
block = PGBlockComponent(pool=pool, blockstore=blockstore)
shamir = PGShamirComponent(pool=pool)

pki = None
sequester = PGSequesterComponent(pool=pool, event_bus=event_bus)
sequester = PGSequesterComponent(pool=pool)

components = {
"event_bus": event_bus,
Expand Down
Loading

0 comments on commit f185589

Please sign in to comment.