Skip to content

Commit

Permalink
Refactor codebase to optimize execution of logical plans and improve …
Browse files Browse the repository at this point in the history
…aggregation functionality
  • Loading branch information
holicc committed Sep 13, 2024
1 parent fb0d35a commit b7f73bc
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 42 deletions.
5 changes: 4 additions & 1 deletion qurious/src/execution/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,10 @@ mod tests {
#[test]
fn test_create_table() -> Result<()> {
let session = ExecuteSession::new()?;
let batch = session.sql("select 1 + 0.1")?;
session.sql("create table t(v1 int not null, v2 int not null, v3 int not null)")?;
session.sql("insert into t values(1,4,2), (2,3,3), (3,4,4), (4,3,5)")?;

let batch = session.sql("select count(v3) = min(v3),count(v3),min(v3) from t group by v2")?;

print_batches(&batch)?;

Expand Down
39 changes: 20 additions & 19 deletions qurious/src/logical/expr/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use arrow::datatypes::{DataType, Field, FieldRef};
use crate::datatypes::operator::Operator;
use crate::error::Result;
use crate::logical::plan::LogicalPlan;
use crate::utils;
use std::fmt::Display;
use std::sync::Arc;

Expand Down Expand Up @@ -33,26 +34,26 @@ impl BinaryExpr {
}

pub fn get_result_type(&self, plan: &LogicalPlan) -> Result<DataType> {
let ll = self.left.field(plan)?;
let rr = self.right.field(plan)?;
let left_type = ll.data_type();
let right_type = rr.data_type();
match self.op {
Operator::Eq
| Operator::NotEq
| Operator::Gt
| Operator::GtEq
| Operator::Lt
| Operator::LtEq
| Operator::And
| Operator::Or => {
return Ok(DataType::Boolean);
}
_ => {
let ll = self.left.field(plan)?;
let rr = self.right.field(plan)?;
let left_type = ll.data_type();
let right_type = rr.data_type();

let final_type = match (left_type, right_type) {
(_, DataType::LargeUtf8) | (DataType::LargeUtf8, _) => DataType::LargeUtf8,
(DataType::Float64, _) | (_, DataType::Float64) => DataType::Float64,
(DataType::Int64, _) | (_, DataType::Int64) => DataType::Int64,
(DataType::Int32, _) | (_, DataType::Int32) => DataType::Int32,
(DataType::Int16, _) | (_, DataType::Int16) => DataType::Int16,
(DataType::Int8, _) | (_, DataType::Int8) => DataType::Int8,
(DataType::UInt64, _) | (_, DataType::UInt64) => DataType::UInt64,
(DataType::UInt32, _) | (_, DataType::UInt32) => DataType::UInt32,
(DataType::UInt16, _) | (_, DataType::UInt16) => DataType::UInt16,
(DataType::UInt8, _) | (_, DataType::UInt8) => DataType::UInt8,
_ => unimplemented!("Type coercion not supported for {:?} and {:?}", left_type, right_type),
};

Ok(final_type)
Ok(utils::get_input_types(left_type, right_type))
}
}
}
}

Expand Down
11 changes: 5 additions & 6 deletions qurious/src/logical/plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,22 +124,21 @@ impl LogicalPlan {

pub fn map_expr<F>(self, mut f: F) -> Result<Self>
where
F: FnMut(&LogicalExpr) -> Result<Transformed<LogicalExpr>>,
F: FnMut(&LogicalPlan, &LogicalExpr) -> Result<Transformed<LogicalExpr>>,
{
fn iter<F: FnMut(&LogicalExpr) -> Result<Transformed<LogicalExpr>>>(
fn iter<F: FnMut(&LogicalPlan, &LogicalExpr) -> Result<Transformed<LogicalExpr>>>(
mut plan: LogicalPlan,
f: &mut F,
) -> Result<LogicalPlan> {
match &mut plan {
LogicalPlan::Projection( proj) => {
LogicalPlan::Projection(proj) => {
for expr in &mut proj.exprs {
if let Transformed::Yes(new_expr) = f(expr)? {
if let Transformed::Yes(new_expr) = f(&proj.input, expr)? {
*expr = new_expr;
}
}

}
_ => todo!("map_expr for {:?}", plan),
_ => {}
}

Ok(plan)
Expand Down
9 changes: 4 additions & 5 deletions qurious/src/logical/plan/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,17 @@ pub struct Projection {

impl Projection {
pub fn try_new(input: LogicalPlan, exprs: Vec<LogicalExpr>) -> Result<Self> {
let base_plan = base_plan(&input);

Ok(Self {
schema: exprs
.iter()
.filter_map(|f| match f {
LogicalExpr::Column(i) => Some(i.field(&input)),
LogicalExpr::Literal(i) => Some(Ok(Arc::new(i.to_field()))),
LogicalExpr::Alias(i) => Some(i.expr.field(&input)),
LogicalExpr::AggregateExpr(i) => {
let plan = base_plan(&input);
Some(i.field(plan))
}
LogicalExpr::BinaryExpr(i) => Some(i.field(&input)),
LogicalExpr::AggregateExpr(i) => Some(i.field(base_plan)),
LogicalExpr::BinaryExpr(i) => Some(i.field(base_plan)),
a => todo!("Projection::try_new: {:?}", a),
})
.collect::<Result<Vec<FieldRef>>>()
Expand Down
38 changes: 28 additions & 10 deletions qurious/src/optimizer/type_coercion.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use super::OptimizerRule;
use crate::error::Result;
use crate::logical::expr::{BinaryExpr, LogicalExpr};
use crate::logical::plan::{LogicalPlan, Transformed};
use crate::logical::plan::{base_plan, LogicalPlan, Transformed};
use crate::utils;

#[derive(Default)]
pub struct TypeCoercion {}
Expand All @@ -12,28 +13,45 @@ impl OptimizerRule for TypeCoercion {
}

fn optimize(&self, base_plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {
base_plan
.clone()
.map_expr(|expr| type_coercion(base_plan, expr))
.map(Some)
let plan = base_plan.clone();
let plan = if let LogicalPlan::Filter(mut filter) = plan {
if let Transformed::Yes(new_expr) = type_coercion(base_plan, &filter.expr)? {
filter.expr = new_expr;
}
LogicalPlan::Filter(filter)
} else {
plan
};

plan.map_expr(type_coercion).map(Some)
}
}

fn type_coercion(plan: &LogicalPlan, expr: &LogicalExpr) -> Result<Transformed<LogicalExpr>> {
match expr {
LogicalExpr::BinaryExpr(binary_op) => coerce_binary_op(plan, binary_op)
LogicalExpr::BinaryExpr(binary_op) => coerce_binary_op(base_plan(plan), binary_op)
.map(LogicalExpr::BinaryExpr)
.map(Transformed::Yes),
_ => todo!(),
_ => Ok(Transformed::No),
}
}

fn coerce_binary_op(plan: &LogicalPlan, expr: &BinaryExpr) -> Result<BinaryExpr> {
let final_type = expr.get_result_type(plan)?;
let ll = expr.left.field(plan)?;
let rr = expr.right.field(plan)?;
let left_type = ll.data_type();
let right_type = rr.data_type();

let (l, r) = if left_type == right_type {
(expr.left.as_ref().clone(), expr.right.as_ref().clone())
} else {
let final_type = utils::get_input_types(left_type, right_type);
(expr.left.cast_to(&final_type), expr.right.cast_to(&final_type))
};

Ok(BinaryExpr {
left: Box::new(expr.left.cast_to(&final_type)),
left: Box::new(l),
op: expr.op.clone(),
right: Box::new(expr.right.cast_to(&final_type)),
right: Box::new(r),
})
}
1 change: 1 addition & 0 deletions qurious/src/planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ impl DefaultQueryPlanner {
.iter()
.map(|e| self.create_physical_expr(&input_schema, e))
.collect::<Result<Vec<_>>>()?;

Ok(Arc::new(physical::plan::Projection::new(
projection.schema.clone(),
physical_plan,
Expand Down
2 changes: 1 addition & 1 deletion qurious/src/planner/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1046,7 +1046,7 @@ mod tests {
fn test_read_parquet() {
quick_test(
"select * from read_parquet('./tests/testdata/file/case2.parquet') where counter_id = '1'",
"Projection: (tmp_table(17b774f).counter_id, tmp_table(17b774f).currency, tmp_table(17b774f).market, tmp_table(17b774f).type)\n Filter: tmp_table(17b774f).counter_id = CAST(Utf8(1) AS LargeUtf8)\n TableScan: tmp_table(17b774f)\n",
"Projection: (tmp_table(17b774f).counter_id, tmp_table(17b774f).currency, tmp_table(17b774f).market, tmp_table(17b774f).type)\n Filter: tmp_table(17b774f).counter_id = Utf8(1)\n TableScan: tmp_table(17b774f)\n",
);
}

Expand Down
17 changes: 17 additions & 0 deletions qurious/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod array;
pub mod batch;

use arrow::datatypes::DataType;
use sqlparser::ast::Ident;

use crate::logical::plan::LogicalPlan;
Expand Down Expand Up @@ -31,3 +32,19 @@ pub fn normalize_ident(i: Ident) -> String {
None => i.value.to_ascii_lowercase(),
}
}

pub fn get_input_types(left_type: &DataType, right_type: &DataType) -> DataType {
match (left_type, right_type) {
(_, DataType::LargeUtf8) | (DataType::LargeUtf8, _) => DataType::LargeUtf8,
(DataType::Float64, _) | (_, DataType::Float64) => DataType::Float64,
(DataType::Int64, _) | (_, DataType::Int64) => DataType::Int64,
(DataType::Int32, _) | (_, DataType::Int32) => DataType::Int32,
(DataType::Int16, _) | (_, DataType::Int16) => DataType::Int16,
(DataType::Int8, _) | (_, DataType::Int8) => DataType::Int8,
(DataType::UInt64, _) | (_, DataType::UInt64) => DataType::UInt64,
(DataType::UInt32, _) | (_, DataType::UInt32) => DataType::UInt32,
(DataType::UInt16, _) | (_, DataType::UInt16) => DataType::UInt16,
(DataType::UInt8, _) | (_, DataType::UInt8) => DataType::UInt8,
_ => unimplemented!("Type coercion not supported for {:?} and {:?}", left_type, right_type),
}
}

0 comments on commit b7f73bc

Please sign in to comment.