Skip to content

Commit

Permalink
Return error before launching workflow if connector not found in forw…
Browse files Browse the repository at this point in the history
…ard bank account req
  • Loading branch information
laouji committed Feb 10, 2025
1 parent e6cc95c commit eed2c80
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 23 deletions.
42 changes: 23 additions & 19 deletions internal/connectors/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (e *engine) InstallConnector(ctx context.Context, provider string, rawConfi
detachedCtx,
client.StartWorkflowOptions{
ID: fmt.Sprintf("install-%s-%s", e.stack, connector.ID.String()),
TaskQueue: getDefaultTaskQueue(e.stack),
TaskQueue: GetDefaultTaskQueue(e.stack),
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
WorkflowExecutionErrorWhenAlreadyStarted: false,
SearchAttributes: map[string]interface{}{
Expand Down Expand Up @@ -222,7 +222,7 @@ func (e *engine) UninstallConnector(ctx context.Context, connectorID models.Conn
detachedCtx,
client.StartWorkflowOptions{
ID: id,
TaskQueue: getDefaultTaskQueue(e.stack),
TaskQueue: GetDefaultTaskQueue(e.stack),
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY,
WorkflowExecutionErrorWhenAlreadyStarted: false,
SearchAttributes: map[string]interface{}{
Expand All @@ -232,7 +232,7 @@ func (e *engine) UninstallConnector(ctx context.Context, connectorID models.Conn
workflow.RunUninstallConnector,
workflow.UninstallConnector{
ConnectorID: connectorID,
DefaultWorkerName: getDefaultTaskQueue(e.stack),
DefaultWorkerName: GetDefaultTaskQueue(e.stack),
TaskID: &task.ID,
},
)
Expand Down Expand Up @@ -282,7 +282,7 @@ func (e *engine) ResetConnector(ctx context.Context, connectorID models.Connecto
detachedCtx,
client.StartWorkflowOptions{
ID: id,
TaskQueue: getDefaultTaskQueue(e.stack),
TaskQueue: GetDefaultTaskQueue(e.stack),
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY,
WorkflowExecutionErrorWhenAlreadyStarted: false,
SearchAttributes: map[string]interface{}{
Expand All @@ -292,7 +292,7 @@ func (e *engine) ResetConnector(ctx context.Context, connectorID models.Connecto
workflow.RunResetConnector,
workflow.ResetConnector{
ConnectorID: connectorID,
DefaultWorkerName: getDefaultTaskQueue(e.stack),
DefaultWorkerName: GetDefaultTaskQueue(e.stack),
TaskID: task.ID,
},
)
Expand Down Expand Up @@ -383,7 +383,7 @@ func (e *engine) CreateFormanceAccount(ctx context.Context, account models.Accou
ctx,
client.StartWorkflowOptions{
ID: fmt.Sprintf("create-formance-account-send-events-%s-%s-%s", e.stack, account.ConnectorID.String(), account.Reference),
TaskQueue: getDefaultTaskQueue(e.stack),
TaskQueue: GetDefaultTaskQueue(e.stack),
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE,
WorkflowExecutionErrorWhenAlreadyStarted: false,
SearchAttributes: map[string]interface{}{
Expand Down Expand Up @@ -437,7 +437,7 @@ func (e *engine) CreateFormancePayment(ctx context.Context, payment models.Payme
ctx,
client.StartWorkflowOptions{
ID: fmt.Sprintf("create-formance-payment-send-events-%s-%s-%s", e.stack, payment.ConnectorID.String(), payment.Reference),
TaskQueue: getDefaultTaskQueue(e.stack),
TaskQueue: GetDefaultTaskQueue(e.stack),
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE,
WorkflowExecutionErrorWhenAlreadyStarted: false,
SearchAttributes: map[string]interface{}{
Expand All @@ -461,8 +461,12 @@ func (e *engine) ForwardBankAccount(ctx context.Context, bankAccountID uuid.UUID
ctx, span := otel.Tracer().Start(ctx, "engine.ForwardBankAccount")
defer span.End()

id := models.TaskIDReference(fmt.Sprintf("create-bank-account-%s", e.stack), connectorID, bankAccountID.String())
if _, err := e.storage.ConnectorsGet(ctx, connectorID); err != nil {
otel.RecordError(span, err)
return models.Task{}, fmt.Errorf("connector %w", ErrNotFound)
}

id := e.taskIDReferenceFor(IDPrefixBankAccountCreate, connectorID, bankAccountID.String())
now := time.Now().UTC()
task := models.Task{
ID: models.TaskID{
Expand All @@ -484,7 +488,7 @@ func (e *engine) ForwardBankAccount(ctx context.Context, bankAccountID uuid.UUID
ctx,
client.StartWorkflowOptions{
ID: id,
TaskQueue: getDefaultTaskQueue(e.stack),
TaskQueue: GetDefaultTaskQueue(e.stack),
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE,
WorkflowExecutionErrorWhenAlreadyStarted: false,
SearchAttributes: map[string]interface{}{
Expand Down Expand Up @@ -542,7 +546,7 @@ func (e *engine) CreateTransfer(ctx context.Context, piID models.PaymentInitiati
client.StartWorkflowOptions{
ID: id,
StartDelay: startDelay,
TaskQueue: getDefaultTaskQueue(e.stack),
TaskQueue: GetDefaultTaskQueue(e.stack),
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE,
WorkflowExecutionErrorWhenAlreadyStarted: false,
SearchAttributes: map[string]interface{}{
Expand Down Expand Up @@ -601,7 +605,7 @@ func (e *engine) ReverseTransfer(ctx context.Context, reversal models.PaymentIni
detachedCtx,
client.StartWorkflowOptions{
ID: id,
TaskQueue: getDefaultTaskQueue(e.stack),
TaskQueue: GetDefaultTaskQueue(e.stack),
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE,
WorkflowExecutionErrorWhenAlreadyStarted: false,
SearchAttributes: map[string]interface{}{
Expand Down Expand Up @@ -661,7 +665,7 @@ func (e *engine) CreatePayout(ctx context.Context, piID models.PaymentInitiation
client.StartWorkflowOptions{
ID: id,
StartDelay: startDelay,
TaskQueue: getDefaultTaskQueue(e.stack),
TaskQueue: GetDefaultTaskQueue(e.stack),
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE,
WorkflowExecutionErrorWhenAlreadyStarted: false,
SearchAttributes: map[string]interface{}{
Expand Down Expand Up @@ -721,7 +725,7 @@ func (e *engine) ReversePayout(ctx context.Context, reversal models.PaymentIniti
detachedCtx,
client.StartWorkflowOptions{
ID: id,
TaskQueue: getDefaultTaskQueue(e.stack),
TaskQueue: GetDefaultTaskQueue(e.stack),
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE,
WorkflowExecutionErrorWhenAlreadyStarted: false,
SearchAttributes: map[string]interface{}{
Expand Down Expand Up @@ -761,7 +765,7 @@ func (e *engine) HandleWebhook(ctx context.Context, urlPath string, webhook mode
ctx,
client.StartWorkflowOptions{
ID: fmt.Sprintf("webhook-%s-%s-%s", e.stack, webhook.ConnectorID.String(), webhook.ID),
TaskQueue: getDefaultTaskQueue(e.stack),
TaskQueue: GetDefaultTaskQueue(e.stack),

Check warning on line 768 in internal/connectors/engine/engine.go

View check run for this annotation

Codecov / codecov/patch

internal/connectors/engine/engine.go#L768

Added line #L768 was not covered by tests
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
WorkflowExecutionErrorWhenAlreadyStarted: false,
SearchAttributes: map[string]interface{}{
Expand Down Expand Up @@ -796,7 +800,7 @@ func (e *engine) CreatePool(ctx context.Context, pool models.Pool) error {
ctx,
client.StartWorkflowOptions{
ID: fmt.Sprintf("pools-creation-%s-%s", e.stack, pool.ID.String()),
TaskQueue: getDefaultTaskQueue(e.stack),
TaskQueue: GetDefaultTaskQueue(e.stack),
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
WorkflowExecutionErrorWhenAlreadyStarted: false,
SearchAttributes: map[string]interface{}{
Expand Down Expand Up @@ -842,7 +846,7 @@ func (e *engine) AddAccountToPool(ctx context.Context, id uuid.UUID, accountID m
detachedCtx,
client.StartWorkflowOptions{
ID: fmt.Sprintf("pools-add-account-%s-%s-%s", e.stack, pool.ID.String(), accountID.String()),
TaskQueue: getDefaultTaskQueue(e.stack),
TaskQueue: GetDefaultTaskQueue(e.stack),
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
WorkflowExecutionErrorWhenAlreadyStarted: false,
SearchAttributes: map[string]interface{}{
Expand Down Expand Up @@ -888,7 +892,7 @@ func (e *engine) RemoveAccountFromPool(ctx context.Context, id uuid.UUID, accoun
detachedCtx,
client.StartWorkflowOptions{
ID: fmt.Sprintf("pools-remove-account-%s-%s-%s", e.stack, pool.ID.String(), accountID.String()),
TaskQueue: getDefaultTaskQueue(e.stack),
TaskQueue: GetDefaultTaskQueue(e.stack),
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
WorkflowExecutionErrorWhenAlreadyStarted: false,
SearchAttributes: map[string]interface{}{
Expand Down Expand Up @@ -924,7 +928,7 @@ func (e *engine) DeletePool(ctx context.Context, poolID uuid.UUID) error {
ctx,
client.StartWorkflowOptions{
ID: fmt.Sprintf("pools-deletion-%s-%s", e.stack, poolID.String()),
TaskQueue: getDefaultTaskQueue(e.stack),
TaskQueue: GetDefaultTaskQueue(e.stack),
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
WorkflowExecutionErrorWhenAlreadyStarted: false,
SearchAttributes: map[string]interface{}{
Expand Down Expand Up @@ -1004,7 +1008,7 @@ func (e *engine) onStartPlugin(ctx context.Context, connector models.Connector)
ctx,
client.StartWorkflowOptions{
ID: fmt.Sprintf("install-%s-%s", e.stack, connector.ID.String()),
TaskQueue: getDefaultTaskQueue(e.stack),
TaskQueue: GetDefaultTaskQueue(e.stack),

Check warning on line 1011 in internal/connectors/engine/engine.go

View check run for this annotation

Codecov / codecov/patch

internal/connectors/engine/engine.go#L1011

Added line #L1011 was not covered by tests
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
WorkflowExecutionErrorWhenAlreadyStarted: false,
SearchAttributes: map[string]interface{}{
Expand Down
140 changes: 140 additions & 0 deletions internal/connectors/engine/engine_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package engine_test

import (
"fmt"
"strings"
"testing"

"github.com/formancehq/go-libs/v2/logging"
"github.com/formancehq/payments/internal/connectors/engine"
"github.com/formancehq/payments/internal/connectors/engine/activities"
"github.com/formancehq/payments/internal/connectors/engine/plugins"
"github.com/formancehq/payments/internal/connectors/engine/workflow"
"github.com/formancehq/payments/internal/models"
"github.com/formancehq/payments/internal/storage"
"github.com/google/uuid"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"go.temporal.io/sdk/client"
gomock "go.uber.org/mock/gomock"
)

func TestEngine(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Engine Suite")
}

func WithWorkflowOptions(idPrefix, taskQueue string) gomock.Matcher {
return workflowOptionsMatcher{expectedIDPrefix: idPrefix, expectedTaskQueue: taskQueue}
}

type workflowOptionsMatcher struct {
expectedIDPrefix string
expectedTaskQueue string
}

func (m workflowOptionsMatcher) Matches(options any) bool {
opts, ok := options.(client.StartWorkflowOptions)
if !ok {
return false
}

if !strings.HasPrefix(opts.ID, m.expectedIDPrefix) {
return false
}
if opts.TaskQueue != m.expectedTaskQueue {
return false
}
return true
}

func (m workflowOptionsMatcher) String() string {
return "has same options"
}

var _ = Describe("Engine Tests", func() {
var (
stackName string
defaultTaskQueue string
eng engine.Engine
store *storage.MockStorage
cl *activities.MockClient
)
BeforeEach(func() {
stackName = "STACKNAME"
defaultTaskQueue = engine.GetDefaultTaskQueue(stackName)
ctrl := gomock.NewController(GinkgoT())
logger := logging.NewDefaultLogger(GinkgoWriter, false, false, false)
cl = activities.NewMockClient(ctrl)
store = storage.NewMockStorage(ctrl)
plgs := plugins.NewMockPlugins(ctrl)
eng = engine.New(logger, cl, store, plgs, stackName)
})

Context("forwarding a bank account to a connector", func() {
var (
bankID uuid.UUID
connID models.ConnectorID
)
BeforeEach(func() {
connID = models.ConnectorID{Reference: uuid.New(), Provider: "psp"}
bankID = uuid.New()
})

It("should return not found error when storage doesn't find connector", func(ctx SpecContext) {
store.EXPECT().ConnectorsGet(gomock.Any(), connID).Return(
nil, fmt.Errorf("some not found err: %w", storage.ErrNotFound),
)
_, err := eng.ForwardBankAccount(ctx, bankID, connID, false)
Expect(err).NotTo(BeNil())
Expect(err).To(MatchError(engine.ErrNotFound))
})

It("should return storage error when task cannot be upserted", func(ctx SpecContext) {
store.EXPECT().ConnectorsGet(gomock.Any(), connID).Return(
&models.Connector{ID: connID}, nil,
)
expectedErr := fmt.Errorf("fffff")
store.EXPECT().TasksUpsert(gomock.Any(), gomock.AssignableToTypeOf(models.Task{})).Return(
expectedErr,
)
_, err := eng.ForwardBankAccount(ctx, bankID, connID, false)
Expect(err).NotTo(BeNil())
Expect(err).To(MatchError(expectedErr))
})

It("should return error when workflow cannot be started", func(ctx SpecContext) {
store.EXPECT().ConnectorsGet(gomock.Any(), connID).Return(
&models.Connector{ID: connID}, nil,
)
store.EXPECT().TasksUpsert(gomock.Any(), gomock.AssignableToTypeOf(models.Task{})).Return(nil)
expectedErr := fmt.Errorf("workflow failed")
cl.EXPECT().ExecuteWorkflow(gomock.Any(), WithWorkflowOptions(engine.IDPrefixBankAccountCreate, defaultTaskQueue),
workflow.RunCreateBankAccount,
gomock.AssignableToTypeOf(workflow.CreateBankAccount{}),
).Return(nil, expectedErr)

_, err := eng.ForwardBankAccount(ctx, bankID, connID, false)
Expect(err).NotTo(BeNil())
Expect(err).To(MatchError(expectedErr))
})

It("should launch workflow and return task", func(ctx SpecContext) {
store.EXPECT().ConnectorsGet(gomock.Any(), connID).Return(
&models.Connector{ID: connID}, nil,
)
store.EXPECT().TasksUpsert(gomock.Any(), gomock.AssignableToTypeOf(models.Task{})).Return(nil)
cl.EXPECT().ExecuteWorkflow(gomock.Any(), WithWorkflowOptions(engine.IDPrefixBankAccountCreate, defaultTaskQueue),
workflow.RunCreateBankAccount,
gomock.AssignableToTypeOf(workflow.CreateBankAccount{}),
).Return(nil, nil)

task, err := eng.ForwardBankAccount(ctx, bankID, connID, false)
Expect(err).To(BeNil())
Expect(task.ID.Reference).To(ContainSubstring(engine.IDPrefixBankAccountCreate))
Expect(task.ID.Reference).To(ContainSubstring(stackName))
Expect(task.ConnectorID.String()).To(Equal(connID.String()))
Expect(task.Status).To(Equal(models.TASK_STATUS_PROCESSING))
})
})
})
16 changes: 16 additions & 0 deletions internal/connectors/engine/ids.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package engine

import (
"fmt"

"github.com/formancehq/payments/internal/models"
)

const (
IDPrefixBankAccountCreate = "create-bank-account"
)

func (e *engine) taskIDReferenceFor(prefix string, connectorID models.ConnectorID, objectID string) string {
withStack := fmt.Sprintf("%s-%s", prefix, e.stack)
return models.TaskIDReference(withStack, connectorID, objectID)
}
2 changes: 1 addition & 1 deletion internal/connectors/engine/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ import (
"fmt"
)

func getDefaultTaskQueue(stack string) string {
func GetDefaultTaskQueue(stack string) string {
return fmt.Sprintf("%s-default", stack)
}
6 changes: 3 additions & 3 deletions internal/connectors/engine/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (w *WorkerPool) onStartPlugin(ctx context.Context, connector models.Connect
}

if !connector.ScheduledForDeletion {
err = w.AddWorker(getDefaultTaskQueue(w.stack))
err = w.AddWorker(GetDefaultTaskQueue(w.stack))

Check warning on line 136 in internal/connectors/engine/workers.go

View check run for this annotation

Codecov / codecov/patch

internal/connectors/engine/workers.go#L136

Added line #L136 was not covered by tests
if err != nil {
return err
}
Expand All @@ -157,7 +157,7 @@ func (w *WorkerPool) onInsertPlugin(ctx context.Context, connectorID models.Conn
return err
}

if err := w.AddWorker(getDefaultTaskQueue(w.stack)); err != nil {
if err := w.AddWorker(GetDefaultTaskQueue(w.stack)); err != nil {
return err
}

Expand Down Expand Up @@ -224,7 +224,7 @@ func (w *WorkerPool) Close() {
}

func (w *WorkerPool) AddDefaultWorker() error {
return w.AddWorker(getDefaultTaskQueue(w.stack))
return w.AddWorker(GetDefaultTaskQueue(w.stack))
}

// Installing a new connector lauches a new worker
Expand Down

0 comments on commit eed2c80

Please sign in to comment.