From 858138fa8ef4086566a7d23af8e4f98359bcec3b Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Fri, 27 Sep 2024 12:15:00 +0200 Subject: [PATCH] #374 Improve email notifications for incremental operations. --- .../co/absa/pramen/api/status/RunStatus.scala | 1 + .../PipelineNotificationBuilderHtml.scala | 13 ++- .../pipeline/IncrementalIngestionJob.scala | 98 +++++++++---------- .../core/reader/TableReaderJdbcNative.scala | 4 +- .../ScheduleStrategyIncremental.scala | 32 +++++- .../core/runner/task/TaskRunnerBase.scala | 1 + .../pramen/core/TaskNotificationFactory.scala | 2 +- .../TransientJobManagerSuite.scala | 2 +- .../pramen/core/mocks/RunStatusFactory.scala | 2 + .../pramen/core/mocks/TaskResultFactory.scala | 2 +- .../pramen/core/mocks/TestPrototypes.scala | 4 +- .../mocks/runner/ConcurrentJobRunnerSpy.scala | 2 +- .../tests/journal/TaskCompletedSuite.scala | 2 +- ...PipelineNotificationBuilderHtmlSuite.scala | 11 +++ .../tests/runner/task/RunStatusSuite.scala | 8 +- .../pramen/extras/mocks/TestPrototypes.scala | 2 +- 16 files changed, 110 insertions(+), 76 deletions(-) diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/status/RunStatus.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/status/RunStatus.scala index 15d579494..da39e6d61 100644 --- a/pramen/api/src/main/scala/za/co/absa/pramen/api/status/RunStatus.scala +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/status/RunStatus.scala @@ -25,6 +25,7 @@ sealed trait RunStatus { object RunStatus { case class Succeeded(recordCountOld: Option[Long], recordCount: Long, + recordsAppended: Option[Long], sizeBytes: Option[Long], reason: TaskRunReason, filesRead: Seq[String], diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala index d5bc35ab4..571e15c99 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala @@ -25,7 +25,6 @@ import za.co.absa.pramen.api.{FieldChange, SchemaDifference} import za.co.absa.pramen.core.config.Keys.TIMEZONE import za.co.absa.pramen.core.exceptions.{CmdFailedException, ProcessFailedException} import za.co.absa.pramen.core.notify.message._ -import za.co.absa.pramen.core.notify.pipeline.PipelineNotificationBuilderHtml.{MIN_MEGABYTES, MIN_RPS_JOB_DURATION_SECONDS, MIN_RPS_RECORDS} import za.co.absa.pramen.core.utils.JvmUtils.getShortExceptionDescription import za.co.absa.pramen.core.utils.StringUtils.renderThrowable import za.co.absa.pramen.core.utils.{BuildPropertyUtils, ConfigUtils, StringUtils, TimeUtils} @@ -545,7 +544,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot } private[core] def getRecordCountText(task: TaskResult): String = { - def renderDifference(numRecords: Long, numRecordsOld: Option[Long]): String = { + def renderDifference(numRecords: Long, numRecordsOld: Option[Long], numRecordsAppended: Option[Long]): String = { numRecordsOld match { case Some(old) if old > 0 => val diff = numRecords - old @@ -556,7 +555,11 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot else { numRecords.toString } - case _ => numRecords.toString + case _ => + numRecordsAppended match { + case Some(n) => s"$numRecords (+$n)" + case None => numRecords.toString + } } } @@ -564,8 +567,8 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot "-" } else { task.runStatus match { - case s: Succeeded => renderDifference(s.recordCount, s.recordCountOld) - case d: InsufficientData => renderDifference(d.actual, d.recordCountOld) + case s: Succeeded => renderDifference(s.recordCount, s.recordCountOld, s.recordsAppended) + case d: InsufficientData => renderDifference(d.actual, d.recordCountOld, None) case _ => "" } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala index 41f046e8e..c37e673f7 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala @@ -63,48 +63,42 @@ class IncrementalIngestionJob(operationDef: OperationDef, handleUncommittedOffsets(om, metastore, infoDate, uncommittedOffsets) } - val hasInfoDateColumn = source.hasInfoDateColumn(sourceTable.query) - if (hasInfoDateColumn && runReason == TaskRunReason.Rerun) { - super.preRunCheckJob(infoDate, runReason, jobConfig, dependencyWarnings) - } else { - JobPreRunResult(JobPreRunStatus.Ready, None, dependencyWarnings, Nil) - } + JobPreRunResult(JobPreRunStatus.Ready, None, dependencyWarnings, Nil) } override def validate(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config): Reason = { val hasInfoDate = source.hasInfoDateColumn(sourceTable.query) - if (source.getOffsetInfo.nonEmpty) { - if (runReason == TaskRunReason.Rerun) { - if (hasInfoDate) { - log.info(s"Rerunning ingestion to '${outputTable.name}' at '$infoDate'.") - Reason.Ready - } else { - val om = bookkeeper.getOffsetManager - - om.getMaxInfoDateAndOffset(outputTable.name, Option(infoDate)) match { - case Some(offsets) => - log.info(s"Rerunning ingestion to '${outputTable.name}' at '$infoDate' for ${offsets.minimumOffset.valueString} < offsets <= ${offsets.maximumOffset.valueString}.") - Reason.Ready - case None => - log.info(s"Offsets not found for '${outputTable.name}' at '$infoDate'.") - Reason.SkipOnce("No offsets registered") - } + val isReRun = runReason == TaskRunReason.Rerun + + if (source.getOffsetInfo.isEmpty) { + return Reason.NotReady(s"Offset column is not configured for source '$sourceName' of '${operationDef.name}'") + } + + (hasInfoDate, isReRun) match { + case (false, false) => + latestOffset match { + case Some(offset) if offset.maximumInfoDate.isAfter(infoDate) => + log.warn(s"Cannot run '${outputTable.name}' for '$infoDate' since offsets exists for ${offset.maximumInfoDate}.") + Reason.Skip("Incremental ingestion cannot be retrospective") + case _ => + Reason.Ready } - } else { - if (hasInfoDate) { - Reason.Ready - } else { - latestOffset match { - case Some(offset) if offset.maximumInfoDate.isAfter(infoDate) => - log.warn(s"Cannot run '${outputTable.name}' for '$infoDate' since offsets exists for ${offset.maximumInfoDate}.") - Reason.Skip("Incremental ingestion cannot be retrospective") - case _ => - Reason.Ready - } + case (false, true) => + val om = bookkeeper.getOffsetManager + + om.getMaxInfoDateAndOffset(outputTable.name, Option(infoDate)) match { + case Some(offsets) => + log.info(s"Rerunning ingestion to '${outputTable.name}' at '$infoDate' for ${offsets.minimumOffset.valueString} < offsets <= ${offsets.maximumOffset.valueString}.") + Reason.Ready + case None => + log.info(s"Offsets not found for '${outputTable.name}' at '$infoDate'.") + Reason.SkipOnce("No offsets registered") } - } - } else { - Reason.NotReady(s"Offset column is not configured for source '$sourceName' of '${operationDef.name}'") + case (true, false) => + Reason.Ready + case (true, true) => + log.info(s"Rerunning ingestion to '${outputTable.name}' at '$infoDate'.") + Reason.Ready } } @@ -116,24 +110,17 @@ class IncrementalIngestionJob(operationDef: OperationDef, } val hasInfoDate = source.hasInfoDateColumn(sourceTable.query) + val isReRun = runReason == TaskRunReason.Rerun - val sourceResult = if (hasInfoDate) { - if (runReason == TaskRunReason.Rerun) { - source.getData(sourceTable.query, infoDate, infoDate, columns) - } else { - val om = bookkeeper.getOffsetManager - val infoDateLatestOffset = om.getMaxInfoDateAndOffset(outputTable.name, Some(infoDate)) - infoDateLatestOffset match { + val sourceResult = (hasInfoDate, isReRun) match { + case (false, false) => + latestOffset match { case Some(maxOffset) => - log.info(s"Running ingestion to '${outputTable.name}' at '$infoDate' for offset > ${maxOffset.maximumOffset.valueString}.") - source.getDataIncremental(sourceTable.query, Option(infoDate), Option(maxOffset.maximumOffset), None, columns) + source.getDataIncremental(sourceTable.query, None, Option(maxOffset.maximumOffset), None, columns) case None => - log.info(s"Running ingestion to '${outputTable.name}' at '$infoDate' for all data available at the day.") source.getData(sourceTable.query, infoDate, infoDate, columns) } - } - } else { - if (runReason == TaskRunReason.Rerun) { + case (false, true) => val om = bookkeeper.getOffsetManager om.getMaxInfoDateAndOffset(outputTable.name, Option(infoDate)) match { @@ -142,14 +129,19 @@ class IncrementalIngestionJob(operationDef: OperationDef, case None => throw new IllegalStateException(s"No offsets for '${outputTable.name}' for '$infoDate'. Cannot rerun.") } - } else { - latestOffset match { + case (true, false) => + val om = bookkeeper.getOffsetManager + val infoDateLatestOffset = om.getMaxInfoDateAndOffset(outputTable.name, Some(infoDate)) + infoDateLatestOffset match { case Some(maxOffset) => - source.getDataIncremental(sourceTable.query, None, Option(maxOffset.maximumOffset), None, columns) + log.info(s"Running ingestion to '${outputTable.name}' at '$infoDate' for offset > ${maxOffset.maximumOffset.valueString}.") + source.getDataIncremental(sourceTable.query, Option(infoDate), Option(maxOffset.maximumOffset), None, columns) case None => + log.info(s"Running ingestion to '${outputTable.name}' at '$infoDate' for all data available at the day.") source.getData(sourceTable.query, infoDate, infoDate, columns) } - } + case (true, true) => + source.getData(sourceTable.query, infoDate, infoDate, columns) } val sanitizedDf = sanitizeDfColumns(sourceResult.data, specialCharacters) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbcNative.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbcNative.scala index b4a6bbc63..632764cbb 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbcNative.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbcNative.scala @@ -20,9 +20,9 @@ import com.typesafe.config.Config import org.apache.spark.sql.{DataFrame, SparkSession} import org.slf4j.LoggerFactory import za.co.absa.pramen.api.Query +import za.co.absa.pramen.api.offset.OffsetValue import za.co.absa.pramen.core.expr.DateExprEvaluator -import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetValue} -import za.co.absa.pramen.core.reader.model.{JdbcConfig, OffsetInfoParser, TableReaderJdbcConfig} +import za.co.absa.pramen.core.reader.model.{JdbcConfig, TableReaderJdbcConfig} import za.co.absa.pramen.core.utils.{JdbcNativeUtils, JdbcSparkUtils, StringUtils, TimeUtils} import java.time.{Instant, LocalDate} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyIncremental.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyIncremental.scala index 7197536af..b809514a7 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyIncremental.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyIncremental.scala @@ -40,13 +40,37 @@ class ScheduleStrategyIncremental(lastOffsets: Option[DataOffsetAggregated], has minimumDate: LocalDate ): Seq[TaskPreDef] = { val dates = params match { - case ScheduleParams.Normal(runDate, _, _, _, _) => + case ScheduleParams.Normal(runDate, trackDays, _, _, _) => val infoDate = evaluateRunDate(runDate, infoDateExpression) log.info(s"Normal run strategy: runDate=$runDate, infoDate=$infoDate") - val runInfoDays = if (hasInfoDateColumn) - Seq(TaskPreDef(infoDate.minusDays(1), TaskRunReason.New), TaskPreDef(infoDate, TaskRunReason.New)) - else { + val runInfoDays = if (hasInfoDateColumn) { + lastOffsets match { + case Some(lastOffset) => + if (lastOffset.maximumInfoDate.isBefore(infoDate)) { + val startDate = if (trackDays > 1) { + val trackDate = infoDate.minusDays(trackDays - 1) + val date = if (trackDate.isAfter(lastOffset.maximumInfoDate)) + trackDate + else + lastOffset.maximumInfoDate + log.warn(s"Last ran day: ${lastOffset.maximumInfoDate}. Tracking days = '$trackDate'. Catching up from '$date' until '$infoDate'.") + date + } else { + log.warn(s"Last ran day: ${lastOffset.maximumInfoDate}. Catching up data until '$infoDate'.") + lastOffset.maximumInfoDate + } + + val potentialDates = getInfoDateRange(startDate, infoDate, "@runDate", schedule) + potentialDates.map(date => { + TaskPreDef(date, TaskRunReason.New) + }) + } else { + Seq(TaskPreDef(infoDate, TaskRunReason.New)) + } + case None => Seq(TaskPreDef(infoDate.minusDays(1), TaskRunReason.New), TaskPreDef(infoDate, TaskRunReason.New)) + } + } else { lastOffsets match { case Some(offset) if offset.maximumInfoDate.isAfter(infoDate) => Seq.empty case _ => Seq(TaskPreDef(infoDate, TaskRunReason.New)) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala index 104278834..812d72efa 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala @@ -397,6 +397,7 @@ abstract class TaskRunnerBase(conf: Config, MetaTable.getMetaTableDef(task.job.outputTable), RunStatus.Succeeded(recordCountOldOpt, stats.recordCount, + stats.recordCountAppended, stats.dataSizeBytes, completionReason, runResult.filesRead, diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/TaskNotificationFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/TaskNotificationFactory.scala index 1e8920325..3174dc0d7 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/TaskNotificationFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/TaskNotificationFactory.scala @@ -29,7 +29,7 @@ object TaskNotificationFactory { Instant.ofEpochMilli(1613600000000L), Instant.ofEpochMilli(1672759508000L) )), - status: RunStatus = RunStatus.Succeeded(None, 100, None, TaskRunReason.New, Seq.empty, Seq.empty, Seq.empty, Seq.empty), + status: RunStatus = RunStatus.Succeeded(None, 100, None, None, TaskRunReason.New, Seq.empty, Seq.empty, Seq.empty, Seq.empty), applicationId: String = "app_12345", isTransient: Boolean = false, isRawFilesJob: Boolean = false, diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/TransientJobManagerSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/TransientJobManagerSuite.scala index f5f130105..044673bf8 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/TransientJobManagerSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/TransientJobManagerSuite.scala @@ -166,7 +166,7 @@ class TransientJobManagerSuite extends AnyWordSpec with BeforeAndAfterAll with S "runs a job via teh task runner and return the dataframe" in { val taskRunner = mock(classOf[TaskRunner]) val job = mock(classOf[Job]) - val successStatus = RunStatus.Succeeded(None, 1, None, TaskRunReason.OnRequest, Nil, Nil, Nil, Nil) + val successStatus = RunStatus.Succeeded(None, 1, None, None, TaskRunReason.OnRequest, Nil, Nil, Nil, Nil) whenMock(taskRunner.runLazyTask(job, infoDate)).thenReturn(successStatus) whenMock(job.outputTable).thenReturn(MetaTableFactory.getDummyMetaTable("table1", format = DataFormat.Transient(CachePolicy.NoCache))) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/RunStatusFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/RunStatusFactory.scala index 784f4840c..9d127a354 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/RunStatusFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/RunStatusFactory.scala @@ -21,6 +21,7 @@ import za.co.absa.pramen.api.status.{RunStatus, TaskRunReason} object RunStatusFactory { def getDummySuccess(recordCountOld: Option[Long] = None, recordCount: Long = 1000, + recordsAppended: Option[Long] = None, sizeBytes: Option[Long] = None, reason: TaskRunReason = TaskRunReason.New, filesRead: Seq[String] = Nil, @@ -29,6 +30,7 @@ object RunStatusFactory { warnings: Seq[String] = Nil): RunStatus.Succeeded = { RunStatus.Succeeded(recordCountOld, recordCount, + recordsAppended, sizeBytes, reason, filesRead, diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskResultFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskResultFactory.scala index c740e5d8b..5d5d2875c 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskResultFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskResultFactory.scala @@ -25,7 +25,7 @@ import java.time.{Instant, LocalDate} object TaskResultFactory { def getDummyTaskResult(jobName: String = "DummyJob", outputTable: MetaTableDef = MetaTable.getMetaTableDef(MetaTableFactory.getDummyMetaTable(name = "table_out")), - runStatus: RunStatus = RunStatus.Succeeded(Some(100), 200, Some(1000), TaskRunReason.New, Nil, Nil, Nil, Nil), + runStatus: RunStatus = RunStatus.Succeeded(Some(100), 200, None, Some(1000), TaskRunReason.New, Nil, Nil, Nil, Nil), runInfo: Option[RunInfo] = Some(RunInfo(LocalDate.of(2022, 2, 18), Instant.ofEpochSecond(1234), Instant.ofEpochSecond(5678))), applicationId: String = "app_123", isTransient: Boolean = false, diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TestPrototypes.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TestPrototypes.scala index 4764b9ff4..50633de6f 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TestPrototypes.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TestPrototypes.scala @@ -20,10 +20,10 @@ import za.co.absa.pramen.api.status.{RunStatus, TaskRunReason} object TestPrototypes { - val runStatusSuccess: RunStatus = RunStatus.Succeeded(Some(100), 200, Some(1000), TaskRunReason.New, Seq.empty, Seq.empty, Seq.empty, Seq.empty) + val runStatusSuccess: RunStatus = RunStatus.Succeeded(Some(100), 200, None, Some(1000), TaskRunReason.New, Seq.empty, Seq.empty, Seq.empty, Seq.empty) val runStatusWarning: RunStatus = RunStatus.Succeeded( - Some(10000), 20000, Some(100000), TaskRunReason.New, Seq("file1.txt", "file1.ctl"), + Some(10000), 20000, None, Some(100000), TaskRunReason.New, Seq("file1.txt", "file1.ctl"), Seq("file1.csv", "file2.csv"), Seq("`db`.`table1`"), Seq("Test warning") ) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/runner/ConcurrentJobRunnerSpy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/runner/ConcurrentJobRunnerSpy.scala index 52464e331..649e1ef97 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/runner/ConcurrentJobRunnerSpy.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/runner/ConcurrentJobRunnerSpy.scala @@ -68,7 +68,7 @@ class ConcurrentJobRunnerSpy(includeFails: Boolean = false, var idx = 0 incomingJobs.foreach(job => { val status = if (!includeFails || idx % 3 == 0) { - RunStatus.Succeeded(Some(1000), 500, Some(10000), TaskRunReason.New, Nil, Nil, Nil, Nil) + RunStatus.Succeeded(Some(1000), 500, None, Some(10000), TaskRunReason.New, Nil, Nil, Nil, Nil) } else if (idx % 3 == 1) { RunStatus.Failed(new RuntimeException("Dummy exception")) } else { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/TaskCompletedSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/TaskCompletedSuite.scala index d799e2f42..f646188a0 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/TaskCompletedSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/TaskCompletedSuite.scala @@ -50,7 +50,7 @@ class TaskCompletedSuite extends AnyWordSpec { val taskResult = TaskResult( job.name, MetaTable.getMetaTableDef(job.outputTable), - RunStatus.Succeeded(Some(1000), 2000, Some(3000), runReason, Nil, Nil, Nil, Nil), + RunStatus.Succeeded(Some(1000), 2000, None, Some(3000), runReason, Nil, Nil, Nil, Nil), Some(RunInfo(infoDate, now.minusSeconds(10), now)), "app_123", isTransient = false, diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala index b3210dcad..b755d2e2a 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala @@ -518,6 +518,17 @@ class PipelineNotificationBuilderHtmlSuite extends AnyWordSpec with TextComparis assert(actual == "90 (-10)") } + "work for successful appends" in { + val builder = getBuilder() + + val runStatus = RunStatusFactory.getDummySuccess(None, 110, Some(10), reason = TaskRunReason.Update) + val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus) + + val actual = builder.getRecordCountText(task) + + assert(actual == "110 (+10)") + } + "work for insufficient data" in { val builder = getBuilder() diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/RunStatusSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/RunStatusSuite.scala index 30d4595d0..1135cf968 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/RunStatusSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/RunStatusSuite.scala @@ -23,12 +23,12 @@ class RunStatusSuite extends AnyWordSpec { "toString" should { "Succeeded" when { "New" in { - val status = RunStatus.Succeeded(None, 0, None, TaskRunReason.New, Nil, Nil, Nil, Nil) + val status = RunStatus.Succeeded(None, 0, None, None, TaskRunReason.New, Nil, Nil, Nil, Nil) assert(status.toString == "New") } "Update" in { - val status = RunStatus.Succeeded(None, 0, None, TaskRunReason.Update, Nil, Nil, Nil, Nil) + val status = RunStatus.Succeeded(None, 0, None, None, TaskRunReason.Update, Nil, Nil, Nil, Nil) assert(status.toString == "Update") } @@ -85,7 +85,7 @@ class RunStatusSuite extends AnyWordSpec { "isFailure" should { "Succeeded" in { - val status = RunStatus.Succeeded(None, 0, None, TaskRunReason.New, Nil, Nil, Nil, Nil) + val status = RunStatus.Succeeded(None, 0, None, None, TaskRunReason.New, Nil, Nil, Nil, Nil) assert(!status.isFailure) } @@ -165,7 +165,7 @@ class RunStatusSuite extends AnyWordSpec { "getReason" should { "Succeeded" in { - val status = RunStatus.Succeeded(None, 0, None, TaskRunReason.New, Nil, Nil, Nil, Nil) + val status = RunStatus.Succeeded(None, 0, None, None, TaskRunReason.New, Nil, Nil, Nil, Nil) assert(status.getReason().isEmpty) } diff --git a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/mocks/TestPrototypes.scala b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/mocks/TestPrototypes.scala index 726bf2008..17a332af6 100644 --- a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/mocks/TestPrototypes.scala +++ b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/mocks/TestPrototypes.scala @@ -40,7 +40,7 @@ object TestPrototypes { Map.empty, Map.empty) - val taskStatus: RunStatus = RunStatus.Succeeded(None, 100, None, TaskRunReason.New, Seq.empty, Seq.empty, Seq.empty, Seq.empty) + val taskStatus: RunStatus = RunStatus.Succeeded(None, 100, None, None, TaskRunReason.New, Seq.empty, Seq.empty, Seq.empty, Seq.empty) val taskNotification: TaskResult = status.TaskResult( "Dummy Job",