1 /***************************************************************************
2 *
3 * Project: OpenCPN
4 * Purpose: PlugIn Manager Object
5 * Author: David Register
6 *
7 ***************************************************************************
8 * Copyright (C) 2010 by David S. Register *
9 * *
10 * This program is free software; you can redistribute it and/or modify *
11 * it under the terms of the GNU General Public License as published by *
12 * the Free Software Foundation; either version 2 of the License, or *
13 * (at your option) any later version. *
14 * *
15 * This program is distributed in the hope that it will be useful, *
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
18 * GNU General Public License for more details. *
19 * *
20 * You should have received a copy of the GNU General Public License *
21 * along with this program; if not, write to the *
22 * Free Software Foundation, Inc., *
23 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. *
24 **************************************************************************/
25 //Originally by balp on 2018-07-28.
26
27 #ifdef __MINGW32__
28 #undef IPV6STRICT // mingw FTBS fix: missing struct ip_mreq
29 #include <windows.h>
30 #endif
31
32 #include "zeroconf.hpp"
33
34 #include "wx/wxprec.h"
35
36 #ifndef WX_PRECOMP
37 #include "wx/wx.h"
38 #endif //precompiled headers
39
40 #include "wx/tokenzr.h"
41 #include <wx/datetime.h>
42
43 #include <stdlib.h>
44 #include <math.h>
45 #include <time.h>
46
47 #ifndef __WXMSW__
48 #include <arpa/inet.h>
49 #include <netinet/tcp.h>
50 #endif
51
52 #include <vector>
53 #include <sstream>
54 #include <wx/socket.h>
55 #include <wx/log.h>
56 #include <wx/memory.h>
57 #include <wx/chartype.h>
58 #include <wx/wx.h>
59 #include <wx/sckaddr.h>
60 #include "easywsclient.hpp"
61 #include "chart1.h"
62 #include "wxServDisc.h"
63
64 #if defined(__WXMSW__) && !defined(__MINGW32__)
65 #include <Ws2tcpip.h> // for ip_mreq
66 #endif
67
68 #include "dychart.h"
69
70 #include "datastream.h"
71 #include "SignalKDataStream.h"
72 #include "NetworkDataStream.h"
73 #include "OCPN_SignalKEvent.h"
74 #include "OCPN_DataStreamEvent.h"
75
76
// Fallback for toolchains whose math header does not define NAN: the macro
// reinterprets the 64-bit pattern stored in lNaN as a double.
#if !defined(NAN)
static const long long lNaN = 0xfff8000000000000;
#define NAN (*(double*)&lNaN)
#endif

// Defined in another translation unit.
extern bool g_benableUDPNullHeader;

// Event handler that receives messages from the WebSocket worker thread.
// Assigned in WebSocketThread::Entry(), cleared in ~SignalKDataStream(), and
// read by the static WebSocketThread::HandleMessage() callback.
static wxEvtHandler *s_wsConsumer;
// Application main frame; handed to wxServDisc in DiscoverSKServer().
extern MyFrame *gFrame;
86
87 #include <wx/sckstrm.h>
88 #include "wx/jsonreader.h"
89 #include "wx/jsonwriter.h"
90
// Handles messages from the WebSocket thread, forwards them to the stipulated
// upstream consumer, and manages (feeds) the parent stream's watchdog.
class OCPN_WebSocketMessageHandler : public wxEvtHandler
{
public:
    OCPN_WebSocketMessageHandler( SignalKDataStream *parent, wxEvtHandler *upstream_consumer );
    ~OCPN_WebSocketMessageHandler();

    // Bound to EVT_OCPN_SIGNALKSTREAM in the constructor.
    void OnWebSocketMessage( OCPN_SignalKEvent& event );

