Provide a way to terminate an aggregation group early in the aggregation processor #5240

Open
joelmarty opened this issue Dec 3, 2024 · 4 comments
Labels: enhancement, good first issue, help wanted

Comments

@joelmarty
Contributor

Is your feature request related to a problem? Please describe.
I have a pipeline that ingests logs into OpenSearch, and I use the aggregate processor with the put_all action.
At the moment, the only way a group can close with this action is to wait for the group_duration to expire.

That means that all records that have been merged, but whose group is not yet closed, still live in memory on the Data Prepper nodes.
For high-throughput or high-latency pipelines (or both), where you have to specify a large group_duration, a lot of memory is wasted on already-merged records that are just waiting for the group to expire.

There should be a way to terminate a group and flush the result to the next processor or sink if you know you do not need to wait.

Describe the solution you'd like
The solution could work in two steps:

  1. Configure an expression to evaluate when the aggregation action is executed. This expression would evaluate to a tag added to the metadata of the aggregation group for instance.
  2. Attach an expression to evaluate or a list of tags that must be present on the aggregation group when it is mutated, that if true marks the group for finalization and flushes it immediately.

The pipeline configuration could look like:

my_pipeline:
  source:
    file:
      path: somefile.log
  processor:
    - aggregate:
         action:
           put_all: {}
         identification_keys:
           - common_key
         tag_on_aggregate: /log_type
         terminate_when: hasTags("type_1", "type_2")
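
A minimal sketch of the two-step idea above, written as a plain Python model rather than Data Prepper code. The `Group` class, the `aggregate` function, and the way `tag_on_aggregate` and `terminate_when` are interpreted are all assumptions made for illustration; neither option exists in the processor today.

```python
from dataclasses import dataclass, field


@dataclass
class Group:
    """Hypothetical stand-in for an aggregation group."""
    state: dict = field(default_factory=dict)
    tags: set = field(default_factory=set)


def aggregate(groups, event, id_key, tag_key, required_tags):
    """Merge one event into its group.

    Returns the merged state if the group terminates, otherwise None.
    """
    key = event[id_key]
    group = groups.setdefault(key, Group())
    group.state.update(event)            # put_all-style merge: later values win
    if tag_key in event:
        group.tags.add(event[tag_key])   # step 1: tag_on_aggregate
    if required_tags <= group.tags:      # step 2: terminate_when hasTags(...)
        del groups[key]                  # flush without waiting for group_duration
        return group.state
    return None


groups = {}
required = {"type_1", "type_2"}
first = aggregate(groups, {"common_key": "a", "log_type": "type_1", "x": 1},
                  "common_key", "log_type", required)
second = aggregate(groups, {"common_key": "a", "log_type": "type_2", "y": 2},
                   "common_key", "log_type", required)
```

In this model the first event leaves the group open (`first` is `None`), and the second event supplies the missing tag, so the merged record is returned immediately and the group is removed from memory.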

Describe alternatives you've considered (Optional)

Another option: add a close_when expression, common to all AggregateAction implementations, that guards the closure of the group.
This expression can be evaluated when AggregateGroupManager.getGroupsToConclude() is called, so the changes in AggregateProcessor are minimal.
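
As a rough sketch of this alternative, the model below checks a `close_when` predicate in the same place where expired groups are collected. The `GroupManager` class, its method names, and the injectable `clock` are hypothetical and only mirror the shape of `AggregateGroupManager.getGroupsToConclude()`; the predicate is assumed to receive the group's merged state.

```python
import time


class GroupManager:
    """Hypothetical model of a group manager; not the Data Prepper class."""

    def __init__(self, group_duration, close_when=None, clock=time.monotonic):
        self.group_duration = group_duration  # seconds
        self.close_when = close_when          # predicate over the merged state
        self.clock = clock
        self.groups = {}                      # key -> (start_time, state)

    def put_all(self, key, event):
        _, state = self.groups.setdefault(key, (self.clock(), {}))
        state.update(event)

    def get_groups_to_conclude(self):
        now = self.clock()
        done = []
        for key, (start, state) in list(self.groups.items()):
            expired = now - start >= self.group_duration
            closed = self.close_when is not None and self.close_when(state)
            if expired or closed:
                done.append((key, state))
                del self.groups[key]
        return done


now = [0.0]  # fake clock so the example is deterministic
manager = GroupManager(180, close_when=lambda s: s.get("status") == "CLOSED",
                       clock=lambda: now[0])
manager.put_all("a", {"status": "OPEN"})
manager.put_all("b", {"status": "CLOSED"})
early = manager.get_groups_to_conclude()   # only "b" satisfies close_when
now[0] = 200.0
late = manager.get_groups_to_conclude()    # "a" has now expired
```

Because the extra check lives entirely inside the conclude step, the calling code does not need to know why a group was concluded.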

Additional context

The aggregate processor first checks for groups to conclude and then processes the current batch. This logic should be reversed so the events are flushed immediately after the aggregation.
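
The effect of the ordering can be illustrated with a small model. Both functions below are hypothetical simplifications of the processor's per-batch logic, and `is_done` stands in for whatever early-termination condition is eventually adopted.

```python
def conclude_ready(groups, is_done):
    """Remove and return every group whose state satisfies is_done."""
    ready = [key for key, state in groups.items() if is_done(state)]
    return [groups.pop(key) for key in ready]


def merge(groups, batch):
    """put_all-style merge of each event into the group for its id."""
    for event in batch:
        groups.setdefault(event["id"], {}).update(event)


def conclude_then_process(groups, batch, is_done):
    out = conclude_ready(groups, is_done)   # order described in the issue
    merge(groups, batch)
    return out


def process_then_conclude(groups, batch, is_done):
    merge(groups, batch)                    # reversed order
    return conclude_ready(groups, is_done)


def is_done(state):
    return state.get("type") == "CLOSED"


batch = [{"id": "a", "type": "OPEN"}, {"id": "a", "type": "CLOSED"}]

current_groups = {}
current = conclude_then_process(current_groups, batch, is_done)

reversed_groups = {}
reversed_order = process_then_conclude(reversed_groups, batch, is_done)
```

With the current order, the group completed by this batch stays in `current_groups` until the next batch arrives; with the reversed order it is emitted in the same call.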

@dlvenable
Member

@joelmarty, we originally planned for a conclude_when field in #699.

conclude_when: "/event/type == 'CLOSED'"

The idea here is that when this condition occurs, the group closes. Is this what you would like?

Would you be able to create a PR with this feature? We can help if you need some pointers for getting started.

@dlvenable added the enhancement, good first issue, and help wanted labels and removed the untriaged label on Dec 10, 2024
@joelmarty
Contributor Author

Yes, it is exactly what I would need. I am not sure I would be able to work on this feature, as it would be a significant investment of time, but I can try.

I am interested in getting opinions on the options I provided if I were to work on this.

@dlvenable
Member

@joelmarty ,

Regarding the options, let me see if I understand them:

> Configure an expression to evaluate when the aggregation action is executed. This expression would evaluate to a tag added to the metadata of the aggregation group for instance.

I think you want to use the existing aggregate_when option. This allows you to configure whether any given input event will be aggregated.

I'm not sure what you mean by the tagging. Can you elaborate on this?

> Attach an expression to evaluate or a list of tags that must be present on the aggregation group when it is mutated, that if true marks the group for finalization and flushes it immediately.

I think you may be looking for something slightly different than the original conclude_when condition. That proposal was to conclude the group when the input event matches a condition. Perhaps you are looking to conclude the group when the entire group has met a condition.

Concluding when a group has met a condition may be harder, because expressions only work on events, and the event is not created until the group is completed.

Regarding working on these, implementing a conclude_when on the input event would not be very difficult. We already have other examples of code that use expressions, including aggregate_when. So you'd just use that existing code and then conclude the group.
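
A minimal sketch of what a conclude_when evaluated on the input event could look like, as a plain Python model. The `process` function and its parameters are hypothetical; in Data Prepper the two conditions would be expression strings rather than Python callables, and what happens to events rejected by aggregate_when is not modeled here.

```python
def process(groups, event, id_key, aggregate_when, conclude_when):
    """Return the finished group state if this event concludes its group, else None."""
    if not aggregate_when(event):
        return None                   # event is skipped by this sketch
    key = event[id_key]
    state = groups.setdefault(key, {})
    state.update(event)               # put_all-style merge
    if conclude_when(event):          # evaluated on the input event only
        return groups.pop(key)        # conclude the group right away
    return None


def always(event):
    return True


def is_closed(event):
    # Python stand-in for the expression "/event/type == 'CLOSED'"
    return event.get("event", {}).get("type") == "CLOSED"


groups = {}
r1 = process(groups, {"id": "a", "event": {"type": "OPEN"}, "x": 1},
             "id", always, is_closed)
r2 = process(groups, {"id": "a", "event": {"type": "CLOSED"}},
             "id", always, is_closed)
```

Here the condition only ever sees the incoming event, never the accumulated group, which is the distinction drawn above between the original conclude_when proposal and a group-level condition.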

@joelmarty
Contributor Author

> I think you want to use the existing aggregate_when option. This allows you to configure whether any given input event will be aggregated.
> I'm not sure what you mean by the tagging. Can you elaborate on this?

No, one of the solutions I was proposing is to add both a "tag generator" expression that adds a tag to the AggregateGroup (a new field would be required, but would work much like EventMetadata tags), and an expression, or array field, that guards the closure of the group: when all tags defined in that field are present in the aggregation group, then it is marked for closure.

The other solution I was proposing is to use a more generic conclude_when expression that works very much like the proposal in #699.
