From 644bda5bf786ce395d0811c688237a4340395895 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Tue, 30 Apr 2024 10:59:46 +0200 Subject: [PATCH 1/5] #372 Extend notification target messages with more fields and add pipeline notification target interface. --- .../za/co/absa/pramen/api}/FieldChange.scala | 2 +- .../api/PipelineNotificationTarget.scala | 26 +++++++++++++++++++ .../absa/pramen/api}/SchemaDifference.scala | 2 +- .../co/absa/pramen/api/TaskNotification.scala | 4 +++ .../PipelineNotificationBuilderHtml.scala | 1 + .../pramen/core/runner/task/TaskResult.scala | 2 +- .../core/runner/task/TaskRunnerBase.scala | 10 ++++--- .../absa/pramen/core/utils/SparkUtils.scala | 2 +- .../pramen/core/TaskNotificationFactory.scala | 8 ++++-- .../core/mocks/FieldChangeFactory.scala | 2 +- .../core/mocks/SchemaDifferenceFactory.scala | 2 +- .../pramen/core/mocks/TaskResultFactory.scala | 2 +- .../core/tests/utils/SparkUtilsSuite.scala | 2 +- 13 files changed, 52 insertions(+), 13 deletions(-) rename pramen/{core/src/main/scala/za/co/absa/pramen/core/notify/pipeline => api/src/main/scala/za/co/absa/pramen/api}/FieldChange.scala (95%) create mode 100644 pramen/api/src/main/scala/za/co/absa/pramen/api/PipelineNotificationTarget.scala rename pramen/{core/src/main/scala/za/co/absa/pramen/core/notify/pipeline => api/src/main/scala/za/co/absa/pramen/api}/SchemaDifference.scala (94%) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/FieldChange.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/FieldChange.scala similarity index 95% rename from pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/FieldChange.scala rename to pramen/api/src/main/scala/za/co/absa/pramen/api/FieldChange.scala index 9f3e1fda1..f7b5358aa 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/FieldChange.scala +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/FieldChange.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package za.co.absa.pramen.core.notify.pipeline +package za.co.absa.pramen.api sealed trait FieldChange diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/PipelineNotificationTarget.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/PipelineNotificationTarget.scala new file mode 100644 index 000000000..136680773 --- /dev/null +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/PipelineNotificationTarget.scala @@ -0,0 +1,26 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.api + +import java.time.Instant + +trait PipelineNotificationTarget extends ExternalChannel { + /** Sends a notification after completion of the pipeline. */ + def sendNotification(pipelineStarted: Instant, + appException: Option[Throwable], + tasksCompleted: Seq[TaskNotification]): Unit +} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/SchemaDifference.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/SchemaDifference.scala similarity index 94% rename from pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/SchemaDifference.scala rename to pramen/api/src/main/scala/za/co/absa/pramen/api/SchemaDifference.scala index 325e438c1..4d45a4a4b 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/SchemaDifference.scala +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/SchemaDifference.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package za.co.absa.pramen.core.notify.pipeline +package za.co.absa.pramen.api import java.time.LocalDate diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/TaskNotification.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/TaskNotification.scala index 89b6831d1..3d2003a59 100644 --- a/pramen/api/src/main/scala/za/co/absa/pramen/api/TaskNotification.scala +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/TaskNotification.scala @@ -25,5 +25,9 @@ case class TaskNotification( finished: Instant, status: TaskStatus, applicationId: String, + isTransient: Boolean, + isRawFilesJob: Boolean, + schemaChanges: Seq[SchemaDifference], + dependencyWarningTables: Seq[String], options: Map[String, String] ) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala index 993646246..27d42b9ba 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala @@ -18,6 +18,7 @@ package za.co.absa.pramen.core.notify.pipeline import com.typesafe.config.Config import org.slf4j.LoggerFactory +import za.co.absa.pramen.api.{FieldChange, SchemaDifference} import za.co.absa.pramen.api.notification._ import za.co.absa.pramen.core.config.Keys.TIMEZONE import za.co.absa.pramen.core.exceptions.{CmdFailedException, ProcessFailedException} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskResult.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskResult.scala index 9d6454c38..6183a8fc5 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskResult.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskResult.scala @@ -16,7 +16,7 @@ package za.co.absa.pramen.core.runner.task -import za.co.absa.pramen.core.notify.pipeline.SchemaDifference +import za.co.absa.pramen.api.SchemaDifference import za.co.absa.pramen.core.pipeline.{DependencyWarning, Job} case class TaskResult( diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala index 89679b6a2..f5e297755 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala @@ -20,7 +20,8 @@ import com.typesafe.config.Config import org.apache.spark.sql.DataFrame import org.apache.spark.sql.functions.lit import org.slf4j.LoggerFactory -import za.co.absa.pramen.api.{DataFormat, Reason, TaskNotification} +import za.co.absa.pramen.api +import za.co.absa.pramen.api.{DataFormat, Reason, SchemaDifference, TaskNotification} import za.co.absa.pramen.core.app.config.RuntimeConfig import za.co.absa.pramen.core.bookkeeper.Bookkeeper import za.co.absa.pramen.core.exceptions.{FatalErrorWrapper, ReasonException} @@ -30,7 +31,6 @@ import za.co.absa.pramen.core.lock.TokenLockFactory import za.co.absa.pramen.core.metastore.MetaTableStats import za.co.absa.pramen.core.metastore.model.MetaTable import za.co.absa.pramen.core.notify.NotificationTargetManager -import za.co.absa.pramen.core.notify.pipeline.SchemaDifference import za.co.absa.pramen.core.pipeline.JobPreRunStatus._ import za.co.absa.pramen.core.pipeline._ import za.co.absa.pramen.core.state.PipelineState @@ -399,6 +399,10 @@ abstract class TaskRunnerBase(conf: Config, result.runInfo.get.finished, taskStatus, result.applicationId, + result.isTransient, + result.isRawFilesJob, + result.schemaChanges, + result.dependencyWarnings.map(_.table), notificationTarget.options ) @@ -437,7 +441,7 @@ abstract class TaskRunnerBase(conf: Config, if (diff.nonEmpty) { log.warn(s"$WARNING SCHEMA CHANGE for $table from $oldInfoDate to $infoDate: ${diff.map(_.toString).mkString("; ")}") bookkeeper.saveSchema(table.name, infoDate, df.schema) - SchemaDifference(table.name, oldInfoDate, infoDate, diff) :: Nil + api.SchemaDifference(table.name, oldInfoDate, infoDate, diff) :: Nil } else { Nil } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala index fa7ebe72f..b7437f42b 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, SparkSession} import org.slf4j.LoggerFactory -import za.co.absa.pramen.core.notify.pipeline.FieldChange +import za.co.absa.pramen.api.FieldChange import za.co.absa.pramen.core.pipeline.TransformExpression import java.io.ByteArrayOutputStream diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/TaskNotificationFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/TaskNotificationFactory.scala index 52ced44a4..cd1a19f84 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/TaskNotificationFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/TaskNotificationFactory.scala @@ -16,7 +16,7 @@ package za.co.absa.pramen.core -import za.co.absa.pramen.api.{TaskNotification, TaskStatus} +import za.co.absa.pramen.api.{SchemaDifference, TaskNotification, TaskStatus} import java.time.{Instant, LocalDate} @@ -27,8 +27,12 @@ object TaskNotificationFactory { finished: Instant = Instant.ofEpochMilli(1672759508000L), status: TaskStatus = TaskStatus.Succeeded(100, Seq.empty, Seq.empty, Seq.empty, Seq.empty), applicationId: String = "app_12345", + isTransient: Boolean = false, + isRawFilesJob: Boolean = false, + schemaChanges: Seq[SchemaDifference] = Seq.empty, + dependencyWarningTables: Seq[String] = Seq.empty, options: Map[String, String] = Map.empty[String, String]): TaskNotification = { - TaskNotification(tableName, infoDate, started, finished, status, applicationId, options) + TaskNotification(tableName, infoDate, started, finished, status, applicationId, isTransient, isRawFilesJob, schemaChanges, dependencyWarningTables, options) } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/FieldChangeFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/FieldChangeFactory.scala index efb8efc2f..e6a255904 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/FieldChangeFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/FieldChangeFactory.scala @@ -16,7 +16,7 @@ package za.co.absa.pramen.core.mocks -import za.co.absa.pramen.core.notify.pipeline.FieldChange +import za.co.absa.pramen.api.FieldChange object FieldChangeFactory { def getDummyNewField(columnName: String = "dummy_new_column", dataType: String = "int"): FieldChange = { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/SchemaDifferenceFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/SchemaDifferenceFactory.scala index 8bbca1213..90280603a 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/SchemaDifferenceFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/SchemaDifferenceFactory.scala @@ -16,7 +16,7 @@ package za.co.absa.pramen.core.mocks -import za.co.absa.pramen.core.notify.pipeline.{FieldChange, SchemaDifference} +import za.co.absa.pramen.api.{FieldChange, SchemaDifference} import java.time.LocalDate diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskResultFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskResultFactory.scala index f72b129c0..37aad2e3f 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskResultFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskResultFactory.scala @@ -16,8 +16,8 @@ package za.co.absa.pramen.core.mocks +import za.co.absa.pramen.api.SchemaDifference import za.co.absa.pramen.core.mocks.job.JobSpy -import za.co.absa.pramen.core.notify.pipeline.SchemaDifference import za.co.absa.pramen.core.pipeline.{DependencyWarning, Job, TaskRunReason} import za.co.absa.pramen.core.runner.task.{NotificationFailure, RunInfo, RunStatus, TaskResult} diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala index 5b435d651..217d8ee86 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.{DataFrame, Row} import org.scalatest.wordspec.AnyWordSpec import za.co.absa.pramen.core.base.SparkTestBase import za.co.absa.pramen.core.fixtures.{TempDirFixture, TextComparisonFixture} -import za.co.absa.pramen.core.notify.pipeline.FieldChange._ +import za.co.absa.pramen.api.FieldChange._ import za.co.absa.pramen.core.pipeline.TransformExpression import za.co.absa.pramen.core.samples.SampleCaseClass2 import za.co.absa.pramen.core.utils.SparkUtils From 0f1bf22e8c6ff4b9d66c3c8a58c2bd565b690625 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Thu, 2 May 2024 16:29:31 +0200 Subject: [PATCH 2/5] #372 Implement pipeline notification targets, and sending of custom notifications. --- README.md | 23 +++++++++- .../PipelineNotificationTargetFactory.scala | 41 +++++++++++++++++ .../pramen/core/state/PipelineStateImpl.scala | 45 ++++++++++++++++++- 3 files changed, 107 insertions(+), 2 deletions(-) create mode 100644 pramen/core/src/main/scala/za/co/absa/pramen/core/notify/PipelineNotificationTargetFactory.scala diff --git a/README.md b/README.md index 3ae80a1cb..2c02d4fc0 100644 --- a/README.md +++ b/README.md @@ -2624,7 +2624,28 @@ pramen.operations = [ ``` -## Notifications +## Pipeline Notifications +Custom pipeline notification targets allow execution arbitrary actions after the pipeline is finished. Usually, it is +used to send custom notifications to external systems. A pipeline notification target can be created by implementing +`PipelineNotificationTarget` interface: +```scala +package com.example + +import za.co.absa.pramen.api.PipelineNotificationTarget + +class MyPipelineNotificationTarget extends PipelineNotificationTarget { + def sendNotification(pipelineStarted: Instant, + appException: Option[Throwable], + tasksCompleted: Seq[TaskNotification]): Unit = ??? +} +``` + +Pipeline notification targets can be registered in the workflow configuration: +```hocon +pramen.pipeline.notification.targets = [ "com.example.MyPipelineNotificationTarget" ] +``` + +## Job Notifications If you need to react on a completion event of any job, you can do it using notification targets. A notification target is a component that you can implement and register for any operation or table. The notification target will be called when the operation or job completes. Usually it is used to send an event to trigger actions outside the Pramen pipeline. diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/PipelineNotificationTargetFactory.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/PipelineNotificationTargetFactory.scala new file mode 100644 index 000000000..aad0ac2cd --- /dev/null +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/PipelineNotificationTargetFactory.scala @@ -0,0 +1,41 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.notify + +import com.typesafe.config.Config +import za.co.absa.pramen.api.PipelineNotificationTarget +import za.co.absa.pramen.core.utils.ClassLoaderUtils + +import scala.collection.JavaConverters._ + +object PipelineNotificationTargetFactory { + val PIPELINE_NOTIFICATION_FACTORIES_KEY = "pramen.pipeline.notification.targets" + + def fromConfig(conf: Config): Seq[PipelineNotificationTarget] = { + if (conf.hasPath(PIPELINE_NOTIFICATION_FACTORIES_KEY)) { + val factories = conf.getStringList(PIPELINE_NOTIFICATION_FACTORIES_KEY).asScala + factories.map(clazz => createPipelineNotificationTarget(clazz, conf)).toSeq + } else { + Seq.empty + } + } + + private[core] def createPipelineNotificationTarget(clazz: String, appConfig: Config): PipelineNotificationTarget = { + ClassLoaderUtils.loadConfigurableClass[PipelineNotificationTarget](clazz, appConfig) + } + +} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala index d55d10320..dac69b61a 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala @@ -19,11 +19,12 @@ package za.co.absa.pramen.core.state import com.typesafe.config.Config import org.slf4j.{Logger, LoggerFactory} import sun.misc.{Signal, SignalHandler} -import za.co.absa.pramen.api.NotificationBuilder +import za.co.absa.pramen.api.{NotificationBuilder, PipelineNotificationTarget, TaskNotification} import za.co.absa.pramen.core.app.config.HookConfig import za.co.absa.pramen.core.app.config.RuntimeConfig.EMAIL_IF_NO_CHANGES import za.co.absa.pramen.core.exceptions.{OsSignalException, ThreadStackTrace} import za.co.absa.pramen.core.metastore.peristence.{TransientJobManager, TransientTableManager} +import za.co.absa.pramen.core.notify.{NotificationTargetManager, PipelineNotificationTargetFactory} import za.co.absa.pramen.core.notify.pipeline.{PipelineNotification, PipelineNotificationEmail} import za.co.absa.pramen.core.pipeline.PipelineDef._ import za.co.absa.pramen.core.runner.task.RunStatus.NotRan @@ -45,6 +46,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification private val environmentName = conf.getString(ENVIRONMENT_NAME) private val sendEmailIfNoNewData: Boolean = conf.getBoolean(EMAIL_IF_NO_CHANGES) private val hookConfig = HookConfig.fromConfig(conf) + private val pipelineNotificationTargets = PipelineNotificationTargetFactory.fromConfig(conf) // State private val startedInstant = Instant.now @@ -84,6 +86,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification override def setSuccess(): Unit = synchronized { if (!alreadyFinished()) { exitedNormally = true + sendCustomNotifications() runCustomShutdownHook() sendNotificationEmail() } @@ -96,6 +99,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification } exitCode |= EXIT_CODE_APP_FAILED exitedNormally = false + sendCustomNotifications() runCustomShutdownHook() sendNotificationEmail() } @@ -133,6 +137,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification failureException = Some(new IllegalStateException("The application exited unexpectedly.")) } + sendCustomNotifications() runCustomShutdownHook() sendNotificationEmail() Try { @@ -166,6 +171,44 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification } } + private[state] def sendCustomNotifications(): Unit = { + val taskNotifications = taskResults.flatMap { task => + NotificationTargetManager.runStatusToTaskStatus(task.runStatus).map(taskStatus => + TaskNotification( + task.job.outputTable.name, + task.runInfo.get.infoDate, + task.runInfo.get.started, + task.runInfo.get.finished, + taskStatus, + task.applicationId, + task.isTransient, + task.isRawFilesJob, + task.schemaChanges, + task.dependencyWarnings.map(_.table), + Map.empty + ) + ) + }.toSeq + + pipelineNotificationTargets.foreach(notificationTarget => sendCustomNotification(notificationTarget, taskNotifications)) + } + + private[state] def sendCustomNotification(pipelineNotificationTarget: PipelineNotificationTarget, taskNotifications: Seq[TaskNotification]): Unit = { + try { + pipelineNotificationTarget.sendNotification( + startedInstant, + failureException, + taskNotifications + ) + } catch { + case ex: Throwable => + log.error(s"Unable to send a notification to the custom notification target: ${pipelineNotificationTarget.getClass.getName}", ex) + if (failureException.isEmpty) { + setFailure("running the shutdown hook", ex) + } + } + } + protected def sendNotificationEmail(): Unit = { failureException.foreach(ex => log.error(s"The job has FAILED.", ex)) From 0b7dddac3f309418c890f0955d7b22d08ad7869e Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Fri, 3 May 2024 11:20:34 +0200 Subject: [PATCH 3/5] #372 Add pipeline notification failures to email notifications. --- .../pipeline/PipelineNotification.scala | 3 ++- .../PipelineNotificationBuilder.scala | 4 +++- .../PipelineNotificationBuilderHtml.scala | 22 ++++++++++++++++-- .../task/PipelineNotificationFailure.scala | 22 ++++++++++++++++++ .../pramen/core/state/PipelineStateImpl.scala | 8 +++---- .../mocks/PipelineNotificationFactory.scala | 4 +++- .../PipelineNotificationBuilderSpy.scala | 5 +++- ...PipelineNotificationBuilderHtmlSuite.scala | 23 ++++++++++++++++++- 8 files changed, 80 insertions(+), 11 deletions(-) create mode 100644 pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/PipelineNotificationFailure.scala diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotification.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotification.scala index 3a44c7d9a..3960c033b 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotification.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotification.scala @@ -17,7 +17,7 @@ package za.co.absa.pramen.core.notify.pipeline import za.co.absa.pramen.api.notification.{NotificationEntry, TextElement} -import za.co.absa.pramen.core.runner.task.TaskResult +import za.co.absa.pramen.core.runner.task.{PipelineNotificationFailure, TaskResult} import java.time.Instant @@ -29,6 +29,7 @@ case class PipelineNotification( started: Instant, finished: Instant, tasksCompleted: List[TaskResult], + pipelineNotificationFailures: List[PipelineNotificationFailure], customEntries: List[NotificationEntry], customSignature: List[TextElement] ) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilder.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilder.scala index dca0d0e99..a06cfb0cb 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilder.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilder.scala @@ -17,7 +17,7 @@ package za.co.absa.pramen.core.notify.pipeline import za.co.absa.pramen.api.notification.{NotificationEntry, TextElement} -import za.co.absa.pramen.core.runner.task.TaskResult +import za.co.absa.pramen.core.runner.task.{PipelineNotificationFailure, TaskResult} import java.time.Instant @@ -40,6 +40,8 @@ trait PipelineNotificationBuilder { def addCompletedTask(completedTask: TaskResult): Unit + def addPipelineNotificationFailure(failure: PipelineNotificationFailure): Unit + def addCustomEntries(entries: Seq[NotificationEntry]): Unit def addSignature(signature: TextElement*): Unit diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala index 27d42b9ba..836354784 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala @@ -26,7 +26,7 @@ import za.co.absa.pramen.core.notify.message._ import za.co.absa.pramen.core.notify.pipeline.PipelineNotificationBuilderHtml.{MIN_MEGABYTES, MIN_RPS_JOB_DURATION_SECONDS, MIN_RPS_RECORDS} import za.co.absa.pramen.core.pipeline.TaskRunReason import za.co.absa.pramen.core.runner.task.RunStatus._ -import za.co.absa.pramen.core.runner.task.{NotificationFailure, RunStatus, TaskResult} +import za.co.absa.pramen.core.runner.task.{NotificationFailure, PipelineNotificationFailure, RunStatus, TaskResult} import za.co.absa.pramen.core.utils.JvmUtils.getShortExceptionDescription import za.co.absa.pramen.core.utils.StringUtils.renderThrowable import za.co.absa.pramen.core.utils.{BuildPropertyUtils, ConfigUtils, StringUtils, TimeUtils} @@ -64,6 +64,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot var customSignature = Seq.empty[TextElement] val completedTasks = new ListBuffer[TaskResult] + val pipelineNotificationFailures = new ListBuffer[PipelineNotificationFailure] val customEntries = new ListBuffer[NotificationEntry] override def addFailureException(ex: Throwable): Unit = { @@ -104,6 +105,10 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot completedTasks += completedTask } + override def addPipelineNotificationFailure(failure: PipelineNotificationFailure): Unit = { + pipelineNotificationFailures += failure + } + override def addCustomEntries(entries: Seq[NotificationEntry]): Unit = customEntries ++= entries override def addSignature(signature: TextElement*): Unit = customSignature = signature @@ -146,6 +151,8 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot } }) + renderPipelineNotificationFailures(builder) + val notificationTargetErrors = completedTasks.flatMap(_.notificationTargetErrors) if (notificationTargetErrors.nonEmpty) { @@ -254,8 +261,9 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot val hasTaskWarnings = task.runStatus.isInstanceOf[RunStatus.Succeeded] && task.runStatus.asInstanceOf[RunStatus.Succeeded].warnings.nonEmpty val hasSkippedWithWarnings = task.runStatus.isInstanceOf[RunStatus.Skipped] && task.runStatus.asInstanceOf[RunStatus.Skipped].isWarning val hasSchemaChanges = task.schemaChanges.nonEmpty + val hasPipelineNotificationFailures = pipelineNotificationFailures.nonEmpty - hasDependencyWarnings || hasNotificationErrors || hasTaskWarnings || hasSkippedWithWarnings || hasSchemaChanges + hasDependencyWarnings || hasNotificationErrors || hasTaskWarnings || hasSkippedWithWarnings || hasSchemaChanges || hasPipelineNotificationFailures } } @@ -413,6 +421,16 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot builder.withTable(tableBuilder) } + private[core] def renderPipelineNotificationFailures(builder: MessageBuilderHtml): MessageBuilder = { + pipelineNotificationFailures.foreach { failure => + val notificationErrorsParagraph = ParagraphBuilder() + .withText(s"Failed to send pipeline notification via '${failure.notificationTarget}': ", Style.Exception) + builder.withParagraph(notificationErrorsParagraph) + renderException(builder, failure.ex) + } + builder + } + private[core] def renderNotificationTargetErrors(builder: MessageBuilderHtml, notificationTargetErrors: ListBuffer[NotificationFailure]): MessageBuilder = { val tableBuilder = new TableBuilderHtml diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/PipelineNotificationFailure.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/PipelineNotificationFailure.scala new file mode 100644 index 000000000..b5dee4088 --- /dev/null +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/PipelineNotificationFailure.scala @@ -0,0 +1,22 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.runner.task + +case class PipelineNotificationFailure( + notificationTarget: String, + ex: Throwable + ) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala index dac69b61a..1bb58a4e9 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala @@ -28,7 +28,7 @@ import za.co.absa.pramen.core.notify.{NotificationTargetManager, PipelineNotific import za.co.absa.pramen.core.notify.pipeline.{PipelineNotification, PipelineNotificationEmail} import za.co.absa.pramen.core.pipeline.PipelineDef._ import za.co.absa.pramen.core.runner.task.RunStatus.NotRan -import za.co.absa.pramen.core.runner.task.TaskResult +import za.co.absa.pramen.core.runner.task.{PipelineNotificationFailure, TaskResult} import java.time.Instant import scala.collection.mutable.ListBuffer @@ -52,6 +52,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification private val startedInstant = Instant.now @volatile private var exitCode = EXIT_CODE_SUCCESS private val taskResults = new ListBuffer[TaskResult] + private val pipelineNotificationFailures = new ListBuffer[PipelineNotificationFailure] @volatile private var failureException: Option[Throwable] = None @volatile private var exitedNormally = false @volatile private var isFinished = false @@ -203,9 +204,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification } catch { case ex: Throwable => log.error(s"Unable to send a notification to the custom notification target: ${pipelineNotificationTarget.getClass.getName}", ex) - if (failureException.isEmpty) { - setFailure("running the shutdown hook", ex) - } + pipelineNotificationFailures += PipelineNotificationFailure(pipelineNotificationTarget.getClass.getName, ex) } } @@ -230,6 +229,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification startedInstant, finishedInstant, realTaskResults.toList, + pipelineNotificationFailures.toList, customEntries.toList, customSignature.toList) if (realTaskResults.nonEmpty || sendEmailIfNoNewData || failureException.nonEmpty) { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/PipelineNotificationFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/PipelineNotificationFactory.scala index 5392101f7..4ff6af295 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/PipelineNotificationFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/PipelineNotificationFactory.scala @@ -19,7 +19,7 @@ package za.co.absa.pramen.core.mocks import za.co.absa.pramen.api.notification.{NotificationEntry, TextElement} import za.co.absa.pramen.core.notify.pipeline import za.co.absa.pramen.core.notify.pipeline.PipelineNotification -import za.co.absa.pramen.core.runner.task.TaskResult +import za.co.absa.pramen.core.runner.task.{PipelineNotificationFailure, TaskResult} import java.time.Instant @@ -31,6 +31,7 @@ object PipelineNotificationFactory { started: Instant = Instant.ofEpochSecond(1234567L), finished: Instant = Instant.ofEpochSecond(1234568L), tasksCompleted: List[TaskResult] = List(TaskResultFactory.getDummyTaskResult()), + pipelineNotificationFailures: List[PipelineNotificationFailure] = List.empty, customEntries: List[NotificationEntry] = List.empty[NotificationEntry], customSignature: List[TextElement] = List.empty[TextElement] ): PipelineNotification = { @@ -42,6 +43,7 @@ object PipelineNotificationFactory { started, finished, tasksCompleted, + pipelineNotificationFailures, customEntries, customSignature ) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/PipelineNotificationBuilderSpy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/PipelineNotificationBuilderSpy.scala index ee14a04fb..10adc79e5 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/PipelineNotificationBuilderSpy.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/PipelineNotificationBuilderSpy.scala @@ -18,7 +18,7 @@ package za.co.absa.pramen.core.mocks.notify import za.co.absa.pramen.api.notification.{NotificationEntry, TextElement} import za.co.absa.pramen.core.notify.pipeline.PipelineNotificationBuilder -import za.co.absa.pramen.core.runner.task.TaskResult +import za.co.absa.pramen.core.runner.task.{PipelineNotificationFailure, TaskResult} import java.time.Instant @@ -36,6 +36,7 @@ class PipelineNotificationBuilderSpy extends PipelineNotificationBuilder { var customSignature = Seq.empty[TextElement] var addCompletedTaskCalled = 0 + var addPipelineNotificationFailure = 0 var addCustomEntriesCalled = 0 override def addFailureException(ex: Throwable): Unit = failureException = Option(ex) @@ -62,6 +63,8 @@ class PipelineNotificationBuilderSpy extends PipelineNotificationBuilder { override def addCompletedTask(completedTask: TaskResult): Unit = addCompletedTaskCalled += 1 + override def addPipelineNotificationFailure(failure: PipelineNotificationFailure): Unit = addPipelineNotificationFailure += 1 + override def addCustomEntries(entries: Seq[NotificationEntry]): Unit = addCustomEntriesCalled += 1 override def addSignature(signature: TextElement*): Unit = customSignature = signature diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala index 5d059d822..9ae64beb0 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala @@ -26,7 +26,7 @@ import za.co.absa.pramen.core.mocks.{RunStatusFactory, SchemaDifferenceFactory, import za.co.absa.pramen.core.notify.message.{MessageBuilderHtml, ParagraphBuilder} import za.co.absa.pramen.core.notify.pipeline.PipelineNotificationBuilderHtml import za.co.absa.pramen.core.pipeline.TaskRunReason -import za.co.absa.pramen.core.runner.task.{NotificationFailure, RunStatus} +import za.co.absa.pramen.core.runner.task.{NotificationFailure, PipelineNotificationFailure, RunStatus} import za.co.absa.pramen.core.utils.ResourceUtils import java.time.{Instant, LocalDate} @@ -256,6 +256,27 @@ class PipelineNotificationBuilderHtmlSuite extends AnyWordSpec with TextComparis } } + "renderPipelineNotificationFailures" should { + "render pipeline failure as an exception" in { + val notificationBuilder = getBuilder + val messageBuilder = new MessageBuilderHtml + + val ex = new RuntimeException("Test exception") + notificationBuilder.addPipelineNotificationFailure(PipelineNotificationFailure("com.example.MyNotification", ex)) + + val actual = notificationBuilder.renderPipelineNotificationFailures(messageBuilder) + .renderBody + + assert(actual.contains( + """

