Skip to content

Commit

Permalink
Track in/out pages in exchange (elastic#120867)
Browse files Browse the repository at this point in the history
This is a spin-off of the "retry node requests on shard-level failures" work.

Currently, a driver can execute against multiple shards simultaneously. 
If the execution fails and no pages are added to the sink, we can retry
the failed shards on another node. In another scenario, if no pages are
fetched or added to the exchange source and the entire data node request
fails, we can also retry the entire request. This change adds callbacks
to RemoteSink and ExchangeSink, allowing for tracking of in/out pages.
  • Loading branch information
dnhatn committed Jan 27, 2025
1 parent ecc4c92 commit c4708ea
Show file tree
Hide file tree
Showing 13 changed files with 125 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@
/**
* {@link ExchangeService} is responsible for exchanging pages between exchange sinks and sources on the same or different nodes.
* It holds a map of {@link ExchangeSinkHandler} instances for each node in the cluster to serve {@link ExchangeRequest}s.
* To connect exchange sources to exchange sinks, use {@link ExchangeSourceHandler#addRemoteSink(RemoteSink, boolean, int, ActionListener)}.
* To connect exchange sources to exchange sinks,
* use {@link ExchangeSourceHandler#addRemoteSink(RemoteSink, boolean, Runnable, int, ActionListener)}.
*/
public final class ExchangeService extends AbstractLifecycleComponent {
// TODO: Make this a child action of the data node transport to ensure that exchanges
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@

/**
* An {@link ExchangeSinkHandler} receives pages and status from its {@link ExchangeSink}s, which are created using
* {@link #createExchangeSink()} method. Pages and status can then be retrieved asynchronously by {@link ExchangeSourceHandler}s
* {@link #createExchangeSink(Runnable)} method. Pages and status can then be retrieved asynchronously by {@link ExchangeSourceHandler}s
* using the {@link #fetchPageAsync(boolean, ActionListener)} method.
*
* @see #createExchangeSink()
* @see #createExchangeSink(Runnable)
* @see #fetchPageAsync(boolean, ActionListener)
* @see ExchangeSourceHandler
*/
Expand All @@ -52,16 +52,19 @@ public ExchangeSinkHandler(BlockFactory blockFactory, int maxBufferSize, LongSup

private class ExchangeSinkImpl implements ExchangeSink {
boolean finished;
private final Runnable onPageFetched;
private final SubscribableListener<Void> onFinished = new SubscribableListener<>();

ExchangeSinkImpl() {
ExchangeSinkImpl(Runnable onPageFetched) {
this.onPageFetched = onPageFetched;
onChanged();
buffer.addCompletionListener(onFinished);
outstandingSinks.incrementAndGet();
}

@Override
public void addPage(Page page) {
onPageFetched.run();
buffer.addPage(page);
notifyListeners();
}
Expand Down Expand Up @@ -101,7 +104,7 @@ public IsBlockedResult waitForWriting() {
* @param sourceFinished if true, then this handler can finish as sources have enough pages.
* @param listener the listener that will be notified when pages are ready or this handler is finished
* @see RemoteSink
* @see ExchangeSourceHandler#addRemoteSink(RemoteSink, boolean, int, ActionListener)
* @see ExchangeSourceHandler#addRemoteSink(RemoteSink, boolean, Runnable, int, ActionListener)
*/
public void fetchPageAsync(boolean sourceFinished, ActionListener<ExchangeResponse> listener) {
if (sourceFinished) {
Expand Down Expand Up @@ -161,10 +164,11 @@ private void notifyListeners() {
/**
* Create a new exchange sink for exchanging data
*
* @param onPageFetched a {@link Runnable} that will be called each time a page is added to the created sink, before the page is buffered.
* @see ExchangeSinkOperator
*/
public ExchangeSink createExchangeSink() {
return new ExchangeSinkImpl();
public ExchangeSink createExchangeSink(Runnable onPageFetched) {
return new ExchangeSinkImpl(onPageFetched);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
/**
* An {@link ExchangeSourceHandler} asynchronously fetches pages and status from multiple {@link RemoteSink}s
* and feeds them to its {@link ExchangeSource}, which are created using the {@link #createExchangeSource()} method.
* {@link RemoteSink}s are added using the {@link #addRemoteSink(RemoteSink, boolean, int, ActionListener)} method.
* {@link RemoteSink}s are added using the {@link #addRemoteSink(RemoteSink, boolean, Runnable, int, ActionListener)} method.
*
* @see #createExchangeSource()
* @see #addRemoteSink(RemoteSink, boolean, int, ActionListener)
* @see #addRemoteSink(RemoteSink, boolean, Runnable, int, ActionListener)
*/
public final class ExchangeSourceHandler {
private final ExchangeBuffer buffer;
Expand Down Expand Up @@ -185,11 +185,13 @@ private final class RemoteSinkFetcher {
private volatile boolean finished = false;
private final RemoteSink remoteSink;
private final boolean failFast;
private final Runnable onPageFetched;
private final ActionListener<Void> completionListener;

RemoteSinkFetcher(RemoteSink remoteSink, boolean failFast, ActionListener<Void> completionListener) {
RemoteSinkFetcher(RemoteSink remoteSink, boolean failFast, Runnable onPageFetched, ActionListener<Void> completionListener) {
outstandingSinks.trackNewInstance();
this.remoteSink = remoteSink;
this.onPageFetched = onPageFetched;
this.failFast = failFast;
this.completionListener = completionListener;
}
Expand All @@ -203,6 +205,7 @@ void fetchPage() {
remoteSink.fetchPageAsync(toFinishSinks, ActionListener.wrap(resp -> {
Page page = resp.takePage();
if (page != null) {
onPageFetched.run();
buffer.addPage(page);
}
if (resp.finished()) {
Expand Down Expand Up @@ -252,19 +255,26 @@ void onSinkComplete() {
/**
* Add a remote sink as a new data source of this handler. The handler will start fetching data from this remote sink immediately.
*
* @param remoteSink the remote sink
* @param failFast determines how failures in this remote sink are handled:
* - If {@code false}, failures from this remote sink will not cause the exchange source to abort.
* Callers must handle these failures notified via {@code listener}.
* - If {@code true}, failures from this remote sink will cause the exchange source to abort.
* Callers can safely ignore failures notified via this listener, as they are collected and
* reported by the exchange source.
* @param instances the number of concurrent ``clients`` that this handler should use to fetch pages.
* More clients reduce latency, but add overhead.
* @param listener a listener that will be notified when the sink fails or completes
* @param remoteSink the remote sink
* @param failFast determines how failures in this remote sink are handled:
* - If {@code false}, failures from this remote sink will not cause the exchange source to abort.
* Callers must handle these failures notified via {@code listener}.
* - If {@code true}, failures from this remote sink will cause the exchange source to abort.
* Callers can safely ignore failures notified via this listener, as they are collected and
* reported by the exchange source.
* @param onPageFetched a callback that will be called when a page is fetched from the remote sink
* @param instances the number of concurrent ``clients`` that this handler should use to fetch pages.
* More clients reduce latency, but add overhead.
* @param listener a listener that will be notified when the sink fails or completes
* @see ExchangeSinkHandler#fetchPageAsync(boolean, ActionListener)
*/
public void addRemoteSink(RemoteSink remoteSink, boolean failFast, int instances, ActionListener<Void> listener) {
public void addRemoteSink(
RemoteSink remoteSink,
boolean failFast,
Runnable onPageFetched,
int instances,
ActionListener<Void> listener
) {
final int sinkId = nextSinkId.incrementAndGet();
remoteSinks.put(sinkId, remoteSink);
final ActionListener<Void> sinkListener = ActionListener.assertAtLeastOnce(
Expand All @@ -284,7 +294,7 @@ public void onFailure(Exception e) {
protected void doRun() {
try (EsqlRefCountingListener refs = new EsqlRefCountingListener(sinkListener)) {
for (int i = 0; i < instances; i++) {
var fetcher = new RemoteSinkFetcher(remoteSink, failFast, refs.acquire());
var fetcher = new RemoteSinkFetcher(remoteSink, failFast, onPageFetched, refs.acquire());
fetcher.fetchPage();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ public void testEarlyTermination() {
final int maxAllowedRows = between(1, 100);
final AtomicInteger processedRows = new AtomicInteger(0);
var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), positions, System::currentTimeMillis);
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(), Function.identity());
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}), Function.identity());
final var delayOperator = new EvalOperator(driverContext.blockFactory(), new EvalOperator.ExpressionEvaluator() {
@Override
public Block eval(Page page) {
Expand Down Expand Up @@ -335,7 +335,7 @@ public void testResumeOnEarlyFinish() throws Exception {
var sourceHandler = new ExchangeSourceHandler(between(1, 5), threadPool.executor("esql"), sourceFuture);
var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), between(1, 5), System::currentTimeMillis);
var sourceOperator = new ExchangeSourceOperator(sourceHandler.createExchangeSource());
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(), Function.identity());
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}), Function.identity());
Driver driver = new Driver(driverContext, sourceOperator, List.of(), sinkOperator, () -> {});
PlainActionFuture<Void> future = new PlainActionFuture<>();
Driver.start(threadPool.getThreadContext(), threadPool.executor("esql"), driver, between(1, 1000), future);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ List<Driver> createDriversForInput(List<Page> input, List<Page> results, boolean
sourceExchanger.addRemoteSink(
sinkExchanger::fetchPageAsync,
randomBoolean(),
() -> {},
1,
ActionListener.<Void>noop().delegateResponse((l, e) -> {
throw new AssertionError("unexpected failure", e);
Expand Down Expand Up @@ -248,7 +249,7 @@ List<Driver> createDriversForInput(List<Page> input, List<Page> results, boolean
simpleWithMode(AggregatorMode.INTERMEDIATE).get(driver1Context),
intermediateOperatorItr.next()
),
new ExchangeSinkOperator(sinkExchanger.createExchangeSink(), Function.identity()),
new ExchangeSinkOperator(sinkExchanger.createExchangeSink(() -> {}), Function.identity()),
() -> {}
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,35 +97,50 @@ public void testBasic() throws Exception {
pages[i] = new Page(blockFactory.newConstantIntBlockWith(i, 2));
}
ExchangeSinkHandler sinkExchanger = new ExchangeSinkHandler(blockFactory, 2, threadPool.relativeTimeInMillisSupplier());
ExchangeSink sink1 = sinkExchanger.createExchangeSink();
ExchangeSink sink2 = sinkExchanger.createExchangeSink();
AtomicInteger pagesAddedToSink = new AtomicInteger();
ExchangeSink sink1 = sinkExchanger.createExchangeSink(pagesAddedToSink::incrementAndGet);
ExchangeSink sink2 = sinkExchanger.createExchangeSink(pagesAddedToSink::incrementAndGet);
PlainActionFuture<Void> sourceCompletion = new PlainActionFuture<>();
ExchangeSourceHandler sourceExchanger = new ExchangeSourceHandler(3, threadPool.executor(ESQL_TEST_EXECUTOR), sourceCompletion);
ExchangeSource source = sourceExchanger.createExchangeSource();
sourceExchanger.addRemoteSink(sinkExchanger::fetchPageAsync, randomBoolean(), 1, ActionListener.noop());
AtomicInteger pagesAddedToSource = new AtomicInteger();
sourceExchanger.addRemoteSink(
sinkExchanger::fetchPageAsync,
randomBoolean(),
pagesAddedToSource::incrementAndGet,
1,
ActionListener.noop()
);
SubscribableListener<Void> waitForReading = source.waitForReading().listener();
assertFalse(waitForReading.isDone());
assertNull(source.pollPage());
assertTrue(sink1.waitForWriting().listener().isDone());
randomFrom(sink1, sink2).addPage(pages[0]);
assertThat(pagesAddedToSink.get(), equalTo(1));
randomFrom(sink1, sink2).addPage(pages[1]);
assertThat(pagesAddedToSink.get(), equalTo(2));
assertBusy(() -> assertThat(pagesAddedToSource.get(), equalTo(2)));
// source and sink buffers can store 5 pages
for (Page p : List.of(pages[2], pages[3], pages[4])) {
ExchangeSink sink = randomFrom(sink1, sink2);
assertBusy(() -> assertTrue(sink.waitForWriting().listener().isDone()));
sink.addPage(p);
}
assertThat(pagesAddedToSink.get(), equalTo(5));
assertBusy(() -> assertThat(pagesAddedToSource.get(), equalTo(3)));
// sink buffer is full
assertFalse(randomFrom(sink1, sink2).waitForWriting().listener().isDone());
assertBusy(() -> assertTrue(source.waitForReading().listener().isDone()));
assertEquals(pages[0], source.pollPage());
assertBusy(() -> assertTrue(source.waitForReading().listener().isDone()));
assertEquals(pages[1], source.pollPage());
assertBusy(() -> assertThat(pagesAddedToSource.get(), equalTo(5)));
// sink can write again
assertBusy(() -> assertTrue(randomFrom(sink1, sink2).waitForWriting().listener().isDone()));
randomFrom(sink1, sink2).addPage(pages[5]);
assertBusy(() -> assertTrue(randomFrom(sink1, sink2).waitForWriting().listener().isDone()));
randomFrom(sink1, sink2).addPage(pages[6]);
assertThat(pagesAddedToSink.get(), equalTo(7));
// sink buffer is full
assertFalse(randomFrom(sink1, sink2).waitForWriting().listener().isDone());
sink1.finish();
Expand All @@ -134,6 +149,7 @@ public void testBasic() throws Exception {
assertBusy(() -> assertTrue(source.waitForReading().listener().isDone()));
assertEquals(pages[2 + i], source.pollPage());
}
assertBusy(() -> assertThat(pagesAddedToSource.get(), equalTo(7)));
// source buffer is empty
assertFalse(source.waitForReading().listener().isDone());
assertBusy(() -> assertTrue(sink2.waitForWriting().listener().isDone()));
Expand Down Expand Up @@ -340,10 +356,16 @@ public void testConcurrentWithHandlers() {
sinkHandler = randomFrom(sinkHandlers);
} else {
sinkHandler = new ExchangeSinkHandler(blockFactory, randomExchangeBuffer(), threadPool.relativeTimeInMillisSupplier());
sourceExchanger.addRemoteSink(sinkHandler::fetchPageAsync, randomBoolean(), randomIntBetween(1, 3), ActionListener.noop());
sourceExchanger.addRemoteSink(
sinkHandler::fetchPageAsync,
randomBoolean(),
() -> {},
randomIntBetween(1, 3),
ActionListener.noop()
);
sinkHandlers.add(sinkHandler);
}
return sinkHandler.createExchangeSink();
return sinkHandler.createExchangeSink(() -> {});
};
final int maxInputSeqNo = rarely() ? -1 : randomIntBetween(0, 50_000);
final int maxOutputSeqNo = rarely() ? -1 : randomIntBetween(0, 50_000);
Expand Down Expand Up @@ -398,14 +420,14 @@ public void testExchangeSourceContinueOnFailure() {
l.onResponse(new ExchangeResponse(blockFactory, page, r.finished()));
}));
}
}, false, instance, ActionListener.wrap(r -> {
}, false, () -> {}, instance, ActionListener.wrap(r -> {
assertFalse(sinkFailed.get());
completedSinks.incrementAndGet();
}, e -> {
assertTrue(sinkFailed.get());
failedSinks.incrementAndGet();
}));
return sinkHandler.createExchangeSink();
return sinkHandler.createExchangeSink(() -> {});
};
Set<Integer> actualSeqNos = runConcurrentTest(
maxInputSeqNo,
Expand All @@ -430,7 +452,7 @@ public void testClosingSinks() {
Page p1 = new Page(block1);
Page p2 = new Page(block2);
ExchangeSinkHandler sinkExchanger = new ExchangeSinkHandler(blockFactory, 2, threadPool.relativeTimeInMillisSupplier());
ExchangeSink sink = sinkExchanger.createExchangeSink();
ExchangeSink sink = sinkExchanger.createExchangeSink(() -> {});
sink.addPage(p1);
sink.addPage(p2);
assertFalse(sink.waitForWriting().listener().isDone());
Expand Down Expand Up @@ -475,7 +497,7 @@ public void testFinishEarly() throws Exception {
throw new AssertionError(e);
}
}
}, false, between(1, 3), sinkCompleted);
}, false, () -> {}, between(1, 3), sinkCompleted);
threadPool.schedule(
() -> sourceHandler.finishEarly(randomBoolean(), ActionListener.noop()),
TimeValue.timeValueMillis(between(0, 10)),
Expand Down Expand Up @@ -526,6 +548,7 @@ public void testConcurrentWithTransportActions() {
sourceHandler.addRemoteSink(
exchange0.newRemoteSink(task, exchangeId, node0, connection),
randomBoolean(),
() -> {},
randomIntBetween(1, 5),
ActionListener.noop()
);
Expand All @@ -535,7 +558,7 @@ public void testConcurrentWithTransportActions() {
maxInputSeqNo,
maxOutputSeqNo,
sourceHandler::createExchangeSource,
sinkHandler::createExchangeSink
() -> sinkHandler.createExchangeSink(() -> {})
);
var expectedSeqNos = IntStream.range(0, Math.min(maxInputSeqNo, maxOutputSeqNo)).boxed().collect(Collectors.toSet());
assertThat(actualSeqNos, hasSize(expectedSeqNos.size()));
Expand Down Expand Up @@ -601,12 +624,18 @@ public void sendResponse(TransportResponse transportResponse) {
sourceHandler.addRemoteSink(
exchange0.newRemoteSink(task, exchangeId, node0, connection),
true,
() -> {},
randomIntBetween(1, 5),
ActionListener.noop()
);
Exception err = expectThrows(
Exception.class,
() -> runConcurrentTest(maxSeqNo, maxSeqNo, sourceHandler::createExchangeSource, sinkHandler::createExchangeSink)
() -> runConcurrentTest(
maxSeqNo,
maxSeqNo,
sourceHandler::createExchangeSource,
() -> sinkHandler.createExchangeSink(() -> {})
)
);
Throwable cause = ExceptionsHelper.unwrap(err, IOException.class);
assertNotNull(cause);
Expand Down
Loading

0 comments on commit c4708ea

Please sign in to comment.