From 61c0fb494b5aa4de0c25c90823ff6d630186b2a9 Mon Sep 17 00:00:00 2001 From: Sumeet Kumar Rai Date: Tue, 12 Dec 2023 16:13:40 +0530 Subject: [PATCH 1/7] removed redundant statsd code from meteor --- agent/agent.go | 15 +- agent/agent_test.go | 40 ++--- agent/config.go | 2 +- cmd/run.go | 21 +-- config/config.go | 2 - config/config_test.go | 4 - config/meteor.yaml.sample | 2 - config/testdata/valid-config.yaml | 3 - go.mod | 1 - go.sum | 2 - metrics/statsd.go | 103 ------------- metrics/statsd_test.go | 244 ------------------------------ 12 files changed, 28 insertions(+), 411 deletions(-) delete mode 100644 metrics/statsd.go delete mode 100644 metrics/statsd_test.go diff --git a/agent/agent.go b/agent/agent.go index 63c58cdab..a30a34a76 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -27,7 +27,7 @@ type Agent struct { extractorFactory *registry.ExtractorFactory processorFactory *registry.ProcessorFactory sinkFactory *registry.SinkFactory - monitor []Monitor + monitor Monitor logger log.Logger retrier *retrier stopOnSinkError bool @@ -279,9 +279,8 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s } retryNotification := func(e error, d time.Duration) { - for _, mt := range r.monitor { - mt.RecordSinkRetryCount(ctx, pluginInfo) - } + + r.monitor.RecordSinkRetryCount(ctx, pluginInfo) r.logger.Warn( fmt.Sprintf("retrying sink in %s", d), @@ -302,9 +301,7 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s ) pluginInfo.Success = err == nil - for _, mt := range r.monitor { - mt.RecordPlugin(ctx, pluginInfo) // this can be deleted when statsd is removed - } + if err != nil { // once it reaches here, it means that the retry has been exhausted and still got error r.logger.Error("error running sink", "sink", sr.Name, "error", err.Error()) @@ -328,9 +325,7 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s } func (r *Agent) logAndRecordMetrics(ctx context.Context, run Run) { - for _, monitor := range r.monitor { - monitor.RecordRun(ctx, run) - } + r.monitor.RecordRun(ctx, run) if run.Success { r.logger.Info("done running recipe", diff --git a/agent/agent_test.go b/agent/agent_test.go index 36d76af54..95ff12538 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -82,7 +82,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -113,7 +113,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: registry.NewProcessorFactory(), SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -146,7 +146,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: registry.NewSinkFactory(), Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -183,7 +183,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -222,7 +222,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -263,7 +263,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -305,7 +305,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -346,7 +346,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -394,7 +394,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -441,7 +441,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -491,7 +491,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.True(t, run.Success) @@ -542,7 +542,7 @@ func TestAgentRun(t *testing.T) { SinkFactory: sf, Logger: utils.Logger, StopOnSinkError: true, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) @@ -594,7 +594,7 @@ func TestAgentRun(t *testing.T) { SinkFactory: sf, Logger: utils.Logger, StopOnSinkError: false, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) @@ -657,7 +657,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.NoError(t, run.Error) @@ -713,7 +713,7 @@ func TestAgentRun(t *testing.T) { ExtractorFactory: ef, ProcessorFactory: pf, SinkFactory: sf, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, Logger: utils.Logger, TimerFn: timerFn, }) @@ -769,7 +769,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, MaxRetries: 2, // need to retry "at least" 2 times since Extractor returns RetryError twice RetryInitialInterval: 1 * time.Millisecond, // this is to override default retry interval to reduce test time }) @@ -824,7 +824,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, MaxRetries: 2, // need to retry "at least" 2 times since Sink returns RetryError twice RetryInitialInterval: 1 * time.Millisecond, // this is to override default retry interval to reduce test time }) @@ -884,7 +884,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, MaxRetries: 5, RetryInitialInterval: 10 * time.Second, }) @@ -1065,7 +1065,7 @@ func TestAgentRunMultiple(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) runs := r.RunMultiple(ctx, recipeList) @@ -1152,7 +1152,7 @@ func TestValidate(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{newMockMonitor()}, + Monitor: newMockMonitor(), }) var expectedErrs []error diff --git a/agent/config.go b/agent/config.go index 7b7d07eeb..1b6e8caf8 100644 --- a/agent/config.go +++ b/agent/config.go @@ -11,7 +11,7 @@ type Config struct { ExtractorFactory *registry.ExtractorFactory ProcessorFactory *registry.ProcessorFactory SinkFactory *registry.SinkFactory - Monitor []Monitor + Monitor Monitor Logger log.Logger MaxRetries int RetryInitialInterval time.Duration diff --git a/cmd/run.go b/cmd/run.go index b8217effb..479bcd133 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -70,16 +70,7 @@ func RunCmd() *cobra.Command { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() - var mts []agent.Monitor - if cfg.StatsdEnabled { - mt, err := newStatsdMonitor(cfg) - if err != nil { - return err - } - - mts = append(mts, mt) - - } + var mts agent.Monitor if cfg.OtelEnabled { doneOtlp, err := metrics.InitOtel(ctx, cfg, lg, Version) @@ -88,7 +79,7 @@ func RunCmd() *cobra.Command { } defer doneOtlp() - mts = append(mts, metrics.NewOtelMonitor()) + mts = metrics.NewOtelMonitor() } runner := agent.NewAgent(agent.Config{ @@ -156,11 +147,3 @@ func RunCmd() *cobra.Command { return cmd } - -func newStatsdMonitor(cfg config.Config) (*metrics.StatsdMonitor, error) { - client, err := metrics.NewStatsdClient(cfg.StatsdHost) - if err != nil { - return nil, err - } - return metrics.NewStatsdMonitor(client, cfg.AppName), nil -} diff --git a/config/config.go b/config/config.go index 3439e152e..edd7892fc 100644 --- a/config/config.go +++ b/config/config.go @@ -14,8 +14,6 @@ type Config struct { MaxRetries int `mapstructure:"MAX_RETRIES" default:"5"` RetryInitialIntervalSeconds int `mapstructure:"RETRY_INITIAL_INTERVAL_SECONDS" default:"5"` StopOnSinkError bool `mapstructure:"STOP_ON_SINK_ERROR" default:"false"` - StatsdEnabled bool `mapstructure:"STATSD_ENABLED" default:"false"` - StatsdHost string `mapstructure:"STATSD_HOST" default:"localhost:8125"` OtelEnabled bool `mapstructure:"OTEL_ENABLED" default:"false"` OtelCollectorAddr string `mapstructure:"OTEL_COLLECTOR_ADDR" default:"localhost:4317"` OtelTraceSampleProbability float64 `mapstructure:"OTEL_TRACE_SAMPLE_PROBABILITY" default:"1"` diff --git a/config/config_test.go b/config/config_test.go index 98c46dceb..56d28055d 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -25,8 +25,6 @@ func TestLoad(t *testing.T) { expected: config.Config{ AppName: "meteor", LogLevel: "info", - StatsdEnabled: false, - StatsdHost: "localhost:8125", OtelEnabled: false, OtelCollectorAddr: "localhost:4317", OtelTraceSampleProbability: 1, @@ -43,8 +41,6 @@ func TestLoad(t *testing.T) { expected: config.Config{ AppName: "meteor", LogLevel: "info", - StatsdEnabled: false, - StatsdHost: "localhost:8125", OtelEnabled: false, OtelCollectorAddr: "localhost:4317", OtelTraceSampleProbability: 1, diff --git a/config/meteor.yaml.sample b/config/meteor.yaml.sample index cc580e041..f660860ba 100644 --- a/config/meteor.yaml.sample +++ b/config/meteor.yaml.sample @@ -1,6 +1,4 @@ LOG_LEVEL: info -STATSD_ENABLED: false -STATSD_HOST: "localhost:8125" MAX_RETRIES: 5 RETRY_INITIAL_INTERVAL_SECONDS: 5 STOP_ON_SINK_ERROR: false diff --git a/config/testdata/valid-config.yaml b/config/testdata/valid-config.yaml index b2edb1b3b..740117186 100644 --- a/config/testdata/valid-config.yaml +++ b/config/testdata/valid-config.yaml @@ -1,7 +1,4 @@ LOG_LEVEL: info -STATSD_ENABLED: false -STATSD_HOST: "localhost:8125" -STATSD_PREFIX: meteor MAX_RETRIES: 5 RETRY_INITIAL_INTERVAL_SECONDS: 5 STOP_ON_SINK_ERROR: false \ No newline at end of file diff --git a/go.mod b/go.mod index 3bc0db618..97ba9245b 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,6 @@ require ( github.com/denisenkom/go-mssqldb v0.12.3 github.com/dnaeon/go-vcr/v2 v2.0.1 github.com/elastic/go-elasticsearch/v8 v8.0.0-20210708134649-33f644c8e327 - github.com/etsy/statsd v0.9.0 github.com/go-kivik/couchdb v2.0.0+incompatible github.com/go-kivik/kivik v2.0.0+incompatible github.com/go-playground/validator/v10 v10.11.0 diff --git a/go.sum b/go.sum index bd5351530..c49baeea0 100644 --- a/go.sum +++ b/go.sum @@ -555,8 +555,6 @@ github.com/envoyproxy/protoc-gen-validate v0.6.2/go.mod h1:2t7qjJNvHPx8IjnBOzl9E github.com/envoyproxy/protoc-gen-validate v0.6.7/go.mod h1:dyJXwwfPK2VSqiB9Klm1J6romD608Ba7Hij42vrOBCo= github.com/envoyproxy/protoc-gen-validate v0.10.0 h1:oIfnZFdC0YhpNNEX+SuIqko4cqqVZeN9IGTrhZje83Y= github.com/envoyproxy/protoc-gen-validate v0.10.0/go.mod h1:DRjgyB0I43LtJapqN6NiRwroiAU2PaFuvk/vjgh61ss= -github.com/etsy/statsd v0.9.0 h1:GLP1pAzn1fGE7/kM2S5QXSU0ZTUV6QnZsyZVMx7IVF4= -github.com/etsy/statsd v0.9.0/go.mod h1:rmx2gVm1TEkQUIcU/KAM4prmC/AAUU8Wndeule9gvW4= github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch v4.11.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= diff --git a/metrics/statsd.go b/metrics/statsd.go deleted file mode 100644 index f95d02475..000000000 --- a/metrics/statsd.go +++ /dev/null @@ -1,103 +0,0 @@ -package metrics - -import ( - "context" - "fmt" - "net" - "strconv" - - statsd "github.com/etsy/statsd/examples/go" - "github.com/goto/meteor/agent" - "github.com/goto/meteor/recipe" -) - -const ( - runDurationMetricName = "runDuration" - runRecordCountMetricName = "runRecordCount" - runMetricName = "run" - pluginRunMetricName = "runPlugin" -) - -// StatsdMonitor represents the statsd monitor. -type StatsdMonitor struct { - client statsdClient - prefix string -} - -// NewStatsdMonitor creates a new StatsdMonitor -func NewStatsdMonitor(client statsdClient, prefix string) *StatsdMonitor { - return &StatsdMonitor{ - client: client, - prefix: prefix, - } -} - -// RecordRun records a run behavior -func (m *StatsdMonitor) RecordRun(_ context.Context, run agent.Run) { - m.client.Timing( - m.createMetricName(runDurationMetricName, run.Recipe, run.Success), - int64(run.DurationInMs), - ) - m.client.Increment( - m.createMetricName(runMetricName, run.Recipe, run.Success), - ) - m.client.IncrementByValue( - m.createMetricName(runRecordCountMetricName, run.Recipe, run.Success), - run.RecordCount, - ) -} - -// RecordPlugin records a individual plugin behavior in a run -func (m *StatsdMonitor) RecordPlugin(_ context.Context, pluginInfo agent.PluginInfo) { - m.client.Increment( - fmt.Sprintf( - "%s.%s,recipe_name=%s,name=%s,type=%s,success=%t", - m.prefix, - pluginRunMetricName, - pluginInfo.RecipeName, - pluginInfo.PluginName, - pluginInfo.PluginType, - pluginInfo.Success, - ), - ) -} - -func (*StatsdMonitor) RecordSinkRetryCount(context.Context, agent.PluginInfo) {} - -// createMetricName creates a metric name for a given recipe and success -func (m *StatsdMonitor) createMetricName(metricName string, recipe recipe.Recipe, success bool) string { - successText := "false" - if success { - successText = "true" - } - - return fmt.Sprintf( - "%s.%s,name=%s,success=%s,extractor=%s", - m.prefix, - metricName, - recipe.Name, - successText, - recipe.Source.Name, - ) -} - -type statsdClient interface { - Timing(string, int64) - Increment(string) - IncrementByValue(string, int) -} - -// NewStatsdClient returns a new statsd client if the given address is valid -func NewStatsdClient(statsdAddress string) (*statsd.StatsdClient, error) { - host, portStr, err := net.SplitHostPort(statsdAddress) - if err != nil { - return nil, fmt.Errorf("split the network address: %w", err) - } - - port, err := strconv.Atoi(portStr) - if err != nil { - return nil, fmt.Errorf("convert port type: %w", err) - } - - return statsd.New(host, port), nil -} diff --git a/metrics/statsd_test.go b/metrics/statsd_test.go deleted file mode 100644 index 5ab076b51..000000000 --- a/metrics/statsd_test.go +++ /dev/null @@ -1,244 +0,0 @@ -package metrics_test - -import ( - "context" - "fmt" - "log" - "os" - "testing" - - "github.com/goto/meteor/agent" - "github.com/goto/meteor/metrics" - "github.com/goto/meteor/recipe" - "github.com/goto/meteor/test/utils" - "github.com/ory/dockertest/v3" - "github.com/ory/dockertest/v3/docker" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" -) - -type mockStatsdClient struct { - mock.Mock -} - -func (c *mockStatsdClient) Timing(name string, val int64) { - c.Called(name, val) -} - -func (c *mockStatsdClient) IncrementByValue(name string, val int) { - c.Called(name, val) -} - -func (c *mockStatsdClient) Increment(name string) { - c.Called(name) -} - -var port = "8125" - -func TestMain(m *testing.M) { - // setup test - opts := dockertest.RunOptions{ - Repository: "statsd/statsd", - Platform: "linux/amd64", - Tag: "latest", - Env: []string{ - "MYSQL_ALLOW_EMPTY_PASSWORD=true", - }, - ExposedPorts: []string{"8125", port}, - PortBindings: map[docker.Port][]docker.PortBinding{ - "8125": { - {HostIP: "0.0.0.0", HostPort: port}, - }, - }, - } - // exponential backoff-retry, because the application in the container might not be ready to accept connections yet - retryFn := func(resource *dockertest.Resource) error { - c, err := metrics.NewStatsdClient("127.0.0.1:" + port) - if err != nil { - return err - } - c.Open() - return nil - } - purgeFn, err := utils.CreateContainer(opts, retryFn) - if err != nil { - log.Fatal(err) - } - - // run tests - code := m.Run() - - // clean tests - if err := purgeFn(); err != nil { - log.Fatal(err) - } - os.Exit(code) -} - -func TestStatsdMonitorRecordRun(t *testing.T) { - statsdPrefix := "testprefix" - - t.Run("should create metrics with the correct name and value", func(t *testing.T) { - recipe := recipe.Recipe{ - Name: "test-recipe", - Source: recipe.PluginRecipe{ - Name: "mysql", - }, - } - duration := 100 - recordCount := 2 - timingMetric := fmt.Sprintf( - "%s.runDuration,name=%s,success=%s,extractor=%s", - statsdPrefix, - recipe.Name, - "false", - recipe.Source.Name, - ) - incrementMetric := fmt.Sprintf( - "%s.run,name=%s,success=%s,extractor=%s", - statsdPrefix, - recipe.Name, - "false", - recipe.Source.Name, - ) - recordIncrementMetric := fmt.Sprintf( - "%s.runRecordCount,name=%s,success=%s,extractor=%s", - statsdPrefix, - recipe.Name, - "false", - recipe.Source.Name, - ) - - client := new(mockStatsdClient) - client.On("Timing", timingMetric, int64(duration)) - client.On("Increment", incrementMetric) - client.On("IncrementByValue", recordIncrementMetric, recordCount) - defer client.AssertExpectations(t) - - monitor := metrics.NewStatsdMonitor(client, statsdPrefix) - monitor.RecordRun(context.Background(), agent.Run{Recipe: recipe, DurationInMs: duration, RecordCount: recordCount, Success: false}) - }) - - t.Run("should set success field to true on success", func(t *testing.T) { - recipe := recipe.Recipe{ - Name: "test-recipe", - Source: recipe.PluginRecipe{ - Name: "bigquery", - }, - } - duration := 100 - recordCount := 2 - timingMetric := fmt.Sprintf( - "%s.runDuration,name=%s,success=%s,extractor=%s", - statsdPrefix, - recipe.Name, - "true", - recipe.Source.Name, - ) - incrementMetric := fmt.Sprintf( - "%s.run,name=%s,success=%s,extractor=%s", - statsdPrefix, - recipe.Name, - "true", - recipe.Source.Name, - ) - recordIncrementMetric := fmt.Sprintf( - "%s.runRecordCount,name=%s,success=%s,extractor=%s", - statsdPrefix, - recipe.Name, - "true", - recipe.Source.Name, - ) - - client := new(mockStatsdClient) - client.On("Timing", timingMetric, int64(duration)) - client.On("Increment", incrementMetric) - client.On("IncrementByValue", recordIncrementMetric, recordCount) - defer client.AssertExpectations(t) - - monitor := metrics.NewStatsdMonitor(client, statsdPrefix) - monitor.RecordRun(context.Background(), agent.Run{Recipe: recipe, DurationInMs: duration, RecordCount: recordCount, Success: true}) - }) -} - -func TestStatsdMonitorRecordPlugin(t *testing.T) { - statsdPrefix := "testprefix" - - t.Run("should create metrics with the correct name and value", func(t *testing.T) { - recipe := recipe.Recipe{ - Name: "test-recipe", - Source: recipe.PluginRecipe{ - Name: "mysql", - }, - Sinks: []recipe.PluginRecipe{ - {Name: "test-sink"}, - }, - } - incrementMetric := fmt.Sprintf( - "%s.%s,recipe_name=%s,name=%s,type=%s,success=%t", - statsdPrefix, - "runPlugin", - recipe.Name, - recipe.Sinks[0].Name, - "sink", - false, - ) - - client := new(mockStatsdClient) - client.On("Increment", incrementMetric) - defer client.AssertExpectations(t) - - monitor := metrics.NewStatsdMonitor(client, statsdPrefix) - monitor.RecordPlugin(context.Background(), agent.PluginInfo{ - RecipeName: recipe.Name, - PluginName: recipe.Sinks[0].Name, - PluginType: "sink", - Success: false, - }) - }) - - t.Run("should set success field to true on success", func(t *testing.T) { - recipe := recipe.Recipe{ - Name: "test-recipe", - Source: recipe.PluginRecipe{ - Name: "bigquery", - }, - Sinks: []recipe.PluginRecipe{ - {Name: "test-sink"}, - }, - } - incrementMetric := fmt.Sprintf( - "%s.%s,recipe_name=%s,name=%s,type=%s,success=%t", - statsdPrefix, - "runPlugin", - recipe.Name, - recipe.Sinks[0].Name, - "sink", - true, - ) - - client := new(mockStatsdClient) - client.On("Increment", incrementMetric) - defer client.AssertExpectations(t) - - monitor := metrics.NewStatsdMonitor(client, statsdPrefix) - monitor.RecordPlugin(context.Background(), - agent.PluginInfo{ - RecipeName: recipe.Name, - PluginName: recipe.Sinks[0].Name, - PluginType: "sink", - Success: true, - }) - }) -} - -func TestNewStatsClient(t *testing.T) { - t.Run("should return error for invalid address", func(t *testing.T) { - _, err := metrics.NewStatsdClient("127.0.0.1") - assert.Error(t, err) - }) - t.Run("should return error for invalid port", func(t *testing.T) { - _, err := metrics.NewStatsdClient("127.0.0.1:81A5") - assert.Error(t, err) - }) -} From cfc5da9c49bbaa13c4fa05b6392633ee3063b00f Mon Sep 17 00:00:00 2001 From: Sumeet Kumar Rai Date: Tue, 12 Dec 2023 16:35:44 +0530 Subject: [PATCH 2/7] fixed a unit test case --- config/testdata/invalid-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/testdata/invalid-config.yaml b/config/testdata/invalid-config.yaml index fb63b1b38..4baef74aa 100644 --- a/config/testdata/invalid-config.yaml +++ b/config/testdata/invalid-config.yaml @@ -1 +1 @@ -STATSD_ENABLED: not-a-boolean \ No newline at end of file +OTEL_ENABLED: not-a-boolean \ No newline at end of file From 065b3d9e88d04a912d073711c6fe5e32fa28025e Mon Sep 17 00:00:00 2001 From: Sumeet Kumar Rai Date: Tue, 12 Dec 2023 17:17:04 +0530 Subject: [PATCH 3/7] refactored in attempt to resolve lint error --- agent/agent.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index a30a34a76..127ec87d5 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -279,7 +279,6 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s } retryNotification := func(e error, d time.Duration) { - r.monitor.RecordSinkRetryCount(ctx, pluginInfo) r.logger.Warn( @@ -301,7 +300,6 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s ) pluginInfo.Success = err == nil - if err != nil { // once it reaches here, it means that the retry has been exhausted and still got error r.logger.Error("error running sink", "sink", sr.Name, "error", err.Error()) From b63e8b83afd4cf3d8fae82de30ffa48dda3aae2a Mon Sep 17 00:00:00 2001 From: Sumeet Kumar Rai Date: Wed, 13 Dec 2023 12:39:40 +0530 Subject: [PATCH 4/7] reverting to keep a list of monitors for extensibility --- agent/agent.go | 10 +++++++--- agent/agent_test.go | 40 ++++++++++++++++++++-------------------- agent/config.go | 2 +- cmd/run.go | 4 ++-- 4 files changed, 30 insertions(+), 26 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 127ec87d5..2282a4f23 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -27,7 +27,7 @@ type Agent struct { extractorFactory *registry.ExtractorFactory processorFactory *registry.ProcessorFactory sinkFactory *registry.SinkFactory - monitor Monitor + monitor []Monitor logger log.Logger retrier *retrier stopOnSinkError bool @@ -279,7 +279,9 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s } retryNotification := func(e error, d time.Duration) { - r.monitor.RecordSinkRetryCount(ctx, pluginInfo) + for _, mt := range r.monitor{ + mt.RecordSinkRetryCount(ctx, pluginInfo) + } r.logger.Warn( fmt.Sprintf("retrying sink in %s", d), @@ -323,7 +325,9 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s } func (r *Agent) logAndRecordMetrics(ctx context.Context, run Run) { - r.monitor.RecordRun(ctx, run) + for _, monitor := range r.monitor { + monitor.RecordRun(ctx, run) + } if run.Success { r.logger.Info("done running recipe", diff --git a/agent/agent_test.go b/agent/agent_test.go index 95ff12538..36d76af54 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -82,7 +82,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: monitor, + Monitor: []agent.Monitor{monitor}, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -113,7 +113,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: registry.NewProcessorFactory(), SinkFactory: sf, Logger: utils.Logger, - Monitor: monitor, + Monitor: []agent.Monitor{monitor}, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -146,7 +146,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: registry.NewSinkFactory(), Logger: utils.Logger, - Monitor: monitor, + Monitor: []agent.Monitor{monitor}, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -183,7 +183,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: monitor, + Monitor: []agent.Monitor{monitor}, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -222,7 +222,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: monitor, + Monitor: []agent.Monitor{monitor}, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -263,7 +263,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: monitor, + Monitor: []agent.Monitor{monitor}, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -305,7 +305,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: monitor, + Monitor: []agent.Monitor{monitor}, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -346,7 +346,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: monitor, + Monitor: []agent.Monitor{monitor}, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -394,7 +394,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: monitor, + Monitor: []agent.Monitor{monitor}, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -441,7 +441,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: monitor, + Monitor: []agent.Monitor{monitor}, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -491,7 +491,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: monitor, + Monitor: []agent.Monitor{monitor}, }) run := r.Run(ctx, validRecipe) assert.True(t, run.Success) @@ -542,7 +542,7 @@ func TestAgentRun(t *testing.T) { SinkFactory: sf, Logger: utils.Logger, StopOnSinkError: true, - Monitor: monitor, + Monitor: []agent.Monitor{monitor}, }) run := r.Run(ctx, validRecipe) @@ -594,7 +594,7 @@ func TestAgentRun(t *testing.T) { SinkFactory: sf, Logger: utils.Logger, StopOnSinkError: false, - Monitor: monitor, + Monitor: []agent.Monitor{monitor}, }) run := r.Run(ctx, validRecipe) @@ -657,7 +657,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: monitor, + Monitor: []agent.Monitor{monitor}, }) run := r.Run(ctx, validRecipe) assert.NoError(t, run.Error) @@ -713,7 +713,7 @@ func TestAgentRun(t *testing.T) { ExtractorFactory: ef, ProcessorFactory: pf, SinkFactory: sf, - Monitor: monitor, + Monitor: []agent.Monitor{monitor}, Logger: utils.Logger, TimerFn: timerFn, }) @@ -769,7 +769,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: monitor, + Monitor: []agent.Monitor{monitor}, MaxRetries: 2, // need to retry "at least" 2 times since Extractor returns RetryError twice RetryInitialInterval: 1 * time.Millisecond, // this is to override default retry interval to reduce test time }) @@ -824,7 +824,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: monitor, + Monitor: []agent.Monitor{monitor}, MaxRetries: 2, // need to retry "at least" 2 times since Sink returns RetryError twice RetryInitialInterval: 1 * time.Millisecond, // this is to override default retry interval to reduce test time }) @@ -884,7 +884,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: monitor, + Monitor: []agent.Monitor{monitor}, MaxRetries: 5, RetryInitialInterval: 10 * time.Second, }) @@ -1065,7 +1065,7 @@ func TestAgentRunMultiple(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: monitor, + Monitor: []agent.Monitor{monitor}, }) runs := r.RunMultiple(ctx, recipeList) @@ -1152,7 +1152,7 @@ func TestValidate(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: newMockMonitor(), + Monitor: []agent.Monitor{newMockMonitor()}, }) var expectedErrs []error diff --git a/agent/config.go b/agent/config.go index 1b6e8caf8..7b7d07eeb 100644 --- a/agent/config.go +++ b/agent/config.go @@ -11,7 +11,7 @@ type Config struct { ExtractorFactory *registry.ExtractorFactory ProcessorFactory *registry.ProcessorFactory SinkFactory *registry.SinkFactory - Monitor Monitor + Monitor []Monitor Logger log.Logger MaxRetries int RetryInitialInterval time.Duration diff --git a/cmd/run.go b/cmd/run.go index 479bcd133..3cb1acaf1 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -70,7 +70,7 @@ func RunCmd() *cobra.Command { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() - var mts agent.Monitor + var mts []agent.Monitor if cfg.OtelEnabled { doneOtlp, err := metrics.InitOtel(ctx, cfg, lg, Version) @@ -79,7 +79,7 @@ func RunCmd() *cobra.Command { } defer doneOtlp() - mts = metrics.NewOtelMonitor() + mts = append(mts, metrics.NewOtelMonitor()) } runner := agent.NewAgent(agent.Config{ From 4894aa74e22e940036db1057d5db4d899ff612a1 Mon Sep 17 00:00:00 2001 From: Sumeet Kumar Rai Date: Wed, 13 Dec 2023 12:40:51 +0530 Subject: [PATCH 5/7] formatting to keep older code as it was --- agent/agent.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/agent.go b/agent/agent.go index 2282a4f23..885ab9c34 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -279,7 +279,7 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s } retryNotification := func(e error, d time.Duration) { - for _, mt := range r.monitor{ + for _, mt := range r.monitor { mt.RecordSinkRetryCount(ctx, pluginInfo) } From 2cdb9c3f1deaf43b6ccb7442b3b3551e760026c9 Mon Sep 17 00:00:00 2001 From: Sumeet Kumar Rai Date: Wed, 13 Dec 2023 20:34:13 +0530 Subject: [PATCH 6/7] recordPlugin method mock method call not needed now --- agent/agent_test.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/agent/agent_test.go b/agent/agent_test.go index 36d76af54..42e0163db 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -483,7 +483,7 @@ func TestAgentRun(t *testing.T) { monitor := newMockMonitor() monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run")).Once() - monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")) + monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe() defer monitor.AssertExpectations(t) r := agent.NewAgent(agent.Config{ @@ -533,7 +533,7 @@ func TestAgentRun(t *testing.T) { monitor := newMockMonitor() monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run")).Once() - monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")) + monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe() defer monitor.AssertExpectations(t) r := agent.NewAgent(agent.Config{ @@ -585,7 +585,7 @@ func TestAgentRun(t *testing.T) { monitor := newMockMonitor() monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run")).Once() - monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")) + monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe() defer monitor.AssertExpectations(t) r := agent.NewAgent(agent.Config{ @@ -649,7 +649,7 @@ func TestAgentRun(t *testing.T) { monitor := newMockMonitor() monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run")).Once() - monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")) + monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe() defer monitor.AssertExpectations(t) r := agent.NewAgent(agent.Config{ @@ -706,7 +706,7 @@ func TestAgentRun(t *testing.T) { monitor := newMockMonitor() monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run")).Once() - monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")) + monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe() defer monitor.AssertExpectations(t) r := agent.NewAgent(agent.Config{ @@ -761,7 +761,7 @@ func TestAgentRun(t *testing.T) { monitor := newMockMonitor() monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run")).Once() - monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")) + monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe() defer monitor.AssertExpectations(t) r := agent.NewAgent(agent.Config{ @@ -815,7 +815,7 @@ func TestAgentRun(t *testing.T) { monitor := newMockMonitor() monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run")).Once() - monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")) + monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe() monitor.On("RecordSinkRetryCount", mockCtx, mock.AnythingOfType("agent.PluginInfo")) defer monitor.AssertExpectations(t) @@ -875,7 +875,7 @@ func TestAgentRun(t *testing.T) { monitor := newMockMonitor() monitor.On("RecordRun", utils.OfTypeContext(), mock.AnythingOfType("agent.Run")).Once() - monitor.On("RecordPlugin", utils.OfTypeContext(), mock.AnythingOfType("agent.PluginInfo")) + monitor.On("RecordPlugin", utils.OfTypeContext(), mock.AnythingOfType("agent.PluginInfo")).Maybe() monitor.On("RecordSinkRetryCount", utils.OfTypeContext(), mock.AnythingOfType("agent.PluginInfo")).Maybe() defer monitor.AssertExpectations(t) @@ -1057,7 +1057,7 @@ func TestAgentRunMultiple(t *testing.T) { monitor := newMockMonitor() monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run")) - monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")) + monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe() defer monitor.AssertExpectations(t) r := agent.NewAgent(agent.Config{ From d2ce4ac7c21a01a46fc49a13f0b6c4aeeb860784 Mon Sep 17 00:00:00 2001 From: Sumeet Kumar Rai Date: Thu, 14 Dec 2023 11:04:20 +0530 Subject: [PATCH 7/7] removed list as only otel is being supported now --- agent/agent.go | 10 +++++----- agent/agent_test.go | 40 ++++++++++++++++++++-------------------- agent/config.go | 2 +- cmd/run.go | 4 ++-- 4 files changed, 28 insertions(+), 28 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 885ab9c34..27f8f1d00 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -27,7 +27,7 @@ type Agent struct { extractorFactory *registry.ExtractorFactory processorFactory *registry.ProcessorFactory sinkFactory *registry.SinkFactory - monitor []Monitor + monitor Monitor logger log.Logger retrier *retrier stopOnSinkError bool @@ -279,8 +279,8 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s } retryNotification := func(e error, d time.Duration) { - for _, mt := range r.monitor { - mt.RecordSinkRetryCount(ctx, pluginInfo) + if r.monitor != nil { + r.monitor.RecordSinkRetryCount(ctx, pluginInfo) } r.logger.Warn( @@ -325,8 +325,8 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s } func (r *Agent) logAndRecordMetrics(ctx context.Context, run Run) { - for _, monitor := range r.monitor { - monitor.RecordRun(ctx, run) + if r.monitor != nil { + r.monitor.RecordRun(ctx, run) } if run.Success { diff --git a/agent/agent_test.go b/agent/agent_test.go index 42e0163db..fef9434ed 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -82,7 +82,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -113,7 +113,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: registry.NewProcessorFactory(), SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -146,7 +146,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: registry.NewSinkFactory(), Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -183,7 +183,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -222,7 +222,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -263,7 +263,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -305,7 +305,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -346,7 +346,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -394,7 +394,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -441,7 +441,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.False(t, run.Success) @@ -491,7 +491,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.True(t, run.Success) @@ -542,7 +542,7 @@ func TestAgentRun(t *testing.T) { SinkFactory: sf, Logger: utils.Logger, StopOnSinkError: true, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) @@ -594,7 +594,7 @@ func TestAgentRun(t *testing.T) { SinkFactory: sf, Logger: utils.Logger, StopOnSinkError: false, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) @@ -657,7 +657,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) run := r.Run(ctx, validRecipe) assert.NoError(t, run.Error) @@ -713,7 +713,7 @@ func TestAgentRun(t *testing.T) { ExtractorFactory: ef, ProcessorFactory: pf, SinkFactory: sf, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, Logger: utils.Logger, TimerFn: timerFn, }) @@ -769,7 +769,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, MaxRetries: 2, // need to retry "at least" 2 times since Extractor returns RetryError twice RetryInitialInterval: 1 * time.Millisecond, // this is to override default retry interval to reduce test time }) @@ -824,7 +824,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, MaxRetries: 2, // need to retry "at least" 2 times since Sink returns RetryError twice RetryInitialInterval: 1 * time.Millisecond, // this is to override default retry interval to reduce test time }) @@ -884,7 +884,7 @@ func TestAgentRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, MaxRetries: 5, RetryInitialInterval: 10 * time.Second, }) @@ -1065,7 +1065,7 @@ func TestAgentRunMultiple(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{monitor}, + Monitor: monitor, }) runs := r.RunMultiple(ctx, recipeList) @@ -1152,7 +1152,7 @@ func TestValidate(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Logger: utils.Logger, - Monitor: []agent.Monitor{newMockMonitor()}, + Monitor: newMockMonitor(), }) var expectedErrs []error diff --git a/agent/config.go b/agent/config.go index 7b7d07eeb..1b6e8caf8 100644 --- a/agent/config.go +++ b/agent/config.go @@ -11,7 +11,7 @@ type Config struct { ExtractorFactory *registry.ExtractorFactory ProcessorFactory *registry.ProcessorFactory SinkFactory *registry.SinkFactory - Monitor []Monitor + Monitor Monitor Logger log.Logger MaxRetries int RetryInitialInterval time.Duration diff --git a/cmd/run.go b/cmd/run.go index 3cb1acaf1..479bcd133 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -70,7 +70,7 @@ func RunCmd() *cobra.Command { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() - var mts []agent.Monitor + var mts agent.Monitor if cfg.OtelEnabled { doneOtlp, err := metrics.InitOtel(ctx, cfg, lg, Version) @@ -79,7 +79,7 @@ func RunCmd() *cobra.Command { } defer doneOtlp() - mts = append(mts, metrics.NewOtelMonitor()) + mts = metrics.NewOtelMonitor() } runner := agent.NewAgent(agent.Config{