Failed to send pipeline notification via 'com.example.MyNotification':

""".stripMargin.replaceAll("\\r\\n", "\\n") + )) + + assert(actual.contains( + """
java.lang.RuntimeException: Test exception""".stripMargin.replaceAll("\\r\\n", "\\n")
+      ))
+    }
+  }
+
   "getStatus" should {
     "work for succeeded" in {
       val builder = getBuilder

From 52fe9c181072ba343cd86da8e99db67695614e43 Mon Sep 17 00:00:00 2001
From: Ruslan Iushchenko 
Date: Fri, 3 May 2024 14:26:33 +0200
Subject: [PATCH 4/5] #372 Add integration tests for notification targets.

---
 README.md                                     |  7 +-
 .../integration_notification_targets.conf     | 85 +++++++++++++++++
 .../integration/NotificationTargetSuite.scala | 93 +++++++++++++++++++
 .../ParallelExecutionLongSuite.scala          |  4 +-
 .../mocks/notify/NotificationTargetMock.scala | 46 +++++++++
 .../PipelineNotificationTargetMock.scala      | 37 ++++++++
 6 files changed, 268 insertions(+), 4 deletions(-)
 create mode 100644 pramen/core/src/test/resources/test/config/integration_notification_targets.conf
 create mode 100644 pramen/core/src/test/scala/za/co/absa/pramen/core/integration/NotificationTargetSuite.scala
 create mode 100644 pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/NotificationTargetMock.scala
 create mode 100644 pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/PipelineNotificationTargetMock.scala

diff --git a/README.md b/README.md
index 2c02d4fc0..d22ae4f37 100644
--- a/README.md
+++ b/README.md
@@ -2631,12 +2631,15 @@ used to send custom notifications to external systems. A pipeline notification t
 ```scala
 package com.example
 
+import com.typesafe.config.Config
 import za.co.absa.pramen.api.PipelineNotificationTarget
 
-class MyPipelineNotificationTarget extends PipelineNotificationTarget {
-  def sendNotification(pipelineStarted: Instant,
+class MyPipelineNotificationTarget(conf: Config) extends PipelineNotificationTarget {
+  override def sendNotification(pipelineStarted: Instant,
                        appException: Option[Throwable],
                        tasksCompleted: Seq[TaskNotification]): Unit = ???
+
+  override def config: Config = conf
 }
 ```
 
diff --git a/pramen/core/src/test/resources/test/config/integration_notification_targets.conf b/pramen/core/src/test/resources/test/config/integration_notification_targets.conf
new file mode 100644
index 000000000..d03b8ed46
--- /dev/null
+++ b/pramen/core/src/test/resources/test/config/integration_notification_targets.conf
@@ -0,0 +1,85 @@
+# Copyright 2022 ABSA Group Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This variable is expected to be set up by the test suite
+#base.path = "/tmp"
+
+pramen {
+  pipeline.name = "Integration test with notificaiton targets"
+
+  temporary.directory = ${base.path}/temp
+
+  bookkeeping.enabled = false
+  stop.spark.session = false
+}
+
+pramen.notification.targets = [
+  {
+    name = "dummy_notification_target"
+    factory.class = "za.co.absa.pramen.core.mocks.notify.NotificationTargetMock"
+
+    test.fail.notification = ${test.fail.notification}
+  }
+]
+
+pramen.pipeline.notification.targets = [ "za.co.absa.pramen.core.mocks.notify.PipelineNotificationTargetMock" ]
+
+pramen.metastore {
+  tables = [
+    {
+      name = "table1"
+      format = "parquet"
+      path = ${base.path}/table1
+    },
+    {
+      name = "table2"
+      format = "parquet"
+      path = ${base.path}/table2
+    }
+  ]
+}
+
+pramen.operations = [
+  {
+    name = "Generating dataframe"
+    type = "transformation"
+
+    class = "za.co.absa.pramen.core.mocks.transformer.GeneratingTransformer"
+    schedule.type = "daily"
+    output.table = "table1"
+
+    notification.targets = [ "dummy_notification_target" ]
+  },
+  {
+    name = "Identity transformer"
+    type = "transformation"
+    class = "za.co.absa.pramen.core.transformers.IdentityTransformer"
+    schedule.type = "daily"
+
+    output.table = "table2"
+
+    dependencies = [
+      {
+        tables = [ table1 ]
+        date.from = "@infoDate"
+        optional = true # Since no bookkeeping available the table will be seen as empty for the dependency manager
+      }
+    ]
+
+    notification.targets = [ "dummy_notification_target" ]
+
+    option {
+      table = "table1"
+    }
+  }
+]
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/NotificationTargetSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/NotificationTargetSuite.scala
new file mode 100644
index 000000000..b296f2d1b
--- /dev/null
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/NotificationTargetSuite.scala
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2022 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.pramen.core.integration
+
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.hadoop.fs.Path
+import org.scalatest.wordspec.AnyWordSpec
+import za.co.absa.pramen.core.base.SparkTestBase
+import za.co.absa.pramen.core.fixtures.{TempDirFixture, TextComparisonFixture}
+import za.co.absa.pramen.core.runner.AppRunner
+import za.co.absa.pramen.core.utils.{FsUtils, ResourceUtils}
+
+import java.time.LocalDate
+
+class NotificationTargetSuite extends AnyWordSpec with SparkTestBase with TempDirFixture with TextComparisonFixture {
+  private val infoDate = LocalDate.of(2021, 2, 18)
+
+  "Pipeline with notification targets" should {
+    val expectedSingle =
+      """{"a":"D","b":4}
+        |{"a":"E","b":5}
+        |{"a":"F","b":6}
+        |""".stripMargin
+
+    "work end to end for non-failing pipelines" in {
+      withTempDirectory("notification_targets") { tempDir =>
+        val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir)
+
+
+        val conf = getConfig(tempDir)
+        val exitCode = AppRunner.runPipeline(conf)
+
+        assert(exitCode == 0)
+
+        val table2Path = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate")
+
+        assert(fsUtils.exists(table2Path))
+
+        val df2 = spark.read.parquet(table2Path.toString)
+        val actual2 = df2.orderBy("a").toJSON.collect().mkString("\n")
+
+        compareText(actual2, expectedSingle)
+
+        assert(System.getProperty("pramen.test.notification.tasks.completed").toInt == 2)
+        assert(System.getProperty("pramen.test.notification.table") == "table2")
+      }
+    }
+
+    "still return zero exit code on notification failures" in {
+      withTempDirectory("notification_targets") { tempDir =>
+        val conf = getConfig(tempDir, failNotifications = true)
+        val exitCode = AppRunner.runPipeline(conf)
+
+        assert(exitCode == 0)
+
+        assert(System.getProperty("pramen.test.notification.pipeline.failure").toBoolean)
+        assert(System.getProperty("pramen.test.notification.target.failure").toBoolean)
+      }
+    }
+  }
+
+  def getConfig(basePath: String, failNotifications: Boolean = false): Config = {
+    val configContents = ResourceUtils.getResourceString("/test/config/integration_notification_targets.conf")
+    val basePathEscaped = basePath.replace("\\", "\\\\")
+
+    val conf = ConfigFactory.parseString(
+      s"""base.path = "$basePathEscaped"
+         |pramen.runtime.is.rerun = true
+         |pramen.current.date = "$infoDate"
+         |test.fail.notification = $failNotifications
+         |$configContents
+         |""".stripMargin
+    ).withFallback(ConfigFactory.load())
+      .resolve()
+
+    conf
+  }
+
+}
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/ParallelExecutionLongSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/ParallelExecutionLongSuite.scala
index c8a882628..33398d1ef 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/ParallelExecutionLongSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/ParallelExecutionLongSuite.scala
@@ -41,7 +41,7 @@ class ParallelExecutionLongSuite extends AnyWordSpec with SparkTestBase with Tem
 
         val exitCode = AppRunner.runPipeline(conf)
 
-        assert(exitCode == 2)
+        assert(exitCode != 0)
 
         val table3Path = new Path(tempDir, "table3")
         val sink3Path = new Path(tempDir, "sink3")
@@ -70,7 +70,7 @@ class ParallelExecutionLongSuite extends AnyWordSpec with SparkTestBase with Tem
 
         val exitCode = AppRunner.runPipeline(conf)
 
-        assert(exitCode == 2)
+        assert(exitCode != 0)
 
         val table3Path = new Path(tempDir, "table3")
         val sink3Path = new Path(tempDir, "sink3")
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/NotificationTargetMock.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/NotificationTargetMock.scala
new file mode 100644
index 000000000..b7e355e61
--- /dev/null
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/NotificationTargetMock.scala
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2022 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.pramen.core.mocks.notify
+
+import com.typesafe.config.Config
+import org.apache.spark.sql.SparkSession
+import za.co.absa.pramen.api.{ExternalChannelFactory, NotificationTarget, TaskNotification}
+import za.co.absa.pramen.core.mocks.notify.NotificationTargetMock.TEST_NOTIFICATION_FAIL_KEY
+
+import scala.collection.mutable.ListBuffer
+
+class NotificationTargetMock(conf: Config) extends NotificationTarget {
+  val notificationsSent: ListBuffer[TaskNotification] = new ListBuffer[TaskNotification]()
+
+  override def config: Config = conf
+
+  override def sendNotification(notification: TaskNotification): Unit = {
+    if (conf.hasPath(TEST_NOTIFICATION_FAIL_KEY) && conf.getBoolean(TEST_NOTIFICATION_FAIL_KEY)) {
+      System.setProperty("pramen.test.notification.target.failure", "true")
+      throw new RuntimeException("Notification target test exception")
+    }
+    System.setProperty("pramen.test.notification.table", notification.tableName)
+  }
+}
+
+object NotificationTargetMock extends ExternalChannelFactory[NotificationTargetMock] {
+  val TEST_NOTIFICATION_FAIL_KEY = "test.fail.notification"
+
+  override def apply(conf: Config, parentPath: String, spark: SparkSession): NotificationTargetMock = {
+    new NotificationTargetMock(conf)
+  }
+}
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/PipelineNotificationTargetMock.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/PipelineNotificationTargetMock.scala
new file mode 100644
index 000000000..dd53d10e1
--- /dev/null
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/PipelineNotificationTargetMock.scala
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2022 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.pramen.core.mocks.notify
+
+import com.typesafe.config.Config
+import za.co.absa.pramen.api.{PipelineNotificationTarget, TaskNotification}
+import za.co.absa.pramen.core.mocks.notify.NotificationTargetMock.TEST_NOTIFICATION_FAIL_KEY
+
+import java.time.Instant
+
+class PipelineNotificationTargetMock(conf: Config) extends PipelineNotificationTarget {
+
+  override def sendNotification(pipelineStarted: Instant, appException: Option[Throwable], tasksCompleted: Seq[TaskNotification]): Unit = {
+    if (conf.hasPath(TEST_NOTIFICATION_FAIL_KEY) && conf.getBoolean(TEST_NOTIFICATION_FAIL_KEY)) {
+      System.setProperty("pramen.test.notification.pipeline.failure", "true")
+      throw new RuntimeException("Pipeline notification target test exception")
+    }
+
+    System.setProperty("pramen.test.notification.tasks.completed", tasksCompleted.length.toString)
+  }
+
+  override def config: Config = conf
+}

From 3af129b396ddd74a59a398baf099f76f3bbf048f Mon Sep 17 00:00:00 2001
From: Ruslan Iushchenko 
Date: Fri, 3 May 2024 15:05:13 +0200
Subject: [PATCH 5/5] #372 Add Spark Application Id to the pipeline
 notification target interface.

---
 README.md                                     |  5 +++--
 .../api/PipelineNotificationTarget.scala      |  1 +
 .../co/absa/pramen/api/TaskNotification.scala |  6 +++---
 .../core/runner/task/TaskRunnerBase.scala     |  9 ++++-----
 .../pramen/core/state/PipelineStateImpl.scala | 19 ++++++++++---------
 .../pramen/core/TaskNotificationFactory.scala |  6 +++---
 .../ParallelExecutionLongSuite.scala          |  4 ++--
 .../PipelineNotificationTargetMock.scala      |  2 +-
 8 files changed, 27 insertions(+), 25 deletions(-)

diff --git a/README.md b/README.md
index d22ae4f37..15c4b11a8 100644
--- a/README.md
+++ b/README.md
@@ -2636,8 +2636,9 @@ import za.co.absa.pramen.api.PipelineNotificationTarget
 
 class MyPipelineNotificationTarget(conf: Config) extends PipelineNotificationTarget {
   override def sendNotification(pipelineStarted: Instant,
-                       appException: Option[Throwable],
-                       tasksCompleted: Seq[TaskNotification]): Unit = ???
+                                applicationId: Option[String],
+                                appException: Option[Throwable],
+                                tasksCompleted: Seq[TaskNotification]): Unit = ???
 
   override def config: Config = conf
 }
diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/PipelineNotificationTarget.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/PipelineNotificationTarget.scala
index 136680773..db46d1820 100644
--- a/pramen/api/src/main/scala/za/co/absa/pramen/api/PipelineNotificationTarget.scala
+++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/PipelineNotificationTarget.scala
@@ -21,6 +21,7 @@ import java.time.Instant
 trait PipelineNotificationTarget extends ExternalChannel {
   /** Sends a notification after completion of the pipeline. */
   def sendNotification(pipelineStarted: Instant,
+                       applicationId: Option[String],
                        appException: Option[Throwable],
                        tasksCompleted: Seq[TaskNotification]): Unit
 }
diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/TaskNotification.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/TaskNotification.scala
index 3d2003a59..83103352b 100644
--- a/pramen/api/src/main/scala/za/co/absa/pramen/api/TaskNotification.scala
+++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/TaskNotification.scala
@@ -20,9 +20,9 @@ import java.time.{Instant, LocalDate}
 
 case class TaskNotification(
                              tableName: String,
-                             infoDate: LocalDate,
-                             started: Instant,
-                             finished: Instant,
+                             infoDate: Option[LocalDate],
+                             started: Option[Instant],
+                             finished: Option[Instant],
                              status: TaskStatus,
                              applicationId: String,
                              isTransient: Boolean,
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala
index f5e297755..a4a7f421c 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala
@@ -20,7 +20,6 @@ import com.typesafe.config.Config
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.functions.lit
 import org.slf4j.LoggerFactory
-import za.co.absa.pramen.api
 import za.co.absa.pramen.api.{DataFormat, Reason, SchemaDifference, TaskNotification}
 import za.co.absa.pramen.core.app.config.RuntimeConfig
 import za.co.absa.pramen.core.bookkeeper.Bookkeeper
@@ -394,9 +393,9 @@ abstract class TaskRunnerBase(conf: Config,
       NotificationTargetManager.runStatusToTaskStatus(result.runStatus).foreach { taskStatus =>
         val notification = TaskNotification(
           task.job.outputTable.name,
-          task.infoDate,
-          result.runInfo.get.started,
-          result.runInfo.get.finished,
+          Option(task.infoDate),
+          result.runInfo.map(_.started),
+          result.runInfo.map(_.finished),
           taskStatus,
           result.applicationId,
           result.isTransient,
@@ -441,7 +440,7 @@ abstract class TaskRunnerBase(conf: Config,
         if (diff.nonEmpty) {
           log.warn(s"$WARNING SCHEMA CHANGE for $table from $oldInfoDate to $infoDate: ${diff.map(_.toString).mkString("; ")}")
           bookkeeper.saveSchema(table.name, infoDate, df.schema)
-          api.SchemaDifference(table.name, oldInfoDate, infoDate, diff) :: Nil
+          SchemaDifference(table.name, oldInfoDate, infoDate, diff) :: Nil
         } else {
           Nil
         }
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala
index 1bb58a4e9..cfdde8c7f 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala
@@ -24,17 +24,17 @@ import za.co.absa.pramen.core.app.config.HookConfig
 import za.co.absa.pramen.core.app.config.RuntimeConfig.EMAIL_IF_NO_CHANGES
 import za.co.absa.pramen.core.exceptions.{OsSignalException, ThreadStackTrace}
 import za.co.absa.pramen.core.metastore.peristence.{TransientJobManager, TransientTableManager}
-import za.co.absa.pramen.core.notify.{NotificationTargetManager, PipelineNotificationTargetFactory}
 import za.co.absa.pramen.core.notify.pipeline.{PipelineNotification, PipelineNotificationEmail}
+import za.co.absa.pramen.core.notify.{NotificationTargetManager, PipelineNotificationTargetFactory}
 import za.co.absa.pramen.core.pipeline.PipelineDef._
 import za.co.absa.pramen.core.runner.task.RunStatus.NotRan
 import za.co.absa.pramen.core.runner.task.{PipelineNotificationFailure, TaskResult}
 
 import java.time.Instant
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 import scala.util.control.NonFatal
 import scala.util.{Failure, Success, Try}
-import scala.collection.JavaConverters._
 
 class PipelineStateImpl(implicit conf: Config, notificationBuilder: NotificationBuilder) extends PipelineState {
   import PipelineStateImpl._
@@ -87,7 +87,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
   override def setSuccess(): Unit = synchronized {
     if (!alreadyFinished()) {
       exitedNormally = true
-      sendCustomNotifications()
+      sendPipelineNotifications()
       runCustomShutdownHook()
       sendNotificationEmail()
     }
@@ -100,7 +100,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
       }
       exitCode |= EXIT_CODE_APP_FAILED
       exitedNormally = false
-      sendCustomNotifications()
+      sendPipelineNotifications()
       runCustomShutdownHook()
       sendNotificationEmail()
     }
@@ -138,7 +138,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
           failureException = Some(new IllegalStateException("The application exited unexpectedly."))
         }
 
-        sendCustomNotifications()
+        sendPipelineNotifications()
         runCustomShutdownHook()
         sendNotificationEmail()
         Try {
@@ -172,14 +172,14 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
     }
   }
 
-  private[state] def sendCustomNotifications(): Unit = {
+  private[state] def sendPipelineNotifications(): Unit = {
     val taskNotifications = taskResults.flatMap { task =>
       NotificationTargetManager.runStatusToTaskStatus(task.runStatus).map(taskStatus =>
         TaskNotification(
           task.job.outputTable.name,
-          task.runInfo.get.infoDate,
-          task.runInfo.get.started,
-          task.runInfo.get.finished,
+          task.runInfo.map(_.infoDate),
+          task.runInfo.map(_.started),
+          task.runInfo.map(_.finished),
           taskStatus,
           task.applicationId,
           task.isTransient,
@@ -198,6 +198,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
     try {
       pipelineNotificationTarget.sendNotification(
         startedInstant,
+        sparkAppId,
         failureException,
         taskNotifications
       )
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/TaskNotificationFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/TaskNotificationFactory.scala
index cd1a19f84..210eb79ec 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/TaskNotificationFactory.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/TaskNotificationFactory.scala
@@ -22,9 +22,9 @@ import java.time.{Instant, LocalDate}
 
 object TaskNotificationFactory {
   def getDummyTaskNotification(tableName: String = "dummy_table",
-                               infoDate: LocalDate = LocalDate.of(2022, 2, 18),
-                               started: Instant = Instant.ofEpochMilli(1613600000000L),
-                               finished: Instant = Instant.ofEpochMilli(1672759508000L),
+                               infoDate: Option[LocalDate] = Some(LocalDate.of(2022, 2, 18)),
+                               started: Option[Instant] = Some(Instant.ofEpochMilli(1613600000000L)),
+                               finished: Option[Instant] = Some(Instant.ofEpochMilli(1672759508000L)),
                                status: TaskStatus = TaskStatus.Succeeded(100, Seq.empty, Seq.empty, Seq.empty, Seq.empty),
                                applicationId: String = "app_12345",
                                isTransient: Boolean = false,
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/ParallelExecutionLongSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/ParallelExecutionLongSuite.scala
index 33398d1ef..c8a882628 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/ParallelExecutionLongSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/ParallelExecutionLongSuite.scala
@@ -41,7 +41,7 @@ class ParallelExecutionLongSuite extends AnyWordSpec with SparkTestBase with Tem
 
         val exitCode = AppRunner.runPipeline(conf)
 
-        assert(exitCode != 0)
+        assert(exitCode == 2)
 
         val table3Path = new Path(tempDir, "table3")
         val sink3Path = new Path(tempDir, "sink3")
@@ -70,7 +70,7 @@ class ParallelExecutionLongSuite extends AnyWordSpec with SparkTestBase with Tem
 
         val exitCode = AppRunner.runPipeline(conf)
 
-        assert(exitCode != 0)
+        assert(exitCode == 2)
 
         val table3Path = new Path(tempDir, "table3")
         val sink3Path = new Path(tempDir, "sink3")
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/PipelineNotificationTargetMock.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/PipelineNotificationTargetMock.scala
index dd53d10e1..a97e7a826 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/PipelineNotificationTargetMock.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/PipelineNotificationTargetMock.scala
@@ -24,7 +24,7 @@ import java.time.Instant
 
 class PipelineNotificationTargetMock(conf: Config) extends PipelineNotificationTarget {
 
-  override def sendNotification(pipelineStarted: Instant, appException: Option[Throwable], tasksCompleted: Seq[TaskNotification]): Unit = {
+  override def sendNotification(pipelineStarted: Instant, applicationId: Option[String], appException: Option[Throwable], tasksCompleted: Seq[TaskNotification]): Unit = {
     if (conf.hasPath(TEST_NOTIFICATION_FAIL_KEY) && conf.getBoolean(TEST_NOTIFICATION_FAIL_KEY)) {
       System.setProperty("pramen.test.notification.pipeline.failure", "true")
       throw new RuntimeException("Pipeline notification target test exception")