Skip to content

Commit

Permalink
#421 Allow transient non-cached jobs to not return a record count.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Sep 30, 2024
1 parent ab0ee24 commit 09420c2
Show file tree
Hide file tree
Showing 37 changed files with 134 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ sealed trait RunStatus {

object RunStatus {
case class Succeeded(recordCountOld: Option[Long],
recordCount: Long,
recordCount: Option[Long],
recordsAppended: Option[Long],
sizeBytes: Option[Long],
reason: TaskRunReason,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ class JournalHadoopCsv(journalPath: String)
val periodEnd = t.periodEnd.format(dateFormatter)
val infoDate = t.informationDate.format(dateFormatter)

val inputRecordCount = t.inputRecordCount.map(_.toString).getOrElse("")
val inputRecordCountOld = t.inputRecordCountOld.map(_.toString).getOrElse("")
val outputRecordCount = t.outputRecordCount.map(_.toString).getOrElse("")
val outputRecordCountOld = t.outputRecordCountOld.map(_.toString).getOrElse("")
val appendedRecordCount = t.appendedRecordCount.map(_.toString).getOrElse("")
Expand All @@ -106,8 +108,8 @@ class JournalHadoopCsv(journalPath: String)
periodBegin ::
periodEnd ::
infoDate ::
t.inputRecordCount ::
t.inputRecordCountOld ::
inputRecordCount ::
inputRecordCountOld ::
outputRecordCount ::
outputRecordCountOld ::
appendedRecordCount ::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ class JournalJdbc(db: Database) extends Journal {
periodBegin,
periodEnd,
infoDate,
entry.inputRecordCount,
entry.inputRecordCountOld,
entry.inputRecordCount.getOrElse(-1L),
entry.inputRecordCountOld.getOrElse(-1L),
entry.outputRecordCount,
entry.outputRecordCountOld,
entry.appendedRecordCount,
Expand Down Expand Up @@ -73,14 +73,16 @@ class JournalJdbc(db: Database) extends Journal {
val entries = SlickUtils.executeQuery(db, JournalTasks.journalTasks.filter(d => d.finishedAt >= fromSec && d.finishedAt <= toSec ))

entries.map(v => {
val recordCountOpt = if (v.inputRecordCount < 0) None else Option(v.inputRecordCount)
val recordCountOldOpt = if (v.inputRecordCountOld < 0) None else Option(v.inputRecordCountOld)
model.TaskCompleted(
jobName = v.jobName,
tableName = v.pramenTableName,
periodBegin = LocalDate.parse(v.periodBegin, dateFormatter),
periodEnd = LocalDate.parse(v.periodEnd, dateFormatter),
informationDate = LocalDate.parse(v.informationDate, dateFormatter),
inputRecordCount = v.inputRecordCount,
inputRecordCountOld = v.inputRecordCountOld,
inputRecordCount = recordCountOpt,
inputRecordCountOld = recordCountOldOpt,
outputRecordCount = v.outputRecordCount,
outputRecordCountOld = v.outputRecordCountOld,
appendedRecordCount = v.appendedRecordCount,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ case class TaskCompleted(
periodBegin: LocalDate,
periodEnd: LocalDate,
informationDate: LocalDate,
inputRecordCount: Long,
inputRecordCountOld: Long,
inputRecordCount: Option[Long],
inputRecordCountOld: Option[Long],
outputRecordCount: Option[Long],
outputRecordCountOld: Option[Long],
appendedRecordCount: Option[Long],
Expand Down Expand Up @@ -60,8 +60,8 @@ object TaskCompleted {
val sparkApplicationId = Option(taskResult.applicationId)

val (recordCountOld, inputRecordCount, outputRecordCount, sizeBytes, appendedRecords) = taskResult.runStatus match {
case s: Succeeded => (s.recordCountOld, s.recordCount, Some(s.recordCount), s.sizeBytes, s.recordsAppended)
case _ => (None, 0L, None, None, None)
case s: Succeeded => (s.recordCountOld, s.recordCount, s.recordCount, s.sizeBytes, s.recordsAppended)
case _ => (None, None, None, None, None)
}

TaskCompleted(
Expand All @@ -71,7 +71,7 @@ object TaskCompleted {
task.infoDate,
task.infoDate,
inputRecordCount,
recordCountOld.getOrElse(0L),
recordCountOld,
outputRecordCount,
recordCountOld,
appendedRecords,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ case class TaskCompletedCsv(
periodBegin: String,
periodEnd: String,
informationDate: String,
inputRecordCount: Long,
inputRecordCountOld: Long,
inputRecordCount: Option[Long],
inputRecordCountOld: Option[Long],
outputRecordCount: Option[Long],
outputRecordCountOld: Option[Long],
outputSize: Option[Long],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package za.co.absa.pramen.core.metastore

case class MetaTableStats(
recordCount: Long,
recordCount: Option[Long],
recordCountAppended: Option[Long],
dataSizeBytes: Option[Long]
)
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class MetastoreImpl(appConfig: Config,
val isTransient = mt.format.isTransient
val start = Instant.now.getEpochSecond

var stats = MetaTableStats(0, None, None)
var stats = MetaTableStats(Some(0), None, None)

withSparkConfig(mt.sparkConfig) {
stats = MetastorePersistence.fromMetaTable(mt, appConfig, saveModeOverride, batchId).saveTable(infoDate, df, inputRecordCount)
Expand All @@ -122,8 +122,10 @@ class MetastoreImpl(appConfig: Config,

val nothingAppended = stats.recordCountAppended.contains(0)

if (!skipBookKeepingUpdates && !nothingAppended) {
bookkeeper.setRecordCount(tableName, infoDate, infoDate, infoDate, inputRecordCount.getOrElse(stats.recordCount), stats.recordCount, start, finish, isTransient)
stats.recordCount.foreach{recordCount =>
if (!skipBookKeepingUpdates && !nothingAppended) {
bookkeeper.setRecordCount(tableName, infoDate, infoDate, infoDate, inputRecordCount.getOrElse(recordCount), recordCount, start, finish, isTransient)
}
}

stats
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,18 +120,18 @@ class MetastorePersistenceDelta(query: Query,
case Some(size) =>
stats.recordCountAppended match {
case Some(recordsAppended) =>
log.info(s"$SUCCESS Successfully saved $recordsAppended records (new count: ${stats.recordCount}, " +
log.info(s"$SUCCESS Successfully saved $recordsAppended records (new count: ${stats.recordCount.get}, " +
s"new size: ${StringUtils.prettySize(size)}) to ${query.query}")
case None =>
log.info(s"$SUCCESS Successfully saved ${stats.recordCount} records " +
log.info(s"$SUCCESS Successfully saved ${stats.recordCount.get} records " +
s"(${StringUtils.prettySize(size)}) to ${query.query}")
}
case None =>
stats.recordCountAppended match {
case Some(recordsAppended) =>
log.info(s"$SUCCESS Successfully saved $recordsAppended records (new count: ${stats.recordCount} to ${query.query}")
log.info(s"$SUCCESS Successfully saved $recordsAppended records (new count: ${stats.recordCount.get} to ${query.query}")
case None =>
log.info(s"$SUCCESS Successfully saved ${stats.recordCount} records to ${query.query}")
log.info(s"$SUCCESS Successfully saved ${stats.recordCount.get} records to ${query.query}")
}
}

Expand Down Expand Up @@ -160,11 +160,11 @@ class MetastorePersistenceDelta(query: Query,
val batchCount = df.filter(col(batchIdColumn) === batchId).count()
val countAll = df.count()

MetaTableStats(countAll, Option(batchCount), sizeOpt)
MetaTableStats(Option(countAll), Option(batchCount), sizeOpt)
} else {
val countAll = df.count()

MetaTableStats(countAll, None, sizeOpt)
MetaTableStats(Option(countAll), None, sizeOpt)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ class MetastorePersistenceParquet(path: String,

stats.recordCountAppended match {
case Some(recordsAppended) =>
log.info(s"$SUCCESS Successfully saved $recordsAppended records (new count: ${stats.recordCount}, " +
log.info(s"$SUCCESS Successfully saved $recordsAppended records (new count: ${stats.recordCount.get}, " +
s"new size: ${StringUtils.prettySize(stats.dataSizeBytes.get)}) to $outputDir")
case None =>
log.info(s"$SUCCESS Successfully saved ${stats.recordCount} records " +
log.info(s"$SUCCESS Successfully saved ${stats.recordCount.get} records " +
s"(${StringUtils.prettySize(stats.dataSizeBytes.get)}) to $outputDir")
}

Expand All @@ -123,11 +123,11 @@ class MetastorePersistenceParquet(path: String,
val batchCount = df.filter(col(batchIdColumn) === batchId).count()
val countAll = df.count()

MetaTableStats(countAll, Option(batchCount), Option(size))
MetaTableStats(Option(countAll), Option(batchCount), Option(size))
} else {
val countAll = df.count()

MetaTableStats(countAll, None, Option(size))
MetaTableStats(Option(countAll), None, Option(size))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class MetastorePersistenceRaw(path: String,
}

MetaTableStats(
totalSize,
Option(totalSize),
None,
Some(totalSize)
)
Expand All @@ -108,7 +108,7 @@ class MetastorePersistenceRaw(path: String,
})

MetaTableStats(
files.length,
Option(files.length),
None,
Some(totalSize)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,13 @@ class MetastorePersistenceTransientEager(tempPathOpt: Option[String],
}

val recordCount = numberOfRecordsEstimate match {
case Some(n) => n
case None => dfOut.count()
case Some(n) => Option(n)
case None =>
cachePolicy match {
case CachePolicy.Cache => Option(dfOut.count())
case CachePolicy.Persist => Option(dfOut.count())
case _ => None
}
}

MetaTableStats(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,14 +498,14 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
builder.withTable(tableBuilder)
}

private[core] def getThroughputRps(task: TaskResult): TextElement = {
def getThroughputRps(task: TaskResult): TextElement = {
val recordCount = task.runStatus match {
case s: Succeeded =>
s.recordsAppended match {
case Some(appended) => appended
case None => s.recordCount
case None => s.recordCount.getOrElse(0L)
}
case _ => 0
case _ => 0L
}

task.runInfo match {
Expand Down Expand Up @@ -547,7 +547,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
}
}

private[core] def getRecordCountText(task: TaskResult): String = {
def getRecordCountText(task: TaskResult): String = {
def renderDifference(numRecords: Long, numRecordsOld: Option[Long], numRecordsAppended: Option[Long]): String = {
numRecordsOld match {
case Some(old) if old > 0 =>
Expand All @@ -571,14 +571,18 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
"-"
} else {
task.runStatus match {
case s: Succeeded => renderDifference(s.recordCount, s.recordCountOld, s.recordsAppended)
case s: Succeeded =>
s.recordCount match {
case Some(recordCount) => renderDifference(recordCount, s.recordCountOld, s.recordsAppended)
case None => "-"
}
case d: InsufficientData => renderDifference(d.actual, d.recordCountOld, None)
case _ => ""
case _ => "-"
}
}
}

private[core] def getSizeText(task: TaskResult): String = {
def getSizeText(task: TaskResult): String = {
def renderDifferenceSize(numBytes: Long, numBytesOld: Option[Long]): String = {
numBytesOld match {
case Some(old) if old > 0 =>
Expand All @@ -595,7 +599,11 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
}

task.runStatus match {
case s: Succeeded => renderDifferenceSize(s.recordCount, s.recordCountOld)
case s: Succeeded =>
s.recordCount match {
case Some(recordCount) => renderDifferenceSize(recordCount, s.recordCountOld)
case None => ""
}
case d: InsufficientData => renderDifferenceSize(d.actual, d.recordCountOld)
case _ => ""
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,25 +120,29 @@ class PythonTransformationJob(operationDef: OperationDef,
case ex: AnalysisException => throw new RuntimeException(s"Output data not found in the metastore for $infoDate", ex)
}

if (stats.recordCount == 0 && minimumRecords > 0) {
val recordCount = stats.recordCount.getOrElse(0L)

if (recordCount == 0 && minimumRecords > 0) {
throw new RuntimeException(s"Output table is empty in the metastore for $infoDate")
}

if (stats.recordCount < minimumRecords) {
throw new RuntimeException(s"The transformation returned too few records (${stats.recordCount} < $minimumRecords).")
if (recordCount < minimumRecords) {
throw new RuntimeException(s"The transformation returned too few records ($recordCount < $minimumRecords).")
}

val jobFinished = Instant.now()

bookkeeper.setRecordCount(outputTable.name,
infoDate,
infoDate,
infoDate,
stats.recordCount,
stats.recordCount,
jobStarted.getEpochSecond,
jobFinished.getEpochSecond,
isTableTransient = false)
stats.recordCount.foreach{ recordCount =>
bookkeeper.setRecordCount(outputTable.name,
infoDate,
infoDate,
infoDate,
recordCount,
recordCount,
jobStarted.getEpochSecond,
jobFinished.getEpochSecond,
isTableTransient = false)
}

SaveResult(stats)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ class SinkJob(operationDef: OperationDef,
isTransient
)

val stats = MetaTableStats(sinkResult.recordsSent, None, None)
val stats = MetaTableStats(Option(sinkResult.recordsSent), None, None)
SaveResult(stats, sinkResult.filesSent, sinkResult.hiveTables, sinkResult.warnings ++ tooLongWarnings)
} catch {
case NonFatal(ex) => throw new IllegalStateException("Unable to write to the sink.", ex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ abstract class TaskRunnerBase(conf: Config,

val saveResult = if (runtimeConfig.isDryRun) {
log.warn(s"$WARNING DRY RUN mode, no actual writes to ${task.job.outputTable.name} for ${task.infoDate} will be performed.")
SaveResult(MetaTableStats(dfTransformed.count(), None, None))
SaveResult(MetaTableStats(Option(dfTransformed.count()), None, None))
} else {
task.job.save(dfTransformed, task.infoDate, task.reason, conf, started, validationResult.inputRecordsCount)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ object TaskNotificationFactory {
Instant.ofEpochMilli(1613600000000L),
Instant.ofEpochMilli(1672759508000L)
)),
status: RunStatus = RunStatus.Succeeded(None, 100, None, None, TaskRunReason.New, Seq.empty, Seq.empty, Seq.empty, Seq.empty),
status: RunStatus = RunStatus.Succeeded(None, Some(100), None, None, TaskRunReason.New, Seq.empty, Seq.empty, Seq.empty, Seq.empty),
applicationId: String = "app_12345",
isTransient: Boolean = false,
isRawFilesJob: Boolean = false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF

val stats = m.getStats("table1", infoDate)

assert(stats.recordCount == 3)
assert(stats.recordCount.contains(3))
assert(stats.dataSizeBytes.exists(_ > 0))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ class MetastorePersistenceRawSuite extends AnyWordSpec with SparkTestBase with T

val stats = persistence.getStats(infoDate, onlyForCurrentBatchId = false)

assert(stats.recordCount == 2)
assert(stats.recordCount.contains(2))
assert(stats.dataSizeBytes.contains(7L))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ class MetastorePersistenceSuite extends AnyWordSpec with SparkTestBase with Temp

val stats = mtp.getStats(infoDate, onlyForCurrentBatchId = false)

assert(stats.recordCount == 3)
assert(stats.recordCount.contains(3))
assert(stats.dataSizeBytes.exists(_ > 0))
}

Expand All @@ -226,7 +226,7 @@ class MetastorePersistenceSuite extends AnyWordSpec with SparkTestBase with Temp

val stats = mtp.getStats(infoDate, onlyForCurrentBatchId = false)

assert(stats.recordCount == 0)
assert(stats.recordCount.contains(0))
assert(stats.dataSizeBytes.contains(0L))
}

Expand Down
Loading

0 comments on commit 09420c2

Please sign in to comment.