Skip to content

Commit

Permalink
clean ActiveMQ shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
querwurzel committed Nov 7, 2023
1 parent 08b3542 commit cdf0539
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 42 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
package com.github.binpastes.paste.business.tracking;

import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.*;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.time.Instant;
import java.util.concurrent.Executor;

public class MessagingClient {

Expand All @@ -23,22 +25,21 @@ public class MessagingClient {
private final ThreadLocal<ClientProducer> clientProducers = new ThreadLocal<>();
private final ThreadLocal<ClientConsumer> clientConsumers = new ThreadLocal<>();

public MessagingClient(final ClientSessionFactory clientSessionFactory, Executor consumerThreadPool, Executor producerThreadPool) {
public MessagingClient(
final ClientSessionFactory clientSessionFactory,
final Scheduler consumerThreadPool,
final Scheduler producerThreadPool
) {
this.sessionFactory = clientSessionFactory;
this.consumerThreadPool = Schedulers.fromExecutor(consumerThreadPool);
this.producerThreadPool = Schedulers.fromExecutor(producerThreadPool);
this.consumerThreadPool = consumerThreadPool;
this.producerThreadPool = producerThreadPool;
}

public void sendMessage(String pasteId, Instant timeViewed) {
Mono.fromRunnable(() -> {
try {
var session = session();
var clientProducer = this.clientProducers.get();

if (clientProducer == null) {
clientProducer = session.createProducer(new SimpleString("binpastes"));
this.clientProducers.set(clientProducer);
}
var clientProducer = producer();

var clientMessage = session
.createMessage(true)
Expand All @@ -57,28 +58,47 @@ public void sendMessage(String pasteId, Instant timeViewed) {

public Mono<Message> receiveMessage() {
return Mono.fromCallable(() -> {
try {
var session = session();
var clientConsumer = this.clientConsumers.get();
var clientConsumer = consumer();

if (clientConsumer == null) {
clientConsumer = session.createConsumer(new SimpleString("pasteTrackingQueue"));
this.clientConsumers.set(clientConsumer);
}
var message = clientConsumer.receive();
if (message == null) {
return null;
}

ClientMessage message = clientConsumer.receive();
var pasteId = message.getStringProperty(Message.PASTE_ID_PROPERTY);
var timeViewed = Instant.ofEpochMilli(message.getLongProperty(Message.TIME_VIEWED_PROPERTY));
log.debug("Received tracking message for paste {}", pasteId);
var pasteId = message.getStringProperty(Message.PASTE_ID_PROPERTY);
var timeViewed = Instant.ofEpochMilli(message.getLongProperty(Message.TIME_VIEWED_PROPERTY));
log.debug("Received tracking message for paste {}", pasteId);

return new Message(pasteId, timeViewed);
} catch (ActiveMQException e) {
throw new RuntimeException(e);
}
return new Message(pasteId, timeViewed);
})
.onErrorComplete(ActiveMQObjectClosedException.class)
.subscribeOn(consumerThreadPool);
}

private ClientProducer producer() throws ActiveMQException {
var clientProducer = this.clientProducers.get();

if (clientProducer == null) {
var address = new SimpleString("binpastes");
clientProducer = session().createProducer(address);
this.clientProducers.set(clientProducer);
}

return clientProducer;
}

private ClientConsumer consumer() throws ActiveMQException {
var clientConsumer = this.clientConsumers.get();

if (clientConsumer == null) {
var queue = new SimpleString("pasteTrackingQueue");
clientConsumer = session().createConsumer(queue);
this.clientConsumers.set(clientConsumer);
}

return clientConsumer;
}

private ClientSession session() throws ActiveMQException {
var session = sessions.get();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
Expand All @@ -25,24 +27,24 @@ class MessagingConfig {
@DependsOn("flywayInitializer")
public MessagingClient messagingClient(
ClientSessionFactory clientSessionFactory,
Executor consumerThreadPool,
Executor producerThreadPool
Scheduler consumerThreadPool,
Scheduler producerThreadPool
) {
return new MessagingClient(clientSessionFactory, consumerThreadPool, producerThreadPool);
}

@Bean(destroyMethod = "stop")
public EmbeddedActiveMQ activeMqServer() throws Exception {

org.apache.activemq.artemis.core.config.Configuration config = new ConfigurationImpl();
org.apache.activemq.artemis.core.config.Configuration config = new ConfigurationImpl();

CoreAddressConfiguration addr = new CoreAddressConfiguration();
addr
.setName("binpastes")
.addQueueConfig(new QueueConfiguration()
.setName(new SimpleString("pasteTrackingQueue"))
.setAddress(new SimpleString("binpastes"))
.setDelayBeforeDispatch(3000L)
.setMaxConsumers(1)
.setDelayBeforeDispatch(SECONDS.toMillis(3))
.setDurable(true))
.addRoutingType(RoutingType.ANYCAST);

Expand All @@ -54,12 +56,13 @@ public EmbeddedActiveMQ activeMqServer() throws Exception {
config.addAddressConfiguration(addr);
config.addAddressSetting("binpastes", addressSettings);

config.setMessageExpiryScanPeriod(30000);
config.setMessageExpiryScanPeriod(SECONDS.toMillis(3));

config.setName("binpastesMQ");
config.addAcceptorConfiguration("in-vm", "vm://0");


config.setGracefulShutdownEnabled(true);
config.setGracefulShutdownTimeout(SECONDS.toMillis(1));
config.setJournalPoolFiles(3);
config.setSecurityEnabled(false);
config.setJMXManagementEnabled(true);
Expand Down Expand Up @@ -103,24 +106,26 @@ ConnectionFactory activeMqConnectionFactory() throws Exception {
return activeMQConnectionFactory;
}*/

@Bean(destroyMethod = "shutdown")
public ThreadPoolTaskExecutor consumerThreadPool() {
@Bean(initMethod = "init", destroyMethod = "dispose")
public Scheduler consumerThreadPool() {
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
pool.setCorePoolSize(1);
pool.setMaxPoolSize(1);
pool.setAllowCoreThreadTimeOut(true);
pool.setThreadNamePrefix("artemis-consumer-");
return pool;
pool.setThreadNamePrefix("activemq-consumer-");
pool.initialize();
return Schedulers.fromExecutor(pool);
}

@Bean(destroyMethod = "shutdown")
public ThreadPoolTaskExecutor producerThreadPool() {
@Bean(initMethod = "init", destroyMethod = "dispose")
public Scheduler producerThreadPool() {
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
pool.setCorePoolSize(Runtime.getRuntime().availableProcessors());
pool.setMaxPoolSize(Runtime.getRuntime().availableProcessors());
pool.setAllowCoreThreadTimeOut(true);
pool.setThreadNamePrefix("artemis-producer-");
return pool;
pool.setThreadNamePrefix("activemq-producer-");
pool.initialize();
return Schedulers.fromExecutor(pool);
}

@Bean(destroyMethod = "close")
Expand Down

0 comments on commit cdf0539

Please sign in to comment.