diff --git a/docs/changelog/119580.yaml b/docs/changelog/119580.yaml new file mode 100644 index 0000000000000..ba437d2691c48 --- /dev/null +++ b/docs/changelog/119580.yaml @@ -0,0 +1,5 @@ +pr: 119580 +summary: Do not serialize `EsIndex` in plan +area: ES|QL +type: enhancement +issues: [] diff --git a/libs/entitlement/qa/common/src/main/java/org/elasticsearch/entitlement/qa/common/NetworkAccessCheckActions.java b/libs/entitlement/qa/common/src/main/java/org/elasticsearch/entitlement/qa/common/NetworkAccessCheckActions.java index f27dd9a0d36cb..49cf586ea1285 100644 --- a/libs/entitlement/qa/common/src/main/java/org/elasticsearch/entitlement/qa/common/NetworkAccessCheckActions.java +++ b/libs/entitlement/qa/common/src/main/java/org/elasticsearch/entitlement/qa/common/NetworkAccessCheckActions.java @@ -20,9 +20,6 @@ import java.net.SocketException; import java.net.URI; import java.net.URISyntaxException; -import java.net.http.HttpClient; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; @@ -84,37 +81,6 @@ static void urlOpenConnectionWithProxy() throws URISyntaxException, IOException assert urlConnection != null; } - static void httpClientSend() throws InterruptedException { - try (HttpClient httpClient = HttpClient.newBuilder().build()) { - // Shutdown the client, so the send action will shortcut before actually executing any network operation - // (but after it run our check in the prologue) - httpClient.shutdown(); - try { - httpClient.send(HttpRequest.newBuilder(URI.create("http://localhost")).build(), HttpResponse.BodyHandlers.discarding()); - } catch (IOException e) { - // Expected, since we shut down the client. - // "send" will be called and exercise the Entitlement check, we don't care if it fails afterward for this known reason. - } - } - } - - static void httpClientSendAsync() { - try (HttpClient httpClient = HttpClient.newBuilder().build()) { - // Shutdown the client, so the send action will return before actually executing any network operation - // (but after it run our check in the prologue) - httpClient.shutdown(); - var future = httpClient.sendAsync( - HttpRequest.newBuilder(URI.create("http://localhost")).build(), - HttpResponse.BodyHandlers.discarding() - ); - assert future.isCompletedExceptionally(); - future.exceptionally(ex -> { - assert ex instanceof IOException; - return null; - }); - } - } - static void createLDAPCertStore() throws NoSuchAlgorithmException { try { // We pass down null params to provoke a InvalidAlgorithmParameterException diff --git a/libs/entitlement/qa/common/src/main/java/org/elasticsearch/entitlement/qa/common/RestEntitlementsCheckAction.java b/libs/entitlement/qa/common/src/main/java/org/elasticsearch/entitlement/qa/common/RestEntitlementsCheckAction.java index 7c8e23343683a..d60c4b5692211 100644 --- a/libs/entitlement/qa/common/src/main/java/org/elasticsearch/entitlement/qa/common/RestEntitlementsCheckAction.java +++ b/libs/entitlement/qa/common/src/main/java/org/elasticsearch/entitlement/qa/common/RestEntitlementsCheckAction.java @@ -160,8 +160,8 @@ static CheckAction alwaysDenied(CheckedRunnable action) { entry("server_socket_accept", forPlugins(NetworkAccessCheckActions::serverSocketAccept)), entry("url_open_connection_proxy", forPlugins(NetworkAccessCheckActions::urlOpenConnectionWithProxy)), - entry("http_client_send", forPlugins(NetworkAccessCheckActions::httpClientSend)), - entry("http_client_send_async", forPlugins(NetworkAccessCheckActions::httpClientSendAsync)), + entry("http_client_send", forPlugins(VersionSpecificNetworkChecks::httpClientSend)), + entry("http_client_send_async", forPlugins(VersionSpecificNetworkChecks::httpClientSendAsync)), entry("create_ldap_cert_store", forPlugins(NetworkAccessCheckActions::createLDAPCertStore)), entry("server_socket_channel_bind", forPlugins(NetworkAccessCheckActions::serverSocketChannelBind)), diff --git a/libs/entitlement/qa/common/src/main/java/org/elasticsearch/entitlement/qa/common/VersionSpecificNetworkChecks.java b/libs/entitlement/qa/common/src/main/java/org/elasticsearch/entitlement/qa/common/VersionSpecificNetworkChecks.java index e1e0b9e52f510..df7777b6614aa 100644 --- a/libs/entitlement/qa/common/src/main/java/org/elasticsearch/entitlement/qa/common/VersionSpecificNetworkChecks.java +++ b/libs/entitlement/qa/common/src/main/java/org/elasticsearch/entitlement/qa/common/VersionSpecificNetworkChecks.java @@ -9,6 +9,26 @@ package org.elasticsearch.entitlement.qa.common; +import java.io.IOException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; + class VersionSpecificNetworkChecks { static void createInetAddressResolverProvider() {} + + static void httpClientSend() throws InterruptedException { + HttpClient httpClient = HttpClient.newBuilder().build(); + try { + httpClient.send(HttpRequest.newBuilder(URI.create("http://localhost")).build(), HttpResponse.BodyHandlers.discarding()); + } catch (IOException e) { + // Expected, the send action may fail with these parameters (but after it run the entitlement check in the prologue) + } + } + + static void httpClientSendAsync() { + HttpClient httpClient = HttpClient.newBuilder().build(); + httpClient.sendAsync(HttpRequest.newBuilder(URI.create("http://localhost")).build(), HttpResponse.BodyHandlers.discarding()); + } } diff --git a/libs/entitlement/qa/common/src/main18/java/org/elasticsearch/entitlement/qa/common/VersionSpecificNetworkChecks.java b/libs/entitlement/qa/common/src/main18/java/org/elasticsearch/entitlement/qa/common/VersionSpecificNetworkChecks.java index 0ead32ec480ee..6229b7f8e6cfc 100644 --- a/libs/entitlement/qa/common/src/main18/java/org/elasticsearch/entitlement/qa/common/VersionSpecificNetworkChecks.java +++ b/libs/entitlement/qa/common/src/main18/java/org/elasticsearch/entitlement/qa/common/VersionSpecificNetworkChecks.java @@ -9,6 +9,11 @@ package org.elasticsearch.entitlement.qa.common; +import java.io.IOException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; import java.net.spi.InetAddressResolver; import java.net.spi.InetAddressResolverProvider; @@ -26,4 +31,18 @@ public String name() { } }; } + + static void httpClientSend() throws InterruptedException { + HttpClient httpClient = HttpClient.newBuilder().build(); + try { + httpClient.send(HttpRequest.newBuilder(URI.create("http://localhost")).build(), HttpResponse.BodyHandlers.discarding()); + } catch (IOException e) { + // Expected, the send action may fail with these parameters (but after it run the entitlement check in the prologue) + } + } + + static void httpClientSendAsync() { + HttpClient httpClient = HttpClient.newBuilder().build(); + httpClient.sendAsync(HttpRequest.newBuilder(URI.create("http://localhost")).build(), HttpResponse.BodyHandlers.discarding()); + } } diff --git a/libs/entitlement/qa/common/src/main21/java/org/elasticsearch/entitlement/qa/common/VersionSpecificNetworkChecks.java b/libs/entitlement/qa/common/src/main21/java/org/elasticsearch/entitlement/qa/common/VersionSpecificNetworkChecks.java new file mode 100644 index 0000000000000..8dcee7e7603de --- /dev/null +++ b/libs/entitlement/qa/common/src/main21/java/org/elasticsearch/entitlement/qa/common/VersionSpecificNetworkChecks.java @@ -0,0 +1,64 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.entitlement.qa.common; + +import java.io.IOException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.net.spi.InetAddressResolver; +import java.net.spi.InetAddressResolverProvider; + +class VersionSpecificNetworkChecks { + static void createInetAddressResolverProvider() { + var x = new InetAddressResolverProvider() { + @Override + public InetAddressResolver get(Configuration configuration) { + return null; + } + + @Override + public String name() { + return "TEST"; + } + }; + } + + static void httpClientSend() throws InterruptedException { + try (HttpClient httpClient = HttpClient.newBuilder().build()) { + // Shutdown the client, so the send action will shortcut before actually executing any network operation + // (but after it run our check in the prologue) + httpClient.shutdown(); + try { + httpClient.send(HttpRequest.newBuilder(URI.create("http://localhost")).build(), HttpResponse.BodyHandlers.discarding()); + } catch (IOException e) { + // Expected, since we shut down the client + } + } + } + + static void httpClientSendAsync() { + try (HttpClient httpClient = HttpClient.newBuilder().build()) { + // Shutdown the client, so the send action will return before actually executing any network operation + // (but after it run our check in the prologue) + httpClient.shutdown(); + var future = httpClient.sendAsync( + HttpRequest.newBuilder(URI.create("http://localhost")).build(), + HttpResponse.BodyHandlers.discarding() + ); + assert future.isCompletedExceptionally(); + future.exceptionally(ex -> { + assert ex instanceof IOException; + return null; + }); + } + } +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java index 0902e707b706e..88b4518c8de89 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java @@ -75,13 +75,15 @@ public void addTracingHandler(ChunkHandler chunkHandler) { @Override public void next() { - assert closing == false : "cannot request next chunk on closing stream"; assert handler != null : "handler must be set before requesting next chunk"; requestContext = threadContext.newStoredContext(); channel.eventLoop().submit(() -> { activityTracker.startActivity(); requested = true; try { + if (closing) { + return; + } if (buf == null) { channel.read(); } else { diff --git a/muted-tests.yml b/muted-tests.yml index 7c7af1994442b..6b17ea3bb7bcc 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -178,9 +178,6 @@ tests: issue: https://github.com/elastic/elasticsearch/issues/118374 - class: org.elasticsearch.xpack.esql.action.EsqlActionBreakerIT issue: https://github.com/elastic/elasticsearch/issues/118238 -- class: org.elasticsearch.reservedstate.service.FileSettingsServiceTests - method: testInvalidJSON - issue: https://github.com/elastic/elasticsearch/issues/116521 - class: org.elasticsearch.xpack.ccr.rest.ShardChangesRestIT method: testShardChangesNoOperation issue: https://github.com/elastic/elasticsearch/issues/118800 @@ -244,11 +241,19 @@ tests: - class: org.elasticsearch.search.ccs.CrossClusterIT method: testCancel issue: https://github.com/elastic/elasticsearch/issues/108061 -- class: org.elasticsearch.http.netty4.Netty4IncrementalRequestHandlingIT - method: testOversizedChunkedEncoding - issue: https://github.com/elastic/elasticsearch/issues/120444 - class: org.elasticsearch.xpack.logsdb.seqno.RetentionLeaseRestIT issue: https://github.com/elastic/elasticsearch/issues/120434 +- class: org.elasticsearch.entitlement.qa.EntitlementsAllowedIT + method: testCheckActionWithPolicyPass {pathPrefix=allowed actionName=create_ldap_cert_store} + issue: https://github.com/elastic/elasticsearch/issues/120422 +- class: org.elasticsearch.entitlement.qa.EntitlementsAllowedIT + method: testCheckActionWithPolicyPass {pathPrefix=allowed_nonmodular actionName=create_ldap_cert_store} + issue: https://github.com/elastic/elasticsearch/issues/120423 +- class: org.elasticsearch.reservedstate.service.FileSettingsServiceTests + method: testInvalidJSON + issue: https://github.com/elastic/elasticsearch/issues/120482 +- class: org.elasticsearch.smoketest.DocsClientYamlTestSuiteIT + issue: https://github.com/elastic/elasticsearch/issues/120497 # Examples: # diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 0b5a8805be8c7..148c37b5bd177 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -158,7 +158,8 @@ static TransportVersion def(int id) { public static final TransportVersion ESQL_PROFILE_ROWS_PROCESSED = def(8_824_00_0); public static final TransportVersion BYTE_SIZE_VALUE_ALWAYS_USES_BYTES_1 = def(8_825_00_0); public static final TransportVersion REVERT_BYTE_SIZE_VALUE_ALWAYS_USES_BYTES_1 = def(8_826_00_0); - public static final TransportVersion ADD_INDEX_BLOCK_TWO_PHASE = def(8_827_00_0); + public static final TransportVersion ESQL_SKIP_ES_INDEX_SERIALIZATION = def(8_827_00_0); + public static final TransportVersion ADD_INDEX_BLOCK_TWO_PHASE = def(8_828_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 73e6a0306247d..aeea0a5d65c8a 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -10,7 +10,6 @@ package org.elasticsearch.action.search; import org.apache.logging.log4j.Logger; -import org.apache.lucene.util.CollectionUtil; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; @@ -26,6 +25,7 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; @@ -43,8 +43,7 @@ import org.elasticsearch.transport.Transport; import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -98,7 +97,6 @@ abstract class AbstractSearchAsyncAction exten protected final GroupShardsIterator toSkipShardsIts; protected final GroupShardsIterator shardsIts; private final SearchShardIterator[] shardIterators; - private final Map shardIndexMap; private final int expectedTotalOps; private final AtomicInteger totalOps = new AtomicInteger(); private final int maxConcurrentRequestsPerNode; @@ -142,17 +140,11 @@ abstract class AbstractSearchAsyncAction exten this.toSkipShardsIts = new GroupShardsIterator<>(toSkipIterators); this.shardsIts = new GroupShardsIterator<>(iterators); - // we compute the shard index based on the natural order of the shards + this.shardIterators = iterators.toArray(new SearchShardIterator[0]); + // we later compute the shard index based on the natural order of the shards // that participate in the search request. This means that this number is // consistent between two requests that target the same shards. - Map shardMap = new HashMap<>(); - List searchIterators = new ArrayList<>(iterators); - CollectionUtil.timSort(searchIterators); - for (int i = 0; i < searchIterators.size(); i++) { - shardMap.put(searchIterators.get(i), i); - } - this.shardIndexMap = Collections.unmodifiableMap(shardMap); - this.shardIterators = searchIterators.toArray(SearchShardIterator[]::new); + Arrays.sort(shardIterators); // we need to add 1 for non active partition, since we count it in the total. This means for each shard in the iterator we sum up // it's number of active shards but use 1 as the default if no replica of a shard is active at this point. @@ -236,6 +228,10 @@ protected final void run() { assert iterator.skip(); skipShard(iterator); } + final Map shardIndexMap = Maps.newHashMapWithExpectedSize(shardIterators.length); + for (int i = 0; i < shardIterators.length; i++) { + shardIndexMap.put(shardIterators[i], i); + } if (shardsIts.size() > 0) { doCheckNoMissingShards(getName(), request, shardsIts); for (int i = 0; i < shardsIts.size(); i++) { diff --git a/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java b/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java index 250cb81183899..f3d58fe4b051f 100644 --- a/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java +++ b/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java @@ -28,6 +28,7 @@ import java.util.Iterator; import java.util.List; import java.util.function.Consumer; +import java.util.function.Supplier; import static org.elasticsearch.core.Strings.format; @@ -349,4 +350,16 @@ public void afterFilesRestoredFromRepository(IndexShard indexShard) { } } } + + @Override + public void onAcquirePrimaryOperationPermit(IndexShard indexShard, Supplier> onPermitAcquiredListenerSupplier) { + for (IndexEventListener listener : listeners) { + try { + listener.onAcquirePrimaryOperationPermit(indexShard, onPermitAcquiredListenerSupplier); + } catch (Exception e) { + logger.warn(() -> "[" + indexShard.shardId() + "] failed to invoke the listener on acquiring a primary permit", e); + throw e; + } + } + } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java b/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java index 4e55a2e9599d5..e5104948cc426 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java @@ -17,6 +17,8 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; +import java.util.function.Supplier; + /** * An index event listener is the primary extension point for plugins and build-in services * to react / listen to per-index and per-shard events. These listeners are registered per-index @@ -190,4 +192,14 @@ default void afterIndexShardRecovery(IndexShard indexShard, ActionListener * @param indexShard the shard that is recovering */ default void afterFilesRestoredFromRepository(IndexShard indexShard) {} + + /** + * Called when a single primary permit is acquired for the given shard (see + * {@link IndexShard#acquirePrimaryOperationPermit(ActionListener, java.util.concurrent.Executor)}). + * + * @param indexShard the shard of which a primary permit is requested + * @param onPermitAcquiredListenerSupplier call this immediately to get a listener when the permit is acquired. The listener must be + * completed in order for the permit to be given to the acquiring operation. + */ + default void onAcquirePrimaryOperationPermit(IndexShard indexShard, Supplier> onPermitAcquiredListenerSupplier) {} } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index f52ea41d811c0..ab1c936d1c469 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -34,6 +34,7 @@ import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.RefCountingListener; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.replication.PendingReplicationActions; import org.elasticsearch.action.support.replication.ReplicationResponse; @@ -189,7 +190,6 @@ import static org.elasticsearch.core.Strings.format; import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; -import static org.elasticsearch.index.shard.IndexShard.PrimaryPermitCheck.CHECK_PRIMARY_MODE; public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard { @@ -779,28 +779,10 @@ public void relocated( final String targetAllocationId, final BiConsumer> consumer, final ActionListener listener - ) throws IllegalIndexShardStateException, IllegalStateException { - relocated(targetNodeId, targetAllocationId, consumer, listener, null); - } - - /** - * Provides an variant of {@link IndexShard#relocated(String, String, BiConsumer, ActionListener, Releasable)} with an option - * to relocate the shard under externally acquired primary permits. - * - * @param acquiredPrimaryPermits if null, waits until all the primary permits are acquired, otherwise it calls the consumer immediately - */ - public void relocated( - final String targetNodeId, - final String targetAllocationId, - final BiConsumer> consumer, - final ActionListener listener, - @Nullable final Releasable acquiredPrimaryPermits ) throws IllegalIndexShardStateException, IllegalStateException { assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting; - assert acquiredPrimaryPermits == null || indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED - : "external primary permits are provided but not held by the shard"; try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) { - ActionListener onAcquired = new ActionListener<>() { + indexShardOperationPermits.blockOperations(new ActionListener<>() { @Override public void onResponse(Releasable releasable) { boolean success = false; @@ -878,13 +860,8 @@ public void onFailure(Exception e) { listener.onFailure(e); } } - }; - if (acquiredPrimaryPermits == null) { - // Wait on current thread because this execution is wrapped by CancellableThreads and we want to be able to interrupt it - indexShardOperationPermits.blockOperations(onAcquired, 30L, TimeUnit.MINUTES, EsExecutors.DIRECT_EXECUTOR_SERVICE); - } else { - ActionListener.completeWith(onAcquired, () -> acquiredPrimaryPermits); - } + }, 30L, TimeUnit.MINUTES, EsExecutors.DIRECT_EXECUTOR_SERVICE); // Wait on current thread because this execution is wrapped by + // CancellableThreads and we want to be able to interrupt it } } @@ -3592,48 +3569,35 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { ); } - /** - * Check to run before running the primary permit operation - */ - public enum PrimaryPermitCheck { - CHECK_PRIMARY_MODE, - /** - * IMPORTANT: Currently intented to be used only for acquiring primary permits during the recovery of hollow shards. - * Don't disable primary mode checks unless you're really sure. - */ - NONE - } - /** * Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided * ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided * ActionListener will then be called using the provided executor. */ public void acquirePrimaryOperationPermit(ActionListener onPermitAcquired, Executor executorOnDelay) { - acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, false, CHECK_PRIMARY_MODE); + acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, false); } public void acquirePrimaryOperationPermit( ActionListener onPermitAcquired, Executor executorOnDelay, boolean forceExecution - ) { - acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, forceExecution, CHECK_PRIMARY_MODE); - } - - public void acquirePrimaryOperationPermit( - ActionListener onPermitAcquired, - Executor executorOnDelay, - boolean forceExecution, - PrimaryPermitCheck primaryPermitCheck ) { verifyNotClosed(); assert shardRouting.primary() : "acquirePrimaryOperationPermit should only be called on primary shard: " + shardRouting; - indexShardOperationPermits.acquire( - wrapPrimaryOperationPermitListener(primaryPermitCheck, onPermitAcquired), - executorOnDelay, - forceExecution - ); + + ActionListener onPermitAcquiredWrapped = onPermitAcquired.delegateFailureAndWrap((delegate, releasable) -> { + final ActionListener wrappedListener = indexShardOperationPermits.wrapContextPreservingActionListener( + delegate, + executorOnDelay, + forceExecution + ); + try (var listeners = new RefCountingListener(wrappedListener.map(unused -> releasable))) { + indexEventListener.onAcquirePrimaryOperationPermit(this, () -> listeners.acquire()); + } + }); + + indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquiredWrapped), executorOnDelay, forceExecution); } public boolean isPrimaryMode() { @@ -3641,51 +3605,33 @@ public boolean isPrimaryMode() { return replicationTracker.isPrimaryMode(); } - public void acquireAllPrimaryOperationsPermits(final ActionListener onPermitAcquired, final TimeValue timeout) { - acquireAllPrimaryOperationsPermits(onPermitAcquired, timeout, CHECK_PRIMARY_MODE); - } - /** * Acquire all primary operation permits. Once all permits are acquired, the provided ActionListener is called. * It is the responsibility of the caller to close the {@link Releasable}. */ - public void acquireAllPrimaryOperationsPermits( - final ActionListener onPermitAcquired, - final TimeValue timeout, - final PrimaryPermitCheck primaryPermitCheck - ) { + public void acquireAllPrimaryOperationsPermits(final ActionListener onPermitAcquired, final TimeValue timeout) { verifyNotClosed(); assert shardRouting.primary() : "acquireAllPrimaryOperationsPermits should only be called on primary shard: " + shardRouting; - asyncBlockOperations( - wrapPrimaryOperationPermitListener(primaryPermitCheck, onPermitAcquired), - timeout.duration(), - timeout.timeUnit() - ); + asyncBlockOperations(wrapPrimaryOperationPermitListener(onPermitAcquired), timeout.duration(), timeout.timeUnit()); } /** - * Wraps the action to run on a primary after acquiring permit. + * Wraps the action to run on a primary after acquiring permit. This wrapping is used to check if the shard is in primary mode before + * executing the action. * - * @param primaryPermitCheck check to run before the primary mode operation * @param listener the listener to wrap * @return the wrapped listener */ - private ActionListener wrapPrimaryOperationPermitListener( - final PrimaryPermitCheck primaryPermitCheck, - final ActionListener listener - ) { - return switch (primaryPermitCheck) { - case CHECK_PRIMARY_MODE -> listener.delegateFailure((l, r) -> { - if (isPrimaryMode()) { - l.onResponse(r); - } else { - r.close(); - l.onFailure(new ShardNotInPrimaryModeException(shardId, state)); - } - }); - case NONE -> listener; - }; + private ActionListener wrapPrimaryOperationPermitListener(final ActionListener listener) { + return listener.delegateFailure((l, r) -> { + if (isPrimaryMode()) { + l.onResponse(r); + } else { + r.close(); + l.onFailure(new ShardNotInPrimaryModeException(shardId, state)); + } + }); } private void asyncBlockOperations(ActionListener onPermitAcquired, long timeout, TimeUnit timeUnit) { @@ -3723,7 +3669,7 @@ public void runUnderPrimaryPermit(final Runnable runnable, final Consumer void bumpPrimaryTerm( diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index 94ac4f4aca096..79f5d054df30d 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -216,32 +216,7 @@ private void innerAcquire( try { synchronized (this) { if (queuedBlockOperations > 0) { - final Supplier contextSupplier = threadPool.getThreadContext().newRestorableContext(false); - final ActionListener wrappedListener; - if (executorOnDelay != null) { - wrappedListener = new ContextPreservingActionListener<>(contextSupplier, onAcquired).delegateFailure( - (l, r) -> executorOnDelay.execute(new ActionRunnable<>(l) { - @Override - public boolean isForceExecution() { - return forceExecution; - } - - @Override - protected void doRun() { - listener.onResponse(r); - } - - @Override - public void onRejection(Exception e) { - IOUtils.closeWhileHandlingException(r); - super.onRejection(e); - } - }) - ); - } else { - wrappedListener = new ContextPreservingActionListener<>(contextSupplier, onAcquired); - } - delayedOperations.add(wrappedListener); + delayedOperations.add(wrapContextPreservingActionListener(onAcquired, executorOnDelay, forceExecution)); return; } else { releasable = acquire(); @@ -255,6 +230,39 @@ public void onRejection(Exception e) { onAcquired.onResponse(releasable); } + public ActionListener wrapContextPreservingActionListener( + ActionListener listener, + @Nullable final Executor executorOnDelay, + final boolean forceExecution + ) { + final Supplier contextSupplier = threadPool.getThreadContext().newRestorableContext(false); + final ActionListener wrappedListener; + if (executorOnDelay != null) { + wrappedListener = new ContextPreservingActionListener<>(contextSupplier, listener).delegateFailure( + (l, r) -> executorOnDelay.execute(new ActionRunnable<>(l) { + @Override + public boolean isForceExecution() { + return forceExecution; + } + + @Override + protected void doRun() { + listener.onResponse(r); + } + + @Override + public void onRejection(Exception e) { + IOUtils.closeWhileHandlingException(r); + super.onRejection(e); + } + }) + ); + } else { + wrappedListener = new ContextPreservingActionListener<>(contextSupplier, listener); + } + return wrappedListener; + } + private Releasable acquire() throws InterruptedException { assert Thread.holdsLock(this); if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the un-timed tryAcquire methods do not honor the fairness setting diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 0bbf63b07c4cb..7d436ab5d8d22 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -790,21 +790,6 @@ public void onFailure(final Exception e) { } }, TimeValue.timeValueSeconds(30)); latch.await(); - - // It's possible to acquire permits if we skip the primary mode check - var permitAcquiredLatch = new CountDownLatch(1); - indexShard.acquirePrimaryOperationPermit(ActionListener.wrap(r -> { - r.close(); - permitAcquiredLatch.countDown(); - }, Assert::assertNotNull), EsExecutors.DIRECT_EXECUTOR_SERVICE, false, IndexShard.PrimaryPermitCheck.NONE); - safeAwait(permitAcquiredLatch); - - var allPermitsAcquiredLatch = new CountDownLatch(1); - indexShard.acquireAllPrimaryOperationsPermits(ActionListener.wrap(r -> { - r.close(); - allPermitsAcquiredLatch.countDown(); - }, Assert::assertNotNull), TimeValue.timeValueSeconds(30), IndexShard.PrimaryPermitCheck.NONE); - safeAwait(allPermitsAcquiredLatch); } if (Assertions.ENABLED && indexShard.routingEntry().isRelocationTarget() == false) { diff --git a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/DeprecationChecks.java b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/DeprecationChecks.java index 0b9a538d505c9..1bc040418bf07 100644 --- a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/DeprecationChecks.java +++ b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/DeprecationChecks.java @@ -88,7 +88,8 @@ private DeprecationChecks() {} NodeDeprecationChecks::checkEqlEnabledSetting, NodeDeprecationChecks::checkNodeAttrData, NodeDeprecationChecks::checkWatcherBulkConcurrentRequestsSetting, - NodeDeprecationChecks::checkTracingApmSettings + NodeDeprecationChecks::checkTracingApmSettings, + NodeDeprecationChecks::checkSourceModeInComponentTemplates ); static List> INDEX_SETTINGS_CHECKS = List.of( @@ -97,8 +98,7 @@ private DeprecationChecks() {} IndexDeprecationChecks::checkIndexDataPath, IndexDeprecationChecks::storeTypeSettingCheck, IndexDeprecationChecks::frozenIndexSettingCheck, - IndexDeprecationChecks::deprecatedCamelCasePattern, - IndexDeprecationChecks::checkSourceModeInMapping + IndexDeprecationChecks::deprecatedCamelCasePattern ); static List> DATA_STREAM_CHECKS = List.of( diff --git a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/IndexDeprecationChecks.java b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/IndexDeprecationChecks.java index de06e270a867e..1bef1464152db 100644 --- a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/IndexDeprecationChecks.java +++ b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/IndexDeprecationChecks.java @@ -15,7 +15,6 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.engine.frozen.FrozenEngine; -import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate; import org.elasticsearch.xpack.core.deprecation.DeprecationIssue; @@ -203,31 +202,6 @@ static List findInPropertiesRecursively( return issues; } - static DeprecationIssue checkSourceModeInMapping(IndexMetadata indexMetadata, ClusterState clusterState) { - if (SourceFieldMapper.onOrAfterDeprecateModeVersion(indexMetadata.getCreationVersion())) { - boolean[] useSourceMode = { false }; - fieldLevelMappingIssue(indexMetadata, ((mappingMetadata, sourceAsMap) -> { - Object source = sourceAsMap.get("_source"); - if (source instanceof Map sourceMap) { - if (sourceMap.containsKey("mode")) { - useSourceMode[0] = true; - } - } - })); - if (useSourceMode[0]) { - return new DeprecationIssue( - DeprecationIssue.Level.CRITICAL, - SourceFieldMapper.DEPRECATION_WARNING, - "https://github.com/elastic/elasticsearch/pull/117172", - SourceFieldMapper.DEPRECATION_WARNING, - false, - null - ); - } - } - return null; - } - static DeprecationIssue deprecatedCamelCasePattern(IndexMetadata indexMetadata, ClusterState clusterState) { List fields = new ArrayList<>(); fieldLevelMappingIssue( diff --git a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/NodeDeprecationChecks.java b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/NodeDeprecationChecks.java index b6fff5a82f0cd..f1a1f91ba35a0 100644 --- a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/NodeDeprecationChecks.java +++ b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/NodeDeprecationChecks.java @@ -9,12 +9,15 @@ import org.elasticsearch.action.admin.cluster.node.info.PluginsAndModules; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ComponentTemplate; import org.elasticsearch.cluster.routing.allocation.DataTier; import org.elasticsearch.common.settings.SecureSetting; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.TimeValue; import org.elasticsearch.env.Environment; +import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.script.ScriptService; import org.elasticsearch.xpack.core.deprecation.DeprecationIssue; @@ -1012,4 +1015,43 @@ static DeprecationIssue checkTracingApmSettings( DeprecationIssue.Level.CRITICAL ); } + + static DeprecationIssue checkSourceModeInComponentTemplates( + final Settings settings, + final PluginsAndModules pluginsAndModules, + final ClusterState clusterState, + final XPackLicenseState licenseState + ) { + List templates = new ArrayList<>(); + var templateNames = clusterState.metadata().componentTemplates().keySet(); + for (String templateName : templateNames) { + ComponentTemplate template = clusterState.metadata().componentTemplates().get(templateName); + if (template.template().mappings() != null) { + var sourceAsMap = (Map) XContentHelper.convertToMap(template.template().mappings().uncompressed(), true) + .v2() + .get("_doc"); + if (sourceAsMap != null) { + Object source = sourceAsMap.get("_source"); + if (source instanceof Map sourceMap) { + if (sourceMap.containsKey("mode")) { + templates.add(templateName); + } + } + } + } + + } + if (templates.isEmpty()) { + return null; + } + Collections.sort(templates); + return new DeprecationIssue( + DeprecationIssue.Level.CRITICAL, + SourceFieldMapper.DEPRECATION_WARNING, + "https://github.com/elastic/elasticsearch/pull/117172", + SourceFieldMapper.DEPRECATION_WARNING + " Affected component templates: [" + String.join(", ", templates) + "]", + false, + null + ); + } } diff --git a/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/NodeDeprecationChecksTests.java b/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/NodeDeprecationChecksTests.java index 3aaee0e5cdb52..7fe2be2736ea8 100644 --- a/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/NodeDeprecationChecksTests.java +++ b/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/NodeDeprecationChecksTests.java @@ -11,23 +11,29 @@ import org.elasticsearch.action.admin.cluster.node.info.PluginsAndModules; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ComponentTemplate; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.Template; import org.elasticsearch.cluster.routing.allocation.DataTier; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.env.Environment; +import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.script.ScriptService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.deprecation.DeprecationIssue; import org.elasticsearch.xpack.core.ilm.LifecycleSettings; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -832,4 +838,42 @@ public void testCheckNodeAttrData() { ); assertThat(issues, hasItem(expected)); } + + public void testCheckSourceModeInComponentTemplates() throws IOException { + Template template = Template.builder().mappings(CompressedXContent.fromJSON(""" + { "_doc": { "_source": { "mode": "stored"} } }""")).build(); + ComponentTemplate componentTemplate = new ComponentTemplate(template, 1L, new HashMap<>()); + + Template template2 = Template.builder().mappings(CompressedXContent.fromJSON(""" + { "_doc": { "_source": { "enabled": false} } }""")).build(); + ComponentTemplate componentTemplate2 = new ComponentTemplate(template2, 1L, new HashMap<>()); + + ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE) + .metadata( + Metadata.builder() + .componentTemplates( + Map.of("my-template-1", componentTemplate, "my-template-2", componentTemplate, "my-template-3", componentTemplate2) + ) + ) + .build(); + + final List issues = DeprecationChecks.filterChecks( + DeprecationChecks.NODE_SETTINGS_CHECKS, + c -> c.apply( + Settings.EMPTY, + new PluginsAndModules(Collections.emptyList(), Collections.emptyList()), + clusterState, + new XPackLicenseState(() -> 0) + ) + ); + final DeprecationIssue expected = new DeprecationIssue( + DeprecationIssue.Level.CRITICAL, + SourceFieldMapper.DEPRECATION_WARNING, + "https://github.com/elastic/elasticsearch/pull/117172", + SourceFieldMapper.DEPRECATION_WARNING + " Affected component templates: [my-template-1, my-template-2]", + false, + null + ); + assertThat(issues, hasItem(expected)); + } } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java index 852cefe83f989..f3b2ea0d864ff 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java @@ -215,7 +215,7 @@ public static Range rangeOf(Expression value, Expression lower, boolean includeL } public static EsRelation relation() { - return new EsRelation(EMPTY, new EsIndex(randomAlphaOfLength(8), emptyMap()), IndexMode.STANDARD, randomBoolean()); + return new EsRelation(EMPTY, new EsIndex(randomAlphaOfLength(8), emptyMap()), IndexMode.STANDARD); } /** diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java index a11b511cb83b7..fc1b7f6329ab3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java @@ -269,7 +269,13 @@ private LogicalPlan resolveIndex(UnresolvedRelation plan, IndexResolution indexR } var attributes = mappingAsAttributes(plan.source(), esIndex.mapping()); attributes.addAll(plan.metadataFields()); - return new EsRelation(plan.source(), esIndex, attributes.isEmpty() ? NO_FIELDS : attributes, plan.indexMode()); + return new EsRelation( + plan.source(), + esIndex.name(), + plan.indexMode(), + esIndex.indexNameWithModes(), + attributes.isEmpty() ? NO_FIELDS : attributes + ); } } @@ -1371,9 +1377,13 @@ private LogicalPlan doRule(LogicalPlan plan) { } if (missing.isEmpty() == false) { - List newOutput = new ArrayList<>(esr.output()); - newOutput.addAll(missing); - return new EsRelation(esr.source(), esr.index(), newOutput, esr.indexMode(), esr.frozen()); + return new EsRelation( + esr.source(), + esr.indexPattern(), + esr.indexMode(), + esr.indexNameWithModes(), + CollectionUtils.combine(esr.output(), missing) + ); } return esr; }); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java index 4b3077aebba7d..512a7253b813f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java @@ -102,13 +102,13 @@ public LogicalPlan apply(LogicalPlan plan) { p = new Eval(eval.source(), eval.child(), remaining); } } - } else if (p instanceof EsRelation esRelation && esRelation.indexMode() == IndexMode.LOOKUP) { + } else if (p instanceof EsRelation esr && esr.indexMode() == IndexMode.LOOKUP) { // Normally, pruning EsRelation has no effect because InsertFieldExtraction only extracts the required fields, anyway. // However, InsertFieldExtraction can't be currently used in LOOKUP JOIN right index, // it works differently as we extract all fields (other than the join key) that the EsRelation has. - var remaining = removeUnused(esRelation.output(), used); + var remaining = removeUnused(esr.output(), used); if (remaining != null) { - p = new EsRelation(esRelation.source(), esRelation.index(), remaining, esRelation.indexMode(), esRelation.frozen()); + p = new EsRelation(esr.source(), esr.indexPattern(), esr.indexMode(), esr.indexNameWithModes(), remaining); } } } while (recheck); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/SkipQueryOnEmptyMappings.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/SkipQueryOnEmptyMappings.java index a8672b64c8b98..d57a3de21b4a6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/SkipQueryOnEmptyMappings.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/SkipQueryOnEmptyMappings.java @@ -16,6 +16,6 @@ public final class SkipQueryOnEmptyMappings extends OptimizerRules.OptimizerRule @Override protected LogicalPlan rule(EsRelation plan) { - return plan.index().concreteIndices().isEmpty() ? new LocalRelation(plan.source(), plan.output(), LocalSupplier.EMPTY) : plan; + return plan.concreteIndices().isEmpty() ? new LocalRelation(plan.source(), plan.output(), LocalSupplier.EMPTY) : plan; } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateMetricsAggregate.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateMetricsAggregate.java index 2879db5042f4a..5f34899875efd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateMetricsAggregate.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateMetricsAggregate.java @@ -220,7 +220,7 @@ private static Aggregate toStandardAggregate(Aggregate metrics) { if (attributes.stream().noneMatch(a -> a.name().equals(MetadataAttribute.TIMESTAMP_FIELD))) { attributes.removeIf(a -> a.name().equals(MetadataAttribute.TIMESTAMP_FIELD)); } - return new EsRelation(r.source(), r.index(), new ArrayList<>(attributes), IndexMode.STANDARD); + return new EsRelation(r.source(), r.indexPattern(), IndexMode.STANDARD, r.indexNameWithModes(), new ArrayList<>(attributes)); }); return new Aggregate(metrics.source(), child, Aggregate.AggregateType.STANDARD, metrics.groupings(), metrics.aggregates()); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushFiltersToSource.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushFiltersToSource.java index d45d2e6680abd..2f28b1a0e41ba 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushFiltersToSource.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushFiltersToSource.java @@ -104,8 +104,9 @@ private static PhysicalPlan rewrite( var query = Queries.combine(Queries.Clause.FILTER, asList(queryExec.query(), planQuery)); queryExec = new EsQueryExec( queryExec.source(), - queryExec.index(), + queryExec.indexPattern(), queryExec.indexMode(), + queryExec.indexNameWithModes(), queryExec.output(), query, queryExec.limit(), diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushStatsToSource.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushStatsToSource.java index a8c5b99067849..adc6145ce2574 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushStatsToSource.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushStatsToSource.java @@ -59,7 +59,7 @@ protected PhysicalPlan rule(AggregateExec aggregateExec, LocalPhysicalOptimizerC if (tuple.v2().size() == aggregateExec.aggregates().size()) { plan = new EsStatsQueryExec( aggregateExec.source(), - queryExec.index(), + queryExec.indexPattern(), queryExec.query(), queryExec.limit(), tuple.v1(), diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSourceAttributes.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSourceAttributes.java index 11e386ddd046c..4f3358c539b05 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSourceAttributes.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSourceAttributes.java @@ -53,6 +53,6 @@ protected PhysicalPlan rule(EsSourceExec plan) { attributes.add(ma); } }); - return new EsQueryExec(plan.source(), plan.index(), plan.indexMode(), attributes, plan.query()); + return new EsQueryExec(plan.source(), plan.indexPattern(), plan.indexMode(), plan.indexNameWithModes(), attributes, plan.query()); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/EsRelation.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/EsRelation.java index df0b258679d4c..90b3aa8625087 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/EsRelation.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/EsRelation.java @@ -27,6 +27,9 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.Set; + +import static org.elasticsearch.TransportVersions.ESQL_SKIP_ES_INDEX_SERIALIZATION; public class EsRelation extends LeafPlan { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( @@ -35,30 +38,41 @@ public class EsRelation extends LeafPlan { EsRelation::readFrom ); - private final EsIndex index; - private final List attrs; - private final boolean frozen; + private final String indexPattern; private final IndexMode indexMode; + private final Map indexNameWithModes; + private final List attrs; - public EsRelation(Source source, EsIndex index, IndexMode indexMode, boolean frozen) { - this(source, index, flatten(source, index.mapping()), indexMode, frozen); - } - - public EsRelation(Source source, EsIndex index, List attributes, IndexMode indexMode) { - this(source, index, attributes, indexMode, false); + public EsRelation(Source source, EsIndex index, IndexMode indexMode) { + this(source, index.name(), indexMode, index.indexNameWithModes(), flatten(source, index.mapping())); } - public EsRelation(Source source, EsIndex index, List attributes, IndexMode indexMode, boolean frozen) { + public EsRelation( + Source source, + String indexPattern, + IndexMode indexMode, + Map indexNameWithModes, + List attributes + ) { super(source); - this.index = index; - this.attrs = attributes; + this.indexPattern = indexPattern; this.indexMode = indexMode; - this.frozen = frozen; + this.indexNameWithModes = indexNameWithModes; + this.attrs = attributes; } private static EsRelation readFrom(StreamInput in) throws IOException { Source source = Source.readFrom((PlanStreamInput) in); - EsIndex esIndex = EsIndex.readFrom(in); + String indexPattern; + Map indexNameWithModes; + if (in.getTransportVersion().onOrAfter(ESQL_SKIP_ES_INDEX_SERIALIZATION)) { + indexPattern = in.readString(); + indexNameWithModes = in.readMap(IndexMode::readFrom); + } else { + var index = EsIndex.readFrom(in); + indexPattern = index.name(); + indexNameWithModes = index.indexNameWithModes(); + } List attributes = in.readNamedWriteableCollectionAsList(Attribute.class); if (supportingEsSourceOptions(in.getTransportVersion())) { // We don't do anything with these strings @@ -67,23 +81,32 @@ private static EsRelation readFrom(StreamInput in) throws IOException { in.readOptionalString(); } IndexMode indexMode = readIndexMode(in); - boolean frozen = in.readBoolean(); - return new EsRelation(source, esIndex, attributes, indexMode, frozen); + if (in.getTransportVersion().before(ESQL_SKIP_ES_INDEX_SERIALIZATION)) { + in.readBoolean(); + } + return new EsRelation(source, indexPattern, indexMode, indexNameWithModes, attributes); } @Override public void writeTo(StreamOutput out) throws IOException { Source.EMPTY.writeTo(out); - index().writeTo(out); - out.writeNamedWriteableCollection(output()); + if (out.getTransportVersion().onOrAfter(ESQL_SKIP_ES_INDEX_SERIALIZATION)) { + out.writeString(indexPattern); + out.writeMap(indexNameWithModes, (o, v) -> IndexMode.writeTo(v, out)); + } else { + new EsIndex(indexPattern, Map.of(), indexNameWithModes).writeTo(out); + } + out.writeNamedWriteableCollection(attrs); if (supportingEsSourceOptions(out.getTransportVersion())) { // write (null) string fillers expected by remote out.writeOptionalString(null); out.writeOptionalString(null); out.writeOptionalString(null); } - writeIndexMode(out, indexMode()); - out.writeBoolean(frozen()); + writeIndexMode(out, indexMode); + if (out.getTransportVersion().before(ESQL_SKIP_ES_INDEX_SERIALIZATION)) { + out.writeBoolean(false); + } } private static boolean supportingEsSourceOptions(TransportVersion version) { @@ -97,7 +120,7 @@ public String getWriteableName() { @Override protected NodeInfo info() { - return NodeInfo.create(this, EsRelation::new, index, attrs, indexMode, frozen); + return NodeInfo.create(this, EsRelation::new, indexPattern, indexMode, indexNameWithModes, attrs); } private static List flatten(Source source, Map mapping) { @@ -128,23 +151,27 @@ private static List flatten(Source source, Map mappi return list; } - public EsIndex index() { - return index; - } - - public boolean frozen() { - return frozen; + public String indexPattern() { + return indexPattern; } public IndexMode indexMode() { return indexMode; } + public Map indexNameWithModes() { + return indexNameWithModes; + } + @Override public List output() { return attrs; } + public Set concreteIndices() { + return indexNameWithModes.keySet(); + } + @Override public String commandName() { return "FROM"; @@ -159,7 +186,7 @@ public boolean expressionsResolved() { @Override public int hashCode() { - return Objects.hash(index, indexMode, frozen, attrs); + return Objects.hash(indexPattern, indexMode, indexNameWithModes, attrs); } @Override @@ -173,9 +200,9 @@ public boolean equals(Object obj) { } EsRelation other = (EsRelation) obj; - return Objects.equals(index, other.index) - && indexMode == other.indexMode() - && frozen == other.frozen + return Objects.equals(indexPattern, other.indexPattern) + && Objects.equals(indexMode, other.indexMode) + && Objects.equals(indexNameWithModes, other.indexNameWithModes) && Objects.equals(attrs, other.attrs); } @@ -183,7 +210,7 @@ public boolean equals(Object obj) { public String nodeString() { return nodeName() + "[" - + index + + indexPattern + "]" + (indexMode != IndexMode.STANDARD ? "[" + indexMode.name() + "]" : "") + NodeUtils.limitedToString(attrs); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java index ab533899aaff6..a3fc62d935795 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java @@ -36,22 +36,25 @@ import java.util.Map; import java.util.Objects; +import static org.elasticsearch.TransportVersions.ESQL_SKIP_ES_INDEX_SERIALIZATION; + public class EsQueryExec extends LeafExec implements EstimatesRowSize { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( PhysicalPlan.class, "EsQueryExec", - EsQueryExec::deserialize + EsQueryExec::readFrom ); public static final EsField DOC_ID_FIELD = new EsField("_doc", DataType.DOC_DATA_TYPE, Map.of(), false); public static final List NO_SORTS = List.of(); // only exists to mimic older serialization, but we no longer serialize sorts - private final EsIndex index; + private final String indexPattern; private final IndexMode indexMode; + private final Map indexNameWithModes; + private final List attrs; private final QueryBuilder query; private final Expression limit; private final List sorts; - private final List attrs; /** * Estimate of the number of bytes that'll be loaded per position before @@ -108,14 +111,22 @@ public FieldAttribute field() { } } - public EsQueryExec(Source source, EsIndex index, IndexMode indexMode, List attributes, QueryBuilder query) { - this(source, index, indexMode, attributes, query, null, null, null); + public EsQueryExec( + Source source, + String indexPattern, + IndexMode indexMode, + Map indexNameWithModes, + List attributes, + QueryBuilder query + ) { + this(source, indexPattern, indexMode, indexNameWithModes, attributes, query, null, null, null); } public EsQueryExec( Source source, - EsIndex index, + String indexPattern, IndexMode indexMode, + Map indexNameWithModes, List attrs, QueryBuilder query, Expression limit, @@ -123,10 +134,11 @@ public EsQueryExec( Integer estimatedRowSize ) { super(source); - this.index = index; + this.indexPattern = indexPattern; this.indexMode = indexMode; - this.query = query; + this.indexNameWithModes = indexNameWithModes; this.attrs = attrs; + this.query = query; this.limit = limit; this.sorts = sorts; this.estimatedRowSize = estimatedRowSize; @@ -136,9 +148,18 @@ public EsQueryExec( * The matching constructor is used during physical plan optimization and needs valid sorts. But we no longer serialize sorts. * If this cluster node is talking to an older instance it might receive a plan with sorts, but it will ignore them. */ - public static EsQueryExec deserialize(StreamInput in) throws IOException { + private static EsQueryExec readFrom(StreamInput in) throws IOException { var source = Source.readFrom((PlanStreamInput) in); - var index = EsIndex.readFrom(in); + String indexPattern; + Map indexNameWithModes; + if (in.getTransportVersion().onOrAfter(ESQL_SKIP_ES_INDEX_SERIALIZATION)) { + indexPattern = in.readString(); + indexNameWithModes = in.readMap(IndexMode::readFrom); + } else { + var index = EsIndex.readFrom(in); + indexPattern = index.name(); + indexNameWithModes = index.indexNameWithModes(); + } var indexMode = EsRelation.readIndexMode(in); var attrs = in.readNamedWriteableCollectionAsList(Attribute.class); var query = in.readOptionalNamedWriteable(QueryBuilder.class); @@ -146,7 +167,7 @@ public static EsQueryExec deserialize(StreamInput in) throws IOException { in.readOptionalCollectionAsList(EsQueryExec::readSort); var rowSize = in.readOptionalVInt(); // Ignore sorts from the old serialization format - return new EsQueryExec(source, index, indexMode, attrs, query, limit, NO_SORTS, rowSize); + return new EsQueryExec(source, indexPattern, indexMode, indexNameWithModes, attrs, query, limit, NO_SORTS, rowSize); } private static Sort readSort(StreamInput in) throws IOException { @@ -160,7 +181,12 @@ private static void writeSort(StreamOutput out, Sort sort) { @Override public void writeTo(StreamOutput out) throws IOException { Source.EMPTY.writeTo(out); - index().writeTo(out); + if (out.getTransportVersion().onOrAfter(ESQL_SKIP_ES_INDEX_SERIALIZATION)) { + out.writeString(indexPattern); + out.writeMap(indexNameWithModes, (o, v) -> IndexMode.writeTo(v, out)); + } else { + new EsIndex(indexPattern, Map.of(), indexNameWithModes).writeTo(out); + } EsRelation.writeIndexMode(out, indexMode()); out.writeNamedWriteableCollection(output()); out.writeOptionalNamedWriteable(query()); @@ -180,17 +206,32 @@ public static boolean isSourceAttribute(Attribute attr) { @Override protected NodeInfo info() { - return NodeInfo.create(this, EsQueryExec::new, index, indexMode, attrs, query, limit, sorts, estimatedRowSize); + return NodeInfo.create( + this, + EsQueryExec::new, + indexPattern, + indexMode, + indexNameWithModes, + attrs, + query, + limit, + sorts, + estimatedRowSize + ); } - public EsIndex index() { - return index; + public String indexPattern() { + return indexPattern; } public IndexMode indexMode() { return indexMode; } + public Map indexNameWithModes() { + return indexNameWithModes; + } + public QueryBuilder query() { return query; } @@ -234,13 +275,13 @@ public PhysicalPlan estimateRowSize(State state) { } return Objects.equals(this.estimatedRowSize, size) ? this - : new EsQueryExec(source(), index, indexMode, attrs, query, limit, sorts, size); + : new EsQueryExec(source(), indexPattern, indexMode, indexNameWithModes, attrs, query, limit, sorts, size); } public EsQueryExec withLimit(Expression limit) { return Objects.equals(this.limit, limit) ? this - : new EsQueryExec(source(), index, indexMode, attrs, query, limit, sorts, estimatedRowSize); + : new EsQueryExec(source(), indexPattern, indexMode, indexNameWithModes, attrs, query, limit, sorts, estimatedRowSize); } public boolean canPushSorts() { @@ -254,12 +295,12 @@ public EsQueryExec withSorts(List sorts) { } return Objects.equals(this.sorts, sorts) ? this - : new EsQueryExec(source(), index, indexMode, attrs, query, limit, sorts, estimatedRowSize); + : new EsQueryExec(source(), indexPattern, indexMode, indexNameWithModes, attrs, query, limit, sorts, estimatedRowSize); } @Override public int hashCode() { - return Objects.hash(index, indexMode, attrs, query, limit, sorts); + return Objects.hash(indexPattern, indexMode, indexNameWithModes, attrs, query, limit, sorts); } @Override @@ -273,8 +314,9 @@ public boolean equals(Object obj) { } EsQueryExec other = (EsQueryExec) obj; - return Objects.equals(index, other.index) + return Objects.equals(indexPattern, other.indexPattern) && Objects.equals(indexMode, other.indexMode) + && Objects.equals(indexNameWithModes, other.indexNameWithModes) && Objects.equals(attrs, other.attrs) && Objects.equals(query, other.query) && Objects.equals(limit, other.limit) @@ -286,7 +328,7 @@ public boolean equals(Object obj) { public String nodeString() { return nodeName() + "[" - + index + + indexPattern + "], " + "indexMode[" + indexMode diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsSourceExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsSourceExec.java index eeeafc52f158b..5da3ef9f72dd3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsSourceExec.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsSourceExec.java @@ -22,46 +22,71 @@ import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.Objects; +import static org.elasticsearch.TransportVersions.ESQL_SKIP_ES_INDEX_SERIALIZATION; + public class EsSourceExec extends LeafExec { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( PhysicalPlan.class, "EsSourceExec", - EsSourceExec::new + EsSourceExec::readFrom ); - private final EsIndex index; + private final String indexPattern; + private final IndexMode indexMode; + private final Map indexNameWithModes; private final List attributes; private final QueryBuilder query; - private final IndexMode indexMode; public EsSourceExec(EsRelation relation) { - this(relation.source(), relation.index(), relation.output(), null, relation.indexMode()); + this(relation.source(), relation.indexPattern(), relation.indexMode(), relation.indexNameWithModes(), relation.output(), null); } - public EsSourceExec(Source source, EsIndex index, List attributes, QueryBuilder query, IndexMode indexMode) { + public EsSourceExec( + Source source, + String indexPattern, + IndexMode indexMode, + Map indexNameWithModes, + List attributes, + QueryBuilder query + ) { super(source); - this.index = index; + this.indexPattern = indexPattern; + this.indexMode = indexMode; + this.indexNameWithModes = indexNameWithModes; this.attributes = attributes; this.query = query; - this.indexMode = indexMode; } - private EsSourceExec(StreamInput in) throws IOException { - this( - Source.readFrom((PlanStreamInput) in), - EsIndex.readFrom(in), - in.readNamedWriteableCollectionAsList(Attribute.class), - in.readOptionalNamedWriteable(QueryBuilder.class), - EsRelation.readIndexMode(in) - ); + private static EsSourceExec readFrom(StreamInput in) throws IOException { + var source = Source.readFrom((PlanStreamInput) in); + String indexPattern; + Map indexNameWithModes; + if (in.getTransportVersion().onOrAfter(ESQL_SKIP_ES_INDEX_SERIALIZATION)) { + indexPattern = in.readString(); + indexNameWithModes = in.readMap(IndexMode::readFrom); + } else { + var index = EsIndex.readFrom(in); + indexPattern = index.name(); + indexNameWithModes = index.indexNameWithModes(); + } + var attributes = in.readNamedWriteableCollectionAsList(Attribute.class); + var query = in.readOptionalNamedWriteable(QueryBuilder.class); + var indexMode = EsRelation.readIndexMode(in); + return new EsSourceExec(source, indexPattern, indexMode, indexNameWithModes, attributes, query); } @Override public void writeTo(StreamOutput out) throws IOException { Source.EMPTY.writeTo(out); - index().writeTo(out); + if (out.getTransportVersion().onOrAfter(ESQL_SKIP_ES_INDEX_SERIALIZATION)) { + out.writeString(indexPattern); + out.writeMap(indexNameWithModes, (o, v) -> IndexMode.writeTo(v, out)); + } else { + new EsIndex(indexPattern, Map.of(), indexNameWithModes).writeTo(out); + } out.writeNamedWriteableCollection(output()); out.writeOptionalNamedWriteable(query()); EsRelation.writeIndexMode(out, indexMode()); @@ -72,18 +97,22 @@ public String getWriteableName() { return ENTRY.name; } - public EsIndex index() { - return index; - } - - public QueryBuilder query() { - return query; + public String indexPattern() { + return indexPattern; } public IndexMode indexMode() { return indexMode; } + public Map indexNameWithModes() { + return indexNameWithModes; + } + + public QueryBuilder query() { + return query; + } + @Override public List output() { return attributes; @@ -91,12 +120,12 @@ public List output() { @Override protected NodeInfo info() { - return NodeInfo.create(this, EsSourceExec::new, index, attributes, query, indexMode); + return NodeInfo.create(this, EsSourceExec::new, indexPattern, indexMode, indexNameWithModes, attributes, query); } @Override public int hashCode() { - return Objects.hash(index, attributes, query, indexMode); + return Objects.hash(indexPattern, indexMode, indexNameWithModes, attributes, query); } @Override @@ -110,14 +139,15 @@ public boolean equals(Object obj) { } EsSourceExec other = (EsSourceExec) obj; - return Objects.equals(index, other.index) + return Objects.equals(indexPattern, other.indexPattern) + && Objects.equals(indexMode, other.indexMode) + && Objects.equals(indexNameWithModes, other.indexNameWithModes) && Objects.equals(attributes, other.attributes) - && Objects.equals(query, other.query) - && Objects.equals(indexMode, other.indexMode); + && Objects.equals(query, other.query); } @Override public String nodeString() { - return nodeName() + "[" + index + "]" + NodeUtils.limitedToString(attributes); + return nodeName() + "[" + indexPattern + "]" + NodeUtils.limitedToString(attributes); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsStatsQueryExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsStatsQueryExec.java index 5a98ecc7d6594..96214652b87cb 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsStatsQueryExec.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsStatsQueryExec.java @@ -16,7 +16,6 @@ import org.elasticsearch.xpack.esql.core.tree.NodeUtils; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.util.Queries; -import org.elasticsearch.xpack.esql.index.EsIndex; import java.io.IOException; import java.util.List; @@ -44,7 +43,7 @@ public QueryBuilder filter(QueryBuilder sourceQuery) { } } - private final EsIndex index; + private final String indexPattern; private final QueryBuilder query; private final Expression limit; private final List attrs; @@ -52,14 +51,14 @@ public QueryBuilder filter(QueryBuilder sourceQuery) { public EsStatsQueryExec( Source source, - EsIndex index, + String indexPattern, QueryBuilder query, Expression limit, List attributes, List stats ) { super(source); - this.index = index; + this.indexPattern = indexPattern; this.query = query; this.limit = limit; this.attrs = attributes; @@ -78,11 +77,7 @@ public String getWriteableName() { @Override protected NodeInfo info() { - return NodeInfo.create(this, EsStatsQueryExec::new, index, query, limit, attrs, stats); - } - - public EsIndex index() { - return index; + return NodeInfo.create(this, EsStatsQueryExec::new, indexPattern, query, limit, attrs, stats); } public QueryBuilder query() { @@ -113,7 +108,7 @@ public PhysicalPlan estimateRowSize(State state) { @Override public int hashCode() { - return Objects.hash(index, query, limit, attrs, stats); + return Objects.hash(indexPattern, query, limit, attrs, stats); } @Override @@ -127,7 +122,7 @@ public boolean equals(Object obj) { } EsStatsQueryExec other = (EsStatsQueryExec) obj; - return Objects.equals(index, other.index) + return Objects.equals(indexPattern, other.indexPattern) && Objects.equals(attrs, other.attrs) && Objects.equals(query, other.query) && Objects.equals(limit, other.limit) @@ -138,7 +133,7 @@ public boolean equals(Object obj) { public String nodeString() { return nodeName() + "[" - + index + + indexPattern + "], stats" + stats + "], query[" diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index ecd0284c7cb57..00bdf0a019096 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -562,7 +562,7 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan if (localSourceExec.indexMode() != IndexMode.LOOKUP) { throw new IllegalArgumentException("can't plan [" + join + "]"); } - Map indicesWithModes = localSourceExec.index().indexNameWithModes(); + Map indicesWithModes = localSourceExec.indexNameWithModes(); if (indicesWithModes.size() != 1) { throw new IllegalArgumentException("can't plan [" + join + "], found more than 1 index"); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java index af5146f7b6926..7d6d7d9b3c8c9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java @@ -110,7 +110,7 @@ public static Set planConcreteIndices(PhysicalPlan plan) { return Set.of(); } var indices = new LinkedHashSet(); - forEachFromRelation(plan, relation -> indices.addAll(relation.index().concreteIndices())); + forEachFromRelation(plan, relation -> indices.addAll(relation.concreteIndices())); return indices; } @@ -122,7 +122,7 @@ public static String[] planOriginalIndices(PhysicalPlan plan) { return Strings.EMPTY_ARRAY; } var indices = new LinkedHashSet(); - forEachFromRelation(plan, relation -> indices.addAll(asList(Strings.commaDelimitedListToStringArray(relation.index().name())))); + forEachFromRelation(plan, relation -> indices.addAll(asList(Strings.commaDelimitedListToStringArray(relation.indexPattern())))); return indices.toArray(String[]::new); } @@ -192,7 +192,14 @@ public static PhysicalPlan localPlan( if (filter != null) { physicalFragment = physicalFragment.transformUp( EsSourceExec.class, - query -> new EsSourceExec(Source.EMPTY, query.index(), query.output(), filter, query.indexMode()) + query -> new EsSourceExec( + Source.EMPTY, + query.indexPattern(), + query.indexMode(), + query.indexNameWithModes(), + query.output(), + filter + ) ); } var localOptimized = physicalOptimizer.localOptimize(physicalFragment); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/QueryBuilderResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/QueryBuilderResolver.java index 91103ef286f72..7db81069f9d3c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/QueryBuilderResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/QueryBuilderResolver.java @@ -99,9 +99,7 @@ private Set fullTextFunctions(LogicalPlan plan) { public Set indexNames(LogicalPlan plan) { Holder> indexNames = new Holder<>(); - - plan.forEachDown(EsRelation.class, esRelation -> { indexNames.set(esRelation.index().concreteIndices()); }); - + plan.forEachDown(EsRelation.class, esRelation -> indexNames.set(esRelation.concreteIndices())); return indexNames.get(); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java index feab6bbbe56a6..1c3b3a5c463e7 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java @@ -115,7 +115,7 @@ public void testIndexResolution() { var plan = analyzer.analyze(UNRESOLVED_RELATION); var limit = as(plan, Limit.class); - assertEquals(new EsRelation(EMPTY, idx, NO_FIELDS, IndexMode.STANDARD), limit.child()); + assertEquals(new EsRelation(EMPTY, idx.name(), IndexMode.STANDARD, idx.indexNameWithModes(), NO_FIELDS), limit.child()); } public void testFailOnUnresolvedIndex() { @@ -133,7 +133,7 @@ public void testIndexWithClusterResolution() { var plan = analyzer.analyze(UNRESOLVED_RELATION); var limit = as(plan, Limit.class); - assertEquals(new EsRelation(EMPTY, idx, NO_FIELDS, IndexMode.STANDARD), limit.child()); + assertEquals(new EsRelation(EMPTY, idx.name(), IndexMode.STANDARD, idx.indexNameWithModes(), NO_FIELDS), limit.child()); } public void testAttributeResolution() { @@ -2071,7 +2071,7 @@ public void testLookup() { assertThat(project.projections().stream().map(Object::toString).toList(), hasItem(matchesRegex("languages\\{f}#\\d+ AS int#\\d+"))); var esRelation = as(project.child(), EsRelation.class); - assertThat(esRelation.index().name(), equalTo("test")); + assertThat(esRelation.indexPattern(), equalTo("test")); // Lookup's output looks sensible too assertMap( @@ -2583,7 +2583,7 @@ public void testFromEnrichAndMatchColonUsage() { assertEquals(enrich.policy().getMatchField(), "language_code"); var eval = as(enrich.child(), Eval.class); var esRelation = as(eval.child(), EsRelation.class); - assertEquals(esRelation.index().name(), "test"); + assertEquals(esRelation.indexPattern(), "test"); } public void testMapExpressionAsFunctionArgument() { @@ -2611,7 +2611,7 @@ public void testMapExpressionAsFunctionArgument() { assertEquals(new Literal(EMPTY, 2.0, DataType.DOUBLE), ee.value()); assertEquals(DataType.DOUBLE, ee.dataType()); EsRelation esRelation = as(eval.child(), EsRelation.class); - assertEquals(esRelation.index().name(), "test"); + assertEquals(esRelation.indexPattern(), "test"); } private void verifyMapExpression(MapExpression me) { @@ -2692,7 +2692,6 @@ private void assertEmptyEsRelation(LogicalPlan plan) { assertThat(plan, instanceOf(EsRelation.class)); EsRelation esRelation = (EsRelation) plan; assertThat(esRelation.output(), equalTo(NO_FIELDS)); - assertTrue(esRelation.index().mapping().isEmpty()); } @Override diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/index/EsIndexSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/index/EsIndexSerializationTests.java index 8f846edf2b41c..2cc3fc2251409 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/index/EsIndexSerializationTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/index/EsIndexSerializationTests.java @@ -27,13 +27,14 @@ import java.util.TreeMap; import java.util.TreeSet; +import static org.elasticsearch.core.Tuple.tuple; import static org.elasticsearch.test.ByteSizeEqualsMatcher.byteSizeEquals; public class EsIndexSerializationTests extends AbstractWireSerializingTestCase { public static EsIndex randomEsIndex() { String name = randomAlphaOfLength(5); Map mapping = randomMapping(); - return new EsIndex(name, mapping, randomConcreteIndices()); + return new EsIndex(name, mapping, randomIndexNameWithModes()); } private static Map randomMapping() { @@ -45,13 +46,8 @@ private static Map randomMapping() { return result; } - private static Map randomConcreteIndices() { - int size = between(0, 10); - Map result = new HashMap<>(size); - while (result.size() < size) { - result.put(randomAlphaOfLength(5), randomFrom(IndexMode.values())); - } - return result; + public static Map randomIndexNameWithModes() { + return randomMap(0, 10, () -> tuple(randomIdentifier(), randomFrom(IndexMode.values()))); } @Override @@ -77,7 +73,10 @@ protected EsIndex mutateInstance(EsIndex instance) throws IOException { switch (between(0, 2)) { case 0 -> name = randomValueOtherThan(name, () -> randomAlphaOfLength(5)); case 1 -> mapping = randomValueOtherThan(mapping, EsIndexSerializationTests::randomMapping); - case 2 -> indexedNameWithModes = randomValueOtherThan(indexedNameWithModes, EsIndexSerializationTests::randomConcreteIndices); + case 2 -> indexedNameWithModes = randomValueOtherThan( + indexedNameWithModes, + EsIndexSerializationTests::randomIndexNameWithModes + ); default -> throw new IllegalArgumentException(); } return new EsIndex(name, mapping, indexedNameWithModes); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java index e41dff84b51bb..310d680cfbf41 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java @@ -580,6 +580,6 @@ protected List filteredWarnings() { } public static EsRelation relation() { - return new EsRelation(EMPTY, new EsIndex(randomAlphaOfLength(8), emptyMap()), randomFrom(IndexMode.values()), randomBoolean()); + return new EsRelation(EMPTY, new EsIndex(randomAlphaOfLength(8), emptyMap()), randomFrom(IndexMode.values())); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java index b48c6376e6741..95acc84143614 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java @@ -6929,6 +6929,6 @@ public void testMapExpressionAsFunctionArgument() { assertEquals(DataType.DOUBLE, ee.dataType()); Limit limit = as(eval.child(), Limit.class); EsRelation esRelation = as(limit.child(), EsRelation.class); - assertEquals(esRelation.index().name(), "test"); + assertEquals(esRelation.indexPattern(), "test"); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java index 8c85f4220ccc1..1eb7f43ee72ba 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java @@ -2603,10 +2603,16 @@ public void testFieldExtractWithoutSourceAttributes() { | where round(emp_no) > 10 """)); // Transform the verified plan so that it is invalid (i.e. no source attributes) - List emptyAttrList = List.of(); var badPlan = verifiedPlan.transformDown( EsQueryExec.class, - node -> new EsSourceExec(node.source(), node.index(), emptyAttrList, node.query(), IndexMode.STANDARD) + node -> new EsSourceExec( + node.source(), + node.indexPattern(), + IndexMode.STANDARD, + node.indexNameWithModes(), + List.of(), + node.query() + ) ); var e = expectThrows(VerificationException.class, () -> physicalPlanOptimizer.verify(badPlan)); @@ -2728,8 +2734,7 @@ public void testProjectAwayColumns() { new EsField("some_field2", DataType.KEYWORD, Map.of(), true) ) ), - IndexMode.STANDARD, - false + IndexMode.STANDARD ); Attribute some_field1 = relation.output().get(0); Attribute some_field2 = relation.output().get(1); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFiltersTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFiltersTests.java index d01b712a5a345..3e0ae4f97e405 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFiltersTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFiltersTests.java @@ -22,7 +22,6 @@ import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.GreaterThan; import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.GreaterThanOrEqual; import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.LessThan; -import org.elasticsearch.xpack.esql.index.EsIndex; import org.elasticsearch.xpack.esql.plan.logical.Aggregate; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; import org.elasticsearch.xpack.esql.plan.logical.Eval; @@ -33,9 +32,9 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import static java.util.Collections.emptyList; -import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; import static org.elasticsearch.xpack.esql.EsqlTestUtils.FOUR; import static org.elasticsearch.xpack.esql.EsqlTestUtils.ONE; @@ -251,12 +250,6 @@ private static EsRelation relation() { } private static EsRelation relation(List fieldAttributes) { - return new EsRelation( - EMPTY, - new EsIndex(randomAlphaOfLength(8), emptyMap()), - fieldAttributes, - randomFrom(IndexMode.values()), - randomBoolean() - ); + return new EsRelation(EMPTY, randomIdentifier(), randomFrom(IndexMode.values()), Map.of(), fieldAttributes); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSourceTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSourceTests.java index 90c8ae1032325..b7eadc243d977 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSourceTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSourceTests.java @@ -30,7 +30,6 @@ import org.elasticsearch.xpack.esql.expression.Order; import org.elasticsearch.xpack.esql.expression.function.scalar.spatial.StDistance; import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.Add; -import org.elasticsearch.xpack.esql.index.EsIndex; import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext; import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; import org.elasticsearch.xpack.esql.plan.physical.EvalExec; @@ -582,9 +581,8 @@ public TestPhysicalPlanBuilder limit(int limit) { } public TopNExec build() { - EsIndex esIndex = new EsIndex(this.index, Map.of()); List attributes = new ArrayList<>(fields.values()); - PhysicalPlan child = new EsQueryExec(Source.EMPTY, esIndex, indexMode, attributes, null, null, List.of(), 0); + PhysicalPlan child = new EsQueryExec(Source.EMPTY, this.index, indexMode, Map.of(), attributes, null, null, List.of(), 0); if (aliases.isEmpty() == false) { child = new EvalExec(Source.EMPTY, child, aliases); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/EsRelationSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/EsRelationSerializationTests.java index fa3038b24e8eb..18cd9716480e6 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/EsRelationSerializationTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/EsRelationSerializationTests.java @@ -8,22 +8,25 @@ package org.elasticsearch.xpack.esql.plan.logical; import org.elasticsearch.index.IndexMode; +import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.esql.core.expression.Attribute; -import org.elasticsearch.xpack.esql.core.tree.Source; -import org.elasticsearch.xpack.esql.index.EsIndex; import org.elasticsearch.xpack.esql.index.EsIndexSerializationTests; import java.io.IOException; import java.util.List; +import java.util.Map; + +import static org.elasticsearch.xpack.esql.index.EsIndexSerializationTests.randomIndexNameWithModes; public class EsRelationSerializationTests extends AbstractLogicalPlanSerializationTests { public static EsRelation randomEsRelation() { - Source source = randomSource(); - EsIndex index = EsIndexSerializationTests.randomEsIndex(); - List attributes = randomFieldAttributes(0, 10, false); - IndexMode indexMode = randomFrom(IndexMode.values()); - boolean frozen = randomBoolean(); - return new EsRelation(source, index, attributes, indexMode, frozen); + return new EsRelation( + randomSource(), + randomIdentifier(), + randomFrom(IndexMode.values()), + randomIndexNameWithModes(), + randomFieldAttributes(0, 10, false) + ); } @Override @@ -33,18 +36,18 @@ protected EsRelation createTestInstance() { @Override protected EsRelation mutateInstance(EsRelation instance) throws IOException { - EsIndex index = instance.index(); - List attributes = instance.output(); + String indexPattern = instance.indexPattern(); IndexMode indexMode = instance.indexMode(); - boolean frozen = instance.frozen(); + Map indexNameWithModes = instance.indexNameWithModes(); + List attributes = instance.output(); switch (between(0, 3)) { - case 0 -> index = randomValueOtherThan(index, EsIndexSerializationTests::randomEsIndex); - case 1 -> attributes = randomValueOtherThan(attributes, () -> randomFieldAttributes(0, 10, false)); - case 2 -> indexMode = randomValueOtherThan(indexMode, () -> randomFrom(IndexMode.values())); - case 3 -> frozen = false == frozen; + case 0 -> indexPattern = randomValueOtherThan(indexPattern, ESTestCase::randomIdentifier); + case 1 -> indexMode = randomValueOtherThan(indexMode, () -> randomFrom(IndexMode.values())); + case 2 -> indexNameWithModes = randomValueOtherThan(indexNameWithModes, EsIndexSerializationTests::randomIndexNameWithModes); + case 3 -> attributes = randomValueOtherThan(attributes, () -> randomFieldAttributes(0, 10, false)); default -> throw new IllegalArgumentException(); } - return new EsRelation(instance.source(), index, attributes, indexMode, frozen); + return new EsRelation(instance.source(), indexPattern, indexMode, indexNameWithModes, attributes); } @Override diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExecSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExecSerializationTests.java index 6104069769085..eb53a57d3bdfb 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExecSerializationTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExecSerializationTests.java @@ -14,24 +14,28 @@ import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.Literal; -import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; -import org.elasticsearch.xpack.esql.index.EsIndex; import org.elasticsearch.xpack.esql.index.EsIndexSerializationTests; import java.io.IOException; import java.util.List; +import java.util.Map; + +import static org.elasticsearch.xpack.esql.index.EsIndexSerializationTests.randomIndexNameWithModes; public class EsQueryExecSerializationTests extends AbstractPhysicalPlanSerializationTests { public static EsQueryExec randomEsQueryExec() { - Source source = randomSource(); - EsIndex index = EsIndexSerializationTests.randomEsIndex(); - IndexMode indexMode = randomFrom(IndexMode.values()); - List attrs = randomFieldAttributes(1, 10, false); - QueryBuilder query = randomQuery(); - Expression limit = new Literal(randomSource(), between(0, Integer.MAX_VALUE), DataType.INTEGER); - Integer estimatedRowSize = randomEstimatedRowSize(); - return new EsQueryExec(source, index, indexMode, attrs, query, limit, EsQueryExec.NO_SORTS, estimatedRowSize); + return new EsQueryExec( + randomSource(), + randomIdentifier(), + randomFrom(IndexMode.values()), + randomIndexNameWithModes(), + randomFieldAttributes(1, 10, false), + randomQuery(), + new Literal(randomSource(), between(0, Integer.MAX_VALUE), DataType.INTEGER), + EsQueryExec.NO_SORTS, + randomEstimatedRowSize() + ); } public static QueryBuilder randomQuery() { @@ -45,27 +49,39 @@ protected EsQueryExec createTestInstance() { @Override protected EsQueryExec mutateInstance(EsQueryExec instance) throws IOException { - EsIndex index = instance.index(); + String indexPattern = instance.indexPattern(); IndexMode indexMode = instance.indexMode(); + Map indexNameWithModes = instance.indexNameWithModes(); List attrs = instance.attrs(); QueryBuilder query = instance.query(); Expression limit = instance.limit(); Integer estimatedRowSize = instance.estimatedRowSize(); - switch (between(0, 5)) { - case 0 -> index = randomValueOtherThan(index, EsIndexSerializationTests::randomEsIndex); + switch (between(0, 6)) { + case 0 -> indexPattern = randomValueOtherThan(indexPattern, EsIndexSerializationTests::randomIdentifier); case 1 -> indexMode = randomValueOtherThan(indexMode, () -> randomFrom(IndexMode.values())); - case 2 -> attrs = randomValueOtherThan(attrs, () -> randomFieldAttributes(1, 10, false)); - case 3 -> query = randomValueOtherThan(query, EsQueryExecSerializationTests::randomQuery); - case 4 -> limit = randomValueOtherThan( + case 2 -> indexNameWithModes = randomValueOtherThan(indexNameWithModes, EsIndexSerializationTests::randomIndexNameWithModes); + case 3 -> attrs = randomValueOtherThan(attrs, () -> randomFieldAttributes(1, 10, false)); + case 4 -> query = randomValueOtherThan(query, EsQueryExecSerializationTests::randomQuery); + case 5 -> limit = randomValueOtherThan( limit, () -> new Literal(randomSource(), between(0, Integer.MAX_VALUE), DataType.INTEGER) ); - case 5 -> estimatedRowSize = randomValueOtherThan( + case 6 -> estimatedRowSize = randomValueOtherThan( estimatedRowSize, AbstractPhysicalPlanSerializationTests::randomEstimatedRowSize ); } - return new EsQueryExec(instance.source(), index, indexMode, attrs, query, limit, EsQueryExec.NO_SORTS, estimatedRowSize); + return new EsQueryExec( + instance.source(), + indexPattern, + indexMode, + indexNameWithModes, + attrs, + query, + limit, + EsQueryExec.NO_SORTS, + estimatedRowSize + ); } @Override diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/EsSourceExecSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/EsSourceExecSerializationTests.java index 253127cc7ee95..a072a2e23a506 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/EsSourceExecSerializationTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/EsSourceExecSerializationTests.java @@ -10,24 +10,26 @@ import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.esql.core.expression.Attribute; -import org.elasticsearch.xpack.esql.core.tree.Source; -import org.elasticsearch.xpack.esql.index.EsIndex; import org.elasticsearch.xpack.esql.index.EsIndexSerializationTests; import java.io.IOException; import java.util.List; +import java.util.Map; -import static org.elasticsearch.xpack.esql.plan.logical.AbstractLogicalPlanSerializationTests.randomFieldAttributes; +import static org.elasticsearch.xpack.esql.index.EsIndexSerializationTests.randomIndexNameWithModes; public class EsSourceExecSerializationTests extends AbstractPhysicalPlanSerializationTests { public static EsSourceExec randomEsSourceExec() { - Source source = randomSource(); - EsIndex index = EsIndexSerializationTests.randomEsIndex(); - List attributes = randomFieldAttributes(1, 10, false); - QueryBuilder query = new TermQueryBuilder(randomAlphaOfLength(5), randomAlphaOfLength(5)); - IndexMode indexMode = randomFrom(IndexMode.values()); - return new EsSourceExec(source, index, attributes, query, indexMode); + return new EsSourceExec( + randomSource(), + randomIdentifier(), + randomFrom(IndexMode.values()), + randomIndexNameWithModes(), + randomFieldAttributes(1, 10, false), + new TermQueryBuilder(randomAlphaOfLength(5), randomAlphaOfLength(5)) + ); } @Override @@ -37,18 +39,20 @@ protected EsSourceExec createTestInstance() { @Override protected EsSourceExec mutateInstance(EsSourceExec instance) throws IOException { - EsIndex index = instance.index(); + String indexPattern = instance.indexPattern(); + IndexMode indexMode = instance.indexMode(); + Map indexNameWithModes = instance.indexNameWithModes(); List attributes = instance.output(); QueryBuilder query = instance.query(); - IndexMode indexMode = instance.indexMode(); - switch (between(0, 3)) { - case 0 -> index = randomValueOtherThan(index, EsIndexSerializationTests::randomEsIndex); - case 1 -> attributes = randomValueOtherThan(attributes, () -> randomFieldAttributes(1, 10, false)); - case 2 -> query = randomValueOtherThan(query, () -> new TermQueryBuilder(randomAlphaOfLength(5), randomAlphaOfLength(5))); - case 3 -> indexMode = randomValueOtherThan(indexMode, () -> randomFrom(IndexMode.values())); + switch (between(0, 4)) { + case 0 -> indexPattern = randomValueOtherThan(indexPattern, ESTestCase::randomIdentifier); + case 1 -> indexMode = randomValueOtherThan(indexMode, () -> randomFrom(IndexMode.values())); + case 2 -> indexNameWithModes = randomValueOtherThan(indexNameWithModes, EsIndexSerializationTests::randomIndexNameWithModes); + case 3 -> attributes = randomValueOtherThan(attributes, () -> randomFieldAttributes(1, 10, false)); + case 4 -> query = randomValueOtherThan(query, () -> new TermQueryBuilder(randomAlphaOfLength(5), randomAlphaOfLength(5))); default -> throw new IllegalStateException(); } - return new EsSourceExec(instance.source(), index, attributes, query, indexMode); + return new EsSourceExec(instance.source(), indexPattern, indexMode, indexNameWithModes, attributes, query); } @Override diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/ExchangeSinkExecSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/ExchangeSinkExecSerializationTests.java index f8e12cd4f5ba9..1e930e1da82e8 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/ExchangeSinkExecSerializationTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/ExchangeSinkExecSerializationTests.java @@ -25,7 +25,11 @@ import java.io.IOException; import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.IntStream; +import static java.util.stream.Collectors.toMap; import static org.elasticsearch.test.ByteSizeEqualsMatcher.byteSizeEquals; import static org.hamcrest.Matchers.equalTo; @@ -66,14 +70,15 @@ protected boolean alwaysEmptySource() { * See {@link #testManyTypeConflicts(boolean, ByteSizeValue)} for more. */ public void testManyTypeConflicts() throws IOException { - testManyTypeConflicts(false, ByteSizeValue.ofBytes(1424046L)); /* * History: * 2.3mb - shorten error messages for UnsupportedAttributes #111973 * 1.8mb - cache EsFields #112008 * 1.4mb - string serialization #112929 * 1424046b - remove node-level plan #117422 + * 1040607b - remove EsIndex mapping serialization #119580 */ + testManyTypeConflicts(false, ByteSizeValue.ofBytes(1040607)); } /** @@ -81,7 +86,6 @@ public void testManyTypeConflicts() throws IOException { * See {@link #testManyTypeConflicts(boolean, ByteSizeValue)} for more. */ public void testManyTypeConflictsWithParent() throws IOException { - testManyTypeConflicts(true, ByteSizeValue.ofBytes(2774190)); /* * History: * 2 gb+ - start @@ -91,7 +95,9 @@ public void testManyTypeConflictsWithParent() throws IOException { * 2774214b - string serialization #112929 * 2774192b - remove field attribute #112881 * 2774190b - remove node-level plan #117422 + * 2007288b - remove EsIndex mapping serialization #119580 */ + testManyTypeConflicts(true, ByteSizeValue.ofBytes(2007288)); } private void testManyTypeConflicts(boolean withParent, ByteSizeValue expected) throws IOException { @@ -105,19 +111,19 @@ private void testManyTypeConflicts(boolean withParent, ByteSizeValue expected) t * with a single root field that has many children, grandchildren etc. */ public void testDeeplyNestedFields() throws IOException { - ByteSizeValue expected = ByteSizeValue.ofBytes(47252409); /* * History: * 48223371b - string serialization #112929 * 47252411b - remove field attribute #112881 - * 47252409b - remove node-level plan + * 47252409b - remove node-level plan #117422 + * 43927169b - remove EsIndex mapping serialization #119580 */ int depth = 6; int childrenPerLevel = 8; EsIndex index = EsIndexSerializationTests.deeplyNestedIndex(depth, childrenPerLevel); - testSerializePlanWithIndex(index, expected); + testSerializePlanWithIndex(index, ByteSizeValue.ofBytes(43927169)); } /** @@ -126,19 +132,39 @@ public void testDeeplyNestedFields() throws IOException { * with a single root field that has many children, grandchildren etc. */ public void testDeeplyNestedFieldsKeepOnlyOne() throws IOException { - ByteSizeValue expected = ByteSizeValue.ofBytes(9425804); /* * History: * 9426058b - string serialization #112929 * 9425806b - remove field attribute #112881 * 9425804b - remove node-level plan #117422 + * 352b - remove EsIndex mapping serialization #119580 */ int depth = 6; int childrenPerLevel = 9; EsIndex index = EsIndexSerializationTests.deeplyNestedIndex(depth, childrenPerLevel); - testSerializePlanWithIndex(index, expected, false); + testSerializePlanWithIndex(index, ByteSizeValue.ofBytes(352), false); + } + + /** + * Test the size of serializing a plan like + * FROM index* | LIMIT 10 | KEEP one_single_field + * with an index pattern pointing to a hundred actual indices with rather long names + */ + public void testIndexPatternTargetingMultipleIndices() throws IOException { + /* + * History: 4996b - initial + */ + + var index = new EsIndex( + "index*", + Map.of(), + IntStream.range(0, 100) + .mapToObj(i -> "partial-.ds-index-service-logs-2025.01.01-000" + i) + .collect(toMap(Function.identity(), i -> IndexMode.STANDARD)) + ); + testSerializePlanWithIndex(index, ByteSizeValue.ofBytes(4996)); } /** @@ -165,8 +191,8 @@ private void testSerializePlanWithIndex(EsIndex index, ByteSizeValue expected) t private void testSerializePlanWithIndex(EsIndex index, ByteSizeValue expected, boolean keepAllFields) throws IOException { List allAttributes = Analyzer.mappingAsAttributes(randomSource(), index.mapping()); - List keepAttributes = keepAllFields ? allAttributes : List.of(allAttributes.get(0)); - EsRelation relation = new EsRelation(randomSource(), index, keepAttributes, IndexMode.STANDARD); + List keepAttributes = keepAllFields || allAttributes.isEmpty() ? allAttributes : List.of(allAttributes.getFirst()); + EsRelation relation = new EsRelation(randomSource(), index.name(), IndexMode.STANDARD, index.indexNameWithModes(), keepAttributes); Limit limit = new Limit(randomSource(), new Literal(randomSource(), 10, DataType.INTEGER), relation); Project project = new Project(randomSource(), limit, limit.output()); FragmentExec fragmentExec = new FragmentExec(project); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java index 337d2c77ae5a5..4ef51d44b9b34 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java @@ -85,7 +85,17 @@ public void testLuceneSourceOperatorHugeRowSize() throws IOException { int estimatedRowSize = randomEstimatedRowSize(estimatedRowSizeIsHuge); LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan( FoldContext.small(), - new EsQueryExec(Source.EMPTY, index(), IndexMode.STANDARD, List.of(), null, null, null, estimatedRowSize) + new EsQueryExec( + Source.EMPTY, + index().name(), + IndexMode.STANDARD, + index().indexNameWithModes(), + List.of(), + null, + null, + null, + estimatedRowSize + ) ); assertThat(plan.driverFactories.size(), lessThanOrEqualTo(pragmas.taskConcurrency())); LocalExecutionPlanner.DriverSupplier supplier = plan.driverFactories.get(0).driverSupplier(); @@ -101,7 +111,17 @@ public void testLuceneTopNSourceOperator() throws IOException { Literal limit = new Literal(Source.EMPTY, 10, DataType.INTEGER); LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan( FoldContext.small(), - new EsQueryExec(Source.EMPTY, index(), IndexMode.STANDARD, List.of(), null, limit, List.of(sort), estimatedRowSize) + new EsQueryExec( + Source.EMPTY, + index().name(), + IndexMode.STANDARD, + index().indexNameWithModes(), + List.of(), + null, + limit, + List.of(sort), + estimatedRowSize + ) ); assertThat(plan.driverFactories.size(), lessThanOrEqualTo(pragmas.taskConcurrency())); LocalExecutionPlanner.DriverSupplier supplier = plan.driverFactories.get(0).driverSupplier(); @@ -117,7 +137,17 @@ public void testLuceneTopNSourceOperatorDistanceSort() throws IOException { Literal limit = new Literal(Source.EMPTY, 10, DataType.INTEGER); LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan( FoldContext.small(), - new EsQueryExec(Source.EMPTY, index(), IndexMode.STANDARD, List.of(), null, limit, List.of(sort), estimatedRowSize) + new EsQueryExec( + Source.EMPTY, + index().name(), + IndexMode.STANDARD, + index().indexNameWithModes(), + List.of(), + null, + limit, + List.of(sort), + estimatedRowSize + ) ); assertThat(plan.driverFactories.size(), lessThanOrEqualTo(pragmas.taskConcurrency())); LocalExecutionPlanner.DriverSupplier supplier = plan.driverFactories.get(0).driverSupplier(); diff --git a/x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsIndexModeCustomSettingsIT.java b/x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsIndexModeCustomSettingsIT.java index 99acbec04551e..b5a3ff482c3cf 100644 --- a/x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsIndexModeCustomSettingsIT.java +++ b/x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsIndexModeCustomSettingsIT.java @@ -122,6 +122,14 @@ public void testConfigureStoredSourceBeforeIndexCreation() throws IOException { var mapping = getMapping(client, getDataStreamBackingIndex(client, "logs-custom-dev", 0)); String sourceMode = (String) subObject("_source").apply(mapping).get("mode"); assertThat(sourceMode, equalTo("stored")); + + request = new Request("GET", "/_migration/deprecations"); + var nodeSettings = (Map) ((List) entityAsMap(client.performRequest(request)).get("node_settings")).getFirst(); + assertThat(nodeSettings.get("message"), equalTo(SourceFieldMapper.DEPRECATION_WARNING)); + assertThat( + (String) nodeSettings.get("details"), + containsString(SourceFieldMapper.DEPRECATION_WARNING + " Affected component templates: [logs@custom]") + ); } public void testConfigureDisabledSourceBeforeIndexCreation() { @@ -196,6 +204,14 @@ public void testConfigureStoredSourceWhenIndexIsCreated() throws IOException { var mapping = getMapping(client, getDataStreamBackingIndex(client, "logs-custom-dev", 0)); String sourceMode = (String) subObject("_source").apply(mapping).get("mode"); assertThat(sourceMode, equalTo("stored")); + + request = new Request("GET", "/_migration/deprecations"); + var nodeSettings = (Map) ((List) entityAsMap(client.performRequest(request)).get("node_settings")).getFirst(); + assertThat(nodeSettings.get("message"), equalTo(SourceFieldMapper.DEPRECATION_WARNING)); + assertThat( + (String) nodeSettings.get("details"), + containsString(SourceFieldMapper.DEPRECATION_WARNING + " Affected component templates: [logs@custom]") + ); } public void testConfigureDisabledSourceWhenIndexIsCreated() throws IOException {