Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: producer support multiple logstore pack id generate #307

Merged
merged 2 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions producer/log_accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type LogAccumulator struct {
logger log.Logger
threadPool *IoThreadPool
producer *Producer
packIdGenrator *PackIdGenerator
}

func initLogAccumulator(config *ProducerConfig, ioWorker *IoWorker, logger log.Logger, threadPool *IoThreadPool, producer *Producer) *LogAccumulator {
Expand All @@ -32,6 +33,7 @@ func initLogAccumulator(config *ProducerConfig, ioWorker *IoWorker, logger log.L
logger: logger,
threadPool: threadPool,
producer: producer,
packIdGenrator: newPackIdGenerator(),
}
}

Expand Down Expand Up @@ -96,10 +98,11 @@ func (logAccumulator *LogAccumulator) createNewProducerBatch(logType interface{}
level.Debug(logAccumulator.logger).Log("msg", "Create a new ProducerBatch")

if mlog, ok := logType.(*sls.Log); ok {
newProducerBatch := initProducerBatch(mlog, callback, project, logstore, logTopic, logSource, shardHash, logAccumulator.producerConfig)

newProducerBatch := initProducerBatch(logAccumulator.packIdGenrator, mlog, callback, project, logstore, logTopic, logSource, shardHash, logAccumulator.producerConfig)
logAccumulator.logGroupData[key] = newProducerBatch
} else if logList, ok := logType.([]*sls.Log); ok {
newProducerBatch := initProducerBatch(logList, callback, project, logstore, logTopic, logSource, shardHash, logAccumulator.producerConfig)
newProducerBatch := initProducerBatch(logAccumulator.packIdGenrator, logList, callback, project, logstore, logTopic, logSource, shardHash, logAccumulator.producerConfig)
logAccumulator.logGroupData[key] = newProducerBatch
}
}
Expand Down
57 changes: 57 additions & 0 deletions producer/pack_id.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package producer

import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
)

// PackIdGenerator produces pack IDs and keeps one LogStorePackIdGenerator per
// "project|logstore" key.
type PackIdGenerator struct {
	// mutex guards access to logstorePackIdGenerator.
	mutex sync.RWMutex
	// logstorePackIdGenerator maps a "project|logstore" key to the generator
	// that owns the prefix and counter for that logstore.
	logstorePackIdGenerator map[string]*LogStorePackIdGenerator
	// count is incremented each time a new per-logstore generator is created;
	// the incremented value is handed to newLogStorePackIdGenerator.
	count atomic.Int32
}

// newPackIdGenerator returns a PackIdGenerator whose per-logstore map is
// allocated and empty; the mutex and counter start at their zero values.
func newPackIdGenerator() *PackIdGenerator {
	generator := new(PackIdGenerator)
	generator.logstorePackIdGenerator = map[string]*LogStorePackIdGenerator{}
	return generator
}

// GeneratePackId returns the next pack ID for the given project and logstore.
//
// The ID is the logstore's prefix followed by the upper-case hexadecimal form
// of a sequence number that starts at 0 and grows by one on every call for the
// same project/logstore pair. A generator for the pair is created the first
// time the pair is seen. The map is accessed only while holding the mutex and
// the counters are advanced atomically.
func (g *PackIdGenerator) GeneratePackId(project, logstore string) string {
	key := project + "|" + logstore

	var next int64

	// Fast path: the logstore already has a generator, a read lock is enough.
	g.mutex.RLock()
	gen, found := g.logstorePackIdGenerator[key]
	if found {
		next = gen.packNumber.Add(1)
	}
	g.mutex.RUnlock()

	if !found {
		// Slow path: take the write lock and look again, since another
		// goroutine may have registered the generator in the meantime.
		g.mutex.Lock()
		gen, found = g.logstorePackIdGenerator[key]
		if !found {
			gen = newLogStorePackIdGenerator(g.count.Add(1))
			g.logstorePackIdGenerator[key] = gen
		}
		next = gen.packNumber.Add(1)
		g.mutex.Unlock()
	}

	// Add returns the incremented value, so subtract one to get a zero-based
	// sequence number.
	return fmt.Sprintf("%s%X", gen.prefix, next-1)
}

// LogStorePackIdGenerator holds the pack ID state of a single logstore: a
// fixed prefix and a counter.
type LogStorePackIdGenerator struct {
	// packNumber counts the IDs handed out so far; it is advanced atomically
	// by PackIdGenerator.GeneratePackId.
	packNumber atomic.Int64
	// prefix is placed in front of every sequence number and already ends
	// with "-".
	prefix string // with "-"
}

