diff --git a/apps/arweave/e2e/ar_e2e.erl b/apps/arweave/e2e/ar_e2e.erl index a6e2347ab..d97042197 100644 --- a/apps/arweave/e2e/ar_e2e.erl +++ b/apps/arweave/e2e/ar_e2e.erl @@ -4,10 +4,10 @@ write_chunk_fixture/3, load_chunk_fixture/2]). -export([delayed_print/2, packing_type_to_packing/2, - start_source_node/3, source_node_storage_modules/3, max_chunk_offset/1, + start_source_node/3, source_node_storage_modules/3, max_chunk_offset/1, mine_block/4, assert_block/2, assert_syncs_range/3, assert_does_not_sync_range/3, assert_has_entropy/4, assert_chunks/3, assert_chunks/4, assert_no_chunks/2, - assert_partition_size/3, assert_empty_partition/3, + assert_partition_size/3, assert_partition_size/4, assert_empty_partition/3, assert_mine_and_validate/3]). -include_lib("arweave/include/ar.hrl"). @@ -90,13 +90,17 @@ start_source_node(Node, unpacked, _WalletFixture) -> }, true), ?LOG_INFO("Source node ~p started.", [Node]), - + + ar_e2e:assert_syncs_range(Node, 0, 4*?PARTITION_SIZE), + ar_e2e:assert_partition_size(Node, 0, unpacked), ar_e2e:assert_partition_size(Node, 1, unpacked), + ar_e2e:assert_partition_size(Node, 2, unpacked, floor(0.5*?PARTITION_SIZE)), - ar_e2e:assert_syncs_range(Node, ?PARTITION_SIZE, 2*?PARTITION_SIZE), ar_e2e:assert_chunks(Node, unpacked, Chunks), + ar_e2e:assert_empty_partition(Node, 3, unpacked), + ?LOG_INFO("Source node ~p assertions passed.", [Node]), ar_test_node:stop(TempNode), @@ -124,11 +128,11 @@ start_source_node(Node, PackingType, WalletFixture) -> %% Note: small chunks will be padded to 256 KiB. So B1 actually contains 3 chunks of data %% and B2 starts at a chunk boundary and contains 1 chunk of data. - B1 = mine_block(Node, Wallet, floor(2.5 * ?DATA_CHUNK_SIZE)), - B2 = mine_block(Node, Wallet, floor(0.75 * ?DATA_CHUNK_SIZE)), - B3 = mine_block(Node, Wallet, ?PARTITION_SIZE), - B4 = mine_block(Node, Wallet, ?PARTITION_SIZE), - B5 = mine_block(Node, Wallet, ?PARTITION_SIZE), + B1 = mine_block(Node, Wallet, floor(2.5 * ?DATA_CHUNK_SIZE), false), %% p1 + B2 = mine_block(Node, Wallet, floor(0.75 * ?DATA_CHUNK_SIZE), false), %% p1 + B3 = mine_block(Node, Wallet, ?PARTITION_SIZE, false), %% p1 to p2 + B4 = mine_block(Node, Wallet, floor(0.5 * ?PARTITION_SIZE), false), %% p2 + B5 = mine_block(Node, Wallet, ?PARTITION_SIZE, true), %% p3 chunks are stored in disk pool %% List of {Block, EndOffset, ChunkSize} Chunks = [ @@ -146,14 +150,16 @@ start_source_node(Node, PackingType, WalletFixture) -> SourcePacking = ar_e2e:packing_type_to_packing(PackingType, RewardAddr), + ar_e2e:assert_syncs_range(Node, 0, 4*?PARTITION_SIZE), + ar_e2e:assert_partition_size(Node, 0, SourcePacking), ar_e2e:assert_partition_size(Node, 1, SourcePacking), - - ar_e2e:assert_syncs_range(Node, - ?PARTITION_SIZE, - 2*?PARTITION_SIZE + ar_storage_module:get_overlap(SourcePacking)), + ar_e2e:assert_partition_size(Node, 2, SourcePacking, floor(0.5*?PARTITION_SIZE)), + ar_e2e:assert_chunks(Node, SourcePacking, Chunks), + ar_e2e:assert_empty_partition(Node, 3, SourcePacking), + ?LOG_INFO("Source node ~p assertions passed.", [Node]), {[B0, B1, B2, B3, B4, B5], RewardAddr, Chunks}. @@ -175,12 +181,10 @@ source_node_storage_modules(SourcePacking) -> {?PARTITION_SIZE, 1, SourcePacking}, {?PARTITION_SIZE, 2, SourcePacking}, {?PARTITION_SIZE, 3, SourcePacking}, - {?PARTITION_SIZE, 4, SourcePacking}, - {?PARTITION_SIZE, 5, SourcePacking}, - {?PARTITION_SIZE, 6, SourcePacking} + {?PARTITION_SIZE, 4, SourcePacking} ]. -mine_block(Node, Wallet, DataSize) -> +mine_block(Node, Wallet, DataSize, IsTemporary) -> WeaveSize = ar_test_node:remote_call(Node, ar_node, get_current_weave_size, []), Addr = ar_wallet:to_address(Wallet), {TX, Chunks} = generate_tx(Node, Wallet, WeaveSize, DataSize), @@ -188,7 +192,7 @@ mine_block(Node, Wallet, DataSize) -> ?assertEqual(Addr, B#block.reward_addr), - Proofs = ar_test_data_sync:post_proofs(Node, B, TX, Chunks), + Proofs = ar_test_data_sync:post_proofs(Node, B, TX, Chunks, IsTemporary), ar_test_data_sync:wait_until_syncs_chunks(Node, Proofs, infinity), B. @@ -258,8 +262,9 @@ assert_syncs_range(Node, StartOffset, EndOffset) -> case HasRange of true -> ok; - false -> - SyncRecord = ar_http_iface_client:get_sync_record(Node, json), + _ -> + SyncRecord = ar_http_iface_client:get_sync_record( + ar_test_node:peer_ip(Node)), ?assert(false, iolist_to_binary(io_lib:format( "~s failed to sync range ~p - ~p. Sync record: ~p", @@ -270,7 +275,7 @@ assert_does_not_sync_range(Node, StartOffset, EndOffset) -> ar_util:do_until( fun() -> has_range(Node, StartOffset, EndOffset) end, 1000, - 60_000 + 15_000 ), ?assertEqual(false, has_range(Node, StartOffset, EndOffset), iolist_to_binary(io_lib:format( @@ -278,7 +283,8 @@ assert_does_not_sync_range(Node, StartOffset, EndOffset) -> [Node, StartOffset, EndOffset]))). assert_partition_size(Node, PartitionNumber, Packing) -> - Size = ?PARTITION_SIZE, + assert_partition_size(Node, PartitionNumber, Packing, ?PARTITION_SIZE). +assert_partition_size(Node, PartitionNumber, Packing, Size) -> ?LOG_INFO("~p: Asserting partition ~p,~p is size ~p", [Node, PartitionNumber, ar_serialize:encode_packing(Packing, true), Size]), ?assert( @@ -288,7 +294,7 @@ assert_partition_size(Node, PartitionNumber, Packing) -> [PartitionNumber, Packing]) >= Size end, 100, - 60_000 + 120_000 ), iolist_to_binary(io_lib:format( "~s partition ~p,~p failed to reach size ~p. Current size: ~p.", @@ -303,14 +309,14 @@ assert_empty_partition(Node, PartitionNumber, Packing) -> [PartitionNumber, Packing]) > 0 end, 100, - 30_000 + 15_000 ), ?assertEqual( 0, ar_test_node:remote_call(Node, ar_mining_stats, get_partition_data_size, [PartitionNumber, Packing]), iolist_to_binary(io_lib:format( - "~s partition ~p,~p os not empty", [Node, PartitionNumber, + "~s partition ~p,~p is not empty", [Node, PartitionNumber, ar_serialize:encode_packing(Packing, true)]))). assert_mine_and_validate(MinerNode, ValidatorNode, MinerPacking) -> diff --git a/apps/arweave/e2e/ar_repack_mine_tests.erl b/apps/arweave/e2e/ar_repack_mine_tests.erl index 680deff1e..a2d0ee3d1 100644 --- a/apps/arweave/e2e/ar_repack_mine_tests.erl +++ b/apps/arweave/e2e/ar_repack_mine_tests.erl @@ -57,23 +57,38 @@ test_repack_mine({FromPackingType, ToPackingType}) -> mining_addr = AddrB }), + ar_e2e:assert_syncs_range(RepackerNode, 0, 4*?PARTITION_SIZE), + ar_e2e:assert_partition_size(RepackerNode, 0, ToPacking), ar_e2e:assert_partition_size(RepackerNode, 1, ToPacking), + ar_e2e:assert_partition_size(RepackerNode, 2, ToPacking, floor(0.5*?PARTITION_SIZE)), + ar_e2e:assert_chunks(RepackerNode, ToPacking, Chunks), + ar_e2e:assert_empty_partition(RepackerNode, 3, ToPacking), ar_test_node:restart_with_config(RepackerNode, Config#config{ storage_modules = StorageModules, mining_addr = AddrB }), - ar_e2e:assert_syncs_range(RepackerNode, - ?PARTITION_SIZE, - 2*?PARTITION_SIZE + ar_storage_module:get_overlap(ToPacking)), - + ar_e2e:assert_syncs_range(RepackerNode, 0, 4*?PARTITION_SIZE), + ar_e2e:assert_partition_size(RepackerNode, 0, ToPacking), + ar_e2e:assert_partition_size(RepackerNode, 1, ToPacking), + ar_e2e:assert_partition_size(RepackerNode, 2, ToPacking, floor(0.5*?PARTITION_SIZE)), ar_e2e:assert_chunks(RepackerNode, ToPacking, Chunks), + ar_e2e:assert_empty_partition(RepackerNode, 3, ToPacking), case ToPackingType of unpacked -> ok; _ -> - ar_e2e:assert_mine_and_validate(RepackerNode, ValidatorNode, ToPacking) + ar_e2e:assert_mine_and_validate(RepackerNode, ValidatorNode, ToPacking), + + %% Now that we mined a block, the rest of partition 2 is below the disk pool + %% threshold + ar_e2e:assert_syncs_range(RepackerNode, 0, 4*?PARTITION_SIZE), + ar_e2e:assert_partition_size(RepackerNode, 0, ToPacking), + ar_e2e:assert_partition_size(RepackerNode, 1, ToPacking), + ar_e2e:assert_partition_size(RepackerNode, 2, ToPacking), + %% All of partition 3 is still above the disk pool threshold + ar_e2e:assert_empty_partition(RepackerNode, 3, ToPacking) end. test_repacking_blocked({FromPackingType, ToPackingType}) -> diff --git a/apps/arweave/e2e/ar_sync_pack_mine_tests.erl b/apps/arweave/e2e/ar_sync_pack_mine_tests.erl index a13fb0d41..d069bc8a1 100644 --- a/apps/arweave/e2e/ar_sync_pack_mine_tests.erl +++ b/apps/arweave/e2e/ar_sync_pack_mine_tests.erl @@ -17,7 +17,7 @@ setup_source_node(PackingType) -> {Blocks, Chunks, PackingType}. instantiator(GenesisData, SinkPackingType, TestFun) -> - {timeout, 300, {with, {GenesisData, SinkPackingType}, [TestFun]}}. + {timeout, 600, {with, {GenesisData, SinkPackingType}, [TestFun]}}. %% -------------------------------------------------------------------------------------------- %% Test Registration @@ -71,7 +71,9 @@ unpacked_edge_case_test_() -> {setup, fun () -> setup_source_node(unpacked) end, fun (GenesisData) -> [ - instantiator(GenesisData, replica_2_9, + instantiator(GenesisData, {replica_2_9, unpacked}, + fun test_unpacked_and_packed_sync_pack_mine/1), + instantiator(GenesisData, {unpacked, replica_2_9}, fun test_unpacked_and_packed_sync_pack_mine/1), instantiator(GenesisData, replica_2_9, fun test_entropy_first_sync_pack_mine/1), @@ -84,16 +86,25 @@ spora_2_6_edge_case_test_() -> {setup, fun () -> setup_source_node(spora_2_6) end, fun (GenesisData) -> [ - instantiator(GenesisData, replica_2_9, + instantiator(GenesisData, {replica_2_9, unpacked}, + fun test_unpacked_and_packed_sync_pack_mine/1), + instantiator(GenesisData, {unpacked, replica_2_9}, fun test_unpacked_and_packed_sync_pack_mine/1), instantiator(GenesisData, replica_2_9, fun test_entropy_first_sync_pack_mine/1), instantiator(GenesisData, replica_2_9, fun test_entropy_last_sync_pack_mine/1) - ] end}. +disk_pool_threshold_test_() -> + [ + instantiator(unpacked, replica_2_9, fun test_disk_pool_threshold/1), + instantiator(unpacked, spora_2_6, fun test_disk_pool_threshold/1), + instantiator(spora_2_6, replica_2_9, fun test_disk_pool_threshold/1), + instantiator(spora_2_6, spora_2_6, fun test_disk_pool_threshold/1), + instantiator(spora_2_6, unpacked, fun test_disk_pool_threshold/1) + ]. %% -------------------------------------------------------------------------------------------- %% test_sync_pack_mine @@ -107,6 +118,7 @@ test_sync_pack_mine({{Blocks, Chunks, SourcePackingType}, SinkPackingType}) -> SinkNode = peer2, SinkPacking = start_sink_node(SinkNode, SourceNode, B0, SinkPackingType), + %% Partition 1 and half of partition 2 are below the disk pool threshold ar_e2e:assert_syncs_range( SinkNode, ?PARTITION_SIZE, @@ -134,30 +146,36 @@ test_syncing_blocked({{Blocks, Chunks, SourcePackingType}, SinkPackingType}) -> ar_e2e:assert_does_not_sync_range(SinkNode, ?PARTITION_SIZE, 2*?PARTITION_SIZE), ar_e2e:assert_no_chunks(SinkNode, Chunks). -test_unpacked_and_packed_sync_pack_mine({{Blocks, _Chunks, SourcePackingType}, PackingType}) -> - ar_e2e:delayed_print(<<" ~p -> {~p, ~p} ">>, [SourcePackingType, PackingType, unpacked]), +test_unpacked_and_packed_sync_pack_mine( + {{Blocks, _Chunks, SourcePackingType}, {PackingType1, PackingType2}}) -> + ar_e2e:delayed_print(<<" ~p -> {~p, ~p} ">>, [SourcePackingType, PackingType1, PackingType2]), ?LOG_INFO([{event, test_unpacked_and_packed_sync_pack_mine}, {module, ?MODULE}, - {from_packing_type, SourcePackingType}, {to_packing_type, PackingType}]), + {from_packing_type, SourcePackingType}, {to_packing_type, {PackingType1, PackingType2}}]), [B0 | _] = Blocks, SourceNode = peer1, SinkNode = peer2, - {SinkPacking, unpacked} = start_sink_node(SinkNode, SourceNode, B0, PackingType, unpacked), + {SinkPacking1, SinkPacking2} = start_sink_node( + SinkNode, SourceNode, B0, PackingType1, PackingType2), ar_e2e:assert_syncs_range( SinkNode, ?PARTITION_SIZE, - 2*?PARTITION_SIZE + ar_storage_module:get_overlap(SinkPacking)), - ar_e2e:assert_partition_size(SinkNode, 1, SinkPacking), - ar_e2e:assert_partition_size(SinkNode, 1, unpacked), + 2*?PARTITION_SIZE + ar_storage_module:get_overlap(SinkPacking1)), + ar_e2e:assert_partition_size(SinkNode, 1, SinkPacking1), + ar_e2e:assert_partition_size(SinkNode, 1, SinkPacking2), %% XXX: we should be able to assert the chunks here, but since we have two - %% storage modules configurd and are querying the replica_2_9 chunk, GET /chunk gets + %% storage modules configured and are querying the replica_2_9 chunk, GET /chunk gets %% confused and tries to load the unpacked chunk, which then fails within the middleware %% handler and 404s. To fix we'd need to update GET /chunk to query all matching %% storage modules and then find the best one to return. But since this is a rare edge %% case, we'll just disable the assertion for now. %% ar_e2e:assert_chunks(SinkNode, SinkPacking, Chunks), - ar_e2e:assert_mine_and_validate(SinkNode, SourceNode, SinkPacking), + MinablePacking = case PackingType1 of + unpacked -> SinkPacking2; + _ -> SinkPacking1 + end, + ar_e2e:assert_mine_and_validate(SinkNode, SourceNode, MinablePacking), ok. @@ -266,6 +284,52 @@ test_entropy_last_sync_pack_mine({{Blocks, Chunks, SourcePackingType}, SinkPacki ar_e2e:assert_mine_and_validate(SinkNode, SourceNode, SinkPacking), ok. +test_disk_pool_threshold({SourcePackingType, SinkPackingType}) -> + ar_e2e:delayed_print(<<" ~p -> ~p ">>, [SourcePackingType, SinkPackingType]), + ?LOG_INFO([{event, test_disk_pool_threshold}, {module, ?MODULE}, + {from_packing_type, SourcePackingType}, {to_packing_type, SinkPackingType}]), + + SourceNode = peer1, + SinkNode = peer2, + + %% When the source packing type is unpacked, this setup process performs some + %% extra disk pool checks: + %% 1. spin up a spora_2_6 node and mine some blocks + %% 2. some chunks are below the disk pool threshold and some above + %% 3. spin up an unpacked node and sync from spora_2_6 + %% 4. shut down the spora_2_6 node + %% 5. now the unpacked node should have synced all of the chunks, both above and below + %% the disk pool threshold + %% 6. proceed with test and spin up the sink node and confirm it too can sink all chunks + %% from the unpacked source node - both above and below the disk pool threshold + {Blocks, Chunks, SourcePackingType} = setup_source_node(SourcePackingType), + [B0 | _] = Blocks, + + SinkPacking = start_sink_node(SinkNode, SourceNode, B0, SinkPackingType), + %% Partition 1 and half of partition 2 are below the disk pool threshold + ar_e2e:assert_syncs_range(SinkNode, ?PARTITION_SIZE, 4*?PARTITION_SIZE), + ar_e2e:assert_partition_size(SinkNode, 1, SinkPacking), + ar_e2e:assert_partition_size(SinkNode, 2, SinkPacking, floor(0.5*?PARTITION_SIZE)), + ar_e2e:assert_empty_partition(SinkNode, 3, SinkPacking), + ar_e2e:assert_does_not_sync_range(SinkNode, 0, ?PARTITION_SIZE), + ar_e2e:assert_chunks(SinkNode, SinkPacking, Chunks), + + case SinkPackingType of + unpacked -> + ok; + _ -> + ar_e2e:assert_mine_and_validate(SinkNode, SourceNode, SinkPacking), + + %% Now that we mined a block, the rest of partition 2 is below the disk pool + %% threshold + ar_e2e:assert_syncs_range(SinkNode, ?PARTITION_SIZE, 4*?PARTITION_SIZE), + ar_e2e:assert_partition_size(SinkNode, 2, SinkPacking), + %% All of partition 3 is still above the disk pool threshold + ar_e2e:assert_empty_partition(SinkNode, 3, SinkPacking), + ar_e2e:assert_does_not_sync_range(SinkNode, 0, ?PARTITION_SIZE), + ok + end. + start_sink_node(Node, SourceNode, B0, PackingType) -> Wallet = ar_test_node:remote_call(Node, ar_e2e, load_wallet_fixture, [wallet_b]), SinkAddr = ar_wallet:to_address(Wallet), @@ -273,7 +337,13 @@ start_sink_node(Node, SourceNode, B0, PackingType) -> {ok, Config} = ar_test_node:get_config(Node), StorageModules = [ - {?PARTITION_SIZE, 1, SinkPacking} + {?PARTITION_SIZE, 1, SinkPacking}, + {?PARTITION_SIZE, 2, SinkPacking}, + {?PARTITION_SIZE, 3, SinkPacking}, + {?PARTITION_SIZE, 4, SinkPacking}, + {?PARTITION_SIZE, 5, SinkPacking}, + {?PARTITION_SIZE, 6, SinkPacking}, + {?PARTITION_SIZE, 10, SinkPacking} ], ?assertEqual(ar_test_node:peer_name(Node), ar_test_node:start_other_node(Node, B0, Config#config{ diff --git a/apps/arweave/include/ar.hrl b/apps/arweave/include/ar.hrl index b3236dc74..cf219028f 100644 --- a/apps/arweave/include/ar.hrl +++ b/apps/arweave/include/ar.hrl @@ -136,8 +136,10 @@ -define(RECENT_BLOCKS_WITHOUT_TIMESTAMP, 5). -endif. -%% How long to wait before giving up on test(s). --define(TEST_TIMEOUT, 90 * 60). +%% How long to wait before giving up on unit test(s). +-define(TEST_TIMEOUT, 90 * 60). %% 90 minutes +%% How long to wait before giving up on e2e test(s). +-define(E2E_TEST_TIMEOUT, 6 * 60 * 60). %% 6 hours %% The maximum byte size of a single POST body. -define(MAX_BODY_SIZE, 15 * 1024 * 1024). diff --git a/apps/arweave/src/ar.erl b/apps/arweave/src/ar.erl index 743a25a8a..eac193efe 100644 --- a/apps/arweave/src/ar.erl +++ b/apps/arweave/src/ar.erl @@ -911,6 +911,10 @@ tests(Mod) -> tests(test, Mod). tests(TestType, Mods, Config) when is_list(Mods) -> + TotalTimeout = case TestType of + e2e -> ?E2E_TEST_TIMEOUT; + _ -> ?TEST_TIMEOUT + end, try start_for_tests(TestType, Config), ar_test_node:boot_peers(TestType), @@ -922,7 +926,7 @@ tests(TestType, Mods, Config) when is_list(Mods) -> end, Result = try - eunit:test({timeout, ?TEST_TIMEOUT, [Mods]}, [verbose, {print_depth, 100}]) + eunit:test({timeout, TotalTimeout, [Mods]}, [verbose, {print_depth, 100}]) after ar_test_node:stop_peers(TestType) end, diff --git a/apps/arweave/src/ar_chunk_copy.erl b/apps/arweave/src/ar_chunk_copy.erl index 32b0d0739..5fe447716 100644 --- a/apps/arweave/src/ar_chunk_copy.erl +++ b/apps/arweave/src/ar_chunk_copy.erl @@ -4,7 +4,7 @@ -behaviour(gen_server). --export([start_link/1, register_workers/0, ready_for_work/1, read_range/4]). +-export([start_link/1, register_workers/0, read_range/4]). -export([init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2]). @@ -42,18 +42,21 @@ register_workers() -> register_read_workers() -> {ok, Config} = application:get_env(arweave, config), + StoreIDs = [ + ar_storage_module:id(StorageModule) || StorageModule <- Config#config.storage_modules + ] ++ ["default"], {Workers, WorkerMap} = lists:foldl( - fun(StorageModule, {AccWorkers, AccWorkerMap}) -> - StoreID = ar_storage_module:id(StorageModule), - Name = list_to_atom("ar_data_sync_worker_" ++ StoreID), + fun(StoreID, {AccWorkers, AccWorkerMap}) -> + Label = ar_storage_module:label_by_id(StoreID), + Name = list_to_atom("ar_data_sync_worker_" ++ Label), Worker = ?CHILD_WITH_ARGS(ar_data_sync_worker, worker, Name, [Name]), {[ Worker | AccWorkers], AccWorkerMap#{StoreID => Name}} end, {[], #{}}, - Config#config.storage_modules + StoreIDs ), {Workers, WorkerMap}. @@ -68,7 +71,7 @@ ready_for_work(StoreID) -> end. read_range(Start, End, OriginStoreID, TargetStoreID) -> - case ar_chunk_copy:ready_for_work(OriginStoreID) of + case ready_for_work(OriginStoreID) of true -> Args = {Start, End, OriginStoreID, TargetStoreID}, gen_server:cast(?MODULE, {read_range, Args}), @@ -185,8 +188,6 @@ process_queue(Worker) -> {empty, _} -> Worker; {{value, Args}, Q2}-> - ?LOG_DEBUG([{event, process_queue}, {module, ?MODULE}, - {active_count, Worker#worker_tasks.active_count}, {args, Args}]), gen_server:cast(Worker#worker_tasks.worker, {read_range, Args}), Worker2 = Worker#worker_tasks{ task_queue = Q2, @@ -225,7 +226,8 @@ helpers_test_() -> [ {timeout, 30, fun test_ready_for_work/0}, {timeout, 30, fun test_enqueue_read_range/0}, - {timeout, 30, fun test_process_queue/0} + {timeout, 30, fun test_process_queue/0}, + {timeout, 30, fun test_register_workers/0} ]. test_ready_for_work() -> @@ -308,3 +310,13 @@ test_process_queue() -> queue:to_list(ExpectedWorker3#worker_tasks.task_queue), queue:to_list(Worker3#worker_tasks.task_queue)). +test_register_workers() -> + {ok, Config} = application:get_env(arweave, config), + StoreIDs = [ + ar_storage_module:id(StorageModule) || StorageModule <- Config#config.storage_modules], + lists:foreach( + fun(StoreID) -> + ?assertEqual(true, ready_for_work(StoreID)) + end, + StoreIDs ++ ["default"] + ). diff --git a/apps/arweave/src/ar_chunk_storage.erl b/apps/arweave/src/ar_chunk_storage.erl index 749d956e6..85545283a 100644 --- a/apps/arweave/src/ar_chunk_storage.erl +++ b/apps/arweave/src/ar_chunk_storage.erl @@ -379,6 +379,7 @@ init({StoreID, RepackInPlacePacking}) -> State2 = case RepackInPlacePacking of none -> + ar_device_lock:set_device_lock_metric(StoreID, repack, off), State#state{ repack_cursor = none, repack_status = off, @@ -394,6 +395,7 @@ init({StoreID, RepackInPlacePacking}) -> {cursor, RepackCursor}, {store_id, StoreID}, {target_packing, ar_serialize:encode_packing(Packing, true)}]), + ar_device_lock:set_device_lock_metric(StoreID, repack, paused), State#state{ repack_cursor = RepackCursor, target_packing = Packing, @@ -471,6 +473,7 @@ handle_cast({expire_repack_request, Ref}, #state{ packing_map = Map } = State) - handle_cast(repacking_complete, State) -> #state{ store_id = StoreID } = State, ar_device_lock:release_lock(repack, StoreID), + ar_device_lock:set_device_lock_metric(StoreID, repack, complete), State2 = State#state{ repack_status = complete }, maybe_log_repacking_complete(State2), {noreply, State2}; diff --git a/apps/arweave/src/ar_data_sync.erl b/apps/arweave/src/ar_data_sync.erl index e8b7bec92..a66d3765d 100644 --- a/apps/arweave/src/ar_data_sync.erl +++ b/apps/arweave/src/ar_data_sync.erl @@ -202,7 +202,7 @@ add_chunk(DataRoot, DataPath, Chunk, Offset, TXSize) -> {error, failed_to_store_chunk} end end, - case CheckSynced of + Result = case CheckSynced of synced -> ok; {synced_disk_pool, EndOffset4} -> @@ -241,6 +241,7 @@ add_chunk(DataRoot, DataPath, Chunk, Offset, TXSize) -> ?LOG_DEBUG([{event, stored_chunk_in_disk_pool}, {data_path_hash, ar_util:encode(DataPathHash2)}, {data_root, ar_util:encode(DataRoot)}, + {offset, Offset}, {relative_offset, EndOffset3}]), ets:insert(ar_disk_pool_data_roots, {DataRootKey, DiskPoolDataRootValue2}), @@ -255,7 +256,15 @@ add_chunk(DataRoot, DataPath, Chunk, Offset, TXSize) -> end end end - end. + end, + case Result of + {error, _} -> + ?LOG_ERROR([{event, failed_to_add_chunk_to_disk_pool}, + {offset, Offset}, {error, Result}]); + _ -> + ok + end, + Result. %% @doc Store the given value in the chunk data DB. -spec put_chunk_data( @@ -685,7 +694,6 @@ debug_get_disk_pool_chunks(Cursor) -> %%%=================================================================== init({"default" = StoreID, _}) -> - ?LOG_INFO([{event, ar_data_sync_start}, {store_id, StoreID}]), %% Trap exit to avoid corrupting any open files on quit.. process_flag(trap_exit, true), {ok, Config} = application:get_env(arweave, config), @@ -722,9 +730,14 @@ init({"default" = StoreID, _}) -> disk_pool_cursor = first, disk_pool_threshold = DiskPoolThreshold, store_id = StoreID, - sync_status = off + sync_status = paused, + range_start = DiskPoolThreshold, + range_end = DiskPoolThreshold }, - record_sync_status_metric(off, StoreID), + ar_device_lock:set_device_lock_metric(StoreID, sync, paused), + ?LOG_INFO([{event, ar_data_sync_start}, {store_id, StoreID}, + {range_start, State2#sync_data_state.range_start}, + {range_end, State2#sync_data_state.range_end}]), timer:apply_interval(?REMOVE_EXPIRED_DATA_ROOTS_FREQUENCY_MS, ?MODULE, remove_expired_disk_pool_data_roots, []), lists:foreach( @@ -751,6 +764,8 @@ init({"default" = StoreID, _}) -> ets:insert(ar_data_sync_state, {chunk_cache_size, 0}), timer:apply_interval(200, ?MODULE, record_chunk_cache_size_metric, []), gen_server:cast(self(), process_store_chunk_queue), + gen_server:cast(self(), sync_intervals), + gen_server:cast(self(), collect_peer_intervals), {ok, State2}; init({StoreID, RepackInPlacePacking}) -> ?LOG_INFO([{event, ar_data_sync_start}, {store_id, StoreID}]), @@ -774,7 +789,7 @@ init({StoreID, RepackInPlacePacking}) -> packing = ar_storage_module:get_packing(StoreID), sync_status = SyncStatus }, - record_sync_status_metric(SyncStatus, StoreID), + ar_device_lock:set_device_lock_metric(StoreID, sync, SyncStatus), gen_server:cast(self(), sync_intervals), gen_server:cast(self(), sync_data), {ok, State2}; @@ -782,23 +797,10 @@ init({StoreID, RepackInPlacePacking}) -> State2 = State#sync_data_state{ sync_status = off }, - record_sync_status_metric(off, StoreID), + ar_device_lock:set_device_lock_metric(StoreID, sync, off), {ok, State2} end. -record_sync_status_metric(off, StoreID) -> - record_sync_status_metric2(-1, StoreID); -record_sync_status_metric(active, StoreID) -> - record_sync_status_metric2(1, StoreID); -record_sync_status_metric(paused, StoreID) -> - record_sync_status_metric2(0, StoreID); -record_sync_status_metric(_, StoreID) -> - record_sync_status_metric2(-2, StoreID). - -record_sync_status_metric2(StatusCode, StoreID) -> - StoreIDLabel = ar_storage_module:label_by_id(StoreID), - prometheus_gauge:set(sync_status, [StoreIDLabel], StatusCode). - handle_cast({move_data_root_index, Cursor, N}, State) -> move_data_root_index(Cursor, N, State), {noreply, State}; @@ -835,13 +837,12 @@ handle_cast({join, RecentBI}, State) -> repair_data_root_offset_index(BI, State), DiskPoolThreshold = get_disk_pool_threshold(RecentBI), ets:insert(ar_data_sync_state, {disk_pool_threshold, DiskPoolThreshold}), - State2 = + State2 = store_sync_state( State#sync_data_state{ weave_size = WeaveSize, block_index = RecentBI, disk_pool_threshold = DiskPoolThreshold - }, - store_sync_state(State2), + }), {noreply, State2}; handle_cast({cut, Start}, #sync_data_state{ store_id = StoreID, @@ -892,16 +893,18 @@ handle_cast({add_tip_block, BlockTXPairs, BI}, State) -> ar_events:send(sync_record, {global_cut, BlockStartOffset}), DiskPoolThreshold = get_disk_pool_threshold(BI), ets:insert(ar_data_sync_state, {disk_pool_threshold, DiskPoolThreshold}), - State2 = State#sync_data_state{ weave_size = WeaveSize, - block_index = BI, disk_pool_threshold = DiskPoolThreshold }, - store_sync_state(State2), + State2 = store_sync_state( + State#sync_data_state{ + weave_size = WeaveSize, + block_index = BI, + disk_pool_threshold = DiskPoolThreshold + }), {noreply, State2}; handle_cast(sync_data, State) -> #sync_data_state{ store_id = StoreID } = State, Status = ar_device_lock:acquire_lock(sync, StoreID, State#sync_data_state.sync_status), State2 = State#sync_data_state{ sync_status = Status }, - record_sync_status_metric(Status, StoreID), State3 = case Status of active -> do_sync_data(State2); @@ -917,7 +920,6 @@ handle_cast(sync_data2, State) -> #sync_data_state{ store_id = StoreID } = State, Status = ar_device_lock:acquire_lock(sync, StoreID, State#sync_data_state.sync_status), State2 = State#sync_data_state{ sync_status = Status }, - record_sync_status_metric(Status, StoreID), State3 = case Status of active -> do_sync_data2(State2); @@ -1016,15 +1018,20 @@ handle_cast({collect_peer_intervals, Start, End}, State) -> true -> ok; false -> - case Start >= DiskPoolThreshold of + End2 = case StoreID == "default" of + true -> + End; + false -> + min(End, DiskPoolThreshold) + end, + + case Start >= End2 of true -> ar_util:cast_after(500, self(), {collect_peer_intervals, Start, End}); false -> - %% All checks have passed, find and enqueue intervals for one %% All checks have passed, find and enqueue intervals for one %% sync bucket worth of chunks starting at offset Start - ar_peer_intervals:fetch( - Start, min(End, DiskPoolThreshold), StoreID, AllPeersIntervals) + ar_peer_intervals:fetch(Start, End2, StoreID, AllPeersIntervals) end end, @@ -1082,7 +1089,6 @@ handle_cast(sync_intervals, State) -> #sync_data_state{ store_id = StoreID } = State, Status = ar_device_lock:acquire_lock(sync, StoreID, State#sync_data_state.sync_status), State2 = State#sync_data_state{ sync_status = Status }, - record_sync_status_metric(Status, StoreID), State3 = case Status of active -> do_sync_intervals(State2); @@ -1602,7 +1608,6 @@ do_sync_data2(#sync_data_state{ get_unsynced_intervals_from_other_storage_modules(StoreID, OtherStoreID, RangeStart, RangeEnd) end, - % ?LOG_DEBUG([{event, sync_data2}, {store_id, StoreID}, {intervals, Intervals}]), gen_server:cast(self(), sync_data2), State#sync_data_state{ unsynced_intervals_from_other_storage_modules = Intervals, @@ -2568,7 +2573,11 @@ reset_orphaned_data_roots_disk_pool_timestamps(DataRootKeySet) -> ). store_sync_state(#sync_data_state{ store_id = "default" } = State) -> - #sync_data_state{ block_index = BI } = State, + #sync_data_state{ + block_index = BI, disk_pool_threshold = DiskPoolThreshold, + weave_size = WeaveSize } = State, + %% default storage_module syncs the disk pool. + State2 = State#sync_data_state{ range_start = DiskPoolThreshold, range_end = WeaveSize }, DiskPoolDataRoots = ets:foldl( fun({DataRootKey, V}, Acc) -> maps:put(DataRootKey, V, Acc) end, #{}, ar_disk_pool_data_roots), @@ -2582,9 +2591,10 @@ store_sync_state(#sync_data_state{ store_id = "default" } = State) -> ok; ok -> ok - end; -store_sync_state(_State) -> - ok. + end, + State2; +store_sync_state(State) -> + State. %% @doc Look to StoreID to find data that TargetStoreID is missing. %% Args: @@ -2973,9 +2983,16 @@ process_valid_fetched_chunk(ChunkArgs, Args, State) -> {noreply, State}; false -> true = AbsoluteEndOffset == AbsoluteTXStartOffset + ChunkEndOffset, - pack_and_store_chunk({DataRoot, AbsoluteEndOffset, TXPath, TXRoot, - DataPath, Packing, ChunkEndOffset, ChunkSize, Chunk, - UnpackedChunk, none, none}, State) + case StoreID == "default" of + true -> + add_chunk( + DataRoot, DataPath, UnpackedChunk, ChunkEndOffset - 1, TXSize), + {noreply, State}; + false -> + pack_and_store_chunk({DataRoot, AbsoluteEndOffset, TXPath, TXRoot, + DataPath, Packing, ChunkEndOffset, ChunkSize, Chunk, + UnpackedChunk, none, none}, State) + end end end. @@ -3158,21 +3175,21 @@ log_failed_to_store_chunk(Reason, AbsoluteOffset, Offset, DataRoot, DataPathHash {data_root, ar_util:encode(DataRoot)}, {store_id, StoreID}]). -get_required_chunk_packing(Offset, ChunkSize, #sync_data_state{ store_id = StoreID }) -> - case Offset =< ?STRICT_DATA_SPLIT_THRESHOLD andalso ChunkSize < ?DATA_CHUNK_SIZE of +get_required_chunk_packing(_Offset, _ChunkSize, #sync_data_state{ store_id = "default" }) -> + unpacked; +get_required_chunk_packing(Offset, ChunkSize, State) -> + #sync_data_state{ store_id = StoreID } = State, + IsEarlySmallChunk = + Offset =< ?STRICT_DATA_SPLIT_THRESHOLD andalso ChunkSize < ?DATA_CHUNK_SIZE, + case IsEarlySmallChunk of true -> unpacked; false -> - case StoreID of - "default" -> - unpacked; - _ -> - case ar_storage_module:get_packing(StoreID) of - {replica_2_9, _Addr} -> - unpacked_padded; - Packing -> - Packing - end + case ar_storage_module:get_packing(StoreID) of + {replica_2_9, _Addr} -> + unpacked_padded; + Packing -> + Packing end end. diff --git a/apps/arweave/src/ar_device_lock.erl b/apps/arweave/src/ar_device_lock.erl index 1a240400f..cf0c8998a 100644 --- a/apps/arweave/src/ar_device_lock.erl +++ b/apps/arweave/src/ar_device_lock.erl @@ -2,7 +2,8 @@ -behaviour(gen_server). --export([get_store_id_to_device_map/0, is_ready/0, acquire_lock/3, release_lock/2]). +-export([get_store_id_to_device_map/0, is_ready/0, acquire_lock/3, release_lock/2, + set_device_lock_metric/3]). -export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]). @@ -72,6 +73,7 @@ acquire_lock(Mode, StoreID, CurrentStatus) -> true -> ok; false -> + set_device_lock_metric(StoreID, Mode, NewStatus), ?LOG_INFO([{event, acquire_device_lock}, {mode, Mode}, {store_id, StoreID}, {old_status, CurrentStatus}, {new_status, NewStatus}]) end, @@ -80,6 +82,17 @@ acquire_lock(Mode, StoreID, CurrentStatus) -> release_lock(Mode, StoreID) -> gen_server:cast(?MODULE, {release_lock, Mode, StoreID}). +set_device_lock_metric(StoreID, Mode, Status) -> + StatusCode = case Status of + off -> -1; + paused -> 0; + active -> 1; + complete -> 2; + _ -> -2 + end, + StoreIDLabel = ar_storage_module:label_by_id(StoreID), + prometheus_gauge:set(device_lock_status, [StoreIDLabel, Mode], StatusCode). + %%%=================================================================== %%% Generic server callbacks. %%%=================================================================== @@ -180,6 +193,14 @@ get_system_device(StorageModule) -> _ -> Device end. +do_acquire_lock(Mode, "default", State) -> + %% "default" storage module is a special case. It can only be in sync mode. + case Mode of + sync -> + {true, State}; + _ -> + {false, State} + end; do_acquire_lock(Mode, StoreID, State) -> MaxPrepareLocks = State#state.num_replica_2_9_workers, Device = maps:get(StoreID, State#state.store_id_to_device), diff --git a/apps/arweave/src/ar_entropy_gen.erl b/apps/arweave/src/ar_entropy_gen.erl index e2dd28f80..79aced3df 100644 --- a/apps/arweave/src/ar_entropy_gen.erl +++ b/apps/arweave/src/ar_entropy_gen.erl @@ -144,6 +144,7 @@ init({StoreID, Packing}) -> %% Entropy generation is complete complete end, + BucketEndOffset = ar_chunk_storage:get_chunk_bucket_end(Cursor), RepackCursor = case Packing == ar_storage_module:get_packing(StoreID) of @@ -164,27 +165,14 @@ init({StoreID, Packing}) -> prepare_status = PrepareStatus, repack_cursor = RepackCursor }, - record_prepare_status_metric(PrepareStatus, StoreID), + ar_device_lock:set_device_lock_metric(StoreID, prepare, PrepareStatus), {ok, State}. -record_prepare_status_metric(paused, StoreID) -> - record_prepare_status_metric2(0, StoreID); -record_prepare_status_metric(active, StoreID) -> - record_prepare_status_metric2(1, StoreID); -record_prepare_status_metric(complete, StoreID) -> - record_prepare_status_metric2(2, StoreID); -record_prepare_status_metric(_, StoreID) -> - record_prepare_status_metric2(-1, StoreID). - -record_prepare_status_metric2(StatusCode, StoreID) -> - StoreIDLabel = ar_storage_module:label_by_id(StoreID), - prometheus_gauge:set(prepare_replica_2_9_status, [StoreIDLabel], StatusCode). handle_cast(prepare_entropy, State) -> #state{ store_id = StoreID } = State, NewStatus = ar_device_lock:acquire_lock(prepare, StoreID, State#state.prepare_status), State2 = State#state{ prepare_status = NewStatus }, - record_prepare_status_metric(NewStatus, StoreID), State3 = case NewStatus of active -> do_prepare_entropy(State2); @@ -359,7 +347,7 @@ do_prepare_entropy(State) -> end, case StoreEntropy of complete -> - record_prepare_status_metric(complete, StoreID), + ar_device_lock:set_device_lock_metric(StoreID, prepare, complete), State#state{ prepare_status = complete }; waiting_for_repack -> ?LOG_INFO([{event, waiting_for_repacking}, diff --git a/apps/arweave/src/ar_http_iface_client.erl b/apps/arweave/src/ar_http_iface_client.erl index 121247017..80296b684 100644 --- a/apps/arweave/src/ar_http_iface_client.erl +++ b/apps/arweave/src/ar_http_iface_client.erl @@ -11,7 +11,7 @@ get_tx_data/2, get_wallet_list_chunk/2, get_wallet_list_chunk/3, get_wallet_list/2, add_peer/1, get_info/1, get_info/2, get_peers/1, get_time/2, get_height/1, get_block_index/3, - get_sync_record/1, get_sync_record/2, get_sync_record/3, + get_sync_record/1, get_sync_record/3, get_chunk_binary/3, get_mempool/1, get_sync_buckets/1, get_recent_hash_list/1, get_recent_hash_list_diff/2, get_reward_history/3, @@ -361,14 +361,7 @@ decode_block_index(Bin, json) -> end. get_sync_record(Peer) -> - get_sync_record(Peer, binary). - -get_sync_record(Peer, Encoding) -> - ContentType = case Encoding of - binary -> <<"application/etf">>; - json -> <<"application/json">> - end, - Headers = [{<<"Content-Type">>, ContentType}], + Headers = [{<<"Content-Type">>, <<"application/etf">>}], handle_sync_record_response(ar_http:req(#{ peer => Peer, method => get, @@ -869,8 +862,8 @@ handle_chunk_response({ok, {{<<"200">>, _}, _, Body, Start, End}}, RequestedPack end; false -> ?LOG_WARNING([{event, peer_served_proof_with_wrong_packing}, - {requested_packing, ar_serialize:encode_packing(RequestedPacking)}, - {got_packing, ar_serialize:encode_packing(Packing)}, + {requested_packing, ar_serialize:encode_packing(RequestedPacking, false)}, + {got_packing, ar_serialize:encode_packing(Packing, false)}, {peer, ar_util:format_peer(Peer)}]), {error, wrong_packing} end diff --git a/apps/arweave/src/ar_metrics.erl b/apps/arweave/src/ar_metrics.erl index 47f509443..e384f207e 100644 --- a/apps/arweave/src/ar_metrics.erl +++ b/apps/arweave/src/ar_metrics.erl @@ -461,9 +461,10 @@ register() -> "'type' can be 'sync_range' or 'read_range'. 'peer' is the peer the task " "is intended for - for 'read_range' tasks this will be 'localhost'."}]), - prometheus_gauge:new([{name, sync_status}, - {labels, [store_id]}, - {help, "The syncing status of the storage module."}]), + prometheus_gauge:new([{name, device_lock_status}, + {labels, [store_id, mode]}, + {help, "The device lock status of the storage module. " + "-1: off, 0: paused, 1: active, 2: complete -2: unknown"}]), prometheus_gauge:new([{name, sync_intervals_queue_size}, {labels, [store_id]}, {help, "The size of the syncing intervals queue."}]), @@ -482,9 +483,6 @@ register() -> "indicates whether this is the time to generate a single 8 MiB entropy or " "the time to generate all 32 entropies needed for full chunks."} ]), - prometheus_gauge:new([{name, prepare_replica_2_9_status}, - {labels, [store_id]}, - {help, "The replica 2.9 preparation status."}]), %% --------------------------------------------------------------------------------------- %% Pool related metrics diff --git a/apps/arweave/src/ar_sync_record.erl b/apps/arweave/src/ar_sync_record.erl index 60be89212..010f14595 100644 --- a/apps/arweave/src/ar_sync_record.erl +++ b/apps/arweave/src/ar_sync_record.erl @@ -451,7 +451,7 @@ add2(End, Start, ID, State) -> add2(End, Start, Packing, ID, State) -> #state{ sync_record_by_id = SyncRecordByID, sync_record_by_id_type = SyncRecordByIDType, - state_db = StateDB, store_id = StoreID } = State, + state_db = StateDB, store_id = StoreID } = State, ByType = maps:get({ID, Packing}, SyncRecordByIDType, ar_intervals:new()), ByType2 = ar_intervals:add(ByType, End, Start), SyncRecordByIDType2 = maps:put({ID, Packing}, ByType2, SyncRecordByIDType), diff --git a/apps/arweave/test/ar_test_data_sync.erl b/apps/arweave/test/ar_test_data_sync.erl index 66ad5e2ff..1a5d59d7e 100644 --- a/apps/arweave/test/ar_test_data_sync.erl +++ b/apps/arweave/test/ar_test_data_sync.erl @@ -10,7 +10,7 @@ tx/2, tx/3, tx/4, wait_until_syncs_chunk/2, wait_until_syncs_chunks/1, wait_until_syncs_chunks/2, wait_until_syncs_chunks/3, get_tx_offset/2, get_tx_data/1, - post_random_blocks/1, get_records_with_proofs/3, post_proofs/4, + post_random_blocks/1, get_records_with_proofs/3, post_proofs/4, post_proofs/5, generate_random_split/1, generate_random_original_split/1, generate_random_standard_split/0, generate_random_original_v1_split/0]). @@ -316,10 +316,18 @@ post_blocks(Wallet, BlockMap) -> ). post_proofs(Peer, B, TX, Chunks) -> + post_proofs(Peer, B, TX, Chunks, false). +post_proofs(Peer, B, TX, Chunks, IsTemporary) -> Proofs = build_proofs(B, TX, Chunks), + + HttpStatus = case IsTemporary of + true -> <<"303">>; + false -> <<"200">> + end, + lists:foreach( fun({_, Proof}) -> - {ok, {{<<"200">>, _}, _, _, _, _}} = + {ok, {{HttpStatus, _}, _, _, _, _}} = ar_test_node:post_chunk(Peer, ar_serialize:jsonify(Proof)) end, Proofs