#387 Implement handling of DATE overflows that can happen with PostgreSQL infinity date values. #392

Merged (2 commits) on Apr 16, 2024
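Summary: PostgreSQL represents `infinity` and `-infinity` dates as extreme epoch-millisecond values that overflow the range Spark can represent. The built-in JDBC connector already clamped TIMESTAMP values; this change extends the same sanitization to DATE values, bounding both to 0001-01-01 ... 9999-12-31, and renames the `sanitize.timestamps` option to `sanitize.datetime`. Below is a minimal standalone sketch of the clamping idea (names are illustrative only; the actual implementation is `ResultSetToRowIterator.sanitizeDate` in the diff further down):

import java.sql.Date
import java.time.{LocalDateTime, ZoneOffset}

object DateClampingSketch {
  // Safe bounds, matching the constants introduced in this PR.
  val MinSafeDateMilli: Long = LocalDateTime.of(1, 1, 1, 0, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli
  val MaxSafeDateMilli: Long = LocalDateTime.of(9999, 12, 31, 0, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli

  // Clamp a date into the safe range; null is a valid value and passes through.
  def clamp(date: Date): Date =
    if (date == null) date
    else new Date(math.min(math.max(date.getTime, MinSafeDateMilli), MaxSafeDateMilli))
}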
README.md (4 changes: 2 additions & 2 deletions)

@@ -495,9 +495,9 @@ is determined by the pipeline configuration.
 # (Optional) The timeout for connecting to the JDBC host.
 connection.timeout = 60

-# (Optional) For built-in JDBC connector the default behavior is sanitize timestemp fields
+# (Optional) For built-in JDBC connector the default behavior is sanitize date and timestamp fields
 # by bounding to the range of 0001-01-01 ... 9999-12-31. This behavior can be switched off like this
-sanitize.timestamps = false
+sanitize.datetime = false

 # Any option passed as 'option.' will be passed to the JDBC driver. Example:
 #option.database = "test_db"
JdbcConfig.scala

@@ -30,7 +30,7 @@ case class JdbcConfig(
   password: Option[String] = None,
   retries: Option[Int] = None,
   connectionTimeoutSeconds: Option[Int] = None,
-  sanitizeTimestamps: Boolean = true,
+  sanitizeDateTime: Boolean = true,
   extraOptions: Map[String, String] = Map.empty[String, String]
 )

@@ -45,7 +45,7 @@ object JdbcConfig {
   val JDBC_PASSWORD = "jdbc.password"
   val JDBC_RETRIES = "jdbc.retries"
   val JDBC_CONNECTION_TIMEOUT = "jdbc.connection.timeout"
-  val JDBC_SANITIZE_TIMESTAMPS = "jdbc.sanitize.timestamps"
+  val JDBC_SANITIZE_DATETIME = "jdbc.sanitize.datetime"
   val JDBC_EXTRA_OPTIONS_PREFIX = "jdbc.option"

   def load(conf: Config, parent: String = ""): JdbcConfig = {
@@ -74,7 +74,7 @@
       password = ConfigUtils.getOptionString(conf, JDBC_PASSWORD),
       retries = ConfigUtils.getOptionInt(conf, JDBC_RETRIES),
       connectionTimeoutSeconds = ConfigUtils.getOptionInt(conf, JDBC_CONNECTION_TIMEOUT),
-      sanitizeTimestamps = ConfigUtils.getOptionBoolean(conf, JDBC_SANITIZE_TIMESTAMPS).getOrElse(true),
+      sanitizeDateTime = ConfigUtils.getOptionBoolean(conf, JDBC_SANITIZE_DATETIME).getOrElse(true),
       extraOptions = ConfigUtils.getExtraOptions(conf, JDBC_EXTRA_OPTIONS_PREFIX)
     )
   }
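Note that the README example above omits the `jdbc.` prefix (presumably because that snippet sits inside the reader's `jdbc` block), while the fully qualified key defined here is `jdbc.sanitize.datetime`. A hedged sketch of how the flag resolves (`ConfigFactory` is from the Typesafe Config library this project already uses; `ConfigUtils` is the project's own helper visible in the diff):

import com.typesafe.config.ConfigFactory

// Hypothetical snippet containing only the sanitize flag.
val conf = ConfigFactory.parseString("jdbc.sanitize.datetime = false")

// Mirrors the load() logic above: a missing key defaults to true, i.e. sanitization stays on.
val sanitize = ConfigUtils.getOptionBoolean(conf, JdbcConfig.JDBC_SANITIZE_DATETIME).getOrElse(true)
assert(!sanitize)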
JdbcNativeUtils.scala

@@ -89,13 +89,13 @@ object JdbcNativeUtils {

     // Executing the query
     val rs = getResultSet(jdbcConfig, url, query)
-    val driverIterator = new ResultSetToRowIterator(rs, jdbcConfig.sanitizeTimestamps)
+    val driverIterator = new ResultSetToRowIterator(rs, jdbcConfig.sanitizeDateTime)
     val schema = JdbcSparkUtils.addMetadataFromJdbc(driverIterator.getSchema, rs.getMetaData)

     driverIterator.close()

     val rdd = spark.sparkContext.parallelize(Seq(query)).flatMap(q => {
-      new ResultSetToRowIterator(getResultSet(jdbcConfig, url, q), jdbcConfig.sanitizeTimestamps)
+      new ResultSetToRowIterator(getResultSet(jdbcConfig, url, q), jdbcConfig.sanitizeDateTime)
     })

     spark.createDataFrame(rdd, schema)
ResultSetToRowIterator.scala

@@ -21,10 +21,10 @@ import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.apache.spark.sql.types._

 import java.sql.Types._
-import java.sql.{ResultSet, Timestamp}
+import java.sql.{Date, ResultSet, Timestamp}
 import java.time.{LocalDateTime, ZoneOffset}

-class ResultSetToRowIterator(rs: ResultSet, sanitizeTimestamps: Boolean) extends Iterator[Row] {
+class ResultSetToRowIterator(rs: ResultSet, sanitizeDateTime: Boolean) extends Iterator[Row] {
   import ResultSetToRowIterator._

   private var didHasNext = false
@@ -115,15 +115,30 @@ class ResultSetToRowIterator(rs: ResultSet, sanitizeTimestamps: Boolean) extends
       case DOUBLE => rs.getDouble(columnIndex)
       case REAL => rs.getBigDecimal(columnIndex)
       case NUMERIC => rs.getBigDecimal(columnIndex)
-      case DATE => rs.getDate(columnIndex)
+      case DATE => sanitizeDate(rs.getDate(columnIndex))
       case TIMESTAMP => sanitizeTimestamp(rs.getTimestamp(columnIndex))
       case _ => rs.getString(columnIndex)
     }
   }

+  private[core] def sanitizeDate(date: Date): Date = {
+    // This check against null is important since date=null is a valid value.
+    if (sanitizeDateTime && date != null) {
+      val timeMilli = date.getTime
+      if (timeMilli > MAX_SAFE_DATE_MILLI)
+        MAX_SAFE_DATE
+      else if (timeMilli < MIN_SAFE_DATE_MILLI)
+        MIN_SAFE_DATE
+      else
+        date
+    } else {
+      date
+    }
+  }
+
   private[core] def sanitizeTimestamp(timestamp: Timestamp): Timestamp = {
     // This check against null is important since timestamp=null is a valid value.
-    if (sanitizeTimestamps && timestamp != null) {
+    if (sanitizeDateTime && timestamp != null) {
       val timeMilli = timestamp.getTime
       if (timeMilli > MAX_SAFE_TIMESTAMP_MILLI)
         MAX_SAFE_TIMESTAMP
@@ -138,9 +153,16 @@ class ResultSetToRowIterator(rs: ResultSet, sanitizeTimestamps: Boolean) extends
 }

 object ResultSetToRowIterator {
+  // Constants are aligned with Spark implementation
+  val MAX_SAFE_DATE_MILLI: Long = LocalDateTime.of(9999, 12, 31, 0, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli
+  val MAX_SAFE_DATE = new Date(MAX_SAFE_DATE_MILLI)
+
   val MAX_SAFE_TIMESTAMP_MILLI: Long = LocalDateTime.of(9999, 12, 31, 23, 59, 59, 999999999).toInstant(ZoneOffset.UTC).toEpochMilli
   val MAX_SAFE_TIMESTAMP = new Timestamp(MAX_SAFE_TIMESTAMP_MILLI)

+  val MIN_SAFE_DATE_MILLI: Long = LocalDateTime.of(1, 1, 1, 0, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli
+  val MIN_SAFE_DATE = new Date(MIN_SAFE_DATE_MILLI)
+
   val MIN_SAFE_TIMESTAMP_MILLI: Long = LocalDateTime.of(1, 1, 1, 0, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli
   val MIN_SAFE_TIMESTAMP = new Timestamp(MIN_SAFE_TIMESTAMP_MILLI)
 }
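As a quick cross-check of these bounds (the expected values below are the same `minDate`/`maxDate`/`minTimestamp`/`maxTimestamp` constants asserted in the test suite further down):

import java.time.{LocalDateTime, ZoneOffset}

val minMilli = LocalDateTime.of(1, 1, 1, 0, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli
val maxDateMilli = LocalDateTime.of(9999, 12, 31, 0, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli
val maxTsMilli = LocalDateTime.of(9999, 12, 31, 23, 59, 59, 999999999).toInstant(ZoneOffset.UTC).toEpochMilli

assert(minMilli == -62135596800000L)     // lower bound shared by dates and timestamps
assert(maxDateMilli == 253402214400000L) // DATE upper bound (midnight of 9999-12-31)
assert(maxTsMilli == 253402300799999L)   // TIMESTAMP upper bound (last millisecond of 9999-12-31)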
TableReaderJdbcNativeSuite.scala

@@ -55,7 +55,7 @@ class TableReaderJdbcNativeSuite extends AnyWordSpec with RelationalDbFixture wi
       | }
       |
       | has.information.date.column = true
-      | jdbc.sanitize.timestamps = false
+      | jdbc.sanitize.datetime = false
       |
       | information.date.column = "FOUNDED"
       | information.date.type = "date"
@@ -107,13 +107,13 @@ class TableReaderJdbcNativeSuite extends AnyWordSpec with RelationalDbFixture wi
     "work with legacy config" in {
       val reader = TableReaderJdbcNative(conf.getConfig("reader_legacy"), "reader_legacy")
       assert(reader.getJdbcReaderConfig.infoDateFormat == "yyyy-MM-DD")
-      assert(!reader.getJdbcReaderConfig.jdbcConfig.sanitizeTimestamps)
+      assert(!reader.getJdbcReaderConfig.jdbcConfig.sanitizeDateTime)
     }

     "work with minimal config" in {
       val reader = TableReaderJdbcNative(conf.getConfig("reader_minimal"), "reader_minimal")
       assert(reader.getJdbcReaderConfig.infoDateFormat == "yyyy-MM-dd")
-      assert(reader.getJdbcReaderConfig.jdbcConfig.sanitizeTimestamps)
+      assert(reader.getJdbcReaderConfig.jdbcConfig.sanitizeDateTime)
     }

     "throw an exception if config is missing" in {
JdbcNativeUtilsSuite.scala

@@ -193,12 +193,7 @@ class JdbcNativeUtilsSuite extends AnyWordSpec with RelationalDbFixture with Spa
     }
   }

-  "sanitizeTimestamp" should {
-    // From Spark:
-    // https://github.com/apache/spark/blob/ad8ac17dbdfa763236ab3303eac6a3115ba710cc/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala#L457
-    val minTimestamp = -62135596800000L
-    val maxTimestamp = 253402300799999L
-
+  "sanitizeDateTime" when {
     // Variable names come from PostgreSQL "constant field docs":
     // https://jdbc.postgresql.org/documentation/publicapi/index.html?constant-values.html
     val POSTGRESQL_DATE_NEGATIVE_INFINITY: Long = -9223372036832400000L

@@ -210,58 +205,124 @@
     when(resultSetMetaData.getColumnCount).thenReturn(1)
     when(resultSet.getMetaData).thenReturn(resultSetMetaData)

-    "ignore null values" in {
-      val iterator = new ResultSetToRowIterator(resultSet, true)
-
-      val fixedTs = iterator.sanitizeTimestamp(null)
-
-      assert(fixedTs == null)
-    }
-
-    "convert PostgreSql positive infinity value" in {
-      val iterator = new ResultSetToRowIterator(resultSet, true)
-      val timestamp = Timestamp.from(Instant.ofEpochMilli(POSTGRESQL_DATE_POSITIVE_INFINITY))
-
-      val fixedTs = iterator.sanitizeTimestamp(timestamp)
-
-      assert(fixedTs.getTime == maxTimestamp)
-    }
-
-    "convert PostgreSql negative infinity value" in {
-      val iterator = new ResultSetToRowIterator(resultSet, true)
-      val timestamp = Timestamp.from(Instant.ofEpochMilli(POSTGRESQL_DATE_NEGATIVE_INFINITY))
-
-      val fixedTs = iterator.sanitizeTimestamp(timestamp)
-
-      assert(fixedTs.getTime == minTimestamp)
-    }
-
-    "convert overflowed value to the maximum value supported" in {
-      val iterator = new ResultSetToRowIterator(resultSet, true)
-      val timestamp = Timestamp.from(Instant.ofEpochMilli(1000000000000000L))
-
-      val actual = iterator.sanitizeTimestamp(timestamp)
-
-      val calendar = new GregorianCalendar(TimeZone.getTimeZone(ZoneId.of("UTC")))
-      calendar.setTime(actual)
-      val year = calendar.get(Calendar.YEAR)
-
-      assert(year == 9999)
-      assert(actual.getTime == maxTimestamp)
-    }
-
-    "do nothing if the feature is turned off" in {
-      val iterator = new ResultSetToRowIterator(resultSet, false)
-      val timestamp = Timestamp.from(Instant.ofEpochMilli(1000000000000000L))
-
-      val actual = iterator.sanitizeTimestamp(timestamp)
-
-      val calendar = new GregorianCalendar(TimeZone.getTimeZone(ZoneId.of("UTC")))
-      calendar.setTime(actual)
-      val year = calendar.get(Calendar.YEAR)
-
-      assert(year == 33658)
-    }
-  }
+    "sanitizeTimestamp" should {
+      // From Spark:
+      // https://github.com/apache/spark/blob/070461cc673c3fc910e66d1cbf628632b558b48c/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala#L323
+      val minTimestamp = -62135596800000L
+      val maxTimestamp = 253402300799999L
+
+      "ignore null values" in {
+        val iterator = new ResultSetToRowIterator(resultSet, true)
+
+        val fixedTs = iterator.sanitizeTimestamp(null)
+
+        assert(fixedTs == null)
+      }
+
+      "convert PostgreSql positive infinity value" in {
+        val iterator = new ResultSetToRowIterator(resultSet, true)
+        val timestamp = Timestamp.from(Instant.ofEpochMilli(POSTGRESQL_DATE_POSITIVE_INFINITY))
+
+        val fixedTs = iterator.sanitizeTimestamp(timestamp)
+
+        assert(fixedTs.getTime == maxTimestamp)
+      }
+
+      "convert PostgreSql negative infinity value" in {
+        val iterator = new ResultSetToRowIterator(resultSet, true)
+        val timestamp = Timestamp.from(Instant.ofEpochMilli(POSTGRESQL_DATE_NEGATIVE_INFINITY))
+
+        val fixedTs = iterator.sanitizeTimestamp(timestamp)
+
+        assert(fixedTs.getTime == minTimestamp)
+      }
+
+      "convert overflowed value to the maximum value supported" in {
+        val iterator = new ResultSetToRowIterator(resultSet, true)
+        val timestamp = Timestamp.from(Instant.ofEpochMilli(1000000000000000L))
+
+        val actual = iterator.sanitizeTimestamp(timestamp)
+
+        val calendar = new GregorianCalendar(TimeZone.getTimeZone(ZoneId.of("UTC")))
+        calendar.setTime(actual)
+        val year = calendar.get(Calendar.YEAR)
+
+        assert(year == 9999)
+        assert(actual.getTime == maxTimestamp)
+      }
+
+      "do nothing if the feature is turned off" in {
+        val iterator = new ResultSetToRowIterator(resultSet, false)
+        val timestamp = Timestamp.from(Instant.ofEpochMilli(1000000000000000L))
+
+        val actual = iterator.sanitizeTimestamp(timestamp)
+
+        val calendar = new GregorianCalendar(TimeZone.getTimeZone(ZoneId.of("UTC")))
+        calendar.setTime(actual)
+        val year = calendar.get(Calendar.YEAR)
+
+        assert(year == 33658)
+      }
+    }
+
+    "sanitizeDate" should {
+      // From Spark:
+      // https://github.com/apache/spark/blob/070461cc673c3fc910e66d1cbf628632b558b48c/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala#L339
+      val minDate = -62135596800000L
+      val maxDate = 253402214400000L
+
+      "ignore null values" in {
+        val iterator = new ResultSetToRowIterator(resultSet, true)
+
+        val fixedDate = iterator.sanitizeDate(null)
+
+        assert(fixedDate == null)
+      }
+
+      "convert PostgreSql positive infinity value" in {
+        val iterator = new ResultSetToRowIterator(resultSet, true)
+        val date = new Date(POSTGRESQL_DATE_POSITIVE_INFINITY)
+
+        val fixedDate = iterator.sanitizeDate(date)
+
+        assert(fixedDate.getTime == maxDate)
+      }
+
+      "convert PostgreSql negative infinity value" in {
+        val iterator = new ResultSetToRowIterator(resultSet, true)
+        val date = new Date(POSTGRESQL_DATE_NEGATIVE_INFINITY)
+
+        val fixedDate = iterator.sanitizeDate(date)
+
+        assert(fixedDate.getTime == minDate)
+      }
+
+      "convert overflowed value to the maximum value supported" in {
+        val iterator = new ResultSetToRowIterator(resultSet, true)
+        val date = new Date(1000000000000000L)
+
+        val actual = iterator.sanitizeDate(date)
+
+        val calendar = new GregorianCalendar(TimeZone.getTimeZone(ZoneId.of("UTC")))
+        calendar.setTime(actual)
+        val year = calendar.get(Calendar.YEAR)
+
+        assert(year == 9999)
+        assert(actual.getTime == maxDate)
+      }
+
+      "do nothing if the feature is turned off" in {
+        val iterator = new ResultSetToRowIterator(resultSet, false)
+        val date = new Date(1000000000000000L)
+
+        val actual = iterator.sanitizeDate(date)
+
+        val calendar = new GregorianCalendar(TimeZone.getTimeZone(ZoneId.of("UTC")))
+        calendar.setTime(actual)
+        val year = calendar.get(Calendar.YEAR)
+
+        assert(year == 33658)
+      }
+    }
+  }

 }