Skip to content

Commit

Permalink
[otelconsumer]: Add fields on logrecord attributes to support dynamic…
Browse files Browse the repository at this point in the history
… indexing (#42649)

* [otelconsumer]: Add  fields on logrecord attributes to support dynamic indexing on ES exporter

* fix flakiness of this test

* fix flakiness of this test
  • Loading branch information
khushijain21 authored Feb 10, 2025
1 parent 8122c43 commit c338f21
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 0 deletions.
6 changes: 6 additions & 0 deletions libbeat/outputs/otelconsumer/otelconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,13 @@ func (out *otelConsumer) logsPublish(ctx context.Context, batch publisher.Batch)
beatEvent["@timestamp"] = event.Content.Timestamp
logRecord.SetTimestamp(pcommon.NewTimestampFromTime(event.Content.Timestamp))
pcommonEvent := mapstrToPcommonMap(beatEvent)
// if data_stream field is set on beats.Event. Add it to logrecord.Attributes to support dynamic indexing
if data, ok := pcommonEvent.Get("data_stream"); ok {
value := logRecord.Attributes().PutEmpty("data_stream")
data.CopyTo(value)
}
pcommonEvent.CopyTo(logRecord.Body().SetEmptyMap())

}

err := out.logsConsumer.ConsumeLogs(ctx, pLogs)
Expand Down
40 changes: 40 additions & 0 deletions libbeat/outputs/otelconsumer/otelconsumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/pdata/pcommon"
Expand Down Expand Up @@ -77,6 +78,45 @@ func TestPublish(t *testing.T) {
assert.Equal(t, len(batch.Events()), countLogs, "all events should be consumed")
})

t.Run("data_stream fields are set on logrecord.Attribute", func(t *testing.T) {
dataStreamField := mapstr.M{
"type": "logs",
"namespace": "not_default",
"dataset": "not_elastic_agent",
}
event1.Fields["data_stream"] = dataStreamField

batch := outest.NewBatch(event1)

var countLogs int
var attributes pcommon.Map
otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error {
countLogs = countLogs + ld.LogRecordCount()
for i := 0; i < ld.ResourceLogs().Len(); i++ {
resourceLog := ld.ResourceLogs().At(i)
for j := 0; j < resourceLog.ScopeLogs().Len(); j++ {
scopeLog := resourceLog.ScopeLogs().At(j)
for k := 0; k < scopeLog.LogRecords().Len(); k++ {
LogRecord := scopeLog.LogRecords().At(k)
attributes = LogRecord.Attributes()
}
}
}
return nil
})

err := otelConsumer.Publish(ctx, batch)
assert.NoError(t, err)
assert.Len(t, batch.Signals, 1)
assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag)
pcommonFields := mapstrToPcommonMap(event1.Fields)
want, ok := pcommonFields.Get("data_stream")
require.True(t, ok)
got, ok := attributes.Get("data_stream")
require.True(t, ok)
assert.EqualValues(t, want.AsRaw(), got.AsRaw())
})

t.Run("retries the batch on non-permanent consumer error", func(t *testing.T) {
batch := outest.NewBatch(event1, event2, event3)

Expand Down

0 comments on commit c338f21

Please sign in to comment.