diff --git a/include/pika_dispatch_thread.h b/include/pika_dispatch_thread.h
index 68157e82df..01a6fe96b0 100644
--- a/include/pika_dispatch_thread.h
+++ b/include/pika_dispatch_thread.h
@@ -23,6 +23,7 @@ class PikaDispatchThread {
   void SetQueueLimit(int queue_limit) { thread_rep_->SetQueueLimit(queue_limit); }
   void UnAuthUserAndKillClient(const std::set<std::string>& users, const std::shared_ptr<User>& defaultUser);
+  net::ServerThread* server_thread() { return thread_rep_; }
 
  private:
   class ClientConnFactory : public net::ConnFactory {
diff --git a/include/pika_server.h b/include/pika_server.h
index a3a4b8a60c..76d74db185 100644
--- a/include/pika_server.h
+++ b/include/pika_server.h
@@ -106,6 +106,9 @@ class PikaServer : public pstd::noncopyable {
   void SetDispatchQueueLimit(int queue_limit);
   void SetSlowCmdThreadPoolFlag(bool flag);
   storage::StorageOptions storage_options();
+  std::unique_ptr<PikaDispatchThread>& pika_dispatch_thread() {
+    return pika_dispatch_thread_;
+  }
 
   /*
    * DB use
diff --git a/src/pika_client_conn.cc b/src/pika_client_conn.cc
index 40aad17b4e..85e740de1a 100644
--- a/src/pika_client_conn.cc
+++ b/src/pika_client_conn.cc
@@ -116,6 +116,11 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
   }
 
   if (IsInTxn() && opt != kCmdNameExec && opt != kCmdNameWatch && opt != kCmdNameDiscard && opt != kCmdNameMulti) {
+    if (c_ptr->is_write() && g_pika_server->readonly(current_db_)) {
+      SetTxnInitFailState(true);
+      c_ptr->res().SetRes(CmdRes::kErrOther, "READONLY You can't write against a read only replica.");
+      return c_ptr;
+    }
     PushCmdToQue(c_ptr);
     c_ptr->res().SetRes(CmdRes::kTxnQueued);
     return c_ptr;
@@ -160,8 +165,8 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
       c_ptr->res().SetRes(CmdRes::kErrOther, "Internal ERROR");
       return c_ptr;
     }
-    if (g_pika_server->readonly(current_db_)) {
-      c_ptr->res().SetRes(CmdRes::kErrOther, "Server in read-only");
+    if (g_pika_server->readonly(current_db_) && opt != kCmdNameExec) {
+      c_ptr->res().SetRes(CmdRes::kErrOther, "READONLY You can't write against a read only replica.");
       return c_ptr;
     }
   } else if (c_ptr->is_read() && c_ptr->flag_ == 0) {
diff --git a/src/pika_repl_bgworker.cc b/src/pika_repl_bgworker.cc
index 90f1849602..52afad4ba1 100644
--- a/src/pika_repl_bgworker.cc
+++ b/src/pika_repl_bgworker.cc
@@ -244,6 +244,24 @@ void PikaReplBgWorker::WriteDBInSyncWay(const std::shared_ptr<Cmd>& c_ptr) {
   if (!c_ptr->IsSuspend()) {
     c_ptr->GetDB()->DBUnlockShared();
   }
+
+  if (c_ptr->res().ok()
+      && c_ptr->is_write()
+      && c_ptr->name() != kCmdNameFlushdb
+      && c_ptr->name() != kCmdNameFlushall
+      && c_ptr->name() != kCmdNameExec) {
+    auto table_keys = c_ptr->current_key();
+    for (auto& key : table_keys) {
+      key = c_ptr->db_name().append(key);
+    }
+    auto dispatcher = dynamic_cast<net::DispatchThread*>(g_pika_server->pika_dispatch_thread()->server_thread());
+    auto involved_conns = dispatcher->GetInvolvedTxn(table_keys);
+    for (auto& conn : involved_conns) {
+      auto c = std::dynamic_pointer_cast<PikaClientConn>(conn);
+      c->SetTxnWatchFailState(true);
+    }
+  }
+
   record_lock.Unlock(c_ptr->current_key());
   if (g_pika_conf->slowlog_slower_than() >= 0) {
     auto start_time = static_cast<int32_t>(start_us / 1000000);
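The pika_repl_bgworker.cc hunk above is the core of the fix: after a replicated write is applied on the replica, the worker prefixes each touched key with its DB name, asks the dispatch thread which client connections have WATCHed any of those keys, and flags their pending transactions as failed, so a later EXEC on the replica aborts the same way Redis does when a watched key changes. A minimal sketch of the registry idea — the names `WatchRegistry` and `Conn` are illustrative stand-ins, not pika's actual net::DispatchThread internals:

```cpp
// Sketch: a watch registry mapping "db-prefixed key" -> watching connections.
#include <map>
#include <memory>
#include <set>
#include <string>
#include <vector>

struct Conn {
  bool txn_watch_failed = false;
  void SetTxnWatchFailState(bool v) { txn_watch_failed = v; }
};

class WatchRegistry {
 public:
  // Called when a client issues WATCH <key>; db_key is db_name + key.
  void Watch(const std::string& db_key, const std::shared_ptr<Conn>& c) {
    watchers_[db_key].insert(c);
  }
  // Mirrors the role of GetInvolvedTxn(): collect every connection
  // watching any of the keys a replicated write just dirtied.
  std::vector<std::shared_ptr<Conn>> GetInvolved(const std::vector<std::string>& dirty_keys) {
    std::set<std::shared_ptr<Conn>> out;
    for (const auto& k : dirty_keys) {
      auto it = watchers_.find(k);
      if (it != watchers_.end()) out.insert(it->second.begin(), it->second.end());
    }
    return {out.begin(), out.end()};
  }

 private:
  std::map<std::string, std::set<std::shared_ptr<Conn>>> watchers_;
};
```

Prefixing with `c_ptr->db_name()` keeps the watches per-database: a replicated write to key `k` in db1 must not invalidate a client watching `k` in db0.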
diff --git a/src/pika_transaction.cc b/src/pika_transaction.cc
index ceadf92d21..85381dcf8d 100644
--- a/src/pika_transaction.cc
+++ b/src/pika_transaction.cc
@@ -25,7 +25,7 @@ void MultiCmd::Do() {
     return;
   }
   if (client_conn->IsInTxn()) {
-    res_.SetRes(CmdRes::kErrOther, "ERR MULTI calls can not be nested");
+    res_.SetRes(CmdRes::kErrOther, "MULTI calls can not be nested");
     return;
   }
   client_conn->SetTxnStartState(true);
@@ -246,7 +246,7 @@ void WatchCmd::Do() {
     return;
   }
   if (client_conn->IsInTxn()) {
-    res_.SetRes(CmdRes::CmdRet::kErrOther, "ERR WATCH inside MULTI is not allowed");
+    res_.SetRes(CmdRes::CmdRet::kErrOther, "WATCH inside MULTI is not allowed");
     return;
   }
   client_conn->AddKeysToWatch(db_keys_);
@@ -305,7 +305,7 @@ void DiscardCmd::Do() {
     return;
   }
   if (!client_conn->IsInTxn()) {
-    res_.SetRes(CmdRes::kErrOther, "ERR DISCARD without MULTI");
+    res_.SetRes(CmdRes::kErrOther, "DISCARD without MULTI");
     return;
   }
   client_conn->ExitTxn();
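The pika_transaction.cc hunks only drop a hard-coded "ERR " from the message bodies. `CmdRes::SetRes(kErrOther, ...)` already prepends the error prefix when serializing the RESP reply, so the old strings went out on the wire double-prefixed (e.g. `-ERR ERR MULTI calls can not be nested`). A simplified model of that serialization step — the assumed behavior of the kErrOther case, reduced to a single function:

```cpp
#include <string>

// Simplified model of CmdRes::SetRes(kErrOther, msg): the "-ERR " prefix is
// added centrally, so the message passed in must not carry its own "ERR ".
std::string EncodeErrOther(const std::string& msg) {
  return "-ERR " + msg + "\r\n";
}

// Before the patch: EncodeErrOther("ERR MULTI calls can not be nested")
//   -> "-ERR ERR MULTI calls can not be nested\r\n"   (doubled prefix)
// After the patch:  EncodeErrOther("MULTI calls can not be nested")
//   -> "-ERR MULTI calls can not be nested\r\n"
```

This is also why the Go assertions in the test diff below match `ERR MULTI calls can not be nested` exactly: go-redis surfaces the reply minus the leading `-` and the trailing CRLF.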
replica.")) + r = clientSlave.Do(ctx, "EXEC") + Expect(r.Err().Error()).To(Equal("EXECABORT Transaction discarded because of previous errors.")) + + // transaction: multi/get/set/exec + pipeline = clientSlave.TxPipeline() + pipeline.Get(ctx, "txkey1") + pipeline.Set(ctx, "txkey2", "txvalue2", 0) + result, perr = pipeline.Exec(ctx) + Expect(perr.Error()).To(Equal("EXECABORT Transaction discarded because of previous errors.")) + + // transaction: watch/multi/master-set/exec + r = clientSlave.Do(ctx, "WATCH", "txkey1") + Expect(r.Err()).NotTo(HaveOccurred()) + r = clientSlave.Do(ctx, "MULTI") + Expect(r.Err()).NotTo(HaveOccurred()) + set = clientMaster.Set(ctx, "txkey1", "txvalue11", 0) + Expect(set.Err()).NotTo(HaveOccurred()) + Expect(set.Val()).To(Equal("OK")) + r = clientSlave.Do(ctx, "EXEC") + Expect(r.Err()).NotTo(HaveOccurred()) + Expect(r.Val()).To(Equal([]interface{}{})) + + // transaction: multi/get/discard + r = clientSlave.Do(ctx, "MULTI") + Expect(r.Err()).NotTo(HaveOccurred()) + get = clientSlave.Get(ctx, "txkey1") + Expect(get.Err()).NotTo(HaveOccurred()) + Expect(get.Val()).To(Equal("QUEUED")) + r = clientSlave.Do(ctx, "DISCARD") + Expect(r.Err()).NotTo(HaveOccurred()) + Expect(r.Val()).To(Equal("OK")) + + // transaction: watch/unwatch + r = clientSlave.Do(ctx, "WATCH", "txkey1") + Expect(r.Err()).NotTo(HaveOccurred()) + Expect(r.Val()).To(Equal("OK")) + r = clientSlave.Do(ctx, "UNWATCH") + Expect(r.Err()).NotTo(HaveOccurred()) + Expect(r.Val()).To(Equal("OK")) + + // transaction: times-multi/get/exec + r = clientSlave.Do(ctx, "MULTI") + Expect(r.Err()).NotTo(HaveOccurred()) + r = clientSlave.Do(ctx, "MULTI") + Expect(r.Err()).To(MatchError("ERR MULTI calls can not be nested")) + get = clientSlave.Get(ctx, "txkey1") + Expect(get.Err()).NotTo(HaveOccurred()) + Expect(get.Val()).To(Equal("QUEUED")) + r = clientSlave.Do(ctx, "EXEC") + Expect(r.Err()).NotTo(HaveOccurred()) + Expect(r.Val()).To(Equal([]interface{}{"txvalue11"})) + + // transaction: exec without multi + r = clientSlave.Do(ctx, "EXEC") + Expect(r.Err()).To(MatchError("ERR EXEC without MULTI")) + + err = clientMaster.Del(ctx, "txkey1") + /The test below is related with issue: https://github.com/OpenAtomFoundation/pika/issues/2643 + r1 := clientMaster.Do(ctx, "MULTI") + Expect(r1.Err()).NotTo(HaveOccurred()) + + setkey1 := clientMaster.Set(ctx, "Tnxkey1", "Tnxvalue1", 0) + Expect(setkey1.Err()).NotTo(HaveOccurred()) + Expect(setkey1.Val()).To(Equal("QUEUED")) + + setkey2 := clientMaster.Set(ctx, "Tnxkey2", "Tnxvalue2", 0) + Expect(setkey2.Err()).NotTo(HaveOccurred()) + Expect(setkey2.Val()).To(Equal("QUEUED")) + + r2 := clientMaster.Do(ctx, "EXEC") + Expect(r2.Err()).NotTo(HaveOccurred()) + Expect(r2.Val()).To(Equal([]interface{}{"OK", "OK"})) + + time.Sleep(3 * time.Second) + + getkey1 := clientSlave.Get(ctx, "Tnxkey1") + Expect(getkey1.Err()).NotTo(HaveOccurred()) + Expect(getkey1.Val()).To(Equal("Tnxvalue1")) + + getkey2 := clientSlave.Get(ctx, "Tnxkey2") + Expect(getkey2.Err()).NotTo(HaveOccurred()) + Expect(getkey2.Val()).To(Equal("Tnxvalue2")) + + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + loopCount := 0 + + for loopCount < 10 { + select { + case <-ticker.C: + infoResExec := clientSlave.Info(ctx, "replication") + Expect(infoResExec.Err()).NotTo(HaveOccurred()) + Expect(infoResExec.Val()).To(ContainSubstring("master_link_status:up")) + loopCount++ + if loopCount >= 10 { + ticker.Stop() + } + } + } log.Println("master-slave replication test success") }) It("should simulate the master 
 
 	It("should simulate the master node setex and incr operation", func() {
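Taken together, the pika_client_conn.cc checks give a read-only replica the transaction semantics these tests assert: a write queued under MULTI fails immediately and poisons the transaction, EXEC then reports EXECABORT, while EXEC itself is exempt from the read-only gate so a read-only transaction can still run. A condensed, hypothetical view of that decision flow — not pika's literal control flow in DoCmd:

```cpp
// Sketch of the ordering of the checks added in PikaClientConn::DoCmd.
enum class TxnVerdict { kQueue, kReadonlyError, kRun };

TxnVerdict Classify(bool in_txn, bool is_write, bool db_readonly, bool is_exec) {
  if (in_txn) {
    // Inside MULTI: refuse to even queue a write on a read-only replica and
    // mark the txn init-failed, so a later EXEC answers EXECABORT.
    if (is_write && db_readonly) return TxnVerdict::kReadonlyError;
    return TxnVerdict::kQueue;  // reads queue normally -> +QUEUED
  }
  // Outside MULTI: writes are rejected on a read-only replica, except EXEC,
  // which must run so it can report the queued (or aborted) transaction.
  if (is_write && db_readonly && !is_exec) return TxnVerdict::kReadonlyError;
  return TxnVerdict::kRun;
}
```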