From 074f4107d86dacee87afc6839cbecff0a60e750f Mon Sep 17 00:00:00 2001 From: Emil Koutanov Date: Wed, 31 Jan 2024 09:27:14 +1100 Subject: [PATCH] Simplified backpressure example --- .gitignore | 3 ++- Examples/Backpressure/Backpressure.cs | 16 +++++----------- 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/.gitignore b/.gitignore index 327baed..d04193e 100644 --- a/.gitignore +++ b/.gitignore @@ -263,4 +263,5 @@ FakesAssemblies/ *.opt .idea -BenchmarkDotNet.Artifacts \ No newline at end of file +BenchmarkDotNet.Artifacts +.DS_Store \ No newline at end of file diff --git a/Examples/Backpressure/Backpressure.cs b/Examples/Backpressure/Backpressure.cs index e5c9917..f91ce95 100644 --- a/Examples/Backpressure/Backpressure.cs +++ b/Examples/Backpressure/Backpressure.cs @@ -40,31 +40,25 @@ public static async Task RunAsync() const int MaxPending = 10; var sinkActor = new SinkActor(); - var pendingWork = new Dictionary(); + var pendingWork = new List(); for (int i = 0; i < Messages; i++) { var workItem = new WorkItem(i); await FreeCapacityAsync(pendingWork, MaxPending); - pendingWork[workItem.Id] = workItem.Completion.Task; + pendingWork.Add(workItem.Completion.Task); Console.WriteLine("submitting work item {0}", workItem.Id); sinkActor.Send(workItem); } await sinkActor.Drain(); } - private static async Task FreeCapacityAsync(Dictionary pendingWork, int maxPending) + private static async Task FreeCapacityAsync(List pendingWork, int maxPending) { if (pendingWork.Count == maxPending) { Console.WriteLine("waiting for backlog to subside"); - await Task.WhenAny(pendingWork.Values); - foreach (var entry in pendingWork) - { - if (entry.Value.IsCompleted) - { - pendingWork.Remove(entry.Key); - } - } + await Task.WhenAny(pendingWork); + pendingWork.RemoveAll(task => task.IsCompleted); Console.WriteLine("reduced to {0}", pendingWork.Count); } }