From ace71e78d8c46e92a2f4bfefda9d077d0c5061fa Mon Sep 17 00:00:00 2001
From: James Piechota
Date: Tue, 14 May 2024 14:15:51 -0400
Subject: [PATCH 1/7] Remove dead code to fix some syntax highlighting errors

The removed code was unreachable
---
 apps/arweave/src/ar_pricing.erl | 60 ++-------------------------------
 apps/arweave/src/ar_tx.erl      | 16 ---------
 2 files changed, 3 insertions(+), 73 deletions(-)

diff --git a/apps/arweave/src/ar_pricing.erl b/apps/arweave/src/ar_pricing.erl
index 39030df87..f68b0dfbc 100644
--- a/apps/arweave/src/ar_pricing.erl
+++ b/apps/arweave/src/ar_pricing.erl
@@ -6,10 +6,9 @@
 	redenominate/3, may_be_redenominate/1]).
 
 %% 2.5 exports.
--export([get_tx_fee/4, get_miner_reward_and_endowment_pool/1, get_tx_fee_pre_fork_2_4/4,
-		usd_to_ar_rate/1, usd_to_ar/3, recalculate_usd_to_ar_rate/1, usd_to_ar_pre_fork_2_4/3,
-		get_miner_reward_and_endowment_pool_pre_fork_2_4/1, get_storage_cost/4,
-		get_expected_min_decline_rate/6]).
+-export([get_tx_fee/4, get_miner_reward_and_endowment_pool/1,
+		usd_to_ar_rate/1, usd_to_ar/3, recalculate_usd_to_ar_rate/1,
+		get_storage_cost/4, get_expected_min_decline_rate/6]).
 
 %% For tests.
 -export([get_v2_price_per_gib_minute/2]).
@@ -453,47 +452,6 @@ get_miner_reward_and_endowment_pool(Args) ->
 			{BaseReward + Take, Pool2 - Take}
 	end.
 
-%% @doc Calculate the transaction fee.
-get_tx_fee_pre_fork_2_4(Size, Diff, Height, Timestamp) ->
-	GBs = (?TX_SIZE_BASE + Size) / (1024 * 1024 * 1024),
-	true = Height >= ar_fork:height_2_0(),
-	PerGB =
-		usd_to_ar_pre_fork_2_4(
-			get_perpetual_gb_cost_at_timestamp(Timestamp, Height),
-			Diff,
-			Height
-		),
-	StorageCost = PerGB * GBs,
-	HashingCost = StorageCost,
-	MaintenanceCost = erlang:trunc(StorageCost + HashingCost),
-	MinerFeeShare = get_miner_fee_share(MaintenanceCost, Height),
-	MaintenanceCost + MinerFeeShare.
-
-%% @doc Return the miner reward and the new endowment pool.
-get_miner_reward_and_endowment_pool_pre_fork_2_4({Pool, TXs, unclaimed, _, _, _, _}) ->
-	{0, Pool + lists:sum([TX#tx.reward || TX <- TXs])};
-get_miner_reward_and_endowment_pool_pre_fork_2_4(Args) ->
-	{Pool, TXs, _RewardAddr, WeaveSize, Height, Diff, Timestamp} = Args,
-	true = Height >= ar_fork:height_2_0(),
-	Inflation = trunc(ar_inflation:calculate(Height)),
-	{PoolFeeShare, MinerFeeShare} = distribute_transaction_fees(TXs, Height),
-	BaseReward = Inflation + MinerFeeShare,
-	StorageCostPerGBPerBlock =
-		usd_to_ar_pre_fork_2_4(
-			get_gb_cost_per_block_at_timestamp(Timestamp, Height),
-			Diff,
-			Height
-		),
-	Burden = trunc(WeaveSize * StorageCostPerGBPerBlock / (1024 * 1024 * 1024)),
-	Pool2 = Pool + PoolFeeShare,
-	case BaseReward >= Burden of
-		true ->
-			{BaseReward, Pool2};
-		false ->
-			Take = min(Pool2, Burden - BaseReward),
-			{BaseReward + Take, Pool2 - Take}
-	end.
-
 %% @doc Return the effective USD to AR rate corresponding to the given block
 %% considering its previous block.
 usd_to_ar_rate(#block{ height = PrevHeight } = PrevB) ->
@@ -539,18 +497,6 @@ recalculate_usd_to_ar_rate(#block{ height = PrevHeight } = B) ->
 		end
 	end.
 
-%% @doc Return the amount of AR the given number of USD is worth.
-usd_to_ar_pre_fork_2_4(USD, Diff, Height) -> - InitialDiff = - ar_retarget:switch_to_linear_diff_pre_fork_2_4(?INITIAL_USD_TO_AR_DIFF(Height)()), - MaxDiff = ?MAX_DIFF, - DeltaP = (MaxDiff - InitialDiff) / (MaxDiff - Diff), - InitialInflation = ar_inflation:calculate(?INITIAL_USD_TO_AR_HEIGHT(Height)()), - DeltaInflation = ar_inflation:calculate(Height) / InitialInflation, - erlang:trunc( - (USD * ?WINSTON_PER_AR * DeltaInflation) / (?INITIAL_USD_PER_AR(Height)() * DeltaP) - ). - %% @doc Return an estimation for the minimum required decline rate making the given %% Amount (in Winston) sufficient to subsidize storage for Period seconds starting from %% Timestamp and assuming the given USD to AR rate. diff --git a/apps/arweave/src/ar_tx.erl b/apps/arweave/src/ar_tx.erl index bf51927f2..811559594 100644 --- a/apps/arweave/src/ar_tx.erl +++ b/apps/arweave/src/ar_tx.erl @@ -7,8 +7,6 @@ chunk_binary/2, chunks_to_size_tagged_chunks/1, sized_chunks_to_sized_chunk_ids/1, get_addresses/1, get_weave_size_increase/2, utility/1]). --export([get_wallet_fee_pre_fork_2_4/2]). - -include_lib("arweave/include/ar.hrl"). -include_lib("arweave/include/ar_pricing.hrl"). -include_lib("eunit/include/eunit.hrl"). @@ -229,20 +227,6 @@ utility(#tx{ format = 1, reward = Reward, data_size = DataSize }, _Size) utility(#tx{ reward = Reward }, _Size) -> {2, Reward}. -get_wallet_fee_pre_fork_2_4(Diff, Height) -> - case Height >= ar_fork:height_2_2() of - true -> - %% Scale the wallet fee so that is always roughly 0.1$. - {Dividend, Divisor} = ?WALLET_GEN_FEE_USD, - ar_pricing:usd_to_ar_pre_fork_2_4( - Dividend / Divisor, - Diff, - Height - ); - false -> - ?WALLET_GEN_FEE - end. - %%%=================================================================== %%% Private functions. %%%=================================================================== From d8137b024c80f531bbd72aba94f71cca1b94dc37 Mon Sep 17 00:00:00 2001 From: James Piechota Date: Wed, 15 May 2024 13:53:07 -0400 Subject: [PATCH 2/7] Remove an overly verbose warning --- apps/arweave/src/ar_coordination.erl | 4 ++-- apps/arweave/src/ar_mining_worker.erl | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/apps/arweave/src/ar_coordination.erl b/apps/arweave/src/ar_coordination.erl index 13ed362ec..8d554a16a 100644 --- a/apps/arweave/src/ar_coordination.erl +++ b/apps/arweave/src/ar_coordination.erl @@ -378,12 +378,11 @@ get_peer(PartitionNumber, State) -> send_h1(Candidate, State) -> #mining_candidate{ partition_number2 = PartitionNumber2, cm_h1_list = H1List } = Candidate, - Peer = get_peer(PartitionNumber2, State), case get_peer(PartitionNumber2, State) of none -> ok; Peer -> - Candidate2 = Candidate#mining_candidate { label = <<"cm">> }, + Candidate2 = Candidate#mining_candidate{ label = <<"cm">> }, spawn(fun() -> ar_http_iface_client:cm_h1_send(Peer, Candidate2) end), @@ -438,6 +437,7 @@ remove_mining_peer(Peer, State) -> refetch_peer_partitions(Peers) -> spawn(fun() -> + ar_util:pmap( fun(Peer) -> case ar_http_iface_client:get_cm_partition_table(Peer) of diff --git a/apps/arweave/src/ar_mining_worker.erl b/apps/arweave/src/ar_mining_worker.erl index 5c09d5cfc..5f7fa7fb0 100644 --- a/apps/arweave/src/ar_mining_worker.erl +++ b/apps/arweave/src/ar_mining_worker.erl @@ -503,11 +503,11 @@ handle_task({compute_h2_for_peer, Candidate}, State) -> State3 = do_not_cache(Candidate3, State2), {noreply, cache_h1_list(Candidate3, H1List, State3)}; false -> - %% This can happen if the remote peer has an outdated partition table - ?LOG_WARNING([{event, 
cm_outdated_partition_table}, - {worker, State#state.name}, - {partition_number, Candidate3#mining_candidate.partition_number2}, - {cm_peer, ar_util:format_peer(Peer)}]), + %% This can happen for two reasons: + %% 1. (most common) Remote peer has requested a range we don't have from a + %% partition that we do have. + %% 2. (rare, but possible) Remote peer has an outdated partition table and we + %% don't even have the requested partition. {noreply, State} end. From 9c4bbbc36d0c28a05df29b1ac483cfd26b90d28d Mon Sep 17 00:00:00 2001 From: James Piechota Date: Tue, 14 May 2024 14:17:13 -0400 Subject: [PATCH 3/7] Remove some code duplication in ar_http_iface_client --- apps/arweave/src/ar_http_iface_client.erl | 235 ++++++---------------- 1 file changed, 67 insertions(+), 168 deletions(-) diff --git a/apps/arweave/src/ar_http_iface_client.erl b/apps/arweave/src/ar_http_iface_client.erl index dce125fa6..318f05f9d 100644 --- a/apps/arweave/src/ar_http_iface_client.erl +++ b/apps/arweave/src/ar_http_iface_client.erl @@ -19,7 +19,6 @@ get_pool_cm_jobs/2, post_pool_cm_jobs/2, post_cm_partition_table_to_pool/2]). -include_lib("arweave/include/ar.hrl"). --include_lib("arweave/include/ar_pricing.hrl"). -include_lib("arweave/include/ar_config.hrl"). -include_lib("arweave/include/ar_data_sync.hrl"). -include_lib("arweave/include/ar_data_discovery.hrl"). @@ -556,78 +555,23 @@ get_previous_vdf_session(Peer) -> Reply end. +%% ----------------------------------------------------------------------------- +%% Coordinated Mining and Pool Request +%% ----------------------------------------------------------------------------- + get_cm_partition_table(Peer) -> - {Peer3, Headers, BasePath, IsPeerRequest} = - case Peer of - {pool, URL} -> - {Peer2, Path2} = get_peer_and_path_from_url(URL), - {Peer2, pool_client_headers(), Path2, false}; - _ -> - {Peer, cm_p2p_headers(), "", true} - end, - handle_cm_partition_table_response(ar_http:req(#{ - peer => Peer3, - method => get, - path => BasePath ++ "/coordinated_mining/partition_table", - timeout => 5 * 1000, - connect_timeout => 500, - headers => Headers, - is_peer_request => IsPeerRequest - })). + Req = build_cm_or_pool_request(get, Peer, "/coordinated_mining/partition_table"), + handle_cm_partition_table_response(ar_http:req(Req)). cm_h1_send(Peer, Candidate) -> - {Peer3, Headers, BasePath, IsPeerRequest} = - case Peer of - {pool, URL} -> - {Peer2, Path2} = get_peer_and_path_from_url(URL), - {Peer2, pool_client_headers(), Path2, false}; - _ -> - {Peer, cm_p2p_headers(), "", true} - end, - JSON = - case is_binary(Candidate) of - true -> - Candidate; - false -> - ar_serialize:jsonify(ar_serialize:candidate_to_json_struct(Candidate)) - end, - handle_cm_noop_response(ar_http:req(#{ - peer => Peer3, - method => post, - path => BasePath ++ "/coordinated_mining/h1", - timeout => 5 * 1000, - connect_timeout => 500, - headers => Headers, - body => JSON, - is_peer_request => IsPeerRequest - })). + JSON = ar_serialize:jsonify(ar_serialize:candidate_to_json_struct(Candidate)), + Req = build_cm_or_pool_request(post, Peer, "/coordinated_mining/h1", JSON), + handle_cm_noop_response(ar_http:req(Req)). 
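Note: the consolidated cm_h1_send/2 above (and cm_h2_send/2 below) always re-encodes the candidate record, while the pre-refactor versions also accepted an already-serialized binary and passed it through. If any caller still sends a binary, a small helper would preserve the old behavior. A minimal sketch (encode_candidate/1 is a hypothetical name, not part of this patch):

    encode_candidate(Candidate) when is_binary(Candidate) ->
        %% Already serialized; pass the payload through untouched.
        Candidate;
    encode_candidate(Candidate) ->
        ar_serialize:jsonify(ar_serialize:candidate_to_json_struct(Candidate)).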
cm_h2_send(Peer, Candidate) -> - {Peer3, Headers, BasePath, IsPeerRequest} = - case Peer of - {pool, URL} -> - {Peer2, Path2} = get_peer_and_path_from_url(URL), - {Peer2, pool_client_headers(), Path2, false}; - _ -> - {Peer, cm_p2p_headers(), "", true} - end, - JSON = - case is_binary(Candidate) of - true -> - Candidate; - false -> - ar_serialize:jsonify(ar_serialize:candidate_to_json_struct(Candidate)) - end, - handle_cm_noop_response(ar_http:req(#{ - peer => Peer3, - method => post, - path => BasePath ++ "/coordinated_mining/h2", - timeout => 5 * 1000, - connect_timeout => 500, - headers => Headers, - body => JSON, - is_peer_request => IsPeerRequest - })). + JSON = ar_serialize:jsonify(ar_serialize:candidate_to_json_struct(Candidate)), + Req = build_cm_or_pool_request(post, Peer, "/coordinated_mining/h2", JSON), + handle_cm_noop_response(ar_http:req(Req)). cm_publish_send(Peer, Solution) -> ?LOG_DEBUG([{event, cm_publish_send}, {peer, ar_util:format_peer(Peer)}, @@ -635,35 +579,50 @@ cm_publish_send(Peer, Solution) -> {step_number, Solution#mining_solution.step_number}, {start_interval_number, Solution#mining_solution.start_interval_number}, {seed, ar_util:encode(Solution#mining_solution.seed)}]), - JSON = ar_serialize:solution_to_json_struct(Solution), - handle_cm_noop_response(ar_http:req(#{ - peer => Peer, - method => post, - path => "/coordinated_mining/publish", - timeout => 5 * 1000, - connect_timeout => 500, - headers => cm_p2p_headers(), - body => ar_serialize:jsonify(JSON) - })). + JSON = ar_serialize:jsonify(ar_serialize:solution_to_json_struct(Solution)), + Req = build_cm_or_pool_request(post, Peer, "/coordinated_mining/publish", JSON), + handle_cm_noop_response(ar_http:req(Req)). %% @doc Fetch the jobs from the pool or coordinated mining exit peer. get_jobs(Peer, PrevOutput) -> - {Peer3, Headers, BasePath, IsPeerRequest} = - case Peer of - {pool, URL} -> - {Peer2, Path2} = get_peer_and_path_from_url(URL), - {Peer2, pool_client_headers(), Path2, false}; - _ -> - {Peer, cm_p2p_headers(), "", true} + Req = build_cm_or_pool_request(get, Peer, + "/jobs/" ++ binary_to_list(ar_util:encode(PrevOutput))), + handle_get_jobs_response(ar_http:req(Req)). + +%% @doc Post the partial solution to the pool or coordinated mining exit peer. +post_partial_solution(Peer, Solution) -> + Payload = + case is_binary(Solution) of + true -> + Solution; + false -> + ar_serialize:jsonify(ar_serialize:solution_to_json_struct(Solution)) end, - handle_get_jobs_response(ar_http:req(#{ - peer => Peer3, - method => get, - path => BasePath ++ "/jobs/" ++ binary_to_list(ar_util:encode(PrevOutput)), - timeout => 5 * 1000, - connect_timeout => 1000, - headers => Headers, - is_peer_request => IsPeerRequest + Req = build_cm_or_pool_request(post, Peer, "/partial_solution", Payload), + handle_post_partial_solution_response(ar_http:req(Req#{ + timeout => 20 * 1000, + connect_timeout => 5 * 1000 + })). + +get_pool_cm_jobs(Peer, Jobs) -> + JSON = ar_serialize:jsonify(ar_serialize:pool_cm_jobs_to_json_struct(Jobs)), + Req = build_cm_or_pool_request(post, Peer, "/pool_cm_jobs", JSON), + handle_get_pool_cm_jobs_response(ar_http:req(Req#{ + connect_timeout => 1000 + })). + +post_pool_cm_jobs(Peer, Payload) -> + Req = build_cm_or_pool_request(post, Peer, "/pool_cm_jobs", Payload), + handle_post_pool_cm_jobs_response(ar_http:req(Req#{ + timeout => 10 * 1000, + connect_timeout => 2000 + })). 
+ +post_cm_partition_table_to_pool(Peer, Payload) -> + Req = build_cm_or_pool_request(post, Peer, "/coordinated_mining/partition_table", Payload), + handle_cm_partition_table_response(ar_http:req(Req#{ + timeout => 10 * 1000, + connect_timeout => 2000 })). get_peer_and_path_from_url(URL) -> @@ -681,8 +640,9 @@ get_peer_and_path_from_url(URL) -> end, {Peer, binary_to_list(P)}. -%% @doc Post the partial solution to the pool or coordinated mining exit peer. -post_partial_solution(Peer, Solution) -> +build_cm_or_pool_request(Method, Peer, Path) -> + build_cm_or_pool_request(Method, Peer, Path, <<>>). +build_cm_or_pool_request(Method, Peer, Path, Body) -> {Peer3, Headers, BasePath, IsPeerRequest} = case Peer of {pool, URL} -> @@ -691,83 +651,22 @@ post_partial_solution(Peer, Solution) -> _ -> {Peer, cm_p2p_headers(), "", true} end, - Headers2 = add_header(<<"content-type">>, <<"application/json">>, Headers), - Payload = - case is_binary(Solution) of - true -> - Solution; - false -> - ar_serialize:jsonify(ar_serialize:solution_to_json_struct(Solution)) - end, - handle_post_partial_solution_response(ar_http:req(#{ - peer => Peer3, - method => post, - path => BasePath ++ "/partial_solution/", - timeout => 20 * 1000, - connect_timeout => 5 * 1000, - headers => Headers2, - body => Payload, - is_peer_request => IsPeerRequest - })). - -get_pool_cm_jobs(Peer, Jobs) -> - {Peer3, Headers, BasePath, IsPeerRequest} = - case Peer of - {pool, URL} -> - {Peer2, Path2} = get_peer_and_path_from_url(URL), - {Peer2, pool_client_headers(), Path2, false}; - _ -> - {Peer, cm_p2p_headers(), "", true} - end, - Struct = ar_serialize:pool_cm_jobs_to_json_struct(Jobs), - Payload = ar_serialize:jsonify(Struct), - handle_get_pool_cm_jobs_response(ar_http:req(#{ + Headers2 = case Method of + get -> + Headers; + _ -> + add_header(<<"content-type">>, <<"application/json">>, Headers) + end, + #{ peer => Peer3, - method => post, - path => BasePath ++ "/pool_cm_jobs", + method => Method, + path => BasePath ++ Path, timeout => 5 * 1000, - connect_timeout => 1000, - headers => Headers, - body => Payload, - is_peer_request => IsPeerRequest - })). - -post_pool_cm_jobs(Peer, Payload) -> - {Peer3, Headers, BasePath, IsPeerRequest} = - case Peer of - {pool, URL} -> - {Peer2, Path2} = get_peer_and_path_from_url(URL), - {Peer2, pool_client_headers(), Path2, false}; - _ -> - {Peer, cm_p2p_headers(), "", true} - end, - Headers2 = add_header(<<"content-type">>, <<"application/json">>, Headers), - handle_post_pool_cm_jobs_response(ar_http:req(#{ - peer => Peer3, - method => post, - path => BasePath ++ "/pool_cm_jobs", - body => Payload, - timeout => 10 * 1000, - connect_timeout => 2000, + connect_timeout => 500, headers => Headers2, + body => Body, is_peer_request => IsPeerRequest - })). - -post_cm_partition_table_to_pool(Peer, Payload) -> - {pool, URL} = Peer, - {Peer2, BasePath} = get_peer_and_path_from_url(URL), - Headers = pool_client_headers(), - Headers2 = add_header(<<"content-type">>, <<"application/json">>, Headers), - handle_cm_partition_table_response(ar_http:req(#{ - peer => Peer2, - method => post, - path => BasePath ++ "/coordinated_mining/partition_table", - body => Payload, - timeout => 10 * 1000, - connect_timeout => 2000, - headers => Headers2, - is_peer_request => false - })). + }. 
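build_cm_or_pool_request/4 returns a plain request map with the shared defaults (5 s timeout, 500 ms connect timeout), so call sites that need different limits override just those keys with map-update syntax before dispatching, exactly as post_partial_solution/2 and post_pool_cm_jobs/2 do above. A minimal sketch of the pattern (post_with_long_timeouts/3 is a hypothetical example, not part of this patch):

    post_with_long_timeouts(Peer, Path, Payload) ->
        Req = build_cm_or_pool_request(post, Peer, Path, Payload),
        %% Req#{ Key => Value } replaces only the keys this call site cares
        %% about; peer, headers and base path keep the shared defaults.
        handle_cm_noop_response(ar_http:req(Req#{ timeout => 20 * 1000,
                connect_timeout => 5 * 1000 })).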
handle_get_pool_cm_jobs_response({ok, {{<<"200">>, _}, _, Body, _, _}}) -> case catch ar_serialize:json_map_to_pool_cm_jobs( From 958befbba5cd118fdcf6362a800adcec1038b403 Mon Sep 17 00:00:00 2001 From: James Piechota Date: Tue, 14 May 2024 17:34:16 -0400 Subject: [PATCH 4/7] Don't clear the chunk_cache when adding a new session. The chunk cache will be initialized the first time it is written to. And this clearing created a race condition when computing H2 for peers - sometimes the cached H1 hashes would be cleared before the chunk2s could be read --- apps/arweave/src/ar_http_iface_middleware.erl | 2 - apps/arweave/src/ar_mining_io.erl | 7 +++- apps/arweave/src/ar_mining_stats.erl | 15 -------- apps/arweave/src/ar_mining_worker.erl | 38 +++++++++++++------ 4 files changed, 32 insertions(+), 30 deletions(-) diff --git a/apps/arweave/src/ar_http_iface_middleware.erl b/apps/arweave/src/ar_http_iface_middleware.erl index 88ce61fda..b9bbf3d31 100644 --- a/apps/arweave/src/ar_http_iface_middleware.erl +++ b/apps/arweave/src/ar_http_iface_middleware.erl @@ -7,10 +7,8 @@ -include_lib("arweave/include/ar.hrl"). -include_lib("arweave/include/ar_config.hrl"). -include_lib("arweave/include/ar_mining.hrl"). --include_lib("arweave/include/ar_pricing.hrl"). -include_lib("arweave/include/ar_data_sync.hrl"). -include_lib("arweave/include/ar_data_discovery.hrl"). --include_lib("arweave/include/ar_consensus.hrl"). -include_lib("arweave/include/ar_pool.hrl"). diff --git a/apps/arweave/src/ar_mining_io.erl b/apps/arweave/src/ar_mining_io.erl index 2e884824a..72690f416 100644 --- a/apps/arweave/src/ar_mining_io.erl +++ b/apps/arweave/src/ar_mining_io.erl @@ -297,7 +297,12 @@ cached_read_range(WhichChunk, Candidate, RangeStart, StoreID, Cache) -> {_CachedTime, ChunkOffsets} -> ?LOG_DEBUG([{event, mining_debug_read_cached_recall_range}, {pid, self()}, {range_start, RangeStart}, - {store_id, StoreID}]), + {store_id, StoreID}, + {partition_number, Candidate#mining_candidate.partition_number}, + {partition_number2, Candidate#mining_candidate.partition_number2}, + {cm_peer, ar_util:format_peer(Candidate#mining_candidate.cm_lead_peer)}, + {cache_ref, Candidate#mining_candidate.cache_ref}, + {session, ar_nonce_limiter:encode_session_key(Candidate#mining_candidate.session_key)}]), {ChunkOffsets, Cache} end. diff --git a/apps/arweave/src/ar_mining_stats.erl b/apps/arweave/src/ar_mining_stats.erl index 4626ba314..94950895a 100644 --- a/apps/arweave/src/ar_mining_stats.erl +++ b/apps/arweave/src/ar_mining_stats.erl @@ -12,7 +12,6 @@ -include_lib("arweave/include/ar.hrl"). -include_lib("arweave/include/ar_config.hrl"). -include_lib("arweave/include/ar_consensus.hrl"). --include_lib("arweave/include/ar_mining.hrl"). -include_lib("eunit/include/eunit.hrl"). -record(state, { @@ -328,20 +327,6 @@ get_overall_total(PartitionPeer, Stat, TotalCurrent) -> Counts = [Count || [Count] <- Matches], lists:sum(Counts). -get_overall_average(PartitionPeer, Stat, TotalCurrent) -> - Pattern = {{PartitionPeer, '_', Stat, TotalCurrent}, '_', '$1', '$2'}, - Matches = ets:match(?MODULE, Pattern), - Counts = [Count || [_, Count] <- Matches], - AllSamples = [Samples || [Samples, _] <- Matches], - TotalCount = lists:sum(Counts), - TotalSamples = lists:sum(AllSamples), - case TotalSamples > 0 of - true -> - TotalCount / TotalSamples; - false -> - 0 - end. 
- get_partition_data_size(PartitionNumber) -> {ok, Config} = application:get_env(arweave, config), MiningAddress = Config#config.mining_addr, diff --git a/apps/arweave/src/ar_mining_worker.erl b/apps/arweave/src/ar_mining_worker.erl index 5f7fa7fb0..2a1f6b5c0 100644 --- a/apps/arweave/src/ar_mining_worker.erl +++ b/apps/arweave/src/ar_mining_worker.erl @@ -285,7 +285,8 @@ handle_task({chunk2, Candidate}, State) -> {partition_number2, Candidate#mining_candidate.partition_number2}, {cm_peer, ar_util:format_peer(Candidate#mining_candidate.cm_lead_peer)}, {cache_ref, Candidate#mining_candidate.cache_ref}, - {nonce, Candidate#mining_candidate.nonce}]) + {nonce, Candidate#mining_candidate.nonce}, + {session, ar_nonce_limiter:encode_session_key(SessionKey)}]) end, {noreply, State2} end; @@ -497,9 +498,9 @@ handle_task({compute_h2_for_peer, Candidate}, State) -> %% later if we find that this causes unacceptable memory bloat. RecallRangeChunks = nonce_max() + 1, State2 = update_chunk_cache_size(RecallRangeChunks, SessionKey, State), - %% First flag all nonces in the range as do_not_cache, then cache the specific nonces - %% inclueded in the H1 list. This will make sure we don't cache the chunk2s that are - %% read for the missing nonces. + %% First flag all nonces in the range as do_not_cache, then cache the specific + %% nonces included in the H1 list. This will make sure we don't cache the chunk2s + %% that are read for the missing nonces. State3 = do_not_cache(Candidate3, State2), {noreply, cache_h1_list(Candidate3, H1List, State3)}; false -> @@ -548,11 +549,24 @@ maybe_warn_about_lag(Q, Name) -> false -> case gb_sets:take_smallest(Q) of {{_Priority, _ID, {compute_h0, _}}, Q3} -> + %% Since we sample the queue asynchronously, we expect there to regularly + %% be a queue of length 1 (i.e. a task may have just been added to the + %% queue when we run this check). + %% + %% To further reduce log spam, we'll only warn if the queue is greater + %% than 2. We really only care if a queue is consistently long or if + %% it's getting longer. Temporary blips are fine. We may incrase + %% the threshold in the future. N = count_h0_tasks(Q3) + 1, - ?LOG_WARNING([ - {event, mining_worker_lags_behind_the_nonce_limiter}, - {worker, Name}, - {step_count, N}]); + case N > 2 of + true -> + ?LOG_WARNING([ + {event, mining_worker_lags_behind_the_nonce_limiter}, + {worker, Name}, + {step_count, N}]); + false -> + ok + end; _ -> ok end @@ -601,16 +615,16 @@ update_sessions(ActiveSessions, State) -> State3 = add_sessions(AddedSessions, State2), State3#state{ active_sessions = ActiveSessions }. +%% We no longer have to do anything when adding a session as the chunk cache will be +%% automatically created the first time it is updated. This function now serves to log +%% the sessions being added. add_sessions([], State) -> State; add_sessions([SessionKey | AddedSessions], State) -> ?LOG_DEBUG([{event, mining_debug_add_session}, {worker, State#state.name}, {partition, State#state.partition_number}, {session_key, ar_nonce_limiter:encode_session_key(SessionKey)}]), - State2 = State#state{ - chunk_cache = maps:put(SessionKey, #{}, State#state.chunk_cache) - }, - add_sessions(AddedSessions, State2). + add_sessions(AddedSessions, State). 
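The comment above leans on the cache update path creating the per-session map on first write. A minimal sketch of that write-time initialization pattern (update_session_cache/4 is a hypothetical helper; the worker's real cache layout may differ):

    update_session_cache(SessionKey, Key, Value, ChunkCache) ->
        %% maps:get/3 with a #{} default creates the per-session cache on the
        %% first write, so add_sessions/2 no longer needs to pre-populate it.
        SessionCache = maps:get(SessionKey, ChunkCache, #{}),
        maps:put(SessionKey, maps:put(Key, Value, SessionCache), ChunkCache).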
remove_sessions([], State) -> State; From 58a892f7e0d7f7e8cac6b6c1facdc13e4907add3 Mon Sep 17 00:00:00 2001 From: James Piechota Date: Wed, 15 May 2024 14:19:07 -0400 Subject: [PATCH 5/7] Remove the CM batch metrics --- apps/arweave/src/ar_metrics.erl | 12 ------ apps/arweave/src/ar_mining_stats.erl | 56 +++++----------------------- 2 files changed, 10 insertions(+), 58 deletions(-) diff --git a/apps/arweave/src/ar_metrics.erl b/apps/arweave/src/ar_metrics.erl index e2aeb058f..ace3d8e8a 100644 --- a/apps/arweave/src/ar_metrics.erl +++ b/apps/arweave/src/ar_metrics.erl @@ -2,8 +2,6 @@ -export([register/0, get_status_class/1]). --include_lib("arweave/include/ar_pricing.hrl"). - %%%=================================================================== %%% Public interface. %%%=================================================================== @@ -252,16 +250,6 @@ register() -> "data synced. The partition label breaks the mining rate down by partition. " "The overall mining rate is inidcated by 'total'."} ]), - prometheus_gauge:new([ - {name, cm_h1_batch}, - {labels, [peer, direction]}, - {help, "The number of coordinated mining batches of H1 hashes processed per second. " - "When direction is 'to' that indicates the number of batches sent over HTTP " - "to 'peer'. When direction is 'from' that indicates the number of batches " - "that get processed (read and hashed) from 'peer' - note: because a peer may " - "aggregate incoming batches before processing them the 'to' and 'from' " - "numbers may not match."} - ]), prometheus_gauge:new([ {name, cm_h1_rate}, {labels, [peer, direction]}, diff --git a/apps/arweave/src/ar_mining_stats.erl b/apps/arweave/src/ar_mining_stats.erl index 94950895a..c480e36e0 100644 --- a/apps/arweave/src/ar_mining_stats.erl +++ b/apps/arweave/src/ar_mining_stats.erl @@ -32,8 +32,6 @@ current_read_mibps = 0.0, average_hash_hps = 0.0, current_hash_hps = 0.0, - average_batches_to_peer = 0.0, - average_batches_from_peer = 0.0, average_h1_to_peer_hps = 0.0, current_h1_to_peer_hps = 0.0, average_h1_from_peer_hps = 0.0, @@ -58,8 +56,6 @@ -record(peer_report, { peer, - average_batches_to_peer, - average_batches_from_peer, average_h1_to_peer_hps, current_h1_to_peer_hps, average_h1_from_peer_hps, @@ -467,18 +463,12 @@ generate_peer_report(Peer, Report) -> #report{ now = Now, peers = Peers, - average_batches_to_peer = AverageBatchesToPeer, - average_batches_from_peer = AverageBatchesFromPeer, average_h1_to_peer_hps = AverageH1ToPeer, current_h1_to_peer_hps = CurrentH1ToPeer, average_h1_from_peer_hps = AverageH1FromPeer, current_h1_from_peer_hps = CurrentH1FromPeer } = Report, PeerReport = #peer_report{ peer = Peer, - average_batches_to_peer = - get_average_samples_by_time({peer, Peer, h1_to_peer, total}, Now), - average_batches_from_peer = - get_average_samples_by_time({peer, Peer, h1_from_peer, total}, Now), average_h1_to_peer_hps = get_average_count_by_time({peer, Peer, h1_to_peer, total}, Now), current_h1_to_peer_hps = @@ -496,10 +486,6 @@ generate_peer_report(Peer, Report) -> Report#report{ peers = Peers ++ [PeerReport], - average_batches_to_peer = - AverageBatchesToPeer + PeerReport#peer_report.average_batches_to_peer, - average_batches_from_peer = - AverageBatchesFromPeer + PeerReport#peer_report.average_batches_from_peer, average_h1_to_peer_hps = AverageH1ToPeer + PeerReport#peer_report.average_h1_to_peer_hps, current_h1_to_peer_hps = @@ -532,8 +518,6 @@ set_metrics(Report) -> prometheus_gauge:set(mining_rate, [hash, total], Report#report.current_hash_hps), 
prometheus_gauge:set(mining_rate, [ideal_read, total], Report#report.optimal_overall_read_mibps), prometheus_gauge:set(mining_rate, [ideal_hash, total], Report#report.optimal_overall_hash_hps), - prometheus_gauge:set(cm_h1_batch, [total, to], Report#report.average_batches_to_peer), - prometheus_gauge:set(cm_h1_batch, [total, from], Report#report.average_batches_from_peer), prometheus_gauge:set(cm_h1_rate, [total, to], Report#report.current_h1_to_peer_hps), prometheus_gauge:set(cm_h1_rate, [total, from], Report#report.current_h1_from_peer_hps), prometheus_gauge:set(cm_h2_count, [total, to], Report#report.total_h2_to_peer), @@ -559,10 +543,6 @@ set_peer_metrics([]) -> ok; set_peer_metrics([PeerReport | PeerReports]) -> Peer = ar_util:format_peer(PeerReport#peer_report.peer), - prometheus_gauge:set(cm_h1_batch, [Peer, to], - PeerReport#peer_report.average_batches_to_peer), - prometheus_gauge:set(cm_h1_batch, [Peer, from], - PeerReport#peer_report.average_batches_from_peer), prometheus_gauge:set(cm_h1_rate, [Peer, to], PeerReport#peer_report.current_h1_to_peer_hps), prometheus_gauge:set(cm_h1_rate, [Peer, from], @@ -578,8 +558,6 @@ clear_metrics() -> prometheus_gauge:set(mining_rate, [read, total], 0), prometheus_gauge:set(mining_rate, [hash, total], 0), prometheus_gauge:set(mining_rate, [ideal, total], 0), - prometheus_gauge:set(cm_h1_batch, [total, to], 0), - prometheus_gauge:set(cm_h1_batch, [total, from], 0), prometheus_gauge:set(cm_h1_rate, [total, to], 0), prometheus_gauge:set(cm_h1_rate, [total, from], 0), prometheus_gauge:set(cm_h2_count, [total, to], 0), @@ -600,8 +578,6 @@ clear_peer_metrics([]) -> ok; clear_peer_metrics([PeerReport | PeerReports]) -> Peer = ar_util:format_peer(PeerReport#peer_report.peer), - prometheus_gauge:set(cm_h1_batch, [Peer, to], 0), - prometheus_gauge:set(cm_h1_batch, [Peer, from], 0), prometheus_gauge:set(cm_h1_rate, [Peer, to], 0), prometheus_gauge:set(cm_h1_rate, [Peer, from], 0), prometheus_gauge:set(cm_h2_count, [Peer, to], 0), @@ -692,19 +668,17 @@ format_peer_report(Report) -> Header = "\n" "Coordinated mining cluster stats:\n" - "+----------------------+-----------+--------------+--------------+----------+-------------+-------------+--------+--------+\n" - "| Peer | Out Batch | H1 Out (Cur) | H1 Out (Avg) | In Batch | H1 In (Cur) | H1 In (Avg) | H2 Out | H2 In |\n" - "+----------------------+-----------+--------------+--------------+----------+-------------+-------------+--------+--------+\n", + "+----------------------+--------------+--------------+-------------+-------------+--------+--------+\n" + "| Peer | H1 Out (Cur) | H1 Out (Avg) | H1 In (Cur) | H1 In (Avg) | H2 Out | H2 In |\n" + "+----------------------+--------------+--------------+-------------+-------------+--------+--------+\n", TotalRow = format_peer_total_row(Report), PartitionRows = format_peer_rows(Report#report.peers), Footer = - "+----------------------+-----------+--------------+--------------+----------+-------------+-------------+--------+--------+\n", + "+----------------------+--------------+--------------+-------------+-------------+--------+--------+\n", io_lib:format("~s~s~s~s", [Header, TotalRow, PartitionRows, Footer]). 
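For reference, the ~NB directives in the row formats below right-align a base-10 integer in a field N characters wide, which is what keeps the narrower table's data rows lined up with the header above. A quick shell sketch (value illustrative):

    1> lists:flatten(io_lib:format("| ~8B h/s |", [1234])).
    "|     1234 h/s |"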
format_peer_total_row(Report) -> #report{ - average_batches_to_peer = AverageBatchTo, - average_batches_from_peer = AverageBatchFrom, average_h1_to_peer_hps = AverageH1To, current_h1_to_peer_hps = CurrentH1To, average_h1_from_peer_hps = AverageH1From, @@ -712,10 +686,10 @@ format_peer_total_row(Report) -> total_h2_to_peer = TotalH2To, total_h2_from_peer = TotalH2From } = Report, io_lib:format( - "| All | ~9B | ~8B h/s | ~8B h/s | ~8B | ~7B h/s | ~7B h/s | ~6B | ~6B |\n", + "| All | ~8B h/s | ~8B h/s | ~7B h/s | ~7B h/s | ~6B | ~6B |\n", [ - floor(AverageBatchTo), floor(CurrentH1To), floor(AverageH1To), - floor(AverageBatchFrom), floor(CurrentH1From), floor(AverageH1From), + floor(CurrentH1To), floor(AverageH1To), + floor(CurrentH1From), floor(AverageH1From), TotalH2To, TotalH2From ]). @@ -728,8 +702,6 @@ format_peer_rows([PeerReport | PeerReports]) -> format_peer_row(PeerReport) -> #peer_report{ peer = Peer, - average_batches_to_peer = AverageBatchTo, - average_batches_from_peer = AverageBatchFrom, average_h1_to_peer_hps = AverageH1To, current_h1_to_peer_hps = CurrentH1To, average_h1_from_peer_hps = AverageH1From, @@ -737,11 +709,11 @@ format_peer_row(PeerReport) -> total_h2_to_peer = TotalH2To, total_h2_from_peer = TotalH2From } = PeerReport, io_lib:format( - "| ~20s | ~9B | ~8B h/s | ~8B h/s | ~8B | ~7B h/s | ~7B h/s | ~6B | ~6B |\n", + "| ~20s | ~8B h/s | ~8B h/s | ~7B h/s | ~7B h/s | ~6B | ~6B |\n", [ ar_util:format_peer(Peer), - floor(AverageBatchTo), floor(CurrentH1To), floor(AverageH1To), - floor(AverageBatchFrom), floor(CurrentH1From), floor(AverageH1From), + floor(CurrentH1To), floor(AverageH1To), + floor(CurrentH1From), floor(AverageH1From), TotalH2To, TotalH2From ]). @@ -1342,8 +1314,6 @@ test_report(PoA1Multiplier) -> current_read_mibps = 1.25, average_hash_hps = TotalHash, current_hash_hps = TotalHash, - average_batches_to_peer = 5.0, - average_batches_from_peer = 5.0, average_h1_to_peer_hps = 50.0, current_h1_to_peer_hps = 50.0, average_h1_from_peer_hps = 50.0, @@ -1385,8 +1355,6 @@ test_report(PoA1Multiplier) -> peers = [ #peer_report{ peer = Peer3, - average_batches_to_peer = 0.0, - average_batches_from_peer = 0.0, average_h1_to_peer_hps = 0.0, current_h1_to_peer_hps = 0.0, average_h1_from_peer_hps = 0.0, @@ -1396,8 +1364,6 @@ test_report(PoA1Multiplier) -> }, #peer_report{ peer = Peer2, - average_batches_to_peer = 2.0, - average_batches_from_peer = 3.0, average_h1_to_peer_hps = 20.0, current_h1_to_peer_hps = 20.0, average_h1_from_peer_hps = 30.0, @@ -1407,8 +1373,6 @@ test_report(PoA1Multiplier) -> }, #peer_report{ peer = Peer1, - average_batches_to_peer = 3.0, - average_batches_from_peer = 2.0, average_h1_to_peer_hps = 30.0, current_h1_to_peer_hps = 30.0, average_h1_from_peer_hps = 20.0, From d65d53ae766ba2238cf1430de00ec14b71c9ce67 Mon Sep 17 00:00:00 2001 From: James Piechota Date: Wed, 15 May 2024 21:05:55 -0400 Subject: [PATCH 6/7] Store the full storage module tuple as long as possible Only convert to store id at the end --- apps/arweave/src/ar_http_iface_client.erl | 2 +- apps/arweave/src/ar_mining_io.erl | 74 +++++++++++--------- apps/arweave/src/ar_mining_stats.erl | 84 +++++++++-------------- apps/arweave/src/ar_mining_sup.erl | 1 - apps/arweave/src/ar_mining_worker.erl | 2 +- apps/arweave/src/ar_storage_module.erl | 1 - apps/arweave/src/ar_sync_record.erl | 10 ++- 7 files changed, 85 insertions(+), 89 deletions(-) diff --git a/apps/arweave/src/ar_http_iface_client.erl b/apps/arweave/src/ar_http_iface_client.erl index 318f05f9d..8bdee059c 100644 --- 
a/apps/arweave/src/ar_http_iface_client.erl
+++ b/apps/arweave/src/ar_http_iface_client.erl
@@ -556,7 +556,7 @@ get_previous_vdf_session(Peer) ->
 	end.
 
 %% -----------------------------------------------------------------------------
-%% Coordinated Mining and Pool Request
+%% Coordinated Mining and Pool Requests
 %% -----------------------------------------------------------------------------
 
 get_cm_partition_table(Peer) ->
diff --git a/apps/arweave/src/ar_mining_io.erl b/apps/arweave/src/ar_mining_io.erl
index 72690f416..4c99343c6 100644
--- a/apps/arweave/src/ar_mining_io.erl
+++ b/apps/arweave/src/ar_mining_io.erl
@@ -2,7 +2,7 @@
 
 -behaviour(gen_server).
 
--export([start_link/0, set_largest_seen_upper_bound/1,
+-export([start_link/0, set_largest_seen_upper_bound/1, get_storage_modules/0,
 	get_partitions/0, get_partitions/1, read_recall_range/4, garbage_collect/0]).
 
 -export([init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2]).
@@ -39,13 +39,24 @@ read_recall_range(WhichChunk, Worker, Candidate, RecallRangeStart) ->
 	gen_server:call(?MODULE, {read_recall_range, WhichChunk, Worker, Candidate,
 			RecallRangeStart}, 60000).
 
+get_storage_modules() ->
+	{ok, Config} = application:get_env(arweave, config),
+	MiningAddress = Config#config.mining_addr,
+
+	lists:filter(
+		fun	({_BucketSize, _Bucket, {spora_2_6, Addr}}) -> Addr == MiningAddress;
+			(_) -> false
+		end,
+		Config#config.storage_modules
+	).
+
 get_partitions(PartitionUpperBound) when PartitionUpperBound =< 0 ->
 	[];
 get_partitions(PartitionUpperBound) ->
 	Max = ar_node:get_max_partition_number(PartitionUpperBound),
 	lists:sort(sets:to_list(
 		lists:foldl(
-			fun({Partition, MiningAddress, _StoreID}, Acc) ->
+			fun({Partition, MiningAddress, _StorageModule}, Acc) ->
 				case Partition > Max of
 					true ->
 						Acc;
@@ -69,8 +80,8 @@ init([]) ->
 	process_flag(trap_exit, true),
 	State =
 		lists:foldl(
-			fun	({PartitionNumber, MiningAddress, StoreID}, Acc) ->
-				start_io_thread(PartitionNumber, MiningAddress, StoreID, Acc)
+			fun	({PartitionNumber, MiningAddress, StorageModule}, Acc) ->
+				start_io_thread(PartitionNumber, MiningAddress, StorageModule, Acc)
 			end,
 			#state{},
 			get_io_channels()
@@ -157,26 +168,21 @@ terminate(_Reason, _State) ->
 %%% Private functions.
 %%%===================================================================
 
-%% @doc Returns tuples {PartitionNumber, MiningAddress, StoreID) covering all attached storage
-%% modules (excluding the "default" storage module). The assumption is that each IO channel
-%% represents a distinct 200MiB/s read channel to which we will (later) assign an IO thread.
+%% @doc Returns tuples {PartitionNumber, MiningAddress, StorageModule} covering all attached
+%% storage modules (excluding the "default" storage module). The assumption is that each IO
+%% channel represents a distinct 200MiB/s read channel to which we will (later) assign an IO
+%% thread.
 get_io_channels() ->
-	{ok, Config} = application:get_env(arweave, config),
-	MiningAddress = Config#config.mining_addr,
-
-	%% First get the start/end ranges for all storage modules configured for the mining address.
+	%% First get the start/end ranges for all storage modules.
StorageModules = lists:foldl( - fun ({BucketSize, Bucket, {spora_2_6, Addr}} = M, Acc) when Addr == MiningAddress -> + fun ({BucketSize, Bucket, {spora_2_6, MiningAddress}} = M, Acc) -> Start = Bucket * BucketSize, End = (Bucket + 1) * BucketSize, - StoreID = ar_storage_module:id(M), - [{Start, End, MiningAddress, StoreID} | Acc]; - (_Module, Acc) -> - Acc + [{Start, End, MiningAddress, M} | Acc] end, [], - Config#config.storage_modules + get_storage_modules() ), %% And then map those storage modules to partitions. @@ -184,21 +190,23 @@ get_io_channels() -> get_io_channels([], Channels) -> Channels; -get_io_channels([{Start, End, _MiningAddress, _StoreID} | StorageModules], Channels) +get_io_channels([{Start, End, _MiningAddress, _StorageModule} | StorageModules], Channels) when Start >= End -> get_io_channels(StorageModules, Channels); -get_io_channels([{Start, End, MiningAddress, StoreID} | StorageModules], Channels) -> +get_io_channels([{Start, End, MiningAddress, StorageModule} | StorageModules], Channels) -> PartitionNumber = ar_node:get_partition_number(Start), - Channels2 = [{PartitionNumber, MiningAddress, StoreID} | Channels], - StorageModules2 = [{Start + ?PARTITION_SIZE, End, MiningAddress, StoreID} | StorageModules], + Channels2 = [{PartitionNumber, MiningAddress, StorageModule} | Channels], + StorageModules2 = [{Start + ?PARTITION_SIZE, End, MiningAddress, StorageModule} | StorageModules], get_io_channels(StorageModules2, Channels2). -start_io_thread(PartitionNumber, MiningAddress, StoreID, #state{ io_threads = Threads } = State) - when is_map_key({PartitionNumber, MiningAddress, StoreID}, Threads) -> +start_io_thread(PartitionNumber, MiningAddress, StorageModule, + #state{ io_threads = Threads } = State) + when is_map_key({PartitionNumber, MiningAddress, StorageModule}, Threads) -> State; -start_io_thread(PartitionNumber, MiningAddress, StoreID, +start_io_thread(PartitionNumber, MiningAddress, StorageModule, #state{ io_threads = Threads, io_thread_monitor_refs = Refs } = State) -> Now = os:system_time(millisecond), + StoreID = ar_storage_module:id(StorageModule), Thread = spawn( fun() -> @@ -208,12 +216,12 @@ start_io_thread(PartitionNumber, MiningAddress, StoreID, _ -> ar_chunk_storage:open_files(StoreID) end, - io_thread(PartitionNumber, MiningAddress, StoreID, #{}, Now) + io_thread(PartitionNumber, MiningAddress, StorageModule, #{}, Now) end ), Ref = monitor(process, Thread), - Threads2 = maps:put({PartitionNumber, MiningAddress, StoreID}, Thread, Threads), - Refs2 = maps:put(Ref, {PartitionNumber, MiningAddress, StoreID}, Refs), + Threads2 = maps:put({PartitionNumber, MiningAddress, StorageModule}, Thread, Threads), + Refs2 = maps:put(Ref, {PartitionNumber, MiningAddress, StorageModule}, Refs), ?LOG_DEBUG([{event, started_io_mining_thread}, {partition_number, PartitionNumber}, {mining_addr, ar_util:safe_encode(MiningAddress)}, {store_id, StoreID}]), State#state{ io_threads = Threads2, io_thread_monitor_refs = Refs2 }. 
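With this change the io-thread keys carry the full storage module tuple, and the store ID is derived only where a storage API call actually needs it. A sketch of the shapes involved (values illustrative; the tuple is the {BucketSize, Bucket, Packing} form from config.storage_modules):

    %% Sketch, assuming MiningAddress and PartitionNumber are bound:
    StorageModule = {?PARTITION_SIZE, 4, {spora_2_6, MiningAddress}},
    ThreadKey = {PartitionNumber, MiningAddress, StorageModule},
    %% Derived lazily, only at the point of an actual chunk-storage call:
    StoreID = ar_storage_module:id(StorageModule).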
@@ -221,20 +229,21 @@ start_io_thread(PartitionNumber, MiningAddress, StoreID, handle_io_thread_down(Ref, Reason, #state{ io_threads = Threads, io_thread_monitor_refs = Refs } = State) -> ?LOG_WARNING([{event, mining_io_thread_down}, {reason, io_lib:format("~p", [Reason])}]), - ThreadID = {PartitionNumber, MiningAddress, StoreID} = maps:get(Ref, Refs), + ThreadID = {PartitionNumber, MiningAddress, StorageModule} = maps:get(Ref, Refs), Refs2 = maps:remove(Ref, Refs), Threads2 = maps:remove(ThreadID, Threads), - start_io_thread(PartitionNumber, MiningAddress, StoreID, + start_io_thread(PartitionNumber, MiningAddress, StorageModule, State#state{ io_threads = Threads2, io_thread_monitor_refs = Refs2 }). -io_thread(PartitionNumber, MiningAddress, StoreID, Cache, LastClearTime) -> +io_thread(PartitionNumber, MiningAddress, StorageModule, Cache, LastClearTime) -> receive {WhichChunk, {Worker, Candidate, RecallRangeStart}} -> + StoreID = ar_storage_module:id(StorageModule), {ChunkOffsets, Cache2} = get_chunks(WhichChunk, Candidate, RecallRangeStart, StoreID, Cache), send_chunks(WhichChunk, Worker, Candidate, RecallRangeStart, ChunkOffsets), {Cache3, LastClearTime2} = maybe_clear_cached_chunks(Cache2, LastClearTime), - io_thread(PartitionNumber, MiningAddress, StoreID, Cache3, LastClearTime2) + io_thread(PartitionNumber, MiningAddress, StorageModule, Cache3, LastClearTime2) end. get_packed_intervals(Start, End, MiningAddress, "default", Intervals) -> @@ -408,7 +417,8 @@ find_thread2(PartitionNumber, MiningAddress, Iterator) -> end. find_thread3([Key | Keys], RangeEnd, RangeStart, Max, MaxKey) -> - {_PartitionNumber, _MiningAddress, StoreID} = Key, + {_PartitionNumber, _MiningAddress, StorageModule} = Key, + StoreID = ar_storage_module:id(StorageModule), I = ar_sync_record:get_intersection_size(RangeEnd, RangeStart, ar_chunk_storage, StoreID), case I > Max of true -> diff --git a/apps/arweave/src/ar_mining_stats.erl b/apps/arweave/src/ar_mining_stats.erl index c480e36e0..ab26b67a3 100644 --- a/apps/arweave/src/ar_mining_stats.erl +++ b/apps/arweave/src/ar_mining_stats.erl @@ -2,7 +2,7 @@ -behaviour(gen_server). -export([start_link/0, start_performance_reports/0, pause_performance_reports/1, mining_paused/0, - set_total_data_size/1, set_storage_module_data_size/6, + set_total_data_size/1, set_storage_module_data_size/3, vdf_computed/0, raw_read_rate/2, chunk_read/1, h1_computed/1, h2_computed/1, h1_solution/0, h2_solution/0, block_found/0, h1_sent_to_peer/2, h1_received_from_peer/2, h2_sent_to_peer/1, h2_received_from_peer/1]). @@ -157,16 +157,18 @@ set_total_data_size(DataSize) -> {type, Type}, {reason, Reason}, {data_size, DataSize}]) end. 
-set_storage_module_data_size( - StoreID, Packing, PartitionNumber, StorageModuleSize, StorageModuleIndex, DataSize) -> +set_storage_module_data_size(PartitionNumber, StorageModule, DataSize) -> + {BucketSize, Bucket, Packing} = StorageModule, + StoreID = ar_storage_module:id(StorageModule), StoreLabel = ar_storage_module:label_by_id(StoreID), PackingLabel = ar_storage_module:packing_label(Packing), try prometheus_gauge:set(v2_index_data_size_by_packing, - [StoreLabel, PackingLabel, PartitionNumber, StorageModuleSize, StorageModuleIndex], + [StoreLabel, PackingLabel, PartitionNumber, BucketSize, Bucket], DataSize), ets:insert(?MODULE, { - {partition, PartitionNumber, storage_module, StoreID, packing, Packing}, DataSize}) + {partition, PartitionNumber, storage_module, StorageModule}, + DataSize}) catch error:badarg -> ?LOG_WARNING([{event, set_storage_module_data_size_failed}, @@ -174,24 +176,24 @@ set_storage_module_data_size( {store_id, StoreID}, {store_label, StoreLabel}, {packing, ar_chunk_storage:encode_packing(Packing)}, {packing_label, PackingLabel}, - {partition_number, PartitionNumber}, {storage_module_size, StorageModuleSize}, - {storage_module_index, StorageModuleIndex}, {data_size, DataSize}]); + {partition_number, PartitionNumber}, {storage_module_size, BucketSize}, + {storage_module_index, Bucket}, {data_size, DataSize}]); error:{unknown_metric,default,v2_index_data_size_by_packing} -> ?LOG_WARNING([{event, set_storage_module_data_size_failed}, {reason, prometheus_not_started}, {store_id, StoreID}, {store_label, StoreLabel}, {packing, ar_chunk_storage:encode_packing(Packing)}, {packing_label, PackingLabel}, - {partition_number, PartitionNumber}, {storage_module_size, StorageModuleSize}, - {storage_module_index, StorageModuleIndex}, {data_size, DataSize}]); + {partition_number, PartitionNumber}, {storage_module_size, BucketSize}, + {storage_module_index, Bucket}, {data_size, DataSize}]); Type:Reason -> ?LOG_ERROR([{event, set_storage_module_data_size_failed}, {type, Type}, {reason, Reason}, {store_id, StoreID}, {store_label, StoreLabel}, {packing, ar_chunk_storage:encode_packing(Packing)}, {packing_label, PackingLabel}, - {partition_number, PartitionNumber}, {storage_module_size, StorageModuleSize}, - {storage_module_index, StorageModuleIndex}, {data_size, DataSize} ]) + {partition_number, PartitionNumber}, {storage_module_size, BucketSize}, + {storage_module_index, Bucket}, {data_size, DataSize} ]) end. 
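A usage sketch of the new three-argument form (values illustrative; the updated test calls further down pass the same shape: partition number, full module tuple, synced byte count):

    StorageModule = {?PARTITION_SIZE, 4, {spora_2_6, MiningAddress}},
    ar_mining_stats:set_storage_module_data_size(1, StorageModule, 123456789).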
mining_paused() -> @@ -327,7 +329,7 @@ get_partition_data_size(PartitionNumber) -> {ok, Config} = application:get_env(arweave, config), MiningAddress = Config#config.mining_addr, Pattern = { - {partition, PartitionNumber, storage_module, '_', packing, {spora_2_6, MiningAddress}}, + {partition, PartitionNumber, storage_module, {'_', '_', {spora_2_6, MiningAddress}}}, '$1' }, Sizes = [Size || [Size] <- ets:match(?MODULE, Pattern)], @@ -931,57 +933,42 @@ test_data_size_stats() -> PackingAddress = <<"PACKING">>, ar_mining_stats:set_storage_module_data_size( - ar_storage_module:id({floor(0.1 * ?PARTITION_SIZE), 10, unpacked}), - unpacked, 1, floor(0.1 * ?PARTITION_SIZE), 10, 101), + 1, {floor(0.1 * ?PARTITION_SIZE), 10, unpacked}, 101), ar_mining_stats:set_storage_module_data_size( - ar_storage_module:id({floor(0.1 * ?PARTITION_SIZE), 10, {spora_2_6, MiningAddress}}), - {spora_2_6, MiningAddress}, 1, floor(0.1 * ?PARTITION_SIZE), 10, 102), + 1, {floor(0.1 * ?PARTITION_SIZE), 10, {spora_2_6, MiningAddress}}, 102), ar_mining_stats:set_storage_module_data_size( - ar_storage_module:id({floor(0.1 * ?PARTITION_SIZE), 10, {spora_2_6, PackingAddress}}), - {spora_2_6, PackingAddress}, 1, floor(0.1 * ?PARTITION_SIZE), 10, 103), + 1, {floor(0.1 * ?PARTITION_SIZE), 10, {spora_2_6, PackingAddress}}, 103), ar_mining_stats:set_storage_module_data_size( - ar_storage_module:id({floor(0.3 * ?PARTITION_SIZE), 4, unpacked}), - unpacked, 1, floor(0.3 * ?PARTITION_SIZE), 4, 111), + 1, {floor(0.3 * ?PARTITION_SIZE), 4, unpacked}, 111), ar_mining_stats:set_storage_module_data_size( - ar_storage_module:id({floor(0.3 * ?PARTITION_SIZE), 4, {spora_2_6, MiningAddress}}), - {spora_2_6, MiningAddress}, 1, floor(0.3 * ?PARTITION_SIZE), 4, 112), + 1, {floor(0.3 * ?PARTITION_SIZE), 4, {spora_2_6, MiningAddress}}, 112), ar_mining_stats:set_storage_module_data_size( - ar_storage_module:id({floor(0.3 * ?PARTITION_SIZE), 4, {spora_2_6, PackingAddress}}), - {spora_2_6, PackingAddress}, 1, floor(0.3 * ?PARTITION_SIZE), 4, 113), + 1, {floor(0.3 * ?PARTITION_SIZE), 4, {spora_2_6, PackingAddress}}, 113), ar_mining_stats:set_storage_module_data_size( - ar_storage_module:id({?PARTITION_SIZE, 2, unpacked}), - unpacked, 2, ?PARTITION_SIZE, 2, 201), + 2, {?PARTITION_SIZE, 2, unpacked}, 201), ar_mining_stats:set_storage_module_data_size( - ar_storage_module:id({?PARTITION_SIZE, 2, {spora_2_6, MiningAddress}}), - {spora_2_6, MiningAddress}, 2, ?PARTITION_SIZE, 2, 202), + 2, {?PARTITION_SIZE, 2, {spora_2_6, MiningAddress}}, 202), ar_mining_stats:set_storage_module_data_size( - ar_storage_module:id({?PARTITION_SIZE, 2, {spora_2_6, PackingAddress}}), - {spora_2_6, PackingAddress}, 2, ?PARTITION_SIZE, 2, 203), + 2, {?PARTITION_SIZE, 2, {spora_2_6, PackingAddress}}, 203), ?assertEqual(214, get_partition_data_size(1)), ?assertEqual(202, get_partition_data_size(2)), ar_mining_stats:set_storage_module_data_size( - ar_storage_module:id({floor(0.2 * ?PARTITION_SIZE), 8, unpacked}), - unpacked, 1, floor(0.2 * ?PARTITION_SIZE), 8, 121), + 1, {floor(0.2 * ?PARTITION_SIZE), 8, unpacked}, 121), ar_mining_stats:set_storage_module_data_size( - ar_storage_module:id({floor(0.2 * ?PARTITION_SIZE), 8, {spora_2_6, MiningAddress}}), - {spora_2_6, MiningAddress}, 1, floor(0.2 * ?PARTITION_SIZE), 8, 122), + 1, {floor(0.2 * ?PARTITION_SIZE), 8, {spora_2_6, MiningAddress}}, 122), ar_mining_stats:set_storage_module_data_size( - ar_storage_module:id({floor(0.2 * ?PARTITION_SIZE), 8, {spora_2_6, PackingAddress}}), - {spora_2_6, PackingAddress}, 1, floor(0.2 * 
?PARTITION_SIZE), 8, 123), + 1, {floor(0.2 * ?PARTITION_SIZE), 8, {spora_2_6, PackingAddress}}, 123), ar_mining_stats:set_storage_module_data_size( - ar_storage_module:id({?PARTITION_SIZE, 2, unpacked}), - unpacked, 2, ?PARTITION_SIZE, 2, 51), + 2, {?PARTITION_SIZE, 2, unpacked}, 51), ar_mining_stats:set_storage_module_data_size( - ar_storage_module:id({?PARTITION_SIZE, 2, {spora_2_6, MiningAddress}}), - {spora_2_6, MiningAddress}, 2, ?PARTITION_SIZE, 2, 52), + 2, {?PARTITION_SIZE, 2, {spora_2_6, MiningAddress}}, 52), ar_mining_stats:set_storage_module_data_size( - ar_storage_module:id({?PARTITION_SIZE, 2, {spora_2_6, PackingAddress}}), - {spora_2_6, PackingAddress}, 2, ?PARTITION_SIZE, 2, 53), + 2, {?PARTITION_SIZE, 2, {spora_2_6, PackingAddress}}, 53), ?assertEqual(336, get_partition_data_size(1)), ?assertEqual(52, get_partition_data_size(2)), @@ -1229,20 +1216,17 @@ test_report(PoA1Multiplier) -> WeaveSize = floor(10 * ?PARTITION_SIZE), ar_mining_stats:set_total_data_size(floor(0.6 * ?PARTITION_SIZE)), ar_mining_stats:set_storage_module_data_size( - ar_storage_module:id({floor(0.1 * ?PARTITION_SIZE), 10, {spora_2_6, MiningAddress}}), - {spora_2_6, MiningAddress}, 1, floor(0.1 * ?PARTITION_SIZE), 10, + 1, {floor(0.1 * ?PARTITION_SIZE), 10, {spora_2_6, MiningAddress}}, floor(0.1 * ?PARTITION_SIZE)), ar_mining_stats:set_storage_module_data_size( - ar_storage_module:id({floor(0.3 * ?PARTITION_SIZE), 4, {spora_2_6, MiningAddress}}), - {spora_2_6, MiningAddress}, 1, floor(0.3 * ?PARTITION_SIZE), 4, + 1, {floor(0.3 * ?PARTITION_SIZE), 4, {spora_2_6, MiningAddress}}, floor(0.2 * ?PARTITION_SIZE)), ar_mining_stats:set_storage_module_data_size( - ar_storage_module:id({floor(0.2 * ?PARTITION_SIZE), 8, {spora_2_6, MiningAddress}}), - {spora_2_6, MiningAddress}, 1, floor(0.2 * ?PARTITION_SIZE), 8, + 1, {floor(0.2 * ?PARTITION_SIZE), 8, {spora_2_6, MiningAddress}}, floor(0.05 * ?PARTITION_SIZE)), ar_mining_stats:set_storage_module_data_size( - ar_storage_module:id({?PARTITION_SIZE, 2, {spora_2_6, MiningAddress}}), - {spora_2_6, MiningAddress}, 2, ?PARTITION_SIZE, 2, floor(0.25 * ?PARTITION_SIZE)), + 2, {?PARTITION_SIZE, 2, {spora_2_6, MiningAddress}}, + floor(0.25 * ?PARTITION_SIZE)), ar_mining_stats:vdf_computed(), ar_mining_stats:vdf_computed(), ar_mining_stats:vdf_computed(), diff --git a/apps/arweave/src/ar_mining_sup.erl b/apps/arweave/src/ar_mining_sup.erl index 516a60792..aa8ba63cb 100644 --- a/apps/arweave/src/ar_mining_sup.erl +++ b/apps/arweave/src/ar_mining_sup.erl @@ -7,7 +7,6 @@ -export([init/1]). -include_lib("arweave/include/ar_sup.hrl"). --include_lib("arweave/include/ar_config.hrl"). %%%=================================================================== %%% Public interface. diff --git a/apps/arweave/src/ar_mining_worker.erl b/apps/arweave/src/ar_mining_worker.erl index 2a1f6b5c0..2a4ca2b5e 100644 --- a/apps/arweave/src/ar_mining_worker.erl +++ b/apps/arweave/src/ar_mining_worker.erl @@ -555,7 +555,7 @@ maybe_warn_about_lag(Q, Name) -> %% %% To further reduce log spam, we'll only warn if the queue is greater %% than 2. We really only care if a queue is consistently long or if - %% it's getting longer. Temporary blips are fine. We may incrase + %% it's getting longer. Temporary blips are fine. We may increase %% the threshold in the future. 
N = count_h0_tasks(Q3) + 1, case N > 2 of diff --git a/apps/arweave/src/ar_storage_module.erl b/apps/arweave/src/ar_storage_module.erl index f36ffae56..be3319204 100644 --- a/apps/arweave/src/ar_storage_module.erl +++ b/apps/arweave/src/ar_storage_module.erl @@ -6,7 +6,6 @@ -export([get_unique_sorted_intervals/1]). --include_lib("arweave/include/ar.hrl"). -include_lib("arweave/include/ar_consensus.hrl"). -include_lib("arweave/include/ar_config.hrl"). -include_lib("eunit/include/eunit.hrl"). diff --git a/apps/arweave/src/ar_sync_record.erl b/apps/arweave/src/ar_sync_record.erl index 82a423ee3..46e11e1bd 100644 --- a/apps/arweave/src/ar_sync_record.erl +++ b/apps/arweave/src/ar_sync_record.erl @@ -49,8 +49,10 @@ %% The partition covered by the storage module. partition_number, %% The size in bytes of the storage module; undefined for the "default" storage. + %% aka BucketSize storage_module_size, %% The index of the storage module; undefined for the "default" storage. + %% aka Bucket storage_module_index, %% The number of entries in the write-ahead log. wal @@ -633,10 +635,12 @@ store_state(State) -> ok -> maps:map( fun ({ar_data_sync, Type}, TypeRecord) -> + %% Assert that the StorageModuleSize, StorageModuleIndex, and Type + %% match storage module referenced by StoreID (they should) + StorageModule = {StorageModuleSize, StorageModuleIndex, Type} = + ar_storage_module:get_by_id(StoreID), ar_mining_stats:set_storage_module_data_size( - StoreID, Type, PartitionNumber, StorageModuleSize, - StorageModuleIndex, - ar_intervals:sum(TypeRecord)); + PartitionNumber, StorageModule, ar_intervals:sum(TypeRecord)); (_, _) -> ok end, From 3bf316ac4c6d6837cdac6de690e773c9ae074d40 Mon Sep 17 00:00:00 2001 From: James Piechota Date: Tue, 21 May 2024 21:43:12 -0400 Subject: [PATCH 7/7] WIP --- apps/arweave/src/ar_coordination.erl | 20 +++++++++++++++---- apps/arweave/src/ar_http_iface_middleware.erl | 2 +- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/apps/arweave/src/ar_coordination.erl b/apps/arweave/src/ar_coordination.erl index 8d554a16a..868c10eb8 100644 --- a/apps/arweave/src/ar_coordination.erl +++ b/apps/arweave/src/ar_coordination.erl @@ -6,7 +6,7 @@ start_link/0, computed_h1/2, compute_h2_for_peer/2, computed_h2_for_peer/1, get_public_state/0, send_h1_batch_to_peer/0, stat_loop/0, get_peers/1, get_peer/1, update_peer/2, remove_peer/1, garbage_collect/0, is_exit_peer/0, - get_unique_partitions_list/0, get_self_plus_external_partitions_list/0, + get_partition_table/0, get_self_plus_external_partitions_list/0, get_cluster_partitions_list/0 ]). @@ -461,9 +461,21 @@ refetch_peer_partitions(Peers) -> refetch_pool_peer_partitions() -> gen_server:cast(?MODULE, refetch_pool_peer_partitions). -get_unique_partitions_list() -> - Set = get_unique_partitions_set(ar_mining_io:get_partitions(), sets:new()), - lists:sort(sets:to_list(Set)). +get_partition_table() -> + lists:foldl( + fun({BucketSize, Bucket, {spora_2_6, MiningAddress}}, Acc) -> + [ + {[ + {bucket, Bucket}, + {bucketsize, BucketSize}, + {addr, ar_util:encode(MiningAddress)} + ]} + | Acc + ] + end, + sets:new(), + ar_mining_io:get_storage_modules() + ) get_unique_partitions_set() -> get_unique_partitions_set(ar_mining_io:get_partitions(), sets:new()). 
diff --git a/apps/arweave/src/ar_http_iface_middleware.erl b/apps/arweave/src/ar_http_iface_middleware.erl index b9bbf3d31..ae6811af1 100644 --- a/apps/arweave/src/ar_http_iface_middleware.erl +++ b/apps/arweave/src/ar_http_iface_middleware.erl @@ -1357,7 +1357,7 @@ handle(<<"GET">>, [<<"coordinated_mining">>, <<"partition_table">>], Req, _Pid) %% CM miners ask each other about their local %% partitions. A CM exit node is not an exception - it %% does NOT aggregate peer partitions in this case. - ar_coordination:get_unique_partitions_list() + ar_coordination:get_partition_table() end, JSON = ar_serialize:jsonify(Partitions), {200, #{}, JSON, Req}
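For context, this handler is the server side of get_cm_partition_table/1 from patch 3: a CM peer issues GET /coordinated_mining/partition_table and decodes the JSON built here. A client-side sketch (the exact return shape depends on handle_cm_partition_table_response/1, which is not shown in this series):

    %% Hypothetical shell usage; return shape assumed, not confirmed above.
    {ok, PartitionTable} = ar_http_iface_client:get_cm_partition_table(Peer).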