Skip to content

Commit

Permalink
Merge pull request #319 from Particular/queue-address-1.0
Browse files Browse the repository at this point in the history
Provide correct queue address to MessageContext
  • Loading branch information
SzymonPobiega authored Oct 13, 2023
2 parents 18721cf + 16c9c11 commit 67e75f1
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
namespace NServiceBus.AcceptanceTests;

using Microsoft.Extensions.DependencyInjection;
using NUnit.Framework;
using System.Text.Json;
using System.Threading.Tasks;
using System;
using System.Linq;

class When_message_fails : AwsLambdaSQSEndpointTestBase
{
[Test]
public async Task Should_move_message_to_error_queue()
{
// the prefix will be configured using the transport's prefix configuration therefore we remove it for the endpoint name
var endpointName = QueueName;

var receivedMessages = await GenerateAndReceiveSQSEvent<TestMessage>();

var context = new TestContext();

var endpoint = new AwsLambdaSQSEndpoint(ctx =>
{
var configuration = new AwsLambdaSQSEndpointConfiguration(endpointName, CreateSQSClient(), CreateSNSClient());

var advanced = configuration.AdvancedConfiguration;
advanced.RegisterComponents(c => c.AddSingleton(typeof(TestContext), context));
advanced.Recoverability().Immediate(i => i.NumberOfRetries(0));

advanced.SendFailedMessagesTo(ErrorQueueName);

return configuration;
});

Assert.DoesNotThrowAsync(() => endpoint.Process(receivedMessages, null), "message should be moved to the error queue instead");

var errorMessages = await RetrieveMessagesInErrorQueue();
Assert.AreEqual(1, errorMessages.Records.Count);
JsonDocument errorMessage = JsonSerializer.Deserialize<JsonDocument>(errorMessages.Records.First().Body);
var errorMessageHeader = errorMessage.RootElement.GetProperty("Headers");
Assert.AreEqual("simulated exception", errorMessageHeader.GetProperty("NServiceBus.ExceptionInfo.Message").GetString());
Assert.AreEqual(endpointName, errorMessageHeader.GetProperty("NServiceBus.ProcessingEndpoint").GetString());
Assert.AreEqual(QueueName, errorMessageHeader.GetProperty("NServiceBus.FailedQ").GetString());
}

[Test]
public async Task Should_rethrow_when_disabling_error_queue()
{
// the prefix will be configured using the transport's prefix configuration therefore we remove it for the endpoint name
var endpointName = QueueName;

var receivedMessages = await GenerateAndReceiveSQSEvent<TestMessage>();

var context = new TestContext();

var endpoint = new AwsLambdaSQSEndpoint(ctx =>
{
var configuration = new AwsLambdaSQSEndpointConfiguration(endpointName, CreateSQSClient(), CreateSNSClient());

configuration.DoNotSendMessagesToErrorQueue();

var advanced = configuration.AdvancedConfiguration;
advanced.RegisterComponents(c => c.AddSingleton(typeof(TestContext), context));
advanced.Recoverability().Immediate(i => i.NumberOfRetries(0));
advanced.SendFailedMessagesTo(ErrorQueueName);

return configuration;
});

var exception = Assert.ThrowsAsync<Exception>(() => endpoint.Process(receivedMessages, null));

StringAssert.Contains("Failed to process message", exception.Message);
Assert.AreEqual("simulated exception", exception.InnerException.Message);
Assert.AreEqual(0, await CountMessagesInErrorQueue());
}

public class TestContext
{
}

public class TestMessage : ICommand
{
}

public class FailingMessageHandler : IHandleMessages<TestMessage>
{
public Task Handle(TestMessage message, IMessageHandlerContext context) => throw new Exception("simulated exception");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public async Task Should_move_message_to_prefixed_error_queue()
var errorMessageHeader = errorMessage.RootElement.GetProperty("Headers");
Assert.AreEqual("simulated exception", errorMessageHeader.GetProperty("NServiceBus.ExceptionInfo.Message").GetString());
Assert.AreEqual(endpointName, errorMessageHeader.GetProperty("NServiceBus.ProcessingEndpoint").GetString());
StringAssert.EndsWith(QueueName, errorMessageHeader.GetProperty("NServiceBus.FailedQ").GetString());
Assert.AreEqual(QueueName, errorMessageHeader.GetProperty("NServiceBus.FailedQ").GetString());
}

public class TestContext
Expand Down
20 changes: 11 additions & 9 deletions src/NServiceBus.AwsLambda.SQS/AwsLambdaEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ async Task<ServerlessTransportInfrastructure> Initialize(ILambdaContext executio

var transportInfrastructure = serverlessTransport.GetTransportInfrastructure(endpoint);

queueUrl = await GetQueueUrl(transportInfrastructure.PipelineInvoker.ReceiveAddress).ConfigureAwait(false);
receiveQueueAddress = transportInfrastructure.PipelineInvoker.ReceiveAddress;
receiveQueueUrl = await GetQueueUrl(receiveQueueAddress).ConfigureAwait(false);
errorQueueUrl = await GetQueueUrl(transportInfrastructure.ErrorQueueAddress).ConfigureAwait(false);

return transportInfrastructure;
Expand Down Expand Up @@ -382,7 +383,7 @@ async Task ProcessMessageWithInMemoryRetries(Dictionary<string, string> headers,
new Dictionary<string, string>(headers),
body,
transportTransaction,
queueUrl,
receiveQueueAddress,
context);

await Process(messageContext, lambdaContext, token).ConfigureAwait(false);
Expand All @@ -404,7 +405,7 @@ async Task ProcessMessageWithInMemoryRetries(Dictionary<string, string> headers,
body,
transportTransaction,
immediateProcessingAttempts,
queueUrl,
receiveQueueAddress,
context);

errorHandlerResult = await ProcessFailedMessage(errorContext, lambdaContext).ConfigureAwait(false);
Expand Down Expand Up @@ -442,7 +443,7 @@ async Task DeleteMessageAndBodyIfRequired(Message message, string messageS3BodyK
try
{
// should not be cancelled
await sqsClient.DeleteMessageAsync(queueUrl, message.ReceiptHandle, CancellationToken.None)
await sqsClient.DeleteMessageAsync(receiveQueueUrl, message.ReceiptHandle, CancellationToken.None)
.ConfigureAwait(false);
}
catch (ReceiptHandleIsInvalidException ex)
Expand Down Expand Up @@ -483,15 +484,15 @@ await sqsClient.SendMessageAsync(new SendMessageRequest
{
await sqsClient.ChangeMessageVisibilityAsync(new ChangeMessageVisibilityRequest
{
QueueUrl = queueUrl,
QueueUrl = receiveQueueUrl,
ReceiptHandle = message.ReceiptHandle,
VisibilityTimeout = 0
}, CancellationToken.None)
.ConfigureAwait(false);
}
catch (Exception changeMessageVisibilityEx)
{
Logger.Warn($"Error returning poison message back to input queue at url {queueUrl}. Poison message will become available at the input queue again after the visibility timeout expires.", changeMessageVisibilityEx);
Logger.Warn($"Error returning poison message back to input queue at url {receiveQueueUrl}. Poison message will become available at the input queue again after the visibility timeout expires.", changeMessageVisibilityEx);
}

throw;
Expand All @@ -501,14 +502,14 @@ await sqsClient.ChangeMessageVisibilityAsync(new ChangeMessageVisibilityRequest
{
await sqsClient.DeleteMessageAsync(new DeleteMessageRequest
{
QueueUrl = queueUrl,
QueueUrl = receiveQueueUrl,
ReceiptHandle = message.ReceiptHandle
}, CancellationToken.None)
.ConfigureAwait(false);
}
catch (Exception ex)
{
Logger.Warn($"Error removing poison message from input queue {queueUrl}. This may cause duplicate poison messages in the error queue for this endpoint.", ex);
Logger.Warn($"Error removing poison message from input queue {receiveQueueAddress}. This may cause duplicate poison messages in the error queue for this endpoint.", ex);
}

// If there is a message body in S3, simply leave it there
Expand Down Expand Up @@ -538,7 +539,8 @@ static void LogPoisonMessage(string messageId, Exception exception)
IEndpointInstance endpoint;
IAmazonSQS sqsClient;
S3Settings s3Settings;
string queueUrl;
string receiveQueueAddress;
string receiveQueueUrl;
string errorQueueUrl;

static readonly ILog Logger = LogManager.GetLogger(typeof(AwsLambdaSQSEndpoint));
Expand Down

0 comments on commit 67e75f1

Please sign in to comment.