Skip to content

Commit

Permalink
WIP: 😭 too hard
Browse files Browse the repository at this point in the history
  • Loading branch information
holicc committed Nov 27, 2024
1 parent d2fdb98 commit 2a331a7
Show file tree
Hide file tree
Showing 29 changed files with 1,043 additions and 338 deletions.
2 changes: 1 addition & 1 deletion qurious/src/common/join_type.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt::Display;

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
pub enum JoinType {
Left,
Right,
Expand Down
4 changes: 2 additions & 2 deletions qurious/src/datatypes/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ impl Display for Operator {
Operator::GtEq => write!(f, ">="),
Operator::Lt => write!(f, "<"),
Operator::LtEq => write!(f, "<="),
Operator::And => write!(f, "&&"),
Operator::Or => write!(f, "||"),
Operator::And => write!(f, "AND"),
Operator::Or => write!(f, "OR"),
Operator::Add => write!(f, "+"),
Operator::Sub => write!(f, "-"),
Operator::Mul => write!(f, "*"),
Expand Down
10 changes: 7 additions & 3 deletions qurious/src/execution/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use crate::common::table_relation::TableRelation;
use crate::datasource::memory::MemoryTable;
use crate::error::Error;
use crate::functions::{all_builtin_functions, UserDefinedFunction};
use crate::internal_err;
use crate::logical::plan::{
CreateMemoryTable, DdlStatement, DmlOperator, DmlStatement, DropTable, Filter, LogicalPlan,
};
Expand All @@ -22,6 +21,7 @@ use crate::provider::schema::SchemaProvider;
use crate::provider::table::TableProvider;
use crate::utils::batch::make_count_batch;
use crate::{error::Result, planner::DefaultQueryPlanner};
use crate::{internal_err, utils};

use crate::execution::providers::CatalogProviderList;

Expand Down Expand Up @@ -92,11 +92,15 @@ impl ExecuteSession {
}

pub fn execute_logical_plan(&self, plan: &LogicalPlan) -> Result<Vec<RecordBatch>> {
let plan = self.optimizer.optimize(plan)?;
match &plan {
LogicalPlan::Ddl(ddl) => self.execute_ddl(ddl),
LogicalPlan::Dml(stmt) => self.execute_dml(stmt),
plan => self.planner.create_physical_plan(plan)?.execute(),
plan => {
println!("before optimize: \n{}", utils::format(&plan, 0));
let plan = self.optimizer.optimize(plan)?;
println!("after optimize: \n{}", utils::format(&plan, 0));
self.planner.create_physical_plan(&plan)?.execute()
}
}
}

Expand Down
51 changes: 45 additions & 6 deletions qurious/src/logical/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use crate::common::table_relation::TableRelation;
use crate::common::transformed::{TransformNode, Transformed, TransformedResult, TreeNodeRecursion};
use crate::datatypes::scalar::ScalarValue;
use crate::error::{Error, Result};
use crate::internal_err;
use crate::logical::plan::LogicalPlan;
use crate::{internal_err, utils};
use arrow::datatypes::{DataType, Field, FieldRef, Schema};

use self::alias::Alias;
Expand All @@ -42,7 +42,9 @@ pub enum LogicalExpr {
Function(Function),
IsNull(Box<LogicalExpr>),
IsNotNull(Box<LogicalExpr>),
Like(Like),
Negative(Box<LogicalExpr>),
SubQuery(Box<LogicalPlan>),
}

macro_rules! impl_logical_expr_methods {
Expand Down Expand Up @@ -91,7 +93,15 @@ impl Display for LogicalExpr {
LogicalExpr::Cast(cast_expr) => write!(f, "CAST({} AS {})", cast_expr.expr, cast_expr.data_type),
LogicalExpr::Function(function) => write!(f, "{function}",),
LogicalExpr::IsNull(logical_expr) => write!(f, "{} IS NULL", logical_expr),
LogicalExpr::IsNotNull(logical_expr) => write!(f, "{} IS NOT NULL", logical_expr),
LogicalExpr::IsNotNull(logical_expr) => write!(f, "{} IS NOT NULLni", logical_expr),
LogicalExpr::SubQuery(logical_plan) => write!(f, "(\n{})\n", utils::format(logical_plan, 5)),
LogicalExpr::Like(like) => {
if like.negated {
write!(f, "{} NOT LIKE {}", like.expr, like.pattern)
} else {
write!(f, "{} LIKE {}", like.expr, like.pattern)
}
}
}
}
}
Expand Down Expand Up @@ -159,7 +169,17 @@ impl LogicalExpr {
}

pub fn column_refs(&self) -> HashSet<&Column> {
todo!()
let mut columns = HashSet::new();

self.apply(|expr| {
if let LogicalExpr::Column(column) = expr {
columns.insert(column);
}
Ok(TreeNodeRecursion::Continue)
})
.expect("[column_refs] failed to apply");

columns
}

pub fn data_type(&self, schema: &Arc<Schema>) -> Result<DataType> {
Expand All @@ -175,8 +195,9 @@ impl LogicalExpr {
LogicalExpr::Function(function) => Ok(function.func.return_type()),
LogicalExpr::AggregateExpr(AggregateExpr { op, expr }) => op.infer_type(&expr.data_type(schema)?),
LogicalExpr::SortExpr(SortExpr { expr, .. }) | LogicalExpr::Negative(expr) => expr.data_type(schema),
LogicalExpr::IsNull(_) | LogicalExpr::IsNotNull(_) => Ok(DataType::Boolean),
LogicalExpr::Wildcard => internal_err!("Wildcard has no data type"),
LogicalExpr::Like(_) | LogicalExpr::IsNull(_) | LogicalExpr::IsNotNull(_) => Ok(DataType::Boolean),
LogicalExpr::SubQuery(plan) => Ok(plan.schema().fields[0].data_type().clone()),
_ => internal_err!("[{}] has no data type", self),
}
}
}
Expand Down Expand Up @@ -231,8 +252,16 @@ impl TransformNode for LogicalExpr {
LogicalExpr::IsNull(expr) => f(*expr)?.update(|expr| LogicalExpr::IsNull(Box::new(expr))),
LogicalExpr::IsNotNull(expr) => f(*expr)?.update(|expr| LogicalExpr::IsNotNull(Box::new(expr))),
LogicalExpr::Negative(expr) => f(*expr)?.update(|expr| LogicalExpr::Negative(Box::new(expr))),
LogicalExpr::SubQuery(plan) => plan.map_exprs(f)?.update(|plan| LogicalExpr::SubQuery(Box::new(plan))),

LogicalExpr::Wildcard | LogicalExpr::Column(_) | LogicalExpr::Literal(_) => Transformed::no(self),
LogicalExpr::Like(like) => f(*like.expr)?.update(|expr| {
LogicalExpr::Like(Like {
negated: like.negated,
expr: Box::new(expr),
pattern: like.pattern,
})
}),
})
}

