Skip to content

Commit

Permalink
tracer: Add a retryInterval option (#2986)
Browse files Browse the repository at this point in the history
  • Loading branch information
mtoffl01 authored Dec 9, 2024
1 parent 7f02289 commit d9a7f04
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 24 deletions.
2 changes: 1 addition & 1 deletion ddtrace/tracer/civisibility_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (w *ciVisibilityTraceWriter) flush() {
}
log.Error("ciVisibilityTraceWriter: failure sending events (attempt %d), will retry: %v", attempt+1, err)
p.reset()
time.Sleep(time.Millisecond)
time.Sleep(w.config.retryInterval)
}
log.Error("ciVisibilityTraceWriter: lost %d events: %v", count, err)
telemetry.EndpointPayloadDropped(telemetry.TestCycleEndpointType)
Expand Down
30 changes: 21 additions & 9 deletions ddtrace/tracer/civisibility_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"io"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/tinylib/msgp/msgp"
Expand Down Expand Up @@ -57,21 +58,25 @@ func (t *failingCiVisibilityTransport) send(p *payload) (io.ReadCloser, error) {
func TestCiVisibilityTraceWriterFlushRetries(t *testing.T) {
testcases := []struct {
configRetries int
retryInterval time.Duration
failCount int
tracesSent bool
expAttempts int
}{
{configRetries: 0, failCount: 0, tracesSent: true, expAttempts: 1},
{configRetries: 0, failCount: 1, tracesSent: false, expAttempts: 1},
{configRetries: 0, retryInterval: time.Millisecond, failCount: 0, tracesSent: true, expAttempts: 1},
{configRetries: 0, retryInterval: time.Millisecond, failCount: 1, tracesSent: false, expAttempts: 1},

{configRetries: 1, failCount: 0, tracesSent: true, expAttempts: 1},
{configRetries: 1, failCount: 1, tracesSent: true, expAttempts: 2},
{configRetries: 1, failCount: 2, tracesSent: false, expAttempts: 2},
{configRetries: 1, retryInterval: time.Millisecond, failCount: 0, tracesSent: true, expAttempts: 1},
{configRetries: 1, retryInterval: time.Millisecond, failCount: 1, tracesSent: true, expAttempts: 2},
{configRetries: 1, retryInterval: time.Millisecond, failCount: 2, tracesSent: false, expAttempts: 2},

{configRetries: 2, failCount: 0, tracesSent: true, expAttempts: 1},
{configRetries: 2, failCount: 1, tracesSent: true, expAttempts: 2},
{configRetries: 2, failCount: 2, tracesSent: true, expAttempts: 3},
{configRetries: 2, failCount: 3, tracesSent: false, expAttempts: 3},
{configRetries: 2, retryInterval: time.Millisecond, failCount: 0, tracesSent: true, expAttempts: 1},
{configRetries: 2, retryInterval: time.Millisecond, failCount: 1, tracesSent: true, expAttempts: 2},
{configRetries: 2, retryInterval: time.Millisecond, failCount: 2, tracesSent: true, expAttempts: 3},
{configRetries: 2, retryInterval: time.Millisecond, failCount: 3, tracesSent: false, expAttempts: 3},

{configRetries: 1, retryInterval: 2 * time.Millisecond, failCount: 1, tracesSent: true, expAttempts: 2},
{configRetries: 2, retryInterval: 2 * time.Millisecond, failCount: 2, tracesSent: true, expAttempts: 3},
}

ss := []*span{makeSpan(0)}
Expand All @@ -86,16 +91,23 @@ func TestCiVisibilityTraceWriterFlushRetries(t *testing.T) {
c := newConfig(func(c *config) {
c.transport = p
c.sendRetries = test.configRetries
c.retryInterval = test.retryInterval
})

h := newCiVisibilityTraceWriter(c)
h.add(ss)

start := time.Now()
h.flush()
h.wg.Wait()
elapsed := time.Since(start)

assert.Equal(test.expAttempts, p.sendAttempts)
assert.Equal(test.tracesSent, p.tracesSent)

if test.configRetries > 0 && test.failCount > 1 {
assert.GreaterOrEqual(elapsed, test.retryInterval*time.Duration(minInts(test.configRetries+1, test.failCount)))
}
})
}
}
14 changes: 12 additions & 2 deletions ddtrace/tracer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,13 @@ type config struct {
// output instead of using the agent. This is used in Lambda environments.
logToStdout bool

// sendRetries is the number of times a trace payload send is retried upon
// sendRetries is the number of times a trace or CI Visibility payload send is retried upon
// failure.
sendRetries int

// retryInterval is the interval between agent connection retries. It has no effect if sendRetries is not set
retryInterval time.Duration

// logStartup, when true, causes various startup info to be written
// when the tracer starts.
logStartup bool
Expand Down Expand Up @@ -456,7 +459,7 @@ func newConfig(opts ...StartOption) *config {
if v := os.Getenv("DD_TRACE_PEER_SERVICE_MAPPING"); v != "" {
internal.ForEachStringTag(v, internal.DDTagsDelimiter, func(key, val string) { c.peerServiceMappings[key] = val })
}

c.retryInterval = time.Millisecond
for _, fn := range opts {
fn(c)
}
Expand Down Expand Up @@ -892,6 +895,13 @@ func WithSendRetries(retries int) StartOption {
}
}

// WithRetryInterval sets the interval, in seconds, for retrying submitting payloads to the agent.
func WithRetryInterval(interval int) StartOption {
return func(c *config) {
c.retryInterval = time.Duration(interval) * time.Second
}
}

// WithPropagator sets an alternative propagator to be used by the tracer.
func WithPropagator(p Propagator) StartOption {
return func(c *config) {
Expand Down
17 changes: 17 additions & 0 deletions ddtrace/tracer/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,23 @@ func TestTracerOptionsDefaults(t *testing.T) {
assert.Equal(t, time.Duration(10*time.Second), c.httpClient.Timeout)
})
})

