diff --git a/pkg/vm/engine/disttae/logtail_consumer.go b/pkg/vm/engine/disttae/logtail_consumer.go index ca958473b2a69..6fdf9fb6aca43 100644 --- a/pkg/vm/engine/disttae/logtail_consumer.go +++ b/pkg/vm/engine/disttae/logtail_consumer.go @@ -1366,9 +1366,10 @@ func (s *subscribedTable) setTableUnsubscribe(dbId, tblId uint64) { type syncLogTailTimestamp struct { timestampWaiter client.TimestampWaiter ready atomic.Bool - tList []atomic.Value + tList []atomic.Pointer[timestamp.Timestamp] latestAppliedLogTailTS atomic.Pointer[timestamp.Timestamp] e *Engine + pool *sync.Pool } func (r *syncLogTailTimestamp) initLogTailTimestamp(timestampWaiter client.TimestampWaiter) { @@ -1379,17 +1380,24 @@ func (r *syncLogTailTimestamp) initLogTailTimestamp(timestampWaiter client.Times r.timestampWaiter = timestampWaiter if len(r.tList) == 0 { - r.tList = make([]atomic.Value, consumerNumber+1) + r.tList = make([]atomic.Pointer[timestamp.Timestamp], consumerNumber+1) } for i := range r.tList { - r.tList[i].Store(timestamp.Timestamp{}) + r.tList[i].Store(new(timestamp.Timestamp)) + } + if r.pool == nil { + r.pool = &sync.Pool{ + New: func() interface{} { + return ×tamp.Timestamp{} + }, + } } } func (r *syncLogTailTimestamp) getTimestamp() timestamp.Timestamp { var minT timestamp.Timestamp for i := 0; i < len(r.tList); i++ { - t := r.tList[i].Load().(timestamp.Timestamp) + t := *r.tList[i].Load() if i == 0 { minT = t } else { @@ -1410,7 +1418,18 @@ func (r *syncLogTailTimestamp) updateTimestamp( defer func() { v2.LogTailApplyNotifyDurationHistogram.Observe(time.Since(start).Seconds()) }() - r.tList[index].Store(newTimestamp) + + // fetch the instance from the pool. + tmp := r.pool.Get().(*timestamp.Timestamp) + *tmp = newTimestamp + + // get the previous instance to put it back to pool later. + prev := r.tList[index].Load() + r.tList[index].Store(tmp) + + // put the previous instance to the pool. + r.pool.Put(prev) + if r.ready.Load() { ts := r.getTimestamp() r.timestampWaiter.NotifyLatestCommitTS(ts)