Skip to content

Commit

Permalink
[Core] Refactor Alertmanager to retry unhandled alerts
Browse files Browse the repository at this point in the history
We are currently creating a copy of each alert to avoid segfaults when
the next pop_alerts invalidates the lt alert objects we are holding
for handler callbacks.

However, these alert copies are not deep enough, since we also need to
resolve the alert object's methods (e.g. message()); therefore it is
still possible to end up with lt segfaults.

The workaround is to check for any handlers that have not yet been called,
give them some more time, and eventually discard them if still not handled.

Ref: arvidn/libtorrent#6437
  • Loading branch information
cas-- authored and doadin committed Sep 21, 2024
1 parent aae4348 commit 14e83a7
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 58 deletions.
64 changes: 40 additions & 24 deletions deluge/core/alertmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import logging
import threading
from collections import defaultdict
from types import SimpleNamespace
from typing import Any, Callable

from twisted.internet import reactor, threads
Expand Down Expand Up @@ -57,11 +56,13 @@ def __init__(self):

# handlers is a dictionary of lists {"alert_type": [handler1,h2,..]}
self.handlers = defaultdict(list)
self.handlers_retry_timeout = 0.3
self.handlers_retry_count = 6
self.delayed_calls = []
self._event = threading.Event()

def update(self):
self.delayed_calls = [dc for dc in self.delayed_calls if dc.active()]
pass

