From 67cff3ab895bca849104d1170fe9625b13413523 Mon Sep 17 00:00:00 2001 From: mikeee Date: Wed, 31 Jan 2024 23:34:08 +0000 Subject: [PATCH] feat: initial go order-processor workflow quickstart Signed-off-by: mikeee --- workflows/go/sdk/order-processor/main.go | 140 +++++++++++++++ workflows/go/sdk/order-processor/models.go | 43 +++++ workflows/go/sdk/order-processor/workflow.go | 176 +++++++++++++++++++ 3 files changed, 359 insertions(+) create mode 100644 workflows/go/sdk/order-processor/main.go create mode 100644 workflows/go/sdk/order-processor/models.go create mode 100644 workflows/go/sdk/order-processor/workflow.go diff --git a/workflows/go/sdk/order-processor/main.go b/workflows/go/sdk/order-processor/main.go new file mode 100644 index 000000000..38689cc31 --- /dev/null +++ b/workflows/go/sdk/order-processor/main.go @@ -0,0 +1,140 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "log" + "time" + + "github.com/dapr/go-sdk/client" + "github.com/dapr/go-sdk/workflow" +) + +var ( + stateStoreName = "statestore" + workflowComponent = "dapr" + workflowName = "OrderProcessingWorkflow" + defaultItemName = "cars" +) + +func main() { + fmt.Println("*** Welcome to the Dapr Workflow console app sample!") + fmt.Println("*** Using this app, you can place orders that start workflows.") + + w, err := workflow.NewWorker() + if err != nil { + log.Fatalf("failed to start worker: %v", err) + } + + if err := w.RegisterWorkflow(OrderProcessingWorkflow); err != nil { + log.Fatal(err) + } + if err := w.RegisterActivity(NotifyActivity); err != nil { + log.Fatal(err) + } + if err := w.RegisterActivity(RequestApprovalActivity); err != nil { + log.Fatal(err) + } + if err := w.RegisterActivity(VerifyInventoryActivity); err != nil { + log.Fatal(err) + } + if err := w.RegisterActivity(ProcessPaymentActivity); err != nil { + log.Fatal(err) + } + if err := w.RegisterActivity(UpdateInventoryActivity); err != nil { + log.Fatal(err) + } + + if err := w.Start(); err != nil { + log.Fatal(err) + } + + daprClient, err := client.NewClient() + if err != nil { + log.Fatalf("failed to initialise dapr client: %v", err) + } + wfClient, err := workflow.NewClient(workflow.WithDaprClient(daprClient)) + if err != nil { + log.Fatalf("failed to initialise workflow client: %v", err) + } + + inventory := []InventoryItem{ + {ItemName: "paperclip", PerItemCost: 5, Quantity: 100}, + {ItemName: "cars", PerItemCost: 15000, Quantity: 100}, + {ItemName: "computers", PerItemCost: 500, Quantity: 100}, + } + if err := restockInventory(daprClient, inventory); err != nil { + log.Fatalf("failed to restock: %v", err) + } + + fmt.Println("==========Begin the purchase of item:==========") + + itemName := defaultItemName + orderQuantity := 10 + + totalCost := inventory[1].PerItemCost * orderQuantity + + orderPayload := OrderPayload{ + ItemName: itemName, + Quantity: orderQuantity, + TotalCost: totalCost, + } + + id, err := wfClient.ScheduleNewWorkflow(context.Background(), workflowName, workflow.WithInput(orderPayload)) + if err != nil { + log.Fatalf("failed to start workflow: %v", err) + } + + approvalSought := false + + startTime := time.Now() + + for { + timeDelta := time.Since(startTime) + metadata, err := wfClient.FetchWorkflowMetadata(context.Background(), id) + if err != nil { + log.Fatalf("failed to fetch workflow: %v", err) + } + if (metadata.RuntimeStatus == workflow.StatusCompleted) || (metadata.RuntimeStatus == workflow.StatusFailed) || (metadata.RuntimeStatus == workflow.StatusTerminated) { + fmt.Printf("Workflow completed - result: %v\n", metadata.RuntimeStatus.String()) + break + } + if timeDelta.Seconds() >= 1 { + metadata, err := wfClient.FetchWorkflowMetadata(context.Background(), id) + if err != nil { + log.Fatalf("failed to fetch workflow: %v", err) + } + if totalCost > 50000 && !approvalSought && ((metadata.RuntimeStatus != workflow.StatusCompleted) || (metadata.RuntimeStatus != workflow.StatusFailed) || (metadata.RuntimeStatus != workflow.StatusTerminated)) { + approvalSought = true + promptForApproval(id) + } + } + } + + fmt.Println("Purchase of item is complete") +} + +func promptForApproval(id string) { + wfClient, err := workflow.NewClient() + if err != nil { + log.Fatalf("failed to initialise wfClient: %v", err) + } + if err := wfClient.RaiseEvent(context.Background(), id, "manager_approval"); err != nil { + log.Fatal(err) + } +} + +func restockInventory(daprClient client.Client, inventory []InventoryItem) error { + for _, item := range inventory { + itemSerialized, err := json.Marshal(item) + if err != nil { + return err + } + fmt.Printf("adding base stock item: %s\n", item.ItemName) + if err := daprClient.SaveState(context.Background(), stateStoreName, item.ItemName, itemSerialized, nil); err != nil { + return err + } + } + return nil +} diff --git a/workflows/go/sdk/order-processor/models.go b/workflows/go/sdk/order-processor/models.go new file mode 100644 index 000000000..29d340932 --- /dev/null +++ b/workflows/go/sdk/order-processor/models.go @@ -0,0 +1,43 @@ +package main + +type OrderPayload struct { + ItemName string `json:"item_name"` + TotalCost int `json:"total_cost"` + Quantity int `json:"quanity"` +} + +type OrderResult struct { + Processed bool `json:"processed"` +} + +type InventoryItem struct { + ItemName string `json:"item_name"` + PerItemCost int `json:"per_item_cost"` + Quantity int `json:"quanity"` +} + +type InventoryRequest struct { + RequestID string `json:"request_id"` + ItemName string `json:"item_name"` + Quantity int `json:"quanity"` +} + +type InventoryResult struct { + Success bool `json:"success"` + InventoryItem InventoryItem `json:"inventory_item"` +} + +type PaymentRequest struct { + RequestID string `json:"request_id"` + ItemBeingPurchased string `json:"item_being_purchased"` + Amount int `json:"amount"` + Quantity int `json:"quantity"` +} + +type ApprovalRequired struct { + Approval bool `json:"approval"` +} + +type Notification struct { + Message string `json:"message"` +} diff --git a/workflows/go/sdk/order-processor/workflow.go b/workflows/go/sdk/order-processor/workflow.go new file mode 100644 index 000000000..4f26f640d --- /dev/null +++ b/workflows/go/sdk/order-processor/workflow.go @@ -0,0 +1,176 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "log" + "time" + + "github.com/dapr/go-sdk/client" + "github.com/dapr/go-sdk/workflow" +) + +func OrderProcessingWorkflow(ctx *workflow.WorkflowContext) (any, error) { + orderID := ctx.InstanceID() + var orderPayload OrderPayload + if err := ctx.GetInput(&orderPayload); err != nil { + return nil, err + } + if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Received order %s for %d %s - $%d", orderID, orderPayload.Quantity, orderPayload.ItemName, orderPayload.TotalCost)})).Await(nil); err != nil { + return OrderResult{Processed: false}, err + } + + var verifyInventoryResult InventoryResult + if err := ctx.CallActivity(VerifyInventoryActivity, workflow.ActivityInput(InventoryRequest{ + RequestID: orderID, + ItemName: orderPayload.ItemName, + Quantity: orderPayload.Quantity, + })).Await(&verifyInventoryResult); err != nil { + return OrderResult{Processed: false}, err + } + + if !verifyInventoryResult.Success { + if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Insufficient inventory for %s", orderPayload.ItemName)})).Await(nil); err != nil { + return OrderResult{Processed: false}, err + } + } + + if orderPayload.TotalCost > 50000 { + var approvalRequired ApprovalRequired + if err := ctx.CallActivity(RequestApprovalActivity, workflow.ActivityInput(orderPayload)).Await(&approvalRequired); err != nil { + return OrderResult{Processed: false}, err + } + if err := ctx.WaitForExternalEvent("manager_approval", time.Second*200).Await(nil); err != nil { + return OrderResult{Processed: false}, err + } + // TODO: Confirm timeout flow - this will be in the form of an error. + if approvalRequired.Approval { + if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Payment for order %s has been approved!", orderID)})).Await(nil); err != nil { + log.Printf("failed to notify of a successful order: %v\n", err) + } + } else { + if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Payment for order %s has been rejected!", orderID)})).Await(nil); err != nil { + log.Printf("failed to notify of an unsuccessful order :%v\n", err) + } + return OrderResult{Processed: false}, nil + } + } + if err := ctx.CallActivity(ProcessPaymentActivity, workflow.ActivityInput(PaymentRequest{ + RequestID: orderID, + ItemBeingPurchased: orderPayload.ItemName, + Amount: orderPayload.TotalCost, + Quantity: orderPayload.Quantity, + })).Await(nil); err != nil { + if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Order %s failed!", orderID)})).Await(nil); err != nil { + log.Printf("failed to notify of a failed order: %v", err) + } + return OrderResult{Processed: false}, err + } + if err := ctx.CallActivity(UpdateInventoryActivity, workflow.ActivityInput(PaymentRequest{RequestID: orderID, ItemBeingPurchased: orderPayload.ItemName, Amount: orderPayload.TotalCost, Quantity: orderPayload.Quantity})).Await(nil); err != nil { + if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Order %s failed!", orderID)})).Await(nil); err != nil { + log.Printf("failed to notify of a failed order: %v", err) + } + return OrderResult{Processed: false}, err + } + + if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Order %s has completed!", orderID)})).Await(nil); err != nil { + log.Printf("failed to notify of a successful order: %v", err) + } + return OrderResult{Processed: true}, nil +} + +// NotifyActivity outputs a notification message +func NotifyActivity(ctx workflow.ActivityContext) (any, error) { + var input Notification + if err := ctx.GetInput(&input); err != nil { + return "", err + } + fmt.Printf("NotifyActivity: %s\n", input.Message) + return nil, nil +} + +// ProcessPaymentActivity is used to process a payment +func ProcessPaymentActivity(ctx workflow.ActivityContext) (any, error) { + var input PaymentRequest + if err := ctx.GetInput(&input); err != nil { + return "", err + } + fmt.Printf("ProcessPaymentActivity: %s for %d - %s (%dUSD)\n", input.RequestID, input.Quantity, input.ItemBeingPurchased, input.Amount) + return nil, nil +} + +// VerifyInventoryActivity is used to verify if an item is availabe in the inventory +// g +func VerifyInventoryActivity(ctx workflow.ActivityContext) (any, error) { + var input InventoryRequest + if err := ctx.GetInput(&input); err != nil { + return nil, err + } + fmt.Printf("VerifyInventoryActivity: Verifying inventory for order %s of %d %s\n", input.RequestID, input.Quantity, input.ItemName) + dClient, err := client.NewClient() + if err != nil { + return nil, err + } + item, err := dClient.GetState(context.Background(), stateStoreName, input.ItemName, nil) + if err != nil { + return nil, err + } + if item == nil { + return InventoryResult{ + Success: false, + InventoryItem: InventoryItem{}, + }, nil + } + var result InventoryItem + if err := json.Unmarshal(item.Value, &result); err != nil { + log.Fatalf("failed to parse inventory result %v", err) + } + fmt.Printf("VerifyInventoryActivity: There are %d of %s available for purchase\n", result.Quantity, result.ItemName) + if result.Quantity >= input.Quantity { + return InventoryResult{Success: true, InventoryItem: result}, nil + } + return InventoryResult{Success: false, InventoryItem: InventoryItem{}}, nil +} + +func UpdateInventoryActivity(ctx workflow.ActivityContext) (any, error) { + var input PaymentRequest + if err := ctx.GetInput(&input); err != nil { + return nil, err + } + fmt.Printf("UpdateInventoryActivity: Checking Inventory for order %s for %d * %s\n", input.RequestID, input.Quantity, input.ItemBeingPurchased) + dClient, err := client.NewClient() + if err != nil { + return nil, err + } + item, err := dClient.GetState(context.Background(), stateStoreName, input.ItemBeingPurchased, nil) + if err != nil { + return nil, err + } + var result InventoryItem + err = json.Unmarshal(item.Value, &result) + if err != nil { + return nil, err + } + newQuantity := result.Quantity - input.Quantity + if newQuantity < 0 { + return nil, fmt.Errorf("insufficient inventory for: %s", input.ItemBeingPurchased) + } + result.Quantity = newQuantity + newState, err := json.Marshal(result) + if err != nil { + log.Fatalf("failed to marshal new state: %v", err) + } + dClient.SaveState(context.Background(), stateStoreName, input.ItemBeingPurchased, newState, nil) + fmt.Printf("UpdateInventoryActivity: There are now %d %s left in stock\n", result.Quantity, result.ItemName) + return InventoryResult{Success: true, InventoryItem: result}, nil +} + +func RequestApprovalActivity(ctx workflow.ActivityContext) (any, error) { + var input OrderPayload + if err := ctx.GetInput(&input); err != nil { + return nil, err + } + fmt.Printf("RequestApprovalActivity: Requesting approval for payment of %dUSD for %d %s\n", input.TotalCost, input.Quantity, input.ItemName) + return ApprovalRequired{Approval: true}, nil +}