From 44a669d2091d019f409661c1ea7e0c7f00c5a03d Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Fri, 3 Jan 2025 14:36:07 +0000 Subject: [PATCH] Fetch fresh snapshot from peer when joining (#6700) Co-authored-by: Amaury Chamayou --- .github/workflows/ci.yml | 2 +- CMakeLists.txt | 12 +- doc/host_config_schema/cchost_config.json | 5 + include/ccf/ds/nonstd.h | 8 + include/ccf/node/startup_config.h | 10 + python/src/ccf/ledger.py | 24 ++ src/common/configuration.h | 6 + src/ds/test/nonstd.cpp | 16 + src/host/configuration.h | 21 +- src/host/main.cpp | 73 +++- src/host/test/ledger.cpp | 5 +- src/http/http_builder.h | 30 +- src/http/http_rpc_context.h | 23 +- src/http/http_session.h | 7 +- src/node/node_state.h | 2 +- src/node/rpc/node_frontend.h | 343 +++++++++++++++++- src/snapshots/fetch.h | 342 +++++++++++++++++ src/snapshots/filenames.h | 165 +++++++++ .../snapshot_manager.h} | 160 +------- tests/config.jinja | 3 +- tests/e2e_operations.py | 100 +++++ tests/infra/clients.py | 18 +- tests/infra/remote.py | 2 + tests/reconfiguration.py | 15 +- 24 files changed, 1176 insertions(+), 216 deletions(-) create mode 100644 src/snapshots/fetch.h create mode 100644 src/snapshots/filenames.h rename src/{host/snapshots.h => snapshots/snapshot_manager.h} (58%) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f33072174a35..96bc8c9a1328 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -114,7 +114,7 @@ jobs: # Build tools tdnf -y install build-essential clang cmake ninja-build which # Dependencies - tdnf -y install openssl-devel libuv-devel + tdnf -y install openssl-devel libuv-devel curl-devel # Test dependencies tdnf -y install libarrow-devel parquet-libs-devel lldb shell: bash diff --git a/CMakeLists.txt b/CMakeLists.txt index 3f81b61ea28c..415076a670d7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -255,8 +255,15 @@ elseif(COMPILE_TARGET STREQUAL "virtual") endif() target_link_libraries( - cchost PRIVATE uv ${TLS_LIBRARY} 
${CMAKE_DL_LIBS} ${CMAKE_THREAD_LIBS_INIT} - ${LINK_LIBCXX} ccfcrypto.host + cchost + PRIVATE uv + ${TLS_LIBRARY} + ${CMAKE_DL_LIBS} + ${CMAKE_THREAD_LIBS_INIT} + ${LINK_LIBCXX} + ccfcrypto.host + curl + http_parser.host ) install(TARGETS cchost DESTINATION bin) @@ -711,6 +718,7 @@ if(BUILD_TESTS) ${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/contiguous_set.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/unit_strings.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/dl_list.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/nonstd.cpp ) target_link_libraries(ds_test PRIVATE ${CMAKE_THREAD_LIBS_INIT}) diff --git a/doc/host_config_schema/cchost_config.json b/doc/host_config_schema/cchost_config.json index f457ccbac250..4b06681c18b1 100644 --- a/doc/host_config_schema/cchost_config.json +++ b/doc/host_config_schema/cchost_config.json @@ -396,6 +396,11 @@ "type": "boolean", "default": true, "description": "Whether to follow redirects to the primary node of the existing service to join" + }, + "fetch_recent_snapshot": { + "type": "boolean", + "default": true, + "description": "Whether to ask the target for a newer snapshot before joining. The node will ask the target what their latest snapshot is, and if that is later than what the node has locally, will fetch it via RPC before launching. 
Should generally only be turned off for specific test cases" } }, "required": ["target_rpc_address"], diff --git a/include/ccf/ds/nonstd.h b/include/ccf/ds/nonstd.h index 5788ce97deaf..c77148cfe6c2 100644 --- a/include/ccf/ds/nonstd.h +++ b/include/ccf/ds/nonstd.h @@ -185,6 +185,14 @@ namespace ccf::nonstd }); } + static inline std::string_view trim( + std::string_view s, std::string_view trim_chars = " \t\r\n") + { + const auto start = std::min(s.find_first_not_of(trim_chars), s.size()); + const auto end = std::min(s.find_last_not_of(trim_chars) + 1, s.size()); + return s.substr(start, end - start); + } + /// Iterate through tuple, calling functor on each element template static void tuple_for_each(const std::tuple& t, const F& f) diff --git a/include/ccf/node/startup_config.h b/include/ccf/node/startup_config.h index 4e51467c80ef..2045897c1be1 100644 --- a/include/ccf/node/startup_config.h +++ b/include/ccf/node/startup_config.h @@ -76,6 +76,16 @@ namespace ccf bool operator==(const Attestation&) const = default; }; Attestation attestation = {}; + + struct Snapshots + { + std::string directory = "snapshots"; + size_t tx_count = 10'000; + std::optional read_only_directory = std::nullopt; + + bool operator==(const Snapshots&) const = default; + }; + Snapshots snapshots = {}; }; struct StartupConfig : CCFConfig diff --git a/python/src/ccf/ledger.py b/python/src/ccf/ledger.py index e243c8abef78..ec837fd93cf0 100644 --- a/python/src/ccf/ledger.py +++ b/python/src/ccf/ledger.py @@ -126,6 +126,19 @@ def range_from_filename(filename: str) -> Tuple[int, Optional[int]]: raise ValueError(f"Could not read seqno range from ledger file {filename}") +def snapshot_index_from_filename(filename: str) -> Tuple[int, int]: + elements = ( + os.path.basename(filename) + .replace(COMMITTED_FILE_SUFFIX, "") + .replace("snapshot_", "") + .split("_") + ) + if len(elements) == 2: + return (int(elements[0]), int(elements[1])) + else: + raise ValueError(f"Could not read snapshot index from 
file name {filename}") + + class GcmHeader: view: int seqno: int @@ -851,6 +864,17 @@ def get_len(self) -> int: return self._file_size +def latest_snapshot(snapshots_dir): + best_name, best_seqno = None, None + for s in os.listdir(snapshots_dir): + with ccf.ledger.Snapshot(os.path.join(snapshots_dir, s)) as snapshot: + snapshot_seqno = snapshot.get_public_domain().get_seqno() + if best_seqno is None or snapshot_seqno > best_seqno: + best_name = s + best_seqno = snapshot_seqno + return best_name + + class LedgerChunk: """ Class used to parse and iterate over :py:class:`ccf.ledger.Transaction` in a CCF ledger chunk. diff --git a/src/common/configuration.h b/src/common/configuration.h index cde57c319e3a..d23724421b86 100644 --- a/src/common/configuration.h +++ b/src/common/configuration.h @@ -84,6 +84,11 @@ namespace ccf snp_security_policy_file, snp_uvm_endorsements_file); + DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCFConfig::Snapshots); + DECLARE_JSON_REQUIRED_FIELDS(CCFConfig::Snapshots); + DECLARE_JSON_OPTIONAL_FIELDS( + CCFConfig::Snapshots, directory, tx_count, read_only_directory); + DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCFConfig); DECLARE_JSON_REQUIRED_FIELDS(CCFConfig, network); DECLARE_JSON_OPTIONAL_FIELDS( @@ -94,6 +99,7 @@ namespace ccf ledger_signatures, jwt, attestation, + snapshots, node_to_node_message_limit, historical_cache_soft_limit); diff --git a/src/ds/test/nonstd.cpp b/src/ds/test/nonstd.cpp index b20c84716e95..8548df72dbac 100644 --- a/src/ds/test/nonstd.cpp +++ b/src/ds/test/nonstd.cpp @@ -265,3 +265,19 @@ TEST_CASE("rsplit" * doctest::test_suite("nonstd")) } } } + +TEST_CASE("trim" * doctest::test_suite("nonstd")) +{ + REQUIRE(ccf::nonstd::trim(" hello world ") == "hello world"); + REQUIRE( + ccf::nonstd::trim(" \r\n\t\nhello world\n\n\r\t\t\n\t \n\t") == + "hello world"); + REQUIRE(ccf::nonstd::trim("..hello..") == "..hello.."); + REQUIRE(ccf::nonstd::trim("..hello..", ".") == "hello"); + + REQUIRE(ccf::nonstd::trim("hello") == "hello"); + 
REQUIRE(ccf::nonstd::trim(" h") == "h"); + REQUIRE(ccf::nonstd::trim("h ") == "h"); + REQUIRE(ccf::nonstd::trim(" ") == ""); + REQUIRE(ccf::nonstd::trim("") == ""); +} \ No newline at end of file diff --git a/src/host/configuration.h b/src/host/configuration.h index 1d39578b3cb8..d6b9765b4fc9 100644 --- a/src/host/configuration.h +++ b/src/host/configuration.h @@ -103,16 +103,6 @@ namespace host }; Ledger ledger = {}; - struct Snapshots - { - std::string directory = "snapshots"; - size_t tx_count = 10'000; - std::optional read_only_directory = std::nullopt; - - bool operator==(const Snapshots&) const = default; - }; - Snapshots snapshots = {}; - struct Logging { ccf::LoggerLevel host_level = ccf::LoggerLevel::INFO; @@ -155,6 +145,7 @@ namespace host ccf::NodeInfoNetwork::NetAddress target_rpc_address; ccf::ds::TimeString retry_timeout = {"1000ms"}; bool follow_redirect = true; + bool fetch_recent_snapshot = true; bool operator==(const Join&) const = default; }; @@ -189,11 +180,6 @@ namespace host DECLARE_JSON_OPTIONAL_FIELDS( CCHostConfig::Ledger, directory, read_only_directories, chunk_size); - DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCHostConfig::Snapshots); - DECLARE_JSON_REQUIRED_FIELDS(CCHostConfig::Snapshots); - DECLARE_JSON_OPTIONAL_FIELDS( - CCHostConfig::Snapshots, directory, tx_count, read_only_directory); - DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCHostConfig::Logging); DECLARE_JSON_REQUIRED_FIELDS(CCHostConfig::Logging); DECLARE_JSON_OPTIONAL_FIELDS(CCHostConfig::Logging, host_level, format); @@ -216,7 +202,10 @@ namespace host DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCHostConfig::Command::Join); DECLARE_JSON_REQUIRED_FIELDS(CCHostConfig::Command::Join, target_rpc_address); DECLARE_JSON_OPTIONAL_FIELDS( - CCHostConfig::Command::Join, retry_timeout, follow_redirect); + CCHostConfig::Command::Join, + retry_timeout, + follow_redirect, + fetch_recent_snapshot); DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCHostConfig::Command::Recover); 
DECLARE_JSON_REQUIRED_FIELDS(CCHostConfig::Command::Recover); diff --git a/src/host/main.cpp b/src/host/main.cpp index e4a7a43b5122..888cada048fc 100644 --- a/src/host/main.cpp +++ b/src/host/main.cpp @@ -24,7 +24,8 @@ #include "process_launcher.h" #include "rpc_connections.h" #include "sig_term.h" -#include "snapshots.h" +#include "snapshots/fetch.h" +#include "snapshots/snapshot_manager.h" #include "ticker.h" #include "time_updater.h" @@ -376,7 +377,7 @@ int main(int argc, char** argv) config.ledger.read_only_directories); ledger.register_message_handlers(bp.get_dispatcher()); - asynchost::SnapshotManager snapshots( + snapshots::SnapshotManager snapshots( config.snapshots.directory, writer_factory, config.snapshots.read_only_directory); @@ -507,8 +508,6 @@ int main(int argc, char** argv) ccf::StartupConfig startup_config(config); - startup_config.snapshot_tx_interval = config.snapshots.tx_count; - if (startup_config.attestation.snp_security_policy_file.has_value()) { auto security_policy_file = @@ -690,22 +689,62 @@ int main(int argc, char** argv) config.command.type == StartType::Join || config.command.type == StartType::Recover) { - auto latest_committed_snapshot = - snapshots.find_latest_committed_snapshot(); - if (latest_committed_snapshot.has_value()) - { - auto& [snapshot_dir, snapshot_file] = latest_committed_snapshot.value(); - startup_snapshot = files::slurp(snapshot_dir / snapshot_file); + auto latest_local_snapshot = snapshots.find_latest_committed_snapshot(); - LOG_INFO_FMT( - "Found latest snapshot file: {} (size: {})", - snapshot_dir / snapshot_file, - startup_snapshot.size()); + if ( + config.command.type == StartType::Join && + config.command.join.fetch_recent_snapshot) + { + // Try to fetch a recent snapshot from peer + const size_t latest_local_idx = latest_local_snapshot.has_value() ? 
+ snapshots::get_snapshot_idx_from_file_name( + latest_local_snapshot->second) : + 0; + auto latest_peer_snapshot = snapshots::fetch_from_peer( + config.command.join.target_rpc_address, + config.command.service_certificate_file, + latest_local_idx); + + if (latest_peer_snapshot.has_value()) + { + LOG_INFO_FMT( + "Received snapshot {} from peer (size: {}) - writing this to disk " + "and using for join startup", + latest_peer_snapshot->snapshot_name, + latest_peer_snapshot->snapshot_data.size()); + + const auto dst_path = fs::path(config.snapshots.directory) / + fs::path(latest_peer_snapshot->snapshot_name); + if (files::exists(dst_path)) + { + LOG_FATAL_FMT( + "Unable to write peer snapshot - already have a file at {}. " + "Exiting.", + dst_path); + return static_cast(CLI::ExitCodes::FileError); + } + files::dump(latest_peer_snapshot->snapshot_data, dst_path); + startup_snapshot = latest_peer_snapshot->snapshot_data; + } } - else + + if (startup_snapshot.empty()) { - LOG_INFO_FMT( - "No snapshot found: Node will replay all historical transactions"); + if (latest_local_snapshot.has_value()) + { + auto& [snapshot_dir, snapshot_file] = latest_local_snapshot.value(); + startup_snapshot = files::slurp(snapshot_dir / snapshot_file); + + LOG_INFO_FMT( + "Found latest local snapshot file: {} (size: {})", + snapshot_dir / snapshot_file, + startup_snapshot.size()); + } + else + { + LOG_INFO_FMT( + "No snapshot found: Node will replay all historical transactions"); + } } } diff --git a/src/host/test/ledger.cpp b/src/host/test/ledger.cpp index f0320a0654c5..9fb8ebd24f06 100644 --- a/src/host/test/ledger.cpp +++ b/src/host/test/ledger.cpp @@ -7,8 +7,8 @@ #include "crypto/openssl/hash.h" #include "ds/files.h" #include "ds/serialized.h" -#include "host/snapshots.h" #include "kv/serialised_entry_format.h" +#include "snapshots/snapshot_manager.h" #define DOCTEST_CONFIG_IMPLEMENT #include @@ -1259,6 +1259,8 @@ TEST_CASE("Snapshot file name" * doctest::test_suite("snapshot")) 
std::vector snapshot_idx_interval_ranges = { 10, 1000, 10000, std::numeric_limits::max() - 2}; + using namespace snapshots; + for (auto const& snapshot_idx_interval_range : snapshot_idx_interval_ranges) { std::uniform_int_distribution dist(1, snapshot_idx_interval_range); @@ -1304,6 +1306,7 @@ TEST_CASE("Generate and commit snapshots" * doctest::test_suite("snapshot")) auto snap_ro_dir = AutoDeleteFolder(snapshot_dir_read_only); fs::create_directory(snapshot_dir_read_only); + using namespace snapshots; SnapshotManager snapshots(snapshot_dir, wf, snapshot_dir_read_only); size_t snapshot_interval = 5; diff --git a/src/http/http_builder.h b/src/http/http_builder.h index 7ca6171e12d5..b0380e1a6834 100644 --- a/src/http/http_builder.h +++ b/src/http/http_builder.h @@ -71,34 +71,46 @@ namespace http return body; } - void set_body(const std::vector* b) + void set_body( + const std::vector* b, bool overwrite_content_length = true) { if (b != nullptr) { - set_body(b->data(), b->size()); + set_body(b->data(), b->size(), overwrite_content_length); } else { - set_body(nullptr, 0); + set_body(nullptr, 0, overwrite_content_length); } } - void set_body(const uint8_t* b, size_t s) + void set_body( + const uint8_t* b, size_t s, bool overwrite_content_length = true) { body = b; body_size = s; - headers[ccf::http::headers::CONTENT_LENGTH] = - fmt::format("{}", get_content_length()); + if ( + overwrite_content_length || + headers.find(ccf::http::headers::CONTENT_LENGTH) == headers.end()) + { + headers[ccf::http::headers::CONTENT_LENGTH] = + fmt::format("{}", get_content_length()); + } } - void set_body(const std::string& s) + void set_body(const std::string& s, bool overwrite_content_length = true) { body = (uint8_t*)s.data(); body_size = s.size(); - headers[ccf::http::headers::CONTENT_LENGTH] = - fmt::format("{}", get_content_length()); + if ( + overwrite_content_length || + headers.find(ccf::http::headers::CONTENT_LENGTH) == headers.end()) + { + 
headers[ccf::http::headers::CONTENT_LENGTH] = + fmt::format("{}", get_content_length()); + } } }; diff --git a/src/http/http_rpc_context.h b/src/http/http_rpc_context.h index 6f663dd16691..b1b15f1c0927 100644 --- a/src/http/http_rpc_context.h +++ b/src/http/http_rpc_context.h @@ -215,19 +215,36 @@ namespace http return responder; } + template + void _set_response_body(T&& body) + { + // HEAD responses must not contain a body - clients will ignore it + if (verb != HTTP_HEAD) + { + if constexpr (std::is_same_v) + { + response_body = std::vector(body.begin(), body.end()); + } + else + { + response_body = std::forward(body); + } + } + } + virtual void set_response_body(const std::vector& body) override { - response_body = body; + _set_response_body(body); } virtual void set_response_body(std::vector&& body) override { - response_body = std::move(body); + _set_response_body(std::move(body)); } virtual void set_response_body(std::string&& body) override { - response_body = std::vector(body.begin(), body.end()); + _set_response_body(std::move(body)); } virtual const std::vector& get_response_body() const override diff --git a/src/http/http_session.h b/src/http/http_session.h index eeccb399cb73..68729d9d7760 100644 --- a/src/http/http_session.h +++ b/src/http/http_session.h @@ -283,7 +283,12 @@ namespace http { response.set_header(k, v); } - response.set_body(body.data(), body.size()); + + response.set_body( + body.data(), + body.size(), + false /* Don't overwrite any existing content-length header */ + ); auto data = response.build_response(); tls_io->send_raw(data.data(), data.size()); diff --git a/src/node/node_state.h b/src/node/node_state.h index 3a5a68cd3e7b..7ea883aff5cb 100644 --- a/src/node/node_state.h +++ b/src/node/node_state.h @@ -2619,7 +2619,7 @@ namespace ccf throw std::logic_error("Snapshotter already initialised"); } snapshotter = std::make_shared( - writer_factory, network.tables, config.snapshot_tx_interval); + writer_factory, network.tables, 
config.snapshots.tx_count); } void read_ledger_entries(::consensus::Index from, ::consensus::Index to) diff --git a/src/node/rpc/node_frontend.h b/src/node/rpc/node_frontend.h index d28dfa36bd45..b55eee365c29 100644 --- a/src/node/rpc/node_frontend.h +++ b/src/node/rpc/node_frontend.h @@ -15,6 +15,7 @@ #include "ccf/version.h" #include "crypto/certs.h" #include "crypto/csr.h" +#include "ds/files.h" #include "ds/std_formatters.h" #include "frontend.h" #include "node/network_state.h" @@ -25,6 +26,7 @@ #include "node_interface.h" #include "service/internal_tables_access.h" #include "service/tables/previous_service_identity.h" +#include "snapshots/filenames.h" namespace ccf { @@ -1184,13 +1186,14 @@ namespace ccf ccf::errors::InternalError, "NodeConfigurationSubsystem is not available"); } + const auto& node_startup_config = + node_configuration_subsystem->get().node_config; return make_success(GetNode::Out{ node_id, ccf::NodeStatus::PENDING, is_primary, - node_configuration_subsystem->get() - .node_config.network.rpc_interfaces, - node_configuration_subsystem->get().node_config.node_data, + node_startup_config.network.rpc_interfaces, + node_startup_config.node_data, 0}); } }; @@ -1786,6 +1789,340 @@ namespace ccf .set_auto_schema() .set_forwarding_required(endpoints::ForwardingRequired::Never) .install(); + + static constexpr auto snapshot_since_param_key = "since"; + // Redirects to endpoint for a single specific snapshot + auto find_snapshot = [this](ccf::endpoints::CommandEndpointContext& ctx) { + auto node_configuration_subsystem = + this->context.get_subsystem(); + if (!node_configuration_subsystem) + { + ctx.rpc_ctx->set_error( + HTTP_STATUS_INTERNAL_SERVER_ERROR, + ccf::errors::InternalError, + "NodeConfigurationSubsystem is not available"); + return; + } + + const auto& snapshots_config = + node_configuration_subsystem->get().node_config.snapshots; + + size_t latest_idx = 0; + { + // Get latest_idx from query param, if present + const auto parsed_query = + 
http::parse_query(ctx.rpc_ctx->get_request_query()); + + std::string error_reason; + auto snapshot_since = http::get_query_value_opt( + parsed_query, snapshot_since_param_key, error_reason); + + if (snapshot_since.has_value()) + { + if (error_reason != "") + { + ctx.rpc_ctx->set_error( + HTTP_STATUS_BAD_REQUEST, + ccf::errors::InvalidQueryParameterValue, + std::move(error_reason)); + return; + } + latest_idx = snapshot_since.value(); + } + } + + const auto orig_latest = latest_idx; + auto latest_committed_snapshot = + snapshots::find_latest_committed_snapshot_in_directory( + snapshots_config.directory, latest_idx); + + if (!latest_committed_snapshot.has_value()) + { + ctx.rpc_ctx->set_error( + HTTP_STATUS_NOT_FOUND, + ccf::errors::ResourceNotFound, + fmt::format( + "This node has no committed snapshots since {}", orig_latest)); + return; + } + + const auto& snapshot_path = latest_committed_snapshot.value(); + + LOG_DEBUG_FMT("Redirecting to snapshot: {}", snapshot_path); + + auto redirect_url = fmt::format("/node/snapshot/{}", snapshot_path); + ctx.rpc_ctx->set_response_header( + ccf::http::headers::LOCATION, redirect_url); + ctx.rpc_ctx->set_response_status(HTTP_STATUS_PERMANENT_REDIRECT); + }; + make_command_endpoint( + "/snapshot", HTTP_HEAD, find_snapshot, no_auth_required) + .set_forwarding_required(endpoints::ForwardingRequired::Never) + .add_query_parameter( + snapshot_since_param_key, ccf::endpoints::OptionalParameter) + .set_openapi_hidden(true) + .install(); + make_command_endpoint( + "/snapshot", HTTP_GET, find_snapshot, no_auth_required) + .set_forwarding_required(endpoints::ForwardingRequired::Never) + .add_query_parameter( + snapshot_since_param_key, ccf::endpoints::OptionalParameter) + .set_openapi_hidden(true) + .install(); + + auto get_snapshot = [this](ccf::endpoints::CommandEndpointContext& ctx) { + auto node_configuration_subsystem = + this->context.get_subsystem(); + if (!node_configuration_subsystem) + { + ctx.rpc_ctx->set_error( + 
HTTP_STATUS_INTERNAL_SERVER_ERROR, + ccf::errors::InternalError, + "NodeConfigurationSubsystem is not available"); + return; + } + + const auto& snapshots_config = + node_configuration_subsystem->get().node_config.snapshots; + + std::string snapshot_name; + std::string error; + if (!get_path_param( + ctx.rpc_ctx->get_request_path_params(), + "snapshot_name", + snapshot_name, + error)) + { + ctx.rpc_ctx->set_error( + HTTP_STATUS_BAD_REQUEST, + ccf::errors::InvalidResourceName, + std::move(error)); + return; + } + + fs::path snapshot_path = + fs::path(snapshots_config.directory) / snapshot_name; + + std::ifstream f(snapshot_path, std::ios::binary); + if (!f.good()) + { + ctx.rpc_ctx->set_error( + HTTP_STATUS_NOT_FOUND, + ccf::errors::ResourceNotFound, + fmt::format( + "This node does not have a snapshot named {}", snapshot_name)); + return; + } + + LOG_DEBUG_FMT("Found snapshot: {}", snapshot_path.string()); + + f.seekg(0, f.end); + const auto total_size = (size_t)f.tellg(); + + ctx.rpc_ctx->set_response_header("accept-ranges", "bytes"); + + if (ctx.rpc_ctx->get_request_verb() == HTTP_HEAD) + { + ctx.rpc_ctx->set_response_status(HTTP_STATUS_OK); + ctx.rpc_ctx->set_response_header( + ccf::http::headers::CONTENT_LENGTH, total_size); + return; + } + + size_t range_start = 0; + size_t range_end = total_size; + { + const auto range_header = ctx.rpc_ctx->get_request_header("range"); + if (range_header.has_value()) + { + LOG_TRACE_FMT("Parsing range header {}", range_header.value()); + + auto [unit, ranges] = + ccf::nonstd::split_1(range_header.value(), "="); + if (unit != "bytes") + { + ctx.rpc_ctx->set_error( + HTTP_STATUS_BAD_REQUEST, + ccf::errors::InvalidHeaderValue, + "Only 'bytes' is supported as a Range header unit"); + return; + } + + if (ranges.find(",") != std::string::npos) + { + ctx.rpc_ctx->set_error( + HTTP_STATUS_BAD_REQUEST, + ccf::errors::InvalidHeaderValue, + "Multiple ranges are not supported"); + return; + } + + const auto segments = 
ccf::nonstd::split(ranges, "-"); + if (segments.size() != 2) + { + ctx.rpc_ctx->set_error( + HTTP_STATUS_BAD_REQUEST, + ccf::errors::InvalidHeaderValue, + fmt::format( + "Invalid format, cannot parse range in {}", + range_header.value())); + return; + } + + const auto s_range_start = segments[0]; + const auto s_range_end = segments[1]; + + if (!s_range_start.empty()) + { + { + const auto [p, ec] = std::from_chars( + s_range_start.begin(), s_range_start.end(), range_start); + if (ec != std::errc()) + { + ctx.rpc_ctx->set_error( + HTTP_STATUS_BAD_REQUEST, + ccf::errors::InvalidHeaderValue, + fmt::format( + "Unable to parse start of range value {} in {}", + s_range_start, + range_header.value())); + return; + } + } + + if (range_start > total_size) + { + ctx.rpc_ctx->set_error( + HTTP_STATUS_BAD_REQUEST, + ccf::errors::InvalidHeaderValue, + fmt::format( + "Start of range {} is larger than total file size {}", + range_start, + total_size)); + return; + } + + if (!s_range_end.empty()) + { + // Fully-specified range, like "X-Y" + { + const auto [p, ec] = std::from_chars( + s_range_end.begin(), s_range_end.end(), range_end); + if (ec != std::errc()) + { + ctx.rpc_ctx->set_error( + HTTP_STATUS_BAD_REQUEST, + ccf::errors::InvalidHeaderValue, + fmt::format( + "Unable to parse end of range value {} in {}", + s_range_end, + range_header.value())); + return; + } + } + + if (range_end > total_size) + { + ctx.rpc_ctx->set_error( + HTTP_STATUS_BAD_REQUEST, + ccf::errors::InvalidHeaderValue, + fmt::format( + "End of range {} is larger than total file size {}", + range_end, + total_size)); + return; + } + + if (range_end < range_start) + { + ctx.rpc_ctx->set_error( + HTTP_STATUS_BAD_REQUEST, + ccf::errors::InvalidHeaderValue, + fmt::format( + "Invalid range: Start ({}) and end ({}) out of order", + range_start, + range_end)); + return; + } + } + else + { + // Else this is an open-ended range like "X-" + range_end = total_size; + } + } + else + { + if (!s_range_end.empty()) + { + // 
Negative range, like "-Y" + size_t offset; + const auto [p, ec] = std::from_chars( + s_range_end.begin(), s_range_end.end(), offset); + if (ec != std::errc()) + { + ctx.rpc_ctx->set_error( + HTTP_STATUS_BAD_REQUEST, + ccf::errors::InvalidHeaderValue, + fmt::format( + "Unable to parse end of range offset value {} in {}", + s_range_end, + range_header.value())); + return; + } + + range_end = total_size; + range_start = range_end - offset; + } + else + { + ctx.rpc_ctx->set_error( + HTTP_STATUS_BAD_REQUEST, + ccf::errors::InvalidHeaderValue, + "Invalid range: Must contain range-start or range-end"); + return; + } + } + } + } + + const auto range_size = range_end - range_start; + + LOG_TRACE_FMT( + "Reading {}-byte range from {} to {}", + range_size, + range_start, + range_end); + + // Read requested range into buffer + std::vector contents(range_size); + f.seekg(range_start); + f.read((char*)contents.data(), contents.size()); + f.close(); + + // Build successful response + ctx.rpc_ctx->set_response_status(HTTP_STATUS_PARTIAL_CONTENT); + ctx.rpc_ctx->set_response_header( + ccf::http::headers::CONTENT_TYPE, + ccf::http::headervalues::contenttype::OCTET_STREAM); + ctx.rpc_ctx->set_response_body(std::move(contents)); + + // Partial Content responses describe the current response in + // Content-Range + ctx.rpc_ctx->set_response_header( + "content-range", + fmt::format("bytes {}-{}/{}", range_start, range_end, total_size)); + }; + make_command_endpoint( + "/snapshot/{snapshot_name}", HTTP_HEAD, get_snapshot, no_auth_required) + .set_forwarding_required(endpoints::ForwardingRequired::Never) + .set_openapi_hidden(true) + .install(); + make_command_endpoint( + "/snapshot/{snapshot_name}", HTTP_GET, get_snapshot, no_auth_required) + .set_forwarding_required(endpoints::ForwardingRequired::Never) + .set_openapi_hidden(true) + .install(); } }; diff --git a/src/snapshots/fetch.h b/src/snapshots/fetch.h new file mode 100644 index 000000000000..d3ee499c9cbf --- /dev/null +++ 
b/src/snapshots/fetch.h @@ -0,0 +1,342 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the Apache 2.0 License. +#pragma once + +#include "ccf/ds/logger.h" +#include "ccf/ds/nonstd.h" +#include "ccf/rest_verb.h" +#include "http/http_builder.h" + +#include +#include +#include +#include +#include +#include +#include + +#define CHECK_CURL_EASY(fn, ...) \ + do \ + { \ + const auto res = fn(__VA_ARGS__); \ + if (res != CURLE_OK) \ + { \ + throw std::runtime_error(fmt::format( \ + "Error calling " #fn ": {} ({})", res, curl_easy_strerror(res))); \ + } \ + } while (0) + +#define CHECK_CURL_EASY_SETOPT(handle, info, arg) \ + CHECK_CURL_EASY(curl_easy_setopt, handle, info, arg) +#define CHECK_CURL_EASY_GETINFO(handle, info, arg) \ + CHECK_CURL_EASY(curl_easy_getinfo, handle, info, arg) + +#define EXPECT_HTTP_RESPONSE_STATUS(request, response, expected) \ + do \ + { \ + if (response.status_code != expected) \ + { \ + throw std::runtime_error(fmt::format( \ + "Expected {} response from {} {}, instead received {}", \ + ccf::http_status_str(expected), \ + request.method.c_str(), \ + request.url, \ + response.status_code)); \ + } \ + } while (0) + +namespace snapshots +{ + // Using curl 7.68.0, so missing niceties like curl_easy_header + + using HeaderMap = std::unordered_map; + size_t append_header(char* buffer, size_t size, size_t nitems, void* userdata) + { + HeaderMap& headers = *(HeaderMap*)userdata; + + if (size != 1) + { + LOG_FAIL_FMT( + "Unexpected value in curl HEADERFUNCTION callback: size = {}", size); + return 0; + } + + const std::string_view header = + ccf::nonstd::trim(std::string_view(buffer, nitems)); + + // Ignore HTTP status line + if (!header.starts_with("HTTP/1.1")) + { + const auto [field, value] = ccf::nonstd::split_1(header, ": "); + if (!value.empty()) + { + headers[std::string(field)] = ccf::nonstd::trim(value); + } + else + { + LOG_INFO_FMT("Ignoring invalid-looking HTTP Header '{}'", header); + } + } + + return 
nitems * size; + } + + using BodyHandler = std::function&)>; + size_t curl_write_callback( + char* ptr, size_t size, size_t nmemb, void* user_data) + { + BodyHandler& body_handler = *(BodyHandler*)user_data; + + if (size != 1) + { + LOG_FAIL_FMT( + "Unexpected value in curl WRITEFUNCTION callback: size = {}", size); + return 0; + } + + std::span data((const uint8_t*)ptr, size * nmemb); + + body_handler(data); + + return size * nmemb; + } + + struct SimpleHTTPRequest + { + ccf::RESTVerb method; + std::string url; + HeaderMap headers; + std::string ca_path; + BodyHandler body_handler = nullptr; + }; + + struct SimpleHTTPResponse + { + long status_code; + HeaderMap headers; + }; + + static inline SimpleHTTPResponse make_curl_request( + const SimpleHTTPRequest& request) + { + CURL* curl; + + curl = curl_easy_init(); + if (!curl) + { + throw std::runtime_error("Error initialising curl easy request"); + } + + CHECK_CURL_EASY_SETOPT(curl, CURLOPT_URL, request.url.c_str()); + if (request.method == HTTP_HEAD) + { + CHECK_CURL_EASY_SETOPT(curl, CURLOPT_NOBODY, 1L); + } + else if (request.method == HTTP_GET) + { + CHECK_CURL_EASY_SETOPT(curl, CURLOPT_HTTPGET, 1L); + } + else + { + throw std::logic_error( + fmt::format("Unsupported HTTP method: {}", request.method.c_str())); + } + + SimpleHTTPResponse response; + CHECK_CURL_EASY_SETOPT(curl, CURLOPT_HEADERDATA, &response.headers); + CHECK_CURL_EASY_SETOPT(curl, CURLOPT_HEADERFUNCTION, append_header); + + curl_easy_setopt(curl, CURLOPT_CAINFO, request.ca_path.c_str()); + + curl_slist* list = nullptr; + for (const auto& [k, v] : request.headers) + { + list = curl_slist_append(list, fmt::format("{}: {}", k, v).c_str()); + } + + CHECK_CURL_EASY_SETOPT(curl, CURLOPT_HTTPHEADER, list); + + if (request.body_handler != nullptr) + { + CHECK_CURL_EASY_SETOPT(curl, CURLOPT_WRITEDATA, &request.body_handler); + CHECK_CURL_EASY_SETOPT(curl, CURLOPT_WRITEFUNCTION, curl_write_callback); + } + + LOG_TRACE_FMT( + "Sending curl request {} {}", 
request.method.c_str(), request.url);
+
+    CHECK_CURL_EASY(curl_easy_perform, curl);
+
+    CHECK_CURL_EASY_GETINFO(
+      curl, CURLINFO_RESPONSE_CODE, &response.status_code);
+
+    LOG_TRACE_FMT(
+      "{} {} returned {}",
+      request.method.c_str(),
+      request.url,
+      response.status_code);
+
+    curl_slist_free_all(list);
+    curl_easy_cleanup(curl);
+
+    return response;
+  }
+
+  // Result of a successful fetch_from_peer call
+  struct SnapshotResponse
+  {
+    // Last '/'-separated component of the URL the snapshot was served from
+    std::string snapshot_name;
+    // Raw bytes of the downloaded snapshot
+    std::vector<uint8_t> snapshot_data;
+  };
+
+  // Ask a peer whether it holds a snapshot newer than the local one and, if
+  // so, download it.
+  //
+  // Three kinds of HTTPS request are made, each using path_to_peer_cert as
+  // its ca_path:
+  //  1. HEAD /node/snapshot?since=<latest_local_snapshot>. A 404 means the
+  //     peer has nothing newer. Otherwise a permanent redirect is expected,
+  //     whose Location header is the path of a specific snapshot.
+  //  2. HEAD on that path, to read the snapshot size from Content-Length.
+  //  3. One or more GETs with a Range header, each covering at most
+  //     range_size bytes, until content_size bytes have been requested.
+  //
+  // peer_address is interpolated directly after "https://" when building
+  // URLs.
+  //
+  // Returns the snapshot's name and contents, or std::nullopt if the peer has
+  // no newer snapshot or if any step fails. Failures are logged here and are
+  // never propagated to the caller as exceptions.
+  static std::optional<SnapshotResponse> fetch_from_peer(
+    const std::string& peer_address,
+    const std::string& path_to_peer_cert,
+    size_t latest_local_snapshot)
+  {
+    try
+    {
+      // Make initial request, which returns a redirect response to specific
+      // snapshot
+      std::string snapshot_url;
+      {
+        const auto initial_url = fmt::format(
+          "https://{}/node/snapshot?since={}",
+          peer_address,
+          latest_local_snapshot);
+
+        SimpleHTTPRequest initial_request;
+        initial_request.method = HTTP_HEAD;
+        initial_request.url = initial_url;
+        initial_request.ca_path = path_to_peer_cert;
+
+        const auto initial_response = make_curl_request(initial_request);
+        if (initial_response.status_code == HTTP_STATUS_NOT_FOUND)
+        {
+          LOG_INFO_FMT(
+            "Peer has no snapshot newer than {}", latest_local_snapshot);
+          return std::nullopt;
+        }
+        else if (initial_response.status_code != HTTP_STATUS_PERMANENT_REDIRECT)
+        {
+          EXPECT_HTTP_RESPONSE_STATUS(
+            initial_request, initial_response, HTTP_STATUS_PERMANENT_REDIRECT);
+        }
+
+        auto location_it =
+          initial_response.headers.find(ccf::http::headers::LOCATION);
+        if (location_it == initial_response.headers.end())
+        {
+          throw std::runtime_error(fmt::format(
+            "Expected {} header in redirect response from {} {}, none found",
+            ccf::http::headers::LOCATION,
+            initial_request.method.c_str(),
+            initial_request.url));
+        }
+
+        LOG_TRACE_FMT("Snapshot fetch redirected to {}", location_it->second);
+
+        snapshot_url =
+          fmt::format("https://{}{}", peer_address, location_it->second);
+      }
+
+      // Make follow-up request to redirected URL, to fetch total content size
+      size_t content_size;
+      {
+        SimpleHTTPRequest snapshot_size_request;
+        snapshot_size_request.method = HTTP_HEAD;
+        snapshot_size_request.url = snapshot_url;
+        snapshot_size_request.ca_path = path_to_peer_cert;
+
+        const auto snapshot_size_response =
+          make_curl_request(snapshot_size_request);
+
+        EXPECT_HTTP_RESPONSE_STATUS(
+          snapshot_size_request, snapshot_size_response, HTTP_STATUS_OK);
+
+        auto content_size_it = snapshot_size_response.headers.find(
+          ccf::http::headers::CONTENT_LENGTH);
+        if (content_size_it == snapshot_size_response.headers.end())
+        {
+          // This is the response to the size request, not the redirect
+          throw std::runtime_error(fmt::format(
+            "Expected {} header in response from {} {}, none found",
+            ccf::http::headers::CONTENT_LENGTH,
+            snapshot_size_request.method.c_str(),
+            snapshot_size_request.url));
+        }
+
+        const auto& content_size_s = content_size_it->second;
+        const auto [p, ec] = std::from_chars(
+          content_size_s.data(),
+          content_size_s.data() + content_size_s.size(),
+          content_size);
+        if (ec != std::errc())
+        {
+          throw std::runtime_error(fmt::format(
+            "Invalid {} header in response from {} {}: {}",
+            ccf::http::headers::CONTENT_LENGTH,
+            snapshot_size_request.method.c_str(),
+            snapshot_size_request.url,
+            ec));
+        }
+      }
+
+      // Fetch 4MB chunks at a time
+      constexpr size_t range_size = 4 * 1024 * 1024;
+      LOG_TRACE_FMT(
+        "Preparing to fetch {}-byte snapshot from peer, {} bytes per-request",
+        content_size,
+        range_size);
+
+      std::vector<uint8_t> snapshot(content_size);
+
+      {
+        // range_start is both the start of the next requested range and the
+        // offset in `snapshot` at which the next received byte is written.
+        // It is explicitly a size_t: a deduced int would make every
+        // comparison with content_size mixed-sign and would overflow for
+        // snapshots larger than INT_MAX bytes.
+        size_t range_start = 0;
+        size_t range_end = std::min(content_size, range_size);
+
+        while (true)
+        {
+          const size_t request_start = range_start;
+
+          // Set by the body handler when the peer delivers more bytes than
+          // the buffer can hold. The problem is recorded here and reported
+          // after make_curl_request returns, rather than thrown from inside
+          // the handler (presumably invoked from a curl callback - confirm
+          // against make_curl_request).
+          bool overflowed = false;
+
+          SimpleHTTPRequest snapshot_range_request;
+          snapshot_range_request.method = HTTP_GET;
+          snapshot_range_request.url = snapshot_url;
+          snapshot_range_request.headers["range"] =
+            fmt::format("bytes={}-{}", range_start, range_end);
+          snapshot_range_request.ca_path = path_to_peer_cert;
+
+          snapshot_range_request.body_handler = [&](const auto& data) {
+            // range_start never exceeds snapshot.size(), so this subtraction
+            // cannot wrap
+            if (overflowed || data.size() > snapshot.size() - range_start)
+            {
+              overflowed = true;
+              return;
+            }
+
+            LOG_TRACE_FMT(
+              "Copying {} bytes into snapshot, starting at {}",
+              data.size(),
+              range_start);
+            memcpy(snapshot.data() + range_start, data.data(), data.size());
+            range_start += data.size();
+          };
+
+          const auto range_response = make_curl_request(snapshot_range_request);
+
+          EXPECT_HTTP_RESPONSE_STATUS(
+            snapshot_range_request,
+            range_response,
+            HTTP_STATUS_PARTIAL_CONTENT);
+
+          if (overflowed)
+          {
+            throw std::runtime_error(fmt::format(
+              "Peer sent more data than the {} bytes advertised for {}",
+              content_size,
+              snapshot_url));
+          }
+
+          // The next request starts at range_end, so anything short of that
+          // would leave a zero-filled hole in the snapshot
+          if (range_start < range_end)
+          {
+            throw std::runtime_error(fmt::format(
+              "Incomplete response from {} {}: requested range {}-{}, only "
+              "received data up to {}",
+              snapshot_range_request.method.c_str(),
+              snapshot_range_request.url,
+              request_start,
+              range_end,
+              range_start));
+          }
+
+          if (range_end == content_size)
+          {
+            break;
+          }
+
+          range_start = range_end;
+          range_end = std::min(content_size, range_start + range_size);
+        }
+      }
+
+      const auto url_components = ccf::nonstd::split(snapshot_url, "/");
+      const std::string snapshot_name(url_components.back());
+
+      return SnapshotResponse{snapshot_name, std::move(snapshot)};
+    }
+    catch (const std::exception& e)
+    {
+      LOG_FAIL_FMT("Error during snapshot fetch: {}", e.what());
+      return std::nullopt;
+    }
+  }
+}
\ No newline at end of file
diff --git a/src/snapshots/filenames.h b/src/snapshots/filenames.h
new file mode 100644
index 000000000000..bba801d6e4b5
--- /dev/null
+++ b/src/snapshots/filenames.h
@@ -0,0 +1,165 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the Apache 2.0 License.
+#pragma once
+
+#include <filesystem>
+#include <optional>
+#include <string>
+
+namespace fs = std::filesystem;
+
+// Helpers for recognising and parsing snapshot file names.
+//
+// The names handled here look like "snapshot_X_Y", optionally followed by
+// ".committed" (and, for 1.x snapshots, by "_Z"), where X is the snapshot
+// index, Y the evidence index and Z the evidence commit index.
+namespace snapshots
+{
+  static constexpr auto snapshot_file_prefix = "snapshot";
+  static constexpr auto snapshot_idx_delimiter = "_";
+  static constexpr auto snapshot_committed_suffix = ".committed";
+
+  // True if file_name starts with the snapshot prefix
+  static bool is_snapshot_file(const std::string& file_name)
+  {
+    return file_name.starts_with(snapshot_file_prefix);
+  }
+
+  // True if file_name contains the committed suffix anywhere
+  static bool is_snapshot_file_committed(const std::string& file_name)
+  {
+    return file_name.find(snapshot_committed_suffix) != std::string::npos;
+  }
+
+  // Parse the whole of str as an index.
+  // Throws std::logic_error if str does not start with a valid number, or if
+  // any characters remain after the number.
+  static size_t read_idx(const std::string& str)
+  {
+    size_t idx = 0;
+    auto end_ptr = str.data() + str.size();
+
+    auto res = std::from_chars(str.data(), end_ptr, idx);
+    if (res.ec != std::errc())
+    {
+      throw std::logic_error(
+        fmt::format("Could not read idx from string \"{}\": {}", str, res.ec));
+    }
+    else if (res.ptr != end_ptr)
+    {
+      throw std::logic_error(fmt::format(
+        "Trailing characters in \"{}\" cannot be converted to idx: \"{}\"",
+        str,
+        std::string(res.ptr, end_ptr)));
+    }
+    return idx;
+  }
+
+  // Returns the index following the committed suffix, if there is one.
+  // Throws std::logic_error if file_name is not committed, or if the text
+  // after the delimiter is not a valid index.
+  static std::optional<size_t> get_evidence_commit_idx_from_file_name(
+    const std::string& file_name)
+  {
+    // Only returns an evidence commit index for 1.x committed snapshots.
+    // 1.x committed snapshots file names are of the form:
+    // "snapshot_X_Y.committed_Z" while 2.x+ ones are of the form:
+    // "snapshot_X_Y.committed"
+    auto pos = file_name.find(snapshot_committed_suffix);
+    if (pos == std::string::npos)
+    {
+      throw std::logic_error(
+        fmt::format("Snapshot file \"{}\" is not committed", file_name));
+    }
+
+    pos = file_name.find(snapshot_idx_delimiter, pos);
+    if (pos == std::string::npos)
+    {
+      // 2.x+ snapshot
+      return std::nullopt;
+    }
+
+    return read_idx(file_name.substr(pos + 1));
+  }
+
+  // Returns X from "snapshot_X_Y...": the text between the first and second
+  // delimiters, parsed as an index.
+  // Throws std::logic_error if file_name is not a snapshot file, lacks either
+  // delimiter, or X is not a valid index.
+  static size_t get_snapshot_idx_from_file_name(const std::string& file_name)
+  {
+    if (!is_snapshot_file(file_name))
+    {
+      throw std::logic_error(
+        fmt::format("File \"{}\" is not a valid snapshot file", file_name));
+    }
+
+    auto idx_pos = file_name.find_first_of(snapshot_idx_delimiter);
+    if (idx_pos == std::string::npos)
+    {
+      throw std::logic_error(fmt::format(
+        "Snapshot file name {} does not contain snapshot seqno", file_name));
+    }
+
+    auto evidence_idx_pos =
+      file_name.find_first_of(snapshot_idx_delimiter, idx_pos + 1);
+    if (evidence_idx_pos == std::string::npos)
+    {
+      throw std::logic_error(fmt::format(
+        "Snapshot file \"{}\" does not contain evidence index", file_name));
+    }
+
+    return read_idx(
+      file_name.substr(idx_pos + 1, evidence_idx_pos - idx_pos - 1));
+  }
+
+  // Returns Y from "snapshot_X_Y[.committed...]": the text after the second
+  // delimiter, up to the committed suffix if present, parsed as an index.
+  // Throws std::logic_error if file_name is not a snapshot file, lacks either
+  // delimiter, or Y is not a valid index.
+  static size_t get_snapshot_evidence_idx_from_file_name(
+    const std::string& file_name)
+  {
+    if (!is_snapshot_file(file_name))
+    {
+      throw std::logic_error(
+        fmt::format("File \"{}\" is not a valid snapshot file", file_name));
+    }
+
+    auto idx_pos = file_name.find_first_of(snapshot_idx_delimiter);
+    if (idx_pos == std::string::npos)
+    {
+      throw std::logic_error(
+        fmt::format("Snapshot file \"{}\" does not contain index", file_name));
+    }
+
+    auto evidence_idx_pos =
+      file_name.find_first_of(snapshot_idx_delimiter, idx_pos + 1);
+    if (evidence_idx_pos == std::string::npos)
+    {
+      throw std::logic_error(fmt::format(
+        "Snapshot file \"{}\" does not contain evidence index", file_name));
+    }
+
+    // Note: Snapshot file may not be committed
+    size_t end_str = std::string::npos;
+    // Locate the suffix as a whole string. find_first_of would instead stop
+    // at the first occurrence of any single character of the suffix.
+    auto commit_suffix_pos =
+      file_name.find(snapshot_committed_suffix, evidence_idx_pos + 1);
+    if (commit_suffix_pos != std::string::npos)
+    {
+      end_str = commit_suffix_pos - evidence_idx_pos - 1;
+    }
+
+    return read_idx(file_name.substr(evidence_idx_pos + 1, end_str));
+  }
+
+  // Scan directory for the committed snapshot with the highest index.
+  //
+  // latest_committed_snapshot_idx is an in/out parameter: only snapshots
+  // whose index is strictly greater than its incoming value are considered,
+  // and it is updated to the index of the snapshot that is returned.
+  //
+  // Returns the file name (without directory) of that snapshot, or
+  // std::nullopt if no committed snapshot with a greater index was found.
+  // Files that are not snapshots, or not committed, are logged and skipped.
+  // Throws std::logic_error if a committed snapshot file name cannot be
+  // parsed.
+  //
+  // Declared static like every other function in this header, so that
+  // including the header from several translation units does not produce
+  // multiple definitions of the same symbol.
+  static std::optional<fs::path> find_latest_committed_snapshot_in_directory(
+    const fs::path& directory, size_t& latest_committed_snapshot_idx)
+  {
+    std::optional<fs::path> latest_committed_snapshot_file_name = std::nullopt;
+
+    for (auto& f : fs::directory_iterator(directory))
+    {
+      auto file_name = f.path().filename();
+      if (!is_snapshot_file(file_name))
+      {
+        LOG_INFO_FMT("Ignoring non-snapshot file {}", file_name);
+        continue;
+      }
+
+      if (!is_snapshot_file_committed(file_name))
+      {
+        LOG_INFO_FMT("Ignoring non-committed snapshot file {}", file_name);
+        continue;
+      }
+
+      auto snapshot_idx = get_snapshot_idx_from_file_name(file_name);
+      if (snapshot_idx > latest_committed_snapshot_idx)
+      {
+        latest_committed_snapshot_file_name = file_name;
+        latest_committed_snapshot_idx = snapshot_idx;
+      }
+    }
+
+    return latest_committed_snapshot_file_name;
+  }
+}
\ No newline at end of file
diff --git a/src/host/snapshots.h b/src/snapshots/snapshot_manager.h
similarity index 58%
rename from src/host/snapshots.h
rename to src/snapshots/snapshot_manager.h
index b1f8906d4ea7..dfaa1d15a42d 100644
--- a/src/host/snapshots.h
+++ b/src/snapshots/snapshot_manager.h
@@ -4,7 +4,8 @@
 #include "ccf/ds/nonstd.h"
 #include "consensus/ledger_enclave_types.h"
