KAFKA-18569: New consumer close may wait on unneeded FindCoordinator #18590

Merged
merged 2 commits into from
Jan 29, 2025

Conversation

frankvicky
Contributor

@frankvicky frankvicky commented Jan 17, 2025

JIRA: KAFKA-18569
Please refer to ticket for further details.

In short, closing the new consumer may currently wait for an unsent FindCoordinator request to go out, even after the commit/leaveGroup stages of close are done.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@github-actions github-actions bot added triage PRs from the community consumer clients small Small PRs labels Jan 17, 2025
@frankvicky
Contributor Author

Hi @lianetm @kirktrue @chia7712
Please take a look when you have a free cycle.
Many thanks 🙇🏼

@kirktrue kirktrue added KIP-848 The Next Generation of the Consumer Rebalance Protocol ctr Consumer Threading Refactor (KIP-848) labels Jan 17, 2025
@kirktrue
Collaborator

Thanks for the PR @frankvicky!

I was curious if you had a chance to look into using the pollOnClose() approach that was suggested in the Jira. If that approach works, we wouldn't need an extra ApplicationEvent.

Thanks!

@github-actions github-actions bot added the core Kafka Broker label Jan 18, 2025
@frankvicky
Contributor Author

Hi @kirktrue
Thanks for the review.
You're right, it's easier than having a new event. 😺
Previously, I thought that following CommitRequestManager's approach would give us a unified close style.

@frankvicky frankvicky force-pushed the KAKFA-18569 branch 3 times, most recently from f3def68 to ba51a9d Compare January 19, 2025 23:46
Collaborator

@kirktrue kirktrue left a comment

Thanks for the refresh on the PR @frankvicky! This looks much more succinct.

I'm still unsure what the behavior is for this sequence of events:

  1. The coordinator is marked as unknown
  2. CoordinatorRequestManager.poll() is called and creates a new FindCoordinatorRequest
  3. The NetworkClientDelegate sends the request to the broker
  4. Consumer.close() is called with a timeout of 30 seconds
  5. ConsumerNetworkThread.sendUnsentRequests() is called

In step 5, won't it continue to loop for ~30 seconds because the find request created in step 2 (and sent in step 3) is still inflight when ConsumerNetworkThread.sendUnsentRequests() is called?

do {
    networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs());
    timer.update();
} while (timer.notExpired() && networkClientDelegate.hasAnyPendingRequests());

NetworkClientDelegate.hasAnyPendingRequests() will return true while there are any in-flight requests.
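
To make the concern concrete, here is a minimal simulation of that loop. FakeTimer, FakeDelegate and CloseLoopSketch are hypothetical stand-ins written for illustration, not the real Kafka classes, and the sketch assumes an in-flight request that never gets a response.

```java
/** Hypothetical simulated timer; it only advances when told to. */
final class FakeTimer {
    private final long deadlineMs;
    private long nowMs = 0;

    FakeTimer(long timeoutMs) { this.deadlineMs = timeoutMs; }

    void advance(long ms) { nowMs += ms; }
    boolean notExpired() { return nowMs < deadlineMs; }
    long elapsedMs() { return nowMs; }
}

/** Hypothetical delegate whose in-flight requests never complete (assumption: broker is silent). */
final class FakeDelegate {
    private int unsent;
    private int inFlight;

    FakeDelegate(int unsent, int inFlight) {
        this.unsent = unsent;
        this.inFlight = inFlight;
    }

    /** Sends everything that is unsent; nothing ever comes back. */
    void poll() {
        inFlight += unsent;
        unsent = 0;
    }

    boolean hasAnyPendingRequests() { return unsent > 0 || inFlight > 0; }
}

final class CloseLoopSketch {
    /** Same loop shape as the snippet above; returns the simulated time spent looping. */
    static long sendUnsentRequests(FakeDelegate delegate, FakeTimer timer, long pollIntervalMs) {
        do {
            delegate.poll();
            timer.advance(pollIntervalMs);
        } while (timer.notExpired() && delegate.hasAnyPendingRequests());
        return timer.elapsedMs();
    }
}
```

With one request already in flight and a 30 second timer, the loop in this sketch only exits when the timer expires. With nothing pending it exits after a single pass.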

Any thoughts?

Thanks!

@github-actions github-actions bot removed the triage PRs from the community label Jan 22, 2025
@frankvicky
Contributor Author

Hi @kirktrue,

Thanks for the review.
It's tricky to have NetworkClientDelegate ignore the FindCoordinatorRequest since the request is stored in NetworkClient#inFlightRequests, and we shouldn't manipulate this property outside of NetworkClient.
I considered calling completeExceptionally on the in-flight FindCoordinatorRequest when closing, but I don't see any existing logic that does this.
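
For reference, the completeExceptionally idea could look roughly like the sketch below. InFlightTrackerSketch is a hypothetical class that keeps its own list of futures; it does not touch NetworkClient#inFlightRequests, and nothing like it exists in the code base as far as this thread establishes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical tracker that owns the futures of requests it has handed out. */
final class InFlightTrackerSketch {
    private final List<CompletableFuture<String>> inFlight = new ArrayList<>();

    /** Registers a new in-flight request and returns its future. */
    CompletableFuture<String> track() {
        CompletableFuture<String> future = new CompletableFuture<>();
        inFlight.add(future);
        return future;
    }

    /** True while at least one tracked request has not completed. */
    boolean hasPending() {
        return inFlight.stream().anyMatch(f -> !f.isDone());
    }

    /** Fails every request that is still outstanding so close does not wait on it. */
    void failAllOnClose() {
        for (CompletableFuture<String> future : inFlight) {
            // No-op for futures that already completed.
            future.completeExceptionally(new IllegalStateException("consumer is closing"));
        }
    }
}
```

The sketch only changes what the caller sees; the request itself would still sit in the network layer until it times out or gets a response.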

@frankvicky frankvicky force-pushed the KAKFA-18569 branch 4 times, most recently from b9fa0df to 97e53cb Compare January 23, 2025 04:34
@frankvicky
Contributor Author

frankvicky commented Jan 23, 2025

Currently, testClose times out at this line:

checkCloseWithClusterFailure(numRecords, "group4", "group5", groupProtocol)

It seems that the behavior described in the comment is not followed:

/**
 * Consumer is closed while all brokers are unavailable. Cannot rebalance or commit offsets since
 * there is no coordinator, but close should timeout and return. If close is invoked with a very
 * large timeout, close should timeout after request timeout.
 */

Updated:

Now testClose gets stuck here because of the max-value timeout. We should have a way to let the background thread know that the consumer is closing.

@lianetm
Member

lianetm commented Jan 23, 2025

Hey here, I don't quite get how the pollOnClose approach will solve the issue, basically because it still leaves a lot of time for unneeded FindCoord to be generated and block the network thread close, doesn't it?

The pollOnClose runs when we're closing the network thread, actually right before we block on sendUnsentRequests. This means that we may have already a FindCoord generated after the consumer close committed and left the group, correct?

To find a solution, let's look at the classic consumer first, this is my understanding:

Correct me if I'm wrong, but if that's the behaviour, could it be achieved in the new consumer by letting autoCommitOnClose run while the background thread ensures a coordinator, and then, right after it, signaling to the CoordinatorRequestManager that the consumer is closing (the same effect as the HB thread shutdown in the classic consumer, I would say), so that it does not generate any more FindCoordinator requests? That's what comes to mind, but let me know your thoughts. Thanks!
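
A minimal sketch of that signal, assuming a simple flag on the manager. CoordinatorLookupSketch and its method names are hypothetical and do not mirror the real CoordinatorRequestManager API.

```java
import java.util.Optional;

/** Hypothetical stand-in for a manager that issues coordinator lookups from poll(). */
final class CoordinatorLookupSketch {
    private boolean coordinatorKnown = false;
    private boolean closing = false;
    private int requestsGenerated = 0;

    /** Called once the commit and leave-group stages of close have finished. */
    void signalClose() { closing = true; }

    void markCoordinatorUnknown() { coordinatorKnown = false; }

    /** Returns a new lookup request only if one is needed and the consumer is not closing. */
    Optional<String> poll() {
        if (closing || coordinatorKnown) {
            return Optional.empty();
        }
        requestsGenerated++;
        return Optional.of("FindCoordinatorRequest#" + requestsGenerated);
    }

    int requestsGenerated() { return requestsGenerated; }
}
```

In this sketch, a poll() after signalClose() returns nothing even though the coordinator is still unknown, so there is no new unsent request for the network thread to wait on.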

@frankvicky
Contributor Author

Hi @lianetm @kirktrue,
Sorry for interrupting the topic of this patch.
I noticed a potential behavior difference between the classic consumer and the async consumer while preparing this patch.

In the classic consumer, the timeout respects request.timeout.ms.

final Timer closeTimer = createTimerForRequest(timeout);

private Timer createTimerForRequest(final Duration timeout) {
    // this.time could be null if an exception occurs in constructor prior to setting the this.time field
    final Time localTime = (time == null) ? Time.SYSTEM : time;
    return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
}
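
As a worked example of that Math.min clamp, here is a small standalone sketch. CloseTimeoutSketch is a hypothetical helper, and the 30000 ms value is only an assumed request.timeout.ms for illustration.

```java
import java.time.Duration;

/** Hypothetical helper that isolates the clamp used by createTimerForRequest. */
final class CloseTimeoutSketch {
    /** The close timer never runs longer than the request timeout. */
    static long effectiveCloseTimeoutMs(Duration timeout, int requestTimeoutMs) {
        return Math.min(timeout.toMillis(), requestTimeoutMs);
    }
}
```

With an assumed request timeout of 30000 ms, a close timeout of Duration.ofMillis(Long.MAX_VALUE) is clamped to 30000 ms, while Duration.ofSeconds(5) stays at 5000 ms.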

However, in the async consumer, this logic is either missing or only applies to individual requests.
Unlike the classic consumer, where request.timeout.ms works for the entire coordinator closing behavior, the async implementation handles timeouts differently.

public void close(final Timer timer) {
    // we do not need to re-enable wakeups since we are closing already
    client.disableWakeups();
    try {
        maybeAutoCommitOffsetsSync(timer);
        while (pendingAsyncCommits.get() > 0 && timer.notExpired()) {
            ensureCoordinatorReady(timer);
            client.poll(timer);
            invokeCompletedOffsetCommitCallbacks();
        }
    } finally {
        super.close(timer);
    }
}

Should we align the behavior between async and classic consumers?

@lianetm
Member

lianetm commented Jan 24, 2025

Hey @frankvicky, good finding. Agree that the behaviour is not aligned in the close timeout handling, so in practice the classic consumer.close will never wait for more than the request timeout if there is a call to close with a larger timeout (and that's indeed missing on the async close timeout)

Actually, the behaviour is explicitly called out in one of the tests:

https://github.com/lianetm/kafka/blob/023f9c26e60c0710891abd148cce52c1dadaf7cd/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala#L300-L305

So I do agree that we need to align this. But just for my understanding, this is something else we need here to unblock these tests (the testClose specifically I imagine?), but not enough right? I expect we still need to deal with the initial situation to avoid issuing/blocking on unneeded FindCoord requests on close after the commit/leave have completed, agree? (just to make sure I'm not missing anything).

If my understanding is right then I think we should file a separate jira for the close timeout considering the request timeout, and if you can validate locally that it's the only fix required to enable the testClose then we enable it in that other PR (leaving this PR for the unneeded FindCoord issue and the testCloseDuringRebalance), let me know what you think.

@frankvicky
Contributor Author

Hi @lianetm
Yes, you are right. The request.timeout.ms is a separate issue.
I will create a new ticket to track this timeout-handling problem.

@chia7712
Member

so in practice the classic consumer.close will never wait for more than the request timeout if there is a call to close with a larger timeout (and that's indeed missing on the async close timeout)

I agree that we should align the behavior with how it has functioned for a long time (f72203e). Additionally, we should document this behavior for both request.timeout.ms and close method.

@frankvicky
Contributor Author

Hi everyone,
I have updated the patch and looped the test locally; testCloseDuringRebalance now passes consistently.
Screenshot from 2025-01-25 14-00-50

frankvicky added a commit to frankvicky/kafka that referenced this pull request Jan 25, 2025
…assic consumer

JIRA: KAFKA-18645
see discussion:
apache#18590 (comment)

In the classic consumer, the timeout respects request.timeout.ms.
However, in the async consumer, this logic is either missing or only
applies to individual requests. Unlike the classic consumer, where
request.timeout.ms works for the entire coordinator closing behavior,
the async implementation handles timeouts differently.

We should align the close timeout-handling to enable
ConsumerBounceTest#testClose
frankvicky added a commit to frankvicky/kafka that referenced this pull request Jan 26, 2025
@kirktrue
Collaborator

The old/new approach to include a specialized event makes sense. Thanks for the suggestion @lianetm!

frankvicky added a commit to frankvicky/kafka that referenced this pull request Jan 28, 2025
Member

@lianetm lianetm left a comment

Thanks @frankvicky! Just one nit left. Also, please merge the latest trunk changes to pick up the latest test fixes, and I'll check the build again. Thanks!

* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals.events;
public class StopFindCoordinatorOnCloseEvent extends ApplicationEvent {
Member

Should we add a java doc here? Mainly to describe that the purpose of this event is to ensure that the CoordinatorRequestManager does not generate FindCoordinator requests when the consumer is closing and has already completed the operations that require a coordinator.
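
As a rough illustration of how such an event could carry the signal from the application thread to the background thread, here is a self-contained sketch. All names in it (SketchEvent, StopLookupOnCloseSketchEvent, LookupManagerSketch, EventProcessorSketch) are hypothetical and are not the classes added in this PR.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical marker for events handed from the application thread to the background thread. */
interface SketchEvent { }

/**
 * Hypothetical event telling the lookup manager to stop generating coordinator
 * lookups, because the consumer is closing and has already completed the
 * operations that require a coordinator.
 */
final class StopLookupOnCloseSketchEvent implements SketchEvent { }

/** Hypothetical manager that only records the closing signal. */
final class LookupManagerSketch {
    private boolean closing = false;

    void signalClose() { closing = true; }
    boolean isClosing() { return closing; }
}

/** Hypothetical processor that drains queued events and dispatches them. */
final class EventProcessorSketch {
    private final Queue<SketchEvent> queue = new ArrayDeque<>();
    private final LookupManagerSketch manager;

    EventProcessorSketch(LookupManagerSketch manager) { this.manager = manager; }

    void enqueue(SketchEvent event) { queue.add(event); }

    /** Processes every queued event and returns how many were handled. */
    int drain() {
        int processed = 0;
        SketchEvent event;
        while ((event = queue.poll()) != null) {
            if (event instanceof StopLookupOnCloseSketchEvent) {
                manager.signalClose();
            }
            processed++;
        }
        return processed;
    }
}
```

The sketch uses a plain ArrayDeque and is therefore single-threaded; a real hand-off between two threads would need a thread-safe queue.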

Contributor Author

Sure, I have just written some description for it. PTAL 😺

frankvicky and others added 2 commits January 29, 2025 10:55
JIRA: KAFKA-18569
Please refer to ticket for further details
Co-authored-by: Lianet Magrans
frankvicky added a commit to frankvicky/kafka that referenced this pull request Jan 29, 2025
@frankvicky
Contributor Author

Failed test is handled by #18735

@lianetm lianetm merged commit 9dd73d4 into apache:trunk Jan 29, 2025
7 of 9 checks passed
lianetm pushed a commit that referenced this pull request Jan 29, 2025
@lianetm
Member

lianetm commented Jan 29, 2025

Merged to trunk and cherry-picked to 4.0

frankvicky added a commit to frankvicky/kafka that referenced this pull request Jan 30, 2025
frankvicky added a commit to frankvicky/kafka that referenced this pull request Jan 31, 2025
Labels
ci-approved clients consumer core Kafka Broker ctr Consumer Threading Refactor (KIP-848) KIP-848 The Next Generation of the Consumer Rebalance Protocol small Small PRs
4 participants