Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Allow concurrent world state access #11216

Merged
merged 12 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ void ContentAddressedAppendOnlyTree<Store, HashingPolicy>::get_leaf(const index_
RequestContext requestContext;
requestContext.includeUncommitted = includeUncommitted;
requestContext.root = store_->get_current_root(*tx, includeUncommitted);
std::optional<fr> leaf_hash = find_leaf_hash(leaf_index, requestContext, *tx);
std::optional<fr> leaf_hash = find_leaf_hash(leaf_index, requestContext, *tx, false);
response.success = leaf_hash.has_value();
if (response.success) {
response.inner.leaf = leaf_hash.value();
Expand Down Expand Up @@ -690,15 +690,15 @@ void ContentAddressedAppendOnlyTree<Store, HashingPolicy>::get_leaf(const index_
leaf_index,
" for block ",
blockNumber,
", leaf index is too high.");
", leaf index out of range.");
response.success = false;
return;
}
RequestContext requestContext;
requestContext.blockNumber = blockNumber;
requestContext.includeUncommitted = includeUncommitted;
requestContext.root = blockData.root;
std::optional<fr> leaf_hash = find_leaf_hash(leaf_index, requestContext, *tx);
std::optional<fr> leaf_hash = find_leaf_hash(leaf_index, requestContext, *tx, false);
response.success = leaf_hash.has_value();
if (response.success) {
response.inner.leaf = leaf_hash.value();
Expand Down Expand Up @@ -746,7 +746,6 @@ void ContentAddressedAppendOnlyTree<Store, HashingPolicy>::find_leaf_indices_fro

RequestContext requestContext;
requestContext.includeUncommitted = includeUncommitted;
requestContext.root = store_->get_current_root(*tx, includeUncommitted);

for (const auto& leaf : leaves) {
std::optional<index_t> leaf_index =
Expand Down Expand Up @@ -787,7 +786,6 @@ void ContentAddressedAppendOnlyTree<Store, HashingPolicy>::find_leaf_indices_fro
RequestContext requestContext;
requestContext.blockNumber = blockNumber;
requestContext.includeUncommitted = includeUncommitted;
requestContext.root = blockData.root;
requestContext.maxIndex = blockData.size;

for (const auto& leaf : leaves) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ void ContentAddressedIndexedTree<Store, HashingPolicy>::get_leaf(const index_t&
RequestContext requestContext;
requestContext.includeUncommitted = includeUncommitted;
requestContext.root = store_->get_current_root(*tx, includeUncommitted);
std::optional<fr> leaf_hash = find_leaf_hash(index, requestContext, *tx);
std::optional<fr> leaf_hash = find_leaf_hash(index, requestContext, *tx, false);
if (!leaf_hash.has_value()) {
response.success = false;
response.message = "Failed to find leaf hash for current root";
Expand Down Expand Up @@ -390,7 +390,7 @@ void ContentAddressedIndexedTree<Store, HashingPolicy>::get_leaf(const index_t&
requestContext.blockNumber = blockNumber;
requestContext.includeUncommitted = includeUncommitted;
requestContext.root = blockData.root;
std::optional<fr> leaf_hash = find_leaf_hash(index, requestContext, *tx);
std::optional<fr> leaf_hash = find_leaf_hash(index, requestContext, *tx, false);
if (!leaf_hash.has_value()) {
response.success = false;
response.message = format("Failed to find leaf hash for root of block ", blockNumber);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class LMDBDatabaseCreationTransaction;
class LMDBDatabase {
public:
using Ptr = std::unique_ptr<LMDBDatabase>;
using SharedPtr = std::shared_ptr<LMDBDatabase>;

LMDBDatabase(LMDBEnvironment::SharedPtr env,
const LMDBDatabaseCreationTransaction& transaction,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
#include <cstddef>
#include <cstdint>
#include <gtest/gtest.h>

#include <chrono>
#include <cstdlib>
#include <filesystem>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <vector>

#include "barretenberg/common/serialize.hpp"
#include "barretenberg/common/streams.hpp"
#include "barretenberg/common/test.hpp"
#include "barretenberg/crypto/merkle_tree/fixtures.hpp"
#include "barretenberg/crypto/merkle_tree/lmdb_store/lmdb_database.hpp"
#include "barretenberg/crypto/merkle_tree/lmdb_store/lmdb_db_transaction.hpp"
#include "barretenberg/crypto/merkle_tree/lmdb_store/lmdb_environment.hpp"
#include "barretenberg/crypto/merkle_tree/lmdb_store/queries.hpp"
#include "barretenberg/crypto/merkle_tree/signal.hpp"
#include "barretenberg/crypto/merkle_tree/types.hpp"
#include "barretenberg/numeric/random/engine.hpp"
#include "barretenberg/numeric/uint128/uint128.hpp"
#include "barretenberg/numeric/uint256/uint256.hpp"
#include "barretenberg/polynomials/serialize.hpp"
#include "barretenberg/stdlib/primitives/field/field.hpp"
#include "lmdb_tree_store.hpp"

using namespace bb::stdlib;
using namespace bb::crypto::merkle_tree;

class LMDBEnvironmentTest : public testing::Test {
protected:
void SetUp() override
{
_directory = random_temp_directory();
_mapSize = 1024 * 1024;
_maxReaders = 16;
std::filesystem::create_directories(_directory);
}

void TearDown() override { std::filesystem::remove_all(_directory); }

static std::string _directory;
static uint32_t _maxReaders;
static uint64_t _mapSize;
};

std::string LMDBEnvironmentTest::_directory;
uint32_t LMDBEnvironmentTest::_maxReaders;
uint64_t LMDBEnvironmentTest::_mapSize;

std::vector<uint8_t> serialise(std::string key)
{
std::vector<uint8_t> data(key.begin(), key.end());
return data;
}

TEST_F(LMDBEnvironmentTest, can_create_environment)
{
EXPECT_NO_THROW(LMDBEnvironment environment(
LMDBEnvironmentTest::_directory, LMDBEnvironmentTest::_mapSize, 1, LMDBEnvironmentTest::_maxReaders));
}

TEST_F(LMDBEnvironmentTest, can_create_database)
{
LMDBEnvironment::SharedPtr environment = std::make_shared<LMDBEnvironment>(
LMDBEnvironmentTest::_directory, LMDBEnvironmentTest::_mapSize, 1, LMDBEnvironmentTest::_maxReaders);

{
LMDBDatabaseCreationTransaction tx(environment);
LMDBDatabase::SharedPtr db = std::make_unique<LMDBDatabase>(environment, tx, "DB", false, false);
EXPECT_NO_THROW(tx.commit());
}
}

TEST_F(LMDBEnvironmentTest, can_write_to_database)
{
LMDBEnvironment::SharedPtr environment = std::make_shared<LMDBEnvironment>(
LMDBEnvironmentTest::_directory, LMDBEnvironmentTest::_mapSize, 1, LMDBEnvironmentTest::_maxReaders);

LMDBDatabaseCreationTransaction tx(environment);
LMDBDatabase::SharedPtr db = std::make_unique<LMDBDatabase>(environment, tx, "DB", false, false);
EXPECT_NO_THROW(tx.commit());

{
LMDBTreeWriteTransaction::SharedPtr tx = std::make_shared<LMDBTreeWriteTransaction>(environment);
auto key = serialise(std::string("Key"));
auto data = serialise(std::string("TestData"));
EXPECT_NO_THROW(tx->put_value(key, data, *db));
EXPECT_NO_THROW(tx->commit());
}
}

TEST_F(LMDBEnvironmentTest, can_read_from_database)
{
LMDBEnvironment::SharedPtr environment = std::make_shared<LMDBEnvironment>(
LMDBEnvironmentTest::_directory, LMDBEnvironmentTest::_mapSize, 1, LMDBEnvironmentTest::_maxReaders);

LMDBDatabaseCreationTransaction tx(environment);
LMDBDatabase::SharedPtr db = std::make_unique<LMDBDatabase>(environment, tx, "DB", false, false);
EXPECT_NO_THROW(tx.commit());

{
LMDBTreeWriteTransaction::SharedPtr tx = std::make_shared<LMDBTreeWriteTransaction>(environment);
auto key = serialise(std::string("Key"));
auto data = serialise(std::string("TestData"));
EXPECT_NO_THROW(tx->put_value(key, data, *db));
EXPECT_NO_THROW(tx->commit());
}

{
environment->wait_for_reader();
LMDBTreeReadTransaction::SharedPtr tx = std::make_shared<LMDBTreeReadTransaction>(environment);
auto key = serialise(std::string("Key"));
auto expected = serialise(std::string("TestData"));
std::vector<uint8_t> data;
tx->get_value(key, data, *db);
EXPECT_EQ(data, expected);
}
}

TEST_F(LMDBEnvironmentTest, can_write_and_read_multiple)
{
LMDBEnvironment::SharedPtr environment = std::make_shared<LMDBEnvironment>(
LMDBEnvironmentTest::_directory, LMDBEnvironmentTest::_mapSize, 1, LMDBEnvironmentTest::_maxReaders);

LMDBDatabaseCreationTransaction tx(environment);
LMDBDatabase::SharedPtr db = std::make_unique<LMDBDatabase>(environment, tx, "DB", false, false);
EXPECT_NO_THROW(tx.commit());

uint64_t numValues = 10;

{
for (uint64_t count = 0; count < numValues; count++) {
LMDBTreeWriteTransaction::SharedPtr tx = std::make_shared<LMDBTreeWriteTransaction>(environment);
auto key = serialise((std::stringstream() << "Key" << count).str());
auto data = serialise((std::stringstream() << "TestData" << count).str());
EXPECT_NO_THROW(tx->put_value(key, data, *db));
EXPECT_NO_THROW(tx->commit());
}
}

{
for (uint64_t count = 0; count < numValues; count++) {
environment->wait_for_reader();
LMDBTreeReadTransaction::SharedPtr tx = std::make_shared<LMDBTreeReadTransaction>(environment);
auto key = serialise((std::stringstream() << "Key" << count).str());
auto expected = serialise((std::stringstream() << "TestData" << count).str());
std::vector<uint8_t> data;
tx->get_value(key, data, *db);
EXPECT_EQ(data, expected);
}
}
}

TEST_F(LMDBEnvironmentTest, can_read_multiple_threads)
{
LMDBEnvironment::SharedPtr environment =
std::make_shared<LMDBEnvironment>(LMDBEnvironmentTest::_directory, LMDBEnvironmentTest::_mapSize, 1, 2);

LMDBDatabaseCreationTransaction tx(environment);
LMDBDatabase::SharedPtr db = std::make_unique<LMDBDatabase>(environment, tx, "DB", false, false);
EXPECT_NO_THROW(tx.commit());

uint64_t numValues = 10;
uint64_t numIterationsPerThread = 1000;
uint32_t numThreads = 16;

{
for (uint64_t count = 0; count < numValues; count++) {
LMDBTreeWriteTransaction::SharedPtr tx = std::make_shared<LMDBTreeWriteTransaction>(environment);
auto key = serialise((std::stringstream() << "Key" << count).str());
auto data = serialise((std::stringstream() << "TestData" << count).str());
EXPECT_NO_THROW(tx->put_value(key, data, *db));
EXPECT_NO_THROW(tx->commit());
}
}

{
auto func = [&]() -> void {
for (uint64_t iteration = 0; iteration < numIterationsPerThread; iteration++) {
for (uint64_t count = 0; count < numValues; count++) {
environment->wait_for_reader();
LMDBTreeReadTransaction::SharedPtr tx = std::make_shared<LMDBTreeReadTransaction>(environment);
auto key = serialise((std::stringstream() << "Key" << count).str());
auto expected = serialise((std::stringstream() << "TestData" << count).str());
std::vector<uint8_t> data;
tx->get_value(key, data, *db);
EXPECT_EQ(data, expected);
}
}
};
std::vector<std::unique_ptr<std::thread>> threads;
for (uint64_t count = 0; count < numThreads; count++) {
threads.emplace_back(std::make_unique<std::thread>(func));
}
for (uint64_t count = 0; count < numThreads; count++) {
threads[count]->join();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <cstring>
#include <exception>
#include <functional>
#include <memory>
#include <vector>

namespace bb::crypto::merkle_tree {
Expand All @@ -22,6 +23,7 @@ namespace bb::crypto::merkle_tree {
class LMDBTreeReadTransaction : public LMDBTransaction {
public:
using Ptr = std::unique_ptr<LMDBTreeReadTransaction>;
using SharedPtr = std::shared_ptr<LMDBTreeReadTransaction>;

LMDBTreeReadTransaction(LMDBEnvironment::SharedPtr env);
LMDBTreeReadTransaction(const LMDBTreeReadTransaction& other) = delete;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,48 @@ TEST_F(LMDBTreeStoreTest, can_write_and_read_meta_data)
}
}

TEST_F(LMDBTreeStoreTest, can_read_data_from_multiple_threads)
{
TreeMeta metaData;
metaData.committedSize = 56;
metaData.initialSize = 12;
metaData.initialRoot = VALUES[1];
metaData.root = VALUES[2];
metaData.depth = 40;
metaData.oldestHistoricBlock = 87;
metaData.unfinalisedBlockHeight = 95;
metaData.name = "Note hash tree";
metaData.size = 60;
LMDBTreeStore store(_directory, "DB1", _mapSize, 2);
{
LMDBTreeWriteTransaction::Ptr transaction = store.create_write_transaction();
store.write_meta_data(metaData, *transaction);
transaction->commit();
}

uint64_t numIterationsPerThread = 1000;
uint32_t numThreads = 16;

{
auto func = [&]() -> void {
for (uint64_t iteration = 0; iteration < numIterationsPerThread; iteration++) {
LMDBTreeReadTransaction::Ptr transaction = store.create_read_transaction();
TreeMeta readBack;
bool success = store.read_meta_data(readBack, *transaction);
EXPECT_TRUE(success);
EXPECT_EQ(readBack, metaData);
}
};
std::vector<std::unique_ptr<std::thread>> threads;
for (uint64_t count = 0; count < numThreads; count++) {
threads.emplace_back(std::make_unique<std::thread>(func));
}
for (uint64_t count = 0; count < numThreads; count++) {
threads[count]->join();
}
}
}

TEST_F(LMDBTreeStoreTest, can_write_and_read_multiple_blocks_with_meta)
{
LMDBTreeStore store(_directory, "DB1", _mapSize, _maxReaders);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ namespace bb::crypto::merkle_tree {
class LMDBTreeWriteTransaction : public LMDBTransaction {
public:
using Ptr = std::unique_ptr<LMDBTreeWriteTransaction>;
using SharedPtr = std::shared_ptr<LMDBTreeWriteTransaction>;

LMDBTreeWriteTransaction(LMDBEnvironment::SharedPtr env);
LMDBTreeWriteTransaction(const LMDBTreeWriteTransaction& other) = delete;
Expand Down
Loading
Loading