diff --git a/core/src/main/scala/org/apache/spark/ThreadDumpCollector.scala b/core/src/main/scala/org/apache/spark/ThreadDumpCollector.scala
index 6c5fb9f85e2b3..3bcf91936481a 100644
--- a/core/src/main/scala/org/apache/spark/ThreadDumpCollector.scala
+++ b/core/src/main/scala/org/apache/spark/ThreadDumpCollector.scala
@@ -18,7 +18,8 @@ package org.apache.spark
 
 import java.nio.charset.StandardCharsets
-import java.time.Instant
+import java.time.LocalDateTime
+import java.time.format.DateTimeFormatter
 import java.util.concurrent.TimeUnit
 
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -27,7 +28,7 @@ import org.apache.hadoop.fs.permission.FsPermission
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.config.{THREAD_DUMP_COLLECTOR_DIR,
-  THREAD_DUMP_COLLECTOR_OUTPUT_TYPE}
+  THREAD_DUMP_COLLECTOR_OUTPUT_TYPE, THREAD_DUMP_COLLECTOR_PATTERN}
 import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
@@ -76,27 +77,32 @@ private[spark] object ThreadDumpCollector extends Logging {
   }
 
   private def writeThreadDumpsToFile(env: SparkEnv): Unit = {
-    val timestamp = Instant.now().getEpochSecond().toString()
+    val regexPattern = env.conf.get(THREAD_DUMP_COLLECTOR_PATTERN).r
+    val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH_mm_ss")
+    val timestamp = LocalDateTime.now.format(formatter)
     val threadDumpFileName = env.conf.getAppId + "-" + env.executorId + "-" + timestamp + ".txt"
     val collectedThreadDump = Utils.getThreadDump().map(_.toString).mkString
-    val hadoopConf = SparkHadoopUtil.get.newConfiguration(env.conf)
-    val rootDir = env.conf.get(THREAD_DUMP_COLLECTOR_DIR)
-    val fileSystem: FileSystem = new Path(rootDir).getFileSystem(hadoopConf)
-    val threadDumpFilePermissions = new FsPermission(Integer.parseInt("770", 8).toShort)
-    val dfsLogFile: Path = fileSystem.makeQualified(new Path(rootDir, threadDumpFileName))
-    try {
-      val outputStream = SparkHadoopUtil.createFile(fileSystem, dfsLogFile, true)
-      fileSystem.setPermission(dfsLogFile, threadDumpFilePermissions)
-      outputStream.write(collectedThreadDump.getBytes(StandardCharsets
-        .UTF_8))
-      outputStream.close()
-    } catch {
-      case e: Exception =>
-        logError(
-          log"Could not save thread dumps into file from executor ${
-            MDC(LogKeys.EXECUTOR_ID,
-              env.executorId)
-          }", e)
+    val anyPatternMatches = regexPattern.findFirstIn(collectedThreadDump).isDefined
+    if (anyPatternMatches) {
+      val hadoopConf = SparkHadoopUtil.get.newConfiguration(env.conf)
+      val rootDir = env.conf.get(THREAD_DUMP_COLLECTOR_DIR)
+      val fileSystem: FileSystem = new Path(rootDir).getFileSystem(hadoopConf)
+      val threadDumpFilePermissions = new FsPermission(Integer.parseInt("770", 8).toShort)
+      val dfsLogFile: Path = fileSystem.makeQualified(new Path(rootDir, threadDumpFileName))
+      try {
+        val outputStream = SparkHadoopUtil.createFile(fileSystem, dfsLogFile, true)
+        fileSystem.setPermission(dfsLogFile, threadDumpFilePermissions)
+        outputStream.write(collectedThreadDump.getBytes(StandardCharsets
+          .UTF_8))
+        outputStream.close()
+      } catch {
+        case e: Exception =>
+          logError(
+            log"Could not save thread dumps into file from executor ${
+              MDC(LogKeys.EXECUTOR_ID,
+                env.executorId)
+            }", e)
+      }
     }
   }
 }
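The behavioral change above is twofold: the filename timestamp moves from epoch seconds to a human-readable `yyyy-MM-dd_HH_mm_ss` format, and a collected dump is now persisted only when it matches the configured regex. A minimal standalone sketch of that gating step, with an illustrative pattern and dump text standing in for `THREAD_DUMP_COLLECTOR_PATTERN` and `Utils.getThreadDump()`:

```scala
object ThreadDumpGateSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative stand-ins: in the patch the pattern comes from the
    // THREAD_DUMP_COLLECTOR_PATTERN conf and the dump from Utils.getThreadDump().
    val regexPattern = "BLOCKED".r
    val collectedThreadDump =
      """"main" #1 RUNNABLE
        |"shuffle-client-1" #42 BLOCKED on java.lang.Object@1a2b3c
        |""".stripMargin

    // Mirrors the patch: persist the dump only if the regex matches anywhere in it.
    val anyPatternMatches = regexPattern.findFirstIn(collectedThreadDump).isDefined
    if (anyPatternMatches) {
      println("match found: this dump would be written to the collector directory")
    } else {
      println("no match: this dump would be skipped")
    }
  }
}
```

Since the default pattern is `.*`, `findFirstIn` succeeds on any dump text, so out of the box every periodic dump is still written; the gate only takes effect once a narrower expression is configured.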
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 05a2c68a3f1a6..88d5de6240cd5 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2870,6 +2870,13 @@ package object config {
       .stringConf
       .transform(_.toUpperCase(Locale.ROOT))
       .checkValues(Set("LOG", "FILE"))
-      .createWithDefault("LOG")
+      .createWithDefault("FILE")
+
+  private[spark] val THREAD_DUMP_COLLECTOR_PATTERN =
+    ConfigBuilder("spark.threadDumpCollector.include.regex")
+      .doc("Regular expression for determining which thread dumps will be captured")
+      .version("4.0.0")
+      .stringConf
+      .createWithDefault(".*")
 }
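Taken together, these defaults mean file output is now on by default with an accept-everything pattern, and narrowing the regex filters which dumps are kept. A hedged sketch of how the knobs might be wired from application code; only `spark.threadDumpCollector.include.regex` is defined in this diff, and the other two key names below are assumptions inferred from the `THREAD_DUMP_COLLECTOR_DIR` and `THREAD_DUMP_COLLECTOR_OUTPUT_TYPE` constants referenced above, so verify them against the actual config definitions:

```scala
import org.apache.spark.SparkConf

object ThreadDumpCollectorConfSketch {
  // Keep only dumps that mention blocked threads or deadlocks.
  val conf: SparkConf = new SparkConf()
    .set("spark.threadDumpCollector.include.regex", "BLOCKED|deadlock") // defined in this diff
    .set("spark.threadDumpCollector.dir", "hdfs:///tmp/thread-dumps")   // assumed key name
    .set("spark.threadDumpCollector.output.type", "FILE")               // assumed key name
}
```

Note that the output-type entry uppercases its value via `transform` before `checkValues(Set("LOG", "FILE"))` runs, so lowercase values are accepted but anything outside that set fails validation at configuration time.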