Skip to content

Commit

Permalink
#374 Fix the way retrospective updates are determined.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Sep 30, 2024
1 parent 7da2627 commit 6f13121
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,9 @@ class MetastoreImpl(appConfig: Config,

val finish = Instant.now.getEpochSecond

if (!skipBookKeepingUpdates) {
val nothingAppended = stats.recordCountAppended.contains(0)

if (!skipBookKeepingUpdates && !nothingAppended) {
bookkeeper.setRecordCount(tableName, infoDate, infoDate, infoDate, inputRecordCount.getOrElse(stats.recordCount), stats.recordCount, start, finish, isTransient)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -680,8 +680,12 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
} else {
if (status.warnings.nonEmpty)
TextElement("Warning", style)
else
TextElement("Success", style)
else {
if (status.recordsAppended.contains(0))
TextElement("No new data", style)
else
TextElement("Success", style)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import za.co.absa.pramen.core.metastore.Metastore
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.schedule.Schedule
import za.co.absa.pramen.core.utils.Emoji._
import za.co.absa.pramen.core.utils.TimeUtils
import za.co.absa.pramen.core.utils.{Emoji, TimeUtils}

import java.time.{Instant, LocalDate}
import scala.util.{Failure, Success, Try}
Expand Down Expand Up @@ -126,7 +126,8 @@ abstract class JobBase(operationDef: OperationDef,
val outOfDateTables = getOutdatedTables(infoDate, chunk.jobFinished)
if (outOfDateTables.nonEmpty) {
log.info(s"Job for table ${outputTableDef.name} as already ran for $infoDate, but has outdated tables: ${outOfDateTables.mkString(", ")}")
Some(JobPreRunResult(JobPreRunStatus.NeedsUpdate, None, dependencyWarnings, Seq.empty[String]))
val warning = s"Based on outdated tables: ${outOfDateTables.mkString(", ")}"
Some(JobPreRunResult(JobPreRunStatus.NeedsUpdate, None, dependencyWarnings, Seq(warning)))
} else {
log.info(s"Job for table ${outputTableDef.name} as already ran for $infoDate.")
Some(JobPreRunResult(JobPreRunStatus.AlreadyRan, None, dependencyWarnings, Seq.empty[String]))
Expand All @@ -143,12 +144,12 @@ abstract class JobBase(operationDef: OperationDef,
.flatMap(_.tables)
.distinct
.filter { table =>
bookkeeper.getLatestDataChunk(outputTableDef.name, infoDate, infoDate) match {
bookkeeper.getLatestDataChunk(table, infoDate, infoDate) match {
case Some(chunk) if chunk.jobFinished >= targetJobFinishedSeconds =>
log.warn(s"The dependent table '$table' has been updated at ${Instant.ofEpochSecond(chunk.jobFinished)} retrospectively " +
log.warn(s"${Emoji.WARNING} The dependent table '$table' has been updated at ${Instant.ofEpochSecond(chunk.jobFinished)} retrospectively " +
s"after the transformation at ${Instant.ofEpochSecond(targetJobFinishedSeconds)} .")
true
case Some(chunk) =>
case Some(_) =>
false
}
}
Expand Down

0 comments on commit 6f13121

Please sign in to comment.