Skip to content

Commit

Permalink
Run matrix client / room listener callbacks in separate greenlets
Browse files Browse the repository at this point in the history
Also add workaround for matrix-org/matrix-python-sdk#193
  • Loading branch information
ulope committed May 10, 2018
1 parent 357bdb4 commit 891176a
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 10 deletions.
Empty file.
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
from matrix_client.client import MatrixClient
from matrix_client.client import MatrixClient, CACHE
from matrix_client.user import User
from matrix_client.room import Room
from matrix_client.errors import MatrixRequestError

import gevent
import logging
from typing import List

from raiden.network.matrix.room import Room
from raiden.network.matrix.utils import _geventify_callback, Fix429HTTPAdapter


try:
from urllib import quote
except ImportError:
Expand All @@ -17,29 +21,39 @@
class GMatrixClient(MatrixClient):
"""Gevent-compliant MatrixClient child class"""

def listen_forever(self, timeout_ms=30000, exception_handler=None):
def __init__(self, base_url, token=None, user_id=None, valid_cert_check=True,
sync_filter_limit=20, cache_level=CACHE.ALL):
super().__init__(base_url, token, user_id, valid_cert_check, sync_filter_limit,
cache_level)
# TODO: Remove once https://github.com/matrix-org/matrix-python-sdk/issues/193 is fixed
self.api.session.mount('https://', Fix429HTTPAdapter())
self.api.session.mount('http://', Fix429HTTPAdapter())

def listen_forever(self, timeout_ms=30000, exception_handler=None, bad_sync_timeout=5):
""" Keep listening for events forever.
Args:
timeout_ms (int): How long to poll the Home Server for before
retrying.
exception_handler (func(exception)): Optional exception handler
function which can be used to handle exceptions in the caller
thread.
bad_sync_timeout (int): Base time to wait after an error before
retrying. Will be increased according to exponential backoff.
"""
bad_sync_timeout = 5000
_bad_sync_timeout = bad_sync_timeout
self.should_listen = True
while self.should_listen:
try:
self._sync(timeout_ms)
bad_sync_timeout = 5
_bad_sync_timeout = bad_sync_timeout
except MatrixRequestError as e:
logger.warning("A MatrixRequestError occured during sync.")
if e.code >= 500:
logger.warning("Problem occured serverside. Waiting %i seconds",
bad_sync_timeout)
gevent.sleep(bad_sync_timeout)
bad_sync_timeout = min(bad_sync_timeout * 2,
self.bad_sync_timeout_limit)
_bad_sync_timeout)
gevent.sleep(_bad_sync_timeout)
_bad_sync_timeout = min(_bad_sync_timeout * 2,
self.bad_sync_timeout_limit)
else:
raise
except Exception as e:
Expand All @@ -58,8 +72,8 @@ def start_listener_thread(self, timeout_ms=30000, exception_handler=None):
function which can be used to handle exceptions in the caller
thread.
"""
self.sync_thread = gevent.spawn(self.listen_forever, timeout_ms, exception_handler)
self.should_listen = True
self.sync_thread = gevent.spawn(self.listen_forever, timeout_ms, exception_handler)

def search_user_directory(self, term: str) -> List[User]:
""" Search user directory for a given term, returning a list of users
Expand All @@ -82,6 +96,31 @@ def search_user_directory(self, term: str) -> List[User]:
except KeyError:
return []

def modify_presence_list(self, add_user_ids: list = (), remove_user_ids: list = ()):
return self.api._send(
'POST',
f'/presence/list/{self.user_id}',
{
'invite': add_user_ids,
'drop': remove_user_ids
}
)

def get_presence_list(self):
return self.api._send(
'GET',
f'/presence/list/{self.user_id}',
)

def set_presence_state(self, state):
return self.api._send(
'PUT',
f'/presence/{self.user_id}/status',
{
'presence': state
}
)

def typing(self, room: Room, timeout: int=5000):
"""Send typing event directly to api
Expand All @@ -93,3 +132,23 @@ def typing(self, room: Room, timeout: int=5000):
quote(room.room_id), quote(self.user_id),
)
return self.api._send('PUT', path, {'typing': True, 'timeout': timeout})

def add_invite_listener(self, callback):
super().add_invite_listener(_geventify_callback(callback))

def add_leave_listener(self, callback):
super().add_leave_listener(_geventify_callback(callback))

def add_presence_listener(self, callback):
return super().add_presence_listener(_geventify_callback(callback))

def add_listener(self, callback, event_type=None):
return super().add_listener(_geventify_callback(callback), event_type)

def add_ephemeral_listener(self, callback, event_type=None):
return super().add_ephemeral_listener(_geventify_callback(callback), event_type)

def _mkroom(self, room_id):
""" Uses a geventified Room subclass """
self.rooms[room_id] = Room(self, room_id)
return self.rooms[room_id]
14 changes: 14 additions & 0 deletions raiden/network/matrix/room.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from matrix_client.room import Room as MatrixRoom

from raiden.network.matrix.utils import _geventify_callback


class Room(MatrixRoom):
def add_listener(self, callback, event_type=None):
return super().add_listener(_geventify_callback(callback), event_type)

def add_ephemeral_listener(self, callback, event_type=None):
return super().add_ephemeral_listener(_geventify_callback(callback), event_type)

def add_state_listener(self, callback, event_type=None):
super().add_state_listener(_geventify_callback(callback), event_type)
37 changes: 37 additions & 0 deletions raiden/network/matrix/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import json

import gevent
from requests.adapters import HTTPAdapter


def _geventify_callback(callback):
def inner(*args, **kwargs):
gevent.spawn(callback, *args, **kwargs)

return inner


class Fix429HTTPAdapter(HTTPAdapter):
""" Temporary workaround for https://github.com/matrix-org/matrix-python-sdk/issues/193 """

_fallback_retry_timeout = 1000

def build_response(self, req, resp):
response = super().build_response(req, resp)
if response.status_code == 429:
resp_json = response.json()
if 'retry_after_ms' not in resp_json:
if 'error' in resp_json:
try:
error = json.loads(resp_json['error'])
resp_json['retry_after_ms'] = error.get('retry_after_ms',
self._fallback_retry_timeout)
except json.JSONDecodeError:
resp_json['retry_after_ms'] = self._fallback_retry_timeout
else:
resp_json['retry_after_ms'] = self._fallback_retry_timeout

response._content = json.dumps(resp_json).encode(
response.encoding or response.apparent_encoding)
print("Fixing response json to ", response._content)
return response

0 comments on commit 891176a

Please sign in to comment.