From f1855898385dbdde29b4b4a30ee59ee3e24aba20 Mon Sep 17 00:00:00 2001 From: Vincent Michel Date: Wed, 12 Feb 2025 15:40:02 +0100 Subject: [PATCH] [Backport] Use send_signal to send events in postgresql server implementation (#9677) * Use send_signal to send events in postgresql server implementation * Address @touilleMan's comments --- server/parsec/components/events.py | 2 +- server/parsec/components/invite.py | 4 +- server/parsec/components/memory/events.py | 4 +- server/parsec/components/memory/invite.py | 11 ++-- server/parsec/components/postgresql/events.py | 62 +++++++++---------- .../parsec/components/postgresql/factory.py | 14 ++--- server/parsec/components/postgresql/invite.py | 51 +++++++-------- .../components/postgresql/organization.py | 4 -- .../postgresql/organization_update.py | 7 +-- server/parsec/components/postgresql/realm.py | 9 +-- .../components/postgresql/realm_create.py | 8 +-- .../components/postgresql/realm_rename.py | 8 +-- .../components/postgresql/realm_rotate_key.py | 8 +-- .../components/postgresql/realm_share.py | 8 +-- .../components/postgresql/realm_unshare.py | 8 +-- .../parsec/components/postgresql/sequester.py | 5 +- .../postgresql/sequester_create_service.py | 7 +-- .../postgresql/sequester_revoke_service.py | 11 ++-- server/parsec/components/postgresql/user.py | 12 +--- .../postgresql/user_create_device.py | 8 +-- .../components/postgresql/user_create_user.py | 8 +-- .../components/postgresql/user_freeze_user.py | 13 ++-- .../components/postgresql/user_revoke_user.py | 13 ++-- .../components/postgresql/user_update_user.py | 13 ++-- server/parsec/components/postgresql/vlob.py | 5 -- .../components/postgresql/vlob_create.py | 5 +- .../components/postgresql/vlob_update.py | 8 +-- .../authenticated/test_events_listen.py | 24 +++---- server/tests/test_cross_server_event.py | 2 +- server/tests/test_sse.py | 6 +- 30 files changed, 153 insertions(+), 195 deletions(-) diff --git a/server/parsec/components/events.py b/server/parsec/components/events.py index 22dc04c5095..f01919a049e 100644 --- a/server/parsec/components/events.py +++ b/server/parsec/components/events.py @@ -220,7 +220,7 @@ def spy(self) -> Iterator[EventBusSpy]: async def send(self, event: Event) -> None: raise NotImplementedError - def send_nowait(self, event: Event) -> None: + async def test_send(self, event: Event) -> None: raise NotImplementedError diff --git a/server/parsec/components/invite.py b/server/parsec/components/invite.py index d94f611b90f..f59fb494cb0 100644 --- a/server/parsec/components/invite.py +++ b/server/parsec/components/invite.py @@ -36,7 +36,6 @@ ) from parsec.api import api from parsec.client_context import AuthenticatedClientContext, InvitedClientContext -from parsec.components.events import EventBus from parsec.config import BackendConfig, EmailConfig, MockedEmailConfig, SmtpEmailConfig from parsec.logging import get_logger from parsec.templates import get_template @@ -592,8 +591,7 @@ def process_claimer_step( class BaseInviteComponent: - def __init__(self, event_bus: EventBus, config: BackendConfig): - self._event_bus = event_bus + def __init__(self, config: BackendConfig): self._config = config # Used by `new_for_user` implementations diff --git a/server/parsec/components/memory/events.py b/server/parsec/components/memory/events.py index df2bda15c07..ef4932f6d2e 100644 --- a/server/parsec/components/memory/events.py +++ b/server/parsec/components/memory/events.py @@ -27,8 +27,8 @@ async def send(self, event: Event) -> None: await self._send_events_channel.send(event) @override - def send_nowait(self, event: Event) -> None: - self._send_events_channel.send_nowait(event) + async def test_send(self, event: Event) -> None: + await self.send(event) @asynccontextmanager diff --git a/server/parsec/components/memory/invite.py b/server/parsec/components/memory/invite.py index d5294985d59..2f28650744e 100644 --- a/server/parsec/components/memory/invite.py +++ b/server/parsec/components/memory/invite.py @@ -2,7 +2,7 @@ from __future__ import annotations from collections.abc import Buffer -from typing import Any, override +from typing import override from parsec._parsec import ( CancelledGreetingAttemptReason, @@ -16,6 +16,7 @@ UserID, UserProfile, ) +from parsec.components.events import EventBus from parsec.components.invite import ( BaseInviteComponent, DeviceInvitation, @@ -52,6 +53,7 @@ MemoryOrganization, MemoryUser, ) +from parsec.config import BackendConfig from parsec.events import ( EventGreetingAttemptCancelled, EventGreetingAttemptJoined, @@ -76,10 +78,11 @@ class MemoryInviteComponent(BaseInviteComponent): def __init__( self, data: MemoryDatamodel, - *args: Any, - **kwargs: Any, + event_bus: EventBus, + config: BackendConfig, ) -> None: - super().__init__(*args, **kwargs) + super().__init__(config) + self._event_bus = event_bus self._data = data def _get_shamir_recovery_invitation( diff --git a/server/parsec/components/postgresql/events.py b/server/parsec/components/postgresql/events.py index ab15685157f..7eef4d7d659 100644 --- a/server/parsec/components/postgresql/events.py +++ b/server/parsec/components/postgresql/events.py @@ -1,11 +1,9 @@ # Parsec Cloud (https://parsec.cloud) Copyright (c) BUSL-1.1 2016-present Scille SAS -import math from contextlib import asynccontextmanager -from typing import AsyncIterator, override +from typing import AsyncIterator, cast, override import anyio -from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from parsec._parsec import ( ActiveUsersLimit, @@ -29,44 +27,35 @@ class PGEventBus(EventBus): - def __init__(self, send_events_channel: MemoryObjectSendStream[Event]): - super().__init__() - self._send_events_channel = send_events_channel + """The `EventBus.send` method is not implemented for the PostgreSQL event bus. - @override - async def send(self, event: Event) -> None: - await self._send_events_channel.send(event) + Events are typically sent after a change in the database, meaning that there is + always an open transaction to commit. For better concurrency handling, it makes + more sense to send the event as part of the transaction, i.e using: + + await send_signal(conn, event) + + instead of using another connection such as the notification connection used + in `test_send` for testing purposes. + """ + + def __init__(self, conn: AsyncpgConnection): + super().__init__() + self._conn = conn @override - def send_nowait(self, event: Event) -> None: - self._send_events_channel.send_nowait(event) + async def test_send(self, event: Event) -> None: + await send_signal(self._conn, event) @asynccontextmanager async def event_bus_factory(pool: AsyncpgPool) -> AsyncIterator[PGEventBus]: - # TODO: add typing once use anyio>=4 (currently incompatible with fastapi) - send_events_channel, receive_events_channel = anyio.create_memory_object_stream(math.inf) - receive_events_channel: MemoryObjectReceiveStream[Event] - - event_bus = PGEventBus(send_events_channel) _connection_lost = False def _on_notification_conn_termination(conn: object) -> None: nonlocal _connection_lost _connection_lost = True - task_group.cancel_scope.cancel() - - async def _pump_events(notification_conn: AsyncpgConnection) -> None: - async for event in receive_events_channel: - logger.info_with_debug_extra( - "Received internal event", - type=event.type, - event_id=event.event_id.hex, - organization_id=event.organization_id.str, - debug_extra=event.model_dump(), - ) - - await send_signal(notification_conn, event) + cancel_scope.cancel() def _on_notification(conn: object, pid: int, channel: str, payload: object) -> None: assert isinstance(payload, str) @@ -77,22 +66,29 @@ def _on_notification(conn: object, pid: int, channel: str, payload: object) -> N "Invalid notif received", pid=pid, channel=channel, payload=payload, exc_info=exc ) return + logger.info_with_debug_extra( + "Dispatching event", + type=event.type, + event_id=event.event_id.hex, + organization_id=event.organization_id.str, + debug_extra=event.model_dump(), + ) event_bus._dispatch_incoming_event(event) try: async with pool.acquire() as notification_conn: - async with anyio.create_task_group() as task_group: + conn = cast(AsyncpgConnection, notification_conn) + event_bus = PGEventBus(conn) + + with anyio.CancelScope() as cancel_scope: notification_conn.add_termination_listener(_on_notification_conn_termination) await notification_conn.add_listener("app_notification", _on_notification) try: - task_group.start_soon(_pump_events, notification_conn) yield event_bus finally: await notification_conn.remove_listener("app_notification", _on_notification) - task_group.cancel_scope.cancel() - finally: if _connection_lost: raise ConnectionError("PostgreSQL notification query has been lost") diff --git a/server/parsec/components/postgresql/factory.py b/server/parsec/components/postgresql/factory.py index 25e3dad0285..d2e4cf33345 100644 --- a/server/parsec/components/postgresql/factory.py +++ b/server/parsec/components/postgresql/factory.py @@ -44,14 +44,12 @@ async def components_factory( webhooks = WebhooksComponent(config, http_client) events = PGEventsComponent(pool=pool, config=config, event_bus=event_bus) ping = PGPingComponent(pool=pool) - organization = PGOrganizationComponent( - pool=pool, webhooks=webhooks, config=config, event_bus=event_bus - ) + organization = PGOrganizationComponent(pool=pool, webhooks=webhooks, config=config) auth = PGAuthComponent(pool=pool, event_bus=event_bus, config=config) - invite = PGInviteComponent(pool=pool, event_bus=event_bus, config=config) - user = PGUserComponent(pool=pool, event_bus=event_bus) - vlob = PGVlobComponent(pool=pool, event_bus=event_bus, webhooks=webhooks) - realm = PGRealmComponent(pool=pool, event_bus=event_bus, webhooks=webhooks) + invite = PGInviteComponent(pool=pool, config=config) + user = PGUserComponent(pool=pool) + vlob = PGVlobComponent(pool=pool, webhooks=webhooks) + realm = PGRealmComponent(pool=pool, webhooks=webhooks) blockstore = blockstore_factory( config=config.blockstore_config, postgresql_pool=pool ) @@ -59,7 +57,7 @@ async def components_factory( shamir = PGShamirComponent(pool=pool) pki = None - sequester = PGSequesterComponent(pool=pool, event_bus=event_bus) + sequester = PGSequesterComponent(pool=pool) components = { "event_bus": event_bus, diff --git a/server/parsec/components/postgresql/invite.py b/server/parsec/components/postgresql/invite.py index ef79e85998e..5e3451002e2 100644 --- a/server/parsec/components/postgresql/invite.py +++ b/server/parsec/components/postgresql/invite.py @@ -23,7 +23,6 @@ UserProfile, invited_cmds, ) -from parsec.components.events import EventBus from parsec.components.invite import ( BaseInviteComponent, DeviceInvitation, @@ -55,6 +54,7 @@ ) from parsec.components.organization import Organization, OrganizationGetBadOutcome from parsec.components.postgresql import AsyncpgConnection, AsyncpgPool +from parsec.components.postgresql.events import send_signal from parsec.components.postgresql.organization import PGOrganizationComponent from parsec.components.postgresql.user import PGUserComponent, UserInfo from parsec.components.postgresql.utils import ( @@ -1001,7 +1001,6 @@ async def query_retrieve_active_human_by_email( async def _send_invitation_event( conn: AsyncpgConnection, - event_bus: EventBus, organization_id: OrganizationID, invitation_info: InvitationInfo, status: InvitationStatus, @@ -1010,7 +1009,6 @@ async def _send_invitation_event( case UserInvitationInfo(): return await _send_invitation_event_for_user( conn, - event_bus, organization_id, invitation_info.token, status, @@ -1018,7 +1016,6 @@ async def _send_invitation_event( case DeviceInvitationInfo(): return await _send_invitation_event_for_device( conn, - event_bus, organization_id, invitation_info.token, invitation_info.claimer_user_id, @@ -1027,7 +1024,6 @@ async def _send_invitation_event( case ShamirRecoveryInvitationInfo(): return await _send_invitation_event_for_shamir_recovery( conn, - event_bus, organization_id, invitation_info.token, invitation_info.shamir_recovery_setup_internal_id, @@ -1037,7 +1033,6 @@ async def _send_invitation_event( async def _send_invitation_event_for_user( conn: AsyncpgConnection, - event_bus: EventBus, organization_id: OrganizationID, token: InvitationToken, status: InvitationStatus, @@ -1056,7 +1051,8 @@ async def _send_invitation_event_for_user( case unknown: assert False, repr(unknown) - await event_bus.send( + await send_signal( + conn, EventInvitation( organization_id=organization_id, token=token, @@ -1068,7 +1064,6 @@ async def _send_invitation_event_for_user( async def _send_invitation_event_for_device( conn: AsyncpgConnection, - event_bus: EventBus, organization_id: OrganizationID, token: InvitationToken, claimer_user_id: UserID, @@ -1076,7 +1071,8 @@ async def _send_invitation_event_for_device( ) -> None: # Only the corresponding user can greet a device invitation possible_greeters = {claimer_user_id} - await event_bus.send( + await send_signal( + conn, EventInvitation( organization_id=organization_id, token=token, @@ -1088,7 +1084,6 @@ async def _send_invitation_event_for_device( async def _send_invitation_event_for_shamir_recovery( conn: AsyncpgConnection, - event_bus: EventBus, organization_id: OrganizationID, token: InvitationToken, shamir_recovery_setup_internal_id: int, @@ -1110,7 +1105,8 @@ async def _send_invitation_event_for_shamir_recovery( case unknown: assert False, repr(unknown) - await event_bus.send( + await send_signal( + conn, EventInvitation( organization_id=organization_id, token=token, @@ -1131,7 +1127,6 @@ async def _do_new_invitation( created_on: DateTime, invitation_type: InvitationType, suggested_token: InvitationToken, - event_bus: EventBus, ) -> InvitationToken: match invitation_type: case InvitationType.USER: @@ -1181,7 +1176,6 @@ async def _do_new_invitation( case InvitationType.USER: await _send_invitation_event_for_user( conn, - event_bus, organization_id=organization_id, token=token, status=InvitationStatus.PENDING, @@ -1189,7 +1183,6 @@ async def _do_new_invitation( case InvitationType.DEVICE: await _send_invitation_event_for_device( conn, - event_bus, organization_id=organization_id, token=token, claimer_user_id=author_user_id, @@ -1199,7 +1192,6 @@ async def _do_new_invitation( assert shamir_recovery_setup is not None await _send_invitation_event_for_shamir_recovery( conn, - event_bus, organization_id=organization_id, token=token, shamir_recovery_setup_internal_id=shamir_recovery_setup, @@ -1226,8 +1218,8 @@ async def _human_handle_from_user_id( class PGInviteComponent(BaseInviteComponent): - def __init__(self, pool: AsyncpgPool, event_bus: EventBus, config: BackendConfig) -> None: - super().__init__(event_bus, config) + def __init__(self, pool: AsyncpgPool, config: BackendConfig) -> None: + super().__init__(config) self.pool = pool self.organization: PGOrganizationComponent @@ -1277,7 +1269,6 @@ async def new_for_user( suggested_token = force_token or InvitationToken.new() token = await _do_new_invitation( conn, - event_bus=self._event_bus, organization_id=organization_id, author_user_id=author_user_id, author_device_id=author, @@ -1340,7 +1331,6 @@ async def new_for_device( suggested_token = force_token or InvitationToken.new() token = await _do_new_invitation( conn, - event_bus=self._event_bus, organization_id=organization_id, author_user_id=author_user_id, author_device_id=author, @@ -1430,7 +1420,6 @@ async def new_for_shamir_recovery( suggested_token = force_token or InvitationToken.new() token = await _do_new_invitation( conn, - event_bus=self._event_bus, organization_id=organization_id, author_user_id=author_user_id, author_device_id=author, @@ -1588,7 +1577,6 @@ async def cancel( await _send_invitation_event( conn, - self._event_bus, organization_id=organization_id, invitation_info=invitation_info, status=InvitationStatus.PENDING, @@ -2318,13 +2306,14 @@ async def greeter_cancel_greeting_attempt( conn, greeting_attempt_info.internal_id, GreeterOrClaimer.GREETER, reason, now ) - await self._event_bus.send( + await send_signal( + conn, EventGreetingAttemptCancelled( organization_id=organization_id, token=invitation_info.token, greeter=greeting_attempt_info.greeter, greeting_attempt=greeting_attempt, - ) + ), ) @override @@ -2388,13 +2377,14 @@ async def claimer_cancel_greeting_attempt( conn, greeting_attempt_info.internal_id, GreeterOrClaimer.CLAIMER, reason, now ) - await self._event_bus.send( + await send_signal( + conn, EventGreetingAttemptCancelled( organization_id=organization_id, token=invitation_info.token, greeter=greeting_attempt_info.greeter, greeting_attempt=greeting_attempt, - ) + ), ) @override @@ -2464,13 +2454,14 @@ async def greeter_step( case Buffer() as data: # When completing the `WAIT_PEER` step, send a `GreetingAttemptJoined` event if step_index == 0: - await self._event_bus.send( + await send_signal( + conn, EventGreetingAttemptJoined( organization_id=org.organization_id, token=invitation_info.token, greeter=greeting_attempt_info.greeter, greeting_attempt=greeting_attempt, - ) + ), ) return data @@ -2542,13 +2533,14 @@ async def claimer_step( case self.StepOutcome.NOT_READY: # During the `WAIT_PEER` step, send a `GreetingAttemptReady` event to the greeter if step_index == 0: - await self._event_bus.send( + await send_signal( + conn, EventGreetingAttemptReady( organization_id=org.organization_id, token=token, greeter=greeting_attempt_info.greeter, greeting_attempt=greeting_attempt, - ) + ), ) return NotReady() case Buffer() as data: @@ -2618,7 +2610,6 @@ async def complete( await _send_invitation_event( conn, - event_bus=self._event_bus, organization_id=organization_id, invitation_info=invitation_info, status=InvitationStatus.FINISHED, diff --git a/server/parsec/components/postgresql/organization.py b/server/parsec/components/postgresql/organization.py index df9f52c9f59..8240c3d4f6e 100644 --- a/server/parsec/components/postgresql/organization.py +++ b/server/parsec/components/postgresql/organization.py @@ -16,7 +16,6 @@ VerifyKey, ) from parsec.ballpark import TimestampOutOfBallpark -from parsec.components.events import EventBus from parsec.components.organization import ( BaseOrganizationComponent, Organization, @@ -106,11 +105,9 @@ def __init__( pool: AsyncpgPool, webhooks: WebhooksComponent, config: BackendConfig, - event_bus: EventBus, ) -> None: super().__init__(webhooks, config) self.pool = pool - self.event_bus = event_bus self.user: PGUserComponent def register_components(self, user: PGUserComponent, **kwargs) -> None: @@ -325,7 +322,6 @@ async def update( tos: UnsetType | None | dict[TosLocale, TosUrl] = Unset, ) -> None | OrganizationUpdateBadOutcome: return await organization_update( - self.event_bus, conn, now, id, diff --git a/server/parsec/components/postgresql/organization_update.py b/server/parsec/components/postgresql/organization_update.py index 7519a66b402..9edd7a42930 100644 --- a/server/parsec/components/postgresql/organization_update.py +++ b/server/parsec/components/postgresql/organization_update.py @@ -8,13 +8,13 @@ DateTime, OrganizationID, ) -from parsec.components.events import EventBus from parsec.components.organization import ( OrganizationUpdateBadOutcome, TosLocale, TosUrl, ) from parsec.components.postgresql import AsyncpgConnection +from parsec.components.postgresql.events import send_signal from parsec.components.postgresql.utils import ( Q, SqlQueryParam, @@ -57,7 +57,6 @@ def _q_update_factory( async def organization_update( - event_bus: EventBus, conn: AsyncpgConnection, now: DateTime, id: OrganizationID, @@ -120,7 +119,7 @@ async def organization_update( # TODO: the event is triggered even if the orga was already expired, is this okay ? if now_is_expired: - await event_bus.send(EventOrganizationExpired(organization_id=id)) + await send_signal(conn, EventOrganizationExpired(organization_id=id)) if tos is not Unset: - await event_bus.send(EventOrganizationTosUpdated(organization_id=id)) + await send_signal(conn, EventOrganizationTosUpdated(organization_id=id)) diff --git a/server/parsec/components/postgresql/realm.py b/server/parsec/components/postgresql/realm.py index 393a7f6d202..89e149ef180 100644 --- a/server/parsec/components/postgresql/realm.py +++ b/server/parsec/components/postgresql/realm.py @@ -17,7 +17,6 @@ VlobID, ) from parsec.ballpark import RequireGreaterTimestamp, TimestampOutOfBallpark -from parsec.components.events import EventBus from parsec.components.postgresql import AsyncpgConnection, AsyncpgPool from parsec.components.postgresql.realm_create import realm_create from parsec.components.postgresql.realm_dump_realms_granted_roles import ( @@ -84,10 +83,9 @@ class PGRealmComponent(BaseRealmComponent): - def __init__(self, pool: AsyncpgPool, event_bus: EventBus, webhooks: WebhooksComponent): + def __init__(self, pool: AsyncpgPool, webhooks: WebhooksComponent): super().__init__(webhooks) self.pool = pool - self.event_bus = event_bus @override @transaction @@ -108,7 +106,6 @@ async def create( | RequireGreaterTimestamp ): return await realm_create( - self.event_bus, conn, now, organization_id, @@ -139,7 +136,6 @@ async def share( | RequireGreaterTimestamp ): return await realm_share( - self.event_bus, conn, now, organization_id, @@ -169,7 +165,6 @@ async def unshare( | RequireGreaterTimestamp ): return await realm_unshare( - self.event_bus, conn, now, organization_id, @@ -199,7 +194,6 @@ async def rename( | RequireGreaterTimestamp ): return await realm_rename( - self.event_bus, conn, now, organization_id, @@ -236,7 +230,6 @@ async def rotate_key( | RejectedBySequesterService ): return await realm_rotate_key( - self.event_bus, conn, now, organization_id, diff --git a/server/parsec/components/postgresql/realm_create.py b/server/parsec/components/postgresql/realm_create.py index 82effa83697..8b1f066ddfa 100644 --- a/server/parsec/components/postgresql/realm_create.py +++ b/server/parsec/components/postgresql/realm_create.py @@ -11,8 +11,8 @@ VerifyKey, ) from parsec.ballpark import RequireGreaterTimestamp, TimestampOutOfBallpark -from parsec.components.events import EventBus from parsec.components.postgresql import AsyncpgConnection +from parsec.components.postgresql.events import send_signal from parsec.components.postgresql.queries import ( AuthAndLockCommonOnlyBadOutcome, AuthAndLockCommonOnlyData, @@ -115,7 +115,6 @@ async def realm_create( - event_bus: EventBus, conn: AsyncpgConnection, now: DateTime, organization_id: OrganizationID, @@ -217,14 +216,15 @@ async def realm_create( # Send the corresponding event - await event_bus.send( + await send_signal( + conn, EventRealmCertificate( organization_id=organization_id, timestamp=certif.timestamp, realm_id=certif.realm_id, user_id=certif.user_id, role_removed=certif.role is None, - ) + ), ) return certif diff --git a/server/parsec/components/postgresql/realm_rename.py b/server/parsec/components/postgresql/realm_rename.py index d8cdb6e842c..69abc2e4265 100644 --- a/server/parsec/components/postgresql/realm_rename.py +++ b/server/parsec/components/postgresql/realm_rename.py @@ -10,8 +10,8 @@ VerifyKey, ) from parsec.ballpark import RequireGreaterTimestamp, TimestampOutOfBallpark -from parsec.components.events import EventBus from parsec.components.postgresql import AsyncpgConnection +from parsec.components.postgresql.events import send_signal from parsec.components.postgresql.queries import ( AuthAndLockCommonOnlyBadOutcome, AuthAndLockCommonOnlyData, @@ -79,7 +79,6 @@ async def realm_rename( - event_bus: EventBus, conn: AsyncpgConnection, now: DateTime, organization_id: OrganizationID, @@ -175,14 +174,15 @@ async def realm_rename( case unknown: assert False, unknown - await event_bus.send( + await send_signal( + conn, EventRealmCertificate( organization_id=organization_id, timestamp=certif.timestamp, realm_id=certif.realm_id, user_id=db_common_data.user_id, role_removed=False, - ) + ), ) return certif diff --git a/server/parsec/components/postgresql/realm_rotate_key.py b/server/parsec/components/postgresql/realm_rotate_key.py index b9ecd6b6f93..63d0a26b27c 100644 --- a/server/parsec/components/postgresql/realm_rotate_key.py +++ b/server/parsec/components/postgresql/realm_rotate_key.py @@ -12,8 +12,8 @@ VerifyKey, ) from parsec.ballpark import RequireGreaterTimestamp, TimestampOutOfBallpark -from parsec.components.events import EventBus from parsec.components.postgresql import AsyncpgConnection +from parsec.components.postgresql.events import send_signal from parsec.components.postgresql.queries import ( AuthAndLockCommonOnlyBadOutcome, AuthAndLockCommonOnlyData, @@ -181,7 +181,6 @@ async def realm_rotate_key( - event_bus: EventBus, conn: AsyncpgConnection, now: DateTime, organization_id: OrganizationID, @@ -381,14 +380,15 @@ def arg_gen(): arg_gen(), ) - await event_bus.send( + await send_signal( + conn, EventRealmCertificate( organization_id=organization_id, timestamp=certif.timestamp, realm_id=certif.realm_id, user_id=db_common.user_id, role_removed=False, - ) + ), ) return certif diff --git a/server/parsec/components/postgresql/realm_share.py b/server/parsec/components/postgresql/realm_share.py index 0c3f12a3ceb..b3615f2c3fd 100644 --- a/server/parsec/components/postgresql/realm_share.py +++ b/server/parsec/components/postgresql/realm_share.py @@ -11,8 +11,8 @@ VerifyKey, ) from parsec.ballpark import RequireGreaterTimestamp, TimestampOutOfBallpark -from parsec.components.events import EventBus from parsec.components.postgresql import AsyncpgConnection +from parsec.components.postgresql.events import send_signal from parsec.components.postgresql.queries import ( AuthAndLockCommonOnlyBadOutcome, AuthAndLockCommonOnlyData, @@ -141,7 +141,6 @@ async def realm_share( - event_bus: EventBus, conn: AsyncpgConnection, now: DateTime, organization_id: OrganizationID, @@ -337,14 +336,15 @@ async def realm_share( case unknown: assert False, unknown - await event_bus.send( + await send_signal( + conn, EventRealmCertificate( organization_id=organization_id, timestamp=certif.timestamp, realm_id=certif.realm_id, user_id=certif.user_id, role_removed=False, - ) + ), ) return certif diff --git a/server/parsec/components/postgresql/realm_unshare.py b/server/parsec/components/postgresql/realm_unshare.py index 4d51b44219c..7624a366188 100644 --- a/server/parsec/components/postgresql/realm_unshare.py +++ b/server/parsec/components/postgresql/realm_unshare.py @@ -10,8 +10,8 @@ VerifyKey, ) from parsec.ballpark import RequireGreaterTimestamp, TimestampOutOfBallpark -from parsec.components.events import EventBus from parsec.components.postgresql import AsyncpgConnection +from parsec.components.postgresql.events import send_signal from parsec.components.postgresql.queries import ( AuthAndLockCommonOnlyBadOutcome, AuthAndLockCommonOnlyData, @@ -102,7 +102,6 @@ async def realm_unshare( - event_bus: EventBus, conn: AsyncpgConnection, now: DateTime, organization_id: OrganizationID, @@ -247,14 +246,15 @@ async def realm_unshare( case unknown: assert False, unknown - await event_bus.send( + await send_signal( + conn, EventRealmCertificate( organization_id=organization_id, timestamp=certif.timestamp, realm_id=certif.realm_id, user_id=certif.user_id, role_removed=True, - ) + ), ) return certif diff --git a/server/parsec/components/postgresql/sequester.py b/server/parsec/components/postgresql/sequester.py index f796b4254d2..78107eec198 100644 --- a/server/parsec/components/postgresql/sequester.py +++ b/server/parsec/components/postgresql/sequester.py @@ -38,9 +38,8 @@ class PGSequesterComponent(BaseSequesterComponent): - def __init__(self, pool: AsyncpgPool, event_bus: EventBus): + def __init__(self, pool: AsyncpgPool): self.pool = pool - self.event_bus = event_bus @override @transaction @@ -58,7 +57,6 @@ async def create_service( | RequireGreaterTimestamp ): return await sequester_create_service( - self.event_bus, conn, now, organization_id, @@ -97,7 +95,6 @@ async def revoke_service( | RequireGreaterTimestamp ): return await sequester_revoke_service( - self.event_bus, conn, now, organization_id, diff --git a/server/parsec/components/postgresql/sequester_create_service.py b/server/parsec/components/postgresql/sequester_create_service.py index 1f7bf20c8cc..109bc78f71d 100644 --- a/server/parsec/components/postgresql/sequester_create_service.py +++ b/server/parsec/components/postgresql/sequester_create_service.py @@ -8,8 +8,8 @@ SequesterVerifyKeyDer, ) from parsec.ballpark import RequireGreaterTimestamp -from parsec.components.events import EventBus from parsec.components.postgresql import AsyncpgConnection +from parsec.components.postgresql.events import send_signal from parsec.components.postgresql.utils import Q from parsec.components.sequester import ( SequesterCreateServiceStoreBadOutcome, @@ -95,7 +95,6 @@ async def sequester_create_service( - event_bus: EventBus, conn: AsyncpgConnection, now: DateTime, organization_id: OrganizationID, @@ -185,8 +184,8 @@ async def sequester_create_service( case unknown: assert False, repr(unknown) - await event_bus.send( - EventSequesterCertificate(organization_id=organization_id, timestamp=certif.timestamp) + await send_signal( + conn, EventSequesterCertificate(organization_id=organization_id, timestamp=certif.timestamp) ) return certif diff --git a/server/parsec/components/postgresql/sequester_revoke_service.py b/server/parsec/components/postgresql/sequester_revoke_service.py index 28ba35f2c97..359f7848e2c 100644 --- a/server/parsec/components/postgresql/sequester_revoke_service.py +++ b/server/parsec/components/postgresql/sequester_revoke_service.py @@ -8,8 +8,8 @@ SequesterVerifyKeyDer, ) from parsec.ballpark import RequireGreaterTimestamp -from parsec.components.events import EventBus from parsec.components.postgresql import AsyncpgConnection +from parsec.components.postgresql.events import send_signal from parsec.components.postgresql.utils import Q from parsec.components.sequester import ( SequesterRevokeServiceStoreBadOutcome, @@ -97,7 +97,6 @@ async def sequester_revoke_service( - event_bus: EventBus, conn: AsyncpgConnection, now: DateTime, organization_id: OrganizationID, @@ -180,14 +179,14 @@ async def sequester_revoke_service( case unknown: assert False, repr(unknown) - await event_bus.send( - EventSequesterCertificate(organization_id=organization_id, timestamp=certif.timestamp) - ) - match row["topic_updated"]: case True: pass case unknown: assert False, repr(unknown) + await send_signal( + conn, EventSequesterCertificate(organization_id=organization_id, timestamp=certif.timestamp) + ) + return certif diff --git a/server/parsec/components/postgresql/user.py b/server/parsec/components/postgresql/user.py index a163fbbace4..6ea7e675a64 100644 --- a/server/parsec/components/postgresql/user.py +++ b/server/parsec/components/postgresql/user.py @@ -17,7 +17,6 @@ VlobID, ) from parsec.ballpark import RequireGreaterTimestamp, TimestampOutOfBallpark -from parsec.components.events import EventBus from parsec.components.postgresql import AsyncpgConnection, AsyncpgPool from parsec.components.postgresql.user_accept_tos import user_accept_tos from parsec.components.postgresql.user_create_device import user_create_device @@ -105,10 +104,9 @@ def _make_q_lock_common_topic(for_update: bool = False, for_share=False) -> Q: class PGUserComponent(BaseUserComponent): - def __init__(self, pool: AsyncpgPool, event_bus: EventBus) -> None: + def __init__(self, pool: AsyncpgPool) -> None: super().__init__() self.pool = pool - self.event_bus = event_bus async def _check_common_topic( self, conn: AsyncpgConnection, organization_id: OrganizationID @@ -185,7 +183,6 @@ async def create_user( | RequireGreaterTimestamp ): return await user_create_user( - self.event_bus, conn, now, organization_id, @@ -216,7 +213,6 @@ async def create_device( | RequireGreaterTimestamp ): return await user_create_device( - self.event_bus, conn, now, organization_id, @@ -244,7 +240,6 @@ async def update_user( | RequireGreaterTimestamp ): return await user_update_user( - self.event_bus, conn, now, organization_id, @@ -318,7 +313,6 @@ async def revoke_user( | RequireGreaterTimestamp ): return await user_revoke_user( - self.event_bus, conn, now, organization_id, @@ -337,9 +331,7 @@ async def freeze_user( user_email: str | None, frozen: bool, ) -> UserInfo | UserFreezeUserBadOutcome: - return await user_freeze_user( - self.event_bus, conn, organization_id, user_id, user_email, frozen - ) + return await user_freeze_user(conn, organization_id, user_id, user_email, frozen) @override @no_transaction diff --git a/server/parsec/components/postgresql/user_create_device.py b/server/parsec/components/postgresql/user_create_device.py index a2e497b7b1e..eda68113a72 100644 --- a/server/parsec/components/postgresql/user_create_device.py +++ b/server/parsec/components/postgresql/user_create_device.py @@ -9,8 +9,8 @@ VerifyKey, ) from parsec.ballpark import RequireGreaterTimestamp, TimestampOutOfBallpark -from parsec.components.events import EventBus from parsec.components.postgresql import AsyncpgConnection +from parsec.components.postgresql.events import send_signal from parsec.components.postgresql.queries import ( AuthAndLockCommonOnlyBadOutcome, AuthAndLockCommonOnlyData, @@ -71,7 +71,6 @@ async def user_create_device( - event_bus: EventBus, conn: AsyncpgConnection, now: DateTime, organization_id: OrganizationID, @@ -153,11 +152,12 @@ async def user_create_device( case unknown: assert False, unknown - await event_bus.send( + await send_signal( + conn, EventCommonCertificate( organization_id=organization_id, timestamp=certif.timestamp, - ) + ), ) return certif diff --git a/server/parsec/components/postgresql/user_create_user.py b/server/parsec/components/postgresql/user_create_user.py index 14fdc911944..1d1324757db 100644 --- a/server/parsec/components/postgresql/user_create_user.py +++ b/server/parsec/components/postgresql/user_create_user.py @@ -11,8 +11,8 @@ VerifyKey, ) from parsec.ballpark import RequireGreaterTimestamp, TimestampOutOfBallpark -from parsec.components.events import EventBus from parsec.components.postgresql import AsyncpgConnection +from parsec.components.postgresql.events import send_signal from parsec.components.postgresql.queries import ( AuthAndLockCommonOnlyBadOutcome, AuthAndLockCommonOnlyData, @@ -159,7 +159,6 @@ async def user_create_user( - event_bus: EventBus, conn: AsyncpgConnection, now: DateTime, organization_id: OrganizationID, @@ -278,8 +277,9 @@ async def user_create_user( case unknown: assert False, unknown - await event_bus.send( - EventCommonCertificate(organization_id=organization_id, timestamp=user_certif.timestamp) + await send_signal( + conn, + EventCommonCertificate(organization_id=organization_id, timestamp=user_certif.timestamp), ) return user_certif, device_certif diff --git a/server/parsec/components/postgresql/user_freeze_user.py b/server/parsec/components/postgresql/user_freeze_user.py index 09323e35d0e..30911728362 100644 --- a/server/parsec/components/postgresql/user_freeze_user.py +++ b/server/parsec/components/postgresql/user_freeze_user.py @@ -8,8 +8,8 @@ OrganizationID, UserID, ) -from parsec.components.events import EventBus from parsec.components.postgresql import AsyncpgConnection +from parsec.components.postgresql.events import send_signal from parsec.components.postgresql.utils import ( Q, ) @@ -105,7 +105,6 @@ async def user_freeze_user( - event_bus: EventBus, conn: AsyncpgConnection, organization_id: OrganizationID, user_id: UserID | None, @@ -173,18 +172,20 @@ async def user_freeze_user( ) if info.frozen: - await event_bus.send( + await send_signal( + conn, EventUserRevokedOrFrozen( organization_id=organization_id, user_id=info.user_id, - ) + ), ) else: - await event_bus.send( + await send_signal( + conn, EventUserUnfrozen( organization_id=organization_id, user_id=info.user_id, - ) + ), ) return info diff --git a/server/parsec/components/postgresql/user_revoke_user.py b/server/parsec/components/postgresql/user_revoke_user.py index b60123a8873..bd82d5a9366 100644 --- a/server/parsec/components/postgresql/user_revoke_user.py +++ b/server/parsec/components/postgresql/user_revoke_user.py @@ -10,8 +10,8 @@ VerifyKey, ) from parsec.ballpark import RequireGreaterTimestamp, TimestampOutOfBallpark -from parsec.components.events import EventBus from parsec.components.postgresql import AsyncpgConnection +from parsec.components.postgresql.events import send_signal from parsec.components.postgresql.queries import ( AuthAndLockCommonOnlyBadOutcome, AuthAndLockCommonOnlyData, @@ -124,7 +124,6 @@ async def user_revoke_user( - event_bus: EventBus, conn: AsyncpgConnection, now: DateTime, organization_id: OrganizationID, @@ -256,17 +255,19 @@ async def user_revoke_user( case unknown: assert False, unknown - await event_bus.send( + await send_signal( + conn, EventCommonCertificate( organization_id=organization_id, timestamp=certif.timestamp, - ) + ), ) - await event_bus.send( + await send_signal( + conn, EventUserRevokedOrFrozen( organization_id=organization_id, user_id=certif.user_id, - ) + ), ) return certif diff --git a/server/parsec/components/postgresql/user_update_user.py b/server/parsec/components/postgresql/user_update_user.py index 955a376d0c0..9bae55177e8 100644 --- a/server/parsec/components/postgresql/user_update_user.py +++ b/server/parsec/components/postgresql/user_update_user.py @@ -10,8 +10,8 @@ VerifyKey, ) from parsec.ballpark import RequireGreaterTimestamp, TimestampOutOfBallpark -from parsec.components.events import EventBus from parsec.components.postgresql import AsyncpgConnection +from parsec.components.postgresql.events import send_signal from parsec.components.postgresql.queries import ( AuthAndLockCommonOnlyBadOutcome, AuthAndLockCommonOnlyData, @@ -88,7 +88,6 @@ async def user_update_user( - event_bus: EventBus, conn: AsyncpgConnection, now: DateTime, organization_id: OrganizationID, @@ -219,18 +218,20 @@ async def user_update_user( case unknown: assert False, unknown - await event_bus.send( + await send_signal( + conn, EventCommonCertificate( organization_id=organization_id, timestamp=certif.timestamp, - ) + ), ) - await event_bus.send( + await send_signal( + conn, EventUserUpdated( organization_id=organization_id, user_id=certif.user_id, new_profile=certif.new_profile, - ) + ), ) return certif diff --git a/server/parsec/components/postgresql/vlob.py b/server/parsec/components/postgresql/vlob.py index d3f6e723248..07b3d1a8da0 100644 --- a/server/parsec/components/postgresql/vlob.py +++ b/server/parsec/components/postgresql/vlob.py @@ -13,7 +13,6 @@ RequireGreaterTimestamp, TimestampOutOfBallpark, ) -from parsec.components.events import EventBus from parsec.components.postgresql import AsyncpgConnection, AsyncpgPool from parsec.components.postgresql.utils import ( Q, @@ -58,12 +57,10 @@ class PGVlobComponent(BaseVlobComponent): def __init__( self, pool: AsyncpgPool, - event_bus: EventBus, webhooks: WebhooksComponent, ): super().__init__(webhooks) self.pool = pool - self.event_bus = event_bus async def _get_vlob_info( self, conn: AsyncpgConnection, organization_id: OrganizationID, vlob_id: VlobID @@ -102,7 +99,6 @@ async def create( | SequesterServiceUnavailable ): return await vlob_create( - self.event_bus, conn, now, organization_id, @@ -139,7 +135,6 @@ async def update( | SequesterServiceUnavailable ): return await vlob_update( - self.event_bus, conn, now, organization_id, diff --git a/server/parsec/components/postgresql/vlob_create.py b/server/parsec/components/postgresql/vlob_create.py index 62e5a45f13e..3a320b3e4a4 100644 --- a/server/parsec/components/postgresql/vlob_create.py +++ b/server/parsec/components/postgresql/vlob_create.py @@ -12,8 +12,8 @@ TimestampOutOfBallpark, timestamps_in_the_ballpark, ) -from parsec.components.events import EventBus from parsec.components.postgresql import AsyncpgConnection +from parsec.components.postgresql.events import send_signal from parsec.components.postgresql.utils import ( Q, RetryNeeded, @@ -178,7 +178,6 @@ async def vlob_create( - event_bus: EventBus, conn: AsyncpgConnection, now: DateTime, organization_id: OrganizationID, @@ -366,4 +365,4 @@ async def vlob_create( last_common_certificate_timestamp=last_common_certificate_timestamp, last_realm_certificate_timestamp=last_realm_certificate_timestamp, ) - await event_bus.send(event) + await send_signal(conn, event) diff --git a/server/parsec/components/postgresql/vlob_update.py b/server/parsec/components/postgresql/vlob_update.py index 85960c18256..46bdd0c0c97 100644 --- a/server/parsec/components/postgresql/vlob_update.py +++ b/server/parsec/components/postgresql/vlob_update.py @@ -12,8 +12,8 @@ TimestampOutOfBallpark, timestamps_in_the_ballpark, ) -from parsec.components.events import EventBus from parsec.components.postgresql import AsyncpgConnection +from parsec.components.postgresql.events import send_signal from parsec.components.postgresql.utils import ( Q, RetryNeeded, @@ -128,7 +128,6 @@ async def vlob_update( - event_bus: EventBus, conn: AsyncpgConnection, now: DateTime, organization_id: OrganizationID, @@ -320,7 +319,8 @@ async def vlob_update( case unknown: assert False, repr(unknown) - await event_bus.send( + await send_signal( + conn, EventVlob( organization_id=organization_id, author=author, @@ -331,5 +331,5 @@ async def vlob_update( blob=blob if len(blob) < EVENT_VLOB_MAX_BLOB_SIZE else None, last_common_certificate_timestamp=last_common_certificate_timestamp, last_realm_certificate_timestamp=last_realm_certificate_timestamp, - ) + ), ) diff --git a/server/tests/api_v5/authenticated/test_events_listen.py b/server/tests/api_v5/authenticated/test_events_listen.py index cf53aa7b331..668d1c9d410 100644 --- a/server/tests/api_v5/authenticated/test_events_listen.py +++ b/server/tests/api_v5/authenticated/test_events_listen.py @@ -163,7 +163,7 @@ async def test_authenticated_events_listen_ok( else: event = gen_event(organization_id=coolorg.organization_id) - await backend.event_bus.send(event) + await backend.event_bus.test_send(event) event = await alice_sse.next_event() assert event == authenticated_cmds.latest.events_listen.RepOk(expected) @@ -292,7 +292,7 @@ async def test_self_vlob_events_skipped( ) async def send_vlob_event(author: DeviceID, vlob_id: VlobID): - await backend.event_bus.send( + await backend.event_bus.test_send( events.EventVlob( organization_id=coolorg.organization_id, author=author, @@ -338,17 +338,17 @@ async def test_self_certificate_events_provided( ) ) - await backend.event_bus.send( + await backend.event_bus.test_send( events.EventCommonCertificate( organization_id=minimalorg.organization_id, timestamp=TIMESTAMP ) ) - await backend.event_bus.send( + await backend.event_bus.test_send( events.EventSequesterCertificate( organization_id=minimalorg.organization_id, timestamp=TIMESTAMP ) ) - await backend.event_bus.send( + await backend.event_bus.test_send( events.EventShamirRecoveryCertificate( organization_id=minimalorg.organization_id, timestamp=TIMESTAMP, @@ -360,7 +360,7 @@ async def test_self_certificate_events_provided( ), ) ) - await backend.event_bus.send( + await backend.event_bus.test_send( events.EventRealmCertificate( organization_id=minimalorg.organization_id, timestamp=TIMESTAMP, @@ -409,7 +409,7 @@ async def test_receive_event_of_newly_shared_realm( ) async def send_vlob_event(org: OrganizationID, version: int): - await backend.event_bus.send( + await backend.event_bus.test_send( events.EventVlob( organization_id=org, author=OTHER_DEVICE_ID, @@ -429,7 +429,7 @@ async def send_vlob_event(org: OrganizationID, version: int): for org in orgs: # 1) Share for other user, should be ignored - await backend.event_bus.send( + await backend.event_bus.test_send( events.EventRealmCertificate( organization_id=org, timestamp=TIMESTAMP, @@ -442,7 +442,7 @@ async def send_vlob_event(org: OrganizationID, version: int): # 2) Share for Alice, now events should be received - await backend.event_bus.send( + await backend.event_bus.test_send( events.EventRealmCertificate( organization_id=org, timestamp=TIMESTAMP, @@ -455,7 +455,7 @@ async def send_vlob_event(org: OrganizationID, version: int): # 3) Unshare for other user, events should still be received - await backend.event_bus.send( + await backend.event_bus.test_send( events.EventRealmCertificate( organization_id=org, timestamp=TIMESTAMP, @@ -468,7 +468,7 @@ async def send_vlob_event(org: OrganizationID, version: int): # 4) Unshare for Alice, now events should no longer be received - await backend.event_bus.send( + await backend.event_bus.test_send( events.EventRealmCertificate( organization_id=org, timestamp=TIMESTAMP, @@ -482,7 +482,7 @@ async def send_vlob_event(org: OrganizationID, version: int): # 5) Event always received for Alice, once we have received this event # we know the previous one has been ignored as expected - await backend.event_bus.send( + await backend.event_bus.test_send( events.EventCommonCertificate( organization_id=org, timestamp=TIMESTAMP, diff --git a/server/tests/test_cross_server_event.py b/server/tests/test_cross_server_event.py index bdbe6012895..22ab54aa2fd 100644 --- a/server/tests/test_cross_server_event.py +++ b/server/tests/test_cross_server_event.py @@ -24,6 +24,6 @@ def on_b2_receive_event(event): event = EventPinged(organization_id=OrganizationID("Org"), ping="hello") b2.event_bus.connect(on_b2_receive_event) - await b1.event_bus.send(event) + await b1.event_bus.test_send(event) b2_event = await b2_received_events.get() assert b2_event == event diff --git a/server/tests/test_sse.py b/server/tests/test_sse.py index 650b84eb517..746bbad3da2 100644 --- a/server/tests/test_sse.py +++ b/server/tests/test_sse.py @@ -114,7 +114,7 @@ async def test_close_on_backpressure(minimalorg: MinimalorgRpcClients, backend: mock_send_nowait = MagicMock(side_effect=anyio.WouldBlock) registered_clients[reg_client_id].channel_sender.send_nowait = mock_send_nowait - await backend.event_bus.send( + await backend.event_bus.test_send( EventPinged( organization_id=minimalorg.organization_id, ping="foo", @@ -242,7 +242,7 @@ async def test_keep_alive_real_server(minimalorg: MinimalorgRpcClients, app: Asg async def test_close_on_user_revoked(coolorg: CoolorgRpcClients, backend: Backend) -> None: async def send_ping(ping: str) -> None: - await backend.event_bus.send( + await backend.event_bus.test_send( EventPinged( organization_id=coolorg.organization_id, ping=ping, @@ -295,7 +295,7 @@ async def test_close_on_organization_tos_updated( coolorg: CoolorgRpcClients, backend: Backend ) -> None: async def send_ping(ping: str) -> None: - await backend.event_bus.send( + await backend.event_bus.test_send( EventPinged( organization_id=coolorg.organization_id, ping=ping,