Skip to content

Commit

Permalink
Fix cache cycling
Browse files Browse the repository at this point in the history
  • Loading branch information
shizzard committed Feb 7, 2025
1 parent 0bfbc52 commit e360728
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 20 deletions.
2 changes: 1 addition & 1 deletion apps/arweave/src/ar_chunk_cache.erl
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ get_groups(Cache0) ->
-spec add_chunk_to_existing_group(
GroupId :: term(),
ChunkId :: term(),
Chunk :: binary() | {binary(), #{}},
Chunk :: binary() | ?CACHE_VALUE(binary(), #{}),
Cache0 :: #ar_chunk_cache{}
) ->
{ok, Cache1 :: #ar_chunk_cache{}} | {error, Reason :: term()}.
Expand Down
37 changes: 18 additions & 19 deletions apps/arweave/src/ar_mining_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@
-define(MSG_SET_DIFFICULTY(DiffPair), {set_difficulty, DiffPair}).
-define(MSG_SET_CACHE_LIMITS(SubChunkCacheLimitBytes, VDFQueueLimit), {set_cache_limits, SubChunkCacheLimitBytes, VDFQueueLimit}).
-define(MSG_REMOVE_SUB_CHUNKS_FROM_CACHE(SubChunkCount, Candidate), {remove_sub_chunks_from_cache, SubChunkCount, Candidate}).
-define(MSG_CHECK_WORKER_STATUS, check_worker_status).
-define(MSG_HANDLE_TASK, handle_task).
-define(MSG_CHECK_WORKER_STATUS, {check_worker_status}).
-define(MSG_HANDLE_TASK, {handle_task}).
-define(MSG_GARBAGE_COLLECT(StartTime, GCResult), {garbage_collect, StartTime, GCResult}).
-define(MSG_GARBAGE_COLLECT, garbage_collect).
-define(MSG_GARBAGE_COLLECT, {garbage_collect}).
-define(MSG_FETCHED_LAST_MOMENT_PROOF(Any), {fetched_last_moment_proof, Any}).

%%%===================================================================
Expand Down Expand Up @@ -216,7 +216,7 @@ handle_cast(?MSG_HANDLE_TASK, #state{ task_queue = Q } = State) ->
{{_Priority, _ID, Task}, Q2} = gb_sets:take_smallest(Q),
{TaskType, Candidate, _ExtraArgs} = Task,
prometheus_gauge:dec(mining_server_task_queue_len, [TaskType]),
gen_server:cast(self(), handle_task),
gen_server:cast(self(), ?MSG_HANDLE_TASK),
case is_session_valid(State, Candidate) of
true ->
handle_task(Task, State#state{ task_queue = Q2 });
Expand All @@ -240,7 +240,7 @@ handle_cast(?MSG_REMOVE_SUB_CHUNKS_FROM_CACHE(SubChunkCount, Candidate), State)

handle_cast(?MSG_CHECK_WORKER_STATUS, State) ->
maybe_warn_about_lag(State#state.task_queue, State#state.name),
ar_util:cast_after(?STATUS_CHECK_FREQUENCY_MS, self(), check_worker_status),
ar_util:cast_after(?STATUS_CHECK_FREQUENCY_MS, self(), ?MSG_CHECK_WORKER_STATUS),
{noreply, State};

handle_cast(?MSG_GARBAGE_COLLECT, State) ->
Expand Down Expand Up @@ -309,19 +309,21 @@ process_chunks(WhichChunk, Candidate, RangeStart, Nonce, NoncesPerChunk, NonceMa
[], SubChunkSize, Count, State) ->
%% No more ChunkOffsets means no more chunks have been read. Iterate through all the
%% remaining nonces and remove the full chunks from the cache.
gen_server:cast(self(),
{remove_sub_chunks_from_cache, NoncesPerChunk,
Candidate#mining_candidate{ nonce = Nonce }}),
gen_server:cast(
self(),
?MSG_REMOVE_SUB_CHUNKS_FROM_CACHE(NoncesPerChunk, Candidate#mining_candidate{ nonce = Nonce })
),
process_chunks(WhichChunk, Candidate, RangeStart, Nonce + NoncesPerChunk, NoncesPerChunk,
NonceMax, [], SubChunkSize, Count, State);
process_chunks(WhichChunk, Candidate, RangeStart, Nonce, NoncesPerChunk, NonceMax,
[{EndOffset, Chunk} | ChunkOffsets], SubChunkSize, Count, State)
when RangeStart + Nonce * SubChunkSize < EndOffset - ?DATA_CHUNK_SIZE ->
%% Nonce falls in a chunk which wasn't read from disk (e.g. because there are holes
%% in the recall range).
gen_server:cast(self(),
{remove_sub_chunks_from_cache, NoncesPerChunk,
Candidate#mining_candidate{ nonce = Nonce }}),
gen_server:cast(
self(),
?MSG_REMOVE_SUB_CHUNKS_FROM_CACHE(NoncesPerChunk, Candidate#mining_candidate{ nonce = Nonce })
),
process_chunks(WhichChunk, Candidate, RangeStart, Nonce + NoncesPerChunk, NoncesPerChunk,
NonceMax, [{EndOffset, Chunk} | ChunkOffsets], SubChunkSize, Count, State);
process_chunks(WhichChunk, Candidate, RangeStart, Nonce, NoncesPerChunk, NonceMax,
Expand Down Expand Up @@ -780,18 +782,15 @@ do_not_cache(Nonce, NonceMax, Candidate, State) ->
%% which data will be available first. The function manages that shared cache slot by either
%% caching data if its the first to arrive, or "popping" data that was previously cached. The
%% caller is responsible for taking the appropriate action based on the return value.
%%
%% do_not_cache is a special value used to prevent unnecessary data from being cached once a
%% solution has been found for a given nonce.
cycle_sub_chunk_cache(#mining_candidate{ cache_ref = CacheRef } = Candidate, Chunk, ChunkMeta, State)
when CacheRef /= not_set ->
#mining_candidate{ nonce = Nonce, session_key = SessionKey } = Candidate,
case ar_chunk_cache:take_chunk(SessionKey, {CacheRef, Nonce}, State#state.chunk_cache) of
{ok, {<<>>, _}, ChunkCache1} ->
{ok, ChunkCache2} = ar_chunk_cache:add_chunk(SessionKey, {CacheRef, Nonce}, {Chunk, ChunkMeta}, ChunkCache1),
{do_not_cache, State#state{ chunk_cache = ChunkCache2 }};
{ok, {<<>>, Meta}, ChunkCache1} ->
{ok, ChunkCache2} = ar_chunk_cache:add_chunk_to_existing_group(SessionKey, {CacheRef, Nonce}, {Chunk, ChunkMeta}, ChunkCache1),
{{<<>>, Meta}, State#state{ chunk_cache = ChunkCache2 }};
{error, chunk_not_found} ->
{ok, ChunkCache2} = ar_chunk_cache:add_chunk(SessionKey, {CacheRef, Nonce}, {Chunk, ChunkMeta}, State#state.chunk_cache),
{ok, ChunkCache2} = ar_chunk_cache:add_chunk_to_existing_group(SessionKey, {CacheRef, Nonce}, {Chunk, ChunkMeta}, State#state.chunk_cache),
{cached, State#state{ chunk_cache = ChunkCache2 }};
{ok, RetVal, ChunkCache1} ->
{RetVal, State#state{ chunk_cache = ChunkCache1 }}
Expand Down Expand Up @@ -824,7 +823,7 @@ remove_sub_chunks_from_cache(#mining_candidate{ cache_ref = CacheRef } = Candida

cache_chunk(Data, Candidate, State) ->
#mining_candidate{ cache_ref = CacheRef, nonce = Nonce, session_key = SessionKey } = Candidate,
{ok, Cache1} = ar_chunk_cache:add_chunk(SessionKey, {CacheRef, Nonce}, Data, State#state.chunk_cache),
{ok, Cache1} = ar_chunk_cache:add_chunk_to_existing_group(SessionKey, {CacheRef, Nonce}, Data, State#state.chunk_cache),
State#state{
chunk_cache = Cache1
}.
Expand Down

0 comments on commit e360728

Please sign in to comment.