Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jp/cm partition table 20240515 #571

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions apps/arweave/src/ar_coordination.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
start_link/0, computed_h1/2, compute_h2_for_peer/2, computed_h2_for_peer/1,
get_public_state/0, send_h1_batch_to_peer/0, stat_loop/0, get_peers/1, get_peer/1,
update_peer/2, remove_peer/1, garbage_collect/0, is_exit_peer/0,
get_unique_partitions_list/0, get_self_plus_external_partitions_list/0,
get_partition_table/0, get_self_plus_external_partitions_list/0,
get_cluster_partitions_list/0
]).

Expand Down Expand Up @@ -378,12 +378,11 @@ get_peer(PartitionNumber, State) ->
send_h1(Candidate, State) ->
#mining_candidate{
partition_number2 = PartitionNumber2, cm_h1_list = H1List } = Candidate,
Peer = get_peer(PartitionNumber2, State),
case get_peer(PartitionNumber2, State) of
none ->
ok;
Peer ->
Candidate2 = Candidate#mining_candidate { label = <<"cm">> },
Candidate2 = Candidate#mining_candidate{ label = <<"cm">> },
spawn(fun() ->
ar_http_iface_client:cm_h1_send(Peer, Candidate2)
end),
Expand Down Expand Up @@ -438,6 +437,7 @@ remove_mining_peer(Peer, State) ->

refetch_peer_partitions(Peers) ->
spawn(fun() ->

ar_util:pmap(
fun(Peer) ->
case ar_http_iface_client:get_cm_partition_table(Peer) of
Expand All @@ -461,9 +461,21 @@ refetch_peer_partitions(Peers) ->
refetch_pool_peer_partitions() ->
gen_server:cast(?MODULE, refetch_pool_peer_partitions).

get_unique_partitions_list() ->
Set = get_unique_partitions_set(ar_mining_io:get_partitions(), sets:new()),
lists:sort(sets:to_list(Set)).
get_partition_table() ->
lists:foldl(
fun({BucketSize, Bucket, {spora_2_6, MiningAddress}}, Acc) ->
[
{[
{bucket, Bucket},
{bucketsize, BucketSize},
{addr, ar_util:encode(MiningAddress)}
]}
| Acc
]
end,
sets:new(),
ar_mining_io:get_storage_modules()
)

get_unique_partitions_set() ->
get_unique_partitions_set(ar_mining_io:get_partitions(), sets:new()).
Expand Down
235 changes: 67 additions & 168 deletions apps/arweave/src/ar_http_iface_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
get_pool_cm_jobs/2, post_pool_cm_jobs/2, post_cm_partition_table_to_pool/2]).

-include_lib("arweave/include/ar.hrl").
-include_lib("arweave/include/ar_pricing.hrl").
-include_lib("arweave/include/ar_config.hrl").
-include_lib("arweave/include/ar_data_sync.hrl").
-include_lib("arweave/include/ar_data_discovery.hrl").
Expand Down Expand Up @@ -556,114 +555,74 @@ get_previous_vdf_session(Peer) ->
Reply
end.

%% -----------------------------------------------------------------------------
%% Coordinated Mining and Pool Requests
%% -----------------------------------------------------------------------------

get_cm_partition_table(Peer) ->
{Peer3, Headers, BasePath, IsPeerRequest} =
case Peer of
{pool, URL} ->
{Peer2, Path2} = get_peer_and_path_from_url(URL),
{Peer2, pool_client_headers(), Path2, false};
_ ->
{Peer, cm_p2p_headers(), "", true}
end,
handle_cm_partition_table_response(ar_http:req(#{
peer => Peer3,
method => get,
path => BasePath ++ "/coordinated_mining/partition_table",
timeout => 5 * 1000,
connect_timeout => 500,
headers => Headers,
is_peer_request => IsPeerRequest
})).
Req = build_cm_or_pool_request(get, Peer, "/coordinated_mining/partition_table"),
handle_cm_partition_table_response(ar_http:req(Req)).

