Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Thread-safety improvements #122

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ jobs:
- uses: julia-actions/julia-runtest@v1
env:
JULIA_DISTRIBUTED_TESTING_STANDALONE: 1
JULIA_NUM_THREADS: 4
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
JULIA_NUM_THREADS: 4
JULIA_NUM_THREADS: 4,4

Shake out having lots of threads in both pools?

Copy link
Collaborator Author

@JamesWrigley JamesWrigley Jan 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep makes sense; fixed in 2874281.

- uses: julia-actions/julia-processcoverage@v1
- uses: codecov/codecov-action@v5
with:
Expand Down
34 changes: 14 additions & 20 deletions src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ function set_worker_state(w, state)
end

function check_worker_state(w::Worker)
if w.state === W_CREATED
if (@atomic w.state) === W_CREATED
if !isclusterlazy()
if PGRP.topology === :all_to_all
# Since higher pids connect with lower pids, the remote worker
Expand Down Expand Up @@ -190,20 +190,14 @@ function exec_conn_func(w::Worker)
end

function wait_for_conn(w)
if w.state === W_CREATED
if (@atomic w.state) === W_CREATED
timeout = worker_timeout() - (time() - w.ct_time)
timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")

T = Threads.@spawn Threads.threadpool() begin
sleep($timeout)
lock(w.c_state) do
notify(w.c_state; all=true)
end
end
errormonitor(T)
lock(w.c_state) do
wait(w.c_state)
w.state === W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
if timedwait(() -> (@atomic w.state) === WorkerState_connected, timeout) === :timed_out
# Notify any waiters on the state and throw
@lock w.c_state notify(w.c_state)
error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
end
end
nothing
Expand Down Expand Up @@ -660,7 +654,7 @@ function create_worker(manager, wconfig)
for jw in PGRP.workers
if (jw.id != 1) && (jw.id < w.id)
# wait for wl to join
if jw.state === W_CREATED
if (@atomic jw.state) === W_CREATED
lock(jw.c_state) do
wait(jw.c_state)
end
Expand Down Expand Up @@ -688,7 +682,7 @@ function create_worker(manager, wconfig)

for wl in wlist
lock(wl.c_state) do
if wl.state === W_CREATED
if (@atomic wl.state) === W_CREATED
# wait for wl to join
wait(wl.c_state)
end
Expand Down Expand Up @@ -890,7 +884,7 @@ function nprocs()
n = length(PGRP.workers)
# filter out workers in the process of being setup/shutdown.
for jw in PGRP.workers
if !isa(jw, LocalProcess) && (jw.state !== W_CONNECTED)
if !isa(jw, LocalProcess) && ((@atomic jw.state) !== W_CONNECTED)
n = n - 1
end
end
Expand Down Expand Up @@ -941,7 +935,7 @@ julia> procs()
function procs()
if myid() == 1 || (PGRP.topology === :all_to_all && !isclusterlazy())
# filter out workers in the process of being setup/shutdown.
return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || (x.state === W_CONNECTED)]
return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || ((@atomic x.state) === W_CONNECTED)]
else
return Int[x.id for x in PGRP.workers]
end
Expand All @@ -950,7 +944,7 @@ end
function id_in_procs(id) # faster version of `id in procs()`
if myid() == 1 || (PGRP.topology === :all_to_all && !isclusterlazy())
for x in PGRP.workers
if (x.id::Int) == id && (isa(x, LocalProcess) || (x::Worker).state === W_CONNECTED)
if (x.id::Int) == id && (isa(x, LocalProcess) || (@atomic (x::Worker).state) === W_CONNECTED)
return true
end
end
Expand All @@ -972,7 +966,7 @@ Specifically all workers bound to the same ip-address as `pid` are returned.
"""
function procs(pid::Integer)
if myid() == 1
all_workers = [x for x in PGRP.workers if isa(x, LocalProcess) || (x.state === W_CONNECTED)]
all_workers = [x for x in PGRP.workers if isa(x, LocalProcess) || ((@atomic x.state) === W_CONNECTED)]
if (pid == 1) || (isa(map_pid_wrkr[pid].manager, LocalManager))
Int[x.id for x in filter(w -> (w.id==1) || (isa(w.manager, LocalManager)), all_workers)]
else
Expand Down Expand Up @@ -1079,11 +1073,11 @@ function _rmprocs(pids, waitfor)

start = time_ns()
while (time_ns() - start) < waitfor*1e9
all(w -> w.state === W_TERMINATED, rmprocset) && break
all(w -> (@atomic w.state) === W_TERMINATED, rmprocset) && break
sleep(min(0.1, waitfor - (time_ns() - start)/1e9))
end

unremoved = [wrkr.id for wrkr in filter(w -> w.state !== W_TERMINATED, rmprocset)]
unremoved = [wrkr.id for wrkr in filter(w -> (@atomic w.state) !== W_TERMINATED, rmprocset)]
if length(unremoved) > 0
estr = string("rmprocs: pids ", unremoved, " not terminated after ", waitfor, " seconds.")
throw(ErrorException(estr))
Expand Down
2 changes: 1 addition & 1 deletion src/messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ end
function flush_gc_msgs()
try
for w in (PGRP::ProcessGroup).workers
if isa(w,Worker) && (w.state == W_CONNECTED) && w.gcflag
if isa(w,Worker) && ((@atomic w.state) == W_CONNECTED) && w.gcflag
flush_gc_msgs(w)
end
end
Expand Down
2 changes: 1 addition & 1 deletion src/process_messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool)
println(stderr, "Process($(myid())) - Unknown remote, closing connection.")
elseif !(wpid in map_del_wrkr)
werr = worker_from_id(wpid)
oldstate = werr.state
oldstate = @atomic werr.state
set_worker_state(werr, W_TERMINATED)

# If unhandleable error occurred talking to pid 1, exit
Expand Down
Loading