diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index 238432d354f60..23a2cce596289 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -31,8 +31,11 @@ trait LogKey { */ object LogKeys { case object ACCUMULATOR_ID extends LogKey + case object ACTUAL_BROADCAST_OUTPUT_STATUS_SIZE extends LogKey case object ACTUAL_NUM_FILES extends LogKey case object ACTUAL_PARTITION_COLUMN extends LogKey + case object ADDED_JARS extends LogKey + case object ADDED_JARS_MESSAGE extends LogKey case object AGGREGATE_FUNCTIONS extends LogKey case object ALPHA extends LogKey case object ANALYSIS_ERROR extends LogKey @@ -42,7 +45,10 @@ object LogKeys { case object APP_NAME extends LogKey case object APP_STATE extends LogKey case object ARGS extends LogKey + case object AUTH_ENABLED extends LogKey case object BACKUP_FILE extends LogKey + case object BARRIER_EPOCH extends LogKey + case object BARRIER_ID extends LogKey case object BATCH_ID extends LogKey case object BATCH_NAME extends LogKey case object BATCH_TIMESTAMP extends LogKey @@ -52,6 +58,7 @@ object LogKeys { case object BOOT extends LogKey case object BROADCAST extends LogKey case object BROADCAST_ID extends LogKey + case object BROADCAST_OUTPUT_STATUS_SIZE extends LogKey case object BUCKET extends LogKey case object BYTECODE_SIZE extends LogKey case object CACHED_TABLE_PARTITION_METADATA_SIZE extends LogKey @@ -59,6 +66,7 @@ object LogKeys { case object CACHE_UNTIL_HIGHEST_CONSUMED_SIZE extends LogKey case object CACHE_UNTIL_LAST_PRODUCED_SIZE extends LogKey case object CALL_SITE_LONG_FORM extends LogKey + case object CALL_SITE_SHORT_FORM extends LogKey case object CATALOG_NAME extends LogKey case object CATEGORICAL_FEATURES extends LogKey case object CHECKPOINT_FILE extends LogKey @@ -118,6 +126,7 @@ object LogKeys { case object DATA_FILE_NUM extends LogKey case object DATA_SOURCE extends LogKey case object DATA_SOURCES extends LogKey + case object DEFAULT_COMPACTION_INTERVAL extends LogKey case object DEFAULT_COMPACT_INTERVAL extends LogKey case object DEFAULT_ISOLATION_LEVEL extends LogKey @@ -126,11 +135,13 @@ object LogKeys { case object DELEGATE extends LogKey case object DELTA extends LogKey case object DESCRIPTION extends LogKey + case object DESTINATION_ABSOLUTE_PATH extends LogKey case object DESIRED_NUM_PARTITIONS extends LogKey case object DFS_FILE extends LogKey case object DIFF_DELTA extends LogKey case object DIVISIBLE_CLUSTER_INDICES_SIZE extends LogKey case object DRIVER_ID extends LogKey + case object DRIVER_STATE extends LogKey case object DROPPED_PARTITIONS extends LogKey case object DURATION extends LogKey case object EARLIEST_LOADED_VERSION extends LogKey @@ -145,6 +156,7 @@ object LogKeys { case object ERROR extends LogKey case object ESTIMATOR_PARAMETER_MAP extends LogKey case object EVALUATED_FILTERS extends LogKey + case object EVENT extends LogKey case object EVENT_LOOP extends LogKey case object EVENT_QUEUE extends LogKey case object EXCEPTION extends LogKey @@ -162,7 +174,9 @@ object LogKeys { case object EXECUTOR_STATE extends LogKey case object EXECUTOR_TARGET_COUNT extends LogKey case object EXISTING_FILE extends LogKey + case object EXISTING_JARS extends LogKey case object EXIT_CODE extends LogKey + case object EXPECTED_ANSWER extends LogKey case object EXPECTED_NUM_FILES extends LogKey case object EXPECTED_PARTITION_COLUMN 
extends LogKey case object EXPIRY_TIMESTAMP extends LogKey @@ -185,6 +199,7 @@ object LogKeys { case object FILE_VERSION extends LogKey case object FILTER extends LogKey case object FILTERS extends LogKey + case object FINAL_OUTPUT_PATH extends LogKey case object FINAL_PATH extends LogKey case object FINISH_TRIGGER_DURATION extends LogKey case object FROM_OFFSET extends LogKey @@ -218,6 +233,7 @@ object LogKeys { case object INPUT extends LogKey case object INTERVAL extends LogKey case object ISOLATION_LEVEL extends LogKey + case object JAVA_VERSION extends LogKey case object JOB_ID extends LogKey case object JOIN_CONDITION extends LogKey case object JOIN_CONDITION_SUB_EXPR extends LogKey @@ -245,10 +261,12 @@ object LogKeys { case object LOGICAL_PLAN_COLUMNS extends LogKey case object LOGICAL_PLAN_LEAVES extends LogKey case object LOG_ID extends LogKey + case object LOG_KEY_FILE extends LogKey case object LOG_OFFSET extends LogKey case object LOG_TYPE extends LogKey case object LOWER_BOUND extends LogKey case object MALFORMATTED_STIRNG extends LogKey + case object MAP_ID extends LogKey case object MASTER_URL extends LogKey case object MAX_ATTEMPTS extends LogKey case object MAX_CACHE_UNTIL_HIGHEST_CONSUMED_SIZE extends LogKey @@ -282,11 +300,14 @@ object LogKeys { case object NEW_FEATURE_COLUMN_NAME extends LogKey case object NEW_LABEL_COLUMN_NAME extends LogKey case object NEW_PATH extends LogKey + case object NEW_RDD_ID extends LogKey case object NEW_VALUE extends LogKey case object NODES extends LogKey case object NODE_LOCATION extends LogKey case object NORM extends LogKey + case object NUM_ADDED_MASTERS extends LogKey case object NUM_ADDED_PARTITIONS extends LogKey + case object NUM_ADDED_WORKERS extends LogKey case object NUM_BIN extends LogKey case object NUM_BYTES extends LogKey case object NUM_CLASSES extends LogKey @@ -294,6 +315,7 @@ object LogKeys { case object NUM_CONCURRENT_WRITER extends LogKey case object NUM_DROPPED_PARTITIONS extends LogKey case object NUM_EXAMPLES extends LogKey + case object NUM_FAILED_TESTS extends LogKey case object NUM_FEATURES extends LogKey case object NUM_FILES extends LogKey case object NUM_FILES_COPIED extends LogKey @@ -304,13 +326,16 @@ object LogKeys { case object NUM_LEFT_PARTITION_VALUES extends LogKey case object NUM_LOADED_ENTRIES extends LogKey case object NUM_LOCAL_FREQUENT_PATTERN extends LogKey + case object NUM_PASSED_TESTS extends LogKey case object NUM_PARTITIONS extends LogKey case object NUM_PARTITION_VALUES extends LogKey case object NUM_POINT extends LogKey case object NUM_PREFIXES extends LogKey case object NUM_PRUNED extends LogKey case object NUM_RIGHT_PARTITION_VALUES extends LogKey + case object NUM_REQUEST_SYNC_TASK extends LogKey case object NUM_SEQUENCES extends LogKey + case object NUM_TESTS extends LogKey case object NUM_VERSIONS_RETAIN extends LogKey case object OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD extends LogKey case object OBJECT_ID extends LogKey @@ -325,7 +350,12 @@ object LogKeys { case object OPTIONS extends LogKey case object OP_ID extends LogKey case object OP_TYPE extends LogKey + case object OS_ARCH extends LogKey + case object OS_NAME extends LogKey + case object OS_VERSION extends LogKey case object OUTPUT extends LogKey + case object OUTPUT_LINE extends LogKey + case object OUTPUT_LINE_NUMBER extends LogKey case object OVERHEAD_MEMORY_SIZE extends LogKey case object PARSE_MODE extends LogKey case object PARTITIONED_FILE_READER extends LogKey @@ -357,6 +387,9 @@ object LogKeys { case object 
PUSHED_FILTERS extends LogKey case object PVC_METADATA_NAME extends LogKey case object PYTHON_EXEC extends LogKey + case object PYTHON_VERSION extends LogKey + case object PYTHON_WORKER_MODULE extends LogKey + case object PYTHON_WORKER_RESPONSE extends LogKey case object QUERY_CACHE_VALUE extends LogKey case object QUERY_HINT extends LogKey case object QUERY_ID extends LogKey @@ -366,7 +399,9 @@ object LogKeys { case object QUERY_PLAN_LENGTH_MAX extends LogKey case object QUERY_RUN_ID extends LogKey case object RANGE extends LogKey + case object RDD_CHECKPOINT_DIR extends LogKey case object RDD_ID extends LogKey + case object RDD_DEBUG_STRING extends LogKey case object READ_LIMIT extends LogKey case object REASON extends LogKey case object REATTACHABLE extends LogKey @@ -382,6 +417,7 @@ object LogKeys { case object RELATIVE_TOLERANCE extends LogKey case object REMAINING_PARTITIONS extends LogKey case object REPORT_DETAILS extends LogKey + case object REQUESTER_SIZE extends LogKey case object RESOURCE extends LogKey case object RESOURCE_NAME extends LogKey case object RESOURCE_PROFILE_ID extends LogKey @@ -399,22 +435,30 @@ object LogKeys { case object RUN_ID extends LogKey case object SCHEMA extends LogKey case object SCHEMA2 extends LogKey + case object SERIALIZE_OUTPUT_LENGTH extends LogKey + case object SERVER_NAME extends LogKey case object SERVICE_NAME extends LogKey + case object SERVLET_CONTEXT_HANDLER_PATH extends LogKey case object SESSION_HOLD_INFO extends LogKey case object SESSION_ID extends LogKey case object SESSION_KEY extends LogKey case object SHARD_ID extends LogKey case object SHUFFLE_BLOCK_INFO extends LogKey + case object SHUFFLE_DB_BACKEND_KEY extends LogKey + case object SHUFFLE_DB_BACKEND_NAME extends LogKey case object SHUFFLE_ID extends LogKey case object SHUFFLE_MERGE_ID extends LogKey case object SHUFFLE_SERVICE_NAME extends LogKey case object SIZE extends LogKey + case object SIGNAL extends LogKey case object SLEEP_TIME extends LogKey case object SLIDE_DURATION extends LogKey case object SMALLEST_CLUSTER_INDEX extends LogKey case object SNAPSHOT_VERSION extends LogKey + case object SOURCE_ABSOLUTE_PATH extends LogKey case object SPARK_DATA_STREAM extends LogKey case object SPARK_PLAN_ID extends LogKey + case object SPARK_VERSION extends LogKey case object SQL_TEXT extends LogKey case object SRC_PATH extends LogKey case object STAGE_ATTEMPT extends LogKey @@ -426,6 +470,7 @@ object LogKeys { case object STATE_STORE_VERSION extends LogKey case object STATUS extends LogKey case object STDERR extends LogKey + case object STOP_SITE_SHORT_FORM extends LogKey case object STORAGE_LEVEL extends LogKey case object STORAGE_LEVEL_DESERIALIZED extends LogKey case object STORAGE_LEVEL_REPLICATION extends LogKey @@ -440,11 +485,14 @@ object LogKeys { case object STREAMING_WRITE extends LogKey case object STREAM_ID extends LogKey case object STREAM_NAME extends LogKey + case object STREAM_SOURCE extends LogKey case object SUBMISSION_ID extends LogKey case object SUBSAMPLING_RATE extends LogKey case object SUB_QUERY extends LogKey case object TABLE_NAME extends LogKey case object TABLE_TYPES extends LogKey + case object TARGET_NUM_EXECUTOR extends LogKey + case object TARGET_NUM_EXECUTOR_DELTA extends LogKey case object TARGET_PATH extends LogKey case object TASK_ATTEMPT_ID extends LogKey case object TASK_ID extends LogKey @@ -452,7 +500,9 @@ object LogKeys { case object TASK_SET_NAME extends LogKey case object TASK_STATE extends LogKey case object TEMP_FILE extends LogKey + 
case object TEMP_OUTPUT_PATH extends LogKey case object TEMP_PATH extends LogKey + case object TEST_NAME extends LogKey case object TEST_SIZE extends LogKey case object THREAD extends LogKey case object THREAD_NAME extends LogKey @@ -460,6 +510,7 @@ object LogKeys { case object TIME extends LogKey case object TIMEOUT extends LogKey case object TIMER extends LogKey + case object TIMESTAMP extends LogKey case object TIME_UNITS extends LogKey case object TIP extends LogKey case object TOKEN_REGEX extends LogKey @@ -499,7 +550,9 @@ object LogKeys { case object WAIT_SEND_TIME extends LogKey case object WAIT_TIME extends LogKey case object WATERMARK_CONSTRAINT extends LogKey + case object WEB_URL extends LogKey case object WEIGHTED_NUM extends LogKey + case object WORKER_ID extends LogKey case object WORKER_URL extends LogKey case object WRITE_AHEAD_LOG_INFO extends LogKey case object WRITE_JOB_UUID extends LogKey diff --git a/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala b/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala index 942242107e22f..adce6c3f5ffdb 100644 --- a/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala @@ -23,7 +23,8 @@ import java.util.function.Consumer import scala.collection.mutable.{ArrayBuffer, HashSet} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys._ import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} import org.apache.spark.util.ThreadUtils @@ -161,7 +162,8 @@ private[spark] class BarrierCoordinator( s"${request.numTasks} from Task $taskId, previously it was $numTasks.") // Check whether the epoch from the barrier tasks matches current barrierEpoch. - logInfo(s"Current barrier epoch for $barrierId is $barrierEpoch.") + logInfo(log"Current barrier epoch for ${MDC(BARRIER_ID, barrierId)}" + + log" is ${MDC(BARRIER_EPOCH, barrierEpoch)}.") if (epoch != barrierEpoch) { requester.sendFailure(new SparkException(s"The request to sync of $barrierId with " + s"barrier epoch $barrierEpoch has already finished. Maybe task $taskId is not " + @@ -176,14 +178,17 @@ private[spark] class BarrierCoordinator( // Add the requester to array of RPCCallContexts pending for reply. requesters += requester messages(request.partitionId) = request.message - logInfo(s"Barrier sync epoch $barrierEpoch from $barrierId received update from Task " + - s"$taskId, current progress: ${requesters.size}/$numTasks.") + logInfo(log"Barrier sync epoch ${MDC(BARRIER_EPOCH, barrierEpoch)}" + + log" from ${MDC(BARRIER_ID, barrierId)} received update from Task" + + log" ${MDC(TASK_ID, taskId)}, current progress:" + + log" ${MDC(REQUESTER_SIZE, requesters.size)}/${MDC(NUM_REQUEST_SYNC_TASK, numTasks)}.") if (requesters.size == numTasks) { requesters.foreach(_.reply(messages.clone())) // Finished current barrier() call successfully, clean up ContextBarrierState and // increase the barrier epoch. 
- logInfo(s"Barrier sync epoch $barrierEpoch from $barrierId received all updates from " + - s"tasks, finished successfully.") + logInfo(log"Barrier sync epoch ${MDC(BARRIER_EPOCH, barrierEpoch)}" + + log" from ${MDC(BARRIER_ID, barrierId)} received all updates from" + + log" tasks, finished successfully.") barrierEpoch += 1 requesters.clear() requestMethods.clear() diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala index e083ece918b63..dc01c328f9a64 100644 --- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala +++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala @@ -19,14 +19,13 @@ package org.apache.spark import java.util.{Properties, TimerTask} import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit} - import scala.concurrent.duration._ import scala.jdk.CollectionConverters._ -import scala.util.{Failure, Success => ScalaSuccess, Try} - +import scala.util.{Failure, Try, Success => ScalaSuccess} import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.executor.TaskMetrics -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{LogEntry, Logging, MDC, MessageWithContext} +import org.apache.spark.internal.LogKeys._ import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.metrics.source.Source import org.apache.spark.resource.ResourceInformation @@ -56,19 +55,27 @@ class BarrierTaskContext private[spark] ( // with the driver side epoch. private var barrierEpoch = 0 + private def logProgressInfo(msg: MessageWithContext, startTime: Option[Long]): Unit = { + val waitMsg = startTime.fold(log"")(st => log", waited " + + log"for ${MDC(TOTAL_TIME, System.currentTimeMillis() - st)} ms,") + logInfo(log"Task ${MDC(TASK_ATTEMPT_ID, taskAttemptId())}" + + log" from Stage ${MDC(STAGE_ID, stageId())}" + + log"(Attempt ${MDC(STAGE_ATTEMPT, stageAttemptNumber())}) " + + msg + waitMsg + + log" current barrier epoch is ${MDC(BARRIER_EPOCH, barrierEpoch)}.") + } + private def runBarrier(message: String, requestMethod: RequestMethod.Value): Array[String] = { - logInfo(s"Task ${taskAttemptId()} from Stage ${stageId()}(Attempt ${stageAttemptNumber()}) " + - s"has entered the global sync, current barrier epoch is $barrierEpoch.") + logProgressInfo(log"has entered the global sync", None) logTrace("Current callSite: " + Utils.getCallSite()) val startTime = System.currentTimeMillis() val timerTask = new TimerTask { override def run(): Unit = { - logInfo(s"Task ${taskAttemptId()} from Stage ${stageId()}(Attempt " + - s"${stageAttemptNumber()}) waiting " + - s"under the global sync since $startTime, has been waiting for " + - s"${MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)} seconds, " + - s"current barrier epoch is $barrierEpoch.") + logProgressInfo( + log"waiting under the global sync since ${MDC(TIME, startTime)}", + Some(startTime) + ) } } // Log the update of global sync every 1 minute. 
@@ -104,17 +111,11 @@ class BarrierTaskContext private[spark] ( val messages = abortableRpcFuture.future.value.get.get barrierEpoch += 1 - logInfo(s"Task ${taskAttemptId()} from Stage ${stageId()}(Attempt ${stageAttemptNumber()}) " + - s"finished global sync successfully, waited for " + - s"${MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)} seconds, " + - s"current barrier epoch is $barrierEpoch.") + logProgressInfo(log"finished global sync successfully", Some(startTime)) messages } catch { case e: SparkException => - logInfo(s"Task ${taskAttemptId()} from Stage ${stageId()}(Attempt " + - s"${stageAttemptNumber()}) failed to perform global sync, waited for " + - s"${MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)} seconds, " + - s"current barrier epoch is $barrierEpoch.") + logProgressInfo(log"failed to perform global sync", Some(startTime)) throw e } finally { timerTask.cancel() diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 94927caff1d78..d309ae6051a11 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -26,7 +26,8 @@ import scala.util.control.NonFatal import com.codahale.metrics.{Counter, Gauge, MetricRegistry} -import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.{config, Logging, MDC} +import org.apache.spark.internal.LogKeys._ import org.apache.spark.internal.config._ import org.apache.spark.internal.config.DECOMMISSION_ENABLED import org.apache.spark.internal.config.Tests.TEST_DYNAMIC_ALLOCATION_SCHEDULE_ENABLED @@ -445,10 +446,11 @@ private[spark] class ExecutorAllocationManager( val delta = targetNum.delta totalDelta += delta if (delta > 0) { - val executorsString = "executor" + { if (delta > 1) "s" else "" } - logInfo(s"Requesting $delta new $executorsString because tasks are backlogged " + - s"(new desired total will be ${numExecutorsTargetPerResourceProfileId(rpId)} " + - s"for resource profile id: ${rpId})") + logInfo(log"Requesting ${MDC(TARGET_NUM_EXECUTOR_DELTA, delta)}" + + log" new executor(s) because tasks are backlogged " + + log"(new desired total will be" + + log" ${MDC(TARGET_NUM_EXECUTOR, numExecutorsTargetPerResourceProfileId(rpId))} " + + log"for resource profile id: ${MDC(RESOURCE_PROFILE_ID, rpId)})") numExecutorsToAddPerResourceProfileId(rpId) = if (delta == numExecutorsToAddPerResourceProfileId(rpId)) { numExecutorsToAddPerResourceProfileId(rpId) * 2 @@ -603,7 +605,8 @@ private[spark] class ExecutorAllocationManager( } else { executorMonitor.executorsKilled(executorsRemoved.toSeq) } - logInfo(s"Executors ${executorsRemoved.mkString(",")} removed due to idle timeout.") + logInfo(log"Executors ${MDC(EXECUTOR_IDS, executorsRemoved.mkString(","))}" + + log" removed due to idle timeout.") executorsRemoved.toSeq } else { logWarning(s"Unable to reach the cluster manager to kill executor/s " + diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index a323dc9fcb477..f892155016859 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -17,11 +17,10 @@ package org.apache.spark -import java.io.{ByteArrayInputStream, InputStream, IOException,
ObjectInputStream, ObjectOutputStream} +import java.io.{ByteArrayInputStream, IOException, InputStream, ObjectInputStream, ObjectOutputStream} import java.nio.ByteBuffer import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit} import java.util.concurrent.locks.ReentrantReadWriteLock - import scala.collection import scala.collection.mutable.{HashMap, ListBuffer, Map} import scala.concurrent.{ExecutionContext, Future} @@ -29,12 +28,10 @@ import scala.concurrent.duration.Duration import scala.jdk.CollectionConverters._ import scala.reflect.ClassTag import scala.util.control.NonFatal - import org.apache.commons.io.output.{ByteArrayOutputStream => ApacheByteArrayOutputStream} import org.roaringbitmap.RoaringBitmap - import org.apache.spark.broadcast.{Broadcast, BroadcastManager} -import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.{Logging, MDC, MessageWithContext} import org.apache.spark.internal.LogKeys._ import org.apache.spark.internal.config._ import org.apache.spark.io.CompressionCodec @@ -188,7 +185,8 @@ private class ShuffleStatus( val mapStatusOpt = mapIndex.map(mapStatuses(_)).flatMap(Option(_)) mapStatusOpt match { case Some(mapStatus) => - logInfo(s"Updating map output for ${mapId} to ${bmAddress}") + logInfo(log"Updating map output for ${MDC(MAP_ID, mapId)}" + + log" to ${MDC(BLOCK_MANAGER_ID, bmAddress)}") mapStatus.updateLocation(bmAddress) invalidateSerializedMapOutputStatusCache() case None => @@ -200,7 +198,8 @@ private class ShuffleStatus( _numAvailableMapOutputs += 1 invalidateSerializedMapOutputStatusCache() mapStatusesDeleted(index) = null - logInfo(s"Recover ${mapStatus.mapId} ${mapStatus.location}") + logInfo(log"Recover ${MDC(MAP_ID, mapStatus.mapId)}" + + log" ${MDC(BLOCK_MANAGER_ID, mapStatus.location)}") } else { logWarning(s"Asked to update map output ${mapId} for untracked map status.") } @@ -487,21 +486,24 @@ private[spark] class MapOutputTrackerMasterEndpoint( extends RpcEndpoint with Logging { logDebug("init") // force eager creation of logger + private def logInfoMsg(msg: MessageWithContext, shuffleId: Int, context: RpcCallContext): Unit = { + val hostPort = context.senderAddress.hostPort + logInfo(log"Asked to send " + + msg + + log" locations for shuffle ${MDC(SHUFFLE_ID, shuffleId)} to ${MDC(HOST_PORT, hostPort)}") + } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case GetMapOutputStatuses(shuffleId: Int) => - val hostPort = context.senderAddress.hostPort - logInfo(s"Asked to send map output locations for shuffle $shuffleId to $hostPort") + logInfoMsg(log"map output", shuffleId, context) tracker.post(GetMapOutputMessage(shuffleId, context)) case GetMapAndMergeResultStatuses(shuffleId: Int) => - val hostPort = context.senderAddress.hostPort - logInfo(s"Asked to send map/merge result locations for shuffle $shuffleId to $hostPort") + logInfoMsg(log"map/merge result", shuffleId, context) tracker.post(GetMapAndMergeOutputMessage(shuffleId, context)) case GetShufflePushMergerLocations(shuffleId: Int) => - logInfo(s"Asked to send shuffle push merger locations for shuffle" + - s" $shuffleId to ${context.senderAddress.hostPort}") + logInfoMsg(log"shuffle push merger", shuffleId, context) tracker.post(GetShufflePushMergersMessage(shuffleId, context)) case StopMapOutputTracker => @@ -1419,13 +1421,15 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr val mergeOutputStatuses = mergeStatuses.get(shuffleId).orNull if 
(mapOutputStatuses == null || mergeOutputStatuses == null) { - logInfo("Don't have map/merge outputs for shuffle " + shuffleId + ", fetching them") + logInfo(log"Don't have map/merge outputs for" + + log" shuffle ${MDC(SHUFFLE_ID, shuffleId)}, fetching them") val startTimeNs = System.nanoTime() fetchingLock.withLock(shuffleId) { var fetchedMapStatuses = mapStatuses.get(shuffleId).orNull var fetchedMergeStatuses = mergeStatuses.get(shuffleId).orNull if (fetchedMapStatuses == null || fetchedMergeStatuses == null) { - logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint) + logInfo(log"Doing the fetch; tracker endpoint = " + + log"${MDC(RPC_ENDPOINT_REF, trackerEndpoint)}") val fetchedBytes = askTracker[(Array[Byte], Array[Byte])](GetMapAndMergeResultStatuses(shuffleId)) try { @@ -1453,12 +1457,14 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr } else { val statuses = mapStatuses.get(shuffleId).orNull if (statuses == null) { - logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them") + logInfo(log"Don't have map outputs for shuffle ${MDC(SHUFFLE_ID, shuffleId)}," + + log" fetching them") val startTimeNs = System.nanoTime() fetchingLock.withLock(shuffleId) { var fetchedStatuses = mapStatuses.get(shuffleId).orNull if (fetchedStatuses == null) { - logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint) + logInfo(log"Doing the fetch; tracker endpoint =" + + log" ${MDC(RPC_ENDPOINT_REF, trackerEndpoint)}") val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId)) try { fetchedStatuses = @@ -1497,7 +1503,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr def updateEpoch(newEpoch: Long): Unit = { epochLock.synchronized { if (newEpoch > epoch) { - logInfo("Updating epoch to " + newEpoch + " and clearing cache") + logInfo(log"Updating epoch to ${MDC(EPOCH, newEpoch)} and clearing cache") epoch = newEpoch mapStatuses.clear() mergeStatuses.clear() @@ -1558,7 +1564,9 @@ private[spark] object MapOutputTracker extends Logging { oos.close() } val outArr = out.toByteArray - logInfo("Broadcast outputstatuses size = " + outArr.length + ", actual size = " + arrSize) + logInfo(log"Broadcast outputstatuses size = " + + log"${MDC(BROADCAST_OUTPUT_STATUS_SIZE, outArr.length)}," + + log" actual size = ${MDC(ACTUAL_BROADCAST_OUTPUT_STATUS_SIZE, arrSize)}") (outArr, bcast) } else { (chunkedByteBuf.toArray, null) @@ -1591,8 +1599,10 @@ private[spark] object MapOutputTracker extends Logging { try { // deserialize the Broadcast, pull .value array out of it, and then deserialize that val bcast = deserializeObject(in).asInstanceOf[Broadcast[Array[Array[Byte]]]] - logInfo("Broadcast outputstatuses size = " + bytes.length + - ", actual size = " + bcast.value.foldLeft(0L)(_ + _.length)) + val actualSize = bcast.value.foldLeft(0L)(_ + _.length) + logInfo(log"Broadcast outputstatuses size =" + + log" ${MDC(BROADCAST_OUTPUT_STATUS_SIZE, bytes.length)}" + + log", actual size = ${MDC(ACTUAL_BROADCAST_OUTPUT_STATUS_SIZE, actualSize)}") val bcastIn = new ChunkedByteBuffer(bcast.value.map(ByteBuffer.wrap)).toInputStream() // Important - ignore the DIRECT tag !
Start from offset 1 bcastIn.skip(1) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index b1c640dc935a9..11d358f953c51 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -199,10 +199,11 @@ class SparkContext(config: SparkConf) extends Logging { this(master, appName, sparkHome, jars, Map()) // log out Spark Version in Spark driver log - logInfo(s"Running Spark version $SPARK_VERSION") - logInfo(s"OS info ${System.getProperty("os.name")}, ${System.getProperty("os.version")}, " + - s"${System.getProperty("os.arch")}") - logInfo(s"Java version ${System.getProperty("java.version")}") + logInfo(log"Running Spark version ${MDC(LogKeys.SPARK_VERSION, SPARK_VERSION)}") + logInfo(log"OS info ${MDC(LogKeys.OS_NAME, System.getProperty("os.name"))}," + + log" ${MDC(LogKeys.OS_VERSION, System.getProperty("os.version"))}, " + + log"${MDC(LogKeys.OS_ARCH, System.getProperty("os.arch"))}") + logInfo(log"Java version ${MDC(LogKeys.JAVA_VERSION, System.getProperty("java.version"))}") /* ------------------------------------------------------------------------------------- * | Private variables. These variables keep the internal state of the context, and are | @@ -439,7 +440,7 @@ class SparkContext(config: SparkConf) extends Logging { logResourceInfo(SPARK_DRIVER_PREFIX, _resources) // log out spark.app.name in the Spark driver logs - logInfo(s"Submitted application: $appName") + logInfo(log"Submitted application: ${MDC(LogKeys.APP_NAME, appName)}") // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) { @@ -448,7 +449,7 @@ class SparkContext(config: SparkConf) extends Logging { } if (_conf.getBoolean("spark.logConf", false)) { - logInfo("Spark configuration:\n" + _conf.toDebugString) + logInfo(log"Spark configuration:\n${MDC(LogKeys.CONFIG, _conf.toDebugString)}") } // Set Spark driver host and port system properties. This explicitly sets the configuration @@ -1704,7 +1705,8 @@ class SparkContext(config: SparkConf) extends Logging { "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.") val bc = env.broadcastManager.newBroadcast[T](value, isLocal, serializedOnly) val callSite = getCallSite() - logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm) + logInfo(log"Created broadcast ${MDC(LogKeys.BROADCAST_ID, bc.id)}" + + log" from ${MDC(LogKeys.CALL_SITE_SHORT_FORM, callSite.shortForm)}") cleaner.foreach(_.registerBroadcastForCleanup(bc)) bc } @@ -1832,7 +1834,8 @@ class SparkContext(config: SparkConf) extends Logging { addedFiles .getOrElseUpdate(jobArtifactUUID, new ConcurrentHashMap[String, Long]().asScala) .putIfAbsent(key, timestamp).isEmpty) { - logInfo(s"Added file $path at $key with timestamp $timestamp") + logInfo(log"Added file ${MDC(LogKeys.PATH, path)} at ${MDC(LogKeys.KEY, key)} with" + + log" timestamp ${MDC(LogKeys.TIMESTAMP, timestamp)}") // Fetch the file locally so that closures which are run on the driver can still use the // SparkFiles API to access files. 
Utils.fetchFile(uri.toString, root, conf, hadoopConfiguration, timestamp, useCache = false) @@ -1844,7 +1847,8 @@ class SparkContext(config: SparkConf) extends Logging { .putIfAbsent( Utils.getUriBuilder(new URI(key)).fragment(uri.getFragment).build().toString, timestamp).isEmpty) { - logInfo(s"Added archive $path at $key with timestamp $timestamp") + logInfo(log"Added archive ${MDC(LogKeys.PATH, path)} at ${MDC(LogKeys.KEY, key)}" + + log" with timestamp ${MDC(LogKeys.TIMESTAMP, timestamp)}") // If the scheme is file, use URI to simply copy instead of downloading. val uriToUse = if (!isLocal && scheme == "file") uri else new URI(key) val uriToDownload = Utils.getUriBuilder(uriToUse).fragment(null).build() @@ -1854,7 +1858,9 @@ class SparkContext(config: SparkConf) extends Logging { root, if (uri.getFragment != null) uri.getFragment else source.getName) logInfo( - s"Unpacking an archive $path from ${source.getAbsolutePath} to ${dest.getAbsolutePath}") + log"Unpacking an archive ${MDC(LogKeys.PATH, path)}" + + log" from ${MDC(LogKeys.SOURCE_ABSOLUTE_PATH, source.getAbsolutePath)}" + + log" to ${MDC(LogKeys.DESTINATION_ABSOLUTE_PATH, dest.getAbsolutePath)}") Utils.deleteRecursively(dest) Utils.unpack(source, dest) postEnvironmentUpdate() @@ -2215,7 +2221,10 @@ class SparkContext(config: SparkConf) extends Logging { .putIfAbsent(_, timestamp).isEmpty) if (added.nonEmpty) { val jarMessage = if (scheme != "ivy") "JAR" else "dependency jars of Ivy URI" - logInfo(s"Added $jarMessage $path at ${added.mkString(",")} with timestamp $timestamp") + logInfo(log"Added" + + log" ${MDC(LogKeys.ADDED_JARS_MESSAGE, jarMessage)} ${MDC(LogKeys.PATH, path)}" + + log" at ${MDC(LogKeys.ADDED_JARS, added.mkString(","))}" + + log" with timestamp ${MDC(LogKeys.TIMESTAMP, timestamp)}") postEnvironmentUpdate() } if (existed.nonEmpty) { @@ -2271,7 +2280,8 @@ class SparkContext(config: SparkConf) extends Logging { */ def stop(exitCode: Int): Unit = { stopSite = Some(getCallSite()) - logInfo(s"SparkContext is stopping with exitCode $exitCode from ${stopSite.get.shortForm}.") + logInfo(log"SparkContext is stopping with exitCode ${MDC(LogKeys.EXIT_CODE, exitCode)}" + + log" from ${MDC(LogKeys.STOP_SITE_SHORT_FORM, stopSite.get.shortForm)}.") if (LiveListenerBus.withinListenerThread.value) { throw new SparkException(s"Cannot stop SparkContext within listener bus thread.") } @@ -2435,9 +2445,10 @@ class SparkContext(config: SparkConf) extends Logging { } val callSite = getCallSite() val cleanedFunc = clean(func) - logInfo("Starting job: " + callSite.shortForm) + logInfo(log"Starting job: ${MDC(LogKeys.CALL_SITE_SHORT_FORM, callSite.shortForm)}") if (conf.getBoolean("spark.logLineage", false)) { - logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString) + logInfo(log"RDD's recursive dependencies:\n" + + log"${MDC(LogKeys.RDD_DEBUG_STRING, rdd.toDebugString)}") } dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get) progressBar.foreach(_.finishAll()) @@ -2556,13 +2567,14 @@ class SparkContext(config: SparkConf) extends Logging { timeout: Long): PartialResult[R] = { assertNotStopped() val callSite = getCallSite() - logInfo("Starting job: " + callSite.shortForm) - val start = System.nanoTime + logInfo(log"Starting job: ${MDC(LogKeys.CALL_SITE_SHORT_FORM, callSite.shortForm)}") + val start = System.currentTimeMillis() val cleanedFunc = clean(func) val result = dagScheduler.runApproximateJob(rdd, cleanedFunc, evaluator, callSite, timeout, localProperties.get) logInfo( - "Job 
finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s") + log"Job finished: ${MDC(LogKeys.CALL_SITE_SHORT_FORM, callSite.shortForm)}," + + log" took ${MDC(LogKeys.TOTAL_TIME, System.currentTimeMillis() - start)}ms") result } @@ -2790,7 +2802,8 @@ class SparkContext(config: SparkConf) extends Logging { val listeners = Utils.loadExtensions(classOf[SparkListenerInterface], classNames, conf) listeners.foreach { listener => listenerBus.addToSharedQueue(listener) - logInfo(s"Registered listener ${listener.getClass().getName()}") + logInfo(log"Registered listener" + + log"${MDC(LogKeys.CLASS_NAME, listener.getClass().getName())}") } } } catch { diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala index 46dab16b8276e..5e2b5553f3dca 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala @@ -45,7 +45,7 @@ private[python] object Converter extends Logging { converterClass.map { cc => Try { val c = Utils.classForName[Converter[T, U]](cc).getConstructor().newInstance() - logInfo(s"Loaded converter: $cc") + logInfo(log"Loaded converter: ${MDC(CLASS_NAME, cc)}") c } match { case Success(c) => c diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 5aa080b5fb291..d643983ef5dfe 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -37,7 +37,8 @@ import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext} import org.apache.spark.api.python.PythonFunction.PythonAccumulator import org.apache.spark.broadcast.Broadcast import org.apache.spark.input.PortableDataStream -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys.{HOST, PORT} import org.apache.spark.internal.config.BUFFER_SIZE import org.apache.spark.network.util.JavaUtils import org.apache.spark.rdd.RDD @@ -733,7 +734,8 @@ private[spark] class PythonAccumulatorV2( private def openSocket(): Socket = synchronized { if (socket == null || socket.isClosed) { socket = new Socket(serverHost, serverPort) - logInfo(s"Connected to AccumulatorServer at host: $serverHost port: $serverPort") + logInfo(log"Connected to AccumulatorServer at host: ${MDC(HOST, serverHost)}" + + log" port: ${MDC(PORT, serverPort)}") // send the secret just for the initial authentication when opening a new connection socket.getOutputStream.write(secretToken.getBytes(StandardCharsets.UTF_8)) } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala index 26c790a124470..fc6403fc7de77 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala @@ -28,7 +28,8 @@ import scala.sys.process.Process import org.apache.spark.{SparkContext, SparkEnv} import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys.{PATH, PYTHON_VERSION} import org.apache.spark.util.ArrayImplicits.SparkArrayOps import org.apache.spark.util.Utils @@ -122,11 +123,11 @@ private[spark] object 
PythonUtils extends Logging { PythonUtils.sparkPythonPath, sys.env.getOrElse("PYTHONPATH", "")) val environment = Map("PYTHONPATH" -> pythonPath) - logInfo(s"Python path $pythonPath") + logInfo(log"Python path ${MDC(PATH, pythonPath)}") val processPythonVer = Process(pythonVersionCMD, None, environment.toSeq: _*) val output = runCommand(processPythonVer) - logInfo(s"Python version: ${output.getOrElse("Unable to determine")}") + logInfo(log"Python version: ${MDC(PYTHON_VERSION, output.getOrElse("Unable to determine"))}") val pythonCode = """ diff --git a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala index b238e2b87c653..b536be728b46d 100644 --- a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala @@ -22,7 +22,8 @@ import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, Data import scala.jdk.CollectionConverters._ import org.apache.spark.{SparkEnv, SparkPythonException} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys.{PYTHON_EXEC,PYTHON_WORKER_MODULE, PYTHON_WORKER_RESPONSE, SESSION_ID} import org.apache.spark.internal.config.BUFFER_SIZE import org.apache.spark.internal.config.Python.PYTHON_AUTH_SOCKET_TIMEOUT @@ -58,7 +59,8 @@ private[spark] class StreamingPythonRunner( * to be used with the functions. */ def init(): (DataOutputStream, DataInputStream) = { - logInfo(s"Initializing Python runner (session: $sessionId, pythonExec: $pythonExec)") + logInfo(log"Initializing Python runner (session: ${MDC(SESSION_ID, sessionId)}," + + log" pythonExec: ${MDC(PYTHON_EXEC, pythonExec)})") val env = SparkEnv.get val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",") @@ -95,7 +97,8 @@ private[spark] class StreamingPythonRunner( val errMessage = PythonWorkerUtils.readUTF(dataIn) throw streamingPythonRunnerInitializationFailure(resFromPython, errMessage) } - logInfo(s"Runner initialization succeeded (returned $resFromPython).") + logInfo(log"Runner initialization succeeded (returned" + + log" ${MDC(PYTHON_WORKER_RESPONSE, resFromPython)}).") (dataOut, dataIn) } @@ -116,7 +119,8 @@ private[spark] class StreamingPythonRunner( * Stops the Python worker. 
*/ def stop(): Unit = { - logInfo(s"Stopping streaming runner for sessionId: $sessionId, module: $workerModule.") + logInfo(log"Stopping streaming runner for sessionId: ${MDC(SESSION_ID, sessionId)}," + + log" module: ${MDC(PYTHON_WORKER_MODULE, workerModule)}.") try { pythonWorkerFactory.foreach { factory => diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 4b0e12861f057..2741b13e92f76 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -32,7 +32,7 @@ import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.{DriverState, Master} import org.apache.spark.deploy.master.DriverState.DriverState import org.apache.spark.internal.{config, Logging, MDC} -import org.apache.spark.internal.LogKeys.{DRIVER_ID, ERROR, HOST_PORT} +import org.apache.spark.internal.LogKeys.{DRIVER_ID, DRIVER_STATE, ERROR, HOST_PORT, WORKER_ID} import org.apache.spark.internal.config.Network.RPC_ASK_TIMEOUT import org.apache.spark.resource.ResourceUtils import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} @@ -163,11 +163,12 @@ private class ClientEndpoint( // logs again when waitAppCompletion is set to true if (!driverStatusReported) { driverStatusReported = true - logInfo(s"State of $submittedDriverID is ${state.get}") + logInfo(log"State of ${MDC(DRIVER_ID, submittedDriverID)}" + + log" is ${MDC(DRIVER_STATE, state.get)}") // Worker node, if present (workerId, workerHostPort, state) match { case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) => - logInfo(s"Driver running on $hostPort ($id)") + logInfo(log"Driver running on ${MDC(HOST_PORT, hostPort)} (${MDC(WORKER_ID, id)})") case _ => } } @@ -180,17 +181,18 @@ private class ClientEndpoint( state.get match { case DriverState.FINISHED | DriverState.FAILED | DriverState.ERROR | DriverState.KILLED => - logInfo(s"State of driver $submittedDriverID is ${state.get}, " + - s"exiting spark-submit JVM.") + logInfo(log"State of driver ${MDC(DRIVER_ID, submittedDriverID)}" + + log" is ${MDC(DRIVER_STATE, state.get)}, exiting spark-submit JVM.") System.exit(0) case _ => if (!waitAppCompletion) { - logInfo(s"spark-submit not configured to wait for completion, " + - s"exiting spark-submit JVM.") + logInfo("spark-submit not configured to wait for completion, " + + "exiting spark-submit JVM.") System.exit(0) } else { - logDebug(s"State of driver $submittedDriverID is ${state.get}, " + - s"continue monitoring driver status.") + logDebug(log"State of driver ${MDC(DRIVER_ID, submittedDriverID)}" + + log" is ${MDC(DRIVER_STATE, state.get)}, " + + log"continue monitoring driver status.") } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala index a56fbd5a644ae..3ce5e2d62b6a0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala @@ -23,7 +23,8 @@ import java.util.concurrent.CountDownLatch import scala.jdk.CollectionConverters._ import org.apache.spark.{SecurityManager, SparkConf} -import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.{config, Logging, MDC} +import org.apache.spark.internal.LogKeys.{AUTH_ENABLED, PORT, SHUFFLE_DB_BACKEND_KEY, SHUFFLE_DB_BACKEND_NAME} import
org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances} import org.apache.spark.network.TransportContext import org.apache.spark.network.crypto.AuthServerBootstrap @@ -86,8 +87,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana if (sparkConf.get(config.SHUFFLE_SERVICE_DB_ENABLED) && enabled) { val shuffleDBName = sparkConf.get(config.SHUFFLE_SERVICE_DB_BACKEND) val dbBackend = DBBackend.byName(shuffleDBName) - logInfo(s"Use ${dbBackend.name()} as the implementation of " + - s"${config.SHUFFLE_SERVICE_DB_BACKEND.key}") + logInfo(log"Use ${MDC(SHUFFLE_DB_BACKEND_NAME, dbBackend.name())} as the implementation of " + + log"${MDC(SHUFFLE_DB_BACKEND_KEY, config.SHUFFLE_SERVICE_DB_BACKEND.key)}") new ExternalBlockHandler(conf, findRegisteredExecutorsDBFile(dbBackend.fileName(registeredExecutorsDB))) } else { @@ -106,7 +107,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana def start(): Unit = { require(server == null, "Shuffle server already started") val authEnabled = securityManager.isAuthenticationEnabled() - logInfo(s"Starting shuffle service on port $port (auth enabled = $authEnabled)") + logInfo(log"Starting shuffle service on port ${MDC(PORT, port)}" + + log" (auth enabled = ${MDC(AUTH_ENABLED, authEnabled)})") val bootstraps: Seq[TransportServerBootstrap] = if (authEnabled) { Seq(new AuthServerBootstrap(transportConf, securityManager)) diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala index d6a50ff84f562..c4e582160f8ec 100644 --- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala +++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala @@ -35,7 +35,8 @@ import org.json4s.jackson.JsonMethods import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.deploy.master.RecoveryState -import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.{config, Logging, MDC} +import org.apache.spark.internal.LogKeys._ import org.apache.spark.util.{ThreadUtils, Utils} /** @@ -187,7 +188,7 @@ private object FaultToleranceTest extends App with Logging { fn numPassed += 1 logInfo("==============================================") - logInfo("Passed: " + name) + logInfo(log"Passed: ${MDC(TEST_NAME, name)}") logInfo("==============================================") } catch { case e: Exception => @@ -201,12 +202,12 @@ private object FaultToleranceTest extends App with Logging { } private def addMasters(num: Int): Unit = { - logInfo(s">>>>> ADD MASTERS $num <<<<<") + logInfo(log">>>>> ADD MASTERS ${MDC(NUM_ADDED_MASTERS, num)} <<<<<") (1 to num).foreach { _ => masters += SparkDocker.startMaster(dockerMountDir) } } private def addWorkers(num: Int): Unit = { - logInfo(s">>>>> ADD WORKERS $num <<<<<") + logInfo(log">>>>> ADD WORKERS ${MDC(NUM_ADDED_WORKERS, num)} <<<<<") val masterUrls = getMasterUrls(masters.toSeq) (1 to num).foreach { _ => workers += SparkDocker.startWorker(dockerMountDir, masterUrls) } } @@ -334,8 +335,9 @@ private object FaultToleranceTest extends App with Logging { } } - logInfo("Ran %s tests, %s passed and %s failed".format(numPassed + numFailed, numPassed, - numFailed)) + logInfo(log"Ran ${MDC(NUM_TESTS, numPassed + numFailed)} tests," + + log" ${MDC(NUM_PASSED_TESTS, numPassed)} passed" + + log" and ${MDC(NUM_FAILED_TESTS, numFailed)} failed") } private class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile: File) diff 
--git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala index 863bcd5b12d35..5795629819de3 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala @@ -29,7 +29,8 @@ import org.apache.hadoop.fs.Path import org.apache.spark._ import org.apache.spark.broadcast.Broadcast import org.apache.spark.errors.SparkCoreErrors -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys.{FINAL_OUTPUT_PATH, TEMP_OUTPUT_PATH, TOTAL_TIME} import org.apache.spark.internal.config.{BUFFER_SIZE, CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME, CHECKPOINT_COMPRESS} import org.apache.spark.io.CompressionCodec import org.apache.spark.util.{SerializableConfiguration, Utils} @@ -172,7 +173,7 @@ private[spark] object ReliableCheckpointRDD extends Logging { val checkpointDurationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - checkpointStartTimeNs) - logInfo(s"Checkpointing took $checkpointDurationMs ms.") + logInfo(log"Checkpointing took ${MDC(TOTAL_TIME, checkpointDurationMs)} ms.") val newRDD = new ReliableCheckpointRDD[T]( sc, checkpointDirPath.toString, originalRDD.partitioner) @@ -219,7 +220,7 @@ private[spark] object ReliableCheckpointRDD extends Logging { } (catchBlock = { val deleted = fs.delete(tempOutputPath, false) if (!deleted) { - logInfo(s"Failed to delete tempOutputPath $tempOutputPath.") + logInfo(log"Failed to delete tempOutputPath ${MDC(TEMP_OUTPUT_PATH, tempOutputPath)}.") } }, finallyBlock = { serializeStream.close() @@ -227,12 +228,13 @@ private[spark] object ReliableCheckpointRDD extends Logging { if (!fs.rename(tempOutputPath, finalOutputPath)) { if (!fs.exists(finalOutputPath)) { - logInfo(s"Deleting tempOutputPath $tempOutputPath") + logInfo(log"Deleting tempOutputPath ${MDC(TEMP_OUTPUT_PATH, tempOutputPath)}") fs.delete(tempOutputPath, false) throw SparkCoreErrors.checkpointFailedToSaveError(ctx.attemptNumber(), finalOutputPath) } else { // Some other copy of this task must've finished before us and renamed it - logInfo(s"Final output path $finalOutputPath already exists; not overwriting it") + logInfo(log"Final output path" + + log" ${MDC(FINAL_OUTPUT_PATH, finalOutputPath)} already exists; not overwriting it") if (!fs.delete(tempOutputPath, false)) { logWarning(s"Error deleting ${tempOutputPath}") } diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala index 0d1bc1425161e..b468a38fcf229 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala @@ -23,7 +23,8 @@ import org.apache.hadoop.fs.Path import org.apache.spark._ import org.apache.spark.errors.SparkCoreErrors -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys.{NEW_RDD_ID, RDD_CHECKPOINT_DIR, RDD_ID} import org.apache.spark.internal.config.CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS /** @@ -66,7 +67,8 @@ private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private v } } - logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}") + logInfo(log"Done checkpointing RDD ${MDC(RDD_ID, rdd.id)}" + + log" to ${MDC(RDD_CHECKPOINT_DIR, 
cpDir)}, new parent is RDD ${MDC(NEW_RDD_ID, newRDD.id)}") newRDD } diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 5e567a891d587..902de71f324b2 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -40,7 +40,8 @@ import org.json4s.JValue import org.json4s.jackson.JsonMethods.{pretty, render} import org.apache.spark.{SecurityManager, SparkConf, SSLOptions} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys.{HOST, PORT, SERVER_NAME, SERVLET_CONTEXT_HANDLER_PATH, UI_FILTER} import org.apache.spark.internal.config.UI._ import org.apache.spark.util.Utils @@ -247,7 +248,8 @@ private[spark] object JettyUtils extends Logging { poolSize: Int = 200): ServerInfo = { val stopTimeout = conf.get(UI_JETTY_STOP_TIMEOUT) - logInfo(s"Start Jetty $hostName:$port for $serverName") + logInfo(log"Start Jetty ${MDC(HOST, hostName)}:${MDC(PORT, port)}" + + log" for ${MDC(SERVER_NAME, serverName)}") // Start the server first, with no connectors. val pool = new QueuedThreadPool(poolSize) if (serverName.nonEmpty) { @@ -555,7 +557,9 @@ private[spark] case class ServerInfo( */ private def addFilters(handler: ServletContextHandler, securityMgr: SecurityManager): Unit = { conf.get(UI_FILTERS).foreach { filter => - logInfo(s"Adding filter to ${handler.getContextPath()}: $filter") + logInfo(log"Adding filter to" + + log" ${MDC(SERVLET_CONTEXT_HANDLER_PATH, handler.getContextPath())}:" + + log" ${MDC(UI_FILTER, filter)}") val oldParams = conf.getOption(s"spark.$filter.params").toSeq .flatMap(Utils.stringToSeq) .flatMap { param => diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index ac76f74725e89..b8d422c9d9fbb 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -24,7 +24,7 @@ import org.eclipse.jetty.servlet.ServletContextHandler import org.apache.spark.{SecurityManager, SparkConf, SparkContext} import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKeys.CLASS_NAME +import org.apache.spark.internal.LogKeys.{CLASS_NAME, WEB_URL} import org.apache.spark.internal.config.DRIVER_LOG_LOCAL_DIR import org.apache.spark.internal.config.UI._ import org.apache.spark.scheduler._ @@ -164,7 +164,7 @@ private[spark] class SparkUI private ( /** Stop the server behind this web interface. Only valid after bind(). 
*/ override def stop(): Unit = { super.stop() - logInfo(s"Stopped Spark web UI at $webUrl") + logInfo(log"Stopped Spark web UI at ${MDC(WEB_URL, webUrl)}") } override def withSparkUI[T](appId: String, attemptId: Option[String])(fn: SparkUI => T): T = { diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala index 51499870a6a30..4f01cd6ac2136 100644 --- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala @@ -27,7 +27,7 @@ import com.codahale.metrics.Timer import org.apache.spark.SparkEnv import org.apache.spark.internal.{config, Logging, MDC} -import org.apache.spark.internal.LogKeys.LISTENER +import org.apache.spark.internal.LogKeys.{EVENT, LISTENER, TOTAL_TIME} import org.apache.spark.scheduler.EventLoggingListener import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate @@ -132,8 +132,9 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { if (maybeTimerContext != null) { val elapsed = maybeTimerContext.stop() if (logSlowEventEnabled && elapsed > logSlowEventThreshold) { - logInfo(s"Process of event ${redactEvent(event)} by listener ${listenerName} took " + - s"${elapsed / 1000000000d}s.") + logInfo(log"Process of event ${MDC(EVENT, redactEvent(event))} by " + + log"listener ${MDC(LISTENER, listenerName)} took " + + log"${MDC(TOTAL_TIME, elapsed / 1000000d)}ms.") } } } diff --git a/core/src/main/scala/org/apache/spark/util/SignalUtils.scala b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala index 775dc44fc1a13..01850027bf1bb 100644 --- a/core/src/main/scala/org/apache/spark/util/SignalUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala @@ -25,7 +25,8 @@ import org.apache.commons.lang3.SystemUtils import org.slf4j.Logger import sun.misc.{Signal, SignalHandler} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys.SIGNAL /** * Contains utilities for working with posix signals.
@@ -79,7 +80,7 @@ private[spark] object SignalUtils extends Logging { action: => Boolean): Unit = synchronized { try { val handler = handlers.getOrElseUpdate(signal, { - logInfo(s"Registering signal handler for $signal") + logInfo(log"Registering signal handler for ${MDC(SIGNAL, signal)}") new ActionHandler(new Signal(signal)) }) handler.register(action) diff --git a/repl/src/test/scala/org/apache/spark/repl/SparkShellSuite.scala b/repl/src/test/scala/org/apache/spark/repl/SparkShellSuite.scala index 067f08cb67528..6f54b924ee177 100644 --- a/repl/src/test/scala/org/apache/spark/repl/SparkShellSuite.scala +++ b/repl/src/test/scala/org/apache/spark/repl/SparkShellSuite.scala @@ -26,6 +26,8 @@ import scala.concurrent.duration._ import org.apache.spark.ProcessTestUtils.ProcessOutputCapturer import org.apache.spark.SparkFunSuite +import org.apache.spark.internal.LogKeys._ +import org.apache.spark.internal.MDC import org.apache.spark.util.ThreadUtils class SparkShellSuite extends SparkFunSuite { @@ -68,7 +70,7 @@ class SparkShellSuite extends SparkFunSuite { val lock = new Object def captureOutput(source: String)(line: String): Unit = lock.synchronized { val newLine = s"$source> $line" - logInfo(newLine) + logInfo(log"${MDC(STREAM_SOURCE, source)}> ${MDC(OUTPUT_LINE, line)}") buffer += newLine @@ -79,7 +81,9 @@ class SparkShellSuite extends SparkFunSuite { // If we haven't found all expected answers and another expected answer comes up... if (next < expectedAnswers.size && line.contains(expectedAnswers(next))) { - logInfo(s"$source> found expected output line $next: '${expectedAnswers(next)}'") + logInfo(log"${MDC(STREAM_SOURCE, source)}> found expected" + + log" output line ${MDC(OUTPUT_LINE_NUMBER, next)}:" + + log" '${MDC(EXPECTED_ANSWER, expectedAnswers(next))}'") next += 1 // If all expected answers have been found... if (next == expectedAnswers.size) {
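Every hunk above applies the same migration pattern: plain s"..." interpolation is replaced by the structured log"..." interpolator, each dynamic value is wrapped in MDC(<LogKey>, value), and longer messages are built by concatenating log"..." fragments with +. The sketch below is illustrative only — the RetryReporter class and its parameters are invented for the example — but the imports and the MAX_ATTEMPTS/REASON keys mirror ones already present in this diff, assuming the log interpolator and the logInfo overload behave as shown in the hunks above.

    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKeys.{MAX_ATTEMPTS, REASON}

    // Hypothetical example class, not part of this change.
    class RetryReporter extends Logging {
      def report(numRetries: Int, reason: String): Unit = {
        // Before the migration this would have been:
        //   logInfo(s"Giving up after $numRetries attempts: $reason")
        // After: each interpolated value carries a LogKey, so it is emitted as
        // structured context instead of being flattened into the message text.
        logInfo(log"Giving up after ${MDC(MAX_ATTEMPTS, numRetries)}" +
          log" attempts: ${MDC(REASON, reason)}")
      }
    }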