Skip to content

Commit

Permalink
feat: add dgram recv callback (#247)
Browse files Browse the repository at this point in the history
* chore: fix some typing and namings

* feat: dgram recv callback

- add dgram_recv callback in quicer_connection
- align naming datagram => dgram
  • Loading branch information
qzhuyan authored Dec 18, 2023
1 parent 0163aa2 commit 09a60d3
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 28 deletions.
6 changes: 3 additions & 3 deletions c_src/quicer_eterms.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ extern ERL_NIF_TERM ATOM_CACERTFILE;
/* msquic execution profile for registration */
/*-------------------------------------------------------*/
extern ERL_NIF_TERM ATOM_QUIC_EXECUTION_PROFILE_LOW_LATENCY; // Default
extern ERL_NIF_TERM ATOM_QUIC_EXECUTION_PROFILE_TYPE_MAX_THROUGHPUT;
extern ERL_NIF_TERM ATOM_QUIC_EXECUTION_PROFILE_TYPE_SCAVENGER;
extern ERL_NIF_TERM ATOM_QUIC_EXECUTION_PROFILE_TYPE_REAL_TIME;
extern ERL_NIF_TERM ATOM_QUIC_EXECUTION_PROFILE_MAX_THROUGHPUT;
extern ERL_NIF_TERM ATOM_QUIC_EXECUTION_PROFILE_SCAVENGER;
extern ERL_NIF_TERM ATOM_QUIC_EXECUTION_PROFILE_REAL_TIME;

/*-----------------------------------------*/
/* msquic params starts */
Expand Down
12 changes: 6 additions & 6 deletions c_src/quicer_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,9 @@ ERL_NIF_TERM ATOM_CACERTFILE;
/* msquic execution profile for registration */
/*-------------------------------------------------------*/
ERL_NIF_TERM ATOM_QUIC_EXECUTION_PROFILE_LOW_LATENCY; // Default
ERL_NIF_TERM ATOM_QUIC_EXECUTION_PROFILE_TYPE_MAX_THROUGHPUT;
ERL_NIF_TERM ATOM_QUIC_EXECUTION_PROFILE_TYPE_SCAVENGER;
ERL_NIF_TERM ATOM_QUIC_EXECUTION_PROFILE_TYPE_REAL_TIME;
ERL_NIF_TERM ATOM_QUIC_EXECUTION_PROFILE_MAX_THROUGHPUT;
ERL_NIF_TERM ATOM_QUIC_EXECUTION_PROFILE_SCAVENGER;
ERL_NIF_TERM ATOM_QUIC_EXECUTION_PROFILE_REAL_TIME;

/*-----------------------------------------*/
/* msquic params starts */
Expand Down Expand Up @@ -512,11 +512,11 @@ ERL_NIF_TERM ATOM_QUIC_DATAGRAM_SEND_CANCELED;
/*-------------------------------------------------------*/ \
ATOM(ATOM_QUIC_EXECUTION_PROFILE_LOW_LATENCY, \
quic_execution_profile_low_latency); \
ATOM(ATOM_QUIC_EXECUTION_PROFILE_TYPE_MAX_THROUGHPUT, \
ATOM(ATOM_QUIC_EXECUTION_PROFILE_MAX_THROUGHPUT, \
quic_execution_profile_max_throughput); \
ATOM(ATOM_QUIC_EXECUTION_PROFILE_TYPE_SCAVENGER, \
ATOM(ATOM_QUIC_EXECUTION_PROFILE_SCAVENGER, \
quic_execution_profile_scavenger); \
ATOM(ATOM_QUIC_EXECUTION_PROFILE_TYPE_REAL_TIME, \
ATOM(ATOM_QUIC_EXECUTION_PROFILE_REAL_TIME, \
quic_execution_profile_real_time); \
/*-----------------------------------------*/ \
/* msquic params starts */ \
Expand Down
7 changes: 3 additions & 4 deletions c_src/quicer_reg.c
Original file line number Diff line number Diff line change
Expand Up @@ -246,16 +246,15 @@ parse_reg_conf(ERL_NIF_TERM eprofile, QUIC_REGISTRATION_CONFIG *RegConfig)
{
RegConfig->ExecutionProfile = QUIC_EXECUTION_PROFILE_LOW_LATENCY;
}
else if (IS_SAME_TERM(eprofile,
ATOM_QUIC_EXECUTION_PROFILE_TYPE_MAX_THROUGHPUT))
else if (IS_SAME_TERM(eprofile, ATOM_QUIC_EXECUTION_PROFILE_MAX_THROUGHPUT))
{
RegConfig->ExecutionProfile = QUIC_EXECUTION_PROFILE_TYPE_MAX_THROUGHPUT;
}
else if (IS_SAME_TERM(eprofile, ATOM_QUIC_EXECUTION_PROFILE_TYPE_SCAVENGER))
else if (IS_SAME_TERM(eprofile, ATOM_QUIC_EXECUTION_PROFILE_SCAVENGER))
{
RegConfig->ExecutionProfile = QUIC_EXECUTION_PROFILE_TYPE_SCAVENGER;
}
else if (IS_SAME_TERM(eprofile, ATOM_QUIC_EXECUTION_PROFILE_TYPE_REAL_TIME))
else if (IS_SAME_TERM(eprofile, ATOM_QUIC_EXECUTION_PROFILE_REAL_TIME))
{
RegConfig->ExecutionProfile = QUIC_EXECUTION_PROFILE_TYPE_REAL_TIME;
}
Expand Down
4 changes: 3 additions & 1 deletion docs/messages_to_owner.md
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ and the stream active mode is set to false (passive mode).

More streams are available due to flow control from the peer.

If you don't want this event, set 'QUICER_CONNECTION_EVENT_MASK_NO_STREAMS_AVAILABLE'
`Available = Max - Used`
```erlang
Expand Down Expand Up @@ -315,7 +317,7 @@ Peer wants to open more streams but cannot due to flow control
with connection handle and integer flag
```erlang
{quic, binary(), connection_handle(), flag :: non_neg_integer()}
{quic, binary(), connection_handle(), Flags :: non_neg_integer()}
```
### DATAGRAM send completed, success or fail.
Expand Down
16 changes: 8 additions & 8 deletions include/quicer_types.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,7 @@
conf_handle() |
reg_handle().

-type registration_profile() :: quic_execution_profile_low_latency |
quic_execution_profile_max_throughput |
quic_execution_profile_scavenger |
quic_execution_profile_realtime.

-type registration_profile() :: execution_profile().
-type quic_handle_level() :: quic_tls | quic_configuration | false.

-type listen_on() :: inet:port_number() | string().
Expand Down Expand Up @@ -304,9 +300,9 @@

-type execution_profile() ::
quic_execution_profile_low_latency |
quic_execution_profile_type_max_throughput |
quic_execution_profile_type_scavenger |
quic_execution_profile_type_realtime.
quic_execution_profile_max_throughput |
quic_execution_profile_scavenger |
quic_execution_profile_real_time.

%% Connection Event Props
-type new_conn_props() :: #{ version := integer()
Expand Down Expand Up @@ -388,4 +384,8 @@
| ?QUIC_DATAGRAM_SEND_ACKNOWLEDGED_SPURIOUS %% Acknowledged after being suspected lost
| ?QUIC_DATAGRAM_SEND_CANCELED. %% Send cancelled

-type dgram_state() :: #{ dgram_send_enabled := boolean()
, dgram_max_len := uint64()
}.

-endif. %% QUICER_TYPES_HRL
15 changes: 14 additions & 1 deletion src/quicer_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@
-callback nst_received(connection_handle(), TicketBin :: binary(), cb_state()) -> cb_ret().
%% Client only, New session ticket received,

-callback dgram_state_changed(connection_handle(), dgram_state(), cb_state()) -> cb_ret().
%% Handle Datagram State Changed event.

-callback dgram_recv(connection_handle(), DataBin :: binary(), Flags :: non_neg_integer(), cb_state()) -> cb_ret().
%% Handle Unreliable Datagram of RFC 9221.

-callback handle_call(Req::term(), From::gen_server:from(), cb_state()) -> cb_ret().

-callback handle_info(Info::term(), cb_state()) -> cb_ret().
Expand All @@ -120,6 +126,8 @@
, handle_continue/2
, peer_needs_streams/3 %% require newer MsQuic
, nst_received/3 %% client only
, dgram_state_changed/3 %% because dgram could be off
, dgram_recv/4 %% because dgram could be off
]).
%% Handle API call with callback state.

Expand Down Expand Up @@ -412,7 +420,12 @@ handle_info({quic, nst_received, C, TicketBin},
handle_info({quic, dgram_state_changed, C, Flags},
#{callback := M, callback_state := CBState} = State) ->
?tp_ignore_side_effects_in_prod(debug, #{module => ?MODULE, conn => C, event => dgram_state_changed, flags => Flags}),
default_cb_ret(M:datagram_state_changed(C, Flags, CBState), State);
default_cb_ret(M:dgram_state_changed(C, Flags, CBState), State);

handle_info({quic, Bin, C, Flags},
#{conn := C, callback := M, callback_state := CBState} = State) when is_binary(Bin) ->
?tp_ignore_side_effects_in_prod(debug, #{module => ?MODULE, conn => C, event => dgram_recv, flags => Flags}),
default_cb_ret(M:dgram_recv(C, Bin, Flags, CBState), State);

handle_info(OtherInfo, #{callback := M,
callback_state := CBState} = State) ->
Expand Down
4 changes: 2 additions & 2 deletions test/example_client_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
, resumed/3
, nst_received/3
, new_stream/3
, datagram_state_changed/3
, dgram_state_changed/3
]).

-export([handle_info/2]).
Expand Down Expand Up @@ -96,7 +96,7 @@ new_stream(Stream, Flags, #{ conn := Conn, streams := Streams
Other
end.

datagram_state_changed(_Conn, _Flags, S) ->
dgram_state_changed(_Conn, _Flags, S) ->
?tp(debug, #{module => ?MODULE, conn => _Conn, flags => state, event => dgram_state_changed}),
{ok, S}.

Expand Down
14 changes: 12 additions & 2 deletions test/example_server_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@
, resumed/3
, nst_received/3
, new_stream/3
, datagram_state_changed/3
, dgram_state_changed/3
, dgram_recv/4
]).

-export([handle_info/2]).
Expand Down Expand Up @@ -123,7 +124,16 @@ peer_needs_streams(_C, bidi_streams, S) ->
%% leave it for test case to unblock it, see tc_multi_streams_example_server_3
{ok, S}.

datagram_state_changed(_C, _Flags, S) ->
dgram_state_changed(_C, _Flags, S) ->
{ok, S}.

dgram_recv(C, Bin, _Flag, S) ->
%% maybe peer didn't enable,
case quicer:send_dgram(C, Bin) of
{ok, _} -> ok;
Error -> %% for testing when peer disable the receiving
ct:pal("send dgram error: ~p~n", [Error])
end,
{ok, S}.

handle_info({'EXIT', _Pid, _Reason}, State) ->
Expand Down
80 changes: 80 additions & 0 deletions test/quicer_connection_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,86 @@ tc_conn_client_bad_cert(Config) ->
ct:fail({run_error, Error})
end.

tc_datagram_disallowed(Config) ->
Port = select_port(),
ServerConnCallback = example_server_connection,
ServerStreamCallback = example_server_stream,
ListenerOpts = [{conn_acceptors, 4} | default_listen_opts(Config)],
ConnectionOpts = [ {conn_callback, ServerConnCallback}
, {stream_acceptors, 2}
| default_conn_opts()],
StreamOpts = [ {stream_callback, ServerStreamCallback}
, {disable_fpbuffer, true}
| default_stream_opts() ],
%% GIVEN: A listener with datagram_receive_enabled = false
Options = {ListenerOpts, ConnectionOpts, StreamOpts},

{ok, _} = quicer:spawn_listener(mqtt, Port, Options),
%% WHEN: Client send dgram data
{ok, Conn} = quicer:connect("localhost", Port, default_conn_opts(), 5000),
%% THEN: It get an error
?assertEqual({error, dgram_send_error, invalid_state}, quicer:send_dgram(Conn, <<"dg_ping">>)),
quicer:shutdown_connection(Conn),
ok.

tc_datagram_peer_allowed(Config) ->
Port = select_port(),
ServerConnCallback = example_server_connection,
ServerStreamCallback = example_server_stream,
%% GIVEN: A listener with datagram_receive_enabled = 1 (true)
ListenerOpts = [{conn_acceptors, 4}, {datagram_receive_enabled, 1} | default_listen_opts(Config)],
ConnectionOpts = [ {conn_callback, ServerConnCallback}
, {stream_acceptors, 2}
| default_conn_opts()],
StreamOpts = [ {stream_callback, ServerStreamCallback}
, {disable_fpbuffer, true}
| default_stream_opts() ],
Options = {ListenerOpts, ConnectionOpts, StreamOpts},

{ok, _} = quicer:spawn_listener(mqtt, Port, Options),
%% WHEN: A client send_dgram
{ok, Conn} = quicer:connect("localhost", Port, default_conn_opts(), 5000),
%% THEN: It should success
?assertEqual({ok, 7}, quicer:send_dgram(Conn, <<"dg_ping">>)),

receive
%% THEN: the client should not recv dgram from peer as the receiving is disabled
{quic, Data, _Conn, _Flag} when is_binary(Data) ->
ct:fail("client side dgram recv timeout")
after 500 ->
ok
end,
quicer:shutdown_connection(Conn),
ok.

tc_datagram_local_peer_allowed(Config) ->
Port = select_port(),
ServerConnCallback = example_server_connection,
ServerStreamCallback = example_server_stream,
%% GIVEN: A listener with datagram_receive_enabled = 1 (true)
ListenerOpts = [{conn_acceptors, 4}, {datagram_receive_enabled, 1} | default_listen_opts(Config)],
ConnectionOpts = [ {conn_callback, ServerConnCallback}
, {stream_acceptors, 2}
| default_conn_opts()],
StreamOpts = [ {stream_callback, ServerStreamCallback}
, {disable_fpbuffer, true}
| default_stream_opts() ],
Options = {ListenerOpts, ConnectionOpts, StreamOpts},

{ok, _} = quicer:spawn_listener(mqtt, Port, Options),
%% WHEN: Client connect with datagram_receive_enabled = 1 (true)
{ok, Conn} = quicer:connect("localhost", Port, [{datagram_receive_enabled, 1} | default_conn_opts()], 5000),
?assertEqual({ok, 7}, quicer:send_dgram(Conn, <<"dg_ping">>)),
receive
%% THEN: the client is able to receive the dgram from server
{quic, <<"dg_ping">>, Conn, Flag} ->
?assertEqual(0, Flag)
after 1000 ->
ct:fail("client side dgram recv timeout")
end,
quicer:shutdown_connection(Conn),
ok.

run_tc_conn_client_bad_cert(Config)->
Port = select_port(),
Owner = self(),
Expand Down
3 changes: 2 additions & 1 deletion test/quicer_test_lib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,8 @@ reset_global_reg()->
quicer:reg_open().

shutdown_all_listeners() ->
lists:foreach(fun quicer:shutdown_listener/1,
lists:foreach(fun({{Id, _ListenOn}, _Pid}) ->
quicer:terminate_listener(Id) end,
quicer:listeners()).

%%%_* Emacs ====================================================================
Expand Down

0 comments on commit 09a60d3

Please sign in to comment.