Skip to content

Commit

Permalink
Parse protobuf with custom parser
Browse files Browse the repository at this point in the history
  • Loading branch information
ke.like committed Apr 15, 2024
1 parent 7b9b95d commit 8b9e80c
Show file tree
Hide file tree
Showing 5 changed files with 344 additions and 2 deletions.
3 changes: 2 additions & 1 deletion client_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,9 @@ type ClientInterface interface {
// @note if you want to pull logs continuous, set endCursor = ""
PullLogs(project, logstore string, shardID int, cursor, endCursor string,
logGroupMaxCount int) (gl *LogGroupList, nextCursor string, err error)
// Deprecated: Use PullLogsWithQuery instead.
// Deprecated: Use PullLogsV3 instead.
PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, nextCursor string, err error)
PullLogsV3(plr *PullLogRequest) (*PullLogsResponse, error)
PullLogsWithQuery(plr *PullLogRequest) (gl *LogGroupList, plm *PullLogMeta, err error)
// GetHistograms query logs with [from, to) time range
GetHistograms(project, logstore string, topic string, from int64, to int64, queryExp string) (*GetHistogramsResponse, error)
Expand Down
5 changes: 5 additions & 0 deletions client_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,11 @@ func (c *Client) PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, nextCursor s
return ls.PullLogsV2(plr)
}

func (c *Client) PullLogsV3(plr *PullLogRequest) (r *PullLogsResponse, err error) {
ls := convertLogstore(c, plr.Project, plr.Logstore)
return ls.PullLogsV3(plr)
}

