Skip to content

Commit

Permalink
eventstore: add more metrics (pingcap#593)
Browse files Browse the repository at this point in the history
* fix

* add metrics

* change config

* add grafana
  • Loading branch information
lidezhu authored Nov 23, 2024
1 parent 57e4376 commit 211ccbf
Show file tree
Hide file tree
Showing 3 changed files with 215 additions and 10 deletions.
8 changes: 7 additions & 1 deletion logservice/eventstore/event_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ import (
"golang.org/x/sync/errgroup"
)

var metricEventStoreDSPendingQueueLen = metrics.DynamicStreamPendingQueueLen.WithLabelValues("event-store")
var metricEventStoreDSChannelSize = metrics.DynamicStreamEventChanSize.WithLabelValues("event-store")

type ResolvedTsNotifier func(watermark uint64)

type EventStore interface {
Expand Down Expand Up @@ -225,7 +228,7 @@ func New(

option := dynstream.NewOption()
option.InputBufferSize = 80000
option.BatchCount = 4096
option.BatchCount = 40960
ds := dynstream.NewParallelDynamicStream(streamCount, pathHasher{}, &eventsHandler{}, option)
ds.Start()

Expand Down Expand Up @@ -688,6 +691,9 @@ func (e *eventStore) updateMetricsOnce() {
minResolvedPhyTs := oracle.ExtractPhysical(minResolvedTs)
eventStoreResolvedTsLag := float64(currentPhyTs-minResolvedPhyTs) / 1e3
metrics.EventStoreResolvedTsLagGauge.Set(eventStoreResolvedTsLag)
dsMetrics := e.ds.GetMetrics()
metricEventStoreDSChannelSize.Set(float64(dsMetrics.EventChanSize))
metricEventStoreDSPendingQueueLen.Set(float64(dsMetrics.PendingQueueLen))
}

func (e *eventStore) writeEvents(db *pebble.DB, events []kvEvents) error {
Expand Down
216 changes: 207 additions & 9 deletions metrics/grafana/ticdc.json
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@
"editable": true,
"gnetId": null,
"graphTooltip": 1,
"id": 33,
"iteration": 1732343791065,
"id": 32,
"iteration": 1732345252721,
"links": [],
"panels": [
{
Expand Down Expand Up @@ -4069,6 +4069,201 @@
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_C1}",
"description": "",
"fieldConfig": {
"defaults": {},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 54
},
"hiddenSeries": false,
"id": 20028,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.5.17",
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "sum(rate(ticdc_dynamic_stream_event_chan_size{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$ticdc_instance\", component=\"event-store\"}[1m])) by (instance,component)",
"hide": false,
"interval": "",
"legendFormat": "Input-chanel-len-{{instance}}",
"refId": "B"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "DS Input Channel Length",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"$$hashKey": "object:1887",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"$$hashKey": "object:1888",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_C1}",
"fieldConfig": {
"defaults": {},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 54
},
"hiddenSeries": false,
"id": 20036,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.5.17",
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "sum(rate(ticdc_dynamic_stream_pending_queue_len{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$ticdc_instance\", component=\"event-store\"}[1m])) by (instance,component)",
"hide": false,
"interval": "",
"legendFormat": "len-{{instance}}",
"refId": "B"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "DS Pending Queue Length",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"$$hashKey": "object:1751",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"$$hashKey": "object:1752",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
}
],
"title": "Event Store",
Expand Down Expand Up @@ -5204,6 +5399,7 @@
},
"yaxes": [
{
"$$hashKey": "object:1751",
"format": "short",
"label": null,
"logBase": 1,
Expand All @@ -5212,6 +5408,7 @@
"show": true
},
{
"$$hashKey": "object:1752",
"format": "short",
"label": null,
"logBase": 1,
Expand All @@ -5231,7 +5428,6 @@
"dashLength": 10,
"dashes": false,
"datasource": "${DS_C1}",
"description": "",
"fieldConfig": {
"defaults": {},
"overrides": []
Expand All @@ -5245,7 +5441,7 @@
"y": 30
},
"hiddenSeries": false,
"id": 20028,
"id": 20035,
"legend": {
"avg": false,
"current": false,
Expand Down Expand Up @@ -5273,18 +5469,18 @@
"targets": [
{
"exemplar": true,
"expr": "sum(rate(ticdc_dynamic_stream_event_chan_size{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$ticdc_instance\", component=\"event-broker\"}[1m])) by (instance,component)",
"expr": "sum(rate(ticdc_dynamic_stream_pending_queue_len{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$ticdc_instance\", component=\"event-broker\"}[1m])) by (instance,component)",
"hide": false,
"interval": "",
"legendFormat": "Input-chanel-len-{{instance}}",
"legendFormat": "len-{{instance}}",
"refId": "B"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "DS Input Channel Length",
"title": "DS Pending Queue Length",
"tooltip": {
"shared": true,
"sort": 0,
Expand All @@ -5300,6 +5496,7 @@
},
"yaxes": [
{
"$$hashKey": "object:1751",
"format": "short",
"label": null,
"logBase": 1,
Expand All @@ -5308,6 +5505,7 @@
"show": true
},
{
"$$hashKey": "object:1752",
"format": "short",
"label": null,
"logBase": 1,
Expand Down Expand Up @@ -27908,6 +28106,6 @@
},
"timezone": "browser",
"title": "${DS_C1}-TiCDC",
"uid": "YiGL8hBZ0aaa",
"version": 1
"uid": "YiGL8hBZ0aab",
"version": 2
}
1 change: 1 addition & 0 deletions pkg/eventservice/event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ func (c *eventBroker) getMessageCh(workerIndex int) chan wrapEvent {
}

func (c *eventBroker) runScanWorker(ctx context.Context) {
c.wg.Add(c.scanWorkerCount)
for i := 0; i < c.scanWorkerCount; i++ {
go func() {
defer c.wg.Done()
Expand Down

0 comments on commit 211ccbf

Please sign in to comment.