Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add synchronous execution option to workflow provisioning #990

Merged
merged 17 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)

## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.18...2.x)
### Features
- Add synchronous execution option to workflow provisioning ([#990](https://github.com/opensearch-project/flow-framework/pull/990))

### Enhancements
### Bug Fixes
- Remove useCase and defaultParams field in WorkflowRequest ([#758](https://github.com/opensearch-project/flow-framework/pull/758))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ private CommonValue() {}
public static final String PROVISION_WORKFLOW = "provision";
/** The param name for update workflow field in create API */
public static final String UPDATE_WORKFLOW_FIELDS = "update_fields";
/** The param name for specifying the timeout duration in seconds to wait for workflow completion */
public static final String WAIT_FOR_COMPLETION_TIMEOUT = "wait_for_completion_timeout";
/** The field name for workflow steps. This field represents the name of the workflow steps to be fetched. */
public static final String WORKFLOW_STEP = "workflow_step";
/** The param name for default use case, used by the create workflow API */
Expand Down Expand Up @@ -186,6 +188,8 @@ private CommonValue() {}
public static final String SOURCE_INDEX = "source_index";
/** The destination index field for reindex */
public static final String DESTINATION_INDEX = "destination_index";
/** Provision Timeout field */
public static final String PROVISION_TIMEOUT_FIELD = "provision.timeout";
/*
* Constants associated with resource provisioning / state
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
Expand Down Expand Up @@ -43,6 +44,7 @@
import static org.opensearch.flowframework.common.CommonValue.UPDATE_WORKFLOW_FIELDS;
import static org.opensearch.flowframework.common.CommonValue.USE_CASE;
import static org.opensearch.flowframework.common.CommonValue.VALIDATION;
import static org.opensearch.flowframework.common.CommonValue.WAIT_FOR_COMPLETION_TIMEOUT;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
Expand Down Expand Up @@ -88,6 +90,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
boolean reprovision = request.paramAsBoolean(REPROVISION_WORKFLOW, false);
boolean updateFields = request.paramAsBoolean(UPDATE_WORKFLOW_FIELDS, false);
String useCase = request.param(USE_CASE);
TimeValue waitForCompletionTimeout = request.paramAsTime(WAIT_FOR_COMPLETION_TIMEOUT, TimeValue.MINUS_ONE);

// If provisioning, consume all other params and pass to provision transport action
Map<String, String> params = provision
Expand Down Expand Up @@ -145,6 +148,15 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
);
return processError(ffe, params, request);
}
// Ensure wait_for_completion is not set unless reprovision or provision is true
if (waitForCompletionTimeout != TimeValue.MINUS_ONE && !(reprovision || provision)) {
FlowFrameworkException ffe = new FlowFrameworkException(
"Request parameters 'wait_for_completion_timeout' are not allowed unless the 'provision' or 'reprovision' parameter is set to true.",
RestStatus.BAD_REQUEST
);
return processError(ffe, params, request);
}

try {
Template template;
Map<String, String> useCaseDefaultsMap = Collections.emptyMap();
Expand Down Expand Up @@ -219,7 +231,9 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
if (updateFields) {
params = Map.of(UPDATE_WORKFLOW_FIELDS, "true");
}

if (waitForCompletionTimeout != TimeValue.MINUS_ONE) {
params = Map.of(WAIT_FOR_COMPLETION_TIMEOUT, waitForCompletionTimeout.toString());
}
WorkflowRequest workflowRequest = new WorkflowRequest(
workflowId,
template,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
Expand All @@ -33,6 +34,7 @@
import java.util.stream.Collectors;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.WAIT_FOR_COMPLETION_TIMEOUT;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
Expand Down Expand Up @@ -73,6 +75,7 @@ public List<Route> routes() {
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
String workflowId = request.param(WORKFLOW_ID);
TimeValue waitForCompletionTimeout = request.paramAsTime(WAIT_FOR_COMPLETION_TIMEOUT, TimeValue.MINUS_ONE);
try {
Map<String, String> params = parseParamsAndContent(request);
if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) {
Expand All @@ -86,7 +89,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST);
}
// Create request and provision
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null, params);
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null, params, waitForCompletionTimeout);
return channel -> client.execute(ProvisionWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.PROVISIONING_PROGRESS_FIELD;
import static org.opensearch.flowframework.common.CommonValue.STATE_FIELD;
import static org.opensearch.flowframework.common.CommonValue.WAIT_FOR_COMPLETION_TIMEOUT;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES;
import static org.opensearch.flowframework.util.ParseUtils.checkFilterByBackendRoles;
import static org.opensearch.flowframework.util.ParseUtils.getUserContext;
Expand Down Expand Up @@ -214,6 +215,16 @@
}
}
String workflowId = request.getWorkflowId();
TimeValue waitForTimeCompletion;
if (request.getParams().containsKey(WAIT_FOR_COMPLETION_TIMEOUT)) {
waitForTimeCompletion = TimeValue.parseTimeValue(
request.getParams().get(WAIT_FOR_COMPLETION_TIMEOUT),
WAIT_FOR_COMPLETION_TIMEOUT
);
} else {
// default to minus one indicate async execution
waitForTimeCompletion = TimeValue.MINUS_ONE;
}
if (workflowId == null) {
// This is a new workflow (POST)
// Throttle incoming requests
Expand Down Expand Up @@ -251,7 +262,8 @@
WorkflowRequest workflowRequest = new WorkflowRequest(
globalContextResponse.getId(),
null,
request.getParams()
request.getParams(),
waitForTimeCompletion
);
logger.info(
"Provisioning parameter is set, continuing to provision workflow {}",
Expand All @@ -261,7 +273,14 @@
ProvisionWorkflowAction.INSTANCE,
workflowRequest,
ActionListener.wrap(provisionResponse -> {
listener.onResponse(new WorkflowResponse(provisionResponse.getWorkflowId()));
listener.onResponse(
(workflowRequest.getWaitForCompletionTimeout() == TimeValue.MINUS_ONE)
? new WorkflowResponse(provisionResponse.getWorkflowId())
: new WorkflowResponse(
provisionResponse.getWorkflowId(),
provisionResponse.getWorkflowState()
)
);
}, exception -> {
String errorMessage = "Provisioning failed.";
logger.error(errorMessage, exception);
Expand Down Expand Up @@ -346,19 +365,26 @@
.build();

if (request.isReprovision()) {

// Reprovision request
ReprovisionWorkflowRequest reprovisionRequest = new ReprovisionWorkflowRequest(
getResponse.getId(),
existingTemplate,
template
template,
waitForTimeCompletion
);
logger.info("Reprovisioning parameter is set, continuing to reprovision workflow {}", getResponse.getId());
client.execute(
ReprovisionWorkflowAction.INSTANCE,
reprovisionRequest,
ActionListener.wrap(reprovisionResponse -> {
listener.onResponse(new WorkflowResponse(reprovisionResponse.getWorkflowId()));
listener.onResponse(
reprovisionRequest.getWaitForCompletionTimeout() == TimeValue.MINUS_ONE
? new WorkflowResponse(reprovisionResponse.getWorkflowId())
: new WorkflowResponse(
reprovisionResponse.getWorkflowId(),
reprovisionResponse.getWorkflowState()

Check warning on line 385 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L383-L385

Added lines #L383 - L385 were not covered by tests
)
);
}, exception -> {
String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage(
"Reprovisioning failed for workflow {}",
Expand Down
Loading
Loading