-#include "time_bound_logger.h"
+#include "host/time_bound_logger.h"
+#include "snapshots/filenames.h"
 
 #include
 #include
@@ -14,161 +15,8 @@
 namespace fs = std::filesystem;
 
-namespace asynchost
+namespace snapshots
 {
-  static constexpr auto snapshot_file_prefix = "snapshot";
-  static constexpr auto snapshot_idx_delimiter = "_";
-  static constexpr auto
snapshot_committed_suffix = ".committed"; - - static bool is_snapshot_file(const std::string& file_name) - { - return file_name.starts_with(snapshot_file_prefix); - } - - static bool is_snapshot_file_committed(const std::string& file_name) - { - return file_name.find(snapshot_committed_suffix) != std::string::npos; - } - - static size_t read_idx(const std::string& str) - { - size_t idx = 0; - auto end_ptr = str.data() + str.size(); - - auto res = std::from_chars(str.data(), end_ptr, idx); - if (res.ec != std::errc()) - { - throw std::logic_error( - fmt::format("Could not read idx from string \"{}\": {}", str, res.ec)); - } - else if (res.ptr != end_ptr) - { - throw std::logic_error(fmt::format( - "Trailing characters in \"{}\" cannot be converted to idx: \"{}\"", - str, - std::string(res.ptr, end_ptr))); - } - return idx; - } - - static std::optional get_evidence_commit_idx_from_file_name( - const std::string& file_name) - { - // Only returns an evidence commit index for 1.x committed snapshots. 
- // 1.x committed snapshots file names are of the form: - // "snapshot_X_Y.committed_Z" while 2.x+ ones are of the form: - // "snapshot_X_Y.committed" - auto pos = file_name.find(snapshot_committed_suffix); - if (pos == std::string::npos) - { - throw std::logic_error( - fmt::format("Snapshot file \"{}\" is not committed", file_name)); - } - - pos = file_name.find(snapshot_idx_delimiter, pos); - if (pos == std::string::npos) - { - // 2.x+ snapshot - return std::nullopt; - } - - return read_idx(file_name.substr(pos + 1)); - } - - static size_t get_snapshot_idx_from_file_name(const std::string& file_name) - { - if (!is_snapshot_file(file_name)) - { - throw std::logic_error( - fmt::format("File \"{}\" is not a valid snapshot file", file_name)); - } - - auto idx_pos = file_name.find_first_of(snapshot_idx_delimiter); - if (idx_pos == std::string::npos) - { - throw std::logic_error(fmt::format( - "Snapshot file name {} does not contain snapshot seqno", file_name)); - } - - auto evidence_idx_pos = - file_name.find_first_of(snapshot_idx_delimiter, idx_pos + 1); - if (evidence_idx_pos == std::string::npos) - { - throw std::logic_error(fmt::format( - "Snapshot file \"{}\" does not contain evidence index", file_name)); - } - - return read_idx( - file_name.substr(idx_pos + 1, evidence_idx_pos - idx_pos - 1)); - } - - static size_t get_snapshot_evidence_idx_from_file_name( - const std::string& file_name) - { - if (!is_snapshot_file(file_name)) - { - throw std::logic_error( - fmt::format("File \"{}\" is not a valid snapshot file", file_name)); - } - - auto idx_pos = file_name.find_first_of(snapshot_idx_delimiter); - if (idx_pos == std::string::npos) - { - throw std::logic_error( - fmt::format("Snapshot file \"{}\" does not contain index", file_name)); - } - - auto evidence_idx_pos = - file_name.find_first_of(snapshot_idx_delimiter, idx_pos + 1); - if (evidence_idx_pos == std::string::npos) - { - throw std::logic_error(fmt::format( - "Snapshot file \"{}\" does not contain 
evidence index", file_name)); - } - - // Note: Snapshot file may not be committed - size_t end_str = std::string::npos; - auto commit_suffix_pos = - file_name.find_first_of(snapshot_committed_suffix, evidence_idx_pos + 1); - if (commit_suffix_pos != std::string::npos) - { - end_str = commit_suffix_pos - evidence_idx_pos - 1; - } - - return read_idx(file_name.substr(evidence_idx_pos + 1, end_str)); - } - - std::optional find_latest_committed_snapshot_in_directory( - const fs::path& directory, size_t& latest_committed_snapshot_idx) - { - std::optional latest_committed_snapshot_file_name = std::nullopt; - - for (auto& f : fs::directory_iterator(directory)) - { - auto file_name = f.path().filename(); - if (!is_snapshot_file(file_name)) - { - LOG_INFO_FMT("Ignoring non-snapshot file {}", file_name); - continue; - } - - if (!is_snapshot_file_committed(file_name)) - { - LOG_INFO_FMT("Ignoring non-committed snapshot file {}", file_name); - continue; - } - - auto snapshot_idx = get_snapshot_idx_from_file_name(file_name); - if (snapshot_idx > latest_committed_snapshot_idx) - { - latest_committed_snapshot_file_name = file_name; - latest_committed_snapshot_idx = snapshot_idx; - } - } - - return latest_committed_snapshot_file_name; - } - class SnapshotManager { private: @@ -238,7 +86,7 @@ namespace asynchost const uint8_t* receipt_data, size_t receipt_size) { - TimeBoundLogger log_if_slow( + asynchost::TimeBoundLogger log_if_slow( fmt::format("Committing snapshot - snapshot_idx={}", snapshot_idx)); try diff --git a/tests/config.jinja b/tests/config.jinja index 65407e0befb6..c5dfbb0ef6d3 100644 --- a/tests/config.jinja +++ b/tests/config.jinja @@ -50,7 +50,8 @@ { "retry_timeout": "{{ join_timer }}", "target_rpc_address": "{{ target_rpc_address }}", - "follow_redirect": {{ follow_redirect|tojson }} + "follow_redirect": {{ follow_redirect|tojson }}, + "fetch_recent_snapshot": {{ fetch_recent_snapshot|tojson }} }, "recover": { "initial_service_certificate_validity_days": {{ 
initial_service_cert_validity_days }},
diff --git a/tests/e2e_operations.py b/tests/e2e_operations.py
index 7655432e00fb..48344d4138cf 100644
--- a/tests/e2e_operations.py
+++ b/tests/e2e_operations.py
@@ -209,6 +209,105 @@ def test_large_snapshot(network, args):
     )
