Skip to content

Commit

Permalink
Refactor codebase to optimize execution of logical plans and improve …
Browse files Browse the repository at this point in the history
…aggregation functionality
  • Loading branch information
holicc committed Sep 10, 2024
1 parent e94bae0 commit fb0d35a
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 58 deletions.
11 changes: 3 additions & 8 deletions qurious/src/execution/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ impl ExecuteSession {
}

pub fn execute_logical_plan(&self, plan: &LogicalPlan) -> Result<Vec<RecordBatch>> {
// let plan = self.optimizer.optimize(plan)?;
let plan = self.optimizer.optimize(plan)?;

match &plan {
LogicalPlan::Ddl(ddl) => self.execute_ddl(ddl),
LogicalPlan::Dml(DmlStatement {
Expand Down Expand Up @@ -220,13 +221,7 @@ mod tests {
#[test]
fn test_create_table() -> Result<()> {
let session = ExecuteSession::new()?;
let sql = r#"create table t(v1 int not null, v2 int not null, v3 int not null)"#;

session.sql(sql)?;
session.sql("insert into t values(1,4,2), (2,3,3), (3,4,4), (4,3,5)")?;

let batch = session.sql("select 1+0.1")?;

let batch = session.sql("select 1 + 0.1")?;

print_batches(&batch)?;

Expand Down
61 changes: 20 additions & 41 deletions qurious/src/logical/expr/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use crate::logical::plan::LogicalPlan;
use std::fmt::Display;
use std::sync::Arc;

use super::cast::CastExpr;
use super::LogicalExpr;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand All @@ -27,53 +26,33 @@ impl BinaryExpr {

pub fn field(&self, plan: &LogicalPlan) -> Result<FieldRef> {
Ok(Arc::new(Field::new(
format!("{} {} {}", self.left, self.op, self.right),
match self.op {
Operator::Eq
| Operator::NotEq
| Operator::Gt
| Operator::GtEq
| Operator::Lt
| Operator::LtEq
| Operator::And
| Operator::Or => DataType::Boolean,
Operator::Add | Operator::Sub | Operator::Mul | Operator::Div | Operator::Mod => {
self.left.field(plan)?.data_type().clone()
}
},
format!("({} {} {})", self.left, self.op, self.right),
self.get_result_type(plan)?,
false,
)))
}

pub fn coerce_types(self, plan: &LogicalPlan) -> Result<BinaryExpr> {
let left = self.left.field(plan)?;
let right = self.right.field(plan)?;
pub fn get_result_type(&self, plan: &LogicalPlan) -> Result<DataType> {
let ll = self.left.field(plan)?;
let rr = self.right.field(plan)?;
let left_type = ll.data_type();
let right_type = rr.data_type();

let (l, r) = match (left.data_type(), right.data_type()) {
(_, DataType::LargeUtf8) => (
Box::new(LogicalExpr::Cast(CastExpr::new(*self.left, DataType::LargeUtf8))),
self.right,
),
(DataType::LargeUtf8, _) => (
self.left,
Box::new(LogicalExpr::Cast(CastExpr::new(*self.right, DataType::LargeUtf8))),
),
(DataType::Float64, _) => (
Box::new(LogicalExpr::Cast(CastExpr::new(*self.left, DataType::Float64))),
self.right,
),
(_, DataType::Float64) => (
self.left,
Box::new(LogicalExpr::Cast(CastExpr::new(*self.right, DataType::Float64))),
),
_ => (self.left, self.right),
let final_type = match (left_type, right_type) {
(_, DataType::LargeUtf8) | (DataType::LargeUtf8, _) => DataType::LargeUtf8,
(DataType::Float64, _) | (_, DataType::Float64) => DataType::Float64,
(DataType::Int64, _) | (_, DataType::Int64) => DataType::Int64,
(DataType::Int32, _) | (_, DataType::Int32) => DataType::Int32,
(DataType::Int16, _) | (_, DataType::Int16) => DataType::Int16,
(DataType::Int8, _) | (_, DataType::Int8) => DataType::Int8,
(DataType::UInt64, _) | (_, DataType::UInt64) => DataType::UInt64,
(DataType::UInt32, _) | (_, DataType::UInt32) => DataType::UInt32,
(DataType::UInt16, _) | (_, DataType::UInt16) => DataType::UInt16,
(DataType::UInt8, _) | (_, DataType::UInt8) => DataType::UInt8,
_ => unimplemented!("Type coercion not supported for {:?} and {:?}", left_type, right_type),
};

Ok(BinaryExpr {
left: l,
op: self.op,
right: r,
})
Ok(final_type)
}
}

Expand Down
5 changes: 0 additions & 5 deletions qurious/src/logical/plan/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ pub struct Filter {

impl Filter {
pub fn try_new(input: LogicalPlan, expr: LogicalExpr) -> Result<Self> {
let expr = match expr {
LogicalExpr::BinaryExpr(binary) => binary.coerce_types(&input).map(LogicalExpr::BinaryExpr)?,
_ => expr,
};

Ok(Self {
input: Box::new(input),
expr,
Expand Down
33 changes: 32 additions & 1 deletion qurious/src/logical/plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,14 @@ pub use sub_query::SubqueryAlias;

use arrow::datatypes::SchemaRef;

use super::expr::LogicalExpr;
use crate::common::table_relation::TableRelation;
use crate::error::Result;

use super::expr::LogicalExpr;
pub enum Transformed<T> {
Yes(T),
No,
}

#[macro_export]
macro_rules! impl_logical_plan {
Expand Down Expand Up @@ -116,6 +121,32 @@ impl LogicalPlan {
LogicalPlan::Dml(l) => l.children(),
}
}

pub fn map_expr<F>(self, mut f: F) -> Result<Self>
where
F: FnMut(&LogicalExpr) -> Result<Transformed<LogicalExpr>>,
{
fn iter<F: FnMut(&LogicalExpr) -> Result<Transformed<LogicalExpr>>>(
mut plan: LogicalPlan,
f: &mut F,
) -> Result<LogicalPlan> {
match &mut plan {
LogicalPlan::Projection( proj) => {
for expr in &mut proj.exprs {
if let Transformed::Yes(new_expr) = f(expr)? {
*expr = new_expr;
}
}

}
_ => todo!("map_expr for {:?}", plan),
}

Ok(plan)
}

iter(self, &mut f)
}
}

pub fn base_plan(plan: &LogicalPlan) -> &LogicalPlan {
Expand Down
8 changes: 6 additions & 2 deletions qurious/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
mod optimize_projections;
mod push_down_projections;
mod type_coercion;

use optimize_projections::OptimizeProjections;
use type_coercion::TypeCoercion;

use crate::{error::Result, logical::plan::LogicalPlan};

Expand All @@ -18,7 +19,10 @@ pub struct Optimzier {
impl Optimzier {
pub fn new() -> Self {
Self {
rules: vec![Box::new(OptimizeProjections::default())],
rules: vec![
// Box::new(OptimizeProjections::default()),
Box::new(TypeCoercion::default()),
],
}
}

Expand Down
39 changes: 39 additions & 0 deletions qurious/src/optimizer/type_coercion.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use super::OptimizerRule;
use crate::error::Result;
use crate::logical::expr::{BinaryExpr, LogicalExpr};
use crate::logical::plan::{LogicalPlan, Transformed};

#[derive(Default)]
pub struct TypeCoercion {}

impl OptimizerRule for TypeCoercion {
fn name(&self) -> &str {
"type_coercion"
}

fn optimize(&self, base_plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {
base_plan
.clone()
.map_expr(|expr| type_coercion(base_plan, expr))
.map(Some)
}
}

fn type_coercion(plan: &LogicalPlan, expr: &LogicalExpr) -> Result<Transformed<LogicalExpr>> {
match expr {
LogicalExpr::BinaryExpr(binary_op) => coerce_binary_op(plan, binary_op)
.map(LogicalExpr::BinaryExpr)
.map(Transformed::Yes),
_ => todo!(),
}
}

fn coerce_binary_op(plan: &LogicalPlan, expr: &BinaryExpr) -> Result<BinaryExpr> {
let final_type = expr.get_result_type(plan)?;

Ok(BinaryExpr {
left: Box::new(expr.left.cast_to(&final_type)),
op: expr.op.clone(),
right: Box::new(expr.right.cast_to(&final_type)),
})
}
1 change: 0 additions & 1 deletion qurious/src/planner/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,6 @@ impl SqlQueryPlanner {
} else {
None
};

// process the GROUP BY clause or process aggregation in SELECT
if select.group_by.is_some() || !aggr_exprs.is_empty() {
plan = self.aggregate_plan(
Expand Down

0 comments on commit fb0d35a

Please sign in to comment.