Skip to content

Commit

Permalink
fix: set the worker timeout long enough for all workers to finish
Browse files Browse the repository at this point in the history
this should prevent the situation where jobs that were stuck in
the message queue would get timed out, causing more jobs to be
pushed to the message queue
  • Loading branch information
JamesPiechota committed Feb 7, 2025
1 parent ccec386 commit 8c9314b
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 23 deletions.
62 changes: 42 additions & 20 deletions apps/arweave/src/ar_tx_emitter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,18 @@
%% Remove identifiers of recently emitted transactions from the cache after this long.
-define(CLEANUP_RECENTLY_EMITTED_TIMEOUT, 60 * 60 * 1000).

%% How long to wait for a reply from the emitter worker before considering it failed.
-define(WORKER_TIMEOUT, 30 * 1000).
-define(WORKER_CONNECT_TIMEOUT, 1 * 1000).
-define(WORKER_REQUEST_TIMEOUT, 5 * 1000).

%% How frequently to check whether new transactions have appeared for distribution.
-define(CHECK_MEMPOOL_FREQUENCY, 1000).

-record(state, {
currently_emitting,
workers
workers,
%% How long to wait for a reply from the emitter worker before considering it failed.
worker_failed_timeout

}).

%% How many transactions to send to emitters at one go. With CHUNK_SIZE=1, the propagation
Expand All @@ -42,8 +45,19 @@ start_link(Name, Workers) ->

init(Workers) ->
gen_server:cast(?MODULE, process_chunk),

NumWorkers = length(Workers),
NumPeers = max_propagation_peers(),
JobsPerWorker = (?CHUNK_SIZE * NumPeers) div NumWorkers,
%% Only time out a worker after we've given enough time for *all* workers to complete
%% their tasks (including a small 1000 ms buffer). This should prevent a cascade where
%% worker queues keep growing.
WorkerFailedTimeout = JobsPerWorker *
(?WORKER_CONNECT_TIMEOUT + ?WORKER_REQUEST_TIMEOUT + 1000),

State = #state{ workers = queue:from_list(Workers)
, currently_emitting = sets:new()
, worker_failed_timeout = WorkerFailedTimeout
},
{ok, State}.

Expand All @@ -66,20 +80,18 @@ handle_cast(process_chunk, State) ->
% prepare to emit chunk(s)
PropagationQueue = ar_mempool:get_propagation_queue(),
PropagationMax = max_propagation_peers(),
{Q2, Emitting2} = emit(PropagationQueue, Q, Emitting, Peers, PropagationMax, ?CHUNK_SIZE),
State2 = emit(
PropagationQueue, Peers, PropagationMax, ?CHUNK_SIZE, State),

% check later if emit/5 returns an empty set
case sets:is_empty(Emitting2) of
case sets:is_empty(State2#state.currently_emitting) of
true ->
ar_util:cast_after(?CHECK_MEMPOOL_FREQUENCY, ?MODULE, process_chunk);
false ->
ok
end,

NewState = State#state{ workers = Q2
, currently_emitting = Emitting2
},
{noreply, NewState};
{noreply, State2};

handle_cast(Msg, State) ->
?LOG_ERROR([{event, unhandled_cast}, {module, ?MODULE}, {message, Msg}]),
Expand Down Expand Up @@ -140,23 +152,27 @@ max_propagation_peers() ->
{ok, Config} = application:get_env(arweave, config),
Config#config.max_propagation_peers.

emit(_Set, Q, Emitting, _Peers, _MaxPeers, N) when N =< 0 ->
{Q, Emitting};
emit(Set, Q, Emitting, Peers, MaxPeers, N) ->
emit(_Set, _Peers, _MaxPeers, N, State) when N =< 0 ->
State;
emit(Set, Peers, MaxPeers, N, State) ->
case gb_sets:is_empty(Set) of
true ->
{Q, Emitting};
State;
false ->
emit_set_not_empty(Set, Q, Emitting, Peers, MaxPeers, N)
emit_set_not_empty(Set, Peers, MaxPeers, N, State)
end.

emit_set_not_empty(Set, Q, Emitting, Peers, MaxPeers, N) ->
emit_set_not_empty(Set, Peers, MaxPeers, N, State) ->
{{Utility, TXID}, Set2} = gb_sets:take_largest(Set),
case ets:member(ar_tx_emitter_recently_emitted, TXID) of
true ->
emit(Set2, Q, Emitting, Peers, MaxPeers, N);

emit(Set2, Peers, MaxPeers, N, State);
false ->
#state{
workers = Q,
currently_emitting = Emitting,
worker_failed_timeout = WorkerFailedTimeout
} = State,
% only a subset of the whole peers list is
% taken using max_propagation_peers value.
% the first N peers will be used instead of
Expand All @@ -170,11 +186,17 @@ emit_set_not_empty(Set, Q, Emitting, Peers, MaxPeers, N) ->
% messages across all available workers.
Foldl = fun(Peer, {Acc, Workers}) ->
{{value, W}, Workers2} = queue:out(Workers),
gen_server:cast(W, {emit, TXID, Peer, self()}),
erlang:send_after(?WORKER_TIMEOUT, ?MODULE, {timeout, TXID, Peer}),
gen_server:cast(W,
{emit, TXID, Peer,
?WORKER_CONNECT_TIMEOUT, ?WORKER_REQUEST_TIMEOUT, self()}),
erlang:send_after(WorkerFailedTimeout, ?MODULE, {timeout, TXID, Peer}),
{sets:add_element({TXID, Peer}, Acc), queue:in(W, Workers2)}
end,
{Emitting2, Q2} = lists:foldl(Foldl, {Emitting, Q}, PeersToSync),
State2 = State#state{
workers = Q2,
currently_emitting = Emitting2
},

%% The cache storing recently emitted transactions is used instead
%% of an explicit synchronization of the propagation queue updates
Expand All @@ -184,5 +206,5 @@ emit_set_not_empty(Set, Q, Emitting, Peers, MaxPeers, N) ->
erlang:send_after(?CLEANUP_RECENTLY_EMITTED_TIMEOUT, ?MODULE,
{remove_from_recently_emitted, TXID}),
ar_events:send(tx, {emitting_scheduled, Utility, TXID}),
emit(Set2, Q2, Emitting2, Peers, MaxPeers, N - 1)
emit(Set2, Peers, MaxPeers, N - 1, State2)
end.
6 changes: 3 additions & 3 deletions apps/arweave/src/ar_tx_emitter_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ handle_call(Request, _From, State) ->
?LOG_WARNING([{event, unhandled_call}, {module, ?MODULE}, {request, Request}]),
{reply, ok, State}.

handle_cast({emit, TXID, Peer, ReplyTo}, State) ->
handle_cast({emit, TXID, Peer, ConnectTimeout, Timeout, ReplyTo}, State) ->
case ar_mempool:get_tx(TXID) of
not_found ->
ok;
TX ->
StartedAt = erlang:timestamp(),
Opts = #{ connect_timeout => 1
, timeout => 5
Opts = #{ connect_timeout => ConnectTimeout div 1000
, timeout => Timeout div 1000
},
emit(#{ tx_id => TXID
, peer => Peer
Expand Down

0 comments on commit 8c9314b

Please sign in to comment.