diff --git a/.github/workflows/code-format.yaml b/.github/workflows/code-format.yaml new file mode 100644 index 00000000..a789c389 --- /dev/null +++ b/.github/workflows/code-format.yaml @@ -0,0 +1,13 @@ +name: clang-format Check +on: [push, pull_request] +jobs: + formatting-check: + name: Formatting Check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Run clang-format style check for C/C++/Protobuf programs. + uses: jidicula/clang-format-action@v4.11.0 + with: + clang-format-version: '13' + check-path: 'c_src' diff --git a/c_src/quicer_connection.c b/c_src/quicer_connection.c index a4ec2e1b..7a5ea56d 100644 --- a/c_src/quicer_connection.c +++ b/c_src/quicer_connection.c @@ -1443,6 +1443,15 @@ handle_connection_event_streams_available(QuicerConnCTX *c_ctx, assert(c_ctx->Connection); ErlNifEnv *env = c_ctx->env; + if (c_ctx->event_mask & QUICER_CONNECTION_EVENT_MASK_NO_STREAMS_AVAILABLE) + { + TP_CB_3(streams_available, (uintptr_t)c_ctx->Connection, 0); + return QUIC_STATUS_SUCCESS; + } + else + { + TP_CB_3(streams_available, (uintptr_t)c_ctx->Connection, 1); + } ERL_NIF_TERM props_name[] = { ATOM_BIDI_STREAMS, ATOM_UNIDI_STREAMS }; ERL_NIF_TERM props_value[] = { enif_make_uint64(env, Event->STREAMS_AVAILABLE.BidirectionalCount), diff --git a/c_src/quicer_connection.h b/c_src/quicer_connection.h index 3205fd4f..f5e4d6dd 100644 --- a/c_src/quicer_connection.h +++ b/c_src/quicer_connection.h @@ -22,7 +22,8 @@ limitations under the License. typedef enum QUICER_CONNECTION_EVENT_MASKS { - QUICER_CONNECTION_EVENT_MASK_NST = 0x00000001 + QUICER_CONNECTION_EVENT_MASK_NST = 0x00000001, + QUICER_CONNECTION_EVENT_MASK_NO_STREAMS_AVAILABLE = 0x00000002 } QUICER_CONNECTION_EVENT_MASK; ERL_NIF_TERM diff --git a/include/quicer.hrl b/include/quicer.hrl index 485c77f2..97fcd707 100644 --- a/include/quicer.hrl +++ b/include/quicer.hrl @@ -104,6 +104,7 @@ %% QUICER_CONNECTION_EVENT_MASKS -define(QUICER_CONNECTION_EVENT_MASK_NST , 16#00000001). +-define(QUICER_CONNECTION_EVENT_MASK_NO_STREAMS_AVAILABLE , 16#00000002). %% QUICER_STREAM_EVENT_MASKS -define(QUICER_STREAM_EVENT_MASK_START_COMPLETE , 16#00000001). diff --git a/src/quicer_connection.erl b/src/quicer_connection.erl index edd683c6..6950a35f 100644 --- a/src/quicer_connection.erl +++ b/src/quicer_connection.erl @@ -414,20 +414,6 @@ handle_info({quic, dgram_state_changed, C, Flags}, ?tp_ignore_side_effects_in_prod(debug, #{module => ?MODULE, conn => C, event => dgram_state_changed, flags => Flags}), default_cb_ret(M:datagram_state_changed(C, Flags, CBState), State); -%%% ============================================================== -%%% Handle messages for link/monitor -%%% ============================================================== -handle_info({'EXIT', _Pid, {shutdown, normal}}, State) -> - %% exit signal from stream - {noreply, State}; - -handle_info({'EXIT', _Pid, {shutdown, _Other}}, State) -> - %% @todo - {noreply, State}; - -handle_info({'EXIT', _Pid, normal}, State) -> - %% @todo - {noreply, State}; handle_info(OtherInfo, #{callback := M, callback_state := CBState} = State) -> default_cb_ret(M:handle_info(OtherInfo, CBState), State). diff --git a/src/quicer_local_stream.erl b/src/quicer_local_stream.erl index 431a9746..14c7345c 100644 --- a/src/quicer_local_stream.erl +++ b/src/quicer_local_stream.erl @@ -17,6 +17,11 @@ %% @doc Stream initiated from local -module(quicer_local_stream). +-export([start/4, + start_link/3, + start_link/4 + ]). + -include("quicer_types.hrl"). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -72,5 +77,21 @@ , handle_continue/2 ]). +-type local_stream_opts() :: stream_opts() | proplists:proplist(). -type cb_ret() :: quicer_stream:cb_ret(). -type cb_state() :: quicer_stream:cb_state(). + +-spec start_link(module(), connection_handle(), local_stream_opts()) -> gen_server:start_ret(). +start_link(CallbackModule, Connection, Opts) -> + start_link(CallbackModule, Connection, Opts, []). +-spec start_link(module(), connection_handle(), local_stream_opts(), [gen_server:start_opt()]) -> gen_server:start_ret(). +start_link(CallbackModule, Connection, Opts, StartOpts) when is_list(Opts)-> + start_link(CallbackModule, Connection, maps:from_list(Opts), StartOpts); +start_link(CallbackModule, Connection, Opts, StartOpts) -> + quicer_stream:start_link(CallbackModule, Connection, Opts#{is_local => true}, StartOpts). + +-spec start(module(), connection_handle(), local_stream_opts(), [gen_server:start_opt()]) -> gen_server:start_ret(). +start(CallbackModule, Connection, Opts, StartOpts) when is_list(Opts) -> + start(CallbackModule, Connection, maps:from_list(Opts), StartOpts); +start(CallbackModule, Connection, Opts, StartOpts) -> + quicer_stream:start(CallbackModule, Connection, Opts#{is_local => true}, StartOpts). diff --git a/src/quicer_remote_stream.erl b/src/quicer_remote_stream.erl index 10914a97..0ee6f124 100644 --- a/src/quicer_remote_stream.erl +++ b/src/quicer_remote_stream.erl @@ -18,6 +18,14 @@ -include("quicer_types.hrl"). +-export([start/4, + start_link/3, + start_link/4, + start/6, + start_link/5, + start_link/6 + ]). + -callback init_handoff(stream_handle(), stream_opts(), connection_handle(), new_stream_props()) -> cb_ret(). %% Prepare callback state before ownership handoff @@ -71,5 +79,30 @@ , handle_continue/2 ]). +-type remote_stream_opts() :: stream_opts() | proplists:proplist(). -type cb_ret() :: quicer_stream:cb_ret(). -type cb_state() :: quicer_stream:cb_state(). + +-spec start_link(module(), connection_handle(), remote_stream_opts()) -> gen_server:start_ret(). +start_link(CallbackModule, Connection, Opts) -> + start_link(CallbackModule, Connection, Opts#{is_local => false}, []). + +-spec start_link(module(), connection_handle(), remote_stream_opts(), [gen_server:start_opt()]) -> gen_server:start_ret(). +start_link(CallbackModule, Connection, Opts, StartOpts) -> + quicer_stream:start_link(CallbackModule, Connection, Opts#{is_local => false}, StartOpts). + +-spec start(module(), connection_handle(), remote_stream_opts(), [gen_server:start_opt()]) -> gen_server:start_ret(). +start(CallbackModule, Connection, Opts, StartOpts) -> + quicer_stream:start(CallbackModule, Connection, Opts#{is_local => false}, StartOpts). + +-spec start_link(module(), connection_handle(), stream_handle(), remote_stream_opts(), quicer:new_stream_props()) -> gen_server:start_ret(). +start_link(CallbackModule, Connection, Stream, Opts, Props) -> + start_link(CallbackModule, Connection, Stream, Opts, Props, []). + +-spec start_link(module(), connection_handle(), stream_handle(), remote_stream_opts(), quicer:new_stream_props(), [gen_server:start_opt()]) -> gen_server:start_ret(). +start_link(CallbackModule, Connection, Stream, Opts, Props, StartOpts) -> + quicer_stream:start_link(CallbackModule, Connection, Stream, Opts, Props#{is_local => false}, StartOpts). + +-spec start(module(), connection_handle(), stream_handle(), remote_stream_opts(), quicer:new_stream_props(), [gen_server:start_opt()]) -> gen_server:start_ret(). +start(CallbackModule, Connection, Stream, Opts, Props, StartOpts) -> + quicer_stream:start(CallbackModule, Connection, Stream, Opts, Props, StartOpts). diff --git a/src/quicer_server_conn_callback.erl b/src/quicer_server_conn_callback.erl index 3de88412..ac7c0aab 100644 --- a/src/quicer_server_conn_callback.erl +++ b/src/quicer_server_conn_callback.erl @@ -38,8 +38,8 @@ , new_stream/3 ]). -init(ConnOpts) when is_list(ConnOpts) -> - init(maps:from_list(ConnOpts)); +-export([handle_info/2]). + init(#{stream_opts := SOpts} = S) when is_list(SOpts) -> init(S#{stream_opts := maps:from_list(SOpts)}); init(ConnOpts) when is_map(ConnOpts) -> @@ -50,7 +50,7 @@ closed(_Conn, #{} = _Flags, S)-> new_conn(Conn, #{version := _Vsn}, #{stream_opts := SOpts} = S) -> %% @TODO configurable behavior of spawning stream acceptor - case quicer_stream:start_link(maps:get(stream_callback, SOpts), Conn, SOpts) of + case quicer_remote_stream:start_link(maps:get(stream_callback, SOpts), Conn, SOpts) of {ok, Pid} -> ok = quicer:async_handshake(Conn), {ok, S#{ conn => Conn @@ -73,8 +73,8 @@ nst_received(_Conn, _Data, S) -> new_stream(Stream, #{is_orphan := true} = StreamProps, #{conn := Conn, streams := Streams, stream_opts := SOpts} = CBState) -> %% Spawn new stream - case quicer_stream:start_link(maps:get(stream_callback, SOpts), Stream, Conn, - SOpts, StreamProps) + case quicer_remote_stream:start_link(maps:get(stream_callback, SOpts), Stream, Conn, + SOpts, StreamProps) of {ok, StreamOwner} -> case quicer:handoff_stream(Stream, StreamOwner) of @@ -116,6 +116,8 @@ connected(Conn, _Flags, #{ slow_start := false, stream_opts := SOpts connected(_Connecion, _Flags, S) -> {ok, S}. +handle_info({'EXIT', _Pid, _Reason}, State) -> + {ok, State}. %% Internals diff --git a/src/quicer_stream.erl b/src/quicer_stream.erl index 5a5cc29e..40e335e5 100644 --- a/src/quicer_stream.erl +++ b/src/quicer_stream.erl @@ -88,8 +88,12 @@ %% API -export([ %% Start before conn handshake, with only Conn handle start_link/3 + , start_link/4 + , start/4 %% Start after conn handshake with new Stream Handle , start_link/5 + , start_link/6 + , start/6 , send/2 , send/3 ]). @@ -128,8 +132,12 @@ {error, Error :: {already_started, pid()}} | {error, Error :: term()} | ignore. -start_link(Callback, Conn, StreamOpts) when is_atom(Callback) -> - gen_server:start_link(?MODULE, [Callback, Conn, StreamOpts], []). +start_link(Callback, Conn, StreamOpts) -> + start_link(Callback, Conn, StreamOpts, []). +start_link(Callback, Conn, StreamOpts, GenStartOpts) when is_atom(Callback) -> + gen_server:start_link(?MODULE, [Callback, Conn, StreamOpts], GenStartOpts). +start(Callback, Conn, StreamOpts, GenStartOpts) when is_atom(Callback) -> + gen_server:start(?MODULE, [Callback, Conn, StreamOpts], GenStartOpts). %%-------------------------------------------------------------------- %% @doc Start a new stream owner process and @@ -144,11 +152,19 @@ start_link(Callback, Conn, StreamOpts) when is_atom(Callback) -> {error, Error :: {already_started, pid()}} | {error, Error :: term()} | ignore. -start_link(Callback, Stream, Conn, StreamOpts, Props) +start_link(Callback, Stream, Conn, StreamOpts, Props) -> + start_link(Callback, Stream, Conn, StreamOpts, Props, []). +start_link(Callback, Stream, Conn, StreamOpts, Props, GenStartOpts) when Callback =/= undefined andalso is_atom(Callback) andalso is_map(Props) -> - gen_server:start_link(?MODULE, [Callback, Stream, Conn, StreamOpts, Props, self()], []). + gen_server:start_link(?MODULE, [Callback, Stream, Conn, StreamOpts, Props, self()], GenStartOpts). + +start(Callback, Stream, Conn, StreamOpts, Props, GenStartOpts) + when Callback =/= undefined + andalso is_atom(Callback) + andalso is_map(Props) -> + gen_server:start(?MODULE, [Callback, Stream, Conn, StreamOpts, Props, self()], GenStartOpts). -spec send(pid(), binary()) -> {ok, Length::non_neg_integer()} | {error, any()}. send(StreamProc, Data) -> @@ -213,6 +229,7 @@ init([Callback, Conn, StreamOpts]) -> {ok, InitState#{ stream => undefined , is_owner => false , is_local => false + , stream_opts => StreamOpts }}; {error, Reason} -> {stop, Reason} @@ -231,6 +248,7 @@ init([Callback, Conn, StreamOpts]) -> , is_owner => true , is_local => true , is_unidir => IsUni + , stream_opts => StreamOpts } }}; {error, Reason, SecReason} -> @@ -249,7 +267,8 @@ init([Callback, Stream, Conn, StreamOpts, Props, PrevOwner]) -> process_flag(trap_exit, true), case Callback:init_handoff(Stream, StreamOpts, Conn, Props) of {ok, CBState} -> - State = #{ is_owner => false + State = #{ is_owner => false %% not yet takeover the ownership + , is_local => false , stream_opts => StreamOpts , conn => Conn , stream => Stream diff --git a/test/example_client_connection.erl b/test/example_client_connection.erl index 85c5af32..030b850f 100644 --- a/test/example_client_connection.erl +++ b/test/example_client_connection.erl @@ -43,6 +43,8 @@ , datagram_state_changed/3 ]). +-export([handle_info/2]). + start_link(Host, Port, {_COpts, _SOpts} = Opts)-> quicer_connection:start_link(?MODULE, {Host, Port}, Opts). @@ -52,13 +54,12 @@ init(#{stream_opts := SOpts} = S) when is_list(SOpts) -> init(S#{stream_opts := maps:from_list(SOpts)}); init(#{conn := Conn, stream_opts := SOpts} = ConnOpts) when is_map(ConnOpts) -> %% for accepting - {ok, Stream2} = quicer_stream:start_link(example_client_stream, Conn, SOpts#{is_local => false}), + {ok, Stream2} = quicer_remote_stream:start(example_client_stream, Conn, SOpts, [{spawn_opt, [link]}]), %% for sending unidi_streams - {ok, Stream1} = quicer_stream:start_link(example_client_stream, Conn, - SOpts#{ is_local => true - , open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL - }), - {ok,_} = quicer_stream:send(Stream1, <<"ping_from_example">>, ?QUICER_SEND_FLAG_SYNC bor ?QUIC_SEND_FLAG_FIN), + {ok, Stream1} = quicer_local_stream:start(example_client_stream, Conn, + SOpts#{open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}, [{spawn_opt, [link]}]), + + {ok, _} = quicer_stream:send(Stream1, <<"ping_from_example">>, ?QUICER_SEND_FLAG_SYNC bor ?QUIC_SEND_FLAG_FIN), {ok, ConnOpts#{master_stream_pair => {Stream1, Stream2}}}. closed(_Conn, #{is_peer_acked := true}, S)-> @@ -87,7 +88,7 @@ nst_received(_Conn, Data, S) -> new_stream(Stream, Flags, #{ conn := Conn, streams := Streams , stream_opts := SOpts} = CBState) -> %% Spawn new stream - case quicer_stream:start_link(example_server_stream, Stream, Conn, SOpts, Flags) of + case quicer_remote_stream:start_link(example_server_stream, Stream, Conn, SOpts, Flags) of {ok, StreamOwner} -> quicer_connection:handoff_stream(Stream, StreamOwner), {ok, CBState#{ streams := [ {StreamOwner, Stream} | Streams] }}; @@ -120,7 +121,7 @@ peer_needs_streams(C, #{unidi_streams := Current}, S) -> {ok, S}; peer_needs_streams(C, #{bidi_streams := Current}, S) -> ok = quicer:setopt(C, param_conn_settings, #{peer_bidi_stream_count => Current + 1}), - {ok, S}; -%% for https://github.com/microsoft/msquic/issues/3120 -peer_needs_streams(_C, undefined, S) -> {ok, S}. + +handle_info({'EXIT', _Pid, _Reason}, State) -> + {ok, State}. diff --git a/test/example_server_connection.erl b/test/example_server_connection.erl index a1c6dc94..e140eb86 100644 --- a/test/example_server_connection.erl +++ b/test/example_server_connection.erl @@ -50,6 +50,8 @@ , datagram_state_changed/3 ]). +-export([handle_info/2]). + init(ConnOpts) when is_list(ConnOpts) -> init(maps:from_list(ConnOpts)); init(#{stream_opts := SOpts} = S) when is_list(SOpts) -> @@ -61,7 +63,7 @@ closed(_Conn, _CloseProp, S) -> {stop, normal, S}. new_conn(Conn, #{version := _Vsn}, #{stream_opts := SOpts} = S) -> - case quicer_stream:start_link(example_server_stream, Conn, SOpts) of + case quicer_remote_stream:start_link(example_server_stream, Conn, SOpts) of {ok, Pid} -> ok = quicer:async_handshake(Conn), {ok, S#{ conn => Conn @@ -86,7 +88,7 @@ nst_received(_Conn, _Data, S) -> new_stream(Stream, Flags, #{ conn := Conn, streams := Streams , stream_opts := SOpts} = CBState) -> %% Spawn new stream - case quicer_stream:start_link(example_server_stream, Stream, Conn, SOpts, Flags) of + case quicer_remote_stream:start(example_server_stream, Stream, Conn, SOpts, Flags, [{spawn_opt, [link]}]) of {ok, StreamOwner} -> case quicer:handoff_stream(Stream, StreamOwner) of ok -> @@ -119,10 +121,10 @@ peer_needs_streams(C, unidi_streams, S) -> {ok, S}; peer_needs_streams(_C, bidi_streams, S) -> %% leave it for test case to unblock it, see tc_multi_streams_example_server_3 - {ok, S}; -%% for https://github.com/microsoft/msquic/issues/3120 -peer_needs_streams(_C, undefined, S) -> {ok, S}. datagram_state_changed(_C, _Flags, S) -> {ok, S}. + +handle_info({'EXIT', _Pid, _Reason}, State) -> + {ok, State}. diff --git a/test/example_server_stream.erl b/test/example_server_stream.erl index ef168ffc..634e6d72 100644 --- a/test/example_server_stream.erl +++ b/test/example_server_stream.erl @@ -121,10 +121,8 @@ handle_stream_data(Stream, Bin, _Flags, #{is_unidir := true, peer_stream := Peer case PeerStream of undefined -> - {ok, StreamProc} = quicer_stream:start_link(?MODULE, Conn, - [ {open_flag, ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL} - , {is_local, true} - ]), + {ok, StreamProc} = quicer_local_stream:start_link(?MODULE, Conn, + [ {open_flag, ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL} ]), {ok, _} = quicer_stream:send(StreamProc, Bin), {ok, State#{peer_stream := StreamProc}}; StreamProc when is_pid(StreamProc) ->