From 8deba24a650f0f4e80188405e055154861def367 Mon Sep 17 00:00:00 2001 From: xfz <73645462+xuefengze@users.noreply.github.com> Date: Wed, 22 Nov 2023 10:47:27 +0800 Subject: [PATCH] test: add `cassandra-and-scylladb-sink` integration test to the integration test workflow (#13577) --- ci/workflows/integration-tests.yml | 5 ++ .../README.md | 19 ++---- .../create_mv.sql | 0 .../create_sink.sql | 23 +++++++ .../create_source.sql | 0 .../docker-compose.yml | 11 ++-- .../prepare_cassandra_and_scylladb.sql | 7 +++ .../cassandra-and-scylladb-sink/sink_check | 1 + .../create_sink.sql | 11 ---- integration_tests/scripts/check_data.py | 2 +- integration_tests/scripts/run_demos.py | 60 +++++++++++++++++++ 11 files changed, 107 insertions(+), 32 deletions(-) rename integration_tests/{cassandra-and-syclladb-sink => cassandra-and-scylladb-sink}/README.md (76%) rename integration_tests/{cassandra-and-syclladb-sink => cassandra-and-scylladb-sink}/create_mv.sql (100%) create mode 100644 integration_tests/cassandra-and-scylladb-sink/create_sink.sql rename integration_tests/{cassandra-and-syclladb-sink => cassandra-and-scylladb-sink}/create_source.sql (100%) rename integration_tests/{cassandra-and-syclladb-sink => cassandra-and-scylladb-sink}/docker-compose.yml (86%) create mode 100644 integration_tests/cassandra-and-scylladb-sink/prepare_cassandra_and_scylladb.sql create mode 100644 integration_tests/cassandra-and-scylladb-sink/sink_check delete mode 100644 integration_tests/cassandra-and-syclladb-sink/create_sink.sql diff --git a/ci/workflows/integration-tests.yml b/ci/workflows/integration-tests.yml index afada69aa632d..cff7767812e29 100644 --- a/ci/workflows/integration-tests.yml +++ b/ci/workflows/integration-tests.yml @@ -115,6 +115,7 @@ steps: - "clickhouse-sink" - "cockroach-sink" - "kafka-cdc-sink" + - "cassandra-and-scylladb-sink" format: - "json" - "protobuf" @@ -159,3 +160,7 @@ steps: testcase: "kafka-cdc-sink" format: "protobuf" skip: true + - with: + testcase: 
"cassandra-and-scylladb-sink" + format: "protobuf" + skip: true diff --git a/integration_tests/cassandra-and-syclladb-sink/README.md b/integration_tests/cassandra-and-scylladb-sink/README.md similarity index 76% rename from integration_tests/cassandra-and-syclladb-sink/README.md rename to integration_tests/cassandra-and-scylladb-sink/README.md index b022c9ef09cf8..73361f2b68a8b 100644 --- a/integration_tests/cassandra-and-syclladb-sink/README.md +++ b/integration_tests/cassandra-and-scylladb-sink/README.md @@ -1,19 +1,8 @@ # Demo: Sinking to Cassandra/Scylladb -In this demo, we want to showcase how RisingWave is able to sink data to Cassandra. +In this demo, we want to showcase how RisingWave is able to sink data to Cassandra and scylladb. -1. Set the compose profile accordingly: -Demo with Apache Cassandra: -``` -export COMPOSE_PROFILES=cassandra -``` - -Demo with Scylladb -``` -export COMPOSE_PROFILES=scylladb -``` - -2. Launch the cluster: +1. Launch the cluster: ```sh docker-compose up -d @@ -22,7 +11,7 @@ docker-compose up -d The cluster contains a RisingWave cluster and its necessary dependencies, a datagen that generates the data, a Cassandra for sink. -3. Create the Cassandra table via cqlsh: +2. 
Create the Cassandra table via cqlsh: Login to cqlsh ```sh @@ -61,5 +50,5 @@ docker compose exec scylladb cqlsh Run the following query ```sql -select user_id, count(*) from my_keyspace.demo_test group by user_id; +select user_id, count(*) from demo.demo_bhv_table group by user_id; ``` diff --git a/integration_tests/cassandra-and-syclladb-sink/create_mv.sql b/integration_tests/cassandra-and-scylladb-sink/create_mv.sql similarity index 100% rename from integration_tests/cassandra-and-syclladb-sink/create_mv.sql rename to integration_tests/cassandra-and-scylladb-sink/create_mv.sql diff --git a/integration_tests/cassandra-and-scylladb-sink/create_sink.sql b/integration_tests/cassandra-and-scylladb-sink/create_sink.sql new file mode 100644 index 0000000000000..f3d0982a9bafe --- /dev/null +++ b/integration_tests/cassandra-and-scylladb-sink/create_sink.sql @@ -0,0 +1,23 @@ +CREATE SINK bhv_cassandra_sink +FROM + bhv_mv WITH ( + connector = 'cassandra', + type = 'append-only', + force_append_only='true', + cassandra.url = 'cassandra:9042', + cassandra.keyspace = 'demo', + cassandra.table = 'demo_bhv_table', + cassandra.datacenter = 'datacenter1', +); + +CREATE SINK bhv_scylla_sink +FROM + bhv_mv WITH ( + connector = 'cassandra', + type = 'append-only', + force_append_only='true', + cassandra.url = 'scylladb:9042', + cassandra.keyspace = 'demo', + cassandra.table = 'demo_bhv_table', + cassandra.datacenter = 'datacenter1', +); diff --git a/integration_tests/cassandra-and-syclladb-sink/create_source.sql b/integration_tests/cassandra-and-scylladb-sink/create_source.sql similarity index 100% rename from integration_tests/cassandra-and-syclladb-sink/create_source.sql rename to integration_tests/cassandra-and-scylladb-sink/create_source.sql diff --git a/integration_tests/cassandra-and-syclladb-sink/docker-compose.yml b/integration_tests/cassandra-and-scylladb-sink/docker-compose.yml similarity index 86% rename from integration_tests/cassandra-and-syclladb-sink/docker-compose.yml 
rename to integration_tests/cassandra-and-scylladb-sink/docker-compose.yml index 52c00117ef3d0..2b7cd47ce98b5 100644 --- a/integration_tests/cassandra-and-syclladb-sink/docker-compose.yml +++ b/integration_tests/cassandra-and-scylladb-sink/docker-compose.yml @@ -7,16 +7,17 @@ services: - 9042:9042 environment: - CASSANDRA_CLUSTER_NAME=cloudinfra - profiles: - - cassandra + volumes: + - "./prepare_cassandra_and_scylladb.sql:/prepare_cassandra_and_scylladb.sql" scylladb: image: scylladb/scylla:5.1 + # port 9042 is used by cassandra ports: - - 9042:9042 + - 9041:9042 environment: - CASSANDRA_CLUSTER_NAME=cloudinfra - profiles: - - scylladb + volumes: + - "./prepare_cassandra_and_scylladb.sql:/prepare_cassandra_and_scylladb.sql" compactor-0: extends: file: ../../docker/docker-compose.yml diff --git a/integration_tests/cassandra-and-scylladb-sink/prepare_cassandra_and_scylladb.sql b/integration_tests/cassandra-and-scylladb-sink/prepare_cassandra_and_scylladb.sql new file mode 100644 index 0000000000000..1c221771c2e4c --- /dev/null +++ b/integration_tests/cassandra-and-scylladb-sink/prepare_cassandra_and_scylladb.sql @@ -0,0 +1,7 @@ +CREATE KEYSPACE demo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}; +use demo; +CREATE table demo_bhv_table( + user_id int primary key, + target_id text, + event_timestamp timestamp, +); diff --git a/integration_tests/cassandra-and-scylladb-sink/sink_check b/integration_tests/cassandra-and-scylladb-sink/sink_check new file mode 100644 index 0000000000000..49a88f8df2245 --- /dev/null +++ b/integration_tests/cassandra-and-scylladb-sink/sink_check @@ -0,0 +1 @@ +demo.demo_bhv_table diff --git a/integration_tests/cassandra-and-syclladb-sink/create_sink.sql b/integration_tests/cassandra-and-syclladb-sink/create_sink.sql deleted file mode 100644 index 724e784694c2f..0000000000000 --- a/integration_tests/cassandra-and-syclladb-sink/create_sink.sql +++ /dev/null @@ -1,11 +0,0 @@ -CREATE SINK bhv_cassandra_sink -FROM - 
bhv_mv WITH (
-    connector = 'cassandra',
-    type = 'append-only',
-    force_append_only='true',
-    cassandra.url = 'cassandra:9042',
-    cassandra.keyspace = 'demo',
-    cassandra.table = 'demo_bhv_table',
-    cassandra.datacenter = 'datacenter1',
-);
\ No newline at end of file
diff --git a/integration_tests/scripts/check_data.py b/integration_tests/scripts/check_data.py
index 96d53825a596e..4888deaab0618 100644
--- a/integration_tests/scripts/check_data.py
+++ b/integration_tests/scripts/check_data.py
@@ -54,7 +54,7 @@ def run_psql(sql):
 
 demo = sys.argv[1]
 upstream = sys.argv[2] # mysql, postgres, etc. see scripts/integration_tests.sh
-if demo in ['docker', 'iceberg-sink','clickhouse-sink', 'iceberg-cdc', 'kafka-cdc-sink']:
+if demo in ['docker', 'iceberg-sink','clickhouse-sink', 'iceberg-cdc', 'kafka-cdc-sink', 'cassandra-and-scylladb-sink']:
     print('Skip for running test for `%s`' % demo)
     sys.exit(0)
 
diff --git a/integration_tests/scripts/run_demos.py b/integration_tests/scripts/run_demos.py
index d2fac6537fa0e..5487ec6b40009 100644
--- a/integration_tests/scripts/run_demos.py
+++ b/integration_tests/scripts/run_demos.py
@@ -172,6 +172,77 @@ def run_clickhouse_demo():
     assert len(output_content.strip()) > 0
 
 
+def run_cassandra_and_scylladb_sink_demo():
+    """Run the `cassandra-and-scylladb-sink` demo and check that both sinks got data.
+
+    Brings up the demo's docker compose cluster, runs
+    `prepare_cassandra_and_scylladb.sql` through cqlsh on Cassandra and
+    ScyllaDB, executes the RisingWave SQL files, then counts the rows of every
+    relation listed in `sink_check` on each database.
+
+    Raises:
+        subprocess.CalledProcessError: if a docker compose command fails.
+        AssertionError: if cqlsh output does not have the expected layout.
+        Exception: if a relation has no rows in one of the databases.
+    """
+    demo = "cassandra-and-scylladb-sink"
+    file_dir = dirname(abspath(__file__))
+    project_dir = dirname(file_dir)
+    demo_dir = os.path.join(project_dir, demo)
+    print("Running demo: {}".format(demo))
+
+    subprocess.run(["docker", "compose", "up", "-d", "--build"], cwd=demo_dir, check=True)
+    print("wait two min for cassandra and scylladb to start up")
+    sleep(120)
+
+    dbs = ['cassandra', 'scylladb']
+    for db in dbs:
+        subprocess.run(["docker", "compose", "exec", db, "cqlsh", "-f", "prepare_cassandra_and_scylladb.sql"], cwd=demo_dir, check=True)
+
+    sql_files = ['create_source.sql', 'create_mv.sql', 'create_sink.sql']
+    for fname in sql_files:
+        sql_file = os.path.join(demo_dir, fname)
+        # Read through a context manager so the handle is closed right away.
+        with open(sql_file) as f:
+            print("executing sql: ", f.read())
+        run_sql_file(sql_file, demo_dir)
+
+    print("sink created. Wait for 1 min time for ingestion")
+
+    # wait for one minutes ingestion
+    sleep(60)
+
+    sink_check_file = os.path.join(demo_dir, 'sink_check')
+    with open(sink_check_file) as f:
+        relations = f.read().strip().split(",")
+
+    failed_cases = []
+    for rel in relations:
+        sql = 'select count(*) from {};'.format(rel)
+        for db in dbs:
+            print("Running SQL: {} on {}".format(sql, db))
+            query_output_file_name = os.path.join(demo_dir, "query_{}_outout.txt".format(db))
+            # `with` closes the output file even if cqlsh exits non-zero.
+            with open(query_output_file_name, "wb+") as query_output_file:
+                subprocess.run(["docker", "compose", "exec", db, "cqlsh", "-e", sql], cwd=demo_dir, check=True, stdout=query_output_file)
+                query_output_file.seek(0)
+                lines = query_output_file.readlines()
+
+            # output file:
+            #
+            #  count
+            # -------
+            #   1000
+            #
+            # (1 rows)
+            assert len(lines) >= 6
+            assert lines[1].decode('utf-8').strip().lower() == 'count'
+            rows = int(lines[3].decode('utf-8').strip())
+            print("{} rows in {}.{}".format(rows, db, rel))
+            if rows < 1:
+                failed_cases.append(db + "_" + rel)
+    if failed_cases:
+        raise Exception("Data check failed for case {}".format(failed_cases))
 
 arg_parser = argparse.ArgumentParser(description='Run the demo')
 arg_parser.add_argument('--format',
@@ -199,5 +270,7 @@ def run_clickhouse_demo():
     iceberg_cdc_demo()
 elif args.case == "kafka-cdc-sink":
     run_kafka_cdc_demo()
+elif args.case == "cassandra-and-scylladb-sink":
+    run_cassandra_and_scylladb_sink_demo()
 else:
     run_demo(args.case, args.format)