From bc0f39da120d4458df2e851a3e0d9a897ca1e5ee Mon Sep 17 00:00:00 2001 From: Crimson <39024757+crimson-gao@users.noreply.github.com> Date: Fri, 6 Dec 2024 14:28:42 +0800 Subject: [PATCH] feat: producer support multiple logstore pack id generate (#307) * feat: producer support multiple logstore pack id generate * chore: refine add benchmark --- producer/log_accumulator.go | 7 +++-- producer/pack_id.go | 57 +++++++++++++++++++++++++++++++++++++ producer/pack_id_test.go | 45 +++++++++++++++++++++++++++++ producer/producer_batch.go | 12 ++------ producer/producer_config.go | 5 ---- 5 files changed, 109 insertions(+), 17 deletions(-) create mode 100644 producer/pack_id.go create mode 100644 producer/pack_id_test.go diff --git a/producer/log_accumulator.go b/producer/log_accumulator.go index 8b19ee08..3ca7f299 100644 --- a/producer/log_accumulator.go +++ b/producer/log_accumulator.go @@ -21,6 +21,7 @@ type LogAccumulator struct { logger log.Logger threadPool *IoThreadPool producer *Producer + packIdGenrator *PackIdGenerator } func initLogAccumulator(config *ProducerConfig, ioWorker *IoWorker, logger log.Logger, threadPool *IoThreadPool, producer *Producer) *LogAccumulator { @@ -32,6 +33,7 @@ func initLogAccumulator(config *ProducerConfig, ioWorker *IoWorker, logger log.L logger: logger, threadPool: threadPool, producer: producer, + packIdGenrator: newPackIdGenerator(), } } @@ -96,10 +98,11 @@ func (logAccumulator *LogAccumulator) createNewProducerBatch(logType interface{} level.Debug(logAccumulator.logger).Log("msg", "Create a new ProducerBatch") if mlog, ok := logType.(*sls.Log); ok { - newProducerBatch := initProducerBatch(mlog, callback, project, logstore, logTopic, logSource, shardHash, logAccumulator.producerConfig) + + newProducerBatch := initProducerBatch(logAccumulator.packIdGenrator, mlog, callback, project, logstore, logTopic, logSource, shardHash, logAccumulator.producerConfig) logAccumulator.logGroupData[key] = newProducerBatch } else if logList, ok := 
logType.([]*sls.Log); ok { - newProducerBatch := initProducerBatch(logList, callback, project, logstore, logTopic, logSource, shardHash, logAccumulator.producerConfig) + newProducerBatch := initProducerBatch(logAccumulator.packIdGenrator, logList, callback, project, logstore, logTopic, logSource, shardHash, logAccumulator.producerConfig) logAccumulator.logGroupData[key] = newProducerBatch } } diff --git a/producer/pack_id.go b/producer/pack_id.go new file mode 100644 index 00000000..08c2f126 --- /dev/null +++ b/producer/pack_id.go @@ -0,0 +1,57 @@ +package producer + +import ( + "fmt" + "strings" + "sync" + "sync/atomic" + "time" +) + +type PackIdGenerator struct { + mutex sync.RWMutex + logstorePackIdGenerator map[string]*LogStorePackIdGenerator + count atomic.Int32 +} + +func newPackIdGenerator() *PackIdGenerator { + return &PackIdGenerator{ + logstorePackIdGenerator: make(map[string]*LogStorePackIdGenerator), + } +} + +func (g *PackIdGenerator) GeneratePackId(project, logstore string) string { + key := project + "|" + logstore + + // fast path, logstore already has a generator + g.mutex.RLock() + if l, ok := g.logstorePackIdGenerator[key]; ok { + packNumber := l.packNumber.Add(1) + g.mutex.RUnlock() + return fmt.Sprintf("%s%X", l.prefix, packNumber-1) + } + g.mutex.RUnlock() + + // slow path + g.mutex.Lock() + if _, ok := g.logstorePackIdGenerator[key]; !ok { + g.logstorePackIdGenerator[key] = newLogStorePackIdGenerator(g.count.Add(1)) + } + l := g.logstorePackIdGenerator[key] + packNumber := l.packNumber.Add(1) + g.mutex.Unlock() + return fmt.Sprintf("%s%X", l.prefix, packNumber-1) +} + +type LogStorePackIdGenerator struct { + packNumber atomic.Int64 + prefix string // with "-" +} + +func newLogStorePackIdGenerator(id int32) *LogStorePackIdGenerator { + hash := fmt.Sprintf("%d-%d", time.Now().UnixNano(), id) + return &LogStorePackIdGenerator{ + packNumber: atomic.Int64{}, + prefix: strings.ToUpper(generatePackId(hash)) + "-", + } +} diff --git 
a/producer/pack_id_test.go b/producer/pack_id_test.go new file mode 100644 index 00000000..f487a6ed --- /dev/null +++ b/producer/pack_id_test.go @@ -0,0 +1,45 @@ +package producer + +import ( + "fmt" + "sync" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestPackIdGenerator(t *testing.T) { + g := newPackIdGenerator() + wg := &sync.WaitGroup{} + m := 1000 + n := 10 + for i := 0; i < n; i++ { + wg.Add(1) + go func(i int) { + project := fmt.Sprintf("test%d", i) + logstore := fmt.Sprintf("test%d", i) + results := make([]string, 0, m) + for j := 0; j < m; j++ { + result := g.GeneratePackId(project, logstore) + results = append(results, result) + } + prefix := results[0][:16] + for j := 0; j < m; j++ { + assert.Equal(t, prefix, results[j][:16]) + suffix := results[j][17:] + assert.Equal(t, fmt.Sprintf("%X", j), suffix) + } + + wg.Done() + }(i) + } + wg.Wait() +} + +// BenchmarkPackIdGenerator-12 8366719 120.7 ns/op 64 B/op 4 allocs/op +func BenchmarkPackIdGenerator(b *testing.B) { + g := newPackIdGenerator() + for i := 0; i < b.N; i++ { + g.GeneratePackId("test", "test") + } +} diff --git a/producer/producer_batch.go b/producer/producer_batch.go index c9f94338..36a72632 100644 --- a/producer/producer_batch.go +++ b/producer/producer_batch.go @@ -1,8 +1,6 @@ package producer import ( - "fmt" - "strings" "sync" "time" @@ -36,7 +34,7 @@ func generatePackId(source string) string { return ToMd5(srcData)[0:16] } -func initProducerBatch(logData interface{}, callBackFunc CallBack, project, logstore, logTopic, logSource, shardHash string, config *ProducerConfig) *ProducerBatch { +func initProducerBatch(packIdGenerator *PackIdGenerator, logData interface{}, callBackFunc CallBack, project, logstore, logTopic, logSource, shardHash string, config *ProducerConfig) *ProducerBatch { logs := []*sls.Log{} if log, ok := logData.(*sls.Log); ok { @@ -52,17 +50,11 @@ func initProducerBatch(logData interface{}, callBackFunc CallBack, project, logs Source: 
proto.String(logSource), } if config.GeneratePackId { - config.packLock.Lock() - if config.packPrefix == "" { - config.packPrefix = strings.ToUpper(generatePackId(logSource)) + "-" - } - packStr := config.packPrefix + fmt.Sprintf("%X", config.packNumber) + packStr := packIdGenerator.GeneratePackId(project, logstore) logGroup.LogTags = append(logGroup.LogTags, &sls.LogTag{ Key: proto.String("__pack_id__"), Value: proto.String(packStr), }) - config.packNumber++ - config.packLock.Unlock() } currentTimeMs := GetTimeMs(time.Now().UnixNano()) producerBatch := &ProducerBatch{ diff --git a/producer/producer_config.go b/producer/producer_config.go index 706b4414..b3d00f37 100644 --- a/producer/producer_config.go +++ b/producer/producer_config.go @@ -2,7 +2,6 @@ package producer import ( "net/http" - "sync" "time" sls "github.com/aliyun/aliyun-log-go-sdk" @@ -40,10 +39,6 @@ type ProducerConfig struct { CredentialsProvider sls.CredentialsProvider UseMetricStoreURL bool - packLock sync.Mutex - packPrefix string - packNumber int64 - // Deprecated: use CredentialsProvider and UpdateFuncProviderAdapter instead. // // Example: