diff --git a/contrib/IBM/sarama.v1/option.go b/contrib/IBM/sarama.v1/option.go index 124ed9345f..e410e0c1f9 100644 --- a/contrib/IBM/sarama.v1/option.go +++ b/contrib/IBM/sarama.v1/option.go @@ -20,6 +20,8 @@ type config struct { consumerSpanName string producerSpanName string analyticsRate float64 + dataStreamsEnabled bool + groupID string } func defaults(cfg *config) { @@ -29,6 +31,8 @@ func defaults(cfg *config) { cfg.consumerSpanName = namingschema.OpName(namingschema.KafkaInbound) cfg.producerSpanName = namingschema.OpName(namingschema.KafkaOutbound) + cfg.dataStreamsEnabled = internal.BoolEnv("DD_DATA_STREAMS_ENABLED", false) + // cfg.analyticsRate = globalconfig.AnalyticsRate() if internal.BoolEnv("DD_TRACE_SARAMA_ANALYTICS_ENABLED", false) { cfg.analyticsRate = 1.0 @@ -48,6 +52,20 @@ func WithServiceName(name string) Option { } } +// WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/ +func WithDataStreams() Option { + return func(cfg *config) { + cfg.dataStreamsEnabled = true + } +} + +// WithGroupID tags the produced data streams metrics with the given groupID (aka consumer group) +func WithGroupID(groupID string) Option { + return func(cfg *config) { + cfg.groupID = groupID + } +} + // WithAnalytics enables Trace Analytics for all started spans. func WithAnalytics(on bool) Option { return func(cfg *config) { diff --git a/contrib/IBM/sarama.v1/option_test.go b/contrib/IBM/sarama.v1/option_test.go new file mode 100644 index 0000000000..e97a7249ab --- /dev/null +++ b/contrib/IBM/sarama.v1/option_test.go @@ -0,0 +1,39 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. + +package sarama + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestDataStreamsActivation(t *testing.T) { + t.Run("default", func(t *testing.T) { + cfg := new(config) + defaults(cfg) + assert.False(t, cfg.dataStreamsEnabled) + }) + t.Run("withOption", func(t *testing.T) { + cfg := new(config) + defaults(cfg) + WithDataStreams()(cfg) + assert.True(t, cfg.dataStreamsEnabled) + }) + t.Run("withEnv", func(t *testing.T) { + t.Setenv("DD_DATA_STREAMS_ENABLED", "true") + cfg := new(config) + defaults(cfg) + assert.True(t, cfg.dataStreamsEnabled) + }) + t.Run("optionOverridesEnv", func(t *testing.T) { + t.Setenv("DD_DATA_STREAMS_ENABLED", "false") + cfg := new(config) + defaults(cfg) + WithDataStreams()(cfg) + assert.True(t, cfg.dataStreamsEnabled) + }) +} diff --git a/contrib/IBM/sarama.v1/sarama.go b/contrib/IBM/sarama.v1/sarama.go index 946a61ef70..7d9f86701e 100644 --- a/contrib/IBM/sarama.v1/sarama.go +++ b/contrib/IBM/sarama.v1/sarama.go @@ -7,8 +7,11 @@ package sarama // import "gopkg.in/DataDog/dd-trace-go.v1/contrib/IBM/sarama" import ( + "context" "math" + "gopkg.in/DataDog/dd-trace-go.v1/datastreams" + "gopkg.in/DataDog/dd-trace-go.v1/datastreams/options" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" @@ -76,6 +79,7 @@ func WrapPartitionConsumer(pc sarama.PartitionConsumer, opts ...Option) sarama.P next := tracer.StartSpan(cfg.consumerSpanName, opts...) // reinject the span context so consumers can pick it up tracer.Inject(next.Context(), carrier) + setConsumeCheckpoint(cfg.dataStreamsEnabled, cfg.groupID, msg) wrapped.messages <- msg @@ -127,8 +131,12 @@ type syncProducer struct { // SendMessage calls sarama.SyncProducer.SendMessage and traces the request. func (p *syncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) { span := startProducerSpan(p.cfg, p.version, msg) + setProduceCheckpoint(p.cfg.dataStreamsEnabled, msg, p.version) partition, offset, err = p.SyncProducer.SendMessage(msg) finishProducerSpan(span, partition, offset, err) + if err == nil && p.cfg.dataStreamsEnabled { + tracer.TrackKafkaProduceOffset(msg.Topic, partition, offset) + } return partition, offset, err } @@ -138,12 +146,19 @@ func (p *syncProducer) SendMessages(msgs []*sarama.ProducerMessage) error { // treated individually, so we create a span for each one spans := make([]ddtrace.Span, len(msgs)) for i, msg := range msgs { + setProduceCheckpoint(p.cfg.dataStreamsEnabled, msg, p.version) spans[i] = startProducerSpan(p.cfg, p.version, msg) } err := p.SyncProducer.SendMessages(msgs) for i, span := range spans { finishProducerSpan(span, msgs[i].Partition, msgs[i].Offset, err) } + if err == nil && p.cfg.dataStreamsEnabled { + // we only track Kafka lag if messages have been sent successfully. Otherwise, we have no way to know to which partition data was sent to. + for _, msg := range msgs { + tracer.TrackKafkaProduceOffset(msg.Topic, msg.Partition, msg.Offset) + } + } return err } @@ -221,6 +236,7 @@ func WrapAsyncProducer(saramaConfig *sarama.Config, p sarama.AsyncProducer, opts select { case msg := <-wrapped.input: span := startProducerSpan(cfg, saramaConfig.Version, msg) + setProduceCheckpoint(cfg.dataStreamsEnabled, msg, saramaConfig.Version) p.Input() <- msg if saramaConfig.Producer.Return.Successes { spanID := span.Context().SpanID() @@ -236,6 +252,10 @@ func WrapAsyncProducer(saramaConfig *sarama.Config, p sarama.AsyncProducer, opts // producer was closed, so exit return } + if cfg.dataStreamsEnabled { + // we only track Kafka lag if returning successes is enabled. Otherwise, we have no way to know to which partition data was sent to. + tracer.TrackKafkaProduceOffset(msg.Topic, msg.Partition, msg.Offset) + } if spanctx, spanFound := getSpanContext(msg); spanFound { spanID := spanctx.SpanID() if span, ok := spans[spanID]; ok { @@ -303,3 +323,57 @@ func getSpanContext(msg *sarama.ProducerMessage) (ddtrace.SpanContext, bool) { return spanctx, true } + +func setProduceCheckpoint(enabled bool, msg *sarama.ProducerMessage, version sarama.KafkaVersion) { + if !enabled || msg == nil { + return + } + edges := []string{"direction:out", "topic:" + msg.Topic, "type:kafka"} + carrier := NewProducerMessageCarrier(msg) + ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), carrier), options.CheckpointParams{PayloadSize: getProducerMsgSize(msg)}, edges...) + if !ok || !version.IsAtLeast(sarama.V0_11_0_0) { + return + } + datastreams.InjectToBase64Carrier(ctx, carrier) +} + +func setConsumeCheckpoint(enabled bool, groupID string, msg *sarama.ConsumerMessage) { + if !enabled || msg == nil { + return + } + edges := []string{"direction:in", "topic:" + msg.Topic, "type:kafka"} + if groupID != "" { + edges = append(edges, "group:"+groupID) + } + carrier := NewConsumerMessageCarrier(msg) + ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), carrier), options.CheckpointParams{PayloadSize: getConsumerMsgSize(msg)}, edges...) + if !ok { + return + } + datastreams.InjectToBase64Carrier(ctx, carrier) + if groupID != "" { + // only track Kafka lag if a consumer group is set. + // since there is no ack mechanism, we consider that messages read are committed right away. + tracer.TrackKafkaCommitOffset(groupID, msg.Topic, msg.Partition, msg.Offset) + } +} + +func getProducerMsgSize(msg *sarama.ProducerMessage) (size int64) { + for _, header := range msg.Headers { + size += int64(len(header.Key) + len(header.Value)) + } + if msg.Value != nil { + size += int64(msg.Value.Length()) + } + if msg.Key != nil { + size += int64(msg.Key.Length()) + } + return size +} + +func getConsumerMsgSize(msg *sarama.ConsumerMessage) (size int64) { + for _, header := range msg.Headers { + size += int64(len(header.Key) + len(header.Value)) + } + return size + int64(len(msg.Value)+len(msg.Key)) +} diff --git a/contrib/IBM/sarama.v1/sarama_test.go b/contrib/IBM/sarama.v1/sarama_test.go index f759640410..277da155af 100644 --- a/contrib/IBM/sarama.v1/sarama_test.go +++ b/contrib/IBM/sarama.v1/sarama_test.go @@ -11,6 +11,7 @@ import ( "time" "gopkg.in/DataDog/dd-trace-go.v1/contrib/internal/namingschematest" + "gopkg.in/DataDog/dd-trace-go.v1/datastreams" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" @@ -69,9 +70,7 @@ func genTestSpans(t *testing.T, serviceOverride string) []mocktracer.Span { require.NoError(t, err) pc, err := c.ConsumePartition("test-topic", 0, 0) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) _ = <-pc.Messages() err = pc.Close() require.NoError(t, err) @@ -103,32 +102,28 @@ func TestConsumer(t *testing.T) { }) cfg := sarama.NewConfig() cfg.Version = sarama.MinVersion + client, err := sarama.NewClient([]string{broker.Addr()}, cfg) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) defer client.Close() consumer, err := sarama.NewConsumerFromClient(client) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) defer consumer.Close() - consumer = WrapConsumer(consumer) + consumer = WrapConsumer(consumer, WithDataStreams()) partitionConsumer, err := consumer.ConsumePartition("test-topic", 0, 0) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) msg1 := <-partitionConsumer.Messages() msg2 := <-partitionConsumer.Messages() - partitionConsumer.Close() + err = partitionConsumer.Close() + require.NoError(t, err) // wait for the channel to be closed <-partitionConsumer.Messages() spans := mt.FinishedSpans() - assert.Len(t, spans, 2) + require.Len(t, spans, 2) { s := spans[0] spanctx, err := tracer.Extract(NewConsumerMessageCarrier(msg1)) @@ -145,6 +140,13 @@ func TestConsumer(t *testing.T) { assert.Equal(t, "IBM/sarama", s.Tag(ext.Component)) assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) + + p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewConsumerMessageCarrier(msg1))) + require.True(t, ok, "pathway not found in context") + expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:in", "topic:test-topic", "type:kafka") + expected, _ := datastreams.PathwayFromContext(expectedCtx) + assert.NotEqual(t, expected.GetHash(), 0) + assert.Equal(t, expected.GetHash(), p.GetHash()) } { s := spans[1] @@ -162,6 +164,13 @@ func TestConsumer(t *testing.T) { assert.Equal(t, "IBM/sarama", s.Tag(ext.Component)) assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) + + p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewConsumerMessageCarrier(msg2))) + require.True(t, ok, "pathway not found in context") + expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:in", "topic:test-topic", "type:kafka") + expected, _ := datastreams.PathwayFromContext(expectedCtx) + assert.NotEqual(t, expected.GetHash(), 0) + assert.Equal(t, expected.GetHash(), p.GetHash()) } } @@ -176,30 +185,31 @@ func TestSyncProducer(t *testing.T) { defer leader.Close() metadataResponse := new(sarama.MetadataResponse) + metadataResponse.Version = 1 metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) seedBroker.Returns(metadataResponse) prodSuccess := new(sarama.ProduceResponse) + prodSuccess.Version = 2 prodSuccess.AddTopicPartition("my_topic", 0, sarama.ErrNoError) leader.Returns(prodSuccess) cfg := sarama.NewConfig() - cfg.Version = sarama.MinVersion + cfg.Version = sarama.V0_11_0_0 // first version that supports headers cfg.Producer.Return.Successes = true producer, err := sarama.NewSyncProducer([]string{seedBroker.Addr()}, cfg) - if err != nil { - t.Fatal(err) - } - producer = WrapSyncProducer(cfg, producer) + require.NoError(t, err) + producer = WrapSyncProducer(cfg, producer, WithDataStreams()) msg1 := &sarama.ProducerMessage{ Topic: "my_topic", Value: sarama.StringEncoder("test 1"), Metadata: "test", } - producer.SendMessage(msg1) + _, _, err = producer.SendMessage(msg1) + require.NoError(t, err) spans := mt.FinishedSpans() assert.Len(t, spans, 1) @@ -214,6 +224,13 @@ func TestSyncProducer(t *testing.T) { assert.Equal(t, "IBM/sarama", s.Tag(ext.Component)) assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) + + p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewProducerMessageCarrier(msg1))) + require.True(t, ok, "pathway not found in context") + expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:my_topic", "type:kafka") + expected, _ := datastreams.PathwayFromContext(expectedCtx) + assert.NotEqual(t, expected.GetHash(), 0) + assert.Equal(t, expected.GetHash(), p.GetHash()) } } @@ -227,24 +244,24 @@ func TestSyncProducerSendMessages(t *testing.T) { defer leader.Close() metadataResponse := new(sarama.MetadataResponse) + metadataResponse.Version = 1 metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) seedBroker.Returns(metadataResponse) prodSuccess := new(sarama.ProduceResponse) + prodSuccess.Version = 2 prodSuccess.AddTopicPartition("my_topic", 0, sarama.ErrNoError) leader.Returns(prodSuccess) cfg := sarama.NewConfig() - cfg.Version = sarama.MinVersion + cfg.Version = sarama.V0_11_0_0 // first version that supports headers cfg.Producer.Return.Successes = true cfg.Producer.Flush.Messages = 2 producer, err := sarama.NewSyncProducer([]string{seedBroker.Addr()}, cfg) - if err != nil { - t.Fatal(err) - } - producer = WrapSyncProducer(cfg, producer) + require.NoError(t, err) + producer = WrapSyncProducer(cfg, producer, WithDataStreams()) msg1 := &sarama.ProducerMessage{ Topic: "my_topic", @@ -256,9 +273,11 @@ func TestSyncProducerSendMessages(t *testing.T) { Value: sarama.StringEncoder("test 2"), Metadata: "test", } - producer.SendMessages([]*sarama.ProducerMessage{msg1, msg2}) + err = producer.SendMessages([]*sarama.ProducerMessage{msg1, msg2}) + require.NoError(t, err) + spans := mt.FinishedSpans() - assert.Len(t, spans, 2) + require.Len(t, spans, 2) for _, s := range spans { assert.Equal(t, "kafka", s.Tag(ext.ServiceName)) assert.Equal(t, "queue", s.Tag(ext.SpanType)) @@ -269,14 +288,23 @@ func TestSyncProducerSendMessages(t *testing.T) { assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) } + + for _, msg := range []*sarama.ProducerMessage{msg1, msg2} { + p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewProducerMessageCarrier(msg))) + if !assert.True(t, ok, "pathway not found in context") { + continue + } + expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:my_topic", "type:kafka") + expected, _ := datastreams.PathwayFromContext(expectedCtx) + assert.NotEqual(t, expected.GetHash(), 0) + assert.Equal(t, expected.GetHash(), p.GetHash()) + } } func TestAsyncProducer(t *testing.T) { // the default for producers is a fire-and-forget model that doesn't return // successes t.Run("Without Successes", func(t *testing.T) { - t.Skip("Skipping test because sarama.MockBroker doesn't work with versions >= sarama.V0_11_0_0 " + - "https://github.com/IBM/sarama/issues/1665") mt := mocktracer.Start() defer mt.Stop() @@ -285,10 +313,8 @@ func TestAsyncProducer(t *testing.T) { cfg := sarama.NewConfig() cfg.Version = sarama.V0_11_0_0 producer, err := sarama.NewAsyncProducer([]string{broker.Addr()}, cfg) - if err != nil { - t.Fatal(err) - } - producer = WrapAsyncProducer(nil, producer) + require.NoError(t, err) + producer = WrapAsyncProducer(nil, producer, WithDataStreams()) msg1 := &sarama.ProducerMessage{ Topic: "my_topic", @@ -299,24 +325,33 @@ func TestAsyncProducer(t *testing.T) { waitForSpans(mt, 1) spans := mt.FinishedSpans() - assert.Len(t, spans, 1) + require.Len(t, spans, 1) { s := spans[0] assert.Equal(t, "kafka", s.Tag(ext.ServiceName)) assert.Equal(t, "queue", s.Tag(ext.SpanType)) assert.Equal(t, "Produce Topic my_topic", s.Tag(ext.ResourceName)) assert.Equal(t, "kafka.produce", s.OperationName()) - assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition)) - assert.Equal(t, int64(0), s.Tag("offset")) + + // these tags are set in the finishProducerSpan function, but in this case it's never used, and instead we + // automatically finish spans after being started because we don't have a way to know when they are finished. + assert.Nil(t, s.Tag(ext.MessagingKafkaPartition)) + assert.Nil(t, s.Tag("offset")) + assert.Equal(t, "IBM/sarama", s.Tag(ext.Component)) assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) + + p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewProducerMessageCarrier(msg1))) + require.True(t, ok, "pathway not found in context") + expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:my_topic", "type:kafka") + expected, _ := datastreams.PathwayFromContext(expectedCtx) + assert.NotEqual(t, expected.GetHash(), 0) + assert.Equal(t, expected.GetHash(), p.GetHash()) } }) t.Run("With Successes", func(t *testing.T) { - t.Skip("Skipping test because sarama.MockBroker doesn't work with versions >= sarama.V0_11_0_0 " + - "https://github.com/IBM/sarama/issues/1665") mt := mocktracer.Start() defer mt.Stop() @@ -327,10 +362,8 @@ func TestAsyncProducer(t *testing.T) { cfg.Producer.Return.Successes = true producer, err := sarama.NewAsyncProducer([]string{broker.Addr()}, cfg) - if err != nil { - t.Fatal(err) - } - producer = WrapAsyncProducer(cfg, producer) + require.NoError(t, err) + producer = WrapAsyncProducer(cfg, producer, WithDataStreams()) msg1 := &sarama.ProducerMessage{ Topic: "my_topic", @@ -340,7 +373,7 @@ func TestAsyncProducer(t *testing.T) { <-producer.Successes() spans := mt.FinishedSpans() - assert.Len(t, spans, 1) + require.Len(t, spans, 1) { s := spans[0] assert.Equal(t, "kafka", s.Tag(ext.ServiceName)) @@ -352,6 +385,13 @@ func TestAsyncProducer(t *testing.T) { assert.Equal(t, "IBM/sarama", s.Tag(ext.Component)) assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) + + p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewProducerMessageCarrier(msg1))) + require.True(t, ok, "pathway not found in context") + expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:my_topic", "type:kafka") + expected, _ := datastreams.PathwayFromContext(expectedCtx) + assert.NotEqual(t, expected.GetHash(), 0) + assert.Equal(t, expected.GetHash(), p.GetHash()) } }) } @@ -364,11 +404,13 @@ func newMockBroker(t *testing.T) *sarama.MockBroker { broker := sarama.NewMockBroker(t, 1) metadataResponse := new(sarama.MetadataResponse) + metadataResponse.Version = 1 metadataResponse.AddBroker(broker.Addr(), broker.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, sarama.ErrNoError) broker.Returns(metadataResponse) prodSuccess := new(sarama.ProduceResponse) + prodSuccess.Version = 2 prodSuccess.AddTopicPartition("my_topic", 0, sarama.ErrNoError) for i := 0; i < 10; i++ { broker.Returns(prodSuccess) diff --git a/contrib/Shopify/sarama/sarama.go b/contrib/Shopify/sarama/sarama.go index f1e5b9fee1..5b96be10a7 100644 --- a/contrib/Shopify/sarama/sarama.go +++ b/contrib/Shopify/sarama/sarama.go @@ -4,6 +4,9 @@ // Copyright 2016 Datadog, Inc. // Package sarama provides functions to trace the Shopify/sarama package (https://github.com/Shopify/sarama). +// +// Deprecated: github.com/Shopify/sarama is no longer maintained. Please migrate to github.com/IBM/sarama +// and use the corresponding instrumentation. package sarama // import "gopkg.in/DataDog/dd-trace-go.v1/contrib/Shopify/sarama" import ( diff --git a/contrib/Shopify/sarama/sarama_test.go b/contrib/Shopify/sarama/sarama_test.go index 4237f6e764..0cd54c045d 100644 --- a/contrib/Shopify/sarama/sarama_test.go +++ b/contrib/Shopify/sarama/sarama_test.go @@ -70,9 +70,7 @@ func genTestSpans(t *testing.T, serviceOverride string) []mocktracer.Span { require.NoError(t, err) pc, err := c.ConsumePartition("test-topic", 0, 0) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) _ = <-pc.Messages() err = pc.Close() require.NoError(t, err) @@ -104,32 +102,28 @@ func TestConsumer(t *testing.T) { }) cfg := sarama.NewConfig() cfg.Version = sarama.MinVersion + client, err := sarama.NewClient([]string{broker.Addr()}, cfg) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) defer client.Close() consumer, err := sarama.NewConsumerFromClient(client) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) defer consumer.Close() consumer = WrapConsumer(consumer, WithDataStreams()) partitionConsumer, err := consumer.ConsumePartition("test-topic", 0, 0) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) msg1 := <-partitionConsumer.Messages() msg2 := <-partitionConsumer.Messages() - partitionConsumer.Close() + err = partitionConsumer.Close() + require.NoError(t, err) // wait for the channel to be closed <-partitionConsumer.Messages() spans := mt.FinishedSpans() - assert.Len(t, spans, 2) + require.Len(t, spans, 2) { s := spans[0] spanctx, err := tracer.Extract(NewConsumerMessageCarrier(msg1)) @@ -146,8 +140,9 @@ func TestConsumer(t *testing.T) { assert.Equal(t, "Shopify/sarama", s.Tag(ext.Component)) assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) + p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewConsumerMessageCarrier(msg1))) - assert.True(t, ok) + require.True(t, ok, "pathway not found in context") expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:in", "topic:test-topic", "type:kafka") expected, _ := datastreams.PathwayFromContext(expectedCtx) assert.NotEqual(t, expected.GetHash(), 0) @@ -169,8 +164,9 @@ func TestConsumer(t *testing.T) { assert.Equal(t, "Shopify/sarama", s.Tag(ext.Component)) assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) - p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewConsumerMessageCarrier(msg1))) - assert.True(t, ok) + + p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewConsumerMessageCarrier(msg2))) + require.True(t, ok, "pathway not found in context") expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:in", "topic:test-topic", "type:kafka") expected, _ := datastreams.PathwayFromContext(expectedCtx) assert.NotEqual(t, expected.GetHash(), 0) @@ -204,9 +200,7 @@ func TestSyncProducer(t *testing.T) { cfg.Producer.Return.Successes = true producer, err := sarama.NewSyncProducer([]string{seedBroker.Addr()}, cfg) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) producer = WrapSyncProducer(cfg, producer, WithDataStreams()) msg1 := &sarama.ProducerMessage{ @@ -214,7 +208,8 @@ func TestSyncProducer(t *testing.T) { Value: sarama.StringEncoder("test 1"), Metadata: "test", } - producer.SendMessage(msg1) + _, _, err = producer.SendMessage(msg1) + require.NoError(t, err) spans := mt.FinishedSpans() assert.Len(t, spans, 1) @@ -229,8 +224,9 @@ func TestSyncProducer(t *testing.T) { assert.Equal(t, "Shopify/sarama", s.Tag(ext.Component)) assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) + p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewProducerMessageCarrier(msg1))) - assert.True(t, ok) + require.True(t, ok, "pathway not found in context") expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:my_topic", "type:kafka") expected, _ := datastreams.PathwayFromContext(expectedCtx) assert.NotEqual(t, expected.GetHash(), 0) @@ -248,24 +244,24 @@ func TestSyncProducerSendMessages(t *testing.T) { defer leader.Close() metadataResponse := new(sarama.MetadataResponse) + metadataResponse.Version = 1 metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) seedBroker.Returns(metadataResponse) prodSuccess := new(sarama.ProduceResponse) + prodSuccess.Version = 2 prodSuccess.AddTopicPartition("my_topic", 0, sarama.ErrNoError) leader.Returns(prodSuccess) cfg := sarama.NewConfig() - cfg.Version = sarama.MinVersion + cfg.Version = sarama.V0_11_0_0 // first version that supports headers cfg.Producer.Return.Successes = true cfg.Producer.Flush.Messages = 2 producer, err := sarama.NewSyncProducer([]string{seedBroker.Addr()}, cfg) - if err != nil { - t.Fatal(err) - } - producer = WrapSyncProducer(cfg, producer) + require.NoError(t, err) + producer = WrapSyncProducer(cfg, producer, WithDataStreams()) msg1 := &sarama.ProducerMessage{ Topic: "my_topic", @@ -277,9 +273,11 @@ func TestSyncProducerSendMessages(t *testing.T) { Value: sarama.StringEncoder("test 2"), Metadata: "test", } - producer.SendMessages([]*sarama.ProducerMessage{msg1, msg2}) + err = producer.SendMessages([]*sarama.ProducerMessage{msg1, msg2}) + require.NoError(t, err) + spans := mt.FinishedSpans() - assert.Len(t, spans, 2) + require.Len(t, spans, 2) for _, s := range spans { assert.Equal(t, "kafka", s.Tag(ext.ServiceName)) assert.Equal(t, "queue", s.Tag(ext.SpanType)) @@ -290,14 +288,23 @@ func TestSyncProducerSendMessages(t *testing.T) { assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) } + + for _, msg := range []*sarama.ProducerMessage{msg1, msg2} { + p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewProducerMessageCarrier(msg))) + if !assert.True(t, ok, "pathway not found in context") { + continue + } + expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:my_topic", "type:kafka") + expected, _ := datastreams.PathwayFromContext(expectedCtx) + assert.NotEqual(t, expected.GetHash(), 0) + assert.Equal(t, expected.GetHash(), p.GetHash()) + } } func TestAsyncProducer(t *testing.T) { // the default for producers is a fire-and-forget model that doesn't return // successes t.Run("Without Successes", func(t *testing.T) { - t.Skip("Skipping test because sarama.MockBroker doesn't work with versions >= sarama.V0_11_0_0 " + - "https://github.com/Shopify/sarama/issues/1665") mt := mocktracer.Start() defer mt.Stop() @@ -306,10 +313,8 @@ func TestAsyncProducer(t *testing.T) { cfg := sarama.NewConfig() cfg.Version = sarama.V0_11_0_0 producer, err := sarama.NewAsyncProducer([]string{broker.Addr()}, cfg) - if err != nil { - t.Fatal(err) - } - producer = WrapAsyncProducer(nil, producer) + require.NoError(t, err) + producer = WrapAsyncProducer(nil, producer, WithDataStreams()) msg1 := &sarama.ProducerMessage{ Topic: "my_topic", @@ -320,24 +325,33 @@ func TestAsyncProducer(t *testing.T) { waitForSpans(mt, 1) spans := mt.FinishedSpans() - assert.Len(t, spans, 1) + require.Len(t, spans, 1) { s := spans[0] assert.Equal(t, "kafka", s.Tag(ext.ServiceName)) assert.Equal(t, "queue", s.Tag(ext.SpanType)) assert.Equal(t, "Produce Topic my_topic", s.Tag(ext.ResourceName)) assert.Equal(t, "kafka.produce", s.OperationName()) - assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition)) - assert.Equal(t, int64(0), s.Tag("offset")) + + // these tags are set in the finishProducerSpan function, but in this case it's never used, and instead we + // automatically finish spans after being started because we don't have a way to know when they are finished. + assert.Nil(t, s.Tag(ext.MessagingKafkaPartition)) + assert.Nil(t, s.Tag("offset")) + assert.Equal(t, "Shopify/sarama", s.Tag(ext.Component)) assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) + + p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewProducerMessageCarrier(msg1))) + require.True(t, ok, "pathway not found in context") + expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:my_topic", "type:kafka") + expected, _ := datastreams.PathwayFromContext(expectedCtx) + assert.NotEqual(t, expected.GetHash(), 0) + assert.Equal(t, expected.GetHash(), p.GetHash()) } }) t.Run("With Successes", func(t *testing.T) { - t.Skip("Skipping test because sarama.MockBroker doesn't work with versions >= sarama.V0_11_0_0 " + - "https://github.com/Shopify/sarama/issues/1665") mt := mocktracer.Start() defer mt.Stop() @@ -348,10 +362,8 @@ func TestAsyncProducer(t *testing.T) { cfg.Producer.Return.Successes = true producer, err := sarama.NewAsyncProducer([]string{broker.Addr()}, cfg) - if err != nil { - t.Fatal(err) - } - producer = WrapAsyncProducer(cfg, producer) + require.NoError(t, err) + producer = WrapAsyncProducer(cfg, producer, WithDataStreams()) msg1 := &sarama.ProducerMessage{ Topic: "my_topic", @@ -361,7 +373,7 @@ func TestAsyncProducer(t *testing.T) { <-producer.Successes() spans := mt.FinishedSpans() - assert.Len(t, spans, 1) + require.Len(t, spans, 1) { s := spans[0] assert.Equal(t, "kafka", s.Tag(ext.ServiceName)) @@ -373,6 +385,13 @@ func TestAsyncProducer(t *testing.T) { assert.Equal(t, "Shopify/sarama", s.Tag(ext.Component)) assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) + + p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewProducerMessageCarrier(msg1))) + require.True(t, ok, "pathway not found in context") + expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:my_topic", "type:kafka") + expected, _ := datastreams.PathwayFromContext(expectedCtx) + assert.NotEqual(t, expected.GetHash(), 0) + assert.Equal(t, expected.GetHash(), p.GetHash()) } }) } @@ -385,11 +404,13 @@ func newMockBroker(t *testing.T) *sarama.MockBroker { broker := sarama.NewMockBroker(t, 1) metadataResponse := new(sarama.MetadataResponse) + metadataResponse.Version = 1 metadataResponse.AddBroker(broker.Addr(), broker.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, sarama.ErrNoError) broker.Returns(metadataResponse) prodSuccess := new(sarama.ProduceResponse) + prodSuccess.Version = 2 prodSuccess.AddTopicPartition("my_topic", 0, sarama.ErrNoError) for i := 0; i < 10; i++ { broker.Returns(prodSuccess)