Skip to content

Commit

Permalink
removed list as only otel is being supported now
Browse files Browse the repository at this point in the history
  • Loading branch information
Sumeet Kumar Rai authored and Sumeet Kumar Rai committed Dec 14, 2023
1 parent 42db9b3 commit c221dcd
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 28 deletions.
10 changes: 5 additions & 5 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type Agent struct {
extractorFactory *registry.ExtractorFactory
processorFactory *registry.ProcessorFactory
sinkFactory *registry.SinkFactory
monitor []Monitor
monitor Monitor
logger log.Logger
retrier *retrier
stopOnSinkError bool
Expand Down Expand Up @@ -279,8 +279,8 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s
}

retryNotification := func(e error, d time.Duration) {
for _, mt := range r.monitor {
mt.RecordSinkRetryCount(ctx, pluginInfo)
if r.monitor != nil {
r.monitor.RecordSinkRetryCount(ctx, pluginInfo)
}

r.logger.Warn(
Expand Down Expand Up @@ -325,8 +325,8 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s
}

func (r *Agent) logAndRecordMetrics(ctx context.Context, run Run) {
for _, monitor := range r.monitor {
monitor.RecordRun(ctx, run)
if r.monitor != nil {
r.monitor.RecordRun(ctx, run)
}

if run.Success {
Expand Down
40 changes: 20 additions & 20 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestAgentRun(t *testing.T) {
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
})
run := r.Run(ctx, validRecipe)
assert.False(t, run.Success)
Expand Down Expand Up @@ -113,7 +113,7 @@ func TestAgentRun(t *testing.T) {
ProcessorFactory: registry.NewProcessorFactory(),
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
})
run := r.Run(ctx, validRecipe)
assert.False(t, run.Success)
Expand Down Expand Up @@ -146,7 +146,7 @@ func TestAgentRun(t *testing.T) {
ProcessorFactory: pf,
SinkFactory: registry.NewSinkFactory(),
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
})
run := r.Run(ctx, validRecipe)
assert.False(t, run.Success)
Expand Down Expand Up @@ -183,7 +183,7 @@ func TestAgentRun(t *testing.T) {
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
})
run := r.Run(ctx, validRecipe)
assert.False(t, run.Success)
Expand Down Expand Up @@ -222,7 +222,7 @@ func TestAgentRun(t *testing.T) {
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
})
run := r.Run(ctx, validRecipe)
assert.False(t, run.Success)
Expand Down Expand Up @@ -263,7 +263,7 @@ func TestAgentRun(t *testing.T) {
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
})
run := r.Run(ctx, validRecipe)
assert.False(t, run.Success)
Expand Down Expand Up @@ -305,7 +305,7 @@ func TestAgentRun(t *testing.T) {
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
})
run := r.Run(ctx, validRecipe)
assert.False(t, run.Success)
Expand Down Expand Up @@ -346,7 +346,7 @@ func TestAgentRun(t *testing.T) {
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
})
run := r.Run(ctx, validRecipe)
assert.False(t, run.Success)
Expand Down Expand Up @@ -394,7 +394,7 @@ func TestAgentRun(t *testing.T) {
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
})
run := r.Run(ctx, validRecipe)
assert.False(t, run.Success)
Expand Down Expand Up @@ -441,7 +441,7 @@ func TestAgentRun(t *testing.T) {
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
})
run := r.Run(ctx, validRecipe)
assert.False(t, run.Success)
Expand Down Expand Up @@ -491,7 +491,7 @@ func TestAgentRun(t *testing.T) {
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
})
run := r.Run(ctx, validRecipe)
assert.True(t, run.Success)
Expand Down Expand Up @@ -542,7 +542,7 @@ func TestAgentRun(t *testing.T) {
SinkFactory: sf,
Logger: utils.Logger,
StopOnSinkError: true,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
})

run := r.Run(ctx, validRecipe)
Expand Down Expand Up @@ -594,7 +594,7 @@ func TestAgentRun(t *testing.T) {
SinkFactory: sf,
Logger: utils.Logger,
StopOnSinkError: false,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
})

run := r.Run(ctx, validRecipe)
Expand Down Expand Up @@ -657,7 +657,7 @@ func TestAgentRun(t *testing.T) {
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
})
run := r.Run(ctx, validRecipe)
assert.NoError(t, run.Error)
Expand Down Expand Up @@ -713,7 +713,7 @@ func TestAgentRun(t *testing.T) {
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
Logger: utils.Logger,
TimerFn: timerFn,
})
Expand Down Expand Up @@ -769,7 +769,7 @@ func TestAgentRun(t *testing.T) {
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
MaxRetries: 2, // need to retry "at least" 2 times since Extractor returns RetryError twice
RetryInitialInterval: 1 * time.Millisecond, // this is to override default retry interval to reduce test time
})
Expand Down Expand Up @@ -824,7 +824,7 @@ func TestAgentRun(t *testing.T) {
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
MaxRetries: 2, // need to retry "at least" 2 times since Sink returns RetryError twice
RetryInitialInterval: 1 * time.Millisecond, // this is to override default retry interval to reduce test time
})
Expand Down Expand Up @@ -884,7 +884,7 @@ func TestAgentRun(t *testing.T) {
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
MaxRetries: 5,
RetryInitialInterval: 10 * time.Second,
})
Expand Down Expand Up @@ -1065,7 +1065,7 @@ func TestAgentRunMultiple(t *testing.T) {
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
})
runs := r.RunMultiple(ctx, recipeList)

Expand Down Expand Up @@ -1152,7 +1152,7 @@ func TestValidate(t *testing.T) {
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{newMockMonitor()},
Monitor: newMockMonitor(),
})

var expectedErrs []error
Expand Down
2 changes: 1 addition & 1 deletion agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type Config struct {
ExtractorFactory *registry.ExtractorFactory
ProcessorFactory *registry.ProcessorFactory
SinkFactory *registry.SinkFactory
Monitor []Monitor
Monitor Monitor
Logger log.Logger
MaxRetries int
RetryInitialInterval time.Duration
Expand Down
4 changes: 2 additions & 2 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func RunCmd() *cobra.Command {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()

var mts []agent.Monitor
var mts agent.Monitor

if cfg.OtelEnabled {
doneOtlp, err := metrics.InitOtel(ctx, cfg, lg, Version)
Expand All @@ -79,7 +79,7 @@ func RunCmd() *cobra.Command {
}
defer doneOtlp()

mts = append(mts, metrics.NewOtelMonitor())
mts = metrics.NewOtelMonitor()
}

runner := agent.NewAgent(agent.Config{
Expand Down

0 comments on commit c221dcd

Please sign in to comment.