-
Notifications
You must be signed in to change notification settings - Fork 530
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: initial go order-processor workflow quickstart
Signed-off-by: mikeee <[email protected]>
- Loading branch information
Showing
3 changed files
with
359 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"fmt" | ||
"log" | ||
"time" | ||
|
||
"github.com/dapr/go-sdk/client" | ||
"github.com/dapr/go-sdk/workflow" | ||
) | ||
|
||
var ( | ||
stateStoreName = "statestore" | ||
workflowComponent = "dapr" | ||
workflowName = "OrderProcessingWorkflow" | ||
defaultItemName = "cars" | ||
) | ||
|
||
func main() { | ||
fmt.Println("*** Welcome to the Dapr Workflow console app sample!") | ||
fmt.Println("*** Using this app, you can place orders that start workflows.") | ||
|
||
w, err := workflow.NewWorker() | ||
if err != nil { | ||
log.Fatalf("failed to start worker: %v", err) | ||
} | ||
|
||
if err := w.RegisterWorkflow(OrderProcessingWorkflow); err != nil { | ||
log.Fatal(err) | ||
} | ||
if err := w.RegisterActivity(NotifyActivity); err != nil { | ||
log.Fatal(err) | ||
} | ||
if err := w.RegisterActivity(RequestApprovalActivity); err != nil { | ||
log.Fatal(err) | ||
} | ||
if err := w.RegisterActivity(VerifyInventoryActivity); err != nil { | ||
log.Fatal(err) | ||
} | ||
if err := w.RegisterActivity(ProcessPaymentActivity); err != nil { | ||
log.Fatal(err) | ||
} | ||
if err := w.RegisterActivity(UpdateInventoryActivity); err != nil { | ||
log.Fatal(err) | ||
} | ||
|
||
if err := w.Start(); err != nil { | ||
log.Fatal(err) | ||
} | ||
|
||
daprClient, err := client.NewClient() | ||
if err != nil { | ||
log.Fatalf("failed to initialise dapr client: %v", err) | ||
} | ||
wfClient, err := workflow.NewClient(workflow.WithDaprClient(daprClient)) | ||
if err != nil { | ||
log.Fatalf("failed to initialise workflow client: %v", err) | ||
} | ||
|
||
inventory := []InventoryItem{ | ||
{ItemName: "paperclip", PerItemCost: 5, Quantity: 100}, | ||
{ItemName: "cars", PerItemCost: 15000, Quantity: 100}, | ||
{ItemName: "computers", PerItemCost: 500, Quantity: 100}, | ||
} | ||
if err := restockInventory(daprClient, inventory); err != nil { | ||
log.Fatalf("failed to restock: %v", err) | ||
} | ||
|
||
fmt.Println("==========Begin the purchase of item:==========") | ||
|
||
itemName := defaultItemName | ||
orderQuantity := 10 | ||
|
||
totalCost := inventory[1].PerItemCost * orderQuantity | ||
|
||
orderPayload := OrderPayload{ | ||
ItemName: itemName, | ||
Quantity: orderQuantity, | ||
TotalCost: totalCost, | ||
} | ||
|
||
id, err := wfClient.ScheduleNewWorkflow(context.Background(), workflowName, workflow.WithInput(orderPayload)) | ||
if err != nil { | ||
log.Fatalf("failed to start workflow: %v", err) | ||
} | ||
|
||
approvalSought := false | ||
|
||
startTime := time.Now() | ||
|
||
for { | ||
timeDelta := time.Since(startTime) | ||
metadata, err := wfClient.FetchWorkflowMetadata(context.Background(), id) | ||
if err != nil { | ||
log.Fatalf("failed to fetch workflow: %v", err) | ||
} | ||
if (metadata.RuntimeStatus == workflow.StatusCompleted) || (metadata.RuntimeStatus == workflow.StatusFailed) || (metadata.RuntimeStatus == workflow.StatusTerminated) { | ||
fmt.Printf("Workflow completed - result: %v\n", metadata.RuntimeStatus.String()) | ||
break | ||
} | ||
if timeDelta.Seconds() >= 1 { | ||
metadata, err := wfClient.FetchWorkflowMetadata(context.Background(), id) | ||
if err != nil { | ||
log.Fatalf("failed to fetch workflow: %v", err) | ||
} | ||
if totalCost > 50000 && !approvalSought && ((metadata.RuntimeStatus != workflow.StatusCompleted) || (metadata.RuntimeStatus != workflow.StatusFailed) || (metadata.RuntimeStatus != workflow.StatusTerminated)) { | ||
approvalSought = true | ||
promptForApproval(id) | ||
} | ||
} | ||
} | ||
|
||
fmt.Println("Purchase of item is complete") | ||
} | ||
|
||
func promptForApproval(id string) { | ||
wfClient, err := workflow.NewClient() | ||
if err != nil { | ||
log.Fatalf("failed to initialise wfClient: %v", err) | ||
} | ||
if err := wfClient.RaiseEvent(context.Background(), id, "manager_approval"); err != nil { | ||
log.Fatal(err) | ||
} | ||
} | ||
|
||
func restockInventory(daprClient client.Client, inventory []InventoryItem) error { | ||
for _, item := range inventory { | ||
itemSerialized, err := json.Marshal(item) | ||
if err != nil { | ||
return err | ||
} | ||
fmt.Printf("adding base stock item: %s\n", item.ItemName) | ||
if err := daprClient.SaveState(context.Background(), stateStoreName, item.ItemName, itemSerialized, nil); err != nil { | ||
return err | ||
} | ||
} | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
package main | ||
|
||
type OrderPayload struct { | ||
ItemName string `json:"item_name"` | ||
TotalCost int `json:"total_cost"` | ||
Quantity int `json:"quanity"` | ||
} | ||
|
||
type OrderResult struct { | ||
Processed bool `json:"processed"` | ||
} | ||
|
||
type InventoryItem struct { | ||
ItemName string `json:"item_name"` | ||
PerItemCost int `json:"per_item_cost"` | ||
Quantity int `json:"quanity"` | ||
} | ||
|
||
type InventoryRequest struct { | ||
RequestID string `json:"request_id"` | ||
ItemName string `json:"item_name"` | ||
Quantity int `json:"quanity"` | ||
} | ||
|
||
type InventoryResult struct { | ||
Success bool `json:"success"` | ||
InventoryItem InventoryItem `json:"inventory_item"` | ||
} | ||
|
||
type PaymentRequest struct { | ||
RequestID string `json:"request_id"` | ||
ItemBeingPurchased string `json:"item_being_purchased"` | ||
Amount int `json:"amount"` | ||
Quantity int `json:"quantity"` | ||
} | ||
|
||
type ApprovalRequired struct { | ||
Approval bool `json:"approval"` | ||
} | ||
|
||
type Notification struct { | ||
Message string `json:"message"` | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,176 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"fmt" | ||
"log" | ||
"time" | ||
|
||
"github.com/dapr/go-sdk/client" | ||
"github.com/dapr/go-sdk/workflow" | ||
) | ||
|
||
func OrderProcessingWorkflow(ctx *workflow.WorkflowContext) (any, error) { | ||
orderID := ctx.InstanceID() | ||
var orderPayload OrderPayload | ||
if err := ctx.GetInput(&orderPayload); err != nil { | ||
return nil, err | ||
} | ||
if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Received order %s for %d %s - $%d", orderID, orderPayload.Quantity, orderPayload.ItemName, orderPayload.TotalCost)})).Await(nil); err != nil { | ||
return OrderResult{Processed: false}, err | ||
} | ||
|
||
var verifyInventoryResult InventoryResult | ||
if err := ctx.CallActivity(VerifyInventoryActivity, workflow.ActivityInput(InventoryRequest{ | ||
RequestID: orderID, | ||
ItemName: orderPayload.ItemName, | ||
Quantity: orderPayload.Quantity, | ||
})).Await(&verifyInventoryResult); err != nil { | ||
return OrderResult{Processed: false}, err | ||
} | ||
|
||
if !verifyInventoryResult.Success { | ||
if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Insufficient inventory for %s", orderPayload.ItemName)})).Await(nil); err != nil { | ||
return OrderResult{Processed: false}, err | ||
} | ||
} | ||
|
||
if orderPayload.TotalCost > 50000 { | ||
var approvalRequired ApprovalRequired | ||
if err := ctx.CallActivity(RequestApprovalActivity, workflow.ActivityInput(orderPayload)).Await(&approvalRequired); err != nil { | ||
return OrderResult{Processed: false}, err | ||
} | ||
if err := ctx.WaitForExternalEvent("manager_approval", time.Second*200).Await(nil); err != nil { | ||
return OrderResult{Processed: false}, err | ||
} | ||
// TODO: Confirm timeout flow - this will be in the form of an error. | ||
if approvalRequired.Approval { | ||
if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Payment for order %s has been approved!", orderID)})).Await(nil); err != nil { | ||
log.Printf("failed to notify of a successful order: %v\n", err) | ||
} | ||
} else { | ||
if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Payment for order %s has been rejected!", orderID)})).Await(nil); err != nil { | ||
log.Printf("failed to notify of an unsuccessful order :%v\n", err) | ||
} | ||
return OrderResult{Processed: false}, nil | ||
} | ||
} | ||
if err := ctx.CallActivity(ProcessPaymentActivity, workflow.ActivityInput(PaymentRequest{ | ||
RequestID: orderID, | ||
ItemBeingPurchased: orderPayload.ItemName, | ||
Amount: orderPayload.TotalCost, | ||
Quantity: orderPayload.Quantity, | ||
})).Await(nil); err != nil { | ||
if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Order %s failed!", orderID)})).Await(nil); err != nil { | ||
log.Printf("failed to notify of a failed order: %v", err) | ||
} | ||
return OrderResult{Processed: false}, err | ||
} | ||
if err := ctx.CallActivity(UpdateInventoryActivity, workflow.ActivityInput(PaymentRequest{RequestID: orderID, ItemBeingPurchased: orderPayload.ItemName, Amount: orderPayload.TotalCost, Quantity: orderPayload.Quantity})).Await(nil); err != nil { | ||
if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Order %s failed!", orderID)})).Await(nil); err != nil { | ||
log.Printf("failed to notify of a failed order: %v", err) | ||
} | ||
return OrderResult{Processed: false}, err | ||
} | ||
|
||
if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Order %s has completed!", orderID)})).Await(nil); err != nil { | ||
log.Printf("failed to notify of a successful order: %v", err) | ||
} | ||
return OrderResult{Processed: true}, nil | ||
} | ||
|
||
// NotifyActivity outputs a notification message | ||
func NotifyActivity(ctx workflow.ActivityContext) (any, error) { | ||
var input Notification | ||
if err := ctx.GetInput(&input); err != nil { | ||
return "", err | ||
} | ||
fmt.Printf("NotifyActivity: %s\n", input.Message) | ||
return nil, nil | ||
} | ||
|
||
// ProcessPaymentActivity is used to process a payment | ||
func ProcessPaymentActivity(ctx workflow.ActivityContext) (any, error) { | ||
var input PaymentRequest | ||
if err := ctx.GetInput(&input); err != nil { | ||
return "", err | ||
} | ||
fmt.Printf("ProcessPaymentActivity: %s for %d - %s (%dUSD)\n", input.RequestID, input.Quantity, input.ItemBeingPurchased, input.Amount) | ||
return nil, nil | ||
} | ||
|
||
// VerifyInventoryActivity is used to verify if an item is availabe in the inventory | ||
// g | ||
func VerifyInventoryActivity(ctx workflow.ActivityContext) (any, error) { | ||
var input InventoryRequest | ||
if err := ctx.GetInput(&input); err != nil { | ||
return nil, err | ||
} | ||
fmt.Printf("VerifyInventoryActivity: Verifying inventory for order %s of %d %s\n", input.RequestID, input.Quantity, input.ItemName) | ||
dClient, err := client.NewClient() | ||
if err != nil { | ||
return nil, err | ||
} | ||
item, err := dClient.GetState(context.Background(), stateStoreName, input.ItemName, nil) | ||
if err != nil { | ||
return nil, err | ||
} | ||
if item == nil { | ||
return InventoryResult{ | ||
Success: false, | ||
InventoryItem: InventoryItem{}, | ||
}, nil | ||
} | ||
var result InventoryItem | ||
if err := json.Unmarshal(item.Value, &result); err != nil { | ||
log.Fatalf("failed to parse inventory result %v", err) | ||
} | ||
fmt.Printf("VerifyInventoryActivity: There are %d of %s available for purchase\n", result.Quantity, result.ItemName) | ||
if result.Quantity >= input.Quantity { | ||
return InventoryResult{Success: true, InventoryItem: result}, nil | ||
} | ||
return InventoryResult{Success: false, InventoryItem: InventoryItem{}}, nil | ||
} | ||
|
||
func UpdateInventoryActivity(ctx workflow.ActivityContext) (any, error) { | ||
var input PaymentRequest | ||
if err := ctx.GetInput(&input); err != nil { | ||
return nil, err | ||
} | ||
fmt.Printf("UpdateInventoryActivity: Checking Inventory for order %s for %d * %s\n", input.RequestID, input.Quantity, input.ItemBeingPurchased) | ||
dClient, err := client.NewClient() | ||
if err != nil { | ||
return nil, err | ||
} | ||
item, err := dClient.GetState(context.Background(), stateStoreName, input.ItemBeingPurchased, nil) | ||
if err != nil { | ||
return nil, err | ||
} | ||
var result InventoryItem | ||
err = json.Unmarshal(item.Value, &result) | ||
if err != nil { | ||
return nil, err | ||
} | ||
newQuantity := result.Quantity - input.Quantity | ||
if newQuantity < 0 { | ||
return nil, fmt.Errorf("insufficient inventory for: %s", input.ItemBeingPurchased) | ||
} | ||
result.Quantity = newQuantity | ||
newState, err := json.Marshal(result) | ||
if err != nil { | ||
log.Fatalf("failed to marshal new state: %v", err) | ||
} | ||
dClient.SaveState(context.Background(), stateStoreName, input.ItemBeingPurchased, newState, nil) | ||
fmt.Printf("UpdateInventoryActivity: There are now %d %s left in stock\n", result.Quantity, result.ItemName) | ||
return InventoryResult{Success: true, InventoryItem: result}, nil | ||
} | ||
|
||
func RequestApprovalActivity(ctx workflow.ActivityContext) (any, error) { | ||
var input OrderPayload | ||
if err := ctx.GetInput(&input); err != nil { | ||
return nil, err | ||
} | ||
fmt.Printf("RequestApprovalActivity: Requesting approval for payment of %dUSD for %d %s\n", input.TotalCost, input.Quantity, input.ItemName) | ||
return ApprovalRequired{Approval: true}, nil | ||
} |