Skip to content

Commit

Permalink
Implemented server keep alive setting and usage in connect (#883)
Browse files Browse the repository at this point in the history
Updates the configuration harness to expose a duration setting name server_keep_alive.
Updated the connAck properties creation to include server keep alive if the broker configuration has selected it.
Updates the connect processing to leverage the server keep alive selected keep alive instead of the client one.
  • Loading branch information
andsel authored Dec 18, 2024
1 parent ca13e48 commit c9b4740
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 0 deletions.
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Version 0.18-SNAPSHOT:
[feature] Server keep alive: added configuration setting so that the broker can specify a keep alive other then the one selected by clients. (#789)
[feature] User properties: covered publish with user properties with tests and fixed publish of will messages. (#877)
[feature] Topic alias: implemented handling of topic alias received by publishers. (#873)
[feature] Flow-control: Handled client's receive maximum property to configure the inflight window through the client. (#858)
Expand Down
10 changes: 10 additions & 0 deletions broker/src/main/java/io/moquette/broker/BrokerConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.moquette.broker.config.IConfig;

import java.util.Locale;
import java.util.Optional;

class BrokerConfiguration {

Expand All @@ -30,6 +31,7 @@ class BrokerConfiguration {
private final int topicAliasMaximum;
// integer max value means that the property is unset
private int receiveMaximum;
private Optional<Integer> serverKeepAlive = Optional.empty();

BrokerConfiguration(IConfig props) {
allowAnonymous = props.boolProp(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, true);
Expand Down Expand Up @@ -70,6 +72,10 @@ class BrokerConfiguration {
receiveMaximum = props.intProp(IConfig.RECEIVE_MAXIMUM, BrokerConstants.RECEIVE_MAXIMUM);

topicAliasMaximum = props.intProp(IConfig.TOPIC_ALIAS_MAXIMUM_PROPERTY_NAME, BrokerConstants.DISABLED_TOPIC_ALIAS);

if (props.getProperty(IConfig.SERVER_KEEP_ALIVE_PROPERTY_NAME) != null) {
serverKeepAlive = Optional.of((int) props.durationProp(IConfig.SERVER_KEEP_ALIVE_PROPERTY_NAME).toMillis() / 1_000);
}
}

// test method
Expand Down Expand Up @@ -133,4 +139,8 @@ public int receiveMaximum() {
public int topicAliasMaximum() {
return topicAliasMaximum;
}

public Optional<Integer> getServerKeepAlive() {
return serverKeepAlive;
}
}
11 changes: 11 additions & 0 deletions broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,10 @@ private void executeConnect(MqttConnectMessage msg, String clientId, boolean ser
connAckPropertiesBuilder.topicAliasMaximum(topicAliasMaximum);
}

if (brokerConfig.getServerKeepAlive().isPresent()) {
connAckPropertiesBuilder.serverKeepAlive(brokerConfig.getServerKeepAlive().get());
}

final MqttProperties ackProperties = connAckPropertiesBuilder.build();
connAckBuilder.properties(ackProperties);
}
Expand Down Expand Up @@ -464,6 +468,13 @@ private void setupInflightResender(Channel channel) {

private void initializeKeepAliveTimeout(Channel channel, MqttConnectMessage msg, String clientId) {
int keepAlive = msg.variableHeader().keepAliveTimeSeconds();

// force server keep alive if configured
if (brokerConfig.getServerKeepAlive().isPresent()) {
int serverKeepAlive = brokerConfig.getServerKeepAlive().get();
LOG.info("Forcing server keep alive ({}) over client selection ({})", serverKeepAlive, keepAlive);
keepAlive = serverKeepAlive;
}
NettyUtils.keepAlive(channel, keepAlive);
NettyUtils.cleanSession(channel, msg.variableHeader().isCleanSession());
NettyUtils.clientID(channel, clientId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Locale;
import java.util.Properties;
import java.util.function.Consumer;
Expand All @@ -30,6 +31,7 @@
import static io.moquette.broker.config.IConfig.PERSISTENT_QUEUE_TYPE_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.PORT_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.RECEIVE_MAXIMUM;
import static io.moquette.broker.config.IConfig.SERVER_KEEP_ALIVE_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.SESSION_QUEUE_SIZE;
import static io.moquette.broker.config.IConfig.SSL_PORT_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.SSL_PROVIDER;
Expand Down Expand Up @@ -215,6 +217,12 @@ public FluentConfig topicAliasMaximum(int topicAliasMaximum) {
return this;
}

public FluentConfig serverKeepAlive(Duration keepAliveSeconds) {
int seconds = (int) keepAliveSeconds.toMillis() / 1_000;
configAccumulator.put(SERVER_KEEP_ALIVE_PROPERTY_NAME, seconds + "s");
return this;
}

public class TLSConfig {

private SSLProvider providerType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public abstract class IConfig {
public static final String MAX_SERVER_GRANTED_QOS_PROPERTY_NAME = "max_server_granted_qos";
public static final int DEFAULT_NETTY_MAX_BYTES_IN_MESSAGE = 8092;
public static final String TOPIC_ALIAS_MAXIMUM_PROPERTY_NAME = "topic_alias_maximum";
public static final String SERVER_KEEP_ALIVE_PROPERTY_NAME = "server_keep_alive";

public abstract void setProperty(String name, String value);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5WillPublishBuilder;
import io.moquette.broker.config.FluentConfig;
import io.moquette.broker.config.IConfig;
import io.moquette.testclient.Client;
import io.netty.handler.codec.mqtt.*;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -70,6 +72,35 @@ public void simpleConnect() {
client.disconnect();
}

@Test
public void givenServerKeepAliveConfiguredThenConnectAckMustRespectIt() throws IOException {
stopServer();
IConfig config = new FluentConfig()
.dataPath(dbPath)
.enablePersistence()
.port(1883)
.disableTelemetry()
.persistentQueueType(FluentConfig.PersistentQueueType.SEGMENTED)
.serverKeepAlive(Duration.ofSeconds(12))
.build();
startServer(config);

Mqtt5BlockingClient client = MqttClient.builder()
.useMqttVersion5()
.identifier("simple_connect_test")
.serverHost("localhost")
.serverPort(1883)
.buildBlocking();
final Mqtt5ConnAck connectAck = client.connect();
assertEquals(Mqtt5ConnAckReasonCode.SUCCESS, connectAck.getReasonCode(), "Accept plain connection");
assertTrue(connectAck.getServerKeepAlive().isPresent());
connectAck.getServerKeepAlive().ifPresent(serverKeepAlive -> {
assertEquals(12 ,serverKeepAlive);
});

client.disconnect();
}

@Test
public void sendConnectOnDisconnectedConnection() throws InterruptedException {
MqttConnAckMessage connAck = lowLevelClient.connectV5();
Expand Down
9 changes: 9 additions & 0 deletions distribution/src/main/resources/moquette.conf
Original file line number Diff line number Diff line change
Expand Up @@ -234,3 +234,12 @@ password_file config/password_file.conf
# default: 0 (disabled)
#*********************************************************************
# topic_alias_maximum 16

#*********************************************************************
# Keep alive provided by server, only for MQTT5.
#
# server_keep_alive:
# Option used to configure a server preferred keep alive that the client must respect.
# default: empty (disabled)
#*********************************************************************
# server_keep_alive 2s

0 comments on commit c9b4740

Please sign in to comment.