-
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathdispatch_queue.gd
272 lines (206 loc) · 6.98 KB
/
dispatch_queue.gd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
class_name DispatchQueue
extends RefCounted
## Queue that wraps Callables in Tasks and executes them in order of dispatch.
##
## With worker Threads (see `create_serial` / `create_concurrent`) Tasks run on
## those Threads; without them Tasks run one at a time through deferred calls.

## Emitted after the Task that emptied the queue finishes, if the queue is
## still empty by then.
## The emission goes through a deferred connection, so handlers may call
## non Thread-safe APIs.
signal all_tasks_finished
## Collects the results of several Tasks and emits "finished" once all of them ran.
class TaskGroup:
	extends RefCounted

	## Emitted after every Task in the group has finished, passing the results
	## Array as argument (one entry per Task, in the order Tasks were added).
	## The signal is emitted in the same Thread that executed the last pending
	## Task, so connect with CONNECT_DEFERRED before calling non Thread-safe APIs.
	signal finished(results)

	## Number of Tasks that were added and have not finished yet.
	var task_count := 0
	## Result of each Task, indexed by the Task's `id_in_group`.
	var task_results = []
	## Protects the bookkeeping above; only created for threaded queues.
	var mutex: Mutex = null

	func _init(threaded: bool) -> void:
		mutex = Mutex.new() if threaded else null

	## Connects `callable` to the "finished" signal as a one-shot connection
	## and returns the result of `connect`.
	##
	## This enables the following pattern:
	##   dispatch_queue.dispatch_group(task_list).then(continuation_callable)
	func then(callable: Callable, flags: int = 0) -> int:
		var one_shot_flags: int = flags | CONNECT_ONE_SHOT
		return finished.connect(callable, one_shot_flags)

	## Same as `then`, with CONNECT_DEFERRED added to `flags`.
	func then_deferred(callable: Callable, flags: int = 0) -> int:
		return then(callable, CONNECT_DEFERRED | flags)

	## Registers `task` in this group and reserves its slot in the results Array.
	func add_task(task: Task) -> void:
		var slot := task_count
		task_count = slot + 1
		task_results.resize(task_count)
		task.group = self
		task.id_in_group = slot

	## Stores `result` in the slot reserved for `task` and emits "finished"
	## when no Task is left pending.
	func mark_task_finished(task: Task, result) -> void:
		var locked := mutex != null
		if locked:
			mutex.lock()
		task_count -= 1
		task_results[task.id_in_group] = result
		var remaining := task_count
		if locked:
			mutex.unlock()
		# Emit outside of the lock so handlers never run while it is held.
		if remaining == 0:
			finished.emit(task_results)
## One unit of work: a Callable plus the signal that reports its result.
##
## Receive the result by connecting to the `finished` signal, either manually
## or through `then`/`then_deferred`.
class Task:
	extends RefCounted

	## Emitted right after the Task's callable returns, passing its return
	## value as argument.
	## The signal is emitted in the same Thread that executed the Task, so
	## connect with CONNECT_DEFERRED before calling non Thread-safe APIs.
	signal finished(result)

	## The work to run in `execute`.
	var callable: Callable
	## Group this Task reports to, or null when it was dispatched on its own.
	var group: TaskGroup = null
	## Slot of this Task in the group's results Array; -1 outside of a group.
	var id_in_group: int = -1

	## Connects `callable` to the "finished" signal as a one-shot connection
	## and returns the result of `connect`.
	##
	## This enables the following pattern:
	##   dispatch_queue.dispatch(callable).then(continuation_callable)
	func then(callable: Callable, flags: int = 0) -> int:
		var one_shot_flags: int = flags | CONNECT_ONE_SHOT
		return finished.connect(callable, one_shot_flags)

	## Same as `then`, with CONNECT_DEFERRED added to `flags`.
	func then_deferred(callable: Callable, flags: int = 0) -> int:
		return then(callable, CONNECT_DEFERRED | flags)

	## Runs the callable, emits "finished" with its return value and then
	## reports that value to the group, if there is one.
	func execute() -> void:
		var outcome = callable.call()
		finished.emit(outcome)
		if group == null:
			return
		group.mark_task_finished(self, outcome)
