Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add log-net-activities configuration #2964

Merged
Merged
10 changes: 8 additions & 2 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
###########################

# Pika port, the default value is 9221.
# [NOTICE] Port Magic offsets of port+1000 / port+2000 are used by Pika at present.
# Port 10221 is used for Rsync, and port 11221 is used for Replication, while the listening port is 9221.
# [NOTICE] Port Magic offsets of port+1000 / port+10000 are used by Pika at present.
# Port 9221+10000 is used for Rsync, and port 9221+1000 is used for incr Replication, while the listening port is 9221.
port : 9221

db-instance-num : 3
Expand Down Expand Up @@ -74,6 +74,12 @@ log-path : ./log/
# The unit of serverlogs is in [days] and the default value is 7(days).
log-retention-time : 7

# log level can be config as 0 or 1.
# when log-level is 0: connection activities will only be summary.
# when log-level is 1: connection activities will be show in details.
# Default log-level is 0.
log-level : 0
cheniujh marked this conversation as resolved.
Show resolved Hide resolved

# Directory to store the data of Pika.
db-path : ./db/

Expand Down
10 changes: 5 additions & 5 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return log_retention_time_;
}
std::string log_level() {
std::shared_lock l(rwlock_);
int32_t log_level() {
return log_level_;
}
std::string db_path() {
Expand Down Expand Up @@ -826,9 +825,9 @@ class PikaConf : public pstd::BaseConf {
max_compaction_bytes_ = value;
}

void SetLogLevel(const std::string& value) {
void SetLogLevel(int32_t value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("loglevel", value);
TryPushDiffCommands("log-level", std::to_string(value));
log_level_ = value;
}
cheniujh marked this conversation as resolved.
Show resolved Hide resolved

Expand Down Expand Up @@ -952,7 +951,6 @@ class PikaConf : public pstd::BaseConf {
std::string slaveof_;
std::string log_path_;
int log_retention_time_;
std::string log_level_;
std::string db_path_;
int db_instance_num_ = 0;
std::string db_sync_path_;
Expand Down Expand Up @@ -1093,6 +1091,8 @@ class PikaConf : public pstd::BaseConf {
std::atomic_int cache_maxmemory_policy_ = 1;
std::atomic_int cache_maxmemory_samples_ = 5;
std::atomic_int cache_lfu_decay_time_ = 1;
std::atomic_int log_level_ = 0;


// rocksdb blob
bool enable_blob_files_ = false;
Expand Down
2 changes: 1 addition & 1 deletion include/pika_dispatch_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class PikaDispatchThread {

bool ClientKill(const std::string& ip_port);
void ClientKillAll();

void SetLogLevel(int32_t value);
void SetQueueLimit(int queue_limit) { thread_rep_->SetQueueLimit(queue_limit); }

void UnAuthUserAndKillClient(const std::set<std::string> &users, const std::shared_ptr<User>& defaultUser);
Expand Down
2 changes: 1 addition & 1 deletion include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ class PikaServer : public pstd::noncopyable {
void CacheConfigInit(cache::CacheConfig &cache_cfg);
void ProcessCronTask();
double HitRatio();

void SetLogLevel(int32_t value);
/*
* disable compact
*/
Expand Down
4 changes: 4 additions & 0 deletions src/net/include/server_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ class ServerThread : public Thread {

int SetTcpNoDelay(int connfd);

void SetLogLevel(int32_t value);

/*
* StartThread will return the error code as pthread_create
* Return 0 if success
Expand Down Expand Up @@ -167,6 +169,8 @@ class ServerThread : public Thread {
*/
std::unique_ptr<NetMultiplexer> net_multiplexer_;

std::atomic<int32_t> log_level_{0};

private:
friend class HolyThread;
friend class DispatchThread;
Expand Down
10 changes: 7 additions & 3 deletions src/net/src/dispatch_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,19 @@ void DispatchThread::HandleNewConn(const int connfd, const std::string& ip_port)
// Slow workers may consume many fds.
// We simply loop to find next legal worker.
NetItem ti(connfd, ip_port);
LOG(INFO) << "accept new conn " << ti.String();
if (log_level_ == 1) {
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
LOG(INFO) << "accept new conn " << ti.String();
}
int next_thread = last_thread_;
bool find = false;
for (int cnt = 0; cnt < work_num_; cnt++) {
std::unique_ptr<WorkerThread>& worker_thread = worker_thread_[next_thread];
find = worker_thread->MoveConnIn(ti, false);
if (find) {
last_thread_ = (next_thread + 1) % work_num_;
LOG(INFO) << "find worker(" << next_thread << "), refresh the last_thread_ to " << last_thread_;
if (log_level_ == 1) {
LOG(INFO) << "find worker(" << next_thread << "), refresh the last_thread_ to " << last_thread_;
}
break;
}
next_thread = (next_thread + 1) % work_num_;
Expand Down Expand Up @@ -189,7 +193,7 @@ void DispatchThread::CleanWaitNodeOfUnBlockedBlrConn(std::shared_ptr<net::RedisC
// removed all the waiting info of this conn/ doing cleaning work
auto pair = blocked_conn_to_keys_.find(conn_unblocked->fd());
if (pair == blocked_conn_to_keys_.end()) {
LOG(WARNING) << "blocking info of blpop/brpop went wrong, blpop/brpop can't working correctly";
LOG(ERROR) << "blocking info of blpop/brpop went wrong, blpop/brpop can't working correctly";
return;
}
auto& blpop_keys_list = pair->second;
Expand Down
7 changes: 7 additions & 0 deletions src/net/src/server_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,13 @@ void* ServerThread::ThreadMain() {
return nullptr;
}

void ServerThread::SetLogLevel(int32_t value) {
if (value != 0 && value != 1) {
return;
}
log_level_.store(value);
}

#ifdef __ENABLE_SSL
static std::vector<std::unique_ptr<pstd::Mutex>> ssl_mutex_;

Expand Down
20 changes: 18 additions & 2 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1590,6 +1590,12 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeNumber(&config_body, g_pika_conf->port());
}

if (pstd::stringmatch(pattern.data(), "log-level", 1) != 0) {
elements += 2;
EncodeString(&config_body, "log-level");
EncodeNumber(&config_body, g_pika_conf->log_level());
}

if (pstd::stringmatch(pattern.data(), "thread-num", 1) != 0) {
elements += 2;
EncodeString(&config_body, "thread-num");
Expand Down Expand Up @@ -2157,7 +2163,7 @@ void ConfigCmd::ConfigGet(std::string& ret) {
if (pstd::stringmatch(pattern.data(), "loglevel", 1) != 0) {
elements += 2;
EncodeString(&config_body, "loglevel");
EncodeString(&config_body, g_pika_conf->log_level());
EncodeString(&config_body, std::to_string(g_pika_conf->log_level()));
}

if (pstd::stringmatch(pattern.data(), "min-blob-size", 1) != 0) {
Expand Down Expand Up @@ -2472,6 +2478,14 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
g_pika_conf->SetSlowlogMaxLen(static_cast<int>(ival));
g_pika_server->SlowlogTrim();
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "log-level") {
if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || (ival != 0 && ival != 1)) {
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'log-level', only 0 or 1 is valid\r\n");
return;
}
g_pika_conf->SetLogLevel(static_cast<int32_t>(ival));
g_pika_server->SetLogLevel(static_cast<int32_t>(ival));
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "max-cache-statistic-keys") {
if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival < 0) {
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-cache-statistic-keys'\r\n");
Expand Down Expand Up @@ -3312,7 +3326,9 @@ void QuitCmd::DoInitial() {

void QuitCmd::Do() {
res_.SetRes(CmdRes::kOk);
LOG(INFO) << "QutCmd will close connection " << GetConn()->String();
if (g_pika_conf->log_level()) {
LOG(INFO) << "QutCmd will close connection " << GetConn()->String();
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
}
GetConn()->SetClose(true);
}

Expand Down
10 changes: 9 additions & 1 deletion src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,15 @@ int PikaConf::Load() {
if(log_retention_time_ < 0){
LOG(FATAL) << "log-retention-time invalid";
}
GetConfStr("loglevel", &log_level_);

int32_t log_level = 0;
GetConfInt("log-level", &log_level);
if (log_level != 0 && log_level != 1) {
LOG(ERROR) << "log-level loaded from pika.conf is invalid, auto change it to 0";
log_level = 0;
}
log_level_.store(log_level);

GetConfStr("db-path", &db_path_);
GetConfInt("db-instance-num", &db_instance_num_);
if (db_instance_num_ <= 0) {
Expand Down
3 changes: 3 additions & 0 deletions src/pika_dispatch_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ void PikaDispatchThread::UnAuthUserAndKillClient(const std::set<std::string>& us
void PikaDispatchThread::StopThread() {
thread_rep_->StopThread();
}
void PikaDispatchThread::SetLogLevel(int32_t value) {
thread_rep_->SetLogLevel(value);
}
cheniujh marked this conversation as resolved.
Show resolved Hide resolved

bool PikaDispatchThread::Handles::AccessHandle(std::string& ip) const {
if (ip == "127.0.0.1") {
Expand Down
2 changes: 2 additions & 0 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ void PikaServer::Start() {
<< (ret == net::kBindError ? ": bind port " + std::to_string(port_) + " conflict" : ": other error")
<< ", Listen on this port to handle the connected redis client";
}
pika_dispatch_thread_->SetLogLevel(g_pika_conf->log_level());
ret = pika_pubsub_thread_->StartThread();
if (ret != net::kSuccess) {
dbs_.clear();
Expand Down Expand Up @@ -1919,3 +1920,4 @@ void PikaServer::CacheConfigInit(cache::CacheConfig& cache_cfg) {
cache_cfg.maxmemory_samples = g_pika_conf->cache_maxmemory_samples();
cache_cfg.lfu_decay_time = g_pika_conf->cache_lfu_decay_time();
}
void PikaServer::SetLogLevel(int32_t value) { pika_dispatch_thread_->SetLogLevel(value); }
Loading