From afc16ac0bc2633071f2843b5eecf2de1b44159d0 Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Sat, 1 Feb 2025 09:32:02 -0800 Subject: [PATCH] feat: adaptive lookback for monovertex (#2373) --- docs/user-guide/reference/autoscaling.md | 2 + pkg/apis/numaflow/v1alpha1/const.go | 1 + pkg/apis/numaflow/v1alpha1/scale.go | 3 +- pkg/metrics/metrics.go | 14 ++ pkg/mvtxdaemon/server/service/rater/helper.go | 92 +++++++++++++ .../server/service/rater/helper_test.go | 127 ++++++++++++++++++ pkg/mvtxdaemon/server/service/rater/rater.go | 92 ++++++++++++- .../service/rater/timestamped_counts.go | 6 + 8 files changed, 330 insertions(+), 7 deletions(-) diff --git a/docs/user-guide/reference/autoscaling.md b/docs/user-guide/reference/autoscaling.md index e78cc0ec59..fff13191c0 100644 --- a/docs/user-guide/reference/autoscaling.md +++ b/docs/user-guide/reference/autoscaling.md @@ -49,6 +49,8 @@ spec: don't want the vertices to be scaled down to `0`. In this case, you need to increase `lookbackSeconds` to overlap 5 minutes, so that the calculated average rate and pending messages won't be `0` during the silent period, in order to prevent from scaling down to 0. + The max value allowed to be configured is `600`. + On top of this, we have dynamic lookback adjustment which tunes this parameter based on the realtime processing data. - `scaleUpCooldownSeconds` - After a scaling operation, how many seconds to wait for the same vertex, if the follow-up operation is a scaling up, defaults to `90`. Please make sure that the time is greater than the pod to be `Running` and start processing, because the autoscaling algorithm will divide the TPS by the number of pods even if the pod is not `Running`. diff --git a/pkg/apis/numaflow/v1alpha1/const.go b/pkg/apis/numaflow/v1alpha1/const.go index 8f747f91ef..e9a539179e 100644 --- a/pkg/apis/numaflow/v1alpha1/const.go +++ b/pkg/apis/numaflow/v1alpha1/const.go @@ -183,6 +183,7 @@ const ( DefaultTargetProcessingSeconds = 20 // Default targeted time in seconds to finish processing all the pending messages for a source DefaultTargetBufferAvailability = 50 // Default targeted percentage of buffer availability DefaultReplicasPerScale = 2 // Default maximum replicas to be scaled up or down at once + MaxLookbackSeconds = 600 // Max lookback seconds for calculating avg rate and pending // Default persistent buffer queue options DefaultPBQChannelBufferSize = 100 // Default channel size in int (what should be right value?) diff --git a/pkg/apis/numaflow/v1alpha1/scale.go b/pkg/apis/numaflow/v1alpha1/scale.go index 9fbe58da04..4ee06f4b99 100644 --- a/pkg/apis/numaflow/v1alpha1/scale.go +++ b/pkg/apis/numaflow/v1alpha1/scale.go @@ -69,7 +69,8 @@ type Scale struct { func (s Scale) GetLookbackSeconds() int { if s.LookbackSeconds != nil { - return int(*s.LookbackSeconds) + // do not allow the value to be larger than the MaxLookbackSeconds in our config + return min(MaxLookbackSeconds, int(*s.LookbackSeconds)) } return DefaultLookbackSeconds } diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 53605eca2c..5d2d722545 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -370,3 +370,17 @@ var ( Help: "Total number of Write Errors while writing to a fallback sink", }, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName}) ) + +// Daemon server metrics +var ( + // MonoVertexLookBackSecs is a gauge used to indicate what is the current lookback window value being used + // by a given monovertex. 
It defines how many seconds to look back for the vertex average processing rate
+	// (tps) and pending messages calculation, defaults to 120. Rate and pending messages metrics are
+	// critical for autoscaling.
+	MonoVertexLookBackSecs = promauto.NewGaugeVec(prometheus.GaugeOpts{
+		Subsystem: "monovtx",
+		Name:      "lookback_window_seconds",
+		Help: "A metric to show the lookback window value being used by a given monovertex. " +
+			"The lookback window is critical in autoscaling calculations",
+	}, []string{LabelMonoVertexName})
+)
diff --git a/pkg/mvtxdaemon/server/service/rater/helper.go b/pkg/mvtxdaemon/server/service/rater/helper.go
index ce9aac1ed4..0091fd984e 100644
--- a/pkg/mvtxdaemon/server/service/rater/helper.go
+++ b/pkg/mvtxdaemon/server/service/rater/helper.go
@@ -125,3 +125,95 @@ func calculatePodDelta(tc1, tc2 *TimestampedCounts) float64 {
 	}
 	return delta
 }
+
+// podLastSeen stores the last seen timestamp and count of each pod
+type podLastSeen struct {
+	count    float64
+	seenTime int64
+}
+
+type podMaxDuration struct {
+	// Map to keep track of the last seen count and timestamp of each pod.
+	lastSeen map[string]podLastSeen
+	// Map to store the maximum duration for which the value of any pod was unchanged.
+	maxUnchangedDuration map[string]int64
+}
+
+// CalculateMaxLookback computes the maximum duration (in seconds) for which the count of messages processed by any pod
+// remained unchanged within a specified range of indices in a queue of TimestampedCounts. It does this by analyzing each
+// data point between the startIndex and endIndex, checking the count changes for each pod, and noting the durations
+// during which these counts stay consistent. The duration is only updated when a pod reads new data.
+// This encapsulates the lookback for two scenarios:
+// 1. Slow processing vertex
+// 2. Slow data source - data arrives after long intervals
+func CalculateMaxLookback(counts []*TimestampedCounts, startIndex, endIndex int) int64 {
+	lookBackData := podMaxDuration{
+		lastSeen:             make(map[string]podLastSeen),
+		maxUnchangedDuration: make(map[string]int64),
+	}
+	processTimeline(counts, startIndex, endIndex, &lookBackData)
+	finalizeDurations(counts[endIndex], &lookBackData)
+	return findGlobalMaxDuration(lookBackData.maxUnchangedDuration)
+}
+
+// processTimeline processes the timeline of counts and updates the maxUnchangedDuration for each pod.
+func processTimeline(counts []*TimestampedCounts, startIndex, endIndex int, data *podMaxDuration) {
+	for i := startIndex; i <= endIndex; i++ {
+		item := counts[i].PodCountSnapshot()
+		curTime := counts[i].PodTimestamp()
+
+		for key, curCount := range item {
+			lastSeenData, found := data.lastSeen[key]
+			if found && lastSeenData.count == curCount {
+				continue
+			}
+
+			// If the read count has increased since it was last seen
+			if found && curCount > lastSeenData.count {
+				duration := curTime - lastSeenData.seenTime
+				if currentMax, ok := data.maxUnchangedDuration[key]; !ok || duration > currentMax {
+					data.maxUnchangedDuration[key] = duration
+				}
+			}
+			// The lastSeen entry is updated in 3 cases:
+			// 1. this is the first time the pod entry is seen,
+			// 2. the value increased,
+			// 3. the value decreased, which is treated as a fresh entry for the pod
+			data.lastSeen[key] = podLastSeen{curCount, curTime}
+		}
+	}
+}
+
+// Check for pods that did not change at all during the iteration,
+// and update their maxUnchangedDuration to the full period from first seen to lastTime.
+// Note: There is a case where one pod was getting data earlier, but then stopped altogether. +// For example, one partition in Kafka not getting data after a while. This case will not be covered +// by our logic, and we would keep increasing the look back in such a scenario. +func finalizeDurations(lastCount *TimestampedCounts, data *podMaxDuration) { + endVals := lastCount.PodCountSnapshot() + lastTime := lastCount.PodTimestamp() + for key, lastSeenData := range data.lastSeen { + endDuration := lastTime - lastSeenData.seenTime + // This condition covers two scenarios: + // 1. There is an entry in the last seen, but not in maxUnchangedDuration + // It was seen once, but value never changed. In this case update the maxDuration, but only when + // the count > 0 + // 2. The value has not changed till the boundary, and this duration is larger than the current max + if _, exists := endVals[key]; exists && (lastSeenData.count != 0) { + if currentMax, ok := data.maxUnchangedDuration[key]; !ok || endDuration > currentMax { + data.maxUnchangedDuration[key] = endDuration + } + } + } +} + +// Calculate the maximum duration found across all pods. +func findGlobalMaxDuration(maxUnchangedDuration map[string]int64) int64 { + globalMaxSecs := int64(0) + for _, duration := range maxUnchangedDuration { + if duration > globalMaxSecs { + globalMaxSecs = duration + } + } + return globalMaxSecs +} diff --git a/pkg/mvtxdaemon/server/service/rater/helper_test.go b/pkg/mvtxdaemon/server/service/rater/helper_test.go index f3e61e3390..3812567c80 100644 --- a/pkg/mvtxdaemon/server/service/rater/helper_test.go +++ b/pkg/mvtxdaemon/server/service/rater/helper_test.go @@ -17,6 +17,7 @@ limitations under the License. package rater import ( + "sync" "testing" "time" @@ -286,3 +287,129 @@ func TestCalculateRate(t *testing.T) { assert.Equal(t, 23.0, CalculateRate(q, 100)) }) } + +// Helper function to create a TimestampedCounts instance +func newTimestampedCounts(timestamp int64, counts map[string]float64) *TimestampedCounts { + return &TimestampedCounts{ + timestamp: timestamp, + podReadCounts: counts, + lock: new(sync.RWMutex), + } +} + +// TestCalculateMaxLookback tests various scenarios on the CalculateMaxLookback function +func TestCalculateMaxLookback(t *testing.T) { + tests := []struct { + name string + counts []*TimestampedCounts + startIndex int + endIndex int + expectedMax int64 + }{ + { + name: "Uniform data across the range", + counts: []*TimestampedCounts{ + newTimestampedCounts(100, map[string]float64{"pod1": 100, "pod2": 200}), + newTimestampedCounts(200, map[string]float64{"pod1": 100, "pod2": 200}), + newTimestampedCounts(400, map[string]float64{"pod1": 100, "pod2": 200}), + }, + startIndex: 0, + endIndex: 2, + expectedMax: 300, + }, + { + name: "Values change midway", + counts: []*TimestampedCounts{ + newTimestampedCounts(100, map[string]float64{"pod1": 100, "pod2": 150}), + newTimestampedCounts(240, map[string]float64{"pod1": 100, "pod2": 200}), + newTimestampedCounts(360, map[string]float64{"pod1": 150, "pod2": 200}), + }, + startIndex: 0, + endIndex: 2, + expectedMax: 260, + }, + { + name: "No data change across any pods", + counts: []*TimestampedCounts{ + newTimestampedCounts(100, map[string]float64{"pod1": 500}), + newTimestampedCounts(600, map[string]float64{"pod1": 500}), + }, + startIndex: 0, + endIndex: 1, + expectedMax: 500, // Entire duration + }, + { + name: "Edge Case: One entry only", + counts: []*TimestampedCounts{ + newTimestampedCounts(100, map[string]float64{"pod1": 100}), + }, + 
startIndex:  0,
+			endIndex:    0,
+			expectedMax: 0, // No duration difference
+		},
+		{
+			name: "Rapid changes in sequential entries",
+			counts: []*TimestampedCounts{
+				newTimestampedCounts(100, map[string]float64{"pod1": 500, "pod2": 400}),
+				newTimestampedCounts(130, map[string]float64{"pod1": 600, "pod2": 400}),
+				newTimestampedCounts(160, map[string]float64{"pod1": 600, "pod2": 600}),
+			},
+			startIndex:  0,
+			endIndex:    2,
+			expectedMax: 60,
+		},
+		{
+			// Here the pod has an initial read count, and then the count drops to 0.
+			// A drop is treated as a refresh in counts, so the unchanged duration is tracked from the new value onward.
+			name: "Pod goes to zero",
+			counts: []*TimestampedCounts{
+				newTimestampedCounts(0, map[string]float64{"pod1": 50}), // Initial count
+				newTimestampedCounts(30, map[string]float64{"pod1": 50}),
+				newTimestampedCounts(60, map[string]float64{"pod1": 0}),   // Count falls to zero
+				newTimestampedCounts(120, map[string]float64{"pod1": 25}), // Count returns
+				newTimestampedCounts(180, map[string]float64{"pod1": 25}), // Count stays stable
+				newTimestampedCounts(240, map[string]float64{"pod1": 25}), // Count stays stable again
+			},
+			startIndex:  0,
+			endIndex:    5,
+			expectedMax: 120, // from index 3 to 5
+		},
+		{
+			name: "Pod goes to zero - 2",
+			counts: []*TimestampedCounts{
+				newTimestampedCounts(0, map[string]float64{"pod1": 60}),
+				newTimestampedCounts(60, map[string]float64{"pod1": 60}),
+				newTimestampedCounts(120, map[string]float64{"pod1": 70}),
+				newTimestampedCounts(180, map[string]float64{"pod1": 0}),
+				newTimestampedCounts(240, map[string]float64{"pod1": 25}),
+				newTimestampedCounts(300, map[string]float64{"pod1": 25}),
+			},
+			startIndex:  0,
+			endIndex:    5,
+			expectedMax: 120, // here idx 0 to 2 is used; after going to zero the tracking resets
+		},
+		{
+			// This is a case where one pod never got any data, which we consider as read count = 0 throughout.
+			// In such a case we should not use this pod for the calculation.
+			name: "One pod no data, other >0",
+			counts: []*TimestampedCounts{
+				newTimestampedCounts(0, map[string]float64{"pod1": 0, "pod2": 5}),
+				newTimestampedCounts(60, map[string]float64{"pod1": 0, "pod2": 5}),
+				newTimestampedCounts(120, map[string]float64{"pod1": 0, "pod2": 5}),
+				newTimestampedCounts(180, map[string]float64{"pod1": 0, "pod2": 5}),
+				newTimestampedCounts(240, map[string]float64{"pod1": 0, "pod2": 6}),
+				newTimestampedCounts(300, map[string]float64{"pod1": 0, "pod2": 6}),
+			},
+			startIndex:  0,
+			endIndex:    5,
+			expectedMax: 240,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			maxDuration := CalculateMaxLookback(tt.counts, tt.startIndex, tt.endIndex)
+			assert.Equal(t, tt.expectedMax, maxDuration)
+		})
+	}
+}
diff --git a/pkg/mvtxdaemon/server/service/rater/rater.go b/pkg/mvtxdaemon/server/service/rater/rater.go
index ac7422952e..38c3ed9dbb 100644
--- a/pkg/mvtxdaemon/server/service/rater/rater.go
+++ b/pkg/mvtxdaemon/server/service/rater/rater.go
@@ -19,15 +19,18 @@ package rater
 import (
 	"crypto/tls"
 	"fmt"
+	"math"
 	"net/http"
 	"time"
 
 	"github.com/prometheus/common/expfmt"
+	"go.uber.org/atomic"
 	"go.uber.org/zap"
 	"golang.org/x/net/context"
 	"google.golang.org/protobuf/types/known/wrapperspb"
 
 	"github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
+	"github.com/numaproj/numaflow/pkg/metrics"
 	"github.com/numaproj/numaflow/pkg/shared/logging"
 	sharedqueue "github.com/numaproj/numaflow/pkg/shared/queue"
 )
@@ -35,6 +38,11 @@ import (
 const CountWindow = time.Second * 10
 const monoVtxReadMetricName = "monovtx_read_total"
 
+// The max lookback (v1alpha1.MaxLookbackSeconds) is the upper limit beyond which the lookback value
+// is not increased by the dynamic algorithm. This is chosen as a conservative limit:
+// vertices that take longer than this to process might not be the best candidates for
+// autoscaling, and it might be prudent to run a fixed number of pods at that point.
+
 // MonoVtxRatable is the interface for the Rater struct.
 type MonoVtxRatable interface {
 	Start(ctx context.Context) error
@@ -63,9 +71,10 @@ type Rater struct {
 	podTracker *PodTracker
 	// timestampedPodCounts is a queue of timestamped counts for the MonoVertex
 	timestampedPodCounts *sharedqueue.OverflowQueue[*TimestampedCounts]
-	// userSpecifiedLookBackSeconds is the user-specified lookback seconds for that MonoVertex
-	userSpecifiedLookBackSeconds int64
-	options                      *options
+	// lookBackSeconds is the lookback time window used for scaling calculations;
+	// it can be updated dynamically and defaults to the user-specified value in the spec
+	lookBackSeconds *atomic.Float64
+	options         *options
 }
 
 // PodReadCount is a struct to maintain count of messages read by a pod of MonoVertex
@@ -97,18 +106,20 @@ func NewRater(ctx context.Context, mv *v1alpha1.MonoVertex, opts ...Option) *Rat
 		},
 		log:     logging.FromContext(ctx).Named("Rater"),
 		options: defaultOptions(),
+		// load the default lookback value from the spec
+		lookBackSeconds: atomic.NewFloat64(float64(mv.Spec.Scale.GetLookbackSeconds())),
 	}
-
 	rater.podTracker = NewPodTracker(ctx, mv)
 	// maintain the total counts of the last 30 minutes(1800 seconds) since we support 1m, 5m, 15m lookback seconds.
 	rater.timestampedPodCounts = sharedqueue.New[*TimestampedCounts](int(1800 / CountWindow.Seconds()))
-	rater.userSpecifiedLookBackSeconds = int64(mv.Spec.Scale.GetLookbackSeconds())
 
 	for _, opt := range opts {
 		if opt != nil {
 			opt(rater.options)
 		}
 	}
+	// initialise the metric value for the lookback window
+	metrics.MonoVertexLookBackSecs.WithLabelValues(mv.Name).Set(rater.lookBackSeconds.Load())
 	return &rater
 }
 
@@ -203,7 +214,10 @@ func (r *Rater) GetRates() map[string]*wrapperspb.DoubleValue {
 }
 
 func (r *Rater) buildLookbackSecondsMap() map[string]int64 {
-	lookbackSecondsMap := map[string]int64{"default": r.userSpecifiedLookBackSeconds}
+	// as the default lookback value can change dynamically,
+	// load the current value for the lookback seconds
+	lbValue := r.lookBackSeconds.Load()
+	lookbackSecondsMap := map[string]int64{"default": int64(lbValue)}
 	for k, v := range fixedLookbackSeconds {
 		lookbackSecondsMap[k] = v
 	}
@@ -223,6 +237,10 @@ func (r *Rater) Start(ctx context.Context) error {
 		}
 	}()
 
+	// start the dynamic lookback check, which keeps updating
+	// the lookback period based on the data read time.
+	go r.startDynamicLookBack(ctx)
+
 	// Worker group
 	for i := 1; i <= r.options.workers; i++ {
 		go r.monitor(ctx, i, keyCh)
@@ -269,3 +287,65 @@ func sleep(ctx context.Context, duration time.Duration) {
 	case <-time.After(duration):
 	}
 }
+
+func (r *Rater) startDynamicLookBack(ctx context.Context) {
+	ticker := time.NewTicker(30 * time.Second)
+	// Ensure the ticker is stopped to prevent a resource leak.
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ticker.C:
+			r.updateDynamicLookbackSecs()
+		case <-ctx.Done():
+			// If the context is canceled or expires, exit.
+			return
+		}
+	}
+}
+
+// updateDynamicLookbackSecs continuously adjusts the lookback duration based on the current
+// processing time of the MonoVertex system.
+func (r *Rater) updateDynamicLookbackSecs() { + counts := r.timestampedPodCounts.Items() + if len(counts) <= 1 { + return + } + // We will calculate the processing time for a time window = 3 * currentLookback + // This ensures that we have enough data to capture one complete processing + currentLookback := r.lookBackSeconds.Load() + startIndex := findStartIndex(3*int64(currentLookback), counts) + // we consider the last but one element as the end index because the last element might be incomplete + // we can be sure that the last but one element in the queue is complete. + endIndex := len(counts) - 2 + if startIndex == indexNotFound { + return + } + + // time diff in seconds. + timeDiff := counts[endIndex].timestamp - counts[startIndex].timestamp + if timeDiff == 0 { + // no action required here + return + } + maxProcessingTime := CalculateMaxLookback(counts, startIndex, endIndex) + // round up to the nearest minute, also ensure that while going up and down we have the consistent value for + // a given processingTimeSeconds, then convert back to seconds + roundedMaxLookback := 60.0 * (math.Ceil(float64(maxProcessingTime) / 60.0)) + // Based on the value received we can have two cases + // 1. Step up case (value is > than current): + // Do not allow the value to be increased more than the MaxLookback allowed (10mins) + // 2. Step Down (value is <= than current) + // Do not allow the value to be lower the lookback value specified in the spec + if roundedMaxLookback > currentLookback { + roundedMaxLookback = math.Min(roundedMaxLookback, float64(v1alpha1.MaxLookbackSeconds)) + } else { + roundedMaxLookback = math.Max(roundedMaxLookback, float64(r.monoVertex.Spec.Scale.GetLookbackSeconds())) + } + // If the value has changed, update it + if roundedMaxLookback != currentLookback { + r.lookBackSeconds.Store(roundedMaxLookback) + r.log.Infof("Lookback updated for mvtx %s, Current: %f Updated %f", r.monoVertex.Name, currentLookback, roundedMaxLookback) + // update the metric value for the lookback window + metrics.MonoVertexLookBackSecs.WithLabelValues(r.monoVertex.Name).Set(roundedMaxLookback) + } +} diff --git a/pkg/mvtxdaemon/server/service/rater/timestamped_counts.go b/pkg/mvtxdaemon/server/service/rater/timestamped_counts.go index ee2a13519b..e3761bb3ed 100644 --- a/pkg/mvtxdaemon/server/service/rater/timestamped_counts.go +++ b/pkg/mvtxdaemon/server/service/rater/timestamped_counts.go @@ -71,3 +71,9 @@ func (tc *TimestampedCounts) String() string { defer tc.lock.RUnlock() return fmt.Sprintf("{timestamp: %d, podReadCounts: %v}", tc.timestamp, tc.podReadCounts) } + +func (tc *TimestampedCounts) PodTimestamp() int64 { + tc.lock.RLock() + defer tc.lock.RUnlock() + return tc.timestamp +}
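
As an illustration of the adjustment rule implemented in updateDynamicLookbackSecs above, the following standalone Go sketch mirrors the rounding and clamping behavior; the constant maxLookbackSeconds and the helper nextLookback are stand-ins for this example only, not identifiers from the patch.

// Illustrative sketch (not part of the patch): round the observed processing
// time up to the nearest minute, cap step-ups at the hard ceiling, and floor
// step-downs at the spec-configured lookback.
package main

import (
	"fmt"
	"math"
)

const maxLookbackSeconds = 600.0 // stand-in for v1alpha1.MaxLookbackSeconds

func nextLookback(current, specLookback, maxProcessingTime float64) float64 {
	// Round up to the nearest minute so the value stays stable across ticks.
	rounded := 60.0 * math.Ceil(maxProcessingTime/60.0)
	if rounded > current {
		// Step up: never exceed the hard ceiling.
		return math.Min(rounded, maxLookbackSeconds)
	}
	// Step down: never drop below the spec-configured lookback.
	return math.Max(rounded, specLookback)
}

func main() {
	fmt.Println(nextLookback(120, 120, 250)) // 300: rounded up to the next minute
	fmt.Println(nextLookback(300, 120, 70))  // 120: steps back down to the spec floor
	fmt.Println(nextLookback(300, 120, 900)) // 600: capped at maxLookbackSeconds
}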