Skip to content

Commit

Permalink
#372 Add integration tests for notification targets.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed May 3, 2024
1 parent 0b7ddda commit 52fe9c1
Show file tree
Hide file tree
Showing 6 changed files with 268 additions and 4 deletions.
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2631,12 +2631,15 @@ used to send custom notifications to external systems. A pipeline notification t
```scala
package com.example

import com.typesafe.config.Config
import za.co.absa.pramen.api.PipelineNotificationTarget

class MyPipelineNotificationTarget extends PipelineNotificationTarget {
def sendNotification(pipelineStarted: Instant,
class MyPipelineNotificationTarget(conf: Config) extends PipelineNotificationTarget {
override def sendNotification(pipelineStarted: Instant,
appException: Option[Throwable],
tasksCompleted: Seq[TaskNotification]): Unit = ???

override def config: Config = conf
}
```

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# Copyright 2022 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This variable is expected to be set up by the test suite
#base.path = "/tmp"

pramen {
pipeline.name = "Integration test with notificaiton targets"

temporary.directory = ${base.path}/temp

bookkeeping.enabled = false
stop.spark.session = false
}

pramen.notification.targets = [
{
name = "dummy_notification_target"
factory.class = "za.co.absa.pramen.core.mocks.notify.NotificationTargetMock"

test.fail.notification = ${test.fail.notification}
}
]

pramen.pipeline.notification.targets = [ "za.co.absa.pramen.core.mocks.notify.PipelineNotificationTargetMock" ]

pramen.metastore {
tables = [
{
name = "table1"
format = "parquet"
path = ${base.path}/table1
},
{
name = "table2"
format = "parquet"
path = ${base.path}/table2
}
]
}

