Skip to content

Commit

Permalink
[improvement] logtail: optimize the instance allocation.
Browse files Browse the repository at this point in the history
  • Loading branch information
volgariver6 committed Jan 17, 2025
1 parent e9a5458 commit e07a384
Showing 1 changed file with 24 additions and 5 deletions.
29 changes: 24 additions & 5 deletions pkg/vm/engine/disttae/logtail_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1366,9 +1366,10 @@ func (s *subscribedTable) setTableUnsubscribe(dbId, tblId uint64) {
type syncLogTailTimestamp struct {
timestampWaiter client.TimestampWaiter
ready atomic.Bool
tList []atomic.Value
tList []atomic.Pointer[timestamp.Timestamp]
latestAppliedLogTailTS atomic.Pointer[timestamp.Timestamp]
e *Engine
pool *sync.Pool
}

func (r *syncLogTailTimestamp) initLogTailTimestamp(timestampWaiter client.TimestampWaiter) {
Expand All @@ -1379,17 +1380,24 @@ func (r *syncLogTailTimestamp) initLogTailTimestamp(timestampWaiter client.Times

r.timestampWaiter = timestampWaiter
if len(r.tList) == 0 {
r.tList = make([]atomic.Value, consumerNumber+1)
r.tList = make([]atomic.Pointer[timestamp.Timestamp], consumerNumber+1)
}
for i := range r.tList {
r.tList[i].Store(timestamp.Timestamp{})
r.tList[i].Store(new(timestamp.Timestamp))
}
if r.pool == nil {
r.pool = &sync.Pool{
New: func() interface{} {
return &timestamp.Timestamp{}
},
}
}
}

func (r *syncLogTailTimestamp) getTimestamp() timestamp.Timestamp {
var minT timestamp.Timestamp
for i := 0; i < len(r.tList); i++ {
t := r.tList[i].Load().(timestamp.Timestamp)
t := *r.tList[i].Load()
if i == 0 {
minT = t
} else {
Expand All @@ -1410,7 +1418,18 @@ func (r *syncLogTailTimestamp) updateTimestamp(
defer func() {
v2.LogTailApplyNotifyDurationHistogram.Observe(time.Since(start).Seconds())
}()
r.tList[index].Store(newTimestamp)

// fetch the instance from the pool.
tmp := r.pool.Get().(*timestamp.Timestamp)
*tmp = newTimestamp

// get the previous instance to put it back to pool later.
prev := r.tList[index].Load()
r.tList[index].Store(tmp)

// put the previous instance to the pool.
r.pool.Put(prev)

if r.ready.Load() {
ts := r.getTimestamp()
r.timestampWaiter.NotifyLatestCommitTS(ts)
Expand Down

0 comments on commit e07a384

Please sign in to comment.