1 /*
2  * ebusd - daemon for communication with eBUS heating systems.
3  * Copyright (C) 2016-2021 John Baier <ebusd@ebusd.eu>
4  *
5  * This program is free software: you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation, either version 3 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
17  */
18 
19 #ifdef HAVE_CONFIG_H
20 #  include <config.h>
21 #endif
22 
23 #include "ebusd/mqtthandler.h"
24 #include <csignal>
25 #include <deque>
26 #include "lib/utils/log.h"
27 
28 namespace ebusd {
29 
30 using std::dec;
31 
/** the keys of the MQTT options, numbered consecutively starting with 1. */
enum {
  O_HOST = 1,  // --mqtthost
  O_PORT,      // --mqttport
  O_CLID,      // --mqttclientid
  O_USER,      // --mqttuser
  O_PASS,      // --mqttpass
  O_TOPI,      // --mqtttopic
  O_RETA,      // --mqttretain
  O_JSON,      // --mqttjson
  O_LOGL,      // --mqttlog
  O_VERS,      // --mqttversion
  O_IGIN,      // --mqttignoreinvalid
  O_CHGS,      // --mqttchanges
  O_CAFI,      // --mqttca
  O_CERT,      // --mqttcert
  O_KEYF,      // --mqttkey
  O_KEPA,      // --mqttkeypass
  O_INSE,      // --mqttinsecure
  O_VERB,      // --mqttverbose
};
50 
/**
 * the definition of the MQTT arguments.
 * Each entry binds a long option name to one of the O_* keys evaluated by the MQTT argument parsing
 * function. Some entries are only compiled in depending on the libmosquitto version built against.
 */
static const struct argp_option g_mqtt_argp_options[] = {
  // group header
  {nullptr,        0,      nullptr,       0, "MQTT options:", 1 },
  // broker connection and credentials
  {"mqtthost",     O_HOST, "HOST",        0, "Connect to MQTT broker on HOST [localhost]", 0 },
  {"mqttport",     O_PORT, "PORT",        0, "Connect to MQTT broker on PORT (usually 1883), 0 to disable [0]", 0 },
  {"mqttclientid", O_CLID, "ID",         0, "Set client ID for connection to MQTT broker [" PACKAGE_NAME "_"
   PACKAGE_VERSION "_<pid>]", 0 },
  {"mqttuser",     O_USER, "USER",        0, "Connect as USER to MQTT broker (no default)", 0 },
  {"mqttpass",     O_PASS, "PASSWORD",    0, "Use PASSWORD when connecting to MQTT broker (no default)", 0 },
  // topic layout and publishing behaviour
  {"mqtttopic",    O_TOPI, "TOPIC",       0,
   "Use MQTT TOPIC (prefix before /%circuit/%name or complete format) [ebusd]", 0 },
  {"mqttretain",   O_RETA, nullptr,       0, "Retain all topics instead of only selected global ones", 0 },
  {"mqttjson",     O_JSON, nullptr,       0, "Publish in JSON format instead of strings", 0 },
  {"mqttverbose",  O_VERB, nullptr,       0, "Publish all available attributes", 0 },
#if (LIBMOSQUITTO_VERSION_NUMBER >= 1003001)
  {"mqttlog",      O_LOGL, nullptr,       0, "Log library events", 0 },
#endif
#if (LIBMOSQUITTO_VERSION_NUMBER >= 1004001)
  {"mqttversion",  O_VERS, "VERSION",     0, "Use protocol VERSION [3.1]", 0 },
#endif
  {"mqttignoreinvalid", O_IGIN, nullptr, 0,
   "Ignore invalid parameters during init (e.g. for DNS not resolvable yet)", 0 },
  {"mqttchanges",  O_CHGS, nullptr,       0, "Whether to only publish changed messages instead of all received", 0 },

  // TLS related options
#if (LIBMOSQUITTO_MAJOR >= 1)
  {"mqttca",       O_CAFI, "CA",          0, "Use CA file or dir (ending with '/') for MQTT TLS (no default)", 0 },
  {"mqttcert",     O_CERT, "CERTFILE",    0, "Use CERTFILE for MQTT TLS client certificate (no default)", 0 },
  {"mqttkey",      O_KEYF, "KEYFILE",     0, "Use KEYFILE for MQTT TLS client certificate (no default)", 0 },
  {"mqttkeypass",  O_KEPA, "PASSWORD",    0, "Use PASSWORD for the encrypted KEYFILE (no default)", 0 },
  {"mqttinsecure", O_INSE, nullptr,       0, "Allow insecure TLS connection (e.g. using a self signed certificate)", 0 },
#endif

  // terminating entry
  {nullptr,        0,      nullptr,       0, nullptr, 0 },
};
85 
// the values of the MQTT options, set by the MQTT argument parsing function and read by the MqttHandler
static const char* g_host = "localhost";  //!< host name of MQTT broker [localhost]
static uint16_t g_port = 0;               //!< optional port of MQTT broker, 0 to disable [0]
static const char* g_clientId = nullptr;  //!< optional clientid override for MQTT broker
static const char* g_username = nullptr;  //!< optional user name for MQTT broker (no default)
static const char* g_password = nullptr;  //!< optional password for MQTT broker (no default)
/** the MQTT topic string parts (the literal text around the field placeholders). */
static vector<string> g_topicStrs;
/** the MQTT topic field parts (the field names following each string part). */
static vector<string> g_topicFields;
static bool g_retain = false;             //!< whether to retain all topics
static OutputFormat g_publishFormat = OF_NONE;  //!< the OutputFormat for publishing messages
#if (LIBMOSQUITTO_VERSION_NUMBER >= 1003001)
static bool g_logFromLib = false;         //!< log library events
#endif
#if (LIBMOSQUITTO_VERSION_NUMBER >= 1004001)
static int g_version = MQTT_PROTOCOL_V31;  //!< protocol version to use
#endif
static bool g_ignoreInvalidParams = false;  //!< ignore invalid parameters during init
static bool g_onlyChanges = false;        //!< whether to only publish changed messages instead of all received

#if (LIBMOSQUITTO_MAJOR >= 1)
static const char* g_cafile = nullptr;    //!< CA file for TLS
static const char* g_capath = nullptr;    //!< CA path for TLS
static const char* g_certfile = nullptr;  //!< client certificate file for TLS
static const char* g_keyfile = nullptr;   //!< client key file for TLS
static const char* g_keypass = nullptr;   //!< client key file password for TLS
static bool g_insecure = false;           //!< whether to allow insecure TLS connection
#endif

// forward declaration, needed by the argument parsing function defined before the implementation
bool parseTopic(const string& topic, vector<string>* strs, vector<string>* fields);
116 
/**
 * Duplicate a secret command line argument and blank out the original in place, so that the secret
 * is no longer readable from the argument vector.
 * @param arg the NUL terminated argument to hide, overwritten with spaces over its whole length.
 * @return the heap allocated copy of the secret as returned by strdup (nullptr if the copy failed).
 * The copy is kept for the rest of the process lifetime by the callers in this file.
 */
static char* replaceSecret(char *arg) {
  char* ret = strdup(arg);
  // wipe the complete argument: a partial wipe would leave the tail of a long secret visible
  memset(arg, ' ', strlen(arg));
  return ret;
}
125 
126 /**
127  * The MQTT argument parsing function.
128  * @param key the key from @a g_mqtt_argp_options.
129  * @param arg the option argument, or nullptr.
130  * @param state the parsing state.
131  */
mqtt_parse_opt(int key,char * arg,struct argp_state * state)132 static error_t mqtt_parse_opt(int key, char *arg, struct argp_state *state) {
133   result_t result = RESULT_OK;
134 
135   switch (key) {
136   case O_HOST:  // --mqtthost=localhost
137     if (arg == nullptr || arg[0] == 0) {
138       argp_error(state, "invalid mqtthost");
139       return EINVAL;
140     }
141     g_host = arg;
142     break;
143 
144   case O_PORT:  // --mqttport=1883
145     g_port = (uint16_t)parseInt(arg, 10, 1, 65535, &result);
146     if (result != RESULT_OK) {
147       argp_error(state, "invalid mqttport");
148       return EINVAL;
149     }
150     break;
151 
152   case O_CLID:  // --mqttclientid=clientid
153     if (arg == nullptr || arg[0] == 0) {
154       argp_error(state, "invalid mqttclientid");
155       return EINVAL;
156     }
157     g_clientId = arg;
158     break;
159 
160   case O_USER:  // --mqttuser=username
161     if (arg == nullptr) {
162       argp_error(state, "invalid mqttuser");
163       return EINVAL;
164     }
165     g_username = arg;
166     break;
167 
168   case O_PASS:  // --mqttpass=password
169     if (arg == nullptr) {
170       argp_error(state, "invalid mqttpass");
171       return EINVAL;
172     }
173     g_password = replaceSecret(arg);
174     break;
175 
176   case O_TOPI:  // --mqtttopic=ebusd
177     if (arg == nullptr || arg[0] == 0 || strchr(arg, '#') || strchr(arg, '+') || arg[strlen(arg)-1] == '/') {
178       argp_error(state, "invalid mqtttopic");
179       return EINVAL;
180     }
181     if (!parseTopic(arg, &g_topicStrs, &g_topicFields)) {
182       argp_error(state, "malformed mqtttopic");
183     }
184     break;
185 
186   case O_RETA:  // --mqttretain
187     g_retain = true;
188     break;
189 
190   case O_JSON:  // --mqttjson
191     g_publishFormat |= OF_JSON|OF_NAMES;
192     break;
193 
194   case O_VERB:  // --mqttverbose
195     g_publishFormat |= OF_NAMES|OF_UNITS|OF_COMMENTS|OF_ALL_ATTRS;
196     break;
197 
198 #if (LIBMOSQUITTO_VERSION_NUMBER >= 1003001)
199   case O_LOGL:
200     g_logFromLib = true;
201     break;
202 #endif
203 
204 #if (LIBMOSQUITTO_VERSION_NUMBER >= 1004001)
205   case O_VERS:  // --mqttversion=3.1.1
206     if (arg == nullptr || arg[0] == 0 || (strcmp(arg, "3.1") != 0 && strcmp(arg, "3.1.1") != 0)) {
207       argp_error(state, "invalid mqttversion");
208       return EINVAL;
209     }
210     g_version = strcmp(arg, "3.1.1") == 0 ? MQTT_PROTOCOL_V311 : MQTT_PROTOCOL_V31;
211     break;
212 #endif
213 
214   case O_IGIN:
215     g_ignoreInvalidParams = true;
216     break;
217 
218   case O_CHGS:
219     g_onlyChanges = true;
220     break;
221 
222 #if (LIBMOSQUITTO_MAJOR >= 1)
223     case O_CAFI:  // --mqttca=file or --mqttca=dir/
224       if (arg == nullptr || arg[0] == 0) {
225         argp_error(state, "invalid mqttca");
226         return EINVAL;
227       }
228       if (arg[strlen(arg)-1] == '/') {
229         g_cafile = nullptr;
230         g_capath = arg;
231       } else {
232         g_cafile = arg;
233         g_capath = nullptr;
234       }
235       break;
236 
237     case O_CERT:  // --mqttcert=CERTFILE
238       if (arg == nullptr || arg[0] == 0) {
239         argp_error(state, "invalid mqttcert");
240         return EINVAL;
241       }
242       g_certfile = arg;
243       break;
244 
245     case O_KEYF:  // --mqttkey=KEYFILE
246       if (arg == nullptr || arg[0] == 0) {
247         argp_error(state, "invalid mqttkey");
248         return EINVAL;
249       }
250       g_keyfile = arg;
251       break;
252 
253     case O_KEPA:  // --mqttkeypass=PASSWORD
254       if (arg == nullptr) {
255         argp_error(state, "invalid mqttkeypass");
256         return EINVAL;
257       }
258       g_keypass = replaceSecret(arg);
259       break;
260     case O_INSE:  // --mqttinsecure
261       g_insecure = true;
262       break;
263 #endif
264 
265   default:
266     return ARGP_ERR_UNKNOWN;
267   }
268   return 0;
269 }
270 
/** the MQTT argument parser definition, combining the option table with the parsing function. */
static const struct argp g_mqtt_argp = { g_mqtt_argp_options, mqtt_parse_opt, nullptr, nullptr, nullptr, nullptr,
  nullptr };
/** the child parser entry for the MQTT arguments, as returned by @a mqtthandler_getargs(). */
static const struct argp_child g_mqtt_argp_child = {&g_mqtt_argp, 0, "", 1};
274 
275 
mqtthandler_getargs()276 const struct argp_child* mqtthandler_getargs() {
277   if (g_topicStrs.empty()) {
278     g_topicStrs.push_back(PACKAGE);
279   }
280   return &g_mqtt_argp_child;
281 }
282 
check(int code,const char * method)283 bool check(int code, const char* method) {
284   if (code == MOSQ_ERR_SUCCESS) {
285     return true;
286   }
287   if (code == MOSQ_ERR_ERRNO) {
288     char* error = strerror(errno);
289     logOtherError("mqtt", "%s: errno %d=%s", method, errno, error);
290     return false;
291   }
292 #if (LIBMOSQUITTO_VERSION_NUMBER >= 1003001)
293   const char* msg = mosquitto_strerror(code);
294   logOtherError("mqtt", "%s: %s", method, msg);
295 #else
296   logOtherError("mqtt", "%s: error code %d", method, code);
297 #endif
298   return false;
299 }
300 
mqtthandler_register(UserInfo * userInfo,BusHandler * busHandler,MessageMap * messages,list<DataHandler * > * handlers)301 bool mqtthandler_register(UserInfo* userInfo, BusHandler* busHandler, MessageMap* messages,
302     list<DataHandler*>* handlers) {
303   if (g_port > 0) {
304     int major = -1;
305     int minor = -1;
306     int revision = -1;
307     mosquitto_lib_version(&major, &minor, &revision);
308     if (major < LIBMOSQUITTO_MAJOR) {
309       logOtherError("mqtt", "invalid mosquitto version %d instead of %d", major, LIBMOSQUITTO_MAJOR);
310       return false;
311     }
312     logOtherInfo("mqtt", "mosquitto version %d.%d.%d (compiled with %d.%d.%d)", major, minor, revision,
313       LIBMOSQUITTO_MAJOR, LIBMOSQUITTO_MINOR, LIBMOSQUITTO_REVISION);
314     handlers->push_back(new MqttHandler(userInfo, busHandler, messages));
315   }
316   return true;
317 }
318 
/** the known topic field names. */
static const char* knownFieldNames[] = {
  "circuit",
  "name",
  "field",
};

/** the number of known field names. */
static const size_t knownFieldCount = sizeof(knownFieldNames) / sizeof(knownFieldNames[0]);


/**
 * Parse the topic template.
 * The template is split at each '%' followed by a known field name: the text before the placeholder
 * is added to the string parts and the field name to the field parts. Remaining text after the last
 * placeholder is added as final string part.
 * @param topic the topic template.
 * @param strs the @a vector to which the string parts shall be added (cleared first).
 * @param fields the @a vector to which the field parts shall be added (cleared first).
 * @return true on success, false on malformed topic template (unknown or duplicate field).
 */
bool parseTopic(const string& topic, vector<string>* strs, vector<string>* fields) {
  strs->clear();
  fields->clear();
  size_t literalStart = 0;
  size_t marker = topic.find('%');
  while (marker != string::npos) {
    // find the known field name directly following the marker
    const char* matched = nullptr;
    size_t matchedLen = 0;
    for (size_t candidate = 0; candidate < knownFieldCount && matched == nullptr; candidate++) {
      const size_t candidateLen = strlen(knownFieldNames[candidate]);
      if (topic.compare(marker+1, candidateLen, knownFieldNames[candidate]) == 0) {
        matched = knownFieldNames[candidate];
        matchedLen = candidateLen;
      }
    }
    if (matched == nullptr) {  // TODO could allow custom attributes here
      return false;
    }
    for (const auto& seen : *fields) {
      if (seen == matched) {
        return false;  // duplicate column
      }
    }
    strs->push_back(topic.substr(literalStart, marker-literalStart));
    fields->push_back(matched);
    literalStart = marker+1+matchedLen;
    marker = topic.find('%', literalStart);
  }
  if (literalStart < topic.length()) {
    strs->push_back(topic.substr(literalStart));
  }
  return true;
}
372 
#if (LIBMOSQUITTO_MAJOR >= 1)
/**
 * Callback providing the password of the encrypted TLS key file.
 * @param buf the buffer to copy the password to.
 * @param size the size of the buffer.
 * @param rwflag unused.
 * @param userdata unused.
 * @return the number of bytes copied to the buffer (no terminating NUL is written), or 0 if no
 * key password was configured.
 */
int on_keypassword(char *buf, int size, int rwflag, void *userdata) {
  if (g_keypass == nullptr) {
    return 0;
  }
  const int passLen = static_cast<int>(strlen(g_keypass));
  // never copy more than the buffer can take
  const int copyLen = passLen > size ? size : passLen;
  memcpy(buf, g_keypass, copyLen);
  return copyLen;
}
#endif
386 
on_connect(struct mosquitto * mosq,void * obj,int rc)387 void on_connect(
388 #if (LIBMOSQUITTO_MAJOR >= 1)
389   struct mosquitto *mosq,
390 #endif
391   void *obj, int rc) {
392   if (rc == 0) {
393     logOtherNotice("mqtt", "connection established");
394     MqttHandler* handler = reinterpret_cast<MqttHandler*>(obj);
395     if (handler) {
396       handler->notifyConnected();
397     }
398   } else {
399     if (rc >= 1 && rc <= 3) {
400       logOtherError("mqtt", "connection refused: %s",
401                     rc == 1 ? "wrong protocol" : (rc == 2 ? "wrong username/password" : "broker down"));
402     } else {
403       logOtherError("mqtt", "connection refused: %d", rc);
404     }
405   }
406 }
407 
#if (LIBMOSQUITTO_VERSION_NUMBER >= 1003001)
/**
 * Callback forwarding a log message of the mosquitto library to the ebusd log.
 * @param mosq the mosquitto instance (unused).
 * @param obj the user data (unused).
 * @param level the library log level of the message.
 * @param msg the log message.
 */
void on_log(struct mosquitto *mosq, void *obj, int level, const char* msg) {
  if (level == MOSQ_LOG_DEBUG) {
    logOtherDebug("mqtt", "log %s", msg);
  } else if (level == MOSQ_LOG_INFO) {
    logOtherInfo("mqtt", "log %s", msg);
  } else if (level == MOSQ_LOG_NOTICE) {
    logOtherNotice("mqtt", "log %s", msg);
  } else if (level == MOSQ_LOG_WARNING) {
    // warnings are logged as notice with an explicit marker in the text
    logOtherNotice("mqtt", "log warning %s", msg);
  } else if (level == MOSQ_LOG_ERR) {
    logOtherError("mqtt", "log %s", msg);
  } else {
    logOtherError("mqtt", "log other %s", msg);
  }
}
#endif
432 
on_message(struct mosquitto * mosq,void * obj,const struct mosquitto_message * message)433 void on_message(
434 #if (LIBMOSQUITTO_MAJOR >= 1)
435   struct mosquitto *mosq,
436 #endif
437   void *obj, const struct mosquitto_message *message) {
438   MqttHandler* handler = reinterpret_cast<MqttHandler*>(obj);
439   if (!handler || !message || !handler->isRunning()) {
440     return;
441   }
442   string topic(message->topic);
443   string data(message->payloadlen > 0 ? reinterpret_cast<char*>(message->payload) : "");
444   handler->notifyTopic(topic, data);
445 }
446 
447 
/**
 * Construct a new instance: complete the topic template, initialize the mosquitto library, create
 * the client instance, apply the configured options, and request the initial connection.
 * On failure @a m_mosquitto stays or is reset to nullptr, except for a failed connection request
 * other than invalid parameters (or with invalid parameters being ignored), which is retried later.
 * @param userInfo the @a UserInfo instance passed to the @a DataSink base.
 * @param busHandler the @a BusHandler instance passed to the @a DataSource base.
 * @param messages the @a MessageMap instance kept for looking up messages.
 */
MqttHandler::MqttHandler(UserInfo* userInfo, BusHandler* busHandler, MessageMap* messages)
  : DataSink(userInfo, "mqtt"), DataSource(busHandler), WaitThread(), m_messages(messages), m_connected(false),
    m_initialConnectFailed(false), m_lastUpdateCheckResult("."), m_lastScanStatus("."), m_lastErrorLogTime(0) {
  m_publishByField = false;
  m_mosquitto = nullptr;
  if (g_topicFields.empty()) {
    // no field in the topic template: use it as prefix followed by "circuit/name"
    if (g_topicStrs.empty()) {
      g_topicStrs.push_back("");
    } else {
      // make sure the prefix ends with a slash
      string str = g_topicStrs[0];
      if (str.empty() || str[str.length()-1] != '/') {
        g_topicStrs[0] = str+"/";
      }
    }
    g_topicFields.push_back("circuit");
    g_topicStrs.push_back("/");
    g_topicFields.push_back("name");
  } else {
    // publish each field in a topic of its own if the template contains the "field" part
    for (size_t i = 0; i < g_topicFields.size(); i++) {
      if (g_topicFields[i] == "field") {
        m_publishByField = true;
        break;
      }
    }
  }
  // without a message, only the first string part of the template is used as prefix
  m_globalTopic = getTopic(nullptr, "global/");
  m_subscribeTopic = getTopic(nullptr, "#");
  if (check(mosquitto_lib_init(), "unable to initialize")) {
    signal(SIGPIPE, SIG_IGN);  // needed before libmosquitto v. 1.1.3
    // use the configured client ID or build one from package name, version, and process ID
    ostringstream clientId;
    if (g_clientId) {
      clientId << g_clientId;
    } else {
      clientId << PACKAGE_NAME << '_' << PACKAGE_VERSION << '_' << static_cast<unsigned>(getpid());
    }
#if (LIBMOSQUITTO_MAJOR >= 1)
    m_mosquitto = mosquitto_new(clientId.str().c_str(), true, this);
#else
    m_mosquitto = mosquitto_new(clientId.str().c_str(), this);
#endif
    if (!m_mosquitto) {
      logOtherError("mqtt", "unable to instantiate");
    }
  }
  if (m_mosquitto) {
#if (LIBMOSQUITTO_VERSION_NUMBER >= 1004001)
    check(mosquitto_threaded_set(m_mosquitto, true), "threaded_set");
    check(mosquitto_opts_set(m_mosquitto, MOSQ_OPT_PROTOCOL_VERSION, reinterpret_cast<void*>(&g_version)),
       "opts_set protocol version");
#endif
    if (g_username || g_password) {
      // a password without user name is sent with the package name as user name
      if (!g_username) {
        g_username = PACKAGE;
      }
      if (mosquitto_username_pw_set(m_mosquitto, g_username, g_password) != MOSQ_ERR_SUCCESS) {
        logOtherError("mqtt", "unable to set username/password, trying without");
      }
    }
    // register the will message setting the retained "running" topic to "false"
    string willTopic = m_globalTopic+"running";
    string willData = "false";
    size_t len = willData.length();
#if (LIBMOSQUITTO_MAJOR >= 1)
    mosquitto_will_set(m_mosquitto, willTopic.c_str(), (uint32_t)len,
        reinterpret_cast<const uint8_t*>(willData.c_str()), 0, true);
#else
    mosquitto_will_set(m_mosquitto, true, willTopic.c_str(), (uint32_t)len,
        reinterpret_cast<const uint8_t*>(willData.c_str()), 0, true);
#endif

#if (LIBMOSQUITTO_MAJOR >= 1)
    // TLS is only set up when a CA file or directory was given
    if (g_cafile || g_capath) {
      int ret;
      ret = mosquitto_tls_set(m_mosquitto, g_cafile, g_capath, g_certfile, g_keyfile, on_keypassword);
      if (ret != MOSQ_ERR_SUCCESS) {
        logOtherError("mqtt", "unable to set TLS: %d", ret);
      } else if (g_insecure) {
        ret = mosquitto_tls_insecure_set(m_mosquitto, true);
        if (ret != MOSQ_ERR_SUCCESS) {
          logOtherError("mqtt", "unable to set TLS insecure: %d", ret);
        }
      }
    }
#endif
#if (LIBMOSQUITTO_VERSION_NUMBER >= 1003001)
    if (g_logFromLib) {
      mosquitto_log_callback_set(m_mosquitto, on_log);
    }
#endif
    mosquitto_connect_callback_set(m_mosquitto, on_connect);
    mosquitto_message_callback_set(m_mosquitto, on_message);
    int ret;
#if (LIBMOSQUITTO_MAJOR >= 1)
    ret = mosquitto_connect(m_mosquitto, g_host, g_port, 60);
#else
    ret = mosquitto_connect(m_mosquitto, g_host, g_port, 60, true);
#endif
    if (ret == MOSQ_ERR_INVAL && !g_ignoreInvalidParams) {
      // invalid parameters are fatal unless explicitly ignored: give up on the instance
      logOtherError("mqtt", "unable to connect (invalid parameters)");
      mosquitto_destroy(m_mosquitto);
      m_mosquitto = nullptr;
    } else if (!check(ret, "unable to connect, retrying")) {
      m_connected = false;
      // remember that a full connect is needed later on instead of a reconnect
      m_initialConnectFailed = g_ignoreInvalidParams;
    } else {
      m_connected = true;  // assume success until connect_callback says otherwise
      logOtherDebug("mqtt", "connection requested");
    }
  }
}
557 
~MqttHandler()558 MqttHandler::~MqttHandler() {
559   join();
560   if (m_mosquitto) {
561     mosquitto_destroy(m_mosquitto);
562     m_mosquitto = nullptr;
563   }
564   mosquitto_lib_cleanup();
565 }
566 
start()567 void MqttHandler::start() {
568   if (m_mosquitto) {
569     WaitThread::start("MQTT");
570   }
571 }
572 
notifyConnected()573 void MqttHandler::notifyConnected() {
574   if (m_mosquitto && isRunning()) {
575     const string sep = (g_publishFormat & OF_JSON) ? "\"" : "";
576     publishTopic(m_globalTopic+"version", sep + (PACKAGE_STRING "." REVISION) + sep, true);
577     publishTopic(m_globalTopic+"running", "true", true);
578     check(mosquitto_subscribe(m_mosquitto, nullptr, m_subscribeTopic.c_str(), 0), "subscribe");
579   }
580 }
581 
/**
 * Notification of a received topic.
 * Only topics ending with "/get", "/set", or "/list" are handled. The remainder of the topic is
 * matched against the topic template in order to extract the circuit and message name.
 * For "list", all matching messages are published (a trailing '*' in circuit or name acts as prefix
 * match). For "get" and "set", the message is looked up, sent to the bus via the @a BusHandler
 * unless it is passive, and finally published.
 * @param topic the received topic.
 * @param data the received payload.
 */
void MqttHandler::notifyTopic(const string& topic, const string& data) {
  // the part after the last slash determines the kind of request
  size_t pos = topic.rfind('/');
  if (pos == string::npos) {
    return;
  }
  string direction = topic.substr(pos+1);
  if (direction.empty()) {
    return;
  }
  bool isWrite = direction == "set";
  bool isList = !isWrite && direction == "list";
  if (!isWrite && !isList && direction != "get") {
    return;
  }

  logOtherDebug("mqtt", "received topic %s with data %s", topic.c_str(), data.c_str());
  // walk the string parts of the topic template and pick the field values found in between
  string remain = topic.substr(0, pos);
  size_t last = 0;
  string circuit, name;
  bool finalField = false;
  for (size_t idx = 0; idx < g_topicStrs.size()+1 && !finalField; idx++) {
    string field;
    string chk;
    if (idx < g_topicStrs.size()) {
      chk = g_topicStrs[idx];
      pos = remain.find(chk, last);
      if (pos == string::npos) {
        // only a list request may omit trailing parts of the template
        if (!isList) {
          return;
        }
        if (idx == 0 && remain+"/" == chk) {  // check for only first prefix, e.g. "ebusd/"
          break;
        }
        pos = remain.size();
        finalField = true;
      }
    } else if (idx-1 < g_topicFields.size()) {
      // no string part after the last field: the field extends to the end
      pos = remain.size();
    } else if (last < remain.size()) {
      // unexpected extra text after the last string part
      if (!isList) {
        return;
      }
      break;
    } else {
      break;
    }
    field = remain.substr(last, pos-last);
    last = pos+chk.size();
    if (idx == 0) {
      // the first string part has to match at the very beginning
      if (pos > 0) {
        return;
      }
    } else {
      if (field.empty()) {
        if (!isList) {
          return;
        }
        continue;
      }
      string fieldName = g_topicFields[idx-1];
      if (fieldName == "circuit") {
        circuit = field;
      } else if (fieldName == "name") {
        name = field;
      } else if (fieldName == "field") {
        // field = field;  // TODO add support for writing a single field
      } else {
        return;
      }
    }
  }
  if (isList) {
    logOtherInfo("mqtt", "received list topic for %s %s", circuit.c_str(), name.c_str());
    deque<Message*> messages;
    // a trailing asterisk turns circuit and/or name into a prefix to match
    bool circuitPrefix = circuit.length() > 0 && circuit.find_last_of('*') == circuit.length()-1;
    if (circuitPrefix) {
      circuit = circuit.substr(0, circuit.length()-1);
    }
    bool namePrefix = name.length() > 0 && name.find_last_of('*') == name.length()-1;
    if (namePrefix) {
      name = name.substr(0, name.length()-1);
    }
    m_messages->findAll(circuit, name, m_levels, !(circuitPrefix || namePrefix), true, true,
                        true, true, true, 0, 0, false, &messages);
    // with non-empty payload, only messages having been updated at least once are published
    bool onlyWithData = !data.empty();
    for (const auto message : messages) {
      // skip messages not matching the prefix of one part or the exact value of the other part
      if ((circuitPrefix && (
          message->getCircuit().substr(0, circuit.length()) != circuit
          || (!namePrefix && name.length() > 0 && message->getName() != name)))
      || (namePrefix && (
          message->getName().substr(0, name.length()) != name
          || (!circuitPrefix && circuit.length() > 0 && message->getCircuit() != circuit)))
      ) {
        continue;
      }
      time_t lastup = message->getLastUpdateTime();
      if (onlyWithData && lastup == 0) {
        continue;
      }
      ostringstream ostream;
      publishMessage(message, &ostream, true);
    }
    return;
  }
  if (name.empty()) {
    return;
  }
  logOtherInfo("mqtt", "received %s topic for %s %s", direction.c_str(), circuit.c_str(), name.c_str());
  // look up the message, with a second attempt passing true as additional argument
  Message* message = m_messages->find(circuit, name, m_levels, isWrite);
  if (message == nullptr) {
    message = m_messages->find(circuit, name, m_levels, isWrite, true);
  }
  if (message == nullptr) {
    logOtherError("mqtt", "%s message %s %s not found", isWrite?"write":"read", circuit.c_str(), name.c_str());
    return;
  }
  if (!message->isPassive()) {
    string useData = data;
    if (!isWrite && !data.empty()) {
      // a "get" payload may end with "?<priority>" (at the start or after the field separator)
      // in order to set the poll priority of the message
      size_t pos = useData.find_last_of('?');
      if (pos != string::npos && pos > 0 && useData[pos-1] != UI_FIELD_SEPARATOR) {
        pos = string::npos;
      }
      if (pos != string::npos) {
        string args = useData.substr(pos + 1);
        // strip the priority suffix including the preceding separator from the data
        useData = useData.substr(0, pos > 0 ? pos - 1 : pos);
        if (!args.empty()) {
          result_t ret = RESULT_OK;
          size_t pollPriority = (size_t)parseInt(args.c_str(), 10, 1, 9, &ret);
          if (ret == RESULT_OK && pollPriority > 0 && message->setPollPriority(pollPriority)) {
            m_messages->addPollMessage(false, message);
          }
        }
      }
    }
    result_t result = m_busHandler->readFromBus(message, useData);
    if (result != RESULT_OK) {
      logOtherError("mqtt", "%s %s %s: %s", isWrite?"write":"read", circuit.c_str(), name.c_str(),
          getResultCode(result));
      return;
    }
    logOtherNotice("mqtt", "%s %s %s: %s", isWrite?"write":"read", circuit.c_str(), name.c_str(), data.c_str());
  }
  // publish the message (for a passive message without having contacted the bus)
  ostringstream ostream;
  publishMessage(message, &ostream);
}
728 
notifyUpdateCheckResult(const string & checkResult)729 void MqttHandler::notifyUpdateCheckResult(const string& checkResult) {
730   if (checkResult != m_lastUpdateCheckResult) {
731     m_lastUpdateCheckResult = checkResult;
732     const string sep = (g_publishFormat & OF_JSON) ? "\"" : "";
733     publishTopic(m_globalTopic+"updatecheck", sep + (checkResult.empty() ? "OK" : checkResult) + sep, true);
734   }
735 }
736 
notifyScanStatus(const string & scanStatus)737 void MqttHandler::notifyScanStatus(const string& scanStatus) {
738   if (scanStatus != m_lastScanStatus) {
739     m_lastScanStatus = scanStatus;
740     const string sep = (g_publishFormat & OF_JSON) ? "\"" : "";
741     publishTopic(m_globalTopic+"scan", sep + (scanStatus.empty() ? "OK" : scanStatus) + sep, true);
742   }
743 }
744 
/**
 * The thread main loop: handle the network traffic, publish the uptime and the bus signal state
 * at regular intervals, and publish the updated messages, until the thread is stopped.
 * On exit, the signal topic is set to "false" and the retained scan status is cleared.
 */
void MqttHandler::run() {
  time_t lastTaskRun, now, start, lastSignal = 0, lastUpdates = 0;
  bool signal = false;  // the last published signal state
  string signalTopic = m_globalTopic+"signal";
  string uptimeTopic = m_globalTopic+"uptime";
  ostringstream updates;

  time(&now);
  start = lastTaskRun = now;
  bool allowReconnect = false;
  while (isRunning()) {
    bool wasConnected = m_connected;
    bool needsWait = handleTraffic(allowReconnect);
    bool reconnected = !wasConnected && m_connected;
    allowReconnect = false;
    time(&now);
    // the signal state is published again right after a reconnect
    bool sendSignal = reconnected;
    if (now < start) {
      // clock skew
      if (now < lastSignal) {
        lastSignal -= lastTaskRun-now;
      }
      lastTaskRun = now;
    } else if (now > lastTaskRun+15) {
      // periodic task, run when more than 15 seconds have passed since the last run:
      // allow a reconnect in the next iteration, check the signal, and publish the uptime in seconds
      allowReconnect = true;
      sendSignal = true;
      time_t uptime = now-start;
      updates.str("");
      updates.clear();
      updates << dec << static_cast<unsigned>(uptime);
      publishTopic(uptimeTopic, updates.str());
      time(&lastTaskRun);
    }
    if (sendSignal) {
      // publish the signal state only when it changed or after a reconnect
      if (m_busHandler->hasSignal()) {
        lastSignal = now;
        if (!signal || reconnected) {
          signal = true;
          publishTopic(signalTopic, "true", true);
        }
      } else {
        if (signal || reconnected) {
          signal = false;
          publishTopic(signalTopic, "false", true);
        }
      }
    }
    if (!m_updatedMessages.empty()) {
      m_messages->lock();
      if (m_connected) {
        // publish and remove each updated message entry
        for (auto it = m_updatedMessages.begin(); it != m_updatedMessages.end(); ) {
          const vector<Message*>* messages = m_messages->getByKey(it->first);
          if (messages) {
            for (auto message : *messages) {
              // with only changes requested, skip messages not changed since the last publishing round
              if (message->getLastChangeTime() > 0 && message->isAvailable()
              && (!g_onlyChanges || message->getLastChangeTime() > lastUpdates)) {
                updates.str("");
                updates.clear();
                updates << dec;
                publishMessage(message, &updates);
              }
            }
          }
          it = m_updatedMessages.erase(it);
        }
        time(&lastUpdates);
      } else {
        // updates are dropped while not being connected
        m_updatedMessages.clear();
      }
      m_messages->unlock();
    }
    // wait 5 seconds while disconnected and/or 1 second if requested by the traffic handling;
    // leave the loop when a wait returns false
    if ((!m_connected && !Wait(5)) || (needsWait && !Wait(1))) {
      break;
    }
  }
  publishTopic(signalTopic, "false", true);
  publishTopic(m_globalTopic+"scan", "", true);  // clear retain of scan status
}
823 
handleTraffic(bool allowReconnect)824 bool MqttHandler::handleTraffic(bool allowReconnect) {
825   if (!m_mosquitto) {
826     return false;
827   }
828   int ret;
829 #if (LIBMOSQUITTO_MAJOR >= 1)
830   ret = mosquitto_loop(m_mosquitto, -1, 1);  // waits up to 1 second for network traffic
831 #else
832   ret = mosquitto_loop(m_mosquitto, -1);  // waits up to 1 second for network traffic
833 #endif
834   if (!m_connected && (ret == MOSQ_ERR_NO_CONN || ret == MOSQ_ERR_CONN_LOST) && allowReconnect) {
835     if (m_initialConnectFailed) {
836 #if (LIBMOSQUITTO_MAJOR >= 1)
837       ret = mosquitto_connect(m_mosquitto, g_host, g_port, 60);
838 #else
839       ret = mosquitto_connect(m_mosquitto, g_host, g_port, 60, true);
840 #endif
841       if (ret == MOSQ_ERR_INVAL) {
842         logOtherError("mqtt", "unable to connect (invalid parameters), retrying");
843       }
844       if (ret == MOSQ_ERR_SUCCESS) {
845         m_initialConnectFailed = false;
846       }
847     } else {
848       ret = mosquitto_reconnect(m_mosquitto);
849     }
850   }
851   if (!m_connected && ret == MOSQ_ERR_SUCCESS) {
852     m_connected = true;
853     logOtherNotice("mqtt", "connection re-established");
854   }
855   if (!m_connected || ret == MOSQ_ERR_SUCCESS) {
856     return false;
857   }
858   if (ret == MOSQ_ERR_NO_CONN || ret == MOSQ_ERR_CONN_LOST || ret == MOSQ_ERR_CONN_REFUSED) {
859     logOtherError("mqtt", "communication error: %s", ret == MOSQ_ERR_NO_CONN ? "not connected"
860                   : (ret == MOSQ_ERR_CONN_LOST ? "connection lost" : "connection refused"));
861     m_connected = false;
862   } else {
863     time_t now;
864     time(&now);
865     if (now > m_lastErrorLogTime + 10) {  // log at most every 10 seconds
866       m_lastErrorLogTime = now;
867       check(ret, "communication error");
868     }
869   }
870   return true;
871 }
872 
/**
 * Build the topic string for a message from the configured topic parts.
 * The constant parts in g_topicStrs are interleaved with the values of the fields named in
 * g_topicFields; the special field name "field" is substituted by the given fieldName.
 * Without a message, only the first constant part is used.
 * @param message the Message to build the topic for, or nullptr.
 * @param suffix the suffix to append to the topic (ignored when empty).
 * @param fieldName the name to insert for the "field" placeholder.
 * @return the resulting topic string.
 */
string MqttHandler::getTopic(const Message* message, const string& suffix, const string& fieldName) {
  ostringstream topic;
  size_t pos = 0;
  for (const auto& constPart : g_topicStrs) {
    topic << constPart;
    if (message == nullptr) {
      break;
    }
    if (pos < g_topicFields.size()) {
      if (g_topicFields[pos] != "field") {
        message->dumpField(g_topicFields[pos], false, OF_NONE, &topic);
      } else {
        topic << fieldName;
      }
    }
    ++pos;
  }
  if (!suffix.empty()) {
    topic << suffix;
  }
  return topic.str();
}
893 
publishMessage(const Message * message,ostringstream * updates,bool includeWithoutData)894 void MqttHandler::publishMessage(const Message* message, ostringstream* updates, bool includeWithoutData) {
895   OutputFormat outputFormat = g_publishFormat;
896   bool json = outputFormat & OF_JSON;
897   bool noData = includeWithoutData && message->getLastUpdateTime() == 0;
898   if (!m_publishByField) {
899     if (noData) {
900       publishEmptyTopic(getTopic(message));  // alternatively: , json ? "null" : "");
901       return;
902     }
903     if (json) {
904       *updates << "{";
905     }
906     result_t result = message->decodeLastData(false, nullptr, -1, outputFormat, updates);
907     if (result != RESULT_OK) {
908       logOtherError("mqtt", "decode %s %s: %s", message->getCircuit().c_str(), message->getName().c_str(),
909           getResultCode(result));
910       return;
911     }
912     if (json) {
913       *updates << "}";
914     }
915     publishTopic(getTopic(message), updates->str());
916     return;
917   }
918   if (json && !(outputFormat & OF_ALL_ATTRS)) {
919     outputFormat |= OF_SHORT;
920   }
921   for (size_t index = 0; index < message->getFieldCount(); index++) {
922     string name = message->getFieldName(index);
923     if (noData) {
924       publishEmptyTopic(getTopic(message, "", name));  // alternatively: , json ? "null" : "");
925       continue;
926     }
927     result_t result = message->decodeLastData(false, nullptr, index, outputFormat, updates);
928     if (result != RESULT_OK) {
929       logOtherError("mqtt", "decode %s %s %s: %s", message->getCircuit().c_str(), message->getName().c_str(),
930           name.c_str(), getResultCode(result));
931       return;
932     }
933     publishTopic(getTopic(message, "", name), updates->str());
934     updates->str("");
935     updates->clear();
936   }
937 }
938 
publishTopic(const string & topic,const string & data,bool retain)939 void MqttHandler::publishTopic(const string& topic, const string& data, bool retain) {
940   const char* topicStr = topic.c_str();
941   const char* dataStr = data.c_str();
942   const size_t len = strlen(dataStr);
943   logOtherDebug("mqtt", "publish %s %s", topicStr, dataStr);
944   check(mosquitto_publish(m_mosquitto, nullptr, topicStr, (uint32_t)len,
945       reinterpret_cast<const uint8_t*>(dataStr), 0, g_retain || retain), "publish");
946 }
947 
publishEmptyTopic(const string & topic)948 void MqttHandler::publishEmptyTopic(const string& topic) {
949   const char* topicStr = topic.c_str();
950   logOtherDebug("mqtt", "publish empty %s", topicStr);
951   check(mosquitto_publish(m_mosquitto, nullptr, topicStr, 0, nullptr, 0, g_retain), "publish empty");
952 }
953 
954 }  // namespace ebusd
955