1 #include "stdafx.h"
2 #include "TTNMQTT.h"
3 #include "../main/Logger.h"
4 #include "../main/Helper.h"
5 #include <iostream>
6 #include "../main/localtime_r.h"
7 #include "../main/mainworker.h"
8 #include "../main/SQLHelper.h"
9 #include "../main/json_helper.h"
10 #include "../webserver/Base64.h"
11 #include "cayenne_lpp/CayenneLPP_Dec.h"
12 #include <sstream>
13
14 #define RETRY_DELAY 30
15
16 #ifdef _DEBUG
17 //#define DEBUG_TTN_R
18 //#define DEBUG_TTN_W
19 #endif
20
21 #ifdef DEBUG_TTN_W
// Debug helper: dumps the bytes of `str` into the file `filename`.
// The file is opened with mode "wb+"; if it cannot be opened nothing is written
// and no error is reported.
void SaveString2Disk(std::string str, std::string filename)
{
	FILE *pFile = fopen(filename.c_str(), "wb+");
	if (pFile == nullptr)
		return;

	fwrite(str.c_str(), 1, str.size(), pFile);
	fclose(pFile);
}
31 #endif
32 #ifdef DEBUG_TTN_R
// Debug helper: reads the text file `filename` and returns its lines concatenated
// into a single string (the line terminators consumed by getline are dropped).
// Returns an empty string when the file cannot be opened.
std::string ReadFile(std::string filename)
{
	std::ifstream file(filename.c_str());
	if (!file.is_open())
		return "";

	std::string sResult;
	std::string sLine;
	// Loop on the result of getline instead of polling eof(): a stream that fails
	// without reaching end-of-file would otherwise never leave the loop.
	while (std::getline(file, sLine))
	{
		sResult += sLine;
	}
	return sResult;
}
49 #endif
50
51
CTTNMQTT(const int ID,const std::string & IPAddress,const unsigned short usIPPort,const std::string & Username,const std::string & Password,const std::string & CAfilename)52 CTTNMQTT::CTTNMQTT(const int ID, const std::string &IPAddress, const unsigned short usIPPort, const std::string &Username, const std::string &Password, const std::string &CAfilename) :
53 mosqdz::mosquittodz((std::string("Domoticz-TTN") + std::string(GenerateUUID())).c_str()),
54 m_szIPAddress(IPAddress),
55 m_UserName(Username),
56 m_Password(Password),
57 m_CAFilename(CAfilename)
58 {
59 m_HwdID = ID;
60 m_IsConnected = false;
61 m_bDoReconnect = false;
62
63 m_usIPPort = usIPPort;
64 m_TopicIn = Username + "/devices/+/up";
65
66 #ifdef DEBUG_TTN_R
67 std::string sResult = ReadFile("ttn_mqtt.json");
68 mosquitto_message mqtt_msg;
69 mqtt_msg.topic = "domo_rob/devices/lopy4/up";
70 mqtt_msg.payload = (void*)sResult.c_str();
71 mqtt_msg.payloadlen = sResult.size();
72 on_message(&mqtt_msg);
73 #endif
74
75 mosqdz::lib_init();
76 }
77
~CTTNMQTT(void)78 CTTNMQTT::~CTTNMQTT(void)
79 {
80 mosqdz::lib_cleanup();
81 }
82
StartHardware()83 bool CTTNMQTT::StartHardware()
84 {
85 RequestStart();
86
87 StartHeartbeatThread();
88
89 //force connect the next first time
90 m_IsConnected = false;
91
92 m_bIsStarted = true;
93
94 //Start worker thread
95 m_thread = std::make_shared<std::thread>(&CTTNMQTT::Do_Work, this);
96 SetThreadNameInt(m_thread->native_handle());
97 return (m_thread != nullptr);
98 }
99
StopMQTT()100 void CTTNMQTT::StopMQTT()
101 {
102 disconnect();
103 m_bIsStarted = false;
104 }
105
StopHardware()106 bool CTTNMQTT::StopHardware()
107 {
108 StopHeartbeatThread();
109 if (m_thread)
110 {
111 RequestStop();
112 m_thread->join();
113 m_thread.reset();
114 }
115 m_IsConnected = false;
116 return true;
117 }
118
on_subscribe(int mid,int qos_count,const int * granted_qos)119 void CTTNMQTT::on_subscribe(int mid, int qos_count, const int *granted_qos)
120 {
121 _log.Log(LOG_STATUS, "TTN_MQTT: Subscribed");
122 m_IsConnected = true;
123 }
124
on_connect(int rc)125 void CTTNMQTT::on_connect(int rc)
126 {
127 /* rc=
128 ** 0 - success
129 ** 1 - connection refused(unacceptable protocol version)
130 ** 2 - connection refused(identifier rejected)
131 ** 3 - connection refused(broker unavailable)
132 */
133
134 if (rc == 0) {
135 if (m_IsConnected) {
136 _log.Log(LOG_STATUS, "TTN_MQTT: re-connected to: %s:%d", m_szIPAddress.c_str(), m_usIPPort);
137 }
138 else {
139 _log.Log(LOG_STATUS, "TTN_MQTT: connected to: %s:%d", m_szIPAddress.c_str(), m_usIPPort);
140 m_IsConnected = true;
141 sOnConnected(this);
142 }
143 subscribe(NULL, m_TopicIn.c_str());
144 }
145 else {
146 _log.Log(LOG_ERROR, "TTN_MQTT: Connection failed!, restarting (rc=%d)", rc);
147 m_bDoReconnect = true;
148 }
149 }
150
on_disconnect(int rc)151 void CTTNMQTT::on_disconnect(int rc)
152 {
153 if (rc != 0)
154 {
155 if (!IsStopRequested(0))
156 {
157 if (rc == 5)
158 {
159 _log.Log(LOG_ERROR, "TTN_MQTT: disconnected, Invalid Username/Password (rc=%d)", rc);
160 }
161 else
162 {
163 _log.Log(LOG_ERROR, "TTN_MQTT: disconnected, restarting (rc=%d)", rc);
164 }
165 m_bDoReconnect = true;
166 }
167 }
168 }
169
170
ConnectInt()171 bool CTTNMQTT::ConnectInt()
172 {
173 StopMQTT();
174 return ConnectIntEx();
175 }
176
ConnectIntEx()177 bool CTTNMQTT::ConnectIntEx()
178 {
179 m_bDoReconnect = false;
180 _log.Log(LOG_STATUS, "TTN_MQTT: Connecting to %s:%d", m_szIPAddress.c_str(), m_usIPPort);
181
182 int rc;
183 int keepalive = 60;
184
185 if (!m_CAFilename.empty()) {
186 rc = tls_set(m_CAFilename.c_str());
187
188 if (rc != MOSQ_ERR_SUCCESS)
189 {
190 _log.Log(LOG_ERROR, "TTN_MQTT: Failed enabling TLS mode, return code: %d (CA certificate: '%s')", rc, m_CAFilename.c_str());
191 return false;
192 }
193 else {
194 _log.Log(LOG_STATUS, "TTN_MQTT: enabled TLS mode");
195 }
196 }
197 rc = username_pw_set((!m_UserName.empty()) ? m_UserName.c_str() : NULL, (!m_Password.empty()) ? m_Password.c_str() : NULL);
198
199 rc = connect(m_szIPAddress.c_str(), m_usIPPort, keepalive);
200 if (rc != MOSQ_ERR_SUCCESS)
201 {
202 _log.Log(LOG_ERROR, "TTN_MQTT: Failed to start, return code: %d (Check IP/Port)", rc);
203 m_bDoReconnect = true;
204 return false;
205 }
206 return true;
207 }
208
Do_Work()209 void CTTNMQTT::Do_Work()
210 {
211 bool bFirstTime = true;
212 int msec_counter = 0;
213 int sec_counter = 0;
214
215 while (!IsStopRequested(100))
216 {
217 if (!bFirstTime)
218 {
219 int rc = loop();
220 if (rc) {
221 if (rc != MOSQ_ERR_NO_CONN)
222 {
223 if (!IsStopRequested(0))
224 {
225 if (!m_bDoReconnect)
226 {
227 reconnect();
228 }
229 }
230 }
231 }
232 }
233
234 msec_counter++;
235 if (msec_counter == 10)
236 {
237 msec_counter = 0;
238
239 sec_counter++;
240
241 if (sec_counter % 12 == 0) {
242 m_LastHeartbeat = mytime(NULL);
243 }
244
245 if (bFirstTime)
246 {
247 bFirstTime = false;
248 ConnectInt();
249 }
250 else
251 {
252 if (sec_counter % 30 == 0)
253 {
254 if (m_bDoReconnect)
255 ConnectIntEx();
256 }
257 if (isConnected() && sec_counter % 10 == 0)
258 {
259 SendHeartbeat();
260 }
261 }
262 }
263 }
264 clear_callbacks();
265
266 if (isConnected())
267 disconnect();
268
269 _log.Log(LOG_STATUS, "TTN_MQTT: Worker stopped...");
270 }
271
SendHeartbeat()272 void CTTNMQTT::SendHeartbeat()
273 {
274 // not necessary for normal MQTT servers
275 }
276
SendMessage(const std::string & Topic,const std::string & Message)277 void CTTNMQTT::SendMessage(const std::string &Topic, const std::string &Message)
278 {
279 try {
280 if (!m_IsConnected)
281 {
282 _log.Log(LOG_STATUS, "TTN_MQTT: Not Connected, failed to send message: %s", Message.c_str());
283 return;
284 }
285 publish(NULL, Topic.c_str(), Message.size(), Message.c_str());
286 }
287 catch (...)
288 {
289 _log.Log(LOG_ERROR, "TTN_MQTT: Failed to send message: %s", Message.c_str());
290 }
291 }
292
WriteInt(const std::string & sendStr)293 void CTTNMQTT::WriteInt(const std::string &sendStr)
294 {
295 if (sendStr.size() < 2)
296 return;
297 //string the return and the end
298 //std::string sMessage = std::string(sendStr.begin(), sendStr.begin() + sendStr.size());
299 //SendMessage(m_TopicOut, sMessage);
300 }
301
// Returns a copy of the first entry of `root` whose "channel" (truncated to 8 bits)
// equals `sChannel` and whose "used" member is still null, i.e. has not been
// flagged yet. Returns a null Json::Value when there is no such entry.
Json::Value CTTNMQTT::GetSensorWithChannel(const Json::Value &root, const int sChannel)
{
	for (const auto &sensor : root)
	{
		const uint8_t sensorChannel = (uint8_t)sensor["channel"].asInt();
		if (sensorChannel != sChannel)
			continue;

		if (sensor["used"].isNull())
			return sensor;
	}
	return Json::Value();
}
314
FlagSensorWithChannelUsed(Json::Value & root,const std::string & stype,const int sChannel)315 void CTTNMQTT::FlagSensorWithChannelUsed(Json::Value &root, const std::string &stype, const int sChannel)
316 {
317 for (auto itt = root.begin(); itt != root.end(); ++itt)
318 {
319 uint8_t channel = (uint8_t)(*itt)["channel"].asInt();
320 std::string type = (*itt)["type"].asString();
321
322 if ((type == stype) && (channel == sChannel))
323 {
324 (*itt)["used"] = true;
325 return;
326 }
327 }
328 }
329
UpdateUserVariable(const std::string & varName,const std::string & varValue)330 void CTTNMQTT::UpdateUserVariable(const std::string &varName, const std::string &varValue)
331 {
332 std::string szLastUpdate = TimeToString(NULL, TF_DateTime);
333
334 int ID;
335
336 std::vector<std::vector<std::string> > result;
337 result = m_sql.safe_query("SELECT ID FROM UserVariables WHERE (Name=='%q')", varName.c_str());
338 if (result.empty())
339 {
340 m_sql.safe_query("INSERT INTO UserVariables (Name, ValueType, Value) VALUES ('%q',%d,'%q')", varName.c_str(), USERVARTYPE_STRING, varValue.c_str());
341 result = m_sql.safe_query("SELECT ID FROM UserVariables WHERE (Name=='%q')", varName.c_str());
342 if (result.empty())
343 return;
344 ID = atoi(result[0][0].c_str());
345 }
346 else
347 {
348 ID = atoi(result[0][0].c_str());
349 m_sql.safe_query("UPDATE UserVariables SET Value='%q', LastUpdate='%q' WHERE (ID==%d)", varValue.c_str(), szLastUpdate.c_str(), ID);
350 }
351
352 m_mainworker.m_eventsystem.SetEventTrigger(ID, m_mainworker.m_eventsystem.REASON_USERVARIABLE, 0);
353 m_mainworker.m_eventsystem.UpdateUserVariable(ID, varValue, szLastUpdate);
354 }
355
GetAddDeviceAndSensor(const int m_HwdID,const std::string & DeviceName,const std::string & MacAddress)356 int CTTNMQTT::GetAddDeviceAndSensor(const int m_HwdID, const std::string &DeviceName, const std::string &MacAddress)
357 {
358 int DeviceID = 0;
359
360 //_log.Log(LOG_NORM, "TTN_MQTT: Looking for Device and Sensor (%i): .%s. , .%s.", m_HwdID, DeviceName.c_str(), MacAddress.c_str());
361
362 //Get our internal device_id, if not found, add it
363 std::vector<std::vector<std::string> > result;
364 result = m_sql.safe_query("SELECT ID FROM WOLNodes WHERE (HardwareID==%d) AND (MacAddress=='%q')", m_HwdID, MacAddress.c_str());
365 if (result.empty())
366 {
367 //New one, let's add it to the system
368 m_sql.safe_query("INSERT INTO WOLNodes (HardwareID, Name, MacAddress) VALUES (%d, '%q', '%q')", m_HwdID, DeviceName.c_str(), MacAddress.c_str());
369 result = m_sql.safe_query("SELECT ID FROM WOLNodes WHERE (HardwareID==%d) AND (MacAddress=='%q')", m_HwdID, MacAddress.c_str());
370 if (result.empty())
371 {
372 _log.Log(LOG_ERROR, "TTN_MQTT: Problem adding new Device with MacAddress %s !!", MacAddress.c_str());
373 }
374 }
375
376 if (!result.empty())
377 {
378 DeviceID = atoi(result[0][0].c_str());
379 //Add last received in database ?
380 }
381
382 return DeviceID;
383 }
384
on_message(const struct mosquitto_message * message)385 void CTTNMQTT::on_message(const struct mosquitto_message *message)
386 {
387 std::string topic = message->topic;
388
389 if (topic.find("/up/") != std::string::npos)
390 return; //not interested in sub-topics
391
392 std::string qMessage = std::string((char*)message->payload, (char*)message->payload + message->payloadlen);
393
394 #ifdef DEBUG_TTN_W
395 SaveString2Disk(qMessage, "ttn_mqtt.json");
396 #endif
397
398 //_log.Log(LOG_NORM, "TTN_MQTT: Topic: %s", topic.c_str());
399
400 if (qMessage.empty())
401 return;
402
403 try {
404 int rssi = 12; //Set a default, means unknown
405 int BatteryLevel = 255; // BatteryLevel is not measured, but needs to be provided
406
407 //Get device name from MQTT topic
408 std::vector<std::string> strarray;
409 StringSplit(topic, "/", strarray);
410 std::string MQTTDeviceName = strarray[2];
411
412 //Check if we received a JSON object with payload_raw
413 Json::Value root;
414 bool ret = ParseJSon(qMessage, root);
415 if ((!ret) || (!root.isObject()))
416 {
417 _log.Log(LOG_ERROR, "TTN_MQTT: Invalid data received from %s ! Unable to parse JSON!", MQTTDeviceName.c_str());
418 return;
419 }
420 if (root["payload_raw"].empty())
421 {
422 _log.Log(LOG_ERROR, "TTN_MQTT: Invalid data received from %s ! No payload_raw found in JSON data!", MQTTDeviceName.c_str());
423 return;
424 }
425
426 //Get data from message
427 std::string DeviceName = root["dev_id"].asString();
428 std::string AppId = root["app_id"].asString();
429 std::string DeviceSerial = root["hardware_serial"].asString();
430 int MessagePort = root["port"].asInt();
431 std::string lpp = base64_decode(root["payload_raw"].asString());
432
433 //Check if the payload_raw contains valid CayenneLPP structured data
434 //TO-DO: The current CayenneLPP Decoder assumes Dynamic Sensor Payload structure and not other possible Sensor payload structures like Packed
435 // The LoRa FramePort should be checked to find out which type of patload structure is used. Port 1 is Dynamic, Port 2 is Packed, etc.
436 // But as for LoRa mostly port 2 is used as the default and Dynamic the most implemented CatenneLPP payload structure, we stick with these assumptions
437 Json::Value payload;
438
439 if (!CayenneLPPDec::ParseLPP((const uint8_t*)lpp.c_str(), lpp.size(), payload))
440 {
441 _log.Log(LOG_ERROR, "TTN_MQTT: Invalid data received! The payload_raw does not contain a valid CayenneLPP payload!");
442 return;
443 }
444
445 if ((payload.empty()) || (!payload.isArray()))
446 {
447 _log.Log(LOG_ERROR, "TTN_MQTT: Invalid data received! The CayenneLPP payload doesn't contain (valid) data!");
448 return;
449 }
450
451 //See if we can find the rssi
452 if (!root["metadata"].empty())
453 {
454 Json::Value MetaData = root["metadata"];
455 if (!MetaData["gateways"].empty())
456 {
457 Json::Value Gateways = MetaData["gateways"];
458 // TO_DO: there could be multiple gateways and we should loop them to find the lowest rssi
459 if (!Gateways[0].empty())
460 {
461 Json::Value Gateway = Gateways[0];
462 int rssi = Gateway["rssi"].asInt();
463 //_log.Log(LOG_NORM, "TTN_MQTT: Found Gateway 0! RSSI %i", rssi);
464 }
465 }
466 }
467
468 int DeviceID = GetAddDeviceAndSensor(m_HwdID, DeviceName.c_str(), DeviceSerial.c_str());
469 if (DeviceID == 0) // Unable to find the Device and/or Add the new Device
470 {
471 return;
472 }
473
474 // Walk over the payload to find all used channels. Each channel represents a single sensor.
475 int chanSensors [65] = {}; // CayenneLPP Data Channel ranges from 0 to 64
476 for (auto itt = payload.begin(); itt != payload.end(); ++itt)
477 {
478 uint8_t channel = (uint8_t)(*itt)["channel"].asInt();
479 std::string type = (*itt)["type"].asString();
480
481 chanSensors[channel]++;
482 //_log.Log(LOG_STATUS, "TTN_MQTT: Increased count for channel %i (%s)!", channel, type.c_str());
483 }
484
485 // Now walk over each channel/sensor to find the different measurement types and values
486 int channel = 0;
487 do {
488 if(chanSensors[channel] > 0)
489 {
490 //_log.Log(LOG_STATUS, "TTN_MQTT: Processing %i Sensorvalues for channel %i!", chanSensors[channel],channel);
491 bool bTemp = false, bHumidity = false, bBaro = false, bGps = false, bDin = false, bDout = false, bAin = false, bAout = false, bPresense = false, bLuminosity = false;
492 float temp = 0, hum = 0, baro = 0, lat = 0, lon = 0, alt = 0, ain = 0, aout = 0, presence = 0, luminocity = 0;
493 int din = 0, dout = 0;
494 uint8_t nforecast = wsbaroforecast_some_clouds;
495
496 int current = 1;
497 do {
498 // Look for Sensorreading for the sensor of this channel that hasn't been processed yet
499 Json::Value vSensor = GetSensorWithChannel(payload, channel);
500 if ( vSensor.isObject() )
501 {
502 std::string type = vSensor["type"].asString();
503 //_log.Log(LOG_STATUS, "TTN_MQTT: Processing Sensor of type %s for channel %i!", type.c_str(),channel);
504
505 if (type == "temp") {
506 temp = vSensor["value"].asFloat();
507 bTemp = true;
508 }
509 else if (type == "humidity") {
510 hum = vSensor["value"].asFloat();
511 bHumidity = true;
512 }
513 else if (type == "baro") {
514 baro = vSensor["value"].asFloat();
515 bBaro = true;
516 }
517 else if (type == "gps") {
518 std::stringstream sstr;
519 sstr << vSensor["lat"].asFloat() << "," << vSensor["lon"].asFloat() << "," << vSensor["alt"].asFloat();
520
521 SendPercentageSensor(DeviceID, channel, BatteryLevel, vSensor["alt"].asFloat(), DeviceName + " Altitude");
522 UpdateUserVariable(DeviceName, sstr.str());
523 bGps = true;
524 }
525 else if (type == "digital_input") {
526 SendGeneralSwitch(DeviceID, channel, BatteryLevel, vSensor["value"].asInt(), 0, DeviceName, rssi);
527 bDin = true;
528 }
529 else if (type == "digital_output") {
530 SendGeneralSwitch(DeviceID, channel, BatteryLevel, vSensor["value"].asInt(), 0, DeviceName, rssi);
531 bDout = true;
532 }
533 else if (type == "analog_input") {
534 SendCustomSensor(DeviceID, channel, BatteryLevel, vSensor["value"].asFloat(), DeviceName, type);
535 bAin = true;
536 }
537 else if (type == "analog_output") {
538 SendCustomSensor(DeviceID, channel, BatteryLevel, vSensor["value"].asFloat(), DeviceName, type);
539 bAout = true;
540 }
541 else if (type == "presense") {
542 SendCustomSensor(DeviceID, channel, BatteryLevel, vSensor["value"].asFloat(), DeviceName, type);
543 bPresense = true;
544 }
545 else if (type == "luminosity") {
546 SendLuxSensor(DeviceID, channel, BatteryLevel, vSensor["value"].asFloat(), DeviceName);
547 bLuminosity = true;
548 }
549 else if (type == "accel") {
550 _log.Log(LOG_STATUS, "TTN_MQTT: Sensortype %s not implemented!", type.c_str());
551 }
552 else if (type == "gyro") {
553 _log.Log(LOG_STATUS, "TTN_MQTT: Sensortype %s not implemented!", type.c_str());
554 }
555 else if (type == "unixtime") {
556 _log.Log(LOG_STATUS, "TTN_MQTT: Sensortype %s not implemented!", type.c_str());
557 }
558 else {
559 _log.Log(LOG_STATUS, "TTN_MQTT: Unhandled Sensortype %s!", type.c_str());
560 }
561
562 FlagSensorWithChannelUsed(payload, type, channel);
563 }
564 else
565 {
566 _log.Log(LOG_ERROR, "TTN_MQTT: Could not process all Sensorvalues for channel %i! Now at %i, but expected %i values!", channel, current, chanSensors[channel]);
567 }
568 current++;
569 }
570 while (current <= chanSensors[channel]);
571
572 // Let see if we can predict a forecast based on the barometer value
573 if(bBaro)
574 {
575 if (bTemp)
576 {
577 nforecast = m_forecast_calculators[DeviceName].CalculateBaroForecast(temp, baro);
578 }
579 else
580 {
581 nforecast = m_forecast_calculators[DeviceName].CalculateBaroForecast(baro);
582 }
583 }
584
585 // Now store the sensor values if not stored already
586 if (bTemp && bHumidity && bBaro)
587 {
588 SendTempHumBaroSensorFloat(DeviceID, BatteryLevel, temp, (int)rint(hum), baro, (uint8_t)nforecast, DeviceName, rssi);
589 }
590 else if(bTemp && bHumidity)
591 {
592 SendTempHumSensor(DeviceID, BatteryLevel, temp, (int)rint(hum), DeviceName, rssi);
593 }
594 else if(bTemp && bBaro)
595 {
596 SendTempBaroSensor(DeviceID, BatteryLevel, temp, baro, DeviceName);
597 }
598 else
599 {
600 if (bTemp)
601 {
602 SendTempSensor(DeviceID, BatteryLevel, temp, DeviceName, rssi);
603 }
604 if (bHumidity)
605 {
606 SendHumiditySensor(DeviceID, BatteryLevel, (int)rint(hum), DeviceName, rssi);
607 }
608 if (bBaro)
609 {
610 SendBaroSensor(DeviceID, channel, BatteryLevel, baro, (uint8_t)nforecast, DeviceName);
611 }
612 }
613 }
614 channel++;
615 }
616 while (channel < 65);
617 }
618 catch (...)
619 {
620 _log.Log(LOG_ERROR, "TTN_MQTT: Error parsing message!!!");
621 }
622 }
623