diff --git a/contrib/IBM/sarama.v1/consumer_group_test.go b/contrib/IBM/sarama.v1/consumer_group_test.go index 17609f2629..2678ce99da 100644 --- a/contrib/IBM/sarama.v1/consumer_group_test.go +++ b/contrib/IBM/sarama.v1/consumer_group_test.go @@ -8,7 +8,6 @@ package sarama import ( "context" "log" - "os" "sync" "testing" @@ -21,18 +20,11 @@ import ( ) func TestWrapConsumerGroupHandler(t *testing.T) { - if _, ok := os.LookupEnv("INTEGRATION"); !ok { - t.Skip("🚧 Skipping integration test (INTEGRATION environment variable is not set)") - } + cfg := newIntegrationTestConfig(t) mt := mocktracer.Start() defer mt.Stop() - cfg := sarama.NewConfig() - cfg.Version = sarama.V0_11_0_0 // first version that supports headers - cfg.Producer.Return.Successes = true - cfg.Producer.Flush.Messages = 1 - cg, err := sarama.NewConsumerGroup(kafkaBrokers, testGroupID, cfg) require.NoError(t, err) defer func() { diff --git a/contrib/IBM/sarama.v1/consumer_test.go b/contrib/IBM/sarama.v1/consumer_test.go index 8b373b2555..f36c045956 100644 --- a/contrib/IBM/sarama.v1/consumer_test.go +++ b/contrib/IBM/sarama.v1/consumer_test.go @@ -18,27 +18,13 @@ import ( ) func TestWrapConsumer(t *testing.T) { + cfg := newIntegrationTestConfig(t) + cfg.Version = sarama.MinVersion + mt := mocktracer.Start() defer mt.Stop() - broker := sarama.NewMockBroker(t, 0) - defer broker.Close() - - broker.SetHandlerByMap(map[string]sarama.MockResponse{ - "MetadataRequest": sarama.NewMockMetadataResponse(t). - SetBroker(broker.Addr(), broker.BrokerID()). - SetLeader("test-topic", 0, broker.BrokerID()), - "OffsetRequest": sarama.NewMockOffsetResponse(t). - SetOffset("test-topic", 0, sarama.OffsetOldest, 0). - SetOffset("test-topic", 0, sarama.OffsetNewest, 1), - "FetchRequest": sarama.NewMockFetchResponse(t, 1). - SetMessage("test-topic", 0, 0, sarama.StringEncoder("hello")). - SetMessage("test-topic", 0, 1, sarama.StringEncoder("world")), - }) - cfg := sarama.NewConfig() - cfg.Version = sarama.MinVersion - - client, err := sarama.NewClient([]string{broker.Addr()}, cfg) + client, err := sarama.NewClient(kafkaBrokers, cfg) require.NoError(t, err) defer client.Close() @@ -56,6 +42,7 @@ func TestWrapConsumer(t *testing.T) { require.NoError(t, err) // wait for the channel to be closed <-partitionConsumer.Messages() + waitForSpans(mt, 2) spans := mt.FinishedSpans() require.Len(t, spans, 2) @@ -67,7 +54,7 @@ func TestWrapConsumer(t *testing.T) { "span context should be injected into the consumer message headers") assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition)) - assert.Equal(t, int64(0), s.Tag("offset")) + assert.NotNil(t, s.Tag("offset")) assert.Equal(t, "kafka", s.Tag(ext.ServiceName)) assert.Equal(t, "Consume Topic test-topic", s.Tag(ext.ResourceName)) assert.Equal(t, "queue", s.Tag(ext.SpanType)) @@ -86,7 +73,7 @@ func TestWrapConsumer(t *testing.T) { "span context should be injected into the consumer message headers") assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition)) - assert.Equal(t, int64(1), s.Tag("offset")) + assert.NotNil(t, s.Tag("offset")) assert.Equal(t, "kafka", s.Tag(ext.ServiceName)) assert.Equal(t, "Consume Topic test-topic", s.Tag(ext.ResourceName)) assert.Equal(t, "queue", s.Tag(ext.SpanType)) diff --git a/contrib/IBM/sarama.v1/producer_test.go b/contrib/IBM/sarama.v1/producer_test.go index 92308a9eb5..e6099b38db 100644 --- a/contrib/IBM/sarama.v1/producer_test.go +++ b/contrib/IBM/sarama.v1/producer_test.go @@ -17,31 +17,12 @@ import ( ) func TestSyncProducer(t *testing.T) { + cfg := newIntegrationTestConfig(t) + mt := mocktracer.Start() defer mt.Stop() - seedBroker := sarama.NewMockBroker(t, 1) - defer seedBroker.Close() - - leader := sarama.NewMockBroker(t, 2) - defer leader.Close() - - metadataResponse := new(sarama.MetadataResponse) - metadataResponse.Version = 1 - metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) - metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) - seedBroker.Returns(metadataResponse) - - prodSuccess := new(sarama.ProduceResponse) - prodSuccess.Version = 2 - prodSuccess.AddTopicPartition("my_topic", 0, sarama.ErrNoError) - leader.Returns(prodSuccess) - - cfg := sarama.NewConfig() - cfg.Version = sarama.V0_11_0_0 // first version that supports headers - cfg.Producer.Return.Successes = true - - producer, err := sarama.NewSyncProducer([]string{seedBroker.Addr()}, cfg) + producer, err := sarama.NewSyncProducer(kafkaBrokers, cfg) require.NoError(t, err) producer = WrapSyncProducer(cfg, producer, WithDataStreams()) @@ -54,7 +35,7 @@ func TestSyncProducer(t *testing.T) { require.NoError(t, err) spans := mt.FinishedSpans() - assert.Len(t, spans, 1) + require.Len(t, spans, 1) { s := spans[0] assert.Equal(t, "kafka", s.Tag(ext.ServiceName)) @@ -62,7 +43,7 @@ func TestSyncProducer(t *testing.T) { assert.Equal(t, "Produce Topic my_topic", s.Tag(ext.ResourceName)) assert.Equal(t, "kafka.produce", s.OperationName()) assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition)) - assert.Equal(t, int64(0), s.Tag("offset")) + assert.NotNil(t, s.Tag("offset")) assert.Equal(t, "IBM/sarama", s.Tag(ext.Component)) assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) @@ -72,31 +53,12 @@ func TestSyncProducer(t *testing.T) { } func TestSyncProducerSendMessages(t *testing.T) { + cfg := newIntegrationTestConfig(t) + mt := mocktracer.Start() defer mt.Stop() - seedBroker := sarama.NewMockBroker(t, 1) - defer seedBroker.Close() - leader := sarama.NewMockBroker(t, 2) - defer leader.Close() - - metadataResponse := new(sarama.MetadataResponse) - metadataResponse.Version = 1 - metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) - metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) - seedBroker.Returns(metadataResponse) - - prodSuccess := new(sarama.ProduceResponse) - prodSuccess.Version = 2 - prodSuccess.AddTopicPartition("my_topic", 0, sarama.ErrNoError) - leader.Returns(prodSuccess) - - cfg := sarama.NewConfig() - cfg.Version = sarama.V0_11_0_0 // first version that supports headers - cfg.Producer.Return.Successes = true - cfg.Producer.Flush.Messages = 2 - - producer, err := sarama.NewSyncProducer([]string{seedBroker.Addr()}, cfg) + producer, err := sarama.NewSyncProducer(kafkaBrokers, cfg) require.NoError(t, err) producer = WrapSyncProducer(cfg, producer, WithDataStreams()) @@ -135,16 +97,15 @@ func TestWrapAsyncProducer(t *testing.T) { // the default for producers is a fire-and-forget model that doesn't return // successes t.Run("Without Successes", func(t *testing.T) { + cfg := newIntegrationTestConfig(t) + cfg.Producer.Return.Successes = false + mt := mocktracer.Start() defer mt.Stop() - broker := newMockBroker(t) - - cfg := sarama.NewConfig() - cfg.Version = sarama.V0_11_0_0 - producer, err := sarama.NewAsyncProducer([]string{broker.Addr()}, cfg) + producer, err := sarama.NewAsyncProducer(kafkaBrokers, cfg) require.NoError(t, err) - producer = WrapAsyncProducer(nil, producer, WithDataStreams()) + producer = WrapAsyncProducer(cfg, producer, WithDataStreams()) msg1 := &sarama.ProducerMessage{ Topic: "my_topic", @@ -177,16 +138,13 @@ func TestWrapAsyncProducer(t *testing.T) { }) t.Run("With Successes", func(t *testing.T) { + cfg := newIntegrationTestConfig(t) + cfg.Producer.Return.Successes = true + mt := mocktracer.Start() defer mt.Stop() - broker := newMockBroker(t) - - cfg := sarama.NewConfig() - cfg.Version = sarama.V0_11_0_0 - cfg.Producer.Return.Successes = true - - producer, err := sarama.NewAsyncProducer([]string{broker.Addr()}, cfg) + producer, err := sarama.NewAsyncProducer(kafkaBrokers, cfg) require.NoError(t, err) producer = WrapAsyncProducer(cfg, producer, WithDataStreams()) @@ -206,7 +164,7 @@ func TestWrapAsyncProducer(t *testing.T) { assert.Equal(t, "Produce Topic my_topic", s.Tag(ext.ResourceName)) assert.Equal(t, "kafka.produce", s.OperationName()) assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition)) - assert.Equal(t, int64(0), s.Tag("offset")) + assert.NotNil(t, s.Tag("offset")) assert.Equal(t, "IBM/sarama", s.Tag(ext.Component)) assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) diff --git a/contrib/IBM/sarama.v1/sarama_test.go b/contrib/IBM/sarama.v1/sarama_test.go index e9205ebe0b..aaf5fd01f0 100644 --- a/contrib/IBM/sarama.v1/sarama_test.go +++ b/contrib/IBM/sarama.v1/sarama_test.go @@ -8,6 +8,7 @@ package sarama import ( "context" "gopkg.in/DataDog/dd-trace-go.v1/contrib/internal/namingschematest" + "os" "testing" "time" @@ -20,7 +21,9 @@ import ( "github.com/stretchr/testify/require" ) -var kafkaBrokers = []string{"localhost:9092", "localhost:9093", "localhost:9094"} +var ( + kafkaBrokers = []string{"localhost:9092"} +) const ( testGroupID = "gotest_ibm_sarama" @@ -32,6 +35,8 @@ func TestNamingSchema(t *testing.T) { } func genTestSpans(t *testing.T, serviceOverride string) []mocktracer.Span { + cfg := newIntegrationTestConfig(t) + var opts []Option if serviceOverride != "" { opts = append(opts, WithServiceName(serviceOverride)) @@ -39,31 +44,11 @@ func genTestSpans(t *testing.T, serviceOverride string) []mocktracer.Span { mt := mocktracer.Start() defer mt.Stop() - broker := sarama.NewMockBroker(t, 1) - defer broker.Close() - - broker.SetHandlerByMap(map[string]sarama.MockResponse{ - "MetadataRequest": sarama.NewMockMetadataResponse(t). - SetBroker(broker.Addr(), broker.BrokerID()). - SetLeader("test-topic", 0, broker.BrokerID()), - "OffsetRequest": sarama.NewMockOffsetResponse(t). - SetOffset("test-topic", 0, sarama.OffsetOldest, 0). - SetOffset("test-topic", 0, sarama.OffsetNewest, 1), - "FetchRequest": sarama.NewMockFetchResponse(t, 1). - SetMessage("test-topic", 0, 0, sarama.StringEncoder("hello")), - "ProduceRequest": sarama.NewMockProduceResponse(t). - SetError("test-topic", 0, sarama.ErrNoError), - }) - cfg := sarama.NewConfig() - cfg.Version = sarama.MinVersion - cfg.Producer.Return.Successes = true - cfg.Producer.Flush.Messages = 1 - - producer, err := sarama.NewSyncProducer([]string{broker.Addr()}, cfg) + producer, err := sarama.NewSyncProducer(kafkaBrokers, cfg) require.NoError(t, err) producer = WrapSyncProducer(cfg, producer, opts...) - c, err := sarama.NewConsumer([]string{broker.Addr()}, cfg) + c, err := sarama.NewConsumer(kafkaBrokers, cfg) require.NoError(t, err) defer func(c sarama.Consumer) { err := c.Close() @@ -86,28 +71,23 @@ func genTestSpans(t *testing.T, serviceOverride string) []mocktracer.Span { require.NoError(t, err) // wait for the channel to be closed <-pc.Messages() + waitForSpans(mt, 2) spans := mt.FinishedSpans() require.Len(t, spans, 2) return spans } -func newMockBroker(t *testing.T) *sarama.MockBroker { - broker := sarama.NewMockBroker(t, 1) - - metadataResponse := new(sarama.MetadataResponse) - metadataResponse.Version = 1 - metadataResponse.AddBroker(broker.Addr(), broker.BrokerID()) - metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, sarama.ErrNoError) - broker.Returns(metadataResponse) - - prodSuccess := new(sarama.ProduceResponse) - prodSuccess.Version = 2 - prodSuccess.AddTopicPartition("my_topic", 0, sarama.ErrNoError) - for i := 0; i < 10; i++ { - broker.Returns(prodSuccess) +func newIntegrationTestConfig(t *testing.T) *sarama.Config { + if _, ok := os.LookupEnv("INTEGRATION"); !ok { + t.Skip("🚧 Skipping integration test (INTEGRATION environment variable is not set)") } - return broker + + cfg := sarama.NewConfig() + cfg.Version = sarama.V0_11_0_0 // first version that supports headers + cfg.Producer.Return.Successes = true + cfg.Producer.Flush.Messages = 1 + return cfg } // waitForSpans polls the mock tracer until the expected number of spans diff --git a/contrib/gofiber/fiber.v2/fiber.go b/contrib/gofiber/fiber.v2/fiber.go index 93a454417d..3cd6530968 100644 --- a/contrib/gofiber/fiber.v2/fiber.go +++ b/contrib/gofiber/fiber.v2/fiber.go @@ -53,6 +53,7 @@ func Middleware(opts ...Option) func(c *fiber.Ctx) error { } // Create a http.Header object so that a parent trace can be extracted. Fiber uses a non-standard header carrier h := http.Header{} + h.Get() for k, headers := range c.GetReqHeaders() { for _, v := range headers { // GetReqHeaders returns a list of headers associated with the given key.