Skip to content

Commit

Permalink
add metricstore send support
Browse files Browse the repository at this point in the history
  • Loading branch information
EvanLjp committed Mar 27, 2024
1 parent b22db82 commit cd4fc76
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 95 deletions.
3 changes: 2 additions & 1 deletion client_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ func (c *Client) PostLogStoreLogs(project, logstore string, lg *LogGroup, hashKe

func (c *Client) PostMetricStoreLogs(project, logstore string, lg *LogGroup) (err error) {
ls := convertLogstore(c, project, logstore)
return ls.PostMetricStoreLogs(lg)
ls.MetricStore = true
return ls.PutLogs(lg)
}

// PostRawLogWithCompressType put raw log data to log service, no marshal
Expand Down
74 changes: 10 additions & 64 deletions log_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type LogStore struct {
putLogCompressType int
EncryptConf *EncryptConf `json:"encrypt_conf,omitempty"`
ProductType string `json:"productType,omitempty"`
MetricStore bool `json:"metric_store"`
}

// Shard defines shard struct
Expand Down Expand Up @@ -303,8 +304,12 @@ func (s *LogStore) PutLogs(lg *LogGroup) (err error) {
}
outLen = len(out)
}

uri := fmt.Sprintf("/logstores/%v", s.Name)
var uri string
if s.MetricStore {
uri = s.getMetricStoreURL()
} else {
uri = fmt.Sprintf("/logstores/%v", s.Name)
}
r, err := request(s.project, "POST", uri, h, out[:outLen])
if err != nil {
return NewClientError(err)
Expand All @@ -321,67 +326,8 @@ func (s *LogStore) PutLogs(lg *LogGroup) (err error) {
return nil
}

// PostMetricStoreLogs put logs into metricstore
// The callers should transform user logs into LogGroup.
func (s *LogStore) PostMetricStoreLogs(lg *LogGroup) (err error) {
if len(lg.Logs) == 0 {
// empty log group
return nil
}

body, err := proto.Marshal(lg)
if err != nil {
return NewClientError(err)
}

var out []byte
var h map[string]string
var outLen int
switch s.putLogCompressType {
case Compress_LZ4:
// Compresse body with lz4
out = make([]byte, lz4.CompressBlockBound(len(body)))
var hashTable [1 << 16]int
n, err := lz4.CompressBlock(body, out, hashTable[:])
if err != nil {
return NewClientError(err)
}
// copy incompressible data as lz4 format
if n == 0 {
n, _ = copyIncompressible(body, out)
}

h = map[string]string{
"x-log-compresstype": "lz4",
"x-log-bodyrawsize": strconv.Itoa(len(body)),
"Content-Type": "application/x-protobuf",
}
outLen = n
break
case Compress_None:
// no compress
out = body
h = map[string]string{
"x-log-bodyrawsize": strconv.Itoa(len(body)),
"Content-Type": "application/x-protobuf",
}
outLen = len(out)
}
uri := fmt.Sprintf("/prometheus/%s/%s/api/v1/write", s.project.Name, s.Name)
r, err := request(s.project, "POST", uri, h, out[:outLen])
if err != nil {
return NewClientError(err)
}
defer r.Body.Close()
body, _ = ioutil.ReadAll(r.Body)
if r.StatusCode != http.StatusOK {
err := new(Error)
if jErr := json.Unmarshal(body, err); jErr != nil {
return NewBadResponseError(string(body), r.Header, r.StatusCode)
}
return err
}
return nil
func (s *LogStore) getMetricStoreURL() string {
return fmt.Sprintf("/prometheus/%s/%s/api/v1/write", s.project.Name, s.Name)
}

// PostLogStoreLogs put logs into Shard logstore by hashKey.
Expand All @@ -392,7 +338,7 @@ func (s *LogStore) PostLogStoreLogs(lg *LogGroup, hashKey *string) (err error) {
return nil
}

if hashKey == nil || *hashKey == "" {
if hashKey == nil || *hashKey == "" || s.MetricStore {
// empty hash call PutLogs
return s.PutLogs(lg)
}
Expand Down
20 changes: 10 additions & 10 deletions producer/log_accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func initLogAccumulator(config *ProducerConfig, ioWorker *IoWorker, logger log.L
}
}

func (logAccumulator *LogAccumulator) addOrSendProducerBatch(key, project, logstore, logTopic, logSource, shardHash string, producerBatch *ProducerBatch, log interface{}, callback CallBack, isMetricStore bool) {
func (logAccumulator *LogAccumulator) addOrSendProducerBatch(key, project, logstore, logTopic, logSource, shardHash string, producerBatch *ProducerBatch, log interface{}, callback CallBack) {
totalDataCount := producerBatch.getLogGroupCount() + 1
if int64(producerBatch.totalDataSize) > logAccumulator.producerConfig.MaxBatchSize && producerBatch.totalDataSize < 5242880 && totalDataCount <= logAccumulator.producerConfig.MaxBatchCount {
producerBatch.addLogToLogGroup(log)
Expand All @@ -50,13 +50,13 @@ func (logAccumulator *LogAccumulator) addOrSendProducerBatch(key, project, logst
}
} else {
logAccumulator.innerSendToServer(key, producerBatch)
logAccumulator.createNewProducerBatch(log, callback, key, project, logstore, logTopic, logSource, shardHash, isMetricStore)
logAccumulator.createNewProducerBatch(log, callback, key, project, logstore, logTopic, logSource, shardHash)
}
}

// In this function,Naming with mlog is to avoid conflicts with the introduced kit/log package names.
func (logAccumulator *LogAccumulator) addLogToProducerBatch(project, logstore, shardHash, logTopic, logSource string,
logData interface{}, callback CallBack, isMetricStore bool) error {
logData interface{}, callback CallBack) error {
if logAccumulator.shutDownFlag.Load() {
level.Warn(logAccumulator.logger).Log("msg", "Producer has started and shut down and cannot write to new logs")
return errors.New("Producer has started and shut down and cannot write to new logs")
Expand All @@ -70,19 +70,19 @@ func (logAccumulator *LogAccumulator) addLogToProducerBatch(project, logstore, s
logSize := int64(GetLogSizeCalculate(mlog))
atomic.AddInt64(&producerBatch.totalDataSize, logSize)
atomic.AddInt64(&logAccumulator.producer.producerLogGroupSize, logSize)
logAccumulator.addOrSendProducerBatch(key, project, logstore, logTopic, logSource, shardHash, producerBatch, mlog, callback, isMetricStore)
logAccumulator.addOrSendProducerBatch(key, project, logstore, logTopic, logSource, shardHash, producerBatch, mlog, callback)
} else {
logAccumulator.createNewProducerBatch(mlog, callback, key, project, logstore, logTopic, logSource, shardHash, isMetricStore)
logAccumulator.createNewProducerBatch(mlog, callback, key, project, logstore, logTopic, logSource, shardHash)
}
} else if logList, ok := logData.([]*sls.Log); ok {
if producerBatch, ok := logAccumulator.logGroupData[key]; ok == true {
logListSize := int64(GetLogListSize(logList))
atomic.AddInt64(&producerBatch.totalDataSize, logListSize)
atomic.AddInt64(&logAccumulator.producer.producerLogGroupSize, logListSize)
logAccumulator.addOrSendProducerBatch(key, project, logstore, logTopic, logSource, shardHash, producerBatch, logList, callback, isMetricStore)
logAccumulator.addOrSendProducerBatch(key, project, logstore, logTopic, logSource, shardHash, producerBatch, logList, callback)

} else {
logAccumulator.createNewProducerBatch(logList, callback, key, project, logstore, logTopic, logSource, shardHash, isMetricStore)
logAccumulator.createNewProducerBatch(logList, callback, key, project, logstore, logTopic, logSource, shardHash)
}
} else {
level.Error(logAccumulator.logger).Log("msg", "Invalid logType")
Expand All @@ -92,14 +92,14 @@ func (logAccumulator *LogAccumulator) addLogToProducerBatch(project, logstore, s

}

func (logAccumulator *LogAccumulator) createNewProducerBatch(logType interface{}, callback CallBack, key, project, logstore, logTopic, logSource, shardHash string, isMetricStore bool) {
func (logAccumulator *LogAccumulator) createNewProducerBatch(logType interface{}, callback CallBack, key, project, logstore, logTopic, logSource, shardHash string) {
level.Debug(logAccumulator.logger).Log("msg", "Create a new ProducerBatch")

if mlog, ok := logType.(*sls.Log); ok {
newProducerBatch := initProducerBatch(mlog, callback, project, logstore, logTopic, logSource, shardHash, logAccumulator.producerConfig, isMetricStore)
newProducerBatch := initProducerBatch(mlog, callback, project, logstore, logTopic, logSource, shardHash, logAccumulator.producerConfig)
logAccumulator.logGroupData[key] = newProducerBatch
} else if logList, ok := logType.([]*sls.Log); ok {
newProducerBatch := initProducerBatch(logList, callback, project, logstore, logTopic, logSource, shardHash, logAccumulator.producerConfig, isMetricStore)
newProducerBatch := initProducerBatch(logList, callback, project, logstore, logTopic, logSource, shardHash, logAccumulator.producerConfig)
logAccumulator.logGroupData[key] = newProducerBatch
}
}
Expand Down
18 changes: 0 additions & 18 deletions producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,24 +170,6 @@ func (producer *Producer) SendLogList(project, logstore, topic, source string, l

}

func (producer *Producer) SendMetricStoreLog(project, logstore, topic, source string, log *sls.Log) error {
err := producer.waitTime()
if err != nil {
return err
}
return producer.logAccumulator.addLogToProducerBatch(project, logstore, "", topic, source, log, nil, true)
}

func (producer *Producer) SendMetricStoreLogList(project, logstore, topic, source string, logList []*sls.Log) (err error) {
err = producer.waitTime()
if err != nil {
return err
}

return producer.logAccumulator.addLogToProducerBatch(project, logstore, "", topic, source, logList, nil, true)

}

func (producer *Producer) SendMetricStoreLogWithCallBack(project, logstore, topic, source string, log *sls.Log, callback CallBack) error {
err := producer.waitTime()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions producer/producer_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func generatePackId(source string) string {
return ToMd5(srcData)[0:16]
}

func initProducerBatch(logData interface{}, callBackFunc CallBack, project, logstore, logTopic, logSource, shardHash string, config *ProducerConfig, isMetricStore bool) *ProducerBatch {
func initProducerBatch(logData interface{}, callBackFunc CallBack, project, logstore, logTopic, logSource, shardHash string, config *ProducerConfig) *ProducerBatch {
logs := []*sls.Log{}

if log, ok := logData.(*sls.Log); ok {
Expand Down Expand Up @@ -77,7 +77,7 @@ func initProducerBatch(logData interface{}, callBackFunc CallBack, project, logs
logstore: logstore,
result: initResult(),
maxReservedAttempts: config.MaxReservedAttempts,
isMetricstore: isMetricStore,
isMetricstore: config.MetricStore,
}
if shardHash == "" {
producerBatch.shardHash = nil
Expand Down
1 change: 1 addition & 0 deletions producer/producer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type ProducerConfig struct {
LogTags []*sls.LogTag
GeneratePackId bool
CredentialsProvider sls.CredentialsProvider
MetricStore bool

packLock sync.Mutex
packPrefix string
Expand Down

0 comments on commit cd4fc76

Please sign in to comment.