Only call cp_flush for consumers that participated in this CP.
If a consumer registers after a CP has gone to the flushing state,
the on_switchover_cp cb will not be called for that consumer.
In that CP, the ctx for the consumer is nullptr, as the consumer
never participated in the CP.

The previous code called cp_flush for every consumer, leaving the duty
of properly handling the nullptr returned by cp->context(svc_id) to the
consumer. However, none of the existing consumers handled this case.

As a result, we hit an occurrence where Index generated a CP solely on
its own, but before the CP was fully flushed, another consumer registered
and was called into cp_flush(). The replication service did not properly
handle the nullptr, as shown below: `get_repl_dev_ctx` was called with a
null `this` pointer, which is dangerous as invalid memory gets accessed.

This change is a breaking change for consumers like HO, so the version is bumped.
HomeObject participates in the CP as CLIENT; the current implementation of HO always
returns nullptr from `on_switchover_cp`, which will result in the CLIENT being excluded
from cp_flush once this commit is merged, as the sketch below illustrates.
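
As a minimal sketch of the contract (assumed shapes only; just the `on_switchover_cp`/`cp_flush` usage is grounded in this commit's diff, and the consumer class below is hypothetical): returning nullptr from `on_switchover_cp` now means "not participating", and such a consumer is skipped by cp_start_flush.

```
// Hypothetical consumer sketch: SketchConsumer and the stand-in types are
// illustrative, not HomeStore/HomeObject code.
#include <folly/futures/Future.h>
#include <memory>

struct CP {};        // stand-in for homestore::CP
struct CPContext {}; // stand-in for homestore::CPContext

struct SketchConsumer {
    // Called when a new CP is created; returning nullptr means this consumer
    // gets no context in the new CP, i.e. it does not participate.
    std::unique_ptr< CPContext > on_switchover_cp(CP* cur_cp, CP* new_cp) {
        return nullptr; // HomeObject's CLIENT behaves like this today
    }

    // After this commit, cp_flush is only invoked for consumers whose
    // cp->context(svc_id) is non-null, so the body can rely on a valid ctx.
    folly::Future< bool > cp_flush(CP* cp) { return folly::makeFuture< bool >(true); }
};
```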

callstack:
```
homestore::ReplSvcCPContext::get_repl_dev_ctx (this=0x0, dev=0x56010ab52b00) at /home/ubuntu/HomeStore/src/lib/replication/service/raft_repl_service.cpp:521
0x0000560106d58f1e in homestore::RaftReplServiceCPHandler::cp_flush (this=<optimized out>, cp=0x56010a467940) at /home/ubuntu/HomeStore/src/lib/replication/service/raft_repl_service.cpp:549
```

code:

```
 auto cp_ctx = s_cast< ReplSvcCPContext* >(cp->context(cp_consumer_t::REPLICATION_SVC));
 ...
 auto dev_ctx = cp_ctx->get_repl_dev_ctx(repl_dev.get());
```
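
For comparison, the consumer-side guard that was missing would look roughly like this (a sketch only; this commit instead skips non-participating consumers centrally in cp_start_flush):

```
auto cp_ctx = s_cast< ReplSvcCPContext* >(cp->context(cp_consumer_t::REPLICATION_SVC));
if (cp_ctx == nullptr) {
    // This service never participated in this CP; nothing to flush.
    return folly::makeFuture< bool >(true);
}
auto dev_ctx = cp_ctx->get_repl_dev_ctx(repl_dev.get());
```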

Signed-off-by: Xiaoxi Chen <[email protected]>
xiaoxichen authored and shosseinimotlagh committed Dec 24, 2024
1 parent 9f9bd45 commit 91ade04
Showing 12 changed files with 309 additions and 143 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
@@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.6.0"
version = "6.6.1"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
9 changes: 5 additions & 4 deletions src/include/homestore/index/index_internal.hpp
@@ -73,6 +73,7 @@ class IndexTableBase {
virtual uint64_t used_size() const = 0;
virtual void destroy() = 0;
virtual void repair_node(IndexBufferPtr const& buf) = 0;
virtual void repair_root_node(IndexBufferPtr const& buf) = 0;
};

enum class index_buf_state_t : uint8_t {
@@ -97,7 +98,7 @@ struct IndexBuffer : public sisl::ObjLifeCounter< IndexBuffer > {
sisl::atomic_counter< int > m_wait_for_down_buffers{0}; // Number of children need to wait for before persisting
#ifndef NDEBUG
// Down buffers are not mandatory members, but only to keep track of any bugs and asserts
std::vector<std::weak_ptr<IndexBuffer> > m_down_buffers;
std::vector< std::weak_ptr< IndexBuffer > > m_down_buffers;
std::mutex m_down_buffers_mtx;
std::shared_ptr< IndexBuffer > m_prev_up_buffer; // Keep a copy for debugging
#endif
@@ -125,11 +126,11 @@ struct IndexBuffer : public sisl::ObjLifeCounter< IndexBuffer > {
std::string to_string() const;
std::string to_string_dot() const;

void add_down_buffer(const IndexBufferPtr &buf);
void add_down_buffer(const IndexBufferPtr& buf);

void remove_down_buffer(const IndexBufferPtr &buf);
void remove_down_buffer(const IndexBufferPtr& buf);
#ifndef NDEBUG
bool is_in_down_buffers(const IndexBufferPtr &buf);
bool is_in_down_buffers(const IndexBufferPtr& buf);
#endif
};

49 changes: 39 additions & 10 deletions src/include/homestore/index/index_table.hpp
@@ -79,7 +79,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {

void destroy() override {
auto cpg = cp_mgr().cp_guard();
Btree<K, V>::destroy_btree(cpg.context(cp_consumer_t::INDEX_SVC));
Btree< K, V >::destroy_btree(cpg.context(cp_consumer_t::INDEX_SVC));
m_sb.destroy();
}

@@ -114,11 +114,40 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
return ret;
}

void repair_root_node(IndexBufferPtr const& idx_buf) override {
LOGTRACEMOD(wbcache, "check if this was the previous root node {} for buf {} ", m_sb->root_node,
idx_buf->to_string());
if (m_sb->root_node == idx_buf->blkid().to_integer()) {
// This is the root node, we need to update the root node in superblk
LOGTRACEMOD(wbcache, "{} is old root so we need to update the meta node ", idx_buf->to_string());
BtreeNode* n = this->init_node(idx_buf->raw_buffer(), idx_buf->blkid().to_integer(), false /* init_buf */,
BtreeNode::identify_leaf_node(idx_buf->raw_buffer()));
static_cast< IndexBtreeNode* >(n)->attach_buf(idx_buf);
auto edge_id = n->next_bnode();

BT_DBG_ASSERT(!n->has_valid_edge(),
"root {} already has a valid edge {}, so we should have found the new root node",
n->to_string(), n->get_edge_value().bnode_id());
n->set_next_bnode(empty_bnodeid);
n->set_edge_value(BtreeLinkInfo{edge_id, 0});
LOGTRACEMOD(wbcache, "change root node {}: edge updated to {} and invalidate the next node! ", n->node_id(),
edge_id);
auto cpg = cp_mgr().cp_guard();
write_node_impl(n, (void*)cpg.context(cp_consumer_t::INDEX_SVC));

} else {
LOGTRACEMOD(wbcache, "This is not the root node, so we can ignore this repair call for buf {}",
idx_buf->to_string());
}
}

void repair_node(IndexBufferPtr const& idx_buf) override {
if (idx_buf->is_meta_buf()) {
// We cannot repair the meta buf on its own, we need to repair the root node which modifies the
// meta_buf. It is ok to ignore this call, because repair will be done from root before meta_buf is
// attempted to repair, which would have updated the meta_buf already.
LOGTRACEMOD(wbcache, "Ignoring repair on meta buf {} root id {} ", idx_buf->to_string(),
this->root_node_id());
return;
}
BtreeNode* n = this->init_node(idx_buf->raw_buffer(), idx_buf->blkid().to_integer(), false /* init_buf */,
@@ -134,13 +163,14 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
// Only for interior nodes we need to repair its links
if (!bn->is_leaf()) {
LOGTRACEMOD(wbcache, "repair_node cp={} buf={}", cpg->id(), idx_buf->to_string());
repair_links(bn, (void *) cpg.context(cp_consumer_t::INDEX_SVC));
repair_links(bn, (void*)cpg.context(cp_consumer_t::INDEX_SVC));
}

if (idx_buf->m_up_buffer && idx_buf->m_up_buffer->is_meta_buf()) {
// Our up buffer is a meta buffer, which means that we are the new root node, we need to update the
// meta_buf with new root as well
on_root_changed(bn, (void *) cpg.context(cp_consumer_t::INDEX_SVC));
LOGTRACEMOD(wbcache, "root change for after repairing {}\n\n", idx_buf->to_string());
on_root_changed(bn, (void*)cpg.context(cp_consumer_t::INDEX_SVC));
}
}

@@ -227,10 +257,11 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
wb_cache().free_buf(n->m_idx_buf, r_cast< CPContext* >(context));
}

btree_status_t
on_root_changed(BtreeNodePtr const &new_root, void *context) override {
btree_status_t on_root_changed(BtreeNodePtr const& new_root, void* context) override {
// todo: if(m_sb->root_node == new_root->node_id() && m_sb->root_link_version == new_root->link_version()){
// return btree_status_t::success;}
LOGTRACEMOD(wbcache, "root changed for index old_root={} new_root={}", m_sb->root_node,
new_root->node_id());
m_sb->root_node = new_root->node_id();
m_sb->root_link_version = new_root->link_version();

Expand All @@ -240,7 +271,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
}

auto& root_buf = static_cast< IndexBtreeNode* >(new_root.get())->m_idx_buf;
wb_cache().transact_bufs(ordinal(), m_sb_buffer, root_buf, {}, {}, r_cast<CPContext *>(context));
wb_cache().transact_bufs(ordinal(), m_sb_buffer, root_buf, {}, {}, r_cast< CPContext* >(context));
return btree_status_t::success;
}

@@ -257,7 +288,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
}

// Get all original child ids as a support to check if we are beyond the last child node
std::set<bnodeid_t> orig_child_ids;
std::set< bnodeid_t > orig_child_ids;
for (uint32_t i = 0; i < parent_node->total_entries(); ++i) {
BtreeLinkInfo link_info;
parent_node->get_nth_value(i, &link_info, true);
@@ -391,9 +422,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
}
} while (true);

if (child_node) {
this->unlock_node(child_node, locktype_t::READ);
}
if (child_node) { this->unlock_node(child_node, locktype_t::READ); }

if (parent_node->total_entries() == 0 && !parent_node->has_valid_edge()) {
// We shouldn't have an empty interior node in the tree, let's delete it.
1 change: 1 addition & 0 deletions src/include/homestore/index_service.hpp
@@ -82,6 +82,7 @@ class IndexService {
uint64_t used_size() const;
uint32_t node_size() const;
void repair_index_node(uint32_t ordinal, IndexBufferPtr const& node_buf);
void update_root(uint32_t ordinal, IndexBufferPtr const& node_buf);

IndexWBCacheBase& wb_cache() {
if (!m_wb_cache) { throw std::runtime_error("Attempted to access a null pointer wb_cache"); }
9 changes: 6 additions & 3 deletions src/lib/checkpoint/cp_mgr.cpp
@@ -187,7 +187,8 @@ folly::Future< bool > CPManager::do_trigger_cp_flush(bool force, bool flush_on_s
// sealer should be the first one to switch over
auto& sealer_cp = m_cp_cb_table[(size_t)cp_consumer_t::SEALER];
if (sealer_cp) {
new_cp->m_contexts[(size_t)cp_consumer_t::SEALER] = std::move(sealer_cp->on_switchover_cp(cur_cp.get(), new_cp));
new_cp->m_contexts[(size_t)cp_consumer_t::SEALER] =
std::move(sealer_cp->on_switchover_cp(cur_cp.get(), new_cp));
}
// switch over other consumers
for (size_t svcid = 0; svcid < (size_t)cp_consumer_t::SENTINEL; svcid++) {
@@ -227,15 +228,17 @@ void CPManager::cp_start_flush(CP* cp) {
for (size_t svcid = 0; svcid < (size_t)cp_consumer_t::SENTINEL; svcid++) {
if (svcid == (size_t)cp_consumer_t::SEALER) { continue; }
auto& consumer = m_cp_cb_table[svcid];
if (consumer) { futs.emplace_back(std::move(consumer->cp_flush(cp))); }
bool participated = (cp->m_contexts[svcid] != nullptr);
if (consumer && participated) { futs.emplace_back(std::move(consumer->cp_flush(cp))); }
}

folly::collectAllUnsafe(futs).thenValue([this, cp](auto) {
// Sync flushing SEALER svc which is the replication service
// at last as the cp_lsn updated here. Other component should
// at least flushed to cp_lsn.
auto& sealer_cp = m_cp_cb_table[(size_t)cp_consumer_t::SEALER];
if (sealer_cp) { sealer_cp->cp_flush(cp).wait(); }
bool participated = (cp->m_contexts[(size_t)cp_consumer_t::SEALER] != nullptr);
if (sealer_cp && participated) { sealer_cp->cp_flush(cp).wait(); }
// All consumers have flushed for the cp
on_cp_flush_done(cp);
});
23 changes: 11 additions & 12 deletions src/lib/index/index_cp.cpp
@@ -145,7 +145,7 @@ void IndexCPContext::to_string_dot(const std::string& filename) {
LOGINFO("cp dag is stored in file {}", filename);
}

uint16_t IndexCPContext::num_dags() {
uint16_t IndexCPContext::num_dags() {
// count number of buffers whose up_buffers are nullptr
uint16_t count = 0;
std::unique_lock lg{m_flush_buffer_mtx};
@@ -190,15 +190,18 @@ std::string IndexCPContext::to_string_with_dags() {
// Now walk through the list of graphs and prepare formatted string
std::string str{fmt::format("IndexCPContext cpid={} dirty_buf_count={} dirty_buf_list_size={} #_of_dags={}\n",
m_cp->id(), m_dirty_buf_count.get(), m_dirty_buf_list.size(), group_roots.size())};
int cnt = 1;
for (const auto& root : group_roots) {
std::vector< std::pair< std::shared_ptr< DagNode >, int > > stack;
stack.emplace_back(root, 0);
std::vector< std::tuple< std::shared_ptr< DagNode >, int, int > > stack;
stack.emplace_back(root, 0, cnt++);
while (!stack.empty()) {
auto [node, level] = stack.back();
auto [node, level, index] = stack.back();
stack.pop_back();
fmt::format_to(std::back_inserter(str), "{}{} \n", std::string(level * 4, ' '), node->buf->to_string());
fmt::format_to(std::back_inserter(str), "{}{}-{} \n", std::string(level * 4, ' '), index,
node->buf->to_string());
int c = node->down_nodes.size();
for (const auto& d : node->down_nodes) {
stack.emplace_back(d, level + 1);
stack.emplace_back(d, level + 1, c--);
}
}
}
@@ -266,15 +269,11 @@ void IndexCPContext::process_txn_record(txn_record const* rec, std::map< BlkId,
#ifndef NDEBUG
// if (!is_sibling_link || (buf->m_up_buffer == real_up_buf)) { return buf;}
// Already linked with same buf or its not a sibling link to override
if (real_up_buf->is_in_down_buffers(buf)) {
return buf;
}
if (real_up_buf->is_in_down_buffers(buf)) { return buf; }
#endif

if (buf->m_up_buffer != real_up_buf) {
if (buf->m_up_buffer) {
buf->m_up_buffer->remove_down_buffer(buf);
}
if (buf->m_up_buffer) { buf->m_up_buffer->remove_down_buffer(buf); }
real_up_buf->add_down_buffer(buf);
buf->m_up_buffer = real_up_buf;
}
68 changes: 42 additions & 26 deletions src/lib/index/index_service.cpp
@@ -132,6 +132,15 @@ void IndexService::repair_index_node(uint32_t ordinal, IndexBufferPtr const& nod
}
}

void IndexService::update_root(uint32_t ordinal, IndexBufferPtr const& node_buf) {
auto tbl = get_index_table(ordinal);
if (tbl) {
tbl->repair_root_node(node_buf);
} else {
HS_DBG_ASSERT(false, "Index corresponding to ordinal={} has not been loaded yet, unexpected", ordinal);
}
}

uint32_t IndexService::node_size() const { return m_vdev->atomic_page_size(); }

uint64_t IndexService::used_size() const {
@@ -154,31 +163,39 @@ IndexBuffer::~IndexBuffer() {
}

std::string IndexBuffer::to_string() const {
if (m_is_meta_buf) {
return fmt::format("Buf={} [Meta] index={} state={} create/dirty_cp={}/{} down_wait#={} freed={}",
voidptr_cast(const_cast< IndexBuffer* >(this)), m_index_ordinal, int_cast(state()),
m_created_cp_id, m_dirtied_cp_id, m_wait_for_down_buffers.get(), m_node_freed);
} else {
// store m_down_buffers in a string
std::string down_bufs = "";
static std::vector< std::string > state_str = {"CLEAN", "DIRTY", "FLUSHING"};
// store m_down_buffers in a string
std::string down_bufs = "";
#ifndef NDEBUG
{
std::lock_guard lg(m_down_buffers_mtx);
for (auto const &down_buf: m_down_buffers) {
{
std::lock_guard lg(m_down_buffers_mtx);
if (m_down_buffers.empty()) {
fmt::format_to(std::back_inserter(down_bufs), "EMPTY");
} else {
for (auto const& down_buf : m_down_buffers) {
if (auto ptr = down_buf.lock()) {
fmt::format_to(std::back_inserter(down_bufs), "[{}]", voidptr_cast(ptr.get()));
}
}
fmt::format_to(std::back_inserter(down_bufs), " #down bufs={}", m_down_buffers.size());
}
}
#endif

return fmt::format("Buf={} index={} state={} create/dirty_cp={}/{} down_wait#={}{} up={} node=[{}] down=[{}]",
voidptr_cast(const_cast< IndexBuffer* >(this)), m_index_ordinal, int_cast(state()),
m_created_cp_id, m_dirtied_cp_id, m_wait_for_down_buffers.get(),
m_node_freed ? " Freed" : "", voidptr_cast(const_cast< IndexBuffer* >(m_up_buffer.get())),
(m_bytes == nullptr) ? "not attached yet"
: r_cast< persistent_hdr_t const* >(m_bytes)->to_compact_string(),
down_bufs);
if (m_is_meta_buf) {
return fmt::format("[Meta] Buf={} index={} state={} create/dirty_cp={}/{} down_wait#={}{} down={{{}}}",
voidptr_cast(const_cast< IndexBuffer* >(this)), m_index_ordinal,
state_str[int_cast(state())], m_created_cp_id, m_dirtied_cp_id,
m_wait_for_down_buffers.get(), m_node_freed ? " Freed" : "", down_bufs);
} else {

return fmt::format(
"Buf={} index={} state={} create/dirty_cp={}/{} down_wait#={}{} up={} node=[{}] down={{{}}}",
voidptr_cast(const_cast< IndexBuffer* >(this)), m_index_ordinal, state_str[int_cast(state())],
m_created_cp_id, m_dirtied_cp_id, m_wait_for_down_buffers.get(), m_node_freed ? " Freed" : "",
voidptr_cast(const_cast< IndexBuffer* >(m_up_buffer.get())),
(m_bytes == nullptr) ? "not attached yet" : r_cast< persistent_hdr_t const* >(m_bytes)->to_compact_string(),
down_bufs);
}
}

@@ -194,7 +211,7 @@ std::string IndexBuffer::to_string_dot() const {
return str;
}

void IndexBuffer::add_down_buffer(const IndexBufferPtr &buf) {
void IndexBuffer::add_down_buffer(const IndexBufferPtr& buf) {
m_wait_for_down_buffers.increment();
#ifndef NDEBUG
{
@@ -204,10 +221,11 @@ void IndexBuffer::add_down_buffer(const IndexBufferPtr &buf) {
#endif
}

void IndexBuffer::remove_down_buffer(const IndexBufferPtr &buf) {
void IndexBuffer::remove_down_buffer(const IndexBufferPtr& buf) {
m_wait_for_down_buffers.decrement();
#ifndef NDEBUG
bool found{false}; {
bool found{false};
{
std::lock_guard lg(m_down_buffers_mtx);
for (auto it = buf->m_up_buffer->m_down_buffers.begin(); it != buf->m_up_buffer->m_down_buffers.end(); ++it) {
if (it->lock() == buf) {
@@ -222,12 +240,10 @@ void IndexBuffer::remove_down_buffer(const IndexBufferPtr &buf) {
}

#ifndef NDEBUG
bool IndexBuffer::is_in_down_buffers(const IndexBufferPtr &buf) {
std::lock_guard<std::mutex> lg(m_down_buffers_mtx);
for (auto const &dbuf: m_down_buffers) {
if (dbuf.lock() == buf) {
return true;
}
bool IndexBuffer::is_in_down_buffers(const IndexBufferPtr& buf) {
std::lock_guard< std::mutex > lg(m_down_buffers_mtx);
for (auto const& dbuf : m_down_buffers) {
if (dbuf.lock() == buf) { return true; }
}
return false;
}