Commit: WIP
JamesPiechota committed Jan 30, 2025
1 parent 71a1b52 commit 090f1fe
Showing 5 changed files with 37 additions and 76 deletions.
10 changes: 6 additions & 4 deletions apps/arweave/e2e/ar_e2e.erl
@@ -93,8 +93,10 @@ start_source_node(Node, unpacked, _WalletFixture) ->

ar_e2e:assert_partition_size(Node, 0, unpacked),
ar_e2e:assert_partition_size(Node, 1, unpacked),
ar_e2e:assert_partition_size(Node, 2, unpacked, floor(0.5*?PARTITION_SIZE)),
ar_e2e:assert_empty_partition(Node, 3, unpacked),

ar_e2e:assert_syncs_range(Node, ?PARTITION_SIZE, 2*?PARTITION_SIZE),
ar_e2e:assert_syncs_range(Node, 0, 4*?PARTITION_SIZE),
ar_e2e:assert_chunks(Node, unpacked, Chunks),

?LOG_INFO("Source node ~p assertions passed.", [Node]),
@@ -148,10 +150,10 @@ start_source_node(Node, PackingType, WalletFixture) ->

ar_e2e:assert_partition_size(Node, 0, SourcePacking),
ar_e2e:assert_partition_size(Node, 1, SourcePacking),
ar_e2e:assert_partition_size(Node, 2, SourcePacking, floor(0.5*?PARTITION_SIZE)),
ar_e2e:assert_empty_partition(Node, 3, SourcePacking),

ar_e2e:assert_syncs_range(Node,
?PARTITION_SIZE,
2*?PARTITION_SIZE + ar_storage_module:get_overlap(SourcePacking)),
ar_e2e:assert_syncs_range(Node, 0, 4*?PARTITION_SIZE),
ar_e2e:assert_chunks(Node, SourcePacking, Chunks),

?LOG_INFO("Source node ~p assertions passed.", [Node]),
32 changes: 5 additions & 27 deletions apps/arweave/e2e/ar_repack_mine_tests.erl
@@ -57,55 +57,33 @@ test_repack_mine({FromPackingType, ToPackingType}) ->
mining_addr = AddrB
}),

ar_e2e:assert_syncs_range(
RepackerNode,
0,
floor(2.5*?PARTITION_SIZE)),
ar_e2e:assert_syncs_range(RepackerNode, 0, 4*?PARTITION_SIZE),
ar_e2e:assert_partition_size(RepackerNode, 0, ToPacking),
ar_e2e:assert_partition_size(RepackerNode, 1, ToPacking),
ar_e2e:assert_partition_size(RepackerNode, 2, ToPacking, floor(0.5*?PARTITION_SIZE)),
ar_e2e:assert_empty_partition(RepackerNode, 3, ToPacking),
ar_e2e:assert_does_not_sync_range(
RepackerNode,
floor(2.5*?PARTITION_SIZE),
3*?PARTITION_SIZE),
ar_e2e:assert_chunks(RepackerNode, ToPacking, Chunks),


ar_test_node:restart_with_config(RepackerNode, Config#config{
storage_modules = StorageModules,
mining_addr = AddrB
}),
ar_e2e:assert_syncs_range(
RepackerNode,
0,
floor(2.5*?PARTITION_SIZE)),
ar_e2e:assert_syncs_range(RepackerNode, 0, 4*?PARTITION_SIZE),
ar_e2e:assert_partition_size(RepackerNode, 0, ToPacking),
ar_e2e:assert_partition_size(RepackerNode, 1, ToPacking),
ar_e2e:assert_partition_size(RepackerNode, 2, ToPacking, floor(0.5*?PARTITION_SIZE)),
ar_e2e:assert_empty_partition(RepackerNode, 3, ToPacking),
ar_e2e:assert_does_not_sync_range(
RepackerNode,
floor(2.5*?PARTITION_SIZE),
3*?PARTITION_SIZE),
ar_e2e:assert_chunks(RepackerNode, ToPacking, Chunks),

case {FromPackingType, ToPackingType} of
{_, unpacked} ->
case ToPackingType of
unpacked ->
ok;
{unpacked, _} ->
%% Due to how we set up the unpacked repacker node, the final disk pool test won't
%% work (the repacker node only has the chunks below the disk pool threshold).
ar_e2e:assert_mine_and_validate(RepackerNode, ValidatorNode, ToPacking);
_ ->
ar_e2e:assert_mine_and_validate(RepackerNode, ValidatorNode, ToPacking),

