diff --git a/Cargo.lock b/Cargo.lock
index 0a833e30782ee..d971a9d982b87 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2646,6 +2646,12 @@ version = "2.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9cace84e55f07e7301bae1c519df89cdad8cc3cd868413d3fdbdeca9ff3db484"
 
+[[package]]
+name = "crc16"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff"
+
 [[package]]
 name = "crc32c"
 version = "0.6.4"
@@ -8678,10 +8684,14 @@ dependencies = [
  "async-trait",
  "bytes",
  "combine",
+ "crc16",
+ "futures",
  "futures-util",
  "itoa",
+ "log",
  "percent-encoding",
  "pin-project-lite",
+ "rand",
  "ryu",
  "sha1_smol",
  "socket2 0.5.6",
diff --git a/ci/redis-conf/redis-7000.conf b/ci/redis-conf/redis-7000.conf
new file mode 100644
index 0000000000000..a69a6f8d5cb09
--- /dev/null
+++ b/ci/redis-conf/redis-7000.conf
@@ -0,0 +1,9 @@
+port 7000
+
+#enable cluster mode
+cluster-enabled yes
+
+#ms
+cluster-node-timeout 15000
+
+cluster-config-file "nodes-7000.conf"
\ No newline at end of file
diff --git a/ci/redis-conf/redis-7001.conf b/ci/redis-conf/redis-7001.conf
new file mode 100644
index 0000000000000..db8ff90d1866f
--- /dev/null
+++ b/ci/redis-conf/redis-7001.conf
@@ -0,0 +1,9 @@
+port 7001
+
+#enable cluster mode
+cluster-enabled yes
+
+#ms
+cluster-node-timeout 15000
+
+cluster-config-file "nodes-7001.conf"
\ No newline at end of file
diff --git a/ci/redis-conf/redis-7002.conf b/ci/redis-conf/redis-7002.conf
new file mode 100644
index 0000000000000..ed68ddbfe21f6
--- /dev/null
+++ b/ci/redis-conf/redis-7002.conf
@@ -0,0 +1,9 @@
+port 7002
+
+#enable cluster mode
+cluster-enabled yes
+
+#ms
+cluster-node-timeout 15000
+
+cluster-config-file "nodes-7002.conf"
\ No newline at end of file
diff --git a/ci/scripts/e2e-redis-sink-test.sh b/ci/scripts/e2e-redis-sink-test.sh
index cf64662db4051..210bf27bbe793 100755
--- a/ci/scripts/e2e-redis-sink-test.sh
+++ b/ci/scripts/e2e-redis-sink-test.sh
@@ -44,5 +44,25 @@ else
   exit 1
 fi
 
+echo "--- testing cluster sinks"
+redis-server ./ci/redis-conf/redis-7000.conf --daemonize yes
+redis-server ./ci/redis-conf/redis-7001.conf --daemonize yes
+redis-server ./ci/redis-conf/redis-7002.conf --daemonize yes
+
+echo "yes" | redis-cli --cluster create 127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002
+
+sqllogictest -p 4566 -d dev './e2e_test/sink/redis_cluster_sink.slt'
+
+redis-cli -c --cluster call 127.0.0.1:7000 keys \* >> ./query_result_1.txt
+
+line_count=$(wc -l < query_result_1.txt)
+if [ "$line_count" -eq 4 ]; then
+  echo "Redis sink check passed"
+else
+  cat ./query_result_1.txt
+  echo "The output is not as expected."
+  exit 1
+fi
+
 echo "--- Kill cluster"
 cargo make ci-kill
\ No newline at end of file
diff --git a/e2e_test/sink/redis_cluster_sink.slt b/e2e_test/sink/redis_cluster_sink.slt
new file mode 100644
index 0000000000000..5d2d84b773367
--- /dev/null
+++ b/e2e_test/sink/redis_cluster_sink.slt
@@ -0,0 +1,35 @@
+statement ok
+CREATE TABLE t6 (v1 int primary key, v2 int);
+
+statement ok
+CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
+
+statement ok
+CREATE SINK s61
+FROM
+    mv6 WITH (
+    primary_key = 'v1',
+    connector = 'redis',
+    redis.url= '["redis://127.0.0.1:7000/","redis://127.0.0.1:7001/","redis://127.0.0.1:7002/"]',
+)FORMAT PLAIN ENCODE JSON(force_append_only='true');
+
+statement ok
+INSERT INTO t6 VALUES (1, 1);
+
+statement ok
+INSERT INTO t6 VALUES (2, 2);
+
+statement ok
+INSERT INTO t6 VALUES (3, 3);
+
+statement ok
+FLUSH;
+
+statement ok
+DROP SINK s61;
+
+statement ok
+DROP MATERIALIZED VIEW mv6;
+
+statement ok
+DROP TABLE t6;
\ No newline at end of file
diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml
index 0212d08f23f9f..6aa22f60a4614 100644
--- a/src/connector/Cargo.toml
+++ b/src/connector/Cargo.toml
@@ -109,7 +109,7 @@ rdkafka = { workspace = true, features = [
     "gssapi",
     "zstd",
 ] }
-redis = { version = "0.25", features = ["aio", "tokio-comp", "async-std-comp"] }
+redis = { version = "0.25", features = ["aio", "tokio-comp", "async-std-comp","cluster-async"] }
 regex = "1.4"
 reqwest = { version = "0.12.2", features = ["json"] }
 risingwave_common = { workspace = true }
diff --git a/src/connector/src/sink/redis.rs b/src/connector/src/sink/redis.rs
index 1a579660ca918..96b524d5b7b21 100644
--- a/src/connector/src/sink/redis.rs
+++ b/src/connector/src/sink/redis.rs
@@ -16,11 +16,14 @@ use std::collections::{HashMap, HashSet};
 
 use anyhow::anyhow;
 use async_trait::async_trait;
-use redis::aio::MultiplexedConnection;
+use redis::aio::{ConnectionLike, MultiplexedConnection};
+use redis::cluster::ClusterClient;
+use redis::cluster_async::ClusterConnection;
 use redis::{Client as RedisClient, Pipeline};
 use risingwave_common::array::StreamChunk;
 use risingwave_common::catalog::Schema;
 use serde_derive::Deserialize;
+use serde_json::Value;
 use serde_with::serde_as;
 use with_options::WithOptions;
@@ -46,11 +49,76 @@ pub struct RedisCommon {
     #[serde(rename = "redis.url")]
     pub url: String,
 }
+pub enum RedisConn {
+    // Redis deployed as a cluster, clusters with only one node should also use this conn
+    Cluster(ClusterConnection),
+    // Redis is not deployed as a cluster
+    Single(MultiplexedConnection),
+}
+
+impl ConnectionLike for RedisConn {
+    fn req_packed_command<'a>(
+        &'a mut self,
+        cmd: &'a redis::Cmd,
+    ) -> redis::RedisFuture<'a, redis::Value> {
+        match self {
+            RedisConn::Cluster(conn) => conn.req_packed_command(cmd),
+            RedisConn::Single(conn) => conn.req_packed_command(cmd),
+        }
+    }
+
+    fn req_packed_commands<'a>(
+        &'a mut self,
+        cmd: &'a redis::Pipeline,
+        offset: usize,
+        count: usize,
+    ) -> redis::RedisFuture<'a, Vec<redis::Value>> {
+        match self {
+            RedisConn::Cluster(conn) => conn.req_packed_commands(cmd, offset, count),
+            RedisConn::Single(conn) => conn.req_packed_commands(cmd, offset, count),
+        }
+    }
+
+    fn get_db(&self) -> i64 {
+        match self {
+            RedisConn::Cluster(conn) => conn.get_db(),
+            RedisConn::Single(conn) => conn.get_db(),
+        }
+    }
+}
 
 impl RedisCommon {
-    pub(crate) fn build_client(&self) -> ConnectorResult<RedisClient> {
-        let client = RedisClient::open(self.url.clone())?;
-        Ok(client)
+    pub async fn build_conn(&self) -> ConnectorResult<RedisConn> {
+        match serde_json::from_str(&self.url).map_err(|e| SinkError::Config(anyhow!(e))) {
+            Ok(v) => {
+                if let Value::Array(list) = v {
+                    let list = list
+                        .into_iter()
+                        .map(|s| {
+                            if let Value::String(s) = s {
+                                Ok(s)
+                            } else {
+                                Err(SinkError::Redis(
+                                    "redis.url must be array of string".to_string(),
+                                )
+                                .into())
+                            }
+                        })
+                        .collect::<ConnectorResult<Vec<String>>>()?;
+
+                    let client = ClusterClient::new(list)?;
+                    Ok(RedisConn::Cluster(client.get_async_connection().await?))
+                } else {
+                    Err(SinkError::Redis("redis.url must be array or string".to_string()).into())
+                }
+            }
+            Err(_) => {
+                let client = RedisClient::open(self.url.clone())?;
+                Ok(RedisConn::Single(
+                    client.get_multiplexed_async_connection().await?,
+                ))
+            }
+        }
     }
 }
 #[serde_as]
@@ -123,8 +191,7 @@ impl Sink for RedisSink {
     }
 
     async fn validate(&self) -> Result<()> {
-        let client = self.config.common.build_client()?;
-        client.get_connection()?;
+        let _conn = self.config.common.build_conn().await?;
         let all_set: HashSet<String> = self
             .schema
             .fields()
@@ -170,14 +237,14 @@ pub struct RedisSinkWriter {
 
 struct RedisSinkPayloadWriter {
     // connection to redis, one per executor
-    conn: Option<MultiplexedConnection>,
+    conn: Option<RedisConn>,
     // the command pipeline for write-commit
     pipe: Pipeline,
 }
 
 impl RedisSinkPayloadWriter {
     pub async fn new(config: RedisConfig) -> Result<Self> {
-        let client = config.common.build_client()?;
-        let conn = Some(client.get_multiplexed_async_connection().await?);
+        let conn = config.common.build_conn().await?;
+        let conn = Some(conn);
         let pipe = redis::pipe();
         Ok(Self { conn, pipe })
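
Reviewer note (not part of the diff): the reason RedisConn implements redis::aio::ConnectionLike is that the writer's existing pipeline code stays generic and never branches on deployment mode. A minimal sketch, assuming the redis 0.25 async API; the helper name flush_pipe is illustrative only and does not exist in the PR.

use redis::aio::ConnectionLike;
use redis::{Pipeline, RedisResult};

// Illustrative helper: anything generic over `ConnectionLike` drives either a
// MultiplexedConnection or a cluster_async ClusterConnection unchanged.
async fn flush_pipe<C: ConnectionLike>(conn: &mut C, pipe: &mut Pipeline) -> RedisResult<()> {
    // Send all buffered commands in one round trip and discard the replies.
    pipe.query_async::<_, ()>(conn).await?;
    pipe.clear();
    Ok(())
}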