#374 Make normal transformers compatible with incremental ingestion, and add more useful methods to metastore interfaces.
yruslan committed Sep 12, 2024
1 parent 608884c commit 7dfec70
Showing 25 changed files with 242 additions and 88 deletions.
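For orientation, a hedged sketch of how a transformer might combine the two MetastoreReader additions in this commit (getOffsets and getRunReason). The object and method names, the table name "table1", and the use of MetastoreReader.getTable are illustrative assumptions, not part of the diff below.

import java.time.LocalDate

import org.apache.spark.sql.DataFrame
import za.co.absa.pramen.api.MetastoreReader
import za.co.absa.pramen.api.status.TaskRunReason

object IncrementalTransformerSketch {
  // Decide how to process the current micro-batch using the two new methods.
  def transform(metastore: MetastoreReader, infoDate: LocalDate): DataFrame = {
    val isRerun = metastore.getRunReason == TaskRunReason.Rerun
    val committedBatches = metastore.getOffsets("table1", infoDate).count(_.committedAt.nonEmpty)

    val df = metastore.getTable("table1", Some(infoDate), Some(infoDate))

    if (isRerun || committedBatches == 0) {
      // Forced re-run or first micro-batch of the day: process the full information date.
      df
    } else {
      // Subsequent micro-batch: incremental logic would go here.
      df
    }
  }
}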
@@ -17,6 +17,8 @@
package za.co.absa.pramen.api

import org.apache.spark.sql.DataFrame
import za.co.absa.pramen.api.offset.DataOffset
import za.co.absa.pramen.api.status.TaskRunReason

import java.time.LocalDate

@@ -100,6 +102,15 @@ trait MetastoreReader {
*/
def isDataAvailable(tableName: String, from: Option[LocalDate], until: Option[LocalDate]): Boolean

/**
* Returns offsets for an information date (both committed and uncommitted).
*
* This info can be used by transformers and sinks to decide if actions need to be taken depending on the
* current micro-batch. For example, adding partitions to Hive needs to happen only once per info date,
* so a sink that does this can check whether any micro-batches have already been run for the current day.
*/
def getOffsets(table: String, infoDate: LocalDate): Array[DataOffset]
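
A hedged sketch of the Hive-partition example from the scaladoc above; the object, method, and table parameter are illustrative, and the once-per-day action itself is left to the caller.

import java.time.LocalDate

import za.co.absa.pramen.api.MetastoreReader

object OncePerDaySketch {
  // True when a once-per-day action (e.g. adding a Hive partition) still needs to run,
  // i.e. no micro-batch has been committed yet for the given information date.
  def shouldRunOncePerDayAction(metastore: MetastoreReader, table: String, infoDate: LocalDate): Boolean =
    !metastore.getOffsets(table, infoDate).exists(_.committedAt.nonEmpty)
}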

/**
* Gets definition of a metastore table. Please, use with caution and do not write to the underlying path
* from transformers.
@@ -120,6 +131,12 @@ trait MetastoreReader {
*/
def getTableRunInfo(tableName: String, infoDate: LocalDate): Option[MetaTableRunInfo]

/**
* Returns the reason for running the task. This helps transformers and sinks determine their logic based on whether
* the run is a normal run or a forced re-run.
*/
def getRunReason: TaskRunReason
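
A hedged sketch of branching on the run reason; the overwrite-versus-append policy is illustrative, not something the API prescribes.

import org.apache.spark.sql.SaveMode
import za.co.absa.pramen.api.MetastoreReader
import za.co.absa.pramen.api.status.TaskRunReason

object RunReasonSketch {
  // A forced re-run rewrites the information date; a normal run appends the micro-batch.
  def chooseSaveMode(metastore: MetastoreReader): SaveMode =
    if (metastore.getRunReason == TaskRunReason.Rerun) SaveMode.Overwrite else SaveMode.Append
}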

/**
* Returns an object that allows accessing metadata of metastore tables.
*/
7 changes: 5 additions & 2 deletions pramen/api/src/main/scala/za/co/absa/pramen/api/Source.scala
@@ -78,9 +78,12 @@ trait Source extends ExternalChannel {
def getIncrementalData(query: Query, minOffset: OffsetValue, infoDateOpt: Option[LocalDate], columns: Seq[String]): SourceResult

/**
* Returns the incremental data greater than the specified offset.
* Returns the incremental data between specified offsets.
*
* If an information date is provided and available at the source, the query will be limited to that date
* If an information date is provided and available at the source, the query will be limited to that date.
*
* This method is used for re-runs for a particular information date. For sources that have an information date column,
* the returned data will cover the full information date, even outside the specified offsets.
*
* @param minOffset This is an exclusive parameter; the query will be SELECT ... WHERE offset_col > min_offset
* @param maxOffset This is an inclusive parameter; the query will be SELECT ... WHERE offset_col <= max_offset
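
A hedged sketch of the offset-range semantics described by the two parameters above, expressed as a Spark filter; the column name and the Long offsets are illustrative simplifications of OffsetValue.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}

object OffsetRangeSketch {
  // minOffset is exclusive, maxOffset is inclusive.
  def filterByOffsetRange(df: DataFrame, minOffset: Long, maxOffset: Long): DataFrame =
    df.filter(col("offset_col") > lit(minOffset) && col("offset_col") <= lit(maxOffset))
}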
@@ -0,0 +1,27 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.api.offset

import java.time.LocalDate

case class DataOffset(tableName: String,
infoDate: LocalDate,
minOffset: OffsetValue,
maxOffset: Option[OffsetValue], /* Can be None for uncommitted offsets. */
createdAt: Long,
committedAt: Option[Long]
)
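
A small hedged sketch of how the optional fields can be interpreted, based on the inline comment above; treating a defined committedAt as "committed" is an assumption, not stated elsewhere in this diff.

import za.co.absa.pramen.api.offset.DataOffset

object DataOffsetSketch {
  // True when the micro-batch that produced this offset has been committed.
  def isCommitted(offset: DataOffset): Boolean = offset.committedAt.isDefined
}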
@@ -16,8 +16,8 @@

package za.co.absa.pramen.core.bookkeeper

import za.co.absa.pramen.api.offset.OffsetValue
import za.co.absa.pramen.core.bookkeeper.model.{DataOffset, DataOffsetAggregated, DataOffsetRequest}
import za.co.absa.pramen.api.offset.{DataOffset, OffsetValue}
import za.co.absa.pramen.core.bookkeeper.model.{DataOffsetAggregated, DataOffsetRequest}

import java.time.LocalDate

@@ -18,7 +18,7 @@ package za.co.absa.pramen.core.bookkeeper

import org.slf4j.LoggerFactory
import slick.jdbc.H2Profile.api._
import za.co.absa.pramen.api.offset.OffsetValue
import za.co.absa.pramen.api.offset.{DataOffset, OffsetValue}
import za.co.absa.pramen.core.bookkeeper.model._
import za.co.absa.pramen.core.utils.SlickUtils

@@ -37,7 +37,7 @@ class OffsetManagerJdbc(db: Database) extends OffsetManager {
return Array.empty
}

offsets.map(DataOffset.fromOffsetRecord)
offsets.map(OffsetRecordConverter.toDataOffset)
}

override def getMaxInfoDateAndOffset(table: String, onlyForInfoDate: Option[LocalDate]): Option[DataOffsetAggregated] = {
@@ -118,7 +118,7 @@ class OffsetManagerJdbc(db: Database) extends OffsetManager {
val minOffset = OffsetValue.fromString(offsetDataType, offsets.map(_.minOffset).min)
val maxOffset = OffsetValue.fromString(offsetDataType, offsets.map(_.maxOffset).max)

Some(DataOffsetAggregated(table, infoDate, minOffset, maxOffset, offsets.map(DataOffset.fromOffsetRecord)))
Some(DataOffsetAggregated(table, infoDate, minOffset, maxOffset, offsets.map(OffsetRecordConverter.toDataOffset)))
}

/**
@@ -16,7 +16,7 @@

package za.co.absa.pramen.core.bookkeeper.model

import za.co.absa.pramen.api.offset.OffsetValue
import za.co.absa.pramen.api.offset.{DataOffset, OffsetValue}

import java.time.LocalDate

@@ -16,20 +16,12 @@

package za.co.absa.pramen.core.bookkeeper.model

import za.co.absa.pramen.api.offset.OffsetValue
import za.co.absa.pramen.api.offset.{DataOffset, OffsetValue}

import java.time.LocalDate

case class DataOffset(tableName: String,
infoDate: LocalDate,
minOffset: OffsetValue,
maxOffset: Option[OffsetValue], /* Can be None for uncommitted offsets. */
createdAt: Long,
committedAt: Option[Long]
)

object DataOffset {
def fromOffsetRecord(r: OffsetRecord): DataOffset = {
object OffsetRecordConverter {
def toDataOffset(r: OffsetRecord): DataOffset = {
val maxOffsetOpt = if (r.maxOffset.nonEmpty) {
Option(OffsetValue.fromString(r.dataType, r.maxOffset))
} else {
@@ -45,4 +37,4 @@ object DataOffset {
r.committedAtMilli
)
}
}
}
@@ -19,6 +19,7 @@ package za.co.absa.pramen.core.metastore
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SaveMode}
import za.co.absa.pramen.api._
import za.co.absa.pramen.api.status.TaskRunReason
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.utils.hive.HiveHelper

@@ -53,5 +54,5 @@ trait Metastore {

def getStats(tableName: String, infoDate: LocalDate): MetaTableStats

def getMetastoreReader(tables: Seq[String], infoDate: LocalDate, isIncremental: Boolean): MetastoreReader
def getMetastoreReader(tables: Seq[String], infoDate: LocalDate, runReason: TaskRunReason, isIncremental: Boolean): MetastoreReader
}
@@ -23,6 +23,8 @@ import org.apache.spark.sql.types.{DateType, StringType, StructField, StructType
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api._
import za.co.absa.pramen.api.offset.DataOffset
import za.co.absa.pramen.api.status.TaskRunReason
import za.co.absa.pramen.core.app.config.InfoDateConfig
import za.co.absa.pramen.core.app.config.InfoDateConfig.DEFAULT_DATE_FORMAT
import za.co.absa.pramen.core.app.config.RuntimeConfig.UNDERCOVER
@@ -194,7 +196,7 @@ class MetastoreImpl(appConfig: Config,
MetastorePersistence.fromMetaTable(mt, appConfig, batchId = batchId).getStats(infoDate, onlyForCurrentBatchId = false)
}

override def getMetastoreReader(tables: Seq[String], infoDate: LocalDate, isIncremental: Boolean): MetastoreReader = {
override def getMetastoreReader(tables: Seq[String], infoDate: LocalDate, runReason: TaskRunReason, isIncremental: Boolean): MetastoreReader = {
val metastore = this

new MetastoreReader {
@@ -232,6 +234,12 @@
metastore.isDataAvailable(tableName, fromDate, untilDate)
}

override def getOffsets(table: String, infoDate: LocalDate): Array[DataOffset] = {
val om = bookkeeper.getOffsetManager

om.getOffsets(table, infoDate)
}

override def getTableDef(tableName: String): MetaTableDef = {
validateTable(tableName)

@@ -245,6 +253,8 @@
)
}

override def getRunReason: TaskRunReason = runReason

override def metadataManager: MetadataManager = metadata

private def validateTable(tableName: String): Unit = {
@@ -98,12 +98,13 @@ class MetastorePersistenceParquet(path: String,

val stats = getStats(infoDate, isAppend)

if (isAppend) {
log.info(s"$SUCCESS Successfully saved ${stats.recordCountAppended.get} records (new count: ${stats.recordCount}, " +
s"new size: ${StringUtils.prettySize(stats.dataSizeBytes.get)}) to $outputDir")
} else {
log.info(s"$SUCCESS Successfully saved ${stats.recordCount} records " +
s"(${StringUtils.prettySize(stats.dataSizeBytes.get)}) to $outputDir")
stats.recordCountAppended match {
case Some(recordsAppended) =>
log.info(s"$SUCCESS Successfully saved $recordsAppended records (new count: ${stats.recordCount}, " +
s"new size: ${StringUtils.prettySize(stats.dataSizeBytes.get)}) to $outputDir")
case None =>
log.info(s"$SUCCESS Successfully saved ${stats.recordCount} records " +
s"(${StringUtils.prettySize(stats.dataSizeBytes.get)}) to $outputDir")
}

stats
@@ -118,7 +119,7 @@

val size = fsUtils.getDirectorySize(outputDirStr)

if (onlyForCurrentBatchId) {
if (onlyForCurrentBatchId && df.schema.exists(_.name.equalsIgnoreCase(batchIdColumn))) {
val batchCount = df.filter(col(batchIdColumn) === batchId).count()
val countAll = df.count()

@@ -22,7 +22,7 @@ import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import za.co.absa.pramen.api.offset.OffsetValue
import za.co.absa.pramen.api.status.{DependencyWarning, TaskRunReason}
import za.co.absa.pramen.api.{Query, Reason, Source, SourceResult}
import za.co.absa.pramen.api.{Reason, Source}
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.bookkeeper.model.DataOffsetAggregated
import za.co.absa.pramen.core.metastore.Metastore
@@ -51,12 +51,13 @@ class IncrementalIngestionJob(operationDef: OperationDef,
override def trackDays: Int = 0

override def preRunCheckJob(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config, dependencyWarnings: Seq[DependencyWarning]): JobPreRunResult = {
if (source.hasInfoDateColumn(sourceTable.query)) {
val hasInfoDateColumn = source.hasInfoDateColumn(sourceTable.query)
if (hasInfoDateColumn && runReason == TaskRunReason.Rerun) {
super.preRunCheckJob(infoDate, runReason, jobConfig, dependencyWarnings)
} else {
latestOffset match {
case Some(offset) =>
if (offset.maximumInfoDate.isAfter(infoDate)) {
if (offset.maximumInfoDate.isAfter(infoDate) && !hasInfoDateColumn) {
JobPreRunResult(JobPreRunStatus.Skip("Retrospective runs are not allowed yet"), None, dependencyWarnings, Nil)
} else {
JobPreRunResult(JobPreRunStatus.Ready, None, dependencyWarnings, Nil)
@@ -140,7 +141,7 @@ class IncrementalIngestionJob(operationDef: OperationDef,
source.postProcess(
sourceTable.query,
outputTable.name,
metastore.getMetastoreReader(Seq(outputTable.name), infoDate, isIncremental = true),
metastore.getMetastoreReader(Seq(outputTable.name), infoDate, runReason, isIncremental = true),
infoDate,
operationDef.extraOptions
)
@@ -155,21 +156,4 @@

SaveResult(stats, warnings = tooLongWarnings)
}

private def getSourcingResult(infoDate: LocalDate): SourceResult = {
val (from, to) = getInfoDateRange(infoDate, sourceTable.rangeFromExpr, sourceTable.rangeToExpr)

getData(source, sourceTable.query, from, to)
}

private def getData(source: Source, query: Query, from: LocalDate, to: LocalDate): SourceResult = {
val sourceResult = if (sourceTable.transformations.isEmpty && sourceTable.filters.isEmpty)
source.getData(query, from, to, sourceTable.columns) // push down the projection
else
source.getData(query, from, to, Seq.empty[String]) // column selection and order will be applied later

val sanitizedDf = sanitizeDfColumns(sourceResult.data, specialCharacters)

sourceResult.copy(data = sanitizedDf)
}
}
@@ -170,7 +170,7 @@ class IngestionJob(operationDef: OperationDef,
source.postProcess(
sourceTable.query,
outputTable.name,
metastore.getMetastoreReader(Seq(outputTable.name), infoDate, isIncremental = false),
metastore.getMetastoreReader(Seq(outputTable.name), infoDate, runReason, isIncremental = false),
infoDate,
operationDef.extraOptions
)
@@ -110,22 +110,52 @@ abstract class JobBase(operationDef: OperationDef,
}

protected def preRunTransformationCheck(infoDate: LocalDate, dependencyWarnings: Seq[DependencyWarning]): JobPreRunResult = {
validateTransformationAlreadyRanCases(infoDate, dependencyWarnings) match {
case Some(result) => result
case None => JobPreRunResult(JobPreRunStatus.Ready, None, dependencyWarnings, Seq.empty[String])
if (isIncremental) {
JobPreRunResult(JobPreRunStatus.Ready, None, dependencyWarnings, Seq.empty[String])
} else {
validateTransformationAlreadyRanCases(infoDate, dependencyWarnings) match {
case Some(result) => result
case None => JobPreRunResult(JobPreRunStatus.Ready, None, dependencyWarnings, Seq.empty[String])
}
}
}

protected def validateTransformationAlreadyRanCases(infoDate: LocalDate, dependencyWarnings: Seq[DependencyWarning]): Option[JobPreRunResult] = {
if (!isIncremental && bookkeeper.getLatestDataChunk(outputTableDef.name, infoDate, infoDate).isDefined) {
log.info(s"Job for table ${outputTableDef.name} as already ran for $infoDate.")
Some(JobPreRunResult(JobPreRunStatus.AlreadyRan, None, dependencyWarnings, Seq.empty[String]))
} else {
log.info(s"Job for table ${outputTableDef.name} has not yet ran $infoDate.")
None
bookkeeper.getLatestDataChunk(outputTableDef.name, infoDate, infoDate) match {
case Some(chunk) =>
val outOfDateTables = getOutdatedTables(infoDate, chunk.jobFinished)
if (outOfDateTables.nonEmpty) {
log.info(s"Job for table ${outputTableDef.name} as already ran for $infoDate, but has outdated tables: ${outOfDateTables.mkString(", ")}")
Some(JobPreRunResult(JobPreRunStatus.NeedsUpdate, None, dependencyWarnings, Seq.empty[String]))
} else {
log.info(s"Job for table ${outputTableDef.name} as already ran for $infoDate.")
Some(JobPreRunResult(JobPreRunStatus.AlreadyRan, None, dependencyWarnings, Seq.empty[String]))
}
case None =>
log.info(s"Job for table ${outputTableDef.name} has not yet ran $infoDate.")
None
}
}

private def getOutdatedTables(infoDate: LocalDate, targetJobFinishedSeconds: Long): Seq[String] = {
operationDef.dependencies
.filter(d => !d.isOptional && !d.isPassive)
.flatMap(_.tables)
.distinct
.filter { table =>
bookkeeper.getLatestDataChunk(outputTableDef.name, infoDate, infoDate) match {
case Some(chunk) if chunk.jobFinished >= targetJobFinishedSeconds =>
log.warn(s"The dependent table '$table' has been updated at ${Instant.ofEpochSecond(chunk.jobFinished)} retrospectively " +
s"after the transformation at ${Instant.ofEpochSecond(targetJobFinishedSeconds)} .")
true
case _ =>
false
}
}
}

protected def checkDependency(dep: MetastoreDependency, infoDate: LocalDate): Option[DependencyFailure] = {
val evaluator = new DateExprEvaluator
evaluator.setValue("infoDate", infoDate)
@@ -121,7 +121,7 @@ class SinkJob(operationDef: OperationDef,
try {
val sinkResult = sink.send(df,
sinkTable.metaTableName,
metastore.getMetastoreReader(List(sinkTable.metaTableName) ++ inputTables, infoDate, isIncremental),
metastore.getMetastoreReader(List(sinkTable.metaTableName) ++ inputTables, infoDate, runReason, isIncremental),
infoDate,
sinkTable.options
)