Skip to content

Commit

Permalink
fix: fix deadlock bug (#2529)
Browse files Browse the repository at this point in the history
  • Loading branch information
luky116 authored Mar 19, 2024
1 parent 5f9ea47 commit 3d62cd6
Show file tree
Hide file tree
Showing 12 changed files with 55 additions and 67 deletions.
1 change: 1 addition & 0 deletions .github/workflows/pika.yml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ jobs:
- name: Start codis, pika master and pika slave
working-directory: ${{ github.workspace }}/build
run: |
rm -rf ../deps
chmod +x ../tests/integration/start_master_and_slave.sh
../tests/integration/start_master_and_slave.sh
chmod +x ../tests/integration/start_codis.sh
Expand Down
8 changes: 4 additions & 4 deletions codis/config/proxy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,10 @@ session_break_on_failure = false
# Slowlog-log-slower-than(us), measured from receiving a command to sending its response; 0 means always print the slow log
slowlog_log_slower_than = 100000

# quick command list e.g. get, set
quick_cmd_list = ""
# slow command list e.g. hgetall, mset
slow_cmd_list = ""
# quick command list
quick_cmd_list = "get,set"
# slow command list
slow_cmd_list = "mget, mset"

# Set metrics server (such as http://localhost:28000), proxy will report json formatted metrics to specified server in a predefined period.
metrics_report_server = ""
Expand Down
7 changes: 0 additions & 7 deletions include/pika_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,6 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
std::shared_mutex& GetDBLock() {
return dbs_rw_;
}
std::shared_mutex& GetDBLocks() {
return db_rwlock_;
}
void DBLock() {
dbs_rw_.lock();
}
Expand Down Expand Up @@ -132,9 +129,6 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
void SetCompactRangeOptions(const bool is_canceled);

std::shared_ptr<pstd::lock::LockMgr> LockMgr();
void DbRWLockWriter();
void DbRWLockReader();
void DbRWUnLock();
/*
* Cache used
*/
Expand Down Expand Up @@ -167,7 +161,6 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
std::string log_path_;
std::string bgsave_sub_path_;
pstd::Mutex key_info_protector_;
std::shared_mutex db_rwlock_;
std::atomic<bool> binlog_io_error_;
std::shared_mutex dbs_rw_;
// class may be shared, using shared_ptr would be a better choice
Expand Down
1 change: 0 additions & 1 deletion include/pika_repl_server_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ class PikaReplServerConn : public net::PbConn {
const InnerMessage::InnerRequest::TrySync& try_sync_request,
const std::shared_ptr<net::PbConn>& conn,
InnerMessage::InnerResponse::TrySync* try_sync_response);

static void HandleDBSyncRequest(void* arg);
static void HandleBinlogSyncRequest(void* arg);
static void HandleRemoveSlaveNodeRequest(void* arg);
Expand Down
18 changes: 9 additions & 9 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ void FlushdbCmd::Execute() {
if (db_->IsKeyScaning()) {
res_.SetRes(CmdRes::kErrOther, "The keyscan operation is executing, Try again later");
} else {
std::lock_guard l_prw(db_->GetDBLocks());
std::lock_guard l_prw(db_->GetDBLock());
std::lock_guard s_prw(g_pika_rm->GetDBLock());
FlushAllDBsWithoutLock();
res_.SetRes(CmdRes::kOk);
Expand Down Expand Up @@ -832,9 +832,9 @@ void ShutdownCmd::DoInitial() {
// no return
void ShutdownCmd::Do() {
DLOG(WARNING) << "handle \'shutdown\'";
db_->DbRWUnLock();
db_->DBUnlockShared();
g_pika_server->Exit();
db_->DbRWLockReader();
db_->DBLockShared();
res_.SetRes(CmdRes::kNone);
}

Expand Down Expand Up @@ -1356,11 +1356,11 @@ void InfoCmd::InfoData(std::string& info) {
}
background_errors.clear();
memtable_usage = table_reader_usage = 0;
db_item.second->DbRWLockReader();
db_item.second->DBLockShared();
db_item.second->storage()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_CUR_SIZE_ALL_MEM_TABLES, &memtable_usage);
db_item.second->storage()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_ESTIMATE_TABLE_READER_MEM, &table_reader_usage);
db_item.second->storage()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_BACKGROUND_ERRORS, &background_errors);
db_item.second->DbRWUnLock();
db_item.second->DBUnlockShared();
total_memtable_usage += memtable_usage;
total_table_reader_usage += table_reader_usage;
for (const auto& item : background_errors) {
Expand Down Expand Up @@ -1394,9 +1394,9 @@ void InfoCmd::InfoRocksDB(std::string& info) {
continue;
}
std::string rocksdb_info;
db_item.second->DbRWLockReader();
db_item.second->DBLockShared();
db_item.second->storage()->GetRocksDBInfo(rocksdb_info);
db_item.second->DbRWUnLock();
db_item.second->DBUnlockShared();
tmp_stream << rocksdb_info;
}
info.append(tmp_stream.str());
Expand Down Expand Up @@ -3141,9 +3141,9 @@ void DiskRecoveryCmd::Do() {
}
db_item.second->SetBinlogIoErrorrelieve();
background_errors_.clear();
db_item.second->DbRWLockReader();
db_item.second->DBLockShared();
db_item.second->storage()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_BACKGROUND_ERRORS, &background_errors_);
db_item.second->DbRWUnLock();
db_item.second->DBUnlockShared();
for (const auto &item: background_errors_) {
if (item.second != 0) {
rocksdb::Status s = db_item.second->storage()->GetDBByType(item.first)->Resume();
Expand Down
4 changes: 2 additions & 2 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -889,11 +889,11 @@ void Cmd::InternalProcessCommand(const HintKeys& hint_keys) {

void Cmd::DoCommand(const HintKeys& hint_keys) {
if (!IsSuspend()) {
db_->DbRWLockReader();
db_->DBLockShared();
}
DEFER {
if (!IsSuspend()) {
db_->DbRWUnLock();
db_->DBUnlockShared();
}
};
if (IsNeedCacheDo()
Expand Down
12 changes: 4 additions & 8 deletions src/pika_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ void DB::SetBinlogIoError() { return binlog_io_error_.store(true); }
void DB::SetBinlogIoErrorrelieve() { return binlog_io_error_.store(false); }
bool DB::IsBinlogIoError() { return binlog_io_error_.load(); }
std::shared_ptr<pstd::lock::LockMgr> DB::LockMgr() { return lock_mgr_; }
void DB::DbRWLockReader() { db_rwlock_.lock_shared(); }
void DB::DbRWUnLock() { db_rwlock_.unlock(); }
std::shared_ptr<PikaCache> DB::cache() const { return cache_; }
std::shared_ptr<storage::Storage> DB::storage() const { return storage_; }

Expand Down Expand Up @@ -196,8 +194,6 @@ void DB::SetCompactRangeOptions(const bool is_canceled) {
storage_->SetCompactRangeOptions(is_canceled);
}

void DB::DbRWLockWriter() { db_rwlock_.lock(); }

DisplayCacheInfo DB::GetCacheInfo() {
std::lock_guard l(key_info_protector_);
return cache_info_;
Expand Down Expand Up @@ -357,7 +353,7 @@ bool DB::InitBgsaveEngine() {
}

{
std::lock_guard lock(db_rwlock_);
std::lock_guard lock(dbs_rw_);
LogOffset bgsave_offset;
// term, index are 0
db->Logger()->GetProducerStatus(&(bgsave_offset.b_offset.filenum), &(bgsave_offset.b_offset.offset));
Expand Down Expand Up @@ -546,7 +542,7 @@ bool DB::ChangeDb(const std::string& new_path) {
tmp_path += "_bak";
pstd::DeleteDirIfExist(tmp_path);

std::lock_guard l(db_rwlock_);
std::lock_guard l(dbs_rw_);
LOG(INFO) << "DB: " << db_name_ << ", Prepare change db from: " << tmp_path;
storage_.reset();

Expand Down Expand Up @@ -577,7 +573,7 @@ void DB::ClearBgsave() {
}

bool DB::FlushSubDB(const std::string& db_name) {
std::lock_guard rwl(db_rwlock_);
std::lock_guard rwl(dbs_rw_);
return FlushSubDBWithoutLock(db_name);
}

Expand Down Expand Up @@ -631,7 +627,7 @@ void DB::ResetDisplayCacheInfo(int status) {
}

bool DB::FlushDB() {
std::lock_guard rwl(db_rwlock_);
std::lock_guard rwl(dbs_rw_);
std::lock_guard l(bgsave_protector_);
return FlushDBWithoutLock();
}
4 changes: 2 additions & 2 deletions src/pika_repl_bgworker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ void PikaReplBgWorker::HandleBGWorkerWriteDB(void* arg) {
pstd::lock::MultiRecordLock record_lock(c_ptr->GetDB()->LockMgr());
record_lock.Lock(c_ptr->current_key());
if (!c_ptr->IsSuspend()) {
c_ptr->GetDB()->DbRWLockReader();
c_ptr->GetDB()->DBLockShared();
}
if (c_ptr->IsNeedCacheDo()
&& PIKA_CACHE_NONE != g_pika_conf->cache_model()
Expand All @@ -238,7 +238,7 @@ void PikaReplBgWorker::HandleBGWorkerWriteDB(void* arg) {
c_ptr->Do();
}
if (!c_ptr->IsSuspend()) {
c_ptr->GetDB()->DbRWUnLock();
c_ptr->GetDB()->DBUnlockShared();
}
record_lock.Unlock(c_ptr->current_key());
if (g_pika_conf->slowlog_slower_than() >= 0) {
Expand Down
22 changes: 10 additions & 12 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,9 @@ bool PikaServer::IsKeyScaning() {
bool PikaServer::IsCompacting() {
std::shared_lock db_rwl(dbs_rw_);
for (const auto& db_item : dbs_) {
db_item.second->DbRWLockReader();
db_item.second->DBLockShared();
std::string task_type = db_item.second->storage()->GetCurrentTaskType();
db_item.second->DbRWUnLock();
db_item.second->DBUnlockShared();
if (strcasecmp(task_type.data(), "no") != 0) {
return true;
}
Expand Down Expand Up @@ -445,27 +445,27 @@ void PikaServer::PrepareDBTrySync() {
void PikaServer::DBSetMaxCacheStatisticKeys(uint32_t max_cache_statistic_keys) {
std::shared_lock rwl(dbs_rw_);
for (const auto& db_item : dbs_) {
db_item.second->DbRWLockReader();
db_item.second->DBLockShared();
db_item.second->storage()->SetMaxCacheStatisticKeys(max_cache_statistic_keys);
db_item.second->DbRWUnLock();
db_item.second->DBUnlockShared();
}
}

void PikaServer::DBSetSmallCompactionThreshold(uint32_t small_compaction_threshold) {
std::shared_lock rwl(dbs_rw_);
for (const auto& db_item : dbs_) {
db_item.second->DbRWLockReader();
db_item.second->DBLockShared();
db_item.second->storage()->SetSmallCompactionThreshold(small_compaction_threshold);
db_item.second->DbRWUnLock();
db_item.second->DBUnlockShared();
}
}

void PikaServer::DBSetSmallCompactionDurationThreshold(uint32_t small_compaction_duration_threshold) {
std::shared_lock rwl(dbs_rw_);
for (const auto& db_item : dbs_) {
db_item.second->DbRWLockReader();
db_item.second->DBLockShared();
db_item.second->storage()->SetSmallCompactionDurationThreshold(small_compaction_duration_threshold);
db_item.second->DbRWUnLock();
db_item.second->DBUnlockShared();
}
}

Expand Down Expand Up @@ -1381,9 +1381,7 @@ storage::Status PikaServer::RewriteStorageOptions(const storage::OptionType& opt
storage::Status s;
std::shared_lock db_rwl(dbs_rw_);
for (const auto& db_item : dbs_) {
db_item.second->DbRWLockWriter();
s = db_item.second->storage()->SetOptions(option_type, storage::ALL_DB, options_map);
db_item.second->DbRWUnLock();
if (!s.ok()) {
return s;
}
Expand Down Expand Up @@ -1562,9 +1560,9 @@ void PikaServer::DisableCompact() {
/* cancel in-progress manual compactions */
std::shared_lock rwl(dbs_rw_);
for (const auto& db_item : dbs_) {
db_item.second->DbRWLockWriter();
db_item.second->DBLock();
db_item.second->SetCompactRangeOptions(true);
db_item.second->DbRWUnLock();
db_item.second->DBUnlock();
}
}

Expand Down
20 changes: 10 additions & 10 deletions src/pika_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,22 +146,22 @@ void ExecCmd::Lock() {
g_pika_rm->DBLock();
}

std::for_each(r_lock_dbs_.begin(), r_lock_dbs_.end(), [this](auto& need_lock_slot) {
if (lock_db_keys_.count(need_lock_slot) != 0) {
pstd::lock::MultiRecordLock record_lock(need_lock_slot->LockMgr());
record_lock.Lock(lock_db_keys_[need_lock_slot]);
std::for_each(r_lock_dbs_.begin(), r_lock_dbs_.end(), [this](auto& need_lock_db) {
if (lock_db_keys_.count(need_lock_db) != 0) {
pstd::lock::MultiRecordLock record_lock(need_lock_db->LockMgr());
record_lock.Lock(lock_db_keys_[need_lock_db]);
}
need_lock_slot->DbRWLockReader();
need_lock_db->DBLockShared();
});
}

void ExecCmd::Unlock() {
std::for_each(r_lock_dbs_.begin(), r_lock_dbs_.end(), [this](auto& need_lock_slot) {
if (lock_db_keys_.count(need_lock_slot) != 0) {
pstd::lock::MultiRecordLock record_lock(need_lock_slot->LockMgr());
record_lock.Unlock(lock_db_keys_[need_lock_slot]);
std::for_each(r_lock_dbs_.begin(), r_lock_dbs_.end(), [this](auto& need_lock_db) {
if (lock_db_keys_.count(need_lock_db) != 0) {
pstd::lock::MultiRecordLock record_lock(need_lock_db->LockMgr());
record_lock.Unlock(lock_db_keys_[need_lock_db]);
}
need_lock_slot->DbRWUnLock();
need_lock_db->DBUnlockShared();
});
if (is_lock_rm_dbs_) {
g_pika_rm->DBUnlock();
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var (
func TestPikaWithCache(t *testing.T) {
GlobalBefore = func(ctx context.Context, client *redis.Client) {
Expect(client.SlaveOf(ctx, "NO", "ONE").Err()).NotTo(HaveOccurred())
Expect(client.FlushAll(ctx).Err()).NotTo(HaveOccurred())
//Expect(client.FlushAll(ctx).Err()).NotTo(HaveOccurred())
Expect(client.ConfigSet(ctx, "cache-model", "1").Err()).NotTo(HaveOccurred())
}
RegisterFailHandler(Fail)
Expand All @@ -26,7 +26,7 @@ func TestPikaWithCache(t *testing.T) {
func TestPikaWithoutCache(t *testing.T) {
GlobalBefore = func(ctx context.Context, client *redis.Client) {
Expect(client.SlaveOf(ctx, "NO", "ONE").Err()).NotTo(HaveOccurred())
Expect(client.FlushAll(ctx).Err()).NotTo(HaveOccurred())
//Expect(client.FlushAll(ctx).Err()).NotTo(HaveOccurred())
Expect(client.ConfigSet(ctx, "cache-model", "0").Err()).NotTo(HaveOccurred())
}
RegisterFailHandler(Fail)
Expand Down
21 changes: 11 additions & 10 deletions tests/integration/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ func cleanEnv(ctx context.Context, clientMaster, clientSlave *redis.Client) {
r := clientSlave.Do(ctx, "slaveof", "no", "one")
Expect(r.Err()).NotTo(HaveOccurred())
Expect(r.Val()).To(Equal("OK"))
r = clientSlave.Do(ctx, "clearreplicationid")
r = clientMaster.Do(ctx, "clearreplicationid")
Expect(clientSlave.Do(ctx, "clearreplicationid").Err()).NotTo(HaveOccurred())
Expect(clientMaster.Do(ctx, "clearreplicationid").Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
}

Expand Down Expand Up @@ -373,8 +373,8 @@ var _ = Describe("should replication ", func() {
BeforeEach(func() {
clientMaster = redis.NewClient(PikaOption(MASTERADDR))
clientSlave = redis.NewClient(PikaOption(SLAVEADDR))
Expect(clientMaster.FlushDB(ctx).Err()).NotTo(HaveOccurred())
Expect(clientSlave.FlushDB(ctx).Err()).NotTo(HaveOccurred())
//Expect(clientMaster.FlushDB(ctx).Err()).NotTo(HaveOccurred())
//Expect(clientSlave.FlushDB(ctx).Err()).NotTo(HaveOccurred())
cleanEnv(ctx, clientMaster, clientSlave)
if GlobalBefore != nil {
GlobalBefore(ctx, clientMaster)
Expand All @@ -383,8 +383,8 @@ var _ = Describe("should replication ", func() {
})
AfterEach(func() {
cleanEnv(ctx, clientMaster, clientSlave)
Expect(clientMaster.FlushDB(ctx).Err()).NotTo(HaveOccurred())
Expect(clientSlave.FlushDB(ctx).Err()).NotTo(HaveOccurred())
//Expect(clientMaster.FlushDB(ctx).Err()).NotTo(HaveOccurred())
//Expect(clientSlave.FlushDB(ctx).Err()).NotTo(HaveOccurred())
Expect(clientSlave.Close()).NotTo(HaveOccurred())
Expect(clientMaster.Close()).NotTo(HaveOccurred())
log.Println("Replication test case done")
Expand Down Expand Up @@ -441,7 +441,8 @@ var _ = Describe("should replication ", func() {
set1 := clientMaster.Set(ctx, "a", "b", 0)
Expect(set1.Err()).NotTo(HaveOccurred())
Expect(set1.Val()).To(Equal("OK"))
Expect(clientMaster.Del(ctx, "x").Err()).NotTo(HaveOccurred())
time.Sleep(3 * time.Second)
Expect(clientMaster.FlushDB(ctx).Err()).NotTo(HaveOccurred())
//TODO Use del instead of flushdb after the flushDB problem is fixed
Eventually(func() error {
return clientMaster.Get(ctx, "x").Err()
Expand All @@ -456,9 +457,9 @@ var _ = Describe("should replication ", func() {
Expect(clientMaster.Del(ctx, "blist0", "blist1", "blist").Err()).NotTo(HaveOccurred())
execute(&ctx, clientMaster, 4, rpoplpushThread)
// TODO, the problem was not reproduced locally, record an issue first: https://github.com/OpenAtomFoundation/pika/issues/2492
//for i := int64(0); i < clientMaster.LLen(ctx, "blist").Val(); i++ {
// Expect(clientMaster.LIndex(ctx, "blist", i)).To(Equal(clientSlave.LIndex(ctx, "blist", i)))
//}
for i := int64(0); i < clientMaster.LLen(ctx, "blist").Val(); i++ {
Expect(clientMaster.LIndex(ctx, "blist", i)).To(Equal(clientSlave.LIndex(ctx, "blist", i)))
}
Expect(clientMaster.Del(ctx, "blist0", "blist1", "blist").Err()).NotTo(HaveOccurred())
log.Println("rpoplpush test success")

Expand Down

0 comments on commit 3d62cd6

Please sign in to comment.