Skip to content

Commit

Permalink
Add cleanup logic to free PIT contexts if create PIT fails
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie committed Apr 14, 2022
1 parent f8f6367 commit 0fcd1df
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

package org.opensearch.action.search;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
Expand Down Expand Up @@ -46,6 +49,7 @@ public class CreatePITController implements Runnable {
private final Task task;
private final ActionListener<CreatePITResponse> listener;
private final CreatePITRequest request;
private static final Logger logger = LogManager.getLogger(CreatePITController.class);

public CreatePITController(
CreatePITRequest request,
Expand Down Expand Up @@ -88,7 +92,10 @@ public void executeCreatePit() {

final StepListener<CreatePITResponse> createPitListener = new StepListener<>();

final ActionListener<CreatePITResponse> updatePitIdListener = ActionListener.wrap(r -> listener.onResponse(r), listener::onFailure);
final ActionListener<CreatePITResponse> updatePitIdListener = ActionListener.wrap(r -> listener.onResponse(r), e -> {
logger.error("PIT creation failed while updating PIT ID", e);
listener.onFailure(e);
});
/**
* Phase 1 of create PIT
*/
Expand Down Expand Up @@ -144,7 +151,8 @@ public void executeUpdatePitId(
final ActionListener<UpdatePitContextResponse> groupedActionListener = getGroupedListener(
updatePitIdListener,
createPITResponse,
contextId.shards().size()
contextId.shards().size(),
contextId.shards().values()
);
/**
* store the create time ( same create time for all PIT contexts across shards ) to be used
Expand All @@ -169,6 +177,7 @@ public void executeUpdatePitId(
groupedActionListener
);
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("Create pit update phase failed on node [{}]", node), e);
groupedActionListener.onFailure(new OpenSearchException("Create pit failed on node[" + node + "]", e));
}
}
Expand Down Expand Up @@ -199,7 +208,8 @@ private StepListener<BiFunction<String, String, DiscoveryNode>> getConnectionLoo
private ActionListener<UpdatePitContextResponse> getGroupedListener(
ActionListener<CreatePITResponse> updatePitIdListener,
CreatePITResponse createPITResponse,
int size
int size,
Collection<SearchContextIdForNode> contexts
) {
return new GroupedActionListener<>(new ActionListener<>() {
@Override
Expand All @@ -209,11 +219,31 @@ public void onResponse(final Collection<UpdatePitContextResponse> responses) {

@Override
public void onFailure(final Exception e) {
cleanupContexts(contexts);
updatePitIdListener.onFailure(e);
}
}, size);
}

/**
 * Best-effort teardown of PIT contexts that were already opened on shards
 * before a later phase failed. Fire-and-forget: the caller's failure is
 * propagated regardless of whether this cleanup succeeds.
 */
private void cleanupContexts(Collection<SearchContextIdForNode> contexts) {
    // Listener only exists to record the outcome in the logs; failures here
    // are deliberately not escalated to the caller.
    final ActionListener<Integer> cleanupListener = new ActionListener<>() {
        @Override
        public void onResponse(Integer freed) {
            // log the number of freed contexts - this is invoke and forget call
            logger.debug(() -> new ParameterizedMessage("Cleaned up {} contexts out of {}", freed, contexts.size()));
        }

        @Override
        public void onFailure(Exception e) {
            logger.debug("Cleaning up PIT contexts failed ", e);
        }
    };
    final DiscoveryNodes nodes = clusterService.state().getNodes();
    ClearScrollController.closeContexts(nodes, searchTransportService, contexts, cleanupListener);
}

@Override
public void run() {
runner.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client

@Override
public List<Route> routes() {
    // Single route: create a point-in-time reader over the target index.
    final Route createPitRoute = new Route(POST, "/{index}/_search/_point_in_time");
    return unmodifiableList(asList(createPitRoute));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.index.shard.ShardId;
import org.opensearch.search.*;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.search.SearchPhaseResult;
import org.opensearch.search.SearchService;
import org.opensearch.search.SearchShardTarget;
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.internal.AliasFilter;
import org.opensearch.search.internal.InternalSearchResponse;
Expand All @@ -37,7 +41,11 @@
import org.opensearch.tasks.TaskId;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.transport.Transport;
import java.util.*;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -131,6 +139,7 @@ public void onFailure(Exception e) {
*/
public void testUpdatePitAfterCreatePitSuccess() throws InterruptedException {
List<DiscoveryNode> updateNodesInvoked = new CopyOnWriteArrayList<>();
List<DiscoveryNode> deleteNodesInvoked = new CopyOnWriteArrayList<>();
SearchTransportService searchTransportService = new SearchTransportService(null, null) {
@Override
public void updatePitContext(
Expand All @@ -143,6 +152,20 @@ public void updatePitContext(
t.start();
}

/**
* Test if cleanup request is called
*/
@Override
public void sendFreeContext(
Transport.Connection connection,
ShardSearchContextId contextId,
ActionListener<SearchFreeContextResponse> listener
) {
deleteNodesInvoked.add(connection.getNode());
Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true)));
t.start();
}

@Override
public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) {
return new SearchAsyncActionTests.MockConnection(node);
Expand Down Expand Up @@ -184,13 +207,15 @@ public void onFailure(Exception e) {
createListener.onResponse(createPITResponse);
latch.await();
assertEquals(3, updateNodesInvoked.size());
assertEquals(0, deleteNodesInvoked.size());
}

/**
* If create phase results in failure, update pit phase should not proceed and propagate the exception
*/
public void testUpdatePitAfterCreatePitFailure() throws InterruptedException {
List<DiscoveryNode> updateNodesInvoked = new CopyOnWriteArrayList<>();
List<DiscoveryNode> deleteNodesInvoked = new CopyOnWriteArrayList<>();
SearchTransportService searchTransportService = new SearchTransportService(null, null) {
@Override
public void updatePitContext(
Expand All @@ -207,6 +232,17 @@ public void updatePitContext(
public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) {
return new SearchAsyncActionTests.MockConnection(node);
}

@Override
public void sendFreeContext(
Transport.Connection connection,
ShardSearchContextId contextId,
ActionListener<SearchFreeContextResponse> listener
) {
deleteNodesInvoked.add(connection.getNode());
Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true)));
t.start();
}
};

CountDownLatch latch = new CountDownLatch(1);
Expand Down Expand Up @@ -242,13 +278,18 @@ public void onFailure(Exception e) {
createListener.onFailure(new Exception("Exception occurred in phase 1"));
latch.await();
assertEquals(0, updateNodesInvoked.size());
/**
* cleanup is not called on create pit phase one failure
*/
assertEquals(0, deleteNodesInvoked.size());
}

/**
* Testing that any update pit failures fails the request
*/
public void testUpdatePitFailureForNodeDrop() throws InterruptedException {
List<DiscoveryNode> updateNodesInvoked = new CopyOnWriteArrayList<>();
List<DiscoveryNode> deleteNodesInvoked = new CopyOnWriteArrayList<>();
SearchTransportService searchTransportService = new SearchTransportService(null, null) {
@Override
public void updatePitContext(
Expand All @@ -267,6 +308,17 @@ public void updatePitContext(
}
}

@Override
public void sendFreeContext(
Transport.Connection connection,
ShardSearchContextId contextId,
ActionListener<SearchFreeContextResponse> listener
) {
deleteNodesInvoked.add(connection.getNode());
Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true)));
t.start();
}

@Override
public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) {
return new SearchAsyncActionTests.MockConnection(node);
Expand Down Expand Up @@ -304,13 +356,15 @@ public void onFailure(Exception e) {
createListener.onResponse(createPITResponse);
latch.await();
assertEquals(3, updateNodesInvoked.size());
/**
* check if cleanup is called for all nodes in case of update pit failure
*/
assertEquals(3, deleteNodesInvoked.size());
}

/**
* Testing that the update pit call is made to all the nodes despite failures in some nodes
*/
public void testUpdatePitFailureWhereAllNodesDown() throws InterruptedException {
List<DiscoveryNode> updateNodesInvoked = new CopyOnWriteArrayList<>();
List<DiscoveryNode> deleteNodesInvoked = new CopyOnWriteArrayList<>();
SearchTransportService searchTransportService = new SearchTransportService(null, null) {
@Override
public void updatePitContext(
Expand All @@ -323,6 +377,17 @@ public void updatePitContext(
t.start();
}

@Override
public void sendFreeContext(
Transport.Connection connection,
ShardSearchContextId contextId,
ActionListener<SearchFreeContextResponse> listener
) {
deleteNodesInvoked.add(connection.getNode());
Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true)));
t.start();
}

@Override
public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) {
return new SearchAsyncActionTests.MockConnection(node);
Expand Down Expand Up @@ -360,6 +425,11 @@ public void onFailure(Exception e) {
createListener.onResponse(createPITResponse);
latch.await();
assertEquals(3, updateNodesInvoked.size());
/**
* check if cleanup is called for all nodes in case of update pit failure
*/
assertEquals(3, deleteNodesInvoked.size());

}

QueryBuilder randomQueryBuilder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ public void testPitAfterUpdateIndex() throws Exception {
Matchers.equalTo(50L)
);
/**
* using point in time id will have the same search results as ones before updation
* using point in time id will have the same search results as ones before update
*/
assertThat(
client().prepareSearch()
Expand Down

0 comments on commit 0fcd1df

Please sign in to comment.