diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala
index 085c0b22524c9..8a544de7567e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala
@@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.adaptive.LogicalQueryStage
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 /**
@@ -81,13 +80,4 @@ case class EmptyRelationExec(@transient logical: LogicalPlan) extends LeafExecNo
   override def doCanonicalize(): SparkPlan = {
     this.copy(logical = LocalRelation(logical.output).canonicalized)
   }
-
-  override protected[sql] def cleanupResources(): Unit = {
-    logical.foreach {
-      case LogicalQueryStage(_, physical) =>
-        physical.cleanupResources()
-      case _ =>
-    }
-    super.cleanupResources()
-  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index fc54e7ecd46df..938a96a86b015 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -1608,6 +1608,43 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-49460: NPE in EmptyRelationExec.cleanupResources") {
+    withTable("t1left", "t1right", "t1empty") {
+      spark.sql("create table t1left (a int, b int);")
+      spark.sql("insert into t1left values (1, 1), (2,2), (3,3);")
+      spark.sql("create table t1right (a int, b int);")
+      spark.sql("create table t1empty (a int, b int);")
+      spark.sql("insert into t1right values (2,20), (4, 40);")
+
+      spark.sql("""
+                  |with leftT as (
+                  |  with erp as (
+                  |    select
+                  |      *
+                  |    from
+                  |      t1left
+                  |      join t1empty on t1left.a = t1empty.a
+                  |      join t1right on t1left.a = t1right.a
+                  |  )
+                  |  SELECT
+                  |    CASE
+                  |      WHEN COUNT(*) = 0 THEN 4
+                  |      ELSE NULL
+                  |    END AS a
+                  |  FROM
+                  |    erp
+                  |  HAVING
+                  |    COUNT(*) = 0
+                  |)
+                  |select
+                  |  /*+ MERGEJOIN(t1right) */
+                  |  *
+                  |from
+                  |  leftT
+                  |  join t1right on leftT.a = t1right.a""".stripMargin).collect()
+    }
+  }
+
   test("SPARK-35585: Support propagate empty relation through project/filter") {
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {