From 99b3689db4f240d990b1d7f9f871c6ed9ac6efc5 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Fri, 4 Oct 2024 11:56:15 +0200 Subject: [PATCH] #374 Fix new incremental ingestion and integration tests to match inclusive intervals. --- .../scala/za/co/absa/pramen/api/Source.scala | 11 ++++++++++- .../co/absa/pramen/api/sql/SqlGenerator.scala | 9 +++++++++ .../pramen/api/sql/SqlGeneratorBase.scala | 2 +- .../pramen/core/reader/TableReaderSpark.scala | 8 ++++---- .../IncrementalPipelineLongFixture.scala | 19 +++++++++---------- .../tests/sql/SqlGeneratorGenericSuite.scala | 4 ++-- 6 files changed, 35 insertions(+), 18 deletions(-) diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/Source.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/Source.scala index b52742732..19d67403f 100644 --- a/pramen/api/src/main/scala/za/co/absa/pramen/api/Source.scala +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/Source.scala @@ -72,8 +72,17 @@ trait Source extends ExternalChannel { * * If an information date is provided and available at the source, the query will be limited to that date. * + * + * * @param offsetFromOpt This is an exclusive parameter the query will be SELECT ... WHERE offset_col > min_offset - * @param offsetToOpt This is an exclusive parameter the query will be SELECT ... WHERE offset_col > min_offset + * @param offsetToOpt This is an inclusive parameter the query will be SELECT ... WHERE offset_col <= max_offset * @param onlyForInfoDate An information date to get data for. Can be empty if the source table doesn't have such a column. * @param columns Select only specified columns. Selects all if an empty Seq is passed. 
*/ diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/sql/SqlGenerator.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/sql/SqlGenerator.scala index 2ff693262..e38009542 100644 --- a/pramen/api/src/main/scala/za/co/absa/pramen/api/sql/SqlGenerator.scala +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/sql/SqlGenerator.scala @@ -55,6 +55,15 @@ trait SqlGenerator { /** * Generates a query for incremental ingestion, the result can be restricted by an information column, if present, but also by offset range. + * + * */ def getDataQueryIncremental(tableName: String, onlyForInfoDate: Option[LocalDate], diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/sql/SqlGeneratorBase.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/sql/SqlGeneratorBase.scala index ade09afda..f79ee5cbb 100644 --- a/pramen/api/src/main/scala/za/co/absa/pramen/api/sql/SqlGeneratorBase.scala +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/sql/SqlGeneratorBase.scala @@ -120,7 +120,7 @@ abstract class SqlGeneratorBase(sqlConfig: SqlConfig) extends SqlGenerator { s"${getOffsetWhereCondition(offsetColumn, ">=", offsetFrom)} AND ${getOffsetWhereCondition(offsetColumn, "<=", offsetTo)}" case (Some(offsetFrom), None) => validateOffsetValue(offsetFrom) - s"${getOffsetWhereCondition(offsetColumn, ">=", offsetFrom)}" + s"${getOffsetWhereCondition(offsetColumn, ">", offsetFrom)}" case (None, Some(offsetTo)) => validateOffsetValue(offsetTo) s"${getOffsetWhereCondition(offsetColumn, "<=", offsetTo)}" diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderSpark.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderSpark.scala index feacaf413..49562a27a 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderSpark.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderSpark.scala @@ -82,9 +82,9 @@ class TableReaderSpark(formatOpt: Option[String], getData(query, infoDate, 
infoDate, columns) .filter(offsetCol >= offsetFrom.getSparkLit && offsetCol <= offsetTo.getSparkLit) case (Some(offsetFrom), None) => - log.info(s"Reading * FROM ${query.query} WHERE $infoDateColumn='$infoDate' AND ${offsetInfo.offsetColumn} >= ${offsetFrom.valueString}") + log.info(s"Reading * FROM ${query.query} WHERE $infoDateColumn='$infoDate' AND ${offsetInfo.offsetColumn} > ${offsetFrom.valueString}") getData(query, infoDate, infoDate, columns) - .filter(offsetCol >= offsetFrom.getSparkLit) + .filter(offsetCol > offsetFrom.getSparkLit) case (None, Some(offsetTo)) => log.info(s"Reading * FROM ${query.query} WHERE $infoDateColumn='$infoDate' AND ${offsetInfo.offsetColumn} <= ${offsetTo.valueString}") getData(query, infoDate, infoDate, columns) @@ -100,9 +100,9 @@ class TableReaderSpark(formatOpt: Option[String], getBaseDataFrame(query) .filter(offsetCol >= offsetFrom.getSparkLit && offsetCol <= offsetTo.getSparkLit) case (Some(offsetFrom), None) => - log.info(s"Reading * FROM ${query.query} WHERE ${offsetInfo.offsetColumn} >= ${offsetFrom.valueString}") + log.info(s"Reading * FROM ${query.query} WHERE ${offsetInfo.offsetColumn} > ${offsetFrom.valueString}") getBaseDataFrame(query) - .filter(offsetCol >= offsetFrom.getSparkLit) + .filter(offsetCol > offsetFrom.getSparkLit) case (None, Some(offsetTo)) => log.info(s"Reading * FROM ${query.query} WHERE ${offsetInfo.offsetColumn} <= ${offsetTo.valueString}") getBaseDataFrame(query) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala index 8e2d7677c..b1ea0b125 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala @@ -621,7 +621,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec 
assert(offsets.length == 1) - assert(offsets.head.minOffset.get.valueString.toLong == Long.MinValue) + assert(offsets.head.minOffset.get.valueString.toLong == 1) assert(offsets.head.maxOffset.get.valueString.toLong == 3) assert(offsets.head.committedAt.nonEmpty) } @@ -674,7 +674,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec assert(offsets.length == 1) - assert(offsets.head.minOffset.get.valueString.toLong == Long.MinValue) + assert(offsets.head.minOffset.get.valueString.toLong == 1) assert(offsets.head.maxOffset.get.valueString.toLong == 3) assert(offsets.head.committedAt.nonEmpty) } @@ -736,10 +736,10 @@ class IncrementalPipelineLongFixture extends AnyWordSpec assert(offsets.length == 2) - assert(offsets.head.minOffset.get.valueString.toLong == Long.MinValue) + assert(offsets.head.minOffset.get.valueString.toLong == 1) assert(offsets.head.maxOffset.get.valueString.toLong == 3) assert(offsets.head.committedAt.nonEmpty) - assert(offsets(1).minOffset.get.valueString.toLong == 3) + assert(offsets(1).minOffset.get.valueString.toLong == 4) assert(offsets(1).maxOffset.get.valueString.toLong == 6) assert(offsets(1).committedAt.nonEmpty) } @@ -1043,20 +1043,19 @@ class IncrementalPipelineLongFixture extends AnyWordSpec val om = new OffsetManagerJdbc(pramenDb.db, 123L) val offsets1 = om.getOffsets("table1", infoDate.minusDays(1)) - assert(offsets1.head.minOffset.get.valueString.toLong == -62135596800000L) + assert(offsets1.head.minOffset.get.valueString.toLong == 1613563930000L) assert(offsets1.head.maxOffset.get.valueString.toLong == 1613563930000L) assert(offsets1.head.committedAt.nonEmpty) - val offsets2 = om.getOffsets("table1", infoDate) assert(offsets2.length == 1) - assert(offsets2.head.minOffset.get.valueString.toLong == 1613563930000L) + assert(offsets2.head.minOffset.get.valueString.toLong == 1613639398123L) assert(offsets2.head.maxOffset.get.valueString.toLong == 1613639399123L) assert(offsets2.head.committedAt.nonEmpty) val offsets3 = 
om.getOffsets("table1", infoDate.plusDays(1)) assert(offsets3.length == 1) - assert(offsets3.head.minOffset.get.valueString.toLong == 1613639399123L) + assert(offsets3.head.minOffset.get.valueString.toLong == 1613740330000L) assert(offsets3.head.maxOffset.get.valueString.toLong == 1613740330000L) assert(offsets3.head.committedAt.nonEmpty) } @@ -1109,13 +1108,13 @@ class IncrementalPipelineLongFixture extends AnyWordSpec val offsets1 = om.getOffsets("table1", infoDate.minusDays(1)) assert(offsets1.length == 1) - assert(offsets1.head.minOffset.get.valueString.toLong == Long.MinValue) + assert(offsets1.head.minOffset.get.valueString.toLong == 1) assert(offsets1.head.maxOffset.get.valueString.toLong == 2) assert(offsets1.head.committedAt.nonEmpty) val offsets2 = om.getOffsets("table1", infoDate) assert(offsets2.length == 1) - assert(offsets2.head.minOffset.get.valueString.toLong == 2) + assert(offsets2.head.minOffset.get.valueString.toLong == 3) assert(offsets2.head.maxOffset.get.valueString.toLong == 4) assert(offsets2.head.committedAt.nonEmpty) } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlGeneratorGenericSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlGeneratorGenericSuite.scala index ba240c3fd..379a856f3 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlGeneratorGenericSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlGeneratorGenericSuite.scala @@ -211,7 +211,7 @@ class SqlGeneratorGenericSuite extends AnyWordSpec { "work with only from offset" in { val sql = genDate.getDataQueryIncremental("table1", None, Some(OffsetValue.IntegralValue(1)), None, Seq.empty) - assert(sql == "SELECT * FROM table1 WHERE offset >= 1") + assert(sql == "SELECT * FROM table1 WHERE offset > 1") } "work with only to offset" in { @@ -242,7 +242,7 @@ class SqlGeneratorGenericSuite extends AnyWordSpec { "work with only from offset" in { val sql = 
genDate.getDataQueryIncremental("table1", Some(date1), Some(OffsetValue.IntegralValue(1)), None, Seq.empty) - assert(sql == "SELECT * FROM table1 WHERE D = date'2020-08-17' AND offset >= 1") + assert(sql == "SELECT * FROM table1 WHERE D = date'2020-08-17' AND offset > 1") } "work with only to offset" in {