From 91ade0432dbae0e8d8daf284bfb101b4c4255c9d Mon Sep 17 00:00:00 2001 From: Xiaoxi Chen Date: Thu, 19 Dec 2024 15:51:28 +0800 Subject: [PATCH] Only call cp_flush for those consumers that participated in this cp. If a consumer registered after a cp goes to the flushing state, the on_switchover_cp cb will not be called for this consumer. In this CP, the ctx for this consumer is nullptr as the consumer never participated in the cp. The previous code called cp_flush for every consumer, leaving the duty of properly handling the nullptr returned by cp->context(svc_id) to the consumer. However, none of the existing consumers handled that case. As a result, we hit an occurrence where Index generated a CP solely, but before the cp was fully flushed, another consumer registered and was called into cp_flush(); the replication service did not properly handle the nullptr. As shown below, `get_repl_dev_ctx` was called while this_ptr was null, which is dangerous as invalid memory gets accessed. This change is a breaking change for consumers like HO, so bump up the version. HomeObject participates in the CP as CLIENT; the current implementation of HO always returns nullptr for `on_switchover_cp`, which will result in the CLIENT being excluded from cp_flush after this commit is merged. callstack: ``` homestore::ReplSvcCPContext::get_repl_dev_ctx (this=0x0, dev=0x56010ab52b00) at /home/ubuntu/HomeStore/src/lib/replication/service/raft_repl_service.cpp:521 0x0000560106d58f1e in homestore::RaftReplServiceCPHandler::cp_flush (this=, cp=0x56010a467940) at /home/ubuntu/HomeStore/src/lib/replication/service/raft_repl_service.cpp:549 ``` code: ``` auto cp_ctx = s_cast< ReplSvcCPContext* >(cp->context(cp_consumer_t::REPLICATION_SVC)); ... 
auto dev_ctx = cp_ctx->get_repl_dev_ctx(repl_dev.get()); ``` Signed-off-by: Xiaoxi Chen --- conanfile.py | 2 +- .../homestore/index/index_internal.hpp | 9 +- src/include/homestore/index/index_table.hpp | 49 ++++- src/include/homestore/index_service.hpp | 1 + src/lib/checkpoint/cp_mgr.cpp | 9 +- src/lib/index/index_cp.cpp | 23 ++- src/lib/index/index_service.cpp | 68 ++++--- src/lib/index/wb_cache.cpp | 182 ++++++++++++++---- src/lib/index/wb_cache.hpp | 2 + .../replication/service/generic_repl_svc.cpp | 4 +- src/tests/test_index_crash_recovery.cpp | 93 ++++----- src/tests/test_scripts/index_test.py | 10 +- 12 files changed, 309 insertions(+), 143 deletions(-) diff --git a/conanfile.py b/conanfile.py index f4d5fc38b..087d8ca98 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.6.0" + version = "6.6.1" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/include/homestore/index/index_internal.hpp b/src/include/homestore/index/index_internal.hpp index fea20dbd6..989e650c4 100644 --- a/src/include/homestore/index/index_internal.hpp +++ b/src/include/homestore/index/index_internal.hpp @@ -73,6 +73,7 @@ class IndexTableBase { virtual uint64_t used_size() const = 0; virtual void destroy() = 0; virtual void repair_node(IndexBufferPtr const& buf) = 0; + virtual void repair_root_node(IndexBufferPtr const& buf) = 0; }; enum class index_buf_state_t : uint8_t { @@ -97,7 +98,7 @@ struct IndexBuffer : public sisl::ObjLifeCounter< IndexBuffer > { sisl::atomic_counter< int > m_wait_for_down_buffers{0}; // Number of children need to wait for before persisting #ifndef NDEBUG // Down buffers are not mandatory members, but only to keep track of any bugs and asserts - std::vector > m_down_buffers; + std::vector< std::weak_ptr< IndexBuffer > > m_down_buffers; std::mutex m_down_buffers_mtx; std::shared_ptr< IndexBuffer > m_prev_up_buffer; // Keep a copy for 
debugging #endif @@ -125,11 +126,11 @@ struct IndexBuffer : public sisl::ObjLifeCounter< IndexBuffer > { std::string to_string() const; std::string to_string_dot() const; - void add_down_buffer(const IndexBufferPtr &buf); + void add_down_buffer(const IndexBufferPtr& buf); - void remove_down_buffer(const IndexBufferPtr &buf); + void remove_down_buffer(const IndexBufferPtr& buf); #ifndef NDEBUG - bool is_in_down_buffers(const IndexBufferPtr &buf); + bool is_in_down_buffers(const IndexBufferPtr& buf); #endif }; diff --git a/src/include/homestore/index/index_table.hpp b/src/include/homestore/index/index_table.hpp index 94b8685a3..83411b5c0 100644 --- a/src/include/homestore/index/index_table.hpp +++ b/src/include/homestore/index/index_table.hpp @@ -79,7 +79,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > { void destroy() override { auto cpg = cp_mgr().cp_guard(); - Btree::destroy_btree(cpg.context(cp_consumer_t::INDEX_SVC)); + Btree< K, V >::destroy_btree(cpg.context(cp_consumer_t::INDEX_SVC)); m_sb.destroy(); } @@ -114,11 +114,40 @@ class IndexTable : public IndexTableBase, public Btree< K, V > { return ret; } + void repair_root_node(IndexBufferPtr const& idx_buf) override { + LOGTRACEMOD(wbcache, "check if this was the previous root node {} for buf {} ", m_sb->root_node, + idx_buf->to_string()); + if (m_sb->root_node == idx_buf->blkid().to_integer()) { + // This is the root node, we need to update the root node in superblk + LOGTRACEMOD(wbcache, "{} is old root so we need to update the meta node ", idx_buf->to_string()); + BtreeNode* n = this->init_node(idx_buf->raw_buffer(), idx_buf->blkid().to_integer(), false /* init_buf */, + BtreeNode::identify_leaf_node(idx_buf->raw_buffer())); + static_cast< IndexBtreeNode* >(n)->attach_buf(idx_buf); + auto edge_id = n->next_bnode(); + + BT_DBG_ASSERT(!n->has_valid_edge(), + "root {} already has a valid edge {}, so we should have found the new root node", + n->to_string(), n->get_edge_value().bnode_id()); + 
n->set_next_bnode(empty_bnodeid); + n->set_edge_value(BtreeLinkInfo{edge_id, 0}); + LOGTRACEMOD(wbcache, "change root node {}: edge updated to {} and invalidate the next node! ", n->node_id(), + edge_id); + auto cpg = cp_mgr().cp_guard(); + write_node_impl(n, (void*)cpg.context(cp_consumer_t::INDEX_SVC)); + + } else { + LOGTRACEMOD(wbcache, "This is not the root node, so we can ignore this repair call for buf {}", + idx_buf->to_string()); + } + } + void repair_node(IndexBufferPtr const& idx_buf) override { if (idx_buf->is_meta_buf()) { // We cannot repair the meta buf on its own, we need to repair the root node which modifies the // meta_buf. It is ok to ignore this call, because repair will be done from root before meta_buf is // attempted to repair, which would have updated the meta_buf already. + LOGTRACEMOD(wbcache, "Ignoring repair on meta buf {} root id {} ", idx_buf->to_string(), + this->root_node_id()); return; } BtreeNode* n = this->init_node(idx_buf->raw_buffer(), idx_buf->blkid().to_integer(), false /* init_buf */, @@ -134,13 +163,14 @@ class IndexTable : public IndexTableBase, public Btree< K, V > { // Only for interior nodes we need to repair its links if (!bn->is_leaf()) { LOGTRACEMOD(wbcache, "repair_node cp={} buf={}", cpg->id(), idx_buf->to_string()); - repair_links(bn, (void *) cpg.context(cp_consumer_t::INDEX_SVC)); + repair_links(bn, (void*)cpg.context(cp_consumer_t::INDEX_SVC)); } if (idx_buf->m_up_buffer && idx_buf->m_up_buffer->is_meta_buf()) { // Our up buffer is a meta buffer, which means that we are the new root node, we need to update the // meta_buf with new root as well - on_root_changed(bn, (void *) cpg.context(cp_consumer_t::INDEX_SVC)); + LOGTRACEMOD(wbcache, "root change for after repairing {}\n\n", idx_buf->to_string()); + on_root_changed(bn, (void*)cpg.context(cp_consumer_t::INDEX_SVC)); } } @@ -227,10 +257,11 @@ class IndexTable : public IndexTableBase, public Btree< K, V > { wb_cache().free_buf(n->m_idx_buf, r_cast< CPContext* 
>(context)); } - btree_status_t - on_root_changed(BtreeNodePtr const &new_root, void *context) override { + btree_status_t on_root_changed(BtreeNodePtr const& new_root, void* context) override { // todo: if(m_sb->root_node == new_root->node_id() && m_sb->root_link_version == new_root->link_version()){ // return btree_status_t::success;} + LOGTRACEMOD(wbcache, "root changed for index old_root={} new_root={}", m_sb->root_node, + new_root->node_id()); m_sb->root_node = new_root->node_id(); m_sb->root_link_version = new_root->link_version(); @@ -240,7 +271,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > { } auto& root_buf = static_cast< IndexBtreeNode* >(new_root.get())->m_idx_buf; - wb_cache().transact_bufs(ordinal(), m_sb_buffer, root_buf, {}, {}, r_cast(context)); + wb_cache().transact_bufs(ordinal(), m_sb_buffer, root_buf, {}, {}, r_cast< CPContext* >(context)); return btree_status_t::success; } @@ -257,7 +288,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > { } // Get all original child ids as a support to check if we are beyond the last child node - std::set orig_child_ids; + std::set< bnodeid_t > orig_child_ids; for (uint32_t i = 0; i < parent_node->total_entries(); ++i) { BtreeLinkInfo link_info; parent_node->get_nth_value(i, &link_info, true); @@ -391,9 +422,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > { } } while (true); - if (child_node) { - this->unlock_node(child_node, locktype_t::READ); - } + if (child_node) { this->unlock_node(child_node, locktype_t::READ); } if (parent_node->total_entries() == 0 && !parent_node->has_valid_edge()) { // We shouldn't have an empty interior node in the tree, let's delete it. 
diff --git a/src/include/homestore/index_service.hpp b/src/include/homestore/index_service.hpp index c8801c9d2..87ad63672 100644 --- a/src/include/homestore/index_service.hpp +++ b/src/include/homestore/index_service.hpp @@ -82,6 +82,7 @@ class IndexService { uint64_t used_size() const; uint32_t node_size() const; void repair_index_node(uint32_t ordinal, IndexBufferPtr const& node_buf); + void update_root(uint32_t ordinal, IndexBufferPtr const& node_buf); IndexWBCacheBase& wb_cache() { if (!m_wb_cache) { throw std::runtime_error("Attempted to access a null pointer wb_cache"); } diff --git a/src/lib/checkpoint/cp_mgr.cpp b/src/lib/checkpoint/cp_mgr.cpp index 7072d7c91..7fd6f7460 100644 --- a/src/lib/checkpoint/cp_mgr.cpp +++ b/src/lib/checkpoint/cp_mgr.cpp @@ -187,7 +187,8 @@ folly::Future< bool > CPManager::do_trigger_cp_flush(bool force, bool flush_on_s // sealer should be the first one to switch over auto& sealer_cp = m_cp_cb_table[(size_t)cp_consumer_t::SEALER]; if (sealer_cp) { - new_cp->m_contexts[(size_t)cp_consumer_t::SEALER] = std::move(sealer_cp->on_switchover_cp(cur_cp.get(), new_cp)); + new_cp->m_contexts[(size_t)cp_consumer_t::SEALER] = + std::move(sealer_cp->on_switchover_cp(cur_cp.get(), new_cp)); } // switch over other consumers for (size_t svcid = 0; svcid < (size_t)cp_consumer_t::SENTINEL; svcid++) { @@ -227,7 +228,8 @@ void CPManager::cp_start_flush(CP* cp) { for (size_t svcid = 0; svcid < (size_t)cp_consumer_t::SENTINEL; svcid++) { if (svcid == (size_t)cp_consumer_t::SEALER) { continue; } auto& consumer = m_cp_cb_table[svcid]; - if (consumer) { futs.emplace_back(std::move(consumer->cp_flush(cp))); } + bool participated = (cp->m_contexts[svcid] != nullptr); + if (consumer && participated) { futs.emplace_back(std::move(consumer->cp_flush(cp))); } } folly::collectAllUnsafe(futs).thenValue([this, cp](auto) { @@ -235,7 +237,8 @@ void CPManager::cp_start_flush(CP* cp) { // at last as the cp_lsn updated here. 
Other component should // at least flushed to cp_lsn. auto& sealer_cp = m_cp_cb_table[(size_t)cp_consumer_t::SEALER]; - if (sealer_cp) { sealer_cp->cp_flush(cp).wait(); } + bool participated = (cp->m_contexts[(size_t)cp_consumer_t::SEALER] != nullptr); + if (sealer_cp && participated) { sealer_cp->cp_flush(cp).wait(); } // All consumers have flushed for the cp on_cp_flush_done(cp); }); diff --git a/src/lib/index/index_cp.cpp b/src/lib/index/index_cp.cpp index 578fae997..122667726 100644 --- a/src/lib/index/index_cp.cpp +++ b/src/lib/index/index_cp.cpp @@ -145,7 +145,7 @@ void IndexCPContext::to_string_dot(const std::string& filename) { LOGINFO("cp dag is stored in file {}", filename); } -uint16_t IndexCPContext::num_dags() { +uint16_t IndexCPContext::num_dags() { // count number of buffers whose up_buffers are nullptr uint16_t count = 0; std::unique_lock lg{m_flush_buffer_mtx}; @@ -190,15 +190,18 @@ std::string IndexCPContext::to_string_with_dags() { // Now walk through the list of graphs and prepare formatted string std::string str{fmt::format("IndexCPContext cpid={} dirty_buf_count={} dirty_buf_list_size={} #_of_dags={}\n", m_cp->id(), m_dirty_buf_count.get(), m_dirty_buf_list.size(), group_roots.size())}; + int cnt = 1; for (const auto& root : group_roots) { - std::vector< std::pair< std::shared_ptr< DagNode >, int > > stack; - stack.emplace_back(root, 0); + std::vector< std::tuple< std::shared_ptr< DagNode >, int, int > > stack; + stack.emplace_back(root, 0, cnt++); while (!stack.empty()) { - auto [node, level] = stack.back(); + auto [node, level, index] = stack.back(); stack.pop_back(); - fmt::format_to(std::back_inserter(str), "{}{} \n", std::string(level * 4, ' '), node->buf->to_string()); + fmt::format_to(std::back_inserter(str), "{}{}-{} \n", std::string(level * 4, ' '), index, + node->buf->to_string()); + int c = node->down_nodes.size(); for (const auto& d : node->down_nodes) { - stack.emplace_back(d, level + 1); + stack.emplace_back(d, level + 1, c--); } 
} } @@ -266,15 +269,11 @@ void IndexCPContext::process_txn_record(txn_record const* rec, std::map< BlkId, #ifndef NDEBUG // if (!is_sibling_link || (buf->m_up_buffer == real_up_buf)) { return buf;} // Already linked with same buf or its not a sibling link to override - if (real_up_buf->is_in_down_buffers(buf)) { - return buf; - } + if (real_up_buf->is_in_down_buffers(buf)) { return buf; } #endif if (buf->m_up_buffer != real_up_buf) { - if (buf->m_up_buffer) { - buf->m_up_buffer->remove_down_buffer(buf); - } + if (buf->m_up_buffer) { buf->m_up_buffer->remove_down_buffer(buf); } real_up_buf->add_down_buffer(buf); buf->m_up_buffer = real_up_buf; } diff --git a/src/lib/index/index_service.cpp b/src/lib/index/index_service.cpp index 49755a4ef..73b96b064 100644 --- a/src/lib/index/index_service.cpp +++ b/src/lib/index/index_service.cpp @@ -132,6 +132,15 @@ void IndexService::repair_index_node(uint32_t ordinal, IndexBufferPtr const& nod } } +void IndexService::update_root(uint32_t ordinal, IndexBufferPtr const& node_buf) { + auto tbl = get_index_table(ordinal); + if (tbl) { + tbl->repair_root_node(node_buf); + } else { + HS_DBG_ASSERT(false, "Index corresponding to ordinal={} has not been loaded yet, unexpected", ordinal); + } +} + uint32_t IndexService::node_size() const { return m_vdev->atomic_page_size(); } uint64_t IndexService::used_size() const { @@ -154,31 +163,39 @@ IndexBuffer::~IndexBuffer() { } std::string IndexBuffer::to_string() const { - if (m_is_meta_buf) { - return fmt::format("Buf={} [Meta] index={} state={} create/dirty_cp={}/{} down_wait#={} freed={}", - voidptr_cast(const_cast< IndexBuffer* >(this)), m_index_ordinal, int_cast(state()), - m_created_cp_id, m_dirtied_cp_id, m_wait_for_down_buffers.get(), m_node_freed); - } else { - // store m_down_buffers in a string - std::string down_bufs = ""; + static std::vector< std::string > state_str = {"CLEAN", "DIRTY", "FLUSHING"}; + // store m_down_buffers in a string + std::string down_bufs = ""; #ifndef 
NDEBUG - { - std::lock_guard lg(m_down_buffers_mtx); - for (auto const &down_buf: m_down_buffers) { + { + std::lock_guard lg(m_down_buffers_mtx); + if (m_down_buffers.empty()) { + fmt::format_to(std::back_inserter(down_bufs), "EMPTY"); + } else { + for (auto const& down_buf : m_down_buffers) { if (auto ptr = down_buf.lock()) { fmt::format_to(std::back_inserter(down_bufs), "[{}]", voidptr_cast(ptr.get())); } } + fmt::format_to(std::back_inserter(down_bufs), " #down bufs={}", m_down_buffers.size()); } + } #endif - return fmt::format("Buf={} index={} state={} create/dirty_cp={}/{} down_wait#={}{} up={} node=[{}] down=[{}]", - voidptr_cast(const_cast< IndexBuffer* >(this)), m_index_ordinal, int_cast(state()), - m_created_cp_id, m_dirtied_cp_id, m_wait_for_down_buffers.get(), - m_node_freed ? " Freed" : "", voidptr_cast(const_cast< IndexBuffer* >(m_up_buffer.get())), - (m_bytes == nullptr) ? "not attached yet" - : r_cast< persistent_hdr_t const* >(m_bytes)->to_compact_string(), - down_bufs); + if (m_is_meta_buf) { + return fmt::format("[Meta] Buf={} index={} state={} create/dirty_cp={}/{} down_wait#={}{} down={{{}}}", + voidptr_cast(const_cast< IndexBuffer* >(this)), m_index_ordinal, + state_str[int_cast(state())], m_created_cp_id, m_dirtied_cp_id, + m_wait_for_down_buffers.get(), m_node_freed ? " Freed" : "", down_bufs); + } else { + + return fmt::format( + "Buf={} index={} state={} create/dirty_cp={}/{} down_wait#={}{} up={} node=[{}] down={{{}}}", + voidptr_cast(const_cast< IndexBuffer* >(this)), m_index_ordinal, state_str[int_cast(state())], + m_created_cp_id, m_dirtied_cp_id, m_wait_for_down_buffers.get(), m_node_freed ? " Freed" : "", + voidptr_cast(const_cast< IndexBuffer* >(m_up_buffer.get())), + (m_bytes == nullptr) ? 
"not attached yet" : r_cast< persistent_hdr_t const* >(m_bytes)->to_compact_string(), + down_bufs); } } @@ -194,7 +211,7 @@ std::string IndexBuffer::to_string_dot() const { return str; } -void IndexBuffer::add_down_buffer(const IndexBufferPtr &buf) { +void IndexBuffer::add_down_buffer(const IndexBufferPtr& buf) { m_wait_for_down_buffers.increment(); #ifndef NDEBUG { @@ -204,10 +221,11 @@ void IndexBuffer::add_down_buffer(const IndexBufferPtr &buf) { #endif } -void IndexBuffer::remove_down_buffer(const IndexBufferPtr &buf) { +void IndexBuffer::remove_down_buffer(const IndexBufferPtr& buf) { m_wait_for_down_buffers.decrement(); #ifndef NDEBUG - bool found{false}; { + bool found{false}; + { std::lock_guard lg(m_down_buffers_mtx); for (auto it = buf->m_up_buffer->m_down_buffers.begin(); it != buf->m_up_buffer->m_down_buffers.end(); ++it) { if (it->lock() == buf) { @@ -222,12 +240,10 @@ void IndexBuffer::remove_down_buffer(const IndexBufferPtr &buf) { } #ifndef NDEBUG -bool IndexBuffer::is_in_down_buffers(const IndexBufferPtr &buf) { - std::lock_guard lg(m_down_buffers_mtx); - for (auto const &dbuf: m_down_buffers) { - if (dbuf.lock() == buf) { - return true; - } +bool IndexBuffer::is_in_down_buffers(const IndexBufferPtr& buf) { + std::lock_guard< std::mutex > lg(m_down_buffers_mtx); + for (auto const& dbuf : m_down_buffers) { + if (dbuf.lock() == buf) { return true; } } return false; } diff --git a/src/lib/index/wb_cache.cpp b/src/lib/index/wb_cache.cpp index 04383d8ac..cc45d18de 100644 --- a/src/lib/index/wb_cache.cpp +++ b/src/lib/index/wb_cache.cpp @@ -420,11 +420,11 @@ void IndexWBCache::free_buf(const IndexBufferPtr& buf, CPContext* cp_ctx) { } buf->m_node_freed = true; resource_mgr().inc_free_blk(m_node_size); - m_vdev->free_blk(buf->m_blkid, s_cast(cp_ctx)); + m_vdev->free_blk(buf->m_blkid, s_cast< VDevCPContext* >(cp_ctx)); } //////////////////// Recovery Related section ///////////////////////////////// -void IndexWBCache::load_buf(IndexBufferPtr const &buf) { 
+void IndexWBCache::load_buf(IndexBufferPtr const& buf) { if (buf->m_bytes == nullptr) { buf->m_bytes = hs_utils::iobuf_alloc(m_node_size, sisl::buftag::btree_node, m_vdev->align_size()); m_vdev->sync_read(r_cast< char* >(buf->m_bytes), m_node_size, buf->blkid()); @@ -432,6 +432,78 @@ void IndexWBCache::load_buf(IndexBufferPtr const &buf) { } } +struct DagNode { + IndexBufferPtr buffer; + std::vector< shared< DagNode > > children; +}; + +using DagPtr = std::shared_ptr< DagNode >; +using DagMap = std::map< IndexBufferPtr, DagPtr >; + +static DagMap generate_dag_buffers(std::map< BlkId, IndexBufferPtr >& bufmap) { + std::vector< IndexBufferPtr > bufs; + std::ranges::transform(bufmap, std::back_inserter(bufs), [](const auto& pair) { return pair.second; }); + + auto buildReverseMapping = [](const std::vector< IndexBufferPtr >& buffers) { + std::unordered_map< IndexBufferPtr, std::vector< IndexBufferPtr > > parentToChildren; + for (const auto& buffer : buffers) { + if (buffer->m_up_buffer) { parentToChildren[buffer->m_up_buffer].push_back(buffer); } + } + return parentToChildren; + }; + + std::function< DagPtr(IndexBufferPtr, std::unordered_map< IndexBufferPtr, std::vector< IndexBufferPtr > >&) > + buildDag; + buildDag = + [&buildDag](IndexBufferPtr buffer, + std::unordered_map< IndexBufferPtr, std::vector< IndexBufferPtr > >& parentToChildren) -> DagPtr { + auto dagNode = std::make_shared< DagNode >(); + dagNode->buffer = buffer; + if (parentToChildren.count(buffer)) { + for (const auto& child : parentToChildren[buffer]) { + dagNode->children.push_back(buildDag(child, parentToChildren)); + } + } + return dagNode; + }; + + auto generateDagMap = [&](const std::vector< IndexBufferPtr >& buffers) { + DagMap dagMap; + auto parentToChildren = buildReverseMapping(buffers); + for (const auto& buffer : buffers) { + if (!buffer->m_up_buffer) { // This is a root buffer + auto dagRoot = buildDag(buffer, parentToChildren); + dagMap[buffer] = dagRoot; + } + } + return dagMap; + }; + 
+ return generateDagMap(bufs); +} + +static std::string to_string_dag_bufs(DagMap& dags, cp_id_t cp_id = 0) { + std::string str{fmt::format("#_of_dags={}\n", dags.size())}; + int cnt = 1; + for (const auto& [_, dag] : dags) { + std::vector< std::tuple< std::shared_ptr< DagNode >, int, int > > stack; + stack.emplace_back(dag, 0, cnt++); + while (!stack.empty()) { + auto [node, level, index] = stack.back(); + stack.pop_back(); + auto snew = node->buffer->m_created_cp_id == cp_id ? "NEW" : ""; + auto sfree = node->buffer->m_node_freed ? "FREED" : ""; + fmt::format_to(std::back_inserter(str), "{}{}-{} {} {}\n", std::string(level * 4, ' '), index, + node->buffer->to_string(), snew, sfree); + int c = node->children.size(); + for (const auto& d : node->children) { + stack.emplace_back(d, level + 1, c--); + } + } + } + return str; +} + void IndexWBCache::recover(sisl::byte_view sb) { // If sb is empty, its possible a first time boot. if ((sb.bytes() == nullptr) || (sb.size() == 0)) { @@ -452,9 +524,9 @@ void IndexWBCache::recover(sisl::byte_view sb) { #ifdef _PRERELEASE auto detailed_log = [this](std::map< BlkId, IndexBufferPtr > const& bufs, - std::vector const &pending_bufs) { + std::vector< IndexBufferPtr > const& pending_bufs) { std::string log = fmt::format("\trecovered bufs (#of bufs = {})\n", bufs.size()); - for (auto const &[_, buf]: bufs) { + for (auto const& [_, buf] : bufs) { load_buf(buf); fmt::format_to(std::back_inserter(log), "{}\n", buf->to_string()); } @@ -462,7 +534,7 @@ void IndexWBCache::recover(sisl::byte_view sb) { // list of new_bufs if (!pending_bufs.empty()) { fmt::format_to(std::back_inserter(log), "\n\tpending_bufs (#of bufs = {})\n", pending_bufs.size()); - for (auto const &buf: pending_bufs) { + for (auto const& buf : pending_bufs) { fmt::format_to(std::back_inserter(log), "{}\n", buf->to_string()); } } @@ -471,6 +543,8 @@ void IndexWBCache::recover(sisl::byte_view sb) { std::string log = fmt::format("Recovering bufs (#of bufs = {}) before 
processing them\n", bufs.size()); LOGTRACEMOD(wbcache, "{}\n{}", log, detailed_log(bufs, {})); + auto dags = generate_dag_buffers(bufs); + LOGTRACEMOD(wbcache, "Before recovery: {}", to_string_dag_bufs(dags, icp_ctx->id())); #endif // At this point, we have the DAG structure (up/down dependency graph), exactly the same as prior to crash, with one @@ -484,15 +558,15 @@ void IndexWBCache::recover(sisl::byte_view sb) { // the same blkid which could clash with the blkid next in the buf list. // // On the second pass, we only take part of the parents/siblings and then repair them, if needed. - std::vector pending_bufs; - std::vector deleted_bufs; - for (auto const &[_, buf]: bufs) { + std::vector< IndexBufferPtr > pending_bufs; + std::vector< IndexBufferPtr > deleted_bufs; + for (auto const& [_, buf] : bufs) { if (buf->m_node_freed) { // Freed node load_buf(buf); if (was_node_committed(buf)) { // Mark this buffer as deleted, so that we can avoid using it anymore when repairing its parent's link - r_cast(buf->m_bytes)->node_deleted = true; + r_cast< persistent_hdr_t* >(buf->m_bytes)->node_deleted = true; write_buf(nullptr, buf, icp_ctx); deleted_bufs.push_back(buf); pending_bufs.push_back(buf->m_up_buffer); @@ -508,14 +582,27 @@ void IndexWBCache::recover(sisl::byte_view sb) { } } else if (buf->m_created_cp_id == icp_ctx->id()) { // New node - if (was_node_committed(buf) && was_node_committed(buf->m_up_buffer)) { - // Both current and up buffer is commited, we can safely commit the current block - m_vdev->commit_blk(buf->m_blkid); - pending_bufs.push_back(buf->m_up_buffer); - } else { - // Just ignore it - buf->m_up_buffer->remove_down_buffer(buf); - buf->m_up_buffer = nullptr; + if (was_node_committed(buf)) { + if (was_node_committed(buf->m_up_buffer)) { + // Both current and up buffer is commited, we can safely commit the current block + m_vdev->commit_blk(buf->m_blkid); + pending_bufs.push_back(buf->m_up_buffer); + } else { + // Up buffer is not committed, we need to 
repair it first + buf->m_up_buffer->remove_down_buffer(buf); + // buf->m_up_buffer = nullptr; + if (buf->m_up_buffer->m_wait_for_down_buffers.testz()) { + // if up buffer has upbuffer, then we need to decrement its wait_for_down_buffers + auto grand_buf = buf->m_up_buffer->m_up_buffer; + if (grand_buf) { + HS_DBG_ASSERT(!grand_buf->m_wait_for_down_buffers.testz(), + "upbuffer of upbuffer is already zero"); + grand_buf->remove_down_buffer(buf->m_up_buffer); + LOGINFOMOD(wbcache, "Decrementing wait_for_down_buffers for up buffer of up buffer {}", + grand_buf->to_string()); + } + } + } } } } @@ -524,25 +611,48 @@ void IndexWBCache::recover(sisl::byte_view sb) { LOGINFOMOD(wbcache, "Index Recovery detected {} nodes out of {} as new/freed nodes to be recovered in prev cp={}", pending_bufs.size(), bufs.size(), icp_ctx->id()); LOGTRACEMOD(wbcache, "All unclean bufs list\n{}", detailed_log(bufs, pending_bufs)); + LOGTRACEMOD(wbcache, "After recovery: {}", to_string_dag_bufs(dags, icp_ctx->id())); #endif - for (auto const &buf: pending_bufs) { + for (auto const& buf : pending_bufs) { recover_buf(buf); - if (buf->m_bytes != nullptr && r_cast(buf->m_bytes)->node_deleted) { + if (buf->m_bytes != nullptr && r_cast< persistent_hdr_t* >(buf->m_bytes)->node_deleted) { // This buffer was marked as deleted during repair, so we also need to free it deleted_bufs.push_back(buf); } } - for (auto const &buf: deleted_bufs) { - m_vdev->free_blk(buf->m_blkid, s_cast(icp_ctx)); + for (auto const& buf : deleted_bufs) { + m_vdev->free_blk(buf->m_blkid, s_cast< VDevCPContext* >(icp_ctx)); } m_in_recovery = false; m_vdev->recovery_completed(); } -void IndexWBCache::recover_buf(IndexBufferPtr const &buf) { +void IndexWBCache::updateUpBufferCounters(std::vector< IndexBufferPtr >& l0_bufs) { + std::unordered_set< IndexBufferPtr > allBuffers; + + // First, collect all unique buffers and reset their counters + for (auto& leaf : l0_bufs) { + auto currentBuffer = leaf; + while (currentBuffer) { + if 
(allBuffers.insert(currentBuffer).second) { currentBuffer->m_wait_for_down_buffers.set(0); } + currentBuffer = currentBuffer->m_up_buffer; + } + } + + // Now, iterate over each leaf buffer and update the count for each parent up the chain + for (auto& leaf : l0_bufs) { + auto currentBuffer = leaf; + while (currentBuffer) { + if (currentBuffer->m_up_buffer) { currentBuffer->m_up_buffer->m_wait_for_down_buffers.increment(1); } + currentBuffer = currentBuffer->m_up_buffer; + } + } +} + +void IndexWBCache::recover_buf(IndexBufferPtr const& buf) { if (!buf->m_wait_for_down_buffers.decrement_testz()) { // TODO: remove the buf_>m_up_buffer from down_buffers list of buf->m_up_buffer return; @@ -557,6 +667,12 @@ void IndexWBCache::recover_buf(IndexBufferPtr const &buf) { } else { LOGTRACEMOD(wbcache, "Index Recovery detected up node [{}] as committed no need to repair that", buf->to_string()); + if (buf->m_up_buffer && buf->m_up_buffer->is_meta_buf()) { + // Our up buffer is a meta buffer, which means old root is dirtied and may need no repair but possible of + // new root on upper level so needs to be retore the edge + LOGTRACEMOD(wbcache, "check root change for without repairing {}\n\n", buf->to_string()); + index_service().update_root(buf->m_index_ordinal, buf); + } } if (buf->m_up_buffer) { recover_buf(buf->m_up_buffer); } @@ -656,10 +772,8 @@ void IndexWBCache::do_flush_one_buf(IndexCPContext* cp_ctx, IndexBufferPtr const if (buf->is_meta_buf()) { LOGTRACEMOD(wbcache, "Flushing cp {} meta buf {} possibly because of root split", cp_ctx->id(), buf->to_string()); - auto const &sb = r_cast(buf.get())->m_sb; - if (!sb.is_empty()) { - meta_service().update_sub_sb(buf->m_bytes, sb.size(), sb.meta_blk()); - } + auto const& sb = r_cast< MetaIndexBuffer* >(buf.get())->m_sb; + if (!sb.is_empty()) { meta_service().update_sub_sb(buf->m_bytes, sb.size(), sb.meta_blk()); } process_write_completion(cp_ctx, buf); } else if (buf->m_node_freed) { LOGTRACEMOD(wbcache, "Not flushing buf {} 
as it was freed, its here for merely dependency", cp_ctx->id(), @@ -667,15 +781,13 @@ void IndexWBCache::do_flush_one_buf(IndexCPContext* cp_ctx, IndexBufferPtr const process_write_completion(cp_ctx, buf); } else { LOGTRACEMOD(wbcache, "Flushing cp {} buf {}", cp_ctx->id(), buf->to_string()); - m_vdev->async_write(r_cast(buf->raw_buffer()), m_node_size, buf->m_blkid, part_of_batch) - .thenValue([buf, cp_ctx](auto) { - try { - auto &pthis = s_cast(wb_cache()); - pthis.process_write_completion(cp_ctx, buf); - } catch (const std::runtime_error &e) { - LOGERROR("Failed to access write-back cache: {}", e.what()); - } - }); + m_vdev->async_write(r_cast< const char* >(buf->raw_buffer()), m_node_size, buf->m_blkid, part_of_batch) + .thenValue([buf, cp_ctx](auto) { + try { + auto& pthis = s_cast< IndexWBCache& >(wb_cache()); + pthis.process_write_completion(cp_ctx, buf); + } catch (const std::runtime_error& e) { LOGERROR("Failed to access write-back cache: {}", e.what()); } + }); if (!part_of_batch) { m_vdev->submit_batch(); } } diff --git a/src/lib/index/wb_cache.hpp b/src/lib/index/wb_cache.hpp index 25a4c8201..f129d11ef 100644 --- a/src/lib/index/wb_cache.hpp +++ b/src/lib/index/wb_cache.hpp @@ -41,6 +41,7 @@ class IndexWBCache : public IndexWBCacheBase { std::mutex m_flush_mtx; void* m_meta_blk; bool m_in_recovery{false}; + public: IndexWBCache(const std::shared_ptr< VirtualDev >& vdev, std::pair< meta_blk*, sisl::byte_view > sb, const std::shared_ptr< sisl::Evictor >& evictor, uint32_t node_size); @@ -78,5 +79,6 @@ class IndexWBCache : public IndexWBCacheBase { void recover_buf(IndexBufferPtr const& buf); bool was_node_committed(IndexBufferPtr const& buf); void load_buf(IndexBufferPtr const& buf); + void updateUpBufferCounters(std::vector< IndexBufferPtr >& pending_bufs); }; } // namespace homestore diff --git a/src/lib/replication/service/generic_repl_svc.cpp b/src/lib/replication/service/generic_repl_svc.cpp index 9aa2c044d..f5671cb16 100644 --- 
a/src/lib/replication/service/generic_repl_svc.cpp +++ b/src/lib/replication/service/generic_repl_svc.cpp @@ -152,7 +152,9 @@ AsyncReplResult<> SoloReplService::replace_member(group_id_t group_id, const rep return make_async_error<>(ReplServiceError::NOT_IMPLEMENTED); } -std::unique_ptr< CPContext > SoloReplServiceCPHandler::on_switchover_cp(CP* cur_cp, CP* new_cp) { return nullptr; } +std::unique_ptr< CPContext > SoloReplServiceCPHandler::on_switchover_cp(CP* cur_cp, CP* new_cp) { + return std::make_unique< CPContext >(new_cp); +} folly::Future< bool > SoloReplServiceCPHandler::cp_flush(CP* cp) { repl_service().iterate_repl_devs([cp](cshared< ReplDev >& repl_dev) { diff --git a/src/tests/test_index_crash_recovery.cpp b/src/tests/test_index_crash_recovery.cpp index 560bf0f83..650b35955 100644 --- a/src/tests/test_index_crash_recovery.cpp +++ b/src/tests/test_index_crash_recovery.cpp @@ -37,29 +37,29 @@ SISL_LOGGING_DECL(test_index_crash_recovery) SISL_OPTION_GROUP( test_index_crash_recovery, (num_iters, "", "num_iters", "number of iterations for rand ops", - ::cxxopts::value()->default_value("500"), "number"), + ::cxxopts::value< uint32_t >()->default_value("500"), "number"), (num_entries, "", "num_entries", "number of entries to test with", - ::cxxopts::value()->default_value("5000"), "number"), - (run_time, "", "run_time", "run time for io", ::cxxopts::value()->default_value("360000"), "seconds"), + ::cxxopts::value< uint32_t >()->default_value("5000"), "number"), + (run_time, "", "run_time", "run time for io", ::cxxopts::value< uint32_t >()->default_value("360000"), "seconds"), (num_rounds, "", "num_rounds", "number of rounds to test with", - ::cxxopts::value()->default_value("100"), "number"), + ::cxxopts::value< uint32_t >()->default_value("100"), "number"), (num_entries_per_rounds, "", "num_entries_per_rounds", "number of entries per rounds", - ::cxxopts::value()->default_value("40"), "number"), - (max_keys_in_node, "", "max_keys_in_node", "max_keys_in_node", 
- ::cxxopts::value()->default_value("20"), ""), - (min_keys_in_node, "", "min_keys_in_node", "min_keys_in_node", - ::cxxopts::value()->default_value("6"), ""), + ::cxxopts::value< uint32_t >()->default_value("40"), "number"), + (max_keys_in_node, "", "max_keys_in_node", "max_keys_in_node", ::cxxopts::value< uint32_t >()->default_value("20"), + ""), + (min_keys_in_node, "", "min_keys_in_node", "min_keys_in_node", ::cxxopts::value< uint32_t >()->default_value("6"), + ""), (operation_list, "", "operation_list", "operation list instead of default created following by percentage", - ::cxxopts::value< std::vector< std::string > >(), "operations [...]"), + ::cxxopts::value< std::vector< std::string > >(), "operations [...]"), (preload_size, "", "preload_size", "number of entries to preload tree with", - ::cxxopts::value()->default_value("1000"), "number"), + ::cxxopts::value< uint32_t >()->default_value("1000"), "number"), (init_device, "", "init_device", "init device", ::cxxopts::value< bool >()->default_value("1"), ""), (load_from_file, "", "load_from_file", "load from file", ::cxxopts::value< bool >()->default_value("0"), ""), (save_to_file, "", "save_to_file", "save to file", ::cxxopts::value< bool >()->default_value("0"), ""), (cleanup_after_shutdown, "", "cleanup_after_shutdown", "cleanup after shutdown", - ::cxxopts::value< bool >()->default_value("1"), ""), + ::cxxopts::value< bool >()->default_value("1"), ""), (seed, "", "seed", "random engine seed, use random if not defined", - ::cxxopts::value< uint64_t >()->default_value("0"), "number")) + ::cxxopts::value< uint64_t >()->default_value("0"), "number")) void log_obj_life_counter() { std::string str; @@ -249,7 +249,7 @@ struct IndexCrashTest : public test_common::HSTestHelper, BtreeTestHelper< TestT m_test->m_cfg.m_leaf_node_type = T::leaf_node_type; m_test->m_cfg.m_int_node_type = T::interior_node_type; m_test->m_cfg.m_max_keys_in_node = SISL_OPTIONS["max_keys_in_node"].as< uint32_t >(); - 
m_test->m_cfg.m_min_keys_in_node = SISL_OPTIONS["min_keys_in_node"].as(); + m_test->m_cfg.m_min_keys_in_node = SISL_OPTIONS["min_keys_in_node"].as< uint32_t >(); m_test->m_bt = std::make_shared< typename T::BtreeType >(std::move(sb), m_test->m_cfg); return m_test->m_bt; } @@ -277,7 +277,7 @@ struct IndexCrashTest : public test_common::HSTestHelper, BtreeTestHelper< TestT this->m_cfg = BtreeConfig(hs()->index_service().node_size()); this->m_cfg.m_max_keys_in_node = SISL_OPTIONS["max_keys_in_node"].as< uint32_t >(); - this->m_cfg.m_min_keys_in_node = SISL_OPTIONS["min_keys_in_node"].as(); + this->m_cfg.m_min_keys_in_node = SISL_OPTIONS["min_keys_in_node"].as< uint32_t >(); LOGINFO("Node size {}, max_keys_in_node {}, min_keys_in_node {}", this->m_cfg.node_size(), this->m_cfg.m_max_keys_in_node, this->m_cfg.m_min_keys_in_node); auto uuid = boost::uuids::random_generator()(); @@ -338,7 +338,7 @@ struct IndexCrashTest : public test_common::HSTestHelper, BtreeTestHelper< TestT void reapply_after_crash() { ShadowMap< K, V > snapshot_map{this->m_shadow_map.max_keys()}; snapshot_map.load(m_shadow_filename); - LOGINFO("\tSnapshot before crash\n{}", snapshot_map.to_string()); + // LOGINFO("\tSnapshot before crash\n{}", snapshot_map.to_string()); auto diff = this->m_shadow_map.diff(snapshot_map); // visualize tree after crash @@ -346,13 +346,14 @@ struct IndexCrashTest : public test_common::HSTestHelper, BtreeTestHelper< TestT // this->visualize_keys(recovered_tree_filename); // LOGINFO(" tree after recovered stored in {}", recovered_tree_filename); - std::string dif_str = "KEY \tADDITION\n"; - for (const auto& [k, addition] : diff) { - dif_str += fmt::format(" {} \t{}\n", k.key(), addition); + std::string dif_str = "Keys["; + for (const auto& [k, _] : diff) { + dif_str += fmt::format("{} ", k.key()); } + dif_str += "]"; LOGINFO("Diff between shadow map and snapshot map\n{}\n", dif_str); - for (const auto &[k, addition]: diff) { + for (const auto& [k, addition] : diff) { // 
this->print_keys(fmt::format("reapply: before inserting key {}", k.key())); // this->visualize_keys(recovered_tree_filename); if (addition) { @@ -401,15 +402,15 @@ struct IndexCrashTest : public test_common::HSTestHelper, BtreeTestHelper< TestT } void crash_and_recover(uint32_t s_key, uint32_t e_key) { - this->print_keys("Btree prior to CP and susbsequent simulated crash: "); + // this->print_keys("Btree prior to CP and susbsequent simulated crash: "); trigger_cp(false); this->wait_for_crash_recovery(); // this->visualize_keys("tree_after_crash_" + std::to_string(s_key) + "_" + std::to_string(e_key) + ".dot"); - this->print_keys("Post crash and recovery, btree structure: "); + // this->print_keys("Post crash and recovery, btree structure: "); this->reapply_after_crash(); - this->print_keys("Post reapply, btree structure: "); + // this->print_keys("Post reapply, btree structure: "); this->get_all(); LOGINFO("Expect to have [{},{}) in tree and it is actually{} ", s_key, e_key, tree_key_count()); @@ -420,24 +421,28 @@ struct IndexCrashTest : public test_common::HSTestHelper, BtreeTestHelper< TestT std::set< uint64_t > new_keys; std::transform(operations.begin(), operations.end(), std::inserter(new_keys, new_keys.end()), [](const Operation& operation) { return operation.first; }); - uint32_t count = 1; + uint32_t count = 0; this->m_shadow_map.foreach ([this, new_keys, &count](K key, V value) { // discard the new keys to check if (new_keys.find(key.key()) != new_keys.end()) { return; } + count++; auto copy_key = std::make_unique< K >(); *copy_key = key; auto out_v = std::make_unique< V >(); auto req = BtreeSingleGetRequest{copy_key.get(), out_v.get()}; req.enable_route_tracing(); const auto ret = this->m_bt->get(req); + if (ret != btree_status_t::success) { + this->print_keys(fmt::format("Sanity check: key {}", key.key())); + this->dump_to_file("sanity_fail.txt"); + } ASSERT_EQ(ret, btree_status_t::success) << "Missing key " << key << " in btree but present in shadow 
map"; }); LOGINFO("Sanity check passed for {} keys!", count); - } void crash_and_recover(OperationList& operations, std::string filename = "") { - this->print_keys("Btree prior to CP and susbsequent simulated crash: "); + // this->print_keys("Btree prior to CP and susbsequent simulated crash: "); LOGINFO("Before Crash: {} keys in shadow map and it is actually {} keys in tree - operations size {}", this->m_shadow_map.size(), tree_key_count(), operations.size()); @@ -456,7 +461,7 @@ struct IndexCrashTest : public test_common::HSTestHelper, BtreeTestHelper< TestT LOGINFO("Visualize the tree file after recovery : {}", rec_filename); this->visualize_keys(rec_filename); } - this->print_keys("Post crash and recovery, btree structure: "); + // this->print_keys("Post crash and recovery, btree structure: "); sanity_check(operations); // Added to the index service right after recovery. Not needed here // test_common::HSTestHelper::trigger_cp(true); @@ -468,7 +473,7 @@ struct IndexCrashTest : public test_common::HSTestHelper, BtreeTestHelper< TestT LOGINFO("Visualize the tree after reapply {}", re_filename); this->visualize_keys(re_filename); } - this->print_keys("Post reapply, btree structure: "); + // this->print_keys("Post reapply, btree structure: "); this->get_all(); LOGINFO("After reapply: {} keys in shadow map and actually {} in tress", this->m_shadow_map.size(), @@ -629,7 +634,7 @@ TYPED_TEST(IndexCrashTest, long_running_put_crash) { test_common::HSTestHelper::trigger_cp(true); this->get_all(); this->m_shadow_map.save(this->m_shadow_filename); - this->print_keys("reapply: after preload"); + // this->print_keys("reapply: after preload"); this->visualize_keys("tree_after_preload.dot"); for (uint32_t round = 1; @@ -716,28 +721,27 @@ TYPED_TEST(IndexCrashTest, long_running_put_crash) { elapsed_time * 100.0 / this->m_run_time, this->tree_key_count(), num_entries, this->tree_key_count() * 100.0 / num_entries); } - this->print_keys(fmt::format("reapply: after round {}", 
round)); + // this->print_keys(fmt::format("reapply: after round {}", round)); if (renew_btree_after_crash) { this->reset_btree(); }; } } // Basic reverse and forward order remove with different flip points TYPED_TEST(IndexCrashTest, MergeRemoveBasic) { - vector flip_points = { - "crash_flush_on_merge_at_parent", - "crash_flush_on_merge_at_left_child", + vector< std::string > flip_points = { + "crash_flush_on_merge_at_parent", "crash_flush_on_merge_at_left_child", // "crash_flush_on_freed_child", }; for (size_t i = 0; i < flip_points.size(); ++i) { this->reset_btree(); - auto &flip_point = flip_points[i]; + auto& flip_point = flip_points[i]; LOGINFO("=== Testing flip point: {} - {} ===", i + 1, flip_point); // Populate some keys [1,num_entries) and trigger cp to persist - LOGINFO("Step {}-1: Populate some keys and flush", i+1); - auto const num_entries = SISL_OPTIONS["num_entries"].as(); + LOGINFO("Step {}-1: Populate some keys and flush", i + 1); + auto const num_entries = SISL_OPTIONS["num_entries"].as< uint32_t >(); for (auto k = 0u; k < num_entries; ++k) { this->put(k, btree_put_type::INSERT, true /* expect_success */); } @@ -748,7 +752,8 @@ TYPED_TEST(IndexCrashTest, MergeRemoveBasic) { // Split keys into batches and remove the last one in reverse order LOGINFO("Step {}-2: Set crash flag, remove some keys in reverse order", i + 1); - int batch_num = 4; { + int batch_num = 4; + { int n = batch_num; auto r = num_entries * n / batch_num - 1; auto l = num_entries * (n - 1) / batch_num; @@ -759,8 +764,7 @@ TYPED_TEST(IndexCrashTest, MergeRemoveBasic) { LOGINFO("Step {}-2-1: Remove keys in batch {}/{} ({} to {})", i + 1, n, batch_num, r, l); this->set_basic_flip(flip_point); - for (auto [k, _]: ops) { - LOGINFO("Removing key {}", k); + for (auto [k, _] : ops) { this->remove_one(k, true); } this->visualize_keys("tree_merge_before_first_crash.dot"); @@ -781,8 +785,7 @@ TYPED_TEST(IndexCrashTest, MergeRemoveBasic) { LOGINFO("Step {}-3-1: Remove keys in batch {}/{} ({} 
to {})", i + 1, n, batch_num, l, r); this->set_basic_flip(flip_point); - for (auto [k, _]: ops) { - LOGINFO("Removing key {}", k); + for (auto [k, _] : ops) { this->remove_one(k, true); } this->visualize_keys("tree_merge_before_second_crash.dot"); @@ -803,8 +806,7 @@ TYPED_TEST(IndexCrashTest, MergeRemoveBasic) { LOGINFO("Step {}-4-1: Remove keys in batch {}/{} ({} to {})", i + 1, n, batch_num, l, r); this->set_basic_flip(flip_point); - for (auto [k, _]: ops) { - LOGINFO("Removing key {}", k); + for (auto [k, _] : ops) { this->remove_one(k, true); } this->visualize_keys("tree_merge_before_third_crash.dot"); @@ -828,9 +830,8 @@ TYPED_TEST(IndexCrashTest, MergeRemoveBasic) { // vector flips = { // "crash_flush_on_merge_at_parent", "crash_flush_on_merge_at_left_child", // }; -// SequenceGenerator generator(0 /*putFreq*/, 100 /* removeFreq*/, 0 /*start_range*/, num_entries - 1 /*end_range*/); -// OperationList operations; -// for (size_t i = 0; i < flips.size(); ++i) { +// SequenceGenerator generator(0 /*putFreq*/, 100 /* removeFreq*/, 0 /*start_range*/, num_entries - 1 +// /*end_range*/); OperationList operations; for (size_t i = 0; i < flips.size(); ++i) { // this->reset_btree(); // LOGINFO("Step {}-1: Init btree", i + 1); // for (auto k = 0u; k < num_entries; ++k) { diff --git a/src/tests/test_scripts/index_test.py b/src/tests/test_scripts/index_test.py index dd2f8f010..d4734ac82 100755 --- a/src/tests/test_scripts/index_test.py +++ b/src/tests/test_scripts/index_test.py @@ -51,10 +51,10 @@ def parse_arguments(): parser.add_argument('--dev_list', help='Device list', default='') parser.add_argument('--cleanup_after_shutdown', help='Cleanup after shutdown', type=bool, default=False) parser.add_argument('--init_device', help='Initialize device', type=bool, default=True) - parser.add_argument('--max_keys_in_node', help='Maximum num of keys in btree nodes', type=int, default=5) + parser.add_argument('--max_keys_in_node', help='Maximum num of keys in btree nodes', 
type=int, default=10) parser.add_argument('--min_keys_in_node', help='Minimum num of keys in btree nodes', type=int, default=2) - parser.add_argument('--num_rounds', help='number of rounds for crash test', type=int, default=10000) - parser.add_argument('--num_entries_per_rounds', help='number of rounds for crash test', type=int, default=60) + parser.add_argument('--num_rounds', help='number of rounds for crash test', type=int, default=1000) + parser.add_argument('--num_entries_per_rounds', help='number of rounds for crash test', type=int, default=100) # Parse the known arguments and ignore any unknown arguments args, unknown = parser.parse_known_args() @@ -94,10 +94,10 @@ def long_running_clean_shutdown(options, type=0): def long_running_crash_put(options): print("Long running crash put started") - options['num_entries'] = 131072 # 128K + options['num_entries'] = 1310720 # 1280K options['init_device'] = True options['run_time'] = 14400 # 4 hours - options['preload_size'] = 100 + options['preload_size'] = 1024 print(f"options: {options}") run_crash_test(options) print("Long running crash put completed")