diff --git a/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/append_only_tree/content_addressed_append_only_tree.hpp b/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/append_only_tree/content_addressed_append_only_tree.hpp index b27be4d3aae..d25507954eb 100644 --- a/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/append_only_tree/content_addressed_append_only_tree.hpp +++ b/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/append_only_tree/content_addressed_append_only_tree.hpp @@ -651,7 +651,7 @@ void ContentAddressedAppendOnlyTree::get_leaf(const index_ RequestContext requestContext; requestContext.includeUncommitted = includeUncommitted; requestContext.root = store_->get_current_root(*tx, includeUncommitted); - std::optional leaf_hash = find_leaf_hash(leaf_index, requestContext, *tx); + std::optional leaf_hash = find_leaf_hash(leaf_index, requestContext, *tx, false); response.success = leaf_hash.has_value(); if (response.success) { response.inner.leaf = leaf_hash.value(); @@ -690,7 +690,7 @@ void ContentAddressedAppendOnlyTree::get_leaf(const index_ leaf_index, " for block ", blockNumber, - ", leaf index is too high."); + ", leaf index out of range."); response.success = false; return; } @@ -698,7 +698,7 @@ void ContentAddressedAppendOnlyTree::get_leaf(const index_ requestContext.blockNumber = blockNumber; requestContext.includeUncommitted = includeUncommitted; requestContext.root = blockData.root; - std::optional leaf_hash = find_leaf_hash(leaf_index, requestContext, *tx); + std::optional leaf_hash = find_leaf_hash(leaf_index, requestContext, *tx, false); response.success = leaf_hash.has_value(); if (response.success) { response.inner.leaf = leaf_hash.value(); @@ -746,7 +746,6 @@ void ContentAddressedAppendOnlyTree::find_leaf_indices_fro RequestContext requestContext; requestContext.includeUncommitted = includeUncommitted; - requestContext.root = store_->get_current_root(*tx, includeUncommitted); for (const auto& leaf : leaves) { std::optional 
leaf_index = @@ -787,7 +786,6 @@ void ContentAddressedAppendOnlyTree::find_leaf_indices_fro RequestContext requestContext; requestContext.blockNumber = blockNumber; requestContext.includeUncommitted = includeUncommitted; - requestContext.root = blockData.root; requestContext.maxIndex = blockData.size; for (const auto& leaf : leaves) { diff --git a/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/indexed_tree/content_addressed_indexed_tree.hpp b/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/indexed_tree/content_addressed_indexed_tree.hpp index 94b8d2723bf..4144bb206d1 100644 --- a/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/indexed_tree/content_addressed_indexed_tree.hpp +++ b/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/indexed_tree/content_addressed_indexed_tree.hpp @@ -344,7 +344,7 @@ void ContentAddressedIndexedTree::get_leaf(const index_t& RequestContext requestContext; requestContext.includeUncommitted = includeUncommitted; requestContext.root = store_->get_current_root(*tx, includeUncommitted); - std::optional leaf_hash = find_leaf_hash(index, requestContext, *tx); + std::optional leaf_hash = find_leaf_hash(index, requestContext, *tx, false); if (!leaf_hash.has_value()) { response.success = false; response.message = "Failed to find leaf hash for current root"; @@ -390,7 +390,7 @@ void ContentAddressedIndexedTree::get_leaf(const index_t& requestContext.blockNumber = blockNumber; requestContext.includeUncommitted = includeUncommitted; requestContext.root = blockData.root; - std::optional leaf_hash = find_leaf_hash(index, requestContext, *tx); + std::optional leaf_hash = find_leaf_hash(index, requestContext, *tx, false); if (!leaf_hash.has_value()) { response.success = false; response.message = format("Failed to find leaf hash for root of block ", blockNumber); diff --git a/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/lmdb_store/lmdb_database.hpp 
b/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/lmdb_store/lmdb_database.hpp index 6443c996ec3..8f071901414 100644 --- a/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/lmdb_store/lmdb_database.hpp +++ b/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/lmdb_store/lmdb_database.hpp @@ -12,6 +12,7 @@ class LMDBDatabaseCreationTransaction; class LMDBDatabase { public: using Ptr = std::unique_ptr; + using SharedPtr = std::shared_ptr; LMDBDatabase(LMDBEnvironment::SharedPtr env, const LMDBDatabaseCreationTransaction& transaction, diff --git a/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/lmdb_store/lmdb_environment.test.cpp b/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/lmdb_store/lmdb_environment.test.cpp new file mode 100644 index 00000000000..c8f13c5bdf7 --- /dev/null +++ b/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/lmdb_store/lmdb_environment.test.cpp @@ -0,0 +1,203 @@ +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "barretenberg/common/serialize.hpp" +#include "barretenberg/common/streams.hpp" +#include "barretenberg/common/test.hpp" +#include "barretenberg/crypto/merkle_tree/fixtures.hpp" +#include "barretenberg/crypto/merkle_tree/lmdb_store/lmdb_database.hpp" +#include "barretenberg/crypto/merkle_tree/lmdb_store/lmdb_db_transaction.hpp" +#include "barretenberg/crypto/merkle_tree/lmdb_store/lmdb_environment.hpp" +#include "barretenberg/crypto/merkle_tree/lmdb_store/queries.hpp" +#include "barretenberg/crypto/merkle_tree/signal.hpp" +#include "barretenberg/crypto/merkle_tree/types.hpp" +#include "barretenberg/numeric/random/engine.hpp" +#include "barretenberg/numeric/uint128/uint128.hpp" +#include "barretenberg/numeric/uint256/uint256.hpp" +#include "barretenberg/polynomials/serialize.hpp" +#include "barretenberg/stdlib/primitives/field/field.hpp" +#include "lmdb_tree_store.hpp" + +using namespace bb::stdlib; +using namespace bb::crypto::merkle_tree; + +class 
LMDBEnvironmentTest : public testing::Test { + protected: + void SetUp() override + { + _directory = random_temp_directory(); + _mapSize = 1024 * 1024; + _maxReaders = 16; + std::filesystem::create_directories(_directory); + } + + void TearDown() override { std::filesystem::remove_all(_directory); } + + static std::string _directory; + static uint32_t _maxReaders; + static uint64_t _mapSize; +}; + +std::string LMDBEnvironmentTest::_directory; +uint32_t LMDBEnvironmentTest::_maxReaders; +uint64_t LMDBEnvironmentTest::_mapSize; + +std::vector serialise(std::string key) +{ + std::vector data(key.begin(), key.end()); + return data; +} + +TEST_F(LMDBEnvironmentTest, can_create_environment) +{ + EXPECT_NO_THROW(LMDBEnvironment environment( + LMDBEnvironmentTest::_directory, LMDBEnvironmentTest::_mapSize, 1, LMDBEnvironmentTest::_maxReaders)); +} + +TEST_F(LMDBEnvironmentTest, can_create_database) +{ + LMDBEnvironment::SharedPtr environment = std::make_shared( + LMDBEnvironmentTest::_directory, LMDBEnvironmentTest::_mapSize, 1, LMDBEnvironmentTest::_maxReaders); + + { + LMDBDatabaseCreationTransaction tx(environment); + LMDBDatabase::SharedPtr db = std::make_unique(environment, tx, "DB", false, false); + EXPECT_NO_THROW(tx.commit()); + } +} + +TEST_F(LMDBEnvironmentTest, can_write_to_database) +{ + LMDBEnvironment::SharedPtr environment = std::make_shared( + LMDBEnvironmentTest::_directory, LMDBEnvironmentTest::_mapSize, 1, LMDBEnvironmentTest::_maxReaders); + + LMDBDatabaseCreationTransaction tx(environment); + LMDBDatabase::SharedPtr db = std::make_unique(environment, tx, "DB", false, false); + EXPECT_NO_THROW(tx.commit()); + + { + LMDBTreeWriteTransaction::SharedPtr tx = std::make_shared(environment); + auto key = serialise(std::string("Key")); + auto data = serialise(std::string("TestData")); + EXPECT_NO_THROW(tx->put_value(key, data, *db)); + EXPECT_NO_THROW(tx->commit()); + } +} + +TEST_F(LMDBEnvironmentTest, can_read_from_database) +{ + LMDBEnvironment::SharedPtr 
environment = std::make_shared( + LMDBEnvironmentTest::_directory, LMDBEnvironmentTest::_mapSize, 1, LMDBEnvironmentTest::_maxReaders); + + LMDBDatabaseCreationTransaction tx(environment); + LMDBDatabase::SharedPtr db = std::make_unique(environment, tx, "DB", false, false); + EXPECT_NO_THROW(tx.commit()); + + { + LMDBTreeWriteTransaction::SharedPtr tx = std::make_shared(environment); + auto key = serialise(std::string("Key")); + auto data = serialise(std::string("TestData")); + EXPECT_NO_THROW(tx->put_value(key, data, *db)); + EXPECT_NO_THROW(tx->commit()); + } + + { + environment->wait_for_reader(); + LMDBTreeReadTransaction::SharedPtr tx = std::make_shared(environment); + auto key = serialise(std::string("Key")); + auto expected = serialise(std::string("TestData")); + std::vector data; + tx->get_value(key, data, *db); + EXPECT_EQ(data, expected); + } +} + +TEST_F(LMDBEnvironmentTest, can_write_and_read_multiple) +{ + LMDBEnvironment::SharedPtr environment = std::make_shared( + LMDBEnvironmentTest::_directory, LMDBEnvironmentTest::_mapSize, 1, LMDBEnvironmentTest::_maxReaders); + + LMDBDatabaseCreationTransaction tx(environment); + LMDBDatabase::SharedPtr db = std::make_unique(environment, tx, "DB", false, false); + EXPECT_NO_THROW(tx.commit()); + + uint64_t numValues = 10; + + { + for (uint64_t count = 0; count < numValues; count++) { + LMDBTreeWriteTransaction::SharedPtr tx = std::make_shared(environment); + auto key = serialise((std::stringstream() << "Key" << count).str()); + auto data = serialise((std::stringstream() << "TestData" << count).str()); + EXPECT_NO_THROW(tx->put_value(key, data, *db)); + EXPECT_NO_THROW(tx->commit()); + } + } + + { + for (uint64_t count = 0; count < numValues; count++) { + environment->wait_for_reader(); + LMDBTreeReadTransaction::SharedPtr tx = std::make_shared(environment); + auto key = serialise((std::stringstream() << "Key" << count).str()); + auto expected = serialise((std::stringstream() << "TestData" << count).str()); + 
std::vector data; + tx->get_value(key, data, *db); + EXPECT_EQ(data, expected); + } + } +} + +TEST_F(LMDBEnvironmentTest, can_read_multiple_threads) +{ + LMDBEnvironment::SharedPtr environment = + std::make_shared(LMDBEnvironmentTest::_directory, LMDBEnvironmentTest::_mapSize, 1, 2); + + LMDBDatabaseCreationTransaction tx(environment); + LMDBDatabase::SharedPtr db = std::make_unique(environment, tx, "DB", false, false); + EXPECT_NO_THROW(tx.commit()); + + uint64_t numValues = 10; + uint64_t numIterationsPerThread = 1000; + uint32_t numThreads = 16; + + { + for (uint64_t count = 0; count < numValues; count++) { + LMDBTreeWriteTransaction::SharedPtr tx = std::make_shared(environment); + auto key = serialise((std::stringstream() << "Key" << count).str()); + auto data = serialise((std::stringstream() << "TestData" << count).str()); + EXPECT_NO_THROW(tx->put_value(key, data, *db)); + EXPECT_NO_THROW(tx->commit()); + } + } + + { + auto func = [&]() -> void { + for (uint64_t iteration = 0; iteration < numIterationsPerThread; iteration++) { + for (uint64_t count = 0; count < numValues; count++) { + environment->wait_for_reader(); + LMDBTreeReadTransaction::SharedPtr tx = std::make_shared(environment); + auto key = serialise((std::stringstream() << "Key" << count).str()); + auto expected = serialise((std::stringstream() << "TestData" << count).str()); + std::vector data; + tx->get_value(key, data, *db); + EXPECT_EQ(data, expected); + } + } + }; + std::vector> threads; + for (uint64_t count = 0; count < numThreads; count++) { + threads.emplace_back(std::make_unique(func)); + } + for (uint64_t count = 0; count < numThreads; count++) { + threads[count]->join(); + } + } +} diff --git a/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/lmdb_store/lmdb_tree_read_transaction.hpp b/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/lmdb_store/lmdb_tree_read_transaction.hpp index 89a20df8e7a..dd94b88b441 100644 --- 
a/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/lmdb_store/lmdb_tree_read_transaction.hpp +++ b/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/lmdb_store/lmdb_tree_read_transaction.hpp @@ -10,6 +10,7 @@ #include #include #include +#include #include namespace bb::crypto::merkle_tree { @@ -22,6 +23,7 @@ namespace bb::crypto::merkle_tree { class LMDBTreeReadTransaction : public LMDBTransaction { public: using Ptr = std::unique_ptr; + using SharedPtr = std::shared_ptr; LMDBTreeReadTransaction(LMDBEnvironment::SharedPtr env); LMDBTreeReadTransaction(const LMDBTreeReadTransaction& other) = delete; diff --git a/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/lmdb_store/lmdb_tree_store.test.cpp b/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/lmdb_store/lmdb_tree_store.test.cpp index c33eb42bc23..f7bcbf009f5 100644 --- a/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/lmdb_store/lmdb_tree_store.test.cpp +++ b/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/lmdb_store/lmdb_tree_store.test.cpp @@ -104,6 +104,48 @@ TEST_F(LMDBTreeStoreTest, can_write_and_read_meta_data) } } +TEST_F(LMDBTreeStoreTest, can_read_data_from_multiple_threads) +{ + TreeMeta metaData; + metaData.committedSize = 56; + metaData.initialSize = 12; + metaData.initialRoot = VALUES[1]; + metaData.root = VALUES[2]; + metaData.depth = 40; + metaData.oldestHistoricBlock = 87; + metaData.unfinalisedBlockHeight = 95; + metaData.name = "Note hash tree"; + metaData.size = 60; + LMDBTreeStore store(_directory, "DB1", _mapSize, 2); + { + LMDBTreeWriteTransaction::Ptr transaction = store.create_write_transaction(); + store.write_meta_data(metaData, *transaction); + transaction->commit(); + } + + uint64_t numIterationsPerThread = 1000; + uint32_t numThreads = 16; + + { + auto func = [&]() -> void { + for (uint64_t iteration = 0; iteration < numIterationsPerThread; iteration++) { + LMDBTreeReadTransaction::Ptr transaction = store.create_read_transaction(); + TreeMeta readBack; + bool 
success = store.read_meta_data(readBack, *transaction); + EXPECT_TRUE(success); + EXPECT_EQ(readBack, metaData); + } + }; + std::vector> threads; + for (uint64_t count = 0; count < numThreads; count++) { + threads.emplace_back(std::make_unique(func)); + } + for (uint64_t count = 0; count < numThreads; count++) { + threads[count]->join(); + } + } +} + TEST_F(LMDBTreeStoreTest, can_write_and_read_multiple_blocks_with_meta) { LMDBTreeStore store(_directory, "DB1", _mapSize, _maxReaders); diff --git a/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/lmdb_store/lmdb_tree_write_transaction.hpp b/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/lmdb_store/lmdb_tree_write_transaction.hpp index 927e14fb4fa..0ad9cdd5a9f 100644 --- a/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/lmdb_store/lmdb_tree_write_transaction.hpp +++ b/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/lmdb_store/lmdb_tree_write_transaction.hpp @@ -22,6 +22,7 @@ namespace bb::crypto::merkle_tree { class LMDBTreeWriteTransaction : public LMDBTransaction { public: using Ptr = std::unique_ptr; + using SharedPtr = std::shared_ptr; LMDBTreeWriteTransaction(LMDBEnvironment::SharedPtr env); LMDBTreeWriteTransaction(const LMDBTreeWriteTransaction& other) = delete; diff --git a/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/node_store/cached_content_addressed_tree_store.hpp b/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/node_store/cached_content_addressed_tree_store.hpp index cdd5e102754..5fce392b619 100644 --- a/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/node_store/cached_content_addressed_tree_store.hpp +++ b/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/node_store/cached_content_addressed_tree_store.hpp @@ -157,7 +157,7 @@ template class ContentAddressedCachedTreeStore { /** * @brief Returns the name of the tree */ - std::string get_name() const { return name_; } + std::string get_name() const { return forkConstantData_.name_; } /** * @brief Returns a read 
transaction against the underlying store. @@ -187,9 +187,12 @@ template class ContentAddressedCachedTreeStore { std::optional find_block_for_index(const index_t& index, ReadTransaction& tx) const; private: - std::string name_; - uint32_t depth_; - std::optional initialised_from_block_; + struct ForkConstantData { + std::string name_; + uint32_t depth_; + std::optional initialised_from_block_; + }; + ForkConstantData forkConstantData_; // This is a mapping between the node hash and it's payload (children and ref count) for every node in the tree, // including leaves. As indexed trees are updated, this will end up containing many nodes that are not part of the @@ -217,7 +220,7 @@ template class ContentAddressedCachedTreeStore { bool read_persisted_meta(TreeMeta& m, ReadTransaction& tx) const; - void enrich_meta_from_block(TreeMeta& m) const; + void enrich_meta_from_fork_constant_data(TreeMeta& m) const; void persist_meta(TreeMeta& m, WriteTransaction& tx); @@ -242,7 +245,7 @@ template class ContentAddressedCachedTreeStore { void delete_block_for_index(const block_number_t& blockNumber, const index_t& index, WriteTransaction& tx); - index_t constrain_tree_size(const RequestContext& requestContext, ReadTransaction& tx) const; + index_t constrain_tree_size_to_only_committed(const RequestContext& requestContext, ReadTransaction& tx) const; WriteTransactionPtr create_write_transaction() const { return dataStore_->create_write_transaction(); } }; @@ -251,10 +254,9 @@ template ContentAddressedCachedTreeStore::ContentAddressedCachedTreeStore(std::string name, uint32_t levels, PersistedStoreType::SharedPtr dataStore) - : name_(std::move(name)) - , depth_(levels) + : forkConstantData_{ .name_ = (std::move(name)), .depth_ = levels } , dataStore_(dataStore) - , nodes_by_index_(std::vector>(depth_ + 1, std::unordered_map())) + , nodes_by_index_(std::vector>(levels + 1, std::unordered_map())) { initialise(); } @@ -264,23 +266,30 @@ 
ContentAddressedCachedTreeStore::ContentAddressedCachedTreeStore( uint32_t levels, const index_t& referenceBlockNumber, PersistedStoreType::SharedPtr dataStore) - : name_(std::move(name)) - , depth_(levels) + : forkConstantData_{ .name_ = (std::move(name)), .depth_ = levels } , dataStore_(dataStore) - , nodes_by_index_(std::vector>(depth_ + 1, std::unordered_map())) + , nodes_by_index_(std::vector>(levels + 1, std::unordered_map())) { initialise_from_block(referenceBlockNumber); } template -index_t ContentAddressedCachedTreeStore::constrain_tree_size(const RequestContext& requestContext, - ReadTransaction& tx) const +index_t ContentAddressedCachedTreeStore::constrain_tree_size_to_only_committed( + const RequestContext& requestContext, ReadTransaction& tx) const { // We need to identify the size of the committed tree as it exists from our perspective - // To do this we read the uncommitted meta which will contained the committed size at our initialisation point - TreeMeta m; - get_meta(m, tx, true); - index_t sizeLimit = m.committedSize; + // We either take from the fork's constant data if available or we read the meta data from the store + index_t sizeLimit = 0; + if (forkConstantData_.initialised_from_block_.has_value()) { + // We are a fork. Take from constant data + sizeLimit = forkConstantData_.initialised_from_block_.value().size; + } else { + // We are the main tree. 
Read from the store, only use committed so as to not violate any requests for purely + // committed data + TreeMeta m; + get_meta(m, tx, false); + sizeLimit = m.committedSize; + } if (requestContext.maxIndex.has_value() && requestContext.maxIndex.value() < sizeLimit) { sizeLimit = requestContext.maxIndex.value(); } @@ -293,7 +302,7 @@ std::optional ContentAddressedCachedTreeStore::fi { RequestContext context; context.maxIndex = index + 1; - index_t constrainedSize = constrain_tree_size(context, tx); + index_t constrainedSize = constrain_tree_size_to_only_committed(context, tx); if (index >= constrainedSize) { return std::nullopt; } @@ -324,7 +333,11 @@ std::pair ContentAddressedCachedTreeStore::find_lo { auto new_value_as_number = uint256_t(new_leaf_key); index_t committed = 0; - std::optional sizeLimit = constrain_tree_size(requestContext, tx); + + // We first read committed data, so we must constrain the search to only the data committed from our perspective + // That means, if we are a fork, the committed size is the size of the tree as it was when we forked + // If we are the main tree, the committed size is the size of the tree as it is now + std::optional sizeLimit = constrain_tree_size_to_only_committed(requestContext, tx); fr found_key = dataStore_->find_low_leaf(new_leaf_key, committed, sizeLimit, tx); index_t db_index = committed; @@ -469,7 +482,10 @@ std::optional ContentAddressedCachedTreeStore::find_leaf FrKeyType key = leaf; bool success = dataStore_->read_leaf_index(key, committed, tx); if (success) { - index_t sizeLimit = constrain_tree_size(requestContext, tx); + // We must constrain the search to only the data committed from our perspective + // That means, if we are a fork, the committed size is the size of the tree as it was when we forked + // If we are the main tree, the committed size is the size of the tree as it is now + index_t sizeLimit = constrain_tree_size_to_only_committed(requestContext, tx); if (committed < start_index) { return 
std::nullopt; } @@ -576,18 +592,22 @@ bool ContentAddressedCachedTreeStore::read_persisted_meta(TreeMet if (!dataStore_->read_meta_data(m, tx)) { return false; } - enrich_meta_from_block(m); + // Having read the meta from the store, we need to enrich it with the fork constant data if available + enrich_meta_from_fork_constant_data(m); return true; } template -void ContentAddressedCachedTreeStore::enrich_meta_from_block(TreeMeta& m) const +void ContentAddressedCachedTreeStore::enrich_meta_from_fork_constant_data(TreeMeta& m) const { - if (initialised_from_block_.has_value()) { - m.size = initialised_from_block_->size; - m.committedSize = initialised_from_block_->size; - m.root = initialised_from_block_->root; - m.unfinalisedBlockHeight = initialised_from_block_->blockNumber; + // Here we update the given meta with properties from our constant fork data if available. + // If we are not a fork then nothing is to be updated + // If we are a fork then we will overwrite the root, size and committed size with the original fork values + if (forkConstantData_.initialised_from_block_.has_value()) { + m.size = forkConstantData_.initialised_from_block_->size; + m.committedSize = forkConstantData_.initialised_from_block_->size; + m.root = forkConstantData_.initialised_from_block_->root; + m.unfinalisedBlockHeight = forkConstantData_.initialised_from_block_->blockNumber; } } @@ -616,7 +636,7 @@ void ContentAddressedCachedTreeStore::commit(TreeMeta& finalMeta, TreeMeta uncommittedMeta; TreeMeta committedMeta; // We don't allow commits using images/forks - if (initialised_from_block_.has_value()) { + if (forkConstantData_.initialised_from_block_.has_value()) { throw std::runtime_error("Committing a fork is forbidden"); } { @@ -667,7 +687,8 @@ void ContentAddressedCachedTreeStore::commit(TreeMeta& finalMeta, tx->commit(); } catch (std::exception& e) { tx->try_abort(); - throw std::runtime_error(format("Unable to commit data to tree: ", name_, " Error: ", e.what())); + throw 
std::runtime_error( + format("Unable to commit data to tree: ", forkConstantData_.name_, " Error: ", e.what())); } } finalMeta = uncommittedMeta; @@ -732,7 +753,7 @@ void ContentAddressedCachedTreeStore::persist_node(const std::opt } fr hash = so.opHash.value(); - if (so.lvl == depth_) { + if (so.lvl == forkConstantData_.depth_) { // this is a leaf persist_leaf_pre_image(hash, tx); } @@ -767,7 +788,8 @@ template void ContentAddressedCachedTreeStore(); indices_ = std::map(); leaves_ = std::unordered_map(); - nodes_by_index_ = std::vector>(depth_ + 1, std::unordered_map()); + nodes_by_index_ = + std::vector>(forkConstantData_.depth_ + 1, std::unordered_map()); leaf_pre_image_by_index_ = std::unordered_map(); } @@ -784,9 +806,10 @@ void ContentAddressedCachedTreeStore::advance_finalised_block(con TreeMeta uncommittedMeta; BlockPayload blockPayload; if (blockNumber < 1) { - throw std::runtime_error(format("Unable to advance finalised block: ", blockNumber, ". Tree name: ", name_)); + throw std::runtime_error( + format("Unable to advance finalised block: ", blockNumber, ". Tree name: ", forkConstantData_.name_)); } - if (initialised_from_block_.has_value()) { + if (forkConstantData_.initialised_from_block_.has_value()) { throw std::runtime_error("Advancing the finalised block on a fork is forbidden"); } { @@ -795,8 +818,10 @@ void ContentAddressedCachedTreeStore::advance_finalised_block(con get_meta(uncommittedMeta, *tx, true); get_meta(committedMeta, *tx, false); if (!dataStore_->read_block_data(blockNumber, blockPayload, *tx)) { - throw std::runtime_error(format( - "Unable to advance finalised block: ", blockNumber, ". Failed to read block data. Tree name: ", name_)); + throw std::runtime_error(format("Unable to advance finalised block: ", + blockNumber, + ". Failed to read block data. 
Tree name: ", + forkConstantData_.name_)); } } // can only finalise blocks that are not finalised @@ -827,7 +852,7 @@ void ContentAddressedCachedTreeStore::advance_finalised_block(con throw std::runtime_error(format("Unable to commit advance of finalised block: ", blockNumber, ". Tree name: ", - name_, + forkConstantData_.name_, " Error: ", e.what())); } @@ -847,9 +872,10 @@ void ContentAddressedCachedTreeStore::unwind_block(const block_nu BlockPayload blockData; BlockPayload previousBlockData; if (blockNumber < 1) { - throw std::runtime_error(format("Unable to unwind block: ", blockNumber, ". Tree name: ", name_)); + throw std::runtime_error( + format("Unable to unwind block: ", blockNumber, ". Tree name: ", forkConstantData_.name_)); } - if (initialised_from_block_.has_value()) { + if (forkConstantData_.initialised_from_block_.has_value()) { throw std::runtime_error("Removing a block on a fork is forbidden"); } { @@ -861,7 +887,7 @@ void ContentAddressedCachedTreeStore::unwind_block(const block_nu format("Unable to unwind block: ", blockNumber, " Can't unwind with uncommitted data, first rollback before unwinding. Tree name: ", - name_)); + forkConstantData_.name_)); } if (blockNumber != uncommittedMeta.unfinalisedBlockHeight) { throw std::runtime_error(format("Unable to unwind block: ", @@ -869,7 +895,7 @@ void ContentAddressedCachedTreeStore::unwind_block(const block_nu " unfinalisedBlockHeight: ", committedMeta.unfinalisedBlockHeight, ". Tree name: ", - name_)); + forkConstantData_.name_)); } if (blockNumber <= uncommittedMeta.finalisedBlockHeight) { throw std::runtime_error(format("Unable to unwind block: ", @@ -877,7 +903,7 @@ void ContentAddressedCachedTreeStore::unwind_block(const block_nu " finalisedBlockHeight: ", committedMeta.finalisedBlockHeight, ". 
Tree name: ", - name_)); + forkConstantData_.name_)); } // populate the required data for the previous block @@ -886,14 +912,18 @@ void ContentAddressedCachedTreeStore::unwind_block(const block_nu previousBlockData.size = uncommittedMeta.initialSize; previousBlockData.blockNumber = 0; } else if (!dataStore_->read_block_data(blockNumber - 1, previousBlockData, *tx)) { - throw std::runtime_error(format( - "Unable to unwind block: ", blockNumber, ". Failed to read previous block data. Tree name: ", name_)); + throw std::runtime_error(format("Unable to unwind block: ", + blockNumber, + ". Failed to read previous block data. Tree name: ", + forkConstantData_.name_)); } // now get the root for the block we want to unwind if (!dataStore_->read_block_data(blockNumber, blockData, *tx)) { - throw std::runtime_error( - format("Unable to unwind block: ", blockNumber, ". Failed to read block data. Tree name: ", name_)); + throw std::runtime_error(format("Unable to unwind block: ", + blockNumber, + ". Failed to read block data. Tree name: ", + forkConstantData_.name_)); } } WriteTransactionPtr writeTx = create_write_transaction(); @@ -916,8 +946,12 @@ void ContentAddressedCachedTreeStore::unwind_block(const block_nu writeTx->commit(); } catch (std::exception& e) { writeTx->try_abort(); - throw std::runtime_error( - format("Unable to commit unwind of block: ", blockNumber, ". Tree name: ", name_, " Error: ", e.what())); + throw std::runtime_error(format("Unable to commit unwind of block: ", + blockNumber, + ". Tree name: ", + forkConstantData_.name_, + " Error: ", + e.what())); } // now update the uncommitted meta @@ -936,9 +970,10 @@ void ContentAddressedCachedTreeStore::remove_historical_block(con TreeMeta uncommittedMeta; BlockPayload blockData; if (blockNumber < 1) { - throw std::runtime_error(format("Unable to remove historical block: ", blockNumber, ". Tree name: ", name_)); + throw std::runtime_error( + format("Unable to remove historical block: ", blockNumber, ". 
Tree name: ", forkConstantData_.name_)); } - if (initialised_from_block_.has_value()) { + if (forkConstantData_.initialised_from_block_.has_value()) { throw std::runtime_error("Removing a block on a fork is forbidden"); } { @@ -953,7 +988,7 @@ void ContentAddressedCachedTreeStore::remove_historical_block(con " oldestHistoricBlock: ", committedMeta.oldestHistoricBlock, ". Tree name: ", - name_)); + forkConstantData_.name_)); } if (blockNumber >= committedMeta.finalisedBlockHeight) { throw std::runtime_error(format("Unable to remove historical block: ", @@ -961,12 +996,14 @@ void ContentAddressedCachedTreeStore::remove_historical_block(con " oldestHistoricBlock: ", committedMeta.finalisedBlockHeight, ". Tree name: ", - name_)); + forkConstantData_.name_)); } if (!dataStore_->read_block_data(blockNumber, blockData, *tx)) { - throw std::runtime_error(format( - "Unable to remove historical block: ", blockNumber, ". Failed to read block data. Tree name: ", name_)); + throw std::runtime_error(format("Unable to remove historical block: ", + blockNumber, + ". Failed to read block data. Tree name: ", + forkConstantData_.name_)); } } WriteTransactionPtr writeTx = create_write_transaction(); @@ -985,7 +1022,7 @@ void ContentAddressedCachedTreeStore::remove_historical_block(con throw std::runtime_error(format("Unable to commit removal of historical block: ", blockNumber, ". 
Tree name: ", - name_, + forkConstantData_.name_, " Error: ", e.what())); } @@ -1072,7 +1109,7 @@ void ContentAddressedCachedTreeStore::remove_node(const std::opti continue; } // the node was deleted, if it was a leaf then we need to remove the pre-image - if (so.lvl == depth_) { + if (so.lvl == forkConstantData_.depth_) { remove_leaf(hash, maxIndex, tx); } // push the child nodes to the stack @@ -1090,20 +1127,21 @@ template void ContentAddressedCachedTreeStore::initialise_from_block(const ReadTransactionPtr tx = create_read_transaction(); bool success = read_persisted_meta(meta_, *tx); if (success) { - if (name_ != meta_.name || depth_ != meta_.depth) { + if (forkConstantData_.name_ != meta_.name || forkConstantData_.depth_ != meta_.depth) { throw std::runtime_error(format("Inconsistent tree meta data when initialising ", - name_, + forkConstantData_.name_, " with depth ", - depth_, + forkConstantData_.depth_, " from block ", blockNumber, " stored name: ", @@ -1142,8 +1180,10 @@ void ContentAddressedCachedTreeStore::initialise_from_block(const } } else { - throw std::runtime_error(format( - "Tree found to be uninitialised when attempting to create ", name_, " from block ", blockNumber)); + throw std::runtime_error(format("Tree found to be uninitialised when attempting to create ", + forkConstantData_.name_, + " from block ", + blockNumber)); } if (meta_.unfinalisedBlockHeight < blockNumber) { @@ -1152,7 +1192,7 @@ void ContentAddressedCachedTreeStore::initialise_from_block(const " unfinalisedBlockHeight: ", meta_.unfinalisedBlockHeight, ". Tree name: ", - name_)); + forkConstantData_.name_)); } if (meta_.oldestHistoricBlock > blockNumber && blockNumber != 0) { throw std::runtime_error(format("Unable to fork from expired historical block: ", @@ -1160,7 +1200,7 @@ void ContentAddressedCachedTreeStore::initialise_from_block(const " unfinalisedBlockHeight: ", meta_.oldestHistoricBlock, ". 
Tree name: ", - name_)); + forkConstantData_.name_)); } BlockPayload blockData; if (blockNumber == 0) { @@ -1168,18 +1208,20 @@ void ContentAddressedCachedTreeStore::initialise_from_block(const blockData.root = meta_.initialRoot; blockData.size = meta_.initialSize; } else if (get_block_data(blockNumber, blockData, *tx) == false) { - throw std::runtime_error(format("Failed to retrieve block data: ", blockNumber, ". Tree name: ", name_)); + throw std::runtime_error( + format("Failed to retrieve block data: ", blockNumber, ". Tree name: ", forkConstantData_.name_)); } - initialised_from_block_ = blockData; - enrich_meta_from_block(meta_); + forkConstantData_.initialised_from_block_ = blockData; + // Ensure the meta reflects the fork constant data + enrich_meta_from_fork_constant_data(meta_); } } template std::optional ContentAddressedCachedTreeStore::get_fork_block() const { - if (initialised_from_block_.has_value()) { - return initialised_from_block_->blockNumber; + if (forkConstantData_.initialised_from_block_.has_value()) { + return forkConstantData_.initialised_from_block_->blockNumber; } return std::nullopt; } diff --git a/barretenberg/cpp/src/barretenberg/world_state/world_state.cpp b/barretenberg/cpp/src/barretenberg/world_state/world_state.cpp index 033ad3a51c3..581bf578c1f 100644 --- a/barretenberg/cpp/src/barretenberg/world_state/world_state.cpp +++ b/barretenberg/cpp/src/barretenberg/world_state/world_state.cpp @@ -46,7 +46,9 @@ WorldState::WorldState(uint64_t thread_pool_size, , _forkId(CANONICAL_FORK_ID) , _initial_header_generator_point(initial_header_generator_point) { - create_canonical_fork(data_dir, map_size, thread_pool_size); + // We set the max readers to be high, at least the number of given threads or the default if higher + uint64_t maxReaders = std::max(thread_pool_size, DEFAULT_MIN_NUMBER_OF_READERS); + create_canonical_fork(data_dir, map_size, maxReaders); } WorldState::WorldState(uint64_t thread_pool_size, diff --git 
a/barretenberg/cpp/src/barretenberg/world_state/world_state.hpp b/barretenberg/cpp/src/barretenberg/world_state/world_state.hpp index a87ff94db65..7f4b434bd08 100644 --- a/barretenberg/cpp/src/barretenberg/world_state/world_state.hpp +++ b/barretenberg/cpp/src/barretenberg/world_state/world_state.hpp @@ -49,6 +49,8 @@ template struct SequentialInsertionResult { MSGPACK_FIELDS(low_leaf_witness_data, insertion_witness_data); }; +const uint64_t DEFAULT_MIN_NUMBER_OF_READERS = 128; + /** * @brief Holds the Merkle trees responsible for storing the state of the Aztec protocol. * diff --git a/yarn-project/world-state/src/native/merkle_trees_facade.ts b/yarn-project/world-state/src/native/merkle_trees_facade.ts index 28e5886de1d..4e2c1d40611 100644 --- a/yarn-project/world-state/src/native/merkle_trees_facade.ts +++ b/yarn-project/world-state/src/native/merkle_trees_facade.ts @@ -143,7 +143,7 @@ export class MerkleTreesFacade implements MerkleTreeReadOperations { } async getInitialStateReference(): Promise { - const resp = await this.instance.call(WorldStateMessageType.GET_INITIAL_STATE_REFERENCE, void 0); + const resp = await this.instance.call(WorldStateMessageType.GET_INITIAL_STATE_REFERENCE, { canonical: true }); return new StateReference( treeStateReferenceToSnapshot(resp.state[MerkleTreeId.L1_TO_L2_MESSAGE_TREE]), diff --git a/yarn-project/world-state/src/native/message.ts b/yarn-project/world-state/src/native/message.ts index 6cd4b4f751a..a48bd189cdd 100644 --- a/yarn-project/world-state/src/native/message.ts +++ b/yarn-project/world-state/src/native/message.ts @@ -314,6 +314,10 @@ interface WithWorldStateRevision { revision: WorldStateRevision; } +interface WithCanonicalForkId { + canonical: true; +} + interface WithLeafIndex { leafIndex: bigint; } @@ -333,7 +337,7 @@ interface WithLeafValues { leaves: SerializedLeafValue[]; } -interface BlockShiftRequest { +interface BlockShiftRequest extends WithCanonicalForkId { toBlockNumber: bigint; } @@ -422,7 +426,7 @@ 
interface UpdateArchiveRequest extends WithForkId { blockHeaderHash: Buffer; } -interface SyncBlockRequest { +interface SyncBlockRequest extends WithCanonicalForkId { blockNumber: number; blockStateRef: BlockStateReference; blockHeaderHash: Fr; @@ -432,7 +436,7 @@ interface SyncBlockRequest { publicDataWrites: readonly SerializedLeafValue[]; } -interface CreateForkRequest { +interface CreateForkRequest extends WithCanonicalForkId { latest: boolean; blockNumber: number; } @@ -441,22 +445,26 @@ interface CreateForkResponse { forkId: number; } -interface DeleteForkRequest { - forkId: number; +interface DeleteForkRequest extends WithForkId {} + +export type WorldStateRequestCategories = WithForkId | WithWorldStateRevision | WithCanonicalForkId; + +export function isWithForkId(body: WorldStateRequestCategories): body is WithForkId { + return body && 'forkId' in body; } -interface CreateForkResponse { - forkId: number; +export function isWithRevision(body: WorldStateRequestCategories): body is WithWorldStateRevision { + return body && 'revision' in body; } -interface DeleteForkRequest { - forkId: number; +export function isWithCanonical(body: WorldStateRequestCategories): body is WithCanonicalForkId { + return body && 'canonical' in body; } export type WorldStateRequest = { [WorldStateMessageType.GET_TREE_INFO]: GetTreeInfoRequest; [WorldStateMessageType.GET_STATE_REFERENCE]: GetStateReferenceRequest; - [WorldStateMessageType.GET_INITIAL_STATE_REFERENCE]: void; + [WorldStateMessageType.GET_INITIAL_STATE_REFERENCE]: WithCanonicalForkId; [WorldStateMessageType.GET_LEAF_VALUE]: GetLeafRequest; [WorldStateMessageType.GET_LEAF_PREIMAGE]: GetLeafPreImageRequest; @@ -472,8 +480,8 @@ export type WorldStateRequest = { [WorldStateMessageType.UPDATE_ARCHIVE]: UpdateArchiveRequest; - [WorldStateMessageType.COMMIT]: void; - [WorldStateMessageType.ROLLBACK]: void; + [WorldStateMessageType.COMMIT]: WithCanonicalForkId; + [WorldStateMessageType.ROLLBACK]: WithCanonicalForkId; 
[WorldStateMessageType.SYNC_BLOCK]: SyncBlockRequest; @@ -484,9 +492,9 @@ export type WorldStateRequest = { [WorldStateMessageType.UNWIND_BLOCKS]: BlockShiftRequest; [WorldStateMessageType.FINALISE_BLOCKS]: BlockShiftRequest; - [WorldStateMessageType.GET_STATUS]: void; + [WorldStateMessageType.GET_STATUS]: WithCanonicalForkId; - [WorldStateMessageType.CLOSE]: void; + [WorldStateMessageType.CLOSE]: WithCanonicalForkId; }; export type WorldStateResponse = { diff --git a/yarn-project/world-state/src/native/native_world_state.test.ts b/yarn-project/world-state/src/native/native_world_state.test.ts index 0fc564db7a1..071c26ecfbf 100644 --- a/yarn-project/world-state/src/native/native_world_state.test.ts +++ b/yarn-project/world-state/src/native/native_world_state.test.ts @@ -709,4 +709,83 @@ describe('NativeWorldState', () => { await ws.close(); }); }); + + describe('Concurrent requests', () => { + let ws: NativeWorldStateService; + + beforeEach(async () => { + ws = await NativeWorldStateService.tmp(); + }); + + afterEach(async () => { + await ws.close(); + }); + + it('Mutating and non-mutating requests are correctly queued', async () => { + const numReads = 64; + const setupFork = await ws.fork(); + + const { block: block1, messages } = await mockBlock(1, 8, setupFork); + const { block: block2 } = await mockBlock(2, 8, setupFork); + const { block: block3 } = await mockBlock(3, 8, setupFork); + + await ws.handleL2BlockAndMessages(block1, messages); + + const testFork = await ws.fork(); + const commitmentDb = ws.getCommitted(); + + const committedPath = await commitmentDb.getSiblingPath(MerkleTreeId.PUBLIC_DATA_TREE, 0n); + + await testFork.sequentialInsert( + MerkleTreeId.PUBLIC_DATA_TREE, + block2.body.txEffects.map(write => { + return write.toBuffer(); + }), + ); + + const initialPath = await testFork.getSiblingPath(MerkleTreeId.PUBLIC_DATA_TREE, 0n); + + const firstReadsUncommitted = Array.from({ length: numReads }, () => + 
testFork.getSiblingPath(MerkleTreeId.PUBLIC_DATA_TREE, 0n), + ); + const firstReadsCommitted = Array.from({ length: numReads }, () => + commitmentDb.getSiblingPath(MerkleTreeId.PUBLIC_DATA_TREE, 0n), + ); + const write = testFork.sequentialInsert( + MerkleTreeId.PUBLIC_DATA_TREE, + block3.body.txEffects.map(write => { + return write.toBuffer(); + }), + ); + const secondReadsUncommitted = Array.from({ length: numReads }, () => + testFork.getSiblingPath(MerkleTreeId.PUBLIC_DATA_TREE, 0n), + ); + const secondReadsCommitted = Array.from({ length: numReads }, () => + commitmentDb.getSiblingPath(MerkleTreeId.PUBLIC_DATA_TREE, 0n), + ); + await Promise.all([ + ...firstReadsUncommitted, + ...firstReadsCommitted, + write, + ...secondReadsUncommitted, + ...secondReadsCommitted, + ]); + + const finalPath = await testFork.getSiblingPath(MerkleTreeId.PUBLIC_DATA_TREE, 0n); + + for (let i = 0; i < numReads; i++) { + const firstPathUncommitted = await firstReadsUncommitted[i]; + const secondPathUncommitted = await secondReadsUncommitted[i]; + expect(firstPathUncommitted).toEqual(initialPath); + expect(secondPathUncommitted).toEqual(finalPath); + + const firstPathCommitted = await firstReadsCommitted[i]; + const secondPathCommitted = await secondReadsCommitted[i]; + expect(firstPathCommitted).toEqual(committedPath); + expect(secondPathCommitted).toEqual(committedPath); + } + + await Promise.all([setupFork.close(), testFork.close()]); + }, 30_000); + }); }); diff --git a/yarn-project/world-state/src/native/native_world_state.ts b/yarn-project/world-state/src/native/native_world_state.ts index 0d5d305b8b1..092adb69dc3 100644 --- a/yarn-project/world-state/src/native/native_world_state.ts +++ b/yarn-project/world-state/src/native/native_world_state.ts @@ -160,6 +160,7 @@ export class NativeWorldStateService implements MerkleTreeDatabase { const resp = await this.instance.call(WorldStateMessageType.CREATE_FORK, { latest: blockNumber === undefined, blockNumber: blockNumber ?? 
0, + canonical: true, }); return new MerkleTreesForkFacade(this.instance, this.initialHeader!, worldStateRevision(true, resp.forkId, 0)); } @@ -198,6 +199,7 @@ export class NativeWorldStateService implements MerkleTreeDatabase { paddedNullifiers: paddedNullifiers.map(serializeLeaf), publicDataWrites: publicDataWrites.map(serializeLeaf), blockStateRef: blockStateReference(l2Block.header.state), + canonical: true, }, this.sanitiseAndCacheSummaryFromFull.bind(this), this.deleteCachedSummary.bind(this), @@ -240,6 +242,7 @@ export class NativeWorldStateService implements MerkleTreeDatabase { WorldStateMessageType.FINALISE_BLOCKS, { toBlockNumber, + canonical: true, }, this.sanitiseAndCacheSummary.bind(this), this.deleteCachedSummary.bind(this), @@ -257,6 +260,7 @@ export class NativeWorldStateService implements MerkleTreeDatabase { WorldStateMessageType.REMOVE_HISTORICAL_BLOCKS, { toBlockNumber, + canonical: true, }, this.sanitiseAndCacheSummaryFromFull.bind(this), this.deleteCachedSummary.bind(this), @@ -273,6 +277,7 @@ export class NativeWorldStateService implements MerkleTreeDatabase { WorldStateMessageType.UNWIND_BLOCKS, { toBlockNumber, + canonical: true, }, this.sanitiseAndCacheSummaryFromFull.bind(this), this.deleteCachedSummary.bind(this), @@ -283,7 +288,11 @@ export class NativeWorldStateService implements MerkleTreeDatabase { if (this.cachedStatusSummary !== undefined) { return { ...this.cachedStatusSummary }; } - return await this.instance.call(WorldStateMessageType.GET_STATUS, void 0, this.sanitiseAndCacheSummary.bind(this)); + return await this.instance.call( + WorldStateMessageType.GET_STATUS, + { canonical: true }, + this.sanitiseAndCacheSummary.bind(this), + ); } updateLeaf( @@ -295,7 +304,7 @@ export class NativeWorldStateService implements MerkleTreeDatabase { } private async getInitialStateReference(): Promise { - const resp = await this.instance.call(WorldStateMessageType.GET_INITIAL_STATE_REFERENCE, void 0); + const resp = await 
this.instance.call(WorldStateMessageType.GET_INITIAL_STATE_REFERENCE, { canonical: true }); return new StateReference( treeStateReferenceToSnapshot(resp.state[MerkleTreeId.L1_TO_L2_MESSAGE_TREE]), diff --git a/yarn-project/world-state/src/native/native_world_state_instance.ts b/yarn-project/world-state/src/native/native_world_state_instance.ts index 25cee92f60d..4db6072a279 100644 --- a/yarn-project/world-state/src/native/native_world_state_instance.ts +++ b/yarn-project/world-state/src/native/native_world_state_instance.ts @@ -11,7 +11,6 @@ import { PUBLIC_DATA_TREE_HEIGHT, } from '@aztec/circuits.js'; import { createLogger } from '@aztec/foundation/log'; -import { SerialQueue } from '@aztec/foundation/queue'; import assert from 'assert'; import bindings from 'bindings'; @@ -25,8 +24,13 @@ import { TypedMessage, WorldStateMessageType, type WorldStateRequest, + type WorldStateRequestCategories, type WorldStateResponse, + isWithCanonical, + isWithForkId, + isWithRevision, } from './message.js'; +import { WorldStateOpsQueue } from './world_state_ops_queue.js'; // small extension to pack an NodeJS Fr instance to a representation that the C++ code can understand // this only works for writes. 
Unpacking from C++ can't create Fr instances because the data is passed @@ -49,7 +53,10 @@ const NATIVE_MODULE = bindings(NATIVE_LIBRARY_NAME); const MAX_WORLD_STATE_THREADS = +(process.env.HARDWARE_CONCURRENCY || '16'); export interface NativeWorldStateInstance { - call(messageType: T, body: WorldStateRequest[T]): Promise; + call( + messageType: T, + body: WorldStateRequest[T] & WorldStateRequestCategories, + ): Promise; } /** @@ -78,8 +85,8 @@ export class NativeWorldState implements NativeWorldStateInstance { /** The actual native instance */ private instance: any; - /** Calls to the same instance are serialized */ - private queue = new SerialQueue(); + // We maintain a map of queue to fork + private queues = new Map(); /** Creates a new native WorldState instance */ constructor( @@ -109,7 +116,8 @@ export class NativeWorldState implements NativeWorldStateInstance { dbMapSizeKb, threads, ); - this.queue.start(); + // Manually create the queue for the canonical fork + this.queues.set(0, new WorldStateOpsQueue()); } /** @@ -120,25 +128,65 @@ export class NativeWorldState implements NativeWorldStateInstance { * @param errorHandler - A callback called on request error, executed on the job queue * @returns The response to the message */ - public call( + public async call( messageType: T, - body: WorldStateRequest[T], + body: WorldStateRequest[T] & WorldStateRequestCategories, // allows for the pre-processing of responses on the job queue before being passed back responseHandler = (response: WorldStateResponse[T]): WorldStateResponse[T] => response, errorHandler = (_: string) => {}, ): Promise { - return this.queue.put(async () => { - assert.notEqual(messageType, WorldStateMessageType.CLOSE, 'Use close() to close the native instance'); - assert.equal(this.open, true, 'Native instance is closed'); - let response: WorldStateResponse[T]; - try { - response = await this._sendMessage(messageType, body); - } catch (error: any) { - errorHandler(error.message); - throw error; 
- } - return responseHandler(response); - }); + // Here we determine which fork the request is being executed against and whether it requires uncommitted data + // We use the fork Id to select the appropriate request queue and the uncommitted data flag to pass to the queue + let forkId = -1; + // We assume it includes uncommitted unless explicitly told otherwise + let committedOnly = false; + + // Canonical requests ALWAYS go against the canonical fork + // These include things like block syncs/unwinds etc + // These requests don't contain a fork ID + if (isWithCanonical(body)) { + forkId = 0; + } else if (isWithForkId(body)) { + forkId = body.forkId; + } else if (isWithRevision(body)) { + forkId = body.revision.forkId; + committedOnly = body.revision.includeUncommitted === false; + } else { + const _: never = body; + throw new Error(`Unable to determine forkId for message=${WorldStateMessageType[messageType]}`); + } + + // Get the queue or create a new one + let requestQueue = this.queues.get(forkId); + if (requestQueue === undefined) { + requestQueue = new WorldStateOpsQueue(); + this.queues.set(forkId, requestQueue); + } + + // Enqueue the request and wait for the response + const response = await requestQueue.execute( + async () => { + assert.notEqual(messageType, WorldStateMessageType.CLOSE, 'Use close() to close the native instance'); + assert.equal(this.open, true, 'Native instance is closed'); + let response: WorldStateResponse[T]; + try { + response = await this._sendMessage(messageType, body); + } catch (error: any) { + errorHandler(error.message); + throw error; + } + return responseHandler(response); + }, + messageType, + committedOnly, + ); + + // If the request was to delete the fork then we clean it up here + if (messageType === WorldStateMessageType.DELETE_FORK) { + await requestQueue.stop(); + this.queues.delete(forkId); + } + return response; } /** @@ -149,13 +197,21 @@ export class NativeWorldState implements NativeWorldStateInstance { return; } 
this.open = false; - await this._sendMessage(WorldStateMessageType.CLOSE, undefined); - await this.queue.end(); + const queue = this.queues.get(0)!; + + await queue.execute( + async () => { + await this._sendMessage(WorldStateMessageType.CLOSE, { canonical: true }); + }, + WorldStateMessageType.CLOSE, + false, + ); + await queue.stop(); } private async _sendMessage( messageType: T, - body: WorldStateRequest[T], + body: WorldStateRequest[T] & WorldStateRequestCategories, ): Promise { const messageId = this.nextMessageId++; if (body) { diff --git a/yarn-project/world-state/src/native/world_state_ops_queue.ts b/yarn-project/world-state/src/native/world_state_ops_queue.ts new file mode 100644 index 00000000000..ad786aa7ea4 --- /dev/null +++ b/yarn-project/world-state/src/native/world_state_ops_queue.ts @@ -0,0 +1,187 @@ +import { promiseWithResolvers } from '@aztec/foundation/promise'; + +import { WorldStateMessageType } from './message.js'; + +/** + * This is the implementation for queueing requests to the world state. + * Requests need to be queued for the world state to ensure that writes are correctly ordered + * and reads return the correct data. + * Due to the nature of the NAPI we can't really do this there. + * + * The rules for queueing are as follows: + * + * 1. Reads of committed state never need to be queued. LMDB uses MVCC to ensure readers see a consistent view of the DB. + * 2. Reads of uncommitted state can happen concurrently with other reads of uncommitted state on the same fork (or reads of committed state) + * 3. 
All writes require exclusive access to their respective fork + * + */ + +type WorldStateOp = { + requestId: number; + mutating: boolean; + request: () => Promise; + promise: PromiseWithResolvers; +}; + +// These are the set of message types that implement mutating operations +// Messages of these types require exclusive access to their given forks +export const MUTATING_MSG_TYPES = new Set([ + WorldStateMessageType.APPEND_LEAVES, + WorldStateMessageType.BATCH_INSERT, + WorldStateMessageType.SEQUENTIAL_INSERT, + WorldStateMessageType.UPDATE_ARCHIVE, + WorldStateMessageType.COMMIT, + WorldStateMessageType.ROLLBACK, + WorldStateMessageType.SYNC_BLOCK, + WorldStateMessageType.CREATE_FORK, + WorldStateMessageType.DELETE_FORK, + WorldStateMessageType.FINALISE_BLOCKS, + WorldStateMessageType.UNWIND_BLOCKS, + WorldStateMessageType.REMOVE_HISTORICAL_BLOCKS, +]); + +// This class implements the per-fork operation queue +export class WorldStateOpsQueue { + private requests: WorldStateOp[] = []; + private inFlightMutatingCount = 0; + private inFlightCount = 0; + private stopPromise?: Promise; + private stopResolve?: () => void; + private requestId = 0; + private ops: Map = new Map(); + + // The primary public api, this is where an operation is queued + // We return a promise that will ultimately be resolved/rejected with the response/error generated by the 'request' argument + public execute(request: () => Promise, messageType: WorldStateMessageType, committedOnly: boolean) { + if (this.stopResolve !== undefined) { + throw new Error('Unable to send request to world state, queue already stopped'); + } + + const op: WorldStateOp = { + requestId: this.requestId++, + mutating: MUTATING_MSG_TYPES.has(messageType), + request, + promise: promiseWithResolvers(), + }; + this.ops.set(op.requestId, op); + + // Perform the appropriate action based upon the queueing rules + if (op.mutating) { + this.executeMutating(op); + } else if (committedOnly === false) { + 
this.executeNonMutatingUncommitted(op); + } else { + this.executeNonMutatingCommitted(op); + } + return op.promise.promise; + } + + // Mutating requests need exclusive access + private executeMutating(op: WorldStateOp) { + // If nothing is in flight then we send the request immediately + // Otherwise add to the queue + if (this.inFlightCount === 0) { + this.sendEnqueuedRequest(op); + } else { + this.requests.push(op); + } + } + + // Non-mutating requests including uncommitted state + private executeNonMutatingUncommitted(op: WorldStateOp) { + // If there are no mutating requests in flight and there is nothing queued + // then send the request immediately + // If a mutating request is in flight then we must wait + // If a mutating request is not in flight but something is queued then it must be a mutating request + if (this.inFlightMutatingCount == 0 && this.requests.length == 0) { + this.sendEnqueuedRequest(op); + } else { + this.requests.push(op); + } + } + + private executeNonMutatingCommitted(op: WorldStateOp) { + // This is a non-mutating request for committed data + // It can always be sent + op.request() + .then(op.promise.resolve, op.promise.reject) + .finally(() => { + this.ops.delete(op.requestId); + }); + } + + private checkAndEnqueue(completedOp: WorldStateOp) { + // A request has completed + // First we decrement the relevant in flight counters + if (completedOp.mutating) { + --this.inFlightMutatingCount; + } + --this.inFlightCount; + + // If there are still requests in flight then do nothing further + if (this.inFlightCount != 0) { + return; + } + + // No requests in flight, send next queued requests + // We loop and send: + // 1 mutating request if it is next in the queue + // As many non-mutating requests as we encounter until + // we exhaust the queue or we reach a mutating request + while (this.requests.length > 0) { + const next = this.requests[0]; + if (next.mutating) { + if (this.inFlightCount == 0) { + // send the mutating request
this.requests.shift(); + this.sendEnqueuedRequest(next); + } + // this request is mutating, we need to stop here + break; + } else { + // not mutating, send and go round again + this.requests.shift(); + this.sendEnqueuedRequest(next); + } + } + + // If the queue is empty, there is nothing in flight and we have been told to stop, then resolve the stop promise + if (this.inFlightCount == 0 && this.stopResolve !== undefined) { + this.stopResolve(); + } + } + + private sendEnqueuedRequest(op: WorldStateOp) { + // Here we increment the in flight counts before sending + ++this.inFlightCount; + if (op.mutating) { + ++this.inFlightMutatingCount; + } + + // Make the request and pass the response/error through to the stored promise + op.request() + .then(op.promise.resolve, op.promise.reject) + .finally(() => { + this.checkAndEnqueue(op); + this.ops.delete(op.requestId); + }); + } + + public stop() { + // If there is already a stop promise then return it + if (this.stopPromise) { + return this.stopPromise; + } + + // Otherwise create a new one and capture the resolve method + this.stopPromise = new Promise(resolve => { + this.stopResolve = resolve; + }); + + // If no outstanding requests then immediately resolve the promise + if (this.requests.length == 0 && this.inFlightCount == 0 && this.stopResolve !== undefined) { + this.stopResolve(); + } + return this.stopPromise; + } +}