Skip to content

Commit

Permalink
[chore] Move obsevability for queues as a Queue wrapper (#12235)
Browse files Browse the repository at this point in the history
This PR also removes a test only hack for num workers being set to -1.

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Feb 1, 2025
1 parent 51672ad commit fc9dd8f
Show file tree
Hide file tree
Showing 20 changed files with 369 additions and 331 deletions.
24 changes: 12 additions & 12 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ type BaseExporter struct {
Marshaler exporterqueue.Marshaler[internal.Request]
Unmarshaler exporterqueue.Unmarshaler[internal.Request]

Set exporter.Settings
Obsrep *ObsReport
Set exporter.Settings

// Message for the user to be added with an export failure message.
ExportFailureMessage string
Expand Down Expand Up @@ -79,8 +78,7 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
RetrySender: &BaseSender[internal.Request]{},
TimeoutSender: &TimeoutSender{cfg: NewDefaultTimeoutConfig()},

Set: set,
Obsrep: obsReport,
Set: set,
}

for _, op := range options {
Expand All @@ -91,14 +89,16 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
}

if be.queueCfg.Enabled {
q := be.queueFactory(
context.Background(),
exporterqueue.Settings{
Signal: signal,
ExporterSettings: be.Set,
},
be.queueCfg)
be.QueueSender = NewQueueSender(q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep, be.BatcherCfg)
qSet := exporterqueue.Settings{
Signal: signal,
ExporterSettings: be.Set,
}
q := be.queueFactory(context.Background(), qSet, be.queueCfg)
q, err = newObsQueue(qSet, q)
if err != nil {
return nil, err
}
be.QueueSender = NewQueueSender(q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.BatcherCfg)
}

if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled ||
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

104 changes: 104 additions & 0 deletions exporter/exporterhelper/internal/obs_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal"

import (
"context"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/internal"
"go.opentelemetry.io/collector/pipeline"
)

// obsQueue is a helper to add observability to a queue.
type obsQueue[T internal.Request] struct {
delegate exporterqueue.Queue[T]
tb *metadata.TelemetryBuilder
metricAttr metric.MeasurementOption
enqueueFailedInst metric.Int64Counter
}

func newObsQueue[T internal.Request](set exporterqueue.Settings, delegate exporterqueue.Queue[T]) (exporterqueue.Queue[T], error) {
tb, err := metadata.NewTelemetryBuilder(set.ExporterSettings.TelemetrySettings)
if err != nil {
return nil, err
}

exporterAttr := attribute.String(ExporterKey, set.ExporterSettings.ID.String())
asyncAttr := metric.WithAttributeSet(attribute.NewSet(exporterAttr, attribute.String(DataTypeKey, set.Signal.String())))
err = tb.RegisterExporterQueueSizeCallback(func(_ context.Context, o metric.Int64Observer) error {
o.Observe(delegate.Size(), asyncAttr)
return nil
})
if err != nil {
return nil, err
}

err = tb.RegisterExporterQueueCapacityCallback(func(_ context.Context, o metric.Int64Observer) error {
o.Observe(delegate.Capacity(), asyncAttr)
return nil
})
if err != nil {
return nil, err
}

or := &obsQueue[T]{
delegate: delegate,
tb: tb,
metricAttr: metric.WithAttributeSet(attribute.NewSet(exporterAttr)),
}

switch set.Signal {
case pipeline.SignalTraces:
or.enqueueFailedInst = tb.ExporterEnqueueFailedSpans
case pipeline.SignalMetrics:
or.enqueueFailedInst = tb.ExporterEnqueueFailedMetricPoints
case pipeline.SignalLogs:
or.enqueueFailedInst = tb.ExporterEnqueueFailedLogRecords
}

return or, nil
}

func (or *obsQueue[T]) Start(ctx context.Context, host component.Host) error {
return or.delegate.Start(ctx, host)
}

func (or *obsQueue[T]) Shutdown(ctx context.Context) error {
defer or.tb.Shutdown()
return or.delegate.Shutdown(ctx)
}

func (or *obsQueue[T]) Offer(ctx context.Context, req T) error {
numItems := req.ItemsCount()
err := or.delegate.Offer(ctx, req)
// No metrics recorded for profiles, remove enqueueFailedInst check with nil when profiles metrics available.
if err != nil && or.enqueueFailedInst != nil {
or.enqueueFailedInst.Add(ctx, int64(numItems), or.metricAttr)
}
return err
}

// Size returns the current Size of the queue
func (or *obsQueue[T]) Size() int64 {
return or.delegate.Size()
}

// Capacity returns the capacity of the queue.
func (or *obsQueue[T]) Capacity() int64 {
return or.delegate.Capacity()
}

// Read pulls the next available item from the queue along with its done callback. Once processing is
// finished, the done callback must be called to clean up the storage.
// The function blocks until an item is available or if the queue is stopped.
// If the queue is stopped returns false, otherwise true.
func (or *obsQueue[T]) Read(ctx context.Context) (context.Context, T, exporterqueue.DoneCallback, bool) {
return or.delegate.Read(ctx)
}
202 changes: 202 additions & 0 deletions exporter/exporterhelper/internal/obs_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadatatest"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/internal"
"go.opentelemetry.io/collector/exporter/internal/requesttest"
"go.opentelemetry.io/collector/pipeline"
)

type fakeQueue[T any] struct {
component.StartFunc
component.ShutdownFunc
offerErr error
size int64
capacity int64
}

func (fq *fakeQueue[T]) Size() int64 {
return fq.size
}

