-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathJsonMessageHandler.cs
207 lines (189 loc) · 10.8 KB
/
JsonMessageHandler.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Text.Json;
using System.Text.Json.Serialization.Metadata;
using DotNext.Buffers;
namespace DotNext.Net.Cluster.Messaging;
/// <summary>
/// Helper base class to implement a two way message handler that gets called via <see cref="JsonMessage{TIn}"/>.
/// </summary>
/// <example>
/// <code>
/// public class ExampleHandler : JsonMessageHandler<ExampleDto, ExampleDto, ExampleHandler>, INameOfMessageHandler
/// {
/// private readonly ILogger<ExampleHandler> _logger;
/// public ExampleHandler(ILogger<ExampleHandler> logger) => _logger = logger;
///
/// public override Task<ExampleDto> OnMessage(ExampleDto message, ISubscriber sender, object? context, CancellationToken token)
/// {
/// _logger.LogInformation($"Got {message.MyCustomValue}");
/// return Task.FromResult<ExampleDto>(new("Got:" + message.MyCustomValue));
/// }
///
/// public static string Name => nameof(ExampleHandler);
/// }
/// </code>
/// and in Program.cs: <code>services.AddSingleton<IInputChannel, ExampleHandler>();</code>
/// </example>
/// <typeparam name="TIn">The instance type the message handler accepts. It must implement <see cref="IJsonMessageSerializable{TIn}"/>.</typeparam>
/// <typeparam name="TOut">The instance type the message handler will return. It must implement <see cref="IJsonMessageSerializable{TOut}"/>.</typeparam>
/// <typeparam name="TSelf">Implementing class that also implements <see cref="INameOfMessageHandler"/>.</typeparam>
public abstract class JsonMessageHandler<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicProperties | DynamicallyAccessedMemberTypes.PublicFields | DynamicallyAccessedMemberTypes.Interfaces | DynamicallyAccessedMemberTypes.PublicConstructors)]TIn, [DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicProperties | DynamicallyAccessedMemberTypes.PublicFields | DynamicallyAccessedMemberTypes.Interfaces | DynamicallyAccessedMemberTypes.PublicConstructors)]TOut, TSelf> : IInputChannel
where TIn : IJsonMessageSerializable<TIn>
where TOut : IJsonMessageSerializable<TOut>
where TSelf : JsonMessageHandler<TIn, TOut, TSelf>, INameOfMessageHandler
{
/// <summary>
/// Process the message.
/// </summary>
/// <param name="message">The message sent from <see cref="Messenger.SendJsonMessageAsync{TIn,TOut}" />.</param>
/// <param name="sender">Who sent the message.</param>
/// <param name="context">The context of the underlying network request.</param>
/// <param name="token">CancellationToken.</param>
/// <returns>Instance of <typeparamref name="TOut"/>.</returns>
public abstract Task<TOut> OnMessage(TIn message, ISubscriber sender, object? context, CancellationToken token);
/// <inheritdoc />
/// <returns>True for two-way messages matching the <see cref="INameOfMessageHandler.Name"/>.</returns>
bool IInputChannel.IsSupported(string messageName, bool oneWay) => !oneWay && TSelf.Name == messageName;
/// <inheritdoc />
async Task<IMessage> IInputChannel.ReceiveMessage(ISubscriber sender, IMessage message, object? context, CancellationToken token)
{
var inValue = TIn.TypeInfo is { } ty
? await JsonMessage<TIn>.FromJsonAsync(message, ty, TIn.Allocator, token).ConfigureAwait(false) ?? throw new("Invalid payload")
: await JsonMessage<TIn>.FromJsonAsync(message, TIn.Options, TIn.Allocator, token).ConfigureAwait(false) ?? throw new("Invalid payload");
var jsonMessageSerializable = await OnMessage(inValue, sender, context, token).ConfigureAwait(false);
return new JsonMessage<TOut>("Does it matter?!?", jsonMessageSerializable)
{ Options = TOut.Options, TypeInfo = TOut.TypeInfo };
}
/// <inheritdoc />
Task IInputChannel.ReceiveSignal(ISubscriber sender, IMessage signal, object? context, CancellationToken token) => throw new UnreachableException();
/// <summary>
/// Make a call to the Remote service on <paramref name="leader"/>.
/// </summary>
/// <param name="leader">Machine to call.</param>
/// <param name="message">Message to send.</param>
/// <param name="token">CancellationToken.</param>
/// <returns>Object from remote service.</returns>
public static Task<TOut?> RemoteCallAsync(ISubscriber leader, TIn message, CancellationToken token = default)
{
ValueTask<TOut?> ResponseReader(IMessage x, CancellationToken y) => TOut.TypeInfo is not null
? JsonMessage<TOut>.FromJsonAsync(x, TOut.TypeInfo, TOut.Allocator, token)
: JsonMessage<TOut>.FromJsonAsync(x, TOut.Options, TOut.Allocator, token);
return leader.SendMessageAsync(new JsonMessage<TIn>(TSelf.Name, message) { Options = TIn.Options, TypeInfo = TIn.TypeInfo }, ResponseReader, token);
}
}
/// <summary>
/// Helper base class to implement a one way message handler that gets called via <see cref="JsonMessage{TIn}"/>.
/// </summary>
/// <example>
/// <code>
/// public class ExampleBroadcastHandler : JsonMessageHandler<ExampleDto, ExampleBroadcastHandler>, INameOfMessageHandler
/// {
/// private readonly ILogger<ExampleBroadcastHandler> _logger;
/// public ExampleBroadcastHandler(ILogger<ExampleBroadcastHandler> logger) => _logger = logger;
///
/// public override Task OnMessage(ExampleDto message, ISubscriber sender, object? context, CancellationToken token)
/// {
/// _logger.LogInformation($"Got Broadcast {message.MyCustomValue}");
/// return Task.CompletedTask;
/// }
///
/// public static string Name => nameof(ExampleBroadcastHandler);
/// }
/// </code>
/// and in Program.cs: <code>services.AddSingleton<IInputChannel, ExampleBroadcastHandler>();</code>
/// </example>
/// <typeparam name="TIn">The instance type the message handler accepts.</typeparam>
/// <typeparam name="TSelf">Implementing class that also implements <see cref="INameOfMessageHandler"/>.</typeparam>
public abstract class JsonMessageHandler<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicProperties | DynamicallyAccessedMemberTypes.PublicFields | DynamicallyAccessedMemberTypes.Interfaces | DynamicallyAccessedMemberTypes.PublicConstructors)]TIn, TSelf> : IInputChannel
where TIn : IJsonMessageSerializable<TIn>
where TSelf : JsonMessageHandler<TIn, TSelf>, INameOfMessageHandler
{
/// <summary>
/// Process the message.
/// </summary>
/// <param name="message">The message sent from <see cref="RemoteCallAsync" /> or <see cref="BroadcastAsync"/>.</param>
/// <param name="sender">Who sent the message.</param>
/// <param name="context">The context of the underlying network request.</param>
/// <param name="token">CancellationToken.</param>
/// <returns>Task.</returns>
public abstract Task OnMessage(TIn message, ISubscriber sender, object? context, CancellationToken token);
/// <inheritdoc />
/// <returns>True for one-way messages matching the <see cref="INameOfMessageHandler.Name"/>.</returns>
bool IInputChannel.IsSupported(string messageName, bool oneWay) => oneWay && TSelf.Name == messageName;
/// <inheritdoc />
Task<IMessage> IInputChannel.ReceiveMessage(ISubscriber sender, IMessage message, object? context, CancellationToken token) => throw new UnreachableException();
/// <inheritdoc />
async Task IInputChannel.ReceiveSignal(ISubscriber sender, IMessage signal, object? context, CancellationToken token)
{
var inValue = TIn.TypeInfo is { } ty
? await JsonMessage<TIn>.FromJsonAsync(signal, ty, TIn.Allocator, token).ConfigureAwait(false) ?? throw new("Invalid payload")
: await JsonMessage<TIn>.FromJsonAsync(signal, TIn.Options, TIn.Allocator, token).ConfigureAwait(false) ?? throw new("Invalid payload");
await OnMessage(inValue, sender, context, token).ConfigureAwait(false);
}
/// <summary>
/// Make a call to the Remote service on <paramref name="leader"/>.
/// </summary>
/// <param name="leader">Machine to call.</param>
/// <param name="message">Message to send.</param>
/// <param name="requiresConfirmation"><see langword="true"/> to wait for confirmation of delivery from receiver; otherwise, <see langword="false"/>.</param>
/// <param name="token">CancellationToken.</param>
/// <returns>Task.</returns>
public static Task RemoteCallAsync(ISubscriber leader, TIn message, bool requiresConfirmation = true, CancellationToken token = default) =>
leader.SendSignalAsync(new JsonMessage<TIn>(TSelf.Name, message) { Options = TIn.Options, TypeInfo = TIn.TypeInfo }, requiresConfirmation, token);
/// <summary>
/// Send message to all services in cluster except current machine.
/// </summary>
/// <param name="cluster">Machine to call.</param>
/// <param name="message">Message to send.</param>
/// <param name="requiresConfirmation"><see langword="true"/> to wait for confirmation of delivery from receiver; otherwise, <see langword="false"/>.</param>
/// <returns>Task.</returns>
public static Task BroadcastAsync(IMessageBus cluster, TIn message, bool requiresConfirmation = true) =>
cluster.SendBroadcastSignalAsync(new JsonMessage<TIn>(TSelf.Name, message) { Options = TIn.Options, TypeInfo = TIn.TypeInfo }, requiresConfirmation);
}
/// <summary>
/// Implement in DTO classes to inform other classes how to handle serialization of the DTO to json.
/// </summary>
/// <example>
/// <code>
/// public partial class ExampleDto : IJsonMessageSerializable<ExampleDto>
/// {
/// public string MyCustomValue { get; set; }
///
/// public ExampleDto(string myCustomValue) => MyCustomValue = myCustomValue;
///
/// public static JsonSerializerOptions? Options => null;
/// public static JsonTypeInfo<ExampleDto>? TypeInfo => MyJsonContext.Default.ExampleDto;
/// public static MemoryAllocator<byte>? Allocator => null;
///
/// [JsonSerializable(typeof(ExampleDto))] private partial class MyJsonContext : JsonSerializerContext {}
/// }
/// </code>
/// </example>
/// <typeparam name="T">The DTO class itself.</typeparam>
public interface IJsonMessageSerializable<T>
where T : IJsonMessageSerializable<T>
{
/// <summary>
/// Optional set the serializer. Mutually exclusive to <see cref="TypeInfo"/>.
/// </summary>
static abstract JsonSerializerOptions? Options { get; }
/// <summary>
/// Optional set the serializer. Mutually exclusive to <see cref="Options"/>.
/// </summary>
static abstract JsonTypeInfo<T>? TypeInfo { get; }
/// <summary>
/// Optional set the MemoryAllocator.
/// </summary>
static abstract MemoryAllocator<byte>? Allocator { get; }
}
/// <summary>
/// Implement in class that inherits from <see cref="JsonMessageHandler{TIn, TSelf}"/> or <see cref="JsonMessageHandler{TIn, TOut, TSelf}"/>.
/// </summary>
public interface INameOfMessageHandler
{
/// <summary>
/// Name that message handler handles.
/// </summary>
static abstract string Name { get; }
}