Skip to content

Commit

Permalink
Merge pull request kafka4beam#84 from zmstone/main
Browse files Browse the repository at this point in the history
fix: unexpected_id crash
  • Loading branch information
zmstone authored Jan 15, 2025
2 parents 3c8a354 + 5d3da73 commit e8ea47c
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 10 deletions.
3 changes: 3 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
* 4.0.5
- Fix `unexpected_id` crash introduced in 4.0.1.

* 4.0.4
- Upgrade to kafka_protocol-4.1.10 for discover/connect timeout fix.
- Upgrade to replayq from 0.3.4 to 0.3.10.
Expand Down
25 changes: 21 additions & 4 deletions src/wolff_pendack.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,18 @@ insert_backlog(#{next_id := Id, backlog := Cbs, count := Count} = X, Cb) ->
NewCbs = insert_cb(Cbs, Id, Cb),
{Id, X#{next_id => Id + 1, backlog => NewCbs, count => Count + 1}}.

%% Insert a callback into the inflight queue.
%% Unlike the backlog, inflight IDs are allowed to have gaps: calls may
%% have been dropped from the backlog before they were moved to inflight.
%% When insert_cb/3 rejects the ID with a non_sequential_call_id error,
%% the callback is enqueued as a fresh entry keyed by its own ID, which
%% starts a new ID range. Any other exception is left to propagate.
insert_inflight(Inflight, CallId, Callback) ->
    try insert_cb(Inflight, CallId, Callback) of
        Extended ->
            Extended
    catch
        error:{non_sequential_call_id, _PrevId, _NewId} ->
            queue:in({CallId, Callback}, Inflight)
    end.

insert_cb(Cbs, Id, Cb) ->
case queue:out_r(Cbs) of
{empty, _} ->
Expand All @@ -76,11 +88,16 @@ insert_cb1(Cbs, Key, Cb, Id, Cb1) ->
queue:in({Id, Cb1}, queue:in({Key, Cb}, Cbs)).

%% Extend an ID, or an ID range, with the next ID.
%% If the ID is a single integer, it is first expanded to the range
%% {Id0, Id0}; the upper bound of the range is then advanced to Id,
%% so the result is always a {MinId, Id} tuple.
%%
%% Asserts that the new ID is the very next (+1) ID, otherwise raises
%% error({non_sequential_call_id, Previous, Id}) where Previous is the
%% integer ID or the {MinId, MaxId} range that was passed in.
%% The assertion should not fail for backlog queue because the next_id
%% keeps track of it (otherwise a bug).
%% The assertion may fail for inflight queue if backlog was dropped
%% (e.g. due to OOM protection) so it is expected to be caught.
%%
%% Only the non_sequential_call_id tag is raised: a preceding assertion
%% using the old unexpected_id tag would fire first and make the error
%% impossible to match for callers catching non_sequential_call_id.
expand_id(Id0, Id) when is_integer(Id0) ->
    %% Checked here (and not only in the range clause) so that the error
    %% reports the integer ID rather than the expanded {Id0, Id0} range.
    Id =:= Id0 + 1 orelse error({non_sequential_call_id, Id0, Id}),
    expand_id({Id0, Id0}, Id);
expand_id({MinId, MaxId}, Id) ->
    Id =:= MaxId + 1 orelse error({non_sequential_call_id, {MinId, MaxId}, Id}),
    {MinId, Id}.

%% @doc Take the callback from the inflight queue.
Expand Down Expand Up @@ -149,6 +166,6 @@ move1(#{backlog := Backlog0, inflight := Inflight0} = X, Id) ->
false ->
X;
{ok, Cb, Backlog} ->
Inflight = insert_cb(Inflight0, Id, Cb),
X#{backlog := Backlog, inflight := Inflight}
Inflight = insert_inflight(Inflight0, Id, Cb),
X#{backlog := Backlog, inflight := Inflight}
end.
8 changes: 4 additions & 4 deletions src/wolff_producer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -514,20 +514,20 @@ send_to_kafka(#{sent_reqs := SentReqs,
NewInflightCalls = InflightCalls + NrOfCalls,
_ = wolff_metrics:inflight_set(Config, NewInflightCalls),
#kpro_req{ref = Ref, no_ack = NoAck} = Req = make_request(Items, St0),
St1 = St0#{replayq := NewQ},
Sent = #{req_ref => Ref,
q_items => Items,
q_ack_ref => QAckRef,
attempts => 1
},
St2 = St1#{sent_reqs := queue:in(Sent, SentReqs),
St1 = St0#{replayq := NewQ,
sent_reqs := queue:in(Sent, SentReqs),
sent_reqs_count := NewSentReqsCount,
inflight_calls := NewInflightCalls,
pending_acks := NewPendingAcks
},
ok = request_async(Conn, Req),
St3 = maybe_fake_kafka_ack(NoAck, Sent, St2),
maybe_send_to_kafka(St3).
St2 = maybe_fake_kafka_ack(NoAck, Sent, St1),
maybe_send_to_kafka(St2).

%% when require no acks do not add to sent_reqs and ack caller immediately
maybe_fake_kafka_ack(_NoAck = true, Sent, St) ->
Expand Down
107 changes: 105 additions & 2 deletions test/wolff_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
deinstall_event_logging/1,
print_telemetry_check/2]).


