diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/MetastoreReader.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/MetastoreReader.scala index 7c7e5194f..eb7211724 100644 --- a/pramen/api/src/main/scala/za/co/absa/pramen/api/MetastoreReader.scala +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/MetastoreReader.scala @@ -17,6 +17,8 @@ package za.co.absa.pramen.api import org.apache.spark.sql.DataFrame +import za.co.absa.pramen.api.offset.DataOffset +import za.co.absa.pramen.api.status.TaskRunReason import java.time.LocalDate @@ -100,6 +102,15 @@ trait MetastoreReader { */ def isDataAvailable(tableName: String, from: Option[LocalDate], until: Option[LocalDate]): Boolean + /** + * Returns offsets for an information date (both committed and uncommitted). + * + * This info can be used by transformers and sinks to decide if actions need to be taken depending on the + * current micro-batch. For example, adding partitions to Hive needs to happen only once per info date, + * so a sink that does this can check whether micro-batches have already been run for the current day. + */ + def getOffsets(table: String, infoDate: LocalDate): Array[DataOffset] + /** * Gets definition of a metastore table. Please, use with caution and do not write to the underlying path * from transformers. @@ -120,6 +131,12 @@ trait MetastoreReader { */ def getTableRunInfo(tableName: String, infoDate: LocalDate): Option[MetaTableRunInfo] + /** + * Returns the reason for running the task. This helps transformers and sinks determine logic based on whether + * the run is a normal run or a forced re-run. + */ + def getRunReason: TaskRunReason + /** * Returns an object that allows accessing metadata of metastore tables. */ diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/Source.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/Source.scala index 8b543b855..05df34a22 100644 --- a/pramen/api/src/main/scala/za/co/absa/pramen/api/Source.scala +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/Source.scala @@ -78,9 +78,12 @@ trait Source extends ExternalChannel { def getIncrementalData(query: Query, minOffset: OffsetValue, infoDateOpt: Option[LocalDate], columns: Seq[String]): SourceResult /** - * Returns the incremental data greater than the specified offset. + * Returns the incremental data between the specified offsets. * - * If an information date is provided and available at the source, the query will be limited to that date + * If an information date is provided and available at the source, the query will be limited to that date. + * + * This method is used for re-runs for a particular information date. For sources that have an information date column, + * the returned data covers the full information date, even outside the specified offsets. * * @param minOffset This is an exclusive parameter the query will be SELECT ... WHERE offset_col > min_offset * @param maxOffset This is an inclusive parameter the query will be SELECT ... WHERE offset_col <= max_offset diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/offset/DataOffset.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/offset/DataOffset.scala new file mode 100644 index 000000000..95d4018db --- /dev/null +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/offset/DataOffset.scala @@ -0,0 +1,27 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.api.offset + +import java.time.LocalDate + +case class DataOffset(tableName: String, + infoDate: LocalDate, + minOffset: OffsetValue, + maxOffset: Option[OffsetValue], /* Can be None for uncommitted offsets. */ + createdAt: Long, + committedAt: Option[Long] + ) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManager.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManager.scala index 99d3315d7..e40688e36 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManager.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManager.scala @@ -16,8 +16,8 @@ package za.co.absa.pramen.core.bookkeeper -import za.co.absa.pramen.api.offset.OffsetValue -import za.co.absa.pramen.core.bookkeeper.model.{DataOffset, DataOffsetAggregated, DataOffsetRequest} +import za.co.absa.pramen.api.offset.{DataOffset, OffsetValue} +import za.co.absa.pramen.core.bookkeeper.model.{DataOffsetAggregated, DataOffsetRequest} import java.time.LocalDate diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala index 98fdabeec..a703b0269 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala @@ -18,7 +18,7 @@ package za.co.absa.pramen.core.bookkeeper import org.slf4j.LoggerFactory import slick.jdbc.H2Profile.api._ -import za.co.absa.pramen.api.offset.OffsetValue +import za.co.absa.pramen.api.offset.{DataOffset, OffsetValue} import za.co.absa.pramen.core.bookkeeper.model._ import za.co.absa.pramen.core.utils.SlickUtils @@ -37,7 +37,7 @@ class OffsetManagerJdbc(db: Database) extends OffsetManager { return Array.empty } - offsets.map(DataOffset.fromOffsetRecord) + offsets.map(OffsetRecordConverter.toDataOffset) } override def getMaxInfoDateAndOffset(table: String, onlyForInfoDate: Option[LocalDate]): Option[DataOffsetAggregated] = { @@ -118,7 +118,7 @@ class OffsetManagerJdbc(db: Database) extends OffsetManager { val minOffset = OffsetValue.fromString(offsetDataType, offsets.map(_.minOffset).min) val maxOffset = OffsetValue.fromString(offsetDataType, offsets.map(_.maxOffset).max) - Some(DataOffsetAggregated(table, infoDate, minOffset, maxOffset, offsets.map(DataOffset.fromOffsetRecord))) + Some(DataOffsetAggregated(table, infoDate, minOffset, maxOffset, offsets.map(OffsetRecordConverter.toDataOffset))) } /** diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/DataOffsetAggregated.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/DataOffsetAggregated.scala index e737de9f1..81854057c 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/DataOffsetAggregated.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/DataOffsetAggregated.scala @@ -16,7 +16,7 @@ package za.co.absa.pramen.core.bookkeeper.model -import 
za.co.absa.pramen.api.offset.OffsetValue +import za.co.absa.pramen.api.offset.{DataOffset, OffsetValue} import java.time.LocalDate diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/DataOffset.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetRecordConverter.scala similarity index 69% rename from pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/DataOffset.scala rename to pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetRecordConverter.scala index 2856d058a..b1eb8c246 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/DataOffset.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetRecordConverter.scala @@ -16,20 +16,12 @@ package za.co.absa.pramen.core.bookkeeper.model -import za.co.absa.pramen.api.offset.OffsetValue +import za.co.absa.pramen.api.offset.{DataOffset, OffsetValue} import java.time.LocalDate -case class DataOffset(tableName: String, - infoDate: LocalDate, - minOffset: OffsetValue, - maxOffset: Option[OffsetValue], /* Can be None for uncommitted offsets. */ - createdAt: Long, - committedAt: Option[Long] - ) - -object DataOffset { - def fromOffsetRecord(r: OffsetRecord): DataOffset = { +object OffsetRecordConverter { + def toDataOffset(r: OffsetRecord): DataOffset = { val maxOffsetOpt = if (r.maxOffset.nonEmpty) { Option(OffsetValue.fromString(r.dataType, r.maxOffset)) } else { @@ -45,4 +37,4 @@ object DataOffset { r.committedAtMilli ) } -} \ No newline at end of file +} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/Metastore.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/Metastore.scala index a27756226..c421caf78 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/Metastore.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/Metastore.scala @@ -19,6 +19,7 @@ package za.co.absa.pramen.core.metastore import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, SaveMode} import za.co.absa.pramen.api._ +import za.co.absa.pramen.api.status.TaskRunReason import za.co.absa.pramen.core.metastore.model.MetaTable import za.co.absa.pramen.core.utils.hive.HiveHelper @@ -53,5 +54,5 @@ trait Metastore { def getStats(tableName: String, infoDate: LocalDate): MetaTableStats - def getMetastoreReader(tables: Seq[String], infoDate: LocalDate, isIncremental: Boolean): MetastoreReader + def getMetastoreReader(tables: Seq[String], infoDate: LocalDate, runReason: TaskRunReason, isIncremental: Boolean): MetastoreReader } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala index 955c66edb..eae6884b5 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala @@ -23,6 +23,8 @@ import org.apache.spark.sql.types.{DateType, StringType, StructField, StructType import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} import org.slf4j.LoggerFactory import za.co.absa.pramen.api._ +import za.co.absa.pramen.api.offset.DataOffset +import za.co.absa.pramen.api.status.TaskRunReason import za.co.absa.pramen.core.app.config.InfoDateConfig import za.co.absa.pramen.core.app.config.InfoDateConfig.DEFAULT_DATE_FORMAT import za.co.absa.pramen.core.app.config.RuntimeConfig.UNDERCOVER @@ -194,7 +196,7 @@ class 
MetastoreImpl(appConfig: Config, MetastorePersistence.fromMetaTable(mt, appConfig, batchId = batchId).getStats(infoDate, onlyForCurrentBatchId = false) } - override def getMetastoreReader(tables: Seq[String], infoDate: LocalDate, isIncremental: Boolean): MetastoreReader = { + override def getMetastoreReader(tables: Seq[String], infoDate: LocalDate, runReason: TaskRunReason, isIncremental: Boolean): MetastoreReader = { val metastore = this new MetastoreReader { @@ -232,6 +234,12 @@ class MetastoreImpl(appConfig: Config, metastore.isDataAvailable(tableName, fromDate, untilDate) } + override def getOffsets(table: String, infoDate: LocalDate): Array[DataOffset] = { + val om = bookkeeper.getOffsetManager + + om.getOffsets(table, infoDate) + } + override def getTableDef(tableName: String): MetaTableDef = { validateTable(tableName) @@ -245,6 +253,8 @@ class MetastoreImpl(appConfig: Config, ) } + override def getRunReason: TaskRunReason = runReason + override def metadataManager: MetadataManager = metadata private def validateTable(tableName: String): Unit = { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceParquet.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceParquet.scala index 531f16009..b26c302ef 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceParquet.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceParquet.scala @@ -98,12 +98,13 @@ class MetastorePersistenceParquet(path: String, val stats = getStats(infoDate, isAppend) - if (isAppend) { - log.info(s"$SUCCESS Successfully saved ${stats.recordCountAppended.get} records (new count: ${stats.recordCount}, " + - s"new size: ${StringUtils.prettySize(stats.dataSizeBytes.get)}) to $outputDir") - } else { - log.info(s"$SUCCESS Successfully saved ${stats.recordCount} records " + - s"(${StringUtils.prettySize(stats.dataSizeBytes.get)}) to $outputDir") + stats.recordCountAppended match { + case Some(recordsAppended) => + log.info(s"$SUCCESS Successfully saved $recordsAppended records (new count: ${stats.recordCount}, " + + s"new size: ${StringUtils.prettySize(stats.dataSizeBytes.get)}) to $outputDir") + case None => + log.info(s"$SUCCESS Successfully saved ${stats.recordCount} records " + + s"(${StringUtils.prettySize(stats.dataSizeBytes.get)}) to $outputDir") } stats @@ -118,7 +119,7 @@ class MetastorePersistenceParquet(path: String, val size = fsUtils.getDirectorySize(outputDirStr) - if (onlyForCurrentBatchId) { + if (onlyForCurrentBatchId && df.schema.exists(_.name.equalsIgnoreCase(batchIdColumn))) { val batchCount = df.filter(col(batchIdColumn) === batchId).count() val countAll = df.count() diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala index a8a487f71..4a30ae5e4 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.types.StringType import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} import za.co.absa.pramen.api.offset.OffsetValue import za.co.absa.pramen.api.status.{DependencyWarning, TaskRunReason} -import za.co.absa.pramen.api.{Query, Reason, Source, SourceResult} +import 
za.co.absa.pramen.api.{Reason, Source} import za.co.absa.pramen.core.bookkeeper.Bookkeeper import za.co.absa.pramen.core.bookkeeper.model.DataOffsetAggregated import za.co.absa.pramen.core.metastore.Metastore @@ -51,12 +51,13 @@ class IncrementalIngestionJob(operationDef: OperationDef, override def trackDays: Int = 0 override def preRunCheckJob(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config, dependencyWarnings: Seq[DependencyWarning]): JobPreRunResult = { - if (source.hasInfoDateColumn(sourceTable.query)) { + val hasInfoDateColumn = source.hasInfoDateColumn(sourceTable.query) + if (hasInfoDateColumn && runReason == TaskRunReason.Rerun) { super.preRunCheckJob(infoDate, runReason, jobConfig, dependencyWarnings) } else { latestOffset match { case Some(offset) => - if (offset.maximumInfoDate.isAfter(infoDate)) { + if (offset.maximumInfoDate.isAfter(infoDate) && !hasInfoDateColumn) { JobPreRunResult(JobPreRunStatus.Skip("Retrospective runs are not allowed yet"), None, dependencyWarnings, Nil) } else { JobPreRunResult(JobPreRunStatus.Ready, None, dependencyWarnings, Nil) @@ -140,7 +141,7 @@ class IncrementalIngestionJob(operationDef: OperationDef, source.postProcess( sourceTable.query, outputTable.name, - metastore.getMetastoreReader(Seq(outputTable.name), infoDate, isIncremental = true), + metastore.getMetastoreReader(Seq(outputTable.name), infoDate, runReason, isIncremental = true), infoDate, operationDef.extraOptions ) @@ -155,21 +156,4 @@ class IncrementalIngestionJob(operationDef: OperationDef, SaveResult(stats, warnings = tooLongWarnings) } - - private def getSourcingResult(infoDate: LocalDate): SourceResult = { - val (from, to) = getInfoDateRange(infoDate, sourceTable.rangeFromExpr, sourceTable.rangeToExpr) - - getData(source, sourceTable.query, from, to) - } - - private def getData(source: Source, query: Query, from: LocalDate, to: LocalDate): SourceResult = { - val sourceResult = if (sourceTable.transformations.isEmpty && sourceTable.filters.isEmpty) - source.getData(query, from, to, sourceTable.columns) // push down the projection - else - source.getData(query, from, to, Seq.empty[String]) // column selection and order will be applied later - - val sanitizedDf = sanitizeDfColumns(sourceResult.data, specialCharacters) - - sourceResult.copy(data = sanitizedDf) - } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala index 24760aed1..5d8b14e41 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala @@ -170,7 +170,7 @@ class IngestionJob(operationDef: OperationDef, source.postProcess( sourceTable.query, outputTable.name, - metastore.getMetastoreReader(Seq(outputTable.name), infoDate, isIncremental = false), + metastore.getMetastoreReader(Seq(outputTable.name), infoDate, runReason, isIncremental = false), infoDate, operationDef.extraOptions ) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala index 3b699902d..0a8ef3029 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala @@ -110,22 +110,52 @@ abstract class JobBase(operationDef: OperationDef, } protected def preRunTransformationCheck(infoDate: LocalDate, 
dependencyWarnings: Seq[DependencyWarning]): JobPreRunResult = { - validateTransformationAlreadyRanCases(infoDate, dependencyWarnings) match { - case Some(result) => result - case None => JobPreRunResult(JobPreRunStatus.Ready, None, dependencyWarnings, Seq.empty[String]) + if (isIncremental) { + JobPreRunResult(JobPreRunStatus.Ready, None, dependencyWarnings, Seq.empty[String]) + } else { + validateTransformationAlreadyRanCases(infoDate, dependencyWarnings) match { + case Some(result) => result + case None => JobPreRunResult(JobPreRunStatus.Ready, None, dependencyWarnings, Seq.empty[String]) + } } } protected def validateTransformationAlreadyRanCases(infoDate: LocalDate, dependencyWarnings: Seq[DependencyWarning]): Option[JobPreRunResult] = { - if (!isIncremental && bookkeeper.getLatestDataChunk(outputTableDef.name, infoDate, infoDate).isDefined) { - log.info(s"Job for table ${outputTableDef.name} as already ran for $infoDate.") - Some(JobPreRunResult(JobPreRunStatus.AlreadyRan, None, dependencyWarnings, Seq.empty[String])) - } else { - log.info(s"Job for table ${outputTableDef.name} has not yet ran $infoDate.") - None + bookkeeper.getLatestDataChunk(outputTableDef.name, infoDate, infoDate) match { + case Some(chunk) => + val outOfDateTables = getOutdatedTables(infoDate, chunk.jobFinished) + if (outOfDateTables.nonEmpty) { + log.info(s"Job for table ${outputTableDef.name} has already run for $infoDate, but has outdated tables: ${outOfDateTables.mkString(", ")}") + Some(JobPreRunResult(JobPreRunStatus.NeedsUpdate, None, dependencyWarnings, Seq.empty[String])) + } else { + log.info(s"Job for table ${outputTableDef.name} has already run for $infoDate.") + Some(JobPreRunResult(JobPreRunStatus.AlreadyRan, None, dependencyWarnings, Seq.empty[String])) + } + case None => + log.info(s"Job for table ${outputTableDef.name} has not yet run for $infoDate.") + None } } + private def getOutdatedTables(infoDate: LocalDate, targetJobFinishedSeconds: Long): Seq[String] = { + operationDef.dependencies + .filter(d => !d.isOptional && !d.isPassive) + .flatMap(_.tables) + .distinct + .filter { table => + bookkeeper.getLatestDataChunk(table, infoDate, infoDate) match { + case Some(chunk) if chunk.jobFinished >= targetJobFinishedSeconds => + log.warn(s"The dependent table '$table' has been updated at ${Instant.ofEpochSecond(chunk.jobFinished)} retrospectively " + + s"after the transformation at ${Instant.ofEpochSecond(targetJobFinishedSeconds)}.") + true + case _ => + false + } + } + } + protected def checkDependency(dep: MetastoreDependency, infoDate: LocalDate): Option[DependencyFailure] = { val evaluator = new DateExprEvaluator evaluator.setValue("infoDate", infoDate) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala index 5b41fbf69..a75cca9c1 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala @@ -121,7 +121,7 @@ class SinkJob(operationDef: OperationDef, try { val sinkResult = sink.send(df, sinkTable.metaTableName, - metastore.getMetastoreReader(List(sinkTable.metaTableName) ++ inputTables, infoDate, isIncremental), + metastore.getMetastoreReader(List(sinkTable.metaTableName) ++ inputTables, infoDate, runReason, isIncremental), infoDate, sinkTable.options ) diff --git
a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala index a824a5345..366045300 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala @@ -50,11 +50,11 @@ class TransformationJob(operationDef: OperationDef, } override def validate(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config): Reason = { - transformer.validate(metastore.getMetastoreReader(inputTables, infoDate, isIncremental), infoDate, operationDef.extraOptions) + transformer.validate(metastore.getMetastoreReader(inputTables, infoDate, runReason, isIncremental), infoDate, operationDef.extraOptions) } override def run(infoDate: LocalDate, runReason: TaskRunReason, conf: Config): RunResult = { - RunResult(transformer.run(metastore.getMetastoreReader(inputTables, infoDate, isIncremental), infoDate, operationDef.extraOptions)) + RunResult(transformer.run(metastore.getMetastoreReader(inputTables, infoDate, runReason, isIncremental), infoDate, operationDef.extraOptions)) } def postProcessing(df: DataFrame, @@ -77,7 +77,7 @@ class TransformationJob(operationDef: OperationDef, try { transformer.postProcess( outputTable.name, - metastore.getMetastoreReader(inputTables :+ outputTable.name, infoDate, isIncremental), + metastore.getMetastoreReader(inputTables :+ outputTable.name, infoDate, runReason, isIncremental), infoDate, operationDef.extraOptions ) } catch { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderSpark.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderSpark.scala index 72e8bda7f..29f36c57a 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderSpark.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderSpark.scala @@ -69,16 +69,25 @@ class TableReaderSpark(formatOpt: Option[String], override def getIncrementalData(query: Query, minOffset: OffsetValue, infoDateOpt: Option[LocalDate], columns: Seq[String]): DataFrame = { val offsetInfo = offsetInfoOpt.getOrElse(throw new IllegalArgumentException(s"Offset column and type is not defined for ${query.query}.")) infoDateOpt match { - case Some(infoDate) => + case Some(infoDate) if hasInfoDateColumn => getData(query, infoDate, infoDate, columns) .filter(col(offsetInfo.offsetColumn) > minOffset.getSparkLit) - case None => + case _ => getBaseDataFrame(query) .filter(col(offsetInfo.offsetColumn) > minOffset.getSparkLit) } } - override def getIncrementalDataRange(query: Query, minOffset: OffsetValue, maxOffset: OffsetValue, infoDateOpt: Option[LocalDate], columns: Seq[String]): DataFrame = ??? 
+ override def getIncrementalDataRange(query: Query, minOffset: OffsetValue, maxOffset: OffsetValue, infoDateOpt: Option[LocalDate], columns: Seq[String]): DataFrame = { + val offsetInfo = offsetInfoOpt.getOrElse(throw new IllegalArgumentException(s"Offset column and type is not defined for ${query.query}.")) + infoDateOpt match { + case Some(infoDate) if hasInfoDateColumn => + getData(query, infoDate, infoDate, columns) + case _ => + getBaseDataFrame(query) + .filter(col(offsetInfo.offsetColumn) > minOffset.getSparkLit && col(offsetInfo.offsetColumn) <= maxOffset.getSparkLit) + } + } private[core] def getDailyDataFrame(query: Query, infoDate: LocalDate): DataFrame = { val dateStr = dateFormatter.format(infoDate) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyUtils.scala index a62c300dd..070c7d7b5 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyUtils.scala @@ -214,7 +214,7 @@ object ScheduleStrategyUtils { dependency.tables.foldLeft(false)((acc, table) => { bookkeeper.getLatestDataChunk(table, dateFrom, dateTo) match { case Some(dependencyUpdated) => - val isUpdatedRetrospectively = dependencyUpdated.jobFinished > lastUpdated.jobFinished + val isUpdatedRetrospectively = dependencyUpdated.jobFinished >= lastUpdated.jobFinished if (isUpdatedRetrospectively) { log.warn(s"Input table '$table' has updated retrospectively${renderPeriod(Option(dateFrom), Option(dateTo))}. " + s"Adding '$outputTable' to rerun for $infoDate.") diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/source/SparkSource.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/source/SparkSource.scala index 2cf4d4f47..b5277a65a 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/source/SparkSource.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/source/SparkSource.scala @@ -103,7 +103,15 @@ class SparkSource(val format: Option[String], SourceResult(df, filesRead) } - override def getIncrementalDataRange(query: Query, minOffset: OffsetValue, maxOffset: OffsetValue, infoDate: Option[LocalDate], columns: Seq[String]): SourceResult = ??? 
+ override def getIncrementalDataRange(query: Query, minOffset: OffsetValue, maxOffset: OffsetValue, infoDate: Option[LocalDate], columns: Seq[String]): SourceResult = { + val reader = getReader(query) + + val df = reader.getIncrementalDataRange(query, minOffset, maxOffset, infoDate, columns) + + val filesRead = getFilesRead(query, df) + + SourceResult(df, filesRead) + } private def getFilesRead(query: Query, df: DataFrame): Seq[String] = { query match { diff --git a/pramen/core/src/test/resources/test/config/incremental_pipeline.conf b/pramen/core/src/test/resources/test/config/incremental_pipeline.conf index 5804d02cd..cc4443880 100644 --- a/pramen/core/src/test/resources/test/config/incremental_pipeline.conf +++ b/pramen/core/src/test/resources/test/config/incremental_pipeline.conf @@ -79,7 +79,7 @@ pramen.operations = [ type = "transformation" class = "za.co.absa.pramen.core.transformers.IdentityTransformer" - schedule.type = "incremental" + schedule.type = ${transformer.schedule} output.table = "table2" @@ -87,7 +87,6 @@ pramen.operations = [ { tables = [ table1 ] date.from = "@infoDate" - optional = true # Since no bookkeeping available the table will be seen as empty for the dependency manager } ] diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/RuntimeConfigSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/RuntimeConfigSuite.scala index 7e7cf21f6..49489a7f4 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/RuntimeConfigSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/RuntimeConfigSuite.scala @@ -34,7 +34,6 @@ class RuntimeConfigSuite extends AnyWordSpec { | } | undercover = true | use.lock = false - | track.updates = false | check.only.late.data = true | check.only.new.data = true | email.if.no.changes = false diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineSuite.scala index d326d41f3..4fcb6e1e5 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineSuite.scala @@ -69,7 +69,7 @@ class IncrementalPipelineSuite extends AnyWordSpec |""".stripMargin "work end to end as a normal run" in { - withTempDirectory("integration_file_based") { tempDir => + withTempDirectory("incremental1") { tempDir => val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) @@ -112,15 +112,70 @@ class IncrementalPipelineSuite extends AnyWordSpec } } + "work with incremental ingestion and normal transformer" in { + withTempDirectory("incremental1") { tempDir => + val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) + + val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) + val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) + fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") + + val conf = getConfig(tempDir, isTransformerIncremental = false) + + val exitCode1 = AppRunner.runPipeline(conf) + assert(exitCode1 == 0) + + val table1Path = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") + val table2Path = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") + val dfTable1Before = spark.read.parquet(table1Path.toString) + val dfTable2Before = 
spark.read.parquet(table2Path.toString) + val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + compareText(actualTable1Before, expected1) + compareText(actualTable2Before, expected1) + + fsUtils.deleteFile(path1) + fsUtils.writeFile(path2, "id,name\n4,Mary\n5,Jane\n6,Kate\n") + + val exitCode2 = AppRunner.runPipeline(conf) + assert(exitCode2 == 0) + + val dfTable1After = spark.read.parquet(table1Path.toString) + val dfTable2After = spark.read.parquet(table2Path.toString) + + val batchIds = dfTable1After.select("pramen_batchid").distinct().collect() + + assert(batchIds.length == 2) + + val actualTable1After = dfTable1After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2After = dfTable2After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + compareText(actualTable1After, expected2) + compareText(actualTable2After, expected2) + } + } + "work end to end as rerun" in { } + + "fail to run for a historical date range" in { + + } + + "deal with uncommitted changes" in { + + } } "For inputs with information date the pipeline" should { "work end to end as a normal run" in { } + "work with incremental ingestion and normal transformer" in { + } + "work end to end as rerun" in { } @@ -138,15 +193,17 @@ class IncrementalPipelineSuite extends AnyWordSpec } } - def getConfig(basePath: String, isRerun: Boolean = false, useDataFrame: Boolean = false): Config = { + def getConfig(basePath: String, isRerun: Boolean = false, useDataFrame: Boolean = false, isTransformerIncremental: Boolean = true): Config = { val configContents = ResourceUtils.getResourceString("/test/config/incremental_pipeline.conf") val basePathEscaped = basePath.replace("\\", "\\\\") + val transformerSchedule = if (isTransformerIncremental) "incremental" else "daily" val conf = ConfigFactory.parseString( s"""base.path = "$basePathEscaped" |use.dataframe = $useDataFrame |pramen.runtime.is.rerun = $isRerun |pramen.current.date = "$infoDate" + |transformer.schedule = "$transformerSchedule" | |pramen.bookkeeping.jdbc { | driver = "$driver" diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala index 79b507bbc..bef060d6d 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.functions.lit import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{AnalysisException, DataFrame} import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.pramen.api.status.TaskRunReason import za.co.absa.pramen.api.{CachePolicy, DataFormat} import za.co.absa.pramen.core.OperationDefFactory import za.co.absa.pramen.core.app.config.InfoDateConfig @@ -389,7 +390,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF m.saveTable("table1", infoDate, getDf) - val reader = m.getMetastoreReader("table1" :: Nil, infoDate, isIncremental = false) + val reader = m.getMetastoreReader("table1" :: Nil, infoDate, TaskRunReason.New, isIncremental = false) val df1 = reader.getTable("table1", Some(infoDate), Some(infoDate)) @@ -403,7 +404,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF 
m.saveTable("table1", infoDate, getDf) - val reader = m.getMetastoreReader("table2" :: Nil, infoDate, isIncremental = false) + val reader = m.getMetastoreReader("table2" :: Nil, infoDate, TaskRunReason.New, isIncremental = false) val ex = intercept[TableNotConfigured] { reader.getTable("table1", Some(infoDate), Some(infoDate)) @@ -419,7 +420,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF m.saveTable("table1", infoDate, getDf) - val reader = m.getMetastoreReader("table1" :: Nil, infoDate, isIncremental = false) + val reader = m.getMetastoreReader("table1" :: Nil, infoDate, TaskRunReason.New, isIncremental = false) val runInfo1 = reader.getTableRunInfo("table1", infoDate) val runInfo2 = reader.getTableRunInfo("table1", infoDate.plusDays(1)) @@ -437,7 +438,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF m.saveTable("table1", infoDate, getDf) - val reader = m.getMetastoreReader("table1" :: Nil, infoDate, isIncremental = false) + val reader = m.getMetastoreReader("table1" :: Nil, infoDate, TaskRunReason.New, isIncremental = false) val metadataManager = reader.metadataManager metadataManager.setMetadata("table1", infoDate, "key1", "value1") @@ -455,7 +456,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF m.saveTable("table1", infoDate, getDf) m.saveTable("table1", infoDate.plusDays(1), getDf) - val reader = m.getMetastoreReader("table1" :: "table2" :: Nil, infoDate.plusDays(10), isIncremental = false) + val reader = m.getMetastoreReader("table1" :: "table2" :: Nil, infoDate.plusDays(10), TaskRunReason.New, isIncremental = false) val date1 = reader.getLatestAvailableDate("table1") val date2 = reader.getLatestAvailableDate("table1", Some(infoDate)) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala index 85a572e18..d4bd0e8ca 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala @@ -17,7 +17,9 @@ package za.co.absa.pramen.core.mocks.metastore import org.apache.spark.sql.DataFrame -import za.co.absa.pramen.api.{DataFormat, MetaTableDef, MetaTableRunInfo, MetadataManager, MetastoreReader} +import za.co.absa.pramen.api.offset.DataOffset +import za.co.absa.pramen.api.status.TaskRunReason +import za.co.absa.pramen.api._ import za.co.absa.pramen.core.metadata.MetadataManagerNull import java.time.LocalDate @@ -52,6 +54,8 @@ class MetastoreReaderMock(tables: Seq[(String, DataFrame)], infoDate: LocalDate) tables.exists(_._1 == tableName) } + override def getOffsets(table: String, infoDate: LocalDate): Array[DataOffset] = Array.empty + override def getTableDef(tableName: String): MetaTableDef = { tables.find(_._1 == tableName) match { case Some((name, _)) => MetaTableDef(name, "", DataFormat.Null(), "pramen_info_date", "yyyy-MM-dd", "pramen_batchid", None, None, null, Map.empty[String, String], Map.empty[String, String]) @@ -62,4 +66,6 @@ class MetastoreReaderMock(tables: Seq[(String, DataFrame)], infoDate: LocalDate) override def getTableRunInfo(tableName: String, infoDate: LocalDate): Option[MetaTableRunInfo] = None override def metadataManager: MetadataManager = metadata + + override def getRunReason: TaskRunReason = TaskRunReason.New } diff --git 
a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala index 547a0a37f..1c807bb63 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala @@ -18,6 +18,8 @@ package za.co.absa.pramen.core.mocks.metastore import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, SaveMode} +import za.co.absa.pramen.api.offset.DataOffset +import za.co.absa.pramen.api.status.TaskRunReason import za.co.absa.pramen.api.{MetaTableDef, MetaTableRunInfo, MetadataManager, MetastoreReader} import za.co.absa.pramen.core.metadata.MetadataManagerNull import za.co.absa.pramen.core.metastore.model.MetaTable @@ -103,7 +105,7 @@ class MetastoreSpy(registeredTables: Seq[String] = Seq("table1", "table2"), stats } - override def getMetastoreReader(tables: Seq[String], infoDate: LocalDate, isIncremental: Boolean): MetastoreReader = { + override def getMetastoreReader(tables: Seq[String], infoDate: LocalDate, taskRunReason: TaskRunReason, isIncremental: Boolean): MetastoreReader = { val metastore = this new MetastoreReader { @@ -136,6 +138,8 @@ class MetastoreSpy(registeredTables: Seq[String] = Seq("table1", "table2"), metastore.isDataAvailable(tableName, fromDate, untilDate) } + override def getOffsets(table: String, infoDate: LocalDate): Array[DataOffset] = Array.empty + override def getTableDef(tableName: String): MetaTableDef = { validateTable(tableName) @@ -156,6 +160,8 @@ class MetastoreSpy(registeredTables: Seq[String] = Seq("table1", "table2"), override def getTableRunInfo(tableName: String, infoDate: LocalDate): Option[MetaTableRunInfo] = None + override def getRunReason: TaskRunReason = TaskRunReason.New + override def metadataManager: MetadataManager = metadataManagerMock private def validateTable(tableName: String): Unit = { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/transformers/IdentityTransformerSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/transformers/IdentityTransformerSuite.scala index 26783c2c3..7e3facee9 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/transformers/IdentityTransformerSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/transformers/IdentityTransformerSuite.scala @@ -36,7 +36,7 @@ class IdentityTransformerSuite extends AnyWordSpec with SparkTestBase with TextC "validate()" should { "pass when the mandatory option is present" in { - val (transformer, metastore) = getUseCase + val (transformer, metastore) = getUseCase() val outcome = transformer.validate(metastore, infoDateWithData, Map("input.table" -> "table1")) @@ -44,7 +44,7 @@ class IdentityTransformerSuite extends AnyWordSpec with SparkTestBase with TextC } "pass when the legacy mandatory option is present" in { - val (transformer, metastore) = getUseCase + val (transformer, metastore) = getUseCase() val outcome = transformer.validate(metastore, infoDateWithData, Map("table" -> "table1")) @@ -52,7 +52,7 @@ class IdentityTransformerSuite extends AnyWordSpec with SparkTestBase with TextC } "pass when empty is allowed" in { - val (transformer, metastore) = getUseCase + val (transformer, metastore) = getUseCase(true) val outcome = transformer.validate(metastore, infoDateWithEmptyDf, Map("table" -> "table1", "empty.allowed" -> "true")) @@ -60,7 +60,7 @@ class IdentityTransformerSuite extends AnyWordSpec 
with SparkTestBase with TextC } "return SkipOnce when empty is not allowed" in { - val (transformer, metastore) = getUseCase + val (transformer, metastore) = getUseCase(true) val outcome = transformer.validate(metastore, infoDateWithEmptyDf, Map("table" -> "table1", "empty.allowed" -> "false")) @@ -68,7 +68,7 @@ class IdentityTransformerSuite extends AnyWordSpec with SparkTestBase with TextC } "fail when the mandatory option is absent" in { - val (transformer, metastore) = getUseCase + val (transformer, metastore) = getUseCase() val ex = intercept[IllegalArgumentException] { transformer.validate(metastore, infoDateWithData, Map.empty) @@ -89,7 +89,7 @@ class IdentityTransformerSuite extends AnyWordSpec with SparkTestBase with TextC ||C |3 | |+---+---+ |""".stripMargin - val (transformer, metastore) = getUseCase + val (transformer, metastore) = getUseCase() val outcome = transformer.run(metastore, infoDateWithData, Map("table" -> "table1")) .orderBy("a") @@ -100,12 +100,16 @@ class IdentityTransformerSuite extends AnyWordSpec with SparkTestBase with TextC } } - def getUseCase: (IdentityTransformer, MetastoreReader) = { + def getUseCase(isEmptyToday: Boolean = false): (IdentityTransformer, MetastoreReader) = { val metastoreReadeMock = mock(classOf[MetastoreReader]) whenMock(metastoreReadeMock.getTable("table1")).thenReturn(exampleDf) whenMock(metastoreReadeMock.getTable("table1", Some(infoDateWithData), Some(infoDateWithData))).thenReturn(exampleDf) whenMock(metastoreReadeMock.getTable("table1", Some(infoDateWithEmptyDf), Some(infoDateWithEmptyDf))).thenReturn(emptyDf) + if (isEmptyToday) + whenMock(metastoreReadeMock.getCurrentBatch("table1")).thenReturn(emptyDf) + else + whenMock(metastoreReadeMock.getCurrentBatch("table1")).thenReturn(exampleDf) (new IdentityTransformer(), metastoreReadeMock) }
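Usage note (illustrative, not part of the change set): a minimal sketch of how a transformer might use the new MetastoreReader.getOffsets and getRunReason methods introduced above, for example to perform a once-per-info-date action only on the first micro-batch or on a forced re-run. The transformer class, the table name "table1", and the placeholder action are hypothetical; only the MetastoreReader, DataOffset, and TaskRunReason APIs shown in this diff are assumed.

package com.example.transformers

import org.apache.spark.sql.DataFrame
import za.co.absa.pramen.api.status.TaskRunReason
import za.co.absa.pramen.api.{MetastoreReader, Reason, Transformer}

import java.time.LocalDate

// Hypothetical transformer illustrating the new MetastoreReader methods.
class PartitionAwareTransformer extends Transformer {
  override def validate(metastore: MetastoreReader, infoDate: LocalDate, options: Map[String, String]): Reason =
    Reason.Ready

  override def run(metastore: MetastoreReader, infoDate: LocalDate, options: Map[String, String]): DataFrame = {
    val df = metastore.getTable("table1", Some(infoDate), Some(infoDate))

    // Offsets with committedAt defined correspond to micro-batches already committed for this info date.
    val committedOffsets = metastore.getOffsets("table1", infoDate).filter(_.committedAt.nonEmpty)

    val isFirstMicroBatch = committedOffsets.isEmpty
    val isRerun = metastore.getRunReason == TaskRunReason.Rerun

    if (isFirstMicroBatch || isRerun) {
      // A once-per-info-date side effect (e.g. registering a Hive partition) would go here.
    }

    df
  }
}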