## Worker Threads together with the synchronization primitives they share.
class _WorkerPool:
	extends RefCounted

	## Threads started for this pool.
	var threads: Array[Thread] = []
	## True while `shutdown` is stopping the workers; the worker loop exits
	## when it wakes up and sees this flag set.
	var should_shutdown := false
	## Guards access to the queue's task list.
	var mutex := Mutex.new()
	## Posted once per dispatched Task, and once per Thread on shutdown,
	## to wake a waiting worker.
	var semaphore := Semaphore.new()

	func _notification(what: int) -> void:
		# Make sure the Threads are joined before the pool goes away.
		if what == NOTIFICATION_PREDELETE and self:
			shutdown()

	## Wakes every worker with the shutdown flag set, joins all Threads and
	## empties `threads`. Does nothing if there are no Threads.
	func shutdown() -> void:
		if threads.is_empty():
			return
		should_shutdown = true
		# One post per Thread, so that every waiting worker wakes up.
		for i in threads.size():
			semaphore.post()
		for t in threads:
			# `is_started` stays true until the Thread is joined, even when
			# its function has already returned. Checking `is_alive` here
			# would skip workers that exited quickly and leave them unjoined.
			if t.is_started():
				t.wait_to_finish()
		threads.clear()
		should_shutdown = false
# Worker pool used in threaded mode; null while the queue is synchronous.
var _workers: _WorkerPool = null
# Tasks waiting to be executed, oldest first.
var _task_queue: Array = []
func _notification(what: int) -> void:
	# Only object deletion is of interest: release the worker Threads.
	if what != NOTIFICATION_PREDELETE:
		return
	if self:
		shutdown()
## Makes the queue process tasks on a single worker Thread.
## Does nothing if the queue already is serial; otherwise `shutdown` is called
## and a new Thread is created.
func create_serial() -> void:
	# A serial queue is a concurrent queue with exactly one worker.
	create_concurrent(1)
## Creates `thread_count` Threads of execution to process tasks.
## If queue was already concurrent with `thread_count` Threads, this is a no-op.
## Otherwise calls `shutdown` (which cancels pending Tasks) and creates new Threads.
## If `thread_count <= 1`, creates a serial queue.
func create_concurrent(thread_count: int = 1) -> void:
	# Clamp before comparing. Otherwise a count of 0 on a synchronous queue
	# would match `get_thread_count()` and create nothing, and a count <= 0
	# on a serial queue would needlessly shut it down and recreate it.
	var wanted_threads: int = max(1, thread_count)
	if wanted_threads == get_thread_count():
		return
	if is_threaded():
		shutdown()
	_workers = _WorkerPool.new()
	# Every Thread runs the same loop, bound to the pool it belongs to.
	var run_loop = self._run_loop.bind(_workers)
	for i in wanted_threads:
		var thread = Thread.new()
		_workers.threads.append(thread)
		thread.start(run_loop)
## Wraps `callable` in a new Task, queues it for execution and returns it.
## In threaded mode a worker Thread is woken up through a deferred call and
## executes the Task when one is available.
## In synchronous mode queued Tasks are executed one per deferred call.
## An invalid `callable` is reported with an error and its Task is not queued.
func dispatch(callable: Callable) -> Task:
	var task = Task.new()
	if not callable.is_valid():
		push_error("Trying to dispatch an invalid callable, ignoring it")
		return task
	task.callable = callable
	if is_threaded():
		_workers.mutex.lock()
		_task_queue.append(task)
		_workers.mutex.unlock()
		# Wake a worker from a deferred call rather than right away.
		_workers.semaphore.call_deferred("post")
		return task
	# Synchronous mode: start the deferred run only if none is pending,
	# which is the case when the queue was empty before this Task.
	var was_idle: bool = _task_queue.is_empty()
	if was_idle:
		call_deferred("_sync_run_next_task")
	_task_queue.append(task)
	return task
