From eed2c808fee9e95ad999170900f3dc0cbbe8ff67 Mon Sep 17 00:00:00 2001 From: Crimson Thompson Date: Mon, 10 Feb 2025 16:45:33 +0100 Subject: [PATCH] Return error before launching workflow if connector not found in forward bank account req --- internal/connectors/engine/engine.go | 42 ++++--- internal/connectors/engine/engine_test.go | 140 ++++++++++++++++++++++ internal/connectors/engine/ids.go | 16 +++ internal/connectors/engine/util.go | 2 +- internal/connectors/engine/workers.go | 6 +- 5 files changed, 183 insertions(+), 23 deletions(-) create mode 100644 internal/connectors/engine/engine_test.go create mode 100644 internal/connectors/engine/ids.go diff --git a/internal/connectors/engine/engine.go b/internal/connectors/engine/engine.go index c59475c0..33b46b6e 100644 --- a/internal/connectors/engine/engine.go +++ b/internal/connectors/engine/engine.go @@ -157,7 +157,7 @@ func (e *engine) InstallConnector(ctx context.Context, provider string, rawConfi detachedCtx, client.StartWorkflowOptions{ ID: fmt.Sprintf("install-%s-%s", e.stack, connector.ID.String()), - TaskQueue: getDefaultTaskQueue(e.stack), + TaskQueue: GetDefaultTaskQueue(e.stack), WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, WorkflowExecutionErrorWhenAlreadyStarted: false, SearchAttributes: map[string]interface{}{ @@ -222,7 +222,7 @@ func (e *engine) UninstallConnector(ctx context.Context, connectorID models.Conn detachedCtx, client.StartWorkflowOptions{ ID: id, - TaskQueue: getDefaultTaskQueue(e.stack), + TaskQueue: GetDefaultTaskQueue(e.stack), WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY, WorkflowExecutionErrorWhenAlreadyStarted: false, SearchAttributes: map[string]interface{}{ @@ -232,7 +232,7 @@ func (e *engine) UninstallConnector(ctx context.Context, connectorID models.Conn workflow.RunUninstallConnector, workflow.UninstallConnector{ ConnectorID: connectorID, - DefaultWorkerName: getDefaultTaskQueue(e.stack), + DefaultWorkerName: GetDefaultTaskQueue(e.stack), TaskID: &task.ID, }, ) @@ -282,7 +282,7 @@ func (e *engine) ResetConnector(ctx context.Context, connectorID models.Connecto detachedCtx, client.StartWorkflowOptions{ ID: id, - TaskQueue: getDefaultTaskQueue(e.stack), + TaskQueue: GetDefaultTaskQueue(e.stack), WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY, WorkflowExecutionErrorWhenAlreadyStarted: false, SearchAttributes: map[string]interface{}{ @@ -292,7 +292,7 @@ func (e *engine) ResetConnector(ctx context.Context, connectorID models.Connecto workflow.RunResetConnector, workflow.ResetConnector{ ConnectorID: connectorID, - DefaultWorkerName: getDefaultTaskQueue(e.stack), + DefaultWorkerName: GetDefaultTaskQueue(e.stack), TaskID: task.ID, }, ) @@ -383,7 +383,7 @@ func (e *engine) CreateFormanceAccount(ctx context.Context, account models.Accou ctx, client.StartWorkflowOptions{ ID: fmt.Sprintf("create-formance-account-send-events-%s-%s-%s", e.stack, account.ConnectorID.String(), account.Reference), - TaskQueue: getDefaultTaskQueue(e.stack), + TaskQueue: GetDefaultTaskQueue(e.stack), WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE, WorkflowExecutionErrorWhenAlreadyStarted: false, SearchAttributes: map[string]interface{}{ @@ -437,7 +437,7 @@ func (e *engine) CreateFormancePayment(ctx context.Context, payment models.Payme ctx, client.StartWorkflowOptions{ ID: fmt.Sprintf("create-formance-payment-send-events-%s-%s-%s", e.stack, payment.ConnectorID.String(), payment.Reference), - TaskQueue: getDefaultTaskQueue(e.stack), + TaskQueue: GetDefaultTaskQueue(e.stack), WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE, WorkflowExecutionErrorWhenAlreadyStarted: false, SearchAttributes: map[string]interface{}{ @@ -461,8 +461,12 @@ func (e *engine) ForwardBankAccount(ctx context.Context, bankAccountID uuid.UUID ctx, span := otel.Tracer().Start(ctx, "engine.ForwardBankAccount") defer span.End() - id := models.TaskIDReference(fmt.Sprintf("create-bank-account-%s", e.stack), connectorID, bankAccountID.String()) + if _, err := e.storage.ConnectorsGet(ctx, connectorID); err != nil { + otel.RecordError(span, err) + return models.Task{}, fmt.Errorf("connector %w", ErrNotFound) + } + id := e.taskIDReferenceFor(IDPrefixBankAccountCreate, connectorID, bankAccountID.String()) now := time.Now().UTC() task := models.Task{ ID: models.TaskID{ @@ -484,7 +488,7 @@ func (e *engine) ForwardBankAccount(ctx context.Context, bankAccountID uuid.UUID ctx, client.StartWorkflowOptions{ ID: id, - TaskQueue: getDefaultTaskQueue(e.stack), + TaskQueue: GetDefaultTaskQueue(e.stack), WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE, WorkflowExecutionErrorWhenAlreadyStarted: false, SearchAttributes: map[string]interface{}{ @@ -542,7 +546,7 @@ func (e *engine) CreateTransfer(ctx context.Context, piID models.PaymentInitiati client.StartWorkflowOptions{ ID: id, StartDelay: startDelay, - TaskQueue: getDefaultTaskQueue(e.stack), + TaskQueue: GetDefaultTaskQueue(e.stack), WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE, WorkflowExecutionErrorWhenAlreadyStarted: false, SearchAttributes: map[string]interface{}{ @@ -601,7 +605,7 @@ func (e *engine) ReverseTransfer(ctx context.Context, reversal models.PaymentIni detachedCtx, client.StartWorkflowOptions{ ID: id, - TaskQueue: getDefaultTaskQueue(e.stack), + TaskQueue: GetDefaultTaskQueue(e.stack), WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE, WorkflowExecutionErrorWhenAlreadyStarted: false, SearchAttributes: map[string]interface{}{ @@ -661,7 +665,7 @@ func (e *engine) CreatePayout(ctx context.Context, piID models.PaymentInitiation client.StartWorkflowOptions{ ID: id, StartDelay: startDelay, - TaskQueue: getDefaultTaskQueue(e.stack), + TaskQueue: GetDefaultTaskQueue(e.stack), WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE, WorkflowExecutionErrorWhenAlreadyStarted: false, SearchAttributes: map[string]interface{}{ @@ -721,7 +725,7 @@ func (e *engine) ReversePayout(ctx context.Context, reversal models.PaymentIniti detachedCtx, client.StartWorkflowOptions{ ID: id, - TaskQueue: getDefaultTaskQueue(e.stack), + TaskQueue: GetDefaultTaskQueue(e.stack), WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE, WorkflowExecutionErrorWhenAlreadyStarted: false, SearchAttributes: map[string]interface{}{ @@ -761,7 +765,7 @@ func (e *engine) HandleWebhook(ctx context.Context, urlPath string, webhook mode ctx, client.StartWorkflowOptions{ ID: fmt.Sprintf("webhook-%s-%s-%s", e.stack, webhook.ConnectorID.String(), webhook.ID), - TaskQueue: getDefaultTaskQueue(e.stack), + TaskQueue: GetDefaultTaskQueue(e.stack), WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, WorkflowExecutionErrorWhenAlreadyStarted: false, SearchAttributes: map[string]interface{}{ @@ -796,7 +800,7 @@ func (e *engine) CreatePool(ctx context.Context, pool models.Pool) error { ctx, client.StartWorkflowOptions{ ID: fmt.Sprintf("pools-creation-%s-%s", e.stack, pool.ID.String()), - TaskQueue: getDefaultTaskQueue(e.stack), + TaskQueue: GetDefaultTaskQueue(e.stack), WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, WorkflowExecutionErrorWhenAlreadyStarted: false, SearchAttributes: map[string]interface{}{ @@ -842,7 +846,7 @@ func (e *engine) AddAccountToPool(ctx context.Context, id uuid.UUID, accountID m detachedCtx, client.StartWorkflowOptions{ ID: fmt.Sprintf("pools-add-account-%s-%s-%s", e.stack, pool.ID.String(), accountID.String()), - TaskQueue: getDefaultTaskQueue(e.stack), + TaskQueue: GetDefaultTaskQueue(e.stack), WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, WorkflowExecutionErrorWhenAlreadyStarted: false, SearchAttributes: map[string]interface{}{ @@ -888,7 +892,7 @@ func (e *engine) RemoveAccountFromPool(ctx context.Context, id uuid.UUID, accoun detachedCtx, client.StartWorkflowOptions{ ID: fmt.Sprintf("pools-remove-account-%s-%s-%s", e.stack, pool.ID.String(), accountID.String()), - TaskQueue: getDefaultTaskQueue(e.stack), + TaskQueue: GetDefaultTaskQueue(e.stack), WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, WorkflowExecutionErrorWhenAlreadyStarted: false, SearchAttributes: map[string]interface{}{ @@ -924,7 +928,7 @@ func (e *engine) DeletePool(ctx context.Context, poolID uuid.UUID) error { ctx, client.StartWorkflowOptions{ ID: fmt.Sprintf("pools-deletion-%s-%s", e.stack, poolID.String()), - TaskQueue: getDefaultTaskQueue(e.stack), + TaskQueue: GetDefaultTaskQueue(e.stack), WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, WorkflowExecutionErrorWhenAlreadyStarted: false, SearchAttributes: map[string]interface{}{ @@ -1004,7 +1008,7 @@ func (e *engine) onStartPlugin(ctx context.Context, connector models.Connector) ctx, client.StartWorkflowOptions{ ID: fmt.Sprintf("install-%s-%s", e.stack, connector.ID.String()), - TaskQueue: getDefaultTaskQueue(e.stack), + TaskQueue: GetDefaultTaskQueue(e.stack), WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, WorkflowExecutionErrorWhenAlreadyStarted: false, SearchAttributes: map[string]interface{}{ diff --git a/internal/connectors/engine/engine_test.go b/internal/connectors/engine/engine_test.go new file mode 100644 index 00000000..d526b472 --- /dev/null +++ b/internal/connectors/engine/engine_test.go @@ -0,0 +1,140 @@ +package engine_test + +import ( + "fmt" + "strings" + "testing" + + "github.com/formancehq/go-libs/v2/logging" + "github.com/formancehq/payments/internal/connectors/engine" + "github.com/formancehq/payments/internal/connectors/engine/activities" + "github.com/formancehq/payments/internal/connectors/engine/plugins" + "github.com/formancehq/payments/internal/connectors/engine/workflow" + "github.com/formancehq/payments/internal/models" + "github.com/formancehq/payments/internal/storage" + "github.com/google/uuid" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "go.temporal.io/sdk/client" + gomock "go.uber.org/mock/gomock" +) + +func TestEngine(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Engine Suite") +} + +func WithWorkflowOptions(idPrefix, taskQueue string) gomock.Matcher { + return workflowOptionsMatcher{expectedIDPrefix: idPrefix, expectedTaskQueue: taskQueue} +} + +type workflowOptionsMatcher struct { + expectedIDPrefix string + expectedTaskQueue string +} + +func (m workflowOptionsMatcher) Matches(options any) bool { + opts, ok := options.(client.StartWorkflowOptions) + if !ok { + return false + } + + if !strings.HasPrefix(opts.ID, m.expectedIDPrefix) { + return false + } + if opts.TaskQueue != m.expectedTaskQueue { + return false + } + return true +} + +func (m workflowOptionsMatcher) String() string { + return "has same options" +} + +var _ = Describe("Engine Tests", func() { + var ( + stackName string + defaultTaskQueue string + eng engine.Engine + store *storage.MockStorage + cl *activities.MockClient + ) + BeforeEach(func() { + stackName = "STACKNAME" + defaultTaskQueue = engine.GetDefaultTaskQueue(stackName) + ctrl := gomock.NewController(GinkgoT()) + logger := logging.NewDefaultLogger(GinkgoWriter, false, false, false) + cl = activities.NewMockClient(ctrl) + store = storage.NewMockStorage(ctrl) + plgs := plugins.NewMockPlugins(ctrl) + eng = engine.New(logger, cl, store, plgs, stackName) + }) + + Context("forwarding a bank account to a connector", func() { + var ( + bankID uuid.UUID + connID models.ConnectorID + ) + BeforeEach(func() { + connID = models.ConnectorID{Reference: uuid.New(), Provider: "psp"} + bankID = uuid.New() + }) + + It("should return not found error when storage doesn't find connector", func(ctx SpecContext) { + store.EXPECT().ConnectorsGet(gomock.Any(), connID).Return( + nil, fmt.Errorf("some not found err: %w", storage.ErrNotFound), + ) + _, err := eng.ForwardBankAccount(ctx, bankID, connID, false) + Expect(err).NotTo(BeNil()) + Expect(err).To(MatchError(engine.ErrNotFound)) + }) + + It("should return storage error when task cannot be upserted", func(ctx SpecContext) { + store.EXPECT().ConnectorsGet(gomock.Any(), connID).Return( + &models.Connector{ID: connID}, nil, + ) + expectedErr := fmt.Errorf("fffff") + store.EXPECT().TasksUpsert(gomock.Any(), gomock.AssignableToTypeOf(models.Task{})).Return( + expectedErr, + ) + _, err := eng.ForwardBankAccount(ctx, bankID, connID, false) + Expect(err).NotTo(BeNil()) + Expect(err).To(MatchError(expectedErr)) + }) + + It("should return error when workflow cannot be started", func(ctx SpecContext) { + store.EXPECT().ConnectorsGet(gomock.Any(), connID).Return( + &models.Connector{ID: connID}, nil, + ) + store.EXPECT().TasksUpsert(gomock.Any(), gomock.AssignableToTypeOf(models.Task{})).Return(nil) + expectedErr := fmt.Errorf("workflow failed") + cl.EXPECT().ExecuteWorkflow(gomock.Any(), WithWorkflowOptions(engine.IDPrefixBankAccountCreate, defaultTaskQueue), + workflow.RunCreateBankAccount, + gomock.AssignableToTypeOf(workflow.CreateBankAccount{}), + ).Return(nil, expectedErr) + + _, err := eng.ForwardBankAccount(ctx, bankID, connID, false) + Expect(err).NotTo(BeNil()) + Expect(err).To(MatchError(expectedErr)) + }) + + It("should launch workflow and return task", func(ctx SpecContext) { + store.EXPECT().ConnectorsGet(gomock.Any(), connID).Return( + &models.Connector{ID: connID}, nil, + ) + store.EXPECT().TasksUpsert(gomock.Any(), gomock.AssignableToTypeOf(models.Task{})).Return(nil) + cl.EXPECT().ExecuteWorkflow(gomock.Any(), WithWorkflowOptions(engine.IDPrefixBankAccountCreate, defaultTaskQueue), + workflow.RunCreateBankAccount, + gomock.AssignableToTypeOf(workflow.CreateBankAccount{}), + ).Return(nil, nil) + + task, err := eng.ForwardBankAccount(ctx, bankID, connID, false) + Expect(err).To(BeNil()) + Expect(task.ID.Reference).To(ContainSubstring(engine.IDPrefixBankAccountCreate)) + Expect(task.ID.Reference).To(ContainSubstring(stackName)) + Expect(task.ConnectorID.String()).To(Equal(connID.String())) + Expect(task.Status).To(Equal(models.TASK_STATUS_PROCESSING)) + }) + }) +}) diff --git a/internal/connectors/engine/ids.go b/internal/connectors/engine/ids.go new file mode 100644 index 00000000..3f804f64 --- /dev/null +++ b/internal/connectors/engine/ids.go @@ -0,0 +1,16 @@ +package engine + +import ( + "fmt" + + "github.com/formancehq/payments/internal/models" +) + +const ( + IDPrefixBankAccountCreate = "create-bank-account" +) + +func (e *engine) taskIDReferenceFor(prefix string, connectorID models.ConnectorID, objectID string) string { + withStack := fmt.Sprintf("%s-%s", prefix, e.stack) + return models.TaskIDReference(withStack, connectorID, objectID) +} diff --git a/internal/connectors/engine/util.go b/internal/connectors/engine/util.go index 034e712e..b661797e 100644 --- a/internal/connectors/engine/util.go +++ b/internal/connectors/engine/util.go @@ -4,6 +4,6 @@ import ( "fmt" ) -func getDefaultTaskQueue(stack string) string { +func GetDefaultTaskQueue(stack string) string { return fmt.Sprintf("%s-default", stack) } diff --git a/internal/connectors/engine/workers.go b/internal/connectors/engine/workers.go index c3cd19c7..a9b690b5 100644 --- a/internal/connectors/engine/workers.go +++ b/internal/connectors/engine/workers.go @@ -133,7 +133,7 @@ func (w *WorkerPool) onStartPlugin(ctx context.Context, connector models.Connect } if !connector.ScheduledForDeletion { - err = w.AddWorker(getDefaultTaskQueue(w.stack)) + err = w.AddWorker(GetDefaultTaskQueue(w.stack)) if err != nil { return err } @@ -157,7 +157,7 @@ func (w *WorkerPool) onInsertPlugin(ctx context.Context, connectorID models.Conn return err } - if err := w.AddWorker(getDefaultTaskQueue(w.stack)); err != nil { + if err := w.AddWorker(GetDefaultTaskQueue(w.stack)); err != nil { return err } @@ -224,7 +224,7 @@ func (w *WorkerPool) Close() { } func (w *WorkerPool) AddDefaultWorker() error { - return w.AddWorker(getDefaultTaskQueue(w.stack)) + return w.AddWorker(GetDefaultTaskQueue(w.stack)) } // Installing a new connector lauches a new worker