Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds TCP_NODELAY as a runtime connect option #1417

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ $ ctest -VV

### Cross compilation

Cross compilation using CMake is performed by using so called "toolchain files" (see: http://www.vtk.org/Wiki/CMake_Cross_Compiling).
Cross compilation using CMake is performed by using so called "toolchain files" (see: https://cmake.org/cmake/help/latest/manual/cmake-toolchains.7.html).

The path to the toolchain file can be specified by using CMake's `-DCMAKE_TOOLCHAIN_FILE` option. In case no toolchain file is specified, the build is performed for the native build platform.

Expand Down Expand Up @@ -332,4 +332,3 @@ https://docs.microsoft.com/en-us/cpp/cpp/cdecl?view=vs-2019

If you call this library from another language, you may need to take this into account.


1 change: 1 addition & 0 deletions src/Clients.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ typedef struct
MQTTClient_SSLOptions *sslopts; /**< the SSL/TLS connect options */
SSL_SESSION* session; /**< SSL session pointer for fast handhake */
#endif
int nodelay; /**< TCP_NODELAY socket option */
} Clients;

int clientIDCompare(void* a, void* b);
Expand Down
6 changes: 5 additions & 1 deletion src/MQTTAsync.c
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
goto exit;
}

if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 0 || options->struct_version > 8)
if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 0 || options->struct_version > 9)
{
rc = MQTTASYNC_BAD_STRUCTURE;
goto exit;
Expand Down Expand Up @@ -693,6 +693,10 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
if (options->httpsProxy)
m->c->httpsProxy = MQTTStrdup(options->httpsProxy);
}
if (options->struct_version >= 9)
m->c->nodelay = options->nodelay;
else
m->c->nodelay = 0;

