Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into poc_mark_read_only
Browse files Browse the repository at this point in the history
  • Loading branch information
henningandersen committed Jan 21, 2025
2 parents 2b2f329 + f4bbe75 commit 80c66f8
Show file tree
Hide file tree
Showing 47 changed files with 723 additions and 422 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/119580.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 119580
summary: Do not serialize `EsIndex` in plan
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@
import java.net.SocketException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
Expand Down Expand Up @@ -84,37 +81,6 @@ static void urlOpenConnectionWithProxy() throws URISyntaxException, IOException
assert urlConnection != null;
}

static void httpClientSend() throws InterruptedException {
try (HttpClient httpClient = HttpClient.newBuilder().build()) {
// Shutdown the client, so the send action will shortcut before actually executing any network operation
// (but after it run our check in the prologue)
httpClient.shutdown();
try {
httpClient.send(HttpRequest.newBuilder(URI.create("http://localhost")).build(), HttpResponse.BodyHandlers.discarding());
} catch (IOException e) {
// Expected, since we shut down the client.
// "send" will be called and exercise the Entitlement check, we don't care if it fails afterward for this known reason.
}
}
}

static void httpClientSendAsync() {
try (HttpClient httpClient = HttpClient.newBuilder().build()) {
// Shutdown the client, so the send action will return before actually executing any network operation
// (but after it run our check in the prologue)
httpClient.shutdown();
var future = httpClient.sendAsync(
HttpRequest.newBuilder(URI.create("http://localhost")).build(),
HttpResponse.BodyHandlers.discarding()
);
assert future.isCompletedExceptionally();
future.exceptionally(ex -> {
assert ex instanceof IOException;
return null;
});
}
}

