From cdf0539710ec4723ca9084761997ddf3a5033ba4 Mon Sep 17 00:00:00 2001 From: querwurzel <> Date: Tue, 7 Nov 2023 18:15:03 +0100 Subject: [PATCH] clean ActiveMQ shutdown --- .../business/tracking/MessagingClient.java | 74 ++++++++++++------- .../business/tracking/MessagingConfig.java | 35 +++++---- 2 files changed, 67 insertions(+), 42 deletions(-) diff --git a/backend/src/main/java/com/github/binpastes/paste/business/tracking/MessagingClient.java b/backend/src/main/java/com/github/binpastes/paste/business/tracking/MessagingClient.java index b41cfdb..2b056e7 100644 --- a/backend/src/main/java/com/github/binpastes/paste/business/tracking/MessagingClient.java +++ b/backend/src/main/java/com/github/binpastes/paste/business/tracking/MessagingClient.java @@ -1,16 +1,18 @@ package com.github.binpastes.paste.business.tracking; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.api.core.client.*; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; import java.time.Instant; -import java.util.concurrent.Executor; public class MessagingClient { @@ -23,22 +25,21 @@ public class MessagingClient { private final ThreadLocal clientProducers = new ThreadLocal<>(); private final ThreadLocal clientConsumers = new ThreadLocal<>(); - public MessagingClient(final ClientSessionFactory clientSessionFactory, Executor consumerThreadPool, Executor producerThreadPool) { + public MessagingClient( + final ClientSessionFactory clientSessionFactory, + final Scheduler consumerThreadPool, + final Scheduler producerThreadPool + ) { this.sessionFactory = clientSessionFactory; - this.consumerThreadPool = Schedulers.fromExecutor(consumerThreadPool); - this.producerThreadPool = Schedulers.fromExecutor(producerThreadPool); + this.consumerThreadPool = consumerThreadPool; + this.producerThreadPool = producerThreadPool; } public void sendMessage(String pasteId, Instant timeViewed) { Mono.fromRunnable(() -> { try { var session = session(); - var clientProducer = this.clientProducers.get(); - - if (clientProducer == null) { - clientProducer = session.createProducer(new SimpleString("binpastes")); - this.clientProducers.set(clientProducer); - } + var clientProducer = producer(); var clientMessage = session .createMessage(true) @@ -57,28 +58,47 @@ public void sendMessage(String pasteId, Instant timeViewed) { public Mono receiveMessage() { return Mono.fromCallable(() -> { - try { - var session = session(); - var clientConsumer = this.clientConsumers.get(); + var clientConsumer = consumer(); - if (clientConsumer == null) { - clientConsumer = session.createConsumer(new SimpleString("pasteTrackingQueue")); - this.clientConsumers.set(clientConsumer); - } + var message = clientConsumer.receive(); + if (message == null) { + return null; + } - ClientMessage message = clientConsumer.receive(); - var pasteId = message.getStringProperty(Message.PASTE_ID_PROPERTY); - var timeViewed = Instant.ofEpochMilli(message.getLongProperty(Message.TIME_VIEWED_PROPERTY)); - log.debug("Received tracking message for paste {}", pasteId); + var pasteId = message.getStringProperty(Message.PASTE_ID_PROPERTY); + var timeViewed = Instant.ofEpochMilli(message.getLongProperty(Message.TIME_VIEWED_PROPERTY)); + log.debug("Received tracking message for paste {}", pasteId); - return new Message(pasteId, timeViewed); - } catch (ActiveMQException e) { - throw new RuntimeException(e); - } + return new Message(pasteId, timeViewed); }) + .onErrorComplete(ActiveMQObjectClosedException.class) .subscribeOn(consumerThreadPool); } + private ClientProducer producer() throws ActiveMQException { + var clientProducer = this.clientProducers.get(); + + if (clientProducer == null) { + var address = new SimpleString("binpastes"); + clientProducer = session().createProducer(address); + this.clientProducers.set(clientProducer); + } + + return clientProducer; + } + + private ClientConsumer consumer() throws ActiveMQException { + var clientConsumer = this.clientConsumers.get(); + + if (clientConsumer == null) { + var queue = new SimpleString("pasteTrackingQueue"); + clientConsumer = session().createConsumer(queue); + this.clientConsumers.set(clientConsumer); + } + + return clientConsumer; + } + private ClientSession session() throws ActiveMQException { var session = sessions.get(); diff --git a/backend/src/main/java/com/github/binpastes/paste/business/tracking/MessagingConfig.java b/backend/src/main/java/com/github/binpastes/paste/business/tracking/MessagingConfig.java index b9b231d..67bc990 100644 --- a/backend/src/main/java/com/github/binpastes/paste/business/tracking/MessagingConfig.java +++ b/backend/src/main/java/com/github/binpastes/paste/business/tracking/MessagingConfig.java @@ -14,6 +14,8 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.DependsOn; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -25,16 +27,15 @@ class MessagingConfig { @DependsOn("flywayInitializer") public MessagingClient messagingClient( ClientSessionFactory clientSessionFactory, - Executor consumerThreadPool, - Executor producerThreadPool + Scheduler consumerThreadPool, + Scheduler producerThreadPool ) { return new MessagingClient(clientSessionFactory, consumerThreadPool, producerThreadPool); } @Bean(destroyMethod = "stop") public EmbeddedActiveMQ activeMqServer() throws Exception { - - org.apache.activemq.artemis.core.config.Configuration config = new ConfigurationImpl(); + org.apache.activemq.artemis.core.config.Configuration config = new ConfigurationImpl(); CoreAddressConfiguration addr = new CoreAddressConfiguration(); addr @@ -42,7 +43,8 @@ public EmbeddedActiveMQ activeMqServer() throws Exception { .addQueueConfig(new QueueConfiguration() .setName(new SimpleString("pasteTrackingQueue")) .setAddress(new SimpleString("binpastes")) - .setDelayBeforeDispatch(3000L) + .setMaxConsumers(1) + .setDelayBeforeDispatch(SECONDS.toMillis(3)) .setDurable(true)) .addRoutingType(RoutingType.ANYCAST); @@ -54,12 +56,13 @@ public EmbeddedActiveMQ activeMqServer() throws Exception { config.addAddressConfiguration(addr); config.addAddressSetting("binpastes", addressSettings); - config.setMessageExpiryScanPeriod(30000); + config.setMessageExpiryScanPeriod(SECONDS.toMillis(3)); config.setName("binpastesMQ"); config.addAcceptorConfiguration("in-vm", "vm://0"); - + config.setGracefulShutdownEnabled(true); + config.setGracefulShutdownTimeout(SECONDS.toMillis(1)); config.setJournalPoolFiles(3); config.setSecurityEnabled(false); config.setJMXManagementEnabled(true); @@ -103,24 +106,26 @@ ConnectionFactory activeMqConnectionFactory() throws Exception { return activeMQConnectionFactory; }*/ - @Bean(destroyMethod = "shutdown") - public ThreadPoolTaskExecutor consumerThreadPool() { + @Bean(initMethod = "init", destroyMethod = "dispose") + public Scheduler consumerThreadPool() { ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor(); pool.setCorePoolSize(1); pool.setMaxPoolSize(1); pool.setAllowCoreThreadTimeOut(true); - pool.setThreadNamePrefix("artemis-consumer-"); - return pool; + pool.setThreadNamePrefix("activemq-consumer-"); + pool.initialize(); + return Schedulers.fromExecutor(pool); } - @Bean(destroyMethod = "shutdown") - public ThreadPoolTaskExecutor producerThreadPool() { + @Bean(initMethod = "init", destroyMethod = "dispose") + public Scheduler producerThreadPool() { ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor(); pool.setCorePoolSize(Runtime.getRuntime().availableProcessors()); pool.setMaxPoolSize(Runtime.getRuntime().availableProcessors()); pool.setAllowCoreThreadTimeOut(true); - pool.setThreadNamePrefix("artemis-producer-"); - return pool; + pool.setThreadNamePrefix("activemq-producer-"); + pool.initialize(); + return Schedulers.fromExecutor(pool); } @Bean(destroyMethod = "close")