Skip to content

Commit

Permalink
fix: restrict selected peers to last 30 days, randomize a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesPiechota committed Feb 3, 2025
1 parent ba1404a commit b933cb9
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 34 deletions.
2 changes: 1 addition & 1 deletion apps/arweave/src/ar_bridge.erl
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ handle_info({event, block, {new, B, _}}, State) ->
{ok, Config} = application:get_env(arweave, config),
TrustedPeers = ar_peers:get_trusted_peers(),
SpecialPeers = Config#config.block_gossip_peers,
Peers = ((SpecialPeers ++ ar_peers:get_peers(lifetime)) -- TrustedPeers) ++ TrustedPeers,
Peers = ((SpecialPeers ++ ar_peers:get_peers(current)) -- TrustedPeers) ++ TrustedPeers,
JSON =
case B#block.height >= ar_fork:height_2_6() of
true ->
Expand Down
18 changes: 9 additions & 9 deletions apps/arweave/src/ar_data_discovery.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

-behaviour(gen_server).

-export([start_link/0, get_bucket_peers/1, collect_peers/0]).
-export([start_link/0, get_bucket_peers/1, collect_peers/0, pick_peers/2]).

-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).

Expand Down Expand Up @@ -67,6 +67,12 @@ get_bucket_peers(Bucket, Cursor, Peers) ->
PickedPeers
end.

%% @doc Return a list of peers where 80% of the peers are randomly chosen
%% from the first 20% of Peers and the other 20% of the peers are randomly
%% chosen from the other 80% of Peers.
pick_peers(Peers, N) ->
pick_peers(Peers, length(Peers), N).

%%%===================================================================
%%% Generic server callbacks.
%%%===================================================================
Expand Down Expand Up @@ -176,17 +182,11 @@ terminate(_Reason, _State) ->
%%% Private functions.
%%%===================================================================

%% @doc Return a list of peers where 80% of the peers are randomly chosen
%% from the first 20% of Peers and the other 20% of the peers are randomly
%% chosen from the other 80% of Peers.
pick_peers(Peers, N) ->
pick_peers(Peers, length(Peers), N).

pick_peers(Peers, PeerLen, N) when N >= PeerLen ->
Peers;
pick_peers([], _PeerLen, _N) ->
[];
pick_peers(_Peers, _PeerLen, 0) ->
pick_peers(_Peers, _PeerLen, N) when N =< 0 ->
[];
pick_peers(Peers, PeerLen, N) ->
%% N: the target number of peers to pick
Expand All @@ -204,7 +204,7 @@ pick_peers(Peers, PeerLen, N) ->

collect_peers() ->
N = ?DATA_DISCOVERY_COLLECT_PEERS_COUNT,
%% rank peers by their current rating since we care about their recent throughput performance
%% rank peers by current rating since we care about their recent throughput performance
collect_peers(lists:sublist(ar_peers:get_peers(current), N)).

collect_peers([Peer | Peers]) ->
Expand Down
7 changes: 5 additions & 2 deletions apps/arweave/src/ar_data_sync_worker_master.erl
Original file line number Diff line number Diff line change
Expand Up @@ -258,12 +258,15 @@ cut_peer_queue(MaxQueue, PeerTasks, State) ->
TasksToCut when TasksToCut > 0 ->
%% The peer has a large queue of tasks. Reduce the queue size by removing the
%% oldest tasks.
{TaskQueue2, _} = queue:split(MaxQueue, TaskQueue),
?LOG_DEBUG([{event, cut_peer_queue},
{peer, ar_util:format_peer(Peer)},
{task_queue_len, queue:len(TaskQueue)},
{active_count, PeerTasks#peer_tasks.active_count},
{scheduled_tasks, State#state.scheduled_task_count},
{max_queue, MaxQueue}, {tasks_to_cut, TasksToCut}]),
{TaskQueue2, _} = queue:split(MaxQueue, TaskQueue),
{max_queue, MaxQueue}, {tasks_to_cut, TasksToCut},
{task_queue_len2, queue:len(TaskQueue2)}
]),
{
PeerTasks#peer_tasks{
task_queue = TaskQueue2, task_queue_len = queue:len(TaskQueue2) },
Expand Down
2 changes: 1 addition & 1 deletion apps/arweave/src/ar_node_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1286,7 +1286,7 @@ get_missing_txs_and_retry(#block{ txs = TXIDs }, _Worker)
ok;
get_missing_txs_and_retry(BShadow, Worker) ->
get_missing_txs_and_retry(BShadow#block.indep_hash, BShadow#block.txs,
Worker, ar_peers:get_peers(lifetime), [], 0).
Worker, ar_peers:get_peers(current), [], 0).

get_missing_txs_and_retry(_H, _TXIDs, _Worker, _Peers, _TXs, TotalSize)
when TotalSize > ?BLOCK_TX_DATA_SIZE_LIMIT ->
Expand Down
39 changes: 21 additions & 18 deletions apps/arweave/src/ar_peers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ stats(Ranking, Peers) ->
).

