diff --git a/src/core/Akka.Streams/Implementation/Fusing/Ops.cs b/src/core/Akka.Streams/Implementation/Fusing/Ops.cs index dc732e10f8e..ce464c2aa5c 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/Ops.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/Ops.cs @@ -2528,7 +2528,13 @@ public readonly struct SlimResult public static readonly SlimResult NotYetReady = new SlimResult(NotYetThereSentinel.Instance, default); - + + public static SlimResult FromTask(Task task) + { + return task.IsCanceled || task.IsFaulted + ? new SlimResult(task.Exception, default) + : new SlimResult(default, task.Result); + } public SlimResult(Exception errorOrSentinel, T result) { if (result == null) diff --git a/src/core/Akka.Streams/Implementation/PooledValueTaskContinuationHelper.cs b/src/core/Akka.Streams/Implementation/PooledValueTaskContinuationHelper.cs deleted file mode 100644 index a779888b574..00000000000 --- a/src/core/Akka.Streams/Implementation/PooledValueTaskContinuationHelper.cs +++ /dev/null @@ -1,118 +0,0 @@ -using System; -using System.Runtime.CompilerServices; -using System.Threading.Tasks; -using System.Threading.Tasks.Sources; -using Akka.Util; - -namespace Akka.Streams.Implementation; - -/// -/// We use this because ValueTask doesn't give us a polite ContinueWith, -/// i.e. one where we can pass state in. -/// -/// -internal readonly struct ValueTaskCheatingPeeker -{ - internal readonly object? _obj; - /// The result to be used if the operation completed successfully synchronously. - internal readonly T? _result; - /// Opaque value passed through to the . - internal readonly short _token; - /// true to continue on the captured context; otherwise, false. - /// Stored in the rather than in the configured awaiter to utilize otherwise padding space. - internal readonly bool _continueOnCapturedContext; -} - -internal sealed class - PooledValueTaskContinuationHelper -{ - private ValueTask _valueTask; - private readonly Action> _continuationAction; - - // ReSharper disable once StaticMemberInGenericType - private static readonly Action OnCompletedAction = - CompletionActionVt; - - private static readonly Action,object> TaskCompletedAction = (Task t,object o) => - { - var ca = (Action>)o; - if (t.IsFaulted) - { - var exception = t.Exception?.InnerExceptions != null && - t.Exception.InnerExceptions.Count == 1 - ? t.Exception.InnerExceptions[0] - : t.Exception; - - ca(new Try(exception)); - } - else - { - ca(new Try(t.Result)); - } - }; - - public PooledValueTaskContinuationHelper(Action> continuationAction) - { - _continuationAction = continuationAction; - } - - public void AttachAwaiter(ValueTask valueTask) - { - _valueTask = valueTask; - AttachOrContinue(); - } - - private void AttachOrContinue() - { - var peeker = - Unsafe.As, ValueTaskCheatingPeeker>(ref _valueTask); - if (peeker._obj == null) - { - _continuationAction(peeker._result); - } - else if (peeker._obj is Task asTask) - { - asTask.ContinueWith(TaskCompletedAction,_continuationAction, - TaskContinuationOptions.NotOnCanceled); - } - else - { - var source = Unsafe.As>(peeker._obj); - source.OnCompleted(OnCompletedAction, this, peeker._token, - ValueTaskSourceOnCompletedFlags.None); - } - } - - //TODO: May be better to have instanced and take the alloc penalty, - // To avoid casting cost here. - private static void CompletionActionVt(object discard) - { - var inst = (PooledValueTaskContinuationHelper)discard; - var vtCapture = inst._valueTask; - inst._valueTask = default; - if (vtCapture.IsCompletedSuccessfully) - { - var result = vtCapture.Result; - inst._continuationAction(result); - } - else if(vtCapture.IsCanceled == false) - { - inst.VTCompletionError(vtCapture); - } - } - - private void VTCompletionError(ValueTask vtCapture) - { - var t = vtCapture.AsTask(); - //We only care about faulted, not canceled. - if (t.IsFaulted) - { - var exception = t.Exception?.InnerExceptions != null && - t.Exception.InnerExceptions.Count == 1 - ? t.Exception.InnerExceptions[0] - : t.Exception; - - _continuationAction(new Try(exception)); - } - } -} \ No newline at end of file diff --git a/src/core/Akka.Streams/Implementation/Sources.cs b/src/core/Akka.Streams/Implementation/Sources.cs index 762c8278ae5..a19abe2c7e2 100644 --- a/src/core/Akka.Streams/Implementation/Sources.cs +++ b/src/core/Akka.Streams/Implementation/Sources.cs @@ -12,6 +12,7 @@ using Akka.Annotations; using Akka.Pattern; using Akka.Streams.Dsl; +using Akka.Streams.Implementation.Fusing; using Akka.Streams.Implementation.Stages; using Akka.Streams.Stage; using Akka.Streams.Supervision; @@ -779,8 +780,8 @@ private sealed class Logic : OutGraphStageLogic private readonly Lazy _decider; private Option _state = Option.None; - private readonly PooledValueTaskContinuationHelper> - _pooledContinuation; + private ValueTask> _currentReadVt; + private readonly Action _valueTaskAwaiterOnCompleteAction; public Logic(UnfoldResourceSourceValueTaskAsync stage, Attributes inheritedAttributes) : base(stage.Shape) { @@ -790,12 +791,11 @@ public Logic(UnfoldResourceSourceValueTaskAsync sta var strategy = inheritedAttributes.GetAttribute(null); return strategy != null ? strategy.Decider : Deciders.StoppingDecider; }); - _pooledContinuation = - new PooledValueTaskContinuationHelper>( - ReadCallback); + _valueTaskAwaiterOnCompleteAction = SelfReadCallback; SetHandler(_stage.Out, this); } + private Action> CreatedCallback => GetAsyncCallback>(resource => { if (resource.IsSuccess) @@ -830,12 +830,26 @@ private void ErrorHandler(Exception ex) throw new ArgumentOutOfRangeException(); } } - - private Action>> ReadCallback => GetAsyncCallback>>(read => + + + private void SelfReadCallback() { - if (read.IsSuccess) + var swap = _currentReadVt; + _currentReadVt = default; + if (swap.IsCompletedSuccessfully) { - var data = read.Success.Value; + ReadCallback(new SlimResult>(default,swap.Result)); + } + else + { + ReadCallback(SlimResult>.FromTask(swap.AsTask())); + } + } + private Action>> ReadCallback => GetAsyncCallback>>(read => + { + if (read.IsSuccess()) + { + var data = read.Result; if (data.HasValue) { var some = data.Value; @@ -855,7 +869,7 @@ private void ErrorHandler(Exception ex) } } } - else ErrorHandler(read.Failure.Value); + else ErrorHandler(read.Error); }); private void CloseResource() @@ -894,7 +908,9 @@ public override void OnPull() } else { - _pooledContinuation.AttachAwaiter(vt); + _currentReadVt = vt; + _currentReadVt.GetAwaiter().OnCompleted(_valueTaskAwaiterOnCompleteAction); + //_pooledContinuation.AttachAwaiter(vt); } diff --git a/src/core/Akka.Streams/Implementation/Unfold.cs b/src/core/Akka.Streams/Implementation/Unfold.cs index 010ff412dd8..488aeafd34f 100644 --- a/src/core/Akka.Streams/Implementation/Unfold.cs +++ b/src/core/Akka.Streams/Implementation/Unfold.cs @@ -10,6 +10,7 @@ using System.Threading.Tasks; using System.Threading.Tasks.Sources; using Akka.Annotations; +using Akka.Streams.Implementation.Fusing; using Akka.Streams.Stage; using Akka.Streams.Util; using Akka.Util; @@ -103,7 +104,7 @@ private sealed class Logic : OutGraphStageLogic { private readonly UnfoldValueTaskAsync _stage; private TState _state; - private Action>> _asyncHandler; + private Action>> _asyncHandler; private ValueTask> _currentTask; public Logic(UnfoldValueTaskAsync stage) : base(stage.Shape) { @@ -116,38 +117,42 @@ public Logic(UnfoldValueTaskAsync stage) : base(stage.Shape) public override void OnPull() { var vt = _stage.UnfoldFunc(_state); - var peeker = Unsafe.As>,ValueTaskCheatingPeeker>>(ref vt); - if (peeker._obj == null) - { - _asyncHandler(Result.Success>(peeker._result)); - } - else - { - _currentTask = vt; - vt.GetAwaiter().OnCompleted(CompletionAction); - } + if (vt.IsCompletedSuccessfully) + { + _asyncHandler( + new SlimResult>(default, + vt.Result)); + } + else + { + _currentTask = vt; + vt.GetAwaiter().OnCompleted(CompletionAction); + } } private void CompletionAction() { if (_currentTask.IsCompletedSuccessfully) { - _asyncHandler.Invoke(Result.Success(_currentTask.Result)); + _asyncHandler.Invoke( + new SlimResult>(default, + _currentTask.Result)); } else { _asyncHandler.Invoke( - Result.FromTask(_currentTask.AsTask())); + SlimResult>.FromTask( + _currentTask.AsTask())); } } public override void PreStart() { - var ac = GetAsyncCallback>>(result => + var ac = GetAsyncCallback>>(result => { - if (!result.IsSuccess) - Fail(_stage.Out, result.Exception); + if (!result.IsSuccess()) + Fail(_stage.Out, result.Error); else { - var option = result.Value; + var option = result.Result; if (!option.HasValue) Complete(_stage.Out); else