[SPARK-46707][SQL][FOLLOWUP] Push down throwable predicate through aggregates

### What changes were proposed in this pull request?
Push down throwable predicates through aggregates, and add a unit test for "Can't push down nondeterministic filter through aggregate".
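
The change to the pushdown condition can be sketched with a toy model. This is only an illustration under stated assumptions: `Conjunct`, `PushdownSketch`, `pushableBefore`, and `pushableAfter` are hypothetical names, not Spark's `Expression` API, and alias replacement is ignored.

```scala
// Toy model (hypothetical, not Spark's Expression API): each conjunct is
// reduced to the three properties the rule inspects.
final case class Conjunct(
    name: String,
    deterministic: Boolean,
    throwable: Boolean,
    references: Set[String])

object PushdownSketch {
  // Condition before this change: throwable conjuncts stayed above the Aggregate.
  def pushableBefore(c: Conjunct, childOutput: Set[String]): Boolean =
    c.deterministic && !c.throwable &&
      c.references.nonEmpty && c.references.subsetOf(childOutput)

  // Condition after this change: throwability no longer blocks pushdown.
  def pushableAfter(c: Conjunct, childOutput: Set[String]): Boolean =
    c.deterministic && c.references.nonEmpty && c.references.subsetOf(childOutput)

  // Assumed output attributes of the Aggregate's child.
  val childOutput: Set[String] = Set("a", "b")

  val conjuncts: Seq[Conjunct] = Seq(
    Conjunct(name = "a > 1", deterministic = true, throwable = false,
      references = Set("a")),
    Conjunct(name = "isnotnull(sequence(a, b, 1))", deterministic = true,
      throwable = true, references = Set("a", "b")),
    Conjunct(name = "rand(10) > a", deterministic = false, throwable = false,
      references = Set("a")))

  val pushedBefore: Seq[String] =
    conjuncts.filter(pushableBefore(_, childOutput)).map(_.name)
  val pushedAfter: Seq[String] =
    conjuncts.filter(pushableAfter(_, childOutput)).map(_.name)
}
```

In this model the throwable `sequence` conjunct moves into the pushed-down set after the change, while the nondeterministic `rand` conjunct stays above the aggregate in both versions.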

### Why are the changes needed?
If a filter can be pushed down through an Aggregate, it references only the grouping keys. The Aggregate operator cannot reduce the set of distinct grouping-key values, so the filter sees no new data after being pushed down. Pushing a throwable filter through an Aggregate therefore does not change whether an exception is thrown.
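
The argument above can be checked on plain Scala collections. This is a minimal sketch, not Spark code: `ThrowablePushdownSketch`, `keyPredicate`, and the sample rows are assumptions made for illustration, and eager collections stand in for the query plan.

```scala
import scala.util.Try

object ThrowablePushdownSketch {
  // A deterministic but throwable predicate on the grouping key `a`:
  // integer division throws ArithmeticException when a == 0.
  def keyPredicate(a: Int): Boolean = 100 / a > 10

  // Filter kept above the aggregate: evaluated once per distinct key.
  def filterAfterAggregate(rows: Seq[(Int, Int)]): Map[Int, Int] =
    rows.groupBy(_._1).map { case (a, grp) => a -> grp.size }
      .filter { case (a, _) => keyPredicate(a) }

  // Filter pushed below the aggregate: evaluated once per input row,
  // but over exactly the same set of distinct key values.
  def filterBeforeAggregate(rows: Seq[(Int, Int)]): Map[Int, Int] =
    rows.filter { case (a, _) => keyPredicate(a) }
      .groupBy(_._1).map { case (a, grp) => a -> grp.size }

  // Hypothetical rows of a relation with columns (a, b).
  val safeRows: Seq[(Int, Int)] = Seq((1, 10), (1, 20), (2, 30), (20, 40))
  val failingRows: Seq[(Int, Int)] = safeRows :+ ((0, 50))

  // Same result when no key value throws.
  val sameResult: Boolean =
    filterAfterAggregate(safeRows) == filterBeforeAggregate(safeRows)

  // Both orders fail when some key value throws.
  val bothFail: Boolean =
    Try(filterAfterAggregate(failingRows)).isFailure &&
      Try(filterBeforeAggregate(failingRows)).isFailure
}
```

Because grouping keeps every distinct value of `a`, the predicate meets the value `0` in either order, so the exception behavior is the same.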

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Unit tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#44975 from zml1206/SPARK-46707-FOLLOWUP.

Authored-by: zml1206 <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
zml1206 authored and cloud-fan committed May 14, 2024
1 parent c098262 commit 6766c39
Showing 2 changed files with 22 additions and 5 deletions.
@@ -1768,6 +1768,10 @@ object PushPredicateThroughNonJoin extends Rule[LogicalPlan] with PredicateHelpe
val aliasMap = getAliasMap(project)
project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))

+ // We can push down deterministic predicate through Aggregate, including throwable predicate.
+ // If we can push down a filter through Aggregate, it means the filter only references the
+ // grouping keys or constants. The Aggregate operator can't reduce distinct values of grouping
+ // keys so the filter won't see any new data after push down.
case filter @ Filter(condition, aggregate: Aggregate)
if aggregate.aggregateExpressions.forall(_.deterministic)
&& aggregate.groupingExpressions.nonEmpty =>
@@ -1777,8 +1781,8 @@ object PushPredicateThroughNonJoin extends Rule[LogicalPlan] with PredicateHelpe
// attributes produced by the aggregate operator's child operator.
val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
val replaced = replaceAlias(cond, aliasMap)
- cond.deterministic && !cond.throwable &&
- cond.references.nonEmpty && replaced.references.subsetOf(aggregate.child.outputSet)
+ cond.deterministic && cond.references.nonEmpty &&
+ replaced.references.subsetOf(aggregate.child.outputSet)
}

if (pushDown.nonEmpty) {
@@ -219,6 +219,17 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}

+ test("Can't push down nondeterministic filter through aggregate") {
+ val originalQuery = testRelation
+ .groupBy($"a")($"a", count($"b") as "c")
+ .where(Rand(10) > $"a")
+ .analyze
+
+ val optimized = Optimize.execute(originalQuery)
+
+ comparePlans(optimized, originalQuery)
+ }

test("filters: combines filters") {
val originalQuery = testRelation
.select($"a")
@@ -1483,14 +1494,16 @@ class FilterPushdownSuite extends PlanTest {
test("SPARK-46707: push down predicate with sequence (without step) through aggregates") {
val x = testRelation.subquery("x")

- // do not push down when sequence has step param
+ // Always push down sequence as it's deterministic
val queryWithStep = x.groupBy($"x.a", $"x.b")($"x.a", $"x.b")
.where(IsNotNull(Sequence($"x.a", $"x.b", Some(Literal(1)))))
.analyze
val optimizedQueryWithStep = Optimize.execute(queryWithStep)
- comparePlans(optimizedQueryWithStep, queryWithStep)
+ val correctAnswerWithStep = x.where(IsNotNull(Sequence($"x.a", $"x.b", Some(Literal(1)))))
+ .groupBy($"x.a", $"x.b")($"x.a", $"x.b")
+ .analyze
+ comparePlans(optimizedQueryWithStep, correctAnswerWithStep)

// push down when sequence does not have step param
val queryWithoutStep = x.groupBy($"x.a", $"x.b")($"x.a", $"x.b")
.where(IsNotNull(Sequence($"x.a", $"x.b", None)))
.analyze