Skip to content

Commit

Permalink
Implemented server keep alive setting and usage in connect
Browse files Browse the repository at this point in the history
  • Loading branch information
andsel committed Dec 18, 2024
1 parent ca13e48 commit 8fc6c5d
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 0 deletions.
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Version 0.18-SNAPSHOT:
[feature] Server keep alive: added configuration setting so that the broker can specify a keep alive other then the one selected by clients. (#789)
[feature] User properties: covered publish with user properties with tests and fixed publish of will messages. (#877)
[feature] Topic alias: implemented handling of topic alias received by publishers. (#873)
[feature] Flow-control: Handled client's receive maximum property to configure the inflight window through the client. (#858)
Expand Down
10 changes: 10 additions & 0 deletions broker/src/main/java/io/moquette/broker/BrokerConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.moquette.broker.config.IConfig;

import java.util.Locale;
import java.util.Optional;

class BrokerConfiguration {

Expand All @@ -30,6 +31,7 @@ class BrokerConfiguration {
private final int topicAliasMaximum;
// integer max value means that the property is unset
private int receiveMaximum;
private Optional<Integer> serverKeepAlive = Optional.empty();

BrokerConfiguration(IConfig props) {
allowAnonymous = props.boolProp(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, true);
Expand Down Expand Up @@ -70,6 +72,10 @@ class BrokerConfiguration {
receiveMaximum = props.intProp(IConfig.RECEIVE_MAXIMUM, BrokerConstants.RECEIVE_MAXIMUM);

topicAliasMaximum = props.intProp(IConfig.TOPIC_ALIAS_MAXIMUM_PROPERTY_NAME, BrokerConstants.DISABLED_TOPIC_ALIAS);

if (props.getProperty(IConfig.SERVER_KEEP_ALIVE_PROPERTY_NAME) != null) {
serverKeepAlive = Optional.of((int) props.durationProp(IConfig.SERVER_KEEP_ALIVE_PROPERTY_NAME).toMillis() / 1_000);
}
}

// test method
Expand Down Expand Up @@ -133,4 +139,8 @@ public int receiveMaximum() {
public int topicAliasMaximum() {
return topicAliasMaximum;
}

public Optional<Integer> getServerKeepAlive() {
return serverKeepAlive;
}
}
11 changes: 11 additions & 0 deletions broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,10 @@ private void executeConnect(MqttConnectMessage msg, String clientId, boolean ser
connAckPropertiesBuilder.topicAliasMaximum(topicAliasMaximum);
}

if (brokerConfig.getServerKeepAlive().isPresent()) {
connAckPropertiesBuilder.serverKeepAlive(brokerConfig.getServerKeepAlive().get());
}

final MqttProperties ackProperties = connAckPropertiesBuilder.build();
connAckBuilder.properties(ackProperties);
}
Expand Down Expand Up @@ -464,6 +468,13 @@ private void setupInflightResender(Channel channel) {

private void initializeKeepAliveTimeout(Channel channel, MqttConnectMessage msg, String clientId) {
int keepAlive = msg.variableHeader().keepAliveTimeSeconds();

// force server keep alive if configured
if (brokerConfig.getServerKeepAlive().isPresent()) {
int serverKeepAlive = brokerConfig.getServerKeepAlive().get();
LOG.info("Forcing server keep alive ({}) over client selection ({})", serverKeepAlive, keepAlive);
keepAlive = serverKeepAlive;
}
NettyUtils.keepAlive(channel, keepAlive);
NettyUtils.cleanSession(channel, msg.variableHeader().isCleanSession());
NettyUtils.clientID(channel, clientId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Locale;
import java.util.Properties;
import java.util.function.Consumer;
Expand All @@ -30,6 +31,7 @@
import static io.moquette.broker.config.IConfig.PERSISTENT_QUEUE_TYPE_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.PORT_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.RECEIVE_MAXIMUM;
import static io.moquette.broker.config.IConfig.SERVER_KEEP_ALIVE_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.SESSION_QUEUE_SIZE;
import static io.moquette.broker.config.IConfig.SSL_PORT_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.SSL_PROVIDER;
Expand Down Expand Up @@ -215,6 +217,12 @@ public FluentConfig topicAliasMaximum(int topicAliasMaximum) {
return this;
}

public FluentConfig serverKeepAlive(Duration keepAliveSeconds) {
int seconds = (int) keepAliveSeconds.toMillis() / 1_000;
configAccumulator.put(SERVER_KEEP_ALIVE_PROPERTY_NAME, seconds + "s");
return this;
}

public class TLSConfig {

private SSLProvider providerType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public abstract class IConfig {
public static final String MAX_SERVER_GRANTED_QOS_PROPERTY_NAME = "max_server_granted_qos";
public static final int DEFAULT_NETTY_MAX_BYTES_IN_MESSAGE = 8092;
public static final String TOPIC_ALIAS_MAXIMUM_PROPERTY_NAME = "topic_alias_maximum";
public static final String SERVER_KEEP_ALIVE_PROPERTY_NAME = "server_keep_alive";

public abstract void setProperty(String name, String value);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5WillPublishBuilder;
import io.moquette.broker.config.FluentConfig;
import io.moquette.broker.config.IConfig;
import io.moquette.testclient.Client;
import io.netty.handler.codec.mqtt.*;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -70,6 +72,35 @@ public void simpleConnect() {
client.disconnect();
}

@Test
public void givenServerKeepAliveConfiguredThenConnectAckMustRespectIt() throws IOException {
stopServer();
IConfig config = new FluentConfig()
.dataPath(dbPath)
.enablePersistence()
.port(1883)
.disableTelemetry()
.persistentQueueType(FluentConfig.PersistentQueueType.SEGMENTED)
.serverKeepAlive(Duration.ofSeconds(12))
.build();
startServer(config);

Mqtt5BlockingClient client = MqttClient.builder()
.useMqttVersion5()
.identifier("simple_connect_test")
.serverHost("localhost")
.serverPort(1883)
.buildBlocking();
final Mqtt5ConnAck connectAck = client.connect();
assertEquals(Mqtt5ConnAckReasonCode.SUCCESS, connectAck.getReasonCode(), "Accept plain connection");
assertTrue(connectAck.getServerKeepAlive().isPresent());
connectAck.getServerKeepAlive().ifPresent(serverKeepAlive -> {
assertEquals(12 ,serverKeepAlive);
});

client.disconnect();
}

@Test
public void sendConnectOnDisconnectedConnection() throws InterruptedException {
MqttConnAckMessage connAck = lowLevelClient.connectV5();
Expand Down
9 changes: 9 additions & 0 deletions distribution/src/main/resources/moquette.conf
Original file line number Diff line number Diff line change
Expand Up @@ -234,3 +234,12 @@ password_file config/password_file.conf
# default: 0 (disabled)
#*********************************************************************
# topic_alias_maximum 16

#*********************************************************************
# Keep alive provided by server, only for MQTT5.
#
# server_keep_alive:
# Option used to configure a server preferred keep alive that the client must respect.
# default: empty (disabled)
#*********************************************************************
# server_keep_alive 2s

0 comments on commit 8fc6c5d

Please sign in to comment.