From 6ee53da5f356232e2026a67c8408de38c625038e Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Wed, 20 Nov 2024 21:03:14 +0800
Subject: [PATCH] [SPARK-50258][SQL] Fix output column order changed issue
 after AQE optimization

### What changes were proposed in this pull request?

The root cause of this issue is that the planner turns `Limit` + `Sort` into
`TakeOrderedAndProjectExec`, which adds an additional `Project` that does not
exist in the logical plan. We shouldn't use this additional `Project` to
optimize out other `Project`s; otherwise, when AQE turns the physical plan back
into a logical plan, we lose the `Project` and may mess up the output column
order. (A minimal repro sketch follows the diff below.)

This PR makes `RemoveRedundantProjects` not remove redundant projects when AQE
is enabled and the `projectList` of `TakeOrderedAndProjectExec` is the same as
its child's output.

### Why are the changes needed?

Fixes a potential data correctness issue and avoids a Spark Driver crash:

```
# more hs_err_pid193136.log
#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x00007f9d14841bc0, pid=193136, tid=223205
#
# JRE version: OpenJDK Runtime Environment Zulu17.36+18-SA (17.0.4.1+1) (build 17.0.4.1+1-LTS)
# Java VM: OpenJDK 64-Bit Server VM Zulu17.36+18-SA (17.0.4.1+1-LTS, mixed mode, sharing, tiered, compressed class ptrs, g1 gc, linux-amd64)
# Problematic frame:
# v  ~StubRoutines::jint_disjoint_arraycopy_avx3
#
# Core dump will be written. Default location: /apache/spark-release/3.5.0-20241105/spark/core.193136
...
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #48789 from wangyum/SPARK-50258.

Authored-by: Yuming Wang
Signed-off-by: Wenchen Fan
---
 .../execution/RemoveRedundantProjects.scala   |  8 ++++++-
 .../adaptive/AdaptiveQueryExecSuite.scala     | 23 ++++++++++++++++++-
 2 files changed, 29 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantProjects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantProjects.scala
index 8f4ce0f49a89a..69230fd7b3343 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantProjects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantProjects.scala
@@ -58,7 +58,13 @@ object RemoveRedundantProjects extends Rule[SparkPlan] {
           p.mapChildren(removeProject(_, false))
         }
       case op: TakeOrderedAndProjectExec =>
-        op.mapChildren(removeProject(_, false))
+        // The planner turns Limit + Sort into TakeOrderedAndProjectExec which adds an additional
+        // Project that does not exist in the logical plan. We shouldn't use this additional Project
+        // to optimize out other Projects, otherwise when AQE turns physical plan back to
+        // logical plan, we lose the Project and may mess up the output column order. So column
+        // ordering is required if AQE is enabled and projectList is the same as child output.
+        val requireColOrdering = conf.adaptiveExecutionEnabled && op.projectList == op.child.output
+        op.mapChildren(removeProject(_, requireColOrdering))
       case a: BaseAggregateExec =>
         // BaseAggregateExec require specific column ordering when mode is Final or PartialMerge.
         // See comments in BaseAggregateExec inputAttributes method.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 1df045764d8b9..ad28fd5176d99 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
-import org.apache.spark.sql.execution.{CollectLimitExec, ColumnarToRowExec, EmptyRelationExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnaryExecNode, UnionExec}
+import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
 import org.apache.spark.sql.execution.columnar.{InMemoryTableScanExec, InMemoryTableScanLike}
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
@@ -43,6 +43,7 @@ import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ENSURE_RE
 import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, ShuffledHashJoinExec, ShuffledJoin, SortMergeJoinExec}
 import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter
 import org.apache.spark.sql.execution.ui.{SparkListenerSQLAdaptiveExecutionUpdate, SparkListenerSQLAdaptiveSQLMetricUpdates, SparkListenerSQLExecutionStart}
+import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
@@ -3086,6 +3087,26 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-50258: Fix output column order changed issue after AQE optimization") {
+    withTable("t") {
+      sql("SELECT course, year, earnings FROM courseSales").write.saveAsTable("t")
+      val df = sql(
+        """
+          |SELECT year, course, earnings, SUM(earnings) OVER (ORDER BY year, course) AS balance
+          |FROM t ORDER BY year, course
+          |LIMIT 100
+          |""".stripMargin)
+      df.collect()
+
+      val plan = df.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
+      assert(plan.inputPlan.isInstanceOf[TakeOrderedAndProjectExec])
+      assert(plan.finalPhysicalPlan.isInstanceOf[WindowExec])
+      plan.inputPlan.output.zip(plan.finalPhysicalPlan.output).foreach { case (o1, o2) =>
+        assert(o1.semanticEquals(o2), "Different output column order after AQE optimization")
+      }
+    }
+  }
 }
 
 /**
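
Note (illustrative, not part of the patch): a minimal spark-shell sketch of the failing pattern, adapted from the new test above. The single-row table `t` and its data are hypothetical; the point is that the SELECT list reorders the table's stored column order, so the top-level `Project` exists only to reorder columns, and `Limit` + `Sort` plans the query as `TakeOrderedAndProjectExec`.

```scala
// Hypothetical repro sketch. The table stores columns as (course, year,
// earnings), while the query selects (year, course, earnings, balance), so the
// final Project only reorders columns.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.sql(
  "CREATE TABLE t AS SELECT 'Java' AS course, 2024 AS year, 10000.0 AS earnings")

val df = spark.sql("""
  SELECT year, course, earnings,
         SUM(earnings) OVER (ORDER BY year, course) AS balance
  FROM t
  ORDER BY year, course
  LIMIT 100
""")

// Before this fix, AQE re-planning could drop the column-reordering Project:
// rows could come back in (course, year, ...) order while the schema still
// claimed (year, course, ...), corrupting results.
df.show()
```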