Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update from original #1

Merged
merged 27 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
3eb41b3
Fixed #5125 - Added overloads for the Entity Framework outbox to disa…
phatboyg Apr 25, 2024
7997da1
Related to #5114 - mute the InvalidOperationException error from PGSQ…
phatboyg Apr 25, 2024
7a28c39
Related to #5060 - handling of SQL Server instance/host/port in all s…
phatboyg Apr 25, 2024
af7a812
Fixed #5130 - Changed SessionId and PartitionKey filters on Azure Ser…
phatboyg Apr 25, 2024
e7c59ea
Standardized the topology for partition key across service bus, event…
phatboyg Apr 25, 2024
2af9f8f
Package updates, consistent use of language version across all src/te…
phatboyg Apr 25, 2024
c287ee3
Related to #3303 - Added support for ICredentialsProvider and perhaps…
phatboyg Apr 25, 2024
be70dec
Fixed #5137 - Updated SqlTransportOptions so that it allows for addit…
phatboyg Apr 26, 2024
1050df6
Added WhenAllCompletedOrFaulted to Durable Future
zyofeng Apr 9, 2024
f36c14f
Cleaned up warnings on RabbitMQ ConfigurationHostSettings
phatboyg Apr 26, 2024
71429ff
Updated version to 8.2.3
phatboyg Apr 26, 2024
25fd050
Fixed #5141 - Quotes around username in Postgres Migrator
phatboyg Apr 27, 2024
12563fa
Added xmin to PostgresLockStatementFormatter.cs queries to allow for …
phatboyg May 1, 2024
1d640dc
Fixed #4510 - Properly handle cross-region or SNS deliveries that are…
phatboyg May 2, 2024
5e70659
Added formatting option for raw TEXT to avoid excessive indention.
phatboyg May 2, 2024
c9561ee
Fixed #5146 - Extend the inactivity timeout when the debugger is atta…
phatboyg May 2, 2024
25d6373
Added ReplyTo as a settable property on the ServiceBusSendContext
phatboyg May 2, 2024
0381674
Fixed "Submitted policy is over max allowed size" exception, serializ…
sergiuciudin Feb 29, 2024
3e8be46
Added ConfigureConsumeTopology overload that accepts a messageType pa…
phatboyg May 3, 2024
6a81e2d
Added endpoint strategies to the documentation
phatboyg May 8, 2024
4fe74cd
Add SQL Quick Start
drusellers May 8, 2024
4c37cd8
Removed unused configuration methods.
phatboyg May 8, 2024
231fb93
Fixed #5185 - Missing parameter on PostgreSQL query, also using wrong…
phatboyg May 15, 2024
2a56e02
Fixed #5191 - Redelivery using SQL Server with the SQL Transport was …
phatboyg May 15, 2024
40775e9
AddSqlMessageScheduler() method created so that scheduled messages ca…
phatboyg May 17, 2024
b2ce0ef
Add requestTimeout to SendRequest on mediator.
phatboyg May 17, 2024
be8e177
Separated MessageTopology so that each bus gets its own topology to a…
phatboyg May 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: MassTransit
env:
MASSTRANSIT_VERSION: 8.2.2
MASSTRANSIT_VERSION: 8.2.3
on:
push:
paths:
Expand Down
16 changes: 8 additions & 8 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<PackageVersion Include="AWSSDK.SimpleNotificationService" Version="3.7.301.8" />
<PackageVersion Include="AWSSDK.S3" Version="3.7.307.1" />
<PackageVersion Include="AWSSDK.SQS" Version="3.7.300.60" />
<PackageVersion Include="Azure.Identity" Version="1.11.0" />
<PackageVersion Include="Azure.Identity" Version="1.11.2" />
<PackageVersion Include="Azure.Messaging.EventHubs.Processor" Version="5.11.2" />
<PackageVersion Include="Azure.Messaging.ServiceBus" Version="7.17.5" />
<PackageVersion Include="Azure.Storage.Blobs" Version="12.19.1" />
Expand All @@ -26,16 +26,16 @@
<PackageVersion Include="Iesi.Collections" Version="4.1.1" />
<PackageVersion Include="Ionic.Zlib.Core" Version="1.0.0" />
<PackageVersion Include="Marten" Version="5.11.0" Condition="'$(TargetFramework)' != 'net6.0' AND '$(TargetFramework)' != 'net8.0'" />
<PackageVersion Include="Marten" Version="7.7.0" Condition="'$(TargetFramework)' == 'net6.0' OR '$(TargetFramework)' == 'net8.0'" />
<PackageVersion Include="Marten" Version="7.9.0" Condition="'$(TargetFramework)' == 'net6.0' OR '$(TargetFramework)' == 'net8.0'" />
<PackageVersion Include="MathNet.Numerics" Version="5.0.0" />
<PackageVersion Include="MediatR" Version="12.2.0" />
<PackageVersion Include="Microsoft.ApplicationInsights.AspNetCore" Version="2.22.0" />
<PackageVersion Include="Microsoft.AspNetCore.SignalR.Core" Version="1.1.0" />
<PackageVersion Include="Microsoft.AspNetCore.SignalR.Protocols.MessagePack" Version="7.0.13" />
<PackageVersion Include="Microsoft.Azure.Cosmos" Version="3.38.1" />
<PackageVersion Include="Microsoft.Azure.Cosmos" Version="3.39.1" />
<PackageVersion Include="Microsoft.Azure.Cosmos.Table" Version="1.0.8" />
<PackageVersion Include="Microsoft.Azure.WebJobs.Extensions.EventHubs" Version="6.3.0" />
<PackageVersion Include="Microsoft.Azure.WebJobs.Extensions.ServiceBus" Version="5.14.0" />
<PackageVersion Include="Microsoft.Azure.WebJobs.Extensions.EventHubs" Version="6.3.1" />
<PackageVersion Include="Microsoft.Azure.WebJobs.Extensions.ServiceBus" Version="5.15.1" />
<PackageVersion Include="Microsoft.Bcl.AsyncInterfaces" Version="6.0.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.Analyzers" Version="3.3.4" />
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp.Workspaces" Version="4.1.0" />
Expand Down Expand Up @@ -73,14 +73,14 @@
<PackageVersion Include="NHibernate" Version="5.5.1" />
<PackageVersion Include="Npgsql" Version="8.0.0" />
<PackageVersion Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="8.0.0" />
<PackageVersion Include="NUnit" Version="4.1.0"/>
<PackageVersion Include="NUnit.Analyzers" Version="4.1.0">
<PackageVersion Include="NUnit" Version="4.1.0" />
<PackageVersion Include="NUnit.Analyzers" Version="4.2.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageVersion>
<PackageVersion Include="NUnit3TestAdapter" Version="4.5.0" />
<PackageVersion Include="OpenTelemetry.Exporter.Jaeger" Version="1.5.1" />
<PackageVersion Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.8.0" />
<PackageVersion Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.8.1" />
<PackageVersion Include="prometheus-net" Version="6.0.0" />
<PackageVersion Include="Quartz" Version="3.8.1" />
<PackageVersion Include="Quartz.Extensions.Hosting" Version="3.8.1" />
Expand Down
1 change: 1 addition & 0 deletions MassTransit.sln.DotSettings
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/INDENT_PREPROCESSOR_IF/@EntryValue">OUTDENT</s:String>
<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/INDENT_PREPROCESSOR_OTHER/@EntryValue">USUAL_INDENT</s:String>
<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/INDENT_PREPROCESSOR_REGION/@EntryValue">OUTDENT</s:String>
<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/INDENT_RAW_LITERAL_STRING/@EntryValue">INDENT</s:String>
<s:Int64 x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/KEEP_BLANK_LINES_IN_DECLARATIONS/@EntryValue">1</s:Int64>
<s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/KEEP_EXISTING_DECLARATION_PARENS_ARRANGEMENT/@EntryValue">False</s:Boolean>
<s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/KEEP_EXISTING_EMBEDDED_ARRANGEMENT/@EntryValue">False</s:Boolean>
Expand Down
7 changes: 7 additions & 0 deletions doc/content/2.quick-starts/0.index.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,12 @@ Transports
#description
Requires an AWS account
::

