Skip to content

Commit

Permalink
[Core] Call wait_for_alert in a thread
Browse files Browse the repository at this point in the history
This spawns a thread in alertmanager to call wait_for_alert in a thread.
This reduces latency to deluge responding to events.
And removes all `hasattr` checks in Component

Closes: deluge-torrent#221
  • Loading branch information
DjLegolas authored and doadin committed Sep 21, 2024
1 parent fc84b3f commit aae4348
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 10 deletions.
6 changes: 3 additions & 3 deletions deluge/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ class Component:
paused by instructing the :class:`ComponentRegistry` to pause
this Component.
**pause()** - This method is called when the component is being paused.
**pause()** - This method is called when the component is being paused.
**resume()** - This method is called when the component resumes from a Paused
**resume()** - This method is called when the component resumes from a Paused
state.
**shutdown()** - This method is called when the client is exiting. If the
Expand All @@ -85,7 +85,7 @@ class Component:
**Stopped** - The Component has either been stopped or has yet to be started.
**Stopping** - The Component has had it's stop method called, but it hasn't
**Stopping** - The Component has had its stop method called, but it hasn't
fully stopped yet.
**Paused** - The Component has had its update timer stopped, but will
Expand Down
10 changes: 6 additions & 4 deletions deluge/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ def mock_callback():
The returned Mock instance will have a `deferred` attribute which will complete when the callback has been called.
"""

def reset():
def reset(timeout=0.5, *args, **kwargs):
if mock.called:
original_reset_mock()
deferred = Deferred()
deferred.addTimeout(0.5, reactor)
original_reset_mock(*args, **kwargs)
if mock.deferred:
mock.deferred.cancel()
deferred = Deferred(canceller=lambda x: deferred.callback(None))
deferred.addTimeout(timeout, reactor)
mock.side_effect = lambda *args, **kw: deferred.callback((args, kw))
mock.deferred = deferred

Expand Down
31 changes: 28 additions & 3 deletions deluge/core/alertmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@
"""
import contextlib
import logging
import threading
from collections import defaultdict
from types import SimpleNamespace
from typing import Any, Callable

from twisted.internet import reactor
from twisted.internet import reactor, threads

import deluge.component as component
from deluge._libtorrent import lt
Expand All @@ -34,7 +35,7 @@ class AlertManager(component.Component):

def __init__(self):
log.debug('AlertManager init...')
component.Component.__init__(self, 'AlertManager', interval=0.3)
component.Component.__init__(self, 'AlertManager')
self.session = component.get('Core').session

# Increase the alert queue size so that alerts don't get lost.
Expand All @@ -57,17 +58,41 @@ def __init__(self):
# handlers is a dictionary of lists {"alert_type": [handler1,h2,..]}
self.handlers = defaultdict(list)
self.delayed_calls = []
self._event = threading.Event()

def update(self):
self.delayed_calls = [dc for dc in self.delayed_calls if dc.active()]
self.handle_alerts()

def start(self):
thread = threading.Thread(
target=self.wait_for_alert_in_thread, name='alert-poller', daemon=True
)
thread.start()
self._event.set()

def stop(self):
for delayed_call in self.delayed_calls:
if delayed_call.active():
delayed_call.cancel()
self.delayed_calls = []

def pause(self):
self._event.clear()

def resume(self):
self._event.set()

def wait_for_alert_in_thread(self):
while self._component_state not in ('Stopping', 'Stopped'):
if self.session.wait_for_alert(1000) is None:
continue
if self._event.wait():
threads.blockingCallFromThread(reactor, self.maybe_handle_alerts)

def maybe_handle_alerts(self):
if self._component_state == 'Started':
self.handle_alerts()

def register_handler(self, alert_type: str, handler: Callable[[Any], None]) -> None:
"""
Registers a function that will be called when 'alert_type' is pop'd
Expand Down
48 changes: 48 additions & 0 deletions deluge/tests/test_alertmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,47 @@
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#
from types import SimpleNamespace

import pytest_twisted

import deluge.component as component
from deluge.conftest import BaseTestCase
from deluge.core.core import Core


class DummyAlert1:
def __init__(self):
self.message = '1'


class DummyAlert2:
def __init__(self):
self.message = '2'


class SessionMock:
def __init__(self):
self.alerts = []

def set_alerts(self):
self.alerts = [DummyAlert1(), DummyAlert2()]

def wait_for_alert(self, timeout):
return self.alerts[0] if len(self.alerts) > 0 else None

def pop_alerts(self):
alerts = self.alerts
self.alerts = []
return alerts


class TestAlertManager(BaseTestCase):
def set_up(self):
self.core = Core()
self.core.config.config['lsd'] = False
self.am = component.get('AlertManager')
self.am.session = SessionMock()
return component.start(['AlertManager'])

def tear_down(self):
Expand All @@ -28,6 +58,24 @@ def handler(alert):
assert self.am.handlers['dummy1'] == [handler]
assert self.am.handlers['dummy2'] == [handler]

@pytest_twisted.ensureDeferred
async def test_pop_alert(self, mock_callback):
mock_callback.reset_mock()
self.am.register_handler('DummyAlert1', mock_callback)
self.am.session.set_alerts()
await mock_callback.deferred
mock_callback.assert_called_once_with(SimpleNamespace(message='1'))

@pytest_twisted.ensureDeferred
async def test_pause_not_pop_alert(self, mock_callback):
await component.pause(['AlertManager'])
self.am.register_handler('DummyAlert1', mock_callback)
self.am.session.set_alerts()
await mock_callback.deferred
mock_callback.assert_not_called()
assert not self.am._event.isSet()
assert len(self.am.session.alerts) == 2

def test_deregister_handler(self):
def handler(alert):
...
Expand Down

0 comments on commit aae4348

Please sign in to comment.