#302 Improve handling of fatal errors in parallel pipeline executions.
This might not fix all the causes of #302, but it should prevent most occurrences.
yruslan committed Nov 24, 2023
1 parent: 7d781b8 · commit: b72d4de
Showing 9 changed files with 457 additions and 24 deletions.
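The change revolves around the distinction between exceptions matched by Scala's NonFatal extractor and fatal JVM errors (OutOfMemoryError, StackOverflowError, and similar), and around wrapping the latter in FatalErrorWrapper so the job runner and orchestrator can recognise them and stop the pipeline. A minimal, self-contained sketch of that idea follows; the FatalErrorWrapper below is a simplified stand-in that only mirrors the real class's (message, cause) shape:

import scala.util.control.NonFatal

// Simplified stand-in for za.co.absa.pramen.core.exceptions.FatalErrorWrapper,
// mirroring only the (message, cause) shape that this commit relies on.
class FatalErrorWrapper(message: String, val cause: Throwable) extends RuntimeException(message, cause)

object FatalVsNonFatal {
  // NonFatal matches ordinary exceptions but deliberately excludes JVM errors,
  // so a catch block with only `case NonFatal(ex)` silently misses exactly the
  // failures that should stop the whole pipeline.
  def classify(ex: Throwable): Throwable = ex match {
    case NonFatal(e) => e                                               // fail the task, keep the pipeline running
    case e => new FatalErrorWrapper("Fatal error has occurred.", e)     // mark as pipeline-stopping
  }

  def main(args: Array[String]): Unit = {
    println(classify(new IllegalArgumentException("bad config"))) // stays a plain exception
    println(classify(new OutOfMemoryError("heap exhausted")))     // comes back wrapped as fatal
  }
}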
@@ -157,7 +157,7 @@ object OperationDef {
      val dependencyConfigs = conf.getConfigList(DEPENDENCIES_KEY)
      dependencyConfigs.asScala
        .zipWithIndex
-       .map { case (c, i) => MetastoreDependency.fromConfig(c, s"$parent[$i]") }
+       .map { case (c, i) => MetastoreDependency.fromConfig(c, s"$parent[${i + 1}]") }
        .toSeq
    } else {
      Seq.empty[MetastoreDependency]
@@ -38,7 +38,7 @@ object PipelineDef {
    val environment = conf.getString(ENVIRONMENT_NAME)
    val operations = ConfigUtils.getOptionConfigList(conf, OPERATIONS_KEY)
      .zipWithIndex
-     .flatMap{ case (c, i) => OperationDef.fromConfig(c, conf, infoDateConfig, s"$OPERATIONS_KEY[$i]", defaultDelayDays) }
+     .flatMap{ case (c, i) => OperationDef.fromConfig(c, conf, infoDateConfig, s"$OPERATIONS_KEY[${i+1}]", defaultDelayDays) }
      .toSeq
    PipelineDef(name, environment, operations)
  }
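Both hunks above make the same small fix: zipWithIndex yields zero-based indices, so the parent paths passed to fromConfig (used when reporting configuration problems, as far as one can tell from the diff) labelled the first entry as [0] rather than [1]; adding 1 makes the positions one-based, matching how users count entries in the config file. A quick illustration of the effect, using hypothetical entry names:

object IndexingSketch {
  def main(args: Array[String]): Unit = {
    val entries = Seq("ingest", "transform", "publish")

    // Zero-based, as zipWithIndex produces (the old behaviour):
    println(entries.zipWithIndex.map { case (_, i) => s"operations[$i]" })
    // List(operations[0], operations[1], operations[2])

    // One-based, matching how users count config entries (the new behaviour):
    println(entries.zipWithIndex.map { case (_, i) => s"operations[${i + 1}]" })
    // List(operations[1], operations[2], operations[3])
  }
}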
@@ -33,7 +33,6 @@ import java.util.concurrent.Executors.newFixedThreadPool
import scala.concurrent.ExecutionContext.fromExecutorService
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContextExecutorService, Future}
- import scala.util.Try
import scala.util.control.NonFatal

class ConcurrentJobRunnerImpl(runtimeConfig: RuntimeConfig,
@@ -72,6 +71,7 @@ class ConcurrentJobRunnerImpl(runtimeConfig: RuntimeConfig,
    if (!loopStarted) {
      throw new IllegalStateException("Worker loop hasn't started yet")
    }
+   log.info("Waiting for worker threads to finish...")
    Await.result(workersFuture, Duration.Inf)
    executionContext.shutdown()
    loopStarted = false
@@ -86,23 +86,24 @@ class ConcurrentJobRunnerImpl(runtimeConfig: RuntimeConfig,

        completedJobsChannel.send((job, Nil, isSucceeded))
      } catch {
-       case ex: FatalErrorWrapper if ex.cause != null =>
-         log.error(s"${Emoji.FAILURE} A FATAL error has been encountered.", ex.cause)
-         val fatalEx = new RuntimeException(s"FATAL exception encountered, stopping the pipeline.", ex.cause)
-         completedJobsChannel.send((job, TaskResult(job, RunStatus.Failed(fatalEx), None, applicationId, isTransient, Nil, Nil, Nil) :: Nil, false))
-         completedJobsChannel.close()
-       case NonFatal(ex) =>
-         completedJobsChannel.send((job, TaskResult(job, RunStatus.Failed(ex), None, applicationId, isTransient, Nil, Nil, Nil) :: Nil, false))
-       case ex: Throwable =>
-         log.error(s"${Emoji.FAILURE} A FATAL error has been encountered.", ex)
-         val fatalEx = new RuntimeException(s"FATAL exception encountered, stopping the pipeline.", ex)
-         completedJobsChannel.send((job, TaskResult(job, RunStatus.Failed(fatalEx), None, applicationId, isTransient, Nil, Nil, Nil) :: Nil, false))
-         completedJobsChannel.close()
+       case ex: FatalErrorWrapper if ex.cause != null => onFatalException(ex.cause, job, isTransient)
+       case NonFatal(ex) => sendFailure(ex, job, isTransient)
+       case ex: Throwable => onFatalException(ex, job, isTransient)
      }
    }
    completedJobsChannel.close()
  }

+ private[core] def onFatalException(ex: Throwable, job: Job, isTransient: Boolean): Unit = {
+   log.error(s"${Emoji.FAILURE} A FATAL error has been encountered.", ex)
+   val fatalEx = new FatalErrorWrapper(s"FATAL exception encountered, stopping the pipeline.", ex)
+   sendFailure(fatalEx, job, isTransient)
+ }
+
+ private[core] def sendFailure(ex: Throwable, job: Job, isTransient: Boolean): Unit = {
+   completedJobsChannel.send((job, TaskResult(job, RunStatus.Failed(ex), None, applicationId, isTransient, Nil, Nil, Nil) :: Nil, false))
+ }

  private[core] def runJob(job: Job): Boolean = {
    val scheduleParams = ScheduleParams.fromRuntimeConfig(runtimeConfig, job.trackDays, job.operation.expectedDelayDays)

@@ -121,7 +122,15 @@ class ConcurrentJobRunnerImpl(runtimeConfig: RuntimeConfig,

    val fut = taskRunner.runJobTasks(job, taskDefs)

+   log.info("Waiting for all job tasks to finish...")
    val statuses = Await.result(fut, Duration.Inf)
+   log.info("All job tasks have finished.")
+
+   // Rethrow fatal errors so the pipeline can be stopped asap.
+   statuses.foreach {
+     case RunStatus.Failed(ex) if ex.isInstanceOf[FatalErrorWrapper] => throw ex
+     case _ => // skip
+   }

    statuses.forall(s => !s.isFailure)
  }
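Taken together, the ConcurrentJobRunnerImpl changes move the "stop everything" decision out of the worker loop: runJob now rethrows any task failure that carries a FatalErrorWrapper instead of folding it into its boolean result, and the slimmed-down catch block turns wrapped and raw fatal throwables alike into a single failed TaskResult via the new onFatalException and sendFailure helpers, while ordinary NonFatal failures keep their old handling. A hedged sketch of that control flow; RunStatus, report, and the FatalErrorWrapper stand-in below are simplified, not the repo's actual API:

import scala.util.control.NonFatal

// Simplified stand-ins, not the repo's actual types.
class FatalErrorWrapper(msg: String, val cause: Throwable) extends RuntimeException(msg, cause)

sealed trait RunStatus
object RunStatus {
  case object Succeeded extends RunStatus
  case class Failed(ex: Throwable) extends RunStatus
}

object FatalRethrowSketch {
  // Mirrors the rethrow added to runJob: fatal failures escape as exceptions,
  // ordinary failures only flip the returned flag.
  def runJob(statuses: Seq[RunStatus]): Boolean = {
    statuses.foreach {
      case RunStatus.Failed(ex) if ex.isInstanceOf[FatalErrorWrapper] => throw ex
      case _ => // non-fatal outcomes are reported via the return value
    }
    statuses.forall {
      case RunStatus.Failed(_) => false
      case _ => true
    }
  }

  // Mirrors the worker loop's catch clauses: every outcome becomes a failed task
  // result, but fatal ones are re-wrapped so the orchestrator can recognise them.
  def workerIteration(statuses: Seq[RunStatus])(report: RunStatus => Unit): Unit =
    try { runJob(statuses); () }
    catch {
      case ex: FatalErrorWrapper if ex.cause != null =>
        report(RunStatus.Failed(new FatalErrorWrapper("FATAL exception encountered, stopping the pipeline.", ex.cause)))
      case NonFatal(ex) =>
        report(RunStatus.Failed(ex))
      case ex: Throwable =>
        report(RunStatus.Failed(new FatalErrorWrapper("FATAL exception encountered, stopping the pipeline.", ex)))
    }
}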
@@ -22,6 +22,7 @@ import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.DataFormat
import za.co.absa.pramen.core.app.AppContext
+ import za.co.absa.pramen.core.exceptions.FatalErrorWrapper
import za.co.absa.pramen.core.pipeline.{Job, JobDependency, OperationType}
import za.co.absa.pramen.core.runner.jobrunner.ConcurrentJobRunner
import za.co.absa.pramen.core.runner.splitter.ScheduleStrategyUtils.evaluateRunDate
@@ -77,11 +78,14 @@ class OrchestratorImpl extends Orchestrator {
    jobRunner.startWorkerLoop(runJobChannel)

    val atLeastOneStarted = sendPendingJobs(runJobChannel, dependencyResolver)
+   var hasFatalErrors = false

    if (atLeastOneStarted) {
      completedJobsChannel.foreach { case (finishedJob, taskResults, isSucceeded) =>
        runningJobs.remove(finishedJob)

+       hasFatalErrors = hasFatalErrors || taskResults.exists(status => isFatalFailure(status.runStatus))
+
        val hasAnotherUnfinishedJob = hasAnotherJobWithSameOutputTable(finishedJob.outputTable.name)
        if (hasAnotherUnfinishedJob) {
          log.info(s"There is another job outputting to ${finishedJob.outputTable.name}. Waiting for it to finish before marking the table as finished.")
@@ -93,9 +97,18 @@ class OrchestratorImpl extends Orchestrator {

        state.addTaskCompletion(taskResults)

-       val jobStarted = sendPendingJobs(runJobChannel, dependencyResolver)
-       if (!jobStarted && runningJobs.isEmpty) {
-         runJobChannel.close()
+       if (hasFatalErrors) {
+         // In case of a fatal error we either need to interrupt running threads, or wait for them to return.
+         // In the current implementation we wait for threads to finish, but not start new jobs in running threads.
+         // This can be also reconsidered, if there are issues with the current solutions observed.
+         if (runningJobs.isEmpty) {
+           runJobChannel.close()
+         }
+       } else {
+         val jobStarted = sendPendingJobs(runJobChannel, dependencyResolver)
+         if (!jobStarted && runningJobs.isEmpty) {
+           runJobChannel.close()
+         }
        }
      }
    }
@@ -117,6 +130,13 @@ class OrchestratorImpl extends Orchestrator {
    jobRunner.shutdown()
  }

+ private def isFatalFailure(runStatus: RunStatus): Boolean = {
+   runStatus match {
+     case RunStatus.Failed(ex) if ex.isInstanceOf[FatalErrorWrapper] => true
+     case _ => false
+   }
+ }
+
  private def hasNonPassiveNonOptionalDeps(job: Job, missingTables: Seq[String]): Boolean = {
    missingTables.exists(table =>
      job.operation.dependencies.exists(d => !d.isPassive && !d.isOptional && d.tables.contains(table))
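On the orchestrator side, the new hasFatalErrors flag implements a drain-and-stop strategy rather than interrupting worker threads: once a completed task carries a FatalErrorWrapper, no further pending jobs are submitted, and the job channel is closed as soon as the jobs already in flight have reported back, exactly as the in-diff comment describes. A simplified sketch of that loop; the channel, bookkeeping, and callback types are hypothetical stand-ins:

import scala.collection.mutable

object DrainOnFatalSketch {
  case class Job(name: String)
  case class Completion(job: Job, isFatalFailure: Boolean)

  def orchestrate(completedJobs: Iterator[Completion],
                  runningJobs: mutable.Set[Job],
                  sendPendingJobs: () => Boolean, // true if at least one pending job was started
                  closeChannel: () => Unit): Unit = {
    var hasFatalErrors = false

    completedJobs.foreach { completion =>
      runningJobs -= completion.job
      hasFatalErrors = hasFatalErrors || completion.isFatalFailure

      if (hasFatalErrors) {
        // Fatal path: start nothing new, wait for in-flight jobs to report back,
        // then close the channel so the worker loop can shut down.
        if (runningJobs.isEmpty) closeChannel()
      } else {
        // Normal path: keep feeding pending jobs; close only when nothing new
        // was started and nothing is still running.
        val jobStarted = sendPendingJobs()
        if (!jobStarted && runningJobs.isEmpty) closeChannel()
      }
    }
  }
}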
@@ -23,7 +23,7 @@ import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.{DataFormat, Reason, TaskNotification}
import za.co.absa.pramen.core.app.config.RuntimeConfig
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
- import za.co.absa.pramen.core.exceptions.ReasonException
+ import za.co.absa.pramen.core.exceptions.{FatalErrorWrapper, ReasonException}
import za.co.absa.pramen.core.journal.Journal
import za.co.absa.pramen.core.journal.model.TaskCompleted
import za.co.absa.pramen.core.lock.TokenLockFactory
@@ -323,7 +323,7 @@ abstract class TaskRunnerBase(conf: Config,
      Seq.empty)
    }
  } catch {
-   case ex: Throwable => Failure(ex)
+   case ex: Throwable => Failure(new FatalErrorWrapper("Fatal error has occurred.", ex))
  } finally {
    lock.release()
  }
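This last hunk in TaskRunnerBase is what feeds the rest of the machinery: a throwable that reaches this outermost catch is now returned as a Failure wrapping a FatalErrorWrapper rather than a bare Failure, so the worker loop's FatalErrorWrapper case and the orchestrator's isFatalFailure check both recognise the failure as fatal while the original cause stays attached for logging. A tiny round-trip of that wrapping, with the stand-in class re-declared so the snippet runs on its own:

import scala.util.{Failure, Try}

// Simplified stand-in, re-declared so this snippet is self-contained.
class FatalErrorWrapper(msg: String, val cause: Throwable) extends RuntimeException(msg, cause)

object FatalWrapDemo {
  def main(args: Array[String]): Unit = {
    val original: Throwable = new OutOfMemoryError("heap exhausted")

    // What the task runner now returns instead of Failure(original):
    val result: Try[Unit] = Failure(new FatalErrorWrapper("Fatal error has occurred.", original))

    result match {
      case Failure(ex: FatalErrorWrapper) if ex.cause != null =>
        // Downstream code can log the real failure and still treat it as fatal.
        println(s"fatal, caused by: ${ex.cause}")
      case Failure(ex) =>
        println(s"ordinary failure: $ex")
      case _ =>
        println("succeeded")
    }
  }
}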