Skip to content

Commit

Permalink
[SPARK-48431][SQL] Do not forward predicates on collated columns to f…
Browse files Browse the repository at this point in the history
…ile readers

### What changes were proposed in this pull request?

[SPARK-47657](https://issues.apache.org/jira/browse/SPARK-47657) allows filters on collated columns to be pushed to file sources that support them. When such filters are pushed to a file source, the file source must not forward them to the actual file readers (e.g. the Parquet or CSV readers), because there is no guarantee that those readers support collations.

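In this PR, we widen filters on columns with a non-UTF8_BINARY collation to AlwaysTrue when translating filters for file sources. IsNull and IsNotNull filters on attribute references are still translated as usual, also for collated columns.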

### Why are the changes needed?

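Without this change, no file source can safely implement filter pushdown for collated columns, because the pushed filters could be forwarded to file readers that do not support collations.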

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added unit tests. No component tests are possible because there is no file source with filter pushdown yet.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#46760 from olaky/filter-translation-for-collations.

Authored-by: Ole Sasse <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
olaky authored and cloud-fan committed May 29, 2024
1 parent cf47293 commit a3b8420
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ import org.apache.spark.sql.execution.streaming.StreamingRelation
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.{PartitioningUtils => CatalystPartitioningUtils}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.util.{CaseInsensitiveStringMap, SchemaUtils}
import org.apache.spark.unsafe.types.UTF8String

/**
Expand Down Expand Up @@ -595,6 +595,16 @@ object DataSourceStrategy
translatedFilterToExpr: Option[mutable.HashMap[sources.Filter, Expression]],
nestedPredicatePushdownEnabled: Boolean)
: Option[Filter] = {

// Translates a single leaf expression into a data source filter. If the translation
// succeeds and the caller supplied a filter-to-expression mapping, the resulting filter is
// registered in that mapping against the enclosing `predicate`. The translation result is
// returned unchanged either way.
def translateAndRecordLeafNodeFilter(filter: Expression): Option[Filter] = {
  val pushableColumn = PushableColumn(nestedPredicatePushdownEnabled)
  val leafFilter = translateLeafNodeFilter(filter, pushableColumn)
  leafFilter.foreach { sourceFilter =>
    translatedFilterToExpr.foreach(mapping => mapping.update(sourceFilter, predicate))
  }
  leafFilter
}

predicate match {
case expressions.And(left, right) =>
// See SPARK-12218 for detailed discussion
Expand All @@ -621,16 +631,25 @@ object DataSourceStrategy
right, translatedFilterToExpr, nestedPredicatePushdownEnabled)
} yield sources.Or(leftFilter, rightFilter)

case notNull @ expressions.IsNotNull(_: AttributeReference) =>
// Not null filters on attribute references can always be pushed, also for collated columns.
translateAndRecordLeafNodeFilter(notNull)

case isNull @ expressions.IsNull(_: AttributeReference) =>
// Is null filters on attribute references can always be pushed, also for collated columns.
translateAndRecordLeafNodeFilter(isNull)

case p if p.references.exists(ref => SchemaUtils.hasNonUTF8BinaryCollation(ref.dataType)) =>
// The filter cannot be pushed and we widen it to be AlwaysTrue(). This is only valid if
// the result of the filter is not negated by a Not expression it is wrapped in.
translateAndRecordLeafNodeFilter(Literal.TrueLiteral)

case expressions.Not(child) =>
translateFilterWithMapping(child, translatedFilterToExpr, nestedPredicatePushdownEnabled)
.map(sources.Not)

case other =>
val filter = translateLeafNodeFilter(other, PushableColumn(nestedPredicatePushdownEnabled))
if (filter.isDefined && translatedFilterToExpr.isDefined) {
translatedFilterToExpr.get(filter.get) = predicate
}
filter
translateAndRecordLeafNodeFilter(other)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,10 @@ class DataSourceStrategySuite extends PlanTest with SharedSparkSession {

test("SPARK-41636: selectFilters returns predicates in deterministic order") {

val predicates = Seq(EqualTo($"id", 1), EqualTo($"id", 2),
EqualTo($"id", 3), EqualTo($"id", 4), EqualTo($"id", 5), EqualTo($"id", 6))
val idColAttribute = AttributeReference("id", IntegerType)()
val predicates = Seq(EqualTo(idColAttribute, 1), EqualTo(idColAttribute, 2),
EqualTo(idColAttribute, 3), EqualTo(idColAttribute, 4), EqualTo(idColAttribute, 5),
EqualTo(idColAttribute, 6))

val (unhandledPredicates, pushedFilters, handledFilters) =
DataSourceStrategy.selectFilters(FakeRelation(), predicates)
Expand All @@ -338,4 +340,53 @@ class DataSourceStrategySuite extends PlanTest with SharedSparkSession {
})
assert(handledFilters.isEmpty)
}

// Filters on a column with the UTF8_BINARY collation are expected to be translated to their
// corresponding data source filters rather than being widened to AlwaysTrue.
test("SPARK-48431: Push filters on columns with UTF8_BINARY collation") {
  val colAttr = $"col".string("UTF8_BINARY")

  // Equality, also when wrapped in Not.
  testTranslateFilter(EqualTo(colAttr, Literal("value")), Some(sources.EqualTo("col", "value")))
  testTranslateFilter(Not(EqualTo(colAttr, Literal("value"))),
    Some(sources.Not(sources.EqualTo("col", "value"))))

  // Ordering comparisons.
  testTranslateFilter(LessThan(colAttr, Literal("value")),
    Some(sources.LessThan("col", "value")))
  testTranslateFilter(LessThanOrEqual(colAttr, Literal("value")),
    Some(sources.LessThanOrEqual("col", "value")))
  testTranslateFilter(GreaterThan(colAttr, Literal("value")),
    Some(sources.GreaterThan("col", "value")))
  testTranslateFilter(GreaterThanOrEqual(colAttr, Literal("value")),
    Some(sources.GreaterThanOrEqual("col", "value")))

  // Null checks, mirroring the coverage of the non-UTF8_BINARY collation test.
  testTranslateFilter(IsNotNull(colAttr), Some(sources.IsNotNull("col")))
  testTranslateFilter(IsNull(colAttr), Some(sources.IsNull("col")))
}

// Registers one test per non-UTF8_BINARY collation. For these collations the test expects
// every filter that references the collated column to be widened to AlwaysTrue, except for
// IsNull / IsNotNull directly on the attribute.
for (collation <- Seq("UTF8_BINARY_LCASE", "UNICODE")) {
  test(s"SPARK-48431: Filter pushdown on columns with $collation collation") {
    val colAttr = $"col".string(collation)

    // No pushdown for all comparison based filters.
    testTranslateFilter(EqualTo(colAttr, Literal("value")), Some(sources.AlwaysTrue))
    testTranslateFilter(LessThan(colAttr, Literal("value")), Some(sources.AlwaysTrue))
    testTranslateFilter(LessThanOrEqual(colAttr, Literal("value")), Some(sources.AlwaysTrue))
    testTranslateFilter(GreaterThan(colAttr, Literal("value")), Some(sources.AlwaysTrue))
    testTranslateFilter(GreaterThanOrEqual(colAttr, Literal("value")), Some(sources.AlwaysTrue))

    // Allow pushdown of Is(Not)Null filter.
    testTranslateFilter(IsNotNull(colAttr), Some(sources.IsNotNull("col")))
    testTranslateFilter(IsNull(colAttr), Some(sources.IsNull("col")))

    // Top level filter splitting at And and Or: each side is translated independently.
    testTranslateFilter(And(EqualTo(colAttr, Literal("value")), IsNotNull(colAttr)),
      Some(sources.And(sources.AlwaysTrue, sources.IsNotNull("col"))))
    testTranslateFilter(Or(EqualTo(colAttr, Literal("value")), IsNotNull(colAttr)),
      Some(sources.Or(sources.AlwaysTrue, sources.IsNotNull("col"))))

    // Different cases involving Not: the whole Not expression is widened, not just its child.
    testTranslateFilter(Not(EqualTo(colAttr, Literal("value"))), Some(sources.AlwaysTrue))
    testTranslateFilter(And(Not(EqualTo(colAttr, Literal("value"))), IsNotNull(colAttr)),
      Some(sources.And(sources.AlwaysTrue, sources.IsNotNull("col"))))
    // Not(IsNotNull(col)) could be pushed down, but it is widened to AlwaysTrue as well
    // because we want to keep the translation logic simple.
    testTranslateFilter(And(EqualTo(colAttr, Literal("value")), Not(IsNotNull(colAttr))),
      Some(sources.And(sources.AlwaysTrue, sources.AlwaysTrue)))
  }
}
}

0 comments on commit a3b8420

Please sign in to comment.