Skip to content

Commit

Permalink
chore: suppress some misleading logs from raters (numaproj#2075)
Browse files (browse the repository at this point in the history)
Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy authored and SaniyaKalamkar committed Jan 19, 2025
1 parent 6ffc13b commit ccdf654
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 8 deletions.
5 changes: 2 additions & 3 deletions pkg/daemon/server/service/rater/rater.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,11 @@ func sleep(ctx context.Context, duration time.Duration) {
// since a pod can read from multiple partitions, we will return a map of partition to read count.
func (r *Rater) getPodReadCounts(vertexName, podName string) *PodReadCount {
readTotalMetricName := "forwarder_data_read_total"

// scrape the read total metric from pod metric port
url := fmt.Sprintf("https://%s.%s.%s.svc:%v/metrics", podName, r.pipeline.Name+"-"+vertexName+"-headless", r.pipeline.Namespace, v1alpha1.VertexMetricsPort)
resp, err := r.httpClient.Get(url)
if err != nil {
r.log.Errorf("[vertex name %s, pod name %s]: failed reading the metrics endpoint, %v", vertexName, podName, err.Error())
r.log.Warnf("[vertex name %s, pod name %s]: failed reading the metrics endpoint, the pod might have been scaled down: %v", vertexName, podName, err.Error())
return nil
}
defer resp.Body.Close()
Expand Down Expand Up @@ -255,7 +254,7 @@ func (r *Rater) getPodReadCounts(vertexName, podName string) *PodReadCount {
podReadCount := &PodReadCount{podName, partitionReadCount}
return podReadCount
} else {
r.log.Errorf("[vertex name %s, pod name %s]: failed getting the read total metric, the metric is not available.", vertexName, podName)
r.log.Infof("[vertex name %s, pod name %s]: Metric %q is unavailable, the pod might haven't started processing data", vertexName, podName, readTotalMetricName)
return nil
}
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/mvtxdaemon/server/service/rater/rater.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
)

const CountWindow = time.Second * 10
const MonoVtxReadMetricName = "monovtx_read_total"
const monoVtxReadMetricName = "monovtx_read_total"

// MonoVtxRatable is the interface for the Rater struct.
type MonoVtxRatable interface {
Expand Down Expand Up @@ -161,19 +161,19 @@ func (r *Rater) getPodReadCounts(podName string) *PodReadCount {
url := fmt.Sprintf("https://%s.%s.%s.svc:%v/metrics", podName, headlessServiceName, r.monoVertex.Namespace, v1alpha1.MonoVertexMetricsPort)
resp, err := r.httpClient.Get(url)
if err != nil {
r.log.Errorf("[MonoVertex name %s, pod name %s]: failed reading the metrics endpoint, %v", r.monoVertex.Name, podName, err.Error())
r.log.Warnf("[Pod name %s]: failed reading the metrics endpoint, the pod might have been scaled down: %v", podName, err.Error())
return nil
}
defer resp.Body.Close()

textParser := expfmt.TextParser{}
result, err := textParser.TextToMetricFamilies(resp.Body)
if err != nil {
r.log.Errorf("[MonoVertex name %s, pod name %s]: failed parsing to prometheus metric families, %v", r.monoVertex.Name, podName, err.Error())
r.log.Errorf("[Pod name %s]: failed parsing to prometheus metric families, %v", podName, err.Error())
return nil
}

if value, ok := result[MonoVtxReadMetricName]; ok && value != nil && len(value.GetMetric()) > 0 {
if value, ok := result[monoVtxReadMetricName]; ok && value != nil && len(value.GetMetric()) > 0 {
metricsList := value.GetMetric()
// Each pod should be emitting only one metric with this name, so we should be able to take the first value
// from the results safely.
Expand All @@ -182,7 +182,7 @@ func (r *Rater) getPodReadCounts(podName string) *PodReadCount {
podReadCount := &PodReadCount{podName, metricsList[0].Untyped.GetValue()}
return podReadCount
} else {
r.log.Errorf("[MonoVertex name %s, pod name %s]: failed getting the read total metric, the metric is not available.", r.monoVertex.Name, podName)
r.log.Infof("[Pod name %s]: Metric %q is unavailable, the pod might haven't started processing data", podName, monoVtxReadMetricName)
return nil
}
}
Expand Down

0 comments on commit ccdf654

Please sign in to comment.