From d67e3897165e01b226421a2587cee7064dd896f2 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Mon, 18 Apr 2022 10:52:48 +0530 Subject: [PATCH] Delete PIT API changes Signed-off-by: Bharathwaj G --- .../org/opensearch/action/ActionModule.java | 9 +- .../action/search/DeletePITController.java | 126 ------------------ .../action/search/DeletePITRequest.java | 3 + .../action/search/DeletePITResponse.java | 10 +- .../search/TransportDeletePITAction.java | 21 ++- .../java/org/opensearch/client/Client.java | 16 ++- .../client/support/AbstractClient.java | 22 ++- .../action/search/RestDeletePITAction.java | 4 +- .../org/opensearch/search/SearchService.java | 2 +- .../search/DeletePitMultiNodeTests.java | 56 +++++--- .../opensearch/search/SearchServiceTests.java | 27 ++++ 11 files changed, 141 insertions(+), 155 deletions(-) delete mode 100644 server/src/main/java/org/opensearch/action/search/DeletePITController.java diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 67f5a3afb1a1b..350d91a560182 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -402,7 +402,14 @@ import org.opensearch.rest.action.ingest.RestGetPipelineAction; import org.opensearch.rest.action.ingest.RestPutPipelineAction; import org.opensearch.rest.action.ingest.RestSimulatePipelineAction; -import org.opensearch.rest.action.search.*; +import org.opensearch.rest.action.search.RestClearScrollAction; +import org.opensearch.rest.action.search.RestCountAction; +import org.opensearch.rest.action.search.RestCreatePITAction; +import org.opensearch.rest.action.search.RestDeletePITAction; +import org.opensearch.rest.action.search.RestExplainAction; +import org.opensearch.rest.action.search.RestMultiSearchAction; +import org.opensearch.rest.action.search.RestSearchAction; +import org.opensearch.rest.action.search.RestSearchScrollAction; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; import org.opensearch.usage.UsageService; diff --git a/server/src/main/java/org/opensearch/action/search/DeletePITController.java b/server/src/main/java/org/opensearch/action/search/DeletePITController.java deleted file mode 100644 index 88d4a71f77259..0000000000000 --- a/server/src/main/java/org/opensearch/action/search/DeletePITController.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.action.search; - -import org.opensearch.action.ActionListener; -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.node.DiscoveryNodes; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.util.concurrent.CountDown; -import org.opensearch.search.SearchPhaseResult; -import org.opensearch.search.internal.ShardSearchContextId; -import org.opensearch.transport.Transport; -import org.opensearch.transport.TransportResponse; - -import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -public class DeletePITController implements Runnable { - private final DiscoveryNodes nodes; - private final SearchTransportService searchTransportService; - private final CountDown expectedOps; - private final ActionListener listener; - private final AtomicBoolean hasFailed = new AtomicBoolean(false); - private final AtomicInteger freedSearchContexts = new AtomicInteger(0); - private final ClusterService clusterService; - private final Runnable runner; - - public DeletePITController( - DeletePITRequest request, - ActionListener listener, - ClusterService clusterService, - SearchTransportService searchTransportService - ) { - this.nodes = clusterService.state().getNodes(); - this.clusterService = clusterService; - this.searchTransportService = searchTransportService; - this.listener = listener; - List pitIds = request.getPitIds(); - final int expectedOps; - if (pitIds.size() == 1 && "_all".equals(pitIds.get(0))) { - expectedOps = nodes.getSize(); - runner = this::deleteAllPits; - } else { - // TODO: replace this with #closeContexts - List contexts = new ArrayList<>(); - for (String scrollId : request.getPitIds()) { - SearchContextIdForNode[] context = TransportSearchHelper.parseScrollId(scrollId).getContext(); - Collections.addAll(contexts, context); - } - if (contexts.isEmpty()) { - expectedOps = 0; - runner = () -> listener.onResponse(new DeletePITResponse(true)); - } else { - expectedOps = contexts.size(); - runner = () -> ClearScrollController.closeContexts( - clusterService.state().nodes(), - searchTransportService, - contexts, - ActionListener.wrap(r -> listener.onResponse(new DeletePITResponse(true)), listener::onFailure) - ); - } - } - this.expectedOps = new CountDown(expectedOps); - - } - - @Override - public void run() { - runner.run(); - } - - void deleteAllPits() { - for (final DiscoveryNode node : clusterService.state().getNodes()) { - try { - Transport.Connection connection = searchTransportService.getConnection(null, node); - searchTransportService.sendDeleteAllPitContexts(connection, new ActionListener() { - @Override - public void onResponse(TransportResponse response) { - onFreedContext(true); - } - - @Override - public void onFailure(Exception e) { - onFailedFreedContext(e, node); - } - }); - } catch (Exception e) { - onFailedFreedContext(e, node); - } - } - } - - public static class PITSinglePhaseSearchResult extends SearchPhaseResult { - public void setContextId(ShardSearchContextId contextId) { - this.contextId = contextId; - } - } - - private void onFreedContext(boolean freed) { - if (freed) { - freedSearchContexts.incrementAndGet(); - } - if (expectedOps.countDown()) { - boolean succeeded = hasFailed.get() == false; - listener.onResponse(new DeletePITResponse(succeeded)); - } - } - - private void onFailedFreedContext(Throwable e, DiscoveryNode node) { - /* - * We have to set the failure marker before we count down otherwise we can expose the failure marker before we have set it to a - * racing thread successfully freeing a context. This would lead to that thread responding that the clear scroll succeeded. - */ - hasFailed.set(true); - if (expectedOps.countDown()) { - listener.onResponse(new DeletePITResponse(false)); - } - } -} diff --git a/server/src/main/java/org/opensearch/action/search/DeletePITRequest.java b/server/src/main/java/org/opensearch/action/search/DeletePITRequest.java index 2bd8e9681318b..04b3aeb0f6a07 100644 --- a/server/src/main/java/org/opensearch/action/search/DeletePITRequest.java +++ b/server/src/main/java/org/opensearch/action/search/DeletePITRequest.java @@ -29,6 +29,9 @@ */ public class DeletePITRequest extends ActionRequest implements ToXContentObject { + /** + * List of PIT IDs to be deleted , and use "_all" to delete all PIT reader contexts + */ private List pitIds; public DeletePITRequest(StreamInput in) throws IOException { diff --git a/server/src/main/java/org/opensearch/action/search/DeletePITResponse.java b/server/src/main/java/org/opensearch/action/search/DeletePITResponse.java index 388ca99b42402..220f5377bc1ce 100644 --- a/server/src/main/java/org/opensearch/action/search/DeletePITResponse.java +++ b/server/src/main/java/org/opensearch/action/search/DeletePITResponse.java @@ -12,7 +12,12 @@ import org.opensearch.common.ParseField; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.common.xcontent.*; +import org.opensearch.common.xcontent.ConstructingObjectParser; +import org.opensearch.common.xcontent.ObjectParser; +import org.opensearch.common.xcontent.StatusToXContentObject; +import org.opensearch.common.xcontent.ToXContent; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentParser; import org.opensearch.rest.RestStatus; import java.io.IOException; @@ -23,6 +28,9 @@ public class DeletePITResponse extends ActionResponse implements StatusToXContentObject { + /** + * This will be true if all PIT reader contexts are deleted. + */ private final boolean succeeded; public DeletePITResponse(boolean succeeded) { diff --git a/server/src/main/java/org/opensearch/action/search/TransportDeletePITAction.java b/server/src/main/java/org/opensearch/action/search/TransportDeletePITAction.java index c28ff8797a2d0..918b5a791c9f0 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportDeletePITAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportDeletePITAction.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.StepListener; @@ -33,6 +34,9 @@ import java.util.function.BiFunction; import java.util.stream.Collectors; +/** + * Transport action for deleting pit reader context - supports deleting list and all pit contexts + */ public class TransportDeletePITAction extends HandledTransportAction { private SearchService searchService; private final NamedWriteableRegistry namedWriteableRegistry; @@ -74,15 +78,21 @@ protected void doExecute(Task task, DeletePITRequest request, ActionListener new ParameterizedMessage("Delete PITs failed. " + "Cleared {} contexts out of {}", r, contexts.size()) + ); listener.onResponse(new DeletePITResponse(false)); } }, e -> { - logger.debug("Delete PIT failed ", e); + logger.debug("Delete PITs failed ", e); listener.onResponse(new DeletePITResponse(false)); })); } } + /** + * Delete all active PIT reader contexts + */ void deleteAllPits(ActionListener listener) { int size = clusterService.state().getNodes().getSize(); ActionListener groupedActionListener = getGroupedListener(listener, size); @@ -96,11 +106,17 @@ void deleteAllPits(ActionListener listener) { } } + /** + * Delete list of pits, return success if all reader contexts are deleted ( or not found ). + */ void deletePits(List contexts, ActionListener listener) { final StepListener> lookupListener = getLookupListener(contexts); lookupListener.whenComplete(nodeLookup -> { final GroupedActionListener groupedListener = new GroupedActionListener<>( - ActionListener.delegateFailure(listener, (l, rs) -> l.onResponse(Math.toIntExact(rs.stream().filter(r -> r).count()))), + ActionListener.delegateFailure( + listener, + (l, result) -> l.onResponse(Math.toIntExact(result.stream().filter(r -> r).count())) + ), contexts.size() ); @@ -117,6 +133,7 @@ void deletePits(List contexts, ActionListener l ActionListener.wrap(r -> groupedListener.onResponse(r.isFreed()), e -> groupedListener.onResponse(false)) ); } catch (Exception e) { + logger.debug("Delete PIT failed ", e); groupedListener.onResponse(false); } } diff --git a/server/src/main/java/org/opensearch/client/Client.java b/server/src/main/java/org/opensearch/client/Client.java index 5d9c958b3b4b2..472503379fec4 100644 --- a/server/src/main/java/org/opensearch/client/Client.java +++ b/server/src/main/java/org/opensearch/client/Client.java @@ -55,7 +55,21 @@ import org.opensearch.action.index.IndexRequest; import org.opensearch.action.index.IndexRequestBuilder; import org.opensearch.action.index.IndexResponse; -import org.opensearch.action.search.*; +import org.opensearch.action.search.ClearScrollRequest; +import org.opensearch.action.search.ClearScrollRequestBuilder; +import org.opensearch.action.search.ClearScrollResponse; +import org.opensearch.action.search.CreatePITRequest; +import org.opensearch.action.search.CreatePITResponse; +import org.opensearch.action.search.DeletePITRequest; +import org.opensearch.action.search.DeletePITResponse; +import org.opensearch.action.search.MultiSearchRequest; +import org.opensearch.action.search.MultiSearchRequestBuilder; +import org.opensearch.action.search.MultiSearchResponse; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchRequestBuilder; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.search.SearchScrollRequest; +import org.opensearch.action.search.SearchScrollRequestBuilder; import org.opensearch.action.termvectors.MultiTermVectorsRequest; import org.opensearch.action.termvectors.MultiTermVectorsRequestBuilder; import org.opensearch.action.termvectors.MultiTermVectorsResponse; diff --git a/server/src/main/java/org/opensearch/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/client/support/AbstractClient.java index 837a608caaac4..21a5f9ce89c56 100644 --- a/server/src/main/java/org/opensearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/opensearch/client/support/AbstractClient.java @@ -327,7 +327,27 @@ import org.opensearch.action.ingest.SimulatePipelineRequest; import org.opensearch.action.ingest.SimulatePipelineRequestBuilder; import org.opensearch.action.ingest.SimulatePipelineResponse; -import org.opensearch.action.search.*; +import org.opensearch.action.search.ClearScrollAction; +import org.opensearch.action.search.ClearScrollRequest; +import org.opensearch.action.search.ClearScrollRequestBuilder; +import org.opensearch.action.search.ClearScrollResponse; +import org.opensearch.action.search.CreatePITAction; +import org.opensearch.action.search.CreatePITRequest; +import org.opensearch.action.search.CreatePITResponse; +import org.opensearch.action.search.DeletePITAction; +import org.opensearch.action.search.DeletePITRequest; +import org.opensearch.action.search.DeletePITResponse; +import org.opensearch.action.search.MultiSearchAction; +import org.opensearch.action.search.MultiSearchRequest; +import org.opensearch.action.search.MultiSearchRequestBuilder; +import org.opensearch.action.search.MultiSearchResponse; +import org.opensearch.action.search.SearchAction; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchRequestBuilder; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.search.SearchScrollAction; +import org.opensearch.action.search.SearchScrollRequest; +import org.opensearch.action.search.SearchScrollRequestBuilder; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.action.termvectors.MultiTermVectorsAction; diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestDeletePITAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestDeletePITAction.java index 821282d433c0d..26739d3749f92 100644 --- a/server/src/main/java/org/opensearch/rest/action/search/RestDeletePITAction.java +++ b/server/src/main/java/org/opensearch/rest/action/search/RestDeletePITAction.java @@ -21,7 +21,7 @@ import static java.util.Arrays.asList; import static java.util.Collections.unmodifiableList; -import static org.opensearch.rest.RestRequest.Method.*; +import static org.opensearch.rest.RestRequest.Method.DELETE; public class RestDeletePITAction extends BaseRestHandler { @@ -50,6 +50,6 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client @Override public List routes() { - return unmodifiableList(asList(new Route(DELETE, "/_pit"), new Route(DELETE, "/_pit/{id}"))); + return unmodifiableList(asList(new Route(DELETE, "/_search/_point_in_time"), new Route(DELETE, "/_search/_point_in_time/{id}"))); } } diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 0ae58c2314cf5..180627c9d0ee1 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -1025,7 +1025,7 @@ public boolean freeReaderContext(ShardSearchContextId contextId) { } /** - * Free reader context if found otherwise return false + * Free reader context if found , return false if delete reader fails */ public boolean freeReaderContextIfFound(ShardSearchContextId contextId) { try { diff --git a/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java b/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java index e617ba522bb06..a117e891c6e6d 100644 --- a/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java +++ b/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java @@ -11,7 +11,12 @@ import org.junit.After; import org.junit.Before; import org.opensearch.action.ActionFuture; -import org.opensearch.action.search.*; +import org.opensearch.action.search.CreatePITAction; +import org.opensearch.action.search.CreatePITRequest; +import org.opensearch.action.search.CreatePITResponse; +import org.opensearch.action.search.DeletePITAction; +import org.opensearch.action.search.DeletePITRequest; +import org.opensearch.action.search.DeletePITResponse; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.search.builder.PointInTimeBuilder; @@ -62,6 +67,13 @@ public void testDeletePit() throws Exception { ActionFuture deleteExecute = client().execute(DeletePITAction.INSTANCE, deletePITRequest); DeletePITResponse deletePITResponse = deleteExecute.get(); assertTrue(deletePITResponse.isSucceeded()); + /** + * Checking deleting the same PIT id again results in succeeded + */ + deleteExecute = client().execute(DeletePITAction.INSTANCE, deletePITRequest); + deletePITResponse = deleteExecute.get(); + assertTrue(deletePITResponse.isSucceeded()); + } public void testDeletePitWhileNodeDrop() throws Exception { @@ -79,15 +91,19 @@ public void testDeletePitWhileNodeDrop() throws Exception { public Settings onNodeStopped(String nodeName) throws Exception { ActionFuture execute = client().execute(DeletePITAction.INSTANCE, deletePITRequest); DeletePITResponse deletePITResponse = execute.get(); - assertEquals(false, deletePITResponse.isSucceeded()); + assertFalse(deletePITResponse.isSucceeded()); return super.onNodeStopped(nodeName); } }); ensureGreen(); + /** + * When we invoke delete again, returns success after clearing the remaining readers. Asserting reader context + * not found exceptions don't result in failures ( as deletion in one node is successful ) + */ ActionFuture execute = client().execute(DeletePITAction.INSTANCE, deletePITRequest); DeletePITResponse deletePITResponse = execute.get(); - assertEquals(true, deletePITResponse.isSucceeded()); + assertTrue(deletePITResponse.isSucceeded()); client().admin().indices().prepareDelete("index1").get(); } @@ -102,15 +118,19 @@ public void testDeleteAllPitsWhileNodeDrop() throws Exception { public Settings onNodeStopped(String nodeName) throws Exception { ActionFuture execute = client().execute(DeletePITAction.INSTANCE, deletePITRequest); DeletePITResponse deletePITResponse = execute.get(); - assertEquals(false, deletePITResponse.isSucceeded()); + assertFalse(deletePITResponse.isSucceeded()); return super.onNodeStopped(nodeName); } }); ensureGreen(); + /** + * When we invoke delete again, returns success after clearing the remaining readers. Asserting reader context + * not found exceptions don't result in failures ( as deletion in one node is successful ) + */ ActionFuture execute = client().execute(DeletePITAction.INSTANCE, deletePITRequest); DeletePITResponse deletePITResponse = execute.get(); - assertEquals(true, deletePITResponse.isSucceeded()); + assertTrue(deletePITResponse.isSucceeded()); client().admin().indices().prepareDelete("index1").get(); } @@ -120,13 +140,6 @@ public void testDeleteWhileSearch() throws Exception { List pitIds = new ArrayList<>(); pitIds.add(pitResponse.getId()); DeletePITRequest deletePITRequest = new DeletePITRequest(pitIds); - for(int i=0; i<4; i++) { - client().prepareSearch() - .setSize(2) - .setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1))) - .execute(); - } - int count = 0; Thread[] threads = new Thread[5]; CountDownLatch latch = new CountDownLatch(threads.length); final AtomicBoolean deleted = new AtomicBoolean(false); @@ -136,16 +149,19 @@ public void testDeleteWhileSearch() throws Exception { latch.countDown(); try { latch.await(); - for (int j=0; j<30; j++) { + for (int j = 0; j < 30; j++) { client().prepareSearch() - .setSize(2) - .setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1))) - .execute().get(); + .setSize(2) + .setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1))) + .execute() + .get(); } } catch (Exception e) { - if(deleted.get() == true) { - if (!e.getMessage().contains("all shards failed")) - throw new AssertionError(e); + /** + * assert for exception once delete pit goes through. throw error in case of any exeption before that. + */ + if (deleted.get() == true) { + if (!e.getMessage().contains("all shards failed")) throw new AssertionError(e); return; } throw new AssertionError(e); @@ -157,7 +173,7 @@ public void testDeleteWhileSearch() throws Exception { ActionFuture execute = client().execute(DeletePITAction.INSTANCE, deletePITRequest); DeletePITResponse deletePITResponse = execute.get(); deleted.set(true); - assertEquals(true, deletePITResponse.isSucceeded()); + assertTrue(deletePITResponse.isSucceeded()); for (Thread thread : threads) { thread.join(); diff --git a/server/src/test/java/org/opensearch/search/SearchServiceTests.java b/server/src/test/java/org/opensearch/search/SearchServiceTests.java index 18a0d55c487c5..7c6ec33e7c1f0 100644 --- a/server/src/test/java/org/opensearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/opensearch/search/SearchServiceTests.java @@ -1414,6 +1414,33 @@ public void testOpenReaderContext() { assertTrue(searchService.freeReaderContext(future.actionGet())); } + public void testDeletePitReaderContext() { + createIndex("index"); + SearchService searchService = getInstanceFromNode(SearchService.class); + PlainActionFuture future = new PlainActionFuture<>(); + searchService.createPitReaderContext(new ShardId(resolveIndex("index"), 0), TimeValue.timeValueMinutes(between(1, 10)), future); + future.actionGet(); + assertThat(searchService.getActiveContexts(), equalTo(1)); + assertTrue(searchService.freeReaderContextIfFound(future.actionGet())); + // assert true for reader context not found + assertTrue(searchService.freeReaderContextIfFound(future.actionGet())); + // adding this assert to showcase behavior difference + assertFalse(searchService.freeReaderContext(future.actionGet())); + } + + public void testDeleteAllPitReaderContexts() { + createIndex("index"); + SearchService searchService = getInstanceFromNode(SearchService.class); + PlainActionFuture future = new PlainActionFuture<>(); + searchService.createPitReaderContext(new ShardId(resolveIndex("index"), 0), TimeValue.timeValueMinutes(between(1, 10)), future); + future.actionGet(); + searchService.createPitReaderContext(new ShardId(resolveIndex("index"), 0), TimeValue.timeValueMinutes(between(1, 10)), future); + future.actionGet(); + assertThat(searchService.getActiveContexts(), equalTo(2)); + searchService.freeAllPitContexts(); + assertThat(searchService.getActiveContexts(), equalTo(0)); + } + public void testPitContextMaxKeepAlive() { createIndex("index"); SearchService searchService = getInstanceFromNode(SearchService.class);