::card
#title
[PostgreSQL](/quick-starts/postgresql)
#description
For smaller setups
::
::

5 changes: 4 additions & 1 deletion doc/content/2.quick-starts/2.rabbitmq.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ This tutorial will get you from zero to up and running with [RabbitMQ](/document
The following instructions assume you are starting from a completed [In-Memory Quick Start](/quick-starts/in-memory)
::

This example requires a functioning installation of the .NET Runtime and SDK (at least 6.0) and a functioning installation of _Docker_ with _Docker Compose_ support enabled.
This example requires the following:

- a functioning installation of the .NET Runtime and SDK (at least 6.0)
- a functioning installation of _Docker_ with _Docker Compose_ support enabled.

## Run RabbitMQ

Expand Down
125 changes: 125 additions & 0 deletions doc/content/2.quick-starts/5.postgresql.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
---
navigation.title: PostgreSQL
---

# PostgreSQL Quick Start

> This tutorial will get you from zero to up and running with [SQL](/documentation/transports/sql) and MassTransit.

> Walkthrough Video TBD

- The source for this sample is available [on GitHub](https://github.com/MassTransit/Sample-GettingStarted).

## Prerequisites

::alert{type="info"}
The following instructions assume you are starting from a completed [In-Memory Quick Start](/quick-starts/in-memory)
::

This example requires the following:

- a functioning installation of the dotnet runtime and sdk (at least 6.0)
- a functioning installation of _Docker_ with _Docker Compose_ support enabled.

## Run PostgreSQL

For this quick start, we recommend running the preconfigured [official Docker image of Postgres](https://hub.docker.com/_/postgres).

```bash
$ docker run -p 5432:5432 postgres
```

If you are running on an ARM platform

```bash
$ docker run --platform linux/arm64 -p 5432:5432 postgres
```

Once its up and running you can use your preferred tool to browse into the database.

## Configure PostgreSQL

Add the _MassTransit.SqlTransport.PostgreSQL_ package to the project.

```bash
$ dotnet add package MassTransit.SqlTransport.PostgreSQL
```

### Edit Program.cs

Change _UsingInMemory_ to _UsingPostgres_ as shown below.

```csharp
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
services.AddOptions<SqlTransportOptions>().Configure(options =>
{
options.Host = "localhost";
options.Database = "sample";
options.Schema = "transport";
options.Role = "transport";
options.Username = "masstransit";
options.Password = "H4rd2Gu3ss!";

// credentials to run migrations
options.AdminUsername = "migration-user";
options.AdminPassword = "H4rderTooGu3ss!!;
});
// MassTransit will run the migrations on start up
services.AddPostgresMigrationHostedService();
services.AddMassTransit(x =>
{
// elided...

x.UsingPostgres((context,cfg) =>
{
cfg.ConfigureEndpoints(context);
});
});

services.AddHostedService<Worker>();
});
```

| Setting | Description |
|-----------------|-------------------------------------------------------------------------------------------|
| `Host` | The host to connect to. We are using `localhost` to connect to the docker container |
| `Port` | We are using the default `5432`, so we aren't setting it. |
| `Database` | The name of the database to connect to |
| `Schema` | The schema to place the tables and functions inside of |
| `Role` | the role to assign for all created tables, functions, etc. |
| `Username` | The username of the user to login as for normal operations |
| `Password` | The password of the user to login as for normal operations |
| `AdminUsername` | The username of the admin user to login as when running migration commands |
| `AdminPassword` | The password of the admin user to login as when running migration commands |


## Run the Project

```bash
$ dotnet run
```

The output should have changed to show the message consumer generating the output (again, press Control+C to exit). Notice that the bus address now starts with _db_.

```
Building...
info: MassTransit[0]
Configured endpoint Message, Consumer: GettingStarted.MessageConsumer
info: Microsoft.Hosting.Lifetime[0]
Application started. Press Ctrl+C to shut down.
info: Microsoft.Hosting.Lifetime[0]
Hosting environment: Development
info: Microsoft.Hosting.Lifetime[0]
Content root path: /Users/chris/Garbage/start/GettingStarted
info: MassTransit[0]
Bus started: db://localhost/
info: GettingStarted.MessageConsumer[0]
Received Text: The time is 3/24/2021 12:11:10 PM -05:00
```

At this point the service is connecting to PostgreSQL on _localhost_ and publishing messages which are received by the consumer.

:tada:
41 changes: 41 additions & 0 deletions doc/content/3.documentation/2.configuration/0.index.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,47 @@ x.AddConfigureEndpointsCallback((name, cfg) =>
});
```

## Endpoint Strategies

Deciding how to configure receive endpoints in your application can be easy or hard, depending upon how much energy you want to spend being concerned with things that usually don't matter. However, there are nuances to the following approaches that should be considered.

### One Consumer for Each Queue

Creates a queue for each registered consumer, saga, and routing slip activity. Separate queues are created for execute and compensate if compensation is supported by the activity.

::alert{type="info"}
This is the preferred approach since it ensures that every consumer can be configured independently, including retries, delivery, and the outbox. It also ensures that messages for a consumer are not stuck behind other messages for other consumers sharing the same queue.
::

### Multiple Consumers on a Single Queue

Configuring multiple consumers, while fully supported by MassTransit, may make sense in certain circumstances, however, proceed with caution as there are limitations to this approach.

The recommendation here is to configure multiple consumers on a single queue only when those consumers are closely related in terms of business function and each consumer consumes distinct message types. An example might be consumers that each create, update, or delete an entity when the dependencies of those operations are different – create and update may depend upon a validation component, while delete may not share that dependency.

#### Consume Multiple Message Types

In situations where it is preferable to consume multiple message types from a single queue, create a consumer that consumes multiple message types by adding more IConsumer<T> interface implementations to the consumer class.

```csharp
public class AddressConsumer :
IConsumer<CreateAddress>,
IConsumer<UpdateAddress>
{
}
```

Sagas follow this approach, creating a single queue for each saga and configuring the broker to route message types consumed by the saga that are published to topic/exchanges to the saga’s queue.

### All Consumers on a Single Queue

This is never a good idea and is highly discouraged. While it is supported by MassTransit, it’s unlikely to be operationally sustainable.

Routing slip activities must not be configured on a single queue as they will not work properly.




## Endpoint Name Formatters

_ConfigureEndpoints_ uses an `IEndpointNameFormatter` to format the queue names for all supported consumer types. The default endpoint name formatter returns _PascalCase_ class names without the namespace. There are several built-in endpoint name formatters included. For the _SubmitOrderConsumer_, the receive endpoint names would be formatted as shown below. Note that class suffixes such as _Consumer_, _Saga_, and _Activity_ are trimmed from the endpoint name by default.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ public interface IReceiveEndpointConfigurator :
void ConfigureMessageTopology<T>(bool enabled = true)
where T : class;

/// <summary>
/// Configures whether the broker topology is configured for the specified message type. Related to
/// <see cref="ConfigureConsumeTopology" />, but for an individual message type.
/// </summary>
void ConfigureMessageTopology(Type messageType, bool enabled = true);

[EditorBrowsable(EditorBrowsableState.Never)]
void AddEndpointSpecification(IReceiveEndpointSpecification configurator);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace MassTransit
{
using System;
using Configuration;


Expand All @@ -15,19 +16,17 @@ public interface IConsumeTopologyConfigurator :
new IMessageConsumeTopologyConfigurator<T> GetMessageTopology<T>()
where T : class;

/// <summary>
/// Returns the specification for the message type
/// </summary>
/// <returns></returns>
IMessageConsumeTopologyConfigurator GetMessageTopology(Type messageType);

/// <summary>
/// Adds a convention to the topology, which will be applied to every message type
/// requested, to determine if a convention for the message type is available.
/// </summary>
/// <param name="convention">The Consume topology convention</param>
bool TryAddConvention(IConsumeTopologyConvention convention);

/// <summary>
/// Add a Consume topology for a specific message type
/// </summary>
/// <typeparam name="T">The message type</typeparam>
/// <param name="topology">The topology</param>
void AddMessageConsumeTopology<T>(IMessageConsumeTopology<T> topology)
where T : class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,6 @@ public interface IMessageConsumeTopologyConfigurator<TMessage> :
IMessageConsumeTopology<TMessage>
where TMessage : class
{
/// <summary>
/// Specify whether the broker topology should be configured for this message type
/// (defaults to true)
/// </summary>
bool ConfigureConsumeTopology { get; set; }

void Add(IMessageConsumeTopology<TMessage> consumeTopology);

/// <summary>
Expand Down Expand Up @@ -60,6 +54,12 @@ void AddOrUpdateConvention<TConvention>(Func<TConvention> add, Func<TConvention,
public interface IMessageConsumeTopologyConfigurator :
ISpecification
{
/// <summary>
/// Specify whether the broker topology should be configured for this message type
/// (defaults to true)
/// </summary>
bool ConfigureConsumeTopology { get; set; }

bool TryAddConvention(IConsumeTopologyConvention convention);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,7 @@ public interface IMessagePublishTopologyConfigurator :
/// Exclude the message type from being created as a topic/exchange.
/// </summary>
new bool Exclude { set; }

bool TryAddConvention(IPublishTopologyConvention convention);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,6 @@ bool TryGetConvention<TConvention>([NotNullWhen(true)] out TConvention? conventi
public interface IMessageSendTopologyConfigurator :
ISpecification
{
bool TryAddConvention(ISendTopologyConvention convention);
}
}
6 changes: 4 additions & 2 deletions src/MassTransit.Abstractions/MediatorRequestExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@ public static class MediatorRequestExtensions
/// <param name="mediator"></param>
/// <param name="request">The request message</param>
/// <param name="cancellationToken"></param>
/// <param name="timeout"></param>
/// <typeparam name="T">The response type</typeparam>
/// <returns>The response object</returns>
public static async Task<T> SendRequest<T>(this IMediator mediator, Request<T> request, CancellationToken cancellationToken = default)
public static async Task<T> SendRequest<T>(this IMediator mediator, Request<T> request, CancellationToken cancellationToken = default,
RequestTimeout timeout = default)
where T : class
{
try
{
using RequestHandle<Request<T>> handle = mediator.CreateRequest(request, cancellationToken);
using RequestHandle<Request<T>> handle = mediator.CreateRequest(request, cancellationToken, timeout);

Response<T> response = await handle.GetResponse<T>().ConfigureAwait(false);

Expand Down
9 changes: 6 additions & 3 deletions src/MassTransit.Abstractions/Scheduling/IMessageScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ Task<ScheduledMessage<T>> ScheduleSend<T>(Uri destinationAddress, DateTime sched
/// </summary>
/// <param name="destinationAddress">The destination address of the scheduled message</param>
/// <param name="tokenId">The tokenId of the scheduled message</param>
Task CancelScheduledSend(Uri destinationAddress, Guid tokenId);
/// <param name="cancellationToken"></param>
Task CancelScheduledSend(Uri destinationAddress, Guid tokenId, CancellationToken cancellationToken = default);

/// <summary>
/// Send a message
Expand Down Expand Up @@ -278,7 +279,8 @@ Task<ScheduledMessage<T>> SchedulePublish<T>(DateTime scheduledTime, object valu
/// the destinationAddress.
/// </summary>
/// <param name="tokenId">The tokenId of the scheduled message</param>
Task CancelScheduledPublish<T>(Guid tokenId)
/// <param name="cancellationToken"></param>
Task CancelScheduledPublish<T>(Guid tokenId, CancellationToken cancellationToken = default)
where T : class;

/// <summary>
Expand All @@ -287,6 +289,7 @@ Task CancelScheduledPublish<T>(Guid tokenId)
/// </summary>
/// <param name="messageType"></param>
/// <param name="tokenId">The tokenId of the scheduled message</param>
Task CancelScheduledPublish(Type messageType, Guid tokenId);
/// <param name="cancellationToken"></param>
Task CancelScheduledPublish(Type messageType, Guid tokenId, CancellationToken cancellationToken = default);
}
}
Loading
Loading