Skip to content

Commit

Permalink
fix: only nodes which persist the tip sync the diskpool
Browse files: browse the repository at this point in the history
  • Loading branch information
JamesPiechota committed Feb 6, 2025
1 parent 7d64217 commit 1adf0dc
Showing 1 changed file with 14 additions and 43 deletions.
57 changes: 14 additions & 43 deletions apps/arweave/src/ar_data_sync.erl
Original file line number | Diff line number | Diff line change
Expand Up @@ -251,11 +251,6 @@ add_chunk(DataRoot, DataPath, Chunk, Offset, TXSize) ->
{relative_offset, EndOffset3}]),
{error, failed_to_store_chunk};
ok ->
?LOG_DEBUG([{event, stored_chunk_in_disk_pool},
{data_path_hash, ar_util:encode(DataPathHash2)},
{data_root, ar_util:encode(DataRoot)},
{offset, Offset},
{relative_offset, EndOffset3}]),
ets:insert(ar_disk_pool_data_roots,
{DataRootKey, DiskPoolDataRootValue2}),
ets:update_counter(ar_data_sync_state, disk_pool_size,
Expand Down Expand Up @@ -732,9 +727,7 @@ init({"default" = StoreID, _}) ->
disk_pool_cursor = first,
disk_pool_threshold = DiskPoolThreshold,
store_id = StoreID,
sync_status = init_sync_status(StoreID),
range_start = DiskPoolThreshold,
range_end = DiskPoolThreshold
sync_status = init_sync_status(StoreID)
},
?LOG_INFO([{event, ar_data_sync_start}, {store_id, StoreID},
{range_start, State2#sync_data_state.range_start},
Expand Down Expand Up @@ -765,8 +758,6 @@ init({"default" = StoreID, _}) ->
ets:insert(ar_data_sync_state, {chunk_cache_size, 0}),
timer:apply_interval(200, ?MODULE, record_chunk_cache_size_metric, []),
gen_server:cast(self(), process_store_chunk_queue),
gen_server:cast(self(), sync_intervals),
gen_server:cast(self(), collect_peer_intervals),
{ok, State2};
init({StoreID, RepackInPlacePacking}) ->
?LOG_INFO([{event, ar_data_sync_start}, {store_id, StoreID}]),
Expand All @@ -784,7 +775,10 @@ init({StoreID, RepackInPlacePacking}) ->
range_start = RangeStart,
range_end = RangeEnd,
packing = ar_storage_module:get_packing(StoreID),
sync_status = init_sync_status(StoreID)
sync_status = init_sync_status(StoreID),
%% weave_size and disk_pool_threshold will be set on join
weave_size = 0,
disk_pool_threshold = 0
},
gen_server:cast(self(), sync_intervals),
gen_server:cast(self(), sync_data),
Expand Down Expand Up @@ -951,7 +945,7 @@ handle_cast({collect_peer_intervals, Start, End}, State) when Start >= End ->
{noreply, State#sync_data_state{ all_peers_intervals = #{} }};
handle_cast({collect_peer_intervals, Start, End}, State) ->
#sync_data_state{ sync_intervals_queue = Q,
store_id = StoreID, disk_pool_threshold = DiskPoolThreshold,
store_id = StoreID, weave_size = WeaveSize,
all_peers_intervals = AllPeersIntervals } = State,
IsJoined =
case ar_node:is_joined() of
Expand Down Expand Up @@ -1014,13 +1008,7 @@ handle_cast({collect_peer_intervals, Start, End}, State) ->
true ->
ok;
false ->
End2 = case StoreID == "default" of
true ->
End;
false ->
min(End, DiskPoolThreshold)
end,

End2 = min(End, WeaveSize),
case Start >= End2 of
true ->
ar_util:cast_after(500, self(), {collect_peer_intervals, Start, End});
Expand Down Expand Up @@ -1350,6 +1338,9 @@ handle_call(Request, _From, State) ->
?LOG_WARNING([{event, unhandled_call}, {module, ?MODULE}, {request, Request}]),
{reply, ok, State}.

handle_info({event, node_state, {initialized, B}}, State) ->
{noreply, State#sync_data_state{ weave_size = B#block.weave_size }};

handle_info({event, node_state, {search_space_upper_bound, Bound}}, State) ->
{noreply, State#sync_data_state{ disk_pool_threshold = Bound }};

Expand Down Expand Up @@ -2576,11 +2567,7 @@ reset_orphaned_data_roots_disk_pool_timestamps(DataRootKeySet) ->
).

store_sync_state(#sync_data_state{ store_id = "default" } = State) ->
#sync_data_state{
block_index = BI, disk_pool_threshold = DiskPoolThreshold,
weave_size = WeaveSize } = State,
%% default storage_module syncs the disk pool.
State2 = State#sync_data_state{ range_start = DiskPoolThreshold, range_end = WeaveSize },
#sync_data_state{ block_index = BI } = State,
DiskPoolDataRoots = ets:foldl(
fun({DataRootKey, V}, Acc) -> maps:put(DataRootKey, V, Acc) end, #{},
ar_disk_pool_data_roots),
Expand All @@ -2595,7 +2582,7 @@ store_sync_state(#sync_data_state{ store_id = "default" } = State) ->
ok ->
ok
end,
State2;
State;
store_sync_state(State) ->
State.

Expand Down Expand Up @@ -2967,7 +2954,7 @@ process_invalid_fetched_chunk(Peer, Byte, State, Event, ExtraLogs) ->
{noreply, State}.

process_valid_fetched_chunk(ChunkArgs, Args, State) ->
#sync_data_state{ store_id = StoreID } = State,
#sync_data_state{ store_id = StoreID, disk_pool_threshold = DiskPoolThreshold } = State,
{Packing, UnpackedChunk, AbsoluteEndOffset, TXRoot, ChunkSize} = ChunkArgs,
{AbsoluteTXStartOffset, TXSize, DataPath, TXPath, DataRoot, Chunk, _ChunkID,
ChunkEndOffset, Peer, Byte} = Args,
Expand All @@ -2985,7 +2972,7 @@ process_valid_fetched_chunk(ChunkArgs, Args, State) ->
{noreply, State};
false ->
true = AbsoluteEndOffset == AbsoluteTXStartOffset + ChunkEndOffset,
case StoreID == "default" of
case AbsoluteEndOffset >= DiskPoolThreshold of
true ->
add_chunk(
DataRoot, DataPath, UnpackedChunk, ChunkEndOffset - 1, TXSize),
Expand Down Expand Up @@ -3222,11 +3209,6 @@ process_disk_pool_item(State, Key, Value) ->
case ets:member(ar_data_sync_state, move_data_root_index_migration_complete) of
true ->
ok = ar_kv:delete(DiskPoolChunksIndex, Key),
?LOG_DEBUG([{event, removed_chunk_from_disk_pool},
{reason, disk_pool_chunk_data_root_expired},
{data_path_hash, ar_util:encode(DataPathHash)},
{data_doot, ar_util:encode(DataRoot)},
{relative_offset, Offset}]),
ok = delete_chunk_data(ChunkDataKey, StoreID),
decrease_occupied_disk_pool_size(ChunkSize, DataRootKey);
false ->
Expand Down Expand Up @@ -3287,11 +3269,6 @@ delete_disk_pool_chunk(Iterator, Args, State) ->
none ->
ok = ar_kv:delete(DiskPoolChunksIndex, DiskPoolKey),
ok = delete_chunk_data(ChunkDataKey, StoreID),
?LOG_DEBUG([{event, removed_chunk_from_disk_pool},
{reason, rotation},
{data_path_hash, ar_util:encode(DataPathHash)},
{data_doot, ar_util:encode(DataRoot)},
{relative_offset, Offset}]),
DataRootKey = data_root_index_get_key(Iterator),
decrease_occupied_disk_pool_size(ChunkSize, DataRootKey);
{TXArgs, Iterator2} ->
Expand Down Expand Up @@ -3417,12 +3394,6 @@ process_disk_pool_immature_chunk_offset(Iterator, TXRoot, TXPath, AbsoluteOffset
case update_chunks_index({AbsoluteOffset, Offset, ChunkDataKey,
TXRoot, DataRoot, TXPath, ChunkSize, unpacked}, State) of
ok ->
?LOG_DEBUG([{event, indexed_disk_pool_chunk},
{data_path_hash, ar_util:encode(DataPathHash)},
{data_root, ar_util:encode(DataRoot)},
{absolute_end_offset, AbsoluteOffset},
{relative_offset, Offset},
{chunk_data_key, ar_util:encode(element(5, Args))}]),
gen_server:cast(self(), {process_disk_pool_chunk_offsets, Iterator, false,
Args}),
{noreply, State};
Expand Down

0 comments on commit 1adf0dc

Please sign in to comment.