if (m->c->will)
{
Expand Down
29 changes: 19 additions & 10 deletions src/MQTTAsync.h
Original file line number Diff line number Diff line change
Expand Up @@ -1199,15 +1199,18 @@ typedef struct
{
/** The eyecatcher for this structure. must be MQTC. */
char struct_id[4];
/** The version number of this structure. Must be 0, 1, 2, 3 4 5 6, 7 or 8.
/** The version number of this structure. Must be 0, 1, 2, 3 4 5 6, 7,
* 8, or 9.
*
* 0 signifies no SSL options and no serverURIs
* 1 signifies no serverURIs
* 2 signifies no MQTTVersion
* 3 signifies no automatic reconnect options
* 4 signifies no binary password option (just string)
* 5 signifies no MQTTV5 properties
* 6 signifies no HTTP headers option
* 7 signifies no HTTP proxy and HTTPS proxy options
* 7 signifies no HTTP proxy and HTTPS proxy options
* 8 signifies no TCP_NODELAY socket option
*/
int struct_version;
/** The "keep alive" interval, measured in seconds, defines the maximum time
Expand Down Expand Up @@ -1378,27 +1381,33 @@ typedef struct
* HTTPS proxy
*/
const char* httpsProxy;
/**
* Set the TCP_NODELAY option on the client socket. This could resuce
* the latency on small messages as the cost of increased network
* traffic.
*/
int nodelay;
} MQTTAsync_connectOptions;

/** Initializer for connect options for MQTT 3.1.1 non-WebSocket connections */
#define MQTTAsync_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 8, 60, 1, 65535, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_DEFAULT, 0, 1, 60, {0, NULL}, 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL}
#define MQTTAsync_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 9, 60, 1, 65535, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_DEFAULT, 0, 1, 60, {0, NULL}, 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 0}

/** Initializer for connect options for MQTT 5.0 non-WebSocket connections */
#define MQTTAsync_connectOptions_initializer5 { {'M', 'Q', 'T', 'C'}, 8, 60, 0, 65535, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_5, 0, 1, 60, {0, NULL}, 1, NULL, NULL, NULL, NULL, NULL, NULL, NULL}
#define MQTTAsync_connectOptions_initializer5 { {'M', 'Q', 'T', 'C'}, 9, 60, 0, 65535, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_5, 0, 1, 60, {0, NULL}, 1, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 0}

/** Initializer for connect options for MQTT 3.1.1 WebSockets connections.
* The keepalive interval is set to 45 seconds to avoid webserver 60 second inactivity timeouts.
*/
#define MQTTAsync_connectOptions_initializer_ws { {'M', 'Q', 'T', 'C'}, 8, 45, 1, 65535, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_DEFAULT, 0, 1, 60, {0, NULL}, 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL}
#define MQTTAsync_connectOptions_initializer_ws { {'M', 'Q', 'T', 'C'}, 9, 45, 1, 65535, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_DEFAULT, 0, 1, 60, {0, NULL}, 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 0}

/** Initializer for connect options for MQTT 5.0 WebSockets connections.
* The keepalive interval is set to 45 seconds to avoid webserver 60 second inactivity timeouts.
*/
#define MQTTAsync_connectOptions_initializer5_ws { {'M', 'Q', 'T', 'C'}, 8, 45, 0, 65535, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_5, 0, 1, 60, {0, NULL}, 1, NULL, NULL, NULL, NULL, NULL, NULL, NULL}
#define MQTTAsync_connectOptions_initializer5_ws { {'M', 'Q', 'T', 'C'}, 9, 45, 0, 65535, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_5, 0, 1, 60, {0, NULL}, 1, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 0}


/**
Expand Down
5 changes: 4 additions & 1 deletion src/MQTTClient.c
Original file line number Diff line number Diff line change
Expand Up @@ -1206,7 +1206,10 @@ static MQTTResponse MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_c
int sessionPresent = 0;
MQTTResponse resp = MQTTResponse_initializer;


FUNC_ENTRY;
m->c->nodelay =options->nodelay;

resp.reasonCode = SOCKET_ERROR;
if (m->ma && !running)
{
Expand Down Expand Up @@ -1759,7 +1762,7 @@ MQTTResponse MQTTClient_connectAll(MQTTClient handle, MQTTClient_connectOptions*
goto exit;
}

if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 0 || options->struct_version > 8)
if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 0 || options->struct_version > 9)
{
rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
goto exit;
Expand Down
27 changes: 18 additions & 9 deletions src/MQTTClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,9 @@ typedef struct
{
/** The eyecatcher for this structure. must be MQTC. */
char struct_id[4];
/** The version number of this structure. Must be 0, 1, 2, 3, 4, 5, 6, 7 or 8.
/** The version number of this structure. Must be 0, 1, 2, 3, 4, 5, 6,
* 7, 8, or 9.
*
* 0 signifies no SSL options and no serverURIs
* 1 signifies no serverURIs
* 2 signifies no MQTTVersion
Expand All @@ -834,6 +836,7 @@ typedef struct
* 5 signifies no maxInflightMessages and cleanstart
* 6 signifies no HTTP headers option
* 7 signifies no HTTP proxy and HTTPS proxy options
* 8 signifies no NO_DELAY
*/
int struct_version;
/** The "keep alive" interval, measured in seconds, defines the maximum time
Expand Down Expand Up @@ -976,27 +979,33 @@ typedef struct
* HTTPS proxy
*/
const char* httpsProxy;
/**
* Set the TCP_NODELAY option on the client socket. This could resuce
* the latency on small messages as the cost of increased network
* traffic.
*/
int nodelay;
} MQTTClient_connectOptions;

/** Initializer for connect options for MQTT 3.1.1 non-WebSocket connections */
#define MQTTClient_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 8, 60, 1, 1, NULL, NULL, NULL, 30, 0, NULL,\
0, NULL, MQTTVERSION_DEFAULT, {NULL, 0, 0}, {0, NULL}, -1, 0, NULL, NULL, NULL}
#define MQTTClient_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 9, 60, 1, 1, NULL, NULL, NULL, 30, 0, NULL,\
0, NULL, MQTTVERSION_DEFAULT, {NULL, 0, 0}, {0, NULL}, -1, 0, NULL, NULL, NULL, 0}

/** Initializer for connect options for MQTT 5.0 non-WebSocket connections */
#define MQTTClient_connectOptions_initializer5 { {'M', 'Q', 'T', 'C'}, 8, 60, 0, 1, NULL, NULL, NULL, 30, 0, NULL,\
0, NULL, MQTTVERSION_5, {NULL, 0, 0}, {0, NULL}, -1, 1, NULL, NULL, NULL}
#define MQTTClient_connectOptions_initializer5 { {'M', 'Q', 'T', 'C'}, 9, 60, 0, 1, NULL, NULL, NULL, 30, 0, NULL,\
0, NULL, MQTTVERSION_5, {NULL, 0, 0}, {0, NULL}, -1, 1, NULL, NULL, NULL, 0}

