Skip to content

Commit

Permalink
Safer settings reload and MQTT change detection (#1701)
Browse files Browse the repository at this point in the history
* MQTT: config change detection
* Reload settings when config json is uploaded
* Apply only new settings
* Finish config early when not enabled
* Reuse existing buffers from getSetting String using std::move
  • Loading branch information
mcspr authored May 26, 2019
1 parent 868d4d5 commit 334b499
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 99 deletions.
11 changes: 11 additions & 0 deletions code/espurna/espurna.ino
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
std::vector<void (*)()> _loop_callbacks;
std::vector<void (*)()> _reload_callbacks;

bool _reload_config = false;
unsigned long _loop_delay = 0;

// -----------------------------------------------------------------------------
Expand All @@ -40,6 +41,10 @@ void espurnaRegisterReload(void (*callback)()) {
}

void espurnaReload() {
_reload_config = true;
}

void _espurnaReload() {
for (unsigned char i = 0; i < _reload_callbacks.size(); i++) {
(_reload_callbacks[i])();
}
Expand Down Expand Up @@ -228,6 +233,12 @@ void setup() {

void loop() {

// Reload config before running any callbacks
if (_reload_config) {
_espurnaReload();
_reload_config = false;
}

// Call registered loop callbacks
for (unsigned char i = 0; i < _loop_callbacks.size(); i++) {
(_loop_callbacks[i])();
Expand Down
211 changes: 124 additions & 87 deletions code/espurna/mqtt.ino
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Copyright (C) 2016-2019 by Xose Pérez <xose dot perez at gmail dot com>
#include <ESP8266mDNS.h>
#include <ArduinoJson.h>
#include <vector>
#include <utility>
#include <Ticker.h>

#if MQTT_USE_ASYNC // Using AsyncMqttClient
Expand Down Expand Up @@ -46,10 +47,12 @@ String _mqtt_topic_json;
String _mqtt_setter;
String _mqtt_getter;
bool _mqtt_forward;
char *_mqtt_user = 0;
char *_mqtt_pass = 0;
char *_mqtt_will;
char *_mqtt_clientid;
String _mqtt_user;
String _mqtt_pass;
String _mqtt_will;
String _mqtt_server;
uint16_t _mqtt_port;
String _mqtt_clientid;

std::vector<mqtt_callback_f> _mqtt_callbacks;

Expand Down Expand Up @@ -82,41 +85,23 @@ void _mqttConnect() {
_mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MAX;
}

String h = getSetting("mqttServer", MQTT_SERVER);
#if MDNS_CLIENT_SUPPORT
h = mdnsResolve(h);
_mqtt_server = mdnsResolve(_mqtt_server);
#endif
char * host = strdup(h.c_str());

unsigned int port = getSetting("mqttPort", MQTT_PORT).toInt();

if (_mqtt_user) free(_mqtt_user);
if (_mqtt_pass) free(_mqtt_pass);
if (_mqtt_will) free(_mqtt_will);
if (_mqtt_clientid) free(_mqtt_clientid);

String user = getSetting("mqttUser", MQTT_USER);
_mqttPlaceholders(&user);
_mqtt_user = strdup(user.c_str());
_mqtt_pass = strdup(getSetting("mqttPassword", MQTT_PASS).c_str());
_mqtt_will = strdup(mqttTopic(MQTT_TOPIC_STATUS, false).c_str());
String clientid = getSetting("mqttClientID", getIdentifier());
_mqttPlaceholders(&clientid);
_mqtt_clientid = strdup(clientid.c_str());

DEBUG_MSG_P(PSTR("[MQTT] Connecting to broker at %s:%d\n"), host, port);
DEBUG_MSG_P(PSTR("[MQTT] Connecting to broker at %s:%u\n"), _mqtt_server.c_str(), _mqtt_port);

#if MQTT_USE_ASYNC
_mqtt_connecting = true;

_mqtt.setServer(host, port);
_mqtt.setClientId(_mqtt_clientid);
_mqtt.setServer(_mqtt_server.c_str(), _mqtt_port);
_mqtt.setClientId(_mqtt_clientid.c_str());
_mqtt.setKeepAlive(_mqtt_keepalive);
_mqtt.setCleanSession(false);
_mqtt.setWill(_mqtt_will, _mqtt_qos, _mqtt_retain, "0");
if ((strlen(_mqtt_user) > 0) && (strlen(_mqtt_pass) > 0)) {
DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user);
_mqtt.setCredentials(_mqtt_user, _mqtt_pass);
_mqtt.setWill(_mqtt_will.c_str(), _mqtt_qos, _mqtt_retain, "0");
if (_mqtt_user.length() && _mqtt_pass.length()) {
DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user.c_str());
_mqtt.setCredentials(_mqtt_user.c_str(), _mqtt_pass.c_str());
}

#if ASYNC_TCP_SSL_ENABLED
Expand All @@ -135,11 +120,11 @@ void _mqttConnect() {

#endif // ASYNC_TCP_SSL_ENABLED

DEBUG_MSG_P(PSTR("[MQTT] Client ID: %s\n"), _mqtt_clientid);
DEBUG_MSG_P(PSTR("[MQTT] Client ID: %s\n"), _mqtt_clientid.c_str());
DEBUG_MSG_P(PSTR("[MQTT] QoS: %d\n"), _mqtt_qos);
DEBUG_MSG_P(PSTR("[MQTT] Retain flag: %d\n"), _mqtt_retain ? 1 : 0);
DEBUG_MSG_P(PSTR("[MQTT] Keepalive time: %ds\n"), _mqtt_keepalive);
DEBUG_MSG_P(PSTR("[MQTT] Will topic: %s\n"), _mqtt_will);
DEBUG_MSG_P(PSTR("[MQTT] Will topic: %s\n"), _mqtt_will.c_str());

_mqtt.connect();

Expand All @@ -152,10 +137,10 @@ void _mqttConnect() {
bool secure = getSetting("mqttUseSSL", MQTT_SSL_ENABLED).toInt() == 1;
if (secure) {
DEBUG_MSG_P(PSTR("[MQTT] Using SSL\n"));
if (_mqtt_client_secure.connect(host, port)) {
if (_mqtt_client_secure.connect(_mqtt_server.c_str(), _mqtt_port)) {
char fp[60] = {0};
if (sslFingerPrintChar(getSetting("mqttFP", MQTT_SSL_FINGERPRINT).c_str(), fp)) {
if (_mqtt_client_secure.verify(fp, host)) {
if (_mqtt_client_secure.verify(fp, _mqtt_server.c_str())) {
_mqtt.setClient(_mqtt_client_secure);
} else {
DEBUG_MSG_P(PSTR("[MQTT] Invalid fingerprint\n"));
Expand Down Expand Up @@ -184,20 +169,20 @@ void _mqttConnect() {

if (response) {

_mqtt.setServer(host, port);
_mqtt.setServer(_mqtt_server.c_str(), _mqtt_port);

if ((strlen(_mqtt_user) > 0) && (strlen(_mqtt_pass) > 0)) {
DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user);
response = _mqtt.connect(_mqtt_clientid, _mqtt_user, _mqtt_pass, _mqtt_will, _mqtt_qos, _mqtt_retain, "0");
if (_mqtt_user.length() && _mqtt_pass.length()) {
DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user.c_str());
response = _mqtt.connect(_mqtt_clientid.c_str(), _mqtt_user.c_str(), _mqtt_pass.c_str(), _mqtt_will.c_str(), _mqtt_qos, _mqtt_retain, "0");
} else {
response = _mqtt.connect(_mqtt_clientid, _mqtt_will, _mqtt_qos, _mqtt_retain, "0");
response = _mqtt.connect(_mqtt_clientid.c_str(), _mqtt_will.c_str(), _mqtt_qos, _mqtt_retain, "0");
}

DEBUG_MSG_P(PSTR("[MQTT] Client ID: %s\n"), _mqtt_clientid);
DEBUG_MSG_P(PSTR("[MQTT] Client ID: %s\n"), _mqtt_clientid.c_str());
DEBUG_MSG_P(PSTR("[MQTT] QoS: %d\n"), _mqtt_qos);
DEBUG_MSG_P(PSTR("[MQTT] Retain flag: %d\n"), _mqtt_retain ? 1 : 0);
DEBUG_MSG_P(PSTR("[MQTT] Keepalive time: %ds\n"), _mqtt_keepalive);
DEBUG_MSG_P(PSTR("[MQTT] Will topic: %s\n"), _mqtt_will);
DEBUG_MSG_P(PSTR("[MQTT] Will topic: %s\n"), _mqtt_will.c_str());

}

Expand All @@ -210,50 +195,114 @@ void _mqttConnect() {

#endif // MQTT_USE_ASYNC

free(host);

}

void _mqttPlaceholders(String *text) {
text->replace("{hostname}", getSetting("hostname"));
text->replace("{magnitude}", "#");
void _mqttPlaceholders(String& text) {

text.replace("{hostname}", getSetting("hostname"));
text.replace("{magnitude}", "#");

String mac = WiFi.macAddress();
mac.replace(":", "");
text->replace("{mac}", mac);
text.replace("{mac}", mac);

}

template<typename T>
void _mqttApplySetting(T& current, T& updated) {
if (current != updated) {
current = std::move(updated);
mqttDisconnect();
}
}

template<typename T>
void _mqttApplySetting(T& current, const T& updated) {
if (current != updated) {
current = updated;
mqttDisconnect();
}
}

template<typename T>
void _mqttApplyTopic(T& current, const char* magnitude) {
String updated = mqttTopic(magnitude, false);
if (current != updated) {
mqttFlush();
current = std::move(updated);
}
}

void _mqttConfigure() {

// Get base topic
_mqtt_topic = getSetting("mqttTopic", MQTT_TOPIC);
if (_mqtt_topic.endsWith("/")) _mqtt_topic.remove(_mqtt_topic.length()-1);
// Enable only when server is set
{
String server = getSetting("mqttServer", MQTT_SERVER);
uint16_t port = getSetting("mqttPort", MQTT_PORT).toInt();
bool enabled = false;
if (server.length()) {
enabled = getSetting("mqttEnabled", MQTT_ENABLED).toInt() == 1;
}

_mqttApplySetting(_mqtt_server, server);
_mqttApplySetting(_mqtt_enabled, enabled);
_mqttApplySetting(_mqtt_port, port);

if (!enabled) return;
}

// Get base topic and apply placeholders
{
String topic = getSetting("mqttTopic", MQTT_TOPIC);
if (topic.endsWith("/")) topic.remove(_mqtt_topic.length()-1);

// Placeholders
_mqttPlaceholders(&_mqtt_topic);
if (_mqtt_topic.indexOf("#") == -1) _mqtt_topic = _mqtt_topic + "/#";
// Replace things inside curly braces (like {hostname}, {mac} etc.)
_mqttPlaceholders(topic);

// Getters and setters
_mqtt_setter = getSetting("mqttSetter", MQTT_SETTER);
_mqtt_getter = getSetting("mqttGetter", MQTT_GETTER);
_mqtt_forward = !_mqtt_getter.equals(_mqtt_setter) && RELAY_REPORT_STATUS;
if (topic.indexOf("#") == -1) topic.concat("/#");
_mqttApplySetting(_mqtt_topic, topic);

_mqttApplyTopic(_mqtt_will, MQTT_TOPIC_STATUS);
}

// Getter and setter
{
String setter = getSetting("mqttSetter", MQTT_SETTER);
String getter = getSetting("mqttGetter", MQTT_GETTER);
bool forward = !setter.equals(getter) && RELAY_REPORT_STATUS;

_mqttApplySetting(_mqtt_setter, setter);
_mqttApplySetting(_mqtt_getter, getter);
_mqttApplySetting(_mqtt_forward, forward);
}

// MQTT options
_mqtt_qos = getSetting("mqttQoS", MQTT_QOS).toInt();
_mqtt_retain = getSetting("mqttRetain", MQTT_RETAIN).toInt() == 1;
_mqtt_keepalive = getSetting("mqttKeep", MQTT_KEEPALIVE).toInt();
if (getSetting("mqttClientID").length() == 0) delSetting("mqttClientID");

// Enable
if (getSetting("mqttServer", MQTT_SERVER).length() == 0) {
mqttEnabled(false);
} else {
_mqtt_enabled = getSetting("mqttEnabled", MQTT_ENABLED).toInt() == 1;
{
String user = getSetting("mqttUser", MQTT_USER);
_mqttPlaceholders(user);

String pass = getSetting("mqttPassword", MQTT_PASS);

unsigned char qos = getSetting("mqttQoS", MQTT_QOS).toInt();
bool retain = getSetting("mqttRetain", MQTT_RETAIN).toInt() == 1;
unsigned long keepalive = getSetting("mqttKeep", MQTT_KEEPALIVE).toInt();

String id = getSetting("mqttClientID", getIdentifier());
_mqttPlaceholders(id);

_mqttApplySetting(_mqtt_user, user);
_mqttApplySetting(_mqtt_pass, pass);
_mqttApplySetting(_mqtt_qos, qos);
_mqttApplySetting(_mqtt_retain, retain);
_mqttApplySetting(_mqtt_keepalive, keepalive);
_mqttApplySetting(_mqtt_clientid, id);
}

// MQTT JSON
{
_mqttApplySetting(_mqtt_use_json, getSetting("mqttUseJson", MQTT_USE_JSON).toInt() == 1);
_mqttApplyTopic(_mqtt_topic_json, MQTT_TOPIC_JSON);
}
_mqtt_use_json = (getSetting("mqttUseJson", MQTT_USE_JSON).toInt() == 1);
mqttQueueTopic(MQTT_TOPIC_JSON);

_mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;

Expand Down Expand Up @@ -491,9 +540,6 @@ void mqttSend(const char * topic, const char * message, bool force, bool retain)
// Equeue message
if (useJson) {

// Set default queue topic
mqttQueueTopic(MQTT_TOPIC_JSON);

// Enqueue new message
mqttEnqueue(topic, message);

Expand Down Expand Up @@ -608,14 +654,6 @@ void mqttFlush() {

}

void mqttQueueTopic(const char * topic) {
String t = mqttTopic(topic, false);
if (!t.equals(_mqtt_topic_json)) {
mqttFlush();
_mqtt_topic_json = t;
}
}

int8_t mqttEnqueue(const char * topic, const char * message, unsigned char parent) {

// Queue is not meant to send message "offline"
Expand Down Expand Up @@ -709,19 +747,18 @@ void mqttRegister(mqtt_callback_f callback) {

void mqttSetBroker(IPAddress ip, unsigned int port) {
setSetting("mqttServer", ip.toString());
_mqtt_server = ip.toString();

setSetting("mqttPort", port);
_mqtt_port = port;

mqttEnabled(MQTT_AUTOCONNECT);
}

void mqttSetBrokerIfNone(IPAddress ip, unsigned int port) {
if (getSetting("mqttServer", MQTT_SERVER).length() == 0) mqttSetBroker(ip, port);
}

void mqttReset() {
_mqttConfigure();
mqttDisconnect();
}

// -----------------------------------------------------------------------------
// Initialization
// -----------------------------------------------------------------------------
Expand Down
12 changes: 0 additions & 12 deletions code/espurna/ws.ino
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,6 @@ void _wsParse(AsyncWebSocketClient *client, uint8_t * payload, size_t length) {

String adminPass;
bool save = false;
#if MQTT_SUPPORT
bool changedMQTT = false;
#endif

for (auto kv: config) {

Expand Down Expand Up @@ -263,9 +260,6 @@ void _wsParse(AsyncWebSocketClient *client, uint8_t * payload, size_t length) {
// Update flags if value has changed
if (changed) {
save = true;
#if MQTT_SUPPORT
if (key.startsWith("mqtt")) changedMQTT = true;
#endif
}

}
Expand All @@ -276,12 +270,6 @@ void _wsParse(AsyncWebSocketClient *client, uint8_t * payload, size_t length) {
// Callbacks
espurnaReload();

// This should got to callback as well
// but first change management has to be in place
#if MQTT_SUPPORT
if (changedMQTT) mqttReset();
#endif

// Persist settings
saveSettings();

Expand Down

6 comments on commit 334b499

@reaper7
Copy link
Contributor

@reaper7 reaper7 commented on 334b499 May 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mcspr - with this commit I have problem with topics,
all ends with double "//" like:
dhouse/salon/light1//status

Something is wrong with removal "/" at the end, maybe in _mqttConfigure ??
look at line:
https://github.com/xoseperez/espurna/blob/dev/code/espurna/mqtt.ino#L257

if (topic.endsWith("/")) topic.remove(_mqtt_topic.length()-1);

it should not be there:

if (topic.endsWith("/")) topic.remove(topic.length()-1);

@mcspr
Copy link
Collaborator Author

@mcspr mcspr commented on 334b499 Jun 3, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Just so that you know, if you created a PR for this I wouldn't need to remember this comment ;)

@reaper7
Copy link
Contributor

@reaper7 reaper7 commented on 334b499 Jun 3, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mcspr - I'm not sure if this causes a mistake.

@mcspr
Copy link
Collaborator Author

@mcspr mcspr commented on 334b499 Jun 3, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you tried it? It looks like it. We are modifying local topic var based on data from _mqtt_topic, which will later be _mqtt_topic = topic

@reaper7
Copy link
Contributor

@reaper7 reaper7 commented on 334b499 Jun 3, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you tried it?

not yet, give me a few minutes to check it out

@reaper7
Copy link
Contributor

@reaper7 reaper7 commented on 334b499 Jun 3, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, this is it...
PR done: #1755

Please sign in to comment.