From 43bd56b03779c863654df877fb21f3c90c3640d6 Mon Sep 17 00:00:00 2001 From: Andrew El Date: Sun, 7 Jan 2024 16:58:43 -0500 Subject: [PATCH] SelectValueTaskAsync Fixes, Benchmarks --- .../Streams/SelectAsyncBenchmarks.cs | 189 ++++++++++++++++++ .../Dsl/Internal/InternalFlowOperations.cs | 6 + src/core/Akka.Streams/Dsl/SourceOperations.cs | 5 + .../Akka.Streams/Implementation/Fusing/Ops.cs | 152 ++++++++------ 4 files changed, 293 insertions(+), 59 deletions(-) create mode 100644 src/benchmark/Akka.Benchmarks/Streams/SelectAsyncBenchmarks.cs diff --git a/src/benchmark/Akka.Benchmarks/Streams/SelectAsyncBenchmarks.cs b/src/benchmark/Akka.Benchmarks/Streams/SelectAsyncBenchmarks.cs new file mode 100644 index 00000000000..47f6dd3f5d8 --- /dev/null +++ b/src/benchmark/Akka.Benchmarks/Streams/SelectAsyncBenchmarks.cs @@ -0,0 +1,189 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2024 Lightbend Inc. +// // Copyright (C) 2013-2024 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System.Threading.Channels; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Benchmarks.Configurations; +using Akka.Streams; +using Akka.Streams.Dsl; +using BenchmarkDotNet.Attributes; + +namespace Akka.Benchmarks.Streams; + +[Config(typeof(MicroBenchmarkConfig))] +public class SelectAsyncBenchmarks +{ + public struct IntOrCompletion + { + public readonly int IntValue; + public readonly TaskCompletionSource? Completion; + + public IntOrCompletion(int intValue, TaskCompletionSource? completion) + { + IntValue = intValue; + Completion = completion; + } + } + private ActorSystem system; + private ActorMaterializer materializer; + + private IRunnableGraph simpleGraph; + private Task selectAsyncStub; + private Channel asyncCh; + private Task selectValueTaskAsyncStub; + private Channel vtAsyncCh; + private Task selectAsyncSyncStub; + private Task selectAsyncValueTaskSyncStub; + private Channel asyncChSync; + private Channel vtAsyncChSync; + + [GlobalSetup] + public void Setup() + { + system = ActorSystem.Create("system"); + materializer = system.Materializer(); + asyncCh = Channel.CreateUnbounded(); + + asyncChSync = Channel.CreateUnbounded(); + + vtAsyncChSync = Channel.CreateUnbounded(); + + selectAsyncSyncStub = Source.ChannelReader(asyncChSync.Reader) + .SelectAsync(4, a => + { + if (a.Completion != null) + { + a.Completion.TrySetResult(); + } + else + { + } + + return Task.FromResult(NotUsed.Instance); + }).RunWith(Sink.Ignore(), materializer); + + selectAsyncValueTaskSyncStub = Source.ChannelReader(vtAsyncChSync.Reader) + .SelectValueTaskAsync(4, a => + { + if (a.Completion != null) + { + a.Completion.TrySetResult(); + } + else + { + } + + return ValueTask.FromResult(NotUsed.Instance); + }).RunWith(Sink.Ignore(), materializer); + selectAsyncStub = Source.ChannelReader(asyncCh.Reader) + .SelectAsync(4, async a => + { + if (a.Completion != null) + { + a.Completion.TrySetResult(); + } + else + { + //await Task.Yield(); + await Task.Delay(0); + } + + return NotUsed.Instance; + }).RunWith(Sink.Ignore(), materializer); + vtAsyncCh = Channel.CreateUnbounded(); + int vta = 0; + selectValueTaskAsyncStub = Source.ChannelReader(vtAsyncCh.Reader) + .SelectValueTaskAsync(4, async a => + { + if (a.Completion != null) + { + a.Completion.TrySetResult(); + //return NotUsed.Instance; + } + else + { + //await Task.Yield(); + await Task.Delay(0); + //return NotUsed.Instance; + //Console.WriteLine(++vta); + //return vta; + } + + return NotUsed.Instance; + }).RunWith(Sink.Ignore(), materializer); + } + + [GlobalCleanup] + public void Cleanup() + { + materializer.Dispose(); + system.Dispose(); + } + + [Benchmark] + public async Task RunSelectAsync() + { + var completion = new TaskCompletionSource(TaskCreationOptions + .RunContinuationsAsynchronously); + for (int i = 0; i < 100; i++) + { + asyncCh.Writer.TryWrite(new IntOrCompletion(i, null)); + } + + asyncCh.Writer.TryWrite(new IntOrCompletion(0, completion)); + await completion.Task; + + } + + + [Benchmark] + public async Task RunSelectValueTaskAsync() + { + var completion = new TaskCompletionSource(TaskCreationOptions + .RunContinuationsAsynchronously); + for (int i = 0; i < 100; i++) + { + vtAsyncCh.Writer.TryWrite(new IntOrCompletion(i, null)); + } + + vtAsyncCh.Writer.TryWrite(new IntOrCompletion(0, completion)); + await completion.Task; + + } + + [Benchmark] + public async Task RunSelectAsyncSync() + { + var completion = new TaskCompletionSource(TaskCreationOptions + .RunContinuationsAsynchronously); + for (int i = 0; i < 100; i++) + { + asyncChSync.Writer.TryWrite(new IntOrCompletion(i, null)); + } + + asyncChSync.Writer.TryWrite(new IntOrCompletion(0, completion)); + await completion.Task; + + } + + + [Benchmark] + public async Task RunSelectValueTaskAsyncSync() + { + var completion = new TaskCompletionSource(TaskCreationOptions + .RunContinuationsAsynchronously); + for (int i = 0; i < 100; i++) + { + vtAsyncChSync.Writer.TryWrite(new IntOrCompletion(i, null)); + } + + vtAsyncChSync.Writer.TryWrite(new IntOrCompletion(0, completion)); + await completion.Task; + + } +} \ No newline at end of file diff --git a/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs b/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs index 7000d6470be..1d03ace647f 100644 --- a/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs +++ b/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs @@ -339,6 +339,12 @@ public static IFlow SelectAsync(this IFlow(parallelism, asyncMapper)); } + + public static IFlow SelectValueTaskAsync(this IFlow flow, int parallelism, + Func> asyncMapper) + { + return flow.Via(new Fusing.SelectValueTaskAsync(parallelism, asyncMapper)); + } /// /// Transform this stream by applying the given function to each of the elements diff --git a/src/core/Akka.Streams/Dsl/SourceOperations.cs b/src/core/Akka.Streams/Dsl/SourceOperations.cs index 7ceed16caae..29822362060 100644 --- a/src/core/Akka.Streams/Dsl/SourceOperations.cs +++ b/src/core/Akka.Streams/Dsl/SourceOperations.cs @@ -281,6 +281,11 @@ public static Source SelectAsync(this Source)InternalFlowOperations.SelectAsync(flow, parallelism, asyncMapper); } + + public static Source SelectValueTaskAsync(this Source flow, int parallelism, Func> asyncMapper) + { + return (Source)InternalFlowOperations.SelectValueTaskAsync(flow, parallelism, asyncMapper); + } /// /// Transform this stream by applying the given function to each of the elements diff --git a/src/core/Akka.Streams/Implementation/Fusing/Ops.cs b/src/core/Akka.Streams/Implementation/Fusing/Ops.cs index ca226ccfb58..dc732e10f8e 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/Ops.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/Ops.cs @@ -24,6 +24,7 @@ using Akka.Streams.Util; using Akka.Util; using Akka.Util.Internal; +using Debug = System.Diagnostics.Debug; using Decider = Akka.Streams.Supervision.Decider; using Directive = Akka.Streams.Supervision.Directive; @@ -2520,6 +2521,37 @@ public sealed class NotYetThereSentinel : Exception public static readonly NotYetThereSentinel Instance = new(); } + public readonly struct SlimResult + { + public readonly Exception Error; + public readonly T Result; + + public static readonly SlimResult NotYetReady = + new SlimResult(NotYetThereSentinel.Instance, default); + + public SlimResult(Exception errorOrSentinel, T result) + { + if (result == null) + { + Error = errorOrSentinel ?? ReactiveStreamsCompliance + .ElementMustNotBeNullException; + } + else + { + Result = result; + } + } + + public bool IsSuccess() + { + return Error == null; + } + + public bool IsDone() + { + return Error != NotYetThereSentinel.Instance; + } + } /// /// INTERNAL API /// @@ -2535,11 +2567,11 @@ private sealed class Logic : InAndOutGraphStageLogic { private sealed class Holder { - public Result Element { get; private set; } + public SlimResult Element { get; private set; } private readonly Action> _callback; private ValueTask _pending; - private static readonly Action OnCompletedAction = + /*private static readonly Action OnCompletedAction = CompletionActionVt; private static readonly Action, object> @@ -2560,7 +2592,9 @@ private static readonly Action, object> ca.Invoke(Result.Success(t.Result)); } }; - + */ + private readonly Action TaskCompletedAction; + /* private static void CompletionActionVt(object discard) { var inst = (Holder)discard; @@ -2574,7 +2608,7 @@ private static void CompletionActionVt(object discard) { inst.VTCompletionError(vtCapture); } - } + }*/ private void VTCompletionError(ValueTask vtCapture) { @@ -2587,69 +2621,63 @@ private void VTCompletionError(ValueTask vtCapture) ? t.Exception.InnerExceptions[0] : t.Exception; - Invoke(Result.Failure(exception)); + Invoke(new SlimResult(exception,default)); + } + else + { + //Console.WriteLine("Unexpected condition, CompletionError without faulted!!!!"); } } - public Holder(Result element, Action> callback) + public Holder(SlimResult element, Action> callback) { _callback = callback; Element = element; + TaskCompletedAction = () => + { + var inst = this._pending; + this._pending = default; + if (inst.IsCompletedSuccessfully) + { + this.Invoke(new SlimResult(default,inst.Result)); + } + else + { + this.VTCompletionError(inst); + } + }; } - public void SetElement(Result result) + public void SetElement(SlimResult result) { - Element = result.IsSuccess && result.Value == null - ? Result.Failure(ReactiveStreamsCompliance - .ElementMustNotBeNullException) - : result; + Element = result; } public void SetContinuation(ValueTask vt) { - var valueTask = vt; - var peeker = - Unsafe.As, ValueTaskCheatingPeeker>( - ref valueTask); - if (peeker._obj == null) - { - Invoke(Result.Success(peeker._result)); - } - else if (peeker._obj is Task asTask) - { - asTask.ContinueWith(TaskCompletedAction, this, - TaskContinuationOptions.NotOnCanceled); - } - else - { - _pending = vt; - var source = - Unsafe.As>(peeker._obj); - source.OnCompleted(OnCompletedAction, this, - peeker._token, - ValueTaskSourceOnCompletedFlags.None); - } + _pending = vt; + vt.ConfigureAwait(true).GetAwaiter() + .OnCompleted(TaskCompletedAction); } - public void Invoke(Result result) + public void Invoke(SlimResult result) { SetElement(result); _callback(this); } } - private static readonly Result NotYetThere = - Result.Failure(NotYetThereSentinel.Instance); + private static readonly SlimResult NotYetThere = + SlimResult.NotYetReady; private readonly SelectValueTaskAsync _stage; private readonly Decider _decider; private IBuffer> _buffer; private readonly Action> _taskCallback; - private readonly - ConcurrentQueue< - Holder> _queue; - + //Use this to hold on to reused holders. + private readonly ConcurrentQueue> _holderReuseQueue; + public Logic(Attributes inheritedAttributes, SelectValueTaskAsync stage) : base(stage.Shape) { @@ -2660,8 +2688,10 @@ public Logic(Attributes inheritedAttributes, ? attr.Decider : Deciders.StoppingDecider; - _taskCallback = GetAsyncCallback>(HolderCompleted); - _queue = + _taskCallback = + GetAsyncCallback>(hc => + HolderCompleted(hc)); + _holderReuseQueue = new ConcurrentQueue< Holder>(); SetHandlers(stage.In, stage.Out, this); @@ -2669,7 +2699,7 @@ public Logic(Attributes inheritedAttributes, private Holder RentOrGet() { - if (_queue.TryDequeue(out var item)) + if (_holderReuseQueue.TryDequeue(out var item)) { return item; } @@ -2686,20 +2716,16 @@ public override void OnPush() { var task = _stage._mapFunc(message); var holder = RentOrGet(); - //var holder = new Holder(NotYetThere, _taskCallback); _buffer.Enqueue(holder); - // We dispatch the task if it's ready to optimize away // scheduling it to an execution context if (task.IsCompletedSuccessfully) { - holder.SetElement(Result.Success(task.Result)); + holder.SetElement(new SlimResult(null,task.Result)); HolderCompleted(holder); } else holder.SetContinuation(task); - //task.GetAwaiter().ContinueWith(t => holder.Invoke(Result.FromTask(t)), - // TaskContinuationOptions.ExecuteSynchronously); } catch (Exception e) { @@ -2714,7 +2740,7 @@ private void onPushErrorDecider(Exception e, TIn message) { var strategy = _decider(e); Log.Error(e, - "An exception occured inside SelectAsync while processing message [{0}]. Supervision strategy: {1}", + "An exception occured inside SelectValueTaskAsync while processing message [{0}]. Supervision strategy: {1}", message, strategy); switch (strategy) { @@ -2743,8 +2769,10 @@ public override void OnUpstreamFinish() private int Todo => _buffer.Used; - public override void PreStart() => _buffer = - Buffer.Create>(_stage._parallelism, Materializer); + public override void PreStart() => + _buffer = + Buffer.Create>(_stage._parallelism, + Materializer); private void PushOne() { @@ -2754,25 +2782,31 @@ private void PushOne() if (_buffer.IsEmpty) { if (IsClosed(inlet)) + { CompleteStage(); + } else if (!HasBeenPulled(inlet)) + { Pull(inlet); + } } - else if (_buffer.Peek().Element == NotYetThere) + else if (_buffer.Peek().Element.IsDone() == false) { if (Todo < _stage._parallelism && !HasBeenPulled(inlet)) + { TryPull(inlet); + } } else { var dequeued = _buffer.Dequeue(); - var result = dequeued.Element; + var result = dequeued!.Element; dequeued.SetElement(NotYetThere); - _queue.Enqueue(dequeued); - if (!result.IsSuccess) + _holderReuseQueue.Enqueue(dequeued); + if (!result.IsSuccess()) continue; - Push(_stage.Out, result.Value); + Push(_stage.Out, result.Result); if (Todo < _stage._parallelism && !HasBeenPulled(inlet)) TryPull(inlet); @@ -2785,17 +2819,17 @@ private void PushOne() private void HolderCompleted(Holder holder) { var element = holder.Element; - if (element.IsSuccess) + if (element.IsSuccess()) { if (IsAvailable(_stage.Out)) PushOne(); return; } - var exception = element.Exception; + var exception = element.Error; var strategy = _decider(exception); Log.Error(exception, - "An exception occured inside SelectAsync while executing Task. Supervision strategy: {0}", + "An exception occured inside SelectValueTaskAsync while executing Task. Supervision strategy: {0}", strategy); switch (strategy) {