def start(self):
thread = threading.Thread(
Expand All @@ -71,10 +72,7 @@ def start(self):
self._event.set()

def stop(self):
for delayed_call in self.delayed_calls:
if delayed_call.active():
delayed_call.cancel()
self.delayed_calls = []
self.cancel_delayed_calls()

def pause(self):
self._event.clear()
Expand All @@ -89,9 +87,41 @@ def wait_for_alert_in_thread(self):
if self._event.wait():
threads.blockingCallFromThread(reactor, self.maybe_handle_alerts)

def maybe_handle_alerts(self):
if self._component_state == 'Started':
self.handle_alerts()
def cancel_delayed_calls(self):
    """Cancel every delayed handler call that is still active, then forget them all."""
    # The generator is lazy, so each call is checked with active() right
    # before it is cancelled, one call at a time.
    still_pending = (call for call in self.delayed_calls if call.active())
    for call in still_pending:
        call.cancel()
    self.delayed_calls = []

def check_delayed_calls(self, retries: int = 0) -> bool:
    """Report whether delayed handler calls are still pending (up to the retry limit).

    Calls that are no longer active are pruned from ``self.delayed_calls``
    first. If active calls remain but ``retries`` has gone past
    ``self.handlers_retry_count``, they are logged, cancelled and no longer
    waited for.

    Args:
        retries: Number of retries already spent waiting on the pending calls.

    Returns:
        True if active delayed calls remain and the retry limit has not been
        exceeded, otherwise False.
    """
    still_active = [call for call in self.delayed_calls if call.active()]
    self.delayed_calls = still_active

    if not still_active:
        return False

    if retries <= self.handlers_retry_count:
        return True

    # Retry budget exhausted: give up on whatever is still outstanding.
    log.warning(
        'Alert handlers timeout reached, cancelling: %s', self.delayed_calls
    )
    self.cancel_delayed_calls()
    return False

def maybe_handle_alerts(self, retries: int = 0) -> None:
    """Handle alerts if the component is started and no handler calls are pending.

    While ``check_delayed_calls`` reports pending handler calls, this method
    reschedules itself after ``self.handlers_retry_timeout`` with the retry
    counter incremented, instead of handling new alerts.

    Args:
        retries: Number of retries already spent waiting on delayed calls.
    """
    if self._component_state == 'Started':
        if not self.check_delayed_calls(retries):
            self.handle_alerts()
        else:
            log.debug('Waiting for delayed alerts: %s', self.delayed_calls)
            reactor.callLater(
                self.handlers_retry_timeout, self.maybe_handle_alerts, retries + 1
            )

def register_handler(self, alert_type: str, handler: Callable[[Any], None]) -> None:
"""
Expand Down Expand Up @@ -153,21 +183,7 @@ def handle_alerts(self):
if log.isEnabledFor(logging.DEBUG):
log.debug('Handling alert: %s', alert_type)

alert_copy = self.create_alert_copy(alert)
self.delayed_calls.append(reactor.callLater(0, handler, alert_copy))

@staticmethod
def create_alert_copy(alert):
"""Create a Python copy of libtorrent alert
Avoid segfault if an alert is handled after next pop_alert call"""
return SimpleNamespace(
**{
attr: getattr(alert, attr)
for attr in dir(alert)
if not attr.startswith('__')
}
)
self.delayed_calls.append(reactor.callLater(0, handler, alert))

def set_alert_queue_size(self, queue_size):
"""Sets the maximum size of the libtorrent alert queue"""
Expand Down
83 changes: 49 additions & 34 deletions deluge/tests/test_alertmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,19 @@
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#
from types import SimpleNamespace
from dataclasses import InitVar, dataclass, field

import pytest_twisted
import pytest

import deluge.component as component
from deluge.conftest import BaseTestCase
from deluge.core.core import Core


class DummyAlert1:
def __init__(self):
self.message = '1'


class DummyAlert2:
def __init__(self):
self.message = '2'


class SessionMock:
class LtSessionMock:
def __init__(self):
self.alerts = []

def set_alerts(self):
self.alerts = [DummyAlert1(), DummyAlert2()]
def push_alerts(self, alerts):
self.alerts = alerts

def wait_for_alert(self, timeout):
return self.alerts[0] if len(self.alerts) > 0 else None
Expand All @@ -38,16 +26,38 @@ def pop_alerts(self):
return alerts


class TestAlertManager(BaseTestCase):
def set_up(self):
@dataclass
class LtAlertMock:
    """Minimal stand-in for a libtorrent alert object used by the tests.

    The text passed as ``message`` is returned by the ``message()`` method,
    and ``name`` is returned by ``what()``.

    Args:
        type: Numeric alert type.
        name: Alert name, returned by ``what()``.
        message: Alert text, returned by ``message()``.
    """

    type: int
    name: str
    # ``message`` is init-only: keeping it as a regular field would make the
    # instance attribute shadow the ``message()`` method below, so calling
    # ``alert.message()`` would fail with "'str' object is not callable".
    # Because the method shares this name, the dataclass machinery treats it
    # as the parameter's default; always pass ``message`` explicitly.
    message: InitVar[str]
    # Holds the alert text; a real field so it still takes part in eq/repr.
    _message: str = field(init=False)

    def __post_init__(self, message):
        """Store the init-only ``message`` text for ``message()`` to return."""
        self._message = message

    def message(self):
        """Return the alert text."""
        return self._message

    def what(self):
        """Return the alert name."""
        return self.name


@pytest.fixture
def mock_alert1():
    """Provide a mock alert of type 1 named ``mock_alert1``."""
    alert = LtAlertMock(type=1, name='mock_alert1', message='Alert 1')
    return alert


@pytest.fixture
def mock_alert2():
    """Provide a mock alert of type 2 named ``mock_alert2``."""
    alert = LtAlertMock(type=2, name='mock_alert2', message='Alert 2')
    return alert


class TestAlertManager:
@pytest.fixture(autouse=True)
def set_up(self, component):
self.core = Core()
self.core.config.config['lsd'] = False
self.am = component.get('AlertManager')
self.am.session = SessionMock()
return component.start(['AlertManager'])
self.am.session = LtSessionMock()

def tear_down(self):
return component.shutdown()
component.start(['AlertManager'])

def test_register_handler(self):
def handler(alert):
Expand All @@ -58,22 +68,27 @@ def handler(alert):
assert self.am.handlers['dummy1'] == [handler]
assert self.am.handlers['dummy2'] == [handler]

@pytest_twisted.ensureDeferred
async def test_pop_alert(self, mock_callback):
mock_callback.reset_mock()
self.am.register_handler('DummyAlert1', mock_callback)
self.am.session.set_alerts()
async def test_pop_alert(self, mock_callback, mock_alert1, mock_alert2):
self.am.register_handler('mock_alert1', mock_callback)

self.am.session.push_alerts([mock_alert1, mock_alert2])

await mock_callback.deferred
mock_callback.assert_called_once_with(SimpleNamespace(message='1'))

@pytest_twisted.ensureDeferred
async def test_pause_not_pop_alert(self, mock_callback):
mock_callback.assert_called_once_with(mock_alert1)

async def test_pause_not_pop_alert(
self, component, mock_alert1, mock_alert2, mock_callback
):
await component.pause(['AlertManager'])
self.am.register_handler('DummyAlert1', mock_callback)
self.am.session.set_alerts()

self.am.register_handler('mock_alert1', mock_callback)
self.am.session.push_alerts([mock_alert1, mock_alert2])

await mock_callback.deferred

mock_callback.assert_not_called()
assert not self.am._event.isSet()
assert not self.am._event.is_set()
assert len(self.am.session.alerts) == 2

def test_deregister_handler(self):
Expand Down

0 comments on commit 14e83a7

Please sign in to comment.