From e47ef1f0d2666a0d36c8db3b56b81a082dbe6dcc Mon Sep 17 00:00:00 2001 From: JeremyHi Date: Tue, 20 Jun 2023 19:03:52 +0800 Subject: [PATCH] chore: minor fix (#1801) --- src/meta-client/src/client.rs | 4 +- src/meta-srv/src/service/store/etcd.rs | 51 +++++++++----------------- 2 files changed, 20 insertions(+), 35 deletions(-) diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 78a612f4a270..5842b431c48e 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -756,7 +756,7 @@ mod tests { let tc = new_client("test_batch_put").await; let mut req = BatchPutRequest::new(); - for i in 0..256 { + for i in 0..275 { req = req.add_kv( tc.key(&format!("key-{}", i)), format!("value-{}", i).into_bytes(), @@ -769,7 +769,7 @@ mod tests { let req = RangeRequest::new().with_prefix(tc.key("key-")); let res = tc.client.range(req).await; let kvs = res.unwrap().take_kvs(); - assert_eq!(256, kvs.len()); + assert_eq!(275, kvs.len()); } #[tokio::test] diff --git a/src/meta-srv/src/service/store/etcd.rs b/src/meta-srv/src/service/store/etcd.rs index 22834b355b3b..c97543137c52 100644 --- a/src/meta-srv/src/service/store/etcd.rs +++ b/src/meta-srv/src/service/store/etcd.rs @@ -32,6 +32,10 @@ use crate::error::Result; use crate::metrics::METRIC_META_KV_REQUEST; use crate::service::store::kv::{KvStore, KvStoreRef}; +// Maximum number of operations permitted in a transaction. +// The etcd default configuration's `--max-txn-ops` is 128. +// +// For more detail, see: https://etcd.io/docs/v3.5/op-guide/configuration/ const MAX_TXN_SIZE: usize = 128; pub struct EtcdStore { @@ -55,7 +59,7 @@ impl EtcdStore { Ok(Arc::new(Self { client })) } - async fn do_multi_txn(&self, mut txn_ops: Vec) -> Result> { + async fn do_multi_txn(&self, txn_ops: Vec) -> Result> { if txn_ops.len() < MAX_TXN_SIZE { // fast path let txn = Txn::new().and_then(txn_ops); @@ -68,36 +72,17 @@ impl EtcdStore { return Ok(vec![txn_res]); } - let mut txns = vec![]; - loop { - if txn_ops.is_empty() { - break; - } - - if txn_ops.len() < MAX_TXN_SIZE { - let txn = Txn::new().and_then(txn_ops); - txns.push(txn); - break; - } - - let part = txn_ops.drain(..MAX_TXN_SIZE).collect::>(); - let txn = Txn::new().and_then(part); - txns.push(txn); - } + let txns = txn_ops + .chunks(MAX_TXN_SIZE) + .map(|part| async move { + let txn = Txn::new().and_then(part); + self.client.kv_client().txn(txn).await + }) + .collect::>(); - let mut txn_responses = Vec::with_capacity(txns.len()); - // Considering the pressure on etcd, it would be more appropriate to execute txn in - // a serial manner. - for txn in txns { - let txn_res = self - .client - .kv_client() - .txn(txn) - .await - .context(error::EtcdFailedSnafu)?; - txn_responses.push(txn_res); - } - Ok(txn_responses) + futures::future::try_join_all(txns) + .await + .context(error::EtcdFailedSnafu) } } @@ -241,7 +226,7 @@ impl KvStore for EtcdStore { prev_kvs.push(KvPair::from_etcd_kv(prev_kv)); } } - _ => unreachable!(), // never get here + _ => unreachable!(), } } } @@ -283,7 +268,7 @@ impl KvStore for EtcdStore { prev_kvs.push(KvPair::from_etcd_kv(kv)); }); } - _ => unreachable!(), // never get here + _ => unreachable!(), } } } @@ -343,7 +328,7 @@ impl KvStore for EtcdStore { let prev_kv = match op_res { TxnOpResponse::Put(res) => res.prev_key().map(KvPair::from_etcd_kv), TxnOpResponse::Get(res) => res.kvs().first().map(KvPair::from_etcd_kv), - _ => unreachable!(), // never get here + _ => unreachable!(), }; let header = Some(ResponseHeader::success(cluster_id));