Skip to content

Commit

Permalink
Merge pull request #12 from FRACerqueira/v1.0.3
Browse files Browse the repository at this point in the history
V1.0.3
  • Loading branch information
FRACerqueira authored Nov 10, 2023
2 parents d3ecdc9 + d876a35 commit 922ccee
Show file tree
Hide file tree
Showing 40 changed files with 1,975 additions and 533 deletions.
4 changes: 2 additions & 2 deletions PipeAndFilter.sln
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Docs", "Docs", "{0EE631D9-C
LICENSE = LICENSE
docs\LICENSE.md = docs\LICENSE.md
SECURITY.md = SECURITY.md
whatsnewprev.md = whatsnewprev.md
docs\whatsnewprev.md = docs\whatsnewprev.md
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PipeAndFilter", "Src\PipeAndFilter.csproj", "{27DC4458-EAEB-4C93-ACAF-25F25C404DBC}"
Expand All @@ -21,7 +21,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PipeAndFilterSampleConsole"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PipeAndFilterSampleApi", "Samples\SampleApi\PipeAndFilterSampleApi.csproj", "{6696533B-7288-4B59-ADBC-919D875DA5F3}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PipeandFIlterBenchmarking", "Samples\PipeandFIlterBenchmarking\PipeandFIlterBenchmarking.csproj", "{1DA0127C-8DB8-4DBA-854F-668F0DBE8289}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PipeandFIlterBenchmarking", "Samples\PipeandFIlterBenchmarking\PipeandFIlterBenchmarking.csproj", "{1DA0127C-8DB8-4DBA-854F-668F0DBE8289}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down
38 changes: 18 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,35 +28,22 @@
- [API Reference](https://fracerqueira.github.io/PipeAndFilter/apis/apis.html)

## What's new in the latest version
### V1.0.2
### V1.0.3

[**Top**](#table-of-contents)

- Added ability to save/overwrite multiple result to use during the execution another pipe / aggregation pipe
- Removed propery 'SavedTasks' in EventPipe
- Removed propery 'SavedPipes' in EventPipe
- Removed Method 'SaveValue'
- Removed Method 'RemoveSavedValue'
- Added Method TrySavedValue
- Now TrySavedValue return true/false if exist id saved and value in out paramameter
- Added Method SaveValueAtEnd
- Now SaveValueAtEnd receives the unique id to be saved/overwrite and the value
- Added Method RemoveValueAtEnd
- Now RemoveValueAtEnd receives the unique id to be removed if any
- Added ability to multiple preconditions for Tasks
- Channged command AddTaskCondition
- Now the same parameters as AddTask
- Added command WithCondition for AddTaskCondition
- First Release G.A

## Features
[**Top**](#table-of-contents)

- Thread safety to obtain/change contract values ​​and/or generic purpose when running a Task (pararel execute)
- Add multiple pipe
- Add multiple agregate pipe (for run pararel tasks)
- Add multiple Aggregate pipe (for run pararel tasks)
- Set the maximum amount of parallel execution
- Add multiple preconditions to run a pipe or task
- Add multiple link to the pipe to jump to another pipe
- Perform an action with conditions after pipe/aggregatepipe
- Have detailed status (execution date, execution time, type of execution, result of each execution) and number of executions in each pipe
- Save multiple results from each pipe to be used during the another pipe/aggregate pipe run
- Save multiple results in each task to be effective during the aggregation pipe run
Expand Down Expand Up @@ -90,25 +77,36 @@ dotnet run --project [name of sample]

The **PipeAndFilter** use **fluent interface**; an object-oriented API whose design relies extensively on method chaining. Its goal is to increase code legibility. The term was coined in 2005 by Eric Evans and Martin Fowler.

### Sample-Console Usage
### Sample-Console Usage (Full features)

```csharp
await PipeAndFilter.New<MyClass>()
.AddPipe(Pipe1)
.WithGotoCondition(Cond0, "LastPipe")
.WithCondition(Cond1)
.WithCondition(Cond2)
.AfterRunningPipe(ExecPipeAfter)
.WithCondition(CondA1)
.WithGotoCondition(CondA2, "LastPipe")
.AddPipe(Pipe2)
.AfterRunningPipe()
.WithGotoCondition(CondA3, "LastPipe")
.AddPipe(Pipe3)
.AddPipeTasks(Pipe4)
.AfterRunningAggregatePipe(ExecPipeAfterTask)
.MaxDegreeProcess(8)
.AddTaskCondition(Task50)
.WithCondition(CondTrue)
.AddTask(Task100)
.AddPipe(Pipe4)
.AddAggregatePipe(Pipe5)
.WithCondition(Cond1)
.MaxDegreeProcess(4)
.AddTask(Task50)
.AddTaskCondition(Task100)
.WithCondition(Cond3)
.WithCondition(Cond4)
.AddTask(Task150)
.AddPipe(Pipe5, "LastPipe")
.AddPipe(Pipe6, "LastPipe")
.BuildAndCreate()
.Init(contract)
.CorrelationId(null)
Expand Down
8 changes: 4 additions & 4 deletions Samples/PipeandFIlterBenchmarking/MemoryBenchmarkerDemo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ await aux.BuildAndCreate()
public async Task PipeTaskAsync()
{
await PipeAndFilter.New<MyClass>()
.AddPipeTasks(ExecPipe)
.AddAggregatePipe(ExecPipe)
.AddTask(ExecTask)
.BuildAndCreate()
.Run();
Expand All @@ -86,7 +86,7 @@ await PipeAndFilter.New<MyClass>()
public async Task PipeWith10TaskAsync()
{
var aux = PipeAndFilter.New<MyClass>()
.AddPipeTasks(ExecPipe)
.AddAggregatePipe(ExecPipe)
.MaxDegreeProcess(4);
for (int i = 0; i < 10; i++)
{
Expand All @@ -101,7 +101,7 @@ await aux.BuildAndCreate()
public async Task PipeTaskConditionAsync()
{
await PipeAndFilter.New<MyClass>()
.AddPipeTasks(ExecPipe)
.AddAggregatePipe(ExecPipe)
.AddTaskCondition(ExecTask)
.WithCondition(CondTrue)
.BuildAndCreate()
Expand All @@ -112,7 +112,7 @@ await PipeAndFilter.New<MyClass>()
public async Task PipeWith10TaskConditionAsync()
{
var aux = PipeAndFilter.New<MyClass>()
.AddPipeTasks(ExecPipe)
.AddAggregatePipe(ExecPipe)
.MaxDegreeProcess(4);
for (int i = 0; i < 10; i++)
{
Expand Down
45 changes: 38 additions & 7 deletions Samples/SampleConsole/Program.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Diagnostics.Contracts;
using PipeFilterCore;
using PipeFilterCore;

namespace PipeFilterCoreSamples
{
Expand All @@ -18,17 +17,39 @@ public static async Task<int> Main()
.WithGotoCondition(CondFalse, "LastPipe")
.WithCondition(CondTrue)
.WithCondition(CondTrue)
.AddPipe(ExecPipe)
.AfterRunningPipe(ExecPipeAfter)
.WithCondition(CondTrue)
.WithGotoCondition(CondFalse, "LastPipe")
.AddPipe(ExecPipe100)
.AddPipeTasks(AgregateTask)
.AfterRunningAggregatePipe(ExecPipeAfterTask)
.MaxDegreeProcess(8)
.AddTaskCondition(Task50)
.WithCondition(CondTrue)
.AddTask(Task100)
.AddAggregatePipe(AggregateTask)
.WithCondition(CondTrue)
.WithGotoCondition(CondFalse, "LastPipe")
.MaxDegreeProcess(4)
.AddTask(Task50)
.AddTaskCondition(Task100)
.WithCondition(CondTrue)
.WithCondition(CondFalse)
.AddTask(Task150)
.AddTask(Task150)
.AddAggregatePipe(AggregateTask)
.WithCondition(CondTrue)
.AddTaskCondition(Task100)
.WithCondition(CondTrue)
.AfterRunningAggregatePipe(ExecPipeAfterTask)
.MaxDegreeProcess(8)
.AddTaskCondition(Task50)
.WithCondition(CondTrue)
.AddTask(Task100)
.AddAggregatePipe(AggregateTask)
.WithCondition(CondTrue)
.AddTaskCondition(Task100)
.WithCondition(CondTrue)
.AddPipe(ExecPipe, "LastPipe")
.AfterRunningPipe(ExecPipeAfterTask)
.BuildAndCreate()
.Init(contract)
.CorrelationId(null)
Expand All @@ -41,7 +62,7 @@ public static async Task<int> Main()
Console.WriteLine($"{item.Alias}:{item.Status.Value} Count: {item.Count} => {item.Status.Elapsedtime}");
foreach (var det in item.StatusDetails)
{
Console.WriteLine($"\t{det.TypeExec}:{det.GotoAlias ?? det.Alias}:{det.Condition} => {det.Value}:{det.Elapsedtime} UTC:{det.DateRef:MM/dd/yyyy hh:mm:ss ffff}");
Console.WriteLine($"\t{det.TypeExec}:{det.GotoAlias ?? string.Empty}:{det.Alias ?? string.Empty}:{det.Condition}:{det.ToAliasCondition ?? string.Empty} => {det.Value}:{det.Elapsedtime} UTC:{det.DateRef:MM/dd/yyyy hh:mm:ss ffff}");
}
}

Expand Down Expand Up @@ -108,7 +129,17 @@ private static Task ExecPipe(EventPipe<MyClass> pipe, CancellationToken token)
return Task.CompletedTask;
}

private static Task AgregateTask(EventPipe<MyClass> pipe, CancellationToken token)
private static Task ExecPipeAfter(EventPipe<MyClass> pipe, CancellationToken token)
{
return Task.CompletedTask;
}

private static Task ExecPipeAfterTask(EventPipe<MyClass> pipe, CancellationToken token)
{
return Task.CompletedTask;
}

private static Task AggregateTask(EventPipe<MyClass> pipe, CancellationToken token)
{
return Task.CompletedTask;
}
Expand Down
85 changes: 85 additions & 0 deletions Src/CommandsInterface/IPipeAndFilterAfterAggregate.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
namespace PipeFilterCore
{
/// <summary>
/// Represents the commands after running Aggregate pipe.
/// </summary>
/// <typeparam name="T">Type of contract.</typeparam>
public interface IPipeAndFilterAfterAggregate<T> : IPipeAndFilterBuild<T> where T : class
{
/// <summary>
/// Add new pipe.
/// </summary>
/// <param name="command">The handler to execute.</param>
/// <param name="alias">
/// The unique alias for pipe.
/// <br>If the alias is omitted, the alias will be the handler name followed by the reference quantity (if any).</br>
/// <br>Alias ​​is used to reference in another pipe.</br>
/// </param>
/// <returns><see cref="IPipeAndFilterPipe{T}"/></returns>
IPipeAndFilterPipe<T> AddPipe(Func<EventPipe<T>, CancellationToken, Task>?command = null, string? alias = null);

/// <summary>
/// Add new aggregate pipe.
/// </summary>
/// <param name="command">The handler to execute.
/// <br>The handler command will run after all tasks are executed.</br>
/// </param>
/// <param name="alias">
/// The unique alias for pipe.
/// <br>If the alias is omitted, the alias will be the handler name followed by the reference quantity (if any).</br>
/// <br>Alias ​​is used to reference in another pipe.</br>
/// </param>
/// <returns><see cref="IPipeAndFilterAggregate{T}"/></returns>
IPipeAndFilterAggregate<T> AddAggregatePipe(Func<EventPipe<T>, CancellationToken, Task>? command = null, string? alias = null);

/// <summary>
/// Add new task (execution in parallel) through pipe.
/// </summary>
/// <param name="command">The handler to execute.</param>
/// <param name="nametask">The name for task (optional).</param>
/// <returns><see cref="IPipeAndFilterAfterAggregate{T}"/></returns>
IPipeAndFilterAfterAggregate<T> AddTask(Func<EventPipe<T>, CancellationToken, Task> command, string? nametask = null);

/// <summary>
/// Maximum number of concurrent tasks enable.
/// </summary>
/// <param name="value">
/// Number of concurrent tasks.
/// <br>The default value is number of processors.</br>
/// </param>
/// <returns><see cref="IPipeAndFilterAfterAggregate{T}"/></returns>
IPipeAndFilterAfterAggregate<T> MaxDegreeProcess(int value);


/// <summary>
/// Add new task (execution in parallel) through pipe with conditions.
/// </summary>
/// <param name="command">The handler to execute.</param>
/// <param name="nametask">The name for task (optional).</param>
/// <returns><see cref="IPipeAndFilterAfterAggregateCondition{T}"/></returns>
IPipeAndFilterAfterAggregateCondition<T> AddTaskCondition(Func<EventPipe<T>, CancellationToken, Task> command, string? nametask = null);

/// <summary>
/// Add new condition.
/// </summary>
/// <param name="condition">The handle to execute.</param>
/// <param name="namecondition">The name for condition(optional).</param>
/// <returns><see cref="IPipeAndFilterAfterAggregate{T}"/></returns>
IPipeAndFilterAfterAggregate<T> WithCondition(Func<EventPipe<T>, CancellationToken, ValueTask<bool>> condition, string? namecondition = null);

/// <summary>
/// Add new go to condition.
/// <br>If the condition is true, jump to the given pipe without executing the current pipe.</br>
/// <br>If the false condition continues.</br>
/// </summary>
/// <param name="condition">The handle to execute.</param>
/// <param name="aliasgoto">
/// The alias to another pipe.
/// </param>
/// <param name="namecondition">The name for condition(optional).</param>
/// <returns><see cref="IPipeAndFilterAfterAggregate{T}"/></returns>
IPipeAndFilterAfterAggregate<T> WithGotoCondition(Func<EventPipe<T>, CancellationToken, ValueTask<bool>> condition, string aliasgoto, string? namecondition = null);


}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,53 +6,53 @@
namespace PipeFilterCore
{
/// <summary>
/// Represents commands for conditions.
/// Represents command conditions for after pipe Aggregate.
/// </summary>
/// <typeparam name="T">Type of contract.</typeparam>
public interface IPipeAndFilterTaskCondition<T>: IPipeAndFilterBuild<T> where T : class
public interface IPipeAndFilterAfterAggregateCondition<T>: IPipeAndFilterBuild<T> where T : class
{
/// <summary>
/// Add new pipe.
/// </summary>
/// <param name="command">The handler pipe to execute.</param>
/// <param name="command">The handler to execute.</param>
/// <param name="alias">
/// The unique alias for pipe.
/// <br>If the alias is omitted, the alias will be the handler name followed by the reference quantity (if any).</br>
/// <br>Alias ​​is used to reference in another pipe.</br>
/// </param>
/// <returns><see cref="IPipeAndFilterAdd{T}"/></returns>
IPipeAndFilterAdd<T> AddPipe(Func<EventPipe<T>, CancellationToken, Task> command, string? alias = null);
/// <returns><see cref="IPipeAndFilterPipe{T}"/></returns>
IPipeAndFilterPipe<T> AddPipe(Func<EventPipe<T>, CancellationToken, Task>? command = null, string? alias = null);

/// <summary>
/// Add new pipe aggregate tasks.
/// Add new aggregate pipe.
/// </summary>
/// <param name="command">The handler pipe aggregate to execute.
/// <param name="command">The handler to execute.
/// <br>The handler command will run after all tasks are executed.</br>
/// </param>
/// <param name="alias">
/// The unique alias for pipe.
/// <br>If the alias is omitted, the alias will be the handler name followed by the reference quantity (if any).</br>
/// <br>Alias ​​is used to reference in another pipe.</br>
/// </param>
/// <returns><see cref="IPipeAndFilterTasks{T}"/></returns>
IPipeAndFilterTasks<T> AddPipeTasks(Func<EventPipe<T>, CancellationToken, Task> command, string? alias = null);
/// <returns><see cref="IPipeAndFilterAggregate{T}"/></returns>
IPipeAndFilterAggregate<T> AddAggregatePipe(Func<EventPipe<T>, CancellationToken, Task>? command = null, string? alias = null);


/// <summary>
/// Add new task (execution in parallel) through pipe.
/// Add new task (execution in parallel) for after pipe Aggregate.
/// </summary>
/// <param name="command">The handler task to execute.</param>
/// <param name="command">The handler to execute.</param>
/// <param name="nametask">The name for task (optional).</param>
/// <returns><see cref="IPipeAndFilterTasks{T}"/></returns>
IPipeAndFilterTasks<T> AddTask(Func<EventPipe<T>, CancellationToken, Task> command, string? nametask = null);
/// <returns><see cref="IPipeAndFilterAfterAggregate{T}"/></returns>
IPipeAndFilterAfterAggregate<T> AddTask(Func<EventPipe<T>, CancellationToken, Task> command, string? nametask = null);

/// <summary>
/// Add new condition for task.
/// </summary>
/// <param name="condition">The handle condition to execute.</param>
/// <param name="condition">The handleto execute.</param>
/// <param name="namecondition">The name for condition(optional).</param>
/// <returns><see cref="IPipeAndFilterTaskCondition{T}"/></returns>
IPipeAndFilterTaskCondition<T> WithCondition(Func<EventPipe<T>, CancellationToken, ValueTask<bool>> condition, string? namecondition = null);
/// <returns><see cref="IPipeAndFilterAfterAggregateCondition{T}"/></returns>
IPipeAndFilterAfterAggregateCondition<T> WithCondition(Func<EventPipe<T>, CancellationToken, ValueTask<bool>> condition, string? namecondition = null);
}

}
Loading

0 comments on commit 922ccee

Please sign in to comment.