1 /** @file
2     MQTT output for rtl_433 events
3 
4     Copyright (C) 2019 Christian Zuckschwerdt
5 
6     This program is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10 */
11 
12 // note: our unit header includes unistd.h for gethostname() via data.h
13 #include "output_mqtt.h"
14 #include "optparse.h"
15 #include "util.h"
16 #include "fatal.h"
17 #include "r_util.h"
18 
19 #include <stdlib.h>
20 #include <stdio.h>
21 #include <string.h>
22 
23 #include "mongoose.h"
24 
25 /* MQTT client abstraction */
26 
27 typedef struct mqtt_client {
28     struct mg_connect_opts connect_opts;
29     struct mg_send_mqtt_handshake_opts mqtt_opts;
30     struct mg_connection *conn;
31     int prev_status;
32     char address[253 + 6 + 1]; // dns max + port
33     char client_id[256];
34     uint16_t message_id;
35     int publish_flags; // MG_MQTT_RETAIN | MG_MQTT_QOS(0)
36 } mqtt_client_t;
37 
mqtt_client_event(struct mg_connection * nc,int ev,void * ev_data)38 static void mqtt_client_event(struct mg_connection *nc, int ev, void *ev_data)
39 {
40     // note that while shutting down the ctx is NULL
41     mqtt_client_t *ctx = (mqtt_client_t *)nc->user_data;
42     // only valid in MG_EV_MQTT_ events
43     struct mg_mqtt_message *msg = (struct mg_mqtt_message *)ev_data;
44 
45     //if (ev != MG_EV_POLL)
46     //    fprintf(stderr, "MQTT user handler got event %d\n", ev);
47 
48     switch (ev) {
49     case MG_EV_CONNECT: {
50         int connect_status = *(int *)ev_data;
51         if (connect_status == 0) {
52             // Success
53             fprintf(stderr, "MQTT Connected...\n");
54             mg_set_protocol_mqtt(nc);
55             if (ctx)
56                 mg_send_mqtt_handshake_opt(nc, ctx->client_id, ctx->mqtt_opts);
57         }
58         else {
59             // Error, print only once
60             if (ctx && ctx->prev_status != connect_status)
61                 fprintf(stderr, "MQTT connect error: %s\n", strerror(connect_status));
62         }
63         if (ctx)
64             ctx->prev_status = connect_status;
65         break;
66     }
67     case MG_EV_MQTT_CONNACK:
68         if (msg->connack_ret_code != MG_EV_MQTT_CONNACK_ACCEPTED) {
69             fprintf(stderr, "MQTT Connection error: %u\n", msg->connack_ret_code);
70         }
71         else {
72             fprintf(stderr, "MQTT Connection established.\n");
73         }
74         break;
75     case MG_EV_MQTT_PUBACK:
76         fprintf(stderr, "MQTT Message publishing acknowledged (msg_id: %u)\n", msg->message_id);
77         break;
78     case MG_EV_MQTT_SUBACK:
79         fprintf(stderr, "MQTT Subscription acknowledged.\n");
80         break;
81     case MG_EV_MQTT_PUBLISH: {
82         fprintf(stderr, "MQTT Incoming message %.*s: %.*s\n", (int)msg->topic.len,
83                 msg->topic.p, (int)msg->payload.len, msg->payload.p);
84         break;
85     }
86     case MG_EV_CLOSE:
87         if (!ctx)
88             break; // shuttig down
89         if (ctx->prev_status == 0)
90             fprintf(stderr, "MQTT Connection failed...\n");
91         // reconnect
92         char const *error_string = NULL;
93         ctx->connect_opts.error_string = &error_string;
94         ctx->conn = mg_connect_opt(nc->mgr, ctx->address, mqtt_client_event, ctx->connect_opts);
95         ctx->connect_opts.error_string = NULL;
96         if (!ctx->conn) {
97             fprintf(stderr, "MQTT connect (%s) failed%s%s\n", ctx->address,
98                     error_string ? ": " : "", error_string ? error_string : "");
99         }
100         break;
101     }
102 }
103 
mqtt_client_init(struct mg_mgr * mgr,tls_opts_t * tls_opts,char const * host,char const * port,char const * user,char const * pass,char const * client_id,int retain,int qos)104 static mqtt_client_t *mqtt_client_init(struct mg_mgr *mgr, tls_opts_t *tls_opts, char const *host, char const *port, char const *user, char const *pass, char const *client_id, int retain, int qos)
105 {
106     mqtt_client_t *ctx = calloc(1, sizeof(*ctx));
107     if (!ctx)
108         FATAL_CALLOC("mqtt_client_init()");
109 
110     ctx->mqtt_opts.user_name = user;
111     ctx->mqtt_opts.password  = pass;
112     ctx->publish_flags  = MG_MQTT_QOS(qos) | (retain ? MG_MQTT_RETAIN : 0);
113     // TODO: these should be user configurable options
114     //ctx->opts.keepalive = 60;
115     //ctx->timeout = 10000L;
116     //ctx->cleansession = 1;
117     strncpy(ctx->client_id, client_id, sizeof(ctx->client_id));
118     ctx->client_id[sizeof(ctx->client_id) - 1] = '\0';
119 
120     // if the host is an IPv6 address it needs quoting
121     if (strchr(host, ':'))
122         snprintf(ctx->address, sizeof(ctx->address), "[%s]:%s", host, port);
123     else
124         snprintf(ctx->address, sizeof(ctx->address), "%s:%s", host, port);
125 
126     ctx->connect_opts.user_data = ctx;
127     if (tls_opts && tls_opts->tls_ca_cert) {
128 #if MG_ENABLE_SSL
129         ctx->connect_opts.ssl_cert          = tls_opts->tls_cert;
130         ctx->connect_opts.ssl_key           = tls_opts->tls_key;
131         ctx->connect_opts.ssl_ca_cert       = tls_opts->tls_ca_cert;
132         ctx->connect_opts.ssl_cipher_suites = tls_opts->tls_cipher_suites;
133         ctx->connect_opts.ssl_server_name   = tls_opts->tls_server_name;
134         ctx->connect_opts.ssl_psk_identity  = tls_opts->tls_psk_identity;
135         ctx->connect_opts.ssl_psk_key       = tls_opts->tls_psk_key;
136 #else
137         fprintf(stderr, "mqtts (TLS) not available\n");
138         exit(1);
139 #endif
140     }
141     char const *error_string = NULL;
142     ctx->connect_opts.error_string = &error_string;
143     ctx->conn = mg_connect_opt(mgr, ctx->address, mqtt_client_event, ctx->connect_opts);
144     ctx->connect_opts.error_string = NULL;
145     if (!ctx->conn) {
146         fprintf(stderr, "MQTT connect (%s) failed%s%s\n", ctx->address,
147                 error_string ? ": " : "", error_string ? error_string : "");
148         exit(1);
149     }
150 
151     return ctx;
152 }
153 
mqtt_client_publish(mqtt_client_t * ctx,char const * topic,char const * str)154 static void mqtt_client_publish(mqtt_client_t *ctx, char const *topic, char const *str)
155 {
156     if (!ctx->conn || !ctx->conn->proto_handler)
157         return;
158 
159     ctx->message_id++;
160     mg_mqtt_publish(ctx->conn, topic, ctx->message_id, ctx->publish_flags, str, strlen(str));
161 }
162 
mqtt_client_free(mqtt_client_t * ctx)163 static void mqtt_client_free(mqtt_client_t *ctx)
164 {
165     if (ctx && ctx->conn) {
166         ctx->conn->user_data = NULL;
167         ctx->conn->flags |= MG_F_CLOSE_IMMEDIATELY;
168     }
169     free(ctx);
170 }
171 
172 /* Helper */
173 
174 /// clean the topic inplace to [-.A-Za-z0-9], esp. not whitespace, +, #, /, $
mqtt_sanitize_topic(char * topic)175 static char *mqtt_sanitize_topic(char *topic)
176 {
177     for (char *p = topic; *p; ++p)
178         if (*p != '-' && *p != '.' && (*p < 'A' || *p > 'Z') && (*p < 'a' || *p > 'z') && (*p < '0' || *p > '9'))
179             *p = '_';
180 
181     return topic;
182 }
183 
184 /* MQTT printer */
185 
186 typedef struct {
187     struct data_output output;
188     mqtt_client_t *mqc;
189     char topic[256];
190     char hostname[64];
191     char *devices;
192     char *events;
193     char *states;
194     //char *homie;
195     //char *hass;
196 } data_output_mqtt_t;
197 
print_mqtt_array(data_output_t * output,data_array_t * array,char const * format)198 static void print_mqtt_array(data_output_t *output, data_array_t *array, char const *format)
199 {
200     data_output_mqtt_t *mqtt = (data_output_mqtt_t *)output;
201 
202     char *orig = mqtt->topic + strlen(mqtt->topic); // save current topic
203 
204     for (int c = 0; c < array->num_values; ++c) {
205         sprintf(orig, "/%d", c);
206         print_array_value(output, array, format, c);
207     }
208     *orig = '\0'; // restore topic
209 }
210 
append_topic(char * topic,data_t * data)211 static char *append_topic(char *topic, data_t *data)
212 {
213     if (data->type == DATA_STRING) {
214         strcpy(topic, data->value.v_ptr);
215         mqtt_sanitize_topic(topic);
216         topic += strlen(data->value.v_ptr);
217     }
218     else if (data->type == DATA_INT) {
219         topic += sprintf(topic, "%d", data->value.v_int);
220     }
221     else {
222         fprintf(stderr, "Can't append data type %d to topic\n", data->type);
223     }
224 
225     return topic;
226 }
227 
expand_topic(char * topic,char const * format,data_t * data,char const * hostname)228 static char *expand_topic(char *topic, char const *format, data_t *data, char const *hostname)
229 {
230     // collect well-known top level keys
231     data_t *data_type    = NULL;
232     data_t *data_model   = NULL;
233     data_t *data_subtype = NULL;
234     data_t *data_channel = NULL;
235     data_t *data_id      = NULL;
236     data_t *data_protocol = NULL;
237     for (data_t *d = data; d; d = d->next) {
238         if (!strcmp(d->key, "type"))
239             data_type = d;
240         else if (!strcmp(d->key, "model"))
241             data_model = d;
242         else if (!strcmp(d->key, "subtype"))
243             data_subtype = d;
244         else if (!strcmp(d->key, "channel"))
245             data_channel = d;
246         else if (!strcmp(d->key, "id"))
247             data_id = d;
248         else if (!strcmp(d->key, "protocol")) // NOTE: needs "-M protocol"
249             data_protocol = d;
250     }
251 
252     // consume entire format string
253     while (format && *format) {
254         data_t *data_token  = NULL;
255         char const *string_token = NULL;
256         int leading_slash   = 0;
257         char const *t_start = NULL;
258         char const *t_end   = NULL;
259         char const *d_start = NULL;
260         char const *d_end   = NULL;
261         // copy until '['
262         while (*format && *format != '[')
263             *topic++ = *format++;
264         // skip '['
265         if (!*format)
266             break;
267         ++format;
268         // read slash
269         if (!leading_slash && (*format < 'a' || *format > 'z')) {
270             leading_slash = *format;
271             format++;
272         }
273         // read key until : or ]
274         t_start = t_end = format;
275         while (*format && *format != ':' && *format != ']' && *format != '[')
276             t_end = ++format;
277         // read default until ]
278         if (*format == ':') {
279             d_start = d_end = ++format;
280             while (*format && *format != ']' && *format != '[')
281                 d_end = ++format;
282         }
283         // check for proper closing
284         if (*format != ']') {
285             fprintf(stderr, "%s: unterminated token\n", __func__);
286             exit(1);
287         }
288         ++format;
289 
290         // resolve token
291         if (!strncmp(t_start, "hostname", t_end - t_start))
292             string_token = hostname;
293         else if (!strncmp(t_start, "type", t_end - t_start))
294             data_token = data_type;
295         else if (!strncmp(t_start, "model", t_end - t_start))
296             data_token = data_model;
297         else if (!strncmp(t_start, "subtype", t_end - t_start))
298             data_token = data_subtype;
299         else if (!strncmp(t_start, "channel", t_end - t_start))
300             data_token = data_channel;
301         else if (!strncmp(t_start, "id", t_end - t_start))
302             data_token = data_id;
303         else if (!strncmp(t_start, "protocol", t_end - t_start))
304             data_token = data_protocol;
305         else {
306             fprintf(stderr, "%s: unknown token \"%.*s\"\n", __func__, (int)(t_end - t_start), t_start);
307             exit(1);
308         }
309 
310         // append token or default
311         if (!data_token && !string_token && !d_start)
312             continue;
313         if (leading_slash)
314             *topic++ = leading_slash;
315         if (data_token)
316             topic = append_topic(topic, data_token);
317         else if (string_token)
318             topic += sprintf(topic, "%s", string_token);
319         else
320             topic += sprintf(topic, "%.*s", (int)(d_end - d_start), d_start);
321     }
322 
323     *topic = '\0';
324     return topic;
325 }
326 
327 // <prefix>[/type][/model][/subtype][/channel][/id]/battery: "OK"|"LOW"
print_mqtt_data(data_output_t * output,data_t * data,char const * format)328 static void print_mqtt_data(data_output_t *output, data_t *data, char const *format)
329 {
330     UNUSED(format);
331     data_output_mqtt_t *mqtt = (data_output_mqtt_t *)output;
332 
333     char *orig = mqtt->topic + strlen(mqtt->topic); // save current topic
334     char *end  = orig;
335 
336     // top-level only
337     if (!*mqtt->topic) {
338         // collect well-known top level keys
339         data_t *data_model = NULL;
340         for (data_t *d = data; d; d = d->next) {
341             if (!strcmp(d->key, "model"))
342                 data_model = d;
343         }
344 
345         // "states" topic
346         if (!data_model) {
347             if (mqtt->states) {
348                 size_t message_size = 20000; // state message need a large buffer
349                 char *message       = malloc(message_size);
350                 if (!message) {
351                     WARN_MALLOC("print_mqtt_data()");
352                     return; // NOTE: skip output on alloc failure.
353                 }
354                 data_print_jsons(data, message, message_size);
355                 expand_topic(mqtt->topic, mqtt->states, data, mqtt->hostname);
356                 mqtt_client_publish(mqtt->mqc, mqtt->topic, message);
357                 *mqtt->topic = '\0'; // clear topic
358                 free(message);
359             }
360             return;
361         }
362 
363         // "events" topic
364         if (mqtt->events) {
365             char message[2048]; // we expect the biggest strings to be around 500 bytes.
366             data_print_jsons(data, message, sizeof(message));
367             expand_topic(mqtt->topic, mqtt->events, data, mqtt->hostname);
368             mqtt_client_publish(mqtt->mqc, mqtt->topic, message);
369             *mqtt->topic = '\0'; // clear topic
370         }
371 
372         // "devices" topic
373         if (!mqtt->devices) {
374             return;
375         }
376 
377         end = expand_topic(mqtt->topic, mqtt->devices, data, mqtt->hostname);
378     }
379 
380     while (data) {
381         if (!strcmp(data->key, "type")
382                 || !strcmp(data->key, "model")
383                 || !strcmp(data->key, "subtype")) {
384             // skip, except "id", "channel"
385         }
386         else {
387             // push topic
388             *end = '/';
389             strcpy(end + 1, data->key);
390             print_value(output, data->type, data->value, data->format);
391             *end = '\0'; // pop topic
392         }
393         data = data->next;
394     }
395     *orig = '\0'; // restore topic
396 }
397 
print_mqtt_string(data_output_t * output,char const * str,char const * format)398 static void print_mqtt_string(data_output_t *output, char const *str, char const *format)
399 {
400     UNUSED(format);
401     data_output_mqtt_t *mqtt = (data_output_mqtt_t *)output;
402     mqtt_client_publish(mqtt->mqc, mqtt->topic, str);
403 }
404 
print_mqtt_double(data_output_t * output,double data,char const * format)405 static void print_mqtt_double(data_output_t *output, double data, char const *format)
406 {
407     char str[20];
408     // use scientific notation for very big/small values
409     if (data > 1e7 || data < 1e-4) {
410         snprintf(str, 20, "%g", data);
411     }
412     else {
413         int ret = snprintf(str, 20, "%.5f", data);
414         // remove trailing zeros, always keep one digit after the decimal point
415         char *p = str + ret - 1;
416         while (*p == '0' && p[-1] != '.') {
417             *p-- = '\0';
418         }
419     }
420 
421     print_mqtt_string(output, str, format);
422 }
423 
print_mqtt_int(data_output_t * output,int data,char const * format)424 static void print_mqtt_int(data_output_t *output, int data, char const *format)
425 {
426     char str[20];
427     snprintf(str, 20, "%d", data);
428     print_mqtt_string(output, str, format);
429 }
430 
data_output_mqtt_free(data_output_t * output)431 static void data_output_mqtt_free(data_output_t *output)
432 {
433     data_output_mqtt_t *mqtt = (data_output_mqtt_t *)output;
434 
435     if (!mqtt)
436         return;
437 
438     free(mqtt->devices);
439     free(mqtt->events);
440     free(mqtt->states);
441     //free(mqtt->homie);
442     //free(mqtt->hass);
443 
444     mqtt_client_free(mqtt->mqc);
445 
446     free(mqtt);
447 }
448 
mqtt_topic_default(char const * topic,char const * base,char const * suffix)449 static char *mqtt_topic_default(char const *topic, char const *base, char const *suffix)
450 {
451     char path[256];
452     char const *p;
453     if (topic) {
454         p = topic;
455     }
456     else if (!base) {
457         p = suffix;
458     }
459     else {
460         snprintf(path, sizeof(path), "%s/%s", base, suffix);
461         p = path;
462     }
463 
464     char *ret = strdup(p);
465     if (!ret)
466         WARN_STRDUP("mqtt_topic_default()");
467     return ret;
468 }
469 
data_output_mqtt_create(struct mg_mgr * mgr,char * param,char const * dev_hint)470 struct data_output *data_output_mqtt_create(struct mg_mgr *mgr, char *param, char const *dev_hint)
471 {
472     data_output_mqtt_t *mqtt = calloc(1, sizeof(data_output_mqtt_t));
473     if (!mqtt)
474         FATAL_CALLOC("data_output_mqtt_create()");
475 
476     gethostname(mqtt->hostname, sizeof(mqtt->hostname) - 1);
477     mqtt->hostname[sizeof(mqtt->hostname) - 1] = '\0';
478     // only use hostname, not domain part
479     char *dot = strchr(mqtt->hostname, '.');
480     if (dot)
481         *dot = '\0';
482     //fprintf(stderr, "Hostname: %s\n", hostname);
483 
484     // generate a short deterministic client_id to identify this input device on restart
485     uint16_t host_crc = crc16((uint8_t *)mqtt->hostname, strlen(mqtt->hostname), 0x1021, 0xffff);
486     uint16_t devq_crc = crc16((uint8_t *)dev_hint, dev_hint ? strlen(dev_hint) : 0, 0x1021, 0xffff);
487     char client_id[17];
488     snprintf(client_id, sizeof(client_id), "rtl_433-%04x%04x", host_crc, devq_crc);
489 
490     // default base topic
491     char base_topic[8 + sizeof(mqtt->hostname)];
492     snprintf(base_topic, sizeof(base_topic), "rtl_433/%s", mqtt->hostname);
493 
494     // default topics
495     char const *path_devices = "devices[/type][/model][/subtype][/channel][/id]";
496     char const *path_events = "events";
497     char const *path_states = "states";
498 
499     char *user = NULL;
500     char *pass = NULL;
501     int retain = 0;
502     int qos = 0;
503 
504     // parse host and port
505     tls_opts_t tls_opts = {0};
506     if (strncmp(param, "mqtts", 5) == 0) {
507         tls_opts.tls_ca_cert = "*"; // TLS is enabled but no cert verification is performed.
508     }
509     param      = arg_param(param); // strip scheme
510     char *host = "localhost";
511     char *port = tls_opts.tls_ca_cert ? "8883" : "1883";
512     char *opts = hostport_param(param, &host, &port);
513     fprintf(stderr, "Publishing MQTT data to %s port %s%s\n", host, port, tls_opts.tls_ca_cert ? " (TLS)" : "");
514 
515     // parse auth and format options
516     char *key, *val;
517     while (getkwargs(&opts, &key, &val)) {
518         key = remove_ws(key);
519         val = trim_ws(val);
520         if (!key || !*key)
521             continue;
522         else if (!strcasecmp(key, "u") || !strcasecmp(key, "user"))
523             user = val;
524         else if (!strcasecmp(key, "p") || !strcasecmp(key, "pass"))
525             pass = val;
526         else if (!strcasecmp(key, "r") || !strcasecmp(key, "retain"))
527             retain = atobv(val, 1);
528         else if (!strcasecmp(key, "q") || !strcasecmp(key, "qos"))
529             qos = atoiv(val, 1);
530         // Simple key-topic mapping
531         else if (!strcasecmp(key, "d") || !strcasecmp(key, "devices"))
532             mqtt->devices = mqtt_topic_default(val, base_topic, path_devices);
533         // deprecated, remove this
534         else if (!strcasecmp(key, "c") || !strcasecmp(key, "usechannel")) {
535             fprintf(stderr, "\"usechannel=...\" has been removed. Use a topic format string:\n");
536             fprintf(stderr, "for \"afterid\"   use e.g. \"devices=rtl_433/[hostname]/devices[/type][/model][/subtype][/id][/channel]\"\n");
537             fprintf(stderr, "for \"beforeid\"  use e.g. \"devices=rtl_433/[hostname]/devices[/type][/model][/subtype][/channel][/id]\"\n");
538             fprintf(stderr, "for \"replaceid\" use e.g. \"devices=rtl_433/[hostname]/devices[/type][/model][/subtype][/channel]\"\n");
539             fprintf(stderr, "for \"no\"        use e.g. \"devices=rtl_433/[hostname]/devices[/type][/model][/subtype][/id]\"\n");
540             exit(1);
541         }
542         // JSON events to single topic
543         else if (!strcasecmp(key, "e") || !strcasecmp(key, "events"))
544             mqtt->events = mqtt_topic_default(val, base_topic, path_events);
545         // JSON states to single topic
546         else if (!strcasecmp(key, "s") || !strcasecmp(key, "states"))
547             mqtt->states = mqtt_topic_default(val, base_topic, path_states);
548         // TODO: Homie Convention https://homieiot.github.io/
549         //else if (!strcasecmp(key, "o") || !strcasecmp(key, "homie"))
550         //    mqtt->homie = mqtt_topic_default(val, NULL, "homie"); // base topic
551         // TODO: Home Assistant MQTT discovery https://www.home-assistant.io/docs/mqtt/discovery/
552         //else if (!strcasecmp(key, "a") || !strcasecmp(key, "hass"))
553         //    mqtt->hass = mqtt_topic_default(val, NULL, "homeassistant"); // discovery prefix
554         else if (!tls_param(&tls_opts, key, val)) {
555             // ok
556         }
557         else {
558             fprintf(stderr, "Invalid key \"%s\" option.\n", key);
559             exit(1);
560         }
561     }
562 
563     // Default is to use all formats
564     if (!mqtt->devices && !mqtt->events && !mqtt->states) {
565         mqtt->devices = mqtt_topic_default(NULL, base_topic, path_devices);
566         mqtt->events  = mqtt_topic_default(NULL, base_topic, path_events);
567         mqtt->states  = mqtt_topic_default(NULL, base_topic, path_states);
568     }
569     if (mqtt->devices)
570         fprintf(stderr, "Publishing device info to MQTT topic \"%s\".\n", mqtt->devices);
571     if (mqtt->events)
572         fprintf(stderr, "Publishing events info to MQTT topic \"%s\".\n", mqtt->events);
573     if (mqtt->states)
574         fprintf(stderr, "Publishing states info to MQTT topic \"%s\".\n", mqtt->states);
575 
576     mqtt->output.print_data   = print_mqtt_data;
577     mqtt->output.print_array  = print_mqtt_array;
578     mqtt->output.print_string = print_mqtt_string;
579     mqtt->output.print_double = print_mqtt_double;
580     mqtt->output.print_int    = print_mqtt_int;
581     mqtt->output.output_free  = data_output_mqtt_free;
582 
583     mqtt->mqc = mqtt_client_init(mgr, &tls_opts, host, port, user, pass, client_id, retain, qos);
584 
585     return &mqtt->output;
586 }
587