Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Refactor a private method to eliminate an unnecessary parameter #23915

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -439,12 +439,17 @@ private Map<Consumer, List<Entry>> filterAndGroupEntriesForDispatching(List<Entr
permitsForConsumer.computeIfAbsent(consumer,
k -> new MutableInt(getAvailablePermits(k)));
// a consumer was found for the sticky key hash and the entry can be dispatched
if (permits.intValue() > 0
&& canDispatchEntry(consumer, entry, readType, stickyKeyHash, blockedByHash)) {
// decrement the permits for the consumer
permits.decrement();
// allow the entry to be dispatched
dispatchEntry = true;
if (permits.intValue() > 0) {
boolean canDispatchEntry = canDispatchEntry(consumer, entry, readType, stickyKeyHash);
if (!canDispatchEntry) {
if (blockedByHash != null) {
blockedByHash.setTrue();
}
// decrement the permits for the consumer
permits.decrement();
// allow the entry to be dispatched
dispatchEntry = true;
}
}
}
}
Expand Down Expand Up @@ -507,27 +512,18 @@ && canDispatchEntry(consumer, entry, readType, stickyKeyHash, blockedByHash)) {

// checks if the entry can be dispatched to the consumer
private boolean canDispatchEntry(Consumer consumer, Entry entry,
ReadType readType, int stickyKeyHash,
MutableBoolean blockedByHash) {
ReadType readType, int stickyKeyHash) {
// If redeliveryMessages contains messages that correspond to the same hash as the entry to be dispatched
// do not send those messages for order guarantee
if (readType == ReadType.Normal && redeliveryMessages.containsStickyKeyHash(stickyKeyHash)) {
if (blockedByHash != null) {
blockedByHash.setTrue();
}
return false;
}

if (drainingHashesRequired) {
// If the hash is draining, do not send the message
if (drainingHashesTracker.shouldBlockStickyKeyHash(consumer, stickyKeyHash)) {
if (blockedByHash != null) {
blockedByHash.setTrue();
}
return false;
}
}

return true;
}

Expand Down