diff --git a/lib/consumer.ex b/lib/consumer.ex index ffb2101d..93b0213e 100644 --- a/lib/consumer.ex +++ b/lib/consumer.ex @@ -312,22 +312,43 @@ defmodule Mediasoup.Consumer do NifWrap.def_handle_call_nif(%{ closed?: &Nif.consumer_closed/1, - dump: &Nif.consumer_dump/1, paused?: &Nif.consumer_paused/1, producer_paused?: &Nif.consumer_producer_paused/1, priority: &Nif.consumer_priority/1, score: &Nif.consumer_score/1, preferred_layers: &Nif.consumer_preferred_layers/1, - current_layers: &Nif.consumer_current_layers/1, - get_stats: &Nif.consumer_get_stats/1, - pause: &Nif.consumer_pause/1, - resume: &Nif.consumer_resume/1, - set_preferred_layers: &Nif.consumer_set_preferred_layers/2, - set_priority: &Nif.consumer_set_priority/2, - unset_priority: &Nif.consumer_unset_priority/1, - request_key_frame: &Nif.consumer_request_key_frame/1 + current_layers: &Nif.consumer_current_layers/1 }) + NifWrap.def_handle_call_async_nif(%{ + set_preferred_layers: &Nif.consumer_set_preferred_layers_async/3, + set_priority: &Nif.consumer_set_priority_async/3, + request_key_frame: &Nif.consumer_request_key_frame_async/2, + unset_priority: &Nif.consumer_unset_priority_async/2, + dump: &Nif.consumer_dump_async/2, + get_stats: &Nif.consumer_get_stats_async/2, + pause: &Nif.consumer_pause_async/2, + resume: &Nif.consumer_resume_async/2 + }) + + @impl true + def handle_info( + {:mediasoup_async_nif_result, {func, from}, result}, + state + ) do + reply = + case func do + func when func in [:set_priority, :dump, :get_stats] -> + Nif.unwrap_ok(result) + + _ -> + result + end + + GenServer.reply(from, reply) + {:noreply, state} + end + @impl true def handle_info({:on_close}, state) do {:stop, :normal, state} diff --git a/lib/nif.ex b/lib/nif.ex index 2099b0b3..261a222b 100644 --- a/lib/nif.ex +++ b/lib/nif.ex @@ -52,68 +52,6 @@ defmodule Mediasoup.Nif do ## worker with async defp create_worker_async(), do: :erlang.nif_error(:nif_not_loaded) defp create_worker_async(_option), do: :erlang.nif_error(:nif_not_loaded) - defp worker_create_router_async(_worker, _option), do: :erlang.nif_error(:nif_not_loaded) - defp worker_dump_async(_worker), do: :erlang.nif_error(:nif_not_loaded) - defp worker_update_settings_async(_worker, _option), do: :erlang.nif_error(:nif_not_loaded) - - defp worker_create_webrtc_server_async(_worker, _option), - do: :erlang.nif_error(:nif_not_loaded) - - ## router with async - defp router_create_pipe_transport_async( - _reference, - _option - ), - do: :erlang.nif_error(:nif_not_loaded) - - defp router_create_webrtc_transport_async(_router, _option), - do: :erlang.nif_error(:nif_not_loaded) - - defp router_create_plain_transport_async(_router, _option), - do: :erlang.nif_error(:nif_not_loaded) - - defp router_dump_async(_router), do: :erlang.nif_error(:nif_not_loaded) - - ## webrtc_server with async - defp webrtc_server_dump_async(_transport), do: :erlang.nif_error(:nif_not_loaded) - - ## pipe_transport with async - defp pipe_transport_get_stats_async(_transport), do: :erlang.nif_error(:nif_not_loaded) - defp pipe_transport_dump_async(_transport), do: :erlang.nif_error(:nif_not_loaded) - defp pipe_transport_consume_async(_transport, _option), do: :erlang.nif_error(:nif_not_loaded) - defp pipe_transport_connect_async(_transport, _option), do: :erlang.nif_error(:nif_not_loaded) - defp pipe_transport_produce_async(_transport, _option), do: :erlang.nif_error(:nif_not_loaded) - - defp pipe_transport_set_max_incoming_bitrate_async(_transport, _bitrate), - do: :erlang.nif_error(:nif_not_loaded) - - defp pipe_transport_consume_data_async(_transport, _option), - do: :erlang.nif_error(:nif_not_loaded) - - defp pipe_transport_produce_data_async(_transport, _option), - do: :erlang.nif_error(:nif_not_loaded) - - ## webrtc_transport with async - defp webrtc_transport_get_stats_async(_transport), do: :erlang.nif_error(:nif_not_loaded) - defp webrtc_transport_dump_async(_transport), do: :erlang.nif_error(:nif_not_loaded) - defp webrtc_transport_restart_ice_async(_transport), do: :erlang.nif_error(:nif_not_loaded) - - defp webrtc_transport_set_max_incoming_bitrate_async(_transport, _bitrate), - do: :erlang.nif_error(:nif_not_loaded) - - defp webrtc_transport_set_max_outgoing_bitrate_async(_transport, _bitrate), - do: :erlang.nif_error(:nif_not_loaded) - - defp webrtc_transport_consume_async(_transport, _option), do: :erlang.nif_error(:nif_not_loaded) - - defp webrtc_transport_consume_data_async(_transport, _option), - do: :erlang.nif_error(:nif_not_loaded) - - defp webrtc_transport_connect_async(_transport, _option), do: :erlang.nif_error(:nif_not_loaded) - defp webrtc_transport_produce_async(_transport, _option), do: :erlang.nif_error(:nif_not_loaded) - - defp webrtc_transport_produce_data_async(_transport, _option), - do: :erlang.nif_error(:nif_not_loaded) # plain transport ## properties @@ -123,35 +61,23 @@ defmodule Mediasoup.Nif do def plain_transport_sctp_state(_transport), do: :erlang.nif_error(:nif_not_loaded) ## methods - ### plain transport with async - defp plain_transport_connect_async(_transport, _option), do: :erlang.nif_error(:nif_not_loaded) - defp plain_transport_dump_async(_transport), do: :erlang.nif_error(:nif_not_loaded) - defp plain_transport_get_stats_async(_transport), do: :erlang.nif_error(:nif_not_loaded) - defp plain_transport_produce_async(_transport, _option), do: :erlang.nif_error(:nif_not_loaded) - defp plain_transport_consume_async(_transport, _option), do: :erlang.nif_error(:nif_not_loaded) ### plain tranasport call - @spec plain_transport_connect(reference, any) :: {:ok} | {:error, String.t()} - def plain_transport_connect(transport, option), - do: plain_transport_connect_async(transport, option) |> handle_async_nif_result() + def plain_transport_connect_async(_transport, _option, _from), + do: :erlang.nif_error(:nif_not_loaded) @spec plain_transport_id(reference) :: String.t() def plain_transport_id(_transport), do: :erlang.nif_error(:nif_not_loaded) - @spec plain_transport_dump(reference) :: {:ok} | {:error, String.t()} - def plain_transport_dump(transport), - do: plain_transport_dump_async(transport) |> handle_async_nif_result() |> unwrap_ok() - @spec plain_transport_get_stats(reference) :: {:ok} | {:error, String.t()} - def plain_transport_get_stats(transport), - do: plain_transport_get_stats_async(transport) |> handle_async_nif_result() |> unwrap_ok() + def plain_transport_dump_async(_transport, _from), do: :erlang.nif_error(:nif_not_loaded) - @spec plain_transport_produce(reference, any) :: {:ok, reference()} | {:error, String.t()} - def plain_transport_produce(transport, option), - do: plain_transport_produce_async(transport, option) |> handle_async_nif_result() + def plain_transport_get_stats_async(_transport, _from), do: :erlang.nif_error(:nif_not_loaded) + + def plain_transport_produce_async(_transport, _option, _from), + do: :erlang.nif_error(:nif_not_loaded) - @spec plain_transport_consume(reference, any) :: {:ok, reference()} | {:error, String.t()} - def plain_transport_consume(transport, option), - do: plain_transport_consume_async(transport, option) |> handle_async_nif_result() + def plain_transport_consume_async(_transport, _option, _from), + do: :erlang.nif_error(:nif_not_loaded) @spec plain_transport_close(reference) :: {:ok} | {:error} def plain_transport_close(_transport), do: :erlang.nif_error(:nif_not_loaded) @@ -164,25 +90,6 @@ defmodule Mediasoup.Nif do def plain_transport_event(_transport, _pid, _event_types), do: :erlang.nif_error(:nif_not_loaded) - ## consumer with async - defp consumer_get_stats_async(_consumer), do: :erlang.nif_error(:nif_not_loaded) - defp consumer_pause_async(_consumer), do: :erlang.nif_error(:nif_not_loaded) - defp consumer_resume_async(_consumer), do: :erlang.nif_error(:nif_not_loaded) - - defp consumer_set_preferred_layers_async(_consumer, _referred_layers), - do: :erlang.nif_error(:nif_not_loaded) - - defp consumer_set_priority_async(_consumer, _priority), do: :erlang.nif_error(:nif_not_loaded) - defp consumer_unset_priority_async(_consumer), do: :erlang.nif_error(:nif_not_loaded) - defp consumer_request_key_frame_async(_consumer), do: :erlang.nif_error(:nif_not_loaded) - defp consumer_dump_async(_consumer), do: :erlang.nif_error(:nif_not_loaded) - - ## producer with async - defp producer_pause_async(_producer), do: :erlang.nif_error(:nif_not_loaded) - defp producer_resume_async(_producer), do: :erlang.nif_error(:nif_not_loaded) - defp producer_get_stats_async(_producer), do: :erlang.nif_error(:nif_not_loaded) - defp producer_dump_async(_producer), do: :erlang.nif_error(:nif_not_loaded) - # construct worker def create_worker(), do: create_worker_async() |> handle_async_nif_result() def create_worker(option), do: create_worker_async(option) |> handle_async_nif_result() @@ -191,9 +98,10 @@ defmodule Mediasoup.Nif do def worker_global_count(), do: :erlang.nif_error(:nif_not_loaded) # worker - @spec worker_create_router(reference, Router.create_option()) :: {:ok, reference()} | {:error} - def worker_create_router(worker, option), - do: worker_create_router_async(worker, option) |> handle_async_nif_result() + @spec worker_create_router_async(reference, Router.create_option(), term()) :: + :ok | :error + def worker_create_router_async(_worker, _option, _from), + do: :erlang.nif_error(:nif_not_loaded) @spec worker_id(reference) :: String.t() def worker_id(_worker), do: :erlang.nif_error(:nif_not_loaded) @@ -203,16 +111,18 @@ defmodule Mediasoup.Nif do def worker_event(_worker, _pid, _event_types), do: :erlang.nif_error(:nif_not_loaded) @spec worker_closed(reference) :: boolean def worker_closed(_worker), do: :erlang.nif_error(:nif_not_loaded) - @spec worker_update_settings(reference, Worker.update_option()) :: {:ok} | {:error} - def worker_update_settings(worker, option), - do: worker_update_settings_async(worker, option) |> handle_async_nif_result() - def worker_create_webrtc_server(worker, option), - do: worker_create_webrtc_server_async(worker, option) |> handle_async_nif_result() + @spec worker_update_settings_async(reference, Worker.update_option(), GenServer.from()) :: + {:ok} | {:error} + def worker_update_settings_async(_worker, _option, _from), + do: :erlang.nif_error(:nif_not_loaded) - @spec worker_dump(reference) :: map | {:error} - def worker_dump(worker), - do: worker_dump_async(worker) |> handle_async_nif_result() |> unwrap_ok() + def worker_create_webrtc_server_async(_worker, _option, _from), + do: :erlang.nif_error(:nif_not_loaded) + + @spec worker_dump_async(reference, term()) :: map | {:error} + def worker_dump_async(_worker, _from), + do: :erlang.nif_error(:nif_not_loaded) # router @spec router_id(reference) :: String.t() @@ -222,21 +132,18 @@ defmodule Mediasoup.Nif do def router_closed(_router), do: :erlang.nif_error(:nif_not_loaded) - def router_create_pipe_transport( - router, - option + def router_create_pipe_transport_async( + _reference, + _option, + _from ), - do: router_create_pipe_transport_async(router, option) |> handle_async_nif_result() + do: :erlang.nif_error(:nif_not_loaded) - @spec router_create_webrtc_transport(reference, map) :: - {:ok, reference()} | {:error, String.t()} - def router_create_webrtc_transport(router, option), - do: router_create_webrtc_transport_async(router, option) |> handle_async_nif_result() + def router_create_webrtc_transport_async(_router, _option, _from), + do: :erlang.nif_error(:nif_not_loaded) - @spec router_create_plain_transport(reference, map) :: - {:ok, reference()} | {:error, String.t()} - def router_create_plain_transport(router, option), - do: router_create_plain_transport_async(router, option) |> handle_async_nif_result() + def router_create_plain_transport_async(_router, _option, _from), + do: :erlang.nif_error(:nif_not_loaded) @spec router_can_consume(reference, String.t(), Router.rtpCapabilities()) :: boolean def router_can_consume(_router, _producer_id, _rtp_capabilities), @@ -247,9 +154,7 @@ defmodule Mediasoup.Nif do @spec router_event(reference, pid, [atom()]) :: {:ok} | {:error} def router_event(_router, _pid, _event_types), do: :erlang.nif_error(:nif_not_loaded) - @spec router_dump(reference) :: any - def router_dump(router), - do: router_dump_async(router) |> handle_async_nif_result() |> unwrap_ok() + def router_dump_async(_router, _from), do: :erlang.nif_error(:nif_not_loaded) # webrtc_server @spec webrtc_server_id(reference) :: String.t() @@ -258,9 +163,8 @@ defmodule Mediasoup.Nif do def webrtc_server_close(_server), do: :erlang.nif_error(:nif_not_loaded) @spec webrtc_server_closed(reference) :: boolean def webrtc_server_closed(_server), do: :erlang.nif_error(:nif_not_loaded) - @spec webrtc_server_dump(reference) :: boolean - def webrtc_server_dump(server), - do: webrtc_server_dump_async(server) |> handle_async_nif_result() |> unwrap_ok() + + def webrtc_server_dump_async(_server, _from), do: :erlang.nif_error(:nif_not_loaded) # webrtc_transport @spec webrtc_transport_id(reference) :: String.t() @@ -270,60 +174,58 @@ defmodule Mediasoup.Nif do @spec webrtc_transport_closed(reference) :: boolean def webrtc_transport_closed(_transport), do: :erlang.nif_error(:nif_not_loaded) - @spec webrtc_transport_consume(reference, any) :: {:ok, reference()} | {:error, String.t()} - def webrtc_transport_consume(transport, option), - do: webrtc_transport_consume_async(transport, option) |> handle_async_nif_result() + @spec webrtc_transport_consume_async(reference, any, term) :: + {:ok, reference()} | {:error, String.t()} + def webrtc_transport_consume_async(_transport, _option, _from), + do: :erlang.nif_error(:nif_not_loaded) - @spec webrtc_transport_consume_data(reference, any) :: {:ok, reference()} | {:error, String.t()} - def webrtc_transport_consume_data(transport, option), - do: webrtc_transport_consume_data_async(transport, option) |> handle_async_nif_result() + @spec webrtc_transport_consume_data_async(reference, any, term) :: + {:ok, reference()} | {:error, String.t()} + def webrtc_transport_consume_data_async(_transport, _option, _from), + do: :erlang.nif_error(:nif_not_loaded) - @spec webrtc_transport_connect(reference, any) :: {:ok} | {:error, String.t()} - def webrtc_transport_connect(transport, option), - do: webrtc_transport_connect_async(transport, option) |> handle_async_nif_result() + @spec webrtc_transport_connect_async(reference, any, term) :: {:ok} | {:error, String.t()} + def webrtc_transport_connect_async(_transport, _option, _from), + do: :erlang.nif_error(:nif_not_loaded) - @spec webrtc_transport_produce(reference, any) :: {:ok, reference()} | {:error, String.t()} - def webrtc_transport_produce(transport, option), - do: webrtc_transport_produce_async(transport, option) |> handle_async_nif_result() + @spec webrtc_transport_produce_async(reference, any, term) :: + {:ok, reference()} | {:error, String.t()} + def webrtc_transport_produce_async(_transport, _option, _from), + do: :erlang.nif_error(:nif_not_loaded) - @spec webrtc_transport_produce_data(reference, any) :: {:ok, reference()} | {:error, String.t()} - def webrtc_transport_produce_data(transport, option), - do: webrtc_transport_produce_data_async(transport, option) |> handle_async_nif_result() + @spec webrtc_transport_produce_data_async(reference, any, term) :: + {:ok, reference()} | {:error, String.t()} + def webrtc_transport_produce_data_async(_transport, _option, _from), + do: :erlang.nif_error(:nif_not_loaded) def webrtc_transport_ice_parameters(_transport), do: :erlang.nif_error(:nif_not_loaded) def webrtc_transport_ice_candidates(_transport), do: :erlang.nif_error(:nif_not_loaded) def webrtc_transport_ice_role(_transport), do: :erlang.nif_error(:nif_not_loaded) def webrtc_transport_sctp_parameters(_transport), do: :erlang.nif_error(:nif_not_loaded) - def webrtc_transport_set_max_incoming_bitrate(transport, bitrate), - do: - webrtc_transport_set_max_incoming_bitrate_async(transport, bitrate) - |> handle_async_nif_result() + def webrtc_transport_set_max_incoming_bitrate_async(_transport, _bitrate, _from), + do: :erlang.nif_error(:nif_not_loaded) - def webrtc_transport_set_max_outgoing_bitrate(transport, bitrate), - do: - webrtc_transport_set_max_outgoing_bitrate_async(transport, bitrate) - |> handle_async_nif_result() + def webrtc_transport_set_max_outgoing_bitrate_async(_transport, _bitrate, _from), + do: :erlang.nif_error(:nif_not_loaded) def webrtc_transport_ice_state(_transport), do: :erlang.nif_error(:nif_not_loaded) - def webrtc_transport_restart_ice(transport), - do: webrtc_transport_restart_ice_async(transport) |> handle_async_nif_result() + def webrtc_transport_restart_ice_async(_transport, _from), + do: :erlang.nif_error(:nif_not_loaded) def webrtc_transport_ice_selected_tuple(_transport), do: :erlang.nif_error(:nif_not_loaded) def webrtc_transport_dtls_parameters(_transport), do: :erlang.nif_error(:nif_not_loaded) def webrtc_transport_dtls_state(_transport), do: :erlang.nif_error(:nif_not_loaded) def webrtc_transport_sctp_state(_transport), do: :erlang.nif_error(:nif_not_loaded) - def webrtc_transport_get_stats(transport), - do: webrtc_transport_get_stats_async(transport) |> handle_async_nif_result() |> unwrap_ok() + def webrtc_transport_get_stats_async(_transport, _from), do: :erlang.nif_error(:nif_not_loaded) @spec webrtc_transport_event(reference, pid, [atom()]) :: {:ok} | {:error} def webrtc_transport_event(_transport, _pid, _event_types), do: :erlang.nif_error(:nif_not_loaded) - def webrtc_transport_dump(transport), - do: webrtc_transport_dump_async(transport) |> handle_async_nif_result() |> unwrap_ok() + def webrtc_transport_dump_async(_transport, _from), do: :erlang.nif_error(:nif_not_loaded) # pipe_transport def pipe_transport_id(_transport), do: :erlang.nif_error(:nif_not_loaded) @@ -331,36 +233,32 @@ defmodule Mediasoup.Nif do @spec pipe_transport_closed(reference) :: boolean def pipe_transport_closed(_transport), do: :erlang.nif_error(:nif_not_loaded) - def pipe_transport_consume(transport, option), - do: pipe_transport_consume_async(transport, option) |> handle_async_nif_result() + def pipe_transport_consume_async(_transport, _option, _from), + do: :erlang.nif_error(:nif_not_loaded) - def pipe_transport_consume_data(transport, option), - do: pipe_transport_consume_data_async(transport, option) |> handle_async_nif_result() + def pipe_transport_consume_data_async(_transport, _option, _from), + do: :erlang.nif_error(:nif_not_loaded) - def pipe_transport_connect(transport, option), - do: pipe_transport_connect_async(transport, option) |> handle_async_nif_result() + def pipe_transport_connect_async(_transport, _option, _from), + do: :erlang.nif_error(:nif_not_loaded) - def pipe_transport_produce(transport, option), - do: pipe_transport_produce_async(transport, option) |> handle_async_nif_result() + def pipe_transport_produce_async(_transport, _option, _from), + do: :erlang.nif_error(:nif_not_loaded) - def pipe_transport_set_max_incoming_bitrate(transport, bitrate), - do: - pipe_transport_set_max_incoming_bitrate_async(transport, bitrate) - |> handle_async_nif_result() + def pipe_transport_set_max_incoming_bitrate_async(_transport, _bitrate, _from), + do: :erlang.nif_error(:nif_not_loaded) - def pipe_transport_produce_data(transport, option), - do: pipe_transport_produce_data_async(transport, option) |> handle_async_nif_result() + def pipe_transport_produce_data_async(_transport, _option, _from), + do: :erlang.nif_error(:nif_not_loaded) - def pipe_transport_get_stats(transport), - do: pipe_transport_get_stats_async(transport) |> handle_async_nif_result() |> unwrap_ok() + def pipe_transport_get_stats_async(_transport, _from), do: :erlang.nif_error(:nif_not_loaded) def pipe_transport_tuple(_transport), do: :erlang.nif_error(:nif_not_loaded) def pipe_transport_sctp_parameters(_transport), do: :erlang.nif_error(:nif_not_loaded) def pipe_transport_sctp_state(_transport), do: :erlang.nif_error(:nif_not_loaded) def pipe_transport_srtp_parameters(_transport), do: :erlang.nif_error(:nif_not_loaded) - def pipe_transport_dump(transport), - do: pipe_transport_dump_async(transport) |> handle_async_nif_result() |> unwrap_ok() + def pipe_transport_dump_async(_transport, _from), do: :erlang.nif_error(:nif_not_loaded) def pipe_transport_event(_transport, _pid, _event_types), do: :erlang.nif_error(:nif_not_loaded) @@ -392,27 +290,22 @@ defmodule Mediasoup.Nif do def consumer_preferred_layers(_consumer), do: :erlang.nif_error(:nif_not_loaded) def consumer_current_layers(_consumer), do: :erlang.nif_error(:nif_not_loaded) - def consumer_get_stats(consumer), - do: consumer_get_stats_async(consumer) |> handle_async_nif_result() |> unwrap_ok() + def consumer_get_stats_async(_consumer, _from), do: :erlang.nif_error(:nif_not_loaded) - def consumer_pause(consumer), do: consumer_pause_async(consumer) |> handle_async_nif_result() - def consumer_resume(consumer), do: consumer_resume_async(consumer) |> handle_async_nif_result() + def consumer_pause_async(_consumer, _from), do: :erlang.nif_error(:nif_not_loaded) + def consumer_resume_async(_consumer, _from), do: :erlang.nif_error(:nif_not_loaded) - def consumer_set_preferred_layers(consumer, referred_layers), - do: - consumer_set_preferred_layers_async(consumer, referred_layers) |> handle_async_nif_result() + def consumer_set_preferred_layers_async(_consumer, _preferred_layers, _from), + do: :erlang.nif_error(:nif_not_loaded) - def consumer_set_priority(consumer, priority), - do: consumer_set_priority_async(consumer, priority) |> handle_async_nif_result() + def consumer_set_priority_async(_consumer, _priority, _from), + do: :erlang.nif_error(:nif_not_loaded) - def consumer_unset_priority(consumer), - do: consumer_unset_priority_async(consumer) |> handle_async_nif_result() + def consumer_unset_priority_async(_consumer, _from), do: :erlang.nif_error(:nif_not_loaded) - def consumer_request_key_frame(consumer), - do: consumer_request_key_frame_async(consumer) |> handle_async_nif_result() + def consumer_request_key_frame_async(_consumer, _from), do: :erlang.nif_error(:nif_not_loaded) - def consumer_dump(consumer), - do: consumer_dump_async(consumer) |> handle_async_nif_result() |> unwrap_ok() + def consumer_dump_async(_consumer, _from), do: :erlang.nif_error(:nif_not_loaded) # data_consumer @spec data_consumer_id(reference) :: String.t() @@ -445,10 +338,8 @@ defmodule Mediasoup.Nif do def producer_rtp_parameters(_producer), do: :erlang.nif_error(:nif_not_loaded) @spec producer_close(reference) :: {:ok} | {:error} def producer_close(_consumer), do: :erlang.nif_error(:nif_not_loaded) - @spec producer_pause(reference) :: {:ok} | {:error} - def producer_pause(producer), do: producer_pause_async(producer) |> handle_async_nif_result() - @spec producer_resume(reference) :: {:ok} | {:error} - def producer_resume(producer), do: producer_resume_async(producer) |> handle_async_nif_result() + def producer_pause_async(_producer, _from), do: :erlang.nif_error(:nif_not_loaded) + def producer_resume_async(_producer, _from), do: :erlang.nif_error(:nif_not_loaded) @spec producer_closed(reference) :: boolean() def producer_closed(_producer), do: :erlang.nif_error(:nif_not_loaded) @@ -457,15 +348,13 @@ defmodule Mediasoup.Nif do @spec producer_score(reference) :: list() | {:error} def producer_score(_producer), do: :erlang.nif_error(:nif_not_loaded) - @spec producer_get_stats(reference) :: list() | {:error} - def producer_get_stats(producer), - do: producer_get_stats_async(producer) |> handle_async_nif_result() |> unwrap_ok() + + def producer_get_stats_async(_producer, _from), do: :erlang.nif_error(:nif_not_loaded) @spec producer_event(reference, pid, [atom()]) :: {:ok} | {:error} def producer_event(_producer, _pid, _event_types), do: :erlang.nif_error(:nif_not_loaded) - def producer_dump(producer), - do: producer_dump_async(producer) |> handle_async_nif_result() |> unwrap_ok() + def producer_dump_async(_producer, _from), do: :erlang.nif_error(:nif_not_loaded) # data_producer @spec data_producer_id(reference) :: String.t() @@ -502,6 +391,7 @@ defmodule Mediasoup.Nif do end end - defp unwrap_ok({:ok, result}), do: result - defp unwrap_ok(result), do: result + def unwrap_ok({:ok, {}}), do: {:ok} + def unwrap_ok({:ok, result}), do: result + def unwrap_ok(result), do: result end diff --git a/lib/pipe_transport.ex b/lib/pipe_transport.ex index f127d5ee..bd63b071 100644 --- a/lib/pipe_transport.ex +++ b/lib/pipe_transport.ex @@ -283,6 +283,7 @@ defmodule Mediasoup.PipeTransport do GenServer.start_link(__MODULE__, %{reference: reference}, opt) end + @impl true def init(state) do Process.flag(:trap_exit, true) {:ok, supervisor} = DynamicSupervisor.start_link(strategy: :one_for_one) @@ -290,6 +291,7 @@ defmodule Mediasoup.PipeTransport do {:ok, Map.put(state, :supervisor, supervisor)} end + @impl true def handle_call( {:event, [listener, event_types]}, _from, @@ -314,63 +316,49 @@ defmodule Mediasoup.PipeTransport do NifWrap.def_handle_call_nif(%{ close: &Nif.pipe_transport_close/1, closed?: &Nif.pipe_transport_closed/1, - dump: &Nif.pipe_transport_dump/1, - get_stats: &Nif.pipe_transport_get_stats/1, sctp_state: &Nif.pipe_transport_sctp_state/1, - connect: &Nif.pipe_transport_connect/2, tuple: &Nif.pipe_transport_tuple/1, sctp_parameters: &Nif.pipe_transport_sctp_parameters/1, srtp_parameters: &Nif.pipe_transport_srtp_parameters/1 }) - def handle_call( - {:produce, [option]}, - _from, - %{reference: reference, supervisor: supervisor} = state - ) do - ret = - Nif.pipe_transport_produce(reference, option) - |> NifWrap.handle_create_result(Producer, supervisor) - - {:reply, ret, state} - end - - def handle_call( - {:produce_data, [option]}, - _from, - %{reference: reference, supervisor: supervisor} = state - ) do - ret = - Nif.pipe_transport_produce_data(reference, option) - |> NifWrap.handle_create_result(DataProducer, supervisor) - - {:reply, ret, state} - end + NifWrap.def_handle_call_async_nif(%{ + connect: &Nif.pipe_transport_connect_async/3, + dump: &Nif.pipe_transport_dump_async/2, + get_stats: &Nif.pipe_transport_get_stats_async/2, + produce: &Nif.pipe_transport_produce_async/3, + produce_data: &Nif.pipe_transport_produce_data_async/3, + consume: &Nif.pipe_transport_consume_async/3, + consume_data: &Nif.pipe_transport_consume_data_async/3 + }) - def handle_call( - {:consume, [option]}, - _from, - %{reference: reference, supervisor: supervisor} = state - ) do - ret = - Nif.pipe_transport_consume(reference, option) - |> NifWrap.handle_create_result(Consumer, supervisor) + def handle_info( + {:mediasoup_async_nif_result, {message_tag, from}, result}, + %{supervisor: supervisor} = state + ) + when message_tag in [:produce, :consume, :produce_data, :consume_data] do + module = + case message_tag do + :produce -> Producer + :consume -> Consumer + :produce_data -> DataProducer + :consume_data -> DataConsumer + end - {:reply, ret, state} + GenServer.reply(from, NifWrap.handle_create_result(result, module, supervisor)) + {:noreply, state} end - def handle_call( - {:consume_data, [option]}, - _from, - %{reference: reference, supervisor: supervisor} = state + @impl true + def handle_info( + {:mediasoup_async_nif_result, {_, from}, result}, + state ) do - ret = - Nif.pipe_transport_consume_data(reference, option) - |> NifWrap.handle_create_result(DataConsumer, supervisor) - - {:reply, ret, state} + GenServer.reply(from, result |> Nif.unwrap_ok()) + {:noreply, state} end + @impl true def terminate(reason, %{reference: reference, supervisor: supervisor} = _state) do DynamicSupervisor.stop(supervisor, reason) Nif.pipe_transport_close(reference) diff --git a/lib/plain_transport.ex b/lib/plain_transport.ex index 2ba5ede7..309fa54e 100644 --- a/lib/plain_transport.ex +++ b/lib/plain_transport.ex @@ -209,6 +209,7 @@ defmodule Mediasoup.PlainTransport do consume(transport, Consumer.Options.from_map(option)) end + @impl true def terminate(reason, %{reference: reference, supervisor: supervisor} = _state) do DynamicSupervisor.stop(supervisor, reason) Nif.plain_transport_close(reference) @@ -223,15 +224,20 @@ defmodule Mediasoup.PlainTransport do sctp_state: &Nif.plain_transport_sctp_state/1, srtp_parameters: &Nif.plain_transport_srtp_parameters/1, # methods - connect: &Nif.plain_transport_connect/2, - dump: &Nif.plain_transport_dump/1, - get_stats: &Nif.plain_transport_get_stats/1, close: &Nif.plain_transport_close/1, closed?: &Nif.plain_transport_closed/1, # events event: &Nif.plain_transport_event/3 }) + NifWrap.def_handle_call_async_nif(%{ + connect: &Nif.plain_transport_connect_async/3, + dump: &Nif.plain_transport_dump_async/2, + get_stats: &Nif.plain_transport_get_stats_async/2, + produce: &Nif.plain_transport_produce_async/3, + consume: &Nif.plain_transport_consume_async/3 + }) + # Mediasoup Plain Transport Events # https://mediasoup.org/documentation/v3/mediasoup/api/#PlainTransport-events @@ -265,6 +271,7 @@ defmodule Mediasoup.PlainTransport do GenServer.start_link(__MODULE__, %{reference: reference}, opt) end + @impl true def init(state) do Process.flag(:trap_exit, true) {:ok, supervisor} = DynamicSupervisor.start_link(strategy: :one_for_one) @@ -292,27 +299,27 @@ defmodule Mediasoup.PlainTransport do {:reply, struct_from_pid_and_ref(self(), reference), state} end - def handle_call( - {:produce, [option]}, - _from, - %{reference: reference, supervisor: supervisor} = state - ) do - ret = - Nif.plain_transport_produce(reference, option) - |> NifWrap.handle_create_result(Producer, supervisor) - - {:reply, ret, state} + def handle_info( + {:mediasoup_async_nif_result, {message_tag, from}, result}, + %{supervisor: supervisor} = state + ) + when message_tag in [:produce, :consume] do + module = + case message_tag do + :produce -> Producer + :consume -> Consumer + end + + GenServer.reply(from, NifWrap.handle_create_result(result, module, supervisor)) + {:noreply, state} end - def handle_call( - {:consume, [option]}, - _from, - %{reference: reference, supervisor: supervisor} = state + @impl true + def handle_info( + {:mediasoup_async_nif_result, {_, from}, result}, + state ) do - ret = - Nif.plain_transport_consume(reference, option) - |> NifWrap.handle_create_result(Consumer, supervisor) - - {:reply, ret, state} + GenServer.reply(from, result |> Nif.unwrap_ok()) + {:noreply, state} end end diff --git a/lib/producer.ex b/lib/producer.ex index 87fc1b83..26394d3a 100644 --- a/lib/producer.ex +++ b/lib/producer.ex @@ -182,11 +182,13 @@ defmodule Mediasoup.Producer do GenServer.start_link(__MODULE__, %{reference: reference}, opt) end + @impl true def init(state) do Process.flag(:trap_exit, true) {:ok, state} end + @impl true def handle_call( {:event, [listener, event_types]}, _from, @@ -200,6 +202,7 @@ defmodule Mediasoup.Producer do {:reply, result, state} end + @impl true def handle_call( {:struct_from_pid, _arg}, _from, @@ -210,28 +213,52 @@ defmodule Mediasoup.Producer do NifWrap.def_handle_call_nif(%{ closed?: &Nif.producer_closed/1, - dump: &Nif.producer_dump/1, paused?: &Nif.producer_paused/1, - score: &Nif.producer_score/1, - get_stats: &Nif.producer_get_stats/1, - pause: &Nif.producer_pause/1, - resume: &Nif.producer_resume/1 + score: &Nif.producer_score/1 }) + NifWrap.def_handle_call_async_nif(%{ + pause: &Nif.producer_pause_async/2, + resume: &Nif.producer_resume_async/2, + get_stats: &Nif.producer_get_stats_async/2, + dump: &Nif.producer_dump_async/2 + }) + + @impl true + def handle_info( + {:mediasoup_async_nif_result, nil, _}, + state + ) do + {:noreply, state} + end + + @impl true + def handle_info( + {:mediasoup_async_nif_result, {_, from}, result}, + state + ) do + GenServer.reply(from, result |> Nif.unwrap_ok()) + {:noreply, state} + end + + @impl true def handle_info({:on_resume}, %{reference: reference} = state) do - Nif.producer_resume(reference) + Nif.producer_resume_async(reference, nil) {:noreply, state} end + @impl true def handle_info({:on_pause}, %{reference: reference} = state) do - Nif.producer_pause(reference) + Nif.producer_pause_async(reference, nil) {:noreply, state} end + @impl true def handle_info({:on_close}, state) do {:stop, :normal, state} end + @impl true def terminate(_reason, %{reference: reference} = _state) do Nif.producer_close(reference) :ok diff --git a/lib/router.ex b/lib/router.ex index 6245f110..1f4961be 100644 --- a/lib/router.ex +++ b/lib/router.ex @@ -324,6 +324,7 @@ defmodule Mediasoup.Router do GenServer.start_link(__MODULE__, %{reference: reference}, opt) end + @impl true def init(state) do Process.flag(:trap_exit, true) {:ok, supervisor} = DynamicSupervisor.start_link(strategy: :one_for_one) @@ -331,6 +332,7 @@ defmodule Mediasoup.Router do {:ok, Map.put(state, :supervisor, supervisor)} end + @impl true def handle_call( {:event, [listener, event_types]}, _from, @@ -354,27 +356,21 @@ defmodule Mediasoup.Router do NifWrap.def_handle_call_nif(%{ closed?: &Nif.router_closed/1, - dump: &Nif.router_dump/1, can_consume?: &Nif.router_can_consume/3, rtp_capabilities: &Nif.router_rtp_capabilities/1 }) - def handle_call( - {:create_pipe_transport, [option]}, - _from, - %{reference: reference, supervisor: supervisor} = state - ) do - ret = - Nif.router_create_pipe_transport(reference, option) - |> NifWrap.handle_create_result(PipeTransport, supervisor) - - {:reply, ret, state} - end + NifWrap.def_handle_call_async_nif(%{ + dump: &Nif.router_dump_async/2, + create_pipe_transport: &Nif.router_create_pipe_transport_async/3, + create_plain_transport: &Nif.router_create_plain_transport_async/3 + }) + @impl true def handle_call( {:create_webrtc_transport, [option]}, - _from, - %{reference: reference, supervisor: supervisor} = state + from, + %{reference: reference} = state ) do option = Map.update(option, :webrtc_server, nil, fn webrtc_server -> @@ -385,23 +381,14 @@ defmodule Mediasoup.Router do end end) - ret = - Nif.router_create_webrtc_transport(reference, option) - |> NifWrap.handle_create_result(WebRtcTransport, supervisor) - - {:reply, ret, state} - end - - def handle_call( - {:create_plain_transport, [option]}, - _from, - %{reference: reference, supervisor: supervisor} = state - ) do - ret = - Nif.router_create_plain_transport(reference, option) - |> NifWrap.handle_create_result(PlainTransport, supervisor) - - {:reply, ret, state} + case Nif.router_create_webrtc_transport_async( + reference, + option, + {:create_webrtc_transport, from} + ) do + :ok -> {:noreply, state} + error -> {:reply, error, state} + end end def handle_call({:get_node}, _from, state) do @@ -435,6 +422,36 @@ defmodule Mediasoup.Router do {:reply, :ok, Map.put(state, :mapped_pipe_transports, %{id => pair})} end + def handle_info( + {:mediasoup_async_nif_result, {operation, from}, result}, + %{supervisor: supervisor} = state + ) + when operation in [ + :create_pipe_transport, + :create_plain_transport, + :create_webrtc_transport + ] do + module = + case operation do + :create_pipe_transport -> PipeTransport + :create_plain_transport -> PlainTransport + :create_webrtc_transport -> WebRtcTransport + end + + GenServer.reply(from, NifWrap.handle_create_result(result, module, supervisor)) + {:noreply, state} + end + + @impl true + def handle_info( + {:mediasoup_async_nif_result, {_, from}, result}, + state + ) do + GenServer.reply(from, result |> Nif.unwrap_ok()) + {:noreply, state} + end + + @impl true def terminate(reason, %{reference: reference, supervisor: supervisor} = _state) do DynamicSupervisor.stop(supervisor, reason) Nif.router_close(reference) diff --git a/lib/webrtc_server.ex b/lib/webrtc_server.ex index 305a6d4c..47d3215d 100644 --- a/lib/webrtc_server.ex +++ b/lib/webrtc_server.ex @@ -112,6 +112,7 @@ defmodule Mediasoup.WebRtcServer do GenServer.start_link(__MODULE__, %{reference: reference}, opt) end + @impl true def init(state) do Process.flag(:trap_exit, true) {:ok, supervisor} = DynamicSupervisor.start_link(strategy: :one_for_one) @@ -121,8 +122,11 @@ defmodule Mediasoup.WebRtcServer do NifWrap.def_handle_call_nif(%{ close: &Nif.webrtc_server_close/1, - closed?: &Nif.webrtc_server_closed/1, - dump: &Nif.webrtc_server_dump/1 + closed?: &Nif.webrtc_server_closed/1 + }) + + NifWrap.def_handle_call_async_nif(%{ + dump: &Nif.webrtc_server_dump_async/2 }) def handle_call( @@ -132,4 +136,13 @@ defmodule Mediasoup.WebRtcServer do ) do {:reply, reference, state} end + + @impl true + def handle_info( + {:mediasoup_async_nif_result, {_, from}, result}, + state + ) do + GenServer.reply(from, result |> Nif.unwrap_ok()) + {:noreply, state} + end end diff --git a/lib/webrtc_transport.ex b/lib/webrtc_transport.ex index 76236ac6..37b70a15 100644 --- a/lib/webrtc_transport.ex +++ b/lib/webrtc_transport.ex @@ -452,6 +452,7 @@ defmodule Mediasoup.WebRtcTransport do GenServer.start_link(__MODULE__, %{reference: reference}, opt) end + @impl true def init(state) do Process.flag(:trap_exit, true) {:ok, supervisor} = DynamicSupervisor.start_link(strategy: :one_for_one) @@ -459,6 +460,7 @@ defmodule Mediasoup.WebRtcTransport do {:ok, Map.put(state, :supervisor, supervisor)} end + @impl true def handle_call( {:event, [listener, event_types]}, _from, @@ -472,6 +474,7 @@ defmodule Mediasoup.WebRtcTransport do {:reply, result, state} end + @impl true def handle_call( {:struct_from_pid, _arg}, _from, @@ -483,71 +486,73 @@ defmodule Mediasoup.WebRtcTransport do NifWrap.def_handle_call_nif(%{ close: &Nif.webrtc_transport_close/1, closed?: &Nif.webrtc_transport_closed/1, - dump: &Nif.webrtc_transport_dump/1, - get_stats: &Nif.webrtc_transport_get_stats/1, sctp_state: &Nif.webrtc_transport_sctp_state/1, - connect: &Nif.webrtc_transport_connect/2, ice_parameters: &Nif.webrtc_transport_ice_parameters/1, ice_candidates: &Nif.webrtc_transport_ice_candidates/1, ice_role: &Nif.webrtc_transport_ice_role/1, ice_state: &Nif.webrtc_transport_ice_state/1, - restart_ice: &Nif.webrtc_transport_restart_ice/1, ice_selected_tuple: &Nif.webrtc_transport_ice_selected_tuple/1, sctp_parameters: &Nif.webrtc_transport_sctp_parameters/1, - set_max_incoming_bitrate: &Nif.webrtc_transport_set_max_incoming_bitrate/2, - set_max_outgoing_bitrate: &Nif.webrtc_transport_set_max_outgoing_bitrate/2, dtls_parameters: &Nif.webrtc_transport_dtls_parameters/1, dtls_state: &Nif.webrtc_transport_dtls_state/1 }) - def handle_call( - {:produce, [option]}, - _from, - %{reference: reference, supervisor: supervisor} = state - ) do - ret = - Nif.webrtc_transport_produce(reference, option) - |> NifWrap.handle_create_result(Producer, supervisor) - - {:reply, ret, state} - end - - def handle_call( - {:produce_data, [option]}, - _from, - %{reference: reference, supervisor: supervisor} = state - ) do - ret = - Nif.webrtc_transport_produce_data(reference, option) - |> NifWrap.handle_create_result(DataProducer, supervisor) - - {:reply, ret, state} - end + NifWrap.def_handle_call_async_nif(%{ + produce: &Nif.webrtc_transport_produce_async/3, + consume: &Nif.webrtc_transport_consume_async/3, + produce_data: &Nif.webrtc_transport_produce_data_async/3, + consume_data: &Nif.webrtc_transport_consume_data_async/3, + connect: &Nif.webrtc_transport_connect_async/3, + get_stats: &Nif.webrtc_transport_get_stats_async/2, + dump: &Nif.webrtc_transport_dump_async/2, + set_max_incoming_bitrate: &Nif.webrtc_transport_set_max_incoming_bitrate_async/3, + set_max_outgoing_bitrate: &Nif.webrtc_transport_set_max_outgoing_bitrate_async/3, + restart_ice: &Nif.webrtc_transport_restart_ice_async/2 + }) - def handle_call( - {:consume, [option]}, - _from, - %{reference: reference, supervisor: supervisor} = state - ) do - ret = - Nif.webrtc_transport_consume(reference, option) - |> NifWrap.handle_create_result(Consumer, supervisor) + def handle_info( + {:mediasoup_async_nif_result, {message_tag, from}, result}, + %{supervisor: supervisor} = state + ) + when message_tag in [:produce, :consume, :produce_data, :consume_data] do + module = + case message_tag do + :produce -> Producer + :consume -> Consumer + :produce_data -> DataProducer + :consume_data -> DataConsumer + end - {:reply, ret, state} + GenServer.reply(from, NifWrap.handle_create_result(result, module, supervisor)) + {:noreply, state} end - def handle_call( - {:consume_data, [option]}, - _from, - %{reference: reference, supervisor: supervisor} = state + @impl true + def handle_info( + {:mediasoup_async_nif_result, {unwrap_ok_func, from}, result}, + state + ) + when unwrap_ok_func in [ + :connect, + :dump, + :get_stats, + :set_max_incoming_bitrate, + :set_max_outgoing_bitrate + ] do + GenServer.reply(from, result |> Nif.unwrap_ok()) + {:noreply, state} + end + + @impl true + def handle_info( + {:mediasoup_async_nif_result, {_, from}, result}, + state ) do - ret = - Nif.webrtc_transport_consume_data(reference, option) - |> NifWrap.handle_create_result(DataConsumer, supervisor) - - {:reply, ret, state} + GenServer.reply(from, result) + {:noreply, state} end + @impl true def terminate(reason, %{reference: reference, supervisor: supervisor} = _state) do DynamicSupervisor.stop(supervisor, reason) Nif.webrtc_transport_close(reference) diff --git a/lib/worker.ex b/lib/worker.ex index 933d7c34..9e79502a 100644 --- a/lib/worker.ex +++ b/lib/worker.ex @@ -208,33 +208,47 @@ defmodule Mediasoup.Worker do end NifWrap.def_handle_call_nif(%{ - id: &Nif.worker_id/1, - update_settings: &Nif.worker_update_settings/2, - dump: &Nif.worker_dump/1 + id: &Nif.worker_id/1 }) - def handle_call( - {:create_router, [option]}, - _from, - %{reference: reference, supervisor: supervisor} = state + NifWrap.def_handle_call_async_nif(%{ + update_settings: &Nif.worker_update_settings_async/3, + create_router: &Nif.worker_create_router_async/3, + create_webrtc_server: &Nif.worker_create_webrtc_server_async/3, + dump: &Nif.worker_dump_async/2 + }) + + def handle_info( + {:mediasoup_async_nif_result, {:create_router, from}, result}, + %{supervisor: supervisor} = state ) do - ret = - Nif.worker_create_router(reference, option) - |> NifWrap.handle_create_result(Router, supervisor) + GenServer.reply(from, NifWrap.handle_create_result(result, Router, supervisor)) + {:noreply, state} + end - {:reply, ret, state} + def handle_info( + {:mediasoup_async_nif_result, {:create_webrtc_server, from}, result}, + %{supervisor: supervisor} = state + ) do + GenServer.reply(from, NifWrap.handle_create_result(result, WebRtcServer, supervisor)) + {:noreply, state} end - def handle_call( - {:create_webrtc_server, [option]}, - _from, - %{reference: reference, supervisor: supervisor} = state + def handle_info( + {:mediasoup_async_nif_result, {:dump, from}, result}, + state ) do - ret = - Nif.worker_create_webrtc_server(reference, option) - |> NifWrap.handle_create_result(WebRtcServer, supervisor) + GenServer.reply(from, result |> Nif.unwrap_ok()) - {:reply, ret, state} + {:noreply, state} + end + + def handle_info( + {:mediasoup_async_nif_result, {_, from}, result}, + state + ) do + GenServer.reply(from, result) + {:noreply, state} end def handle_call( diff --git a/lib/wrap.ex b/lib/wrap.ex index a91351da..66d8cbed 100644 --- a/lib/wrap.ex +++ b/lib/wrap.ex @@ -70,6 +70,25 @@ defmodule Mediasoup.NifWrap do end end + @spec def_handle_call_async_nif(any) :: + {:__block__, [], [{:@, [...], [...]} | {:def, [...], [...]}, ...]} + defmacro def_handle_call_async_nif(nif_call_map) do + quote do + @nif_map unquote(nif_call_map) + @nif_keylist Map.keys(@nif_map) + + def handle_call( + {function, arg}, + from, + %{reference: reference} = state + ) + when function in @nif_keylist do + result = apply(Map.fetch!(@nif_map, function), [reference | arg] ++ [{function, from}]) + {:noreply, state} + end + end + end + def handle_create_result(create_result, module, supervisor) do with {:ok, ref} <- create_result, {:ok, pid} <- diff --git a/native/mediasoup_elixir/.gitignore b/native/mediasoup_elixir/.gitignore index 35ee1524..e566a07b 100644 --- a/native/mediasoup_elixir/.gitignore +++ b/native/mediasoup_elixir/.gitignore @@ -1,6 +1,6 @@ # This file should only ignore things that are generated during a `x.py` build, # generated by common IDEs, and optional files controlled by the user that -# affect the build (such as config.toml). +# affect the build. # In particular, things like `mir_dump` should not be listed here; they are only # created during manual debugging and many people like to clean up instead of # having git ignore such leftovers. You can use `.git/info/exclude` to @@ -28,7 +28,6 @@ Session.vim !/src/test/run-make/thumb-none-qemu/example/.cargo ## Configuration -/config.toml /Makefile config.mk config.stamp diff --git a/native/mediasoup_elixir/src/consumer.rs b/native/mediasoup_elixir/src/consumer.rs index 2f0fbb0c..696f7401 100644 --- a/native/mediasoup_elixir/src/consumer.rs +++ b/native/mediasoup_elixir/src/consumer.rs @@ -1,12 +1,13 @@ -use crate::atoms; use crate::json_serde::JsonSerdeWrap; -use crate::{send_async_nif_result, send_msg_from_other_thread, DisposableResourceWrapper}; +use crate::{ + atoms, send_async_nif_result_with_from, send_msg_from_other_thread, DisposableResourceWrapper, +}; use mediasoup::consumer::{ Consumer, ConsumerId, ConsumerLayers, ConsumerOptions, ConsumerScore, ConsumerType, }; use mediasoup::producer::ProducerId; use mediasoup::rtp_parameters::{MediaKind, RtpCapabilities, RtpParameters}; -use rustler::{Atom, Env, NifResult, NifStruct, ResourceArc}; +use rustler::{Atom, Env, NifResult, NifStruct, ResourceArc, Term}; pub type ConsumerRef = DisposableResourceWrapper; #[rustler::resource_impl] @@ -98,10 +99,14 @@ pub fn consumer_current_layers( } #[rustler::nif(name = "consumer_get_stats_async")] -pub fn consumer_get_stats(env: Env, consumer: ResourceArc) -> NifResult<(Atom, Atom)> { +pub fn consumer_get_stats( + env: Env, + consumer: ResourceArc, + from: Term, +) -> NifResult { let consumer = consumer.get_resource()?; - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { consumer .get_stats() .await @@ -110,18 +115,22 @@ pub fn consumer_get_stats(env: Env, consumer: ResourceArc) -> NifRe }) } #[rustler::nif(name = "consumer_pause_async")] -pub fn consumer_pause(env: Env, consumer: ResourceArc) -> NifResult<(Atom, Atom)> { +pub fn consumer_pause(env: Env, consumer: ResourceArc, from: Term) -> NifResult { let consumer = consumer.get_resource()?; - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { consumer.pause().await.map_err(|error| format!("{}", error)) }) } #[rustler::nif(name = "consumer_resume_async")] -pub fn consumer_resume(env: Env, consumer: ResourceArc) -> NifResult<(Atom, Atom)> { +pub fn consumer_resume( + env: Env, + consumer: ResourceArc, + from: Term, +) -> NifResult { let consumer = consumer.get_resource()?; - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { consumer .resume() .await @@ -134,10 +143,11 @@ pub fn consumer_set_preferred_layers( env: Env, consumer: ResourceArc, layer: JsonSerdeWrap, -) -> NifResult<(Atom, Atom)> { + from: Term, +) -> NifResult { let consumer = consumer.get_resource()?; - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { consumer .set_preferred_layers(*layer) .await @@ -150,10 +160,11 @@ pub fn consumer_set_priority( env: Env, consumer: ResourceArc, priority: u8, -) -> NifResult<(Atom, Atom)> { + from: Term, +) -> NifResult { let consumer = consumer.get_resource()?; - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { consumer .set_priority(priority) .await @@ -164,10 +175,11 @@ pub fn consumer_set_priority( pub fn consumer_unset_priority( env: Env, consumer: ResourceArc, -) -> NifResult<(Atom, Atom)> { + from: Term, +) -> NifResult { let consumer = consumer.get_resource()?; - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { consumer .unset_priority() .await @@ -179,10 +191,11 @@ pub fn consumer_unset_priority( pub fn consumer_request_key_frame( env: Env, consumer: ResourceArc, -) -> NifResult<(Atom, Atom)> { + from: Term, +) -> NifResult { let consumer = consumer.get_resource()?; - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { consumer .request_key_frame() .await @@ -191,10 +204,10 @@ pub fn consumer_request_key_frame( } #[rustler::nif(name = "consumer_dump_async")] -pub fn consumer_dump(env: Env, consumer: ResourceArc) -> NifResult<(Atom, Atom)> { +pub fn consumer_dump(env: Env, consumer: ResourceArc, from: Term) -> NifResult { let consumer = consumer.get_resource()?; - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { consumer .dump() .await diff --git a/native/mediasoup_elixir/src/lib.rs b/native/mediasoup_elixir/src/lib.rs index 6eeb2892..bd31a37b 100644 --- a/native/mediasoup_elixir/src/lib.rs +++ b/native/mediasoup_elixir/src/lib.rs @@ -12,11 +12,13 @@ mod producer; mod resource; mod router; mod task; +mod term_box; mod webrtc_server; mod webrtc_transport; mod worker; use crate::resource::DisposableResourceWrapper; +use crate::term_box::TermBox; use futures_lite::future; use rustler::{Atom, Encoder, Env, LocalPid, NifResult, OwnedEnv}; @@ -59,4 +61,29 @@ where Ok((atoms::ok(), result_key)) } +pub fn send_async_nif_result_with_from( + env: Env, + from: rustler::Term, + future: Fut, +) -> NifResult +where + T: Encoder, + Fut: future::Future + Send + 'static, +{ + let pid = env.pid(); + let mut my_env = OwnedEnv::new(); + + let from = TermBox::new(from); + task::spawn(async move { + let result = future.await; + + let _ = my_env.send_and_clear(&pid, |env| { + (atoms::mediasoup_async_nif_result(), from.get(env), result).encode(env) + }); + }) + .detach(); + + Ok(atoms::ok()) +} + rustler::init!("Elixir.Mediasoup.Nif"); diff --git a/native/mediasoup_elixir/src/pipe_transport.rs b/native/mediasoup_elixir/src/pipe_transport.rs index 00870e4a..6fc6805f 100644 --- a/native/mediasoup_elixir/src/pipe_transport.rs +++ b/native/mediasoup_elixir/src/pipe_transport.rs @@ -1,11 +1,10 @@ -use crate::atoms; use crate::consumer::{ConsumerOptionsStruct, ConsumerRef}; use crate::data_consumer::{DataConsumerOptionsStruct, DataConsumerRef}; use crate::data_producer::{DataProducerOptionsStruct, DataProducerRef}; use crate::data_structure::SerNumSctpStreams; use crate::json_serde::JsonSerdeWrap; use crate::producer::{ProducerOptionsStruct, ProducerRef}; -use crate::{send_async_nif_result, DisposableResourceWrapper}; +use crate::{atoms, send_async_nif_result_with_from, DisposableResourceWrapper}; use mediasoup::data_structures::{ListenInfo, SctpState, TransportTuple}; use mediasoup::prelude::{ PipeTransport, PipeTransportOptions, PipeTransportRemoteParameters, Transport, @@ -14,7 +13,7 @@ use mediasoup::prelude::{ use mediasoup::sctp_parameters::SctpParameters; use mediasoup::srtp_parameters::SrtpParameters; -use rustler::{Atom, Env, NifResult, NifStruct, ResourceArc}; +use rustler::{Atom, Env, NifResult, NifStruct, ResourceArc, Term}; pub type PipeTransportRef = DisposableResourceWrapper; @@ -109,11 +108,12 @@ pub fn pipe_transport_consume( env: Env, transport: ResourceArc, option: ConsumerOptionsStruct, -) -> NifResult<(Atom, Atom)> { + from: Term, +) -> NifResult { let transport = transport.get_resource()?; let option = option.to_option(); - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { transport .consume(option) .await @@ -128,11 +128,12 @@ pub fn pipe_transport_consume_data( env: Env, transport: ResourceArc, option: DataConsumerOptionsStruct, -) -> NifResult<(Atom, Atom)> { + from: Term, +) -> NifResult { let transport = transport.get_resource()?; let option = option.to_option(); - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { transport .consume_data(option) .await @@ -147,12 +148,13 @@ pub fn pipe_transport_connect( env: Env, transport: ResourceArc, option: JsonSerdeWrap, -) -> NifResult<(Atom, Atom)> { + from: Term, +) -> NifResult { let transport = transport.get_resource()?; let option = option.clone(); - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { transport .connect(option) .await @@ -165,11 +167,12 @@ pub fn pipe_transport_produce( env: Env, transport: ResourceArc, option: ProducerOptionsStruct, -) -> NifResult<(Atom, Atom)> { + from: Term, +) -> NifResult { let transport = transport.get_resource()?; let option = option.to_option(); - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { transport .produce(option) .await @@ -184,11 +187,12 @@ pub fn pipe_transport_produce_data( env: Env, transport: ResourceArc, option: DataProducerOptionsStruct, -) -> NifResult<(Atom, Atom)> { + from: Term, +) -> NifResult { let transport = transport.get_resource()?; let option = option.to_option(); - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { transport .produce_data(option) .await @@ -202,10 +206,11 @@ pub fn pipe_transport_produce_data( pub fn pipe_transport_get_stats( env: Env, transport: ResourceArc, -) -> NifResult<(Atom, Atom)> { + from: Term, +) -> NifResult { let transport = transport.get_resource()?; - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { transport .get_stats() .await @@ -219,10 +224,11 @@ pub fn pipe_transport_set_max_incoming_bitrate( env: Env, transport: ResourceArc, bitrate: u32, -) -> NifResult<(Atom, Atom)> { + from: Term, +) -> NifResult { let transport = transport.get_resource()?; - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { transport .set_max_incoming_bitrate(bitrate) .await @@ -257,10 +263,11 @@ pub fn pipe_transport_srtp_parameters( pub fn pipe_transport_dump( env: Env, transport: ResourceArc, -) -> NifResult<(Atom, Atom)> { + from: Term, +) -> NifResult { let transport = transport.get_resource()?; - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { transport .dump() .await diff --git a/native/mediasoup_elixir/src/plain_transport.rs b/native/mediasoup_elixir/src/plain_transport.rs index 7f193054..34dd1b2f 100644 --- a/native/mediasoup_elixir/src/plain_transport.rs +++ b/native/mediasoup_elixir/src/plain_transport.rs @@ -2,7 +2,7 @@ use crate::consumer::{ConsumerOptionsStruct, ConsumerRef}; use crate::data_structure::SerNumSctpStreams; use crate::json_serde::JsonSerdeWrap; use crate::producer::{ProducerOptionsStruct, ProducerRef}; -use crate::{atoms, send_async_nif_result, DisposableResourceWrapper}; +use crate::{atoms, send_async_nif_result_with_from, DisposableResourceWrapper}; use mediasoup::consumer::ConsumerOptions; use mediasoup::data_structures::{ListenInfo, SctpState, TransportTuple}; use mediasoup::prelude::{ @@ -12,7 +12,7 @@ use mediasoup::prelude::{ use mediasoup::producer::ProducerOptions; use mediasoup::sctp_parameters::SctpParameters; use mediasoup::srtp_parameters::SrtpParameters; -use rustler::{Atom, Env, NifResult, NifStruct, ResourceArc}; +use rustler::{Atom, Env, NifResult, NifStruct, ResourceArc, Term}; pub type PlainTransportRef = DisposableResourceWrapper; @@ -108,11 +108,12 @@ pub fn plain_transport_connect( env: Env, transport: ResourceArc, option: JsonSerdeWrap, -) -> NifResult<(Atom, Atom)> { + from: Term, +) -> NifResult { let transport = transport.get_resource()?; let option: PlainTransportRemoteParameters = option.clone(); - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { transport .connect(option) .await @@ -124,10 +125,11 @@ pub fn plain_transport_connect( pub fn plain_transport_get_stats( env: Env, transport: ResourceArc, -) -> NifResult<(Atom, Atom)> { + from: Term, +) -> NifResult { let transport = transport.get_resource()?; - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { transport .get_stats() .await @@ -141,12 +143,13 @@ pub fn plain_transport_produce( env: Env, transport: ResourceArc, option: ProducerOptionsStruct, -) -> NifResult<(Atom, Atom)> { + from: Term, +) -> NifResult { let transport = transport.get_resource()?; let option: ProducerOptions = option.to_option(); - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { transport .produce(option) .await @@ -161,12 +164,13 @@ pub fn plain_transport_consume( env: Env, transport: ResourceArc, option: ConsumerOptionsStruct, -) -> NifResult<(Atom, Atom)> { + from: Term, +) -> NifResult { let transport = transport.get_resource()?; let option: ConsumerOptions = option.to_option(); - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { transport .consume(option) .await diff --git a/native/mediasoup_elixir/src/producer.rs b/native/mediasoup_elixir/src/producer.rs index bfe66ddf..0100e0ec 100644 --- a/native/mediasoup_elixir/src/producer.rs +++ b/native/mediasoup_elixir/src/producer.rs @@ -1,9 +1,9 @@ -use crate::atoms; use crate::json_serde::JsonSerdeWrap; -use crate::{send_async_nif_result, send_msg_from_other_thread, DisposableResourceWrapper}; +use crate::{atoms, send_async_nif_result_with_from}; +use crate::{send_msg_from_other_thread, DisposableResourceWrapper}; use mediasoup::producer::{Producer, ProducerId, ProducerOptions, ProducerScore, ProducerType}; use mediasoup::rtp_parameters::{MediaKind, RtpParameters}; -use rustler::{Atom, Env, NifResult, NifStruct, ResourceArc}; +use rustler::{Atom, Env, NifResult, NifStruct, ResourceArc, Term}; pub type ProducerRef = DisposableResourceWrapper; #[rustler::resource_impl] @@ -41,10 +41,10 @@ pub fn producer_close(producer: ResourceArc) -> NifResult<(Atom,)> Ok((atoms::ok(),)) } #[rustler::nif(name = "producer_pause_async")] -pub fn producer_pause(env: Env, producer: ResourceArc) -> NifResult<(Atom, Atom)> { +pub fn producer_pause(env: Env, producer: ResourceArc, from: Term) -> NifResult { let producer = producer.get_resource()?; - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { producer.pause().await.map_err(|error| format!("{}", error)) }) } @@ -72,10 +72,14 @@ pub fn producer_score( } #[rustler::nif(name = "producer_get_stats_async")] -pub fn producer_get_stats(env: Env, producer: ResourceArc) -> NifResult<(Atom, Atom)> { +pub fn producer_get_stats( + env: Env, + producer: ResourceArc, + from: Term, +) -> NifResult { let producer = producer.get_resource()?; - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { producer .get_stats() .await @@ -85,10 +89,14 @@ pub fn producer_get_stats(env: Env, producer: ResourceArc) -> NifRe } #[rustler::nif(name = "producer_resume_async")] -pub fn producer_resume(env: Env, producer: ResourceArc) -> NifResult<(Atom, Atom)> { +pub fn producer_resume( + env: Env, + producer: ResourceArc, + from: Term, +) -> NifResult { let producer = producer.get_resource()?; - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { producer .resume() .await @@ -97,10 +105,10 @@ pub fn producer_resume(env: Env, producer: ResourceArc) -> NifResul } #[rustler::nif(name = "producer_dump_async")] -pub fn producer_dump(env: Env, producer: ResourceArc) -> NifResult<(Atom, Atom)> { +pub fn producer_dump(env: Env, producer: ResourceArc, from: Term) -> NifResult { let producer = producer.get_resource()?; - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { producer .dump() .await diff --git a/native/mediasoup_elixir/src/router.rs b/native/mediasoup_elixir/src/router.rs index 623b8c10..0a21154e 100644 --- a/native/mediasoup_elixir/src/router.rs +++ b/native/mediasoup_elixir/src/router.rs @@ -1,13 +1,12 @@ -use crate::atoms; use crate::json_serde::JsonSerdeWrap; use crate::pipe_transport::{PipeTransportOptionsStruct, PipeTransportRef}; use crate::plain_transport::{PlainTransportOptionsStruct, PlainTransportRef}; use crate::webrtc_transport::{WebRtcTransportOptionsStruct, WebRtcTransportRef}; -use crate::{send_async_nif_result, DisposableResourceWrapper}; +use crate::{atoms, send_async_nif_result_with_from, DisposableResourceWrapper}; use mediasoup::producer::ProducerId; use mediasoup::router::{Router, RouterId, RouterOptions}; use mediasoup::rtp_parameters::{RtpCapabilities, RtpCapabilitiesFinalized, RtpCodecCapability}; -use rustler::{Env, Error, NifResult, NifStruct, ResourceArc}; +use rustler::{Env, Error, NifResult, NifStruct, ResourceArc, Term}; pub type RouterRef = DisposableResourceWrapper; #[rustler::resource_impl] @@ -34,11 +33,12 @@ pub fn router_create_webrtc_transport( env: Env, router: ResourceArc, option: WebRtcTransportOptionsStruct, -) -> NifResult<(rustler::Atom, rustler::Atom)> { + from: Term, +) -> NifResult { let router = router.get_resource()?; let option = option.try_to_option()?; - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { router .create_webrtc_transport(option) .await @@ -53,13 +53,14 @@ pub fn router_create_plain_transport( env: Env, router: ResourceArc, option: PlainTransportOptionsStruct, -) -> NifResult<(rustler::Atom, rustler::Atom)> { + from: Term, +) -> NifResult { let router = router.get_resource()?; let option = option .try_to_option() .map_err(|error| Error::Term(Box::new(error.to_string())))?; - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { router .create_plain_transport(option) .await @@ -83,11 +84,12 @@ pub fn router_create_pipe_transport( env: Env, router: ResourceArc, option: PipeTransportOptionsStruct, -) -> NifResult<(rustler::Atom, rustler::Atom)> { + from: Term, +) -> NifResult { let router = router.get_resource()?; let option = option.try_to_option()?; - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { router .create_pipe_transport(option) .await @@ -114,10 +116,11 @@ pub fn router_can_consume( pub fn router_dump( env: Env, router: ResourceArc, -) -> NifResult<(rustler::Atom, rustler::Atom)> { + from: Term, +) -> NifResult { let router = router.get_resource()?; - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { router .dump() .await diff --git a/native/mediasoup_elixir/src/term_box.rs b/native/mediasoup_elixir/src/term_box.rs new file mode 100644 index 00000000..d45ff46a --- /dev/null +++ b/native/mediasoup_elixir/src/term_box.rs @@ -0,0 +1,44 @@ +use rustler::env::OwnedEnv; +use rustler::env::SavedTerm; +use rustler::Env; +use rustler::Term; + +// based on https://github.com/rusterlium/rustler/issues/333#issuecomment-702236600 + +pub struct TermBox { + inner: std::sync::Mutex, +} + +struct TermBoxContents { + owned_env: OwnedEnv, + saved_term: SavedTerm, +} + +impl TermBox { + pub fn new(term: Term) -> Self { + Self { + inner: std::sync::Mutex::new(TermBoxContents::new(term)), + } + } + + pub fn get<'a>(&self, env: Env<'a>) -> Term<'a> { + let inner = self.inner.lock().expect("Failed to acquire the mutex lock"); + + // Copy over term from owned environment to the target environment + inner.owned_env.run(|inner_env| { + let term = inner.saved_term.load(inner_env); + term.in_env(env) + }) + } +} + +impl TermBoxContents { + fn new(term: Term) -> Self { + let owned_env = OwnedEnv::new(); + let saved_term = owned_env.save(term); + Self { + owned_env: owned_env, + saved_term: saved_term, + } + } +} diff --git a/native/mediasoup_elixir/src/webrtc_server.rs b/native/mediasoup_elixir/src/webrtc_server.rs index c31c8012..5d060a3d 100644 --- a/native/mediasoup_elixir/src/webrtc_server.rs +++ b/native/mediasoup_elixir/src/webrtc_server.rs @@ -1,8 +1,10 @@ -use crate::{atoms, json_serde::JsonSerdeWrap, send_async_nif_result, DisposableResourceWrapper}; +use crate::{ + atoms, json_serde::JsonSerdeWrap, send_async_nif_result_with_from, DisposableResourceWrapper, +}; use mediasoup::prelude::{ ListenInfo, WebRtcServer, WebRtcServerId, WebRtcServerListenInfos, WebRtcServerOptions, }; -use rustler::{Atom, Env, NifResult, NifStruct, ResourceArc}; +use rustler::{Atom, Env, NifResult, NifStruct, ResourceArc, Term}; pub type WebRtcServerRef = DisposableResourceWrapper; @@ -56,10 +58,11 @@ pub fn webrtc_server_closed(server: ResourceArc) -> NifResult, -) -> NifResult<(Atom, Atom)> { + from: Term, +) -> NifResult { let server = server.get_resource()?; - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { server .dump() .await diff --git a/native/mediasoup_elixir/src/webrtc_transport.rs b/native/mediasoup_elixir/src/webrtc_transport.rs index 338f7eec..269a08d9 100644 --- a/native/mediasoup_elixir/src/webrtc_transport.rs +++ b/native/mediasoup_elixir/src/webrtc_transport.rs @@ -5,7 +5,9 @@ use crate::data_structure::SerNumSctpStreams; use crate::json_serde::JsonSerdeWrap; use crate::producer::{ProducerOptionsStruct, ProducerRef}; use crate::webrtc_server::WebRtcServerRef; -use crate::{atoms, send_async_nif_result, send_msg_from_other_thread, DisposableResourceWrapper}; +use crate::{ + atoms, send_async_nif_result_with_from, send_msg_from_other_thread, DisposableResourceWrapper, +}; use mediasoup::data_structures::{ DtlsParameters, DtlsState, IceParameters, IceRole, IceState, ListenInfo, SctpState, TransportTuple, @@ -20,7 +22,7 @@ use mediasoup::transport::TransportId; use mediasoup::webrtc_transport::{ WebRtcTransportListenInfos, WebRtcTransportOptions, WebRtcTransportRemoteParameters, }; -use rustler::{Atom, Env, NifResult, NifStruct, ResourceArc}; +use rustler::{Atom, Env, NifResult, NifStruct, ResourceArc, Term}; pub type WebRtcTransportRef = DisposableResourceWrapper; @@ -54,12 +56,13 @@ pub fn webrtc_transport_consume( env: Env, transport: ResourceArc, option: ConsumerOptionsStruct, -) -> NifResult<(Atom, Atom)> { + from: Term, +) -> NifResult { let transport = transport.get_resource()?; let option: ConsumerOptions = option.to_option(); - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { transport .consume(option) .await @@ -74,12 +77,13 @@ pub fn webrtc_transport_consume_data( env: Env, transport: ResourceArc, option: DataConsumerOptionsStruct, -) -> NifResult<(Atom, Atom)> { + from: Term, +) -> NifResult { let transport = transport.get_resource()?; let option: DataConsumerOptions = option.to_option(); - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { transport .consume_data(option) .await @@ -94,11 +98,12 @@ pub fn webrtc_transport_connect( env: Env, transport: ResourceArc, option: JsonSerdeWrap, -) -> NifResult<(Atom, Atom)> { + from: Term, +) -> NifResult { let transport = transport.get_resource()?; let option: WebRtcTransportRemoteParameters = option.clone(); - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { transport .connect(option) .await @@ -111,11 +116,12 @@ pub fn webrtc_transport_produce( env: Env, transport: ResourceArc, option: ProducerOptionsStruct, -) -> NifResult<(Atom, Atom)> { + from: Term, +) -> NifResult { let transport = transport.get_resource()?; let option: ProducerOptions = option.to_option(); - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { transport .produce(option) .await @@ -130,11 +136,12 @@ pub fn webrtc_transport_produce_data( env: Env, transport: ResourceArc, option: DataProducerOptionsStruct, -) -> NifResult<(Atom, Atom)> { + from: Term, +) -> NifResult { let transport = transport.get_resource()?; let option: DataProducerOptions = option.to_option(); - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { transport .produce_data(option) .await @@ -181,10 +188,11 @@ pub fn webrtc_transport_set_max_incoming_bitrate( env: Env, transport: ResourceArc, bitrate: u32, -) -> NifResult<(Atom, Atom)> { + from: Term, +) -> NifResult { let transport = transport.get_resource()?; - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { transport .set_max_incoming_bitrate(bitrate) .await @@ -197,10 +205,11 @@ pub fn webrtc_transport_set_max_outgoing_bitrate( env: Env, transport: ResourceArc, bitrate: u32, -) -> NifResult<(Atom, Atom)> { + from: Term, +) -> NifResult { let transport = transport.get_resource()?; - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { transport .set_max_outgoing_bitrate(bitrate) .await @@ -220,10 +229,11 @@ pub fn webrtc_transport_ice_state( pub fn webrtc_transport_restart_ice( env: Env, transport: ResourceArc, -) -> NifResult<(Atom, Atom)> { + from: Term, +) -> NifResult { let transport = transport.get_resource()?; - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { transport .restart_ice() .await @@ -236,10 +246,11 @@ pub fn webrtc_transport_restart_ice( pub fn webrtc_transport_get_stats( env: Env, transport: ResourceArc, -) -> NifResult<(Atom, Atom)> { + from: Term, +) -> NifResult { let transport = transport.get_resource()?; - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { transport .get_stats() .await @@ -252,10 +263,11 @@ pub fn webrtc_transport_get_stats( pub fn webrtc_transport_dump( env: Env, transport: ResourceArc, -) -> NifResult<(Atom, Atom)> { + from: Term, +) -> NifResult { let transport = transport.get_resource()?; - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { transport .dump() .await diff --git a/native/mediasoup_elixir/src/worker.rs b/native/mediasoup_elixir/src/worker.rs index 36d3bbff..94c1ed03 100644 --- a/native/mediasoup_elixir/src/worker.rs +++ b/native/mediasoup_elixir/src/worker.rs @@ -4,7 +4,7 @@ use crate::router::{RouterOptionsStruct, RouterRef}; use crate::task; use crate::webrtc_server::{WebRtcServerOptionsStruct, WebRtcServerRef}; use crate::DisposableResourceWrapper; -use crate::{send_async_nif_result, send_msg_from_other_thread}; +use crate::{send_async_nif_result, send_async_nif_result_with_from, send_msg_from_other_thread}; use mediasoup::worker::{ Worker, WorkerDtlsFiles, WorkerId, WorkerLogLevel, WorkerLogTag, WorkerSettings, WorkerUpdateSettings, @@ -42,10 +42,11 @@ pub fn worker_create_router( env: Env, worker: ResourceArc, option: RouterOptionsStruct, -) -> NifResult<(rustler::Atom, rustler::Atom)> { + from: rustler::Term, +) -> NifResult { let worker = worker.get_resource()?; - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { let option = option.to_option(); worker .create_router(option) @@ -61,12 +62,13 @@ pub fn worker_create_webrtc_server( env: Env, worker: ResourceArc, option: WebRtcServerOptionsStruct, -) -> NifResult<(rustler::Atom, rustler::Atom)> { + from: rustler::Term, +) -> NifResult { let worker = worker.get_resource()?; let option = option .try_to_option() .map_err(|error| Error::Term(Box::new(error.to_string())))?; - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { worker .create_webrtc_server(option) .await @@ -80,9 +82,10 @@ pub fn worker_create_webrtc_server( pub fn worker_dump( env: Env, worker: ResourceArc, -) -> NifResult<(rustler::Atom, rustler::Atom)> { + from: rustler::Term, +) -> NifResult { let worker = worker.get_resource()?; - send_async_nif_result(env, async move { + send_async_nif_result_with_from(env, from, async move { worker .dump() .await @@ -101,16 +104,17 @@ pub fn worker_update_settings( env: Env, worker: ResourceArc, settings: WorkerUpdateableSettingsStruct, -) -> NifResult<(rustler::Atom, rustler::Atom)> { + from: rustler::Term, +) -> NifResult { let worker = worker.get_resource()?; let settings = settings.try_to_setting()?; - send_async_nif_result(env, async move { - worker - .update_settings(settings) - .await - .map_err(|error| format!("{}", error)) + send_async_nif_result_with_from(env, from, async move { + match worker.update_settings(settings).await { + Ok(_) => (atoms::ok(),), + Err(_err) => (atoms::error(),), + } }) } diff --git a/test/integration/test_webrtc_server.ex b/test/integration/test_webrtc_server.ex index e1559ee7..b580460e 100644 --- a/test/integration/test_webrtc_server.ex +++ b/test/integration/test_webrtc_server.ex @@ -139,6 +139,42 @@ defmodule IntegrateTest.WebRtcServerTest do ) end + def create_webrtc_server_close(worker) do + {:ok, webrtc_server} = + Worker.create_webrtc_server(worker, %WebRtcServer.Options{ + listen_infos: [ + %{ + ip: "127.0.0.1", + announcedIp: "9.9.9.1", + port: 10111, + protocol: :tcp + }, + %{ + ip: "0.0.0.0", + announcedIp: "9.9.9.2", + port: 10112, + protocol: :tcp + }, + %{ + ip: "127.0.0.1", + announcedIp: "9.9.9.1", + port: 10111, + protocol: :udp + }, + %{ + ip: "0.0.0.0", + announcedIp: "9.9.9.2", + port: 10112, + protocol: :udp + } + ] + }) + + assert !WebRtcServer.closed?(webrtc_server) + assert :ok == WebRtcServer.close(webrtc_server) + assert WebRtcServer.closed?(webrtc_server) + end + def create_webrtc_server_without_specifying_port_succeeds(worker) do {_worker, router} = init(worker) diff --git a/test/pipe_transport_test.exs b/test/pipe_transport_test.exs index 2b4b7383..6e7700b3 100644 --- a/test/pipe_transport_test.exs +++ b/test/pipe_transport_test.exs @@ -1,6 +1,5 @@ defmodule PipeTransportTest do use ExUnit.Case - import ExUnit.CaptureLog setup do Mediasoup.LoggerProxy.start_link(max_level: :info) diff --git a/test/webrtc_server_test.exs b/test/webrtc_server_test.exs index 0bd8f3a9..f4b2fa98 100644 --- a/test/webrtc_server_test.exs +++ b/test/webrtc_server_test.exs @@ -16,6 +16,10 @@ defmodule MediasoupElixirWebRtcServerTest do IntegrateTest.WebRtcServerTest.create_webrtc_server_succeeds(worker) end + test "create_webrtc_server_close", %{worker: worker} do + IntegrateTest.WebRtcServerTest.create_webrtc_server_close(worker) + end + test "create_webrtc_server_without_specifying_port_succeeds", %{worker: worker} do IntegrateTest.WebRtcServerTest.create_webrtc_server_without_specifying_port_succeeds(worker) end