diff --git a/client_store.go b/client_store.go index 66dc7842..e2891162 100644 --- a/client_store.go +++ b/client_store.go @@ -105,7 +105,8 @@ func (c *Client) PostLogStoreLogs(project, logstore string, lg *LogGroup, hashKe func (c *Client) PostMetricStoreLogs(project, logstore string, lg *LogGroup) (err error) { ls := convertLogstore(c, project, logstore) - return ls.PostMetricStoreLogs(lg) + ls.MetricStore = true + return ls.PutLogs(lg) } // PostRawLogWithCompressType put raw log data to log service, no marshal diff --git a/log_store.go b/log_store.go index 553b60bc..401d69e9 100644 --- a/log_store.go +++ b/log_store.go @@ -40,6 +40,7 @@ type LogStore struct { putLogCompressType int EncryptConf *EncryptConf `json:"encrypt_conf,omitempty"` ProductType string `json:"productType,omitempty"` + MetricStore bool `json:"metric_store"` } // Shard defines shard struct @@ -303,8 +304,12 @@ func (s *LogStore) PutLogs(lg *LogGroup) (err error) { } outLen = len(out) } - - uri := fmt.Sprintf("/logstores/%v", s.Name) + var uri string + if s.MetricStore { + uri = s.getMetricStoreURL() + } else { + uri = fmt.Sprintf("/logstores/%v", s.Name) + } r, err := request(s.project, "POST", uri, h, out[:outLen]) if err != nil { return NewClientError(err) @@ -321,67 +326,8 @@ func (s *LogStore) PutLogs(lg *LogGroup) (err error) { return nil } -// PostMetricStoreLogs put logs into metricstore -// The callers should transform user logs into LogGroup. -func (s *LogStore) PostMetricStoreLogs(lg *LogGroup) (err error) { - if len(lg.Logs) == 0 { - // empty log group - return nil - } - - body, err := proto.Marshal(lg) - if err != nil { - return NewClientError(err) - } - - var out []byte - var h map[string]string - var outLen int - switch s.putLogCompressType { - case Compress_LZ4: - // Compresse body with lz4 - out = make([]byte, lz4.CompressBlockBound(len(body))) - var hashTable [1 << 16]int - n, err := lz4.CompressBlock(body, out, hashTable[:]) - if err != nil { - return NewClientError(err) - } - // copy incompressible data as lz4 format - if n == 0 { - n, _ = copyIncompressible(body, out) - } - - h = map[string]string{ - "x-log-compresstype": "lz4", - "x-log-bodyrawsize": strconv.Itoa(len(body)), - "Content-Type": "application/x-protobuf", - } - outLen = n - break - case Compress_None: - // no compress - out = body - h = map[string]string{ - "x-log-bodyrawsize": strconv.Itoa(len(body)), - "Content-Type": "application/x-protobuf", - } - outLen = len(out) - } - uri := fmt.Sprintf("/prometheus/%s/%s/api/v1/write", s.project.Name, s.Name) - r, err := request(s.project, "POST", uri, h, out[:outLen]) - if err != nil { - return NewClientError(err) - } - defer r.Body.Close() - body, _ = ioutil.ReadAll(r.Body) - if r.StatusCode != http.StatusOK { - err := new(Error) - if jErr := json.Unmarshal(body, err); jErr != nil { - return NewBadResponseError(string(body), r.Header, r.StatusCode) - } - return err - } - return nil +func (s *LogStore) getMetricStoreURL() string { + return fmt.Sprintf("/prometheus/%s/%s/api/v1/write", s.project.Name, s.Name) } // PostLogStoreLogs put logs into Shard logstore by hashKey. @@ -392,7 +338,7 @@ func (s *LogStore) PostLogStoreLogs(lg *LogGroup, hashKey *string) (err error) { return nil } - if hashKey == nil || *hashKey == "" { + if hashKey == nil || *hashKey == "" || s.MetricStore { // empty hash call PutLogs return s.PutLogs(lg) } diff --git a/producer/log_accumulator.go b/producer/log_accumulator.go index 205852cd..8b19ee08 100644 --- a/producer/log_accumulator.go +++ b/producer/log_accumulator.go @@ -35,7 +35,7 @@ func initLogAccumulator(config *ProducerConfig, ioWorker *IoWorker, logger log.L } } -func (logAccumulator *LogAccumulator) addOrSendProducerBatch(key, project, logstore, logTopic, logSource, shardHash string, producerBatch *ProducerBatch, log interface{}, callback CallBack, isMetricStore bool) { +func (logAccumulator *LogAccumulator) addOrSendProducerBatch(key, project, logstore, logTopic, logSource, shardHash string, producerBatch *ProducerBatch, log interface{}, callback CallBack) { totalDataCount := producerBatch.getLogGroupCount() + 1 if int64(producerBatch.totalDataSize) > logAccumulator.producerConfig.MaxBatchSize && producerBatch.totalDataSize < 5242880 && totalDataCount <= logAccumulator.producerConfig.MaxBatchCount { producerBatch.addLogToLogGroup(log) @@ -50,13 +50,13 @@ func (logAccumulator *LogAccumulator) addOrSendProducerBatch(key, project, logst } } else { logAccumulator.innerSendToServer(key, producerBatch) - logAccumulator.createNewProducerBatch(log, callback, key, project, logstore, logTopic, logSource, shardHash, isMetricStore) + logAccumulator.createNewProducerBatch(log, callback, key, project, logstore, logTopic, logSource, shardHash) } } // In this function,Naming with mlog is to avoid conflicts with the introduced kit/log package names. func (logAccumulator *LogAccumulator) addLogToProducerBatch(project, logstore, shardHash, logTopic, logSource string, - logData interface{}, callback CallBack, isMetricStore bool) error { + logData interface{}, callback CallBack) error { if logAccumulator.shutDownFlag.Load() { level.Warn(logAccumulator.logger).Log("msg", "Producer has started and shut down and cannot write to new logs") return errors.New("Producer has started and shut down and cannot write to new logs") @@ -70,19 +70,19 @@ func (logAccumulator *LogAccumulator) addLogToProducerBatch(project, logstore, s logSize := int64(GetLogSizeCalculate(mlog)) atomic.AddInt64(&producerBatch.totalDataSize, logSize) atomic.AddInt64(&logAccumulator.producer.producerLogGroupSize, logSize) - logAccumulator.addOrSendProducerBatch(key, project, logstore, logTopic, logSource, shardHash, producerBatch, mlog, callback, isMetricStore) + logAccumulator.addOrSendProducerBatch(key, project, logstore, logTopic, logSource, shardHash, producerBatch, mlog, callback) } else { - logAccumulator.createNewProducerBatch(mlog, callback, key, project, logstore, logTopic, logSource, shardHash, isMetricStore) + logAccumulator.createNewProducerBatch(mlog, callback, key, project, logstore, logTopic, logSource, shardHash) } } else if logList, ok := logData.([]*sls.Log); ok { if producerBatch, ok := logAccumulator.logGroupData[key]; ok == true { logListSize := int64(GetLogListSize(logList)) atomic.AddInt64(&producerBatch.totalDataSize, logListSize) atomic.AddInt64(&logAccumulator.producer.producerLogGroupSize, logListSize) - logAccumulator.addOrSendProducerBatch(key, project, logstore, logTopic, logSource, shardHash, producerBatch, logList, callback, isMetricStore) + logAccumulator.addOrSendProducerBatch(key, project, logstore, logTopic, logSource, shardHash, producerBatch, logList, callback) } else { - logAccumulator.createNewProducerBatch(logList, callback, key, project, logstore, logTopic, logSource, shardHash, isMetricStore) + logAccumulator.createNewProducerBatch(logList, callback, key, project, logstore, logTopic, logSource, shardHash) } } else { level.Error(logAccumulator.logger).Log("msg", "Invalid logType") @@ -92,14 +92,14 @@ func (logAccumulator *LogAccumulator) addLogToProducerBatch(project, logstore, s } -func (logAccumulator *LogAccumulator) createNewProducerBatch(logType interface{}, callback CallBack, key, project, logstore, logTopic, logSource, shardHash string, isMetricStore bool) { +func (logAccumulator *LogAccumulator) createNewProducerBatch(logType interface{}, callback CallBack, key, project, logstore, logTopic, logSource, shardHash string) { level.Debug(logAccumulator.logger).Log("msg", "Create a new ProducerBatch") if mlog, ok := logType.(*sls.Log); ok { - newProducerBatch := initProducerBatch(mlog, callback, project, logstore, logTopic, logSource, shardHash, logAccumulator.producerConfig, isMetricStore) + newProducerBatch := initProducerBatch(mlog, callback, project, logstore, logTopic, logSource, shardHash, logAccumulator.producerConfig) logAccumulator.logGroupData[key] = newProducerBatch } else if logList, ok := logType.([]*sls.Log); ok { - newProducerBatch := initProducerBatch(logList, callback, project, logstore, logTopic, logSource, shardHash, logAccumulator.producerConfig, isMetricStore) + newProducerBatch := initProducerBatch(logList, callback, project, logstore, logTopic, logSource, shardHash, logAccumulator.producerConfig) logAccumulator.logGroupData[key] = newProducerBatch } } diff --git a/producer/producer.go b/producer/producer.go index 7901b432..5a8e5e55 100644 --- a/producer/producer.go +++ b/producer/producer.go @@ -170,24 +170,6 @@ func (producer *Producer) SendLogList(project, logstore, topic, source string, l } -func (producer *Producer) SendMetricStoreLog(project, logstore, topic, source string, log *sls.Log) error { - err := producer.waitTime() - if err != nil { - return err - } - return producer.logAccumulator.addLogToProducerBatch(project, logstore, "", topic, source, log, nil, true) -} - -func (producer *Producer) SendMetricStoreLogList(project, logstore, topic, source string, logList []*sls.Log) (err error) { - err = producer.waitTime() - if err != nil { - return err - } - - return producer.logAccumulator.addLogToProducerBatch(project, logstore, "", topic, source, logList, nil, true) - -} - func (producer *Producer) SendMetricStoreLogWithCallBack(project, logstore, topic, source string, log *sls.Log, callback CallBack) error { err := producer.waitTime() if err != nil { diff --git a/producer/producer_batch.go b/producer/producer_batch.go index ed661cd5..77f73c9f 100644 --- a/producer/producer_batch.go +++ b/producer/producer_batch.go @@ -36,7 +36,7 @@ func generatePackId(source string) string { return ToMd5(srcData)[0:16] } -func initProducerBatch(logData interface{}, callBackFunc CallBack, project, logstore, logTopic, logSource, shardHash string, config *ProducerConfig, isMetricStore bool) *ProducerBatch { +func initProducerBatch(logData interface{}, callBackFunc CallBack, project, logstore, logTopic, logSource, shardHash string, config *ProducerConfig) *ProducerBatch { logs := []*sls.Log{} if log, ok := logData.(*sls.Log); ok { @@ -77,7 +77,7 @@ func initProducerBatch(logData interface{}, callBackFunc CallBack, project, logs logstore: logstore, result: initResult(), maxReservedAttempts: config.MaxReservedAttempts, - isMetricstore: isMetricStore, + isMetricstore: config.MetricStore, } if shardHash == "" { producerBatch.shardHash = nil diff --git a/producer/producer_config.go b/producer/producer_config.go index ead72459..5e17cf25 100644 --- a/producer/producer_config.go +++ b/producer/producer_config.go @@ -38,6 +38,7 @@ type ProducerConfig struct { LogTags []*sls.LogTag GeneratePackId bool CredentialsProvider sls.CredentialsProvider + MetricStore bool packLock sync.Mutex packPrefix string