Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add metricbeat receiver #41738

Merged
merged 5 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions x-pack/filebeat/fbreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ func createDefaultConfig() component.Config {
}

func createReceiver(_ context.Context, set receiver.Settings, baseCfg component.Config, consumer consumer.Logs) (receiver.Logs, error) {
cfg := baseCfg.(*Config)
cfg, ok := baseCfg.(*Config)
if !ok {
return nil, fmt.Errorf("could not convert otel config to filebeat config")
}

settings := cmd.FilebeatSettings(Name)
globalProcs, err := processors.NewPluginConfigFromList(defaultProcessors())
Expand Down Expand Up @@ -59,7 +62,7 @@ func createReceiver(_ context.Context, set receiver.Settings, baseCfg component.
return nil, fmt.Errorf("error getting %s creator:%w", Name, err)
}

return &filebeatReceiver{beat: &b.Beat, beater: fbBeater}, nil
return &filebeatReceiver{beat: &b.Beat, beater: fbBeater, logger: set.Logger}, nil
}

func defaultProcessors() []mapstr.M {
Expand Down
9 changes: 8 additions & 1 deletion x-pack/filebeat/fbreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,28 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"

"go.opentelemetry.io/collector/component"
"go.uber.org/zap"
)

type filebeatReceiver struct {
beat *beat.Beat
beater beat.Beater
logger *zap.Logger
}

func (fb *filebeatReceiver) Start(ctx context.Context, host component.Host) error {
go func() {
_ = fb.beater.Run(fb.beat)
fb.logger.Info("starting filebeat receiver")
err := fb.beater.Run(fb.beat)
if err != nil {
fb.logger.Error("filebeat receiver run error", zap.Error(err))
}
}()
return nil
}

func (fb *filebeatReceiver) Shutdown(ctx context.Context) error {
fb.logger.Info("stopping filebeat receiver")
fb.beater.Stop()
return nil
}
25 changes: 25 additions & 0 deletions x-pack/metricbeat/mbreceiver/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package mbreceiver

import "fmt"

// Config is config settings for metricbeat receiver. The structure of
// which is the same as the metricbeat.yml configuration file.
type Config struct {
Beatconfig map[string]interface{} `mapstructure:",remain"`
}

// Validate checks if the configuration in valid
func (cfg *Config) Validate() error {
if len(cfg.Beatconfig) == 0 {
return fmt.Errorf("Configuration is required")
}
_, prs := cfg.Beatconfig["metricbeat"]
if !prs {
return fmt.Errorf("Configuration key 'metricbeat' is required")
}
return nil
}
44 changes: 44 additions & 0 deletions x-pack/metricbeat/mbreceiver/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package mbreceiver

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestValidate(t *testing.T) {
tests := map[string]struct {
c *Config
hasError bool
errorString string
}{
"Empty config": {
c: &Config{Beatconfig: map[string]interface{}{}},
hasError: true,
errorString: "Configuration is required",
},
"No metricbeat section": {
c: &Config{Beatconfig: map[string]interface{}{"other": map[string]interface{}{}}},
hasError: true,
errorString: "Configuration key 'metricbeat' is required",
},
"Valid config": {
c: &Config{Beatconfig: map[string]interface{}{"metricbeat": map[string]interface{}{}}},
hasError: false,
errorString: "",
},
}
for name, tc := range tests {
err := tc.c.Validate()
if tc.hasError {
assert.NotNilf(t, err, "%s failed, should have had error", name)
assert.Equalf(t, err.Error(), tc.errorString, "%s failed, error not equal", name)
} else {
assert.Nilf(t, err, "%s failed, should not have error", name)
}
}
}
61 changes: 61 additions & 0 deletions x-pack/metricbeat/mbreceiver/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package mbreceiver

import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"

"github.com/elastic/beats/v7/libbeat/cmd/instance"
"github.com/elastic/beats/v7/metricbeat/beater"
"github.com/elastic/beats/v7/metricbeat/cmd"
)

const (
Name = "metricbeatreceiver"
)

func createDefaultConfig() component.Config {
return &Config{}
}

