1 /*
2 * ebusd - daemon for communication with eBUS heating systems.
3 * Copyright (C) 2016-2021 John Baier <ebusd@ebusd.eu>
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 #ifdef HAVE_CONFIG_H
20 # include <config.h>
21 #endif
22
23 #include "ebusd/mqtthandler.h"
24 #include <csignal>
25 #include <deque>
26 #include "lib/utils/log.h"
27
28 namespace ebusd {
29
30 using std::dec;
31
// Keys identifying the individual MQTT command line options. They are used as the
// key column of g_mqtt_argp_options and as the case labels in mqtt_parse_opt().
// Each key is defined as its predecessor plus one, so the values are consecutive
// starting at 1.
// --mqtthost
#define O_HOST 1
// --mqttport
#define O_PORT (O_HOST+1)
// --mqttclientid
#define O_CLID (O_PORT+1)
// --mqttuser
#define O_USER (O_CLID+1)
// --mqttpass
#define O_PASS (O_USER+1)
// --mqtttopic
#define O_TOPI (O_PASS+1)
// --mqttretain
#define O_RETA (O_TOPI+1)
// --mqttjson
#define O_JSON (O_RETA+1)
// --mqttlog
#define O_LOGL (O_JSON+1)
// --mqttversion
#define O_VERS (O_LOGL+1)
// --mqttignoreinvalid
#define O_IGIN (O_VERS+1)
// --mqttchanges
#define O_CHGS (O_IGIN+1)
// --mqttca
#define O_CAFI (O_CHGS+1)
// --mqttcert
#define O_CERT (O_CAFI+1)
// --mqttkey
#define O_KEYF (O_CERT+1)
// --mqttkeypass
#define O_KEPA (O_KEYF+1)
// --mqttinsecure
#define O_INSE (O_KEPA+1)
// --mqttverbose
#define O_VERB (O_INSE+1)
50
/**
 * the definition of the MQTT arguments.
 * Each entry lists the long option name, the O_* key handled in mqtt_parse_opt(), the
 * argument placeholder (nullptr for plain flags), the option flags, the help text and
 * the group. Some options are only offered when the libmosquitto version the code is
 * compiled against is recent enough (see the preprocessor conditions).
 */
static const struct argp_option g_mqtt_argp_options[] = {
  // header line for the option group
  {nullptr, 0, nullptr, 0, "MQTT options:", 1 },
  {"mqtthost", O_HOST, "HOST", 0, "Connect to MQTT broker on HOST [localhost]", 0 },
  // NOTE(review): the help text mentions "0 to disable", while mqtt_parse_opt() calls
  // parseInt(arg, 10, 1, 65535, ...) for this option, which presumably rejects an explicit 0 - confirm.
  {"mqttport", O_PORT, "PORT", 0, "Connect to MQTT broker on PORT (usually 1883), 0 to disable [0]", 0 },
  {"mqttclientid", O_CLID, "ID", 0, "Set client ID for connection to MQTT broker [" PACKAGE_NAME "_"
    PACKAGE_VERSION "_<pid>]", 0 },
  {"mqttuser", O_USER, "USER", 0, "Connect as USER to MQTT broker (no default)", 0 },
  {"mqttpass", O_PASS, "PASSWORD", 0, "Use PASSWORD when connecting to MQTT broker (no default)", 0 },
  {"mqtttopic", O_TOPI, "TOPIC", 0,
    "Use MQTT TOPIC (prefix before /%circuit/%name or complete format) [ebusd]", 0 },
  {"mqttretain", O_RETA, nullptr, 0, "Retain all topics instead of only selected global ones", 0 },
  {"mqttjson", O_JSON, nullptr, 0, "Publish in JSON format instead of strings", 0 },
  {"mqttverbose", O_VERB, nullptr, 0, "Publish all available attributes", 0 },
#if (LIBMOSQUITTO_VERSION_NUMBER >= 1003001)
  {"mqttlog", O_LOGL, nullptr, 0, "Log library events", 0 },
#endif
#if (LIBMOSQUITTO_VERSION_NUMBER >= 1004001)
  {"mqttversion", O_VERS, "VERSION", 0, "Use protocol VERSION [3.1]", 0 },
#endif
  {"mqttignoreinvalid", O_IGIN, nullptr, 0,
    "Ignore invalid parameters during init (e.g. for DNS not resolvable yet)", 0 },
  {"mqttchanges", O_CHGS, nullptr, 0, "Whether to only publish changed messages instead of all received", 0 },

#if (LIBMOSQUITTO_MAJOR >= 1)
  // TLS related options
  {"mqttca", O_CAFI, "CA", 0, "Use CA file or dir (ending with '/') for MQTT TLS (no default)", 0 },
  {"mqttcert", O_CERT, "CERTFILE", 0, "Use CERTFILE for MQTT TLS client certificate (no default)", 0 },
  {"mqttkey", O_KEYF, "KEYFILE", 0, "Use KEYFILE for MQTT TLS client certificate (no default)", 0 },
  {"mqttkeypass", O_KEPA, "PASSWORD", 0, "Use PASSWORD for the encrypted KEYFILE (no default)", 0 },
  {"mqttinsecure", O_INSE, nullptr, 0, "Allow insecure TLS connection (e.g. using a self signed certificate)", 0 },
#endif

  // terminating all-zero entry
  {nullptr, 0, nullptr, 0, nullptr, 0 },
};
85
// Values of the MQTT command line options, filled in by mqtt_parse_opt() and read by the
// MqttHandler and the callbacks below.
static const char* g_host = "localhost";  //!< host name of MQTT broker [localhost]
static uint16_t g_port = 0;               //!< optional port of MQTT broker, 0 to disable [0]
static const char* g_clientId = nullptr;  //!< optional clientid override for MQTT broker
static const char* g_username = nullptr;  //!< optional user name for MQTT broker (no default)
static const char* g_password = nullptr;  //!< optional password for MQTT broker (no default)
/** the MQTT topic string parts. */
static vector<string> g_topicStrs;
/** the MQTT topic field parts. */
static vector<string> g_topicFields;
static bool g_retain = false;                   //!< whether to retain all topics
static OutputFormat g_publishFormat = OF_NONE;  //!< the OutputFormat for publishing messages
#if (LIBMOSQUITTO_VERSION_NUMBER >= 1003001)
static bool g_logFromLib = false;  //!< log library events
#endif
#if (LIBMOSQUITTO_VERSION_NUMBER >= 1004001)
static int g_version = MQTT_PROTOCOL_V31;  //!< protocol version to use
#endif
static bool g_ignoreInvalidParams = false;  //!< ignore invalid parameters during init
static bool g_onlyChanges = false;  //!< whether to only publish changed messages instead of all received

