Skip to content

Commit

Permalink
Release 5.12.2
Browse files Browse the repository at this point in the history
### Changelog:
- Fix(backend): Centrifugo freezing in sync contexts.
  • Loading branch information
onegreyonewhite committed Dec 31, 2024
1 parent f177cfb commit 37c3bcc
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 25 deletions.
10 changes: 8 additions & 2 deletions test_src/test_proj/test_notificator.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
from cent import AsyncClient as CentrifugoClient, CentRequest
from cent import AsyncClient as AsyncCentrifugoClient, Client as SyncCentrifugoClient, CentRequest
from vstutils.models.cent_notify import Notificator


messages_log: list[CentRequest] = []


class Client(CentrifugoClient):
class Client(AsyncCentrifugoClient):
async def _send(self, request: CentRequest, *args, **kwargs):
messages_log.append(request)


class SyncClient(SyncCentrifugoClient):
def _send(self, request: CentRequest, *args, **kwargs):
messages_log.append(request)


class DummyNotificator(Notificator):
client_class = Client
sync_client_class = SyncClient

def is_usable(self):
return True
2 changes: 1 addition & 1 deletion vstutils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# pylint: disable=django-not-available
__version__: str = '5.12.1'
__version__: str = '5.12.2'
66 changes: 47 additions & 19 deletions vstutils/models/cent_notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from django.db.models import signals
from django.conf import settings
from django.contrib.auth import get_user_model
from cent import AsyncClient as CentrifugoClient, BatchRequest, PublishRequest
from cent import AsyncClient, Client, BatchRequest, PublishRequest

from .base import get_proxy_labels
from ..utils import raise_context_decorator_with_default
Expand All @@ -20,7 +20,8 @@

class Notificator:
__slots__ = ('queue', 'cent_client', 'label', '_signals', '__weakref__')
client_class = CentrifugoClient
client_class = AsyncClient
sync_client_class = Client
_json_renderer = ORJSONRenderer()

def __init__(self, queue=None, client=None, label=None, autoconnect=True):
Expand Down Expand Up @@ -53,14 +54,16 @@ def get_openapi_secret(self):
return settings.CENTRIFUGO_CLIENT_KWARGS.get('token_hmac_secret_key', '')

@raise_context_decorator_with_default(verbose=False)
def get_client(self):
def get_client(self, asynchronous=True):
centrifugo_client_kwargs = {
'api_url': settings.CENTRIFUGO_CLIENT_KWARGS['address'],
'api_key': settings.CENTRIFUGO_CLIENT_KWARGS['api_key'],
'timeout': settings.CENTRIFUGO_CLIENT_KWARGS.get('timeout'),
}
logger.debug(f"Getting Centrifugo client with kwargs: {centrifugo_client_kwargs}")
return self.client_class(**centrifugo_client_kwargs)
if asynchronous:
return self.client_class(**centrifugo_client_kwargs)
return self.sync_client_class(**centrifugo_client_kwargs)

def create_notification_from_instance(self, instance): # pylint: disable=invalid-name
if not self.is_usable():
Expand All @@ -83,33 +86,58 @@ def create_notification(self, labels, data):
def clear_messages(self):
self.queue.clear()

def _prep_messages(self, objects, provided_label):
publish_requests: list[PublishRequest] = []
sent_channels = set()

for obj_labels, data in objects:
with contextlib.suppress(Exception):
for obj_label in obj_labels:
channel = self.get_subscription_channel(provided_label or obj_label)
publish_requests.append(PublishRequest(
channel=channel,
data=data,
))
sent_channels.add(channel)

return publish_requests, sent_channels

def send(self):
with contextlib.suppress(Exception):
return async_to_sync(self.asend)()
self.queue, objects = [], tuple(self.queue)

if objects:
cent_client = self.get_client(asynchronous=False)
if not cent_client:
self.queue.extend(objects)
async_to_sync(self.asend)()
return

publish_requests: list[PublishRequest]
sent_channels: set[str]

publish_requests, sent_channels = self._prep_messages(objects, self.label)

if publish_requests:
logger.debug(f'Send notifications about {len(objects)} updates to channel(s) {sent_channels}.')
return cent_client.batch(
BatchRequest(requests=publish_requests),
)

async def asend(self):
try:
self.queue, objects = [], tuple(self.queue)

sent_channels = set()
provided_label = self.label

if objects and self.cent_client is None:
self.cent_client = self.get_client()
if not self.cent_client:
return

publish_requests: list[PublishRequest] = []

for obj_labels, data in objects:
with contextlib.suppress(Exception):
for obj_label in obj_labels:
channel = self.get_subscription_channel(provided_label or obj_label)
publish_requests.append(PublishRequest(
channel=channel,
data=data,
))
sent_channels.add(channel)
publish_requests: list[PublishRequest]
sent_channels: set[str]

publish_requests, sent_channels = self._prep_messages(objects, self.label)

if publish_requests:
logger.debug(f'Send notifications about {len(objects)} updates to channel(s) {sent_channels}.')
return await self.cent_client.batch(
Expand Down
8 changes: 5 additions & 3 deletions vstutils/models/cent_notify.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ from ..utils import raise_context_decorator_with_default as raise_context_decora
from .base import get_proxy_labels as get_proxy_labels
from .model import BaseModel as BaseModel
from django.db.models import signals, Model
from cent import AsyncClient
from cent import AsyncClient, Client

logger: logging.Logger


class Notificator:
client_class = AsyncClient
sync_client_class = Client
queue: _t.List[_t.Tuple[_t.Sequence[_t.Text], _t.Any]]
cent_client: AsyncClient
label: _t.Text
Expand All @@ -21,16 +22,17 @@ class Notificator:
def __init__(
self,
queue: _t.Optional[_t.List[_t.Tuple[_t.Sequence[_t.Text], _t.Any]]] = ...,
client: _t.Optional[AsyncClient] = ...,
client: _t.Optional[AsyncClient|Client] = ...,
label: _t.Optional[_t.Text] = ...,
autoconnect: bool = ...
) -> None: ...
def _prep_messages(self, list, str) -> _t.Tuple[list, set]: ...
def is_usable(self) -> bool: ...
def connect_signal(self, signal: signals.ModelSignal): ...
def disconnect_signal(self, signal: signals.ModelSignal): ...
def signal_handler(self, instance: Model, *args, **kwargs) -> None: ...
def get_openapi_secret(self) -> _t.Text: ...
def get_client(self) -> AsyncClient: ...
def get_client(self, asynchronous: bool = True) -> AsyncClient: ...
def create_notification_from_instance(self, instance) -> None: ...
def create_notification(self, labels, data) -> None: ...
def clear_messages(self) -> None: ...
Expand Down

0 comments on commit 37c3bcc

Please sign in to comment.