Skip to content

Commit

Permalink
Migrate Spark Core LogInfo Part 1
Browse files Browse the repository at this point in the history
  • Loading branch information
zeotuan committed May 3, 2024
1 parent ff401dd commit c9e0209
Show file tree
Hide file tree
Showing 20 changed files with 231 additions and 118 deletions.
53 changes: 53 additions & 0 deletions common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ trait LogKey {
*/
object LogKeys {
case object ACCUMULATOR_ID extends LogKey
case object ACTUAL_BROADCAST_OUTPUT_STATUS_SIZE extends LogKey
case object ACTUAL_NUM_FILES extends LogKey
case object ACTUAL_PARTITION_COLUMN extends LogKey
case object ADDED_JARS extends LogKey
case object ADDED_JARS_MESSAGE extends LogKey
case object AGGREGATE_FUNCTIONS extends LogKey
case object ALPHA extends LogKey
case object ANALYSIS_ERROR extends LogKey
Expand All @@ -42,7 +45,10 @@ object LogKeys {
case object APP_NAME extends LogKey
case object APP_STATE extends LogKey
case object ARGS extends LogKey
case object AUTH_ENABLED extends LogKey
case object BACKUP_FILE extends LogKey
case object BARRIER_EPOCH extends LogKey
case object BARRIER_ID extends LogKey
case object BATCH_ID extends LogKey
case object BATCH_NAME extends LogKey
case object BATCH_TIMESTAMP extends LogKey
Expand All @@ -52,13 +58,15 @@ object LogKeys {
case object BOOT extends LogKey
case object BROADCAST extends LogKey
case object BROADCAST_ID extends LogKey
case object BROADCAST_OUTPUT_STATUS_SIZE extends LogKey
case object BUCKET extends LogKey
case object BYTECODE_SIZE extends LogKey
case object CACHED_TABLE_PARTITION_METADATA_SIZE extends LogKey
case object CACHE_AUTO_REMOVED_SIZE extends LogKey
case object CACHE_UNTIL_HIGHEST_CONSUMED_SIZE extends LogKey
case object CACHE_UNTIL_LAST_PRODUCED_SIZE extends LogKey
case object CALL_SITE_LONG_FORM extends LogKey
case object CALL_SITE_SHORT_FORM extends LogKey
case object CATALOG_NAME extends LogKey
case object CATEGORICAL_FEATURES extends LogKey
case object CHECKPOINT_FILE extends LogKey
Expand Down Expand Up @@ -118,6 +126,7 @@ object LogKeys {
case object DATA_FILE_NUM extends LogKey
case object DATA_SOURCE extends LogKey
case object DATA_SOURCES extends LogKey

case object DEFAULT_COMPACTION_INTERVAL extends LogKey
case object DEFAULT_COMPACT_INTERVAL extends LogKey
case object DEFAULT_ISOLATION_LEVEL extends LogKey
Expand All @@ -126,11 +135,13 @@ object LogKeys {
case object DELEGATE extends LogKey
case object DELTA extends LogKey
case object DESCRIPTION extends LogKey
case object DESTINATION_ABSOLUTE_PATH extends LogKey
case object DESIRED_NUM_PARTITIONS extends LogKey
case object DFS_FILE extends LogKey
case object DIFF_DELTA extends LogKey
case object DIVISIBLE_CLUSTER_INDICES_SIZE extends LogKey
case object DRIVER_ID extends LogKey
case object DRIVER_STATE extends LogKey
case object DROPPED_PARTITIONS extends LogKey
case object DURATION extends LogKey
case object EARLIEST_LOADED_VERSION extends LogKey
Expand All @@ -145,6 +156,7 @@ object LogKeys {
case object ERROR extends LogKey
case object ESTIMATOR_PARAMETER_MAP extends LogKey
case object EVALUATED_FILTERS extends LogKey
case object EVENT extends LogKey
case object EVENT_LOOP extends LogKey
case object EVENT_QUEUE extends LogKey
case object EXCEPTION extends LogKey
Expand All @@ -162,7 +174,9 @@ object LogKeys {
case object EXECUTOR_STATE extends LogKey
case object EXECUTOR_TARGET_COUNT extends LogKey
case object EXISTING_FILE extends LogKey
case object EXISTING_JARS extends LogKey
case object EXIT_CODE extends LogKey
case object EXPECTED_ANSWER extends LogKey
case object EXPECTED_NUM_FILES extends LogKey
case object EXPECTED_PARTITION_COLUMN extends LogKey
case object EXPIRY_TIMESTAMP extends LogKey
Expand All @@ -185,6 +199,7 @@ object LogKeys {
case object FILE_VERSION extends LogKey
case object FILTER extends LogKey
case object FILTERS extends LogKey
case object FINAL_OUTPUT_PATH extends LogKey
case object FINAL_PATH extends LogKey
case object FINISH_TRIGGER_DURATION extends LogKey
case object FROM_OFFSET extends LogKey
Expand Down Expand Up @@ -218,6 +233,7 @@ object LogKeys {
case object INPUT extends LogKey
case object INTERVAL extends LogKey
case object ISOLATION_LEVEL extends LogKey
case object JAVA_VERSION extends LogKey
case object JOB_ID extends LogKey
case object JOIN_CONDITION extends LogKey
case object JOIN_CONDITION_SUB_EXPR extends LogKey
Expand Down Expand Up @@ -245,10 +261,12 @@ object LogKeys {
case object LOGICAL_PLAN_COLUMNS extends LogKey
case object LOGICAL_PLAN_LEAVES extends LogKey
case object LOG_ID extends LogKey
case object LOG_KEY_FILE extends LogKey
case object LOG_OFFSET extends LogKey
case object LOG_TYPE extends LogKey
case object LOWER_BOUND extends LogKey
case object MALFORMATTED_STIRNG extends LogKey
case object MAP_ID extends LogKey
case object MASTER_URL extends LogKey
case object MAX_ATTEMPTS extends LogKey
case object MAX_CACHE_UNTIL_HIGHEST_CONSUMED_SIZE extends LogKey
Expand Down Expand Up @@ -282,18 +300,22 @@ object LogKeys {
case object NEW_FEATURE_COLUMN_NAME extends LogKey
case object NEW_LABEL_COLUMN_NAME extends LogKey
case object NEW_PATH extends LogKey
case object NEW_RDD_ID extends LogKey
case object NEW_VALUE extends LogKey
case object NODES extends LogKey
case object NODE_LOCATION extends LogKey
case object NORM extends LogKey
case object NUM_ADDED_MASTERS extends LogKey
case object NUM_ADDED_PARTITIONS extends LogKey
case object NUM_ADDED_WORKERS extends LogKey
case object NUM_BIN extends LogKey
case object NUM_BYTES extends LogKey
case object NUM_CLASSES extends LogKey
case object NUM_COLUMNS extends LogKey
case object NUM_CONCURRENT_WRITER extends LogKey
case object NUM_DROPPED_PARTITIONS extends LogKey
case object NUM_EXAMPLES extends LogKey
case object NUM_FAILED_TESTS extends LogKey
case object NUM_FEATURES extends LogKey
case object NUM_FILES extends LogKey
case object NUM_FILES_COPIED extends LogKey
Expand All @@ -304,13 +326,16 @@ object LogKeys {
case object NUM_LEFT_PARTITION_VALUES extends LogKey
case object NUM_LOADED_ENTRIES extends LogKey
case object NUM_LOCAL_FREQUENT_PATTERN extends LogKey
case object NUM_PASSED_TESTS extends LogKey
case object NUM_PARTITIONS extends LogKey
case object NUM_PARTITION_VALUES extends LogKey
case object NUM_POINT extends LogKey
case object NUM_PREFIXES extends LogKey
case object NUM_PRUNED extends LogKey
case object NUM_RIGHT_PARTITION_VALUES extends LogKey
case object NUM_REQUEST_SYNC_TASK extends LogKey
case object NUM_SEQUENCES extends LogKey
case object NUM_TESTS extends LogKey
case object NUM_VERSIONS_RETAIN extends LogKey
case object OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD extends LogKey
case object OBJECT_ID extends LogKey
Expand All @@ -325,7 +350,12 @@ object LogKeys {
case object OPTIONS extends LogKey
case object OP_ID extends LogKey
case object OP_TYPE extends LogKey
case object OS_ARCH extends LogKey
case object OS_NAME extends LogKey
case object OS_VERSION extends LogKey
case object OUTPUT extends LogKey
case object OUTPUT_LINE extends LogKey
case object OUTPUT_LINE_NUMBER extends LogKey
case object OVERHEAD_MEMORY_SIZE extends LogKey
case object PARSE_MODE extends LogKey
case object PARTITIONED_FILE_READER extends LogKey
Expand Down Expand Up @@ -357,6 +387,9 @@ object LogKeys {
case object PUSHED_FILTERS extends LogKey
case object PVC_METADATA_NAME extends LogKey
case object PYTHON_EXEC extends LogKey
case object PYTHON_VERSION extends LogKey
case object PYTHON_WORKER_MODULE extends LogKey
case object PYTHON_WORKER_RESPONSE extends LogKey
case object QUERY_CACHE_VALUE extends LogKey
case object QUERY_HINT extends LogKey
case object QUERY_ID extends LogKey
Expand All @@ -366,7 +399,9 @@ object LogKeys {
case object QUERY_PLAN_LENGTH_MAX extends LogKey
case object QUERY_RUN_ID extends LogKey
case object RANGE extends LogKey
case object RDD_CHECKPOINT_DIR extends LogKey
case object RDD_ID extends LogKey
case object RDD_DEBUG_STRING extends LogKey
case object READ_LIMIT extends LogKey
case object REASON extends LogKey
case object REATTACHABLE extends LogKey
Expand All @@ -382,6 +417,7 @@ object LogKeys {
case object RELATIVE_TOLERANCE extends LogKey
case object REMAINING_PARTITIONS extends LogKey
case object REPORT_DETAILS extends LogKey
case object REQUESTER_SIZE extends LogKey
case object RESOURCE extends LogKey
case object RESOURCE_NAME extends LogKey
case object RESOURCE_PROFILE_ID extends LogKey
Expand All @@ -399,22 +435,30 @@ object LogKeys {
case object RUN_ID extends LogKey
case object SCHEMA extends LogKey
case object SCHEMA2 extends LogKey
case object SERIALIZE_OUTPUT_LENGTH extends LogKey
case object SERVER_NAME extends LogKey
case object SERVICE_NAME extends LogKey
case object SERVLET_CONTEXT_HANDLER_PATH extends LogKey
case object SESSION_HOLD_INFO extends LogKey
case object SESSION_ID extends LogKey
case object SESSION_KEY extends LogKey
case object SHARD_ID extends LogKey
case object SHUFFLE_BLOCK_INFO extends LogKey
case object SHUFFLE_DB_BACKEND_KEY extends LogKey
case object SHUFFLE_DB_BACKEND_NAME extends LogKey
case object SHUFFLE_ID extends LogKey
case object SHUFFLE_MERGE_ID extends LogKey
case object SHUFFLE_SERVICE_NAME extends LogKey
case object SIZE extends LogKey
case object SIGNAL extends LogKey
case object SLEEP_TIME extends LogKey
case object SLIDE_DURATION extends LogKey
case object SMALLEST_CLUSTER_INDEX extends LogKey
case object SNAPSHOT_VERSION extends LogKey
case object SOURCE_ABSOLUTE_PATH extends LogKey
case object SPARK_DATA_STREAM extends LogKey
case object SPARK_PLAN_ID extends LogKey
case object SPARK_VERSION extends LogKey
case object SQL_TEXT extends LogKey
case object SRC_PATH extends LogKey
case object STAGE_ATTEMPT extends LogKey
Expand All @@ -426,6 +470,7 @@ object LogKeys {
case object STATE_STORE_VERSION extends LogKey
case object STATUS extends LogKey
case object STDERR extends LogKey
case object STOP_SITE_SHORT_FORM extends LogKey
case object STORAGE_LEVEL extends LogKey
case object STORAGE_LEVEL_DESERIALIZED extends LogKey
case object STORAGE_LEVEL_REPLICATION extends LogKey
Expand All @@ -440,26 +485,32 @@ object LogKeys {
case object STREAMING_WRITE extends LogKey
case object STREAM_ID extends LogKey
case object STREAM_NAME extends LogKey
case object STREAM_SOURCE extends LogKey
case object SUBMISSION_ID extends LogKey
case object SUBSAMPLING_RATE extends LogKey
case object SUB_QUERY extends LogKey
case object TABLE_NAME extends LogKey
case object TABLE_TYPES extends LogKey
case object TARGET_NUM_EXECUTOR extends LogKey
case object TARGET_NUM_EXECUTOR_DELTA extends LogKey
case object TARGET_PATH extends LogKey
case object TASK_ATTEMPT_ID extends LogKey
case object TASK_ID extends LogKey
case object TASK_NAME extends LogKey
case object TASK_SET_NAME extends LogKey
case object TASK_STATE extends LogKey
case object TEMP_FILE extends LogKey
case object TEMP_OUTPUT_PATH extends LogKey
case object TEMP_PATH extends LogKey
case object TEST_NAME extends LogKey
case object TEST_SIZE extends LogKey
case object THREAD extends LogKey
case object THREAD_NAME extends LogKey
case object TID extends LogKey
case object TIME extends LogKey
case object TIMEOUT extends LogKey
case object TIMER extends LogKey
case object TIMESTAMP extends LogKey
case object TIME_UNITS extends LogKey
case object TIP extends LogKey
case object TOKEN_REGEX extends LogKey
Expand Down Expand Up @@ -499,7 +550,9 @@ object LogKeys {
case object WAIT_SEND_TIME extends LogKey
case object WAIT_TIME extends LogKey
case object WATERMARK_CONSTRAINT extends LogKey
case object WEB_URL extends LogKey
case object WEIGHTED_NUM extends LogKey
case object WORKER_ID extends LogKey
case object WORKER_URL extends LogKey
case object WRITE_AHEAD_LOG_INFO extends LogKey
case object WRITE_JOB_UUID extends LogKey
Expand Down
17 changes: 11 additions & 6 deletions core/src/main/scala/org/apache/spark/BarrierCoordinator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import java.util.function.Consumer

import scala.collection.mutable.{ArrayBuffer, HashSet}

import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted}
import org.apache.spark.util.ThreadUtils
Expand Down Expand Up @@ -161,7 +162,8 @@ private[spark] class BarrierCoordinator(
s"${request.numTasks} from Task $taskId, previously it was $numTasks.")

// Check whether the epoch from the barrier tasks matches current barrierEpoch.
logInfo(s"Current barrier epoch for $barrierId is $barrierEpoch.")
logInfo(log"Current barrier epoch for ${MDC(BARRIER_ID, barrierId)}" +
log" is ${MDC(BARRIER_EPOCH, barrierEpoch)}.")
if (epoch != barrierEpoch) {
requester.sendFailure(new SparkException(s"The request to sync of $barrierId with " +
s"barrier epoch $barrierEpoch has already finished. Maybe task $taskId is not " +
Expand All @@ -176,14 +178,17 @@ private[spark] class BarrierCoordinator(
// Add the requester to array of RPCCallContexts pending for reply.
requesters += requester
messages(request.partitionId) = request.message
logInfo(s"Barrier sync epoch $barrierEpoch from $barrierId received update from Task " +
s"$taskId, current progress: ${requesters.size}/$numTasks.")
logInfo(log"Barrier sync epoch ${MDC(BARRIER_EPOCH, barrierEpoch)}" +
log" from ${MDC(BARRIER_ID, barrierId)} received update from Task" +
log" ${MDC(TASK_ID, taskId)}, current progress:" +
log" ${MDC(REQUESTER_SIZE, requesters.size)}/${MDC(NUM_REQUEST_SYNC_TASK, numTasks)}.")
if (requesters.size == numTasks) {
requesters.foreach(_.reply(messages.clone()))
// Finished current barrier() call successfully, clean up ContextBarrierState and
// increase the barrier epoch.
logInfo(s"Barrier sync epoch $barrierEpoch from $barrierId received all updates from " +
s"tasks, finished successfully.")
logInfo(log"Barrier sync epoch ${MDC(BARRIER_EPOCH, barrierEpoch)}" +
log" from ${MDC(BARRIER_ID, barrierId)} received all updates from" +
log" tasks, finished successfully.")
barrierEpoch += 1
requesters.clear()
requestMethods.clear()
Expand Down
39 changes: 20 additions & 19 deletions core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@ package org.apache.spark

import java.util.{Properties, TimerTask}
import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}

import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success => ScalaSuccess, Try}

import scala.util.{Failure, Try, Success => ScalaSuccess}
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{LogEntry, Logging, MDC, MessageWithContext}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.metrics.source.Source
import org.apache.spark.resource.ResourceInformation
Expand Down Expand Up @@ -56,19 +55,27 @@ class BarrierTaskContext private[spark] (
// with the driver side epoch.
private var barrierEpoch = 0

/**
 * Logs an INFO-level progress line for this barrier task.
 *
 * The emitted line is built from the task attempt id, stage id and stage attempt number,
 * followed by `msg`, an optional "waited for N ms" clause, and the current barrier epoch.
 *
 * @param msg       fragment describing what the task is currently doing; it is placed
 *                  right after the "Task ... from Stage ...(Attempt ...) " prefix
 * @param startTime when defined, the milliseconds elapsed between this value and
 *                  `System.currentTimeMillis()` are reported; when empty, no wait clause
 *                  is logged
 */
private def logProgressInfo(msg: MessageWithContext, startTime: Option[Long]): Unit = {
  // Both branches end with a comma so that `msg` is always separated from the trailing
  // "current barrier epoch" clause, including when there is no wait time to report.
  val waitMsg = startTime.fold(log",")(st => log", waited " +
    log"for ${MDC(TOTAL_TIME, System.currentTimeMillis() - st)} ms,")
  logInfo(log"Task ${MDC(TASK_ATTEMPT_ID, taskAttemptId())}" +
    log" from Stage ${MDC(STAGE_ID, stageId())}" +
    log"(Attempt ${MDC(STAGE_ATTEMPT, stageAttemptNumber())}) " +
    msg + waitMsg +
    log" current barrier epoch is ${MDC(BARRIER_EPOCH, barrierEpoch)}.")
}

private def runBarrier(message: String, requestMethod: RequestMethod.Value): Array[String] = {
logInfo(s"Task ${taskAttemptId()} from Stage ${stageId()}(Attempt ${stageAttemptNumber()}) " +
s"has entered the global sync, current barrier epoch is $barrierEpoch.")
logProgressInfo(log"has entered the global sync", None)
logTrace("Current callSite: " + Utils.getCallSite())

val startTime = System.currentTimeMillis()
val timerTask = new TimerTask {
override def run(): Unit = {
logInfo(s"Task ${taskAttemptId()} from Stage ${stageId()}(Attempt " +
s"${stageAttemptNumber()}) waiting " +
s"under the global sync since $startTime, has been waiting for " +
s"${MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)} seconds, " +
s"current barrier epoch is $barrierEpoch.")
logProgressInfo(
log"waiting under the global sync since ${MDC(TIME, startTime)}",
Some(startTime)
)
}
}
// Log the update of global sync every 1 minute.
Expand Down Expand Up @@ -104,17 +111,11 @@ class BarrierTaskContext private[spark] (
val messages = abortableRpcFuture.future.value.get.get

barrierEpoch += 1
logInfo(s"Task ${taskAttemptId()} from Stage ${stageId()}(Attempt ${stageAttemptNumber()}) " +
s"finished global sync successfully, waited for " +
s"${MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)} seconds, " +
s"current barrier epoch is $barrierEpoch.")
logProgressInfo(log"finished global sync successfully", Some(startTime))
messages
} catch {
case e: SparkException =>
logInfo(s"Task ${taskAttemptId()} from Stage ${stageId()}(Attempt " +
s"${stageAttemptNumber()}) failed to perform global sync, waited for " +
s"${MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)} seconds, " +
s"current barrier epoch is $barrierEpoch.")
logProgressInfo(log"failed to perform global sync", Some(startTime))
throw e
} finally {
timerTask.cancel()
Expand Down
Loading

0 comments on commit c9e0209

Please sign in to comment.