diff --git a/.gitignore b/.gitignore index fd55954..1051447 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ debug/ /target/ target/ -.DS_Store \ No newline at end of file +.DS_Store +pkg/ \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index abdc9b0..f084e48 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,7 +8,7 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "getrandom", "once_cell", "version_check", @@ -75,6 +75,12 @@ dependencies = [ "syn 2.0.18", ] +[[package]] +name = "cfg-if" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" + [[package]] name = "cfg-if" version = "1.0.0" @@ -91,13 +97,23 @@ dependencies = [ "num-traits", ] +[[package]] +name = "console_error_panic_hook" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06aeb73f470f66dcdbf7223caeebb85984942f22f1adb2a088cf9668146bbbc" +dependencies = [ + "cfg-if 1.0.0", + "wasm-bindgen", +] + [[package]] name = "crossbeam-channel" version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "crossbeam-utils", ] @@ -107,7 +123,7 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ce6fd6f855243022dcecf8702fef0c297d4338e226845fe067f6341ad9fa0cef" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "crossbeam-epoch", "crossbeam-utils", ] @@ -119,7 +135,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "46bd5f3f85273295a9d14aedfb86f6aadbff6d8f5295c4a9edb08e819dcf5695" dependencies = [ "autocfg", - "cfg-if", + "cfg-if 1.0.0", "crossbeam-utils", "memoffset", "scopeguard", @@ -131,7 +147,7 @@ version = "0.8.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c063cd8cc95f5c377ed0d4b49a4b21f632396ff690e8470c29b3359b346984b" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] @@ -164,7 +180,7 @@ version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c85e1d9ab2eadba7e5040d4e09cbd6d072b76a557ad64e797c2cb9d4da21d7e4" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "js-sys", "libc", "wasi", @@ -302,6 +318,9 @@ dependencies = [ "rand", "rayon", "smartstring", + "wasm-bindgen", + "wasm-bindgen-test", + "wee_alloc", "xxhash-rust", ] @@ -320,6 +339,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "memory_units" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8452105ba047068f40ff7093dd1d9da90898e63dd61736462e9cdda6a90ad3c3" + [[package]] name = "multiversion" version = "0.7.2" @@ -452,6 +477,12 @@ dependencies = [ "semver", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.1.0" @@ -545,7 +576,7 @@ version = "0.2.86" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5bba0e8cb82ba49ff4e229459ff22a191bbe9a1cb3a341610c9c33efc27ddf73" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "wasm-bindgen-macro", ] @@ -564,6 +595,18 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d1985d03709c53167ce907ff394f5316aa22cb4e12761295c5dc57dacb6297e" +dependencies = [ + "cfg-if 1.0.0", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.86" @@ -593,6 +636,74 @@ version = "0.2.86" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed9d5b4305409d1fc9482fee2d7f9bcbf24b3972bf59817ef757e23982242a93" +[[package]] +name = "wasm-bindgen-test" +version = "0.3.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9e636f3a428ff62b3742ebc3c70e254dfe12b8c2b469d688ea59cdd4abcf502" +dependencies = [ + "console_error_panic_hook", + "js-sys", + "scoped-tls", + "wasm-bindgen", + "wasm-bindgen-futures", + "wasm-bindgen-test-macro", +] + +[[package]] +name = "wasm-bindgen-test-macro" +version = "0.3.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f18c1fad2f7c4958e7bcce014fa212f59a65d5e3721d0f77e6c0b27ede936ba3" +dependencies = [ + "proc-macro2", + "quote", +] + +[[package]] +name = "web-sys" +version = "0.3.63" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bdd9ef4e984da1187bf8110c5cf5b845fbc87a23602cdf912386a76fcd3a7c2" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "wee_alloc" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbb3b5a6b2bb17cb6ad44a2e68a43e8d2722c997da10e928665c72ec6c0a0b8e" +dependencies = [ + "cfg-if 0.1.10", + "libc", + "memory_units", + "winapi", +] + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "xxhash-rust" version = "0.8.6" diff --git a/Cargo.toml b/Cargo.toml index 7004ea1..ac150da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,9 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lib] +crate-type = ["cdylib", "rlib"] + [dependencies] ahash = "0.8.3" arrow2 = { version = "0.17.1", features = [ @@ -25,3 +28,8 @@ rand = "0.8.5" rayon = "1.7.0" smartstring = "1.0.1" xxhash-rust = { version = "0.8.6", features = ["xxh3"] } +wasm-bindgen = "0.2.63" +wee_alloc = { version = "0.4.5", optional = true } + +[dev-dependencies] +wasm-bindgen-test = "0.3.13" diff --git a/src/chunked_array/sort_test.rs b/src/chunked_array/sort_test.rs index 80d4edf..6f4ca7c 100644 --- a/src/chunked_array/sort_test.rs +++ b/src/chunked_array/sort_test.rs @@ -43,7 +43,7 @@ mod sort_i32 { let arr = ChunkedArray::new("s", &vec![12, 1, 5, 8]); let sorted = arr.sort(true); println!("arr: {:?}", &sorted); - assert_eq!(sorted.to_vec(), vec![1, 5, 8, 12]); + assert_eq!(sorted.to_vec(), vec![12, 8, 5, 1]); } #[test] diff --git a/src/dataframe/groupby/mod.rs b/src/dataframe/groupby/mod.rs index 1236039..76156bd 100644 --- a/src/dataframe/groupby/mod.rs +++ b/src/dataframe/groupby/mod.rs @@ -20,15 +20,9 @@ mod mod_test; use super::DataFrame; #[derive(Debug)] -pub enum GroupsProxy { - Idx(GroupsIdx), - Slice, -} - -#[derive(Debug)] -pub struct GroupsIdx { - first: Vec, - all: Vec>, +pub struct GroupsProxy { + pub first: Vec, + pub all: Vec>, } impl DataFrame { @@ -87,10 +81,10 @@ impl DataFrame { }) .collect(); let (first_indices, grouped_indices) = join_group_indices(tuples); - GroupsProxy::Idx(GroupsIdx { + GroupsProxy { first: first_indices, all: grouped_indices, - }) + } } } diff --git a/src/dataframe/mod.rs b/src/dataframe/mod.rs index 90fa437..939178c 100644 --- a/src/dataframe/mod.rs +++ b/src/dataframe/mod.rs @@ -99,7 +99,11 @@ impl DataFrame { } pub fn get_index_with_name(&self, name: &str) -> usize { - self.columns.iter().position(|c| c.name() == name).unwrap() + let col = self.columns.iter().position(|c| c.name() == name); + if col.is_none() { + panic!("column not found for name: {name}"); + } + col.unwrap() } // For now, makes all the chunks diff --git a/src/js_little_dataframe/mod.rs b/src/js_little_dataframe/mod.rs new file mode 100644 index 0000000..8148494 --- /dev/null +++ b/src/js_little_dataframe/mod.rs @@ -0,0 +1,11 @@ +use wasm_bindgen::prelude::*; + +#[wasm_bindgen] +extern "C" { + fn alert(s: &str); +} + +#[wasm_bindgen] +pub fn greet() { + alert("Hello, wasm-game-of-life!"); +} diff --git a/src/js_little_dataframe/test.ts b/src/js_little_dataframe/test.ts new file mode 100644 index 0000000..e51c390 --- /dev/null +++ b/src/js_little_dataframe/test.ts @@ -0,0 +1,4 @@ +import * as Foo from "../../pkg" + +const foo = "bar" +console.log("YO") \ No newline at end of file diff --git a/src/lazy_dataframe/aexpr.rs b/src/lazy_dataframe/aexpr.rs index 7ba5018..1b17944 100644 --- a/src/lazy_dataframe/aexpr.rs +++ b/src/lazy_dataframe/aexpr.rs @@ -4,10 +4,11 @@ use crate::core::{field::Field, iterator::AExprIter, schema::Schema}; use super::{ arena::{Arena, Node}, - expr::{Expr, Operator}, + expr::{AggExpr, Expr, Operator}, lit::LiteralValue, physical_plan::physical_expr::{ - binary_expr::BinaryExpr, column::ColumnExpr, literal::LiteralExpr, PhysicalExpr, + agg::AggregationExpr, binary_expr::BinaryExpr, column::ColumnExpr, literal::LiteralExpr, + PhysicalExpr, }, }; @@ -21,6 +22,12 @@ pub enum AExpr { }, Column(Arc), Literal(LiteralValue), + Agg(AAggExpr), +} + +#[derive(Clone, Debug)] +pub enum AAggExpr { + Min(Node), } impl AExpr { @@ -32,6 +39,9 @@ impl AExpr { } AExpr::Column(_) => {} AExpr::Literal(_) => {} + AExpr::Agg(agg) => match agg { + AAggExpr::Min(agg) => stack.push(*agg), + }, } } @@ -40,6 +50,7 @@ impl AExpr { AExpr::BinaryExpr { left, op, right } => todo!(), AExpr::Column(col_name) => schema.get_field(&col_name).unwrap(), AExpr::Literal(_) => todo!(), + AExpr::Agg(_) => todo!(), } } } @@ -53,6 +64,9 @@ pub fn expr_to_aexpr(expr: Expr, arena: &mut Arena) -> Node { right: expr_to_aexpr(*right, arena), }, Expr::Literal(v) => AExpr::Literal(v), + Expr::Agg(agg) => match agg { + AggExpr::Min(input) => AExpr::Agg(AAggExpr::Min(expr_to_aexpr(*input, arena))), + }, }; arena.add(aexpr) } @@ -66,6 +80,11 @@ pub fn create_physical_expr(expr: Node, expr_arena: &mut Arena) -> Arc Arc::new(ColumnExpr::new(col_name)), AExpr::Literal(lit) => Arc::new(LiteralExpr::new(lit)), + AExpr::Agg(agg) => match agg { + AAggExpr::Min(input) => Arc::new(AggregationExpr::Min(create_physical_expr( + input, expr_arena, + ))), + }, } } @@ -102,3 +121,21 @@ pub fn aexpr_to_leaf_names_iter<'a>( pub fn check_input_node(node: Node, schema: &Schema, expr_arena: &Arena) -> bool { aexpr_to_leaf_names_iter(node, expr_arena).all(|name| schema.index_of(name.as_ref()).is_some()) } + +pub fn expr_node_to_expr(node: Node, expr_arena: &Arena) -> Expr { + let aexpr = expr_arena.get(node); + match aexpr { + AExpr::BinaryExpr { left, op, right } => Expr::BinaryExpr { + left: Box::new(expr_node_to_expr(*left, expr_arena)), + op: op.clone(), + right: Box::new(expr_node_to_expr(*right, expr_arena)), + }, + AExpr::Column(col) => Expr::Column(col.clone()), + AExpr::Literal(lit) => Expr::Literal(lit.clone()), + AExpr::Agg(agg) => match agg { + AAggExpr::Min(input) => Expr::Agg(AggExpr::Min(Box::new(expr_node_to_expr( + *input, expr_arena, + )))), + }, + } +} diff --git a/src/lazy_dataframe/alogical_plan.rs b/src/lazy_dataframe/alogical_plan.rs index 354e7a3..c53ffca 100644 --- a/src/lazy_dataframe/alogical_plan.rs +++ b/src/lazy_dataframe/alogical_plan.rs @@ -6,11 +6,12 @@ use crate::{ }; use super::{ - aexpr::{create_physical_expr, expr_to_aexpr, AExpr}, + aexpr::{create_physical_expr, expr_node_to_expr, expr_to_aexpr, AExpr}, arena::{Arena, Node}, logical_plan::LogicalPlan, physical_plan::executor::{ - data_frame_scan::DataFrameScanExec, filter::FilterExec, join::JoinExec, Executor, + data_frame_scan::DataFrameScanExec, filter::FilterExec, groupby::GroupByExec, + join::JoinExec, Executor, }, }; @@ -36,6 +37,11 @@ pub enum ALogicalPlan { selection: Option, schema: Arc, }, + GroupBy { + input: Node, + by: Vec, + agg: Vec, + }, } impl ALogicalPlan { @@ -44,6 +50,63 @@ impl ALogicalPlan { ALogicalPlan::Join { schema, .. } => schema.as_ref().clone(), ALogicalPlan::Selection { input, .. } => arena.get(*input).schema(arena), ALogicalPlan::DataFrameScan { schema, .. } => schema.as_ref().clone(), + ALogicalPlan::GroupBy { input, by, agg } => todo!(), + } + } + + pub fn to_lp( + self, + alp_arena: &mut Arena, + expr_arena: &mut Arena, + ) -> LogicalPlan { + match self { + ALogicalPlan::Join { + left, + right, + left_on, + right_on, + join_type, + schema, + } => LogicalPlan::Join { + left: Box::new(alp_arena.take(left).to_lp(alp_arena, expr_arena)), + right: Box::new(alp_arena.take(right).to_lp(alp_arena, expr_arena)), + left_on: left_on + .iter() + .map(|node| expr_node_to_expr(*node, expr_arena)) + .collect(), + right_on: right_on + .iter() + .map(|node| expr_node_to_expr(*node, expr_arena)) + .collect(), + join_type, + schema, + }, + ALogicalPlan::Selection { input, predicate } => LogicalPlan::Selection { + input: Box::new(alp_arena.take(input).to_lp(alp_arena, expr_arena)), + predicate: expr_node_to_expr(predicate, expr_arena), + }, + ALogicalPlan::DataFrameScan { + df, + projection, + selection, + schema, + } => LogicalPlan::DataFrameScan { + df, + projection, + selection: selection.map(|node| expr_node_to_expr(node, expr_arena)), + schema, + }, + ALogicalPlan::GroupBy { input, by, agg } => LogicalPlan::GroupBy { + keys: by + .iter() + .map(|node| expr_node_to_expr(*node, expr_arena)) + .collect(), + agg: agg + .iter() + .map(|node| expr_node_to_expr(*node, expr_arena)) + .collect(), + input: Box::new(alp_arena.take(input).to_lp(alp_arena, expr_arena)), + }, } } } @@ -90,6 +153,17 @@ pub fn logical_to_alp( selection: selection.map(|expr| expr_to_aexpr(expr, expr_arena)), schema, }, + LogicalPlan::GroupBy { keys, agg, input } => ALogicalPlan::GroupBy { + input: logical_to_alp(*input, expr_arena, alp_arena), + by: keys + .into_iter() + .map(|expr| expr_to_aexpr(expr, expr_arena)) + .collect(), + agg: agg + .into_iter() + .map(|expr| expr_to_aexpr(expr, expr_arena)) + .collect(), + }, }; alp_arena.add(node) } @@ -144,5 +218,21 @@ pub fn alp_node_to_physical_plan( let selection = selection.map(|node| create_physical_expr(node, expr_arena)); Box::new(DataFrameScanExec::new(df, projection, selection)) } + ALogicalPlan::GroupBy { input, by, agg } => { + let input = alp_node_to_physical_plan(input, expr_arena, alp_arena); + let by = by + .iter() + .map(|node| create_physical_expr(*node, expr_arena)) + .collect(); + let agg = agg + .iter() + .map(|node| create_physical_expr(*node, expr_arena)) + .collect(); + Box::new(GroupByExec { + input, + keys: by, + agg, + }) + } } } diff --git a/src/lazy_dataframe/arena.rs b/src/lazy_dataframe/arena.rs index 6d8e1cf..c9eb0c5 100644 --- a/src/lazy_dataframe/arena.rs +++ b/src/lazy_dataframe/arena.rs @@ -19,6 +19,11 @@ impl Arena { Node(len) } + pub fn replace(&mut self, idx: Node, val: T) { + let ele = self.get_mut(idx); + *ele = val; + } + pub fn get_mut(&mut self, idx: Node) -> &mut T { self.items.get_mut(idx.0).unwrap() } diff --git a/src/lazy_dataframe/expr.rs b/src/lazy_dataframe/expr.rs index e57f292..0443ac1 100644 --- a/src/lazy_dataframe/expr.rs +++ b/src/lazy_dataframe/expr.rs @@ -11,6 +11,7 @@ pub enum Expr { right: Box, }, Literal(LiteralValue), + Agg(AggExpr), } #[derive(Copy, Clone, PartialEq, Eq, Debug)] @@ -28,18 +29,36 @@ impl Expr { right: Box::new(other), } } + + pub fn min(self) -> Expr { + Expr::Agg(AggExpr::Min(Box::new(self))) + } } pub fn col(str: &str) -> Expr { Expr::Column(Arc::from(str)) } +#[derive(Clone)] +pub enum AggExpr { + Min(Box), +} + +impl Debug for AggExpr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Min(input) => write!(f, "Min(\"{input:?}\")"), + } + } +} + impl Debug for Expr { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Expr::Column(name) => write!(f, "col(\"{name}\")"), Expr::BinaryExpr { left, op, right } => write!(f, "[({left:?}) {op:?} ({right:?})]"), Expr::Literal(lit) => write!(f, "lit(\"{lit:?}\")"), + Expr::Agg(agg_expr) => write!(f, "Agg(\"{agg_expr:?}\")"), } } } diff --git a/src/lazy_dataframe/frame.rs b/src/lazy_dataframe/frame.rs index eff67f7..0d9142b 100644 --- a/src/lazy_dataframe/frame.rs +++ b/src/lazy_dataframe/frame.rs @@ -10,8 +10,10 @@ use super::{ alogical_plan::{alp_node_to_physical_plan, ALogicalPlan}, arena::{Arena, Node}, expr::Expr, + lazy_groupby::LazyGroupBy, logical_plan::LogicalPlan, logical_plan_builder::LogicalPlanBuilder, + optimizer::predicate_pushdown::PredicatePushdown, physical_plan::executor::Executor, }; @@ -59,6 +61,10 @@ impl LazyFrame { ) } + pub fn groupby(self, by: Vec) -> LazyGroupBy { + LazyGroupBy::new(self.logical_plan, by) + } + pub fn optimize_with_scratch( self, alp_arena: &mut Arena, @@ -66,10 +72,21 @@ impl LazyFrame { ) -> Node { let node = logical_to_alp(self.logical_plan, expr_arena, alp_arena); - // TODO: Optimize + let predicate_pushdown = PredicatePushdown::new(); + let alp = alp_arena.take(node); + let new_alp = predicate_pushdown.optimize(alp, alp_arena, expr_arena); + alp_arena.replace(node, new_alp); node } + pub fn get_optimized_plan(self) -> LogicalPlan { + let mut expr_arena = Arena::new(); + let mut alp_arena = Arena::new(); + let root = self.optimize_with_scratch(&mut alp_arena, &mut expr_arena); + let alp = alp_arena.take(root); + alp.to_lp(&mut alp_arena, &mut expr_arena) + } + pub fn collect(self) -> DataFrame { let mut executor = self.prepare_collect(); executor.execute() diff --git a/src/lazy_dataframe/lazy_groupby.rs b/src/lazy_dataframe/lazy_groupby.rs new file mode 100644 index 0000000..9b8750e --- /dev/null +++ b/src/lazy_dataframe/lazy_groupby.rs @@ -0,0 +1,23 @@ +use super::{expr::Expr, frame::LazyFrame, logical_plan::LogicalPlan}; + +pub struct LazyGroupBy { + input: LogicalPlan, + by: Vec, +} + +impl LazyGroupBy { + pub fn new(input: LogicalPlan, by: Vec) -> Self { + LazyGroupBy { input, by } + } +} + +impl LazyGroupBy { + pub fn agg(self, agg: Vec) -> LazyFrame { + let lp = LogicalPlan::GroupBy { + keys: self.by, + agg, + input: Box::new(self.input), + }; + LazyFrame::from_logical_plan(lp) + } +} diff --git a/src/lazy_dataframe/logical_plan.rs b/src/lazy_dataframe/logical_plan.rs index 7cb0fef..2ec0df0 100644 --- a/src/lazy_dataframe/logical_plan.rs +++ b/src/lazy_dataframe/logical_plan.rs @@ -34,6 +34,11 @@ pub enum LogicalPlan { selection: Option, schema: Arc, }, + GroupBy { + keys: Vec, + agg: Vec, + input: Box, + }, // TODO: Projection } @@ -43,6 +48,7 @@ impl LogicalPlan { LogicalPlan::Join { schema, .. } => schema.clone(), LogicalPlan::Selection { input, predicate } => input.schema(), LogicalPlan::DataFrameScan { schema, .. } => schema.clone(), + LogicalPlan::GroupBy { keys, agg, .. } => todo!(), } } } @@ -110,6 +116,13 @@ impl LogicalPlan { write!(f, "\n{:indent$} SELECTION: {selection:?}", "")?; write!(f, "\n{:indent$} SCHEMA: {schema:?}", "") } + LogicalPlan::GroupBy { keys, agg, input } => { + write!(f, "{:indent$}GROUPBY:", "")?; + + write!(f, "\n{:indent$} KEYS: {keys:?}", "")?; + write!(f, "\n{:indent$} BY: {agg:?}", ""); + write!(f, "\n{:indent$} INPUT: {input:?}", "") + } } } } diff --git a/src/lazy_dataframe/mod.rs b/src/lazy_dataframe/mod.rs index 9185b1a..09e0492 100644 --- a/src/lazy_dataframe/mod.rs +++ b/src/lazy_dataframe/mod.rs @@ -3,6 +3,7 @@ pub mod alogical_plan; pub mod arena; pub mod expr; pub mod frame; +pub mod lazy_groupby; pub mod lit; pub mod logical_plan; pub mod logical_plan_builder; diff --git a/src/lazy_dataframe/optimizer/mod.rs b/src/lazy_dataframe/optimizer/mod.rs index ed7c734..578ab80 100644 --- a/src/lazy_dataframe/optimizer/mod.rs +++ b/src/lazy_dataframe/optimizer/mod.rs @@ -1 +1,2 @@ pub mod predicate_pushdown; +mod predicate_pushdown_test; diff --git a/src/lazy_dataframe/optimizer/predicate_pushdown.rs b/src/lazy_dataframe/optimizer/predicate_pushdown.rs index c34dda7..6e01b0b 100644 --- a/src/lazy_dataframe/optimizer/predicate_pushdown.rs +++ b/src/lazy_dataframe/optimizer/predicate_pushdown.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use hashbrown::{hash_map::Entry, HashMap}; use crate::lazy_dataframe::{ - aexpr::{aexpr_to_leaf_names_iter, AExpr}, + aexpr::{aexpr_to_leaf_names_iter, check_input_node, AExpr}, alogical_plan::ALogicalPlan, arena::{self, Arena, Node}, expr::Operator, @@ -13,6 +13,10 @@ use crate::lazy_dataframe::{ pub struct PredicatePushdown {} impl PredicatePushdown { + pub fn new() -> Self { + PredicatePushdown {} + } + pub fn optimize( &self, logical_plan: ALogicalPlan, @@ -41,8 +45,37 @@ impl PredicatePushdown { } => { let left_schema = alp_arena.get(left).schema(&alp_arena); let right_schema = alp_arena.get(right).schema(&alp_arena); - for (name, predicate_node) in acc_predicates.iter() {} - todo!() + let mut local_predicates = Vec::with_capacity(acc_predicates.len()); + let mut left_pushdowns = HashMap::new(); + let mut right_pushdowns = HashMap::new(); + for (name, predicate_node) in acc_predicates.into_iter() { + let mut did_pushdown = false; + if !predicate_is_pushdown_boundary(predicate_node, expr_arena) { + if check_input_node(predicate_node, &left_schema, expr_arena) { + left_pushdowns.insert(name, predicate_node); + did_pushdown = true; + } else if check_input_node(predicate_node, &right_schema, expr_arena) { + right_pushdowns.insert(name, predicate_node); + did_pushdown = true; + } + } + if !did_pushdown { + local_predicates.push(predicate_node) + } + } + + self.pushdown_and_replace(left, left_pushdowns, alp_arena, expr_arena); + self.pushdown_and_replace(right, right_pushdowns, alp_arena, expr_arena); + + let new_join = ALogicalPlan::Join { + left, + right, + left_on, + right_on, + join_type, + schema, + }; + self.optional_wrap_selection(new_join, local_predicates, alp_arena, expr_arena) } ALogicalPlan::Selection { input, predicate } => { let local_predicates = extract_local_predicates(&mut acc_predicates, |node| { @@ -59,7 +92,34 @@ impl PredicatePushdown { projection, selection, schema, - } => todo!(), + } => { + let selection = if !acc_predicates.is_empty() { + let mut predicate = + combine_predicates(acc_predicates.iter().map(|a| *a.1), expr_arena); + if let Some(selection) = selection { + predicate = expr_arena.add(AExpr::BinaryExpr { + left: predicate, + op: Operator::And, + right: selection, + }); + } + Some(predicate) + } else { + selection + }; + ALogicalPlan::DataFrameScan { + df, + projection, + selection, + schema, + } + } + ALogicalPlan::GroupBy { input, by, agg } => { + self.pushdown_and_replace(input, acc_predicates, alp_arena, expr_arena); + let lp = ALogicalPlan::GroupBy { input, by, agg }; + // TODO: We might need to have some local predicates here + self.optional_wrap_selection(lp, vec![], alp_arena, expr_arena) + } } } @@ -78,6 +138,18 @@ impl PredicatePushdown { ALogicalPlan::Selection { input, predicate } } } + + // Pushes down the predicates to the Node and replace the Node with the wrapped value. + fn pushdown_and_replace( + &self, + node: Node, + predicates: HashMap, Node>, + alp_arena: &mut Arena, + expr_arena: &mut Arena, + ) { + let new_alp = self.push_down(alp_arena.take(node), alp_arena, expr_arena, predicates); + alp_arena.replace(node, new_alp); + } } pub fn predicate_to_key(predicate: Node, expr_arena: &Arena) -> Arc { diff --git a/src/lazy_dataframe/optimizer/predicate_pushdown_test.rs b/src/lazy_dataframe/optimizer/predicate_pushdown_test.rs new file mode 100644 index 0000000..33b394e --- /dev/null +++ b/src/lazy_dataframe/optimizer/predicate_pushdown_test.rs @@ -0,0 +1,35 @@ +use crate::{ + chunked_array::builder::NewFrom, + dataframe::{join::JoinType, DataFrame}, + lazy_dataframe::{ + expr::col, + lit::{self, lit}, + }, + series::Series, +}; + +#[test] +fn test_inner_join_pushdown() { + let df1 = DataFrame::new(vec![ + Series::from_vec("foo", &vec!["abc", "def", "ghi"]), + Series::from_vec("idx1", &vec![0, 0, 1]), + ]); + + let df2 = DataFrame::new(vec![ + Series::from_vec("bar", &vec![5, 6]), + Series::from_vec("idx2", &vec![0, 1]), + ]); + + let out = df1 + .lazy() + .join( + vec![col("idx1")], + df2.lazy(), + vec![col("idx2")], + JoinType::Inner, + ) + .filter(col("bar").eq(lit(5i32))); + + let optimized_plan = out.get_optimized_plan(); + println!("Optimized Plan: {optimized_plan:?}") +} diff --git a/src/lazy_dataframe/physical_plan/executor/data_frame_scan.rs b/src/lazy_dataframe/physical_plan/executor/data_frame_scan.rs index e4d43ab..ae536c0 100644 --- a/src/lazy_dataframe/physical_plan/executor/data_frame_scan.rs +++ b/src/lazy_dataframe/physical_plan/executor/data_frame_scan.rs @@ -33,10 +33,10 @@ impl Executor for DataFrameScanExec { df = df.select(projection.iter()) } - let pred = self.selection.as_ref().map(|s| s.evaluate(&self.df)); + let pred = self.selection.as_ref().map(|s| s.evaluate(&df)); if let Some(pred) = pred { - df = self.df.filter(pred.bool()); + df = df.filter(pred.bool()); }; df } diff --git a/src/lazy_dataframe/physical_plan/executor/groupby.rs b/src/lazy_dataframe/physical_plan/executor/groupby.rs new file mode 100644 index 0000000..6522a9c --- /dev/null +++ b/src/lazy_dataframe/physical_plan/executor/groupby.rs @@ -0,0 +1,43 @@ +use std::sync::Arc; + +use crate::{ + dataframe::DataFrame, lazy_dataframe::physical_plan::physical_expr::PhysicalExpr, + series::Series, +}; + +use super::Executor; + +pub struct GroupByExec { + pub input: Box, + pub keys: Vec>, + pub agg: Vec>, +} + +impl Executor for GroupByExec { + fn execute(&mut self) -> DataFrame { + let df = self.input.execute(); + let by = self + .keys + .iter() + .map(|expr| expr.evaluate(&df)) + .collect::>(); + let group_proxy = df.compute_group_proxy(by.clone()); + + let mut columns_selected = by + .iter() + .map(|col| compute_key(col, &group_proxy.first)) + .collect::>(); + let columns_aggregated = self + .agg + .iter() + .map(|expr| expr.evaluate_for_groups(&df, &group_proxy)) + .collect::>(); + columns_selected.extend(columns_aggregated); + DataFrame::new(columns_selected) + } +} + +fn compute_key(series: &Series, indices: &Vec) -> Series { + // Extracts the series to only keep the elements in the indices + todo!() +} diff --git a/src/lazy_dataframe/physical_plan/executor/groupby_test.rs b/src/lazy_dataframe/physical_plan/executor/groupby_test.rs new file mode 100644 index 0000000..40a3714 --- /dev/null +++ b/src/lazy_dataframe/physical_plan/executor/groupby_test.rs @@ -0,0 +1,21 @@ +use crate::{ + chunked_array::builder::NewFrom, + dataframe::DataFrame, + lazy_dataframe::expr::{col, AggExpr, Expr}, + series::Series, +}; + +#[test] +fn test_simple_groupby_agg() { + let df = DataFrame::new(vec![ + Series::from_vec("name", &vec!["a", "b", "a", "b", "c"]), + Series::from_vec("points", &vec![1, 2, 3, 2, 1]), + ]); + + let computed_df = df + .lazy() + .groupby(vec![col("name")]) + .agg(vec![col("points").min()]) + .collect(); + println!("Groupby: {computed_df:?}"); +} diff --git a/src/lazy_dataframe/physical_plan/executor/mod.rs b/src/lazy_dataframe/physical_plan/executor/mod.rs index bb57698..82b42b0 100644 --- a/src/lazy_dataframe/physical_plan/executor/mod.rs +++ b/src/lazy_dataframe/physical_plan/executor/mod.rs @@ -3,6 +3,8 @@ use crate::dataframe::DataFrame; pub mod data_frame_scan; pub mod filter; mod filter_test; +pub mod groupby; +mod groupby_test; pub mod join; mod join_test; diff --git a/src/lazy_dataframe/physical_plan/physical_expr/agg.rs b/src/lazy_dataframe/physical_plan/physical_expr/agg.rs new file mode 100644 index 0000000..f86f352 --- /dev/null +++ b/src/lazy_dataframe/physical_plan/physical_expr/agg.rs @@ -0,0 +1,32 @@ +use std::sync::Arc; + +use crate::{ + dataframe::{groupby::GroupsProxy, DataFrame}, + series::Series, +}; + +use super::PhysicalExpr; + +pub enum AggregationExpr { + Min(Arc), +} + +impl PhysicalExpr for AggregationExpr { + fn evaluate(&self, df: &DataFrame) -> Series { + todo!() + } + + fn evaluate_for_groups(&self, df: &DataFrame, group_proxy: &GroupsProxy) -> Series { + let input = self.evaluate(df); + match self { + AggregationExpr::Min(agg) => { + // TODO: This should be evaluate_groups. + // But for the MVP, let's just only support col(...).agg(...) + let series = agg.evaluate(df); + } + } + todo!() + } +} + +pub fn agg_min() {} diff --git a/src/lazy_dataframe/physical_plan/physical_expr/binary_expr.rs b/src/lazy_dataframe/physical_plan/physical_expr/binary_expr.rs index f28dcea..2188ece 100644 --- a/src/lazy_dataframe/physical_plan/physical_expr/binary_expr.rs +++ b/src/lazy_dataframe/physical_plan/physical_expr/binary_expr.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use crate::{ chunked_array::chunk_compare::ChunkCompare, - dataframe::DataFrame, + dataframe::{groupby::GroupsProxy, DataFrame}, lazy_dataframe::expr::Operator, series::{constructor::IntoSeries, Series}, }; @@ -34,4 +34,8 @@ impl PhysicalExpr for BinaryExpr { } } } + + fn evaluate_for_groups(&self, df: &DataFrame, group_proxy: &GroupsProxy) -> Series { + todo!() + } } diff --git a/src/lazy_dataframe/physical_plan/physical_expr/column.rs b/src/lazy_dataframe/physical_plan/physical_expr/column.rs index 59a2fa8..093f28e 100644 --- a/src/lazy_dataframe/physical_plan/physical_expr/column.rs +++ b/src/lazy_dataframe/physical_plan/physical_expr/column.rs @@ -1,6 +1,9 @@ use std::sync::Arc; -use crate::{dataframe::DataFrame, series::Series}; +use crate::{ + dataframe::{groupby::GroupsProxy, DataFrame}, + series::Series, +}; use super::PhysicalExpr; @@ -18,4 +21,8 @@ impl PhysicalExpr for ColumnExpr { fn evaluate(&self, df: &DataFrame) -> Series { df.column(&self.col_name) } + + fn evaluate_for_groups(&self, df: &DataFrame, group_proxy: &GroupsProxy) -> Series { + todo!() + } } diff --git a/src/lazy_dataframe/physical_plan/physical_expr/literal.rs b/src/lazy_dataframe/physical_plan/physical_expr/literal.rs index 719c0b9..536e1e1 100644 --- a/src/lazy_dataframe/physical_plan/physical_expr/literal.rs +++ b/src/lazy_dataframe/physical_plan/physical_expr/literal.rs @@ -3,7 +3,7 @@ use crate::{ chunk_full::ChunkFull, types::{BooleanChunked, I32Chunked, Utf8Chunked}, }, - dataframe::DataFrame, + dataframe::{groupby::GroupsProxy, DataFrame}, lazy_dataframe::lit::LiteralValue, series::{constructor::IntoSeries, Series}, }; @@ -33,4 +33,8 @@ impl PhysicalExpr for LiteralExpr { LiteralValue::Utf8(v) => Utf8Chunked::full(series_name, v, rows_count).into_series(), } } + + fn evaluate_for_groups(&self, df: &DataFrame, group_proxy: &GroupsProxy) -> Series { + todo!() + } } diff --git a/src/lazy_dataframe/physical_plan/physical_expr/mod.rs b/src/lazy_dataframe/physical_plan/physical_expr/mod.rs index 33d29d8..77f4b4a 100644 --- a/src/lazy_dataframe/physical_plan/physical_expr/mod.rs +++ b/src/lazy_dataframe/physical_plan/physical_expr/mod.rs @@ -1,9 +1,15 @@ -use crate::{dataframe::DataFrame, series::Series}; +use crate::{ + dataframe::{groupby::GroupsProxy, DataFrame}, + series::Series, +}; +pub mod agg; pub mod binary_expr; pub mod column; pub mod literal; pub trait PhysicalExpr: Send + Sync { fn evaluate(&self, df: &DataFrame) -> Series; + + fn evaluate_for_groups(&self, df: &DataFrame, group_proxy: &GroupsProxy) -> Series; } diff --git a/src/lib.rs b/src/lib.rs index 8116c7b..2f7bdae 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ pub mod chunked_array; pub mod core; pub mod dataframe; pub mod hashing; +pub mod js_little_dataframe; pub mod lazy_dataframe; pub mod little_arrow; pub mod random; diff --git a/src/series/implementations.rs b/src/series/implementations.rs index 782c64f..42542de 100644 --- a/src/series/implementations.rs +++ b/src/series/implementations.rs @@ -1,4 +1,4 @@ -use std::{collections::hash_map::RandomState, sync::Arc}; +use std::{cmp::min, collections::hash_map::RandomState, sync::Arc}; use crate::{ chunked_array::{ @@ -9,6 +9,7 @@ use crate::{ types::{AnyValue, BooleanChunked, I32Chunked, Utf8Chunked}, }, core::field::Field, + dataframe::groupby::GroupsProxy, hashing::VecHash, types::DataType, }; @@ -84,6 +85,10 @@ impl SeriesTrait for SeriesWrap { dtype: self.dtype(), } } + + fn agg_min(&self, groups: &GroupsProxy) -> Series { + todo!() + } } impl SeriesTrait for SeriesWrap { @@ -155,6 +160,25 @@ impl SeriesTrait for SeriesWrap { dtype: self.dtype(), } } + + fn agg_min(&self, groups: &GroupsProxy) -> Series { + let arr = self.0.iter_primitive().next().unwrap(); + + let values: Vec = groups + .all + .iter() + .enumerate() + .map(|(_, indices)| { + // TODO: Parallelize + let min = indices.iter().fold(i32::MAX, |acc, &idx| { + let v = arr.get(idx as usize).unwrap(); + min(acc, v) + }); + min + }) + .collect(); + Series::new(self.name(), &values) + } } impl SeriesTrait for SeriesWrap { @@ -226,4 +250,8 @@ impl SeriesTrait for SeriesWrap { dtype: self.dtype(), } } + + fn agg_min(&self, groups: &GroupsProxy) -> Series { + todo!() + } } diff --git a/src/series/series_trait.rs b/src/series/series_trait.rs index a7f30f0..32ee59a 100644 --- a/src/series/series_trait.rs +++ b/src/series/series_trait.rs @@ -3,6 +3,7 @@ use std::collections::hash_map::RandomState; use crate::{ chunked_array::types::{AnyValue, BooleanChunked}, core::field::Field, + dataframe::groupby::GroupsProxy, types::{DataType, LittleDataType}, }; @@ -38,4 +39,6 @@ pub trait SeriesTrait: Send + Sync { fn filter(&self, _filter: &BooleanChunked) -> Series; fn field(&self) -> Field; + + fn agg_min(&self, groups: &GroupsProxy) -> Series; }