Track in/out pages in exchange #120867
Conversation
Pinging @elastic/es-analytical-engine (Team:Analytics)
```diff
-    exchangeSource,
-    exchangeSink
+    exchangeSource::createExchangeSource,
+    () -> exchangeSink.createExchangeSink(() -> {})
```
It seems all prod implementations are passing a no-op runnable. Could you please point me to the actual usage? Or is it going to be added in a later PR?
Its usage will be added later in "retry node requests on shard-level failures" work.
For the use case cited, this looks fine, as far as I can tell, as I'm not exactly sure how you'll use this in the next PR. But are there other use cases for this callback? If so, should we use something other than Runnable, in favor of something that returns metadata about the block? For example, would it be useful to know things like: 1) which node or cluster the block came from; 2) whether this is the last block and no other blocks will be coming. Those could be useful for metadata accounting, especially around CCS work or the incremental-results work that is planned for later this year.
@quux00 For my use case, the callback updates a shared atomic boolean, while others might need a page count. Therefore, I chose to pass a Runnable to allow callers to manage their metadata externally.
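To illustrate the design choice above, here is a minimal sketch (the class and method names are hypothetical, not the actual Elasticsearch API): a sink wrapper takes a plain `Runnable` callback and invokes it on every page added, so one caller can flip a shared `AtomicBoolean` while another counts pages, without the sink knowing about either.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical sketch of a Runnable-based page callback. The real
// ExchangeSink/RemoteSink interfaces differ; this only shows why a
// Runnable keeps metadata tracking external to the sink.
final class TrackingSink<P> {
    private final Consumer<P> delegate;   // where pages actually go
    private final Runnable onPageAdded;   // caller-supplied tracking hook

    TrackingSink(Consumer<P> delegate, Runnable onPageAdded) {
        this.delegate = delegate;
        this.onPageAdded = onPageAdded;
    }

    void addPage(P page) {
        onPageAdded.run();        // notify the tracker before delegating
        delegate.accept(page);
    }
}
```

A caller tracking "have any pages arrived?" would pass `() -> pagesReceived.set(true)`; a caller needing a count would pass `counter::incrementAndGet` instead. Neither concern leaks into the sink itself.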
Thanks everyone!
This is a spin-off of the "retry node requests on shard-level failures" work. Currently, a driver can execute against multiple shards simultaneously. If the execution fails and no pages are added to the sink, we can retry the failed shards on another node. In another scenario, if no pages are fetched or added to the exchange source and the entire data node request fails, we can also retry the entire request. This change adds callbacks to RemoteSink and ExchangeSink, allowing for tracking of in/out pages.
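The retry condition in the description can be sketched as follows (a minimal illustration with hypothetical names, not the actual PR code): the page callback marks that at least one page has been delivered, and the retry path checks that flag, since retrying after pages were already emitted would duplicate results.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the retry decision described above: a failed
// shard or data-node request may be retried only if no pages were added
// to the sink / fetched into the exchange source before the failure.
final class RetryTracker {
    private final AtomicBoolean pagesSeen = new AtomicBoolean(false);

    // The Runnable handed to the sink as its page callback.
    Runnable pageCallback() {
        return () -> pagesSeen.set(true);
    }

    // Safe to retry only while no pages have been observed.
    boolean canRetry() {
        return pagesSeen.get() == false;
    }
}
```

On failure the driver would consult `canRetry()`: if it returns true, the failed shards (or the whole data-node request) can be re-dispatched to another node; otherwise the failure must be surfaced.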
💚 Backport successful