Skip to content

Commit

Permalink
added mock telemetry client
Browse files Browse the repository at this point in the history
Signed-off-by: Eliott Bouhana <[email protected]>
  • Loading branch information
eliottness committed Jan 30, 2025
1 parent 5380ba3 commit f654610
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 20 deletions.
8 changes: 4 additions & 4 deletions internal/newtelemetry/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ type Client interface {
// Flush closes the client and flushes any remaining data.
Flush()

// appStart sends the telemetry necessary to signal that the app is starting.
appStart()
// AppStart sends the telemetry necessary to signal that the app is starting.
AppStart()

// appStop sends the telemetry necessary to signal that the app is stopping.
appStop()
// AppStop sends the telemetry necessary to signal that the app is stopping.
AppStop()
}
4 changes: 2 additions & 2 deletions internal/newtelemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,13 +308,13 @@ func (c *client) computeFlushMetrics(results []internal.EndpointRequestResult, e
c.Distribution(transport.NamespaceTelemetry, "telemetry_api.ms", map[string]string{"endpoint": endpoint}).Submit(float64(successfulCall.CallDuration.Milliseconds()))
}

func (c *client) appStart() {
func (c *client) AppStart() {
c.flushMapperMu.Lock()
defer c.flushMapperMu.Unlock()
c.flushMapper = mapper.NewAppStartedMapper(c.flushMapper)
}

func (c *client) appStop() {
func (c *client) AppStop() {
c.flushMapperMu.Lock()
defer c.flushMapperMu.Unlock()
c.flushMapper = mapper.NewAppClosingMapper(c.flushMapper)
Expand Down
24 changes: 12 additions & 12 deletions internal/newtelemetry/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func TestClientFlush(t *testing.T) {
{
name: "app-started",
when: func(c *client) {
c.appStart()
c.AppStart()
},
expect: func(t *testing.T, payloads []transport.Payload) {
payload := payloads[0]
Expand All @@ -326,7 +326,7 @@ func TestClientFlush(t *testing.T) {
{
name: "app-started-with-product",
when: func(c *client) {
c.appStart()
c.AppStart()
c.ProductStarted("test-product")
},
expect: func(t *testing.T, payloads []transport.Payload) {
Expand All @@ -339,7 +339,7 @@ func TestClientFlush(t *testing.T) {
{
name: "app-started-with-configuration",
when: func(c *client) {
c.appStart()
c.AppStart()
c.AddAppConfig("key", "value", OriginDefault)
},
expect: func(t *testing.T, payloads []transport.Payload) {
Expand All @@ -354,7 +354,7 @@ func TestClientFlush(t *testing.T) {
{
name: "app-started+integrations",
when: func(c *client) {
c.appStart()
c.AppStart()
c.MarkIntegrationAsLoaded(Integration{Name: "test-integration", Version: "1.0.0"})
},
expect: func(t *testing.T, payloads []transport.Payload) {
Expand All @@ -380,7 +380,7 @@ func TestClientFlush(t *testing.T) {
HeartbeatInterval: time.Nanosecond,
},
when: func(c *client) {
c.appStart()
c.AppStart()
},
expect: func(t *testing.T, payloads []transport.Payload) {
payload := payloads[0]
Expand All @@ -397,7 +397,7 @@ func TestClientFlush(t *testing.T) {
{
name: "app-stopped",
when: func(c *client) {
c.appStop()
c.AppStop()
},
expect: func(t *testing.T, payloads []transport.Payload) {
payload := payloads[0]
Expand Down Expand Up @@ -1028,7 +1028,7 @@ func TestClientEnd2End(t *testing.T) {
{
name: "app-start",
when: func(c *client) {
c.appStart()
c.AppStart()
},
expect: func(t *testing.T, bodies []transport.Body) {
require.Len(t, bodies, 1)
Expand All @@ -1038,7 +1038,7 @@ func TestClientEnd2End(t *testing.T) {
{
name: "app-stop",
when: func(c *client) {
c.appStop()
c.AppStop()
},
expect: func(t *testing.T, bodies []transport.Body) {
require.Len(t, bodies, 1)
Expand All @@ -1048,8 +1048,8 @@ func TestClientEnd2End(t *testing.T) {
{
name: "app-start+app-stop",
when: func(c *client) {
c.appStart()
c.appStop()
c.AppStart()
c.AppStop()
},
expect: func(t *testing.T, bodies []transport.Body) {
require.Len(t, bodies, 2)
Expand All @@ -1060,7 +1060,7 @@ func TestClientEnd2End(t *testing.T) {
{
name: "fail-agent-endpoint",
when: func(c *client) {
c.appStart()
c.AppStart()
},
roundtrip: func(_ *testing.T, req *http.Request) (*http.Response, error) {
if strings.Contains(req.URL.Host, "localhost") {
Expand All @@ -1077,7 +1077,7 @@ func TestClientEnd2End(t *testing.T) {
{
name: "fail-all-endpoint",
when: func(c *client) {
c.appStart()
c.AppStart()
},
roundtrip: func(_ *testing.T, _ *http.Request) (*http.Response, error) {
return nil, errors.New("failed")
Expand Down
4 changes: 2 additions & 2 deletions internal/newtelemetry/globalclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func StartApp(client Client) {

globalClientRecorder.Replay(client)

client.appStart()
client.AppStart()
}

// SwapClient swaps the global client with the given client and Flush the old (*client).
Expand Down Expand Up @@ -64,7 +64,7 @@ func StopApp() {
}

if client := globalClient.Swap(nil); client != nil && *client != nil {
(*client).appStop()
(*client).AppStop()
(*client).Flush()
(*client).Close()
}
Expand Down
201 changes: 201 additions & 0 deletions internal/newtelemetry/telemetrytest/telemetrytest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2022 Datadog, Inc.

// Package newtelemetrytest provides a mock implementation of the newtelemetry client for testing purposes
package newtelemetrytest

import (
"strings"
"sync"

"gopkg.in/DataDog/dd-trace-go.v1/internal/newtelemetry"

"github.com/stretchr/testify/mock"
)

// MockClient implements Client and is used for testing purposes outside the newtelemetry package,
// e.g. the tracer and profiler.
type MockClient struct {
mock.Mock
mu sync.Mutex
Started bool
Stopped bool
Configuration map[string]any
Logs map[newtelemetry.LogLevel]string
Integrations []string
Products map[newtelemetry.Namespace]bool
Metrics map[metricKey]*float64
}

type metricKey struct {
Namespace newtelemetry.Namespace
Name string
Tags string
Kind string
}

func (m *MockClient) Close() error {
return nil
}

func tagsString(tags map[string]string) string {
compiledTags := ""
for k, v := range tags {
compiledTags += k + ":" + v + ","
}
return strings.TrimSuffix(compiledTags, ",")
}

type MockMetricHandle struct {
mock.Mock
mu sync.Mutex
submit func(ptr *float64, value float64)
value *float64
}

func (m *MockMetricHandle) Submit(value float64) {
m.On("Submit", value)
m.mu.Lock()
defer m.mu.Unlock()
m.submit(m.value, value)
}

func (m *MockMetricHandle) Get() float64 {
m.On("Get")
m.mu.Lock()
defer m.mu.Unlock()
return *m.value
}

func (m *MockClient) Count(namespace newtelemetry.Namespace, name string, tags map[string]string) newtelemetry.MetricHandle {
m.mu.Lock()
defer m.mu.Unlock()
m.On("Count", namespace, name, tags)
key := metricKey{Namespace: namespace, Name: name, Tags: tagsString(tags), Kind: "count"}
if _, ok := m.Metrics[key]; !ok {
init := 0.0
m.Metrics[key] = &init
}

return &MockMetricHandle{value: m.Metrics[key], submit: func(ptr *float64, value float64) {
*ptr += value
}}
}

func (m *MockClient) Rate(namespace newtelemetry.Namespace, name string, tags map[string]string) newtelemetry.MetricHandle {
m.mu.Lock()
defer m.mu.Unlock()
m.On("Rate", namespace, name, tags)
key := metricKey{Namespace: namespace, Name: name, Tags: tagsString(tags), Kind: "rate"}
if _, ok := m.Metrics[key]; !ok {
init := 0.0
m.Metrics[key] = &init
}

return &MockMetricHandle{value: m.Metrics[key], submit: func(ptr *float64, value float64) {
*ptr += value
}}
}

func (m *MockClient) Gauge(namespace newtelemetry.Namespace, name string, tags map[string]string) newtelemetry.MetricHandle {
m.mu.Lock()
defer m.mu.Unlock()
m.On("Gauge", namespace, name, tags)
key := metricKey{Namespace: namespace, Name: name, Tags: tagsString(tags), Kind: "gauge"}
if _, ok := m.Metrics[key]; !ok {
init := 0.0
m.Metrics[key] = &init
}

return &MockMetricHandle{value: m.Metrics[key], submit: func(ptr *float64, value float64) {
*ptr = value
}}
}

func (m *MockClient) Distribution(namespace newtelemetry.Namespace, name string, tags map[string]string) newtelemetry.MetricHandle {
m.mu.Lock()
defer m.mu.Unlock()
m.On("Distribution", namespace, name, tags)
key := metricKey{Namespace: namespace, Name: name, Tags: tagsString(tags), Kind: "distribution"}
if _, ok := m.Metrics[key]; !ok {
init := 0.0
m.Metrics[key] = &init
}

return &MockMetricHandle{value: m.Metrics[key], submit: func(ptr *float64, value float64) {
*ptr = value
}}
}

func (m *MockClient) Log(level newtelemetry.LogLevel, text string, options ...newtelemetry.LogOption) {
m.mu.Lock()
defer m.mu.Unlock()
m.On("Log", level, text, options)
m.Logs[level] = text
}

func (m *MockClient) ProductStarted(product newtelemetry.Namespace) {
m.mu.Lock()
defer m.mu.Unlock()
m.On("ProductStarted", product)
m.Products[product] = true
}

func (m *MockClient) ProductStopped(product newtelemetry.Namespace) {
m.mu.Lock()
defer m.mu.Unlock()
m.On("ProductStopped", product)
m.Products[product] = false
}

func (m *MockClient) ProductStartError(product newtelemetry.Namespace, err error) {
m.mu.Lock()
defer m.mu.Unlock()
m.On("ProductStartError", product, err)
m.Products[product] = false
}

func (m *MockClient) AddAppConfig(key string, value any, origin newtelemetry.Origin) {
m.mu.Lock()
defer m.mu.Unlock()
m.On("AddAppConfig", key, value, origin)
m.Configuration[key] = value
}

func (m *MockClient) AddBulkAppConfig(kvs map[string]any, origin newtelemetry.Origin) {
m.mu.Lock()
defer m.mu.Unlock()
m.On("AddBulkAppConfig", kvs, origin)
for k, v := range kvs {
m.Configuration[k] = v
}
}

func (m *MockClient) MarkIntegrationAsLoaded(integration newtelemetry.Integration) {
m.mu.Lock()
defer m.mu.Unlock()
m.On("MarkIntegrationAsLoaded", integration)
m.Integrations = append(m.Integrations, integration.Name)
}

func (m *MockClient) Flush() {
m.On("Flush")
}

func (m *MockClient) AppStart() {
m.mu.Lock()
defer m.mu.Unlock()
m.On("AppStart")
m.Started = true
}

func (m *MockClient) AppStop() {
m.mu.Lock()
defer m.mu.Unlock()
m.On("AppStop")
m.Stopped = true
}

var _ newtelemetry.Client = (*MockClient)(nil)

0 comments on commit f654610

Please sign in to comment.