Skip to content

Commit

Permalink
Merge pull request #10 from FRACerqueira/v.1.0.2
Browse files Browse the repository at this point in the history
V.1.0.2
  • Loading branch information
FRACerqueira authored Nov 10, 2023
2 parents 1198f7f + a6d5b84 commit d3ecdc9
Show file tree
Hide file tree
Showing 24 changed files with 646 additions and 566 deletions.
132 changes: 55 additions & 77 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

## Table of Contents

- [What's new - previous versions]()
- [What's new - previous versions](./docs/whatsnewprev.md)
- [Features](#features)
- [Installing](#installing)
- [Examples](#examples)
Expand All @@ -28,23 +28,38 @@
- [API Reference](https://fracerqueira.github.io/PipeAndFilter/apis/apis.html)

## What's new in the latest version
### V1.0.1
### V1.0.2

[**Top**](#table-of-contents)

- First Release G.A
- Added ability to save/overwrite multiple result to use during the execution another pipe / aggregation pipe
- Removed propery 'SavedTasks' in EventPipe
- Removed propery 'SavedPipes' in EventPipe
- Removed Method 'SaveValue'
- Removed Method 'RemoveSavedValue'
- Added Method TrySavedValue
- Now TrySavedValue return true/false if exist id saved and value in out paramameter
- Added Method SaveValueAtEnd
- Now SaveValueAtEnd receives the unique id to be saved/overwrite and the value
- Added Method RemoveValueAtEnd
- Now RemoveValueAtEnd receives the unique id to be removed if any
- Added ability to multiple preconditions for Tasks
- Channged command AddTaskCondition
- Now the same parameters as AddTask
- Added command WithCondition for AddTaskCondition

## Features
[**Top**](#table-of-contents)

- Contract with thread safety for change values
- Thread safety to obtain/change contract values ​​and/or generic purpose when running a Task (pararel execute)
- Add multiple pipe
- Add multiple agregate pipe (for run pararel tasks)
- Set the maximum amount of parallel execution
- Add multiple preconditions to run a pipe
- Add multiple preconditions to run a pipe or task
- Add multiple link to the pipe to jump to another pipe
- Add tasks with a precondition
- Have detailed status (execution date, execution time, type of execution, result of each execution) and number of executions in each pipe
- Save a result from each pipe to use when executing another pipe
- Save a result from each task to use during the execution of the aggregation pipe
- Save multiple results from each pipe to be used during the another pipe/aggregate pipe run
- Save multiple results in each task to be effective during the aggregation pipe run
- Terminate the PipeAndFilter on any task, condition or pipe
- Simple and clear fluent syntax

Expand Down Expand Up @@ -78,20 +93,22 @@ The **PipeAndFilter** use **fluent interface**; an object-oriented API whose des
### Sample-Console Usage

```csharp
var result = await PipeAndFilter.New<MyClass>()
.AddPipe(ExecPipe1)
.WithGotoCondition(CondFalse, "LastPipe")
.WithCondition(CondTrue)
.WithCondition(CondTrue)
.AddPipe(ExecPipe2)
.AddPipe(ExecPipe3)
.AddPipeTasks(AgregateTask)
.WithCondition(CondTrue)
await PipeAndFilter.New<MyClass>()
.AddPipe(Pipe1)
.WithGotoCondition(Cond0, "LastPipe")
.WithCondition(Cond1)
.WithCondition(Cond2)
.AddPipe(Pipe2)
.AddPipe(Pipe3)
.AddPipeTasks(Pipe4)
.WithCondition(Cond1)
.MaxDegreeProcess(4)
.AddTask(Task1)
.AddTaskCondition(Task2, CondFalse)
.AddTask(Task3)
.AddPipe(ExecPipe5, "LastPipe")
.AddTask(Task50)
.AddTaskCondition(Task100)
.WithCondition(Cond3)
.WithCondition(Cond4)
.AddTask(Task150)
.AddPipe(Pipe5, "LastPipe")
.BuildAndCreate()
.Init(contract)
.CorrelationId(null)
Expand All @@ -105,12 +122,12 @@ var result = await PipeAndFilter.New<MyClass>()
```csharp
builder.Services
.AddPipeAndFilter(PipeAndFilter.New<WeatherForecast>()
.AddPipe(ExecPipe)
.AddPipe(TemperatureAdd10)
.Build());
```

```csharp
private static Task ExecPipe(EventPipe<WeatherForecast> pipe, CancellationToken token)
private static Task TemperatureAdd10(EventPipe<WeatherForecast> pipe, CancellationToken token)
{
pipe.ThreadSafeAccess((contract) =>
{
Expand All @@ -126,9 +143,9 @@ private static Task ExecPipe(EventPipe<WeatherForecast> pipe, CancellationToken
public class WeatherForecastController : ControllerBase
{
private readonly ILogger<WeatherForecastController> _logger;
private readonly IPipeAndFilterServiceBuild<WeatherForecast> _mypipe;
private readonly IPipeAndFilterService<WeatherForecast> _mypipe;

public WeatherForecastController(ILogger<WeatherForecastController> logger, IPipeAndFilterServiceBuild<WeatherForecast> pipeAndFilter)
public WeatherForecastController(ILogger<WeatherForecastController> logger, IPipeAndFilterService<WeatherForecast> pipeAndFilter)
{
_logger = logger;
_mypipes = pipeAndFilter;
Expand Down Expand Up @@ -158,65 +175,26 @@ All pipes, conditions and tasks do not perform any task, they are only called an
See folder [**Samples/PipeandFIlterBenchmarking**](https://github.com/FRACerqueira/PipeAndFilter/tree/main/Samples/PipeandFIlterBenchmarking).

```
BenchmarkDotNet v0.13.10, Windows 10 (10.0.19044.3570/21H2/November2021Update)
Intel Core i7-8565U CPU 1.80GHz (Whiskey Lake), 1 CPU, 8 logical and 4 physical cores
.NET SDK 8.0.100-rc.2.23502.2
[Host] : .NET 8.0.0 (8.0.23.47906), X64 RyuJIT AVX2
DefaultJob : .NET 8.0.0 (8.0.23.47906), X64 RyuJIT AVX2
```

| Method | Mean | Error | StdDev | Median | Gen0 | Allocated |
|----------------------------- |-----------:|-----------:|-----------:|-----------:|--------:|----------:|
| PipeAsync | 7.419 us | 0.1483 us | 0.3347 us | 7.345 us | 1.1597 | 4.74 KB |
| PipeWith10Async | 239.257 us | 10.6802 us | 30.9852 us | 234.596 us | 19.5313 | 80.68 KB |
| PipeWithConditionAsync | 8.273 us | 0.1639 us | 0.2599 us | 8.146 us | 1.4038 | 5.76 KB |
| PipeWith10ConditionAsync | 20.606 us | 0.4113 us | 0.9774 us | 20.202 us | 3.6011 | 14.78 KB |
| PipeWith10ConditionGotoAsync | 33.396 us | 0.6631 us | 1.2455 us | 33.024 us | 5.1270 | 21.08 KB |
| PipeTaskAsync | 16.918 us | 0.5232 us | 1.5096 us | 16.795 us | 1.7090 | 7.07 KB |
| PipeWith10TaskAsync | 72.402 us | 3.5790 us | 9.8577 us | 68.424 us | 4.8828 | 20.47 KB |
| PipeTaskConditionAsync | 19.375 us | 0.3853 us | 0.9736 us | 19.425 us | 1.8616 | 7.66 KB |
| PipeWith10TaskConditionAsync | 63.898 us | 1.2774 us | 2.9858 us | 63.562 us | 4.8828 | 20.47 KB |

```
------------------------------------------------------------------------------------------------------
BenchmarkDotNet v0.13.10, Windows 10 (10.0.19044.3570/21H2/November2021Update)
Intel Core i7-8565U CPU 1.80GHz (Whiskey Lake), 1 CPU, 8 logical and 4 physical cores
.NET SDK 8.0.100-rc.2.23502.2
[Host] : .NET 7.0.13 (7.0.1323.51816), X64 RyuJIT AVX2
DefaultJob : .NET 7.0.13 (7.0.1323.51816), X64 RyuJIT AVX2
------------------------------------------------------------------------------------------------------
| Method | Mean | Error | StdDev | Median | Gen0 | Allocated |
|----------------------------- |----------:|----------:|----------:|----------:|--------:|----------:|
| PipeAsync | 3.990 us | 0.0460 us | 0.0384 us | 3.992 us | 0.8698 | 3.57 KB |
| PipeWith10Async | 97.574 us | 1.7283 us | 1.7748 us | 97.153 us | 15.0146 | 61.37 KB |
| PipeWithConditionAsync | 5.003 us | 0.0591 us | 0.0524 us | 5.003 us | 1.0834 | 4.45 KB |
| PipeWith10ConditionAsync | 13.157 us | 0.1262 us | 0.0985 us | 13.155 us | 3.1891 | 13.05 KB |
| PipeWith10ConditionGotoAsync | 18.253 us | 0.3007 us | 0.2347 us | 18.211 us | 3.9978 | 16.34 KB |
| PipeTaskAsync | 9.741 us | 0.1923 us | 0.3517 us | 9.649 us | 1.3275 | 5.45 KB |
| PipeWith10TaskAsync | 45.064 us | 0.7313 us | 0.8981 us | 44.984 us | 4.5166 | 18.58 KB |
| PipeTaskConditionAsync | 11.280 us | 0.1956 us | 0.1830 us | 11.312 us | 1.5564 | 6.39 KB |
| PipeWith10TaskConditionAsync | 48.034 us | 0.9578 us | 2.5895 us | 47.222 us | 4.5166 | 18.58 KB |
```

| Method | Mean | Error | StdDev | Median | Gen0 | Allocated |
|----------------------------- |-----------:|----------:|-----------:|-----------:|--------:|----------:|
| PipeAsync | 11.231 μs | 0.4573 μs | 1.3195 μs | 10.898 μs | 1.1597 | 4.79 KB |
| PipeWith10Async | 226.285 μs | 4.5187 μs | 10.8264 μs | 222.330 μs | 19.5313 | 80.73 KB |
| PipeWithConditionAsync | 9.902 μs | 0.1137 μs | 0.0950 μs | 9.858 μs | 1.4038 | 5.8 KB |
| PipeWith10ConditionAsync | 26.949 μs | 0.9860 μs | 2.6824 μs | 26.154 μs | 3.6011 | 14.83 KB |
| PipeWith10ConditionGotoAsync | 39.820 μs | 0.7613 μs | 1.2075 μs | 39.498 μs | 5.1270 | 21.13 KB |
| PipeTaskAsync | 20.286 μs | 0.6041 μs | 1.7334 μs | 19.744 μs | 1.7395 | 7.12 KB |
| PipeWith10TaskAsync | 101.252 μs | 5.3239 μs | 15.4455 μs | 97.842 μs | 4.8828 | 20.53 KB |
| PipeTaskConditionAsync | 24.214 μs | 1.3098 μs | 3.7998 μs | 22.740 μs | 1.8616 | 7.7 KB |
| PipeWith10TaskConditionAsync | 98.953 μs | 3.9903 μs | 11.0570 μs | 95.221 μs | 4.8828 | 20.56 KB |

```
BenchmarkDotNet v0.13.10, Windows 10 (10.0.19044.3570/21H2/November2021Update)
Intel Core i7-8565U CPU 1.80GHz (Whiskey Lake), 1 CPU, 8 logical and 4 physical cores
.NET SDK 8.0.100-rc.2.23502.2
[Host] : .NET Core 3.1.32 (CoreCLR 4.700.22.55902, CoreFX 4.700.22.56512), X64 RyuJIT AVX2
DefaultJob : .NET Core 3.1.32 (CoreCLR 4.700.22.55902, CoreFX 4.700.22.56512), X64 RyuJIT AVX2
```

| Method | Mean | Error | StdDev | Gen0 | Allocated |
|----------------------------- |----------:|---------:|---------:|--------:|----------:|
| PipeAsync | 14.38 us | 0.225 us | 0.199 us | 1.1597 | 4.77 KB |
| PipeWith10Async | 328.18 us | 6.287 us | 5.880 us | 19.5313 | 81.7 KB |
| PipeWithConditionAsync | 17.11 us | 0.283 us | 0.237 us | 1.4038 | 5.85 KB |
| PipeWith10ConditionAsync | 36.82 us | 0.702 us | 0.657 us | 3.6621 | 15.44 KB |
| PipeWith10ConditionGotoAsync | 58.69 us | 1.103 us | 2.557 us | 5.3101 | 21.8 KB |
| PipeTaskAsync | 37.06 us | 0.720 us | 1.077 us | 1.7090 | 7.18 KB |
| PipeWith10TaskAsync | 222.57 us | 2.935 us | 2.745 us | 4.8828 | 20.52 KB |
| PipeTaskConditionAsync | 42.97 us | 0.906 us | 2.525 us | 1.8921 | 7.84 KB |
| PipeWith10TaskConditionAsync | 224.77 us | 2.119 us | 1.982 us | 4.8828 | 20.54 KB |

## Code of Conduct
[**Top**](#table-of-contents)

Expand Down
3 changes: 2 additions & 1 deletion Samples/PipeandFIlterBenchmarking/MemoryBenchmarkerDemo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ public async Task PipeTaskConditionAsync()
{
await PipeAndFilter.New<MyClass>()
.AddPipeTasks(ExecPipe)
.AddTaskCondition(ExecTask,CondTrue)
.AddTaskCondition(ExecTask)
.WithCondition(CondTrue)
.BuildAndCreate()
.Run();
}
Expand Down
23 changes: 13 additions & 10 deletions Samples/SampleConsole/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using PipeFilterCore;
using System.Diagnostics.Contracts;
using PipeFilterCore;

namespace PipeFilterCoreSamples
{
Expand All @@ -23,7 +24,9 @@ public static async Task<int> Main()
.WithCondition(CondTrue)
.MaxDegreeProcess(4)
.AddTask(Task50)
.AddTaskCondition(Task100, CondFalse)
.AddTaskCondition(Task100)
.WithCondition(CondTrue)
.WithCondition(CondFalse)
.AddTask(Task150)
.AddPipe(ExecPipe, "LastPipe")
.BuildAndCreate()
Expand All @@ -32,13 +35,13 @@ public static async Task<int> Main()
.Logger(null)
.Run();

Console.WriteLine($"Contract value : {contract.MyProperty} Total Elapsedtime: {pl.Elapsedtime}" );
Console.WriteLine($"Contract value : {contract.MyProperty} Total Elapsedtime: {pl.Elapsedtime}");
foreach (var item in pl.Status)
{
Console.WriteLine($"{item.Alias}:{item.Status.Value} Count: {item.Count} => {item.Status.Elapsedtime}");
foreach (var det in item.StatusDetails)
foreach (var det in item.StatusDetails)
{
Console.WriteLine($"\t{det.TypeExec}:{det.GotoAlias ?? det.Alias}:{det.Condition} => {det.Value}:{det.Elapsedtime} UTC:{det.DateRef.ToString("MM/dd/yyyy hh:mm:ss ffff")}");
Console.WriteLine($"\t{det.TypeExec}:{det.GotoAlias ?? det.Alias}:{det.Condition} => {det.Value}:{det.Elapsedtime} UTC:{det.DateRef:MM/dd/yyyy hh:mm:ss ffff}");
}
}

Expand All @@ -57,7 +60,7 @@ private static async Task Task50(EventPipe<MyClass> pipe, CancellationToken toke
try
{
await Task.Delay(50, token);
pipe.SaveValue(50);
pipe.SaveValueAtEnd("T50",50);
}
catch (TaskCanceledException)
{
Expand All @@ -74,7 +77,7 @@ private static async Task Task100(EventPipe<MyClass> pipe, CancellationToken tok
try
{
await Task.Delay(100, token);
pipe.SaveValue(100);
pipe.SaveValueAtEnd("T100",100);
}
catch (TaskCanceledException)
{
Expand All @@ -91,7 +94,7 @@ private static async Task Task150(EventPipe<MyClass> pipe, CancellationToken tok
try
{
await Task.Delay(150, token);
pipe.SaveValue(150);
pipe.SaveValueAtEnd("T150",150);
}
catch (TaskCanceledException)
{
Expand All @@ -101,7 +104,7 @@ private static async Task Task150(EventPipe<MyClass> pipe, CancellationToken tok

private static Task ExecPipe(EventPipe<MyClass> pipe, CancellationToken token)
{
pipe.SaveValue("Saved");
pipe.SaveValueAtEnd("ExecPipe", "Saved1");
return Task.CompletedTask;
}

Expand All @@ -112,7 +115,7 @@ private static Task AgregateTask(EventPipe<MyClass> pipe, CancellationToken toke

private static async Task ExecPipe100(EventPipe<MyClass> pipe, CancellationToken token)
{
pipe.SaveValue("Saved0");
pipe.SaveValueAtEnd("ExecPipe100", "Saved2");
try
{
await Task.Delay(100, token);
Expand Down
58 changes: 58 additions & 0 deletions Src/CommandsInterface/IPipeAndFilterTaskCondition.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// ********************************************************************************************
// MIT LICENCE
// The maintenance and evolution is maintained by the PipeAndFilter project under MIT license
// ********************************************************************************************

namespace PipeFilterCore
{
/// <summary>
/// Represents commands for conditions.
/// </summary>
/// <typeparam name="T">Type of contract.</typeparam>
public interface IPipeAndFilterTaskCondition<T>: IPipeAndFilterBuild<T> where T : class
{
/// <summary>
/// Add new pipe.
/// </summary>
/// <param name="command">The handler pipe to execute.</param>
/// <param name="alias">
/// The unique alias for pipe.
/// <br>If the alias is omitted, the alias will be the handler name followed by the reference quantity (if any).</br>
/// <br>Alias ​​is used to reference in another pipe.</br>
/// </param>
/// <returns><see cref="IPipeAndFilterAdd{T}"/></returns>
IPipeAndFilterAdd<T> AddPipe(Func<EventPipe<T>, CancellationToken, Task> command, string? alias = null);

/// <summary>
/// Add new pipe aggregate tasks.
/// </summary>
/// <param name="command">The handler pipe aggregate to execute.
/// <br>The handler command will run after all tasks are executed.</br>
/// </param>
/// <param name="alias">
/// The unique alias for pipe.
/// <br>If the alias is omitted, the alias will be the handler name followed by the reference quantity (if any).</br>
/// <br>Alias ​​is used to reference in another pipe.</br>
/// </param>
/// <returns><see cref="IPipeAndFilterTasks{T}"/></returns>
IPipeAndFilterTasks<T> AddPipeTasks(Func<EventPipe<T>, CancellationToken, Task> command, string? alias = null);


/// <summary>
/// Add new task (execution in parallel) through pipe.
/// </summary>
/// <param name="command">The handler task to execute.</param>
/// <param name="nametask">The name for task (optional).</param>
/// <returns><see cref="IPipeAndFilterTasks{T}"/></returns>
IPipeAndFilterTasks<T> AddTask(Func<EventPipe<T>, CancellationToken, Task> command, string? nametask = null);

/// <summary>
/// Add new condition for task.
/// </summary>
/// <param name="condition">The handle condition to execute.</param>
/// <param name="namecondition">The name for condition(optional).</param>
/// <returns><see cref="IPipeAndFilterTaskCondition{T}"/></returns>
IPipeAndFilterTaskCondition<T> WithCondition(Func<EventPipe<T>, CancellationToken, ValueTask<bool>> condition, string? namecondition = null);
}

}
8 changes: 3 additions & 5 deletions Src/CommandsInterface/IPipeAndFilterTasks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,12 @@ public interface IPipeAndFilterTasks<T>: IPipeAndFilterBuild<T> where T : class


/// <summary>
/// Add new task (execution in parallel) through pipe with a condition.
/// Add new task (execution in parallel) through pipe with conditions.
/// </summary>
/// <param name="command">The handler task to execute.</param>
/// <param name="condition">The handler task to condition.</param>
/// <param name="nametask">The name for task (optional).</param>
/// <param name="namecondition">The name for condition (optional).</param>
/// <returns><see cref="IPipeAndFilterTasks{T}"/></returns>
IPipeAndFilterTasks<T> AddTaskCondition(Func<EventPipe<T>, CancellationToken, Task> command, Func<EventPipe<T>, CancellationToken, ValueTask<bool>> condition, string? nametask = null, string? namecondition = null);
/// <returns><see cref="IPipeAndFilterTaskCondition{T}"/></returns>
IPipeAndFilterTaskCondition<T> AddTaskCondition(Func<EventPipe<T>, CancellationToken, Task> command, string? nametask = null);

/// <summary>
/// Add new condition.
Expand Down
10 changes: 3 additions & 7 deletions Src/CommandsInterface/internal/IPipeAndFilterOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,8 @@ namespace PipeFilterCore
internal interface IPipeAndFilterOptions<T> where T : class
{
string? ServiceId { get; }
IImmutableDictionary<string, string> AliasToId { get; }
IImmutableDictionary<string, string?> IdToAlias { get; }
IImmutableDictionary<string, int> MaxDegreeProcess { get; }
IImmutableList<PipeCommand<T>> Pipes { get; }
IImmutableDictionary<string, bool> AggregateTasks { get; }
IImmutableDictionary<string, IImmutableList<PipeCondition<T>>> PreConditions { get; }
IImmutableDictionary<string, IImmutableList<PipeTask<T>>> Tasks { get; }
ImmutableDictionary <string, string> AliasToId { get; }
ImmutableDictionary<string, string?> IdToAlias { get; }
ImmutableList<PipeCommand<T>> Pipes { get; }
}
}
Loading

0 comments on commit d3ecdc9

Please sign in to comment.