Skip to content

Commit

Permalink
#374 Improve email notifications for incremental operations.
Browse files: browse the repository at this point in the history
  • Loading branch information
yruslan committed Sep 30, 2024
1 parent 8b3ed71 commit 858138f
Show file tree
Hide file tree
Showing 16 changed files with 110 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ sealed trait RunStatus {
object RunStatus {
case class Succeeded(recordCountOld: Option[Long],
recordCount: Long,
recordsAppended: Option[Long],
sizeBytes: Option[Long],
reason: TaskRunReason,
filesRead: Seq[String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import za.co.absa.pramen.api.{FieldChange, SchemaDifference}
import za.co.absa.pramen.core.config.Keys.TIMEZONE
import za.co.absa.pramen.core.exceptions.{CmdFailedException, ProcessFailedException}
import za.co.absa.pramen.core.notify.message._
import za.co.absa.pramen.core.notify.pipeline.PipelineNotificationBuilderHtml.{MIN_MEGABYTES, MIN_RPS_JOB_DURATION_SECONDS, MIN_RPS_RECORDS}
import za.co.absa.pramen.core.utils.JvmUtils.getShortExceptionDescription
import za.co.absa.pramen.core.utils.StringUtils.renderThrowable
import za.co.absa.pramen.core.utils.{BuildPropertyUtils, ConfigUtils, StringUtils, TimeUtils}
Expand Down Expand Up @@ -545,7 +544,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
}

private[core] def getRecordCountText(task: TaskResult): String = {
def renderDifference(numRecords: Long, numRecordsOld: Option[Long]): String = {
def renderDifference(numRecords: Long, numRecordsOld: Option[Long], numRecordsAppended: Option[Long]): String = {
numRecordsOld match {
case Some(old) if old > 0 =>
val diff = numRecords - old
Expand All @@ -556,16 +555,20 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
else {
numRecords.toString
}
case _ => numRecords.toString
case _ =>
numRecordsAppended match {
case Some(n) => s"$numRecords (+$n)"
case None => numRecords.toString
}
}
}

if (task.isRawFilesJob) {
"-"
} else {
task.runStatus match {
case s: Succeeded => renderDifference(s.recordCount, s.recordCountOld)
case d: InsufficientData => renderDifference(d.actual, d.recordCountOld)
case s: Succeeded => renderDifference(s.recordCount, s.recordCountOld, s.recordsAppended)
case d: InsufficientData => renderDifference(d.actual, d.recordCountOld, None)
case _ => ""
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,48 +63,42 @@ class IncrementalIngestionJob(operationDef: OperationDef,
handleUncommittedOffsets(om, metastore, infoDate, uncommittedOffsets)
}

val hasInfoDateColumn = source.hasInfoDateColumn(sourceTable.query)
if (hasInfoDateColumn && runReason == TaskRunReason.Rerun) {
super.preRunCheckJob(infoDate, runReason, jobConfig, dependencyWarnings)
} else {
JobPreRunResult(JobPreRunStatus.Ready, None, dependencyWarnings, Nil)
}
JobPreRunResult(JobPreRunStatus.Ready, None, dependencyWarnings, Nil)
}

override def validate(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config): Reason = {
val hasInfoDate = source.hasInfoDateColumn(sourceTable.query)
if (source.getOffsetInfo.nonEmpty) {
if (runReason == TaskRunReason.Rerun) {
if (hasInfoDate) {
log.info(s"Rerunning ingestion to '${outputTable.name}' at '$infoDate'.")
Reason.Ready
} else {
val om = bookkeeper.getOffsetManager

om.getMaxInfoDateAndOffset(outputTable.name, Option(infoDate)) match {
case Some(offsets) =>
log.info(s"Rerunning ingestion to '${outputTable.name}' at '$infoDate' for ${offsets.minimumOffset.valueString} < offsets <= ${offsets.maximumOffset.valueString}.")
Reason.Ready
case None =>
log.info(s"Offsets not found for '${outputTable.name}' at '$infoDate'.")
Reason.SkipOnce("No offsets registered")
}
val isReRun = runReason == TaskRunReason.Rerun

if (source.getOffsetInfo.isEmpty) {
return Reason.NotReady(s"Offset column is not configured for source '$sourceName' of '${operationDef.name}'")
}

(hasInfoDate, isReRun) match {
case (false, false) =>
latestOffset match {
case Some(offset) if offset.maximumInfoDate.isAfter(infoDate) =>
log.warn(s"Cannot run '${outputTable.name}' for '$infoDate' since offsets exists for ${offset.maximumInfoDate}.")
Reason.Skip("Incremental ingestion cannot be retrospective")
case _ =>
Reason.Ready
}
} else {
if (hasInfoDate) {
Reason.Ready
} else {
latestOffset match {
case Some(offset) if offset.maximumInfoDate.isAfter(infoDate) =>
log.warn(s"Cannot run '${outputTable.name}' for '$infoDate' since offsets exists for ${offset.maximumInfoDate}.")
Reason.Skip("Incremental ingestion cannot be retrospective")
case _ =>
Reason.Ready
}
case (false, true) =>
val om = bookkeeper.getOffsetManager

om.getMaxInfoDateAndOffset(outputTable.name, Option(infoDate)) match {
case Some(offsets) =>
log.info(s"Rerunning ingestion to '${outputTable.name}' at '$infoDate' for ${offsets.minimumOffset.valueString} < offsets <= ${offsets.maximumOffset.valueString}.")
Reason.Ready
case None =>
log.info(s"Offsets not found for '${outputTable.name}' at '$infoDate'.")
Reason.SkipOnce("No offsets registered")
}
}
} else {
Reason.NotReady(s"Offset column is not configured for source '$sourceName' of '${operationDef.name}'")
case (true, false) =>
Reason.Ready
case (true, true) =>
log.info(s"Rerunning ingestion to '${outputTable.name}' at '$infoDate'.")
Reason.Ready
}
}

Expand All @@ -116,24 +110,17 @@ class IncrementalIngestionJob(operationDef: OperationDef,
}

val hasInfoDate = source.hasInfoDateColumn(sourceTable.query)
val isReRun = runReason == TaskRunReason.Rerun

val sourceResult = if (hasInfoDate) {
if (runReason == TaskRunReason.Rerun) {
source.getData(sourceTable.query, infoDate, infoDate, columns)
} else {
val om = bookkeeper.getOffsetManager
val infoDateLatestOffset = om.getMaxInfoDateAndOffset(outputTable.name, Some(infoDate))
infoDateLatestOffset match {
val sourceResult = (hasInfoDate, isReRun) match {
case (false, false) =>
latestOffset match {
case Some(maxOffset) =>
log.info(s"Running ingestion to '${outputTable.name}' at '$infoDate' for offset > ${maxOffset.maximumOffset.valueString}.")
source.getDataIncremental(sourceTable.query, Option(infoDate), Option(maxOffset.maximumOffset), None, columns)
source.getDataIncremental(sourceTable.query, None, Option(maxOffset.maximumOffset), None, columns)
case None =>
log.info(s"Running ingestion to '${outputTable.name}' at '$infoDate' for all data available at the day.")
source.getData(sourceTable.query, infoDate, infoDate, columns)
}
}
} else {
if (runReason == TaskRunReason.Rerun) {
case (false, true) =>
val om = bookkeeper.getOffsetManager

om.getMaxInfoDateAndOffset(outputTable.name, Option(infoDate)) match {
Expand All @@ -142,14 +129,19 @@ class IncrementalIngestionJob(operationDef: OperationDef,
case None =>
throw new IllegalStateException(s"No offsets for '${outputTable.name}' for '$infoDate'. Cannot rerun.")
}
} else {
latestOffset match {
case (true, false) =>
val om = bookkeeper.getOffsetManager
val infoDateLatestOffset = om.getMaxInfoDateAndOffset(outputTable.name, Some(infoDate))
infoDateLatestOffset match {
case Some(maxOffset) =>
source.getDataIncremental(sourceTable.query, None, Option(maxOffset.maximumOffset), None, columns)
log.info(s"Running ingestion to '${outputTable.name}' at '$infoDate' for offset > ${maxOffset.maximumOffset.valueString}.")
source.getDataIncremental(sourceTable.query, Option(infoDate), Option(maxOffset.maximumOffset), None, columns)
case None =>
log.info(s"Running ingestion to '${outputTable.name}' at '$infoDate' for all data available at the day.")
source.getData(sourceTable.query, infoDate, infoDate, columns)
}
}
case (true, true) =>
source.getData(sourceTable.query, infoDate, infoDate, columns)
}

val sanitizedDf = sanitizeDfColumns(sourceResult.data, specialCharacters)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import com.typesafe.config.Config
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.Query
import za.co.absa.pramen.api.offset.OffsetValue
import za.co.absa.pramen.core.expr.DateExprEvaluator
import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetValue}
import za.co.absa.pramen.core.reader.model.{JdbcConfig, OffsetInfoParser, TableReaderJdbcConfig}
import za.co.absa.pramen.core.reader.model.{JdbcConfig, TableReaderJdbcConfig}
import za.co.absa.pramen.core.utils.{JdbcNativeUtils, JdbcSparkUtils, StringUtils, TimeUtils}

import java.time.{Instant, LocalDate}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,37 @@ class ScheduleStrategyIncremental(lastOffsets: Option[DataOffsetAggregated], has
minimumDate: LocalDate
): Seq[TaskPreDef] = {
val dates = params match {
case ScheduleParams.Normal(runDate, _, _, _, _) =>
case ScheduleParams.Normal(runDate, trackDays, _, _, _) =>
val infoDate = evaluateRunDate(runDate, infoDateExpression)
log.info(s"Normal run strategy: runDate=$runDate, infoDate=$infoDate")

val runInfoDays = if (hasInfoDateColumn)
Seq(TaskPreDef(infoDate.minusDays(1), TaskRunReason.New), TaskPreDef(infoDate, TaskRunReason.New))
else {
val runInfoDays = if (hasInfoDateColumn) {
lastOffsets match {
case Some(lastOffset) =>
if (lastOffset.maximumInfoDate.isBefore(infoDate)) {
val startDate = if (trackDays > 1) {
val trackDate = infoDate.minusDays(trackDays - 1)
val date = if (trackDate.isAfter(lastOffset.maximumInfoDate))
trackDate
else
lastOffset.maximumInfoDate
log.warn(s"Last ran day: ${lastOffset.maximumInfoDate}. Tracking days = '$trackDate'. Catching up from '$date' until '$infoDate'.")
date
} else {
log.warn(s"Last ran day: ${lastOffset.maximumInfoDate}. Catching up data until '$infoDate'.")
lastOffset.maximumInfoDate
}

val potentialDates = getInfoDateRange(startDate, infoDate, "@runDate", schedule)
potentialDates.map(date => {
TaskPreDef(date, TaskRunReason.New)
})
} else {
Seq(TaskPreDef(infoDate, TaskRunReason.New))
}
case None => Seq(TaskPreDef(infoDate.minusDays(1), TaskRunReason.New), TaskPreDef(infoDate, TaskRunReason.New))
}
} else {
lastOffsets match {
case Some(offset) if offset.maximumInfoDate.isAfter(infoDate) => Seq.empty
case _ => Seq(TaskPreDef(infoDate, TaskRunReason.New))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ abstract class TaskRunnerBase(conf: Config,
MetaTable.getMetaTableDef(task.job.outputTable),
RunStatus.Succeeded(recordCountOldOpt,
stats.recordCount,
stats.recordCountAppended,
stats.dataSizeBytes,
completionReason,
runResult.filesRead,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ object TaskNotificationFactory {
Instant.ofEpochMilli(1613600000000L),
Instant.ofEpochMilli(1672759508000L)
)),
status: RunStatus = RunStatus.Succeeded(None, 100, None, TaskRunReason.New, Seq.empty, Seq.empty, Seq.empty, Seq.empty),
status: RunStatus = RunStatus.Succeeded(None, 100, None, None, TaskRunReason.New, Seq.empty, Seq.empty, Seq.empty, Seq.empty),
applicationId: String = "app_12345",
isTransient: Boolean = false,
isRawFilesJob: Boolean = false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ class TransientJobManagerSuite extends AnyWordSpec with BeforeAndAfterAll with S
"runs a job via teh task runner and return the dataframe" in {
val taskRunner = mock(classOf[TaskRunner])
val job = mock(classOf[Job])
val successStatus = RunStatus.Succeeded(None, 1, None, TaskRunReason.OnRequest, Nil, Nil, Nil, Nil)
val successStatus = RunStatus.Succeeded(None, 1, None, None, TaskRunReason.OnRequest, Nil, Nil, Nil, Nil)

whenMock(taskRunner.runLazyTask(job, infoDate)).thenReturn(successStatus)
whenMock(job.outputTable).thenReturn(MetaTableFactory.getDummyMetaTable("table1", format = DataFormat.Transient(CachePolicy.NoCache)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import za.co.absa.pramen.api.status.{RunStatus, TaskRunReason}
object RunStatusFactory {
def getDummySuccess(recordCountOld: Option[Long] = None,
recordCount: Long = 1000,
recordsAppended: Option[Long] = None,
sizeBytes: Option[Long] = None,
reason: TaskRunReason = TaskRunReason.New,
filesRead: Seq[String] = Nil,
Expand All @@ -29,6 +30,7 @@ object RunStatusFactory {
warnings: Seq[String] = Nil): RunStatus.Succeeded = {
RunStatus.Succeeded(recordCountOld,
recordCount,
recordsAppended,
sizeBytes,
reason,
filesRead,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import java.time.{Instant, LocalDate}
object TaskResultFactory {
def getDummyTaskResult(jobName: String = "DummyJob",
outputTable: MetaTableDef = MetaTable.getMetaTableDef(MetaTableFactory.getDummyMetaTable(name = "table_out")),
runStatus: RunStatus = RunStatus.Succeeded(Some(100), 200, Some(1000), TaskRunReason.New, Nil, Nil, Nil, Nil),
runStatus: RunStatus = RunStatus.Succeeded(Some(100), 200, None, Some(1000), TaskRunReason.New, Nil, Nil, Nil, Nil),
runInfo: Option[RunInfo] = Some(RunInfo(LocalDate.of(2022, 2, 18), Instant.ofEpochSecond(1234), Instant.ofEpochSecond(5678))),
applicationId: String = "app_123",
isTransient: Boolean = false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import za.co.absa.pramen.api.status.{RunStatus, TaskRunReason}

object TestPrototypes {

val runStatusSuccess: RunStatus = RunStatus.Succeeded(Some(100), 200, Some(1000), TaskRunReason.New, Seq.empty, Seq.empty, Seq.empty, Seq.empty)
val runStatusSuccess: RunStatus = RunStatus.Succeeded(Some(100), 200, None, Some(1000), TaskRunReason.New, Seq.empty, Seq.empty, Seq.empty, Seq.empty)

val runStatusWarning: RunStatus = RunStatus.Succeeded(
Some(10000), 20000, Some(100000), TaskRunReason.New, Seq("file1.txt", "file1.ctl"),
Some(10000), 20000, None, Some(100000), TaskRunReason.New, Seq("file1.txt", "file1.ctl"),
Seq("file1.csv", "file2.csv"), Seq("`db`.`table1`"), Seq("Test warning")
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class ConcurrentJobRunnerSpy(includeFails: Boolean = false,
var idx = 0
incomingJobs.foreach(job => {
val status = if (!includeFails || idx % 3 == 0) {
RunStatus.Succeeded(Some(1000), 500, Some(10000), TaskRunReason.New, Nil, Nil, Nil, Nil)
RunStatus.Succeeded(Some(1000), 500, None, Some(10000), TaskRunReason.New, Nil, Nil, Nil, Nil)
} else if (idx % 3 == 1) {
RunStatus.Failed(new RuntimeException("Dummy exception"))
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class TaskCompletedSuite extends AnyWordSpec {
val taskResult = TaskResult(
job.name,
MetaTable.getMetaTableDef(job.outputTable),
RunStatus.Succeeded(Some(1000), 2000, Some(3000), runReason, Nil, Nil, Nil, Nil),
RunStatus.Succeeded(Some(1000), 2000, None, Some(3000), runReason, Nil, Nil, Nil, Nil),
Some(RunInfo(infoDate, now.minusSeconds(10), now)),
"app_123",
isTransient = false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,17 @@ class PipelineNotificationBuilderHtmlSuite extends AnyWordSpec with TextComparis
assert(actual == "90 (-10)")
}

"work for successful appends" in {
val builder = getBuilder()

val runStatus = RunStatusFactory.getDummySuccess(None, 110, Some(10), reason = TaskRunReason.Update)
val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus)

val actual = builder.getRecordCountText(task)

assert(actual == "110 (+10)")
}

"work for insufficient data" in {
val builder = getBuilder()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ class RunStatusSuite extends AnyWordSpec {
"toString" should {
"Succeeded" when {
"New" in {
val status = RunStatus.Succeeded(None, 0, None, TaskRunReason.New, Nil, Nil, Nil, Nil)
val status = RunStatus.Succeeded(None, 0, None, None, TaskRunReason.New, Nil, Nil, Nil, Nil)

assert(status.toString == "New")
}
"Update" in {
val status = RunStatus.Succeeded(None, 0, None, TaskRunReason.Update, Nil, Nil, Nil, Nil)
val status = RunStatus.Succeeded(None, 0, None, None, TaskRunReason.Update, Nil, Nil, Nil, Nil)

assert(status.toString == "Update")
}
Expand Down Expand Up @@ -85,7 +85,7 @@ class RunStatusSuite extends AnyWordSpec {

"isFailure" should {
"Succeeded" in {
val status = RunStatus.Succeeded(None, 0, None, TaskRunReason.New, Nil, Nil, Nil, Nil)
val status = RunStatus.Succeeded(None, 0, None, None, TaskRunReason.New, Nil, Nil, Nil, Nil)

assert(!status.isFailure)
}
Expand Down Expand Up @@ -165,7 +165,7 @@ class RunStatusSuite extends AnyWordSpec {

"getReason" should {
"Succeeded" in {
val status = RunStatus.Succeeded(None, 0, None, TaskRunReason.New, Nil, Nil, Nil, Nil)
val status = RunStatus.Succeeded(None, 0, None, None, TaskRunReason.New, Nil, Nil, Nil, Nil)

assert(status.getReason().isEmpty)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ object TestPrototypes {
Map.empty,
Map.empty)

val taskStatus: RunStatus = RunStatus.Succeeded(None, 100, None, TaskRunReason.New, Seq.empty, Seq.empty, Seq.empty, Seq.empty)
val taskStatus: RunStatus = RunStatus.Succeeded(None, 100, None, None, TaskRunReason.New, Seq.empty, Seq.empty, Seq.empty, Seq.empty)

val taskNotification: TaskResult = status.TaskResult(
"Dummy Job",
Expand Down

0 comments on commit 858138f

Please sign in to comment.