%% Now that we mined a block, the rest of partition 2 is below the disk pool
%% threshold
ar_e2e:assert_syncs_range(
RepackerNode,
0,
3*?PARTITION_SIZE),
ar_e2e:assert_syncs_range(RepackerNode, 0, 4*?PARTITION_SIZE),
ar_e2e:assert_partition_size(RepackerNode, 0, ToPacking),
ar_e2e:assert_partition_size(RepackerNode, 1, ToPacking),
ar_e2e:assert_partition_size(RepackerNode, 2, ToPacking),
30 changes: 16 additions & 14 deletions apps/arweave/e2e/ar_sync_pack_mine_tests.erl
@@ -17,7 +17,7 @@ setup_source_node(PackingType) ->
{Blocks, Chunks, PackingType}.

instantiator(GenesisData, SinkPackingType, TestFun) ->
{timeout, 300, {with, {GenesisData, SinkPackingType}, [TestFun]}}.
{timeout, 600, {with, {GenesisData, SinkPackingType}, [TestFun]}}.

%% --------------------------------------------------------------------------------------------
%% Test Registration
@@ -292,23 +292,27 @@ test_disk_pool_threshold({SourcePackingType, SinkPackingType}) ->
SourceNode = peer1,
SinkNode = peer2,

%% When the source packing type is unpacked, this setup process performs some
%% extra disk pool checks:
%% 1. spin up a spora_2_6 node and mine some blocks
%% 2. some chunks are below the disk pool threshold and some above
%% 3. spin up an unpacked node and sync from spora_2_6
%% 4. shut down the spora_2_6 node
%% 5. now the unpacked node should have synced all of the chunks, both above and below
%% the disk pool threshold
%% 6. proceed with test and spin up the sink node and confirm it too can sink all chunks
%% from the unpacked source node - both above and below the disk pool threshold
{Blocks, Chunks, SourcePackingType} = setup_source_node(SourcePackingType),
[B0 | _] = Blocks,

SinkPacking = start_sink_node(SinkNode, SourceNode, B0, SinkPackingType),
%% Partition 1 and half of partition 2 are below the disk pool threshold
ar_e2e:assert_syncs_range(
SinkNode,
?PARTITION_SIZE,
floor(2.5*?PARTITION_SIZE)),
ar_e2e:assert_syncs_range(SinkNode, ?PARTITION_SIZE, 4*?PARTITION_SIZE),
ar_e2e:assert_partition_size(SinkNode, 1, SinkPacking),
ar_e2e:assert_partition_size(SinkNode, 2, SinkPacking, floor(0.5*?PARTITION_SIZE)),
ar_e2e:assert_empty_partition(SinkNode, 3, SinkPacking),
ar_e2e:assert_does_not_sync_range(
SinkNode,
floor(2.5*?PARTITION_SIZE),
3*?PARTITION_SIZE),
% ar_e2e:assert_chunks(SinkNode, SinkPacking, Chunks),
ar_e2e:assert_does_not_sync_range(SinkNode, 0, ?PARTITION_SIZE),
ar_e2e:assert_chunks(SinkNode, SinkPacking, Chunks),

case SinkPackingType of
unpacked ->
@@ -318,13 +322,11 @@

%% Now that we mined a block, the rest of partition 2 is below the disk pool
%% threshold
ar_e2e:assert_syncs_range(
SinkNode,
?PARTITION_SIZE,
3*?PARTITION_SIZE),
ar_e2e:assert_syncs_range(SinkNode, ?PARTITION_SIZE, 4*?PARTITION_SIZE),
ar_e2e:assert_partition_size(SinkNode, 2, SinkPacking),
%% All of partition 3 is still above the disk pool threshold
ar_e2e:assert_empty_partition(SinkNode, 3, SinkPacking),
ar_e2e:assert_does_not_sync_range(SinkNode, 0, ?PARTITION_SIZE),
ok
end.

