Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: make pika compactible with redis-sentinel #2854

Merged
6 changes: 4 additions & 2 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,10 @@ class ClientCmd : public Cmd {
Cmd* Clone() override { return new ClientCmd(*this); }

private:
std::string operation_, info_;
const static std::string KILLTYPE_NORMAL;
const static std::string KILLTYPE_PUBSUB;

std::string operation_, info_, kill_type_;
void DoInitial() override;
};

Expand All @@ -260,7 +263,6 @@ class InfoCmd : public Cmd {
kInfoCommandStats,
kInfoCache
};

InfoCmd(const std::string& name, int arity, uint32_t flag) : Cmd(name, arity, flag) {}
void Do() override;
void Split(const HintKeys& hint_keys) override {};
Expand Down
2 changes: 1 addition & 1 deletion include/pika_dispatch_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class PikaDispatchThread {
int max_conn_rbuf_size);
~PikaDispatchThread();
int StartThread();

void StopThread();
uint64_t ThreadClientList(std::vector<ClientInfo>* clients);

bool ClientKill(const std::string& ip_port);
Expand Down
5 changes: 5 additions & 0 deletions include/pika_repl_bgworker.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ class PikaReplBgWorker {
explicit PikaReplBgWorker(int queue_size);
int StartThread();
int StopThread();
int TaskQueueSize() {
int pri_size, qu_size;
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
bg_thread_.QueueSize(&pri_size, &qu_size);
return pri_size + qu_size;
}
void Schedule(net::TaskFunc func, void* arg);
void Schedule(net::TaskFunc func, void* arg, std::function<void()>& call_back);
static void HandleBGWorkerWriteBinlog(void* arg);
Expand Down
2 changes: 2 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ class PikaServer : public pstd::noncopyable {
void ClientKillAll();
int ClientKill(const std::string& ip_port);
int64_t ClientList(std::vector<ClientInfo>* clients = nullptr);
void ClientKillPubSub();
void ClientKillAllNormal();

/*
* Monitor used
Expand Down
4 changes: 3 additions & 1 deletion src/net/include/net_pubsub.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,20 @@ class PubSubThread : public Thread {
bool IsReady(int fd);
int ClientPubSubChannelSize(const std::shared_ptr<NetConn>& conn);
int ClientPubSubChannelPatternSize(const std::shared_ptr<NetConn>& conn);
void NotifyToCloseAllConns();

private:
void RemoveConn(const std::shared_ptr<NetConn>& conn);
void CloseConn(const std::shared_ptr<NetConn>& conn);

void CloseAllConns();
int ClientChannelSize(const std::shared_ptr<NetConn>& conn);

int msg_pfd_[2];
bool should_exit_;

mutable pstd::RWMutex rwlock_; /* For external statistics */
std::map<int, std::shared_ptr<ConnHandle>> conns_;
std::atomic<bool> close_all_conn_sig_{false};

pstd::Mutex pub_mutex_;
pstd::CondVar receiver_rsignal_;
Expand Down
23 changes: 23 additions & 0 deletions src/net/src/net_pubsub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,20 @@ void PubSubThread::CloseConn(const std::shared_ptr<NetConn>& conn) {
}
}

void PubSubThread::CloseAllConns() {
std::lock_guard l(rwlock_);

pubsub_channel_.clear();
pubsub_pattern_.clear();

for (auto& pair : conns_) {
net_multiplexer_->NetDelEvent(pair.second->conn->fd(), 0);
CloseFd(pair.second->conn);
}
std::map<int, std::shared_ptr<ConnHandle>> empty_conns;
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
conns_.swap(empty_conns);
}

int PubSubThread::Publish(const std::string& channel, const std::string& msg) {
// TODO(LIBA-S): change the Publish Mode to Asynchronous
std::lock_guard lk(pub_mutex_);
Expand Down Expand Up @@ -414,6 +428,12 @@ void* PubSubThread::ThreadMain() {
char triger[1];

while (!should_stop()) {

if (close_all_conn_sig_.load()) {
close_all_conn_sig_.store(false);
CloseAllConns();
}

nfds = net_multiplexer_->NetPoll(NET_CRON_INTERVAL);
for (int i = 0; i < nfds; i++) {
pfe = (net_multiplexer_->FiredEvents()) + i;
Expand Down Expand Up @@ -585,4 +605,7 @@ void PubSubThread::Cleanup() {
}
conns_.clear();
}
void PubSubThread::NotifyToCloseAllConns() {
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
close_all_conn_sig_.store(true);
}
}; // namespace net
4 changes: 3 additions & 1 deletion src/net/src/worker_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ void WorkerThread::DoCronTask() {
}
conns_.clear();
deleting_conn_ipport_.clear();
return;
}

auto iter = conns_.begin();
Expand Down Expand Up @@ -273,10 +272,13 @@ void WorkerThread::DoCronTask() {
++iter;
}
}

for (const auto& conn : to_close) {
net_multiplexer_->NetDelEvent(conn->fd(), 0);
CloseFd(conn);
}
for (const auto& conn : to_timeout) {
net_multiplexer_->NetDelEvent(conn->fd(), 0);
CloseFd(conn);
server_thread_->handle_->FdTimeoutHandle(conn->fd(), conn->ip_port());
}
Expand Down
33 changes: 32 additions & 1 deletion src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,13 @@ void ClientCmd::DoInitial() {
}
} else if ((strcasecmp(argv_[1].data(), "kill") == 0) && argv_.size() == 3) {
info_ = argv_[2];
} else if ((strcasecmp(argv_[1].data(), "kill") == 0) &&
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
argv_.size() == 4 &&
(strcasecmp(argv_[2].data(), "type") == 0) &&
((strcasecmp(argv_[3].data(), KILLTYPE_NORMAL.data()) == 0) || (strcasecmp(argv_[3].data(), KILLTYPE_PUBSUB.data()) == 0))) {
//kill all if user wanna kill a type
info_ = "type";
kill_type_ = argv_[3];
} else {
res_.SetRes(CmdRes::kErrOther, "Syntax error, try CLIENT (LIST [order by [addr|idle]| KILL ip:port)");
return;
Expand Down Expand Up @@ -733,6 +740,16 @@ void ClientCmd::Do() {
} else if ((strcasecmp(operation_.data(), "kill") == 0) && (strcasecmp(info_.data(), "all") == 0)) {
g_pika_server->ClientKillAll();
res_.SetRes(CmdRes::kOk);
} else if ((strcasecmp(operation_.data(), "kill") == 0) && (strcasecmp(info_.data(), "type") == 0)) {
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
if (kill_type_ == KILLTYPE_NORMAL) {
g_pika_server->ClientKillAllNormal();
res_.SetRes(CmdRes::kOk);
} else if (kill_type_ == KILLTYPE_PUBSUB) {
g_pika_server->ClientKillPubSub();
res_.SetRes(CmdRes::kOk);
} else {
res_.SetRes(CmdRes::kErrOther, "kill type is unknown");
}
} else if (g_pika_server->ClientKill(info_) == 1) {
res_.SetRes(CmdRes::kOk);
} else {
Expand Down Expand Up @@ -787,6 +804,10 @@ const std::string InfoCmd::kDebugSection = "debug";
const std::string InfoCmd::kCommandStatsSection = "commandstats";
const std::string InfoCmd::kCacheSection = "cache";


const std::string ClientCmd::KILLTYPE_NORMAL = "normal";
const std::string ClientCmd::KILLTYPE_PUBSUB = "pubsub";

void InfoCmd::Execute() {
std::shared_ptr<DB> db = g_pika_server->GetDB(db_name_);
Do();
Expand Down Expand Up @@ -1198,6 +1219,9 @@ void InfoCmd::InfoReplication(std::string& info) {
Status s;
uint32_t filenum = 0;
uint64_t offset = 0;
uint64_t total_file_num = 0;
uint64_t total_offset = 0;
uint64_t slave_repl_offset = 0;
std::string safety_purge;
std::shared_ptr<SyncMasterDB> master_db = nullptr;
for (const auto& t_item : g_pika_server->dbs_) {
Expand All @@ -1209,11 +1233,18 @@ void InfoCmd::InfoReplication(std::string& info) {
continue;
}
master_db->Logger()->GetProducerStatus(&filenum, &offset);
total_file_num += filenum;
total_offset += offset;
tmp_stream << db_name << ":binlog_offset=" << filenum << " " << offset;
s = master_db->GetSafetyPurgeBinlog(&safety_purge);
tmp_stream << ",safety_purge=" << (s.ok() ? safety_purge : "error") << "\r\n";
}

total_file_num += total_offset / g_pika_conf->binlog_file_size();
total_offset = total_offset % g_pika_conf->binlog_file_size();
//then total_offset will not greater than binlog file size whose max value is 2GB in bytes
//which can be contained by uint32_t
slave_repl_offset = (total_file_num << 32) | total_offset;
tmp_stream << "slave_repl_offset:" << slave_repl_offset << "\r\n";
info.append(tmp_stream.str());
}

Expand Down
4 changes: 4 additions & 0 deletions src/pika_dispatch_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ void PikaDispatchThread::UnAuthUserAndKillClient(const std::set<std::string>& us
}
}

void PikaDispatchThread::StopThread() {
AlexStocks marked this conversation as resolved.
Show resolved Hide resolved
thread_rep_->StopThread();
}

bool PikaDispatchThread::Handles::AccessHandle(std::string& ip) const {
if (ip == "127.0.0.1") {
ip = g_pika_server->host();
Expand Down
17 changes: 17 additions & 0 deletions src/pika_repl_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,23 @@ int PikaReplClient::Stop() {
for (auto & binlog_worker : write_binlog_workers_) {
binlog_worker->StopThread();
}

// write DB task is async task, we must wait all writeDB task done and then to exit
// or some data will be loss
bool all_write_db_task_done = true;
do {
for (auto &db_worker: write_db_workers_) {
if (db_worker->TaskQueueSize() != 0) {
all_write_db_task_done = false;
std::this_thread::sleep_for(std::chrono::microseconds(300));
break;
} else {
all_write_db_task_done = true;
}
}
//if there are unfinished async write db task, just continue to wait
} while(!all_write_db_task_done);
cheniujh marked this conversation as resolved.
Show resolved Hide resolved

for (auto &db_worker: write_db_workers_) {
db_worker->StopThread();
}
Expand Down
18 changes: 15 additions & 3 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,9 @@ PikaServer::PikaServer()

PikaServer::~PikaServer() {
rsync_server_->Stop();
// DispatchThread will use queue of worker thread,
// so we need to delete dispatch before worker.
// DispatchThread will use queue of worker thread
// so we need to Stop dispatch before worker.
pika_dispatch_thread_->StopThread();
pika_client_processor_->Stop();
pika_slow_cmd_thread_pool_->stop_thread_pool();
pika_admin_cmd_thread_pool_->stop_thread_pool();
Expand Down Expand Up @@ -854,7 +855,18 @@ void PikaServer::KeyScanTaskSchedule(net::TaskFunc func, void* arg) {
key_scan_thread_.Schedule(func, arg);
}

void PikaServer::ClientKillAll() { pika_dispatch_thread_->ClientKillAll(); }
void PikaServer::ClientKillAll() {
pika_dispatch_thread_->ClientKillAll();
pika_pubsub_thread_->NotifyToCloseAllConns();
}

void PikaServer::ClientKillPubSub() {
pika_pubsub_thread_->NotifyToCloseAllConns();
}

void PikaServer::ClientKillAllNormal() {
pika_dispatch_thread_->ClientKillAll();
}

int PikaServer::ClientKill(const std::string& ip_port) {
if (pika_dispatch_thread_->ClientKill(ip_port)) {
Expand Down
Loading