From 077c69e709f77ce6f89d155407b13e6ea3310d1e Mon Sep 17 00:00:00 2001 From: "max.wang" Date: Mon, 27 Nov 2023 14:52:02 +0800 Subject: [PATCH] add support to publish message to specific clients --- .../java/io/moquette/broker/PostOffice.java | 7 +++++-- .../main/java/io/moquette/broker/Server.java | 19 +++++++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/broker/src/main/java/io/moquette/broker/PostOffice.java b/broker/src/main/java/io/moquette/broker/PostOffice.java index 86f30ea83..175534f21 100644 --- a/broker/src/main/java/io/moquette/broker/PostOffice.java +++ b/broker/src/main/java/io/moquette/broker/PostOffice.java @@ -187,7 +187,7 @@ public RouteResult ifFailed(Runnable action) { private static final Logger LOG = LoggerFactory.getLogger(PostOffice.class); - private static final Set NO_FILTER = new HashSet<>(); + private static final Set NO_FILTER = Collections.unmodifiableSet(new HashSet<>()); private final Authorizator authorizator; private final ISubscriptionsDirectory subscriptions; @@ -701,12 +701,15 @@ static MqttQoS lowerQosToTheSubscriptionDesired(Subscription sub, MqttQoS qos) { * the result of the enqueuing operation to session loops. */ public RoutingResults internalPublish(MqttPublishMessage msg) { + return internalPublish(msg,NO_FILTER); + } + public RoutingResults internalPublish(MqttPublishMessage msg,Set clientIds) { final MqttQoS qos = msg.fixedHeader().qosLevel(); final Topic topic = new Topic(msg.variableHeader().topicName()); final ByteBuf payload = msg.payload(); LOG.info("Sending internal PUBLISH message Topic={}, qos={}", topic, qos); - final RoutingResults publishResult = publish2Subscribers(payload, topic, qos); + final RoutingResults publishResult = publish2Subscribers(payload, topic, qos,clientIds); LOG.trace("after routed publishes: {}", publishResult); if (!msg.fixedHeader().isRetain()) { diff --git a/broker/src/main/java/io/moquette/broker/Server.java b/broker/src/main/java/io/moquette/broker/Server.java index 82d1a3b77..c3d18b84e 100644 --- a/broker/src/main/java/io/moquette/broker/Server.java +++ b/broker/src/main/java/io/moquette/broker/Server.java @@ -63,6 +63,7 @@ import java.util.List; import java.util.Properties; import java.util.UUID; +import java.util.HashSet; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -567,6 +568,24 @@ public RoutingResults internalPublish(MqttPublishMessage msg, final String clien msg.payload().release(); return routingResults; } + public RoutingResults publishToClient(MqttPublishMessage msg, final String clientId) { + return publishToClients(msg,Collections.singletonList(clientId)); + } + public RoutingResults publishToClients(MqttPublishMessage msg, final Collection clientIds) { + final int messageID = msg.variableHeader().packetId(); + if (!initialized) { + LOG.error("Moquette is not started, message cannot be published. CId: [{}], messageId: {}", + String.join(",",clientIds), + messageID); + throw new IllegalStateException("Can't publish on a integration is not yet started"); + } + LOG.trace("publishing to message CId: [{}], messageId: {}", String.join(",",clientIds), messageID); + final RoutingResults routingResults = clientIds.isEmpty()? + dispatcher.internalPublish(msg): + dispatcher.internalPublish(msg,new HashSet<>(clientIds)); + msg.payload().release(); + return routingResults; + } public void stopServer() { LOG.info("Unbinding integration from the configured ports");