Skip to content

Commit

Permalink
Merge branch 'main' into f/azure-linux-e2e-tests
Browse files Browse the repository at this point in the history
  • Loading branch information
maxtropets authored Jan 3, 2025
2 parents 2cf02d0 + 44a669d commit a881d15
Show file tree
Hide file tree
Showing 24 changed files with 1,176 additions and 216 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ jobs:
# Build tools
tdnf -y install build-essential clang cmake ninja-build which
# Dependencies
tdnf -y install openssl-devel libuv-devel
tdnf -y install openssl-devel libuv-devel curl-devel
# Test dependencies
tdnf -y install libarrow-devel parquet-libs-devel lldb npm jq expect
shell: bash
Expand Down
12 changes: 10 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,15 @@ elseif(COMPILE_TARGET STREQUAL "virtual")
endif()

target_link_libraries(
cchost PRIVATE uv ${TLS_LIBRARY} ${CMAKE_DL_LIBS} ${CMAKE_THREAD_LIBS_INIT}
${LINK_LIBCXX} ccfcrypto.host
cchost
PRIVATE uv
${TLS_LIBRARY}
${CMAKE_DL_LIBS}
${CMAKE_THREAD_LIBS_INIT}
${LINK_LIBCXX}
ccfcrypto.host
curl
http_parser.host
)

install(TARGETS cchost DESTINATION bin)
Expand Down Expand Up @@ -714,6 +721,7 @@ if(BUILD_TESTS)
${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/contiguous_set.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/unit_strings.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/dl_list.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/nonstd.cpp
)
target_link_libraries(ds_test PRIVATE ${CMAKE_THREAD_LIBS_INIT})

Expand Down
5 changes: 5 additions & 0 deletions doc/host_config_schema/cchost_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,11 @@
"type": "boolean",
"default": true,
"description": "Whether to follow redirects to the primary node of the existing service to join"
},
"fetch_recent_snapshot": {
"type": "boolean",
"default": true,
"description": "Whether to ask the target for a newer snapshot before joining. The node will ask the target what their latest snapshot is, and if that is later than what the node has locally, will fetch it via RPC before launching. Should generally only be turned off for specific test cases"
}
},
"required": ["target_rpc_address"],
Expand Down
8 changes: 8 additions & 0 deletions include/ccf/ds/nonstd.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,14 @@ namespace ccf::nonstd
});
}

static inline std::string_view trim(
std::string_view s, std::string_view trim_chars = " \t\r\n")
{
const auto start = std::min(s.find_first_not_of(trim_chars), s.size());
const auto end = std::min(s.find_last_not_of(trim_chars) + 1, s.size());
return s.substr(start, end - start);
}

/// Iterate through tuple, calling functor on each element
template <size_t I = 0, typename F, typename... Ts>
static void tuple_for_each(const std::tuple<Ts...>& t, const F& f)
Expand Down
10 changes: 10 additions & 0 deletions include/ccf/node/startup_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ namespace ccf
bool operator==(const Attestation&) const = default;
};
Attestation attestation = {};

struct Snapshots
{
std::string directory = "snapshots";
size_t tx_count = 10'000;
std::optional<std::string> read_only_directory = std::nullopt;

bool operator==(const Snapshots&) const = default;
};
Snapshots snapshots = {};
};

struct StartupConfig : CCFConfig
Expand Down
24 changes: 24 additions & 0 deletions python/src/ccf/ledger.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,19 @@ def range_from_filename(filename: str) -> Tuple[int, Optional[int]]:
raise ValueError(f"Could not read seqno range from ledger file {filename}")


def snapshot_index_from_filename(filename: str) -> Tuple[int, int]:
elements = (
os.path.basename(filename)
.replace(COMMITTED_FILE_SUFFIX, "")
.replace("snapshot_", "")
.split("_")
)
if len(elements) == 2:
return (int(elements[0]), int(elements[1]))
else:
raise ValueError(f"Could not read snapshot index from file name {filename}")


class GcmHeader:
view: int
seqno: int
Expand Down Expand Up @@ -851,6 +864,17 @@ def get_len(self) -> int:
return self._file_size


def latest_snapshot(snapshots_dir):
best_name, best_seqno = None, None
for s in os.listdir(snapshots_dir):
with ccf.ledger.Snapshot(os.path.join(snapshots_dir, s)) as snapshot:
snapshot_seqno = snapshot.get_public_domain().get_seqno()
if best_seqno is None or snapshot_seqno > best_seqno:
best_name = s
best_seqno = snapshot_seqno
return best_name


