
[improve][broker] Don't call ManagedLedger#asyncAddEntry in Netty I/O thread #23983

Open · wants to merge 10 commits into master
Conversation

@BewareMyPower (Contributor) commented Feb 13, 2025

Motivation

#23940 introduced a behavior change: the core logic of ManagedLedger#asyncAddEntry no longer switches threads, which means it is executed directly in the Netty I/O thread via PersistentTopic#asyncAddEntry.

The beforeAddEntry method calls the intercept and interceptWithNumberOfMessages methods of all broker entry interceptors and prepends a new broker entry metadata buffer to the original buffer (though the result is just a composite buffer, so no copy is involved).
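
For illustration, the prepend step works roughly like this (a minimal sketch using plain Netty APIs, not the actual ManagedLedgerInterceptorImpl code; buildMetadata is a hypothetical stand-in for the real intercept/interceptWithNumberOfMessages work):

// Sketch: prepend a broker entry metadata header without copying the payload.
// The header and the original buffer are combined into a CompositeByteBuf, so
// the "prepend" itself is cheap, but serializing the metadata still costs CPU
// time on whatever thread runs it.
ByteBuf prependBrokerEntryMetadata(ByteBufAllocator allocator, ByteBuf payload, int numberOfMessages) {
    // buildMetadata is hypothetical; it stands in for the interceptor calls.
    ByteBuf metadata = buildMetadata(allocator, numberOfMessages);
    CompositeByteBuf composite = allocator.compositeBuffer(2);
    composite.addComponents(true /* advance writerIndex */, metadata, payload);
    return composite;
}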

There is a risk that, when many producers send messages to the same managed ledger concurrently, asyncAddEntry could block the Netty I/O thread for some time and cause a performance regression.

Modifications

In PersistentTopic#publishMessage, expose a getExecutor() method on ManagedLedger and execute ManagedLedger#asyncAddEntry in that executor. The change from #12606 is moved to PersistentTopic as well, so that the buffer is retained before switching to another thread.
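
A minimal sketch of the pattern (simplified names, not the PR's exact code; it assumes getExecutor() returns a standard Executor and that asyncAddEntry retains the buffer again internally, as in the snippet quoted later in this thread):

// Sketch only: retain the buffer on the calling (Netty I/O) thread, then run
// ManagedLedger#asyncAddEntry on the ledger's own executor. Retaining before
// the hand-off is the #12606 change that this PR moves into PersistentTopic.
public void publishMessage(ByteBuf headersAndPayload, PublishContext publishContext) {
    headersAndPayload.retain();
    ledger.getExecutor().execute(() -> {
        ledger.asyncAddEntry(headersAndPayload, publishContext.getNumberOfMessages(),
                this /* AddEntryCallback */, publishContext);
        // asyncAddEntry takes its own retain, so the hand-off retain is released.
        headersAndPayload.release();
    });
}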

After that, only afterAddEntryToQueue needs to be synchronized with the other synchronized methods of ManagedLedgerImpl. P.S. I don't actually think synchronized is needed here, but the logic is not as trivial as beforeAddEntryToQueue and beforeAddEntry, so I kept it synchronized.

ManagedLedgerImpl#asyncAddEntry still doesn't switch threads, so it is still possible for a downstream application to serialize calls to asyncAddEntry, either by adding a lock (e.g. synchronized) or by executing this method in a single thread.
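
For example, a downstream caller could guard the call site with a per-ledger lock (a sketch, assuming per-ledger ordering is the only requirement; callback here is an AsyncCallbacks.AddEntryCallback):

// Sketch: serialize asyncAddEntry calls with a per-ledger lock, since
// ManagedLedgerImpl#asyncAddEntry no longer switches threads itself.
private final Object ledgerLock = new Object();

void addEntryOrdered(ManagedLedger ledger, ByteBuf buffer, int numberOfMessages,
                     AsyncCallbacks.AddEntryCallback callback, Object ctx) {
    synchronized (ledgerLock) {
        ledger.asyncAddEntry(buffer, numberOfMessages, callback, ctx);
    }
}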

Documentation

  • [ ] doc
  • [ ] doc-required
  • [x] doc-not-needed
  • [ ] doc-complete

Matching PR in forked repository

PR in forked repository: BewareMyPower#40

@github-actions github-actions bot added the doc-not-needed label Feb 13, 2025
@BewareMyPower BewareMyPower self-assigned this Feb 13, 2025
@BewareMyPower BewareMyPower added the type/enhancement and release/4.0.3 labels Feb 13, 2025
@BewareMyPower BewareMyPower marked this pull request as draft February 13, 2025 13:00
@BewareMyPower BewareMyPower marked this pull request as ready for review February 13, 2025 13:17
@BewareMyPower BewareMyPower marked this pull request as draft February 13, 2025 14:07
@BewareMyPower BewareMyPower marked this pull request as ready for review February 13, 2025 14:09
@merlimat (Contributor) left a comment

An extra context switch for each entry is costly, especially when you have many small entries and little or no batching. That's why we put it on the same thread.

If the interceptor needs to do expensive work, maybe only the interceptor part should be done in a different thread, so that it doesn't affect the case where no interceptor is used.
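
A hypothetical sketch of that suggestion (not code from this PR): hop threads only when an interceptor is configured, so the common no-interceptor path stays on the calling thread:

// Hypothetical sketch: pay the context switch only when interceptor work is
// configured; otherwise stay on the calling thread.
if (managedLedgerInterceptor == null) {
    internalAsyncAddEntry(addOperation); // fast path, no thread hop
} else {
    executor.execute(() -> {
        managedLedgerInterceptor.beforeAddEntry(addOperation, addOperation.getNumberOfMessages());
        internalAsyncAddEntry(addOperation);
    });
}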

@lhotari lhotari added the release/blocker label Feb 13, 2025
@lhotari (Member) commented Feb 13, 2025

> An extra context switch for each entry is costly, especially when you have many small entries and little or no batching. That's why we put it on the same thread.

@merlimat The thread switching was added in PR #9039, back in December 2020. The reason for making this change now is a performance concern with the #23940 changes, which removed the thread switching. This is the current asyncAddEntry after #23940:

public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback callback, Object ctx) {
    if (log.isDebugEnabled()) {
        log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state);
    }
    // retain buffer in this thread
    buffer.retain();
    // Jump to specific thread to avoid contention from writers writing from different threads
    final var addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx,
            currentLedgerTimeoutTriggered);
    var added = false;
    try {
        // Use synchronized to ensure if `addOperation` is added to queue and fails later, it will be the first
        // element in `pendingAddEntries`.
        synchronized (this) {
            if (managedLedgerInterceptor != null) {
                managedLedgerInterceptor.beforeAddEntry(addOperation, addOperation.getNumberOfMessages());
            }
            final var state = STATE_UPDATER.get(this);
            beforeAddEntryToQueue(state);
            pendingAddEntries.add(addOperation);
            added = true;
            afterAddEntryToQueue(state, addOperation);
        }
    } catch (Throwable throwable) {
        if (!added) {
            addOperation.failed(ManagedLedgerException.getManagedLedgerException(throwable));
        } // else: all elements of `pendingAddEntries` will fail in another thread
    }
}

In Pulsar use cases, synchronization of CPU-intensive operations (or blocking I/O operations) in Netty I/O threads could cause performance regressions. In this case, it would impact use cases where a large number of producers produce to a single topic.
Blocking I/O threads has a broader impact, since it affects the Netty I/O of all connections sharing the same I/O thread.

Before #23940, the code looked like this:

public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback callback, Object ctx) {
    if (log.isDebugEnabled()) {
        log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state);
    }
    // retain buffer in this thread
    buffer.retain();
    // Jump to specific thread to avoid contention from writers writing from different threads
    executor.execute(() -> {
        OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx,
                currentLedgerTimeoutTriggered);
        internalAsyncAddEntry(addOperation);
    });
}

btw, in the Pulsar code base we have a problem with how IO threads are used: they are used to process work that shouldn't be handled on IO threads at all. I have created issue #23865 for this. There should be a separate thread pool for running blocking operations and CPU-intensive synchronized operations.
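
As a rough illustration of that direction (hypothetical code, not taken from #23865; Request, Response and doExpensiveWork are placeholders): hand blocking or CPU-heavy work to a dedicated pool so the event loop stays free:

// Hypothetical sketch: keep the Netty event loop free by running blocking or
// CPU-intensive work on a dedicated pool.
private final ExecutorService heavyWorkPool =
        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

void handleRequest(ChannelHandlerContext ctx, Request request) {
    heavyWorkPool.execute(() -> {
        Response response = doExpensiveWork(request); // blocking / CPU-heavy
        // writeAndFlush is safe to call from any thread; Netty schedules it
        // on the channel's event loop.
        ctx.writeAndFlush(response);
    });
}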

@lhotari (Member) left a comment

Great work @BewareMyPower. Some comments added in this first pass.

@BewareMyPower BewareMyPower marked this pull request as draft February 14, 2025 01:56
@BewareMyPower BewareMyPower marked this pull request as ready for review February 14, 2025 02:39
@BewareMyPower (Contributor, Author) commented Feb 14, 2025

> @merlimat The thread switching was added in PR #9039, already in December 2020.

@merlimat @lhotari To correct this: the thread switch is actually much older behavior, introduced in #1521.

This PR intends to decouple ManagedLedger#asyncAddEntry from PersistentTopic#asyncAddEntry so that the managed ledger interface is more flexible for downstream protocol handlers to use.

After that, all write operations from the Pulsar client keep the original behavior of switching to the managed ledger's executor before calling ManagedLedger#asyncAddEntry.

However, for downstream use (for example, in my Kafka protocol handler implementation), PersistentTopic#publishMessage is not called in an I/O thread; it's called in an independent worker thread. I can then choose to call persistentTopic.getManagedLedger().asyncAddEntry(/* ... */) in order, either by adding the synchronized keyword or by using the same worker thread for the same topic (see the sketch below).
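
A sketch of the per-topic worker approach (hypothetical routing code, not the actual protocol handler): all writes for a topic hash to the same single-threaded executor, so asyncAddEntry is called in order without a lock:

// Hypothetical sketch: route every write for a given topic to the same
// single-threaded executor so calls stay ordered, lock-free.
private final ExecutorService[] workers = new ExecutorService[8];
{
    for (int i = 0; i < workers.length; i++) {
        workers[i] = Executors.newSingleThreadExecutor();
    }
}

void produce(PersistentTopic persistentTopic, ByteBuf buffer, int numberOfMessages,
             AsyncCallbacks.AddEntryCallback callback) {
    int idx = Math.floorMod(persistentTopic.getName().hashCode(), workers.length);
    workers[idx].execute(() -> persistentTopic.getManagedLedger()
            .asyncAddEntry(buffer, numberOfMessages, callback, null));
}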

The comment here makes sense to a certain extent, but it raises a new topic (thread switching vs. synchronized) that is beyond the scope of this PR. In any case, the existing thread-switching approach already achieves high publish performance, as verified by many benchmarks.

@BewareMyPower BewareMyPower marked this pull request as draft February 14, 2025 07:46
@BewareMyPower BewareMyPower marked this pull request as ready for review February 14, 2025 07:48