discover_peers() ->
case get_peers(lifetime) of
case get_peers(current) of
[] ->
ok;
Peers ->
Expand Down Expand Up @@ -427,7 +427,7 @@ handle_cast({warning, Peer}, State) ->
Performance = update_rating(Peer, false),
case Performance#performance.average_success < ?MINIMUM_SUCCESS of
true ->
remove_peer(Peer);
remove_peer(low_success, Peer);
false ->
ok
end,
Expand All @@ -445,7 +445,7 @@ handle_info({event, block, {rejected, Reason, _H, Peer}}, State) when Peer /= no
case {IssueBan, IssueWarning, Ignore} of
{true, false, false} ->
ar_blacklist_middleware:ban_peer(Peer, ?BAD_BLOCK_BAN_TIME),
remove_peer(Peer);
remove_peer(banned, Peer);
{false, true, false} ->
issue_warning(Peer, block_rejected, Reason);
{false, false, true} ->
Expand Down Expand Up @@ -664,7 +664,7 @@ maybe_rotate_peer_ports(Peer) ->
PortMap2 = shift_port_map_left(PortMap),
PortMap3 = erlang:setelement(MaxSize, PortMap2, Port),
ets:insert(?MODULE, {{peer_ip, IP}, {PortMap3, MaxSize}}),
remove_peer(RemovedPeer)
remove_peer(rotated, RemovedPeer)
end
end
end.
Expand Down Expand Up @@ -874,6 +874,8 @@ calculate_ema(OldEMA, Value, Alpha) ->

maybe_add_peer(Peer, Release) ->
maybe_rotate_peer_ports(Peer),
%% If we've just added his peer, flag it as active and connected.
connected_peer(Peer),
case ets:lookup(?MODULE, {peer, Peer}) of
[{_, #performance{ release = Release }}] ->
ok;
Expand All @@ -888,10 +890,11 @@ maybe_add_peer(Peer, Release) ->
end
end.

remove_peer(RemovedPeer) ->
remove_peer(Reason, RemovedPeer) ->
?LOG_DEBUG([
{event, remove_peer},
{peer, ar_util:format_peer(RemovedPeer)}
{peer, ar_util:format_peer(RemovedPeer)},
{reason, Reason}
]),
Performance = get_or_init_performance(RemovedPeer),
TotalLifetimeRating = get_total_rating(lifetime),
Expand Down Expand Up @@ -1072,15 +1075,15 @@ rotate_peer_ports_test() ->
maybe_rotate_peer_ports(Peer),
[{_, {PortMap, 1}}] = ets:lookup(?MODULE, {peer_ip, {2, 2, 2, 2}}),
?assertEqual(1, element(1, PortMap)),
remove_peer(Peer),
remove_peer(test, Peer),
?assertEqual([], ets:lookup(?MODULE, {peer_ip, {2, 2, 2, 2}})),
maybe_rotate_peer_ports(Peer),
Peer2 = {2, 2, 2, 2, 2},
maybe_rotate_peer_ports(Peer2),
[{_, {PortMap2, 2}}] = ets:lookup(?MODULE, {peer_ip, {2, 2, 2, 2}}),
?assertEqual(1, element(1, PortMap2)),
?assertEqual(2, element(2, PortMap2)),
remove_peer(Peer),
remove_peer(test, Peer),
[{_, {PortMap3, 2}}] = ets:lookup(?MODULE, {peer_ip, {2, 2, 2, 2}}),
?assertEqual(empty_slot, element(1, PortMap3)),
?assertEqual(2, element(2, PortMap3)),
Expand Down Expand Up @@ -1128,22 +1131,22 @@ rotate_peer_ports_test() ->
?assertEqual(4, element(3, PortMap7)),
?assertEqual(5, element(4, PortMap7)),
?assertEqual(11, element(10, PortMap7)),
remove_peer(Peer4),
remove_peer(test, Peer4),
[{_, {PortMap8, 10}}] = ets:lookup(?MODULE, {peer_ip, {2, 2, 2, 2}}),
?assertEqual(empty_slot, element(3, PortMap8)),
?assertEqual(3, element(2, PortMap8)),
?assertEqual(5, element(4, PortMap8)),
remove_peer(Peer2),
remove_peer(Peer3),
remove_peer(Peer5),
remove_peer(Peer6),
remove_peer(Peer7),
remove_peer(Peer8),
remove_peer(Peer9),
remove_peer(Peer10),
remove_peer(test, Peer2),
remove_peer(test, Peer3),
remove_peer(test, Peer5),
remove_peer(test, Peer6),
remove_peer(test, Peer7),
remove_peer(test, Peer8),
remove_peer(test, Peer9),
remove_peer(test, Peer10),
[{_, {PortMap9, 10}}] = ets:lookup(?MODULE, {peer_ip, {2, 2, 2, 2}}),
?assertEqual(11, element(10, PortMap9)),
remove_peer(Peer11),
remove_peer(test, Peer11),
?assertEqual([], ets:lookup(?MODULE, {peer_ip, {2, 2, 2, 2}})).

update_rating_test() ->
Expand Down
6 changes: 3 additions & 3 deletions apps/arweave/src/ar_poller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ handle_cast(collect_peers, #state{ pause = true } = State) ->
handle_cast(collect_peers, State) ->
#state{ worker_count = N, workers = Workers } = State,
TrustedPeers = ar_util:pick_random(ar_peers:get_trusted_peers(), N div 3),
Peers = ar_peers:get_peers(lifetime),
PickedPeers = TrustedPeers ++ lists:sublist((Peers -- TrustedPeers),
N - length(TrustedPeers)),
Peers = ar_peers:get_peers(current),
OtherPeers = ar_data_discovery:pick_peers(Peers -- TrustedPeers, N - length(TrustedPeers)),
PickedPeers = TrustedPeers ++ OtherPeers,
start_polling_peers(Workers, PickedPeers),
ar_util:cast_after(?COLLECT_PEERS_FREQUENCY_MS, ?MODULE, collect_peers),
{noreply, State};
Expand Down

0 comments on commit b933cb9

Please sign in to comment.