// newLogStorePackIdGenerator builds the generator for one logstore.
//
// The prefix is derived by passing "<current UnixNano>-<id>" to
// generatePackId, upper-casing the result and appending "-". The counter is
// left at its zero value.
func newLogStorePackIdGenerator(id int32) *LogStorePackIdGenerator {
	seed := fmt.Sprintf("%d-%d", time.Now().UnixNano(), id)
	gen := &LogStorePackIdGenerator{
		prefix: strings.ToUpper(generatePackId(seed)) + "-",
	}
	return gen
}
45 changes: 45 additions & 0 deletions producer/pack_id_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package producer

import (
"fmt"
"sync"
"testing"

"github.com/stretchr/testify/assert"
)

// TestPackIdGenerator runs several goroutines against one shared generator,
// each using its own project/logstore pair, and checks that within a pair the
// first 16 characters stay constant and the part after the separator is the
// upper-case hex of 0, 1, 2, ... in call order.
func TestPackIdGenerator(t *testing.T) {
	const (
		workers      = 10
		idsPerWorker = 1000
		prefixLen    = 16 // characters compared as the prefix; index 16 is the separator
	)

	gen := newPackIdGenerator()

	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func(w int) {
			defer wg.Done()

			// Project and logstore share the same per-worker name.
			name := fmt.Sprintf("test%d", w)

			ids := make([]string, idsPerWorker)
			for k := range ids {
				ids[k] = gen.GeneratePackId(name, name)
			}

			wantPrefix := ids[0][:prefixLen]
			for k, id := range ids {
				assert.Equal(t, wantPrefix, id[:prefixLen])
				assert.Equal(t, fmt.Sprintf("%X", k), id[prefixLen+1:])
			}
		}(w)
	}
	wg.Wait()
}

// BenchmarkPackIdGenerator measures repeated GeneratePackId calls for a single
// project/logstore pair on one goroutine.
//
// Recorded result:
// BenchmarkPackIdGenerator-12    8366719    120.7 ns/op    64 B/op    4 allocs/op
func BenchmarkPackIdGenerator(b *testing.B) {
	const name = "test"
	gen := newPackIdGenerator()
	for remaining := b.N; remaining > 0; remaining-- {
		gen.GeneratePackId(name, name)
	}
}
12 changes: 2 additions & 10 deletions producer/producer_batch.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package producer

import (
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -36,7 +34,7 @@ func generatePackId(source string) string {
return ToMd5(srcData)[0:16]
}

func initProducerBatch(logData interface{}, callBackFunc CallBack, project, logstore, logTopic, logSource, shardHash string, config *ProducerConfig) *ProducerBatch {
func initProducerBatch(packIdGenerator *PackIdGenerator, logData interface{}, callBackFunc CallBack, project, logstore, logTopic, logSource, shardHash string, config *ProducerConfig) *ProducerBatch {
logs := []*sls.Log{}

if log, ok := logData.(*sls.Log); ok {
Expand All @@ -52,17 +50,11 @@ func initProducerBatch(logData interface{}, callBackFunc CallBack, project, logs
Source: proto.String(logSource),
}
if config.GeneratePackId {
config.packLock.Lock()
if config.packPrefix == "" {
config.packPrefix = strings.ToUpper(generatePackId(logSource)) + "-"
}
packStr := config.packPrefix + fmt.Sprintf("%X", config.packNumber)
packStr := packIdGenerator.GeneratePackId(project, logstore)
logGroup.LogTags = append(logGroup.LogTags, &sls.LogTag{
Key: proto.String("__pack_id__"),
Value: proto.String(packStr),
})
config.packNumber++
config.packLock.Unlock()
}
currentTimeMs := GetTimeMs(time.Now().UnixNano())
producerBatch := &ProducerBatch{
Expand Down
5 changes: 0 additions & 5 deletions producer/producer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package producer

import (
"net/http"
"sync"
"time"

sls "github.com/aliyun/aliyun-log-go-sdk"
Expand Down Expand Up @@ -40,10 +39,6 @@ type ProducerConfig struct {
CredentialsProvider sls.CredentialsProvider
UseMetricStoreURL bool

packLock sync.Mutex
packPrefix string
packNumber int64

// Deprecated: use CredentialsProvider and UpdateFuncProviderAdapter instead.
//
// Example:
Expand Down
Loading