From 8b9e80c1806467def95b72da8f52b26240689426 Mon Sep 17 00:00:00 2001 From: "ke.like" Date: Mon, 15 Apr 2024 19:42:14 +0800 Subject: [PATCH] Parse protobuf with custom parser --- client_interface.go | 3 +- client_store.go | 5 + log_store.go | 295 ++++++++++++++++++++++++++++++++++++ model.go | 33 +++- token_auto_update_client.go | 10 ++ 5 files changed, 344 insertions(+), 2 deletions(-) diff --git a/client_interface.go b/client_interface.go index 40321773..0b1b8b60 100644 --- a/client_interface.go +++ b/client_interface.go @@ -280,8 +280,9 @@ type ClientInterface interface { // @note if you want to pull logs continuous, set endCursor = "" PullLogs(project, logstore string, shardID int, cursor, endCursor string, logGroupMaxCount int) (gl *LogGroupList, nextCursor string, err error) - // Deprecated: Use PullLogsWithQuery instead. + // Deprecated: Use PullLogsV3 instead. PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, nextCursor string, err error) + PullLogsV3(plr *PullLogRequest) (*PullLogsResponse, error) PullLogsWithQuery(plr *PullLogRequest) (gl *LogGroupList, plm *PullLogMeta, err error) // GetHistograms query logs with [from, to) time range GetHistograms(project, logstore string, topic string, from int64, to int64, queryExp string) (*GetHistogramsResponse, error) diff --git a/client_store.go b/client_store.go index cd0c9efe..751205f9 100644 --- a/client_store.go +++ b/client_store.go @@ -228,6 +228,11 @@ func (c *Client) PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, nextCursor s return ls.PullLogsV2(plr) } +func (c *Client) PullLogsV3(plr *PullLogRequest) (r *PullLogsResponse, err error) { + ls := convertLogstore(c, plr.Project, plr.Logstore) + return ls.PullLogsV3(plr) +} + func (c *Client) PullLogsWithQuery(plr *PullLogRequest) (gl *LogGroupList, plm *PullLogMeta, err error) { ls := convertLogstore(c, plr.Project, plr.Logstore) return ls.PullLogsWithQuery(plr) diff --git a/log_store.go b/log_store.go index 6b6f251a..0a0b8750 100644 --- a/log_store.go +++ b/log_store.go @@ -2,6 +2,7 @@ package sls import ( "encoding/json" + "errors" "fmt" "time" @@ -586,6 +587,300 @@ func (s *LogStore) PullLogsWithQuery(plr *PullLogRequest) (gl *LogGroupList, plm return } +func (s *LogStore) PullLogsV3(plr *PullLogRequest) (r *PullLogsResponse, err error) { + out, plm, err := s.GetLogsBytesWithQuery(plr) + if err != nil { + return nil, err + } + r = &PullLogsResponse{ + NextCursor: plm.NextCursor, + RawSize: plm.RawSize, + RawSizeBeforeQuery: plm.RawSizeBeforeQuery, + RawDataCountBeforeQuery: plm.RawDataCountBeforeQuery, + } + r.LogGroups, err = decodeLogGroups(out, r.RawSize) + if err != nil { + return nil, err + } + return r, nil +} + +func decodeVarInt32(data []byte, pos, rawSize int) ([]int, error) { + values := []int{0, 0, 0} + shift := 0 + var b int + for i := pos; i < rawSize; i++ { + b = int(data[i]) & 0xff + values[1] |= (b & 127) << shift + shift += 7 + if (b & 128) == 0 { + values[2] = i + 1 + values[0] = 1 + break + } + } + if values[0] == 0 { + return nil, errors.New("error decoding varint32 from pos " + strconv.FormatInt(int64(pos), 10)) + } + return values, nil +} + +func decodeTag(rawBytes []byte, offset, length int) (f *FastLogTag, err error) { + f = &FastLogTag{} + pos, endOffset := offset, offset+length + var values []int + var mode, index int + for pos < endOffset { + values, err = decodeVarInt32(rawBytes, pos, endOffset) + if err != nil { + return nil, err + } + pos = values[2] + mode = values[1] & 0x7 + index = values[1] >> 3 + if mode == 0 { + values, err = decodeVarInt32(rawBytes, pos, endOffset) + if err != nil { + return nil, err + } + pos = values[2] + } else if mode == 1 { + pos = values[2] + 8 + } else if mode == 2 { + pos = values[2] + values, err = decodeVarInt32(rawBytes, pos, endOffset) + if err != nil { + return nil, err + } + pos = values[2] + values[1] + if index == 1 { + f.Key = string(rawBytes[values[2] : values[2]+values[1]]) + } else if index == 2 { + f.Value = string(rawBytes[values[2] : values[2]+values[1]]) + } + } else if mode == 5 { + pos = values[2] + 4 + } else { + return nil, errors.New("unexpected mode: " + strconv.FormatInt(int64(mode), 10)) + } + } + // TODO check key and value + if pos != endOffset { + return nil, errors.New("unable to parse pos " + strconv.FormatInt(int64(pos), 10)) + } + return f, nil +} + +func decodeLogContent(rawBytes []byte, offset, length int) (f *FastLogContent, err error) { + f = &FastLogContent{} + pos, endOffset := offset, offset+length + var values []int + var mode, index int + for pos < endOffset { + values, err = decodeVarInt32(rawBytes, pos, endOffset) + if err != nil { + return nil, err + } + mode = values[1] & 0x7 + index = values[1] >> 3 + pos = values[2] + if mode == 0 { + values, err = decodeVarInt32(rawBytes, pos, endOffset) + if err != nil { + return nil, err + } + pos = values[2] + } else if mode == 1 { + pos += 8 + } else if mode == 2 { + values, err = decodeVarInt32(rawBytes, pos, endOffset) + if err != nil { + return nil, err + } + pos = values[2] + values[1] + if index == 1 { + f.Key = string(rawBytes[values[2] : values[2]+values[1]]) + } else if index == 2 { + f.Value = string(rawBytes[values[2] : values[2]+values[1]]) + } + } else if mode == 5 { + pos += 4 + } else { + return nil, errors.New("unexpected mode: " + strconv.FormatInt(int64(mode), 10)) + } + } + // TODO check key and value + if pos != endOffset { + return nil, errors.New("unable to parse pos " + strconv.FormatInt(int64(pos), 10)) + } + return f, nil +} + +func decodeLog(rawBytes []byte, offset, length int) (f *FastLog, err error) { + f = &FastLog{} + pos, endOffset := offset, offset+length + var values []int + var mode, index int + findTime := false + for pos < endOffset { + values, err = decodeVarInt32(rawBytes, pos, endOffset) + if err != nil { + return nil, err + } + mode = values[1] & 0x7 + index = values[1] >> 3 + if mode == 0 { + pos = values[2] + values, err = decodeVarInt32(rawBytes, pos, endOffset) + if err != nil { + return nil, err + } + pos = values[2] + if index == 1 { + f.Time = uint32(values[1]) + findTime = true + } + } else if mode == 1 { + pos = values[2] + 8 + } else if mode == 2 { + pos = values[2] + values, err = decodeVarInt32(rawBytes, pos, endOffset) + if err != nil { + return nil, err + } + pos = values[2] + values[1] + if index == 2 { + fc, err := decodeLogContent(rawBytes, values[2], values[1]) + if err != nil { + return nil, err + } + f.Contents = append(f.Contents, fc) + } + } else if mode == 5 { + if index == 4 { + f.TimeNsPart = uint32(rawBytes[values[2]])&255 | (uint32(rawBytes[values[2]+1])&255)<<8 | (uint32(rawBytes[values[2]+2])&255)<<16 | (uint32(rawBytes[values[2]+3])&255)<<24 + } + pos = values[2] + 4 + } else { + return nil, errors.New("unexpected mode: " + strconv.FormatInt(int64(mode), 10)) + } + } + if !findTime { + return nil, errors.New("time is not found in log from pos " + strconv.FormatInt(int64(pos), 10)) + } + if pos != endOffset { + return nil, errors.New("unable to parse pos " + strconv.FormatInt(int64(pos), 10)) + } + return f, nil +} + +func decodeLogGroup(rawBytes []byte, offset, length int) (f *FastLogGroup, err error) { + f = &FastLogGroup{} + pos, endOffset := offset, offset+length + var values []int + var mode, index int + for pos < endOffset { + values, err = decodeVarInt32(rawBytes, pos, endOffset) + if err != nil { + return nil, err + } + mode = values[1] & 0x7 + index = values[1] >> 3 + if mode == 0 { + pos = values[2] + values, err = decodeVarInt32(rawBytes, pos, endOffset) + if err != nil { + return nil, err + } + pos = values[2] + } else if mode == 1 { + pos = values[2] + 8 + } else if mode == 2 { + pos = values[2] + values, err = decodeVarInt32(rawBytes, pos, endOffset) + if err != nil { + return nil, err + } + switch index { + case 1: //logs + log, err := decodeLog(rawBytes, values[2], values[1]) + if err != nil { + return nil, err + } + f.Logs = append(f.Logs, log) + break + case 3: + f.Topic = string(rawBytes[values[2] : values[2]+values[1]]) + break + case 4: + f.Source = string(rawBytes[values[2] : values[2]+values[1]]) + break + case 6: //tags + tag, err := decodeTag(rawBytes, values[2], values[1]) + if err != nil { + return nil, err + } + f.Tags = append(f.Tags, tag) + break + default: + break + } + pos = values[2] + values[1] + } + } + if pos != endOffset { + return nil, errors.New("unable to parse pos " + strconv.FormatInt(int64(pos), 10)) + } + return f, nil +} + +// decodeLogGroups decodes logs binary data returned by GetLogsBytes API +func decodeLogGroups(data []byte, rawSize int) (logGroups []*FastLogGroup, err error) { + pos, rawSize := 0, len(data) + var mode, index int + var values []int + logGroups = []*FastLogGroup{} + for pos < rawSize { + values, err = decodeVarInt32(data, pos, rawSize) + if err != nil { + return nil, err + } + pos = values[2] + mode = values[1] & 0x7 + index = values[1] >> 3 + if mode == 0 { + values, err = decodeVarInt32(data, pos, rawSize) + if err != nil { + return nil, err + } + pos = values[2] + } else if mode == 1 { + pos += 8 + } else if mode == 2 { + values, err = decodeVarInt32(data, pos, rawSize) + if err != nil { + return nil, err + } + if index == 1 { + f, err := decodeLogGroup(data, values[2], values[1]) + if err != nil { + return nil, err + } + logGroups = append(logGroups, f) + } + pos = values[1] + values[2] + } else if mode == 5 { + pos += 4 + } else { + return nil, errors.New("unexpected mode: " + strconv.FormatInt(int64(mode), 10)) + } + } + if pos != rawSize { + return nil, errors.New("unable to parse pos " + strconv.FormatInt(int64(pos), 10) + " rawSize " + strconv.FormatInt(int64(rawSize), 10)) + } + return logGroups, nil +} + // GetHistograms query logs with [from, to) time range func (s *LogStore) GetHistograms(topic string, from int64, to int64, queryExp string) (*GetHistogramsResponse, error) { diff --git a/model.go b/model.go index 5e6008c4..8a33c844 100644 --- a/model.go +++ b/model.go @@ -76,7 +76,37 @@ type PullLogMeta struct { RawDataCountBeforeQuery int } -// GetHistogramsResponse defines response from GetHistograms call +type FastLogContent struct { + Key string + Value string +} + +type FastLog struct { + Time uint32 + TimeNsPart uint32 + Contents []*FastLogContent +} + +type FastLogTag struct { + Key string + Value string +} + +type FastLogGroup struct { + Logs []*FastLog + Tags []*FastLogTag + Source string + Topic string +} + +type PullLogsResponse struct { + NextCursor string + RawSize int + RawSizeBeforeQuery int + RawDataCountBeforeQuery int + LogGroups []*FastLogGroup +} + type SingleHistogram struct { Progress string `json:"progress"` Count int64 `json:"count"` @@ -84,6 +114,7 @@ type SingleHistogram struct { To int64 `json:"to"` } +// GetHistogramsResponse defines response from GetHistograms call type GetHistogramsResponse struct { Progress string `json:"progress"` Count int64 `json:"count"` diff --git a/token_auto_update_client.go b/token_auto_update_client.go index ed065ca4..2857cb53 100644 --- a/token_auto_update_client.go +++ b/token_auto_update_client.go @@ -871,6 +871,16 @@ func (c *TokenAutoUpdateClient) PullLogsV2(plr *PullLogRequest) (gl *LogGroupLis return } +func (c *TokenAutoUpdateClient) PullLogsV3(plr *PullLogRequest) (r *PullLogsResponse, err error) { + for i := 0; i < c.maxTryTimes; i++ { + r, err = c.logClient.PullLogsV3(plr) + if !c.processError(err) { + return + } + } + return +} + func (c *TokenAutoUpdateClient) PullLogsWithQuery(plr *PullLogRequest) (gl *LogGroupList, plm *PullLogMeta, err error) { for i := 0; i < c.maxTryTimes; i++ { gl, plm, err = c.logClient.PullLogsWithQuery(plr)