diff --git a/x-pack/filebeat/fbreceiver/factory.go b/x-pack/filebeat/fbreceiver/factory.go index 8bc6f872df75..a08c4d575cbb 100644 --- a/x-pack/filebeat/fbreceiver/factory.go +++ b/x-pack/filebeat/fbreceiver/factory.go @@ -31,7 +31,10 @@ func createDefaultConfig() component.Config { } func createReceiver(_ context.Context, set receiver.Settings, baseCfg component.Config, consumer consumer.Logs) (receiver.Logs, error) { - cfg := baseCfg.(*Config) + cfg, ok := baseCfg.(*Config) + if !ok { + return nil, fmt.Errorf("could not convert otel config to filebeat config") + } settings := cmd.FilebeatSettings(Name) globalProcs, err := processors.NewPluginConfigFromList(defaultProcessors()) @@ -59,7 +62,7 @@ func createReceiver(_ context.Context, set receiver.Settings, baseCfg component. return nil, fmt.Errorf("error getting %s creator:%w", Name, err) } - return &filebeatReceiver{beat: &b.Beat, beater: fbBeater}, nil + return &filebeatReceiver{beat: &b.Beat, beater: fbBeater, logger: set.Logger}, nil } func defaultProcessors() []mapstr.M { diff --git a/x-pack/filebeat/fbreceiver/receiver.go b/x-pack/filebeat/fbreceiver/receiver.go index bb39dc57077b..eeba511dd1e7 100644 --- a/x-pack/filebeat/fbreceiver/receiver.go +++ b/x-pack/filebeat/fbreceiver/receiver.go @@ -10,21 +10,28 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "go.opentelemetry.io/collector/component" + "go.uber.org/zap" ) type filebeatReceiver struct { beat *beat.Beat beater beat.Beater + logger *zap.Logger } func (fb *filebeatReceiver) Start(ctx context.Context, host component.Host) error { go func() { - _ = fb.beater.Run(fb.beat) + fb.logger.Info("starting filebeat receiver") + err := fb.beater.Run(fb.beat) + if err != nil { + fb.logger.Error("filebeat receiver run error", zap.Error(err)) + } }() return nil } func (fb *filebeatReceiver) Shutdown(ctx context.Context) error { + fb.logger.Info("stopping filebeat receiver") fb.beater.Stop() return nil } diff --git a/x-pack/metricbeat/mbreceiver/config.go b/x-pack/metricbeat/mbreceiver/config.go new file mode 100644 index 000000000000..e8fdbbce2e31 --- /dev/null +++ b/x-pack/metricbeat/mbreceiver/config.go @@ -0,0 +1,25 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package mbreceiver + +import "fmt" + +// Config is config settings for metricbeat receiver. The structure of +// which is the same as the metricbeat.yml configuration file. +type Config struct { + Beatconfig map[string]interface{} `mapstructure:",remain"` +} + +// Validate checks if the configuration in valid +func (cfg *Config) Validate() error { + if len(cfg.Beatconfig) == 0 { + return fmt.Errorf("Configuration is required") + } + _, prs := cfg.Beatconfig["metricbeat"] + if !prs { + return fmt.Errorf("Configuration key 'metricbeat' is required") + } + return nil +} diff --git a/x-pack/metricbeat/mbreceiver/config_test.go b/x-pack/metricbeat/mbreceiver/config_test.go new file mode 100644 index 000000000000..25bcc101568f --- /dev/null +++ b/x-pack/metricbeat/mbreceiver/config_test.go @@ -0,0 +1,44 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package mbreceiver + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestValidate(t *testing.T) { + tests := map[string]struct { + c *Config + hasError bool + errorString string + }{ + "Empty config": { + c: &Config{Beatconfig: map[string]interface{}{}}, + hasError: true, + errorString: "Configuration is required", + }, + "No metricbeat section": { + c: &Config{Beatconfig: map[string]interface{}{"other": map[string]interface{}{}}}, + hasError: true, + errorString: "Configuration key 'metricbeat' is required", + }, + "Valid config": { + c: &Config{Beatconfig: map[string]interface{}{"metricbeat": map[string]interface{}{}}}, + hasError: false, + errorString: "", + }, + } + for name, tc := range tests { + err := tc.c.Validate() + if tc.hasError { + assert.NotNilf(t, err, "%s failed, should have had error", name) + assert.Equalf(t, err.Error(), tc.errorString, "%s failed, error not equal", name) + } else { + assert.Nilf(t, err, "%s failed, should not have error", name) + } + } +} diff --git a/x-pack/metricbeat/mbreceiver/factory.go b/x-pack/metricbeat/mbreceiver/factory.go new file mode 100644 index 000000000000..62ea8f5c9b5f --- /dev/null +++ b/x-pack/metricbeat/mbreceiver/factory.go @@ -0,0 +1,61 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package mbreceiver + +import ( + "context" + "fmt" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/receiver" + + "github.com/elastic/beats/v7/libbeat/cmd/instance" + "github.com/elastic/beats/v7/metricbeat/beater" + "github.com/elastic/beats/v7/metricbeat/cmd" +) + +const ( + Name = "metricbeatreceiver" +) + +func createDefaultConfig() component.Config { + return &Config{} +} + +func createReceiver(_ context.Context, set receiver.Settings, baseCfg component.Config, consumer consumer.Logs) (receiver.Logs, error) { + cfg, ok := baseCfg.(*Config) + if !ok { + return nil, fmt.Errorf("could not convert otel config to metricbeat config") + } + settings := cmd.MetricbeatSettings(Name) + settings.ElasticLicensed = true + + b, err := instance.NewBeatReceiver(settings, cfg.Beatconfig, consumer, set.Logger.Core()) + if err != nil { + return nil, fmt.Errorf("error creating %s: %w", Name, err) + } + + beatCreator := beater.DefaultCreator() + + beatConfig, err := b.BeatConfig() + if err != nil { + return nil, fmt.Errorf("error getting beat config: %w", err) + } + + mbBeater, err := beatCreator(&b.Beat, beatConfig) + if err != nil { + return nil, fmt.Errorf("error getting %s creator:%w", Name, err) + } + + return &metricbeatReceiver{beat: &b.Beat, beater: mbBeater, logger: set.Logger}, nil +} + +func NewFactory() receiver.Factory { + return receiver.NewFactory( + component.MustNewType(Name), + createDefaultConfig, + receiver.WithLogs(createReceiver, component.StabilityLevelAlpha)) +} diff --git a/x-pack/metricbeat/mbreceiver/receiver.go b/x-pack/metricbeat/mbreceiver/receiver.go new file mode 100644 index 000000000000..848ca9347aab --- /dev/null +++ b/x-pack/metricbeat/mbreceiver/receiver.go @@ -0,0 +1,37 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package mbreceiver + +import ( + "context" + + "github.com/elastic/beats/v7/libbeat/beat" + + "go.opentelemetry.io/collector/component" + "go.uber.org/zap" +) + +type metricbeatReceiver struct { + beat *beat.Beat + beater beat.Beater + logger *zap.Logger +} + +func (mb *metricbeatReceiver) Start(ctx context.Context, host component.Host) error { + go func() { + mb.logger.Info("starting metricbeat receiver") + err := mb.beater.Run(mb.beat) + if err != nil { + mb.logger.Error("metricbeat receiver run error", zap.Error(err)) + } + }() + return nil +} + +func (mb *metricbeatReceiver) Shutdown(ctx context.Context) error { + mb.logger.Info("stopping metricbeat receiver") + mb.beater.Stop() + return nil +} diff --git a/x-pack/metricbeat/mbreceiver/receiver_test.go b/x-pack/metricbeat/mbreceiver/receiver_test.go new file mode 100644 index 000000000000..b7dac8c488e8 --- /dev/null +++ b/x-pack/metricbeat/mbreceiver/receiver_test.go @@ -0,0 +1,92 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package mbreceiver + +import ( + "bytes" + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/receiver" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +func TestNewReceiver(t *testing.T) { + config := Config{ + Beatconfig: map[string]interface{}{ + "metricbeat": map[string]interface{}{ + "modules": []map[string]interface{}{ + { + "module": "system", + "enabled": true, + "period": "1s", + "processes": []string{".*"}, + "metricsets": []string{"cpu"}, + }, + }, + }, + "output": map[string]interface{}{ + "otelconsumer": map[string]interface{}{}, + }, + "logging": map[string]interface{}{ + "level": "debug", + "selectors": []string{ + "*", + }, + }, + "path.home": t.TempDir(), + }, + } + + var zapLogs bytes.Buffer + core := zapcore.NewCore( + zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()), + zapcore.AddSync(&zapLogs), + zapcore.DebugLevel) + + receiverSettings := receiver.Settings{} + receiverSettings.Logger = zap.New(core) + + var countLogs int + logConsumer, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { + countLogs = countLogs + ld.LogRecordCount() + return nil + }) + require.NoError(t, err, "Error creating log consumer") + + r, err := createReceiver(context.Background(), receiverSettings, &config, logConsumer) + require.NoErrorf(t, err, "Error creating receiver. Logs:\n %s", zapLogs.String()) + err = r.Start(context.Background(), nil) + require.NoError(t, err, "Error starting metricbeatreceiver") + + ch := make(chan bool, 1) + timer := time.NewTimer(120 * time.Second) + defer timer.Stop() + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for tick := ticker.C; ; { + select { + case <-timer.C: + t.Fatalf("consumed logs didn't increase\nCount: %d\nLogs: %s\n", countLogs, zapLogs.String()) + case <-tick: + tick = nil + go func() { ch <- countLogs > 0 }() + case v := <-ch: + if v { + goto found + } + tick = ticker.C + } + } +found: + err = r.Shutdown(context.Background()) + require.NoError(t, err, "Error shutting down metricbeatreceiver") +}