diff --git a/libbeat/outputs/otelconsumer/otelconsumer.go b/libbeat/outputs/otelconsumer/otelconsumer.go index 2e34d1f60c31..7ccbec6cf402 100644 --- a/libbeat/outputs/otelconsumer/otelconsumer.go +++ b/libbeat/outputs/otelconsumer/otelconsumer.go @@ -117,7 +117,13 @@ func (out *otelConsumer) logsPublish(ctx context.Context, batch publisher.Batch) beatEvent["@timestamp"] = event.Content.Timestamp logRecord.SetTimestamp(pcommon.NewTimestampFromTime(event.Content.Timestamp)) pcommonEvent := mapstrToPcommonMap(beatEvent) + // if data_stream field is set on beats.Event. Add it to logrecord.Attributes to support dynamic indexing + if data, ok := pcommonEvent.Get("data_stream"); ok { + value := logRecord.Attributes().PutEmpty("data_stream") + data.CopyTo(value) + } pcommonEvent.CopyTo(logRecord.Body().SetEmptyMap()) + } err := out.logsConsumer.ConsumeLogs(ctx, pLogs) diff --git a/libbeat/outputs/otelconsumer/otelconsumer_test.go b/libbeat/outputs/otelconsumer/otelconsumer_test.go index bcb7253023c8..ffc8b7b3dd95 100644 --- a/libbeat/outputs/otelconsumer/otelconsumer_test.go +++ b/libbeat/outputs/otelconsumer/otelconsumer_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/pdata/pcommon" @@ -77,6 +78,45 @@ func TestPublish(t *testing.T) { assert.Equal(t, len(batch.Events()), countLogs, "all events should be consumed") }) + t.Run("data_stream fields are set on logrecord.Attribute", func(t *testing.T) { + dataStreamField := mapstr.M{ + "type": "logs", + "namespace": "not_default", + "dataset": "not_elastic_agent", + } + event1.Fields["data_stream"] = dataStreamField + + batch := outest.NewBatch(event1) + + var countLogs int + var attributes pcommon.Map + otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error { + countLogs = countLogs + ld.LogRecordCount() + for i := 0; i < ld.ResourceLogs().Len(); i++ { + resourceLog := ld.ResourceLogs().At(i) + for j := 0; j < resourceLog.ScopeLogs().Len(); j++ { + scopeLog := resourceLog.ScopeLogs().At(j) + for k := 0; k < scopeLog.LogRecords().Len(); k++ { + LogRecord := scopeLog.LogRecords().At(k) + attributes = LogRecord.Attributes() + } + } + } + return nil + }) + + err := otelConsumer.Publish(ctx, batch) + assert.NoError(t, err) + assert.Len(t, batch.Signals, 1) + assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag) + pcommonFields := mapstrToPcommonMap(event1.Fields) + want, ok := pcommonFields.Get("data_stream") + require.True(t, ok) + got, ok := attributes.Get("data_stream") + require.True(t, ok) + assert.EqualValues(t, want.AsRaw(), got.AsRaw()) + }) + t.Run("retries the batch on non-permanent consumer error", func(t *testing.T) { batch := outest.NewBatch(event1, event2, event3)