From 28e92a58b9e7c9bc8daf3432776221708d273af9 Mon Sep 17 00:00:00 2001 From: Elien Vandermaesen Date: Tue, 14 Jan 2025 11:30:00 +0100 Subject: [PATCH 1/2] issue989 implement BatchJobProgressListener.onApplicationEnd to filter the stages to only log long-running stages --- .../BatchJobProgressListener.scala | 89 +++++++++++++++---- 1 file changed, 73 insertions(+), 16 deletions(-) diff --git a/openeo-geotrellis/src/main/java/org/openeo/sparklisteners/BatchJobProgressListener.scala b/openeo-geotrellis/src/main/java/org/openeo/sparklisteners/BatchJobProgressListener.scala index ffe09ff1..d162b8d9 100644 --- a/openeo-geotrellis/src/main/java/org/openeo/sparklisteners/BatchJobProgressListener.scala +++ b/openeo-geotrellis/src/main/java/org/openeo/sparklisteners/BatchJobProgressListener.scala @@ -1,15 +1,13 @@ package org.openeo.sparklisteners; -import org.apache.spark.executor.TaskMetrics; -import org.apache.spark.scheduler.SparkListener; -import org.apache.spark.scheduler.SparkListenerStageCompleted; -import org.apache.spark.scheduler.SparkListenerStageSubmitted; -import org.apache.spark.util.AccumulatorV2; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.collection.Traversable; -import scala.collection.mutable.Map; - +import org.apache.spark.executor.TaskMetrics +import org.apache.spark.scheduler.cluster.ExecutorInfo +import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerExecutorAdded, SparkListenerExecutorRemoved, SparkListenerStageCompleted, SparkListenerStageSubmitted} +import org.apache.spark.util.AccumulatorV2 +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +import scala.collection.mutable import java.time.Duration; object BatchJobProgressListener { @@ -22,22 +20,29 @@ class BatchJobProgressListener extends SparkListener { import BatchJobProgressListener.logger + private val stagesInformation = new mutable.LinkedHashMap[String,mutable.Map[String,Any]]() + private val executorInformation = new mutable.LinkedHashMap[String,mutable.Map[String,Long]] + override def onStageSubmitted( stageSubmitted:SparkListenerStageSubmitted):Unit = { logger.info(s"Starting stage: ${stageSubmitted.stageInfo.stageId} - ${stageSubmitted.stageInfo.name}. \nStages may combine multiple processes." ) } override def onStageCompleted( stageCompleted: SparkListenerStageCompleted):Unit = { val taskMetrics = stageCompleted.stageInfo.taskMetrics - + val stageInformation = new mutable.LinkedHashMap[String,Any]() + var logs = List[(String, String)]() if(stageCompleted.stageInfo.failureReason.isDefined){ + stageInformation += ("duration" -> Duration.ofMillis(taskMetrics.executorRunTime)) val message = f"""A part of the process graph failed, and will be retried, the reason was: "${stageCompleted.stageInfo.failureReason.get}" |Your job may still complete if the failure was caused by a transient error, but will take more time. A common cause of transient errors is too little executor memory (overhead). Too low executor-memory can be seen by a high 'garbage collection' time, which was: ${Duration.ofMillis(taskMetrics.jvmGCTime).toSeconds / 1000.0} seconds. 
|""".stripMargin - logger.warn(message) +// logger.warn(message) + logs = ("warn", message) :: logs }else{ val duration = Duration.ofMillis(taskMetrics.executorRunTime) + stageInformation += ("duration" -> Duration.ofMillis(taskMetrics.executorRunTime)) val timeString = if(duration.toSeconds>60) { duration.toMinutes + " minutes" } else { @@ -45,21 +50,73 @@ class BatchJobProgressListener extends SparkListener { } val megabytes = taskMetrics.shuffleWriteMetrics.bytesWritten.toFloat/(1024.0*1024.0) val name = stageCompleted.stageInfo.name - logger.info(f"Stage ${stageCompleted.stageInfo.stageId} produced $megabytes%.2f MB in $timeString - ${name}."); - + val message = f"Stage ${stageCompleted.stageInfo.stageId} produced $megabytes%.2f MB in $timeString - ${name}." +// logger.info(f"Stage ${stageCompleted.stageInfo.stageId} produced $megabytes%.2f MB in $timeString - ${name}."); + logs = ("info",message) :: logs val accumulators = stageCompleted.stageInfo.accumulables; val chunkCounts = accumulators.filter(_._2.name.get.startsWith("ChunkCount")); if (chunkCounts.nonEmpty) { val totalChunks = chunkCounts.head._2.value val megapixel = totalChunks.get.asInstanceOf[Long] * 256 * 256 / (1024 * 1024) if(taskMetrics.executorRunTime > 0) { - logger.info(f"load_collection: data was loaded with an average speed of: ${megapixel.toFloat/ duration.toSeconds.toFloat}%.3f Megapixel per second.") + val messageSpeed = f"load_collection: data was loaded with an average speed of: ${megapixel.toFloat/ duration.toSeconds.toFloat}%.3f Megapixel per second." +// logger.info(f"load_collection: data was loaded with an average speed of: ${megapixel.toFloat/ duration.toSeconds.toFloat}%.3f Megapixel per second.") + logs = ("info",messageSpeed) :: logs }; } } - + stageInformation += ("logs" -> logs) + stagesInformation += (stageCompleted.stageInfo.stageId.toString -> stageInformation) } + override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { + val startTime = executorAdded.time + val executorId = executorAdded.executorId + + } + override def onExecutorRemoved(executorAdded: SparkListenerExecutorRemoved): Unit = { + val endTime = executorAdded.time + val executorId = executorAdded.executorId + } + + override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd):Unit={ + val (totalStages, totalDuration) = stagesInformation.foldLeft((0,Duration.ZERO)){ (x,y) => + val duration = y._2.getOrElse("duration",0) match { + case n:Duration => n + } + (x._1 + 1, x._2.plus(duration)) + } + val ordered = stagesInformation.toSeq.sortWith((a,b) =>{ + val DurationA = a._2.getOrElse("duration",0) match { + case n:Duration => n} + val DurationB = b._2.getOrElse("duration",0) match {case n:Duration => n} + DurationA.toMillis < DurationB.toMillis + }) + val timeString = if(totalDuration.toSeconds>60) { + totalDuration.toMinutes + " minutes" + } else { + totalDuration.toMillis.toFloat / 1000.0 + " seconds" + } + logger.info(f" ${totalStages} stages finished in ${timeString}") + val thresholdDurationLogging = totalDuration.toMillis * 0.8 + var tempDuration = 0.0 + var i = 0 + var maxDurationToLog = ordered.head._2.getOrElse("duration",0) match {case n:Duration => n} + while (tempDuration < thresholdDurationLogging){ + val stageInfo = ordered(i)._2 + val logs = stageInfo.getOrElse("logs","") match {case s:List[(String, String)] => s} + for (log <- logs){ + log match { + case ("warn",s) => logger.warn(s) + case ("info",s) => logger.info(s) + } + } + val duration = stageInfo.getOrElse("duration",0.0) match 
{case v:Duration => v} + tempDuration += duration.toMillis + maxDurationToLog = duration + i += 1 + } + } } From 3f8a872b0d3905fcea2475367e0de64ed4bb443e Mon Sep 17 00:00:00 2001 From: Elien Vandermaesen Date: Tue, 14 Jan 2025 13:27:15 +0100 Subject: [PATCH 2/2] issue989 Add BatchJobProgressListener to the longest unit tests and add executor time --- .../BatchJobProgressListener.scala | 20 +++++++++++-------- .../layers/FileLayerProviderTest.scala | 8 +++++++- .../Sentinel2FileLayerProviderTest.scala | 9 ++++++--- 3 files changed, 25 insertions(+), 12 deletions(-) diff --git a/openeo-geotrellis/src/main/java/org/openeo/sparklisteners/BatchJobProgressListener.scala b/openeo-geotrellis/src/main/java/org/openeo/sparklisteners/BatchJobProgressListener.scala index d162b8d9..f39a0151 100644 --- a/openeo-geotrellis/src/main/java/org/openeo/sparklisteners/BatchJobProgressListener.scala +++ b/openeo-geotrellis/src/main/java/org/openeo/sparklisteners/BatchJobProgressListener.scala @@ -21,7 +21,7 @@ class BatchJobProgressListener extends SparkListener { import BatchJobProgressListener.logger private val stagesInformation = new mutable.LinkedHashMap[String,mutable.Map[String,Any]]() - private val executorInformation = new mutable.LinkedHashMap[String,mutable.Map[String,Long]] + private val executorInformation = new mutable.LinkedHashMap[String,Long] override def onStageSubmitted( stageSubmitted:SparkListenerStageSubmitted):Unit = { logger.info(s"Starting stage: ${stageSubmitted.stageInfo.stageId} - ${stageSubmitted.stageInfo.name}. \nStages may combine multiple processes." ) @@ -50,7 +50,7 @@ class BatchJobProgressListener extends SparkListener { } val megabytes = taskMetrics.shuffleWriteMetrics.bytesWritten.toFloat/(1024.0*1024.0) val name = stageCompleted.stageInfo.name - val message = f"Stage ${stageCompleted.stageInfo.stageId} produced $megabytes%.2f MB in $timeString - ${name}." + val message = f"Stage ${stageCompleted.stageInfo.stageId} produced $megabytes%.2f MB in $timeString - $name." 
// logger.info(f"Stage ${stageCompleted.stageInfo.stageId} produced $megabytes%.2f MB in $timeString - ${name}."); logs = ("info",message) :: logs val accumulators = stageCompleted.stageInfo.accumulables; @@ -74,11 +74,13 @@ class BatchJobProgressListener extends SparkListener { override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { val startTime = executorAdded.time val executorId = executorAdded.executorId + executorInformation += (executorId -> startTime) } - override def onExecutorRemoved(executorAdded: SparkListenerExecutorRemoved): Unit = { - val endTime = executorAdded.time - val executorId = executorAdded.executorId + override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { + val executorId = executorRemoved.executorId + val executorTime = executorRemoved.time-executorInformation(executorId) + executorInformation += (executorId -> executorTime) } override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd):Unit={ @@ -88,6 +90,9 @@ class BatchJobProgressListener extends SparkListener { } (x._1 + 1, x._2.plus(duration)) } + val executorTime = executorInformation.foldLeft(0L)((x,y) => { + x + (y._2) + }) val ordered = stagesInformation.toSeq.sortWith((a,b) =>{ val DurationA = a._2.getOrElse("duration",0) match { case n:Duration => n} @@ -99,12 +104,11 @@ class BatchJobProgressListener extends SparkListener { } else { totalDuration.toMillis.toFloat / 1000.0 + " seconds" } - logger.info(f" ${totalStages} stages finished in ${timeString}") - val thresholdDurationLogging = totalDuration.toMillis * 0.8 + logger.info(f" \nSummary of the executed stages: \nTotal number of stages: $totalStages \nTotal stage runtime: $timeString\nTotal executor allocation time: $executorTime \nLogs of the longest stages:") var tempDuration = 0.0 var i = 0 var maxDurationToLog = ordered.head._2.getOrElse("duration",0) match {case n:Duration => n} - while (tempDuration < thresholdDurationLogging){ + while (tempDuration < totalDuration.toMillis * 0.8){ val stageInfo = ordered(i)._2 val logs = stageInfo.getOrElse("logs","") match {case s:List[(String, String)] => s} for (log <- logs){ diff --git a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/FileLayerProviderTest.scala b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/FileLayerProviderTest.scala index f743b92a..1d487d4d 100644 --- a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/FileLayerProviderTest.scala +++ b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/FileLayerProviderTest.scala @@ -33,7 +33,7 @@ import org.openeo.geotrelliscommon.{ConfigurableSpaceTimePartitioner, DataCubePa import org.openeo.opensearch.OpenSearchResponses.{CreoFeatureCollection, FeatureCollection, Link} import org.openeo.opensearch.backends.CreodiasClient import org.openeo.opensearch.{OpenSearchClient, OpenSearchResponses} -import org.openeo.sparklisteners.GetInfoSparkListener +import org.openeo.sparklisteners.{BatchJobProgressListener, GetInfoSparkListener} import ucar.nc2.NetcdfFile import ucar.nc2.util.CompareNetcdf2 @@ -1218,6 +1218,9 @@ class FileLayerProviderTest extends RasterMatchers{ val listener = new GetInfoSparkListener() sc.addSparkListener(listener) + val ProgressListener = new BatchJobProgressListener() + sc.addSparkListener(ProgressListener) + val (datacubeParams,result) = keysForLargeArea() val allTiles = result._1.collect() @@ -1251,6 +1254,9 @@ class FileLayerProviderTest extends RasterMatchers{ val listener = new GetInfoSparkListener() 
sc.addSparkListener(listener) + val ProgressListener = new BatchJobProgressListener() + sc.addSparkListener(ProgressListener) + val (datacubeParams,result) = keysForLargeArea(true) val allTiles = result._1.collect() diff --git a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/Sentinel2FileLayerProviderTest.scala b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/Sentinel2FileLayerProviderTest.scala index 3d0c8108..e72cccfa 100644 --- a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/Sentinel2FileLayerProviderTest.scala +++ b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/Sentinel2FileLayerProviderTest.scala @@ -31,10 +31,10 @@ import org.openeo.geotrellis.TestImplicits._ import org.openeo.geotrellis.geotiff.{GTiffOptions, saveRDD} import org.openeo.geotrellis.netcdf.{NetCDFOptions, NetCDFRDDWriter} import org.openeo.geotrellis.{LayerFixtures, MergeCubesSpec, OpenEOProcessScriptBuilder, OpenEOProcesses, ProjectedPolygons, TestOpenEOProcessScriptBuilder} -import org.openeo.geotrelliscommon.{BatchJobMetadataTracker, ConfigurableSpaceTimePartitioner, SparseSpaceTimePartitioner, DataCubeParameters, ResampledTile} +import org.openeo.geotrelliscommon.{BatchJobMetadataTracker, ConfigurableSpaceTimePartitioner, DataCubeParameters, ResampledTile, SparseSpaceTimePartitioner} import org.openeo.opensearch.OpenSearchResponses.Link import org.openeo.opensearch.{OpenSearchClient, OpenSearchResponses} -import org.openeo.sparklisteners.GetInfoSparkListener +import org.openeo.sparklisteners.{BatchJobProgressListener, GetInfoSparkListener} import java.net.URI import java.time.LocalTime.MIDNIGHT @@ -545,7 +545,8 @@ class Sentinel2FileLayerProviderTest extends RasterMatchers { val crs = CRS.fromEpsgCode(32631) val boundingBox = ProjectedExtent(Extent(640860, 5676170, 666460, 5701770), crs) val dataCubeParameters = new DataCubeParameters - + val ProgressListener = new BatchJobProgressListener() + sc.addSparkListener(ProgressListener) val listener = new GetInfoSparkListener() SparkContext.getOrCreate().addSparkListener(listener) // dataCubeParameters.tileSize = 2048 (This requires increased spark.kryoserializer.buffer.max) @@ -713,6 +714,8 @@ class Sentinel2FileLayerProviderTest extends RasterMatchers { @Test def testL1CMultibandTileMask(): Unit = { + val listener = new BatchJobProgressListener() + sc.addSparkListener(listener) val dilationDistance = 5 val creoL1CLayerProvider = FileLayerProvider(
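
For reference, the selection idea behind onApplicationEnd above can be read as: sort the completed stages by runtime and replay the buffered logs of the longest stages until they cover roughly 80% of the total stage runtime (the `totalDuration.toMillis * 0.8` threshold in the patch). The standalone Scala sketch below illustrates only that idea; the object and method names (StageSelectionSketch, selectLongestStages) and the tuple shape are illustrative and are not part of the listener's API.

import java.time.Duration

object StageSelectionSketch {

  // Keep the longest-running stages whose cumulative runtime reaches `share` of the total.
  def selectLongestStages(stages: Seq[(String, Duration)],
                          share: Double = 0.8): Seq[(String, Duration)] = {
    val totalMillis = stages.map(_._2.toMillis).sum
    val threshold = totalMillis * share
    // Longest first (the patch's sortWith orders ascending; longest-first matches the stated
    // intent of logging only the long-running stages).
    val longestFirst = stages.sortWith(_._2.toMillis > _._2.toMillis)
    // Cumulative runtime *before* each stage; stop once the threshold is already covered.
    val coveredBefore = longestFirst.scanLeft(0L)(_ + _._2.toMillis)
    longestFirst.zip(coveredBefore).takeWhile(_._2 < threshold).map(_._1)
  }

  def main(args: Array[String]): Unit = {
    val stages = Seq(
      "stage 0" -> Duration.ofSeconds(120),
      "stage 1" -> Duration.ofSeconds(5),
      "stage 2" -> Duration.ofSeconds(40),
      "stage 3" -> Duration.ofSeconds(2)
    )
    // Prints stage 0 and stage 2: together they cover more than 80% of the ~167 s total.
    selectLongestStages(stages).foreach { case (name, d) =>
      println(s"$name ran for ${d.toSeconds} s")
    }
  }
}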