diff --git a/in.bytehue.messaging.mqtt5.provider/src/main/java/in/bytehue/messaging/mqtt5/provider/MessageReplyToWhiteboardProvider.java b/in.bytehue.messaging.mqtt5.provider/src/main/java/in/bytehue/messaging/mqtt5/provider/MessageReplyToWhiteboardProvider.java index 110315c..85fec2e 100644 --- a/in.bytehue.messaging.mqtt5.provider/src/main/java/in/bytehue/messaging/mqtt5/provider/MessageReplyToWhiteboardProvider.java +++ b/in.bytehue.messaging.mqtt5.provider/src/main/java/in/bytehue/messaging/mqtt5/provider/MessageReplyToWhiteboardProvider.java @@ -37,6 +37,11 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import org.osgi.framework.BundleContext; @@ -63,12 +68,15 @@ import in.bytehue.messaging.mqtt5.provider.helper.FilterParser; import in.bytehue.messaging.mqtt5.provider.helper.FilterParser.Expression; import in.bytehue.messaging.mqtt5.provider.helper.SubscriptionAck; +import in.bytehue.messaging.mqtt5.provider.helper.ThreadFactoryBuilder; @Component(configurationPid = PID) @MessagingFeature(name = MESSAGING_ID, protocol = MESSAGING_PROTOCOL) public final class MessageReplyToWhiteboardProvider { public static final String PID = "in.bytehue.messaging.whiteboard"; + public static final String THREAD_NAME_PREFIX = "reply-to-handler"; + public static final String THREAD_NAME_SUFFIX = "-%d"; @interface Config { boolean storeReplyToChannelInfoIfReceivedInMessage() default true; @@ -93,6 +101,7 @@ public final class MessageReplyToWhiteboardProvider { private ComponentServiceObjects mcbFactory; private Config config; + private ExecutorService executorService; private final List subscriptions = new CopyOnWriteArrayList<>(); private ServiceTracker tracker1; @@ -102,6 +111,21 @@ public final class MessageReplyToWhiteboardProvider { @Activate void activate(final Config config, final BundleContext context) { this.config = config; + final ThreadFactory threadFactory = + new ThreadFactoryBuilder() + .setThreadFactoryName(THREAD_NAME_PREFIX) + .setThreadNameFormat(THREAD_NAME_SUFFIX) + .setDaemon(true) + .build(); + executorService = new ThreadPoolExecutor( + 0, // core pool size (0 for cached behavior) + 3, // maximum pool size + 60L, // thread idle time before termination + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + threadFactory + ); + subscriptions.stream().filter(sub -> !sub.isProcessed()).forEach(sub -> { switch (sub.type) { case REPLY_TO_SUB: @@ -125,7 +149,7 @@ public synchronized ReplyToSingleSubscriptionHandler addingService( final ReplyToSubDTO sub = new ReplyToSubDTO(handler, REPLY_TO_SINGLE_SUB, reference); subscriptions.add(sub); - new Thread(() -> processReplyToSingleSubscriptionHandler(sub)).start(); + executorService.submit(() -> processReplyToSingleSubscriptionHandler(sub)); return handler; } @@ -152,7 +176,7 @@ public synchronized ReplyToSubscriptionHandler addingService( final ReplyToSubDTO sub = new ReplyToSubDTO(handler, REPLY_TO_SUB, reference); subscriptions.add(sub); - new Thread(() -> processReplyToSubscriptionHandler(sub)).start(); + executorService.submit(() -> processReplyToSubscriptionHandler(sub)); return handler; } @@ -179,7 +203,7 @@ public synchronized ReplyToManySubscriptionHandler addingService( final ReplyToSubDTO sub = new ReplyToSubDTO(handler, REPLY_TO_MANY_SUB, reference); subscriptions.add(sub); - new Thread(() -> processReplyToManySubscriptionHandler(sub)).start(); + executorService.submit(() -> processReplyToManySubscriptionHandler(sub)); return handler; } @@ -210,6 +234,8 @@ void deactivate() { tracker1.close(); tracker2.close(); tracker3.close(); + + executorService.shutdownNow(); } @Modified