Skip to content

Commit

Permalink
Merge pull request #47 from newrelic-experimental/feat/minor-spark-metric-updates-and-docs
Browse files Browse the repository at this point in the history

feat: add job/stage durations, expose stage/task ids, add spark docs
  • Loading branch information
sdewitt-newrelic authored Jan 13, 2025
2 parents 82baf43 + 9501ce4 commit 3f2c13c
Show file tree
Hide file tree
Showing 10 changed files with 1,878 additions and 1,533 deletions.
660 changes: 660 additions & 0 deletions README.md

Large diffs are not rendered by default.

Binary file modified examples/spark-dashboard-executors.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified examples/spark-dashboard-jobs.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file removed examples/spark-dashboard-rdds.png
Binary file not shown.
Binary file removed examples/spark-dashboard-stages.png
Binary file not shown.
Binary file added examples/spark-dashboard-storage.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2,667 changes: 1,143 additions & 1,524 deletions examples/spark-daskboard.json

Large diffs are not rendered by default.

56 changes: 48 additions & 8 deletions internal/spark/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,26 @@ func collectSparkAppJobMetrics(

// Write all the things.

if job.CompletionTime != "" {
jobDuration, err := calcDateDifferenceMillis(
job.SubmissionTime,
job.CompletionTime,
)
if err != nil {
log.Warnf("could not calculate job duration: %v", err)

continue
}

writeGauge(
metricPrefix,
"app.job.duration",
jobDuration,
attrs,
writer,
)
}

writeGauge(
metricPrefix,
"app.job.indices.completed",
Expand All @@ -355,7 +375,7 @@ func collectSparkAppJobMetrics(
writer,
)

attrs["sparkAppStageStatus"] = "completed"
attrs["sparkAppStageStatus"] = "complete"

writeGauge(
metricPrefix,
Expand Down Expand Up @@ -397,7 +417,7 @@ func collectSparkAppJobMetrics(
writer,
)

attrs["sparkAppTaskStatus"] = "completed"
attrs["sparkAppTaskStatus"] = "complete"

writeGauge(
metricPrefix,
Expand Down Expand Up @@ -521,8 +541,8 @@ func collectSparkAppStageMetrics(
attrs["sparkAppStageStatus"] = stageStatus
// @TODO: The attributes below may cause high cardinality. Further
// investigation is needed.
//attrs["sparkAppStageId"] = stage.StageId
//attrs["sparkAppStageAttemptId"] = stage.AttemptId
attrs["sparkAppStageId"] = stage.StageId
attrs["sparkAppStageAttemptId"] = stage.AttemptId
//attrs["sparkAppStageSchedulingPool"] = stage.SchedulingPool
//attrs["sparkAppStageResourceProfileId"] = stage.ResourceProfileId

Expand All @@ -540,6 +560,26 @@ func collectSparkAppStageMetrics(

// Write all the things.

if stage.CompletionTime != "" {
stageDuration, err := calcDateDifferenceMillis(
stage.FirstTaskLaunchedTime,
stage.CompletionTime,
)
if err != nil {
log.Warnf("could not calculate stage duration: %v", err)

continue
}

writeGauge(
metricPrefix,
"app.stage.duration",
stageDuration,
attrs,
writer,
)
}

writeGauge(
metricPrefix,
"app.stage.peakNettyDirectMemory",
Expand Down Expand Up @@ -831,8 +871,8 @@ func collectSparkAppStageMetrics(
}

writePeakMemoryMetrics(
metricPrefix + "app.stage.memory.peak.",
&stage.PeakMemoryMetrics,
metricPrefix + "app.stage.executor.memory.peak.",
&stage.PeakExecutorMetrics,
attrs,
writer,
)
Expand Down Expand Up @@ -972,8 +1012,8 @@ func writeStageTaskMetrics(
taskMetricAttrs["sparkAppTaskSpeculative"] = task.Speculative
// @TODO: The attributes below may cause high cardinality. Further
// investigation is needed.
//attrs["sparkAppTaskId"] = task.TaskId
//attrs["sparkAppTaskAttempt"] = task.Attempt
taskMetricAttrs["sparkAppTaskId"] = task.TaskId
taskMetricAttrs["sparkAppTaskAttempt"] = task.Attempt
//attrs["sparkAppTaskPartitionId"] = task.PartitionId

writeGauge(
Expand Down
2 changes: 1 addition & 1 deletion internal/spark/spark.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ type SparkStage struct {
Tasks map[string]SparkTask `json:"tasks"`
ExecutorSummary map[string]SparkExecutorSummary `json:"executorSummary"`
ResourceProfileId int `json:"resourceProfileId"`
PeakMemoryMetrics SparkExecutorPeakMemoryMetrics `json:"peakMemoryMetrics"`
PeakExecutorMetrics SparkExecutorPeakMemoryMetrics `json:"peakExecutorMetrics"`
ShuffleMergersCount int `json:"shuffleMergersCount"`
}

Expand Down
26 changes: 26 additions & 0 deletions internal/spark/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package spark

import (
"fmt"
"time"
)

const (
	// RFC3339Milli is the timestamp layout emitted by the Spark REST API:
	// an RFC 3339-style date/time with millisecond precision followed by a
	// literal "GMT" suffix (e.g. "2025-01-13T22:44:49.143GMT"). Note the
	// suffix is matched as literal text, not as a time zone element.
	RFC3339Milli = "2006-01-02T15:04:05.000GMT"
)

// calcDateDifferenceMillis returns the number of milliseconds between
// startDate and endDate, both of which must be formatted using the
// RFC3339Milli layout. The result is negative when endDate precedes
// startDate. An error is returned when either value fails to parse; the
// underlying parse error is wrapped so callers retain the diagnostics.
func calcDateDifferenceMillis(startDate string, endDate string) (
	int64, error,
) {
	start, err := time.Parse(RFC3339Milli, startDate)
	if err != nil {
		// Wrap with %w (rather than discarding err) and quote the input
		// with %q so empty or whitespace-only values are visible in logs.
		return 0, fmt.Errorf("invalid start date %q: %w", startDate, err)
	}

	end, err := time.Parse(RFC3339Milli, endDate)
	if err != nil {
		return 0, fmt.Errorf("invalid end date %q: %w", endDate, err)
	}

	return end.Sub(start).Milliseconds(), nil
}

0 comments on commit 3f2c13c

Please sign in to comment.