Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

增加慢日志记录 #23

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -41,6 +41,7 @@ futures= "0.1"
toml="0.4"
serde="1.0"
serde_derive="1.0"
serde_json = "1.0"
itoa= "0.4.4"
net2="0.2"
md5="0.6"
@@ -59,6 +60,7 @@ actix-rt = "0.2.5"
sysinfo = "0.9.5"
rayon = "1.2.0"
hotwatch = "0.4.3"
chrono = "0.4.13"

[profile.release]
debug = true
14 changes: 13 additions & 1 deletion src/cli.yml
Original file line number Diff line number Diff line change
@@ -6,7 +6,6 @@ args:
value_name: FILE
help: Sets a custom config file
takes_value: true
required: true
- ip:
short: i
long: ip
@@ -23,3 +22,16 @@ args:
short: r
long: reload
help: enable reload feature for standalone proxy mode.
- slowlog-slower-than:
long: slowlog-slower-than
help: slowlog-slower-than is the microseconds which slowlog must slower than. DEFAULT 100_000
- slowlog-file-path:
long: slowlog-file_path
help: the file where slowlog output.
- slowlog-max-file-size:
long: slowlog-max-file-size
help: max size in Mb for single slowlog file. DEFAULT 200Mb
- slowlog-file-backup:
long: slowlog-file-backup
help: single slowlog file backup. DEFAULT 3

8 changes: 8 additions & 0 deletions src/com.rs
Original file line number Diff line number Diff line change
@@ -255,6 +255,8 @@ pub struct ClusterConfig {

// dead option: always 1
pub node_connections: Option<usize>,

pub slowlog_slow_than: Option<u64>,
}

impl ClusterConfig {
@@ -268,6 +270,12 @@ impl ClusterConfig {
pub(crate) fn fetch_interval_ms(&self) -> u64 {
self.fetch_interval.unwrap_or(DEFAULT_FETCH_INTERVAL_MS)
}

pub fn set_slowlog_slow_than(&mut self, value: u64) {
if self.slowlog_slow_than.is_none() {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里不用判断吧?

self.slowlog_slow_than.replace(value);
}
}
}

#[cfg(windows)]
33 changes: 27 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -31,12 +31,20 @@ use failure::Error;
use com::meta::{load_meta, meta_init};
use com::ClusterConfig;
use metrics::thread_incr;
use metrics::slowlog::{self, Entry};

use std::time::Duration;

use futures::sync::mpsc::{channel, Sender};

pub fn run() -> Result<(), Error> {
env_logger::init();
let yaml = load_yaml!("cli.yml");
let matches = App::from_yaml(yaml).version(ASTER_VERSION).get_matches();
if matches.is_present("version") {
println!("{}", ASTER_VERSION);
return Ok(())
}
let config = matches.value_of("config").unwrap_or("default.toml");
let watch_file = config.to_string();
let ip = matches.value_of("ip").map(|x| x.to_string());
@@ -50,8 +58,12 @@ pub fn run() -> Result<(), Error> {
);
crate::proxy::standalone::reload::init(&watch_file, cfg.clone(), enable_reload)?;

let (tx, rx) = channel(1024);

let slowlog_slower_than = matches.value_of("slowlog-slower-than").unwrap_or("100000").parse::<u64>().unwrap();

let mut ths = Vec::new();
for cluster in cfg.clusters.into_iter() {
for mut cluster in cfg.clusters.into_iter() {
if cluster.servers.is_empty() {
warn!(
"fail to running cluster {} in addr {} due filed `servers` is empty",
@@ -68,23 +80,31 @@ pub fn run() -> Result<(), Error> {
continue;
}

cluster.set_slowlog_slow_than(slowlog_slower_than);

info!(
"starting aster cluster {} in addr {}",
cluster.name, cluster.listen_addr
);


match cluster.cache_type {
com::CacheType::RedisCluster => {
let jhs = spwan_worker(&cluster, ip.clone(), proxy::cluster::spawn);
let jhs = spwan_worker(&cluster, ip.clone(), tx.clone(), proxy::cluster::spawn);
ths.extend(jhs);
}
_ => {
let jhs = spwan_worker(&cluster, ip.clone(), proxy::standalone::spawn);
let jhs = spwan_worker(&cluster, ip.clone(), tx.clone(), proxy::standalone::spawn);
ths.extend(jhs);
}
}
}

let slowlog_file_path = matches.value_of("slowlog-file-path").unwrap_or("aster-slowlog.log").to_string();
let slowlog_max_file_size = matches.value_of("slowlog-max-file-size").unwrap_or("200").parse::<u64>().unwrap();
let slowlog_file_backup = matches.value_of("slowlog-file-backup").unwrap_or("3").parse::<u8>().unwrap();
thread::spawn(move || slowlog::run(slowlog_file_path, slowlog_max_file_size, slowlog_file_backup, rx));

{
let port_str = matches.value_of("metrics").unwrap_or("2110");
let port = port_str.parse::<usize>().unwrap_or(2110);
@@ -97,9 +117,9 @@ pub fn run() -> Result<(), Error> {
Ok(())
}

fn spwan_worker<T>(cc: &ClusterConfig, ip: Option<String>, spawn_fn: T) -> Vec<JoinHandle<()>>
fn spwan_worker<T>(cc: &ClusterConfig, ip: Option<String>, slowlog_tx: Sender<Entry>, spawn_fn: T) -> Vec<JoinHandle<()>>
where
T: Fn(ClusterConfig) + Copy + Send + 'static,
T: Fn(ClusterConfig, Sender<Entry>) + Copy + Send + 'static,
{
let worker = cc.thread.unwrap_or(4);
let meta = load_meta(cc.clone(), ip);
@@ -108,12 +128,13 @@ where
.map(|_index| {
let cc = cc.clone();
let meta = meta.clone();
let tx = slowlog_tx.clone();
Builder::new()
.name(cc.name.clone())
.spawn(move || {
thread_incr();
meta_init(meta);
spawn_fn(cc);
spawn_fn(cc, tx);
})
.expect("fail to spawn worker thread")
})
176 changes: 175 additions & 1 deletion src/metrics/slowlog.rs
Original file line number Diff line number Diff line change
@@ -1 +1,175 @@
//! slowlog feature mod
use std::fs::{self, File, OpenOptions};
use std::io::{Write, BufWriter};
use std::time::{Duration, Instant};

use super::thread_incr;

use serde::{Deserialize, Serialize};
use serde_json;
use tokio::timer::Delay;
use futures::{Async, Future, Stream};
use futures::sync::mpsc::Receiver;
use tokio::runtime::current_thread;

const BUFF_SIZE: usize = 512usize;
const FLUSH_INTERVAL: u64 = 2; // seconds

#[derive(Serialize, Deserialize)]
pub struct Entry {
pub cluster: String,
pub cmd: String,
pub start: String,
pub total_dur: u128,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

上128干啥。。。

pub remote_dur: u128,
pub subs: Option<Vec<Entry>>,
}

pub struct SlowlogHandle<I>
where
I: Stream<Item = Entry, Error = ()>,
{
input: I,

file_path: String,
file_size: u64,
max_file_size: u64,
file_bakckup: u8,

bw: BufWriter<File>,
buff_size: usize,

delay: Delay,
}

impl<I> SlowlogHandle<I>
where
I: Stream<Item = Entry, Error = ()>,
{
pub fn new(
file_path: String,
max_file_size: u64,
file_bakckup: u8,
input: I,
) -> SlowlogHandle<I> {
let (file_size, bw) = Self::open(file_path.clone());

let delay = Delay::new(Instant::now());

SlowlogHandle {
file_path,
file_size,
max_file_size: max_file_size*1024*1024,
file_bakckup,
bw,
buff_size: 0usize,
input,
delay,
}
}

fn open(file_path: String) -> (u64, BufWriter<File>) {
let file = OpenOptions::new()
.create(true)
.append(true)
.open(file_path)
.expect("fail to open slowlog file");

let file_size = file
.metadata()
.unwrap()
.len();

(file_size, BufWriter::new(file))
}

fn try_rotate(&mut self) {
if self.max_file_size <= 0 {
return
}

if self.file_size < self.max_file_size {
return
}

if self.file_bakckup > 0 {
(0..self.file_bakckup)
.for_each(|n|{
let mut from = format!("{}.{}", self.file_path, self.file_bakckup-n-1);
let to = format!("{}.{}", self.file_path, self.file_bakckup-n);
if self.file_bakckup-n-1 == 0 {
from = self.file_path.clone();
}
let _ = fs::rename(from, to);
});

let (file_size, bw) = Self::open(self.file_path.clone());
self.file_size = file_size;
self.bw = bw;
}
}

fn try_recv(&mut self) {
loop {
match self.input.poll() {
Ok(Async::Ready(entry)) => {
if let Ok(mut entry) = serde_json::to_string(&entry) {
entry.push_str("\r\n");
if let Ok(size) = self.bw.write(entry.as_bytes()) {
self.buff_size += size;
self.file_size += size as u64;
}
}
},
Ok(Async::NotReady) => return,
Err(_) => return,
};
}
}

fn try_flush(&mut self) {
if self.buff_size > BUFF_SIZE || self.delay.is_elapsed() {
match self.bw.flush() {
Ok(_) => {
self.buff_size = 0;
self.delay.reset(Instant::now() + Duration::from_secs(FLUSH_INTERVAL));
self.try_rotate();
},
Err(err) => {
error!("fail to save slowlog: {}", err);
},
}
return
}
}
}

impl<I> Future for SlowlogHandle<I>
where
I: Stream<Item = Entry, Error = ()>,
{
type Item = ();
type Error = ();

fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
let _ = self.delay.poll();
self.try_recv();
if self.buff_size > 0 {
self.try_flush();
}
return Ok(Async::NotReady)
}
}

pub fn run(
file_path: String,
max_file_size: u64,
file_bakckup: u8,
input: Receiver<Entry>,
)
{
thread_incr();
current_thread::block_on_all(
SlowlogHandle::new(file_path, max_file_size, file_bakckup, input).map_err(|_| error!("fail to init slowlog handle")),
).unwrap();

}
Loading