diff --git a/src/Demos/GreenFeetWorkFlow.Tests/AdoSingletonStepTests.cs b/src/Demos/GreenFeetWorkFlow.Tests/AdoSingletonStepTests.cs index bbceb84..7e0d462 100644 --- a/src/Demos/GreenFeetWorkFlow.Tests/AdoSingletonStepTests.cs +++ b/src/Demos/GreenFeetWorkFlow.Tests/AdoSingletonStepTests.cs @@ -42,10 +42,10 @@ public void When_creating_two_identical_singleton_Then_fail() var step = new Step(helper.RndName) { Singleton = true, }; var step2 = new Step(helper.RndName) { Singleton = true, }; - Action act = () => engine.Data.AddSteps(new[] { step, step2 }); + Func act = () => engine.Data.AddStepsAsync(new[] { step, step2 }); act.Should() - .Throw() + .ThrowAsync() .WithMessage("Cannot insert duplicate key row*"); } } diff --git a/src/Demos/GreenFeetWorkFlow.Tests/AdoTests.cs b/src/Demos/GreenFeetWorkFlow.Tests/AdoTests.cs index e57d6c7..27956b9 100644 --- a/src/Demos/GreenFeetWorkFlow.Tests/AdoTests.cs +++ b/src/Demos/GreenFeetWorkFlow.Tests/AdoTests.cs @@ -40,7 +40,7 @@ public void When_executing_OneStep_with_state_Then_succeed() } [Test] - public void When_adding_two_steps_in_the_same_transaction_Then_succeed() + public async Task When_adding_two_steps_in_the_same_transaction_Then_succeed() { string[] stepResults = new string[2]; const string name = "v1/When_adding_two_steps_in_the_same_transaction_Then_succeed"; @@ -53,11 +53,11 @@ public void When_adding_two_steps_in_the_same_transaction_Then_succeed() return ExecutionResult.Done(); }))); - using var connection = new SqlConnection(helper.ConnectionString); + await using var connection = new SqlConnection(helper.ConnectionString); connection.Open(); - using var tx = connection.BeginTransaction(System.Data.IsolationLevel.ReadCommitted); - engine.Data.AddStep(new Step(name, 0), tx); - engine.Data.AddStep(new Step(name, 1), tx); + await using var tx = connection.BeginTransaction(System.Data.IsolationLevel.ReadCommitted); + await engine.Data.AddStepAsync(new Step(name, 0), tx); + await engine.Data.AddStepAsync(new Step(name, 1), tx); tx.Commit(); engine.Start(cfg); @@ -102,10 +102,10 @@ public async Task When_executing_step_throwing_special_FailCurrentStepException_ name, GenericImplementation.Create(step => throw step.FailAsException(newSteps: new Step(nameNewStep)))) ); - engine.Data.AddStep(new Step(name) { FlowId = helper.FlowId }); + await engine.Data.AddStepAsync(new Step(name) { FlowId = helper.FlowId }); await engine.StartAsSingleWorker(cfg); - var steps = engine.Data.SearchSteps(new SearchModel(FlowId: helper.FlowId), FetchLevels.ALL); + var steps = await engine.Data.SearchStepsAsync(new SearchModel(FlowId: helper.FlowId), FetchLevels.ALL); steps.Is(@" [ { @@ -360,7 +360,7 @@ public void When_step_is_in_the_future_Then_it_wont_execute() FlowId = helper.FlowId, ScheduleTime = DateTime.Now.AddYears(35) }; - var id = engine.Data.AddStep(futureStep, null); + var id = engine.Data.AddStepAsync(futureStep, null); engine.Start(cfg); helper.AssertTableCounts(helper.FlowId, ready: 1, done: 0, failed: 0); @@ -368,7 +368,7 @@ public void When_step_is_in_the_future_Then_it_wont_execute() } [Test] - public void When_step_is_in_the_future_Then_it_can_be_activated_to_execute_now() + public async Task When_step_is_in_the_future_Then_it_can_be_activated_to_execute_now() { string? stepResult = null; @@ -381,8 +381,8 @@ public void When_step_is_in_the_future_Then_it_can_be_activated_to_execute_now() FlowId = helper.FlowId, ScheduleTime = DateTime.Now.AddYears(35) }; - var id = engine.Data.AddStep(futureStep, null); - var count = engine.Data.ActivateStep(id, null); + var id = await engine.Data.AddStepAsync(futureStep, null); + var count = await engine.Data.ActivateStepAsync(id, null); count.Should().Be(1); engine.Start(cfg); @@ -393,7 +393,7 @@ public void When_step_is_in_the_future_Then_it_can_be_activated_to_execute_now() } [Test] - public void When_step_is_in_the_future_Then_it_can_be_activated_to_execute_now_with_args() + public async Task When_step_is_in_the_future_Then_it_can_be_activated_to_execute_now_with_args() { string? stepResult = null; string args = "1234"; @@ -406,8 +406,8 @@ public void When_step_is_in_the_future_Then_it_can_be_activated_to_execute_now_w FlowId = helper.FlowId, ScheduleTime = DateTime.Now.AddYears(35) }; - var id = engine.Data.AddStep(futureStep, null); - var count = engine.Data.ActivateStep(id, args); + var id = await engine.Data.AddStepAsync(futureStep, null); + var count = await engine.Data.ActivateStepAsync(id, args); count.Should().Be(1); engine.Start(cfg); @@ -460,7 +460,7 @@ public void When_a_step_creates_two_steps_Then_those_steps_can_be_synchronized_a (int count, Guid id, DateTime maxWait) = helper.Formatter!.Deserialize<(int, Guid, DateTime)>(step.State); var sales = GroceryBuyer.SalesDb.Where(x => x.id == id).ToArray(); if (sales.Length != 2 && DateTime.Now <= maxWait) - return ExecutionResult.Rerun(scheduleTime: DateTime.Now.AddSeconds(0.1)); + return ExecutionResult.Rerun(scheduleTime: DateTime.Now.AddSeconds(0.2)); stepResult = $"total: {sales.Sum(x => x.total)}"; helper.cts.Cancel(); diff --git a/src/Demos/GreenFeetWorkFlow.Tests/EngineTests.cs b/src/Demos/GreenFeetWorkFlow.Tests/EngineTests.cs index 2af737b..ff2bfa2 100644 --- a/src/Demos/GreenFeetWorkFlow.Tests/EngineTests.cs +++ b/src/Demos/GreenFeetWorkFlow.Tests/EngineTests.cs @@ -11,13 +11,13 @@ public void Setup() } [Test] - public void When_adding_an_event_Then_an_id_PK_is_returned() + public async Task When_adding_an_event_Then_an_id_PK_is_returned() { helper.CreateEngine(); Step step = new Step(helper.RndName) { ScheduleTime = DateTime.Now.AddMonths(1) }; - var id = helper.Engine!.Data.AddStep(step); + var id = await helper.Engine!.Data.AddStepAsync(step); - var result = helper.Engine.Data.SearchSteps(new(Id: id), StepStatus.Ready); + var result = await helper.Engine.Data.SearchStepsAsync(new(Id: id), StepStatus.Ready); result.Single().Name.Should().Be(helper.RndName); } diff --git a/src/Demos/GreenFeetWorkFlow.Tests/PerformanceTests.cs b/src/Demos/GreenFeetWorkFlow.Tests/PerformanceTests.cs index 58b50a4..a6be1b9 100644 --- a/src/Demos/GreenFeetWorkFlow.Tests/PerformanceTests.cs +++ b/src/Demos/GreenFeetWorkFlow.Tests/PerformanceTests.cs @@ -7,7 +7,7 @@ namespace GreenFeetWorkflow.Tests; public class PerformanceTests { [Test] - public void Inserting_10000_steps_timing() + public async Task Inserting_10000_steps_timing() { var helper = new TestHelper(); @@ -16,7 +16,7 @@ public void Inserting_10000_steps_timing() var name = "inserttest"; var steps = Enumerable.Range(0, 10000).Select(x => new Step(name)).ToArray(); - engine.Data.AddSteps(steps); + await engine.Data.AddStepsAsync(steps); watch.Stop(); watch.Elapsed.Should().BeLessThan(TimeSpan.FromSeconds(3)); diff --git a/src/Demos/GreenFeetWorkFlow.Tests/RuntimeDataTests.cs b/src/Demos/GreenFeetWorkFlow.Tests/RuntimeDataTests.cs index 99c2834..6983280 100644 --- a/src/Demos/GreenFeetWorkFlow.Tests/RuntimeDataTests.cs +++ b/src/Demos/GreenFeetWorkFlow.Tests/RuntimeDataTests.cs @@ -14,22 +14,22 @@ public class RuntimeDataTests NumberOfWorkers: 1); [SetUp] - public void Setup() + public async Task Setup() { helper = new TestHelper(); } [Test] - public void When_searching_with_no_parameters_Then_success() + public async Task When_searching_with_no_parameters_Then_success() { var engine = helper.CreateEngine(); - var steps = engine.Data.SearchSteps(new SearchModel(), FetchLevels.ALL); + var steps = await engine.Data.SearchStepsAsync(new SearchModel(), FetchLevels.ALL); steps.Keys.Count.Should().Be(3); } [Test] - public void When_SearchSteps_Then_success() + public async Task When_SearchSteps_Then_success() { var engine = helper.CreateEngine(); var step = new Step(helper.RndName) @@ -39,47 +39,47 @@ public void When_SearchSteps_Then_success() SearchKey = Guid.NewGuid().ToString(), Description = Guid.NewGuid().ToString(), }; - var id = engine.Data.AddStep(step, null); + var id = await engine.Data.AddStepAsync(step, null); FetchLevels fetchLevels = FetchLevels.ALL; - var steps = engine.Data.SearchSteps(new SearchModel() + var steps = await engine.Data.SearchStepsAsync(new SearchModel() { Id = id, }, fetchLevels); steps[StepStatus.Ready].Single().Id.Should().Be(id); - steps = engine.Data.SearchSteps(new SearchModel() + steps = await engine.Data.SearchStepsAsync(new SearchModel() { CorrelationId = step.CorrelationId, }, fetchLevels); steps[StepStatus.Ready].Single().Id.Should().Be(id); - steps = engine.Data.SearchSteps(new SearchModel() + steps = await engine.Data.SearchStepsAsync(new SearchModel() { Name = step.Name, }, fetchLevels); steps[StepStatus.Ready].Single().Id.Should().Be(id); - steps = engine.Data.SearchSteps(new SearchModel() + steps = await engine.Data.SearchStepsAsync(new SearchModel() { FlowId = step.FlowId, }, fetchLevels); steps[StepStatus.Ready].Single().Id.Should().Be(id); - steps = engine.Data.SearchSteps(new SearchModel() + steps = await engine.Data.SearchStepsAsync(new SearchModel() { SearchKey = step.SearchKey, }, fetchLevels); steps[StepStatus.Ready].Single().Id.Should().Be(id); - steps = engine.Data.SearchSteps(new SearchModel() + steps = await engine.Data.SearchStepsAsync(new SearchModel() { Description = step.Description, }, fetchLevels); steps[StepStatus.Ready].Single().Id.Should().Be(id); // combined search - steps = engine.Data.SearchSteps(new SearchModel() + steps = await engine.Data.SearchStepsAsync(new SearchModel() { Id = id, CorrelationId = step.CorrelationId, @@ -99,11 +99,11 @@ public async Task When_reexecuting_a_step_Then_execute_it() var stepState = 12345; var step = new Step("v1/fail-and-reactivate", stepState) { FlowId = helper.FlowId, CorrelationId = helper.CorrelationId }; - var id = engine.Data.AddStep(step); + var id = await engine.Data.AddStepAsync(step); await engine.StartAsSingleWorker(cfg); - var newId = engine.Data - .ReExecuteSteps(new SearchModel(Id: id)) + var newId = (await engine.Data + .ReExecuteStepsAsync(new SearchModel(Id: id))) .Single(); var persister = helper.Persister; diff --git a/src/Demos/GreenFeetWorkFlow.Tests/TestHelper.cs b/src/Demos/GreenFeetWorkFlow.Tests/TestHelper.cs index 56c8102..ff72e1a 100644 --- a/src/Demos/GreenFeetWorkFlow.Tests/TestHelper.cs +++ b/src/Demos/GreenFeetWorkFlow.Tests/TestHelper.cs @@ -58,7 +58,7 @@ public WorkflowEngine CreateEngine(params (string, IStepImplementation)[] stepHa return Engine; } - public void CreateAndRunEngineForPerformance(Step[] steps, int workerCount, params (string, IStepImplementation)[] stepHandlers) + public async Task CreateAndRunEngineForPerformance(Step[] steps, int workerCount, params (string, IStepImplementation)[] stepHandlers) { logger = new DiagnosticsStepLogger(); logger.Configuration.TraceLoggingEnabledUntil = DateTime.MinValue; @@ -74,7 +74,7 @@ public void CreateAndRunEngineForPerformance(Step[] steps, int workerCount, para iocContainer = new AutofacAdaptor(builder.Build()); Engine = new WorkflowEngine(logger, iocContainer, Formatter); - Engine.Data.AddSteps(steps); + await Engine.Data.AddStepsAsync(steps); var workflowConfiguration = new WorkflowConfiguration(new WorkerConfig() { @@ -84,7 +84,7 @@ public void CreateAndRunEngineForPerformance(Step[] steps, int workerCount, para Engine.Start(workflowConfiguration, stoppingToken: cts.Token); } - public void CreateAndRunEngine(Step[] steps, int workerCount, params (string, IStepImplementation)[] stepHandlers) + public async Task CreateAndRunEngine(Step[] steps, int workerCount, params (string, IStepImplementation)[] stepHandlers) { logger = new DiagnosticsStepLogger(); @@ -96,7 +96,7 @@ public void CreateAndRunEngine(Step[] steps, int workerCount, params (string, IS iocContainer = new AutofacAdaptor(builder.Build()); Engine = new WorkflowEngine(logger, iocContainer, Formatter); - Engine.Data.AddSteps(steps); + await Engine.Data.AddStepsAsync(steps); var workflowConfiguration = new WorkflowConfiguration(new WorkerConfig() { @@ -109,7 +109,7 @@ public void CreateAndRunEngine(Step[] steps, int workerCount, params (string, IS Engine.Start(workflowConfiguration, stoppingToken: cts.Token); } - public void CreateAndRunEngineWithAttributes(Step[] steps, int workerCount) + public async Task CreateAndRunEngineWithAttributes(Step[] steps, int workerCount) { logger = new DiagnosticsStepLogger(); @@ -121,7 +121,7 @@ public void CreateAndRunEngineWithAttributes(Step[] steps, int workerCount) iocContainer = new AutofacAdaptor(builder.Build()); Engine = new WorkflowEngine(logger, iocContainer, Formatter); - Engine.Data.AddSteps(steps); + await Engine.Data.AddStepsAsync(steps); var workflowConfiguration = new WorkflowConfiguration(new WorkerConfig() { StopWhenNoWork = workerCount == 1 }, diff --git a/src/Demos/GreenFeetWorkFlow.WebApiDemo/WorkflowStarter.cs b/src/Demos/GreenFeetWorkFlow.WebApiDemo/WorkflowStarter.cs index 12f579e..c2070fc 100644 --- a/src/Demos/GreenFeetWorkFlow.WebApiDemo/WorkflowStarter.cs +++ b/src/Demos/GreenFeetWorkFlow.WebApiDemo/WorkflowStarter.cs @@ -12,7 +12,7 @@ public WorkflowStarter(WorkflowEngine engine) protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - engine.Data.AddStep(new Step(StepFetchWeatherForecast.Name) { Singleton = true }); + await engine.Data.AddStepAsync(new Step(StepFetchWeatherForecast.Name) { Singleton = true }); await engine.StartAsync(new WorkflowConfiguration(new WorkerConfig(), NumberOfWorkers: 1), stoppingToken: stoppingToken); } diff --git a/src/Product/GreenFeetWorkFlow/WorkflowRuntimeData.cs b/src/Product/GreenFeetWorkFlow/WorkflowRuntimeData.cs index 67f803b..0b8c00e 100644 --- a/src/Product/GreenFeetWorkFlow/WorkflowRuntimeData.cs +++ b/src/Product/GreenFeetWorkFlow/WorkflowRuntimeData.cs @@ -16,7 +16,7 @@ public WorkflowRuntimeData(IWorkflowIocContainer iocContainer, IWorkflowStepStat } /// Reschedule a ready step to 'now' and send it activation data - public int ActivateStep(int id, object? activationArguments, object? transaction = null) + public async Task ActivateStepAsync(int id, object? activationArguments, object? transaction = null) { var persister = iocContainer.GetInstance(); @@ -31,16 +31,16 @@ public int ActivateStep(int id, object? activationArguments, object? transaction return persister.Update(StepStatus.Ready, step); }, transaction); - return rows; + return await Task.FromResult(rows); } /// Add step to be executed. May throw exception if persistence layer fails. For example when inserting multiple singleton elements /// the identity of the step - public int AddStep(Step step, object? transaction = null) => AddSteps(new[] { step }, transaction).Single(); + public async Task AddStepAsync(Step step, object? transaction = null) => (await AddStepsAsync(new[] { step }, transaction)).Single(); /// Add steps to be executed. May throw exception if persistence layer fails. For example when inserting multiple singleton elements /// the identity of the steps - public int[] AddSteps(Step[] steps, object? transaction = null) + public async Task AddStepsAsync(Step[] steps, object? transaction = null) { var now = DateTime.Now; @@ -50,7 +50,8 @@ public int[] AddSteps(Step[] steps, object? transaction = null) } IStepPersister persister = iocContainer.GetInstance(); - return persister.InTransaction(() => persister.Insert(StepStatus.Ready, steps), transaction); + var result = persister.InTransaction(() => persister.Insert(StepStatus.Ready, steps), transaction); + return await Task.FromResult(result); } internal void FixupNewStep(Step? originStep, Step step, DateTime now) @@ -71,23 +72,23 @@ internal void FixupNewStep(Step? originStep, Step step, DateTime now) FormatStateForSerialization(step); } - public List SearchSteps(SearchModel criteria, StepStatus target, object? transaction = null) + public async Task> SearchStepsAsync(SearchModel criteria, StepStatus target, object? transaction = null) { IStepPersister persister = iocContainer.GetInstance(); var result = persister.InTransaction(() => persister.SearchSteps(criteria, target), transaction); - return result; + return await Task.FromResult(result); } - public Dictionary> SearchSteps(SearchModel criteria, FetchLevels fetchLevels, object? transaction = null) + public async Task< Dictionary>> SearchStepsAsync(SearchModel criteria, FetchLevels fetchLevels, object? transaction = null) { IStepPersister persister = iocContainer.GetInstance(); var result = persister.InTransaction(() => persister.SearchSteps(criteria, fetchLevels), transaction); - return result; + return await Task.FromResult(result); } /// Re-execute steps that are 'done' or 'failed' by inserting a clone into the 'ready' queue /// Ids of inserted steps - public int[] ReExecuteSteps(SearchModel criteria, object? transaction = null) + public async Task ReExecuteStepsAsync(SearchModel criteria, object? transaction = null) { IStepPersister persister = iocContainer.GetInstance(); @@ -123,7 +124,7 @@ public int[] ReExecuteSteps(SearchModel criteria, object? transaction = null) }, transaction); - return ids; + return await Task.FromResult(ids); } /// @@ -132,7 +133,7 @@ public int[] ReExecuteSteps(SearchModel criteria, object? transaction = null) /// /// /// The ids of the failed steps - public int[] FailSteps(SearchModel criteria, object? transaction = null) + public async Task FailStepsAsync(SearchModel criteria, object? transaction = null) { IStepPersister persister = iocContainer.GetInstance(); @@ -144,13 +145,12 @@ public int[] FailSteps(SearchModel criteria, object? transaction = null) { persister.Delete(StepStatus.Ready, step.Id); persister.Insert(StepStatus.Failed, step); - } return steps.Select(x=>x.Id).ToArray(); }, transaction); - return ids; + return await Task.FromResult(ids); } /// we round down to ensure a worker can pick up the step/rerun-step. if in unittest mode it may exit if not rounded.