Skip to content

Commit

Permalink
feat: support network IO traffic monitoring (OpenAtomFoundation#1733)
Browse files Browse the repository at this point in the history
* feat: support network IO traffic monitoring

Support network IO traffic monitoring. Including IO bytes and kps of Redis requests and master-slave replication.

Fixes: OpenAtomFoundation#1732

Signed-off-by: yaoyinnan <[email protected]>
  • Loading branch information
yaoyinnan authored Jul 18, 2023
1 parent bac8102 commit 4821991
Show file tree
Hide file tree
Showing 27 changed files with 4,109 additions and 2,270 deletions.
39 changes: 39 additions & 0 deletions include/pika_instant.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (c) 2023-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef PIKA_PIKA_INSTANT_H
#define PIKA_PIKA_INSTANT_H

#include <string>
#include <unordered_map>

inline constexpr size_t STATS_METRIC_SAMPLES = 16; /* Number of samples per metric. */
inline const std::string STATS_METRIC_NET_INPUT = "stats_metric_net_input";
inline const std::string STATS_METRIC_NET_OUTPUT = "stats_metric_net_output";
inline const std::string STATS_METRIC_NET_INPUT_REPLICATION = "stats_metric_net_input_replication";
inline const std::string STATS_METRIC_NET_OUTPUT_REPLICATION = "stats_metric_net_output_replication";

/* The following two are used to track instantaneous metrics, like
* number of operations per second, network traffic. */
struct InstMetric{
size_t last_sample_base; /* The divisor of last sample window */
size_t last_sample_value; /* The dividend of last sample window */
double samples[STATS_METRIC_SAMPLES];
int idx;
};

class Instant {
public:
Instant() = default;
~Instant() = default;

void trackInstantaneousMetric(std::string metric, size_t current_value, size_t current_base, size_t factor);
double getInstantaneousMetric(std::string metric);

private:
std::unordered_map<std::string, InstMetric> inst_metrics_;
};

#endif // PIKA_PIKA_INSTANT_H
22 changes: 22 additions & 0 deletions include/pika_monotonic_time.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) 2023-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef PIKA_MONOTONIC_TIME_H
#define PIKA_MONOTONIC_TIME_H

#include <cstdint>

/* A counter in micro-seconds. The 'monotime' type is provided for variables
* holding a monotonic time. This will help distinguish & document that the
* variable is associated with the monotonic clock and should not be confused
* with other types of time.*/
typedef uint64_t monotime;

// Get monotonic time in microseconds
monotime getMonotonicUs();

#endif // PIKA_MONOTONIC_TIME_H


