forked from ilastik/lazyflow
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path: testRequestLock.py
206 lines (172 loc) · 6.11 KB
/
testRequestLock.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
from __future__ import division
from builtins import range
from builtins import object
import time
import random
import logging
import threading
from lazyflow.request import Request, RequestLock
from test_utilities import fail_after_timeout
logger = logging.getLogger("tests.testRequestLock")
class ThreadRequest(object):
    """
    Thin adapter around threading.Thread whose submit()/wait() methods
    mimic the Request API, so the lock tests can run against either
    task implementation interchangeably.
    """

    def __init__(self, fn):
        # Keep the underlying thread private; callers only use submit()/wait().
        self._thread = threading.Thread(target=fn)

    def submit(self):
        # Request.submit() schedules the work; here we just start the thread.
        self._thread.start()

    def wait(self):
        # Request.wait() blocks until completion; join() is the Thread analogue.
        self._thread.join()
@fail_after_timeout(20)
def test_RequestLock():
    """
    Stress-test RequestLock by running the generic two-lock scenario
    (_impl_test_lock) with Request tasks and RequestLocks.

    A daemon thread logs the state of both locks and the thread pool every
    0.5 s so hangs can be diagnosed.  On failure, the test attempts to
    restore global state (unpause tasks, reset the thread pool, release
    any lock still held) before re-raising, so later tests are not poisoned.
    """
    assert Request.global_thread_pool.num_workers > 0, \
        "This test must be used with the real threadpool."

    lockA = RequestLock()
    lockB = RequestLock()

    def log_request_system_status():
        # Snapshot of both locks' pending queues and the thread pool's queues.
        # NOTE(review): peeks at the private _pendingRequests attribute of
        # RequestLock — diagnostic only.
        status = ( "*************************\n"
                   + 'lockA.pending: {}\n'.format( len(lockA._pendingRequests))
                   + 'lockB.pending: {}\n'.format( len(lockB._pendingRequests))
                   #+ "suspended Requests: {}\n".format( len(Request.global_suspend_set) )
                   + "global job queue: {}\n".format( len(Request.global_thread_pool.unassigned_tasks) ) )
        for worker in Request.global_thread_pool.workers:
            status += "{} queued tasks: {}\n".format( worker.name, len(worker.job_queue) )
        status += "*****************************************************"
        logger.debug(status)

    # Mutable flag shared with the status thread (list so the closure can't rebind it).
    running = [True]
    def periodic_status():
        while running[0]:
            time.sleep(0.5)
            log_request_system_status()

    # Background daemon thread that prints periodic status while the test runs.
    status_thread = threading.Thread(target=periodic_status)
    status_thread.daemon = True
    status_thread.start()

    try:
        _impl_test_lock(lockA, lockB, Request, 1000)
    except:
        # Log final state, then restore global state before re-raising so
        # subsequent tests start clean.
        log_request_system_status()
        running[0] = False
        status_thread.join()
        # Unstick any task still spinning on the module-level 'paused' gate.
        global paused
        paused = False
        Request.reset_thread_pool(Request.global_thread_pool.num_workers)
        if lockA.locked():
            lockA.release()
        if lockB.locked():
            lockB.release()
        raise

    log_request_system_status()
    running[0] = False
    status_thread.join()
def test_ThreadingLock():
    """
    Sanity check that the shared test logic itself is sound: run the same
    two-lock scenario with plain threading primitives.  It should pass no
    matter which task & lock implementation is used.
    """
    plain_lock_a = threading.Lock()
    plain_lock_b = threading.Lock()
    _impl_test_lock(plain_lock_a, plain_lock_b, ThreadRequest, 100)