/** Initializer for connect options for MQTT 3.1.1 WebSockets connections.
* The keepalive interval is set to 45 seconds to avoid webserver 60 second inactivity timeouts.
*/
#define MQTTClient_connectOptions_initializer_ws { {'M', 'Q', 'T', 'C'}, 8, 45, 1, 1, NULL, NULL, NULL, 30, 0, NULL,\
0, NULL, MQTTVERSION_DEFAULT, {NULL, 0, 0}, {0, NULL}, -1, 0, NULL, NULL, NULL}
#define MQTTClient_connectOptions_initializer_ws { {'M', 'Q', 'T', 'C'}, 9, 45, 1, 1, NULL, NULL, NULL, 30, 0, NULL,\
0, NULL, MQTTVERSION_DEFAULT, {NULL, 0, 0}, {0, NULL}, -1, 0, NULL, NULL, NULL, 0}

/** Initializer for connect options for MQTT 5.0 WebSockets connections.
* The keepalive interval is set to 45 seconds to avoid webserver 60 second inactivity timeouts.
*/
#define MQTTClient_connectOptions_initializer5_ws { {'M', 'Q', 'T', 'C'}, 8, 45, 0, 1, NULL, NULL, NULL, 30, 0, NULL,\
0, NULL, MQTTVERSION_5, {NULL, 0, 0}, {0, NULL}, -1, 1, NULL, NULL, NULL}
#define MQTTClient_connectOptions_initializer5_ws { {'M', 'Q', 'T', 'C'}, 9, 45, 0, 1, NULL, NULL, NULL, 30, 0, NULL,\
0, NULL, MQTTVERSION_5, {NULL, 0, 0}, {0, NULL}, -1, 1, NULL, NULL, NULL, 0}

/**
* This function attempts to connect a previously-created client (see
Expand Down
12 changes: 6 additions & 6 deletions src/MQTTProtocolOut.c
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,9 @@ int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int websocket
if (timeout < 0)
rc = -1;
else
rc = Socket_new(aClient->net.http_proxy, addr_len, port, &(aClient->net.socket), timeout);
rc = Socket_new(aClient->net.http_proxy, addr_len, port, aClient->nodelay, &(aClient->net.socket), timeout);
#else
rc = Socket_new(aClient->net.http_proxy, addr_len, port, &(aClient->net.socket));
rc = Socket_new(aClient->net.http_proxy, addr_len, port, aClient->nodelay, &(aClient->net.socket));
#endif
}
#if defined(OPENSSL)
Expand All @@ -277,9 +277,9 @@ int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int websocket
if (timeout < 0)
rc = -1;
else
rc = Socket_new(aClient->net.https_proxy, addr_len, port, &(aClient->net.socket), timeout);
rc = Socket_new(aClient->net.https_proxy, addr_len, port, aClient->nodelay, &(aClient->net.socket), timeout);
#else
rc = Socket_new(aClient->net.https_proxy, addr_len, port, &(aClient->net.socket));
rc = Socket_new(aClient->net.https_proxy, addr_len, port, aClient->nodelay, &(aClient->net.socket));
#endif
}
#endif
Expand All @@ -295,9 +295,9 @@ int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int websocket
if (timeout < 0)
rc = -1;
else
rc = Socket_new(ip_address, addr_len, port, &(aClient->net.socket), timeout);
rc = Socket_new(ip_address, addr_len, port, aClient->nodelay, &(aClient->net.socket), timeout);
#else
rc = Socket_new(ip_address, addr_len, port, &(aClient->net.socket));
rc = Socket_new(ip_address, addr_len, port, aClient->nodelay, &(aClient->net.socket));
#endif
}
if (rc == EINPROGRESS || rc == EWOULDBLOCK)
Expand Down
18 changes: 16 additions & 2 deletions src/Socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@

#include "Heap.h"

#if defined(_WIN32) || defined(_WIN64)
#define SOL_TCP IPPROTO_TCP
#elif !defined(SOL_TCP) && defined(IPPROTO_TCP)
#define SOL_TCP IPPROTO_TCP
#endif


#if defined(USE_SELECT)
int isReady(int socket, fd_set* read_set, fd_set* write_set);
int Socket_continueWrites(fd_set* pwset, SOCKET* socket, mutex_type mutex);
Expand Down Expand Up @@ -1049,9 +1056,9 @@ int Socket_close(SOCKET socket)
* @return completion code 0=good, SOCKET_ERROR=fail
*/
#if defined(__GNUC__) && defined(__linux__)
int Socket_new(const char* addr, size_t addr_len, int port, SOCKET* sock, long timeout)
int Socket_new(const char* addr, size_t addr_len, int port, int nodelay, SOCKET* sock, long timeout)
#else
int Socket_new(const char* addr, size_t addr_len, int port, SOCKET* sock)
int Socket_new(const char* addr, size_t addr_len, int port, int nodelay, SOCKET* sock)
#endif
{
int type = SOCK_STREAM;
Expand Down Expand Up @@ -1181,6 +1188,13 @@ int Socket_new(const char* addr, size_t addr_len, int port, SOCKET* sock)
Log(LOG_ERROR, -1, "Could not set SO_SNDBUF for socket %d", *sock);
}
#endif
if (nodelay)
{
int opt = 1;
if (setsockopt(*sock, SOL_TCP, TCP_NODELAY, &opt, sizeof(opt)) != 0)
Log(LOG_ERROR, -1, "Could not set TCP_NODELAY for socket %d", *sock);

}
Log(TRACE_MIN, -1, "New socket %d for %s, port %d", *sock, addr, port);
if (Socket_addSocket(*sock) == SOCKET_ERROR)
rc = Socket_error("addSocket", *sock);
Expand Down
4 changes: 2 additions & 2 deletions src/Socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,9 @@ int Socket_putdatas(SOCKET socket, char* buf0, size_t buf0len, PacketBuffers buf
int Socket_close(SOCKET socket);
#if defined(__GNUC__) && defined(__linux__)
/* able to use GNU's getaddrinfo_a to make timeouts possible */
int Socket_new(const char* addr, size_t addr_len, int port, SOCKET* socket, long timeout);
int Socket_new(const char* addr, size_t addr_len, int port, int nodelay, SOCKET* socket, long timeout);
#else
int Socket_new(const char* addr, size_t addr_len, int port, SOCKET* socket);
int Socket_new(const char* addr, size_t addr_len, int port, int nodelay, SOCKET* socket);
#endif

int Socket_noPendingWrites(SOCKET socket);
Expand Down
77 changes: 76 additions & 1 deletion test/test1.c
Original file line number Diff line number Diff line change
Expand Up @@ -1168,10 +1168,85 @@ int test6a(struct Options options)
return failures;
}

