Skip to content

Commit

Permalink
fix: only nodes that persist the tip sync the disk pool
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesPiechota committed Feb 6, 2025
1 parent 7d64217 commit 82b850e
Showing 1 changed file with 4 additions and 33 deletions.
37 changes: 4 additions & 33 deletions apps/arweave/src/ar_data_sync.erl
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,6 @@ add_chunk(DataRoot, DataPath, Chunk, Offset, TXSize) ->
{relative_offset, EndOffset3}]),
{error, failed_to_store_chunk};
ok ->
?LOG_DEBUG([{event, stored_chunk_in_disk_pool},
{data_path_hash, ar_util:encode(DataPathHash2)},
{data_root, ar_util:encode(DataRoot)},
{offset, Offset},
{relative_offset, EndOffset3}]),
ets:insert(ar_disk_pool_data_roots,
{DataRootKey, DiskPoolDataRootValue2}),
ets:update_counter(ar_data_sync_state, disk_pool_size,
Expand Down Expand Up @@ -765,8 +760,6 @@ init({"default" = StoreID, _}) ->
ets:insert(ar_data_sync_state, {chunk_cache_size, 0}),
timer:apply_interval(200, ?MODULE, record_chunk_cache_size_metric, []),
gen_server:cast(self(), process_store_chunk_queue),
gen_server:cast(self(), sync_intervals),
gen_server:cast(self(), collect_peer_intervals),
{ok, State2};
init({StoreID, RepackInPlacePacking}) ->
?LOG_INFO([{event, ar_data_sync_start}, {store_id, StoreID}]),
Expand Down Expand Up @@ -951,7 +944,7 @@ handle_cast({collect_peer_intervals, Start, End}, State) when Start >= End ->
{noreply, State#sync_data_state{ all_peers_intervals = #{} }};
handle_cast({collect_peer_intervals, Start, End}, State) ->
#sync_data_state{ sync_intervals_queue = Q,
store_id = StoreID, disk_pool_threshold = DiskPoolThreshold,
store_id = StoreID, weave_size = WeaveSize,
all_peers_intervals = AllPeersIntervals } = State,
IsJoined =
case ar_node:is_joined() of
Expand Down Expand Up @@ -1014,13 +1007,7 @@ handle_cast({collect_peer_intervals, Start, End}, State) ->
true ->
ok;
false ->
End2 = case StoreID == "default" of
true ->
End;
false ->
min(End, DiskPoolThreshold)
end,

End2 = min(End, WeaveSize),
case Start >= End2 of
true ->
ar_util:cast_after(500, self(), {collect_peer_intervals, Start, End});
Expand Down Expand Up @@ -2967,7 +2954,7 @@ process_invalid_fetched_chunk(Peer, Byte, State, Event, ExtraLogs) ->
{noreply, State}.

process_valid_fetched_chunk(ChunkArgs, Args, State) ->
#sync_data_state{ store_id = StoreID } = State,
#sync_data_state{ store_id = StoreID, disk_pool_threshold = DiskPoolThreshold } = State,
{Packing, UnpackedChunk, AbsoluteEndOffset, TXRoot, ChunkSize} = ChunkArgs,
{AbsoluteTXStartOffset, TXSize, DataPath, TXPath, DataRoot, Chunk, _ChunkID,
ChunkEndOffset, Peer, Byte} = Args,
Expand All @@ -2985,7 +2972,7 @@ process_valid_fetched_chunk(ChunkArgs, Args, State) ->
{noreply, State};
false ->
true = AbsoluteEndOffset == AbsoluteTXStartOffset + ChunkEndOffset,
case StoreID == "default" of
case AbsoluteEndOffset > DiskPoolThreshold of
true ->
add_chunk(
DataRoot, DataPath, UnpackedChunk, ChunkEndOffset - 1, TXSize),
Expand Down Expand Up @@ -3222,11 +3209,6 @@ process_disk_pool_item(State, Key, Value) ->
case ets:member(ar_data_sync_state, move_data_root_index_migration_complete) of
true ->
ok = ar_kv:delete(DiskPoolChunksIndex, Key),
?LOG_DEBUG([{event, removed_chunk_from_disk_pool},
{reason, disk_pool_chunk_data_root_expired},
{data_path_hash, ar_util:encode(DataPathHash)},
{data_doot, ar_util:encode(DataRoot)},
{relative_offset, Offset}]),
ok = delete_chunk_data(ChunkDataKey, StoreID),
decrease_occupied_disk_pool_size(ChunkSize, DataRootKey);
false ->
Expand Down Expand Up @@ -3287,11 +3269,6 @@ delete_disk_pool_chunk(Iterator, Args, State) ->
none ->
ok = ar_kv:delete(DiskPoolChunksIndex, DiskPoolKey),
ok = delete_chunk_data(ChunkDataKey, StoreID),
?LOG_DEBUG([{event, removed_chunk_from_disk_pool},
{reason, rotation},
{data_path_hash, ar_util:encode(DataPathHash)},
{data_doot, ar_util:encode(DataRoot)},
{relative_offset, Offset}]),
DataRootKey = data_root_index_get_key(Iterator),
decrease_occupied_disk_pool_size(ChunkSize, DataRootKey);
{TXArgs, Iterator2} ->
Expand Down Expand Up @@ -3417,12 +3394,6 @@ process_disk_pool_immature_chunk_offset(Iterator, TXRoot, TXPath, AbsoluteOffset
case update_chunks_index({AbsoluteOffset, Offset, ChunkDataKey,
TXRoot, DataRoot, TXPath, ChunkSize, unpacked}, State) of
ok ->
?LOG_DEBUG([{event, indexed_disk_pool_chunk},
{data_path_hash, ar_util:encode(DataPathHash)},
{data_root, ar_util:encode(DataRoot)},
{absolute_end_offset, AbsoluteOffset},
{relative_offset, Offset},
{chunk_data_key, ar_util:encode(element(5, Args))}]),
gen_server:cast(self(), {process_disk_pool_chunk_offsets, Iterator, false,
Args}),
{noreply, State};
Expand Down

0 comments on commit 82b850e

Please sign in to comment.