diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 37ed893b19b8c..5d999fdd29d4c 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -319,7 +319,6 @@ fn check_cycle_for_sink( visit_sink(session, &reader, &sinks, &sink_catalog, &mut visited_tables) } - pub async fn handle_create_sink( handle_args: HandlerArgs, stmt: CreateSinkStatement, @@ -352,10 +351,13 @@ pub async fn handle_create_sink( let mut graph = build_graph(plan); - graph.parallelism = session - .config() - .get_streaming_parallelism() - .map(|parallelism| Parallelism { parallelism }); + graph.parallelism = + session + .config() + .streaming_parallelism() + .map(|parallelism| Parallelism { + parallelism: parallelism.get(), + }); (sink, graph, target_table_catalog) }; @@ -449,114 +451,6 @@ pub async fn handle_create_sink( }); } - let (sink, graph) = { - let context = Rc::new(OptimizerContext::from_handler_args(handle_args)); - - let (query, mut plan, sink) = gen_sink_plan(&session, context.clone(), stmt)?; - - if let Some(replace_table_plan) = &affected_table_change { - let affected_table_catalog = replace_table_plan.table.as_ref().unwrap(); - - for column in sink.full_columns() { - if column.is_generated() { - return Err(RwError::from(ErrorCode::BindError( - "The sink to table feature for Sinks with generated columns has not been implemented yet." - .to_string(), - ))); - } - } - - let user_defined_primary_key_table = !(affected_table_catalog.append_only - || affected_table_catalog.row_id_index.is_some()); - - if !(user_defined_primary_key_table - || sink.sink_type == SinkType::AppendOnly - || sink.sink_type == SinkType::ForceAppendOnly) - { - return Err(RwError::from(ErrorCode::BindError( - "Only append-only sinks can sink to a table without primary keys.".to_string(), - ))); - } - - let mut exprs = vec![]; - - let table_columns = affected_table_catalog - .get_columns() - .iter() - .map(|col| ColumnCatalog::from(col.clone())) - .collect_vec(); - - let sink_visible_columns = sink - .full_columns() - .iter() - .enumerate() - .filter(|(_i, c)| !c.is_hidden()) - .collect_vec(); - - for (idx, table_column) in table_columns.iter().enumerate() { - if table_column.is_generated() { - continue; - } - - let data_type = table_column.data_type(); - - if idx < sink_visible_columns.len() { - let (sink_col_idx, sink_column) = sink_visible_columns[idx]; - - let sink_col_type = sink_column.data_type(); - - if data_type != sink_col_type { - bail!( - "column type mismatch: {:?} vs {:?}", - data_type, - sink_col_type - ); - } else { - exprs.push(ExprImpl::InputRef(Box::new(InputRef::new( - sink_col_idx, - data_type.clone(), - )))); - } - } else { - exprs.push(ExprImpl::Literal(Box::new(Literal::new( - None, - data_type.clone(), - )))); - }; - } - - let logical_project = generic::Project::new(exprs, plan); - - plan = StreamProject::new(logical_project).into(); - - let exprs = LogicalSource::derive_output_exprs_from_generated_columns(&table_columns)?; - if let Some(exprs) = exprs { - let logical_project = generic::Project::new(exprs, plan); - plan = StreamProject::new(logical_project).into(); - } - }; - - let has_order_by = !query.order_by.is_empty(); - if has_order_by { - context.warn_to_user( - r#"The ORDER BY clause in the CREATE SINK statement has no effect at all."# - .to_string(), - ); - } - - let mut graph = build_graph(plan); - - graph.parallelism = - session - .config() - .streaming_parallelism() - .map(|parallelism| Parallelism { - parallelism: parallelism.get(), - }); - - (sink, graph) - }; - let _job_guard = session .env() diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 6eb372557447d..a2b8f576a0716 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -1139,8 +1139,10 @@ pub async fn generate_stream_graph_for_table( let graph = StreamFragmentGraph { parallelism: session .config() - .get_streaming_parallelism() - .map(|parallelism| Parallelism { parallelism }), + .streaming_parallelism() + .map(|parallelism| Parallelism { + parallelism: parallelism.get(), + }), ..build_graph(plan) };