38 changes: 10 additions & 28 deletions apps/arweave/src/ar_data_sync.erl
@@ -114,8 +114,6 @@ is_chunk_proof_ratio_attractive(ChunkSize, TXSize, DataPath) ->
%% The item is removed from the disk pool when the chunk's offset
%% drops below the disk pool threshold.
add_chunk(DataRoot, DataPath, Chunk, Offset, TXSize) ->
?LOG_DEBUG([{event, add_chunk}, {data_root, ar_util:encode(DataRoot)},
{offset, Offset}, {tx_size, TXSize}]),
DataRootIndex = {data_root_index, "default"},
[{_, DiskPoolSize}] = ets:lookup(ar_data_sync_state, disk_pool_size),
DiskPoolChunksIndex = {disk_pool_chunks_index, "default"},
@@ -190,7 +188,6 @@ add_chunk(DataRoot, DataPath, Chunk, Offset, TXSize) ->
%% yet.
EndOffset2, TXStartOffset, 5) of
true ->
?LOG_DEBUG([{event, chunk_synced_disk_pool}, {end_offset, EndOffset2}]),
synced;
false ->
{ok, {DataPathHash, DiskPoolChunkKey, PassedState2}}
@@ -209,7 +206,6 @@ add_chunk(DataRoot, DataPath, Chunk, Offset, TXSize) ->
synced ->
ok;
{synced_disk_pool, EndOffset4} ->
?LOG_DEBUG([{event, synced_disk_pool}, {end_offset, EndOffset4}]),
case is_estimated_long_term_chunk(DataRootOffsetReply, EndOffset4) of
false ->
temporary;
@@ -269,8 +265,6 @@ add_chunk(DataRoot, DataPath, Chunk, Offset, TXSize) ->
Value :: DataPath :: binary() | {Chunk :: binary(), DataPath :: binary()}) ->
ok | {error, term()}.
put_chunk_data(ChunkDataKey, StoreID, Value) ->
?LOG_DEBUG([{event, put_chunk_data}, {chunk_data_key, ar_util:encode(ChunkDataKey)},
{store_id, StoreID}]),
ar_kv:put({chunk_data_db, StoreID}, ChunkDataKey, term_to_binary(Value)).

get_chunk_data(ChunkDataKey, StoreID) ->
@@ -832,7 +826,6 @@ handle_cast({join, RecentBI}, State) ->
repair_data_root_offset_index(BI, State),
DiskPoolThreshold = get_disk_pool_threshold(RecentBI),
ets:insert(ar_data_sync_state, {disk_pool_threshold, DiskPoolThreshold}),
?LOG_DEBUG([{event, join}, {store_id, StoreID}, {disk_pool_threshold, DiskPoolThreshold}]),
State2 =
State#sync_data_state{
weave_size = WeaveSize,
@@ -896,8 +889,6 @@ handle_cast({add_tip_block, BlockTXPairs, BI}, State) ->
ar_events:send(sync_record, {global_cut, BlockStartOffset}),
DiskPoolThreshold = get_disk_pool_threshold(BI),
ets:insert(ar_data_sync_state, {disk_pool_threshold, DiskPoolThreshold}),
?LOG_DEBUG([{event, add_tip_block}, {store_id, StoreID},
{disk_pool_threshold, DiskPoolThreshold}, {weave_size, WeaveSize}]),
State2 = State#sync_data_state{ weave_size = WeaveSize,
block_index = BI, disk_pool_threshold = DiskPoolThreshold },
State3 = case StoreID == "default" of
@@ -1029,8 +1020,6 @@ handle_cast({collect_peer_intervals, Start, End}, State) ->
false ->
min(End, DiskPoolThreshold)
end,
?LOG_DEBUG([{event, collect_peer_intervals}, {store_id, StoreID},
{s, Start}, {e, End}, {e2, End2}]),

