Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve exception handling in .NET isolated #3034

Open
wants to merge 7 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 64 additions & 2 deletions src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using DurableTask.Core.History;
using DurableTask.Core.Middleware;
using Microsoft.Azure.WebJobs.Host.Executors;
using Newtonsoft.Json;
using P = Microsoft.DurableTask.Protobuf;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
Expand Down Expand Up @@ -583,29 +584,47 @@ public async Task CallActivityAsync(DispatchMiddlewareContext dispatchContext, F
isReplay: false,
scheduledEvent.EventId);

bool detailsParsedFromSerializedException;

activityResult = new ActivityExecutionResult
{
ResponseEvent = new TaskFailedEvent(
eventId: -1,
taskScheduledId: scheduledEvent.EventId,
reason: $"Function '{functionName}' failed with an unhandled exception.",
details: null,
GetFailureDetails(result.Exception)),
GetFailureDetails(result.Exception, out detailsParsedFromSerializedException)),
};

if (!detailsParsedFromSerializedException && this.extension.PlatformInformationService.GetWorkerRuntimeType() == WorkerRuntimeType.DotNetIsolated)
{
this.TraceHelper.ExtensionWarningEvent(
this.Options.HubName,
functionName.Name,
instance.InstanceId,
"Failure details not parsed from serialized exception details, worker failed to serialize exception");
}
}

// Send the result of the activity function to the DTFx dispatch pipeline.
// This allows us to bypass the default, in-process execution and process the given results immediately.
dispatchContext.SetProperty(activityResult);
}

