From 8b9e80c1806467def95b72da8f52b26240689426 Mon Sep 17 00:00:00 2001 From: "ke.like" Date: Mon, 15 Apr 2024 19:42:14 +0800 Subject: [PATCH 1/7] Parse protobuf with custom parser --- client_interface.go | 3 +- client_store.go | 5 + log_store.go | 295 ++++++++++++++++++++++++++++++++++++ model.go | 33 +++- token_auto_update_client.go | 10 ++ 5 files changed, 344 insertions(+), 2 deletions(-) diff --git a/client_interface.go b/client_interface.go index 40321773..0b1b8b60 100644 --- a/client_interface.go +++ b/client_interface.go @@ -280,8 +280,9 @@ type ClientInterface interface { // @note if you want to pull logs continuous, set endCursor = "" PullLogs(project, logstore string, shardID int, cursor, endCursor string, logGroupMaxCount int) (gl *LogGroupList, nextCursor string, err error) - // Deprecated: Use PullLogsWithQuery instead. + // Deprecated: Use PullLogsV3 instead. PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, nextCursor string, err error) + PullLogsV3(plr *PullLogRequest) (*PullLogsResponse, error) PullLogsWithQuery(plr *PullLogRequest) (gl *LogGroupList, plm *PullLogMeta, err error) // GetHistograms query logs with [from, to) time range GetHistograms(project, logstore string, topic string, from int64, to int64, queryExp string) (*GetHistogramsResponse, error) diff --git a/client_store.go b/client_store.go index cd0c9efe..751205f9 100644 --- a/client_store.go +++ b/client_store.go @@ -228,6 +228,11 @@ func (c *Client) PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, nextCursor s return ls.PullLogsV2(plr) } +func (c *Client) PullLogsV3(plr *PullLogRequest) (r *PullLogsResponse, err error) { + ls := convertLogstore(c, plr.Project, plr.Logstore) + return ls.PullLogsV3(plr) +} + func (c *Client) PullLogsWithQuery(plr *PullLogRequest) (gl *LogGroupList, plm *PullLogMeta, err error) { ls := convertLogstore(c, plr.Project, plr.Logstore) return ls.PullLogsWithQuery(plr) diff --git a/log_store.go b/log_store.go index 6b6f251a..0a0b8750 100644 --- a/log_store.go +++ b/log_store.go @@ -2,6 +2,7 @@ package sls import ( "encoding/json" + "errors" "fmt" "time" @@ -586,6 +587,300 @@ func (s *LogStore) PullLogsWithQuery(plr *PullLogRequest) (gl *LogGroupList, plm return } +func (s *LogStore) PullLogsV3(plr *PullLogRequest) (r *PullLogsResponse, err error) { + out, plm, err := s.GetLogsBytesWithQuery(plr) + if err != nil { + return nil, err + } + r = &PullLogsResponse{ + NextCursor: plm.NextCursor, + RawSize: plm.RawSize, + RawSizeBeforeQuery: plm.RawSizeBeforeQuery, + RawDataCountBeforeQuery: plm.RawDataCountBeforeQuery, + } + r.LogGroups, err = decodeLogGroups(out, r.RawSize) + if err != nil { + return nil, err + } + return r, nil +} + +func decodeVarInt32(data []byte, pos, rawSize int) ([]int, error) { + values := []int{0, 0, 0} + shift := 0 + var b int + for i := pos; i < rawSize; i++ { + b = int(data[i]) & 0xff + values[1] |= (b & 127) << shift + shift += 7 + if (b & 128) == 0 { + values[2] = i + 1 + values[0] = 1 + break + } + } + if values[0] == 0 { + return nil, errors.New("error decoding varint32 from pos " + strconv.FormatInt(int64(pos), 10)) + } + return values, nil +} + +func decodeTag(rawBytes []byte, offset, length int) (f *FastLogTag, err error) { + f = &FastLogTag{} + pos, endOffset := offset, offset+length + var values []int + var mode, index int + for pos < endOffset { + values, err = decodeVarInt32(rawBytes, pos, endOffset) + if err != nil { + return nil, err + } + pos = values[2] + mode = values[1] & 0x7 + index = values[1] >> 3 + if mode == 0 { + values, err = decodeVarInt32(rawBytes, pos, endOffset) + if err != nil { + return nil, err + } + pos = values[2] + } else if mode == 1 { + pos = values[2] + 8 + } else if mode == 2 { + pos = values[2] + values, err = decodeVarInt32(rawBytes, pos, endOffset) + if err != nil { + return nil, err + } + pos = values[2] + values[1] + if index == 1 { + f.Key = string(rawBytes[values[2] : values[2]+values[1]]) + } else if index == 2 { + f.Value = string(rawBytes[values[2] : values[2]+values[1]]) + } + } else if mode == 5 { + pos = values[2] + 4 + } else { + return nil, errors.New("unexpected mode: " + strconv.FormatInt(int64(mode), 10)) + } + } + // TODO check key and value + if pos != endOffset { + return nil, errors.New("unable to parse pos " + strconv.FormatInt(int64(pos), 10)) + } + return f, nil +} + +func decodeLogContent(rawBytes []byte, offset, length int) (f *FastLogContent, err error) { + f = &FastLogContent{} + pos, endOffset := offset, offset+length + var values []int + var mode, index int + for pos < endOffset { + values, err = decodeVarInt32(rawBytes, pos, endOffset) + if err != nil { + return nil, err + } + mode = values[1] & 0x7 + index = values[1] >> 3 + pos = values[2] + if mode == 0 { + values, err = decodeVarInt32(rawBytes, pos, endOffset) + if err != nil { + return nil, err + } + pos = values[2] + } else if mode == 1 { + pos += 8 + } else if mode == 2 { + values, err = decodeVarInt32(rawBytes, pos, endOffset) + if err != nil { + return nil, err + } + pos = values[2] + values[1] + if index == 1 { + f.Key = string(rawBytes[values[2] : values[2]+values[1]]) + } else if index == 2 { + f.Value = string(rawBytes[values[2] : values[2]+values[1]]) + } + } else if mode == 5 { + pos += 4 + } else { + return nil, errors.New("unexpected mode: " + strconv.FormatInt(int64(mode), 10)) + } + } + // TODO check key and value + if pos != endOffset { + return nil, errors.New("unable to parse pos " + strconv.FormatInt(int64(pos), 10)) + } + return f, nil +} + +func decodeLog(rawBytes []byte, offset, length int) (f *FastLog, err error) { + f = &FastLog{} + pos, endOffset := offset, offset+length + var values []int + var mode, index int + findTime := false + for pos < endOffset { + values, err = decodeVarInt32(rawBytes, pos, endOffset) + if err != nil { + return nil, err + } + mode = values[1] & 0x7 + index = values[1] >> 3 + if mode == 0 { + pos = values[2] + values, err = decodeVarInt32(rawBytes, pos, endOffset) + if err != nil { + return nil, err + } + pos = values[2] + if index == 1 { + f.Time = uint32(values[1]) + findTime = true + } + } else if mode == 1 { + pos = values[2] + 8 + } else if mode == 2 { + pos = values[2] + values, err = decodeVarInt32(rawBytes, pos, endOffset) + if err != nil { + return nil, err + } + pos = values[2] + values[1] + if index == 2 { + fc, err := decodeLogContent(rawBytes, values[2], values[1]) + if err != nil { + return nil, err + } + f.Contents = append(f.Contents, fc) + } + } else if mode == 5 { + if index == 4 { + f.TimeNsPart = uint32(rawBytes[values[2]])&255 | (uint32(rawBytes[values[2]+1])&255)<<8 | (uint32(rawBytes[values[2]+2])&255)<<16 | (uint32(rawBytes[values[2]+3])&255)<<24 + } + pos = values[2] + 4 + } else { + return nil, errors.New("unexpected mode: " + strconv.FormatInt(int64(mode), 10)) + } + } + if !findTime { + return nil, errors.New("time is not found in log from pos " + strconv.FormatInt(int64(pos), 10)) + } + if pos != endOffset { + return nil, errors.New("unable to parse pos " + strconv.FormatInt(int64(pos), 10)) + } + return f, nil +} + +func decodeLogGroup(rawBytes []byte, offset, length int) (f *FastLogGroup, err error) { + f = &FastLogGroup{} + pos, endOffset := offset, offset+length + var values []int + var mode, index int + for pos < endOffset { + values, err = decodeVarInt32(rawBytes, pos, endOffset) + if err != nil { + return nil, err + } + mode = values[1] & 0x7 + index = values[1] >> 3 + if mode == 0 { + pos = values[2] + values, err = decodeVarInt32(rawBytes, pos, endOffset) + if err != nil { + return nil, err + } + pos = values[2] + } else if mode == 1 { + pos = values[2] + 8 + } else if mode == 2 { + pos = values[2] + values, err = decodeVarInt32(rawBytes, pos, endOffset) + if err != nil { + return nil, err + } + switch index { + case 1: //logs + log, err := decodeLog(rawBytes, values[2], values[1]) + if err != nil { + return nil, err + } + f.Logs = append(f.Logs, log) + break + case 3: + f.Topic = string(rawBytes[values[2] : values[2]+values[1]]) + break + case 4: + f.Source = string(rawBytes[values[2] : values[2]+values[1]]) + break + case 6: //tags + tag, err := decodeTag(rawBytes, values[2], values[1]) + if err != nil { + return nil, err + } + f.Tags = append(f.Tags, tag) + break + default: + break + } + pos = values[2] + values[1] + } + } + if pos != endOffset { + return nil, errors.New("unable to parse pos " + strconv.FormatInt(int64(pos), 10)) + } + return f, nil +} + +// decodeLogGroups decodes logs binary data returned by GetLogsBytes API +func decodeLogGroups(data []byte, rawSize int) (logGroups []*FastLogGroup, err error) { + pos, rawSize := 0, len(data) + var mode, index int + var values []int + logGroups = []*FastLogGroup{} + for pos < rawSize { + values, err = decodeVarInt32(data, pos, rawSize) + if err != nil { + return nil, err + } + pos = values[2] + mode = values[1] & 0x7 + index = values[1] >> 3 + if mode == 0 { + values, err = decodeVarInt32(data, pos, rawSize) + if err != nil { + return nil, err + } + pos = values[2] + } else if mode == 1 { + pos += 8 + } else if mode == 2 { + values, err = decodeVarInt32(data, pos, rawSize) + if err != nil { + return nil, err + } + if index == 1 { + f, err := decodeLogGroup(data, values[2], values[1]) + if err != nil { + return nil, err + } + logGroups = append(logGroups, f) + } + pos = values[1] + values[2] + } else if mode == 5 { + pos += 4 + } else { + return nil, errors.New("unexpected mode: " + strconv.FormatInt(int64(mode), 10)) + } + } + if pos != rawSize { + return nil, errors.New("unable to parse pos " + strconv.FormatInt(int64(pos), 10) + " rawSize " + strconv.FormatInt(int64(rawSize), 10)) + } + return logGroups, nil +} + // GetHistograms query logs with [from, to) time range func (s *LogStore) GetHistograms(topic string, from int64, to int64, queryExp string) (*GetHistogramsResponse, error) { diff --git a/model.go b/model.go index 5e6008c4..8a33c844 100644 --- a/model.go +++ b/model.go @@ -76,7 +76,37 @@ type PullLogMeta struct { RawDataCountBeforeQuery int } -// GetHistogramsResponse defines response from GetHistograms call +type FastLogContent struct { + Key string + Value string +} + +type FastLog struct { + Time uint32 + TimeNsPart uint32 + Contents []*FastLogContent +} + +type FastLogTag struct { + Key string + Value string +} + +type FastLogGroup struct { + Logs []*FastLog + Tags []*FastLogTag + Source string + Topic string +} + +type PullLogsResponse struct { + NextCursor string + RawSize int + RawSizeBeforeQuery int + RawDataCountBeforeQuery int + LogGroups []*FastLogGroup +} + type SingleHistogram struct { Progress string `json:"progress"` Count int64 `json:"count"` @@ -84,6 +114,7 @@ type SingleHistogram struct { To int64 `json:"to"` } +// GetHistogramsResponse defines response from GetHistograms call type GetHistogramsResponse struct { Progress string `json:"progress"` Count int64 `json:"count"` diff --git a/token_auto_update_client.go b/token_auto_update_client.go index ed065ca4..2857cb53 100644 --- a/token_auto_update_client.go +++ b/token_auto_update_client.go @@ -871,6 +871,16 @@ func (c *TokenAutoUpdateClient) PullLogsV2(plr *PullLogRequest) (gl *LogGroupLis return } +func (c *TokenAutoUpdateClient) PullLogsV3(plr *PullLogRequest) (r *PullLogsResponse, err error) { + for i := 0; i < c.maxTryTimes; i++ { + r, err = c.logClient.PullLogsV3(plr) + if !c.processError(err) { + return + } + } + return +} + func (c *TokenAutoUpdateClient) PullLogsWithQuery(plr *PullLogRequest) (gl *LogGroupList, plm *PullLogMeta, err error) { for i := 0; i < c.maxTryTimes; i++ { gl, plm, err = c.logClient.PullLogsWithQuery(plr) From de4191fa6752992ae9da8fa9b6ccebee3fe97b17 Mon Sep 17 00:00:00 2001 From: "ke.like" Date: Tue, 16 Apr 2024 11:50:22 +0800 Subject: [PATCH 2/7] fix consumer not used v3 bug --- consumer/consumer_client.go | 44 +++++++++++++------------ consumer/tasks.go | 17 ++++++---- consumer/worker.go | 6 ++-- example/consumer/copy_data/copy_data.go | 2 +- example/consumer/demo/simple_demo.go | 21 ++++++++++-- log.pb.go | 7 ++-- log_store.go | 4 +-- model.go | 14 ++++---- 8 files changed, 68 insertions(+), 47 deletions(-) diff --git a/consumer/consumer_client.go b/consumer/consumer_client.go index 4c8b8a2f..0209620b 100644 --- a/consumer/consumer_client.go +++ b/consumer/consumer_client.go @@ -139,7 +139,8 @@ func (consumer *ConsumerClient) getCursor(shardId int, from string) (string, err return cursor, err } -func (consumer *ConsumerClient) pullLogs(shardId int, cursor string) (gl *sls.LogGroupList, plm *sls.PullLogMeta, err error) { +func (consumer *ConsumerClient) pullLogs(shardId int, cursor string) (r *sls.PullLogsResponse, err error) { + fmt.Println("PullLogs") plr := &sls.PullLogRequest{ Project: consumer.option.Project, Logstore: consumer.option.Logstore, @@ -149,28 +150,29 @@ func (consumer *ConsumerClient) pullLogs(shardId int, cursor string) (gl *sls.Lo LogGroupMaxCount: consumer.option.MaxFetchLogGroupCount, } for retry := 0; retry < 3; retry++ { - gl, plm, err = consumer.client.PullLogsWithQuery(plr) - if err != nil { - slsError, ok := err.(*sls.Error) - if ok { - level.Warn(consumer.logger).Log("msg", "shard pull logs failed, occur sls error", - "shard", shardId, - "error", slsError, - "tryTimes", retry+1, - "cursor", cursor, - ) - if slsError.HTTPCode == 403 { - time.Sleep(5 * time.Second) - } - } else { - level.Warn(consumer.logger).Log("msg", "unknown error when pull log", - "shardId", shardId, - "cursor", cursor, - "error", err, - "tryTimes", retry+1) + r, err = consumer.client.PullLogsV3(plr) + if err == nil || retry >= 2 { + break + } + slsError, ok := err.(*sls.Error) + if ok { + level.Warn(consumer.logger).Log("msg", "shard pull logs failed, occur sls error", + "shard", shardId, + "error", slsError, + "tryTimes", retry+1, + "cursor", cursor, + ) + if slsError.HTTPCode == 403 { + time.Sleep(5 * time.Second) } - time.Sleep(200 * time.Millisecond) + } else { + level.Warn(consumer.logger).Log("msg", "unknown error when pull log", + "shardId", shardId, + "cursor", cursor, + "error", err, + "tryTimes", retry+1) } + time.Sleep(200 * time.Millisecond) } // If you can't retry the log three times, it will return to empty list and start pulling the log cursor, // so that next time you will come in and pull the function again, which is equivalent to a dead cycle. diff --git a/consumer/tasks.go b/consumer/tasks.go index 1df4fd66..41885201 100644 --- a/consumer/tasks.go +++ b/consumer/tasks.go @@ -3,6 +3,7 @@ package consumerLibrary import ( "errors" "fmt" + sls "github.com/aliyun/aliyun-log-go-sdk" "runtime" "time" @@ -49,19 +50,21 @@ func (consumer *ShardConsumerWorker) nextFetchTask() error { // update last fetch time, for control fetch frequency consumer.lastFetchTime = time.Now() - logGroup, pullLogMeta, err := consumer.client.pullLogs(consumer.shardId, consumer.nextFetchCursor) + r, err := consumer.client.pullLogs(consumer.shardId, consumer.nextFetchCursor) if err != nil { return err } // set cursors user to decide whether to save according to the execution of `process` consumer.consumerCheckPointTracker.setCurrentCursor(consumer.nextFetchCursor) - consumer.lastFetchLogGroupList = logGroup - consumer.nextFetchCursor = pullLogMeta.NextCursor - consumer.lastFetchRawSize = pullLogMeta.RawSize - consumer.lastFetchGroupCount = GetLogGroupCount(consumer.lastFetchLogGroupList) + consumer.lastFetchLogGroupList = &sls.LogGroupList{ + FastLogGroups: r.LogGroups, + } + consumer.nextFetchCursor = r.NextCursor + consumer.lastFetchRawSize = r.RawSize + consumer.lastFetchGroupCount = len(r.LogGroups) if consumer.client.option.Query != "" { - consumer.lastFetchRawSizeBeforeQuery = pullLogMeta.RawSizeBeforeQuery - consumer.lastFetchGroupCountBeforeQuery = pullLogMeta.RawDataCountBeforeQuery + consumer.lastFetchRawSizeBeforeQuery = r.RawSizeBeforeQuery + consumer.lastFetchGroupCountBeforeQuery = r.RawDataCountBeforeQuery if consumer.lastFetchRawSizeBeforeQuery == -1 { consumer.lastFetchRawSizeBeforeQuery = 0 } diff --git a/consumer/worker.go b/consumer/worker.go index a39909b2..9f7acbfc 100644 --- a/consumer/worker.go +++ b/consumer/worker.go @@ -1,8 +1,8 @@ package consumerLibrary import ( - "os" "io" + "os" "sync" "time" @@ -143,14 +143,14 @@ func (consumerWorker *ConsumerWorker) getShardConsumer(shardId int) *ShardConsum } -func (consumerWorker *ConsumerWorker) cleanShardConsumer(owned_shards []int) { +func (consumerWorker *ConsumerWorker) cleanShardConsumer(ownedShards []int) { consumerWorker.shardConsumer.Range( func(key, value interface{}) bool { shard := key.(int) consumer := value.(*ShardConsumerWorker) - if !Contain(shard, owned_shards) { + if !Contain(shard, ownedShards) { level.Info(consumerWorker.Logger).Log("msg", "try to call shut down for unassigned consumer shard", "shardId", shard) consumer.consumerShutDown() level.Info(consumerWorker.Logger).Log("msg", "Complete call shut down for unassigned consumer shard", "shardId", shard) diff --git a/example/consumer/copy_data/copy_data.go b/example/consumer/copy_data/copy_data.go index f692a3fd..24cd5f1f 100644 --- a/example/consumer/copy_data/copy_data.go +++ b/example/consumer/copy_data/copy_data.go @@ -29,7 +29,7 @@ func main() { Logstore: "", ConsumerGroupName: "", ConsumerName: "", - // This options is used for initialization, will be ignored once consumer group is created and each shard has been started to be consumed. + // These options are used for initialization, will be ignored once consumer group is created and each shard has been started to be consumed. // Could be "begin", "end", "specific time format in time stamp", it's log receiving time. CursorPosition: consumerLibrary.BEGIN_CURSOR, } diff --git a/example/consumer/demo/simple_demo.go b/example/consumer/demo/simple_demo.go index fc383241..87af981c 100644 --- a/example/consumer/demo/simple_demo.go +++ b/example/consumer/demo/simple_demo.go @@ -23,9 +23,9 @@ func main() { Logstore: "", ConsumerGroupName: "", ConsumerName: "", - // This options is used for initialization, will be ignored once consumer group is created and each shard has been started to be consumed. + // These options are used for initialization, will be ignored once consumer group is created and each shard has been started to be consumed. // Could be "begin", "end", "specific time format in time stamp", it's log receiving time. - CursorPosition: consumerLibrary.END_CURSOR, + CursorPosition: consumerLibrary.BEGIN_CURSOR, } consumerWorker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process) @@ -41,7 +41,22 @@ func main() { // Fill in your consumption logic here, and be careful not to change the parameters of the function and the return value, // otherwise you will report errors. func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) (string, error) { - fmt.Println(shardId, logGroupList) + //fmt.Println(shardId, logGroupList) + fmt.Println(shardId) + for _, lg := range logGroupList.FastLogGroups { + for _, tag := range lg.LogTags { + fmt.Printf("[tag] %s : %s\n", tag.Key, tag.Value) + } + fmt.Printf("[source] %s\n", lg.Source) + fmt.Printf("[topic] %s\n", lg.Topic) + for _, log := range lg.Logs { + fmt.Printf("[log] time = %d, nsTimePs = %d\n", log.Time, log.TimeNs) + for _, content := range log.Contents { + fmt.Printf("%s : %s\n", content.Key, content.Value) + } + } + } + checkpointTracker.SaveCheckPoint(false) return "", nil } diff --git a/log.pb.go b/log.pb.go index 18b508af..f61a8490 100644 --- a/log.pb.go +++ b/log.pb.go @@ -389,9 +389,10 @@ func (m *SlsLogPackageList) GetPackages() []*SlsLogPackage { type LogGroupList struct { LogGroups []*LogGroup `protobuf:"bytes,1,rep,name=LogGroups" json:"LogGroups,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + FastLogGroups []*FastLogGroup `json:"FastLogGroups"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *LogGroupList) Reset() { *m = LogGroupList{} } diff --git a/log_store.go b/log_store.go index 0a0b8750..c6434066 100644 --- a/log_store.go +++ b/log_store.go @@ -758,7 +758,7 @@ func decodeLog(rawBytes []byte, offset, length int) (f *FastLog, err error) { } } else if mode == 5 { if index == 4 { - f.TimeNsPart = uint32(rawBytes[values[2]])&255 | (uint32(rawBytes[values[2]+1])&255)<<8 | (uint32(rawBytes[values[2]+2])&255)<<16 | (uint32(rawBytes[values[2]+3])&255)<<24 + f.TimeNs = uint32(rawBytes[values[2]])&255 | (uint32(rawBytes[values[2]+1])&255)<<8 | (uint32(rawBytes[values[2]+2])&255)<<16 | (uint32(rawBytes[values[2]+3])&255)<<24 } pos = values[2] + 4 } else { @@ -820,7 +820,7 @@ func decodeLogGroup(rawBytes []byte, offset, length int) (f *FastLogGroup, err e if err != nil { return nil, err } - f.Tags = append(f.Tags, tag) + f.LogTags = append(f.LogTags, tag) break default: break diff --git a/model.go b/model.go index 8a33c844..48678063 100644 --- a/model.go +++ b/model.go @@ -82,9 +82,9 @@ type FastLogContent struct { } type FastLog struct { - Time uint32 - TimeNsPart uint32 - Contents []*FastLogContent + Time uint32 + TimeNs uint32 + Contents []*FastLogContent } type FastLogTag struct { @@ -93,10 +93,10 @@ type FastLogTag struct { } type FastLogGroup struct { - Logs []*FastLog - Tags []*FastLogTag - Source string - Topic string + Logs []*FastLog + LogTags []*FastLogTag + Source string + Topic string } type PullLogsResponse struct { From d2950b6e3d1ac246382627b3ab78ce8f9e1c0d54 Mon Sep 17 00:00:00 2001 From: "ke.like" Date: Tue, 16 Apr 2024 12:40:57 +0800 Subject: [PATCH 3/7] refine --- log_store.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/log_store.go b/log_store.go index c6434066..f64e8760 100644 --- a/log_store.go +++ b/log_store.go @@ -661,12 +661,12 @@ func decodeTag(rawBytes []byte, offset, length int) (f *FastLogTag, err error) { } else if mode == 5 { pos = values[2] + 4 } else { - return nil, errors.New("unexpected mode: " + strconv.FormatInt(int64(mode), 10)) + return nil, fmt.Errorf("unexpected mode: %d", mode) } } // TODO check key and value if pos != endOffset { - return nil, errors.New("unable to parse pos " + strconv.FormatInt(int64(pos), 10)) + return nil, fmt.Errorf("unable to parse pos %d", pos) } return f, nil } @@ -706,12 +706,12 @@ func decodeLogContent(rawBytes []byte, offset, length int) (f *FastLogContent, e } else if mode == 5 { pos += 4 } else { - return nil, errors.New("unexpected mode: " + strconv.FormatInt(int64(mode), 10)) + return nil, fmt.Errorf("unexpected mode: %d", mode) } } // TODO check key and value if pos != endOffset { - return nil, errors.New("unable to parse pos " + strconv.FormatInt(int64(pos), 10)) + return nil, fmt.Errorf("unable to parse pos %d", pos) } return f, nil } @@ -762,14 +762,14 @@ func decodeLog(rawBytes []byte, offset, length int) (f *FastLog, err error) { } pos = values[2] + 4 } else { - return nil, errors.New("unexpected mode: " + strconv.FormatInt(int64(mode), 10)) + return nil, fmt.Errorf("unexpected mode: %d", mode) } } if !findTime { - return nil, errors.New("time is not found in log from pos " + strconv.FormatInt(int64(pos), 10)) + return nil, fmt.Errorf("time is not found in log from pos %d", pos) } if pos != endOffset { - return nil, errors.New("unable to parse pos " + strconv.FormatInt(int64(pos), 10)) + return nil, fmt.Errorf("unable to parse pos %d", pos) } return f, nil } @@ -829,7 +829,7 @@ func decodeLogGroup(rawBytes []byte, offset, length int) (f *FastLogGroup, err e } } if pos != endOffset { - return nil, errors.New("unable to parse pos " + strconv.FormatInt(int64(pos), 10)) + return nil, fmt.Errorf("unable to parse pos %d", pos) } return f, nil } @@ -872,11 +872,11 @@ func decodeLogGroups(data []byte, rawSize int) (logGroups []*FastLogGroup, err e } else if mode == 5 { pos += 4 } else { - return nil, errors.New("unexpected mode: " + strconv.FormatInt(int64(mode), 10)) + return nil, fmt.Errorf("unexpected mode: %d", mode) } } if pos != rawSize { - return nil, errors.New("unable to parse pos " + strconv.FormatInt(int64(pos), 10) + " rawSize " + strconv.FormatInt(int64(rawSize), 10)) + return nil, fmt.Errorf("unable to parse pos %d rawSize %d", pos, rawSize) } return logGroups, nil } From b7b61296b89224b8d0639625d335764c955674b3 Mon Sep 17 00:00:00 2001 From: "ke.like" Date: Tue, 16 Apr 2024 14:46:53 +0800 Subject: [PATCH 4/7] refine --- log_store.go | 218 ++++++++++++++++++++++----------------------------- model.go | 7 +- 2 files changed, 97 insertions(+), 128 deletions(-) diff --git a/log_store.go b/log_store.go index f64e8760..d68d8182 100644 --- a/log_store.go +++ b/log_store.go @@ -2,7 +2,6 @@ package sls import ( "encoding/json" - "errors" "fmt" "time" @@ -593,10 +592,12 @@ func (s *LogStore) PullLogsV3(plr *PullLogRequest) (r *PullLogsResponse, err err return nil, err } r = &PullLogsResponse{ - NextCursor: plm.NextCursor, - RawSize: plm.RawSize, - RawSizeBeforeQuery: plm.RawSizeBeforeQuery, - RawDataCountBeforeQuery: plm.RawDataCountBeforeQuery, + PullLogMeta: PullLogMeta{ + NextCursor: plm.NextCursor, + RawSize: plm.RawSize, + RawSizeBeforeQuery: plm.RawSizeBeforeQuery, + RawDataCountBeforeQuery: plm.RawDataCountBeforeQuery, + }, } r.LogGroups, err = decodeLogGroups(out, r.RawSize) if err != nil { @@ -605,68 +606,58 @@ func (s *LogStore) PullLogsV3(plr *PullLogRequest) (r *PullLogsResponse, err err return r, nil } -func decodeVarInt32(data []byte, pos, rawSize int) ([]int, error) { - values := []int{0, 0, 0} - shift := 0 +func decodeVarInt32(data []byte, pos, rawSize int) (int, int, error) { + shift, x := 0, 0 var b int for i := pos; i < rawSize; i++ { b = int(data[i]) & 0xff - values[1] |= (b & 127) << shift + x |= (b & 127) << shift shift += 7 if (b & 128) == 0 { - values[2] = i + 1 - values[0] = 1 - break + return x, i + 1, nil } } - if values[0] == 0 { - return nil, errors.New("error decoding varint32 from pos " + strconv.FormatInt(int64(pos), 10)) - } - return values, nil + return 0, 0, fmt.Errorf("error decoding varint32 from pos %d", pos) } func decodeTag(rawBytes []byte, offset, length int) (f *FastLogTag, err error) { f = &FastLogTag{} pos, endOffset := offset, offset+length - var values []int - var mode, index int + var mode, index, val int for pos < endOffset { - values, err = decodeVarInt32(rawBytes, pos, endOffset) + val, pos, err = decodeVarInt32(rawBytes, pos, endOffset) if err != nil { return nil, err } - pos = values[2] - mode = values[1] & 0x7 - index = values[1] >> 3 - if mode == 0 { - values, err = decodeVarInt32(rawBytes, pos, endOffset) + mode, index = val&0x7, val>>3 + switch mode { + case 0: + val, pos, err = decodeVarInt32(rawBytes, pos, endOffset) if err != nil { return nil, err } - pos = values[2] - } else if mode == 1 { - pos = values[2] + 8 - } else if mode == 2 { - pos = values[2] - values, err = decodeVarInt32(rawBytes, pos, endOffset) + case 1: + pos += 8 + case 2: + val, pos, err = decodeVarInt32(rawBytes, pos, endOffset) if err != nil { return nil, err } - pos = values[2] + values[1] if index == 1 { - f.Key = string(rawBytes[values[2] : values[2]+values[1]]) + f.Key = string(rawBytes[pos : pos+val]) } else if index == 2 { - f.Value = string(rawBytes[values[2] : values[2]+values[1]]) + f.Value = string(rawBytes[pos : pos+val]) } - } else if mode == 5 { - pos = values[2] + 4 - } else { - return nil, fmt.Errorf("unexpected mode: %d", mode) + pos += val + case 5: + pos += 4 + default: + return nil, fmt.Errorf("error parsing tag, unexpected mode: %d", mode) } } // TODO check key and value if pos != endOffset { - return nil, fmt.Errorf("unable to parse pos %d", pos) + return nil, fmt.Errorf("error parsing tag, pos %d, end %d", pos, endOffset) } return f, nil } @@ -674,44 +665,41 @@ func decodeTag(rawBytes []byte, offset, length int) (f *FastLogTag, err error) { func decodeLogContent(rawBytes []byte, offset, length int) (f *FastLogContent, err error) { f = &FastLogContent{} pos, endOffset := offset, offset+length - var values []int - var mode, index int + var mode, index, val int for pos < endOffset { - values, err = decodeVarInt32(rawBytes, pos, endOffset) + val, pos, err = decodeVarInt32(rawBytes, pos, endOffset) if err != nil { return nil, err } - mode = values[1] & 0x7 - index = values[1] >> 3 - pos = values[2] - if mode == 0 { - values, err = decodeVarInt32(rawBytes, pos, endOffset) + mode, index = val&0x7, val>>3 + switch mode { + case 0: + val, pos, err = decodeVarInt32(rawBytes, pos, endOffset) if err != nil { return nil, err } - pos = values[2] - } else if mode == 1 { + case 1: pos += 8 - } else if mode == 2 { - values, err = decodeVarInt32(rawBytes, pos, endOffset) + case 2: + val, pos, err = decodeVarInt32(rawBytes, pos, endOffset) if err != nil { return nil, err } - pos = values[2] + values[1] if index == 1 { - f.Key = string(rawBytes[values[2] : values[2]+values[1]]) + f.Key = string(rawBytes[pos : pos+val]) } else if index == 2 { - f.Value = string(rawBytes[values[2] : values[2]+values[1]]) + f.Value = string(rawBytes[pos : pos+val]) } - } else if mode == 5 { + pos += val + case 5: pos += 4 - } else { - return nil, fmt.Errorf("unexpected mode: %d", mode) + default: + return nil, fmt.Errorf("error parsing content, unexpected mode: %d", mode) } } // TODO check key and value if pos != endOffset { - return nil, fmt.Errorf("unable to parse pos %d", pos) + return nil, fmt.Errorf("error parsing content, pos %d, end %d", pos, endOffset) } return f, nil } @@ -719,57 +707,53 @@ func decodeLogContent(rawBytes []byte, offset, length int) (f *FastLogContent, e func decodeLog(rawBytes []byte, offset, length int) (f *FastLog, err error) { f = &FastLog{} pos, endOffset := offset, offset+length - var values []int - var mode, index int + var mode, index, val int findTime := false for pos < endOffset { - values, err = decodeVarInt32(rawBytes, pos, endOffset) + val, pos, err = decodeVarInt32(rawBytes, pos, endOffset) if err != nil { return nil, err } - mode = values[1] & 0x7 - index = values[1] >> 3 - if mode == 0 { - pos = values[2] - values, err = decodeVarInt32(rawBytes, pos, endOffset) + mode, index = val&0x7, val>>3 + switch mode { + case 0: + val, pos, err = decodeVarInt32(rawBytes, pos, endOffset) if err != nil { return nil, err } - pos = values[2] if index == 1 { - f.Time = uint32(values[1]) + f.Time = uint32(val) findTime = true } - } else if mode == 1 { - pos = values[2] + 8 - } else if mode == 2 { - pos = values[2] - values, err = decodeVarInt32(rawBytes, pos, endOffset) + case 1: + pos += 8 + case 2: + val, pos, err = decodeVarInt32(rawBytes, pos, endOffset) if err != nil { return nil, err } - pos = values[2] + values[1] if index == 2 { - fc, err := decodeLogContent(rawBytes, values[2], values[1]) + fc, err := decodeLogContent(rawBytes, pos, val) if err != nil { return nil, err } f.Contents = append(f.Contents, fc) } - } else if mode == 5 { + pos += val + case 5: if index == 4 { - f.TimeNs = uint32(rawBytes[values[2]])&255 | (uint32(rawBytes[values[2]+1])&255)<<8 | (uint32(rawBytes[values[2]+2])&255)<<16 | (uint32(rawBytes[values[2]+3])&255)<<24 + f.TimeNs = uint32(rawBytes[pos])&255 | (uint32(rawBytes[pos+1])&255)<<8 | (uint32(rawBytes[pos+2])&255)<<16 | (uint32(rawBytes[pos+3])&255)<<24 } - pos = values[2] + 4 - } else { - return nil, fmt.Errorf("unexpected mode: %d", mode) + pos += 4 + default: + return nil, fmt.Errorf("error parsing log, unexpected mode: %d", mode) } } if !findTime { return nil, fmt.Errorf("time is not found in log from pos %d", pos) } if pos != endOffset { - return nil, fmt.Errorf("unable to parse pos %d", pos) + return nil, fmt.Errorf("error parsing log, pos %d, end %d", pos, endOffset) } return f, nil } @@ -777,59 +761,50 @@ func decodeLog(rawBytes []byte, offset, length int) (f *FastLog, err error) { func decodeLogGroup(rawBytes []byte, offset, length int) (f *FastLogGroup, err error) { f = &FastLogGroup{} pos, endOffset := offset, offset+length - var values []int - var mode, index int + var mode, index, val int for pos < endOffset { - values, err = decodeVarInt32(rawBytes, pos, endOffset) + val, pos, err = decodeVarInt32(rawBytes, pos, endOffset) if err != nil { return nil, err } - mode = values[1] & 0x7 - index = values[1] >> 3 - if mode == 0 { - pos = values[2] - values, err = decodeVarInt32(rawBytes, pos, endOffset) + mode, index = val&0x7, val>>3 + switch mode { + case 0: + val, pos, err = decodeVarInt32(rawBytes, pos, endOffset) if err != nil { return nil, err } - pos = values[2] - } else if mode == 1 { - pos = values[2] + 8 - } else if mode == 2 { - pos = values[2] - values, err = decodeVarInt32(rawBytes, pos, endOffset) + case 1: + pos += 8 + case 2: + val, pos, err = decodeVarInt32(rawBytes, pos, endOffset) if err != nil { return nil, err } switch index { - case 1: //logs - log, err := decodeLog(rawBytes, values[2], values[1]) + case 1: + log, err := decodeLog(rawBytes, pos, val) if err != nil { return nil, err } f.Logs = append(f.Logs, log) - break case 3: - f.Topic = string(rawBytes[values[2] : values[2]+values[1]]) - break + f.Topic = string(rawBytes[pos : pos+val]) case 4: - f.Source = string(rawBytes[values[2] : values[2]+values[1]]) - break - case 6: //tags - tag, err := decodeTag(rawBytes, values[2], values[1]) + f.Source = string(rawBytes[pos : pos+val]) + case 6: + tag, err := decodeTag(rawBytes, pos, val) if err != nil { return nil, err } f.LogTags = append(f.LogTags, tag) - break default: - break } - pos = values[2] + values[1] + pos += val } } if pos != endOffset { - return nil, fmt.Errorf("unable to parse pos %d", pos) + return nil, fmt.Errorf("error parsing log group, pos %d, end %d", pos, endOffset) } return f, nil } @@ -837,41 +812,38 @@ func decodeLogGroup(rawBytes []byte, offset, length int) (f *FastLogGroup, err e // decodeLogGroups decodes logs binary data returned by GetLogsBytes API func decodeLogGroups(data []byte, rawSize int) (logGroups []*FastLogGroup, err error) { pos, rawSize := 0, len(data) - var mode, index int - var values []int + var mode, index, val int logGroups = []*FastLogGroup{} for pos < rawSize { - values, err = decodeVarInt32(data, pos, rawSize) + val, pos, err = decodeVarInt32(data, pos, rawSize) if err != nil { return nil, err } - pos = values[2] - mode = values[1] & 0x7 - index = values[1] >> 3 - if mode == 0 { - values, err = decodeVarInt32(data, pos, rawSize) + mode, index = val&0x7, val>>3 + switch mode { + case 0: + val, pos, err = decodeVarInt32(data, pos, rawSize) if err != nil { return nil, err } - pos = values[2] - } else if mode == 1 { + case 1: pos += 8 - } else if mode == 2 { - values, err = decodeVarInt32(data, pos, rawSize) + case 2: + val, pos, err = decodeVarInt32(data, pos, rawSize) if err != nil { return nil, err } if index == 1 { - f, err := decodeLogGroup(data, values[2], values[1]) + f, err := decodeLogGroup(data, pos, val) if err != nil { return nil, err } logGroups = append(logGroups, f) } - pos = values[1] + values[2] - } else if mode == 5 { + pos += val + case 5: pos += 4 - } else { + default: return nil, fmt.Errorf("unexpected mode: %d", mode) } } diff --git a/model.go b/model.go index 48678063..df5b9318 100644 --- a/model.go +++ b/model.go @@ -100,11 +100,8 @@ type FastLogGroup struct { } type PullLogsResponse struct { - NextCursor string - RawSize int - RawSizeBeforeQuery int - RawDataCountBeforeQuery int - LogGroups []*FastLogGroup + PullLogMeta + LogGroups []*FastLogGroup } type SingleHistogram struct { From d5e294d7a5bf8f88c280ce056ea1b0fe3afef786 Mon Sep 17 00:00:00 2001 From: "ke.like" Date: Tue, 16 Apr 2024 15:28:02 +0800 Subject: [PATCH 5/7] refine --- log_store.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/log_store.go b/log_store.go index d68d8182..ed86d24d 100644 --- a/log_store.go +++ b/log_store.go @@ -150,7 +150,7 @@ func (s *LogStore) PutRawLog(rawLogData []byte) (err error) { var outLen int switch s.putLogCompressType { case Compress_LZ4: - // Compresse body with lz4 + // Compress body with lz4 out = make([]byte, lz4.CompressBlockBound(len(rawLogData))) var hashTable [1 << 16]int n, err := lz4.CompressBlock(rawLogData, out, hashTable[:]) @@ -212,7 +212,7 @@ func (s *LogStore) PostRawLogs(body []byte, hashKey *string) (err error) { var outLen int switch s.putLogCompressType { case Compress_LZ4: - // Compresse body with lz4 + // Compress body with lz4 out = make([]byte, lz4.CompressBlockBound(len(body))) var hashTable [1 << 16]int n, err := lz4.CompressBlock(body, out, hashTable[:]) @@ -599,7 +599,7 @@ func (s *LogStore) PullLogsV3(plr *PullLogRequest) (r *PullLogsResponse, err err RawDataCountBeforeQuery: plm.RawDataCountBeforeQuery, }, } - r.LogGroups, err = decodeLogGroups(out, r.RawSize) + r.LogGroups, err = DecodeLogGroups(out, r.RawSize) if err != nil { return nil, err } @@ -809,8 +809,8 @@ func decodeLogGroup(rawBytes []byte, offset, length int) (f *FastLogGroup, err e return f, nil } -// decodeLogGroups decodes logs binary data returned by GetLogsBytes API -func decodeLogGroups(data []byte, rawSize int) (logGroups []*FastLogGroup, err error) { +// DecodeLogGroups decodes logs binary data returned by GetLogsBytes API +func DecodeLogGroups(data []byte, rawSize int) (logGroups []*FastLogGroup, err error) { pos, rawSize := 0, len(data) var mode, index, val int logGroups = []*FastLogGroup{} From 9e855e68eb7ded4a58b24b180e3f03e882e95dc6 Mon Sep 17 00:00:00 2001 From: "ke.like" Date: Tue, 16 Apr 2024 16:43:06 +0800 Subject: [PATCH 6/7] refine --- consumer/consumer_client.go | 1 - 1 file changed, 1 deletion(-) diff --git a/consumer/consumer_client.go b/consumer/consumer_client.go index 0209620b..c3d61292 100644 --- a/consumer/consumer_client.go +++ b/consumer/consumer_client.go @@ -140,7 +140,6 @@ func (consumer *ConsumerClient) getCursor(shardId int, from string) (string, err } func (consumer *ConsumerClient) pullLogs(shardId int, cursor string) (r *sls.PullLogsResponse, err error) { - fmt.Println("PullLogs") plr := &sls.PullLogRequest{ Project: consumer.option.Project, Logstore: consumer.option.Logstore, From 0b12864787a11f31d695a90f9bfb65bef88b82c4 Mon Sep 17 00:00:00 2001 From: "ke.like" Date: Fri, 19 Apr 2024 12:01:32 +0800 Subject: [PATCH 7/7] add getter --- model.go | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/model.go b/model.go index df5b9318..936adb42 100644 --- a/model.go +++ b/model.go @@ -81,17 +81,37 @@ type FastLogContent struct { Value string } +func (f *FastLogContent) GetKey() string { + return f.Key +} + +func (f *FastLogContent) GetValue() string { + return f.Value +} + type FastLog struct { Time uint32 TimeNs uint32 Contents []*FastLogContent } +func (m *FastLog) GetContents() []*FastLogContent { + return m.Contents +} + type FastLogTag struct { Key string Value string } +func (f *FastLogTag) GetKey() string { + return f.Key +} + +func (f *FastLogTag) GetValue() string { + return f.Value +} + type FastLogGroup struct { Logs []*FastLog LogTags []*FastLogTag @@ -99,11 +119,31 @@ type FastLogGroup struct { Topic string } +func (f *FastLogGroup) GetLogs() []*FastLog { + return f.Logs +} + +func (f *FastLogGroup) GetLogTags() []*FastLogTag { + return f.LogTags +} + +func (f *FastLogGroup) GetSource() string { + return f.Source +} + +func (f *FastLogGroup) GetTopic() string { + return f.Topic +} + type PullLogsResponse struct { PullLogMeta LogGroups []*FastLogGroup } +func (r *PullLogsResponse) GetLogGroups() []*FastLogGroup { + return r.LogGroups +} + type SingleHistogram struct { Progress string `json:"progress"` Count int64 `json:"count"`