# Module-level gate shared by _impl_test_lock and its worker closures:
# each task spins ("while paused: pass") after acquiring its first lock,
# so no task proceeds until all tasks have been submitted and the driver
# flips this to False.
paused = True
def _impl_test_lock(lockA, lockB, task_class, num_tasks):
    """
    Simple test to start a lot of tasks that acquire/release the same two locks.

    We want to make sure that the test itself has sound logic,
    so it is written to be agnostic to the lock/task type.
    This test should work for both Requests (with RequestLocks) or 'normal'
    threading.Threads and Locks (as long as the API is adapted a bit to look
    like Requests via ThreadRequest, above.)

    Scheme: num_tasks copies each of f1 (acquire A, release B) and
    f2 (acquire B, release A) are submitted.  B starts held, so exactly one
    task can make progress at a time and the two counters must end equal.
    """
    global paused
    paused = True
    # progressAB[0] counts completed f1 runs, progressAB[1] completed f2 runs.
    progressAB = [0,0]

    # Prepare: B starts in the held state so the first f1 has something to release.
    lockB.acquire()

    def f1():
        """
        A.acquire(); B.release()
        """
        # Random jitter to shuffle scheduling order between tasks.
        time.sleep(random.random() / 1000.0)
        lockA.acquire()
        logger.debug('Acquired A. Progress: {}'.format(progressAB[0]))
        # Busy-wait until the driver unpauses (all tasks queued).
        while paused: pass
        assert lockB.locked()
        lockB.release()
        # Safe unguarded increment: only the holder of A reaches this line.
        progressAB[0] += 1
        if isinstance( lockA, RequestLock ):
            # Diagnostic peek at RequestLock's private pending-queue.
            logger.debug('lockA.pending: {}'.format( len(lockA._pendingRequests)) )

    def f2():
        """
        B.acquire(); A.release()
        """
        time.sleep(random.random() / 1000.0)
        lockB.acquire()
        logger.debug('Acquired B. Progress: {}'.format(progressAB[1]))
        while paused: pass
        assert lockA.locked()
        lockA.release()
        progressAB[1] += 1
        if isinstance( lockB, RequestLock ):
            logger.debug('lockB.pending: {}'.format( len(lockB._pendingRequests)) )

    # Submit num_tasks of each flavor.
    tasks = []
    for _ in range(num_tasks):
        tasks.append( task_class(f1) )
        tasks.append( task_class(f2) )

    for task in tasks:
        task.submit()

    logger.debug("Pause for tasks to finish queuing.....")
    time.sleep(0.1)
    # Release the floodgate: all queued tasks may now proceed.
    paused = False

    for task in tasks:
        task.wait()

    # Equal task counts mean the last f2 released A; re-acquire to verify,
    # then release B to restore both locks to the unlocked state.
    lockA.acquire() # A should be left in released state.
    lockB.release()
    logger.debug("DONE")
def test_cancellation_behavior():
    """
    If a request is cancelled while it was waiting on a lock,
    it should raise the CancellationException.

    The lock is held by the test body, so the request blocks in
    'with lock'; cancelling it while blocked must surface as a
    CancellationException inside the request, and only the 'cancelled'
    callback (not 'finished' or 'failed') should fire.
    """
    lock = RequestLock()
    lock.acquire()

    def f():
        try:
            with lock:
                # Unreachable: the request is cancelled before it can
                # ever acquire the lock.
                assert False
        except Request.CancellationException:
            pass
        else:
            assert False

    # Single-element lists so the callbacks can mutate them from closures.
    finished = [False]
    cancelled = [False]
    failed = [False]

    def handle_finished(result):
        finished[0] = True
    def handle_cancelled():
        cancelled[0] = True
    def handle_failed(*args):
        failed[0] = True

    req = Request(f)
    req.notify_finished( handle_finished )
    req.notify_failed( handle_failed )
    req.notify_cancelled( handle_cancelled )

    req.submit()
    req.cancel()
    # Give the request time to block on the lock, then release it so the
    # cancellation can be processed.
    time.sleep(0.1)
    lock.release()
    time.sleep(0.1)
    # Only the cancellation path should have been taken.
    assert not finished[0] and not failed[0] and cancelled[0]
if __name__ == "__main__":
import sys
sys.argv.append("--nocapture") # Don't steal stdout. Show it on the console as usual.
sys.argv.append("--nologcapture") # Don't set the logging level to DEBUG. Leave it alone.
import nose
# Logging is OFF by default when running from command-line nose, i.e.:
# nosetests thisFile.py)
# but ON by default if running this test directly, i.e.:
# python thisFile.py
formatter = logging.Formatter('%(levelname)s %(name)s %(message)s')
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(formatter)
logging.getLogger().addHandler( handler )
logger.setLevel(logging.DEBUG)
ret = nose.run(defaultTest=__file__)
if not ret: sys.exit(1)