diff --git a/include/pika_admin.h b/include/pika_admin.h index 67a1969820..a51bcd946f 100644 --- a/include/pika_admin.h +++ b/include/pika_admin.h @@ -185,11 +185,15 @@ class FlushallCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new FlushallCmd(*this); } - void FlushAllWithoutLock(); + bool FlushAllWithoutLock(); + void DoBinlog() override; private: void DoInitial() override; - void DoWithoutLock(std::shared_ptr db); + bool DoWithoutLock(std::shared_ptr db); + void Clear() override { flushall_succeed_ = false; } + + bool flushall_succeed_{false}; }; class FlushdbCmd : public Cmd { @@ -204,14 +208,19 @@ class FlushdbCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new FlushdbCmd(*this); } - void FlushAllDBsWithoutLock(); std::string GetFlushDBname() { return db_name_; } + void DoBinlog() override; + bool DoWithoutLock(); private: - std::string db_name_; void DoInitial() override; - void Clear() override { db_name_.clear(); } - void DoWithoutLock(); + void Clear() override { + db_name_.clear(); + flush_succeed_ = false; + } + + bool flush_succeed_{false}; + std::string db_name_; }; class ClientCmd : public Cmd { diff --git a/src/pika_admin.cc b/src/pika_admin.cc index 81a5a36ab6..04f9bbbf5c 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -492,6 +492,7 @@ void SelectCmd::Do() { } void FlushallCmd::DoInitial() { + flushall_succeed_ = false; if (!CheckArg(argv_.size())) { res_.SetRes(CmdRes::kWrongNum, kCmdNameFlushall); return; @@ -510,13 +511,20 @@ void FlushallCmd::Do() { for (const auto& db_item : g_pika_server->GetDB()) { db_item.second->DBLock(); } - FlushAllWithoutLock(); + flushall_succeed_ = FlushAllWithoutLock(); for (const auto& db_item : g_pika_server->GetDB()) { db_item.second->DBUnlock(); } g_pika_rm->DBUnlock(); - if (res_.ok()) { + if (flushall_succeed_) { res_.SetRes(CmdRes::kOk); + } else if (res_.ret() == CmdRes::kErrOther){ + //flushdb failed and the res_ was set + } else { + //flushall failed, but res_ was not set + res_.SetRes(CmdRes::kErrOther, + "Flushall failed, maybe only some of the dbs successfully flushed while some not, check WARNING/ERROR log to know " + "more, you can try again moment later"); } } @@ -525,37 +533,55 @@ void FlushallCmd::DoThroughDB() { } void FlushallCmd::DoUpdateCache(std::shared_ptr db) { + if (!flushall_succeed_) { + //flushdb failed, also don't clear the cache + return; + } // clear cache if (PIKA_CACHE_NONE != g_pika_conf->cache_mode()) { g_pika_server->ClearCacheDbAsync(db); } } -void FlushallCmd::FlushAllWithoutLock() { +bool FlushallCmd::FlushAllWithoutLock() { for (const auto& db_item : g_pika_server->GetDB()) { std::shared_ptr db = db_item.second; DBInfo p_info(db->GetDBName()); if (g_pika_rm->GetSyncMasterDBs().find(p_info) == g_pika_rm->GetSyncMasterDBs().end()) { - res_.SetRes(CmdRes::kErrOther, "DB not found"); - return; + LOG(ERROR) << p_info.db_name_ + " not found when flushall db"; + res_.SetRes(CmdRes::kErrOther,p_info.db_name_ + " not found when flushall db"); + return false; } - DoWithoutLock(db); - } - if (res_.ok()) { - res_.SetRes(CmdRes::kOk); + bool success = DoWithoutLock(db); + if (!success) { return false; } } + return true; } -void FlushallCmd::DoWithoutLock(std::shared_ptr db) { +bool FlushallCmd::DoWithoutLock(std::shared_ptr db) { if (!db) { - LOG(INFO) << "Flushall, but DB not found"; - } else { - db->FlushDBWithoutLock(); - DoUpdateCache(db); + LOG(ERROR) << "Flushall, but DB not found"; + res_.SetRes(CmdRes::kErrOther,db->GetDBName() + " not found when flushall db"); + return false; + } + bool success = db->FlushDBWithoutLock(); + if (!success) { + // if the db is not flushed, return before clear the cache + res_.SetRes(CmdRes::kErrOther,db->GetDBName() + " flushall failed due to other Errors, please check Error/Warning log to know more"); + return false; + } + DoUpdateCache(db); + + return true; +} +void FlushallCmd::DoBinlog() { + if (flushall_succeed_) { + Cmd::DoBinlog(); } } void FlushdbCmd::DoInitial() { + flush_succeed_ = false; if (!CheckArg(argv_.size())) { res_.SetRes(CmdRes::kWrongNum, kCmdNameFlushdb); return; @@ -570,16 +596,22 @@ void FlushdbCmd::DoInitial() { void FlushdbCmd::Do() { if (!db_) { - res_.SetRes(CmdRes::kInvalidDB); + res_.SetRes(CmdRes::kInvalidDB, "DB not found while flushdb"); + return; + } + if (db_->IsKeyScaning()) { + res_.SetRes(CmdRes::kErrOther, "The keyscan operation is executing, Try again later"); + return; + } + std::lock_guard s_prw(g_pika_rm->GetDBLock()); + std::lock_guard l_prw(db_->GetDBLock()); + flush_succeed_ = DoWithoutLock(); + if (flush_succeed_) { + res_.SetRes(CmdRes::kOk); + } else if (res_.ret() == CmdRes::kErrOther || res_.ret() == CmdRes::kInvalidParameter) { + //flushdb failed and res_ was set } else { - if (db_->IsKeyScaning()) { - res_.SetRes(CmdRes::kErrOther, "The keyscan operation is executing, Try again later"); - } else { - std::lock_guard s_prw(g_pika_rm->GetDBLock()); - std::lock_guard l_prw(db_->GetDBLock()); - FlushAllDBsWithoutLock(); - res_.SetRes(CmdRes::kOk); - } + res_.SetRes(CmdRes::kErrOther, "flushdb failed, maybe you cna try again later(check WARNING/ERROR log to know more)"); } } @@ -588,31 +620,34 @@ void FlushdbCmd::DoThroughDB() { } void FlushdbCmd::DoUpdateCache() { + if (!flush_succeed_) { + //if flushdb failed, also do not clear the cache + return; + } // clear cache if (g_pika_conf->cache_mode() != PIKA_CACHE_NONE) { g_pika_server->ClearCacheDbAsync(db_); } } -void FlushdbCmd::FlushAllDBsWithoutLock() { +bool FlushdbCmd::DoWithoutLock() { + if (!db_) { + LOG(ERROR) << db_name_ << " Flushdb, but DB not found"; + res_.SetRes(CmdRes::kErrOther, db_name_ + " Flushdb, but DB not found"); + return false; + } DBInfo p_info(db_->GetDBName()); if (g_pika_rm->GetSyncMasterDBs().find(p_info) == g_pika_rm->GetSyncMasterDBs().end()) { - res_.SetRes(CmdRes::kErrOther, "DB not found"); - return; + LOG(ERROR) << "DB not found when flushing " << db_->GetDBName(); + res_.SetRes(CmdRes::kErrOther, db_->GetDBName() + " Flushdb, but DB not found"); + return false; } - DoWithoutLock(); + return db_->FlushDBWithoutLock(); } -void FlushdbCmd::DoWithoutLock() { - if (!db_) { - LOG(INFO) << "Flushdb, but DB not found"; - } else { - if (db_name_ == "all") { - db_->FlushDBWithoutLock(); - } else { - //Floyd does not support flushdb by type - LOG(ERROR) << "cannot flushdb by type in floyd"; - } +void FlushdbCmd::DoBinlog() { + if (flush_succeed_) { + Cmd::DoBinlog(); } } diff --git a/src/pika_db.cc b/src/pika_db.cc index 0905206fcd..e1c4415d03 100644 --- a/src/pika_db.cc +++ b/src/pika_db.cc @@ -204,15 +204,23 @@ bool DB::FlushDBWithoutLock() { if (dbpath[dbpath.length() - 1] == '/') { dbpath.erase(dbpath.length() - 1); } - dbpath.append("_deleting/"); - pstd::RenameFile(db_path_, dbpath); - + std::string delete_suffix("_deleting_"); + delete_suffix.append(std::to_string(NowMicros())); + delete_suffix.append("/"); + dbpath.append(delete_suffix); + auto rename_success = pstd::RenameFile(db_path_, dbpath); storage_ = std::make_shared(g_pika_conf->db_instance_num(), g_pika_conf->default_slot_num(), g_pika_conf->classic_mode()); rocksdb::Status s = storage_->Open(g_pika_server->storage_options(), db_path_); assert(storage_); assert(s.ok()); + if (rename_success == -1) { + //the storage_->Open actually opened old RocksDB instance, so flushdb failed + LOG(WARNING) << db_name_ << " FlushDB failed due to rename old db_path_ failed"; + return false; + } LOG(INFO) << db_name_ << " Open new db success"; + g_pika_server->PurgeDir(dbpath); return true; } diff --git a/src/pika_server.cc b/src/pika_server.cc index 35efd46747..a3f03f7fd7 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -41,6 +41,7 @@ void DoPurgeDir(void* arg) { LOG(INFO) << "Delete dir: " << *path << " done"; } + PikaServer::PikaServer() : exit_(false), slow_cmd_thread_pool_flag_(g_pika_conf->slow_cmd_pool()), @@ -794,6 +795,7 @@ void PikaServer::PurgeDir(const std::string& path) { PurgeDirTaskSchedule(&DoPurgeDir, static_cast(dir_path)); } + void PikaServer::PurgeDirTaskSchedule(void (*function)(void*), void* arg) { purge_thread_.set_thread_name("PurgeDirTask"); purge_thread_.StartThread(); diff --git a/src/pika_transaction.cc b/src/pika_transaction.cc index cdf47856df..29fbac4754 100644 --- a/src/pika_transaction.cc +++ b/src/pika_transaction.cc @@ -60,7 +60,7 @@ void ExecCmd::Do() { client_conn->SetAllTxnFailed(); } else if (cmd->name() == kCmdNameFlushdb) { auto flushdb = std::dynamic_pointer_cast(cmd); - flushdb->FlushAllDBsWithoutLock(); + flushdb->DoWithoutLock(); if (cmd->res().ok()) { cmd->res().SetRes(CmdRes::kOk); }