Skip to content

Commit

Permalink
perf: Non blocking GenServer.handle_call (#308)
Browse files Browse the repository at this point in the history
* perf: Non blocking GenServer.handle_call
  • Loading branch information
satoren authored Oct 18, 2024
1 parent 7c44433 commit ce55710
Show file tree
Hide file tree
Showing 24 changed files with 661 additions and 497 deletions.
39 changes: 30 additions & 9 deletions lib/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -312,22 +312,43 @@ defmodule Mediasoup.Consumer do

NifWrap.def_handle_call_nif(%{
closed?: &Nif.consumer_closed/1,
dump: &Nif.consumer_dump/1,
paused?: &Nif.consumer_paused/1,
producer_paused?: &Nif.consumer_producer_paused/1,
priority: &Nif.consumer_priority/1,
score: &Nif.consumer_score/1,
preferred_layers: &Nif.consumer_preferred_layers/1,
current_layers: &Nif.consumer_current_layers/1,
get_stats: &Nif.consumer_get_stats/1,
pause: &Nif.consumer_pause/1,
resume: &Nif.consumer_resume/1,
set_preferred_layers: &Nif.consumer_set_preferred_layers/2,
set_priority: &Nif.consumer_set_priority/2,
unset_priority: &Nif.consumer_unset_priority/1,
request_key_frame: &Nif.consumer_request_key_frame/1
current_layers: &Nif.consumer_current_layers/1
})

NifWrap.def_handle_call_async_nif(%{
set_preferred_layers: &Nif.consumer_set_preferred_layers_async/3,
set_priority: &Nif.consumer_set_priority_async/3,
request_key_frame: &Nif.consumer_request_key_frame_async/2,
unset_priority: &Nif.consumer_unset_priority_async/2,
dump: &Nif.consumer_dump_async/2,
get_stats: &Nif.consumer_get_stats_async/2,
pause: &Nif.consumer_pause_async/2,
resume: &Nif.consumer_resume_async/2
})

@impl true
def handle_info(
{:mediasoup_async_nif_result, {func, from}, result},
state
) do
reply =
case func do
func when func in [:set_priority, :dump, :get_stats] ->
Nif.unwrap_ok(result)

_ ->
result
end

GenServer.reply(from, reply)
{:noreply, state}
end

@impl true
def handle_info({:on_close}, state) do
{:stop, :normal, state}
Expand Down
298 changes: 94 additions & 204 deletions lib/nif.ex

Large diffs are not rendered by default.

76 changes: 32 additions & 44 deletions lib/pipe_transport.ex
Original file line number Diff line number Diff line change
Expand Up @@ -283,13 +283,15 @@ defmodule Mediasoup.PipeTransport do
GenServer.start_link(__MODULE__, %{reference: reference}, opt)
end

@impl true
def init(state) do
Process.flag(:trap_exit, true)
{:ok, supervisor} = DynamicSupervisor.start_link(strategy: :one_for_one)

{:ok, Map.put(state, :supervisor, supervisor)}
end

@impl true
def handle_call(
{:event, [listener, event_types]},
_from,
Expand All @@ -314,63 +316,49 @@ defmodule Mediasoup.PipeTransport do
NifWrap.def_handle_call_nif(%{
close: &Nif.pipe_transport_close/1,
closed?: &Nif.pipe_transport_closed/1,
dump: &Nif.pipe_transport_dump/1,
get_stats: &Nif.pipe_transport_get_stats/1,
sctp_state: &Nif.pipe_transport_sctp_state/1,
connect: &Nif.pipe_transport_connect/2,
tuple: &Nif.pipe_transport_tuple/1,
sctp_parameters: &Nif.pipe_transport_sctp_parameters/1,
srtp_parameters: &Nif.pipe_transport_srtp_parameters/1
})

def handle_call(
{:produce, [option]},
_from,
%{reference: reference, supervisor: supervisor} = state
) do
ret =
Nif.pipe_transport_produce(reference, option)
|> NifWrap.handle_create_result(Producer, supervisor)

{:reply, ret, state}
end

def handle_call(
{:produce_data, [option]},
_from,
%{reference: reference, supervisor: supervisor} = state
) do
ret =
Nif.pipe_transport_produce_data(reference, option)
|> NifWrap.handle_create_result(DataProducer, supervisor)

{:reply, ret, state}
end
NifWrap.def_handle_call_async_nif(%{
connect: &Nif.pipe_transport_connect_async/3,
dump: &Nif.pipe_transport_dump_async/2,
get_stats: &Nif.pipe_transport_get_stats_async/2,
produce: &Nif.pipe_transport_produce_async/3,
produce_data: &Nif.pipe_transport_produce_data_async/3,
consume: &Nif.pipe_transport_consume_async/3,
consume_data: &Nif.pipe_transport_consume_data_async/3
})

def handle_call(
{:consume, [option]},
_from,
%{reference: reference, supervisor: supervisor} = state
) do
ret =
Nif.pipe_transport_consume(reference, option)
|> NifWrap.handle_create_result(Consumer, supervisor)
def handle_info(
{:mediasoup_async_nif_result, {message_tag, from}, result},
%{supervisor: supervisor} = state
)
when message_tag in [:produce, :consume, :produce_data, :consume_data] do
module =
case message_tag do
:produce -> Producer
:consume -> Consumer
:produce_data -> DataProducer
:consume_data -> DataConsumer
end

{:reply, ret, state}
GenServer.reply(from, NifWrap.handle_create_result(result, module, supervisor))
{:noreply, state}
end

def handle_call(
{:consume_data, [option]},
_from,
%{reference: reference, supervisor: supervisor} = state
@impl true
def handle_info(
{:mediasoup_async_nif_result, {_, from}, result},
state
) do
ret =
Nif.pipe_transport_consume_data(reference, option)
|> NifWrap.handle_create_result(DataConsumer, supervisor)

{:reply, ret, state}
GenServer.reply(from, result |> Nif.unwrap_ok())
{:noreply, state}
end

@impl true
def terminate(reason, %{reference: reference, supervisor: supervisor} = _state) do
DynamicSupervisor.stop(supervisor, reason)
Nif.pipe_transport_close(reference)
Expand Down
51 changes: 29 additions & 22 deletions lib/plain_transport.ex
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ defmodule Mediasoup.PlainTransport do
consume(transport, Consumer.Options.from_map(option))
end

@impl true
def terminate(reason, %{reference: reference, supervisor: supervisor} = _state) do
DynamicSupervisor.stop(supervisor, reason)
Nif.plain_transport_close(reference)
Expand All @@ -223,15 +224,20 @@ defmodule Mediasoup.PlainTransport do
sctp_state: &Nif.plain_transport_sctp_state/1,
srtp_parameters: &Nif.plain_transport_srtp_parameters/1,
# methods
connect: &Nif.plain_transport_connect/2,
dump: &Nif.plain_transport_dump/1,
get_stats: &Nif.plain_transport_get_stats/1,
close: &Nif.plain_transport_close/1,
closed?: &Nif.plain_transport_closed/1,
# events
event: &Nif.plain_transport_event/3
})

NifWrap.def_handle_call_async_nif(%{
connect: &Nif.plain_transport_connect_async/3,
dump: &Nif.plain_transport_dump_async/2,
get_stats: &Nif.plain_transport_get_stats_async/2,
produce: &Nif.plain_transport_produce_async/3,
consume: &Nif.plain_transport_consume_async/3
})

# Mediasoup Plain Transport Events
# https://mediasoup.org/documentation/v3/mediasoup/api/#PlainTransport-events

Expand Down Expand Up @@ -265,6 +271,7 @@ defmodule Mediasoup.PlainTransport do
GenServer.start_link(__MODULE__, %{reference: reference}, opt)
end

@impl true
def init(state) do
Process.flag(:trap_exit, true)
{:ok, supervisor} = DynamicSupervisor.start_link(strategy: :one_for_one)
Expand Down Expand Up @@ -292,27 +299,27 @@ defmodule Mediasoup.PlainTransport do
{:reply, struct_from_pid_and_ref(self(), reference), state}
end

def handle_call(
{:produce, [option]},
_from,
%{reference: reference, supervisor: supervisor} = state
) do
ret =
Nif.plain_transport_produce(reference, option)
|> NifWrap.handle_create_result(Producer, supervisor)

{:reply, ret, state}
def handle_info(
{:mediasoup_async_nif_result, {message_tag, from}, result},
%{supervisor: supervisor} = state
)
when message_tag in [:produce, :consume] do
module =
case message_tag do
:produce -> Producer
:consume -> Consumer
end

GenServer.reply(from, NifWrap.handle_create_result(result, module, supervisor))
{:noreply, state}
end

def handle_call(
{:consume, [option]},
_from,
%{reference: reference, supervisor: supervisor} = state
@impl true
def handle_info(
{:mediasoup_async_nif_result, {_, from}, result},
state
) do
ret =
Nif.plain_transport_consume(reference, option)
|> NifWrap.handle_create_result(Consumer, supervisor)

{:reply, ret, state}
GenServer.reply(from, result |> Nif.unwrap_ok())
{:noreply, state}
end
end
41 changes: 34 additions & 7 deletions lib/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,13 @@ defmodule Mediasoup.Producer do
GenServer.start_link(__MODULE__, %{reference: reference}, opt)
end

@impl true
def init(state) do
Process.flag(:trap_exit, true)
{:ok, state}
end

@impl true
def handle_call(
{:event, [listener, event_types]},
_from,
Expand All @@ -200,6 +202,7 @@ defmodule Mediasoup.Producer do
{:reply, result, state}
end

@impl true
def handle_call(
{:struct_from_pid, _arg},
_from,
Expand All @@ -210,28 +213,52 @@ defmodule Mediasoup.Producer do

NifWrap.def_handle_call_nif(%{
closed?: &Nif.producer_closed/1,
dump: &Nif.producer_dump/1,
paused?: &Nif.producer_paused/1,
score: &Nif.producer_score/1,
get_stats: &Nif.producer_get_stats/1,
pause: &Nif.producer_pause/1,
resume: &Nif.producer_resume/1
score: &Nif.producer_score/1
})

NifWrap.def_handle_call_async_nif(%{
pause: &Nif.producer_pause_async/2,
resume: &Nif.producer_resume_async/2,
get_stats: &Nif.producer_get_stats_async/2,
dump: &Nif.producer_dump_async/2
})

@impl true
def handle_info(
{:mediasoup_async_nif_result, nil, _},
state
) do
{:noreply, state}
end

@impl true
def handle_info(
{:mediasoup_async_nif_result, {_, from}, result},
state
) do
GenServer.reply(from, result |> Nif.unwrap_ok())
{:noreply, state}
end

@impl true
def handle_info({:on_resume}, %{reference: reference} = state) do
Nif.producer_resume(reference)
Nif.producer_resume_async(reference, nil)
{:noreply, state}
end

@impl true
def handle_info({:on_pause}, %{reference: reference} = state) do
Nif.producer_pause(reference)
Nif.producer_pause_async(reference, nil)
{:noreply, state}
end

@impl true
def handle_info({:on_close}, state) do
{:stop, :normal, state}
end

@impl true
def terminate(_reason, %{reference: reference} = _state) do
Nif.producer_close(reference)
:ok
Expand Down
Loading

0 comments on commit ce55710

Please sign in to comment.