diff --git a/pkg/daemon/server/service/rater/rater.go b/pkg/daemon/server/service/rater/rater.go index b858cc968f..86b2ad10de 100644 --- a/pkg/daemon/server/service/rater/rater.go +++ b/pkg/daemon/server/service/rater/rater.go @@ -218,12 +218,11 @@ func sleep(ctx context.Context, duration time.Duration) { // since a pod can read from multiple partitions, we will return a map of partition to read count. func (r *Rater) getPodReadCounts(vertexName, podName string) *PodReadCount { readTotalMetricName := "forwarder_data_read_total" - // scrape the read total metric from pod metric port url := fmt.Sprintf("https://%s.%s.%s.svc:%v/metrics", podName, r.pipeline.Name+"-"+vertexName+"-headless", r.pipeline.Namespace, v1alpha1.VertexMetricsPort) resp, err := r.httpClient.Get(url) if err != nil { - r.log.Errorf("[vertex name %s, pod name %s]: failed reading the metrics endpoint, %v", vertexName, podName, err.Error()) + r.log.Warnf("[vertex name %s, pod name %s]: failed reading the metrics endpoint, the pod might have been scaled down: %v", vertexName, podName, err.Error()) return nil } defer resp.Body.Close() @@ -255,7 +254,7 @@ func (r *Rater) getPodReadCounts(vertexName, podName string) *PodReadCount { podReadCount := &PodReadCount{podName, partitionReadCount} return podReadCount } else { - r.log.Errorf("[vertex name %s, pod name %s]: failed getting the read total metric, the metric is not available.", vertexName, podName) + r.log.Infof("[vertex name %s, pod name %s]: Metric %q is unavailable, the pod might haven't started processing data", vertexName, podName, readTotalMetricName) return nil } } diff --git a/pkg/mvtxdaemon/server/service/rater/rater.go b/pkg/mvtxdaemon/server/service/rater/rater.go index d160edd838..19a3ee87dc 100644 --- a/pkg/mvtxdaemon/server/service/rater/rater.go +++ b/pkg/mvtxdaemon/server/service/rater/rater.go @@ -33,7 +33,7 @@ import ( ) const CountWindow = time.Second * 10 -const MonoVtxReadMetricName = "monovtx_read_total" +const monoVtxReadMetricName = "monovtx_read_total" // MonoVtxRatable is the interface for the Rater struct. type MonoVtxRatable interface { @@ -161,7 +161,7 @@ func (r *Rater) getPodReadCounts(podName string) *PodReadCount { url := fmt.Sprintf("https://%s.%s.%s.svc:%v/metrics", podName, headlessServiceName, r.monoVertex.Namespace, v1alpha1.MonoVertexMetricsPort) resp, err := r.httpClient.Get(url) if err != nil { - r.log.Errorf("[MonoVertex name %s, pod name %s]: failed reading the metrics endpoint, %v", r.monoVertex.Name, podName, err.Error()) + r.log.Warnf("[Pod name %s]: failed reading the metrics endpoint, the pod might have been scaled down: %v", podName, err.Error()) return nil } defer resp.Body.Close() @@ -169,11 +169,11 @@ func (r *Rater) getPodReadCounts(podName string) *PodReadCount { textParser := expfmt.TextParser{} result, err := textParser.TextToMetricFamilies(resp.Body) if err != nil { - r.log.Errorf("[MonoVertex name %s, pod name %s]: failed parsing to prometheus metric families, %v", r.monoVertex.Name, podName, err.Error()) + r.log.Errorf("[Pod name %s]: failed parsing to prometheus metric families, %v", podName, err.Error()) return nil } - if value, ok := result[MonoVtxReadMetricName]; ok && value != nil && len(value.GetMetric()) > 0 { + if value, ok := result[monoVtxReadMetricName]; ok && value != nil && len(value.GetMetric()) > 0 { metricsList := value.GetMetric() // Each pod should be emitting only one metric with this name, so we should be able to take the first value // from the results safely. @@ -182,7 +182,7 @@ func (r *Rater) getPodReadCounts(podName string) *PodReadCount { podReadCount := &PodReadCount{podName, metricsList[0].Untyped.GetValue()} return podReadCount } else { - r.log.Errorf("[MonoVertex name %s, pod name %s]: failed getting the read total metric, the metric is not available.", r.monoVertex.Name, podName) + r.log.Infof("[Pod name %s]: Metric %q is unavailable, the pod might haven't started processing data", podName, monoVtxReadMetricName) return nil } }