diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotification.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotification.scala index 1efeedcc2..3a44c7d9a 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotification.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotification.scala @@ -25,6 +25,7 @@ case class PipelineNotification( exception: Option[Throwable], pipelineName: String, environmentName: String, + sparkAppId: Option[String], started: Instant, finished: Instant, tasksCompleted: List[TaskResult], diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilder.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilder.scala index a66670269..dca0d0e99 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilder.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilder.scala @@ -26,6 +26,8 @@ trait PipelineNotificationBuilder { def addAppName(appName: String): Unit + def addSparkAppId(sparkAppId: String): Unit + def addEnvironmentName(env: String): Unit def addAppDuration(appStarted: Instant, appFinished: Instant): Unit diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala index 4f494d6ac..993646246 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala @@ -54,6 +54,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot var appException: Option[Throwable] = None var appName: String = "Unspecified Job" + var sparkAppId: Option[String] = None var envName: String = "Unspecified Environment" var appStarted: Instant = Instant.now() var appFinished: Instant = Instant.now() @@ -72,6 +73,10 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot this.appName = appName } + override def addSparkAppId(sparkAppId: String): Unit = { + this.sparkAppId = Option(sparkAppId) + } + override def addEnvironmentName(envName: String): Unit = { this.envName = envName } @@ -191,7 +196,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot introParagraph.withText(".") - val applicationIds = completedTasks.map(_.applicationId.trim).filter(_.nonEmpty).distinct + val applicationIds = getSparkApplicationIds // This handles the case when all tasks are run under the same Spark Session. // When Pramen support runners that run tasks in different Spark Sessions (via Yarn, Glue etc APIs), this will need @@ -227,6 +232,13 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot builder } + private[core] def getSparkApplicationIds: Seq[String] = { + sparkAppId match { + case Some(appId) => (appId +: completedTasks.map(_.applicationId.trim).filter(_.nonEmpty)).distinct.toSeq + case None => completedTasks.map(_.applicationId.trim).filter(_.nonEmpty).distinct.toSeq + } + } + private[core] def getSuccessFlags: (Boolean, Boolean) = { val hasNotificationFailures = completedTasks.exists(t => t.notificationTargetErrors.nonEmpty) val someTasksSucceeded = completedTasks.exists(_.runStatus.isInstanceOf[Succeeded]) && appException.isEmpty diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationDirector.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationDirector.scala index b3834fee8..2e88de7bc 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationDirector.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationDirector.scala @@ -36,6 +36,7 @@ object PipelineNotificationDirector { notificationBuilder.addAppName(notification.pipelineName) notificationBuilder.addEnvironmentName(notification.environmentName) + notification.sparkAppId.foreach(id => notificationBuilder.addSparkAppId(id)) notificationBuilder.addAppDuration(notification.started, notification.finished) notificationBuilder.addDryRun(dryRun) notificationBuilder.addUndercover(undercover) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala index 14d62c2b2..e81f5a7df 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala @@ -55,6 +55,7 @@ object AppRunner { val exitCodeTry = for { spark <- getSparkSession(conf, state) + _ <- Try { state.setSparkAppId(spark.sparkContext.applicationId) } _ <- logBanner(spark) _ <- logExecutorNodes(conf, state, spark) appContext <- createAppContext(conf, state, spark) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineState.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineState.scala index 0c836fa9d..8047a1071 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineState.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineState.scala @@ -27,6 +27,8 @@ trait PipelineState extends AutoCloseable { def setFailure(stage: String, exception: Throwable): Unit + def setSparkAppId(sparkAppId: String): Unit + def addTaskCompletion(statuses: Seq[TaskResult]): Unit def getExitCode: Int diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala index 3d9a402b2..d55d10320 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala @@ -54,6 +54,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification @volatile private var exitedNormally = false @volatile private var isFinished = false @volatile private var customShutdownHookCanRun = false + @volatile private var sparkAppId: Option[String] = None init() @@ -100,6 +101,10 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification } } + override def setSparkAppId(sparkAppId: String): Unit = synchronized { + this.sparkAppId = Option(sparkAppId) + } + override def addTaskCompletion(statuses: Seq[TaskResult]): Unit = synchronized { taskResults ++= statuses.filter(_.runStatus != NotRan) if (statuses.exists(_.runStatus.isFailure)) { @@ -178,6 +183,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification val notification = PipelineNotification(failureException, pipelineName, environmentName, + sparkAppId, startedInstant, finishedInstant, realTaskResults.toList, diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/PipelineNotificationFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/PipelineNotificationFactory.scala index 1b34bbade..5392101f7 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/PipelineNotificationFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/PipelineNotificationFactory.scala @@ -27,6 +27,7 @@ object PipelineNotificationFactory { def getDummyNotification(exception: Option[Throwable] = None, pipelineName: String = "DummyPipeline", environmentName: String = "DummyEnvironment", + sparkAppId: Option[String] = None, started: Instant = Instant.ofEpochSecond(1234567L), finished: Instant = Instant.ofEpochSecond(1234568L), tasksCompleted: List[TaskResult] = List(TaskResultFactory.getDummyTaskResult()), @@ -37,6 +38,7 @@ object PipelineNotificationFactory { exception, pipelineName, environmentName, + sparkAppId, started, finished, tasksCompleted, diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/PipelineNotificationBuilderSpy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/PipelineNotificationBuilderSpy.scala index 04eb8760d..ee14a04fb 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/PipelineNotificationBuilderSpy.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/PipelineNotificationBuilderSpy.scala @@ -25,6 +25,7 @@ import java.time.Instant class PipelineNotificationBuilderSpy extends PipelineNotificationBuilder { var failureException: Option[Throwable] = None var appName = "" + var sparkId = "" var environmentName = "" var appStarted: Instant = Instant.MIN var appFinished: Instant = Instant.MIN @@ -41,6 +42,8 @@ class PipelineNotificationBuilderSpy extends PipelineNotificationBuilder { override def addAppName(name: String): Unit = appName = name + override def addSparkAppId(sparkAppId: String): Unit = sparkId = sparkAppId + override def addEnvironmentName(env: String): Unit = environmentName = env override def addAppDuration(started: Instant, finished: Instant): Unit = { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/state/PipelineStateSpy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/state/PipelineStateSpy.scala index c8a5cdaee..f0e82a611 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/state/PipelineStateSpy.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/state/PipelineStateSpy.scala @@ -29,6 +29,7 @@ class PipelineStateSpy extends PipelineState { val failures = new ListBuffer[(String, Throwable)] val completedStatuses = new ListBuffer[TaskResult] var closeCalled = 0 + var sparkAppId: Option[String] = None override def getState(): PipelineStateSnapshot = { PipelineStateSnapshot( @@ -53,6 +54,10 @@ class PipelineStateSpy extends PipelineState { failures.append((stage, ex)) } + override def setSparkAppId(sparkAppId: String): Unit = synchronized { + this.sparkAppId = Option(sparkAppId) + } + override def addTaskCompletion(statuses: Seq[TaskResult]): Unit = synchronized { completedStatuses ++= statuses }