diff --git a/examples/copas-example.lua b/examples/copas-example.lua index 858d32e..0c85dd9 100644 --- a/examples/copas-example.lua +++ b/examples/copas-example.lua @@ -2,11 +2,10 @@ local mqtt = require("mqtt") local copas = require("copas") -local mqtt_ioloop = require("mqtt.ioloop") local num_pings = 10 -- total number of ping-pongs -local timeout = 1 -- timeout between ping-pongs -local suffix = tostring(math.random(1000000)) -- mqtt topic suffix to distinct simultaneous rinning of this script +local delay = 1 -- delay between ping-pongs +local suffix = tostring(math.random(1000000)) -- mqtt topic suffix to distinct simultaneous running of this script -- NOTE: more about flespi tokens: https://flespi.com/kb/tokens-access-keys-to-flespi-platform local token = "stPwSVV73Eqw5LSv0iMXbc4EguS7JyuZR9lxU5uLxI5tiNM8ToTVqNpu85pFtJv9" @@ -16,6 +15,8 @@ local ping = mqtt.client{ username = token, clean = true, version = mqtt.v50, + -- NOTE: copas connector + connector = require("mqtt.luasocket-copas"), } local pong = mqtt.client{ @@ -23,6 +24,8 @@ local pong = mqtt.client{ username = token, clean = true, version = mqtt.v50, + -- NOTE: copas connector + connector = require("mqtt.luasocket-copas"), } ping:on{ @@ -30,17 +33,21 @@ ping:on{ assert(connack.rc == 0) print("ping connected") - for i = 1, num_pings do - copas.sleep(timeout) - print("ping", i) - assert(ping:publish{ topic = "luamqtt/copas-ping/"..suffix, payload = "ping"..i, qos = 1 }) - end - - copas.sleep(timeout) - - print("ping done") - assert(ping:publish{ topic = "luamqtt/copas-ping/"..suffix, payload = "done", qos = 1 }) - ping:disconnect() + -- adding another thread; copas handlers should return quickly, anything + -- that can wait should be off-loaded from the handler to a thread. + -- Especially anything that yields; socket reads/writes and sleeps, and the + -- code below does both, sleeping, and writing (implicit in 'publish') + copas.addthread(function() + for i = 1, num_pings do + copas.sleep(delay) + print("ping", i) + assert(ping:publish{ topic = "luamqtt/copas-ping/"..suffix, payload = "ping"..i, qos = 1 }) + end + + print("ping done") + assert(ping:publish{ topic = "luamqtt/copas-ping/"..suffix, payload = "done", qos = 1 }) + ping:disconnect() + end) end, error = function(err) print("ping MQTT client error:", err) @@ -72,19 +79,33 @@ pong:on{ end, } +local function add_client(cl) + -- add keep-alive timer + local timer = copas.addthread(function() + while cl do + copas.sleep(cl:check_keep_alive()) + end + end) + -- add client to connect and listen + copas.addthread(function() + while cl do + local timeout = cl:step() + if not timeout then + cl = nil -- exiting, inform keep-alive timer + copas.wakeup(timer) + else + if timeout > 0 then + copas.sleep(timeout) + end + end + end + end) +end + print("running copas loop...") -copas.addthread(function() - local ioloop = mqtt_ioloop.create{ sleep = 0.01, sleep_function = copas.sleep } - ioloop:add(ping) - ioloop:run_until_clients() -end) - -copas.addthread(function() - local ioloop = mqtt_ioloop.create{ sleep = 0.01, sleep_function = copas.sleep } - ioloop:add(pong) - ioloop:run_until_clients() -end) +add_client(ping) +add_client(pong) copas.loop() diff --git a/examples/copas.lua b/examples/copas.lua index 199e49a..6eb6c5a 100644 --- a/examples/copas.lua +++ b/examples/copas.lua @@ -55,22 +55,31 @@ client:on{ end } --- run io loop for client until connection close -copas.addthread(function() - print("running client in separated copas thread #1...") - mqtt.run_sync(client) - -- NOTE: in sync mode no automatic reconnect is working, but you may just wrap "mqtt.run_sync(client)" call in a loop like this: - -- while true do - -- mqtt.run_sync(client) - -- end -end) +local function add_client(cl) + -- add keep-alive timer + local timer = copas.addthread(function() + while cl do + copas.sleep(cl:check_keep_alive()) + end + end) + -- add client to connect and listen + copas.addthread(function() + while cl do + local timeout = cl:step() + if not timeout then + cl = nil -- exiting + copas.wakeup(timer) + else + if timeout > 0 then + copas.sleep(timeout) + end + end + end + end) +end -copas.addthread(function() - print("execution of separated copas thread #2...") - copas.sleep(0.1) - print("thread #2 stopped") -end) +add_client(client) copas.loop() print("done, copas loop is stopped") diff --git a/examples/sync.lua b/examples/sync.lua deleted file mode 100644 index b415b07..0000000 --- a/examples/sync.lua +++ /dev/null @@ -1,59 +0,0 @@ --- load mqtt module -local mqtt = require("mqtt") - --- create mqtt client -local client = mqtt.client{ - -- NOTE: this broker is not working sometimes; comment username = "..." below if you still want to use it - -- uri = "test.mosquitto.org", - uri = "mqtt.flespi.io", - -- NOTE: more about flespi tokens: https://flespi.com/kb/tokens-access-keys-to-flespi-platform - username = "stPwSVV73Eqw5LSv0iMXbc4EguS7JyuZR9lxU5uLxI5tiNM8ToTVqNpu85pFtJv9", - clean = true, -} -print("created MQTT client", client) - -client:on{ - connect = function(connack) - if connack.rc ~= 0 then - print("connection to broker failed:", connack:reason_string(), connack) - return - end - print("connected:", connack) -- successful connection - - -- subscribe to test topic and publish message after it - assert(client:subscribe{ topic="luamqtt/#", qos=1, callback=function(suback) - print("subscribed:", suback) - - -- publish test message - print('publishing test message "hello" to "luamqtt/simpletest" topic...') - assert(client:publish{ - topic = "luamqtt/simpletest", - payload = "hello", - qos = 1 - }) - end}) - end, - - message = function(msg) - assert(client:acknowledge(msg)) - - print("received:", msg) - print("disconnecting...") - assert(client:disconnect()) - end, - - error = function(err) - print("MQTT client error:", err) - end, - - close = function() - print("MQTT conn closed") - end -} - --- run io loop for client until connection close --- please note that in sync mode background PINGREQ's are not available, and automatic reconnects too -print("running client in synchronous input/output loop") -mqtt.run_sync(client) - -print("done, synchronous input/output loop is stopped") \ No newline at end of file