Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Nov 23, 2023
1 parent ed5b3d2 commit d40cddd
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 115 deletions.
120 changes: 7 additions & 113 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,6 @@ fn check_cycle_for_sink(

visit_sink(session, &reader, &sinks, &sink_catalog, &mut visited_tables)
}

pub async fn handle_create_sink(
handle_args: HandlerArgs,
stmt: CreateSinkStatement,
Expand Down Expand Up @@ -352,10 +351,13 @@ pub async fn handle_create_sink(

let mut graph = build_graph(plan);

graph.parallelism = session
.config()
.get_streaming_parallelism()
.map(|parallelism| Parallelism { parallelism });
graph.parallelism =
session
.config()
.streaming_parallelism()
.map(|parallelism| Parallelism {
parallelism: parallelism.get(),
});

(sink, graph, target_table_catalog)
};
Expand Down Expand Up @@ -449,114 +451,6 @@ pub async fn handle_create_sink(
});
}

let (sink, graph) = {
let context = Rc::new(OptimizerContext::from_handler_args(handle_args));

let (query, mut plan, sink) = gen_sink_plan(&session, context.clone(), stmt)?;

if let Some(replace_table_plan) = &affected_table_change {
let affected_table_catalog = replace_table_plan.table.as_ref().unwrap();

for column in sink.full_columns() {
if column.is_generated() {
return Err(RwError::from(ErrorCode::BindError(
"The sink to table feature for Sinks with generated columns has not been implemented yet."
.to_string(),
)));
}
}

let user_defined_primary_key_table = !(affected_table_catalog.append_only
|| affected_table_catalog.row_id_index.is_some());

if !(user_defined_primary_key_table
|| sink.sink_type == SinkType::AppendOnly
|| sink.sink_type == SinkType::ForceAppendOnly)
{
return Err(RwError::from(ErrorCode::BindError(
"Only append-only sinks can sink to a table without primary keys.".to_string(),
)));
}

let mut exprs = vec![];

let table_columns = affected_table_catalog
.get_columns()
.iter()
.map(|col| ColumnCatalog::from(col.clone()))
.collect_vec();

let sink_visible_columns = sink
.full_columns()
.iter()
.enumerate()
.filter(|(_i, c)| !c.is_hidden())
.collect_vec();

for (idx, table_column) in table_columns.iter().enumerate() {
if table_column.is_generated() {
continue;
}

let data_type = table_column.data_type();

if idx < sink_visible_columns.len() {
let (sink_col_idx, sink_column) = sink_visible_columns[idx];

let sink_col_type = sink_column.data_type();

if data_type != sink_col_type {
bail!(
"column type mismatch: {:?} vs {:?}",
data_type,
sink_col_type
);
} else {
exprs.push(ExprImpl::InputRef(Box::new(InputRef::new(
sink_col_idx,
data_type.clone(),
))));
}
} else {
exprs.push(ExprImpl::Literal(Box::new(Literal::new(
None,
data_type.clone(),
))));
};
}

let logical_project = generic::Project::new(exprs, plan);

plan = StreamProject::new(logical_project).into();

let exprs = LogicalSource::derive_output_exprs_from_generated_columns(&table_columns)?;
if let Some(exprs) = exprs {
let logical_project = generic::Project::new(exprs, plan);
plan = StreamProject::new(logical_project).into();
}
};

let has_order_by = !query.order_by.is_empty();
if has_order_by {
context.warn_to_user(
r#"The ORDER BY clause in the CREATE SINK statement has no effect at all."#
.to_string(),
);
}

let mut graph = build_graph(plan);

graph.parallelism =
session
.config()
.streaming_parallelism()
.map(|parallelism| Parallelism {
parallelism: parallelism.get(),
});

(sink, graph)
};

let _job_guard =
session
.env()
Expand Down
6 changes: 4 additions & 2 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1139,8 +1139,10 @@ pub async fn generate_stream_graph_for_table(
let graph = StreamFragmentGraph {
parallelism: session
.config()
.get_streaming_parallelism()
.map(|parallelism| Parallelism { parallelism }),
.streaming_parallelism()
.map(|parallelism| Parallelism {
parallelism: parallelism.get(),
}),
..build_graph(plan)
};

Expand Down

0 comments on commit d40cddd

Please sign in to comment.