pramen.operations = [
{
name = "Generating dataframe"
type = "transformation"

class = "za.co.absa.pramen.core.mocks.transformer.GeneratingTransformer"
schedule.type = "daily"
output.table = "table1"

notification.targets = [ "dummy_notification_target" ]
},
{
name = "Identity transformer"
type = "transformation"
class = "za.co.absa.pramen.core.transformers.IdentityTransformer"
schedule.type = "daily"

output.table = "table2"

dependencies = [
{
tables = [ table1 ]
date.from = "@infoDate"
optional = true # Since no bookkeeping available the table will be seen as empty for the dependency manager
}
]

notification.targets = [ "dummy_notification_target" ]

option {
table = "table1"
}
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.core.integration

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.hadoop.fs.Path
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.pramen.core.base.SparkTestBase
import za.co.absa.pramen.core.fixtures.{TempDirFixture, TextComparisonFixture}
import za.co.absa.pramen.core.runner.AppRunner
import za.co.absa.pramen.core.utils.{FsUtils, ResourceUtils}

import java.time.LocalDate

class NotificationTargetSuite extends AnyWordSpec with SparkTestBase with TempDirFixture with TextComparisonFixture {
private val infoDate = LocalDate.of(2021, 2, 18)

"Pipeline with notification targets" should {
val expectedSingle =
"""{"a":"D","b":4}
|{"a":"E","b":5}
|{"a":"F","b":6}
|""".stripMargin

"work end to end for non-failing pipelines" in {
withTempDirectory("notification_targets") { tempDir =>
val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir)


val conf = getConfig(tempDir)
val exitCode = AppRunner.runPipeline(conf)

assert(exitCode == 0)

val table2Path = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate")

assert(fsUtils.exists(table2Path))

val df2 = spark.read.parquet(table2Path.toString)
val actual2 = df2.orderBy("a").toJSON.collect().mkString("\n")

compareText(actual2, expectedSingle)

assert(System.getProperty("pramen.test.notification.tasks.completed").toInt == 2)
assert(System.getProperty("pramen.test.notification.table") == "table2")
}
}

"still return zero exit code on notification failures" in {
withTempDirectory("notification_targets") { tempDir =>
val conf = getConfig(tempDir, failNotifications = true)
val exitCode = AppRunner.runPipeline(conf)

assert(exitCode == 0)

assert(System.getProperty("pramen.test.notification.pipeline.failure").toBoolean)
assert(System.getProperty("pramen.test.notification.target.failure").toBoolean)
}
}
}

def getConfig(basePath: String, failNotifications: Boolean = false): Config = {
val configContents = ResourceUtils.getResourceString("/test/config/integration_notification_targets.conf")
val basePathEscaped = basePath.replace("\\", "\\\\")

val conf = ConfigFactory.parseString(
s"""base.path = "$basePathEscaped"
|pramen.runtime.is.rerun = true
|pramen.current.date = "$infoDate"
|test.fail.notification = $failNotifications
|$configContents
|""".stripMargin
).withFallback(ConfigFactory.load())
.resolve()

conf
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class ParallelExecutionLongSuite extends AnyWordSpec with SparkTestBase with Tem

val exitCode = AppRunner.runPipeline(conf)

assert(exitCode == 2)
assert(exitCode != 0)

val table3Path = new Path(tempDir, "table3")
val sink3Path = new Path(tempDir, "sink3")
Expand Down Expand Up @@ -70,7 +70,7 @@ class ParallelExecutionLongSuite extends AnyWordSpec with SparkTestBase with Tem

val exitCode = AppRunner.runPipeline(conf)

assert(exitCode == 2)
assert(exitCode != 0)

val table3Path = new Path(tempDir, "table3")
val sink3Path = new Path(tempDir, "sink3")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.core.mocks.notify

import com.typesafe.config.Config
import org.apache.spark.sql.SparkSession
import za.co.absa.pramen.api.{ExternalChannelFactory, NotificationTarget, TaskNotification}
import za.co.absa.pramen.core.mocks.notify.NotificationTargetMock.TEST_NOTIFICATION_FAIL_KEY

import scala.collection.mutable.ListBuffer

class NotificationTargetMock(conf: Config) extends NotificationTarget {
val notificationsSent: ListBuffer[TaskNotification] = new ListBuffer[TaskNotification]()

override def config: Config = conf

override def sendNotification(notification: TaskNotification): Unit = {
if (conf.hasPath(TEST_NOTIFICATION_FAIL_KEY) && conf.getBoolean(TEST_NOTIFICATION_FAIL_KEY)) {
System.setProperty("pramen.test.notification.target.failure", "true")
throw new RuntimeException("Notification target test exception")
}
System.setProperty("pramen.test.notification.table", notification.tableName)
}
}

object NotificationTargetMock extends ExternalChannelFactory[NotificationTargetMock] {
val TEST_NOTIFICATION_FAIL_KEY = "test.fail.notification"

override def apply(conf: Config, parentPath: String, spark: SparkSession): NotificationTargetMock = {
new NotificationTargetMock(conf)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.core.mocks.notify

import com.typesafe.config.Config
import za.co.absa.pramen.api.{PipelineNotificationTarget, TaskNotification}
import za.co.absa.pramen.core.mocks.notify.NotificationTargetMock.TEST_NOTIFICATION_FAIL_KEY

import java.time.Instant

class PipelineNotificationTargetMock(conf: Config) extends PipelineNotificationTarget {

override def sendNotification(pipelineStarted: Instant, appException: Option[Throwable], tasksCompleted: Seq[TaskNotification]): Unit = {
if (conf.hasPath(TEST_NOTIFICATION_FAIL_KEY) && conf.getBoolean(TEST_NOTIFICATION_FAIL_KEY)) {
System.setProperty("pramen.test.notification.pipeline.failure", "true")
throw new RuntimeException("Pipeline notification target test exception")
}

System.setProperty("pramen.test.notification.tasks.completed", tasksCompleted.length.toString)
}

override def config: Config = conf
}

0 comments on commit 52fe9c1

Please sign in to comment.