From f52b768fdcea32a7f9d9b7f8f656e86adfb8f8a5 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sat, 25 Jan 2025 20:43:14 -0800 Subject: [PATCH] Track in/out pages in exchange --- .../operator/exchange/ExchangeService.java | 3 +- .../exchange/ExchangeSinkHandler.java | 16 +++--- .../exchange/ExchangeSourceHandler.java | 40 +++++++++------ .../compute/operator/DriverTests.java | 4 +- .../operator/ForkingOperatorTestCase.java | 3 +- .../exchange/ExchangeServiceTests.java | 51 +++++++++++++++---- .../esql/planner/LocalExecutionPlanner.java | 25 ++++----- .../esql/plugin/ClusterComputeHandler.java | 5 +- .../xpack/esql/plugin/ComputeContext.java | 9 ++-- .../xpack/esql/plugin/ComputeService.java | 14 +++-- .../esql/plugin/DataNodeComputeHandler.java | 11 ++-- .../elasticsearch/xpack/esql/CsvTests.java | 5 +- .../optimizer/PhysicalPlanOptimizerTests.java | 5 +- 13 files changed, 125 insertions(+), 66 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java index d1a5d1757bc90..ac02273a48ee4 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java @@ -51,7 +51,8 @@ /** * {@link ExchangeService} is responsible for exchanging pages between exchange sinks and sources on the same or different nodes. * It holds a map of {@link ExchangeSinkHandler} instances for each node in the cluster to serve {@link ExchangeRequest}s - * To connect exchange sources to exchange sinks, use {@link ExchangeSourceHandler#addRemoteSink(RemoteSink, boolean, int, ActionListener)}. + * To connect exchange sources to exchange sinks, + * use {@link ExchangeSourceHandler#addRemoteSink(RemoteSink, boolean, Runnable, int, ActionListener)}. */ public final class ExchangeService extends AbstractLifecycleComponent { // TODO: Make this a child action of the data node transport to ensure that exchanges diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkHandler.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkHandler.java index 21eb2ed565618..ef137f7306e67 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkHandler.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkHandler.java @@ -22,10 +22,10 @@ /** * An {@link ExchangeSinkHandler} receives pages and status from its {@link ExchangeSink}s, which are created using - * {@link #createExchangeSink()}} method. Pages and status can then be retrieved asynchronously by {@link ExchangeSourceHandler}s + * {@link #createExchangeSink(Runnable)}} method. Pages and status can then be retrieved asynchronously by {@link ExchangeSourceHandler}s * using the {@link #fetchPageAsync(boolean, ActionListener)} method. * - * @see #createExchangeSink() + * @see #createExchangeSink(Runnable) * @see #fetchPageAsync(boolean, ActionListener) * @see ExchangeSourceHandler */ @@ -52,9 +52,11 @@ public ExchangeSinkHandler(BlockFactory blockFactory, int maxBufferSize, LongSup private class ExchangeSinkImpl implements ExchangeSink { boolean finished; + private final Runnable onPageFetched; private final SubscribableListener onFinished = new SubscribableListener<>(); - ExchangeSinkImpl() { + ExchangeSinkImpl(Runnable onPageFetched) { + this.onPageFetched = onPageFetched; onChanged(); buffer.addCompletionListener(onFinished); outstandingSinks.incrementAndGet(); @@ -62,6 +64,7 @@ private class ExchangeSinkImpl implements ExchangeSink { @Override public void addPage(Page page) { + onPageFetched.run(); buffer.addPage(page); notifyListeners(); } @@ -101,7 +104,7 @@ public IsBlockedResult waitForWriting() { * @param sourceFinished if true, then this handler can finish as sources have enough pages. * @param listener the listener that will be notified when pages are ready or this handler is finished * @see RemoteSink - * @see ExchangeSourceHandler#addRemoteSink(RemoteSink, boolean, int, ActionListener) + * @see ExchangeSourceHandler#addRemoteSink(RemoteSink, boolean, Runnable, int, ActionListener) */ public void fetchPageAsync(boolean sourceFinished, ActionListener listener) { if (sourceFinished) { @@ -161,10 +164,11 @@ private void notifyListeners() { /** * Create a new exchange sink for exchanging data * + * @param onPageFetched a {@link Runnable} that will be called when a page is fetched. * @see ExchangeSinkOperator */ - public ExchangeSink createExchangeSink() { - return new ExchangeSinkImpl(); + public ExchangeSink createExchangeSink(Runnable onPageFetched) { + return new ExchangeSinkImpl(onPageFetched); } /** diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java index aa722695b841e..db9a62da5d9ea 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java @@ -27,10 +27,10 @@ /** * An {@link ExchangeSourceHandler} asynchronously fetches pages and status from multiple {@link RemoteSink}s * and feeds them to its {@link ExchangeSource}, which are created using the {@link #createExchangeSource()}) method. - * {@link RemoteSink}s are added using the {@link #addRemoteSink(RemoteSink, boolean, int, ActionListener)}) method. + * {@link RemoteSink}s are added using the {@link #addRemoteSink(RemoteSink, boolean, Runnable, int, ActionListener)}) method. * * @see #createExchangeSource() - * @see #addRemoteSink(RemoteSink, boolean, int, ActionListener) + * @see #addRemoteSink(RemoteSink, boolean, Runnable, int, ActionListener) */ public final class ExchangeSourceHandler { private final ExchangeBuffer buffer; @@ -185,11 +185,13 @@ private final class RemoteSinkFetcher { private volatile boolean finished = false; private final RemoteSink remoteSink; private final boolean failFast; + private final Runnable onPageFetched; private final ActionListener completionListener; - RemoteSinkFetcher(RemoteSink remoteSink, boolean failFast, ActionListener completionListener) { + RemoteSinkFetcher(RemoteSink remoteSink, boolean failFast, Runnable onPageFetched, ActionListener completionListener) { outstandingSinks.trackNewInstance(); this.remoteSink = remoteSink; + this.onPageFetched = onPageFetched; this.failFast = failFast; this.completionListener = completionListener; } @@ -203,6 +205,7 @@ void fetchPage() { remoteSink.fetchPageAsync(toFinishSinks, ActionListener.wrap(resp -> { Page page = resp.takePage(); if (page != null) { + onPageFetched.run(); buffer.addPage(page); } if (resp.finished()) { @@ -252,19 +255,26 @@ void onSinkComplete() { /** * Add a remote sink as a new data source of this handler. The handler will start fetching data from this remote sink intermediately. * - * @param remoteSink the remote sink - * @param failFast determines how failures in this remote sink are handled: - * - If {@code false}, failures from this remote sink will not cause the exchange source to abort. - * Callers must handle these failures notified via {@code listener}. - * - If {@code true}, failures from this remote sink will cause the exchange source to abort. - * Callers can safely ignore failures notified via this listener, as they are collected and - * reported by the exchange source. - * @param instances the number of concurrent ``clients`` that this handler should use to fetch pages. - * More clients reduce latency, but add overhead. - * @param listener a listener that will be notified when the sink fails or completes + * @param remoteSink the remote sink + * @param failFast determines how failures in this remote sink are handled: + * - If {@code false}, failures from this remote sink will not cause the exchange source to abort. + * Callers must handle these failures notified via {@code listener}. + * - If {@code true}, failures from this remote sink will cause the exchange source to abort. + * Callers can safely ignore failures notified via this listener, as they are collected and + * reported by the exchange source. + * @param onPageFetched a callback that will be called when a page is fetched from the remote sink + * @param instances the number of concurrent ``clients`` that this handler should use to fetch pages. + * More clients reduce latency, but add overhead. + * @param listener a listener that will be notified when the sink fails or completes * @see ExchangeSinkHandler#fetchPageAsync(boolean, ActionListener) */ - public void addRemoteSink(RemoteSink remoteSink, boolean failFast, int instances, ActionListener listener) { + public void addRemoteSink( + RemoteSink remoteSink, + boolean failFast, + Runnable onPageFetched, + int instances, + ActionListener listener + ) { final int sinkId = nextSinkId.incrementAndGet(); remoteSinks.put(sinkId, remoteSink); final ActionListener sinkListener = ActionListener.assertAtLeastOnce( @@ -284,7 +294,7 @@ public void onFailure(Exception e) { protected void doRun() { try (EsqlRefCountingListener refs = new EsqlRefCountingListener(sinkListener)) { for (int i = 0; i < instances; i++) { - var fetcher = new RemoteSinkFetcher(remoteSink, failFast, refs.acquire()); + var fetcher = new RemoteSinkFetcher(remoteSink, failFast, onPageFetched, refs.acquire()); fetcher.fetchPage(); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java index cc983e6b83fbe..e715b94bc55e5 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java @@ -297,7 +297,7 @@ public void testEarlyTermination() { final int maxAllowedRows = between(1, 100); final AtomicInteger processedRows = new AtomicInteger(0); var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), positions, System::currentTimeMillis); - var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(), Function.identity()); + var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}), Function.identity()); final var delayOperator = new EvalOperator(driverContext.blockFactory(), new EvalOperator.ExpressionEvaluator() { @Override public Block eval(Page page) { @@ -335,7 +335,7 @@ public void testResumeOnEarlyFinish() throws Exception { var sourceHandler = new ExchangeSourceHandler(between(1, 5), threadPool.executor("esql"), sourceFuture); var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), between(1, 5), System::currentTimeMillis); var sourceOperator = new ExchangeSourceOperator(sourceHandler.createExchangeSource()); - var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(), Function.identity()); + var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}), Function.identity()); Driver driver = new Driver(driverContext, sourceOperator, List.of(), sinkOperator, () -> {}); PlainActionFuture future = new PlainActionFuture<>(); Driver.start(threadPool.getThreadContext(), threadPool.executor("esql"), driver, between(1, 1000), future); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java index 94a5299dd8216..744121a3807c3 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java @@ -220,6 +220,7 @@ List createDriversForInput(List input, List results, boolean sourceExchanger.addRemoteSink( sinkExchanger::fetchPageAsync, randomBoolean(), + () -> {}, 1, ActionListener.noop().delegateResponse((l, e) -> { throw new AssertionError("unexpected failure", e); @@ -248,7 +249,7 @@ List createDriversForInput(List input, List results, boolean simpleWithMode(AggregatorMode.INTERMEDIATE).get(driver1Context), intermediateOperatorItr.next() ), - new ExchangeSinkOperator(sinkExchanger.createExchangeSink(), Function.identity()), + new ExchangeSinkOperator(sinkExchanger.createExchangeSink(() -> {}), Function.identity()), () -> {} ) ); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java index 363ad9c49ddfe..fffeeac4e4cc2 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java @@ -97,35 +97,50 @@ public void testBasic() throws Exception { pages[i] = new Page(blockFactory.newConstantIntBlockWith(i, 2)); } ExchangeSinkHandler sinkExchanger = new ExchangeSinkHandler(blockFactory, 2, threadPool.relativeTimeInMillisSupplier()); - ExchangeSink sink1 = sinkExchanger.createExchangeSink(); - ExchangeSink sink2 = sinkExchanger.createExchangeSink(); + AtomicInteger pagesAddedToSink = new AtomicInteger(); + ExchangeSink sink1 = sinkExchanger.createExchangeSink(pagesAddedToSink::incrementAndGet); + ExchangeSink sink2 = sinkExchanger.createExchangeSink(pagesAddedToSink::incrementAndGet); PlainActionFuture sourceCompletion = new PlainActionFuture<>(); ExchangeSourceHandler sourceExchanger = new ExchangeSourceHandler(3, threadPool.executor(ESQL_TEST_EXECUTOR), sourceCompletion); ExchangeSource source = sourceExchanger.createExchangeSource(); - sourceExchanger.addRemoteSink(sinkExchanger::fetchPageAsync, randomBoolean(), 1, ActionListener.noop()); + AtomicInteger pagesAddedToSource = new AtomicInteger(); + sourceExchanger.addRemoteSink( + sinkExchanger::fetchPageAsync, + randomBoolean(), + pagesAddedToSource::incrementAndGet, + 1, + ActionListener.noop() + ); SubscribableListener waitForReading = source.waitForReading().listener(); assertFalse(waitForReading.isDone()); assertNull(source.pollPage()); assertTrue(sink1.waitForWriting().listener().isDone()); randomFrom(sink1, sink2).addPage(pages[0]); + assertThat(pagesAddedToSink.get(), equalTo(1)); randomFrom(sink1, sink2).addPage(pages[1]); + assertThat(pagesAddedToSink.get(), equalTo(2)); + assertBusy(() -> assertThat(pagesAddedToSource.get(), equalTo(2))); // source and sink buffers can store 5 pages for (Page p : List.of(pages[2], pages[3], pages[4])) { ExchangeSink sink = randomFrom(sink1, sink2); assertBusy(() -> assertTrue(sink.waitForWriting().listener().isDone())); sink.addPage(p); } + assertThat(pagesAddedToSink.get(), equalTo(5)); + assertBusy(() -> assertThat(pagesAddedToSource.get(), equalTo(3))); // sink buffer is full assertFalse(randomFrom(sink1, sink2).waitForWriting().listener().isDone()); assertBusy(() -> assertTrue(source.waitForReading().listener().isDone())); assertEquals(pages[0], source.pollPage()); assertBusy(() -> assertTrue(source.waitForReading().listener().isDone())); assertEquals(pages[1], source.pollPage()); + assertBusy(() -> assertThat(pagesAddedToSource.get(), equalTo(5))); // sink can write again assertBusy(() -> assertTrue(randomFrom(sink1, sink2).waitForWriting().listener().isDone())); randomFrom(sink1, sink2).addPage(pages[5]); assertBusy(() -> assertTrue(randomFrom(sink1, sink2).waitForWriting().listener().isDone())); randomFrom(sink1, sink2).addPage(pages[6]); + assertThat(pagesAddedToSink.get(), equalTo(7)); // sink buffer is full assertFalse(randomFrom(sink1, sink2).waitForWriting().listener().isDone()); sink1.finish(); @@ -134,6 +149,7 @@ public void testBasic() throws Exception { assertBusy(() -> assertTrue(source.waitForReading().listener().isDone())); assertEquals(pages[2 + i], source.pollPage()); } + assertBusy(() -> assertThat(pagesAddedToSource.get(), equalTo(7))); // source buffer is empty assertFalse(source.waitForReading().listener().isDone()); assertBusy(() -> assertTrue(sink2.waitForWriting().listener().isDone())); @@ -340,10 +356,16 @@ public void testConcurrentWithHandlers() { sinkHandler = randomFrom(sinkHandlers); } else { sinkHandler = new ExchangeSinkHandler(blockFactory, randomExchangeBuffer(), threadPool.relativeTimeInMillisSupplier()); - sourceExchanger.addRemoteSink(sinkHandler::fetchPageAsync, randomBoolean(), randomIntBetween(1, 3), ActionListener.noop()); + sourceExchanger.addRemoteSink( + sinkHandler::fetchPageAsync, + randomBoolean(), + () -> {}, + randomIntBetween(1, 3), + ActionListener.noop() + ); sinkHandlers.add(sinkHandler); } - return sinkHandler.createExchangeSink(); + return sinkHandler.createExchangeSink(() -> {}); }; final int maxInputSeqNo = rarely() ? -1 : randomIntBetween(0, 50_000); final int maxOutputSeqNo = rarely() ? -1 : randomIntBetween(0, 50_000); @@ -398,14 +420,14 @@ public void testExchangeSourceContinueOnFailure() { l.onResponse(new ExchangeResponse(blockFactory, page, r.finished())); })); } - }, false, instance, ActionListener.wrap(r -> { + }, false, () -> {}, instance, ActionListener.wrap(r -> { assertFalse(sinkFailed.get()); completedSinks.incrementAndGet(); }, e -> { assertTrue(sinkFailed.get()); failedSinks.incrementAndGet(); })); - return sinkHandler.createExchangeSink(); + return sinkHandler.createExchangeSink(() -> {}); }; Set actualSeqNos = runConcurrentTest( maxInputSeqNo, @@ -430,7 +452,7 @@ public void testClosingSinks() { Page p1 = new Page(block1); Page p2 = new Page(block2); ExchangeSinkHandler sinkExchanger = new ExchangeSinkHandler(blockFactory, 2, threadPool.relativeTimeInMillisSupplier()); - ExchangeSink sink = sinkExchanger.createExchangeSink(); + ExchangeSink sink = sinkExchanger.createExchangeSink(() -> {}); sink.addPage(p1); sink.addPage(p2); assertFalse(sink.waitForWriting().listener().isDone()); @@ -475,7 +497,7 @@ public void testFinishEarly() throws Exception { throw new AssertionError(e); } } - }, false, between(1, 3), sinkCompleted); + }, false, () -> {}, between(1, 3), sinkCompleted); threadPool.schedule( () -> sourceHandler.finishEarly(randomBoolean(), ActionListener.noop()), TimeValue.timeValueMillis(between(0, 10)), @@ -526,6 +548,7 @@ public void testConcurrentWithTransportActions() { sourceHandler.addRemoteSink( exchange0.newRemoteSink(task, exchangeId, node0, connection), randomBoolean(), + () -> {}, randomIntBetween(1, 5), ActionListener.noop() ); @@ -535,7 +558,7 @@ public void testConcurrentWithTransportActions() { maxInputSeqNo, maxOutputSeqNo, sourceHandler::createExchangeSource, - sinkHandler::createExchangeSink + () -> sinkHandler.createExchangeSink(() -> {}) ); var expectedSeqNos = IntStream.range(0, Math.min(maxInputSeqNo, maxOutputSeqNo)).boxed().collect(Collectors.toSet()); assertThat(actualSeqNos, hasSize(expectedSeqNos.size())); @@ -601,12 +624,18 @@ public void sendResponse(TransportResponse transportResponse) { sourceHandler.addRemoteSink( exchange0.newRemoteSink(task, exchangeId, node0, connection), true, + () -> {}, randomIntBetween(1, 5), ActionListener.noop() ); Exception err = expectThrows( Exception.class, - () -> runConcurrentTest(maxSeqNo, maxSeqNo, sourceHandler::createExchangeSource, sinkHandler::createExchangeSink) + () -> runConcurrentTest( + maxSeqNo, + maxSeqNo, + sourceHandler::createExchangeSource, + () -> sinkHandler.createExchangeSink(() -> {}) + ) ); Throwable cause = ExceptionsHelper.unwrap(err, IOException.class); assertNotNull(cause); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index c17ff0475b945..5975af29f5d04 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -36,9 +36,9 @@ import org.elasticsearch.compute.operator.SourceOperator; import org.elasticsearch.compute.operator.SourceOperator.SourceOperatorFactory; import org.elasticsearch.compute.operator.StringExtractOperator; -import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler; +import org.elasticsearch.compute.operator.exchange.ExchangeSink; import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator.ExchangeSinkOperatorFactory; -import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler; +import org.elasticsearch.compute.operator.exchange.ExchangeSource; import org.elasticsearch.compute.operator.exchange.ExchangeSourceOperator.ExchangeSourceOperatorFactory; import org.elasticsearch.compute.operator.topn.TopNEncoder; import org.elasticsearch.compute.operator.topn.TopNOperator; @@ -103,6 +103,7 @@ import java.util.Objects; import java.util.Optional; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -126,8 +127,8 @@ public class LocalExecutionPlanner { private final BlockFactory blockFactory; private final Settings settings; private final Configuration configuration; - private final ExchangeSourceHandler exchangeSourceHandler; - private final ExchangeSinkHandler exchangeSinkHandler; + private final Supplier exchangeSourceSupplier; + private final Supplier exchangeSinkSupplier; private final EnrichLookupService enrichLookupService; private final LookupFromIndexService lookupFromIndexService; private final PhysicalOperationProviders physicalOperationProviders; @@ -140,8 +141,8 @@ public LocalExecutionPlanner( BlockFactory blockFactory, Settings settings, Configuration configuration, - ExchangeSourceHandler exchangeSourceHandler, - ExchangeSinkHandler exchangeSinkHandler, + Supplier exchangeSourceSupplier, + Supplier exchangeSinkSupplier, EnrichLookupService enrichLookupService, LookupFromIndexService lookupFromIndexService, PhysicalOperationProviders physicalOperationProviders @@ -152,8 +153,8 @@ public LocalExecutionPlanner( this.bigArrays = bigArrays; this.blockFactory = blockFactory; this.settings = settings; - this.exchangeSourceHandler = exchangeSourceHandler; - this.exchangeSinkHandler = exchangeSinkHandler; + this.exchangeSourceSupplier = exchangeSourceSupplier; + this.exchangeSinkSupplier = exchangeSinkSupplier; this.enrichLookupService = enrichLookupService; this.lookupFromIndexService = lookupFromIndexService; this.physicalOperationProviders = physicalOperationProviders; @@ -323,7 +324,7 @@ private PhysicalOperation planExchange(ExchangeExec exchangeExec, LocalExecution } private PhysicalOperation planExchangeSink(ExchangeSinkExec exchangeSink, LocalExecutionPlannerContext context) { - Objects.requireNonNull(exchangeSinkHandler, "ExchangeSinkHandler wasn't provided"); + Objects.requireNonNull(exchangeSinkSupplier, "ExchangeSinkHandler wasn't provided"); var child = exchangeSink.child(); PhysicalOperation source = plan(child, context); @@ -332,11 +333,11 @@ private PhysicalOperation planExchangeSink(ExchangeSinkExec exchangeSink, LocalE ? Function.identity() : alignPageToAttributes(exchangeSink.output(), source.layout); - return source.withSink(new ExchangeSinkOperatorFactory(exchangeSinkHandler::createExchangeSink, transformer), source.layout); + return source.withSink(new ExchangeSinkOperatorFactory(exchangeSinkSupplier, transformer), source.layout); } private PhysicalOperation planExchangeSource(ExchangeSourceExec exchangeSource, LocalExecutionPlannerContext context) { - Objects.requireNonNull(exchangeSourceHandler, "ExchangeSourceHandler wasn't provided"); + Objects.requireNonNull(exchangeSourceSupplier, "ExchangeSourceHandler wasn't provided"); var builder = new Layout.Builder(); builder.append(exchangeSource.output()); @@ -344,7 +345,7 @@ private PhysicalOperation planExchangeSource(ExchangeSourceExec exchangeSource, var l = builder.build(); var layout = exchangeSource.isIntermediateAgg() ? new ExchangeLayout(l) : l; - return PhysicalOperation.fromSource(new ExchangeSourceOperatorFactory(exchangeSourceHandler::createExchangeSource), layout); + return PhysicalOperation.fromSource(new ExchangeSourceOperatorFactory(exchangeSourceSupplier), layout); } private PhysicalOperation planTopN(TopNExec topNExec, LocalExecutionPlannerContext context) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java index 20211323b3afb..19ed77405daa2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java @@ -92,6 +92,7 @@ void startComputeOnRemoteCluster( exchangeSource.addRemoteSink( remoteSink, true, + () -> {}, queryPragmas.concurrentExchangeClients(), computeListener.acquireAvoid() ); @@ -209,8 +210,8 @@ void runComputeOnRemoteCluster( List.of(), configuration, configuration.newFoldContext(), - exchangeSource, - exchangeSink + exchangeSource::createExchangeSource, + () -> exchangeSink.createExchangeSink(() -> {}) ), coordinatorPlan, computeListener.acquireCompute() diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeContext.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeContext.java index 4e178bb740757..82943d23581fd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeContext.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeContext.java @@ -7,14 +7,15 @@ package org.elasticsearch.xpack.esql.plugin; -import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler; -import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler; +import org.elasticsearch.compute.operator.exchange.ExchangeSink; +import org.elasticsearch.compute.operator.exchange.ExchangeSource; import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.xpack.esql.core.expression.FoldContext; import org.elasticsearch.xpack.esql.session.Configuration; import java.util.List; +import java.util.function.Supplier; record ComputeContext( String sessionId, @@ -22,8 +23,8 @@ record ComputeContext( List searchContexts, Configuration configuration, FoldContext foldCtx, - ExchangeSourceHandler exchangeSource, - ExchangeSinkHandler exchangeSink + Supplier exchangeSourceSupplier, + Supplier exchangeSinkSupplier ) { List searchExecutionContexts() { return searchContexts.stream().map(SearchContext::getSearchExecutionContext).toList(); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 75619958c5228..de6fc082eb243 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -224,7 +224,15 @@ public void execute( ) { runCompute( rootTask, - new ComputeContext(sessionId, LOCAL_CLUSTER, List.of(), configuration, foldContext, exchangeSource, null), + new ComputeContext( + sessionId, + LOCAL_CLUSTER, + List.of(), + configuration, + foldContext, + exchangeSource::createExchangeSource, + null + ), coordinatorPlan, localListener.acquireCompute() ); @@ -372,8 +380,8 @@ public SourceProvider createSourceProvider() { blockFactory, clusterService.getSettings(), context.configuration(), - context.exchangeSource(), - context.exchangeSink(), + context.exchangeSourceSupplier(), + context.exchangeSinkSupplier(), enrichLookupService, lookupFromIndexService, new EsPhysicalOperationProviders(context.foldCtx(), contexts, searchService.getIndicesService().getAnalysis()) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java index 5b4f3e8cbffb1..7020932819421 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java @@ -130,6 +130,7 @@ void startComputeOnDataNodes( exchangeSource.addRemoteSink( remoteSink, true, + () -> {}, queryPragmas.concurrentExchangeClients(), computeListener.acquireAvoid() ); @@ -330,7 +331,7 @@ private class DataNodeRequestExecutor { this.exchangeSink = exchangeSink; this.computeListener = computeListener; this.maxConcurrentShards = maxConcurrentShards; - this.blockingSink = exchangeSink.createExchangeSink(); + this.blockingSink = exchangeSink.createExchangeSink(() -> {}); } void start() { @@ -376,7 +377,7 @@ public void onFailure(Exception e) { configuration, configuration.newFoldContext(), null, - exchangeSink + () -> exchangeSink.createExchangeSink(() -> {}) ); computeService.runCompute(parentTask, computeContext, request.plan(), batchListener); }, batchListener::onFailure)); @@ -428,7 +429,7 @@ private void runComputeOnDataNode( () -> exchangeService.finishSinkHandler(externalId, new TaskCancelledException(task.getReasonCancelled())) ); var exchangeSource = new ExchangeSourceHandler(1, esqlExecutor, computeListener.acquireAvoid()); - exchangeSource.addRemoteSink(internalSink::fetchPageAsync, true, 1, ActionListener.noop()); + exchangeSource.addRemoteSink(internalSink::fetchPageAsync, true, () -> {}, 1, ActionListener.noop()); var reductionListener = computeListener.acquireCompute(); computeService.runCompute( task, @@ -438,8 +439,8 @@ private void runComputeOnDataNode( List.of(), request.configuration(), new FoldContext(request.pragmas().foldLimit().getBytes()), - exchangeSource, - externalSink + exchangeSource::createExchangeSource, + () -> externalSink.createExchangeSink(() -> {}) ), reducePlan, ActionListener.wrap(resp -> { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index ae9c12fd7c711..02e683542df7c 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -626,8 +626,8 @@ void executeSubPlan( blockFactory, randomNodeSettings(), configuration, - exchangeSource, - exchangeSink, + exchangeSource::createExchangeSource, + () -> exchangeSink.createExchangeSink(() -> {}), Mockito.mock(EnrichLookupService.class), Mockito.mock(LookupFromIndexService.class), physicalOperationProviders @@ -653,6 +653,7 @@ void executeSubPlan( exchangeSource.addRemoteSink( exchangeSink::fetchPageAsync, Randomness.get().nextBoolean(), + () -> {}, randomIntBetween(1, 3), ActionListener.noop().delegateResponse((l, e) -> { throw new AssertionError("expected no failure", e); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java index a51ad384d9488..23e0937380f34 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java @@ -7587,6 +7587,7 @@ private LocalExecutionPlanner.LocalExecutionPlan physicalOperationsFromPhysicalP var plans = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(EstimatesRowSize.estimateRowSize(0, plan), config); plan = useDataNodePlan ? plans.v2() : plans.v1(); plan = PlannerUtils.localPlan(List.of(), config, FoldContext.small(), plan); + ExchangeSinkHandler exchangeSinkHandler = new ExchangeSinkHandler(null, 10, () -> 10); LocalExecutionPlanner planner = new LocalExecutionPlanner( "test", "", @@ -7595,8 +7596,8 @@ private LocalExecutionPlanner.LocalExecutionPlan physicalOperationsFromPhysicalP TestBlockFactory.getNonBreakingInstance(), Settings.EMPTY, config, - new ExchangeSourceHandler(10, null, null), - new ExchangeSinkHandler(null, 10, () -> 10), + new ExchangeSourceHandler(10, null, null)::createExchangeSource, + () -> exchangeSinkHandler.createExchangeSink(() -> {}), null, null, new EsPhysicalOperationProviders(FoldContext.small(), List.of(), null)