use unique topic names per test + fix consumer test
rarguelloF committed Jan 20, 2025
1 parent a33040c commit f7d2f03
Showing 4 changed files with 87 additions and 41 deletions.
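Note on the helpers used below: the new tests call topicName(t) to get a Kafka topic unique to each test, but the file defining that helper is one of the changed files whose hunks are not shown here. Purely as an illustrative sketch (an assumption, not the commit's actual implementation), such a helper could derive a Kafka-safe name from the test's own name:

// Hypothetical sketch only — the real topicName helper from this commit is
// defined in a changed file whose hunks are not shown below.
package sarama

import (
	"strings"
	"testing"
)

// topicName returns a topic name unique to the calling test, so repeated or
// parallel runs do not consume each other's messages.
func topicName(t *testing.T) string {
	t.Helper()
	// t.Name() can contain "/" (from subtests) or spaces; Kafka topic names
	// only allow letters, digits, '.', '_' and '-', so replace the rest.
	r := strings.NewReplacer("/", "_", " ", "_")
	return "gotest_ibm_sarama_" + r.Replace(t.Name())
}

Tying the topic to t.Name() is what makes "unique topic names per test" work: each test reads only the messages it produced itself, instead of sharing a topic such as my_topic or test-topic with other tests.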
23 changes: 13 additions & 10 deletions contrib/IBM/sarama.v1/consumer_group_test.go
@@ -21,11 +21,13 @@ import (

func TestWrapConsumerGroupHandler(t *testing.T) {
cfg := newIntegrationTestConfig(t)
topic := topicName(t)
groupID := "IBM/sarama/TestWrapConsumerGroupHandler"

mt := mocktracer.Start()
defer mt.Stop()

cg, err := sarama.NewConsumerGroup(kafkaBrokers, testGroupID, cfg)
cg, err := sarama.NewConsumerGroup(kafkaBrokers, groupID, cfg)
require.NoError(t, err)
defer func() {
assert.NoError(t, cg.Close())
@@ -36,7 +38,7 @@ func TestWrapConsumerGroupHandler(t *testing.T) {
ready: make(chan bool),
rcvMessages: make(chan *sarama.ConsumerMessage, 1),
}
tracedHandler := WrapConsumerGroupHandler(handler, WithDataStreams(), WithGroupID(testGroupID))
tracedHandler := WrapConsumerGroupHandler(handler, WithDataStreams(), WithGroupID(groupID))

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -49,7 +51,7 @@ func TestWrapConsumerGroupHandler(t *testing.T) {
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims
if err := cg.Consume(ctx, []string{testTopic}, tracedHandler); err != nil {
if err := cg.Consume(ctx, []string{topic}, tracedHandler); err != nil {
assert.ErrorIs(t, err, sarama.ErrClosedConsumerGroup)
return
}
@@ -64,13 +66,14 @@ func TestWrapConsumerGroupHandler(t *testing.T) {
log.Println("Sarama consumer up and running!...")

p, err := sarama.NewSyncProducer(kafkaBrokers, cfg)
require.NoError(t, err)

require.NoError(t, err)
p = WrapSyncProducer(cfg, p, WithDataStreams())
defer func() {
assert.NoError(t, p.Close())
}()

produceMsg := &sarama.ProducerMessage{
Topic: testTopic,
Topic: topic,
Value: sarama.StringEncoder("test 1"),
Metadata: "test",
}
@@ -88,28 +91,28 @@ func TestWrapConsumerGroupHandler(t *testing.T) {
s0 := spans[0]
assert.Equal(t, "kafka", s0.Tag(ext.ServiceName))
assert.Equal(t, "queue", s0.Tag(ext.SpanType))
assert.Equal(t, "Produce Topic gotest_ibm_sarama", s0.Tag(ext.ResourceName))
assert.Equal(t, "Produce Topic "+topic, s0.Tag(ext.ResourceName))
assert.Equal(t, "kafka.produce", s0.OperationName())
assert.Equal(t, int32(0), s0.Tag(ext.MessagingKafkaPartition))
assert.NotNil(t, s0.Tag("offset"))
assert.Equal(t, "IBM/sarama", s0.Tag(ext.Component))
assert.Equal(t, ext.SpanKindProducer, s0.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s0.Tag(ext.MessagingSystem))

assertDSMProducerPathway(t, testTopic, produceMsg)
assertDSMProducerPathway(t, topic, produceMsg)

s1 := spans[1]
assert.Equal(t, "kafka", s1.Tag(ext.ServiceName))
assert.Equal(t, "queue", s1.Tag(ext.SpanType))
assert.Equal(t, "Consume Topic gotest_ibm_sarama", s1.Tag(ext.ResourceName))
assert.Equal(t, "Consume Topic "+topic, s1.Tag(ext.ResourceName))
assert.Equal(t, "kafka.consume", s1.OperationName())
assert.Equal(t, int32(0), s1.Tag(ext.MessagingKafkaPartition))
assert.NotNil(t, s1.Tag("offset"))
assert.Equal(t, "IBM/sarama", s1.Tag(ext.Component))
assert.Equal(t, ext.SpanKindConsumer, s1.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s1.Tag(ext.MessagingSystem))

assertDSMConsumerPathway(t, testTopic, testGroupID, consumeMsg, true)
assertDSMConsumerPathway(t, topic, groupID, consumeMsg, true)

assert.Equal(t, s0.SpanID(), s1.ParentID(), "spans are not parent-child")
}
31 changes: 25 additions & 6 deletions contrib/IBM/sarama.v1/consumer_test.go
@@ -6,6 +6,7 @@
package sarama

import (
"fmt"
"testing"

"github.com/IBM/sarama"
@@ -20,6 +21,7 @@ import (
func TestWrapConsumer(t *testing.T) {
cfg := newIntegrationTestConfig(t)
cfg.Version = sarama.MinVersion
topic := topicName(t)

mt := mocktracer.Start()
defer mt.Stop()
@@ -30,12 +32,29 @@ func TestWrapConsumer(t *testing.T) {

consumer, err := sarama.NewConsumerFromClient(client)
require.NoError(t, err)
consumer = WrapConsumer(consumer, WithDataStreams())
defer consumer.Close()

consumer = WrapConsumer(consumer, WithDataStreams())
partitionConsumer, err := consumer.ConsumePartition(topic, 0, 0)
require.NoError(t, err)
defer partitionConsumer.Close()

partitionConsumer, err := consumer.ConsumePartition("test-topic", 0, 0)
p, err := sarama.NewSyncProducer(kafkaBrokers, cfg)
require.NoError(t, err)
defer func() {
assert.NoError(t, p.Close())
}()

for i := 1; i <= 2; i++ {
produceMsg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder(fmt.Sprintf("test %d", i)),
Metadata: fmt.Sprintf("test %d", i),
}
_, _, err = p.SendMessage(produceMsg)
require.NoError(t, err)
}

msg1 := <-partitionConsumer.Messages()
msg2 := <-partitionConsumer.Messages()
err = partitionConsumer.Close()
@@ -56,14 +75,14 @@ func TestWrapConsumer(t *testing.T) {
assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition))
assert.NotNil(t, s.Tag("offset"))
assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
assert.Equal(t, "Consume Topic test-topic", s.Tag(ext.ResourceName))
assert.Equal(t, "Consume Topic "+topic, s.Tag(ext.ResourceName))
assert.Equal(t, "queue", s.Tag(ext.SpanType))
assert.Equal(t, "kafka.consume", s.OperationName())
assert.Equal(t, "IBM/sarama", s.Tag(ext.Component))
assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))

assertDSMConsumerPathway(t, "test-topic", "", msg1, false)
assertDSMConsumerPathway(t, topic, "", msg1, false)
}
{
s := spans[1]
@@ -75,13 +94,13 @@ func TestWrapConsumer(t *testing.T) {
assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition))
assert.NotNil(t, s.Tag("offset"))
assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
assert.Equal(t, "Consume Topic test-topic", s.Tag(ext.ResourceName))
assert.Equal(t, "Consume Topic "+topic, s.Tag(ext.ResourceName))
assert.Equal(t, "queue", s.Tag(ext.SpanType))
assert.Equal(t, "kafka.consume", s.OperationName())
assert.Equal(t, "IBM/sarama", s.Tag(ext.Component))
assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))

assertDSMConsumerPathway(t, "test-topic", "", msg2, false)
assertDSMConsumerPathway(t, topic, "", msg2, false)
}
}
42 changes: 29 additions & 13 deletions contrib/IBM/sarama.v1/producer_test.go
@@ -18,16 +18,20 @@ import (

func TestSyncProducer(t *testing.T) {
cfg := newIntegrationTestConfig(t)
topic := topicName(t)

mt := mocktracer.Start()
defer mt.Stop()

producer, err := sarama.NewSyncProducer(kafkaBrokers, cfg)
require.NoError(t, err)
producer = WrapSyncProducer(cfg, producer, WithDataStreams())
defer func() {
assert.NoError(t, producer.Close())
}()

msg1 := &sarama.ProducerMessage{
Topic: "my_topic",
Topic: topic,
Value: sarama.StringEncoder("test 1"),
Metadata: "test",
}
@@ -40,35 +44,39 @@ func TestSyncProducer(t *testing.T) {
s := spans[0]
assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
assert.Equal(t, "queue", s.Tag(ext.SpanType))
assert.Equal(t, "Produce Topic my_topic", s.Tag(ext.ResourceName))
assert.Equal(t, "Produce Topic "+topic, s.Tag(ext.ResourceName))
assert.Equal(t, "kafka.produce", s.OperationName())
assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition))
assert.NotNil(t, s.Tag("offset"))
assert.Equal(t, "IBM/sarama", s.Tag(ext.Component))
assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))

assertDSMProducerPathway(t, "my_topic", msg1)
assertDSMProducerPathway(t, topic, msg1)
}
}

func TestSyncProducerSendMessages(t *testing.T) {
cfg := newIntegrationTestConfig(t)
topic := topicName(t)

mt := mocktracer.Start()
defer mt.Stop()

producer, err := sarama.NewSyncProducer(kafkaBrokers, cfg)
require.NoError(t, err)
producer = WrapSyncProducer(cfg, producer, WithDataStreams())
defer func() {
assert.NoError(t, producer.Close())
}()

msg1 := &sarama.ProducerMessage{
Topic: "my_topic",
Topic: topic,
Value: sarama.StringEncoder("test 1"),
Metadata: "test",
}
msg2 := &sarama.ProducerMessage{
Topic: "my_topic",
Topic: topic,
Value: sarama.StringEncoder("test 2"),
Metadata: "test",
}
@@ -80,7 +88,7 @@ func TestSyncProducerSendMessages(t *testing.T) {
for _, s := range spans {
assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
assert.Equal(t, "queue", s.Tag(ext.SpanType))
assert.Equal(t, "Produce Topic my_topic", s.Tag(ext.ResourceName))
assert.Equal(t, "Produce Topic "+topic, s.Tag(ext.ResourceName))
assert.Equal(t, "kafka.produce", s.OperationName())
assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition))
assert.Equal(t, "IBM/sarama", s.Tag(ext.Component))
Expand All @@ -89,7 +97,7 @@ func TestSyncProducerSendMessages(t *testing.T) {
}

for _, msg := range []*sarama.ProducerMessage{msg1, msg2} {
assertDSMProducerPathway(t, "my_topic", msg)
assertDSMProducerPathway(t, topic, msg)
}
}

@@ -99,16 +107,20 @@ func TestWrapAsyncProducer(t *testing.T) {
t.Run("Without Successes", func(t *testing.T) {
cfg := newIntegrationTestConfig(t)
cfg.Producer.Return.Successes = false
topic := topicName(t)

mt := mocktracer.Start()
defer mt.Stop()

producer, err := sarama.NewAsyncProducer(kafkaBrokers, cfg)
require.NoError(t, err)
producer = WrapAsyncProducer(cfg, producer, WithDataStreams())
defer func() {
assert.NoError(t, producer.Close())
}()

msg1 := &sarama.ProducerMessage{
Topic: "my_topic",
Topic: topic,
Value: sarama.StringEncoder("test 1"),
}
producer.Input() <- msg1
@@ -121,7 +133,7 @@ func TestWrapAsyncProducer(t *testing.T) {
s := spans[0]
assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
assert.Equal(t, "queue", s.Tag(ext.SpanType))
assert.Equal(t, "Produce Topic my_topic", s.Tag(ext.ResourceName))
assert.Equal(t, "Produce Topic "+topic, s.Tag(ext.ResourceName))
assert.Equal(t, "kafka.produce", s.OperationName())

// these tags are set in the finishProducerSpan function, but in this case it's never used, and instead we
@@ -133,23 +145,27 @@ func TestWrapAsyncProducer(t *testing.T) {
assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))

assertDSMProducerPathway(t, "my_topic", msg1)
assertDSMProducerPathway(t, topic, msg1)
}
})

t.Run("With Successes", func(t *testing.T) {
cfg := newIntegrationTestConfig(t)
cfg.Producer.Return.Successes = true
topic := topicName(t)

mt := mocktracer.Start()
defer mt.Stop()

producer, err := sarama.NewAsyncProducer(kafkaBrokers, cfg)
require.NoError(t, err)
producer = WrapAsyncProducer(cfg, producer, WithDataStreams())
defer func() {
assert.NoError(t, producer.Close())
}()

msg1 := &sarama.ProducerMessage{
Topic: "my_topic",
Topic: topic,
Value: sarama.StringEncoder("test 1"),
}
producer.Input() <- msg1
@@ -161,15 +177,15 @@ func TestWrapAsyncProducer(t *testing.T) {
s := spans[0]
assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
assert.Equal(t, "queue", s.Tag(ext.SpanType))
assert.Equal(t, "Produce Topic my_topic", s.Tag(ext.ResourceName))
assert.Equal(t, "Produce Topic "+topic, s.Tag(ext.ResourceName))
assert.Equal(t, "kafka.produce", s.OperationName())
assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition))
assert.NotNil(t, s.Tag("offset"))
assert.Equal(t, "IBM/sarama", s.Tag(ext.Component))
assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))

assertDSMProducerPathway(t, "my_topic", msg1)
assertDSMProducerPathway(t, topic, msg1)
}
})
}