-include_lib("eunit/include/eunit.hrl").
-include_lib("lc/include/lc.hrl").
-include("wolff.hrl").
-include_lib("kafka_protocol/include/kpro.hrl").

-define(KEY, key(?FUNCTION_NAME)).
-define(HOSTS, [{"localhost", 9092}]).
Expand Down Expand Up @@ -326,6 +326,7 @@ zero_ack_test() ->
ets:delete(CntrEventsTable),
deinstall_event_logging(?FUNCTION_NAME).

%% replayq overflow while inflight is not empty
replayq_overflow_while_inflight_test() ->
ClientCfg = client_config(),
{ok, Client} = start_client(<<"client-1">>, ?HOSTS, ClientCfg),
Expand Down Expand Up @@ -376,7 +377,7 @@ replayq_overflow_while_inflight_test() ->
io:format(user, "~p~n", [sys:get_state(Pid)]),
error(timeout)
end,
%% the 3rd batch should stay in the queue
%% the 3rd batch should stay in the queue because max_send_ahead is 0
receive
{ack_3, Any} -> error({unexpected, Any})
after
Expand All @@ -389,6 +390,108 @@ replayq_overflow_while_inflight_test() ->
ok = stop_client(Client)
end.

%% replayq overflow while inflight is not empty and resend after Kafka is connected again.
%% Scenario: four batches are sent with max_send_ahead = 1. The first two
%% become inflight, the 3rd is discarded from the buffer when the 4th arrives
%% (replayq_max_total_bytes only has room for one batch), and the connection
%% is killed so the inflight batches are resent after reconnect.
%% This leaves a gap in the call IDs between the inflight entries and the
%% 4th batch, which is the situation the test is meant to exercise.
replayq_overflow_while_disconnected_test() ->
ClientCfg = client_config(),
{ok, Client} = start_client(<<"client-1">>, ?HOSTS, ClientCfg),
Msg = fun(Value) -> #{key => <<>>, value => Value} end,
%% each call carries two identical messages
Batch = fun(Value) -> [Msg(Value), Msg(Value)] end,
%% all test values have the same length, so every batch has this size
BatchSize = wolff_producer:batch_bytes(Batch(<<"testdata1">>)),
ProducerCfg = #{max_batch_bytes => 1, %% ensure send one call at a time
replayq_max_total_bytes => BatchSize,
required_acks => all_isr,
max_send_ahead => 1, %% allow 2 inflight requests
reconnect_delay_ms => 0
},
{ok, Producers} = wolff:start_producers(Client, <<"test-topic">>, ProducerCfg),
Pid = wolff_producers:lookup_producer(Producers, 0),
%% remember the current connection pid so it can be killed later
#{conn := Conn} = sys:get_state(Pid),
?assert(is_process_alive(Pid)),
TesterPid = self(),
%% Replace kpro:send/2 with a fun that never forwards the request:
%% it only reports to the tester which payload was handed over and returns ok.
%% Other kpro functions keep their real implementation (passthrough).
ok = meck:new(kpro, [no_history, no_link, passthrough]),
meck:expect(kpro, send,
fun(_Conn, Req) ->
%% the last element of the request tuple is taken as the encoded payload
%% NOTE(review): presumably the msg field of #kpro_req{} - confirm against kpro.hrl
Payload = iolist_to_binary(lists:last(tuple_to_list(Req))),
%% N is the single byte following the first "testdata" occurrence,
%% i.e. the digit identifying the batch
[_, <<N, _/binary>>] = binary:split(Payload, <<"testdata">>),
TesterPid ! {sent_to_kafka, <<"testdata", N>>},
ok
end),
AckFun = fun(_Partition, BaseOffset) -> TesterPid ! {ack, BaseOffset}, ok end,
SendF = fun(AckFun0, Value) -> wolff:send(Producers, Batch(Value), AckFun0) end,
%% send 4 batches first 2 will be inflight, 3rd is overflow (kicked out by the 4th)
%% the senders are staggered by 5ms sleeps to get a deterministic call order
proc_lib:spawn_link(fun() -> SendF(AckFun, <<"testdata1">>) end),
proc_lib:spawn_link(fun() -> timer:sleep(5), SendF(AckFun, <<"testdata2">>) end),
proc_lib:spawn_link(fun() -> timer:sleep(10), SendF(AckFun, <<"testdata3">>) end),
proc_lib:spawn_link(fun() -> timer:sleep(15), SendF(AckFun, <<"testdata4">>) end),
%% wait (up to 2 seconds) for the mocked kpro:send to report the given payload
ExpectSent =
fun(Payload) ->
receive
{sent_to_kafka, Payload} -> ok
after
2000 ->
error(timeout)
end
end,
%% wait (up to 2 seconds) for the ack callback to be evaluated with the given value
ExpectAck =
fun(Offset) ->
receive
{ack, Offset} -> ok
after
2000 -> error(timeout)
end
end,
try
%% the first two batches are sent to Kafka, but no reply, so no ack
ok = ExpectSent(<<"testdata1">>),
ok = ExpectSent(<<"testdata2">>),
%% kill the connection, producer should trigger a reconnect
exit(Conn, kill),
%% the 3rd batch should be dropped (pushed out by the 4th)
ok = ExpectAck(buffer_overflow_discarded),
%% after reconnected, expect a resend of first 2 messages to Kafka
ok = ExpectSent(<<"testdata1">>),
ok = ExpectSent(<<"testdata2">>),
%% the 4th batch should stay in the queue because max_send_ahead is 1 (max-inflight=2)
%% neither a send of the 4th batch nor any ack may show up within 100ms
receive
{sent_to_kafka, <<"testdata4">>} -> error(unexpected);
{ack, _} = Ack -> error({unexpected, Ack})
after
100 ->
ok
end,
%% fake a Kafka produce response for the 1st message
ok = mock_kafka_ack(Pid, 1),
%% expect the ack for the 1st message
ok = ExpectAck(1),
%% now expect the 4th message to be sent
ok = ExpectSent(<<"testdata4">>)
after
%% always remove the mock and stop the producers/client, even on failure
meck:unload(kpro),
ok = wolff:stop_producers(Producers),
ok = stop_client(Client)
end.