func (fq *fakeQueue[T]) Capacity() int64 {
return fq.capacity
}

func (fq *fakeQueue[T]) Read(context.Context) (context.Context, T, exporterqueue.DoneCallback, bool) {
panic("implement me")
}

func (fq *fakeQueue[T]) Offer(context.Context, T) error {
return fq.offerErr
}

func newFakeQueue[T internal.Request](offerErr error, size, capacity int64) exporterqueue.Queue[T] {
return &fakeQueue[T]{offerErr: offerErr, size: size, capacity: capacity}
}

func TestObsQueueLogsSizeCapacity(t *testing.T) {
tt := componenttest.NewTelemetry()
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

te, err := newObsQueue[internal.Request](exporterqueue.Settings{
Signal: pipeline.SignalLogs,
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
}, newFakeQueue[internal.Request](nil, 7, 9))
require.NoError(t, err)
require.NoError(t, te.Offer(context.Background(), &requesttest.FakeRequest{Items: 2}))
metadatatest.AssertEqualExporterQueueSize(t, tt,
[]metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(
attribute.String("exporter", exporterID.String()),
attribute.String(DataTypeKey, pipeline.SignalLogs.String())),
Value: int64(7),
},
}, metricdatatest.IgnoreTimestamp())
metadatatest.AssertEqualExporterQueueCapacity(t, tt,
[]metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(
attribute.String("exporter", exporterID.String()),
attribute.String(DataTypeKey, pipeline.SignalLogs.String())),
Value: int64(9),
},
}, metricdatatest.IgnoreTimestamp())
}

func TestObsQueueLogsFailure(t *testing.T) {
tt := componenttest.NewTelemetry()
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

te, err := newObsQueue[internal.Request](exporterqueue.Settings{
Signal: pipeline.SignalLogs,
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
}, newFakeQueue[internal.Request](errors.New("my error"), 7, 9))
require.NoError(t, err)
require.Error(t, te.Offer(context.Background(), &requesttest.FakeRequest{Items: 2}))
metadatatest.AssertEqualExporterEnqueueFailedLogRecords(t, tt,
[]metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(
attribute.String("exporter", exporterID.String())),
Value: int64(2),
},
}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
}

func TestObsQueueTracesSizeCapacity(t *testing.T) {
tt := componenttest.NewTelemetry()
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

te, err := newObsQueue[internal.Request](exporterqueue.Settings{
Signal: pipeline.SignalTraces,
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
}, newFakeQueue[internal.Request](nil, 17, 19))
require.NoError(t, err)
require.NoError(t, te.Offer(context.Background(), &requesttest.FakeRequest{Items: 12}))
metadatatest.AssertEqualExporterQueueSize(t, tt,
[]metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(
attribute.String("exporter", exporterID.String()),
attribute.String(DataTypeKey, pipeline.SignalTraces.String())),
Value: int64(17),
},
}, metricdatatest.IgnoreTimestamp())
metadatatest.AssertEqualExporterQueueCapacity(t, tt,
[]metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(
attribute.String("exporter", exporterID.String()),
attribute.String(DataTypeKey, pipeline.SignalTraces.String())),
Value: int64(19),
},
}, metricdatatest.IgnoreTimestamp())
}

func TestObsQueueTracesFailure(t *testing.T) {
tt := componenttest.NewTelemetry()
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

te, err := newObsQueue[internal.Request](exporterqueue.Settings{
Signal: pipeline.SignalTraces,
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
}, newFakeQueue[internal.Request](errors.New("my error"), 0, 0))
require.NoError(t, err)
require.Error(t, te.Offer(context.Background(), &requesttest.FakeRequest{Items: 12}))
metadatatest.AssertEqualExporterEnqueueFailedSpans(t, tt,
[]metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(
attribute.String("exporter", exporterID.String())),
Value: int64(12),
},
}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
}

func TestObsQueueMetrics(t *testing.T) {
tt := componenttest.NewTelemetry()
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

te, err := newObsQueue[internal.Request](exporterqueue.Settings{
Signal: pipeline.SignalMetrics,
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
}, newFakeQueue[internal.Request](nil, 27, 29))
require.NoError(t, err)
require.NoError(t, te.Offer(context.Background(), &requesttest.FakeRequest{Items: 22}))
metadatatest.AssertEqualExporterQueueSize(t, tt,
[]metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(
attribute.String("exporter", exporterID.String()),
attribute.String(DataTypeKey, pipeline.SignalMetrics.String())),
Value: int64(27),
},
}, metricdatatest.IgnoreTimestamp())
metadatatest.AssertEqualExporterQueueCapacity(t, tt,
[]metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(
attribute.String("exporter", exporterID.String()),
attribute.String(DataTypeKey, pipeline.SignalMetrics.String())),
Value: int64(29),
},
}, metricdatatest.IgnoreTimestamp())
}

func TestObsQueueMetricsFailure(t *testing.T) {
tt := componenttest.NewTelemetry()
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

te, err := newObsQueue[internal.Request](exporterqueue.Settings{
Signal: pipeline.SignalMetrics,
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
}, newFakeQueue[internal.Request](errors.New("my error"), 0, 0))
require.NoError(t, err)
require.Error(t, te.Offer(context.Background(), &requesttest.FakeRequest{Items: 22}))
metadatatest.AssertEqualExporterEnqueueFailedMetricPoints(t, tt,
[]metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(
attribute.String("exporter", exporterID.String())),
Value: int64(22),
},
}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
}
Loading

0 comments on commit fc9dd8f

Please sign in to comment.