diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 9dbadbd97ec79..1df63aa14b4b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, SQLExecution, UnsafeExternalRowSorter}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.{NextIterator, SerializableConfiguration, Utils}
 import org.apache.spark.util.ArrayImplicits._

@@ -401,9 +401,10 @@ object FileFormatWriter extends Logging {
       }

     try {
+      val queryFailureCapturedIterator = new QueryFailureCapturedIterator(iterator)
       Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
         // Execute the task to write rows out and commit the task.
-        dataWriter.writeWithIterator(iterator)
+        dataWriter.writeWithIterator(queryFailureCapturedIterator)
         dataWriter.commit()
       })(catchBlock = {
         // If there is an error, abort the task
@@ -413,6 +414,8 @@
         dataWriter.close()
       })
     } catch {
+      case e: QueryFailureDuringWrite =>
+        throw e.queryFailure
       case e: FetchFailedException =>
         throw e
       case f: FileAlreadyExistsException if SQLConf.get.fastFailFileFormatOutput =>
@@ -452,3 +455,25 @@
     }
   }
 }
+
+// An exception wrapper to indicate that the error was thrown when executing the query, not writing
+// the data
+private class QueryFailureDuringWrite(val queryFailure: Throwable) extends Throwable
+
+// An iterator wrapper to rethrow any error from the given iterator with `QueryFailureDuringWrite`.
+private class QueryFailureCapturedIterator(data: Iterator[InternalRow])
+  extends NextIterator[InternalRow] {
+
+  override protected def getNext(): InternalRow = try {
+    if (data.hasNext) {
+      data.next()
+    } else {
+      finished = true
+      null
+    }
+  } catch {
+    case t: Throwable => throw new QueryFailureDuringWrite(t)
+  }
+
+  override protected def close(): Unit = {}
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
index 7e845e69c772a..013177425da78 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.sql

-import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException}
+import org.apache.spark.{SparkConf, SparkRuntimeException}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.Project
@@ -60,6 +60,18 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
     }
   }

+  def assertLengthCheckFailure(query: String): Unit = {
+    assertLengthCheckFailure(() => sql(query))
+  }
+
+  def assertLengthCheckFailure(func: () => Unit): Unit = {
+    checkError(
+      exception = intercept[SparkRuntimeException](func()),
+      errorClass = "EXCEED_LIMIT_LENGTH",
+      parameters = Map("limit" -> "5")
+    )
+  }
+
   test("apply char padding/trimming and varchar trimming: top-level columns") {
     Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
       withTable("t") {
@@ -147,28 +159,12 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
       withTable("t") {
         sql(s"CREATE TABLE t(i STRING, c $typ) USING $format PARTITIONED BY (c)")
         Seq("ADD", "DROP").foreach { op =>
-          checkError(
-            exception = intercept[SparkRuntimeException] {
-              sql(s"ALTER TABLE t $op PARTITION(c='abcdef')")
-            },
-            errorClass = "EXCEED_LIMIT_LENGTH",
-            parameters = Map("limit" -> "5")
-          )
+          assertLengthCheckFailure(s"ALTER TABLE t $op PARTITION(c='abcdef')")
         }
-        checkError(
-          exception = intercept[SparkRuntimeException] {
-            sql(s"ALTER TABLE t PARTITION (c='abcdef') RENAME TO PARTITION (c='2')")
-          },
-          errorClass = "EXCEED_LIMIT_LENGTH",
-          parameters = Map("limit" -> "5")
-        )
-        checkError(
-          exception = intercept[SparkRuntimeException] {
-            sql(s"ALTER TABLE t PARTITION (c='1') RENAME TO PARTITION (c='abcdef')")
-          },
-          errorClass = "EXCEED_LIMIT_LENGTH",
-          parameters = Map("limit" -> "5")
-        )
+        assertLengthCheckFailure(
+          "ALTER TABLE t PARTITION (c='abcdef') RENAME TO PARTITION (c='2')")
+        assertLengthCheckFailure(
+          "ALTER TABLE t PARTITION (c='1') RENAME TO PARTITION (c='abcdef')")
       }
     }
   }
@@ -315,17 +311,7 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
       sql(s"CREATE TABLE t(c $typeName(5)) USING $format")
       sql("INSERT INTO t VALUES (null)")
       checkAnswer(spark.table("t"), Row(null))
-      val ex = intercept[Exception] {
-        sql("INSERT INTO t VALUES ('123456')")
-      }
-      checkError(
-        exception = ex match {
-          case c: SparkRuntimeException => c
-          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
-        },
-        errorClass = "EXCEED_LIMIT_LENGTH",
-        parameters = Map("limit" -> "5")
-      )
+      assertLengthCheckFailure("INSERT INTO t VALUES ('123456')")
     }
   }

@@ -337,14 +323,7 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
         sql(s"CREATE TABLE $tableName(i INT, c $typeName(5)) USING $format PARTITIONED BY (c)")
         sql(s"INSERT INTO $tableName VALUES (1, null)")
         checkAnswer(spark.table(tableName), Row(1, null))
-        checkError(
-          exception = intercept[SparkException] {
-            sql(s"INSERT INTO $tableName VALUES (1, '123456')")
-          },
-          errorClass = "TASK_WRITE_FAILED",
-          parameters = Map("path" -> s".*$tableName.*"),
-          matchPVals = true
-        )
+        assertLengthCheckFailure(s"INSERT INTO $tableName VALUES (1, '123456')")
       }
     }
   }
@@ -354,13 +333,7 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
      sql(s"CREATE TABLE t(c STRUCT<c: $typeName(5)>) USING $format")
      sql("INSERT INTO t SELECT struct(null)")
      checkAnswer(spark.table("t"), Row(Row(null)))
-      checkError(
-        exception = intercept[SparkRuntimeException] {
-          sql("INSERT INTO t SELECT struct('123456')")
-        },
-        errorClass = "EXCEED_LIMIT_LENGTH",
-        parameters = Map("limit" -> "5")
-      )
+      assertLengthCheckFailure("INSERT INTO t SELECT struct('123456')")
     }
   }

@@ -369,32 +342,14 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
      sql(s"CREATE TABLE t(c ARRAY<$typeName(5)>) USING $format")
      sql("INSERT INTO t VALUES (array(null))")
      checkAnswer(spark.table("t"), Row(Seq(null)))
-      val e = intercept[Exception] {
-        sql("INSERT INTO t VALUES (array('a', '123456'))")
-      }
-      checkError(
-        exception = e match {
-          case c: SparkRuntimeException => c
-          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
-        },
-        errorClass = "EXCEED_LIMIT_LENGTH",
-        parameters = Map("limit" -> "5")
-      )
+      assertLengthCheckFailure("INSERT INTO t VALUES (array('a', '123456'))")
    }
  }

  test("length check for input string values: nested in map key") {
    testTableWrite { typeName =>
      sql(s"CREATE TABLE t(c MAP<$typeName(5), STRING>) USING $format")
-      val e = intercept[Exception](sql("INSERT INTO t VALUES (map('123456', 'a'))"))
-      checkError(
-        exception = e match {
-          case c: SparkRuntimeException => c
-          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
-        },
-        errorClass = "EXCEED_LIMIT_LENGTH",
-        parameters = Map("limit" -> "5")
-      )
+      assertLengthCheckFailure("INSERT INTO t VALUES (map('123456', 'a'))")
    }
  }

@@ -403,39 +358,15 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
      sql(s"CREATE TABLE t(c MAP<STRING, $typeName(5)>) USING $format")
      sql("INSERT INTO t VALUES (map('a', null))")
      checkAnswer(spark.table("t"), Row(Map("a" -> null)))
-      val e = intercept[Exception](sql("INSERT INTO t VALUES (map('a', '123456'))"))
-      checkError(
-        exception = e match {
-          case c: SparkRuntimeException => c
-          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
-        },
-        errorClass = "EXCEED_LIMIT_LENGTH",
-        parameters = Map("limit" -> "5")
-      )
+      assertLengthCheckFailure("INSERT INTO t VALUES (map('a', '123456'))")
    }
  }

  test("length check for input string values: nested in both map key and value") {
    testTableWrite { typeName =>
      sql(s"CREATE TABLE t(c MAP<$typeName(5), $typeName(5)>) USING $format")
-      val e1 = intercept[Exception](sql("INSERT INTO t VALUES (map('123456', 'a'))"))
-      checkError(
-        exception = e1 match {
-          case c: SparkRuntimeException => c
-          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
-        },
-        errorClass = "EXCEED_LIMIT_LENGTH",
-        parameters = Map("limit" -> "5")
-      )
-      val e2 = intercept[Exception](sql("INSERT INTO t VALUES (map('a', '123456'))"))
-      checkError(
-        exception = e2 match {
-          case c: SparkRuntimeException => c
-          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
-        },
-        errorClass = "EXCEED_LIMIT_LENGTH",
-        parameters = Map("limit" -> "5")
-      )
+      assertLengthCheckFailure("INSERT INTO t VALUES (map('123456', 'a'))")
+      assertLengthCheckFailure("INSERT INTO t VALUES (map('a', '123456'))")
    }
  }

@@ -444,15 +375,7 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
      sql(s"CREATE TABLE t(c STRUCT<c: ARRAY<$typeName(5)>>) USING $format")
      sql("INSERT INTO t SELECT struct(array(null))")
      checkAnswer(spark.table("t"), Row(Row(Seq(null))))
-      val e = intercept[Exception](sql("INSERT INTO t SELECT struct(array('123456'))"))
-      checkError(
-        exception = e match {
-          case c: SparkRuntimeException => c
-          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
-        },
-        errorClass = "EXCEED_LIMIT_LENGTH",
-        parameters = Map("limit" -> "5")
-      )
+      assertLengthCheckFailure("INSERT INTO t SELECT struct(array('123456'))")
    }
  }

@@ -461,15 +384,7 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
      sql(s"CREATE TABLE t(c ARRAY<STRUCT<c: $typeName(5)>>) USING $format")
      sql("INSERT INTO t VALUES (array(struct(null)))")
      checkAnswer(spark.table("t"), Row(Seq(Row(null))))
-      val e = intercept[Exception](sql("INSERT INTO t VALUES (array(struct('123456')))"))
-      checkError(
-        exception = e match {
-          case c: SparkRuntimeException => c
-          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
-        },
-        errorClass = "EXCEED_LIMIT_LENGTH",
-        parameters = Map("limit" -> "5")
-      )
+      assertLengthCheckFailure("INSERT INTO t VALUES (array(struct('123456')))")
    }
  }

@@ -478,15 +393,7 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
      sql(s"CREATE TABLE t(c ARRAY<ARRAY<$typeName(5)>>) USING $format")
      sql("INSERT INTO t VALUES (array(array(null)))")
      checkAnswer(spark.table("t"), Row(Seq(Seq(null))))
-      val e = intercept[Exception](sql("INSERT INTO t VALUES (array(array('123456')))"))
-      checkError(
-        exception = e match {
-          case c: SparkRuntimeException => c
-          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
-        },
-        errorClass = "EXCEED_LIMIT_LENGTH",
-        parameters = Map("limit" -> "5")
-      )
+      assertLengthCheckFailure("INSERT INTO t VALUES (array(array('123456')))")
    }
  }

@@ -506,24 +413,8 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
      sql(s"CREATE TABLE t(c1 CHAR(5), c2 VARCHAR(5)) USING $format")
      sql("INSERT INTO t VALUES (1234, 1234)")
      checkAnswer(spark.table("t"), Row("1234 ", "1234"))
-      val e1 = intercept[Exception](sql("INSERT INTO t VALUES (123456, 1)"))
-      checkError(
-        exception = e1 match {
-          case c: SparkRuntimeException => c
-          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
-        },
-        errorClass = "EXCEED_LIMIT_LENGTH",
-        parameters = Map("limit" -> "5")
-      )
-      val e2 = intercept[Exception](sql("INSERT INTO t VALUES (1, 123456)"))
-      checkError(
-        exception = e2 match {
-          case c: SparkRuntimeException => c
-          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
-        },
-        errorClass = "EXCEED_LIMIT_LENGTH",
-        parameters = Map("limit" -> "5")
-      )
+      assertLengthCheckFailure("INSERT INTO t VALUES (123456, 1)")
+      assertLengthCheckFailure("INSERT INTO t VALUES (1, 123456)")
    }
  }

@@ -1046,23 +937,8 @@ class FileSourceCharVarcharTestSuite extends CharVarcharTestSuite with SharedSpa
        sql(s"ALTER TABLE $tableName DROP PARTITION(c=$v)")
        checkAnswer(spark.table(tableName), Nil)
      }
-
-      checkError(
-        exception = intercept[SparkException] {
-          sql(s"INSERT OVERWRITE $tableName VALUES ('1', 100000)")
-        },
-        errorClass = "TASK_WRITE_FAILED",
-        parameters = Map("path" -> s".*$tableName"),
-        matchPVals = true
-      )
-
-      checkError(
-        exception = intercept[SparkRuntimeException] {
-          sql("ALTER TABLE t DROP PARTITION(c=100000)")
-        },
-        errorClass = "EXCEED_LIMIT_LENGTH",
-        parameters = Map("limit" -> "5")
-      )
+      assertLengthCheckFailure(s"INSERT OVERWRITE $tableName VALUES ('1', 100000)")
+      assertLengthCheckFailure("ALTER TABLE t DROP PARTITION(c=100000)")
    }
  }
}
@@ -1087,24 +963,8 @@ class DSV2CharVarcharTestSuite extends CharVarcharTestSuite
        sql(s"ALTER TABLE t DROP PARTITION(c=$v)")
        checkAnswer(spark.table("t"), Nil)
      }
-
-      val e1 = intercept[Exception](sql(s"INSERT OVERWRITE t VALUES ('1', 100000)"))
-      checkError(
-        exception = e1 match {
-          case c: SparkRuntimeException => c
-          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
-        },
-        errorClass = "EXCEED_LIMIT_LENGTH",
-        parameters = Map("limit" -> "5")
-      )
-
-      checkError(
-        exception = intercept[SparkRuntimeException] {
-          sql("ALTER TABLE t DROP PARTITION(c=100000)")
-        },
-        errorClass = "EXCEED_LIMIT_LENGTH",
-        parameters = Map("limit" -> "5")
-      )
+      assertLengthCheckFailure(s"INSERT OVERWRITE t VALUES ('1', 100000)")
+      assertLengthCheckFailure("ALTER TABLE t DROP PARTITION(c=100000)")
    }
  }
}
@@ -1113,16 +973,8 @@ class DSV2CharVarcharTestSuite extends CharVarcharTestSuite
    Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
      withTable("t") {
        sql(s"CREATE TABLE t(s STRUCT<n_i: INT, n_c: $typ>) USING $format")
-        val inputDF = sql("SELECT named_struct('n_i', 1, 'n_c', '123456') AS s")
-
-        checkError(
-          exception = intercept[SparkRuntimeException] {
-            inputDF.writeTo("t").append()
-          },
-          errorClass = "EXCEED_LIMIT_LENGTH",
-          parameters = Map("limit" -> "5")
-        )
+        val inputDF = sql("SELECT named_struct('n_i', 1, 'n_c', '123456') AS s")
+        assertLengthCheckFailure(() => inputDF.writeTo("t").append())
      }
    }
  }
@@ -1131,18 +983,8 @@ class DSV2CharVarcharTestSuite extends CharVarcharTestSuite
    Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
      withTable("t") {
        sql(s"CREATE TABLE t(a ARRAY<STRUCT<n_i: INT, n_c: $typ>>) USING $format")
-        val inputDF = sql("SELECT array(named_struct('n_i', 1, 'n_c', '123456')) AS a")
-
-        val e = intercept[Exception](inputDF.writeTo("t").append())
-        checkError(
-          exception = e match {
-            case c: SparkRuntimeException => c
-            case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
-          },
-          errorClass = "EXCEED_LIMIT_LENGTH",
-          parameters = Map("limit" -> "5")
-        )
+        val inputDF = sql("SELECT array(named_struct('n_i', 1, 'n_c', '123456')) AS a")
+        assertLengthCheckFailure(() => inputDF.writeTo("t").append())
      }
    }
  }
@@ -1151,18 +993,8 @@ class DSV2CharVarcharTestSuite extends CharVarcharTestSuite
    Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
      withTable("t") {
        sql(s"CREATE TABLE t(m MAP<STRUCT<n_i: INT, n_c: $typ>, INT>) USING $format")
-        val inputDF = sql("SELECT map(named_struct('n_i', 1, 'n_c', '123456'), 1) AS m")
-
-        val e = intercept[Exception](inputDF.writeTo("t").append())
-        checkError(
-          exception = e match {
-            case c: SparkRuntimeException => c
-            case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
-          },
-          errorClass = "EXCEED_LIMIT_LENGTH",
-          parameters = Map("limit" -> "5")
-        )
+        val inputDF = sql("SELECT map(named_struct('n_i', 1, 'n_c', '123456'), 1) AS m")
+        assertLengthCheckFailure(() => inputDF.writeTo("t").append())
      }
    }
  }
@@ -1171,18 +1003,8 @@ class DSV2CharVarcharTestSuite extends CharVarcharTestSuite
    Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
      withTable("t") {
        sql(s"CREATE TABLE t(m MAP<INT, STRUCT<n_i: INT, n_c: $typ>>) USING $format")
-        val inputDF = sql("SELECT map(1, named_struct('n_i', 1, 'n_c', '123456')) AS m")
-
-        val e = intercept[Exception](inputDF.writeTo("t").append())
-        checkError(
-          exception = e match {
-            case c: SparkRuntimeException => c
-            case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
-          },
-          errorClass = "EXCEED_LIMIT_LENGTH",
-          parameters = Map("limit" -> "5")
-        )
+        val inputDF = sql("SELECT map(1, named_struct('n_i', 1, 'n_c', '123456')) AS m")
+        assertLengthCheckFailure(() => inputDF.writeTo("t").append())
      }
    }
  }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionAnsiErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionAnsiErrorsSuite.scala
index 3927d6373b4af..a0b4d345628e9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionAnsiErrorsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionAnsiErrorsSuite.scala
@@ -251,12 +251,10 @@ class QueryExecutionAnsiErrorsSuite extends QueryTest
      val tableName = "overflowTable"
      withTable(tableName) {
        sql(s"CREATE TABLE $tableName(i $targetType) USING parquet")
-        val ex = intercept[SparkException] {
-          sql(s"insert into $tableName values 12345678901234567890D")
-        }
-        assert(ex.getErrorClass == "TASK_WRITE_FAILED")
        checkError(
-          exception = ex.getCause.asInstanceOf[SparkArithmeticException],
+          exception = intercept[SparkArithmeticException] {
+            sql(s"insert into $tableName values 12345678901234567890D")
+          },
          errorClass = "CAST_OVERFLOW_IN_TABLE_INSERT",
          parameters = Map(
            "sourceType" -> "\"DOUBLE\"",
@@ -289,12 +287,10 @@ class QueryExecutionAnsiErrorsSuite extends QueryTest
      sql("CREATE TABLE t2 (x tinyint) USING parquet")
      val insertCmd = "insert into t2 select 0 - (case when x = 1.2345678901234567E19D " +
        "then 1.2345678901234567E19D else x end) from t1 where x = 1.2345678901234567E19D;"
-      val ex = intercept[SparkException] {
-        sql(insertCmd).collect()
-      }
-      assert(ex.getErrorClass == "TASK_WRITE_FAILED")
      checkError(
-        exception = ex.getCause.asInstanceOf[SparkArithmeticException],
+        exception = intercept[SparkArithmeticException] {
+          sql(insertCmd).collect()
+        },
        errorClass = "CAST_OVERFLOW",
        parameters = Map("value" -> "-1.2345678901234567E19D",
          "sourceType" -> "\"DOUBLE\"",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index c2b8547778254..93698fdd7bc0f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -771,12 +771,10 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
        sql("create table t(b int) using parquet")
        val outOfRangeValue1 = (Int.MaxValue + 1L).toString
-        val e1 = intercept[SparkException] {
-          sql(s"insert into t values($outOfRangeValue1)")
-        }
-        assert(e1.getErrorClass == "TASK_WRITE_FAILED")
        checkError(
-          exception = e1.getCause.asInstanceOf[SparkArithmeticException],
+          exception = intercept[SparkArithmeticException] {
+            sql(s"insert into t values($outOfRangeValue1)")
+          },
          errorClass = "CAST_OVERFLOW_IN_TABLE_INSERT",
          parameters = Map(
            "sourceType" -> "\"BIGINT\"",
@@ -784,12 +782,10 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
            "columnName" -> "`b`"))

        val outOfRangeValue2 = (Int.MinValue - 1L).toString
-        val e2 = intercept[SparkException] {
-          sql(s"insert into t values($outOfRangeValue2)")
-        }
-        assert(e2.getErrorClass == "TASK_WRITE_FAILED")
        checkError(
-          exception = e2.getCause.asInstanceOf[SparkArithmeticException],
+          exception = intercept[SparkArithmeticException] {
+            sql(s"insert into t values($outOfRangeValue2)")
+          },
          errorClass = "CAST_OVERFLOW_IN_TABLE_INSERT",
          parameters = Map(
            "sourceType" -> "\"BIGINT\"",
@@ -806,12 +802,10 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
        sql("create table t(b long) using parquet")
        val outOfRangeValue1 = Math.nextUp(Long.MaxValue)
-        val e1 = intercept[SparkException] {
-          sql(s"insert into t values(${outOfRangeValue1}D)")
-        }
-        assert(e1.getErrorClass == "TASK_WRITE_FAILED")
        checkError(
-          exception = e1.getCause.asInstanceOf[SparkArithmeticException],
+          exception = intercept[SparkArithmeticException] {
+            sql(s"insert into t values(${outOfRangeValue1}D)")
+          },
          errorClass = "CAST_OVERFLOW_IN_TABLE_INSERT",
          parameters = Map(
            "sourceType" -> "\"DOUBLE\"",
@@ -819,12 +813,10 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
            "columnName" -> "`b`"))

        val outOfRangeValue2 = Math.nextDown(Long.MinValue)
-        val e2 = intercept[SparkException] {
-          sql(s"insert into t values(${outOfRangeValue2}D)")
-        }
-        assert(e2.getErrorClass == "TASK_WRITE_FAILED")
        checkError(
-          exception = e2.getCause.asInstanceOf[SparkArithmeticException],
+          exception = intercept[SparkArithmeticException] {
+            sql(s"insert into t values(${outOfRangeValue2}D)")
+          },
          errorClass = "CAST_OVERFLOW_IN_TABLE_INSERT",
          parameters = Map(
            "sourceType" -> "\"DOUBLE\"",
@@ -840,12 +832,10 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
      withTable("t") {
        sql("create table t(b decimal(3,2)) using parquet")
        val outOfRangeValue = "123.45"
-        val ex = intercept[SparkException] {
-          sql(s"insert into t values($outOfRangeValue)")
-        }
-        assert(ex.getErrorClass == "TASK_WRITE_FAILED")
        checkError(
-          exception = ex.getCause.asInstanceOf[SparkArithmeticException],
+          exception = intercept[SparkArithmeticException] {
+            sql(s"insert into t values($outOfRangeValue)")
+          },
          errorClass = "CAST_OVERFLOW_IN_TABLE_INSERT",
          parameters = Map(
            "sourceType" -> "\"DECIMAL(5,2)\"",
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala
index 7312bbb6238dc..c12d727e59740 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala
@@ -17,7 +17,6 @@

 package org.apache.spark.sql

-import org.apache.spark.{SparkException, SparkRuntimeException}
 import org.apache.spark.sql.execution.command.CharVarcharDDLTestBase
 import org.apache.spark.sql.hive.test.TestHiveSingleton

@@ -86,23 +85,8 @@ class HiveCharVarcharTestSuite extends CharVarcharTestSuite with TestHiveSinglet
        sql(s"ALTER TABLE $tableName DROP PARTITION(c=$v)")
        checkAnswer(spark.table(tableName), Nil)
      }
-
-      checkError(
-        exception = intercept[SparkException] {
-          sql(s"INSERT OVERWRITE $tableName VALUES ('1', 100000)")
-        },
-        errorClass = "TASK_WRITE_FAILED",
-        parameters = Map("path" -> s".*$tableName.*"),
-        matchPVals = true
-      )
-
-      checkError(
-        exception = intercept[SparkRuntimeException] {
-          sql("ALTER TABLE t DROP PARTITION(c=100000)")
-        },
-        errorClass = "EXCEED_LIMIT_LENGTH",
-        parameters = Map("limit" -> "5")
-      )
+      assertLengthCheckFailure(s"INSERT OVERWRITE $tableName VALUES ('1', 100000)")
+      assertLengthCheckFailure("ALTER TABLE t DROP PARTITION(c=100000)")
    }
  }
}
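
Below is a minimal, self-contained Scala sketch (not part of the patch, and not Spark's actual implementation) of the capture-and-unwrap pattern used in the FileFormatWriter change above: errors raised while pulling rows from the query iterator are wrapped in QueryFailureDuringWrite, so the task-level catch block can rethrow the original query error instead of reporting it as a task write failure. All names other than QueryFailureDuringWrite are illustrative.

// Illustrative sketch only: wrap errors thrown while reading the input iterator so the
// caller can distinguish query failures from genuine write failures.
class QueryFailureDuringWrite(val queryFailure: Throwable) extends Throwable

class QueryFailureCapturingIterator[T](data: Iterator[T]) extends Iterator[T] {
  override def hasNext: Boolean =
    try data.hasNext catch { case t: Throwable => throw new QueryFailureDuringWrite(t) }
  override def next(): T =
    try data.next() catch { case t: Throwable => throw new QueryFailureDuringWrite(t) }
}

object QueryFailureCaptureExample {
  // A toy "writer": it only fails on its own if it sees a negative value.
  private def writeAll(rows: Iterator[Int]): Unit =
    rows.foreach { r => if (r < 0) throw new IllegalStateException(s"cannot write $r") }

  // Mirrors the task-level catch in the patch: unwrap query failures, treat the rest as write failures.
  def runTask(rows: Iterator[Int]): Unit =
    try writeAll(new QueryFailureCapturingIterator(rows)) catch {
      case e: QueryFailureDuringWrite => throw e.queryFailure
      case e: Throwable => throw new RuntimeException("task write failed", e)
    }

  def main(args: Array[String]): Unit = {
    // The "query" fails while producing its third row.
    val failingQuery: Iterator[Int] =
      Iterator(1, 2) ++ Iterator.fill(1)(throw new ArithmeticException("overflow in query"))
    try runTask(failingQuery) catch {
      case e: Throwable => println(s"caught ${e.getClass.getSimpleName}: ${e.getMessage}")
    }
    // Prints "caught ArithmeticException: overflow in query" rather than a generic write failure,
    // which is the behavior the test suites above now assert on directly.
  }
}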