-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsyncpri.py
109 lines (93 loc) · 2.59 KB
/
syncpri.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import pyb
import _thread
class Event(object):
    """A settable flag that threads can block on, built on top of a mutex.

    The flag lives in ``__val``.  While the event is not set, the backing
    mutex is kept locked so that a waiter blocks in ``acquire()``; ``set()``
    releases the mutex to let the waiter through.

    Args:
        initval: initial state of the flag.
        auto_reset: when true, a successful ``wait()`` clears the flag again.
        mutex: lock object providing ``acquire()``, ``release()`` and
            ``locked()``.  Defaults to a fresh ``_thread`` lock.
    """

    def __init__(self, initval=False, *, auto_reset=True, mutex=None):
        self.__val = initval
        self.__auto_reset = auto_reset
        if mutex is None:
            mutex = _thread.allocate_lock()
        self.__mutex = mutex

    @staticmethod
    def wait_any(events):
        """Busy-wait until one of ``events`` is set, then wait on that one.

        Returns the event that was found set.  With an empty ``events``
        this never returns, because there is nothing that could become set.

        NOTE(review): the flag is polled without any locking, so another
        thread may consume an auto-reset event between the poll and the
        ``wait()`` call, in which case ``wait()`` blocks on that event.
        """
        # Materialise once: the collection is scanned repeatedly, which a
        # one-shot iterator would not survive.
        events = list(events)
        while True:
            for event in events:
                if event.__val:
                    event.wait()
                    return event

    def wait(self):
        """Block until the event is set.

        If the flag is already set this returns immediately.  In both cases
        the flag is cleared before returning when ``auto_reset`` is enabled.
        """
        if self.__val:
            # Already signalled.  set() leaves the mutex unlocked, so going
            # through the locking path below would spin forever waiting for
            # the mutex to become locked.
            if self.__auto_reset:
                self.reset()
            return

        # Not signalled: make sure the mutex is (being) locked.
        self.reset()
        # The lock may be taken asynchronously by a helper thread (see
        # reset()); wait until it is really held so that the acquire()
        # below is the call that blocks.
        while not self.__mutex.locked():
            pass
        self.__mutex.acquire()
        # Hand the mutex straight back; it was only used to block.
        self.__mutex.release()
        if self.__auto_reset:
            self.reset()

    def __enter__(self):
        """Wait for the event on entry to a ``with`` block."""
        self.wait()
        return self

    def __exit__(self, *args):
        """Nothing to undo on exit; exceptions are not suppressed."""
        pass

    def set(self):
        """Set the flag and release the mutex if it is currently locked."""
        self.__val = True
        if self.__mutex.locked():
            self.__mutex.release()

    def reset(self):
        """Clear the flag and make sure the backing mutex is locked."""
        self.__val = False
        if not self.__mutex.locked():
            # The lock must not end up owned by the calling thread,
            # otherwise the acquire() in wait() would be a second
            # acquisition by the same thread.
            if isinstance(self.__mutex, SpinMutex):
                self.__mutex.acquire(dummy=True)
            else:
                # Take the lock from a short-lived helper thread.  The
                # argument pack is passed as a tuple.
                _thread.start_new_thread(self.__mutex.acquire, ())

    @property
    def value(self):
        """Current state of the flag."""
        return self.__val
class SpinMutex(object):
    """A busy-waiting mutex implemented with a plain boolean flag.

    Args:
        restrict_owner: when true, ``release()`` raises if the lock has a
            recorded owner and the caller is a different thread.
        using_critical_section: when true, the flag is updated between
            ``pyb.disable_irq()`` and ``pyb.enable_irq()``.
    """

    # Value of ``__owner`` while no thread is recorded as the holder.
    _NO_OWNER = -1

    def __init__(self, *, restrict_owner=True, using_critical_section=False):
        self.__val = False
        self.__restrict_owner = restrict_owner
        self.__owner = self._NO_OWNER
        self.__using_critical_section = using_critical_section

    def acquire(self, dummy=False):
        """Take the lock, spinning until it is free.

        Args:
            dummy: when true, just mark the mutex as locked without waiting
                and without recording an owner.

        Raises:
            RuntimeError: the calling thread is already recorded as the
                owner of the held lock.
        """
        if dummy:
            self.__val = True
            return

        thread_id = _thread.get_ident()
        if self.__val and self.__owner == thread_id:
            raise RuntimeError('dead lock')

        while True:
            # Spin outside the critical section so interrupts stay enabled
            # while waiting.
            while self.__val:
                pass
            irq_state = None
            if self.__using_critical_section:
                irq_state = pyb.disable_irq()
            try:
                # Re-check inside the critical section: another thread may
                # have taken the lock since the spin loop saw it free.
                # NOTE(review): this is only atomic if masking IRQs also
                # prevents thread switches on the target - confirm.
                if not self.__val:
                    self.__val = True
                    self.__owner = thread_id
                    return
            finally:
                if self.__using_critical_section:
                    pyb.enable_irq(irq_state)

    def release(self):
        """Release the lock.

        Raises:
            RuntimeError: the mutex is not locked, or ``restrict_owner`` is
                enabled and the lock is owned by a different thread.
        """
        if not self.__val:
            raise RuntimeError('mutex didnt aquired yet')
        thread_id = _thread.get_ident()
        # A lock taken with acquire(dummy=True) has no recorded owner; it
        # must stay releasable, otherwise it could never be unlocked while
        # restrict_owner is enabled.
        if (self.__restrict_owner
                and self.__owner != self._NO_OWNER
                and self.__owner != thread_id):
            raise RuntimeError('mutex been released by other thread')
        irq_state = None
        if self.__using_critical_section:
            irq_state = pyb.disable_irq()
        try:
            self.__owner = self._NO_OWNER
            self.__val = False
        finally:
            if self.__using_critical_section:
                pyb.enable_irq(irq_state)

    def locked(self):
        """Return whether the mutex is currently marked as locked."""
        return self.__val

    def __enter__(self):
        """Acquire the lock on entry to a ``with`` block."""
        self.acquire()
        return self

    def __exit__(self, *args):
        """Release the lock on exit; exceptions are not suppressed."""
        self.release()