From eb4f6400d7ca9f36dd4bca9d9d642d0168893766 Mon Sep 17 00:00:00 2001
From: Bogdan Drutu
Date: Sat, 1 Feb 2025 20:44:57 -0800
Subject: [PATCH] [chore] Refactor RetrySender tests to be real unit-tests
 (#12240)

Signed-off-by: Bogdan Drutu
---
 .../internal/base_exporter_test.go            |   8 +-
 .../internal/queue_sender_test.go             |  16 +-
 .../exporterhelper/internal/retry_sender.go   |   2 +-
 .../internal/retry_sender_test.go             | 491 +++++------------
 exporter/internal/requesttest/fake_request.go |  37 +-
 5 files changed, 172 insertions(+), 382 deletions(-)

diff --git a/exporter/exporterhelper/internal/base_exporter_test.go b/exporter/exporterhelper/internal/base_exporter_test.go
index 11dc47b4d00..d71202ac3db 100644
--- a/exporter/exporterhelper/internal/base_exporter_test.go
+++ b/exporter/exporterhelper/internal/base_exporter_test.go
@@ -39,6 +39,12 @@ type noopSender struct {
     SendFunc[internal.Request]
 }

+func newNoopExportSender() Sender[internal.Request] {
+    return &noopSender{SendFunc: func(ctx context.Context, req internal.Request) error {
+        return req.Export(ctx)
+    }}
+}
+
 func newNoopObsrepSender(_ *ObsReport, next Sender[internal.Request]) Sender[internal.Request] {
     return &noopSender{SendFunc: next.Send}
 }
@@ -112,7 +118,7 @@ func TestBaseExporterLogging(t *testing.T) {
     rCfg.Enabled = false
     bs, err := NewBaseExporter(set, defaultSignal, newNoopObsrepSender, WithRetry(rCfg))
     require.NoError(t, err)
-    sendErr := bs.Send(context.Background(), newErrorRequest())
+    sendErr := bs.Send(context.Background(), newErrorRequest(errors.New("my error")))
     require.Error(t, sendErr)

     require.Len(t, observed.FilterLevelExact(zap.ErrorLevel).All(), 1)
diff --git a/exporter/exporterhelper/internal/queue_sender_test.go b/exporter/exporterhelper/internal/queue_sender_test.go
index 8bc9a080619..fae3d8256d8 100644
--- a/exporter/exporterhelper/internal/queue_sender_test.go
+++ b/exporter/exporterhelper/internal/queue_sender_test.go
@@ -40,7 +40,7 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) {
     ocs := be.ObsrepSender.(*observabilityConsumerSender)
     require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))

-    firstMockR := newErrorRequest()
+    firstMockR := newErrorRequest(errors.New("transient error"))
     ocs.run(func() {
         // This is asynchronous so it should just enqueue, no errors expected.
         require.NoError(t, be.Send(context.Background(), firstMockR))
@@ -57,7 +57,7 @@

     require.NoError(t, be.Shutdown(context.Background()))

-    secondMockR.checkNumRequests(t, 1)
+    secondMockR.checkOneRequests(t)
     ocs.checkSendItemsCount(t, 3)
     ocs.checkDroppedItemsCount(t, 7)
     require.Zero(t, be.QueueSender.(*QueueSender).queue.Size())
@@ -94,7 +94,7 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) {
     })
     ocs.awaitAsyncProcessing()

-    mockR.checkNumRequests(t, 1)
+    mockR.checkOneRequests(t)
     ocs.checkSendItemsCount(t, 2)
     ocs.checkDroppedItemsCount(t, 0)
     require.Zero(t, be.QueueSender.(*QueueSender).queue.Size())
@@ -228,7 +228,7 @@ func TestQueuedRetryHappyPath(t *testing.T) {

     require.Len(t, reqs, wantRequests)
     for _, req := range reqs {
-        req.checkNumRequests(t, 1)
+        req.checkOneRequests(t)
     }

     ocs.checkSendItemsCount(t, 2*wantRequests)
@@ -340,7 +340,7 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
     assert.Len(t, observed.All(), 1)
     assert.Equal(t, "Exporting failed. Rejecting data. Try enabling sending_queue to survive temporary failures.", observed.All()[0].Message)
     ocs.awaitAsyncProcessing()
-    mockR.checkNumRequests(t, 1)
+    mockR.checkOneRequests(t)
     ocs.checkSendItemsCount(t, 0)
     ocs.checkDroppedItemsCount(t, 2)
     require.NoError(t, be.Shutdown(context.Background()))
@@ -366,7 +366,7 @@ func TestQueueFailedRequestDropped(t *testing.T) {
     mockR := newMockRequest(2, errors.New("some error"))
     require.NoError(t, be.Send(context.Background(), mockR))
     require.NoError(t, be.Shutdown(context.Background()))
-    mockR.checkNumRequests(t, 1)
+    mockR.checkOneRequests(t)
     assert.Len(t, observed.All(), 1)
     assert.Equal(t, "Exporting failed. Dropping data.", observed.All()[0].Message)
     })
@@ -454,7 +454,7 @@ func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) {
     rCfg.InitialInterval = time.Millisecond
     rCfg.MaxElapsedTime = 0 // retry infinitely so shutdown can be triggered

-    mockReq := newErrorRequest()
+    mockReq := newErrorRequest(errors.New("transient error"))
     be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender,
         WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(mockReq)), WithRetry(rCfg), WithQueue(qCfg))
     require.NoError(t, err)
@@ -489,7 +489,7 @@ func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) {
     })

     // wait for the item to be consumed from the queue
-    replacedReq.checkNumRequests(t, 1)
+    replacedReq.checkOneRequests(t)
     })
 }
 runTest("enable_queue_batcher", true)
diff --git a/exporter/exporterhelper/internal/retry_sender.go b/exporter/exporterhelper/internal/retry_sender.go
index 2aaf3fd9810..8a565bf051b 100644
--- a/exporter/exporterhelper/internal/retry_sender.go
+++ b/exporter/exporterhelper/internal/retry_sender.go
@@ -133,7 +133,7 @@ func (rs *retrySender) Send(ctx context.Context, req internal.Request) error {
         // back-off, but get interrupted when shutting down or request is cancelled or timed out.
         select {
         case <-ctx.Done():
-            return fmt.Errorf("request is cancelled or timed out %w", err)
+            return fmt.Errorf("request is cancelled or timed out: %w", err)
         case <-rs.stopCh:
             return experr.NewShutdownErr(err)
         case <-time.After(backoffDelay):
diff --git a/exporter/exporterhelper/internal/retry_sender_test.go b/exporter/exporterhelper/internal/retry_sender_test.go
index 35b64bc9b41..62996f58cdf 100644
--- a/exporter/exporterhelper/internal/retry_sender_test.go
+++ b/exporter/exporterhelper/internal/retry_sender_test.go
@@ -6,6 +6,7 @@ package internal
 import (
     "context"
     "errors"
+    "fmt"
     "sync"
     "sync/atomic"
     "testing"
@@ -24,8 +25,7 @@ import (
     "go.opentelemetry.io/collector/exporter/exporterqueue"
     "go.opentelemetry.io/collector/exporter/exportertest"
     "go.opentelemetry.io/collector/exporter/internal"
-    "go.opentelemetry.io/collector/pdata/testdata"
-    "go.opentelemetry.io/collector/pipeline"
+    "go.opentelemetry.io/collector/exporter/internal/requesttest"
 )

 func mockRequestUnmarshaler(mr internal.Request) exporterqueue.Unmarshaler[internal.Request] {
@@ -38,377 +38,126 @@ func mockRequestMarshaler(internal.Request) ([]byte, error) {
     return []byte("mockRequest"), nil
 }

-func TestQueuedRetry_DropOnPermanentError(t *testing.T) {
-    runTest := func(testName string, enableQueueBatcher bool) {
-        t.Run(testName, func(t *testing.T) {
-            resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
-            qCfg := NewDefaultQueueConfig()
-            rCfg := configretry.NewDefaultBackOffConfig()
-            mockR := newMockRequest(2, consumererror.NewPermanent(errors.New("bad data")))
-            be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender,
-                WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(mockR)), WithRetry(rCfg), WithQueue(qCfg))
-            require.NoError(t, err)
-            ocs := be.ObsrepSender.(*observabilityConsumerSender)
-            require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
-            t.Cleanup(func() {
-                assert.NoError(t, be.Shutdown(context.Background()))
-                resetFeatureGate()
-            })
-
-            ocs.run(func() {
-                // This is asynchronous so it should just enqueue, no errors expected.
-                require.NoError(t, be.Send(context.Background(), mockR))
-            })
-            ocs.awaitAsyncProcessing()
-            // In the newMockConcurrentExporter we count requests and items even for failed requests
-            mockR.checkNumRequests(t, 1)
-            ocs.checkSendItemsCount(t, 0)
-            ocs.checkDroppedItemsCount(t, 2)
-        })
-    }
-    runTest("enable_queue_batcher", true)
-    runTest("disable_queue_batcher", false)
-}
-
-func TestQueuedRetry_DropOnNoRetry(t *testing.T) {
-    runTest := func(testName string, enableQueueBatcher bool) {
-        t.Run(testName, func(t *testing.T) {
-            resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
-            qCfg := NewDefaultQueueConfig()
-            rCfg := configretry.NewDefaultBackOffConfig()
-            rCfg.Enabled = false
-            be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, WithMarshaler(mockRequestMarshaler),
-                WithUnmarshaler(mockRequestUnmarshaler(newMockRequest(2, errors.New("transient error")))),
-                WithQueue(qCfg), WithRetry(rCfg))
-            require.NoError(t, err)
-            ocs := be.ObsrepSender.(*observabilityConsumerSender)
-            require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
-            t.Cleanup(func() {
-                assert.NoError(t, be.Shutdown(context.Background()))
-                resetFeatureGate()
-            })
-
-            mockR := newMockRequest(2, errors.New("transient error"))
-            ocs.run(func() {
-                // This is asynchronous so it should just enqueue, no errors expected.
-                require.NoError(t, be.Send(context.Background(), mockR))
-            })
-            ocs.awaitAsyncProcessing()
-            // In the newMockConcurrentExporter we count requests and items even for failed requests
-            mockR.checkNumRequests(t, 1)
-            ocs.checkSendItemsCount(t, 0)
-            ocs.checkDroppedItemsCount(t, 2)
-        })
-    }
-    runTest("enable_queue_batcher", true)
-    runTest("disable_queue_batcher", false)
-}
-
-func TestQueuedRetry_OnError(t *testing.T) {
-    runTest := func(testName string, enableQueueBatcher bool) {
-        t.Run(testName, func(t *testing.T) {
-            resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
-            qCfg := NewDefaultQueueConfig()
-            qCfg.NumConsumers = 1
-            rCfg := configretry.NewDefaultBackOffConfig()
-            rCfg.InitialInterval = 0
-            be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender,
-                WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})),
-                WithRetry(rCfg), WithQueue(qCfg))
-            require.NoError(t, err)
-            require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
-            t.Cleanup(func() {
-                assert.NoError(t, be.Shutdown(context.Background()))
-                resetFeatureGate()
-            })
-
-            traceErr := consumererror.NewTraces(errors.New("some error"), testdata.GenerateTraces(1))
-            mockR := newMockRequest(2, traceErr)
-            ocs := be.ObsrepSender.(*observabilityConsumerSender)
-            ocs.run(func() {
-                // This is asynchronous so it should just enqueue, no errors expected.
-                require.NoError(t, be.Send(context.Background(), mockR))
-            })
-            ocs.awaitAsyncProcessing()
-
-            // In the newMockConcurrentExporter we count requests and items even for failed requests
-            mockR.checkNumRequests(t, 2)
-            ocs.checkSendItemsCount(t, 2)
-            ocs.checkDroppedItemsCount(t, 0)
-        })
-    }
-    runTest("enable_queue_batcher", true)
-    runTest("disable_queue_batcher", false)
-}
-
-func TestQueuedRetry_MaxElapsedTime(t *testing.T) {
-    runTest := func(testName string, enableQueueBatcher bool) {
-        t.Run(testName, func(t *testing.T) {
-            resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
-            qCfg := NewDefaultQueueConfig()
-            qCfg.NumConsumers = 1
-            rCfg := configretry.NewDefaultBackOffConfig()
-            rCfg.InitialInterval = time.Millisecond
-            rCfg.MaxElapsedTime = 100 * time.Millisecond
-            be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender,
-                WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})),
-                WithRetry(rCfg), WithQueue(qCfg))
-            require.NoError(t, err)
-            ocs := be.ObsrepSender.(*observabilityConsumerSender)
-            require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
-            t.Cleanup(func() {
-                assert.NoError(t, be.Shutdown(context.Background()))
-                resetFeatureGate()
-            })
-
-            ocs.run(func() {
-                // Add an item that will always fail.
-                require.NoError(t, be.Send(context.Background(), newErrorRequest()))
-            })
-
-            mockR := newMockRequest(2, nil)
-            start := time.Now()
-            ocs.run(func() {
-                // This is asynchronous so it should just enqueue, no errors expected.
-                require.NoError(t, be.Send(context.Background(), mockR))
-            })
-            ocs.awaitAsyncProcessing()
-
-            // We should ensure that we wait for more than 50ms but less than 150ms (50% less and 50% more than max elapsed).
-            waitingTime := time.Since(start)
-            assert.Less(t, 50*time.Millisecond, waitingTime)
-            assert.Less(t, waitingTime, 150*time.Millisecond)
-
-            // In the newMockConcurrentExporter we count requests and items even for failed requests.
-            mockR.checkNumRequests(t, 1)
-            ocs.checkSendItemsCount(t, 2)
-            ocs.checkDroppedItemsCount(t, 7)
-            require.Zero(t, be.QueueSender.(*QueueSender).queue.Size())
-        })
-    }
-    runTest("enable_queue_batcher", true)
-    runTest("disable_queue_batcher", false)
-}
-
-type wrappedError struct {
-    error
-}
-
-func (e wrappedError) Unwrap() error {
-    return e.error
-}
-
-func TestQueuedRetry_ThrottleError(t *testing.T) {
-    runTest := func(testName string, enableQueueBatcher bool) {
-        t.Run(testName, func(t *testing.T) {
-            resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
-            qCfg := NewDefaultQueueConfig()
-            qCfg.NumConsumers = 1
-            rCfg := configretry.NewDefaultBackOffConfig()
-            rCfg.InitialInterval = 10 * time.Millisecond
-            be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender,
-                WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})),
-                WithRetry(rCfg), WithQueue(qCfg))
-            require.NoError(t, err)
-            ocs := be.ObsrepSender.(*observabilityConsumerSender)
-            require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
-            t.Cleanup(func() {
-                assert.NoError(t, be.Shutdown(context.Background()))
-                resetFeatureGate()
-            })
-
-            retry := NewThrottleRetry(errors.New("throttle error"), 100*time.Millisecond)
-            mockR := newMockRequest(2, wrappedError{retry})
-            start := time.Now()
-            ocs.run(func() {
-                // This is asynchronous so it should just enqueue, no errors expected.
-                require.NoError(t, be.Send(context.Background(), mockR))
-            })
-            ocs.awaitAsyncProcessing()
-
-            // The initial backoff is 10ms, but because of the throttle this should wait at least 100ms.
-            assert.Less(t, 100*time.Millisecond, time.Since(start))
-
-            mockR.checkNumRequests(t, 2)
-            ocs.checkSendItemsCount(t, 2)
-            ocs.checkDroppedItemsCount(t, 0)
-            require.Zero(t, be.QueueSender.(*QueueSender).queue.Size())
-        })
-    }
-    runTest("enable_queue_batcher", true)
-    runTest("disable_queue_batcher", false)
-}
-
-func TestQueuedRetry_RetryOnError(t *testing.T) {
-    runTest := func(testName string, enableQueueBatcher bool) {
-        t.Run(testName, func(t *testing.T) {
-            resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
-            qCfg := NewDefaultQueueConfig()
-            qCfg.NumConsumers = 1
-            qCfg.QueueSize = 1
-            rCfg := configretry.NewDefaultBackOffConfig()
-            rCfg.InitialInterval = 0
-            be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender,
-                WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})),
-                WithRetry(rCfg), WithQueue(qCfg))
-            require.NoError(t, err)
-            ocs := be.ObsrepSender.(*observabilityConsumerSender)
-            require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
-            t.Cleanup(func() {
-                assert.NoError(t, be.Shutdown(context.Background()))
-                resetFeatureGate()
-            })
-
-            mockR := newMockRequest(2, errors.New("transient error"))
-            ocs.run(func() {
-                // This is asynchronous so it should just enqueue, no errors expected.
-                require.NoError(t, be.Send(context.Background(), mockR))
-            })
-            ocs.awaitAsyncProcessing()
-
-            // In the newMockConcurrentExporter we count requests and items even for failed requests
-            mockR.checkNumRequests(t, 2)
-            ocs.checkSendItemsCount(t, 2)
-            ocs.checkDroppedItemsCount(t, 0)
-            require.Zero(t, be.QueueSender.(*QueueSender).queue.Size())
-        })
-    }
-    runTest("enable_queue_batcher", true)
-    runTest("disable_queue_batcher", false)
-}
-
-func TestQueueRetryWithNoQueue(t *testing.T) {
-    runTest := func(testName string, enableQueueBatcher bool) {
-        t.Run(testName, func(t *testing.T) {
-            defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
-            rCfg := configretry.NewDefaultBackOffConfig()
-            rCfg.MaxElapsedTime = time.Nanosecond // fail fast
-            be, err := NewBaseExporter(exportertest.NewNopSettings(), pipeline.SignalLogs, newObservabilityConsumerSender, WithRetry(rCfg))
-            require.NoError(t, err)
-            require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
-            ocs := be.ObsrepSender.(*observabilityConsumerSender)
-            mockR := newMockRequest(2, errors.New("some error"))
-            ocs.run(func() {
-                require.Error(t, be.Send(context.Background(), mockR))
-            })
-            ocs.awaitAsyncProcessing()
-            mockR.checkNumRequests(t, 1)
-            ocs.checkSendItemsCount(t, 0)
-            ocs.checkDroppedItemsCount(t, 2)
-            require.NoError(t, be.Shutdown(context.Background()))
-        })
-    }
-    runTest("enable_queue_batcher", true)
-    runTest("disable_queue_batcher", false)
-}
-
-func TestQueueRetryWithDisabledRetires(t *testing.T) {
-    runTest := func(testName string, enableQueueBatcher bool) {
-        t.Run(testName, func(t *testing.T) {
-            defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
-            rCfg := configretry.NewDefaultBackOffConfig()
-            rCfg.Enabled = false
-            set := exportertest.NewNopSettings()
-            logger, observed := observer.New(zap.ErrorLevel)
-            set.Logger = zap.New(logger)
-            be, err := NewBaseExporter(set, pipeline.SignalLogs, newObservabilityConsumerSender, WithRetry(rCfg))
-            require.NoError(t, err)
-            require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
-            ocs := be.ObsrepSender.(*observabilityConsumerSender)
-            mockR := newMockRequest(2, errors.New("some error"))
-            ocs.run(func() {
-                require.Error(t, be.Send(context.Background(), mockR))
-            })
-            assert.Len(t, observed.All(), 1)
-            assert.Equal(t, "Exporting failed. Rejecting data. "+
-                "Try enabling retry_on_failure config option to retry on retryable errors.", observed.All()[0].Message)
-            ocs.awaitAsyncProcessing()
-            mockR.checkNumRequests(t, 1)
-            ocs.checkSendItemsCount(t, 0)
-            ocs.checkDroppedItemsCount(t, 2)
-            require.NoError(t, be.Shutdown(context.Background()))
-        })
-    }
-    runTest("enable_queue_batcher", true)
-    runTest("disable_queue_batcher", false)
-}
-
-func TestRetryWithContextTimeout(t *testing.T) {
-    runTest := func(testName string, enableQueueBatcher bool) {
-        t.Run(testName, func(t *testing.T) {
-            defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
-            const testTimeout = 10 * time.Second
-
-            rCfg := configretry.NewDefaultBackOffConfig()
-            rCfg.Enabled = true
-
-            // First attempt after 100ms is attempted
-            rCfg.InitialInterval = 100 * time.Millisecond
-            rCfg.RandomizationFactor = 0
-            // Second attempt is at twice the testTimeout
-            rCfg.Multiplier = float64(2 * testTimeout / rCfg.InitialInterval)
-            qCfg := exporterqueue.NewDefaultConfig()
-            qCfg.Enabled = false
-            set := exportertest.NewNopSettings()
-            logger, observed := observer.New(zap.InfoLevel)
-            set.Logger = zap.New(logger)
-            be, err := NewBaseExporter(
-                set,
-                pipeline.SignalLogs,
-                newObservabilityConsumerSender,
-                WithRetry(rCfg),
-                WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[internal.Request]()),
-            )
-            require.NoError(t, err)
-            require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
-            ocs := be.ObsrepSender.(*observabilityConsumerSender)
-            mockR := newErrorRequest()
-
-            start := time.Now()
-            ocs.run(func() {
-                ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
-                defer cancel()
-                err := be.Send(ctx, mockR)
-                require.Error(t, err)
-                require.Equal(t, "request will be cancelled before next retry: transient error", err.Error())
-            })
-            assert.Len(t, observed.All(), 2)
-            assert.Equal(t, "Exporting failed. Will retry the request after interval.", observed.All()[0].Message)
-            assert.Equal(t, "Exporting failed. Rejecting data. "+
-                "Try enabling sending_queue to survive temporary failures.", observed.All()[1].Message)
-            ocs.awaitAsyncProcessing()
-            ocs.checkDroppedItemsCount(t, 7)
-            require.Equal(t, 2, mockR.(*mockErrorRequest).getNumRequests())
-            require.NoError(t, be.Shutdown(context.Background()))
-
-            // There should be no delay, because the initial interval is
-            // longer than the context timeout. Merely checking that no
-            // delays on the order of either the context timeout or the
-            // retry interval were introduced, i.e., fail fast.
-            elapsed := time.Since(start)
-            require.Less(t, elapsed, testTimeout/2)
-        })
-    }
-    runTest("enable_queue_batcher", true)
-    runTest("disable_queue_batcher", false)
+func TestRetrySenderDropOnPermanentError(t *testing.T) {
+    rCfg := configretry.NewDefaultBackOffConfig()
+    sink := requesttest.NewSink()
+    expErr := consumererror.NewPermanent(errors.New("bad data"))
+    rs := newRetrySender(rCfg, exportertest.NewNopSettings(), newNoopExportSender())
+    require.NoError(t, rs.Start(context.Background(), componenttest.NewNopHost()))
+    require.ErrorIs(t, rs.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink, ExportErr: expErr}), expErr)
+    require.ErrorIs(t, rs.Send(context.Background(), &requesttest.FakeRequest{Items: 3, Sink: sink, ExportErr: expErr}), expErr)
+    assert.Equal(t, int64(0), sink.ItemsCount())
+    assert.Equal(t, int64(0), sink.RequestsCount())
+    require.NoError(t, rs.Shutdown(context.Background()))
+}
+
+func TestRetrySenderSimpleRetry(t *testing.T) {
+    rCfg := configretry.NewDefaultBackOffConfig()
+    rCfg.InitialInterval = 0
+    sink := requesttest.NewSink()
+    expErr := errors.New("transient error")
+    rs := newRetrySender(rCfg, exportertest.NewNopSettings(), newNoopExportSender())
+    require.NoError(t, rs.Start(context.Background(), componenttest.NewNopHost()))
+    require.NoError(t, rs.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink, ExportErr: expErr}))
+    assert.Equal(t, int64(2), sink.ItemsCount())
+    assert.Equal(t, int64(1), sink.RequestsCount())
+    require.NoError(t, rs.Shutdown(context.Background()))
+}
+
+func TestRetrySenderRetryPartial(t *testing.T) {
+    rCfg := configretry.NewDefaultBackOffConfig()
+    rCfg.InitialInterval = 0
+    sink := requesttest.NewSink()
+    rs := newRetrySender(rCfg, exportertest.NewNopSettings(), newNoopExportSender())
+    require.NoError(t, rs.Start(context.Background(), componenttest.NewNopHost()))
+    require.NoError(t, rs.Send(context.Background(), &requesttest.FakeRequest{Items: 5, Sink: sink, Partial: 3}))
+    assert.Equal(t, int64(5), sink.ItemsCount())
+    assert.Equal(t, int64(2), sink.RequestsCount())
+    require.NoError(t, rs.Shutdown(context.Background()))
+}
+
+func TestRetrySenderMaxElapsedTime(t *testing.T) {
+    rCfg := configretry.NewDefaultBackOffConfig()
+    rCfg.InitialInterval = time.Millisecond
+    rCfg.MaxElapsedTime = 100 * time.Millisecond
+    sink := requesttest.NewSink()
+    rs := newRetrySender(rCfg, exportertest.NewNopSettings(), newNoopExportSender())
+    require.NoError(t, rs.Start(context.Background(), componenttest.NewNopHost()))
+    expErr := errors.New("transient error")
+    require.ErrorIs(t, rs.Send(context.Background(), newErrorRequest(expErr)), expErr)
+    assert.Equal(t, int64(0), sink.ItemsCount())
+    assert.Equal(t, int64(0), sink.RequestsCount())
+    require.NoError(t, rs.Shutdown(context.Background()))
+}
+
+func TestRetrySenderThrottleError(t *testing.T) {
+    rCfg := configretry.NewDefaultBackOffConfig()
+    rCfg.InitialInterval = 10 * time.Millisecond
+    sink := requesttest.NewSink()
+    rs := newRetrySender(rCfg, exportertest.NewNopSettings(), newNoopExportSender())
+    require.NoError(t, rs.Start(context.Background(), componenttest.NewNopHost()))
+    retry := fmt.Errorf("wrapped error: %w", NewThrottleRetry(errors.New("throttle error"), 100*time.Millisecond))
+    start := time.Now()
+    require.NoError(t, rs.Send(context.Background(), &requesttest.FakeRequest{Items: 5, Sink: sink, ExportErr: retry}))
+    // The initial backoff is 10ms, but because of the throttle this should wait at least 100ms.
+    assert.Less(t, 100*time.Millisecond, time.Since(start))
+    assert.Equal(t, int64(5), sink.ItemsCount())
+    assert.Equal(t, int64(1), sink.RequestsCount())
+    require.NoError(t, rs.Shutdown(context.Background()))
+}
+
+func TestRetrySenderWithContextTimeout(t *testing.T) {
+    const testTimeout = 10 * time.Second
+    rCfg := configretry.NewDefaultBackOffConfig()
+    rCfg.Enabled = true
+    // The first retry attempt fires after the 100ms initial interval.
+    rCfg.InitialInterval = 100 * time.Millisecond
+    rCfg.RandomizationFactor = 0
+    // Second attempt is at twice the testTimeout
+    rCfg.Multiplier = float64(2 * testTimeout / rCfg.InitialInterval)
+    set := exportertest.NewNopSettings()
+    logger, observed := observer.New(zap.InfoLevel)
+    set.Logger = zap.New(logger)
+    rs := newRetrySender(rCfg, set, newNoopExportSender())
+    require.NoError(t, rs.Start(context.Background(), componenttest.NewNopHost()))
+    start := time.Now()
+    ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
+    defer cancel()
+    require.ErrorContains(t,
+        rs.Send(ctx, newErrorRequest(errors.New("transient error"))),
+        "request will be cancelled before next retry: transient error")
+    assert.Len(t, observed.All(), 1)
+    assert.Equal(t, "Exporting failed. Will retry the request after interval.", observed.All()[0].Message)
+    require.Less(t, time.Since(start), testTimeout/2)
+    require.NoError(t, rs.Shutdown(context.Background()))
+}
+
+func TestRetrySenderWithCancelledContext(t *testing.T) {
+    rCfg := configretry.NewDefaultBackOffConfig()
+    rCfg.Enabled = true
+    // The first retry attempt would fire after the 1s initial interval.
+    rCfg.InitialInterval = 1 * time.Second
+    rs := newRetrySender(rCfg, exportertest.NewNopSettings(), newNoopExportSender())
+    require.NoError(t, rs.Start(context.Background(), componenttest.NewNopHost()))
+    start := time.Now()
+    ctx, cancel := context.WithCancelCause(context.Background())
+    go func() {
+        <-time.After(100 * time.Millisecond)
+        cancel(errors.New("my reason"))
+    }()
+    require.ErrorContains(t,
+        rs.Send(ctx, newErrorRequest(errors.New("transient error"))),
+        "request is cancelled or timed out: transient error")
+    require.Less(t, time.Since(start), 1*time.Second)
+    require.NoError(t, rs.Shutdown(context.Background()))
 }

 type mockErrorRequest struct {
-    mu       sync.Mutex
-    requests int
+    err error
 }

 func (mer *mockErrorRequest) Export(context.Context) error {
-    mer.mu.Lock()
-    defer mer.mu.Unlock()
-    mer.requests++
-    return errors.New("transient error")
-}
-
-func (mer *mockErrorRequest) getNumRequests() int {
-    mer.mu.Lock()
-    defer mer.mu.Unlock()
-    return mer.requests
+    return mer.err
 }

 func (mer *mockErrorRequest) OnError(error) internal.Request {
@@ -423,8 +172,8 @@ func (mer *mockErrorRequest) MergeSplit(context.Context, exporterbatcher.MaxSize
     return nil, nil
 }

-func newErrorRequest() internal.Request {
-    return &mockErrorRequest{}
+func newErrorRequest(err error) internal.Request {
+    return &mockErrorRequest{err: err}
 }

 type mockRequest struct {
@@ -455,9 +204,9 @@ func (m *mockRequest) OnError(error) internal.Request {
     }
 }

-func (m *mockRequest) checkNumRequests(t *testing.T, want int) {
+func (m *mockRequest) checkOneRequests(t *testing.T) {
     assert.Eventually(t, func() bool {
-        return int64(want) == m.requestCount.Load()
+        return int64(1) == m.requestCount.Load()
     }, time.Second, 1*time.Millisecond)
 }

diff --git a/exporter/internal/requesttest/fake_request.go b/exporter/internal/requesttest/fake_request.go
index 2c80e01cf1c..a4d0bb55dc2 100644
--- a/exporter/internal/requesttest/fake_request.go
+++ b/exporter/internal/requesttest/fake_request.go
@@ -5,6 +5,8 @@ package requesttest // import "go.opentelemetry.io/collector/exporter/internal/requesttest"

 import (
     "context"
+    "errors"
+    "fmt"
     "sync/atomic"
     "time"

@@ -35,10 +37,19 @@ func NewSink() *Sink {
     }
 }

+type errorPartial struct {
+    fr *FakeRequest
+}
+
+func (e errorPartial) Error() string {
+    return fmt.Sprintf("items: %d", e.fr.Items)
+}
+
 type FakeRequest struct {
     Items     int
     Sink      *Sink
     ExportErr error
+    Partial   int
     MergeErr  error
     Delay     time.Duration
 }
@@ -50,7 +61,23 @@ func (r *FakeRequest) Export(ctx context.Context) error {
     case <-time.After(r.Delay):
     }
     if r.ExportErr != nil {
-        return r.ExportErr
+        err := r.ExportErr
+        r.ExportErr = nil
+        return err
+    }
+    if r.Partial > 0 {
+        if r.Sink != nil {
+            r.Sink.requestsCount.Add(1)
+            r.Sink.itemsCount.Add(int64(r.Items - r.Partial))
+        }
+        return errorPartial{fr: &FakeRequest{
+            Items:     r.Partial,
+            Sink:      r.Sink,
+            ExportErr: r.ExportErr,
+            Partial:   0,
+            MergeErr:  r.MergeErr,
+            Delay:     r.Delay,
+        }}
     }
     if r.Sink != nil {
         r.Sink.requestsCount.Add(1)
@@ -59,6 +86,14 @@ func (r *FakeRequest) Export(ctx context.Context) error {
     return nil
 }

+func (r *FakeRequest) OnError(err error) internal.Request {
+    var pErr errorPartial
+    if errors.As(err, &pErr) {
+        return pErr.fr
+    }
+    return r
+}
+
 func (r *FakeRequest) ItemsCount() int {
     return r.Items
 }
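
Note, not part of the patch: the new tests lean on FakeRequest.Export consuming ExportErr after the first call, so a request fails exactly once and then succeeds when retried. A minimal sketch of that contract; it assumes the snippet lives inside the requesttest package tree, since the package is internal to the collector module:

    sink := requesttest.NewSink()
    req := &requesttest.FakeRequest{Items: 2, Sink: sink, ExportErr: errors.New("transient error")}

    err := req.Export(context.Background()) // first attempt: returns and clears ExportErr
    retry := req.OnError(err)               // non-partial error: the same request comes back
    _ = retry.Export(context.Background())  // succeeds; sink records 1 request, 2 items

This is the sequence TestRetrySenderSimpleRetry drives through the retry sender, which is why it can assert RequestsCount()==1 and ItemsCount()==2 after a single Send.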
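Note, not part of the patch: the new Partial field models partial success. Export records the delivered portion in the sink, then returns an errorPartial whose embedded follow-up request covers only the remainder; OnError unwraps it so a retry resends just the failed items. A sketch of the bookkeeping, under the same in-package assumption as above:

    sink := requesttest.NewSink()
    req := &requesttest.FakeRequest{Items: 5, Sink: sink, Partial: 3}

    err := req.Export(context.Background()) // delivers 5-3=2 items, fails with 3 left
    retry := req.OnError(err)               // yields FakeRequest{Items: 3, Sink: sink}
    _ = retry.Export(context.Background())  // delivers the remaining 3 items

    // sink now reports RequestsCount() == 2 and ItemsCount() == 5, the exact
    // values TestRetrySenderRetryPartial asserts after one Send through the
    // retry sender.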
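Note, not part of the patch: the retry_sender.go change only adds a colon to the message, but the %w verb it sits next to is what matters for callers. A standalone sketch of the resulting behavior, using only the standard library:

    package main

    import (
    	"errors"
    	"fmt"
    )

    func main() {
    	base := errors.New("transient error")
    	// Same format string the patch fixes in retrySender.Send.
    	err := fmt.Errorf("request is cancelled or timed out: %w", base)
    	fmt.Println(errors.Is(err, base)) // true: %w keeps the chain unwrappable
    	fmt.Println(err)                  // request is cancelled or timed out: transient error
    }

The rendered message is what TestRetrySenderWithCancelledContext matches with require.ErrorContains.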