Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[filebeat] Elasticsearch state storage for httpjson and cel inputs #41446

Open
wants to merge 32 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
55c72d3
[filebeat] Elasticsearch state storage for httpjson input
aleksmaus Oct 24, 2024
1bf288d
Fixup tests
aleksmaus Oct 24, 2024
e003053
Linter
aleksmaus Oct 25, 2024
dfce978
Enabled elastisearch storage support for cel input and some cleanup
aleksmaus Oct 28, 2024
e2e25fa
Remove the "hack" with .Each implementation
aleksmaus Oct 30, 2024
953355b
Merge branch 'main' into poc/es_state_store
aleksmaus Oct 31, 2024
c1fc2a8
Adjust for the latest main es client signature change
aleksmaus Oct 31, 2024
c9b0256
Make check happy
aleksmaus Oct 31, 2024
21d451d
Fixed missing interface method on test mock store
aleksmaus Oct 31, 2024
ffb9364
Add error check in ES store Each
aleksmaus Oct 31, 2024
10d212f
Parameterize the supported input types through environment variables
aleksmaus Nov 5, 2024
24000d7
Delete the dev tests file
aleksmaus Nov 5, 2024
a0d019e
Address code review comments
aleksmaus Nov 7, 2024
34003d4
Add refresh=wait_for for data consistency
aleksmaus Nov 8, 2024
36e9983
Revert "Add refresh=wait_for for data consistency"
aleksmaus Nov 18, 2024
aee4112
Cleanup
aleksmaus Nov 18, 2024
76b11a4
Added updated_at field
aleksmaus Nov 18, 2024
1fe1c0e
Merge branch 'main' into poc/es_state_store
aleksmaus Nov 19, 2024
3c5bab1
Merge branch 'main' into poc/es_state_store
aleksmaus Nov 25, 2024
4ac1bd0
Eliminate AGENTLESS_ELASTICSEARCH_STATE_STORE_ENABLED setting. The El…
aleksmaus Nov 25, 2024
535560e
Merge branch 'main' into poc/es_state_store
aleksmaus Dec 2, 2024
4940097
Merge branch 'main' into poc/es_state_store
aleksmaus Dec 6, 2024
e73a1b7
Added logging per code review request
aleksmaus Dec 9, 2024
6c39fd0
Some cleanup and notifier utz
aleksmaus Dec 10, 2024
716758f
Add some integration testing coverage
aleksmaus Dec 10, 2024
5ec7ed4
Merge branch 'main' into poc/es_state_store
aleksmaus Dec 11, 2024
904d8e8
Merge branch 'main' into poc/es_state_store
aleksmaus Jan 7, 2025
02d684a
Merge branch 'main' into poc/es_state_store
aleksmaus Jan 7, 2025
5a9cdb5
Fix the Filebeat test harness for agentbeat tests
aleksmaus Jan 7, 2025
fd4cd00
Fixed the imports formatting for the notifier_test
aleksmaus Jan 7, 2025
8dd3938
Merge branch 'main' into poc/es_state_store
aleksmaus Jan 8, 2025
c52bd65
Merge branch 'main' into poc/es_state_store
aleksmaus Jan 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 43 additions & 3 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package beater