/*********************************************************************

Test7: Socket options

*********************************************************************/

int test7(struct Options options)
{
MQTTClient c;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
MQTTClient_willOptions wopts = MQTTClient_willOptions_initializer;
int rc = 0;
char* test_topic = "C client test7";

fprintf(xml, "<testcase classname=\"test1\" name=\"socket options\"");
global_start_time = start_clock();
failures = 0;
MyLog(LOGA_INFO, "Starting test 7 - socket options");

rc = MQTTClient_create(&c, options.connection, "sockopt_test",
MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTCLIENT_SUCCESS)
{
MQTTClient_destroy(&c);
goto exit;
}

opts.keepAliveInterval = 20;
opts.cleansession = 1;
opts.username = "testuser";
opts.password = "testpassword";
opts.MQTTVersion = options.MQTTVersion;
if (options.haconnections != NULL)
{
opts.serverURIs = options.haconnections;
opts.serverURIcount = options.hacount;
}

opts.will = &wopts;
opts.will->message = "will message";
opts.will->qos = 1;
opts.will->retained = 0;
opts.will->topicName = "will topic";
opts.will = NULL;

opts.nodelay = 1;

/* Test with the socket option(s) */
MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTClient_connect(c, &opts);
assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
if (rc != MQTTCLIENT_SUCCESS)
goto exit;

rc = MQTTClient_disconnect(c, 0);
assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);

/* Try to reconnect */
rc = MQTTClient_connect(c, &opts);
assert("Connect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);

rc = MQTTClient_disconnect(c, 0);
assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);

MQTTClient_destroy(&c);

exit:
MyLog(LOGA_INFO, "TEST1: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", tests, failures);
write_test_result();
return failures;
}


int main(int argc, char** argv)
{
int rc = 0;
int (*tests[])() = {NULL, test1, test2, test3, test4, test5, test6, test6a};
int (*tests[])() = {NULL, test1, test2, test3, test4, test5, test6, test6a, test7};
int i;

xml = fopen("TEST-test1.xml", "w");
Expand Down
Loading
Loading