#if (LIBMOSQUITTO_MAJOR >= 1)
static const char* g_cafile = nullptr;    //!< CA file for TLS
static const char* g_capath = nullptr;    //!< CA path for TLS
static const char* g_certfile = nullptr;  //!< client certificate file for TLS
static const char* g_keyfile = nullptr;   //!< client key file for TLS
static const char* g_keypass = nullptr;   //!< client key file password for TLS
static bool g_insecure = false;           //!< whether to allow insecure TLS connection
#endif

// forward declaration, needed because mqtt_parse_opt() calls parseTopic() before its definition.
bool parseTopic(const string& topic, vector<string>* strs, vector<string>* fields);
116
/**
 * Duplicate a secret argument and blank out the original string in place.
 * @param arg the NUL terminated secret; at most its first 256 characters are overwritten with spaces.
 * @return the copy of the original string as returned by strdup().
 */
static char* replaceSecret(char *arg) {
  char* copy = strdup(arg);
  // overwrite the original so that the secret no longer appears in the argument string
  for (int blanked = 0; blanked < 256 && arg[blanked] != 0; blanked++) {
    arg[blanked] = ' ';
  }
  return copy;
}
125
126 /**
127 * The MQTT argument parsing function.
128 * @param key the key from @a g_mqtt_argp_options.
129 * @param arg the option argument, or nullptr.
130 * @param state the parsing state.
131 */
mqtt_parse_opt(int key,char * arg,struct argp_state * state)132 static error_t mqtt_parse_opt(int key, char *arg, struct argp_state *state) {
133 result_t result = RESULT_OK;
134
135 switch (key) {
136 case O_HOST: // --mqtthost=localhost
137 if (arg == nullptr || arg[0] == 0) {
138 argp_error(state, "invalid mqtthost");
139 return EINVAL;
140 }
141 g_host = arg;
142 break;
143
144 case O_PORT: // --mqttport=1883
145 g_port = (uint16_t)parseInt(arg, 10, 1, 65535, &result);
146 if (result != RESULT_OK) {
147 argp_error(state, "invalid mqttport");
148 return EINVAL;
149 }
150 break;
151
152 case O_CLID: // --mqttclientid=clientid
153 if (arg == nullptr || arg[0] == 0) {
154 argp_error(state, "invalid mqttclientid");
155 return EINVAL;
156 }
157 g_clientId = arg;
158 break;
159
160 case O_USER: // --mqttuser=username
161 if (arg == nullptr) {
162 argp_error(state, "invalid mqttuser");
163 return EINVAL;
164 }
165 g_username = arg;
166 break;
167
168 case O_PASS: // --mqttpass=password
169 if (arg == nullptr) {
170 argp_error(state, "invalid mqttpass");
171 return EINVAL;
172 }
173 g_password = replaceSecret(arg);
174 break;
175
176 case O_TOPI: // --mqtttopic=ebusd
177 if (arg == nullptr || arg[0] == 0 || strchr(arg, '#') || strchr(arg, '+') || arg[strlen(arg)-1] == '/') {
178 argp_error(state, "invalid mqtttopic");
179 return EINVAL;
180 }
181 if (!parseTopic(arg, &g_topicStrs, &g_topicFields)) {
182 argp_error(state, "malformed mqtttopic");
183 }
184 break;
185
186 case O_RETA: // --mqttretain
187 g_retain = true;
188 break;
189
190 case O_JSON: // --mqttjson
191 g_publishFormat |= OF_JSON|OF_NAMES;
192 break;
193
194 case O_VERB: // --mqttverbose
195 g_publishFormat |= OF_NAMES|OF_UNITS|OF_COMMENTS|OF_ALL_ATTRS;
196 break;
197
198 #if (LIBMOSQUITTO_VERSION_NUMBER >= 1003001)
199 case O_LOGL:
200 g_logFromLib = true;
201 break;
202 #endif
203
204 #if (LIBMOSQUITTO_VERSION_NUMBER >= 1004001)
205 case O_VERS: // --mqttversion=3.1.1
206 if (arg == nullptr || arg[0] == 0 || (strcmp(arg, "3.1") != 0 && strcmp(arg, "3.1.1") != 0)) {
207 argp_error(state, "invalid mqttversion");
208 return EINVAL;
209 }
210 g_version = strcmp(arg, "3.1.1") == 0 ? MQTT_PROTOCOL_V311 : MQTT_PROTOCOL_V31;
211 break;
212 #endif
213
214 case O_IGIN:
215 g_ignoreInvalidParams = true;
216 break;
217
218 case O_CHGS:
219 g_onlyChanges = true;
220 break;
221
222 #if (LIBMOSQUITTO_MAJOR >= 1)
223 case O_CAFI: // --mqttca=file or --mqttca=dir/
224 if (arg == nullptr || arg[0] == 0) {
225 argp_error(state, "invalid mqttca");
226 return EINVAL;
227 }
228 if (arg[strlen(arg)-1] == '/') {
229 g_cafile = nullptr;
230 g_capath = arg;
231 } else {
232 g_cafile = arg;
233 g_capath = nullptr;
234 }
235 break;
236
237 case O_CERT: // --mqttcert=CERTFILE
238 if (arg == nullptr || arg[0] == 0) {
239 argp_error(state, "invalid mqttcert");
240 return EINVAL;
241 }
242 g_certfile = arg;
243 break;
244
245 case O_KEYF: // --mqttkey=KEYFILE
246 if (arg == nullptr || arg[0] == 0) {
247 argp_error(state, "invalid mqttkey");
248 return EINVAL;
249 }
250 g_keyfile = arg;
251 break;
252
253 case O_KEPA: // --mqttkeypass=PASSWORD
254 if (arg == nullptr) {
255 argp_error(state, "invalid mqttkeypass");
256 return EINVAL;
257 }
258 g_keypass = replaceSecret(arg);
259 break;
260 case O_INSE: // --mqttinsecure
261 g_insecure = true;
262 break;
263 #endif
264
265 default:
266 return ARGP_ERR_UNKNOWN;
267 }
268 return 0;
269 }
270
/** the MQTT argp parser definition combining @a g_mqtt_argp_options with @a mqtt_parse_opt. */
static const struct argp g_mqtt_argp = { g_mqtt_argp_options, mqtt_parse_opt, nullptr, nullptr, nullptr, nullptr,
  nullptr };
