Mark read-only flush and verify (elastic#119743)
When marking an index read-only, we now also flush and mark the index as verified, guaranteeing
that we can safely upgrade to the next version with N-1 indices (which then become N-2).
This is also used in the deprecation check.
henningandersen committed Jan 21, 2025
1 parent ba57820 commit 1f26512
Showing 15 changed files with 296 additions and 21 deletions.
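
Before the individual diffs, here is a minimal sketch of the resulting behavior, modelled on the rolling-upgrade test added below. The low-level REST client setup and the index name "my-index" are placeholders, not part of this commit.

import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

// Sketch only, assuming an already-configured low-level RestClient.
static void markReadOnlyAndCheck(RestClient client) throws Exception {
    // Adding the write block now also flushes the shards and records a
    // "verified read-only" marker in the index settings.
    client.performRequest(new Request("PUT", "/my-index/_block/write"));

    // The marker is exposed as an index setting; with ?flat_settings it can be read
    // straight out of the settings map. The key comes from
    // MetadataIndexStateService.VERIFIED_READ_ONLY_SETTING.getKey(), see the test below.
    Response response = client.performRequest(new Request("GET", "/my-index/_settings?flat_settings"));
    // ... parse the JSON body and assert the setting value is "true",
    // as AddIndexBlockRollingUpgradeIT#verifiedSettingValue() does.
}
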
5 changes: 5 additions & 0 deletions docs/changelog/119743.yaml
@@ -0,0 +1,5 @@
pr: 119743
summary: POC mark read-only
area: Engine
type: enhancement
issues: []
@@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.upgrades;

import io.netty.handler.codec.http.HttpMethod;

import com.carrotsearch.randomizedtesting.annotations.Name;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.cluster.metadata.MetadataIndexStateService;
import org.hamcrest.Matchers;

import java.io.IOException;
import java.util.Map;

public class AddIndexBlockRollingUpgradeIT extends AbstractRollingUpgradeTestCase {

private static final String INDEX_NAME = "test_add_block";

public AddIndexBlockRollingUpgradeIT(@Name("upgradedNodes") int upgradedNodes) {
super(upgradedNodes);
}

public void testAddBlock() throws Exception {
if (isOldCluster()) {
createIndex(INDEX_NAME);
} else if (isMixedCluster()) {
blockWrites();
// this is used both for upgrading from 9.0.0 to current and from 8.18 to current.
if (minimumTransportVersion().before(TransportVersions.ADD_INDEX_BLOCK_TWO_PHASE)) {
assertNull(verifiedSettingValue());
} else {
assertThat(verifiedSettingValue(), Matchers.equalTo("true"));
}
} else {
assertTrue(isUpgradedCluster());
blockWrites();
assertThat(verifiedSettingValue(), Matchers.equalTo("true"));
}
}

private static void blockWrites() throws IOException {
client().performRequest(new Request(HttpMethod.PUT.name(), "/" + INDEX_NAME + "/_block/write"));

expectThrows(
ResponseException.class,
() -> client().performRequest(
newXContentRequest(HttpMethod.PUT, "/" + INDEX_NAME + "/_doc/test", (builder, params) -> builder.field("test", "test"))
)
);
}

@SuppressWarnings("unchecked")
private static String verifiedSettingValue() throws IOException {
final var settingsRequest = new Request(HttpMethod.GET.name(), "/" + INDEX_NAME + "/_settings?flat_settings");
final Map<String, Object> settingsResponse = entityAsMap(client().performRequest(settingsRequest));
return (String) ((Map<String, Object>) ((Map<String, Object>) settingsResponse.get(INDEX_NAME)).get("settings")).get(
MetadataIndexStateService.VERIFIED_READ_ONLY_SETTING.getKey()
);
}
}
@@ -16,14 +16,21 @@
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.SimpleBatchedExecutor;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.MetadataIndexStateService;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ESIntegTestCase;
@@ -266,6 +273,74 @@ public void testAddIndexBlock() throws Exception {
assertHitCount(prepareSearch(indexName).setSize(0), nbDocs);
}

public void testReAddUnverifiedIndexBlock() {
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createIndex(indexName);
ensureGreen(indexName);

final int nbDocs = randomIntBetween(0, 50);
indexRandom(
randomBoolean(),
false,
randomBoolean(),
IntStream.range(0, nbDocs).mapToObj(i -> prepareIndex(indexName).setId(String.valueOf(i)).setSource("num", i)).collect(toList())
);

final APIBlock block = APIBlock.WRITE;
try {
AddIndexBlockResponse response = indicesAdmin().prepareAddBlock(block, indexName).get();
assertTrue("Add block [" + block + "] to index [" + indexName + "] not acknowledged: " + response, response.isAcknowledged());
assertIndexHasBlock(block, indexName);

removeVerified(indexName);

AddIndexBlockResponse response2 = indicesAdmin().prepareAddBlock(block, indexName).get();
assertTrue("Add block [" + block + "] to index [" + indexName + "] not acknowledged: " + response, response2.isAcknowledged());
assertIndexHasBlock(block, indexName);
} finally {
disableIndexBlock(indexName, block);
}

}

private static void removeVerified(String indexName) {
PlainActionFuture<Void> listener = new PlainActionFuture<>();
internalCluster().clusterService(internalCluster().getMasterName())
.createTaskQueue("test", Priority.NORMAL, new SimpleBatchedExecutor<>() {
@Override
public Tuple<ClusterState, Object> executeTask(
ClusterStateTaskListener clusterStateTaskListener,
ClusterState clusterState
) {

IndexMetadata indexMetadata = clusterState.metadata().index(indexName);
Settings.Builder settingsBuilder = Settings.builder().put(indexMetadata.getSettings());
settingsBuilder.remove(MetadataIndexStateService.VERIFIED_READ_ONLY_SETTING.getKey());
return Tuple.tuple(
ClusterState.builder(clusterState)
.metadata(
Metadata.builder(clusterState.metadata())
.put(
IndexMetadata.builder(indexMetadata)
.settings(settingsBuilder)
.settingsVersion(indexMetadata.getSettingsVersion() + 1)
)
)
.build(),
null
);
}

@Override
public void taskSucceeded(ClusterStateTaskListener clusterStateTaskListener, Object ignored) {
listener.onResponse(null);
}
})
.submitTask("test", e -> fail(e), null);

listener.actionGet();
}

public void testSameBlockTwice() throws Exception {
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createIndex(indexName);
@@ -452,6 +527,9 @@ static void assertIndexHasBlock(APIBlock block, final String... indices) {
.count(),
equalTo(1L)
);
if (block.getBlock().contains(ClusterBlockLevel.WRITE)) {
assertThat(MetadataIndexStateService.VERIFIED_READ_ONLY_SETTING.get(indexSettings), is(true));
}
}
}

2 changes: 2 additions & 0 deletions server/src/main/java/org/elasticsearch/TransportVersions.java
@@ -164,6 +164,8 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_PROFILE_ROWS_PROCESSED = def(8_824_00_0);
public static final TransportVersion BYTE_SIZE_VALUE_ALWAYS_USES_BYTES_1 = def(8_825_00_0);
public static final TransportVersion REVERT_BYTE_SIZE_VALUE_ALWAYS_USES_BYTES_1 = def(8_826_00_0);
public static final TransportVersion ESQL_SKIP_ES_INDEX_SERIALIZATION = def(8_827_00_0);
public static final TransportVersion ADD_INDEX_BLOCK_TWO_PHASE = def(8_828_00_0);

/*
* STOP! READ THIS FIRST! No, really,
@@ -21,6 +21,7 @@ public record AddIndexBlockClusterStateUpdateRequest(
TimeValue masterNodeTimeout,
TimeValue ackTimeout,
APIBlock block,
boolean markVerified,
long taskId,
Index[] indices
) {
@@ -9,6 +9,7 @@

package org.elasticsearch.action.admin.indices.readonly;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
@@ -32,12 +33,18 @@ public class AddIndexBlockRequest extends AcknowledgedRequest<AddIndexBlockReque
private final APIBlock block;
private String[] indices;
private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen();
private boolean markVerified = true;

public AddIndexBlockRequest(StreamInput in) throws IOException {
super(in);
indices = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
block = APIBlock.readFrom(in);
if (in.getTransportVersion().onOrAfter(TransportVersions.ADD_INDEX_BLOCK_TWO_PHASE)) {
markVerified = in.readBoolean();
} else {
markVerified = false;
}
}

/**
@@ -103,6 +110,15 @@ public AddIndexBlockRequest indicesOptions(IndicesOptions indicesOptions) {
return this;
}

public boolean markVerified() {
return markVerified;
}

public AddIndexBlockRequest markVerified(boolean markVerified) {
this.markVerified = markVerified;
return this;
}

/**
* Returns the block to be added
*/
@@ -116,6 +132,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeStringArray(indices);
indicesOptions.writeIndicesOptions(out);
block.writeTo(out);
if (out.getTransportVersion().onOrAfter(TransportVersions.ADD_INDEX_BLOCK_TWO_PHASE)) {
out.writeBoolean(markVerified);
}
}

@Override
@@ -136,4 +155,5 @@ public int hashCode() {
result = 31 * result + Arrays.hashCode(indices);
return result;
}

}
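
Since markVerified defaults to true, existing callers pick up the new flush-and-verify behavior automatically. As an illustrative sketch only (the builder-based construction reuses the indicesAdmin()/prepareAddBlock test helpers seen earlier in this commit, and "my-index" is a placeholder), a caller could opt out like this:

// Sketch: only markVerified(...) is new in this change.
AddIndexBlockRequest request = indicesAdmin()
    .prepareAddBlock(APIBlock.WRITE, "my-index")
    .request()
    .markVerified(false); // opt out of recording the verified read-only marker
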
@@ -107,6 +107,7 @@ protected void masterOperation(
request.masterNodeTimeout(),
request.ackTimeout(),
request.getBlock(),
request.markVerified(),
task.getId(),
concreteIndices
),
@@ -8,15 +8,18 @@
*/
package org.elasticsearch.action.admin.indices.readonly;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -121,7 +124,7 @@ protected void shardOperationOnReplica(ShardRequest shardRequest, IndexShard rep
});
}

private void executeShardOperation(final ShardRequest request, final IndexShard indexShard) {
private void executeShardOperation(final ShardRequest request, final IndexShard indexShard) throws IOException {
final ShardId shardId = indexShard.shardId();
if (indexShard.getActiveOperationsCount() != IndexShard.OPERATIONS_BLOCKED) {
throw new IllegalStateException(
Expand All @@ -133,6 +136,15 @@ private void executeShardOperation(final ShardRequest request, final IndexShard
if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), request.clusterBlock()) == false) {
throw new IllegalStateException("index shard " + shardId + " has not applied block " + request.clusterBlock());
}

// same pattern as in TransportVerifyShardBeforeCloseAction, but could also flush in phase1.
if (request.phase1()) {
indexShard.sync();
} else {
if (request.clusterBlock().contains(ClusterBlockLevel.WRITE)) {
indexShard.flush(new FlushRequest().force(true).waitIfOngoing(true));
}
}
}

@Override
@@ -160,31 +172,45 @@ public void markShardCopyAsStaleIfNeeded(
public static final class ShardRequest extends ReplicationRequest<ShardRequest> {

private final ClusterBlock clusterBlock;
private final boolean phase1;

ShardRequest(StreamInput in) throws IOException {
super(in);
clusterBlock = new ClusterBlock(in);
if (in.getTransportVersion().onOrAfter(TransportVersions.ADD_INDEX_BLOCK_TWO_PHASE)) {
phase1 = in.readBoolean();
} else {
phase1 = true; // does not matter, not verified anyway
}
}

public ShardRequest(final ShardId shardId, final ClusterBlock clusterBlock, final TaskId parentTaskId) {
public ShardRequest(final ShardId shardId, final ClusterBlock clusterBlock, boolean phase1, final TaskId parentTaskId) {
super(shardId);
this.clusterBlock = Objects.requireNonNull(clusterBlock);
this.phase1 = phase1;
setParentTask(parentTaskId);
}

@Override
public String toString() {
return "verify shard " + shardId + " before block with " + clusterBlock;
return "verify shard " + shardId + " before block with " + clusterBlock + " phase1=" + phase1;
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
clusterBlock.writeTo(out);
if (out.getTransportVersion().onOrAfter(TransportVersions.ADD_INDEX_BLOCK_TWO_PHASE)) {
out.writeBoolean(phase1);
}
}

public ClusterBlock clusterBlock() {
return clusterBlock;
}

public boolean phase1() {
return phase1;
}
}
}
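
The orchestration that issues these shard-level requests in two phases is not among the diffs loaded in this view. Purely as a hedged illustration of how the new phase1 constructor parameter is meant to be used (shardId, clusterBlock and parentTaskId are placeholders):

// Illustration only; the real call sites are not shown in this truncated view.
// Phase 1: once the block is in the cluster state, each shard copy syncs its translog.
var syncRequest = new ShardRequest(shardId, clusterBlock, true, parentTaskId);
// Phase 2: write-level blocks additionally force-flush each shard copy, after which the
// index can be marked with MetadataIndexStateService.VERIFIED_READ_ONLY_SETTING.
var flushRequest = new ShardRequest(shardId, clusterBlock, false, parentTaskId);
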
(Remaining changed files not loaded in this view.)