t.Run("trace-retries", func(t *testing.T) {
c := newConfig()
assert.Equal(t, 0, c.sendRetries)
assert.Equal(t, time.Millisecond, c.retryInterval)
})
}

func TestTraceRetry(t *testing.T) {
t.Run("sendRetries", func(t *testing.T) {
c := newConfig(WithSendRetries(10))
assert.Equal(t, 10, c.sendRetries)
})
t.Run("retryInterval", func(t *testing.T) {
c := newConfig(WithRetryInterval(10))
assert.Equal(t, 10*time.Second, c.retryInterval)
})
}

func TestDefaultHTTPClient(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions ddtrace/tracer/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func startTelemetry(c *config) {
{Name: "dogstatsd_port", Value: c.agent.StatsdPort},
{Name: "lambda_mode", Value: c.logToStdout},
{Name: "send_retries", Value: c.sendRetries},
{Name: "retry_interval", Value: c.retryInterval},
{Name: "trace_startup_logs_enabled", Value: c.logStartup},
{Name: "service", Value: c.serviceName},
{Name: "universal_version", Value: c.universalVersion},
Expand Down
4 changes: 2 additions & 2 deletions ddtrace/tracer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (h *agentTraceWriter) flush() {
var err error
for attempt := 0; attempt <= h.config.sendRetries; attempt++ {
size, count = p.size(), p.itemCount()
log.Debug("Sending payload: size: %d traces: %d\n", size, count)
log.Debug("Attempt to send payload: size: %d traces: %d\n", size, count)
var rc io.ReadCloser
rc, err = h.config.transport.send(p)
if err == nil {
Expand All @@ -119,7 +119,7 @@ func (h *agentTraceWriter) flush() {
}
log.Error("failure sending traces (attempt %d), will retry: %v", attempt+1, err)
p.reset()
time.Sleep(time.Millisecond)
time.Sleep(h.config.retryInterval)
}
h.statsd.Count("datadog.tracer.traces_dropped", int64(count), []string{"reason:send_failed"}, 1)
log.Error("lost %d traces: %v", count, err)
Expand Down
37 changes: 27 additions & 10 deletions ddtrace/tracer/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"math"
"strings"
"testing"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/statsdtest"
Expand Down Expand Up @@ -363,21 +364,25 @@ func (t *failingTransport) send(p *payload) (io.ReadCloser, error) {
func TestTraceWriterFlushRetries(t *testing.T) {
testcases := []struct {
configRetries int
retryInterval time.Duration
failCount int
tracesSent bool
expAttempts int
}{
{configRetries: 0, failCount: 0, tracesSent: true, expAttempts: 1},
{configRetries: 0, failCount: 1, tracesSent: false, expAttempts: 1},
{configRetries: 0, retryInterval: time.Millisecond, failCount: 0, tracesSent: true, expAttempts: 1},
{configRetries: 0, retryInterval: time.Millisecond, failCount: 1, tracesSent: false, expAttempts: 1},

{configRetries: 1, failCount: 0, tracesSent: true, expAttempts: 1},
{configRetries: 1, failCount: 1, tracesSent: true, expAttempts: 2},
{configRetries: 1, failCount: 2, tracesSent: false, expAttempts: 2},
{configRetries: 1, retryInterval: time.Millisecond, failCount: 0, tracesSent: true, expAttempts: 1},
{configRetries: 1, retryInterval: time.Millisecond, failCount: 1, tracesSent: true, expAttempts: 2},
{configRetries: 1, retryInterval: time.Millisecond, failCount: 2, tracesSent: false, expAttempts: 2},

{configRetries: 2, failCount: 0, tracesSent: true, expAttempts: 1},
{configRetries: 2, failCount: 1, tracesSent: true, expAttempts: 2},
{configRetries: 2, failCount: 2, tracesSent: true, expAttempts: 3},
{configRetries: 2, failCount: 3, tracesSent: false, expAttempts: 3},
{configRetries: 2, retryInterval: time.Millisecond, failCount: 0, tracesSent: true, expAttempts: 1},
{configRetries: 2, retryInterval: time.Millisecond, failCount: 1, tracesSent: true, expAttempts: 2},
{configRetries: 2, retryInterval: time.Millisecond, failCount: 2, tracesSent: true, expAttempts: 3},
{configRetries: 2, retryInterval: time.Millisecond, failCount: 3, tracesSent: false, expAttempts: 3},

{configRetries: 1, retryInterval: 2 * time.Millisecond, failCount: 1, tracesSent: true, expAttempts: 2},
{configRetries: 2, retryInterval: 2 * time.Millisecond, failCount: 2, tracesSent: true, expAttempts: 3},
}

sentCounts := map[string]int64{
Expand All @@ -401,14 +406,16 @@ func TestTraceWriterFlushRetries(t *testing.T) {
c := newConfig(func(c *config) {
c.transport = p
c.sendRetries = test.configRetries
c.retryInterval = test.retryInterval
})
var statsd statsdtest.TestStatsdClient

h := newAgentTraceWriter(c, nil, &statsd)
h.add(ss)

start := time.Now()
h.flush()
h.wg.Wait()
elapsed := time.Since(start)

assert.Equal(test.expAttempts, p.sendAttempts)
assert.Equal(test.tracesSent, p.tracesSent)
Expand All @@ -419,10 +426,20 @@ func TestTraceWriterFlushRetries(t *testing.T) {
} else {
assert.Equal(droppedCounts, statsd.Counts())
}
if test.configRetries > 0 && test.failCount > 1 {
assert.GreaterOrEqual(elapsed, test.retryInterval*time.Duration(minInts(test.configRetries+1, test.failCount)))
}
})
}
}

func minInts(a, b int) int {
if a < b {
return a
}
return b
}

func BenchmarkJsonEncodeSpan(b *testing.B) {
s := makeSpan(10)
s.Metrics["nan"] = math.NaN()
Expand Down

0 comments on commit d9a7f04

Please sign in to comment.