/** the argp child entry referring to @a g_mqtt_argp, as returned by @a mqtthandler_getargs(). */
static const struct argp_child g_mqtt_argp_child = {&g_mqtt_argp, 0, "", 1};
274
275
mqtthandler_getargs()276 const struct argp_child* mqtthandler_getargs() {
277 if (g_topicStrs.empty()) {
278 g_topicStrs.push_back(PACKAGE);
279 }
280 return &g_mqtt_argp_child;
281 }
282
check(int code,const char * method)283 bool check(int code, const char* method) {
284 if (code == MOSQ_ERR_SUCCESS) {
285 return true;
286 }
287 if (code == MOSQ_ERR_ERRNO) {
288 char* error = strerror(errno);
289 logOtherError("mqtt", "%s: errno %d=%s", method, errno, error);
290 return false;
291 }
292 #if (LIBMOSQUITTO_VERSION_NUMBER >= 1003001)
293 const char* msg = mosquitto_strerror(code);
294 logOtherError("mqtt", "%s: %s", method, msg);
295 #else
296 logOtherError("mqtt", "%s: error code %d", method, code);
297 #endif
298 return false;
299 }
300
mqtthandler_register(UserInfo * userInfo,BusHandler * busHandler,MessageMap * messages,list<DataHandler * > * handlers)301 bool mqtthandler_register(UserInfo* userInfo, BusHandler* busHandler, MessageMap* messages,
302 list<DataHandler*>* handlers) {
303 if (g_port > 0) {
304 int major = -1;
305 int minor = -1;
306 int revision = -1;
307 mosquitto_lib_version(&major, &minor, &revision);
308 if (major < LIBMOSQUITTO_MAJOR) {
309 logOtherError("mqtt", "invalid mosquitto version %d instead of %d", major, LIBMOSQUITTO_MAJOR);
310 return false;
311 }
312 logOtherInfo("mqtt", "mosquitto version %d.%d.%d (compiled with %d.%d.%d)", major, minor, revision,
313 LIBMOSQUITTO_MAJOR, LIBMOSQUITTO_MINOR, LIBMOSQUITTO_REVISION);
314 handlers->push_back(new MqttHandler(userInfo, busHandler, messages));
315 }
316 return true;
317 }
318
/** the known topic field names. */
static const char* knownFieldNames[] = {
  "circuit",
  "name",
  "field",
};

/** the number of known field names. */
static const size_t knownFieldCount = sizeof(knownFieldNames) / sizeof(char*);


/**
 * Parse the topic template.
 * The template is split at each "%<fieldname>" occurrence, where the field name has to be one
 * of @a knownFieldNames and may be used once only. For every field found, the text preceding it
 * (possibly empty) is added to the string parts and the field name to the field parts. Remaining
 * text after the last field is added as a final string part.
 * The result vectors are replaced on success only and are left untouched for a malformed
 * template.
 * @param topic the topic template.
 * @param strs the @a vector to which the string parts shall be added.
 * @param fields the @a vector to which the field parts shall be added.
 * @return true on success, false on malformed topic template.
 */
