Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add "pop at least N bytes" functionality #20

Merged
merged 1 commit into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 28 additions & 11 deletions src/replayq.erl
Original file line number Diff line number Diff line change
Expand Up @@ -244,12 +244,15 @@ append(#{config := #{seg_bytes := BytesLimit, dir := Dir} = Config,
}) ->
{q(), ack_ref(), [item()]}.
pop(Q, Opts) ->
Bytes = maps:get(bytes_limit, Opts, ?DEFAULT_POP_BYTES_LIMIT),
{BytesMode, Bytes} = case maps:get(bytes_limit, Opts, ?DEFAULT_POP_BYTES_LIMIT) of
{Mode0, Bytes0} -> {Mode0, Bytes0};
Bytes0 when is_integer(Bytes0) -> {at_most, Bytes0}
end,
Count = maps:get(count_limit, Opts, ?DEFAULT_POP_COUNT_LIMIT),
{StopFun, StopFunAcc} =
maps:get(stop_before, Opts, {fun ?MODULE:default_stop_before_func/2, none}),
true = (Count > 0),
pop(Q, Bytes, Count, ?NOTHING_TO_ACK, [], StopFun, StopFunAcc).
pop(Q, BytesMode, Bytes, Count, ?NOTHING_TO_ACK, [], StopFun, StopFunAcc).

%% @doc peek the queue front item.
-spec peek(q()) -> empty | item().
Expand Down Expand Up @@ -327,25 +330,30 @@ transform(Id, [Item0 | Rest], Sizer, Count, Bytes, Acc) ->
append_in_mem([], Q) -> Q;
append_in_mem([Item | Rest], Q) -> append_in_mem(Rest, queue:in(Item, Q)).

