Skip to content

Commit

Permalink
Use separate state for polling (#17)
Browse files Browse the repository at this point in the history
* use separate state for polling

* fix link
  • Loading branch information
longquanzheng authored Jan 2, 2024
1 parent 286ba5b commit 992cee9
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 66 deletions.
2 changes: 1 addition & 1 deletion workflows/polling/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* pollingCompletionThreshold means how many times the workflow will poll before complete the polling task C
* Signal the workflow to complete task A and B:
* complete task A: `http://localhost:8803/polling/complete?workflowId=test1&channel=taskACompleted`
* complete task B: `http://localhost:8803/polling/complete?workflowId=test1&channel=taskACompleted`
* complete task B: `http://localhost:8803/polling/complete?workflowId=test1&channel=taskBCompleted`
* alternatively you can signal the workflow in WebUI manually
* Watch in WebUI `http://localhost:8233/namespaces/default/workflows`
* Modify the pollingCompletionThreshold and see how the workflow complete task C automatically
Expand Down
122 changes: 57 additions & 65 deletions workflows/polling/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package polling

import (
"github.com/indeedeng/iwf-golang-samples/workflows/service"
"github.com/indeedeng/iwf-golang-sdk/gen/iwfidl"
"github.com/indeedeng/iwf-golang-sdk/iwf"
"time"
)
Expand All @@ -15,14 +14,12 @@ func NewPollingWorkflow(svc service.MyService) iwf.ObjectWorkflow {
}

const (
dataAttrTaskACompleted = "taskACompleted"
dataAttrTaskBCompleted = "taskBCompleted"
dataAttrTaskCCompleted = "taskCCompleted"

dataAttrCurrPolls = "currPolls" // tracks how many polls have been done

SignalChannelTaskACompleted = "taskACompleted"
SignalChannelTaskBCompleted = "taskBCompleted"

InternalChannelTaskCCompleted = "taskCCompleted"
)

type PollingWorkflow struct {
Expand All @@ -33,15 +30,14 @@ type PollingWorkflow struct {

func (e PollingWorkflow) GetWorkflowStates() []iwf.StateDef {
return []iwf.StateDef{
iwf.StartingStateDef(&checkAndCompleteState{svc: e.svc}),
iwf.StartingStateDef(&initState{}),
iwf.NonStartingStateDef(&pollState{svc: e.svc}),
iwf.NonStartingStateDef(&checkAndCompleteState{svc: e.svc}),
}
}

func (e PollingWorkflow) GetPersistenceSchema() []iwf.PersistenceFieldDef {
return []iwf.PersistenceFieldDef{
iwf.DataAttributeDef(dataAttrTaskACompleted),
iwf.DataAttributeDef(dataAttrTaskBCompleted),
iwf.DataAttributeDef(dataAttrTaskCCompleted),
iwf.DataAttributeDef(dataAttrCurrPolls),
}
}
Expand All @@ -50,9 +46,27 @@ func (e PollingWorkflow) GetCommunicationSchema() []iwf.CommunicationMethodDef {
return []iwf.CommunicationMethodDef{
iwf.SignalChannelDef(SignalChannelTaskACompleted),
iwf.SignalChannelDef(SignalChannelTaskBCompleted),
iwf.InternalChannelDef(InternalChannelTaskCCompleted),
}
}

type initState struct {
iwf.WorkflowStateDefaultsNoWaitUntil
}

func (i initState) Execute(
ctx iwf.WorkflowContext, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence,
communication iwf.Communication,
) (*iwf.StateDecision, error) {
var maxPollsRequired int
input.Get(&maxPollsRequired)

return iwf.MultiNextStatesWithInput(
iwf.NewStateMovement(pollState{}, maxPollsRequired),
iwf.NewStateMovement(checkAndCompleteState{}, nil),
), nil
}

type checkAndCompleteState struct {
iwf.WorkflowStateDefaults
svc service.MyService
Expand All @@ -61,73 +75,51 @@ type checkAndCompleteState struct {
func (i checkAndCompleteState) WaitUntil(
ctx iwf.WorkflowContext, input iwf.Object, persistence iwf.Persistence, communication iwf.Communication,
) (*iwf.CommandRequest, error) {
var taskACompleted bool
persistence.GetDataAttribute(dataAttrTaskACompleted, &taskACompleted)
var taskBCompleted bool
persistence.GetDataAttribute(dataAttrTaskBCompleted, &taskBCompleted)
var taskCCompleted bool
persistence.GetDataAttribute(dataAttrTaskCCompleted, &taskCCompleted)

var commands []iwf.Command
if !taskACompleted {
commands = append(commands, iwf.NewSignalCommand("", SignalChannelTaskACompleted))
}
if !taskBCompleted {
commands = append(commands, iwf.NewSignalCommand("", SignalChannelTaskBCompleted))
}

if !taskCCompleted {
commands = append(commands, iwf.NewTimerCommand("", time.Now().Add(time.Second*2)))
}

return iwf.AnyCommandCompletedRequest(commands...), nil
return iwf.AllCommandsCompletedRequest(
iwf.NewSignalCommand("", SignalChannelTaskACompleted),
iwf.NewSignalCommand("", SignalChannelTaskBCompleted),
iwf.NewInternalChannelCommand("", InternalChannelTaskCCompleted),
), nil
}

func (i checkAndCompleteState) Execute(
ctx iwf.WorkflowContext, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence,
communication iwf.Communication,
) (*iwf.StateDecision, error) {
var taskACompleted bool
persistence.GetDataAttribute(dataAttrTaskACompleted, &taskACompleted)
var taskBCompleted bool
persistence.GetDataAttribute(dataAttrTaskBCompleted, &taskBCompleted)
var taskCCompleted bool
persistence.GetDataAttribute(dataAttrTaskCCompleted, &taskCCompleted)
return iwf.GracefulCompletingWorkflow, nil
}

var maxPollsRequired int
input.Get(&maxPollsRequired)
type pollState struct {
iwf.WorkflowStateDefaults
svc service.MyService
}

if !taskCCompleted {
i.svc.CallAPI1("calling API1 for polling service C")
func (i pollState) WaitUntil(
ctx iwf.WorkflowContext, input iwf.Object, persistence iwf.Persistence, communication iwf.Communication,
) (*iwf.CommandRequest, error) {

var currPolls int
persistence.GetDataAttribute(dataAttrCurrPolls, &currPolls)
if currPolls >= maxPollsRequired {
taskCCompleted = true
persistence.SetDataAttribute(dataAttrTaskCCompleted, true)
}
persistence.SetDataAttribute(dataAttrCurrPolls, currPolls+1)
}
return iwf.AnyCommandCompletedRequest(
iwf.NewTimerCommand("", time.Now().Add(time.Second*2)),
), nil
}

for _, signal := range commandResults.Signals {
switch signal.ChannelName {
case SignalChannelTaskACompleted:
if signal.Status == iwfidl.RECEIVED {
taskACompleted = true
persistence.SetDataAttribute(dataAttrTaskACompleted, true)
}
case SignalChannelTaskBCompleted:
if signal.Status == iwfidl.RECEIVED {
taskBCompleted = true
persistence.SetDataAttribute(dataAttrTaskBCompleted, true)
}
}
}
func (i pollState) Execute(
ctx iwf.WorkflowContext, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence,
communication iwf.Communication,
) (*iwf.StateDecision, error) {
var maxPollsRequired int
input.Get(&maxPollsRequired)

if taskACompleted && taskBCompleted && taskCCompleted {
return iwf.GracefulCompletingWorkflow, nil
i.svc.CallAPI1("calling API1 for polling service C")

var currPolls int
persistence.GetDataAttribute(dataAttrCurrPolls, &currPolls)
if currPolls >= maxPollsRequired {
communication.PublishInternalChannel(InternalChannelTaskCCompleted, nil)
return iwf.DeadEnd, nil
}

persistence.SetDataAttribute(dataAttrCurrPolls, currPolls+1)
// loop back to check
return iwf.SingleNextState(checkAndCompleteState{}, maxPollsRequired), nil
}
return iwf.SingleNextState(pollState{}, maxPollsRequired), nil
}

0 comments on commit 992cee9

Please sign in to comment.