Skip to content

Commit

Permalink
describe various integration patterns
Browse files Browse the repository at this point in the history
  • Loading branch information
kbilsted committed Feb 26, 2024
1 parent f79af85 commit 3ea88e6
Show file tree
Hide file tree
Showing 4 changed files with 263 additions and 12 deletions.
168 changes: 158 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@
[![Stats](https://img.shields.io/badge/Doc_lines-453-ffb469.svg)]()
<!--end-->

Micro Workflow is a very fast, small, embedable and distributed workflow system primarily for .Net developers.

# 0.
The code base is so small every one can read and understand the inner workings if necesarry.

# 1. Why use Micro Workflow

You should consider using Micro Workflow due to one or more of the following reason

when you have a need for a queue, scheduling of code to execute, or a business process that needs to be robus. We provide many examples in "integration patterns" in section 6 below.

# 1. Design goals

**Simplicity**
* We model only the steps in a workflow, *not* the transitions between them
Expand All @@ -18,15 +24,19 @@

**We use C# all the way**
* We don't want to invent a new language - we love C#!
* Workflow code is *readable*, *debugable*, and *testable* - like the rest of your code base
* You can use existing best practices for logging, IOC containers etc.
* Workflow code is *easy to commit and merge* use your existing *branching strategies*
* Workflow code is *readable*, *debugable*, and *testable* - like the rest of your code base.
* You can use existing best practices for logging, IOC containers etc. of your choice
* Since Workflow code is just C# it is *easy to commit and merge* use your existing *branching strategies*
* You *do not* need a special graphical editor for specifying flows

**The datamodel is simple - just three DB tables**
* If things break in production, it is easy for you to figure out what has happened and remedy the problem
* You can reason about the consequences of versioning the step implementations vs. simply change the existing flows

**Distributed mindset**
* Supports *Fail-over setup* To improve up-time applications/integrations are often running on multiple servers at the same time. This is a common scenario is supported with no special setup.
* Supports incremental deployments across more instances. When deploying multiple instances, the roll-out is typical gradual. Hence we support that steps may be added that is only known to a sub-set of the running workflows.

**Scalable**
* You can add more workers in the workflow engine (vertical scaling)
* You can add more servers each running a workflow engine (horizontal scaling)
Expand Down Expand Up @@ -54,7 +64,7 @@ This is how you control ordering of events.

There are no restrictions on the names of steps, but we found using a scheme similar to REST api's is beneficial. Hence we recommend you to use `{version}/{business domain}/{workflow name}/{workflow step}`.

```
```C#
[StepName(Name)]
class FetchData : IStepImplementation
{
Expand Down Expand Up @@ -104,6 +114,7 @@ You likely want to persist workflows in a database. We currently use Microsoft S




# 4. Core concepts in Micro Workflow

The model revolves around the notion of a *step*. A step is in traditional workfow litterature referred to as an activity. Where activities live in a workflow. The workflow has identity and state and so forth.
Expand Down Expand Up @@ -141,8 +152,145 @@ Operations you can do on steps



# 5. Integration patterns

We discuss some of the typical integration patterns between services


## A queue with unlimited retries and no strict FIFO ordering

A queue implementation where elements are placed in a persistent storage and processed one by one. In case of an error, the element is retried later whilst other elements are processed. This queue is mostly FIFO (first in first out), but due to the nature of the retry, it is not guaranteed.

This implementation is also known as an "outbox pattern".

The Micro Workflow uses a dynamic number of workers for execution so scale up easily.

The implementation roughly

```C#
[StepName(Name)]
class SendMemberEnrollmentToAcmeCompany(HttpClient client) : IStepImplementation
{
public const string Name = "v1/Send-member-enrollment-to-Acme-company";

public async Task<ExecutionResult> ExecuteAsync(Step step)
{
var result = await client.PostAsync("...", null);
result.EnsureSuccessStatusCode();
return step.Done();
}
}
```


## A queue with limited and intelligent retries and no strict FIFO ordering

Similar to the above example except

We retry only 5 times, and we retry only if the receiving service fails with a http 500 range. A http 400 range signals the payload is wrong and thus we fail the job.


```C#
[StepName(Name)]
class SendMemberEnrollmentToAcmeCompanyLimitedRetry(HttpClient client) : IStepImplementation
{
public const string Name = "v1/Send-member-enrollment-to-Acme-company-limited-retry";

public async Task<ExecutionResult> ExecuteAsync(Step step)
{
if (step.ExecutionCount > 5)
return step.Fail("too many retries");

var result = await client.PostAsync("...", null);

switch ((int)result.StatusCode)
{
case >= 200 and < 300:
return step.Done();

case >= 400 and < 500:
return step.Fail("Wrong payload " + result.ToString());

case >= 500:
return step.Rerun(description: $"Upstream error {result}");

default: throw new NotImplementedException();
}
}
}
```


## A queue that may only perform operations in a limited time window

Some integrations are prohibited from running 24/7. For examples systems running nightly batch processing. Similar, integration with customers are sometimes implemeted such that the customer is not disturbed in the middle of the night with notifications but only within "reasonable hours" e.g between 7-20.

```C#
[StepName(Name)]
class SendMemberEnrollmentToAcmeCompanyLimitedTimewindow() : IStepImplementation
{
public const string Name = "v1/Send-member-enrollment-to-Acme-company-limited-from-0700-to-2000";

public async Task<ExecutionResult> ExecuteAsync(Step step)
{
// ensure window of 0700 - 2000
var now = DateTime.Now;
if (now.Hour < 7)
return step.Rerun(scheduleTime: now.Date.AddHours(7));

if (now.Hour >= 20)
return step.Rerun(scheduleTime: now.Date.AddDays(1).AddHours(7));

// ...
return step.Done();
}
}
```


## A scheduled task Once every hour tp fetch latest data from source

For this scenario to work we run the same code block over and over again with a set interval. We utilize that a step may defined as `singleton` meaning that it can only exist once in the ready queue. Thus we cannot
accidently add the step twice (i.e. in a distributed environment). We use `AddStepIfNotExists()` but could have used `Add()` with a `try..catch`.

```
public void ScheduleDataFetch(WorkflowEngine engine)
{
var step = new Step(ScheduledFetchDataOnceAnHour.Name)
{
Singleton = true,
ScheduleTime = DateTime.Now.Date.AddHours(DateTime.Now.Hour)
};
engine.Data.AddStepIfNotExists(step, new SearchModel(Name: ScheduledFetchDataOnceAnHour.Name));
}
```

And for fun, we fail the step if it has not been created correctly, to ensure we don't mess up.

```
[StepName(Name)]
class ScheduledFetchDataOnceAnHour() : IStepImplementation
{
public const string Name = "v1/fetch-data-from-acme-once-an-hour";
public async Task<ExecutionResult> ExecuteAsync(Step step)
{
if (!step.Singleton)
throw new FailCurrentStepException("Must be a singleton step!");
// ... fetch data
return step.Rerun(scheduleTime: step.ExecutionStartTime!.Value.AddHours(1));
}
}
```




# 5. Performance
# 6. Performance

Simplicify is the focus of the code base. Performance is simply a side-effect of keeping things simple.

Expand All @@ -154,7 +302,7 @@ You can take outset in some simple test scenarios at https://github.com/kbilsted



# 6. Flow versioning
# 7. Flow versioning

Since each step may be regarded as being part of a flow, or as a single independent step, there is no notion of versions. However, you can use a version number in steps (similar to using version in REST api's).
This enable you to create a new version with new steps that has a different implementation to the old.
Expand All @@ -163,7 +311,7 @@ If all steps need to execute on the new code, simply use multiple step names for



# 7. Retries and ordering
# 8. Retries and ordering
The automatic retry of a step in case of a failure is key feature. You can control ordering to some degree by putting longer and longer delays when retrying a failing step. This technique is sometimes called exponential back-off, since the time between retries exponentially increase to ensure throughput of succesful jobs. The default retry delay is calculated as `delay = retries^3 seconds`.

If you want to stop retrying either return a `step.Fail()` or `throw FailCurrentStepException`.
Expand All @@ -173,7 +321,7 @@ Step execution is only orderes by an earliest execution time. If you need to con



# 8. Micro Workflow and related concepts
# 9. Micro Workflow and related concepts
Another way to gain conceptual insights into the framework, we explain why Micro Workflow is a good implementation fit to many concepts.


Expand Down
2 changes: 1 addition & 1 deletion src/Demos/MicroWorkflow.Tests/AdoSingletonStepTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public void When_creating_a_singleton_Then_it_is_created()
helper.StepHandlers = [Handle(name, step => {
stepResult = $"hello";
stepResultIsSingleton = step.Singleton;
return ExecutionResult.Done();
return step.Done();
})];
helper.StopWhenNoWork().BuildAndStart();

Expand Down
103 changes: 103 additions & 0 deletions src/Demos/MicroWorkflow.Tests/DocumentationTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
namespace MicroWorkflow;

/// <summary>
/// not real tests just a place for code used in the readme.md
/// </summary>
class DocumentationTests
{
[StepName(Name)]
class SendMemberEnrollmentToAcmeCompany(HttpClient client) : IStepImplementation
{
public const string Name = "v1/send-member-enrollment-to-acme-company";

public async Task<ExecutionResult> ExecuteAsync(Step step)
{
var result = await client.PostAsync("...", null);
result.EnsureSuccessStatusCode();
return step.Done();
}
}

[StepName(Name)]
class SendMemberEnrollmentToAcmeCompanyLimitedRetry(HttpClient client) : IStepImplementation
{
public const string Name = "v1/send-member-enrollment-to-acme-company-limited-retry";

public async Task<ExecutionResult> ExecuteAsync(Step step)
{
if (step.ExecutionCount > 5)
return step.Fail("too many retries");

var result = await client.PostAsync("...", null);

switch ((int)result.StatusCode)
{
case >= 200 and < 300:
return step.Done();

case >= 400 and < 500:
return step.Fail("Wrong payload " + result.ToString());

case >= 500:
return step.Rerun(description: $"Upstream error {result}");

default: throw new NotImplementedException();
}
}
}



[StepName(Name)]
class SendMemberEnrollmentToAcmeCompanyLimitedTimewindow() : IStepImplementation
{
public const string Name = "v1/send-member-enrollment-to-acme-company-limited-from-0700-to-2000";

public async Task<ExecutionResult> ExecuteAsync(Step step)
{
// ensure window of 0700 - 2000
var now = DateTime.Now;
if (now.Hour < 7)
return step.Rerun(scheduleTime: now.Date.AddHours(7));

if (now.Hour >= 20)
return step.Rerun(scheduleTime: now.Date.AddDays(1).AddHours(7));

// ...

return step.Done();
}
}


[StepName(Name)]
class ScheduledFetchDataOnceAnHour() : IStepImplementation
{
public const string Name = "v1/fetch-data-from-acme-once-an-hour";

public async Task<ExecutionResult> ExecuteAsync(Step step)
{
if (!step.Singleton)
throw new FailCurrentStepException("Must be a singleton step!");

// ... fetch data

return step.Rerun(scheduleTime: step.ExecutionStartTime!.Value.AddHours(1));
}
}


class AddScheduler
{
public void ScheduleDataFetch(WorkflowEngine engine)
{
var step = new Step(ScheduledFetchDataOnceAnHour.Name)
{
Singleton = true,
ScheduleTime = DateTime.Now.Date.AddHours(DateTime.Now.Hour)
};

engine.Data.AddStepIfNotExists(step, new SearchModel(Name: ScheduledFetchDataOnceAnHour.Name));
}
}
}
2 changes: 1 addition & 1 deletion src/Product/MicroWorkflow/Step.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class Step
/// <summary> the arguments for an activation as it is formatted and persisted in the persistencelayer </summary>
public string? ActivationArgs { get; set; }

/// <summary> The time when the latest execution took place </summary>
/// <summary> During a step implementation, this is the time when the engine started the execution. When queried outside this scope it may not yet have been executed (then it is null) or in case of a step that is being rerun, it is the last execution time. </summary>
public DateTime? ExecutionStartTime { get; set; }

/// <summary> The elapsed time of the latest execution </summary>
Expand Down

0 comments on commit 3ea88e6

Please sign in to comment.