Skip to content

Commit

Permalink
chore: expose data health metrics (numaproj#1862)
Browse files Browse the repository at this point in the history
Signed-off-by: Samhith Kakarla <[email protected]>
  • Loading branch information
samhith-kakarla authored Jul 25, 2024
1 parent 51a21fa commit 0466a8f
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 2 deletions.
1 change: 1 addition & 0 deletions docs/operations/metrics/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ These metrics can be used to determine if there are any errors in the pipeline

| Metric name | Metric type | Labels | Description |
| --------------------------------- | ----------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------ |
| `pipeline_data_processing_health` | Gauge | `pipeline=<pipeline-name>` | Pipeline data processing health status. 1: Healthy, 0: Unknown, -1: Warning, -2: Critical |
| `forwarder_platform_error_total` | Counter | `pipeline=<pipeline-name>` <br> `vertex=<vertex-name>` <br> `vertex_type=<vertex-type>` <br> `replica=<replica-index>` | Indicates any internal errors which could stop pipeline processing |
| `forwarder_read_error_total` | Counter | `pipeline=<pipeline-name>` <br> `vertex=<vertex-name>` <br> `vertex_type=<vertex-type>` <br> `replica=<replica-index>` <br> `partition_name=<partition-name>` | Indicates any errors while reading messages by the forwarder |
| `forwarder_write_error_total` | Counter | `pipeline=<pipeline-name>` <br> `vertex=<vertex-name>` `vertex_type=<vertex-type>` <br> <br> `replica=<replica-index>` <br> `partition_name=<partition-name>` | Indicates any errors while writing messages by the forwarder |
Expand Down
24 changes: 22 additions & 2 deletions pkg/daemon/server/daemon_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,8 @@ func (ds *daemonServer) exposeMetrics(ctx context.Context) {
}
}
}
// if the data hasn't arrived the sink vertex
// set the lag to be -1

//exposing pipeline processing lag metric.
if minWM < 0 {
pipelineProcessingLag.WithLabelValues(ds.pipeline.Name).Set(-1)
} else {
Expand All @@ -300,11 +300,31 @@ func (ds *daemonServer) exposeMetrics(ctx context.Context) {
}
}

// exposing the watermark delay to current time metric.
if maxWM == math.MinInt64 {
watermarkCmpNow.WithLabelValues(ds.pipeline.Name).Set(-1)
} else {
watermarkCmpNow.WithLabelValues(ds.pipeline.Name).Set(float64(time.Now().UnixMilli() - maxWM))
}

//exposing Pipeline data processing health metric.
pipelineDataHealth, err := ds.metaDataQuery.GetPipelineStatus(ctx, &daemon.GetPipelineStatusRequest{Pipeline: ds.pipeline.Name})

if err != nil {
log.Errorw("Failed to get data processing health status", zap.Error(err))
continue
}
switch pipelineDataHealth.Status.Status {
case v1alpha1.PipelineStatusHealthy:
dataProcessingHealth.WithLabelValues(ds.pipeline.Name).Set(1)
case v1alpha1.PipelineStatusWarning:
dataProcessingHealth.WithLabelValues(ds.pipeline.Name).Set(-1)
case v1alpha1.PipelineStatusCritical:
dataProcessingHealth.WithLabelValues(ds.pipeline.Name).Set(-2)
default:
dataProcessingHealth.WithLabelValues(ds.pipeline.Name).Set(0)
}

case <-ctx.Done():
return
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/daemon/server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,10 @@ var (
Name: "watermark_cmp_now",
Help: "Max source watermark compared with current time in milliseconds",
}, []string{metrics.LabelPipeline})

dataProcessingHealth = promauto.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: "pipeline",
Name: "data_processing_health",
Help: "Pipeline data processing health status. 1: Healthy, 0: Unknown, -1: Warning, -2: Critical",
}, []string{metrics.LabelPipeline})
)

0 comments on commit 0466a8f

Please sign in to comment.