diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala index c06498684fa6f..fe9f6dc2b4a9a 100644 --- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala +++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala @@ -18,359 +18,108 @@ package org.apache.spark.sql.connect.client import java.time.DateTimeException -import scala.jdk.CollectionConverters._ +import scala.collection.JavaConverters._ import scala.reflect.ClassTag import com.google.rpc.ErrorInfo -import io.grpc.{ManagedChannel, StatusRuntimeException} +import io.grpc.StatusRuntimeException import io.grpc.protobuf.StatusProto -import org.json4s.{DefaultFormats, Formats} -import org.json4s.jackson.JsonMethods -import org.apache.spark.{QueryContext, QueryContextType, SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkDateTimeException, SparkException, SparkIllegalArgumentException, SparkNumberFormatException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException} -import org.apache.spark.connect.proto.{FetchErrorDetailsRequest, FetchErrorDetailsResponse, SparkConnectServiceGrpc, UserContext} -import org.apache.spark.internal.Logging +import org.apache.spark.{SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkDateTimeException, SparkException, SparkIllegalArgumentException, SparkNumberFormatException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException} import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException, TempTableAlreadyExistsException} import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.trees.Origin -import org.apache.spark.sql.streaming.StreamingQueryException -import org.apache.spark.util.ArrayImplicits._ +import org.apache.spark.util.JsonUtils -/** - * GrpcExceptionConverter handles the conversion of StatusRuntimeExceptions into Spark exceptions. - * It does so by utilizing the ErrorInfo defined in error_details.proto and making an additional - * FetchErrorDetails RPC call to retrieve the full error message and optionally the server-side - * stacktrace. - * - * If the FetchErrorDetails RPC call succeeds, the exceptions will be constructed based on the - * response. If the RPC call fails, the exception will be constructed based on the ErrorInfo. If - * the ErrorInfo is missing, the exception will be constructed based on the StatusRuntimeException - * itself. 
- */ -private[client] class GrpcExceptionConverter(channel: ManagedChannel) extends Logging { - import GrpcExceptionConverter._ - - val grpcStub = SparkConnectServiceGrpc.newBlockingStub(channel) - - def convert[T](sessionId: String, userContext: UserContext, clientType: String)(f: => T): T = { +private[client] object GrpcExceptionConverter extends JsonUtils { + def convert[T](f: => T): T = { try { f } catch { case e: StatusRuntimeException => - throw toThrowable(e, sessionId, userContext, clientType) + throw toThrowable(e) } } - def convertIterator[T]( - sessionId: String, - userContext: UserContext, - clientType: String, - iter: CloseableIterator[T]): CloseableIterator[T] = { + def convertIterator[T](iter: CloseableIterator[T]): CloseableIterator[T] = { new WrappedCloseableIterator[T] { override def innerIterator: Iterator[T] = iter override def hasNext: Boolean = { - convert(sessionId, userContext, clientType) { + convert { iter.hasNext } } override def next(): T = { - convert(sessionId, userContext, clientType) { + convert { iter.next() } } override def close(): Unit = { - convert(sessionId, userContext, clientType) { + convert { iter.close() } } } } - /** - * Fetches enriched errors with full exception message and optionally stacktrace by issuing an - * additional RPC call to fetch error details. The RPC call is best-effort at-most-once. - */ - private def fetchEnrichedError( - info: ErrorInfo, - sessionId: String, - userContext: UserContext, - clientType: String): Option[Throwable] = { - val errorId = info.getMetadataOrDefault("errorId", null) - if (errorId == null) { - logWarning("Unable to fetch enriched error since errorId is missing") - return None - } - - try { - val errorDetailsResponse = grpcStub.fetchErrorDetails( - FetchErrorDetailsRequest - .newBuilder() - .setSessionId(sessionId) - .setErrorId(errorId) - .setUserContext(userContext) - .setClientType(clientType) - .build()) - - if (!errorDetailsResponse.hasRootErrorIdx) { - logWarning("Unable to fetch enriched error since error is not found") - return None - } - - Some( - errorsToThrowable( - errorDetailsResponse.getRootErrorIdx, - errorDetailsResponse.getErrorsList.asScala.toSeq)) - } catch { - case e: StatusRuntimeException => - logWarning("Unable to fetch enriched error", e) - None - } - } - - private def toThrowable( - ex: StatusRuntimeException, - sessionId: String, - userContext: UserContext, - clientType: String): Throwable = { - val status = StatusProto.fromThrowable(ex) - - // Extract the ErrorInfo from the StatusProto, if present. - val errorInfoOpt = status.getDetailsList.asScala - .find(_.is(classOf[ErrorInfo])) - .map(_.unpack(classOf[ErrorInfo])) - - if (errorInfoOpt.isDefined) { - // If ErrorInfo is found, try to fetch enriched error details by an additional RPC. - val enrichedErrorOpt = - fetchEnrichedError(errorInfoOpt.get, sessionId, userContext, clientType) - if (enrichedErrorOpt.isDefined) { - return enrichedErrorOpt.get - } - - // If fetching enriched error details fails, convert ErrorInfo to a Throwable. - // Unlike enriched errors above, the message from status may be truncated, - // and no cause exceptions or server-side stack traces will be reconstructed. - return errorInfoToThrowable(errorInfoOpt.get, status.getMessage) - } - - // If no ErrorInfo is found, create a SparkException based on the StatusRuntimeException. 
- new SparkException(ex.toString, ex.getCause) - } -} - -private[client] object GrpcExceptionConverter { - - private[client] case class ErrorParams( - message: String, - cause: Option[Throwable], - // errorClass will only be set if the error is SparkThrowable. - errorClass: Option[String], - // messageParameters will only be set if the error is both enriched and SparkThrowable. - messageParameters: Map[String, String], - // queryContext will only be set if the error is both enriched and SparkThrowable. - queryContext: Array[QueryContext]) - private def errorConstructor[T <: Throwable: ClassTag]( - throwableCtr: ErrorParams => T): (String, ErrorParams => Throwable) = { + throwableCtr: (String, Option[Throwable]) => T) + : (String, (String, Option[Throwable]) => Throwable) = { val className = implicitly[reflect.ClassTag[T]].runtimeClass.getName (className, throwableCtr) } - private[client] val errorFactory = Map( - errorConstructor(params => - new StreamingQueryException( - params.message, - params.cause.orNull, - params.errorClass.orNull, - params.messageParameters)), - errorConstructor(params => - new ParseException( - None, - Origin(), - Origin(), - errorClass = params.errorClass.orNull, - messageParameters = params.messageParameters, - queryContext = params.queryContext)), - errorConstructor(params => - new AnalysisException( - params.message, - cause = params.cause, - errorClass = params.errorClass, - messageParameters = params.messageParameters, - context = params.queryContext)), - errorConstructor(params => - new NamespaceAlreadyExistsException(params.errorClass.orNull, params.messageParameters)), - errorConstructor(params => - new TableAlreadyExistsException( - params.errorClass.orNull, - params.messageParameters, - params.cause)), - errorConstructor(params => - new TempTableAlreadyExistsException( - params.errorClass.orNull, - params.messageParameters, - params.cause)), - errorConstructor(params => - new NoSuchDatabaseException( - params.errorClass.orNull, - params.messageParameters, - params.cause)), - errorConstructor(params => - new NoSuchTableException(params.errorClass.orNull, params.messageParameters, params.cause)), - errorConstructor[NumberFormatException](params => - new SparkNumberFormatException( - params.message, - params.errorClass, - params.messageParameters, - params.queryContext)), - errorConstructor[IllegalArgumentException](params => - new SparkIllegalArgumentException( - params.message, - params.cause, - params.errorClass, - params.messageParameters, - params.queryContext)), - errorConstructor[ArithmeticException](params => - new SparkArithmeticException( - params.message, - params.errorClass, - params.messageParameters, - params.queryContext)), - errorConstructor[UnsupportedOperationException](params => - new SparkUnsupportedOperationException( - params.message, - params.errorClass, - params.messageParameters)), - errorConstructor[ArrayIndexOutOfBoundsException](params => - new SparkArrayIndexOutOfBoundsException( - params.message, - params.errorClass, - params.messageParameters, - params.queryContext)), - errorConstructor[DateTimeException](params => - new SparkDateTimeException( - params.message, - params.errorClass, - params.messageParameters, - params.queryContext)), - errorConstructor(params => - new SparkRuntimeException( - params.message, - params.cause, - params.errorClass, - params.messageParameters, - params.queryContext)), - errorConstructor(params => - new SparkUpgradeException( - params.message, - params.cause, - params.errorClass, - 
params.messageParameters)), - errorConstructor(params => - new SparkException( - message = params.message, - cause = params.cause.orNull, - errorClass = params.errorClass, - messageParameters = params.messageParameters, - context = params.queryContext))) - - /** - * errorsToThrowable reconstructs the exception based on a list of protobuf messages - * FetchErrorDetailsResponse.Error with un-truncated error messages and server-side stacktrace - * (if set). - */ - private def errorsToThrowable( - errorIdx: Int, - errors: Seq[FetchErrorDetailsResponse.Error]): Throwable = { - - val error = errors(errorIdx) - val classHierarchy = error.getErrorTypeHierarchyList.asScala - - val constructor = - classHierarchy - .flatMap(errorFactory.get) - .headOption - .getOrElse((params: ErrorParams) => - errorFactory - .get(classOf[SparkException].getName) - .get(params.copy(message = s"${classHierarchy.head}: ${params.message}"))) - - val causeOpt = - if (error.hasCauseIdx) Some(errorsToThrowable(error.getCauseIdx, errors)) else None - - val errorClass = if (error.hasSparkThrowable && error.getSparkThrowable.hasErrorClass) { - Some(error.getSparkThrowable.getErrorClass) - } else None - - val messageParameters = if (error.hasSparkThrowable) { - error.getSparkThrowable.getMessageParametersMap.asScala.toMap - } else Map.empty[String, String] + private val errorFactory = Map( + errorConstructor((message, _) => new ParseException(None, message, Origin(), Origin())), + errorConstructor((message, cause) => new AnalysisException(message, cause = cause)), + errorConstructor((message, _) => new NamespaceAlreadyExistsException(message)), + errorConstructor((message, cause) => new TableAlreadyExistsException(message, cause)), + errorConstructor((message, cause) => new TempTableAlreadyExistsException(message, cause)), + errorConstructor((message, cause) => new NoSuchDatabaseException(message, cause)), + errorConstructor((message, cause) => new NoSuchTableException(message, cause)), + errorConstructor[NumberFormatException]((message, _) => + new SparkNumberFormatException(message)), + errorConstructor[IllegalArgumentException]((message, cause) => + new SparkIllegalArgumentException(message, cause)), + errorConstructor[ArithmeticException]((message, _) => new SparkArithmeticException(message)), + errorConstructor[UnsupportedOperationException]((message, _) => + new SparkUnsupportedOperationException(message)), + errorConstructor[ArrayIndexOutOfBoundsException]((message, _) => + new SparkArrayIndexOutOfBoundsException(message)), + errorConstructor[DateTimeException]((message, _) => new SparkDateTimeException(message)), + errorConstructor((message, cause) => new SparkRuntimeException(message, cause)), + errorConstructor((message, cause) => new SparkUpgradeException(message, cause))) + + private def errorInfoToThrowable(info: ErrorInfo, message: String): Option[Throwable] = { + val classes = + mapper.readValue(info.getMetadataOrDefault("classes", "[]"), classOf[Array[String]]) - val queryContext = error.getSparkThrowable.getQueryContextsList.asScala.map { queryCtx => - new QueryContext { - override def contextType(): QueryContextType = queryCtx.getContextType match { - case FetchErrorDetailsResponse.QueryContext.ContextType.DATAFRAME => - QueryContextType.DataFrame - case _ => QueryContextType.SQL - } - override def objectType(): String = queryCtx.getObjectType - override def objectName(): String = queryCtx.getObjectName - override def startIndex(): Int = queryCtx.getStartIndex - override def stopIndex(): Int = 
queryCtx.getStopIndex - override def fragment(): String = queryCtx.getFragment - override def callSite(): String = queryCtx.getCallSite - override def summary(): String = queryCtx.getSummary + classes + .find(errorFactory.contains) + .map { cls => + val constructor = errorFactory.get(cls).get + constructor(message, None) } - }.toArray - - val exception = constructor( - ErrorParams( - message = error.getMessage, - cause = causeOpt, - errorClass = errorClass, - messageParameters = messageParameters, - queryContext = queryContext)) + } - if (!error.getStackTraceList.isEmpty) { - exception.setStackTrace(error.getStackTraceList.asScala.toArray.map { stackTraceElement => - new StackTraceElement( - stackTraceElement.getDeclaringClass, - stackTraceElement.getMethodName, - if (stackTraceElement.hasFileName) stackTraceElement.getFileName else null, - stackTraceElement.getLineNumber) - }) - } + private def toThrowable(ex: StatusRuntimeException): Throwable = { + val status = StatusProto.fromThrowable(ex) - exception - } + val fallbackEx = new SparkException(ex.toString, ex.getCause) - /** - * errorInfoToThrowable reconstructs the exception based on the error classes hierarchy and the - * truncated error message. - */ - private def errorInfoToThrowable(info: ErrorInfo, message: String): Throwable = { - implicit val formats: Formats = DefaultFormats - val classes = - JsonMethods.parse(info.getMetadataOrDefault("classes", "[]")).extract[Array[String]] - val errorClass = info.getMetadataOrDefault("errorClass", null) - val builder = FetchErrorDetailsResponse.Error - .newBuilder() - .setMessage(message) - .addAllErrorTypeHierarchy(classes.toImmutableArraySeq.asJava) + val errorInfoOpt = status.getDetailsList.asScala + .find(_.is(classOf[ErrorInfo])) - if (errorClass != null) { - builder.setSparkThrowable( - FetchErrorDetailsResponse.SparkThrowable - .newBuilder() - .setErrorClass(errorClass) - .build()) + if (errorInfoOpt.isEmpty) { + return fallbackEx } - errorsToThrowable(0, Seq(builder.build())) + errorInfoToThrowable(errorInfoOpt.get.unpack(classOf[ErrorInfo]), status.getMessage) + .getOrElse(fallbackEx) } } diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala index 7f3eb0370a078..6dd5af2389a81 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala @@ -21,14 +21,14 @@ import scala.collection.mutable.HashMap import scala.util.control.NonFatal import org.apache.kafka.common.TopicPartition -import org.json4s.{Formats, NoTypeHints} +import org.json4s.NoTypeHints import org.json4s.jackson.Serialization /** * Utilities for converting Kafka related objects to and from json. 
*/ private object JsonUtils { - private implicit val formats: Formats = Serialization.formats(NoTypeHints) + private implicit val formats = Serialization.formats(NoTypeHints) /** * Read TopicPartitions from json string @@ -96,8 +96,10 @@ private object JsonUtils { */ def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = { val result = new HashMap[String, HashMap[Int, Long]]() - implicit val order: Ordering[TopicPartition] = (x: TopicPartition, y: TopicPartition) => { - Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition)) + implicit val order = new Ordering[TopicPartition] { + override def compare(x: TopicPartition, y: TopicPartition): Int = { + Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition)) + } } val partitions = partitionOffsets.keySet.toSeq.sorted // sort for more determinism partitions.foreach { tp => diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala index d6a50ff84f562..7209e2c373ab1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala +++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala @@ -30,7 +30,6 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ import scala.sys.process._ -import org.json4s.Formats import org.json4s.jackson.JsonMethods import org.apache.spark.{SparkConf, SparkContext} @@ -341,7 +340,7 @@ private object FaultToleranceTest extends App with Logging { private class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile: File) extends Logging { - implicit val formats: Formats = org.json4s.DefaultFormats + implicit val formats = org.json4s.DefaultFormats var state: RecoveryState.Value = _ var liveWorkerIPs: List[String] = _ var numLiveApps = 0 @@ -384,7 +383,7 @@ private class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile private class TestWorkerInfo(val ip: String, val dockerId: DockerId, val logFile: File) extends Logging { - implicit val formats: Formats = org.json4s.DefaultFormats + implicit val formats = org.json4s.DefaultFormats logDebug("Created worker: " + this) diff --git a/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala b/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala index 2e4e07b36cb64..641c5416cbb33 100644 --- a/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala @@ -23,7 +23,7 @@ import java.nio.file.Files import scala.collection.mutable import scala.util.control.NonFatal -import org.json4s.{DefaultFormats, Extraction, Formats} +import org.json4s.{DefaultFormats, Extraction} import org.json4s.jackson.JsonMethods.{compact, render} import org.apache.spark.SparkException @@ -114,7 +114,7 @@ private[spark] object StandaloneResourceUtils extends Logging { private def writeResourceAllocationJson[T]( allocations: Seq[T], jsonFile: File): Unit = { - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats val allocationJson = Extraction.decompose(allocations) Files.write(jsonFile.toPath, compact(render(allocationJson)).getBytes()) } diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index bd3927385564f..537522326fc78 100644 --- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -27,6 +27,7 @@ import scala.util.{Failure, Success} import scala.util.control.NonFatal import io.netty.util.internal.PlatformDependent +import org.json4s.DefaultFormats import org.apache.spark._ import org.apache.spark.TaskState.TaskState @@ -59,6 +60,8 @@ private[spark] class CoarseGrainedExecutorBackend( import CoarseGrainedExecutorBackend._ + private implicit val formats = DefaultFormats + private[spark] val stopping = new AtomicBoolean(false) var executor: Executor = null @volatile var driver: Option[RpcEndpointRef] = None diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceInformation.scala b/core/src/main/scala/org/apache/spark/resource/ResourceInformation.scala index 603a89968b26a..7f7bb36512d14 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceInformation.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceInformation.scala @@ -19,7 +19,7 @@ package org.apache.spark.resource import scala.util.control.NonFatal -import org.json4s.{DefaultFormats, Extraction, Formats, JValue} +import org.json4s.{DefaultFormats, Extraction, JValue} import org.json4s.jackson.JsonMethods._ import org.apache.spark.SparkException @@ -69,7 +69,7 @@ private[spark] object ResourceInformation { * Parses a JSON string into a [[ResourceInformation]] instance. */ def parseJson(json: String): ResourceInformation = { - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats try { parse(json).extract[ResourceInformationJson].toResourceInformation } catch { @@ -80,7 +80,7 @@ private[spark] object ResourceInformation { } def parseJson(json: JValue): ResourceInformation = { - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats try { json.extract[ResourceInformationJson].toResourceInformation } catch { diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala index 095b015a28632..d19f413598b58 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala @@ -22,7 +22,7 @@ import java.util.Optional import scala.util.control.NonFatal -import org.json4s.{DefaultFormats, Formats} +import org.json4s.DefaultFormats import org.json4s.jackson.JsonMethods._ import org.apache.spark.{SparkConf, SparkException} @@ -252,7 +252,7 @@ private[spark] object ResourceUtils extends Logging { def parseAllocatedFromJsonFile(resourcesFile: String): Seq[ResourceAllocation] = { withResourcesJson[ResourceAllocation](resourcesFile) { json => - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats parse(json).extract[Seq[ResourceAllocation]] } } diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala b/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala index 96dc5ac44b47a..d19744db089ba 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala @@ -31,7 +31,7 @@ private [spark] class JobDuration(val value: AtomicLong) extends Gauge[Long] { private[spark] class AppStatusSource extends Source { - override implicit val metricRegistry: MetricRegistry = new MetricRegistry() + override implicit val metricRegistry = new MetricRegistry() 
override val sourceName = "appStatus" diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index b4920c7cb841d..19de4544bea32 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -21,15 +21,15 @@ import java.io.IOException import java.util.{HashMap => JHashMap} import java.util.concurrent.TimeUnit +import scala.collection.JavaConverters._ import scala.collection.mutable -import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future, TimeoutException} -import scala.jdk.CollectionConverters._ +import scala.concurrent.{ExecutionContext, Future, TimeoutException} import scala.util.Random import scala.util.control.NonFatal import com.google.common.cache.CacheBuilder -import org.apache.spark.{MapOutputTrackerMaster, SparkConf, SparkContext, SparkEnv} +import org.apache.spark.{MapOutputTrackerMaster, SparkConf, SparkContext} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.{config, Logging} import org.apache.spark.internal.config.RDD_CACHE_VISIBILITY_TRACKING_ENABLED @@ -40,7 +40,6 @@ import org.apache.spark.scheduler.cluster.{CoarseGrainedClusterMessages, CoarseG import org.apache.spark.shuffle.ShuffleManager import org.apache.spark.storage.BlockManagerMessages._ import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils} -import org.apache.spark.util.ArrayImplicits._ /** * BlockManagerMasterEndpoint is an [[IsolatedThreadSafeRpcEndpoint]] on the master node to @@ -55,15 +54,10 @@ class BlockManagerMasterEndpoint( externalBlockStoreClient: Option[ExternalBlockStoreClient], blockManagerInfo: mutable.Map[BlockManagerId, BlockManagerInfo], mapOutputTracker: MapOutputTrackerMaster, - private val _shuffleManager: ShuffleManager, + shuffleManager: ShuffleManager, isDriver: Boolean) extends IsolatedThreadSafeRpcEndpoint with Logging { - // We initialize the ShuffleManager later in SparkContext and Executor, to allow - // user jars to define custom ShuffleManagers, as such `_shuffleManager` will be null here - // (except for tests) and we ask for the instance from the SparkEnv. - private lazy val shuffleManager = Option(_shuffleManager).getOrElse(SparkEnv.get.shuffleManager) - // Mapping from executor id to the block manager's local disk directories. 
private val executorIdToLocalDirs = CacheBuilder @@ -100,8 +94,7 @@ class BlockManagerMasterEndpoint( private val askThreadPool = ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool", 100) - private implicit val askExecutionContext: ExecutionContextExecutorService = - ExecutionContext.fromExecutorService(askThreadPool) + private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool) private val topologyMapper = { val topologyMapperClassName = conf.get( @@ -884,7 +877,7 @@ class BlockManagerMasterEndpoint( private def getLocationsMultipleBlockIds( blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = { - blockIds.map(blockId => getLocations(blockId)).toImmutableArraySeq + blockIds.map(blockId => getLocations(blockId)) } /** Get the list of the peers of the given block manager */ diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala index 5cc08714d41c1..476be80e67df3 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala @@ -17,7 +17,7 @@ package org.apache.spark.storage -import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future} +import scala.concurrent.{ExecutionContext, Future} import org.apache.spark.{MapOutputTracker, SparkEnv} import org.apache.spark.internal.Logging @@ -38,8 +38,7 @@ class BlockManagerStorageEndpoint( private val asyncThreadPool = ThreadUtils.newDaemonCachedThreadPool("block-manager-storage-async-thread-pool", 100) - private implicit val asyncExecutionContext: ExecutionContextExecutorService = - ExecutionContext.fromExecutorService(asyncThreadPool) + private implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(asyncThreadPool) // Operations that involve removing blocks may be slow and should be done asynchronously override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index 5addac7371925..2d3d6ec89ffbd 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -379,7 +379,7 @@ private[spark] object ThreadUtils { def parmap[I, O](in: Seq[I], prefix: String, maxThreads: Int)(f: I => O): Seq[O] = { val pool = newForkJoinPool(prefix, maxThreads) try { - implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(pool) + implicit val ec = ExecutionContext.fromExecutor(pool) val futures = in.map(x => Future(f(x))) val futureSeq = Future.sequence(futures) diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala index ed67906a4f268..5434e82c95b1b 100644 --- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala @@ -42,7 +42,7 @@ import org.apache.spark.storage._ abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[SortShuffleManager]) extends SparkFunSuite with BeforeAndAfter with LocalSparkContext { - implicit val defaultTimeout: PatienceConfiguration.Timeout = timeout(10.seconds) + implicit val defaultTimeout = timeout(10.seconds) val conf = new SparkConf() .setMaster("local[2]") 
.setAppName("ContextCleanerSuite") diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala index 75cebc90acba5..a07d4f76905a7 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala @@ -23,7 +23,7 @@ import java.util.function.Supplier import scala.concurrent.duration._ -import org.json4s.{DefaultFormats, Extraction, Formats} +import org.json4s.{DefaultFormats, Extraction} import org.mockito.{Mock, MockitoAnnotations} import org.mockito.Answers.RETURNS_SMART_NULLS import org.mockito.ArgumentMatchers.any @@ -60,7 +60,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter { } def conf(opts: (String, String)*): SparkConf = new SparkConf(loadDefaults = false).setAll(opts) - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats private var _worker: Worker = _ diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala index 57d391b0cf063..909d605442575 100644 --- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala @@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger import scala.collection.concurrent.TrieMap import scala.concurrent.duration._ -import org.json4s.{DefaultFormats, Extraction, Formats} +import org.json4s.{DefaultFormats, Extraction} import org.json4s.JsonAST.{JArray, JObject} import org.json4s.JsonDSL._ import org.mockito.ArgumentMatchers.any @@ -50,7 +50,7 @@ import org.apache.spark.util.{SerializableBuffer, ThreadUtils, Utils} class CoarseGrainedExecutorBackendSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar { - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats test("parsing no resources") { val conf = new SparkConf diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala index a6f1707a1aabf..642216a7a471d 100644 --- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.memory import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable -import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future} +import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration.Duration import org.mockito.ArgumentMatchers.{any, anyLong} @@ -148,7 +148,7 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite { // -- Tests of sharing of execution memory between tasks ---------------------------------------- // Prior to Spark 1.6, these tests were part of ShuffleMemoryManagerSuite. 
- implicit val ec: ExecutionContextExecutor = ExecutionContext.global + implicit val ec = ExecutionContext.global test("single task requesting on-heap execution memory") { val manager = createMemoryManager(1000L) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala index f7c7ca2bd9365..f133a38269d71 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.storage import java.util.Properties -import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future} +import scala.concurrent.{ExecutionContext, Future} import scala.language.implicitConversions import scala.reflect.ClassTag @@ -31,7 +31,7 @@ import org.apache.spark.util.ThreadUtils class BlockInfoManagerSuite extends SparkFunSuite { - private implicit val ec: ExecutionContextExecutor = ExecutionContext.global + private implicit val ec = ExecutionContext.global private var blockInfoManager: BlockInfoManager = _ override protected def beforeEach(): Unit = { diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index 32972e860275a..79496bba6674b 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -83,7 +83,7 @@ private[spark] class SparkUICssErrorHandler extends DefaultCssErrorHandler { class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers { implicit var webDriver: WebDriver = _ - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats override def beforeAll(): Unit = { diff --git a/core/src/test/scala/org/apache/spark/util/KeyLockSuite.scala b/core/src/test/scala/org/apache/spark/util/KeyLockSuite.scala index 6902493dc3c5d..6888e492a8d33 100644 --- a/core/src/test/scala/org/apache/spark/util/KeyLockSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/KeyLockSuite.scala @@ -22,14 +22,14 @@ import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.duration._ -import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} +import org.scalatest.concurrent.{ThreadSignaler, TimeLimits} import org.apache.spark.SparkFunSuite class KeyLockSuite extends SparkFunSuite with TimeLimits { // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x - private implicit val defaultSignaler: Signaler = ThreadSignaler + private implicit val defaultSignaler = ThreadSignaler private val foreverMs = 60 * 1000L diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala index 4957c3d3f6c1e..b17b86c08314b 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala @@ -17,7 +17,7 @@ package org.apache.spark.examples.sql // $example on:programmatic_schema$ -import org.apache.spark.sql.{Encoder, Row} +import org.apache.spark.sql.Row // $example off:programmatic_schema$ // $example on:init_session$ import org.apache.spark.sql.SparkSession @@ -220,8 +220,7 @@ object SparkSQLExample { // +------------+ // No pre-defined encoders for Dataset[Map[K,V]], define explicitly - implicit val mapEncoder: 
Encoder[Map[String, Any]] = - org.apache.spark.sql.Encoders.kryo[Map[String, Any]] + implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]] // Primitive types and case classes can be also defined as // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder() diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala index cbd6a3c52d536..c160dec13ff18 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala @@ -22,7 +22,7 @@ import java.util.Locale import breeze.linalg.normalize import breeze.numerics.exp import org.apache.hadoop.fs.Path -import org.json4s.{DefaultFormats, Formats} +import org.json4s.DefaultFormats import org.json4s.JsonAST.JObject import org.json4s.jackson.JsonMethods._ @@ -384,7 +384,7 @@ private object LDAParams { def getAndSetParams(model: LDAParams, metadata: Metadata): Unit = { VersionUtils.majorMinorVersion(metadata.sparkVersion) match { case (1, 6) => - implicit val format: Formats = DefaultFormats + implicit val format = DefaultFormats metadata.params match { case JObject(pairs) => pairs.foreach { case (paramName, jsonValue) => diff --git a/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonMatrixConverter.scala b/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonMatrixConverter.scala index a8844358ead2d..8f03a29eb991a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonMatrixConverter.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonMatrixConverter.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.ml.linalg -import org.json4s.{DefaultFormats, Formats} +import org.json4s.DefaultFormats import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods.{compact, parse => parseJson, render} @@ -29,7 +29,7 @@ private[ml] object JsonMatrixConverter { * Parses the JSON representation of a Matrix into a [[Matrix]]. */ def fromJson(json: String): Matrix = { - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats val jValue = parseJson(json) (jValue \ "type").extract[Int] match { case 0 => // sparse diff --git a/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonVectorConverter.scala b/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonVectorConverter.scala index 12387233879ad..1b949d75eeaa0 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonVectorConverter.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonVectorConverter.scala @@ -17,7 +17,7 @@ package org.apache.spark.ml.linalg -import org.json4s.{DefaultFormats, Formats} +import org.json4s.DefaultFormats import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods.{compact, parse => parseJson, render} @@ -27,7 +27,7 @@ private[ml] object JsonVectorConverter { * Parses the JSON representation of a vector into a [[Vector]]. 
*/ def fromJson(json: String): Vector = { - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats val jValue = parseJson(json) (jValue \ "type").extract[Int] match { case 0 => // sparse diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala index 1b5845f14f1b0..b818be30583c0 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala @@ -129,7 +129,7 @@ private[ml] object Param { case JObject(v) => val keys = v.map(_._1) if (keys.contains("class")) { - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats val className = (jValue \ "class").extract[String] className match { case JsonMatrixConverter.className => @@ -398,7 +398,7 @@ class IntParam(parent: String, name: String, doc: String, isValid: Int => Boolea } override def jsonDecode(json: String): Int = { - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats parse(json).extract[Int] } } @@ -484,7 +484,7 @@ class LongParam(parent: String, name: String, doc: String, isValid: Long => Bool } override def jsonDecode(json: String): Long = { - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats parse(json).extract[Long] } } @@ -505,7 +505,7 @@ class BooleanParam(parent: String, name: String, doc: String) // No need for isV } override def jsonDecode(json: String): Boolean = { - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats parse(json).extract[Boolean] } } @@ -528,7 +528,7 @@ class StringArrayParam(parent: Params, name: String, doc: String, isValid: Array } override def jsonDecode(json: String): Array[String] = { - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats parse(json).extract[Seq[String]].toArray } } @@ -617,7 +617,7 @@ class IntArrayParam(parent: Params, name: String, doc: String, isValid: Array[In } override def jsonDecode(json: String): Array[Int] = { - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats parse(json).extract[Seq[Int]].toArray } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala index 66a96b0192297..594d9f315f508 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala @@ -137,7 +137,7 @@ private[r] object AFTSurvivalRegressionWrapper extends MLReadable[AFTSurvivalReg class AFTSurvivalRegressionWrapperReader extends MLReader[AFTSurvivalRegressionWrapper] { override def load(path: String): AFTSurvivalRegressionWrapper = { - implicit val format: Formats = DefaultFormats + implicit val format = DefaultFormats val rMetadataPath = new Path(path, "rMetadata").toString val pipelinePath = new Path(path, "pipeline").toString diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala index 125cdf7259fef..ad13cced4667b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala @@ -103,7 +103,7 @@ private[r] object ALSWrapper extends MLReadable[ALSWrapper] { class ALSWrapperReader extends MLReader[ALSWrapper] { override def load(path: String): 
ALSWrapper = { - implicit val format: Formats = DefaultFormats + implicit val format = DefaultFormats val rMetadataPath = new Path(path, "rMetadata").toString val modelPath = new Path(path, "model").toString diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala index ad7435ce5be76..5161bc72659c6 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala @@ -17,7 +17,7 @@ package org.apache.spark.mllib.classification -import org.json4s.{DefaultFormats, Formats, JValue} +import org.json4s.{DefaultFormats, JValue} import org.apache.spark.annotation.Since import org.apache.spark.api.java.JavaRDD @@ -65,7 +65,7 @@ private[mllib] object ClassificationModel { * @return (numFeatures, numClasses) */ def getNumFeaturesClasses(metadata: JValue): (Int, Int) = { - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats ((metadata \ "numFeatures").extract[Int], (metadata \ "numClasses").extract[Int]) } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala index 136e557e84782..c3979118de403 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala @@ -223,7 +223,7 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] { } def load(sc: SparkContext, path: String): BisectingKMeansModel = { - implicit val formats: Formats = DefaultFormats + implicit val formats: DefaultFormats = DefaultFormats val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path) assert(className == thisClassName) assert(formatVersion == thisFormatVersion) @@ -261,7 +261,7 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] { } def load(sc: SparkContext, path: String): BisectingKMeansModel = { - implicit val formats: Formats = DefaultFormats + implicit val formats: DefaultFormats = DefaultFormats val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path) assert(className == thisClassName) assert(formatVersion == thisFormatVersion) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala index eb5e776799d04..0c9c6ab826e62 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala @@ -18,7 +18,7 @@ package org.apache.spark.mllib.clustering import breeze.linalg.{DenseVector => BreezeVector} -import org.json4s.{DefaultFormats, Formats} +import org.json4s.DefaultFormats import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ @@ -175,7 +175,7 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] { @Since("1.4.0") override def load(sc: SparkContext, path: String): GaussianMixtureModel = { val (loadedClassName, version, metadata) = Loader.loadMetadata(sc, path) - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats val k = (metadata \ "k").extract[Int] val classNameV1_0 = SaveLoadV1_0.classNameV1_0 (loadedClassName, version) match { diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala index 5eafdc9add58d..64b352157caf7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala @@ -179,7 +179,7 @@ object KMeansModel extends Loader[KMeansModel] { } def load(sc: SparkContext, path: String): KMeansModel = { - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats val spark = SparkSession.builder().sparkContext(sc).getOrCreate() val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path) assert(className == thisClassName) @@ -213,7 +213,7 @@ object KMeansModel extends Loader[KMeansModel] { } def load(sc: SparkContext, path: String): KMeansModel = { - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats val spark = SparkSession.builder().sparkContext(sc).getOrCreate() val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path) assert(className == thisClassName) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala index e318f06900950..aa8b6a00a427f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala @@ -20,7 +20,7 @@ package org.apache.spark.mllib.clustering import breeze.linalg.{argmax, argtopk, normalize, sum, DenseMatrix => BDM, DenseVector => BDV} import breeze.numerics.{exp, lgamma} import org.apache.hadoop.fs.Path -import org.json4s.{DefaultFormats, Formats} +import org.json4s.DefaultFormats import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ @@ -496,7 +496,7 @@ object LocalLDAModel extends Loader[LocalLDAModel] { @Since("1.5.0") override def load(sc: SparkContext, path: String): LocalLDAModel = { val (loadedClassName, loadedVersion, metadata) = Loader.loadMetadata(sc, path) - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats val expectedK = (metadata \ "k").extract[Int] val expectedVocabSize = (metadata \ "vocabSize").extract[Int] val docConcentration = @@ -923,7 +923,7 @@ object DistributedLDAModel extends Loader[DistributedLDAModel] { @Since("1.5.0") override def load(sc: SparkContext, path: String): DistributedLDAModel = { val (loadedClassName, loadedVersion, metadata) = Loader.loadMetadata(sc, path) - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats val expectedK = (metadata \ "k").extract[Int] val vocabSize = (metadata \ "vocabSize").extract[Int] val docConcentration = diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala index 12c7ae5066c82..ba541bbcccd29 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala @@ -79,7 +79,7 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode @Since("1.4.0") def load(sc: SparkContext, path: String): PowerIterationClusteringModel = { - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats val spark = SparkSession.builder().sparkContext(sc).getOrCreate() val (className, 
formatVersion, metadata) = Loader.loadMetadata(sc, path) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala index 4aae9d8add43a..3202f08e220b0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala @@ -145,7 +145,7 @@ object ChiSqSelectorModel extends Loader[ChiSqSelectorModel] { } def load(sc: SparkContext, path: String): ChiSqSelectorModel = { - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats val spark = SparkSession.builder().sparkContext(sc).getOrCreate() val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path) assert(className == thisClassName) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala index f286b729c03d0..97f277d53ca9d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala @@ -23,7 +23,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import com.google.common.collect.{Ordering => GuavaOrdering} -import org.json4s.{DefaultFormats, Formats} +import org.json4s.DefaultFormats import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ @@ -704,7 +704,7 @@ object Word2VecModel extends Loader[Word2VecModel] { override def load(sc: SparkContext, path: String): Word2VecModel = { val (loadedClassName, loadedVersion, metadata) = Loader.loadMetadata(sc, path) - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats val expectedVectorSize = (metadata \ "vectorSize").extract[Int] val expectedNumWords = (metadata \ "numWords").extract[Int] val classNameV1_0 = SaveLoadV1_0.classNameV1_0 diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala index 0938b709226bd..ecdc28dea37fd 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala @@ -25,7 +25,7 @@ import scala.collection.mutable import scala.reflect.ClassTag import scala.reflect.runtime.universe._ -import org.json4s.{DefaultFormats, Formats} +import org.json4s.DefaultFormats import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods.{compact, render} @@ -126,7 +126,7 @@ object FPGrowthModel extends Loader[FPGrowthModel[_]] { } def load(sc: SparkContext, path: String): FPGrowthModel[_] = { - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats val spark = SparkSession.builder().sparkContext(sc).getOrCreate() val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala index 703dd65bfab78..7c023bcfa72a4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -25,7 +25,7 @@ import scala.collection.mutable import scala.reflect.ClassTag import scala.reflect.runtime.universe._ -import org.json4s.{DefaultFormats, Formats} +import org.json4s.DefaultFormats import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods.{compact, render} @@ -670,7 +670,7 @@ 
object PrefixSpanModel extends Loader[PrefixSpanModel[_]] { } def load(sc: SparkContext, path: String): PrefixSpanModel[_] = { - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats val spark = SparkSession.builder().sparkContext(sc).getOrCreate() val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala index fa58443cca90b..a93f37799419e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala @@ -25,7 +25,7 @@ import scala.collection.JavaConverters._ import scala.language.implicitConversions import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV} -import org.json4s.{DefaultFormats, Formats} +import org.json4s.DefaultFormats import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods.{compact, parse => parseJson, render} @@ -430,7 +430,7 @@ object Vectors { */ @Since("1.6.0") def fromJson(json: String): Vector = { - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats val jValue = parseJson(json) (jValue \ "type").extract[Int] match { case 0 => // sparse diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index 581fe1f9eb647..3276513213f5d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -392,7 +392,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { } def load(sc: SparkContext, path: String): MatrixFactorizationModel = { - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats val spark = SparkSession.builder().sparkContext(sc).getOrCreate() val (className, formatVersion, metadata) = loadMetadata(sc, path) assert(className == thisClassName) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index 81d1b290404d5..12a78ef4ec140 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -209,7 +209,7 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] { @Since("1.4.0") override def load(sc: SparkContext, path: String): IsotonicRegressionModel = { - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats val (loadedClassName, version, metadata) = loadMetadata(sc, path) val isotonic = (metadata \ "isotonic").extract[Boolean] val classNameV1_0 = SaveLoadV1_0.thisClassName diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/RegressionModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/RegressionModel.scala index 0e2dbe43e45bb..a95a54225a085 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/RegressionModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/RegressionModel.scala @@ -17,7 +17,7 @@ package org.apache.spark.mllib.regression -import org.json4s.{DefaultFormats, Formats, JValue} +import org.json4s.{DefaultFormats, JValue} import 
org.apache.spark.annotation.Since import org.apache.spark.api.java.JavaRDD @@ -64,7 +64,7 @@ private[mllib] object RegressionModel { * @return numFeatures */ def getNumFeatures(metadata: JValue): Int = { - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats (metadata \ "numFeatures").extract[Int] } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala index 7a864b9d41efe..cdc998000c2fc 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala @@ -312,7 +312,7 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] with Logging { */ @Since("1.3.0") override def load(sc: SparkContext, path: String): DecisionTreeModel = { - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats val (loadedClassName, version, metadata) = Loader.loadMetadata(sc, path) val algo = (metadata \ "algo").extract[String] val numNodes = (metadata \ "numNodes").extract[Int] diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala index 03821dc417750..1f879a4d9dfbb 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala @@ -438,7 +438,7 @@ private[tree] object TreeEnsembleModel extends Logging { * Read metadata from the loaded JSON metadata. */ def readMetadata(metadata: JValue): Metadata = { - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats (metadata \ "metadata").extract[Metadata] } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/modelSaveLoad.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/modelSaveLoad.scala index 74e8ae75caf3e..c13bc4099ce70 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/modelSaveLoad.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/modelSaveLoad.scala @@ -117,7 +117,7 @@ private[mllib] object Loader { * @return (class name, version, metadata) */ def loadMetadata(sc: SparkContext, path: String): (String, String, JValue) = { - implicit val formats: Formats = DefaultFormats + implicit val formats = DefaultFormats val metadata = parse(sc.textFile(metadataPath(path)).first()) val clazz = (metadata \ "class").extract[String] val version = (metadata \ "version").extract[String] diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 993509ebaee8e..79b58deafde57 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -22,11 +22,11 @@ import java.util.Locale import scala.io.Source import scala.util.Properties +import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer -import scala.jdk.CollectionConverters._ import sbt._ -import sbt.Classpaths.publishOrSkip +import sbt.Classpaths.publishTask import sbt.Keys._ import sbt.librarymanagement.{ VersionNumber, SemanticSelector } import com.etsy.sbt.checkstyle.CheckstylePlugin.autoImport._ @@ -64,10 +64,10 @@ object BuildCommons { "tags", "sketch", "kvstore", "common-utils", "sql-api" ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects ++ Seq(connectCommon, connect, connectClient) - val optionallyEnabledProjects@Seq(kubernetes, yarn, + val 
optionallyEnabledProjects@Seq(kubernetes, mesos, yarn, sparkGangliaLgpl, streamingKinesisAsl, dockerIntegrationTests, hadoopCloud, kubernetesIntegrationTests) = - Seq("kubernetes", "yarn", + Seq("kubernetes", "mesos", "yarn", "ganglia-lgpl", "streaming-kinesis-asl", "docker-integration-tests", "hadoop-cloud", "kubernetes-integration-tests").map(ProjectRef(buildLocation, _)) @@ -89,9 +89,9 @@ object BuildCommons { // Google Protobuf version used for generating the protobuf. // SPARK-41247: needs to be consistent with `protobuf.version` in `pom.xml`. - val protoVersion = "3.25.1" + val protoVersion = "3.23.4" // GRPC version used for Spark Connect. - val grpcVersion = "1.59.0" + val grpcVersion = "1.56.0" } object SparkBuild extends PomBuild { @@ -223,37 +223,72 @@ object SparkBuild extends PomBuild { } ) + // Silencer: Scala compiler plugin for warning suppression + // Aim: enable fatal warnings, but suppress ones related to using of deprecated APIs + // depends on scala version: + // <2.13.2 - silencer 1.7.13 and compiler settings to enable fatal warnings + // 2.13.2+ - no silencer and configured warnings to achieve the same lazy val compilerWarningSettings: Seq[sbt.Def.Setting[_]] = Seq( + libraryDependencies ++= { + if (VersionNumber(scalaVersion.value).matchesSemVer(SemanticSelector("<2.13.2"))) { + val silencerVersion = "1.7.13" + Seq( + "org.scala-lang.modules" %% "scala-collection-compat" % "2.2.0", + compilerPlugin("com.github.ghik" % "silencer-plugin" % silencerVersion cross CrossVersion.full), + "com.github.ghik" % "silencer-lib" % silencerVersion % Provided cross CrossVersion.full + ) + } else { + Seq.empty + } + }, (Compile / scalacOptions) ++= { - Seq( - // replace -Xfatal-warnings with fine-grained configuration, since 2.13.2 - // verbose warning on deprecation, error on all others - // see `scalac -Wconf:help` for details - "-Wconf:cat=deprecation:wv,any:e", - // 2.13-specific warning hits to be muted (as narrowly as possible) and addressed separately - "-Wunused:imports", - // SPARK-33775 Suppress compilation warnings that contain the following contents. - // TODO(SPARK-33805): Undo the corresponding deprecated usage suppression rule after - // fixed. - "-Wconf:msg=^(?=.*?method|value|type|object|trait|inheritance)(?=.*?deprecated)(?=.*?since 2.13).+$:e", - "-Wconf:msg=^(?=.*?Widening conversion from)(?=.*?is deprecated because it loses precision).+$:e", - // SPARK-45610 Convert "Auto-application to `()` is deprecated" to compile error, as it will become a compile error in Scala 3. - "-Wconf:cat=deprecation&msg=Auto-application to \\`\\(\\)\\` is deprecated:e", - // TODO(SPARK-45615): The issue described by https://github.com/scalatest/scalatest/issues/2297 can cause false positives. - // So SPARK-45610 added the following 4 suppression rules, which can be removed after upgrading scalatest to 3.2.18. 
- "-Wconf:cat=deprecation&msg=Auto-application to \\`\\(\\)\\` is deprecated&site=org.apache.spark.rdd.RDDSuite:s", - "-Wconf:cat=deprecation&msg=Auto-application to \\`\\(\\)\\` is deprecated&site=org.apache.spark.scheduler.TaskSetManagerSuite:s", - "-Wconf:cat=deprecation&msg=Auto-application to \\`\\(\\)\\` is deprecated&site=org.apache.spark.streaming.ReceiverInputDStreamSuite:s", - "-Wconf:cat=deprecation&msg=Auto-application to \\`\\(\\)\\` is deprecated&site=org.apache.spark.streaming.kafka010.KafkaRDDSuite:s", - // SPARK-35574 Prevent the recurrence of compilation warnings related to `procedure syntax is deprecated` - "-Wconf:cat=deprecation&msg=procedure syntax is deprecated:e", - // SPARK-45627 Symbol literals are deprecated in Scala 2.13 and it's a compile error in Scala 3. - "-Wconf:cat=deprecation&msg=symbol literal is deprecated:e", - // SPARK-45627 `enum`, `export` and `given` will become keywords in Scala 3, - // so they are prohibited from being used as variable names in Scala 2.13 to - // reduce the cost of migration in subsequent versions. - "-Wconf:cat=deprecation&msg=it will become a keyword in Scala 3:e" - ) + if (VersionNumber(scalaVersion.value).matchesSemVer(SemanticSelector("<2.13.2"))) { + Seq( + "-Xfatal-warnings", + "-deprecation", + "-Ywarn-unused-import", + "-P:silencer:globalFilters=.*deprecated.*" //regex to catch deprecation warnings and suppress them + ) + } else { + Seq( + // replace -Xfatal-warnings with fine-grained configuration, since 2.13.2 + // verbose warning on deprecation, error on all others + // see `scalac -Wconf:help` for details + "-Wconf:cat=deprecation:wv,any:e", + // 2.13-specific warning hits to be muted (as narrowly as possible) and addressed separately + "-Wunused:imports", + "-Wconf:cat=lint-multiarg-infix:wv", + "-Wconf:cat=other-nullary-override:wv", + "-Wconf:cat=other-match-analysis&site=org.apache.spark.sql.catalyst.catalog.SessionCatalog.lookupFunction.catalogFunction:wv", + "-Wconf:cat=other-pure-statement&site=org.apache.spark.streaming.util.FileBasedWriteAheadLog.readAll.readFile:wv", + "-Wconf:cat=other-pure-statement&site=org.apache.spark.scheduler.OutputCommitCoordinatorSuite..futureAction:wv", + "-Wconf:cat=other-pure-statement&site=org.apache.spark.sql.streaming.sources.StreamingDataSourceV2Suite.testPositiveCase.\\$anonfun:wv", + // SPARK-33775 Suppress compilation warnings that contain the following contents. + // TODO(SPARK-33805): Undo the corresponding deprecated usage suppression rule after + // fixed. + "-Wconf:msg=^(?=.*?method|value|type|object|trait|inheritance)(?=.*?deprecated)(?=.*?since 2.13).+$:s", + "-Wconf:msg=^(?=.*?Widening conversion from)(?=.*?is deprecated because it loses precision).+$:s", + "-Wconf:msg=Auto-application to \\`\\(\\)\\` is deprecated:s", + "-Wconf:msg=method with a single empty parameter list overrides method without any parameter list:s", + "-Wconf:msg=method without a parameter list overrides a method with a single empty one:s", + // SPARK-35574 Prevent the recurrence of compilation warnings related to `procedure syntax is deprecated` + "-Wconf:cat=deprecation&msg=procedure syntax is deprecated:e", + // SPARK-35496 Upgrade Scala to 2.13.7 and suppress: + // 1. `The outer reference in this type test cannot be checked at run time` + // 2. `the type test for pattern TypeA cannot be checked at runtime because it + // has type parameters eliminated by erasure` + // 3. 
`abstract type TypeA in type pattern Seq[TypeA] (the underlying of + // Seq[TypeA]) is unchecked since it is eliminated by erasure` + // 4. `fruitless type test: a value of TypeA cannot also be a TypeB` + "-Wconf:cat=unchecked&msg=outer reference:s", + "-Wconf:cat=unchecked&msg=eliminated by erasure:s", + "-Wconf:msg=^(?=.*?a value of type)(?=.*?cannot also be).+$:s", + // TODO(SPARK-43850): Remove the following suppression rules and remove `import scala.language.higherKinds` + // from the corresponding files when Scala 2.12 is no longer supported. + "-Wconf:cat=unused-imports&src=org\\/apache\\/spark\\/graphx\\/impl\\/VertexPartitionBase.scala:s", + "-Wconf:cat=unused-imports&src=org\\/apache\\/spark\\/graphx\\/impl\\/VertexPartitionBaseOps.scala:s" + ) + } } ) @@ -289,10 +324,8 @@ object SparkBuild extends PomBuild { .withLogging(ivyLoggingLevel.value), (MavenCompile / publishMavenStyle) := true, (SbtCompile / publishMavenStyle) := false, - (MavenCompile / publishLocal) := publishOrSkip((MavenCompile / publishLocalConfiguration), - (publishLocal / skip)).value, - (SbtCompile / publishLocal) := publishOrSkip((SbtCompile / publishLocalConfiguration), - (publishLocal / skip)).value, + (MavenCompile / publishLocal) := publishTask((MavenCompile / publishLocalConfiguration)).value, + (SbtCompile / publishLocal) := publishTask((SbtCompile / publishLocalConfiguration)).value, publishLocal := Seq((MavenCompile / publishLocal), (SbtCompile / publishLocal)).dependOn.value, javaOptions ++= { @@ -329,7 +362,7 @@ object SparkBuild extends PomBuild { ), (Compile / scalacOptions) ++= Seq( - s"-target:${javaVersion.value}", + s"-target:jvm-${javaVersion.value}", "-sourcepath", (ThisBuild / baseDirectory).value.getAbsolutePath // Required for relative source links in scaladoc ), @@ -345,12 +378,19 @@ object SparkBuild extends PomBuild { "org.apache.spark.util.collection" ).mkString(":"), "-doc-title", "Spark " + version.value.replaceAll("-SNAPSHOT", "") + " ScalaDoc" - ), + ) ++ { + // Do not attempt to scaladoc javadoc comments under 2.12 since it can't handle inner classes + if (scalaBinaryVersion.value == "2.12") Seq("-no-java-comments") else Seq.empty + }, // disable Mima check for all modules, // to be enabled in specific ones that have previous artifacts MimaKeys.mimaFailOnNoPrevious := false, + // To prevent intermittent compilation failures, see also SPARK-33297 + // Apparently we can remove this when we use JDK 11. + Test / classLoaderLayeringStrategy := ClassLoaderLayeringStrategy.Flat, + // Setting version for the protobuf compiler. This has to be propagated to every sub-project // even if the project is not using it. 
PB.protocVersion := protoVersion, @@ -373,7 +413,8 @@ object SparkBuild extends PomBuild { val mimaProjects = allProjects.filterNot { x => Seq( spark, hive, hiveThriftServer, repl, networkCommon, networkShuffle, networkYarn, - unsafe, tags, tokenProviderKafka010, sqlKafka010, connectCommon, connect, connectClient + unsafe, tags, tokenProviderKafka010, sqlKafka010, connectCommon, connect, connectClient, + commonUtils, sqlApi ).contains(x) } @@ -542,7 +583,7 @@ object SparkParallelTestGrouping { "org.apache.spark.sql.streaming.RocksDBStateStoreStreamingAggregationSuite", "org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIOSuite", "org.apache.spark.sql.hive.HiveScalaReflectionSuite" - ) ++ sys.env.get("DEDICATED_JVM_SBT_TESTS").map(_.split(",")).getOrElse(Array.empty).toSet + ) private val DEFAULT_TEST_GROUP = "default_test_group" private val HIVE_EXECUTION_TEST_GROUP = "hive_execution_test_group" @@ -954,7 +995,8 @@ object Unsafe { object DockerIntegrationTests { // This serves to override the override specified in DependencyOverrides: lazy val settings = Seq( - dependencyOverrides += "com.google.guava" % "guava" % "18.0" + dependencyOverrides += "com.google.guava" % "guava" % "18.0", + resolvers += "DB2" at "https://app.camunda.com/nexus/content/repositories/public/" ) } @@ -1058,7 +1100,7 @@ object DependencyOverrides { dependencyOverrides += "com.google.guava" % "guava" % guavaVersion, dependencyOverrides += "xerces" % "xercesImpl" % "2.12.2", dependencyOverrides += "jline" % "jline" % "2.14.6", - dependencyOverrides += "org.apache.avro" % "avro" % "1.11.3") + dependencyOverrides += "org.apache.avro" % "avro" % "1.11.2") } /** @@ -1634,7 +1676,7 @@ object TestSettings { (Test / testOptions) += Tests.Argument(TestFrameworks.ScalaTest, "-W", "120", "300"), (Test / testOptions) += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"), // Enable Junit testing. - libraryDependencies += "net.aichler" % "jupiter-interface" % "0.11.1" % "test", + libraryDependencies += "com.github.sbt" % "junit-interface" % "0.13.3" % "test", // `parallelExecutionInTest` controls whether test suites belonging to the same SBT project // can run in parallel with one another. It does NOT control whether tests execute in parallel // within the same JVM (which is controlled by `testForkedParallel`) or whether test cases diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 3bf40d2e6be93..34848a7f3d853 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -64,14 +64,14 @@ private[spark] abstract class YarnSchedulerBackend( private val yarnSchedulerEndpointRef = rpcEnv.setupEndpoint( YarnSchedulerBackend.ENDPOINT_NAME, yarnSchedulerEndpoint) - private implicit val askTimeout: RpcTimeout = RpcUtils.askRpcTimeout(sc.conf) + private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf) /** * Declare implicit single thread execution context for futures doRequestTotalExecutors and * doKillExecutors below, avoiding using the global execution context that may cause conflict * with user code's execution of futures. 
*/ - private implicit val schedulerEndpointEC: ExecutionContext = ExecutionContext.fromExecutorService( + private implicit val schedulerEndpointEC = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonSingleThreadExecutor("yarn-scheduler-endpoint")) /** Application ID. */ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala index 3521004a00e91..3a662e68d58c2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala @@ -491,8 +491,7 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { } } - implicit private def mapIntStrEncoder: ExpressionEncoder[Map[Int, String]] = - ExpressionEncoder[Map[Int, String]]() + implicit private def mapIntStrEncoder = ExpressionEncoder[Map[Int, String]]() test("SPARK-23588 CatalystToExternalMap should support interpreted execution") { // To get a resolved `CatalystToExternalMap` expression, we build a deserializer plan diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala index 266c369894eca..f28df3839d0a8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala @@ -423,8 +423,7 @@ class ColumnPruningSuite extends PlanTest { comparePlans(optimized, expected) } - implicit private def productEncoder[T <: Product : TypeTag]: ExpressionEncoder[T] = - ExpressionEncoder[T]() + implicit private def productEncoder[T <: Product : TypeTag] = ExpressionEncoder[T]() private val func = identity[Iterator[OtherTuple]] _ test("Column pruning on MapPartitions") { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateMapObjectsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateMapObjectsSuite.scala index 74cd917955284..1c818eee1224d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateMapObjectsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateMapObjectsSuite.scala @@ -37,10 +37,8 @@ class EliminateMapObjectsSuite extends PlanTest { } } - implicit private def intArrayEncoder: ExpressionEncoder[Array[Int]] = - ExpressionEncoder[Array[Int]]() - implicit private def doubleArrayEncoder: ExpressionEncoder[Array[Double]] = - ExpressionEncoder[Array[Double]]() + implicit private def intArrayEncoder = ExpressionEncoder[Array[Int]]() + implicit private def doubleArrayEncoder = ExpressionEncoder[Array[Double]]() test("SPARK-20254: Remove unnecessary data conversion for primitive array") { val intObjType = ObjectType(classOf[Array[Int]]) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSerializationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSerializationSuite.scala index f1acac39a7672..0d654cc1ac935 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSerializationSuite.scala +++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSerializationSuite.scala @@ -35,9 +35,8 @@ class EliminateSerializationSuite extends PlanTest { EliminateSerialization) :: Nil } - implicit private def productEncoder[T <: Product : TypeTag]: ExpressionEncoder[T] = - ExpressionEncoder[T]() - implicit private def intEncoder: ExpressionEncoder[Int] = ExpressionEncoder[Int]() + implicit private def productEncoder[T <: Product : TypeTag] = ExpressionEncoder[T]() + implicit private def intEncoder = ExpressionEncoder[Int]() test("back to back serialization") { val input = LocalRelation($"obj".obj(classOf[(Int, Int)])) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ObjectSerializerPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ObjectSerializerPruningSuite.scala index 8c4daacd1b2f0..a1039b051ce45 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ObjectSerializerPruningSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ObjectSerializerPruningSuite.scala @@ -40,8 +40,7 @@ class ObjectSerializerPruningSuite extends PlanTest { RemoveNoopOperators) :: Nil } - implicit private def productEncoder[T <: Product : TypeTag]: ExpressionEncoder[T] = - ExpressionEncoder[T]() + implicit private def productEncoder[T <: Product : TypeTag] = ExpressionEncoder[T]() test("collect struct types") { val dataTypes = Seq( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala index fb29f6f17e756..4385777e79c09 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala @@ -37,8 +37,7 @@ class TypedFilterOptimizationSuite extends PlanTest { CombineTypedFilters) :: Nil } - implicit private def productEncoder[T <: Product : TypeTag]: ExpressionEncoder[T] = - ExpressionEncoder[T]() + implicit private def productEncoder[T <: Product : TypeTag] = ExpressionEncoder[T]() val testRelation = LocalRelation($"_1".int, $"_2".int) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitorSuite.scala index 2324c33806d48..acf62d07bc398 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitorSuite.scala @@ -47,8 +47,7 @@ class DistinctKeyVisitorSuite extends PlanTest { assert(plan.analyze.distinctKeys === distinctKeys) } - implicit private def productEncoder[T <: Product : TypeTag]: ExpressionEncoder[T] = - ExpressionEncoder[T]() + implicit private def productEncoder[T <: Product : TypeTag] = ExpressionEncoder[T]() test("Aggregate's distinct attributes") { checkDistinctAttributes(t1.groupBy($"a", $"b")($"a", $"b", 1), Set(ExpressionSet(Seq(a, b)))) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 293f20c453aee..c063af9381ff2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
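// For reference, a minimal sketch of the encoder-implicit pattern the test-suite hunks above
// toggle between. Illustrative only: `EncoderImplicitsSketch` is a made-up name, and the explicit
// result types are shown because inferred types on implicit definitions draw warnings on newer
// Scala 2.13 compilers.
import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

object EncoderImplicitsSketch {
  // An explicit result type lets implicit search succeed without type-checking the body first
  // and keeps the definition warning-free under stricter compiler settings.
  implicit def productEncoder[T <: Product : TypeTag]: ExpressionEncoder[T] =
    ExpressionEncoder[T]()

  implicit def intEncoder: ExpressionEncoder[Int] = ExpressionEncoder[Int]()
}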
@@ -20,9 +20,8 @@ package org.apache.spark.sql import java.io.{ByteArrayOutputStream, CharArrayWriter, DataOutputStream} import scala.annotation.varargs +import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, HashSet} -import scala.jdk.CollectionConverters._ -import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag import scala.util.control.NonFatal @@ -65,7 +64,6 @@ import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils import org.apache.spark.storage.StorageLevel import org.apache.spark.unsafe.array.ByteArrayMethods -import org.apache.spark.util.ArrayImplicits._ import org.apache.spark.util.Utils private[sql] object Dataset { @@ -203,7 +201,7 @@ class Dataset[T] private[sql]( } // A globally unique id of this Dataset. - private[sql] val id = Dataset.curId.getAndIncrement() + private val id = Dataset.curId.getAndIncrement() queryExecution.assertAnalyzed() @@ -242,7 +240,7 @@ class Dataset[T] private[sql]( exprEnc.resolveAndBind(logicalPlan.output, sparkSession.sessionState.analyzer) } - private implicit def classTag: ClassTag[T] = exprEnc.clsTag + private implicit def classTag = exprEnc.clsTag // sqlContext must be val because a stable identifier is expected when you import implicits @transient lazy val sqlContext: SQLContext = sparkSession.sqlContext @@ -250,13 +248,13 @@ class Dataset[T] private[sql]( private[sql] def resolve(colName: String): NamedExpression = { val resolver = sparkSession.sessionState.analyzer.resolver queryExecution.analyzed.resolveQuoted(colName, resolver) - .getOrElse(throw QueryCompilationErrors.unresolvedColumnError(colName, schema.fieldNames)) + .getOrElse(throw QueryCompilationErrors.resolveException(colName, schema.fieldNames)) } private[sql] def numericColumns: Seq[Expression] = { schema.fields.filter(_.dataType.isInstanceOf[NumericType]).map { n => queryExecution.analyzed.resolveQuoted(n.name, sparkSession.sessionState.analyzer.resolver).get - }.toImmutableArraySeq + } } /** @@ -284,8 +282,7 @@ class Dataset[T] private[sql]( // For array values, replace Seq and Array with square brackets // For cells that are beyond `truncate` characters, replace it with the // first `truncate-3` and "..." 
- (schema.fieldNames - .map(SchemaUtils.escapeMetaCharacters).toImmutableArraySeq +: data.map { row => + schema.fieldNames.map(SchemaUtils.escapeMetaCharacters).toSeq +: data.map { row => row.toSeq.map { cell => assert(cell != null, "ToPrettyString is not nullable and should not return null value") // Escapes meta-characters not to break the `showString` format @@ -298,7 +295,7 @@ class Dataset[T] private[sql]( str } }: Seq[String] - }).toImmutableArraySeq + } } /** @@ -511,11 +508,9 @@ class Dataset[T] private[sql]( * @group basic * @since 3.4.0 */ - def to(schema: StructType): DataFrame = withOrigin { - withPlan { - val replaced = CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType] - Project.matchSchema(logicalPlan, replaced, sparkSession.sessionState.conf) - } + def to(schema: StructType): DataFrame = withPlan { + val replaced = CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType] + Project.matchSchema(logicalPlan, replaced, sparkSession.sessionState.conf) } /** @@ -655,7 +650,7 @@ class Dataset[T] private[sql]( * @group basic * @since 2.4.0 */ - def isEmpty: Boolean = withAction("isEmpty", select().limit(1).queryExecution) { plan => + def isEmpty: Boolean = withAction("isEmpty", select().queryExecution) { plan => plan.executeTake(1).isEmpty } @@ -775,13 +770,11 @@ class Dataset[T] private[sql]( */ // We only accept an existing column name, not a derived column here as a watermark that is // defined on a derived column cannot referenced elsewhere in the plan. - def withWatermark(eventTime: String, delayThreshold: String): Dataset[T] = withOrigin { - withTypedPlan { - val parsedDelay = IntervalUtils.fromIntervalString(delayThreshold) - require(!IntervalUtils.isNegative(parsedDelay), - s"delay threshold ($delayThreshold) should not be negative.") - EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan) - } + def withWatermark(eventTime: String, delayThreshold: String): Dataset[T] = withTypedPlan { + val parsedDelay = IntervalUtils.fromIntervalString(delayThreshold) + require(!IntervalUtils.isNegative(parsedDelay), + s"delay threshold ($delayThreshold) should not be negative.") + EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan) } /** @@ -953,10 +946,8 @@ class Dataset[T] private[sql]( * @group untypedrel * @since 2.0.0 */ - def join(right: Dataset[_]): DataFrame = withOrigin { - withPlan { - Join(logicalPlan, right.logicalPlan, joinType = Inner, None, JoinHint.NONE) - } + def join(right: Dataset[_]): DataFrame = withPlan { + Join(logicalPlan, right.logicalPlan, joinType = Inner, None, JoinHint.NONE) } /** @@ -995,7 +986,7 @@ class Dataset[T] private[sql]( * @since 3.4.0 */ def join(right: Dataset[_], usingColumns: Array[String]): DataFrame = { - join(right, usingColumns.toImmutableArraySeq) + join(right, usingColumns.toSeq) } /** @@ -1064,7 +1055,7 @@ class Dataset[T] private[sql]( * @since 3.4.0 */ def join(right: Dataset[_], usingColumns: Array[String], joinType: String): DataFrame = { - join(right, usingColumns.toImmutableArraySeq, joinType) + join(right, usingColumns.toSeq, joinType) } /** @@ -1089,23 +1080,22 @@ class Dataset[T] private[sql]( * @group untypedrel * @since 2.0.0 */ - def join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame = - withOrigin { - // Analyze the self join. The assumption is that the analyzer will disambiguate left vs right - // by creating a new instance for one of the branch. 
- val joined = sparkSession.sessionState.executePlan( - Join(logicalPlan, right.logicalPlan, joinType = JoinType(joinType), None, JoinHint.NONE)) - .analyzed.asInstanceOf[Join] - - withPlan { - Join( - joined.left, - joined.right, - UsingJoin(JoinType(joinType), usingColumns.toIndexedSeq), - None, - JoinHint.NONE) - } + def join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame = { + // Analyze the self join. The assumption is that the analyzer will disambiguate left vs right + // by creating a new instance for one of the branch. + val joined = sparkSession.sessionState.executePlan( + Join(logicalPlan, right.logicalPlan, joinType = JoinType(joinType), None, JoinHint.NONE)) + .analyzed.asInstanceOf[Join] + + withPlan { + Join( + joined.left, + joined.right, + UsingJoin(JoinType(joinType), usingColumns.toIndexedSeq), + None, + JoinHint.NONE) } + } /** * Inner join with another `DataFrame`, using the given join expression. @@ -1186,7 +1176,7 @@ class Dataset[T] private[sql]( * @group untypedrel * @since 2.0.0 */ - def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame = withOrigin { + def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame = { withPlan { resolveSelfJoinCondition(right, Some(joinExprs), joinType) } @@ -1202,10 +1192,8 @@ class Dataset[T] private[sql]( * @group untypedrel * @since 2.1.0 */ - def crossJoin(right: Dataset[_]): DataFrame = withOrigin { - withPlan { - Join(logicalPlan, right.logicalPlan, joinType = Cross, None, JoinHint.NONE) - } + def crossJoin(right: Dataset[_]): DataFrame = withPlan { + Join(logicalPlan, right.logicalPlan, joinType = Cross, None, JoinHint.NONE) } /** @@ -1229,28 +1217,27 @@ class Dataset[T] private[sql]( * @group typedrel * @since 1.6.0 */ - def joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)] = - withOrigin { - // Creates a Join node and resolve it first, to get join condition resolved, self-join - // resolved, etc. - val joined = sparkSession.sessionState.executePlan( - Join( - this.logicalPlan, - other.logicalPlan, - JoinType(joinType), - Some(condition.expr), - JoinHint.NONE)).analyzed.asInstanceOf[Join] - - implicit val tuple2Encoder: Encoder[(T, U)] = - ExpressionEncoder.tuple(this.exprEnc, other.exprEnc) - - withTypedPlan(JoinWith.typedJoinWith( - joined, - sparkSession.sessionState.conf.dataFrameSelfJoinAutoResolveAmbiguity, - sparkSession.sessionState.analyzer.resolver, - this.exprEnc.isSerializedAsStructForTopLevel, - other.exprEnc.isSerializedAsStructForTopLevel)) - } + def joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)] = { + // Creates a Join node and resolve it first, to get join condition resolved, self-join resolved, + // etc. 
+ val joined = sparkSession.sessionState.executePlan( + Join( + this.logicalPlan, + other.logicalPlan, + JoinType(joinType), + Some(condition.expr), + JoinHint.NONE)).analyzed.asInstanceOf[Join] + + implicit val tuple2Encoder: Encoder[(T, U)] = + ExpressionEncoder.tuple(this.exprEnc, other.exprEnc) + + withTypedPlan(JoinWith.typedJoinWith( + joined, + sqlContext.conf.dataFrameSelfJoinAutoResolveAmbiguity, + sparkSession.sessionState.analyzer.resolver, + this.exprEnc.isSerializedAsStructForTopLevel, + other.exprEnc.isSerializedAsStructForTopLevel)) + } /** * Using inner equi-join to join this Dataset returning a `Tuple2` for each pair @@ -1413,31 +1400,12 @@ class Dataset[T] private[sql]( * df1.join(df2.hint("broadcast")) * }}} * - * the following code specifies that this dataset could be rebalanced with given number of - * partitions: - * - * {{{ - * df1.hint("rebalance", 10) - * }}} - * - * @param name the name of the hint - * @param parameters the parameters of the hint, all the parameters should be a `Column` or - * `Expression` or `Symbol` or could be converted into a `Literal` - * * @group basic * @since 2.2.0 */ @scala.annotation.varargs - def hint(name: String, parameters: Any*): Dataset[T] = withOrigin { - withTypedPlan { - val exprs = parameters.map { - case c: Column => c.expr - case s: Symbol => Column(s.name).expr - case e: Expression => e - case literal => Literal(literal) - }.toSeq - UnresolvedHint(name, exprs, logicalPlan) - } + def hint(name: String, parameters: Any*): Dataset[T] = withTypedPlan { + UnresolvedHint(name, parameters, logicalPlan) } /** @@ -1452,7 +1420,7 @@ class Dataset[T] private[sql]( case "*" => Column(ResolvedStar(queryExecution.analyzed.output)) case _ => - if (sparkSession.sessionState.conf.supportQuotedRegexColumnName) { + if (sqlContext.conf.supportQuotedRegexColumnName) { colRegex(colName) } else { Column(addDataFrameIdToCol(resolve(colName))) @@ -1513,10 +1481,8 @@ class Dataset[T] private[sql]( * @group typedrel * @since 1.6.0 */ - def as(alias: String): Dataset[T] = withOrigin { - withTypedPlan { - SubqueryAlias(alias, logicalPlan) - } + def as(alias: String): Dataset[T] = withTypedPlan { + SubqueryAlias(alias, logicalPlan) } /** @@ -1553,28 +1519,25 @@ class Dataset[T] private[sql]( * @since 2.0.0 */ @scala.annotation.varargs - def select(cols: Column*): DataFrame = withOrigin { - withPlan { - val untypedCols = cols.map { - case typedCol: TypedColumn[_, _] => - // Checks if a `TypedColumn` has been inserted with - // specific input type and schema by `withInputType`. - val needInputType = typedCol.expr.exists { - case ta: TypedAggregateExpression if ta.inputDeserializer.isEmpty => true - case _ => false - } + def select(cols: Column*): DataFrame = withPlan { + val untypedCols = cols.map { + case typedCol: TypedColumn[_, _] => + // Checks if a `TypedColumn` has been inserted with + // specific input type and schema by `withInputType`. 
+ val needInputType = typedCol.expr.exists { + case ta: TypedAggregateExpression if ta.inputDeserializer.isEmpty => true + case _ => false + } - if (!needInputType) { - typedCol - } else { - throw - QueryCompilationErrors.cannotPassTypedColumnInUntypedSelectError(typedCol.toString) - } + if (!needInputType) { + typedCol + } else { + throw QueryCompilationErrors.cannotPassTypedColumnInUntypedSelectError(typedCol.toString) + } - case other => other - } - Project(untypedCols.map(_.named), logicalPlan) + case other => other } + Project(untypedCols.map(_.named), logicalPlan) } /** @@ -1591,9 +1554,7 @@ class Dataset[T] private[sql]( * @since 2.0.0 */ @scala.annotation.varargs - def select(col: String, cols: String*): DataFrame = withOrigin { - select((col +: cols).map(Column(_)) : _*) - } + def select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) : _*) /** * Selects a set of SQL expressions. This is a variant of `select` that accepts @@ -1609,12 +1570,10 @@ class Dataset[T] private[sql]( * @since 2.0.0 */ @scala.annotation.varargs - def selectExpr(exprs: String*): DataFrame = withOrigin { - sparkSession.withActive { - select(exprs.map { expr => - Column(sparkSession.sessionState.sqlParser.parseExpression(expr)) - }: _*) - } + def selectExpr(exprs: String*): DataFrame = sparkSession.withActive { + select(exprs.map { expr => + Column(sparkSession.sessionState.sqlParser.parseExpression(expr)) + }: _*) } /** @@ -1628,8 +1587,8 @@ class Dataset[T] private[sql]( * @group typedrel * @since 1.6.0 */ - def select[U1](c1: TypedColumn[T, U1]): Dataset[U1] = withOrigin { - implicit val encoder: ExpressionEncoder[U1] = c1.encoder + def select[U1](c1: TypedColumn[T, U1]): Dataset[U1] = { + implicit val encoder = c1.encoder val project = Project(c1.withInputType(exprEnc, logicalPlan.output).named :: Nil, logicalPlan) if (!encoder.isSerializedAsStructForTopLevel) { @@ -1712,10 +1671,8 @@ class Dataset[T] private[sql]( * @group typedrel * @since 1.6.0 */ - def filter(condition: Column): Dataset[T] = withOrigin { - withTypedPlan { - Filter(condition.expr, logicalPlan) - } + def filter(condition: Column): Dataset[T] = withTypedPlan { + Filter(condition.expr, logicalPlan) } /** @@ -1826,33 +1783,6 @@ class Dataset[T] private[sql]( RelationalGroupedDataset(toDF(), cols.map(_.expr), RelationalGroupedDataset.CubeType) } - /** - * Create multi-dimensional aggregation for the current Dataset using the specified grouping sets, - * so we can run aggregation on them. - * See [[RelationalGroupedDataset]] for all the available aggregate functions. - * - * {{{ - * // Compute the average for all numeric columns group by specific grouping sets. - * ds.groupingSets(Seq(Seq($"department", $"group"), Seq()), $"department", $"group").avg() - * - * // Compute the max age and average salary, group by specific grouping sets. - * ds.groupingSets(Seq($"department", $"gender"), Seq()), $"department", $"group").agg(Map( - * "salary" -> "avg", - * "age" -> "max" - * )) - * }}} - * - * @group untypedrel - * @since 4.0.0 - */ - @scala.annotation.varargs - def groupingSets(groupingSets: Seq[Seq[Column]], cols: Column*): RelationalGroupedDataset = { - RelationalGroupedDataset( - toDF(), - cols.map(_.expr), - RelationalGroupedDataset.GroupingSetsType(groupingSets.map(_.map(_.expr)))) - } - /** * Groups the Dataset using the specified columns, so that we can run aggregation on them. * See [[RelationalGroupedDataset]] for all the available aggregate functions. 
@@ -2101,17 +2031,15 @@ class Dataset[T] private[sql]( ids: Array[Column], values: Array[Column], variableColumnName: String, - valueColumnName: String): DataFrame = withOrigin { - withPlan { - Unpivot( - Some(ids.map(_.named).toImmutableArraySeq), - Some(values.map(v => Seq(v.named)).toImmutableArraySeq), - None, - variableColumnName, - Seq(valueColumnName), - logicalPlan - ) - } + valueColumnName: String): DataFrame = withPlan { + Unpivot( + Some(ids.map(_.named)), + Some(values.map(v => Seq(v.named))), + None, + variableColumnName, + Seq(valueColumnName), + logicalPlan + ) } /** @@ -2134,17 +2062,15 @@ class Dataset[T] private[sql]( def unpivot( ids: Array[Column], variableColumnName: String, - valueColumnName: String): DataFrame = withOrigin { - withPlan { - Unpivot( - Some(ids.map(_.named).toImmutableArraySeq), - None, - None, - variableColumnName, - Seq(valueColumnName), - logicalPlan - ) - } + valueColumnName: String): DataFrame = withPlan { + Unpivot( + Some(ids.map(_.named)), + None, + None, + variableColumnName, + Seq(valueColumnName), + logicalPlan + ) } /** @@ -2261,10 +2187,8 @@ class Dataset[T] private[sql]( * @since 3.0.0 */ @varargs - def observe(name: String, expr: Column, exprs: Column*): Dataset[T] = withOrigin { - withTypedPlan { - CollectMetrics(name, (expr +: exprs).map(_.named), logicalPlan, id) - } + def observe(name: String, expr: Column, exprs: Column*): Dataset[T] = withTypedPlan { + CollectMetrics(name, (expr +: exprs).map(_.named), logicalPlan, id) } /** @@ -2301,10 +2225,8 @@ class Dataset[T] private[sql]( * @group typedrel * @since 2.0.0 */ - def limit(n: Int): Dataset[T] = withOrigin { - withTypedPlan { - Limit(Literal(n), logicalPlan) - } + def limit(n: Int): Dataset[T] = withTypedPlan { + Limit(Literal(n), logicalPlan) } /** @@ -2313,10 +2235,8 @@ class Dataset[T] private[sql]( * @group typedrel * @since 3.4.0 */ - def offset(n: Int): Dataset[T] = withOrigin { - withTypedPlan { - Offset(Literal(n), logicalPlan) - } + def offset(n: Int): Dataset[T] = withTypedPlan { + Offset(Literal(n), logicalPlan) } // This breaks caching, but it's usually ok because it addresses a very specific use case: @@ -2726,20 +2646,20 @@ class Dataset[T] private[sql]( * @since 2.0.0 */ @deprecated("use flatMap() or select() with functions.explode() instead", "2.0.0") - def explode[A <: Product : TypeTag](input: Column*)(f: Row => IterableOnce[A]): DataFrame = - withOrigin { - val elementSchema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType] - val convert = CatalystTypeConverters.createToCatalystConverter(elementSchema) - - val rowFunction = - f.andThen(_.map(convert(_).asInstanceOf[InternalRow])) - val generator = UserDefinedGenerator(elementSchema, rowFunction, input.map(_.expr)) - - withPlan { - Generate(generator, unrequiredChildIndex = Nil, outer = false, - qualifier = None, generatorOutput = Nil, logicalPlan) - } + def explode[A <: Product : TypeTag](input: Column*)(f: Row => TraversableOnce[A]): DataFrame = { + val elementSchema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType] + + val convert = CatalystTypeConverters.createToCatalystConverter(elementSchema) + + val rowFunction = + f.andThen(_.map(convert(_).asInstanceOf[InternalRow])) + val generator = UserDefinedGenerator(elementSchema, rowFunction, input.map(_.expr)) + + withPlan { + Generate(generator, unrequiredChildIndex = Nil, outer = false, + qualifier = None, generatorOutput = Nil, logicalPlan) } + } /** * (Scala-specific) Returns a new Dataset where a single column has been expanded 
to zero @@ -2763,14 +2683,14 @@ class Dataset[T] private[sql]( * @since 2.0.0 */ @deprecated("use flatMap() or select() with functions.explode() instead", "2.0.0") - def explode[A, B : TypeTag](inputColumn: String, outputColumn: String)(f: A => IterableOnce[B]) - : DataFrame = withOrigin { + def explode[A, B : TypeTag](inputColumn: String, outputColumn: String)(f: A => TraversableOnce[B]) + : DataFrame = { val dataType = ScalaReflection.schemaFor[B].dataType val attributes = AttributeReference(outputColumn, dataType)() :: Nil // TODO handle the metadata? val elementSchema = attributes.toStructType - def rowFunction(row: Row): IterableOnce[InternalRow] = { + def rowFunction(row: Row): TraversableOnce[InternalRow] = { val convert = CatalystTypeConverters.createToCatalystConverter(dataType) f(row(0).asInstanceOf[A]).map(o => InternalRow(convert(o))) } @@ -2921,7 +2841,7 @@ class Dataset[T] private[sql]( * @since 3.4.0 */ @throws[AnalysisException] - def withColumnsRenamed(colsMap: Map[String, String]): DataFrame = withOrigin { + def withColumnsRenamed(colsMap: Map[String, String]): DataFrame = { val resolver = sparkSession.sessionState.analyzer.resolver val output: Seq[NamedExpression] = queryExecution.analyzed.output @@ -3092,10 +3012,8 @@ class Dataset[T] private[sql]( * @since 3.4.0 */ @scala.annotation.varargs - def drop(col: Column, cols: Column*): DataFrame = withOrigin { - withPlan { - DataFrameDropColumns((col +: cols).map(_.expr), logicalPlan) - } + def drop(col: Column, cols: Column*): DataFrame = withPlan { + DataFrameDropColumns((col +: cols).map(_.expr), logicalPlan) } /** @@ -3126,11 +3044,9 @@ class Dataset[T] private[sql]( * @group typedrel * @since 2.0.0 */ - def dropDuplicates(colNames: Seq[String]): Dataset[T] = withOrigin { - withTypedPlan { - val groupCols = groupColsFromDropDuplicates(colNames) - Deduplicate(groupCols, logicalPlan) - } + def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan { + val groupCols = groupColsFromDropDuplicates(colNames) + Deduplicate(groupCols, logicalPlan) } /** @@ -3146,8 +3062,7 @@ class Dataset[T] private[sql]( * @group typedrel * @since 2.0.0 */ - def dropDuplicates(colNames: Array[String]): Dataset[T] = - dropDuplicates(colNames.toImmutableArraySeq) + def dropDuplicates(colNames: Array[String]): Dataset[T] = dropDuplicates(colNames.toSeq) /** * Returns a new [[Dataset]] with duplicate rows removed, considering only @@ -3207,12 +3122,10 @@ class Dataset[T] private[sql]( * @group typedrel * @since 3.5.0 */ - def dropDuplicatesWithinWatermark(colNames: Seq[String]): Dataset[T] = withOrigin { - withTypedPlan { - val groupCols = groupColsFromDropDuplicates(colNames) - // UnsupportedOperationChecker will fail the query if this is called with batch Dataset. - DeduplicateWithinWatermark(groupCols, logicalPlan) - } + def dropDuplicatesWithinWatermark(colNames: Seq[String]): Dataset[T] = withTypedPlan { + val groupCols = groupColsFromDropDuplicates(colNames) + // UnsupportedOperationChecker will fail the query if this is called with batch Dataset. 
+ DeduplicateWithinWatermark(groupCols, logicalPlan) } /** @@ -3234,7 +3147,7 @@ class Dataset[T] private[sql]( * @since 3.5.0 */ def dropDuplicatesWithinWatermark(colNames: Array[String]): Dataset[T] = { - dropDuplicatesWithinWatermark(colNames.toImmutableArraySeq) + dropDuplicatesWithinWatermark(colNames.toSeq) } /** @@ -3436,7 +3349,7 @@ class Dataset[T] private[sql]( * @group typedrel * @since 1.6.0 */ - def filter(func: T => Boolean): Dataset[T] = withOrigin { + def filter(func: T => Boolean): Dataset[T] = { withTypedPlan(TypedFilter(func, logicalPlan)) } @@ -3447,7 +3360,7 @@ class Dataset[T] private[sql]( * @group typedrel * @since 1.6.0 */ - def filter(func: FilterFunction[T]): Dataset[T] = withOrigin { + def filter(func: FilterFunction[T]): Dataset[T] = { withTypedPlan(TypedFilter(func, logicalPlan)) } @@ -3458,10 +3371,8 @@ class Dataset[T] private[sql]( * @group typedrel * @since 1.6.0 */ - def map[U : Encoder](func: T => U): Dataset[U] = withOrigin { - withTypedPlan { - MapElements[T, U](func, logicalPlan) - } + def map[U : Encoder](func: T => U): Dataset[U] = withTypedPlan { + MapElements[T, U](func, logicalPlan) } /** @@ -3471,8 +3382,8 @@ class Dataset[T] private[sql]( * @group typedrel * @since 1.6.0 */ - def map[U](func: MapFunction[T, U], encoder: Encoder[U]): Dataset[U] = withOrigin { - implicit val uEnc: Encoder[U] = encoder + def map[U](func: MapFunction[T, U], encoder: Encoder[U]): Dataset[U] = { + implicit val uEnc = encoder withTypedPlan(MapElements[T, U](func, logicalPlan)) } @@ -3558,7 +3469,7 @@ class Dataset[T] private[sql]( * @group typedrel * @since 1.6.0 */ - def flatMap[U : Encoder](func: T => IterableOnce[U]): Dataset[U] = + def flatMap[U : Encoder](func: T => TraversableOnce[U]): Dataset[U] = mapPartitions(_.flatMap(func)) /** @@ -3634,9 +3545,8 @@ class Dataset[T] private[sql]( * @group action * @since 3.0.0 */ - def tail(n: Int): Array[T] = withOrigin { - withAction("tail", withTypedPlan(Tail(Literal(n), logicalPlan)).queryExecution)(collectFromPlan) - } + def tail(n: Int): Array[T] = withAction( + "tail", withTypedPlan(Tail(Literal(n), logicalPlan)).queryExecution)(collectFromPlan) /** * Returns the first `n` rows in the Dataset as a list. @@ -3700,10 +3610,8 @@ class Dataset[T] private[sql]( * @group action * @since 1.6.0 */ - def count(): Long = withOrigin { - withAction("count", groupBy().count().queryExecution) { plan => - plan.executeCollect().head.getLong(0) - } + def count(): Long = withAction("count", groupBy().count().queryExecution) { plan => + plan.executeCollect().head.getLong(0) } /** @@ -3712,15 +3620,13 @@ class Dataset[T] private[sql]( * @group typedrel * @since 1.6.0 */ - def repartition(numPartitions: Int): Dataset[T] = withOrigin { - withTypedPlan { - Repartition(numPartitions, shuffle = true, logicalPlan) - } + def repartition(numPartitions: Int): Dataset[T] = withTypedPlan { + Repartition(numPartitions, shuffle = true, logicalPlan) } private def repartitionByExpression( numPartitions: Option[Int], - partitionExprs: Seq[Column]): Dataset[T] = withOrigin { + partitionExprs: Seq[Column]): Dataset[T] = { // The underlying `LogicalPlan` operator special-cases all-`SortOrder` arguments. // However, we don't want to complicate the semantics of this API method. // Instead, let's give users a friendly error message, pointing them to the new method. 
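// The Dataset.scala hunks above repeatedly strip a `withOrigin { ... }` wrapper around plan
// construction. Purely as an illustration of the idea (this is not Spark's implementation and
// every name below is hypothetical), such a wrapper records the user-facing call site so that
// later error messages can point back at it:
object OriginSketch {
  private val currentCallSite = new ThreadLocal[Option[String]] {
    override def initialValue(): Option[String] = None
  }

  def withOrigin[A](body: => A): A = {
    // Capture the first frame outside this helper as the "origin" of the API call and
    // restore the previous value once the body finishes.
    val frames = Thread.currentThread().getStackTrace
    val callSite = if (frames.length > 2) Some(frames(2).toString) else None
    val saved = currentCallSite.get()
    currentCallSite.set(callSite)
    try body finally currentCallSite.set(saved)
  }

  def origin: Option[String] = currentCallSite.get()
}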
@@ -3765,7 +3671,7 @@ class Dataset[T] private[sql]( private def repartitionByRange( numPartitions: Option[Int], - partitionExprs: Seq[Column]): Dataset[T] = withOrigin { + partitionExprs: Seq[Column]): Dataset[T] = { require(partitionExprs.nonEmpty, "At least one partition-by expression must be specified.") val sortOrder: Seq[SortOrder] = partitionExprs.map(_.expr match { case expr: SortOrder => expr @@ -3837,10 +3743,8 @@ class Dataset[T] private[sql]( * @group typedrel * @since 1.6.0 */ - def coalesce(numPartitions: Int): Dataset[T] = withOrigin { - withTypedPlan { - Repartition(numPartitions, shuffle = false, logicalPlan) - } + def coalesce(numPartitions: Int): Dataset[T] = withTypedPlan { + Repartition(numPartitions, shuffle = false, logicalPlan) } /** @@ -3865,7 +3769,10 @@ class Dataset[T] private[sql]( * @group basic * @since 1.6.0 */ - def persist(): this.type = persist(sparkSession.sessionState.conf.defaultCacheStorageLevel) + def persist(): this.type = { + sparkSession.sharedState.cacheManager.cacheQuery(this) + this + } /** * Persist this Dataset with the default storage level (`MEMORY_AND_DISK`). @@ -3984,10 +3891,8 @@ class Dataset[T] private[sql]( * @since 2.0.0 */ @throws[AnalysisException] - def createTempView(viewName: String): Unit = withOrigin { - withPlan { - createTempViewCommand(viewName, replace = false, global = false) - } + def createTempView(viewName: String): Unit = withPlan { + createTempViewCommand(viewName, replace = false, global = false) } @@ -3999,10 +3904,8 @@ class Dataset[T] private[sql]( * @group basic * @since 2.0.0 */ - def createOrReplaceTempView(viewName: String): Unit = withOrigin { - withPlan { - createTempViewCommand(viewName, replace = true, global = false) - } + def createOrReplaceTempView(viewName: String): Unit = withPlan { + createTempViewCommand(viewName, replace = true, global = false) } /** @@ -4020,10 +3923,8 @@ class Dataset[T] private[sql]( * @since 2.1.0 */ @throws[AnalysisException] - def createGlobalTempView(viewName: String): Unit = withOrigin { - withPlan { - createTempViewCommand(viewName, replace = false, global = true) - } + def createGlobalTempView(viewName: String): Unit = withPlan { + createTempViewCommand(viewName, replace = false, global = true) } /** @@ -4431,7 +4332,7 @@ class Dataset[T] private[sql]( plan.executeCollect().map(fromRow) } - private def sortInternal(global: Boolean, sortExprs: Seq[Column]): Dataset[T] = withOrigin { + private def sortInternal(global: Boolean, sortExprs: Seq[Column]): Dataset[T] = { val sortOrder: Seq[SortOrder] = sortExprs.map { col => col.expr match { case expr: SortOrder => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala index 094bc3c8a6901..4c2ccb27eab20 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala @@ -43,8 +43,8 @@ class KeyValueGroupedDataset[K, V] private[sql]( private val groupingAttributes: Seq[Attribute]) extends Serializable { // Similar to [[Dataset]], we turn the passed in encoder to `ExpressionEncoder` explicitly. 
- private implicit val kExprEnc: ExpressionEncoder[K] = encoderFor(kEncoder) - private implicit val vExprEnc: ExpressionEncoder[V] = encoderFor(vEncoder) + private implicit val kExprEnc = encoderFor(kEncoder) + private implicit val vExprEnc = encoderFor(vEncoder) private def logicalPlan = queryExecution.analyzed private def sparkSession = queryExecution.sparkSession diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala index 76613e28f8ee2..a72e4c339af8c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala @@ -22,7 +22,7 @@ import java.util.Locale import scala.collection.JavaConverters._ import org.apache.hadoop.fs.Path -import org.json4s.{Formats, NoTypeHints} +import org.json4s.NoTypeHints import org.json4s.jackson.Serialization import org.apache.spark.SparkUpgradeException @@ -55,7 +55,7 @@ object DataSourceUtils extends PredicateHelper { /** * Utility methods for converting partitionBy columns to options and back. */ - private implicit val formats: Formats = Serialization.formats(NoTypeHints) + private implicit val formats = Serialization.formats(NoTypeHints) def encodePartitioningColumns(columns: Seq[String]): String = { Serialization.write(columns) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala index 2d79d7117bef3..d4c535fe76a3e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala @@ -204,7 +204,7 @@ class ApplyInPandasWithStatePythonRunner( STATE_METADATA_SCHEMA_FROM_PYTHON_WORKER) stateMetadataBatch.rowIterator().asScala.take(numRows).flatMap { row => - implicit val formats: Formats = org.json4s.DefaultFormats + implicit val formats = org.json4s.DefaultFormats // NOTE: See ApplyInPandasWithStatePythonRunner.STATE_METADATA_SCHEMA_FROM_PYTHON_WORKER // for the schema. 
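// The streaming-source and Python-runner hunks in this area all revolve around the same json4s
// idiom: an implicit Formats plus Serialization.write/read. A small self-contained sketch of that
// idiom, assuming nothing beyond json4s itself (`OffsetJsonSketch` and `SketchOffset` are made-up
// names, not part of this patch):
import org.json4s.{Formats, NoTypeHints}
import org.json4s.jackson.Serialization

object OffsetJsonSketch {
  // The implicit Formats drives both write and read; spelling out the type keeps the implicit
  // visible to readers and avoids inferred-type warnings on newer compilers.
  implicit val formats: Formats = Serialization.formats(NoTypeHints)

  final case class SketchOffset(offsets: List[Int])

  def toJson(offset: SketchOffset): String = Serialization.write(offset)
  def fromJson(json: String): SketchOffset = Serialization.read[SketchOffset](json)
}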
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala
index 4bdec89182911..ad7c59bbd9f52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala
@@ -22,7 +22,7 @@ import java.nio.charset.StandardCharsets._
 
 import scala.io.{Source => IOSource}
 
-import org.json4s.{Formats, NoTypeHints}
+import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.sql.SparkSession
@@ -82,7 +82,7 @@ case class CommitMetadata(nextBatchWatermarkMs: Long = 0) {
 }
 
 object CommitMetadata {
-  implicit val format: Formats = Serialization.formats(NoTypeHints)
+  implicit val format = Serialization.formats(NoTypeHints)
 
   def apply(json: String): CommitMetadata = Serialization.read[CommitMetadata](json)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index fa1beb9d15c75..140367b3236ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -24,7 +24,7 @@ import scala.io.{Source => IOSource}
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.fs.Path
-import org.json4s.{Formats, NoTypeHints}
+import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.sql.SparkSession
@@ -49,10 +49,9 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
 
   import CompactibleFileStreamLog._
 
-  private implicit val formats: Formats = Serialization.formats(NoTypeHints)
+  private implicit val formats = Serialization.formats(NoTypeHints)
 
   /** Needed to serialize type T into JSON when using Jackson */
-  @scala.annotation.nowarn
   private implicit val manifest = Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass)
 
   protected val minBatchesToRetain = sparkSession.sessionState.conf.minBatchesToRetain
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index d8aa31be47972..94ba8b8aa5153 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -18,6 +18,8 @@ package org.apache.spark.sql.execution.streaming
 
 import org.apache.hadoop.fs.FileStatus
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
 
 import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.SparkSession
@@ -88,6 +90,8 @@ class FileStreamSinkLog(
     _retentionMs: Option[Long] = None)
   extends CompactibleFileStreamLog[SinkFileStatus](metadataLogVersion, sparkSession, path) {
 
+  private implicit val formats = Serialization.formats(NoTypeHints)
+
   protected override val fileCleanupDelayMs = sparkSession.sessionState.conf.fileSinkLogCleanupDelay
 
   protected override val isDeletingExpiredLog = sparkSession.sessionState.conf.fileSinkLogDeletion
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
index 42c72f426ea49..5fe9a39c91e0b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
@@ -22,6 +22,9 @@ import java.util.Map.Entry
 
 import scala.collection.mutable
 
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
 import org.apache.spark.sql.internal.SQLConf
@@ -48,6 +51,8 @@ class FileStreamSourceLog(
 
   protected override val isDeletingExpiredLog = sparkSession.sessionState.conf.fileSourceLogDeletion
 
+  private implicit val formats = Serialization.formats(NoTypeHints)
+
   // A fixed size log entry cache to cache the file entries belong to the compaction batch. It is
   // used to avoid scanning the compacted log file to retrieve it's own batch data.
   private val cacheSize = compactInterval
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala
index ba79c77f38677..a2b49d944a688 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.streaming
 
 import scala.util.control.Exception._
 
-import org.json4s.{Formats, NoTypeHints}
+import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
 /**
@@ -34,7 +34,7 @@ case class FileStreamSourceOffset(logOffset: Long) extends Offset {
 }
 
 object FileStreamSourceOffset {
-  implicit val format: Formats = Serialization.formats(NoTypeHints)
+  implicit val format = Serialization.formats(NoTypeHints)
 
   def apply(offset: Offset): FileStreamSourceOffset = {
     offset match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
index c9ade7b568e82..3af9c9aebf33d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
@@ -246,7 +246,7 @@ private[sql] object GroupStateImpl {
   }
 
   def fromJson[S](value: Option[S], json: JValue): GroupStateImpl[S] = {
-    implicit val formats: Formats = org.json4s.DefaultFormats
+    implicit val formats = org.json4s.DefaultFormats
 
     val hmap = json.extract[Map[String, Any]]
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index c9363f3a773e5..9a811db679d01 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag
 
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.fs._
-import org.json4s.{Formats, NoTypeHints}
+import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.internal.Logging
@@ -49,10 +49,9 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: String)
   extends MetadataLog[T] with Logging {
 
-  private implicit val formats: Formats = Serialization.formats(NoTypeHints)
+  private implicit val formats = Serialization.formats(NoTypeHints)
 
   /** Needed to serialize type T into JSON when using Jackson */
-  @scala.annotation.nowarn
   private implicit val manifest = Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass)
 
   // Avoid serializing generic sequences, see SPARK-17372
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
index 9aff4b1dc5e76..913805d1a074d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import org.json4s.{Formats, NoTypeHints}
+import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.internal.Logging
@@ -88,7 +88,7 @@ case class OffsetSeqMetadata(
 }
 
 object OffsetSeqMetadata extends Logging {
-  private implicit val format: Formats = Serialization.formats(NoTypeHints)
+  private implicit val format = Serialization.formats(NoTypeHints)
 
   /**
    * These configs are related to streaming query execution and should not be changed across
   * batches of a streaming query. The values of these configs are persisted into the offset
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
index 978cb3c34f606..cb18988b46872 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
@@ -25,7 +25,7 @@ import scala.util.control.NonFatal
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataInputStream, Path}
-import org.json4s.{Formats, NoTypeHints}
+import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.internal.Logging
@@ -45,7 +45,7 @@ case class StreamMetadata(id: String) {
 }
 
 object StreamMetadata extends Logging {
-  implicit val format: Formats = Serialization.formats(NoTypeHints)
+  implicit val format = Serialization.formats(NoTypeHints)
 
   /** Read the metadata from file if it exists */
   def read(metadataFile: Path, hadoopConf: Configuration): Option[StreamMetadata] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
index 1005502b6febc..368dfae0cc95e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
@@ -25,7 +25,7 @@ import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable.ListBuffer
 
-import org.json4s.{DefaultFormats, Formats, NoTypeHints}
+import org.json4s.{DefaultFormats, NoTypeHints}
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.SparkEnv
@@ -286,6 +286,6 @@ class TextSocketContinuousPartitionReader(
 }
 
 case class TextSocketOffset(offsets: List[Int]) extends Offset {
-  private implicit val formats: Formats = Serialization.formats(NoTypeHints)
+  private implicit val formats = Serialization.formats(NoTypeHints)
   override def json: String = Serialization.write(offsets)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
index 1e3d05aa8640d..d0cf602c7cca2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
@@ -22,7 +22,7 @@ import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable.ListBuffer
 
-import org.json4s.{Formats, NoTypeHints}
+import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.{SparkEnv, TaskContext}
@@ -46,7 +46,7 @@ import org.apache.spark.util.RpcUtils
 
 class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPartitions: Int = 2)
   extends MemoryStreamBase[A](sqlContext) with ContinuousStream {
 
-  private implicit val formats: Formats = Serialization.formats(NoTypeHints)
+  private implicit val formats = Serialization.formats(NoTypeHints)
 
   // ContinuousReader implementation
@@ -182,6 +182,6 @@ class ContinuousMemoryStreamPartitionReader(
 
 case class ContinuousMemoryStreamOffset(partitionNums: Map[Int, Int]) extends Offset {
-  private implicit val formats: Formats = Serialization.formats(NoTypeHints)
+  private implicit val formats = Serialization.formats(NoTypeHints)
   override def json(): String = Serialization.write(partitionNums)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchStream.scala
index 8dca7d40704ad..6954e4534e494 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchStream.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming.sources
 
-import org.json4s.{Formats, NoTypeHints}
+import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.internal.Logging
@@ -130,7 +130,7 @@ case class RatePerMicroBatchStreamOffset(offset: Long, timestamp: Long) extends
 }
 
 object RatePerMicroBatchStreamOffset {
-  implicit val formats: Formats = Serialization.formats(NoTypeHints)
+  implicit val formats = Serialization.formats(NoTypeHints)
 
   def apply(json: String): RatePerMicroBatchStreamOffset =
     Serialization.read[RatePerMicroBatchStreamOffset](json)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
deleted file mode 100644
index b58c805af9d60..0000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming.state
-
-import java.io.{BufferedReader, InputStreamReader}
-import java.nio.charset.StandardCharsets
-
-import scala.reflect.ClassTag
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FSDataOutputStream, Path}
-import org.json4s.{Formats, NoTypeHints}
-import org.json4s.jackson.Serialization
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, MetadataVersionUtil}
-
-/**
- * Metadata for a state store instance.
- */
-trait StateStoreMetadata {
-  def storeName: String
-  def numColsPrefixKey: Int
-  def numPartitions: Int
-}
-
-case class StateStoreMetadataV1(storeName: String, numColsPrefixKey: Int, numPartitions: Int)
-  extends StateStoreMetadata
-
-/**
- * Information about a stateful operator.
- */
-trait OperatorInfo {
-  def operatorId: Long
-  def operatorName: String
-}
-
-case class OperatorInfoV1(operatorId: Long, operatorName: String) extends OperatorInfo
-
-trait OperatorStateMetadata {
-  def version: Int
-}
-
-case class OperatorStateMetadataV1(
-    operatorInfo: OperatorInfoV1,
-    stateStoreInfo: Array[StateStoreMetadataV1]) extends OperatorStateMetadata {
-  override def version: Int = 1
-}
-
-object OperatorStateMetadataV1 {
-
-  private implicit val formats: Formats = Serialization.formats(NoTypeHints)
-
-  @scala.annotation.nowarn
-  private implicit val manifest = Manifest
-    .classType[OperatorStateMetadataV1](implicitly[ClassTag[OperatorStateMetadataV1]].runtimeClass)
-
-  def metadataFilePath(stateCheckpointPath: Path): Path =
-    new Path(new Path(stateCheckpointPath, "_metadata"), "metadata")
-
-  def deserialize(in: BufferedReader): OperatorStateMetadata = {
-    Serialization.read[OperatorStateMetadataV1](in)
-  }
-
-  def serialize(
-      out: FSDataOutputStream,
-      operatorStateMetadata: OperatorStateMetadata): Unit = {
-    Serialization.write(operatorStateMetadata.asInstanceOf[OperatorStateMetadataV1], out)
-  }
-}
-
-/**
- * Write OperatorStateMetadata into the state checkpoint directory.
- */
-class OperatorStateMetadataWriter(stateCheckpointPath: Path, hadoopConf: Configuration)
-  extends Logging {
-
-  private val metadataFilePath = OperatorStateMetadataV1.metadataFilePath(stateCheckpointPath)
-
-  private lazy val fm = CheckpointFileManager.create(stateCheckpointPath, hadoopConf)
-
-  def write(operatorMetadata: OperatorStateMetadata): Unit = {
-    if (fm.exists(metadataFilePath)) return
-
-    fm.mkdirs(metadataFilePath.getParent)
-    val outputStream = fm.createAtomic(metadataFilePath, overwriteIfPossible = false)
-    try {
-      outputStream.write(s"v${operatorMetadata.version}\n".getBytes(StandardCharsets.UTF_8))
-      OperatorStateMetadataV1.serialize(outputStream, operatorMetadata)
-      outputStream.close()
-    } catch {
-      case e: Throwable =>
-        logError(s"Fail to write state metadata file to $metadataFilePath", e)
-        outputStream.cancel()
-        throw e
-    }
-  }
-}
-
-/**
- * Read OperatorStateMetadata from the state checkpoint directory.
- */
-class OperatorStateMetadataReader(stateCheckpointPath: Path, hadoopConf: Configuration) {
-
-  private val metadataFilePath = OperatorStateMetadataV1.metadataFilePath(stateCheckpointPath)
-
-  private lazy val fm = CheckpointFileManager.create(stateCheckpointPath, hadoopConf)
-
-  def read(): OperatorStateMetadata = {
-    val inputStream = fm.open(metadataFilePath)
-    val inputReader =
-      new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
-    try {
-      val versionStr = inputReader.readLine()
-      val version = MetadataVersionUtil.validateVersion(versionStr, 1)
-      assert(version == 1)
-      OperatorStateMetadataV1.deserialize(inputReader)
-    } finally {
-      inputStream.close()
-    }
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index c20da583efa51..0c9738a6b0817 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -26,7 +26,7 @@ import scala.ref.WeakReference
 import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
-import org.json4s.{Formats, NoTypeHints}
+import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 import org.rocksdb.{RocksDB => NativeRocksDB, _}
 import org.rocksdb.TickerType._
@@ -863,7 +863,7 @@ case class RocksDBMetrics(
 }
 
 object RocksDBMetrics {
-  val format: Formats = Serialization.formats(NoTypeHints)
+  val format = Serialization.formats(NoTypeHints)
 }
 
 /** Class to wrap RocksDB's native histogram */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 6181d216bd36f..300a3b8137b4c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -34,7 +34,7 @@ import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModul
 import org.apache.commons.io.{FilenameUtils, IOUtils}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path, PathFilter}
-import org.json4s.{Formats, NoTypeHints}
+import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.{SparkConf, SparkEnv}
@@ -737,7 +737,7 @@ case class RocksDBCheckpointMetadata(
 object RocksDBCheckpointMetadata {
   val VERSION = 1
 
-  implicit val format: Formats = Serialization.formats(NoTypeHints)
+  implicit val format = Serialization.formats(NoTypeHints)
 
   /** Used to convert between classes and JSON. */
   lazy val mapper = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
index 89dae16e64e02..e9daa825dd46c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
@@ -231,7 +231,7 @@ case class FooAgg(s: Int) extends Aggregator[Row, Int, Int] {
 class DatasetAggregatorSuite extends QueryTest with SharedSparkSession {
   import testImplicits._
 
-  private implicit val ordering: Ordering[AggData] = Ordering.by((c: AggData) => c.a -> c.b)
+  private implicit val ordering = Ordering.by((c: AggData) => c.a -> c.b)
 
   test("typed aggregation: complex result type") {
     val ds = Seq("a" -> 1, "a" -> 3, "b" -> 3).toDS()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 905206156b3aa..c2fe31520acf6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -67,7 +67,7 @@ class DatasetSuite extends QueryTest
   with AdaptiveSparkPlanHelper {
   import testImplicits._
 
-  private implicit val ordering: Ordering[ClassData] = Ordering.by((c: ClassData) => c.a -> c.b)
+  private implicit val ordering = Ordering.by((c: ClassData) => c.a -> c.b)
 
   test("checkAnswer should compare map correctly") {
     val data = Seq((1, "2", Map(1 -> 2, 2 -> 1)))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index c59e91304c7fa..52b740bc5c34f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -21,7 +21,7 @@ import java.util.UUID
 
 import scala.collection.mutable
 
-import org.scalactic.{Equality, TolerantNumerics}
+import org.scalactic.TolerantNumerics
 import org.scalatest.BeforeAndAfter
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.concurrent.Waiters.Waiter
@@ -44,7 +44,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
   import testImplicits._
 
   // To make === between double tolerate inexact values
-  implicit val doubleEquality: Equality[Double] = TolerantNumerics.tolerantDoubleEquality(0.01)
+  implicit val doubleEquality = TolerantNumerics.tolerantDoubleEquality(0.01)
 
   after {
     spark.streams.active.foreach(_.stop())
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index a84690b5b97bb..8565056cda6fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -29,7 +29,7 @@ import org.apache.commons.io.FileUtils
 import org.apache.commons.lang3.RandomStringUtils
 import org.apache.hadoop.fs.Path
 import org.mockito.Mockito.when
-import org.scalactic.{Equality, TolerantNumerics}
+import org.scalactic.TolerantNumerics
 import org.scalatest.BeforeAndAfter
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatestplus.mockito.MockitoSugar
@@ -60,7 +60,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
   import testImplicits._
 
   // To make === between double tolerate inexact values
-  implicit val doubleEquality: Equality[Double] = TolerantNumerics.tolerantDoubleEquality(0.01)
+  implicit val doubleEquality = TolerantNumerics.tolerantDoubleEquality(0.01)
 
   after {
     sqlContext.streams.active.foreach(_.stop())
diff --git a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala
index 604b5a9ae9f53..c63c748953f1a 100644
--- a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala
@@ -47,7 +47,7 @@ class SqlResourceWithActualMetricsSuite
   // Exclude nodes which may not have the metrics
   val excludedNodes = List("WholeStageCodegen", "Project", "SerializeFromObject")
 
-  implicit val formats: DefaultFormats = new DefaultFormats {
+  implicit val formats = new DefaultFormats {
     override def dateFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
   }
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 177c9b153e8ae..b421a94a06c40 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -26,7 +26,7 @@ import java.util.{Locale, UUID}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future, Promise}
+import scala.concurrent.{ExecutionContext, Future, Promise}
 import scala.concurrent.duration._
 import scala.io.Source
 import scala.util.Try
@@ -441,7 +441,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftServer2Test {
         s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map")
 
       queries.foreach(statement.execute)
-      implicit val ec: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(
+      implicit val ec = ExecutionContext.fromExecutorService(
        ThreadUtils.newDaemonSingleThreadExecutor("test-jdbc-cancel"))
       try {
        // Start a very-long-running query that will take hours to finish, then cancel it in order
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala
index 9681245baa0fe..92cfd7d40338c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala
@@ -89,5 +89,5 @@ case class Time(private val millis: Long) {
 }
 
 object Time {
-  implicit val ordering: Ordering[Time] = Ordering.by((time: Time) => time.millis)
+  implicit val ordering = Ordering.by((time: Time) => time.millis)
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 54bab53a747ac..7a561ecb4990f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.receiver
 
-import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
+import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration._
 
 import org.apache.hadoop.conf.Configuration
@@ -159,8 +159,8 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
 
   // For processing futures used in parallel block storing into block manager and write ahead log
   // # threads = 2, so that both writing to BM and WAL can proceed in parallel
-  implicit private val executionContext: ExecutionContextExecutorService = ExecutionContext
-    .fromExecutorService(ThreadUtils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName))
+  implicit private val executionContext = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName))
 
   /**
    * This implementation stores the block into the block manager as well as a write ahead log.