[SPARK-47633][SQL] Include right-side plan output in `LateralJoin#allAttributes` for more consistent canonicalization

### What changes were proposed in this pull request?

Modify `LateralJoin` to include right-side plan output in `allAttributes`.

### Why are the changes needed?

In the following example, the view `v1` is cached, but a query against `v1` does not use the cache (the physical plan contains no `InMemoryTableScan`):
```
CREATE or REPLACE TEMP VIEW t1(c1, c2) AS VALUES (0, 1), (1, 2);
CREATE or REPLACE TEMP VIEW t2(c1, c2) AS VALUES (0, 1), (1, 2);

create or replace temp view v1 as
select *
from t1
join lateral (
  select c1 as a, c2 as b
  from t2)
on c1 = a;

cache table v1;

explain select * from v1;
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [c1#180], [a#173], Inner, BuildRight, false
   :- LocalTableScan [c1#180, c2#181]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=113]
      +- LocalTableScan [a#173, b#174]
```

The canonicalized version of the `LateralJoin` node is not consistent when there is a join condition. For example, for the above query, the join condition is canonicalized as follows:
```
Before canonicalization: Some((c1#174 = a#167))
After canonicalization:  Some((none#0 = none#167))
```
You can see that the `exprId` for the second operand of `EqualTo` is not normalized (it remains 167). That's because the attribute `a` from the right-side plan is not included in `allAttributes`.

This PR adds right-side attributes to `allAttributes` so that references to right-side attributes in the join condition are normalized during canonicalization.
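
As a rough illustration of the mechanism described above, the following self-contained Scala sketch models `exprId` normalization as a lookup into `allAttributes`. It is a toy model, not Catalyst code: `Attr`, `normalize`, and `canonicalCondition` are hypothetical names, and the IDs for `c2` and `b`, as well as the whole second set of IDs, are made up to stand for a second analysis of the same query.

```scala
// Toy model only: these types and helpers are hypothetical stand-ins,
// not Catalyst classes or methods.
final case class Attr(name: String, exprId: Long)

object CanonicalizationSketch {
  // Replace an exprId with the attribute's position in allAttributes.
  // An attribute that is not listed keeps its original exprId.
  def normalize(attr: Attr, allAttributes: Seq[Attr]): Attr = {
    val ordinal = allAttributes.indexWhere(_.exprId == attr.exprId)
    if (ordinal == -1) attr.copy(name = "none") else Attr("none", ordinal.toLong)
  }

  // Canonical form of an equality condition `l = r`.
  def canonicalCondition(l: Attr, r: Attr, allAttributes: Seq[Attr]): (Attr, Attr) =
    (normalize(l, allAttributes), normalize(r, allAttributes))

  // First analysis of the query (the ids for c2 and b are illustrative).
  val left1: Seq[Attr]  = Seq(Attr("c1", 174L), Attr("c2", 175L))
  val right1: Seq[Attr] = Seq(Attr("a", 167L), Attr("b", 168L))

  // Second analysis of the same query, with fresh (illustrative) exprIds.
  val left2: Seq[Attr]  = Seq(Attr("c1", 180L), Attr("c2", 181L))
  val right2: Seq[Attr] = Seq(Attr("a", 173L), Attr("b", 174L))

  // Before the change: only the left-side output is visible to normalization.
  val before1 = canonicalCondition(left1.head, right1.head, left1)
  val before2 = canonicalCondition(left2.head, right2.head, left2)

  // After the change: left output ++ right plan output.
  val after1 = canonicalCondition(left1.head, right1.head, left1 ++ right1)
  val after2 = canonicalCondition(left2.head, right2.head, left2 ++ right2)

  def main(args: Array[String]): Unit = {
    println(s"before: $before1 vs $before2")
    println(s"after:  $after1 vs $after2")
  }
}
```

Under these assumptions, the two "before" conditions differ in their second operand (`none#167` versus `none#173`), so two otherwise identical plans would not compare equal. The two "after" conditions are both `(none#0, none#2)`, which is what allows the cached plan to match.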

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#45763 from bersprockets/lj_canonical_issue.

Authored-by: Bruce Robbins
Signed-off-by: Wenchen Fan
bersprockets authored and cloud-fan committed Apr 23, 2024
1 parent 876c2cf commit eba6364
Showing 2 changed files with 21 additions and 0 deletions.
@@ -2057,6 +2057,8 @@ case class LateralJoin(
    joinType: JoinType,
    condition: Option[Expression]) extends UnaryNode {

  override lazy val allAttributes: AttributeSeq = left.output ++ right.plan.output

  require(Seq(Inner, LeftOuter, Cross).contains(joinType),
    s"Unsupported lateral join type $joinType")

@@ -1770,4 +1770,23 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
      withSQLConf(SQLConf.DEFAULT_CACHE_STORAGE_LEVEL.key -> "DISK") {}
    }
  }

  test("SPARK-47633: Cache hit for lateral join with join condition") {
    withTempView("t", "q1") {
      sql("create or replace temp view t(c1, c2) as values (0, 1), (1, 2)")
      val query = """select *
                    |from t
                    |join lateral (
                    |  select c1 as a, c2 as b
                    |  from t)
                    |on c1 = a;
                    |""".stripMargin
      sql(s"cache table q1 as $query")
      val df = sql(query)
      checkAnswer(df,
        Row(0, 1, 0, 1) :: Row(1, 2, 1, 2) :: Nil)
      assert(getNumInMemoryRelations(df) == 1)
    }

  }
}
