[SPARK-49700][CONNECT][SQL] Unified Scala Interface for Connect and Classic

### What changes were proposed in this pull request?
This PR makes the shared SQL (JVM) interface the primary API for Scala/JVM-based DataFrame programming.

The implementations are moved to the `classic` and `connect` subpackages.
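
A minimal sketch (illustrative only, not code from this diff) of what the unification means for user code: applications compile against the shared interface, and the backing implementation becomes a launch-time detail. The selection mechanism described in the comment is an assumption.

```scala
import org.apache.spark.sql.SparkSession

object UnifiedInterfaceExample {
  def main(args: Array[String]): Unit = {
    // The builder returns the shared interface type; whether it is backed by
    // org.apache.spark.sql.classic or org.apache.spark.sql.connect depends on
    // how the application is launched (e.g. with a remote Connect address).
    val spark: SparkSession = SparkSession.builder().getOrCreate()

    // DataFrame code is written once against the shared interface.
    spark.range(10).filter("id % 2 = 0").show()

    spark.stop()
  }
}
```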

The Connect client had to be moved to the `sql/connect/common` module because serialization requires the captured client classes to be on the classpath when they are deserialized on the server.
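
To make that classpath constraint concrete, here is a hedged sketch (not from this PR) of the kind of code that motivates the move: the UDF closure below is serialized on the client and deserialized on the server, so any client classes it captures must be resolvable there.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

def closureExample(spark: SparkSession): Unit = {
  // The lambda is serialized, shipped to the server, and deserialized there;
  // classes captured by the closure must therefore also exist server-side.
  val plusOne = udf((i: Long) => i + 1)
  spark.range(5).select(plusOne(col("id"))).show()
}
```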

### Why are the changes needed?
This is the final step in creating a unified Scala interface for both Classic and Connect.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#48818 from hvanhovell/SPARK-49700.

Authored-by: Herman van Hovell <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
hvanhovell committed Jan 24, 2025
1 parent a03c4cb commit 5db31ae
Showing 399 changed files with 1,851 additions and 1,730 deletions.
5 changes: 5 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -5277,6 +5277,11 @@
           "Resilient Distributed Datasets (RDDs)."
         ]
       },
+      "REGISTER_UDAF" : {
+        "message" : [
+          "Registering User Defined Aggregate Functions (UDAFs)."
+        ]
+      },
       "SESSION_BASE_RELATION_TO_DATAFRAME" : {
         "message" : [
           "Invoking SparkSession 'baseRelationToDataFrame'. This is server side developer API"
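
For context: the neighboring entries suggest this new condition belongs to the unsupported-Connect-feature family. A hedged sketch of code that would presumably trigger it against a Connect-backed session (the exact error class raised is an assumption, as is the session wiring):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction

// Registering a legacy UserDefinedAggregateFunction is a Classic-only
// capability, so a Connect-backed session is expected to reject it with the
// new REGISTER_UDAF condition (assumption based on the entry added above).
def registerLegacyUdaf(spark: SparkSession, udaf: UserDefinedAggregateFunction): Unit = {
  spark.udf.register("legacy_udaf", udaf)
}
```
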
@@ -31,8 +31,8 @@ import ammonite.util.Util.newLine
 
 import org.apache.spark.SparkBuildInfo.spark_version
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.SparkSession.withLocalConnectServer
+import org.apache.spark.sql.connect.SparkSession
+import org.apache.spark.sql.connect.SparkSession.withLocalConnectServer
 import org.apache.spark.sql.connect.client.{SparkConnectClient, SparkConnectClientParser}
 
 /**

This file was deleted.

@@ -28,7 +28,7 @@
 import static org.apache.spark.sql.functions.*;
 import static org.apache.spark.sql.RowFactory.create;
 import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.sql.test.SparkConnectServerUtils;
+import org.apache.spark.sql.connect.test.SparkConnectServerUtils;
 import org.apache.spark.sql.types.StructType;
 
 /**
@@ -18,8 +18,8 @@
 package org.apache.spark.sql
 
 import org.apache.spark.SparkRuntimeException
+import org.apache.spark.sql.connect.test.{QueryTest, RemoteSparkSession}
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.test.{QueryTest, RemoteSparkSession}
 
 class DataFrameSubquerySuite extends QueryTest with RemoteSparkSession {
   import testImplicits._
@@ -17,8 +17,8 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.connect.test.{QueryTest, RemoteSparkSession}
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.test.{QueryTest, RemoteSparkSession}
 
 class DataFrameTableValuedFunctionsSuite extends QueryTest with RemoteSparkSession {
   import testImplicits._
@@ -38,12 +38,13 @@ import org.apache.spark.sql.avro.{functions => avroFn}
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
 import org.apache.spark.sql.catalyst.util.CollationFactory
+import org.apache.spark.sql.connect.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.connect.ConnectConversions._
 import org.apache.spark.sql.connect.client.SparkConnectClient
+import org.apache.spark.sql.connect.test.{ConnectFunSuite, IntegrationTestUtils}
 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.protobuf.{functions => pbFn}
-import org.apache.spark.sql.test.{ConnectFunSuite, IntegrationTestUtils}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
 import org.apache.spark.util.SparkFileUtils
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.sql.test.{QueryTest, RemoteSparkSession}
+import org.apache.spark.sql.connect.test.{QueryTest, RemoteSparkSession}
 import org.apache.spark.unsafe.types.VariantVal
 
 class SQLExpressionsSuite extends QueryTest with RemoteSparkSession {
@@ -25,7 +25,7 @@ import scala.util.Properties
 import org.apache.commons.io.output.ByteArrayOutputStream
 import org.scalatest.BeforeAndAfterEach
 
-import org.apache.spark.sql.test.{ConnectFunSuite, IntegrationTestUtils, RemoteSparkSession}
+import org.apache.spark.sql.connect.test.{ConnectFunSuite, IntegrationTestUtils, RemoteSparkSession}
 import org.apache.spark.tags.AmmoniteTest
 import org.apache.spark.util.IvyTestUtils
 import org.apache.spark.util.MavenUtils.MavenCoordinate
@@ -15,14 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql
+package org.apache.spark.sql.connect
 
 import java.io.{File, FilenameFilter}
 
 import org.apache.commons.io.FileUtils
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.test.{ConnectFunSuite, RemoteSparkSession, SQLHelper}
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.connect.test.{ConnectFunSuite, RemoteSparkSession, SQLHelper}
 import org.apache.spark.sql.types.{DoubleType, LongType, StructType}
 import org.apache.spark.storage.StorageLevel
 
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql
+package org.apache.spark.sql.connect
 
 import java.io.{ByteArrayOutputStream, PrintStream}
 
@@ -26,7 +26,7 @@ import org.scalatest.exceptions.TestFailedDueToTimeoutException
 
 import org.apache.spark.SparkException
 import org.apache.spark.connect.proto
-import org.apache.spark.sql.test.{ConnectFunSuite, RemoteSparkSession, SQLHelper}
+import org.apache.spark.sql.connect.test.{ConnectFunSuite, RemoteSparkSession, SQLHelper}
 import org.apache.spark.storage.StorageLevel
 
 class CheckpointSuite extends ConnectFunSuite with RemoteSparkSession with SQLHelper {
@@ -15,14 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql
+package org.apache.spark.sql.connect
 
 import java.util.Random
 
 import org.scalatest.matchers.must.Matchers._
 
 import org.apache.spark.SparkIllegalArgumentException
-import org.apache.spark.sql.test.{ConnectFunSuite, RemoteSparkSession}
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.connect.test.{ConnectFunSuite, RemoteSparkSession}
 
 class ClientDataFrameStatSuite extends ConnectFunSuite with RemoteSparkSession {
   private def toLetter(i: Int): String = (i + 97).toChar.toString
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql
+package org.apache.spark.sql.connect
 
 import java.util.Properties
 import java.util.concurrent.TimeUnit
@@ -25,9 +25,10 @@ import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.connect.proto
+import org.apache.spark.sql.Column
 import org.apache.spark.sql.connect.client.{DummySparkConnectService, SparkConnectClient}
+import org.apache.spark.sql.connect.test.ConnectFunSuite
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.test.ConnectFunSuite
 import org.apache.spark.util.SparkSerDeUtils
 
 // Add sample tests.
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql
+package org.apache.spark.sql.connect
 
 import java.io.{ByteArrayOutputStream, PrintStream}
 import java.nio.file.Files
@@ -34,15 +34,17 @@ import org.scalatest.PrivateMethodTester
 import org.apache.spark.{SparkArithmeticException, SparkException, SparkUpgradeException}
 import org.apache.spark.SparkBuildInfo.{spark_version => SPARK_VERSION}
 import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.sql.{functions, AnalysisException, Observation, Row, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, TableAlreadyExistsException, TempTableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.connect.ConnectConversions._
 import org.apache.spark.sql.connect.client.{RetryPolicy, SparkConnectClient, SparkResult}
+import org.apache.spark.sql.connect.test.{ConnectFunSuite, IntegrationTestUtils, RemoteSparkSession, SQLHelper}
+import org.apache.spark.sql.connect.test.SparkConnectServerUtils.port
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SqlApiConf
-import org.apache.spark.sql.test.{ConnectFunSuite, IntegrationTestUtils, RemoteSparkSession, SQLHelper}
-import org.apache.spark.sql.test.SparkConnectServerUtils.port
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SparkThreadUtils
 
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql.internal
+package org.apache.spark.sql.connect
 
 import org.apache.spark.SparkException
 import org.apache.spark.connect.proto
@@ -23,9 +23,10 @@ import org.apache.spark.sql.{Column, Encoder}
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveIntEncoder, PrimitiveLongEncoder}
 import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, Origin}
 import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, ProtoDataTypes}
+import org.apache.spark.sql.connect.test.ConnectFunSuite
 import org.apache.spark.sql.expressions.{Aggregator, SparkUserDefinedFunction, UserDefinedAggregator}
-import org.apache.spark.sql.test.ConnectFunSuite
-import org.apache.spark.sql.types.{BinaryType, DataType, DoubleType, LongType, MetadataBuilder, ShortType, StringType, StructType}
+import org.apache.spark.sql.internal._
+import org.apache.spark.sql.types._
 
 /**
  * Test suite for [[ColumnNode]] to [[proto.Expression]] conversions.
@@ -471,8 +472,8 @@ class ColumnNodeToProtoConverterSuite extends ConnectFunSuite {
   }
 }
 
-private[internal] case class Nope(override val origin: Origin = CurrentOrigin.get)
+private[connect] case class Nope(override val origin: Origin = CurrentOrigin.get)
     extends ColumnNode {
   override def sql: String = "nope"
-  override private[internal] def children: Seq[ColumnNodeLike] = Seq.empty
+  override def children: Seq[ColumnNodeLike] = Seq.empty
 }