contrib/IBM/sarama.v1: stop using mockbroker for tests
rarguelloF committed Jan 20, 2025
1 parent 2e3193f commit bc8b350
Showing 5 changed files with 45 additions and 127 deletions.
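The common thread across the test files below is a single substitution: the per-test `sarama.MockBroker` wiring is dropped in favor of the shared `newIntegrationTestConfig` helper and the real brokers listed in `kafkaBrokers`. As a rough sketch of the resulting shape of a test (the test name and body here are illustrative, not part of the commit):

```go
package sarama

import (
	"testing"

	"github.com/IBM/sarama"
	"github.com/stretchr/testify/require"
)

// Illustrative only: shows the pattern the commit applies to the existing tests.
func TestAgainstRealBroker(t *testing.T) {
	// Builds a config and t.Skip()s unless the INTEGRATION env var is set,
	// so the suite still passes in environments without a Kafka broker.
	cfg := newIntegrationTestConfig(t)

	// Talk to the real broker(s) in kafkaBrokers instead of a MockBroker.
	client, err := sarama.NewClient(kafkaBrokers, cfg)
	require.NoError(t, err)
	defer client.Close()
}
```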
10 changes: 1 addition & 9 deletions contrib/IBM/sarama.v1/consumer_group_test.go
@@ -8,7 +8,6 @@ package sarama
import (
"context"
"log"
"os"
"sync"
"testing"

@@ -21,18 +20,11 @@ import (
)

func TestWrapConsumerGroupHandler(t *testing.T) {
if _, ok := os.LookupEnv("INTEGRATION"); !ok {
t.Skip("🚧 Skipping integration test (INTEGRATION environment variable is not set)")
}
cfg := newIntegrationTestConfig(t)

mt := mocktracer.Start()
defer mt.Stop()

cfg := sarama.NewConfig()
cfg.Version = sarama.V0_11_0_0 // first version that supports headers
cfg.Producer.Return.Successes = true
cfg.Producer.Flush.Messages = 1

cg, err := sarama.NewConsumerGroup(kafkaBrokers, testGroupID, cfg)
require.NoError(t, err)
defer func() {
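The rest of TestWrapConsumerGroupHandler is collapsed above. For context, what such a test drives is a plain sarama.ConsumerGroupHandler; the sketch below is hypothetical (the type name and body are not from the commit) and only illustrates the interface the wrapped consumer group exercises against the brokers in kafkaBrokers:

```go
package sarama

import (
	"github.com/IBM/sarama"
)

// exampleHandler is a hypothetical sarama.ConsumerGroupHandler, shown only to
// illustrate what the collapsed test body would wrap and run.
type exampleHandler struct {
	received chan *sarama.ConsumerMessage
}

func (h *exampleHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (h *exampleHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (h *exampleHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		h.received <- msg
		sess.MarkMessage(msg, "") // mark the offset as consumed
	}
	return nil
}
```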
27 changes: 7 additions & 20 deletions contrib/IBM/sarama.v1/consumer_test.go
@@ -18,27 +18,13 @@ import (
)

func TestWrapConsumer(t *testing.T) {
cfg := newIntegrationTestConfig(t)
cfg.Version = sarama.MinVersion

mt := mocktracer.Start()
defer mt.Stop()

broker := sarama.NewMockBroker(t, 0)
defer broker.Close()

broker.SetHandlerByMap(map[string]sarama.MockResponse{
"MetadataRequest": sarama.NewMockMetadataResponse(t).
SetBroker(broker.Addr(), broker.BrokerID()).
SetLeader("test-topic", 0, broker.BrokerID()),
"OffsetRequest": sarama.NewMockOffsetResponse(t).
SetOffset("test-topic", 0, sarama.OffsetOldest, 0).
SetOffset("test-topic", 0, sarama.OffsetNewest, 1),
"FetchRequest": sarama.NewMockFetchResponse(t, 1).
SetMessage("test-topic", 0, 0, sarama.StringEncoder("hello")).
SetMessage("test-topic", 0, 1, sarama.StringEncoder("world")),
})
cfg := sarama.NewConfig()
cfg.Version = sarama.MinVersion

client, err := sarama.NewClient([]string{broker.Addr()}, cfg)
client, err := sarama.NewClient(kafkaBrokers, cfg)
require.NoError(t, err)
defer client.Close()

@@ -56,6 +42,7 @@ func TestWrapConsumer(t *testing.T) {
require.NoError(t, err)
// wait for the channel to be closed
<-partitionConsumer.Messages()
waitForSpans(mt, 2)

spans := mt.FinishedSpans()
require.Len(t, spans, 2)
@@ -67,7 +54,7 @@ func TestWrapConsumer(t *testing.T) {
"span context should be injected into the consumer message headers")

assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition))
assert.Equal(t, int64(0), s.Tag("offset"))
assert.NotNil(t, s.Tag("offset"))
assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
assert.Equal(t, "Consume Topic test-topic", s.Tag(ext.ResourceName))
assert.Equal(t, "queue", s.Tag(ext.SpanType))
@@ -86,7 +73,7 @@ func TestWrapConsumer(t *testing.T) {
"span context should be injected into the consumer message headers")

assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition))
assert.Equal(t, int64(1), s.Tag("offset"))
assert.NotNil(t, s.Tag("offset"))
assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
assert.Equal(t, "Consume Topic test-topic", s.Tag(ext.ResourceName))
assert.Equal(t, "queue", s.Tag(ext.SpanType))
78 changes: 18 additions & 60 deletions contrib/IBM/sarama.v1/producer_test.go
@@ -17,31 +17,12 @@ import (
)

func TestSyncProducer(t *testing.T) {
cfg := newIntegrationTestConfig(t)

mt := mocktracer.Start()
defer mt.Stop()

seedBroker := sarama.NewMockBroker(t, 1)
defer seedBroker.Close()

leader := sarama.NewMockBroker(t, 2)
defer leader.Close()

metadataResponse := new(sarama.MetadataResponse)
metadataResponse.Version = 1
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError)
seedBroker.Returns(metadataResponse)

prodSuccess := new(sarama.ProduceResponse)
prodSuccess.Version = 2
prodSuccess.AddTopicPartition("my_topic", 0, sarama.ErrNoError)
leader.Returns(prodSuccess)

cfg := sarama.NewConfig()
cfg.Version = sarama.V0_11_0_0 // first version that supports headers
cfg.Producer.Return.Successes = true

producer, err := sarama.NewSyncProducer([]string{seedBroker.Addr()}, cfg)
producer, err := sarama.NewSyncProducer(kafkaBrokers, cfg)
require.NoError(t, err)
producer = WrapSyncProducer(cfg, producer, WithDataStreams())

@@ -54,15 +35,15 @@ func TestSyncProducer(t *testing.T) {
require.NoError(t, err)

spans := mt.FinishedSpans()
assert.Len(t, spans, 1)
require.Len(t, spans, 1)
{
s := spans[0]
assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
assert.Equal(t, "queue", s.Tag(ext.SpanType))
assert.Equal(t, "Produce Topic my_topic", s.Tag(ext.ResourceName))
assert.Equal(t, "kafka.produce", s.OperationName())
assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition))
assert.Equal(t, int64(0), s.Tag("offset"))
assert.NotNil(t, s.Tag("offset"))
assert.Equal(t, "IBM/sarama", s.Tag(ext.Component))
assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))
@@ -72,31 +53,12 @@ func TestSyncProducer(t *testing.T) {
}

func TestSyncProducerSendMessages(t *testing.T) {
cfg := newIntegrationTestConfig(t)

mt := mocktracer.Start()
defer mt.Stop()

seedBroker := sarama.NewMockBroker(t, 1)
defer seedBroker.Close()
leader := sarama.NewMockBroker(t, 2)
defer leader.Close()

metadataResponse := new(sarama.MetadataResponse)
metadataResponse.Version = 1
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError)
seedBroker.Returns(metadataResponse)

prodSuccess := new(sarama.ProduceResponse)
prodSuccess.Version = 2
prodSuccess.AddTopicPartition("my_topic", 0, sarama.ErrNoError)
leader.Returns(prodSuccess)

cfg := sarama.NewConfig()
cfg.Version = sarama.V0_11_0_0 // first version that supports headers
cfg.Producer.Return.Successes = true
cfg.Producer.Flush.Messages = 2

producer, err := sarama.NewSyncProducer([]string{seedBroker.Addr()}, cfg)
producer, err := sarama.NewSyncProducer(kafkaBrokers, cfg)
require.NoError(t, err)
producer = WrapSyncProducer(cfg, producer, WithDataStreams())

@@ -135,16 +97,15 @@ func TestWrapAsyncProducer(t *testing.T) {
// the default for producers is a fire-and-forget model that doesn't return
// successes
t.Run("Without Successes", func(t *testing.T) {
cfg := newIntegrationTestConfig(t)
cfg.Producer.Return.Successes = false

mt := mocktracer.Start()
defer mt.Stop()

broker := newMockBroker(t)

cfg := sarama.NewConfig()
cfg.Version = sarama.V0_11_0_0
producer, err := sarama.NewAsyncProducer([]string{broker.Addr()}, cfg)
producer, err := sarama.NewAsyncProducer(kafkaBrokers, cfg)
require.NoError(t, err)
producer = WrapAsyncProducer(nil, producer, WithDataStreams())
producer = WrapAsyncProducer(cfg, producer, WithDataStreams())

msg1 := &sarama.ProducerMessage{
Topic: "my_topic",
@@ -177,16 +138,13 @@ func TestWrapAsyncProducer(t *testing.T) {
})

t.Run("With Successes", func(t *testing.T) {
cfg := newIntegrationTestConfig(t)
cfg.Producer.Return.Successes = true

mt := mocktracer.Start()
defer mt.Stop()

broker := newMockBroker(t)

cfg := sarama.NewConfig()
cfg.Version = sarama.V0_11_0_0
cfg.Producer.Return.Successes = true

producer, err := sarama.NewAsyncProducer([]string{broker.Addr()}, cfg)
producer, err := sarama.NewAsyncProducer(kafkaBrokers, cfg)
require.NoError(t, err)
producer = WrapAsyncProducer(cfg, producer, WithDataStreams())

@@ -206,7 +164,7 @@ func TestWrapAsyncProducer(t *testing.T) {
assert.Equal(t, "Produce Topic my_topic", s.Tag(ext.ResourceName))
assert.Equal(t, "kafka.produce", s.OperationName())
assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition))
assert.Equal(t, int64(0), s.Tag("offset"))
assert.NotNil(t, s.Tag("offset"))
assert.Equal(t, "IBM/sarama", s.Tag(ext.Component))
assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))
56 changes: 18 additions & 38 deletions contrib/IBM/sarama.v1/sarama_test.go
@@ -8,6 +8,7 @@ package sarama
import (
"context"
"gopkg.in/DataDog/dd-trace-go.v1/contrib/internal/namingschematest"
"os"
"testing"
"time"

@@ -20,7 +21,9 @@ import (
"github.com/stretchr/testify/require"
)

var kafkaBrokers = []string{"localhost:9092", "localhost:9093", "localhost:9094"}
var (
kafkaBrokers = []string{"localhost:9092"}
)

const (
testGroupID = "gotest_ibm_sarama"
@@ -32,38 +35,20 @@ func TestNamingSchema(t *testing.T) {
}

func genTestSpans(t *testing.T, serviceOverride string) []mocktracer.Span {
cfg := newIntegrationTestConfig(t)

var opts []Option
if serviceOverride != "" {
opts = append(opts, WithServiceName(serviceOverride))
}
mt := mocktracer.Start()
defer mt.Stop()

broker := sarama.NewMockBroker(t, 1)
defer broker.Close()

broker.SetHandlerByMap(map[string]sarama.MockResponse{
"MetadataRequest": sarama.NewMockMetadataResponse(t).
SetBroker(broker.Addr(), broker.BrokerID()).
SetLeader("test-topic", 0, broker.BrokerID()),
"OffsetRequest": sarama.NewMockOffsetResponse(t).
SetOffset("test-topic", 0, sarama.OffsetOldest, 0).
SetOffset("test-topic", 0, sarama.OffsetNewest, 1),
"FetchRequest": sarama.NewMockFetchResponse(t, 1).
SetMessage("test-topic", 0, 0, sarama.StringEncoder("hello")),
"ProduceRequest": sarama.NewMockProduceResponse(t).
SetError("test-topic", 0, sarama.ErrNoError),
})
cfg := sarama.NewConfig()
cfg.Version = sarama.MinVersion
cfg.Producer.Return.Successes = true
cfg.Producer.Flush.Messages = 1

producer, err := sarama.NewSyncProducer([]string{broker.Addr()}, cfg)
producer, err := sarama.NewSyncProducer(kafkaBrokers, cfg)
require.NoError(t, err)
producer = WrapSyncProducer(cfg, producer, opts...)

c, err := sarama.NewConsumer([]string{broker.Addr()}, cfg)
c, err := sarama.NewConsumer(kafkaBrokers, cfg)
require.NoError(t, err)
defer func(c sarama.Consumer) {
err := c.Close()
@@ -86,28 +71,23 @@ func genTestSpans(t *testing.T, serviceOverride string) []mocktracer.Span {
require.NoError(t, err)
// wait for the channel to be closed
<-pc.Messages()
waitForSpans(mt, 2)

spans := mt.FinishedSpans()
require.Len(t, spans, 2)
return spans
}

func newMockBroker(t *testing.T) *sarama.MockBroker {
broker := sarama.NewMockBroker(t, 1)

metadataResponse := new(sarama.MetadataResponse)
metadataResponse.Version = 1
metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, sarama.ErrNoError)
broker.Returns(metadataResponse)

prodSuccess := new(sarama.ProduceResponse)
prodSuccess.Version = 2
prodSuccess.AddTopicPartition("my_topic", 0, sarama.ErrNoError)
for i := 0; i < 10; i++ {
broker.Returns(prodSuccess)
func newIntegrationTestConfig(t *testing.T) *sarama.Config {
if _, ok := os.LookupEnv("INTEGRATION"); !ok {
t.Skip("🚧 Skipping integration test (INTEGRATION environment variable is not set)")
}
return broker

cfg := sarama.NewConfig()
cfg.Version = sarama.V0_11_0_0 // first version that supports headers
cfg.Producer.Return.Successes = true
cfg.Producer.Flush.Messages = 1
return cfg
}

// waitForSpans polls the mock tracer until the expected number of spans
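The body of waitForSpans is collapsed above; only the start of its doc comment is visible. A plausible implementation, assuming it simply polls mt.FinishedSpans() until the expected count is reached or a timeout expires (the 5s and 50ms values below are assumptions, not from the commit), would be:

```go
package sarama

import (
	"time"

	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer"
)

// waitForSpans polls the mock tracer until the expected number of spans has
// finished. Sketch only: the committed body is not shown in this diff.
func waitForSpans(mt mocktracer.Tracer, n int) {
	deadline := time.Now().Add(5 * time.Second)
	for len(mt.FinishedSpans()) < n && time.Now().Before(deadline) {
		time.Sleep(50 * time.Millisecond)
	}
}
```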
1 change: 1 addition & 0 deletions contrib/gofiber/fiber.v2/fiber.go
@@ -53,6 +53,7 @@ func Middleware(opts ...Option) func(c *fiber.Ctx) error {
}
// Create a http.Header object so that a parent trace can be extracted. Fiber uses a non-standard header carrier
h := http.Header{}
h.Get()
for k, headers := range c.GetReqHeaders() {
for _, v := range headers {
// GetReqHeaders returns a list of headers associated with the given key.
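For context on the fiber.v2 hunk: the surrounding middleware code (collapsed here) copies fiber's non-standard header carrier into a standard http.Header so a parent span context can be extracted. The sketch below is a hedged reconstruction; the helper name and the tracer.Extract / ChildOf step are assumptions about the collapsed lines, not something shown in this diff:

```go
package fiber

import (
	"net/http"

	"github.com/gofiber/fiber/v2"

	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

// parentSpanOptions is a hypothetical helper mirroring the collapsed middleware
// logic: copy fiber's header carrier into http.Header and, if a trace context
// can be extracted from it, return a ChildOf start option.
func parentSpanOptions(c *fiber.Ctx) []tracer.StartSpanOption {
	h := http.Header{}
	for k, headers := range c.GetReqHeaders() {
		for _, v := range headers {
			// GetReqHeaders returns a list of headers associated with the given key.
			h.Add(k, v)
		}
	}
	var opts []tracer.StartSpanOption
	if spanctx, err := tracer.Extract(tracer.HTTPHeadersCarrier(h)); err == nil {
		opts = append(opts, tracer.ChildOf(spanctx))
	}
	return opts
}
```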