import (
"context"
"flag"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
Expand All @@ -39,6 +41,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
Expand Down Expand Up @@ -79,7 +82,7 @@ type Filebeat struct {
type PluginFactory func(beat.Info, *logp.Logger, StateStore) []v2.Plugin

type StateStore interface {
Access() (*statestore.Store, error)
Access(typ string) (*statestore.Store, error)
CleanupInterval() time.Duration
}

Expand Down Expand Up @@ -281,13 +284,48 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
return err
}

stateStore, err := openStateStore(b.Info, logp.NewLogger("filebeat"), config.Registry)
// Use context, like normal people do, hooking up to the beat.done channel
ctx, cn := context.WithCancel(context.Background())
go func() {
<-fb.done
cn()
}()

stateStore, err := openStateStore(ctx, b.Info, logp.NewLogger("filebeat"), config.Registry)
if err != nil {
logp.Err("Failed to open state store: %+v", err)
return err
}
defer stateStore.Close()

// If notifier is set, configure the listener for output configuration
// The notifier passes the elasticsearch output configuration down to the Elasticsearch backed state storage
// in order to allow it fully configure
if stateStore.notifier != nil {
b.OutputConfigReloader = reload.ReloadableFunc(func(r *reload.ConfigWithMeta) error {
outCfg := conf.Namespace{}
if err := r.Config.Unpack(&outCfg); err != nil || outCfg.Name() != "elasticsearch" {
logp.Err("Failed to unpack the output config: %v", err)
return nil
}

// TODO: REMOVE THIS HACK BEFORE MERGE. LEAVING FOR TESTING FOR DRAFT
// Injecting the ApiKey that has enough permissions to write to the index
// TODO: need to figure out how add permissions for the state index
cmacknz marked this conversation as resolved.
Show resolved Hide resolved
// agentless-state-<input id>, for example httpjson-okta.system-028ecf4b-babe-44c6-939e-9e3096af6959
apiKey := os.Getenv("AGENTLESS_ELASTICSEARCH_APIKEY")
cmacknz marked this conversation as resolved.
Show resolved Hide resolved
if apiKey != "" {
err := outCfg.Config().SetString("api_key", -1, apiKey)
if err != nil {
return fmt.Errorf("failed to overwrite api_key: %w", err)
}
}
orestisfl marked this conversation as resolved.
Show resolved Hide resolved

stateStore.notifier.NotifyConfigUpdate(outCfg.Config())
return nil
})
}

err = processLogInputTakeOver(stateStore, config)
if err != nil {
logp.Err("Failed to attempt filestream state take over: %+v", err)
Expand Down Expand Up @@ -341,6 +379,8 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
defer func() {
_ = inputTaskGroup.Stop()
}()

// Store needs to be fully configured at this point
if err := v2InputLoader.Init(&inputTaskGroup); err != nil {
logp.Err("Failed to initialize the input managers: %v", err)
return err
Expand Down Expand Up @@ -509,7 +549,7 @@ func processLogInputTakeOver(stateStore StateStore, config *cfg.Config) error {
return nil
}

store, err := stateStore.Access()
store, err := stateStore.Access("")
if err != nil {
return fmt.Errorf("Failed to access state when attempting take over: %w", err)
}
Expand Down
49 changes: 43 additions & 6 deletions filebeat/beater/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,43 +18,80 @@
package beater

import (
"context"
"time"

"github.com/elastic/beats/v7/filebeat/config"
"github.com/elastic/beats/v7/filebeat/features"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/backend"
"github.com/elastic/beats/v7/libbeat/statestore/backend/es"

Check failure on line 29 in filebeat/beater/store.go

View workflow job for this annotation

GitHub Actions / lint (windows)

could not import github.com/elastic/beats/v7/libbeat/statestore/backend/es (-: # github.com/elastic/beats/v7/libbeat/statestore/backend/es
"github.com/elastic/beats/v7/libbeat/statestore/backend/memlog"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/paths"
)

type filebeatStore struct {
registry *statestore.Registry
esRegistry *statestore.Registry
orestisfl marked this conversation as resolved.
Show resolved Hide resolved
storeName string
cleanInterval time.Duration

// Notifies the Elasticsearch store about configuration change
// which is available only after the beat runtime manager connects to the Agent
// and receives the output configuration
notifier *es.Notifier
}

func openStateStore(info beat.Info, logger *logp.Logger, cfg config.Registry) (*filebeatStore, error) {
memlog, err := memlog.New(logger, memlog.Settings{
func openStateStore(ctx context.Context, info beat.Info, logger *logp.Logger, cfg config.Registry) (*filebeatStore, error) {
var (
reg backend.Registry
err error

esreg *es.Registry
notifier *es.Notifier

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Optional) pointers to potentially nil objects are common places where bugs could be introduced. How about an interface with two implementations, one no-op and one the actual notifier.

)

if features.IsElasticsearchStateStoreEnabled() {
notifier = es.NewNotifier()
esreg, err = es.New(ctx, logger, notifier)
if err != nil {
return nil, err
}
}

reg, err = memlog.New(logger, memlog.Settings{
Root: paths.Resolve(paths.Data, cfg.Path),
FileMode: cfg.Permissions,
})
if err != nil {
return nil, err
}

return &filebeatStore{
registry: statestore.NewRegistry(memlog),
store := &filebeatStore{
registry: statestore.NewRegistry(reg),
storeName: info.Beat,
cleanInterval: cfg.CleanInterval,
}, nil
notifier: notifier,
}

if esreg != nil {
store.esRegistry = statestore.NewRegistry(esreg)
}

return store, nil
}

func (s *filebeatStore) Close() {
s.registry.Close()
}

func (s *filebeatStore) Access() (*statestore.Store, error) {
// Access returns the storage registry depending on the type. Default is the file store.
func (s *filebeatStore) Access(typ string) (*statestore.Store, error) {
if features.IsElasticsearchStateStoreEnabledForInput(typ) && s.esRegistry != nil {
return s.esRegistry.Get(s.storeName)
}
return s.registry.Get(s.storeName)
}

Expand Down
49 changes: 49 additions & 0 deletions filebeat/features/features.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package features

import "os"

type void struct{}

// List of input types Elasticsearch state store is enabled for
var esTypesEnabled = map[string]void{
"httpjson": {},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be configuration instead of in the code, maybe another env var?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure can do. Something like this?
AGENTLESS_ELASTICSEARCH_STATE_STORE_INPUT_TYPES=httpjson,cel

"cel": {},
}

var isESEnabled bool

func init() {
isESEnabled = (os.Getenv("AGENTLESS_ELASTICSEARCH_STATE_STORE_ENABLED") != "")
}

// IsElasticsearchStateStoreEnabled returns true if feature is enabled for agentless
func IsElasticsearchStateStoreEnabled() bool {
return isESEnabled
}

// IsElasticsearchStateStoreEnabledForInput returns true if the provided input type uses Elasticsearch for state storage if the Elasticsearch state store feature is enabled
func IsElasticsearchStateStoreEnabledForInput(inputType string) bool {
if IsElasticsearchStateStoreEnabled() {
if _, ok := esTypesEnabled[inputType]; ok {
return true
}
}
return false
}
8 changes: 4 additions & 4 deletions filebeat/input/filestream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@
}

func (e *inputTestingEnvironment) requireRegistryEntryCount(expectedCount int) {
inputStore, _ := e.stateStore.Access()
inputStore, _ := e.stateStore.Access("")

actual := 0
err := inputStore.Each(func(_ string, _ statestore.ValueDecoder) (bool, error) {
Expand Down Expand Up @@ -331,7 +331,7 @@
e.t.Fatalf("cannot stat file when cheking for offset: %+v", err)
}

inputStore, _ := e.stateStore.Access()
inputStore, _ := e.stateStore.Access("")
id := getIDFromPath(filepath, inputID, fi)

var entry registryEntry
Expand All @@ -352,7 +352,7 @@
}

func (e *inputTestingEnvironment) getRegistryState(key string) (registryEntry, error) {
inputStore, _ := e.stateStore.Access()
inputStore, _ := e.stateStore.Access("")

var entry registryEntry
err := inputStore.Get(key, &entry)
Expand Down Expand Up @@ -418,9 +418,9 @@
for _, e := range e.pipeline.GetAllEvents() {
flat := e.Fields.Flatten()
pathi, _ := flat.GetValue("log.file.path")
path := pathi.(string)

Check failure on line 421 in filebeat/input/filestream/environment_test.go

View workflow job for this annotation

GitHub Actions / lint (windows)

Error return value is not checked (errcheck)
msgi, _ := flat.GetValue("message")
msg := msgi.(string)

Check failure on line 423 in filebeat/input/filestream/environment_test.go

View workflow job for this annotation

GitHub Actions / lint (windows)

Error return value is not checked (errcheck)
logLines[path] = append(logLines[path], msg)
}

Expand Down Expand Up @@ -462,7 +462,7 @@
if len(events) == checkedEventCount {
e.t.Fatalf("not enough expected elements")
}
message := evt.Fields["message"].(string)

Check failure on line 465 in filebeat/input/filestream/environment_test.go

View workflow job for this annotation

GitHub Actions / lint (windows)

Error return value is not checked (errcheck)
if message == events[checkedEventCount] {
foundEvents[checkedEventCount] = true
}
Expand Down Expand Up @@ -538,7 +538,7 @@
s.registry.Close()
}

func (s *testInputStore) Access() (*statestore.Store, error) {
func (s *testInputStore) Access(_ string) (*statestore.Store, error) {
return s.registry.Get("filebeat")
}

Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/filestream/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (s *testStore) Close() {
s.registry.Close()
}

func (s *testStore) Access() (*statestore.Store, error) {
func (s *testStore) Access(_ string) (*statestore.Store, error) {
return s.registry.Get("filestream-benchmark")
}

Expand Down
55 changes: 35 additions & 20 deletions filebeat/input/filestream/internal/input-logfile/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/elastic/go-concert/unison"

"github.com/elastic/beats/v7/filebeat/features"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/statestore"
conf "github.com/elastic/elastic-agent-libs/config"
Expand Down Expand Up @@ -64,7 +65,7 @@ type InputManager struct {
// that will be used to collect events from each source.
Configure func(cfg *conf.C) (Prospector, Harvester, error)

initOnce sync.Once
initedFull bool
initErr error
store *store
ackUpdater *updateWriter
Expand All @@ -88,34 +89,48 @@ const globalInputID = ".global"

// StateStore interface and configurations used to give the Manager access to the persistent store.
type StateStore interface {
Access() (*statestore.Store, error)
Access(typ string) (*statestore.Store, error)
CleanupInterval() time.Duration
}

func (cim *InputManager) init() error {
cim.initOnce.Do(func() {
// init initializes the state store
// This function is called from:
// 1. InputManager::Init on beat start
// 2. InputManager::Create when the input is initialized with configuration
// When Elasticsearch state storage is used for the input it will be only fully configured on InputManager::Create,
// so skip reading the state from the storage on InputManager::Init in this case
func (cim *InputManager) init(inputID string) error {
if cim.initedFull {
return nil
}

log := cim.Logger.With("input_type", cim.Type)
log := cim.Logger.With("input_type", cim.Type)

var store *store
store, cim.initErr = openStore(log, cim.StateStore, cim.Type)
if cim.initErr != nil {
return
}
var store *store

cim.store = store
cim.ackCH = newUpdateChan()
cim.ackUpdater = newUpdateWriter(store, cim.ackCH)
cim.ids = map[string]struct{}{}
})
useES := features.IsElasticsearchStateStoreEnabledForInput(cim.Type)
fullInit := !useES || (inputID != "" && useES)
store, cim.initErr = openStore(log, cim.StateStore, cim.Type, inputID, fullInit)
if cim.initErr != nil {
return cim.initErr
}

cim.store = store
cim.ackCH = newUpdateChan()
cim.ackUpdater = newUpdateWriter(store, cim.ackCH)
cim.ids = map[string]struct{}{}

if fullInit {
cim.initedFull = true
}

return cim.initErr
}

// Init starts background processes for deleting old entries from the
// persistent store if mode is ModeRun.
func (cim *InputManager) Init(group unison.Group) error {
if err := cim.init(); err != nil {
if err := cim.init(""); err != nil {
return err
}

Expand Down Expand Up @@ -150,10 +165,6 @@ func (cim *InputManager) shutdown() {
// Create builds a new v2.Input using the provided Configure function.
// The Input will run a go-routine per source that has been configured.
func (cim *InputManager) Create(config *conf.C) (v2.Input, error) {
if err := cim.init(); err != nil {
return nil, err
}

settings := struct {
ID string `config:"id"`
CleanInactive time.Duration `config:"clean_inactive"`
Expand All @@ -163,6 +174,10 @@ func (cim *InputManager) Create(config *conf.C) (v2.Input, error) {
return nil, err
}

if err := cim.init(settings.ID); err != nil {
return nil, err
}

if settings.ID == "" {
cim.Logger.Error("filestream input ID without ID might lead to data" +
" duplication, please add an ID and restart Filebeat")
Expand Down
Loading
Loading