From ac186b41a36edd57dc23a16701375d78c70106d2 Mon Sep 17 00:00:00 2001 From: Crimson <39024757+crimson-gao@users.noreply.github.com> Date: Fri, 20 Dec 2024 15:46:17 +0800 Subject: [PATCH] feat: consumer support io worker config to reduce mem usage (#310) --- consumer/config.go | 4 ++++ consumer/shard_worker.go | 6 +++++- consumer/worker.go | 38 +++++++++++++++++++++++++++++++++++--- 3 files changed, 44 insertions(+), 4 deletions(-) diff --git a/consumer/config.go b/consumer/config.go index bfe7ca67..c9eee499 100644 --- a/consumer/config.go +++ b/consumer/config.go @@ -52,6 +52,7 @@ type LogHubConfig struct { //:param AuthVersion: signature algorithm version, default is sls.AuthV1 //:param Region: region of sls endpoint, eg. cn-hangzhou, region must be set if AuthVersion is sls.AuthV4 //:param DisableRuntimeMetrics: disable runtime metrics, runtime metrics prints to local log. + //::param MaxIoWorkers: max io workers, default is 50. Smaller io workers will reduce memory usage, but may reduce throughput. Endpoint string AccessKeyID string AccessKeySecret string @@ -83,6 +84,7 @@ type LogHubConfig struct { AuthVersion sls.AuthVersionType Region string DisableRuntimeMetrics bool + MaxIoWorkers int } const ( @@ -98,3 +100,5 @@ const ( SHUTTING_DOWN = "SHUTTING_DOWN" SHUTDOWN_COMPLETE = "SHUTDOWN_COMPLETE" ) + +const defaultMaxIoWorkers = 50 diff --git a/consumer/shard_worker.go b/consumer/shard_worker.go index 0f0cbc87..1606402d 100644 --- a/consumer/shard_worker.go +++ b/consumer/shard_worker.go @@ -34,9 +34,10 @@ type ShardConsumerWorker struct { shutDownFlag *atomic.Bool stopped *atomic.Bool startOnceFlag sync.Once + ioThrottler ioThrottler } -func newShardConsumerWorker(shardId int, consumerClient *ConsumerClient, consumerHeartBeat *ConsumerHeartBeat, processor Processor, logger log.Logger) *ShardConsumerWorker { +func newShardConsumerWorker(shardId int, consumerClient *ConsumerClient, consumerHeartBeat *ConsumerHeartBeat, processor Processor, logger log.Logger, ioThrottler ioThrottler) *ShardConsumerWorker { shardConsumeWorker := &ShardConsumerWorker{ processor: processor, consumerCheckPointTracker: initConsumerCheckpointTracker(shardId, consumerClient, consumerHeartBeat, logger), @@ -47,6 +48,7 @@ func newShardConsumerWorker(shardId int, consumerClient *ConsumerClient, consume stopped: atomic.NewBool(false), lastCheckpointSaveTime: time.Now(), monitor: newShardMonitor(shardId, time.Minute), + ioThrottler: ioThrottler, } return shardConsumeWorker } @@ -95,6 +97,8 @@ func (consumer *ShardConsumerWorker) getInitCursor() string { } func (c *ShardConsumerWorker) fetchLogs(cursor string) (shouldCallProcess bool, logGroupList *sls.LogGroupList, plm *sls.PullLogMeta) { + c.ioThrottler.Acquire() + defer c.ioThrottler.Release() start := time.Now() logGroupList, plm, err := c.client.pullLogs(c.shardId, cursor) diff --git a/consumer/worker.go b/consumer/worker.go index 308674a6..1831bfc3 100644 --- a/consumer/worker.go +++ b/consumer/worker.go @@ -21,6 +21,7 @@ type ConsumerWorker struct { processor Processor waitGroup sync.WaitGroup Logger log.Logger + ioThrottler ioThrottler } // depreciated: this old logic is to automatically save to memory, and then commit at a fixed time @@ -57,6 +58,10 @@ func InitConsumerWorkerWithProcessor(option LogHubConfig, processor Processor) * if logger == nil { logger = logConfig(option) } + maxIoWorker := defaultMaxIoWorkers + if option.MaxIoWorkers > 0 { + maxIoWorker = option.MaxIoWorkers + } consumerClient := initConsumerClient(option, logger) consumerHeatBeat := initConsumerHeatBeat(consumerClient, logger) @@ -65,8 +70,9 @@ func InitConsumerWorkerWithProcessor(option LogHubConfig, processor Processor) * client: consumerClient, workerShutDownFlag: atomic.NewBool(false), //shardConsumer: make(map[int]*ShardConsumerWorker), - processor: processor, - Logger: logger, + processor: processor, + Logger: logger, + ioThrottler: newSimpleIoThrottler(maxIoWorker), } if err := consumerClient.createConsumerGroup(); err != nil { level.Error(consumerWorker.Logger).Log( @@ -144,7 +150,12 @@ func (consumerWorker *ConsumerWorker) getShardConsumer(shardId int) *ShardConsum if ok { return consumer.(*ShardConsumerWorker) } - consumerIns := newShardConsumerWorker(shardId, consumerWorker.client, consumerWorker.consumerHeatBeat, consumerWorker.processor, consumerWorker.Logger) + consumerIns := newShardConsumerWorker(shardId, + consumerWorker.client, + consumerWorker.consumerHeatBeat, + consumerWorker.processor, + consumerWorker.Logger, + consumerWorker.ioThrottler) consumerWorker.shardConsumer.Store(shardId, consumerIns) return consumerIns @@ -217,3 +228,24 @@ func logConfig(option LogHubConfig) log.Logger { logger = log.With(logger, "time", log.DefaultTimestampUTC, "caller", log.DefaultCaller) return logger } + +type ioThrottler interface { + Acquire() + Release() +} + +type simpleIoThrottler struct { + chance chan struct{} +} + +func newSimpleIoThrottler(maxIoWorkers int) *simpleIoThrottler { + return &simpleIoThrottler{ + chance: make(chan struct{}, maxIoWorkers), + } +} +func (t *simpleIoThrottler) Acquire() { + t.chance <- struct{}{} +} +func (t *simpleIoThrottler) Release() { + <-t.chance +}