    // Stream whose watchdog is reset on every received message.
    SignalKDataStream *m_parent;
    // Destination for forwarded events; when null, messages are not forwarded.
    wxEvtHandler *m_upstream_consumer;
};
103
104
105
OCPN_WebSocketMessageHandler(SignalKDataStream * parent,wxEvtHandler * upstream_consumer)106 OCPN_WebSocketMessageHandler::OCPN_WebSocketMessageHandler( SignalKDataStream *parent, wxEvtHandler *upstream_consumer )
107 {
108 m_upstream_consumer = upstream_consumer;
109 m_parent = parent;
110 Bind(EVT_OCPN_SIGNALKSTREAM, &OCPN_WebSocketMessageHandler::OnWebSocketMessage, this);
111 }
112
OnWebSocketMessage(OCPN_SignalKEvent & event)113 void OCPN_WebSocketMessageHandler::OnWebSocketMessage( OCPN_SignalKEvent &event )
114 {
115 if(m_upstream_consumer){
116 OCPN_SignalKEvent signalKEvent(0, EVT_OCPN_SIGNALKSTREAM, event.GetString());
117 m_upstream_consumer->AddPendingEvent(signalKEvent);
118 }
119
120 m_parent->ResetWatchdog(); // feed the dog
121
122 }
123
// Removes the event binding installed by the constructor.
OCPN_WebSocketMessageHandler::~OCPN_WebSocketMessageHandler()
{
    Unbind(EVT_OCPN_SIGNALKSTREAM, &OCPN_WebSocketMessageHandler::OnWebSocketMessage, this);
}
128
129
130
131
// Static event table for SignalKDataStream. The two timer ids match the
// owners assigned to m_socket_timer (TIMER_SOCKET + 2) and
// m_socketread_watchdog_timer (TIMER_SOCKET + 3) in the constructor.
BEGIN_EVENT_TABLE(SignalKDataStream, wxEvtHandler)
    // Reconnect attempts for the TCP socket.
    EVT_TIMER(TIMER_SOCKET + 2, SignalKDataStream::OnTimerSocket)
    // Connection / input / lost notifications from the TCP socket.
    EVT_SOCKET(SIGNALK_SOCKET_ID, SignalKDataStream::OnSocketEvent)
    // Receive watchdog tick.
    EVT_TIMER(TIMER_SOCKET + 3, SignalKDataStream::OnSocketReadWatchdogTimer)
