diff --git a/README.md b/README.md index 81eae3ced..c1ae0ee73 100644 --- a/README.md +++ b/README.md @@ -495,9 +495,9 @@ is determined by the pipeline configuration. # (Optional) The timeout for connecting to the JDBC host. connection.timeout = 60 - # (Optional) For built-in JDBC connector the default behavior is sanitize timestemp fields + # (Optional) For the built-in JDBC connector the default behavior is to sanitize date and timestamp fields # by bounding to the range of 0001-01-01 ... 9999-12-31. This behavior can be switched off like this - sanitize.timestamps = false + sanitize.datetime = false # Any option passed as 'option.' will be passed to the JDBC driver. Example: #option.database = "test_db" diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/model/JdbcConfig.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/model/JdbcConfig.scala index f20437ee2..2e9fe4308 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/model/JdbcConfig.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/model/JdbcConfig.scala @@ -30,7 +30,7 @@ case class JdbcConfig( password: Option[String] = None, retries: Option[Int] = None, connectionTimeoutSeconds: Option[Int] = None, - sanitizeTimestamps: Boolean = true, + sanitizeDateTime: Boolean = true, extraOptions: Map[String, String] = Map.empty[String, String] ) @@ -45,7 +45,7 @@ object JdbcConfig { val JDBC_PASSWORD = "jdbc.password" val JDBC_RETRIES = "jdbc.retries" val JDBC_CONNECTION_TIMEOUT = "jdbc.connection.timeout" - val JDBC_SANITIZE_TIMESTAMPS = "jdbc.sanitize.timestamps" + val JDBC_SANITIZE_DATETIME = "jdbc.sanitize.datetime" val JDBC_EXTRA_OPTIONS_PREFIX = "jdbc.option" def load(conf: Config, parent: String = ""): JdbcConfig = { @@ -74,7 +74,7 @@ object JdbcConfig { password = ConfigUtils.getOptionString(conf, JDBC_PASSWORD), retries = ConfigUtils.getOptionInt(conf, JDBC_RETRIES), connectionTimeoutSeconds = ConfigUtils.getOptionInt(conf, 
JDBC_CONNECTION_TIMEOUT), - sanitizeTimestamps = ConfigUtils.getOptionBoolean(conf, JDBC_SANITIZE_TIMESTAMPS).getOrElse(true), + sanitizeDateTime = ConfigUtils.getOptionBoolean(conf, JDBC_SANITIZE_DATETIME).getOrElse(true), extraOptions = ConfigUtils.getExtraOptions(conf, JDBC_EXTRA_OPTIONS_PREFIX) ) } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcNativeUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcNativeUtils.scala index 6cbfb4c6a..6a6b386f7 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcNativeUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcNativeUtils.scala @@ -89,13 +89,13 @@ object JdbcNativeUtils { // Executing the query val rs = getResultSet(jdbcConfig, url, query) - val driverIterator = new ResultSetToRowIterator(rs, jdbcConfig.sanitizeTimestamps) + val driverIterator = new ResultSetToRowIterator(rs, jdbcConfig.sanitizeDateTime) val schema = JdbcSparkUtils.addMetadataFromJdbc(driverIterator.getSchema, rs.getMetaData) driverIterator.close() val rdd = spark.sparkContext.parallelize(Seq(query)).flatMap(q => { - new ResultSetToRowIterator(getResultSet(jdbcConfig, url, q), jdbcConfig.sanitizeTimestamps) + new ResultSetToRowIterator(getResultSet(jdbcConfig, url, q), jdbcConfig.sanitizeDateTime) }) spark.createDataFrame(rdd, schema) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala index c99112e45..a39e4e986 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala @@ -21,10 +21,10 @@ import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.spark.sql.types._ import java.sql.Types._ -import java.sql.{ResultSet, Timestamp} +import java.sql.{Date, ResultSet, 
Timestamp} import java.time.{LocalDateTime, ZoneOffset} -class ResultSetToRowIterator(rs: ResultSet, sanitizeTimestamps: Boolean) extends Iterator[Row] { +class ResultSetToRowIterator(rs: ResultSet, sanitizeDateTime: Boolean) extends Iterator[Row] { import ResultSetToRowIterator._ private var didHasNext = false @@ -115,15 +115,30 @@ class ResultSetToRowIterator(rs: ResultSet, sanitizeTimestamps: Boolean) extends case DOUBLE => rs.getDouble(columnIndex) case REAL => rs.getBigDecimal(columnIndex) case NUMERIC => rs.getBigDecimal(columnIndex) - case DATE => rs.getDate(columnIndex) + case DATE => sanitizeDate(rs.getDate(columnIndex)) case TIMESTAMP => sanitizeTimestamp(rs.getTimestamp(columnIndex)) case _ => rs.getString(columnIndex) } } + private[core] def sanitizeDate(date: Date): Date = { + // This check against null is important since date=null is a valid value. + if (sanitizeDateTime && date != null) { + val timeMilli = date.getTime + if (timeMilli > MAX_SAFE_DATE_MILLI) + MAX_SAFE_DATE + else if (timeMilli < MIN_SAFE_DATE_MILLI) + MIN_SAFE_DATE + else + date + } else { + date + } + } + private[core] def sanitizeTimestamp(timestamp: Timestamp): Timestamp = { // This check against null is important since timestamp=null is a valid value. 
- if (sanitizeTimestamps && timestamp != null) { + if (sanitizeDateTime && timestamp != null) { val timeMilli = timestamp.getTime if (timeMilli > MAX_SAFE_TIMESTAMP_MILLI) MAX_SAFE_TIMESTAMP @@ -138,9 +153,16 @@ class ResultSetToRowIterator(rs: ResultSet, sanitizeTimestamps: Boolean) extends } object ResultSetToRowIterator { + // Constants are aligned with Spark implementation + val MAX_SAFE_DATE_MILLI: Long = LocalDateTime.of(9999, 12, 31, 0, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli + val MAX_SAFE_DATE = new Date(MAX_SAFE_DATE_MILLI) + val MAX_SAFE_TIMESTAMP_MILLI: Long = LocalDateTime.of(9999, 12, 31, 23, 59, 59, 999999999).toInstant(ZoneOffset.UTC).toEpochMilli val MAX_SAFE_TIMESTAMP = new Timestamp(MAX_SAFE_TIMESTAMP_MILLI) + val MIN_SAFE_DATE_MILLI: Long = LocalDateTime.of(1, 1, 1, 0, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli + val MIN_SAFE_DATE = new Date(MIN_SAFE_DATE_MILLI) + val MIN_SAFE_TIMESTAMP_MILLI: Long = LocalDateTime.of(1, 1, 1, 0, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli val MIN_SAFE_TIMESTAMP = new Timestamp(MIN_SAFE_TIMESTAMP_MILLI) } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/TableReaderJdbcNativeSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/TableReaderJdbcNativeSuite.scala index 01e878175..275d9ffa2 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/TableReaderJdbcNativeSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/TableReaderJdbcNativeSuite.scala @@ -55,7 +55,7 @@ class TableReaderJdbcNativeSuite extends AnyWordSpec with RelationalDbFixture wi | } | | has.information.date.column = true - | jdbc.sanitize.timestamps = false + | jdbc.sanitize.datetime = false | | information.date.column = "FOUNDED" | information.date.type = "date" @@ -107,13 +107,13 @@ class TableReaderJdbcNativeSuite extends AnyWordSpec with RelationalDbFixture wi "work with legacy config" in { val reader = 
TableReaderJdbcNative(conf.getConfig("reader_legacy"), "reader_legacy") assert(reader.getJdbcReaderConfig.infoDateFormat == "yyyy-MM-DD") - assert(!reader.getJdbcReaderConfig.jdbcConfig.sanitizeTimestamps) + assert(!reader.getJdbcReaderConfig.jdbcConfig.sanitizeDateTime) } "work with minimal config" in { val reader = TableReaderJdbcNative(conf.getConfig("reader_minimal"), "reader_minimal") assert(reader.getJdbcReaderConfig.infoDateFormat == "yyyy-MM-dd") - assert(reader.getJdbcReaderConfig.jdbcConfig.sanitizeTimestamps) + assert(reader.getJdbcReaderConfig.jdbcConfig.sanitizeDateTime) } "throw an exception if config is missing" in { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/JdbcNativeUtilsSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/JdbcNativeUtilsSuite.scala index 0b2b3ae31..2557f1fc6 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/JdbcNativeUtilsSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/JdbcNativeUtilsSuite.scala @@ -193,12 +193,7 @@ class JdbcNativeUtilsSuite extends AnyWordSpec with RelationalDbFixture with Spa } } - "sanitizeTimestamp" should { - // From Spark: - // https://github.com/apache/spark/blob/ad8ac17dbdfa763236ab3303eac6a3115ba710cc/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala#L457 - val minTimestamp = -62135596800000L - val maxTimestamp = 253402300799999L - + "sanitizeDateTime" when { // Variable names come from PostgreSQL "constant field docs": // https://jdbc.postgresql.org/documentation/publicapi/index.html?constant-values.html val POSTGRESQL_DATE_NEGATIVE_INFINITY: Long = -9223372036832400000L @@ -210,58 +205,124 @@ class JdbcNativeUtilsSuite extends AnyWordSpec with RelationalDbFixture with Spa when(resultSetMetaData.getColumnCount).thenReturn(1) when(resultSet.getMetaData).thenReturn(resultSetMetaData) - "ignore null values" in { - val iterator = new 
ResultSetToRowIterator(resultSet, true) + "sanitizeTimestamp" should { + // From Spark: + // https://github.com/apache/spark/blob/070461cc673c3fc910e66d1cbf628632b558b48c/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala#L323 + val minTimestamp = -62135596800000L + val maxTimestamp = 253402300799999L - val fixedTs = iterator.sanitizeTimestamp(null) + "ignore null values" in { + val iterator = new ResultSetToRowIterator(resultSet, true) - assert(fixedTs == null) - } + val fixedTs = iterator.sanitizeTimestamp(null) - "convert PostgreSql positive infinity value" in { - val iterator = new ResultSetToRowIterator(resultSet, true) - val timestamp = Timestamp.from(Instant.ofEpochMilli(POSTGRESQL_DATE_POSITIVE_INFINITY)) + assert(fixedTs == null) + } - val fixedTs = iterator.sanitizeTimestamp(timestamp) + "convert PostgreSql positive infinity value" in { + val iterator = new ResultSetToRowIterator(resultSet, true) + val timestamp = Timestamp.from(Instant.ofEpochMilli(POSTGRESQL_DATE_POSITIVE_INFINITY)) - assert(fixedTs.getTime == maxTimestamp) - } + val fixedTs = iterator.sanitizeTimestamp(timestamp) - "convert PostgreSql negative infinity value" in { - val iterator = new ResultSetToRowIterator(resultSet, true) - val timestamp = Timestamp.from(Instant.ofEpochMilli(POSTGRESQL_DATE_NEGATIVE_INFINITY)) + assert(fixedTs.getTime == maxTimestamp) + } - val fixedTs = iterator.sanitizeTimestamp(timestamp) + "convert PostgreSql negative infinity value" in { + val iterator = new ResultSetToRowIterator(resultSet, true) + val timestamp = Timestamp.from(Instant.ofEpochMilli(POSTGRESQL_DATE_NEGATIVE_INFINITY)) - assert(fixedTs.getTime == minTimestamp) - } + val fixedTs = iterator.sanitizeTimestamp(timestamp) + + assert(fixedTs.getTime == minTimestamp) + } - "convert overflowed value to the maximum value supported" in { - val iterator = new ResultSetToRowIterator(resultSet, true) - val timestamp = Timestamp.from(Instant.ofEpochMilli(1000000000000000L)) + "convert 
overflowed value to the maximum value supported" in { + val iterator = new ResultSetToRowIterator(resultSet, true) + val timestamp = Timestamp.from(Instant.ofEpochMilli(1000000000000000L)) - val actual = iterator.sanitizeTimestamp(timestamp) + val actual = iterator.sanitizeTimestamp(timestamp) - val calendar = new GregorianCalendar(TimeZone.getTimeZone(ZoneId.of("UTC"))) - calendar.setTime(actual) - val year = calendar.get(Calendar.YEAR) + val calendar = new GregorianCalendar(TimeZone.getTimeZone(ZoneId.of("UTC"))) + calendar.setTime(actual) + val year = calendar.get(Calendar.YEAR) - assert(year == 9999) - assert(actual.getTime == maxTimestamp) + assert(year == 9999) + assert(actual.getTime == maxTimestamp) + } + + "do nothing if the feature is turned off" in { + val iterator = new ResultSetToRowIterator(resultSet, false) + val timestamp = Timestamp.from(Instant.ofEpochMilli(1000000000000000L)) + + val actual = iterator.sanitizeTimestamp(timestamp) + + val calendar = new GregorianCalendar(TimeZone.getTimeZone(ZoneId.of("UTC"))) + calendar.setTime(actual) + val year = calendar.get(Calendar.YEAR) + + assert(year == 33658) + } } - "do nothing if the feature is turned off" in { - val iterator = new ResultSetToRowIterator(resultSet, false) - val timestamp = Timestamp.from(Instant.ofEpochMilli(1000000000000000L)) + "sanitizeDate" should { + // From Spark: + // https://github.com/apache/spark/blob/070461cc673c3fc910e66d1cbf628632b558b48c/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala#L339 + val minDate = -62135596800000L + val maxDate = 253402214400000L + + "ignore null values" in { + val iterator = new ResultSetToRowIterator(resultSet, true) + + val fixedDate = iterator.sanitizeDate(null) + + assert(fixedDate == null) + } + + "convert PostgreSql positive infinity value" in { + val iterator = new ResultSetToRowIterator(resultSet, true) + val date = new Date(POSTGRESQL_DATE_POSITIVE_INFINITY) + + val fixedDate = iterator.sanitizeDate(date) + + 
assert(fixedDate.getTime == maxDate) + } + + "convert PostgreSql negative infinity value" in { + val iterator = new ResultSetToRowIterator(resultSet, true) + val date = new Date(POSTGRESQL_DATE_NEGATIVE_INFINITY) + + val fixedDate = iterator.sanitizeDate(date) + + assert(fixedDate.getTime == minDate) + } - val actual = iterator.sanitizeTimestamp(timestamp) + "convert overflowed value to the maximum value supported" in { + val iterator = new ResultSetToRowIterator(resultSet, true) + val date = new Date(1000000000000000L) - val calendar = new GregorianCalendar(TimeZone.getTimeZone(ZoneId.of("UTC"))) - calendar.setTime(actual) - val year = calendar.get(Calendar.YEAR) + val actual = iterator.sanitizeDate(date) - assert(year == 33658) + val calendar = new GregorianCalendar(TimeZone.getTimeZone(ZoneId.of("UTC"))) + calendar.setTime(actual) + val year = calendar.get(Calendar.YEAR) + + assert(year == 9999) + assert(actual.getTime == maxDate) + } + + "do nothing if the feature is turned off" in { + val iterator = new ResultSetToRowIterator(resultSet, false) + val date = new Date(1000000000000000L) + + val actual = iterator.sanitizeDate(date) + + val calendar = new GregorianCalendar(TimeZone.getTimeZone(ZoneId.of("UTC"))) + calendar.setTime(actual) + val year = calendar.get(Calendar.YEAR) + + assert(year == 33658) + } } } - }