pop(Q, _Bytes, 0, AckRef, Acc, _StopFun, _StopFunAcc) ->
pop(Q, _BytesMode, _Bytes, 0, AckRef, Acc, _StopFun, _StopFunAcc) ->
Result = lists:reverse(Acc),
ok = maybe_save_pending_acks(AckRef, Q, Result),
{Q, AckRef, Result};
pop(#{config := Cfg} = Q, Bytes, Count, AckRef, Acc, StopFun, StopFunAcc) ->
pop(Q, _BytesMode, Bytes, _Count, AckRef, Acc, _StopFun, _StopFunAcc) when Bytes =< 0 ->
Result = lists:reverse(Acc),
ok = maybe_save_pending_acks(AckRef, Q, Result),
{Q, AckRef, Result};
pop(#{config := Cfg} = Q, BytesMode, Bytes, Count, AckRef, Acc, StopFun, StopFunAcc) ->
case is_empty(Q) of
true ->
{Q, AckRef, lists:reverse(Acc)};
false when Cfg =:= mem_only ->
pop_mem(Q, Bytes, Count, Acc, StopFun, StopFunAcc);
pop_mem(Q, BytesMode, Bytes, Count, Acc, StopFun, StopFunAcc);
false ->
pop2(Q, Bytes, Count, AckRef, Acc, StopFun, StopFunAcc)
pop2(Q, BytesMode, Bytes, Count, AckRef, Acc, StopFun, StopFunAcc)
end.

pop_mem(#{in_mem := InMem,
stats := #{count := TotalCount, bytes := TotalBytes} = Stats
} = Q, Bytes, Count, Acc, StopFun, StopFunAcc) ->
} = Q, BytesMode, Bytes, Count, Acc, StopFun, StopFunAcc) ->
case queue:out(InMem) of
{{value, ?MEM_ONLY_ITEM(Sz, _Item)}, _} when Sz > Bytes andalso Acc =/= [] ->
{{value, ?MEM_ONLY_ITEM(Sz, _Item)}, _} when
BytesMode == at_most andalso Sz > Bytes andalso Acc =/= [] ->
{Q, ?NOTHING_TO_ACK, lists:reverse(Acc)};
{{value, ?MEM_ONLY_ITEM(Sz, Item)}, Rest} ->
case StopFun(Item, StopFunAcc) of
Expand All @@ -358,6 +366,7 @@ pop_mem(#{in_mem := InMem,
}
},
pop(NewQ,
BytesMode,
Bytes - Sz,
Count - 1,
?NOTHING_TO_ACK,
Expand All @@ -370,12 +379,13 @@ pop_mem(#{in_mem := InMem,
pop2(#{head_segno := ReaderSegno,
in_mem := HeadItems,
stats := #{count := TotalCount, bytes := TotalBytes} = Stats
} = Q, Bytes, Count, AckRef, Acc, StopFun, StopFunAcc) ->
} = Q, BytesMode, Bytes, Count, AckRef, Acc, StopFun, StopFunAcc) ->
case queue:out(HeadItems) of
{empty, _} ->
Q1 = open_next_seg(Q),
pop(Q1, Bytes, Count, AckRef, Acc, StopFun, StopFunAcc);
{{value, ?DISK_CP_ITEM(_, Sz, _Item)}, _} when Sz > Bytes andalso Acc =/= [] ->
pop(Q1, BytesMode, Bytes, Count, AckRef, Acc, StopFun, StopFunAcc);
{{value, ?DISK_CP_ITEM(_, Sz, _Item)}, _} when
BytesMode == at_most andalso Sz > Bytes andalso Acc =/= [] ->
%% taking the head item would cause exceeding size limit
{Q, AckRef, lists:reverse(Acc)};
{{value, ?DISK_CP_ITEM(Id, Sz, Item)}, Rest} ->
Expand All @@ -395,6 +405,7 @@ pop2(#{head_segno := ReaderSegno,
end,
NewAckRef = {ReaderSegno, Id},
pop(NewQ,
BytesMode,
Bytes - Sz,
Count - 1,
NewAckRef,
Expand Down Expand Up @@ -760,3 +771,9 @@ is_segment_full(#{bytes := SegmentBytes},
%% We can change implementation to split items list to avoid
%% segment overflow if really necessary
SegmentBytes >= SegmentBytesLimit.

%%%_* Emacs ====================================================================
%%% Local Variables:
%%% allout-layout: t
%%% erlang-indent-level: 2
%%% End:
43 changes: 43 additions & 0 deletions test/replayq_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,43 @@ corrupted_commit_test() ->
ok = replayq:close(Q4),
ok = cleanup(Dir).

pop_at_least_bytes_mem_test() ->
Config = #{
mem_only => true,
seg_bytes => 1000,
sizer => fun(Item) -> size(Item) end
},
test_pop_at_least_bytes(Config).

pop_at_least_bytes_disk_test() ->
Dir = ?DIR,
Config = #{
dir => Dir,
seg_bytes => 1000,
sizer => fun(Item) -> size(Item) end
},
test_pop_at_least_bytes(Config),
ok = cleanup(Dir).

test_pop_at_least_bytes(Config) ->
Q0 = replayq:open(Config),
%% Two 5 bytes elements
Item1 = <<"12345">>,
Item2 = <<"67890">>,
Q1 = replayq:append(Q0, [Item1, Item2]),
ItemSize = 5,
?assertEqual(ItemSize * 2, replayq:bytes(Q1)),
%% Default behavior: we pop _at most_ N bytes, and return at least 1 item, if any.
%% Asking for less bytes than the 2 elements should yield singleton batch.
{_Q2, _Ack2, [Item1]} = replayq:pop(Q1, #{count_limit => 10, bytes_limit => ItemSize - 1}),
{_Q3, _Ack3, [Item1]} = replayq:pop(Q1, #{count_limit => 10, bytes_limit => {at_most, ItemSize - 1}}),
%% ... but dropping _at least_ less bytes than 1 item should drop both of them.
{Q4, _Ack4, [Item1, Item2]} =
replayq:pop(Q1, #{count_limit => 10, bytes_limit => {at_least, ItemSize + 1}}),
?assertEqual(0, replayq:bytes(Q4)),
ok = replayq:close(Q4),
ok.

%% helpers ===========================================================

cleanup(Dir) ->
Expand Down Expand Up @@ -586,3 +623,9 @@ filename(Segno) ->
%% corrupt the segment
corrupt(#{w_cur := #{fd := Fd}}) ->
file:write(Fd, "some random bytes").

%%%_* Emacs ====================================================================
%%% Local Variables:
%%% allout-layout: t
%%% erlang-indent-level: 2
%%% End: