From c4f95de80c5b56c157b68025affa6a2a5e9875f2 Mon Sep 17 00:00:00 2001 From: Rajiv Kumar Vaidyanathan Date: Thu, 13 Jun 2024 20:20:26 +0530 Subject: [PATCH 1/2] enabling term version check on local state for all admin read actions Signed-off-by: Rajiv Kumar Vaidyanathan --- .../opensearch/action/IndicesRequestIT.java | 35 ++++++++++- .../AdmissionForClusterManagerIT.java | 26 +++++++- .../health/TransportClusterHealthAction.java | 5 ++ .../state/TransportClusterStateAction.java | 5 -- .../TransportPendingClusterTasksAction.java | 5 ++ .../TransportClusterManagerNodeAction.java | 60 ++++++++++++------- ...TransportClusterManagerNodeReadAction.java | 5 ++ 7 files changed, 110 insertions(+), 31 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/action/IndicesRequestIT.java b/server/src/internalClusterTest/java/org/opensearch/action/IndicesRequestIT.java index 84d833569edcb..fb0bfb7ba27ce 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/IndicesRequestIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/IndicesRequestIT.java @@ -84,6 +84,8 @@ import org.opensearch.action.search.SearchResponse; import org.opensearch.action.search.SearchTransportService; import org.opensearch.action.search.SearchType; +import org.opensearch.action.support.clustermanager.term.GetTermVersionAction; +import org.opensearch.action.support.clustermanager.term.GetTermVersionResponse; import org.opensearch.action.support.replication.TransportReplicationActionTests; import org.opensearch.action.termvectors.MultiTermVectorsAction; import org.opensearch.action.termvectors.MultiTermVectorsRequest; @@ -93,6 +95,8 @@ import org.opensearch.action.update.UpdateRequest; import org.opensearch.action.update.UpdateResponse; import org.opensearch.client.Requests; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.coordination.ClusterStateTermVersion; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.ThreadContext; @@ -124,6 +128,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -195,6 +200,7 @@ public void cleanUp() { } public void testGetFieldMappings() { + String getFieldMappingsShardAction = GetFieldMappingsAction.NAME + "[index][s]"; interceptTransportActions(getFieldMappingsShardAction); @@ -546,6 +552,7 @@ public void testDeleteIndex() { public void testGetMappings() { interceptTransportActions(GetMappingsAction.NAME); + stubClusterTermResponse(internalCluster().getClusterManagerName()); GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(randomIndicesOrAliases()); internalCluster().coordOnlyNodeClient().admin().indices().getMappings(getMappingsRequest).actionGet(); @@ -565,8 +572,9 @@ public void testPutMapping() { } public void testGetSettings() { - interceptTransportActions(GetSettingsAction.NAME); + interceptTransportActions(GetSettingsAction.NAME); + stubClusterTermResponse(internalCluster().getClusterManagerName()); GetSettingsRequest getSettingsRequest = new GetSettingsRequest().indices(randomIndicesOrAliases()); internalCluster().coordOnlyNodeClient().admin().indices().getSettings(getSettingsRequest).actionGet(); @@ -781,6 +789,7 @@ public List getTransportInterceptors( } private final Set actions = new HashSet<>(); + private final Map stubHandlers = new ConcurrentHashMap<>(); private final Map> requests = new HashMap<>(); @@ -804,6 +813,11 @@ synchronized void interceptTransportActions(String... actions) { synchronized void clearInterceptedActions() { actions.clear(); + stubHandlers.clear(); + } + + synchronized void stub(String action, TransportRequestHandler handler) { + stubHandlers.put(action, handler); } private class InterceptingRequestHandler implements TransportRequestHandler { @@ -830,8 +844,25 @@ public void messageReceived(T request, TransportChannel channel, Task task) thro } } } - requestHandler.messageReceived(request, channel, task); + if (!stubHandlers.containsKey(action)) { + requestHandler.messageReceived(request, channel, task); + } else { + stubHandlers.get(action).messageReceived(request, channel, task); + } + } } } + + private void stubClusterTermResponse(String master) { + PluginsService pluginsService = internalCluster().getInstance(PluginsService.class, master); + pluginsService.filterPlugins(InterceptingTransportService.TestPlugin.class).stream().findFirst().get().instance.stub( + GetTermVersionAction.NAME, + (request, channel, task) -> channel.sendResponse( + new GetTermVersionResponse(new ClusterStateTermVersion(new ClusterName("test"), "1", -1, -1)) + ) + ); + + } + } diff --git a/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java b/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java index b9da5ffb86af0..21d89771e5fa6 100644 --- a/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java @@ -12,7 +12,11 @@ import org.apache.logging.log4j.Logger; import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest; import org.opensearch.action.admin.indices.alias.get.GetAliasesResponse; +import org.opensearch.action.support.clustermanager.term.GetTermVersionAction; +import org.opensearch.action.support.clustermanager.term.GetTermVersionResponse; import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.coordination.ClusterStateTermVersion; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; @@ -20,6 +24,7 @@ import org.opensearch.node.IoUsageStats; import org.opensearch.node.ResourceUsageCollectorService; import org.opensearch.node.resource.tracker.ResourceTrackerSettings; +import org.opensearch.plugins.Plugin; import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; @@ -29,9 +34,13 @@ import org.opensearch.rest.action.admin.indices.RestGetAliasesAction; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.rest.FakeRestRequest; +import org.opensearch.test.transport.MockTransportService; +import org.opensearch.transport.TransportService; import org.junit.Before; +import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; @@ -62,6 +71,10 @@ public class AdmissionForClusterManagerIT extends OpenSearchIntegTestCase { .put(CLUSTER_ADMIN_CPU_USAGE_LIMIT.getKey(), 50) .build(); + protected Collection> nodePlugins() { + return List.of(MockTransportService.TestPlugin.class); + } + @Before public void init() { String clusterManagerNode = internalCluster().startClusterManagerOnlyNode( @@ -82,12 +95,14 @@ public void init() { } public void testAdmissionControlEnforced() throws Exception { + stubClusterTermResponse(internalCluster().getClusterManagerName()); cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 99, new IoUsageStats(98)); // Write API on ClusterManager assertAcked(prepareCreate("test").setMapping("field", "type=text").setAliases("{\"alias1\" : {}}")); - + stubClusterTermResponse(internalCluster().getClusterManagerName()); // Read API on ClusterManager + GetAliasesRequest aliasesRequest = new GetAliasesRequest(); aliasesRequest.aliases("alias1"); try { @@ -156,6 +171,7 @@ public void admissionControlDisabledOnBreach(Settings admission) throws Interrup } public void testAdmissionControlResponseStatus() throws Exception { + stubClusterTermResponse(internalCluster().getClusterManagerName()); cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 99, new IoUsageStats(98)); // Write API on ClusterManager @@ -195,4 +211,12 @@ Map getAdmissionControlStats(AdmissionControlS } return acStats; } + + private void stubClusterTermResponse(String master) { + MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, master); + primaryService.addRequestHandlingBehavior(GetTermVersionAction.NAME, (handler, request, channel, task) -> { + channel.sendResponse(new GetTermVersionResponse(new ClusterStateTermVersion(new ClusterName("test"), "1", -1, -1))); + }); + } + } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/health/TransportClusterHealthAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/health/TransportClusterHealthAction.java index 1cc357a4c20f4..f69f462372888 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/health/TransportClusterHealthAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/health/TransportClusterHealthAction.java @@ -534,4 +534,9 @@ private ClusterHealthResponse clusterHealth( pendingTaskTimeInQueue ); } + + @Override + protected boolean localExecuteSupportedByAction() { + return false; + } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java index cae465a90446e..cdb00b584c5dd 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java @@ -233,9 +233,4 @@ private ClusterStateResponse buildResponse(final ClusterStateRequest request, fi return new ClusterStateResponse(currentState.getClusterName(), builder.build(), false); } - - @Override - protected boolean localExecuteSupportedByAction() { - return true; - } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/tasks/TransportPendingClusterTasksAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/tasks/TransportPendingClusterTasksAction.java index 5d5053cc80738..01846ef46c1ed 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/tasks/TransportPendingClusterTasksAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/tasks/TransportPendingClusterTasksAction.java @@ -110,4 +110,9 @@ protected void clusterManagerOperation( logger.trace("done fetching pending tasks from cluster service"); listener.onResponse(new PendingClusterTasksResponse(pendingTasks)); } + + @Override + protected boolean localExecuteSupportedByAction() { + return false; + } } diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java index 080b0d607e991..4e869f29878cd 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java @@ -267,24 +267,7 @@ protected void doStart(ClusterState clusterState) { final DiscoveryNodes nodes = clusterState.nodes(); if (nodes.isLocalNodeElectedClusterManager() || localExecute(request)) { // check for block, if blocked, retry, else, execute locally - final ClusterBlockException blockException = checkBlock(request, clusterState); - if (blockException != null) { - if (!blockException.retryable()) { - listener.onFailure(blockException); - } else { - logger.debug("can't execute due to a cluster block, retrying", blockException); - retry(clusterState, blockException, newState -> { - try { - ClusterBlockException newException = checkBlock(request, newState); - return (newException == null || !newException.retryable()); - } catch (Exception e) { - // accept state as block will be rechecked by doStart() and listener.onFailure() then called - logger.trace("exception occurred during cluster block checking, accepting state", e); - return true; - } - }); - } - } else { + if (!checkForBlock(request, clusterState)) { threadPool.executor(executor) .execute( ActionRunnable.wrap( @@ -422,12 +405,43 @@ public GetTermVersionResponse read(StreamInput in) throws IOException { }; } + private boolean checkForBlock(Request request, ClusterState localClusterState) { + final ClusterBlockException blockException = checkBlock(request, localClusterState); + if (blockException != null) { + if (!blockException.retryable()) { + listener.onFailure(blockException); + } else { + logger.debug("can't execute due to a cluster block, retrying", blockException); + retry(localClusterState, blockException, newState -> { + try { + ClusterBlockException newException = checkBlock(request, newState); + return (newException == null || !newException.retryable()); + } catch (Exception e) { + // accept state as block will be rechecked by doStart() and listener.onFailure() then called + logger.trace("exception occurred during cluster block checking, accepting state", e); + return true; + } + }); + } + return true; + } else { + return false; + } + } + private void executeOnLocalNode(ClusterState localClusterState) { - Runnable runTask = ActionRunnable.wrap( - getDelegateForLocalExecute(localClusterState), - l -> clusterManagerOperation(task, request, localClusterState, l) - ); - threadPool.executor(executor).execute(runTask); + try { + // check for block, if blocked, retry, else, execute locally + if (!checkForBlock(request, localClusterState)) { + Runnable runTask = ActionRunnable.wrap( + getDelegateForLocalExecute(localClusterState), + l -> clusterManagerOperation(task, request, localClusterState, l) + ); + threadPool.executor(executor).execute(runTask); + } + } catch (Exception e) { + listener.onFailure(e); + } } private void executeOnClusterManager(DiscoveryNode clusterManagerNode, ClusterState clusterState) { diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeReadAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeReadAction.java index d58487a475bcf..98b18c79a3c8d 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeReadAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeReadAction.java @@ -124,4 +124,9 @@ protected TransportClusterManagerNodeReadAction( protected final boolean localExecute(Request request) { return request.local(); } + + protected boolean localExecuteSupportedByAction() { + return true; + } + } From 92463f432079a569929d547991ee106708b29f6b Mon Sep 17 00:00:00 2001 From: Rajiv Kumar Vaidyanathan Date: Tue, 9 Jul 2024 20:58:43 +0530 Subject: [PATCH 2/2] enable term check selectively for transport actions based on code audit Signed-off-by: Rajiv Kumar Vaidyanathan --- CHANGELOG.md | 1 + .../opensearch/action/IndicesRequestIT.java | 54 ++--- .../AdmissionForClusterManagerIT.java | 32 ++- .../TransportGetDecommissionStateAction.java | 3 +- .../get/TransportGetRepositoriesAction.java | 3 +- .../TransportClusterSearchShardsAction.java | 3 +- .../TransportGetWeightedRoutingAction.java | 3 +- .../state/TransportClusterStateAction.java | 1 + .../TransportGetStoredScriptAction.java | 3 +- .../alias/get/TransportGetAliasesAction.java | 3 +- .../indices/TransportIndicesExistsAction.java | 3 +- .../TransportIndicesShardStoresAction.java | 3 +- .../TransportGetComponentTemplateAction.java | 3 +- ...sportGetComposableIndexTemplateAction.java | 3 +- .../get/TransportGetIndexTemplatesAction.java | 3 +- .../ingest/GetPipelineTransportAction.java | 3 +- .../GetSearchPipelineTransportAction.java | 3 +- ...TransportClusterManagerNodeReadAction.java | 20 +- .../info/TransportClusterInfoAction.java | 1 + .../mapping/get/GetMappingsActionTests.java | 227 ++++++++++++++++++ 20 files changed, 315 insertions(+), 60 deletions(-) create mode 100644 server/src/test/java/org/opensearch/action/admin/indices/mapping/get/GetMappingsActionTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index ab0c80e37e14c..e77b183601674 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Refactor remote-routing-table service inline with remote state interfaces([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668)) - Add prefix mode verification setting for repository verification (([#14790](https://github.com/opensearch-project/OpenSearch/pull/14790))) - Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call ([14749](https://github.com/opensearch-project/OpenSearch/pull/14749)) +- Enabling term version check on local state for all ClusterManager Read Transport Actions ([#14273](https://github.com/opensearch-project/OpenSearch/pull/14273)) ### Dependencies - Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442)) diff --git a/server/src/internalClusterTest/java/org/opensearch/action/IndicesRequestIT.java b/server/src/internalClusterTest/java/org/opensearch/action/IndicesRequestIT.java index fb0bfb7ba27ce..927a79d4884ef 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/IndicesRequestIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/IndicesRequestIT.java @@ -85,7 +85,7 @@ import org.opensearch.action.search.SearchTransportService; import org.opensearch.action.search.SearchType; import org.opensearch.action.support.clustermanager.term.GetTermVersionAction; -import org.opensearch.action.support.clustermanager.term.GetTermVersionResponse; +import org.opensearch.action.support.clustermanager.term.GetTermVersionRequest; import org.opensearch.action.support.replication.TransportReplicationActionTests; import org.opensearch.action.termvectors.MultiTermVectorsAction; import org.opensearch.action.termvectors.MultiTermVectorsRequest; @@ -95,8 +95,6 @@ import org.opensearch.action.update.UpdateRequest; import org.opensearch.action.update.UpdateResponse; import org.opensearch.client.Requests; -import org.opensearch.cluster.ClusterName; -import org.opensearch.cluster.coordination.ClusterStateTermVersion; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.ThreadContext; @@ -128,7 +126,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -551,14 +548,14 @@ public void testDeleteIndex() { } public void testGetMappings() { - interceptTransportActions(GetMappingsAction.NAME); - stubClusterTermResponse(internalCluster().getClusterManagerName()); - + interceptTransportActions(GetTermVersionAction.NAME, GetMappingsAction.NAME); GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(randomIndicesOrAliases()); internalCluster().coordOnlyNodeClient().admin().indices().getMappings(getMappingsRequest).actionGet(); clearInterceptedActions(); - assertSameIndices(getMappingsRequest, GetMappingsAction.NAME); + + assertActionInvocation(GetTermVersionAction.NAME, GetTermVersionRequest.class); + assertNoActionInvocation(GetMappingsAction.NAME); } public void testPutMapping() { @@ -574,7 +571,6 @@ public void testPutMapping() { public void testGetSettings() { interceptTransportActions(GetSettingsAction.NAME); - stubClusterTermResponse(internalCluster().getClusterManagerName()); GetSettingsRequest getSettingsRequest = new GetSettingsRequest().indices(randomIndicesOrAliases()); internalCluster().coordOnlyNodeClient().admin().indices().getSettings(getSettingsRequest).actionGet(); @@ -670,6 +666,21 @@ private static void assertSameIndices(IndicesRequest originalRequest, boolean op } } + private static void assertActionInvocation(String action, Class requestClass) { + List requests = consumeTransportRequests(action); + assertFalse(requests.isEmpty()); + for (TransportRequest internalRequest : requests) { + assertTrue(internalRequest.getClass() == requestClass); + } + } + + private static void assertNoActionInvocation(String... actions) { + for (String action : actions) { + List requests = consumeTransportRequests(action); + assertTrue(requests.isEmpty()); + } + } + private static void assertIndicesSubset(List indices, String... actions) { // indices returned by each bulk shard request need to be a subset of the original indices for (String action : actions) { @@ -789,8 +800,6 @@ public List getTransportInterceptors( } private final Set actions = new HashSet<>(); - private final Map stubHandlers = new ConcurrentHashMap<>(); - private final Map> requests = new HashMap<>(); @Override @@ -813,11 +822,6 @@ synchronized void interceptTransportActions(String... actions) { synchronized void clearInterceptedActions() { actions.clear(); - stubHandlers.clear(); - } - - synchronized void stub(String action, TransportRequestHandler handler) { - stubHandlers.put(action, handler); } private class InterceptingRequestHandler implements TransportRequestHandler { @@ -844,25 +848,9 @@ public void messageReceived(T request, TransportChannel channel, Task task) thro } } } - if (!stubHandlers.containsKey(action)) { - requestHandler.messageReceived(request, channel, task); - } else { - stubHandlers.get(action).messageReceived(request, channel, task); - } + requestHandler.messageReceived(request, channel, task); } } } - - private void stubClusterTermResponse(String master) { - PluginsService pluginsService = internalCluster().getInstance(PluginsService.class, master); - pluginsService.filterPlugins(InterceptingTransportService.TestPlugin.class).stream().findFirst().get().instance.stub( - GetTermVersionAction.NAME, - (request, channel, task) -> channel.sendResponse( - new GetTermVersionResponse(new ClusterStateTermVersion(new ClusterName("test"), "1", -1, -1)) - ) - ); - - } - } diff --git a/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java b/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java index 21d89771e5fa6..e3a4216e772fb 100644 --- a/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java @@ -15,8 +15,8 @@ import org.opensearch.action.support.clustermanager.term.GetTermVersionAction; import org.opensearch.action.support.clustermanager.term.GetTermVersionResponse; import org.opensearch.client.node.NodeClient; -import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.coordination.ClusterStateTermVersion; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; @@ -92,15 +92,32 @@ public void init() { // Enable admission control client().admin().cluster().prepareUpdateSettings().setTransientSettings(ENFORCE_ADMISSION_CONTROL).execute().actionGet(); + MockTransportService primaryService = (MockTransportService) internalCluster().getInstance( + TransportService.class, + clusterManagerNode + ); + + // Force always fetch from ClusterManager + ClusterService clusterService = internalCluster().clusterService(); + GetTermVersionResponse oosTerm = new GetTermVersionResponse( + new ClusterStateTermVersion( + clusterService.state().getClusterName(), + clusterService.state().metadata().clusterUUID(), + clusterService.state().term() - 1, + clusterService.state().version() - 1 + ) + ); + primaryService.addRequestHandlingBehavior( + GetTermVersionAction.NAME, + (handler, request, channel, task) -> channel.sendResponse(oosTerm) + ); } public void testAdmissionControlEnforced() throws Exception { - stubClusterTermResponse(internalCluster().getClusterManagerName()); cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 99, new IoUsageStats(98)); // Write API on ClusterManager assertAcked(prepareCreate("test").setMapping("field", "type=text").setAliases("{\"alias1\" : {}}")); - stubClusterTermResponse(internalCluster().getClusterManagerName()); // Read API on ClusterManager GetAliasesRequest aliasesRequest = new GetAliasesRequest(); @@ -171,7 +188,6 @@ public void admissionControlDisabledOnBreach(Settings admission) throws Interrup } public void testAdmissionControlResponseStatus() throws Exception { - stubClusterTermResponse(internalCluster().getClusterManagerName()); cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 99, new IoUsageStats(98)); // Write API on ClusterManager @@ -211,12 +227,4 @@ Map getAdmissionControlStats(AdmissionControlS } return acStats; } - - private void stubClusterTermResponse(String master) { - MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, master); - primaryService.addRequestHandlingBehavior(GetTermVersionAction.NAME, (handler, request, channel, task) -> { - channel.sendResponse(new GetTermVersionResponse(new ClusterStateTermVersion(new ClusterName("test"), "1", -1, -1))); - }); - } - } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/TransportGetDecommissionStateAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/TransportGetDecommissionStateAction.java index 22feb4d99297a..c8a3be78a790e 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/TransportGetDecommissionStateAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/TransportGetDecommissionStateAction.java @@ -48,7 +48,8 @@ public TransportGetDecommissionStateAction( threadPool, actionFilters, GetDecommissionStateRequest::new, - indexNameExpressionResolver + indexNameExpressionResolver, + true ); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/repositories/get/TransportGetRepositoriesAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/repositories/get/TransportGetRepositoriesAction.java index c7d784dbc96e7..c99b52dfe34f4 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/repositories/get/TransportGetRepositoriesAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/repositories/get/TransportGetRepositoriesAction.java @@ -79,7 +79,8 @@ public TransportGetRepositoriesAction( threadPool, actionFilters, GetRepositoriesRequest::new, - indexNameExpressionResolver + indexNameExpressionResolver, + true ); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java index a2a65b6400c97..83e104236f640 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java @@ -85,7 +85,8 @@ public TransportClusterSearchShardsAction( threadPool, actionFilters, ClusterSearchShardsRequest::new, - indexNameExpressionResolver + indexNameExpressionResolver, + true ); this.indicesService = indicesService; } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/TransportGetWeightedRoutingAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/TransportGetWeightedRoutingAction.java index 50368d85e0011..6c110c0ea2a73 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/TransportGetWeightedRoutingAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/TransportGetWeightedRoutingAction.java @@ -55,7 +55,8 @@ public TransportGetWeightedRoutingAction( threadPool, actionFilters, ClusterGetWeightedRoutingRequest::new, - indexNameExpressionResolver + indexNameExpressionResolver, + true ); this.weightedRoutingService = weightedRoutingService; } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java index cdb00b584c5dd..13ea7eaa43bf8 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java @@ -92,6 +92,7 @@ public TransportClusterStateAction( ClusterStateRequest::new, indexNameExpressionResolver ); + this.localExecuteSupported = true; } @Override diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportGetStoredScriptAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportGetStoredScriptAction.java index db1f1edde2812..c34ec49406802 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportGetStoredScriptAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportGetStoredScriptAction.java @@ -73,7 +73,8 @@ public TransportGetStoredScriptAction( threadPool, actionFilters, GetStoredScriptRequest::new, - indexNameExpressionResolver + indexNameExpressionResolver, + true ); this.scriptService = scriptService; } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/alias/get/TransportGetAliasesAction.java b/server/src/main/java/org/opensearch/action/admin/indices/alias/get/TransportGetAliasesAction.java index 3aca9c1976f16..4f4e3bd481ee7 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/alias/get/TransportGetAliasesAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/alias/get/TransportGetAliasesAction.java @@ -86,7 +86,8 @@ public TransportGetAliasesAction( threadPool, actionFilters, GetAliasesRequest::new, - indexNameExpressionResolver + indexNameExpressionResolver, + true ); this.systemIndices = systemIndices; } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/exists/indices/TransportIndicesExistsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/exists/indices/TransportIndicesExistsAction.java index 428a0eb35513d..a298eae1aa865 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/exists/indices/TransportIndicesExistsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/exists/indices/TransportIndicesExistsAction.java @@ -71,7 +71,8 @@ public TransportIndicesExistsAction( threadPool, actionFilters, IndicesExistsRequest::new, - indexNameExpressionResolver + indexNameExpressionResolver, + true ); } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java b/server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java index 3fbf9ac1bb570..a8b97d0f344ae 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java @@ -105,7 +105,8 @@ public TransportIndicesShardStoresAction( threadPool, actionFilters, IndicesShardStoresRequest::new, - indexNameExpressionResolver + indexNameExpressionResolver, + true ); this.listShardStoresInfo = listShardStoresInfo; } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/template/get/TransportGetComponentTemplateAction.java b/server/src/main/java/org/opensearch/action/admin/indices/template/get/TransportGetComponentTemplateAction.java index e2594cd792cd3..c3217d109044d 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/template/get/TransportGetComponentTemplateAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/template/get/TransportGetComponentTemplateAction.java @@ -76,7 +76,8 @@ public TransportGetComponentTemplateAction( threadPool, actionFilters, GetComponentTemplateAction.Request::new, - indexNameExpressionResolver + indexNameExpressionResolver, + true ); } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/template/get/TransportGetComposableIndexTemplateAction.java b/server/src/main/java/org/opensearch/action/admin/indices/template/get/TransportGetComposableIndexTemplateAction.java index b1ef32db7274f..84fbb59481c10 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/template/get/TransportGetComposableIndexTemplateAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/template/get/TransportGetComposableIndexTemplateAction.java @@ -76,7 +76,8 @@ public TransportGetComposableIndexTemplateAction( threadPool, actionFilters, GetComposableIndexTemplateAction.Request::new, - indexNameExpressionResolver + indexNameExpressionResolver, + true ); } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/template/get/TransportGetIndexTemplatesAction.java b/server/src/main/java/org/opensearch/action/admin/indices/template/get/TransportGetIndexTemplatesAction.java index 10b4975f7b9d0..522234dda509f 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/template/get/TransportGetIndexTemplatesAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/template/get/TransportGetIndexTemplatesAction.java @@ -76,7 +76,8 @@ public TransportGetIndexTemplatesAction( threadPool, actionFilters, GetIndexTemplatesRequest::new, - indexNameExpressionResolver + indexNameExpressionResolver, + true ); } diff --git a/server/src/main/java/org/opensearch/action/ingest/GetPipelineTransportAction.java b/server/src/main/java/org/opensearch/action/ingest/GetPipelineTransportAction.java index 80333c7346f92..7bc0380bccbc0 100644 --- a/server/src/main/java/org/opensearch/action/ingest/GetPipelineTransportAction.java +++ b/server/src/main/java/org/opensearch/action/ingest/GetPipelineTransportAction.java @@ -70,7 +70,8 @@ public GetPipelineTransportAction( threadPool, actionFilters, GetPipelineRequest::new, - indexNameExpressionResolver + indexNameExpressionResolver, + true ); } diff --git a/server/src/main/java/org/opensearch/action/search/GetSearchPipelineTransportAction.java b/server/src/main/java/org/opensearch/action/search/GetSearchPipelineTransportAction.java index a7fcb8f1cfbae..215b7ae1a610c 100644 --- a/server/src/main/java/org/opensearch/action/search/GetSearchPipelineTransportAction.java +++ b/server/src/main/java/org/opensearch/action/search/GetSearchPipelineTransportAction.java @@ -48,7 +48,8 @@ public GetSearchPipelineTransportAction( threadPool, actionFilters, GetSearchPipelineRequest::new, - indexNameExpressionResolver + indexNameExpressionResolver, + true ); } diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeReadAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeReadAction.java index 98b18c79a3c8d..88cb2ed6a9bf0 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeReadAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeReadAction.java @@ -51,6 +51,8 @@ public abstract class TransportClusterManagerNodeReadAction< Request extends ClusterManagerNodeReadRequest, Response extends ActionResponse> extends TransportClusterManagerNodeAction { + protected boolean localExecuteSupported = false; + protected TransportClusterManagerNodeReadAction( String actionName, TransportService transportService, @@ -58,7 +60,8 @@ protected TransportClusterManagerNodeReadAction( ThreadPool threadPool, ActionFilters actionFilters, Writeable.Reader request, - IndexNameExpressionResolver indexNameExpressionResolver + IndexNameExpressionResolver indexNameExpressionResolver, + boolean localExecuteSupported ) { this( actionName, @@ -71,6 +74,19 @@ protected TransportClusterManagerNodeReadAction( request, indexNameExpressionResolver ); + this.localExecuteSupported = localExecuteSupported; + } + + protected TransportClusterManagerNodeReadAction( + String actionName, + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + Writeable.Reader request, + IndexNameExpressionResolver indexNameExpressionResolver + ) { + this(actionName, transportService, clusterService, threadPool, actionFilters, request, indexNameExpressionResolver, false); } protected TransportClusterManagerNodeReadAction( @@ -126,7 +142,7 @@ protected final boolean localExecute(Request request) { } protected boolean localExecuteSupportedByAction() { - return true; + return localExecuteSupported; } } diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/info/TransportClusterInfoAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/info/TransportClusterInfoAction.java index 65f00a4731ab5..8a0082ad05f66 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/info/TransportClusterInfoAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/info/TransportClusterInfoAction.java @@ -62,6 +62,7 @@ public TransportClusterInfoAction( IndexNameExpressionResolver indexNameExpressionResolver ) { super(actionName, transportService, clusterService, threadPool, actionFilters, request, indexNameExpressionResolver); + this.localExecuteSupported = true; } @Override diff --git a/server/src/test/java/org/opensearch/action/admin/indices/mapping/get/GetMappingsActionTests.java b/server/src/test/java/org/opensearch/action/admin/indices/mapping/get/GetMappingsActionTests.java new file mode 100644 index 0000000000000..87f218760038e --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/indices/mapping/get/GetMappingsActionTests.java @@ -0,0 +1,227 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.action.admin.indices.mapping.get; + +import org.opensearch.Version; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.clustermanager.term.GetTermVersionResponse; +import org.opensearch.action.support.replication.ClusterStateCreationUtils; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.block.ClusterBlock; +import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.block.ClusterBlocks; +import org.opensearch.cluster.coordination.ClusterStateTermVersion; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsFilter; +import org.opensearch.common.settings.SettingsModule; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.indices.IndicesService; +import org.opensearch.telemetry.tracing.noop.NoopTracer; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.transport.CapturingTransport; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; + +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.concurrent.TimeUnit; + +import static java.util.Collections.emptyList; +import static java.util.Collections.emptySet; +import static org.opensearch.test.ClusterServiceUtils.createClusterService; +import static org.opensearch.test.ClusterServiceUtils.setState; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; + +public class GetMappingsActionTests extends OpenSearchTestCase { + private TransportService transportService; + private ClusterService clusterService; + private ThreadPool threadPool; + private SettingsFilter settingsFilter; + private final String indexName = "test_index"; + CapturingTransport capturingTransport = new CapturingTransport(); + private DiscoveryNode localNode; + private DiscoveryNode remoteNode; + private DiscoveryNode[] allNodes; + private TransportGetMappingsAction transportAction = null; + + @Before + public void setUp() throws Exception { + super.setUp(); + + settingsFilter = new SettingsModule(Settings.EMPTY, emptyList(), emptyList(), emptySet()).getSettingsFilter(); + threadPool = new TestThreadPool("GetIndexActionTests"); + clusterService = createClusterService(threadPool); + + transportService = capturingTransport.createTransportService( + clusterService.getSettings(), + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> clusterService.localNode(), + null, + emptySet(), + NoopTracer.INSTANCE + ); + transportService.start(); + transportService.acceptIncomingRequests(); + + localNode = new DiscoveryNode( + "local_node", + buildNewFakeTransportAddress(), + Collections.emptyMap(), + Collections.singleton(DiscoveryNodeRole.DATA_ROLE), + Version.CURRENT + ); + remoteNode = new DiscoveryNode( + "remote_node", + buildNewFakeTransportAddress(), + Collections.emptyMap(), + Collections.singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE), + Version.CURRENT + ); + allNodes = new DiscoveryNode[] { localNode, remoteNode }; + setState(clusterService, ClusterStateCreationUtils.state(localNode, remoteNode, allNodes)); + transportAction = new TransportGetMappingsAction( + GetMappingsActionTests.this.transportService, + GetMappingsActionTests.this.clusterService, + GetMappingsActionTests.this.threadPool, + new ActionFilters(emptySet()), + new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)), + mock(IndicesService.class) + ); + + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + clusterService.close(); + transportService.close(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + } + + public void testGetTransportWithoutMatchingTerm() { + transportAction.execute(null, new GetMappingsRequest(), ActionListener.wrap(Assert::assertNotNull, exception -> { + throw new AssertionError(exception); + })); + assertThat(capturingTransport.capturedRequests().length, equalTo(1)); + CapturingTransport.CapturedRequest capturedRequest = capturingTransport.capturedRequests()[0]; + // mismatch term and version + GetTermVersionResponse termResp = new GetTermVersionResponse( + new ClusterStateTermVersion( + clusterService.state().getClusterName(), + clusterService.state().metadata().clusterUUID(), + clusterService.state().term() - 1, + clusterService.state().version() - 1 + ) + ); + capturingTransport.handleResponse(capturedRequest.requestId, termResp); + + assertThat(capturingTransport.capturedRequests().length, equalTo(2)); + CapturingTransport.CapturedRequest capturedRequest1 = capturingTransport.capturedRequests()[1]; + + capturingTransport.handleResponse(capturedRequest1.requestId, new GetMappingsResponse(new HashMap<>())); + } + + public void testGetTransportWithMatchingTerm() { + transportAction.execute(null, new GetMappingsRequest(), ActionListener.wrap(Assert::assertNotNull, exception -> { + throw new AssertionError(exception); + })); + assertThat(capturingTransport.capturedRequests().length, equalTo(1)); + CapturingTransport.CapturedRequest capturedRequest = capturingTransport.capturedRequests()[0]; + GetTermVersionResponse termResp = new GetTermVersionResponse( + new ClusterStateTermVersion( + clusterService.state().getClusterName(), + clusterService.state().metadata().clusterUUID(), + clusterService.state().term(), + clusterService.state().version() + ) + ); + capturingTransport.handleResponse(capturedRequest.requestId, termResp); + + // no more transport calls + assertThat(capturingTransport.capturedRequests().length, equalTo(1)); + } + + public void testGetTransportClusterBlockWithMatchingTerm() { + ClusterBlock readClusterBlock = new ClusterBlock( + 1, + "uuid", + "", + false, + true, + true, + RestStatus.OK, + EnumSet.of(ClusterBlockLevel.METADATA_READ) + ); + ClusterBlocks.Builder builder = ClusterBlocks.builder(); + builder.addGlobalBlock(readClusterBlock); + ClusterState metadataReadBlockedState = ClusterState.builder(ClusterStateCreationUtils.state(localNode, remoteNode, allNodes)) + .blocks(builder) + .build(); + setState(clusterService, metadataReadBlockedState); + + transportAction.execute( + null, + new GetMappingsRequest(), + ActionListener.wrap(response -> { throw new AssertionError(response); }, exception -> { + Assert.assertTrue(exception instanceof ClusterBlockException); + }) + ); + assertThat(capturingTransport.capturedRequests().length, equalTo(1)); + CapturingTransport.CapturedRequest capturedRequest = capturingTransport.capturedRequests()[0]; + GetTermVersionResponse termResp = new GetTermVersionResponse( + new ClusterStateTermVersion( + clusterService.state().getClusterName(), + clusterService.state().metadata().clusterUUID(), + clusterService.state().term(), + clusterService.state().version() + ) + ); + capturingTransport.handleResponse(capturedRequest.requestId, termResp); + + // no more transport calls + assertThat(capturingTransport.capturedRequests().length, equalTo(1)); + } +}