cm_h1_send(Peer, Candidate) ->
{Peer3, Headers, BasePath, IsPeerRequest} =
case Peer of
{pool, URL} ->
{Peer2, Path2} = get_peer_and_path_from_url(URL),
{Peer2, pool_client_headers(), Path2, false};
_ ->
{Peer, cm_p2p_headers(), "", true}
end,
JSON =
case is_binary(Candidate) of
true ->
Candidate;
false ->
ar_serialize:jsonify(ar_serialize:candidate_to_json_struct(Candidate))
end,
handle_cm_noop_response(ar_http:req(#{
peer => Peer3,
method => post,
path => BasePath ++ "/coordinated_mining/h1",
timeout => 5 * 1000,
connect_timeout => 500,
headers => Headers,
body => JSON,
is_peer_request => IsPeerRequest
})).
JSON = ar_serialize:jsonify(ar_serialize:candidate_to_json_struct(Candidate)),
Req = build_cm_or_pool_request(post, Peer, "/coordinated_mining/h1", JSON),
handle_cm_noop_response(ar_http:req(Req)).

cm_h2_send(Peer, Candidate) ->
{Peer3, Headers, BasePath, IsPeerRequest} =
case Peer of
{pool, URL} ->
{Peer2, Path2} = get_peer_and_path_from_url(URL),
{Peer2, pool_client_headers(), Path2, false};
_ ->
{Peer, cm_p2p_headers(), "", true}
end,
JSON =
case is_binary(Candidate) of
true ->
Candidate;
false ->
ar_serialize:jsonify(ar_serialize:candidate_to_json_struct(Candidate))
end,
handle_cm_noop_response(ar_http:req(#{
peer => Peer3,
method => post,
path => BasePath ++ "/coordinated_mining/h2",
timeout => 5 * 1000,
connect_timeout => 500,
headers => Headers,
body => JSON,
is_peer_request => IsPeerRequest
})).
JSON = ar_serialize:jsonify(ar_serialize:candidate_to_json_struct(Candidate)),
Req = build_cm_or_pool_request(post, Peer, "/coordinated_mining/h2", JSON),
handle_cm_noop_response(ar_http:req(Req)).

