diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/FileSourceOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/FileSourceOptions.scala
index 6b9826d652e28..22f84a1cad63d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/FileSourceOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/FileSourceOptions.scala
@@ -17,8 +17,8 @@
package org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.FileSourceOptions.{IGNORE_CORRUPT_FILES, IGNORE_MISSING_FILES}
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter}
+import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
/**
* Common options for the file-based data source.
@@ -29,6 +29,17 @@ class FileSourceOptions(
def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+  protected def commonTimestampFormat: String = {
+    if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
+      s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
+    } else if (SQLConf.get.supportSecondOffsetFormat) {
+      // [XXXXX] keeps the seconds part of the zone offset, e.g. -07:52:58
+      s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXXXX]"
+    } else {
+      s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
+    }
+  }
+
val ignoreCorruptFiles: Boolean = parameters.get(IGNORE_CORRUPT_FILES).map(_.toBoolean)
.getOrElse(SQLConf.get.ignoreCorruptFiles)
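The `[XXXXX]` suffix is what actually preserves the seconds component of historical zone offsets, while `[XXX]` silently truncates it. A minimal sketch of the difference in plain `java.time` (outside Spark; assumes the pre-1883 Los Angeles local-mean-time offset of -07:52:58):

```scala
import java.time.{LocalDateTime, ZoneId, ZonedDateTime}
import java.time.format.DateTimeFormatter

object OffsetPatternDemo extends App {
  // America/Los_Angeles used local mean time (UTC-07:52:58) until Nov 1883.
  val zdt = ZonedDateTime.of(
    LocalDateTime.of(1800, 1, 1, 0, 0), ZoneId.of("America/Los_Angeles"))

  // XXX emits only hours and minutes of the offset; the seconds are dropped.
  println(zdt.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]")))
  // 1800-01-01T00:00:00.000-07:52

  // XXXXX keeps the seconds part, so the instant round-trips exactly.
  println(zdt.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss[.SSS][XXXXX]")))
  // 1800-01-01T00:00:00.000-07:52:58
}
```

Spark's non-legacy timestamp formatters are built on the same `DateTimeFormatter` pattern language, so the conf only has to swap the pattern suffix.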
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index f4ade722791c9..13b0b8077128e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -186,12 +186,8 @@ class CSVOptions(
} else {
parameters.get(TIMESTAMP_FORMAT)
}
- val timestampFormatInWrite: String = parameters.getOrElse(TIMESTAMP_FORMAT,
- if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
- s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
- } else {
- s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
- })
+ val timestampFormatInWrite: String =
+ parameters.getOrElse(TIMESTAMP_FORMAT, commonTimestampFormat)
val timestampNTZFormatInRead: Option[String] = parameters.get(TIMESTAMP_NTZ_FORMAT)
val timestampNTZFormatInWrite: String = parameters.getOrElse(TIMESTAMP_NTZ_FORMAT,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 945b6e7de8b7a..247106a8b8cd5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -122,12 +122,8 @@ class JSONOptions(
} else {
parameters.get(TIMESTAMP_FORMAT)
}
- val timestampFormatInWrite: String = parameters.getOrElse(TIMESTAMP_FORMAT,
- if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
- s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
- } else {
- s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
- })
+ val timestampFormatInWrite: String =
+ parameters.getOrElse(TIMESTAMP_FORMAT, commonTimestampFormat)
val timestampNTZFormatInRead: Option[String] = parameters.get(TIMESTAMP_NTZ_FORMAT)
val timestampNTZFormatInWrite: String =
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala
index 336c54e164e82..e2c2d9dbc6d63 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala
@@ -145,12 +145,8 @@ class XmlOptions(
} else {
parameters.get(TIMESTAMP_FORMAT)
}
- val timestampFormatInWrite: String = parameters.getOrElse(TIMESTAMP_FORMAT,
- if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
- s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
- } else {
- s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
- })
+ val timestampFormatInWrite: String =
+ parameters.getOrElse(TIMESTAMP_FORMAT, commonTimestampFormat)
val timestampNTZFormatInRead: Option[String] = parameters.get(TIMESTAMP_NTZ_FORMAT)
val timestampNTZFormatInWrite: String =
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 15275be00f31e..f0ba5f7d77801 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4975,6 +4975,16 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val SUPPORT_SECOND_OFFSET_FORMAT =
+ buildConf("spark.sql.files.supportSecondOffsetFormat")
+ .internal()
+ .doc("When set to true, datetime formatter used for csv, json and xml " +
+ "will support zone offsets that have seconds in it. e.g. LA timezone offset prior to 1883" +
+ "was -07:52:58. When this flag is not set we lose seconds information." )
+ .version("4.0.0")
+ .booleanConf
+ .createWithDefault(true)
+
// Deprecate "spark.connect.copyFromLocalToFs.allowDestLocal" in favor of this config. This is
// currently optional because we don't want to break existing users who are using the old config.
// If this config is set, then we override the deprecated config.
@@ -5934,6 +5944,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
def legacyPathOptionBehavior: Boolean = getConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR)
+ def supportSecondOffsetFormat: Boolean = getConf(SQLConf.SUPPORT_SECOND_OFFSET_FORMAT)
+
def disabledJdbcConnectionProviders: String = getConf(
StaticSQLConf.DISABLED_JDBC_CONN_PROVIDER_LIST)
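Because the new conf defaults to true, write-side output changes for any job that does not pin an explicit `timestampFormat`. A hedged sketch of opting back out for one session (the conf is internal, so the key is not part of the documented surface):

```scala
// Sketch: restore the pre-change default write pattern for this session.
spark.conf.set("spark.sql.files.supportSecondOffsetFormat", "false")

// With the flag off, commonTimestampFormat falls back to
// "yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]", so a pre-1883 LA offset such as
// -07:52:58 is written truncated to -07:52.
spark.range(1)
  .selectExpr("timestamp'1800-01-01 00:00:00' AS ts")
  .write.json("/tmp/ts-legacy-offset")
```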
diff --git a/sql/core/src/test/resources/test-data/timestamps.csv b/sql/core/src/test/resources/test-data/timestamps.csv
new file mode 100644
index 0000000000000..cea37b68a5f97
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/timestamps.csv
@@ -0,0 +1,6 @@
+timestamp
+01/01/1800 18:00Z
+01/01/1885 18:30Z
+27/10/2014 18:30
+26/08/2015 18:00
+28/01/2016 20:00
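The fixture mixes rows with and without a zone designator, which is why the suites read it with an optional `[XXX]` section in the parse pattern. A sketch of the round trip the CSV test exercises (output path illustrative):

```scala
// "[XXX]" makes the offset optional, so "01/01/1800 18:00Z" and
// "27/10/2014 18:30" both parse with a single pattern.
val timestamps = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .option("timestampFormat", "dd/MM/yyyy HH:mm[XXX]")
  .csv("sql/core/src/test/resources/test-data/timestamps.csv")

// With no explicit timestampFormat on write, the new ISO 8601 default
// applies and the pre-1883 rows keep their seconds-bearing offsets.
timestamps.write.option("header", "true").csv("/tmp/iso8601-timestamps")
```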
diff --git a/sql/core/src/test/resources/test-data/xml-resources/timestamps.xml b/sql/core/src/test/resources/test-data/xml-resources/timestamps.xml
new file mode 100644
index 0000000000000..5a58bc54292e1
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/xml-resources/timestamps.xml
@@ -0,0 +1,8 @@
+<book>
+    <author>John Smith</author>
+    <time>01/01/1800 18:00Z</time>
+    <time2>01/01/1885 18:30Z</time2>
+    <time3>27/10/2014 18:30</time3>
+    <time4>26/08/2015 18:00</time4>
+    <time5>28/01/2016 20:00</time5>
+</book>
\ No newline at end of file
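The XML fixture is consumed the same way through the built-in XML source; a short sketch under the same assumptions, mirroring the read in XmlSuite:

```scala
// Each <book> element becomes one row; the <time*> fields are parsed
// with the same optional-offset pattern as the CSV fixture.
val books = spark.read
  .option("rowTag", "book")
  .option("timestampFormat", "dd/MM/yyyy HH:mm[XXX]")
  .xml("sql/core/src/test/resources/test-data/xml-resources/timestamps.xml")
```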
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
index 0822b0ac8073d..bb3c00d238ca6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -264,6 +264,14 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
checkAnswer(df.select(to_csv($"a")), Row("1") :: Nil)
}
+ test("to_csv ISO default - old dates") {
+ val df = Seq(Tuple1(Tuple1(java.sql.Timestamp.valueOf("1800-01-01 00:00:00.0")))).toDF("a")
+
+ checkAnswer(df.select(to_csv($"a")), Row("1800-01-01T00:00:00.000-07:52:58") :: Nil)
+ }
+
test("to_csv with option (timestampFormat)") {
val df = Seq(Tuple1(Tuple1(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))).toDF("a")
val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm").asJava
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index ea00e02e232c6..9dca1d091d33b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -406,6 +406,17 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
Row("""{"_1":"26/08/2015 18:00"}""") :: Nil)
}
+ test("to_json ISO default - old dates") {
+ withSQLConf("spark.sql.session.timeZone" -> "America/Los_Angeles") {
+
+ val df = Seq(Tuple1(Tuple1(java.sql.Timestamp.valueOf("1800-01-01 00:00:00.0")))).toDF("a")
+
+ checkAnswer(
+ df.select(to_json($"a")),
+ Row("""{"_1":"1800-01-01T00:00:00.000-07:52:58"}""") :: Nil)
+ }
+ }
+
test("to_json with option (dateFormat)") {
val df = Seq(Tuple1(Tuple1(java.sql.Date.valueOf("2015-08-26")))).toDF("a")
val options = Map("dateFormat" -> "dd/MM/yyyy")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/XmlFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/XmlFunctionsSuite.scala
index 1364fab3138e3..4169d53e4fc8e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/XmlFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/XmlFunctionsSuite.scala
@@ -166,6 +166,23 @@ class XmlFunctionsSuite extends QueryTest with SharedSparkSession {
Row(expected) :: Nil)
}
+ test("to_xml ISO default - old dates") {
+ withSQLConf("spark.sql.session.timeZone" -> "America/Los_Angeles") {
+ val schema = StructType(StructField("a", TimestampType, nullable = false) :: Nil)
+ val data = Seq(Row(java.sql.Timestamp.valueOf("1800-01-01 00:00:00.0")))
+ val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+ .withColumn("a", struct($"a"))
+
+ val expected =
+ s"""|
+ | 1800-01-01T00:00:00.000-07:52:58
+ |
""".stripMargin
+ checkAnswer(
+ df.select(to_xml($"a")),
+ Row(expected) :: Nil)
+ }
+ }
+
test("to_xml with option (timestampFormat)") {
val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm")
val schema = StructType(StructField("a", TimestampType, nullable = false) :: Nil)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index f7ea8a735068e..cb442c44832bd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -70,6 +70,7 @@ abstract class CSVSuite
private val emptyFile = "test-data/empty.csv"
private val commentsFile = "test-data/comments.csv"
private val disableCommentsFile = "test-data/disable_comments.csv"
+ private val timestampFile = "test-data/timestamps.csv"
private val boolFile = "test-data/bool.csv"
private val decimalFile = "test-data/decimal.csv"
private val simpleSparseFile = "test-data/simple_sparse.csv"
@@ -968,8 +969,8 @@ abstract class CSVSuite
.format("csv")
.option("inferSchema", "true")
.option("header", "true")
- .option("timestampFormat", "dd/MM/yyyy HH:mm")
- .load(testFile(datesFile))
+ .option("timestampFormat", "dd/MM/yyyy HH:mm[XXX]")
+ .load(testFile(timestampFile))
timestamps.write
.format("csv")
.option("header", "true")
@@ -983,11 +984,13 @@ abstract class CSVSuite
.option("header", "true")
.load(iso8601timestampsPath)
- val iso8501 = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSXXX", Locale.US)
- val expectedTimestamps = timestamps.collect().map { r =>
- // This should be ISO8601 formatted string.
- Row(iso8501.format(r.toSeq.head))
- }
+ val expectedTimestamps = Seq(
+ Row("1800-01-01T10:07:02.000-07:52:58"),
+ Row("1885-01-01T10:30:00.000-08:00"),
+ Row("2014-10-27T18:30:00.000-07:00"),
+ Row("2015-08-26T18:00:00.000-07:00"),
+ Row("2016-01-28T20:00:00.000-08:00")
+ )
checkAnswer(iso8601Timestamps, expectedTimestamps)
}
@@ -3427,6 +3430,10 @@ class CSVv2Suite extends CSVSuite {
}
class CSVLegacyTimeParserSuite extends CSVSuite {
+
+ override def excluded: Seq[String] =
+ Seq("Write timestamps correctly in ISO8601 format by default")
+
override protected def sparkConf: SparkConf =
super
.sparkConf
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 9e5ecc08e24a2..fda066aa26472 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1770,6 +1770,39 @@ abstract class JsonSuite
}
}
+ test("Write timestamps correctly in ISO8601 format by default") {
+ val customSchema = new StructType(Array(StructField("date", TimestampType, true)))
+ withTempDir { dir =>
+      // Set the session timezone to LA, since before Nov 1883 LA had a seconds-level offset.
+      withSQLConf("spark.sql.session.timeZone" -> "America/Los_Angeles") {
+        // With timestampFormat option.
+ val timestampsWithoutFormatPath = s"${dir.getCanonicalPath}/timestampsWithoutFormat.json"
+ val timestampsWithoutFormat = spark.read
+ .schema(customSchema)
+ .option("timestampFormat", "dd/MM/yyyy HH:mm[XXX]")
+ .json(datesRecords.union(oldDatesRecord))
+
+ timestampsWithoutFormat.write
+ .format("json")
+ .save(timestampsWithoutFormatPath)
+
+ // This will load back the timestamps as string.
+ val stringSchema = StructType(StructField("date", StringType, true) :: Nil)
+ val stringTimestampsWithFormat = spark.read
+ .schema(stringSchema)
+ .json(timestampsWithoutFormatPath)
+ val expectedStringDatesWithoutFormat = Seq(
+ Row("1800-01-01T10:07:02.000-07:52:58"),
+ Row("1885-01-01T10:30:00.000-08:00"),
+ Row("2014-10-27T18:30:00.000-07:00"),
+ Row("2015-08-26T18:00:00.000-07:00"),
+ Row("2016-01-28T20:00:00.000-08:00"))
+
+ checkAnswer(stringTimestampsWithFormat, expectedStringDatesWithoutFormat)
+ }
+ }
+ }
+
test("Write timestamps correctly with timestampFormat option") {
val customSchema = new StructType(Array(StructField("date", TimestampType, true)))
withTempDir { dir =>
@@ -3968,6 +4001,10 @@ class JsonV2Suite extends JsonSuite {
}
class JsonLegacyTimeParserSuite extends JsonSuite {
+
+ override def excluded: Seq[String] =
+ Seq("Write timestamps correctly in ISO8601 format by default")
+
override protected def sparkConf: SparkConf =
super
.sparkConf
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
index 5c35ee03fb271..6fa2bdfbfe758 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
@@ -229,6 +229,11 @@ private[json] trait TestJsonData {
"""{"date": "27/10/2014 18:30"}""" ::
"""{"date": "28/01/2016 20:00"}""" :: Nil))(Encoders.STRING)
+ def oldDatesRecord: Dataset[String] =
+ spark.createDataset(spark.sparkContext.parallelize(
+ """{"date": "01/01/1800 18:00Z"}""" ::
+      """{"date": "01/01/1885 18:30Z"}""" :: Nil))(Encoders.STRING)
+
lazy val singleRow: Dataset[String] =
spark.createDataset(spark.sparkContext.parallelize("""{"a":123}""" :: Nil))(Encoders.STRING)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala
index 930cc29878108..30588dde965d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala
@@ -1626,6 +1626,57 @@ class XmlSuite
assert(df.collect().head.getAs[Timestamp](2).getTime === 1322936130000L)
}
+ test("Write timestamps correctly in ISO8601 format by default") {
+ val originalSchema =
+ buildSchema(
+ field("author"),
+ field("time", TimestampType),
+ field("time2", TimestampType),
+ field("time3", TimestampType),
+ field("time4", TimestampType),
+ field("time5", TimestampType)
+ )
+
+ val df = spark.read
+ .option("rowTag", "book")
+ .option("timestampFormat", "dd/MM/yyyy HH:mm[XXX]")
+ .schema(originalSchema)
+ .xml(getTestResourcePath(resDir + "timestamps.xml"))
+
+ withTempDir { dir =>
+      // Use Los Angeles, since old dates there carry a seconds-level zone offset.
+      withSQLConf("spark.sql.session.timeZone" -> "America/Los_Angeles") {
+ df
+ .write
+ .option("rowTag", "book")
+ .xml(dir.getCanonicalPath + "/xml")
+ val schema =
+ buildSchema(
+ field("author"),
+ field("time", StringType),
+ field("time2", StringType),
+ field("time3", StringType),
+ field("time4", StringType),
+ field("time5", StringType)
+ )
+ val df2 = spark.read
+ .option("rowTag", "book")
+ .schema(schema)
+ .xml(dir.getCanonicalPath + "/xml")
+
+ val expectedStringDatesWithoutFormat = Seq(
+ Row("John Smith",
+ "1800-01-01T10:07:02.000-07:52:58",
+ "1885-01-01T10:30:00.000-08:00",
+ "2014-10-27T18:30:00.000-07:00",
+ "2015-08-26T18:00:00.000-07:00",
+ "2016-01-28T20:00:00.000-08:00"))
+
+ checkAnswer(df2, expectedStringDatesWithoutFormat)
+ }
+ }
+ }
+
test("Test custom timestampFormat without timezone") {
val xml = s"""
| John Smith