diff --git a/server/src/main/java/org/opensearch/action/search/CreatePITController.java b/server/src/main/java/org/opensearch/action/search/CreatePITController.java index 4e32b1ad1752e..0eb8cc65ea011 100644 --- a/server/src/main/java/org/opensearch/action/search/CreatePITController.java +++ b/server/src/main/java/org/opensearch/action/search/CreatePITController.java @@ -8,6 +8,9 @@ package org.opensearch.action.search; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.StepListener; @@ -46,6 +49,7 @@ public class CreatePITController implements Runnable { private final Task task; private final ActionListener listener; private final CreatePITRequest request; + private static final Logger logger = LogManager.getLogger(CreatePITController.class); public CreatePITController( CreatePITRequest request, @@ -88,7 +92,10 @@ public void executeCreatePit() { final StepListener createPitListener = new StepListener<>(); - final ActionListener updatePitIdListener = ActionListener.wrap(r -> listener.onResponse(r), listener::onFailure); + final ActionListener updatePitIdListener = ActionListener.wrap(r -> listener.onResponse(r), e -> { + logger.error("PIT creation failed while updating PIT ID", e); + listener.onFailure(e); + }); /** * Phase 1 of create PIT */ @@ -144,7 +151,8 @@ public void executeUpdatePitId( final ActionListener groupedActionListener = getGroupedListener( updatePitIdListener, createPITResponse, - contextId.shards().size() + contextId.shards().size(), + contextId.shards().values() ); /** * store the create time ( same create time for all PIT contexts across shards ) to be used @@ -169,6 +177,7 @@ public void executeUpdatePitId( groupedActionListener ); } catch (Exception e) { + logger.error(() -> new ParameterizedMessage("Create pit update phase failed 
on node [{}]", node), e); groupedActionListener.onFailure(new OpenSearchException("Create pit failed on node[" + node + "]", e)); } } @@ -199,7 +208,8 @@ private StepListener> getConnectionLoo private ActionListener getGroupedListener( ActionListener updatePitIdListener, CreatePITResponse createPITResponse, - int size + int size, + Collection contexts ) { return new GroupedActionListener<>(new ActionListener<>() { @Override @@ -209,11 +219,31 @@ public void onResponse(final Collection responses) { @Override public void onFailure(final Exception e) { + cleanupContexts(contexts); updatePitIdListener.onFailure(e); } }, size); } + /** + * Cleanup all created PIT contexts in case of failure + */ + private void cleanupContexts(Collection contexts) { + ActionListener deleteListener = new ActionListener<>() { + @Override + public void onResponse(Integer freed) { + // log the number of freed contexts - this is invoke and forget call + logger.debug(() -> new ParameterizedMessage("Cleaned up {} contexts out of {}", freed, contexts.size())); + } + + @Override + public void onFailure(Exception e) { + logger.debug("Cleaning up PIT contexts failed ", e); + } + }; + ClearScrollController.closeContexts(clusterService.state().getNodes(), searchTransportService, contexts, deleteListener); + } + @Override public void run() { runner.run(); diff --git a/server/src/main/java/org/opensearch/action/search/CreatePITResponse.java b/server/src/main/java/org/opensearch/action/search/CreatePITResponse.java index 64419e785a838..31a20f0f907cb 100644 --- a/server/src/main/java/org/opensearch/action/search/CreatePITResponse.java +++ b/server/src/main/java/org/opensearch/action/search/CreatePITResponse.java @@ -18,7 +18,11 @@ import java.io.IOException; +/** + * Create point in time response with point in time id and success / failures + */ public class CreatePITResponse extends ActionResponse implements StatusToXContentObject { + // point in time id private final String id; private final int 
totalShards; private final int successfulShards; diff --git a/server/src/main/java/org/opensearch/action/search/UpdatePITContextRequest.java b/server/src/main/java/org/opensearch/action/search/UpdatePITContextRequest.java index a58bb4b4a1ecd..d52bb0f84f368 100644 --- a/server/src/main/java/org/opensearch/action/search/UpdatePITContextRequest.java +++ b/server/src/main/java/org/opensearch/action/search/UpdatePITContextRequest.java @@ -15,25 +15,28 @@ import java.io.IOException; +/** + * Request used to update PIT reader contexts with pitId, keepAlive and creationTime + */ public class UpdatePITContextRequest extends TransportRequest { private final String pitId; private final long keepAlive; - private final long createTime; + private final long creationTime; private final ShardSearchContextId searchContextId; - public UpdatePITContextRequest(ShardSearchContextId searchContextId, String pitId, long keepAlive, long createTime) { + public UpdatePITContextRequest(ShardSearchContextId searchContextId, String pitId, long keepAlive, long creationTime) { this.pitId = pitId; this.searchContextId = searchContextId; this.keepAlive = keepAlive; - this.createTime = createTime; + this.creationTime = creationTime; } UpdatePITContextRequest(StreamInput in) throws IOException { super(in); pitId = in.readString(); keepAlive = in.readLong(); - createTime = in.readLong(); + creationTime = in.readLong(); searchContextId = new ShardSearchContextId(in); } @@ -42,7 +45,7 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(pitId); out.writeLong(keepAlive); - out.writeLong(createTime); + out.writeLong(creationTime); searchContextId.writeTo(out); } @@ -54,8 +57,8 @@ public String getPitId() { return pitId; } - public long getCreateTime() { - return createTime; + public long getCreationTime() { + return creationTime; } public long getKeepAlive() { diff --git a/server/src/main/java/org/opensearch/action/search/UpdatePitContextResponse.java 
b/server/src/main/java/org/opensearch/action/search/UpdatePitContextResponse.java index 244507e9b4adb..b907ae1f8b8a9 100644 --- a/server/src/main/java/org/opensearch/action/search/UpdatePitContextResponse.java +++ b/server/src/main/java/org/opensearch/action/search/UpdatePitContextResponse.java @@ -17,27 +17,27 @@ public class UpdatePitContextResponse extends TransportResponse { private final String pitId; - private final long createTime; + private final long creationTime; private final long keepAlive; UpdatePitContextResponse(StreamInput in) throws IOException { super(in); pitId = in.readString(); - createTime = in.readLong(); + creationTime = in.readLong(); keepAlive = in.readLong(); } - public UpdatePitContextResponse(String pitId, long createTime, long keepAlive) { + public UpdatePitContextResponse(String pitId, long creationTime, long keepAlive) { this.pitId = pitId; this.keepAlive = keepAlive; - this.createTime = createTime; + this.creationTime = creationTime; } @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(pitId); - out.writeLong(createTime); + out.writeLong(creationTime); out.writeLong(keepAlive); } @@ -49,7 +49,7 @@ public long getKeepAlive() { return keepAlive; } - public long getCreateTime() { - return createTime; + public long getCreationTime() { + return creationTime; } } diff --git a/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java b/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java index e2005c65ad3d7..bd4e16f50a2cb 100644 --- a/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java +++ b/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java @@ -137,7 +137,7 @@ default void onNewPitContext(ReaderContext readerContext) {} /** * Executed when a Point-In-Time search {@link SearchContext} is freed. - * This happens on deleteion of a Point-In-Time or on it's keep-alive expiring. 
+ * This happens on deletion of a Point-In-Time or when its keep-alive expires. * @param readerContext the freed search context */ default void onFreePitContext(ReaderContext readerContext) {} diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestCreatePITAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestCreatePITAction.java index 7f2a800d134b4..96c491f96a063 100644 --- a/server/src/main/java/org/opensearch/rest/action/search/RestCreatePITAction.java +++ b/server/src/main/java/org/opensearch/rest/action/search/RestCreatePITAction.java @@ -53,7 +53,7 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client @Override public List routes() { - return unmodifiableList(asList(new Route(POST, "/{index}/_pit"))); + return unmodifiableList(asList(new Route(POST, "/{index}/_search/_point_in_time"))); } } diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 8ac12f8fbb1ef..0ae58c2314cf5 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -1051,9 +1051,9 @@ public void updatePitIdAndKeepAlive(UpdatePITContextRequest request, ActionListe } Releasable updatePit = null; try { - updatePit = readerContext.updatePitIdAndKeepAlive(request.getKeepAlive(), request.getPitId(), request.getCreateTime()); + updatePit = readerContext.updatePitIdAndKeepAlive(request.getKeepAlive(), request.getPitId(), request.getCreationTime()); updatePit.close(); - listener.onResponse(new UpdatePitContextResponse(request.getPitId(), request.getCreateTime(), request.getKeepAlive())); + listener.onResponse(new UpdatePitContextResponse(request.getPitId(), request.getCreationTime(), request.getKeepAlive())); } catch (Exception e) { freeReaderContext(readerContext.id()); if (updatePit != null) { diff --git 
a/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java index 08bc3c4f65955..5c19d54327fea 100644 --- a/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java +++ b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java @@ -22,7 +22,8 @@ public class PitReaderContext extends ReaderContext { // Storing the encoded PIT ID as part of PIT reader context for use cases such as list pit API private final SetOnce pitId = new SetOnce<>(); - private final SetOnce createTime = new SetOnce<>(); + // Creation time of PIT contexts which helps users to differentiate between multiple PIT reader contexts + private final SetOnce creationTime = new SetOnce<>(); public PitReaderContext( ShardSearchContextId id, @@ -52,18 +53,18 @@ public Releasable updatePitIdAndKeepAlive(long keepAliveInMillis, String pitId, refCounted.incRef(); tryUpdateKeepAlive(keepAliveInMillis); setPitId(pitId); - setCreateTime(createTime); + setCreationTime(createTime); return Releasables.releaseOnce(() -> { this.lastAccessTime.updateAndGet(curr -> Math.max(curr, nowInMillis())); refCounted.decRef(); }); } - public long getCreateTime() { - return this.createTime.get(); + public long getCreationTime() { + return this.creationTime.get(); } - public void setCreateTime(final long createTime) { - this.createTime.set(createTime); + public void setCreationTime(final long creationTime) { + this.creationTime.set(creationTime); } } diff --git a/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java b/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java index c44d8c651cce6..db4abf7dd179f 100644 --- a/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java +++ b/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java @@ -28,7 +28,11 @@ import org.opensearch.index.query.QueryBuilder; import 
org.opensearch.index.query.TermQueryBuilder; import org.opensearch.index.shard.ShardId; -import org.opensearch.search.*; +import org.opensearch.search.SearchHit; +import org.opensearch.search.SearchHits; +import org.opensearch.search.SearchPhaseResult; +import org.opensearch.search.SearchService; +import org.opensearch.search.SearchShardTarget; import org.opensearch.search.aggregations.InternalAggregations; import org.opensearch.search.internal.AliasFilter; import org.opensearch.search.internal.InternalSearchResponse; @@ -37,7 +41,11 @@ import org.opensearch.tasks.TaskId; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.transport.Transport; -import java.util.*; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import static org.mockito.Mockito.mock; @@ -126,8 +134,12 @@ public void onFailure(Exception e) { when(state.getNodes()).thenReturn(nodes); } + /** + * Test if transport call for update pit is made to all nodes present as part of PIT ID returned from phase one of create pit + */ public void testUpdatePitAfterCreatePitSuccess() throws InterruptedException { List updateNodesInvoked = new CopyOnWriteArrayList<>(); + List deleteNodesInvoked = new CopyOnWriteArrayList<>(); SearchTransportService searchTransportService = new SearchTransportService(null, null) { @Override public void updatePitContext( @@ -140,6 +152,20 @@ public void updatePitContext( t.start(); } + /** + * Test if cleanup request is called + */ + @Override + public void sendFreeContext( + Transport.Connection connection, + ShardSearchContextId contextId, + ActionListener listener + ) { + deleteNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); + t.start(); + } + @Override public Transport.Connection getConnection(String 
clusterAlias, DiscoveryNode node) { return new SearchAsyncActionTests.MockConnection(node); @@ -181,10 +207,15 @@ public void onFailure(Exception e) { createListener.onResponse(createPITResponse); latch.await(); assertEquals(3, updateNodesInvoked.size()); + assertEquals(0, deleteNodesInvoked.size()); } + /** + * If create phase results in failure, update pit phase should not proceed and propagate the exception + */ public void testUpdatePitAfterCreatePitFailure() throws InterruptedException { List updateNodesInvoked = new CopyOnWriteArrayList<>(); + List deleteNodesInvoked = new CopyOnWriteArrayList<>(); SearchTransportService searchTransportService = new SearchTransportService(null, null) { @Override public void updatePitContext( @@ -201,6 +232,17 @@ public void updatePitContext( public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { return new SearchAsyncActionTests.MockConnection(node); } + + @Override + public void sendFreeContext( + Transport.Connection connection, + ShardSearchContextId contextId, + ActionListener listener + ) { + deleteNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); + t.start(); + } }; CountDownLatch latch = new CountDownLatch(1); @@ -236,10 +278,18 @@ public void onFailure(Exception e) { createListener.onFailure(new Exception("Exception occurred in phase 1")); latch.await(); assertEquals(0, updateNodesInvoked.size()); + /** + * cleanup is not called on create pit phase one failure + */ + assertEquals(0, deleteNodesInvoked.size()); } + /** + * Testing that any update pit failures fails the request + */ public void testUpdatePitFailureForNodeDrop() throws InterruptedException { List updateNodesInvoked = new CopyOnWriteArrayList<>(); + List deleteNodesInvoked = new CopyOnWriteArrayList<>(); SearchTransportService searchTransportService = new SearchTransportService(null, null) { @Override public void updatePitContext( @@ -258,6 +308,17 
@@ public void updatePitContext( } } + @Override + public void sendFreeContext( + Transport.Connection connection, + ShardSearchContextId contextId, + ActionListener listener + ) { + deleteNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); + t.start(); + } + @Override public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { return new SearchAsyncActionTests.MockConnection(node); @@ -295,10 +356,15 @@ public void onFailure(Exception e) { createListener.onResponse(createPITResponse); latch.await(); assertEquals(3, updateNodesInvoked.size()); + /** + * check if cleanup is called for all nodes in case of update pit failure + */ + assertEquals(3, deleteNodesInvoked.size()); } public void testUpdatePitFailureWhereAllNodesDown() throws InterruptedException { List updateNodesInvoked = new CopyOnWriteArrayList<>(); + List deleteNodesInvoked = new CopyOnWriteArrayList<>(); SearchTransportService searchTransportService = new SearchTransportService(null, null) { @Override public void updatePitContext( @@ -311,6 +377,17 @@ public void updatePitContext( t.start(); } + @Override + public void sendFreeContext( + Transport.Connection connection, + ShardSearchContextId contextId, + ActionListener listener + ) { + deleteNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); + t.start(); + } + @Override public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { return new SearchAsyncActionTests.MockConnection(node); @@ -348,6 +425,11 @@ public void onFailure(Exception e) { createListener.onResponse(createPITResponse); latch.await(); assertEquals(3, updateNodesInvoked.size()); + /** + * check if cleanup is called for all nodes in case of update pit failure + */ + assertEquals(3, deleteNodesInvoked.size()); + } QueryBuilder randomQueryBuilder() { diff --git 
a/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java b/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java index 29f570f60b000..32a2e7b21eb6f 100644 --- a/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java +++ b/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java @@ -70,17 +70,6 @@ public Settings onNodeStopped(String nodeName) throws Exception { }); } - public void testCreatePitWithAllNodesDown() throws Exception { - internalCluster().startMasterOnlyNode(); - internalCluster().stopRandomDataNode(); - internalCluster().stopRandomDataNode(); - CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), false); - request.setIndices(new String[] { "index" }); - ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); - ExecutionException ex = expectThrows(ExecutionException.class, execute::get); - assertTrue(ex.getMessage().contains("all shards failed")); - } - public void testCreatePitWhileNodeDropWithAllowPartialCreationTrue() throws Exception { CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { "index" }); @@ -141,23 +130,6 @@ public Settings onNodeStopped(String nodeName) throws Exception { }); } - public void testPitSearchWithAllNodesDown() throws Exception { - CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); - request.setIndices(new String[] { "index" }); - ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); - CreatePITResponse pitResponse = execute.get(); - - internalCluster().startMasterOnlyNode(); - internalCluster().stopRandomDataNode(); - internalCluster().stopRandomDataNode(); - ActionFuture searchExecute = client().prepareSearch() - .setSize(2) - .setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1))) - .execute(); - ExecutionException ex = expectThrows(ExecutionException.class, 
searchExecute::get); - assertTrue(ex.getMessage().contains("all shards failed")); - } - public void testPitInvalidDefaultKeepAlive() { IllegalArgumentException exc = expectThrows( IllegalArgumentException.class, diff --git a/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java b/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java index 69ce5cb5b63ec..e2f48f2d661d2 100644 --- a/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java @@ -221,9 +221,10 @@ public void testSearchWithFirstPhaseKeepAliveExpiry() throws ExecutionException, CreatePITResponse pitResponse = execute.get(); SearchService service = getInstanceFromNode(SearchService.class); assertEquals(2, service.getActiveContexts()); + // since first phase temporary keep alive is set at 1 second in this test file + // and create pit request keep alive is less than that, keep alive is set to 1 second, (max of 2 keep alives) + // so reader context will clear up after 1 second Thread.sleep(1000); - // since first phase keep alive is set at 1 second in this test file and create pit request keep alive is - // less than that, so reader context will clear up after a second client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); SearchPhaseExecutionException ex = expectThrows(SearchPhaseExecutionException.class, () -> { @@ -362,6 +363,9 @@ public void testOpenPitContextsConcurrently() throws Exception { service.doClose(); } + /** + * Point in time search should return the same results as creation time and index updates should not affect the PIT search results + */ public void testPitAfterUpdateIndex() throws Exception { client().admin().indices().prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 5)).get(); client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().get(); @@ -482,7 
+486,7 @@ public void testPitAfterUpdateIndex() throws Exception { Matchers.equalTo(50L) ); /** - * using point in time id will have the old search results before update + * using point in time id will have the same search results as ones before update */ assertThat( client().prepareSearch() diff --git a/server/src/test/java/org/opensearch/search/SearchServiceTests.java b/server/src/test/java/org/opensearch/search/SearchServiceTests.java index 0b36d6467a1b0..18a0d55c487c5 100644 --- a/server/src/test/java/org/opensearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/opensearch/search/SearchServiceTests.java @@ -1452,7 +1452,7 @@ public void testUpdatePitId() { searchService.updatePitIdAndKeepAlive(updateRequest, updateFuture); UpdatePitContextResponse updateResponse = updateFuture.actionGet(); assertTrue(updateResponse.getPitId().equalsIgnoreCase("pitId")); - assertTrue(updateResponse.getCreateTime() == updateRequest.getCreateTime()); + assertTrue(updateResponse.getCreationTime() == updateRequest.getCreationTime()); assertTrue(updateResponse.getKeepAlive() == updateRequest.getKeepAlive()); assertTrue(updateResponse.getPitId().equalsIgnoreCase("pitId")); assertThat(searchService.getActiveContexts(), equalTo(1));