bool parseTopic(const string& topic, vector<string>* strs, vector<string>* fields) {
  vector<string> parsedStrs;
  vector<string> parsedFields;
  size_t lastpos = 0;
  for (size_t pos = topic.find('%'); pos != string::npos; pos = topic.find('%', lastpos)) {
    const char* fieldName = nullptr;
    size_t len = 0;
    for (size_t i = 0; i < knownFieldCount; i++) {
      len = strlen(knownFieldNames[i]);
      // compare in place instead of creating a temporary substring
      if (topic.compare(pos+1, len, knownFieldNames[i]) == 0) {
        fieldName = knownFieldNames[i];
        break;
      }
    }
    if (fieldName == nullptr) {  // TODO could allow custom attributes here
      return false;
    }
    for (const auto& it : parsedFields) {
      if (it == fieldName) {
        return false;  // duplicate column
      }
    }
    parsedStrs.push_back(topic.substr(lastpos, pos-lastpos));
    parsedFields.push_back(fieldName);
    lastpos = pos+1+len;  // continue behind the field name
  }
  if (lastpos < topic.length()) {
    parsedStrs.push_back(topic.substr(lastpos));
  }
  // commit the result only after the whole template was parsed successfully
  strs->swap(parsedStrs);
  fields->swap(parsedFields);
  return true;
}
372
#if (LIBMOSQUITTO_MAJOR >= 1)
/**
 * Callback providing the password for the encrypted TLS key file (passed to
 * mosquitto_tls_set() in the @a MqttHandler constructor).
 * @param buf the buffer to copy the password to.
 * @param size the size of the buffer.
 * @param rwflag unused.
 * @param userdata unused.
 * @return the number of bytes copied to the buffer (no terminating NUL is written), or 0 if no
 * key password was set.
 */
int on_keypassword(char *buf, int size, int rwflag, void *userdata) {
  if (g_keypass == nullptr) {
    return 0;
  }
  const int passLen = static_cast<int>(strlen(g_keypass));
  // never copy more than fits into the buffer
  const int copyLen = passLen > size ? size : passLen;
  memcpy(buf, g_keypass, copyLen);
  return copyLen;
}
#endif
386
on_connect(struct mosquitto * mosq,void * obj,int rc)387 void on_connect(
388 #if (LIBMOSQUITTO_MAJOR >= 1)
389 struct mosquitto *mosq,
390 #endif
391 void *obj, int rc) {
392 if (rc == 0) {
393 logOtherNotice("mqtt", "connection established");
394 MqttHandler* handler = reinterpret_cast<MqttHandler*>(obj);
395 if (handler) {
396 handler->notifyConnected();
397 }
398 } else {
399 if (rc >= 1 && rc <= 3) {
400 logOtherError("mqtt", "connection refused: %s",
401 rc == 1 ? "wrong protocol" : (rc == 2 ? "wrong username/password" : "broker down"));
402 } else {
403 logOtherError("mqtt", "connection refused: %d", rc);
404 }
405 }
406 }
407
#if (LIBMOSQUITTO_VERSION_NUMBER >= 1003001)
/**
 * Callback forwarding a log message of libmosquitto to the ebusd log.
 * @param mosq the mosquitto instance, unused.
 * @param obj the user data, unused.
 * @param level the MOSQ_LOG_* level of the message.
 * @param msg the message text.
 */
void on_log(struct mosquitto *mosq, void *obj, int level, const char* msg) {
  if (level == MOSQ_LOG_DEBUG) {
    logOtherDebug("mqtt", "log %s", msg);
  } else if (level == MOSQ_LOG_INFO) {
    logOtherInfo("mqtt", "log %s", msg);
  } else if (level == MOSQ_LOG_NOTICE) {
    logOtherNotice("mqtt", "log %s", msg);
  } else if (level == MOSQ_LOG_WARNING) {
    // warnings are logged with notice level and marked in the text
    logOtherNotice("mqtt", "log warning %s", msg);
  } else if (level == MOSQ_LOG_ERR) {
    logOtherError("mqtt", "log %s", msg);
  } else {
    // any other level is logged as error
    logOtherError("mqtt", "log other %s", msg);
  }
}
#endif
432
on_message(struct mosquitto * mosq,void * obj,const struct mosquitto_message * message)433 void on_message(
434 #if (LIBMOSQUITTO_MAJOR >= 1)
435 struct mosquitto *mosq,
436 #endif
437 void *obj, const struct mosquitto_message *message) {
438 MqttHandler* handler = reinterpret_cast<MqttHandler*>(obj);
439 if (!handler || !message || !handler->isRunning()) {
440 return;
441 }
442 string topic(message->topic);
443 string data(message->payloadlen > 0 ? reinterpret_cast<char*>(message->payload) : "");
444 handler->notifyTopic(topic, data);
445 }
446
447
/**
 * Construct the MQTT handler: complete the topic template, create the mosquitto client
 * instance, apply the options given on the command line and request the connection to the
 * broker.
 * On a fatal setup problem @a m_mosquitto is left as nullptr (in which case start() does not
 * start the thread).
 * @param userInfo the @a UserInfo passed to the @a DataSink base.
 * @param busHandler the @a BusHandler passed to the @a DataSource base.
 * @param messages the @a MessageMap used for finding messages.
 */
