Add synchronous execution option to workflow provisioning #990

Open
wants to merge 3 commits into
base: main

junweid62
Contributor

Description

This PR adds a wait_for_completion_timeout parameter to the Provision Workflow API in the OpenSearch Flow Framework. The parameter lets callers choose whether the API call waits, up to the given timeout, for workflow provisioning to complete before returning a response.

What’s Changed:

  1. Added support for the wait_for_completion_timeout parameter in the REST layer (RestProvisionWorkflowAction).
  • Accepts a time duration value (e.g., 30s, 1m).
  • If the workflow is provisioned within the specified timeout, the API returns the created resources (same response as GetWorkflowStatus).
  • If the timeout is reached before provisioning completes, the API returns the workflow state without waiting further.
  2. Updated the transport layer (ProvisionWorkflowTransportAction) to handle the timeout logic and ensure correct behavior during synchronous provisioning.
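
The return semantics described in the list above can be sketched in plain Java. This is only an illustration, not the PR's implementation: WaitForCompletionSketch, WorkflowState, and awaitOrSnapshot are hypothetical names, and a CompletableFuture stands in for the real provisioning work. The sketch blocks the calling thread for simplicity, which a real transport action would want to avoid.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class WaitForCompletionSketch {

    /** Hypothetical stand-in for the workflow state returned to the caller. */
    static final class WorkflowState {
        final String workflowId;
        final String state;

        WorkflowState(String workflowId, String state) {
            this.workflowId = workflowId;
            this.state = state;
        }
    }

    /**
     * Returns the final state if provisioning finishes within timeoutMillis,
     * otherwise an in-progress snapshot. Provisioning itself is never cancelled.
     */
    static WorkflowState awaitOrSnapshot(
        String workflowId,
        CompletableFuture<WorkflowState> provisioning,
        long timeoutMillis
    ) {
        WorkflowState snapshot = new WorkflowState(workflowId, "PROVISIONING");
        // copy() ensures the timeout completes only the copy, not the original future.
        return provisioning.copy()
            .completeOnTimeout(snapshot, timeoutMillis, TimeUnit.MILLISECONDS)
            .join();
    }

    public static void main(String[] args) {
        CompletableFuture<WorkflowState> finished =
            CompletableFuture.completedFuture(new WorkflowState("wf-1", "COMPLETED"));
        System.out.println(awaitOrSnapshot("wf-1", finished, 1_000).state);

        // This future never completes, so the call falls back to the snapshot.
        CompletableFuture<WorkflowState> stillRunning = new CompletableFuture<>();
        System.out.println(awaitOrSnapshot("wf-2", stillRunning, 50).state);
    }
}
```

In both cases the caller gets a workflow state back; only the state value (and the resources created so far) differs between the success and timeout paths.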

Success Response:

{
    "workflow_id": "K13IR5QBEpCfUu_-AQdU",
    "state": "COMPLETED",
    "resources_created": [
        {
            "workflow_step_name": "create_connector",
            "workflow_step_id": "create_connector_1",
            "resource_id": "LF3IR5QBEpCfUu_-Awd_",
            "resource_type": "connector_id"
        },
        {
            "workflow_step_id": "register_model_2",
            "workflow_step_name": "register_remote_model",
            "resource_id": "L13IR5QBEpCfUu_-BQdI",
            "resource_type": "model_id"
        },
        {
            "workflow_step_name": "deploy_model",
            "workflow_step_id": "deploy_model_3",
            "resource_id": "L13IR5QBEpCfUu_-BQdI",
            "resource_type": "model_id"
        }
    ]
}

Timeout Response:

{
    "workflow_id": "SmACR5QBdrR0lYdqgHa9",
    "state": "PROVISIONING",
    "resources_created": [
        {
            "workflow_step_name": "create_connector",
            "workflow_step_id": "create_connector_1",
            "resource_type": "connector_id",
            "resource_id": "S2ACR5QBdrR0lYdqgXYK"
        },
        {
            "workflow_step_name": "register_remote_model",
            "workflow_step_id": "register_model_2",
            "resource_type": "model_id",
            "resource_id": "TWACR5QBdrR0lYdqgXZ-"
        }
    ]
}

Areas of Concern:

I believe a few parts of the implementation can be improved, particularly in ProvisionWorkflowTransportAction. Some of the logic feels verbose and may not be the most efficient way to handle the timeout and synchronous execution. I’d appreciate feedback from reviewers.

Related Issues

Resolves #967

Check List

  • New functionality includes testing.
  • New functionality has been documented.
  • API changes companion pull request created.
  • Commits are signed per the DCO using --signoff.
  • Public documentation issue/PR created.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Junwei Dai added 2 commits January 7, 2025 15:00
Signed-off-by: Junwei Dai <[email protected]>
Member

dbwiddis left a comment

Generally looks good.

  • You need to handle a time value of -1; I recommend using it as the default ("async") rather than null.
  • You need to do stream version checks for the new (optional) workflow state in the response, and for the new timeout parameter in the workflow request (unless you want to just keep it in the params map).
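
A stream version check of the kind requested above can be sketched as follows. This is a simplified model, not OpenSearch's serialization API: java.io data streams stand in for the real stream classes, an int stands in for the node version, and VersionGatedRequestSketch, VERSION_WITH_TIMEOUT, and roundTrip are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class VersionGatedRequestSketch {
    /** Hypothetical first wire version that understands the timeout field. */
    static final int VERSION_WITH_TIMEOUT = 2;
    /** Default used when the peer is too old to send the field. */
    static final long NO_TIMEOUT = -1L;

    final String workflowId;
    final long timeoutMillis;

    VersionGatedRequestSketch(String workflowId, long timeoutMillis) {
        this.workflowId = workflowId;
        this.timeoutMillis = timeoutMillis;
    }

    /** Writes the new field only when the receiving node can read it. */
    void writeTo(DataOutputStream out, int peerVersion) throws IOException {
        out.writeUTF(workflowId);
        if (peerVersion >= VERSION_WITH_TIMEOUT) {
            out.writeLong(timeoutMillis);
        }
    }

    /** Reads the new field only when the sending node wrote it. */
    static VersionGatedRequestSketch readFrom(DataInputStream in, int peerVersion) throws IOException {
        String id = in.readUTF();
        long timeout = peerVersion >= VERSION_WITH_TIMEOUT ? in.readLong() : NO_TIMEOUT;
        return new VersionGatedRequestSketch(id, timeout);
    }

    /** Serializes and deserializes a request as if talking to a peer of the given version. */
    static VersionGatedRequestSketch roundTrip(VersionGatedRequestSketch request, int peerVersion) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                request.writeTo(out, peerVersion);
            }
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return readFrom(in, peerVersion);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        VersionGatedRequestSketch request = new VersionGatedRequestSketch("wf-1", 30_000L);
        System.out.println(roundTrip(request, 2).timeoutMillis); // new peer keeps the timeout
        System.out.println(roundTrip(request, 1).timeoutMillis); // old peer falls back to NO_TIMEOUT
    }
}
```

The point of the gate is that the writer and reader agree on the byte layout for every peer version, so a mixed-version cluster never reads a field that was not written.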

@@ -87,6 +89,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
boolean provision = request.paramAsBoolean(PROVISION_WORKFLOW, false);
boolean reprovision = request.paramAsBoolean(REPROVISION_WORKFLOW, false);
boolean updateFields = request.paramAsBoolean(UPDATE_WORKFLOW_FIELDS, false);
TimeValue waitForCompletionTimeout = request.paramAsTime(WAIT_FOR_COMPLETION_TIMEOUT, null);
Member

I generally like to avoid null whenever possible. It can crop up in unexpected places (toString() gets called somewhere, for example).

There is a special constant TimeValue.MINUS_ONE you can use to "disable" the feature. Then later rather than testing != null you can just test for this special value.

This is a common pattern in OpenSearch when using TimeValue settings or parameters, so it is behavior that existing users should expect. See for example cancel_after_time_interval.

In fact, since -1 is an allowed TimeValue, unless you explicitly check that the value is not negative, you'll allow that particular edge case (which will probably behave similarly to 0, returning immediately?).

No other negative values are allowed in a TimeValue, so you can reliably use a < 0 test for this.
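
As a rough illustration of the sentinel pattern described in this comment, the sketch below uses a plain long of milliseconds as a stand-in for TimeValue so that it runs without OpenSearch on the classpath. TimeoutParamSketch, parseTimeoutMillis, and isSynchronous are hypothetical names, and the parser is a deliberately simplified assumption that only understands the ms, s, and m units.

```java
final class TimeoutParamSketch {
    /** Sentinel meaning "do not wait"; plays the role TimeValue.MINUS_ONE would play. */
    static final long DISABLED_MILLIS = -1L;

    /**
     * Simplified stand-in for time-value parsing: an absent parameter or "-1"
     * maps to the sentinel, and any other negative value is rejected.
     */
    static long parseTimeoutMillis(String raw) {
        if (raw == null || raw.isBlank() || raw.equals("-1")) {
            return DISABLED_MILLIS;
        }
        long factor;
        String digits;
        if (raw.endsWith("ms")) {
            factor = 1L;
            digits = raw.substring(0, raw.length() - 2);
        } else if (raw.endsWith("s")) {
            factor = 1_000L;
            digits = raw.substring(0, raw.length() - 1);
        } else if (raw.endsWith("m")) {
            factor = 60_000L;
            digits = raw.substring(0, raw.length() - 1);
        } else {
            throw new IllegalArgumentException("missing or unsupported unit: " + raw);
        }
        long value = Long.parseLong(digits.trim());
        if (value < 0) {
            throw new IllegalArgumentException("negative timeout not allowed: " + raw);
        }
        return value * factor;
    }

    /** A single sign test replaces every null check downstream. */
    static boolean isSynchronous(long timeoutMillis) {
        return timeoutMillis >= 0;
    }

    public static void main(String[] args) {
        System.out.println(isSynchronous(parseTimeoutMillis(null)));  // absent parameter: async
        System.out.println(isSynchronous(parseTimeoutMillis("30s"))); // explicit timeout: sync
    }
}
```

With this shape the default and the explicit -1 take the same code path, and a value of 0 remains a legitimate "return immediately" request.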

Member

That said, I'm not sure we need to parse it here. Could we just pass it along in the params map and handle it in the transport action?

) {
threadPool.executor(PROVISION_WORKFLOW_THREAD_POOL).execute(() -> {
try {
Thread.sleep(timeout);
Member

dbwiddis commented Jan 9, 2025

There are probably better ways to handle timeouts than sleeping threads.

Async search uses a TimeoutRunnableListener<Response> to execute a block of code when the timeout expires. We should probably follow that pattern.
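
One way to avoid a sleeping thread is to schedule the timeout and let whichever event happens first, completion or timeout, send the response. The sketch below shows that general idea with standard java.util.concurrent classes only. It is not the TimeoutRunnableListener from async search; TimeoutListenerSketch and its callbacks are hypothetical names, and a real implementation would presumably schedule on the OpenSearch thread pool instead of a standalone executor.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

final class TimeoutListenerSketch<T> {
    // Guards the response so exactly one of the two callbacks ever runs.
    private final AtomicBoolean responded = new AtomicBoolean(false);
    private final Consumer<T> onComplete;
    private final ScheduledFuture<?> timeoutTask;

    TimeoutListenerSketch(
        ScheduledExecutorService scheduler,
        long timeoutMillis,
        Consumer<T> onComplete,
        Runnable onTimeout
    ) {
        this.onComplete = onComplete;
        // No thread sleeps: the scheduler fires this task once the timeout expires.
        this.timeoutTask = scheduler.schedule(() -> {
            if (responded.compareAndSet(false, true)) {
                onTimeout.run();
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Called when the work finishes; ignored if the timeout already responded. */
    void onResponse(T result) {
        if (responded.compareAndSet(false, true)) {
            timeoutTask.cancel(false);
            onComplete.accept(result);
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            TimeoutListenerSketch<String> listener = new TimeoutListenerSketch<>(
                scheduler,
                5_000,
                state -> System.out.println("finished with state " + state),
                () -> System.out.println("timed out, returning current state")
            );
            // Completion arrives before the timeout, so the timeout task is cancelled.
            listener.onResponse("COMPLETED");
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

The compare-and-set guard is what makes a late completion after the timeout harmless: the workflow keeps running, but the caller has already received its response.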

Labels
backport 2.x backport PRs to 2.x branch
Development

Successfully merging this pull request may close these issues.

[FEATURE] Add option to provision synchronously
2 participants