Skip to content

Commit

Permalink
Adjust cache size
Browse files Browse the repository at this point in the history
  • Loading branch information
shizzard committed Feb 4, 2025
1 parent a4ad4a0 commit 5de5e45
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 57 deletions.
52 changes: 31 additions & 21 deletions apps/arweave/src/ar_chunk_cache.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@



-define(CACHE_VALUE(Chunk, Meta), {Chunk, Meta}).



%%%===================================================================
%%% Public API.
%%%===================================================================
Expand Down Expand Up @@ -125,14 +129,14 @@ get_groups(Cache0) ->
) ->
{ok, Cache1 :: #ar_chunk_cache{}} | {error, Reason :: term()}.

add_chunk_to_existing_group(GroupId, ChunkId, {Chunk, ChunkMeta}, Cache0) ->
add_chunk_to_existing_group(GroupId, ChunkId, ?CACHE_VALUE(Chunk, ChunkMeta), Cache0) ->
case (byte_size(Chunk) + cache_size(Cache0)) > Cache0#ar_chunk_cache.chunk_cache_limit_bytes of
true when Cache0#ar_chunk_cache.chunk_cache_limit_bytes =/= 0 -> {error, cache_limit_exceeded};
_ -> map_chunk_cache_group(GroupId, add_chunk_map_fun(ChunkId, Chunk, ChunkMeta), Cache0)
end;

add_chunk_to_existing_group(GroupId, ChunkId, Chunk, Cache0) ->
add_chunk_to_existing_group(GroupId, ChunkId, {Chunk, #{}}, Cache0).
add_chunk_to_existing_group(GroupId, ChunkId, ?CACHE_VALUE(Chunk, #{}), Cache0).



Expand All @@ -144,14 +148,14 @@ add_chunk_to_existing_group(GroupId, ChunkId, Chunk, Cache0) ->
) ->
{ok, Cache1 :: #ar_chunk_cache{}} | {error, Reason :: term()}.

add_chunk(GroupId, ChunkId, {Chunk, ChunkMeta}, Cache0) ->
add_chunk(GroupId, ChunkId, ?CACHE_VALUE(Chunk, ChunkMeta), Cache0) ->
case (byte_size(Chunk) + cache_size(Cache0)) > Cache0#ar_chunk_cache.chunk_cache_limit_bytes of
true when Cache0#ar_chunk_cache.chunk_cache_limit_bytes =/= 0 -> {error, cache_limit_exceeded};
_ -> map_chunk_cache_group(GroupId, add_chunk_map_fun(ChunkId, Chunk, ChunkMeta), Cache0, true)
end;

add_chunk(GroupId, ChunkId, Chunk, Cache0) ->
add_chunk(GroupId, ChunkId, {Chunk, #{}}, Cache0).
add_chunk(GroupId, ChunkId, ?CACHE_VALUE(Chunk, #{}), Cache0).



Expand All @@ -160,11 +164,13 @@ add_chunk(GroupId, ChunkId, Chunk, Cache0) ->

take_chunk(GroupId, ChunkId, Cache0) ->
map_chunk_cache_group(GroupId, fun(#ar_chunk_cache_group{
chunk_cache = ChunkCache0
chunk_cache = ChunkCache0,
chunk_cache_size_bytes = ChunkCacheSize0
} = Group0) ->
case maps:take(ChunkId, ChunkCache0) of
{Chunk, ChunkCache1} -> {ok, Chunk, Group0#ar_chunk_cache_group{
chunk_cache = ChunkCache1
{?CACHE_VALUE(Chunk, _Meta) = RetVal, ChunkCache1} -> {ok, RetVal, Group0#ar_chunk_cache_group{
chunk_cache = ChunkCache1,
chunk_cache_size_bytes = ChunkCacheSize0 - byte_size(Chunk)
}};
error -> {error, chunk_not_found}
end
Expand All @@ -176,19 +182,11 @@ take_chunk(GroupId, ChunkId, Cache0) ->
{ok, Cache1 :: #ar_chunk_cache{}} | {error, Reason :: term()}.

drop_chunk(GroupId, ChunkId, Cache0) ->
map_chunk_cache_group(GroupId, fun(#ar_chunk_cache_group{
chunk_cache = ChunkCache0,
chunk_cache_size_bytes = ChunkCacheSize0
} = Group0) ->
case maps:find(ChunkId, ChunkCache0) of
{ok, {Chunk, _ChunkMeta}} ->
{ok, Group0#ar_chunk_cache_group{
chunk_cache = maps:remove(ChunkId, ChunkCache0),
chunk_cache_size_bytes = ChunkCacheSize0 - byte_size(Chunk)
}};
error -> {ok, Group0}
end
end, Cache0).
case take_chunk(GroupId, ChunkId, Cache0) of
{ok, _, Cache1} -> {ok, Cache1};
{error, chunk_not_found} -> {ok, Cache0};
{error, Reason} -> {error, Reason}
end.


-spec chunk_exists(GroupId :: term(), ChunkId :: term(), Cache0 :: #ar_chunk_cache{}) ->
Expand Down Expand Up @@ -273,7 +271,7 @@ map_chunk_cache_group(GroupId, Fun, Cache0, InsertIfNotFound) ->
Cache1 = Cache0#ar_chunk_cache{
chunk_cache_groups = maps:put(GroupId, Group1, Cache0#ar_chunk_cache.chunk_cache_groups)
},
{ok, {RetVal, Cache1}};
{ok, RetVal, Cache1};
{error, Reason} -> {error, Reason}
end;
error when InsertIfNotFound ->
Expand Down Expand Up @@ -395,6 +393,18 @@ add_chunk_meta_test() ->



take_chunk_test() ->
Cache0 = new(1024),
GroupId0 = session0,
ChunkId0 = chunk0,
Data = <<"chunk_data">>,
{ok, Cache1} = add_chunk(GroupId0, ChunkId0, Data, Cache0),
?assertEqual(byte_size(Data), cache_size(Cache1)),
{ok, {Data, #{}}, Cache2} = take_chunk(GroupId0, ChunkId0, Cache1),
?assertEqual(0, cache_size(Cache2)).



set_limit_test() ->
Cache0 = new(),
Data = <<"chunk_data">>,
Expand Down
37 changes: 15 additions & 22 deletions apps/arweave/src/ar_mining_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -452,75 +452,68 @@ calculate_cache_limits(NumActivePartitions, PackingDifficulty) ->
IdealRangesPerStep = 2,
RecallRangeSize = ar_block:get_recall_range_size(PackingDifficulty),

MinimumCacheLimitMiB = max(
MinimumCacheLimitBytes = max(
1,
(IdealStepsPerPartition * IdealRangesPerStep * RecallRangeSize * NumActivePartitions)
div ?MiB
),

{ok, Config} = application:get_env(arweave, config),
OverallCacheLimitMiB = case Config#config.mining_cache_size_mb of
OverallCacheLimitBytes = case Config#config.mining_cache_size_mb of
undefined ->
MinimumCacheLimitMiB;
MinimumCacheLimitBytes;
N ->
N
N * ?MiB
end,

%% Convert the overall cache limit from MiB to sub-chunks. Each partition will track
%% their cache in terms of sub-chunks where a spora_2_6 sub-chunk is the same as a chunk,
%% and a composite sub-chunk is much smaller than a chunk.
OverallCacheLimitSubChunks = (OverallCacheLimitMiB * ?MiB) div
ar_block:get_sub_chunk_size(PackingDifficulty),

%% We shard the chunk cache across every active worker. Only workers that mine a partition
%% included in the current weave are active.
PartitionCacheLimit = max(1, OverallCacheLimitSubChunks div NumActivePartitions),
PartitionCacheLimitBytes = OverallCacheLimitBytes div NumActivePartitions,

%% Allow enough compute_h0 tasks to be queued to completely refill the chunk cache.
VDFQueueLimit = max(
1,
PartitionCacheLimit div (2 * ar_block:get_nonces_per_recall_range(PackingDifficulty))
PartitionCacheLimitBytes div (2 * ar_block:get_recall_range_size(PackingDifficulty))
),

GarbageCollectionFrequency = 4 * VDFQueueLimit * 1000,

{MinimumCacheLimitMiB, OverallCacheLimitMiB, PartitionCacheLimit, VDFQueueLimit,
{MinimumCacheLimitBytes, OverallCacheLimitBytes, PartitionCacheLimitBytes, VDFQueueLimit,
GarbageCollectionFrequency}.

maybe_update_cache_limits({_, _, PartitionCacheLimit, _, _},
#state{chunk_cache_limit = PartitionCacheLimit} = State) ->
State;
maybe_update_cache_limits(Limits, State) ->
{MinimumCacheLimitMiB, OverallCacheLimitMiB, PartitionCacheLimit, VDFQueueLimit,
{MinimumCacheLimitBytes, OverallCacheLimitBytes, PartitionCacheLimitBytes, VDFQueueLimit,
GarbageCollectionFrequency} = Limits,
maps:foreach(
fun(_Partition, Worker) ->
ar_mining_worker:set_cache_limits(
Worker, PartitionCacheLimit, VDFQueueLimit)
Worker, PartitionCacheLimitBytes, VDFQueueLimit)
end,
State#state.workers
),

ar:console(
"~nSetting the mining chunk cache size limit to ~B MiB "
"(~B sub-chunks per partition).~n",
[OverallCacheLimitMiB, PartitionCacheLimit]),
[OverallCacheLimitBytes div ?MiB, PartitionCacheLimitBytes div ?MiB]),
?LOG_INFO([{event, update_mining_cache_limits},
{overall_limit_mb, OverallCacheLimitMiB},
{per_partition_sub_chunks, PartitionCacheLimit},
{overall_limit_mb, OverallCacheLimitBytes div ?MiB},
{per_partition_sub_chunks, PartitionCacheLimitBytes div ?MiB},
{vdf_queue_limit_steps, VDFQueueLimit}]),
case OverallCacheLimitMiB < MinimumCacheLimitMiB of
case OverallCacheLimitBytes < MinimumCacheLimitBytes of
true ->
ar:console("~nChunk cache size limit (~p MiB) is below minimum limit of "
"~p MiB. Mining performance may be impacted.~n"
"Consider changing the 'mining_cache_size_mb' option.",
[OverallCacheLimitMiB, MinimumCacheLimitMiB]);
[OverallCacheLimitBytes div ?MiB, MinimumCacheLimitBytes div ?MiB]);
false -> ok
end,

State2 = reset_gc_timer(GarbageCollectionFrequency, State),
State2#state{
chunk_cache_limit = PartitionCacheLimit
chunk_cache_limit = PartitionCacheLimitBytes
}.

distribute_output(Candidate, State) ->
Expand Down
17 changes: 9 additions & 8 deletions apps/arweave/src/ar_mining_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -788,13 +788,13 @@ cycle_sub_chunk_cache(#mining_candidate{ cache_ref = CacheRef } = Candidate, Chu
when CacheRef /= not_set ->
#mining_candidate{ nonce = Nonce, session_key = SessionKey } = Candidate,
case ar_chunk_cache:take_chunk(SessionKey, {CacheRef, Nonce}, State#state.chunk_cache) of
{ok, {<<>>, ChunkCache1}} ->
ChunkCache2 = ar_chunk_cache:add_chunk(SessionKey, {CacheRef, Nonce}, {Chunk, ChunkMeta}, ChunkCache1),
{ok, {<<>>, _}, ChunkCache1} ->
{ok, ChunkCache2} = ar_chunk_cache:add_chunk(SessionKey, {CacheRef, Nonce}, {Chunk, ChunkMeta}, ChunkCache1),
{do_not_cache, State#state{ chunk_cache = ChunkCache2 }};
{error, chunk_not_found} ->
ChunkCache2 = ar_chunk_cache:add_chunk(SessionKey, {CacheRef, Nonce}, {Chunk, ChunkMeta}, State#state.chunk_cache),
{ok, ChunkCache2} = ar_chunk_cache:add_chunk(SessionKey, {CacheRef, Nonce}, {Chunk, ChunkMeta}, State#state.chunk_cache),
{cached, State#state{ chunk_cache = ChunkCache2 }};
{ok, {Data, ChunkCache1}} ->
{ok, {Data, _}, ChunkCache1} ->
{Data, State#state{ chunk_cache = ChunkCache1 }}
end.

Expand All @@ -809,14 +809,14 @@ remove_sub_chunks_from_cache(#mining_candidate{ cache_ref = CacheRef } = Candida
%% We may decrement the cache size further depending on what's already cached.
State2 = State,
State3 = case ar_chunk_cache:take_chunk(SessionKey, {CacheRef, Nonce}, State#state.chunk_cache) of
{ok, {{<<>>, _}, ChunkCache1}} ->
{ok, {<<>>, _}, ChunkCache1} ->
State2#state{ chunk_cache = ChunkCache1 };
{error, chunk_not_found} ->
cache_chunk(<<>>, Candidate, State2);
{{_Chunk, #{chunk1 := true, h1 := _H1}}, ChunkCache1} ->
{ok, {_Chunk, #{chunk1 := true, h1 := _H1}}, ChunkCache1} ->
%% If we find data from a CM peer, discard it but don't decrement the cache size
State2#state{ chunk_cache = ChunkCache1 };
{_, ChunkCache1} ->
{ok, _, ChunkCache1} ->
%% if we find any cached data, discard it and decrement the cache size
State2#state{ chunk_cache = ChunkCache1 }
end,
Expand All @@ -825,8 +825,9 @@ remove_sub_chunks_from_cache(#mining_candidate{ cache_ref = CacheRef } = Candida

cache_chunk(Data, Candidate, State) ->
#mining_candidate{ cache_ref = CacheRef, nonce = Nonce, session_key = SessionKey } = Candidate,
{ok, Cache1} = ar_chunk_cache:add_chunk(SessionKey, {CacheRef, Nonce}, Data, State#state.chunk_cache),
State#state{
chunk_cache = ar_chunk_cache:add_chunk(SessionKey, {CacheRef, Nonce}, Data, State#state.chunk_cache)
chunk_cache = Cache1
}.

cache_h1_list(_Candidate, [], State) ->
Expand Down
12 changes: 6 additions & 6 deletions apps/arweave/test/ar_test_node.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

%% The new, more flexible, and more user-friendly interface.
-export([boot_peers/1, wait_for_peers/1, get_config/1,set_config/2,
wait_until_joined/0, wait_until_joined/1,
wait_until_joined/0, wait_until_joined/1,
restart/0, restart/1, restart_with_config/1, restart_with_config/2,
start_other_node/4, start_node/2, start_node/3, start_coordinated/1, base_cm_config/1, mine/1,
wait_until_height/1, wait_until_height/2, wait_until_height/3, assert_wait_until_height/2, http_get_block/2, get_blocks/1,
Expand Down Expand Up @@ -922,13 +922,13 @@ wait_until_height(Node, TargetHeight) ->
wait_until_height(Node, TargetHeight, true).

wait_until_height(Node, TargetHeight, Strict) ->
{BI, Height} = case Node of
{BI, Height} = case Node of
main ->
{
wait_until_height(TargetHeight),
ar_node:get_height()
};
_ ->
_ ->
{
remote_call(Node, ?MODULE, wait_until_height, [TargetHeight],
?WAIT_UNTIL_BLOCK_HEIGHT_TIMEOUT + 500),
Expand All @@ -937,7 +937,7 @@ wait_until_height(Node, TargetHeight, Strict) ->
end,
case Strict of
true ->
?assertEqual(TargetHeight, Height,
?assertEqual(TargetHeight, Height,
iolist_to_binary(io_lib:format("Node ~p not at the expected height", [Node])));
false ->
ok
Expand Down Expand Up @@ -1332,7 +1332,7 @@ get_chunk(Node, Offset) ->
get_chunk(Node, Offset, Packing) ->
Headers = case Packing of
undefined -> [];
_ ->
_ ->
PackingBinary = iolist_to_binary(ar_serialize:encode_packing(Packing, false)),
[{<<"x-packing">>, PackingBinary}]
end,
Expand Down Expand Up @@ -1469,4 +1469,4 @@ get_genesis_chunk(EndOffset) ->
get_genesis_chunk(StartOffset, EndOffset) ->
Size = EndOffset - StartOffset,
StartValue = StartOffset div 4,
ar_weave:generate_data(StartValue, Size, <<>>).
ar_weave:generate_data(StartValue, Size, <<>>).

0 comments on commit 5de5e45

Please sign in to comment.