New THREAD_DUMP_COLLECTOR_PATTERN parameter and timestamp formatting
roczei committed Feb 28, 2025
1 parent 2bb6398 commit ccf4e0f
Showing 2 changed files with 35 additions and 22 deletions.
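Note: a minimal sketch of how the new setting could be supplied through SparkConf. Only the key "spark.threadDumpCollector.include.regex" comes from this commit; the application setup around it is an illustrative assumption.

  import org.apache.spark.SparkConf

  // Write thread-dump files only when the captured dump text matches this
  // regex (assumed sample pattern; any valid Scala/Java regex works).
  val conf = new SparkConf()
    .setAppName("thread-dump-demo")
    .set("spark.threadDumpCollector.include.regex", ".*BLOCKED.*")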
48 changes: 27 additions & 21 deletions core/src/main/scala/org/apache/spark/ThreadDumpCollector.scala
@@ -18,7 +18,8 @@
 package org.apache.spark
 
 import java.nio.charset.StandardCharsets
-import java.time.Instant
+import java.time.LocalDateTime
+import java.time.format.DateTimeFormatter
 import java.util.concurrent.TimeUnit
 
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -27,7 +28,7 @@ import org.apache.hadoop.fs.permission.FsPermission
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.config.{THREAD_DUMP_COLLECTOR_DIR,
-  THREAD_DUMP_COLLECTOR_OUTPUT_TYPE}
+  THREAD_DUMP_COLLECTOR_OUTPUT_TYPE, THREAD_DUMP_COLLECTOR_PATTERN}
 import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
@@ -76,27 +77,32 @@ private[spark] object ThreadDumpCollector extends Logging {
   }
 
   private def writeThreadDumpsToFile(env: SparkEnv): Unit = {
-    val timestamp = Instant.now().getEpochSecond().toString()
+    val regexPattern = env.conf.get(THREAD_DUMP_COLLECTOR_PATTERN).r
+    val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH_mm_ss")
+    val timestamp = LocalDateTime.now.format(formatter)
     val threadDumpFileName = env.conf.getAppId + "-" + env.executorId + "-" + timestamp + ".txt"
     val collectedThreadDump = Utils.getThreadDump().map(_.toString).mkString
-    val hadoopConf = SparkHadoopUtil.get.newConfiguration(env.conf)
-    val rootDir = env.conf.get(THREAD_DUMP_COLLECTOR_DIR)
-    val fileSystem: FileSystem = new Path(rootDir).getFileSystem(hadoopConf)
-    val threadDumpFilePermissions = new FsPermission(Integer.parseInt("770", 8).toShort)
-    val dfsLogFile: Path = fileSystem.makeQualified(new Path(rootDir, threadDumpFileName))
-    try {
-      val outputStream = SparkHadoopUtil.createFile(fileSystem, dfsLogFile, true)
-      fileSystem.setPermission(dfsLogFile, threadDumpFilePermissions)
-      outputStream.write(collectedThreadDump.getBytes(StandardCharsets
-        .UTF_8))
-      outputStream.close()
-    } catch {
-      case e: Exception =>
-        logError(
-          log"Could not save thread dumps into file from executor ${
-            MDC(LogKeys.EXECUTOR_ID,
-              env.executorId)
-          }", e)
+    val anyPatternMatches = regexPattern.findFirstIn(collectedThreadDump).isDefined
+    if (anyPatternMatches) {
+      val hadoopConf = SparkHadoopUtil.get.newConfiguration(env.conf)
+      val rootDir = env.conf.get(THREAD_DUMP_COLLECTOR_DIR)
+      val fileSystem: FileSystem = new Path(rootDir).getFileSystem(hadoopConf)
+      val threadDumpFilePermissions = new FsPermission(Integer.parseInt("770", 8).toShort)
+      val dfsLogFile: Path = fileSystem.makeQualified(new Path(rootDir, threadDumpFileName))
+      try {
+        val outputStream = SparkHadoopUtil.createFile(fileSystem, dfsLogFile, true)
+        fileSystem.setPermission(dfsLogFile, threadDumpFilePermissions)
+        outputStream.write(collectedThreadDump.getBytes(StandardCharsets
+          .UTF_8))
+        outputStream.close()
+      } catch {
+        case e: Exception =>
+          logError(
+            log"Could not save thread dumps into file from executor ${
+              MDC(LogKeys.EXECUTOR_ID,
+                env.executorId)
+            }", e)
+      }
     }
   }
 }
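For context, a self-contained sketch of the behavior the hunk above adds, runnable outside Spark; the dump text and pattern are assumed sample values:

  import java.time.LocalDateTime
  import java.time.format.DateTimeFormatter

  object ThreadDumpGateSketch {
    def main(args: Array[String]): Unit = {
      // Same filename timestamp pattern as the commit, e.g. "2025-02-28_14_03_59".
      val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH_mm_ss")
      val timestamp = LocalDateTime.now.format(formatter)

      // Assumed sample dump text and user-supplied pattern.
      val collectedThreadDump = "\"task-thread-1\" BLOCKED on java.lang.Object@1f2e3d"
      val regexPattern = ".*BLOCKED.*".r

      // The commit writes a file only when the regex matches somewhere in the dump.
      if (regexPattern.findFirstIn(collectedThreadDump).isDefined) {
        println(s"would write app-1-exec-1-$timestamp.txt")
      }
    }
  }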
9 changes: 8 additions & 1 deletion core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2870,6 +2870,13 @@ package object config {
       .stringConf
       .transform(_.toUpperCase(Locale.ROOT))
       .checkValues(Set("LOG", "FILE"))
-      .createWithDefault("LOG")
+      .createWithDefault("FILE")
+
+  private[spark] val THREAD_DUMP_COLLECTOR_PATTERN =
+    ConfigBuilder("spark.threadDumpCollector.include.regex")
+      .doc("Regular expression for determining which thread dumps will be captured")
+      .version("4.0.0")
+      .stringConf
+      .createWithDefault(".*")
 
 }
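Worth noting about the ".*" default above: it finds a match in every string, including an empty dump, so filtering is strictly opt-in and the previous write-always behavior is preserved. A quick check:

  // ".*" matches (at least the empty prefix of) any input, so the default
  // regex never suppresses a dump.
  assert(".*".r.findFirstIn("").isDefined)
  assert(".*".r.findFirstIn("arbitrary thread dump text").isDefined)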