case Start >= End2 of
true ->
@@ -1574,9 +1563,6 @@ do_sync_data(State) ->
%% storage_module
Intervals = get_unsynced_intervals_from_other_storage_modules(
StoreID, "default", RangeStart, min(RangeEnd, DiskPoolThreshold)),
?LOG_INFO([{event, do_sync_data}, {store_id, StoreID}, {range_start, RangeStart},
{range_end, RangeEnd}, {disk_pool_threshold, DiskPoolThreshold},
{default_intervals, length(Intervals)}]),
gen_server:cast(self(), sync_data2),
%% Find all storage_modules that might include the target chunks (e.g. neighboring
%% storage_modules with an overlap, or unpacked copies used for packing, etc...)
@@ -2178,7 +2164,6 @@ move_disk_pool_index(Cursor, State) ->
none ->
ok;
{ok, Key, Value} ->
?LOG_DEBUG([{event, move_disk_pool_index}, {old, Old}, {new, New}, {key, ar_util:encode(Key)}]),
ok = ar_kv:put(New, Key, Value),
ok = ar_kv:delete(Old, Key),
move_disk_pool_index(Key, State)
Expand Down Expand Up @@ -2992,13 +2977,16 @@ process_valid_fetched_chunk(ChunkArgs, Args, State) ->
true = AbsoluteEndOffset == AbsoluteTXStartOffset + ChunkEndOffset,
case StoreID == "default" andalso AbsoluteEndOffset > DiskPoolThreshold of
true ->
{BlockStartOffset, BlockEndOffset, TXRoot} = ar_block_index:get_block_bounds(Byte),
BlockRelativeOffset = AbsoluteEndOffset - BlockStartOffset,
TxRelativeOffset = BlockRelativeOffset - AbsoluteTXStartOffset,
Result = add_chunk(DataRoot, DataPath, UnpackedChunk, ChunkEndOffset - 1, TXSize),
?LOG_DEBUG([{event, add_chunk}, {store_id, StoreID},
{result, Result}, {absolute_end_offset, AbsoluteEndOffset}]),
{noreply, State};
Result = add_chunk(
DataRoot, DataPath, UnpackedChunk, ChunkEndOffset - 1, TXSize),
case Result of
{error, Reason} ->
?LOG_ERROR([{event, failed_to_add_chunk_to_disk_pool},
{end_offset, AbsoluteEndOffset}, {reason, Reason}]),
{noreply, State};
_ ->
{noreply, State}
end;
false ->
pack_and_store_chunk({DataRoot, AbsoluteEndOffset, TXPath, TXRoot,
DataPath, Packing, ChunkEndOffset, ChunkSize, Chunk,
@@ -3122,9 +3110,6 @@ store_chunk2(ChunkArgs, Args, State) ->
DataPathHash = crypto:hash(sha256, DataPath),
ShouldStoreInChunkStorage = ar_chunk_storage:is_storage_supported(AbsoluteOffset,
ChunkSize, Packing),
?LOG_DEBUG([{event, store_chunk2}, {should_store_in_chunk_storage, ShouldStoreInChunkStorage},
{absolute_offset, AbsoluteOffset}, {chunk_size, ChunkSize}, {packing, Packing},
{store_id, StoreID}]),
CleanRecord =
case {ShouldStoreInChunkStorage, ar_storage_module:get_packing(StoreID)} of
{true, {replica_2_9, _}} ->
@@ -3553,9 +3538,6 @@ process_disk_pool_matured_chunk_offset(Iterator, TXRoot, TXPath, AbsoluteOffset,
gen_server:cast(self(), process_disk_pool_item),
{noreply, deregister_currently_processed_disk_pool_key(Key, State)};
{ok, {Chunk, DataPath}} ->
?LOG_DEBUG([{event, disk_pool_matured_chunk}, {data_path_hash, ar_util:encode(DataPathHash)},
{data_root, ar_util:encode(DataRoot)}, {absolute_end_offset, AbsoluteOffset},
{relative_offset, Offset}, {chunk_data_key, ar_util:encode(element(5, Args))}]),
increment_chunk_cache_size(),
Args2 = {DataRoot, AbsoluteOffset, TXPath, TXRoot, DataPath, unpacked,
Offset, ChunkSize, Chunk, Chunk, none, none},
3 changes: 0 additions & 3 deletions apps/arweave/src/ar_sync_record.erl
@@ -434,7 +434,6 @@ name(StoreID) ->
add2(End, Start, ID, State) ->
#state{ sync_record_by_id = SyncRecordByID, state_db = StateDB,
store_id = StoreID } = State,
?LOG_DEBUG([{event, add2}, {s, Start}, {e, End}, {id, ID}, {store_id, StoreID}]),
SyncRecord = maps:get(ID, SyncRecordByID, ar_intervals:new()),
SyncRecord2 = ar_intervals:add(SyncRecord, End, Start),
SyncRecordByID2 = maps:put(ID, SyncRecord2, SyncRecordByID),
@@ -453,7 +452,6 @@ add2(End, Start, ID, State) ->
add2(End, Start, Packing, ID, State) ->
#state{ sync_record_by_id = SyncRecordByID, sync_record_by_id_type = SyncRecordByIDType,
state_db = StateDB, store_id = StoreID } = State,
?LOG_DEBUG([{event, add2}, {s, Start}, {e, End}, {id, ID}, {packing, ar_serialize:encode_packing(Packing, true)}, {store_id, StoreID}]),
ByType = maps:get({ID, Packing}, SyncRecordByIDType, ar_intervals:new()),
ByType2 = ar_intervals:add(ByType, End, Start),
SyncRecordByIDType2 = maps:put({ID, Packing}, ByType2, SyncRecordByIDType),
@@ -478,7 +476,6 @@ add2(End, Start, Packing, ID, State) ->
delete2(End, Start, ID, State) ->
#state{ sync_record_by_id = SyncRecordByID, sync_record_by_id_type = SyncRecordByIDType,
state_db = StateDB, store_id = StoreID } = State,
?LOG_DEBUG([{event, delete2}, {s, Start}, {e, End}, {id, ID}, {store_id, StoreID}]),
SyncRecord = maps:get(ID, SyncRecordByID, ar_intervals:new()),
SyncRecord2 = ar_intervals:delete(SyncRecord, End, Start),
SyncRecordByID2 = maps:put(ID, SyncRecord2, SyncRecordByID),