+
+
+def test_snapshot_access(network, args):
+    """Exercise the /node/snapshot endpoints on the primary.
+
+    Checks that the redirect to the latest committed snapshot honours the
+    `since` query parameter, that ranged GETs return the same bytes as the
+    snapshot file on disk, and that malformed Range headers are rejected.
+    """
+    primary, _ = network.find_primary()
+
+    committed_dir = network.get_committed_snapshots(primary)
+    snapshot_name = ccf.ledger.latest_snapshot(committed_dir)
+    snapshot_index, _ = ccf.ledger.snapshot_index_from_filename(snapshot_name)
+
+    with open(os.path.join(committed_dir, snapshot_name), "rb") as snapshot_file:
+        snapshot_data = snapshot_file.read()
+
+    with primary.client() as client:
+        # Without a `since` argument, the node redirects to its latest snapshot
+        response = client.head("/node/snapshot", allow_redirects=False)
+        assert (
+            response.status_code == http.HTTPStatus.PERMANENT_REDIRECT.value
+        ), response
+        assert "location" in response.headers, response.headers
+        location = response.headers["location"]
+        assert location == f"/node/snapshot/{snapshot_name}"
+        LOG.warning(response.headers)
+
+        # Anything older than the snapshot redirects to it, anything else 404s
+        redirected = [
+            (since, location)
+            for since in (0, 1, snapshot_index // 2, snapshot_index - 1)
+        ]
+        not_found = [(since, None) for since in (snapshot_index, snapshot_index + 1)]
+        for since, expected_location in redirected + not_found:
+            for verb in ("GET", "HEAD"):
+                response = client.call(
+                    f"/node/snapshot?since={since}",
+                    allow_redirects=False,
+                    http_verb=verb,
+                )
+                if expected_location is not None:
+                    assert (
+                        response.status_code
+                        == http.HTTPStatus.PERMANENT_REDIRECT.value
+                    ), response
+                    assert "location" in response.headers, response.headers
+                    assert response.headers["location"] == expected_location
+                else:
+                    assert response.status_code == http.HTTPStatus.NOT_FOUND, response
+
+        response = client.head(location)
+        assert response.status_code == http.HTTPStatus.OK.value, response
+        assert response.headers["accept-ranges"] == "bytes", response.headers
+        total_size = int(response.headers["content-length"])
+
+        def check_ranged_get(range_header_value, expected):
+            # Fetch one range and compare it with the matching slice on disk
+            ranged = client.get(
+                location, headers={"range": f"bytes={range_header_value}"}
+            )
+            assert ranged.status_code == http.HTTPStatus.PARTIAL_CONTENT.value, ranged
+            actual = ranged.body.data()
+            assert (
+                expected == actual
+            ), f"Binary mismatch, {len(expected)} vs {len(actual)}:\n{expected}\nvs\n{actual}"
+
+        a = total_size // 3
+        b = a * 2
+        bounded_ranges = [
+            (0, None),
+            (0, total_size),
+            (0, a),
+            (a, a),
+            (a, b),
+            (b, b),
+            (b, total_size),
+            (b, None),
+        ]
+        for start, end in bounded_ranges:
+            end_text = "" if end is None else end
+            check_ranged_get(f"{start}-{end_text}", snapshot_data[start:end])
+
+        for negative_offset in [1, a, b]:
+            check_ranged_get(f"-{negative_offset}", snapshot_data[-negative_offset:])
+
+        # Check error handling for invalid ranges
+        invalid_ranges = [
+            (f"{a}-foo", "Unable to parse end of range value foo"),
+            ("foo-foo", "Unable to parse start of range value foo"),
+            (f"foo-{b}", "Unable to parse start of range value foo"),
+            (f"{b}-{a}", "out of order"),
+            (f"0-{total_size + 1}", "larger than total file size"),
+            ("-1-5", "Invalid format"),
+            ("-", "Invalid range"),
+            ("-foo", "Unable to parse end of range offset value foo"),
+            ("", "Invalid format"),
+        ]
+        for invalid_range, err_msg in invalid_ranges:
+            response = client.get(
+                location, headers={"range": f"bytes={invalid_range}"}
+            )
+            assert response.status_code == http.HTTPStatus.BAD_REQUEST.value, response
+            assert err_msg in response.body.json()["error"]["message"], response
+
+
 def split_all_ledger_files_in_dir(input_dir, output_dir):
     # A ledger file can only be split at a seqno that contains a signature
     # (so that all files end on a signature that verifies their integrity).
@@ -296,6 +395,7 @@ def run_file_operations(args): test_forced_ledger_chunk(network, args) test_forced_snapshot(network, args) test_large_snapshot(network, args) + test_snapshot_access(network, args) primary, _ = network.find_primary() # Scoped transactions are not handled by historical range queries diff --git a/tests/infra/clients.py b/tests/infra/clients.py index 76247ef64548..6ac10b6d6dfa 100644 --- a/tests/infra/clients.py +++ b/tests/infra/clients.py @@ -162,7 +162,13 @@ def __str__(self): if self.headers: string += f" {truncate(str(self.headers), max_len=25)}" if self.body is not None: - string += escape_loguru_tags(f' {truncate(f"{self.body}")}') + if ( + "content-type" in self.headers + and self.headers["content-type"] == "application/octet-stream" + ): + string += f"" + else: + string += escape_loguru_tags(f' {truncate(f"{self.body}")}') return string @@ -267,7 +273,15 @@ def __str__(self): status_color = ( "red" if status_category in (4, 5) else "yellow" if redirect else "green" ) - body_s = escape_loguru_tags(truncate(str(self.body))) + + if ( + "content-type" in self.headers + and self.headers["content-type"] == "application/octet-stream" + ): + body_s = f"" + else: + body_s = escape_loguru_tags(truncate(str(self.body))) + # Body can't end with a \, or it will escape the loguru closing tag if len(body_s) > 0 and body_s[-1] == "\\": body_s += " " diff --git a/tests/infra/remote.py b/tests/infra/remote.py index e7522d0734ef..2bf52563363f 100644 --- a/tests/infra/remote.py +++ b/tests/infra/remote.py @@ -336,6 +336,7 @@ def __init__( ignore_first_sigterm=False, node_container_image=None, follow_redirect=True, + fetch_recent_snapshot=True, max_uncommitted_tx_count=0, snp_security_policy_file=None, snp_uvm_endorsements_file=None, @@ -533,6 +534,7 @@ def __init__( ignore_first_sigterm=ignore_first_sigterm, node_address=remote_class.get_node_address(node_address), follow_redirect=follow_redirect, + fetch_recent_snapshot=fetch_recent_snapshot, 
max_uncommitted_tx_count=max_uncommitted_tx_count, snp_security_policy_file=snp_security_policy_file, snp_uvm_endorsements_file=snp_uvm_endorsements_file, diff --git a/tests/reconfiguration.py b/tests/reconfiguration.py index c79b3550450e..95375878f770 100644 --- a/tests/reconfiguration.py +++ b/tests/reconfiguration.py @@ -124,7 +124,13 @@ def test_add_node(network, args, from_snapshot=True): } ) ) - network.join_node(new_node, args.package, args, from_snapshot=from_snapshot) + network.join_node( + new_node, + args.package, + args, + from_snapshot=from_snapshot, + fetch_recent_snapshot=from_snapshot, + ) # Verify self-signed node certificate validity period new_node.verify_certificate_validity_period(interface_name=operator_rpc_interface) @@ -138,9 +144,10 @@ def test_add_node(network, args, from_snapshot=True): if not from_snapshot: with new_node.client() as c: s = c.get("/node/state") - assert s.body.json()["node_id"] == new_node.node_id + body = s.body.json() + assert body["node_id"] == new_node.node_id assert ( - s.body.json()["startup_seqno"] == 0 + body["startup_seqno"] == 0 ), "Node started without snapshot but reports startup seqno != 0" # Now that the node is trusted, verify endorsed certificate validity period @@ -868,6 +875,7 @@ def run_join_old_snapshot(args): args.package, args, from_snapshot=True, + fetch_recent_snapshot=False, snapshots_dir=tmp_dir, timeout=3, ) @@ -891,6 +899,7 @@ def run_join_old_snapshot(args): args.package, args, from_snapshot=False, + fetch_recent_snapshot=False, timeout=3, ) except infra.network.StartupSeqnoIsOld as e: