Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial changes for extension channel #2812

Draft
wants to merge 1 commit into
base: v2.x
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions src/WebJobs.Extensions.DurableTask/Bindings/BindingHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,10 @@ public string DurableOrchestrationClientToString(IDurableOrchestrationClient cli
if (this.config.OutOfProcProtocol == OutOfProcOrchestrationProtocol.MiddlewarePassthrough)
{
// Out-of-proc v2 (aka middleware passthrough) uses gRPC instead of vanilla HTTP + JSON as the RPC protocol.
string? localRpcAddress = this.config.GetLocalRpcAddress();
if (localRpcAddress == null)
{
throw new InvalidOperationException("The local RPC address has not been configured!");
}

return JsonConvert.SerializeObject(new OrchestrationClientInputData
{
TaskHubName = string.IsNullOrEmpty(attr.TaskHub) ? client.TaskHubName : attr.TaskHub,
ConnectionName = attr.ConnectionName,
RpcBaseUrl = localRpcAddress,
RequiredQueryStringParameters = this.config.HttpApiHandler.GetUniversalQueryStrings(),
});
}

Expand Down
40 changes: 2 additions & 38 deletions src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Reflection;
using System.Text;
Expand Down Expand Up @@ -70,9 +69,6 @@ public class DurableTaskExtension :
#pragma warning disable CS0169
private readonly ITelemetryActivator telemetryActivator;
#pragma warning restore CS0169
#endif
#if FUNCTIONS_V3_OR_GREATER
private readonly LocalGrpcListener localGrpcListener;
#endif
private readonly bool isOptionsConfigured;
private readonly Guid extensionGuid;
Expand Down Expand Up @@ -202,10 +198,6 @@ public DurableTaskExtension(
runtimeType == WorkerRuntimeType.Custom)
{
this.OutOfProcProtocol = OutOfProcOrchestrationProtocol.MiddlewarePassthrough;
#if FUNCTIONS_V3_OR_GREATER
this.localGrpcListener = new LocalGrpcListener(this);
this.HostLifetimeService.OnStopped.Register(this.StopLocalGrpcServer);
#endif
}
else
{
Expand Down Expand Up @@ -464,12 +456,6 @@ void IExtensionConfigProvider.Initialize(ExtensionConfigContext context)

// The RPC server needs to be started sometime before any functions can be triggered
// and this is the latest point in the pipeline available to us.
#if FUNCTIONS_V3_OR_GREATER
if (this.OutOfProcProtocol == OutOfProcOrchestrationProtocol.MiddlewarePassthrough)
{
this.StartLocalGrpcServer();
}
#endif
#if FUNCTIONS_V2_OR_GREATER
if (this.OutOfProcProtocol == OutOfProcOrchestrationProtocol.OrchestratorShim)
{
Expand All @@ -478,18 +464,6 @@ void IExtensionConfigProvider.Initialize(ExtensionConfigContext context)
#endif
}

internal string GetLocalRpcAddress()
{
#if FUNCTIONS_V3_OR_GREATER
if (this.OutOfProcProtocol == OutOfProcOrchestrationProtocol.MiddlewarePassthrough)
{
return this.localGrpcListener.ListenAddress;
}
#endif

return this.HttpApiHandler.GetBaseUrl();
}

internal DurabilityProvider GetDurabilityProvider(DurableClientAttribute attribute)
{
return this.durabilityProviderFactory.GetDurabilityProvider(attribute);
Expand Down Expand Up @@ -566,18 +540,6 @@ private void StopLocalHttpServer()
}
#endif

#if FUNCTIONS_V3_OR_GREATER
private void StartLocalGrpcServer()
{
this.localGrpcListener.StartAsync().GetAwaiter().GetResult();
}

private void StopLocalGrpcServer()
{
this.localGrpcListener.StopAsync().GetAwaiter().GetResult();
}
#endif

private void ResolveAppSettingOptions()
{
if (this.Options == null)
Expand Down Expand Up @@ -1606,7 +1568,9 @@ private sealed class NoOpScaleMonitor : IScaleMonitor
/// <param name="name">A descriptive name.</param>
public NoOpScaleMonitor(string name)
{
#pragma warning disable CS0618 // Type or member is obsolete
this.Descriptor = new ScaleMonitorDescriptor(name);
#pragma warning restore CS0618 // Type or member is obsolete
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
using Microsoft.Azure.WebJobs.Extensions.DurableTask.ContextImplementations;
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation;
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Options;
using Microsoft.Azure.WebJobs.Host.Config;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;
using Microsoft.ApplicationInsights.Extensibility;
#else
using Microsoft.Azure.WebJobs.Extensions.DurableTask.ContextImplementations;
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Options;
Expand All @@ -22,6 +24,10 @@
using Microsoft.Extensions.DependencyInjection.Extensions;
#endif

#if FUNCTIONS_V3_OR_GREATER
using Microsoft.Azure.WebJobs.Extensions.Rpc;
#endif

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
/// <summary>
Expand Down Expand Up @@ -85,7 +91,7 @@ public static IWebJobsBuilder AddDurableTask(this IWebJobsBuilder builder)
throw new ArgumentNullException(nameof(builder));
}

builder
IWebJobsExtensionBuilder extension = builder
.AddExtension<DurableTaskExtension>()
.BindOptions<DurableTaskOptions>();

Expand All @@ -106,6 +112,23 @@ public static IWebJobsBuilder AddDurableTask(this IWebJobsBuilder builder)
serviceCollection.AddSingleton<IPlatformInformation, DefaultPlatformInformation>();
#pragma warning restore CS0612, CS0618 // Type or member is obsolete

#if FUNCTIONS_V3_OR_GREATER
serviceCollection.AddSingleton(sp =>
{
foreach (IExtensionConfigProvider cfg in sp.GetServices<IExtensionConfigProvider>())
{
if (cfg is DurableTaskExtension ext)
{
return ext;
}
}

throw new InvalidOperationException($"Unable to resolve service {typeof(DurableTaskExtension)}.");
});

extension.MapWorkerGrpcService<TaskHubGrpcServer>();
#endif

return builder;
}

Expand Down
Loading