diff --git a/internal/newtelemetry/api.go b/internal/newtelemetry/api.go index 8a73c598dc..066e4d1f2e 100644 --- a/internal/newtelemetry/api.go +++ b/internal/newtelemetry/api.go @@ -107,9 +107,9 @@ type Client interface { // Flush closes the client and flushes any remaining data. Flush() - // appStart sends the telemetry necessary to signal that the app is starting. - appStart() + // AppStart sends the telemetry necessary to signal that the app is starting. + AppStart() - // appStop sends the telemetry necessary to signal that the app is stopping. - appStop() + // AppStop sends the telemetry necessary to signal that the app is stopping. + AppStop() } diff --git a/internal/newtelemetry/client.go b/internal/newtelemetry/client.go index 36d4ac5cfc..5361689199 100644 --- a/internal/newtelemetry/client.go +++ b/internal/newtelemetry/client.go @@ -308,13 +308,13 @@ func (c *client) computeFlushMetrics(results []internal.EndpointRequestResult, e c.Distribution(transport.NamespaceTelemetry, "telemetry_api.ms", map[string]string{"endpoint": endpoint}).Submit(float64(successfulCall.CallDuration.Milliseconds())) } -func (c *client) appStart() { +func (c *client) AppStart() { c.flushMapperMu.Lock() defer c.flushMapperMu.Unlock() c.flushMapper = mapper.NewAppStartedMapper(c.flushMapper) } -func (c *client) appStop() { +func (c *client) AppStop() { c.flushMapperMu.Lock() defer c.flushMapperMu.Unlock() c.flushMapper = mapper.NewAppClosingMapper(c.flushMapper) diff --git a/internal/newtelemetry/client_test.go b/internal/newtelemetry/client_test.go index af568ccfbd..5eb9f45ec0 100644 --- a/internal/newtelemetry/client_test.go +++ b/internal/newtelemetry/client_test.go @@ -312,7 +312,7 @@ func TestClientFlush(t *testing.T) { { name: "app-started", when: func(c *client) { - c.appStart() + c.AppStart() }, expect: func(t *testing.T, payloads []transport.Payload) { payload := payloads[0] @@ -326,7 +326,7 @@ func TestClientFlush(t *testing.T) { { name: "app-started-with-product", when: func(c *client) { - c.appStart() + c.AppStart() c.ProductStarted("test-product") }, expect: func(t *testing.T, payloads []transport.Payload) { @@ -339,7 +339,7 @@ func TestClientFlush(t *testing.T) { { name: "app-started-with-configuration", when: func(c *client) { - c.appStart() + c.AppStart() c.AddAppConfig("key", "value", OriginDefault) }, expect: func(t *testing.T, payloads []transport.Payload) { @@ -354,7 +354,7 @@ func TestClientFlush(t *testing.T) { { name: "app-started+integrations", when: func(c *client) { - c.appStart() + c.AppStart() c.MarkIntegrationAsLoaded(Integration{Name: "test-integration", Version: "1.0.0"}) }, expect: func(t *testing.T, payloads []transport.Payload) { @@ -380,7 +380,7 @@ func TestClientFlush(t *testing.T) { HeartbeatInterval: time.Nanosecond, }, when: func(c *client) { - c.appStart() + c.AppStart() }, expect: func(t *testing.T, payloads []transport.Payload) { payload := payloads[0] @@ -397,7 +397,7 @@ func TestClientFlush(t *testing.T) { { name: "app-stopped", when: func(c *client) { - c.appStop() + c.AppStop() }, expect: func(t *testing.T, payloads []transport.Payload) { payload := payloads[0] @@ -1028,7 +1028,7 @@ func TestClientEnd2End(t *testing.T) { { name: "app-start", when: func(c *client) { - c.appStart() + c.AppStart() }, expect: func(t *testing.T, bodies []transport.Body) { require.Len(t, bodies, 1) @@ -1038,7 +1038,7 @@ func TestClientEnd2End(t *testing.T) { { name: "app-stop", when: func(c *client) { - c.appStop() + c.AppStop() }, expect: func(t *testing.T, bodies []transport.Body) { require.Len(t, bodies, 1) @@ -1048,8 +1048,8 @@ func TestClientEnd2End(t *testing.T) { { name: "app-start+app-stop", when: func(c *client) { - c.appStart() - c.appStop() + c.AppStart() + c.AppStop() }, expect: func(t *testing.T, bodies []transport.Body) { require.Len(t, bodies, 2) @@ -1060,7 +1060,7 @@ func TestClientEnd2End(t *testing.T) { { name: "fail-agent-endpoint", when: func(c *client) { - c.appStart() + c.AppStart() }, roundtrip: func(_ *testing.T, req *http.Request) (*http.Response, error) { if strings.Contains(req.URL.Host, "localhost") { @@ -1077,7 +1077,7 @@ func TestClientEnd2End(t *testing.T) { { name: "fail-all-endpoint", when: func(c *client) { - c.appStart() + c.AppStart() }, roundtrip: func(_ *testing.T, _ *http.Request) (*http.Response, error) { return nil, errors.New("failed") diff --git a/internal/newtelemetry/globalclient.go b/internal/newtelemetry/globalclient.go index 53cf41e724..b6542ca3fd 100644 --- a/internal/newtelemetry/globalclient.go +++ b/internal/newtelemetry/globalclient.go @@ -35,7 +35,7 @@ func StartApp(client Client) { globalClientRecorder.Replay(client) - client.appStart() + client.AppStart() } // SwapClient swaps the global client with the given client and Flush the old (*client). @@ -64,7 +64,7 @@ func StopApp() { } if client := globalClient.Swap(nil); client != nil && *client != nil { - (*client).appStop() + (*client).AppStop() (*client).Flush() (*client).Close() } diff --git a/internal/newtelemetry/telemetrytest/telemetrytest.go b/internal/newtelemetry/telemetrytest/telemetrytest.go new file mode 100644 index 0000000000..ac6a592ca9 --- /dev/null +++ b/internal/newtelemetry/telemetrytest/telemetrytest.go @@ -0,0 +1,201 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2022 Datadog, Inc. + +// Package newtelemetrytest provides a mock implementation of the newtelemetry client for testing purposes +package newtelemetrytest + +import ( + "strings" + "sync" + + "gopkg.in/DataDog/dd-trace-go.v1/internal/newtelemetry" + + "github.com/stretchr/testify/mock" +) + +// MockClient implements Client and is used for testing purposes outside the newtelemetry package, +// e.g. the tracer and profiler. +type MockClient struct { + mock.Mock + mu sync.Mutex + Started bool + Stopped bool + Configuration map[string]any + Logs map[newtelemetry.LogLevel]string + Integrations []string + Products map[newtelemetry.Namespace]bool + Metrics map[metricKey]*float64 +} + +type metricKey struct { + Namespace newtelemetry.Namespace + Name string + Tags string + Kind string +} + +func (m *MockClient) Close() error { + return nil +} + +func tagsString(tags map[string]string) string { + compiledTags := "" + for k, v := range tags { + compiledTags += k + ":" + v + "," + } + return strings.TrimSuffix(compiledTags, ",") +} + +type MockMetricHandle struct { + mock.Mock + mu sync.Mutex + submit func(ptr *float64, value float64) + value *float64 +} + +func (m *MockMetricHandle) Submit(value float64) { + m.On("Submit", value) + m.mu.Lock() + defer m.mu.Unlock() + m.submit(m.value, value) +} + +func (m *MockMetricHandle) Get() float64 { + m.On("Get") + m.mu.Lock() + defer m.mu.Unlock() + return *m.value +} + +func (m *MockClient) Count(namespace newtelemetry.Namespace, name string, tags map[string]string) newtelemetry.MetricHandle { + m.mu.Lock() + defer m.mu.Unlock() + m.On("Count", namespace, name, tags) + key := metricKey{Namespace: namespace, Name: name, Tags: tagsString(tags), Kind: "count"} + if _, ok := m.Metrics[key]; !ok { + init := 0.0 + m.Metrics[key] = &init + } + + return &MockMetricHandle{value: m.Metrics[key], submit: func(ptr *float64, value float64) { + *ptr += value + }} +} + +func (m *MockClient) Rate(namespace newtelemetry.Namespace, name string, tags map[string]string) newtelemetry.MetricHandle { + m.mu.Lock() + defer m.mu.Unlock() + m.On("Rate", namespace, name, tags) + key := metricKey{Namespace: namespace, Name: name, Tags: tagsString(tags), Kind: "rate"} + if _, ok := m.Metrics[key]; !ok { + init := 0.0 + m.Metrics[key] = &init + } + + return &MockMetricHandle{value: m.Metrics[key], submit: func(ptr *float64, value float64) { + *ptr += value + }} +} + +func (m *MockClient) Gauge(namespace newtelemetry.Namespace, name string, tags map[string]string) newtelemetry.MetricHandle { + m.mu.Lock() + defer m.mu.Unlock() + m.On("Gauge", namespace, name, tags) + key := metricKey{Namespace: namespace, Name: name, Tags: tagsString(tags), Kind: "gauge"} + if _, ok := m.Metrics[key]; !ok { + init := 0.0 + m.Metrics[key] = &init + } + + return &MockMetricHandle{value: m.Metrics[key], submit: func(ptr *float64, value float64) { + *ptr = value + }} +} + +func (m *MockClient) Distribution(namespace newtelemetry.Namespace, name string, tags map[string]string) newtelemetry.MetricHandle { + m.mu.Lock() + defer m.mu.Unlock() + m.On("Distribution", namespace, name, tags) + key := metricKey{Namespace: namespace, Name: name, Tags: tagsString(tags), Kind: "distribution"} + if _, ok := m.Metrics[key]; !ok { + init := 0.0 + m.Metrics[key] = &init + } + + return &MockMetricHandle{value: m.Metrics[key], submit: func(ptr *float64, value float64) { + *ptr = value + }} +} + +func (m *MockClient) Log(level newtelemetry.LogLevel, text string, options ...newtelemetry.LogOption) { + m.mu.Lock() + defer m.mu.Unlock() + m.On("Log", level, text, options) + m.Logs[level] = text +} + +func (m *MockClient) ProductStarted(product newtelemetry.Namespace) { + m.mu.Lock() + defer m.mu.Unlock() + m.On("ProductStarted", product) + m.Products[product] = true +} + +func (m *MockClient) ProductStopped(product newtelemetry.Namespace) { + m.mu.Lock() + defer m.mu.Unlock() + m.On("ProductStopped", product) + m.Products[product] = false +} + +func (m *MockClient) ProductStartError(product newtelemetry.Namespace, err error) { + m.mu.Lock() + defer m.mu.Unlock() + m.On("ProductStartError", product, err) + m.Products[product] = false +} + +func (m *MockClient) AddAppConfig(key string, value any, origin newtelemetry.Origin) { + m.mu.Lock() + defer m.mu.Unlock() + m.On("AddAppConfig", key, value, origin) + m.Configuration[key] = value +} + +func (m *MockClient) AddBulkAppConfig(kvs map[string]any, origin newtelemetry.Origin) { + m.mu.Lock() + defer m.mu.Unlock() + m.On("AddBulkAppConfig", kvs, origin) + for k, v := range kvs { + m.Configuration[k] = v + } +} + +func (m *MockClient) MarkIntegrationAsLoaded(integration newtelemetry.Integration) { + m.mu.Lock() + defer m.mu.Unlock() + m.On("MarkIntegrationAsLoaded", integration) + m.Integrations = append(m.Integrations, integration.Name) +} + +func (m *MockClient) Flush() { + m.On("Flush") +} + +func (m *MockClient) AppStart() { + m.mu.Lock() + defer m.mu.Unlock() + m.On("AppStart") + m.Started = true +} + +func (m *MockClient) AppStop() { + m.mu.Lock() + defer m.mu.Unlock() + m.On("AppStop") + m.Stopped = true +} + +var _ newtelemetry.Client = (*MockClient)(nil)