From 3aefbe27884429bdfc295aabee45e61d07d01ad2 Mon Sep 17 00:00:00 2001 From: LiuBo Date: Fri, 10 Jan 2025 18:12:42 +0800 Subject: [PATCH] [improvement] stats: add some metrics and watcher. --- pkg/util/metric/v2/metrics.go | 2 + pkg/util/metric/v2/stats.go | 18 +++++++ pkg/vm/engine/disttae/stats.go | 79 +++++++++++++++++++++++++++++ pkg/vm/engine/disttae/stats_test.go | 36 +++++++++++++ 4 files changed, 135 insertions(+) diff --git a/pkg/util/metric/v2/metrics.go b/pkg/util/metric/v2/metrics.go index 22a49be0104f6..97db85a251f9b 100644 --- a/pkg/util/metric/v2/metrics.go +++ b/pkg/util/metric/v2/metrics.go @@ -58,6 +58,8 @@ func init() { registry.MustRegister(HeartbeatRecvHistogram) registry.MustRegister(HeartbeatRecvFailureCounter) registry.MustRegister(statsTriggerCounter) + registry.MustRegister(StatsTriggerQueueSizeGauge) + registry.MustRegister(StatsUpdateDurationHistogram) registry.MustRegister(StatsUpdateBlockCounter) } diff --git a/pkg/util/metric/v2/stats.go b/pkg/util/metric/v2/stats.go index 9f13d9df32261..6ba8dfbd7d6ec 100644 --- a/pkg/util/metric/v2/stats.go +++ b/pkg/util/metric/v2/stats.go @@ -26,6 +26,24 @@ var ( }, []string{"type"}) StatsTriggerForcedCounter = statsTriggerCounter.WithLabelValues("forced") StatsTriggerUnforcedCounter = statsTriggerCounter.WithLabelValues("unforced") + StatsTriggerConsumeCounter = statsTriggerCounter.WithLabelValues("consume") + + StatsTriggerQueueSizeGauge = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: "mo", + Subsystem: "stats", + Name: "trigger_queue_size", + Help: "Size of stats trigger queue size.", + }) + + StatsUpdateDurationHistogram = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "mo", + Subsystem: "stats", + Name: "update_duration_seconds", + Help: "Histogram of stats update duration.", + Buckets: getDurationBuckets(), + }) StatsUpdateBlockCounter = prometheus.NewCounter( prometheus.CounterOpts{ diff --git a/pkg/vm/engine/disttae/stats.go b/pkg/vm/engine/disttae/stats.go index 42ad1bd3b0c10..2867b1c597526 100644 --- a/pkg/vm/engine/disttae/stats.go +++ b/pkg/vm/engine/disttae/stats.go @@ -178,6 +178,10 @@ type GlobalStats struct { updateC chan pb.StatsInfoKeyWithContext + // queueWatcher keeps the table id and its enqueue time. + // and watch the queue item in the queue. + queueWatcher *queueWatcher + updatingMu struct { sync.Mutex updating map[pb.StatsInfoKey]*updateRecord @@ -233,6 +237,7 @@ func NewGlobalStats( tableLogtailCounter: make(map[pb.StatsInfoKey]int64), KeyRouter: keyRouter, waitKeeper: newWaitKeeper(), + queueWatcher: newQueueWatcher(), } s.updatingMu.updating = make(map[pb.StatsInfoKey]*updateRecord) s.mu.statsInfoMap = make(map[pb.StatsInfoKey]*pb.StatsInfo) @@ -247,6 +252,7 @@ func NewGlobalStats( s.concurrentExecutor.Run(ctx) go s.consumeWorker(ctx) go s.updateWorker(ctx) + go s.queueWatcher.run(ctx) return s } @@ -409,6 +415,10 @@ func (gs *GlobalStats) updateWorker(ctx context.Context) { return case key := <-gs.updateC: + // after dequeue from the chan, remove the table ID from the queue watcher. + gs.queueWatcher.del(key.Key.TableID) + + v2.StatsTriggerConsumeCounter.Add(1) gs.updateTableStats(key) } } @@ -417,14 +427,19 @@ func (gs *GlobalStats) updateWorker(ctx context.Context) { } func (gs *GlobalStats) triggerUpdate(key pb.StatsInfoKeyWithContext, force bool) bool { + defer func() { + v2.StatsTriggerQueueSizeGauge.Set(float64(len(gs.updateC))) + }() if force { gs.updateC <- key + gs.queueWatcher.add(key.Key.TableID) v2.StatsTriggerForcedCounter.Add(1) return true } select { case gs.updateC <- key: + gs.queueWatcher.add(key.Key.TableID) v2.StatsTriggerUnforcedCounter.Add(1) return true default: @@ -695,10 +710,12 @@ func (gs *GlobalStats) doUpdate(ctx context.Context, key pb.StatsInfoKey, stats approxObjectNum, stats, ) + start := time.Now() if err := UpdateStats(ctx, req, gs.concurrentExecutor); err != nil { logutil.Errorf("failed to init stats info for table %v, err: %v", key, err) return false } + v2.StatsUpdateDurationHistogram.Observe(time.Since(start).Seconds()) v2.StatsUpdateBlockCounter.Add(float64(stats.BlockNumber)) return true } @@ -889,3 +906,65 @@ func UpdateStats(ctx context.Context, req *updateStatsRequest, executor Concurre } return nil } + +type enqueueItem struct { + tableID uint64 + enqueueTime time.Time +} + +type queueWatcher struct { + sync.Mutex + value map[uint64]time.Time + threshold time.Duration + checkInterval time.Duration +} + +func newQueueWatcher() *queueWatcher { + return &queueWatcher{ + value: make(map[uint64]time.Time), + threshold: time.Second * 30, + checkInterval: time.Minute, + } +} + +func (qw *queueWatcher) add(tid uint64) { + qw.Lock() + defer qw.Unlock() + qw.value[tid] = time.Now() +} + +func (qw *queueWatcher) del(tid uint64) { + qw.Lock() + defer qw.Unlock() + delete(qw.value, tid) +} + +func (qw *queueWatcher) check() []enqueueItem { + var timeoutList []enqueueItem + qw.Lock() + defer qw.Unlock() + for tid, et := range qw.value { + if time.Since(et) > qw.threshold { + timeoutList = append(timeoutList, enqueueItem{tid, et}) + } + } + return timeoutList +} + +func (qw *queueWatcher) run(ctx context.Context) { + ticker := time.NewTicker(qw.checkInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + logutil.Infof("stats trigger queue watcher stopped") + return + + case <-ticker.C: + list := qw.check() + if len(list) > 0 { + logutil.Warnf("there are some timeout items in the queue: %v", list) + } + } + } +} diff --git a/pkg/vm/engine/disttae/stats_test.go b/pkg/vm/engine/disttae/stats_test.go index 2b6d780b1932b..a03418c70ba49 100644 --- a/pkg/vm/engine/disttae/stats_test.go +++ b/pkg/vm/engine/disttae/stats_test.go @@ -336,3 +336,39 @@ func TestWaitKeeper(t *testing.T) { assert.False(t, ok) assert.False(t, gs.safeToUnsubscribe(tid)) } + +func TestQueueWatcher(t *testing.T) { + defer leaktest.AfterTest(t)() + testAdjustFn := func(qw *queueWatcher) { + qw.checkInterval = time.Millisecond * 10 + qw.threshold = time.Millisecond * 10 + } + q := newQueueWatcher() + testAdjustFn(q) + + t.Run("ok", func(t *testing.T) { + q.add(101) + q.add(102) + assert.Equal(t, 2, len(q.value)) + q.del(101) + assert.Equal(t, 1, len(q.value)) + + time.Sleep(time.Millisecond * 20) + list := q.check() + assert.Equal(t, 1, len(list)) + q.del(102) + assert.Equal(t, 0, len(q.value)) + }) + + t.Run("run in background", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go q.run(ctx) + q.add(101) + q.add(102) + time.Sleep(time.Millisecond * 20) + list := q.check() + assert.Equal(t, 2, len(list)) + }) + +}