Allow nodes to sync chunks above the disk pool threshold #700

Merged · 4 commits · Jan 31, 2025
56 changes: 31 additions & 25 deletions apps/arweave/e2e/ar_e2e.erl
@@ -4,10 +4,10 @@
write_chunk_fixture/3, load_chunk_fixture/2]).

-export([delayed_print/2, packing_type_to_packing/2,
start_source_node/3, source_node_storage_modules/3, max_chunk_offset/1,
start_source_node/3, source_node_storage_modules/3, max_chunk_offset/1, mine_block/4,
assert_block/2, assert_syncs_range/3, assert_does_not_sync_range/3, assert_has_entropy/4,
assert_chunks/3, assert_chunks/4, assert_no_chunks/2,
assert_partition_size/3, assert_empty_partition/3,
assert_partition_size/3, assert_partition_size/4, assert_empty_partition/3,
assert_mine_and_validate/3]).

-include_lib("arweave/include/ar.hrl").
@@ -90,13 +90,17 @@ start_source_node(Node, unpacked, _WalletFixture) ->
}, true),

?LOG_INFO("Source node ~p started.", [Node]),


ar_e2e:assert_syncs_range(Node, 0, 4*?PARTITION_SIZE),

ar_e2e:assert_partition_size(Node, 0, unpacked),
ar_e2e:assert_partition_size(Node, 1, unpacked),
ar_e2e:assert_partition_size(Node, 2, unpacked, floor(0.5*?PARTITION_SIZE)),

ar_e2e:assert_syncs_range(Node, ?PARTITION_SIZE, 2*?PARTITION_SIZE),
ar_e2e:assert_chunks(Node, unpacked, Chunks),

ar_e2e:assert_empty_partition(Node, 3, unpacked),

?LOG_INFO("Source node ~p assertions passed.", [Node]),

ar_test_node:stop(TempNode),
@@ -124,11 +128,11 @@ start_source_node(Node, PackingType, WalletFixture) ->

%% Note: small chunks will be padded to 256 KiB. So B1 actually contains 3 chunks of data
%% and B2 starts at a chunk boundary and contains 1 chunk of data.
B1 = mine_block(Node, Wallet, floor(2.5 * ?DATA_CHUNK_SIZE)),
B2 = mine_block(Node, Wallet, floor(0.75 * ?DATA_CHUNK_SIZE)),
B3 = mine_block(Node, Wallet, ?PARTITION_SIZE),
B4 = mine_block(Node, Wallet, ?PARTITION_SIZE),
B5 = mine_block(Node, Wallet, ?PARTITION_SIZE),
B1 = mine_block(Node, Wallet, floor(2.5 * ?DATA_CHUNK_SIZE), false), %% p1
B2 = mine_block(Node, Wallet, floor(0.75 * ?DATA_CHUNK_SIZE), false), %% p1
B3 = mine_block(Node, Wallet, ?PARTITION_SIZE, false), %% p1 to p2
B4 = mine_block(Node, Wallet, floor(0.5 * ?PARTITION_SIZE), false), %% p2
B5 = mine_block(Node, Wallet, ?PARTITION_SIZE, true), %% p3 chunks are stored in disk pool

%% List of {Block, EndOffset, ChunkSize}
Chunks = [
@@ -146,14 +150,16 @@

SourcePacking = ar_e2e:packing_type_to_packing(PackingType, RewardAddr),

ar_e2e:assert_syncs_range(Node, 0, 4*?PARTITION_SIZE),

ar_e2e:assert_partition_size(Node, 0, SourcePacking),
ar_e2e:assert_partition_size(Node, 1, SourcePacking),

ar_e2e:assert_syncs_range(Node,
?PARTITION_SIZE,
2*?PARTITION_SIZE + ar_storage_module:get_overlap(SourcePacking)),
ar_e2e:assert_partition_size(Node, 2, SourcePacking, floor(0.5*?PARTITION_SIZE)),

ar_e2e:assert_chunks(Node, SourcePacking, Chunks),

ar_e2e:assert_empty_partition(Node, 3, SourcePacking),

?LOG_INFO("Source node ~p assertions passed.", [Node]),

{[B0, B1, B2, B3, B4, B5], RewardAddr, Chunks}.
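
A minimal sketch of the weave layout those five blocks produce (illustrative only, not part of the change set; weave_end_after_b5/1 and its GenesisSize argument are made up for the example, and the arithmetic assumes the padding behaviour described in the note above plus the e2e macros ?DATA_CHUNK_SIZE and ?PARTITION_SIZE being in scope):

%% Sketch: cumulative weave size after B1..B5, assuming small chunks are
%% padded up to ?DATA_CHUNK_SIZE as the note above describes.
weave_end_after_b5(GenesisSize) ->
    B1 = 3 * ?DATA_CHUNK_SIZE,             %% 2.5 chunks of data, padded to 3 chunks (p1)
    B2 = 1 * ?DATA_CHUNK_SIZE,             %% 0.75 chunks, padded to 1 chunk (p1)
    B3 = ?PARTITION_SIZE,                  %% carries the weave from p1 into p2
    B4 = floor(0.5 * ?PARTITION_SIZE),     %% fills half of p2
    B5 = ?PARTITION_SIZE,                  %% lands in p3, above the disk pool threshold
    GenesisSize + B1 + B2 + B3 + B4 + B5.
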
@@ -175,20 +181,18 @@ source_node_storage_modules(SourcePacking) ->
{?PARTITION_SIZE, 1, SourcePacking},
{?PARTITION_SIZE, 2, SourcePacking},
{?PARTITION_SIZE, 3, SourcePacking},
{?PARTITION_SIZE, 4, SourcePacking},
{?PARTITION_SIZE, 5, SourcePacking},
{?PARTITION_SIZE, 6, SourcePacking}
{?PARTITION_SIZE, 4, SourcePacking}
].

mine_block(Node, Wallet, DataSize) ->
mine_block(Node, Wallet, DataSize, IsTemporary) ->
WeaveSize = ar_test_node:remote_call(Node, ar_node, get_current_weave_size, []),
Addr = ar_wallet:to_address(Wallet),
{TX, Chunks} = generate_tx(Node, Wallet, WeaveSize, DataSize),
B = ar_test_node:post_and_mine(#{ miner => Node, await_on => Node }, [TX]),

?assertEqual(Addr, B#block.reward_addr),

Proofs = ar_test_data_sync:post_proofs(Node, B, TX, Chunks),
Proofs = ar_test_data_sync:post_proofs(Node, B, TX, Chunks, IsTemporary),

ar_test_data_sync:wait_until_syncs_chunks(Node, Proofs, infinity),
B.
@@ -258,8 +262,9 @@ assert_syncs_range(Node, StartOffset, EndOffset) ->
case HasRange of
true ->
ok;
false ->
SyncRecord = ar_http_iface_client:get_sync_record(Node, json),
_ ->
SyncRecord = ar_http_iface_client:get_sync_record(
ar_test_node:peer_ip(Node)),
?assert(false,
iolist_to_binary(io_lib:format(
"~s failed to sync range ~p - ~p. Sync record: ~p",
@@ -270,15 +275,16 @@ assert_does_not_sync_range(Node, StartOffset, EndOffset) ->
ar_util:do_until(
fun() -> has_range(Node, StartOffset, EndOffset) end,
1000,
60_000
15_000
),
?assertEqual(false, has_range(Node, StartOffset, EndOffset),
iolist_to_binary(io_lib:format(
"~s synced range when it should not have: ~p - ~p",
[Node, StartOffset, EndOffset]))).

assert_partition_size(Node, PartitionNumber, Packing) ->
Size = ?PARTITION_SIZE,
assert_partition_size(Node, PartitionNumber, Packing, ?PARTITION_SIZE).
assert_partition_size(Node, PartitionNumber, Packing, Size) ->
?LOG_INFO("~p: Asserting partition ~p,~p is size ~p",
[Node, PartitionNumber, ar_serialize:encode_packing(Packing, true), Size]),
?assert(
@@ -288,7 +294,7 @@ assert_partition_size(Node, PartitionNumber, Packing) ->
[PartitionNumber, Packing]) >= Size
end,
100,
60_000
120_000
),
iolist_to_binary(io_lib:format(
"~s partition ~p,~p failed to reach size ~p. Current size: ~p.",
@@ -303,14 +309,14 @@ assert_empty_partition(Node, PartitionNumber, Packing) ->
[PartitionNumber, Packing]) > 0
end,
100,
30_000
15_000
),
?assertEqual(
0,
ar_test_node:remote_call(Node, ar_mining_stats, get_partition_data_size,
[PartitionNumber, Packing]),
iolist_to_binary(io_lib:format(
"~s partition ~p,~p os not empty", [Node, PartitionNumber,
"~s partition ~p,~p is not empty", [Node, PartitionNumber,
ar_serialize:encode_packing(Packing, true)]))).

assert_mine_and_validate(MinerNode, ValidatorNode, MinerPacking) ->
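A minimal usage sketch of the widened assertion helpers above (illustrative, mirroring the call sites added elsewhere in this PR; assert_source_layout/2 is a made-up wrapper name and the snippet assumes the e2e header defining ?PARTITION_SIZE is included):

%% Sketch: combining the 3-arity default with the new 4-arity form.
assert_source_layout(Node, Packing) ->
    %% Full partitions: the 3-arity form defaults the expected size to ?PARTITION_SIZE.
    ar_e2e:assert_partition_size(Node, 0, Packing),
    ar_e2e:assert_partition_size(Node, 1, Packing),
    %% Partially filled partition: pass the expected size explicitly.
    ar_e2e:assert_partition_size(Node, 2, Packing, floor(0.5 * ?PARTITION_SIZE)),
    %% Chunks above the disk pool threshold should not fill partition 3 yet.
    ar_e2e:assert_empty_partition(Node, 3, Packing).
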
25 changes: 20 additions & 5 deletions apps/arweave/e2e/ar_repack_mine_tests.erl
@@ -57,23 +57,38 @@ test_repack_mine({FromPackingType, ToPackingType}) ->
mining_addr = AddrB
}),

ar_e2e:assert_syncs_range(RepackerNode, 0, 4*?PARTITION_SIZE),
ar_e2e:assert_partition_size(RepackerNode, 0, ToPacking),
ar_e2e:assert_partition_size(RepackerNode, 1, ToPacking),
ar_e2e:assert_partition_size(RepackerNode, 2, ToPacking, floor(0.5*?PARTITION_SIZE)),
ar_e2e:assert_chunks(RepackerNode, ToPacking, Chunks),
ar_e2e:assert_empty_partition(RepackerNode, 3, ToPacking),

ar_test_node:restart_with_config(RepackerNode, Config#config{
storage_modules = StorageModules,
mining_addr = AddrB
}),
ar_e2e:assert_syncs_range(RepackerNode,
?PARTITION_SIZE,
2*?PARTITION_SIZE + ar_storage_module:get_overlap(ToPacking)),

ar_e2e:assert_syncs_range(RepackerNode, 0, 4*?PARTITION_SIZE),
ar_e2e:assert_partition_size(RepackerNode, 0, ToPacking),
ar_e2e:assert_partition_size(RepackerNode, 1, ToPacking),
ar_e2e:assert_partition_size(RepackerNode, 2, ToPacking, floor(0.5*?PARTITION_SIZE)),
ar_e2e:assert_chunks(RepackerNode, ToPacking, Chunks),
ar_e2e:assert_empty_partition(RepackerNode, 3, ToPacking),

case ToPackingType of
unpacked ->
ok;
_ ->
ar_e2e:assert_mine_and_validate(RepackerNode, ValidatorNode, ToPacking)
ar_e2e:assert_mine_and_validate(RepackerNode, ValidatorNode, ToPacking),

%% Now that we mined a block, the rest of partition 2 is below the disk pool
%% threshold
ar_e2e:assert_syncs_range(RepackerNode, 0, 4*?PARTITION_SIZE),
ar_e2e:assert_partition_size(RepackerNode, 0, ToPacking),
ar_e2e:assert_partition_size(RepackerNode, 1, ToPacking),
ar_e2e:assert_partition_size(RepackerNode, 2, ToPacking),
%% All of partition 3 is still above the disk pool threshold
ar_e2e:assert_empty_partition(RepackerNode, 3, ToPacking)
end.

test_repacking_blocked({FromPackingType, ToPackingType}) ->
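The re-checks above hinge on where the disk pool threshold sits relative to each chunk. As a conceptual sketch only (not Arweave's actual implementation), the rule the comments describe is that a chunk whose absolute end offset lies above the threshold is held in the disk pool rather than in the regular storage modules, and mining further blocks advances the threshold:

%% Conceptual sketch, not the real implementation.
is_held_in_disk_pool(ChunkEndOffset, DiskPoolThreshold) ->
    ChunkEndOffset > DiskPoolThreshold.

This is why partition 2 is only asserted to its full size after assert_mine_and_validate, while partition 3 is expected to stay empty.
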
98 changes: 84 additions & 14 deletions apps/arweave/e2e/ar_sync_pack_mine_tests.erl
@@ -17,7 +17,7 @@ setup_source_node(PackingType) ->
{Blocks, Chunks, PackingType}.

instantiator(GenesisData, SinkPackingType, TestFun) ->
{timeout, 300, {with, {GenesisData, SinkPackingType}, [TestFun]}}.
{timeout, 600, {with, {GenesisData, SinkPackingType}, [TestFun]}}.

%% --------------------------------------------------------------------------------------------
%% Test Registration
@@ -71,7 +71,9 @@ unpacked_edge_case_test_() ->
{setup, fun () -> setup_source_node(unpacked) end,
fun (GenesisData) ->
[
instantiator(GenesisData, replica_2_9,
instantiator(GenesisData, {replica_2_9, unpacked},
fun test_unpacked_and_packed_sync_pack_mine/1),
instantiator(GenesisData, {unpacked, replica_2_9},
fun test_unpacked_and_packed_sync_pack_mine/1),
instantiator(GenesisData, replica_2_9,
fun test_entropy_first_sync_pack_mine/1),
@@ -84,16 +86,25 @@ spora_2_6_edge_case_test_() ->
{setup, fun () -> setup_source_node(spora_2_6) end,
fun (GenesisData) ->
[
instantiator(GenesisData, replica_2_9,
instantiator(GenesisData, {replica_2_9, unpacked},
fun test_unpacked_and_packed_sync_pack_mine/1),
instantiator(GenesisData, {unpacked, replica_2_9},
fun test_unpacked_and_packed_sync_pack_mine/1),
instantiator(GenesisData, replica_2_9,
fun test_entropy_first_sync_pack_mine/1),
instantiator(GenesisData, replica_2_9,
fun test_entropy_last_sync_pack_mine/1)

]
end}.

disk_pool_threshold_test_() ->
[
instantiator(unpacked, replica_2_9, fun test_disk_pool_threshold/1),
instantiator(unpacked, spora_2_6, fun test_disk_pool_threshold/1),
instantiator(spora_2_6, replica_2_9, fun test_disk_pool_threshold/1),
instantiator(spora_2_6, spora_2_6, fun test_disk_pool_threshold/1),
instantiator(spora_2_6, unpacked, fun test_disk_pool_threshold/1)
].
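
For reference, each entry above expands through instantiator/3 (defined near the top of this file) into a plain EUnit fixture; the first one, sketched out, becomes:

%% Sketch of the expansion of instantiator(unpacked, replica_2_9, fun test_disk_pool_threshold/1):
{timeout, 600, {with, {unpacked, replica_2_9}, [fun test_disk_pool_threshold/1]}}

Note that here the first element is just the source packing type atom rather than the usual {Blocks, Chunks, PackingType} tuple, because test_disk_pool_threshold/1 calls setup_source_node/1 itself.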

%% --------------------------------------------------------------------------------------------
%% test_sync_pack_mine
@@ -107,6 +118,7 @@ test_sync_pack_mine({{Blocks, Chunks, SourcePackingType}, SinkPackingType}) ->
SinkNode = peer2,

SinkPacking = start_sink_node(SinkNode, SourceNode, B0, SinkPackingType),
%% Partition 1 and half of partition 2 are below the disk pool threshold
ar_e2e:assert_syncs_range(
SinkNode,
?PARTITION_SIZE,
@@ -134,30 +146,36 @@ test_syncing_blocked({{Blocks, Chunks, SourcePackingType}, SinkPackingType}) ->
ar_e2e:assert_does_not_sync_range(SinkNode, ?PARTITION_SIZE, 2*?PARTITION_SIZE),
ar_e2e:assert_no_chunks(SinkNode, Chunks).

test_unpacked_and_packed_sync_pack_mine({{Blocks, _Chunks, SourcePackingType}, PackingType}) ->
ar_e2e:delayed_print(<<" ~p -> {~p, ~p} ">>, [SourcePackingType, PackingType, unpacked]),
test_unpacked_and_packed_sync_pack_mine(
{{Blocks, _Chunks, SourcePackingType}, {PackingType1, PackingType2}}) ->
ar_e2e:delayed_print(<<" ~p -> {~p, ~p} ">>, [SourcePackingType, PackingType1, PackingType2]),
?LOG_INFO([{event, test_unpacked_and_packed_sync_pack_mine}, {module, ?MODULE},
{from_packing_type, SourcePackingType}, {to_packing_type, PackingType}]),
{from_packing_type, SourcePackingType}, {to_packing_type, {PackingType1, PackingType2}}]),
[B0 | _] = Blocks,
SourceNode = peer1,
SinkNode = peer2,

{SinkPacking, unpacked} = start_sink_node(SinkNode, SourceNode, B0, PackingType, unpacked),
{SinkPacking1, SinkPacking2} = start_sink_node(
SinkNode, SourceNode, B0, PackingType1, PackingType2),
ar_e2e:assert_syncs_range(
SinkNode,
?PARTITION_SIZE,
2*?PARTITION_SIZE + ar_storage_module:get_overlap(SinkPacking)),
ar_e2e:assert_partition_size(SinkNode, 1, SinkPacking),
ar_e2e:assert_partition_size(SinkNode, 1, unpacked),
2*?PARTITION_SIZE + ar_storage_module:get_overlap(SinkPacking1)),
ar_e2e:assert_partition_size(SinkNode, 1, SinkPacking1),
ar_e2e:assert_partition_size(SinkNode, 1, SinkPacking2),
%% XXX: we should be able to assert the chunks here, but since we have two
%% storage modules configurd and are querying the replica_2_9 chunk, GET /chunk gets
%% storage modules configured and are querying the replica_2_9 chunk, GET /chunk gets
%% confused and tries to load the unpacked chunk, which then fails within the middleware
%% handler and 404s. To fix we'd need to update GET /chunk to query all matching
%% storage modules and then find the best one to return. But since this is a rare edge
%% case, we'll just disable the assertion for now.
%% ar_e2e:assert_chunks(SinkNode, SinkPacking, Chunks),

ar_e2e:assert_mine_and_validate(SinkNode, SourceNode, SinkPacking),
MinablePacking = case PackingType1 of
unpacked -> SinkPacking2;
_ -> SinkPacking1
end,
ar_e2e:assert_mine_and_validate(SinkNode, SourceNode, MinablePacking),
ok.


@@ -266,14 +284,66 @@ test_entropy_last_sync_pack_mine({{Blocks, Chunks, SourcePackingType}, SinkPacki
ar_e2e:assert_mine_and_validate(SinkNode, SourceNode, SinkPacking),
ok.

test_disk_pool_threshold({SourcePackingType, SinkPackingType}) ->
ar_e2e:delayed_print(<<" ~p -> ~p ">>, [SourcePackingType, SinkPackingType]),
?LOG_INFO([{event, test_disk_pool_threshold}, {module, ?MODULE},
{from_packing_type, SourcePackingType}, {to_packing_type, SinkPackingType}]),

SourceNode = peer1,
SinkNode = peer2,

%% When the source packing type is unpacked, this setup process performs some
%% extra disk pool checks:
%% 1. spin up a spora_2_6 node and mine some blocks
%% 2. some chunks are below the disk pool threshold and some above
%% 3. spin up an unpacked node and sync from spora_2_6
%% 4. shut down the spora_2_6 node
%% 5. now the unpacked node should have synced all of the chunks, both above and below
%% the disk pool threshold
%% 6. proceed with test and spin up the sink node and confirm it too can sink all chunks
%% from the unpacked source node - both above and below the disk pool threshold
{Blocks, Chunks, SourcePackingType} = setup_source_node(SourcePackingType),
[B0 | _] = Blocks,

SinkPacking = start_sink_node(SinkNode, SourceNode, B0, SinkPackingType),
%% Partition 1 and half of partition 2 are below the disk pool threshold
ar_e2e:assert_syncs_range(SinkNode, ?PARTITION_SIZE, 4*?PARTITION_SIZE),
ar_e2e:assert_partition_size(SinkNode, 1, SinkPacking),
ar_e2e:assert_partition_size(SinkNode, 2, SinkPacking, floor(0.5*?PARTITION_SIZE)),
ar_e2e:assert_empty_partition(SinkNode, 3, SinkPacking),
ar_e2e:assert_does_not_sync_range(SinkNode, 0, ?PARTITION_SIZE),
ar_e2e:assert_chunks(SinkNode, SinkPacking, Chunks),

case SinkPackingType of
unpacked ->
ok;
_ ->
ar_e2e:assert_mine_and_validate(SinkNode, SourceNode, SinkPacking),

%% Now that we mined a block, the rest of partition 2 is below the disk pool
%% threshold
ar_e2e:assert_syncs_range(SinkNode, ?PARTITION_SIZE, 4*?PARTITION_SIZE),
ar_e2e:assert_partition_size(SinkNode, 2, SinkPacking),
%% All of partition 3 is still above the disk pool threshold
ar_e2e:assert_empty_partition(SinkNode, 3, SinkPacking),
ar_e2e:assert_does_not_sync_range(SinkNode, 0, ?PARTITION_SIZE),
ok
end.

start_sink_node(Node, SourceNode, B0, PackingType) ->
Wallet = ar_test_node:remote_call(Node, ar_e2e, load_wallet_fixture, [wallet_b]),
SinkAddr = ar_wallet:to_address(Wallet),
SinkPacking = ar_e2e:packing_type_to_packing(PackingType, SinkAddr),
{ok, Config} = ar_test_node:get_config(Node),

StorageModules = [
{?PARTITION_SIZE, 1, SinkPacking}
{?PARTITION_SIZE, 1, SinkPacking},
{?PARTITION_SIZE, 2, SinkPacking},
{?PARTITION_SIZE, 3, SinkPacking},
{?PARTITION_SIZE, 4, SinkPacking},
{?PARTITION_SIZE, 5, SinkPacking},
{?PARTITION_SIZE, 6, SinkPacking},
{?PARTITION_SIZE, 10, SinkPacking}
],
?assertEqual(ar_test_node:peer_name(Node),
ar_test_node:start_other_node(Node, B0, Config#config{
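Each {Size, Index, Packing} tuple in the sink configuration above describes one storage module. A rough sketch of the weave range such a module covers, under the assumption that it mirrors ar_storage_module's indexing (module_range/1 is illustrative; get_overlap/1 is the same call the range assertions in this PR rely on):

%% Sketch under the stated indexing assumption:
%% module Index covers [Index * Size, (Index + 1) * Size + Overlap).
module_range({Size, Index, Packing}) ->
    Overlap = ar_storage_module:get_overlap(Packing),
    {Index * Size, (Index + 1) * Size + Overlap}.
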
6 changes: 4 additions & 2 deletions apps/arweave/include/ar.hrl
@@ -136,8 +136,10 @@
-define(RECENT_BLOCKS_WITHOUT_TIMESTAMP, 5).
-endif.

%% How long to wait before giving up on test(s).
-define(TEST_TIMEOUT, 90 * 60).
%% How long to wait before giving up on unit test(s).
-define(TEST_TIMEOUT, 90 * 60). %% 90 minutes
%% How long to wait before giving up on e2e test(s).
-define(E2E_TEST_TIMEOUT, 6 * 60 * 60). %% 6 hours

%% The maximum byte size of a single POST body.
-define(MAX_BODY_SIZE, 15 * 1024 * 1024).
6 changes: 5 additions & 1 deletion apps/arweave/src/ar.erl
@@ -911,6 +911,10 @@ tests(Mod) ->
tests(test, Mod).

tests(TestType, Mods, Config) when is_list(Mods) ->
TotalTimeout = case TestType of
e2e -> ?E2E_TEST_TIMEOUT;
_ -> ?TEST_TIMEOUT
end,
try
start_for_tests(TestType, Config),
ar_test_node:boot_peers(TestType),
@@ -922,7 +926,7 @@ tests(TestType, Mods, Config) when is_list(Mods) ->
end,
Result =
try
eunit:test({timeout, ?TEST_TIMEOUT, [Mods]}, [verbose, {print_depth, 100}])
eunit:test({timeout, TotalTimeout, [Mods]}, [verbose, {print_depth, 100}])
after
ar_test_node:stop_peers(TestType)
end,
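As a quick sketch of what the branch above resolves to for an e2e run (module name borrowed from this PR's e2e suites; this is an illustration, not an additional change), TotalTimeout becomes ?E2E_TEST_TIMEOUT and the call reads:

%% Sketch: effective call for TestType = e2e.
eunit:test({timeout, ?E2E_TEST_TIMEOUT, [ar_sync_pack_mine_tests]},
    [verbose, {print_depth, 100}])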