Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
JacksonYao287 committed Jan 29, 2025
1 parent 662cf6b commit b0f2e52
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 33 deletions.
1 change: 0 additions & 1 deletion src/include/homestore/blkdata_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,6 @@ class BlkDataService {
uint32_t m_blk_size;

private:
// graceful shutdown
// graceful shutdown related
std::atomic_bool m_stopping{false};
mutable std::atomic_uint64_t pending_request_num{0};
Expand Down
4 changes: 2 additions & 2 deletions src/lib/blkdata_svc/blkdata_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ folly::Future< std::error_code > BlkDataService::async_write(sisl::sg_list const
decr_pending_request_num();
return collect_all_futures(s_futs);
}
decr_pending_request_num();
}

BlkAllocStatus BlkDataService::alloc_blks(uint32_t size, const blk_alloc_hints& hints, MultiBlkId& out_blkids) {
Expand All @@ -226,8 +225,9 @@ BlkAllocStatus BlkDataService::commit_blk(MultiBlkId const& blkid) {

if (blkid.num_pieces() == 1) {
// Shortcut to most common case
m_vdev->commit_blk(blkid);
auto ret = m_vdev->commit_blk(blkid);
decr_pending_request_num();
return ret;
}
auto it = blkid.iterate();
while (auto const bid = it.next()) {
Expand Down
40 changes: 17 additions & 23 deletions src/lib/logstore/log_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,26 +107,9 @@ LogDev::~LogDev() {
THIS_LOGDEV_LOG(INFO, "Logdev stopping id {}", m_logdev_id);
HS_LOG_ASSERT((m_pending_flush_size.load() == 0),
"LogDev stop attempted while writes to logdev are pending completion");
{
std::unique_lock lg = flush_guard();
// waiting under lock to make sure no new flush is started
while (m_pending_callback.load() > 0) {
THIS_LOGDEV_LOG(INFO, "Waiting for pending callbacks to complete, pending callbacks {}",
m_pending_callback.load());
std::this_thread::sleep_for(std::chrono::milliseconds{1000});
}
}
// after we call stop, we need to do any pending device truncations
truncate();

if (allow_timer_flush()) stop_timer();

{
folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx);
m_id_logstore_map.clear();
}

m_log_records = nullptr;
m_log_records.reset(nullptr);
m_logdev_meta.reset();
m_log_idx.store(0);
m_pending_flush_size.store(0);
Expand All @@ -144,16 +127,28 @@ LogDev::~LogDev() {
}

void LogDev::stop() {
// Walk through all the stores and find the least logdev_key that we can truncate
for (auto& [_, store] : m_id_logstore_map)
store.log_store->stop();

start_stopping();
while (true) {
if (!get_pending_request_num()) break;
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
{
std::unique_lock lg = flush_guard();
// waiting under lock to make sure no new flush is started
while (m_pending_callback.load() > 0) {
THIS_LOGDEV_LOG(INFO, "Waiting for pending callbacks to complete, pending callbacks {}",
m_pending_callback.load());
std::this_thread::sleep_for(std::chrono::milliseconds{1000});
}
}

folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx);
for (auto& [_, store] : m_id_logstore_map)
store.log_store->stop();

// after we call stop, we need to do any pending device truncations
truncate();
m_id_logstore_map.clear();
}

void LogDev::destroy() {
Expand Down Expand Up @@ -538,7 +533,6 @@ void LogDev::on_flush_completion(LogGroup* lg) {

uint64_t LogDev::truncate() {
auto stopping = is_stopping();
if (stopping) return 0;
incr_pending_request_num();
// Order of this lock has to be preserved. We take externally visible lock which is flush lock first. This
// prevents any further update to tail_lsn and also flushes conurrently with truncation. Then we take the store
Expand Down
4 changes: 2 additions & 2 deletions src/lib/logstore/log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,8 @@ bool HomeLogStore::truncate(logstore_seq_num_t upto_lsn, bool in_memory_truncate
m_records.truncate(upto_lsn);
m_start_lsn.store(upto_lsn + 1);
if (!in_memory_truncate_only) { m_logdev->truncate(); }
return true;
decr_pending_request_num();
return true;
}

std::tuple< logstore_seq_num_t, logdev_key, logstore_seq_num_t > HomeLogStore::truncate_info() const {
Expand Down Expand Up @@ -325,8 +325,8 @@ bool HomeLogStore::foreach (int64_t start_idx, const std::function< bool(logstor
auto log_buf = m_logdev->read(record.m_dev_key);
return cb(cur_idx, log_buf);
});
return true;
decr_pending_request_num();
return true;
}

logstore_seq_num_t HomeLogStore::get_contiguous_issued_seq_num(logstore_seq_num_t from) const {
Expand Down
6 changes: 2 additions & 4 deletions src/lib/logstore/log_store_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,8 @@ void LogStoreService::stop() {
}

LogStoreService::~LogStoreService() {
{
folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx);
m_id_logdev_map.clear();
}
folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx);
m_id_logdev_map.clear();
}

logdev_id_t LogStoreService::get_next_logdev_id() {
Expand Down
2 changes: 1 addition & 1 deletion src/lib/replication/service/raft_repl_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ void RaftReplService::stop() {

// stop all repl_devs
std::unique_lock lg(m_rd_map_mtx);
for (auto it = m_rd_map.begin(); it != m_rd_map.end();) {
for (auto it = m_rd_map.begin(); it != m_rd_map.end(); ++it) {
auto rdev = std::dynamic_pointer_cast< RaftReplDev >(it->second);
rdev->stop();
}
Expand Down

0 comments on commit b0f2e52

Please sign in to comment.