-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathEvent.py
64 lines (52 loc) · 1.66 KB
/
Event.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# Thanks to Longpoke(https://stackoverflow.com/users/80243/l%cc%b2%cc%b3o%cc%b2%cc%b3%cc%b3n%cc%b2%cc%b3%cc%b3g%cc%b2%cc%b3%cc%b3p%cc%b2%cc%b3o%cc%b2%cc%b3%cc%b3k%cc%b2%cc%b3%cc%b3e%cc%b2%cc%b3%cc%b3)
# https://stackoverflow.com/questions/1092531/event-system-in-python
class Subscriber:
    """A registered callback together with its "fire only once" flag.

    Attributes:
        func: The object passed as ``func``; stored unchanged.
        callOnce: The value passed as ``callOnce`` (``False`` by default);
            stored unchanged.
    """

    def __init__(self, func, callOnce=False) -> None:
        """Record ``func`` and ``callOnce`` on the new instance."""
        super().__init__()
        # Plain attribute storage: no validation or copying is performed.
        self.callOnce = callOnce
        self.func = func
class Event(list):
    """Event subscription list.

    A list of ``Subscriber`` entries. Calling an instance invokes the
    ``func`` of every entry, in ascending index order, with the arguments
    given to the call. Entries whose ``callOnce`` flag is true are removed
    after they have been invoked.

    Callbacks should be registered with ``append``, ``subscribe`` or
    ``once`` (which wrap them in a ``Subscriber``) and removed with
    ``unsubscribe``. Other inherited ``list`` mutators (``+=``, ``extend``,
    ``insert`` ...) are not overridden and store their items unwrapped, so
    anything added that way must itself expose ``func`` and ``callOnce``.

    Example usage:

    >>> def f(x):
    ...     print(f'f({x})')
    >>> def g(x):
    ...     print(f'g({x})')
    >>> e = Event()
    >>> e(0)
    >>> e.append(f)
    >>> e(123)
    f(123)
    >>> e.unsubscribe(f)
    >>> e(123)
    >>> e.subscribe(f)
    >>> e.once(g)
    >>> e(10)
    f(10)
    g(10)
    >>> e(2)
    f(2)
    """

    def __call__(self, *args, **kwargs):
        """Invoke every subscriber with ``*args`` and ``**kwargs``.

        Dispatch runs over a snapshot of the list, so a handler may
        subscribe or unsubscribe (itself or others) while the event is
        firing without causing other handlers to be skipped. Handlers added
        during dispatch are first called on the next firing; handlers
        removed during dispatch are still called for the current one.
        """
        for subscriber in tuple(self):
            subscriber.func(*args, **kwargs)
            if subscriber.callOnce:
                # Drop one-shot subscribers right away so they stay removed
                # even if a later handler raises.
                try:
                    self.remove(subscriber)
                except ValueError:
                    # The handler already removed itself during the call.
                    pass

    def __repr__(self):
        """Return ``Event([...])`` wrapping the plain list representation."""
        return f"Event({list.__repr__(self)})"

    def append(self, func) -> None:
        """Register ``func`` as a regular (repeating) subscriber."""
        self.subscribe(func)

    def subscribe(self, func, callOnce=False):
        """Register ``func``, wrapped in a ``Subscriber``, at the end of the list.

        Args:
            func: Callback invoked with the event's call arguments.
            callOnce: When true, the subscriber is removed after its first
                invocation.
        """
        super().append(Subscriber(func, callOnce))

    def unsubscribe(self, func):
        """Remove the first subscriber whose ``func`` equals ``func``.

        Does nothing when no such subscriber is registered.
        """
        for subscriber in self:
            if subscriber.func == func:
                self.remove(subscriber)
                # Stop immediately: the list was just mutated, and only the
                # first match is meant to be removed.
                break

    def once(self, func):
        """Register ``func`` to be called on the next firing only."""
        self.subscribe(func, True)