diff --git a/docs/user-guide/reference/autoscaling.md b/docs/user-guide/reference/autoscaling.md
index e78cc0ec59..fff13191c0 100644
--- a/docs/user-guide/reference/autoscaling.md
+++ b/docs/user-guide/reference/autoscaling.md
@@ -49,6 +49,8 @@ spec:
   don't want the vertices to be scaled down to `0`. In this case, you need to increase `lookbackSeconds` to overlap
   5 minutes, so that the calculated average rate and pending messages won't be `0` during the silent period, in order to prevent from
   scaling down to 0.
+  The max value allowed to be configured is `600`.
+  On top of this, we have dynamic lookback adjustment which tunes this parameter based on the realtime processing data.
 - `scaleUpCooldownSeconds` - After a scaling operation, how many seconds to wait for the same vertex, if the follow-up
   operation is a scaling up, defaults to `90`. Please make sure that the time is greater than the pod to be `Running` and
   start processing, because the autoscaling algorithm will divide the TPS by the number of pods even if the pod is not `Running`.
diff --git a/pkg/apis/numaflow/v1alpha1/const.go b/pkg/apis/numaflow/v1alpha1/const.go
index 8f747f91ef..e9a539179e 100644
--- a/pkg/apis/numaflow/v1alpha1/const.go
+++ b/pkg/apis/numaflow/v1alpha1/const.go
@@ -183,6 +183,7 @@ const (
 	DefaultTargetProcessingSeconds  = 20  // Default targeted time in seconds to finish processing all the pending messages for a source
 	DefaultTargetBufferAvailability = 50  // Default targeted percentage of buffer availability
 	DefaultReplicasPerScale         = 2   // Default maximum replicas to be scaled up or down at once
+	MaxLookbackSeconds              = 600 // Max lookback seconds for calculating avg rate and pending
 	// Default persistent buffer queue options
 	DefaultPBQChannelBufferSize = 100             // Default channel size in int (what should be right value?)
diff --git a/pkg/apis/numaflow/v1alpha1/scale.go b/pkg/apis/numaflow/v1alpha1/scale.go
index 9fbe58da04..4ee06f4b99 100644
--- a/pkg/apis/numaflow/v1alpha1/scale.go
+++ b/pkg/apis/numaflow/v1alpha1/scale.go
@@ -69,7 +69,8 @@ type Scale struct {
 func (s Scale) GetLookbackSeconds() int {
 	if s.LookbackSeconds != nil {
-		return int(*s.LookbackSeconds)
+		// do not allow the value to be larger than the MaxLookbackSeconds in our config
+		return min(MaxLookbackSeconds, int(*s.LookbackSeconds))
 	return DefaultLookbackSeconds
diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go
index 53605eca2c..5d2d722545 100644
--- a/pkg/metrics/metrics.go
+++ b/pkg/metrics/metrics.go
@@ -370,3 +370,17 @@ var (
 		Help:      "Total number of Write Errors while writing to a fallback sink",
 	}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName})
+// Daemon server metrics
+var (
+	// MonoVertexLookBackSecs is a gauge used to indicate what is the current lookback window value being used
+	// by a given monovertex. It is used as how many seconds to lookback for vertex average processing rate
+	// (tps) and pending messages calculation, defaults to 120. Rate and pending messages metrics are
+	// critical for autoscaling.
+	MonoVertexLookBackSecs = promauto.NewGaugeVec(prometheus.GaugeOpts{
+		Subsystem: "monovtx",
+		Name:      "lookback_window_seconds",
+		Help: "A metric to show what is the lookback window value being used by a given monovertex. " +
+			"Look back Seconds is critical in autoscaling calculations",
+	}, []string{LabelMonoVertexName})
diff --git a/pkg/mvtxdaemon/server/service/rater/helper.go b/pkg/mvtxdaemon/server/service/rater/helper.go
index ce9aac1ed4..0091fd984e 100644
--- a/pkg/mvtxdaemon/server/service/rater/helper.go
+++ b/pkg/mvtxdaemon/server/service/rater/helper.go
@@ -125,3 +125,95 @@ func calculatePodDelta(tc1, tc2 *TimestampedCounts) float64 {
 	return delta
+// podLastSeen stores the last seen timestamp and count of each pod
+type podLastSeen struct {
+	count    float64
+	seenTime int64
+type podMaxDuration struct {
+	// Map to keep track of the last seen count and timestamp of each pod.
+	lastSeen map[string]podLastSeen
+	// Map to store the maximum duration for which the value of any pod was unchanged.
+	maxUnchangedDuration map[string]int64
+// CalculateMaxLookback computes the maximum duration (in seconds) for which the count of messages processed by any pod
+// remained unchanged within a specified range of indices in a queue of TimestampedCounts. It does this by analyzing each
+// data point between the startIndex and endIndex, checking the count changes for each pod, and noting the durations
+// during which these counts stay consistent. The metric is updated when data is read by the pod
+// This would encapsulate the lookback for two scenarios
+// 1. Slow processing vertex
+// 2. Slow data source - data arrives after long intervals
+func CalculateMaxLookback(counts []*TimestampedCounts, startIndex, endIndex int) int64 {
+	lookBackData := podMaxDuration{
+		lastSeen:             make(map[string]podLastSeen),
+		maxUnchangedDuration: make(map[string]int64),
+	}
+	processTimeline(counts, startIndex, endIndex, &lookBackData)
+	finalizeDurations(counts[endIndex], &lookBackData)
+	return findGlobalMaxDuration(lookBackData.maxUnchangedDuration)
+// processTimeline processes the timeline of counts and updates the maxUnchangedDuration for each pod.
+func processTimeline(counts []*TimestampedCounts, startIndex, endIndex int, data *podMaxDuration) {
+	for i := startIndex; i <= endIndex; i++ {
+		item := counts[i].PodCountSnapshot()
+		curTime := counts[i].PodTimestamp()
+		for key, curCount := range item {
+			lastSeenData, found := data.lastSeen[key]
+			if found && lastSeenData.count == curCount {
+				continue
+			}
+			// If the read count data has updated
+			if found && curCount > lastSeenData.count {
+				duration := curTime - lastSeenData.seenTime
+				if currentMax, ok := data.maxUnchangedDuration[key]; !ok || duration > currentMax {
+					data.maxUnchangedDuration[key] = duration
+				}
+			}
+			// The value is updated in the lastSeen for 3 cases
+			// 1. If this is the first time seeing the pod entry or
+			// 2. in case of a value increase,
+			// 3. In case of a value decrease which is treated as a new entry for pod
+			data.lastSeen[key] = podLastSeen{curCount, curTime}
+		}
+	}
+// Check for pods that did not change at all during the iteration,
+// and update their maxUnchangedDuration to the full period from first seen to lastTime.
+// Note: There is a case where one pod was getting data earlier, but then stopped altogether.
+// For example, one partition in Kafka not getting data after a while. This case will not be covered
+// by our logic, and we would keep increasing the look back in such a scenario.
+func finalizeDurations(lastCount *TimestampedCounts, data *podMaxDuration) {
+	endVals := lastCount.PodCountSnapshot()
+	lastTime := lastCount.PodTimestamp()
+	for key, lastSeenData := range data.lastSeen {
+		endDuration := lastTime - lastSeenData.seenTime
+		// This condition covers two scenarios:
+		// 1. There is an entry in the last seen, but not in maxUnchangedDuration
+		// It was seen once, but value never changed. In this case update the maxDuration, but only when
+		// the count > 0
+		// 2. The value has not changed till the boundary, and this duration is larger than the current max
+		if _, exists := endVals[key]; exists && (lastSeenData.count != 0) {
+			if currentMax, ok := data.maxUnchangedDuration[key]; !ok || endDuration > currentMax {
+				data.maxUnchangedDuration[key] = endDuration
+			}
+		}
+	}
+// Calculate the maximum duration found across all pods.
+func findGlobalMaxDuration(maxUnchangedDuration map[string]int64) int64 {
+	globalMaxSecs := int64(0)
+	for _, duration := range maxUnchangedDuration {
+		if duration > globalMaxSecs {
+			globalMaxSecs = duration
+		}
+	}
+	return globalMaxSecs
diff --git a/pkg/mvtxdaemon/server/service/rater/helper_test.go b/pkg/mvtxdaemon/server/service/rater/helper_test.go
index f3e61e3390..3812567c80 100644
--- a/pkg/mvtxdaemon/server/service/rater/helper_test.go
+++ b/pkg/mvtxdaemon/server/service/rater/helper_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package rater
 import (
+	"sync"
@@ -286,3 +287,129 @@ func TestCalculateRate(t *testing.T) {
 		assert.Equal(t, 23.0, CalculateRate(q, 100))
+// Helper function to create a TimestampedCounts instance
+func newTimestampedCounts(timestamp int64, counts map[string]float64) *TimestampedCounts {
+	return &TimestampedCounts{
+		timestamp:     timestamp,
+		podReadCounts: counts,
+		lock:          new(sync.RWMutex),
+	}
+// TestCalculateMaxLookback tests various scenarios on the CalculateMaxLookback function
+func TestCalculateMaxLookback(t *testing.T) {
+	tests := []struct {
+		name        string
+		counts      []*TimestampedCounts
+		startIndex  int
+		endIndex    int
+		expectedMax int64
+	}{
+		{
+			name: "Uniform data across the range",
+			counts: []*TimestampedCounts{
+				newTimestampedCounts(100, map[string]float64{"pod1": 100, "pod2": 200}),
+				newTimestampedCounts(200, map[string]float64{"pod1": 100, "pod2": 200}),
+				newTimestampedCounts(400, map[string]float64{"pod1": 100, "pod2": 200}),
+			},
+			startIndex:  0,
+			endIndex:    2,
+			expectedMax: 300,
+		},
+		{
+			name: "Values change midway",
+			counts: []*TimestampedCounts{
+				newTimestampedCounts(100, map[string]float64{"pod1": 100, "pod2": 150}),
+				newTimestampedCounts(240, map[string]float64{"pod1": 100, "pod2": 200}),
+				newTimestampedCounts(360, map[string]float64{"pod1": 150, "pod2": 200}),
+			},
+			startIndex:  0,
+			endIndex:    2,
+			expectedMax: 260,
+		},
+		{
+			name: "No data change across any pods",
+			counts: []*TimestampedCounts{
+				newTimestampedCounts(100, map[string]float64{"pod1": 500}),
+				newTimestampedCounts(600, map[string]float64{"pod1": 500}),
+			},
+			startIndex:  0,
+			endIndex:    1,
+			expectedMax: 500, // Entire duration
+		},
+		{
+			name: "Edge Case: One entry only",
+			counts: []*TimestampedCounts{
+				newTimestampedCounts(100, map[string]float64{"pod1": 100}),
+			},
+			startIndex:  0,
+			endIndex:    0,
+			expectedMax: 0, // No duration difference
+		},
+		{
+			name: "Rapid changes in sequential entries",
+			counts: []*TimestampedCounts{
+				newTimestampedCounts(100, map[string]float64{"pod1": 500, "pod2": 400}),
+				newTimestampedCounts(130, map[string]float64{"pod1": 600, "pod2": 400}),
+				newTimestampedCounts(160, map[string]float64{"pod1": 600, "pod2": 600}),
+			},
+			startIndex:  0,
+			endIndex:    2,
+			expectedMax: 60,
+		},
+		{
+			// Here the pod has an initial read count, and then would we see a pod count as 0.
+			// This is equated as a refresh in counts, and thus
+			name: "Pod goes to zero",
+			counts: []*TimestampedCounts{
+				newTimestampedCounts(0, map[string]float64{"pod1": 50}), // Initial count
+				newTimestampedCounts(30, map[string]float64{"pod1": 50}),
+				newTimestampedCounts(60, map[string]float64{"pod1": 0}),   // Count falls to zero
+				newTimestampedCounts(120, map[string]float64{"pod1": 25}), // Count returns
+				newTimestampedCounts(180, map[string]float64{"pod1": 25}), // Count stays stable
+				newTimestampedCounts(240, map[string]float64{"pod1": 25}), // Count stays stable again
+			},
+			startIndex:  0,
+			endIndex:    5,
+			expectedMax: 120, // from index 3,5
+		},
+		{
+			name: "Pod goes to zero - 2",
+			counts: []*TimestampedCounts{
+				newTimestampedCounts(0, map[string]float64{"pod1": 60}),
+				newTimestampedCounts(60, map[string]float64{"pod1": 60}),
+				newTimestampedCounts(120, map[string]float64{"pod1": 70}),
+				newTimestampedCounts(180, map[string]float64{"pod1": 0}),
+				newTimestampedCounts(240, map[string]float64{"pod1": 25}),
+				newTimestampedCounts(300, map[string]float64{"pod1": 25}),
+			},
+			startIndex:  0,
+			endIndex:    5,
+			expectedMax: 120, // here idx 0,2 should be used, after going to zero it resets
+		},
+		{
+			// this is a case where one pod never got any data which we consider as read count = 0 always
+			// in such a case we should not use this pod for calculation
+			name: "One pod no data, other >0 ",
+			counts: []*TimestampedCounts{
+				newTimestampedCounts(0, map[string]float64{"pod1": 0, "pod2": 5}),
+				newTimestampedCounts(60, map[string]float64{"pod1": 0, "pod2": 5}),
+				newTimestampedCounts(120, map[string]float64{"pod1": 0, "pod2": 5}),
+				newTimestampedCounts(180, map[string]float64{"pod1": 0, "pod2": 5}),
+				newTimestampedCounts(240, map[string]float64{"pod1": 0, "pod2": 6}),
+				newTimestampedCounts(300, map[string]float64{"pod1": 0, "pod2": 6}),
+			},
+			startIndex:  0,
+			endIndex:    5,
+			expectedMax: 240,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			maxDuration := CalculateMaxLookback(tt.counts, tt.startIndex, tt.endIndex)
+			assert.Equal(t, tt.expectedMax, maxDuration)
+		})
+	}
diff --git a/pkg/mvtxdaemon/server/service/rater/rater.go b/pkg/mvtxdaemon/server/service/rater/rater.go
index ac7422952e..38c3ed9dbb 100644
--- a/pkg/mvtxdaemon/server/service/rater/rater.go
+++ b/pkg/mvtxdaemon/server/service/rater/rater.go
@@ -19,15 +19,18 @@ package rater
 import (
+	"math"
+	"go.uber.org/atomic"
+	"github.com/numaproj/numaflow/pkg/metrics"
 	sharedqueue "github.com/numaproj/numaflow/pkg/shared/queue"
@@ -35,6 +38,11 @@ import (
 const CountWindow = time.Second * 10
 const monoVtxReadMetricName = "monovtx_read_total"
+// MaxLookback is the upper limit beyond which lookback value is not increased
+// by the dynamic algorithm. This is chosen as a conservative limit
+// where vertices taking beyond this time for processing might not be the
+// best candidate for auto-scaling. Might be prudent to keep fixed pods at that time.
 // MonoVtxRatable is the interface for the Rater struct.
 type MonoVtxRatable interface {
 	Start(ctx context.Context) error
@@ -63,9 +71,10 @@ type Rater struct {
 	podTracker *PodTracker
 	// timestampedPodCounts is a queue of timestamped counts for the MonoVertex
 	timestampedPodCounts *sharedqueue.OverflowQueue[*TimestampedCounts]
-	// userSpecifiedLookBackSeconds is the user-specified lookback seconds for that MonoVertex
-	userSpecifiedLookBackSeconds int64
-	options                      *options
+	// lookBackSeconds is the lookback time window used for scaling calculations
+	// this can be updated dynamically, defaults to user-specified value in the spec
+	lookBackSeconds *atomic.Float64
+	options         *options
 // PodReadCount is a struct to maintain count of messages read by a pod of MonoVertex
@@ -97,18 +106,20 @@ func NewRater(ctx context.Context, mv *v1alpha1.MonoVertex, opts ...Option) *Rat
 		log:     logging.FromContext(ctx).Named("Rater"),
 		options: defaultOptions(),
+		// load the default lookback value from the spec
+		lookBackSeconds: atomic.NewFloat64(float64(mv.Spec.Scale.GetLookbackSeconds())),
 	rater.podTracker = NewPodTracker(ctx, mv)
 	// maintain the total counts of the last 30 minutes(1800 seconds) since we support 1m, 5m, 15m lookback seconds.
 	rater.timestampedPodCounts = sharedqueue.New[*TimestampedCounts](int(1800 / CountWindow.Seconds()))
-	rater.userSpecifiedLookBackSeconds = int64(mv.Spec.Scale.GetLookbackSeconds())
 	for _, opt := range opts {
 		if opt != nil {
+	// initialise the metric value for the lookback window
+	metrics.MonoVertexLookBackSecs.WithLabelValues(mv.Name).Set(rater.lookBackSeconds.Load())
 	return &rater
@@ -203,7 +214,10 @@ func (r *Rater) GetRates() map[string]*wrapperspb.DoubleValue {
 func (r *Rater) buildLookbackSecondsMap() map[string]int64 {
-	lookbackSecondsMap := map[string]int64{"default": r.userSpecifiedLookBackSeconds}
+	// as the default lookback value can be changing dynamically,
+	// load the current value for the lookback seconds
+	lbValue := r.lookBackSeconds.Load()
+	lookbackSecondsMap := map[string]int64{"default": int64(lbValue)}
 	for k, v := range fixedLookbackSeconds {
 		lookbackSecondsMap[k] = v
@@ -223,6 +237,10 @@ func (r *Rater) Start(ctx context.Context) error {
+	// start the dynamic lookback check which will be
+	// updating the lookback period based on the data read time.
+	go r.startDynamicLookBack(ctx)
 	// Worker group
 	for i := 1; i <= r.options.workers; i++ {
 		go r.monitor(ctx, i, keyCh)
@@ -269,3 +287,65 @@ func sleep(ctx context.Context, duration time.Duration) {
 	case <-time.After(duration):
+func (r *Rater) startDynamicLookBack(ctx context.Context) {
+	ticker := time.NewTicker(30 * time.Second)
+	// Ensure the ticker is stopped to prevent a resource leak.
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ticker.C:
+			r.updateDynamicLookbackSecs()
+		case <-ctx.Done():
+			// If the context is canceled or expires exit
+			return
+		}
+	}
+// updateDynamicLookbackSecs continuously adjusts ths lookback duration based on the current
+// processing time of the MonoVertex system.
+func (r *Rater) updateDynamicLookbackSecs() {
+	counts := r.timestampedPodCounts.Items()
+	if len(counts) <= 1 {
+		return
+	}
+	// We will calculate the processing time for a time window = 3 * currentLookback
+	// This ensures that we have enough data to capture one complete processing
+	currentLookback := r.lookBackSeconds.Load()
+	startIndex := findStartIndex(3*int64(currentLookback), counts)
+	// we consider the last but one element as the end index because the last element might be incomplete
+	// we can be sure that the last but one element in the queue is complete.
+	endIndex := len(counts) - 2
+	if startIndex == indexNotFound {
+		return
+	}
+	// time diff in seconds.
+	timeDiff := counts[endIndex].timestamp - counts[startIndex].timestamp
+	if timeDiff == 0 {
+		// no action required here
+		return
+	}
+	maxProcessingTime := CalculateMaxLookback(counts, startIndex, endIndex)
+	// round up to the nearest minute, also ensure that while going up and down we have the consistent value for
+	// a given processingTimeSeconds, then convert back to seconds
+	roundedMaxLookback := 60.0 * (math.Ceil(float64(maxProcessingTime) / 60.0))
+	// Based on the value received we can have two cases
+	// 1. Step up case (value is > than current):
+	// 	  Do not allow the value to be increased more than the MaxLookback allowed (10mins)
+	// 2. Step Down (value is <= than current)
+	//    Do not allow the value to be lower the lookback value specified in the spec
+	if roundedMaxLookback > currentLookback {
+		roundedMaxLookback = math.Min(roundedMaxLookback, float64(v1alpha1.MaxLookbackSeconds))
+	} else {
+		roundedMaxLookback = math.Max(roundedMaxLookback, float64(r.monoVertex.Spec.Scale.GetLookbackSeconds()))
+	}
+	// If the value has changed, update it
+	if roundedMaxLookback != currentLookback {
+		r.lookBackSeconds.Store(roundedMaxLookback)
+		r.log.Infof("Lookback updated for mvtx %s, Current: %f Updated %f", r.monoVertex.Name, currentLookback, roundedMaxLookback)
+		// update the metric value for the lookback window
+		metrics.MonoVertexLookBackSecs.WithLabelValues(r.monoVertex.Name).Set(roundedMaxLookback)
+	}
diff --git a/pkg/mvtxdaemon/server/service/rater/timestamped_counts.go b/pkg/mvtxdaemon/server/service/rater/timestamped_counts.go
index ee2a13519b..e3761bb3ed 100644
--- a/pkg/mvtxdaemon/server/service/rater/timestamped_counts.go
+++ b/pkg/mvtxdaemon/server/service/rater/timestamped_counts.go
@@ -71,3 +71,9 @@ func (tc *TimestampedCounts) String() string {
 	defer tc.lock.RUnlock()
 	return fmt.Sprintf("{timestamp: %d, podReadCounts: %v}", tc.timestamp, tc.podReadCounts)
+func (tc *TimestampedCounts) PodTimestamp() int64 {
+	tc.lock.RLock()
+	defer tc.lock.RUnlock()
+	return tc.timestamp