From 45621e3509a5f19bf74866d2a2191f0b0cdb80f7 Mon Sep 17 00:00:00 2001 From: joe Date: Mon, 9 Dec 2024 18:08:21 +0800 Subject: [PATCH] chore: find out ref columns in subquery --- qurious/src/common/table_schema.rs | 9 +- qurious/src/common/transformed.rs | 6 + qurious/src/logical/expr/aggregate.rs | 1 + qurious/src/logical/expr/column.rs | 6 +- qurious/src/logical/expr/mod.rs | 27 +++- qurious/src/logical/plan/mod.rs | 74 +++++++++- qurious/src/logical/plan/projection.rs | 25 ++-- qurious/src/optimizer/mod.rs | 2 +- .../optimizer/pushdown_filter_inner_join.rs | 11 +- qurious/src/optimizer/type_coercion.rs | 126 ++++++++++++++---- qurious/src/planner/mod.rs | 4 +- qurious/src/planner/sql.rs | 61 +++++---- 12 files changed, 271 insertions(+), 81 deletions(-) diff --git a/qurious/src/common/table_schema.rs b/qurious/src/common/table_schema.rs index d210282..6d82e15 100644 --- a/qurious/src/common/table_schema.rs +++ b/qurious/src/common/table_schema.rs @@ -17,6 +17,13 @@ pub struct TableSchema { } impl TableSchema { + pub fn new(field_qualifiers: Vec>, schema: SchemaRef) -> Self { + Self { + field_qualifiers, + schema, + } + } + pub fn try_from_qualified_schema(relation: impl Into, schema: SchemaRef) -> Result { Ok(Self { field_qualifiers: vec![Some(relation.into()); schema.fields().len()], @@ -48,7 +55,7 @@ impl TableSchema { .fields() .iter() .zip(self.field_qualifiers.iter()) - .map(|(f, q)| Column::new(f.name(), q.clone())) + .map(|(f, q)| Column::new(f.name(), q.clone(), false)) .collect() } } diff --git a/qurious/src/common/transformed.rs b/qurious/src/common/transformed.rs index 0555cd0..b32f18e 100644 --- a/qurious/src/common/transformed.rs +++ b/qurious/src/common/transformed.rs @@ -117,3 +117,9 @@ fn apply_impl<'n, N: TransformNode, F: FnMut(&'n N) -> Result ) -> Result { f(node)?.visit_children(|| node.apply_children(|c| apply_impl(c, f))) } + +pub trait TreeNodeContainer<'a, T: 'a> { + fn apply(&'a self, f: F) -> Result + where + F: FnMut(&'a T) -> Result; +} diff --git a/qurious/src/logical/expr/aggregate.rs b/qurious/src/logical/expr/aggregate.rs index 4153737..de24ec6 100644 --- a/qurious/src/logical/expr/aggregate.rs +++ b/qurious/src/logical/expr/aggregate.rs @@ -115,6 +115,7 @@ impl AggregateExpr { LogicalExpr::Column(Column { name: format!("{}({})", self.op, inner_col), relation: None, + is_outer_ref: false, }) }) } diff --git a/qurious/src/logical/expr/column.rs b/qurious/src/logical/expr/column.rs index 72f39eb..70099b4 100644 --- a/qurious/src/logical/expr/column.rs +++ b/qurious/src/logical/expr/column.rs @@ -15,13 +15,15 @@ use super::LogicalExpr; pub struct Column { pub name: String, pub relation: Option, + pub is_outer_ref: bool, } impl Column { - pub fn new(name: impl Into, relation: Option>) -> Self { + pub fn new(name: impl Into, relation: Option>, is_outer_ref: bool) -> Self { Self { name: name.into(), relation: relation.map(|r| r.into()), + is_outer_ref, } } @@ -58,6 +60,7 @@ impl FromStr for Column { Ok(Self { name: s.to_string(), relation: None, + is_outer_ref: false, }) } } @@ -66,5 +69,6 @@ pub fn column(name: &str) -> LogicalExpr { LogicalExpr::Column(Column { name: name.to_string(), relation: None, + is_outer_ref: false, }) } diff --git a/qurious/src/logical/expr/mod.rs b/qurious/src/logical/expr/mod.rs index 60c5f62..1dde6dc 100644 --- a/qurious/src/logical/expr/mod.rs +++ b/qurious/src/logical/expr/mod.rs @@ -44,7 +44,7 @@ pub enum LogicalExpr { IsNotNull(Box), Like(Like), Negative(Box), - SubQuery(Box), + SubQuery(SubQuery), } macro_rules! impl_logical_expr_methods { @@ -94,7 +94,7 @@ impl Display for LogicalExpr { LogicalExpr::Function(function) => write!(f, "{function}",), LogicalExpr::IsNull(logical_expr) => write!(f, "{} IS NULL", logical_expr), LogicalExpr::IsNotNull(logical_expr) => write!(f, "{} IS NOT NULLni", logical_expr), - LogicalExpr::SubQuery(logical_plan) => write!(f, "(\n{})\n", utils::format(logical_plan, 5)), + LogicalExpr::SubQuery(subquery) => write!(f, "(\n{})\n", utils::format(&subquery.subquery, 5)), LogicalExpr::Like(like) => { if like.negated { write!(f, "{} NOT LIKE {}", like.expr, like.pattern) @@ -107,6 +107,14 @@ impl Display for LogicalExpr { } impl LogicalExpr { + pub fn qualified_name(&self) -> Option { + match self { + LogicalExpr::Column(column) => column.relation.clone(), + LogicalExpr::Alias(alias) => Some(alias.name.clone().into()), + _ => None, + } + } + pub fn rebase_expr(self, base_exprs: &[&LogicalExpr]) -> Result { self.transform(|nested_expr| { if base_exprs.contains(&&nested_expr) { @@ -162,7 +170,7 @@ impl LogicalExpr { LogicalExpr::Column(_) => Ok(self.clone()), LogicalExpr::AggregateExpr(agg) => agg.as_column(), LogicalExpr::Literal(_) | LogicalExpr::Wildcard | LogicalExpr::BinaryExpr(_) => Ok(LogicalExpr::Column( - Column::new(format!("{}", self), None::), + Column::new(format!("{}", self), None::, false), )), _ => Err(Error::InternalError(format!("Expect column, got {:?}", self))), } @@ -196,7 +204,7 @@ impl LogicalExpr { LogicalExpr::AggregateExpr(AggregateExpr { op, expr }) => op.infer_type(&expr.data_type(schema)?), LogicalExpr::SortExpr(SortExpr { expr, .. }) | LogicalExpr::Negative(expr) => expr.data_type(schema), LogicalExpr::Like(_) | LogicalExpr::IsNull(_) | LogicalExpr::IsNotNull(_) => Ok(DataType::Boolean), - LogicalExpr::SubQuery(plan) => Ok(plan.schema().fields[0].data_type().clone()), + LogicalExpr::SubQuery(subquery) => Ok(subquery.subquery.schema().fields[0].data_type().clone()), _ => internal_err!("[{}] has no data type", self), } } @@ -252,7 +260,12 @@ impl TransformNode for LogicalExpr { LogicalExpr::IsNull(expr) => f(*expr)?.update(|expr| LogicalExpr::IsNull(Box::new(expr))), LogicalExpr::IsNotNull(expr) => f(*expr)?.update(|expr| LogicalExpr::IsNotNull(Box::new(expr))), LogicalExpr::Negative(expr) => f(*expr)?.update(|expr| LogicalExpr::Negative(Box::new(expr))), - LogicalExpr::SubQuery(plan) => plan.map_exprs(f)?.update(|plan| LogicalExpr::SubQuery(Box::new(plan))), + LogicalExpr::SubQuery(subquery) => subquery.subquery.map_exprs(f)?.update(|plan| { + LogicalExpr::SubQuery(SubQuery { + subquery: Box::new(plan), + outer_ref_columns: subquery.outer_ref_columns, + }) + }), LogicalExpr::Wildcard | LogicalExpr::Column(_) | LogicalExpr::Literal(_) => Transformed::no(self), LogicalExpr::Like(like) => f(*like.expr)?.update(|expr| { @@ -267,7 +280,7 @@ impl TransformNode for LogicalExpr { fn apply_children<'n, F>(&'n self, mut f: F) -> Result where - F: FnMut(&'n Self) -> Result, + F: FnMut(&'n LogicalExpr) -> Result, { let children = match self { LogicalExpr::BinaryExpr(BinaryExpr { left, right, .. }) => vec![left.as_ref(), right.as_ref()], @@ -312,6 +325,6 @@ pub struct Like { #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct SubQuery { - pub subquery: Arc, + pub subquery: Box, pub outer_ref_columns: Vec, } diff --git a/qurious/src/logical/plan/mod.rs b/qurious/src/logical/plan/mod.rs index e790b27..8af0dff 100644 --- a/qurious/src/logical/plan/mod.rs +++ b/qurious/src/logical/plan/mod.rs @@ -24,10 +24,10 @@ pub use sub_query::SubqueryAlias; use arrow::datatypes::SchemaRef; -use super::expr::LogicalExpr; +use super::expr::{Column, LogicalExpr}; use crate::common::table_relation::TableRelation; use crate::common::table_schema::TableSchemaRef; -use crate::common::transformed::{TransformNode, Transformed, TransformedResult}; +use crate::common::transformed::{TransformNode, Transformed, TransformedResult, TreeNodeContainer, TreeNodeRecursion}; use crate::error::Result; #[macro_export] @@ -70,6 +70,36 @@ pub enum LogicalPlan { } impl LogicalPlan { + pub fn outer_ref_columns(&self) -> Result> { + let mut outer_ref_columns = vec![]; + + let mut stack = vec![self]; + + while let Some(plan) = stack.pop() { + match plan.apply_exprs(|expr| { + expr.apply_children(|expr| { + match expr { + LogicalExpr::Column(Column { is_outer_ref: true, .. }) => { + outer_ref_columns.push(expr.clone()); + } + _ => {} + } + + Ok(TreeNodeRecursion::Continue) + }) + })? { + TreeNodeRecursion::Continue => { + if let Some(children) = plan.children() { + stack.extend(children); + } + } + TreeNodeRecursion::Stop => return Ok(outer_ref_columns), + } + } + + Ok(outer_ref_columns) + } + pub fn relation(&self) -> Option { match self { LogicalPlan::TableScan(s) => Some(s.table_name.clone()), @@ -103,6 +133,7 @@ impl LogicalPlan { LogicalPlan::CrossJoin(s) => s.schema.clone(), LogicalPlan::SubqueryAlias(s) => s.schema.clone(), LogicalPlan::Filter(f) => f.input.table_schema(), + LogicalPlan::Projection(p) => p.schema.clone(), _ => todo!("[{}] not implement table_schema", self), } } @@ -124,6 +155,25 @@ impl LogicalPlan { } } + pub fn apply_exprs(&self, mut f: F) -> Result + where + F: FnMut(&LogicalExpr) -> Result, + { + match self { + LogicalPlan::Projection(Projection { exprs, .. }) => exprs.apply(f), + LogicalPlan::Aggregate(Aggregate { + group_expr, aggr_expr, .. + }) => { + group_expr.apply(&mut f)?; + aggr_expr.apply(&mut f)?; + + Ok(TreeNodeRecursion::Continue) + } + LogicalPlan::Filter(Filter { expr, .. }) => f(expr), + _ => Ok(TreeNodeRecursion::Continue), + } + } + pub fn map_exprs(self, mut f: F) -> Result> where F: FnMut(LogicalExpr) -> Result>, @@ -211,14 +261,30 @@ impl TransformNode for LogicalPlan { }) } - fn apply_children<'n, F>(&'n self, _f: F) -> Result + fn apply_children<'n, F>(&'n self, _f: F) -> Result where - F: FnMut(&'n Self) -> Result, + F: FnMut(&'n LogicalPlan) -> Result, { todo!() } } +impl<'a, T: TransformNode + 'a> TreeNodeContainer<'a, T> for Vec { + fn apply(&'a self, mut f: F) -> Result + where + F: FnMut(&'a T) -> Result, + { + for child in self { + match child.apply(&mut f)? { + TreeNodeRecursion::Continue => {} + TreeNodeRecursion::Stop => return Ok(TreeNodeRecursion::Stop), + } + } + + Ok(TreeNodeRecursion::Continue) + } +} + pub fn base_plan(plan: &LogicalPlan) -> &LogicalPlan { match plan { LogicalPlan::Aggregate(Aggregate { input, .. }) => base_plan(&input), diff --git a/qurious/src/logical/plan/projection.rs b/qurious/src/logical/plan/projection.rs index 62035d5..330035e 100644 --- a/qurious/src/logical/plan/projection.rs +++ b/qurious/src/logical/plan/projection.rs @@ -1,5 +1,6 @@ -use arrow::datatypes::{FieldRef, Schema, SchemaRef}; +use arrow::datatypes::{Schema, SchemaRef}; +use crate::common::table_schema::{TableSchema, TableSchemaRef}; use crate::error::Result; use crate::{logical::expr::LogicalExpr, logical::plan::LogicalPlan}; use std::fmt::Display; @@ -7,25 +8,31 @@ use std::sync::Arc; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct Projection { - pub schema: SchemaRef, + pub schema: TableSchemaRef, pub input: Box, pub exprs: Vec, } impl Projection { pub fn try_new(input: LogicalPlan, exprs: Vec) -> Result { + let mut field_qualifiers = vec![]; + let mut fields = vec![]; + + for expr in &exprs { + field_qualifiers.push(expr.qualified_name()); + fields.push(expr.field(&input)?); + } + + let schema = TableSchema::new(field_qualifiers, Arc::new(Schema::new(fields))); + Ok(Self { - schema: exprs - .iter() - .map(|f| f.field(&input)) - .collect::>>() - .map(|fields| Arc::new(Schema::new(fields)))?, + schema: Arc::new(schema), input: Box::new(input), exprs, }) } - pub fn try_new_with_schema(input: LogicalPlan, exprs: Vec, schema: SchemaRef) -> Result { + pub fn try_new_with_schema(input: LogicalPlan, exprs: Vec, schema: TableSchemaRef) -> Result { Ok(Self { schema, input: Box::new(input), @@ -34,7 +41,7 @@ impl Projection { } pub fn schema(&self) -> SchemaRef { - self.schema.clone() + self.schema.arrow_schema() } pub fn children(&self) -> Option> { diff --git a/qurious/src/optimizer/mod.rs b/qurious/src/optimizer/mod.rs index f13b86b..012a245 100644 --- a/qurious/src/optimizer/mod.rs +++ b/qurious/src/optimizer/mod.rs @@ -1,6 +1,6 @@ mod count_wildcard_rule; mod pushdown_filter_inner_join; -mod scalar_subquery_to_join; +// mod scalar_subquery_to_join; mod type_coercion; use crate::{error::Result, logical::plan::LogicalPlan}; diff --git a/qurious/src/optimizer/pushdown_filter_inner_join.rs b/qurious/src/optimizer/pushdown_filter_inner_join.rs index f219f33..47e053f 100644 --- a/qurious/src/optimizer/pushdown_filter_inner_join.rs +++ b/qurious/src/optimizer/pushdown_filter_inner_join.rs @@ -8,7 +8,7 @@ use crate::common::join_type::JoinType; use crate::common::transformed::{TransformNode, Transformed, TransformedResult}; use crate::datatypes::operator::Operator; use crate::error::{Error, Result}; -use crate::logical::expr::{BinaryExpr, Column, LogicalExpr}; +use crate::logical::expr::{BinaryExpr, Column, LogicalExpr, SubQuery}; use crate::logical::plan::{CrossJoin, Filter, LogicalPlan}; use crate::logical::LogicalPlanBuilder; @@ -45,8 +45,13 @@ impl OptimizerRule for PushdownFilterInnerJoin { .map_exprs(|expr| { expr.transform(|expr| match expr { LogicalExpr::SubQuery(query) => self - .optimize(*query) - .map(|rewritten_query| LogicalExpr::SubQuery(Box::new(rewritten_query))) + .optimize(*query.subquery) + .map(|rewritten_query| { + LogicalExpr::SubQuery(SubQuery { + subquery: Box::new(rewritten_query), + outer_ref_columns: query.outer_ref_columns, + }) + }) .map(Transformed::yes), _ => Ok(Transformed::no(expr)), }) diff --git a/qurious/src/optimizer/type_coercion.rs b/qurious/src/optimizer/type_coercion.rs index f74556d..f6ed025 100644 --- a/qurious/src/optimizer/type_coercion.rs +++ b/qurious/src/optimizer/type_coercion.rs @@ -86,7 +86,7 @@ fn cast_if_needed(expr: Box, current_type: &DataType, target_type: mod tests { use super::*; use crate::{ - common::table_relation::TableRelation, + common::{table_relation::TableRelation, table_schema::TableSchema}, datatypes::{operator::Operator, scalar::ScalarValue}, logical::{ expr::{AggregateExpr, AggregateOperator, Column}, @@ -110,9 +110,17 @@ mod tests { Field::new("float_col", DataType::Float64, false), ])); let expr = LogicalExpr::BinaryExpr(BinaryExpr { - left: Box::new(LogicalExpr::Column(Column::new("int_col", None::))), + left: Box::new(LogicalExpr::Column(Column::new( + "int_col", + None::, + false, + ))), op: Operator::Add, - right: Box::new(LogicalExpr::Column(Column::new("float_col", None::))), + right: Box::new(LogicalExpr::Column(Column::new( + "float_col", + None::, + false, + ))), }); let plan = LogicalPlan::Projection(Projection { exprs: vec![expr], @@ -120,7 +128,7 @@ mod tests { produce_one_row: true, schema: Arc::new(Schema::empty()), })), - schema: schema.clone(), + schema: Arc::new(TableSchema::new(vec![], schema)), }); assert_analyzed_plan_eq( @@ -139,9 +147,17 @@ mod tests { Field::new("int2", DataType::Int32, false), ])); let expr = LogicalExpr::BinaryExpr(BinaryExpr { - left: Box::new(LogicalExpr::Column(Column::new("int1", None::))), + left: Box::new(LogicalExpr::Column(Column::new( + "int1", + None::, + false, + ))), op: Operator::Add, - right: Box::new(LogicalExpr::Column(Column::new("int2", None::))), + right: Box::new(LogicalExpr::Column(Column::new( + "int2", + None::, + false, + ))), }); let plan = LogicalPlan::Projection(Projection { exprs: vec![expr], @@ -149,7 +165,7 @@ mod tests { produce_one_row: true, schema: Arc::new(Schema::empty()), })), - schema: schema.clone(), + schema: Arc::new(TableSchema::new(vec![], schema)), }); assert_analyzed_plan_eq(plan, "Projection: (int1 + int2)\n Empty Relation\n"); @@ -167,15 +183,27 @@ mod tests { ])); let inner_expr = LogicalExpr::BinaryExpr(BinaryExpr { - left: Box::new(LogicalExpr::Column(Column::new("int_col", None::))), + left: Box::new(LogicalExpr::Column(Column::new( + "int_col", + None::, + false, + ))), op: Operator::Add, - right: Box::new(LogicalExpr::Column(Column::new("float_col", None::))), + right: Box::new(LogicalExpr::Column(Column::new( + "float_col", + None::, + false, + ))), }); let outer_expr = LogicalExpr::BinaryExpr(BinaryExpr { left: Box::new(inner_expr), op: Operator::Mul, - right: Box::new(LogicalExpr::Column(Column::new("double_col", None::))), + right: Box::new(LogicalExpr::Column(Column::new( + "double_col", + None::, + false, + ))), }); let plan = LogicalPlan::Projection(Projection { @@ -184,7 +212,7 @@ mod tests { produce_one_row: true, schema: Arc::new(Schema::empty()), })), - schema: schema.clone(), + schema: Arc::new(TableSchema::new(vec![], schema)), }); assert_analyzed_plan_eq( @@ -206,15 +234,27 @@ mod tests { ])); let expr1 = LogicalExpr::BinaryExpr(BinaryExpr { - left: Box::new(LogicalExpr::Column(Column::new("int16_col", None::))), + left: Box::new(LogicalExpr::Column(Column::new( + "int16_col", + None::, + false, + ))), op: Operator::Add, - right: Box::new(LogicalExpr::Column(Column::new("float64_col", None::))), + right: Box::new(LogicalExpr::Column(Column::new( + "float64_col", + None::, + false, + ))), }); let expr2 = LogicalExpr::BinaryExpr(BinaryExpr { left: Box::new(expr1), op: Operator::Add, - right: Box::new(LogicalExpr::Column(Column::new("float32_col", None::))), + right: Box::new(LogicalExpr::Column(Column::new( + "float32_col", + None::, + false, + ))), }); let plan = LogicalPlan::Projection(Projection { @@ -223,7 +263,7 @@ mod tests { produce_one_row: true, schema: Arc::new(Schema::empty()), })), - schema: schema.clone(), + schema: Arc::new(TableSchema::new(vec![], schema)), }); assert_analyzed_plan_eq( @@ -247,19 +287,31 @@ mod tests { // Create sum(int_col) let sum_expr = LogicalExpr::AggregateExpr(AggregateExpr { op: AggregateOperator::Sum, - expr: Box::new(LogicalExpr::Column(Column::new("int_col", None::))), + expr: Box::new(LogicalExpr::Column(Column::new( + "int_col", + None::, + false, + ))), }); // Create avg(float32_col) let avg_expr = LogicalExpr::AggregateExpr(AggregateExpr { op: AggregateOperator::Avg, - expr: Box::new(LogicalExpr::Column(Column::new("float32_col", None::))), + expr: Box::new(LogicalExpr::Column(Column::new( + "float32_col", + None::, + false, + ))), }); // Create count(double_col) let count_expr = LogicalExpr::AggregateExpr(AggregateExpr { op: AggregateOperator::Count, - expr: Box::new(LogicalExpr::Column(Column::new("double_col", None::))), + expr: Box::new(LogicalExpr::Column(Column::new( + "double_col", + None::, + false, + ))), }); // Create avg * count @@ -282,7 +334,7 @@ mod tests { produce_one_row: true, schema: Arc::new(Schema::empty()), })), - schema: schema.clone(), + schema: Arc::new(TableSchema::new(vec![], schema)), }); assert_analyzed_plan_eq( @@ -306,13 +358,21 @@ mod tests { // Create avg(int32_col) let avg_expr = LogicalExpr::AggregateExpr(AggregateExpr { op: AggregateOperator::Avg, - expr: Box::new(LogicalExpr::Column(Column::new("int32_col", None::))), + expr: Box::new(LogicalExpr::Column(Column::new( + "int32_col", + None::, + false, + ))), }); // Create sum(float32_col) let sum_expr = LogicalExpr::AggregateExpr(AggregateExpr { op: AggregateOperator::Sum, - expr: Box::new(LogicalExpr::Column(Column::new("float32_col", None::))), + expr: Box::new(LogicalExpr::Column(Column::new( + "float32_col", + None::, + false, + ))), }); // Create avg + sum @@ -328,7 +388,7 @@ mod tests { produce_one_row: true, schema: Arc::new(Schema::empty()), })), - schema: schema.clone(), + schema: Arc::new(TableSchema::new(vec![], schema)), }); assert_analyzed_plan_eq( @@ -351,14 +411,22 @@ mod tests { // Create float_col + 42 let add_expr = LogicalExpr::BinaryExpr(BinaryExpr { - left: Box::new(LogicalExpr::Column(Column::new("float_col", None::))), + left: Box::new(LogicalExpr::Column(Column::new( + "float_col", + None::, + false, + ))), op: Operator::Add, right: Box::new(LogicalExpr::Literal(ScalarValue::from(42i32))), }); // Create int_col * 3.14 let mul_expr = LogicalExpr::BinaryExpr(BinaryExpr { - left: Box::new(LogicalExpr::Column(Column::new("int_col", None::))), + left: Box::new(LogicalExpr::Column(Column::new( + "int_col", + None::, + false, + ))), op: Operator::Mul, right: Box::new(LogicalExpr::Literal(ScalarValue::from(3.14))), }); @@ -376,7 +444,7 @@ mod tests { produce_one_row: true, schema: Arc::new(Schema::empty()), })), - schema: schema.clone(), + schema: Arc::new(TableSchema::new(vec![], schema)), }); assert_analyzed_plan_eq( @@ -395,7 +463,11 @@ mod tests { // Create int_col + 1.5 let add_expr = LogicalExpr::BinaryExpr(BinaryExpr { - left: Box::new(LogicalExpr::Column(Column::new("int_col", None::))), + left: Box::new(LogicalExpr::Column(Column::new( + "int_col", + None::, + false, + ))), op: Operator::Add, right: Box::new(LogicalExpr::Literal(ScalarValue::from(1.5f64))), }); @@ -412,7 +484,7 @@ mod tests { produce_one_row: true, schema: Arc::new(Schema::empty()), })), - schema: schema.clone(), + schema: Arc::new(TableSchema::new(vec![], schema)), }); assert_analyzed_plan_eq( diff --git a/qurious/src/planner/mod.rs b/qurious/src/planner/mod.rs index b1033b5..ad6be87 100644 --- a/qurious/src/planner/mod.rs +++ b/qurious/src/planner/mod.rs @@ -90,7 +90,7 @@ impl QueryPlanner for DefaultQueryPlanner { .map(|expr| Arc::new(Negative::new(expr)) as Arc), LogicalExpr::Like(like) => self.physical_expr_like(input_schema, like), LogicalExpr::SubQuery(plan) => self - .create_physical_plan(plan) + .create_physical_plan(&plan.subquery) .map(|plan| Arc::new(physical::expr::SubQuery { plan }) as Arc), _ => unimplemented!("unsupported logical expression: {}", expr), } @@ -110,7 +110,7 @@ impl DefaultQueryPlanner { .collect::>>()?; Ok(Arc::new(physical::plan::Projection::new( - projection.schema.clone(), + projection.schema(), physical_plan, exprs, ))) diff --git a/qurious/src/planner/sql.rs b/qurious/src/planner/sql.rs index a75f2cd..2ecf1e9 100644 --- a/qurious/src/planner/sql.rs +++ b/qurious/src/planner/sql.rs @@ -239,8 +239,9 @@ impl<'a> SqlQueryPlanner<'a> { /// find the relation of the column /// if the column is ambiguous, return an error - fn get_relation(&self, column_name: &str) -> Result> { - for ctx in self.contexts.iter().rev() { + /// return (relation, is_outer_ref) + fn get_relation(&self, column_name: &str) -> Result<(Option, bool)> { + for (i, ctx) in self.contexts.iter().rev().enumerate() { let mut matched = vec![]; for (relation, table_schema) in &ctx.relations { if table_schema.has_field(None, column_name) { @@ -250,21 +251,23 @@ impl<'a> SqlQueryPlanner<'a> { match matched.len() { 0 => continue, - 1 => return Ok(matched.pop()), + // if the column is not in the current context, it is an outer reference + 1 => return Ok((matched.pop(), i > 0)), _ => return internal_err!("Column \"{}\" is ambiguous", column_name,), } } - Ok(None) + Ok((None, false)) } /// find the relation of the table - fn find_relation(&self, table: &TableRelation) -> Option { - self.contexts.iter().rev().find_map(|ctx| { - ctx.relations - .contains_key(table) - .then(|| table.clone()) - .or(ctx.table_aliase.get(&table.to_string()).cloned()) + fn find_relation(&self, table: &TableRelation) -> Option<(TableRelation, bool)> { + self.contexts.iter().rev().enumerate().find_map(|(i, ctx)| { + ctx.relations.contains_key(table).then(|| (table.clone(), i > 0)).or(ctx + .table_aliase + .get(&table.to_string()) + .cloned() + .map(|r| (r, i > 0))) }) } @@ -388,7 +391,11 @@ impl<'a> SqlQueryPlanner<'a> { assign_map .remove(f.name()) .map(|expr| expr.cast_to(f.data_type()).alias(f.name())) - .unwrap_or(LogicalExpr::Column(Column::new(f.name(), Some(relation.clone())))) + .unwrap_or(LogicalExpr::Column(Column::new( + f.name(), + Some(relation.clone()), + false, + ))) }) .collect::>(); @@ -923,23 +930,20 @@ impl<'a> SqlQueryPlanner<'a> { let name = normalize_ident(idents.remove(1)); let relation = idents.remove(0).value.into(); - if self.find_relation(&relation).is_none() { - return internal_err!( - "Column [\"{}\"] not found in table [\"{}\"] or table not exists", - name, - relation - ); + if let Some((relation, is_outer_ref)) = self.find_relation(&relation) { + return Ok(LogicalExpr::Column(Column::new(name, Some(relation), is_outer_ref))); } - Ok(LogicalExpr::Column(Column { + internal_err!( + "Column [\"{}\"] not found in table [\"{}\"] or table not exists", name, - relation: Some(relation), - })) + relation + ) } Expression::Identifier(ident) => { let col_name = normalize_ident(ident); self.get_relation(&col_name) - .map(|relation| LogicalExpr::Column(Column::new(col_name, relation))) + .map(|(relation, is_outer_ref)| LogicalExpr::Column(Column::new(col_name, relation, is_outer_ref))) } Expression::Literal(lit) => match lit { Literal::Int(i) => Ok(LogicalExpr::Literal(ScalarValue::Int64(Some(i)))), @@ -981,9 +985,13 @@ impl<'a> SqlQueryPlanner<'a> { _ => todo!("UnaryOperator: {:?}", expr), }), Expression::SubQuery(sub_query) => self.new_context_scope(|planner| { - planner - .select_to_plan(*sub_query) - .map(|plan| LogicalExpr::SubQuery(Box::new(plan))) + let subquery = planner.select_to_plan(*sub_query)?; + let outer_ref_columns = subquery.outer_ref_columns()?; + + Ok(LogicalExpr::SubQuery(SubQuery { + subquery: Box::new(subquery), + outer_ref_columns, + })) }), Expression::Like { negated, left, right } => Ok(LogicalExpr::Like(Like { negated, @@ -1069,7 +1077,7 @@ impl<'a> SqlQueryPlanner<'a> { // expand schema let quanlified_prefix = idents.join(".").into(); - if self.find_relation(&quanlified_prefix).is_some() { + if let Some((relation, is_outer_ref)) = self.find_relation(&quanlified_prefix) { return plan .schema() .fields() @@ -1077,7 +1085,8 @@ impl<'a> SqlQueryPlanner<'a> { .map(|field| { Ok(LogicalExpr::Column(Column::new( field.name(), - Some(quanlified_prefix.clone()), + Some(relation.clone()), + is_outer_ref, ))) }) .collect();