Skip to content

Commit

Permalink
chore: A few improvements. (#246)
Browse files Browse the repository at this point in the history
* feat: API for starting local/remote stream

* fix: quicer_connection callback handle DOWN messages

* feat: able to mask out STREAMS_AVAILABLE

* ci: check c code format

* chore: improve code coverage
  • Loading branch information
qzhuyan authored Dec 12, 2023
1 parent 8c6f804 commit d8e1873
Show file tree
Hide file tree
Showing 12 changed files with 130 additions and 44 deletions.
13 changes: 13 additions & 0 deletions .github/workflows/code-format.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
name: clang-format Check
on: [push, pull_request]
jobs:
formatting-check:
name: Formatting Check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Run clang-format style check for C/C++/Protobuf programs.
uses: jidicula/[email protected]
with:
clang-format-version: '13'
check-path: 'c_src'
9 changes: 9 additions & 0 deletions c_src/quicer_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -1443,6 +1443,15 @@ handle_connection_event_streams_available(QuicerConnCTX *c_ctx,
assert(c_ctx->Connection);
ErlNifEnv *env = c_ctx->env;

if (c_ctx->event_mask & QUICER_CONNECTION_EVENT_MASK_NO_STREAMS_AVAILABLE)
{
TP_CB_3(streams_available, (uintptr_t)c_ctx->Connection, 0);
return QUIC_STATUS_SUCCESS;
}
else
{
TP_CB_3(streams_available, (uintptr_t)c_ctx->Connection, 1);
}
ERL_NIF_TERM props_name[] = { ATOM_BIDI_STREAMS, ATOM_UNIDI_STREAMS };
ERL_NIF_TERM props_value[]
= { enif_make_uint64(env, Event->STREAMS_AVAILABLE.BidirectionalCount),
Expand Down
3 changes: 2 additions & 1 deletion c_src/quicer_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ limitations under the License.

typedef enum QUICER_CONNECTION_EVENT_MASKS
{
QUICER_CONNECTION_EVENT_MASK_NST = 0x00000001
QUICER_CONNECTION_EVENT_MASK_NST = 0x00000001,
QUICER_CONNECTION_EVENT_MASK_NO_STREAMS_AVAILABLE = 0x00000002
} QUICER_CONNECTION_EVENT_MASK;

ERL_NIF_TERM
Expand Down
1 change: 1 addition & 0 deletions include/quicer.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@

%% QUICER_CONNECTION_EVENT_MASKS
-define(QUICER_CONNECTION_EVENT_MASK_NST , 16#00000001).
-define(QUICER_CONNECTION_EVENT_MASK_NO_STREAMS_AVAILABLE , 16#00000002).

%% QUICER_STREAM_EVENT_MASKS
-define(QUICER_STREAM_EVENT_MASK_START_COMPLETE , 16#00000001).
Expand Down
14 changes: 0 additions & 14 deletions src/quicer_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -414,20 +414,6 @@ handle_info({quic, dgram_state_changed, C, Flags},
?tp_ignore_side_effects_in_prod(debug, #{module => ?MODULE, conn => C, event => dgram_state_changed, flags => Flags}),
default_cb_ret(M:datagram_state_changed(C, Flags, CBState), State);

%%% ==============================================================
%%% Handle messages for link/monitor
%%% ==============================================================
handle_info({'EXIT', _Pid, {shutdown, normal}}, State) ->
%% exit signal from stream
{noreply, State};

handle_info({'EXIT', _Pid, {shutdown, _Other}}, State) ->
%% @todo
{noreply, State};

handle_info({'EXIT', _Pid, normal}, State) ->
%% @todo
{noreply, State};
handle_info(OtherInfo, #{callback := M,
callback_state := CBState} = State) ->
default_cb_ret(M:handle_info(OtherInfo, CBState), State).
Expand Down
21 changes: 21 additions & 0 deletions src/quicer_local_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
%% @doc Stream initiated from local
-module(quicer_local_stream).

-export([start/4,
start_link/3,
start_link/4
]).

-include("quicer_types.hrl").

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
Expand Down Expand Up @@ -72,5 +77,21 @@
, handle_continue/2
]).

-type local_stream_opts() :: stream_opts() | proplists:proplist().
-type cb_ret() :: quicer_stream:cb_ret().
-type cb_state() :: quicer_stream:cb_state().

-spec start_link(module(), connection_handle(), local_stream_opts()) -> gen_server:start_ret().
start_link(CallbackModule, Connection, Opts) ->
start_link(CallbackModule, Connection, Opts, []).
-spec start_link(module(), connection_handle(), local_stream_opts(), [gen_server:start_opt()]) -> gen_server:start_ret().
start_link(CallbackModule, Connection, Opts, StartOpts) when is_list(Opts)->
start_link(CallbackModule, Connection, maps:from_list(Opts), StartOpts);
start_link(CallbackModule, Connection, Opts, StartOpts) ->
quicer_stream:start_link(CallbackModule, Connection, Opts#{is_local => true}, StartOpts).

-spec start(module(), connection_handle(), local_stream_opts(), [gen_server:start_opt()]) -> gen_server:start_ret().
start(CallbackModule, Connection, Opts, StartOpts) when is_list(Opts) ->
start(CallbackModule, Connection, maps:from_list(Opts), StartOpts);
start(CallbackModule, Connection, Opts, StartOpts) ->
quicer_stream:start(CallbackModule, Connection, Opts#{is_local => true}, StartOpts).
33 changes: 33 additions & 0 deletions src/quicer_remote_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@

-include("quicer_types.hrl").

-export([start/4,
start_link/3,
start_link/4,
start/6,
start_link/5,
start_link/6
]).

-callback init_handoff(stream_handle(), stream_opts(), connection_handle(), new_stream_props()) -> cb_ret().
%% Prepare callback state before ownership handoff

Expand Down Expand Up @@ -71,5 +79,30 @@
, handle_continue/2
]).

-type remote_stream_opts() :: stream_opts() | proplists:proplist().
-type cb_ret() :: quicer_stream:cb_ret().
-type cb_state() :: quicer_stream:cb_state().

-spec start_link(module(), connection_handle(), remote_stream_opts()) -> gen_server:start_ret().
start_link(CallbackModule, Connection, Opts) ->
start_link(CallbackModule, Connection, Opts#{is_local => false}, []).

-spec start_link(module(), connection_handle(), remote_stream_opts(), [gen_server:start_opt()]) -> gen_server:start_ret().
start_link(CallbackModule, Connection, Opts, StartOpts) ->
quicer_stream:start_link(CallbackModule, Connection, Opts#{is_local => false}, StartOpts).

-spec start(module(), connection_handle(), remote_stream_opts(), [gen_server:start_opt()]) -> gen_server:start_ret().
start(CallbackModule, Connection, Opts, StartOpts) ->
quicer_stream:start(CallbackModule, Connection, Opts#{is_local => false}, StartOpts).

-spec start_link(module(), connection_handle(), stream_handle(), remote_stream_opts(), quicer:new_stream_props()) -> gen_server:start_ret().
start_link(CallbackModule, Connection, Stream, Opts, Props) ->
start_link(CallbackModule, Connection, Stream, Opts, Props, []).

-spec start_link(module(), connection_handle(), stream_handle(), remote_stream_opts(), quicer:new_stream_props(), [gen_server:start_opt()]) -> gen_server:start_ret().
start_link(CallbackModule, Connection, Stream, Opts, Props, StartOpts) ->
quicer_stream:start_link(CallbackModule, Connection, Stream, Opts, Props#{is_local => false}, StartOpts).

-spec start(module(), connection_handle(), stream_handle(), remote_stream_opts(), quicer:new_stream_props(), [gen_server:start_opt()]) -> gen_server:start_ret().
start(CallbackModule, Connection, Stream, Opts, Props, StartOpts) ->
quicer_stream:start(CallbackModule, Connection, Stream, Opts, Props, StartOpts).
12 changes: 7 additions & 5 deletions src/quicer_server_conn_callback.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
, new_stream/3
]).

init(ConnOpts) when is_list(ConnOpts) ->
init(maps:from_list(ConnOpts));
-export([handle_info/2]).

init(#{stream_opts := SOpts} = S) when is_list(SOpts) ->
init(S#{stream_opts := maps:from_list(SOpts)});
init(ConnOpts) when is_map(ConnOpts) ->
Expand All @@ -50,7 +50,7 @@ closed(_Conn, #{} = _Flags, S)->

new_conn(Conn, #{version := _Vsn}, #{stream_opts := SOpts} = S) ->
%% @TODO configurable behavior of spawning stream acceptor
case quicer_stream:start_link(maps:get(stream_callback, SOpts), Conn, SOpts) of
case quicer_remote_stream:start_link(maps:get(stream_callback, SOpts), Conn, SOpts) of
{ok, Pid} ->
ok = quicer:async_handshake(Conn),
{ok, S#{ conn => Conn
Expand All @@ -73,8 +73,8 @@ nst_received(_Conn, _Data, S) ->
new_stream(Stream, #{is_orphan := true} = StreamProps,
#{conn := Conn, streams := Streams, stream_opts := SOpts} = CBState) ->
%% Spawn new stream
case quicer_stream:start_link(maps:get(stream_callback, SOpts), Stream, Conn,
SOpts, StreamProps)
case quicer_remote_stream:start_link(maps:get(stream_callback, SOpts), Stream, Conn,
SOpts, StreamProps)
of
{ok, StreamOwner} ->
case quicer:handoff_stream(Stream, StreamOwner) of
Expand Down Expand Up @@ -116,6 +116,8 @@ connected(Conn, _Flags, #{ slow_start := false, stream_opts := SOpts
connected(_Connecion, _Flags, S) ->
{ok, S}.

handle_info({'EXIT', _Pid, _Reason}, State) ->
{ok, State}.

%% Internals

Expand Down
29 changes: 24 additions & 5 deletions src/quicer_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,12 @@
%% API
-export([ %% Start before conn handshake, with only Conn handle
start_link/3
, start_link/4
, start/4
%% Start after conn handshake with new Stream Handle
, start_link/5
, start_link/6
, start/6
, send/2
, send/3
]).
Expand Down Expand Up @@ -128,8 +132,12 @@
{error, Error :: {already_started, pid()}} |
{error, Error :: term()} |
ignore.
start_link(Callback, Conn, StreamOpts) when is_atom(Callback) ->
gen_server:start_link(?MODULE, [Callback, Conn, StreamOpts], []).
start_link(Callback, Conn, StreamOpts) ->
start_link(Callback, Conn, StreamOpts, []).
start_link(Callback, Conn, StreamOpts, GenStartOpts) when is_atom(Callback) ->
gen_server:start_link(?MODULE, [Callback, Conn, StreamOpts], GenStartOpts).
start(Callback, Conn, StreamOpts, GenStartOpts) when is_atom(Callback) ->
gen_server:start(?MODULE, [Callback, Conn, StreamOpts], GenStartOpts).

%%--------------------------------------------------------------------
%% @doc Start a new stream owner process and
Expand All @@ -144,11 +152,19 @@ start_link(Callback, Conn, StreamOpts) when is_atom(Callback) ->
{error, Error :: {already_started, pid()}} |
{error, Error :: term()} |
ignore.
start_link(Callback, Stream, Conn, StreamOpts, Props)
start_link(Callback, Stream, Conn, StreamOpts, Props) ->
start_link(Callback, Stream, Conn, StreamOpts, Props, []).
start_link(Callback, Stream, Conn, StreamOpts, Props, GenStartOpts)
when Callback =/= undefined
andalso is_atom(Callback)
andalso is_map(Props) ->
gen_server:start_link(?MODULE, [Callback, Stream, Conn, StreamOpts, Props, self()], []).
gen_server:start_link(?MODULE, [Callback, Stream, Conn, StreamOpts, Props, self()], GenStartOpts).

start(Callback, Stream, Conn, StreamOpts, Props, GenStartOpts)
when Callback =/= undefined
andalso is_atom(Callback)
andalso is_map(Props) ->
gen_server:start(?MODULE, [Callback, Stream, Conn, StreamOpts, Props, self()], GenStartOpts).

-spec send(pid(), binary()) -> {ok, Length::non_neg_integer()} | {error, any()}.
send(StreamProc, Data) ->
Expand Down Expand Up @@ -213,6 +229,7 @@ init([Callback, Conn, StreamOpts]) ->
{ok, InitState#{ stream => undefined
, is_owner => false
, is_local => false
, stream_opts => StreamOpts
}};
{error, Reason} ->
{stop, Reason}
Expand All @@ -231,6 +248,7 @@ init([Callback, Conn, StreamOpts]) ->
, is_owner => true
, is_local => true
, is_unidir => IsUni
, stream_opts => StreamOpts
}
}};
{error, Reason, SecReason} ->
Expand All @@ -249,7 +267,8 @@ init([Callback, Stream, Conn, StreamOpts, Props, PrevOwner]) ->
process_flag(trap_exit, true),
case Callback:init_handoff(Stream, StreamOpts, Conn, Props) of
{ok, CBState} ->
State = #{ is_owner => false
State = #{ is_owner => false %% not yet takeover the ownership
, is_local => false
, stream_opts => StreamOpts
, conn => Conn
, stream => Stream
Expand Down
21 changes: 11 additions & 10 deletions test/example_client_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
, datagram_state_changed/3
]).

-export([handle_info/2]).

start_link(Host, Port, {_COpts, _SOpts} = Opts)->
quicer_connection:start_link(?MODULE, {Host, Port}, Opts).

Expand All @@ -52,13 +54,12 @@ init(#{stream_opts := SOpts} = S) when is_list(SOpts) ->
init(S#{stream_opts := maps:from_list(SOpts)});
init(#{conn := Conn, stream_opts := SOpts} = ConnOpts) when is_map(ConnOpts) ->
%% for accepting
{ok, Stream2} = quicer_stream:start_link(example_client_stream, Conn, SOpts#{is_local => false}),
{ok, Stream2} = quicer_remote_stream:start(example_client_stream, Conn, SOpts, [{spawn_opt, [link]}]),
%% for sending unidi_streams
{ok, Stream1} = quicer_stream:start_link(example_client_stream, Conn,
SOpts#{ is_local => true
, open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL
}),
{ok,_} = quicer_stream:send(Stream1, <<"ping_from_example">>, ?QUICER_SEND_FLAG_SYNC bor ?QUIC_SEND_FLAG_FIN),
{ok, Stream1} = quicer_local_stream:start(example_client_stream, Conn,
SOpts#{open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}, [{spawn_opt, [link]}]),

{ok, _} = quicer_stream:send(Stream1, <<"ping_from_example">>, ?QUICER_SEND_FLAG_SYNC bor ?QUIC_SEND_FLAG_FIN),
{ok, ConnOpts#{master_stream_pair => {Stream1, Stream2}}}.

closed(_Conn, #{is_peer_acked := true}, S)->
Expand Down Expand Up @@ -87,7 +88,7 @@ nst_received(_Conn, Data, S) ->
new_stream(Stream, Flags, #{ conn := Conn, streams := Streams
, stream_opts := SOpts} = CBState) ->
%% Spawn new stream
case quicer_stream:start_link(example_server_stream, Stream, Conn, SOpts, Flags) of
case quicer_remote_stream:start_link(example_server_stream, Stream, Conn, SOpts, Flags) of
{ok, StreamOwner} ->
quicer_connection:handoff_stream(Stream, StreamOwner),
{ok, CBState#{ streams := [ {StreamOwner, Stream} | Streams] }};
Expand Down Expand Up @@ -120,7 +121,7 @@ peer_needs_streams(C, #{unidi_streams := Current}, S) ->
{ok, S};
peer_needs_streams(C, #{bidi_streams := Current}, S) ->
ok = quicer:setopt(C, param_conn_settings, #{peer_bidi_stream_count => Current + 1}),
{ok, S};
%% for https://github.com/microsoft/msquic/issues/3120
peer_needs_streams(_C, undefined, S) ->
{ok, S}.

handle_info({'EXIT', _Pid, _Reason}, State) ->
{ok, State}.
12 changes: 7 additions & 5 deletions test/example_server_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
, datagram_state_changed/3
]).

-export([handle_info/2]).

init(ConnOpts) when is_list(ConnOpts) ->
init(maps:from_list(ConnOpts));
init(#{stream_opts := SOpts} = S) when is_list(SOpts) ->
Expand All @@ -61,7 +63,7 @@ closed(_Conn, _CloseProp, S) ->
{stop, normal, S}.

new_conn(Conn, #{version := _Vsn}, #{stream_opts := SOpts} = S) ->
case quicer_stream:start_link(example_server_stream, Conn, SOpts) of
case quicer_remote_stream:start_link(example_server_stream, Conn, SOpts) of
{ok, Pid} ->
ok = quicer:async_handshake(Conn),
{ok, S#{ conn => Conn
Expand All @@ -86,7 +88,7 @@ nst_received(_Conn, _Data, S) ->
new_stream(Stream, Flags, #{ conn := Conn, streams := Streams
, stream_opts := SOpts} = CBState) ->
%% Spawn new stream
case quicer_stream:start_link(example_server_stream, Stream, Conn, SOpts, Flags) of
case quicer_remote_stream:start(example_server_stream, Stream, Conn, SOpts, Flags, [{spawn_opt, [link]}]) of
{ok, StreamOwner} ->
case quicer:handoff_stream(Stream, StreamOwner) of
ok ->
Expand Down Expand Up @@ -119,10 +121,10 @@ peer_needs_streams(C, unidi_streams, S) ->
{ok, S};
peer_needs_streams(_C, bidi_streams, S) ->
%% leave it for test case to unblock it, see tc_multi_streams_example_server_3
{ok, S};
%% for https://github.com/microsoft/msquic/issues/3120
peer_needs_streams(_C, undefined, S) ->
{ok, S}.

datagram_state_changed(_C, _Flags, S) ->
{ok, S}.

handle_info({'EXIT', _Pid, _Reason}, State) ->
{ok, State}.
6 changes: 2 additions & 4 deletions test/example_server_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,8 @@ handle_stream_data(Stream, Bin, _Flags, #{is_unidir := true, peer_stream := Peer

case PeerStream of
undefined ->
{ok, StreamProc} = quicer_stream:start_link(?MODULE, Conn,
[ {open_flag, ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}
, {is_local, true}
]),
{ok, StreamProc} = quicer_local_stream:start_link(?MODULE, Conn,
[ {open_flag, ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL} ]),
{ok, _} = quicer_stream:send(StreamProc, Bin),
{ok, State#{peer_stream := StreamProc}};
StreamProc when is_pid(StreamProc) ->
Expand Down

0 comments on commit d8e1873

Please sign in to comment.