private static FailureDetails GetFailureDetails(Exception e)
private static FailureDetails GetFailureDetails(Exception e, out bool fromSerializedException)
{
fromSerializedException = false;
if (e.InnerException != null && e.InnerException.Message.StartsWith("Result:"))
{
Exception rpcException = e.InnerException;
if (TryGetRpcExceptionFields(rpcException.Message, out string? exception, out string? stackTrace))
{
if (TryExtractSerializedFailureDetailsFromException(exception, out FailureDetails? details) && details is not null)
{
fromSerializedException = true;
return details;
}

if (TrySplitExceptionTypeFromMessage(exception, out string? exceptionType, out string? exceptionMessage))
{
return new FailureDetails(exceptionType, exceptionMessage, stackTrace, innerFailure: null, isNonRetriable: false);
Expand All @@ -624,6 +643,21 @@ private static FailureDetails GetFailureDetails(Exception e)
return new FailureDetails(e);
}

/// <summary>
/// Converts a protobuf <see cref="P.TaskFailureDetails"/> message, including its chain of inner
/// failures, into a <see cref="FailureDetails"/> instance.
/// </summary>
/// <param name="taskFailureDetails">The protobuf failure payload; may be null at the end of an inner-failure chain.</param>
/// <returns>The converted failure details, or <c>null</c> when <paramref name="taskFailureDetails"/> is null.</returns>
private static FailureDetails? GetFailureDetails(P.TaskFailureDetails taskFailureDetails)
{
    if (taskFailureDetails is not null)
    {
        // Missing type/message values are normalized to empty strings; the stack trace is passed through as-is.
        string errorType = taskFailureDetails.ErrorType ?? string.Empty;
        string errorMessage = taskFailureDetails.ErrorMessage ?? string.Empty;
        string? stackTrace = taskFailureDetails.StackTrace;

        // Recursion terminates when InnerFailure is null.
        FailureDetails? innerFailure = GetFailureDetails(taskFailureDetails.InnerFailure);

        return new FailureDetails(
            errorType,
            errorMessage,
            stackTrace,
            innerFailure,
            taskFailureDetails.IsNonRetriable);
    }

    return null;
}

private static bool TryGetRpcExceptionFields(
string rpcExceptionMessage,
[NotNullWhen(true)] out string? exception,
Expand Down Expand Up @@ -666,6 +700,34 @@ private static bool TryGetRpcExceptionFields(
return true;
}

/// <summary>
/// Attempts to read the first line of <paramref name="exception"/> as a JSON-serialized
/// <see cref="P.TaskFailureDetails"/> payload and convert it into <see cref="FailureDetails"/>.
/// </summary>
/// <param name="exception">The exception text extracted from the RPC exception message.</param>
/// <param name="details">The converted failure details when this method returns <c>true</c>; otherwise <c>null</c>.</param>
/// <returns><c>true</c> if a payload was deserialized and converted; otherwise <c>false</c>.</returns>
private static bool TryExtractSerializedFailureDetailsFromException(string exception, [NotNullWhen(true)] out FailureDetails? details)
{
    details = null;

    // Only text that starts with '{' is treated as a candidate JSON object. Checking for
    // null/empty up front avoids relying on a thrown-and-swallowed exception for this case.
    if (string.IsNullOrEmpty(exception) || exception[0] != '{')
    {
        return false;
    }

    try
    {
        // Only the text up to the first newline is considered to be the serialized payload.
        int newlineIndex = exception.IndexOf('\n');
        string serializedMessage = newlineIndex < 0 ? exception : exception.Substring(0, newlineIndex).Trim();
        P.TaskFailureDetails? taskFailureDetails = JsonConvert.DeserializeObject<P.TaskFailureDetails>(serializedMessage);
        if (taskFailureDetails is not null)
        {
            details = GetFailureDetails(taskFailureDetails);
            return details is not null;
        }
    }
    catch (Exception)
    {
        // Apparently the exception message was not serialized by the worker middleware, this will be logged in CallActivityAsync()
        details = null;
    }

    return false;
}

private static bool TrySplitExceptionTypeFromMessage(
string exception,
[NotNullWhen(true)] out string? exceptionType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Diagnostics.CodeAnalysis;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker.Extensions.DurableTask.Exceptions;
using Microsoft.Azure.Functions.Worker.Middleware;
using Microsoft.DurableTask.Worker.Grpc;

Expand All @@ -27,6 +28,11 @@ public Task Invoke(FunctionContext functionContext, FunctionExecutionDelegate ne
return RunEntityAsync(functionContext, triggerBinding, next);
}

if (IsActivityTrigger(functionContext, out triggerBinding))
{
return RunActivityAsync(functionContext, triggerBinding, next);
}

return next(functionContext);
}

Expand Down Expand Up @@ -94,4 +100,33 @@ static async Task RunEntityAsync(
await next(context);
context.GetInvocationResult().Value = dispatcher.Result;
}

/// <summary>
/// Looks through the function's input bindings for one whose type is "activityTrigger"
/// (compared case-insensitively).
/// </summary>
/// <param name="context">The context of the function being invoked.</param>
/// <param name="activityTriggerBinding">The first matching binding, or <c>null</c> when there is none.</param>
/// <returns><c>true</c> when an activity trigger binding was found; otherwise <c>false</c>.</returns>
private static bool IsActivityTrigger(
    FunctionContext context, [NotNullWhen(true)] out BindingMetadata? activityTriggerBinding)
{
    activityTriggerBinding = null;

    foreach (BindingMetadata candidate in context.FunctionDefinition.InputBindings.Values)
    {
        if (!string.Equals(candidate.Type, "activityTrigger", StringComparison.OrdinalIgnoreCase))
        {
            continue;
        }

        // First match wins; stop scanning.
        activityTriggerBinding = candidate;
        break;
    }

    return activityTriggerBinding is not null;
}

/// <summary>
/// Invokes the rest of the middleware pipeline for an activity-triggered function and wraps any
/// exception it throws in a <see cref="DurableSerializationException"/>.
/// </summary>
/// <param name="functionContext">The context of the function being invoked.</param>
/// <param name="triggerBinding">The activity trigger binding (not used by this method).</param>
/// <param name="next">The next delegate in the middleware pipeline.</param>
/// <returns>A task that completes when the downstream pipeline completes.</returns>
private static async Task RunActivityAsync(FunctionContext functionContext, BindingMetadata triggerBinding, FunctionExecutionDelegate next)
{
    try
    {
        await next(functionContext);
    }
    catch (Exception ex)
    {
        // Every failure is rethrown as the wrapper type; the original exception is handed to its constructor.
        throw new DurableSerializationException(ex);
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using System;
using Microsoft.DurableTask.Protobuf;
using Newtonsoft.Json;

namespace Microsoft.Azure.Functions.Worker.Extensions.DurableTask.Exceptions;

/// <summary>
/// Wraps another exception and overrides <see cref="ToString"/> to emit that exception as a
/// JSON-serialized <see cref="TaskFailureDetails"/> payload.
/// </summary>
internal class DurableSerializationException : Exception
{
    // The wrapped exception; assigned once in the constructor.
    private readonly Exception fromException;

    // We set the base class properties of this exception to the same as the parent,
    // so that methods in the worker after this can still (typically) access the same information vs w/o
    // this exception type.
    internal DurableSerializationException(Exception fromException) : base(fromException.Message, fromException.InnerException)
    {
        this.fromException = fromException;
    }

    /// <summary>
    /// Serializes the wrapped exception, converted to <see cref="TaskFailureDetails"/>, as JSON.
    /// </summary>
    /// <returns>The JSON text produced by <see cref="JsonConvert.SerializeObject(object)"/>.</returns>
    public override string ToString()
    {
        TaskFailureDetails? failureDetails = TaskFailureDetailsConverter.TaskFailureFromException(this.fromException);
        return JsonConvert.SerializeObject(failureDetails);
    }

    /// <summary>
    /// Gets the stack trace of the wrapped exception rather than that of this wrapper.
    /// </summary>
    public override string? StackTrace => this.fromException.StackTrace;
}
27 changes: 27 additions & 0 deletions src/Worker.Extensions.DurableTask/TaskFailureDetailsConverter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Text;
using P = Microsoft.DurableTask.Protobuf;

namespace Microsoft.Azure.Functions.Worker.Extensions.DurableTask;
/// <summary>
/// Builds <see cref="P.TaskFailureDetails"/> protobuf messages from .NET exceptions.
/// </summary>
internal class TaskFailureDetailsConverter
{
    /// <summary>
    /// Converts an exception and, recursively, its inner exceptions into a
    /// <see cref="P.TaskFailureDetails"/> message.
    /// </summary>
    /// <param name="fromException">The exception to convert; may be null.</param>
    /// <returns>The populated message, or <c>null</c> when <paramref name="fromException"/> is null.</returns>
    internal static P.TaskFailureDetails? TaskFailureFromException(Exception? fromException) =>
        fromException is null
            ? null
            : new P.TaskFailureDetails
            {
                ErrorType = fromException.GetType().FullName,
                ErrorMessage = fromException.Message,
                StackTrace = fromException.StackTrace,

                // Recursion ends when InnerException is null.
                InnerFailure = TaskFailureFromException(fromException.InnerException),
                IsNonRetriable = false,
            };
}
163 changes: 163 additions & 0 deletions test/e2e/Apps/BasicDotNetIsolated/ActivityErrorHandling.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Collections.Concurrent;
using System.Net;
using Grpc.Core;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Microsoft.DurableTask.Entities;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.Durable.Tests.E2E;

public static class ActivityErrorHandling
{
private static ConcurrentDictionary<string, int> retryCount = new ConcurrentDictionary<string, int>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: consider naming this globalRetryCount to make it clear that it's a global (static) value.


/// <summary>
/// HTTP starter that schedules the <see cref="RethrowActivityException"/> orchestration and
/// returns the check-status response for the new instance.
/// </summary>
[Function("RethrowActivityException_HttpStart")]
public static async Task<HttpResponseData> RethrowHttpStart(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req,
    [DurableClient] DurableTaskClient client,
    FunctionContext executionContext)
{
    const string OrchestratorName = nameof(RethrowActivityException);

    var log = executionContext.GetLogger("RethrowActivityException_HttpStart");
    var scheduledInstanceId = await client.ScheduleNewOrchestrationInstanceAsync(OrchestratorName);
    log.LogInformation("Started orchestration with ID = '{instanceId}'.", scheduledInstanceId);

    HttpResponseData statusResponse = await client.CreateCheckStatusResponseAsync(req, scheduledInstanceId);
    return statusResponse;
}

/// <summary>
/// HTTP starter that schedules the <see cref="CatchActivityException"/> orchestration and
/// returns the check-status response for the new instance.
/// </summary>
[Function("CatchActivityException_HttpStart")]
public static async Task<HttpResponseData> CatchHttpStart(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req,
    [DurableClient] DurableTaskClient client,
    FunctionContext executionContext)
{
    // The logger category matches this function's own name, as the sibling starters do.
    ILogger logger = executionContext.GetLogger("CatchActivityException_HttpStart");

    string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
        nameof(CatchActivityException));

    logger.LogInformation("Started orchestration with ID = '{instanceId}'.", instanceId);

    return await client.CreateCheckStatusResponseAsync(req, instanceId);
}

/// <summary>
/// HTTP starter that schedules the <see cref="RetryActivityFunction"/> orchestration and
/// returns the check-status response for the new instance.
/// </summary>
[Function("RetryActivityException_HttpStart")]
public static async Task<HttpResponseData> RetryHttpStart(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req,
    [DurableClient] DurableTaskClient client,
    FunctionContext executionContext)
{
    const string OrchestratorName = nameof(RetryActivityFunction);

    var log = executionContext.GetLogger("RetryActivityException_HttpStart");
    var scheduledInstanceId = await client.ScheduleNewOrchestrationInstanceAsync(OrchestratorName);
    log.LogInformation("Started orchestration with ID = '{instanceId}'.", scheduledInstanceId);

    HttpResponseData statusResponse = await client.CreateCheckStatusResponseAsync(req, scheduledInstanceId);
    return statusResponse;
}

/// <summary>
/// HTTP starter that schedules the <see cref="CustomRetryActivityFunction"/> orchestration and
/// returns the check-status response for the new instance.
/// </summary>
[Function("CustomRetryActivityException_HttpStart")]
public static async Task<HttpResponseData> CustomRetryHttpStart(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req,
    [DurableClient] DurableTaskClient client,
    FunctionContext executionContext)
{
    const string OrchestratorName = nameof(CustomRetryActivityFunction);

    var log = executionContext.GetLogger("CustomRetryActivityException_HttpStart");
    var scheduledInstanceId = await client.ScheduleNewOrchestrationInstanceAsync(OrchestratorName);
    log.LogInformation("Started orchestration with ID = '{instanceId}'.", scheduledInstanceId);

    HttpResponseData statusResponse = await client.CreateCheckStatusResponseAsync(req, scheduledInstanceId);
    return statusResponse;
}

/// <summary>
/// Orchestration that calls the <see cref="RaiseException"/> activity with the instance ID as input
/// and returns its result; any failure of the activity is left unhandled.
/// </summary>
[Function(nameof(RethrowActivityException))]
public static async Task<string> RethrowActivityException(
    [OrchestrationTrigger] TaskOrchestrationContext context)
{
    return await context.CallActivityAsync<string>(nameof(RaiseException), context.InstanceId);
}

/// <summary>
/// Orchestration that calls the <see cref="RaiseException"/> activity and, when the call fails with
/// a <see cref="TaskFailedException"/>, returns that exception's message instead of failing.
/// </summary>
[Function(nameof(CatchActivityException))]
public static async Task<string> CatchActivityException(
    [OrchestrationTrigger] TaskOrchestrationContext context)
{
    try
    {
        return await context.CallActivityAsync<string>(nameof(RaiseException), context.InstanceId);
    }
    catch (TaskFailedException activityFailure)
    {
        // The failure message becomes the orchestration output.
        return activityFailure.Message;
    }
}

/// <summary>
/// Orchestration that calls the <see cref="RaiseException"/> activity under a retry policy of
/// at most 3 attempts with a 3-second first retry interval.
/// </summary>
[Function(nameof(RetryActivityFunction))]
public static async Task<string> RetryActivityFunction(
    [OrchestrationTrigger] TaskOrchestrationContext context)
{
    RetryPolicy policy = new RetryPolicy(
        maxNumberOfAttempts: 3,
        firstRetryInterval: TimeSpan.FromSeconds(3));
    TaskOptions retryOptions = TaskOptions.FromRetryPolicy(policy);

    return await context.CallActivityAsync<string>(nameof(RaiseException), context.InstanceId, options: retryOptions);
}

/// <summary>
/// Orchestration that calls the <see cref="RaiseComplexException"/> activity with a custom retry
/// handler. The handler retries only when the last failure was caused by an
/// <see cref="InvalidOperationException"/> whose inner failure was caused by an
/// <see cref="OverflowException"/>, and the last attempt number is below 3.
/// </summary>
[Function(nameof(CustomRetryActivityFunction))]
public static async Task<string> CustomRetryActivityFunction(
    [OrchestrationTrigger] TaskOrchestrationContext context)
{
    TaskOptions retryOptions = TaskOptions.FromRetryHandler(retryContext =>
        retryContext.LastFailure.IsCausedBy<InvalidOperationException>()
        && retryContext.LastFailure.InnerFailure is { } innerFailure
        && innerFailure.IsCausedBy<OverflowException>()
        && retryContext.LastAttemptNumber < 3);

    return await context.CallActivityAsync<string>(nameof(RaiseComplexException), context.InstanceId, options: retryOptions);
}

/// <summary>
/// Activity that throws an <see cref="InvalidOperationException"/> the first time it is invoked for
/// a given <paramref name="instanceId"/> and returns "Success" on every later invocation.
/// </summary>
[Function(nameof(RaiseException))]
public static string RaiseException([ActivityTrigger] string instanceId, FunctionContext executionContext)
{
    // Insert 1 for a new key, otherwise increment; the returned value is the invocation number.
    int invocationNumber = retryCount.AddOrUpdate(instanceId, 1, (_, previous) => previous + 1);

    if (invocationNumber != 1)
    {
        return "Success";
    }

    throw new InvalidOperationException("This activity failed");
}

/// <summary>
/// Activity that, the first time it is invoked for a given <paramref name="instanceId"/>, throws an
/// <see cref="InvalidOperationException"/> with a multi-line message and an
/// <see cref="OverflowException"/> as its inner exception; later invocations return "Success".
/// </summary>
[Function(nameof(RaiseComplexException))]
public static string RaiseComplexException([ActivityTrigger] string instanceId, FunctionContext executionContext)
{
    // Insert 1 for a new key, otherwise increment; the returned value is the invocation number.
    int invocationNumber = retryCount.AddOrUpdate(instanceId, 1, (_, previous) => previous + 1);

    if (invocationNumber != 1)
    {
        return "Success";
    }

    throw new InvalidOperationException(
        "This activity failed\r\nMore information about the failure",
        innerException: new OverflowException("Inner exception message"));
}
}
Loading
Loading