
#311 Add the ability to run multiple workflows in a single Spark Session #334

Merged 11 commits on Jan 10, 2024
13 changes: 11 additions & 2 deletions README.md
@@ -1378,7 +1378,6 @@ pramen {
spark.app.name = "Pramen - "${pramen.pipeline.name}

# The number of tasks to run in parallel. A task is a source, transformer, or sink running for a specified information date.
# This feature is experimental, use more than 1 with caution.
parallel.tasks = 1

# You can set this option so that Pramen never writes to partitions older than the specified date
@@ -1456,7 +1455,6 @@ pramen.operations = [
# By setting 'consume.threads' to a value greater than 1, the task will appear to require more than 1 thread to run.
# Thus, the task will take up multiple "slots" of the 'pramen.parallel.tasks' setting.
# This is useful if some tasks consume a lot of memory and CPU and should not run in parallel with other tasks.
# This feature is experimental. Use with caution.
consume.threads = 2

tables = [
@@ -2765,6 +2763,17 @@ spark-submit --class za.co.absa.pramen.runner.PipelineRunner \
pramen-runner_2.11-x.y.z.jar --workflow my_workflow.conf
```

**Experimental:** Use `--workflows` (plural) to specify a comma-separated list of workflows to run in
a single Spark Session.
```sh
spark-submit --class za.co.absa.pramen.runner.PipelineRunner \
pramen-runner_2.11-x.y.z.jar --workflows /path/my_workflow1.conf,/path/my_workflow2.conf
```

If one of the workflows fails, the rest will still be attempted. The command returns a non-zero exit code if at least
one of the pipelines fails.
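
For example, a wrapper script can act on that exit code (a minimal sketch; the jar name and workflow paths are illustrative):
```sh
# Run two workflows in a single Spark Session and react to the overall result.
spark-submit --class za.co.absa.pramen.runner.PipelineRunner \
  pramen-runner_2.11-x.y.z.jar --workflows /path/my_workflow1.conf,/path/my_workflow2.conf

if [ $? -ne 0 ]; then
  echo "At least one of the workflows failed" >&2
  exit 1
fi
```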


### Running built-in jobs

```sh
10 changes: 6 additions & 4 deletions pramen/core/src/main/resources/email_template/style.css
@@ -34,10 +34,12 @@
}

.datagrid table thead th {
background: -webkit-gradient(linear, left top, left bottom, color-stop(0.05, #006699), color-stop(1, #00557F));
background: -moz-linear-gradient(center top, #006699 5%, #00557F 100%);
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#006699', endColorstr='#00557F');
background-color: #006699;
background: #00557F;
background: -moz-linear-gradient(top, #006699 5%, #00557F 100%);
background: -webkit-linear-gradient(top, #006699 5%, #00557F 100%);
background: -o-linear-gradient(top, #006699 5%, #00557F 100%);
background: -ms-linear-gradient(top, #006699 5%, #00557F 100%);
background: linear-gradient(to bottom, #006699 5%, #00557F 100%);
color: #FFFFFF;
font-size: 14px;
font-weight: bold;

This file was deleted.

@@ -17,9 +17,9 @@
package za.co.absa.pramen.core

import com.typesafe.config.Config
import za.co.absa.pramen.api.{MetadataManager, NotificationBuilder, Pramen}
import za.co.absa.pramen.api.app.PramenFactory
import za.co.absa.pramen.api.common.BuildPropertiesRetriever
import za.co.absa.pramen.api.{MetadataManager, NotificationBuilder, Pramen}
import za.co.absa.pramen.core.state.NotificationBuilderImpl
import za.co.absa.pramen.core.utils.BuildPropertyUtils

Expand All @@ -42,11 +42,25 @@ class PramenImpl extends Pramen {
throw new IllegalStateException("Metadata manager is not available at the context.")
)

private[core] def setWorkflowConfig(config: Config): Unit = _workflowConfig = Option(config)
private[core] def setWorkflowConfig(config: Config): Unit = synchronized {
_workflowConfig = Option(config)
}

private[core] def setMetadataManager(m: MetadataManager): Unit = _metadataManager = Option(m)
private[core] def setMetadataManager(m: MetadataManager): Unit = synchronized {
_metadataManager = Option(m)
}

private[core] def reset(): Unit = synchronized {
notificationBuilderImpl.reset()
_workflowConfig = None
_metadataManager = None
}
}

object PramenImpl extends PramenFactory {
lazy val instance: Pramen = new PramenImpl
val instance: Pramen = new PramenImpl

private[core] def reset(): Unit = this.synchronized {
instance.asInstanceOf[PramenImpl].reset()
}
}
@@ -23,8 +23,7 @@ import org.apache.log4j.{Level, Logger}
import org.slf4j.LoggerFactory
import za.co.absa.pramen.core.app.config.RuntimeConfig.VERBOSE
import za.co.absa.pramen.core.cmd.CmdLineConfig
import za.co.absa.pramen.core.config.Keys
import za.co.absa.pramen.core.utils.{ConfigUtils, JavaXConfig}
import za.co.absa.pramen.core.utils.JavaXConfig

import java.io.File
import java.nio.file.{FileSystems, Files, Paths}
@@ -33,7 +32,27 @@ import scala.util.Try
object RunnerCommons {
private val log = LoggerFactory.getLogger(this.getClass)

def getMainContext(args: Array[String]): Config = {
/**
* The method parses command line parameters and returns workflow configs.
*
* The method is guaranteed to return at least one workflow configuration.
*
* The first configuration in the list is considered the 'primary' one. Options
* that affect the whole runtime, and therefore cannot differ between pipelines, are
* taken from the primary configuration.
*
* Such options are:
* - pramen.exit.code.enabled
* - JVM security options: javax.*, java.* (see JavaXConfig.setJavaXProperties)
* - Spark Session config: pramen.spark.conf.option.*
* This is because restarting the Spark Session is not supported by most runtimes.
*
* On the other hand, Hadoop options (hadoop.option.*) are applied for each workflow.
*
* @param args Arguments passed to the command line.
* @return Workflow configs. At least one configuration will be returned.
*/
def getMainContext(args: Array[String]): Seq[Config] = {
val rootLogger = Logger.getRootLogger

val cmdLineConfig = CmdLineConfig(args)
Expand All @@ -45,13 +64,12 @@ object RunnerCommons {
copyFilesToLocal(cmdLineConfig.files, hadoopConfig)
}

val conf: Config = getConfig(cmdLineConfig.configPathName, cmdLineConfig)
val configs: Seq[Config] = getConfigs(cmdLineConfig.configPathNames, cmdLineConfig)
val primaryConfig = configs.head

JavaXConfig.setJavaXProperties(conf)
JavaXConfig.setJavaXProperties(primaryConfig)

ConfigUtils.logEffectiveConfigProps(conf, Keys.CONFIG_KEYS_TO_REDACT, Keys.KEYS_TO_REDACT)

if (!conf.getBoolean(VERBOSE)) {
if (!primaryConfig.getBoolean(VERBOSE)) {
// Switch logging level to WARN
Logger.getLogger("org").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)
Expand All @@ -70,9 +88,26 @@ object RunnerCommons {
}
}

conf
configs
}
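
// A hypothetical usage sketch (not part of this change), illustrating the Seq[Config]
// contract: a runner would execute each returned workflow config in turn within the
// same Spark Session. 'runPipeline' is an illustrative placeholder, not an actual API:
//   val configs = RunnerCommons.getMainContext(args)
//   configs.foreach(conf => runPipeline(conf))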

/**
* Returns workflow configurations defined in the command line config. One
* configuration is always returned: even if no workflow path is defined,
* the default Typesafe config (application.conf + reference.conf) is returned.
*/
def getConfigs(configPaths: Seq[String], cmd: CmdLineConfig): Seq[Config] = {
if (configPaths.isEmpty) {
Seq(getConfig(None, cmd))
} else {
configPaths.map(path => getConfig(Some(path), cmd))
}
}

/**
* Returns the workflow configuration defined either by the workflow path or
* from the default TypeSafe configuration source (application.conf + reference.conf).
*/
def getConfig(configPath: Option[String], cmd: CmdLineConfig): Config = {
val originalConfig = ConfigFactory.load()

Expand All @@ -94,6 +129,7 @@ object RunnerCommons {
CmdLineConfig.applyCmdLineToConfig(conf, cmd)
}

/** Checks path existence at the absolute location and in the current path, and returns whichever works. */
def getExistingWorkflowPath(pathStr: String): String = {
val path = Paths.get(pathStr)

Expand All @@ -113,6 +149,10 @@ object RunnerCommons {
throw new IllegalArgumentException(s"The workflow configuration '$pathStr' does not exist at the driver node.")
}

/**
* Copies the list of files from a Hadoop location (S3, HDFS, etc) to the current directory.
* The Spark Session is not required to be started at this point; the Hadoop FileSystem API is used.
*/
def copyFilesToLocal(files: Seq[String], hadoopConfig: Configuration): Unit = {
val currentPath = new Path(".")

@@ -27,7 +27,7 @@ import java.time.LocalDate
import scala.collection.JavaConverters.asJavaIterableConverter

case class CmdLineConfig(
configPathName: Option[String] = None,
configPathNames: Seq[String] = Seq.empty,
files: Seq[String] = Seq.empty[String],
operations: Seq[String] = Seq.empty[String],
currentDate: Option[LocalDate] = None,
@@ -126,10 +126,28 @@ object CmdLineConfig {
private class CmdParser(programName: String) extends OptionParser[CmdLineConfig](programName) {
head("\nPramen Workflow Runner")

opt[String]("workflow").required().action((value, config) =>
config.copy(configPathName = Option(value)))
opt[String]("workflow").optional().action((value, config) =>
config.copy(configPathNames = Seq(value)))
.text("Path to a workflow configuration")

opt[String]("workflows").optional()
.action((value, config) =>
if (config.configPathNames.nonEmpty) {
throw new IllegalArgumentException(s"Cannot use both --workflow and --workflows options together")
} else {
config.copy(configPathNames = value.split(',').map(_.trim))
}
)
.text("Path to a comma separated list of workflow configurations")

checkConfig(c =>
if (c.configPathNames.isEmpty) {
failure("Either --workflow or --workflows option must be specified")
} else {
success
}
)

opt[String]("files").optional().action((value, config) =>
config.copy(files = value.split(',').map(_.trim)))
.text("A comma-separated list of files to get from HDFS or S3 (use Hadoop prefix s3a://, hdfs://). " +
@@ -17,7 +17,6 @@
package za.co.absa.pramen.core.metastore.peristence

import org.apache.hadoop.fs.Path
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.slf4j.LoggerFactory
@@ -170,7 +169,7 @@ object MetastorePersistenceTransient {
}
}

private[core] def cleanup(): Unit = synchronized {
private[core] def reset(): Unit = synchronized {
rawDataframes.clear()
cachedDataframes.foreach { case (_, df) => df.unpersist() }
cachedDataframes.clear()