Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.x] Track in/out pages in exchange (#120867) #120943

Merged
merged 4 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@
/**
* {@link ExchangeService} is responsible for exchanging pages between exchange sinks and sources on the same or different nodes.
* It holds a map of {@link ExchangeSinkHandler} instances for each node in the cluster to serve {@link ExchangeRequest}s.
* To connect exchange sources to exchange sinks, use {@link ExchangeSourceHandler#addRemoteSink(RemoteSink, boolean, int, ActionListener)}.
* To connect exchange sources to exchange sinks,
* use {@link ExchangeSourceHandler#addRemoteSink(RemoteSink, boolean, Runnable, int, ActionListener)}.
*/
public final class ExchangeService extends AbstractLifecycleComponent {
// TODO: Make this a child action of the data node transport to ensure that exchanges
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@

/**
* An {@link ExchangeSinkHandler} receives pages and status from its {@link ExchangeSink}s, which are created using
* {@link #createExchangeSink()}} method. Pages and status can then be retrieved asynchronously by {@link ExchangeSourceHandler}s
* {@link #createExchangeSink(Runnable)} method. Pages and status can then be retrieved asynchronously by {@link ExchangeSourceHandler}s
* using the {@link #fetchPageAsync(boolean, ActionListener)} method.
*
* @see #createExchangeSink()
* @see #createExchangeSink(Runnable)
* @see #fetchPageAsync(boolean, ActionListener)
* @see ExchangeSourceHandler
*/
Expand All @@ -52,16 +52,19 @@ public ExchangeSinkHandler(BlockFactory blockFactory, int maxBufferSize, LongSup

private class ExchangeSinkImpl implements ExchangeSink {
boolean finished;
private final Runnable onPageFetched;
private final SubscribableListener<Void> onFinished = new SubscribableListener<>();

ExchangeSinkImpl() {
ExchangeSinkImpl(Runnable onPageFetched) {
this.onPageFetched = onPageFetched;
onChanged();
buffer.addCompletionListener(onFinished);
outstandingSinks.incrementAndGet();
}

@Override
public void addPage(Page page) {
// Run the per-sink callback first, so it fires once for every page passed to this sink,
// before the page is handed to the buffer.
onPageFetched.run();
buffer.addPage(page);
// Notify listeners after the page has been handed to the buffer.
notifyListeners();
}
Expand Down Expand Up @@ -101,7 +104,7 @@ public IsBlockedResult waitForWriting() {
* @param sourceFinished if true, then this handler can finish as sources have enough pages.
* @param listener the listener that will be notified when pages are ready or this handler is finished
* @see RemoteSink
* @see ExchangeSourceHandler#addRemoteSink(RemoteSink, boolean, int, ActionListener)
* @see ExchangeSourceHandler#addRemoteSink(RemoteSink, boolean, Runnable, int, ActionListener)
*/
public void fetchPageAsync(boolean sourceFinished, ActionListener<ExchangeResponse> listener) {
if (sourceFinished) {
Expand Down Expand Up @@ -161,10 +164,11 @@ private void notifyListeners() {
/**
* Create a new exchange sink for exchanging data
*
* @param onPageFetched a {@link Runnable} that will be called each time a page is added to the created sink.
* @see ExchangeSinkOperator
*/
public ExchangeSink createExchangeSink() {
return new ExchangeSinkImpl();
public ExchangeSink createExchangeSink(Runnable onPageFetched) {
return new ExchangeSinkImpl(onPageFetched);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
/**
* An {@link ExchangeSourceHandler} asynchronously fetches pages and status from multiple {@link RemoteSink}s
* and feeds them to its {@link ExchangeSource}, which are created using the {@link #createExchangeSource()} method.
* {@link RemoteSink}s are added using the {@link #addRemoteSink(RemoteSink, boolean, int, ActionListener)}) method.
* {@link RemoteSink}s are added using the {@link #addRemoteSink(RemoteSink, boolean, Runnable, int, ActionListener)} method.
*
* @see #createExchangeSource()
* @see #addRemoteSink(RemoteSink, boolean, int, ActionListener)
* @see #addRemoteSink(RemoteSink, boolean, Runnable, int, ActionListener)
*/
public final class ExchangeSourceHandler {
private final ExchangeBuffer buffer;
Expand Down Expand Up @@ -185,11 +185,13 @@ private final class RemoteSinkFetcher {
private volatile boolean finished = false;
private final RemoteSink remoteSink;
private final boolean failFast;
private final Runnable onPageFetched;
private final ActionListener<Void> completionListener;

RemoteSinkFetcher(RemoteSink remoteSink, boolean failFast, ActionListener<Void> completionListener) {
RemoteSinkFetcher(RemoteSink remoteSink, boolean failFast, Runnable onPageFetched, ActionListener<Void> completionListener) {
outstandingSinks.trackNewInstance();
this.remoteSink = remoteSink;
this.onPageFetched = onPageFetched;
this.failFast = failFast;
this.completionListener = completionListener;
}
Expand All @@ -203,6 +205,7 @@ void fetchPage() {
remoteSink.fetchPageAsync(toFinishSinks, ActionListener.wrap(resp -> {
Page page = resp.takePage();
if (page != null) {
onPageFetched.run();
buffer.addPage(page);
}
if (resp.finished()) {
Expand Down Expand Up @@ -252,19 +255,26 @@ void onSinkComplete() {
/**
* Add a remote sink as a new data source of this handler. The handler will start fetching data from this remote sink immediately.
*
* @param remoteSink the remote sink
* @param failFast determines how failures in this remote sink are handled:
* - If {@code false}, failures from this remote sink will not cause the exchange source to abort.
* Callers must handle these failures notified via {@code listener}.
* - If {@code true}, failures from this remote sink will cause the exchange source to abort.
* Callers can safely ignore failures notified via this listener, as they are collected and
* reported by the exchange source.
* @param instances the number of concurrent ``clients`` that this handler should use to fetch pages.
* More clients reduce latency, but add overhead.
* @param listener a listener that will be notified when the sink fails or completes
* @param remoteSink the remote sink
* @param failFast determines how failures in this remote sink are handled:
* - If {@code false}, failures from this remote sink will not cause the exchange source to abort.
* Callers must handle these failures notified via {@code listener}.
* - If {@code true}, failures from this remote sink will cause the exchange source to abort.
* Callers can safely ignore failures notified via this listener, as they are collected and
* reported by the exchange source.
* @param onPageFetched a callback that will be called when a page is fetched from the remote sink
* @param instances the number of concurrent ``clients`` that this handler should use to fetch pages.
* More clients reduce latency, but add overhead.
* @param listener a listener that will be notified when the sink fails or completes
* @see ExchangeSinkHandler#fetchPageAsync(boolean, ActionListener)
*/
public void addRemoteSink(RemoteSink remoteSink, boolean failFast, int instances, ActionListener<Void> listener) {
public void addRemoteSink(
RemoteSink remoteSink,
boolean failFast,
Runnable onPageFetched,
int instances,
ActionListener<Void> listener
) {
final int sinkId = nextSinkId.incrementAndGet();
remoteSinks.put(sinkId, remoteSink);
final ActionListener<Void> sinkListener = ActionListener.assertAtLeastOnce(
Expand All @@ -284,7 +294,7 @@ public void onFailure(Exception e) {
protected void doRun() {
try (EsqlRefCountingListener refs = new EsqlRefCountingListener(sinkListener)) {
for (int i = 0; i < instances; i++) {
var fetcher = new RemoteSinkFetcher(remoteSink, failFast, refs.acquire());
var fetcher = new RemoteSinkFetcher(remoteSink, failFast, onPageFetched, refs.acquire());
fetcher.fetchPage();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ public void testEarlyTermination() {
final int maxAllowedRows = between(1, 100);
final AtomicInteger processedRows = new AtomicInteger(0);
var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), positions, System::currentTimeMillis);
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(), Function.identity());
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}), Function.identity());
final var delayOperator = new EvalOperator(driverContext.blockFactory(), new EvalOperator.ExpressionEvaluator() {
@Override
public Block eval(Page page) {
Expand Down Expand Up @@ -335,7 +335,7 @@ public void testResumeOnEarlyFinish() throws Exception {
var sourceHandler = new ExchangeSourceHandler(between(1, 5), threadPool.executor("esql"), sourceFuture);
var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), between(1, 5), System::currentTimeMillis);
var sourceOperator = new ExchangeSourceOperator(sourceHandler.createExchangeSource());
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(), Function.identity());
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}), Function.identity());
Driver driver = new Driver(driverContext, sourceOperator, List.of(), sinkOperator, () -> {});
PlainActionFuture<Void> future = new PlainActionFuture<>();
Driver.start(threadPool.getThreadContext(), threadPool.executor("esql"), driver, between(1, 1000), future);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ List<Driver> createDriversForInput(List<Page> input, List<Page> results, boolean
sourceExchanger.addRemoteSink(
sinkExchanger::fetchPageAsync,
randomBoolean(),
() -> {},
1,
ActionListener.<Void>noop().delegateResponse((l, e) -> {
throw new AssertionError("unexpected failure", e);
Expand Down Expand Up @@ -248,7 +249,7 @@ List<Driver> createDriversForInput(List<Page> input, List<Page> results, boolean
simpleWithMode(AggregatorMode.INTERMEDIATE).get(driver1Context),
intermediateOperatorItr.next()
),
new ExchangeSinkOperator(sinkExchanger.createExchangeSink(), Function.identity()),
new ExchangeSinkOperator(sinkExchanger.createExchangeSink(() -> {}), Function.identity()),
() -> {}
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,35 +97,50 @@ public void testBasic() throws Exception {
pages[i] = new Page(blockFactory.newConstantIntBlockWith(i, 2));
}
ExchangeSinkHandler sinkExchanger = new ExchangeSinkHandler(blockFactory, 2, threadPool.relativeTimeInMillisSupplier());
ExchangeSink sink1 = sinkExchanger.createExchangeSink();
ExchangeSink sink2 = sinkExchanger.createExchangeSink();
AtomicInteger pagesAddedToSink = new AtomicInteger();
ExchangeSink sink1 = sinkExchanger.createExchangeSink(pagesAddedToSink::incrementAndGet);
ExchangeSink sink2 = sinkExchanger.createExchangeSink(pagesAddedToSink::incrementAndGet);
PlainActionFuture<Void> sourceCompletion = new PlainActionFuture<>();
ExchangeSourceHandler sourceExchanger = new ExchangeSourceHandler(3, threadPool.executor(ESQL_TEST_EXECUTOR), sourceCompletion);
ExchangeSource source = sourceExchanger.createExchangeSource();
sourceExchanger.addRemoteSink(sinkExchanger::fetchPageAsync, randomBoolean(), 1, ActionListener.noop());
AtomicInteger pagesAddedToSource = new AtomicInteger();
sourceExchanger.addRemoteSink(
sinkExchanger::fetchPageAsync,
randomBoolean(),
pagesAddedToSource::incrementAndGet,
1,
ActionListener.noop()
);
SubscribableListener<Void> waitForReading = source.waitForReading().listener();
assertFalse(waitForReading.isDone());
assertNull(source.pollPage());
assertTrue(sink1.waitForWriting().listener().isDone());
randomFrom(sink1, sink2).addPage(pages[0]);
assertThat(pagesAddedToSink.get(), equalTo(1));
randomFrom(sink1, sink2).addPage(pages[1]);
assertThat(pagesAddedToSink.get(), equalTo(2));
assertBusy(() -> assertThat(pagesAddedToSource.get(), equalTo(2)));
// source and sink buffers can store 5 pages
for (Page p : List.of(pages[2], pages[3], pages[4])) {
ExchangeSink sink = randomFrom(sink1, sink2);
assertBusy(() -> assertTrue(sink.waitForWriting().listener().isDone()));
sink.addPage(p);
}
assertThat(pagesAddedToSink.get(), equalTo(5));
assertBusy(() -> assertThat(pagesAddedToSource.get(), equalTo(3)));
// sink buffer is full
assertFalse(randomFrom(sink1, sink2).waitForWriting().listener().isDone());
assertBusy(() -> assertTrue(source.waitForReading().listener().isDone()));
assertEquals(pages[0], source.pollPage());
assertBusy(() -> assertTrue(source.waitForReading().listener().isDone()));
assertEquals(pages[1], source.pollPage());
assertBusy(() -> assertThat(pagesAddedToSource.get(), equalTo(5)));
// sink can write again
assertBusy(() -> assertTrue(randomFrom(sink1, sink2).waitForWriting().listener().isDone()));
randomFrom(sink1, sink2).addPage(pages[5]);
assertBusy(() -> assertTrue(randomFrom(sink1, sink2).waitForWriting().listener().isDone()));
randomFrom(sink1, sink2).addPage(pages[6]);
assertThat(pagesAddedToSink.get(), equalTo(7));
// sink buffer is full
assertFalse(randomFrom(sink1, sink2).waitForWriting().listener().isDone());
sink1.finish();
Expand All @@ -134,6 +149,7 @@ public void testBasic() throws Exception {
assertBusy(() -> assertTrue(source.waitForReading().listener().isDone()));
assertEquals(pages[2 + i], source.pollPage());
}
assertBusy(() -> assertThat(pagesAddedToSource.get(), equalTo(7)));
// source buffer is empty
assertFalse(source.waitForReading().listener().isDone());
assertBusy(() -> assertTrue(sink2.waitForWriting().listener().isDone()));
Expand Down Expand Up @@ -340,10 +356,16 @@ public void testConcurrentWithHandlers() {
sinkHandler = randomFrom(sinkHandlers);
} else {
sinkHandler = new ExchangeSinkHandler(blockFactory, randomExchangeBuffer(), threadPool.relativeTimeInMillisSupplier());
sourceExchanger.addRemoteSink(sinkHandler::fetchPageAsync, randomBoolean(), randomIntBetween(1, 3), ActionListener.noop());
sourceExchanger.addRemoteSink(
sinkHandler::fetchPageAsync,
randomBoolean(),
() -> {},
randomIntBetween(1, 3),
ActionListener.noop()
);
sinkHandlers.add(sinkHandler);
}
return sinkHandler.createExchangeSink();
return sinkHandler.createExchangeSink(() -> {});
};
final int maxInputSeqNo = rarely() ? -1 : randomIntBetween(0, 50_000);
final int maxOutputSeqNo = rarely() ? -1 : randomIntBetween(0, 50_000);
Expand Down Expand Up @@ -398,14 +420,14 @@ public void testExchangeSourceContinueOnFailure() {
l.onResponse(new ExchangeResponse(blockFactory, page, r.finished()));
}));
}
}, false, instance, ActionListener.wrap(r -> {
}, false, () -> {}, instance, ActionListener.wrap(r -> {
assertFalse(sinkFailed.get());
completedSinks.incrementAndGet();
}, e -> {
assertTrue(sinkFailed.get());
failedSinks.incrementAndGet();
}));
return sinkHandler.createExchangeSink();
return sinkHandler.createExchangeSink(() -> {});
};
Set<Integer> actualSeqNos = runConcurrentTest(
maxInputSeqNo,
Expand All @@ -430,7 +452,7 @@ public void testClosingSinks() {
Page p1 = new Page(block1);
Page p2 = new Page(block2);
ExchangeSinkHandler sinkExchanger = new ExchangeSinkHandler(blockFactory, 2, threadPool.relativeTimeInMillisSupplier());
ExchangeSink sink = sinkExchanger.createExchangeSink();
ExchangeSink sink = sinkExchanger.createExchangeSink(() -> {});
sink.addPage(p1);
sink.addPage(p2);
assertFalse(sink.waitForWriting().listener().isDone());
Expand Down Expand Up @@ -475,7 +497,7 @@ public void testFinishEarly() throws Exception {
throw new AssertionError(e);
}
}
}, false, between(1, 3), sinkCompleted);
}, false, () -> {}, between(1, 3), sinkCompleted);
threadPool.schedule(
() -> sourceHandler.finishEarly(randomBoolean(), ActionListener.noop()),
TimeValue.timeValueMillis(between(0, 10)),
Expand Down Expand Up @@ -526,6 +548,7 @@ public void testConcurrentWithTransportActions() {
sourceHandler.addRemoteSink(
exchange0.newRemoteSink(task, exchangeId, node0, connection),
randomBoolean(),
() -> {},
randomIntBetween(1, 5),
ActionListener.noop()
);
Expand All @@ -535,7 +558,7 @@ public void testConcurrentWithTransportActions() {
maxInputSeqNo,
maxOutputSeqNo,
sourceHandler::createExchangeSource,
sinkHandler::createExchangeSink
() -> sinkHandler.createExchangeSink(() -> {})
);
var expectedSeqNos = IntStream.range(0, Math.min(maxInputSeqNo, maxOutputSeqNo)).boxed().collect(Collectors.toSet());
assertThat(actualSeqNos, hasSize(expectedSeqNos.size()));
Expand Down Expand Up @@ -601,12 +624,18 @@ public void sendResponse(TransportResponse transportResponse) {
sourceHandler.addRemoteSink(
exchange0.newRemoteSink(task, exchangeId, node0, connection),
true,
() -> {},
randomIntBetween(1, 5),
ActionListener.noop()
);
Exception err = expectThrows(
Exception.class,
() -> runConcurrentTest(maxSeqNo, maxSeqNo, sourceHandler::createExchangeSource, sinkHandler::createExchangeSink)
() -> runConcurrentTest(
maxSeqNo,
maxSeqNo,
sourceHandler::createExchangeSource,
() -> sinkHandler.createExchangeSink(() -> {})
)
);
Throwable cause = ExceptionsHelper.unwrap(err, IOException.class);
assertNotNull(cause);
Expand Down
Loading