From 76c6cc0e1e470e6fac7ddb1273ad45fe2feeb6f3 Mon Sep 17 00:00:00 2001 From: Jaewook Lee Date: Sat, 6 Jul 2024 16:48:54 +0900 Subject: [PATCH 1/4] Chore: now TopicStreamer must have consumer group id --- streamer.go | 23 +++++++++++++++++------ streamer_test.go | 31 ++++++++++++++++--------------- 2 files changed, 33 insertions(+), 21 deletions(-) diff --git a/streamer.go b/streamer.go index 9084eaf..ceaf84d 100644 --- a/streamer.go +++ b/streamer.go @@ -9,22 +9,24 @@ import ( "sync" ) +// TopicStreamer is a streamer that streams messages from a topic to other topics. type TopicStreamer struct { topic common.Topic configs []StreamConfig cancel context.CancelFunc mutex *sync.Mutex + groupId string consumer *internal.StreamConsumer } // NewTopicStreamer creates a new topic streamer that streams messages from a topic to other topics. -// The streamer is configured with a list of brokers and a topic to stream from. +// The streamer is configured with a list of brokers, a topic to stream from and a consumer group id . // If you want to override the default configuration of the sarama consumer and producer, you can pass additional arguments. -// - ts := NewTopicStreamer(brokers, topic) -// - ts := NewTopicStreamer(brokers, topic, consumerConfig, producerConfig) -// - ts := NewTopicStreamer(brokers, topic, nil, producerConfig) -func NewTopicStreamer(brokers []string, topic common.Topic, args ...interface{}) *TopicStreamer { +// - ts := NewTopicStreamer(brokers, topic, groupId) +// - ts := NewTopicStreamer(brokers, topic, groupId, consumerConfig, producerConfig) +// - ts := NewTopicStreamer(brokers, topic, groupId, nil, producerConfig) +func NewTopicStreamer(brokers []string, topic common.Topic, groupId string, args ...interface{}) *TopicStreamer { var ccfg *sarama.Config var pcfg *sarama.Config @@ -44,9 +46,13 @@ func NewTopicStreamer(brokers []string, topic common.Topic, args ...interface{}) panic("Invalid number of arguments") } + if groupId == "" { + groupId = "queue-streamer-default-group" + } + consumer := internal.NewStreamConsumer( topic, - "groupId", + groupId, brokers, ccfg, pcfg, @@ -58,6 +64,7 @@ func NewTopicStreamer(brokers []string, topic common.Topic, args ...interface{}) cancel: nil, consumer: consumer, mutex: &sync.Mutex{}, + groupId: groupId, } } @@ -77,6 +84,10 @@ func (ts *TopicStreamer) AddConfig(config StreamConfig) { ts.configs = append(ts.configs, config) } +func (ts *TopicStreamer) GroupId() string { + return ts.groupId +} + func (ts *TopicStreamer) Run() { dests := make([]common.Topic, 0) mss := make([]common.MessageSerializer, 0) diff --git a/streamer_test.go b/streamer_test.go index 41e8b77..611dc85 100644 --- a/streamer_test.go +++ b/streamer_test.go @@ -9,7 +9,8 @@ import ( "time" ) -var brokers = []string{"localhost:9093"} +var brokers = []string{"kafka.vp-datacenter-1.violetpay.net:9092", "kafka.vp-datacenter-1.violetpay.net:9093", "kafka.vp-datacenter-1.violetpay.net:9094"} + var topic = qstreamer.Topic("test", 1) func TestTopicStreamer_NewTopicStreamer(t *testing.T) { @@ -24,18 +25,18 @@ func TestTopicStreamer_NewTopicStreamer(t *testing.T) { streamer = nil }) - streamer = qstreamer.NewTopicStreamer(brokers, topic) + streamer = qstreamer.NewTopicStreamer(brokers, topic, "") assert.NotNil(t, streamer) }) t.Run("NewTopicStreamer with additional arguments", func(t *testing.T) { t.Run("Invalid number of arguments", func(t *testing.T) { assert.Panics(t, func() { - qstreamer.NewTopicStreamer(brokers, topic, nil) + qstreamer.NewTopicStreamer(brokers, topic, "", nil) }) assert.Panics(t, func() { - qstreamer.NewTopicStreamer(brokers, topic, nil, nil, nil) + qstreamer.NewTopicStreamer(brokers, topic, "", nil, nil, nil) }) }) @@ -45,7 +46,7 @@ func TestTopicStreamer_NewTopicStreamer(t *testing.T) { }) config := sarama.NewConfig() - streamer = qstreamer.NewTopicStreamer(brokers, topic, config, nil) + streamer = qstreamer.NewTopicStreamer(brokers, topic, "", config, nil) assert.NotNil(t, streamer) }) @@ -55,7 +56,7 @@ func TestTopicStreamer_NewTopicStreamer(t *testing.T) { }) config := sarama.NewConfig() - streamer = qstreamer.NewTopicStreamer(brokers, topic, nil, config) + streamer = qstreamer.NewTopicStreamer(brokers, topic, "", nil, config) assert.NotNil(t, streamer) }) @@ -66,14 +67,14 @@ func TestTopicStreamer_NewTopicStreamer(t *testing.T) { consumerConfig := sarama.NewConfig() producerConfig := sarama.NewConfig() - streamer = qstreamer.NewTopicStreamer(brokers, topic, consumerConfig, producerConfig) + streamer = qstreamer.NewTopicStreamer(brokers, topic, "", consumerConfig, producerConfig) assert.NotNil(t, streamer) }) }) } func TestTopicStreamer_Getters(t *testing.T) { - streamer := qstreamer.NewTopicStreamer(brokers, topic) + streamer := qstreamer.NewTopicStreamer(brokers, topic, "") t.Run("Topic", func(t *testing.T) { assert.Equal(t, topic, streamer.Topic()) @@ -89,7 +90,7 @@ func TestTopicStreamer_Getters(t *testing.T) { } func TestTopicStreamer_AddConfig(t *testing.T) { - streamer := qstreamer.NewTopicStreamer(brokers, topic) + streamer := qstreamer.NewTopicStreamer(brokers, topic, "") t.Run("AddConfig", func(t *testing.T) { config := qstreamer.NewStreamConfig(qstreamer.NewPassThroughSerializer(), topic) @@ -120,7 +121,7 @@ func TestTopicStreamer_Run(t *testing.T) { streamer = nil }) - streamer = qstreamer.NewTopicStreamer(brokers, topic) + streamer = qstreamer.NewTopicStreamer(brokers, topic, "") config := qstreamer.NewStreamConfig(qstreamer.NewPassThroughSerializer(), topic) streamer.AddConfig(config) @@ -136,7 +137,7 @@ func TestTopicStreamer_Run(t *testing.T) { assert.NotNil(t, err) streamer = nil }) - streamer = qstreamer.NewTopicStreamer(brokers, topic) + streamer = qstreamer.NewTopicStreamer(brokers, topic, "") assert.Panics(t, streamer.Run) }) @@ -148,7 +149,7 @@ func TestTopicStreamer_Run(t *testing.T) { streamer = nil }) - streamer = qstreamer.NewTopicStreamer(brokers, topic) + streamer = qstreamer.NewTopicStreamer(brokers, topic, "") config := qstreamer.NewStreamConfig(nil, topic) streamer.AddConfig(config) @@ -162,7 +163,7 @@ func TestTopicStreamer_Run(t *testing.T) { streamer = nil }) - streamer = qstreamer.NewTopicStreamer(brokers, topic) + streamer = qstreamer.NewTopicStreamer(brokers, topic, "") config := qstreamer.NewStreamConfig(qstreamer.NewPassThroughSerializer(), common.Topic{}) streamer.AddConfig(config) @@ -176,7 +177,7 @@ func TestTopicStreamer_Run(t *testing.T) { streamer = nil }) - streamer = qstreamer.NewTopicStreamer(brokers, topic) + streamer = qstreamer.NewTopicStreamer(brokers, topic, "") config := qstreamer.NewStreamConfig(qstreamer.NewPassThroughSerializer(), common.Topic{Name: "test1"}) streamer.AddConfig(config) @@ -190,7 +191,7 @@ func TestTopicStreamer_Run(t *testing.T) { streamer = nil }) - streamer = qstreamer.NewTopicStreamer(brokers, topic) + streamer = qstreamer.NewTopicStreamer(brokers, topic, "") config := qstreamer.NewStreamConfig(qstreamer.NewPassThroughSerializer(), common.Topic{Partition: 1}) streamer.AddConfig(config) From d087ab2f9c8a9a8debed701f7d09f06cdb764b20 Mon Sep 17 00:00:00 2001 From: Jaewook Lee Date: Sat, 6 Jul 2024 16:50:36 +0900 Subject: [PATCH 2/4] Feat: add DirectStreamer, streams message from a topic to a topic, embeddings TopicStreamer --- direct.go | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 direct.go diff --git a/direct.go b/direct.go new file mode 100644 index 0000000..458c792 --- /dev/null +++ b/direct.go @@ -0,0 +1,52 @@ +package qstreamer + +import ( + "github.com/violetpay-org/queue-streamer/common" + "github.com/violetpay-org/queue-streamer/internal" +) + +// DirectStreamer is a streamer that streams messages from a topic to a topic. +type DirectStreamer struct { + ts TopicStreamer +} + +// NewDirectStreamer creates a new topic streamer that streams messages from a topic to a topic. +// The streamer is configured with a list of brokers, a topic to stream from and a consumer group id . +// If you want to override the default configuration of the sarama consumer and producer, you can pass additional arguments. +// - ds := NewDirectStreamer(brokers, topic, groupId) +// - ds := NewDirectStreamer(brokers, topic, groupId, consumerConfig, producerConfig) +// - ds := NewDirectStreamer(brokers, topic, groupId, nil, producerConfig) +func NewDirectStreamer(brokers []string, src common.Topic, groupId string, args ...interface{}) *DirectStreamer { + ts := NewTopicStreamer(brokers, src, groupId, args...) + return &DirectStreamer{ + ts: *ts, + } +} + +func (ds *DirectStreamer) Config() StreamConfig { + return ds.ts.configs[0] +} + +func (ds *DirectStreamer) SetConfig(config StreamConfig) { + ds.ts.configs[0] = config +} + +func (ds *DirectStreamer) Topic() common.Topic { + return ds.ts.Topic() +} + +func (ds *DirectStreamer) Consumer() *internal.StreamConsumer { + return ds.ts.Consumer() +} + +func (ds *DirectStreamer) GroupId() string { + return ds.ts.GroupId() +} + +func (ds *DirectStreamer) Run() { + ds.ts.Run() +} + +func (ds *DirectStreamer) Stop() error { + return ds.ts.Stop() +} From 9d2df2daa8e283e2359c488c207aa7b30a5d9132 Mon Sep 17 00:00:00 2001 From: Jaewook Lee Date: Sat, 6 Jul 2024 17:49:22 +0900 Subject: [PATCH 3/4] Fix: DirectStreamer.Config() occurs index out of range --- direct.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/direct.go b/direct.go index 458c792..ea23fe9 100644 --- a/direct.go +++ b/direct.go @@ -23,12 +23,20 @@ func NewDirectStreamer(brokers []string, src common.Topic, groupId string, args } } -func (ds *DirectStreamer) Config() StreamConfig { - return ds.ts.configs[0] +func (ds *DirectStreamer) Config() (bool, StreamConfig) { + if len(ds.ts.configs) == 0 { + return false, StreamConfig{} + } + + return true, ds.ts.configs[0] } func (ds *DirectStreamer) SetConfig(config StreamConfig) { - ds.ts.configs[0] = config + if len(ds.ts.configs) == 0 { + ds.ts.configs = append(ds.ts.configs, config) + } else { + ds.ts.configs[0] = config + } } func (ds *DirectStreamer) Topic() common.Topic { From daa0ebb447fe4f1d24a289defc7577635622db1b Mon Sep 17 00:00:00 2001 From: Jaewook Lee Date: Sat, 6 Jul 2024 17:50:22 +0900 Subject: [PATCH 4/4] Test: DirectStreamer test --- direct_test.go | 147 +++++++++++++++++++++++++++++++++++++++++ internal/utils_test.go | 2 - 2 files changed, 147 insertions(+), 2 deletions(-) create mode 100644 direct_test.go diff --git a/direct_test.go b/direct_test.go new file mode 100644 index 0000000..38c54bb --- /dev/null +++ b/direct_test.go @@ -0,0 +1,147 @@ +package qstreamer_test + +import ( + "github.com/stretchr/testify/assert" + qstreamer "github.com/violetpay-org/queue-streamer" + "testing" + "time" +) + +var dbrokers = []string{"localhost:9093"} +var dTopic = qstreamer.Topic("test", 1) +var dTopic2 = qstreamer.Topic("test2", 2) + +func TestNewDirectStreamer(t *testing.T) { + var ds *qstreamer.DirectStreamer + + t.Cleanup(func() { + ds = nil + }) + + t.Run("NewDirectStreamer", func(t *testing.T) { + ds = qstreamer.NewDirectStreamer(dbrokers, dTopic, "") + assert.NotNil(t, ds) + }) +} + +func TestDirectStreamer_Config(t *testing.T) { + var ds *qstreamer.DirectStreamer + + t.Cleanup(func() { + ds = nil + }) + + t.Run("No Config set", func(t *testing.T) { + t.Cleanup(func() { + ds = nil + }) + + ds = qstreamer.NewDirectStreamer(dbrokers, dTopic, "") + ok, _ := ds.Config() + + assert.False(t, ok) + }) + + t.Run("Set Config", func(t *testing.T) { + t.Cleanup(func() { + ds = nil + }) + + ms := qstreamer.NewPassThroughSerializer() + ds = qstreamer.NewDirectStreamer(dbrokers, dTopic, "") + cfg := qstreamer.NewStreamConfig(ms, dTopic) + ds.SetConfig(cfg) + + ok, cfg := ds.Config() + assert.True(t, ok) + + assert.Equal(t, dTopic, cfg.Topic()) + assert.Equal(t, ms, cfg.MessageSerializer()) + }) + + t.Run("Set Config multiple times", func(t *testing.T) { + t.Cleanup(func() { + ds = nil + }) + + ds = qstreamer.NewDirectStreamer(dbrokers, dTopic, "") + + ms1 := qstreamer.NewPassThroughSerializer() + ms2 := qstreamer.NewPassThroughSerializer() + + cfg1 := qstreamer.NewStreamConfig(ms1, dTopic) + cfg2 := qstreamer.NewStreamConfig(ms2, dTopic2) + + ds.SetConfig(cfg1) + ds.SetConfig(cfg2) + + ok, dsCfg := ds.Config() + assert.True(t, ok) + assert.Equal(t, dTopic2, dsCfg.Topic()) + assert.Equal(t, ms2, dsCfg.MessageSerializer()) + }) +} + +func TestDirectStreamer_Consumer(t *testing.T) { + var ds *qstreamer.DirectStreamer + + t.Cleanup(func() { + ds = nil + }) + + t.Run("Consumer", func(t *testing.T) { + ds = qstreamer.NewDirectStreamer(dbrokers, dTopic, "") + assert.NotNil(t, ds.Consumer()) + }) +} + +func TestDirectStreamer_Topic(t *testing.T) { + var ds *qstreamer.DirectStreamer + + t.Cleanup(func() { + ds = nil + }) + + t.Run("Topic", func(t *testing.T) { + ds = qstreamer.NewDirectStreamer(dbrokers, dTopic, "") + assert.Equal(t, dTopic, ds.Topic()) + }) +} + +func TestDirectStreamer_GroupId(t *testing.T) { + var ds *qstreamer.DirectStreamer + + t.Cleanup(func() { + ds = nil + }) + + t.Run("GroupID", func(t *testing.T) { + ds = qstreamer.NewDirectStreamer(dbrokers, dTopic, "testGroupId") + assert.Equal(t, "testGroupId", ds.GroupId()) + }) +} + +func TestDirectStreamer_Run(t *testing.T) { + var ds *qstreamer.DirectStreamer + + t.Cleanup(func() { + ds = nil + }) + + t.Run("Run", func(t *testing.T) { + t.Cleanup(func() { + err := ds.Stop() + assert.Nil(t, err) + ds = nil + }) + + ds = qstreamer.NewDirectStreamer(dbrokers, dTopic, "") + cfg := qstreamer.NewStreamConfig(qstreamer.NewPassThroughSerializer(), dTopic) + ds.SetConfig(cfg) + + assert.NotPanics(t, func() { + go ds.Run() + time.Sleep(1 * time.Second) + }) + }) +} diff --git a/internal/utils_test.go b/internal/utils_test.go index 1f9b923..6e428b8 100644 --- a/internal/utils_test.go +++ b/internal/utils_test.go @@ -7,8 +7,6 @@ import ( ) func TestCopy(t *testing.T) { - t.Parallel() - t.Run("Copy pointer", func(t *testing.T) { type TestStruct struct { Name string