Skip to content

Commit

Permalink
[improvement] stats: add some metrics and watcher. (#21182)
Browse files Browse the repository at this point in the history
add some metrics and watcher.

Approved by: @XuPeng-SH, @aptend
  • Loading branch information
volgariver6 authored Jan 10, 2025
1 parent dc935af commit c5da079
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pkg/util/metric/v2/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func init() {
registry.MustRegister(HeartbeatRecvHistogram)
registry.MustRegister(HeartbeatRecvFailureCounter)
registry.MustRegister(statsTriggerCounter)
registry.MustRegister(StatsTriggerQueueSizeGauge)
registry.MustRegister(StatsUpdateDurationHistogram)
registry.MustRegister(StatsUpdateBlockCounter)
}

Expand Down
18 changes: 18 additions & 0 deletions pkg/util/metric/v2/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,24 @@ var (
}, []string{"type"})
StatsTriggerForcedCounter = statsTriggerCounter.WithLabelValues("forced")
StatsTriggerUnforcedCounter = statsTriggerCounter.WithLabelValues("unforced")
StatsTriggerConsumeCounter = statsTriggerCounter.WithLabelValues("consume")

StatsTriggerQueueSizeGauge = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "mo",
Subsystem: "stats",
Name: "trigger_queue_size",
Help: "Size of stats trigger queue size.",
})

StatsUpdateDurationHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "mo",
Subsystem: "stats",
Name: "update_duration_seconds",
Help: "Histogram of stats update duration.",
Buckets: getDurationBuckets(),
})

StatsUpdateBlockCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Expand Down
79 changes: 79 additions & 0 deletions pkg/vm/engine/disttae/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ type GlobalStats struct {

updateC chan pb.StatsInfoKeyWithContext

// queueWatcher records the table ID and enqueue time of each item sent to
// updateC, and watches for items that stay in the queue for too long.
queueWatcher *queueWatcher

updatingMu struct {
sync.Mutex
updating map[pb.StatsInfoKey]*updateRecord
Expand Down Expand Up @@ -233,6 +237,7 @@ func NewGlobalStats(
tableLogtailCounter: make(map[pb.StatsInfoKey]int64),
KeyRouter: keyRouter,
waitKeeper: newWaitKeeper(),
queueWatcher: newQueueWatcher(),
}
s.updatingMu.updating = make(map[pb.StatsInfoKey]*updateRecord)
s.mu.statsInfoMap = make(map[pb.StatsInfoKey]*pb.StatsInfo)
Expand All @@ -247,6 +252,7 @@ func NewGlobalStats(
s.concurrentExecutor.Run(ctx)
go s.consumeWorker(ctx)
go s.updateWorker(ctx)
go s.queueWatcher.run(ctx)
return s
}

Expand Down Expand Up @@ -409,6 +415,10 @@ func (gs *GlobalStats) updateWorker(ctx context.Context) {
return

case key := <-gs.updateC:
// after dequeue from the chan, remove the table ID from the queue watcher.
gs.queueWatcher.del(key.Key.TableID)

v2.StatsTriggerConsumeCounter.Add(1)
gs.updateTableStats(key)
}
}
Expand All @@ -417,14 +427,19 @@ func (gs *GlobalStats) updateWorker(ctx context.Context) {
}

func (gs *GlobalStats) triggerUpdate(key pb.StatsInfoKeyWithContext, force bool) bool {
defer func() {
v2.StatsTriggerQueueSizeGauge.Set(float64(len(gs.updateC)))
}()
if force {
gs.updateC <- key
gs.queueWatcher.add(key.Key.TableID)
v2.StatsTriggerForcedCounter.Add(1)
return true
}

select {
case gs.updateC <- key:
gs.queueWatcher.add(key.Key.TableID)
v2.StatsTriggerUnforcedCounter.Add(1)
return true
default:
Expand Down Expand Up @@ -695,10 +710,12 @@ func (gs *GlobalStats) doUpdate(ctx context.Context, key pb.StatsInfoKey, stats
approxObjectNum,
stats,
)
start := time.Now()
if err := UpdateStats(ctx, req, gs.concurrentExecutor); err != nil {
logutil.Errorf("failed to init stats info for table %v, err: %v", key, err)
return false
}
v2.StatsUpdateDurationHistogram.Observe(time.Since(start).Seconds())
v2.StatsUpdateBlockCounter.Add(float64(stats.BlockNumber))
return true
}
Expand Down Expand Up @@ -889,3 +906,65 @@ func UpdateStats(ctx context.Context, req *updateStatsRequest, executor Concurre
}
return nil
}

// enqueueItem is one entry reported by queueWatcher.check: a table that has
// been tracked by the watcher for longer than the watcher's threshold.
type enqueueItem struct {
	// tableID is the ID passed to queueWatcher.add.
	tableID uint64

	// enqueueTime is the time at which the watcher started tracking tableID.
	enqueueTime time.Time
}

// queueWatcher remembers when each table ID was added and reports the ones
// that have not been removed again within threshold. The embedded mutex
// guards value; add, del and check all take it.
type queueWatcher struct {
	sync.Mutex

	// value maps a tracked table ID to the time it was first added.
	value map[uint64]time.Time

	// threshold is how long an ID may stay tracked before check reports it.
	threshold time.Duration

	// checkInterval is the period used by run between two calls to check.
	checkInterval time.Duration
}

// newQueueWatcher returns an empty watcher that reports IDs tracked for more
// than 30 seconds and is meant to be checked once per minute.
func newQueueWatcher() *queueWatcher {
	return &queueWatcher{
		value:         make(map[uint64]time.Time),
		threshold:     time.Second * 30,
		checkInterval: time.Minute,
	}
}

// add starts tracking tid with the current time as its enqueue time.
//
// If tid is already tracked the existing (earlier) time is kept. Overwriting
// it would restart the clock every time the same table is added again and
// could hide an entry that has already been waiting longer than threshold.
func (qw *queueWatcher) add(tid uint64) {
	qw.Lock()
	defer qw.Unlock()
	if _, tracked := qw.value[tid]; tracked {
		return
	}
	qw.value[tid] = time.Now()
}

// del stops tracking tid. Removing an ID that is not tracked is a no-op.
func (qw *queueWatcher) del(tid uint64) {
	qw.Lock()
	defer qw.Unlock()
	delete(qw.value, tid)
}

// check returns every tracked ID whose enqueue time is more than threshold in
// the past. The result is nil when nothing has timed out, and its order is
// unspecified because it follows map iteration order. Entries are only
// reported, never removed.
func (qw *queueWatcher) check() []enqueueItem {
	qw.Lock()
	defer qw.Unlock()

	// Take one timestamp so every entry is judged against the same instant.
	now := time.Now()
	var timeoutList []enqueueItem
	for tid, et := range qw.value {
		if now.Sub(et) > qw.threshold {
			timeoutList = append(timeoutList, enqueueItem{tableID: tid, enqueueTime: et})
		}
	}
	return timeoutList
}

// run calls check once every checkInterval and logs a warning listing the
// timed-out entries whenever check returns any. It blocks until ctx is done,
// logs that the watcher stopped, and returns; the ticker is released on exit.
// checkInterval must be positive, since time.NewTicker panics otherwise.
func (qw *queueWatcher) run(ctx context.Context) {
	tick := time.NewTicker(qw.checkInterval)
	defer tick.Stop()

	done := ctx.Done()
	for {
		select {
		case <-tick.C:
			if overdue := qw.check(); len(overdue) > 0 {
				logutil.Warnf("there are some timeout items in the queue: %v", overdue)
			}

		case <-done:
			logutil.Infof("stats trigger queue watcher stopped")
			return
		}
	}
}
36 changes: 36 additions & 0 deletions pkg/vm/engine/disttae/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,3 +336,39 @@ func TestWaitKeeper(t *testing.T) {
assert.False(t, ok)
assert.False(t, gs.safeToUnsubscribe(tid))
}

// TestQueueWatcher exercises queueWatcher with its threshold and check
// interval shortened to 10ms: first add/del/check on their own, then with the
// run loop active in a background goroutine. leaktest verifies that the
// goroutine has exited once the test finishes.
func TestQueueWatcher(t *testing.T) {
	defer leaktest.AfterTest(t)()

	// Shorten both durations so entries time out within the test's sleeps.
	const short = 10 * time.Millisecond
	qw := newQueueWatcher()
	qw.checkInterval = short
	qw.threshold = short

	t.Run("ok", func(t *testing.T) {
		for _, tid := range []uint64{101, 102} {
			qw.add(tid)
		}
		assert.Equal(t, 2, len(qw.value))

		qw.del(101)
		assert.Equal(t, 1, len(qw.value))

		// Wait past the threshold: only 102 is still tracked, so only it
		// can be reported.
		time.Sleep(2 * short)
		overdue := qw.check()
		assert.Equal(t, 1, len(overdue))

		qw.del(102)
		assert.Equal(t, 0, len(qw.value))
	})

	t.Run("run in background", func(t *testing.T) {
		ctx, cancel := context.WithCancel(context.Background())
		// Cancelling stops the run goroutine when the subtest returns.
		defer cancel()
		go qw.run(ctx)

		for _, tid := range []uint64{101, 102} {
			qw.add(tid)
		}
		time.Sleep(2 * short)

		// run only reports; both entries must still be tracked and overdue.
		overdue := qw.check()
		assert.Equal(t, 2, len(overdue))
	})
}

0 comments on commit c5da079

Please sign in to comment.