Skip to content

Commit

Permalink
Added consumer instance pinning for partition keys, and ensuring part…
Browse files Browse the repository at this point in the history
…itions are ordered and allow concurrent delivery
  • Loading branch information
phatboyg committed Apr 11, 2024
1 parent 9771ce5 commit ae18abb
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 35 deletions.
3 changes: 2 additions & 1 deletion src/MassTransit/SqlTransport/SqlTransport/ClientContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ Task Send<T>(string queueName, SqlMessageSendContext<T> context)
Task Publish<T>(string topicName, SqlMessageSendContext<T> context)
where T : class;

Task<IEnumerable<SqlTransportMessage>> ReceiveMessages(string queueName, SqlReceiveMode mode, int messageLimit, TimeSpan lockDuration);
Task<IEnumerable<SqlTransportMessage>> ReceiveMessages(string queueName, SqlReceiveMode mode, int messageLimit, int concurrentCount,
TimeSpan lockDuration);

Task<bool> DeleteMessage(Guid lockId, long messageDeliveryId);
Task<bool> DeleteScheduledMessage(Guid tokenId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ Task Handle(SqlTransportMessage message, CancellationToken cancellationToken)
try
{
while (!IsStopping)
await algorithm.Run(ReceiveMessages, (m, c) => Handle(m, c), Stopping).ConfigureAwait(false);
await algorithm.Run((messageLimit, token) => ReceiveMessages(messageLimit, token), (m, c) => Handle(m, c), Stopping).ConfigureAwait(false);
}
catch (OperationCanceledException exception) when (exception.CancellationToken == Stopping)
{
Expand Down Expand Up @@ -107,7 +107,7 @@ async Task<IEnumerable<SqlTransportMessage>> ReceiveMessages(int messageLimit, C
try
{
IList<SqlTransportMessage> messages = (await _client.ReceiveMessages(_receiveSettings.EntityName, _receiveSettings.ReceiveMode, messageLimit,
_receiveSettings.LockDuration).ConfigureAwait(false)).ToList();
_receiveSettings.ConcurrentDeliveryLimit, _receiveSettings.LockDuration).ConfigureAwait(false)).ToList();

if (messages.Count > 0)
return messages;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,10 @@ public Task<bool> Unlock(Guid lockId, long messageDeliveryId, TimeSpan delay, Se
return _context.Unlock(lockId, messageDeliveryId, delay, sendHeaders);
}

public Task<IEnumerable<SqlTransportMessage>> ReceiveMessages(string queueName, SqlReceiveMode mode, int messageLimit, TimeSpan lockDuration)
public Task<IEnumerable<SqlTransportMessage>> ReceiveMessages(string queueName, SqlReceiveMode mode, int messageLimit, int concurrentLimit,
TimeSpan lockDuration)
{
return _context.ReceiveMessages(queueName, mode, messageLimit, lockDuration);
return _context.ReceiveMessages(queueName, mode, messageLimit, concurrentLimit, lockDuration);
}

public Task<bool> DeleteMessage(Guid lockId, long messageDeliveryId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,10 @@ public Task<bool> Unlock(Guid lockId, long messageDeliveryId, TimeSpan delay, Se
return _context.Unlock(lockId, messageDeliveryId, delay, sendHeaders);
}

public Task<IEnumerable<SqlTransportMessage>> ReceiveMessages(string queueName, SqlReceiveMode mode, int messageLimit, TimeSpan lockDuration)
public Task<IEnumerable<SqlTransportMessage>> ReceiveMessages(string queueName, SqlReceiveMode mode, int messageLimit, int concurrentLimit,
TimeSpan lockDuration)
{
return _context.ReceiveMessages(queueName, mode, messageLimit, lockDuration);
return _context.ReceiveMessages(queueName, mode, messageLimit, concurrentLimit, lockDuration);
}

public Task<bool> DeleteMessage(Guid lockId, long messageDeliveryId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public abstract Task Send<T>(string queueName, SqlMessageSendContext<T> context)
public abstract Task Publish<T>(string topicName, SqlMessageSendContext<T> context)
where T : class;

public abstract Task<IEnumerable<SqlTransportMessage>> ReceiveMessages(string queueName, SqlReceiveMode mode, int messageLimit,
public abstract Task<IEnumerable<SqlTransportMessage>> ReceiveMessages(string queueName, SqlReceiveMode mode, int messageLimit, int concurrentLimit,
TimeSpan lockDuration);

public abstract Task<bool> DeleteMessage(Guid lockId, long messageDeliveryId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,22 +100,38 @@ public override Task<long> PurgeQueue(string queueName, CancellationToken cancel
}

public override async Task<IEnumerable<SqlTransportMessage>> ReceiveMessages(string queueName, SqlReceiveMode mode, int messageLimit,
TimeSpan lockDuration)
int concurrentLimit, TimeSpan lockDuration)
{
try
{
var sql = mode switch
if (mode == SqlReceiveMode.Normal)
{
SqlReceiveMode.Normal => _receiveSql,
_ => _receivePartitionedSql
return await _context.Query((x, t) => x.QueryAsync<SqlTransportMessage>(_receiveSql, new
{
queue_name = queueName,
fetch_consumer_id = _consumerId,
fetch_lock_id = NewId.NextGuid(),
lock_duration = lockDuration,
fetch_count = messageLimit
}), CancellationToken).ConfigureAwait(false);
}

var ordered = mode switch
{
SqlReceiveMode.PartitionedOrdered => 1,
SqlReceiveMode.PartitionedOrderedConcurrent => 1,
_ => 0
};
return await _context.Query((x, t) => x.QueryAsync<SqlTransportMessage>(sql, new

return await _context.Query((x, t) => x.QueryAsync<SqlTransportMessage>(_receivePartitionedSql, new
{
queue_name = queueName,
fetch_consumer_id = _consumerId,
fetch_lock_id = NewId.NextGuid(),
lock_duration = lockDuration,
fetch_count = messageLimit
fetch_count = messageLimit,
concurrent_count = concurrentLimit,
ordered
}), CancellationToken).ConfigureAwait(false);
}
catch (PostgresException exception) when (exception.ErrorCode == 40001)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,9 @@ queue_name text
, fetch_consumer_id uuid
, fetch_lock_id uuid
, lock_duration interval
, fetch_count integer DEFAULT 1)
, fetch_count integer DEFAULT 1
, concurrent_count integer DEFAULT 1
, ordered integer DEFAULT 0)
RETURNS TABLE(
transport_message_id uuid
, queue_id bigint
Expand Down Expand Up @@ -480,16 +482,21 @@ SELECT md.*
WHERE md.message_delivery_id IN (
WITH ready AS (
SELECT mdx.message_delivery_id, mdx.enqueue_time, mdx.lock_id, mdx.priority,
row_number() over pw as row_number
row_number() over ( partition by mdx.partition_key
order by mdx.priority, mdx.enqueue_time, mdx.message_delivery_id ) as row_normal,
row_number() over ( partition by mdx.partition_key
order by mdx.priority, mdx.message_delivery_id,mdx.enqueue_time ) as row_ordered,
first_value(CASE WHEN mdx.enqueue_time > v_now THEN mdx.consumer_id END) over (partition by mdx.partition_key
order by mdx.enqueue_time DESC, mdx.message_delivery_id DESC) as consumer_id
FROM "{0}".message_delivery mdx
WHERE mdx.queue_id = v_queue_id
AND mdx.delivery_count < mdx.max_delivery_count
WINDOW pw as (partition by mdx.partition_key order by mdx.priority, mdx.enqueue_time, mdx.message_delivery_id)
)
SELECT ready.message_delivery_id
FROM ready
WHERE row_number = 1 AND ready.lock_id IS NULL
AND ready.enqueue_time < v_now
WHERE ( ( ordered = 0 AND ready.row_normal <= concurrent_count) OR ( ordered = 1 AND ready.row_ordered <= concurrent_count ) )
AND ready.enqueue_time <= v_now
AND (ready.consumer_id IS NULL OR ready.consumer_id = fetch_consumer_id)
ORDER BY ready.priority, ready.enqueue_time, ready.message_delivery_id
LIMIT fetch_count FOR UPDATE SKIP LOCKED)
FOR UPDATE OF md SKIP LOCKED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ static class SqlStatements
public const string DbReceiveSql = """SELECT * FROM "{0}".fetch_messages(@queue_name,@fetch_consumer_id,@fetch_lock_id,@lock_duration,@fetch_count)""";

public const string DbReceivePartitionedSql =
"""SELECT * FROM "{0}".fetch_messages_partitioned(@queue_name,@fetch_consumer_id,@fetch_lock_id,@lock_duration,@fetch_count)""";
"""SELECT * FROM "{0}".fetch_messages_partitioned(@queue_name,@fetch_consumer_id,@fetch_lock_id,@lock_duration,@fetch_count,@concurrent_count,@ordered)""";

public const string DbMoveMessageSql = """SELECT * FROM "{0}".move_message(@message_delivery_id,@lock_id,@queue_name,@queue_type,@headers)""";
public const string DbDeleteMessageSql = """SELECT * FROM "{0}".delete_message(@message_delivery_id,@lock_id)""";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,23 +109,38 @@ public override async Task<long> PurgeQueue(string queueName, CancellationToken
}

public override async Task<IEnumerable<SqlTransportMessage>> ReceiveMessages(string queueName, SqlReceiveMode mode, int messageLimit,
TimeSpan lockDuration)
int concurrentLimit, TimeSpan lockDuration)
{
try
{
var sql = mode switch
if (mode == SqlReceiveMode.Normal)
{
SqlReceiveMode.Normal => _receiveSql,
_ => _receivePartitionedSql
return await Query<SqlTransportMessage>(_receiveSql, new
{
queueName,
consumerId = _consumerId,
lockId = NewId.NextGuid(),
lockDuration = (int)lockDuration.TotalSeconds,
fetchCount = messageLimit
}).ConfigureAwait(false);
}

var ordered = mode switch
{
SqlReceiveMode.PartitionedOrdered => 1,
SqlReceiveMode.PartitionedOrderedConcurrent => 1,
_ => 0
};

return await Query<SqlTransportMessage>(sql, new
return await Query<SqlTransportMessage>(_receivePartitionedSql, new
{
queueName,
consumerId = _consumerId,
lockId = NewId.NextGuid(),
lockDuration = (int)lockDuration.TotalSeconds,
fetchCount = messageLimit
fetchCount = messageLimit,
concurrentCount = concurrentLimit,
ordered
}).ConfigureAwait(false);
}
catch (SqlException exception) when (exception.Number == 1205)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,9 @@ @queueName varchar(256),
@consumerId uniqueidentifier,
@lockId uniqueidentifier,
@lockDuration int,
@fetchCount int = 1
@fetchCount int = 1,
@concurrentCount int = 1,
@ordered int = 0
AS
BEGIN
SET NOCOUNT ON;
Expand Down Expand Up @@ -879,15 +881,18 @@ WITH ready AS (SELECT mdx.MessageDeliveryId,
mdx.EnqueueTime,
mdx.LockId,
mdx.Priority,
row_number() over (partition by mdx.PartitionKey order by mdx.Priority, mdx.EnqueueTime, mdx.MessageDeliveryId) as row_number
row_number() over (partition by mdx.PartitionKey order by mdx.Priority, mdx.EnqueueTime, mdx.MessageDeliveryId) as row_normal,
row_number() over (partition by mdx.PartitionKey order by mdx.Priority, mdx.MessageDeliveryId, mdx.EnqueueTime) as row_ordered,
first_value(CASE WHEN mdx.EnqueueTime > @now THEN mdx.ConsumerId END) over (partition by mdx.PartitionKey
order by mdx.EnqueueTime DESC, mdx.MessageDeliveryId DESC) as ConsumerId
FROM transport.MessageDelivery mdx WITH (ROWLOCK, READPAST, UPDLOCK)
WHERE mdx.QueueId = @queueId
AND mdx.DeliveryCount < mdx.MaxDeliveryCount),
so_ready as (SELECT ready.MessageDeliveryId
FROM ready
WHERE ready.row_number = 1
AND ready.LockId IS NULL
AND ready.EnqueueTime < @now
WHERE ( ( @ordered = 0 AND ready.row_normal <= @concurrentCount) OR ( @ordered = 1 AND ready.row_ordered <= @concurrentCount ) )
AND (ready.ConsumerId IS NULL OR ready.ConsumerId = @consumerId)
AND ready.EnqueueTime <= @now
ORDER BY ready.Priority, ready.EnqueueTime, ready.MessageDeliveryId
OFFSET 0 ROWS FETCH NEXT @fetchCount ROWS ONLY),
msgs AS (SELECT md.*
Expand Down
10 changes: 6 additions & 4 deletions tests/MassTransit.SqlTransport.Tests/PartitionKey_Specs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ namespace MassTransit.DbTransport.Tests;
using Internals;
using Microsoft.Extensions.DependencyInjection;
using NUnit.Framework;
using SqlTransport;
using Testing;


Expand All @@ -31,8 +30,9 @@ public async Task Should_consume_a_lot_of_published_messages()
{
cfg.ReceiveEndpoint("partitioned-input-queue", e =>
{
e.PrefetchCount = MessageLimit;
e.ConcurrentMessageLimit = MessageLimit;
e.PrefetchCount = 10;
e.ConcurrentMessageLimit = 10;
e.PurgeOnStartup = true;

e.SetReceiveMode(SqlReceiveMode.Partitioned);

Expand Down Expand Up @@ -64,7 +64,7 @@ public async Task Should_consume_a_lot_of_published_messages()
}
}

const int MessageLimit = 10;
const int MessageLimit = 30;
const int NumKeys = 2;

readonly T _configuration;
Expand Down Expand Up @@ -93,6 +93,8 @@ public async Task Consume(ConsumeContext<PartitionedTestMessage> context)
{
if (Interlocked.Decrement(ref _index) <= 0)
_taskCompletionSource.TrySetResult(context);

await Task.Delay(4);
}
}
}

0 comments on commit ae18abb

Please sign in to comment.