diff --git a/apps/arweave/include/ar_data_sync.hrl b/apps/arweave/include/ar_data_sync.hrl index a1818d842..f8607e402 100644 --- a/apps/arweave/include/ar_data_sync.hrl +++ b/apps/arweave/include/ar_data_sync.hrl @@ -65,22 +65,6 @@ -define(EXCLUDE_MISSING_INTERVAL_TIMEOUT_MS, 10 * 60 * 1000). -endif. -%% Let at least this many chunks stack up, per storage module, then write them on disk in the -%% ascending order, to reduce out-of-order disk writes causing fragmentation. --ifdef(DEBUG). --define(STORE_CHUNK_QUEUE_FLUSH_SIZE_THRESHOLD, 2). --else. --define(STORE_CHUNK_QUEUE_FLUSH_SIZE_THRESHOLD, 100). % ~ 25 MB worth of chunks. --endif. - -%% If a chunk spends longer than this in the store queue, write it on disk without waiting -%% for ?STORE_CHUNK_QUEUE_FLUSH_SIZE_THRESHOLD chunks to stack up. --ifdef(DEBUG). --define(STORE_CHUNK_QUEUE_FLUSH_TIME_THRESHOLD, 1000). --else. --define(STORE_CHUNK_QUEUE_FLUSH_TIME_THRESHOLD, 2_000). % 2 seconds. --endif. - %% @doc The state of the server managing data synchronization. -record(sync_data_state, { %% The last entries of the block index. @@ -200,16 +184,6 @@ %% storage module to be searched for missing data before attempting to sync the data %% from the network. other_storage_modules_with_unsynced_intervals = [], - %% The priority queue of chunks sorted by offset. The motivation is to have chunks - %% stack up, per storage module, before writing them on disk so that we can write - %% them in the ascending order and reduce out-of-order disk writes causing fragmentation. - store_chunk_queue = gb_sets:new(), - %% The length of the store chunk queue. - store_chunk_queue_len = 0, - %% The threshold controlling the brief accumuluation of the chunks in the queue before - %% the actual disk dump, to reduce the chance of out-of-order write causing disk - %% fragmentation. - store_chunk_queue_threshold = ?STORE_CHUNK_QUEUE_FLUSH_SIZE_THRESHOLD, %% Cache mapping peers to /data_sync_record responses all_peers_intervals = #{} }). diff --git a/apps/arweave/src/ar.erl b/apps/arweave/src/ar.erl index d9d30f5ce..4adc79b54 100644 --- a/apps/arweave/src/ar.erl +++ b/apps/arweave/src/ar.erl @@ -756,12 +756,19 @@ start(normal, _Args) -> ar_sup:start_link(). 
set_mining_address(#config{ mining_addr = not_set } = C) -> - W = ar_wallet:get_or_create_wallet([{?RSA_SIGN_ALG, 65537}]), - Addr = ar_wallet:to_address(W), - ar:console("~nSetting the mining address to ~s.~n", [ar_util:encode(Addr)]), - C2 = C#config{ mining_addr = Addr }, - application:set_env(arweave, config, C2), - set_mining_address(C2); + case ar_wallet:get_or_create_wallet([{?RSA_SIGN_ALG, 65537}]) of + {error, Reason} -> + ar:console("~nFailed to create a wallet, reason: ~p.~n", + [Reason]), + timer:sleep(500), + erlang:halt(); + W -> + Addr = ar_wallet:to_address(W), + ar:console("~nSetting the mining address to ~s.~n", [ar_util:encode(Addr)]), + C2 = C#config{ mining_addr = Addr }, + application:set_env(arweave, config, C2), + set_mining_address(C2) + end; set_mining_address(#config{ mine = false }) -> ok; set_mining_address(#config{ mining_addr = Addr, cm_exit_peer = CmExitPeer, @@ -790,10 +797,17 @@ create_wallet([DataDir]) -> create_wallet_fail(); true -> ok = application:set_env(arweave, config, #config{ data_dir = DataDir }), - W = ar_wallet:new_keyfile({?RSA_SIGN_ALG, 65537}), - Addr = ar_wallet:to_address(W), - ar:console("Created a wallet with address ~s.~n", [ar_util:encode(Addr)]), - erlang:halt() + case ar_wallet:new_keyfile({?RSA_SIGN_ALG, 65537}) of + {error, Reason} -> + ar:console("Failed to create a wallet, reason: ~p.~n~n", + [Reason]), + timer:sleep(500), + erlang:halt(); + W -> + Addr = ar_wallet:to_address(W), + ar:console("Created a wallet with address ~s.~n", [ar_util:encode(Addr)]), + erlang:halt() + end end; create_wallet(_) -> create_wallet_fail(). diff --git a/apps/arweave/src/ar_block_pre_validator.erl b/apps/arweave/src/ar_block_pre_validator.erl index 455bbc5d1..67bc37227 100644 --- a/apps/arweave/src/ar_block_pre_validator.erl +++ b/apps/arweave/src/ar_block_pre_validator.erl @@ -458,6 +458,13 @@ get_last_step_prev_output(B) -> end. 
validate_poa_against_cached_poa(B, CacheB) -> + case CacheB#block.poa_cache of + {_, _} -> + ok; + _ -> + ID = binary_to_list(ar_util:encode(crypto:strong_rand_bytes(2))), + file:write_file("/opt/arweave/bad-block-" ++ ID, term_to_binary(B)) + end, #block{ poa_cache = {ArgCache, ChunkID}, poa2_cache = Cache2 } = CacheB, Args = erlang:append_element(erlang:insert_element(5, ArgCache, B#block.poa), ChunkID), case ar_poa:validate(Args) of diff --git a/apps/arweave/src/ar_chunk_storage.erl b/apps/arweave/src/ar_chunk_storage.erl index 45a26e3fc..0dab357ec 100644 --- a/apps/arweave/src/ar_chunk_storage.erl +++ b/apps/arweave/src/ar_chunk_storage.erl @@ -291,6 +291,13 @@ handle_cast({register_packing_ref, Ref, Offset}, #state{ packing_map = Map } = S {noreply, State#state{ packing_map = maps:put(Ref, Offset, Map) }}; handle_cast({expire_repack_request, Ref}, #state{ packing_map = Map } = State) -> + case maps:is_key(Ref, Map) of + true -> + ?LOG_WARNING([{event, repack_request_expired}, + {type, repack_in_place}]); + false -> + ok + end, {noreply, State#state{ packing_map = maps:remove(Ref, Map) }}; handle_cast(Cast, State) -> @@ -358,6 +365,7 @@ handle_info({chunk, {packed, Ref, ChunkArgs}}, repack_cursor = Offset, prev_repack_cursor = PrevCursor }}; Error2 -> ?LOG_ERROR([{event, failed_to_store_repacked_chunk}, + {type, repack_in_place}, {storage_module, StoreID}, {offset, Offset}, {packing, ar_serialize:encode_packing(Packing, true)}, @@ -366,6 +374,7 @@ handle_info({chunk, {packed, Ref, ChunkArgs}}, end; Error3 -> ?LOG_ERROR([{event, failed_to_remove_repacked_chunk_from_sync_record}, + {type, repack_in_place}, {storage_module, StoreID}, {offset, Offset}, {packing, ar_serialize:encode_packing(Packing, true)}, @@ -806,16 +815,23 @@ repack(Start, End, NextCursor, RightBound, RequiredPacking, StoreID) -> ok -> case catch get_range(Start, RepackIntervalSize, StoreID) of [] -> + ?LOG_WARNING([{event, failed_to_read_chunk_interval}, + {type, repack_in_place}, + {start, Start}, + {size, RepackIntervalSize}, + {storage_module, StoreID}]), Start2 = Start + RepackIntervalSize, gen_server:cast(Server, {repack, Start2, End, NextCursor, RightBound, RequiredPacking}), continue; - {'EXIT', _Exc} -> + {'EXIT', Exc} -> ?LOG_ERROR([{event, failed_to_read_chunk_range}, + {type, repack_in_place}, + {error, io_lib:format("~p", [Exc])}, {storage_module, StoreID}, {start, Start}, {size, RepackIntervalSize}, - {store_id, StoreID}]), + {storage_module, StoreID}]), Start2 = Start + RepackIntervalSize, gen_server:cast(Server, {repack, Start2, End, NextCursor, RightBound, RequiredPacking}), @@ -835,6 +851,7 @@ repack(Start, End, NextCursor, RightBound, RequiredPacking, StoreID) -> {ok, Map, MetadataMap}; {error, Error} -> ?LOG_ERROR([{event, failed_to_read_chunk_metadata_range}, + {type, repack_in_place}, {storage_module, StoreID}, {error, io_lib:format("~p", [Error])}, {left, Min}, @@ -856,6 +873,11 @@ repack(Start, End, NextCursor, RightBound, RequiredPacking, StoreID) -> fun (AbsoluteOffset, {_, _TXRoot, _, _, _, ChunkSize}, ok) when ChunkSize /= ?DATA_CHUNK_SIZE, AbsoluteOffset =< ?STRICT_DATA_SPLIT_THRESHOLD -> + ?LOG_WARNING([{event, skipping_small_chunk}, + {type, repack_in_place}, + {chunk_size, ChunkSize}, + {offset, AbsoluteOffset}, + {storage_module, StoreID}]), ok; (AbsoluteOffset, {_, TXRoot, _, _, _, ChunkSize}, ok) -> PaddedOffset = ar_data_sync:get_chunk_padded_offset(AbsoluteOffset), @@ -863,6 +885,7 @@ repack(Start, End, NextCursor, RightBound, RequiredPacking, StoreID) -> {true, RequiredPacking} -> 
?LOG_WARNING([{event, repacking_process_chunk_already_repacked}, + {type, repack_in_place}, {storage_module, StoreID}, {packing, ar_serialize:encode_packing(RequiredPacking,true)}, @@ -873,8 +896,9 @@ repack(Start, End, NextCursor, RightBound, RequiredPacking, StoreID) -> not_found -> ?LOG_WARNING([{event, chunk_not_found_in_chunk_storage}, + {type, repack_in_place}, {storage_module, StoreID}, - {offset, PaddedOffset}]), + {padded_offset, PaddedOffset}]), ok; Chunk -> Ref = make_ref(), @@ -889,11 +913,13 @@ repack(Start, End, NextCursor, RightBound, RequiredPacking, StoreID) -> end; true -> ?LOG_WARNING([{event, no_packing_information_for_the_chunk}, + {type, repack_in_place}, {storage_module, StoreID}, {offset, PaddedOffset}]), ok; false -> ?LOG_WARNING([{event, chunk_not_found_in_sync_record}, + {type, repack_in_place}, {storage_module, StoreID}, {offset, PaddedOffset}]), ok diff --git a/apps/arweave/src/ar_data_sync.erl b/apps/arweave/src/ar_data_sync.erl index 9d3a0e5c4..4ec5155b4 100644 --- a/apps/arweave/src/ar_data_sync.erl +++ b/apps/arweave/src/ar_data_sync.erl @@ -8,11 +8,10 @@ get_chunk/2, get_chunk_proof/2, get_tx_data/1, get_tx_data/2, get_tx_offset/1, get_tx_offset_data_in_range/2, has_data_root/2, request_tx_data_removal/3, request_data_removal/4, record_disk_pool_chunks_count/0, - record_chunk_cache_size_metric/0, is_chunk_cache_full/0, is_disk_space_sufficient/1, - get_chunk_by_byte/2, get_chunk_seek_offset/1, read_chunk/4, read_data_path/2, - increment_chunk_cache_size/0, decrement_chunk_cache_size/0, - get_chunk_padded_offset/1, get_chunk_metadata_range/3, - get_merkle_rebase_threshold/0, should_store_in_chunk_storage/3]). + record_chunk_cache_size_metric/0, is_chunk_cache_full/1, is_disk_space_sufficient/1, + get_chunk_by_byte/2, read_chunk/4, decrement_chunk_cache_size/1, + increment_chunk_cache_size/1, get_chunk_padded_offset/1, get_chunk_metadata_range/3, + get_chunk_data_key/1]). -export([debug_get_disk_pool_chunks/0]). @@ -483,10 +482,10 @@ request_data_removal(Start, End, Ref, ReplyTo) -> %% @doc Return true if the in-memory data chunk cache is full. Return not_initialized %% if there is no information yet. -is_chunk_cache_full() -> +is_chunk_cache_full(StoreID) -> case ets:lookup(ar_data_sync_state, chunk_cache_size_limit) of [{_, Limit}] -> - case ets:lookup(ar_data_sync_state, chunk_cache_size) of + case ets:lookup(ar_data_sync_state, {chunk_cache_size, StoreID}) of [{_, Size}] when Size > Limit -> true; _ -> @@ -567,11 +566,13 @@ read_data_path(_Offset, ChunkDataDB, ChunkDataKey, _StoreID) -> Error end. -decrement_chunk_cache_size() -> - ets:update_counter(ar_data_sync_state, chunk_cache_size, {2, -1}, {chunk_cache_size, 0}). +decrement_chunk_cache_size(StoreID) -> + ets:update_counter(ar_data_sync_state, {chunk_cache_size, StoreID}, {2, -1}, + {{chunk_cache_size, StoreID}, 0}). -increment_chunk_cache_size() -> - ets:update_counter(ar_data_sync_state, chunk_cache_size, {2, 1}, {chunk_cache_size, 1}). +increment_chunk_cache_size(StoreID) -> + ets:update_counter(ar_data_sync_state, {chunk_cache_size, StoreID}, {2, 1}, + {{chunk_cache_size, StoreID}, 1}). %% @doc Return Offset if it is smaller than or equal to ?STRICT_DATA_SPLIT_THRESHOLD. %% Otherwise, return the offset of the last byte of the chunk + the size of the padding. 
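A minimal sketch of how the per-StoreID cache accounting above is meant to be read: each store now owns its own {chunk_cache_size, StoreID} counter row in the ar_data_sync_state ETS table, while a single shared chunk_cache_size_limit holds the per-store cap. The helper cache_usage/1 below is invented for illustration and is not part of the patch; only the table name and key shapes come from the diff.

    %% Illustrative only: read one store's cache usage against the shared per-store limit.
    cache_usage(StoreID) ->
        Size =
            case ets:lookup(ar_data_sync_state, {chunk_cache_size, StoreID}) of
                [{_, N}] -> N;
                [] -> 0
            end,
        Limit =
            case ets:lookup(ar_data_sync_state, chunk_cache_size_limit) of
                [{_, L}] -> L;
                [] -> infinity
            end,
        {Size, Limit}.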
@@ -673,17 +674,17 @@ init({"default" = StoreID, _}) -> undefined -> Free = proplists:get_value(free_memory, memsup:get_system_memory_data(), 2000000000), - Limit2 = min(1000, erlang:ceil(Free * 0.9 / 3 / 262144)), + Limit2 = min(10000, erlang:ceil(Free * 0.9 / 3 / 262144)), Limit3 = ar_util:ceil_int(Limit2, 100), Limit3; Limit2 -> Limit2 end, ar:console("~nSetting the data chunk cache size limit to ~B chunks.~n", [Limit]), - ets:insert(ar_data_sync_state, {chunk_cache_size_limit, Limit}), - ets:insert(ar_data_sync_state, {chunk_cache_size, 0}), + StoreCount = length(Config#config.storage_modules) + 1, + ets:insert(ar_data_sync_state, {chunk_cache_size_limit, Limit div StoreCount}), + ets:insert(ar_data_sync_state, {{chunk_cache_size, "default"}, 0}), timer:apply_interval(200, ?MODULE, record_chunk_cache_size_metric, []), - gen_server:cast(self(), process_store_chunk_queue), {ok, State2}; init({StoreID, RepackInPlacePacking}) -> ?LOG_INFO([{event, ar_data_sync_start}, {store_id, StoreID}]), @@ -691,9 +692,9 @@ init({StoreID, RepackInPlacePacking}) -> process_flag(trap_exit, true), [ok, ok] = ar_events:subscribe([node_state, disksup]), State = init_kv(StoreID), + ets:insert(ar_data_sync_state, {{chunk_cache_size, StoreID}, 0}), case RepackInPlacePacking of none -> - gen_server:cast(self(), process_store_chunk_queue), {RangeStart, RangeEnd} = ar_storage_module:get_range(StoreID), State2 = State#sync_data_state{ store_id = StoreID, @@ -709,10 +710,6 @@ handle_cast({move_data_root_index, Cursor, N}, State) -> move_data_root_index(Cursor, N, State), {noreply, State}; -handle_cast(process_store_chunk_queue, State) -> - ar_util:cast_after(200, self(), process_store_chunk_queue), - {noreply, process_store_chunk_queue(State)}; - handle_cast({join, RecentBI}, State) -> #sync_data_state{ block_index = CurrentBI, store_id = StoreID } = State, [{_, WeaveSize, _} | _] = RecentBI, @@ -843,13 +840,15 @@ handle_cast(sync_data2, #sync_data_state{ store_id = OriginStoreID, unsynced_intervals_from_other_storage_modules = [{StoreID, {Start, End}} | Intervals] } = State) -> - State2 = case ar_data_sync_worker_master:read_range(Start, End, StoreID, OriginStoreID, false) of - true -> - State#sync_data_state{ - unsynced_intervals_from_other_storage_modules = Intervals }; - false -> - State - end, + State2 = + case ar_data_sync_worker_master:read_range(Start, End, + StoreID, OriginStoreID, false) of + true -> + State#sync_data_state{ + unsynced_intervals_from_other_storage_modules = Intervals }; + false -> + State + end, ar_util:cast_after(50, self(), sync_data2), {noreply, State2}; @@ -1044,7 +1043,7 @@ handle_cast(sync_intervals, State) -> false -> true; true -> - case is_chunk_cache_full() of + case is_chunk_cache_full(StoreID) of true -> ar_util:cast_after(1000, self(), sync_intervals), true; @@ -1057,7 +1056,7 @@ handle_cast(sync_intervals, State) -> true -> true; false -> - case ar_data_sync_worker_master:ready_for_work() of + case ar_data_sync_worker_master:ready_for_work(StoreID) of false -> ar_util:cast_after(200, self(), sync_intervals), true; @@ -1079,7 +1078,7 @@ handle_cast(sync_intervals, State) -> end; handle_cast({store_fetched_chunk, Peer, Byte, Proof} = Cast, State) -> - #sync_data_state{ packing_map = PackingMap } = State, + #sync_data_state{ packing_map = PackingMap, store_id = StoreID } = State, #{ data_path := DataPath, tx_path := TXPath, chunk := Chunk, packing := Packing } = Proof, SeekByte = get_chunk_seek_offset(Byte + 1) - 1, {BlockStartOffset, BlockEndOffset, TXRoot} = 
ar_block_index:get_block_bounds(SeekByte), @@ -1096,7 +1095,7 @@ handle_cast({store_fetched_chunk, Peer, Byte, Proof} = Cast, State) -> Chunk, ChunkID, ChunkEndOffset, Peer, Byte}, case maps:is_key({AbsoluteOffset, unpacked}, PackingMap) of true -> - decrement_chunk_cache_size(), + decrement_chunk_cache_size(StoreID), {noreply, State}; false -> case ar_packing_server:is_buffer_full() of @@ -1120,7 +1119,7 @@ handle_cast({store_fetched_chunk, Peer, Byte, Proof} = Cast, State) -> end end; false -> - decrement_chunk_cache_size(), + decrement_chunk_cache_size(StoreID), process_invalid_fetched_chunk(Peer, Byte, State); {true, DataRoot, TXStartOffset, ChunkEndOffset, TXSize, ChunkSize, ChunkID} -> AbsoluteTXStartOffset = BlockStartOffset + TXStartOffset, @@ -1284,26 +1283,11 @@ handle_cast({remove_range, End, Cursor, Ref, PID}, State) -> {noreply, State} end; -handle_cast({expire_repack_chunk_request, Key}, State) -> - #sync_data_state{ packing_map = PackingMap } = State, - case maps:get(Key, PackingMap, not_found) of - {pack_chunk, {_, DataPath, Offset, DataRoot, _, _, _}} -> - decrement_chunk_cache_size(), - DataPathHash = crypto:hash(sha256, DataPath), - ?LOG_DEBUG([{event, expired_repack_chunk_request}, - {data_path_hash, ar_util:encode(DataPathHash)}, - {data_root, ar_util:encode(DataRoot)}, - {relative_offset, Offset}]), - {noreply, State#sync_data_state{ packing_map = maps:remove(Key, PackingMap) }}; - _ -> - {noreply, State} - end; - handle_cast({expire_unpack_fetched_chunk_request, Key}, State) -> - #sync_data_state{ packing_map = PackingMap } = State, + #sync_data_state{ packing_map = PackingMap, store_id = StoreID } = State, case maps:get(Key, PackingMap, not_found) of {unpack_fetched_chunk, _Args} -> - decrement_chunk_cache_size(), + decrement_chunk_cache_size(StoreID), {noreply, State#sync_data_state{ packing_map = maps:remove(Key, PackingMap) }}; _ -> {noreply, State} @@ -1381,18 +1365,6 @@ handle_info({chunk, {unpacked, Offset, ChunkArgs}}, State) -> {noreply, State} end; -handle_info({chunk, {packed, Offset, ChunkArgs}}, State) -> - #sync_data_state{ packing_map = PackingMap } = State, - Packing = element(1, ChunkArgs), - Key = {Offset, Packing}, - case maps:get(Key, PackingMap, not_found) of - {pack_chunk, Args} when element(1, Args) == Packing -> - State2 = State#sync_data_state{ packing_map = maps:remove(Key, PackingMap) }, - {noreply, store_chunk(ChunkArgs, Args, State2)}; - _ -> - {noreply, State} - end; - handle_info({chunk, _}, State) -> {noreply, State}; @@ -1483,11 +1455,13 @@ handle_info({'DOWN', _, process, _, Reason}, #sync_data_state{ store_id = Stor {noreply, State}; handle_info(Message, #sync_data_state{ store_id = StoreID } = State) -> - ?LOG_WARNING([{event, unhandled_info}, {module, ?MODULE}, {store_id, StoreID}, {message, Message}]), + ?LOG_WARNING([{event, unhandled_info}, {module, ?MODULE}, + {store_id, StoreID}, {message, io_lib:format("~p", [Message])}]), {noreply, State}. terminate(Reason, #sync_data_state{ store_id = StoreID } = State) -> - ?LOG_INFO([{event, terminate}, {module, ?MODULE}, {store_id, StoreID}, {reason, io_lib:format("~p", [Reason])}]), + ?LOG_INFO([{event, terminate}, {module, ?MODULE}, + {store_id, StoreID}, {reason, io_lib:format("~p", [Reason])}]), store_sync_state(State). %%%=================================================================== @@ -2724,92 +2698,6 @@ get_chunk_data_key(DataPathHash) -> Timestamp = os:system_time(microsecond), << Timestamp:256, DataPathHash/binary >>. 
-write_chunk(Offset, ChunkDataKey, Chunk, ChunkSize, DataPath, Packing, State) -> - case ar_tx_blacklist:is_byte_blacklisted(Offset) of - true -> - ok; - false -> - write_not_blacklisted_chunk(Offset, ChunkDataKey, Chunk, ChunkSize, DataPath, - Packing, State) - end. - -write_not_blacklisted_chunk(Offset, ChunkDataKey, Chunk, ChunkSize, DataPath, Packing, - State) -> - #sync_data_state{ chunk_data_db = ChunkDataDB, store_id = StoreID } = State, - ShouldStoreInChunkStorage = should_store_in_chunk_storage(Offset, ChunkSize, Packing), - Result = - case ShouldStoreInChunkStorage of - true -> - PaddedOffset = get_chunk_padded_offset(Offset), - ar_chunk_storage:put(PaddedOffset, Chunk, StoreID); - false -> - ok - end, - case Result of - ok -> - case ShouldStoreInChunkStorage of - false -> - ar_kv:put(ChunkDataDB, ChunkDataKey, term_to_binary({Chunk, DataPath})); - true -> - ar_kv:put(ChunkDataDB, ChunkDataKey, term_to_binary(DataPath)) - end; - _ -> - Result - end. - -%% @doc 256 KiB chunks are stored in the blob storage optimized for read speed. -%% Return true if we want to place the chunk there. -should_store_in_chunk_storage(Offset, ChunkSize, Packing) -> - case Offset > ?STRICT_DATA_SPLIT_THRESHOLD of - true -> - %% All chunks above ?STRICT_DATA_SPLIT_THRESHOLD are placed in 256 KiB buckets - %% so technically can be stored in ar_chunk_storage. However, to avoid - %% managing padding in ar_chunk_storage for unpacked chunks smaller than 256 KiB - %% (we do not need fast random access to unpacked chunks after - %% ?STRICT_DATA_SPLIT_THRESHOLD anyways), we put them to RocksDB. - Packing /= unpacked orelse ChunkSize == (?DATA_CHUNK_SIZE); - false -> - ChunkSize == (?DATA_CHUNK_SIZE) - end. - -update_chunks_index(Args, State) -> - AbsoluteChunkOffset = element(1, Args), - case ar_tx_blacklist:is_byte_blacklisted(AbsoluteChunkOffset) of - true -> - ok; - false -> - update_chunks_index2(Args, State) - end. - -update_chunks_index2(Args, State) -> - {AbsoluteOffset, Offset, ChunkDataKey, TXRoot, DataRoot, TXPath, ChunkSize, - Packing} = Args, - #sync_data_state{ chunks_index = ChunksIndex, store_id = StoreID } = State, - Key = << AbsoluteOffset:?OFFSET_KEY_BITSIZE >>, - Value = {ChunkDataKey, TXRoot, DataRoot, TXPath, Offset, ChunkSize}, - case ar_kv:put(ChunksIndex, Key, term_to_binary(Value)) of - ok -> - StartOffset = get_chunk_padded_offset(AbsoluteOffset - ChunkSize), - PaddedOffset = get_chunk_padded_offset(AbsoluteOffset), - case ar_sync_record:add(PaddedOffset, StartOffset, Packing, ?MODULE, StoreID) of - ok -> - ok; - {error, Reason} -> - ?LOG_ERROR([{event, failed_to_update_sync_record}, {reason, Reason}, - {chunk, ar_util:encode(ChunkDataKey)}, - {absolute_end_offset, AbsoluteOffset}, - {data_root, ar_util:encode(DataRoot)}, - {store_id, StoreID}]), - {error, Reason} - end; - {error, Reason} -> - ?LOG_ERROR([{event, failed_to_update_chunk_index}, {reason, Reason}, - {chunk_data_key, ar_util:encode(ChunkDataKey)}, - {data_root, ar_util:encode(DataRoot)}, - {absolute_end_offset, AbsoluteOffset}, {store_id, StoreID}]), - {error, Reason} - end. 
- pick_missing_blocks([{H, WeaveSize, _} | CurrentBI], BlockTXPairs) -> {After, Before} = lists:splitwith(fun({BH, _}) -> BH /= H end, BlockTXPairs), case Before of @@ -2836,14 +2724,14 @@ process_valid_fetched_chunk(ChunkArgs, Args, State) -> false -> ?LOG_WARNING([{event, got_too_big_proof_from_peer}, {peer, ar_util:format_peer(Peer)}]), - decrement_chunk_cache_size(), + decrement_chunk_cache_size(StoreID), {noreply, State}; true -> #sync_data_state{ store_id = StoreID } = State, case ar_sync_record:is_recorded(Byte + 1, ?MODULE, StoreID) of {true, _} -> %% The chunk has been synced by another job already. - decrement_chunk_cache_size(), + decrement_chunk_cache_size(StoreID), {noreply, State}; false -> true = AbsoluteEndOffset == AbsoluteTXStartOffset + ChunkEndOffset, @@ -2854,15 +2742,16 @@ process_valid_fetched_chunk(ChunkArgs, Args, State) -> end. pack_and_store_chunk({_, AbsoluteOffset, _, _, _, _, _, _, _, _, _, _}, - #sync_data_state{ disk_pool_threshold = DiskPoolThreshold } = State) + #sync_data_state{ disk_pool_threshold = DiskPoolThreshold, + store_id = StoreID } = State) when AbsoluteOffset > DiskPoolThreshold -> %% We do not put data into storage modules unless it is well confirmed. - decrement_chunk_cache_size(), + decrement_chunk_cache_size(StoreID), {noreply, State}; pack_and_store_chunk(Args, State) -> {DataRoot, AbsoluteOffset, TXPath, TXRoot, DataPath, Packing, Offset, ChunkSize, Chunk, UnpackedChunk, OriginStoreID, OriginChunkDataKey} = Args, - #sync_data_state{ packing_map = PackingMap } = State, + #sync_data_state{ store_id = StoreID } = State, RequiredPacking = get_required_chunk_packing(AbsoluteOffset, ChunkSize, State), PackingStatus = case {RequiredPacking, Packing} of @@ -2874,150 +2763,43 @@ pack_and_store_chunk(Args, State) -> case PackingStatus of {ready, {StoredPacking, StoredChunk}} -> ChunkArgs = {StoredPacking, StoredChunk, AbsoluteOffset, TXRoot, ChunkSize}, - {noreply, store_chunk(ChunkArgs, {StoredPacking, DataPath, Offset, DataRoot, - TXPath, OriginStoreID, OriginChunkDataKey}, State)}; + DataSyncStorage = ar_data_sync_storage:name(StoreID), + ExtraArgs = {StoredPacking, DataPath, Offset, DataRoot, TXPath, + OriginStoreID, OriginChunkDataKey}, + gen_server:cast(DataSyncStorage, {store_chunk, ChunkArgs, ExtraArgs}), + {noreply, State}; {need_packing, RequiredPacking} -> - case maps:is_key({AbsoluteOffset, RequiredPacking}, PackingMap) of + case ar_packing_server:is_buffer_full() of true -> - decrement_chunk_cache_size(), + ar_util:cast_after(1000, self(), {pack_and_store_chunk, Args}), {noreply, State}; false -> - case ar_packing_server:is_buffer_full() of - true -> - ar_util:cast_after(1000, self(), {pack_and_store_chunk, Args}), - {noreply, State}; - false -> - {Packing2, Chunk2} = - case UnpackedChunk of - none -> - {Packing, Chunk}; - _ -> - {unpacked, UnpackedChunk} - end, - ar_packing_server:request_repack(AbsoluteOffset, - {RequiredPacking, Packing2, Chunk2, AbsoluteOffset, - TXRoot, ChunkSize}), - ar_util:cast_after(600000, self(), - {expire_repack_chunk_request, - {AbsoluteOffset, RequiredPacking}}), - PackingArgs = {pack_chunk, {RequiredPacking, DataPath, - Offset, DataRoot, TXPath, OriginStoreID, - OriginChunkDataKey}}, - {noreply, State#sync_data_state{ - packing_map = PackingMap#{ - {AbsoluteOffset, RequiredPacking} => PackingArgs }}} - end - end - end. - -process_store_chunk_queue(#sync_data_state{ store_chunk_queue_len = StartLen } = State) -> - process_store_chunk_queue(State, StartLen). 
- -process_store_chunk_queue(#sync_data_state{ store_chunk_queue_len = 0 } = State, StartLen) -> - log_stored_chunks(State, StartLen), - State; -process_store_chunk_queue(State, StartLen) -> - #sync_data_state{ store_chunk_queue = Q, store_chunk_queue_len = Len, - store_chunk_queue_threshold = Threshold } = State, - Timestamp = element(2, gb_sets:smallest(Q)), - Now = os:system_time(millisecond), - Threshold2 = - case Threshold < ?STORE_CHUNK_QUEUE_FLUSH_SIZE_THRESHOLD of - true -> - Threshold; - false -> - case Len > Threshold of - true -> - 0; - false -> - Threshold - end - end, - case Len > Threshold2 - orelse Now - Timestamp > ?STORE_CHUNK_QUEUE_FLUSH_TIME_THRESHOLD of - true -> - {{_Offset, _Timestamp, _Ref, ChunkArgs, Args}, Q2} = gb_sets:take_smallest(Q), - store_chunk2(ChunkArgs, Args, State), - decrement_chunk_cache_size(), - State2 = State#sync_data_state{ store_chunk_queue = Q2, - store_chunk_queue_len = Len - 1, - store_chunk_queue_threshold = min(Threshold2 + 1, - ?STORE_CHUNK_QUEUE_FLUSH_SIZE_THRESHOLD) }, - process_store_chunk_queue(State2); - false -> - log_stored_chunks(State, StartLen), - State - end. - -store_chunk(ChunkArgs, Args, State) -> - %% Let at least N chunks stack up, then write them in the ascending order, - %% to reduce out-of-order disk writes causing fragmentation. - #sync_data_state{ store_chunk_queue = Q, store_chunk_queue_len = Len } = State, - Now = os:system_time(millisecond), - Offset = element(3, ChunkArgs), - Q2 = gb_sets:add_element({Offset, Now, make_ref(), ChunkArgs, Args}, Q), - State2 = State#sync_data_state{ store_chunk_queue = Q2, store_chunk_queue_len = Len + 1 }, - process_store_chunk_queue(State2). - -store_chunk2(ChunkArgs, Args, State) -> - #sync_data_state{ store_id = StoreID } = State, - {Packing, Chunk, AbsoluteOffset, TXRoot, ChunkSize} = ChunkArgs, - {_Packing, DataPath, Offset, DataRoot, TXPath, OriginStoreID, OriginChunkDataKey} = Args, - PaddedOffset = get_chunk_padded_offset(AbsoluteOffset), - StartOffset = get_chunk_padded_offset(AbsoluteOffset - ChunkSize), - DataPathHash = crypto:hash(sha256, DataPath), - case ar_sync_record:delete(PaddedOffset, StartOffset, ?MODULE, StoreID) of - {error, Reason} -> - log_failed_to_store_chunk(Reason, AbsoluteOffset, Offset, DataRoot, DataPathHash, - StoreID), - {error, Reason}; - ok -> - DataPathHash = crypto:hash(sha256, DataPath), - ChunkDataKey = - case StoreID == OriginStoreID of - true -> - OriginChunkDataKey; - _ -> - get_chunk_data_key(DataPathHash) - end, - case write_chunk(AbsoluteOffset, ChunkDataKey, Chunk, ChunkSize, DataPath, - Packing, State) of - ok -> - case update_chunks_index({AbsoluteOffset, Offset, ChunkDataKey, TXRoot, - DataRoot, TXPath, ChunkSize, Packing}, State) of - ok -> - ok; - {error, Reason} -> - log_failed_to_store_chunk(Reason, AbsoluteOffset, Offset, DataRoot, - DataPathHash, StoreID), - {error, Reason} - end; - {error, Reason} -> - log_failed_to_store_chunk(Reason, AbsoluteOffset, Offset, DataRoot, - DataPathHash, StoreID), - {error, Reason} + {Packing2, Chunk2} = + case UnpackedChunk of + none -> + {Packing, Chunk}; + _ -> + {unpacked, UnpackedChunk} + end, + DataSyncStorage = ar_data_sync_storage:name(StoreID), + PromiseKey = {AbsoluteOffset, RequiredPacking}, + PromiseArgs = {pack_chunk, {RequiredPacking, DataPath, + Offset, DataRoot, TXPath, OriginStoreID, + OriginChunkDataKey}}, + gen_server:cast(DataSyncStorage, + {packed_chunk_promise, PromiseKey, PromiseArgs}), + RepackArgs = {RequiredPacking, Packing2, Chunk2, AbsoluteOffset, + TXRoot, ChunkSize}, + 
ar_packing_server:request_repack( + AbsoluteOffset, + whereis(DataSyncStorage), + RepackArgs), + ar_util:cast_after(600000, DataSyncStorage, + {expire_repack_chunk_request, PromiseKey}), + {noreply, State} end end. -log_stored_chunks(State, StartLen) -> - #sync_data_state{ store_chunk_queue_len = EndLen, store_id = StoreID } = State, - StoredCount = StartLen - EndLen, - case StoredCount > 0 of - true -> - ?LOG_DEBUG([{event, stored_chunks}, {count, StoredCount}, {store_id, StoreID}]); - false -> - ok - end. - -log_failed_to_store_chunk(Reason, AbsoluteOffset, Offset, DataRoot, DataPathHash, StoreID) -> - ?LOG_ERROR([{event, failed_to_store_chunk}, - {reason, io_lib:format("~p", [Reason])}, - {absolute_end_offset, AbsoluteOffset, Offset}, - {relative_offset, Offset}, - {data_path_hash, ar_util:encode(DataPathHash)}, - {data_root, ar_util:encode(DataRoot)}, - {store_id, StoreID}]). - get_required_chunk_packing(Offset, ChunkSize, #sync_data_state{ store_id = StoreID }) -> case Offset =< ?STRICT_DATA_SPLIT_THRESHOLD andalso ChunkSize < ?DATA_CHUNK_SIZE of true -> @@ -3240,7 +3022,7 @@ process_disk_pool_chunk_offset(Iterator, TXRoot, TXPath, AbsoluteOffset, MayConc process_disk_pool_immature_chunk_offset(Iterator, TXRoot, TXPath, AbsoluteOffset, Args, State) -> - #sync_data_state{ store_id = StoreID } = State, + #sync_data_state{ store_id = StoreID, chunks_index = ChunksIndexDB } = State, case ar_sync_record:is_recorded(AbsoluteOffset, ?MODULE, StoreID) of {true, unpacked} -> %% Pass MayConclude as false because we have encountered an offset @@ -3251,8 +3033,9 @@ process_disk_pool_immature_chunk_offset(Iterator, TXRoot, TXPath, AbsoluteOffset {noreply, State}; false -> {Offset, _, ChunkSize, DataRoot, DataPathHash, ChunkDataKey, Key, _, _, _} = Args, - case update_chunks_index({AbsoluteOffset, Offset, ChunkDataKey, - TXRoot, DataRoot, TXPath, ChunkSize, unpacked}, State) of + case ar_data_sync_storage:update_chunks_index({AbsoluteOffset, Offset, + ChunkDataKey, TXRoot, DataRoot, TXPath, ChunkSize, unpacked, + StoreID, ChunksIndexDB}) of ok -> ?LOG_DEBUG([{event, indexed_disk_pool_chunk}, {data_path_hash, ar_util:encode(DataPathHash)}, @@ -3344,7 +3127,7 @@ process_disk_pool_matured_chunk_offset(Iterator, TXRoot, TXPath, AbsoluteOffset, {noreply, State5} -> {noreply, State5}; StoreID5 -> - case is_chunk_cache_full() of + case is_chunk_cache_full(StoreID5) of true -> gen_server:cast(self(), {process_disk_pool_chunk_offsets, Iterator, false, Args}), @@ -3379,7 +3162,7 @@ process_disk_pool_matured_chunk_offset(Iterator, TXRoot, TXPath, AbsoluteOffset, gen_server:cast(self(), process_disk_pool_item), {noreply, deregister_currently_processed_disk_pool_key(Key, State)}; {ok, {Chunk, DataPath}} -> - increment_chunk_cache_size(), + increment_chunk_cache_size(StoreID6), Args2 = {DataRoot, AbsoluteOffset, TXPath, TXRoot, DataPath, unpacked, Offset, ChunkSize, Chunk, Chunk, none, none}, gen_server:cast(name(StoreID6), {pack_and_store_chunk, Args2}), @@ -3458,6 +3241,7 @@ cache_recently_processed_offset(Offset, ChunkDataKey, State) -> State#sync_data_state{ recently_processed_disk_pool_offsets = Map2 }. 
process_unpacked_chunk(ChunkArgs, Args, State) -> + #sync_data_state{ store_id = StoreID } = State, {_AbsoluteTXStartOffset, _TXSize, _DataPath, _TXPath, _DataRoot, _Chunk, ChunkID, _ChunkEndOffset, Peer, Byte} = Args, {_Packing, Chunk, AbsoluteEndOffset, _TXRoot, ChunkSize} = ChunkArgs, @@ -3465,7 +3249,7 @@ process_unpacked_chunk(ChunkArgs, Args, State) -> false -> ?LOG_DEBUG([{event, invalid_unpacked_fetched_chunk}, {absolute_end_offset, AbsoluteEndOffset}]), - decrement_chunk_cache_size(), + decrement_chunk_cache_size(StoreID), process_invalid_fetched_chunk(Peer, Byte, State); true -> process_valid_fetched_chunk(ChunkArgs, Args, State) @@ -3547,9 +3331,16 @@ data_root_index_next({Index, Count}, _Limit) -> end. record_chunk_cache_size_metric() -> - case ets:lookup(ar_data_sync_state, chunk_cache_size) of - [{_, Size}] -> - prometheus_gauge:set(chunk_cache_size, Size); - _ -> - ok - end. + {ok, Config} = application:get_env(arweave, config), + Size = lists:foldl( + fun(StoreID, Acc) -> + case ets:lookup(ar_data_sync_state, {chunk_cache_size, StoreID}) of + [{_, V}] -> + Acc + V; + _ -> + Acc + end + end, + 0, + ["default" | [ar_storage_module:id(M) || M <- Config#config.storage_modules]]), + prometheus_gauge:set(chunk_cache_size, Size). diff --git a/apps/arweave/src/ar_data_sync_storage.erl b/apps/arweave/src/ar_data_sync_storage.erl new file mode 100644 index 000000000..d83b68a60 --- /dev/null +++ b/apps/arweave/src/ar_data_sync_storage.erl @@ -0,0 +1,361 @@ +-module(ar_data_sync_storage). + +-behaviour(gen_server). + +-export([start_link/2, name/1, update_chunks_index/1]). + +-export([init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2]). + +-include_lib("arweave/include/ar.hrl"). +-include_lib("arweave/include/ar_consensus.hrl"). +-include_lib("arweave/include/ar_config.hrl"). +-include_lib("arweave/include/ar_data_sync.hrl"). + +%% Let at least this many chunks stack up, per storage module, then write them +%% on disk in the ascending order, to reduce out-of-order disk writes causing +%% fragmentation. +-ifdef(DEBUG). +-define(STORE_CHUNK_QUEUE_FLUSH_SIZE_THRESHOLD, 2). +-else. +-define(STORE_CHUNK_QUEUE_FLUSH_SIZE_THRESHOLD, 100). % ~ 25 MB worth of chunks. +-endif. + +%% If a chunk spends longer than this in the store queue, write it on disk +%% without waiting for ?STORE_CHUNK_QUEUE_FLUSH_SIZE_THRESHOLD chunks +%% to stack up. +-ifdef(DEBUG). +-define(STORE_CHUNK_QUEUE_FLUSH_TIME_THRESHOLD, 1000). +-else. +-define(STORE_CHUNK_QUEUE_FLUSH_TIME_THRESHOLD, 2_000). % 2 seconds. +-endif. + +-record(state, { + store_id, + chunks_index_db, + chunk_data_db, + packing_map = #{}, + %% The priority queue of chunks sorted by offset. The motivation is to have + %% chunks stack up, per storage module, before writing them on disk so that + %% we can write them in the ascending order and reduce out-of-order disk + %% writes causing fragmentation. + store_chunk_queue = gb_sets:new(), + %% The length of the store chunk queue. + store_chunk_queue_len = 0, + %% The threshold controlling the brief accumulation of the chunks in the + %% queue before the actual disk dump, to reduce the chance of out-of-order + %% write causing disk fragmentation. + store_chunk_queue_threshold = ?STORE_CHUNK_QUEUE_FLUSH_SIZE_THRESHOLD +}). + +%%%=================================================================== +%%% Public interface. +%%%=================================================================== + +name(StoreID) -> + list_to_atom("ar_data_sync_writer_" + ++ ar_storage_module:label_by_id(StoreID)). 
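The registered name above is how ar_data_sync reaches the per-store writer. A rough sketch of the hand-off protocol this module implements follows; the function and variable names are illustrative only, while the message shapes mirror pack_and_store_chunk/2 and the handlers defined further down.

    %% 1) Announce the in-flight packing job so the writer can match the packed
    %%    chunk later, and arm a 10-minute expiry that frees the cache slot if
    %%    the packed chunk never arrives. (Hypothetical helper, not in the patch.)
    promise_packed_chunk(StoreID, AbsoluteOffset, RequiredPacking, PromiseArgs) ->
        Writer = ar_data_sync_storage:name(StoreID),
        Key = {AbsoluteOffset, RequiredPacking},
        gen_server:cast(Writer, {packed_chunk_promise, Key, PromiseArgs}),
        ar_util:cast_after(600000, Writer, {expire_repack_chunk_request, Key}).
    %% 2) ar_packing_server later sends {chunk, {packed, AbsoluteOffset, ChunkArgs}}
    %%    to the writer, which pops the promise and queues the chunk for storage.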
+ +start_link(Name, StoreID) -> + gen_server:start_link({local, Name}, ?MODULE, StoreID, []). + +%%%=================================================================== +%%% Generic server callbacks. +%%%=================================================================== + +init(StoreID) -> + ?LOG_INFO([{event, ar_data_sync_storage_start}, {store_id, StoreID}]), + %% Trap exit to avoid corrupting any open files on quit. + process_flag(trap_exit, true), + gen_server:cast(self(), process_store_chunk_queue), + {ok, #state{ + store_id = StoreID, + %% Initialized in ar_data_sync:init_kv. + chunks_index_db = {chunks_index, StoreID}, + chunk_data_db = {chunk_data_db, StoreID} + }}. + +handle_cast({expire_repack_chunk_request, Key}, State) -> + #state{ store_id = StoreID, packing_map = PackingMap } = State, + case maps:get(Key, PackingMap, not_found) of + {pack_chunk, {_, DataPath, Offset, DataRoot, _, _, _}} -> + ar_data_sync:decrement_chunk_cache_size(StoreID), + DataPathHash = crypto:hash(sha256, DataPath), + ?LOG_DEBUG([{event, expired_repack_chunk_request}, + {data_path_hash, ar_util:encode(DataPathHash)}, + {data_root, ar_util:encode(DataRoot)}, + {relative_offset, Offset}]), + {noreply, State#state{ + packing_map = maps:remove(Key, PackingMap) }}; + _ -> + {noreply, State} + end; + +handle_cast({packed_chunk_promise, Key, Args}, State) -> + #state{ packing_map = Map, store_id = StoreID } = State, + case maps:is_key(Key, Map) of + true -> + %% Free up the cache for the duplicate. + ar_data_sync:decrement_chunk_cache_size(StoreID), + {noreply, State}; + false -> + Map2 = maps:put(Key, Args, Map), + {noreply, State#state{ packing_map = Map2 }} + end; + +handle_cast(process_store_chunk_queue, State) -> + ar_util:cast_after(200, self(), process_store_chunk_queue), + {noreply, process_store_chunk_queue(State)}; + +handle_cast({store_chunk, ChunkArgs, Args}, State) -> + {noreply, store_chunk(ChunkArgs, Args, State)}; + +handle_cast(Cast, State) -> + ?LOG_WARNING([{event, unhandled_cast}, {module, ?MODULE}, {cast, Cast}]), + {noreply, State}. + +handle_call(Request, _From, State) -> + ?LOG_WARNING([{event, unhandled_call}, {module, ?MODULE}, {request, Request}]), + {reply, ok, State}. + +handle_info({chunk, {packed, Offset, ChunkArgs}}, State) -> + #state{ packing_map = PackingMap } = State, + Packing = element(1, ChunkArgs), + Key = {Offset, Packing}, + case maps:get(Key, PackingMap, not_found) of + {pack_chunk, Args} when element(1, Args) == Packing -> + State2 = State#state{ packing_map = maps:remove(Key, PackingMap) }, + {noreply, store_chunk(ChunkArgs, Args, State2)}; + _ -> + %% Should be a duplicate - no need to do anything as we + %% clean up the cache earlier in the packed_chunk_promise handler. + {noreply, State} + end; + +handle_info({chunk, _}, State) -> + {noreply, State}; + +handle_info(Message, #state{ store_id = StoreID } = State) -> + ?LOG_WARNING([{event, unhandled_info}, {module, ?MODULE}, + {store_id, StoreID}, {message, Message}]), + {noreply, State}. + +terminate(Reason, #state{ store_id = StoreID }) -> + ?LOG_INFO([{event, terminate}, {module, ?MODULE}, + {store_id, StoreID}, {reason, io_lib:format("~p", [Reason])}]). + +%%%=================================================================== +%%% Private functions. 
+%%%=================================================================== + +process_store_chunk_queue(#state{ store_chunk_queue_len = 0 } = State, StartLen) -> + log_stored_chunks(State, StartLen), + State; +process_store_chunk_queue(State, StartLen) -> + #state{ store_chunk_queue = Q, store_chunk_queue_len = Len, + store_chunk_queue_threshold = Threshold, + store_id = StoreID } = State, + Timestamp = element(2, gb_sets:smallest(Q)), + Now = os:system_time(millisecond), + Threshold2 = + case Threshold < ?STORE_CHUNK_QUEUE_FLUSH_SIZE_THRESHOLD of + true -> + Threshold; + false -> + case Len > Threshold of + true -> + 0; + false -> + Threshold + end + end, + case Len > Threshold2 + orelse Now - Timestamp > ?STORE_CHUNK_QUEUE_FLUSH_TIME_THRESHOLD of + true -> + {{_Offset, _Timestamp, _Ref, ChunkArgs, Args}, Q2} = gb_sets:take_smallest(Q), + store_chunk2(ChunkArgs, Args, State), + ar_data_sync:decrement_chunk_cache_size(StoreID), + State2 = State#state{ store_chunk_queue = Q2, + store_chunk_queue_len = Len - 1, + store_chunk_queue_threshold = min(Threshold2 + 1, + ?STORE_CHUNK_QUEUE_FLUSH_SIZE_THRESHOLD) }, + process_store_chunk_queue(State2); + false -> + log_stored_chunks(State, StartLen), + State + end. + +log_stored_chunks(State, StartLen) -> + #state{ store_chunk_queue_len = EndLen, store_id = StoreID } = State, + StoredCount = StartLen - EndLen, + case StoredCount > 0 of + true -> + ?LOG_DEBUG([{event, stored_chunks}, {count, StoredCount}, + {store_id, StoreID}]); + false -> + ok + end. + +process_store_chunk_queue(#state{ store_chunk_queue_len = StartLen } = State) -> + process_store_chunk_queue(State, StartLen). + +store_chunk(ChunkArgs, Args, State) -> + %% Let at least N chunks stack up, then write them in the ascending order, + %% to reduce out-of-order disk writes causing fragmentation. + #state{ store_chunk_queue = Q, store_chunk_queue_len = Len } = State, + Now = os:system_time(millisecond), + Offset = element(3, ChunkArgs), + Q2 = gb_sets:add_element({Offset, Now, make_ref(), ChunkArgs, Args}, Q), + State2 = State#state{ store_chunk_queue = Q2, + store_chunk_queue_len = Len + 1 }, + process_store_chunk_queue(State2). 
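A condensed view of the flush policy implemented by process_store_chunk_queue/2 and store_chunk/3 above. The helper should_flush/4 is hypothetical and not part of the module; the thresholds referenced are this module's non-debug values (100 chunks, roughly 25 MB, and 2 seconds).

    %% Flush when the queue outgrows the adaptive threshold, or when the
    %% lowest-offset entry (gb_sets:smallest/1 orders by offset first) has been
    %% queued for longer than ?STORE_CHUNK_QUEUE_FLUSH_TIME_THRESHOLD.
    should_flush(Len, Threshold, QueuedAtMs, NowMs) ->
        Len > Threshold
            orelse NowMs - QueuedAtMs > ?STORE_CHUNK_QUEUE_FLUSH_TIME_THRESHOLD.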
+ +store_chunk2(ChunkArgs, Args, State) -> + #state{ store_id = StoreID, chunks_index_db = ChunksIndexDB } = State, + {Packing, Chunk, AbsoluteOffset, TXRoot, ChunkSize} = ChunkArgs, + {_Packing, DataPath, Offset, DataRoot, TXPath, + OriginStoreID, OriginChunkDataKey} = Args, + PaddedOffset = ar_data_sync:get_chunk_padded_offset(AbsoluteOffset), + UnpaddedStartOffset = AbsoluteOffset - ChunkSize, + StartOffset = ar_data_sync:get_chunk_padded_offset(UnpaddedStartOffset), + DataPathHash = crypto:hash(sha256, DataPath), + case ar_sync_record:delete(PaddedOffset, StartOffset, + ar_data_sync, StoreID) of + {error, Reason} -> + log_failed_to_store_chunk(Reason, AbsoluteOffset, Offset, + DataRoot, DataPathHash, StoreID), + {error, Reason}; + ok -> + DataPathHash = crypto:hash(sha256, DataPath), + ChunkDataKey = + case StoreID == OriginStoreID of + true -> + OriginChunkDataKey; + _ -> + ar_data_sync:get_chunk_data_key(DataPathHash) + end, + case write_chunk(AbsoluteOffset, ChunkDataKey, Chunk, + ChunkSize, DataPath, Packing, State) of + ok -> + case update_chunks_index({AbsoluteOffset, Offset, + ChunkDataKey, TXRoot, DataRoot, TXPath, ChunkSize, + Packing, StoreID, ChunksIndexDB}) of + ok -> + ok; + {error, Reason} -> + log_failed_to_store_chunk(Reason, AbsoluteOffset, + Offset, DataRoot, DataPathHash, StoreID), + {error, Reason} + end; + {error, Reason} -> + log_failed_to_store_chunk(Reason, AbsoluteOffset, + Offset, DataRoot, DataPathHash, StoreID), + {error, Reason} + end + end. + +log_failed_to_store_chunk(Reason, AbsoluteOffset, Offset, + DataRoot, DataPathHash, StoreID) -> + ?LOG_ERROR([{event, failed_to_store_chunk}, + {reason, io_lib:format("~p", [Reason])}, + {absolute_end_offset, AbsoluteOffset, Offset}, + {relative_offset, Offset}, + {data_path_hash, ar_util:encode(DataPathHash)}, + {data_root, ar_util:encode(DataRoot)}, + {store_id, StoreID}]). + +write_chunk(Offset, ChunkDataKey, Chunk, ChunkSize, DataPath, Packing, State) -> + case ar_tx_blacklist:is_byte_blacklisted(Offset) of + true -> + ok; + false -> + Args = {Offset, ChunkDataKey, Chunk, ChunkSize, DataPath, Packing, + State}, + write_not_blacklisted_chunk(Args) + end. + +write_not_blacklisted_chunk(Args) -> + {Offset, ChunkDataKey, Chunk, ChunkSize, DataPath, Packing, State} = Args, + #state{ chunk_data_db = ChunkDataDB, store_id = StoreID } = State, + ShouldStoreInChunkStorage + = should_store_in_chunk_storage(Offset, ChunkSize, Packing), + Result = + case ShouldStoreInChunkStorage of + true -> + PaddedOffset = ar_data_sync:get_chunk_padded_offset(Offset), + ar_chunk_storage:put(PaddedOffset, Chunk, StoreID); + false -> + ok + end, + case Result of + ok -> + case ShouldStoreInChunkStorage of + false -> + ar_kv:put(ChunkDataDB, ChunkDataKey, + term_to_binary({Chunk, DataPath})); + true -> + ar_kv:put(ChunkDataDB, ChunkDataKey, + term_to_binary(DataPath)) + end; + _ -> + Result + end. + +%% @doc 256 KiB chunks are stored in the blob storage optimized for read speed. +%% Return true if we want to place the chunk there. +should_store_in_chunk_storage(Offset, ChunkSize, Packing) -> + case Offset > ?STRICT_DATA_SPLIT_THRESHOLD of + true -> + %% All chunks above ?STRICT_DATA_SPLIT_THRESHOLD are placed in + %% 256 KiB buckets so technically can be stored in ar_chunk_storage. + %% However, to avoid managing padding in ar_chunk_storage for + %% unpacked chunks smaller than 256 KiB + %% (we do not need fast random access to unpacked chunks after + %% ?STRICT_DATA_SPLIT_THRESHOLD anyways), we put them to RocksDB. 
+ Packing /= unpacked orelse ChunkSize == (?DATA_CHUNK_SIZE); + false -> + ChunkSize == (?DATA_CHUNK_SIZE) + end. + +update_chunks_index(Args) -> + AbsoluteChunkOffset = element(1, Args), + case ar_tx_blacklist:is_byte_blacklisted(AbsoluteChunkOffset) of + true -> + ok; + false -> + update_chunks_index2(Args) + end. + +update_chunks_index2(Args) -> + {AbsoluteOffset, Offset, ChunkDataKey, TXRoot, DataRoot, TXPath, ChunkSize, + Packing, StoreID, ChunksIndexDB} = Args, + Key = << AbsoluteOffset:?OFFSET_KEY_BITSIZE >>, + Value = {ChunkDataKey, TXRoot, DataRoot, TXPath, Offset, ChunkSize}, + case ar_kv:put(ChunksIndexDB, Key, term_to_binary(Value)) of + ok -> + UnpaddedStartOffset = AbsoluteOffset - ChunkSize, + StartOffset + = ar_data_sync:get_chunk_padded_offset(UnpaddedStartOffset), + PaddedOffset + = ar_data_sync:get_chunk_padded_offset(AbsoluteOffset), + case ar_sync_record:add(PaddedOffset, StartOffset, Packing, + ar_data_sync, StoreID) of + ok -> + ok; + {error, Reason} -> + ?LOG_ERROR([{event, failed_to_update_sync_record}, + {reason, io_lib:format("~p", [Reason])}, + {chunk, ar_util:encode(ChunkDataKey)}, + {absolute_end_offset, AbsoluteOffset}, + {data_root, ar_util:encode(DataRoot)}, + {store_id, StoreID}]), + {error, Reason} + end; + {error, Reason} -> + ?LOG_ERROR([{event, failed_to_update_chunk_index}, + {reason, io_lib:format("~p", [Reason])}, + {chunk_data_key, ar_util:encode(ChunkDataKey)}, + {data_root, ar_util:encode(DataRoot)}, + {absolute_end_offset, AbsoluteOffset}, + {store_id, StoreID}]), + {error, Reason} + end. diff --git a/apps/arweave/src/ar_data_sync_sup.erl b/apps/arweave/src/ar_data_sync_sup.erl index f75f5374b..42aa2ddee 100644 --- a/apps/arweave/src/ar_data_sync_sup.erl +++ b/apps/arweave/src/ar_data_sync_sup.erl @@ -22,14 +22,26 @@ start_link() -> init([]) -> {ok, Config} = application:get_env(arweave, config), + StorageThreads = lists:map( + fun(StorageModule) -> + StoreID = ar_storage_module:id(StorageModule), + StoreLabel = ar_storage_module:label(StorageModule), + Name = list_to_atom("ar_data_sync_writer_" ++ StoreLabel), + ?CHILD_WITH_ARGS(ar_data_sync_storage, worker, + Name, [Name, StoreID]) + end, + Config#config.storage_modules + ), SyncWorkers = case ar_data_sync_worker_master:is_syncing_enabled() of true -> + StoreIDs = ["default" | [ar_storage_module:id(M) + || M <- Config#config.storage_modules]], Workers = lists:map( fun(Number) -> Name = list_to_atom("ar_data_sync_worker_" ++ integer_to_list(Number)), ?CHILD_WITH_ARGS(ar_data_sync_worker, worker, Name, [Name]) end, - lists:seq(1, Config#config.sync_jobs) + lists:seq(1, Config#config.sync_jobs * length(StoreIDs)) ), SyncWorkerNames = [element(1, El) || El <- Workers], SyncWorkerMaster = ?CHILD_WITH_ARGS( @@ -58,6 +70,7 @@ init([]) -> end, Config#config.repack_in_place_storage_modules ), - Children = SyncWorkers ++ StorageModuleWorkers ++ [DefaultStorageModuleWorker] + Children = StorageThreads ++ SyncWorkers + ++ StorageModuleWorkers ++ [DefaultStorageModuleWorker] ++ RepackInPlaceWorkers, {ok, {{one_for_one, 5, 10}, Children}}. 
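To make the sizing of the supervision tree above concrete, a back-of-the-envelope sketch with hypothetical numbers (4 configured storage modules, sync_jobs = 10); the counts follow directly from the init/1 code above.

    %% Hypothetical sizing (4 storage modules, sync_jobs = 10):
    %%   writers      = 4              (one ar_data_sync_storage per storage module)
    %%   store ids    = 4 + 1 = 5      ("default" is included in StoreIDs)
    %%   sync workers = 10 * 5 = 50    (later split by ar_data_sync_worker_master
    %%                                  into 5 per-store queues of 10 workers each)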
diff --git a/apps/arweave/src/ar_data_sync_worker.erl b/apps/arweave/src/ar_data_sync_worker.erl index 1302d4b8a..d31b76d1d 100644 --- a/apps/arweave/src/ar_data_sync_worker.erl +++ b/apps/arweave/src/ar_data_sync_worker.erl @@ -88,7 +88,7 @@ terminate(Reason, _State) -> read_range({Start, End, _OriginStoreID, _TargetStoreID, _SkipSmall}) when Start >= End -> ok; read_range({Start, End, _OriginStoreID, TargetStoreID, _SkipSmall} = Args) -> - case ar_data_sync:is_chunk_cache_full() of + case ar_data_sync:is_chunk_cache_full(TargetStoreID) of false -> case ar_data_sync:is_disk_space_sufficient(TargetStoreID) of true -> @@ -159,7 +159,8 @@ read_range2(MessagesRemaining, {Start, End, OriginStoreID, TargetStoreID, SkipSm case ar_sync_record:is_recorded(AbsoluteOffset, ar_data_sync, OriginStoreID) of {true, Packing} -> - ar_data_sync:increment_chunk_cache_size(), + ar_data_sync:increment_chunk_cache_size( + TargetStoreID), UnpackedChunk = case Packing of unpacked -> @@ -211,7 +212,7 @@ sync_range({Start, End, Peer, _TargetStoreID, 0}, _State) -> {error, timeout}; sync_range({Start, End, Peer, TargetStoreID, RetryCount} = Args, State) -> IsChunkCacheFull = - case ar_data_sync:is_chunk_cache_full() of + case ar_data_sync:is_chunk_cache_full(TargetStoreID) of true -> ar_util:cast_after(500, self(), {sync_range, Args}), true; @@ -251,7 +252,8 @@ sync_range({Start, End, Peer, TargetStoreID, RetryCount} = Args, State) -> Label = ar_storage_module:label_by_id(TargetStoreID), gen_server:cast(list_to_atom("ar_data_sync_" ++ Label), {store_fetched_chunk, Peer, Start2 - 1, Proof}), - ar_data_sync:increment_chunk_cache_size(), + ar_data_sync:increment_chunk_cache_size( + TargetStoreID), sync_range({Start3, End, Peer, TargetStoreID, RetryCount}, State); {error, timeout} -> ?LOG_DEBUG([{event, timeout_fetching_chunk}, diff --git a/apps/arweave/src/ar_data_sync_worker_master.erl b/apps/arweave/src/ar_data_sync_worker_master.erl index 14ca75782..387be3c99 100644 --- a/apps/arweave/src/ar_data_sync_worker_master.erl +++ b/apps/arweave/src/ar_data_sync_worker_master.erl @@ -4,7 +4,7 @@ -behaviour(gen_server). --export([start_link/1, is_syncing_enabled/0, ready_for_work/0, read_range/5]). +-export([start_link/1, is_syncing_enabled/0, ready_for_work/1, read_range/5]). -export([init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2]). @@ -33,8 +33,10 @@ task_queue_len = 0, queued_task_count = 0, %% includes tasks queued in the main queue and in peer queues scheduled_task_count = 0, - workers = queue:new(), + scheduled_task_count_by_store_id = #{}, + workers = #{}, worker_count = 0, + store_count = 0, worker_loads = #{}, peer_tasks = #{} }). @@ -52,21 +54,22 @@ is_syncing_enabled() -> {ok, Config} = application:get_env(arweave, config), Config#config.sync_jobs > 0. -%% @doc Returns true if we can accept new tasks. Will always return false if syncing is -%% disabled (i.e. sync_jobs = 0). -ready_for_work() -> +%% @doc Returns true if we can accept new tasks for the given StoreID. +%% Will always return false if syncing is disabled (i.e. sync_jobs = 0). +ready_for_work(StoreID) -> try - gen_server:call(?MODULE, ready_for_work, 1000) + gen_server:call(?MODULE, {ready_for_work, StoreID}, 1000) catch exit:{timeout,_} -> false end. 
read_range(Start, End, OriginStoreID, TargetStoreID, SkipSmall) -> - case ar_data_sync_worker_master:ready_for_work() of + case ar_data_sync_worker_master:ready_for_work(TargetStoreID) of true -> gen_server:cast(?MODULE, - {read_range, {Start, End, OriginStoreID, TargetStoreID, SkipSmall}}), + {read_range, + {Start, End, OriginStoreID, TargetStoreID, SkipSmall}}), true; false -> false @@ -79,14 +82,34 @@ init(Workers) -> gen_server:cast(?MODULE, process_main_queue), ar_util:cast_after(?REBALANCE_FREQUENCY_MS, ?MODULE, rebalance_peers), + {ok, Config} = application:get_env(arweave, config), + StoreIDs = ["default" | [ar_storage_module:id(M) || M <- Config#config.storage_modules]], + Count = length(Workers), + CountByStoreID = Count div length(StoreIDs), {ok, #state{ - workers = queue:from_list(Workers), - worker_count = length(Workers) + workers = distribute_workers(CountByStoreID, Workers, StoreIDs, #{}), + worker_count = Count, + store_count = length(StoreIDs) }}. -handle_call(ready_for_work, _From, State) -> - TotalTaskCount = State#state.scheduled_task_count + State#state.queued_task_count, - ReadyForWork = TotalTaskCount < max_tasks(State#state.worker_count), +distribute_workers(_N, [], _StoreIDs, Map) -> + Map; +distribute_workers(N, Workers, [StoreID | StoreIDs], Map) -> + {L, Rest} = lists:split(N, Workers), + distribute_workers(N, Rest, StoreIDs, maps:put(StoreID, queue:from_list(L), Map)). + +handle_call({ready_for_work, StoreID}, _From, State) -> + #state{ store_count = StoreCount, + scheduled_task_count = ScheduledTaskCount, + queued_task_count = QueuedTaskCount, + scheduled_task_count_by_store_id = ScheduledTaskCountByStoreID + } = State, + TotalTaskCount = ScheduledTaskCount + QueuedTaskCount, + MaxTasksTotal = max_tasks(State#state.worker_count), + TaskCountByStoreID = maps:get(StoreID, ScheduledTaskCountByStoreID, 0), + MaxTasksByStoreID = MaxTasksTotal div StoreCount, + ReadyForWork = TotalTaskCount < MaxTasksTotal + andalso TaskCountByStoreID < MaxTasksByStoreID, {reply, ReadyForWork, State}; handle_call(Request, _From, State) -> @@ -110,14 +133,17 @@ handle_cast({sync_range, _Args}, #state{ worker_count = 0 } = State) -> handle_cast({sync_range, Args}, State) -> {noreply, enqueue_main_task(sync_range, Args, State)}; -handle_cast({task_completed, {read_range, {Worker, _, _}}}, State) -> - State2 = update_scheduled_task_count(Worker, read_range, "localhost", -1, State), +handle_cast({task_completed, {read_range, {Worker, _, Args}}}, State) -> + TargetStoreID = element(4, Args), + State2 = update_scheduled_task_count(Worker, + read_range, "localhost", -1, TargetStoreID, State), {noreply, State2}; handle_cast({task_completed, {sync_range, {Worker, Result, Args, ElapsedNative}}}, State) -> - {Start, End, Peer, _, _} = Args, + {Start, End, Peer, TargetStoreID, _} = Args, DataSize = End - Start, - State2 = update_scheduled_task_count(Worker, sync_range, ar_util:format_peer(Peer), -1, State), + State2 = update_scheduled_task_count(Worker, + sync_range, ar_util:format_peer(Peer), -1, TargetStoreID, State), PeerTasks = get_peer_tasks(Peer, State2), {PeerTasks2, State3} = complete_sync_range(PeerTasks, Result, ElapsedNative, DataSize, State2), {PeerTasks3, State4} = process_peer_queue(PeerTasks2, State3), @@ -214,7 +240,7 @@ process_peer_queue(PeerTasks, State) -> %% @doc the maximum number of tasks we can have in process - including stasks queued here as well %% as those scheduled on ar_data_sync_workers. max_tasks(WorkerCount) -> - WorkerCount * 50. 
+ WorkerCount * 5. %% @doc The maximum number of tasks we can have queued for a given peer. max_peer_queue(_Peformance, 0, _WorkerCount) -> @@ -310,11 +336,13 @@ schedule_read_range(Args, State) -> %% @doc Schedule a task (either sync_range or read_range) to be run on a worker. schedule_task(Task, Args, State) -> - {Worker, State2} = get_worker(State), + StoreID = element(4, Args), + {Worker, State2} = get_worker(StoreID, State), gen_server:cast(Worker, {Task, Args}), FormattedPeer = format_peer(Task, Args), - State3 = update_scheduled_task_count(Worker, Task, FormattedPeer, 1, State2), + State3 = update_scheduled_task_count(Worker, + Task, FormattedPeer, 1, StoreID, State2), update_queued_task_count(Task, FormattedPeer, -1, State3). %%-------------------------------------------------------------------- @@ -427,12 +455,20 @@ update_active(PeerTasks, Performance, TotalMaxActive, TargetLatency, State) -> update_queued_task_count(Task, FormattedPeer, N, State) -> prometheus_gauge:inc(sync_tasks, [queued, Task, FormattedPeer], N), State#state{ queued_task_count = State#state.queued_task_count + N }. -update_scheduled_task_count(Worker, Task, FormattedPeer, N, State) -> + +update_scheduled_task_count(Worker, Task, FormattedPeer, N, StoreID, State) -> + #state{ scheduled_task_count = ScheduledTaskCount, + scheduled_task_count_by_store_id = ScheduledTaskCountByStoreID, + worker_loads = WorkerLoads + } = State, prometheus_gauge:inc(sync_tasks, [scheduled, Task, FormattedPeer], N), - Load = maps:get(Worker, State#state.worker_loads, 0) + N, + Load = maps:get(Worker, WorkerLoads, 0) + N, State2 = State#state{ - scheduled_task_count = State#state.scheduled_task_count + N, - worker_loads = maps:put(Worker, Load, State#state.worker_loads) + scheduled_task_count = ScheduledTaskCount + N, + scheduled_task_count_by_store_id = + maps:update_with(StoreID, fun(M) -> M + N end, + N, ScheduledTaskCountByStoreID), + worker_loads = maps:put(Worker, Load, WorkerLoads) }, State2. @@ -444,20 +480,13 @@ set_peer_tasks(PeerTasks, State) -> maps:put(PeerTasks#peer_tasks.peer, PeerTasks, State#state.peer_tasks) }. -get_worker(State) -> - AverageLoad = State#state.scheduled_task_count / State#state.worker_count, - cycle_workers(AverageLoad, State). - -cycle_workers(AverageLoad, #state{ workers = Workers, worker_loads = WorkerLoads} = State) -> +get_worker(StoreID, #state{ workers = WorkersByStoreID } = State) -> + Workers = maps:get(StoreID, WorkersByStoreID), {{value, Worker}, Workers2} = queue:out(Workers), - State2 = State#state{ workers = queue:in(Worker, Workers2) }, - Load = maps:get(Worker, WorkerLoads, 0), - case Load =< AverageLoad of - true -> - {Worker, State2}; - false -> - cycle_workers(AverageLoad, State2) - end. + Workers3 = queue:in(Worker, Workers2), + WorkersByStoreID2 = maps:put(StoreID, Workers3, WorkersByStoreID), + State2 = State#state{ workers = WorkersByStoreID2 }, + {Worker, State2}. 
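A hypothetical call (worker and store names invented) illustrating how distribute_workers/4 and the new get_worker/2 above behave together after this change.

    %% Six workers split evenly across two StoreIDs, three per store:
    Workers = distribute_workers(3, [w1, w2, w3, w4, w5, w6], ["default", "sm_1"], #{}),
    %% => #{"default" => queue of [w1, w2, w3], "sm_1" => queue of [w4, w5, w6]}
    %% get_worker/2 no longer inspects worker load; it simply rotates the queue
    %% for the requested StoreID, so successive calls for "sm_1" hand out
    %% w4, w5, w6, w4, ... in round-robin order.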
format_peer(Task, Args) -> case Task of @@ -498,20 +527,20 @@ test_counters() -> ?assertEqual(0, State#state.scheduled_task_count), ?assertEqual(0, maps:get("worker1", State#state.worker_loads, 0)), ?assertEqual(0, State#state.queued_task_count), - State2 = update_scheduled_task_count("worker1", sync_range, "localhost", 10, State), + State2 = update_scheduled_task_count("worker1", sync_range, "localhost", 10, 1, State), State3 = update_queued_task_count(sync_range, "localhost", 10, State2), ?assertEqual(10, State3#state.scheduled_task_count), ?assertEqual(10, maps:get("worker1", State3#state.worker_loads, 0)), ?assertEqual(10, State3#state.queued_task_count), - State4 = update_scheduled_task_count("worker1", sync_range, "localhost", -1, State3), + State4 = update_scheduled_task_count("worker1", sync_range, "localhost", -1, 1, State3), State5 = update_queued_task_count(sync_range, "localhost", -1, State4), ?assertEqual(9, State5#state.scheduled_task_count), ?assertEqual(9, maps:get("worker1", State5#state.worker_loads, 0)), ?assertEqual(9, State5#state.queued_task_count), - State6 = update_scheduled_task_count("worker2", sync_range, "localhost", 1, State5), + State6 = update_scheduled_task_count("worker2", sync_range, "localhost", 1, 1, State5), ?assertEqual(10, State6#state.scheduled_task_count), ?assertEqual(1, maps:get("worker2", State6#state.worker_loads, 0)), - State7 = update_scheduled_task_count("worker1", sync_range, "1.2.3.4:1984", -1, State6), + State7 = update_scheduled_task_count("worker1", sync_range, "1.2.3.4:1984", -1, 1, State6), State8 = update_queued_task_count(sync_range, "1.2.3.4:1984", -1, State7), ?assertEqual(9, State8#state.scheduled_task_count), ?assertEqual(8, maps:get("worker1", State8#state.worker_loads, 0)), @@ -519,20 +548,21 @@ test_get_worker() -> State0 = #state{ - workers = queue:from_list([worker1, worker2, worker3]), + workers = #{ 1 => queue:from_list([worker1, worker2, worker3]) }, scheduled_task_count = 6, + scheduled_task_count_by_store_id = #{ 1 => 6 }, worker_count = 3, worker_loads = #{worker1 => 3, worker2 => 2, worker3 => 1} }, - %% get_worker will cycle the queue until it finds a worker that has a worker_load =< the - %% average load (i.e. scheduled_task_count / worker_count) + %% get_worker round-robins the queue of workers assigned to the given StoreID. - {worker2, State1} = get_worker(State0), - State2 = update_scheduled_task_count(worker2, sync_range, "localhost", 1, State1), - {worker3, State3} = get_worker(State2), - State4 = update_scheduled_task_count(worker3, sync_range, "localhost", 1, State3), - {worker3, State5} = get_worker(State4), - State6 = update_scheduled_task_count(worker3, sync_range, "localhost", 1, State5), - {worker1, _} = get_worker(State6). + {worker1, State1} = get_worker(1, State0), + State2 = update_scheduled_task_count(worker2, sync_range, "localhost", 1, 1, State1), + {worker2, State3} = get_worker(1, State2), + State4 = update_scheduled_task_count(worker3, sync_range, "localhost", 1, 1, State3), + {worker3, State5} = get_worker(1, State4), + State6 = update_scheduled_task_count(worker3, sync_range, "localhost", 1, 1, State5), + {worker1, _} = get_worker(1, State6). 
 
 test_format_peer() ->
 	?assertEqual("localhost", format_peer(read_range, {0, 100, 1, 2, true})),
@@ -545,7 +575,7 @@ test_enqueue_main_task() ->
 	StoreID1 = ar_storage_module:id({?PARTITION_SIZE, 1, default}),
 	StoreID2 = ar_storage_module:id({?PARTITION_SIZE, 2, default}),
 	State0 = #state{},
-	
+
 	State1 = enqueue_main_task(read_range, {0, 100, StoreID1, StoreID2, true}, State0),
 	State2 = enqueue_main_task(sync_range, {0, 100, Peer1, StoreID1}, State1),
 	State3 = push_main_task(sync_range, {100, 200, Peer2, StoreID2}, State2),
@@ -574,7 +604,7 @@ test_enqueue_peer_task() ->
 	PeerATasks = #peer_tasks{ peer = PeerA },
 	PeerBTasks = #peer_tasks{ peer = PeerB },
-	
+
 	PeerATasks1 = enqueue_peer_task(PeerATasks, sync_range, {0, 100, PeerA, StoreID1}),
 	PeerATasks2 = enqueue_peer_task(PeerATasks1, sync_range, {100, 200, PeerA, StoreID1}),
 	PeerBTasks1 = enqueue_peer_task(PeerBTasks, sync_range, {200, 300, PeerB, StoreID1}),
@@ -602,7 +632,9 @@ test_process_main_queue() ->
 	StoreID1 = ar_storage_module:id({?PARTITION_SIZE, 1, default}),
 	StoreID2 = ar_storage_module:id({?PARTITION_SIZE, 2, default}),
 	State0 = #state{
-		workers = queue:from_list([worker1, worker2, worker3]), worker_count = 3
+		workers = #{ StoreID1 => queue:from_list([worker1, worker2, worker3]),
+			StoreID2 => queue:from_list([worker4]) },
+		worker_count = 4
 	},
 
 	State1 = enqueue_main_task(read_range, {0, 100, StoreID1, StoreID2, true}, State0),
@@ -628,7 +660,8 @@ test_process_main_queue() ->
 	assert_main_queue([], State14),
 	?assertEqual(1, State14#state.queued_task_count),
 	?assertEqual(13, State14#state.scheduled_task_count),
-	?assertEqual([worker2, worker3, worker1], queue:to_list(State14#state.workers)),
+	?assertEqual([worker2, worker3, worker1],
+		queue:to_list(maps:get(StoreID1, State14#state.workers))),
 
 	PeerTasks = get_peer_tasks(Peer1, State14),
 	assert_peer_tasks(
@@ -661,7 +694,7 @@ test_cut_peer_queue() ->
 		queued_task_count = length(TaskQueue),
 		scheduled_task_count = 10
 	},
-	
+
 	{PeerTasks1, State1} = cut_peer_queue(200, PeerTasks, State),
 	assert_peer_tasks(TaskQueue, 0, 8, PeerTasks1),
 	?assertEqual(100, State1#state.queued_task_count),
@@ -690,7 +723,7 @@ test_update_active() ->
 		200,
 		#state{ worker_count = 20 }),
 	?assertEqual(11, Result1#peer_tasks.max_active),
-	
+
 	Result2 = update_active(
 		#peer_tasks{max_active = 10, active_count = 20, task_queue_len = 30},
 		#performance{average_latency = 300},
@@ -706,7 +739,7 @@ test_update_active() ->
 		200,
 		#state{ worker_count = 20 }),
 	?assertEqual(11, Result3#peer_tasks.max_active),
-	
+
 	Result4 = update_active(
 		#peer_tasks{max_active = 10, active_count = 20, task_queue_len = 30},
 		#performance{average_latency = 100},
@@ -714,7 +747,7 @@ test_update_active() ->
 		200,
 		#state{ worker_count = 10 }),
 	?assertEqual(10, Result4#peer_tasks.max_active),
-	
+
 	Result5 = update_active(
 		#peer_tasks{max_active = 10, active_count = 5, task_queue_len = 10},
 		#performance{average_latency = 100},
@@ -722,7 +755,7 @@ test_update_active() ->
 		200,
 		#state{ worker_count = 20 }),
 	?assertEqual(10, Result5#peer_tasks.max_active),
-	
+
 	Result6 = update_active(
 		#peer_tasks{max_active = 10, active_count = 10, task_queue_len = 5},
 		#performance{average_latency = 100},
@@ -750,7 +783,7 @@ test_calculate_targets() ->
 		"peer2" => #performance{current_rating = 0, average_latency = 0}
 	}),
 	?assertEqual({0.0, 0.0}, Result2),
-	
+
 	Result3 = calculate_targets(
 		["peer1", "peer2"],
 		#{
@@ -785,4 +818,4 @@ assert_peer_tasks(ExpectedQueue, ExpectedActiveCount, ExpectedMaxActive, PeerTas
 
 assert_task(ExpectedTask, ExpectedArgs, Task, Args) ->
 	?assertEqual(ExpectedTask, Task),
-	?assertEqual(ExpectedArgs, Args).
\ No newline at end of file
+	?assertEqual(ExpectedArgs, Args).
diff --git a/apps/arweave/src/ar_http_iface_middleware.erl b/apps/arweave/src/ar_http_iface_middleware.erl
index 8af797a77..13db49b0e 100644
--- a/apps/arweave/src/ar_http_iface_middleware.erl
+++ b/apps/arweave/src/ar_http_iface_middleware.erl
@@ -559,12 +559,18 @@ handle(<<"POST">>, [<<"wallet">>], Req, _Pid) ->
 	case check_internal_api_secret(Req) of
 		pass ->
 			WalletAccessCode = ar_util:encode(crypto:strong_rand_bytes(32)),
-			{_, Pub} = ar_wallet:new_keyfile(?DEFAULT_KEY_TYPE, WalletAccessCode),
-			ResponseProps = [
-				{<<"wallet_address">>, ar_util:encode(ar_wallet:to_address(Pub))},
-				{<<"wallet_access_code">>, WalletAccessCode}
-			],
-			{200, #{}, ar_serialize:jsonify({ResponseProps}), Req};
+			case ar_wallet:new_keyfile(?DEFAULT_KEY_TYPE, WalletAccessCode) of
+				{error, Reason} ->
+					?LOG_ERROR([{event, failed_to_create_new_wallet},
+							{reason, io_lib:format("~p", [Reason])}]),
+					{500, #{}, <<>>, Req};
+				{_, Pub} ->
+					ResponseProps = [
+						{<<"wallet_address">>, ar_util:encode(ar_wallet:to_address(Pub))},
+						{<<"wallet_access_code">>, WalletAccessCode}
+					],
+					{200, #{}, ar_serialize:jsonify({ResponseProps}), Req}
+			end;
 		{reject, {Status, Headers, Body}} ->
 			{Status, Headers, Body, Req}
 	end;
@@ -2426,6 +2432,7 @@ post_block(check_transactions_are_present, {BShadow, Peer}, Req, ReceiveTimestam
 			post_block(enqueue_block, {BShadow, Peer}, Req, ReceiveTimestamp)
 	end;
 post_block(enqueue_block, {B, Peer}, Req, ReceiveTimestamp) ->
+	try
 	B2 =
 		case B#block.height >= ar_fork:height_2_6() of
 			true ->
@@ -2453,6 +2460,12 @@ post_block(enqueue_block, {B, Peer}, Req, ReceiveTimestamp) ->
 				byte_size(term_to_binary(B)));
 		_ ->
 			ok
+	end
+	catch
+		error:Reason:Stacktrace ->
+			ID = binary_to_list(ar_util:encode(crypto:strong_rand_bytes(16))),
+			file:write_file("/opt/arweave/stacktrace" ++ ID, term_to_binary(Stacktrace)),
+			?LOG_ERROR("CAUGHT ~p ~p", [Reason, Stacktrace])
 	end,
 	{200, #{}, <<"OK">>, Req}.
diff --git a/apps/arweave/src/ar_wallet.erl b/apps/arweave/src/ar_wallet.erl
index b99ad85dd..452e05d72 100644
--- a/apps/arweave/src/ar_wallet.erl
+++ b/apps/arweave/src/ar_wallet.erl
@@ -105,9 +105,17 @@ new_keyfile(KeyType, WalletName) ->
 			{Pb, Prv, Ky}
 	end,
 	Filename = wallet_filepath(WalletName, Pub, KeyType),
-	filelib:ensure_dir(Filename),
-	ar_storage:write_file_atomic(Filename, Key),
-	{{KeyType, Priv, Pub}, {KeyType, Pub}}.
+	case filelib:ensure_dir(Filename) of
+		ok ->
+			case ar_storage:write_file_atomic(Filename, Key) of
+				ok ->
+					{{KeyType, Priv, Pub}, {KeyType, Pub}};
+				Error2 ->
+					Error2
+			end;
+		Error ->
+			Error
+	end.
 
 wallet_filepath(Wallet) ->
 	{ok, Config} = application:get_env(arweave, config),
diff --git a/apps/arweave/src/ar_weave.erl b/apps/arweave/src/ar_weave.erl
index 388822075..8c5e43f6b 100644
--- a/apps/arweave/src/ar_weave.erl
+++ b/apps/arweave/src/ar_weave.erl
@@ -31,7 +31,7 @@ init(WalletList, Diff) ->
 
 %% @doc Create a genesis block with the given accounts and difficulty.
 init(WalletList, Diff, GenesisDataSize) ->
-	Key = ar_wallet:new_keyfile(),
+	{{_, _, _}, {_, _}} = Key = ar_wallet:new_keyfile(),
 	TX = create_genesis_tx(Key, GenesisDataSize),
 	WalletList2 = WalletList ++ [{ar_wallet:to_address(Key), 0, TX#tx.id}],
 	TXs = [TX],