Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Avoid block markDeletePosition forward when skip lost entries #4

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -2890,6 +2890,54 @@ public void deleteFailed(ManagedLedgerException ex, Object ctx) {
}, null);
}

/**
* Manually acknowledge all entries from startPosition to endPosition.
* - Since this is an uncommon event, we focus on maintainability. So we do not modify
* {@link #individualDeletedMessages} and {@link #batchDeletedIndexes}, but call
* {@link #asyncDelete(Position, AsyncCallbacks.DeleteCallback, Object)}.
* - This method is valid regardless of the consumer ACK type.
* - If there is a consumer ack request after this event, it will also work.
*/
public void skipNonRecoverableEntries(Position startPosition, Position endPosition){
long ledgerId = startPosition.getLedgerId();
LedgerInfo ledgerInfo = ledger.getLedgersInfo().get(ledgerId);
if (ledgerInfo == null) {
return;
}

long startEntryId = Math.max(0, startPosition.getEntryId());
long endEntryId = ledgerId != endPosition.getLedgerId() ? ledgerInfo.getEntries() : endPosition.getEntryId();
if (startEntryId >= endEntryId) {
return;
}

lock.writeLock().lock();
log.warn("[{}] [{}] Since these entry for ledger [{}] is lost and the autoSkipNonRecoverableData is true, "
+ "these entries [{}:{}) will be auto acknowledge in subscription",
ledger.getName(), name, ledgerId, startEntryId, endEntryId);
try {
for (long i = startEntryId; i < endEntryId; i++) {
if (!individualDeletedMessages.contains(ledgerId, i)) {
asyncDelete(PositionFactory.create(ledgerId, i), new AsyncCallbacks.DeleteCallback() {
@Override
public void deleteComplete(Object ctx) {
// ignore.
}

@Override
public void deleteFailed(ManagedLedgerException ex, Object ctx) {
// The method internalMarkDelete already handled the failure operation. We only need to
// make sure the memory state is updated.
// If the broker crashed, the non-recoverable ledger will be detected again.
}
}, null);
}
}
} finally {
lock.writeLock().unlock();
}
}

// //////////////////////////////////////////////////

void startCreatingNewMetadataLedger() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
updateReadPosition(nexReadPosition);
if (lostLedger != null) {
cursor.getManagedLedger().skipNonRecoverableLedger(lostLedger);
} else {
cursor.skipNonRecoverableEntries(readPosition, nexReadPosition);
}
checkReadCompletion();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4766,6 +4766,53 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
&& cursorReadPosition.getEntryId() == expectReadPosition.getEntryId());
}

@Test
public void testSkipNonRecoverableEntries() throws ManagedLedgerException, InterruptedException {
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
int maxMessagePerLedger = 10;
managedLedgerConfig.setMaxEntriesPerLedger(maxMessagePerLedger);
ManagedLedger ledger = factory.open("testSkipNonRecoverableEntries", managedLedgerConfig);
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("my-cursor");

Position lacPosition = ledger.getLastConfirmedEntry();
long ledgerId = lacPosition.getLedgerId();
assertEquals(PositionFactory.create(ledgerId, -1), cursor.getMarkDeletedPosition());

// Mock add 10 entry
for (int i = 0; i < 10; i++) {
ledger.addEntry(String.valueOf(i).getBytes());
}

// read 2 entry and delete these entries, MarkDeletedPosition move forward
List<Entry> entries = cursor.readEntries(2);
for (Entry entry : entries) {
cursor.delete(entry.getPosition());
}
assertEquals(PositionFactory.create(ledgerId, 1), cursor.getMarkDeletedPosition());

// read the next 6 entry and not delete, MarkDeletedPosition not move forward
entries = cursor.readEntries(6);
assertEquals(PositionFactory.create(ledgerId, 1), cursor.getMarkDeletedPosition());

// delete last read entry, MarkDeletedPosition not move forward
Entry lastEntry = entries.get(entries.size() - 1);
cursor.delete(lastEntry.getPosition());
assertEquals(PositionFactory.create(ledgerId, 1), cursor.getMarkDeletedPosition());

// call skip entries, MarkDeletedPosition move forward
cursor.skipNonRecoverableEntries(cursor.getMarkDeletedPosition(),
PositionFactory.create(ledgerId, lastEntry.getEntryId()));
assertEquals(PositionFactory.create(ledgerId, lastEntry.getEntryId()), cursor.getMarkDeletedPosition());

// repeat call skip entries, MarkDeletedPosition not change
cursor.skipNonRecoverableEntries(cursor.getMarkDeletedPosition(),
PositionFactory.create(ledgerId, lastEntry.getEntryId()));
assertEquals(PositionFactory.create(ledgerId, lastEntry.getEntryId()), cursor.getMarkDeletedPosition());

cursor.close();
ledger.close();
}

@Test
public void testRecoverCursorWithTerminateManagedLedger() throws Exception {
String mlName = "my_test_ledger";
Expand Down
Loading