-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[filebeat] Elasticsearch state storage for httpjson and cel inputs #41446
base: main
Are you sure you want to change the base?
Conversation
This pull request does not have a backport label.
To fixup this pull request, you need to add the backport labels for the needed
|
|
@belimawr @cmacknz (or whoever wants/have time to be involved)
|
@leehinman I'd appreciate a review here to make sure this can co-exist with Beats receivers in agent since that would be the long term way we plan to run agentless inputs. |
filebeat/features/features.go
Outdated
|
||
// List of input types Elasticsearch state store is enabled for | ||
var esTypesEnabled = map[string]void{ | ||
"httpjson": {}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be configuration instead of in the code, maybe another env var?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure can do. Something like this?
AGENTLESS_ELASTICSEARCH_STATE_STORE_INPUT_TYPES=httpjson,cel
func New(ctx context.Context, log *logp.Logger, notifier *Notifier) (*Registry, error) { | ||
r := &Registry{ | ||
ctx: ctx, | ||
log: log, | ||
notifier: notifier, | ||
} | ||
|
||
return r, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
error
return can be removed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will leave for now as is, as it's consistent with memlog.New that returns the registry and in case if further changes that could return error from the constructor
…asticsearch state store is enabled if any of the inputs are with AGENTLESS_ELASTICSEARCH_STATE_STORE_INPUT_TYPES.
Eliminated
It will be event shorter when the APIKEY permissions are properly set through the policy and we can remove that workaround with APIKEY injection. |
Can I get some reviews on this PR? What is acceptable? What is missing? Assuming that this "hack" https://github.com/elastic/beats/pull/41446/files#diff-97b65fad069be1072219d7ae6a1e8f64d287e8cb6c1f4e424c801a22200db104R318 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The biggest thing missing now is tests. There needs to be an integration test proving this actually works, ideally against ES running in a container.
I would also like to see some tests that drive the Beat configuration as if it were managed by Elastic agent. There are a couple that start Filebeat with management enabled but using a gRPC server that is under the test control instead of the real agent, they also run with ES running already. https://github.com/cmacknz/beats/blob/d4fc5cdeff6d2bc382f0341fe9118157f6fcb849/x-pack/filebeat/tests/integration/managerV2_test.go#L56-L70
|
||
type OnConfigUpdateFunc func(c *conf.C) | ||
|
||
type Notifier struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added tests awhile back. @cmacknz could you please take a look again?
func (n *Notifier) Unsubscribe(id int) { | ||
n.mx.Lock() | ||
defer n.mx.Unlock() | ||
delete(n.listeners, id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does this decrement n.counter?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
counter here is just the id (handle) that is returned to the user on Subscribe in order to allow to that "handle" later for Unsubscribe.
} | ||
s.cliErr = nil | ||
|
||
cli, err := eslegclient.NewConnectedClient(ctx, c, s.name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this mean there is a client and connection per input?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Per instance of the storage, yes. Do we want to make it singleton/global?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mainly because of this piece:
beats/libbeat/esleg/eslegclient/connection.go
Line 253 in 09e540c
// NewConnectedClient returns a non-thread-safe connection. Make sure for each goroutine you initialize a new connection. |
// NewConnectedClient returns a non-thread-safe connection. Make sure for each goroutine you initialize a new connection.
func NewConnectedClient(ctx context.Context, cfg *cfg.C, beatname string) (*Connection, error) {
|
||
// Sets the store ID when the full input configuration is acquired | ||
// This is needed in order to support Elasticsearch state store naming convention based on the input ID | ||
SetID(id string) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment implies this means there will be a store per input, would this make more sense being a required argument when the store is constructed? You can't have an interface that includes a constructor, but that is where it feels like it belongs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Trying to workaround the current code interfaces that are already in place here. That's why had to add additional function to be able to set the ID when the input configuration is actually received and the ID is available.
"query": map[string]any{ | ||
"match_all": map[string]any{}, | ||
}, | ||
"size": 1000, // TODO: we might have to do scroll if there are more than 1000 keys |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How likely are we to have more than 1000 keys? The inputs wouldn't have had any kind of restriction on the number of keys before, so hard to tell if this won't matter for a while or is critically important to fix.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From the input types that I'm testing with so far is very very unlikely. For example for httpjson input, it's usually one cursor key/value per stream.
This pull request is now in conflicts. Could you fix it? 🙏
|
This pull request is now in conflicts. Could you fix it? 🙏
|
Proposed commit message
[filebeat] Elasticsearch state storage for httpjson input
This is a POC for Elasticsearch as State Store Backend for Security Integrations for Agentless solution.
The scope of this change was narrowed down to supporting only
httpjson
inputs in order to support Okta integration for the initial release. All the other integrations inputs still use the file storage as before.This is a short term solution for the state storage for k8s environment.
This is the first cut and the details can change depending on the feedback.
Current feature currently could be enabled
AGENTLESS_ELASTICSEARCH_STATE_STORE_ENABLED
, to be decided how this would be configurable in k8s.This change currently contains the hacky approach to the
AGENTLESS_ELASTICSEARCH_APIKEY
overwrite. This allows to the user to provide the ApiKey with elevated permissions that are required in order to be able to create/write/read the state index per input. THIS IS FOR DEVELOPMENT/TESTING ONLY. REMOVE BEFORE THE MERGE.The existing code relied on the inputs state storage to be fully configurable before the main beat managers runs. The change delays the configuration of
httpjson
input to the time when the actual configuration is received from the Agent.There is an assumption that the index template for the state storage indices is already in place before the storage is used
Example of the state storage index content for Okta integration:
The naming convention for all state store is
agentless-state-<input id>
, since the expectation for agentless we would have only one agent per policy and the agents are ephemeral.Currently in order to run the agent with Elasticsearch state storage a couple of environment variables would be required:
where the ApiKey in the
DEPENDENCIES / TODOS:
Checklist
CHANGELOG.next.asciidoc
orCHANGELOG-developer.next.asciidoc
.Disruptive User Impact
The change should have no impact, and without the feature enabled the filebeat should work as before using the file system storage for the state.