MqttHandler::MqttHandler(UserInfo* userInfo, BusHandler* busHandler, MessageMap* messages)
  : DataSink(userInfo, "mqtt"), DataSource(busHandler), WaitThread(), m_messages(messages), m_connected(false),
    m_initialConnectFailed(false), m_lastUpdateCheckResult("."), m_lastScanStatus("."), m_lastErrorLogTime(0) {
  m_publishByField = false;
  m_mosquitto = nullptr;
  if (g_topicFields.empty()) {
    // no field in the topic template: treat it as prefix and append "/<circuit>/<name>"
    if (g_topicStrs.empty()) {
      g_topicStrs.push_back("");
    } else {
      // make sure the prefix ends with a slash
      string str = g_topicStrs[0];
      if (str.empty() || str[str.length()-1] != '/') {
        g_topicStrs[0] = str+"/";
      }
    }
    g_topicFields.push_back("circuit");
    g_topicStrs.push_back("/");
    g_topicFields.push_back("name");
  } else {
    // publish each field separately when the template contains the "field" part
    for (size_t i = 0; i < g_topicFields.size(); i++) {
      if (g_topicFields[i] == "field") {
        m_publishByField = true;
        break;
      }
    }
  }
  // both topics consist of the first topic string part followed by the suffix
  m_globalTopic = getTopic(nullptr, "global/");
  m_subscribeTopic = getTopic(nullptr, "#");
  if (check(mosquitto_lib_init(), "unable to initialize")) {
    signal(SIGPIPE, SIG_IGN);  // needed before libmosquitto v. 1.1.3
    // use the configured client ID or build one from package name, version and process ID
    ostringstream clientId;
    if (g_clientId) {
      clientId << g_clientId;
    } else {
      clientId << PACKAGE_NAME << '_' << PACKAGE_VERSION << '_' << static_cast<unsigned>(getpid());
    }
#if (LIBMOSQUITTO_MAJOR >= 1)
    m_mosquitto = mosquitto_new(clientId.str().c_str(), true, this);
#else
    m_mosquitto = mosquitto_new(clientId.str().c_str(), this);
#endif
    if (!m_mosquitto) {
      logOtherError("mqtt", "unable to instantiate");
    }
  }
  if (m_mosquitto) {
#if (LIBMOSQUITTO_VERSION_NUMBER >= 1004001)
    check(mosquitto_threaded_set(m_mosquitto, true), "threaded_set");
    check(mosquitto_opts_set(m_mosquitto, MOSQ_OPT_PROTOCOL_VERSION, reinterpret_cast<void*>(&g_version)),
       "opts_set protocol version");
#endif
    if (g_username || g_password) {
      // a password without user name is sent with the package name as user name
      if (!g_username) {
        g_username = PACKAGE;
      }
      if (mosquitto_username_pw_set(m_mosquitto, g_username, g_password) != MOSQ_ERR_SUCCESS) {
        logOtherError("mqtt", "unable to set username/password, trying without");
      }
    }
    // register a will message setting the global "running" topic to "false"
    string willTopic = m_globalTopic+"running";
    string willData = "false";
    size_t len = willData.length();
#if (LIBMOSQUITTO_MAJOR >= 1)
    mosquitto_will_set(m_mosquitto, willTopic.c_str(), (uint32_t)len,
      reinterpret_cast<const uint8_t*>(willData.c_str()), 0, true);
#else
    mosquitto_will_set(m_mosquitto, true, willTopic.c_str(), (uint32_t)len,
      reinterpret_cast<const uint8_t*>(willData.c_str()), 0, true);
#endif

#if (LIBMOSQUITTO_MAJOR >= 1)
    // TLS is only set up when a CA file or directory was given
    if (g_cafile || g_capath) {
      int ret;
      ret = mosquitto_tls_set(m_mosquitto, g_cafile, g_capath, g_certfile, g_keyfile, on_keypassword);
      if (ret != MOSQ_ERR_SUCCESS) {
        // NOTE(review): a TLS setup failure is only logged here and the connect below is still attempted
        logOtherError("mqtt", "unable to set TLS: %d", ret);
      } else if (g_insecure) {
        ret = mosquitto_tls_insecure_set(m_mosquitto, true);
        if (ret != MOSQ_ERR_SUCCESS) {
          logOtherError("mqtt", "unable to set TLS insecure: %d", ret);
        }
      }
    }
#endif
#if (LIBMOSQUITTO_VERSION_NUMBER >= 1003001)
    if (g_logFromLib) {
      mosquitto_log_callback_set(m_mosquitto, on_log);
    }
#endif
    mosquitto_connect_callback_set(m_mosquitto, on_connect);
    mosquitto_message_callback_set(m_mosquitto, on_message);
    int ret;
#if (LIBMOSQUITTO_MAJOR >= 1)
    ret = mosquitto_connect(m_mosquitto, g_host, g_port, 60);
#else
    ret = mosquitto_connect(m_mosquitto, g_host, g_port, 60, true);
#endif
    if (ret == MOSQ_ERR_INVAL && !g_ignoreInvalidParams) {
      // invalid parameters are fatal unless ignoring them was requested
      logOtherError("mqtt", "unable to connect (invalid parameters)");
      mosquitto_destroy(m_mosquitto);
      m_mosquitto = nullptr;
    } else if (!check(ret, "unable to connect, retrying")) {
      // remember a failed initial connect so that handleTraffic() uses a full connect for retrying
      m_connected = false;
      m_initialConnectFailed = g_ignoreInvalidParams;
    } else {
      m_connected = true;  // assume success until connect_callback says otherwise
      logOtherDebug("mqtt", "connection requested");
    }
  }
}
557
~MqttHandler()558 MqttHandler::~MqttHandler() {
559 join();
560 if (m_mosquitto) {
561 mosquitto_destroy(m_mosquitto);
562 m_mosquitto = nullptr;
563 }
564 mosquitto_lib_cleanup();
565 }
566
start()567 void MqttHandler::start() {
568 if (m_mosquitto) {
569 WaitThread::start("MQTT");
570 }
571 }
572
notifyConnected()573 void MqttHandler::notifyConnected() {
574 if (m_mosquitto && isRunning()) {
575 const string sep = (g_publishFormat & OF_JSON) ? "\"" : "";
576 publishTopic(m_globalTopic+"version", sep + (PACKAGE_STRING "." REVISION) + sep, true);
577 publishTopic(m_globalTopic+"running", "true", true);
578 check(mosquitto_subscribe(m_mosquitto, nullptr, m_subscribeTopic.c_str(), 0), "subscribe");
579 }
580 }
581
/**
 * Handle a message received on a subscribed topic.
 * The last topic level selects the action ("get", "set" or "list", anything else is ignored),
 * the preceding part is matched against the topic template for extracting circuit and message
 * name. For "list", all matching messages are published. For "get" and "set", the message is
 * looked up, read from/written to the bus unless it is passive, and published afterwards.
 * @param topic the complete topic of the received message.
 * @param data the payload of the received message.
 */
