
Commit: refine
crimson-gao committed Nov 28, 2024
1 parent bcd24fb commit 890f8ba
Showing 1 changed file with 33 additions and 29 deletions.
62 changes: 33 additions & 29 deletions consumer/shard_worker.go
@@ -14,11 +14,13 @@ import (
 )
 
 // todo: refine the sleep time
-const noProgressSleepTime = 500 * time.Millisecond
-const processFailedSleepTime = 50 * time.Millisecond
-const fetchFailedSleepTime = 100 * time.Millisecond // todo: use backoff interval, [1, 2, 4, 8, ...]
-const shutdownFailedSleepTime = 100 * time.Millisecond
-const flushCheckPointFailedSleepTime = 100 * time.Millisecond
+const (
+	noProgressSleepTime            = 500 * time.Millisecond
+	processFailedSleepTime         = 50 * time.Millisecond
+	fetchFailedSleepTime           = 100 * time.Millisecond // todo: use backoff interval, [1, 2, 4, 8, ...]
+	shutdownFailedSleepTime        = 100 * time.Millisecond
+	flushCheckPointFailedSleepTime = 100 * time.Millisecond
+)
 
 type ShardConsumerWorker struct {
 	client *ConsumerClient
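The todo on `fetchFailedSleepTime` mentions switching the fixed retry interval to an exponential backoff. A minimal sketch of what that could look like, assuming the file's `time` import and `fetchFailedSleepTime` constant; the `fetchBackoff` type and its method names are hypothetical, not part of this commit:

```go
// Hypothetical exponential backoff for failed fetches, as the todo suggests:
// sleep 100ms, 200ms, 400ms, ... capped at a maximum, reset after a success.
type fetchBackoff struct {
	cur, max time.Duration
}

func newFetchBackoff() *fetchBackoff {
	return &fetchBackoff{cur: fetchFailedSleepTime, max: 8 * time.Second}
}

// next returns the interval to sleep now and doubles it for the next failure.
func (b *fetchBackoff) next() time.Duration {
	d := b.cur
	b.cur *= 2
	if b.cur > b.max {
		b.cur = b.max
	}
	return d
}

// reset restores the initial interval once a fetch succeeds.
func (b *fetchBackoff) reset() { b.cur = fetchFailedSleepTime }
```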
@@ -31,17 +33,16 @@ type ShardConsumerWorker struct {
 	lastCheckpointSaveTime time.Time
 	shutDownFlag           *atomic.Bool
 	stopped                *atomic.Bool
-	startOnce              sync.Once
+	startOnceFlag          sync.Once
 }
 
 func newShardConsumerWorker(shardId int, consumerClient *ConsumerClient, consumerHeartBeat *ConsumerHeartBeat, processor Processor, logger log.Logger) *ShardConsumerWorker {
-	shardLogger := log.With(logger, "shard", shardId)
 	shardConsumeWorker := &ShardConsumerWorker{
 		processor:                 processor,
 		consumerCheckPointTracker: initConsumerCheckpointTracker(shardId, consumerClient, consumerHeartBeat, logger),
 		client:                    consumerClient,
 		shardId:                   shardId,
-		logger:                    shardLogger,
+		logger:                    log.With(logger, "shard", shardId),
 		shutDownFlag:              atomic.NewBool(false),
 		stopped:                   atomic.NewBool(false),
 		lastCheckpointSaveTime:    time.Now(),
@@ -51,7 +52,7 @@ func newShardConsumerWorker(shardId int, consumerClient *ConsumerClient, consumerHeartBeat *ConsumerHeartBeat, processor Processor, logger log.Logger) *ShardConsumerWorker {
 }
 
 func (c *ShardConsumerWorker) ensureStarted() {
-	c.startOnce.Do(func() {
+	c.startOnceFlag.Do(func() {
 		go c.runLoop()
 	})
 }
@@ -67,20 +68,18 @@ func (c *ShardConsumerWorker) runLoop() {
 	level.Info(c.logger).Log("msg", "runLoop got init cursor", "cursor", cursor)
 
 	for !c.shutDownFlag.Load() {
-		fetchTime := time.Now()
-		shouldCallProcess, logGroupList, plm, sleepTime := c.fetchLogs(cursor)
-		time.Sleep(sleepTime)
-
+		lastFetchTime := time.Now()
+		shouldCallProcess, logGroupList, plm := c.fetchLogs(cursor)
 		if !shouldCallProcess {
 			continue
 		}
 
-		cursor = c.callProcess(cursor, logGroupList, plm)
+		cursor = c.callProcess(logGroupList, plm)
 		if c.shutDownFlag.Load() {
 			break
 		}
 
-		c.sleepUtilNextFetch(fetchTime, plm)
+		c.sleepUtilNextFetch(lastFetchTime, plm)
 	}
 }
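The loop now captures `lastFetchTime` before pulling and passes it to `sleepUtilNextFetch`, so the wait before the next pull is measured from when the previous fetch started rather than from when processing ended. `sleepUtilNextFetch` itself lies outside this diff; the following is only an assumed sketch of the shape such pacing usually takes, with `fetchInterval` as a made-up parameter:

```go
// Assumed shape only: sleep just long enough that consecutive fetches are at
// least fetchInterval apart, measured from when the last fetch began.
func sleepUntilNextFetchSketch(lastFetchTime time.Time, fetchInterval time.Duration) {
	if remaining := fetchInterval - time.Since(lastFetchTime); remaining > 0 {
		time.Sleep(remaining)
	}
}
```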

@@ -95,34 +94,35 @@ func (consumer *ShardConsumerWorker) getInitCursor() string {
 	return ""
 }
 
-func (c *ShardConsumerWorker) fetchLogs(cursor string) (shouldCallProcess bool, logGroupList *sls.LogGroupList, plm *sls.PullLogMeta, sleepTime time.Duration) {
+func (c *ShardConsumerWorker) fetchLogs(cursor string) (shouldCallProcess bool, logGroupList *sls.LogGroupList, plm *sls.PullLogMeta) {
 
 	start := time.Now()
 	logGroupList, plm, err := c.client.pullLogs(c.shardId, cursor)
 	c.monitor.RecordFetchRequest(plm, err, start)
 
 	if err != nil {
-		return false, nil, nil, fetchFailedSleepTime
+		time.Sleep(fetchFailedSleepTime)
+		return false, nil, nil
 	}
 
 	c.consumerCheckPointTracker.setCurrentCursor(cursor)
 	c.consumerCheckPointTracker.setNextCursor(plm.NextCursor)
 
 	if cursor == plm.NextCursor { // already reach end of shard
 		c.saveCheckPointIfNeeded()
-		return false, nil, nil, noProgressSleepTime
+		time.Sleep(noProgressSleepTime)
+		return false, nil, nil
 	}
-	return true, logGroupList, plm, 0
+	return true, logGroupList, plm
 }
 
-func (c *ShardConsumerWorker) callProcess(cursor string, logGroupList *sls.LogGroupList, plm *sls.PullLogMeta) (nextCursor string) {
-	for !c.shutDownFlag.Load() {
-
+func (c *ShardConsumerWorker) callProcess(logGroupList *sls.LogGroupList, plm *sls.PullLogMeta) (nextCursor string) {
+	for {
 		start := time.Now()
 		rollBackCheckpoint, err := c.processInternal(logGroupList)
 		c.monitor.RecordProcess(err, start)
 
-		c.saveCheckPointIfNeeded() // todo: should we save checkpoint here even if failed
+		c.saveCheckPointIfNeeded()
 		if err != nil {
 			level.Error(c.logger).Log("msg", "process func returns an error", "err", err)
 		}
@@ -134,15 +134,19 @@ func (c *ShardConsumerWorker) callProcess(cursor string, logGroupList *sls.LogGroupList, plm *sls.PullLogMeta) (nextCursor string) {
 		if err == nil {
 			return plm.NextCursor
 		}
+		// if process failed and shutting down, just quit
+		if c.shutDownFlag.Load() {
+			level.Warn(c.logger).Log("msg", "shutting down and last process failed, just quit")
+			return plm.NextCursor
+		}
 		time.Sleep(processFailedSleepTime)
 	}
-	return cursor
 }
 
 func (c *ShardConsumerWorker) processInternal(logGroup *sls.LogGroupList) (rollBackCheckpoint string, err error) {
	defer func() {
-		if r := c.recoverIfPanic("your process function paniced"); r != nil {
-			err = fmt.Errorf("get a panic when process: %v", r)
+		if r := c.recoverIfPanic("panic in your process function"); r != nil {
+			err = fmt.Errorf("panic when process: %v", r)
 		}
 	}()
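The added block changes the failure path of `callProcess`: a failing processor is retried every `processFailedSleepTime`, but once shutdown is requested the worker stops retrying and abandons the failed batch. Extracted as a generic pattern below; this is a sketch of the idea, not code from this repository:

```go
// Retry attempt() until it succeeds, pausing between failures, but give up
// as soon as shuttingDown() reports true.
func retryUntilShutdown(attempt func() error, shuttingDown func() bool, pause time.Duration) {
	for {
		if err := attempt(); err == nil {
			return
		}
		if shuttingDown() {
			return // shutdown requested: drop the failed batch and exit
		}
		time.Sleep(pause)
	}
}
```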

@@ -151,17 +155,17 @@ func (c *ShardConsumerWorker) processInternal(logGroup *sls.LogGroupList) (rollBackCheckpoint string, err error) {
 
 // call user shutdown func and flush checkpoint
 func (c *ShardConsumerWorker) doShutDown() {
-	level.Info(c.logger).Log("msg", "start to shutdown, begin to call processor shutdown")
+	level.Info(c.logger).Log("msg", "begin to shutdown, invoking processor.shutdown")
 	for {
 		err := c.processor.Shutdown(c.consumerCheckPointTracker) // todo: should we catch panic here?
 		if err == nil {
 			break
 		}
-		level.Error(c.logger).Log("msg", "failed to call processor shutdown", "err", err)
+		level.Error(c.logger).Log("msg", "processor.shutdown finished with error", "err", err)
 		time.Sleep(shutdownFailedSleepTime)
 	}
 
-	level.Info(c.logger).Log("msg", "call processor shutdown succeed, begin to flush checkpoint")
+	level.Info(c.logger).Log("msg", "call processor.shutdown succeed, begin to flush checkpoint")
 
 	for {
 		err := c.consumerCheckPointTracker.flushCheckPoint()