Skip to content

Commit

Permalink
Delete PIT API
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie committed Apr 18, 2022
1 parent 43fed74 commit 57232fb
Show file tree
Hide file tree
Showing 14 changed files with 913 additions and 39 deletions.
12 changes: 5 additions & 7 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,13 @@
import org.opensearch.action.main.TransportMainAction;
import org.opensearch.action.search.ClearScrollAction;
import org.opensearch.action.search.CreatePITAction;
import org.opensearch.action.search.DeletePITAction;
import org.opensearch.action.search.MultiSearchAction;
import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchScrollAction;
import org.opensearch.action.search.TransportClearScrollAction;
import org.opensearch.action.search.TransportCreatePITAction;
import org.opensearch.action.search.TransportDeletePITAction;
import org.opensearch.action.search.TransportMultiSearchAction;
import org.opensearch.action.search.TransportSearchAction;
import org.opensearch.action.search.TransportSearchScrollAction;
Expand Down Expand Up @@ -400,13 +402,7 @@
import org.opensearch.rest.action.ingest.RestGetPipelineAction;
import org.opensearch.rest.action.ingest.RestPutPipelineAction;
import org.opensearch.rest.action.ingest.RestSimulatePipelineAction;
import org.opensearch.rest.action.search.RestClearScrollAction;
import org.opensearch.rest.action.search.RestCountAction;
import org.opensearch.rest.action.search.RestCreatePITAction;
import org.opensearch.rest.action.search.RestExplainAction;
import org.opensearch.rest.action.search.RestMultiSearchAction;
import org.opensearch.rest.action.search.RestSearchAction;
import org.opensearch.rest.action.search.RestSearchScrollAction;
import org.opensearch.rest.action.search.*;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.usage.UsageService;
Expand Down Expand Up @@ -664,6 +660,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(DeleteDanglingIndexAction.INSTANCE, TransportDeleteDanglingIndexAction.class);
actions.register(FindDanglingIndexAction.INSTANCE, TransportFindDanglingIndexAction.class);
actions.register(CreatePITAction.INSTANCE, TransportCreatePITAction.class);
actions.register(DeletePITAction.INSTANCE, TransportDeletePITAction.class);

