From c520cc6cc69d183b01c2a0e3acd40d1feabc1de3 Mon Sep 17 00:00:00 2001 From: Artan Sadiku Date: Tue, 24 Oct 2023 21:05:07 +0200 Subject: [PATCH 1/2] Update CMake toolchain link in README file - Old link was pointing to a resource that was moved. New link points to the manual page of the CMake software Signed-off-by: Artan Sadiku --- README.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/README.md b/README.md index 2950379e0..5fea0d6eb 100644 --- a/README.md +++ b/README.md @@ -220,7 +220,7 @@ $ ctest -VV ### Cross compilation -Cross compilation using CMake is performed by using so called "toolchain files" (see: http://www.vtk.org/Wiki/CMake_Cross_Compiling). +Cross compilation using CMake is performed by using so called "toolchain files" (see: https://cmake.org/cmake/help/latest/manual/cmake-toolchains.7.html). The path to the toolchain file can be specified by using CMake's `-DCMAKE_TOOLCHAIN_FILE` option. In case no toolchain file is specified, the build is performed for the native build platform. @@ -332,4 +332,3 @@ https://docs.microsoft.com/en-us/cpp/cpp/cdecl?view=vs-2019 If you call this library from another language, you may need to take this into account. - From a1f8a7e0370a2ab3308cd94caf84d9fe07bc2cf1 Mon Sep 17 00:00:00 2001 From: fpagliughi Date: Sun, 5 Nov 2023 07:58:21 -0500 Subject: [PATCH 2/2] #530 Added TCP_NODELAY option. --- src/Clients.h | 1 + src/MQTTAsync.c | 6 ++- src/MQTTAsync.h | 29 ++++++++---- src/MQTTClient.c | 5 +- src/MQTTClient.h | 27 +++++++---- src/MQTTProtocolOut.c | 12 ++--- src/Socket.c | 18 ++++++- src/Socket.h | 4 +- test/test1.c | 77 +++++++++++++++++++++++++++++- test/test4.c | 108 +++++++++++++++++++++++++++++++++++++++++- 10 files changed, 254 insertions(+), 33 deletions(-) diff --git a/src/Clients.h b/src/Clients.h index 674895821..c1e5e628e 100644 --- a/src/Clients.h +++ b/src/Clients.h @@ -157,6 +157,7 @@ typedef struct MQTTClient_SSLOptions *sslopts; /**< the SSL/TLS connect options */ SSL_SESSION* session; /**< SSL session pointer for fast handhake */ #endif + int nodelay; /**< TCP_NODELAY socket option */ } Clients; int clientIDCompare(void* a, void* b); diff --git a/src/MQTTAsync.c b/src/MQTTAsync.c index c6a11f6d3..b761f2707 100644 --- a/src/MQTTAsync.c +++ b/src/MQTTAsync.c @@ -562,7 +562,7 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options) goto exit; } - if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 0 || options->struct_version > 8) + if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 0 || options->struct_version > 9) { rc = MQTTASYNC_BAD_STRUCTURE; goto exit; @@ -693,6 +693,10 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options) if (options->httpsProxy) m->c->httpsProxy = MQTTStrdup(options->httpsProxy); } + if (options->struct_version >= 9) + m->c->nodelay = options->nodelay; + else + m->c->nodelay = 0; if (m->c->will) { diff --git a/src/MQTTAsync.h b/src/MQTTAsync.h index e11af0482..af058c442 100644 --- a/src/MQTTAsync.h +++ b/src/MQTTAsync.h @@ -1199,7 +1199,9 @@ typedef struct { /** The eyecatcher for this structure. must be MQTC. */ char struct_id[4]; - /** The version number of this structure. Must be 0, 1, 2, 3 4 5 6, 7 or 8. + /** The version number of this structure. Must be 0, 1, 2, 3 4 5 6, 7, + * 8, or 9. + * * 0 signifies no SSL options and no serverURIs * 1 signifies no serverURIs * 2 signifies no MQTTVersion @@ -1207,7 +1209,8 @@ typedef struct * 4 signifies no binary password option (just string) * 5 signifies no MQTTV5 properties * 6 signifies no HTTP headers option - * 7 signifies no HTTP proxy and HTTPS proxy options + * 7 signifies no HTTP proxy and HTTPS proxy options + * 8 signifies no TCP_NODELAY socket option */ int struct_version; /** The "keep alive" interval, measured in seconds, defines the maximum time @@ -1378,27 +1381,33 @@ typedef struct * HTTPS proxy */ const char* httpsProxy; + /** + * Set the TCP_NODELAY option on the client socket. This could resuce + * the latency on small messages as the cost of increased network + * traffic. + */ + int nodelay; } MQTTAsync_connectOptions; /** Initializer for connect options for MQTT 3.1.1 non-WebSocket connections */ -#define MQTTAsync_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 8, 60, 1, 65535, NULL, NULL, NULL, 30, 0,\ -NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_DEFAULT, 0, 1, 60, {0, NULL}, 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL} +#define MQTTAsync_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 9, 60, 1, 65535, NULL, NULL, NULL, 30, 0,\ +NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_DEFAULT, 0, 1, 60, {0, NULL}, 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 0} /** Initializer for connect options for MQTT 5.0 non-WebSocket connections */ -#define MQTTAsync_connectOptions_initializer5 { {'M', 'Q', 'T', 'C'}, 8, 60, 0, 65535, NULL, NULL, NULL, 30, 0,\ -NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_5, 0, 1, 60, {0, NULL}, 1, NULL, NULL, NULL, NULL, NULL, NULL, NULL} +#define MQTTAsync_connectOptions_initializer5 { {'M', 'Q', 'T', 'C'}, 9, 60, 0, 65535, NULL, NULL, NULL, 30, 0,\ +NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_5, 0, 1, 60, {0, NULL}, 1, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 0} /** Initializer for connect options for MQTT 3.1.1 WebSockets connections. * The keepalive interval is set to 45 seconds to avoid webserver 60 second inactivity timeouts. */ -#define MQTTAsync_connectOptions_initializer_ws { {'M', 'Q', 'T', 'C'}, 8, 45, 1, 65535, NULL, NULL, NULL, 30, 0,\ -NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_DEFAULT, 0, 1, 60, {0, NULL}, 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL} +#define MQTTAsync_connectOptions_initializer_ws { {'M', 'Q', 'T', 'C'}, 9, 45, 1, 65535, NULL, NULL, NULL, 30, 0,\ +NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_DEFAULT, 0, 1, 60, {0, NULL}, 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 0} /** Initializer for connect options for MQTT 5.0 WebSockets connections. * The keepalive interval is set to 45 seconds to avoid webserver 60 second inactivity timeouts. */ -#define MQTTAsync_connectOptions_initializer5_ws { {'M', 'Q', 'T', 'C'}, 8, 45, 0, 65535, NULL, NULL, NULL, 30, 0,\ -NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_5, 0, 1, 60, {0, NULL}, 1, NULL, NULL, NULL, NULL, NULL, NULL, NULL} +#define MQTTAsync_connectOptions_initializer5_ws { {'M', 'Q', 'T', 'C'}, 9, 45, 0, 65535, NULL, NULL, NULL, 30, 0,\ +NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_5, 0, 1, 60, {0, NULL}, 1, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 0} /** diff --git a/src/MQTTClient.c b/src/MQTTClient.c index b6a7714b7..9f56b574c 100644 --- a/src/MQTTClient.c +++ b/src/MQTTClient.c @@ -1206,7 +1206,10 @@ static MQTTResponse MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_c int sessionPresent = 0; MQTTResponse resp = MQTTResponse_initializer; + FUNC_ENTRY; + m->c->nodelay =options->nodelay; + resp.reasonCode = SOCKET_ERROR; if (m->ma && !running) { @@ -1759,7 +1762,7 @@ MQTTResponse MQTTClient_connectAll(MQTTClient handle, MQTTClient_connectOptions* goto exit; } - if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 0 || options->struct_version > 8) + if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 0 || options->struct_version > 9) { rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE; goto exit; diff --git a/src/MQTTClient.h b/src/MQTTClient.h index a5dc7f267..a8fa00d34 100644 --- a/src/MQTTClient.h +++ b/src/MQTTClient.h @@ -825,7 +825,9 @@ typedef struct { /** The eyecatcher for this structure. must be MQTC. */ char struct_id[4]; - /** The version number of this structure. Must be 0, 1, 2, 3, 4, 5, 6, 7 or 8. + /** The version number of this structure. Must be 0, 1, 2, 3, 4, 5, 6, + * 7, 8, or 9. + * * 0 signifies no SSL options and no serverURIs * 1 signifies no serverURIs * 2 signifies no MQTTVersion @@ -834,6 +836,7 @@ typedef struct * 5 signifies no maxInflightMessages and cleanstart * 6 signifies no HTTP headers option * 7 signifies no HTTP proxy and HTTPS proxy options + * 8 signifies no NO_DELAY */ int struct_version; /** The "keep alive" interval, measured in seconds, defines the maximum time @@ -976,27 +979,33 @@ typedef struct * HTTPS proxy */ const char* httpsProxy; + /** + * Set the TCP_NODELAY option on the client socket. This could resuce + * the latency on small messages as the cost of increased network + * traffic. + */ + int nodelay; } MQTTClient_connectOptions; /** Initializer for connect options for MQTT 3.1.1 non-WebSocket connections */ -#define MQTTClient_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 8, 60, 1, 1, NULL, NULL, NULL, 30, 0, NULL,\ -0, NULL, MQTTVERSION_DEFAULT, {NULL, 0, 0}, {0, NULL}, -1, 0, NULL, NULL, NULL} +#define MQTTClient_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 9, 60, 1, 1, NULL, NULL, NULL, 30, 0, NULL,\ +0, NULL, MQTTVERSION_DEFAULT, {NULL, 0, 0}, {0, NULL}, -1, 0, NULL, NULL, NULL, 0} /** Initializer for connect options for MQTT 5.0 non-WebSocket connections */ -#define MQTTClient_connectOptions_initializer5 { {'M', 'Q', 'T', 'C'}, 8, 60, 0, 1, NULL, NULL, NULL, 30, 0, NULL,\ -0, NULL, MQTTVERSION_5, {NULL, 0, 0}, {0, NULL}, -1, 1, NULL, NULL, NULL} +#define MQTTClient_connectOptions_initializer5 { {'M', 'Q', 'T', 'C'}, 9, 60, 0, 1, NULL, NULL, NULL, 30, 0, NULL,\ +0, NULL, MQTTVERSION_5, {NULL, 0, 0}, {0, NULL}, -1, 1, NULL, NULL, NULL, 0} /** Initializer for connect options for MQTT 3.1.1 WebSockets connections. * The keepalive interval is set to 45 seconds to avoid webserver 60 second inactivity timeouts. */ -#define MQTTClient_connectOptions_initializer_ws { {'M', 'Q', 'T', 'C'}, 8, 45, 1, 1, NULL, NULL, NULL, 30, 0, NULL,\ -0, NULL, MQTTVERSION_DEFAULT, {NULL, 0, 0}, {0, NULL}, -1, 0, NULL, NULL, NULL} +#define MQTTClient_connectOptions_initializer_ws { {'M', 'Q', 'T', 'C'}, 9, 45, 1, 1, NULL, NULL, NULL, 30, 0, NULL,\ +0, NULL, MQTTVERSION_DEFAULT, {NULL, 0, 0}, {0, NULL}, -1, 0, NULL, NULL, NULL, 0} /** Initializer for connect options for MQTT 5.0 WebSockets connections. * The keepalive interval is set to 45 seconds to avoid webserver 60 second inactivity timeouts. */ -#define MQTTClient_connectOptions_initializer5_ws { {'M', 'Q', 'T', 'C'}, 8, 45, 0, 1, NULL, NULL, NULL, 30, 0, NULL,\ -0, NULL, MQTTVERSION_5, {NULL, 0, 0}, {0, NULL}, -1, 1, NULL, NULL, NULL} +#define MQTTClient_connectOptions_initializer5_ws { {'M', 'Q', 'T', 'C'}, 9, 45, 0, 1, NULL, NULL, NULL, 30, 0, NULL,\ +0, NULL, MQTTVERSION_5, {NULL, 0, 0}, {0, NULL}, -1, 1, NULL, NULL, NULL, 0} /** * This function attempts to connect a previously-created client (see diff --git a/src/MQTTProtocolOut.c b/src/MQTTProtocolOut.c index 756f42148..79451a03e 100644 --- a/src/MQTTProtocolOut.c +++ b/src/MQTTProtocolOut.c @@ -265,9 +265,9 @@ int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int websocket if (timeout < 0) rc = -1; else - rc = Socket_new(aClient->net.http_proxy, addr_len, port, &(aClient->net.socket), timeout); + rc = Socket_new(aClient->net.http_proxy, addr_len, port, aClient->nodelay, &(aClient->net.socket), timeout); #else - rc = Socket_new(aClient->net.http_proxy, addr_len, port, &(aClient->net.socket)); + rc = Socket_new(aClient->net.http_proxy, addr_len, port, aClient->nodelay, &(aClient->net.socket)); #endif } #if defined(OPENSSL) @@ -277,9 +277,9 @@ int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int websocket if (timeout < 0) rc = -1; else - rc = Socket_new(aClient->net.https_proxy, addr_len, port, &(aClient->net.socket), timeout); + rc = Socket_new(aClient->net.https_proxy, addr_len, port, aClient->nodelay, &(aClient->net.socket), timeout); #else - rc = Socket_new(aClient->net.https_proxy, addr_len, port, &(aClient->net.socket)); + rc = Socket_new(aClient->net.https_proxy, addr_len, port, aClient->nodelay, &(aClient->net.socket)); #endif } #endif @@ -295,9 +295,9 @@ int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int websocket if (timeout < 0) rc = -1; else - rc = Socket_new(ip_address, addr_len, port, &(aClient->net.socket), timeout); + rc = Socket_new(ip_address, addr_len, port, aClient->nodelay, &(aClient->net.socket), timeout); #else - rc = Socket_new(ip_address, addr_len, port, &(aClient->net.socket)); + rc = Socket_new(ip_address, addr_len, port, aClient->nodelay, &(aClient->net.socket)); #endif } if (rc == EINPROGRESS || rc == EWOULDBLOCK) diff --git a/src/Socket.c b/src/Socket.c index 7f8a2cff4..eaade682c 100644 --- a/src/Socket.c +++ b/src/Socket.c @@ -44,6 +44,13 @@ #include "Heap.h" +#if defined(_WIN32) || defined(_WIN64) +#define SOL_TCP IPPROTO_TCP +#elif !defined(SOL_TCP) && defined(IPPROTO_TCP) +#define SOL_TCP IPPROTO_TCP +#endif + + #if defined(USE_SELECT) int isReady(int socket, fd_set* read_set, fd_set* write_set); int Socket_continueWrites(fd_set* pwset, SOCKET* socket, mutex_type mutex); @@ -1049,9 +1056,9 @@ int Socket_close(SOCKET socket) * @return completion code 0=good, SOCKET_ERROR=fail */ #if defined(__GNUC__) && defined(__linux__) -int Socket_new(const char* addr, size_t addr_len, int port, SOCKET* sock, long timeout) +int Socket_new(const char* addr, size_t addr_len, int port, int nodelay, SOCKET* sock, long timeout) #else -int Socket_new(const char* addr, size_t addr_len, int port, SOCKET* sock) +int Socket_new(const char* addr, size_t addr_len, int port, int nodelay, SOCKET* sock) #endif { int type = SOCK_STREAM; @@ -1181,6 +1188,13 @@ int Socket_new(const char* addr, size_t addr_len, int port, SOCKET* sock) Log(LOG_ERROR, -1, "Could not set SO_SNDBUF for socket %d", *sock); } #endif + if (nodelay) + { + int opt = 1; + if (setsockopt(*sock, SOL_TCP, TCP_NODELAY, &opt, sizeof(opt)) != 0) + Log(LOG_ERROR, -1, "Could not set TCP_NODELAY for socket %d", *sock); + + } Log(TRACE_MIN, -1, "New socket %d for %s, port %d", *sock, addr, port); if (Socket_addSocket(*sock) == SOCKET_ERROR) rc = Socket_error("addSocket", *sock); diff --git a/src/Socket.h b/src/Socket.h index e5b9fe639..703bf9c6e 100644 --- a/src/Socket.h +++ b/src/Socket.h @@ -145,9 +145,9 @@ int Socket_putdatas(SOCKET socket, char* buf0, size_t buf0len, PacketBuffers buf int Socket_close(SOCKET socket); #if defined(__GNUC__) && defined(__linux__) /* able to use GNU's getaddrinfo_a to make timeouts possible */ -int Socket_new(const char* addr, size_t addr_len, int port, SOCKET* socket, long timeout); +int Socket_new(const char* addr, size_t addr_len, int port, int nodelay, SOCKET* socket, long timeout); #else -int Socket_new(const char* addr, size_t addr_len, int port, SOCKET* socket); +int Socket_new(const char* addr, size_t addr_len, int port, int nodelay, SOCKET* socket); #endif int Socket_noPendingWrites(SOCKET socket); diff --git a/test/test1.c b/test/test1.c index bf2c7d056..3844f998f 100644 --- a/test/test1.c +++ b/test/test1.c @@ -1168,10 +1168,85 @@ int test6a(struct Options options) return failures; } +/********************************************************************* + +Test7: Socket options + +*********************************************************************/ + +int test7(struct Options options) +{ + MQTTClient c; + MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer; + MQTTClient_willOptions wopts = MQTTClient_willOptions_initializer; + int rc = 0; + char* test_topic = "C client test7"; + + fprintf(xml, "message = "will message"; + opts.will->qos = 1; + opts.will->retained = 0; + opts.will->topicName = "will topic"; + opts.will = NULL; + + opts.nodelay = 1; + + /* Test with the socket option(s) */ + MyLog(LOGA_DEBUG, "Connecting"); + rc = MQTTClient_connect(c, &opts); + assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc); + if (rc != MQTTCLIENT_SUCCESS) + goto exit; + + rc = MQTTClient_disconnect(c, 0); + assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc); + + /* Try to reconnect */ + rc = MQTTClient_connect(c, &opts); + assert("Connect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc); + + rc = MQTTClient_disconnect(c, 0); + assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc); + + MQTTClient_destroy(&c); + +exit: + MyLog(LOGA_INFO, "TEST1: test %s. %d tests run, %d failures.", + (failures == 0) ? "passed" : "failed", tests, failures); + write_test_result(); + return failures; +} + + int main(int argc, char** argv) { int rc = 0; - int (*tests[])() = {NULL, test1, test2, test3, test4, test5, test6, test6a}; + int (*tests[])() = {NULL, test1, test2, test3, test4, test5, test6, test6a, test7}; int i; xml = fopen("TEST-test1.xml", "w"); diff --git a/test/test4.c b/test/test4.c index 9c34a9715..08c2cb736 100644 --- a/test/test4.c +++ b/test/test4.c @@ -1741,6 +1741,112 @@ int test8(struct Options options) return failures; } +/********************************************************************* + +Test9: Socket option(s) + +*********************************************************************/ + +void test9_onConnect(void* context, MQTTAsync_successData* response) +{ + MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context); + test_finished = 1; +} + +void test9_onConnectFailure(void* context, MQTTAsync_failureData* response) +{ + MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context); + MyLog(LOGA_INFO, "Connack rc is %d", response ? response->code : -999); + test_finished = 1; +} + +void test9_onDisconnect(void* context, MQTTAsync_successData* response) +{ + MyLog(LOGA_DEBUG, "In onDisconnect callback %p", context); + test_finished = 1; +} + + +int test9(struct Options options) +{ + MQTTAsync c; + MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer; + MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer; + MQTTAsync_disconnectOptions dopts = MQTTAsync_disconnectOptions_initializer; + int rc = 0; + char* test_topic = "C client test9"; + + MyLog(LOGA_INFO, "Starting test 9 - socket options"); + fprintf(xml, "message = "will message"; + opts.will->qos = 1; + opts.will->retained = 0; + opts.will->topicName = "will topic"; + opts.will = NULL; + + opts.onSuccess = test9_onConnect; + opts.onFailure = test9_onConnectFailure; + opts.context = c; + + opts.nodelay = 1; + + test_finished = 0; + + MyLog(LOGA_DEBUG, "Connecting"); + rc = MQTTAsync_connect(c, &opts); + rc = 0; + assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc); + if (rc != MQTTASYNC_SUCCESS) + goto exit; + + while (!test_finished) + #if defined(_WIN32) + Sleep(100); + #else + usleep(10000L); + #endif + + test_finished = 0; + dopts.timeout = 0; + dopts.onSuccess = test8_onDisconnect; + dopts.context = c; + rc = MQTTAsync_disconnect(c, &dopts); + assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc); + + while (!test_finished) +#if defined(_WIN32) + Sleep(100); +#else + usleep(10000L); +#endif + + MQTTAsync_destroy(&c); + +exit: + MyLog(LOGA_INFO, "TEST1: test %s. %d tests run, %d failures.", + (failures == 0) ? "passed" : "failed", tests, failures); + write_test_result(); + return failures; +} + void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message) @@ -1754,7 +1860,7 @@ void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message) int main(int argc, char** argv) { int rc = 0; - int (*tests[])() = {NULL, test1, test2, test3, test4, test5, test6, test7, test8}; /* indexed starting from 1 */ + int (*tests[])() = {NULL, test1, test2, test3, test4, test5, test6, test7, test8, test9}; /* indexed starting from 1 */ MQTTAsync_nameValue* info; int i;