diff --git a/pkg/mvtxdaemon/server/service/mvtx_service.go b/pkg/mvtxdaemon/server/service/mvtx_service.go index 7675cca6b..f835ce6f7 100644 --- a/pkg/mvtxdaemon/server/service/mvtx_service.go +++ b/pkg/mvtxdaemon/server/service/mvtx_service.go @@ -179,6 +179,7 @@ func (mvs *MonoVertexService) startHealthCheck(ctx context.Context) { } } +// GetMonoVertexLookback returns the current lookback of the MonoVertex from the rater func (mvs *MonoVertexService) GetMonoVertexLookback(ctx context.Context, empty *emptypb.Empty) (*mvtxdaemon.GetMonoVertexLookbackResponse, error) { return &mvtxdaemon.GetMonoVertexLookbackResponse{ Lookback: mvs.rater.GetLookBack(), diff --git a/pkg/mvtxdaemon/server/service/rater/rater.go b/pkg/mvtxdaemon/server/service/rater/rater.go index 6c54b35f4..24184c4f4 100644 --- a/pkg/mvtxdaemon/server/service/rater/rater.go +++ b/pkg/mvtxdaemon/server/service/rater/rater.go @@ -68,12 +68,13 @@ type Rater struct { timestampedPodCounts *sharedqueue.OverflowQueue[*TimestampedCounts] // timestampedPodProcessingTime is a map between vertex name and a queue of timestamped processing times for that vertex timestampedPodProcessingTime *sharedqueue.OverflowQueue[*TimestampedProcessingTime] - // userSpecifiedLookBackSeconds is a map between vertex name and the user-specified lookback seconds for that vertex + // userSpecifiedLookBackSeconds the current lookback seconds for the monovertex + // this can be updated dynamically, defaults to user-specified value in the spec userSpecifiedLookBackSeconds *atomic.Float64 options *options } -// GetLookBack is used +// GetLookBack returns the current lookback seconds for the MonoVertex. func (r *Rater) GetLookBack() *wrapperspb.DoubleValue { return wrapperspb.Double(r.userSpecifiedLookBackSeconds.Load()) } @@ -167,7 +168,6 @@ func (r *Rater) monitorOnePod(ctx context.Context, key string, worker int) error now := time.Now().Add(CountWindow).Truncate(CountWindow).Unix() UpdateCount(r.timestampedPodCounts, now, podReadCount) UpdateProcessingTime(r.timestampedPodProcessingTime, now, processingTime) - r.log.Infof("MYDEBUG: processing rate vertex %s is: %v", pInfo.monoVertexName, r.timestampedPodProcessingTime) return nil } @@ -214,9 +214,6 @@ func (r *Rater) GetRates() map[string]*wrapperspb.DoubleValue { var result = make(map[string]*wrapperspb.DoubleValue) // calculate rates for each lookback seconds for n, i := range r.buildLookbackSecondsMap() { - if n == "default" { - r.log.Infof("MYDEBUG: lookback %d", i) - } rate := CalculateRate(r.timestampedPodCounts, i) result[n] = wrapperspb.Double(rate) } diff --git a/pkg/mvtxdaemon/server/service/rater/rater_lookback_utils.go b/pkg/mvtxdaemon/server/service/rater/rater_lookback_utils.go index 6b5c1656d..f1bf41414 100644 --- a/pkg/mvtxdaemon/server/service/rater/rater_lookback_utils.go +++ b/pkg/mvtxdaemon/server/service/rater/rater_lookback_utils.go @@ -1,3 +1,19 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package rater import ( @@ -10,9 +26,9 @@ import ( sharedqueue "github.com/numaproj/numaflow/pkg/shared/queue" ) -// PodProcessingTime is a struct to maintain processing time for a batch by a pod +// PodProcessingTime is a struct to maintain the required data +// to calculate the processing time for a batch by a pod type PodProcessingTime struct { - // pod name name string processingTimeSum float64 processingTimeCount float64 @@ -22,7 +38,7 @@ func (p *PodProcessingTime) Name() string { return p.name } -func (p *PodProcessingTime) processingTime() (float64, float64) { +func (p *PodProcessingTime) processingTimeValues() (float64, float64) { return p.processingTimeSum, p.processingTimeCount } @@ -35,7 +51,6 @@ func (r *Rater) updateDynamicLookbackSecs() { processingTimeSeconds, update := r.CalculateVertexProcessingTime(r.timestampedPodProcessingTime) r.log.Infof("MYDEBUG: processingTimeSeconds %f ", processingTimeSeconds) if !update { - r.log.Infof("MYDEBUG: NO UPDATE processingTimeSeconds %f ", processingTimeSeconds) return } // if the current calculated processing time is greater than the lookback Seconds, update it @@ -44,15 +59,17 @@ func (r *Rater) updateDynamicLookbackSecs() { // round up to the nearest minute, also ensure that while going up and down we have the consistent value for // a given processingTimeSeconds, then convert back to seconds roundedProcessingTime := 60 * int(math.Ceil(processingTimeSeconds/60)) + // step up case if roundedProcessingTime > int(currentVal) { r.userSpecifiedLookBackSeconds.Store(math.Min(MaxLookback.Seconds(), float64(roundedProcessingTime))) - r.log.Infof("MYDEBUG: Updated for vertex %s, old %f new %d", vertexName, currentVal, roundedProcessingTime) + r.log.Infof("Lookback updated for mvtx %s, old %f new %d", vertexName, currentVal, roundedProcessingTime) } else { + // step down case // We should not be setting values lower than the lookBackSeconds defined in the spec roundedProcessingTime = int(math.Max(float64(roundedProcessingTime), float64(r.monoVertex.Spec.Scale.GetLookbackSeconds()))) if roundedProcessingTime != int(currentVal) { r.userSpecifiedLookBackSeconds.Store(float64(roundedProcessingTime)) - r.log.Infof("MYDEBUG: Updated for vertex %s, old %f new %d", vertexName, currentVal, roundedProcessingTime) + r.log.Infof("Lookback updated for mvtx %s, old %f new %d", vertexName, currentVal, roundedProcessingTime) } } } @@ -109,7 +126,6 @@ func (r *Rater) CalculateVertexProcessingTime(q *sharedqueue.OverflowQueue[*Time average := totalTime / float64(totalCnt) if average > maxAverage { maxAverage = average - r.log.Infof("MYDEBUG: Using %d vals %f ", totalCnt, average) } } @@ -118,7 +134,7 @@ func (r *Rater) CalculateVertexProcessingTime(q *sharedqueue.OverflowQueue[*Time return maxAverage, true } -// getPodProcessingTime +// getPodProcessingTime is a utility function to get the metrics from the pod func (r *Rater) getPodProcessingTime(podName string) *PodProcessingTime { processingTimeCountMetric := "monovtx_processing_time" headlessServiceName := r.monoVertex.GetHeadlessServiceName() @@ -149,7 +165,7 @@ func (r *Rater) getPodProcessingTime(podName string) *PodProcessingTime { podCount = float64(ele.Histogram.GetSampleCount()) podSum = ele.Histogram.GetSampleSum() } else { - r.log.Infof("[Pod name %s]: Metric %q is unavailable, the pod might haven't started processing data", podName, processingTimeCountMetric) + r.log.Debugf("[Pod name %s]: Metric %q is unavailable, the pod might haven't started processing data", podName, processingTimeCountMetric) return nil } return &PodProcessingTime{ diff --git a/pkg/mvtxdaemon/server/service/rater/timestamped_processing_counts.go b/pkg/mvtxdaemon/server/service/rater/timestamped_processing_counts.go index 575f4487e..96c602695 100644 --- a/pkg/mvtxdaemon/server/service/rater/timestamped_processing_counts.go +++ b/pkg/mvtxdaemon/server/service/rater/timestamped_processing_counts.go @@ -42,7 +42,7 @@ func (tc *TimestampedProcessingTime) Update(podReadCount *PodProcessingTime) { if podReadCount == nil { return } - sum, count := podReadCount.processingTime() + sum, count := podReadCount.processingTimeValues() // Convert microseconds to seconds microseconds := sum / count