return unmodifiableMap(actions.getRegistry());
}
Expand Down Expand Up @@ -839,6 +836,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {

// Point in time API
registerHandler.accept(new RestCreatePITAction());
registerHandler.accept(new RestDeletePITAction());
for (ActionPlugin plugin : actionPlugins) {
for (RestHandler handler : plugin.getRestHandlers(
settings,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.ActionType;

public class DeletePITAction extends ActionType<DeletePITResponse> {

public static final DeletePITAction INSTANCE = new DeletePITAction();
public static final String NAME = "indices:admin/delete/pit";

private DeletePITAction() {
super(NAME, DeletePITResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.ActionListener;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.util.concurrent.CountDown;
import org.opensearch.search.SearchPhaseResult;
import org.opensearch.search.internal.ShardSearchContextId;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportResponse;

import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class DeletePITController implements Runnable {
private final DiscoveryNodes nodes;
private final SearchTransportService searchTransportService;
private final CountDown expectedOps;
private final ActionListener<DeletePITResponse> listener;
private final AtomicBoolean hasFailed = new AtomicBoolean(false);
private final AtomicInteger freedSearchContexts = new AtomicInteger(0);
private final ClusterService clusterService;
private final Runnable runner;

public DeletePITController(
DeletePITRequest request,
ActionListener<DeletePITResponse> listener,
ClusterService clusterService,
SearchTransportService searchTransportService
) {
this.nodes = clusterService.state().getNodes();
this.clusterService = clusterService;
this.searchTransportService = searchTransportService;
this.listener = listener;
List<String> pitIds = request.getPitIds();
final int expectedOps;
if (pitIds.size() == 1 && "_all".equals(pitIds.get(0))) {
expectedOps = nodes.getSize();
runner = this::deleteAllPits;
} else {
// TODO: replace this with #closeContexts
List<SearchContextIdForNode> contexts = new ArrayList<>();
for (String scrollId : request.getPitIds()) {
SearchContextIdForNode[] context = TransportSearchHelper.parseScrollId(scrollId).getContext();
Collections.addAll(contexts, context);
}
if (contexts.isEmpty()) {
expectedOps = 0;
runner = () -> listener.onResponse(new DeletePITResponse(true));
} else {
expectedOps = contexts.size();
runner = () -> ClearScrollController.closeContexts(
clusterService.state().nodes(),
searchTransportService,
contexts,
ActionListener.wrap(r -> listener.onResponse(new DeletePITResponse(true)), listener::onFailure)
);
}
}
this.expectedOps = new CountDown(expectedOps);

}

@Override
public void run() {
runner.run();
}

void deleteAllPits() {
for (final DiscoveryNode node : clusterService.state().getNodes()) {
try {
Transport.Connection connection = searchTransportService.getConnection(null, node);
searchTransportService.sendDeleteAllPitContexts(connection, new ActionListener<TransportResponse>() {
@Override
public void onResponse(TransportResponse response) {
onFreedContext(true);
}

@Override
public void onFailure(Exception e) {
onFailedFreedContext(e, node);
}
});
} catch (Exception e) {
onFailedFreedContext(e, node);
}
}
}

public static class PITSinglePhaseSearchResult extends SearchPhaseResult {
public void setContextId(ShardSearchContextId contextId) {
this.contextId = contextId;
}
}

private void onFreedContext(boolean freed) {
if (freed) {
freedSearchContexts.incrementAndGet();
}
if (expectedOps.countDown()) {
boolean succeeded = hasFailed.get() == false;
listener.onResponse(new DeletePITResponse(succeeded));
}
}

private void onFailedFreedContext(Throwable e, DiscoveryNode node) {
/*
* We have to set the failure marker before we count down otherwise we can expose the failure marker before we have set it to a
* racing thread successfully freeing a context. This would lead to that thread responding that the clear scroll succeeded.
*/
hasFailed.set(true);
if (expectedOps.countDown()) {
listener.onResponse(new DeletePITResponse(false));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.ToXContentObject;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.opensearch.action.ValidateActions.addValidationError;

/**
* Request to delete one or more PIT contexts based on IDs.
*/
public class DeletePITRequest extends ActionRequest implements ToXContentObject {

private List<String> pitIds;

public DeletePITRequest(StreamInput in) throws IOException {
super(in);
pitIds = Arrays.asList(in.readStringArray());
}

public DeletePITRequest(String... pitIds) {
if (pitIds != null) {
this.pitIds = Arrays.asList(pitIds);
}
}

public DeletePITRequest(List<String> pitIds) {
if (pitIds != null) {
this.pitIds = pitIds;
}
}

public DeletePITRequest() {}

public List<String> getPitIds() {
return pitIds;
}

public void setPitIds(List<String> pitIds) {
this.pitIds = pitIds;
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (pitIds == null || pitIds.isEmpty()) {
validationException = addValidationError("no pit ids specified", validationException);
}
return validationException;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (pitIds == null) {
out.writeVInt(0);
} else {
out.writeStringArray(pitIds.toArray(new String[pitIds.size()]));
}
}

public void addPitId(String pitId) {
if (pitIds == null) {
pitIds = new ArrayList<>();
}
pitIds.add(pitId);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
builder.startArray("pit_id");
for (String pitId : pitIds) {
builder.value(pitId);
}
builder.endArray();
builder.endObject();
return builder;
}

public void fromXContent(XContentParser parser) throws IOException {
pitIds = null;
if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
throw new IllegalArgumentException("Malformed content, must start with an object");
} else {
XContentParser.Token token;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if ("pit_id".equals(currentFieldName)) {
if (token == XContentParser.Token.START_ARRAY) {
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token.isValue() == false) {
throw new IllegalArgumentException("pit_id array element should only contain pit_id");
}
addPitId(parser.text());
}
} else {
if (token.isValue() == false) {
throw new IllegalArgumentException("pit_id element should only contain pit_id");
}
addPitId(parser.text());
}
} else {
throw new IllegalArgumentException(
"Unknown parameter [" + currentFieldName + "] in request body or parameter is of the wrong type[" + token + "] "
);
}
}
}
}

}
Loading

0 comments on commit 57232fb

Please sign in to comment.