
Commit

test: add cassandra-and-scylladb-sink integration test to the integration test workflow (#13577)
xuefengze authored Nov 22, 2023
1 parent 710a01e commit 8deba24
Showing 11 changed files with 107 additions and 32 deletions.
5 changes: 5 additions & 0 deletions ci/workflows/integration-tests.yml
@@ -115,6 +115,7 @@ steps:
- "clickhouse-sink"
- "cockroach-sink"
- "kafka-cdc-sink"
- "cassandra-and-scylladb-sink"
format:
- "json"
- "protobuf"
@@ -159,3 +160,7 @@ steps:
testcase: "kafka-cdc-sink"
format: "protobuf"
skip: true
- with:
testcase: "cassandra-and-scylladb-sink"
format: "protobuf"
skip: true
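As a rough guide to how a matrix entry like this is exercised, here is a minimal local-reproduction sketch. It assumes the CI step ultimately calls the demo runner shown later in this diff with a `--case`/`--format` pair (only `--format` is visible in this excerpt, so the `--case` flag is an assumption), and that the working directory is `integration_tests/scripts`:

```python
# Hypothetical local equivalent of the new matrix entry; not part of the commit.
# Assumes run_demos.py accepts --case and --format, mirroring how the CI matrix
# feeds testcase/format pairs into it. The protobuf combination is skipped above.
import subprocess

subprocess.run(
    [
        "python3",
        "run_demos.py",
        "--case", "cassandra-and-scylladb-sink",
        "--format", "json",
    ],
    check=True,
)
```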
integration_tests/cassandra-and-scylladb-sink/README.md
@@ -1,19 +1,8 @@
# Demo: Sinking to Cassandra/Scylladb

In this demo, we want to showcase how RisingWave is able to sink data to Cassandra.
In this demo, we want to showcase how RisingWave is able to sink data to Cassandra and ScyllaDB.

1. Set the compose profile accordingly:
Demo with Apache Cassandra:
```
export COMPOSE_PROFILES=cassandra
```

Demo with Scylladb
```
export COMPOSE_PROFILES=scylladb
```

2. Launch the cluster:
1. Launch the cluster:

```sh
docker-compose up -d
@@ -22,7 +11,7 @@ docker-compose up -d
The cluster contains a RisingWave cluster and its necessary dependencies, a datagen service that generates the data, and Cassandra for the sink.


3. Create the Cassandra table via cqlsh:
2. Create the Cassandra table via cqlsh:

Log in to cqlsh:
```sh
@@ -61,5 +50,5 @@ docker compose exec scylladb cqlsh

Run the following query
```sql
select user_id, count(*) from my_keyspace.demo_test group by user_id;
select user_id, count(*) from demo.demo_bhv_table group by user_id;
```
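For readers who prefer running the README's verification query from the host rather than inside cqlsh, here is a small sketch using the DataStax `cassandra-driver` package (an assumption; this commit itself only uses cqlsh). Host ports follow the docker-compose mapping in this commit: Cassandra on 9042, ScyllaDB remapped to 9041.

```python
# Sketch only: query both databases from the host with the Python
# cassandra-driver (pip install cassandra-driver); the demo itself uses cqlsh.
from cassandra.cluster import Cluster

for name, port in [("cassandra", 9042), ("scylladb", 9041)]:
    cluster = Cluster(["127.0.0.1"], port=port)
    session = cluster.connect("demo")  # keyspace created by the prepare script
    rows = session.execute(
        "SELECT user_id, count(*) FROM demo_bhv_table GROUP BY user_id"
    )
    print(name, list(rows)[:5])  # show the first few groups
    cluster.shutdown()
```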
23 changes: 23 additions & 0 deletions integration_tests/cassandra-and-scylladb-sink/create_sink.sql
@@ -0,0 +1,23 @@
CREATE SINK bhv_cassandra_sink
FROM
bhv_mv WITH (
connector = 'cassandra',
type = 'append-only',
force_append_only='true',
cassandra.url = 'cassandra:9042',
cassandra.keyspace = 'demo',
cassandra.table = 'demo_bhv_table',
cassandra.datacenter = 'datacenter1',
);

CREATE SINK bhv_scylla_sink
FROM
bhv_mv WITH (
connector = 'cassandra',
type = 'append-only',
force_append_only='true',
cassandra.url = 'scylladb:9042',
cassandra.keyspace = 'demo',
cassandra.table = 'demo_bhv_table',
cassandra.datacenter = 'datacenter1',
);
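The DDL above is what the test harness replays against RisingWave. As a hedged illustration of submitting the same statement programmatically (assuming RisingWave's Postgres-compatible frontend is reachable on localhost:4566 with user `root` and database `dev`, the usual defaults in RisingWave's docker-compose; none of that wiring is shown in this excerpt):

```python
# Sketch: submit one of the CREATE SINK statements over the pgwire frontend
# with psycopg2. Connection details are assumptions, not taken from this commit.
import psycopg2

conn = psycopg2.connect(host="localhost", port=4566, user="root", dbname="dev")
conn.autocommit = True
with conn.cursor() as cur:
    cur.execute("""
        CREATE SINK bhv_cassandra_sink FROM bhv_mv WITH (
            connector = 'cassandra',
            type = 'append-only',
            force_append_only = 'true',
            cassandra.url = 'cassandra:9042',
            cassandra.keyspace = 'demo',
            cassandra.table = 'demo_bhv_table',
            cassandra.datacenter = 'datacenter1'
        )
    """)
conn.close()
```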
integration_tests/cassandra-and-scylladb-sink/docker-compose.yml
@@ -7,16 +7,17 @@ services:
- 9042:9042
environment:
- CASSANDRA_CLUSTER_NAME=cloudinfra
profiles:
- cassandra
volumes:
- "./prepare_cassandra_and_scylladb.sql:/prepare_cassandra_and_scylladb.sql"
scylladb:
image: scylladb/scylla:5.1
# port 9042 is used by cassandra
ports:
- 9042:9042
- 9041:9042
environment:
- CASSANDRA_CLUSTER_NAME=cloudinfra
profiles:
- scylladb
volumes:
- "./prepare_cassandra_and_scylladb.sql:/prepare_cassandra_and_scylladb.sql"
compactor-0:
extends:
file: ../../docker/docker-compose.yml
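Since this compose change drops the profiles and runs both databases side by side (ScyllaDB remapped to host port 9041 because Cassandra keeps 9042), a readiness poll could stand in for the fixed startup sleep used in the runner script further down. A minimal sketch under that assumption:

```python
# Sketch: wait until both host ports accept TCP connections instead of
# sleeping a fixed 120 seconds. The port numbers come from this compose diff;
# everything else is illustrative.
import socket
import time

def wait_for_port(port: int, timeout: float = 300.0) -> None:
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection(("127.0.0.1", port), timeout=5):
                return
        except OSError:
            time.sleep(5)
    raise TimeoutError(f"port {port} not reachable within {timeout}s")

for port in (9042, 9041):  # Cassandra, ScyllaDB
    wait_for_port(port)
```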
integration_tests/cassandra-and-scylladb-sink/prepare_cassandra_and_scylladb.sql
@@ -0,0 +1,7 @@
CREATE KEYSPACE demo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
use demo;
CREATE table demo_bhv_table(
user_id int primary key,
target_id text,
event_timestamp timestamp,
);
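One quick way to confirm the prepare script took effect in both containers is to describe the new table from each service (a sketch reusing the compose service names from this commit; `DESCRIBE TABLE` is standard cqlsh):

```python
# Sketch: describe the freshly created table in each container via cqlsh.
import subprocess

for svc in ("cassandra", "scylladb"):
    subprocess.run(
        ["docker", "compose", "exec", svc, "cqlsh", "-e",
         "DESCRIBE TABLE demo.demo_bhv_table"],
        check=True,
    )
```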
1 change: 1 addition & 0 deletions integration_tests/cassandra-and-scylladb-sink/sink_check
@@ -0,0 +1 @@
demo.demo_bhv_table
11 changes: 0 additions & 11 deletions integration_tests/cassandra-and-syclladb-sink/create_sink.sql

This file was deleted.

2 changes: 1 addition & 1 deletion integration_tests/scripts/check_data.py
@@ -54,7 +54,7 @@ def run_psql(sql):

demo = sys.argv[1]
upstream = sys.argv[2] # mysql, postgres, etc. see scripts/integration_tests.sh
if demo in ['docker', 'iceberg-sink','clickhouse-sink', 'iceberg-cdc', 'kafka-cdc-sink']:
if demo in ['docker', 'iceberg-sink','clickhouse-sink', 'iceberg-cdc', 'kafka-cdc-sink', 'cassandra-and-scylladb-sink']:
print('Skipping test for `%s`' % demo)
sys.exit(0)

60 changes: 60 additions & 0 deletions integration_tests/scripts/run_demos.py
@@ -172,6 +172,64 @@ def run_clickhouse_demo():

assert len(output_content.strip()) > 0

def run_cassandra_and_scylladb_sink_demo():
demo = "cassandra-and-scylladb-sink"
file_dir = dirname(abspath(__file__))
project_dir = dirname(file_dir)
demo_dir = os.path.join(project_dir, demo)
print("Running demo: {}".format(demo))

subprocess.run(["docker", "compose", "up", "-d", "--build"], cwd=demo_dir, check=True)
print("wait two min for cassandra and scylladb to start up")
sleep(120)

dbs = ['cassandra', 'scylladb']
for db in dbs:
subprocess.run(["docker", "compose", "exec", db, "cqlsh", "-f", "prepare_cassandra_and_scylladb.sql"], cwd=demo_dir, check=True)

sql_files = ['create_source.sql', 'create_mv.sql', 'create_sink.sql']
for fname in sql_files:
sql_file = os.path.join(demo_dir, fname)
print("executing sql: ", open(sql_file).read())
run_sql_file(sql_file, demo_dir)

print("sink created. Wait for 1 min time for ingestion")

# wait for one minutes ingestion
sleep(60)

sink_check_file = os.path.join(demo_dir, 'sink_check')
with open(sink_check_file) as f:
relations = f.read().strip().split(",")
failed_cases = []
for rel in relations:
sql = 'select count(*) from {};'.format(rel)
for db in dbs:
print("Running SQL: {} on {}".format(sql, db))
query_output_file_name = os.path.join(demo_dir, "query_{}_output.txt".format(db))
query_output_file = open(query_output_file_name, "wb+")

# run the count query inside the database container via cqlsh
subprocess.run(["docker", "compose", "exec", db, "cqlsh", "-e", sql], cwd=demo_dir, check=True, stdout=query_output_file)

# output file:
#
# count
# -------
# 1000
#
# (1 rows)
query_output_file.seek(0)
lines = query_output_file.readlines()
query_output_file.close()
assert len(lines) >= 6
assert lines[1].decode('utf-8').strip().lower() == 'count'
rows = int(lines[3].decode('utf-8').strip())
print("{} rows in {}.{}".format(rows, db, rel))
if rows < 1:
failed_cases.append(db + "_" + rel)
if len(failed_cases) != 0:
raise Exception("Data check failed for case {}".format(failed_cases))
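The count check above relies on fixed line positions in cqlsh's tabular output. A slightly more defensive variant (purely a sketch, not part of the commit) could locate the value by the output's shape instead:

```python
# Sketch: extract the COUNT(*) value from cqlsh output without fixed indices.
def parse_count(cqlsh_output: str) -> int:
    # Expected shape after dropping blank lines: ["count", "-------", "<value>", "(1 rows)"]
    lines = [ln.strip() for ln in cqlsh_output.splitlines() if ln.strip()]
    if len(lines) < 3 or lines[0].lower() != "count":
        raise ValueError("unexpected cqlsh output: %r" % cqlsh_output)
    return int(lines[2])
```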

arg_parser = argparse.ArgumentParser(description='Run the demo')
arg_parser.add_argument('--format',
@@ -199,5 +257,7 @@ def run_clickhouse_demo():
iceberg_cdc_demo()
elif args.case == "kafka-cdc-sink":
run_kafka_cdc_demo()
elif args.case == "cassandra-and-scylladb-sink":
run_cassandra_and_scylladb_sink_demo()
else:
run_demo(args.case, args.format)
