From 6663fbc19f6b7fad76daab092f656be725f81286 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 25 Jan 2025 19:53:25 +0000 Subject: [PATCH] fix --- .../datasource/physical_plan/parquet/mod.rs | 2 +- datafusion/physical-optimizer/src/pruning.rs | 407 ++++++++++++------ parquet-testing | 2 +- testing | 2 +- 4 files changed, 283 insertions(+), 130 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 085f44191b8a8..4d8657f64e7d9 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -2009,7 +2009,7 @@ mod tests { assert_contains!( &display, - "pruning_predicate=c1_null_count@2 != c1_row_count@3 AND (c1_min@0 != bar OR bar != c1_max@1)" + "pruning_predicate=c1_null_count@2 IS NULL OR c1_row_count@3 IS NULL OR c1_null_count@2 != c1_row_count@3) AND (c1_min@0 IS NULL OR c1_min@0 != bar OR c1_max@1 IS NULL OR bar != c1_max@1" ); assert_contains!(&display, r#"predicate=c1@0 != bar"#); diff --git a/datafusion/physical-optimizer/src/pruning.rs b/datafusion/physical-optimizer/src/pruning.rs index 30c6e7fb4b32d..c90b3c157c78e 100644 --- a/datafusion/physical-optimizer/src/pruning.rs +++ b/datafusion/physical-optimizer/src/pruning.rs @@ -258,9 +258,7 @@ pub trait PruningStatistics { /// /// * Rows that evaluate to `false` are not returned (“filtered out” or “pruned” or “skipped”). /// -/// * Rows that evaluate to `NULL` are **NOT** returned (also “filtered out”). -/// Note: *this treatment of `NULL` is **DIFFERENT** than how `NULL` is treated -/// in the rewritten predicate described below.* +/// * No rows should evaluate to `null`, even if statistics are missing (the statistics are themselves `null`). /// /// # `PruningPredicate` Implementation /// @@ -287,11 +285,6 @@ pub trait PruningStatistics { /// predicate can never possibly be true). The container can be pruned (skipped) /// entirely. /// -/// While `PruningPredicate` will never return a `NULL` value, the -/// rewritten predicate (as returned by `build_predicate_expression` and used internally -/// by `PruningPredicate`) may evaluate to `NULL` when some of the min/max values -/// or null / row counts are not known. -/// /// In order to be correct, `PruningPredicate` must return false /// **only** if it can determine that for all rows in the container, the /// predicate could never evaluate to `true` (always evaluates to either `NULL` @@ -311,12 +304,6 @@ pub trait PruningStatistics { /// /// * `true`: there MAY be rows that pass the predicate, **KEEPS** the container /// -/// * `NULL`: there MAY be rows that pass the predicate, **KEEPS** the container -/// Note that rewritten predicate can evaluate to NULL when some of -/// the min/max values are not known. 
*Note that this is different than
-///   the SQL filter semantics where `NULL` means the row is filtered
-///   out.*
-///
 /// * `false`: there are no rows that could possibly match the predicate,
 ///   **PRUNES** the container
 ///
@@ -722,8 +709,15 @@ impl BoolVecBuilder {
                 // False means all containers can not pass the predicate
                 self.inner = vec![false; self.inner.len()];
             }
+            ColumnarValue::Scalar(ScalarValue::Boolean(None)) => {
+                // The rewritten predicate should never evaluate to null. Panic in
+                // debug builds to catch bugs in tests, while keeping the "old"
+                // PruningPredicate behavior in production: nulls are treated as truthy.
+                #[cfg(debug_assertions)]
+                panic!("no predicate should return null!")
+            }
             _ => {
-                // Null or true means the rows in container may pass this
+                // true means the rows in the container may pass this
                 // conjunct so we can't prune any containers based on that
             }
         }
@@ -819,16 +813,24 @@ impl RequiredColumns {
     /// statistics column, while keeping track that a reference to the statistics
     /// column is required
     ///
-    /// for example, an expression like `col("foo") > 5`, when called
+    /// For example, an expression like `col("foo") > 5`, when called
     /// with Max would result in an expression like `col("foo_max") >
-    /// 5` with the appropriate entry noted in self.columns
+    /// 5` with the appropriate entry noted in self.columns.
+    ///
+    /// Along with the reference to the min/max/null_count/row_count column,
+    /// an expression for `{stats_col} IS NULL` is returned. It should be `OR`ed with
+    /// any expression applied to the column so that if a value in `{stats_col}` is
+    /// `null` the rewritten predicate evaluates to `true` (the container must still be
+    /// scanned) rather than `null`, which would otherwise be treated as falsy and invert the logic.
+    ///
+    /// Thus for the example above we'd arrive at the final expression `foo_max IS NULL OR foo_max > 5`.
     fn stat_column_expr(
         &mut self,
         column: &phys_expr::Column,
         column_expr: &Arc<dyn PhysicalExpr>,
         field: &Field,
         stat_type: StatisticsType,
-    ) -> Result<Arc<dyn PhysicalExpr>> {
+    ) -> Result<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> {
         let (idx, need_to_insert) = match self.find_stat_column(column, stat_type) {
             Some(idx) => (idx, false),
             None => (self.columns.len(), true),
         };
@@ -852,36 +854,49 @@ impl RequiredColumns {
                 Field::new(stat_column.name(), field.data_type().clone(), nullable);
             self.columns.push((column.clone(), stat_type, stat_field));
         }
-        rewrite_column_expr(Arc::clone(column_expr), column, &stat_column)
+        let stats_column_expr =
+            rewrite_column_expr(Arc::clone(column_expr), column, &stat_column)?;
+        let is_null_expr =
+            Arc::new(phys_expr::IsNullExpr::new(Arc::<_>::clone(&stats_column_expr)));
+        Ok((stats_column_expr, is_null_expr))
     }
 
-    /// rewrite col --> col_min
+    /// Returns a PhysicalExpr reference to the min column in the statistics
+    /// corresponding to `column`, along with an expression for `{column}_min IS NULL`
+    /// that should be `OR`ed with any expression referencing that column so that
+    /// if any row in `{column}_min` is null the result is `true` rather than `null`.
     fn min_column_expr(
         &mut self,
         column: &phys_expr::Column,
         column_expr: &Arc<dyn PhysicalExpr>,
         field: &Field,
-    ) -> Result<Arc<dyn PhysicalExpr>> {
+    ) -> Result<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> {
         self.stat_column_expr(column, column_expr, field, StatisticsType::Min)
     }
 
-    /// rewrite col --> col_max
+    /// Returns a PhysicalExpr reference to the max column in the statistics
+    /// corresponding to `column`, along with an expression for `{column}_max IS NULL`
+    /// that should be `OR`ed with any expression referencing that column so that
+    /// if any row in `{column}_max` is null the result is `true` rather than `null`.
     fn max_column_expr(
         &mut self,
         column: &phys_expr::Column,
         column_expr: &Arc<dyn PhysicalExpr>,
         field: &Field,
-    ) -> Result<Arc<dyn PhysicalExpr>> {
+    ) -> Result<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> {
         self.stat_column_expr(column, column_expr, field, StatisticsType::Max)
     }
 
-    /// rewrite col --> col_null_count
+    /// Returns a PhysicalExpr reference to the null count column in the statistics
+    /// corresponding to `column`, along with an expression for `{column}_null_count IS NULL`
+    /// that should be `OR`ed with any expression referencing that column so that
+    /// if any row in `{column}_null_count` is itself null the result is `true` rather than `null`.
    fn null_count_column_expr(
         &mut self,
         column: &phys_expr::Column,
         column_expr: &Arc<dyn PhysicalExpr>,
         field: &Field,
-    ) -> Result<Arc<dyn PhysicalExpr>> {
+    ) -> Result<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> {
         self.stat_column_expr(column, column_expr, field, StatisticsType::NullCount)
     }
 
@@ -891,7 +906,7 @@ impl RequiredColumns {
         column: &phys_expr::Column,
         column_expr: &Arc<dyn PhysicalExpr>,
         field: &Field,
-    ) -> Result<Arc<dyn PhysicalExpr>> {
+    ) -> Result<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> {
         self.stat_column_expr(column, column_expr, field, StatisticsType::RowCount)
     }
 }
@@ -1045,12 +1060,16 @@ impl<'a> PruningExpressionBuilder<'a> {
         &self.scalar_expr
     }
 
-    fn min_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
+    fn min_column_expr(
+        &mut self,
+    ) -> Result<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> {
         self.required_columns
             .min_column_expr(&self.column, &self.column_expr, self.field)
     }
 
-    fn max_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
+    fn max_column_expr(
+        &mut self,
+    ) -> Result<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> {
         self.required_columns
             .max_column_expr(&self.column, &self.column_expr, self.field)
     }
@@ -1061,7 +1080,9 @@ impl<'a> PruningExpressionBuilder<'a> {
     /// i.e., x > 5 => x_null_count,
     ///       cast(x as int) < 10 => x_null_count,
     ///       try_cast(x as float) < 10.0 => x_null_count
-    fn null_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
+    fn null_count_column_expr(
+        &mut self,
+    ) -> Result<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> {
         // Retune to [`phys_expr::Column`]
         let column_expr = Arc::new(self.column.clone()) as _;
 
@@ -1081,7 +1102,9 @@ impl<'a> PruningExpressionBuilder<'a> {
     /// i.e., x > 5 => x_row_count,
     ///       cast(x as int) < 10 => x_row_count,
     ///       try_cast(x as float) < 10.0 => x_row_count
-    fn row_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
+    fn row_count_column_expr(
+        &mut self,
+    ) -> Result<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> {
         // Retune to [`phys_expr::Column`]
         let column_expr = Arc::new(self.column.clone()) as _;
 
@@ -1261,17 +1284,23 @@ fn build_single_column_expr(
     if matches!(field.data_type(), &DataType::Boolean) {
         let col_ref = Arc::new(column.clone()) as _;
-        let min = required_columns
+        let (min, min_column_is_null_expr) = required_columns
             .min_column_expr(column, &col_ref, field)
             .ok()?;
-        let max = required_columns
+        let (max, max_column_is_null_expr) = required_columns
             .max_column_expr(column, &col_ref, field)
             .ok()?;
+        let nulls_expr = Arc::new(phys_expr::BinaryExpr::new(
+            min_column_is_null_expr,
+            Operator::Or,
+            max_column_is_null_expr,
+        ));
+
         // remember -- we want an expression that is:
         // TRUE: if there may be rows that match
         // FALSE: if there are no rows that match
-        if 
is_not { + let expr: Option> = if is_not { // The only way we know a column couldn't match is if both the min and max are true // !(min && max) Some(Arc::new(phys_expr::NotExpr::new(Arc::new( @@ -1281,7 +1310,12 @@ fn build_single_column_expr( // the only way we know a column couldn't match is if both the min and max are false // !(!min && !max) --> min || max Some(Arc::new(phys_expr::BinaryExpr::new(min, Operator::Or, max))) - } + }; + + expr.map(|e| { + Arc::new(phys_expr::BinaryExpr::new(nulls_expr, Operator::Or, e)) + as Arc + }) } else { None } @@ -1306,18 +1340,25 @@ fn build_is_null_column_expr( let null_count_field = &Field::new(field.name(), DataType::UInt64, true); if with_not { - if let Ok(row_count_expr) = + if let Ok((row_count_expr, row_count_is_null_expr)) = required_columns.row_count_column_expr(col, expr, null_count_field) { required_columns .null_count_column_expr(col, expr, null_count_field) - .map(|null_count_column_expr| { + .map(|(null_count_column_expr, null_count_column_is_null_expr)| { + // If either the null_count or the row_count are null we can't know anything, so return true + let any_null_expr = Arc::new(phys_expr::BinaryExpr::new( + row_count_is_null_expr, + Operator::Or, + null_count_column_is_null_expr, + )); // IsNotNull(column) => null_count != row_count - Arc::new(phys_expr::BinaryExpr::new( + let stats_expr = Arc::new(phys_expr::BinaryExpr::new( null_count_column_expr, Operator::NotEq, row_count_expr, - )) as _ + )); + stats_are_null_or(any_null_expr, stats_expr) as _ }) .ok() } else { @@ -1326,13 +1367,18 @@ fn build_is_null_column_expr( } else { required_columns .null_count_column_expr(col, expr, null_count_field) - .map(|null_count_column_expr| { + .map(|(null_count_column_expr, null_count_column_is_null_expr)| { // IsNull(column) => null_count > 0 - Arc::new(phys_expr::BinaryExpr::new( - null_count_column_expr, - Operator::Gt, - Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))), - )) as _ + stats_are_null_or( + null_count_column_is_null_expr, + Arc::new(phys_expr::BinaryExpr::new( + null_count_column_expr, + Operator::Gt, + Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some( + 0, + )))), + )), + ) as _ }) .ok() } @@ -1536,6 +1582,17 @@ fn build_predicate_expression( .unwrap_or_else(|_| unhandled_hook.handle(expr)) } +fn stats_are_null_or( + stats_column_is_null_expr: Arc, + or_other: Arc, +) -> Arc { + Arc::new(phys_expr::BinaryExpr::new( + stats_column_is_null_expr, + Operator::Or, + or_other, + )) +} + fn build_statistics_expr( expr_builder: &mut PruningExpressionBuilder, ) -> Result> { @@ -1544,39 +1601,55 @@ fn build_statistics_expr( // column != literal => (min, max) = literal => // !(min != literal && max != literal) ==> // min != literal || literal != max - let min_column_expr = expr_builder.min_column_expr()?; - let max_column_expr = expr_builder.max_column_expr()?; + let (min_column_expr, min_column_is_null_expr) = + expr_builder.min_column_expr()?; + let (max_column_expr, max_column_is_null_expr) = + expr_builder.max_column_expr()?; Arc::new(phys_expr::BinaryExpr::new( - Arc::new(phys_expr::BinaryExpr::new( - min_column_expr, - Operator::NotEq, - Arc::clone(expr_builder.scalar_expr()), - )), + stats_are_null_or( + min_column_is_null_expr, + Arc::new(phys_expr::BinaryExpr::new( + min_column_expr, + Operator::NotEq, + Arc::clone(expr_builder.scalar_expr()), + )), + ), Operator::Or, - Arc::new(phys_expr::BinaryExpr::new( - Arc::clone(expr_builder.scalar_expr()), - Operator::NotEq, - max_column_expr, - )), + 
stats_are_null_or( + max_column_is_null_expr, + Arc::new(phys_expr::BinaryExpr::new( + Arc::clone(expr_builder.scalar_expr()), + Operator::NotEq, + max_column_expr, + )), + ), )) } Operator::Eq => { // column = literal => (min, max) = literal => min <= literal && literal <= max // (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2) - let min_column_expr = expr_builder.min_column_expr()?; - let max_column_expr = expr_builder.max_column_expr()?; + let (min_column_expr, min_column_is_null_expr) = + expr_builder.min_column_expr()?; + let (max_column_expr, max_column_is_null_expr) = + expr_builder.max_column_expr()?; Arc::new(phys_expr::BinaryExpr::new( - Arc::new(phys_expr::BinaryExpr::new( - min_column_expr, - Operator::LtEq, - Arc::clone(expr_builder.scalar_expr()), - )), + stats_are_null_or( + min_column_is_null_expr, + Arc::new(phys_expr::BinaryExpr::new( + min_column_expr, + Operator::LtEq, + Arc::clone(expr_builder.scalar_expr()), + )), + ), Operator::And, - Arc::new(phys_expr::BinaryExpr::new( - Arc::clone(expr_builder.scalar_expr()), - Operator::LtEq, - max_column_expr, - )), + stats_are_null_or( + max_column_is_null_expr, + Arc::new(phys_expr::BinaryExpr::new( + Arc::clone(expr_builder.scalar_expr()), + Operator::LtEq, + max_column_expr, + )), + ), )) } Operator::LikeMatch => build_like_match(expr_builder).ok_or_else(|| { @@ -1586,35 +1659,55 @@ fn build_statistics_expr( })?, Operator::Gt => { // column > literal => (min, max) > literal => max > literal - Arc::new(phys_expr::BinaryExpr::new( - expr_builder.max_column_expr()?, - Operator::Gt, - Arc::clone(expr_builder.scalar_expr()), - )) + let (max_column_expr, max_column_is_null_expr) = + expr_builder.max_column_expr()?; + stats_are_null_or( + max_column_is_null_expr, + Arc::new(phys_expr::BinaryExpr::new( + max_column_expr, + Operator::Gt, + Arc::clone(expr_builder.scalar_expr()), + )), + ) } Operator::GtEq => { // column >= literal => (min, max) >= literal => max >= literal - Arc::new(phys_expr::BinaryExpr::new( - expr_builder.max_column_expr()?, - Operator::GtEq, - Arc::clone(expr_builder.scalar_expr()), - )) + let (min_column_expr, min_column_is_null_expr) = + expr_builder.max_column_expr()?; + stats_are_null_or( + min_column_is_null_expr, + Arc::new(phys_expr::BinaryExpr::new( + min_column_expr, + Operator::GtEq, + Arc::clone(expr_builder.scalar_expr()), + )), + ) } Operator::Lt => { // column < literal => (min, max) < literal => min < literal - Arc::new(phys_expr::BinaryExpr::new( - expr_builder.min_column_expr()?, - Operator::Lt, - Arc::clone(expr_builder.scalar_expr()), - )) + let (min_column_expr, min_column_is_null_expr) = + expr_builder.min_column_expr()?; + stats_are_null_or( + min_column_is_null_expr, + Arc::new(phys_expr::BinaryExpr::new( + min_column_expr, + Operator::Lt, + Arc::clone(expr_builder.scalar_expr()), + )), + ) } Operator::LtEq => { // column <= literal => (min, max) <= literal => min <= literal - Arc::new(phys_expr::BinaryExpr::new( - expr_builder.min_column_expr()?, - Operator::LtEq, - Arc::clone(expr_builder.scalar_expr()), - )) + let (min_column_expr, min_column_is_null_expr) = + expr_builder.min_column_expr()?; + stats_are_null_or( + min_column_is_null_expr, + Arc::new(phys_expr::BinaryExpr::new( + min_column_expr, + Operator::LtEq, + Arc::clone(expr_builder.scalar_expr()), + )), + ) } // other expressions are not supported _ => { @@ -1654,8 +1747,10 @@ fn build_like_match( // TODO Handle ILIKE perhaps by making the min lowercase and max uppercase // this may involve building the 
physical expressions that call lower() and upper() - let min_column_expr = expr_builder.min_column_expr().ok()?; - let max_column_expr = expr_builder.max_column_expr().ok()?; + let (min_column_expr, min_column_is_null_expr) = + expr_builder.min_column_expr().ok()?; + let (max_column_expr, max_column_is_null_expr) = + expr_builder.max_column_expr().ok()?; let scalar_expr = expr_builder.scalar_expr(); // check that the scalar is a string literal let s = extract_string_literal(scalar_expr)?; @@ -1681,16 +1776,22 @@ fn build_like_match( )))); (Arc::clone(&bound), bound) }; - let lower_bound_expr = Arc::new(phys_expr::BinaryExpr::new( - lower_bound, - Operator::LtEq, - Arc::clone(&max_column_expr), - )); - let upper_bound_expr = Arc::new(phys_expr::BinaryExpr::new( - Arc::clone(&min_column_expr), - Operator::LtEq, - upper_bound, - )); + let lower_bound_expr = stats_are_null_or( + max_column_is_null_expr, + Arc::new(phys_expr::BinaryExpr::new( + lower_bound, + Operator::LtEq, + Arc::clone(&max_column_expr), + )), + ); + let upper_bound_expr = stats_are_null_or( + min_column_is_null_expr, + Arc::new(phys_expr::BinaryExpr::new( + Arc::clone(&min_column_expr), + Operator::LtEq, + upper_bound, + )), + ); let combined = Arc::new(phys_expr::BinaryExpr::new( upper_bound_expr, Operator::And, @@ -1770,15 +1871,25 @@ fn wrap_null_count_check_expr( expr_builder: &mut PruningExpressionBuilder, ) -> Result> { // x_null_count != x_row_count + let (null_count_column_expr, null_count_column_is_null_expr) = + expr_builder.null_count_column_expr()?; + let (row_count_column_expr, row_count_column_is_null_expr) = + expr_builder.row_count_column_expr()?; let not_when_null_count_eq_row_count = Arc::new(phys_expr::BinaryExpr::new( - expr_builder.null_count_column_expr()?, + null_count_column_expr, Operator::NotEq, - expr_builder.row_count_column_expr()?, + row_count_column_expr, + )); + + let any_null_expr = Arc::new(phys_expr::BinaryExpr::new( + null_count_column_is_null_expr, + Operator::Or, + row_count_column_is_null_expr, )); // (x_null_count != x_row_count) AND () Ok(Arc::new(phys_expr::BinaryExpr::new( - not_when_null_count_eq_row_count, + stats_are_null_or(any_null_expr, not_when_null_count_eq_row_count), Operator::And, statistics_expr, ))) @@ -1801,7 +1912,7 @@ mod tests { use datafusion_common::assert_batches_eq; use datafusion_expr::{col, lit}; - use arrow::array::Decimal128Array; + use arrow::array::{record_batch, Decimal128Array}; use arrow::{ array::{BinaryArray, Int32Array, Int64Array, StringArray, UInt64Array}, datatypes::TimeUnit, @@ -2475,7 +2586,7 @@ mod tests { fn row_group_predicate_eq() -> Result<()> { let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); let expected_expr = - "c1_null_count@2 != c1_row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1"; + "(c1_null_count@2 IS NULL OR c1_row_count@3 IS NULL OR c1_null_count@2 != c1_row_count@3) AND (c1_min@0 IS NULL OR c1_min@0 <= 1) AND (c1_max@1 IS NULL OR 1 <= c1_max@1)"; // test column on the left let expr = col("c1").eq(lit(1)); @@ -2496,7 +2607,7 @@ mod tests { fn row_group_predicate_not_eq() -> Result<()> { let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); let expected_expr = - "c1_null_count@2 != c1_row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1)"; + "(c1_null_count@2 IS NULL OR c1_row_count@3 IS NULL OR c1_null_count@2 != c1_row_count@3) AND (c1_min@0 IS NULL OR c1_min@0 != 1 OR c1_max@1 IS NULL OR 1 != c1_max@1)"; // test column on the left let expr = col("c1").not_eq(lit(1)); @@ -2516,7 +2627,7 @@ 
mod tests { #[test] fn row_group_predicate_gt() -> Result<()> { let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); - let expected_expr = "c1_null_count@1 != c1_row_count@2 AND c1_max@0 > 1"; + let expected_expr = "(c1_null_count@1 IS NULL OR c1_row_count@2 IS NULL OR c1_null_count@1 != c1_row_count@2) AND (c1_max@0 IS NULL OR c1_max@0 > 1)"; // test column on the left let expr = col("c1").gt(lit(1)); @@ -2536,7 +2647,7 @@ mod tests { #[test] fn row_group_predicate_gt_eq() -> Result<()> { let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); - let expected_expr = "c1_null_count@1 != c1_row_count@2 AND c1_max@0 >= 1"; + let expected_expr = "(c1_null_count@1 IS NULL OR c1_row_count@2 IS NULL OR c1_null_count@1 != c1_row_count@2) AND (c1_max@0 IS NULL OR c1_max@0 >= 1)"; // test column on the left let expr = col("c1").gt_eq(lit(1)); @@ -2555,7 +2666,7 @@ mod tests { #[test] fn row_group_predicate_lt() -> Result<()> { let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); - let expected_expr = "c1_null_count@1 != c1_row_count@2 AND c1_min@0 < 1"; + let expected_expr = "(c1_null_count@1 IS NULL OR c1_row_count@2 IS NULL OR c1_null_count@1 != c1_row_count@2) AND (c1_min@0 IS NULL OR c1_min@0 < 1)"; // test column on the left let expr = col("c1").lt(lit(1)); @@ -2575,7 +2686,7 @@ mod tests { #[test] fn row_group_predicate_lt_eq() -> Result<()> { let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); - let expected_expr = "c1_null_count@1 != c1_row_count@2 AND c1_min@0 <= 1"; + let expected_expr = "(c1_null_count@1 IS NULL OR c1_row_count@2 IS NULL OR c1_null_count@1 != c1_row_count@2) AND (c1_min@0 IS NULL OR c1_min@0 <= 1)"; // test column on the left let expr = col("c1").lt_eq(lit(1)); @@ -2600,7 +2711,7 @@ mod tests { ]); // test AND operator joining supported c1 < 1 expression and unsupported c2 > c3 expression let expr = col("c1").lt(lit(1)).and(col("c2").lt(col("c3"))); - let expected_expr = "c1_null_count@1 != c1_row_count@2 AND c1_min@0 < 1"; + let expected_expr = "(c1_null_count@1 IS NULL OR c1_row_count@2 IS NULL OR c1_null_count@1 != c1_row_count@2) AND (c1_min@0 IS NULL OR c1_min@0 < 1)"; let predicate_expr = test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); @@ -2640,7 +2751,8 @@ mod tests { #[test] fn row_group_predicate_not_bool() -> Result<()> { let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]); - let expected_expr = "NOT c1_min@0 AND c1_max@1"; + let expected_expr = + "c1_min@0 IS NULL OR c1_max@1 IS NULL OR NOT c1_min@0 AND c1_max@1"; let expr = col("c1").not(); let predicate_expr = @@ -2653,7 +2765,8 @@ mod tests { #[test] fn row_group_predicate_bool() -> Result<()> { let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]); - let expected_expr = "c1_min@0 OR c1_max@1"; + let expected_expr = + "c1_min@0 IS NULL OR c1_max@1 IS NULL OR c1_min@0 OR c1_max@1"; let expr = col("c1"); let predicate_expr = @@ -2666,7 +2779,7 @@ mod tests { #[test] fn row_group_predicate_lt_bool() -> Result<()> { let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]); - let expected_expr = "c1_null_count@1 != c1_row_count@2 AND c1_min@0 < true"; + let expected_expr = "(c1_null_count@1 IS NULL OR c1_row_count@2 IS NULL OR c1_null_count@1 != c1_row_count@2) AND (c1_min@0 IS NULL OR c1_min@0 < true)"; // DF doesn't support arithmetic on boolean columns so // this predicate will error 
when evaluated @@ -2689,12 +2802,7 @@ mod tests { let expr = col("c1") .lt(lit(1)) .and(col("c2").eq(lit(2)).or(col("c2").eq(lit(3)))); - let expected_expr = "c1_null_count@1 != c1_row_count@2 \ - AND c1_min@0 < 1 AND (\ - c2_null_count@5 != c2_row_count@6 \ - AND c2_min@3 <= 2 AND 2 <= c2_max@4 OR \ - c2_null_count@5 != c2_row_count@6 AND c2_min@3 <= 3 AND 3 <= c2_max@4\ - )"; + let expected_expr = "(c1_null_count@1 IS NULL OR c1_row_count@2 IS NULL OR c1_null_count@1 != c1_row_count@2) AND (c1_min@0 IS NULL OR c1_min@0 < 1) AND ((c2_null_count@5 IS NULL OR c2_row_count@6 IS NULL OR c2_null_count@5 != c2_row_count@6) AND (c2_min@3 IS NULL OR c2_min@3 <= 2) AND (c2_max@4 IS NULL OR 2 <= c2_max@4) OR (c2_null_count@5 IS NULL OR c2_row_count@6 IS NULL OR c2_null_count@5 != c2_row_count@6) AND (c2_min@3 IS NULL OR c2_min@3 <= 3) AND (c2_max@4 IS NULL OR 3 <= c2_max@4))"; let predicate_expr = test_build_predicate_expression(&expr, &schema, &mut required_columns); assert_eq!(predicate_expr.to_string(), expected_expr); @@ -2785,7 +2893,7 @@ mod tests { vec![lit(1), lit(2), lit(3)], false, )); - let expected_expr = "c1_null_count@2 != c1_row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != c1_row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1 OR c1_null_count@2 != c1_row_count@3 AND c1_min@0 <= 3 AND 3 <= c1_max@1"; + let expected_expr = "(c1_null_count@2 IS NULL OR c1_row_count@3 IS NULL OR c1_null_count@2 != c1_row_count@3) AND (c1_min@0 IS NULL OR c1_min@0 <= 1) AND (c1_max@1 IS NULL OR 1 <= c1_max@1) OR (c1_null_count@2 IS NULL OR c1_row_count@3 IS NULL OR c1_null_count@2 != c1_row_count@3) AND (c1_min@0 IS NULL OR c1_min@0 <= 2) AND (c1_max@1 IS NULL OR 2 <= c1_max@1) OR (c1_null_count@2 IS NULL OR c1_row_count@3 IS NULL OR c1_null_count@2 != c1_row_count@3) AND (c1_min@0 IS NULL OR c1_min@0 <= 3) AND (c1_max@1 IS NULL OR 3 <= c1_max@1)"; let predicate_expr = test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); @@ -2821,7 +2929,7 @@ mod tests { vec![lit(1), lit(2), lit(3)], true, )); - let expected_expr = "c1_null_count@2 != c1_row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1) AND c1_null_count@2 != c1_row_count@3 AND (c1_min@0 != 2 OR 2 != c1_max@1) AND c1_null_count@2 != c1_row_count@3 AND (c1_min@0 != 3 OR 3 != c1_max@1)"; + let expected_expr = "(c1_null_count@2 IS NULL OR c1_row_count@3 IS NULL OR c1_null_count@2 != c1_row_count@3) AND (c1_min@0 IS NULL OR c1_min@0 != 1 OR c1_max@1 IS NULL OR 1 != c1_max@1) AND (c1_null_count@2 IS NULL OR c1_row_count@3 IS NULL OR c1_null_count@2 != c1_row_count@3) AND (c1_min@0 IS NULL OR c1_min@0 != 2 OR c1_max@1 IS NULL OR 2 != c1_max@1) AND (c1_null_count@2 IS NULL OR c1_row_count@3 IS NULL OR c1_null_count@2 != c1_row_count@3) AND (c1_min@0 IS NULL OR c1_min@0 != 3 OR c1_max@1 IS NULL OR 3 != c1_max@1)"; let predicate_expr = test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); @@ -2867,7 +2975,7 @@ mod tests { // test c1 in(1, 2) and c2 BETWEEN 4 AND 5 let expr3 = expr1.and(expr2); - let expected_expr = "(c1_null_count@2 != c1_row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != c1_row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1) AND c2_null_count@5 != c2_row_count@6 AND c2_max@4 >= 4 AND c2_null_count@5 != c2_row_count@6 AND c2_min@7 <= 5"; + let expected_expr = "((c1_null_count@2 IS NULL OR c1_row_count@3 IS NULL OR c1_null_count@2 != c1_row_count@3) AND 
(c1_min@0 IS NULL OR c1_min@0 <= 1) AND (c1_max@1 IS NULL OR 1 <= c1_max@1) OR (c1_null_count@2 IS NULL OR c1_row_count@3 IS NULL OR c1_null_count@2 != c1_row_count@3) AND (c1_min@0 IS NULL OR c1_min@0 <= 2) AND (c1_max@1 IS NULL OR 2 <= c1_max@1)) AND (c2_null_count@5 IS NULL OR c2_row_count@6 IS NULL OR c2_null_count@5 != c2_row_count@6) AND (c2_max@4 IS NULL OR c2_max@4 >= 4) AND (c2_null_count@5 IS NULL OR c2_row_count@6 IS NULL OR c2_null_count@5 != c2_row_count@6) AND (c2_min@7 IS NULL OR c2_min@7 <= 5)"; let predicate_expr = test_build_predicate_expression(&expr3, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); @@ -2894,7 +3002,7 @@ mod tests { #[test] fn row_group_predicate_cast() -> Result<()> { let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); - let expected_expr = "c1_null_count@2 != c1_row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64)"; + let expected_expr = "(c1_null_count@2 IS NULL OR c1_row_count@3 IS NULL OR c1_null_count@2 != c1_row_count@3) AND (CAST(c1_min@0 AS Int64) IS NULL OR CAST(c1_min@0 AS Int64) <= 1) AND (CAST(c1_max@1 AS Int64) IS NULL OR 1 <= CAST(c1_max@1 AS Int64))"; // test cast(c1 as int64) = 1 // test column on the left @@ -2910,7 +3018,7 @@ mod tests { assert_eq!(predicate_expr.to_string(), expected_expr); let expected_expr = - "c1_null_count@1 != c1_row_count@2 AND TRY_CAST(c1_max@0 AS Int64) > 1"; + "(c1_null_count@1 IS NULL OR c1_row_count@2 IS NULL OR c1_null_count@1 != c1_row_count@2) AND (TRY_CAST(c1_max@0 AS Int64) IS NULL OR TRY_CAST(c1_max@0 AS Int64) > 1)"; // test column on the left let expr = @@ -2942,7 +3050,7 @@ mod tests { ], false, )); - let expected_expr = "c1_null_count@2 != c1_row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != c1_row_count@3 AND CAST(c1_min@0 AS Int64) <= 2 AND 2 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != c1_row_count@3 AND CAST(c1_min@0 AS Int64) <= 3 AND 3 <= CAST(c1_max@1 AS Int64)"; + let expected_expr = "(c1_null_count@2 IS NULL OR c1_row_count@3 IS NULL OR c1_null_count@2 != c1_row_count@3) AND (CAST(c1_min@0 AS Int64) IS NULL OR CAST(c1_min@0 AS Int64) <= 1) AND (CAST(c1_max@1 AS Int64) IS NULL OR 1 <= CAST(c1_max@1 AS Int64)) OR (c1_null_count@2 IS NULL OR c1_row_count@3 IS NULL OR c1_null_count@2 != c1_row_count@3) AND (CAST(c1_min@0 AS Int64) IS NULL OR CAST(c1_min@0 AS Int64) <= 2) AND (CAST(c1_max@1 AS Int64) IS NULL OR 2 <= CAST(c1_max@1 AS Int64)) OR (c1_null_count@2 IS NULL OR c1_row_count@3 IS NULL OR c1_null_count@2 != c1_row_count@3) AND (CAST(c1_min@0 AS Int64) IS NULL OR CAST(c1_min@0 AS Int64) <= 3) AND (CAST(c1_max@1 AS Int64) IS NULL OR 3 <= CAST(c1_max@1 AS Int64))"; let predicate_expr = test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); @@ -2956,7 +3064,7 @@ mod tests { ], true, )); - let expected_expr = "c1_null_count@2 != c1_row_count@3 AND (CAST(c1_min@0 AS Int64) != 1 OR 1 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != c1_row_count@3 AND (CAST(c1_min@0 AS Int64) != 2 OR 2 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != c1_row_count@3 AND (CAST(c1_min@0 AS Int64) != 3 OR 3 != CAST(c1_max@1 AS Int64))"; + let expected_expr = "(c1_null_count@2 IS NULL OR c1_row_count@3 IS NULL OR c1_null_count@2 != c1_row_count@3) AND (CAST(c1_min@0 AS Int64) IS NULL OR CAST(c1_min@0 AS Int64) != 1 OR CAST(c1_max@1 AS Int64) IS NULL OR 1 != CAST(c1_max@1 AS Int64)) AND 
(c1_null_count@2 IS NULL OR c1_row_count@3 IS NULL OR c1_null_count@2 != c1_row_count@3) AND (CAST(c1_min@0 AS Int64) IS NULL OR CAST(c1_min@0 AS Int64) != 2 OR CAST(c1_max@1 AS Int64) IS NULL OR 2 != CAST(c1_max@1 AS Int64)) AND (c1_null_count@2 IS NULL OR c1_row_count@3 IS NULL OR c1_null_count@2 != c1_row_count@3) AND (CAST(c1_min@0 AS Int64) IS NULL OR CAST(c1_min@0 AS Int64) != 3 OR CAST(c1_max@1 AS Int64) IS NULL OR 3 != CAST(c1_max@1 AS Int64))"; let predicate_expr = test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); @@ -4168,6 +4276,51 @@ mod tests { // TODO: add other negative test for other case and op } + #[tokio::test] + async fn test_rewrite_expr_to_prunable_with_nulls() { + let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); + + let rewriter = PredicateRewriter::new(); + + let transform_expr = |expr| { + let expr = logical2physical(&expr, &schema); + rewriter.rewrite_predicate_to_statistics_predicate(&expr, &schema) + }; + + // transform an arbitrary valid expression that we know is handled + let known_expression = col("a").eq(lit(12)); + let known_expression_transformed = transform_expr(known_expression.clone()); + + // construct a set of statistics containers with nulls + let statistics_batch = record_batch!( + ("a_min", Int32, [Some(1), Some(13), None]), + ("a_max", Int32, [Some(2), Some(15), Some(15)]), + ("a_row_count", UInt64, [Some(2), Some(2), Some(2)]), + ("a_null_count", UInt64, [Some(0), Some(0), Some(0)]) + ) + .unwrap(); + + let res = known_expression_transformed + .evaluate(&statistics_batch) + .unwrap(); + + let ColumnarValue::Array(matches) = res else { + panic!("should have returned an array") + }; + + let matches = matches + .as_any() + .downcast_ref::() + .expect("matches should be a boolean array"); + let values = matches.iter().collect::>(); + // the first container can be pruned because the max is lower than the value + // the second container can be pruned because the min is higher than the value + // the third container must be scanned because although the max is higher than the value we know nothing about the min + let expected = vec![Some(false), Some(false), Some(true)]; + + assert_eq!(values, expected); + } + #[test] fn prune_with_contained_one_column() { let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)])); diff --git a/parquet-testing b/parquet-testing index f4d7ed772a62a..e45cd23f784aa 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit f4d7ed772a62a95111db50fbcad2460833e8c882 +Subproject commit e45cd23f784aab3d6bf0701f8f4e621469ed3be7 diff --git a/testing b/testing index d2a1371230349..98fceecd024dc 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit d2a13712303498963395318a4eb42872e66aead7 +Subproject commit 98fceecd024dccd2f8a00e32fc144975f218acf4
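
Note (illustration only, not part of the patch): the new `test_rewrite_expr_to_prunable_with_nulls` exercises the `Eq` rewrite. A companion check for the `Gt` path might look like the sketch below. It reuses helpers the patch's own test already depends on (`logical2physical`, `PredicateRewriter::rewrite_predicate_to_statistics_predicate`, the `record_batch!` macro) and assumes `BooleanArray` is in scope in the test module; the test name and the statistics values are made up. The column order of the batch is chosen to match the indexes the rewriter assigns for a `>` predicate (`a_max@0`, `a_null_count@1`, `a_row_count@2`).

// Sketch of a hypothetical companion test: a container whose `a_max` statistic is
// null must be kept when pruning on `a > 10`, because the rewritten predicate is
// roughly:
// (a_null_count IS NULL OR a_row_count IS NULL OR a_null_count != a_row_count)
//   AND (a_max IS NULL OR a_max > 10)
#[test]
fn test_rewrite_gt_with_null_max_stat() {
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);

    let expr = logical2physical(&col("a").gt(lit(10)), &schema);
    let predicate =
        PredicateRewriter::new().rewrite_predicate_to_statistics_predicate(&expr, &schema);

    // Three containers: max below the literal (prunable), max above it (keep),
    // and a missing (null) max statistic (must also be kept).
    let statistics_batch = record_batch!(
        ("a_max", Int32, [Some(5), Some(20), None]),
        ("a_null_count", UInt64, [Some(0), Some(0), Some(0)]),
        ("a_row_count", UInt64, [Some(2), Some(2), Some(2)])
    )
    .unwrap();

    let ColumnarValue::Array(result) = predicate.evaluate(&statistics_batch).unwrap() else {
        panic!("expected an array result")
    };
    let result = result
        .as_any()
        .downcast_ref::<BooleanArray>()
        .expect("result should be a boolean array");
    assert_eq!(
        result.iter().collect::<Vec<_>>(),
        vec![Some(false), Some(true), Some(true)]
    );
}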