class LedgerChunk:
"""
Class used to parse and iterate over :py:class:`ccf.ledger.Transaction` in a CCF ledger chunk.
Expand Down
6 changes: 6 additions & 0 deletions src/common/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ namespace ccf
snp_security_policy_file,
snp_uvm_endorsements_file);

DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCFConfig::Snapshots);
DECLARE_JSON_REQUIRED_FIELDS(CCFConfig::Snapshots);
DECLARE_JSON_OPTIONAL_FIELDS(
CCFConfig::Snapshots, directory, tx_count, read_only_directory);

DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCFConfig);
DECLARE_JSON_REQUIRED_FIELDS(CCFConfig, network);
DECLARE_JSON_OPTIONAL_FIELDS(
Expand All @@ -94,6 +99,7 @@ namespace ccf
ledger_signatures,
jwt,
attestation,
snapshots,
node_to_node_message_limit,
historical_cache_soft_limit);

Expand Down
16 changes: 16 additions & 0 deletions src/ds/test/nonstd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,3 +265,19 @@ TEST_CASE("rsplit" * doctest::test_suite("nonstd"))
}
}
}

TEST_CASE("trim" * doctest::test_suite("nonstd"))
{
REQUIRE(ccf::nonstd::trim(" hello world ") == "hello world");
REQUIRE(
ccf::nonstd::trim(" \r\n\t\nhello world\n\n\r\t\t\n\t \n\t") ==
"hello world");
REQUIRE(ccf::nonstd::trim("..hello..") == "..hello..");
REQUIRE(ccf::nonstd::trim("..hello..", ".") == "hello");

REQUIRE(ccf::nonstd::trim("hello") == "hello");
REQUIRE(ccf::nonstd::trim(" h") == "h");
REQUIRE(ccf::nonstd::trim("h ") == "h");
REQUIRE(ccf::nonstd::trim(" ") == "");
REQUIRE(ccf::nonstd::trim("") == "");
}
21 changes: 5 additions & 16 deletions src/host/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,6 @@ namespace host
};
Ledger ledger = {};

struct Snapshots
{
std::string directory = "snapshots";
size_t tx_count = 10'000;
std::optional<std::string> read_only_directory = std::nullopt;

bool operator==(const Snapshots&) const = default;
};
Snapshots snapshots = {};

struct Logging
{
ccf::LoggerLevel host_level = ccf::LoggerLevel::INFO;
Expand Down Expand Up @@ -155,6 +145,7 @@ namespace host
ccf::NodeInfoNetwork::NetAddress target_rpc_address;
ccf::ds::TimeString retry_timeout = {"1000ms"};
bool follow_redirect = true;
bool fetch_recent_snapshot = true;

bool operator==(const Join&) const = default;
};
Expand Down Expand Up @@ -189,11 +180,6 @@ namespace host
DECLARE_JSON_OPTIONAL_FIELDS(
CCHostConfig::Ledger, directory, read_only_directories, chunk_size);

DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCHostConfig::Snapshots);
DECLARE_JSON_REQUIRED_FIELDS(CCHostConfig::Snapshots);
DECLARE_JSON_OPTIONAL_FIELDS(
CCHostConfig::Snapshots, directory, tx_count, read_only_directory);

DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCHostConfig::Logging);
DECLARE_JSON_REQUIRED_FIELDS(CCHostConfig::Logging);
DECLARE_JSON_OPTIONAL_FIELDS(CCHostConfig::Logging, host_level, format);
Expand All @@ -216,7 +202,10 @@ namespace host
DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCHostConfig::Command::Join);
DECLARE_JSON_REQUIRED_FIELDS(CCHostConfig::Command::Join, target_rpc_address);
DECLARE_JSON_OPTIONAL_FIELDS(
CCHostConfig::Command::Join, retry_timeout, follow_redirect);
CCHostConfig::Command::Join,
retry_timeout,
follow_redirect,
fetch_recent_snapshot);

DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCHostConfig::Command::Recover);
DECLARE_JSON_REQUIRED_FIELDS(CCHostConfig::Command::Recover);
Expand Down
73 changes: 56 additions & 17 deletions src/host/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
#include "process_launcher.h"
#include "rpc_connections.h"
#include "sig_term.h"
#include "snapshots.h"
#include "snapshots/fetch.h"
#include "snapshots/snapshot_manager.h"
#include "ticker.h"
#include "time_updater.h"

Expand Down Expand Up @@ -376,7 +377,7 @@ int main(int argc, char** argv)
config.ledger.read_only_directories);
ledger.register_message_handlers(bp.get_dispatcher());

asynchost::SnapshotManager snapshots(
snapshots::SnapshotManager snapshots(
config.snapshots.directory,
writer_factory,
config.snapshots.read_only_directory);
Expand Down Expand Up @@ -507,8 +508,6 @@ int main(int argc, char** argv)

ccf::StartupConfig startup_config(config);

startup_config.snapshot_tx_interval = config.snapshots.tx_count;

if (startup_config.attestation.snp_security_policy_file.has_value())
{
auto security_policy_file =
Expand Down Expand Up @@ -690,22 +689,62 @@ int main(int argc, char** argv)
config.command.type == StartType::Join ||
config.command.type == StartType::Recover)
{
auto latest_committed_snapshot =
snapshots.find_latest_committed_snapshot();
if (latest_committed_snapshot.has_value())
{
auto& [snapshot_dir, snapshot_file] = latest_committed_snapshot.value();
startup_snapshot = files::slurp(snapshot_dir / snapshot_file);
auto latest_local_snapshot = snapshots.find_latest_committed_snapshot();

LOG_INFO_FMT(
"Found latest snapshot file: {} (size: {})",
snapshot_dir / snapshot_file,
startup_snapshot.size());
if (
config.command.type == StartType::Join &&
config.command.join.fetch_recent_snapshot)
{
// Try to fetch a recent snapshot from peer
const size_t latest_local_idx = latest_local_snapshot.has_value() ?
snapshots::get_snapshot_idx_from_file_name(
latest_local_snapshot->second) :
0;
auto latest_peer_snapshot = snapshots::fetch_from_peer(
config.command.join.target_rpc_address,
config.command.service_certificate_file,
latest_local_idx);

if (latest_peer_snapshot.has_value())
{
LOG_INFO_FMT(
"Received snapshot {} from peer (size: {}) - writing this to disk "
"and using for join startup",
latest_peer_snapshot->snapshot_name,
latest_peer_snapshot->snapshot_data.size());

const auto dst_path = fs::path(config.snapshots.directory) /
fs::path(latest_peer_snapshot->snapshot_name);
if (files::exists(dst_path))
{
LOG_FATAL_FMT(
"Unable to write peer snapshot - already have a file at {}. "
"Exiting.",
dst_path);
return static_cast<int>(CLI::ExitCodes::FileError);
}
files::dump(latest_peer_snapshot->snapshot_data, dst_path);
startup_snapshot = latest_peer_snapshot->snapshot_data;
}
}
else

if (startup_snapshot.empty())
{
LOG_INFO_FMT(
"No snapshot found: Node will replay all historical transactions");
if (latest_local_snapshot.has_value())
{
auto& [snapshot_dir, snapshot_file] = latest_local_snapshot.value();
startup_snapshot = files::slurp(snapshot_dir / snapshot_file);

LOG_INFO_FMT(
"Found latest local snapshot file: {} (size: {})",
snapshot_dir / snapshot_file,
startup_snapshot.size());
}
else
{
LOG_INFO_FMT(
"No snapshot found: Node will replay all historical transactions");
}
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/host/test/ledger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
#include "crypto/openssl/hash.h"
#include "ds/files.h"
#include "ds/serialized.h"
#include "host/snapshots.h"
#include "kv/serialised_entry_format.h"
#include "snapshots/snapshot_manager.h"

#define DOCTEST_CONFIG_IMPLEMENT
#include <doctest/doctest.h>
Expand Down Expand Up @@ -1259,6 +1259,8 @@ TEST_CASE("Snapshot file name" * doctest::test_suite("snapshot"))
std::vector<size_t> snapshot_idx_interval_ranges = {
10, 1000, 10000, std::numeric_limits<size_t>::max() - 2};

using namespace snapshots;

for (auto const& snapshot_idx_interval_range : snapshot_idx_interval_ranges)
{
std::uniform_int_distribution<size_t> dist(1, snapshot_idx_interval_range);
Expand Down Expand Up @@ -1304,6 +1306,7 @@ TEST_CASE("Generate and commit snapshots" * doctest::test_suite("snapshot"))
auto snap_ro_dir = AutoDeleteFolder(snapshot_dir_read_only);
fs::create_directory(snapshot_dir_read_only);

using namespace snapshots;
SnapshotManager snapshots(snapshot_dir, wf, snapshot_dir_read_only);

size_t snapshot_interval = 5;
Expand Down
30 changes: 21 additions & 9 deletions src/http/http_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,34 +71,46 @@ namespace http
return body;
}

void set_body(const std::vector<uint8_t>* b)
void set_body(
const std::vector<uint8_t>* b, bool overwrite_content_length = true)
{
if (b != nullptr)
{
set_body(b->data(), b->size());
set_body(b->data(), b->size(), overwrite_content_length);
}
else
{
set_body(nullptr, 0);
set_body(nullptr, 0, overwrite_content_length);
}
}

void set_body(const uint8_t* b, size_t s)
void set_body(
const uint8_t* b, size_t s, bool overwrite_content_length = true)
{
body = b;
body_size = s;

headers[ccf::http::headers::CONTENT_LENGTH] =
fmt::format("{}", get_content_length());
if (
overwrite_content_length ||
headers.find(ccf::http::headers::CONTENT_LENGTH) == headers.end())
{
headers[ccf::http::headers::CONTENT_LENGTH] =
fmt::format("{}", get_content_length());
}
}

void set_body(const std::string& s)
void set_body(const std::string& s, bool overwrite_content_length = true)
{
body = (uint8_t*)s.data();
body_size = s.size();

headers[ccf::http::headers::CONTENT_LENGTH] =
fmt::format("{}", get_content_length());
if (
overwrite_content_length ||
headers.find(ccf::http::headers::CONTENT_LENGTH) == headers.end())
{
headers[ccf::http::headers::CONTENT_LENGTH] =
fmt::format("{}", get_content_length());
}
}
};

Expand Down
Loading

0 comments on commit a881d15

Please sign in to comment.