Skip to content

Commit

Permalink
#372 Add Spark Application Id to the pipeline notification target int…
Browse files Browse the repository at this point in the history
…erface.
  • Loading branch information
yruslan committed May 3, 2024
1 parent 52fe9c1 commit 3af129b
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 25 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2636,8 +2636,9 @@ import za.co.absa.pramen.api.PipelineNotificationTarget

class MyPipelineNotificationTarget(conf: Config) extends PipelineNotificationTarget {
override def sendNotification(pipelineStarted: Instant,
appException: Option[Throwable],
tasksCompleted: Seq[TaskNotification]): Unit = ???
applicationId: Option[String],
appException: Option[Throwable],
tasksCompleted: Seq[TaskNotification]): Unit = ???

override def config: Config = conf
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.time.Instant
trait PipelineNotificationTarget extends ExternalChannel {
/** Sends a notification after completion of the pipeline. */
def sendNotification(pipelineStarted: Instant,
applicationId: Option[String],
appException: Option[Throwable],
tasksCompleted: Seq[TaskNotification]): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import java.time.{Instant, LocalDate}

case class TaskNotification(
tableName: String,
infoDate: LocalDate,
started: Instant,
finished: Instant,
infoDate: Option[LocalDate],
started: Option[Instant],
finished: Option[Instant],
status: TaskStatus,
applicationId: String,
isTransient: Boolean,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import com.typesafe.config.Config
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api
import za.co.absa.pramen.api.{DataFormat, Reason, SchemaDifference, TaskNotification}
import za.co.absa.pramen.core.app.config.RuntimeConfig
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
Expand Down Expand Up @@ -394,9 +393,9 @@ abstract class TaskRunnerBase(conf: Config,
NotificationTargetManager.runStatusToTaskStatus(result.runStatus).foreach { taskStatus =>
val notification = TaskNotification(
task.job.outputTable.name,
task.infoDate,
result.runInfo.get.started,
result.runInfo.get.finished,
Option(task.infoDate),
result.runInfo.map(_.started),
result.runInfo.map(_.finished),
taskStatus,
result.applicationId,
result.isTransient,
Expand Down Expand Up @@ -441,7 +440,7 @@ abstract class TaskRunnerBase(conf: Config,
if (diff.nonEmpty) {
log.warn(s"$WARNING SCHEMA CHANGE for $table from $oldInfoDate to $infoDate: ${diff.map(_.toString).mkString("; ")}")
bookkeeper.saveSchema(table.name, infoDate, df.schema)
api.SchemaDifference(table.name, oldInfoDate, infoDate, diff) :: Nil
SchemaDifference(table.name, oldInfoDate, infoDate, diff) :: Nil
} else {
Nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@ import za.co.absa.pramen.core.app.config.HookConfig
import za.co.absa.pramen.core.app.config.RuntimeConfig.EMAIL_IF_NO_CHANGES
import za.co.absa.pramen.core.exceptions.{OsSignalException, ThreadStackTrace}
import za.co.absa.pramen.core.metastore.peristence.{TransientJobManager, TransientTableManager}
import za.co.absa.pramen.core.notify.{NotificationTargetManager, PipelineNotificationTargetFactory}
import za.co.absa.pramen.core.notify.pipeline.{PipelineNotification, PipelineNotificationEmail}
import za.co.absa.pramen.core.notify.{NotificationTargetManager, PipelineNotificationTargetFactory}
import za.co.absa.pramen.core.pipeline.PipelineDef._
import za.co.absa.pramen.core.runner.task.RunStatus.NotRan
import za.co.absa.pramen.core.runner.task.{PipelineNotificationFailure, TaskResult}

import java.time.Instant
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}
import scala.collection.JavaConverters._

class PipelineStateImpl(implicit conf: Config, notificationBuilder: NotificationBuilder) extends PipelineState {
import PipelineStateImpl._
Expand Down Expand Up @@ -87,7 +87,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
override def setSuccess(): Unit = synchronized {
if (!alreadyFinished()) {
exitedNormally = true
sendCustomNotifications()
sendPipelineNotifications()
runCustomShutdownHook()
sendNotificationEmail()
}
Expand All @@ -100,7 +100,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
}
exitCode |= EXIT_CODE_APP_FAILED
exitedNormally = false
sendCustomNotifications()
sendPipelineNotifications()
runCustomShutdownHook()
sendNotificationEmail()
}
Expand Down Expand Up @@ -138,7 +138,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
failureException = Some(new IllegalStateException("The application exited unexpectedly."))
}

sendCustomNotifications()
sendPipelineNotifications()
runCustomShutdownHook()
sendNotificationEmail()
Try {
Expand Down Expand Up @@ -172,14 +172,14 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
}
}

private[state] def sendCustomNotifications(): Unit = {
private[state] def sendPipelineNotifications(): Unit = {
val taskNotifications = taskResults.flatMap { task =>
NotificationTargetManager.runStatusToTaskStatus(task.runStatus).map(taskStatus =>
TaskNotification(
task.job.outputTable.name,
task.runInfo.get.infoDate,
task.runInfo.get.started,
task.runInfo.get.finished,
task.runInfo.map(_.infoDate),
task.runInfo.map(_.started),
task.runInfo.map(_.finished),
taskStatus,
task.applicationId,
task.isTransient,
Expand All @@ -198,6 +198,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
try {
pipelineNotificationTarget.sendNotification(
startedInstant,
sparkAppId,
failureException,
taskNotifications
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import java.time.{Instant, LocalDate}

object TaskNotificationFactory {
def getDummyTaskNotification(tableName: String = "dummy_table",
infoDate: LocalDate = LocalDate.of(2022, 2, 18),
started: Instant = Instant.ofEpochMilli(1613600000000L),
finished: Instant = Instant.ofEpochMilli(1672759508000L),
infoDate: Option[LocalDate] = Some(LocalDate.of(2022, 2, 18)),
started: Option[Instant] = Some(Instant.ofEpochMilli(1613600000000L)),
finished: Option[Instant] = Some(Instant.ofEpochMilli(1672759508000L)),
status: TaskStatus = TaskStatus.Succeeded(100, Seq.empty, Seq.empty, Seq.empty, Seq.empty),
applicationId: String = "app_12345",
isTransient: Boolean = false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class ParallelExecutionLongSuite extends AnyWordSpec with SparkTestBase with Tem

val exitCode = AppRunner.runPipeline(conf)

assert(exitCode != 0)
assert(exitCode == 2)

val table3Path = new Path(tempDir, "table3")
val sink3Path = new Path(tempDir, "sink3")
Expand Down Expand Up @@ -70,7 +70,7 @@ class ParallelExecutionLongSuite extends AnyWordSpec with SparkTestBase with Tem

val exitCode = AppRunner.runPipeline(conf)

assert(exitCode != 0)
assert(exitCode == 2)

val table3Path = new Path(tempDir, "table3")
val sink3Path = new Path(tempDir, "sink3")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import java.time.Instant

class PipelineNotificationTargetMock(conf: Config) extends PipelineNotificationTarget {

override def sendNotification(pipelineStarted: Instant, appException: Option[Throwable], tasksCompleted: Seq[TaskNotification]): Unit = {
override def sendNotification(pipelineStarted: Instant, applicationId: Option[String], appException: Option[Throwable], tasksCompleted: Seq[TaskNotification]): Unit = {
if (conf.hasPath(TEST_NOTIFICATION_FAIL_KEY) && conf.getBoolean(TEST_NOTIFICATION_FAIL_KEY)) {
System.setProperty("pramen.test.notification.pipeline.failure", "true")
throw new RuntimeException("Pipeline notification target test exception")
Expand Down

0 comments on commit 3af129b

Please sign in to comment.