Revert "[SPARK-45629][CORE][SQL][CONNECT][ML][STREAMING][BUILD][EXAMP…
Browse files Browse the repository at this point in the history
…LES] Fix `Implicit definition should have explicit type`"

This reverts commit bc67612.
jiyong-lee-dev committed Apr 29, 2024
1 parent bc67612 commit 51e249e
Showing 78 changed files with 455 additions and 906 deletions.
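
For context, the reverted commit had added explicit result types to implicit definitions across the codebase, addressing the Scala warning `Implicit definition should have explicit type` (explicit types on implicits are required in Scala 3 and flagged by recent Scala 2 tooling). A minimal sketch of the before/after pattern, using hypothetical object names and the json4s `Formats` case that dominates this diff:

```scala
import org.json4s.{DefaultFormats, Formats}

object BeforeFix {
  // Style restored by this revert: the implicit's type is left to inference.
  implicit val formats = DefaultFormats
}

object AfterFix {
  // Style that SPARK-45629 had introduced: the implicit carries an explicit result type.
  implicit val formats: Formats = DefaultFormats
}
```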

Large diffs are not rendered by default.

@@ -21,14 +21,14 @@ import scala.collection.mutable.HashMap
import scala.util.control.NonFatal

import org.apache.kafka.common.TopicPartition
import org.json4s.{Formats, NoTypeHints}
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

/**
* Utilities for converting Kafka related objects to and from json.
*/
private object JsonUtils {
private implicit val formats: Formats = Serialization.formats(NoTypeHints)
private implicit val formats = Serialization.formats(NoTypeHints)

/**
* Read TopicPartitions from json string
@@ -96,8 +96,10 @@ private object JsonUtils {
*/
def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = {
val result = new HashMap[String, HashMap[Int, Long]]()
implicit val order: Ordering[TopicPartition] = (x: TopicPartition, y: TopicPartition) => {
Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
implicit val order = new Ordering[TopicPartition] {
override def compare(x: TopicPartition, y: TopicPartition): Int = {
Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
}
}
val partitions = partitionOffsets.keySet.toSeq.sorted // sort for more determinism
partitions.foreach { tp =>
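
The Kafka `JsonUtils` hunk above also swaps a SAM-style `Ordering` back to an anonymous class. Both forms are equivalent; a self-contained sketch (assuming Scala 2.12+ with `kafka-clients` on the classpath; the object names are illustrative, not from the patch):

```scala
import org.apache.kafka.common.TopicPartition

object SamStyle {
  // A lambda converted to Ordering via SAM conversion, with the implicit's type stated explicitly.
  implicit val order: Ordering[TopicPartition] = (x: TopicPartition, y: TopicPartition) =>
    Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
}

object AnonClassStyle {
  // An anonymous Ordering subclass; the implicit's type is inferred.
  implicit val order = new Ordering[TopicPartition] {
    override def compare(x: TopicPartition, y: TopicPartition): Int =
      Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
  }
}
```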
@@ -30,7 +30,6 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.sys.process._

import org.json4s.Formats
import org.json4s.jackson.JsonMethods

import org.apache.spark.{SparkConf, SparkContext}
@@ -341,7 +340,7 @@ private object FaultToleranceTest extends App with Logging {
private class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile: File)
extends Logging {

implicit val formats: Formats = org.json4s.DefaultFormats
implicit val formats = org.json4s.DefaultFormats
var state: RecoveryState.Value = _
var liveWorkerIPs: List[String] = _
var numLiveApps = 0
@@ -384,7 +383,7 @@ private class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile
private class TestWorkerInfo(val ip: String, val dockerId: DockerId, val logFile: File)
extends Logging {

implicit val formats: Formats = org.json4s.DefaultFormats
implicit val formats = org.json4s.DefaultFormats

logDebug("Created worker: " + this)

@@ -23,7 +23,7 @@ import java.nio.file.Files
import scala.collection.mutable
import scala.util.control.NonFatal

import org.json4s.{DefaultFormats, Extraction, Formats}
import org.json4s.{DefaultFormats, Extraction}
import org.json4s.jackson.JsonMethods.{compact, render}

import org.apache.spark.SparkException
@@ -114,7 +114,7 @@ private[spark] object StandaloneResourceUtils extends Logging {
private def writeResourceAllocationJson[T](
allocations: Seq[T],
jsonFile: File): Unit = {
implicit val formats: Formats = DefaultFormats
implicit val formats = DefaultFormats
val allocationJson = Extraction.decompose(allocations)
Files.write(jsonFile.toPath, compact(render(allocationJson)).getBytes())
}
@@ -27,6 +27,7 @@ import scala.util.{Failure, Success}
import scala.util.control.NonFatal

import io.netty.util.internal.PlatformDependent
import org.json4s.DefaultFormats

import org.apache.spark._
import org.apache.spark.TaskState.TaskState
@@ -59,6 +60,8 @@ private[spark] class CoarseGrainedExecutorBackend(

import CoarseGrainedExecutorBackend._

private implicit val formats = DefaultFormats

private[spark] val stopping = new AtomicBoolean(false)
var executor: Executor = null
@volatile var driver: Option[RpcEndpointRef] = None
@@ -19,7 +19,7 @@ package org.apache.spark.resource

import scala.util.control.NonFatal

import org.json4s.{DefaultFormats, Extraction, Formats, JValue}
import org.json4s.{DefaultFormats, Extraction, JValue}
import org.json4s.jackson.JsonMethods._

import org.apache.spark.SparkException
@@ -69,7 +69,7 @@ private[spark] object ResourceInformation {
* Parses a JSON string into a [[ResourceInformation]] instance.
*/
def parseJson(json: String): ResourceInformation = {
implicit val formats: Formats = DefaultFormats
implicit val formats = DefaultFormats
try {
parse(json).extract[ResourceInformationJson].toResourceInformation
} catch {
@@ -80,7 +80,7 @@
}

def parseJson(json: JValue): ResourceInformation = {
implicit val formats: Formats = DefaultFormats
implicit val formats = DefaultFormats
try {
json.extract[ResourceInformationJson].toResourceInformation
} catch {
@@ -22,7 +22,7 @@ import java.util.Optional

import scala.util.control.NonFatal

import org.json4s.{DefaultFormats, Formats}
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods._

import org.apache.spark.{SparkConf, SparkException}
@@ -252,7 +252,7 @@ private[spark] object ResourceUtils extends Logging {

def parseAllocatedFromJsonFile(resourcesFile: String): Seq[ResourceAllocation] = {
withResourcesJson[ResourceAllocation](resourcesFile) { json =>
implicit val formats: Formats = DefaultFormats
implicit val formats = DefaultFormats
parse(json).extract[Seq[ResourceAllocation]]
}
}
@@ -31,7 +31,7 @@ private [spark] class JobDuration(val value: AtomicLong) extends Gauge[Long] {

private[spark] class AppStatusSource extends Source {

override implicit val metricRegistry: MetricRegistry = new MetricRegistry()
override implicit val metricRegistry = new MetricRegistry()

override val sourceName = "appStatus"

@@ -21,15 +21,15 @@ import java.io.IOException
import java.util.{HashMap => JHashMap}
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future, TimeoutException}
import scala.jdk.CollectionConverters._
import scala.concurrent.{ExecutionContext, Future, TimeoutException}
import scala.util.Random
import scala.util.control.NonFatal

import com.google.common.cache.CacheBuilder

import org.apache.spark.{MapOutputTrackerMaster, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.{MapOutputTrackerMaster, SparkConf, SparkContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config.RDD_CACHE_VISIBILITY_TRACKING_ENABLED
@@ -40,7 +40,6 @@ import org.apache.spark.scheduler.cluster.{CoarseGrainedClusterMessages, CoarseG
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils}
import org.apache.spark.util.ArrayImplicits._

/**
* BlockManagerMasterEndpoint is an [[IsolatedThreadSafeRpcEndpoint]] on the master node to
@@ -55,15 +54,10 @@
externalBlockStoreClient: Option[ExternalBlockStoreClient],
blockManagerInfo: mutable.Map[BlockManagerId, BlockManagerInfo],
mapOutputTracker: MapOutputTrackerMaster,
private val _shuffleManager: ShuffleManager,
shuffleManager: ShuffleManager,
isDriver: Boolean)
extends IsolatedThreadSafeRpcEndpoint with Logging {

// We initialize the ShuffleManager later in SparkContext and Executor, to allow
// user jars to define custom ShuffleManagers, as such `_shuffleManager` will be null here
// (except for tests) and we ask for the instance from the SparkEnv.
private lazy val shuffleManager = Option(_shuffleManager).getOrElse(SparkEnv.get.shuffleManager)

// Mapping from executor id to the block manager's local disk directories.
private val executorIdToLocalDirs =
CacheBuilder
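
The comment in the hunk above describes a late-binding trick: the endpoint accepts a possibly-null `ShuffleManager` and falls back to the instance registered in `SparkEnv` on first use. A generic, hypothetical sketch of that lazy-fallback pattern (the names below are illustrative, not Spark APIs):

```scala
trait Service { def run(): Unit }

object Service {
  // Stand-in for a globally registered instance (SparkEnv.get.shuffleManager in the diff).
  @volatile var registered: Service = _
}

// Prefer the constructor-supplied instance; otherwise resolve lazily at first use,
// so the global registry only needs to be populated before the field is first touched.
class Endpoint(private val injected: Service) {
  private lazy val service: Service = Option(injected).getOrElse(Service.registered)
}
```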
@@ -100,8 +94,7 @@ class BlockManagerMasterEndpoint(

private val askThreadPool =
ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool", 100)
private implicit val askExecutionContext: ExecutionContextExecutorService =
ExecutionContext.fromExecutorService(askThreadPool)
private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool)

private val topologyMapper = {
val topologyMapperClassName = conf.get(
@@ -884,7 +877,7 @@

private def getLocationsMultipleBlockIds(
blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
blockIds.map(blockId => getLocations(blockId)).toImmutableArraySeq
blockIds.map(blockId => getLocations(blockId))
}

/** Get the list of the peers of the given block manager */
@@ -17,7 +17,7 @@

package org.apache.spark.storage

import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.{ExecutionContext, Future}

import org.apache.spark.{MapOutputTracker, SparkEnv}
import org.apache.spark.internal.Logging
@@ -38,8 +38,7 @@ class BlockManagerStorageEndpoint(

private val asyncThreadPool =
ThreadUtils.newDaemonCachedThreadPool("block-manager-storage-async-thread-pool", 100)
private implicit val asyncExecutionContext: ExecutionContextExecutorService =
ExecutionContext.fromExecutorService(asyncThreadPool)
private implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(asyncThreadPool)

// Operations that involve removing blocks may be slow and should be done asynchronously
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -379,7 +379,7 @@ private[spark] object ThreadUtils {
def parmap[I, O](in: Seq[I], prefix: String, maxThreads: Int)(f: I => O): Seq[O] = {
val pool = newForkJoinPool(prefix, maxThreads)
try {
implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(pool)
implicit val ec = ExecutionContext.fromExecutor(pool)

val futures = in.map(x => Future(f(x)))
val futureSeq = Future.sequence(futures)
@@ -42,7 +42,7 @@ import org.apache.spark.storage._
abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[SortShuffleManager])
extends SparkFunSuite with BeforeAndAfter with LocalSparkContext
{
implicit val defaultTimeout: PatienceConfiguration.Timeout = timeout(10.seconds)
implicit val defaultTimeout = timeout(10.seconds)
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("ContextCleanerSuite")
@@ -23,7 +23,7 @@ import java.util.function.Supplier

import scala.concurrent.duration._

import org.json4s.{DefaultFormats, Extraction, Formats}
import org.json4s.{DefaultFormats, Extraction}
import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.Answers.RETURNS_SMART_NULLS
import org.mockito.ArgumentMatchers.any
@@ -60,7 +60,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
}
def conf(opts: (String, String)*): SparkConf = new SparkConf(loadDefaults = false).setAll(opts)

implicit val formats: Formats = DefaultFormats
implicit val formats = DefaultFormats

private var _worker: Worker = _

@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.concurrent.TrieMap
import scala.concurrent.duration._

import org.json4s.{DefaultFormats, Extraction, Formats}
import org.json4s.{DefaultFormats, Extraction}
import org.json4s.JsonAST.{JArray, JObject}
import org.json4s.JsonDSL._
import org.mockito.ArgumentMatchers.any
@@ -50,7 +50,7 @@ import org.apache.spark.util.{SerializableBuffer, ThreadUtils, Utils}
class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
with LocalSparkContext with MockitoSugar {

implicit val formats: Formats = DefaultFormats
implicit val formats = DefaultFormats

test("parsing no resources") {
val conf = new SparkConf
@@ -20,7 +20,7 @@ package org.apache.spark.memory
import java.util.concurrent.atomic.AtomicLong

import scala.collection.mutable
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration

import org.mockito.ArgumentMatchers.{any, anyLong}
@@ -148,7 +148,7 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
// -- Tests of sharing of execution memory between tasks ----------------------------------------
// Prior to Spark 1.6, these tests were part of ShuffleMemoryManagerSuite.

implicit val ec: ExecutionContextExecutor = ExecutionContext.global
implicit val ec = ExecutionContext.global

test("single task requesting on-heap execution memory") {
val manager = createMemoryManager(1000L)
@@ -19,7 +19,7 @@ package org.apache.spark.storage

import java.util.Properties

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
import scala.concurrent.{ExecutionContext, Future}
import scala.language.implicitConversions
import scala.reflect.ClassTag

@@ -31,7 +31,7 @@ import org.apache.spark.util.ThreadUtils

class BlockInfoManagerSuite extends SparkFunSuite {

private implicit val ec: ExecutionContextExecutor = ExecutionContext.global
private implicit val ec = ExecutionContext.global
private var blockInfoManager: BlockInfoManager = _

override protected def beforeEach(): Unit = {
@@ -83,7 +83,7 @@ private[spark] class SparkUICssErrorHandler extends DefaultCssErrorHandler {
class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers {

implicit var webDriver: WebDriver = _
implicit val formats: Formats = DefaultFormats
implicit val formats = DefaultFormats


override def beforeAll(): Unit = {
4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/util/KeyLockSuite.scala
@@ -22,14 +22,14 @@ import java.util.concurrent.atomic.AtomicInteger

import scala.concurrent.duration._

import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
import org.scalatest.concurrent.{ThreadSignaler, TimeLimits}

import org.apache.spark.SparkFunSuite

class KeyLockSuite extends SparkFunSuite with TimeLimits {

// Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
private implicit val defaultSignaler: Signaler = ThreadSignaler
private implicit val defaultSignaler = ThreadSignaler

private val foreverMs = 60 * 1000L

@@ -17,7 +17,7 @@
package org.apache.spark.examples.sql

// $example on:programmatic_schema$
import org.apache.spark.sql.{Encoder, Row}
import org.apache.spark.sql.Row
// $example off:programmatic_schema$
// $example on:init_session$
import org.apache.spark.sql.SparkSession
@@ -220,8 +220,7 @@ object SparkSQLExample {
// +------------+

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder: Encoder[Map[String, Any]] =
org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

4 changes: 2 additions & 2 deletions mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -22,7 +22,7 @@ import java.util.Locale
import breeze.linalg.normalize
import breeze.numerics.exp
import org.apache.hadoop.fs.Path
import org.json4s.{DefaultFormats, Formats}
import org.json4s.DefaultFormats
import org.json4s.JsonAST.JObject
import org.json4s.jackson.JsonMethods._

@@ -384,7 +384,7 @@ private object LDAParams {
def getAndSetParams(model: LDAParams, metadata: Metadata): Unit = {
VersionUtils.majorMinorVersion(metadata.sparkVersion) match {
case (1, 6) =>
implicit val format: Formats = DefaultFormats
implicit val format = DefaultFormats
metadata.params match {
case JObject(pairs) =>
pairs.foreach { case (paramName, jsonValue) =>
@@ -16,7 +16,7 @@
*/
package org.apache.spark.ml.linalg

import org.json4s.{DefaultFormats, Formats}
import org.json4s.DefaultFormats
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, parse => parseJson, render}

@@ -29,7 +29,7 @@ private[ml] object JsonMatrixConverter {
* Parses the JSON representation of a Matrix into a [[Matrix]].
*/
def fromJson(json: String): Matrix = {
implicit val formats: Formats = DefaultFormats
implicit val formats = DefaultFormats
val jValue = parseJson(json)
(jValue \ "type").extract[Int] match {
case 0 => // sparse
