diff --git a/consumer/shard_worker.go b/consumer/shard_worker.go index 2635b9bb..0f0cbc87 100644 --- a/consumer/shard_worker.go +++ b/consumer/shard_worker.go @@ -14,11 +14,13 @@ import ( ) // todo: refine the sleep time -const noProgressSleepTime = 500 * time.Millisecond -const processFailedSleepTime = 50 * time.Millisecond -const fetchFailedSleepTime = 100 * time.Millisecond // todo: use backoff interval, [1, 2, 4, 8, ...] -const shutdownFailedSleepTime = 100 * time.Millisecond -const flushCheckPointFailedSleepTime = 100 * time.Millisecond +const ( + noProgressSleepTime = 500 * time.Millisecond + processFailedSleepTime = 50 * time.Millisecond + fetchFailedSleepTime = 100 * time.Millisecond // todo: use backoff interval, [1, 2, 4, 8, ...] + shutdownFailedSleepTime = 100 * time.Millisecond + flushCheckPointFailedSleepTime = 100 * time.Millisecond +) type ShardConsumerWorker struct { client *ConsumerClient @@ -31,17 +33,16 @@ type ShardConsumerWorker struct { lastCheckpointSaveTime time.Time shutDownFlag *atomic.Bool stopped *atomic.Bool - startOnce sync.Once + startOnceFlag sync.Once } func newShardConsumerWorker(shardId int, consumerClient *ConsumerClient, consumerHeartBeat *ConsumerHeartBeat, processor Processor, logger log.Logger) *ShardConsumerWorker { - shardLogger := log.With(logger, "shard", shardId) shardConsumeWorker := &ShardConsumerWorker{ processor: processor, consumerCheckPointTracker: initConsumerCheckpointTracker(shardId, consumerClient, consumerHeartBeat, logger), client: consumerClient, shardId: shardId, - logger: shardLogger, + logger: log.With(logger, "shard", shardId), shutDownFlag: atomic.NewBool(false), stopped: atomic.NewBool(false), lastCheckpointSaveTime: time.Now(), @@ -51,7 +52,7 @@ func newShardConsumerWorker(shardId int, consumerClient *ConsumerClient, consume } func (c *ShardConsumerWorker) ensureStarted() { - c.startOnce.Do(func() { + c.startOnceFlag.Do(func() { go c.runLoop() }) } @@ -67,20 +68,18 @@ func (c *ShardConsumerWorker) 
runLoop() { level.Info(c.logger).Log("msg", "runLoop got init cursor", "cursor", cursor) for !c.shutDownFlag.Load() { - fetchTime := time.Now() - shouldCallProcess, logGroupList, plm, sleepTime := c.fetchLogs(cursor) - time.Sleep(sleepTime) - + lastFetchTime := time.Now() + shouldCallProcess, logGroupList, plm := c.fetchLogs(cursor) if !shouldCallProcess { continue } - cursor = c.callProcess(cursor, logGroupList, plm) + cursor = c.callProcess(logGroupList, plm) if c.shutDownFlag.Load() { break } - c.sleepUtilNextFetch(fetchTime, plm) + c.sleepUtilNextFetch(lastFetchTime, plm) } } @@ -95,14 +94,15 @@ func (consumer *ShardConsumerWorker) getInitCursor() string { return "" } -func (c *ShardConsumerWorker) fetchLogs(cursor string) (shouldCallProcess bool, logGroupList *sls.LogGroupList, plm *sls.PullLogMeta, sleepTime time.Duration) { +func (c *ShardConsumerWorker) fetchLogs(cursor string) (shouldCallProcess bool, logGroupList *sls.LogGroupList, plm *sls.PullLogMeta) { start := time.Now() logGroupList, plm, err := c.client.pullLogs(c.shardId, cursor) c.monitor.RecordFetchRequest(plm, err, start) if err != nil { - return false, nil, nil, fetchFailedSleepTime + time.Sleep(fetchFailedSleepTime) + return false, nil, nil } c.consumerCheckPointTracker.setCurrentCursor(cursor) @@ -110,19 +110,19 @@ func (c *ShardConsumerWorker) fetchLogs(cursor string) (shouldCallProcess bool, if cursor == plm.NextCursor { // already reach end of shard c.saveCheckPointIfNeeded() - return false, nil, nil, noProgressSleepTime + time.Sleep(noProgressSleepTime) + return false, nil, nil } - return true, logGroupList, plm, 0 + return true, logGroupList, plm } -func (c *ShardConsumerWorker) callProcess(cursor string, logGroupList *sls.LogGroupList, plm *sls.PullLogMeta) (nextCursor string) { - for !c.shutDownFlag.Load() { - +func (c *ShardConsumerWorker) callProcess(logGroupList *sls.LogGroupList, plm *sls.PullLogMeta) (nextCursor string) { + for { start := time.Now() rollBackCheckpoint, err := 
c.processInternal(logGroupList) c.monitor.RecordProcess(err, start) - c.saveCheckPointIfNeeded() // todo: should we save checkpoint here even if failed + c.saveCheckPointIfNeeded() if err != nil { level.Error(c.logger).Log("msg", "process func returns an error", "err", err) } @@ -134,15 +134,19 @@ func (c *ShardConsumerWorker) callProcess(cursor string, logGroupList *sls.LogGr if err == nil { return plm.NextCursor } + // if process failed and shutting down, just quit + if c.shutDownFlag.Load() { + level.Warn(c.logger).Log("msg", "shutting down and last process failed, just quit") + return plm.NextCursor + } time.Sleep(processFailedSleepTime) } - return cursor } func (c *ShardConsumerWorker) processInternal(logGroup *sls.LogGroupList) (rollBackCheckpoint string, err error) { defer func() { - if r := c.recoverIfPanic("your process function paniced"); r != nil { - err = fmt.Errorf("get a panic when process: %v", r) + if r := c.recoverIfPanic("panic in your process function"); r != nil { + err = fmt.Errorf("panic when process: %v", r) } }() @@ -151,17 +155,17 @@ func (c *ShardConsumerWorker) processInternal(logGroup *sls.LogGroupList) (rollB // call user shutdown func and flush checkpoint func (c *ShardConsumerWorker) doShutDown() { - level.Info(c.logger).Log("msg", "start to shutdown, begin to call processor shutdown") + level.Info(c.logger).Log("msg", "begin to shutdown, invoking processor.shutdown") for { err := c.processor.Shutdown(c.consumerCheckPointTracker) // todo: should we catch panic here? 
if err == nil { break } - level.Error(c.logger).Log("msg", "failed to call processor shutdown", "err", err) + level.Error(c.logger).Log("msg", "processor.shutdown finished with error", "err", err) time.Sleep(shutdownFailedSleepTime) } - level.Info(c.logger).Log("msg", "call processor shutdown succeed, begin to flush checkpoint") + level.Info(c.logger).Log("msg", "call processor.shutdown succeeded, begin to flush checkpoint") for { err := c.consumerCheckPointTracker.flushCheckPoint()