From f8f6367f39f9f041dddef3328fc235961020a839 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Wed, 13 Apr 2022 13:54:20 +0530 Subject: [PATCH 1/2] Segregating create pit logic into separate controller Signed-off-by: Bharathwaj G --- .../action/search/CreatePITController.java | 221 ++++++++++ .../action/search/CreatePITResponse.java | 4 + .../search/TransportCreatePITAction.java | 168 +------ .../search/UpdatePITContextRequest.java | 17 +- .../search/UpdatePitContextResponse.java | 14 +- .../index/shard/SearchOperationListener.java | 2 +- .../org/opensearch/search/SearchService.java | 4 +- .../search/internal/PitReaderContext.java | 13 +- .../search/CreatePitControllerTests.java | 414 ++++++++++++++++++ .../opensearch/search/PitMultiNodeTests.java | 28 -- .../opensearch/search/PitSingleNodeTests.java | 10 +- .../opensearch/search/SearchServiceTests.java | 2 +- 12 files changed, 682 insertions(+), 215 deletions(-) create mode 100644 server/src/main/java/org/opensearch/action/search/CreatePITController.java create mode 100644 server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java diff --git a/server/src/main/java/org/opensearch/action/search/CreatePITController.java b/server/src/main/java/org/opensearch/action/search/CreatePITController.java new file mode 100644 index 0000000000000..4e32b1ad1752e --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/CreatePITController.java @@ -0,0 +1,221 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.OpenSearchException; +import org.opensearch.action.ActionListener; +import org.opensearch.action.StepListener; +import org.opensearch.action.support.GroupedActionListener; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Strings; +import org.opensearch.common.io.stream.NamedWriteableRegistry; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.shard.ShardId; +import org.opensearch.search.SearchPhaseResult; +import org.opensearch.search.SearchService; +import org.opensearch.search.SearchShardTarget; +import org.opensearch.tasks.Task; +import org.opensearch.transport.Transport; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.stream.Collectors; + +/** + * Controller for creating PIT reader context + * Phase 1 of create PIT request : Create PIT reader contexts in the associated shards with a temporary keep alive + * Phase 2 of create PIT : Update PIT reader context with PIT ID and keep alive from request and + * fail user request if any of the updates in this phase are failed + */ +public class CreatePITController implements Runnable { + private final Runnable runner; + private final SearchTransportService searchTransportService; + private final ClusterService clusterService; + private final TransportSearchAction transportSearchAction; + private final NamedWriteableRegistry namedWriteableRegistry; + private final Task task; + private final ActionListener listener; + private final CreatePITRequest request; + + public CreatePITController( + CreatePITRequest request, + SearchTransportService searchTransportService, + ClusterService clusterService, + 
TransportSearchAction transportSearchAction, + NamedWriteableRegistry namedWriteableRegistry, + Task task, + ActionListener listener + ) { + this.searchTransportService = searchTransportService; + this.clusterService = clusterService; + this.transportSearchAction = transportSearchAction; + this.namedWriteableRegistry = namedWriteableRegistry; + this.task = task; + this.listener = listener; + this.request = request; + runner = this::executeCreatePit; + } + + private TimeValue getCreatePitTemporaryKeepAlive() { + return SearchService.CREATE_PIT_TEMPORARY_KEEPALIVE_SETTING.get(clusterService.getSettings()); + } + + public void executeCreatePit() { + SearchRequest searchRequest = new SearchRequest(request.getIndices()); + searchRequest.preference(request.getPreference()); + searchRequest.routing(request.getRouting()); + searchRequest.indicesOptions(request.getIndicesOptions()); + searchRequest.allowPartialSearchResults(request.shouldAllowPartialPitCreation()); + + SearchTask searchTask = new SearchTask( + task.getId(), + task.getType(), + task.getAction(), + () -> task.getDescription(), + task.getParentTaskId(), + task.getHeaders() + ); + + final StepListener createPitListener = new StepListener<>(); + + final ActionListener updatePitIdListener = ActionListener.wrap(r -> listener.onResponse(r), listener::onFailure); + /** + * Phase 1 of create PIT + */ + executeCreatePit(searchTask, searchRequest, createPitListener); + + /** + * Phase 2 of create PIT where we update pit id in pit contexts + */ + executeUpdatePitId(request, createPitListener, updatePitIdListener); + } + + /** + * Creates PIT reader context with temporary keep alive + */ + public void executeCreatePit(Task task, SearchRequest searchRequest, StepListener createPitListener) { + transportSearchAction.executeRequest( + task, + searchRequest, + TransportCreatePITAction.CREATE_PIT_ACTION, + true, + new TransportSearchAction.SinglePhaseSearchAction() { + @Override + public void executeOnShardTarget( + SearchTask searchTask, + SearchShardTarget target, + Transport.Connection connection, + ActionListener searchPhaseResultActionListener + ) { + searchTransportService.createPitContext( + connection, + new TransportCreatePITAction.CreateReaderContextRequest(target.getShardId(), getCreatePitTemporaryKeepAlive()), + searchTask, + ActionListener.wrap(r -> searchPhaseResultActionListener.onResponse(r), searchPhaseResultActionListener::onFailure) + ); + } + }, + createPitListener + ); + } + + /** + * Updates PIT ID, keep alive and createdTime of PIT reader context + */ + public void executeUpdatePitId( + CreatePITRequest request, + StepListener createPitListener, + ActionListener updatePitIdListener + ) { + createPitListener.whenComplete(createPITResponse -> { + SearchContextId contextId = SearchContextId.decode(namedWriteableRegistry, createPITResponse.getId()); + final StepListener> lookupListener = getConnectionLookupListener(contextId); + lookupListener.whenComplete(nodelookup -> { + final ActionListener groupedActionListener = getGroupedListener( + updatePitIdListener, + createPITResponse, + contextId.shards().size() + ); + /** + * store the create time ( same create time for all PIT contexts across shards ) to be used + * for list PIT api + */ + long createTime = System.currentTimeMillis(); + for (Map.Entry entry : contextId.shards().entrySet()) { + DiscoveryNode node = nodelookup.apply(entry.getValue().getClusterAlias(), entry.getValue().getNode()); + try { + final Transport.Connection connection = searchTransportService.getConnection( + 
entry.getValue().getClusterAlias(), + node + ); + searchTransportService.updatePitContext( + connection, + new UpdatePITContextRequest( + entry.getValue().getSearchContextId(), + createPITResponse.getId(), + request.getKeepAlive().millis(), + createTime + ), + groupedActionListener + ); + } catch (Exception e) { + groupedActionListener.onFailure(new OpenSearchException("Create pit failed on node[" + node + "]", e)); + } + } + }, updatePitIdListener::onFailure); + }, updatePitIdListener::onFailure); + } + + private StepListener> getConnectionLookupListener(SearchContextId contextId) { + ClusterState state = clusterService.state(); + + final Set clusters = contextId.shards() + .values() + .stream() + .filter(ctx -> Strings.isEmpty(ctx.getClusterAlias()) == false) + .map(SearchContextIdForNode::getClusterAlias) + .collect(Collectors.toSet()); + + final StepListener> lookupListener = new StepListener<>(); + + if (clusters.isEmpty() == false) { + searchTransportService.getRemoteClusterService().collectNodes(clusters, lookupListener); + } else { + lookupListener.onResponse((cluster, nodeId) -> state.getNodes().get(nodeId)); + } + return lookupListener; + } + + private ActionListener getGroupedListener( + ActionListener updatePitIdListener, + CreatePITResponse createPITResponse, + int size + ) { + return new GroupedActionListener<>(new ActionListener<>() { + @Override + public void onResponse(final Collection responses) { + updatePitIdListener.onResponse(createPITResponse); + } + + @Override + public void onFailure(final Exception e) { + updatePitIdListener.onFailure(e); + } + }, size); + } + + @Override + public void run() { + runner.run(); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/CreatePITResponse.java b/server/src/main/java/org/opensearch/action/search/CreatePITResponse.java index 64419e785a838..31a20f0f907cb 100644 --- a/server/src/main/java/org/opensearch/action/search/CreatePITResponse.java +++ b/server/src/main/java/org/opensearch/action/search/CreatePITResponse.java @@ -18,7 +18,11 @@ import java.io.IOException; +/** + * Create point in time response with point in time id and success / failures + */ public class CreatePITResponse extends ActionResponse implements StatusToXContentObject { + // point in time id private final String id; private final int totalShards; private final int successfulShards; diff --git a/server/src/main/java/org/opensearch/action/search/TransportCreatePITAction.java b/server/src/main/java/org/opensearch/action/search/TransportCreatePITAction.java index 9d81951de411c..225973211c406 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportCreatePITAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportCreatePITAction.java @@ -8,16 +8,10 @@ package org.opensearch.action.search; -import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; -import org.opensearch.action.StepListener; import org.opensearch.action.support.ActionFilters; -import org.opensearch.action.support.GroupedActionListener; import org.opensearch.action.support.HandledTransportAction; -import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.Strings; import org.opensearch.common.inject.Inject; import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.io.stream.StreamInput; @@ -25,26 +19,15 @@ import org.opensearch.common.unit.TimeValue; import 
org.opensearch.index.shard.ShardId; import org.opensearch.search.SearchPhaseResult; -import org.opensearch.search.SearchService; -import org.opensearch.search.SearchShardTarget; import org.opensearch.search.internal.ShardSearchContextId; import org.opensearch.tasks.Task; -import org.opensearch.transport.Transport; import org.opensearch.transport.TransportRequest; import org.opensearch.transport.TransportService; import java.io.IOException; -import java.util.Collection; -import java.util.Map; -import java.util.Set; -import java.util.function.BiFunction; -import java.util.stream.Collectors; /** * Transport action for creating PIT reader context - * Phase 1 of create PIT request : Create PIT reader contexts in the associated shards with a temporary keep alive - * Phase 2 of create PIT : Update PIT reader context with PIT ID and keep alive from request and - * fail user request if any of the updates in this phase are failed */ public class TransportCreatePITAction extends HandledTransportAction { @@ -72,153 +55,18 @@ public TransportCreatePITAction( this.namedWriteableRegistry = namedWriteableRegistry; } - public TimeValue getCreatePitTemporaryKeepAlive() { - return SearchService.CREATE_PIT_TEMPORARY_KEEPALIVE_SETTING.get(clusterService.getSettings()); - } - @Override protected void doExecute(Task task, CreatePITRequest request, ActionListener listener) { - SearchRequest searchRequest = new SearchRequest(request.getIndices()); - searchRequest.preference(request.getPreference()); - searchRequest.routing(request.getRouting()); - searchRequest.indicesOptions(request.getIndicesOptions()); - searchRequest.allowPartialSearchResults(request.shouldAllowPartialPitCreation()); - - SearchTask searchTask = new SearchTask( - task.getId(), - task.getType(), - task.getAction(), - () -> task.getDescription(), - task.getParentTaskId(), - task.getHeaders() - ); - - final StepListener createPitListener = new StepListener<>(); - - final ActionListener updatePitIdListener = ActionListener.wrap(r -> listener.onResponse(r), listener::onFailure); - /** - * Phase 1 of create PIT - */ - executeCreatePit(searchTask, searchRequest, createPitListener); - - /** - * Phase 2 of create PIT where we update pit id in pit contexts - */ - executeUpdatePitId(request, createPitListener, updatePitIdListener); - } - - /** - * Creates PIT reader context with temporary keep alive - */ - public void executeCreatePit(Task task, SearchRequest searchRequest, StepListener createPitListener) { - transportSearchAction.executeRequest( + Runnable runnable = new CreatePITController( + request, + searchTransportService, + clusterService, + transportSearchAction, + namedWriteableRegistry, task, - searchRequest, - CREATE_PIT_ACTION, - true, - new TransportSearchAction.SinglePhaseSearchAction() { - @Override - public void executeOnShardTarget( - SearchTask searchTask, - SearchShardTarget target, - Transport.Connection connection, - ActionListener searchPhaseResultActionListener - ) { - searchTransportService.createPitContext( - connection, - new CreateReaderContextRequest(target.getShardId(), getCreatePitTemporaryKeepAlive()), - searchTask, - ActionListener.wrap(r -> searchPhaseResultActionListener.onResponse(r), searchPhaseResultActionListener::onFailure) - ); - } - }, - createPitListener + listener ); - } - - /** - * Updates PIT ID, keep alive and createdTime of PIT reader context - */ - public void executeUpdatePitId( - CreatePITRequest request, - StepListener createPitListener, - ActionListener updatePitIdListener - ) { - 
createPitListener.whenComplete(createPITResponse -> { - SearchContextId contextId = SearchContextId.decode(namedWriteableRegistry, createPITResponse.getId()); - final StepListener> lookupListener = getConnectionLookupListener(contextId); - lookupListener.whenComplete(nodelookup -> { - final ActionListener groupedActionListener = getGroupedListener( - updatePitIdListener, - createPITResponse, - contextId.shards().size() - ); - /** - * store the create time ( same create time for all PIT contexts across shards ) to be used - * for list PIT api - */ - long createTime = System.currentTimeMillis(); - for (Map.Entry entry : contextId.shards().entrySet()) { - DiscoveryNode node = nodelookup.apply(entry.getValue().getClusterAlias(), entry.getValue().getNode()); - try { - final Transport.Connection connection = searchTransportService.getConnection( - entry.getValue().getClusterAlias(), - node - ); - searchTransportService.updatePitContext( - connection, - new UpdatePITContextRequest( - entry.getValue().getSearchContextId(), - createPITResponse.getId(), - request.getKeepAlive().millis(), - createTime - ), - groupedActionListener - ); - } catch (Exception e) { - groupedActionListener.onFailure(new OpenSearchException("Create pit failed on node[" + node + "]", e)); - } - } - }, updatePitIdListener::onFailure); - }, updatePitIdListener::onFailure); - } - - private StepListener> getConnectionLookupListener(SearchContextId contextId) { - ClusterState state = clusterService.state(); - - final Set clusters = contextId.shards() - .values() - .stream() - .filter(ctx -> Strings.isEmpty(ctx.getClusterAlias()) == false) - .map(SearchContextIdForNode::getClusterAlias) - .collect(Collectors.toSet()); - - final StepListener> lookupListener = new StepListener<>(); - - if (clusters.isEmpty() == false) { - searchTransportService.getRemoteClusterService().collectNodes(clusters, lookupListener); - } else { - lookupListener.onResponse((cluster, nodeId) -> state.getNodes().get(nodeId)); - } - return lookupListener; - } - - private ActionListener getGroupedListener( - ActionListener updatePitIdListener, - CreatePITResponse createPITResponse, - int size - ) { - return new GroupedActionListener<>(new ActionListener<>() { - @Override - public void onResponse(final Collection responses) { - updatePitIdListener.onResponse(createPITResponse); - } - - @Override - public void onFailure(final Exception e) { - updatePitIdListener.onFailure(e); - } - }, size); + runnable.run(); } public static class CreateReaderContextRequest extends TransportRequest { diff --git a/server/src/main/java/org/opensearch/action/search/UpdatePITContextRequest.java b/server/src/main/java/org/opensearch/action/search/UpdatePITContextRequest.java index a58bb4b4a1ecd..d52bb0f84f368 100644 --- a/server/src/main/java/org/opensearch/action/search/UpdatePITContextRequest.java +++ b/server/src/main/java/org/opensearch/action/search/UpdatePITContextRequest.java @@ -15,25 +15,28 @@ import java.io.IOException; +/** + * Request used to update PIT reader contexts with pitId, keepAlive and creationTime + */ public class UpdatePITContextRequest extends TransportRequest { private final String pitId; private final long keepAlive; - private final long createTime; + private final long creationTime; private final ShardSearchContextId searchContextId; - public UpdatePITContextRequest(ShardSearchContextId searchContextId, String pitId, long keepAlive, long createTime) { + public UpdatePITContextRequest(ShardSearchContextId searchContextId, String pitId, long keepAlive, long 
creationTime) { this.pitId = pitId; this.searchContextId = searchContextId; this.keepAlive = keepAlive; - this.createTime = createTime; + this.creationTime = creationTime; } UpdatePITContextRequest(StreamInput in) throws IOException { super(in); pitId = in.readString(); keepAlive = in.readLong(); - createTime = in.readLong(); + creationTime = in.readLong(); searchContextId = new ShardSearchContextId(in); } @@ -42,7 +45,7 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(pitId); out.writeLong(keepAlive); - out.writeLong(createTime); + out.writeLong(creationTime); searchContextId.writeTo(out); } @@ -54,8 +57,8 @@ public String getPitId() { return pitId; } - public long getCreateTime() { - return createTime; + public long getCreationTime() { + return creationTime; } public long getKeepAlive() { diff --git a/server/src/main/java/org/opensearch/action/search/UpdatePitContextResponse.java b/server/src/main/java/org/opensearch/action/search/UpdatePitContextResponse.java index 244507e9b4adb..b907ae1f8b8a9 100644 --- a/server/src/main/java/org/opensearch/action/search/UpdatePitContextResponse.java +++ b/server/src/main/java/org/opensearch/action/search/UpdatePitContextResponse.java @@ -17,27 +17,27 @@ public class UpdatePitContextResponse extends TransportResponse { private final String pitId; - private final long createTime; + private final long creationTime; private final long keepAlive; UpdatePitContextResponse(StreamInput in) throws IOException { super(in); pitId = in.readString(); - createTime = in.readLong(); + creationTime = in.readLong(); keepAlive = in.readLong(); } - public UpdatePitContextResponse(String pitId, long createTime, long keepAlive) { + public UpdatePitContextResponse(String pitId, long creationTime, long keepAlive) { this.pitId = pitId; this.keepAlive = keepAlive; - this.createTime = createTime; + this.creationTime = creationTime; } @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(pitId); - out.writeLong(createTime); + out.writeLong(creationTime); out.writeLong(keepAlive); } @@ -49,7 +49,7 @@ public long getKeepAlive() { return keepAlive; } - public long getCreateTime() { - return createTime; + public long getCreationTime() { + return creationTime; } } diff --git a/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java b/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java index e2005c65ad3d7..bd4e16f50a2cb 100644 --- a/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java +++ b/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java @@ -137,7 +137,7 @@ default void onNewPitContext(ReaderContext readerContext) {} /** * Executed when a Point-In-Time search {@link SearchContext} is freed. - * This happens on deleteion of a Point-In-Time or on it's keep-alive expiring. + * This happens on deletion of a Point-In-Time or on it's keep-alive is expiring. 
* @param readerContext the freed search context */ default void onFreePitContext(ReaderContext readerContext) {} diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 415ba9fe20a00..eac93982f7d61 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -1035,9 +1035,9 @@ public void updatePitIdAndKeepAlive(UpdatePITContextRequest request, ActionListe } Releasable updatePit = null; try { - updatePit = readerContext.updatePitIdAndKeepAlive(request.getKeepAlive(), request.getPitId(), request.getCreateTime()); + updatePit = readerContext.updatePitIdAndKeepAlive(request.getKeepAlive(), request.getPitId(), request.getCreationTime()); updatePit.close(); - listener.onResponse(new UpdatePitContextResponse(request.getPitId(), request.getCreateTime(), request.getKeepAlive())); + listener.onResponse(new UpdatePitContextResponse(request.getPitId(), request.getCreationTime(), request.getKeepAlive())); } catch (Exception e) { freeReaderContext(readerContext.id()); if (updatePit != null) { diff --git a/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java index 08bc3c4f65955..5c19d54327fea 100644 --- a/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java +++ b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java @@ -22,7 +22,8 @@ public class PitReaderContext extends ReaderContext { // Storing the encoded PIT ID as part of PIT reader context for use cases such as list pit API private final SetOnce pitId = new SetOnce<>(); - private final SetOnce createTime = new SetOnce<>(); + // Creation time of PIT contexts which helps users to differentiate between multiple PIT reader contexts + private final SetOnce creationTime = new SetOnce<>(); public PitReaderContext( ShardSearchContextId id, @@ -52,18 +53,18 @@ public Releasable updatePitIdAndKeepAlive(long keepAliveInMillis, String pitId, refCounted.incRef(); tryUpdateKeepAlive(keepAliveInMillis); setPitId(pitId); - setCreateTime(createTime); + setCreationTime(createTime); return Releasables.releaseOnce(() -> { this.lastAccessTime.updateAndGet(curr -> Math.max(curr, nowInMillis())); refCounted.decRef(); }); } - public long getCreateTime() { - return this.createTime.get(); + public long getCreationTime() { + return this.creationTime.get(); } - public void setCreateTime(final long createTime) { - this.createTime.set(createTime); + public void setCreationTime(final long creationTime) { + this.creationTime.set(creationTime); } } diff --git a/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java b/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java new file mode 100644 index 0000000000000..a0d2247579e09 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java @@ -0,0 +1,414 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.action.search; + +import org.apache.lucene.search.TotalHits; +import org.junit.Before; +import org.opensearch.Version; +import org.opensearch.action.ActionListener; +import org.opensearch.action.LatchedActionListener; +import org.opensearch.action.StepListener; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.io.stream.NamedWriteableRegistry; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.AtomicArray; +import org.opensearch.index.query.IdsQueryBuilder; +import org.opensearch.index.query.MatchAllQueryBuilder; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.index.shard.ShardId; +import org.opensearch.search.*; +import org.opensearch.search.aggregations.InternalAggregations; +import org.opensearch.search.internal.AliasFilter; +import org.opensearch.search.internal.InternalSearchResponse; +import org.opensearch.search.internal.ShardSearchContextId; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskId; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.transport.Transport; +import java.util.*; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class CreatePitControllerTests extends OpenSearchTestCase { + + DiscoveryNode node1 = null; + DiscoveryNode node2 = null; + DiscoveryNode node3 = null; + String pitId = null; + TransportSearchAction transportSearchAction = null; + Task task = null; + DiscoveryNodes nodes = null; + NamedWriteableRegistry namedWriteableRegistry = null; + SearchResponse searchResponse = null; + ActionListener createPitListener = null; + ClusterService clusterServiceMock = null; + + @Before + public void setupData() { + node1 = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); + node2 = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT); + node3 = new DiscoveryNode("node_3", buildNewFakeTransportAddress(), Version.CURRENT); + setPitId(); + namedWriteableRegistry = new NamedWriteableRegistry( + Arrays.asList( + new NamedWriteableRegistry.Entry(QueryBuilder.class, TermQueryBuilder.NAME, TermQueryBuilder::new), + new NamedWriteableRegistry.Entry(QueryBuilder.class, MatchAllQueryBuilder.NAME, MatchAllQueryBuilder::new), + new NamedWriteableRegistry.Entry(QueryBuilder.class, IdsQueryBuilder.NAME, IdsQueryBuilder::new) + ) + ); + nodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build(); + transportSearchAction = mock(TransportSearchAction.class); + task = new Task( + randomLong(), + "transport", + SearchAction.NAME, + "description", + new TaskId(randomLong() + ":" + randomLong()), + Collections.emptyMap() + ); + InternalSearchResponse response = new InternalSearchResponse( + new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN), + InternalAggregations.EMPTY, + null, + null, + false, + null, + 1 + ); + searchResponse = new SearchResponse( + response, + null, + 3, + 3, + 0, + 100, + ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY, + pitId + ); + createPitListener 
= new ActionListener() { + @Override + public void onResponse(CreatePITResponse createPITResponse) { + assertEquals(3, createPITResponse.getTotalShards()); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + }; + + clusterServiceMock = mock(ClusterService.class); + ClusterState state = mock(ClusterState.class); + + final Settings keepAliveSettings = Settings.builder() + .put(SearchService.CREATE_PIT_TEMPORARY_KEEPALIVE_SETTING.getKey(), 30000) + .build(); + when(clusterServiceMock.getSettings()).thenReturn(keepAliveSettings); + + when(state.getMetadata()).thenReturn(Metadata.EMPTY_METADATA); + when(state.metadata()).thenReturn(Metadata.EMPTY_METADATA); + when(clusterServiceMock.state()).thenReturn(state); + when(state.getNodes()).thenReturn(nodes); + } + + /** + * Test if transport call for update pit is made to all nodes present as part of PIT ID returned from phase one of create pit + */ + public void testUpdatePitAfterCreatePitSuccess() throws InterruptedException { + List updateNodesInvoked = new CopyOnWriteArrayList<>(); + SearchTransportService searchTransportService = new SearchTransportService(null, null) { + @Override + public void updatePitContext( + Transport.Connection connection, + UpdatePITContextRequest request, + ActionListener listener + ) { + updateNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onResponse(new UpdatePitContextResponse("pitid", 500000, 500000))); + t.start(); + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + + CountDownLatch latch = new CountDownLatch(1); + + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[] { "index" }); + + CreatePITController controller = new CreatePITController( + request, + searchTransportService, + clusterServiceMock, + transportSearchAction, + namedWriteableRegistry, + task, + createPitListener + ); + + CreatePITResponse createPITResponse = new CreatePITResponse(searchResponse); + + ActionListener updatelistener = new LatchedActionListener<>(new ActionListener() { + @Override + public void onResponse(CreatePITResponse createPITResponse) { + assertEquals(3, createPITResponse.getTotalShards()); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + }, latch); + + StepListener createListener = new StepListener<>(); + + controller.executeUpdatePitId(request, createListener, updatelistener); + createListener.onResponse(createPITResponse); + latch.await(); + assertEquals(3, updateNodesInvoked.size()); + } + + /** + * If create phase results in failure, update pit phase should not proceed and propagate the exception + */ + public void testUpdatePitAfterCreatePitFailure() throws InterruptedException { + List updateNodesInvoked = new CopyOnWriteArrayList<>(); + SearchTransportService searchTransportService = new SearchTransportService(null, null) { + @Override + public void updatePitContext( + Transport.Connection connection, + UpdatePITContextRequest request, + ActionListener listener + ) { + updateNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onResponse(new UpdatePitContextResponse("pitid", 500000, 500000))); + t.start(); + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + + CountDownLatch 
latch = new CountDownLatch(1); + + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[] { "index" }); + + CreatePITController controller = new CreatePITController( + request, + searchTransportService, + clusterServiceMock, + transportSearchAction, + namedWriteableRegistry, + task, + createPitListener + ); + + ActionListener updatelistener = new LatchedActionListener<>(new ActionListener() { + @Override + public void onResponse(CreatePITResponse createPITResponse) { + throw new AssertionError("on response is called"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(e.getCause().getMessage().contains("Exception occurred in phase 1")); + } + }, latch); + + StepListener createListener = new StepListener<>(); + + controller.executeUpdatePitId(request, createListener, updatelistener); + createListener.onFailure(new Exception("Exception occurred in phase 1")); + latch.await(); + assertEquals(0, updateNodesInvoked.size()); + } + + /** + * Testing that any update pit failures fails the request + */ + public void testUpdatePitFailureForNodeDrop() throws InterruptedException { + List updateNodesInvoked = new CopyOnWriteArrayList<>(); + SearchTransportService searchTransportService = new SearchTransportService(null, null) { + @Override + public void updatePitContext( + Transport.Connection connection, + UpdatePITContextRequest request, + ActionListener listener + ) { + + updateNodesInvoked.add(connection.getNode()); + if (connection.getNode().getId() == "node_3") { + Thread t = new Thread(() -> listener.onFailure(new Exception("node 3 down"))); + t.start(); + } else { + Thread t = new Thread(() -> listener.onResponse(new UpdatePitContextResponse("pitid", 500000, 500000))); + t.start(); + } + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[] { "index" }); + CreatePITController controller = new CreatePITController( + request, + searchTransportService, + clusterServiceMock, + transportSearchAction, + namedWriteableRegistry, + task, + createPitListener + ); + + CreatePITResponse createPITResponse = new CreatePITResponse(searchResponse); + CountDownLatch latch = new CountDownLatch(1); + + ActionListener updatelistener = new LatchedActionListener<>(new ActionListener() { + @Override + public void onResponse(CreatePITResponse createPITResponse) { + throw new AssertionError("response is called"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(e.getMessage().contains("node 3 down")); + } + }, latch); + + StepListener createListener = new StepListener<>(); + controller.executeUpdatePitId(request, createListener, updatelistener); + createListener.onResponse(createPITResponse); + latch.await(); + assertEquals(3, updateNodesInvoked.size()); + } + + /** + * Testing that the update pit call is made to all the nodes despite failures in some nodes + */ + public void testUpdatePitFailureWhereAllNodesDown() throws InterruptedException { + List updateNodesInvoked = new CopyOnWriteArrayList<>(); + SearchTransportService searchTransportService = new SearchTransportService(null, null) { + @Override + public void updatePitContext( + Transport.Connection connection, + UpdatePITContextRequest request, + ActionListener listener + ) { + updateNodesInvoked.add(connection.getNode()); + 
Thread t = new Thread(() -> listener.onFailure(new Exception("node down"))); + t.start(); + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[] { "index" }); + CreatePITController controller = new CreatePITController( + request, + searchTransportService, + clusterServiceMock, + transportSearchAction, + namedWriteableRegistry, + task, + createPitListener + ); + + CreatePITResponse createPITResponse = new CreatePITResponse(searchResponse); + CountDownLatch latch = new CountDownLatch(1); + + ActionListener updatelistener = new LatchedActionListener<>(new ActionListener() { + @Override + public void onResponse(CreatePITResponse createPITResponse) { + throw new AssertionError("response is called"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(e.getMessage().contains("node down")); + } + }, latch); + + StepListener createListener = new StepListener<>(); + controller.executeUpdatePitId(request, createListener, updatelistener); + createListener.onResponse(createPITResponse); + latch.await(); + assertEquals(3, updateNodesInvoked.size()); + } + + QueryBuilder randomQueryBuilder() { + if (randomBoolean()) { + return new TermQueryBuilder(randomAlphaOfLength(10), randomAlphaOfLength(10)); + } else if (randomBoolean()) { + return new MatchAllQueryBuilder(); + } else { + return new IdsQueryBuilder().addIds(randomAlphaOfLength(10)); + } + } + + private void setPitId() { + AtomicArray array = new AtomicArray<>(3); + SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult1 = new SearchAsyncActionTests.TestSearchPhaseResult( + new ShardSearchContextId("a", 1), + node1 + ); + testSearchPhaseResult1.setSearchShardTarget(new SearchShardTarget("node_1", new ShardId("idx", "uuid1", 2), null, null)); + SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult2 = new SearchAsyncActionTests.TestSearchPhaseResult( + new ShardSearchContextId("b", 12), + node2 + ); + testSearchPhaseResult2.setSearchShardTarget(new SearchShardTarget("node_2", new ShardId("idy", "uuid2", 42), null, null)); + SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult3 = new SearchAsyncActionTests.TestSearchPhaseResult( + new ShardSearchContextId("c", 42), + node3 + ); + testSearchPhaseResult3.setSearchShardTarget(new SearchShardTarget("node_3", new ShardId("idy", "uuid2", 43), null, null)); + array.setOnce(0, testSearchPhaseResult1); + array.setOnce(1, testSearchPhaseResult2); + array.setOnce(2, testSearchPhaseResult3); + + final Version version = Version.CURRENT; + final Map aliasFilters = new HashMap<>(); + for (SearchPhaseResult result : array.asList()) { + final AliasFilter aliasFilter; + if (randomBoolean()) { + aliasFilter = new AliasFilter(randomQueryBuilder()); + } else if (randomBoolean()) { + aliasFilter = new AliasFilter(randomQueryBuilder(), "alias-" + between(1, 10)); + } else { + aliasFilter = AliasFilter.EMPTY; + } + if (randomBoolean()) { + aliasFilters.put(result.getSearchShardTarget().getShardId().getIndex().getUUID(), aliasFilter); + } + } + pitId = SearchContextId.encode(array.asList(), aliasFilters, version); + } + +} diff --git a/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java b/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java index 29f570f60b000..32a2e7b21eb6f 100644 --- 
a/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java +++ b/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java @@ -70,17 +70,6 @@ public Settings onNodeStopped(String nodeName) throws Exception { }); } - public void testCreatePitWithAllNodesDown() throws Exception { - internalCluster().startMasterOnlyNode(); - internalCluster().stopRandomDataNode(); - internalCluster().stopRandomDataNode(); - CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), false); - request.setIndices(new String[] { "index" }); - ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); - ExecutionException ex = expectThrows(ExecutionException.class, execute::get); - assertTrue(ex.getMessage().contains("all shards failed")); - } - public void testCreatePitWhileNodeDropWithAllowPartialCreationTrue() throws Exception { CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { "index" }); @@ -141,23 +130,6 @@ public Settings onNodeStopped(String nodeName) throws Exception { }); } - public void testPitSearchWithAllNodesDown() throws Exception { - CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); - request.setIndices(new String[] { "index" }); - ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); - CreatePITResponse pitResponse = execute.get(); - - internalCluster().startMasterOnlyNode(); - internalCluster().stopRandomDataNode(); - internalCluster().stopRandomDataNode(); - ActionFuture searchExecute = client().prepareSearch() - .setSize(2) - .setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1))) - .execute(); - ExecutionException ex = expectThrows(ExecutionException.class, searchExecute::get); - assertTrue(ex.getMessage().contains("all shards failed")); - } - public void testPitInvalidDefaultKeepAlive() { IllegalArgumentException exc = expectThrows( IllegalArgumentException.class, diff --git a/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java b/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java index 69ce5cb5b63ec..d43325d56ab58 100644 --- a/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java @@ -221,9 +221,10 @@ public void testSearchWithFirstPhaseKeepAliveExpiry() throws ExecutionException, CreatePITResponse pitResponse = execute.get(); SearchService service = getInstanceFromNode(SearchService.class); assertEquals(2, service.getActiveContexts()); + // since first phase temporary keep alive is set at 1 second in this test file + // and create pit request keep alive is less than that, keep alive is set to 1 second, (max of 2 keep alives) + // so reader context will clear up after 1 second Thread.sleep(1000); - // since first phase keep alive is set at 1 second in this test file and create pit request keep alive is - // less than that, so reader context will clear up after a second client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); SearchPhaseExecutionException ex = expectThrows(SearchPhaseExecutionException.class, () -> { @@ -362,6 +363,9 @@ public void testOpenPitContextsConcurrently() throws Exception { service.doClose(); } + /** + * Point in time search should return the same results as creation time and index updates should not affect the PIT search results + */ public void testPitAfterUpdateIndex() throws Exception { 
client().admin().indices().prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 5)).get(); client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().get(); @@ -482,7 +486,7 @@ public void testPitAfterUpdateIndex() throws Exception { Matchers.equalTo(50L) ); /** - * using point in time id will have the old search results before update + * using point in time id will have the same search results as ones before updation */ assertThat( client().prepareSearch() diff --git a/server/src/test/java/org/opensearch/search/SearchServiceTests.java b/server/src/test/java/org/opensearch/search/SearchServiceTests.java index 0b36d6467a1b0..18a0d55c487c5 100644 --- a/server/src/test/java/org/opensearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/opensearch/search/SearchServiceTests.java @@ -1452,7 +1452,7 @@ public void testUpdatePitId() { searchService.updatePitIdAndKeepAlive(updateRequest, updateFuture); UpdatePitContextResponse updateResponse = updateFuture.actionGet(); assertTrue(updateResponse.getPitId().equalsIgnoreCase("pitId")); - assertTrue(updateResponse.getCreateTime() == updateRequest.getCreateTime()); + assertTrue(updateResponse.getCreationTime() == updateRequest.getCreationTime()); assertTrue(updateResponse.getKeepAlive() == updateRequest.getKeepAlive()); assertTrue(updateResponse.getPitId().equalsIgnoreCase("pitId")); assertThat(searchService.getActiveContexts(), equalTo(1)); From 0fcd1dfb0d8fe40469dadfd32f6112357b35bfbe Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Thu, 14 Apr 2022 12:06:23 +0530 Subject: [PATCH 2/2] Adding cleanup logic if create pit fails Signed-off-by: Bharathwaj G --- .../action/search/CreatePITController.java | 36 ++++++++- .../action/search/RestCreatePITAction.java | 2 +- .../search/CreatePitControllerTests.java | 80 +++++++++++++++++-- .../opensearch/search/PitSingleNodeTests.java | 2 +- 4 files changed, 110 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/search/CreatePITController.java b/server/src/main/java/org/opensearch/action/search/CreatePITController.java index 4e32b1ad1752e..0eb8cc65ea011 100644 --- a/server/src/main/java/org/opensearch/action/search/CreatePITController.java +++ b/server/src/main/java/org/opensearch/action/search/CreatePITController.java @@ -8,6 +8,9 @@ package org.opensearch.action.search; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.StepListener; @@ -46,6 +49,7 @@ public class CreatePITController implements Runnable { private final Task task; private final ActionListener listener; private final CreatePITRequest request; + private static final Logger logger = LogManager.getLogger(CreatePITController.class); public CreatePITController( CreatePITRequest request, @@ -88,7 +92,10 @@ public void executeCreatePit() { final StepListener createPitListener = new StepListener<>(); - final ActionListener updatePitIdListener = ActionListener.wrap(r -> listener.onResponse(r), listener::onFailure); + final ActionListener updatePitIdListener = ActionListener.wrap(r -> listener.onResponse(r), e -> { + logger.error("PIT creation failed while updating PIT ID", e); + listener.onFailure(e); + }); /** * Phase 1 of create PIT */ @@ -144,7 +151,8 @@ public void executeUpdatePitId( final ActionListener 
groupedActionListener = getGroupedListener( updatePitIdListener, createPITResponse, - contextId.shards().size() + contextId.shards().size(), + contextId.shards().values() ); /** * store the create time ( same create time for all PIT contexts across shards ) to be used @@ -169,6 +177,7 @@ public void executeUpdatePitId( groupedActionListener ); } catch (Exception e) { + logger.error(() -> new ParameterizedMessage("Create pit update phase failed on node [{}]", node), e); groupedActionListener.onFailure(new OpenSearchException("Create pit failed on node[" + node + "]", e)); } } @@ -199,7 +208,8 @@ private StepListener> getConnectionLoo private ActionListener getGroupedListener( ActionListener updatePitIdListener, CreatePITResponse createPITResponse, - int size + int size, + Collection contexts ) { return new GroupedActionListener<>(new ActionListener<>() { @Override @@ -209,11 +219,31 @@ public void onResponse(final Collection responses) { @Override public void onFailure(final Exception e) { + cleanupContexts(contexts); updatePitIdListener.onFailure(e); } }, size); } + /** + * Cleanup all created PIT contexts in case of failure + */ + private void cleanupContexts(Collection contexts) { + ActionListener deleteListener = new ActionListener<>() { + @Override + public void onResponse(Integer freed) { + // log the number of freed contexts - this is invoke and forget call + logger.debug(() -> new ParameterizedMessage("Cleaned up {} contexts out of {}", freed, contexts.size())); + } + + @Override + public void onFailure(Exception e) { + logger.debug("Cleaning up PIT contexts failed ", e); + } + }; + ClearScrollController.closeContexts(clusterService.state().getNodes(), searchTransportService, contexts, deleteListener); + } + @Override public void run() { runner.run(); diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestCreatePITAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestCreatePITAction.java index 7f2a800d134b4..96c491f96a063 100644 --- a/server/src/main/java/org/opensearch/rest/action/search/RestCreatePITAction.java +++ b/server/src/main/java/org/opensearch/rest/action/search/RestCreatePITAction.java @@ -53,7 +53,7 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client @Override public List routes() { - return unmodifiableList(asList(new Route(POST, "/{index}/_pit"))); + return unmodifiableList(asList(new Route(POST, "/{index}/_search/_point_in_time"))); } } diff --git a/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java b/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java index a0d2247579e09..db4abf7dd179f 100644 --- a/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java +++ b/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java @@ -28,7 +28,11 @@ import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.TermQueryBuilder; import org.opensearch.index.shard.ShardId; -import org.opensearch.search.*; +import org.opensearch.search.SearchHit; +import org.opensearch.search.SearchHits; +import org.opensearch.search.SearchPhaseResult; +import org.opensearch.search.SearchService; +import org.opensearch.search.SearchShardTarget; import org.opensearch.search.aggregations.InternalAggregations; import org.opensearch.search.internal.AliasFilter; import org.opensearch.search.internal.InternalSearchResponse; @@ -37,7 +41,11 @@ import org.opensearch.tasks.TaskId; import 
org.opensearch.test.OpenSearchTestCase; import org.opensearch.transport.Transport; -import java.util.*; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import static org.mockito.Mockito.mock; @@ -131,6 +139,7 @@ public void onFailure(Exception e) { */ public void testUpdatePitAfterCreatePitSuccess() throws InterruptedException { List updateNodesInvoked = new CopyOnWriteArrayList<>(); + List deleteNodesInvoked = new CopyOnWriteArrayList<>(); SearchTransportService searchTransportService = new SearchTransportService(null, null) { @Override public void updatePitContext( @@ -143,6 +152,20 @@ public void updatePitContext( t.start(); } + /** + * Test if cleanup request is called + */ + @Override + public void sendFreeContext( + Transport.Connection connection, + ShardSearchContextId contextId, + ActionListener listener + ) { + deleteNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); + t.start(); + } + @Override public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { return new SearchAsyncActionTests.MockConnection(node); @@ -184,6 +207,7 @@ public void onFailure(Exception e) { createListener.onResponse(createPITResponse); latch.await(); assertEquals(3, updateNodesInvoked.size()); + assertEquals(0, deleteNodesInvoked.size()); } /** @@ -191,6 +215,7 @@ public void onFailure(Exception e) { */ public void testUpdatePitAfterCreatePitFailure() throws InterruptedException { List updateNodesInvoked = new CopyOnWriteArrayList<>(); + List deleteNodesInvoked = new CopyOnWriteArrayList<>(); SearchTransportService searchTransportService = new SearchTransportService(null, null) { @Override public void updatePitContext( @@ -207,6 +232,17 @@ public void updatePitContext( public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { return new SearchAsyncActionTests.MockConnection(node); } + + @Override + public void sendFreeContext( + Transport.Connection connection, + ShardSearchContextId contextId, + ActionListener listener + ) { + deleteNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); + t.start(); + } }; CountDownLatch latch = new CountDownLatch(1); @@ -242,6 +278,10 @@ public void onFailure(Exception e) { createListener.onFailure(new Exception("Exception occurred in phase 1")); latch.await(); assertEquals(0, updateNodesInvoked.size()); + /** + * cleanup is not called on create pit phase one failure + */ + assertEquals(0, deleteNodesInvoked.size()); } /** @@ -249,6 +289,7 @@ public void onFailure(Exception e) { */ public void testUpdatePitFailureForNodeDrop() throws InterruptedException { List updateNodesInvoked = new CopyOnWriteArrayList<>(); + List deleteNodesInvoked = new CopyOnWriteArrayList<>(); SearchTransportService searchTransportService = new SearchTransportService(null, null) { @Override public void updatePitContext( @@ -267,6 +308,17 @@ public void updatePitContext( } } + @Override + public void sendFreeContext( + Transport.Connection connection, + ShardSearchContextId contextId, + ActionListener listener + ) { + deleteNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); + t.start(); + } + @Override public Transport.Connection 
getConnection(String clusterAlias, DiscoveryNode node) { return new SearchAsyncActionTests.MockConnection(node); @@ -304,13 +356,15 @@ public void onFailure(Exception e) { createListener.onResponse(createPITResponse); latch.await(); assertEquals(3, updateNodesInvoked.size()); + /** + * check if cleanup is called for all nodes in case of update pit failure + */ + assertEquals(3, deleteNodesInvoked.size()); } - /** - * Testing that the update pit call is made to all the nodes despite failures in some nodes - */ public void testUpdatePitFailureWhereAllNodesDown() throws InterruptedException { List updateNodesInvoked = new CopyOnWriteArrayList<>(); + List deleteNodesInvoked = new CopyOnWriteArrayList<>(); SearchTransportService searchTransportService = new SearchTransportService(null, null) { @Override public void updatePitContext( @@ -323,6 +377,17 @@ public void updatePitContext( t.start(); } + @Override + public void sendFreeContext( + Transport.Connection connection, + ShardSearchContextId contextId, + ActionListener listener + ) { + deleteNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); + t.start(); + } + @Override public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { return new SearchAsyncActionTests.MockConnection(node); @@ -360,6 +425,11 @@ public void onFailure(Exception e) { createListener.onResponse(createPITResponse); latch.await(); assertEquals(3, updateNodesInvoked.size()); + /** + * check if cleanup is called for all nodes in case of update pit failure + */ + assertEquals(3, deleteNodesInvoked.size()); + } QueryBuilder randomQueryBuilder() { diff --git a/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java b/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java index d43325d56ab58..e2f48f2d661d2 100644 --- a/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java @@ -486,7 +486,7 @@ public void testPitAfterUpdateIndex() throws Exception { Matchers.equalTo(50L) ); /** - * using point in time id will have the same search results as ones before updation + * using point in time id will have the same search results as ones before update */ assertThat( client().prepareSearch()
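
For readers tracing the two patches above, the control flow that CreatePITController introduces is: phase 1 creates PIT reader contexts on every target shard with a short temporary keep-alive, phase 2 propagates the encoded PIT id, the request keep-alive and a single shared creation time to each of those shard contexts, and (after patch 2) any phase-2 failure frees the phase-1 contexts before failing the user request. The sketch below models that flow in plain Java, using CompletableFuture in place of OpenSearch's StepListener/ActionListener chaining; the names PitFlowSketch, phaseOneCreate, phaseTwoUpdate and cleanupContexts are illustrative only and do not exist in the codebase.

import java.util.List;
import java.util.concurrent.CompletableFuture;

public class PitFlowSketch {

    // Phase 1: create reader contexts on all shards with a temporary keep-alive.
    // In the controller this corresponds to transportSearchAction.executeRequest(...)
    // driving searchTransportService.createPitContext(...) per shard target.
    static CompletableFuture<String> phaseOneCreate(List<String> shards) {
        return CompletableFuture.supplyAsync(() -> "encoded-pit-id-over-" + shards.size() + "-shards");
    }

    // Phase 2: push the final PIT id, keep-alive and a shared creation time to every
    // shard context, mirroring searchTransportService.updatePitContext(...) with an
    // UpdatePITContextRequest per shard.
    static CompletableFuture<Void> phaseTwoUpdate(String pitId, List<String> shards, long keepAliveMillis) {
        long creationTime = System.currentTimeMillis(); // same creation time for all shard contexts
        CompletableFuture<?>[] updates = shards.stream()
            .map(shard -> CompletableFuture.runAsync(() -> {
                if (shard.contains("down")) {
                    throw new IllegalStateException("update failed on " + shard);
                }
                System.out.println("updated " + shard + " pitId=" + pitId
                    + " keepAlive=" + keepAliveMillis + " createdAt=" + creationTime);
            }))
            .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(updates);
    }

    // Failure path added in patch 2: free the contexts created in phase 1
    // (fire and forget), analogous to ClearScrollController.closeContexts(...).
    static void cleanupContexts(List<String> shards) {
        shards.forEach(shard -> System.out.println("freeing reader context on " + shard));
    }

    public static void main(String[] args) {
        List<String> shards = List.of("shard-0", "shard-1", "shard-down-2");
        phaseOneCreate(shards)
            .thenCompose(pitId -> phaseTwoUpdate(pitId, shards, 60_000L).thenApply(v -> pitId))
            .handle((pitId, err) -> {
                if (err != null) {
                    cleanupContexts(shards); // clean up, then fail the user request
                    System.err.println("create PIT failed: " + err);
                } else {
                    System.out.println("PIT ready: " + pitId);
                }
                return null;
            })
            .join();
    }
}

Without the cleanup step, a partially updated PIT could leave reader contexts on some shards holding resources until the temporary keep-alive expired; freeing them eagerly and still failing the request keeps the failure visible to the caller, which is the behaviour the new CreatePitControllerTests assert through the deleteNodesInvoked counters.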