Add Snowflake Plugin #818

Draft
sethjones348 wants to merge 1 commit into develop from feature/add-snowflake-plugin

Conversation

sethjones348

Overview

This PR adds support for Snowflake target sources when using the spline-spark-agent in conjunction with the Snowflake Spark connector.
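
For context, a minimal sketch of the kind of write the plugin is meant to capture; option keys follow the Snowflake Spark connector's convention, and all values below are placeholders:

import org.apache.spark.sql.{SaveMode, SparkSession}

// Illustrative only: a Snowflake write whose lineage the plugin should capture.
// Option keys follow the net.snowflake.spark.snowflake connector convention;
// all values are placeholders.
val spark = SparkSession.builder().appName("snowflake-write-example").getOrCreate()

val sfOptions = Map(
  "sfURL"       -> "myaccount.snowflakecomputing.com",
  "sfUser"      -> "<user>",
  "sfPassword"  -> "<password>",
  "sfWarehouse" -> "MY_WH",
  "sfDatabase"  -> "MY_DB",
  "sfSchema"    -> "PUBLIC",
  "dbtable"     -> "MY_TABLE"
)

spark.range(10).toDF("id")
  .write
  .format("snowflake")       // short name registered by the connector
  .options(sfOptions)
  .mode(SaveMode.Append)
  .save()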

wajda (Contributor) left a comment:

@sethjones348 Thank you for your PR! We'll happily accept it, but there are a couple of things that need to be polished. Please read the comments below.

@@ -0,0 +1,58 @@
package za.co.absa.spline.harvester.plugin.embedded
wajda (Contributor):

Missing copyright/license header. That's the reason why all test builds fail.
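
For reference, a sketch of the header in question, assuming the project's standard Apache 2.0 notice; the exact wording and year should be copied from an existing source file in the repository:

/*
 * Copyright 2024 ABSA Group Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */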

sethjones348 (Author):

Adding this!

val schema: String = cmd.options("sfSchema")
val table: String = cmd.options("dbtable")

WriteNodeInfo(asSourceId(url, warehouse, database, schema, table), cmd.mode, cmd.query, cmd.options)
wajda (Contributor):

This captures a WRITE operation, but what about a READ one?
Is DataSourceV2 supported by the Snowflake connector?

sethjones348 (Author):

I don't have an explicit answer to your question. However, I did some digging on capturing READ, and I wasn't able to find a way to capture the data source URI from the underlying net.snowflake.spark.snowflake.SnowflakeRelation that gets matched on when the logical plan includes reading from a Snowflake source.

This is what was available in that SnowflakeRelation object on a READ test I executed:

{
  "schema": {
    "fields": [
      {
        "name": "WORKORDERID",
        "type": "StringType",
        "nullable": true
      },
      {
        "name": "PRODUCTID",
        "type": "StringType",
        "nullable": true
      },
      {
        "name": "ORDERQTY",
        "type": "StringType",
        "nullable": true
      },
      {
        "name": "SCRAPPEDQTY",
        "type": "StringType",
        "nullable": true
      },
      {
        "name": "STARTDATE",
        "type": "StringType",
        "nullable": true
      },
      {
        "name": "ENDDATE",
        "type": "StringType",
        "nullable": true
      },
      {
        "name": "MODIFIEDDATE",
        "type": "StringType",
        "nullable": true
      },
      {
        "name": "BONUS_PERCENT",
        "type": "DoubleType",
        "nullable": true
      }
    ]
  },
  "scanMetrics": {
    "numBytesRead": {
      "id": 1,
      "name": "number of bytes read",
      "value": 7197975
    },
    "numRecordsRead": {
      "id": 2,
      "name": "number of records read",
      "value": 72591
    }
  },
  "writeMetrics": {
    "numBytesWritten": {
      "id": 3,
      "name": "number of bytes written",
      "value": 0
    },
    "numRecordsWritten": {
      "id": 4,
      "name": "number of records written",
      "value": 0
    }
  },
  "jdbcWrapper": "net.snowflake.spark.snowflake.DefaultJDBCWrapper$@2da464e2",
  "params": "Snowflake Data Source",
  "userSchema": "None",
  "sqlContext": "org.apache.spark.sql.SQLContext@415f672d",
  "log": "org.apache.logging.slf4j.Log4jLogger@10cc51c0",
  "pushdownStatement": "SELECT * FROM ( workorder_data_gold ) AS \"SF_CONNECTOR_QUERY_ALIAS\"",
  "bitmap": 7
}

The only data specific to the source table is that pushdownStatement, which could be parsed to get the table name, but that is not enough to build the data source URI. With that in mind, I decided to leave READ capturing out of this PR.
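
For reference, a hypothetical sketch of that parsing; it recovers the table name but none of the account, warehouse, database, or schema parts needed for a full data source URI:

import scala.util.matching.Regex

// Hypothetical sketch: extract the table name from the connector's pushdown statement.
// This recovers "workorder_data_gold", but not the account, warehouse, database, or
// schema needed to build a complete data source URI.
val pushdownStatement =
  "SELECT * FROM ( workorder_data_gold ) AS \"SF_CONNECTOR_QUERY_ALIAS\""
val TablePattern: Regex = """(?i)FROM\s*\(\s*(\S+)\s*\)""".r
val tableName: Option[String] =
  TablePattern.findFirstMatchIn(pushdownStatement).map(_.group(1)) // Some("workorder_data_gold")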

wajda (Contributor) commented on Jul 16, 2024:

Hm, this is interesting:

"jdbcWrapper": "net.snowflake.spark.snowflake.DefaultJDBCWrapper$@2da464e2"

Is the Snowflake connection just a wrapper over JDBC? Can you introspect that wrapper object? I believe there will be IPs, a connection string, etc.

Basically, what would be ideal to achieve (though unfortunately not always possible, depending on the technology) is for the data source URI that comes out of the WRITE operation to match exactly the URI that comes out of the respective READ operation (the one that reads the logically same thing that was written by the write op).
The structure of that URI doesn't really matter as long as it resembles a URL and reliably identifies the data entity that is being read or written. That helps Spline connect execution plans based on a data source URI match and build the end-to-end lineage view.
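
For illustration only, a hypothetical sketch of a symmetric helper; the URI shape below is an assumption, and what matters is that the READ and WRITE handlers produce the identical string for the same table:

// Hypothetical sketch: both the READ and the WRITE handler derive the data source URI
// from the same Snowflake coordinates, so Spline can join the resulting execution
// plans on an exact URI match.
object SnowflakeURIs {
  def sourceURI(url: String, warehouse: String, database: String, schema: String, table: String): String =
    s"snowflake://$url/$warehouse/$database/$schema/$table"
}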

<dependency>
<groupId>net.snowflake</groupId>
<artifactId>spark-snowflake_${scala.binary.version}</artifactId>
<version>2.16.0-spark_3.3</version>
wajda (Contributor):

The connector version 2.16.0-spark_3.3 specifically says it's compiled for Spark 3.3. What about the other Spark versions? Will the plugin still be binary compatible?

import za.co.absa.spline.harvester.builder.SourceIdentifier
import org.mockito.Mockito._

class SnowflakePluginSpec extends AnyFlatSpec with Matchers with MockitoSugar {
wajda (Contributor):

Is there any possibility to add end-to-end integration tests to verify compatibility with real Spark?
For example, a Snowflake Docker container, or maybe an official mock/test library?
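
For reference, a hypothetical shape such a test could take, assuming credentials are injected via environment variables; the names below are illustrative and not the project's existing fixtures:

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

// Hypothetical sketch: an end-to-end test that only runs when a real Snowflake
// account is provided through the environment, and is skipped otherwise.
class SnowflakePluginE2ESpec extends AnyFlatSpec with Matchers {

  it should "capture lineage for a Snowflake write" in {
    assume(sys.env.contains("SNOWFLAKE_URL"), "no Snowflake credentials in the environment; skipping")
    // Build a SparkSession with the Spline agent enabled, write a small DataFrame to
    // Snowflake using the env-provided options, then assert that the captured write
    // operation's data source URI points at the expected table.
  }
}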

sethjones348 force-pushed the feature/add-snowflake-plugin branch from 133f2c1 to 4495e93 on July 17, 2024 20:10
sethjones348 marked this pull request as draft on July 17, 2024 20:11
sethjones348 (Author):

@wajda I implemented some of your suggestions. It turns out the read operation was not hard to capture after all! I also added an integration test, but I've marked the PR as draft as it's not ready to merge yet. I have a couple of questions for you.

  1. The Snowflake connector depends on Spark 3.2, 3.3, or 3.4. I have written an integration test, but if I update the value of spark.version to spark-33.version I can't even attempt to run the test, as I run into SLF4J binding errors:
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
24/07/17 16:03:48 INFO SparkContext: Running Spark version 3.3.1
24/07/17 16:03:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/07/17 16:03:48 INFO ResourceUtils: ==============================================================
24/07/17 16:03:48 INFO ResourceUtils: No custom resources configured for spark.driver.
24/07/17 16:03:48 INFO ResourceUtils: ==============================================================
24/07/17 16:03:48 INFO SparkContext: Submitted application: b911d908-3b0b-4a3d-9990-532586fee26c
24/07/17 16:03:48 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
24/07/17 16:03:48 INFO ResourceProfile: Limiting resource is cpu
24/07/17 16:03:48 INFO ResourceProfileManager: Added ResourceProfile id: 0
24/07/17 16:03:48 INFO SecurityManager: Changing view acls to: sethjones
24/07/17 16:03:48 INFO SecurityManager: Changing modify acls to: sethjones
24/07/17 16:03:48 INFO SecurityManager: Changing view acls groups to: 
24/07/17 16:03:48 INFO SecurityManager: Changing modify acls groups to: 
24/07/17 16:03:48 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(sethjones); groups with view permissions: Set(); users  with modify permissions: Set(sethjones); groups with modify permissions: Set()


An exception or error caused a run to abort: 'void io.netty.buffer.PooledByteBufAllocator.<init>(boolean, int, int, int, int, int, int, boolean)' 
java.lang.NoSuchMethodError: 'void io.netty.buffer.PooledByteBufAllocator.<init>(boolean, int, int, int, int, int, int, boolean)'
	at org.apache.spark.network.util.NettyUtils.createPooledByteBufAllocator(NettyUtils.java:171)
	at org.apache.spark.network.util.NettyUtils.getSharedPooledByteBufAllocator(NettyUtils.java:142)
	at org.apache.spark.network.client.TransportClientFactory.<init>(TransportClientFactory.java:111)
	at org.apache.spark.network.TransportContext.createClientFactory(TransportContext.java:144)
	at org.apache.spark.rpc.netty.NettyRpcEnv.<init>(NettyRpcEnv.scala:77)
	at org.apache.spark.rpc.netty.NettyRpcEnvFactory.create(NettyRpcEnv.scala:492)
	at org.apache.spark.rpc.RpcEnv$.create(RpcEnv.scala:58)
	at org.apache.spark.SparkEnv$.create(SparkEnv.scala:271)
	at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:194)
	at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:279)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:464)

I thought this was a little strange, since I see other tests tagged to be ignored if the Spark version is < 3.0 or > 3.3, which tells me there is a way to run the integration tests against a Spark 3.3.x version. Any insight into why this is happening, or instructions on how to run an integration test at a specific Spark version other than updating the <spark.version> Maven property? I believe my test will work, but I'm blocked until I figure this error out.

  2. I am using this Snowflake plugin within Databricks. I noticed that if I build the Spline Spark agent bundle JAR from the latest develop branch, my compute clusters enter a crash loop if I try to do any Spark operations. I saw an error related to potentially conflicting Scala versions. When I build the JAR off release/2.0, I can run the Databricks notebooks just fine and the Spline Spark agent works as expected. Any ideas as to why this is happening?

I'm running the Databricks runtime associated with Scala 2.12 and Spark 3.3.x.

Any insight would be greatly appreciated! Thanks!

wajda (Contributor) commented on Jul 19, 2024:

1 ... Any insight into why this is happening, or instructions on how to run an integration test at a specific Spark version other than updating the <spark.version> Maven property?

To run integration tests at a specific Spark version, activate the corresponding spark-x.y Maven profile:

$ mvn ... -Pspark-3.3

2 ... I noticed that if I build the Spline Spark agent bundle JAR from the latest develop branch, my compute clusters enter a crash loop if I try to do any Spark operations. I saw an error related to potentially conflicting Scala versions. When I build the JAR off release/2.0, I can run the Databricks notebooks just fine and the Spline Spark agent works as expected. Any ideas as to why this is happening?

That's interesting, but it's difficult to tell without looking at detailed logs.
There have been quite a few changes since the 2.0.x version. The latest release is 2.1.0, which comes with Spark 3.4 support. Did you try that one?
The difference between release/2.1 and develop, on the other hand, is very minor. It's just this: a6f63b1

3 ... Failing auto builds

Did you check them?

The ones for Scala_2.12 fail because they can't connect to the Snowflake instance. Perhaps a host-naming or environment-specific issue, or something similar. Can you take a look at it, please?

The builds for Scala_2.11 fail because the given version of the Snowflake library doesn't exist for Scala 2.11. You need to move the Snowflake Maven dependency under the spark-x.y profiles and specify the library version that corresponds to the given profile (see the sketch below).
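
A minimal sketch of what that could look like in the pom, assuming the spark-x.y profiles already exist; the exact connector versions are assumptions and must be checked against the Snowflake connector's release matrix:

<profile>
    <id>spark-3.3</id>
    <!-- ... existing profile configuration ... -->
    <dependencies>
        <dependency>
            <groupId>net.snowflake</groupId>
            <artifactId>spark-snowflake_${scala.binary.version}</artifactId>
            <version>2.16.0-spark_3.3</version>
        </dependency>
    </dependencies>
</profile>
<profile>
    <id>spark-3.4</id>
    <!-- ... existing profile configuration ... -->
    <dependencies>
        <dependency>
            <groupId>net.snowflake</groupId>
            <artifactId>spark-snowflake_${scala.binary.version}</artifactId>
            <version>2.16.0-spark_3.4</version>
        </dependency>
    </dependencies>
</profile>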

wajda force-pushed the feature/add-snowflake-plugin branch from 4495e93 to 03e5e6d on September 19, 2024 10:40