void MqttHandler::notifyTopic(const string& topic, const string& data) {
  // split off the last topic level as direction
  size_t pos = topic.rfind('/');
  if (pos == string::npos) {
    return;
  }
  string direction = topic.substr(pos+1);
  if (direction.empty()) {
    return;
  }
  bool isWrite = direction == "set";
  bool isList = !isWrite && direction == "list";
  if (!isWrite && !isList && direction != "get") {
    return;
  }

  logOtherDebug("mqtt", "received topic %s with data %s", topic.c_str(), data.c_str());
  // match the remaining topic against the alternating string and field parts of the template:
  // the text found between two consecutive string parts is the value of the field in between
  string remain = topic.substr(0, pos);
  size_t last = 0;
  string circuit, name;
  bool finalField = false;
  for (size_t idx = 0; idx < g_topicStrs.size()+1 && !finalField; idx++) {
    string field;
    string chk;
    if (idx < g_topicStrs.size()) {
      chk = g_topicStrs[idx];
      pos = remain.find(chk, last);
      if (pos == string::npos) {
        // only list requests may omit trailing parts of the template
        if (!isList) {
          return;
        }
        if (idx == 0 && remain+"/" == chk) {  // check for only first prefix, e.g. "ebusd/"
          break;
        }
        // the rest of the topic is the value of the last field
        pos = remain.size();
        finalField = true;
      }
    } else if (idx-1 < g_topicFields.size()) {
      // template ends with a field: its value is the rest of the topic
      pos = remain.size();
    } else if (last < remain.size()) {
      // extra text behind the end of the template
      if (!isList) {
        return;
      }
      break;
    } else {
      break;
    }
    field = remain.substr(last, pos-last);
    last = pos+chk.size();
    if (idx == 0) {
      // the first string part has to be found at the very beginning of the topic
      if (pos > 0) {
        return;
      }
    } else {
      if (field.empty()) {
        if (!isList) {
          return;
        }
        continue;
      }
      string fieldName = g_topicFields[idx-1];
      if (fieldName == "circuit") {
        circuit = field;
      } else if (fieldName == "name") {
        name = field;
      } else if (fieldName == "field") {
        // field = field;  // TODO add support for writing a single field
      } else {
        return;
      }
    }
  }
  if (isList) {
    logOtherInfo("mqtt", "received list topic for %s %s", circuit.c_str(), name.c_str());
    deque<Message*> messages;
    // a trailing '*' in circuit or name requests a prefix match and is stripped for the search
    bool circuitPrefix = circuit.length() > 0 && circuit.find_last_of('*') == circuit.length()-1;
    if (circuitPrefix) {
      circuit = circuit.substr(0, circuit.length()-1);
    }
    bool namePrefix = name.length() > 0 && name.find_last_of('*') == name.length()-1;
    if (namePrefix) {
      name = name.substr(0, name.length()-1);
    }
    m_messages->findAll(circuit, name, m_levels, !(circuitPrefix || namePrefix), true, true,
                        true, true, true, 0, 0, false, &messages);
    // with non-empty payload, messages never updated so far are skipped
    bool onlyWithData = !data.empty();
    for (const auto message : messages) {
      // skip messages not matching the requested prefix or the exact other part
      if ((circuitPrefix && (
          message->getCircuit().substr(0, circuit.length()) != circuit
          || (!namePrefix && name.length() > 0 && message->getName() != name)))
      || (namePrefix && (
          message->getName().substr(0, name.length()) != name
          || (!circuitPrefix && circuit.length() > 0 && message->getCircuit() != circuit)))
      ) {
        continue;
      }
      time_t lastup = message->getLastUpdateTime();
      if (onlyWithData && lastup == 0) {
        continue;
      }
      ostringstream ostream;
      publishMessage(message, &ostream, true);
    }
    return;
  }
  if (name.empty()) {
    return;
  }
  logOtherInfo("mqtt", "received %s topic for %s %s", direction.c_str(), circuit.c_str(), name.c_str());
  // look up the message, with a second attempt using the additional find() argument set to true
  Message* message = m_messages->find(circuit, name, m_levels, isWrite);
  if (message == nullptr) {
    message = m_messages->find(circuit, name, m_levels, isWrite, true);
  }
  if (message == nullptr) {
    logOtherError("mqtt", "%s message %s %s not found", isWrite?"write":"read", circuit.c_str(), name.c_str());
    return;
  }
  if (!message->isPassive()) {
    string useData = data;
    if (!isWrite && !data.empty()) {
      // for reading, the payload may end with "?<priority>" (at the start or preceded by the field
      // separator) for setting the poll priority of the message
      size_t pos = useData.find_last_of('?');
      if (pos != string::npos && pos > 0 && useData[pos-1] != UI_FIELD_SEPARATOR) {
        pos = string::npos;
      }
      if (pos != string::npos) {
        string args = useData.substr(pos + 1);
        // strip the priority suffix including the preceding separator from the data
        useData = useData.substr(0, pos > 0 ? pos - 1 : pos);
        if (!args.empty()) {
          result_t ret = RESULT_OK;
          size_t pollPriority = (size_t)parseInt(args.c_str(), 10, 1, 9, &ret);
          if (ret == RESULT_OK && pollPriority > 0 && message->setPollPriority(pollPriority)) {
            m_messages->addPollMessage(false, message);
          }
        }
      }
    }
    // readFromBus() is called for both read and write messages here
    result_t result = m_busHandler->readFromBus(message, useData);
    if (result != RESULT_OK) {
      logOtherError("mqtt", "%s %s %s: %s", isWrite?"write":"read", circuit.c_str(), name.c_str(),
        getResultCode(result));
      return;
    }
    logOtherNotice("mqtt", "%s %s %s: %s", isWrite?"write":"read", circuit.c_str(), name.c_str(), data.c_str());
  }
  // publish the (possibly just updated) data of the message
  ostringstream ostream;
  publishMessage(message, &ostream);
}
728
notifyUpdateCheckResult(const string & checkResult)729 void MqttHandler::notifyUpdateCheckResult(const string& checkResult) {
730 if (checkResult != m_lastUpdateCheckResult) {
731 m_lastUpdateCheckResult = checkResult;
732 const string sep = (g_publishFormat & OF_JSON) ? "\"" : "";
733 publishTopic(m_globalTopic+"updatecheck", sep + (checkResult.empty() ? "OK" : checkResult) + sep, true);
734 }
735 }
736
notifyScanStatus(const string & scanStatus)737 void MqttHandler::notifyScanStatus(const string& scanStatus) {
738 if (scanStatus != m_lastScanStatus) {
739 m_lastScanStatus = scanStatus;
740 const string sep = (g_publishFormat & OF_JSON) ? "\"" : "";
741 publishTopic(m_globalTopic+"scan", sep + (scanStatus.empty() ? "OK" : scanStatus) + sep, true);
742 }
743 }
744
/**
 * The main loop of the handler thread.
 * While running, it handles the MQTT network traffic, publishes the global "uptime" topic and
 * checks the bus signal once more than 15 seconds have passed since the last such task run,
 * publishes the global "signal" topic when the signal state changed or the connection was
 * re-established, and publishes the messages marked as updated.
 * On exit, "signal" is published as "false" and the retained "scan" topic is cleared.
 */
