Skip to content

Commit

Permalink
Add task orchestration sample with polling and signal (#16)
Browse files Browse the repository at this point in the history
* Add polling task sample

* change threshold

* add scn
  • Loading branch information
longquanzheng authored Jan 2, 2024
1 parent 73f6a86 commit 286ba5b
Show file tree
Hide file tree
Showing 6 changed files with 206 additions and 0 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,9 @@ This [Subscription workflow](https://github.com/indeedeng/iwf-golang-samples/tre
* [Temporal Java Sample](https://github.com/temporalio/subscription-workflow-project-template-java)
* [Cadence Java example](https://cadenceworkflow.io/docs/concepts/workflows/#example)


### [Task orchestration with polling & signal](./workflows/polling)
Orchestrating three services:
* Task A: receive a signal when completing
* Task B: receive a signal when completing
* Task C: polling until completion
3 changes: 3 additions & 0 deletions cmd/server/iwf/iwf.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ func startWorkflowWorker() (closeFunc func()) {

router.GET("/moneytransfer/start", startMoneyTransferWorkflow)

router.GET("/polling/start", startPollingWorkflow)
router.GET("/polling/complete", signalPollingWorkflow)

wfServer := &http.Server{
Addr: ":" + iwf.DefaultWorkerPort,
Handler: router,
Expand Down
43 changes: 43 additions & 0 deletions cmd/server/iwf/polling_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package iwf

import (
"fmt"
"github.com/gin-gonic/gin"
"github.com/indeedeng/iwf-golang-samples/workflows/polling"
"net/http"
"strconv"
)

func startPollingWorkflow(c *gin.Context) {
wfId := c.Query("workflowId")
pollingCompletionThreshold := c.Query("pollingCompletionThreshold")

pollingCompletionThresholdInt, err := strconv.Atoi(pollingCompletionThreshold)
if err != nil {
c.JSON(http.StatusBadRequest, "must provide correct pollingCompletionThreshold via URL parameter")
return
}

_, err = client.StartWorkflow(c.Request.Context(), polling.PollingWorkflow{}, wfId, 0, pollingCompletionThresholdInt, nil)
if err != nil {
c.JSON(http.StatusInternalServerError, err.Error())
return
}

c.JSON(http.StatusOK, fmt.Sprintf("workflowId: %v is started", wfId))
return
}

func signalPollingWorkflow(c *gin.Context) {
wfId := c.Query("workflowId")
channel := c.Query("channel")

err := client.SignalWorkflow(c.Request.Context(), polling.PollingWorkflow{}, wfId, "", channel, nil)
if err != nil {
c.JSON(http.StatusInternalServerError, err.Error())
return
}

c.JSON(http.StatusOK, fmt.Sprintf("workflowId: %v is signal", wfId))
return
}
19 changes: 19 additions & 0 deletions workflows/polling/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
### How to run
* Start a iWF server following the [instructions](https://github.com/indeedeng/iwf#how-to-use)
* The easiest way is to run `docker run -p 8801:8801 -p 7233:7233 -p 8233:8233 -e AUTO_FIX_WORKER_URL=host.docker.internal --add-host host.docker.internal:host-gateway -it iworkflowio/iwf-server-lite:latest`
* Build and run this project `make bins && ./iwf-samples start`
* Start a workflow: `http://localhost:8803/polling/start?workflowId=test1&pollingCompletionThreshold=100`
* pollingCompletionThreshold means how many times the workflow will poll before complete the polling task C
* Signal the workflow to complete task A and B:
* complete task A: `http://localhost:8803/polling/complete?workflowId=test1&channel=taskACompleted`
* complete task B: `http://localhost:8803/polling/complete?workflowId=test1&channel=taskACompleted`
* alternatively you can signal the workflow in WebUI manually
* Watch in WebUI `http://localhost:8233/namespaces/default/workflows`
* Modify the pollingCompletionThreshold and see how the workflow complete task C automatically


### Screenshots
* The workflow should automatically continue As New after every 100 actions
<img width="773" alt="Screenshot 2024-01-01 at 10 06 11 PM" src="https://github.com/indeedeng/iwf-golang-samples/assets/4523955/bca7e02c-f24c-4288-9fc6-1cca74a7c1d3">
* You can use query handler to look at the current data like this
<img width="618" alt="Screenshot 2024-01-01 at 10 08 41 PM" src="https://github.com/indeedeng/iwf-golang-samples/assets/4523955/2909b494-5b05-404a-a047-31394eb4b43c">
133 changes: 133 additions & 0 deletions workflows/polling/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package polling

import (
"github.com/indeedeng/iwf-golang-samples/workflows/service"
"github.com/indeedeng/iwf-golang-sdk/gen/iwfidl"
"github.com/indeedeng/iwf-golang-sdk/iwf"
"time"
)

func NewPollingWorkflow(svc service.MyService) iwf.ObjectWorkflow {

return &PollingWorkflow{
svc: svc,
}
}

const (
dataAttrTaskACompleted = "taskACompleted"
dataAttrTaskBCompleted = "taskBCompleted"
dataAttrTaskCCompleted = "taskCCompleted"

dataAttrCurrPolls = "currPolls" // tracks how many polls have been done

SignalChannelTaskACompleted = "taskACompleted"
SignalChannelTaskBCompleted = "taskBCompleted"
)

type PollingWorkflow struct {
iwf.WorkflowDefaults

svc service.MyService
}

func (e PollingWorkflow) GetWorkflowStates() []iwf.StateDef {
return []iwf.StateDef{
iwf.StartingStateDef(&checkAndCompleteState{svc: e.svc}),
}
}

func (e PollingWorkflow) GetPersistenceSchema() []iwf.PersistenceFieldDef {
return []iwf.PersistenceFieldDef{
iwf.DataAttributeDef(dataAttrTaskACompleted),
iwf.DataAttributeDef(dataAttrTaskBCompleted),
iwf.DataAttributeDef(dataAttrTaskCCompleted),
iwf.DataAttributeDef(dataAttrCurrPolls),
}
}

func (e PollingWorkflow) GetCommunicationSchema() []iwf.CommunicationMethodDef {
return []iwf.CommunicationMethodDef{
iwf.SignalChannelDef(SignalChannelTaskACompleted),
iwf.SignalChannelDef(SignalChannelTaskBCompleted),
}
}

type checkAndCompleteState struct {
iwf.WorkflowStateDefaults
svc service.MyService
}

func (i checkAndCompleteState) WaitUntil(
ctx iwf.WorkflowContext, input iwf.Object, persistence iwf.Persistence, communication iwf.Communication,
) (*iwf.CommandRequest, error) {
var taskACompleted bool
persistence.GetDataAttribute(dataAttrTaskACompleted, &taskACompleted)
var taskBCompleted bool
persistence.GetDataAttribute(dataAttrTaskBCompleted, &taskBCompleted)
var taskCCompleted bool
persistence.GetDataAttribute(dataAttrTaskCCompleted, &taskCCompleted)

var commands []iwf.Command
if !taskACompleted {
commands = append(commands, iwf.NewSignalCommand("", SignalChannelTaskACompleted))
}
if !taskBCompleted {
commands = append(commands, iwf.NewSignalCommand("", SignalChannelTaskBCompleted))
}

if !taskCCompleted {
commands = append(commands, iwf.NewTimerCommand("", time.Now().Add(time.Second*2)))
}

return iwf.AnyCommandCompletedRequest(commands...), nil
}

func (i checkAndCompleteState) Execute(
ctx iwf.WorkflowContext, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence,
communication iwf.Communication,
) (*iwf.StateDecision, error) {
var taskACompleted bool
persistence.GetDataAttribute(dataAttrTaskACompleted, &taskACompleted)
var taskBCompleted bool
persistence.GetDataAttribute(dataAttrTaskBCompleted, &taskBCompleted)
var taskCCompleted bool
persistence.GetDataAttribute(dataAttrTaskCCompleted, &taskCCompleted)

var maxPollsRequired int
input.Get(&maxPollsRequired)

if !taskCCompleted {
i.svc.CallAPI1("calling API1 for polling service C")

var currPolls int
persistence.GetDataAttribute(dataAttrCurrPolls, &currPolls)
if currPolls >= maxPollsRequired {
taskCCompleted = true
persistence.SetDataAttribute(dataAttrTaskCCompleted, true)
}
persistence.SetDataAttribute(dataAttrCurrPolls, currPolls+1)
}

for _, signal := range commandResults.Signals {
switch signal.ChannelName {
case SignalChannelTaskACompleted:
if signal.Status == iwfidl.RECEIVED {
taskACompleted = true
persistence.SetDataAttribute(dataAttrTaskACompleted, true)
}
case SignalChannelTaskBCompleted:
if signal.Status == iwfidl.RECEIVED {
taskBCompleted = true
persistence.SetDataAttribute(dataAttrTaskBCompleted, true)
}
}
}

if taskACompleted && taskBCompleted && taskCCompleted {
return iwf.GracefulCompletingWorkflow, nil
}

// loop back to check
return iwf.SingleNextState(checkAndCompleteState{}, maxPollsRequired), nil
}
2 changes: 2 additions & 0 deletions workflows/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/indeedeng/iwf-golang-samples/workflows/engagement"
"github.com/indeedeng/iwf-golang-samples/workflows/microservices"
"github.com/indeedeng/iwf-golang-samples/workflows/moneytransfer"
"github.com/indeedeng/iwf-golang-samples/workflows/polling"
"github.com/indeedeng/iwf-golang-samples/workflows/service"
"github.com/indeedeng/iwf-golang-samples/workflows/subscription"
"github.com/indeedeng/iwf-golang-sdk/iwf"
Expand All @@ -20,6 +21,7 @@ func init() {
engagement.NewEngagementWorkflow(svc),
microservices.NewMicroserviceOrchestrationWorkflow(svc),
moneytransfer.NewMoneyTransferWorkflow(svc),
polling.NewPollingWorkflow(svc),
)
if err != nil {
panic(err)
Expand Down

0 comments on commit 286ba5b

Please sign in to comment.