cm_publish_send(Peer, Solution) ->
?LOG_DEBUG([{event, cm_publish_send}, {peer, ar_util:format_peer(Peer)},
{solution, ar_util:encode(Solution#mining_solution.solution_hash)},
{step_number, Solution#mining_solution.step_number},
{start_interval_number, Solution#mining_solution.start_interval_number},
{seed, ar_util:encode(Solution#mining_solution.seed)}]),
JSON = ar_serialize:solution_to_json_struct(Solution),
handle_cm_noop_response(ar_http:req(#{
peer => Peer,
method => post,
path => "/coordinated_mining/publish",
timeout => 5 * 1000,
connect_timeout => 500,
headers => cm_p2p_headers(),
body => ar_serialize:jsonify(JSON)
})).
JSON = ar_serialize:jsonify(ar_serialize:solution_to_json_struct(Solution)),
Req = build_cm_or_pool_request(post, Peer, "/coordinated_mining/publish", JSON),
handle_cm_noop_response(ar_http:req(Req)).

%% @doc Fetch the jobs from the pool or coordinated mining exit peer.
get_jobs(Peer, PrevOutput) ->
{Peer3, Headers, BasePath, IsPeerRequest} =
case Peer of
{pool, URL} ->
{Peer2, Path2} = get_peer_and_path_from_url(URL),
{Peer2, pool_client_headers(), Path2, false};
_ ->
{Peer, cm_p2p_headers(), "", true}
Req = build_cm_or_pool_request(get, Peer,
"/jobs/" ++ binary_to_list(ar_util:encode(PrevOutput))),
handle_get_jobs_response(ar_http:req(Req)).

%% @doc Post the partial solution to the pool or coordinated mining exit peer.
post_partial_solution(Peer, Solution) ->
Payload =
case is_binary(Solution) of
true ->
Solution;
false ->
ar_serialize:jsonify(ar_serialize:solution_to_json_struct(Solution))
end,
handle_get_jobs_response(ar_http:req(#{
peer => Peer3,
method => get,
path => BasePath ++ "/jobs/" ++ binary_to_list(ar_util:encode(PrevOutput)),
timeout => 5 * 1000,
connect_timeout => 1000,
headers => Headers,
is_peer_request => IsPeerRequest
Req = build_cm_or_pool_request(post, Peer, "/partial_solution", Payload),
handle_post_partial_solution_response(ar_http:req(Req#{
timeout => 20 * 1000,
connect_timeout => 5 * 1000
})).

get_pool_cm_jobs(Peer, Jobs) ->
JSON = ar_serialize:jsonify(ar_serialize:pool_cm_jobs_to_json_struct(Jobs)),
Req = build_cm_or_pool_request(post, Peer, "/pool_cm_jobs", JSON),
handle_get_pool_cm_jobs_response(ar_http:req(Req#{
connect_timeout => 1000
})).

post_pool_cm_jobs(Peer, Payload) ->
Req = build_cm_or_pool_request(post, Peer, "/pool_cm_jobs", Payload),
handle_post_pool_cm_jobs_response(ar_http:req(Req#{
timeout => 10 * 1000,
connect_timeout => 2000
})).

post_cm_partition_table_to_pool(Peer, Payload) ->
Req = build_cm_or_pool_request(post, Peer, "/coordinated_mining/partition_table", Payload),
handle_cm_partition_table_response(ar_http:req(Req#{
timeout => 10 * 1000,
connect_timeout => 2000
})).

get_peer_and_path_from_url(URL) ->
Expand All @@ -681,8 +640,9 @@ get_peer_and_path_from_url(URL) ->
end,
{Peer, binary_to_list(P)}.

%% @doc Post the partial solution to the pool or coordinated mining exit peer.
post_partial_solution(Peer, Solution) ->
build_cm_or_pool_request(Method, Peer, Path) ->
build_cm_or_pool_request(Method, Peer, Path, <<>>).
build_cm_or_pool_request(Method, Peer, Path, Body) ->
{Peer3, Headers, BasePath, IsPeerRequest} =
case Peer of
{pool, URL} ->
Expand All @@ -691,83 +651,22 @@ post_partial_solution(Peer, Solution) ->
_ ->
{Peer, cm_p2p_headers(), "", true}
end,
Headers2 = add_header(<<"content-type">>, <<"application/json">>, Headers),
Payload =
case is_binary(Solution) of
true ->
Solution;
false ->
ar_serialize:jsonify(ar_serialize:solution_to_json_struct(Solution))
end,
handle_post_partial_solution_response(ar_http:req(#{
peer => Peer3,
method => post,
path => BasePath ++ "/partial_solution/",
timeout => 20 * 1000,
connect_timeout => 5 * 1000,
headers => Headers2,
body => Payload,
is_peer_request => IsPeerRequest
})).

get_pool_cm_jobs(Peer, Jobs) ->
{Peer3, Headers, BasePath, IsPeerRequest} =
case Peer of
{pool, URL} ->
{Peer2, Path2} = get_peer_and_path_from_url(URL),
{Peer2, pool_client_headers(), Path2, false};
_ ->
{Peer, cm_p2p_headers(), "", true}
end,
Struct = ar_serialize:pool_cm_jobs_to_json_struct(Jobs),
Payload = ar_serialize:jsonify(Struct),
handle_get_pool_cm_jobs_response(ar_http:req(#{
Headers2 = case Method of
get ->
Headers;
_ ->
add_header(<<"content-type">>, <<"application/json">>, Headers)
end,
#{
peer => Peer3,
method => post,
path => BasePath ++ "/pool_cm_jobs",
method => Method,
path => BasePath ++ Path,
timeout => 5 * 1000,
connect_timeout => 1000,
headers => Headers,
body => Payload,
is_peer_request => IsPeerRequest
})).

post_pool_cm_jobs(Peer, Payload) ->
{Peer3, Headers, BasePath, IsPeerRequest} =
case Peer of
{pool, URL} ->
{Peer2, Path2} = get_peer_and_path_from_url(URL),
{Peer2, pool_client_headers(), Path2, false};
_ ->
{Peer, cm_p2p_headers(), "", true}
end,
Headers2 = add_header(<<"content-type">>, <<"application/json">>, Headers),
handle_post_pool_cm_jobs_response(ar_http:req(#{
peer => Peer3,
method => post,
path => BasePath ++ "/pool_cm_jobs",
body => Payload,
timeout => 10 * 1000,
connect_timeout => 2000,
connect_timeout => 500,
headers => Headers2,
body => Body,
is_peer_request => IsPeerRequest
})).

post_cm_partition_table_to_pool(Peer, Payload) ->
{pool, URL} = Peer,
{Peer2, BasePath} = get_peer_and_path_from_url(URL),
Headers = pool_client_headers(),
Headers2 = add_header(<<"content-type">>, <<"application/json">>, Headers),
handle_cm_partition_table_response(ar_http:req(#{
peer => Peer2,
method => post,
path => BasePath ++ "/coordinated_mining/partition_table",
body => Payload,
timeout => 10 * 1000,
connect_timeout => 2000,
headers => Headers2,
is_peer_request => false
})).
}.

