Skip to content

Commit

Permalink
Ensure the compiler is notified immediately after async
Browse files Browse the repository at this point in the history
  • Loading branch information
josevalim committed Jan 18, 2024
1 parent 9a2fb13 commit 0da0b28
Showing 1 changed file with 31 additions and 30 deletions.
61 changes: 31 additions & 30 deletions lib/elixir/lib/kernel/parallel_compiler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,36 @@ defmodule Kernel.ParallelCompiler do
# TODO: Deprecate this on Elixir v1.20.
@doc deprecated: "Use `pmap/2` instead"
def async(fun) when is_function(fun, 0) do
{ref, task} = inner_async(fun)
send(task.pid, ref)
task
end

defp inner_async(fun) do
case :erlang.get(:elixir_compiler_info) do
{compiler_pid, file_pid} ->
ref = make_ref()
file = :erlang.get(:elixir_compiler_file)
dest = :erlang.get(:elixir_compiler_dest)

{:error_handler, error_handler} = :erlang.process_info(self(), :error_handler)
{_parent, checker} = Module.ParallelChecker.get()

Task.async(fn ->
send(compiler_pid, {:async, self()})
Module.ParallelChecker.put(compiler_pid, checker)
:erlang.put(:elixir_compiler_info, {compiler_pid, file_pid})
:erlang.put(:elixir_compiler_file, file)
dest != :undefined and :erlang.put(:elixir_compiler_dest, dest)
:erlang.process_flag(:error_handler, error_handler)
fun.()
end)
task =
Task.async(fn ->
Module.ParallelChecker.put(compiler_pid, checker)
:erlang.put(:elixir_compiler_info, {compiler_pid, file_pid})
:erlang.put(:elixir_compiler_file, file)
dest != :undefined and :erlang.put(:elixir_compiler_dest, dest)
:erlang.process_flag(:error_handler, error_handler)

receive do
^ref -> fun.()
end
end)

send(compiler_pid, {:async, task.pid})
{ref, task}

:undefined ->
raise ArgumentError,
Expand All @@ -52,41 +65,29 @@ defmodule Kernel.ParallelCompiler do
"""
@doc since: "1.16.0"
def pmap(collection, fun) when is_function(fun, 1) do
parent = self()
ref = make_ref()

# We spawn a series of tasks for parallel processing.
# The tasks notify themselves to the compiler.
tasks =
# The tasks are waiting until we give the go ahead.
refs_tasks =
Enum.map(collection, fn item ->
async(fn ->
send(parent, {ref, self()})

receive do
^ref -> fun.(item)
end
end)
inner_async(fn -> fun.(item) end)
end)

# Then the tasks notify us. This is important because if
# we wait before the tasks notify the compiler, we may be
# released as there is nothing else running.
on =
for %{pid: pid} <- tasks do
receive do
{^ref, ^pid} -> pid
end
end

# Notify the compiler we are waiting on the tasks.
{compiler_pid, file_pid} = :erlang.get(:elixir_compiler_info)
defining = :elixir_module.compiler_modules()
on = Enum.map(refs_tasks, fn {_ref, %{pid: pid}} -> pid end)
send(compiler_pid, {:waiting, :pmap, self(), ref, file_pid, on, defining, :raise})

# Now we allow the tasks to run. This step is not strictly
# necessary but it makes compilation more deterministic by
# only allowing tasks to run once we are waiting.
Enum.each(on, &send(&1, ref))
tasks =
Enum.map(refs_tasks, fn {ref, task} ->
send(task.pid, ref)
task
end)

# Await tasks and notify the compiler they are done. We could
# have the tasks report directly to the compiler, which in turn
Expand Down

0 comments on commit 0da0b28

Please sign in to comment.