## Create all tasks in `task_list` by calling `dispatch` on each value,
## returning the TaskGroup associated with them.
## Invalid callables are reported by `dispatch` and left out of the group, so
## the group's results only hold entries for Tasks that were actually queued.
func dispatch_group(task_list: Array[Callable]) -> TaskGroup:
	var group = TaskGroup.new(is_threaded())
	for callable in task_list:
		# Still go through `dispatch` for invalid callables so the error is reported.
		var task: Task = dispatch(callable)
		# The Task of an invalid callable is never queued nor executed. Adding
		# it would leave the group waiting forever for a result that never comes.
		if callable.is_valid():
			group.add_task(task)
	return group
## Returns true when the queue owns a worker pool (threaded mode) and false
## when it runs in synchronous mode.
func is_threaded() -> bool:
	return not (_workers == null)
## Returns how many Threads the worker pool holds.
## Returns 0 on synchronous mode.
func get_thread_count() -> int:
	return _workers.threads.size() if is_threaded() else 0
## Returns how many Tasks are waiting in the queue.
func size() -> int:
	if not is_threaded():
		return _task_queue.size()
	# Workers pop from the queue, so read it under the pool's mutex.
	_workers.mutex.lock()
	var pending: int = _task_queue.size()
	_workers.mutex.unlock()
	return pending
## Returns true when no Task is waiting in the queue.
func is_empty() -> bool:
	return size() < 1
## Drops every Task still waiting in the queue.
## Tasks that were already taken out of the queue still run to completion.
func clear() -> void:
	var threaded := is_threaded()
	# The pool's mutex is only needed (and only exists) in threaded mode.
	if threaded:
		_workers.mutex.lock()
	_task_queue.clear()
	if threaded:
		_workers.mutex.unlock()
## Cancels pending Tasks, then stops and joins the worker Threads.
## Afterwards the queue is in synchronous mode, so newly dispatched Tasks run
## through deferred calls instead of on worker Threads.
## Call `create_serial` or `create_concurrent` to recreate the worker threads.
## This method is called automatically on `NOTIFICATION_PREDELETE`.
## It is safe to call this more than once.
func shutdown() -> void:
	clear()
	if not is_threaded():
		return
	# Detach the pool first, so the queue already reports synchronous mode
	# while the Threads are being joined.
	var pool = _workers
	_workers = null
	pool.shutdown()
# Body of each worker Thread: sleep on the pool's semaphore, then execute the
# next queued Task, until the pool asks for shutdown.
func _run_loop(pool: _WorkerPool) -> void:
	pool.semaphore.wait()
	while not pool.should_shutdown:
		pool.mutex.lock()
		var next_task = _pop_task()
		pool.mutex.unlock()
		# The queue may be empty here, e.g. after `clear`.
		if next_task:
			next_task.execute()
		pool.semaphore.wait()
# Synchronous mode driver: executes one queued Task and schedules itself
# again through a deferred call; stops once the queue is empty.
func _sync_run_next_task() -> void:
	var next_task = _pop_task()
	if next_task == null:
		return
	next_task.execute()
	call_deferred("_sync_run_next_task")
# Removes and returns the oldest queued Task, or null if there is none.
# The Task that leaves the queue empty gets a deferred one-shot connection
# used to report "all_tasks_finished".
func _pop_task() -> Task:
	var next_task: Task = _task_queue.pop_front()
	if next_task == null:
		return null
	if _task_queue.is_empty():
		next_task.then_deferred(self._on_last_task_finished)
	return next_task
# Deferred handler for the Task that emptied the queue: emits
# "all_tasks_finished" unless new Tasks were queued in the meantime.
func _on_last_task_finished(_result):
	if not is_empty():
		return
	all_tasks_finished.emit()