diff --git a/changelog.md b/changelog.md index 2908f6d..5f6dbee 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,8 @@ +* 4.0.6 + - Use more aggressive buffer overflow mode when using memory mode buffer and the system memory usage is high, to reduce risk of OOM. + - Upgrade replayq from 0.3.10 to 0.3.11. + - Upgrade lc from 0.3.2 to 0.3.4. + * 4.0.5 - Fix `unexpected_id` crash introduced in 4.0.1. diff --git a/rebar.config b/rebar.config index 7a6c24b..6daed70 100644 --- a/rebar.config +++ b/rebar.config @@ -1,6 +1,6 @@ {deps, [ {kafka_protocol, "4.1.10"} - , {replayq, "0.3.10"} - , {lc, "0.3.2"} + , {replayq, "0.3.11"} + , {git, "https://github.com/emqx/lc.git", {tag, "0.3.4"}} , {telemetry, "1.1.0"} ]}. diff --git a/src/wolff_producer.erl b/src/wolff_producer.erl index 2463281..6c7cbe2 100644 --- a/src/wolff_producer.erl +++ b/src/wolff_producer.erl @@ -929,9 +929,11 @@ enqueue_calls2(Calls, wolff_metrics:queuing_set(Config0, replayq:count(NewQ)), wolff_metrics:queuing_bytes_set(Config0, replayq:bytes(NewQ)), lists:foreach(fun maybe_reply_queued/1, Calls), - Overflow = case maps:get(drop_if_highmem, Config0, false) - andalso replayq:is_mem_only(NewQ) - andalso load_ctl:is_high_mem() of + IsHighMemOverflow = + maps:get(drop_if_highmem, Config0, false) + andalso replayq:is_mem_only(NewQ) + andalso load_ctl:is_high_mem(), + Overflow = case IsHighMemOverflow of true -> max(replayq:overflow(NewQ), CallByteSize); false -> @@ -940,6 +942,7 @@ enqueue_calls2(Calls, handle_overflow(St0#{replayq := NewQ, pending_acks := PendingAcks }, + IsHighMemOverflow, Overflow). maybe_reply_queued(?SEND_REQ(?no_queued_reply, _, _)) -> @@ -956,16 +959,22 @@ eval_ack_cb(?ACK_CB({Caller, Ref}, Partition), BaseOffset) when ?IS_SYNC_REF(Cal _ = erlang:send(Caller, {Ref, Partition, BaseOffset}), ok. -handle_overflow(St, Overflow) when Overflow =< 0 -> +handle_overflow(St, _IsHighMemOverflow, Overflow) when Overflow =< 0 -> ok = maybe_log_discard(St, 0), St; handle_overflow(#{replayq := Q, pending_acks := PendingAcks, config := Config } = St, + IsHighMemOverflow, Overflow) -> + BytesMode = + case IsHighMemOverflow of + true -> at_least; + false -> at_most + end, {NewQ, QAckRef, Items} = - replayq:pop(Q, #{bytes_limit => Overflow, count_limit => 999999999}), + replayq:pop(Q, #{bytes_limit => {BytesMode, Overflow}, count_limit => 999999999}), ok = replayq:ack(NewQ, QAckRef), Calls = get_calls_from_queue_items(Items), CallIDs = lists:map(fun({CallId, _BatchSize}) -> CallId end, Calls),