Skip to content

Commit

Permalink
refactor(cdc): refactor query plan for table-on-source cdc backfill (#…
Browse files Browse the repository at this point in the history
…13553)

Co-authored-by: Noel Kwan <[email protected]>
Co-authored-by: Noel Kwan <[email protected]>
  • Loading branch information
3 people authored Nov 24, 2023
1 parent c036915 commit bf3975f
Show file tree
Hide file tree
Showing 28 changed files with 307 additions and 330 deletions.
9 changes: 4 additions & 5 deletions ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,16 @@ echo "--- mysql & postgres cdc validate test"
sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.validate.mysql.slt'
sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.validate.postgres.slt'

# cdc share stream test cases
export MYSQL_HOST=mysql MYSQL_TCP_PORT=3306 MYSQL_PWD=123456
sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.share_stream.slt'

echo "--- mysql & postgres load and check"
sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.load.slt'
# wait for cdc loading
sleep 10
sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.check.slt'

# cdc share stream test cases
export MYSQL_HOST=mysql MYSQL_TCP_PORT=3306 MYSQL_PWD=123456
sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.share_stream.slt'


# kill cluster
cargo make kill
echo "cluster killed "
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/source/cdc/cdc.check_new_rows.slt
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ select v1, v2, v3 from mytable order by v1;
query I
SELECT * from products_test_cnt
----
11
12

query I
SELECT * from orders_test_cnt
Expand Down
16 changes: 15 additions & 1 deletion e2e_test/source/cdc/cdc.share_stream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ create table products_test ( id INT,
PRIMARY KEY (id)
) from mysql_mytest table 'mytest.products';

# check the fragment distribution
query TT
select distribution_type,flags from rw_fragments order by fragment_id;
----
SINGLE {SOURCE}
HASH {MVIEW}
SINGLE {STREAM_SCAN}
SINGLE {CDC_FILTER}
HASH {SOURCE,DML}


