diff --git a/consumer/consumer_client.go b/consumer/consumer_client.go index 4c8b8a2f..0209620b 100644 --- a/consumer/consumer_client.go +++ b/consumer/consumer_client.go @@ -139,7 +139,8 @@ func (consumer *ConsumerClient) getCursor(shardId int, from string) (string, err return cursor, err } -func (consumer *ConsumerClient) pullLogs(shardId int, cursor string) (gl *sls.LogGroupList, plm *sls.PullLogMeta, err error) { +func (consumer *ConsumerClient) pullLogs(shardId int, cursor string) (r *sls.PullLogsResponse, err error) { + fmt.Println("PullLogs") plr := &sls.PullLogRequest{ Project: consumer.option.Project, Logstore: consumer.option.Logstore, @@ -149,28 +150,29 @@ func (consumer *ConsumerClient) pullLogs(shardId int, cursor string) (gl *sls.Lo LogGroupMaxCount: consumer.option.MaxFetchLogGroupCount, } for retry := 0; retry < 3; retry++ { - gl, plm, err = consumer.client.PullLogsWithQuery(plr) - if err != nil { - slsError, ok := err.(*sls.Error) - if ok { - level.Warn(consumer.logger).Log("msg", "shard pull logs failed, occur sls error", - "shard", shardId, - "error", slsError, - "tryTimes", retry+1, - "cursor", cursor, - ) - if slsError.HTTPCode == 403 { - time.Sleep(5 * time.Second) - } - } else { - level.Warn(consumer.logger).Log("msg", "unknown error when pull log", - "shardId", shardId, - "cursor", cursor, - "error", err, - "tryTimes", retry+1) + r, err = consumer.client.PullLogsV3(plr) + if err == nil || retry >= 2 { + break + } + slsError, ok := err.(*sls.Error) + if ok { + level.Warn(consumer.logger).Log("msg", "shard pull logs failed, occur sls error", + "shard", shardId, + "error", slsError, + "tryTimes", retry+1, + "cursor", cursor, + ) + if slsError.HTTPCode == 403 { + time.Sleep(5 * time.Second) } - time.Sleep(200 * time.Millisecond) + } else { + level.Warn(consumer.logger).Log("msg", "unknown error when pull log", + "shardId", shardId, + "cursor", cursor, + "error", err, + "tryTimes", retry+1) } + time.Sleep(200 * time.Millisecond) } // If you can't retry the log three times, it will return to empty list and start pulling the log cursor, // so that next time you will come in and pull the function again, which is equivalent to a dead cycle. diff --git a/consumer/tasks.go b/consumer/tasks.go index 1df4fd66..41885201 100644 --- a/consumer/tasks.go +++ b/consumer/tasks.go @@ -3,6 +3,7 @@ package consumerLibrary import ( "errors" "fmt" + sls "github.com/aliyun/aliyun-log-go-sdk" "runtime" "time" @@ -49,19 +50,21 @@ func (consumer *ShardConsumerWorker) nextFetchTask() error { // update last fetch time, for control fetch frequency consumer.lastFetchTime = time.Now() - logGroup, pullLogMeta, err := consumer.client.pullLogs(consumer.shardId, consumer.nextFetchCursor) + r, err := consumer.client.pullLogs(consumer.shardId, consumer.nextFetchCursor) if err != nil { return err } // set cursors user to decide whether to save according to the execution of `process` consumer.consumerCheckPointTracker.setCurrentCursor(consumer.nextFetchCursor) - consumer.lastFetchLogGroupList = logGroup - consumer.nextFetchCursor = pullLogMeta.NextCursor - consumer.lastFetchRawSize = pullLogMeta.RawSize - consumer.lastFetchGroupCount = GetLogGroupCount(consumer.lastFetchLogGroupList) + consumer.lastFetchLogGroupList = &sls.LogGroupList{ + FastLogGroups: r.LogGroups, + } + consumer.nextFetchCursor = r.NextCursor + consumer.lastFetchRawSize = r.RawSize + consumer.lastFetchGroupCount = len(r.LogGroups) if consumer.client.option.Query != "" { - consumer.lastFetchRawSizeBeforeQuery = pullLogMeta.RawSizeBeforeQuery - consumer.lastFetchGroupCountBeforeQuery = pullLogMeta.RawDataCountBeforeQuery + consumer.lastFetchRawSizeBeforeQuery = r.RawSizeBeforeQuery + consumer.lastFetchGroupCountBeforeQuery = r.RawDataCountBeforeQuery if consumer.lastFetchRawSizeBeforeQuery == -1 { consumer.lastFetchRawSizeBeforeQuery = 0 } diff --git a/consumer/worker.go b/consumer/worker.go index a39909b2..9f7acbfc 100644 --- a/consumer/worker.go +++ b/consumer/worker.go @@ -1,8 +1,8 @@ package consumerLibrary import ( - "os" "io" + "os" "sync" "time" @@ -143,14 +143,14 @@ func (consumerWorker *ConsumerWorker) getShardConsumer(shardId int) *ShardConsum } -func (consumerWorker *ConsumerWorker) cleanShardConsumer(owned_shards []int) { +func (consumerWorker *ConsumerWorker) cleanShardConsumer(ownedShards []int) { consumerWorker.shardConsumer.Range( func(key, value interface{}) bool { shard := key.(int) consumer := value.(*ShardConsumerWorker) - if !Contain(shard, owned_shards) { + if !Contain(shard, ownedShards) { level.Info(consumerWorker.Logger).Log("msg", "try to call shut down for unassigned consumer shard", "shardId", shard) consumer.consumerShutDown() level.Info(consumerWorker.Logger).Log("msg", "Complete call shut down for unassigned consumer shard", "shardId", shard) diff --git a/example/consumer/copy_data/copy_data.go b/example/consumer/copy_data/copy_data.go index f692a3fd..24cd5f1f 100644 --- a/example/consumer/copy_data/copy_data.go +++ b/example/consumer/copy_data/copy_data.go @@ -29,7 +29,7 @@ func main() { Logstore: "", ConsumerGroupName: "", ConsumerName: "", - // This options is used for initialization, will be ignored once consumer group is created and each shard has been started to be consumed. + // These options are used for initialization, will be ignored once consumer group is created and each shard has been started to be consumed. // Could be "begin", "end", "specific time format in time stamp", it's log receiving time. CursorPosition: consumerLibrary.BEGIN_CURSOR, } diff --git a/example/consumer/demo/simple_demo.go b/example/consumer/demo/simple_demo.go index fc383241..87af981c 100644 --- a/example/consumer/demo/simple_demo.go +++ b/example/consumer/demo/simple_demo.go @@ -23,9 +23,9 @@ func main() { Logstore: "", ConsumerGroupName: "", ConsumerName: "", - // This options is used for initialization, will be ignored once consumer group is created and each shard has been started to be consumed. + // These options are used for initialization, will be ignored once consumer group is created and each shard has been started to be consumed. // Could be "begin", "end", "specific time format in time stamp", it's log receiving time. - CursorPosition: consumerLibrary.END_CURSOR, + CursorPosition: consumerLibrary.BEGIN_CURSOR, } consumerWorker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process) @@ -41,7 +41,22 @@ func main() { // Fill in your consumption logic here, and be careful not to change the parameters of the function and the return value, // otherwise you will report errors. func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) (string, error) { - fmt.Println(shardId, logGroupList) + //fmt.Println(shardId, logGroupList) + fmt.Println(shardId) + for _, lg := range logGroupList.FastLogGroups { + for _, tag := range lg.LogTags { + fmt.Printf("[tag] %s : %s\n", tag.Key, tag.Value) + } + fmt.Printf("[source] %s\n", lg.Source) + fmt.Printf("[topic] %s\n", lg.Topic) + for _, log := range lg.Logs { + fmt.Printf("[log] time = %d, nsTimePs = %d\n", log.Time, log.TimeNs) + for _, content := range log.Contents { + fmt.Printf("%s : %s\n", content.Key, content.Value) + } + } + } + checkpointTracker.SaveCheckPoint(false) return "", nil } diff --git a/log.pb.go b/log.pb.go index 18b508af..f61a8490 100644 --- a/log.pb.go +++ b/log.pb.go @@ -389,9 +389,10 @@ func (m *SlsLogPackageList) GetPackages() []*SlsLogPackage { type LogGroupList struct { LogGroups []*LogGroup `protobuf:"bytes,1,rep,name=LogGroups" json:"LogGroups,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + FastLogGroups []*FastLogGroup `json:"FastLogGroups"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *LogGroupList) Reset() { *m = LogGroupList{} } diff --git a/log_store.go b/log_store.go index 0a0b8750..c6434066 100644 --- a/log_store.go +++ b/log_store.go @@ -758,7 +758,7 @@ func decodeLog(rawBytes []byte, offset, length int) (f *FastLog, err error) { } } else if mode == 5 { if index == 4 { - f.TimeNsPart = uint32(rawBytes[values[2]])&255 | (uint32(rawBytes[values[2]+1])&255)<<8 | (uint32(rawBytes[values[2]+2])&255)<<16 | (uint32(rawBytes[values[2]+3])&255)<<24 + f.TimeNs = uint32(rawBytes[values[2]])&255 | (uint32(rawBytes[values[2]+1])&255)<<8 | (uint32(rawBytes[values[2]+2])&255)<<16 | (uint32(rawBytes[values[2]+3])&255)<<24 } pos = values[2] + 4 } else { @@ -820,7 +820,7 @@ func decodeLogGroup(rawBytes []byte, offset, length int) (f *FastLogGroup, err e if err != nil { return nil, err } - f.Tags = append(f.Tags, tag) + f.LogTags = append(f.LogTags, tag) break default: break diff --git a/model.go b/model.go index 8a33c844..48678063 100644 --- a/model.go +++ b/model.go @@ -82,9 +82,9 @@ type FastLogContent struct { } type FastLog struct { - Time uint32 - TimeNsPart uint32 - Contents []*FastLogContent + Time uint32 + TimeNs uint32 + Contents []*FastLogContent } type FastLogTag struct { @@ -93,10 +93,10 @@ type FastLogTag struct { } type FastLogGroup struct { - Logs []*FastLog - Tags []*FastLogTag - Source string - Topic string + Logs []*FastLog + LogTags []*FastLogTag + Source string + Topic string } type PullLogsResponse struct {