1 /** @file
2 MQTT output for rtl_433 events
3
4 Copyright (C) 2019 Christian Zuckschwerdt
5
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
10 */
11
12 // note: our unit header includes unistd.h for gethostname() via data.h
13 #include "output_mqtt.h"
14 #include "optparse.h"
15 #include "util.h"
16 #include "fatal.h"
17 #include "r_util.h"
18
19 #include <stdlib.h>
20 #include <stdio.h>
21 #include <string.h>
22
23 #include "mongoose.h"
24
25 /* MQTT client abstraction */
26
27 typedef struct mqtt_client {
28 struct mg_connect_opts connect_opts;
29 struct mg_send_mqtt_handshake_opts mqtt_opts;
30 struct mg_connection *conn;
31 int prev_status;
32 char address[253 + 6 + 1]; // dns max + port
33 char client_id[256];
34 uint16_t message_id;
35 int publish_flags; // MG_MQTT_RETAIN | MG_MQTT_QOS(0)
36 } mqtt_client_t;
37
mqtt_client_event(struct mg_connection * nc,int ev,void * ev_data)38 static void mqtt_client_event(struct mg_connection *nc, int ev, void *ev_data)
39 {
40 // note that while shutting down the ctx is NULL
41 mqtt_client_t *ctx = (mqtt_client_t *)nc->user_data;
42 // only valid in MG_EV_MQTT_ events
43 struct mg_mqtt_message *msg = (struct mg_mqtt_message *)ev_data;
44
45 //if (ev != MG_EV_POLL)
46 // fprintf(stderr, "MQTT user handler got event %d\n", ev);
47
48 switch (ev) {
49 case MG_EV_CONNECT: {
50 int connect_status = *(int *)ev_data;
51 if (connect_status == 0) {
52 // Success
53 fprintf(stderr, "MQTT Connected...\n");
54 mg_set_protocol_mqtt(nc);
55 if (ctx)
56 mg_send_mqtt_handshake_opt(nc, ctx->client_id, ctx->mqtt_opts);
57 }
58 else {
59 // Error, print only once
60 if (ctx && ctx->prev_status != connect_status)
61 fprintf(stderr, "MQTT connect error: %s\n", strerror(connect_status));
62 }
63 if (ctx)
64 ctx->prev_status = connect_status;
65 break;
66 }
67 case MG_EV_MQTT_CONNACK:
68 if (msg->connack_ret_code != MG_EV_MQTT_CONNACK_ACCEPTED) {
69 fprintf(stderr, "MQTT Connection error: %u\n", msg->connack_ret_code);
70 }
71 else {
72 fprintf(stderr, "MQTT Connection established.\n");
73 }
74 break;
75 case MG_EV_MQTT_PUBACK:
76 fprintf(stderr, "MQTT Message publishing acknowledged (msg_id: %u)\n", msg->message_id);
77 break;
78 case MG_EV_MQTT_SUBACK:
79 fprintf(stderr, "MQTT Subscription acknowledged.\n");
80 break;
81 case MG_EV_MQTT_PUBLISH: {
82 fprintf(stderr, "MQTT Incoming message %.*s: %.*s\n", (int)msg->topic.len,
83 msg->topic.p, (int)msg->payload.len, msg->payload.p);
84 break;
85 }
86 case MG_EV_CLOSE:
87 if (!ctx)
88 break; // shuttig down
89 if (ctx->prev_status == 0)
90 fprintf(stderr, "MQTT Connection failed...\n");
91 // reconnect
92 char const *error_string = NULL;
93 ctx->connect_opts.error_string = &error_string;
94 ctx->conn = mg_connect_opt(nc->mgr, ctx->address, mqtt_client_event, ctx->connect_opts);
95 ctx->connect_opts.error_string = NULL;
96 if (!ctx->conn) {
97 fprintf(stderr, "MQTT connect (%s) failed%s%s\n", ctx->address,
98 error_string ? ": " : "", error_string ? error_string : "");
99 }
100 break;
101 }
102 }
103
mqtt_client_init(struct mg_mgr * mgr,tls_opts_t * tls_opts,char const * host,char const * port,char const * user,char const * pass,char const * client_id,int retain,int qos)104 static mqtt_client_t *mqtt_client_init(struct mg_mgr *mgr, tls_opts_t *tls_opts, char const *host, char const *port, char const *user, char const *pass, char const *client_id, int retain, int qos)
105 {
106 mqtt_client_t *ctx = calloc(1, sizeof(*ctx));
107 if (!ctx)
108 FATAL_CALLOC("mqtt_client_init()");
109
110 ctx->mqtt_opts.user_name = user;
111 ctx->mqtt_opts.password = pass;
112 ctx->publish_flags = MG_MQTT_QOS(qos) | (retain ? MG_MQTT_RETAIN : 0);
113 // TODO: these should be user configurable options
114 //ctx->opts.keepalive = 60;
115 //ctx->timeout = 10000L;
116 //ctx->cleansession = 1;
117 strncpy(ctx->client_id, client_id, sizeof(ctx->client_id));
118 ctx->client_id[sizeof(ctx->client_id) - 1] = '\0';
119
120 // if the host is an IPv6 address it needs quoting
121 if (strchr(host, ':'))
122 snprintf(ctx->address, sizeof(ctx->address), "[%s]:%s", host, port);
123 else
124 snprintf(ctx->address, sizeof(ctx->address), "%s:%s", host, port);
125
126 ctx->connect_opts.user_data = ctx;
127 if (tls_opts && tls_opts->tls_ca_cert) {
128 #if MG_ENABLE_SSL
129 ctx->connect_opts.ssl_cert = tls_opts->tls_cert;
130 ctx->connect_opts.ssl_key = tls_opts->tls_key;
131 ctx->connect_opts.ssl_ca_cert = tls_opts->tls_ca_cert;
132 ctx->connect_opts.ssl_cipher_suites = tls_opts->tls_cipher_suites;
133 ctx->connect_opts.ssl_server_name = tls_opts->tls_server_name;
134 ctx->connect_opts.ssl_psk_identity = tls_opts->tls_psk_identity;
135 ctx->connect_opts.ssl_psk_key = tls_opts->tls_psk_key;
136 #else
137 fprintf(stderr, "mqtts (TLS) not available\n");
138 exit(1);
139 #endif
140 }
141 char const *error_string = NULL;
142 ctx->connect_opts.error_string = &error_string;
143 ctx->conn = mg_connect_opt(mgr, ctx->address, mqtt_client_event, ctx->connect_opts);
144 ctx->connect_opts.error_string = NULL;
145 if (!ctx->conn) {
146 fprintf(stderr, "MQTT connect (%s) failed%s%s\n", ctx->address,
147 error_string ? ": " : "", error_string ? error_string : "");
148 exit(1);
149 }
150
151 return ctx;
152 }
153
mqtt_client_publish(mqtt_client_t * ctx,char const * topic,char const * str)154 static void mqtt_client_publish(mqtt_client_t *ctx, char const *topic, char const *str)
155 {
156 if (!ctx->conn || !ctx->conn->proto_handler)
157 return;
158
159 ctx->message_id++;
160 mg_mqtt_publish(ctx->conn, topic, ctx->message_id, ctx->publish_flags, str, strlen(str));
161 }
162
mqtt_client_free(mqtt_client_t * ctx)163 static void mqtt_client_free(mqtt_client_t *ctx)
164 {
165 if (ctx && ctx->conn) {
166 ctx->conn->user_data = NULL;
167 ctx->conn->flags |= MG_F_CLOSE_IMMEDIATELY;
168 }
169 free(ctx);
170 }
171
172 /* Helper */
173
174 /// clean the topic inplace to [-.A-Za-z0-9], esp. not whitespace, +, #, /, $
mqtt_sanitize_topic(char * topic)175 static char *mqtt_sanitize_topic(char *topic)
176 {
177 for (char *p = topic; *p; ++p)
178 if (*p != '-' && *p != '.' && (*p < 'A' || *p > 'Z') && (*p < 'a' || *p > 'z') && (*p < '0' || *p > '9'))
179 *p = '_';
180
181 return topic;
182 }
183
184 /* MQTT printer */
185
186 typedef struct {
187 struct data_output output;
188 mqtt_client_t *mqc;
189 char topic[256];
190 char hostname[64];
191 char *devices;
192 char *events;
193 char *states;
194 //char *homie;
195 //char *hass;
196 } data_output_mqtt_t;
197
print_mqtt_array(data_output_t * output,data_array_t * array,char const * format)198 static void print_mqtt_array(data_output_t *output, data_array_t *array, char const *format)
199 {
200 data_output_mqtt_t *mqtt = (data_output_mqtt_t *)output;
201
202 char *orig = mqtt->topic + strlen(mqtt->topic); // save current topic
203
204 for (int c = 0; c < array->num_values; ++c) {
205 sprintf(orig, "/%d", c);
206 print_array_value(output, array, format, c);
207 }
208 *orig = '\0'; // restore topic
209 }
210
append_topic(char * topic,data_t * data)211 static char *append_topic(char *topic, data_t *data)
212 {
213 if (data->type == DATA_STRING) {
214 strcpy(topic, data->value.v_ptr);
215 mqtt_sanitize_topic(topic);
216 topic += strlen(data->value.v_ptr);
217 }
218 else if (data->type == DATA_INT) {
219 topic += sprintf(topic, "%d", data->value.v_int);
220 }
221 else {
222 fprintf(stderr, "Can't append data type %d to topic\n", data->type);
223 }
224
225 return topic;
226 }
227
expand_topic(char * topic,char const * format,data_t * data,char const * hostname)228 static char *expand_topic(char *topic, char const *format, data_t *data, char const *hostname)
229 {
230 // collect well-known top level keys
231 data_t *data_type = NULL;
232 data_t *data_model = NULL;
233 data_t *data_subtype = NULL;
234 data_t *data_channel = NULL;
235 data_t *data_id = NULL;
236 data_t *data_protocol = NULL;
237 for (data_t *d = data; d; d = d->next) {
238 if (!strcmp(d->key, "type"))
239 data_type = d;
240 else if (!strcmp(d->key, "model"))
241 data_model = d;
242 else if (!strcmp(d->key, "subtype"))
243 data_subtype = d;
244 else if (!strcmp(d->key, "channel"))
245 data_channel = d;
246 else if (!strcmp(d->key, "id"))
247 data_id = d;
248 else if (!strcmp(d->key, "protocol")) // NOTE: needs "-M protocol"
249 data_protocol = d;
250 }
251
252 // consume entire format string
253 while (format && *format) {
254 data_t *data_token = NULL;
255 char const *string_token = NULL;
256 int leading_slash = 0;
257 char const *t_start = NULL;
258 char const *t_end = NULL;
259 char const *d_start = NULL;
260 char const *d_end = NULL;
261 // copy until '['
262 while (*format && *format != '[')
263 *topic++ = *format++;
264 // skip '['
265 if (!*format)
266 break;
267 ++format;
268 // read slash
269 if (!leading_slash && (*format < 'a' || *format > 'z')) {
270 leading_slash = *format;
271 format++;
272 }
273 // read key until : or ]
274 t_start = t_end = format;
275 while (*format && *format != ':' && *format != ']' && *format != '[')
276 t_end = ++format;
277 // read default until ]
278 if (*format == ':') {
279 d_start = d_end = ++format;
280 while (*format && *format != ']' && *format != '[')
281 d_end = ++format;
282 }
283 // check for proper closing
284 if (*format != ']') {
285 fprintf(stderr, "%s: unterminated token\n", __func__);
286 exit(1);
287 }
288 ++format;
289
290 // resolve token
291 if (!strncmp(t_start, "hostname", t_end - t_start))
292 string_token = hostname;
293 else if (!strncmp(t_start, "type", t_end - t_start))
294 data_token = data_type;
295 else if (!strncmp(t_start, "model", t_end - t_start))
296 data_token = data_model;
297 else if (!strncmp(t_start, "subtype", t_end - t_start))
298 data_token = data_subtype;
299 else if (!strncmp(t_start, "channel", t_end - t_start))
300 data_token = data_channel;
301 else if (!strncmp(t_start, "id", t_end - t_start))
302 data_token = data_id;
303 else if (!strncmp(t_start, "protocol", t_end - t_start))
304 data_token = data_protocol;
305 else {
306 fprintf(stderr, "%s: unknown token \"%.*s\"\n", __func__, (int)(t_end - t_start), t_start);
307 exit(1);
308 }
309
310 // append token or default
311 if (!data_token && !string_token && !d_start)
312 continue;
313 if (leading_slash)
314 *topic++ = leading_slash;
315 if (data_token)
316 topic = append_topic(topic, data_token);
317 else if (string_token)
318 topic += sprintf(topic, "%s", string_token);
319 else
320 topic += sprintf(topic, "%.*s", (int)(d_end - d_start), d_start);
321 }
322
323 *topic = '\0';
324 return topic;
325 }
326
327 // <prefix>[/type][/model][/subtype][/channel][/id]/battery: "OK"|"LOW"
print_mqtt_data(data_output_t * output,data_t * data,char const * format)328 static void print_mqtt_data(data_output_t *output, data_t *data, char const *format)
329 {
330 UNUSED(format);
331 data_output_mqtt_t *mqtt = (data_output_mqtt_t *)output;
332
333 char *orig = mqtt->topic + strlen(mqtt->topic); // save current topic
334 char *end = orig;
335
336 // top-level only
337 if (!*mqtt->topic) {
338 // collect well-known top level keys
339 data_t *data_model = NULL;
340 for (data_t *d = data; d; d = d->next) {
341 if (!strcmp(d->key, "model"))
342 data_model = d;
343 }
344
345 // "states" topic
346 if (!data_model) {
347 if (mqtt->states) {
348 size_t message_size = 20000; // state message need a large buffer
349 char *message = malloc(message_size);
350 if (!message) {
351 WARN_MALLOC("print_mqtt_data()");
352 return; // NOTE: skip output on alloc failure.
353 }
354 data_print_jsons(data, message, message_size);
355 expand_topic(mqtt->topic, mqtt->states, data, mqtt->hostname);
356 mqtt_client_publish(mqtt->mqc, mqtt->topic, message);
357 *mqtt->topic = '\0'; // clear topic
358 free(message);
359 }
360 return;
361 }
362
363 // "events" topic
364 if (mqtt->events) {
365 char message[2048]; // we expect the biggest strings to be around 500 bytes.
366 data_print_jsons(data, message, sizeof(message));
367 expand_topic(mqtt->topic, mqtt->events, data, mqtt->hostname);
368 mqtt_client_publish(mqtt->mqc, mqtt->topic, message);
369 *mqtt->topic = '\0'; // clear topic
370 }
371
372 // "devices" topic
373 if (!mqtt->devices) {
374 return;
375 }
376
377 end = expand_topic(mqtt->topic, mqtt->devices, data, mqtt->hostname);
378 }
379
380 while (data) {
381 if (!strcmp(data->key, "type")
382 || !strcmp(data->key, "model")
383 || !strcmp(data->key, "subtype")) {
384 // skip, except "id", "channel"
385 }
386 else {
387 // push topic
388 *end = '/';
389 strcpy(end + 1, data->key);
390 print_value(output, data->type, data->value, data->format);
391 *end = '\0'; // pop topic
392 }
393 data = data->next;
394 }
395 *orig = '\0'; // restore topic
396 }
397
print_mqtt_string(data_output_t * output,char const * str,char const * format)398 static void print_mqtt_string(data_output_t *output, char const *str, char const *format)
399 {
400 UNUSED(format);
401 data_output_mqtt_t *mqtt = (data_output_mqtt_t *)output;
402 mqtt_client_publish(mqtt->mqc, mqtt->topic, str);
403 }
404
print_mqtt_double(data_output_t * output,double data,char const * format)405 static void print_mqtt_double(data_output_t *output, double data, char const *format)
406 {
407 char str[20];
408 // use scientific notation for very big/small values
409 if (data > 1e7 || data < 1e-4) {
410 snprintf(str, 20, "%g", data);
411 }
412 else {
413 int ret = snprintf(str, 20, "%.5f", data);
414 // remove trailing zeros, always keep one digit after the decimal point
415 char *p = str + ret - 1;
416 while (*p == '0' && p[-1] != '.') {
417 *p-- = '\0';
418 }
419 }
420
421 print_mqtt_string(output, str, format);
422 }
423
print_mqtt_int(data_output_t * output,int data,char const * format)424 static void print_mqtt_int(data_output_t *output, int data, char const *format)
425 {
426 char str[20];
427 snprintf(str, 20, "%d", data);
428 print_mqtt_string(output, str, format);
429 }
430
data_output_mqtt_free(data_output_t * output)431 static void data_output_mqtt_free(data_output_t *output)
432 {
433 data_output_mqtt_t *mqtt = (data_output_mqtt_t *)output;
434
435 if (!mqtt)
436 return;
437
438 free(mqtt->devices);
439 free(mqtt->events);
440 free(mqtt->states);
441 //free(mqtt->homie);
442 //free(mqtt->hass);
443
444 mqtt_client_free(mqtt->mqc);
445
446 free(mqtt);
447 }
448
mqtt_topic_default(char const * topic,char const * base,char const * suffix)449 static char *mqtt_topic_default(char const *topic, char const *base, char const *suffix)
450 {
451 char path[256];
452 char const *p;
453 if (topic) {
454 p = topic;
455 }
456 else if (!base) {
457 p = suffix;
458 }
459 else {
460 snprintf(path, sizeof(path), "%s/%s", base, suffix);
461 p = path;
462 }
463
464 char *ret = strdup(p);
465 if (!ret)
466 WARN_STRDUP("mqtt_topic_default()");
467 return ret;
468 }
469
data_output_mqtt_create(struct mg_mgr * mgr,char * param,char const * dev_hint)470 struct data_output *data_output_mqtt_create(struct mg_mgr *mgr, char *param, char const *dev_hint)
471 {
472 data_output_mqtt_t *mqtt = calloc(1, sizeof(data_output_mqtt_t));
473 if (!mqtt)
474 FATAL_CALLOC("data_output_mqtt_create()");
475
476 gethostname(mqtt->hostname, sizeof(mqtt->hostname) - 1);
477 mqtt->hostname[sizeof(mqtt->hostname) - 1] = '\0';
478 // only use hostname, not domain part
479 char *dot = strchr(mqtt->hostname, '.');
480 if (dot)
481 *dot = '\0';
482 //fprintf(stderr, "Hostname: %s\n", hostname);
483
484 // generate a short deterministic client_id to identify this input device on restart
485 uint16_t host_crc = crc16((uint8_t *)mqtt->hostname, strlen(mqtt->hostname), 0x1021, 0xffff);
486 uint16_t devq_crc = crc16((uint8_t *)dev_hint, dev_hint ? strlen(dev_hint) : 0, 0x1021, 0xffff);
487 char client_id[17];
488 snprintf(client_id, sizeof(client_id), "rtl_433-%04x%04x", host_crc, devq_crc);
489
490 // default base topic
491 char base_topic[8 + sizeof(mqtt->hostname)];
492 snprintf(base_topic, sizeof(base_topic), "rtl_433/%s", mqtt->hostname);
493
494 // default topics
495 char const *path_devices = "devices[/type][/model][/subtype][/channel][/id]";
496 char const *path_events = "events";
497 char const *path_states = "states";
498
499 char *user = NULL;
500 char *pass = NULL;
501 int retain = 0;
502 int qos = 0;
503
504 // parse host and port
505 tls_opts_t tls_opts = {0};
506 if (strncmp(param, "mqtts", 5) == 0) {
507 tls_opts.tls_ca_cert = "*"; // TLS is enabled but no cert verification is performed.
508 }
509 param = arg_param(param); // strip scheme
510 char *host = "localhost";
511 char *port = tls_opts.tls_ca_cert ? "8883" : "1883";
512 char *opts = hostport_param(param, &host, &port);
513 fprintf(stderr, "Publishing MQTT data to %s port %s%s\n", host, port, tls_opts.tls_ca_cert ? " (TLS)" : "");
514
515 // parse auth and format options
516 char *key, *val;
517 while (getkwargs(&opts, &key, &val)) {
518 key = remove_ws(key);
519 val = trim_ws(val);
520 if (!key || !*key)
521 continue;
522 else if (!strcasecmp(key, "u") || !strcasecmp(key, "user"))
523 user = val;
524 else if (!strcasecmp(key, "p") || !strcasecmp(key, "pass"))
525 pass = val;
526 else if (!strcasecmp(key, "r") || !strcasecmp(key, "retain"))
527 retain = atobv(val, 1);
528 else if (!strcasecmp(key, "q") || !strcasecmp(key, "qos"))
529 qos = atoiv(val, 1);
530 // Simple key-topic mapping
531 else if (!strcasecmp(key, "d") || !strcasecmp(key, "devices"))
532 mqtt->devices = mqtt_topic_default(val, base_topic, path_devices);
533 // deprecated, remove this
534 else if (!strcasecmp(key, "c") || !strcasecmp(key, "usechannel")) {
535 fprintf(stderr, "\"usechannel=...\" has been removed. Use a topic format string:\n");
536 fprintf(stderr, "for \"afterid\" use e.g. \"devices=rtl_433/[hostname]/devices[/type][/model][/subtype][/id][/channel]\"\n");
537 fprintf(stderr, "for \"beforeid\" use e.g. \"devices=rtl_433/[hostname]/devices[/type][/model][/subtype][/channel][/id]\"\n");
538 fprintf(stderr, "for \"replaceid\" use e.g. \"devices=rtl_433/[hostname]/devices[/type][/model][/subtype][/channel]\"\n");
539 fprintf(stderr, "for \"no\" use e.g. \"devices=rtl_433/[hostname]/devices[/type][/model][/subtype][/id]\"\n");
540 exit(1);
541 }
542 // JSON events to single topic
543 else if (!strcasecmp(key, "e") || !strcasecmp(key, "events"))
544 mqtt->events = mqtt_topic_default(val, base_topic, path_events);
545 // JSON states to single topic
546 else if (!strcasecmp(key, "s") || !strcasecmp(key, "states"))
547 mqtt->states = mqtt_topic_default(val, base_topic, path_states);
548 // TODO: Homie Convention https://homieiot.github.io/
549 //else if (!strcasecmp(key, "o") || !strcasecmp(key, "homie"))
550 // mqtt->homie = mqtt_topic_default(val, NULL, "homie"); // base topic
551 // TODO: Home Assistant MQTT discovery https://www.home-assistant.io/docs/mqtt/discovery/
552 //else if (!strcasecmp(key, "a") || !strcasecmp(key, "hass"))
553 // mqtt->hass = mqtt_topic_default(val, NULL, "homeassistant"); // discovery prefix
554 else if (!tls_param(&tls_opts, key, val)) {
555 // ok
556 }
557 else {
558 fprintf(stderr, "Invalid key \"%s\" option.\n", key);
559 exit(1);
560 }
561 }
562
563 // Default is to use all formats
564 if (!mqtt->devices && !mqtt->events && !mqtt->states) {
565 mqtt->devices = mqtt_topic_default(NULL, base_topic, path_devices);
566 mqtt->events = mqtt_topic_default(NULL, base_topic, path_events);
567 mqtt->states = mqtt_topic_default(NULL, base_topic, path_states);
568 }
569 if (mqtt->devices)
570 fprintf(stderr, "Publishing device info to MQTT topic \"%s\".\n", mqtt->devices);
571 if (mqtt->events)
572 fprintf(stderr, "Publishing events info to MQTT topic \"%s\".\n", mqtt->events);
573 if (mqtt->states)
574 fprintf(stderr, "Publishing states info to MQTT topic \"%s\".\n", mqtt->states);
575
576 mqtt->output.print_data = print_mqtt_data;
577 mqtt->output.print_array = print_mqtt_array;
578 mqtt->output.print_string = print_mqtt_string;
579 mqtt->output.print_double = print_mqtt_double;
580 mqtt->output.print_int = print_mqtt_int;
581 mqtt->output.output_free = data_output_mqtt_free;
582
583 mqtt->mqc = mqtt_client_init(mgr, &tls_opts, host, port, user, pass, client_id, retain, qos);
584
585 return &mqtt->output;
586 }
587