From 37c3bccc367d5b20b7774bb2f9b6454cc3079dbb Mon Sep 17 00:00:00 2001 From: Sergei Kliuikov Date: Mon, 30 Dec 2024 21:45:53 -0800 Subject: [PATCH] Release 5.12.2 ### Changelog: - Fix(backend): Centrifugo freezing in sync contexts. --- test_src/test_proj/test_notificator.py | 10 +++- vstutils/__init__.py | 2 +- vstutils/models/cent_notify.py | 66 ++++++++++++++++++-------- vstutils/models/cent_notify.pyi | 8 ++-- 4 files changed, 61 insertions(+), 25 deletions(-) diff --git a/test_src/test_proj/test_notificator.py b/test_src/test_proj/test_notificator.py index 7c8065f1..df4d3157 100644 --- a/test_src/test_proj/test_notificator.py +++ b/test_src/test_proj/test_notificator.py @@ -1,17 +1,23 @@ -from cent import AsyncClient as CentrifugoClient, CentRequest +from cent import AsyncClient as AsyncCentrifugoClient, Client as SyncCentrifugoClient, CentRequest from vstutils.models.cent_notify import Notificator messages_log: list[CentRequest] = [] -class Client(CentrifugoClient): +class Client(AsyncCentrifugoClient): async def _send(self, request: CentRequest, *args, **kwargs): messages_log.append(request) +class SyncClient(SyncCentrifugoClient): + def _send(self, request: CentRequest, *args, **kwargs): + messages_log.append(request) + + class DummyNotificator(Notificator): client_class = Client + sync_client_class = SyncClient def is_usable(self): return True diff --git a/vstutils/__init__.py b/vstutils/__init__.py index 72698ac0..543e7966 100644 --- a/vstutils/__init__.py +++ b/vstutils/__init__.py @@ -1,2 +1,2 @@ # pylint: disable=django-not-available -__version__: str = '5.12.1' +__version__: str = '5.12.2' diff --git a/vstutils/models/cent_notify.py b/vstutils/models/cent_notify.py index 4c2a8b01..3f5bcfe4 100644 --- a/vstutils/models/cent_notify.py +++ b/vstutils/models/cent_notify.py @@ -8,7 +8,7 @@ from django.db.models import signals from django.conf import settings from django.contrib.auth import get_user_model -from cent import AsyncClient as CentrifugoClient, BatchRequest, PublishRequest +from cent import AsyncClient, Client, BatchRequest, PublishRequest from .base import get_proxy_labels from ..utils import raise_context_decorator_with_default @@ -20,7 +20,8 @@ class Notificator: __slots__ = ('queue', 'cent_client', 'label', '_signals', '__weakref__') - client_class = CentrifugoClient + client_class = AsyncClient + sync_client_class = Client _json_renderer = ORJSONRenderer() def __init__(self, queue=None, client=None, label=None, autoconnect=True): @@ -53,14 +54,16 @@ def get_openapi_secret(self): return settings.CENTRIFUGO_CLIENT_KWARGS.get('token_hmac_secret_key', '') @raise_context_decorator_with_default(verbose=False) - def get_client(self): + def get_client(self, asynchronous=True): centrifugo_client_kwargs = { 'api_url': settings.CENTRIFUGO_CLIENT_KWARGS['address'], 'api_key': settings.CENTRIFUGO_CLIENT_KWARGS['api_key'], 'timeout': settings.CENTRIFUGO_CLIENT_KWARGS.get('timeout'), } logger.debug(f"Getting Centrifugo client with kwargs: {centrifugo_client_kwargs}") - return self.client_class(**centrifugo_client_kwargs) + if asynchronous: + return self.client_class(**centrifugo_client_kwargs) + return self.sync_client_class(**centrifugo_client_kwargs) def create_notification_from_instance(self, instance): # pylint: disable=invalid-name if not self.is_usable(): @@ -83,33 +86,58 @@ def create_notification(self, labels, data): def clear_messages(self): self.queue.clear() + def _prep_messages(self, objects, provided_label): + publish_requests: list[PublishRequest] = [] + sent_channels = set() + + for obj_labels, data in objects: + with contextlib.suppress(Exception): + for obj_label in obj_labels: + channel = self.get_subscription_channel(provided_label or obj_label) + publish_requests.append(PublishRequest( + channel=channel, + data=data, + )) + sent_channels.add(channel) + + return publish_requests, sent_channels + def send(self): with contextlib.suppress(Exception): - return async_to_sync(self.asend)() + self.queue, objects = [], tuple(self.queue) + + if objects: + cent_client = self.get_client(asynchronous=False) + if not cent_client: + self.queue.extend(objects) + async_to_sync(self.asend)() + return + + publish_requests: list[PublishRequest] + sent_channels: set[str] + + publish_requests, sent_channels = self._prep_messages(objects, self.label) + + if publish_requests: + logger.debug(f'Send notifications about {len(objects)} updates to channel(s) {sent_channels}.') + return cent_client.batch( + BatchRequest(requests=publish_requests), + ) async def asend(self): try: self.queue, objects = [], tuple(self.queue) - sent_channels = set() - provided_label = self.label - if objects and self.cent_client is None: self.cent_client = self.get_client() if not self.cent_client: return - publish_requests: list[PublishRequest] = [] - - for obj_labels, data in objects: - with contextlib.suppress(Exception): - for obj_label in obj_labels: - channel = self.get_subscription_channel(provided_label or obj_label) - publish_requests.append(PublishRequest( - channel=channel, - data=data, - )) - sent_channels.add(channel) + publish_requests: list[PublishRequest] + sent_channels: set[str] + + publish_requests, sent_channels = self._prep_messages(objects, self.label) + if publish_requests: logger.debug(f'Send notifications about {len(objects)} updates to channel(s) {sent_channels}.') return await self.cent_client.batch( diff --git a/vstutils/models/cent_notify.pyi b/vstutils/models/cent_notify.pyi index 5c1516b4..58c33035 100644 --- a/vstutils/models/cent_notify.pyi +++ b/vstutils/models/cent_notify.pyi @@ -6,13 +6,14 @@ from ..utils import raise_context_decorator_with_default as raise_context_decora from .base import get_proxy_labels as get_proxy_labels from .model import BaseModel as BaseModel from django.db.models import signals, Model -from cent import AsyncClient +from cent import AsyncClient, Client logger: logging.Logger class Notificator: client_class = AsyncClient + sync_client_class = Client queue: _t.List[_t.Tuple[_t.Sequence[_t.Text], _t.Any]] cent_client: AsyncClient label: _t.Text @@ -21,16 +22,17 @@ class Notificator: def __init__( self, queue: _t.Optional[_t.List[_t.Tuple[_t.Sequence[_t.Text], _t.Any]]] = ..., - client: _t.Optional[AsyncClient] = ..., + client: _t.Optional[AsyncClient|Client] = ..., label: _t.Optional[_t.Text] = ..., autoconnect: bool = ... ) -> None: ... + def _prep_messages(self, list, str) -> _t.Tuple[list, set]: ... def is_usable(self) -> bool: ... def connect_signal(self, signal: signals.ModelSignal): ... def disconnect_signal(self, signal: signals.ModelSignal): ... def signal_handler(self, instance: Model, *args, **kwargs) -> None: ... def get_openapi_secret(self) -> _t.Text: ... - def get_client(self) -> AsyncClient: ... + def get_client(self, asynchronous: bool = True) -> AsyncClient: ... def create_notification_from_instance(self, instance) -> None: ... def create_notification(self, labels, data) -> None: ... def clear_messages(self) -> None: ...