static void createLDAPCertStore() throws NoSuchAlgorithmException {
try {
// We pass down null params to provoke a InvalidAlgorithmParameterException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ static CheckAction alwaysDenied(CheckedRunnable<Exception> action) {
entry("server_socket_accept", forPlugins(NetworkAccessCheckActions::serverSocketAccept)),

entry("url_open_connection_proxy", forPlugins(NetworkAccessCheckActions::urlOpenConnectionWithProxy)),
entry("http_client_send", forPlugins(NetworkAccessCheckActions::httpClientSend)),
entry("http_client_send_async", forPlugins(NetworkAccessCheckActions::httpClientSendAsync)),
entry("http_client_send", forPlugins(VersionSpecificNetworkChecks::httpClientSend)),
entry("http_client_send_async", forPlugins(VersionSpecificNetworkChecks::httpClientSendAsync)),
entry("create_ldap_cert_store", forPlugins(NetworkAccessCheckActions::createLDAPCertStore)),

entry("server_socket_channel_bind", forPlugins(NetworkAccessCheckActions::serverSocketChannelBind)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,26 @@

package org.elasticsearch.entitlement.qa.common;

import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class VersionSpecificNetworkChecks {
static void createInetAddressResolverProvider() {}

static void httpClientSend() throws InterruptedException {
HttpClient httpClient = HttpClient.newBuilder().build();
try {
httpClient.send(HttpRequest.newBuilder(URI.create("http://localhost")).build(), HttpResponse.BodyHandlers.discarding());
} catch (IOException e) {
// Expected, the send action may fail with these parameters (but after it run the entitlement check in the prologue)
}
}

static void httpClientSendAsync() {
HttpClient httpClient = HttpClient.newBuilder().build();
httpClient.sendAsync(HttpRequest.newBuilder(URI.create("http://localhost")).build(), HttpResponse.BodyHandlers.discarding());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@

package org.elasticsearch.entitlement.qa.common;

import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.spi.InetAddressResolver;
import java.net.spi.InetAddressResolverProvider;

Expand All @@ -26,4 +31,18 @@ public String name() {
}
};
}

static void httpClientSend() throws InterruptedException {
HttpClient httpClient = HttpClient.newBuilder().build();
try {
httpClient.send(HttpRequest.newBuilder(URI.create("http://localhost")).build(), HttpResponse.BodyHandlers.discarding());
} catch (IOException e) {
// Expected, the send action may fail with these parameters (but after it run the entitlement check in the prologue)
}
}

static void httpClientSendAsync() {
HttpClient httpClient = HttpClient.newBuilder().build();
httpClient.sendAsync(HttpRequest.newBuilder(URI.create("http://localhost")).build(), HttpResponse.BodyHandlers.discarding());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.entitlement.qa.common;

import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.spi.InetAddressResolver;
import java.net.spi.InetAddressResolverProvider;

class VersionSpecificNetworkChecks {
static void createInetAddressResolverProvider() {
var x = new InetAddressResolverProvider() {
@Override
public InetAddressResolver get(Configuration configuration) {
return null;
}

@Override
public String name() {
return "TEST";
}
};
}

static void httpClientSend() throws InterruptedException {
try (HttpClient httpClient = HttpClient.newBuilder().build()) {
// Shutdown the client, so the send action will shortcut before actually executing any network operation
// (but after it run our check in the prologue)
httpClient.shutdown();
try {
httpClient.send(HttpRequest.newBuilder(URI.create("http://localhost")).build(), HttpResponse.BodyHandlers.discarding());
} catch (IOException e) {
// Expected, since we shut down the client
}
}
}

static void httpClientSendAsync() {
try (HttpClient httpClient = HttpClient.newBuilder().build()) {
// Shutdown the client, so the send action will return before actually executing any network operation
// (but after it run our check in the prologue)
httpClient.shutdown();
var future = httpClient.sendAsync(
HttpRequest.newBuilder(URI.create("http://localhost")).build(),
HttpResponse.BodyHandlers.discarding()
);
assert future.isCompletedExceptionally();
future.exceptionally(ex -> {
assert ex instanceof IOException;
return null;
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,15 @@ public void addTracingHandler(ChunkHandler chunkHandler) {

@Override
public void next() {
assert closing == false : "cannot request next chunk on closing stream";
assert handler != null : "handler must be set before requesting next chunk";
requestContext = threadContext.newStoredContext();
channel.eventLoop().submit(() -> {
activityTracker.startActivity();
requested = true;
try {
if (closing) {
return;
}
if (buf == null) {
channel.read();
} else {
Expand Down
17 changes: 11 additions & 6 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,6 @@ tests:
issue: https://github.com/elastic/elasticsearch/issues/118374
- class: org.elasticsearch.xpack.esql.action.EsqlActionBreakerIT
issue: https://github.com/elastic/elasticsearch/issues/118238
- class: org.elasticsearch.reservedstate.service.FileSettingsServiceTests
method: testInvalidJSON
issue: https://github.com/elastic/elasticsearch/issues/116521
- class: org.elasticsearch.xpack.ccr.rest.ShardChangesRestIT
method: testShardChangesNoOperation
issue: https://github.com/elastic/elasticsearch/issues/118800
Expand Down Expand Up @@ -244,11 +241,19 @@ tests:
- class: org.elasticsearch.search.ccs.CrossClusterIT
method: testCancel
issue: https://github.com/elastic/elasticsearch/issues/108061
- class: org.elasticsearch.http.netty4.Netty4IncrementalRequestHandlingIT
method: testOversizedChunkedEncoding
issue: https://github.com/elastic/elasticsearch/issues/120444
- class: org.elasticsearch.xpack.logsdb.seqno.RetentionLeaseRestIT
issue: https://github.com/elastic/elasticsearch/issues/120434
- class: org.elasticsearch.entitlement.qa.EntitlementsAllowedIT
method: testCheckActionWithPolicyPass {pathPrefix=allowed actionName=create_ldap_cert_store}
issue: https://github.com/elastic/elasticsearch/issues/120422
- class: org.elasticsearch.entitlement.qa.EntitlementsAllowedIT
method: testCheckActionWithPolicyPass {pathPrefix=allowed_nonmodular actionName=create_ldap_cert_store}
issue: https://github.com/elastic/elasticsearch/issues/120423
- class: org.elasticsearch.reservedstate.service.FileSettingsServiceTests
method: testInvalidJSON
issue: https://github.com/elastic/elasticsearch/issues/120482
- class: org.elasticsearch.smoketest.DocsClientYamlTestSuiteIT
issue: https://github.com/elastic/elasticsearch/issues/120497

# Examples:
#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_PROFILE_ROWS_PROCESSED = def(8_824_00_0);
public static final TransportVersion BYTE_SIZE_VALUE_ALWAYS_USES_BYTES_1 = def(8_825_00_0);
public static final TransportVersion REVERT_BYTE_SIZE_VALUE_ALWAYS_USES_BYTES_1 = def(8_826_00_0);
public static final TransportVersion ADD_INDEX_BLOCK_TWO_PHASE = def(8_827_00_0);
public static final TransportVersion ESQL_SKIP_ES_INDEX_SERIALIZATION = def(8_827_00_0);
public static final TransportVersion ADD_INDEX_BLOCK_TWO_PHASE = def(8_828_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
package org.elasticsearch.action.search;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
Expand All @@ -26,6 +25,7 @@
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
Expand All @@ -43,8 +43,7 @@
import org.elasticsearch.transport.Transport;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -98,7 +97,6 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
protected final GroupShardsIterator<SearchShardIterator> toSkipShardsIts;
protected final GroupShardsIterator<SearchShardIterator> shardsIts;
private final SearchShardIterator[] shardIterators;
private final Map<SearchShardIterator, Integer> shardIndexMap;
private final int expectedTotalOps;
private final AtomicInteger totalOps = new AtomicInteger();
private final int maxConcurrentRequestsPerNode;
Expand Down Expand Up @@ -142,17 +140,11 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
this.toSkipShardsIts = new GroupShardsIterator<>(toSkipIterators);
this.shardsIts = new GroupShardsIterator<>(iterators);

// we compute the shard index based on the natural order of the shards
this.shardIterators = iterators.toArray(new SearchShardIterator[0]);
// we later compute the shard index based on the natural order of the shards
// that participate in the search request. This means that this number is
// consistent between two requests that target the same shards.
Map<SearchShardIterator, Integer> shardMap = new HashMap<>();
List<SearchShardIterator> searchIterators = new ArrayList<>(iterators);
CollectionUtil.timSort(searchIterators);
for (int i = 0; i < searchIterators.size(); i++) {
shardMap.put(searchIterators.get(i), i);
}
this.shardIndexMap = Collections.unmodifiableMap(shardMap);
this.shardIterators = searchIterators.toArray(SearchShardIterator[]::new);
Arrays.sort(shardIterators);

// we need to add 1 for non active partition, since we count it in the total. This means for each shard in the iterator we sum up
// it's number of active shards but use 1 as the default if no replica of a shard is active at this point.
Expand Down Expand Up @@ -236,6 +228,10 @@ protected final void run() {
assert iterator.skip();
skipShard(iterator);
}
final Map<SearchShardIterator, Integer> shardIndexMap = Maps.newHashMapWithExpectedSize(shardIterators.length);
for (int i = 0; i < shardIterators.length; i++) {
shardIndexMap.put(shardIterators[i], i);
}
if (shardsIts.size() > 0) {
doCheckNoMissingShards(getName(), request, shardsIts);
for (int i = 0; i < shardsIts.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static org.elasticsearch.core.Strings.format;

Expand Down Expand Up @@ -349,4 +350,16 @@ public void afterFilesRestoredFromRepository(IndexShard indexShard) {
}
}
}

@Override
public void onAcquirePrimaryOperationPermit(IndexShard indexShard, Supplier<ActionListener<Void>> onPermitAcquiredListenerSupplier) {
for (IndexEventListener listener : listeners) {
try {
listener.onAcquirePrimaryOperationPermit(indexShard, onPermitAcquiredListenerSupplier);
} catch (Exception e) {
logger.warn(() -> "[" + indexShard.shardId() + "] failed to invoke the listener on acquiring a primary permit", e);
throw e;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;

import java.util.function.Supplier;

/**
* An index event listener is the primary extension point for plugins and build-in services
* to react / listen to per-index and per-shard events. These listeners are registered per-index
Expand Down Expand Up @@ -190,4 +192,14 @@ default void afterIndexShardRecovery(IndexShard indexShard, ActionListener<Void>
* @param indexShard the shard that is recovering
*/
default void afterFilesRestoredFromRepository(IndexShard indexShard) {}

/**
* Called when a single primary permit is acquired for the given shard (see
* {@link IndexShard#acquirePrimaryOperationPermit(ActionListener, java.util.concurrent.Executor)}).
*
* @param indexShard the shard of which a primary permit is requested
* @param onPermitAcquiredListenerSupplier call this immediately to get a listener when the permit is acquired. The listener must be
* completed in order for the permit to be given to the acquiring operation.
*/
default void onAcquirePrimaryOperationPermit(IndexShard indexShard, Supplier<ActionListener<Void>> onPermitAcquiredListenerSupplier) {}
}
Loading

0 comments on commit 80c66f8

Please sign in to comment.