Spike on abort merge
Aborting during stored fields checksumming. Unfortunately, this does not appear to work: the merge does not wrap the directory used for the input here, because it uses a pooled reader and clones the input.
henningandersen committed May 3, 2024
1 parent fc71923 commit 79e3d1b
Showing 5 changed files with 168 additions and 0 deletions.
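
Why the spike fails, in brief: wrapForMerge only intercepts files opened through the merge-wrapped Directory, but stored fields checksumming during a merge reads through a pooled SegmentReader whose inputs were opened on the unwrapped directory and are merely cloned at merge time. A minimal sketch of that bypass follows; the names are illustrative, not code from this commit.

// Illustrative sketch only: a wrapper never sees reads that go through a clone
// of an input that was opened before (or outside) the wrapping.
Directory raw = FSDirectory.open(path);
Directory wrapped = new FilterDirectory(raw) {
    @Override
    public IndexInput openInput(String name, IOContext context) throws IOException {
        return trackAborts(super.openInput(name, context)); // intercepted: opened via the wrapper (trackAborts is hypothetical)
    }
};
IndexInput pooled = raw.openInput("_0.fdt", IOContext.DEFAULT); // the pooled reader opened this on the unwrapped directory
IndexInput mergeTimeClone = pooled.clone();                     // what the merge actually reads from: no abort checks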
@@ -26,6 +26,20 @@

public class ForceMergeIT extends ESIntegTestCase {

    public void testForceMerge() {
        internalCluster().ensureAtLeastNumDataNodes(2);
        final String index = "test-index";
        createIndex(index, 1, 1);
        ensureGreen(index);

        index(index, "1", "{}");
        flush(index);
        index(index, "2", "{}");
        flush(index);
        // Spike: no assertions yet; the point is just to exercise a force merge across two segments.
        final BroadcastResponse forceMergeResponse = indicesAdmin().prepareForceMerge(index).setMaxNumSegments(1).get();
    }

    public void testForceMergeUUIDConsistent() throws IOException {
        internalCluster().ensureAtLeastNumDataNodes(2);
        final String index = "test-index";
@@ -12,6 +12,13 @@
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeScheduler;
import org.apache.lucene.store.BufferedChecksumIndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.FilterIndexInput;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
@@ -217,4 +224,50 @@ void refreshConfig() {
            disableAutoIOThrottle();
        }
    }

    @Override
    public Directory wrapForMerge(MergePolicy.OneMerge merge, Directory in) {
        Directory original = super.wrapForMerge(merge, in);

        // Wrap the merge directory so that checksum inputs poll for an abort on every read.
        return new FilterDirectory(original) {
            @Override
            public ChecksumIndexInput openChecksumInput(String name, IOContext context) throws IOException {
                IndexInput main = openInput(name, context);
                return new BufferedChecksumIndexInput(new AbortingIndexingInput(main, merge));
            }
        };
    }

    /**
     * An IndexInput that checks whether the merge has been aborted before every read,
     * preserving that behavior across slices and clones.
     */
    private static class AbortingIndexingInput extends FilterIndexInput {
        private final MergePolicy.OneMerge merge;

        AbortingIndexingInput(IndexInput in, MergePolicy.OneMerge merge) {
            super(in.toString(), in);
            this.merge = merge;
        }

        @Override
        public void readBytes(byte[] b, int offset, int len) throws IOException {
            merge.checkAborted();
            super.readBytes(b, offset, len);
        }

        @Override
        public byte readByte() throws IOException {
            merge.checkAborted();
            return super.readByte();
        }

        @Override
        public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
            IndexInput sliced = super.slice(sliceDescription, offset, length);
            return new AbortingIndexingInput(sliced, merge);
        }

        @Override
        public IndexInput clone() {
            IndexInput clone = super.clone();
            return new AbortingIndexingInput(clone, merge);
        }
    }
}
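
For reference, a hedged sketch of the call path the override above targets: merge-time integrity verification opens the file as a ChecksumIndexInput and reads it end to end, so with the wrapper in place every read would pass through checkAborted(). This is illustrative only; it is not the actual Lucene call site.

// Illustrative sketch only: roughly how a checksum pass over a stored-fields
// file would exercise the wrapper above.
Directory mergeDir = wrapForMerge(merge, storeDirectory); // assuming a merge in flight and the store's directory
try (ChecksumIndexInput in = mergeDir.openChecksumInput("_0.fdt", IOContext.READONCE)) {
    in.skipBytes(in.length() - CodecUtil.footerLength()); // checksums the file body as it skips
    CodecUtil.checkFooter(in); // verifies the footer checksum; each read polls merge.checkAborted()
}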
@@ -3401,4 +3401,9 @@ private static boolean assertGetUsesIdField(Get get) {
    protected long getPreCommitSegmentGeneration() {
        return preCommitSegmentGeneration.get();
    }

    // Exposed for tests: the merges currently running on this engine's merge scheduler.
    Set<OnGoingMerge> onGoingMerges() {
        return mergeScheduler.onGoingMerges();
    }
}
@@ -49,4 +49,8 @@ public long getTotalBytesSize() {
    public List<SegmentCommitInfo> getMergedSegments() {
        return oneMerge.segments;
    }

    // Whether the underlying Lucene merge has been aborted, e.g. because the writer is closing.
    public boolean isAborted() {
        return oneMerge.isAborted();
    }
}
@@ -57,6 +57,10 @@
import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.FilterIndexInput;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.Lock;
import org.apache.lucene.tests.store.MockDirectoryWrapper;
import org.apache.lucene.util.Bits;
@@ -121,6 +125,7 @@
import org.elasticsearch.index.shard.SearcherHelper;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.index.store.LuceneFilesExtensions;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.SnapshotMatchers;
import org.elasticsearch.index.translog.TestTranslog;
@@ -7703,6 +7708,93 @@ public void testFlushListener() throws Exception {
        }
    }

    public void testMergeAbortsDuringStoredFieldsMerge() throws IOException, InterruptedException {
        Directory directory = newDirectory();
        CyclicBarrier barrier = new CyclicBarrier(2);
        AtomicBoolean enableSlowRead = new AtomicBoolean();
        Directory mockDirectory = new FilterDirectory(directory) {
            @Override
            public IndexInput openInput(String name, IOContext context) throws IOException {
                IndexInput indexInput = super.openInput(name, context);
                if (name.endsWith("." + LuceneFilesExtensions.FDT.getExtension()) == false) {
                    return indexInput;
                }

                logger.info("--> opening " + name);
                return new FilterIndexInput(indexInput.toString(), indexInput) {
                    @Override
                    public void readBytes(byte[] b, int offset, int len) throws IOException {
                        if (enableSlowRead.get()) {
                            logger.info("--> wait for barrier " + Thread.currentThread());
                            safeAwait(barrier);
                            logger.info("--> barrier 1 reached " + Thread.currentThread());
                            safeAwait(barrier);
                            logger.info("--> barrier 2 reached " + Thread.currentThread());
                        }
                        super.readBytes(b, offset, len);
                    }

                    @Override
                    public IndexInput clone() {
                        fail("clone not expected");
                        return super.clone();
                    }

                    @Override
                    public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
                        fail("slice not expected");
                        return super.slice(sliceDescription, offset, length);
                    }
                };
            }
        };
        try (Store store = createStore(mockDirectory); InternalEngine engine = createEngine(store, createTempDir())) {
            // write and flush two segments so the force merge below has work to do
            for (int round = 0; round < 2; ++round) {
                int numDocs = between(10000, 100000);
                for (int i = 0; i < numDocs; ++i) {
                    index(engine, i);
                }
                engine.flush(true, true);
                // engine.refresh("test");
            }

            Thread closer = new Thread(() -> {
                try {
                    engine.close();
                } catch (IOException e) {
                    fail(e);
                    throw new RuntimeException(e);
                }
                logger.info("--> closer done");
            });

            Thread waiter = new Thread(() -> {
                safeAwait(barrier);
                closer.start();
                try {
                    assertBusy(() -> engine.onGoingMerges().forEach(onGoingMerge -> assertTrue(onGoingMerge.isAborted())));
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
                safeAwait(barrier);
                logger.info("--> waiter done");
            });
            waiter.start();

            enableSlowRead.set(true);

            // The real verification is that this completes without the wait on the barrier failing.
            expectThrows(AlreadyClosedException.class, () -> engine.forceMerge(true, 1, false, randomUUID()));
            waiter.join(10000);
            closer.join(10000);
            assertThat(waiter.isAlive(), is(false));
            assertThat(closer.isAlive(), is(false));
        }
    }

    private static void assertCommitGenerations(Map<IndexCommit, Engine.IndexCommitRef> commits, List<Long> expectedGenerations) {
        assertCommitGenerations(commits.values().stream().map(Engine.IndexCommitRef::getIndexCommit).toList(), expectedGenerations);
    }
