Skip to content

Commit

Permalink
feat: drop at least overflowed bytes when memory is under stress
Browse files Browse the repository at this point in the history
  • Loading branch information
thalesmg committed Jan 31, 2025
1 parent e8ea47c commit 76f079e
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 7 deletions.
5 changes: 5 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
* 4.0.6
- Use more aggressive buffer overflow mode when using memory mode buffer and the system memory usage is high, to reduce risk of OOM.
- Upgrade replayq from 0.3.10 to 0.3.11.
- Upgrade lc from 0.3.2 to 0.3.4.
p
* 4.0.5
- Fix `unexpected_id` crash introduced in 4.0.1.

Expand Down
4 changes: 2 additions & 2 deletions rebar.config
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{deps, [ {kafka_protocol, "4.1.10"}
, {replayq, "0.3.10"}
, {lc, "0.3.2"}
, {replayq, "0.3.11"}
, {lc, "0.3.4"}
, {telemetry, "1.1.0"}
]}.

Expand Down
19 changes: 14 additions & 5 deletions src/wolff_producer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -929,9 +929,11 @@ enqueue_calls2(Calls,
wolff_metrics:queuing_set(Config0, replayq:count(NewQ)),
wolff_metrics:queuing_bytes_set(Config0, replayq:bytes(NewQ)),
lists:foreach(fun maybe_reply_queued/1, Calls),
Overflow = case maps:get(drop_if_highmem, Config0, false)
andalso replayq:is_mem_only(NewQ)
andalso load_ctl:is_high_mem() of
IsHighMemOverflow =
maps:get(drop_if_highmem, Config0, false)
andalso replayq:is_mem_only(NewQ)
andalso load_ctl:is_high_mem(),
Overflow = case IsHighMemOverflow of
true ->
max(replayq:overflow(NewQ), CallByteSize);
false ->
Expand All @@ -940,6 +942,7 @@ enqueue_calls2(Calls,
handle_overflow(St0#{replayq := NewQ,
pending_acks := PendingAcks
},
IsHighMemOverflow,
Overflow).

maybe_reply_queued(?SEND_REQ(?no_queued_reply, _, _)) ->
Expand All @@ -956,16 +959,22 @@ eval_ack_cb(?ACK_CB({Caller, Ref}, Partition), BaseOffset) when ?IS_SYNC_REF(Cal
_ = erlang:send(Caller, {Ref, Partition, BaseOffset}),
ok.

handle_overflow(St, Overflow) when Overflow =< 0 ->
handle_overflow(St, _IsHighMemOverflow, Overflow) when Overflow =< 0 ->
ok = maybe_log_discard(St, 0),
St;
handle_overflow(#{replayq := Q,
pending_acks := PendingAcks,
config := Config
} = St,
IsHighMemOverflow,
Overflow) ->
BytesMode =
case IsHighMemOverflow of
true -> at_least;
false -> at_most
end,
{NewQ, QAckRef, Items} =
replayq:pop(Q, #{bytes_limit => Overflow, count_limit => 999999999}),
replayq:pop(Q, #{bytes_limit => {BytesMode, Overflow}, count_limit => 999999999}),
ok = replayq:ack(NewQ, QAckRef),
Calls = get_calls_from_queue_items(Items),
CallIDs = lists:map(fun({CallId, _BatchSize}) -> CallId end, Calls),
Expand Down

0 comments on commit 76f079e

Please sign in to comment.