diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 9cecc699200..cf7622e1269 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -1,6 +1,6 @@
name: MassTransit
env:
- MASSTRANSIT_VERSION: 8.2.2
+ MASSTRANSIT_VERSION: 8.2.3
on:
push:
paths:
diff --git a/Directory.Packages.props b/Directory.Packages.props
index 8a92afcb6ca..8a6459df940 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -8,7 +8,7 @@
-
+
@@ -26,16 +26,16 @@
-
+
-
+
-
-
+
+
@@ -73,14 +73,14 @@
-
-
+
+
all
runtime; build; native; contentfiles; analyzers; buildtransitive
-
+
diff --git a/MassTransit.sln.DotSettings b/MassTransit.sln.DotSettings
index 7b784e69f06..00bee3f9ac2 100644
--- a/MassTransit.sln.DotSettings
+++ b/MassTransit.sln.DotSettings
@@ -38,6 +38,7 @@
OUTDENT
USUAL_INDENT
OUTDENT
+ INDENT
1
False
False
diff --git a/doc/content/2.quick-starts/0.index.md b/doc/content/2.quick-starts/0.index.md
index 34c6ebe647c..d094ce73cf8 100755
--- a/doc/content/2.quick-starts/0.index.md
+++ b/doc/content/2.quick-starts/0.index.md
@@ -37,5 +37,12 @@ Transports
#description
Requires an AWS account
::
+
+ ::card
+ #title
+ [PostgreSQL](/quick-starts/postgresql)
+ #description
+ For smaller setups
+ ::
::
diff --git a/doc/content/2.quick-starts/2.rabbitmq.md b/doc/content/2.quick-starts/2.rabbitmq.md
index b6b135c0985..2171c4041a3 100644
--- a/doc/content/2.quick-starts/2.rabbitmq.md
+++ b/doc/content/2.quick-starts/2.rabbitmq.md
@@ -16,7 +16,10 @@ This tutorial will get you from zero to up and running with [RabbitMQ](/document
The following instructions assume you are starting from a completed [In-Memory Quick Start](/quick-starts/in-memory)
::
-This example requires a functioning installation of the .NET Runtime and SDK (at least 6.0) and a functioning installation of _Docker_ with _Docker Compose_ support enabled.
+This example requires the following:
+
+- a functioning installation of the .NET Runtime and SDK (at least 6.0)
+- a functioning installation of _Docker_ with _Docker Compose_ support enabled.
## Run RabbitMQ
diff --git a/doc/content/2.quick-starts/5.postgresql.md b/doc/content/2.quick-starts/5.postgresql.md
new file mode 100644
index 00000000000..9b597a2811b
--- /dev/null
+++ b/doc/content/2.quick-starts/5.postgresql.md
@@ -0,0 +1,125 @@
+---
+navigation.title: PostgreSQL
+---
+
+# PostgreSQL Quick Start
+
+> This tutorial will get you from zero to up and running with [SQL](/documentation/transports/sql) and MassTransit.
+
+> Walkthrough Video TBD
+
+- The source for this sample is available [on GitHub](https://github.com/MassTransit/Sample-GettingStarted).
+
+## Prerequisites
+
+::alert{type="info"}
+The following instructions assume you are starting from a completed [In-Memory Quick Start](/quick-starts/in-memory)
+::
+
+This example requires the following:
+
+- a functioning installation of the dotnet runtime and sdk (at least 6.0)
+- a functioning installation of _Docker_ with _Docker Compose_ support enabled.
+
+## Run PostgreSQL
+
+For this quick start, we recommend running the preconfigured [official Docker image of Postgres](https://hub.docker.com/_/postgres).
+
+```bash
+$ docker run -p 5432:5432 postgres
+```
+
+If you are running on an ARM platform
+
+```bash
+$ docker run --platform linux/arm64 -p 5432:5432 postgres
+```
+
+Once its up and running you can use your preferred tool to browse into the database.
+
+## Configure PostgreSQL
+
+Add the _MassTransit.SqlTransport.PostgreSQL_ package to the project.
+
+```bash
+$ dotnet add package MassTransit.SqlTransport.PostgreSQL
+```
+
+### Edit Program.cs
+
+Change _UsingInMemory_ to _UsingPostgres_ as shown below.
+
+```csharp
+public static IHostBuilder CreateHostBuilder(string[] args) =>
+ Host.CreateDefaultBuilder(args)
+ .ConfigureServices((hostContext, services) =>
+ {
+ services.AddOptions().Configure(options =>
+ {
+ options.Host = "localhost";
+ options.Database = "sample";
+ options.Schema = "transport";
+ options.Role = "transport";
+ options.Username = "masstransit";
+ options.Password = "H4rd2Gu3ss!";
+
+ // credentials to run migrations
+ options.AdminUsername = "migration-user";
+ options.AdminPassword = "H4rderTooGu3ss!!;
+ });
+ // MassTransit will run the migrations on start up
+ services.AddPostgresMigrationHostedService();
+ services.AddMassTransit(x =>
+ {
+ // elided...
+
+ x.UsingPostgres((context,cfg) =>
+ {
+ cfg.ConfigureEndpoints(context);
+ });
+ });
+
+ services.AddHostedService();
+ });
+```
+
+| Setting | Description |
+|-----------------|-------------------------------------------------------------------------------------------|
+| `Host` | The host to connect to. We are using `localhost` to connect to the docker container |
+| `Port` | We are using the default `5432`, so we aren't setting it. |
+| `Database` | The name of the database to connect to |
+| `Schema` | The schema to place the tables and functions inside of |
+| `Role` | the role to assign for all created tables, functions, etc. |
+| `Username` | The username of the user to login as for normal operations |
+| `Password` | The password of the user to login as for normal operations |
+| `AdminUsername` | The username of the admin user to login as when running migration commands |
+| `AdminPassword` | The password of the admin user to login as when running migration commands |
+
+
+## Run the Project
+
+```bash
+$ dotnet run
+```
+
+The output should have changed to show the message consumer generating the output (again, press Control+C to exit). Notice that the bus address now starts with _db_.
+
+```
+Building...
+info: MassTransit[0]
+ Configured endpoint Message, Consumer: GettingStarted.MessageConsumer
+info: Microsoft.Hosting.Lifetime[0]
+ Application started. Press Ctrl+C to shut down.
+info: Microsoft.Hosting.Lifetime[0]
+ Hosting environment: Development
+info: Microsoft.Hosting.Lifetime[0]
+ Content root path: /Users/chris/Garbage/start/GettingStarted
+info: MassTransit[0]
+ Bus started: db://localhost/
+info: GettingStarted.MessageConsumer[0]
+ Received Text: The time is 3/24/2021 12:11:10 PM -05:00
+```
+
+At this point the service is connecting to PostgreSQL on _localhost_ and publishing messages which are received by the consumer.
+
+:tada:
diff --git a/doc/content/3.documentation/2.configuration/0.index.md b/doc/content/3.documentation/2.configuration/0.index.md
index f56f24a4a4e..5f6051f6899 100755
--- a/doc/content/3.documentation/2.configuration/0.index.md
+++ b/doc/content/3.documentation/2.configuration/0.index.md
@@ -173,6 +173,47 @@ x.AddConfigureEndpointsCallback((name, cfg) =>
});
```
+## Endpoint Strategies
+
+Deciding how to configure receive endpoints in your application can be easy or hard, depending upon how much energy you want to spend being concerned with things that usually don't matter. However, there are nuances to the following approaches that should be considered.
+
+### One Consumer for Each Queue
+
+Creates a queue for each registered consumer, saga, and routing slip activity. Separate queues are created for execute and compensate if compensation is supported by the activity.
+
+::alert{type="info"}
+This is the preferred approach since it ensures that every consumer can be configured independently, including retries, delivery, and the outbox. It also ensures that messages for a consumer are not stuck behind other messages for other consumers sharing the same queue.
+::
+
+### Multiple Consumers on a Single Queue
+
+Configuring multiple consumers, while fully supported by MassTransit, may make sense in certain circumstances, however, proceed with caution as there are limitations to this approach.
+
+The recommendation here is to configure multiple consumers on a single queue only when those consumers are closely related in terms of business function and each consumer consumes distinct message types. An example might be consumers that each create, update, or delete an entity when the dependencies of those operations are different – create and update may depend upon a validation component, while delete may not share that dependency.
+
+#### Consume Multiple Message Types
+
+In situations where it is preferable to consume multiple message types from a single queue, create a consumer that consumes multiple message types by adding more IConsumer interface implementations to the consumer class.
+
+```csharp
+public class AddressConsumer :
+ IConsumer,
+ IConsumer
+{
+}
+```
+
+Sagas follow this approach, creating a single queue for each saga and configuring the broker to route message types consumed by the saga that are published to topic/exchanges to the saga’s queue.
+
+### All Consumers on a Single Queue
+
+This is never a good idea and is highly discouraged. While it is supported by MassTransit, it’s unlikely to be operationally sustainable.
+
+Routing slip activities must not be configured on a single queue as they will not work properly.
+
+
+
+
## Endpoint Name Formatters
_ConfigureEndpoints_ uses an `IEndpointNameFormatter` to format the queue names for all supported consumer types. The default endpoint name formatter returns _PascalCase_ class names without the namespace. There are several built-in endpoint name formatters included. For the _SubmitOrderConsumer_, the receive endpoint names would be formatted as shown below. Note that class suffixes such as _Consumer_, _Saga_, and _Activity_ are trimmed from the endpoint name by default.
diff --git a/src/MassTransit.Abstractions/Configuration/Middleware/IReceiveEndpointConfigurator.cs b/src/MassTransit.Abstractions/Configuration/Middleware/IReceiveEndpointConfigurator.cs
index 4b2ece69610..b4231ba427c 100644
--- a/src/MassTransit.Abstractions/Configuration/Middleware/IReceiveEndpointConfigurator.cs
+++ b/src/MassTransit.Abstractions/Configuration/Middleware/IReceiveEndpointConfigurator.cs
@@ -60,6 +60,12 @@ public interface IReceiveEndpointConfigurator :
void ConfigureMessageTopology(bool enabled = true)
where T : class;
+ ///
+ /// Configures whether the broker topology is configured for the specified message type. Related to
+ /// , but for an individual message type.
+ ///
+ void ConfigureMessageTopology(Type messageType, bool enabled = true);
+
[EditorBrowsable(EditorBrowsableState.Never)]
void AddEndpointSpecification(IReceiveEndpointSpecification configurator);
diff --git a/src/MassTransit.Abstractions/Configuration/Topology/IConsumeTopologyConfigurator.cs b/src/MassTransit.Abstractions/Configuration/Topology/IConsumeTopologyConfigurator.cs
index 5149dc1fd80..4ff33747245 100644
--- a/src/MassTransit.Abstractions/Configuration/Topology/IConsumeTopologyConfigurator.cs
+++ b/src/MassTransit.Abstractions/Configuration/Topology/IConsumeTopologyConfigurator.cs
@@ -1,5 +1,6 @@
namespace MassTransit
{
+ using System;
using Configuration;
@@ -15,19 +16,17 @@ public interface IConsumeTopologyConfigurator :
new IMessageConsumeTopologyConfigurator GetMessageTopology()
where T : class;
+ ///
+ /// Returns the specification for the message type
+ ///
+ ///
+ IMessageConsumeTopologyConfigurator GetMessageTopology(Type messageType);
+
///
/// Adds a convention to the topology, which will be applied to every message type
/// requested, to determine if a convention for the message type is available.
///
/// The Consume topology convention
bool TryAddConvention(IConsumeTopologyConvention convention);
-
- ///
- /// Add a Consume topology for a specific message type
- ///
- /// The message type
- /// The topology
- void AddMessageConsumeTopology(IMessageConsumeTopology topology)
- where T : class;
}
}
diff --git a/src/MassTransit.Abstractions/Configuration/Topology/IMessageConsumeTopologyConfigurator.cs b/src/MassTransit.Abstractions/Configuration/Topology/IMessageConsumeTopologyConfigurator.cs
index 15a4841f5f2..71b3180f072 100644
--- a/src/MassTransit.Abstractions/Configuration/Topology/IMessageConsumeTopologyConfigurator.cs
+++ b/src/MassTransit.Abstractions/Configuration/Topology/IMessageConsumeTopologyConfigurator.cs
@@ -14,12 +14,6 @@ public interface IMessageConsumeTopologyConfigurator :
IMessageConsumeTopology
where TMessage : class
{
- ///
- /// Specify whether the broker topology should be configured for this message type
- /// (defaults to true)
- ///
- bool ConfigureConsumeTopology { get; set; }
-
void Add(IMessageConsumeTopology consumeTopology);
///
@@ -60,6 +54,12 @@ void AddOrUpdateConvention(Func add, Func
+ /// Specify whether the broker topology should be configured for this message type
+ /// (defaults to true)
+ ///
+ bool ConfigureConsumeTopology { get; set; }
+
bool TryAddConvention(IConsumeTopologyConvention convention);
}
}
diff --git a/src/MassTransit.Abstractions/Configuration/Topology/IMessagePublishTopologyConfigurator.cs b/src/MassTransit.Abstractions/Configuration/Topology/IMessagePublishTopologyConfigurator.cs
index 0e399b0f39c..fd2a3bac811 100644
--- a/src/MassTransit.Abstractions/Configuration/Topology/IMessagePublishTopologyConfigurator.cs
+++ b/src/MassTransit.Abstractions/Configuration/Topology/IMessagePublishTopologyConfigurator.cs
@@ -50,5 +50,7 @@ public interface IMessagePublishTopologyConfigurator :
/// Exclude the message type from being created as a topic/exchange.
///
new bool Exclude { set; }
+
+ bool TryAddConvention(IPublishTopologyConvention convention);
}
}
diff --git a/src/MassTransit.Abstractions/Configuration/Topology/IMessageSendTopologyConfigurator.cs b/src/MassTransit.Abstractions/Configuration/Topology/IMessageSendTopologyConfigurator.cs
index cb61850edfc..fb14d58c691 100644
--- a/src/MassTransit.Abstractions/Configuration/Topology/IMessageSendTopologyConfigurator.cs
+++ b/src/MassTransit.Abstractions/Configuration/Topology/IMessageSendTopologyConfigurator.cs
@@ -64,5 +64,6 @@ bool TryGetConvention([NotNullWhen(true)] out TConvention? conventi
public interface IMessageSendTopologyConfigurator :
ISpecification
{
+ bool TryAddConvention(ISendTopologyConvention convention);
}
}
diff --git a/src/MassTransit.Abstractions/MediatorRequestExtensions.cs b/src/MassTransit.Abstractions/MediatorRequestExtensions.cs
index 5656ae01e82..8d9702c712d 100644
--- a/src/MassTransit.Abstractions/MediatorRequestExtensions.cs
+++ b/src/MassTransit.Abstractions/MediatorRequestExtensions.cs
@@ -14,14 +14,16 @@ public static class MediatorRequestExtensions
///
/// The request message
///
+ ///
/// The response type
/// The response object
- public static async Task SendRequest(this IMediator mediator, Request request, CancellationToken cancellationToken = default)
+ public static async Task SendRequest(this IMediator mediator, Request request, CancellationToken cancellationToken = default,
+ RequestTimeout timeout = default)
where T : class
{
try
{
- using RequestHandle> handle = mediator.CreateRequest(request, cancellationToken);
+ using RequestHandle> handle = mediator.CreateRequest(request, cancellationToken, timeout);
Response response = await handle.GetResponse().ConfigureAwait(false);
diff --git a/src/MassTransit.Abstractions/Scheduling/IMessageScheduler.cs b/src/MassTransit.Abstractions/Scheduling/IMessageScheduler.cs
index 066f0e3a3e8..455ed848a9a 100644
--- a/src/MassTransit.Abstractions/Scheduling/IMessageScheduler.cs
+++ b/src/MassTransit.Abstractions/Scheduling/IMessageScheduler.cs
@@ -151,7 +151,8 @@ Task> ScheduleSend(Uri destinationAddress, DateTime sched
///
/// The destination address of the scheduled message
/// The tokenId of the scheduled message
- Task CancelScheduledSend(Uri destinationAddress, Guid tokenId);
+ ///
+ Task CancelScheduledSend(Uri destinationAddress, Guid tokenId, CancellationToken cancellationToken = default);
///
/// Send a message
@@ -278,7 +279,8 @@ Task> SchedulePublish(DateTime scheduledTime, object valu
/// the destinationAddress.
///
/// The tokenId of the scheduled message
- Task CancelScheduledPublish(Guid tokenId)
+ ///
+ Task CancelScheduledPublish(Guid tokenId, CancellationToken cancellationToken = default)
where T : class;
///
@@ -287,6 +289,7 @@ Task CancelScheduledPublish(Guid tokenId)
///
///
/// The tokenId of the scheduled message
- Task CancelScheduledPublish(Type messageType, Guid tokenId);
+ ///
+ Task CancelScheduledPublish(Type messageType, Guid tokenId, CancellationToken cancellationToken = default);
}
}
diff --git a/src/MassTransit.Abstractions/Scheduling/IScheduleMessageProvider.cs b/src/MassTransit.Abstractions/Scheduling/IScheduleMessageProvider.cs
index 78e354256b3..f1875f6b35b 100644
--- a/src/MassTransit.Abstractions/Scheduling/IScheduleMessageProvider.cs
+++ b/src/MassTransit.Abstractions/Scheduling/IScheduleMessageProvider.cs
@@ -25,13 +25,15 @@ Task> ScheduleSend(Uri destinationAddress, DateTime sched
/// Cancel a scheduled message by TokenId
///
/// The tokenId of the scheduled message
- Task CancelScheduledSend(Guid tokenId);
+ ///
+ Task CancelScheduledSend(Guid tokenId, CancellationToken cancellationToken);
///
/// Cancel a scheduled message by TokenId
///
/// The destination address of the scheduled message
/// The tokenId of the scheduled message
- Task CancelScheduledSend(Uri destinationAddress, Guid tokenId);
+ ///
+ Task CancelScheduledSend(Uri destinationAddress, Guid tokenId, CancellationToken cancellationToken);
}
}
diff --git a/src/MassTransit.Abstractions/Topology/Topology/MessagePublishTopology.cs b/src/MassTransit.Abstractions/Topology/Topology/MessagePublishTopology.cs
index 68371f5b37d..6ec60a14e6e 100644
--- a/src/MassTransit.Abstractions/Topology/Topology/MessagePublishTopology.cs
+++ b/src/MassTransit.Abstractions/Topology/Topology/MessagePublishTopology.cs
@@ -78,6 +78,12 @@ public bool TryAddConvention(IMessagePublishTopologyConvention convent
return true;
}
+ public bool TryAddConvention(IPublishTopologyConvention convention)
+ {
+ return convention.TryGetMessagePublishTopologyConvention(out IMessagePublishTopologyConvention messagePublishTopologyConvention)
+ && TryAddConvention(messagePublishTopologyConvention);
+ }
+
public void AddOrUpdateConvention(Func add, Func update)
where TConvention : class, IMessagePublishTopologyConvention
{
diff --git a/src/MassTransit.Abstractions/Topology/Topology/MessageSendTopology.cs b/src/MassTransit.Abstractions/Topology/Topology/MessageSendTopology.cs
index d79498d62d8..531ef98fb98 100644
--- a/src/MassTransit.Abstractions/Topology/Topology/MessageSendTopology.cs
+++ b/src/MassTransit.Abstractions/Topology/Topology/MessageSendTopology.cs
@@ -77,6 +77,12 @@ public bool TryAddConvention(IMessageSendTopologyConvention convention
return true;
}
+ public bool TryAddConvention(ISendTopologyConvention convention)
+ {
+ return convention.TryGetMessageSendTopologyConvention(out IMessageSendTopologyConvention messageSendTopologyConvention)
+ && TryAddConvention(messageSendTopologyConvention);
+ }
+
public void UpdateConvention(Func update)
where TConvention : class, IMessageSendTopologyConvention
{
diff --git a/src/MassTransit.Abstractions/Topology/Topology/PublishTopology.cs b/src/MassTransit.Abstractions/Topology/Topology/PublishTopology.cs
index 5adbf571e5c..5cf9df73e6d 100644
--- a/src/MassTransit.Abstractions/Topology/Topology/PublishTopology.cs
+++ b/src/MassTransit.Abstractions/Topology/Topology/PublishTopology.cs
@@ -14,19 +14,18 @@ public class PublishTopology :
{
readonly List _conventions;
readonly object _lock = new object();
- readonly ConcurrentDictionary _messageTypeFactoryCache;
readonly ConcurrentDictionary> _messageTypes;
+ readonly ConcurrentDictionary _messageTypeSelectorCache;
readonly PublishTopologyConfigurationObservable _observers;
public PublishTopology()
{
_messageTypes = new ConcurrentDictionary>();
-
- _observers = new PublishTopologyConfigurationObservable();
- _messageTypeFactoryCache = new ConcurrentDictionary();
+ _messageTypeSelectorCache = new ConcurrentDictionary();
_conventions = new List(8);
+ _observers = new PublishTopologyConfigurationObservable();
_observers.Connect(this);
}
@@ -47,8 +46,7 @@ IMessagePublishTopologyConfigurator IPublishTopologyConfigurator.GetMessageTo
public bool TryGetPublishAddress(Type messageType, Uri baseAddress, out Uri? publishAddress)
{
- return _messageTypes.GetOrAdd(messageType, type => new Lazy(() => CreateMessageType(type)))
- .Value.TryGetPublishAddress(baseAddress, out publishAddress);
+ return GetMessageTopology(messageType).TryGetPublishAddress(baseAddress, out publishAddress);
}
public ConnectHandle ConnectPublishTopologyConfigurationObserver(IPublishTopologyConfigurationObserver observer)
@@ -58,10 +56,10 @@ public ConnectHandle ConnectPublishTopologyConfigurationObserver(IPublishTopolog
public bool TryAddConvention(IPublishTopologyConvention convention)
{
+ var conventionType = convention.GetType();
+
lock (_lock)
{
- var conventionType = convention.GetType();
-
for (var i = 0; i < _conventions.Count; i++)
{
if (_conventions[i].GetType() == conventionType)
@@ -69,8 +67,12 @@ public bool TryAddConvention(IPublishTopologyConvention convention)
}
_conventions.Add(convention);
- return true;
}
+
+ foreach (Lazy messagePublishTopologyConfigurator in _messageTypes.Values)
+ messagePublishTopologyConfigurator.Value.TryAddConvention(convention);
+
+ return true;
}
void IPublishTopologyConfigurator.AddMessagePublishTopology(IMessagePublishTopology topology)
@@ -95,7 +97,8 @@ public IMessagePublishTopologyConfigurator GetMessageTopology(Type messageType)
if (MessageTypeCache.IsValidMessageType(messageType) == false)
throw new ArgumentException(MessageTypeCache.InvalidMessageTypeReason(messageType), nameof(messageType));
- return GetOrAddByMessageType(messageType).CreateMessageType();
+ return _messageTypeSelectorCache.GetOrAdd(messageType, _ => Activation.Activate(messageType, new MessageTypeSelectorFactory(), this))
+ .GetMessageTopology();
}
protected virtual IMessagePublishTopologyConfigurator CreateMessageTopology()
@@ -150,27 +153,6 @@ void ApplyConventionsToMessageTopology(IMessagePublishTopologyConfigurator
}
}
- IMessagePublishTopologyConfigurator CreateMessageType(Type messageType)
- {
- return GetOrAddByMessageType(messageType).CreateMessageType();
- }
-
- IMessageTypeFactory GetOrAddByMessageType(Type type)
- {
- return _messageTypeFactoryCache.GetOrAdd(type, _ => Activation.Activate(type, new Factory(), this));
- }
-
-
- readonly struct Factory :
- IActivationType
- {
- public IMessageTypeFactory ActivateType(PublishTopology publishTopology)
- where T : class
- {
- return new MessageTypeFactory(publishTopology);
- }
- }
-
class ImplementedMessageTypeConnector :
IImplementedMessageType
@@ -190,26 +172,37 @@ public void ImplementsMessageType(bool direct)
}
- interface IMessageTypeFactory
+ readonly struct MessageTypeSelectorFactory :
+ IActivationType
+ {
+ public IMessageTypeSelector ActivateType(PublishTopology consumeTopology)
+ where T : class
+ {
+ return new MessageTypeSelector(consumeTopology);
+ }
+ }
+
+
+ interface IMessageTypeSelector
{
- IMessagePublishTopologyConfigurator CreateMessageType();
+ IMessagePublishTopologyConfigurator GetMessageTopology();
}
- class MessageTypeFactory :
- IMessageTypeFactory
+ class MessageTypeSelector :
+ IMessageTypeSelector
where T : class
{
readonly PublishTopology _publishTopology;
- public MessageTypeFactory(PublishTopology publishTopology)
+ public MessageTypeSelector(PublishTopology publishTopology)
{
_publishTopology = publishTopology;
}
- public IMessagePublishTopologyConfigurator CreateMessageType()
+ public IMessagePublishTopologyConfigurator GetMessageTopology()
{
- return _publishTopology.CreateMessageTopology();
+ return _publishTopology.GetMessageTopology();
}
}
}
diff --git a/src/MassTransit.Abstractions/Topology/Topology/SendTopology.cs b/src/MassTransit.Abstractions/Topology/Topology/SendTopology.cs
index 27440e6519e..2a6bdfe4670 100644
--- a/src/MassTransit.Abstractions/Topology/Topology/SendTopology.cs
+++ b/src/MassTransit.Abstractions/Topology/Topology/SendTopology.cs
@@ -57,10 +57,10 @@ public ConnectHandle ConnectSendTopologyConfigurationObserver(ISendTopologyConfi
public bool TryAddConvention(ISendTopologyConvention convention)
{
+ var conventionType = convention.GetType();
+
lock (_lock)
{
- var conventionType = convention.GetType();
-
for (var i = 0; i < _conventions.Count; i++)
{
if (_conventions[i].GetType() == conventionType)
@@ -68,8 +68,12 @@ public bool TryAddConvention(ISendTopologyConvention convention)
}
_conventions.Add(convention);
- return true;
}
+
+ foreach (Lazy messageSendTopologyConfigurator in _messageTypes.Values)
+ messageSendTopologyConfigurator.Value.TryAddConvention(convention);
+
+ return true;
}
void ISendTopologyConfigurator.AddMessageSendTopology(IMessageSendTopology topology)
diff --git a/src/MassTransit.TestFramework/Futures/BatchCompleted.cs b/src/MassTransit.TestFramework/Futures/BatchCompleted.cs
new file mode 100644
index 00000000000..7f41a5b2129
--- /dev/null
+++ b/src/MassTransit.TestFramework/Futures/BatchCompleted.cs
@@ -0,0 +1,11 @@
+namespace MassTransit.TestFramework.Futures;
+
+using System;
+using System.Collections.Generic;
+
+
+public interface BatchCompleted
+{
+ public Guid CorrelationId { get; }
+ public IReadOnlyList ProcessedJobsNumbers { get; }
+}
diff --git a/src/MassTransit.TestFramework/Futures/BatchFaulted.cs b/src/MassTransit.TestFramework/Futures/BatchFaulted.cs
new file mode 100644
index 00000000000..7e7e2bbad8d
--- /dev/null
+++ b/src/MassTransit.TestFramework/Futures/BatchFaulted.cs
@@ -0,0 +1,11 @@
+namespace MassTransit.TestFramework.Futures;
+
+using System;
+using System.Collections.Generic;
+
+
+public interface BatchFaulted
+{
+ public Guid CorrelationId { get; }
+ public IReadOnlyList ProcessedJobsNumbers { get; }
+}
diff --git a/src/MassTransit.TestFramework/Futures/BatchFuture.cs b/src/MassTransit.TestFramework/Futures/BatchFuture.cs
new file mode 100644
index 00000000000..4e074bd0428
--- /dev/null
+++ b/src/MassTransit.TestFramework/Futures/BatchFuture.cs
@@ -0,0 +1,46 @@
+namespace MassTransit.TestFramework.Futures;
+
+using System.Collections.Generic;
+using System.Linq;
+
+
+public class BatchFuture :
+ Future
+{
+ public BatchFuture()
+ {
+ ConfigureCommand(x => x.CorrelateById(context => context.Message.CorrelationId));
+
+ SendRequests(x => x.JobNumbers,
+ x =>
+ {
+ x.UsingRequestInitializer(context => new
+ {
+ CorrelationId = InVar.Id,
+ JobNumber = context.Message
+ });
+ x.TrackPendingRequest(message => message.CorrelationId);
+ })
+ .OnResponseReceived(x =>
+ {
+ x.CompletePendingRequest(y => y.CorrelationId);
+ });
+
+ WhenAllCompleted(r => r.SetCompletedUsingInitializer(MapResponse));
+ WhenAllCompletedOrFaulted(r => r.SetFaultedUsingInitializer(MapResponse));
+ }
+
+ object MapResponse(BehaviorContext context)
+ {
+ var command = context.GetCommand();
+ List processedJobNumbers = context
+ .SelectResults()
+ .Select(r => r.JobNumber).ToList();
+
+ return new
+ {
+ command.CorrelationId,
+ ProcessedJobsNumbers = processedJobNumbers
+ };
+ }
+}
diff --git a/src/MassTransit.TestFramework/Futures/BatchRequest.cs b/src/MassTransit.TestFramework/Futures/BatchRequest.cs
new file mode 100644
index 00000000000..1b3be1a7c24
--- /dev/null
+++ b/src/MassTransit.TestFramework/Futures/BatchRequest.cs
@@ -0,0 +1,12 @@
+namespace MassTransit.TestFramework.Futures;
+
+using System;
+using System.Collections.Generic;
+
+
+public interface BatchRequest
+{
+ public DateTime? BatchExpiry { get; }
+ public Guid CorrelationId { get; }
+ public IReadOnlyList JobNumbers { get; }
+}
diff --git a/src/MassTransit.TestFramework/Futures/ProcessBatchItem.cs b/src/MassTransit.TestFramework/Futures/ProcessBatchItem.cs
new file mode 100644
index 00000000000..2e0d09c206d
--- /dev/null
+++ b/src/MassTransit.TestFramework/Futures/ProcessBatchItem.cs
@@ -0,0 +1,10 @@
+namespace MassTransit.TestFramework.Futures;
+
+using System;
+
+
+public interface ProcessBatchItem
+{
+ public Guid CorrelationId { get; }
+ public string JobNumber { get; }
+}
diff --git a/src/MassTransit.TestFramework/Futures/ProcessBatchItemCompleted.cs b/src/MassTransit.TestFramework/Futures/ProcessBatchItemCompleted.cs
new file mode 100644
index 00000000000..111d2774987
--- /dev/null
+++ b/src/MassTransit.TestFramework/Futures/ProcessBatchItemCompleted.cs
@@ -0,0 +1,10 @@
+namespace MassTransit.TestFramework.Futures;
+
+using System;
+
+
+public interface ProcessBatchItemCompleted
+{
+ public Guid CorrelationId { get; }
+ public string JobNumber { get; }
+}
diff --git a/src/MassTransit.TestFramework/Futures/ProcessBatchItemConsumer.cs b/src/MassTransit.TestFramework/Futures/ProcessBatchItemConsumer.cs
new file mode 100644
index 00000000000..9df157480fb
--- /dev/null
+++ b/src/MassTransit.TestFramework/Futures/ProcessBatchItemConsumer.cs
@@ -0,0 +1,29 @@
+namespace MassTransit.TestFramework.Futures;
+
+using System;
+using System.Threading.Tasks;
+
+
+public class ProcessBatchItemConsumer :
+ IConsumer
+{
+ public Task Consume(ConsumeContext context)
+ {
+ async Task WaitAndRespond(int milliSecond)
+ {
+ await Task.Delay(milliSecond);
+ await context.RespondAsync(new
+ {
+ context.Message.CorrelationId,
+ context.Message.JobNumber
+ });
+ }
+
+ return context.Message.JobNumber switch
+ {
+ "Delay" => WaitAndRespond(2000),
+ "Error" => throw new InvalidOperationException(),
+ _ => WaitAndRespond(0)
+ };
+ }
+}
diff --git a/src/MassTransit.TestFramework/Futures/Tests/BatchFuture_Specs.cs b/src/MassTransit.TestFramework/Futures/Tests/BatchFuture_Specs.cs
new file mode 100644
index 00000000000..8c1f6d374bb
--- /dev/null
+++ b/src/MassTransit.TestFramework/Futures/Tests/BatchFuture_Specs.cs
@@ -0,0 +1,20 @@
+namespace MassTransit.TestFramework.Futures.Tests;
+
+using NUnit.Framework;
+
+
+[TestFixture]
+public class BatchFuture_Specs :
+ FutureTestFixture
+{
+ public BatchFuture_Specs(IFutureTestFixtureConfigurator testFixtureConfigurator)
+ : base(testFixtureConfigurator)
+ {
+ }
+
+ protected override void ConfigureMassTransit(IBusRegistrationConfigurator configurator)
+ {
+ configurator.AddConsumer();
+ configurator.AddFuture();
+ }
+}
diff --git a/src/MassTransit.TestFramework/TestConsumeContext.cs b/src/MassTransit.TestFramework/TestConsumeContext.cs
index da058159341..06711ee6a50 100644
--- a/src/MassTransit.TestFramework/TestConsumeContext.cs
+++ b/src/MassTransit.TestFramework/TestConsumeContext.cs
@@ -20,7 +20,7 @@ public static class TestConsumeContext
static InMemoryReceiveEndpointContext Build()
{
- var topologyConfiguration = new InMemoryTopologyConfiguration(InMemoryBus.MessageTopology);
+ var topologyConfiguration = new InMemoryTopologyConfiguration(InMemoryBus.CreateMessageTopology());
IInMemoryBusConfiguration busConfiguration = new InMemoryBusConfiguration(topologyConfiguration, null);
var receiveEndpointConfiguration = busConfiguration.HostConfiguration.CreateReceiveEndpointConfiguration("input-queue");
diff --git a/src/MassTransit/Configuration/Configuration/ReceiveEndpointConfiguration.cs b/src/MassTransit/Configuration/Configuration/ReceiveEndpointConfiguration.cs
index a253815095d..37238237129 100644
--- a/src/MassTransit/Configuration/Configuration/ReceiveEndpointConfiguration.cs
+++ b/src/MassTransit/Configuration/Configuration/ReceiveEndpointConfiguration.cs
@@ -105,6 +105,11 @@ public void ConfigureMessageTopology(bool enabled = true)
Topology.Consume.GetMessageTopology().ConfigureConsumeTopology = enabled;
}
+ public void ConfigureMessageTopology(Type messageType, bool enabled = true)
+ {
+ Topology.Consume.GetMessageTopology(messageType).ConfigureConsumeTopology = enabled;
+ }
+
public void AddDependency(IReceiveEndpointDependency dependency)
{
_dependencies.Add(dependency);
diff --git a/src/MassTransit/Configuration/Configuration/ReceiverConfiguration.cs b/src/MassTransit/Configuration/Configuration/ReceiverConfiguration.cs
index b64c9426af9..9cc9a969fef 100644
--- a/src/MassTransit/Configuration/Configuration/ReceiverConfiguration.cs
+++ b/src/MassTransit/Configuration/Configuration/ReceiverConfiguration.cs
@@ -54,6 +54,10 @@ public void ConfigureMessageTopology(bool enabled = true)
{
}
+ public void ConfigureMessageTopology(Type messageType, bool enabled = true)
+ {
+ }
+
public void AddEndpointSpecification(IReceiveEndpointSpecification specification)
{
Specifications.Add(specification);
diff --git a/src/MassTransit/Configuration/DependencyInjection/TestHarnessOptions.cs b/src/MassTransit/Configuration/DependencyInjection/TestHarnessOptions.cs
index 81e8b65d6b4..916a837a25d 100644
--- a/src/MassTransit/Configuration/DependencyInjection/TestHarnessOptions.cs
+++ b/src/MassTransit/Configuration/DependencyInjection/TestHarnessOptions.cs
@@ -7,6 +7,6 @@ namespace MassTransit
public class TestHarnessOptions
{
public TimeSpan TestTimeout { get; set; } = Debugger.IsAttached ? TimeSpan.FromMinutes(50) : TimeSpan.FromSeconds(30);
- public TimeSpan TestInactivityTimeout { get; set; } = TimeSpan.FromSeconds(1.2);
+ public TimeSpan TestInactivityTimeout { get; set; } = Debugger.IsAttached ? TimeSpan.FromMinutes(30) : TimeSpan.FromSeconds(1.2);
}
}
diff --git a/src/MassTransit/Configuration/MediatorConfigurationExtensions.cs b/src/MassTransit/Configuration/MediatorConfigurationExtensions.cs
index 81531a7eac5..87afdc37dc3 100644
--- a/src/MassTransit/Configuration/MediatorConfigurationExtensions.cs
+++ b/src/MassTransit/Configuration/MediatorConfigurationExtensions.cs
@@ -21,7 +21,7 @@ public static IMediator CreateMediator(this IBusFactorySelector selector, Action
if (configure == null)
throw new ArgumentNullException(nameof(configure));
- var topologyConfiguration = new InMemoryTopologyConfiguration(InMemoryBus.MessageTopology);
+ var topologyConfiguration = new InMemoryTopologyConfiguration(InMemoryBus.CreateMessageTopology());
var busConfiguration = new InMemoryBusConfiguration(topologyConfiguration, new Uri("loopback://localhost"));
if (LogContext.Current != null)
diff --git a/src/Transports/MassTransit.Azure.ServiceBus.Core/Configuration/Topology/ServiceBusPartitionKeyConventionExtensions.cs b/src/MassTransit/Configuration/PartitionKeyConventionExtensions.cs
similarity index 85%
rename from src/Transports/MassTransit.Azure.ServiceBus.Core/Configuration/Topology/ServiceBusPartitionKeyConventionExtensions.cs
rename to src/MassTransit/Configuration/PartitionKeyConventionExtensions.cs
index 4e087189087..c18a7129360 100644
--- a/src/Transports/MassTransit.Azure.ServiceBus.Core/Configuration/Topology/ServiceBusPartitionKeyConventionExtensions.cs
+++ b/src/MassTransit/Configuration/PartitionKeyConventionExtensions.cs
@@ -1,22 +1,21 @@
-namespace MassTransit
+namespace MassTransit
{
using System;
- using AzureServiceBusTransport;
- using AzureServiceBusTransport.Configuration;
+ using Configuration;
+ using Transports;
- public static class ServiceBusPartitionKeyConventionExtensions
+ public static class PartitionKeyConventionExtensions
{
public static void UsePartitionKeyFormatter(this IMessageSendTopologyConfigurator configurator, IMessagePartitionKeyFormatter formatter)
where T : class
{
- configurator.UpdateConvention>(
- update =>
- {
- update.SetFormatter(formatter);
+ configurator.UpdateConvention>(update =>
+ {
+ update.SetFormatter(formatter);
- return update;
- });
+ return update;
+ });
}
///
diff --git a/src/MassTransit/Configuration/RoutingKeyConventionExtensions.cs b/src/MassTransit/Configuration/RoutingKeyConventionExtensions.cs
index 96cbbbee891..d5fcdc61bcc 100644
--- a/src/MassTransit/Configuration/RoutingKeyConventionExtensions.cs
+++ b/src/MassTransit/Configuration/RoutingKeyConventionExtensions.cs
@@ -10,13 +10,12 @@ public static class RoutingKeyConventionExtensions
public static void UseRoutingKeyFormatter(this IMessageSendTopologyConfigurator configurator, IMessageRoutingKeyFormatter formatter)
where T : class
{
- configurator.UpdateConvention>(
- update =>
- {
- update.SetFormatter(formatter);
+ configurator.UpdateConvention>(update =>
+ {
+ update.SetFormatter(formatter);
- return update;
- });
+ return update;
+ });
}
///
diff --git a/src/MassTransit/Contexts/Context/ConsumeMessageSchedulerContext.cs b/src/MassTransit/Contexts/Context/ConsumeMessageSchedulerContext.cs
index 7a05d3ac8ec..d9b7ff2b4fe 100644
--- a/src/MassTransit/Contexts/Context/ConsumeMessageSchedulerContext.cs
+++ b/src/MassTransit/Contexts/Context/ConsumeMessageSchedulerContext.cs
@@ -151,9 +151,9 @@ public Task> ScheduleSend(DateTime scheduledTime, object
return _scheduler.Value.ScheduleSend(_inputAddress, scheduledTime, values, pipe, cancellationToken);
}
- Task IMessageScheduler.CancelScheduledSend(Uri destinationAddress, Guid tokenId)
+ Task IMessageScheduler.CancelScheduledSend(Uri destinationAddress, Guid tokenId, CancellationToken cancellationToken)
{
- return _scheduler.Value.CancelScheduledSend(destinationAddress, tokenId);
+ return _scheduler.Value.CancelScheduledSend(destinationAddress, tokenId, cancellationToken);
}
public Task> SchedulePublish(DateTime scheduledTime, T message, CancellationToken cancellationToken)
@@ -218,15 +218,15 @@ public Task> SchedulePublish(DateTime scheduledTime, obje
return _scheduler.Value.SchedulePublish(scheduledTime, values, pipe, cancellationToken);
}
- public Task CancelScheduledPublish(Guid tokenId)
+ public Task CancelScheduledPublish(Guid tokenId, CancellationToken cancellationToken)
where T : class
{
- return _scheduler.Value.CancelScheduledPublish(tokenId);
+ return _scheduler.Value.CancelScheduledPublish(tokenId, cancellationToken);
}
- public Task CancelScheduledPublish(Type messageType, Guid tokenId)
+ public Task CancelScheduledPublish(Type messageType, Guid tokenId, CancellationToken cancellationToken)
{
- return _scheduler.Value.CancelScheduledPublish(messageType, tokenId);
+ return _scheduler.Value.CancelScheduledPublish(messageType, tokenId, cancellationToken);
}
}
}
diff --git a/src/MassTransit/Futures/Future.cs b/src/MassTransit/Futures/Future.cs
index b9b091b5bda..6928f301390 100644
--- a/src/MassTransit/Futures/Future.cs
+++ b/src/MassTransit/Futures/Future.cs
@@ -417,6 +417,18 @@ protected void WhenAnyFaulted(Action> configure
configure?.Invoke(configurator);
}
+ ///
+ /// When all requests have either completed or faulted, Set the future Faulted
+ ///
+ ///
+ protected void WhenAllCompletedOrFaulted(Action> configure)
+ {
+ _fault.WaitForPending = true;
+ var configurator = new FutureFaultConfigurator(_fault);
+
+ configure?.Invoke(configurator);
+ }
+
static Task GetResult(BehaviorContext context)
{
if (context.TryGetResult(context.Saga.CorrelationId, out TResult completed))
diff --git a/src/MassTransit/Futures/Futures/FutureFault.cs b/src/MassTransit/Futures/Futures/FutureFault.cs
index 52039513dfa..e28c71edfc8 100644
--- a/src/MassTransit/Futures/Futures/FutureFault.cs
+++ b/src/MassTransit/Futures/Futures/FutureFault.cs
@@ -26,6 +26,8 @@ public ContextMessageFactory, TFault> Facto
set => _factory = value;
}
+ public bool WaitForPending { get; set; }
+
public IEnumerable Validate()
{
yield break;
@@ -33,12 +35,15 @@ public IEnumerable Validate()
public async Task SetFaulted(BehaviorContext context)
{
- context.SetFaulted(context.Saga.CorrelationId);
+ if (!WaitForPending || !context.Saga.HasPending())
+ {
+ context.SetFaulted(context.Saga.CorrelationId);
- var fault = await context.SendMessageToSubscriptions(_factory,
- context.Saga.HasSubscriptions() ? context.Saga.Subscriptions.ToArray() : Array.Empty());
+ var fault = await context.SendMessageToSubscriptions(_factory,
+ context.Saga.HasSubscriptions() ? context.Saga.Subscriptions.ToArray() : Array.Empty());
- context.SetFault(context.Saga.CorrelationId, fault);
+ context.SetFault(context.Saga.CorrelationId, fault);
+ }
}
static Task> DefaultFactory(BehaviorContext context)
@@ -86,6 +91,8 @@ public ContextMessageFactory, TFault> Factory
set => _factory = value;
}
+ public bool WaitForPending { get; set; }
+
public IEnumerable Validate()
{
yield break;
@@ -93,12 +100,15 @@ public IEnumerable Validate()
public async Task SetFaulted(BehaviorContext context)
{
- context.SetFaulted(context.Saga.CorrelationId);
+ if (!WaitForPending || !context.Saga.HasPending())
+ {
+ context.SetFaulted(context.Saga.CorrelationId);
- var fault = await context.SendMessageToSubscriptions(_factory,
- context.Saga.HasSubscriptions() ? context.Saga.Subscriptions.ToArray() : Array.Empty());
+ var fault = await context.SendMessageToSubscriptions(_factory,
+ context.Saga.HasSubscriptions() ? context.Saga.Subscriptions.ToArray() : Array.Empty());
- context.SetFault(context.Saga.CorrelationId, fault);
+ context.SetFault(context.Saga.CorrelationId, fault);
+ }
}
static Task> DefaultFactory(BehaviorContext context)
diff --git a/src/MassTransit/InMemoryTransport/InMemoryBus.cs b/src/MassTransit/InMemoryTransport/InMemoryBus.cs
index ab82b01f6de..0df66b068bb 100644
--- a/src/MassTransit/InMemoryTransport/InMemoryBus.cs
+++ b/src/MassTransit/InMemoryTransport/InMemoryBus.cs
@@ -8,8 +8,6 @@ namespace MassTransit
public static class InMemoryBus
{
- public static IMessageTopologyConfigurator MessageTopology => Cached.MessageTopologyValue.Value;
-
///
/// Configure and create an in-memory bus
///
@@ -28,7 +26,7 @@ public static IBusControl Create(Action configu
///
public static IBusControl Create(Uri baseAddress, Action configure)
{
- var topologyConfiguration = new InMemoryTopologyConfiguration(MessageTopology);
+ var topologyConfiguration = new InMemoryTopologyConfiguration(CreateMessageTopology());
var busConfiguration = new InMemoryBusConfiguration(topologyConfiguration, baseAddress);
var configurator = new InMemoryBusFactoryConfigurator(busConfiguration);
@@ -38,17 +36,19 @@ public static IBusControl Create(Uri baseAddress, Action MessageTopologyValue =
- new Lazy(() => new MessageTopology(_entityNameFormatter));
-
- static readonly IEntityNameFormatter _entityNameFormatter;
+ internal static readonly IEntityNameFormatter EntityNameFormatter;
static Cached()
{
- _entityNameFormatter = new MessageUrnEntityNameFormatter();
+ EntityNameFormatter = new MessageUrnEntityNameFormatter();
}
}
}
diff --git a/src/MassTransit/InMemoryTransport/InMemoryTransport/Configuration/InMemoryRegistrationBusFactory.cs b/src/MassTransit/InMemoryTransport/InMemoryTransport/Configuration/InMemoryRegistrationBusFactory.cs
index 64d9a2eac15..7ab49bc66ac 100644
--- a/src/MassTransit/InMemoryTransport/InMemoryTransport/Configuration/InMemoryRegistrationBusFactory.cs
+++ b/src/MassTransit/InMemoryTransport/InMemoryTransport/Configuration/InMemoryRegistrationBusFactory.cs
@@ -13,7 +13,7 @@ public class InMemoryRegistrationBusFactory :
readonly Action _configure;
public InMemoryRegistrationBusFactory(Uri baseAddress, Action configure)
- : this(new InMemoryBusConfiguration(new InMemoryTopologyConfiguration(InMemoryBus.MessageTopology), baseAddress), configure)
+ : this(new InMemoryBusConfiguration(new InMemoryTopologyConfiguration(InMemoryBus.CreateMessageTopology()), baseAddress), configure)
{
}
diff --git a/src/MassTransit/InMemoryTransport/InMemoryTransport/InMemoryConsumeTopology.cs b/src/MassTransit/InMemoryTransport/InMemoryTransport/InMemoryConsumeTopology.cs
index e4b1b2d7f16..c919b41b414 100644
--- a/src/MassTransit/InMemoryTransport/InMemoryTransport/InMemoryConsumeTopology.cs
+++ b/src/MassTransit/InMemoryTransport/InMemoryTransport/InMemoryConsumeTopology.cs
@@ -63,7 +63,7 @@ public override IEnumerable Validate()
return base.Validate().Concat(_specifications.SelectMany(x => x.Validate()));
}
- protected override IMessageConsumeTopologyConfigurator CreateMessageTopology(Type type)
+ protected override IMessageConsumeTopologyConfigurator CreateMessageTopology()
{
var topology = new InMemoryMessageConsumeTopology(_messageTopology.GetMessageTopology(), _publishTopology);
diff --git a/src/MassTransit/JsonMessageBody.cs b/src/MassTransit/JsonMessageBody.cs
new file mode 100644
index 00000000000..a661685e9bd
--- /dev/null
+++ b/src/MassTransit/JsonMessageBody.cs
@@ -0,0 +1,12 @@
+namespace MassTransit;
+
+using System.Text.Json;
+
+
+///
+/// If the incoming message is in a JSON format, use this to unwrap the JSON document from any transport-specific encapsulation
+///
+public interface JsonMessageBody
+{
+ JsonElement? GetJsonElement(JsonSerializerOptions options);
+}
diff --git a/src/MassTransit/Middleware/InMemoryOutbox/InMemoryOutboxMessageSchedulerContext.cs b/src/MassTransit/Middleware/InMemoryOutbox/InMemoryOutboxMessageSchedulerContext.cs
index 4fd93a8c289..ce0fd13a547 100644
--- a/src/MassTransit/Middleware/InMemoryOutbox/InMemoryOutboxMessageSchedulerContext.cs
+++ b/src/MassTransit/Middleware/InMemoryOutbox/InMemoryOutboxMessageSchedulerContext.cs
@@ -365,20 +365,20 @@ public async Task> SchedulePublish(DateTime scheduledTime
return scheduledMessage;
}
- public Task CancelScheduledPublish(Guid tokenId)
+ public Task CancelScheduledPublish(Guid tokenId, CancellationToken cancellationToken)
where T : class
{
- return AddCancelMessage(() => _scheduler.Value.CancelScheduledPublish(tokenId));
+ return AddCancelMessage(() => _scheduler.Value.CancelScheduledPublish(tokenId, cancellationToken));
}
- public Task CancelScheduledPublish(Type messageType, Guid tokenId)
+ public Task CancelScheduledPublish(Type messageType, Guid tokenId, CancellationToken cancellationToken)
{
- return AddCancelMessage(() => _scheduler.Value.CancelScheduledPublish(messageType, tokenId));
+ return AddCancelMessage(() => _scheduler.Value.CancelScheduledPublish(messageType, tokenId, cancellationToken));
}
- public Task CancelScheduledSend(Uri destinationAddress, Guid tokenId)
+ public Task CancelScheduledSend(Uri destinationAddress, Guid tokenId, CancellationToken cancellationToken)
{
- return AddCancelMessage(() => _scheduler.Value.CancelScheduledSend(destinationAddress, tokenId));
+ return AddCancelMessage(() => _scheduler.Value.CancelScheduledSend(destinationAddress, tokenId, cancellationToken));
}
void AddScheduledMessage(ScheduledMessage scheduledMessage)
diff --git a/src/MassTransit/Middleware/SetPartitionKeyFilter.cs b/src/MassTransit/Middleware/SetPartitionKeyFilter.cs
new file mode 100644
index 00000000000..4fc485c397b
--- /dev/null
+++ b/src/MassTransit/Middleware/SetPartitionKeyFilter.cs
@@ -0,0 +1,33 @@
+namespace MassTransit.Middleware
+{
+ using System.Threading.Tasks;
+ using Transports;
+
+
+ public class SetPartitionKeyFilter :
+ IFilter>
+ where TMessage : class
+ {
+ readonly IMessagePartitionKeyFormatter _routingKeyFormatter;
+
+ public SetPartitionKeyFilter(IMessagePartitionKeyFormatter routingKeyFormatter)
+ {
+ _routingKeyFormatter = routingKeyFormatter;
+ }
+
+ public Task Send(SendContext context, IPipe> next)
+ {
+ var routingKey = _routingKeyFormatter.FormatPartitionKey(context);
+
+ if (context.TryGetPayload(out PartitionKeySendContext routingKeySendContext))
+ routingKeySendContext.PartitionKey = routingKey;
+
+ return next.Send(context);
+ }
+
+ public void Probe(ProbeContext context)
+ {
+ context.CreateFilterScope("setPartitionKey");
+ }
+ }
+}
diff --git a/src/MassTransit/Scheduling/BaseScheduleMessageProvider.cs b/src/MassTransit/Scheduling/BaseScheduleMessageProvider.cs
index 049e614bc5c..33387cfccb8 100644
--- a/src/MassTransit/Scheduling/BaseScheduleMessageProvider.cs
+++ b/src/MassTransit/Scheduling/BaseScheduleMessageProvider.cs
@@ -29,19 +29,19 @@ public async Task> ScheduleSend(Uri destinationAddress, D
command.Destination, message);
}
- public Task CancelScheduledSend(Guid tokenId)
+ public Task CancelScheduledSend(Guid tokenId, CancellationToken cancellationToken)
{
- return CancelScheduledSend(tokenId, null);
+ return CancelScheduledSend(tokenId, null, cancellationToken);
}
- public Task CancelScheduledSend(Uri destinationAddress, Guid tokenId)
+ public Task CancelScheduledSend(Uri destinationAddress, Guid tokenId, CancellationToken cancellationToken)
{
- return CancelScheduledSend(tokenId, destinationAddress);
+ return CancelScheduledSend(tokenId, destinationAddress, cancellationToken);
}
protected abstract Task ScheduleSend(ScheduleMessage message, IPipe> pipe, CancellationToken cancellationToken);
- protected abstract Task CancelScheduledSend(Guid tokenId, Uri destinationAddress);
+ protected abstract Task CancelScheduledSend(Guid tokenId, Uri destinationAddress, CancellationToken cancellationToken);
}
diff --git a/src/MassTransit/Scheduling/DelayedScheduleMessageProvider.cs b/src/MassTransit/Scheduling/DelayedScheduleMessageProvider.cs
index 789648e13e3..1dd6babeb21 100644
--- a/src/MassTransit/Scheduling/DelayedScheduleMessageProvider.cs
+++ b/src/MassTransit/Scheduling/DelayedScheduleMessageProvider.cs
@@ -35,12 +35,12 @@ public async Task> ScheduleSend(Uri destinationAddress, D
return new ScheduledMessageHandle(scheduleMessagePipe.ScheduledMessageId ?? NewId.NextGuid(), scheduledTime, destinationAddress, message);
}
- public Task CancelScheduledSend(Guid tokenId)
+ public Task CancelScheduledSend(Guid tokenId, CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
- public Task CancelScheduledSend(Uri destinationAddress, Guid tokenId)
+ public Task CancelScheduledSend(Uri destinationAddress, Guid tokenId, CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
diff --git a/src/MassTransit/Scheduling/EndpointScheduleMessageProvider.cs b/src/MassTransit/Scheduling/EndpointScheduleMessageProvider.cs
index e402b4f6765..b3aa2833fe0 100644
--- a/src/MassTransit/Scheduling/EndpointScheduleMessageProvider.cs
+++ b/src/MassTransit/Scheduling/EndpointScheduleMessageProvider.cs
@@ -22,7 +22,7 @@ protected override async Task ScheduleSend(ScheduleMessage message, IPipe(new
InVar.CorrelationId,
InVar.Timestamp,
TokenId = tokenId
- })
+ }, cancellationToken)
.ConfigureAwait(false);
}
}
diff --git a/src/MassTransit/Scheduling/MessageScheduler.cs b/src/MassTransit/Scheduling/MessageScheduler.cs
index 0abca4aa2bb..ae1909caa4c 100644
--- a/src/MassTransit/Scheduling/MessageScheduler.cs
+++ b/src/MassTransit/Scheduling/MessageScheduler.cs
@@ -159,9 +159,9 @@ public async Task> ScheduleSend(Uri destinationAddress, D
return await _provider.ScheduleSend(destinationAddress, scheduledTime, send.Message, send.Pipe, cancellationToken).ConfigureAwait(false);
}
- public Task CancelScheduledSend(Uri destinationAddress, Guid tokenId)
+ public Task CancelScheduledSend(Uri destinationAddress, Guid tokenId, CancellationToken cancellationToken)
{
- return _provider.CancelScheduledSend(destinationAddress, tokenId);
+ return _provider.CancelScheduledSend(destinationAddress, tokenId, cancellationToken);
}
public Task> SchedulePublish(DateTime scheduledTime, T message, CancellationToken cancellationToken = default)
@@ -259,19 +259,19 @@ public Task> SchedulePublish(DateTime scheduledTime, obje
return ScheduleSend(destinationAddress, scheduledTime, values, pipe, cancellationToken);
}
- public Task CancelScheduledPublish(Guid tokenId)
+ public Task CancelScheduledPublish(Guid tokenId, CancellationToken cancellationToken)
where T : class
{
var destinationAddress = GetPublishAddress();
- return CancelScheduledSend(destinationAddress, tokenId);
+ return CancelScheduledSend(destinationAddress, tokenId, cancellationToken);
}
- public Task CancelScheduledPublish(Type messageType, Guid tokenId)
+ public Task CancelScheduledPublish(Type messageType, Guid tokenId, CancellationToken cancellationToken)
{
var destinationAddress = GetPublishAddress(messageType);
- return CancelScheduledSend(destinationAddress, tokenId);
+ return CancelScheduledSend(destinationAddress, tokenId, cancellationToken);
}
Uri GetPublishAddress()
diff --git a/src/MassTransit/Scheduling/PublishScheduleMessageProvider.cs b/src/MassTransit/Scheduling/PublishScheduleMessageProvider.cs
index a81bc6f681f..4ccf934e7d6 100644
--- a/src/MassTransit/Scheduling/PublishScheduleMessageProvider.cs
+++ b/src/MassTransit/Scheduling/PublishScheduleMessageProvider.cs
@@ -20,14 +20,14 @@ protected override Task ScheduleSend(ScheduleMessage message, IPipe(new
{
InVar.CorrelationId,
InVar.Timestamp,
TokenId = tokenId
- });
+ }, cancellationToken);
}
}
}
diff --git a/src/MassTransit/Serialization/SystemTextJsonMessageSerializer.cs b/src/MassTransit/Serialization/SystemTextJsonMessageSerializer.cs
index 90992c299bc..557a90c3891 100644
--- a/src/MassTransit/Serialization/SystemTextJsonMessageSerializer.cs
+++ b/src/MassTransit/Serialization/SystemTextJsonMessageSerializer.cs
@@ -3,7 +3,6 @@ namespace MassTransit.Serialization
{
using System;
using System.Net.Mime;
- using System.Reflection;
using System.Runtime.Serialization;
using System.Text.Encodings.Web;
using System.Text.Json;
@@ -77,7 +76,11 @@ public SerializerContext Deserialize(MessageBody body, Headers headers, Uri? des
{
try
{
- var envelope = JsonSerializer.Deserialize(body.GetBytes(), Options);
+ JsonElement? bodyElement = body is JsonMessageBody jsonMessageBody
+ ? jsonMessageBody.GetJsonElement(Options)
+ : JsonSerializer.Deserialize(body.GetBytes(), Options);
+
+ var envelope = bodyElement?.Deserialize(Options);
if (envelope == null)
throw new SerializationException("Message envelope not found");
@@ -85,9 +88,7 @@ public SerializerContext Deserialize(MessageBody body, Headers headers, Uri? des
var messageTypes = envelope.MessageType ?? Array.Empty();
- var serializerContext = new SystemTextJsonSerializerContext(this, Options, ContentType, messageContext, messageTypes, envelope);
-
- return serializerContext;
+ return new SystemTextJsonSerializerContext(this, Options, ContentType, messageContext, messageTypes, envelope);
}
catch (SerializationException)
{
diff --git a/src/MassTransit/Serialization/SystemTextJsonRawMessageSerializer.cs b/src/MassTransit/Serialization/SystemTextJsonRawMessageSerializer.cs
index e1995764d76..2ded6093a17 100644
--- a/src/MassTransit/Serialization/SystemTextJsonRawMessageSerializer.cs
+++ b/src/MassTransit/Serialization/SystemTextJsonRawMessageSerializer.cs
@@ -39,18 +39,25 @@ public SerializerContext Deserialize(MessageBody body, Headers headers, Uri? des
{
try
{
- var bytes = body.GetBytes();
-
- var jsonElement = bytes.Length > 0
- ? JsonSerializer.Deserialize(bytes, SystemTextJsonMessageSerializer.Options)
- : JsonDocument.Parse("{}").RootElement;
+ JsonElement? bodyElement;
+ if (body is JsonMessageBody jsonMessageBody)
+ bodyElement = jsonMessageBody.GetJsonElement(SystemTextJsonMessageSerializer.Options);
+ else
+ {
+ var bytes = body.GetBytes();
+ bodyElement = bytes.Length > 0
+ ? JsonSerializer.Deserialize(bytes, SystemTextJsonMessageSerializer.Options)
+ : null;
+ }
+
+ bodyElement ??= JsonDocument.Parse("{}").RootElement;
var messageTypes = headers.GetMessageTypes();
var messageContext = new RawMessageContext(headers, destinationAddress, _options);
var serializerContext = new SystemTextJsonRawSerializerContext(SystemTextJsonMessageSerializer.Instance,
- SystemTextJsonMessageSerializer.Options, ContentType, messageContext, messageTypes, _options, jsonElement);
+ SystemTextJsonMessageSerializer.Options, ContentType, messageContext, messageTypes, _options, bodyElement.Value);
return serializerContext;
}
diff --git a/src/MassTransit/SqlTransport/Configuration/SqlBusFactoryConfiguratorExtensions.cs b/src/MassTransit/SqlTransport/Configuration/SqlBusFactoryConfiguratorExtensions.cs
deleted file mode 100644
index d5319f3d6f6..00000000000
--- a/src/MassTransit/SqlTransport/Configuration/SqlBusFactoryConfiguratorExtensions.cs
+++ /dev/null
@@ -1,29 +0,0 @@
-#nullable enable
-namespace MassTransit
-{
- using System;
- using SqlTransport.Configuration;
-
-
- public static class SqlBusFactoryConfiguratorExtensions
- {
- ///
- /// Create a bus using the database transport
- ///
- public static IBusControl CreateUsingDb(this IBusFactorySelector selector, Action configure)
- {
- return SqlBusFactory.Create(configure);
- }
-
- ///
- /// Configure the bus to use the database transport
- ///
- /// The registration configurator (configured via AddMassTransit)
- /// The configuration callback for the bus factory
- public static void UsingDb(this IBusRegistrationConfigurator configurator,
- Action? configure = null)
- {
- configurator.SetBusFactory(new SqlRegistrationBusFactory(configure));
- }
- }
-}
diff --git a/src/MassTransit/SqlTransport/Configuration/SqlScheduleMessageExtensions.cs b/src/MassTransit/SqlTransport/Configuration/SqlScheduleMessageExtensions.cs
index 2da36e71c45..4b225d2d6f8 100644
--- a/src/MassTransit/SqlTransport/Configuration/SqlScheduleMessageExtensions.cs
+++ b/src/MassTransit/SqlTransport/Configuration/SqlScheduleMessageExtensions.cs
@@ -1,16 +1,31 @@
namespace MassTransit
{
using System;
+ using DependencyInjection;
+ using Microsoft.Extensions.DependencyInjection;
+ using Microsoft.Extensions.DependencyInjection.Extensions;
+ using Scheduling;
using SqlTransport.Configuration;
+ using Transports;
public static class SqlScheduleMessageExtensions
{
///
- /// Uses the database transport's built-in message scheduler
+ /// Uses the SQL transport's built-in message scheduler
///
///
+ [Obsolete("Use the renamed UseSqlMessageScheduler instead")]
public static void UseDbMessageScheduler(this IBusFactoryConfigurator configurator)
+ {
+ UseSqlMessageScheduler(configurator);
+ }
+
+ ///
+ /// Uses the SQL transport's built-in message scheduler
+ ///
+ ///
+ public static void UseSqlMessageScheduler(this IBusFactoryConfigurator configurator)
{
if (configurator == null)
throw new ArgumentNullException(nameof(configurator));
@@ -19,5 +34,43 @@ public static void UseDbMessageScheduler(this IBusFactoryConfigurator configurat
configurator.AddPrePipeSpecification(pipeBuilderConfigurator);
}
+
+ ///
+ /// Add a to the container that uses the SQL Transport message enqueue time to schedule messages.
+ ///
+ ///
+ public static void AddSqlMessageScheduler(this IBusRegistrationConfigurator configurator)
+ {
+ configurator.TryAddScoped(provider =>
+ {
+ var busInstance = provider.GetRequiredService>().Value;
+ var sendEndpointProvider = provider.GetRequiredService();
+
+ var hostConfiguration = busInstance.HostConfiguration as ISqlHostConfiguration
+ ?? throw new ArgumentException("The SQL transport configuration was not found");
+
+ return new MessageScheduler(new SqlScheduleMessageProvider(hostConfiguration, sendEndpointProvider), busInstance.Bus.Topology);
+ });
+ }
+
+ ///
+ /// Add a to the container that uses the SQL Transport message enqueue time to schedule messages.
+ ///
+ ///
+ public static void AddSqlMessageScheduler(this IBusRegistrationConfigurator configurator)
+ where TBus : class, IBus
+ {
+ configurator.TryAddScoped(provider =>
+ {
+ var busInstance = provider.GetRequiredService>().Value;
+ var sendEndpointProvider = provider.GetRequiredService();
+
+ var hostConfiguration = busInstance.HostConfiguration as ISqlHostConfiguration
+ ?? throw new ArgumentException("The SQL transport configuration was not found");
+
+ return Bind.Create(
+ new MessageScheduler(new SqlScheduleMessageProvider(hostConfiguration, sendEndpointProvider), busInstance.Bus.Topology));
+ });
+ }
}
}
diff --git a/src/MassTransit/SqlTransport/Configuration/SqlTransportOptions.cs b/src/MassTransit/SqlTransport/Configuration/SqlTransportOptions.cs
index 917fa713e9f..936df2d3186 100644
--- a/src/MassTransit/SqlTransport/Configuration/SqlTransportOptions.cs
+++ b/src/MassTransit/SqlTransport/Configuration/SqlTransportOptions.cs
@@ -13,4 +13,9 @@ public class SqlTransportOptions
public string? AdminUsername { get; set; }
public string? AdminPassword { get; set; }
+
+ ///
+ /// Optional, if specified, will be parsed to capture additional properties on the connection.
+ ///
+ public string? ConnectionString { get; set; }
}
diff --git a/src/MassTransit/SqlTransport/Scheduling/SqlScheduleMessageProvider.cs b/src/MassTransit/SqlTransport/Scheduling/SqlScheduleMessageProvider.cs
index 3fc40bddccd..d8cc5c71825 100644
--- a/src/MassTransit/SqlTransport/Scheduling/SqlScheduleMessageProvider.cs
+++ b/src/MassTransit/SqlTransport/Scheduling/SqlScheduleMessageProvider.cs
@@ -5,16 +5,32 @@ namespace MassTransit.Scheduling
using System.Threading;
using System.Threading.Tasks;
using SqlTransport;
+ using SqlTransport.Configuration;
+ using Transports;
public class SqlScheduleMessageProvider :
IScheduleMessageProvider
{
- readonly ConsumeContext _context;
+ readonly Func, CancellationToken, Task> _cancel;
+ readonly ConsumeContext? _context;
+ readonly ISqlHostConfiguration? _hostConfiguration;
+ readonly ISendEndpointProvider _sendEndpointProvider;
public SqlScheduleMessageProvider(ConsumeContext context)
{
_context = context;
+ _sendEndpointProvider = context;
+
+ _cancel = RetryUsingContext;
+ }
+
+ public SqlScheduleMessageProvider(ISqlHostConfiguration hostConfiguration, ISendEndpointProvider sendEndpointProvider)
+ {
+ _hostConfiguration = hostConfiguration;
+ _sendEndpointProvider = sendEndpointProvider;
+
+ _cancel = RetryUsingHostConfiguration;
}
public async Task> ScheduleSend(Uri destinationAddress, DateTime scheduledTime, T message, IPipe> pipe,
@@ -30,7 +46,7 @@ public async Task> ScheduleSend(Uri destinationAddress, D
schedulePipe.ScheduledMessageId = tokenId;
- var endpoint = await _context.GetSendEndpoint(destinationAddress).ConfigureAwait(false);
+ var endpoint = await _sendEndpointProvider.GetSendEndpoint(destinationAddress).ConfigureAwait(false);
await endpoint.Send(message, schedulePipe, cancellationToken).ConfigureAwait(false);
@@ -40,24 +56,65 @@ public async Task> ScheduleSend(Uri destinationAddress, D
return new ScheduledMessageHandle(schedulePipe.ScheduledMessageId ?? NewId.NextGuid(), scheduledTime, destinationAddress, message);
}
- public async Task CancelScheduledSend(Guid tokenId)
+ public Task CancelScheduledSend(Guid tokenId, CancellationToken cancellationToken)
{
- if (!_context.TryGetPayload(out ClientContext? clientContext))
- throw new ArgumentException("The client context was not available", nameof(_context));
+ return _cancel(async clientContext =>
+ {
+ var deleted = await clientContext.DeleteScheduledMessage(tokenId, cancellationToken).ConfigureAwait(false);
+ if (deleted)
+ LogContext.Debug?.Log("CANCEL {TokenId}", tokenId);
+ }, cancellationToken);
+ }
- var deleted = await clientContext.DeleteScheduledMessage(tokenId).ConfigureAwait(false);
- if (deleted)
- LogContext.Debug?.Log("CANCEL {TokenId}", tokenId);
+ public Task CancelScheduledSend(Uri destinationAddress, Guid tokenId, CancellationToken cancellationToken)
+ {
+ return _cancel(async clientContext =>
+ {
+ var deleted = await clientContext.DeleteScheduledMessage(tokenId, cancellationToken).ConfigureAwait(false);
+ if (deleted)
+ LogContext.Debug?.Log("CANCEL {DestinationAddress} {TokenId}", destinationAddress, tokenId);
+ }, cancellationToken);
}
- public async Task CancelScheduledSend(Uri destinationAddress, Guid tokenId)
+ Task RetryUsingContext(Func callback, CancellationToken cancellationToken)
{
- if (!_context.TryGetPayload(out ClientContext? clientContext))
+ if (!_context!.TryGetPayload(out ClientContext? clientContext))
throw new ArgumentException("The client context was not available", nameof(_context));
- var deleted = await clientContext.DeleteScheduledMessage(tokenId).ConfigureAwait(false);
- if (deleted)
- LogContext.Debug?.Log("CANCEL {DestinationAddress} {TokenId}", destinationAddress, tokenId);
+ return callback(clientContext);
+ }
+
+ Task RetryUsingHostConfiguration(Func callback, CancellationToken cancellationToken)
+ {
+ var pipe = new ClientContextPipe(callback, cancellationToken);
+
+ return _hostConfiguration.Retry(() => _hostConfiguration!.ConnectionContextSupervisor.Send(pipe, cancellationToken), cancellationToken,
+ _hostConfiguration!.ConnectionContextSupervisor.Stopping);
+ }
+
+
+ class ClientContextPipe :
+ IPipe
+ {
+ readonly Func _callback;
+ readonly CancellationToken _cancellationToken;
+
+ public ClientContextPipe(Func callback, CancellationToken cancellationToken)
+ {
+ _callback = callback;
+ _cancellationToken = cancellationToken;
+ }
+
+ public Task Send(ConnectionContext context)
+ {
+ var clientContext = context.CreateClientContext(_cancellationToken);
+
+ return _callback(clientContext);
+ }
+
+ public void Probe(ProbeContext context)
+ {
+ }
}
}
}
diff --git a/src/MassTransit/SqlTransport/SqlBusFactory.cs b/src/MassTransit/SqlTransport/SqlBusFactory.cs
index f1dc9704419..38c192b9ddc 100644
--- a/src/MassTransit/SqlTransport/SqlBusFactory.cs
+++ b/src/MassTransit/SqlTransport/SqlBusFactory.cs
@@ -1,7 +1,6 @@
namespace MassTransit
{
using System;
- using System.Threading;
using Configuration;
using SqlTransport.Configuration;
using SqlTransport.Topology;
@@ -10,8 +9,6 @@
public static class SqlBusFactory
{
- public static IMessageTopologyConfigurator MessageTopology => Cached.MessageTopologyValue.Value;
-
///
/// Create a bus using the database transport
///
@@ -19,7 +16,7 @@ public static class SqlBusFactory
///
public static IBusControl Create(Action configure)
{
- var topologyConfiguration = new SqlTopologyConfiguration(MessageTopology);
+ var topologyConfiguration = new SqlTopologyConfiguration(CreateMessageTopology());
var busConfiguration = new SqlBusConfiguration(topologyConfiguration);
var configurator = new SqlBusFactoryConfigurator(busConfiguration);
@@ -29,15 +26,19 @@ public static IBusControl Create(Action configure)
return configurator.Build(busConfiguration);
}
+ public static IMessageTopologyConfigurator CreateMessageTopology()
+ {
+ return new MessageTopology(Cached.EntityNameFormatter);
+ }
+
static class Cached
{
- internal static readonly Lazy MessageTopologyValue;
+ internal static readonly IEntityNameFormatter EntityNameFormatter;
static Cached()
{
- IEntityNameFormatter formatter = new MessageNameFormatterEntityNameFormatter(new SqlMessageNameFormatter());
- MessageTopologyValue = new Lazy(() => new MessageTopology(formatter), LazyThreadSafetyMode.PublicationOnly);
+ EntityNameFormatter = new MessageNameFormatterEntityNameFormatter(new SqlMessageNameFormatter());
}
}
}
diff --git a/src/MassTransit/SqlTransport/SqlTransport/ClientContext.cs b/src/MassTransit/SqlTransport/SqlTransport/ClientContext.cs
index 191da221bee..c7aebf0d986 100644
--- a/src/MassTransit/SqlTransport/SqlTransport/ClientContext.cs
+++ b/src/MassTransit/SqlTransport/SqlTransport/ClientContext.cs
@@ -58,7 +58,7 @@ Task> ReceiveMessages(string queueName, SqlRece
TimeSpan lockDuration);
Task DeleteMessage(Guid lockId, long messageDeliveryId);
- Task DeleteScheduledMessage(Guid tokenId);
+ Task DeleteScheduledMessage(Guid tokenId, CancellationToken cancellationToken);
Task MoveMessage(Guid lockId, long messageDeliveryId, string queueName, SqlQueueType queueType, SendHeaders sendHeaders);
Task RenewLock(Guid lockId, long messageDeliveryId, TimeSpan duration);
Task Unlock(Guid lockId, long messageDeliveryId, TimeSpan delay, SendHeaders sendHeaders);
diff --git a/src/MassTransit/SqlTransport/SqlTransport/Configuration/SqlRegistrationBusFactory.cs b/src/MassTransit/SqlTransport/SqlTransport/Configuration/SqlRegistrationBusFactory.cs
index 5baafff7713..cf6ed2a778e 100644
--- a/src/MassTransit/SqlTransport/SqlTransport/Configuration/SqlRegistrationBusFactory.cs
+++ b/src/MassTransit/SqlTransport/SqlTransport/Configuration/SqlRegistrationBusFactory.cs
@@ -14,7 +14,7 @@ public class SqlRegistrationBusFactory :
readonly Action? _configure;
public SqlRegistrationBusFactory(Action? configure)
- : this(new SqlBusConfiguration(new SqlTopologyConfiguration(SqlBusFactory.MessageTopology)), configure)
+ : this(new SqlBusConfiguration(new SqlTopologyConfiguration(SqlBusFactory.CreateMessageTopology())), configure)
{
}
diff --git a/src/MassTransit/SqlTransport/SqlTransport/ScopeClientContext.cs b/src/MassTransit/SqlTransport/SqlTransport/ScopeClientContext.cs
index 147eca8ed55..73e83224278 100644
--- a/src/MassTransit/SqlTransport/SqlTransport/ScopeClientContext.cs
+++ b/src/MassTransit/SqlTransport/SqlTransport/ScopeClientContext.cs
@@ -83,9 +83,9 @@ public Task DeleteMessage(Guid lockId, long messageDeliveryId)
return _context.DeleteMessage(lockId, messageDeliveryId);
}
- public Task DeleteScheduledMessage(Guid tokenId)
+ public Task DeleteScheduledMessage(Guid tokenId, CancellationToken cancellationToken)
{
- return _context.DeleteScheduledMessage(tokenId);
+ return _context.DeleteScheduledMessage(tokenId, cancellationToken);
}
public Task MoveMessage(Guid lockId, long messageDeliveryId, string queueName, SqlQueueType queueType, SendHeaders sendHeaders)
diff --git a/src/MassTransit/SqlTransport/SqlTransport/SharedClientContext.cs b/src/MassTransit/SqlTransport/SqlTransport/SharedClientContext.cs
index 6358afc3fb9..fb2df3fd1da 100644
--- a/src/MassTransit/SqlTransport/SqlTransport/SharedClientContext.cs
+++ b/src/MassTransit/SqlTransport/SqlTransport/SharedClientContext.cs
@@ -83,9 +83,9 @@ public Task DeleteMessage(Guid lockId, long messageDeliveryId)
return _context.DeleteMessage(lockId, messageDeliveryId);
}
- public Task DeleteScheduledMessage(Guid tokenId)
+ public Task DeleteScheduledMessage(Guid tokenId, CancellationToken cancellationToken)
{
- return _context.DeleteScheduledMessage(tokenId);
+ return _context.DeleteScheduledMessage(tokenId, cancellationToken);
}
public Task MoveMessage(Guid lockId, long messageDeliveryId, string queueName, SqlQueueType queueType, SendHeaders sendHeaders)
diff --git a/src/MassTransit/SqlTransport/SqlTransport/SqlClientContext.cs b/src/MassTransit/SqlTransport/SqlTransport/SqlClientContext.cs
index 401d505a624..2ab55c20ec4 100644
--- a/src/MassTransit/SqlTransport/SqlTransport/SqlClientContext.cs
+++ b/src/MassTransit/SqlTransport/SqlTransport/SqlClientContext.cs
@@ -39,7 +39,7 @@ public abstract Task> ReceiveMessages(string qu
TimeSpan lockDuration);
public abstract Task DeleteMessage(Guid lockId, long messageDeliveryId);
- public abstract Task DeleteScheduledMessage(Guid tokenId);
+ public abstract Task DeleteScheduledMessage(Guid tokenId, CancellationToken cancellationToken);
public abstract Task MoveMessage(Guid lockId, long messageDeliveryId, string queueName, SqlQueueType queueType, SendHeaders sendHeaders);
public abstract Task RenewLock(Guid lockId, long messageDeliveryId, TimeSpan duration);
public abstract Task Unlock(Guid lockId, long messageDeliveryId, TimeSpan delay, SendHeaders sendHeaders);
diff --git a/src/MassTransit/SqlTransport/SqlTransport/Topology/SqlConsumeTopology.cs b/src/MassTransit/SqlTransport/SqlTransport/Topology/SqlConsumeTopology.cs
index 8f0eaec038e..07699719912 100644
--- a/src/MassTransit/SqlTransport/SqlTransport/Topology/SqlConsumeTopology.cs
+++ b/src/MassTransit/SqlTransport/SqlTransport/Topology/SqlConsumeTopology.cs
@@ -65,7 +65,7 @@ public override IEnumerable Validate()
return base.Validate().Concat(_specifications.SelectMany(x => x.Validate()));
}
- protected override IMessageConsumeTopologyConfigurator CreateMessageTopology(Type type)
+ protected override IMessageConsumeTopologyConfigurator CreateMessageTopology()
{
var messageTopology = new SqlMessageConsumeTopology(_publishTopology.GetMessageTopology());
diff --git a/src/MassTransit/Testing/AsyncTestHarness.cs b/src/MassTransit/Testing/AsyncTestHarness.cs
index ef55feff9fc..881f7c306b0 100644
--- a/src/MassTransit/Testing/AsyncTestHarness.cs
+++ b/src/MassTransit/Testing/AsyncTestHarness.cs
@@ -19,7 +19,7 @@ public abstract class AsyncTestHarness :
protected AsyncTestHarness()
{
TestTimeout = Debugger.IsAttached ? TimeSpan.FromMinutes(50) : TimeSpan.FromSeconds(30);
- TestInactivityTimeout = TimeSpan.FromSeconds(6);
+ TestInactivityTimeout = Debugger.IsAttached ? TimeSpan.FromMinutes(30) : TimeSpan.FromSeconds(6);
_inactivityObserver = new Lazy(() => new AsyncInactivityObserver(TestInactivityTimeout, TestCancellationToken));
}
diff --git a/src/MassTransit/Testing/InMemoryTestHarness.cs b/src/MassTransit/Testing/InMemoryTestHarness.cs
index 1b765edddc0..8aebf70b422 100644
--- a/src/MassTransit/Testing/InMemoryTestHarness.cs
+++ b/src/MassTransit/Testing/InMemoryTestHarness.cs
@@ -27,7 +27,7 @@ public InMemoryTestHarness(string virtualHost, IEnumerable :
diff --git a/src/MassTransit/Topology/Configuration/IPartitionKeySendTopologyConvention.cs b/src/MassTransit/Topology/Configuration/IPartitionKeySendTopologyConvention.cs
new file mode 100644
index 00000000000..7bc14d45247
--- /dev/null
+++ b/src/MassTransit/Topology/Configuration/IPartitionKeySendTopologyConvention.cs
@@ -0,0 +1,7 @@
+namespace MassTransit.Configuration
+{
+ public interface IPartitionKeySendTopologyConvention :
+ ISendTopologyConvention
+ {
+ }
+}
diff --git a/src/Transports/MassTransit.Azure.ServiceBus.Core/AzureServiceBusTransport/Configuration/Topology/PartitionKeyMessageSendTopologyConvention.cs b/src/MassTransit/Topology/Configuration/PartitionKeyMessageSendTopologyConvention.cs
similarity index 93%
rename from src/Transports/MassTransit.Azure.ServiceBus.Core/AzureServiceBusTransport/Configuration/Topology/PartitionKeyMessageSendTopologyConvention.cs
rename to src/MassTransit/Topology/Configuration/PartitionKeyMessageSendTopologyConvention.cs
index 8eeca1092be..ead54a4cb74 100644
--- a/src/Transports/MassTransit.Azure.ServiceBus.Core/AzureServiceBusTransport/Configuration/Topology/PartitionKeyMessageSendTopologyConvention.cs
+++ b/src/MassTransit/Topology/Configuration/PartitionKeyMessageSendTopologyConvention.cs
@@ -1,6 +1,6 @@
-namespace MassTransit.AzureServiceBusTransport.Configuration
+namespace MassTransit.Configuration
{
- using MassTransit.Configuration;
+ using Transports;
public class PartitionKeyMessageSendTopologyConvention :
diff --git a/src/Transports/MassTransit.Azure.ServiceBus.Core/AzureServiceBusTransport/Configuration/Topology/PartitionKeySendTopologyConvention.cs b/src/MassTransit/Topology/Configuration/PartitionKeySendTopologyConvention.cs
similarity index 81%
rename from src/Transports/MassTransit.Azure.ServiceBus.Core/AzureServiceBusTransport/Configuration/Topology/PartitionKeySendTopologyConvention.cs
rename to src/MassTransit/Topology/Configuration/PartitionKeySendTopologyConvention.cs
index e0e08e27290..4412585b24c 100644
--- a/src/Transports/MassTransit.Azure.ServiceBus.Core/AzureServiceBusTransport/Configuration/Topology/PartitionKeySendTopologyConvention.cs
+++ b/src/MassTransit/Topology/Configuration/PartitionKeySendTopologyConvention.cs
@@ -1,8 +1,5 @@
-namespace MassTransit.AzureServiceBusTransport.Configuration
+namespace MassTransit.Configuration
{
- using MassTransit.Configuration;
-
-
public class PartitionKeySendTopologyConvention :
IPartitionKeySendTopologyConvention
{
@@ -10,8 +7,6 @@ public class PartitionKeySendTopologyConvention :
public PartitionKeySendTopologyConvention()
{
- DefaultFormatter = new EmptyPartitionKeyFormatter();
-
_cache = new TopologyConventionCache(typeof(IPartitionKeyMessageSendTopologyConvention<>), new Factory());
}
@@ -20,8 +15,6 @@ bool IMessageSendTopologyConvention.TryGetMessageSendTopologyConvention(out I
return _cache.GetOrAdd>().TryGetMessageSendTopologyConvention(out convention);
}
- public IPartitionKeyFormatter DefaultFormatter { get; set; }
-
class Factory :
IConventionTypeFactory
diff --git a/src/MassTransit/Topology/Configuration/SetPartitionKeyMessageSendTopology.cs b/src/MassTransit/Topology/Configuration/SetPartitionKeyMessageSendTopology.cs
new file mode 100644
index 00000000000..1d5c97ec475
--- /dev/null
+++ b/src/MassTransit/Topology/Configuration/SetPartitionKeyMessageSendTopology.cs
@@ -0,0 +1,26 @@
+namespace MassTransit.Configuration;
+
+using System;
+using Middleware;
+using Transports;
+
+
+public class SetPartitionKeyMessageSendTopology :
+ IMessageSendTopology
+ where TMessage : class
+{
+ readonly IFilter> _filter;
+
+ public SetPartitionKeyMessageSendTopology(IMessagePartitionKeyFormatter partitionKeyFormatter)
+ {
+ if (partitionKeyFormatter == null)
+ throw new ArgumentNullException(nameof(partitionKeyFormatter));
+
+ _filter = new SetPartitionKeyFilter(partitionKeyFormatter);
+ }
+
+ public void Apply(ITopologyPipeBuilder> builder)
+ {
+ builder.AddFilter(_filter);
+ }
+}
diff --git a/src/MassTransit/Topology/ConsumeTopology.cs b/src/MassTransit/Topology/ConsumeTopology.cs
index fa6adc09bd7..1de22c27b25 100644
--- a/src/MassTransit/Topology/ConsumeTopology.cs
+++ b/src/MassTransit/Topology/ConsumeTopology.cs
@@ -7,6 +7,7 @@ namespace MassTransit
using System.Security.Cryptography;
using System.Text;
using Configuration;
+ using Metadata;
using NewIdFormatters;
@@ -15,19 +16,22 @@ public class ConsumeTopology :
IConsumeTopologyConfigurationObserver
{
readonly List