[SPARK-47689][SQL] Do not wrap query execution error during data writing
### What changes were proposed in this pull request?

It's quite confusing to report a `TASK_WRITE_FAILED` error when the failure was actually caused by executing the input query. This PR updates the error-wrapping code so that errors raised by input query execution are not wrapped in `TASK_WRITE_FAILED`.
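
The idea can be illustrated outside of Spark with a minimal, self-contained sketch. All names here (`WriteErrorSketch`, `UpstreamFailure`, `TaggingIterator`, `WriteFailed`, `runWrite`) are hypothetical and are not part of Spark's API; the sketch only assumes that the writer pulls rows from an iterator and that any other failure should be reported as a write failure.

```scala
// Illustrative sketch only: every name below is hypothetical, not Spark's API.
object WriteErrorSketch {

  // Marker wrapper: tags a failure that was raised while pulling rows
  // from the input, as opposed to while writing them.
  final class UpstreamFailure(val original: Throwable) extends Throwable

  // Stand-in for a generic "write failed" error.
  final class WriteFailed(cause: Throwable)
    extends RuntimeException("write failed", cause)

  // Wraps an iterator so that any error from the underlying iterator
  // is rethrown as UpstreamFailure.
  final class TaggingIterator[A](underlying: Iterator[A]) extends Iterator[A] {
    override def hasNext: Boolean =
      try underlying.hasNext
      catch { case t: Throwable => throw new UpstreamFailure(t) }

    override def next(): A =
      try underlying.next()
      catch { case t: Throwable => throw new UpstreamFailure(t) }
  }

  // Feeds rows to `sink`. Input failures are rethrown unwrapped;
  // every other failure is wrapped in WriteFailed.
  def runWrite[A](rows: Iterator[A])(sink: A => Unit): Unit =
    try {
      new TaggingIterator(rows).foreach(sink)
    } catch {
      case u: UpstreamFailure => throw u.original
      case t: Throwable => throw new WriteFailed(t)
    }
}
```

With this shape, an exception thrown by the input iterator reaches the caller unchanged, while an exception thrown by `sink` is still reported as `WriteFailed`. The sink's own failures are never tagged because only calls into the underlying iterator are guarded.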

### Why are the changes needed?

Better error reporting: a failure in the input query is surfaced as itself rather than as a write failure.

### Does this PR introduce _any_ user-facing change?

Yes. Users will no longer see a `TASK_WRITE_FAILED` error when the failure came from the input query.

### How was this patch tested?

Updated tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#45797 from cloud-fan/write-error.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
cloud-fan committed Apr 2, 2024
1 parent b1b1fde commit ba98b7a
Showing 5 changed files with 89 additions and 272 deletions.
@@ -41,7 +41,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, SQLExecution, UnsafeExternalRowSorter}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.{NextIterator, SerializableConfiguration, Utils}
 import org.apache.spark.util.ArrayImplicits._


@@ -401,9 +401,10 @@ object FileFormatWriter extends Logging {
     }

     try {
+      val queryFailureCapturedIterator = new QueryFailureCapturedIterator(iterator)
       Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
         // Execute the task to write rows out and commit the task.
-        dataWriter.writeWithIterator(iterator)
+        dataWriter.writeWithIterator(queryFailureCapturedIterator)
         dataWriter.commit()
       })(catchBlock = {
         // If there is an error, abort the task
@@ -413,6 +414,8 @@ object FileFormatWriter extends Logging {
         dataWriter.close()
       })
     } catch {
+      case e: QueryFailureDuringWrite =>
+        throw e.queryFailure
       case e: FetchFailedException =>
         throw e
       case f: FileAlreadyExistsException if SQLConf.get.fastFailFileFormatOutput =>
@@ -452,3 +455,25 @@ object FileFormatWriter extends Logging {
     }
   }
 }
+
+// An exception wrapper to indicate that the error was thrown when executing the query, not writing
+// the data
+private class QueryFailureDuringWrite(val queryFailure: Throwable) extends Throwable
+
+// An iterator wrapper to rethrow any error from the given iterator with `QueryFailureDuringWrite`.
+private class QueryFailureCapturedIterator(data: Iterator[InternalRow])
+  extends NextIterator[InternalRow] {
+
+  override protected def getNext(): InternalRow = try {
+    if (data.hasNext) {
+      data.next()
+    } else {
+      finished = true
+      null
+    }
+  } catch {
+    case t: Throwable => throw new QueryFailureDuringWrite(t)
+  }
+
+  override protected def close(): Unit = {}
+}