
reworked knownmetrics to include namespace and metric kind and added internal metrics support

Signed-off-by: Eliott Bouhana <[email protected]>
eliottness committed Jan 29, 2025
1 parent 44eb960 commit 05b010c
Showing 14 changed files with 1,118 additions and 280 deletions.
14 changes: 8 additions & 6 deletions internal/newtelemetry/api.go
@@ -16,12 +16,14 @@ import (
type Namespace = transport.Namespace

const (
NamespaceGeneral = transport.NamespaceGeneral
NamespaceTracers = transport.NamespaceTracers
NamespaceProfilers = transport.NamespaceProfilers
NamespaceAppSec = transport.NamespaceAppSec
NamespaceIAST = transport.NamespaceIAST
NamespaceTelemetry = transport.NamespaceTelemetry
NamespaceGeneral = transport.NamespaceGeneral
NamespaceTracers = transport.NamespaceTracers
NamespaceProfilers = transport.NamespaceProfilers
NamespaceAppSec = transport.NamespaceAppSec
NamespaceIAST = transport.NamespaceIAST
NamespaceCIVisibility = transport.NamespaceCIVisibility
NamespaceMLOps = transport.NamespaceMLOps
NamespaceRUM = transport.NamespaceRUM
)

// Origin describes the source of a configuration change
53 changes: 48 additions & 5 deletions internal/newtelemetry/client.go
@@ -7,6 +7,8 @@ package newtelemetry

import (
"errors"
"os"
"strconv"
"sync"

"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
@@ -53,10 +55,7 @@ func newClient(tracerConfig internal.TracerConfig, config ClientConfig) (*client
writer: writer,
clientConfig: config,
flushMapper: mapper.NewDefaultMapper(config.HeartbeatInterval, config.ExtendedHeartbeatInterval),
// This means that, by default, we incur dataloss if we spend ~30mins without flushing, considering we send telemetry data this looks reasonable.
// This also means that in the worst case scenario, memory-wise, the app is stabilized after running for 30mins.
// TODO: tweak this value once we get real telemetry data from the telemetry client
payloadQueue: internal.NewRingQueue[transport.Payload](4, 32),
payloadQueue: internal.NewRingQueue[transport.Payload](config.PayloadQueueSize.Min, config.PayloadQueueSize.Max),

dependencies: dependencies{
DependencyLoader: config.DependencyLoader,
@@ -86,7 +85,7 @@ func newClient(tracerConfig internal.TracerConfig, config ClientConfig) (*client

client.flushTicker = internal.NewTicker(func() {
client.Flush()
}, config.FlushIntervalRange.Min, config.FlushIntervalRange.Max)
}, config.FlushInterval.Min, config.FlushInterval.Max)

return client, nil
}
@@ -230,6 +229,7 @@ func (c *client) flush(payloads []transport.Payload) (int, error) {
}

results, err := c.writer.Flush(payload)
c.computeFlushMetrics(results, err)
if err != nil {
// We stop flushing when we encounter a fatal error, put the payloads in the queue and return the error
log.Error("error while flushing telemetry data: %v", err)
@@ -264,6 +264,49 @@ func (c *client) flush(payloads []transport.Payload) (int, error) {
return nbBytes, nil
}

// computeFlushMetrics computes and submits the metrics for the flush operation using the output from the writer.Flush method.
// It submits the number of requests, responses and errors, as well as the number of bytes sent and the duration of the successful call.
func (c *client) computeFlushMetrics(results []internal.EndpointRequestResult, err error) {
if !c.clientConfig.InternalMetricsEnabled {
return
}

indexToEndpoint := func(i int) string {
if i == 0 {
return "agent"
}
return "agentless"
}

for i, result := range results {
c.Count(transport.NamespaceTelemetry, "telemetry_api.requests", map[string]string{"endpoint": indexToEndpoint(i)}).Submit(1)
if result.StatusCode != 0 {
c.Count(transport.NamespaceTelemetry, "telemetry_api.responses", map[string]string{"endpoint": indexToEndpoint(i), "status_code": strconv.Itoa(result.StatusCode)}).Submit(1)
}

if result.Error != nil {
typ := "network"
if os.IsTimeout(result.Error) {
typ = "timeout"
}
var writerStatusCodeError *internal.WriterStatusCodeError
if errors.As(result.Error, &writerStatusCodeError) {
typ = "status_code"
}
c.Count(transport.NamespaceTelemetry, "telemetry_api.errors", map[string]string{"endpoint": indexToEndpoint(i), "type": typ}).Submit(1)
}
}

if err != nil {
return
}

successfulCall := results[len(results)-1]
endpoint := indexToEndpoint(len(results) - 1)
c.Distribution(transport.NamespaceTelemetry, "telemetry_api.bytes", map[string]string{"endpoint": endpoint}).Submit(float64(successfulCall.PayloadByteSize))
c.Distribution(transport.NamespaceTelemetry, "telemetry_api.ms", map[string]string{"endpoint": endpoint}).Submit(float64(successfulCall.CallDuration.Milliseconds()))
}

func (c *client) appStart() {
c.flushMapperMu.Lock()
defer c.flushMapperMu.Unlock()
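To make the accounting above concrete, here is a hedged walkthrough of one flush. The result struct is redefined locally and mirrors only the fields of internal.EndpointRequestResult that computeFlushMetrics reads; the scenario, status code and sizes are assumptions rather than part of the commit.

```go
package main

import (
	"errors"
	"time"
)

// endpointRequestResult mirrors only the fields of internal.EndpointRequestResult
// that computeFlushMetrics reads; it is redefined here purely for illustration.
type endpointRequestResult struct {
	StatusCode      int
	Error           error
	PayloadByteSize int
	CallDuration    time.Duration
}

func main() {
	// Index 0 is the "agent" endpoint; any later index is "agentless".
	results := []endpointRequestResult{
		// The agent call fails with a plain transport error, so it is tagged type:network
		// (a timeout would be tagged type:timeout, a WriterStatusCodeError type:status_code).
		{Error: errors.New("connection refused")},
		// The agentless call succeeds; being last, it is the "successful call".
		{StatusCode: 202, PayloadByteSize: 2048, CallDuration: 35 * time.Millisecond},
	}
	_ = results

	// With the overall flush error nil, computeFlushMetrics would submit:
	//   count        telemetry_api.requests  {endpoint:agent}                       1
	//   count        telemetry_api.errors    {endpoint:agent, type:network}         1
	//   count        telemetry_api.requests  {endpoint:agentless}                   1
	//   count        telemetry_api.responses {endpoint:agentless, status_code:202}  1
	//   distribution telemetry_api.bytes     {endpoint:agentless}                   2048
	//   distribution telemetry_api.ms        {endpoint:agentless}                   35
}
```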
84 changes: 67 additions & 17 deletions internal/newtelemetry/client_config.go
@@ -17,14 +17,19 @@ import (
"gopkg.in/DataDog/dd-trace-go.v1/internal/newtelemetry/internal"
)

type Range[T any] struct {
Min T
Max T
}

type ClientConfig struct {
// DependencyLoader determines how dependency data is sent via telemetry.
// If nil, the library should not send the app-dependencies-loaded event.
// The default value is [debug.ReadBuildInfo] since Application Security Monitoring uses this data to detect vulnerabilities in the ASM-SCA product
// This can be controlled via the env var DD_TELEMETRY_DEPENDENCY_COLLECTION_ENABLED
DependencyLoader func() (*debug.BuildInfo, bool)

// MetricsEnabled etermines whether metrics are sent via telemetry.
// MetricsEnabled determines whether metrics are sent via telemetry.
// If false, libraries should not send the generate-metrics or distributions events.
// This can be controlled via the env var DD_TELEMETRY_METRICS_ENABLED
MetricsEnabled bool
@@ -33,6 +38,9 @@ type ClientConfig struct {
// This can be controlled via the env var DD_TELEMETRY_LOG_COLLECTION_ENABLED
LogsEnabled bool

// InternalMetricsEnabled determines whether client stats metrics are sent via telemetry.
InternalMetricsEnabled bool

// AgentlessURL is the full URL to the agentless telemetry endpoint. (optional)
// Defaults to https://instrumentation-telemetry-intake.datadoghq.com/api/v2/apmtelemetry
AgentlessURL string
@@ -51,13 +59,23 @@ type ClientConfig struct {
// ExtendedHeartbeatInterval is the interval at which to send an extended heartbeat payload, defaults to 24h.
ExtendedHeartbeatInterval time.Duration

// FlushIntervalRange is the interval at which the client flushes the data.
// FlushInterval is the interval at which the client flushes the data.
// By default, the client starts flushing at 60s intervals and reduces the interval based on load until it reaches 15s.
// Neither value can be higher than 60s because the heartbeat needs to be sent at least every 60s.
FlushIntervalRange struct {
Min time.Duration
Max time.Duration
}
FlushInterval Range[time.Duration]

// PayloadQueueSize is the size of the payload queue.
// This means that, by default, we incur data loss if we go roughly 30 minutes without flushing; given the rate at which telemetry data is sent, this looks reasonable.
// It also means that, in the worst case, the app's memory usage stabilizes after it has been running for 30 minutes.
// Ideally both values should be powers of 2 because of the way the ring queue grows.
PayloadQueueSize Range[int]

// DistributionsSize is the size of the distribution queue.
// The default max size is a 2^14-entry array of float64 (2^3 bytes each), which makes a distribution a 128KB array _at worst_.
// Assuming one point is added per user request on a simple HTTP server, we would start losing data past 2^14 requests per minute, i.e. about 280 requests per second or one request roughly every 3.5ms.
// If this throughput is sustained, the telemetry client's flush ticker will speed up, at best doubling twice so that it flushes 15 seconds of data each time.
// That brings our max throughput to roughly 1100 points per second, or about 900µs per request.
DistributionsSize Range[int]

// Debug enables debug mode for the telemetry client; the flag is also sent to the backend so that it logs the requests.
Debug bool
@@ -71,7 +89,7 @@ type ClientConfig struct {
EarlyFlushPayloadSize int
}

const (
var (
// agentlessURL is the endpoint used to send telemetry in an agentless environment. It is
// also the default URL in case connecting to the agent URL fails.
agentlessURL = "https://instrumentation-telemetry-intake.datadoghq.com/api/v2/apmtelemetry"
@@ -94,6 +112,18 @@ const (

// maxPayloadSize is specified by the backend to be 5MB. The goal is to never reach this value; otherwise our data will be silently dropped.
maxPayloadSize = 5 * 1024 * 1024 // 5MB

// TODO: tweak this value once we get real telemetry data from the telemetry client
defaultPayloadQueueSize = Range[int]{
Min: 4,
Max: 21,
}

// TODO: tweak this value once we get telemetry data from the telemetry client
distributionsSize = Range[int]{
Min: 1 << 8,
Max: 1 << 14,
}
)

// clamp squeezes a value between a minimum and maximum value.
@@ -106,12 +136,12 @@ func (config ClientConfig) validateConfig() error {
return fmt.Errorf("HeartbeatInterval cannot be higher than 60s, got %v", config.HeartbeatInterval)
}

if config.FlushIntervalRange.Min > 60*time.Second || config.FlushIntervalRange.Max > 60*time.Second {
return fmt.Errorf("FlushIntervalRange cannot be higher than 60s, got Min: %v, Max: %v", config.FlushIntervalRange.Min, config.FlushIntervalRange.Max)
if config.FlushInterval.Min > 60*time.Second || config.FlushInterval.Max > 60*time.Second {
return fmt.Errorf("FlushIntervalRange cannot be higher than 60s, got Min: %v, Max: %v", config.FlushInterval.Min, config.FlushInterval.Max)
}

if config.FlushIntervalRange.Min > config.FlushIntervalRange.Max {
return fmt.Errorf("FlushIntervalRange Min cannot be higher than Max, got Min: %v, Max: %v", config.FlushIntervalRange.Min, config.FlushIntervalRange.Max)
if config.FlushInterval.Min > config.FlushInterval.Max {
return fmt.Errorf("FlushIntervalRange Min cannot be higher than Max, got Min: %v, Max: %v", config.FlushInterval.Min, config.FlushInterval.Max)
}

if config.EarlyFlushPayloadSize > maxPayloadSize || config.EarlyFlushPayloadSize <= 0 {
@@ -137,16 +167,16 @@ func defaultConfig(config ClientConfig) ClientConfig {
config.HeartbeatInterval = clamp(config.HeartbeatInterval, time.Microsecond, 60*time.Second)
}

if config.FlushIntervalRange.Min == 0 {
config.FlushIntervalRange.Min = defaultMinFlushInterval
if config.FlushInterval.Min == 0 {
config.FlushInterval.Min = defaultMinFlushInterval
} else {
config.FlushIntervalRange.Min = clamp(config.FlushIntervalRange.Min, time.Microsecond, 60*time.Second)
config.FlushInterval.Min = clamp(config.FlushInterval.Min, time.Microsecond, 60*time.Second)
}

if config.FlushIntervalRange.Max == 0 {
config.FlushIntervalRange.Max = defaultMaxFlushInterval
if config.FlushInterval.Max == 0 {
config.FlushInterval.Max = defaultMaxFlushInterval
} else {
config.FlushIntervalRange.Max = clamp(config.FlushIntervalRange.Max, time.Microsecond, 60*time.Second)
config.FlushInterval.Max = clamp(config.FlushInterval.Max, time.Microsecond, 60*time.Second)
}

if config.DependencyLoader == nil && globalinternal.BoolEnv("DD_TELEMETRY_DEPENDENCY_COLLECTION_ENABLED", true) {
@@ -161,6 +191,10 @@ func defaultConfig(config ClientConfig) ClientConfig {
config.LogsEnabled = globalinternal.BoolEnv("DD_TELEMETRY_LOG_COLLECTION_ENABLED", true)
}

if !config.InternalMetricsEnabled {
config.InternalMetricsEnabled = true
}

if config.EarlyFlushPayloadSize == 0 {
config.EarlyFlushPayloadSize = defaultEarlyFlushPayloadSize
}
@@ -169,6 +203,22 @@ func defaultConfig(config ClientConfig) ClientConfig {
config.ExtendedHeartbeatInterval = defaultExtendedHeartbeatInterval
}

if config.PayloadQueueSize.Min == 0 {
config.PayloadQueueSize.Min = defaultPayloadQueueSize.Min
}

if config.PayloadQueueSize.Max == 0 {
config.PayloadQueueSize.Max = defaultPayloadQueueSize.Max
}

if config.DistributionsSize.Min == 0 {
config.DistributionsSize.Min = distributionsSize.Min
}

if config.DistributionsSize.Max == 0 {
config.DistributionsSize.Max = distributionsSize.Max
}

return config
}

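For orientation, a minimal sketch of a ClientConfig exercising the reworked fields. The values, service name and metric are illustrative rather than recommended, and since the package sits under internal/ this only compiles from within dd-trace-go itself.

```go
package main

import (
	"time"

	"gopkg.in/DataDog/dd-trace-go.v1/internal/newtelemetry"
)

func main() {
	cfg := newtelemetry.ClientConfig{
		AgentURL:               "http://localhost:8126",
		InternalMetricsEnabled: true,
		// Flush between 15s and 60s depending on load; validateConfig rejects bounds above 60s.
		FlushInterval: newtelemetry.Range[time.Duration]{Min: 15 * time.Second, Max: 60 * time.Second},
		// Ring-queue bounds; ideally powers of two, per the field comments above.
		PayloadQueueSize:  newtelemetry.Range[int]{Min: 4, Max: 32},
		DistributionsSize: newtelemetry.Range[int]{Min: 1 << 8, Max: 1 << 14},
	}

	c, err := newtelemetry.NewClient("my-service", "prod", "1.0.0", cfg)
	if err != nil {
		panic(err)
	}
	defer c.Close()

	// Metric name and tag are illustrative only.
	c.Count(newtelemetry.NamespaceTracers, "spans_created", map[string]string{"integration_name": "net/http"}).Submit(1)
}
```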
41 changes: 23 additions & 18 deletions internal/newtelemetry/client_test.go
@@ -886,7 +886,7 @@ func TestClientFlush(t *testing.T) {
},
},
{
name: "full-distribution",
name: "distribution-overflow",
when: func(c *client) {
handler := c.Distribution(NamespaceGeneral, "init_time", nil)
for i := 0; i < 1<<16; i++ {
@@ -911,7 +911,9 @@
t.Parallel()
config := defaultConfig(test.clientConfig)
config.AgentURL = "http://localhost:8126"
config.DependencyLoader = test.clientConfig.DependencyLoader // Don't use the default dependency loader
config.DependencyLoader = test.clientConfig.DependencyLoader // Don't use the default dependency loader
config.InternalMetricsEnabled = test.clientConfig.InternalMetricsEnabled // only enable internal metrics when explicitly set
config.InternalMetricsEnabled = false
c, err := newClient(tracerConfig, config)
require.NoError(t, err)
defer c.Close()
@@ -934,15 +936,23 @@
func TestMetricsDisabled(t *testing.T) {
t.Setenv("DD_TELEMETRY_METRICS_ENABLED", "false")

client, err := NewClient("test-service", "test-env", "1.0.0", ClientConfig{AgentURL: "http://localhost:8126"})
c, err := NewClient("test-service", "test-env", "1.0.0", ClientConfig{AgentURL: "http://localhost:8126"})
require.NoError(t, err)

defer client.Close()
recordWriter := &internal.RecordWriter{}
c.(*client).writer = recordWriter

assert.NotNil(t, client.Gauge(NamespaceTracers, "init_time", nil))
assert.NotNil(t, client.Count(NamespaceTracers, "init_time", nil))
assert.NotNil(t, client.Rate(NamespaceTracers, "init_time", nil))
assert.NotNil(t, client.Distribution(NamespaceGeneral, "init_time", nil))
defer c.Close()

assert.NotNil(t, c.Gauge(NamespaceTracers, "init_time", nil))
assert.NotNil(t, c.Count(NamespaceTracers, "init_time", nil))
assert.NotNil(t, c.Rate(NamespaceTracers, "init_time", nil))
assert.NotNil(t, c.Distribution(NamespaceGeneral, "init_time", nil))

c.Flush()

payloads := recordWriter.Payloads()
require.Len(t, payloads, 0)
}

type testRoundTripper struct {
@@ -1149,11 +1159,8 @@ func BenchmarkWorstCaseScenarioFloodLogging(b *testing.B) {
clientConfig := ClientConfig{
HeartbeatInterval: time.Hour,
ExtendedHeartbeatInterval: time.Hour,
FlushIntervalRange: struct {
Min time.Duration
Max time.Duration
}{Min: time.Second, Max: time.Second},
AgentURL: "http://localhost:8126",
FlushInterval: Range[time.Duration]{Min: time.Second, Max: time.Second},
AgentURL: "http://localhost:8126",

// Empty transport to avoid sending data to the agent
HTTPClient: &http.Client{
@@ -1326,11 +1333,9 @@ func BenchmarkWorstCaseScenarioFloodMetrics(b *testing.B) {
clientConfig := ClientConfig{
HeartbeatInterval: time.Hour,
ExtendedHeartbeatInterval: time.Hour,
FlushIntervalRange: struct {
Min time.Duration
Max time.Duration
}{Min: time.Second, Max: time.Second},
AgentURL: "http://localhost:8126",
FlushInterval: Range[time.Duration]{Min: time.Second, Max: time.Second},
DistributionsSize: Range[int]{256, -1},
AgentURL: "http://localhost:8126",

// Empty transport to avoid sending data to the agent
HTTPClient: &http.Client{
13 changes: 4 additions & 9 deletions internal/newtelemetry/distributions.go
@@ -29,7 +29,7 @@ type distributions struct {

// LoadOrStore returns a MetricHandle for the given distribution metric. If the metric key does not exist, it will be created.
func (d *distributions) LoadOrStore(namespace Namespace, name string, tags map[string]string) MetricHandle {
if !knownmetrics.IsKnownMetricName(name) {
if !knownmetrics.IsKnownMetric(namespace, "distribution", name) {
log.Debug("telemetry: metric name %q is not a known metric, please update the list of metrics name or check that your wrote the name correctly. "+
"The metric will still be sent.", name)
}
@@ -41,11 +41,6 @@ func (d *distributions) LoadOrStore(namespace Namespace, name string, tags map[s

key := distributionKey{namespace: namespace, name: name, tags: strings.TrimSuffix(compiledTags, ",")}

// Max size is a 2^14 array of float64 (2^3 bytes) which makes a distribution 128KB bytes array _at worse_.
// Considering we add a point per user request on a simple http server, we would be losing data after 2^14 requests per minute or about 280 requests per second or under 3ms per request.
// If this throughput is constant, the telemetry client flush ticker speed will increase to, at best, double twice to flush 15 seconds of data each time.
// Which will bring our max throughput to 1100 points per second or about 750µs per request.
// TODO: tweak this value once we get telemetry data from the telemetry client
handle, _ := d.store.LoadOrStore(key, &distribution{key: key, values: internal.NewRingQueue[float64](1<<8, 1<<14)})

return handle
@@ -74,12 +69,12 @@ type distribution struct {
values *internal.RingQueue[float64]
}

var logLossOnce sync.Once
var distrLogLossOnce sync.Once

func (d *distribution) Submit(value float64) {
d.newSubmit.Store(true)
if !d.values.Enqueue(value) {
logLossOnce.Do(func() {
distrLogLossOnce.Do(func() {
log.Debug("telemetry: distribution %q is losing values because the buffer is full", d.key.name)
})
}
@@ -99,7 +94,7 @@ func (d *distribution) payload() transport.DistributionSeries {
Metric: d.key.name,
Namespace: d.key.namespace,
Tags: tags,
Common: knownmetrics.IsCommonMetricName(d.key.name),
Common: knownmetrics.IsCommonMetric(d.key.namespace, "distribution", d.key.name),
Points: d.values.Flush(),
}

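The knownmetrics package itself is not among the rendered hunks. The sketch below only illustrates the (namespace, kind, name) keying implied by the new IsKnownMetric / IsCommonMetric call sites; every type, field and entry in it is an assumption (the real package takes a transport.Namespace rather than a plain string).

```go
// Hypothetical sketch of the reworked lookup; not the committed implementation.
package knownmetrics

type declaration struct {
	Namespace string // e.g. "tracers", "telemetry"
	Type      string // metric kind: "count", "gauge", "rate" or "distribution"
	Name      string // e.g. "telemetry_api.requests"
}

// commonMetrics would hold the cross-language declarations; a Go-specific list
// (omitted here) would complement it.
var commonMetrics = map[declaration]struct{}{
	{Namespace: "telemetry", Type: "count", Name: "telemetry_api.requests"}:  {},
	{Namespace: "telemetry", Type: "distribution", Name: "telemetry_api.ms"}: {},
}

func IsCommonMetric(namespace, kind, name string) bool {
	_, found := commonMetrics[declaration{Namespace: namespace, Type: kind, Name: name}]
	return found
}

func IsKnownMetric(namespace, kind, name string) bool {
	// A known metric is either a common one or a Go-specific one;
	// only the common path is sketched here.
	return IsCommonMetric(namespace, kind, name)
}
```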
