fix: let pika slave support Redis transaction (#2441)
* fix: pika slave support multi/exec cmd

#2422
Signed-off-by: HappyUncle <[email protected]>

* fix: pika slave support watch cmd

#2422
Signed-off-by: HappyUncle <[email protected]>

* test: add replication-test-go, let slave support watch/unwatch/multi/exec/discard

Signed-off-by: HappyUncle <[email protected]>

---------

Signed-off-by: HappyUncle <[email protected]>
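
For context, the client-visible effect of this change is that a read-only MULTI/EXEC block can now be run against a Pika replica instead of being rejected outright. A minimal sketch with go-redis (the client used by the integration test below; the v9 import path, replica address, and key names are assumptions for illustration):

package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	// Address of the Pika replica is an assumption for this sketch.
	slave := redis.NewClient(&redis.Options{Addr: "127.0.0.1:9231"})

	// Queue two reads inside MULTI on the replica; EXEC now executes them
	// instead of failing the whole transaction on the read-only check.
	pipe := slave.TxPipeline()
	get1 := pipe.Get(ctx, "txkey1")
	get2 := pipe.Get(ctx, "txkey2")
	if _, err := pipe.Exec(ctx); err != nil && err != redis.Nil {
		panic(err) // redis.Nil only means one of the keys was missing
	}
	fmt.Println(get1.Val(), get2.Val()) // values replicated from the master
}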
happy-v587 authored and brother-jin committed Aug 14, 2024
1 parent 84b234f commit 0aaa704
Showing 6 changed files with 166 additions and 47 deletions.
1 change: 1 addition & 0 deletions include/pika_dispatch_thread.h
@@ -23,6 +23,7 @@ class PikaDispatchThread {
void SetQueueLimit(int queue_limit) { thread_rep_->SetQueueLimit(queue_limit); }

void UnAuthUserAndKillClient(const std::set<std::string> &users, const std::shared_ptr<User>& defaultUser);
net::ServerThread* server_thread() { return thread_rep_; }

private:
class ClientConnFactory : public net::ConnFactory {
3 changes: 3 additions & 0 deletions include/pika_server.h
@@ -106,6 +106,9 @@ class PikaServer : public pstd::noncopyable {
void SetDispatchQueueLimit(int queue_limit);
void SetSlowCmdThreadPoolFlag(bool flag);
storage::StorageOptions storage_options();
std::unique_ptr<PikaDispatchThread>& pika_dispatch_thread() {
return pika_dispatch_thread_;
}

/*
* DB use
9 changes: 7 additions & 2 deletions src/pika_client_conn.cc
@@ -116,6 +116,11 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
}

if (IsInTxn() && opt != kCmdNameExec && opt != kCmdNameWatch && opt != kCmdNameDiscard && opt != kCmdNameMulti) {
if (c_ptr->is_write() && g_pika_server->readonly(current_db_)) {
SetTxnInitFailState(true);
c_ptr->res().SetRes(CmdRes::kErrOther, "READONLY You can't write against a read only replica.");
return c_ptr;
}
PushCmdToQue(c_ptr);
c_ptr->res().SetRes(CmdRes::kTxnQueued);
return c_ptr;
@@ -160,8 +165,8 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
c_ptr->res().SetRes(CmdRes::kErrOther, "Internal ERROR");
return c_ptr;
}
if (g_pika_server->readonly(current_db_)) {
c_ptr->res().SetRes(CmdRes::kErrOther, "Server in read-only");
if (g_pika_server->readonly(current_db_) && opt != kCmdNameExec) {
c_ptr->res().SetRes(CmdRes::kErrOther, "READONLY You can't write against a read only replica.");
return c_ptr;
}
} else if (c_ptr->is_read() && c_ptr->flag_ == 0) {
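
The two checks above add up to the following client-visible behavior: a write queued inside MULTI on a read-only replica is rejected immediately and poisons the transaction, while EXEC itself is exempted from the blanket read-only rejection so that a purely read-only transaction can still commit. A minimal sketch, assuming go-redis v9 and a hypothetical replica address; the error strings match what the integration test below asserts:

package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	slave := redis.NewClient(&redis.Options{Addr: "127.0.0.1:9231"}) // replica address assumed

	// Start a transaction on the replica and try to queue a write.
	if err := slave.Do(ctx, "MULTI").Err(); err != nil {
		panic(err)
	}
	setErr := slave.Set(ctx, "txkey2", "v", 0).Err()
	fmt.Println(setErr) // ERR READONLY You can't write against a read only replica.

	// EXEC is allowed through the read-only check, but the transaction was
	// poisoned by the rejected write, so it is discarded.
	execErr := slave.Do(ctx, "EXEC").Err()
	fmt.Println(execErr) // EXECABORT Transaction discarded because of previous errors.
}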
18 changes: 18 additions & 0 deletions src/pika_repl_bgworker.cc
@@ -244,6 +244,24 @@ void PikaReplBgWorker::WriteDBInSyncWay(const std::shared_ptr<Cmd>& c_ptr) {
if (!c_ptr->IsSuspend()) {
c_ptr->GetDB()->DBUnlockShared();
}

if (c_ptr->res().ok()
&& c_ptr->is_write()
&& c_ptr->name() != kCmdNameFlushdb
&& c_ptr->name() != kCmdNameFlushall
&& c_ptr->name() != kCmdNameExec) {
auto table_keys = c_ptr->current_key();
for (auto& key : table_keys) {
key = c_ptr->db_name().append(key);
}
auto dispatcher = dynamic_cast<net::DispatchThread*>(g_pika_server->pika_dispatch_thread()->server_thread());
auto involved_conns = dispatcher->GetInvolvedTxn(table_keys);
for (auto& conn : involved_conns) {
auto c = std::dynamic_pointer_cast<PikaClientConn>(conn);
c->SetTxnWatchFailState(true);
}
}

record_lock.Unlock(c_ptr->current_key());
if (g_pika_conf->slowlog_slower_than() >= 0) {
auto start_time = static_cast<int32_t>(start_us / 1000000);
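
With the hook above, a write replicated from the master now marks any replica-side connection that is WATCHing one of the touched keys, so the watcher's EXEC aborts just as it would on a single Redis node. A minimal sketch of the scenario, assuming go-redis v9 and hypothetical master/replica addresses; the sleep only gives replication time to apply:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	master := redis.NewClient(&redis.Options{Addr: "127.0.0.1:9221"}) // addresses assumed
	slave := redis.NewClient(&redis.Options{Addr: "127.0.0.1:9231"})

	// Watch a key on the replica and open a transaction.
	slave.Do(ctx, "WATCH", "txkey1")
	slave.Do(ctx, "MULTI")
	slave.Get(ctx, "txkey1") // QUEUED

	// The master writes the watched key; the write replicates to the slave
	// and invalidates the replica-side WATCH.
	master.Set(ctx, "txkey1", "changed", 0)
	time.Sleep(time.Second)

	// EXEC on the replica returns an empty (aborted) result.
	res := slave.Do(ctx, "EXEC")
	fmt.Println(res.Val()) // [] — the queued GET did not run
}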
6 changes: 3 additions & 3 deletions src/pika_transaction.cc
@@ -25,7 +25,7 @@ void MultiCmd::Do() {
return;
}
if (client_conn->IsInTxn()) {
res_.SetRes(CmdRes::kErrOther, "ERR MULTI calls can not be nested");
res_.SetRes(CmdRes::kErrOther, "MULTI calls can not be nested");
return;
}
client_conn->SetTxnStartState(true);
@@ -246,7 +246,7 @@ void WatchCmd::Do() {
return;
}
if (client_conn->IsInTxn()) {
res_.SetRes(CmdRes::CmdRet::kErrOther, "ERR WATCH inside MULTI is not allowed");
res_.SetRes(CmdRes::CmdRet::kErrOther, "WATCH inside MULTI is not allowed");
return;
}
client_conn->AddKeysToWatch(db_keys_);
@@ -305,7 +305,7 @@ void DiscardCmd::Do() {
return;
}
if (!client_conn->IsInTxn()) {
res_.SetRes(CmdRes::kErrOther, "ERR DISCARD without MULTI");
res_.SetRes(CmdRes::kErrOther, "DISCARD without MULTI");
return;
}
client_conn->ExitTxn();
176 changes: 134 additions & 42 deletions tests/integration/replication_test.go
@@ -661,48 +661,140 @@ var _ = Describe("should replication ", func() {
}
err = clientMaster.Del(ctx, lists...)

//The test below is related with issue: https://github.com/OpenAtomFoundation/pika/issues/2643
r1 := clientMaster.Do(ctx, "MULTI")
Expect(r1.Err()).NotTo(HaveOccurred())

setkey1 := clientMaster.Set(ctx, "Tnxkey1", "Tnxvalue1", 0)
Expect(setkey1.Err()).NotTo(HaveOccurred())
Expect(setkey1.Val()).To(Equal("QUEUED"))

setkey2 := clientMaster.Set(ctx, "Tnxkey2", "Tnxvalue2", 0)
Expect(setkey2.Err()).NotTo(HaveOccurred())
Expect(setkey2.Val()).To(Equal("QUEUED"))

r2 := clientMaster.Do(ctx, "EXEC")
Expect(r2.Err()).NotTo(HaveOccurred())
Expect(r2.Val()).To(Equal([]interface{}{"OK", "OK"}))

time.Sleep(3 * time.Second)

getkey1 := clientSlave.Get(ctx, "Tnxkey1")
Expect(getkey1.Err()).NotTo(HaveOccurred())
Expect(getkey1.Val()).To(Equal("Tnxvalue1"))

getkey2 := clientSlave.Get(ctx, "Tnxkey2")
Expect(getkey2.Err()).NotTo(HaveOccurred())
Expect(getkey2.Val()).To(Equal("Tnxvalue2"))

ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
loopCount := 0

for loopCount < 10 {
select {
case <-ticker.C:
infoResExec := clientSlave.Info(ctx, "replication")
Expect(infoResExec.Err()).NotTo(HaveOccurred())
Expect(infoResExec.Val()).To(ContainSubstring("master_link_status:up"))
loopCount++
if loopCount >= 10 {
ticker.Stop()
}
}
}
// transaction replication test
log.Println("transaction replication test start")
clientMaster.Set(ctx, "txkey1", "txvalue1", 0)
time.Sleep(time.Second)

// transaction: multi/get/get/exec
r := clientSlave.Do(ctx, "MULTI")
Expect(r.Err()).NotTo(HaveOccurred())
get := clientSlave.Get(ctx, "txkey1")
Expect(get.Err()).NotTo(HaveOccurred())
Expect(get.Val()).To(Equal("QUEUED"))
get = clientSlave.Get(ctx, "txkey2")
Expect(get.Err()).NotTo(HaveOccurred())
Expect(get.Val()).To(Equal("QUEUED"))
r = clientSlave.Do(ctx, "EXEC")
Expect(r.Err()).NotTo(HaveOccurred())
Expect(r.Val()).To(Equal([]interface{}{"txvalue1", nil}))

// transaction: multi/get/get/exec
pipeline := clientSlave.TxPipeline()
pipeline.Get(ctx, "txkey1")
pipeline.Get(ctx, "txkey2")
result, perr := pipeline.Exec(ctx)
Expect(perr).To(Equal(redis.Nil))
AssertEqualRedisString("txvalue1", result[0])
Expect(result[1].Err()).To(Equal(redis.Nil))

// transaction: multi/get/set/exec
r = clientSlave.Do(ctx, "MULTI")
Expect(r.Err()).NotTo(HaveOccurred())
get = clientSlave.Get(ctx, "txkey1")
Expect(get.Err()).NotTo(HaveOccurred())
Expect(get.Val()).To(Equal("QUEUED"))
set = clientSlave.Set(ctx, "txkey2", "txvalue2", 0)
Expect(set.Err().Error()).To(Equal("ERR READONLY You can't write against a read only replica."))
r = clientSlave.Do(ctx, "EXEC")
Expect(r.Err().Error()).To(Equal("EXECABORT Transaction discarded because of previous errors."))

// transaction: multi/get/set/exec
pipeline = clientSlave.TxPipeline()
pipeline.Get(ctx, "txkey1")
pipeline.Set(ctx, "txkey2", "txvalue2", 0)
result, perr = pipeline.Exec(ctx)
Expect(perr.Error()).To(Equal("EXECABORT Transaction discarded because of previous errors."))

// transaction: watch/multi/master-set/exec
r = clientSlave.Do(ctx, "WATCH", "txkey1")
Expect(r.Err()).NotTo(HaveOccurred())
r = clientSlave.Do(ctx, "MULTI")
Expect(r.Err()).NotTo(HaveOccurred())
set = clientMaster.Set(ctx, "txkey1", "txvalue11", 0)
Expect(set.Err()).NotTo(HaveOccurred())
Expect(set.Val()).To(Equal("OK"))
r = clientSlave.Do(ctx, "EXEC")
Expect(r.Err()).NotTo(HaveOccurred())
Expect(r.Val()).To(Equal([]interface{}{}))

// transaction: multi/get/discard
r = clientSlave.Do(ctx, "MULTI")
Expect(r.Err()).NotTo(HaveOccurred())
get = clientSlave.Get(ctx, "txkey1")
Expect(get.Err()).NotTo(HaveOccurred())
Expect(get.Val()).To(Equal("QUEUED"))
r = clientSlave.Do(ctx, "DISCARD")
Expect(r.Err()).NotTo(HaveOccurred())
Expect(r.Val()).To(Equal("OK"))

// transaction: watch/unwatch
r = clientSlave.Do(ctx, "WATCH", "txkey1")
Expect(r.Err()).NotTo(HaveOccurred())
Expect(r.Val()).To(Equal("OK"))
r = clientSlave.Do(ctx, "UNWATCH")
Expect(r.Err()).NotTo(HaveOccurred())
Expect(r.Val()).To(Equal("OK"))

// transaction: times-multi/get/exec
r = clientSlave.Do(ctx, "MULTI")
Expect(r.Err()).NotTo(HaveOccurred())
r = clientSlave.Do(ctx, "MULTI")
Expect(r.Err()).To(MatchError("ERR MULTI calls can not be nested"))
get = clientSlave.Get(ctx, "txkey1")
Expect(get.Err()).NotTo(HaveOccurred())
Expect(get.Val()).To(Equal("QUEUED"))
r = clientSlave.Do(ctx, "EXEC")
Expect(r.Err()).NotTo(HaveOccurred())
Expect(r.Val()).To(Equal([]interface{}{"txvalue11"}))

// transaction: exec without multi
r = clientSlave.Do(ctx, "EXEC")
Expect(r.Err()).To(MatchError("ERR EXEC without MULTI"))

err = clientMaster.Del(ctx, "txkey1")
//The test below is related with issue: https://github.com/OpenAtomFoundation/pika/issues/2643
r1 := clientMaster.Do(ctx, "MULTI")
Expect(r1.Err()).NotTo(HaveOccurred())

setkey1 := clientMaster.Set(ctx, "Tnxkey1", "Tnxvalue1", 0)
Expect(setkey1.Err()).NotTo(HaveOccurred())
Expect(setkey1.Val()).To(Equal("QUEUED"))

setkey2 := clientMaster.Set(ctx, "Tnxkey2", "Tnxvalue2", 0)
Expect(setkey2.Err()).NotTo(HaveOccurred())
Expect(setkey2.Val()).To(Equal("QUEUED"))

r2 := clientMaster.Do(ctx, "EXEC")
Expect(r2.Err()).NotTo(HaveOccurred())
Expect(r2.Val()).To(Equal([]interface{}{"OK", "OK"}))

time.Sleep(3 * time.Second)

getkey1 := clientSlave.Get(ctx, "Tnxkey1")
Expect(getkey1.Err()).NotTo(HaveOccurred())
Expect(getkey1.Val()).To(Equal("Tnxvalue1"))

getkey2 := clientSlave.Get(ctx, "Tnxkey2")
Expect(getkey2.Err()).NotTo(HaveOccurred())
Expect(getkey2.Val()).To(Equal("Tnxvalue2"))

ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
loopCount := 0

for loopCount < 10 {
select {
case <-ticker.C:
infoResExec := clientSlave.Info(ctx, "replication")
Expect(infoResExec.Err()).NotTo(HaveOccurred())
Expect(infoResExec.Val()).To(ContainSubstring("master_link_status:up"))
loopCount++
if loopCount >= 10 {
ticker.Stop()
}
}
}
log.Println("master-slave replication test success")
})
It("should simulate the master node setex and incr operation", func() {
