forked from temporalio/samples-dotnet
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Update SDK and add context propagation sample (temporalio#68)
Fixes temporalio#14
- Loading branch information
Showing
12 changed files
with
413 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
169 changes: 169 additions & 0 deletions
169
src/ContextPropagation/ContextPropagationInterceptor.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,169 @@ | ||
namespace TemporalioSamples.ContextPropagation; | ||
|
||
using System.Threading.Tasks; | ||
using Temporalio.Api.Common.V1; | ||
using Temporalio.Client; | ||
using Temporalio.Client.Interceptors; | ||
using Temporalio.Converters; | ||
using Temporalio.Worker.Interceptors; | ||
using Temporalio.Workflows; | ||
|
||
/// <summary> | ||
/// General purpose interceptor that can be used to propagate async-local context through workflows | ||
/// and activities. This must be set on the client used for interacting with workflows and used for | ||
/// the worker. | ||
/// </summary> | ||
/// <typeparam name="T">Context data type.</typeparam> | ||
public class ContextPropagationInterceptor<T> : IClientInterceptor, IWorkerInterceptor | ||
{ | ||
private readonly AsyncLocal<T> context; | ||
private readonly IPayloadConverter payloadConverter; | ||
private readonly string headerKey; | ||
|
||
public ContextPropagationInterceptor( | ||
AsyncLocal<T> context, | ||
IPayloadConverter payloadConverter, | ||
string headerKey = "__my_context_key") | ||
{ | ||
this.context = context; | ||
this.payloadConverter = payloadConverter; | ||
this.headerKey = headerKey; | ||
} | ||
|
||
public ClientOutboundInterceptor InterceptClient(ClientOutboundInterceptor nextInterceptor) => | ||
new ContextPropagationClientOutboundInterceptor(this, nextInterceptor); | ||
|
||
public WorkflowInboundInterceptor InterceptWorkflow(WorkflowInboundInterceptor nextInterceptor) => | ||
new ContextPropagationWorkflowInboundInterceptor(this, nextInterceptor); | ||
|
||
public ActivityInboundInterceptor InterceptActivity(ActivityInboundInterceptor nextInterceptor) => | ||
new ContextPropagationActivityInboundInterceptor(this, nextInterceptor); | ||
|
||
private Dictionary<string, Payload> HeaderFromContext(IDictionary<string, Payload>? existing) | ||
{ | ||
var ret = existing != null ? | ||
new Dictionary<string, Payload>(existing) : new Dictionary<string, Payload>(1); | ||
ret[headerKey] = payloadConverter.ToPayload(context.Value); | ||
return ret; | ||
} | ||
|
||
private void WithHeadersApplied(IReadOnlyDictionary<string, Payload>? headers, Action func) => | ||
WithHeadersApplied( | ||
headers, | ||
() => | ||
{ | ||
func(); | ||
return (object?)null; | ||
}); | ||
|
||
private TResult WithHeadersApplied<TResult>( | ||
IReadOnlyDictionary<string, Payload>? headers, Func<TResult> func) | ||
{ | ||
if (headers?.TryGetValue(headerKey, out var payload) == true && payload != null) | ||
{ | ||
context.Value = payloadConverter.ToValue<T>(payload); | ||
} | ||
// These are async local, no need to unapply afterwards | ||
return func(); | ||
} | ||
|
||
private class ContextPropagationClientOutboundInterceptor : ClientOutboundInterceptor | ||
{ | ||
private readonly ContextPropagationInterceptor<T> root; | ||
|
||
public ContextPropagationClientOutboundInterceptor( | ||
ContextPropagationInterceptor<T> root, ClientOutboundInterceptor next) | ||
: base(next) => this.root = root; | ||
|
||
public override Task<WorkflowHandle<TWorkflow, TResult>> StartWorkflowAsync<TWorkflow, TResult>( | ||
StartWorkflowInput input) => | ||
base.StartWorkflowAsync<TWorkflow, TResult>( | ||
input with { Headers = root.HeaderFromContext(input.Headers) }); | ||
|
||
public override Task SignalWorkflowAsync(SignalWorkflowInput input) => | ||
base.SignalWorkflowAsync( | ||
input with { Headers = root.HeaderFromContext(input.Headers) }); | ||
|
||
public override Task<TResult> QueryWorkflowAsync<TResult>(QueryWorkflowInput input) => | ||
base.QueryWorkflowAsync<TResult>( | ||
input with { Headers = root.HeaderFromContext(input.Headers) }); | ||
|
||
public override Task<WorkflowUpdateHandle<TResult>> StartWorkflowUpdateAsync<TResult>( | ||
StartWorkflowUpdateInput input) => | ||
base.StartWorkflowUpdateAsync<TResult>( | ||
input with { Headers = root.HeaderFromContext(input.Headers) }); | ||
} | ||
|
||
private class ContextPropagationWorkflowInboundInterceptor : WorkflowInboundInterceptor | ||
{ | ||
private readonly ContextPropagationInterceptor<T> root; | ||
|
||
public ContextPropagationWorkflowInboundInterceptor( | ||
ContextPropagationInterceptor<T> root, WorkflowInboundInterceptor next) | ||
: base(next) => this.root = root; | ||
|
||
public override void Init(WorkflowOutboundInterceptor outbound) => | ||
base.Init(new ContextPropagationWorkflowOutboundInterceptor(root, outbound)); | ||
|
||
public override Task<object?> ExecuteWorkflowAsync(ExecuteWorkflowInput input) => | ||
root.WithHeadersApplied(Workflow.Info.Headers, () => Next.ExecuteWorkflowAsync(input)); | ||
|
||
public override Task HandleSignalAsync(HandleSignalInput input) => | ||
root.WithHeadersApplied(input.Headers, () => Next.HandleSignalAsync(input)); | ||
|
||
public override object? HandleQuery(HandleQueryInput input) => | ||
root.WithHeadersApplied(input.Headers, () => Next.HandleQuery(input)); | ||
|
||
public override void ValidateUpdate(HandleUpdateInput input) => | ||
root.WithHeadersApplied(input.Headers, () => Next.ValidateUpdate(input)); | ||
|
||
public override Task<object?> HandleUpdateAsync(HandleUpdateInput input) => | ||
root.WithHeadersApplied(input.Headers, () => Next.HandleUpdateAsync(input)); | ||
} | ||
|
||
private class ContextPropagationWorkflowOutboundInterceptor : WorkflowOutboundInterceptor | ||
{ | ||
private readonly ContextPropagationInterceptor<T> root; | ||
|
||
public ContextPropagationWorkflowOutboundInterceptor( | ||
ContextPropagationInterceptor<T> root, WorkflowOutboundInterceptor next) | ||
: base(next) => this.root = root; | ||
|
||
public override Task<TResult> ScheduleActivityAsync<TResult>( | ||
ScheduleActivityInput input) => | ||
Next.ScheduleActivityAsync<TResult>( | ||
input with { Headers = root.HeaderFromContext(input.Headers) }); | ||
|
||
public override Task<TResult> ScheduleLocalActivityAsync<TResult>( | ||
ScheduleLocalActivityInput input) => | ||
Next.ScheduleLocalActivityAsync<TResult>( | ||
input with { Headers = root.HeaderFromContext(input.Headers) }); | ||
|
||
public override Task SignalChildWorkflowAsync( | ||
SignalChildWorkflowInput input) => | ||
Next.SignalChildWorkflowAsync( | ||
input with { Headers = root.HeaderFromContext(input.Headers) }); | ||
|
||
public override Task SignalExternalWorkflowAsync( | ||
SignalExternalWorkflowInput input) => | ||
Next.SignalExternalWorkflowAsync( | ||
input with { Headers = root.HeaderFromContext(input.Headers) }); | ||
|
||
public override Task<ChildWorkflowHandle<TWorkflow, TResult>> StartChildWorkflowAsync<TWorkflow, TResult>( | ||
StartChildWorkflowInput input) => | ||
Next.StartChildWorkflowAsync<TWorkflow, TResult>( | ||
input with { Headers = root.HeaderFromContext(input.Headers) }); | ||
} | ||
|
||
private class ContextPropagationActivityInboundInterceptor : ActivityInboundInterceptor | ||
{ | ||
private readonly ContextPropagationInterceptor<T> root; | ||
|
||
public ContextPropagationActivityInboundInterceptor( | ||
ContextPropagationInterceptor<T> root, ActivityInboundInterceptor next) | ||
: base(next) => this.root = root; | ||
|
||
public override Task<object?> ExecuteActivityAsync(ExecuteActivityInput input) => | ||
root.WithHeadersApplied(input.Headers, () => Next.ExecuteActivityAsync(input)); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
namespace TemporalioSamples.ContextPropagation; | ||
|
||
public static class MyContext | ||
{ | ||
public static readonly AsyncLocal<string> UserId = new(); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
using Microsoft.Extensions.Logging; | ||
using Temporalio.Client; | ||
using Temporalio.Converters; | ||
using Temporalio.Worker; | ||
using TemporalioSamples.ContextPropagation; | ||
|
||
using var loggerFactory = LoggerFactory.Create(builder => | ||
builder. | ||
AddSimpleConsole(options => options.TimestampFormat = "[HH:mm:ss] "). | ||
SetMinimumLevel(LogLevel.Information)); | ||
var logger = loggerFactory.CreateLogger<Program>(); | ||
|
||
// Create a client to localhost on default namespace | ||
var client = await TemporalClient.ConnectAsync(new("localhost:7233") | ||
{ | ||
LoggerFactory = loggerFactory, | ||
// This is where we set the interceptor to propagate context | ||
Interceptors = new[] | ||
{ | ||
new ContextPropagationInterceptor<string>( | ||
MyContext.UserId, | ||
DataConverter.Default.PayloadConverter), | ||
}, | ||
}); | ||
|
||
async Task RunWorkerAsync() | ||
{ | ||
// Cancellation token cancelled on ctrl+c | ||
using var tokenSource = new CancellationTokenSource(); | ||
Console.CancelKeyPress += (_, eventArgs) => | ||
{ | ||
tokenSource.Cancel(); | ||
eventArgs.Cancel = true; | ||
}; | ||
|
||
// Run worker until cancelled | ||
logger.LogInformation("Running worker"); | ||
using var worker = new TemporalWorker( | ||
client, | ||
new TemporalWorkerOptions(taskQueue: "interceptors-sample"). | ||
AddAllActivities<SayHelloActivities>(new()). | ||
AddWorkflow<SayHelloWorkflow>()); | ||
try | ||
{ | ||
await worker.ExecuteAsync(tokenSource.Token); | ||
} | ||
catch (OperationCanceledException) | ||
{ | ||
logger.LogInformation("Worker cancelled"); | ||
} | ||
} | ||
|
||
async Task ExecuteWorkflowAsync() | ||
{ | ||
// Set our user ID that can be accessed in the workflow and activity | ||
MyContext.UserId.Value = "some-user"; | ||
|
||
// Start workflow, send signal, wait for completion, issue query | ||
logger.LogInformation("Executing workflow"); | ||
var handle = await client.StartWorkflowAsync( | ||
(SayHelloWorkflow wf) => wf.RunAsync("Temporal"), | ||
new(id: "interceptors-workflow-id", taskQueue: "interceptors-sample")); | ||
await handle.SignalAsync(wf => wf.SignalCompleteAsync()); | ||
var result = await handle.GetResultAsync(); | ||
_ = await handle.QueryAsync(wf => wf.IsComplete()); | ||
logger.LogInformation("Workflow result: {Result}", result); | ||
} | ||
|
||
switch (args.ElementAtOrDefault(0)) | ||
{ | ||
case "worker": | ||
await RunWorkerAsync(); | ||
break; | ||
case "workflow": | ||
await ExecuteWorkflowAsync(); | ||
break; | ||
default: | ||
throw new ArgumentException("Must pass 'worker' or 'workflow' as the single argument"); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
# Interceptors | ||
|
||
This sample demonstrates how to use interceptors to propagate contextual information from an `AsyncLocal` throughout the | ||
workflows and activities. While this demonstrates context propagation specifically, it can also be used to show how to | ||
create interceptors for any other purpose. | ||
|
||
To run, first see [README.md](../../README.md) for prerequisites. Then, run the following from this directory in a | ||
separate terminal to start the worker: | ||
|
||
dotnet run worker | ||
|
||
Then in another terminal, run the workflow from this directory: | ||
|
||
dotnet run workflow | ||
|
||
The workflow terminal will show the completed workflow result and the worker terminal will show the contextual user ID | ||
is present in the workflow and activity. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
namespace TemporalioSamples.ContextPropagation; | ||
|
||
using Microsoft.Extensions.Logging; | ||
using Temporalio.Activities; | ||
|
||
public class SayHelloActivities | ||
{ | ||
[Activity] | ||
public string SayHello(string name) | ||
{ | ||
ActivityExecutionContext.Current.Logger.LogInformation( | ||
"Activity called by user {UserId}", | ||
MyContext.UserId.Value); | ||
return $"Hello, {name}!"; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
namespace TemporalioSamples.ContextPropagation; | ||
|
||
using Microsoft.Extensions.Logging; | ||
using Temporalio.Workflows; | ||
|
||
[Workflow] | ||
public class SayHelloWorkflow | ||
{ | ||
private bool complete; | ||
|
||
[WorkflowRun] | ||
public async Task<string> RunAsync(string name) | ||
{ | ||
Workflow.Logger.LogInformation( | ||
"Workflow called by user {UserId}", | ||
MyContext.UserId.Value); | ||
|
||
// Wait for signal then run activity | ||
await Workflow.WaitConditionAsync(() => complete); | ||
return await Workflow.ExecuteActivityAsync( | ||
(SayHelloActivities act) => act.SayHello(name), | ||
new() { StartToCloseTimeout = TimeSpan.FromMinutes(5) }); | ||
} | ||
|
||
[WorkflowSignal] | ||
public async Task SignalCompleteAsync() | ||
{ | ||
Workflow.Logger.LogInformation( | ||
"Signal called by user {UserId}", | ||
MyContext.UserId.Value); | ||
complete = true; | ||
} | ||
|
||
[WorkflowQuery] | ||
public bool IsComplete() | ||
{ | ||
Workflow.Logger.LogInformation( | ||
"Query called by user {UserId}", | ||
MyContext.UserId.Value); | ||
return complete; | ||
} | ||
} |
Oops, something went wrong.