Skip to content

Commit

Permalink
fix consumer not used v3 bug
Browse files Browse the repository at this point in the history
  • Loading branch information
ke.like committed Apr 16, 2024
1 parent 8b9e80c commit de4191f
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 47 deletions.
44 changes: 23 additions & 21 deletions consumer/consumer_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ func (consumer *ConsumerClient) getCursor(shardId int, from string) (string, err
return cursor, err
}

func (consumer *ConsumerClient) pullLogs(shardId int, cursor string) (gl *sls.LogGroupList, plm *sls.PullLogMeta, err error) {
func (consumer *ConsumerClient) pullLogs(shardId int, cursor string) (r *sls.PullLogsResponse, err error) {
fmt.Println("PullLogs")
plr := &sls.PullLogRequest{
Project: consumer.option.Project,
Logstore: consumer.option.Logstore,
Expand All @@ -149,28 +150,29 @@ func (consumer *ConsumerClient) pullLogs(shardId int, cursor string) (gl *sls.Lo
LogGroupMaxCount: consumer.option.MaxFetchLogGroupCount,
}
for retry := 0; retry < 3; retry++ {
gl, plm, err = consumer.client.PullLogsWithQuery(plr)
if err != nil {
slsError, ok := err.(*sls.Error)
if ok {
level.Warn(consumer.logger).Log("msg", "shard pull logs failed, occur sls error",
"shard", shardId,
"error", slsError,
"tryTimes", retry+1,
"cursor", cursor,
)
if slsError.HTTPCode == 403 {
time.Sleep(5 * time.Second)
}
} else {
level.Warn(consumer.logger).Log("msg", "unknown error when pull log",
"shardId", shardId,
"cursor", cursor,
"error", err,
"tryTimes", retry+1)
r, err = consumer.client.PullLogsV3(plr)
if err == nil || retry >= 2 {
break
}
slsError, ok := err.(*sls.Error)
if ok {
level.Warn(consumer.logger).Log("msg", "shard pull logs failed, occur sls error",
"shard", shardId,
"error", slsError,
"tryTimes", retry+1,
"cursor", cursor,
)
if slsError.HTTPCode == 403 {
time.Sleep(5 * time.Second)
}
time.Sleep(200 * time.Millisecond)
} else {
level.Warn(consumer.logger).Log("msg", "unknown error when pull log",
"shardId", shardId,
"cursor", cursor,
"error", err,
"tryTimes", retry+1)
}
time.Sleep(200 * time.Millisecond)
}
// If you can't retry the log three times, it will return to empty list and start pulling the log cursor,
// so that next time you will come in and pull the function again, which is equivalent to a dead cycle.
Expand Down
17 changes: 10 additions & 7 deletions consumer/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package consumerLibrary
import (
"errors"
"fmt"
sls "github.com/aliyun/aliyun-log-go-sdk"
"runtime"
"time"

Expand Down Expand Up @@ -49,19 +50,21 @@ func (consumer *ShardConsumerWorker) nextFetchTask() error {
// update last fetch time, for control fetch frequency
consumer.lastFetchTime = time.Now()

logGroup, pullLogMeta, err := consumer.client.pullLogs(consumer.shardId, consumer.nextFetchCursor)
r, err := consumer.client.pullLogs(consumer.shardId, consumer.nextFetchCursor)
if err != nil {
return err
}
// set cursors user to decide whether to save according to the execution of `process`
consumer.consumerCheckPointTracker.setCurrentCursor(consumer.nextFetchCursor)
consumer.lastFetchLogGroupList = logGroup
consumer.nextFetchCursor = pullLogMeta.NextCursor
consumer.lastFetchRawSize = pullLogMeta.RawSize
consumer.lastFetchGroupCount = GetLogGroupCount(consumer.lastFetchLogGroupList)
consumer.lastFetchLogGroupList = &sls.LogGroupList{
FastLogGroups: r.LogGroups,
}
consumer.nextFetchCursor = r.NextCursor
consumer.lastFetchRawSize = r.RawSize
consumer.lastFetchGroupCount = len(r.LogGroups)
if consumer.client.option.Query != "" {
consumer.lastFetchRawSizeBeforeQuery = pullLogMeta.RawSizeBeforeQuery
consumer.lastFetchGroupCountBeforeQuery = pullLogMeta.RawDataCountBeforeQuery
consumer.lastFetchRawSizeBeforeQuery = r.RawSizeBeforeQuery
consumer.lastFetchGroupCountBeforeQuery = r.RawDataCountBeforeQuery
if consumer.lastFetchRawSizeBeforeQuery == -1 {
consumer.lastFetchRawSizeBeforeQuery = 0
}
Expand Down
6 changes: 3 additions & 3 deletions consumer/worker.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package consumerLibrary

import (
"os"
"io"
"os"
"sync"
"time"

Expand Down Expand Up @@ -143,14 +143,14 @@ func (consumerWorker *ConsumerWorker) getShardConsumer(shardId int) *ShardConsum

}

func (consumerWorker *ConsumerWorker) cleanShardConsumer(owned_shards []int) {
func (consumerWorker *ConsumerWorker) cleanShardConsumer(ownedShards []int) {

consumerWorker.shardConsumer.Range(
func(key, value interface{}) bool {
shard := key.(int)
consumer := value.(*ShardConsumerWorker)

if !Contain(shard, owned_shards) {
if !Contain(shard, ownedShards) {
level.Info(consumerWorker.Logger).Log("msg", "try to call shut down for unassigned consumer shard", "shardId", shard)
consumer.consumerShutDown()
level.Info(consumerWorker.Logger).Log("msg", "Complete call shut down for unassigned consumer shard", "shardId", shard)
Expand Down
2 changes: 1 addition & 1 deletion example/consumer/copy_data/copy_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func main() {
Logstore: "",
ConsumerGroupName: "",
ConsumerName: "",
// This options is used for initialization, will be ignored once consumer group is created and each shard has been started to be consumed.
// These options are used for initialization, will be ignored once consumer group is created and each shard has been started to be consumed.
// Could be "begin", "end", "specific time format in time stamp", it's log receiving time.
CursorPosition: consumerLibrary.BEGIN_CURSOR,
}
Expand Down
21 changes: 18 additions & 3 deletions example/consumer/demo/simple_demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ func main() {
Logstore: "",
ConsumerGroupName: "",
ConsumerName: "",
// This options is used for initialization, will be ignored once consumer group is created and each shard has been started to be consumed.
// These options are used for initialization, will be ignored once consumer group is created and each shard has been started to be consumed.
// Could be "begin", "end", "specific time format in time stamp", it's log receiving time.
CursorPosition: consumerLibrary.END_CURSOR,
CursorPosition: consumerLibrary.BEGIN_CURSOR,
}

consumerWorker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process)
Expand All @@ -41,7 +41,22 @@ func main() {
// Fill in your consumption logic here, and be careful not to change the parameters of the function and the return value,
// otherwise you will report errors.
func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) (string, error) {
fmt.Println(shardId, logGroupList)
//fmt.Println(shardId, logGroupList)
fmt.Println(shardId)
for _, lg := range logGroupList.FastLogGroups {
for _, tag := range lg.LogTags {
fmt.Printf("[tag] %s : %s\n", tag.Key, tag.Value)
}
fmt.Printf("[source] %s\n", lg.Source)
fmt.Printf("[topic] %s\n", lg.Topic)
for _, log := range lg.Logs {
fmt.Printf("[log] time = %d, nsTimePs = %d\n", log.Time, log.TimeNs)
for _, content := range log.Contents {
fmt.Printf("%s : %s\n", content.Key, content.Value)
}
}
}

checkpointTracker.SaveCheckPoint(false)
return "", nil
}
7 changes: 4 additions & 3 deletions log.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions log_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ func decodeLog(rawBytes []byte, offset, length int) (f *FastLog, err error) {
}
} else if mode == 5 {
if index == 4 {
f.TimeNsPart = uint32(rawBytes[values[2]])&255 | (uint32(rawBytes[values[2]+1])&255)<<8 | (uint32(rawBytes[values[2]+2])&255)<<16 | (uint32(rawBytes[values[2]+3])&255)<<24
f.TimeNs = uint32(rawBytes[values[2]])&255 | (uint32(rawBytes[values[2]+1])&255)<<8 | (uint32(rawBytes[values[2]+2])&255)<<16 | (uint32(rawBytes[values[2]+3])&255)<<24
}
pos = values[2] + 4
} else {
Expand Down Expand Up @@ -820,7 +820,7 @@ func decodeLogGroup(rawBytes []byte, offset, length int) (f *FastLogGroup, err e
if err != nil {
return nil, err
}
f.Tags = append(f.Tags, tag)
f.LogTags = append(f.LogTags, tag)
break
default:
break
Expand Down
14 changes: 7 additions & 7 deletions model.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ type FastLogContent struct {
}

type FastLog struct {
Time uint32
TimeNsPart uint32
Contents []*FastLogContent
Time uint32
TimeNs uint32
Contents []*FastLogContent
}

type FastLogTag struct {
Expand All @@ -93,10 +93,10 @@ type FastLogTag struct {
}

type FastLogGroup struct {
Logs []*FastLog
Tags []*FastLogTag
Source string
Topic string
Logs []*FastLog
LogTags []*FastLogTag
Source string
Topic string
}

type PullLogsResponse struct {
Expand Down

0 comments on commit de4191f

Please sign in to comment.