Behavioral Analytics event ingest tuning #95405
Conversation
@@ -4,7 +4,7 @@
     "actions": {
       "rollover": {
         "max_age": "30d",
-        "max_size": "3GB"
+        "max_primary_shard_size": "50gb"
ℹ️ Using the same settings as those used in the default log policy.
@@ -19,6 +19,10 @@
       }
     }
   },
+  "cold": {
ℹ️ Adding a cold tier phase, allowing the policy to leverage cold nodes if they exist in the cluster.
ℹ️ Changing most wildcard fields to keyword since we do not really need the wildcard type for them.
ℹ️ Adding a "cool down" mechanism to the ingest.
When the bulk processor start rejecting event we will drop all new events during a configurable delay (10 s. by default). Once the delay is expired we try to collect event again.
When entering/leaving the cool down mode the following warning are logged:
Bulk processor is full. Start dropping events for %s.
Trying to accept events again.
Having too many of this message in the log means a capacity issue that can be solved by:
- increasing heap size of your existing nodes
- adding mode nodes to the cluster
In the future we will add metrics on this feature and we will probably add a persistent queue to avoid dropping events.
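A minimal, standalone sketch of the scheduled cool-down idea, using a plain ScheduledExecutorService and an AtomicBoolean instead of the Elasticsearch thread pool (class and method names are illustrative only, not the PR's actual code):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CoolDownGateSketch {
    private final AtomicBoolean dropEvents = new AtomicBoolean(false);
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final long coolDownSeconds;

    CoolDownGateSketch(long coolDownSeconds) {
        this.coolDownSeconds = coolDownSeconds;
    }

    // Returns true while events can be accepted, false during the cool down period.
    boolean accepting() {
        return dropEvents.get() == false;
    }

    // Called when the bulk processor rejects an event: start dropping and schedule re-opening.
    void onRejection() {
        if (dropEvents.compareAndSet(false, true)) {
            System.out.printf("Bulk processor is full. Start dropping events for %ds.%n", coolDownSeconds);
            scheduler.schedule(this::onCoolDownExpired, coolDownSeconds, TimeUnit.SECONDS);
        }
    }

    private void onCoolDownExpired() {
        System.out.println("Trying to accept events again.");
        dropEvents.set(false);
    }
}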
ℹ️ Renaming the BulkProcessorConfig class into AnalyticsEventIngestConfig and changing the following:
- Set the default bulk size to 500 (previously 1000).
- Added a max_bytes_in_flight setting to the bulk processor. The default is 5% of the heap (half of the allowed indexing pressure), so we should not put the cluster at risk. Reducing this value increases the probability of dropped events, while increasing it may cause errors in the indexing process.
- Added the ability to configure the cool down delay (defaults to 10s).
A sketch of how such settings can be declared is shown after this list.
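A rough sketch of how such node settings are typically declared with Elasticsearch's Setting API; the setting keys and class name below are placeholders for illustration, not the actual names used by AnalyticsEventIngestConfig:

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;

public class AnalyticsIngestSettingsSketch {
    // Placeholder key: default bulk size of 500 events (previously 1000).
    public static final Setting<Integer> MAX_EVENTS_PER_BULK = Setting.intSetting(
        "xpack.analytics.ingest.bulk_processor.max_events_per_bulk",
        500,
        Setting.Property.NodeScope
    );

    // Placeholder key: 5% of the heap, half of the default indexing pressure limit.
    public static final Setting<ByteSizeValue> MAX_BYTES_IN_FLIGHT = Setting.memorySizeSetting(
        "xpack.analytics.ingest.bulk_processor.max_bytes_in_flight",
        "5%",
        Setting.Property.NodeScope
    );

    // Placeholder key: cool down delay, 10 seconds by default.
    public static final Setting<TimeValue> COOL_DOWN_DELAY = Setting.timeSetting(
        "xpack.analytics.ingest.bulk_processor.cool_down_delay",
        TimeValue.timeValueSeconds(10),
        Setting.Property.NodeScope
    );
}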
Hi @afoucret, I've created a changelog YAML for you.
@pgomulka it would be great if you could have a look at this PR. Happy to hear your thoughts.
CI errors are not related to this code.
LGTM. One alternative to explore regarding cooldown period enforcement 💯
listener.onFailure(new ElasticsearchStatusException("Unable to add the event: too many requests.", RestStatus.TOO_MANY_REQUESTS));
if (dropEvent.compareAndSet(false, true)) {
    logger.warn(Strings.format("Bulk processor is full. Start dropping events for %s.", coolDownDelay.getStringRep()));
    client.threadPool().schedule(this::onCoolDownExpired, coolDownDelay, ThreadPool.Names.SAME);
Do we need thread scheduling for cooling down? Wouldn't it be equally effective to register the time at which we can accept events again, and compare the current time with it?
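A minimal standalone sketch of that clock-based alternative (using System.nanoTime() rather than the Elasticsearch thread pool clock; names are illustrative only):

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class ClockBasedCoolDownSketch {
    // Nanosecond timestamp before which events are dropped; starts in the past, so events are accepted.
    private final AtomicLong acceptAfterNanos = new AtomicLong(System.nanoTime());
    private final long coolDownNanos = TimeUnit.SECONDS.toNanos(10);

    // True if the cool down period has elapsed and events can be accepted again.
    boolean canAccept() {
        return System.nanoTime() - acceptAfterNanos.get() >= 0;
    }

    // Called on rejection: push the "accept again" time into the future.
    void onRejection() {
        acceptAfterNanos.set(System.nanoTime() + coolDownNanos);
    }
}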
I like the idea, but I am still struggling to figure out how to log a message notifying that we are accepting events again...
Any suggestions?
PS: I think it is important, at least until we have better monitoring for event ingest in 8.9.
Have you considered using BulkProcessor's backpressure mechanism and retry features?
I think it should give us the same "drop event" functionality.
Perhaps just using bulkProcessor.addWithBackpressure(eventIndexRequest, () -> false) would be enough?
@pgomulka I wanted to use it at first, but the implementation adds a wait time of 500ms. In the context of handling a lot of parallel HTTP connections this blocking could be a problem. I could do it asynchronously, but I would lose the ability to respond with a 429 error and to have this monitored.
Instead, I chose not to try adding new events until the cool down delay has expired.
More specifically, this cool down mechanism helped me deal with OOM errors that I was able to see on my laptop:
One instance of org.elasticsearch.index.translog.Translog loaded by jdk.internal.loader.ClassLoaders$AppClassLoader @ 0x7e00008b0 occupies 99 020 568 (39,68 %) bytes. The memory is accumulated in one instance of java.util.HashMap$Node[], loaded by <system class loader>, which occupies 98 565 752 (39,50 %) bytes.
Also, I did new Rally tests with all the optimizations I made since I introduced it, and this cool down mechanism does not seem to be required anymore.
I can confirm this is not needed anymore. I reverted the code to a previous version which just logs the start / end of event dropping:
public void emitEvent(
    final PostAnalyticsEventAction.Request request,
    final ActionListener<PostAnalyticsEventAction.Response> listener
) {
    try {
        // Parse the event from the request and queue it into the bulk processor.
        AnalyticsEvent event = eventFactory.fromRequest(request);
        IndexRequest eventIndexRequest = createIndexRequest(event);
        bulkProcessor.add(eventIndexRequest);

        // The bulk processor accepted an event again after a period of rejections.
        if (dropEvent.compareAndSet(true, false)) {
            logger.warn("Bulk processor has been flushed. Accepting new events again.");
        }

        if (request.isDebug()) {
            listener.onResponse(new PostAnalyticsEventAction.DebugResponse(true, event));
        } else {
            listener.onResponse(PostAnalyticsEventAction.Response.ACCEPTED);
        }
    } catch (IOException e) {
        listener.onFailure(new ElasticsearchException("Unable to parse the event.", e));
    } catch (EsRejectedExecutionException e) {
        // Bulk processor is full: answer with a 429 and log the start of event dropping once.
        listener.onFailure(
            new ElasticsearchStatusException("Unable to add the event: too many requests.", RestStatus.TOO_MANY_REQUESTS)
        );
        if (dropEvent.compareAndSet(false, true)) {
            logger.warn("Bulk processor is full. Start dropping events.");
        }
    }
}
I think that we can also try to modify the BulkProcessor to either make the 500ms wait time configurable or contain the mechanism you want, so that all future use cases like yours will benefit from it.
@pgomulka The current code is very stable in my benchmarks under heavy load. I think we will keep this version and I will see what could be improved after the feature freeze. Are you comfortable with this?
… session are always into the same shard.
@pgomulka The current code is very stable in my benchmarks under heavy load. I think we will keep this version and I will see what could be improved after the feature freeze. Are you comfortable with this?
I think that is a sensible thing to do. I think the code is good and will work, especially since we don't expect this to be heavily used straight away.
@@ -75,8 +84,13 @@ public void emitEvent(
        } catch (IOException e) {
            listener.onFailure(new ElasticsearchException("Unable to parse the event.", e));
        } catch (EsRejectedExecutionException e) {
-           listener.onFailure(new ElasticsearchException("Unable to add the event to the bulk."));
-           logger.error("Unable to add the event to the bulk.", e);
+           listener.onFailure(
So this listener will only be used when add throws an exception. How about exceptions during the actual batch execution?
Perhaps you would like to use the listener from the BulkProcessor itself, in addition to what you have now?
@pgomulka The point of this listener is to answer the HTTP request. We are not waiting for the bulk execution. Instead, we return as soon as the BulkProcessor has accepted / rejected the request.
We log bulk errors in the listener defined here (see the sketch below). In the future we would like to implement a dead-letter box index where we would put all failed events. This way we would be able to surface these errors in the UI.
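A simplified sketch of a bulk listener that logs batch-level failures, assuming the classic org.elasticsearch.action.bulk.BulkProcessor.Listener interface (the plugin's actual listener, class names, and BulkProcessor flavor may differ):

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;

public class LoggingBulkListenerSketch implements BulkProcessor.Listener {
    private static final Logger logger = LogManager.getLogger(LoggingBulkListenerSketch.class);

    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        // Nothing to do before the bulk executes in this sketch.
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        // Item-level failures: the bulk executed but some events were not indexed.
        if (response.hasFailures()) {
            logger.error("Bulk execution {} had item failures: {}", executionId, response.buildFailureMessage());
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        // Batch-level failure: the whole bulk request failed.
        logger.error("Bulk execution " + executionId + " failed for " + request.numberOfActions() + " events.", failure);
    }
}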
LGTM, the explanations make sense to me.
It would be great if we could follow up with at least a PoC to verify whether we could achieve the same performance with BulkProcessor (using addWithBackpressure or adding this dropping feature to BulkProcessor). I understand this might take time because it would have to go through performance tracks.
Improvement for the event ingest.
See code comments for details.
This was battle tested using the following Rally track and esbench.
With default settings I was able to reach the end of the 26 million event ingestion:
Before introducing the cool down mechanism, we got OOM errors (~2000 ev/sec on my laptop).
According to heap analysis, it was related to the memory usage: