From 992cee951de1901b3eba90656443123a615ed051 Mon Sep 17 00:00:00 2001 From: Quanzheng Long Date: Tue, 2 Jan 2024 11:02:51 -0800 Subject: [PATCH] Use separate state for polling (#17) * use separate state for polling * fix link --- workflows/polling/README.md | 2 +- workflows/polling/workflow.go | 122 ++++++++++++++++------------------ 2 files changed, 58 insertions(+), 66 deletions(-) diff --git a/workflows/polling/README.md b/workflows/polling/README.md index 299ed87..280020c 100644 --- a/workflows/polling/README.md +++ b/workflows/polling/README.md @@ -6,7 +6,7 @@ * pollingCompletionThreshold means how many times the workflow will poll before complete the polling task C * Signal the workflow to complete task A and B: * complete task A: `http://localhost:8803/polling/complete?workflowId=test1&channel=taskACompleted` - * complete task B: `http://localhost:8803/polling/complete?workflowId=test1&channel=taskACompleted` + * complete task B: `http://localhost:8803/polling/complete?workflowId=test1&channel=taskBCompleted` * alternatively you can signal the workflow in WebUI manually * Watch in WebUI `http://localhost:8233/namespaces/default/workflows` * Modify the pollingCompletionThreshold and see how the workflow complete task C automatically diff --git a/workflows/polling/workflow.go b/workflows/polling/workflow.go index 9898d37..2041129 100644 --- a/workflows/polling/workflow.go +++ b/workflows/polling/workflow.go @@ -2,7 +2,6 @@ package polling import ( "github.com/indeedeng/iwf-golang-samples/workflows/service" - "github.com/indeedeng/iwf-golang-sdk/gen/iwfidl" "github.com/indeedeng/iwf-golang-sdk/iwf" "time" ) @@ -15,14 +14,12 @@ func NewPollingWorkflow(svc service.MyService) iwf.ObjectWorkflow { } const ( - dataAttrTaskACompleted = "taskACompleted" - dataAttrTaskBCompleted = "taskBCompleted" - dataAttrTaskCCompleted = "taskCCompleted" - dataAttrCurrPolls = "currPolls" // tracks how many polls have been done SignalChannelTaskACompleted = "taskACompleted" SignalChannelTaskBCompleted = "taskBCompleted" + + InternalChannelTaskCCompleted = "taskCCompleted" ) type PollingWorkflow struct { @@ -33,15 +30,14 @@ type PollingWorkflow struct { func (e PollingWorkflow) GetWorkflowStates() []iwf.StateDef { return []iwf.StateDef{ - iwf.StartingStateDef(&checkAndCompleteState{svc: e.svc}), + iwf.StartingStateDef(&initState{}), + iwf.NonStartingStateDef(&pollState{svc: e.svc}), + iwf.NonStartingStateDef(&checkAndCompleteState{svc: e.svc}), } } func (e PollingWorkflow) GetPersistenceSchema() []iwf.PersistenceFieldDef { return []iwf.PersistenceFieldDef{ - iwf.DataAttributeDef(dataAttrTaskACompleted), - iwf.DataAttributeDef(dataAttrTaskBCompleted), - iwf.DataAttributeDef(dataAttrTaskCCompleted), iwf.DataAttributeDef(dataAttrCurrPolls), } } @@ -50,9 +46,27 @@ func (e PollingWorkflow) GetCommunicationSchema() []iwf.CommunicationMethodDef { return []iwf.CommunicationMethodDef{ iwf.SignalChannelDef(SignalChannelTaskACompleted), iwf.SignalChannelDef(SignalChannelTaskBCompleted), + iwf.InternalChannelDef(InternalChannelTaskCCompleted), } } +type initState struct { + iwf.WorkflowStateDefaultsNoWaitUntil +} + +func (i initState) Execute( + ctx iwf.WorkflowContext, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence, + communication iwf.Communication, +) (*iwf.StateDecision, error) { + var maxPollsRequired int + input.Get(&maxPollsRequired) + + return iwf.MultiNextStatesWithInput( + iwf.NewStateMovement(pollState{}, maxPollsRequired), + iwf.NewStateMovement(checkAndCompleteState{}, nil), + ), nil +} + type checkAndCompleteState struct { iwf.WorkflowStateDefaults svc service.MyService @@ -61,73 +75,51 @@ type checkAndCompleteState struct { func (i checkAndCompleteState) WaitUntil( ctx iwf.WorkflowContext, input iwf.Object, persistence iwf.Persistence, communication iwf.Communication, ) (*iwf.CommandRequest, error) { - var taskACompleted bool - persistence.GetDataAttribute(dataAttrTaskACompleted, &taskACompleted) - var taskBCompleted bool - persistence.GetDataAttribute(dataAttrTaskBCompleted, &taskBCompleted) - var taskCCompleted bool - persistence.GetDataAttribute(dataAttrTaskCCompleted, &taskCCompleted) - - var commands []iwf.Command - if !taskACompleted { - commands = append(commands, iwf.NewSignalCommand("", SignalChannelTaskACompleted)) - } - if !taskBCompleted { - commands = append(commands, iwf.NewSignalCommand("", SignalChannelTaskBCompleted)) - } - - if !taskCCompleted { - commands = append(commands, iwf.NewTimerCommand("", time.Now().Add(time.Second*2))) - } - - return iwf.AnyCommandCompletedRequest(commands...), nil + return iwf.AllCommandsCompletedRequest( + iwf.NewSignalCommand("", SignalChannelTaskACompleted), + iwf.NewSignalCommand("", SignalChannelTaskBCompleted), + iwf.NewInternalChannelCommand("", InternalChannelTaskCCompleted), + ), nil } func (i checkAndCompleteState) Execute( ctx iwf.WorkflowContext, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence, communication iwf.Communication, ) (*iwf.StateDecision, error) { - var taskACompleted bool - persistence.GetDataAttribute(dataAttrTaskACompleted, &taskACompleted) - var taskBCompleted bool - persistence.GetDataAttribute(dataAttrTaskBCompleted, &taskBCompleted) - var taskCCompleted bool - persistence.GetDataAttribute(dataAttrTaskCCompleted, &taskCCompleted) + return iwf.GracefulCompletingWorkflow, nil +} - var maxPollsRequired int - input.Get(&maxPollsRequired) +type pollState struct { + iwf.WorkflowStateDefaults + svc service.MyService +} - if !taskCCompleted { - i.svc.CallAPI1("calling API1 for polling service C") +func (i pollState) WaitUntil( + ctx iwf.WorkflowContext, input iwf.Object, persistence iwf.Persistence, communication iwf.Communication, +) (*iwf.CommandRequest, error) { - var currPolls int - persistence.GetDataAttribute(dataAttrCurrPolls, &currPolls) - if currPolls >= maxPollsRequired { - taskCCompleted = true - persistence.SetDataAttribute(dataAttrTaskCCompleted, true) - } - persistence.SetDataAttribute(dataAttrCurrPolls, currPolls+1) - } + return iwf.AnyCommandCompletedRequest( + iwf.NewTimerCommand("", time.Now().Add(time.Second*2)), + ), nil +} - for _, signal := range commandResults.Signals { - switch signal.ChannelName { - case SignalChannelTaskACompleted: - if signal.Status == iwfidl.RECEIVED { - taskACompleted = true - persistence.SetDataAttribute(dataAttrTaskACompleted, true) - } - case SignalChannelTaskBCompleted: - if signal.Status == iwfidl.RECEIVED { - taskBCompleted = true - persistence.SetDataAttribute(dataAttrTaskBCompleted, true) - } - } - } +func (i pollState) Execute( + ctx iwf.WorkflowContext, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence, + communication iwf.Communication, +) (*iwf.StateDecision, error) { + var maxPollsRequired int + input.Get(&maxPollsRequired) - if taskACompleted && taskBCompleted && taskCCompleted { - return iwf.GracefulCompletingWorkflow, nil + i.svc.CallAPI1("calling API1 for polling service C") + + var currPolls int + persistence.GetDataAttribute(dataAttrCurrPolls, &currPolls) + if currPolls >= maxPollsRequired { + communication.PublishInternalChannel(InternalChannelTaskCCompleted, nil) + return iwf.DeadEnd, nil } + persistence.SetDataAttribute(dataAttrCurrPolls, currPolls+1) // loop back to check - return iwf.SingleNextState(checkAndCompleteState{}, maxPollsRequired), nil -} + return iwf.SingleNextState(pollState{}, maxPollsRequired), nil +} \ No newline at end of file