void MqttHandler::run() {
  time_t lastTaskRun, now, start, lastSignal = 0, lastUpdates = 0;
  bool signal = false;
  string signalTopic = m_globalTopic+"signal";
  string uptimeTopic = m_globalTopic+"uptime";
  ostringstream updates;

  time(&now);
  start = lastTaskRun = now;
  bool allowReconnect = false;
  while (isRunning()) {
    bool wasConnected = m_connected;
    bool needsWait = handleTraffic(allowReconnect);
    bool reconnected = !wasConnected && m_connected;
    // a reconnect is only allowed in the iteration following a periodic task run
    allowReconnect = false;
    time(&now);
    bool sendSignal = reconnected;
    if (now < start) {
      // clock skew
      if (now < lastSignal) {
        lastSignal -= lastTaskRun-now;
      }
      lastTaskRun = now;
    } else if (now > lastTaskRun+15) {
      // periodic task: allow a reconnect attempt and publish the uptime
      allowReconnect = true;
      sendSignal = true;
      time_t uptime = now-start;
      updates.str("");
      updates.clear();
      updates << dec << static_cast<unsigned>(uptime);
      publishTopic(uptimeTopic, updates.str());
      time(&lastTaskRun);
    }
    if (sendSignal) {
      // publish the signal state when it changed or after a reconnect
      if (m_busHandler->hasSignal()) {
        lastSignal = now;
        if (!signal || reconnected) {
          signal = true;
          publishTopic(signalTopic, "true", true);
        }
      } else {
        if (signal || reconnected) {
          signal = false;
          publishTopic(signalTopic, "false", true);
        }
      }
    }
    if (!m_updatedMessages.empty()) {
      m_messages->lock();
      if (m_connected) {
        for (auto it = m_updatedMessages.begin(); it != m_updatedMessages.end(); ) {
          const vector<Message*>* messages = m_messages->getByKey(it->first);
          if (messages) {
            for (auto message : *messages) {
              // publish available messages having data; when only changes are requested, only those
              // changed after the previous publishing run
              if (message->getLastChangeTime() > 0 && message->isAvailable()
              && (!g_onlyChanges || message->getLastChangeTime() > lastUpdates)) {
                updates.str("");
                updates.clear();
                updates << dec;
                publishMessage(message, &updates);
              }
            }
          }
          it = m_updatedMessages.erase(it);
        }
        time(&lastUpdates);
      } else {
        // not connected: drop the pending updates
        m_updatedMessages.clear();
      }
      m_messages->unlock();
    }
    // wait while not connected or after a communication error; leave the loop when Wait() returns false
    if ((!m_connected && !Wait(5)) || (needsWait && !Wait(1))) {
      break;
    }
  }
  publishTopic(signalTopic, "false", true);
  publishTopic(m_globalTopic+"scan", "", true);  // clear retain of scan status
}
823
/**
 * Let libmosquitto handle the network traffic and keep track of the connection state.
 * @param allowReconnect true to attempt a (re-)connect when not connected.
 * @return true if connected at the time of the call and an error occurred (the caller waits
 * before the next call then), false otherwise.
 */
