diff --git a/protocol/src/memcache/binary/mod.rs b/protocol/src/memcache/binary/mod.rs index 2e84e1813..d838db5e7 100644 --- a/protocol/src/memcache/binary/mod.rs +++ b/protocol/src/memcache/binary/mod.rs @@ -190,6 +190,12 @@ impl Protocol for MemcacheBinary { None } } + + // mc目前不需要统计error,因为mc的error基本都是get miss,del not-found这种,这种错误不需要统计 + #[inline] + fn metric_err(&self) -> bool { + false + } } impl MemcacheBinary { // 根据req构建response,status为mc协议status,共11种 diff --git a/protocol/src/parser.rs b/protocol/src/parser.rs index 97b1e7ddf..681f60c1c 100644 --- a/protocol/src/parser.rs +++ b/protocol/src/parser.rs @@ -128,6 +128,10 @@ pub trait Proto: Unpin + Clone + Send + Sync + 'static { fn max_tries(&self, _req_op: Operation) -> u8 { 1_u8 } + + fn metric_err(&self) -> bool { + true + } } pub trait RequestProcessor { diff --git a/stream/src/handler.rs b/stream/src/handler.rs index c809d8d9a..270c2d1b9 100644 --- a/stream/src/handler.rs +++ b/stream/src/handler.rs @@ -19,6 +19,7 @@ pub struct Handler<'r, Req, P, S> { s: S, parser: P, rtt: Metric, + err: Metric, host_metric: HostMetric, num: Number, @@ -63,12 +64,14 @@ where data.enable(); let name = path.clone(); let rtt = path.rtt("req"); + let err = path.qps("be_err"); Self { data, pending: VecDeque::with_capacity(31), s, parser, rtt, + err, host_metric: HostMetric::from(path), num: Number::default(), req_buf: Vec::with_capacity(4), @@ -142,8 +145,12 @@ where } let (req, start) = self.pending.pop_front().expect("take response"); self.num.rx(); - // 统计请求耗时。 + // 统计请求耗时、异常响应 self.rtt += start.elapsed(); + if self.parser.metric_err() && !cmd.ok() { + self.err += 1; + } + self.parser.check(&*req, &cmd); req.on_complete(cmd); continue; diff --git a/stream/src/pipeline.rs b/stream/src/pipeline.rs index f0e8490c2..ee56ae19e 100644 --- a/stream/src/pipeline.rs +++ b/stream/src/pipeline.rs @@ -172,10 +172,16 @@ where let op = ctx.request().operation(); if let Some(rsp) = response { - if ctx.is_write_back() && rsp.ok() { + let rsp_ok = rsp.ok(); + if ctx.is_write_back() && rsp_ok { ctx.async_write_back(&self.parser, rsp, self.top.exp_sec(), &mut self.metrics); self.async_pending.push_back(ctx); } + + // 不区分是否last,这样更精确的感知总的异常响应数量 + if self.parser.metric_err() && !rsp_ok { + *self.metrics.err() += 1; + } } // 数据写完,统计耗时。当前数据只写入到buffer中, diff --git a/tests/src/bkdrsub.rs b/tests/src/bkdrsub.rs index b312f0b49..732aa4aec 100644 --- a/tests/src/bkdrsub.rs +++ b/tests/src/bkdrsub.rs @@ -12,9 +12,15 @@ use sharding::{ fn bkdrsub_one() { let hasher = Hasher::from("bkdrsub"); - let key1 = "mfh15d#3940964349989430"; + let key1 = "otdn#1042015:carSubBrand_e4ab74c125e9e95edad691ffe9820118"; let hash1 = hasher.hash(&key1.as_bytes()); - println!("key:{}, hash:{}, idx:{}", key1, hash1, hash1 % 180); + + let shards = 1080; + let servers = vec!["padding".to_string(); shards]; + let dist = Distribute::from("modrange-8640", &servers); + let dist_idx = dist.index(hash1); + + println!("key:{}, hash:{}, idx:{}", key1, hash1, dist_idx); } // TODO 临时批量文件的hash、dist校验测试,按需打开