From 1cc1bbd491b8e89b3199f709f329473accf32465 Mon Sep 17 00:00:00 2001
From: "Sofiane ALI (contractor)"
Date: Tue, 16 Apr 2024 13:35:43 +0200
Subject: [PATCH 1/5] Add Snowflake batch feature

---
 .../snowflake/batch/SnowflakeInput.scala      | 47 ++++++++++
 .../snowflake/batch/SnowflakeOutput.scala     | 57 ++++++++++++
 .../snowflake/batch/SnowflakeInputTest.scala  | 58 ++++++++++++
 .../snowflake/batch/SnowflakeOutputTest.scala | 91 +++++++++++++++++++
 .../_data/config/pipes/snowflake/batch.yaml   | 12 +++
 .../_data/config/pipes/snowflake/common.yaml  | 23 +++++
 6 files changed, 288 insertions(+)
 create mode 100644 core/src/main/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeInput.scala
 create mode 100644 core/src/main/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeOutput.scala
 create mode 100644 core/src/test/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeInputTest.scala
 create mode 100644 core/src/test/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeOutputTest.scala
 create mode 100644 docs/content/_data/config/pipes/snowflake/batch.yaml
 create mode 100644 docs/content/_data/config/pipes/snowflake/common.yaml

diff --git a/core/src/main/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeInput.scala b/core/src/main/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeInput.scala
new file mode 100644
index 0000000..0916c9b
--- /dev/null
+++ b/core/src/main/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeInput.scala
@@ -0,0 +1,47 @@
+package com.amadeus.dataio.pipes.snowflake.batch
+
+import com.amadeus.dataio.core.{Input, Logging}
+import com.amadeus.dataio.config.fields._
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+/**
+ * Class for reading Snowflake input.
+ *
+ * @param options the snowflake connector options.
+ * @param config Contains the Typesafe Config object that was used at instantiation to configure this entity.
+ */
+case class SnowflakeInput(
+    options: Map[String, String],
+    config: Config = ConfigFactory.empty()
+    ) extends Input
+    with Logging {
+
+  val SNOWFLAKE_CONNECTOR_NAME = "net.snowflake.spark.snowflake"
+
+  /**
+   * Reads a batch of data from Snowflake.
+   *
+   * @param spark The SparkSession which will be used to read the data.
+   * @return The data that was read.
+   * @throws Exception If the underlying Snowflake connector fails to read the data.
+   */
+  override def read(implicit spark: SparkSession): DataFrame = {
+    spark.read.format(SNOWFLAKE_CONNECTOR_NAME).options(options).load()
+  }
+
+}
+
+object SnowflakeInput {
+
+  /**
+   * Creates a new instance of SnowflakeInput from a typesafe Config object.
+   *
+   * @param config typesafe Config object containing the configuration fields.
+   * @return a new SnowflakeInput object.
+   * @throws com.typesafe.config.ConfigException If any of the mandatory fields is not available in the config argument.
+   */
+  def apply(implicit config: Config): SnowflakeInput = {
+    SnowflakeInput(options = getOptions , config = config)
+  }
+}
\ No newline at end of file

diff --git a/core/src/main/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeOutput.scala b/core/src/main/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeOutput.scala
new file mode 100644
index 0000000..248ddaf
--- /dev/null
+++ b/core/src/main/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeOutput.scala
@@ -0,0 +1,57 @@
+package com.amadeus.dataio.pipes.snowflake.batch
+
+import com.amadeus.dataio.config.fields._
+import com.amadeus.dataio.core.{Logging, Output}
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.spark.sql.{Dataset, SparkSession}
+
+/**
+ * Writes data to Snowflake.
+ *
+ * @param mode the write (save) mode to use.
+ * @param options the snowflake connector options.
+ * @param config the config object.
+ */
+case class SnowflakeOutput(
+    mode: String,
+    options: Map[String, String],
+    config: Config = ConfigFactory.empty()
+    ) extends Output
+    with Logging {
+
+  val SNOWFLAKE_CONNECTOR_NAME = "net.snowflake.spark.snowflake"
+
+
+  /**
+   * Writes data to this output.
+   *
+   * @param data The data to write.
+   * @param spark The SparkSession which will be used to write the data.
+   */
+  override def write[T](data: Dataset[T])(implicit spark: SparkSession): Unit = {
+
+    data.write
+      .format(SNOWFLAKE_CONNECTOR_NAME)
+      .options(options)
+      .mode(mode)
+      .save()
+  }
+}
+
+object SnowflakeOutput {
+
+  /**
+   * Creates a new instance of SnowflakeOutput from a typesafe Config object.
+   *
+   * @param config typesafe Config object containing the configuration fields.
+   * @return a new SnowflakeOutput object.
+   * @throws com.typesafe.config.ConfigException If any of the mandatory fields is not available in the config argument.
+   */
+  def apply(implicit config: Config): SnowflakeOutput = {
+
+    val mode = config.getString("Mode")
+
+    SnowflakeOutput(mode = mode, options = getOptions, config = config)
+  }
+
+}
\ No newline at end of file

diff --git a/core/src/test/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeInputTest.scala b/core/src/test/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeInputTest.scala
new file mode 100644
index 0000000..57aec50
--- /dev/null
+++ b/core/src/test/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeInputTest.scala
@@ -0,0 +1,58 @@
+package com.amadeus.dataio.pipes.snowflake.batch
+
+import com.amadeus.dataio.testutils.JavaImplicitConverters._
+import com.typesafe.config.{ConfigException, ConfigFactory}
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+class SnowflakeInputTest extends AnyWordSpec with Matchers {
+
+  "SnowflakeInput" should {
+    "be initialized according to configuration" in {
+      val config = ConfigFactory.parseMap(
+        Map(
+          "Input" -> Map(
+            "Name" -> "my-test-snowflake",
+            "Type" -> "com.amadeus.dataio.pipes.snowflake.batch.SnowflakeInput",
+            "Options" -> Map(
+              "sfDatabase" -> "TESTDATABASE",
+              "sfSchema" -> "TESTSCHEMA",
+              "sfUser" -> "TESTUSER",
+              "sfUrl" -> "snowflake.url.com",
+              "dbtable" -> "TESTTABLE"
+            )
+          )
+        )
+      )
+
+      val snowflakeInputObj = SnowflakeInput.apply(config.getConfig("Input"))
+
+      val expectedSnowflakeOptions = Map(
+        "sfDatabase" -> "TESTDATABASE",
+        "sfSchema" -> "TESTSCHEMA",
+        "sfUser" -> "TESTUSER",
+        "sfUrl" -> "snowflake.url.com",
+        "dbtable" -> "TESTTABLE"
+      )
+
+      snowflakeInputObj.options shouldEqual expectedSnowflakeOptions
+    }
+  }
+
+  "throw an exception when the mandatory options are missing" in {
+    val config = ConfigFactory.parseMap(
+      Map(
+        "Input" -> Map(
+          "Name" -> "my-test-snowflake",
+          "Type" -> "com.amadeus.dataio.pipes.snowflake.batch.SnowflakeInput",
+        )
+      )
+    )
+
+    def snowflakeInputObj = SnowflakeInput.apply(config.getConfig("Input"))
+
+    intercept[ConfigException](snowflakeInputObj)
+
+  }
+
+}
\ No newline at end of file

diff --git a/core/src/test/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeOutputTest.scala b/core/src/test/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeOutputTest.scala
new file mode 100644
index 0000000..7e84950
--- /dev/null
+++ b/core/src/test/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeOutputTest.scala
@@ -0,0 +1,91 @@
+package com.amadeus.dataio.pipes.snowflake.batch
+
+import com.amadeus.dataio.testutils.JavaImplicitConverters._
+import com.typesafe.config.{ConfigException, ConfigFactory}
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+
+class SnowflakeOutputTest extends AnyWordSpec with Matchers {
+
+  "SnowflakeOutput" should {
+    "be initialized according to configuration" in {
+
+      val config = ConfigFactory.parseMap(
+        Map(
+          "Output" -> Map(
+            "Type" -> "com.amadeus.dataio.output.streaming.SnowflakeOutput",
+            "Name" -> "my-test-snowflake",
+            "Mode" -> "append",
+            "Options" -> Map(
+              "dbTable" -> "test-table",
+              "sfUrl" -> "http://snowflake.com",
+              "sfUser" -> "my-user",
+              "sfDatabase" -> "db",
+              "sfSchema" -> "test-schema",
+              "sfWarehouse" -> "warehouse",
+              "column_mapping" -> "name"
+            )
+          )
+        )
+      )
+
+      val snowflakeStreamOutput = SnowflakeOutput.apply(config.getConfig("Output"))
+
+      val expectedSnowflakeOptions = Map(
+        "dbTable" -> "test-table",
+        "sfUrl" -> "http://snowflake.com",
+        "sfUser" -> "my-user",
+        "sfDatabase" -> "db",
+        "sfSchema" -> "test-schema",
"sfWarehouse" -> "warehouse", + "column_mapping" -> "name" + ) + + snowflakeStreamOutput.options shouldEqual expectedSnowflakeOptions + snowflakeStreamOutput.mode shouldEqual "append" + } + + "be throw an exception when mode is missing" in { + + val config = ConfigFactory.parseMap( + Map( + "Output" -> Map( + "Type" -> "com.amadeus.dataio.output.streaming.SnowflakeOutput", + "Name" -> "my-test-snowflake", + "Options" -> Map( + "dbTable" -> "test-table", + "sfUrl" -> "http://snowflake.com", + "sfUser" -> "my-user", + "sfDatabase" -> "db", + "sfSchema" -> "test-schema", + "sfWarehouse" -> "warehouse", + "column_mapping" -> "name" + ) + ) + ) + ) + + def snowflakeStreamOutput = SnowflakeOutput.apply(config.getConfig("Output")) + intercept[ConfigException](snowflakeStreamOutput) + } + + "throw an exception when options is missing" in { + + val config = ConfigFactory.parseMap( + Map( + "Output" -> Map( + "Type" -> "com.amadeus.dataio.output.streaming.SnowflakeOutput", + "Name" -> "my-test-snowflake", + ) + ) + ) + + def snowflakeStreamOutput = SnowflakeOutput.apply(config.getConfig("Output")) + intercept[ConfigException](snowflakeStreamOutput) + } + + + } + +} \ No newline at end of file diff --git a/docs/content/_data/config/pipes/snowflake/batch.yaml b/docs/content/_data/config/pipes/snowflake/batch.yaml new file mode 100644 index 0000000..c14089b --- /dev/null +++ b/docs/content/_data/config/pipes/snowflake/batch.yaml @@ -0,0 +1,12 @@ +input: + type: com.amadeus.dataio.pipes.snowflake.batch.SnowflakeInput + + +output: + type: com.amadeus.dataio.pipes.snowflake.batch.SnowflakeOutput + fields: + - name: Mode + mandatory: Yes + description: Writing mode on the Snowflake table + example: Mode = "append" + diff --git a/docs/content/_data/config/pipes/snowflake/common.yaml b/docs/content/_data/config/pipes/snowflake/common.yaml new file mode 100644 index 0000000..716ad99 --- /dev/null +++ b/docs/content/_data/config/pipes/snowflake/common.yaml @@ -0,0 +1,23 @@ +name: Snowflake +description: Allows the connection to Snowflake to automatically retrieve from or publish data to a Snowflake table. + +code_directory: snowflake + +links: + - name: Spark Snowflake API documentation + url: https://docs.snowflake.com/en/user-guide/spark-connector-use#using-the-connector-in-scala + +fields: + - name: Options + description: Snowflake options to specify such as key = value pairs. 
+      These options are then passed directly to the Spark connector for Snowflake.
+    example: Options {
+      Database = "example_database"
+      Schema = "example_schema"
+      Table = "example_table"
+      Mode = "append"
+      User = "username"
+      Url = "account_address"
+      pem_private_key = "private_key"
+      sfWarehouse = "warehouse"
+      sfRole = "role"
+      column_mapping = "name"
+      }

From 26733606a8583d8835384337c68f8ddfc9d29113 Mon Sep 17 00:00:00 2001
From: "Sofiane ALI (contractor)"
Date: Tue, 16 Apr 2024 13:38:22 +0200
Subject: [PATCH 2/5] Add Snowflake Streaming output class

---
 .../snowflake/streaming/SnowflakeOutput.scala | 148 ++++++++++++++++
 .../streaming/SnowflakeOutputTest.scala       | 146 ++++++++++++++++++
 2 files changed, 294 insertions(+)
 create mode 100644 core/src/main/scala/com/amadeus/dataio/pipes/snowflake/streaming/SnowflakeOutput.scala
 create mode 100644 core/src/test/scala/com/amadeus/dataio/pipes/snowflake/streaming/SnowflakeOutputTest.scala

diff --git a/core/src/main/scala/com/amadeus/dataio/pipes/snowflake/streaming/SnowflakeOutput.scala b/core/src/main/scala/com/amadeus/dataio/pipes/snowflake/streaming/SnowflakeOutput.scala
new file mode 100644
index 0000000..52dd4e1
--- /dev/null
+++ b/core/src/main/scala/com/amadeus/dataio/pipes/snowflake/streaming/SnowflakeOutput.scala
@@ -0,0 +1,148 @@
+package com.amadeus.dataio.pipes.snowflake.streaming
+
+import com.amadeus.dataio.core.{Logging, Output}
+import org.apache.spark.sql.streaming.Trigger
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.spark.sql.functions.current_timestamp
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import scala.util.Try
+
+/**
+ * Writes streaming data to Snowflake.
+ *
+ * /!\ It uses the Snowflake connector and therefore guarantees only at-least-once delivery. /!\
+ *
+ * @param trigger the trigger to be used for the streaming query.
+ * @param timeout the streaming query timeout, in milliseconds.
+ * @param mode the write (save) mode to use.
+ * @param options the snowflake connector options.
+ * @param addTimestampOnInsert if true, a timestamp column with the current timestamp will be added to the data.
+ * @param config the config object.
+ * @param outputName the output name used to define the streaming query name.
+ */
+case class SnowflakeOutput(
+    timeout: Long,
+    trigger: Option[Trigger],
+    mode: String,
+    options: Map[String, String],
+    addTimestampOnInsert: Boolean,
+    config: Config = ConfigFactory.empty(),
+    outputName: Option[String]
+) extends Output
+    with Logging {
+
+  val SNOWFLAKE_CONNECTOR_NAME = "net.snowflake.spark.snowflake"
+
+  /**
+   * Writes data to this output.
+   *
+   * @param data The data to write.
+   * @param spark The SparkSession which will be used to write the data.
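+   * @note Blocks until the configured timeout has elapsed, then stops the streaming query.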
+   */
+  def write[T](data: Dataset[T])(implicit spark: SparkSession): Unit = {
+
+    val dbTable: Option[String] = options.get("dbtable")
+    dbTable.foreach(table => logger.info(s"Writing dataframe to Snowflake table ${table}"))
+    trigger.foreach(trigger => logger.info(s"Using trigger ${trigger}"))
+
+    logger.info(s"Add timestamp on insert: $addTimestampOnInsert")
+
+    val queryName = createQueryName()
+
+    var streamWriter = data.writeStream.queryName(queryName)
+
+    streamWriter = trigger match {
+      case Some(trigger) => streamWriter.trigger(trigger)
+      case _             => streamWriter
+    }
+
+    streamWriter.foreachBatch((batchDF: Dataset[T], _: Long) => {
+      addTimestampOnInsert(batchDF).write
+        .format(SNOWFLAKE_CONNECTOR_NAME)
+        .options(options)
+        .mode(mode)
+        .save()
+    })
+
+    val streamingQuery = streamWriter.start()
+
+    streamingQuery.awaitTermination(timeout)
+    streamingQuery.stop()
+  }
+
+  /**
+   * Add current timestamp to the data if needed.
+   *
+   * @param data the dataset to enrich.
+   * @tparam T the dataset type.
+   * @return the dataset with the timestamp column if needed.
+   */
+  private def addTimestampOnInsert[T](data: Dataset[T]): DataFrame = {
+    if (addTimestampOnInsert) {
+      data.withColumn("timestamp", current_timestamp())
+    } else {
+      data.toDF()
+    }
+  }
+
+  /**
+   * Create a unique query name based on the output name and the target table, when available.
+   *
+   * @return a unique query name.
+   */
+  private[streaming] def createQueryName(): String = {
+
+    val dbTable: Option[String] = options.get("dbtable")
+
+    (outputName, dbTable) match {
+      case (Some(name), Some(table)) => s"QN_${name}_${table}_${java.util.UUID.randomUUID}"
+      case (Some(name), None)        => s"QN_${name}_${java.util.UUID.randomUUID}"
+      case (None, Some(table))       => s"QN_${table}_${java.util.UUID.randomUUID}"
+      case _                         => s"QN_${java.util.UUID.randomUUID}"
+    }
+
+  }
+}
+
+object SnowflakeOutput {
+  import com.amadeus.dataio.config.fields._
+
+  /**
+   * Creates a new instance of SnowflakeOutput from a typesafe Config object.
+   *
+   * @param config typesafe Config object containing the configuration fields.
+   * @return a new SnowflakeOutput object.
+   * @throws com.typesafe.config.ConfigException If any of the mandatory fields is not available in the config argument.
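+   *
+   * Illustrative configuration (the field names follow the tests in this series; the values are placeholders):
+   * {{{
+   * Output {
+   *   Mode = "append"
+   *   Duration = "60 seconds"
+   *   Timeout = "24"
+   *   Options { sfUrl = "account.snowflakecomputing.com", dbtable = "my_table" }
+   * }
+   * }}}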
+ */ + def apply(implicit config: Config): SnowflakeOutput = { + + val mode = config.getString("Mode") + val addTimestampOnInsert = Try(config.getBoolean("AddTimestampOnInsert")).getOrElse(false) + + val duration = Try(config.getString("Duration")).toOption + val trigger = duration.map(Trigger.ProcessingTime) + + val name = Try(config.getString("Name")).toOption + + SnowflakeOutput( + timeout = getTimeout, + trigger = trigger, + mode = mode, + options = getOptions, + addTimestampOnInsert = addTimestampOnInsert, + config = config, + outputName = name + ) + } +} diff --git a/core/src/test/scala/com/amadeus/dataio/pipes/snowflake/streaming/SnowflakeOutputTest.scala b/core/src/test/scala/com/amadeus/dataio/pipes/snowflake/streaming/SnowflakeOutputTest.scala new file mode 100644 index 0000000..685f8d1 --- /dev/null +++ b/core/src/test/scala/com/amadeus/dataio/pipes/snowflake/streaming/SnowflakeOutputTest.scala @@ -0,0 +1,146 @@ +package com.amadeus.dataio.pipes.snowflake.streaming + +import com.amadeus.dataio.testutils.JavaImplicitConverters._ +import com.typesafe.config.ConfigFactory +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec +import org.apache.spark.sql.streaming.Trigger + +import scala.concurrent.duration.Duration + +class SnowflakeOutputTest extends AnyWordSpec with Matchers { + + "SnowflakeOutput" should { + "be initialized according to configuration" in { + + val config = ConfigFactory.parseMap( + Map( + "Output" -> Map( + "Type" -> "com.amadeus.dataio.output.streaming.SnowflakeOutput", + "Name" -> "my-test-snowflake", + "Mode" -> "append", + "Duration" -> "60 seconds", + "Timeout" -> "24", + "Options" -> Map( + "dbtable" -> "test-table", + "sfUrl" -> "http://snowflake.com", + "sfUser" -> "my-user", + "sfDatabase" -> "db", + "sfSchema" -> "test-schema", + "sfWarehouse" -> "test-warehouse", + "sfRole" -> "tester" + ) + ) + ) + ) + + val snowflakeStreamOutput = SnowflakeOutput.apply(config.getConfig("Output")) + + val expectedSnowflakeOptions = Map( + "sfUrl" -> "http://snowflake.com", + "sfUser" -> "my-user", + "sfDatabase" -> "db", + "sfSchema" -> "test-schema", + "sfWarehouse" -> "test-warehouse", + "sfRole" -> "tester", + "dbtable" -> "test-table" + ) + + snowflakeStreamOutput.outputName shouldEqual Some("my-test-snowflake") + snowflakeStreamOutput.options shouldEqual expectedSnowflakeOptions + snowflakeStreamOutput.mode shouldEqual "append" + snowflakeStreamOutput.addTimestampOnInsert shouldEqual false + snowflakeStreamOutput.trigger shouldEqual Some(Trigger.ProcessingTime(Duration("60 seconds"))) + snowflakeStreamOutput.timeout shouldEqual 24 * 60 * 60 * 1000 + } + + "be initialized according to configuration with addTimestampOnInsert to true" in { + + val config = ConfigFactory.parseMap( + Map( + "Output" -> Map( + "Type" -> "com.amadeus.dataio.output.streaming.SnowflakeOutput", + "Name" -> "my-test-snowflake", + "Mode" -> "append", + "Duration" -> "60 seconds", + "Timeout" -> "24", + "AddTimestampOnInsert" -> true, + "Options" -> Map( + "dbtable" -> "test-table", + "sfUrl" -> "http://snowflake.com", + "sfUser" -> "my-user", + "sfDatabase" -> "db", + "sfSchema" -> "test-schema", + "sfWarehouse" -> "test-warehouse", + "sfRole" -> "tester" + ) + ) + ) + ) + + val snowflakeStreamOutput = SnowflakeOutput.apply(config.getConfig("Output")) + + val expectedSnowflakeOptions = Map( + "sfUrl" -> "http://snowflake.com", + "sfUser" -> "my-user", + "sfDatabase" -> "db", + "sfSchema" -> "test-schema", + "sfWarehouse" -> "test-warehouse", + "sfRole" -> 
"tester", + "dbtable" -> "test-table" + ) + + snowflakeStreamOutput.outputName shouldEqual Some("my-test-snowflake") + snowflakeStreamOutput.options shouldEqual expectedSnowflakeOptions + snowflakeStreamOutput.mode shouldEqual "append" + snowflakeStreamOutput.addTimestampOnInsert shouldEqual true + snowflakeStreamOutput.trigger shouldEqual Some(Trigger.ProcessingTime(Duration("60 seconds"))) + snowflakeStreamOutput.timeout shouldEqual 24 * 60 * 60 * 1000 + } + + "be initialized according to configuration with addTimestampOnInsert to false" in { + + val config = ConfigFactory.parseMap( + Map( + "Output" -> Map( + "Type" -> "com.amadeus.dataio.output.streaming.SnowflakeOutput", + "Name" -> "my-test-snowflake", + "Mode" -> "append", + "Duration" -> "60 seconds", + "Timeout" -> "24", + "AddTimestampOnInsert" -> false, + "Options" -> Map( + "dbtable" -> "test-table", + "sfUrl" -> "http://snowflake.com", + "sfUser" -> "my-user", + "sfDatabase" -> "db", + "sfSchema" -> "test-schema", + "sfWarehouse" -> "test-warehouse", + "sfRole" -> "tester" + ) + ) + ) + ) + + val snowflakeStreamOutput = SnowflakeOutput.apply(config.getConfig("Output")) + + val expectedSnowflakeOptions = Map( + "dbtable" -> "test-table", + "sfUrl" -> "http://snowflake.com", + "sfUser" -> "my-user", + "sfDatabase" -> "db", + "sfSchema" -> "test-schema", + "sfWarehouse" -> "test-warehouse", + "sfRole" -> "tester" + ) + + snowflakeStreamOutput.outputName shouldEqual Some("my-test-snowflake") + snowflakeStreamOutput.options shouldEqual expectedSnowflakeOptions + snowflakeStreamOutput.mode shouldEqual "append" + snowflakeStreamOutput.addTimestampOnInsert shouldEqual false + snowflakeStreamOutput.trigger shouldEqual Some(Trigger.ProcessingTime(Duration("60 seconds"))) + snowflakeStreamOutput.timeout shouldEqual 24 * 60 * 60 * 1000 + } + + } +} From 18f8c1ac77f36878017340796ae04fcc91d8264c Mon Sep 17 00:00:00 2001 From: "Sofiane ALI (contractor)" Date: Tue, 16 Apr 2024 14:14:16 +0200 Subject: [PATCH 3/5] Add documentation for Snowflake Streaming --- .../_data/config/pipes/snowflake/common.yaml | 9 +------- .../config/pipes/snowflake/streaming.yaml | 21 +++++++++++++++++++ docs/content/configuration/pipes/snowflake.md | 10 +++++++++ 3 files changed, 32 insertions(+), 8 deletions(-) create mode 100644 docs/content/_data/config/pipes/snowflake/streaming.yaml create mode 100644 docs/content/configuration/pipes/snowflake.md diff --git a/docs/content/_data/config/pipes/snowflake/common.yaml b/docs/content/_data/config/pipes/snowflake/common.yaml index 716ad99..9fd9a8a 100644 --- a/docs/content/_data/config/pipes/snowflake/common.yaml +++ b/docs/content/_data/config/pipes/snowflake/common.yaml @@ -12,12 +12,5 @@ fields: description: Snowflake options to specify such as key = value pairs. These options are then passed as option to the Spark connector for Snowflake example: Options { Database = "example_database" Schema = "example_schema" - Table = "example_table" - Mode = "append" - User = "username" - Url = "account_address" - pem_private_key = "private_key" - sfWarehouse = "warehouse" - sfRole = "role" - column_mapping = "name" + ... 
} diff --git a/docs/content/_data/config/pipes/snowflake/streaming.yaml b/docs/content/_data/config/pipes/snowflake/streaming.yaml new file mode 100644 index 0000000..3b9c47d --- /dev/null +++ b/docs/content/_data/config/pipes/snowflake/streaming.yaml @@ -0,0 +1,21 @@ +output: + type: com.amadeus.dataio.pipes.snowflake.streaming.SnowflakeOutput + fields: + - name: Duration + mandatory: "No" + description: Sets the trigger for the stream query. Controls the trigger() Spark function. + example: Duration = "60 seconds" + - name: Timeout + mandatory: "Yes" + description: Controls the amount of time before returning from the streaming query, in hours. It can be a String or an Int. + example: Timeout = 24 + - name: Mode + mandatory: "Yes" + description: The Spark Structured Streaming output mode. + example: Mode = "complete" + default: append + - name: AddTimestampOnInsert + mandatory: "No" + description: Add a column named "timestamp" containing the current timestamp at the start of query evaluation + example: AddTimestampOnInsert = "true" + default: false diff --git a/docs/content/configuration/pipes/snowflake.md b/docs/content/configuration/pipes/snowflake.md new file mode 100644 index 0000000..42c5a55 --- /dev/null +++ b/docs/content/configuration/pipes/snowflake.md @@ -0,0 +1,10 @@ +--- +title: Snowflake +layout: default +grand_parent: Configuration +parent: Pipes +--- + +# Snowflake +{% include pipe_description.md pipe=site.data.config.pipes.snowflake %} + From 043586d26229f63789c6a188536210a8bc4fbf71 Mon Sep 17 00:00:00 2001 From: "Sofiane ALI (contractor)" Date: Tue, 16 Apr 2024 15:38:36 +0200 Subject: [PATCH 4/5] Reformat Snowflake files --- .../snowflake/batch/SnowflakeInput.scala | 14 ++--- .../snowflake/batch/SnowflakeOutput.scala | 13 ++-- .../snowflake/batch/SnowflakeInputTest.scala | 12 ++-- .../snowflake/batch/SnowflakeOutputTest.scala | 62 +++++++++---------- .../streaming/SnowflakeOutputTest.scala | 32 +++++----- 5 files changed, 65 insertions(+), 68 deletions(-) diff --git a/core/src/main/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeInput.scala b/core/src/main/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeInput.scala index 0916c9b..26b1f97 100644 --- a/core/src/main/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeInput.scala +++ b/core/src/main/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeInput.scala @@ -12,10 +12,10 @@ import org.apache.spark.sql.{DataFrame, SparkSession} * @param config Contains the Typesafe Config object that was used at instantiation to configure this entity. */ case class SnowflakeInput( - options: Map[String, String], - config: Config = ConfigFactory.empty() - ) extends Input - with Logging { + options: Map[String, String], + config: Config = ConfigFactory.empty() +) extends Input + with Logging { val SNOWFLAKE_CONNECTOR_NAME = "net.snowflake.spark.snowflake" @@ -27,7 +27,7 @@ case class SnowflakeInput( * @throws Exception If the exactly one of the dateRange/dateColumn fields is None. */ override def read(implicit spark: SparkSession): DataFrame = { - spark.read.format(SNOWFLAKE_CONNECTOR_NAME).options(options).load() + spark.read.format(SNOWFLAKE_CONNECTOR_NAME).options(options).load() } } @@ -42,6 +42,6 @@ object SnowflakeInput { * @throws com.typesafe.config.ConfigException If any of the mandatory fields is not available in the config argument. 
*/ def apply(implicit config: Config): SnowflakeInput = { - SnowflakeInput(options = getOptions , config = config) + SnowflakeInput(options = getOptions, config = config) } -} \ No newline at end of file +} diff --git a/core/src/main/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeOutput.scala b/core/src/main/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeOutput.scala index 248ddaf..b5257ec 100644 --- a/core/src/main/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeOutput.scala +++ b/core/src/main/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeOutput.scala @@ -13,15 +13,14 @@ import org.apache.spark.sql.{Dataset, SparkSession} * @param config the config object. */ case class SnowflakeOutput( - mode: String, - options: Map[String, String], - config: Config = ConfigFactory.empty() - ) extends Output - with Logging { + mode: String, + options: Map[String, String], + config: Config = ConfigFactory.empty() +) extends Output + with Logging { val SNOWFLAKE_CONNECTOR_NAME = "net.snowflake.spark.snowflake" - /** * Writes data to this output. * @@ -54,4 +53,4 @@ object SnowflakeOutput { SnowflakeOutput(mode = mode, options = getOptions, config = config) } -} \ No newline at end of file +} diff --git a/core/src/test/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeInputTest.scala b/core/src/test/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeInputTest.scala index 57aec50..f8d1a93 100644 --- a/core/src/test/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeInputTest.scala +++ b/core/src/test/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeInputTest.scala @@ -12,9 +12,9 @@ class SnowflakeInputTest extends AnyWordSpec with Matchers { val config = ConfigFactory.parseMap( Map( "Input" -> Map( - "Name" -> "my-test-snowflake", - "Type" -> "com.amadeus.dataio.pipes.snowflake.batch.SnowflakeInput", - "Options" -> Map( + "Name" -> "my-test-snowflake", + "Type" -> "com.amadeus.dataio.pipes.snowflake.batch.SnowflakeInput", + "Options" -> Map( "sfDatabase" -> "TESTDATABASE", "sfSchema" -> "TESTSCHEMA", "sfUser" -> "TESTUSER", @@ -43,8 +43,8 @@ class SnowflakeInputTest extends AnyWordSpec with Matchers { val config = ConfigFactory.parseMap( Map( "Input" -> Map( - "Name" -> "my-test-snowflake", - "Type" -> "com.amadeus.dataio.pipes.snowflake.batch.SnowflakeInput", + "Name" -> "my-test-snowflake", + "Type" -> "com.amadeus.dataio.pipes.snowflake.batch.SnowflakeInput" ) ) ) @@ -55,4 +55,4 @@ class SnowflakeInputTest extends AnyWordSpec with Matchers { } -} \ No newline at end of file +} diff --git a/core/src/test/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeOutputTest.scala b/core/src/test/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeOutputTest.scala index 7e84950..a54a81c 100644 --- a/core/src/test/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeOutputTest.scala +++ b/core/src/test/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeOutputTest.scala @@ -5,7 +5,6 @@ import com.typesafe.config.{ConfigException, ConfigFactory} import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec - class SnowflakeOutputTest extends AnyWordSpec with Matchers { "SnowflakeOutput" should { @@ -14,16 +13,16 @@ class SnowflakeOutputTest extends AnyWordSpec with Matchers { val config = ConfigFactory.parseMap( Map( "Output" -> Map( - "Type" -> "com.amadeus.dataio.output.streaming.SnowflakeOutput", - "Name" -> "my-test-snowflake", - "Mode" -> "append", - "Options" -> Map( - "dbTable" -> "test-table", - "sfUrl" -> 
"http://snowflake.com", - "sfUser" -> "my-user", - "sfDatabase" -> "db", - "sfSchema" -> "test-schema", - "sfWarehouse" -> "warehouse", + "Type" -> "com.amadeus.dataio.output.streaming.SnowflakeOutput", + "Name" -> "my-test-snowflake", + "Mode" -> "append", + "Options" -> Map( + "dbTable" -> "test-table", + "sfUrl" -> "http://snowflake.com", + "sfUser" -> "my-user", + "sfDatabase" -> "db", + "sfSchema" -> "test-schema", + "sfWarehouse" -> "warehouse", "column_mapping" -> "name" ) ) @@ -33,12 +32,12 @@ class SnowflakeOutputTest extends AnyWordSpec with Matchers { val snowflakeStreamOutput = SnowflakeOutput.apply(config.getConfig("Output")) val expectedSnowflakeOptions = Map( - "dbTable" -> "test-table", - "sfUrl" -> "http://snowflake.com", - "sfUser" -> "my-user", - "sfDatabase" -> "db", - "sfSchema" -> "test-schema", - "sfWarehouse" -> "warehouse", + "dbTable" -> "test-table", + "sfUrl" -> "http://snowflake.com", + "sfUser" -> "my-user", + "sfDatabase" -> "db", + "sfSchema" -> "test-schema", + "sfWarehouse" -> "warehouse", "column_mapping" -> "name" ) @@ -50,16 +49,16 @@ class SnowflakeOutputTest extends AnyWordSpec with Matchers { val config = ConfigFactory.parseMap( Map( - "Output" -> Map( - "Type" -> "com.amadeus.dataio.output.streaming.SnowflakeOutput", - "Name" -> "my-test-snowflake", - "Options" -> Map( - "dbTable" -> "test-table", - "sfUrl" -> "http://snowflake.com", - "sfUser" -> "my-user", - "sfDatabase" -> "db", - "sfSchema" -> "test-schema", - "sfWarehouse" -> "warehouse", + "Output" -> Map( + "Type" -> "com.amadeus.dataio.output.streaming.SnowflakeOutput", + "Name" -> "my-test-snowflake", + "Options" -> Map( + "dbTable" -> "test-table", + "sfUrl" -> "http://snowflake.com", + "sfUser" -> "my-user", + "sfDatabase" -> "db", + "sfSchema" -> "test-schema", + "sfWarehouse" -> "warehouse", "column_mapping" -> "name" ) ) @@ -74,9 +73,9 @@ class SnowflakeOutputTest extends AnyWordSpec with Matchers { val config = ConfigFactory.parseMap( Map( - "Output" -> Map( - "Type" -> "com.amadeus.dataio.output.streaming.SnowflakeOutput", - "Name" -> "my-test-snowflake", + "Output" -> Map( + "Type" -> "com.amadeus.dataio.output.streaming.SnowflakeOutput", + "Name" -> "my-test-snowflake" ) ) ) @@ -85,7 +84,6 @@ class SnowflakeOutputTest extends AnyWordSpec with Matchers { intercept[ConfigException](snowflakeStreamOutput) } - } -} \ No newline at end of file +} diff --git a/core/src/test/scala/com/amadeus/dataio/pipes/snowflake/streaming/SnowflakeOutputTest.scala b/core/src/test/scala/com/amadeus/dataio/pipes/snowflake/streaming/SnowflakeOutputTest.scala index 685f8d1..7448b85 100644 --- a/core/src/test/scala/com/amadeus/dataio/pipes/snowflake/streaming/SnowflakeOutputTest.scala +++ b/core/src/test/scala/com/amadeus/dataio/pipes/snowflake/streaming/SnowflakeOutputTest.scala @@ -23,10 +23,10 @@ class SnowflakeOutputTest extends AnyWordSpec with Matchers { "Timeout" -> "24", "Options" -> Map( "dbtable" -> "test-table", - "sfUrl" -> "http://snowflake.com", - "sfUser" -> "my-user", - "sfDatabase" -> "db", - "sfSchema" -> "test-schema", + "sfUrl" -> "http://snowflake.com", + "sfUser" -> "my-user", + "sfDatabase" -> "db", + "sfSchema" -> "test-schema", "sfWarehouse" -> "test-warehouse", "sfRole" -> "tester" ) @@ -67,10 +67,10 @@ class SnowflakeOutputTest extends AnyWordSpec with Matchers { "AddTimestampOnInsert" -> true, "Options" -> Map( "dbtable" -> "test-table", - "sfUrl" -> "http://snowflake.com", - "sfUser" -> "my-user", - "sfDatabase" -> "db", - "sfSchema" -> "test-schema", + "sfUrl" -> 
"http://snowflake.com", + "sfUser" -> "my-user", + "sfDatabase" -> "db", + "sfSchema" -> "test-schema", "sfWarehouse" -> "test-warehouse", "sfRole" -> "tester" ) @@ -111,10 +111,10 @@ class SnowflakeOutputTest extends AnyWordSpec with Matchers { "AddTimestampOnInsert" -> false, "Options" -> Map( "dbtable" -> "test-table", - "sfUrl" -> "http://snowflake.com", - "sfUser" -> "my-user", - "sfDatabase" -> "db", - "sfSchema" -> "test-schema", + "sfUrl" -> "http://snowflake.com", + "sfUser" -> "my-user", + "sfDatabase" -> "db", + "sfSchema" -> "test-schema", "sfWarehouse" -> "test-warehouse", "sfRole" -> "tester" ) @@ -126,10 +126,10 @@ class SnowflakeOutputTest extends AnyWordSpec with Matchers { val expectedSnowflakeOptions = Map( "dbtable" -> "test-table", - "sfUrl" -> "http://snowflake.com", - "sfUser" -> "my-user", - "sfDatabase" -> "db", - "sfSchema" -> "test-schema", + "sfUrl" -> "http://snowflake.com", + "sfUser" -> "my-user", + "sfDatabase" -> "db", + "sfSchema" -> "test-schema", "sfWarehouse" -> "test-warehouse", "sfRole" -> "tester" ) From 4f5a703c2ece02189ab31aa9c149506c2bced78a Mon Sep 17 00:00:00 2001 From: "Sofiane ALI (contractor)" Date: Thu, 25 Apr 2024 16:18:16 +0200 Subject: [PATCH 5/5] Add Spark-snowflake connector as dependency --- build.sbt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index daabe0c..7a9f208 100644 --- a/build.sbt +++ b/build.sbt @@ -16,7 +16,8 @@ ThisBuild / libraryDependencies ++= Seq( // Spark "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion.value, "org.apache.spark" %% "spark-sql" % sparkVersion.value, - "org.apache.spark" %% "spark-core" % sparkVersion.value + "org.apache.spark" %% "spark-core" % sparkVersion.value, + "net.snowflake" %% "spark-snowflake" % f"2.15.0-spark_3.4" ) // Tests configuration