Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#389 Add Spark application id to jobs that fail before running a task. #391

Merged
merged 2 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ case class PipelineNotification(
exception: Option[Throwable],
pipelineName: String,
environmentName: String,
sparkAppId: Option[String],
started: Instant,
finished: Instant,
tasksCompleted: List[TaskResult],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ trait PipelineNotificationBuilder {

def addAppName(appName: String): Unit

def addSparkAppId(sparkAppId: String): Unit

def addEnvironmentName(env: String): Unit

def addAppDuration(appStarted: Instant, appFinished: Instant): Unit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot

var appException: Option[Throwable] = None
var appName: String = "Unspecified Job"
var sparkAppId: Option[String] = None
var envName: String = "Unspecified Environment"
var appStarted: Instant = Instant.now()
var appFinished: Instant = Instant.now()
Expand All @@ -72,6 +73,10 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
this.appName = appName
}

/** Stores the Spark application id in this builder; a `null` argument is stored as `None`. */
override def addSparkAppId(sparkAppId: String): Unit =
  this.sparkAppId = Option(sparkAppId)

/** Stores the environment name in this builder. */
override def addEnvironmentName(envName: String): Unit =
  this.envName = envName
Expand Down Expand Up @@ -191,7 +196,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot

introParagraph.withText(".")

val applicationIds = completedTasks.map(_.applicationId.trim).filter(_.nonEmpty).distinct
val applicationIds = getSparkApplicationIds

// This handles the case when all tasks are run under the same Spark Session.
// When Pramen supports runners that run tasks in different Spark Sessions (via Yarn, Glue, etc. APIs), this will need
Expand Down Expand Up @@ -227,6 +232,13 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
builder
}

/**
  * Returns the distinct, non-empty Spark application ids known to this builder.
  *
  * The id stored in `sparkAppId` (when present) comes first, followed by the application ids
  * of the completed tasks in their original order. Every id is trimmed and blank ids are
  * dropped, so an explicitly registered id that is blank is not reported, and one that differs
  * from a task id only by surrounding whitespace is not listed twice.
  *
  * @return the de-duplicated application ids; empty when neither the builder nor any
  *         completed task carries a non-blank id.
  */
private[core] def getSparkApplicationIds: Seq[String] = {
  // Normalize the explicit id exactly like the task ids so that `distinct` can de-duplicate them.
  val explicitIds = sparkAppId.map(_.trim).filter(_.nonEmpty).toList
  val taskIds = completedTasks.map(_.applicationId.trim).filter(_.nonEmpty)

  (explicitIds ++ taskIds).distinct
}

private[core] def getSuccessFlags: (Boolean, Boolean) = {
val hasNotificationFailures = completedTasks.exists(t => t.notificationTargetErrors.nonEmpty)
val someTasksSucceeded = completedTasks.exists(_.runStatus.isInstanceOf[Succeeded]) && appException.isEmpty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ object PipelineNotificationDirector {

notificationBuilder.addAppName(notification.pipelineName)
notificationBuilder.addEnvironmentName(notification.environmentName)
notification.sparkAppId.foreach(id => notificationBuilder.addSparkAppId(id))
notificationBuilder.addAppDuration(notification.started, notification.finished)
notificationBuilder.addDryRun(dryRun)
notificationBuilder.addUndercover(undercover)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ object AppRunner {

val exitCodeTry = for {
spark <- getSparkSession(conf, state)
_ <- Try { state.setSparkAppId(spark.sparkContext.applicationId) }
_ <- logBanner(spark)
_ <- logExecutorNodes(conf, state, spark)
appContext <- createAppContext(conf, state, spark)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ trait PipelineState extends AutoCloseable {

def setFailure(stage: String, exception: Throwable): Unit

def setSparkAppId(sparkAppId: String): Unit

def addTaskCompletion(statuses: Seq[TaskResult]): Unit

def getExitCode: Int
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
@volatile private var exitedNormally = false
@volatile private var isFinished = false
@volatile private var customShutdownHookCanRun = false
@volatile private var sparkAppId: Option[String] = None

init()

Expand Down Expand Up @@ -100,6 +101,10 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
}
}

/**
  * Records the Spark application id; a `null` argument is stored as `None`.
  * The assignment is performed while holding this object's monitor.
  */
override def setSparkAppId(sparkAppId: String): Unit = {
  val appIdOpt = Option(sparkAppId)
  synchronized {
    this.sparkAppId = appIdOpt
  }
}

override def addTaskCompletion(statuses: Seq[TaskResult]): Unit = synchronized {
taskResults ++= statuses.filter(_.runStatus != NotRan)
if (statuses.exists(_.runStatus.isFailure)) {
Expand Down Expand Up @@ -178,6 +183,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
val notification = PipelineNotification(failureException,
pipelineName,
environmentName,
sparkAppId,
startedInstant,
finishedInstant,
realTaskResults.toList,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ object PipelineNotificationFactory {
def getDummyNotification(exception: Option[Throwable] = None,
pipelineName: String = "DummyPipeline",
environmentName: String = "DummyEnvironment",
sparkAppId: Option[String] = None,
started: Instant = Instant.ofEpochSecond(1234567L),
finished: Instant = Instant.ofEpochSecond(1234568L),
tasksCompleted: List[TaskResult] = List(TaskResultFactory.getDummyTaskResult()),
Expand All @@ -37,6 +38,7 @@ object PipelineNotificationFactory {
exception,
pipelineName,
environmentName,
sparkAppId,
started,
finished,
tasksCompleted,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import java.time.Instant
class PipelineNotificationBuilderSpy extends PipelineNotificationBuilder {
var failureException: Option[Throwable] = None
var appName = ""
var sparkId = ""
var environmentName = ""
var appStarted: Instant = Instant.MIN
var appFinished: Instant = Instant.MIN
Expand All @@ -41,6 +42,8 @@ class PipelineNotificationBuilderSpy extends PipelineNotificationBuilder {

/** Records the most recent application name passed to this builder. */
override def addAppName(name: String): Unit = {
  appName = name
}

/** Records the most recent Spark application id passed to this builder (stored as-is in `sparkId`). */
override def addSparkAppId(sparkAppId: String): Unit = {
  sparkId = sparkAppId
}

/** Records the most recent environment name passed to this builder. */
override def addEnvironmentName(env: String): Unit = {
  environmentName = env
}

override def addAppDuration(started: Instant, finished: Instant): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class PipelineStateSpy extends PipelineState {
val failures = new ListBuffer[(String, Throwable)]
val completedStatuses = new ListBuffer[TaskResult]
var closeCalled = 0
var sparkAppId: Option[String] = None

override def getState(): PipelineStateSnapshot = {
PipelineStateSnapshot(
Expand All @@ -53,6 +54,10 @@ class PipelineStateSpy extends PipelineState {
failures.append((stage, ex))
}

/**
  * Records the Spark application id passed to this spy; a `null` argument is stored as `None`.
  * The assignment is performed while holding this object's monitor.
  */
override def setSparkAppId(sparkAppId: String): Unit = {
  val appIdOpt = Option(sparkAppId)
  synchronized {
    this.sparkAppId = appIdOpt
  }
}

/**
  * Appends every given task result to `completedStatuses`, preserving their order.
  * The append is performed while holding this object's monitor.
  */
override def addTaskCompletion(statuses: Seq[TaskResult]): Unit = {
  synchronized {
    statuses.foreach(status => completedStatuses += status)
  }
}
Expand Down
Loading