From a3b8420e5eecc3ce33528bc7c73967a64b1f670e Mon Sep 17 00:00:00 2001
From: Ole Sasse
Date: Wed, 29 May 2024 13:52:33 -0700
Subject: [PATCH] [SPARK-48431][SQL] Do not forward predicates on collated
 columns to file readers

### What changes were proposed in this pull request?

[SPARK-47657](https://issues.apache.org/jira/browse/SPARK-47657) allows pushing filters on collated columns down to file sources that support them. If such filters are pushed to a file source, that file source must not forward them to the actual file readers (e.g. the Parquet or CSV readers), because there is no guarantee that those readers support collations. In this PR we widen filters on collated columns to `AlwaysTrue` when translating filters for file sources.
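To make the widening concrete, here is a minimal, self-contained sketch over a toy filter model (the names `Expr`, `SourceFilter`, `translate` and so on are illustrative stand-ins, not Spark's actual `Expression` and `sources.Filter` classes, and the IsNull/IsNotNull handling is omitted). Two properties make the widening sound: `AlwaysTrue` only ever keeps a superset of the matching rows, and Spark re-evaluates the original predicate above the scan; and the widening case must be matched before `Not`, so a negated comparison on a collated column is widened as a whole instead of becoming `Not(AlwaysTrue)`, which would drop every row:

```scala
// Toy Catalyst-side predicates and source-side filters; illustrative only.
object WideningSketch extends App {
  sealed trait Expr { def referencesCollated: Boolean }
  case class Eq(col: String, value: String, collated: Boolean) extends Expr {
    def referencesCollated: Boolean = collated
  }
  case class Not(child: Expr) extends Expr {
    def referencesCollated: Boolean = child.referencesCollated
  }
  case class And(left: Expr, right: Expr) extends Expr {
    def referencesCollated: Boolean =
      left.referencesCollated || right.referencesCollated
  }

  sealed trait SourceFilter
  case object AlwaysTrue extends SourceFilter
  case class SourceEq(col: String, value: String) extends SourceFilter
  case class SourceNot(child: SourceFilter) extends SourceFilter
  case class SourceAnd(left: SourceFilter, right: SourceFilter) extends SourceFilter

  def translate(expr: Expr): SourceFilter = expr match {
    // And is split first, so each side is widened independently.
    case And(left, right) => SourceAnd(translate(left), translate(right))
    // The widening case comes before Not: Not(Eq(collated, ...)) becomes
    // AlwaysTrue as a whole, never SourceNot(AlwaysTrue) ("keep nothing").
    case p if p.referencesCollated => AlwaysTrue
    case Not(child) => SourceNot(translate(child))
    case Eq(col, value, _) => SourceEq(col, value)
  }

  // A negated comparison on a collated column is widened as a whole ...
  assert(translate(Not(Eq("col", "abc", collated = true))) == AlwaysTrue)
  // ... while And still keeps the pushable half.
  assert(translate(And(Eq("a", "x", collated = true), Eq("b", "y", collated = false)))
    == SourceAnd(AlwaysTrue, SourceEq("b", "y")))
}
```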
### Why are the changes needed?

Without this change, no file source can implement filter pushdown safely: a pushed filter on a collated column that reaches a collation-unaware file reader is evaluated with binary comparisons, which can incorrectly drop rows that match under the column's collation.
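For intuition on why binary comparison in a reader is wrong for collated data, here is a small sketch that uses `equalsIgnoreCase` as a crude stand-in for a case-insensitive collation such as UTF8_BINARY_LCASE (the row values are made up for illustration):

```scala
object CollationMismatchSketch extends App {
  // Rows in a data file, queried with the predicate col = 'abc'.
  val rows = Seq("abc", "ABC", "Abc", "xyz")

  // A collation-unaware reader compares raw bytes and keeps only "abc" ...
  val byteWise = rows.filter(_ == "abc")
  // ... but under a case-insensitive collation three rows actually match.
  val collationAware = rows.filter(_.equalsIgnoreCase("abc"))

  assert(byteWise == Seq("abc"))
  assert(collationAware == Seq("abc", "ABC", "Abc"))
  // Pushing the raw predicate into the reader would silently lose "ABC" and
  // "Abc". Widening it to AlwaysTrue keeps every row and lets Spark re-apply
  // the original, collation-aware predicate above the scan.
}
```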
### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added unit tests. No component tests are possible because there is no file source with filter pushdown yet.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #46760 from olaky/filter-translation-for-collations.

Authored-by: Ole Sasse
Signed-off-by: Wenchen Fan
---
 .../datasources/DataSourceStrategy.scala      | 31 +++++++++--
 .../datasources/DataSourceStrategySuite.scala | 55 ++++++++++++++++++-
 2 files changed, 78 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 22b60caf26693..7cda347ce581b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -54,7 +54,7 @@ import org.apache.spark.sql.execution.streaming.StreamingRelation
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.{PartitioningUtils => CatalystPartitioningUtils}
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.sql.util.{CaseInsensitiveStringMap, SchemaUtils}
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
@@ -595,6 +595,16 @@ object DataSourceStrategy
       translatedFilterToExpr: Option[mutable.HashMap[sources.Filter, Expression]],
       nestedPredicatePushdownEnabled: Boolean)
     : Option[Filter] = {
+
+    def translateAndRecordLeafNodeFilter(filter: Expression): Option[Filter] = {
+      val translatedFilter =
+        translateLeafNodeFilter(filter, PushableColumn(nestedPredicatePushdownEnabled))
+      if (translatedFilter.isDefined && translatedFilterToExpr.isDefined) {
+        translatedFilterToExpr.get(translatedFilter.get) = predicate
+      }
+      translatedFilter
+    }
+
     predicate match {
       case expressions.And(left, right) =>
         // See SPARK-12218 for detailed discussion
@@ -621,16 +631,25 @@ object DataSourceStrategy
             right, translatedFilterToExpr, nestedPredicatePushdownEnabled)
         } yield sources.Or(leftFilter, rightFilter)
 
+      case notNull @ expressions.IsNotNull(_: AttributeReference) =>
+        // Not null filters on attribute references can always be pushed, also for collated columns.
+        translateAndRecordLeafNodeFilter(notNull)
+
+      case isNull @ expressions.IsNull(_: AttributeReference) =>
+        // Is null filters on attribute references can always be pushed, also for collated columns.
+        translateAndRecordLeafNodeFilter(isNull)
+
+      case p if p.references.exists(ref => SchemaUtils.hasNonUTF8BinaryCollation(ref.dataType)) =>
+        // The filter cannot be pushed and we widen it to be AlwaysTrue(). This is only valid if
+        // the result of the filter is not negated by a Not expression it is wrapped in.
+        translateAndRecordLeafNodeFilter(Literal.TrueLiteral)
+
       case expressions.Not(child) =>
         translateFilterWithMapping(child, translatedFilterToExpr, nestedPredicatePushdownEnabled)
           .map(sources.Not)
 
       case other =>
-        val filter = translateLeafNodeFilter(other, PushableColumn(nestedPredicatePushdownEnabled))
-        if (filter.isDefined && translatedFilterToExpr.isDefined) {
-          translatedFilterToExpr.get(filter.get) = predicate
-        }
-        filter
+        translateAndRecordLeafNodeFilter(other)
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala
index 2b9ec97bace1e..3c09dee990ebc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala
@@ -327,8 +327,10 @@ class DataSourceStrategySuite extends PlanTest with SharedSparkSession {
 
   test("SPARK-41636: selectFilters returns predicates in deterministic order") {
-    val predicates = Seq(EqualTo($"id", 1), EqualTo($"id", 2),
-      EqualTo($"id", 3), EqualTo($"id", 4), EqualTo($"id", 5), EqualTo($"id", 6))
+    val idColAttribute = AttributeReference("id", IntegerType)()
+    val predicates = Seq(EqualTo(idColAttribute, 1), EqualTo(idColAttribute, 2),
+      EqualTo(idColAttribute, 3), EqualTo(idColAttribute, 4), EqualTo(idColAttribute, 5),
+      EqualTo(idColAttribute, 6))
 
     val (unhandledPredicates, pushedFilters, handledFilters) =
       DataSourceStrategy.selectFilters(FakeRelation(), predicates)
@@ -338,4 +340,53 @@ class DataSourceStrategySuite extends PlanTest with SharedSparkSession {
     })
     assert(handledFilters.isEmpty)
   }
+
+  test("SPARK-48431: Push filters on columns with UTF8_BINARY collation") {
+    val colAttr = $"col".string("UTF8_BINARY")
+    testTranslateFilter(EqualTo(colAttr, Literal("value")), Some(sources.EqualTo("col", "value")))
+    testTranslateFilter(Not(EqualTo(colAttr, Literal("value"))),
+      Some(sources.Not(sources.EqualTo("col", "value"))))
+    testTranslateFilter(LessThan(colAttr, Literal("value")),
+      Some(sources.LessThan("col", "value")))
+    testTranslateFilter(LessThan(colAttr, Literal("value")), Some(sources.LessThan("col", "value")))
+    testTranslateFilter(LessThanOrEqual(colAttr, Literal("value")),
+      Some(sources.LessThanOrEqual("col", "value")))
+    testTranslateFilter(GreaterThan(colAttr, Literal("value")),
+      Some(sources.GreaterThan("col", "value")))
+    testTranslateFilter(GreaterThanOrEqual(colAttr, Literal("value")),
+      Some(sources.GreaterThanOrEqual("col", "value")))
+    testTranslateFilter(IsNotNull(colAttr), Some(sources.IsNotNull("col")))
+  }
+
+  for (collation <- Seq("UTF8_BINARY_LCASE", "UNICODE")) {
+    test(s"SPARK-48431: Filter pushdown on columns with $collation collation") {
+      val colAttr = $"col".string(collation)
+
+      // No pushdown for all comparison based filters.
+      testTranslateFilter(EqualTo(colAttr, Literal("value")), Some(sources.AlwaysTrue))
+      testTranslateFilter(LessThan(colAttr, Literal("value")), Some(sources.AlwaysTrue))
+      testTranslateFilter(LessThan(colAttr, Literal("value")), Some(sources.AlwaysTrue))
+      testTranslateFilter(LessThanOrEqual(colAttr, Literal("value")), Some(sources.AlwaysTrue))
+      testTranslateFilter(GreaterThan(colAttr, Literal("value")), Some(sources.AlwaysTrue))
+      testTranslateFilter(GreaterThanOrEqual(colAttr, Literal("value")), Some(sources.AlwaysTrue))
+
+      // Allow pushdown of Is(Not)Null filter.
+      testTranslateFilter(IsNotNull(colAttr), Some(sources.IsNotNull("col")))
+      testTranslateFilter(IsNull(colAttr), Some(sources.IsNull("col")))
+
+      // Top level filter splitting at And and Or.
+      testTranslateFilter(And(EqualTo(colAttr, Literal("value")), IsNotNull(colAttr)),
+        Some(sources.And(sources.AlwaysTrue, sources.IsNotNull("col"))))
+      testTranslateFilter(Or(EqualTo(colAttr, Literal("value")), IsNotNull(colAttr)),
+        Some(sources.Or(sources.AlwaysTrue, sources.IsNotNull("col"))))
+
+      // Different cases involving Not.
+      testTranslateFilter(Not(EqualTo(colAttr, Literal("value"))), Some(sources.AlwaysTrue))
+      testTranslateFilter(And(Not(EqualTo(colAttr, Literal("value"))), IsNotNull(colAttr)),
+        Some(sources.And(sources.AlwaysTrue, sources.IsNotNull("col"))))
+      // This filter would work, but we want to keep the translation logic simple.
+      testTranslateFilter(And(EqualTo(colAttr, Literal("value")), Not(IsNotNull(colAttr))),
+        Some(sources.And(sources.AlwaysTrue, sources.AlwaysTrue)))
+    }
+  }
 }