END_EVENT_TABLE()
137
138 SignalKDataStream::SignalKDataStream(wxEvtHandler *input_consumer,
139 const ConnectionParams *params)
140 : DataStream(input_consumer, params),
141 m_params(params),
142 m_sock(0),
143 m_brx_connect_event(false)
144
145 {
146 m_addr.Hostname(params->NetworkAddress);
147 m_addr.Service(params->NetworkPort);
148 m_socket_timer.SetOwner(this, TIMER_SOCKET + 2);
149 m_socketread_watchdog_timer.SetOwner(this, TIMER_SOCKET + 3);
150 m_useWebSocket = true;
151 m_wsThread = NULL;
152 m_threadActive = false;
153 m_eventHandler = new OCPN_WebSocketMessageHandler( this, GetConsumer());
154
155
156 Open();
157
158 }
159
~SignalKDataStream()160 SignalKDataStream::~SignalKDataStream(){
161
162 if(m_useWebSocket){
163 delete m_eventHandler;
164 m_eventHandler = NULL;
165 s_wsConsumer = NULL;
166 }
167 else{
168 if (GetSock()->IsOk()){
169 char unsub[] = "{\"context\":\"*\",\"unsubscribe\":[{\"path\":\"*\"}]}\r\n";
170 GetSock()->Write(unsub, strlen(unsub));
171 }
172 }
173
174 Close();
175 }
176
Open(void)177 void SignalKDataStream::Open(void) {
178
179 wxString discoveredIP;
180 int discoveredPort;
181
182 if(m_useWebSocket){
183 std::string serviceIdent = std::string("_signalk-ws._tcp.local."); // Works for node.js server
184 if(m_params->AutoSKDiscover){
185 if( DiscoverSKServer( serviceIdent, discoveredIP, discoveredPort, 1) ) { // 1 second scan
186 wxLogDebug(wxString::Format(_T("SK server autodiscovery finds WebSocket service: %s:%d"), discoveredIP.c_str(), discoveredPort));
187 m_addr.Hostname(discoveredIP);
188 m_addr.Service(discoveredPort);
189
190 // Update the connection params, by pointer to item in global params array
191 ConnectionParams *params = (ConnectionParams *)m_params; // non-const
192 params->NetworkAddress = discoveredIP;
193 params->NetworkPort = discoveredPort;
194 }
195 else
196 wxLogDebug(_T("SK server autodiscovery finds no WebSocket server."));
197 }
198
199 OpenWebSocket();
200 }
201 else{
202 std::string serviceIdent = std::string("_signalk-ws._tcp.local"); // Works for node.js server
203 if(m_params->AutoSKDiscover){
204 if( DiscoverSKServer( serviceIdent, discoveredIP, discoveredPort, 1) ) { // 1 second scan
205 wxLogMessage(wxString::Format(_T("SK server autodiscovery finds REST service: %s:%d"), discoveredIP.c_str(), discoveredPort));
206 m_addr.Hostname(discoveredIP);
207 m_addr.Service(discoveredPort);
208 }
209 else
210 wxLogMessage(_T("SK server autodiscovery finds no REST server."));
211 }
212
213 OpenTCPSocket();
214 }
215 }
216
DiscoverSKServer(std::string serviceIdent,wxString & ip,int & port,int tSec)217 bool SignalKDataStream::DiscoverSKServer( std::string serviceIdent, wxString &ip, int &port, int tSec){
218 #if 0
219 std::vector<Zeroconf::mdns_responce> result;
220 bool st = Zeroconf::Resolve(serviceIdent.c_str(), tSec, &result);
221
222 for(size_t i = 0 ; i < result.size() ; i++){
223 sockaddr_storage sas = result[i].peer; // Address of the responded machine
224 sockaddr_in *sai = (sockaddr_in*)&sas;
225 ip = wxString(inet_ntoa(sai->sin_addr));
226
227 std::vector<uint8_t> data = result[i].data;
228
229 std::vector<Zeroconf::Detail::mdns_record> records = result[i].records;
230 for(size_t j = 0 ; j < records.size() ; j++){
231
232 uint16_t type = records[j].type;
233 if(type == 33){ // SRV
234 size_t pos = records[j].pos;
235 //size_t len = records[j].len;
236 //std::string name = records[j].name;
237
238 // TODO This is pretty ugly, but I cannot find a definition of SRV record
239 unsigned char portHi = data[pos + 16];
240 unsigned char portLo = data[pos + 17];
241 port = (portHi * 256) + portLo;
242 return true;
243 }
244 }
245 }
246 return false;
247 #else
248 wxServDisc *servscan = new wxServDisc(gFrame, wxString(serviceIdent.c_str()), QTYPE_PTR);
249
250 for(int i = 0 ; i < 10 ; i++){
251 if(servscan->getResultCount()){
252 auto result = servscan->getResults().at(0);
253 delete servscan;
254 //wxSleep(5);
255
256 //return false;
257 wxServDisc *namescan = new wxServDisc(0, result.name, QTYPE_SRV);
258 for(int j=0 ; j < 10 ; j++){
259 if(namescan->getResultCount()){
260 auto namescanResult = namescan->getResults().at(0);
261 port = namescanResult.port;
262 delete namescan;
263
264 wxServDisc *addrscan = new wxServDisc(0, namescanResult.name, QTYPE_A);
265 for(int k=0 ; k < 10 ; k++){
266 if(addrscan->getResultCount()){
267 auto addrscanResult = addrscan->getResults().at(0);
268 ip = addrscanResult.ip;
269 delete addrscan;
270 return true;
271 break;
272 }
273 else{
274 wxYield();
275 wxMilliSleep(1000);
276 }
277 }
278 delete addrscan;
279 return false;
280 }
281 else{
282 wxYield();
283 wxMilliSleep(1000);
284 }
285 }
286 delete namescan;
287 return false;
288 }
289 else{
290 wxYield();
291 wxMilliSleep(1000);
292 }
293 }
294
295 delete servscan;
296 return false;
297
298 //wxString a = servscan.getResults().at(0).ip;
299 //wxServDisc namescan(0, servscan->getResults().at(0).name, QTYPE_SRV);
300 //wxServDisc addrscan(0, namescan.getResults().at(0).name, QTYPE_A);
301 //wxString addr = addrscan.getResults().at(0).ip;
302 #endif
303 }
304
OpenTCPSocket()305 void SignalKDataStream::OpenTCPSocket()
306 {
307 wxLogMessage(wxString::Format(_T("Opening Signal K TCPSocket client: %s"),
308 m_params->GetDSPort().c_str()));
309
310 SetSock(new wxSocketClient());
311 GetSock()->SetEventHandler(*this, SIGNALK_SOCKET_ID);
312 GetSock()->SetNotify(wxSOCKET_CONNECTION_FLAG | wxSOCKET_INPUT_FLAG | wxSOCKET_LOST_FLAG);
313 GetSock()->Notify(TRUE);
314 GetSock()->SetTimeout(1); // Short timeout
315
316 SetConnectTime(wxDateTime::Now());
317
318 wxSocketClient* tcp_socket = static_cast<wxSocketClient*>(GetSock());
319 tcp_socket->Connect(GetAddr(), FALSE);
320 SetBrxConnectEvent(false);
321
322 }
323
324 int sdogval;
OnSocketReadWatchdogTimer(wxTimerEvent & event)325 void SignalKDataStream::OnSocketReadWatchdogTimer(wxTimerEvent& event)
326 {
327 m_dog_value--;
328 sdogval++;
329
330 if( m_dog_value <= 0 ) { // No receive in n seconds, assume connection lost
331 if(m_useWebSocket){
332 wxLogMessage( wxString::Format(_T(" WebSocket SignalKDataStream watchdog timeout: %s"), GetPort().c_str()) );
333
334 //printf("DOGTIME %d\n", sdogval);
335 CloseWebSocket();
336 OpenWebSocket();
337 SetWatchdog( N_DOG_TIMEOUT_RECONNECT );
338 }
339 else{
340 wxLogMessage( wxString::Format(_T(" TCP SignalKDataStream watchdog timeout: %s"), GetPort().c_str()) );
341
342 if(GetProtocol() == TCP ) {
343 wxSocketClient* tcp_socket = dynamic_cast<wxSocketClient*>(GetSock());
344 if(tcp_socket) {
345 tcp_socket->Close();
346 }
347 GetSocketTimer()->Start(5000, wxTIMER_ONE_SHOT); // schedule a reconnect
348 GetSocketThreadWatchdogTimer()->Stop();
349 }
350 }
351 }
352
353 GetSocketThreadWatchdogTimer()->Start(1000, wxTIMER_ONE_SHOT); // re-Start the dog
354 }
355
OnTimerSocket(wxTimerEvent & event)356 void SignalKDataStream::OnTimerSocket(wxTimerEvent& event)
357 {
358 // Attempt a connection
359 wxSocketClient* tcp_socket = dynamic_cast<wxSocketClient*>(GetSock());
360 if(tcp_socket) {
361 if(tcp_socket->IsDisconnected() ) {
362 SetBrxConnectEvent(false);
363 tcp_socket->Connect(GetAddr(), FALSE);
364 GetSocketTimer()->Start(5000, wxTIMER_ONE_SHOT); // schedule another attempt
365 }
366 }
367 }
368
OnSocketEvent(wxSocketEvent & event)369 void SignalKDataStream::OnSocketEvent(wxSocketEvent& event)
370 {
371 #define RD_BUF_SIZE 4096
372
373
374 switch(event.GetSocketEvent())
375 {
376 case wxSOCKET_CONNECTION :
377 {
378 wxLogMessage( wxString::Format(_T("SignalKDataStream connection established: %s"),
379 GetPort().c_str()) );
380 ResetWatchdog(); // feed the dog
381 if (GetSock()->IsOk())
382 (void) SetOutputSocketOptions(GetSock());
383 GetSocketTimer()->Stop();
384 SetBrxConnectEvent(true);
385 SetConnectTime(wxDateTime::Now());
386
387 char sub2[] = "{\"context\":\"vessels.self\",\"subscribe\":[{\"path\":\"navigation.*\"}]}\r\n";
388 GetSock()->Write(sub2, strlen(sub2));
389
390 break;
391 }
392
393 case wxSOCKET_LOST:
394 {
395
396 if (GetBrxConnectEvent())
397 wxLogMessage(wxString::Format(_T("SignalKDataStream connection lost: %s"), GetPort().c_str()));
398 wxDateTime now = wxDateTime::Now();
399 wxTimeSpan since_connect = now - GetConnectTime();
400
401 int retry_time = 5000; // default
402
403 // If the socket has never connected, and it is a short interval since the connect request
404 // then stretch the time a bit. This happens on Windows if there is no dafault IP on any interface
405
406 if(!GetBrxConnectEvent() && (since_connect.GetSeconds() < 5) )
407 retry_time = 10000; // 10 secs
408
409 GetSocketThreadWatchdogTimer()->Stop();
410 GetSocketTimer()->Start(retry_time, wxTIMER_ONE_SHOT); // Schedule a re-connect attempt
411
412 break;
413 }
414
415 case wxSOCKET_INPUT:
416 {
417 #define RD_BUF_SIZE 4096 // Allows handling of high volume data streams.
418 wxJSONReader jsonReader;
419 wxJSONValue root;
420
421 std::vector<char> data(RD_BUF_SIZE+1);
422 event.GetSocket()->Read(&data.front(),RD_BUF_SIZE);
423 if(!event.GetSocket()->Error())
424 {
425 size_t count = event.GetSocket()->LastCount();
426 m_sock_buffer.append(&data.front(), count);
427 }
428
429 bool done = false;
430
431 while(!done){
432 int sk_tail = 2;
433 size_t sk_end = m_sock_buffer.find_first_of("\r\n"); // detect the end of an SK string by finding the EOL
434
435 if (sk_end == wxString::npos) // No termination characters: continue reading
436 break;
437
438
439 size_t bufl = m_sock_buffer.size();
440 if(sk_end <= m_sock_buffer.size() - sk_tail){
441 std::string sk_line = m_sock_buffer.substr(0,sk_end + sk_tail);
442
443 // If, due to some logic error, the {nmea_end} parameter is larger than the length of the
444 // socket buffer, then std::string::substr() will throw an exception.
445 // We don't want that, so test for it.
446 // If found, the simple solution is to clear the socket buffer, and carry on
447 // This has been seen on high volume TCP feeds, Windows only.
448 // Hard to catch.....
449 if(sk_end > m_sock_buffer.size())
450 m_sock_buffer.clear();
451 else
452 m_sock_buffer = m_sock_buffer.substr(sk_end + sk_tail);
453
454 size_t sk_start = 0;
455 if(sk_start != wxString::npos){
456 sk_line = sk_line.substr(sk_start);
457 if(sk_line.size()){
458
459 int errors = jsonReader.Parse(sk_line, &root);
460 if (errors > 0) {
461 wxLogMessage(
462 wxString::Format(_T("SignalKDataStream ERROR: the JSON document is not well-formed:%d: %s"),
463 errors,
464 GetPort().c_str()));
465
466 } else {
467 if( GetConsumer() ) {
468
469 #if 0
470 wxString dbg;
471 wxJSONWriter writer;
472 writer.Write(root, dbg);
473
474 wxString msg( _T("SignalK TCP Socket Event sent to consumer:\n") );
475 msg.append(dbg);
476 wxLogMessage(msg);
477 #endif
478 OCPN_SignalKEvent signalKEvent(0, EVT_OCPN_SIGNALKSTREAM, sk_line);
479 GetConsumer()->AddPendingEvent(signalKEvent);
480 }
481 }
482 }
483 }
484 }
485 else
486 done = true;
487 }
488
489 // Prevent non-nmea junk from consuming to much memory by limiting carry-over buffer size.
490 if(m_sock_buffer.size()>RD_BUF_SIZE)
491 m_sock_buffer = m_sock_buffer.substr(m_sock_buffer.size()-RD_BUF_SIZE);
492
493 ResetWatchdog(); // feed the dog
494
495 break;
496 }
497 default :
498 break;
499 }
500
501 }
502
503
SetOutputSocketOptions(wxSocketBase * sock)504 bool SignalKDataStream::SetOutputSocketOptions(wxSocketBase* sock)
505 {
506 int ret;
507 int nagleDisable=1;
508 ret = sock->SetOption(IPPROTO_TCP,TCP_NODELAY,&nagleDisable, sizeof(nagleDisable));
509 unsigned long outbuf_size = 1024;
510 return (sock->SetOption(SOL_SOCKET,SO_SNDBUF,&outbuf_size, sizeof(outbuf_size)) && ret);
511 }
512
Close()513 void SignalKDataStream::Close()
514 {
515 if(m_useWebSocket){
516 wxLogMessage( _T("Closing Signal K WebSocket DataStream ") );
517 CloseWebSocket();
518 }
519 else{
520 wxLogMessage( wxString::Format(_T("Closing Signal K DataStream %s"), GetPort().c_str()) );
521 // Kill off the TCP Socket if alive
522 if(m_sock){
523 m_sock->Notify(FALSE);
524 m_sock->Destroy();
525 }
526 }
527
528 m_socket_timer.Stop();
529 m_socketread_watchdog_timer.Stop();
530
531
532 DataStream::Close();
533 }
534
535 // WebSocket implementation
536
// Worker thread that runs the easywsclient WebSocket connection so the
// Signal K stream can receive messages without blocking the caller.
class WebSocketThread : public wxThread
{
public:
    WebSocketThread( SignalKDataStream *parent, wxIPV4address address, wxEvtHandler *consumer );
    // Thread body: connects, polls and dispatches until the socket is closed.
    virtual void *Entry();
private:
    // Static callback handed to the WebSocket dispatcher; it has no instance
    // access and therefore forwards through the file-level s_wsConsumer.
    static void HandleMessage(const std::string & message);