%% Fake a successful Kafka produce response for the request at the front
%% of the producer's sent_reqs queue.
%% The producer state is read with sys:get_state/1 to obtain the current
%% connection and the request reference; a #kpro_rsp{} carrying Offset as
%% base_offset for partition 0 of <<"test-topic">> is then sent to the
%% producer as {msg, Conn, Rsp}. Crashes with badmatch if no request is
%% pending. Always returns ok.
mock_kafka_ack(Pid, Offset) ->
    #{sent_reqs := SentReqs, conn := CurrentConn} = sys:get_state(Pid),
    {value, #{req_ref := ReqRef}} = queue:peek(SentReqs),
    PartitionRsp = #{partition => 0,
                     error_code => no_error,
                     base_offset => Offset,
                     error_message => <<>>,
                     log_start_offset => 0,
                     log_append_time => -1,
                     record_errors => []},
    TopicRsp = #{topic => <<"test-topic">>,
                 partition_responses => [PartitionRsp]},
    Rsp = #kpro_rsp{api = produce,
                    ref = ReqRef,
                    msg = #{responses => [TopicRsp]}},
    Pid ! {msg, CurrentConn, Rsp},
    ok.

replayq_overflow_test() ->
CntrEventsTable = ets:new(cntr_events, [public]),
install_event_logging(?FUNCTION_NAME, CntrEventsTable, false),
Expand Down

0 comments on commit e8ea47c

Please sign in to comment.