feat: adaptive lookback for monovertex (#2373)
kohlisid authored Feb 1, 2025
1 parent 8e9bafb commit afc16ac
Showing 8 changed files with 330 additions and 7 deletions.
2 changes: 2 additions & 0 deletions docs/user-guide/reference/autoscaling.md
@@ -49,6 +49,8 @@ spec:
don't want the vertices to be scaled down to `0`. In this case, you need to increase `lookbackSeconds` to overlap
the 5 minutes, so that the calculated average rate and pending messages won't be `0` during the silent period, in order to
prevent scaling down to 0.
The maximum value allowed to be configured is `600`.
On top of this, a dynamic lookback adjustment tunes this parameter at runtime based on the real-time processing data.
- `scaleUpCooldownSeconds` - After a scaling operation, how many seconds to wait for the same vertex, if the follow-up
operation is a scaling up, defaults to `90`. Please make sure that this time is greater than the time it takes for the pod to become `Running` and
start processing, because the autoscaling algorithm will divide the TPS by the number of pods even if the pod is not `Running`.
1 change: 1 addition & 0 deletions pkg/apis/numaflow/v1alpha1/const.go
@@ -183,6 +183,7 @@ const (
DefaultTargetProcessingSeconds = 20 // Default targeted time in seconds to finish processing all the pending messages for a source
DefaultTargetBufferAvailability = 50 // Default targeted percentage of buffer availability
DefaultReplicasPerScale = 2 // Default maximum replicas to be scaled up or down at once
MaxLookbackSeconds = 600 // Max lookback seconds for calculating avg rate and pending

// Default persistent buffer queue options
DefaultPBQChannelBufferSize = 100 // Default channel size in int (what should be right value?)
3 changes: 2 additions & 1 deletion pkg/apis/numaflow/v1alpha1/scale.go
@@ -69,7 +69,8 @@ type Scale struct {

func (s Scale) GetLookbackSeconds() int {
if s.LookbackSeconds != nil {
return int(*s.LookbackSeconds)
// do not allow the value to be larger than the MaxLookbackSeconds in our config
return min(MaxLookbackSeconds, int(*s.LookbackSeconds))
}
return DefaultLookbackSeconds
}
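For illustration only, here is a minimal standalone sketch (not part of this commit) of how the clamping above behaves for a few configured values, assuming the `600` cap from const.go and the default of `120` mentioned in the docs and metrics comments:

```go
package main

import "fmt"

// Constants mirrored from pkg/apis/numaflow/v1alpha1 for this standalone sketch.
const (
	maxLookbackSeconds     = 600
	defaultLookbackSeconds = 120
)

// effectiveLookback mirrors the clamping in GetLookbackSeconds: cap a configured
// value at maxLookbackSeconds, or fall back to the default when nothing is set.
func effectiveLookback(configured *uint32) int {
	if configured != nil {
		return min(maxLookbackSeconds, int(*configured))
	}
	return defaultLookbackSeconds
}

func main() {
	tooLarge, small := uint32(900), uint32(60)
	fmt.Println(effectiveLookback(&tooLarge)) // 600: values above the cap are clamped
	fmt.Println(effectiveLookback(&small))    // 60: values within the cap pass through
	fmt.Println(effectiveLookback(nil))       // 120: nothing configured, use the default
}
```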
14 changes: 14 additions & 0 deletions pkg/metrics/metrics.go
@@ -370,3 +370,17 @@ var (
Help: "Total number of Write Errors while writing to a fallback sink",
}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName})
)

// Daemon server metrics
var (
// MonoVertexLookBackSecs is a gauge indicating the current lookback window value being used
// by a given monovertex. It represents how many seconds to look back when calculating the vertex's
// average processing rate (TPS) and pending messages; it defaults to 120. Rate and pending-message
// metrics are critical for autoscaling.
MonoVertexLookBackSecs = promauto.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: "monovtx",
Name: "lookback_window_seconds",
Help: "A metric to show what is the lookback window value being used by a given monovertex. " +
"Look back Seconds is critical in autoscaling calculations",
}, []string{LabelMonoVertexName})
)
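Gauges like this are typically updated by the component that computes the value; the rater changes that do so are not shown in this excerpt, so the snippet below is only a hedged sketch of the standard Prometheus `GaugeVec` update pattern. The package, helper name, and call site are assumptions, not the commit's code.

```go
package rater

import "github.com/numaproj/numaflow/pkg/metrics"

// publishLookback is a hypothetical helper (name and call site are assumptions)
// that exposes the current lookback window for a monovertex.
func publishLookback(monoVertexName string, lookbackSeconds float64) {
	// Standard GaugeVec usage: resolve the labeled child, then Set the value.
	metrics.MonoVertexLookBackSecs.WithLabelValues(monoVertexName).Set(lookbackSeconds)
}
```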
92 changes: 92 additions & 0 deletions pkg/mvtxdaemon/server/service/rater/helper.go
@@ -125,3 +125,95 @@ func calculatePodDelta(tc1, tc2 *TimestampedCounts) float64 {
}
return delta
}

// podLastSeen stores the last seen timestamp and count of each pod
type podLastSeen struct {
count float64
seenTime int64
}

type podMaxDuration struct {
// Map to keep track of the last seen count and timestamp of each pod.
lastSeen map[string]podLastSeen
// Map to store the maximum duration for which the value of any pod was unchanged.
maxUnchangedDuration map[string]int64
}

// CalculateMaxLookback computes the maximum duration (in seconds) for which the count of messages processed by any pod
// remained unchanged within a specified range of indices in a queue of TimestampedCounts. It does this by analyzing each
// data point between the startIndex and endIndex, checking the count changes for each pod, and noting the durations
// during which these counts stay unchanged. The metric is updated when data is read by the pod.
// This encapsulates the lookback for two scenarios:
// 1. Slow processing vertex
// 2. Slow data source - data arrives after long intervals
func CalculateMaxLookback(counts []*TimestampedCounts, startIndex, endIndex int) int64 {
lookBackData := podMaxDuration{
lastSeen: make(map[string]podLastSeen),
maxUnchangedDuration: make(map[string]int64),
}
processTimeline(counts, startIndex, endIndex, &lookBackData)
finalizeDurations(counts[endIndex], &lookBackData)
return findGlobalMaxDuration(lookBackData.maxUnchangedDuration)
}

// processTimeline processes the timeline of counts and updates the maxUnchangedDuration for each pod.
func processTimeline(counts []*TimestampedCounts, startIndex, endIndex int, data *podMaxDuration) {
for i := startIndex; i <= endIndex; i++ {
item := counts[i].PodCountSnapshot()
curTime := counts[i].PodTimestamp()

for key, curCount := range item {
lastSeenData, found := data.lastSeen[key]
if found && lastSeenData.count == curCount {
continue
}

// If the read count data has updated
if found && curCount > lastSeenData.count {
duration := curTime - lastSeenData.seenTime
if currentMax, ok := data.maxUnchangedDuration[key]; !ok || duration > currentMax {
data.maxUnchangedDuration[key] = duration
}
}
// The value in lastSeen is updated in 3 cases:
// 1. this is the first time the pod entry is seen,
// 2. the value increased, or
// 3. the value decreased, which is treated as a new entry for the pod
data.lastSeen[key] = podLastSeen{curCount, curTime}
}
}
}

// finalizeDurations checks for pods whose count did not change at all during the iteration,
// and updates their maxUnchangedDuration to the full period from first seen to lastTime.
// Note: There is a case where one pod was getting data earlier, but then stopped altogether.
// For example, one partition in Kafka not getting data after a while. This case is not covered
// by our logic, and we would keep increasing the lookback in such a scenario.
func finalizeDurations(lastCount *TimestampedCounts, data *podMaxDuration) {
endVals := lastCount.PodCountSnapshot()
lastTime := lastCount.PodTimestamp()
for key, lastSeenData := range data.lastSeen {
endDuration := lastTime - lastSeenData.seenTime
// This condition covers two scenarios:
// 1. There is an entry in the last seen, but not in maxUnchangedDuration
// It was seen once, but value never changed. In this case update the maxDuration, but only when
// the count > 0
// 2. The value has not changed till the boundary, and this duration is larger than the current max
if _, exists := endVals[key]; exists && (lastSeenData.count != 0) {
if currentMax, ok := data.maxUnchangedDuration[key]; !ok || endDuration > currentMax {
data.maxUnchangedDuration[key] = endDuration
}
}
}
}

// findGlobalMaxDuration returns the maximum duration found across all pods.
func findGlobalMaxDuration(maxUnchangedDuration map[string]int64) int64 {
globalMaxSecs := int64(0)
for _, duration := range maxUnchangedDuration {
if duration > globalMaxSecs {
globalMaxSecs = duration
}
}
return globalMaxSecs
}
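The part of the commit that consumes this maximum (the rater's rate calculator) is not visible in this excerpt, so the following is only a hedged sketch of how the adaptive adjustment could be wired up: take the longest observed unchanged duration, never shrink below the user-configured lookback, and never exceed the `MaxLookbackSeconds` cap before publishing the value to the gauge shown earlier. The function name and the wiring in the trailing comment are assumptions.

```go
package rater

// nextLookbackSeconds is a hypothetical illustration, not the commit's API: it derives
// the adaptive lookback from the longest duration any pod's count stayed unchanged,
// bounded below by the configured value and above by the global cap.
func nextLookbackSeconds(configuredSeconds, observedMaxUnchanged, capSeconds int64) int64 {
	// Grow the window when pods sit on the same count for longer than the configured
	// lookback (slow processing vertex, or a source that delivers data rarely)...
	lookback := max(configuredSeconds, observedMaxUnchanged)
	// ...but never beyond the hard cap (MaxLookbackSeconds, i.e. 600).
	if lookback > capSeconds {
		lookback = capSeconds
	}
	return lookback
}

// Hypothetical wiring, for illustration only:
//   observed := CalculateMaxLookback(counts, 0, len(counts)-1)
//   lb := nextLookbackSeconds(int64(vertex.Scale.GetLookbackSeconds()), observed, 600)
//   metrics.MonoVertexLookBackSecs.WithLabelValues(name).Set(float64(lb))
```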
127 changes: 127 additions & 0 deletions pkg/mvtxdaemon/server/service/rater/helper_test.go
@@ -17,6 +17,7 @@ limitations under the License.
package rater

import (
"sync"
"testing"
"time"

@@ -286,3 +287,129 @@ func TestCalculateRate(t *testing.T) {
assert.Equal(t, 23.0, CalculateRate(q, 100))
})
}

// Helper function to create a TimestampedCounts instance
func newTimestampedCounts(timestamp int64, counts map[string]float64) *TimestampedCounts {
return &TimestampedCounts{
timestamp: timestamp,
podReadCounts: counts,
lock: new(sync.RWMutex),
}
}

// TestCalculateMaxLookback tests various scenarios on the CalculateMaxLookback function
func TestCalculateMaxLookback(t *testing.T) {
tests := []struct {
name string
counts []*TimestampedCounts
startIndex int
endIndex int
expectedMax int64
}{
{
name: "Uniform data across the range",
counts: []*TimestampedCounts{
newTimestampedCounts(100, map[string]float64{"pod1": 100, "pod2": 200}),
newTimestampedCounts(200, map[string]float64{"pod1": 100, "pod2": 200}),
newTimestampedCounts(400, map[string]float64{"pod1": 100, "pod2": 200}),
},
startIndex: 0,
endIndex: 2,
expectedMax: 300,
},
{
name: "Values change midway",
counts: []*TimestampedCounts{
newTimestampedCounts(100, map[string]float64{"pod1": 100, "pod2": 150}),
newTimestampedCounts(240, map[string]float64{"pod1": 100, "pod2": 200}),
newTimestampedCounts(360, map[string]float64{"pod1": 150, "pod2": 200}),
},
startIndex: 0,
endIndex: 2,
expectedMax: 260,
},
{
name: "No data change across any pods",
counts: []*TimestampedCounts{
newTimestampedCounts(100, map[string]float64{"pod1": 500}),
newTimestampedCounts(600, map[string]float64{"pod1": 500}),
},
startIndex: 0,
endIndex: 1,
expectedMax: 500, // Entire duration
},
{
name: "Edge Case: One entry only",
counts: []*TimestampedCounts{
newTimestampedCounts(100, map[string]float64{"pod1": 100}),
},
startIndex: 0,
endIndex: 0,
expectedMax: 0, // No duration difference
},
{
name: "Rapid changes in sequential entries",
counts: []*TimestampedCounts{
newTimestampedCounts(100, map[string]float64{"pod1": 500, "pod2": 400}),
newTimestampedCounts(130, map[string]float64{"pod1": 600, "pod2": 400}),
newTimestampedCounts(160, map[string]float64{"pod1": 600, "pod2": 600}),
},
startIndex: 0,
endIndex: 2,
expectedMax: 60,
},
{
// Here the pod has an initial read count, and then the pod count drops to 0.
// This is treated as a refresh in counts, so the unchanged-duration tracking restarts from that point.
name: "Pod goes to zero",
counts: []*TimestampedCounts{
newTimestampedCounts(0, map[string]float64{"pod1": 50}), // Initial count
newTimestampedCounts(30, map[string]float64{"pod1": 50}),
newTimestampedCounts(60, map[string]float64{"pod1": 0}), // Count falls to zero
newTimestampedCounts(120, map[string]float64{"pod1": 25}), // Count returns
newTimestampedCounts(180, map[string]float64{"pod1": 25}), // Count stays stable
newTimestampedCounts(240, map[string]float64{"pod1": 25}), // Count stays stable again
},
startIndex: 0,
endIndex: 5,
expectedMax: 120, // from index 3,5
},
{
name: "Pod goes to zero - 2",
counts: []*TimestampedCounts{
newTimestampedCounts(0, map[string]float64{"pod1": 60}),
newTimestampedCounts(60, map[string]float64{"pod1": 60}),
newTimestampedCounts(120, map[string]float64{"pod1": 70}),
newTimestampedCounts(180, map[string]float64{"pod1": 0}),
newTimestampedCounts(240, map[string]float64{"pod1": 25}),
newTimestampedCounts(300, map[string]float64{"pod1": 25}),
},
startIndex: 0,
endIndex: 5,
expectedMax: 120, // here idx 0,2 should be used, after going to zero it resets
},
{
// This is a case where one pod never got any data, which we consider as read count = 0 always.
// In such a case we should not use this pod for the calculation.
name: "One pod no data, other >0 ",
counts: []*TimestampedCounts{
newTimestampedCounts(0, map[string]float64{"pod1": 0, "pod2": 5}),
newTimestampedCounts(60, map[string]float64{"pod1": 0, "pod2": 5}),
newTimestampedCounts(120, map[string]float64{"pod1": 0, "pod2": 5}),
newTimestampedCounts(180, map[string]float64{"pod1": 0, "pod2": 5}),
newTimestampedCounts(240, map[string]float64{"pod1": 0, "pod2": 6}),
newTimestampedCounts(300, map[string]float64{"pod1": 0, "pod2": 6}),
},
startIndex: 0,
endIndex: 5,
expectedMax: 240,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
maxDuration := CalculateMaxLookback(tt.counts, tt.startIndex, tt.endIndex)
assert.Equal(t, tt.expectedMax, maxDuration)
})
}
}