Can not receive sqs message after the application has been running for a few hours/days #551

Closed
ramyogi7283 opened this issue Oct 27, 2022 · 5 comments

Comments

@ramyogi7283

Type: Cannot receive SQS messages after the application has been running for a few days

Component:
SQS

Describe the bug
After the application has been running for a few days, the listener stops receiving messages, and I can see the message count in the queue growing in the AWS console. The rest of the application runs normally; only the SQS listener stops receiving messages. After I restart the application, it works again.

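<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-dependencies</artifactId>
    <version>2.3.1</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>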

Sample
@SqsListener(value = "#{sqsQueueNameReader.listQueueNames()}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void consumeSQSMessage(String payload, @Headers Map<String, String> headers) {
    // Rebuild a Spring Message from the raw payload and headers before handing it on.
    Message<String> message = MessageBuilder.withPayload(payload).copyHeaders(headers).build();
    consumeMessage(message);
}

Reference :
spring-attic/spring-cloud-aws#742

@tomazfernandes
Contributor

@ramyogi7283, this is a known issue with the 2.x version. The 3.0.0 version is a complete rewrite and should not have this problem. I don't think we're releasing a 2.x version with this fix.

Feel free to try 3.0.0-M3 out and provide feedback! A GA version should be released this year or early next.

Thanks.

@ramyogi7283
Author

@tomazfernandes I am exploring 3.0.0-RC1 as you suggested. @SqsListener does not accept a list of queue names through SpEL (#678), and I have already submitted an issue for that. I have a few questions.

I would like to create an individual container for each queue name so that I can stop and start containers by queue name.
I created a Spring bean as shown below, but this gives me a single container, and I cannot autowire the registry either.

@Bean
MessageListenerContainer listenerContainer(SqsAsyncClient sqsAsyncClient) {
    return SqsMessageListenerContainerFactory
            .builder()
            .sqsAsyncClient(sqsAsyncClient)
            .configure(options -> options
                    .acknowledgementMode(AcknowledgementMode.ALWAYS))
            .messageListener((payload) -> {
                sqsSourceConsumer.consumeMessage(payload);
            })
            .build()
            // A single container that polls all three queues.
            .createContainer("queueName1", "queueName2", "queueName3");
}

So when I create a container like this, can DefaultListenerContainerRegistry not be used?
How do I create a container for each queue name, register it with DefaultListenerContainerRegistry, and then autowire that registry so that I can stop and start the container for each queue name?

I am really impressed with the latest awspring release, but the documentation is not detailed enough for me to understand this.
Please help me understand it better.

@tomazfernandes
Contributor

Hi @ramyogi7283! Thanks a lot for trying it out and providing feedback, this helps us a lot.

As mentioned in this section of the documentation, the MessageListenerContainerRegistry automatically registers containers created with the @SqsListener annotation.

However you can also manually register the containers in the provided MessageListenerContainerRegistry bean, or even create new Registry instances e.g. if you'd like to manage the lifecycle of groups of containers.

public interface MessageListenerContainerRegistry extends SmartLifecycle {

	/**
	 * Register a {@link MessageListenerContainer} instance with this registry.
	 * @param listenerContainer the instance.
	 */
	void registerListenerContainer(MessageListenerContainer<?> listenerContainer);

	/**
	 * Return the {@link MessageListenerContainer} instances registered within this registry.
	 * @return the container instances.
	 */
	Collection<MessageListenerContainer<?>> getListenerContainers();

	/**
	 * Return the {@link MessageListenerContainer} instance registered within this registry with the provided id, or
	 * null if none.
	 * @param id the id.
	 * @return the container instance.
	 */
	@Nullable
	MessageListenerContainer<?> getContainerById(String id);

}

Just mind that if you use the default registry, it'll apply its bean lifecycle hooks on all registered containers e.g. stop on application shutdown, since it's registered as a bean.

Does this answer your question? If so maybe we can add this clarification to the docs.

Thanks!

@ramyogi7283
Author

@tomazfernandes Please provide an example of how to register manually created containers and how to access the MessageListenerContainerRegistry.

@tomazfernandes
Contributor

@ramyogi7283, you can autowire the default MessageListenerContainerRegistry and register a container by using the registerListenerContainer method. Does that not work for you?
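
For illustration, the pattern discussed in this thread (one container per queue, each registered under its own id, then looked up by id to stop or start it) might be sketched as below. This is a stand-alone sketch, not Spring Cloud AWS code: `SketchContainer`, `SketchRegistry`, and `registerPerQueue` are hypothetical stand-ins introduced here so that the example runs without any dependencies. Only the registry method names mirror the interface quoted earlier, and using the queue name as the container id is an assumption made for the example.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Stand-alone sketch of the "one container per queue" pattern.
 * SketchContainer and SketchRegistry are hypothetical stand-ins,
 * NOT the Spring Cloud AWS types.
 */
public class PerQueueRegistrySketch {

    /** Stand-in for a listener container: it has an id and can be started and stopped. */
    static final class SketchContainer {
        private final String id;
        private boolean running;

        SketchContainer(String id) {
            this.id = id;
        }

        String getId() { return id; }
        void start() { running = true; }
        void stop() { running = false; }
        boolean isRunning() { return running; }
    }

    /** Stand-in for the registry; method names mirror the interface quoted above. */
    static final class SketchRegistry {
        private final Map<String, SketchContainer> containers = new LinkedHashMap<>();

        void registerListenerContainer(SketchContainer container) {
            // Reject duplicate ids so that lookups by id stay unambiguous.
            if (containers.putIfAbsent(container.getId(), container) != null) {
                throw new IllegalArgumentException("Duplicate container id: " + container.getId());
            }
        }

        SketchContainer getContainerById(String id) {
            return containers.get(id); // null if no container has this id
        }

        Collection<SketchContainer> getListenerContainers() {
            return containers.values();
        }
    }

    /** Creates, registers, and starts one container per queue name (assumption: id == queue name). */
    static SketchRegistry registerPerQueue(List<String> queueNames) {
        SketchRegistry registry = new SketchRegistry();
        for (String queueName : queueNames) {
            SketchContainer container = new SketchContainer(queueName);
            registry.registerListenerContainer(container);
            container.start();
        }
        return registry;
    }

    public static void main(String[] args) {
        SketchRegistry registry =
                registerPerQueue(List.of("queueName1", "queueName2", "queueName3"));

        // Stop only the container that serves queueName2; the others keep running.
        registry.getContainerById("queueName2").stop();

        for (SketchContainer container : registry.getListenerContainers()) {
            System.out.println(container.getId() + " running=" + container.isRunning());
        }
    }
}
```

With the real library, the containers would presumably come from calling the factory's `createContainer` once per queue name rather than once with all names, and the registry would be the autowired `MessageListenerContainerRegistry` bean. How container ids are assigned and when registered containers are started are not covered in this thread, so check the documentation for those details.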