    wxIPV4address m_address;            // server endpoint
    wxEvtHandler *m_consumer;           // published as s_wsConsumer in Entry()
    SignalKDataStream *m_parentStream;  // told when the thread starts/stops running
};
548
549
WebSocketThread(SignalKDataStream * parent,wxIPV4address address,wxEvtHandler * consumer)550 WebSocketThread::WebSocketThread( SignalKDataStream *parent, wxIPV4address address, wxEvtHandler *consumer)
551 {
552 m_address = address;
553 m_consumer = consumer;
554 m_parentStream = parent;
555 }
556
Entry()557 void *WebSocketThread::Entry()
558 {
559 using easywsclient::WebSocket;
560
561 m_parentStream->SetThreadRunning(true);
562
563 s_wsConsumer = m_consumer;
564
565 wxString host = m_address.IPAddress();
566 int port = m_address.Service();
567
568 // Craft the address string
569 std::stringstream wsAddress;
570 wsAddress << "ws://" << host.mb_str() << ":" << port << "/signalk/v1/stream?subscribe=all&sendCachedValues=false" ;
571
572 WebSocket::pointer ws = WebSocket::from_url(wsAddress.str());
573 if(ws == NULL){
574 //printf("No Connect\n");
575 m_parentStream->SetThreadRunning(false);
576 return 0;
577 }
578 while (true) {
579 if(TestDestroy()){
580 //printf("receiving delete\n");
581 ws->close();
582 }
583
584 if(ws->getReadyState() == WebSocket::CLOSED){
585 //printf("closed\n");
586 break;
587 }
588 ws->poll(10);
589 if(ws->getReadyState() == WebSocket::OPEN){
590 ws->dispatch(HandleMessage);
591 }
592 }
593
594 //printf("ws delete\n");
595 delete ws;
596
597 m_parentStream->SetThreadRunning(false);
598
599 return 0;
600 }
601
HandleMessage(const std::string & message)602 void WebSocketThread::HandleMessage(const std::string & message)
603 {
604 if(s_wsConsumer){
605 OCPN_SignalKEvent signalKEvent(0, EVT_OCPN_SIGNALKSTREAM, message);
606 s_wsConsumer->AddPendingEvent(signalKEvent);
607 }
608
609 }
610
611
612
613
OpenWebSocket()614 void SignalKDataStream::OpenWebSocket()
615 {
616 //printf("OpenWebSocket\n");
617 wxLogMessage(wxString::Format(_T("Opening Signal K WebSocket client: %s"),
618 m_params->GetDSPort().c_str()));
619
620 // Start a thread to run the client without blocking
621
622 m_wsThread = new WebSocketThread(this, GetAddr(), m_eventHandler);
623 if ( m_wsThread->Create() != wxTHREAD_NO_ERROR ) {
624 wxLogError(wxT("Can't create WebSocketThread!"));
625
626 return;
627 }
628
629 ResetWatchdog();
630 GetSocketThreadWatchdogTimer()->Start(1000, wxTIMER_ONE_SHOT); // Start the dog
631
632 m_wsThread->Run();
633 }
634
CloseWebSocket()635 void SignalKDataStream::CloseWebSocket()
636 {
637 if(m_wsThread){
638 if(IsThreadRunning()){
639 //printf("sending delete\n");
640 m_wsThread->Delete();
641 wxMilliSleep(100);
642
643 int nDeadman = 0;
644 while(IsThreadRunning() && (++nDeadman < 200)){ // spin for max 2 secs.
645 wxMilliSleep(10);
646 }
647 //printf("Closed in %d\n", nDeadman);
648 wxMilliSleep(100);
649 }
650 }
651 }
652