diff --git a/src/replayq.erl b/src/replayq.erl index cf87f0b..91ddf32 100644 --- a/src/replayq.erl +++ b/src/replayq.erl @@ -244,12 +244,15 @@ append(#{config := #{seg_bytes := BytesLimit, dir := Dir} = Config, }) -> {q(), ack_ref(), [item()]}. pop(Q, Opts) -> - Bytes = maps:get(bytes_limit, Opts, ?DEFAULT_POP_BYTES_LIMIT), + {BytesMode, Bytes} = case maps:get(bytes_limit, Opts, ?DEFAULT_POP_BYTES_LIMIT) of + {Mode0, Bytes0} -> {Mode0, Bytes0}; + Bytes0 when is_integer(Bytes0) -> {at_most, Bytes0} + end, Count = maps:get(count_limit, Opts, ?DEFAULT_POP_COUNT_LIMIT), {StopFun, StopFunAcc} = maps:get(stop_before, Opts, {fun ?MODULE:default_stop_before_func/2, none}), true = (Count > 0), - pop(Q, Bytes, Count, ?NOTHING_TO_ACK, [], StopFun, StopFunAcc). + pop(Q, BytesMode, Bytes, Count, ?NOTHING_TO_ACK, [], StopFun, StopFunAcc). %% @doc peek the queue front item. -spec peek(q()) -> empty | item(). @@ -327,25 +330,30 @@ transform(Id, [Item0 | Rest], Sizer, Count, Bytes, Acc) -> append_in_mem([], Q) -> Q; append_in_mem([Item | Rest], Q) -> append_in_mem(Rest, queue:in(Item, Q)). -pop(Q, _Bytes, 0, AckRef, Acc, _StopFun, _StopFunAcc) -> +pop(Q, _BytesMode, _Bytes, 0, AckRef, Acc, _StopFun, _StopFunAcc) -> Result = lists:reverse(Acc), ok = maybe_save_pending_acks(AckRef, Q, Result), {Q, AckRef, Result}; -pop(#{config := Cfg} = Q, Bytes, Count, AckRef, Acc, StopFun, StopFunAcc) -> +pop(Q, _BytesMode, Bytes, _Count, AckRef, Acc, _StopFun, _StopFunAcc) when Bytes =< 0 -> + Result = lists:reverse(Acc), + ok = maybe_save_pending_acks(AckRef, Q, Result), + {Q, AckRef, Result}; +pop(#{config := Cfg} = Q, BytesMode, Bytes, Count, AckRef, Acc, StopFun, StopFunAcc) -> case is_empty(Q) of true -> {Q, AckRef, lists:reverse(Acc)}; false when Cfg =:= mem_only -> - pop_mem(Q, Bytes, Count, Acc, StopFun, StopFunAcc); + pop_mem(Q, BytesMode, Bytes, Count, Acc, StopFun, StopFunAcc); false -> - pop2(Q, Bytes, Count, AckRef, Acc, StopFun, StopFunAcc) + pop2(Q, BytesMode, Bytes, Count, AckRef, Acc, StopFun, StopFunAcc) end. pop_mem(#{in_mem := InMem, stats := #{count := TotalCount, bytes := TotalBytes} = Stats - } = Q, Bytes, Count, Acc, StopFun, StopFunAcc) -> + } = Q, BytesMode, Bytes, Count, Acc, StopFun, StopFunAcc) -> case queue:out(InMem) of - {{value, ?MEM_ONLY_ITEM(Sz, _Item)}, _} when Sz > Bytes andalso Acc =/= [] -> + {{value, ?MEM_ONLY_ITEM(Sz, _Item)}, _} when + BytesMode == at_most andalso Sz > Bytes andalso Acc =/= [] -> {Q, ?NOTHING_TO_ACK, lists:reverse(Acc)}; {{value, ?MEM_ONLY_ITEM(Sz, Item)}, Rest} -> case StopFun(Item, StopFunAcc) of @@ -358,6 +366,7 @@ pop_mem(#{in_mem := InMem, } }, pop(NewQ, + BytesMode, Bytes - Sz, Count - 1, ?NOTHING_TO_ACK, @@ -370,12 +379,13 @@ pop_mem(#{in_mem := InMem, pop2(#{head_segno := ReaderSegno, in_mem := HeadItems, stats := #{count := TotalCount, bytes := TotalBytes} = Stats - } = Q, Bytes, Count, AckRef, Acc, StopFun, StopFunAcc) -> + } = Q, BytesMode, Bytes, Count, AckRef, Acc, StopFun, StopFunAcc) -> case queue:out(HeadItems) of {empty, _} -> Q1 = open_next_seg(Q), - pop(Q1, Bytes, Count, AckRef, Acc, StopFun, StopFunAcc); - {{value, ?DISK_CP_ITEM(_, Sz, _Item)}, _} when Sz > Bytes andalso Acc =/= [] -> + pop(Q1, BytesMode, Bytes, Count, AckRef, Acc, StopFun, StopFunAcc); + {{value, ?DISK_CP_ITEM(_, Sz, _Item)}, _} when + BytesMode == at_most andalso Sz > Bytes andalso Acc =/= [] -> %% taking the head item would cause exceeding size limit {Q, AckRef, lists:reverse(Acc)}; {{value, ?DISK_CP_ITEM(Id, Sz, Item)}, Rest} -> @@ -395,6 +405,7 @@ pop2(#{head_segno := ReaderSegno, end, NewAckRef = {ReaderSegno, Id}, pop(NewQ, + BytesMode, Bytes - Sz, Count - 1, NewAckRef, @@ -760,3 +771,9 @@ is_segment_full(#{bytes := SegmentBytes}, %% We can change implementation to split items list to avoid %% segment overflow if really necessary SegmentBytes >= SegmentBytesLimit. + +%%%_* Emacs ==================================================================== +%%% Local Variables: +%%% allout-layout: t +%%% erlang-indent-level: 2 +%%% End: diff --git a/test/replayq_tests.erl b/test/replayq_tests.erl index 300d609..fc3dfb6 100644 --- a/test/replayq_tests.erl +++ b/test/replayq_tests.erl @@ -559,6 +559,43 @@ corrupted_commit_test() -> ok = replayq:close(Q4), ok = cleanup(Dir). +pop_at_least_bytes_mem_test() -> + Config = #{ + mem_only => true, + seg_bytes => 1000, + sizer => fun(Item) -> size(Item) end + }, + test_pop_at_least_bytes(Config). + +pop_at_least_bytes_disk_test() -> + Dir = ?DIR, + Config = #{ + dir => Dir, + seg_bytes => 1000, + sizer => fun(Item) -> size(Item) end + }, + test_pop_at_least_bytes(Config), + ok = cleanup(Dir). + +test_pop_at_least_bytes(Config) -> + Q0 = replayq:open(Config), + %% Two 5 bytes elements + Item1 = <<"12345">>, + Item2 = <<"67890">>, + Q1 = replayq:append(Q0, [Item1, Item2]), + ItemSize = 5, + ?assertEqual(ItemSize * 2, replayq:bytes(Q1)), + %% Default behavior: we pop _at most_ N bytes, and return at least 1 item, if any. + %% Asking for less bytes than the 2 elements should yield singleton batch. + {_Q2, _Ack2, [Item1]} = replayq:pop(Q1, #{count_limit => 10, bytes_limit => ItemSize - 1}), + {_Q3, _Ack3, [Item1]} = replayq:pop(Q1, #{count_limit => 10, bytes_limit => {at_most, ItemSize - 1}}), + %% ... but dropping _at least_ less bytes than 1 item should drop both of them. + {Q4, _Ack4, [Item1, Item2]} = + replayq:pop(Q1, #{count_limit => 10, bytes_limit => {at_least, ItemSize + 1}}), + ?assertEqual(0, replayq:bytes(Q4)), + ok = replayq:close(Q4), + ok. + %% helpers =========================================================== cleanup(Dir) -> @@ -586,3 +623,9 @@ filename(Segno) -> %% corrupt the segment corrupt(#{w_cur := #{fd := Fd}}) -> file:write(Fd, "some random bytes"). + +%%%_* Emacs ==================================================================== +%%% Local Variables: +%%% allout-layout: t +%%% erlang-indent-level: 2 +%%% End: