From c55f6ecedc2a4d62723631288287d8f6d1eefa31 Mon Sep 17 00:00:00 2001 From: yuyiming Date: Mon, 20 Jul 2020 10:55:08 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=85=A2=E6=97=A5?= =?UTF-8?q?=E5=BF=97=E8=AE=B0=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 6 +- Cargo.toml | 2 + src/cli.yml | 14 ++- src/com.rs | 8 ++ src/lib.rs | 33 +++++-- src/metrics/slowlog.rs | 171 +++++++++++++++++++++++++++++++++- src/metrics/tracker.rs | 2 +- src/protocol/mc.rs | 56 ++++++++++- src/protocol/mc/msg.rs | 4 + src/protocol/redis.rs | 99 +++++++++++++++++++- src/protocol/redis/resp.rs | 4 + src/proxy/cluster.rs | 14 +-- src/proxy/cluster/fetcher.rs | 1 + src/proxy/cluster/front.rs | 13 ++- src/proxy/cluster/init.rs | 9 +- src/proxy/standalone.rs | 15 ++- src/proxy/standalone/front.rs | 15 ++- 17 files changed, 433 insertions(+), 33 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ff05e11..af2c818 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -321,6 +321,7 @@ dependencies = [ "btoi", "byteorder", "bytes", + "chrono", "clap", "criterion", "env_logger", @@ -340,6 +341,7 @@ dependencies = [ "rayon", "serde", "serde_derive", + "serde_json", "slab", "sysinfo", "tokio", @@ -516,9 +518,9 @@ checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" [[package]] name = "chrono" -version = "0.4.11" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80094f509cf8b5ae86a4966a39b3ff66cd7e2a3e594accec3743ff3fabeab5b2" +checksum = "c74d84029116787153e02106bf53e66828452a4b325cc8652b788b5967c0a0b6" dependencies = [ "num-integer", "num-traits", diff --git a/Cargo.toml b/Cargo.toml index 0eda3f0..6fd798f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,7 @@ futures= "0.1" toml="0.4" serde="1.0" serde_derive="1.0" +serde_json = "1.0" itoa= "0.4.4" net2="0.2" md5="0.6" @@ -59,6 +60,7 @@ actix-rt = "0.2.5" sysinfo = "0.9.5" rayon = "1.2.0" hotwatch = "0.4.3" +chrono = "0.4.13" [profile.release] debug = true diff --git a/src/cli.yml b/src/cli.yml index 5df8dfd..c5caf48 100644 --- a/src/cli.yml +++ b/src/cli.yml @@ -6,7 +6,6 @@ args: value_name: FILE help: Sets a custom config file takes_value: true - required: true - ip: short: i long: ip @@ -23,3 +22,16 @@ args: short: r long: reload help: enable reload feature for standalone proxy mode. + - slowlog-slower-than: + long: slowlog-slower-than + help: slowlog-slower-than is the microseconds which slowlog must slower than. DEFAULT 100ms + - slowlog-file-path: + long: slowlog-file_path + help: the file where slowlog output. + - slowlog-file-size: + long: slowlog-file-size + help: single slowlog file size in Mb. DEFAULT 200Mb + - slowlog-file-backup: + long: slowlog-file-backup + help: single slowlog file backup. DEFAULT 3 + diff --git a/src/com.rs b/src/com.rs index 4aaac58..e2cb54c 100644 --- a/src/com.rs +++ b/src/com.rs @@ -252,6 +252,8 @@ pub struct ClusterConfig { // dead option: always 1 pub node_connections: Option, + + pub slowlog_slow_than: Option, } impl ClusterConfig { @@ -265,6 +267,12 @@ impl ClusterConfig { pub(crate) fn fetch_interval_ms(&self) -> u64 { self.fetch_interval.unwrap_or(DEFAULT_FETCH_INTERVAL_MS) } + + pub fn set_slowlog_slow_than(&mut self, value: u128) { + if self.slowlog_slow_than.is_none() { + self.slowlog_slow_than.replace(value); + } + } } #[cfg(windows)] diff --git a/src/lib.rs b/src/lib.rs index bf6b987..f131223 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -31,12 +31,20 @@ use failure::Error; use com::meta::{load_meta, meta_init}; use com::ClusterConfig; use metrics::thread_incr; +use metrics::slowlog::{self, Entry}; + use std::time::Duration; +use futures::sync::mpsc::{channel, Sender}; + pub fn run() -> Result<(), Error> { env_logger::init(); let yaml = load_yaml!("cli.yml"); let matches = App::from_yaml(yaml).version(ASTER_VERSION).get_matches(); + if matches.is_present("version") { + println!("{}", ASTER_VERSION); + return Ok(()) + } let config = matches.value_of("config").unwrap_or("default.toml"); let watch_file = config.to_string(); let ip = matches.value_of("ip").map(|x| x.to_string()); @@ -50,8 +58,12 @@ pub fn run() -> Result<(), Error> { ); crate::proxy::standalone::reload::init(&watch_file, cfg.clone(), enable_reload)?; + let (tx, rx) = channel(1024); + + let slowlog_slower_than = matches.value_of("slowlog-slower-than").unwrap_or("10").parse::().unwrap(); + let mut ths = Vec::new(); - for cluster in cfg.clusters.into_iter() { + for mut cluster in cfg.clusters.into_iter() { if cluster.servers.is_empty() { warn!( "fail to running cluster {} in addr {} due filed `servers` is empty", @@ -68,23 +80,31 @@ pub fn run() -> Result<(), Error> { continue; } + cluster.set_slowlog_slow_than(slowlog_slower_than); + info!( "starting aster cluster {} in addr {}", cluster.name, cluster.listen_addr ); + match cluster.cache_type { com::CacheType::RedisCluster => { - let jhs = spwan_worker(&cluster, ip.clone(), proxy::cluster::spawn); + let jhs = spwan_worker(&cluster, ip.clone(), tx.clone(), proxy::cluster::spawn); ths.extend(jhs); } _ => { - let jhs = spwan_worker(&cluster, ip.clone(), proxy::standalone::spawn); + let jhs = spwan_worker(&cluster, ip.clone(), tx.clone(), proxy::standalone::spawn); ths.extend(jhs); } } } + let slowlog_file_path = matches.value_of("slowlog-file-path").unwrap_or("aster-slowlog.log").to_string(); + let slowlog_file_size = matches.value_of("slowlog-file-size").unwrap_or("200").parse::().unwrap(); + let slowlog_file_backup = matches.value_of("slowlog-file-backup").unwrap_or("3").parse::().unwrap(); + thread::spawn(move || slowlog::run(slowlog_file_path, slowlog_file_size, slowlog_file_backup, rx)); + { let port_str = matches.value_of("metrics").unwrap_or("2110"); let port = port_str.parse::().unwrap_or(2110); @@ -97,9 +117,9 @@ pub fn run() -> Result<(), Error> { Ok(()) } -fn spwan_worker(cc: &ClusterConfig, ip: Option, spawn_fn: T) -> Vec> +fn spwan_worker(cc: &ClusterConfig, ip: Option, slowlog_tx: Sender, spawn_fn: T) -> Vec> where - T: Fn(ClusterConfig) + Copy + Send + 'static, + T: Fn(ClusterConfig, Sender) + Copy + Send + 'static, { let worker = cc.thread.unwrap_or(4); let meta = load_meta(cc.clone(), ip); @@ -108,12 +128,13 @@ where .map(|_index| { let cc = cc.clone(); let meta = meta.clone(); + let tx = slowlog_tx.clone(); Builder::new() .name(cc.name.clone()) .spawn(move || { thread_incr(); meta_init(meta); - spawn_fn(cc); + spawn_fn(cc, tx); }) .expect("fail to spawn worker thread") }) diff --git a/src/metrics/slowlog.rs b/src/metrics/slowlog.rs index decdfee..1b9428d 100644 --- a/src/metrics/slowlog.rs +++ b/src/metrics/slowlog.rs @@ -1 +1,170 @@ -//! slowlog feature mod +use std::fs::{self, File, OpenOptions}; +use std::io::{Write, BufWriter}; +use std::time::{Duration, Instant}; + +use super::thread_incr; + +use serde::{Deserialize, Serialize}; +use serde_json; +use tokio::timer::Delay; +use futures::{Async, Future, Stream}; +use futures::sync::mpsc::Receiver; +use tokio::runtime::current_thread; + +const BUFF_SIZE: usize = 200usize; +const FLUSH_INTERVAL: u64 = 1; // seconds + +#[derive(Serialize, Deserialize)] +pub struct Entry { + pub cluster: String, + pub cmd: String, + pub start: String, + pub total_dur: u128, + pub remote_dur: u128, + pub subs: Option>, +} + +pub struct SlowlogHandle +where + I: Stream, +{ + input: I, + + file_path: String, + file_size: u32, // MB + file_bakckup: u8, + + bw: BufWriter, + buff_size: usize, + + delay: Delay, +} + +impl SlowlogHandle +where + I: Stream, +{ + pub fn new( + file_path: String, + file_size: u32, + file_bakckup: u8, + input: I, + ) -> SlowlogHandle { + let file = OpenOptions::new() + .create(true) + .append(true) + .open(file_path.clone()) + .unwrap(); + let bw = BufWriter::new(file); + + let delay = Delay::new(Instant::now()); + + SlowlogHandle { + file_path, + file_size, + file_bakckup, + bw, + buff_size: 0usize, + input, + delay, + } + } + + fn try_rotate(&mut self) { + let file_size = match fs::metadata(self.file_path.clone()) { + Ok(meta) => meta.len(), + Err(_) => return, + }; + + if self.file_size <= 0 { + return + } + + if file_size < (self.file_size * 1024 * 1024) as u64 { + return + } + + if self.file_bakckup > 0 { + (0..self.file_bakckup) + .for_each(|n|{ + let mut from = format!("{}.{}", self.file_path, self.file_bakckup-n-1); + let to = format!("{}.{}", self.file_path, self.file_bakckup-n); + if self.file_bakckup-n-1 == 0 { + from = self.file_path.clone(); + } + let _ = fs::rename(from, to); + }); + + self.bw = BufWriter::new( + OpenOptions::new() + .create(true) + .append(true) + .open(self.file_path.clone()) + .unwrap() + ); + } + } + + fn try_recv(&mut self) { + loop { + match self.input.poll() { + Ok(Async::Ready(entry)) => { + if let Ok(entry) = serde_json::to_string(&entry) { + let _ = self.bw.write(entry.as_bytes()); + let _ = self.bw.write(b"\r\n"); + self.buff_size += 1; + } + }, + Ok(Async::NotReady) => return, + Err(_) => return, + }; + } + } + + fn try_flush(&mut self) { + if self.buff_size > BUFF_SIZE || self.delay.is_elapsed() { + match self.bw.flush() { + Ok(_) => { + self.buff_size = 0; + self.delay.reset(Instant::now() + Duration::from_secs(FLUSH_INTERVAL)); + self.try_rotate(); + }, + Err(err) => { + error!("fail to save slowlog: {}", err); + }, + } + return + } + } +} + +impl Future for SlowlogHandle +where + I: Stream, +{ + type Item = (); + type Error = (); + + fn poll(&mut self) -> Result, Self::Error> { + let _ = self.delay.poll(); + self.try_recv(); + if self.buff_size > 0 { + self.try_flush(); + } + return Ok(Async::NotReady) + } +} + +pub fn run( + file_path: String, + file_size: u32, + file_bakckup: u8, + input: Receiver, +) +{ + thread_incr(); + current_thread::block_on_all( + SlowlogHandle::new(file_path, file_size, file_bakckup, input).map_err(|_| error!("fail to init slowlog handle")), + ).unwrap(); + +} \ No newline at end of file diff --git a/src/metrics/tracker.rs b/src/metrics/tracker.rs index 2c4b6e0..717f43d 100644 --- a/src/metrics/tracker.rs +++ b/src/metrics/tracker.rs @@ -3,7 +3,7 @@ use prometheus::Histogram; use std::time::Instant; pub struct Tracker { - start: Instant, + pub start: Instant, hist: Histogram, } diff --git a/src/protocol/mc.rs b/src/protocol/mc.rs index 0f85dae..f988655 100644 --- a/src/protocol/mc.rs +++ b/src/protocol/mc.rs @@ -2,17 +2,20 @@ use bytes::BytesMut; use futures::task::Task; use tokio::codec::{Decoder, Encoder}; +use chrono::{Duration, Local}; use crate::metrics::*; +use crate::metrics::slowlog::Entry; -use crate::com::AsError; +use crate::com::{AsError, ClusterConfig}; use crate::protocol::{CmdFlags, CmdType, IntoReply}; use crate::proxy::standalone::Request; use crate::utils::notify::Notify; use crate::utils::trim_hash_tag; -use std::cell::RefCell; +use std::cell::{Ref, RefCell}; use std::rc::Rc; +use std::time; pub mod msg; pub use self::msg::Message; @@ -55,6 +58,7 @@ impl Request for Cmd { total_tracker: None, remote_tracker: None, + remote_dur: None, }; let mut notify = Notify::empty(); notify.set_expect(1); @@ -105,7 +109,7 @@ impl Request for Cmd { let reply = t.into_reply(); self.cmd.borrow_mut().set_reply(reply); } - + fn set_error(&self, t: &AsError) { let reply: Message = t.into_reply(); self.cmd.borrow_mut().set_error(reply); @@ -120,6 +124,44 @@ impl Request for Cmd { let timer = remote_tracker(cluster); self.cmd.borrow_mut().remote_tracker.replace(timer); } + + fn slowlog_entry(&self, cc: Ref) -> Option { + let cmd = self.cmd.borrow(); + let total_dur = match &cmd.total_tracker { + Some(tracker) => tracker.start.elapsed().as_micros(), + None => return None, + }; + + match cc.slowlog_slow_than { + Some(slow_than) => { + if total_dur < slow_than { + return None + } + }, + None => return None, + }; + + let start = (Local::now() - Duration::microseconds(total_dur as i64)).format("%Y-%m-%d %H:%M:%S").to_string(); + + let remote_dur = match cmd.remote_dur { + Some(dur) => dur.as_micros(), + None => return None, + }; + + let subs = match &cmd.subs { + Some(cmds) => Some(cmds.iter().map(|x|x.cmd.borrow().req.str_data()).collect()), + None => None, + }; + + Some(Entry { + cluster: cc.name.clone(), + cmd: cmd.req.str_data(), + total_dur, + remote_dur, + start, + subs, + }) + } } impl Cmd { @@ -143,6 +185,7 @@ impl Cmd { total_tracker: None, remote_tracker: None, + remote_dur: None, }; Cmd { notify: notify.clone(), @@ -162,6 +205,7 @@ impl Cmd { total_tracker: None, remote_tracker: None, + remote_dur: None, }; Cmd { cmd: Rc::new(RefCell::new(command)), @@ -190,6 +234,8 @@ pub struct Command { total_tracker: Option, remote_tracker: Option, + + remote_dur: Option, } impl Command { @@ -213,7 +259,9 @@ impl Command { self.reply = Some(reply); self.set_done(); - let _ = self.remote_tracker.take(); + if let Some(tracker) = self.remote_tracker.take() { + self.remote_dur.replace(tracker.start.elapsed()); + } } pub fn set_error(&mut self, reply: Message) { diff --git a/src/protocol/mc/msg.rs b/src/protocol/mc/msg.rs index 827a78f..3ff05dd 100644 --- a/src/protocol/mc/msg.rs +++ b/src/protocol/mc/msg.rs @@ -725,6 +725,10 @@ impl Message { flags, })) } + + pub fn str_data(&self) -> String { + String::from_utf8(self.data.iter().map(|x|*x).collect::>()).unwrap_or("".to_string()) + } } #[cfg(test)] diff --git a/src/protocol/redis.rs b/src/protocol/redis.rs index b306963..37127bf 100644 --- a/src/protocol/redis.rs +++ b/src/protocol/redis.rs @@ -1,10 +1,11 @@ use bytes::{Bytes, BytesMut}; use futures::task::Task; use tokio::codec::{Decoder, Encoder}; +use chrono::{Duration, Local}; use crate::metrics::*; - -use crate::com::{meta, AsError}; +use crate::metrics::slowlog::Entry; +use crate::com::{meta, AsError, ClusterConfig}; use crate::protocol::IntoReply; use crate::protocol::{CmdFlags, CmdType}; use crate::proxy::standalone::Request; @@ -15,6 +16,7 @@ use std::cell::{Ref, RefCell, RefMut}; use std::rc::Rc; use std::u64; use std::collections::{BTreeMap, HashSet}; +use std::time; pub const SLOTS_COUNT: usize = 16384; @@ -70,6 +72,7 @@ impl Request for Cmd { total_tracker: None, remote_tracker: None, + remote_dur: None, }; cmd.into_cmd(notify) } @@ -133,6 +136,44 @@ impl Request for Cmd { let timer = remote_tracker(cluster); self.cmd.borrow_mut().remote_tracker.replace(timer); } + + fn slowlog_entry(&self, cc: Ref) -> Option { + let cmd = self.cmd.borrow(); + let total_dur = match &cmd.total_tracker { + Some(tracker) => tracker.start.elapsed().as_micros(), + None => return None, + }; + + match cc.slowlog_slow_than { + Some(slow_than) => { + if total_dur < slow_than { + return None + } + }, + None => return None, + }; + + let start = (Local::now() - Duration::microseconds(total_dur as i64)).format("%Y-%m-%d %H:%M:%S").to_string(); + + let remote_dur = match cmd.remote_dur { + Some(dur) => dur.as_micros(), + None => return None, + }; + + let subs = match &cmd.subs { + Some(cmds) => Some(cmds.iter().map(|x|x.borrow().req.str_data()).collect()), + None => None, + }; + + Some(Entry { + cluster: cc.name.clone(), + cmd: cmd.req.str_data(), + total_dur, + remote_dur, + start, + subs, + }) + } } impl Cmd { @@ -145,9 +186,48 @@ impl Cmd { let timer = remote_tracker(cluster); if self.cmd.borrow().remote_tracker.is_none() { self.cmd.borrow_mut().remote_tracker.replace(timer); + } } + pub fn slowlog_entry(&self, cc: Ref) -> Option { + let cmd = self.borrow(); + let total_dur = match &cmd.total_tracker { + Some(tracker) => tracker.start.elapsed().as_micros(), + None => return None, + }; + + match cc.slowlog_slow_than { + Some(slow_than) => { + if total_dur < slow_than { + return None + } + }, + None => return None, + }; + + let start = (Local::now() - Duration::microseconds(total_dur as i64)).format("%Y-%m-%d %H:%M:%S").to_string(); + + let remote_dur = match cmd.remote_dur { + Some(dur) => dur.as_micros(), + None => return None, + }; + + let subs = match &cmd.subs { + Some(cmds) => Some(cmds.iter().map(|x|x.borrow().req.str_data()).collect()), + None => None, + }; + + Some(Entry { + cluster: cc.name.clone(), + cmd: cmd.req.str_data(), + total_dur, + remote_dur, + start, + subs, + }) + } + pub fn incr_notify(&self, count: u16) { self.notify.fetch_add(count); } @@ -258,6 +338,7 @@ pub struct Command { total_tracker: Option, remote_tracker: Option, + remote_dur: Option, } const BYTES_JUSTOK: &[u8] = b"+OK\r\n"; @@ -430,7 +511,9 @@ impl Command { self.reply = Some(reply.into_reply()); self.set_done(); - let _ = self.remote_tracker.take(); + if let Some(tracker) = self.remote_tracker.take() { + self.remote_dur.replace(tracker.start.elapsed()); + } } fn set_done(&mut self) { @@ -535,6 +618,7 @@ impl Command { total_tracker: None, remote_tracker: None, + remote_dur: None, }; subs.push(subcmd.into_cmd(notify.clone())); @@ -550,6 +634,7 @@ impl Command { total_tracker: None, remote_tracker: None, + remote_dur: None, }; command.into_cmd(notify) } else { @@ -564,6 +649,7 @@ impl Command { total_tracker: None, remote_tracker: None, + remote_dur: None, }; let cmd = cmd.into_cmd(notify); cmd.set_reply(&AsError::RequestInlineWithMultiKeys); @@ -599,6 +685,7 @@ impl Command { total_tracker: None, remote_tracker: None, + remote_dur: None, }; subs.push(subcmd.into_cmd(notify.clone())); @@ -615,6 +702,7 @@ impl Command { total_tracker: None, remote_tracker: None, + remote_dur: None, }; cmd.into_cmd(notify) } else { @@ -629,6 +717,7 @@ impl Command { total_tracker: None, remote_tracker: None, + remote_dur: None, }; let cmd = cmd.into_cmd(notify); cmd.set_reply(&AsError::RequestInlineWithMultiKeys); @@ -665,6 +754,7 @@ impl From for Cmd { total_tracker: None, remote_tracker: None, + remote_dur: None, }; let cmd: Cmd = command.into_cmd(notify); cmd.set_reply(AsError::RequestNotSupport); @@ -692,6 +782,7 @@ impl From for Cmd { total_tracker: None, remote_tracker: None, + remote_dur: None, }; if ctype.is_ctrl() { if let Some(data) = msg.nth(COMMAND_POS) { @@ -768,6 +859,7 @@ pub fn new_read_only_cmd() -> Cmd { total_tracker: None, remote_tracker: None, + remote_dur: None, }; cmd.into_cmd(notify) } @@ -790,6 +882,7 @@ pub fn new_cluster_slots_cmd() -> Cmd { total_tracker: None, remote_tracker: None, + remote_dur: None, }; cmd.into_cmd(notify) } diff --git a/src/protocol/redis/resp.rs b/src/protocol/redis/resp.rs index c6da9dd..52e057c 100644 --- a/src/protocol/redis/resp.rs +++ b/src/protocol/redis/resp.rs @@ -405,6 +405,10 @@ impl Message { self.data.as_ref() } + pub fn str_data(&self) -> String { + String::from_utf8(self.data.iter().map(|x|*x).collect::>()).unwrap_or("".to_string()) + } + pub fn data(&self) -> Option<&[u8]> { let range = self.get_range(Some(&self.rtype)); range.map(|rg| &self.data.as_ref()[rg.begin()..rg.end()]) diff --git a/src/proxy/cluster.rs b/src/proxy/cluster.rs index 85286b8..11150cc 100644 --- a/src/proxy/cluster.rs +++ b/src/proxy/cluster.rs @@ -14,12 +14,14 @@ use crate::proxy::cluster::fetcher::SingleFlightTrigger; use crate::utils::crc::crc16; use crate::metrics::front_conn_incr; +use crate::metrics::slowlog::Entry; // use failure::Error; use futures::future::ok; use futures::future::*; use futures::task; use futures::unsync::mpsc::{channel, Receiver, Sender}; +use futures::sync::mpsc::Sender as SyncSender; use futures::AsyncSink; use futures::{Sink, Stream}; @@ -160,7 +162,7 @@ impl Cluster { current_thread::spawn(fetch); } - fn handle_front_conn(self: &Rc, sock: TcpStream) { + fn handle_front_conn(self: &Rc, sock: TcpStream, slowlog_tx: SyncSender) { let cluster_name = self.cc.borrow().name.clone(); sock.set_nodelay(true).unwrap_or_else(|err| { warn!( @@ -183,13 +185,13 @@ impl Cluster { front_conn_incr(&cluster_name); let codec = RedisHandleCodec {}; let (output, input) = codec.framed(sock).split(); - let fut = front::Front::new(client_str, self.clone(), input, output); + let fut = front::Front::new(client_str, self.clone(), input, output, slowlog_tx); current_thread::spawn(fut); } } impl Cluster { - pub(crate) fn run(cc: ClusterConfig, replica: ReplicaLayout) -> Result<(), AsError> { + pub(crate) fn run(cc: ClusterConfig, replica: ReplicaLayout, slowlog_tx: SyncSender) -> Result<(), AsError> { let addr = cc .listen_addr .parse::() @@ -218,7 +220,7 @@ impl Cluster { .incoming() .for_each(move |sock| { let cluster = cluster.clone(); - cluster.handle_front_conn(sock); + cluster.handle_front_conn(sock, slowlog_tx.clone()); Ok(()) }) .map_err(|err| { @@ -720,9 +722,9 @@ impl ConnBuilder { } } -pub(crate) fn spawn(cc: ClusterConfig) { +pub(crate) fn spawn(cc: ClusterConfig, slowlog_tx: SyncSender) { current_thread::block_on_all( - init::Initializer::new(cc).map_err(|err| error!("fail to init cluster due to {}", err)), + init::Initializer::new(cc, slowlog_tx).map_err(|err| error!("fail to init cluster due to {}", err)), ) .unwrap(); } diff --git a/src/proxy/cluster/fetcher.rs b/src/proxy/cluster/fetcher.rs index 95a15f9..5d33127 100644 --- a/src/proxy/cluster/fetcher.rs +++ b/src/proxy/cluster/fetcher.rs @@ -66,6 +66,7 @@ impl SingleFlightTrigger { fn trigger(&self) { let mut fetch = self.fetch.borrow_mut(); if fetch.start_send(TriggerBy::Error).is_ok() && fetch.poll_complete().is_ok() { + // TODO: self.latest = Instant::now(); info!("succeed trigger fetch process"); return; } diff --git a/src/proxy/cluster/front.rs b/src/proxy/cluster/front.rs index e08623c..f5fecca 100644 --- a/src/proxy/cluster/front.rs +++ b/src/proxy/cluster/front.rs @@ -2,9 +2,11 @@ use crate::com::AsError; use crate::protocol::redis::Cmd; use crate::proxy::cluster::fetcher::TriggerBy; use crate::proxy::cluster::Cluster; +use crate::metrics::slowlog::Entry; use futures::task; use futures::{Async, AsyncSink, Future, Sink, Stream}; +use futures::sync::mpsc::Sender; use std::collections::VecDeque; use std::rc::Rc; @@ -32,6 +34,8 @@ where waitq: VecDeque, state: State, + + slowlog_tx: Sender, } impl Front @@ -39,7 +43,7 @@ where I: Stream, O: Sink, { - pub fn new(client: String, cluster: Rc, input: I, output: O) -> Front { + pub fn new(client: String, cluster: Rc, input: I, output: O, slowlog_tx: Sender) -> Front { Front { cluster, client, @@ -48,6 +52,7 @@ where sendq: VecDeque::with_capacity(MAX_BATCH_SIZE), waitq: VecDeque::with_capacity(MAX_BATCH_SIZE), state: State::Running, + slowlog_tx, } } @@ -69,6 +74,12 @@ where self.cluster.trigger_fetch(TriggerBy::Error); } + if let Some(entry) = cmd.slowlog_entry(self.cluster.cc.borrow()) { + if let Err(err) = self.slowlog_tx.start_send(entry) { + error!("fail to record slowlog: {}", err); + } + } + match self.output.start_send(cmd) { Ok(AsyncSink::Ready) => { count += 1; diff --git a/src/proxy/cluster/init.rs b/src/proxy/cluster/init.rs index f5cf03b..ea515fb 100644 --- a/src/proxy/cluster/init.rs +++ b/src/proxy/cluster/init.rs @@ -1,11 +1,14 @@ use futures::task; use futures::unsync::mpsc::{channel, Sender}; +use futures::sync::mpsc::Sender as SyncSender; use futures::{Async, AsyncSink, Future, Sink}; use crate::com::AsError; use crate::com::ClusterConfig; use crate::protocol::redis::{new_cluster_slots_cmd, slots_reply_to_replicas, Cmd}; use crate::proxy::cluster::{Cluster, ConnBuilder}; +use crate::metrics::slowlog::Entry; + enum State { Pending, @@ -19,14 +22,16 @@ pub struct Initializer { cc: ClusterConfig, current: usize, state: State, + slowlog_tx: SyncSender, } impl Initializer { - pub fn new(cc: ClusterConfig) -> Initializer { + pub fn new(cc: ClusterConfig, slowlog_tx: SyncSender) -> Initializer { Initializer { cc, current: 0, state: State::Pending, + slowlog_tx, } } } @@ -101,7 +106,7 @@ impl Future for Initializer { } State::Done(cmd) => match slots_reply_to_replicas(cmd.clone()) { Ok(Some(replica)) => { - let cluster = Cluster::run(self.cc.clone(), replica); + let cluster = Cluster::run(self.cc.clone(), replica, self.slowlog_tx.clone()); match cluster { Ok(_) => { info!("succeed to create cluster {}", self.cc.name); diff --git a/src/proxy/standalone.rs b/src/proxy/standalone.rs index 70c9e8e..bfe0b06 100644 --- a/src/proxy/standalone.rs +++ b/src/proxy/standalone.rs @@ -9,6 +9,7 @@ use futures::future::ok; use futures::lazy; use futures::task::Task; use futures::unsync::mpsc::{channel, Sender}; +use futures::sync::mpsc::Sender as SyncSender; use futures::{AsyncSink, Future, Sink, Stream}; use tokio::codec::{Decoder, Encoder}; @@ -23,6 +24,7 @@ use std::net::SocketAddr; use std::rc::Rc; use std::time::Duration; use std::process; +use std::cell::Ref; use crate::protocol::{mc, redis}; @@ -32,6 +34,7 @@ use crate::com::AsError; use crate::com::{create_reuse_port_listener, set_read_write_timeout}; use crate::com::{CacheType, ClusterConfig, CODE_PORT_IN_USE}; use crate::protocol::IntoReply; +use crate::metrics::slowlog::Entry; use fnv::fnv1a64; use ketama::HashRing; @@ -69,6 +72,8 @@ pub trait Request: Clone { fn set_reply>(&self, t: R); fn set_error(&self, t: &AsError); + + fn slowlog_entry(&self, cc: Ref) -> Option; } pub struct Cluster { @@ -84,7 +89,7 @@ pub struct Cluster { } impl Cluster { - pub(crate) fn run(cc: ClusterConfig) -> Result<(), AsError> { + pub(crate) fn run(cc: ClusterConfig, slowlog_tx: SyncSender) -> Result<(), AsError> { let addr = cc .listen_addr .parse::() @@ -174,7 +179,7 @@ impl Cluster { let (output, input) = codec.framed(sock).split(); front_conn_incr(&cluster.cc.borrow().name); - let fut = front::Front::new(client_str, cluster_ref, input, output); + let fut = front::Front::new(client_str, cluster_ref, input, output, slowlog_tx.clone()); current_thread::spawn(fut); Ok(()) }) @@ -621,10 +626,10 @@ impl ServerLine { } } -pub(crate) fn spawn(cc: ClusterConfig) { +pub(crate) fn spawn(cc: ClusterConfig, slowlog_tx: SyncSender) { match cc.cache_type { - CacheType::Redis => Cluster::::run(cc).unwrap(), - CacheType::Memcache | CacheType::MemcacheBinary => Cluster::::run(cc).unwrap(), + CacheType::Redis => Cluster::::run(cc, slowlog_tx).unwrap(), + CacheType::Memcache | CacheType::MemcacheBinary => Cluster::::run(cc, slowlog_tx).unwrap(), _ => unreachable!(), }; } diff --git a/src/proxy/standalone/front.rs b/src/proxy/standalone/front.rs index fb80bbe..03e1068 100644 --- a/src/proxy/standalone/front.rs +++ b/src/proxy/standalone/front.rs @@ -1,6 +1,8 @@ use crate::com::AsError; use futures::task; use futures::{Async, AsyncSink, Future, Sink, Stream}; +use futures::sync::mpsc::Sender; + use std::collections::VecDeque; use std::rc::Rc; @@ -8,6 +10,7 @@ use crate::proxy::standalone::Cluster; use crate::proxy::standalone::Request; use crate::metrics::front_conn_decr; +use crate::metrics::slowlog::Entry; const MAX_BATCH_SIZE: usize = 2048; @@ -33,6 +36,8 @@ where sendq: VecDeque, waitq: VecDeque, state: State, + + slowlog_tx: Sender, } impl Front @@ -41,7 +46,7 @@ where I: Stream, O: Sink, { - pub fn new(client: String, cluster: Rc>, input: I, output: O) -> Front { + pub fn new(client: String, cluster: Rc>, input: I, output: O, slowlog_tx: Sender) -> Front { Front { cluster, client, @@ -50,6 +55,7 @@ where sendq: VecDeque::with_capacity(MAX_BATCH_SIZE), waitq: VecDeque::with_capacity(MAX_BATCH_SIZE), state: State::Running, + slowlog_tx, } } fn try_reply(&mut self) -> Result, AsError> { @@ -63,6 +69,13 @@ where self.waitq.push_front(cmd); break; } + + if let Some(entry) = cmd.slowlog_entry(self.cluster.cc.borrow()) { + if let Err(err) = self.slowlog_tx.start_send(entry) { + error!("fail to record slowlog: {}", err); + } + } + match self.output.start_send(cmd) { Ok(AsyncSink::Ready) => { count += 1; From 82b014aaa68c6017496cdf35ee28de1fe654c6b6 Mon Sep 17 00:00:00 2001 From: yuyiming Date: Fri, 31 Jul 2020 10:46:54 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=85=A2=E6=97=A5?= =?UTF-8?q?=E5=BF=97=E8=AE=B0=E5=BD=95=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cli.yml | 8 ++--- src/com.rs | 4 +-- src/lib.rs | 6 ++-- src/metrics/slowlog.rs | 67 +++++++++++++++++++++------------------- src/protocol/mc.rs | 35 +++++++++++++++------ src/protocol/redis.rs | 70 +++++++++++++++++++++++++++++++----------- 6 files changed, 123 insertions(+), 67 deletions(-) diff --git a/src/cli.yml b/src/cli.yml index c5caf48..8a24fe4 100644 --- a/src/cli.yml +++ b/src/cli.yml @@ -24,13 +24,13 @@ args: help: enable reload feature for standalone proxy mode. - slowlog-slower-than: long: slowlog-slower-than - help: slowlog-slower-than is the microseconds which slowlog must slower than. DEFAULT 100ms + help: slowlog-slower-than is the microseconds which slowlog must slower than. DEFAULT 100_000 - slowlog-file-path: long: slowlog-file_path help: the file where slowlog output. - - slowlog-file-size: - long: slowlog-file-size - help: single slowlog file size in Mb. DEFAULT 200Mb + - slowlog-max-file-size: + long: slowlog-max-file-size + help: max size in Mb for single slowlog file. DEFAULT 200Mb - slowlog-file-backup: long: slowlog-file-backup help: single slowlog file backup. DEFAULT 3 diff --git a/src/com.rs b/src/com.rs index 4f65665..90e04bb 100644 --- a/src/com.rs +++ b/src/com.rs @@ -256,7 +256,7 @@ pub struct ClusterConfig { // dead option: always 1 pub node_connections: Option, - pub slowlog_slow_than: Option, + pub slowlog_slow_than: Option, } impl ClusterConfig { @@ -271,7 +271,7 @@ impl ClusterConfig { self.fetch_interval.unwrap_or(DEFAULT_FETCH_INTERVAL_MS) } - pub fn set_slowlog_slow_than(&mut self, value: u128) { + pub fn set_slowlog_slow_than(&mut self, value: u64) { if self.slowlog_slow_than.is_none() { self.slowlog_slow_than.replace(value); } diff --git a/src/lib.rs b/src/lib.rs index f131223..e2bd91c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -60,7 +60,7 @@ pub fn run() -> Result<(), Error> { let (tx, rx) = channel(1024); - let slowlog_slower_than = matches.value_of("slowlog-slower-than").unwrap_or("10").parse::().unwrap(); + let slowlog_slower_than = matches.value_of("slowlog-slower-than").unwrap_or("100000").parse::().unwrap(); let mut ths = Vec::new(); for mut cluster in cfg.clusters.into_iter() { @@ -101,9 +101,9 @@ pub fn run() -> Result<(), Error> { } let slowlog_file_path = matches.value_of("slowlog-file-path").unwrap_or("aster-slowlog.log").to_string(); - let slowlog_file_size = matches.value_of("slowlog-file-size").unwrap_or("200").parse::().unwrap(); + let slowlog_max_file_size = matches.value_of("slowlog-max-file-size").unwrap_or("200").parse::().unwrap(); let slowlog_file_backup = matches.value_of("slowlog-file-backup").unwrap_or("3").parse::().unwrap(); - thread::spawn(move || slowlog::run(slowlog_file_path, slowlog_file_size, slowlog_file_backup, rx)); + thread::spawn(move || slowlog::run(slowlog_file_path, slowlog_max_file_size, slowlog_file_backup, rx)); { let port_str = matches.value_of("metrics").unwrap_or("2110"); diff --git a/src/metrics/slowlog.rs b/src/metrics/slowlog.rs index 1b9428d..5646d6e 100644 --- a/src/metrics/slowlog.rs +++ b/src/metrics/slowlog.rs @@ -11,8 +11,8 @@ use futures::{Async, Future, Stream}; use futures::sync::mpsc::Receiver; use tokio::runtime::current_thread; -const BUFF_SIZE: usize = 200usize; -const FLUSH_INTERVAL: u64 = 1; // seconds +const BUFF_SIZE: usize = 512usize; +const FLUSH_INTERVAL: u64 = 2; // seconds #[derive(Serialize, Deserialize)] pub struct Entry { @@ -21,7 +21,7 @@ pub struct Entry { pub start: String, pub total_dur: u128, pub remote_dur: u128, - pub subs: Option>, + pub subs: Option>, } pub struct SlowlogHandle @@ -31,7 +31,8 @@ where input: I, file_path: String, - file_size: u32, // MB + file_size: u64, + max_file_size: u64, file_bakckup: u8, bw: BufWriter, @@ -46,22 +47,18 @@ where { pub fn new( file_path: String, - file_size: u32, + max_file_size: u64, file_bakckup: u8, input: I, ) -> SlowlogHandle { - let file = OpenOptions::new() - .create(true) - .append(true) - .open(file_path.clone()) - .unwrap(); - let bw = BufWriter::new(file); + let (file_size, bw) = Self::open(file_path.clone()); let delay = Delay::new(Instant::now()); SlowlogHandle { file_path, file_size, + max_file_size: max_file_size*1024*1024, file_bakckup, bw, buff_size: 0usize, @@ -70,17 +67,27 @@ where } } - fn try_rotate(&mut self) { - let file_size = match fs::metadata(self.file_path.clone()) { - Ok(meta) => meta.len(), - Err(_) => return, - }; + fn open(file_path: String) -> (u64, BufWriter) { + let file = OpenOptions::new() + .create(true) + .append(true) + .open(file_path) + .expect("fail to open slowlog file"); - if self.file_size <= 0 { + let file_size = file + .metadata() + .unwrap() + .len(); + + (file_size, BufWriter::new(file)) + } + + fn try_rotate(&mut self) { + if self.max_file_size <= 0 { return } - if file_size < (self.file_size * 1024 * 1024) as u64 { + if self.file_size < self.max_file_size { return } @@ -95,13 +102,9 @@ where let _ = fs::rename(from, to); }); - self.bw = BufWriter::new( - OpenOptions::new() - .create(true) - .append(true) - .open(self.file_path.clone()) - .unwrap() - ); + let (file_size, bw) = Self::open(self.file_path.clone()); + self.file_size = file_size; + self.bw = bw; } } @@ -109,10 +112,12 @@ where loop { match self.input.poll() { Ok(Async::Ready(entry)) => { - if let Ok(entry) = serde_json::to_string(&entry) { - let _ = self.bw.write(entry.as_bytes()); - let _ = self.bw.write(b"\r\n"); - self.buff_size += 1; + if let Ok(mut entry) = serde_json::to_string(&entry) { + entry.push_str("\r\n"); + if let Ok(size) = self.bw.write(entry.as_bytes()) { + self.buff_size += size; + self.file_size += size as u64; + } } }, Ok(Async::NotReady) => return, @@ -157,14 +162,14 @@ where pub fn run( file_path: String, - file_size: u32, + max_file_size: u64, file_bakckup: u8, input: Receiver, ) { thread_incr(); current_thread::block_on_all( - SlowlogHandle::new(file_path, file_size, file_bakckup, input).map_err(|_| error!("fail to init slowlog handle")), + SlowlogHandle::new(file_path, max_file_size, file_bakckup, input).map_err(|_| error!("fail to init slowlog handle")), ).unwrap(); } \ No newline at end of file diff --git a/src/protocol/mc.rs b/src/protocol/mc.rs index c7aa88c..7b66b45 100644 --- a/src/protocol/mc.rs +++ b/src/protocol/mc.rs @@ -135,7 +135,7 @@ impl Request for Cmd { match cc.slowlog_slow_than { Some(slow_than) => { - if total_dur < slow_than { + if total_dur < slow_than as u128 { return None } }, @@ -144,14 +144,31 @@ impl Request for Cmd { let start = (Local::now() - Duration::microseconds(total_dur as i64)).format("%Y-%m-%d %H:%M:%S").to_string(); - let remote_dur = match cmd.remote_dur { - Some(dur) => dur.as_micros(), - None => return None, - }; - - let subs = match &cmd.subs { - Some(cmds) => Some(cmds.iter().map(|x|x.cmd.borrow().req.str_data()).collect()), - None => None, + let (subs, remote_dur) = match &cmd.subs { + Some(subs) => { + let subs = subs.iter() + .map(|x| { + let sub_cmd = x.cmd.borrow(); + let remote_dur = sub_cmd.remote_dur.map(|x|x.as_micros()).unwrap_or(0); + Entry { + cluster: cc.name.clone(), + cmd: sub_cmd.req.str_data(), + total_dur, + remote_dur, + start: start.clone(), + subs: None, + } + }) + .collect::>(); + (Some(subs), 0) + }, + None => { + let remote_dur = match cmd.remote_dur { + Some(dur) => dur.as_micros(), + None => return None, + }; + (None, remote_dur) + }, }; Some(Entry { diff --git a/src/protocol/redis.rs b/src/protocol/redis.rs index 13d134c..23d231f 100644 --- a/src/protocol/redis.rs +++ b/src/protocol/redis.rs @@ -147,7 +147,7 @@ impl Request for Cmd { match cc.slowlog_slow_than { Some(slow_than) => { - if total_dur < slow_than { + if total_dur < slow_than as u128 { return None } }, @@ -156,14 +156,31 @@ impl Request for Cmd { let start = (Local::now() - Duration::microseconds(total_dur as i64)).format("%Y-%m-%d %H:%M:%S").to_string(); - let remote_dur = match cmd.remote_dur { - Some(dur) => dur.as_micros(), - None => return None, - }; - - let subs = match &cmd.subs { - Some(cmds) => Some(cmds.iter().map(|x|x.borrow().req.str_data()).collect()), - None => None, + let (subs, remote_dur) = match &cmd.subs { + Some(subs) => { + let subs = subs.iter() + .map(|x| { + let sub_cmd = x.borrow(); + let remote_dur = sub_cmd.remote_dur.map(|x|x.as_micros()).unwrap_or(0); + Entry { + cluster: cc.name.clone(), + cmd: sub_cmd.req.str_data(), + total_dur, + remote_dur, + start: start.clone(), + subs: None, + } + }) + .collect::>(); + (Some(subs), 0) + }, + None => { + let remote_dur = match cmd.remote_dur { + Some(dur) => dur.as_micros(), + None => return None, + }; + (None, remote_dur) + }, }; Some(Entry { @@ -212,7 +229,7 @@ impl Cmd { match cc.slowlog_slow_than { Some(slow_than) => { - if total_dur < slow_than { + if total_dur < slow_than as u128 { return None } }, @@ -221,14 +238,31 @@ impl Cmd { let start = (Local::now() - Duration::microseconds(total_dur as i64)).format("%Y-%m-%d %H:%M:%S").to_string(); - let remote_dur = match cmd.remote_dur { - Some(dur) => dur.as_micros(), - None => return None, - }; - - let subs = match &cmd.subs { - Some(cmds) => Some(cmds.iter().map(|x|x.borrow().req.str_data()).collect()), - None => None, + let (subs, remote_dur) = match &cmd.subs { + Some(subs) => { + let subs = subs.iter() + .map(|x| { + let sub_cmd = x.borrow(); + let remote_dur = sub_cmd.remote_dur.map(|x|x.as_micros()).unwrap_or(0); + Entry { + cluster: cc.name.clone(), + cmd: sub_cmd.req.str_data(), + total_dur, + remote_dur, + start: start.clone(), + subs: None, + } + }) + .collect::>(); + (Some(subs), 0) + }, + None => { + let remote_dur = match cmd.remote_dur { + Some(dur) => dur.as_micros(), + None => return None, + }; + (None, remote_dur) + }, }; Some(Entry {