statement ok
create table orders_test (
order_id int,
Expand All @@ -51,13 +62,16 @@ create materialized view products_test_cnt as select count(*) as cnt from produc
statement ok
create materialized view orders_test_cnt as select count(*) as cnt from orders_test;

system ok
mysql --protocol=tcp -u root mytest -e "INSERT INTO products VALUES(default, 'Milk', '100ml Milk');"

sleep 5s

# check ingestion results
query I
SELECT * from products_test_cnt
----
9
10

query I
SELECT * from orders_test_cnt
Expand Down
17 changes: 8 additions & 9 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,12 @@ message FilterNode {
expr.ExprNode search_condition = 1;
}

message CdcFilterNode {
expr.ExprNode search_condition = 1;
uint32 upstream_source_id = 2;
repeated int32 upstream_column_ids = 3;
}

// A materialized view is regarded as a table.
// In addition, we also specify primary key to MV for efficient point lookup during update and deletion.
//
Expand Down Expand Up @@ -725,6 +731,7 @@ message StreamNode {
OverWindowNode over_window = 137;
StreamFsFetchNode stream_fs_fetch = 138;
StreamCdcScanNode stream_cdc_scan = 139;
CdcFilterNode cdc_filter = 140;
}
// The id for the operator. This is local per mview.
// TODO: should better be a uint32.
Expand Down Expand Up @@ -753,11 +760,6 @@ enum DispatcherType {
// piped into the downstream actor, if there are the same number of actors. If number of actors
// are not the same, should use hash instead. Should be only used when distribution is the same.
DISPATCHER_TYPE_NO_SHUFFLE = 4;

// Dispatch by table name from upstream DB, used in CDC scenario which should has only one downstream actor.
// From the optimizer's point of view, it can be treated as a specialized version of HASH distribution
// that the hash key is the upstream table name.
DISPATCHER_TYPE_CDC_TABLENAME = 5;
}

// The property of an edge in the fragment graph.
Expand All @@ -766,8 +768,6 @@ message DispatchStrategy {
DispatcherType type = 1;
repeated uint32 dist_key_indices = 2;
repeated uint32 output_indices = 3;
// The full table name of the downstream CDC table.
optional string downstream_table_name = 4;
}

// A dispatcher redistribute messages.
Expand All @@ -789,8 +789,6 @@ message Dispatcher {
uint64 dispatcher_id = 4;
// Number of downstreams decides how many endpoints a dispatcher should dispatch.
repeated uint32 downstream_actor_id = 5;
// The full table name of the downstream CDC table.
optional string downstream_table_name = 7;
}

// A StreamActor is a running fragment of the overall stream graph,
Expand Down Expand Up @@ -824,6 +822,7 @@ enum FragmentTypeFlag {
FRAGMENT_TYPE_FLAG_BARRIER_RECV = 32;
FRAGMENT_TYPE_FLAG_VALUES = 64;
FRAGMENT_TYPE_FLAG_DML = 128;
FRAGMENT_TYPE_FLAG_CDC_FILTER = 256;
}

// The environment associated with a stream plan
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/catalog/external_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub struct CdcTableDesc {
/// Id of the upstream source in sharing cdc mode
pub source_id: TableId,

/// The full name of the table in external database, e.g. `database_name.table.name` in MySQL
/// The full name of the table in external database, e.g. `database_name.table_name` in MySQL
/// and `schema_name.table_name` in the Postgres.
pub external_table_name: String,
/// The key used to sort in storage.
Expand Down
4 changes: 3 additions & 1 deletion src/common/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ pub fn row_id_column_desc() -> ColumnDesc {

pub const OFFSET_COLUMN_NAME: &str = "_rw_offset";

pub const CDC_SOURCE_COLUMN_NUM: u32 = 4;
// The number of columns output by the cdc source job
// see `debezium_cdc_source_schema()` for details
pub const CDC_SOURCE_COLUMN_NUM: u32 = 3;
pub const TABLE_NAME_COLUMN_NAME: &str = "_rw_table_name";

pub fn is_offset_column_name(name: &str) -> bool {
Expand Down
1 change: 0 additions & 1 deletion src/common/src/util/column_index_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,6 @@ impl ColIndexMapping {
r#type: strategy.r#type,
dist_key_indices: map(&strategy.dist_key_indices)?,
output_indices: map(&strategy.output_indices)?,
downstream_table_name: strategy.downstream_table_name.clone(),
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ impl ExprImpl {
///
/// TODO: This is a naive implementation. We should avoid proto ser/de.
/// Tracking issue: <https://github.com/risingwavelabs/risingwave/issues/3479>
async fn eval_row(&self, input: &OwnedRow) -> RwResult<Datum> {
pub async fn eval_row(&self, input: &OwnedRow) -> RwResult<Datum> {
let backend_expr = build_from_prost(&self.to_expr_proto())?;
Ok(backend_expr.eval_row(input).await?)
}
Expand Down
12 changes: 10 additions & 2 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1250,12 +1250,13 @@ pub mod tests {
use std::collections::HashMap;

use risingwave_common::catalog::{
DEFAULT_DATABASE_NAME, DEFAULT_KEY_COLUMN_NAME, DEFAULT_SCHEMA_NAME, OFFSET_COLUMN_NAME,
ROWID_PREFIX, TABLE_NAME_COLUMN_NAME,
CDC_SOURCE_COLUMN_NUM, DEFAULT_DATABASE_NAME, DEFAULT_KEY_COLUMN_NAME, DEFAULT_SCHEMA_NAME,
OFFSET_COLUMN_NAME, ROWID_PREFIX, TABLE_NAME_COLUMN_NAME,
};
use risingwave_common::types::DataType;

use crate::catalog::root_catalog::SchemaPath;
use crate::handler::create_source::debezium_cdc_source_schema;
use crate::test_utils::{create_proto_file, LocalFrontend, PROTO_FILE_DATA};

#[tokio::test]
Expand Down Expand Up @@ -1341,4 +1342,11 @@ pub mod tests {
};
assert_eq!(columns, expected_columns);
}

#[tokio::test]
async fn test_cdc_source_job_schema() {
let columns = debezium_cdc_source_schema();
// make sure it doesn't broken by future PRs
assert_eq!(CDC_SOURCE_COLUMN_NUM, columns.len() as u32);
}
}
Loading

0 comments on commit bf3975f

Please sign in to comment.