bool MqttHandler::handleTraffic(bool allowReconnect) {
  if (!m_mosquitto) {
    return false;
  }
  int ret;
#if (LIBMOSQUITTO_MAJOR >= 1)
  ret = mosquitto_loop(m_mosquitto, -1, 1);  // waits up to 1 second for network traffic
#else
  ret = mosquitto_loop(m_mosquitto, -1);  // waits up to 1 second for network traffic
#endif
  if (!m_connected && (ret == MOSQ_ERR_NO_CONN || ret == MOSQ_ERR_CONN_LOST) && allowReconnect) {
    if (m_initialConnectFailed) {
      // the initial connect in the constructor failed, so a full connect is used instead of a reconnect
#if (LIBMOSQUITTO_MAJOR >= 1)
      ret = mosquitto_connect(m_mosquitto, g_host, g_port, 60);
#else
      ret = mosquitto_connect(m_mosquitto, g_host, g_port, 60, true);
#endif
      if (ret == MOSQ_ERR_INVAL) {
        logOtherError("mqtt", "unable to connect (invalid parameters), retrying");
      }
      if (ret == MOSQ_ERR_SUCCESS) {
        m_initialConnectFailed = false;
      }
    } else {
      ret = mosquitto_reconnect(m_mosquitto);
    }
  }
  // success while marked as not connected (from the loop or the connect attempt above)
  if (!m_connected && ret == MOSQ_ERR_SUCCESS) {
    m_connected = true;
    logOtherNotice("mqtt", "connection re-established");
  }
  if (!m_connected || ret == MOSQ_ERR_SUCCESS) {
    return false;
  }
  // connected and an error occurred
  if (ret == MOSQ_ERR_NO_CONN || ret == MOSQ_ERR_CONN_LOST || ret == MOSQ_ERR_CONN_REFUSED) {
    logOtherError("mqtt", "communication error: %s", ret == MOSQ_ERR_NO_CONN ? "not connected"
      : (ret == MOSQ_ERR_CONN_LOST ? "connection lost" : "connection refused"));
    m_connected = false;
  } else {
    time_t now;
    time(&now);
    if (now > m_lastErrorLogTime + 10) {  // log at most every 10 seconds
      m_lastErrorLogTime = now;
      check(ret, "communication error");
    }
  }
  return true;
}
872
/**
 * Build the topic for a message from the topic template.
 * Without a message, only the first string part of the template is used.
 * @param message the @a Message to build the topic for, or nullptr.
 * @param suffix the suffix to append to the topic, may be empty.
 * @param fieldName the name to use for the "field" part of the template.
 * @return the topic string.
 */
string MqttHandler::getTopic(const Message* message, const string& suffix, const string& fieldName) {
  ostringstream topic;
  const size_t partCount = g_topicStrs.size();
  for (size_t part = 0; part < partCount; part++) {
    topic << g_topicStrs[part];
    if (message == nullptr) {
      break;  // the prefix only
    }
    if (part >= g_topicFields.size()) {
      continue;  // no field behind this string part
    }
    const string& topicField = g_topicFields[part];
    if (topicField == "field") {
      topic << fieldName;
    } else {
      message->dumpField(topicField, false, OF_NONE, &topic);
    }
  }
  if (!suffix.empty()) {
    topic << suffix;
  }
  return topic.str();
}
893
publishMessage(const Message * message,ostringstream * updates,bool includeWithoutData)894 void MqttHandler::publishMessage(const Message* message, ostringstream* updates, bool includeWithoutData) {
895 OutputFormat outputFormat = g_publishFormat;
896 bool json = outputFormat & OF_JSON;
897 bool noData = includeWithoutData && message->getLastUpdateTime() == 0;
898 if (!m_publishByField) {
899 if (noData) {
900 publishEmptyTopic(getTopic(message)); // alternatively: , json ? "null" : "");
901 return;
902 }
903 if (json) {
904 *updates << "{";
905 }
906 result_t result = message->decodeLastData(false, nullptr, -1, outputFormat, updates);
907 if (result != RESULT_OK) {
908 logOtherError("mqtt", "decode %s %s: %s", message->getCircuit().c_str(), message->getName().c_str(),
909 getResultCode(result));
910 return;
911 }
912 if (json) {
913 *updates << "}";
914 }
915 publishTopic(getTopic(message), updates->str());
916 return;
917 }
918 if (json && !(outputFormat & OF_ALL_ATTRS)) {
919 outputFormat |= OF_SHORT;
920 }
921 for (size_t index = 0; index < message->getFieldCount(); index++) {
922 string name = message->getFieldName(index);
923 if (noData) {
924 publishEmptyTopic(getTopic(message, "", name)); // alternatively: , json ? "null" : "");
925 continue;
926 }
927 result_t result = message->decodeLastData(false, nullptr, index, outputFormat, updates);
928 if (result != RESULT_OK) {
929 logOtherError("mqtt", "decode %s %s %s: %s", message->getCircuit().c_str(), message->getName().c_str(),
930 name.c_str(), getResultCode(result));
931 return;
932 }
933 publishTopic(getTopic(message, "", name), updates->str());
934 updates->str("");
935 updates->clear();
936 }
937 }
938
publishTopic(const string & topic,const string & data,bool retain)939 void MqttHandler::publishTopic(const string& topic, const string& data, bool retain) {
940 const char* topicStr = topic.c_str();
941 const char* dataStr = data.c_str();
942 const size_t len = strlen(dataStr);
943 logOtherDebug("mqtt", "publish %s %s", topicStr, dataStr);
944 check(mosquitto_publish(m_mosquitto, nullptr, topicStr, (uint32_t)len,
945 reinterpret_cast<const uint8_t*>(dataStr), 0, g_retain || retain), "publish");
946 }
947
publishEmptyTopic(const string & topic)948 void MqttHandler::publishEmptyTopic(const string& topic) {
949 const char* topicStr = topic.c_str();
950 logOtherDebug("mqtt", "publish empty %s", topicStr);
951 check(mosquitto_publish(m_mosquitto, nullptr, topicStr, 0, nullptr, 0, g_retain), "publish empty");
952 }
953
954 } // namespace ebusd
955