Skip to content

Commit

Permalink
Extracted to constants
Browse files Browse the repository at this point in the history
  • Loading branch information
amitjoy committed Nov 14, 2024
1 parent 8233162 commit 638feea
Showing 1 changed file with 11 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static in.bytehue.messaging.mqtt5.provider.MessageReplyToWhiteboardProvider.ReplyToSubDTO.Type.REPLY_TO_SUB;
import static in.bytehue.messaging.mqtt5.provider.helper.MessageHelper.adaptTo;
import static in.bytehue.messaging.mqtt5.provider.helper.MessageHelper.prepareExceptionAsMessage;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.osgi.service.messaging.Features.REPLY_TO;
import static org.osgi.service.messaging.MessageConstants.MESSAGING_FEATURE_PROPERTY;
import static org.osgi.service.messaging.MessageConstants.MESSAGING_NAME_PROPERTY;
Expand All @@ -41,7 +42,6 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import org.osgi.framework.BundleContext;
Expand Down Expand Up @@ -79,6 +79,9 @@ public final class MessageReplyToWhiteboardProvider {
private static final String THREAD_NAME_PREFIX = "reply-to-handler";
private static final String THREAD_NAME_SUFFIX = "-%d";

private static final int CORE_POOL_SIZE = 0; // 0 for cached behavior
private static final int MAX_POOL_SIZE = 3;
private static final long IDLE_TIME = 60L;

@interface Config {
boolean storeReplyToChannelInfoIfReceivedInMessage() default true;
Expand Down Expand Up @@ -113,21 +116,22 @@ public final class MessageReplyToWhiteboardProvider {
@Activate
void activate(final Config config, final BundleContext context) {
this.config = config;
// @formatter:off
final ThreadFactory threadFactory =
new ThreadFactoryBuilder()
.setThreadFactoryName(THREAD_NAME_PREFIX)
.setThreadNameFormat(THREAD_NAME_SUFFIX)
.setDaemon(true)
.build();
executorService = new ThreadPoolExecutor(
0, // core pool size (0 for cached behavior)
3, // maximum pool size
60L, // thread idle time before termination
TimeUnit.SECONDS,
CORE_POOL_SIZE,
MAX_POOL_SIZE,
IDLE_TIME,
SECONDS,
new LinkedBlockingQueue<>(),
threadFactory
);

// @formatter:on
subscriptions.stream().filter(sub -> !sub.isProcessed()).forEach(sub -> {
switch (sub.type) {
case REPLY_TO_SUB:
Expand Down Expand Up @@ -236,7 +240,7 @@ void deactivate() {
tracker1.close();
tracker2.close();
tracker3.close();

executorService.shutdownNow();
}

Expand Down

0 comments on commit 638feea

Please sign in to comment.