Skip to content

Commit

Permalink
add idle field for client list command
Browse files Browse the repository at this point in the history
  • Loading branch information
KernelMaker committed May 19, 2016
1 parent 57a9548 commit b48fcad
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 25 deletions.
6 changes: 6 additions & 0 deletions include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@
const std::string kPikaVersion = "2.0.3";
const std::string kPikaPidFile = "pika.pid";

struct ClientInfo {
int fd;
std::string ip_port;
int last_interaction;
};

struct WorkerCronTask {
int task;
std::string ip_port;
Expand Down
4 changes: 2 additions & 2 deletions include/pika_monitor_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class PikaMonitorThread : public pink::Thread {

void AddMonitorClient(pink::RedisConn* client_ptr);
void AddMonitorMessage(std::string monitor_message);
int32_t ThreadClientList(std::vector<std::pair<int32_t, std::string> >* client = NULL);
int32_t ThreadClientList(std::vector<ClientInfo>* client = NULL);
bool ThreadClientKill(const std::string& ip_port = "all");
bool HasMonitorClients();

Expand All @@ -33,7 +33,7 @@ class PikaMonitorThread : public pink::Thread {
slash::Mutex monitor_mutex_protector_;
slash::CondVar monitor_cond_;

std::list<std::pair<int32_t, std::string> > monitor_clients_;
std::list<ClientInfo> monitor_clients_;
std::deque<std::string> monitor_messages_;
std::atomic<bool> is_running_;
std::atomic<bool> should_exit_;
Expand Down
2 changes: 1 addition & 1 deletion include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ class PikaServer
*/
void ClientKillAll();
int ClientKill(const std::string &ip_port);
int64_t ClientList(std::vector< std::pair<int, std::string> > *clients = NULL);
int64_t ClientList(std::vector<ClientInfo> *clients = NULL);

/*
* Monitor used
Expand Down
2 changes: 1 addition & 1 deletion include/pika_worker_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class PikaWorkerThread : public pink::WorkerThread<PikaClientConn>
virtual ~PikaWorkerThread();
virtual void CronHandle();

int64_t ThreadClientList(std::vector< std::pair<int, std::string> > *clients = NULL);
int64_t ThreadClientList(std::vector<ClientInfo> *clients = NULL);
bool ThreadClientKill(std::string ip_port = "");
int ThreadClientNum();

Expand Down
8 changes: 5 additions & 3 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -325,13 +325,15 @@ void ClientCmd::DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info)

void ClientCmd::Do() {
if (operation_ == CLIENT_LIST_S) {
std::vector< std::pair<int, std::string> > clients;
struct timeval now;
gettimeofday(&now, NULL);
std::vector<ClientInfo> clients;
g_pika_server->ClientList(&clients);
std::vector<std::pair<int, std::string> >::iterator iter= clients.begin();
std::vector<ClientInfo>::iterator iter= clients.begin();
std::string reply = "+";
char buf[128];
while (iter != clients.end()) {
snprintf(buf, sizeof(buf), "addr=%s, fd=%d\n", iter->second.c_str(), iter->first);
snprintf(buf, sizeof(buf), "addr=%s, fd=%d, idle=%ld\n", iter->ip_port.c_str(), iter->fd, iter->last_interaction == 0 ? 0 : now.tv_sec - iter->last_interaction);
reply.append(buf);
iter++;
}
Expand Down
28 changes: 14 additions & 14 deletions src/pika_monitor_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,29 @@ PikaMonitorThread::~PikaMonitorThread() {
monitor_cond_.SignalAll();
pthread_join(thread_id(), NULL);
}
for (std::list<std::pair<int32_t, std::string> >::iterator iter = monitor_clients_.begin();
for (std::list<ClientInfo>::iterator iter = monitor_clients_.begin();
iter != monitor_clients_.end();
++iter) {
close(iter->first);
close(iter->fd);
}
DLOG(INFO) << " PikaMonitorThread " << pthread_self() << " exit!!!";
}

void PikaMonitorThread::AddMonitorClient(pink::RedisConn* client_ptr) {
StartThread();
slash::MutexLock lm(&monitor_mutex_protector_);
monitor_clients_.push_back(std::make_pair(client_ptr->fd(), client_ptr->ip_port()));
monitor_clients_.push_back(ClientInfo{client_ptr->fd(), client_ptr->ip_port(), 0});
}

void PikaMonitorThread::RemoveMonitorClient(const std::string& ip_port) {
std::list<std::pair<int32_t, std::string> >::iterator iter = monitor_clients_.begin();
std::list<ClientInfo>::iterator iter = monitor_clients_.begin();
for (; iter != monitor_clients_.end(); ++iter) {
if (ip_port == "all") {
close(iter->first);
close(iter->fd);
continue;
}
if (iter->second == ip_port) {
close(iter->first);
if (iter->ip_port == ip_port) {
close(iter->fd);
break;
}
}
Expand All @@ -66,9 +66,9 @@ void PikaMonitorThread::AddMonitorMessage(std::string monitor_message) {
}
}

int32_t PikaMonitorThread::ThreadClientList(std::vector<std::pair<int32_t, std::string> >* clients_ptr) {
int32_t PikaMonitorThread::ThreadClientList(std::vector<ClientInfo>* clients_ptr) {
if (clients_ptr != NULL) {
for (std::list<std::pair<int32_t, std::string> >::iterator iter = monitor_clients_.begin();
for (std::list<ClientInfo>::iterator iter = monitor_clients_.begin();
iter != monitor_clients_.end();
iter++) {
clients_ptr->push_back(*iter);
Expand All @@ -89,10 +89,10 @@ void PikaMonitorThread::AddCronTask(MonitorCronTask task) {

bool PikaMonitorThread::FindClient(const std::string &ip_port) {
slash::MutexLock lm(&monitor_mutex_protector_);
for (std::list<std::pair<int32_t, std::string> >::iterator iter = monitor_clients_.begin();
for (std::list<ClientInfo>::iterator iter = monitor_clients_.begin();
iter != monitor_clients_.end();
++iter) {
if (iter->second == ip_port) {
if (iter->ip_port == ip_port) {
return true;
}
}
Expand Down Expand Up @@ -189,12 +189,12 @@ void* PikaMonitorThread::ThreadMain() {
}
messages_transfer.replace(messages_transfer.size()-1, 1, "\r\n", 0, 2);
monitor_mutex_protector_.Lock();
for (std::list<std::pair<int32_t, std::string> >::iterator iter = monitor_clients_.begin();
for (std::list<ClientInfo>::iterator iter = monitor_clients_.begin();
iter != monitor_clients_.end();
++iter) {
write_status = SendMessage(iter->first, messages_transfer);
write_status = SendMessage(iter->fd, messages_transfer);
if (write_status == pink::kWriteError) {
cron_tasks_.push({TASK_KILL, iter->second});
cron_tasks_.push({TASK_KILL, iter->ip_port});
}
}
monitor_mutex_protector_.Unlock();
Expand Down
2 changes: 1 addition & 1 deletion src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,7 @@ int PikaServer::ClientKill(const std::string &ip_port) {
return 0;
}

int64_t PikaServer::ClientList(std::vector< std::pair<int, std::string> > *clients) {
int64_t PikaServer::ClientList(std::vector<ClientInfo> *clients) {
int64_t clients_num = 0;
for (int idx = 0; idx != worker_num_; ++idx) {
clients_num += pika_worker_thread_[idx]->ThreadClientList(clients);
Expand Down
4 changes: 2 additions & 2 deletions src/pika_worker_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,12 @@ void PikaWorkerThread::ClientKillAll() {
}
}

int64_t PikaWorkerThread::ThreadClientList(std::vector< std::pair<int, std::string> > *clients) {
int64_t PikaWorkerThread::ThreadClientList(std::vector<ClientInfo> *clients) {
slash::RWLock l(&rwlock_, false);
if (clients != NULL) {
std::map<int, void*>::const_iterator iter = conns_.begin();
while (iter != conns_.end()) {
clients->push_back(make_pair(iter->first, reinterpret_cast<PikaClientConn*>(iter->second)->ip_port()));
clients->push_back(ClientInfo{iter->first, reinterpret_cast<PikaClientConn*>(iter->second)->ip_port(), (reinterpret_cast<PikaClientConn*>(iter->second)->last_interaction()).tv_sec});
iter++;
}
}
Expand Down
2 changes: 1 addition & 1 deletion third/nemo
Submodule nemo updated from d2253f to 8063db

0 comments on commit b48fcad

Please sign in to comment.