diff --git a/docs/operations/metrics/metrics.md b/docs/operations/metrics/metrics.md index 4a36666443..049bcc98ab 100644 --- a/docs/operations/metrics/metrics.md +++ b/docs/operations/metrics/metrics.md @@ -73,6 +73,7 @@ These metrics can be used to determine if there are any errors in the pipeline | Metric name | Metric type | Labels | Description | | --------------------------------- | ----------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------ | +| `pipeline_data_processing_health` | Gauge | `pipeline=` | Pipeline data processing health status. 1: Healthy, 0: Unknown, -1: Warning, -2: Critical | | `forwarder_platform_error_total` | Counter | `pipeline=`
`vertex=`
`vertex_type=`
`replica=` | Indicates any internal errors which could stop pipeline processing | | `forwarder_read_error_total` | Counter | `pipeline=`
`vertex=`
`vertex_type=`
`replica=`
`partition_name=` | Indicates any errors while reading messages by the forwarder | | `forwarder_write_error_total` | Counter | `pipeline=`
`vertex=` `vertex_type=`

`replica=`
`partition_name=` | Indicates any errors while writing messages by the forwarder | diff --git a/pkg/daemon/server/daemon_server.go b/pkg/daemon/server/daemon_server.go index 746f856a74..768f789dde 100644 --- a/pkg/daemon/server/daemon_server.go +++ b/pkg/daemon/server/daemon_server.go @@ -288,8 +288,8 @@ func (ds *daemonServer) exposeMetrics(ctx context.Context) { } } } - // if the data hasn't arrived the sink vertex - // set the lag to be -1 + + //exposing pipeline processing lag metric. if minWM < 0 { pipelineProcessingLag.WithLabelValues(ds.pipeline.Name).Set(-1) } else { @@ -300,11 +300,31 @@ func (ds *daemonServer) exposeMetrics(ctx context.Context) { } } + // exposing the watermark delay to current time metric. if maxWM == math.MinInt64 { watermarkCmpNow.WithLabelValues(ds.pipeline.Name).Set(-1) } else { watermarkCmpNow.WithLabelValues(ds.pipeline.Name).Set(float64(time.Now().UnixMilli() - maxWM)) } + + //exposing Pipeline data processing health metric. + pipelineDataHealth, err := ds.metaDataQuery.GetPipelineStatus(ctx, &daemon.GetPipelineStatusRequest{Pipeline: ds.pipeline.Name}) + + if err != nil { + log.Errorw("Failed to get data processing health status", zap.Error(err)) + continue + } + switch pipelineDataHealth.Status.Status { + case v1alpha1.PipelineStatusHealthy: + dataProcessingHealth.WithLabelValues(ds.pipeline.Name).Set(1) + case v1alpha1.PipelineStatusWarning: + dataProcessingHealth.WithLabelValues(ds.pipeline.Name).Set(-1) + case v1alpha1.PipelineStatusCritical: + dataProcessingHealth.WithLabelValues(ds.pipeline.Name).Set(-2) + default: + dataProcessingHealth.WithLabelValues(ds.pipeline.Name).Set(0) + } + case <-ctx.Done(): return } diff --git a/pkg/daemon/server/metrics.go b/pkg/daemon/server/metrics.go index fd70a6029f..c27bdeedb0 100644 --- a/pkg/daemon/server/metrics.go +++ b/pkg/daemon/server/metrics.go @@ -42,4 +42,10 @@ var ( Name: "watermark_cmp_now", Help: "Max source watermark compared with current time in milliseconds", }, []string{metrics.LabelPipeline}) + + dataProcessingHealth = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Subsystem: "pipeline", + Name: "data_processing_health", + Help: "Pipeline data processing health status. 1: Healthy, 0: Unknown, -1: Warning, -2: Critical", + }, []string{metrics.LabelPipeline}) )