Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Document the behavior of async_stream_nolink #13285

Merged
merged 3 commits into from
Jan 26, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion lib/elixir/lib/task/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ defmodule Task.Supervisor do
build_stream(supervisor, :nolink, enumerable, {module, function, args}, options)
end

@doc """
@doc ~S"""
Returns a stream that runs the given `function` concurrently on each
element in `enumerable`.

Expand All @@ -414,6 +414,38 @@ defmodule Task.Supervisor do
to `async_nolink/3`.

See `async_stream/6` for discussion and examples.

## Error handling and cleanup

Even if tasks are not linked to the caller, there is no risk of leaving dangling tasks
running after the stream halts.

Consider the following example:

Task.Supervisor.async_stream_nolink(MySupervisor, collection, fun, on_timeout: :kill_task, ordered: false)
|> Enum.each(fn
{:ok, _} -> :ok
{:exit, reason} -> raise "Task exited: #{Exception.format_exit(reason)}"
end)

If one task raises or times out:

1. the second clause gets called
2. an exception is raised
3. the stream halts
4. all ongoing tasks will be shut down

Here is another example:

Task.Supervisor.async_stream_nolink(MySupervisor, collection, fun, on_timeout: :kill_task, ordered: false)
|> Stream.filter(&match?({:ok, _}, &1))
|> Enum.take(3)

This will return the three first tasks to succeed, ignoring timeouts and errors, and shut down
every ongoing task.

Just running the stream with `Stream.run/1` on the other hand would ignore errors and process the whole stream.

"""
@doc since: "1.4.0"
@spec async_stream_nolink(Supervisor.supervisor(), Enumerable.t(), (term -> term), keyword) ::
Expand Down
Loading