From 9dec3cceecacc93c417051ba2fe404d668da709a Mon Sep 17 00:00:00 2001 From: battlmonstr Date: Tue, 4 Feb 2025 17:35:21 +0100 Subject: [PATCH] datastore: kvdb::HistoryRangeQuery --- silkworm/db/datastore/common/caching_view.hpp | 122 ++++++++++++++ .../db/datastore/kvdb/big_endian_codec.hpp | 3 + .../db/datastore/kvdb/cursor_iterator.cpp | 1 + .../db/datastore/kvdb/cursor_iterator.hpp | 39 +++++ .../db/datastore/kvdb/history_queries.hpp | 1 + .../db/datastore/kvdb/history_range_query.hpp | 159 ++++++++++++++++++ .../kvdb/history_range_query_test.cpp | 121 +++++++++++++ silkworm/db/datastore/kvdb/mdbx.cpp | 6 + silkworm/db/datastore/kvdb/mdbx.hpp | 5 + .../datastore/kvdb/memory_mutation_cursor.cpp | 5 + .../datastore/kvdb/memory_mutation_cursor.hpp | 2 + silkworm/db/test_util/mock_ro_cursor.hpp | 1 + 12 files changed, 465 insertions(+) create mode 100644 silkworm/db/datastore/common/caching_view.hpp create mode 100644 silkworm/db/datastore/kvdb/history_range_query.hpp create mode 100644 silkworm/db/datastore/kvdb/history_range_query_test.cpp diff --git a/silkworm/db/datastore/common/caching_view.hpp b/silkworm/db/datastore/common/caching_view.hpp new file mode 100644 index 0000000000..3c207ce542 --- /dev/null +++ b/silkworm/db/datastore/common/caching_view.hpp @@ -0,0 +1,122 @@ +/* + Copyright 2024 The Silkworm Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#pragma once + +#include +#include +#include +#include + +namespace silkworm::views { + +/** + * Like views::cache1 in Range-v3 + * https://ericniebler.github.io/range-v3/structranges_1_1views_1_1cache1__fn.html + * https://stackoverflow.com/questions/67321666/generator-called-twice-in-c20-views-pipeline + */ +template + requires std::movable +class CachingView : public std::ranges::view_interface> { + public: + class Iterator { + public: + using RangeIterator = std::ranges::iterator_t; + using RangeSentinel = std::ranges::sentinel_t; + + using value_type = typename RangeIterator::value_type; + using iterator_category [[maybe_unused]] = std::input_iterator_tag; + using difference_type = typename RangeIterator::difference_type; + using reference = value_type&; + using pointer = void; + + Iterator( + RangeIterator it, + RangeSentinel sentinel) + : it_{std::move(it)}, + sentinel_{std::move(sentinel)} {} + + Iterator() + requires(std::default_initializable && std::default_initializable) + = default; + + reference operator*() const { + if (!cached_value_) { + cached_value_ = *it_; + } + return *cached_value_; + } + + Iterator operator++(int) { return std::exchange(*this, ++Iterator{*this}); } + Iterator& operator++() { + ++it_; + cached_value_ = std::nullopt; + return *this; + } + + friend bool operator==(const Iterator& it, const std::default_sentinel_t&) { + return it.it_ == it.sentinel_; + } + friend bool operator!=(const Iterator& it, const std::default_sentinel_t&) { + return it.it_ != it.sentinel_; + } + friend bool operator==(const std::default_sentinel_t&, const Iterator& it) { + return it.sentinel_ == it.it_; + } + friend bool operator!=(const std::default_sentinel_t&, const Iterator& it) { + return it.sentinel_ != it.it_; + } + + private: + RangeIterator it_; + RangeSentinel sentinel_; + mutable std::optional cached_value_; + }; + + static_assert(std::input_iterator); + + explicit CachingView(TRange&& range) + : range_{std::move(range)} {} + + CachingView() + requires std::default_initializable + = default; + + CachingView(CachingView&&) = default; + CachingView& operator=(CachingView&&) noexcept = default; + + Iterator begin() { return Iterator{std::ranges::begin(range_), std::ranges::end(range_)}; } + std::default_sentinel_t end() const { return std::default_sentinel; } + + private: + TRange range_; +}; + +struct CachingViewFactory { + template + constexpr CachingView operator()(TRange&& range) const { + return CachingView{std::forward(range)}; + } + + template + friend constexpr CachingView operator|(TRange&& range, const CachingViewFactory& caching) { + return caching(std::forward(range)); + } +}; + +inline constexpr CachingViewFactory caching; + +} // namespace silkworm::views diff --git a/silkworm/db/datastore/kvdb/big_endian_codec.hpp b/silkworm/db/datastore/kvdb/big_endian_codec.hpp index d028973a70..4f6f24334d 100644 --- a/silkworm/db/datastore/kvdb/big_endian_codec.hpp +++ b/silkworm/db/datastore/kvdb/big_endian_codec.hpp @@ -24,7 +24,10 @@ struct BigEndianU64Codec : public Codec { uint64_t value{0}; Bytes data; + BigEndianU64Codec() = default; + explicit BigEndianU64Codec(uint64_t value1) : value{value1} {} ~BigEndianU64Codec() override = default; + Slice encode() override; void decode(Slice slice) override; }; diff --git a/silkworm/db/datastore/kvdb/cursor_iterator.cpp b/silkworm/db/datastore/kvdb/cursor_iterator.cpp index b7bd9a4f32..2311939a0b 100644 --- a/silkworm/db/datastore/kvdb/cursor_iterator.cpp +++ b/silkworm/db/datastore/kvdb/cursor_iterator.cpp @@ -39,6 +39,7 @@ bool operator==(const CursorIterator& lhs, const CursorIterator& rhs) { ((!lhs.decoders_.first && !lhs.decoders_.second) || (lhs.cursor_ == rhs.cursor_)); } +static_assert(std::input_iterator); static_assert(std::input_iterator); static_assert(std::input_iterator, RawDecoder>>); static_assert(std::input_iterator>>); diff --git a/silkworm/db/datastore/kvdb/cursor_iterator.hpp b/silkworm/db/datastore/kvdb/cursor_iterator.hpp index fc9ce506cf..e2a6e25bb3 100644 --- a/silkworm/db/datastore/kvdb/cursor_iterator.hpp +++ b/silkworm/db/datastore/kvdb/cursor_iterator.hpp @@ -25,6 +25,45 @@ namespace silkworm::datastore::kvdb { +class CursorMoveIterator { + public: + using value_type = std::shared_ptr; + using iterator_category [[maybe_unused]] = std::input_iterator_tag; + using difference_type = std::ptrdiff_t; + using pointer = value_type*; + using reference = value_type&; + + CursorMoveIterator() = default; + + CursorMoveIterator( + std::shared_ptr cursor, + MoveOperation move_op) + : cursor_{std::move(cursor)}, + move_op_{move_op} {} + + const value_type& operator*() const { return cursor_; } + const value_type* operator->() const { return &cursor_; } + + CursorMoveIterator operator++(int) { return std::exchange(*this, ++CursorMoveIterator{*this}); } + CursorMoveIterator& operator++() { + if (((move_op_ == MoveOperation::get_current) && cursor_->eof()) || !cursor_->move(move_op_, false)) { + cursor_.reset(); + } + return *this; + } + + friend bool operator!=(const CursorMoveIterator& it, const std::default_sentinel_t&) { + return !!it.cursor_; + } + friend bool operator==(const CursorMoveIterator& it, const std::default_sentinel_t&) { + return !it.cursor_; + } + + private: + std::shared_ptr cursor_; + MoveOperation move_op_{MoveOperation::next}; +}; + class CursorIterator { public: using value_type = std::pair, std::shared_ptr>; diff --git a/silkworm/db/datastore/kvdb/history_queries.hpp b/silkworm/db/datastore/kvdb/history_queries.hpp index 34b6f83af1..f17f2137b2 100644 --- a/silkworm/db/datastore/kvdb/history_queries.hpp +++ b/silkworm/db/datastore/kvdb/history_queries.hpp @@ -19,3 +19,4 @@ #include "history_delete_query.hpp" #include "history_get_query.hpp" #include "history_put_query.hpp" +#include "history_range_query.hpp" diff --git a/silkworm/db/datastore/kvdb/history_range_query.hpp b/silkworm/db/datastore/kvdb/history_range_query.hpp new file mode 100644 index 0000000000..a59d887968 --- /dev/null +++ b/silkworm/db/datastore/kvdb/history_range_query.hpp @@ -0,0 +1,159 @@ +/* + Copyright 2025 The Silkworm Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#pragma once + +#include +#include +#include + +#include + +#include "../common/caching_view.hpp" +#include "../common/timestamp.hpp" +#include "cursor_iterator.hpp" +#include "history.hpp" +#include "history_codecs.hpp" +#include "mdbx.hpp" +#include "raw_codec.hpp" + +namespace silkworm::datastore::kvdb { + +template +struct HistoryRangeQuery { + ROTxn& tx; + History entity; + + using Key = decltype(TKeyDecoder::value); + using Value = decltype(TValueDecoder::value); + + template + static constexpr bool as_bool_predicate(const T& v) { + return !!v; + } + + static std::pair kv_pair_from_cursor(std::shared_ptr cursor, bool has_large_values) { + SILKWORM_ASSERT(cursor); + CursorIterator any_it{ + std::move(cursor), + MoveOperation::next, + std::make_shared>(has_large_values), + std::make_shared>(has_large_values), + }; + CursorKVIterator, HistoryValueDecoder> it{std::move(any_it)}; + + auto kv_pair = *it; + Key& key = kv_pair.first.key.value; + Value& value = kv_pair.second.value.value; + return std::pair{std::move(key), std::move(value)}; + } + + static auto kv_pair_from_cursor_func(bool has_large_values) { + return [=](std::shared_ptr cursor) -> std::pair { + return kv_pair_from_cursor(std::move(cursor), has_large_values); + }; + } + + auto exec_with_eager_begin(TimestampRange ts_range, bool ascending) { + SILKWORM_ASSERT(ascending); // descending is not implemented + + CursorMoveIterator begin_it; + std::function(std::shared_ptr)> seek_func; + + if (entity.has_large_values) { + auto begin_cursor = tx.ro_cursor(entity.values_table); + if (begin_cursor->to_first(false)) { + begin_it = CursorMoveIterator{std::move(begin_cursor), MoveOperation::get_current}; + + seek_func = [ts_range, has_large_values = entity.has_large_values, skip_current_key = std::make_shared()](std::shared_ptr cursor) -> std::shared_ptr { + auto result = cursor->current(); + SILKWORM_ASSERT(result); + + HistoryKeyDecoder> key_decoder{has_large_values}; + key_decoder.decode(result.key); + + if (*skip_current_key) { + Bytes current_key{key_decoder.value.key.value}; + do { + result = cursor->to_next(false); + if (!result) return {}; + key_decoder.decode(result.key); + } while (key_decoder.value.key.value == current_key); + } + + HistoryKeyEncoder> seek_key_encoder{has_large_values}; + seek_key_encoder.value.key.value = key_decoder.value.key.value; + seek_key_encoder.value.timestamp.value = ts_range.start; + Slice seek_key = seek_key_encoder.encode(); + + result = cursor->lower_bound(seek_key, false); + if (result) { + key_decoder.decode(result.key); + // if we jumped over to the next key, ts_range.start might be invalid + if (key_decoder.value.timestamp.value < ts_range.start) { + *skip_current_key = false; + return {}; + } else if (key_decoder.value.timestamp.value < ts_range.end) { + *skip_current_key = true; + return cursor; + } + *skip_current_key = true; + return {}; + } + return {}; + }; + } + } else { + auto begin_cursor = tx.ro_cursor_dup_sort(entity.values_table); + if (begin_cursor->to_first(false)) { + begin_it = CursorMoveIterator{std::move(begin_cursor), MoveOperation::multi_nextkey_firstvalue}; + + seek_func = [ts_range, has_large_values = entity.has_large_values](std::shared_ptr base_cursor) -> std::shared_ptr { + auto cursor = base_cursor->clone(); + auto result = cursor->current(); + SILKWORM_ASSERT(result); + + TimestampEncoder ts_range_start_encoder{ts_range.start}; + result = dynamic_cast(*cursor).lower_bound_multivalue(result.key, ts_range_start_encoder.encode(), false); + if (result) { + HistoryValueDecoder> value_decoder{has_large_values}; + value_decoder.decode(result.value); + if (value_decoder.value.timestamp.value < ts_range.end) { + return std::shared_ptr{std::move(cursor)}; + } + } + return {}; + }; + } + } + + return std::ranges::subrange{std::move(begin_it), std::default_sentinel} | + std::views::transform(std::move(seek_func)) | + silkworm::views::caching | + std::views::filter(as_bool_predicate>) | + std::views::transform(kv_pair_from_cursor_func(entity.has_large_values)); + } + + auto exec(TimestampRange ts_range, bool ascending) { + auto exec_func = [query = *this, ts_range, ascending](std::monostate) mutable { + return query.exec_with_eager_begin(ts_range, ascending); + }; + // turn into a lazy view that runs exec_func only when iteration is started using range::begin() + return std::views::single(std::monostate{}) | std::views::transform(std::move(exec_func)) | std::views::join; + } +}; + +} // namespace silkworm::datastore::kvdb diff --git a/silkworm/db/datastore/kvdb/history_range_query_test.cpp b/silkworm/db/datastore/kvdb/history_range_query_test.cpp new file mode 100644 index 0000000000..c9dc8d5da6 --- /dev/null +++ b/silkworm/db/datastore/kvdb/history_range_query_test.cpp @@ -0,0 +1,121 @@ +/* + Copyright 2025 The Silkworm Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#include "history_range_query.hpp" + +#include +#include +#include +#include +#include + +#include +#include + +#include + +#include "big_endian_codec.hpp" +#include "database.hpp" +#include "history_put_query.hpp" + +namespace silkworm::datastore::kvdb { + +template >> +std::vector vector_from_range(Range range) { + std::vector results; + std::ranges::copy(range, std::back_inserter(results)); + return results; +} + +struct HistoryPutEntry { + uint64_t key{0}; + uint64_t value{0}; + Timestamp timestamp{0}; +}; +using Entry = HistoryPutEntry; + +using Result = std::vector>; + +// by default has_large_values = false, is_multi_value = true +using DomainDefault = std::identity; + +struct DomainWithLargeValues { + Schema::DomainDef& operator()(Schema::DomainDef& domain) const { + domain.enable_large_values().values_disable_multi_value(); + return domain; + } +}; + +TEMPLATE_TEST_CASE("HistoryRangeQuery", "", DomainDefault, DomainWithLargeValues) { + const TemporaryDirectory tmp_dir; + ::mdbx::env_managed env = open_env(EnvConfig{.path = tmp_dir.path().string(), .create = true, .in_memory = true}); + + EntityName name{"Test"}; + Schema::DatabaseDef schema; + TestType domain_config; + [[maybe_unused]] auto _ = domain_config(schema.domain(name)); + + Database db{std::move(env), schema}; + db.create_tables(); + Domain domain = db.domain(name); + History& entity = *domain.history; + RWAccess db_access = db.access_rw(); + + auto find_in = [&db_access, &entity](const std::vector& data, TimestampRange ts_range) -> Result { + { + RWTxnManaged tx = db_access.start_rw_tx(); + HistoryPutQuery query{tx, entity}; + for (auto& entry : data) { + query.exec(entry.key, entry.value, entry.timestamp); + } + tx.commit_and_stop(); + } + + ROTxnManaged tx = db_access.start_ro_tx(); + HistoryRangeQuery query{tx, entity}; + return vector_from_range(query.exec(ts_range, true)); + }; + + SECTION("find all") { + CHECK(find_in({Entry{11, 111, 1}, Entry{22, 222, 2}, Entry{33, 333, 3}}, {0, 10}) == Result{{11, 111}, {22, 222}, {33, 333}}); + } + SECTION("find all in unsorted") { + CHECK(find_in({Entry{33, 333, 3}, Entry{22, 222, 2}, Entry{11, 111, 1}}, {0, 10}) == Result{{11, 111}, {22, 222}, {33, 333}}); + } + SECTION("find from timestamp") { + CHECK(find_in({Entry{11, 111, 1}, Entry{22, 222, 2}, Entry{33, 333, 3}}, {2, 10}) == Result{{22, 222}, {33, 333}}); + } + SECTION("find before timestamp") { + CHECK(find_in({Entry{11, 111, 1}, Entry{22, 222, 2}, Entry{33, 333, 3}}, {0, 3}) == Result{{11, 111}, {22, 222}}); + } + SECTION("find none given non-overlapping ts range") { + CHECK(find_in({Entry{11, 111, 1}, Entry{22, 222, 2}, Entry{33, 333, 3}}, {10, 20}).empty()); + } + SECTION("find none in empty") { + CHECK(find_in({}, {0, 10}).empty()); + } + SECTION("find all - with duplicates") { + CHECK(find_in({Entry{11, 111, 1}, Entry{11, 112, 2}, Entry{22, 222, 3}, Entry{22, 223, 4}}, {0, 10}) == Result{{11, 111}, {22, 222}}); + } + SECTION("find all in unsorted - with duplicates") { + CHECK(find_in({Entry{22, 223, 4}, Entry{22, 222, 3}, Entry{11, 112, 2}, Entry{11, 111, 1}}, {0, 10}) == Result{{11, 111}, {22, 222}}); + } + SECTION("find from timestamp - with duplicates") { + CHECK(find_in({Entry{11, 111, 1}, Entry{11, 112, 2}, Entry{22, 222, 3}, Entry{22, 223, 4}}, {2, 10}) == Result{{11, 112}, {22, 222}}); + } +} + +} // namespace silkworm::datastore::kvdb diff --git a/silkworm/db/datastore/kvdb/mdbx.cpp b/silkworm/db/datastore/kvdb/mdbx.cpp index 4b714f24ee..b458f24a53 100644 --- a/silkworm/db/datastore/kvdb/mdbx.cpp +++ b/silkworm/db/datastore/kvdb/mdbx.cpp @@ -340,6 +340,12 @@ void PooledCursor::bind(::mdbx::txn& txn, const MapConfig& config) { ::mdbx::cursor::bind(txn, map); } +std::unique_ptr PooledCursor::clone() { + auto clone = std::make_unique(); + mdbx::error::success_or_throw(::mdbx_cursor_copy(handle_, clone->handle_)); + return clone; +} + void PooledCursor::close() { ::mdbx_cursor_close(handle_); handle_ = nullptr; diff --git a/silkworm/db/datastore/kvdb/mdbx.hpp b/silkworm/db/datastore/kvdb/mdbx.hpp index 697feffb50..e096e38b58 100644 --- a/silkworm/db/datastore/kvdb/mdbx.hpp +++ b/silkworm/db/datastore/kvdb/mdbx.hpp @@ -86,6 +86,9 @@ class ROCursor { //! \brief Reuse current cursor binding it to provided transaction and map configuration virtual void bind(ROTxn& txn, const MapConfig& config) = 0; + //! \brief Clone cursor position and state + virtual std::unique_ptr clone() = 0; + //! \brief Returns the size of the underlying table virtual size_t size() const = 0; @@ -442,6 +445,8 @@ class PooledCursor : public RWCursorDupSort, protected ::mdbx::cursor { void bind(ROTxn& txn, const MapConfig& config) override { bind(*txn, config); } + std::unique_ptr clone() override; + //! \brief Closes cursor causing de-allocation of MDBX_cursor handle //! \remarks After this call the cursor is not reusable and the handle does not return to the cache void close(); diff --git a/silkworm/db/datastore/kvdb/memory_mutation_cursor.cpp b/silkworm/db/datastore/kvdb/memory_mutation_cursor.cpp index d32c9ba620..9ae11c6387 100644 --- a/silkworm/db/datastore/kvdb/memory_mutation_cursor.cpp +++ b/silkworm/db/datastore/kvdb/memory_mutation_cursor.cpp @@ -47,6 +47,11 @@ void MemoryMutationCursor::bind(ROTxn& txn, const MapConfig& config) { memory_cursor_->bind(txn, config); } +std::unique_ptr MemoryMutationCursor::clone() { + SILKWORM_ASSERT(false); // not implemented + return {}; +} + ::mdbx::map_handle MemoryMutationCursor::map() const { return memory_cursor_->map(); } diff --git a/silkworm/db/datastore/kvdb/memory_mutation_cursor.hpp b/silkworm/db/datastore/kvdb/memory_mutation_cursor.hpp index 8463c35b61..d76979bf40 100644 --- a/silkworm/db/datastore/kvdb/memory_mutation_cursor.hpp +++ b/silkworm/db/datastore/kvdb/memory_mutation_cursor.hpp @@ -33,6 +33,8 @@ class MemoryMutationCursor : public RWCursorDupSort { void bind(ROTxn& txn, const MapConfig& config) override; + std::unique_ptr clone() override; + ::mdbx::map_handle map() const override; size_t size() const override; diff --git a/silkworm/db/test_util/mock_ro_cursor.hpp b/silkworm/db/test_util/mock_ro_cursor.hpp index 152fd89669..9ca1bea26b 100644 --- a/silkworm/db/test_util/mock_ro_cursor.hpp +++ b/silkworm/db/test_util/mock_ro_cursor.hpp @@ -28,6 +28,7 @@ class MockROCursor : public datastore::kvdb::ROCursor { using Slice = datastore::kvdb::Slice; MOCK_METHOD((void), bind, (datastore::kvdb::ROTxn&, const datastore::kvdb::MapConfig&), (override)); + MOCK_METHOD((std::unique_ptr), clone, (), (override)); MOCK_METHOD((size_t), size, (), (const, override)); MOCK_METHOD((bool), empty, (), (const)); MOCK_METHOD((bool), is_multi_value, (), (const, override));