func createReceiver(_ context.Context, set receiver.Settings, baseCfg component.Config, consumer consumer.Logs) (receiver.Logs, error) {
cfg, ok := baseCfg.(*Config)
if !ok {
return nil, fmt.Errorf("could not convert otel config to metricbeat config")
}
settings := cmd.MetricbeatSettings(Name)
settings.ElasticLicensed = true

b, err := instance.NewBeatReceiver(settings, cfg.Beatconfig, consumer, set.Logger.Core())
if err != nil {
return nil, fmt.Errorf("error creating %s: %w", Name, err)
}

beatCreator := beater.DefaultCreator()

beatConfig, err := b.BeatConfig()
if err != nil {
return nil, fmt.Errorf("error getting beat config: %w", err)
}

mbBeater, err := beatCreator(&b.Beat, beatConfig)
if err != nil {
return nil, fmt.Errorf("error getting %s creator:%w", Name, err)
}

return &metricbeatReceiver{beat: &b.Beat, beater: mbBeater, logger: set.Logger}, nil
}

func NewFactory() receiver.Factory {
return receiver.NewFactory(
component.MustNewType(Name),
createDefaultConfig,
receiver.WithLogs(createReceiver, component.StabilityLevelAlpha))
}
37 changes: 37 additions & 0 deletions x-pack/metricbeat/mbreceiver/receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package mbreceiver

import (
"context"

"github.com/elastic/beats/v7/libbeat/beat"

"go.opentelemetry.io/collector/component"
"go.uber.org/zap"
)

type metricbeatReceiver struct {
beat *beat.Beat
beater beat.Beater
logger *zap.Logger
}

func (mb *metricbeatReceiver) Start(ctx context.Context, host component.Host) error {
go func() {
mb.logger.Info("starting metricbeat receiver")
err := mb.beater.Run(mb.beat)
if err != nil {
mb.logger.Error("metricbeat receiver run error", zap.Error(err))
}
}()
return nil
}

func (mb *metricbeatReceiver) Shutdown(ctx context.Context) error {
mb.logger.Info("stopping metricbeat receiver")
mb.beater.Stop()
return nil
}
92 changes: 92 additions & 0 deletions x-pack/metricbeat/mbreceiver/receiver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package mbreceiver

import (
"bytes"
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

func TestNewReceiver(t *testing.T) {
config := Config{
Beatconfig: map[string]interface{}{
"metricbeat": map[string]interface{}{
"modules": []map[string]interface{}{
{
"module": "system",
"enabled": true,
"period": "1s",
"processes": []string{".*"},
"metricsets": []string{"cpu"},
},
},
},
"output": map[string]interface{}{
"otelconsumer": map[string]interface{}{},
},
"logging": map[string]interface{}{
"level": "debug",
"selectors": []string{
"*",
},
},
"path.home": t.TempDir(),
},
}

var zapLogs bytes.Buffer
core := zapcore.NewCore(
zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()),
zapcore.AddSync(&zapLogs),
zapcore.DebugLevel)

receiverSettings := receiver.Settings{}
receiverSettings.Logger = zap.New(core)

var countLogs int
logConsumer, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
countLogs = countLogs + ld.LogRecordCount()
return nil
})
require.NoError(t, err, "Error creating log consumer")

r, err := createReceiver(context.Background(), receiverSettings, &config, logConsumer)
require.NoErrorf(t, err, "Error creating receiver. Logs:\n %s", zapLogs.String())
err = r.Start(context.Background(), nil)
require.NoError(t, err, "Error starting metricbeatreceiver")

ch := make(chan bool, 1)
timer := time.NewTimer(120 * time.Second)
defer timer.Stop()
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

for tick := ticker.C; ; {
select {
case <-timer.C:
t.Fatalf("consumed logs didn't increase\nCount: %d\nLogs: %s\n", countLogs, zapLogs.String())
case <-tick:
tick = nil
go func() { ch <- countLogs > 0 }()
case v := <-ch:
if v {
goto found
}
tick = ticker.C
}
}
found:
leehinman marked this conversation as resolved.
Show resolved Hide resolved
err = r.Shutdown(context.Background())
require.NoError(t, err, "Error shutting down metricbeatreceiver")
}
Loading