Expand All @@ -250,7 +279,10 @@ impl TransformNode for LogicalExpr {
| LogicalExpr::IsNull(expr)
| LogicalExpr::IsNotNull(expr)
| LogicalExpr::Alias(Alias { expr, .. }) => vec![expr.as_ref()],
LogicalExpr::Wildcard | LogicalExpr::Column(_) | LogicalExpr::Literal(_) => vec![],
LogicalExpr::SubQuery(_) | LogicalExpr::Wildcard | LogicalExpr::Column(_) | LogicalExpr::Literal(_) => {
vec![]
}
LogicalExpr::Like(like) => vec![like.expr.as_ref(), like.pattern.as_ref()],
};

for expr in children {
Expand All @@ -270,3 +302,10 @@ pub(crate) fn get_expr_value(expr: LogicalExpr) -> Result<i64> {
_ => Err(Error::InternalError(format!("Unexpected expression in"))),
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Like {
pub negated: bool,
pub expr: Box<LogicalExpr>,
pub pattern: Box<LogicalExpr>,
}
2 changes: 1 addition & 1 deletion qurious/src/logical/plan/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::logical::expr::LogicalExpr;
use std::fmt::Display;
use std::sync::Arc;

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Aggregate {
pub schema: SchemaRef,
pub input: Box<LogicalPlan>,
Expand Down
6 changes: 3 additions & 3 deletions qurious/src/logical/plan/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use arrow::datatypes::{Schema, SchemaRef};

use crate::{impl_logical_plan, logical::plan::LogicalPlan};

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum DdlStatement {
CreateMemoryTable(CreateMemoryTable),
DropTable(DropTable),
Expand Down Expand Up @@ -40,7 +40,7 @@ impl Display for DdlStatement {
}
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CreateMemoryTable {
pub schema: SchemaRef,
pub name: String,
Expand All @@ -49,7 +49,7 @@ pub struct CreateMemoryTable {

impl_logical_plan!(CreateMemoryTable);

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DropTable {
pub name: String,
pub if_exists: bool,
Expand Down
4 changes: 2 additions & 2 deletions qurious/src/logical/plan/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use arrow::datatypes::SchemaRef;

use crate::{common::table_relation::TableRelation, impl_logical_plan, logical::plan::LogicalPlan};

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum DmlOperator {
Insert,
Update,
Expand All @@ -21,7 +21,7 @@ impl Display for DmlOperator {
}
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DmlStatement {
pub relation: TableRelation,
pub op: DmlOperator,
Expand Down
2 changes: 1 addition & 1 deletion qurious/src/logical/plan/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::logical::expr::LogicalExpr;
use crate::logical::plan::LogicalPlan;
use arrow::datatypes::SchemaRef;

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Filter {
pub input: Box<LogicalPlan>,
pub expr: LogicalExpr,
Expand Down
4 changes: 2 additions & 2 deletions qurious/src/logical/plan/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
use arrow::datatypes::SchemaRef;
use std::{fmt::Display, sync::Arc};

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CrossJoin {
pub left: Arc<LogicalPlan>,
pub right: Arc<LogicalPlan>,
Expand All @@ -32,7 +32,7 @@ impl Display for CrossJoin {
}
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Join {
pub left: Arc<LogicalPlan>,
pub right: Arc<LogicalPlan>,
Expand Down
2 changes: 1 addition & 1 deletion qurious/src/logical/plan/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use arrow::datatypes::SchemaRef;

use crate::logical::plan::LogicalPlan;

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Limit {
pub input: Box<LogicalPlan>,
pub fetch: Option<usize>,
Expand Down
23 changes: 20 additions & 3 deletions qurious/src/logical/plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ macro_rules! impl_logical_plan {
};
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum LogicalPlan {
/// Apply Cross Join to two logical plans.
CrossJoin(CrossJoin),
Expand Down Expand Up @@ -144,6 +144,10 @@ impl LogicalPlan {
aggr_expr,
})))
}
LogicalPlan::Filter(Filter { input, expr }) => Ok(Transformed::yes(LogicalPlan::Filter(Filter {
input,
expr: f(expr).data()?,
}))),
_ => Ok(Transformed::no(self)),
}
}
Expand Down Expand Up @@ -178,6 +182,19 @@ impl TransformNode for LogicalPlan {
input: Box::new(input),
})
}),
LogicalPlan::Limit(Limit { input, fetch, skip }) => f(*input)?.update(|input| {
LogicalPlan::Limit(Limit {
input: Box::new(input),
fetch,
skip,
})
}),
LogicalPlan::Filter(Filter { expr, input }) => f(*input)?.update(|input| {
LogicalPlan::Filter(Filter {
expr,
input: Box::new(input),
})
}),
_ => Transformed::no(self),
})
}
Expand Down Expand Up @@ -217,13 +234,13 @@ impl std::fmt::Display for LogicalPlan {
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct EmptyRelation {
pub produce_one_row: bool,
pub schema: SchemaRef,
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Values {
pub values: Vec<Vec<LogicalExpr>>,
pub schema: SchemaRef,
Expand Down
2 changes: 1 addition & 1 deletion qurious/src/logical/plan/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{logical::expr::LogicalExpr, logical::plan::LogicalPlan};
use std::fmt::Display;
use std::sync::Arc;

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Projection {
pub schema: SchemaRef,
pub input: Box<LogicalPlan>,
Expand Down
24 changes: 23 additions & 1 deletion qurious/src/logical/plan/scan.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{fmt::Display, sync::Arc};
use std::{fmt::Display, sync::Arc, hash::{Hash, Hasher}};

use arrow::datatypes::{Schema, SchemaRef};

Expand Down Expand Up @@ -89,3 +89,25 @@ impl Display for TableScan {
}
}
}

impl PartialEq for TableScan {
fn eq(&self, other: &Self) -> bool {
self.relation == other.relation
&& Arc::ptr_eq(&self.source, &other.source)
&& self.projections == other.projections
&& Arc::ptr_eq(&self.projected_schema, &other.projected_schema)
&& self.filter == other.filter
}
}

impl Eq for TableScan {}

impl Hash for TableScan {
fn hash<H: Hasher>(&self, state: &mut H) {
self.relation.hash(state);
Arc::as_ptr(&self.source).hash(state);
self.projections.hash(state);
Arc::as_ptr(&self.projected_schema).hash(state);
self.filter.hash(state);
}
}
2 changes: 1 addition & 1 deletion qurious/src/logical/plan/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use arrow::datatypes::SchemaRef;
use crate::logical::expr::SortExpr;
use crate::logical::plan::LogicalPlan;

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Sort {
pub exprs: Vec<SortExpr>,
pub input: Box<LogicalPlan>,
Expand Down
2 changes: 1 addition & 1 deletion qurious/src/logical/plan/sub_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
sync::Arc,
};

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SubqueryAlias {
pub input: Arc<LogicalPlan>,
pub alias: TableRelation,
Expand Down
14 changes: 9 additions & 5 deletions qurious/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
mod count_wildcard_rule;
mod optimize_projections;
mod push_down_projections;
mod pushdown_filter_inner_join;
mod scalar_subquery_to_join;
mod type_coercion;

use crate::{error::Result, logical::plan::LogicalPlan};
use count_wildcard_rule::CountWildcardRule;
use pushdown_filter_inner_join::PushdownFilterInnerJoin;
use type_coercion::TypeCoercion;

use crate::{error::Result, logical::plan::LogicalPlan};

pub trait OptimizerRule {
fn name(&self) -> &str;

Expand All @@ -21,7 +21,11 @@ pub struct Optimizer {
impl Optimizer {
pub fn new() -> Self {
Self {
rules: vec![Box::new(CountWildcardRule), Box::new(TypeCoercion::default())],
rules: vec![
Box::new(CountWildcardRule),
Box::new(TypeCoercion),
Box::new(PushdownFilterInnerJoin),
],
}
}

Expand Down
Loading

0 comments on commit 2a331a7

Please sign in to comment.