23 changes: 22 additions & 1 deletion include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "include/pika_db.h"
#include "include/pika_define.h"
#include "include/pika_dispatch_thread.h"
#include "include/pika_instant.h"
#include "include/pika_repl_client.h"
#include "include/pika_repl_server.h"
#include "include/pika_rsync_service.h"
Expand Down Expand Up @@ -306,6 +307,19 @@ class PikaServer : public pstd::noncopyable {
std::unordered_map<std::string, uint64_t> ServerExecCountDB();
QpsStatistic ServerDBStat(const std::string& db_name);
std::unordered_map<std::string, QpsStatistic> ServerAllDBStat();

/*
* Network Statistic used
*/
size_t NetInputBytes();
size_t NetOutputBytes();
size_t NetReplInputBytes();
size_t NetReplOutputBytes();
float InstantaneousInputKbps();
float InstantaneousOutputKbps();
float InstantaneousInputReplKbps();
float InstantaneousOutputReplKbps();

/*
* Slave to Master communication used
*/
Expand Down Expand Up @@ -472,7 +486,13 @@ class PikaServer : public pstd::noncopyable {
/*
* Info Commandstats used
*/
std::unordered_map<std::string, CommandStatistics>* GetCommandStatMap();
std::unordered_map<std::string, CommandStatistics>* GetCommandStatMap();


/*
* Instantaneous Metric used
*/
std::unique_ptr<Instant> instant_;

friend class Cmd;
friend class InfoCmd;
Expand All @@ -488,6 +508,7 @@ class PikaServer : public pstd::noncopyable {
void AutoPurge();
void AutoDeleteExpiredDump();
void AutoKeepAliveRSync();
void AutoInstantaneousMetric();

std::string host_;
int port_ = 0;
Expand Down
4 changes: 2 additions & 2 deletions include/pika_statistic.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ class QpsStatistic {
};

struct ServerStatistic {
ServerStatistic();
~ServerStatistic();
ServerStatistic() = default;
~ServerStatistic() = default;

std::atomic<uint64_t> accumulative_connections;
std::unordered_map<std::string, std::atomic<uint64_t>> exec_count_db;
Expand Down
36 changes: 36 additions & 0 deletions src/net/include/net_stats.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) 2023-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#ifndef NET_INCLUDE_REDIS_STSTS_H_
#define NET_INCLUDE_REDIS_STSTS_H_

#include <atomic>

namespace net {

class NetworkStatistic {
public:
NetworkStatistic() = default;
~NetworkStatistic() = default;

size_t NetInputBytes();
size_t NetOutputBytes();
size_t NetReplInputBytes();
size_t NetReplOutputBytes();
void IncrRedisInputBytes(uint64_t bytes);
void IncrRedisOutputBytes(uint64_t bytes);
void IncrReplInputBytes(uint64_t bytes);
void IncrReplOutputBytes(uint64_t bytes);

private:
std::atomic<size_t> stat_net_input_bytes {0}; /* Bytes read from network. */
std::atomic<size_t> stat_net_output_bytes {0}; /* Bytes written to network. */
std::atomic<size_t> stat_net_repl_input_bytes {0}; /* Bytes read during replication, added to stat_net_input_bytes in 'info'. */
std::atomic<size_t> stat_net_repl_output_bytes {0}; /* Bytes written during replication, added to stat_net_output_bytes in 'info'. */
};

}

#endif // NET_INCLUDE_REDIS_STSTS_H_
46 changes: 46 additions & 0 deletions src/net/src/net_stats.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) 2023-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#include <atomic>
#include <memory>
#include "net/include/net_stats.h"

std::unique_ptr<net::NetworkStatistic> g_network_statistic;

namespace net {

size_t NetworkStatistic::NetInputBytes() {
return stat_net_input_bytes.load(std::memory_order_relaxed);
}

size_t NetworkStatistic::NetOutputBytes() {
return stat_net_output_bytes.load(std::memory_order_relaxed);
}

size_t NetworkStatistic::NetReplInputBytes() {
return stat_net_repl_input_bytes.load(std::memory_order_relaxed);
}

size_t NetworkStatistic::NetReplOutputBytes() {
return stat_net_repl_output_bytes.load(std::memory_order_relaxed);
}

void NetworkStatistic::IncrRedisInputBytes(uint64_t bytes) {
stat_net_input_bytes.fetch_add(bytes, std::memory_order_relaxed);
}

void NetworkStatistic::IncrRedisOutputBytes(uint64_t bytes) {
stat_net_output_bytes.fetch_add(bytes, std::memory_order_relaxed);
}

void NetworkStatistic::IncrReplInputBytes(uint64_t bytes) {
stat_net_repl_input_bytes.fetch_add(bytes, std::memory_order_relaxed);
}

void NetworkStatistic::IncrReplOutputBytes(uint64_t bytes) {
stat_net_repl_output_bytes.fetch_add(bytes, std::memory_order_relaxed);
}

}
6 changes: 6 additions & 0 deletions src/net/src/pb_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@
#include <glog/logging.h>

#include "net/include/net_define.h"
#include "net/include/net_stats.h"
#include "pstd/include/xdebug.h"

extern std::unique_ptr<net::NetworkStatistic> g_network_statistic;

namespace net {

PbConn::PbConn(const int fd, const std::string& ip_port, Thread* thread, NetMultiplexer* mpx)
Expand All @@ -34,6 +37,7 @@ ReadStatus PbConn::GetRequest() {
switch (connStatus_) {
case kHeader: {
ssize_t nread = read(fd(), rbuf_ + cur_pos_, COMMAND_HEADER_LENGTH - cur_pos_);
g_network_statistic->IncrReplInputBytes(nread);
if (nread == -1) {
if (errno == EAGAIN) {
return kReadHalf;
Expand Down Expand Up @@ -71,6 +75,7 @@ ReadStatus PbConn::GetRequest() {
}
// read msg body
ssize_t nread = read(fd(), rbuf_ + cur_pos_, remain_packet_len_);
g_network_statistic->IncrReplInputBytes(nread);
if (nread == -1) {
if (errno == EAGAIN) {
return kReadHalf;
Expand Down Expand Up @@ -117,6 +122,7 @@ WriteStatus PbConn::SendReply() {
item_len = item.size();
while (item_len - write_buf_.item_pos_ > 0) {
nwritten = write(fd(), item.data() + write_buf_.item_pos_, item_len - write_buf_.item_pos_);
g_network_statistic->IncrReplOutputBytes(nwritten);
if (nwritten <= 0) {
break;
}
Expand Down
8 changes: 5 additions & 3 deletions src/net/src/redis_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@

#include "net/include/redis_conn.h"

#include <climits>
#include <cstdlib>

#include <sstream>
#include <string>

#include <glog/logging.h>

#include "net/include/net_stats.h"
#include "pstd/include/pstd_string.h"
#include "pstd/include/xdebug.h"

extern std::unique_ptr<net::NetworkStatistic> g_network_statistic;

namespace net {

RedisConn::RedisConn(const int fd, const std::string& ip_port, Thread* thread, NetMultiplexer* net_mpx,
Expand Down Expand Up @@ -87,6 +87,7 @@ ReadStatus RedisConn::GetRequest() {
}

nread = read(fd(), rbuf_ + next_read_pos, remain);
g_network_statistic->IncrRedisInputBytes(nread);
if (nread == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
nread = 0;
Expand Down Expand Up @@ -129,6 +130,7 @@ WriteStatus RedisConn::SendReply() {
size_t wbuf_len = response_.size();
while (wbuf_len > 0) {
nwritten = write(fd(), response_.data() + wbuf_pos_, wbuf_len - wbuf_pos_);
g_network_statistic->IncrRedisOutputBytes(nwritten);
if (nwritten <= 0) {
break;
}
Expand Down
5 changes: 5 additions & 0 deletions src/pika.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <sys/resource.h>
#include <csignal>

#include "net/include/net_stats.h"
#include "include/build_version.h"
#include "include/pika_cmd_table_manager.h"
#include "include/pika_command.h"
Expand All @@ -26,6 +27,8 @@ std::unique_ptr<PikaReplicaManager> g_pika_rm;

std::unique_ptr<PikaCmdTableManager> g_pika_cmd_table_manager;

extern std::unique_ptr<net::NetworkStatistic> g_network_statistic;

static void version() {
char version[32];
snprintf(version, sizeof(version), "%d.%d.%d", PIKA_MAJOR, PIKA_MINOR, PIKA_PATCH);
Expand Down Expand Up @@ -192,6 +195,7 @@ int main(int argc, char* argv[]) {
g_pika_cmd_table_manager = std::make_unique<PikaCmdTableManager>();
g_pika_server = new PikaServer();
g_pika_rm = std::make_unique<PikaReplicaManager>();
g_network_statistic = std::make_unique<net::NetworkStatistic>();

if (g_pika_conf->daemonize()) {
close_std();
Expand All @@ -202,6 +206,7 @@ int main(int argc, char* argv[]) {
g_pika_server = nullptr;
g_pika_rm.reset();
g_pika_cmd_table_manager.reset();
g_network_statistic.reset();
::google::ShutdownGoogleLogging();
g_pika_conf.reset();
};
Expand Down
11 changes: 11 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,17 @@ void InfoCmd::InfoStats(std::string& info) {
tmp_stream << "total_connections_received:" << g_pika_server->accumulative_connections() << "\r\n";
tmp_stream << "instantaneous_ops_per_sec:" << g_pika_server->ServerCurrentQps() << "\r\n";
tmp_stream << "total_commands_processed:" << g_pika_server->ServerQueryNum() << "\r\n";

// Network stats
tmp_stream << "total_net_input_bytes:" << g_pika_server->NetInputBytes() + g_pika_server->NetReplInputBytes() << "\r\n";
tmp_stream << "total_net_output_bytes:" << g_pika_server->NetOutputBytes() + g_pika_server->NetReplOutputBytes() << "\r\n";
tmp_stream << "total_net_repl_input_bytes:" << g_pika_server->NetReplInputBytes() << "\r\n";
tmp_stream << "total_net_repl_output_bytes:" << g_pika_server->NetReplOutputBytes() << "\r\n";
tmp_stream << "instantaneous_input_kbps:" << g_pika_server->InstantaneousInputKbps() << "\r\n";
tmp_stream << "instantaneous_output_kbps:" << g_pika_server->InstantaneousOutputKbps() << "\r\n";
tmp_stream << "instantaneous_input_repl_kbps:" << g_pika_server->InstantaneousInputReplKbps() << "\r\n";
tmp_stream << "instantaneous_output_repl_kbps:" << g_pika_server->InstantaneousOutputReplKbps() << "\r\n";

tmp_stream << "is_bgsaving:" << (g_pika_server->IsBgSaving() ? "Yes" : "No") << "\r\n";
tmp_stream << "is_scaning_keyspace:" << (g_pika_server->IsKeyScaning() ? "Yes" : "No") << "\r\n";
tmp_stream << "is_compact:" << (g_pika_server->IsCompacting() ? "Yes" : "No") << "\r\n";
Expand Down
40 changes: 40 additions & 0 deletions src/pika_instant.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) 2023-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#include <string>
#include "../include/pika_instant.h"

/* Return the mean of all the samples. */
double Instant::getInstantaneousMetric(std::string metric) {
size_t j;
size_t sum = 0;

for (j = 0; j < STATS_METRIC_SAMPLES; j++)
sum += inst_metrics_[metric].samples[j];

return sum / STATS_METRIC_SAMPLES;
}

/* ======================= Cron: called every 100 ms ======================== */

/* Add a sample to the instantaneous metric. This function computes the quotient
* of the increment of value and base, which is useful to record operation count
* per second, or the average time consumption of an operation.
*
* current_value - The dividend
* current_base - The divisor
* */
void Instant::trackInstantaneousMetric(std::string metric, size_t current_value, size_t current_base, size_t factor) {
if (inst_metrics_[metric].last_sample_base > 0) {
size_t base = current_base - inst_metrics_[metric].last_sample_base;
size_t value = current_value - inst_metrics_[metric].last_sample_value;
size_t avg = base > 0 ? (value * factor / base) : 0;
inst_metrics_[metric].samples[inst_metrics_[metric].idx] = avg;
inst_metrics_[metric].idx++;
inst_metrics_[metric].idx %= STATS_METRIC_SAMPLES;
}
inst_metrics_[metric].last_sample_base = current_base;
inst_metrics_[metric].last_sample_value = current_value;
}
Loading

0 comments on commit 4821991

Please sign in to comment.