handle_get_pool_cm_jobs_response({ok, {{<<"200">>, _}, _, Body, _, _}}) ->
case catch ar_serialize:json_map_to_pool_cm_jobs(
Expand Down
4 changes: 1 addition & 3 deletions apps/arweave/src/ar_http_iface_middleware.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@
-include_lib("arweave/include/ar.hrl").
-include_lib("arweave/include/ar_config.hrl").
-include_lib("arweave/include/ar_mining.hrl").
-include_lib("arweave/include/ar_pricing.hrl").
-include_lib("arweave/include/ar_data_sync.hrl").
-include_lib("arweave/include/ar_data_discovery.hrl").
-include_lib("arweave/include/ar_consensus.hrl").

-include_lib("arweave/include/ar_pool.hrl").

Expand Down Expand Up @@ -1359,7 +1357,7 @@ handle(<<"GET">>, [<<"coordinated_mining">>, <<"partition_table">>], Req, _Pid)
%% CM miners ask each other about their local
%% partitions. A CM exit node is not an exception - it
%% does NOT aggregate peer partitions in this case.
ar_coordination:get_unique_partitions_list()
ar_coordination:get_partition_table()
end,
JSON = ar_serialize:jsonify(Partitions),
{200, #{}, JSON, Req}
Expand Down
12 changes: 0 additions & 12 deletions apps/arweave/src/ar_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

-export([register/0, get_status_class/1]).

-include_lib("arweave/include/ar_pricing.hrl").

%%%===================================================================
%%% Public interface.
%%%===================================================================
Expand Down Expand Up @@ -252,16 +250,6 @@ register() ->
"data synced. The partition label breaks the mining rate down by partition. "
"The overall mining rate is inidcated by 'total'."}
]),
prometheus_gauge:new([
{name, cm_h1_batch},
{labels, [peer, direction]},
{help, "The number of coordinated mining batches of H1 hashes processed per second. "
"When direction is 'to' that indicates the number of batches sent over HTTP "
"to 'peer'. When direction is 'from' that indicates the number of batches "
"that get processed (read and hashed) from 'peer' - note: because a peer may "
"aggregate incoming batches before processing them the 'to' and 'from' "
"numbers may not match."}
]),
prometheus_gauge:new([
{name, cm_h1_rate},
{labels, [peer, direction]},
Expand Down
Loading
Loading