func (c *Client) PullLogsWithQuery(plr *PullLogRequest) (gl *LogGroupList, plm *PullLogMeta, err error) {
ls := convertLogstore(c, plr.Project, plr.Logstore)
return ls.PullLogsWithQuery(plr)
Expand Down
295 changes: 295 additions & 0 deletions log_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sls

import (
"encoding/json"
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -586,6 +587,300 @@ func (s *LogStore) PullLogsWithQuery(plr *PullLogRequest) (gl *LogGroupList, plm
return
}

func (s *LogStore) PullLogsV3(plr *PullLogRequest) (r *PullLogsResponse, err error) {
out, plm, err := s.GetLogsBytesWithQuery(plr)
if err != nil {
return nil, err
}
r = &PullLogsResponse{
NextCursor: plm.NextCursor,
RawSize: plm.RawSize,
RawSizeBeforeQuery: plm.RawSizeBeforeQuery,
RawDataCountBeforeQuery: plm.RawDataCountBeforeQuery,
}
r.LogGroups, err = decodeLogGroups(out, r.RawSize)
if err != nil {
return nil, err
}
return r, nil
}

func decodeVarInt32(data []byte, pos, rawSize int) ([]int, error) {
values := []int{0, 0, 0}
shift := 0
var b int
for i := pos; i < rawSize; i++ {
b = int(data[i]) & 0xff
values[1] |= (b & 127) << shift
shift += 7
if (b & 128) == 0 {
values[2] = i + 1
values[0] = 1
break
}
}
if values[0] == 0 {
return nil, errors.New("error decoding varint32 from pos " + strconv.FormatInt(int64(pos), 10))
}
return values, nil
}

func decodeTag(rawBytes []byte, offset, length int) (f *FastLogTag, err error) {
f = &FastLogTag{}
pos, endOffset := offset, offset+length
var values []int
var mode, index int
for pos < endOffset {
values, err = decodeVarInt32(rawBytes, pos, endOffset)
if err != nil {
return nil, err
}
pos = values[2]
mode = values[1] & 0x7
index = values[1] >> 3
if mode == 0 {
values, err = decodeVarInt32(rawBytes, pos, endOffset)
if err != nil {
return nil, err
}
pos = values[2]
} else if mode == 1 {
pos = values[2] + 8
} else if mode == 2 {
pos = values[2]
values, err = decodeVarInt32(rawBytes, pos, endOffset)
if err != nil {
return nil, err
}
pos = values[2] + values[1]
if index == 1 {
f.Key = string(rawBytes[values[2] : values[2]+values[1]])
} else if index == 2 {
f.Value = string(rawBytes[values[2] : values[2]+values[1]])
}
} else if mode == 5 {
pos = values[2] + 4
} else {
return nil, errors.New("unexpected mode: " + strconv.FormatInt(int64(mode), 10))
}
}
// TODO check key and value
if pos != endOffset {
return nil, errors.New("unable to parse pos " + strconv.FormatInt(int64(pos), 10))
}
return f, nil
}

func decodeLogContent(rawBytes []byte, offset, length int) (f *FastLogContent, err error) {
f = &FastLogContent{}
pos, endOffset := offset, offset+length
var values []int
var mode, index int
for pos < endOffset {
values, err = decodeVarInt32(rawBytes, pos, endOffset)
if err != nil {
return nil, err
}
mode = values[1] & 0x7
index = values[1] >> 3
pos = values[2]
if mode == 0 {
values, err = decodeVarInt32(rawBytes, pos, endOffset)
if err != nil {
return nil, err
}
pos = values[2]
} else if mode == 1 {
pos += 8
} else if mode == 2 {
values, err = decodeVarInt32(rawBytes, pos, endOffset)
if err != nil {
return nil, err
}
pos = values[2] + values[1]
if index == 1 {
f.Key = string(rawBytes[values[2] : values[2]+values[1]])
} else if index == 2 {
f.Value = string(rawBytes[values[2] : values[2]+values[1]])
}
} else if mode == 5 {
pos += 4
} else {
return nil, errors.New("unexpected mode: " + strconv.FormatInt(int64(mode), 10))
}
}
// TODO check key and value
if pos != endOffset {
return nil, errors.New("unable to parse pos " + strconv.FormatInt(int64(pos), 10))
}
return f, nil
}

func decodeLog(rawBytes []byte, offset, length int) (f *FastLog, err error) {
f = &FastLog{}
pos, endOffset := offset, offset+length
var values []int
var mode, index int
findTime := false
for pos < endOffset {
values, err = decodeVarInt32(rawBytes, pos, endOffset)
if err != nil {
return nil, err
}
mode = values[1] & 0x7
index = values[1] >> 3
if mode == 0 {
pos = values[2]
values, err = decodeVarInt32(rawBytes, pos, endOffset)
if err != nil {
return nil, err
}
pos = values[2]
if index == 1 {
f.Time = uint32(values[1])
findTime = true
}
} else if mode == 1 {
pos = values[2] + 8
} else if mode == 2 {
pos = values[2]
values, err = decodeVarInt32(rawBytes, pos, endOffset)
if err != nil {
return nil, err
}
pos = values[2] + values[1]
if index == 2 {
fc, err := decodeLogContent(rawBytes, values[2], values[1])
if err != nil {
return nil, err
}
f.Contents = append(f.Contents, fc)
}
} else if mode == 5 {
if index == 4 {
f.TimeNsPart = uint32(rawBytes[values[2]])&255 | (uint32(rawBytes[values[2]+1])&255)<<8 | (uint32(rawBytes[values[2]+2])&255)<<16 | (uint32(rawBytes[values[2]+3])&255)<<24
}
pos = values[2] + 4
} else {
return nil, errors.New("unexpected mode: " + strconv.FormatInt(int64(mode), 10))
}
}
if !findTime {
return nil, errors.New("time is not found in log from pos " + strconv.FormatInt(int64(pos), 10))
}
if pos != endOffset {
return nil, errors.New("unable to parse pos " + strconv.FormatInt(int64(pos), 10))
}
return f, nil
}

func decodeLogGroup(rawBytes []byte, offset, length int) (f *FastLogGroup, err error) {
f = &FastLogGroup{}
pos, endOffset := offset, offset+length
var values []int
var mode, index int
for pos < endOffset {
values, err = decodeVarInt32(rawBytes, pos, endOffset)
if err != nil {
return nil, err
}
mode = values[1] & 0x7
index = values[1] >> 3
if mode == 0 {
pos = values[2]
values, err = decodeVarInt32(rawBytes, pos, endOffset)
if err != nil {
return nil, err
}
pos = values[2]
} else if mode == 1 {
pos = values[2] + 8
} else if mode == 2 {
pos = values[2]
values, err = decodeVarInt32(rawBytes, pos, endOffset)
if err != nil {
return nil, err
}
switch index {
case 1: //logs
log, err := decodeLog(rawBytes, values[2], values[1])
if err != nil {
return nil, err
}
f.Logs = append(f.Logs, log)
break
case 3:
f.Topic = string(rawBytes[values[2] : values[2]+values[1]])
break
case 4:
f.Source = string(rawBytes[values[2] : values[2]+values[1]])
break
case 6: //tags
tag, err := decodeTag(rawBytes, values[2], values[1])
if err != nil {
return nil, err
}
f.Tags = append(f.Tags, tag)
break
default:
break
}
pos = values[2] + values[1]
}
}
if pos != endOffset {
return nil, errors.New("unable to parse pos " + strconv.FormatInt(int64(pos), 10))
}
return f, nil
}

// decodeLogGroups decodes logs binary data returned by GetLogsBytes API
func decodeLogGroups(data []byte, rawSize int) (logGroups []*FastLogGroup, err error) {
pos, rawSize := 0, len(data)
var mode, index int
var values []int
logGroups = []*FastLogGroup{}
for pos < rawSize {
values, err = decodeVarInt32(data, pos, rawSize)
if err != nil {
return nil, err
}
pos = values[2]
mode = values[1] & 0x7
index = values[1] >> 3
if mode == 0 {
values, err = decodeVarInt32(data, pos, rawSize)
if err != nil {
return nil, err
}
pos = values[2]
} else if mode == 1 {
pos += 8
} else if mode == 2 {
values, err = decodeVarInt32(data, pos, rawSize)
if err != nil {
return nil, err
}
if index == 1 {
f, err := decodeLogGroup(data, values[2], values[1])
if err != nil {
return nil, err
}
logGroups = append(logGroups, f)
}
pos = values[1] + values[2]
} else if mode == 5 {
pos += 4
} else {
return nil, errors.New("unexpected mode: " + strconv.FormatInt(int64(mode), 10))
}
}
if pos != rawSize {
return nil, errors.New("unable to parse pos " + strconv.FormatInt(int64(pos), 10) + " rawSize " + strconv.FormatInt(int64(rawSize), 10))
}
return logGroups, nil
}

// GetHistograms query logs with [from, to) time range
func (s *LogStore) GetHistograms(topic string, from int64, to int64, queryExp string) (*GetHistogramsResponse, error) {

Expand Down
33 changes: 32 additions & 1 deletion model.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,45 @@ type PullLogMeta struct {
RawDataCountBeforeQuery int
}

// GetHistogramsResponse defines response from GetHistograms call
type FastLogContent struct {
Key string
Value string
}

type FastLog struct {
Time uint32
TimeNsPart uint32
Contents []*FastLogContent
}

type FastLogTag struct {
Key string
Value string
}

type FastLogGroup struct {
Logs []*FastLog
Tags []*FastLogTag
Source string
Topic string
}

type PullLogsResponse struct {
NextCursor string
RawSize int
RawSizeBeforeQuery int
RawDataCountBeforeQuery int
LogGroups []*FastLogGroup
}

type SingleHistogram struct {
Progress string `json:"progress"`
Count int64 `json:"count"`
From int64 `json:"from"`
To int64 `json:"to"`
}

// GetHistogramsResponse defines response from GetHistograms call
type GetHistogramsResponse struct {
Progress string `json:"progress"`
Count int64 `json:"count"`
Expand Down
10 changes: 10 additions & 0 deletions token_auto_update_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,16 @@ func (c *TokenAutoUpdateClient) PullLogsV2(plr *PullLogRequest) (gl *LogGroupLis
return
}

func (c *TokenAutoUpdateClient) PullLogsV3(plr *PullLogRequest) (r *PullLogsResponse, err error) {
for i := 0; i < c.maxTryTimes; i++ {
r, err = c.logClient.PullLogsV3(plr)
if !c.processError(err) {
return
}
}
return
}

func (c *TokenAutoUpdateClient) PullLogsWithQuery(plr *PullLogRequest) (gl *LogGroupList, plm *PullLogMeta, err error) {
for i := 0; i < c.maxTryTimes; i++ {
gl, plm, err = c.logClient.PullLogsWithQuery(plr)
Expand Down

0 comments on commit 8b9e80c

Please sign in to comment.