diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index ed6b2fa..caadcc0 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -17,21 +17,25 @@ concurrency:
 
 jobs:
   test:
-    name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }}
+    name: julia -t${{ matrix.threads}} - ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }}
     runs-on: ${{ matrix.os }}
+    timeout-minutes: 30
     strategy:
       fail-fast: false
       matrix:
-        version:
-          - 'nightly'
-        os:
-          - ubuntu-latest
-          - macOS-latest
-          - windows-latest
-        arch:
-          - x64
-          - x86
+        threads:
+          # - '1'
+          - '4,4'
+        version: [nightly]
+        os: [ubuntu-latest, windows-latest, macOS-latest]
+        arch: [x64, x86, aarch64]
         exclude:
+          - os: ubuntu-latest
+            arch: aarch64
+          - os: windows-latest
+            arch: aarch64
+          - os: macOS-latest
+            arch: x64
           - os: macOS-latest
            arch: x86
     steps:
@@ -44,6 +48,7 @@ jobs:
       - uses: julia-actions/julia-runtest@v1
         env:
           JULIA_DISTRIBUTED_TESTING_STANDALONE: 1
+          JULIA_NUM_THREADS: '${{ matrix.threads}}'
       - uses: julia-actions/julia-processcoverage@v1
       - uses: codecov/codecov-action@v5
         with:
diff --git a/src/cluster.jl b/src/cluster.jl
index 0b42961..538c2e3 100644
--- a/src/cluster.jl
+++ b/src/cluster.jl
@@ -151,7 +151,7 @@ function set_worker_state(w, state)
 end
 
 function check_worker_state(w::Worker)
-    if w.state === W_CREATED
+    if (@atomic w.state) === W_CREATED
         if !isclusterlazy()
             if PGRP.topology === :all_to_all
                 # Since higher pids connect with lower pids, the remote worker
@@ -163,10 +163,10 @@ function check_worker_state(w::Worker)
         else
             w.ct_time = time()
             if myid() > w.id
-                t = Threads.@spawn Threads.threadpool() exec_conn_func(w)
+                t = @async exec_conn_func(w)
             else
                 # route request via node 1
-                t = Threads.@spawn Threads.threadpool() remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
+                t = @async remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
             end
             errormonitor(t)
             wait_for_conn(w)
@@ -190,20 +190,14 @@ function exec_conn_func(w::Worker)
 end
 
 function wait_for_conn(w)
-    if w.state === W_CREATED
+    if (@atomic w.state) === W_CREATED
         timeout = worker_timeout() - (time() - w.ct_time)
         timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")
 
-        T = Threads.@spawn Threads.threadpool() begin
-            sleep($timeout)
-            lock(w.c_state) do
-                notify(w.c_state; all=true)
-            end
-        end
-        errormonitor(T)
-        lock(w.c_state) do
-            wait(w.c_state)
-            w.state === W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
+        if timedwait(() -> (@atomic w.state) === W_CONNECTED, timeout) === :timed_out
+            # Notify any waiters on the state and throw
+            @lock w.c_state notify(w.c_state)
+            error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
         end
     end
     nothing
@@ -258,7 +252,7 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_std
     else
         sock = listen(interface, LPROC.bind_port)
     end
-    errormonitor(Threads.@spawn while isopen(sock)
+    errormonitor(@async while isopen(sock)
         client = accept(sock)
         process_messages(client, client, true)
     end)
@@ -290,7 +284,7 @@ end
 
 
 function redirect_worker_output(ident, stream)
-    t = Threads.@spawn while !eof(stream)
+    t = @async while !eof(stream)
         line = readline(stream)
         if startswith(line, "      From worker ")
             # stdout's of "additional" workers started from an initial worker on a host are not available
@@ -329,7 +323,7 @@ function read_worker_host_port(io::IO)
     leader = String[]
     try
         while ntries > 0
-            readtask = Threads.@spawn Threads.threadpool() readline(io)
+            readtask = @async readline(io)
             yield()
             while !istaskdone(readtask) && ((time_ns() - t0) < timeout)
                 sleep(0.05)
@@ -430,7 +424,7 @@ if launching workers programmatically, execute `addprocs` in its own task.
 
 ```julia
 # On busy clusters, call `addprocs` asynchronously
-t = Threads.@spawn addprocs(...)
+t = @async addprocs(...)
 ```
 
 ```julia
@@ -496,13 +490,14 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
     # call manager's `launch` is a separate task. This allows the master
     # process initiate the connection setup process as and when workers come
     # online
-    t_launch = Threads.@spawn Threads.threadpool() launch(manager, params, launched, launch_ntfy)
+    # NOTE: Must be `@async`. See FIXME above
+    t_launch = @async launch(manager, params, launched, launch_ntfy)
 
     @sync begin
         while true
             if isempty(launched)
                 istaskdone(t_launch) && break
-                Threads.@spawn Threads.threadpool() begin
+                @async begin # NOTE: Must be `@async`. See FIXME above
                     sleep(1)
                     notify(launch_ntfy)
                 end
@@ -512,7 +507,8 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
             if !isempty(launched)
                 wconfig = popfirst!(launched)
                 let wconfig=wconfig
-                    Threads.@spawn Threads.threadpool() setup_launched_worker(manager, wconfig, launched_q)
+                    # NOTE: Must be `@async`. See FIXME above
+                    @async setup_launched_worker(manager, wconfig, launched_q)
                 end
             end
         end
@@ -592,7 +588,7 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch
         wconfig.port = port
 
         let wconfig=wconfig
-            Threads.@spawn Threads.threadpool() begin
+            @async begin
                 pid = create_worker(manager, wconfig)
                 remote_do(redirect_output_from_additional_worker, frompid, pid, port)
                 push!(launched_q, pid)
@@ -660,7 +656,7 @@ function create_worker(manager, wconfig)
         for jw in PGRP.workers
             if (jw.id != 1) && (jw.id < w.id)
                 # wait for wl to join
-                if jw.state === W_CREATED
+                if (@atomic jw.state) === W_CREATED
                     lock(jw.c_state) do
                         wait(jw.c_state)
                     end
@@ -688,7 +684,7 @@ function create_worker(manager, wconfig)
 
         for wl in wlist
             lock(wl.c_state) do
-                if wl.state === W_CREATED
+                if (@atomic wl.state) === W_CREATED
                     # wait for wl to join
                     wait(wl.c_state)
                 end
@@ -758,7 +754,7 @@ function check_master_connect()
     end
 
     errormonitor(
-        Threads.@spawn begin
+        @async begin
             timeout = worker_timeout()
             if timedwait(() -> !haskey(map_pid_wrkr, 1), timeout) === :timed_out
                 print(stderr, "Master process (id 1) could not connect within $(timeout) seconds.\nexiting.\n")
@@ -890,7 +886,7 @@ function nprocs()
         n = length(PGRP.workers)
         # filter out workers in the process of being setup/shutdown.
         for jw in PGRP.workers
-            if !isa(jw, LocalProcess) && (jw.state !== W_CONNECTED)
+            if !isa(jw, LocalProcess) && ((@atomic jw.state) !== W_CONNECTED)
                 n = n - 1
             end
         end
@@ -941,7 +937,7 @@ julia> procs()
 function procs()
     if myid() == 1 || (PGRP.topology === :all_to_all && !isclusterlazy())
         # filter out workers in the process of being setup/shutdown.
-        return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || (x.state === W_CONNECTED)]
+        return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || ((@atomic x.state) === W_CONNECTED)]
     else
         return Int[x.id for x in PGRP.workers]
     end
@@ -950,7 +946,7 @@ end
 function id_in_procs(id)  # faster version of `id in procs()`
     if myid() == 1 || (PGRP.topology === :all_to_all && !isclusterlazy())
         for x in PGRP.workers
-            if (x.id::Int) == id && (isa(x, LocalProcess) || (x::Worker).state === W_CONNECTED)
+            if (x.id::Int) == id && (isa(x, LocalProcess) || (@atomic (x::Worker).state) === W_CONNECTED)
                 return true
             end
         end
@@ -972,7 +968,7 @@ Specifically all workers bound to the same ip-address as `pid` are returned.
 """
 function procs(pid::Integer)
     if myid() == 1
-        all_workers = [x for x in PGRP.workers if isa(x, LocalProcess) || (x.state === W_CONNECTED)]
+        all_workers = [x for x in PGRP.workers if isa(x, LocalProcess) || ((@atomic x.state) === W_CONNECTED)]
         if (pid == 1) || (isa(map_pid_wrkr[pid].manager, LocalManager))
             Int[x.id for x in filter(w -> (w.id==1) || (isa(w.manager, LocalManager)), all_workers)]
         else
@@ -1050,13 +1046,13 @@ function rmprocs(pids...; waitfor=typemax(Int))
     pids = vcat(pids...)
 
     if waitfor == 0
-        t = Threads.@spawn Threads.threadpool() _rmprocs(pids, typemax(Int))
+        t = @async _rmprocs(pids, typemax(Int))
         yield()
         return t
     else
         _rmprocs(pids, waitfor)
         # return a dummy task object that user code can wait on.
-        return Threads.@spawn Threads.threadpool() nothing
+        return @async nothing
     end
 end
@@ -1079,11 +1075,11 @@ function _rmprocs(pids, waitfor)
 
     start = time_ns()
     while (time_ns() - start) < waitfor*1e9
-        all(w -> w.state === W_TERMINATED, rmprocset) && break
+        all(w -> (@atomic w.state) === W_TERMINATED, rmprocset) && break
         sleep(min(0.1, waitfor - (time_ns() - start)/1e9))
     end
 
-    unremoved = [wrkr.id for wrkr in filter(w -> w.state !== W_TERMINATED, rmprocset)]
+    unremoved = [wrkr.id for wrkr in filter(w -> (@atomic w.state) !== W_TERMINATED, rmprocset)]
     if length(unremoved) > 0
         estr = string("rmprocs: pids ", unremoved, " not terminated after ", waitfor, " seconds.")
         throw(ErrorException(estr))
@@ -1239,7 +1235,7 @@ function interrupt(pids::AbstractVector=workers())
     @assert myid() == 1
     @sync begin
         for pid in pids
-            Threads.@spawn Threads.threadpool() interrupt(pid)
+            @async interrupt(pid)
         end
     end
 end
diff --git a/src/macros.jl b/src/macros.jl
index 5f3ce1e..a4fec31 100644
--- a/src/macros.jl
+++ b/src/macros.jl
@@ -230,7 +230,7 @@ function remotecall_eval(m::Module, procs, ex)
         # execute locally last as we do not want local execution to block serialization
         # of the request to remote nodes.
         for _ in 1:run_locally
-            Threads.@spawn Threads.threadpool() Core.eval(m, ex)
+            @async Core.eval(m, ex)
         end
     end
     nothing
@@ -275,7 +275,7 @@ function preduce(reducer, f, R)
 end
 
 function pfor(f, R)
-    t = Threads.@spawn Threads.threadpool() @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
+    t = @async @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
         @spawnat :any f(R, first(c), last(c))
     end
     errormonitor(t)
diff --git a/src/managers.jl b/src/managers.jl
index cf48c6f..a71f166 100644
--- a/src/managers.jl
+++ b/src/managers.jl
@@ -113,7 +113,7 @@ addprocs([
 
 * `exeflags`: additional flags passed to the worker processes. It can either be a `Cmd`, a `String`
   holding one flag, or a collection of strings, with one element per flag.
-  E.g. `\`--threads=auto project=.\``, `"--compile-trace=stderr"` or `["--threads=auto", "--compile=all"]`.
+  E.g. `\`--threads=auto project=.\``, `"--compile-trace=stderr"` or `["--threads=auto", "--compile=all"]`.
 
 * `topology`: Specifies how the workers connect to each other. Sending a message between
   unconnected workers results in an error.
@@ -178,7 +178,7 @@ function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy:
     # Wait for all launches to complete.
     @sync for (i, (machine, cnt)) in enumerate(manager.machines)
         let machine=machine, cnt=cnt
-            Threads.@spawn Threads.threadpool() try
+            @async try
                     launch_on_machine(manager, $machine, $cnt, params, launched, launch_ntfy)
                 catch e
                     print(stderr, "exception launching on machine $(machine) : $(e)\n")
@@ -740,16 +740,24 @@ function kill(manager::SSHManager, pid::Int, config::WorkerConfig)
     nothing
 end
 
-function kill(manager::LocalManager, pid::Int, config::WorkerConfig; exit_timeout = 15, term_timeout = 15)
+function kill(manager::LocalManager, pid::Int, config::WorkerConfig; profile_wait = 6, exit_timeout = 15, term_timeout = 15)
+    # profile_wait = 6 is 1s for profile, 5s for the report to show
     # First, try sending `exit()` to the remote over the usual control channels
     remote_do(exit, pid)
 
-    timer_task = Threads.@spawn Threads.threadpool() begin
+    timer_task = @async begin
         sleep(exit_timeout)
 
         # Check to see if our child exited, and if not, send an actual kill signal
         if !process_exited(config.process)
-            @warn("Failed to gracefully kill worker $(pid), sending SIGQUIT")
+            @warn "Failed to gracefully kill worker $(pid)"
+            profile_sig = Sys.iswindows() ? nothing : Sys.isbsd() ? ("SIGINFO", 29) : ("SIGUSR1" , 10)
+            if profile_sig !== nothing
+                @warn("Sending profile $(profile_sig[1]) to worker $(pid)")
+                kill(config.process, profile_sig[2])
+                sleep(profile_wait)
+            end
+            @warn("Sending SIGQUIT to worker $(pid)")
             kill(config.process, Base.SIGQUIT)
 
             sleep(term_timeout)
diff --git a/src/messages.jl b/src/messages.jl
index 70baa25..6e895f0 100644
--- a/src/messages.jl
+++ b/src/messages.jl
@@ -194,13 +194,13 @@ end
 
 function flush_gc_msgs()
     try
         for w in (PGRP::ProcessGroup).workers
-            if isa(w,Worker) && (w.state == W_CONNECTED) && w.gcflag
+            if isa(w,Worker) && ((@atomic w.state) == W_CONNECTED) && w.gcflag
                 flush_gc_msgs(w)
             end
         end
     catch e
         bt = catch_backtrace()
-        Threads.@spawn showerror(stderr, e, bt)
+        @async showerror(stderr, e, bt)
     end
 end
diff --git a/src/process_messages.jl b/src/process_messages.jl
index 15f5be6..a444651 100644
--- a/src/process_messages.jl
+++ b/src/process_messages.jl
@@ -85,7 +85,7 @@ function schedule_call(rid, thunk)
         rv = RemoteValue(def_rv_channel())
         (PGRP::ProcessGroup).refs[rid] = rv
         push!(rv.clientset, rid.whence)
-        errormonitor(Threads.@spawn run_work_thunk(rv, thunk))
+        errormonitor(@async run_work_thunk(rv, thunk))
         return rv
     end
 end
@@ -118,7 +118,7 @@ end
 
 ## message event handlers ##
 function process_messages(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool=true)
-    errormonitor(Threads.@spawn process_tcp_streams(r_stream, w_stream, incoming))
+    errormonitor(@async process_tcp_streams(r_stream, w_stream, incoming))
 end
 
 function process_tcp_streams(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool)
@@ -148,7 +148,7 @@ Julia version number to perform the authentication handshake.
 See also [`cluster_cookie`](@ref).
""" function process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true) - errormonitor(Threads.@spawn message_handler_loop(r_stream, w_stream, incoming)) + errormonitor(@async message_handler_loop(r_stream, w_stream, incoming)) end function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool) @@ -222,7 +222,7 @@ function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool) println(stderr, "Process($(myid())) - Unknown remote, closing connection.") elseif !(wpid in map_del_wrkr) werr = worker_from_id(wpid) - oldstate = werr.state + oldstate = @atomic werr.state set_worker_state(werr, W_TERMINATED) # If unhandleable error occurred talking to pid 1, exit @@ -283,7 +283,7 @@ function handle_msg(msg::CallMsg{:call}, header, r_stream, w_stream, version) schedule_call(header.response_oid, ()->invokelatest(msg.f, msg.args...; msg.kwargs...)) end function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, version) - errormonitor(Threads.@spawn begin + errormonitor(@async begin v = run_work_thunk(()->invokelatest(msg.f, msg.args...; msg.kwargs...), false) if isa(v, SyncTake) try @@ -299,7 +299,7 @@ function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, versi end function handle_msg(msg::CallWaitMsg, header, r_stream, w_stream, version) - errormonitor(Threads.@spawn begin + errormonitor(@async begin rv = schedule_call(header.response_oid, ()->invokelatest(msg.f, msg.args...; msg.kwargs...)) deliver_result(w_stream, :call_wait, header.notify_oid, fetch(rv.c)) nothing @@ -307,7 +307,7 @@ function handle_msg(msg::CallWaitMsg, header, r_stream, w_stream, version) end function handle_msg(msg::RemoteDoMsg, header, r_stream, w_stream, version) - errormonitor(Threads.@spawn run_work_thunk(()->invokelatest(msg.f, msg.args...; msg.kwargs...), true)) + errormonitor(@async run_work_thunk(()->invokelatest(msg.f, msg.args...; msg.kwargs...), true)) end function handle_msg(msg::ResultMsg, header, r_stream, w_stream, version) @@ -350,7 +350,7 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version) # The constructor registers the object with a global registry. Worker(rpid, ()->connect_to_peer(cluster_manager, rpid, wconfig)) else - Threads.@spawn connect_to_peer(cluster_manager, rpid, wconfig) + @async connect_to_peer(cluster_manager, rpid, wconfig) end end end diff --git a/src/remotecall.jl b/src/remotecall.jl index eda3899..e9bb95e 100644 --- a/src/remotecall.jl +++ b/src/remotecall.jl @@ -205,7 +205,7 @@ or to use a local [`Channel`](@ref) as a proxy: ```julia p = 1 f = Future(p) -errormonitor(Threads.@spawn put!(f, remotecall_fetch(long_computation, p))) +errormonitor(@async put!(f, remotecall_fetch(long_computation, p))) isready(f) # will not block ``` """ @@ -274,7 +274,7 @@ end const any_gc_flag = Threads.Condition() function start_gc_msgs_task() errormonitor( - Threads.@spawn begin + @async begin while true lock(any_gc_flag) do # this might miss events @@ -322,7 +322,7 @@ function process_worker(rr) msg = (remoteref_id(rr), myid()) # Needs to acquire a lock on the del_msg queue - T = Threads.@spawn Threads.threadpool() begin + T = @async begin publish_del_msg!($w, $msg) end Base.errormonitor(T) diff --git a/test/distributed_exec.jl b/test/distributed_exec.jl index 900a438..f0669b6 100644 --- a/test/distributed_exec.jl +++ b/test/distributed_exec.jl @@ -10,6 +10,8 @@ else @test startswith(pathof(Distributed), sharedir) end +pathsep = Sys.iswindows() ? 
";" : ":" + @test cluster_cookie() isa String include(joinpath(Sys.BINDIR, "..", "share", "julia", "test", "testenv.jl")) @@ -151,27 +153,6 @@ function poll_while(f::Function; timeout_seconds::Integer = 120) return true end -function _getenv_include_thread_unsafe() - environment_variable_name = "JULIA_TEST_INCLUDE_THREAD_UNSAFE" - default_value = "false" - environment_variable_value = strip(get(ENV, environment_variable_name, default_value)) - b = parse(Bool, environment_variable_value)::Bool - return b -end -const _env_include_thread_unsafe = _getenv_include_thread_unsafe() -function include_thread_unsafe_tests() - if Threads.maxthreadid() > 1 - if _env_include_thread_unsafe - return true - end - msg = "Skipping a thread-unsafe test because `Threads.maxthreadid() > 1`" - @warn msg Threads.maxthreadid() - Test.@test_broken false - return false - end - return true -end - # Distributed GC tests for Futures function test_futures_dgc(id) f = remotecall(myid, id) @@ -227,29 +208,44 @@ put!(f, :OK) @test remotecall_fetch(k->haskey(Distributed.PGRP.refs, k), wid1, fid) == false @test fetch(f) === :OK -# RemoteException should be thrown on a put! when another process has set the value -f = Future(wid1) -fid = remoteref_id(f) +# Test this multiple times as races have been seen where `@spawn` was used over +# `@async`. Issue #124 +max_attempts = 100 +for i in 1:max_attempts + let f = Future(wid1), fid = remoteref_id(f), fstore = RemoteChannel(wid2) + # RemoteException should be thrown on a put! when another process has set the value -fstore = RemoteChannel(wid2) -put!(fstore, f) # send f to wid2 -put!(f, :OK) # set value from master + put!(fstore, f) # send f to wid2 + put!(f, :OK) # set value from master -@test remotecall_fetch(k->haskey(Distributed.PGRP.refs, k), wid1, fid) == true + @test remotecall_fetch(k->haskey(Distributed.PGRP.refs, k), wid1, fid) == true -testval = remotecall_fetch(wid2, fstore) do x - try - put!(fetch(x), :OK) - return 0 - catch e - if isa(e, RemoteException) - return 1 - else - return 2 + # fstore should be ready immediately, but races due to use of `@spawn` have caused + # this to fail in the past. So we poll for readiness before the main test after this + # which internally checks for `isready` to decide whether to error or not + w = remotecall_fetch(wid2, fstore) do x + timedwait(() -> isready(fetch(x)), 10) + end + w == :ok || @info "isready timed out on attempt $i (max $max_attempts)" + @test w == :ok + + # This is the actual test. It should fail because the value is already set remotely + testval = remotecall_fetch(wid2, fstore) do x + try + put!(fetch(x), :OK) + return 0 + catch e + if isa(e, RemoteException) + return 1 + else + rethrow() + end + end end + testval == 1 || @info "test failed on attempt $i (max $max_attempts)" + @test testval == 1 end end -@test testval == 1 # Issue number #25847 @everywhere function f25847(ref) @@ -294,16 +290,16 @@ let wid1 = workers()[1], fstore = RemoteChannel(wid2) put!(fstore, rr) - if include_thread_unsafe_tests() - # timedwait() is necessary because wid1 is asynchronously informed of - # the existence of rr/rrid through the call to `put!(fstore, rr)`. - @test timedwait(() -> remotecall_fetch(k -> haskey(Distributed.PGRP.refs, k), wid1, rrid), 10) === :ok - end + + # timedwait() is necessary because wid1 is asynchronously informed of + # the existence of rr/rrid through the call to `put!(fstore, rr)`. 
+    @test timedwait(() -> remotecall_fetch(k -> haskey(Distributed.PGRP.refs, k), wid1, rrid), 10) === :ok
+
     finalize(rr) # finalize locally
     yield() # flush gc msgs
-    if include_thread_unsafe_tests()
-        @test remotecall_fetch(k -> haskey(Distributed.PGRP.refs, k), wid1, rrid) == true
-    end
+
+    @test timedwait(() -> remotecall_fetch(k -> haskey(Distributed.PGRP.refs, k), wid1, rrid), 10) === :ok
+
     remotecall_fetch(r -> (finalize(take!(r)); yield(); nothing), wid2, fstore) # finalize remotely
     sleep(0.5) # to ensure that wid2 messages have been executed on wid1
     @test poll_while(() -> remotecall_fetch(k -> haskey(Distributed.PGRP.refs, k), wid1, rrid))
@@ -1755,18 +1751,17 @@ function reuseport_tests()
     end
 
     # Ensure that the code has indeed been successfully executed everywhere
-    @test all(in(results), procs())
+    return all(in(results), procs())
 end
 
 # Test that the client port is reused. SO_REUSEPORT may not be supported on
 # all UNIX platforms, Linux kernels prior to 3.9 and older versions of OSX
 @assert nprocs() == 1
 addprocs_with_testenv(4; lazy=false)
-if ccall(:jl_has_so_reuseport, Int32, ()) == 1
-    reuseport_tests()
-else
-    @info "SO_REUSEPORT is unsupported, skipping reuseport tests"
-end
+
+skip_reuseport = ccall(:jl_has_so_reuseport, Int32, ()) != 1
+skip_reuseport && @debug "SO_REUSEPORT support missing, reuseport_tests skipped"
+@test reuseport_tests() skip = skip_reuseport
 
 # issue #27933
 a27933 = :_not_defined_27933
@@ -1840,9 +1835,11 @@ let julia = `$(Base.julia_cmd()) --startup-file=no`; mktempdir() do tmp
     project = mkdir(joinpath(tmp, "project"))
     depots = [mkdir(joinpath(tmp, "depot1")), mkdir(joinpath(tmp, "depot2"))]
     load_path = [mkdir(joinpath(tmp, "load_path")), "@stdlib", "@"]
-    pathsep = Sys.iswindows() ? ";" : ":"
+    shipped_depots = DEPOT_PATH[2:end] # stdlib caches
+
     env = Dict(
-        "JULIA_DEPOT_PATH" => join(depots, pathsep),
+        # needs a trailing pathsep to access the stdlib depot
+        "JULIA_DEPOT_PATH" => join(depots, pathsep) * pathsep,
         "JULIA_LOAD_PATH" => join(load_path, pathsep),
         # Explicitly propagate `TMPDIR`, in the event that we're running on a
        # CI system where `TMPDIR` is special.
@@ -1872,7 +1869,7 @@ let julia = `$(Base.julia_cmd()) --startup-file=no`; mktempdir() do tmp
         end
     """
     cmd = setenv(`$(julia) -p1 -e $(testcode * extracode)`, env)
-    @test success(cmd)
+    @test success(pipeline(cmd; stdout, stderr))
     # --project
     extracode = """
         for w in workers()
@@ -1881,11 +1878,11 @@ let julia = `$(Base.julia_cmd()) --startup-file=no`; mktempdir() do tmp
         end
     """
     cmd = setenv(`$(julia) --project=$(project) -p1 -e $(testcode * extracode)`, env)
-    @test success(cmd)
+    @test success(pipeline(cmd; stdout, stderr))
     # JULIA_PROJECT
     cmd = setenv(`$(julia) -p1 -e $(testcode * extracode)`,
                  (env["JULIA_PROJECT"] = project; env))
-    @test success(cmd)
+    @test success(pipeline(cmd; stdout, stderr))
     # Pkg.activate(...)
activateish = """ Base.ACTIVE_PROJECT[] = $(repr(project)) @@ -1893,11 +1890,17 @@ let julia = `$(Base.julia_cmd()) --startup-file=no`; mktempdir() do tmp addprocs(1) """ cmd = setenv(`$(julia) -e $(activateish * testcode * extracode)`, env) - @test success(cmd) + @test success(pipeline(cmd; stdout, stderr)) # JULIA_(LOAD|DEPOT)_PATH shufflecode = """ - d = reverse(DEPOT_PATH) - append!(empty!(DEPOT_PATH), d) + function reverse_first_two(depots) + custom_depots = depots[1:2] + standard_depots = depots[3:end] + custom_depots = reverse(custom_depots) + return append!(custom_depots, standard_depots) + end + new_depots = reverse_first_two(DEPOT_PATH) + append!(empty!(DEPOT_PATH), new_depots) l = reverse(LOAD_PATH) append!(empty!(LOAD_PATH), l) """ @@ -1908,27 +1911,27 @@ let julia = `$(Base.julia_cmd()) --startup-file=no`; mktempdir() do tmp extracode = """ for w in workers() @test remotecall_fetch(load_path, w) == $(repr(reverse(load_path))) - @test remotecall_fetch(depot_path, w) == $(repr(reverse(depots))) + @test remotecall_fetch(depot_path, w) == $(repr(vcat(reverse(depots), shipped_depots))) end """ cmd = setenv(`$(julia) -e $(shufflecode * addcode * testcode * extracode)`, env) - @test success(cmd) + @test success(pipeline(cmd; stdout, stderr)) # Mismatch when shuffling after proc addition failcode = shufflecode * setupcode * """ for w in workers() @test remotecall_fetch(load_path, w) == reverse(LOAD_PATH) == $(repr(load_path)) - @test remotecall_fetch(depot_path, w) == reverse(DEPOT_PATH) == $(repr(depots)) + @test remotecall_fetch(depot_path, w) == reverse_first_two(DEPOT_PATH) == $(repr(vcat(depots, shipped_depots))) end """ cmd = setenv(`$(julia) -p1 -e $(failcode)`, env) - @test success(cmd) + @test success(pipeline(cmd; stdout, stderr)) # Passing env or exeflags to addprocs(...) to override defaults envcode = """ using Distributed project = mktempdir() env = Dict( "JULIA_LOAD_PATH" => string(LOAD_PATH[1], $(repr(pathsep)), "@stdlib"), - "JULIA_DEPOT_PATH" => DEPOT_PATH[1], + "JULIA_DEPOT_PATH" => DEPOT_PATH[1] * $(repr(pathsep)), "TMPDIR" => ENV["TMPDIR"], ) addprocs(1; env = env, exeflags = `--project=\$(project)`) @@ -1936,14 +1939,14 @@ let julia = `$(Base.julia_cmd()) --startup-file=no`; mktempdir() do tmp addprocs(1; env = env) """ * setupcode * """ for w in workers() - @test remotecall_fetch(depot_path, w) == [DEPOT_PATH[1]] + @test remotecall_fetch(depot_path, w) == vcat(DEPOT_PATH[1], $(repr(shipped_depots))) @test remotecall_fetch(load_path, w) == [LOAD_PATH[1], "@stdlib"] @test remotecall_fetch(active_project, w) == project @test remotecall_fetch(Base.active_project, w) == joinpath(project, "Project.toml") end """ cmd = setenv(`$(julia) -e $(envcode)`, env) - @test success(cmd) + @test success(pipeline(cmd; stdout, stderr)) end end include("splitrange.jl") @@ -1959,7 +1962,7 @@ begin # Next, ensure we get a log message when a worker does not cleanly exit w = only(addprocs(1)) - @test_logs (:warn, r"sending SIGQUIT") begin + @test_logs (:warn, r"Sending SIGQUIT") match_mode=:any begin remote_do(w) do # Cause the 'exit()' message that `rmprocs()` sends to do nothing Core.eval(Base, :(exit() = nothing)) diff --git a/test/runtests.jl b/test/runtests.jl index d34d07c..085bd18 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,14 +1,14 @@ # This file is a part of Julia. License is MIT: https://julialang.org/license +using Test + # Run the distributed test outside of the main driver since it needs its own # set of dedicated workers. 
 include(joinpath(Sys.BINDIR, "..", "share", "julia", "test", "testenv.jl"))
 disttestfile = joinpath(@__DIR__, "distributed_exec.jl")
 
-cmd = `$test_exename $test_exeflags $disttestfile`
-
-if !success(pipeline(cmd; stdout=stdout, stderr=stderr)) && ccall(:jl_running_on_valgrind,Cint,()) == 0
-    error("Distributed test failed, cmd : $cmd")
+@testset let cmd = `$test_exename $test_exeflags $disttestfile`
+    @test success(pipeline(cmd; stdout=stdout, stderr=stderr)) || ccall(:jl_running_on_valgrind, Cint, ()) != 0
 end
 
 include("managers.jl")
diff --git a/test/threads.jl b/test/threads.jl
index 385c91f..c6d8d92 100644
--- a/test/threads.jl
+++ b/test/threads.jl
@@ -1,5 +1,5 @@
 using Test
-using Distributed, Base.Threads
+using Distributed
 using Base.Iterators: product
 
 exeflags = ("--startup-file=no",
@@ -12,7 +12,7 @@ function call_on(f, wid, tid)
         t = Task(f)
         ccall(:jl_set_task_tid, Cvoid, (Any, Cint), t, tid - 1)
         schedule(t)
-        @assert threadid(t) == tid
+        @assert Threads.threadid(t) == tid
         t
     end
 end
@@ -27,12 +27,12 @@ isfailed(rr) = fetch_from_owner(istaskfailed, rr)
 
 @testset "RemoteChannel allows put!/take! from thread other than 1" begin
     ws = ts = product(1:2, 1:2)
+
+    # We want (the default) laziness, so that we wait for `Worker.c_state`!
+    procs_added = addprocs(2; exeflags, lazy=true)
+
     @testset "from worker $w1 to $w2 via 1" for (w1, w2) in ws
         @testset "from thread $w1.$t1 to $w2.$t2" for (t1, t2) in ts
-            # We want (the default) laziness, so that we wait for `Worker.c_state`!
-            procs_added = addprocs(2; exeflags, lazy=true)
-            @everywhere procs_added using Base.Threads
-
             p1 = procs_added[w1]
             p2 = procs_added[w2]
             chan_id = first(procs_added)
@@ -57,8 +57,8 @@ isfailed(rr) = fetch_from_owner(istaskfailed, rr)
 
             @test !isfailed(send)
             @test !isfailed(recv)
-
-            rmprocs(procs_added)
         end
     end
+
+    rmprocs(procs_added)
 end
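
Note (not part of the patch): the hunks above revert `Threads.@spawn` to `@async`, read `Worker.state` only through `@atomic`, and replace the hand-rolled sleep/notify wait in `wait_for_conn` with `timedwait`. The sketch below is illustrative only — `FakeState`, `FakeWorker`, `set_state!`, and `wait_connected` are hypothetical names, not the real `Worker` definition — and shows, under those assumptions, how an `@atomic` struct field and `timedwait` compose into the waiting pattern the patch adopts.

```julia
# Sketch only: hypothetical types/functions, not Distributed's actual Worker.
@enum FakeState F_CREATED F_CONNECTED F_TERMINATED

mutable struct FakeWorker
    id::Int
    @atomic state::FakeState   # declared atomic, so every access must go through @atomic
end

# Publish a state change with an atomic store so concurrent readers never see a torn value.
set_state!(w::FakeWorker, s::FakeState) = @atomic w.state = s

# Poll the atomic field until it reads F_CONNECTED or the timeout elapses,
# mirroring the timedwait-based wait introduced in `wait_for_conn`.
function wait_connected(w::FakeWorker, timeout_s::Real)
    if timedwait(() -> (@atomic w.state) === F_CONNECTED, timeout_s) === :timed_out
        error("worker $(w.id) did not connect within $timeout_s seconds")
    end
    nothing
end

# Usage: flip the state from a helper task, wait for it from the current task.
w = FakeWorker(7, F_CREATED)
t = @async (sleep(0.2); set_state!(w, F_CONNECTED))  # stands in for the connection handshake
wait_connected(w, 5.0)
wait(t)
```

The switch back to `@async` keeps these helper tasks sticky to the thread of the task that created them, which is what the in-diff comments ("Must be `@async`. See FIXME above", "races have been seen where `@spawn` was used") refer to.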