1 /*
2 LinKNX KNX home automation platform
3 Copyright (C) 2007-2009 Jean-François Meessen <linknx@ouaye.net>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 */
19
20 #include <iostream>
21 #include <iomanip>
22 #include "ioport.h"
23 #include <fcntl.h>
24 #include <unistd.h>
25 #include <unistd.h>
26
27 Logger& IOPort::logger_m(Logger::getInstance("IOPort"));
28 Logger& RxThread::logger_m(Logger::getInstance("RxThread"));
29 Logger& UdpIOPort::logger_m(Logger::getInstance("UdpIOPort"));
30 Logger& TcpClientIOPort::logger_m(Logger::getInstance("TcpClientIOPort"));
31 Logger& SerialIOPort::logger_m(Logger::getInstance("SerialIOPort"));
32
33 IOPortManager* IOPortManager::instance_m;
34
IOPortManager()35 IOPortManager::IOPortManager()
36 {}
37
~IOPortManager()38 IOPortManager::~IOPortManager()
39 {
40 IOPortMap_t::iterator it;
41 for (it = portMap_m.begin(); it != portMap_m.end(); it++) {
42 delete (*it).second;
43 }
44 }
45
instance()46 IOPortManager* IOPortManager::instance()
47 {
48 if (instance_m == 0)
49 instance_m = new IOPortManager();
50 return instance_m;
51 }
52
getPort(const std::string & id)53 IOPort* IOPortManager::getPort(const std::string& id)
54 {
55 IOPortMap_t::iterator it = portMap_m.find(id);
56 if (it == portMap_m.end())
57 return 0;
58 return (*it).second;
59 }
60
addPort(IOPort * port)61 void IOPortManager::addPort(IOPort* port)
62 {
63 if (!portMap_m.insert(IOPortPair_t(port->getID(), port)).second)
64 throw ticpp::Exception("IO Port ID already exists");
65 }
66
removePort(IOPort * port)67 void IOPortManager::removePort(IOPort* port)
68 {
69 IOPortMap_t::iterator it = portMap_m.find(port->getID());
70 if (it != portMap_m.end())
71 {
72 delete it->second;
73 portMap_m.erase(it);
74 }
75 }
76
importXml(ticpp::Element * pConfig)77 void IOPortManager::importXml(ticpp::Element* pConfig)
78 {
79 ticpp::Iterator< ticpp::Element > child("ioport");
80 for ( child = pConfig->FirstChildElement("ioport", false); child != child.end(); child++ )
81 {
82 std::string id = child->GetAttribute("id");
83 bool del = child->GetAttribute("delete") == "true";
84 IOPortMap_t::iterator it = portMap_m.find(id);
85 if (it != portMap_m.end())
86 {
87 IOPort* port = it->second;
88
89 if (del)
90 {
91 delete port;
92 portMap_m.erase(it);
93 }
94 else
95 {
96 port->importXml(&(*child));
97 portMap_m.insert(IOPortPair_t(id, port));
98 }
99 }
100 else
101 {
102 if (del)
103 throw ticpp::Exception("IO Port not found");
104 IOPort* port = IOPort::create(&(*child));
105 portMap_m.insert(IOPortPair_t(id, port));
106 }
107 }
108
109 }
110
exportXml(ticpp::Element * pConfig)111 void IOPortManager::exportXml(ticpp::Element* pConfig)
112 {
113 IOPortMap_t::iterator it;
114 for (it = portMap_m.begin(); it != portMap_m.end(); it++)
115 {
116 ticpp::Element pElem("ioport");
117 (*it).second->exportXml(&pElem);
118 pConfig->LinkEndChild(&pElem);
119 }
120 }
121
IOPort()122 IOPort::IOPort()
123 {}
124
~IOPort()125 IOPort::~IOPort()
126 {
127 }
128
create(const std::string & type)129 IOPort* IOPort::create(const std::string& type)
130 {
131 if (type == "" || type == "udp")
132 return new UdpIOPort();
133 else if (type == "tcp")
134 return new TcpClientIOPort();
135 else if (type == "serial")
136 return new SerialIOPort();
137 else
138 return 0;
139 }
140
create(ticpp::Element * pConfig)141 IOPort* IOPort::create(ticpp::Element* pConfig)
142 {
143 std::string type = pConfig->GetAttribute("type");
144 IOPort* obj = IOPort::create(type);
145 if (obj == 0)
146 {
147 std::stringstream msg;
148 msg << "IOPort type not supported: '" << type << "'" << std::endl;
149 throw ticpp::Exception(msg.str());
150 }
151 obj->importXml(pConfig);
152 return obj;
153 }
154
importXml(ticpp::Element * pConfig)155 void IOPort::importXml(ticpp::Element* pConfig)
156 {
157 id_m = pConfig->GetAttribute("id");
158 if (isRxEnabled())
159 rxThread_m.reset(new RxThread(this));
160 }
161
exportXml(ticpp::Element * pConfig)162 void IOPort::exportXml(ticpp::Element* pConfig)
163 {
164 pConfig->SetAttribute("id", id_m);
165 }
166
addListener(IOPortListener * l)167 void IOPort::addListener(IOPortListener *l)
168 {
169 if (rxThread_m.get())
170 rxThread_m->addListener(l);
171 }
172
removeListener(IOPortListener * l)173 bool IOPort::removeListener(IOPortListener *l)
174 {
175 if (rxThread_m.get())
176 return (rxThread_m->removeListener(l));
177 else
178 return false;
179 }
180
addConnectListener(ConnectCondition * c)181 void IOPort::addConnectListener(ConnectCondition *c)
182 {
183 connectListenerList_m.push_back(c);
184 }
185
removeConnectListener(ConnectCondition * c)186 bool IOPort::removeConnectListener(ConnectCondition *c)
187 {
188 connectListenerList_m.remove(c);
189 return true;
190 }
191
onConnect()192 void IOPort::onConnect()
193 {
194 ConnectListenerList_t::iterator it;
195 for (it = connectListenerList_m.begin(); it != connectListenerList_m.end(); it++)
196 {
197 (*it)->onConnect();
198 }
199 }
200
RxThread(IOPort * port)201 RxThread::RxThread(IOPort *port) : port_m(port), isRunning_m(false), stop_m(0)
202 {}
203
~RxThread()204 RxThread::~RxThread()
205 {
206 Stop();
207 }
208
addListener(IOPortListener * listener)209 void RxThread::addListener(IOPortListener *listener)
210 {
211 if (listenerList_m.empty())
212 Start();
213 listenerList_m.push_back(listener);
214 }
215
removeListener(IOPortListener * listener)216 bool RxThread::removeListener(IOPortListener *listener)
217 {
218 listenerList_m.remove(listener);
219 if (listenerList_m.empty())
220 Stop();
221 return true;
222 }
223
Run(pth_sem_t * stop1)224 void RxThread::Run (pth_sem_t * stop1)
225 {
226 stop_m = pth_event (PTH_EVENT_SEM, stop1);
227 uint8_t buf[1024];
228 int retval;
229 logger_m.debugStream() << "Start IO Port loop." << endlog;
230 while ((retval = port_m->get(buf, sizeof(buf), stop_m)) > 0)
231 {
232 ListenerList_t::iterator it;
233 for (it = listenerList_m.begin(); it != listenerList_m.end(); it++)
234 {
235 // logger_m.debugStream() << "Calling onDataReceived on listener for " << port_m->getID() << endlog;
236 (*it)->onDataReceived(buf, retval);
237 }
238 }
239 logger_m.debugStream() << "Out of IO Port loop." << endlog;
240 pth_event_free (stop_m, PTH_FREE_THIS);
241 stop_m = 0;
242 }
243
UdpIOPort()244 UdpIOPort::UdpIOPort() : sockfd_m(-1), port_m(0), rxport_m(0)
245 {
246 memset (&addr_m, 0, sizeof (addr_m));
247 }
248
~UdpIOPort()249 UdpIOPort::~UdpIOPort()
250 {
251 if (sockfd_m >= 0)
252 close(sockfd_m);
253 Logger::getInstance("UdpIOPort").debugStream() << "Deleting UdpIOPort " << endlog;
254 }
255
importXml(ticpp::Element * pConfig)256 void UdpIOPort::importXml(ticpp::Element* pConfig)
257 {
258 memset (&addr_m, 0, sizeof (addr_m));
259 addr_m.sin_family = AF_INET;
260 pConfig->GetAttribute("port", &port_m);
261 addr_m.sin_port = htons(port_m);
262 host_m = pConfig->GetAttribute("host");
263 addr_m.sin_addr.s_addr = inet_addr(host_m.c_str());
264 pConfig->GetAttributeOrDefault("rxport", &rxport_m, 0);
265 IOPort::importXml(pConfig);
266
267 sockfd_m = socket(AF_INET, SOCK_DGRAM, 0);
268 if (sockfd_m >= 0 && rxport_m > 0) {
269 struct sockaddr_in addr;
270 bzero(&addr,sizeof(addr));
271 addr.sin_family = AF_INET;
272 addr.sin_port = htons(rxport_m);
273 addr.sin_addr.s_addr = htonl(INADDR_ANY);
274 if (bind(sockfd_m, (struct sockaddr *)&addr,sizeof(addr)) < 0) /* error */
275 {
276 logger_m.errorStream() << "Unable to bind socket for ioport " << getID() << endlog;
277 }
278 }
279 else {
280 logger_m.errorStream() << "Unable to create socket for ioport " << getID() << endlog;
281 }
282
283
284 logger_m.infoStream() << "UdpIOPort configured for host " << host_m << " and port " << port_m << endlog;
285 }
286
exportXml(ticpp::Element * pConfig)287 void UdpIOPort::exportXml(ticpp::Element* pConfig)
288 {
289 IOPort::exportXml(pConfig);
290 pConfig->SetAttribute("type", "udp");
291 pConfig->SetAttribute("host", host_m);
292 pConfig->SetAttribute("port", port_m);
293 if (rxport_m > 0)
294 pConfig->SetAttribute("rxport", rxport_m);
295 }
296
send(const uint8_t * buf,int len)297 int UdpIOPort::send(const uint8_t* buf, int len)
298 {
299 logger_m.infoStream() << "send(buf, len=" << len << "):"
300 << buf << endlog;
301
302 if (sockfd_m >= 0) {
303 ssize_t nbytes = pth_sendto(sockfd_m, buf, len, 0,
304 (const struct sockaddr *) &addr_m, sizeof (addr_m));
305 if (nbytes == len) {
306 return nbytes;
307 }
308 else {
309 logger_m.errorStream() << "Unable to send to socket for ioport " << getID() << endlog;
310 }
311 }
312 return -1;
313 }
314
get(uint8_t * buf,int len,pth_event_t stop)315 int UdpIOPort::get(uint8_t* buf, int len, pth_event_t stop)
316 {
317 logger_m.debugStream() << "get(buf, len=" << len << "):"
318 << buf << endlog;
319 if (sockfd_m >= 0) {
320 socklen_t rl;
321 sockaddr_in r;
322 rl = sizeof (r);
323 memset (&r, 0, sizeof (r));
324 ssize_t i = pth_recvfrom_ev(sockfd_m, buf, len, 0,
325 (struct sockaddr *) &r, &rl, stop);
326 // logger_m.debugStream() << "Out of recvfrom " << i << " rl=" << rl << endlog;
327 if (i > 0 && rl == sizeof (r))
328 {
329 std::string msg(reinterpret_cast<const char*>(buf), i);
330 logger_m.debugStream() << "Received '" << msg << "' on ioport " << getID() << endlog;
331 return i;
332 }
333 }
334 return -1;
335 }
336
Socket(TcpClientIOPort * ioport)337 TcpClientIOPort::Socket::Socket(TcpClientIOPort *ioport)
338 : ioport_m(ioport), sockfd_m(-1)
339 {
340 // Reuse permanent socket?
341 if(ioport->permanent_m)
342 {
343 sockfd_m = ioport->permanentSockfd_m;
344 }
345
346 // Allocate new socket?
347 if(sockfd_m < 0)
348 {
349 // Allocate a new socket.
350 sockfd_m = socket(AF_INET, SOCK_STREAM, 0);
351 if (sockfd_m >= 0) {
352 if (pth_connect(sockfd_m, (const struct sockaddr *)&ioport->addr_m, sizeof (ioport_m->addr_m)) < 0) {
353 logger_m.errorStream() << "Unable to connect to server for ioport " << ioport->getID() << endlog;
354 }
355 }
356 else {
357 logger_m.errorStream() << "Unable to create socket for ioport " << ioport_m->getID() << endlog;
358 }
359 }
360
361 // Keep socket?
362 if (ioport->permanent_m)
363 {
364 ioport->permanentSockfd_m = sockfd_m;
365 }
366 }
367
~Socket()368 TcpClientIOPort::Socket::~Socket()
369 {
370 // Close socket unless connection is permanent.
371 if (sockfd_m > 0 && !ioport_m->permanent_m)
372 {
373 if (close(sockfd_m) < 0) {
374 logger_m.errorStream() << "Unable to close connection to server for ioport " << ioport_m->getID() << endlog;
375 }
376 }
377 }
378
TcpClientIOPort()379 TcpClientIOPort::TcpClientIOPort() : port_m(0), permanentSockfd_m(-1)
380 {
381 memset (&addr_m, 0, sizeof (addr_m));
382 }
383
~TcpClientIOPort()384 TcpClientIOPort::~TcpClientIOPort()
385 {
386 // Close permanent socket if applicable.
387 if (permanentSockfd_m >= 0)
388 close(permanentSockfd_m);
389 Logger::getInstance("TcpClientIOPort").debugStream() << "Deleting TcpClientIOPort " << endlog;
390 }
391
importXml(ticpp::Element * pConfig)392 void TcpClientIOPort::importXml(ticpp::Element* pConfig)
393 {
394 memset (&addr_m, 0, sizeof (addr_m));
395 addr_m.sin_family = AF_INET;
396 pConfig->GetAttribute("port", &port_m);
397 addr_m.sin_port = htons(port_m);
398 host_m = pConfig->GetAttribute("host");
399 addr_m.sin_addr.s_addr = inet_addr(host_m.c_str());
400 std::string perm = pConfig->GetAttribute("permanent");
401 permanent_m = (perm == "true" || perm == "yes");
402 IOPort::importXml(pConfig);
403
404 logger_m.infoStream() << "TcpClientIOPort " << (permanent_m?"(permanent) ":"") << "configured for host " << host_m << " and port " << port_m << endlog;
405 }
406
exportXml(ticpp::Element * pConfig)407 void TcpClientIOPort::exportXml(ticpp::Element* pConfig)
408 {
409 IOPort::exportXml(pConfig);
410 pConfig->SetAttribute("type", "tcp");
411 pConfig->SetAttribute("host", host_m);
412 pConfig->SetAttribute("port", port_m);
413 if (permanent_m)
414 pConfig->SetAttribute("permanent", "true");
415 }
416
send(const uint8_t * buf,int len)417 int TcpClientIOPort::send(const uint8_t* buf, int len)
418 {
419 logger_m.infoStream() << "send(buf, len=" << len << "):"
420 << buf << endlog;
421
422 Socket so(this);
423 if (so.sockfd_m >= 0) {
424 ssize_t nbytes = pth_write(so.sockfd_m, buf, len);
425 if (nbytes == len) {
426 return nbytes;
427 }
428 else {
429 logger_m.errorStream() << "Error while sending data for ioport " << getID() << endlog;
430 }
431 }
432 return -1;
433 }
434
get(uint8_t * buf,int len,pth_event_t stop)435 int TcpClientIOPort::get(uint8_t* buf, int len, pth_event_t stop)
436 {
437 logger_m.debugStream() << "get(buf, len=" << len << ")" << endlog;
438 bool retry = true;
439 while (retry) {
440 Socket so(this);
441 if (so.sockfd_m >= 0) {
442 ssize_t i = pth_read_ev(so.sockfd_m, buf, len, stop);
443 logger_m.debugStream() << "Out of read " << i << endlog;
444 if (i > 0)
445 {
446 std::string msg(reinterpret_cast<const char*>(buf), i);
447 logger_m.debugStream() << "Received '" << msg << "' on ioport " << getID() << endlog;
448 return i;
449 }
450 else {
451 if (pth_event_status (stop) == PTH_STATUS_OCCURRED)
452 retry = false;
453 }
454 }
455 else {
456 struct timeval tv;
457 tv.tv_sec = 60;
458 tv.tv_usec = 0;
459 pth_select_ev(0,0,0,0,&tv,stop);
460 if (pth_event_status (stop) == PTH_STATUS_OCCURRED)
461 retry = false;
462 }
463 }
464 logger_m.debugStream() << "Abort get() on ioport " << getID() << endlog;
465 return -1;
466 }
467
SerialIOPort()468 SerialIOPort::SerialIOPort() : fd_m(-1)
469 {
470 memset (&newtio_m, 0, sizeof (newtio_m));
471 }
472
~SerialIOPort()473 SerialIOPort::~SerialIOPort()
474 {
475 if (fd_m >= 0) {
476 // restore old port settings
477 tcsetattr(fd_m, TCSANOW, &oldtio_m);
478 close(fd_m);
479 }
480 Logger::getInstance("SerialIOPort").debugStream() << "Deleting SerialIOPort " << endlog;
481 }
482
importXml(ticpp::Element * pConfig)483 void SerialIOPort::importXml(ticpp::Element* pConfig)
484 {
485 ErrorMessage err;
486 int speed;
487 struct termios newtio;
488 std::string framing = pConfig->GetAttributeOrDefault("framing", "8N1");
489 std::string flow = pConfig->GetAttributeOrDefault("flow", "none");
490 std::string mode = pConfig->GetAttributeOrDefault("mode", "text");
491 modeRaw_m = (mode == "raw");
492 memset (&newtio, 0, sizeof (newtio));
493 pConfig->GetAttribute("speed", &speed);
494 newtio.c_cflag = CLOCAL | CREAD;
495 newtio.c_iflag = ICRNL;
496 newtio.c_oflag = 0;
497 newtio.c_lflag = ICANON;
498
499 if (modeRaw_m)
500 {
501 timeout_m = RuleServer::parseDuration(pConfig->GetAttributeOrDefault("timeout", "0"), false, true) / 100;
502 pConfig->GetAttributeOrDefault("msg-length", &msglength_m, 255);
503 newtio.c_iflag = 0;
504 newtio.c_lflag = 0;
505 newtio.c_cc[VTIME] = timeout_m; // inter character timer (x100ms; 0=disabled)
506 newtio.c_cc[VMIN] = msglength_m; // block until timer expires or msglegth bytes are received
507 }
508 switch (framing[0]) {
509 case '5':
510 newtio.c_cflag |= CS5;
511 break;
512 case '6':
513 newtio.c_cflag |= CS6;
514 break;
515 case '7':
516 newtio.c_cflag |= CS7;
517 break;
518 case '8':
519 newtio.c_cflag |= CS8;
520 break;
521 default:
522 err << "Unsupported nb of data bits '" << framing[0] << "' for serial port";
523 err.logAndThrow(logger_m);
524 }
525 switch (framing[1]) {
526 case 'E':
527 newtio.c_cflag |= PARENB;
528 break;
529 case 'O':
530 newtio.c_cflag |= (PARENB | PARODD);
531 break;
532 case 'N':
533 newtio.c_iflag |= IGNPAR;
534 break;
535 default:
536 err << "Unsupported parity '" << framing[1] << "' for serial port";
537 err.logAndThrow(logger_m);
538 }
539
540 if (framing[2] == '2')
541 newtio.c_cflag |= CSTOPB;
542 else if (framing[2] != '1') {
543 err << "Unsupported nb of stop bits '" << framing[2] << "' for serial port";
544 err.logAndThrow(logger_m);
545 }
546
547 if (flow == "xon-xoff")
548 newtio.c_iflag |= (IXON | IXOFF);
549 else if (flow == "rts-cts")
550 newtio.c_cflag |= CRTSCTS;
551 else if (flow != "none") {
552 err << "Unsupported flow control '" << flow << "' for serial port";
553 err.logAndThrow(logger_m);
554 }
555
556 switch (speed) {
557 case 200:
558 speed_m = B200;
559 break;
560 case 300:
561 speed_m = B300;
562 break;
563 case 600:
564 speed_m = B600;
565 break;
566 case 1200:
567 speed_m = B1200;
568 break;
569 case 1800:
570 speed_m = B1800;
571 break;
572 case 2400:
573 speed_m = B2400;
574 break;
575 case 4800:
576 speed_m = B4800;
577 break;
578 case 9600:
579 speed_m = B9600;
580 break;
581 case 19200:
582 speed_m = B19200;
583 break;
584 case 38400:
585 speed_m = B38400;
586 break;
587 case 57600:
588 speed_m = B57600;
589 break;
590 case 115200:
591 speed_m = B115200;
592 break;
593 case 230400:
594 speed_m = B230400;
595 break;
596 default:
597 err << "Unsupported speed '" << speed << "' for serial port";
598 err.logAndThrow(logger_m);
599 }
600 cfsetispeed(&newtio, speed_m);
601 cfsetospeed(&newtio, speed_m);
602 pConfig->GetAttribute("dev", &dev_m);
603 newtio_m = newtio;
604
605 IOPort::importXml(pConfig);
606
607 fd_m = open(dev_m.c_str(), O_RDWR | O_NOCTTY );
608 if (fd_m >= 0) {
609 // Save previous port settings
610 tcgetattr(fd_m, &oldtio_m);
611 tcflush(fd_m, TCIFLUSH);
612 tcsetattr(fd_m, TCSANOW, &newtio_m);
613 logger_m.infoStream() << "SerialIOPort configured for device " << dev_m << endlog;
614 onConnect();
615 }
616 else {
617 logger_m.errorStream() << "Unable to open device '" << dev_m << "' for ioport " << getID() << endlog;
618 }
619 }
620
exportXml(ticpp::Element * pConfig)621 void SerialIOPort::exportXml(ticpp::Element* pConfig)
622 {
623 int speed;
624 IOPort::exportXml(pConfig);
625 pConfig->SetAttribute("type", "serial");
626 pConfig->SetAttribute("dev", dev_m);
627 switch (speed_m) {
628 case B200:
629 speed = 200;
630 break;
631 case B300:
632 speed = 300;
633 break;
634 case B600:
635 speed = 600;
636 break;
637 case B1200:
638 speed = 1200;
639 break;
640 case B1800:
641 speed = 1800;
642 break;
643 case B2400:
644 speed = 2400;
645 break;
646 case B4800:
647 speed = 4800;
648 break;
649 case B9600:
650 speed = 9600;
651 break;
652 case B19200:
653 speed = 19200;
654 break;
655 case B38400:
656 speed = 38400;
657 break;
658 case B57600:
659 speed = 57600;
660 break;
661 case B115200:
662 speed = 115200;
663 break;
664 case B230400:
665 speed = 230400;
666 break;
667 default:
668 speed = 9600;
669 break;
670 }
671 pConfig->SetAttribute("speed", speed);
672
673 std::string framing;
674 switch (newtio_m.c_cflag & CSIZE) {
675 case CS5:
676 framing.push_back('5');
677 break;
678 case CS6:
679 framing.push_back('6');
680 break;
681 case CS7:
682 framing.push_back('7');
683 break;
684 default:
685 case CS8:
686 framing.push_back('8');
687 break;
688 }
689 if ((newtio_m.c_cflag & PARENB) == 0)
690 framing.push_back('N');
691 else if (newtio_m.c_cflag & PARODD)
692 framing.push_back('O');
693 else
694 framing.push_back('E');
695
696 if (newtio_m.c_cflag & CSTOPB)
697 framing.push_back('2');
698 else
699 framing.push_back('1');
700
701 pConfig->SetAttribute("framing", framing);
702
703 if (newtio_m.c_cflag & CRTSCTS)
704 pConfig->SetAttribute("flow", "rts-cts");
705 else if (newtio_m.c_iflag & IXON)
706 pConfig->SetAttribute("flow", "xon-xoff");
707 else
708 pConfig->SetAttribute("flow", "none");
709
710 if (modeRaw_m)
711 {
712 pConfig->SetAttribute("mode", "raw");
713 if (timeout_m != 0)
714 pConfig->SetAttribute("timeout", RuleServer::formatDuration(timeout_m*100, true));
715 if (msglength_m != 255)
716 pConfig->SetAttribute("msg-length", msglength_m);
717 }
718 }
719
send(const uint8_t * buf,int len)720 int SerialIOPort::send(const uint8_t* buf, int len)
721 {
722 logger_m.infoStream() << "send(buf, len=" << len << "):"
723 << buf << endlog;
724
725 if (fd_m >= 0) {
726 ssize_t nbytes = pth_write(fd_m, buf, len);
727 if (nbytes == len) {
728 return nbytes;
729 }
730 else {
731 logger_m.errorStream() << "Unable to send to socket for ioport " << getID() << endlog;
732 }
733 }
734 return -1;
735 }
736
get(uint8_t * buf,int len,pth_event_t stop)737 int SerialIOPort::get(uint8_t* buf, int len, pth_event_t stop)
738 {
739 logger_m.debugStream() << "get(buf, len=" << len << ")" << endlog;
740 if (fd_m >= 0) {
741 ssize_t i = pth_read_ev(fd_m, buf, len, stop);
742 // logger_m.debugStream() << "Out of recvfrom " << i << " rl=" << rl << endlog;
743 if (i > 0)
744 {
745 std::string msg(reinterpret_cast<const char*>(buf), i);
746 if (logger_m.isDebugEnabled())
747 {
748 DbgStream dbg = logger_m.debugStream();
749 dbg << "Received message on ioport " << getID() << ": ";
750 if (modeRaw_m)
751 {
752 dbg << std::hex << std::setfill ('0') << std::setw (2);
753 for (uint8_t *p = buf; p < buf+i; p++)
754 dbg << (int)*p << " ";
755 dbg << std::dec << endlog;
756 }
757 else
758 dbg << msg << endlog;
759 }
760 return i;
761 }
762 }
763 return -1;
764 }
765
TxAction()766 TxAction::TxAction(): varFlags_m(0), hex_m(false)
767 {}
768
~TxAction()769 TxAction::~TxAction()
770 {}
771
importXml(ticpp::Element * pConfig)772 void TxAction::importXml(ticpp::Element* pConfig)
773 {
774 int i=0;
775 port_m = pConfig->GetAttribute("ioport");
776 std::string data = pConfig->GetAttribute("data");
777 if (!IOPortManager::instance()->getPort(port_m))
778 {
779 std::stringstream msg;
780 msg << "TxAction: IO Port ID not found: '" << port_m << "'" << std::endl;
781 throw ticpp::Exception(msg.str());
782 }
783
784 varFlags_m = 0;
785 if (pConfig->GetAttributeOrDefault("hex", "false") != "false")
786 {
787 hex_m = true;
788 while (i < data.length())
789 {
790 std::istringstream ss(data.substr(i, 2));
791 ss.setf(std::ios::hex, std::ios::basefield);
792 int value = 0;
793 ss >> value;
794 data_m.push_back(static_cast<char>(value));
795 i += 2;
796 }
797 logger_m.infoStream() << "TxAction: Configured to send hex data to ioport " << port_m << endlog;
798 }
799 else
800 {
801 data_m = data;
802
803 if (pConfig->GetAttribute("var") == "true")
804 {
805 varFlags_m = VarEnabled;
806 if (parseVarString(data, true))
807 varFlags_m |= VarData;
808 }
809 logger_m.infoStream() << "TxAction: Configured to send '" << data_m << "' to ioport " << port_m << endlog;
810 }
811 }
812
exportXml(ticpp::Element * pConfig)813 void TxAction::exportXml(ticpp::Element* pConfig)
814 {
815 pConfig->SetAttribute("type", "ioport-tx");
816 if (hex_m)
817 {
818 int i = 0;
819 pConfig->SetAttribute("hex", "true");
820 pConfig->SetAttribute("data", data_m);
821 std::ostringstream ss;
822 ss.setf(std::ios::hex, std::ios::basefield);
823 ss.fill('0');
824 while (i < data_m.length())
825 ss << std::setw(2) << int(data_m[i++]);
826 pConfig->SetAttribute("data", ss.str());
827 }
828 else
829 pConfig->SetAttribute("data", data_m);
830 pConfig->SetAttribute("ioport", port_m);
831 if (varFlags_m & VarEnabled)
832 pConfig->SetAttribute("var", "true");
833
834 Action::exportXml(pConfig);
835 }
836
Run(pth_sem_t * stop)837 void TxAction::Run (pth_sem_t * stop)
838 {
839 if (sleep(delay_m, stop))
840 return;
841 try
842 {
843 IOPort* port = IOPortManager::instance()->getPort(port_m);
844 if (!port)
845 throw ticpp::Exception("IO Port ID not found.");
846 sendData(port);
847 }
848 catch( ticpp::Exception& ex )
849 {
850 logger_m.warnStream() << "Error in TxAction on port '" << port_m << "': " << ex.m_details << endlog;
851 }
852 }
853
sendData(IOPort * port)854 void TxAction::sendData(IOPort* port)
855 {
856 std::string data = data_m;
857 if (varFlags_m & VarData)
858 parseVarString(data);
859 if (hex_m)
860 logger_m.infoStream() << "Execute TxAction send hex data to ioport " << port->getID() << endlog;
861 else
862 logger_m.infoStream() << "Execute TxAction send '" << data << "' to ioport " << port->getID() << endlog;
863 const uint8_t* u8data = reinterpret_cast<const uint8_t*>(data.c_str());
864 int len = data.length();
865 int ret = port->send(u8data, len);
866 while (ret < len) {
867 if (ret <= 0)
868 throw ticpp::Exception("Unable to send data.");
869 len -= ret;
870 u8data += ret;
871 ret = port->send(u8data, len);
872 }
873 }
874
RxCondition(ChangeListener * cl)875 RxCondition::RxCondition(ChangeListener* cl) : regexFlag_m(false), value_m(false), hex_m(false), pmatch_m(0), cl_m(cl)
876 {}
877
~RxCondition()878 RxCondition::~RxCondition()
879 {
880 IOPort* port = IOPortManager::instance()->getPort(port_m);
881 if (port)
882 port->removeListener(this);
883 if (regexFlag_m)
884 regfree(®ex_m);
885 if (pmatch_m)
886 delete pmatch_m;
887 std::vector<Object*>::iterator it;
888 for (it=objects_m.begin(); it!=objects_m.end(); it++)
889 if (*it)
890 (*it)->decRefCount();
891 }
892
evaluate()893 bool RxCondition::evaluate()
894 {
895 return value_m;
896 }
897
importXml(ticpp::Element * pConfig)898 void RxCondition::importXml(ticpp::Element* pConfig)
899 {
900 if (!cl_m)
901 throw ticpp::Exception("Rx condition on IO port is not supported in this context");
902 port_m = pConfig->GetAttribute("ioport");
903 exp_m = pConfig->GetAttribute("expected");
904 IOPort* port = IOPortManager::instance()->getPort(port_m);
905 if (!port)
906 {
907 std::stringstream msg;
908 msg << "RxCondition: IO Port ID not found: '" << port_m << "'" << std::endl;
909 throw ticpp::Exception(msg.str());
910 }
911 port->addListener(this);
912 regexFlag_m = pConfig->GetAttributeOrDefault("regex", "false") != "false";
913 hex_m = pConfig->GetAttributeOrDefault("hex", "false") != "false";
914 if (regexFlag_m)
915 {
916 if (regcomp(®ex_m, exp_m.c_str(), REG_EXTENDED) != 0) {
917 std::stringstream msg;
918 msg << "RxCondition: Invalid regular expression: '" << exp_m << "'" << std::endl;
919 throw ticpp::Exception(msg.str());
920 }
921 size_t nmatch = regex_m.re_nsub+1;
922 if (nmatch > 0)
923 {
924 pmatch_m = new regmatch_t[nmatch];
925 objects_m.resize(nmatch);
926 }
927 for (int i=0; i<nmatch; i++)
928 {
929 std::string id;
930 std::stringstream obj;
931 obj << "object" << i;
932 id = pConfig->GetAttributeOrDefault(obj.str(),"");
933 if (id != "")
934 objects_m[i] = ObjectController::instance()->getObject(id);
935 else
936 objects_m[i] = 0;
937 logger_m.debugStream() << "subgroup: " << i << " => Object '" << id << "'" << endlog;
938 }
939 }
940
941
942 logger_m.infoStream() << "RxCondition: configured to watch for '" << exp_m << "' on ioport " << port_m << endlog;
943 }
944
exportXml(ticpp::Element * pConfig)945 void RxCondition::exportXml(ticpp::Element* pConfig)
946 {
947 pConfig->SetAttribute("type", "ioport-rx");
948 pConfig->SetAttribute("expected", exp_m);
949 if (hex_m)
950 pConfig->SetAttribute("hex", "true" );
951 if (regexFlag_m)
952 pConfig->SetAttribute("regex", "true" );
953 std::vector<Object*>::iterator it;
954 int i=0;
955 for (it=objects_m.begin(); it!=objects_m.end(); it++)
956 {
957 if (*it)
958 {
959 std::stringstream obj;
960 obj << "object" << i;
961 pConfig->SetAttribute(obj.str(), (*it)->getID());
962 }
963 i++;
964 }
965
966 pConfig->SetAttribute("ioport", port_m);
967 }
968
statusXml(ticpp::Element * pStatus)969 void RxCondition::statusXml(ticpp::Element* pStatus)
970 {
971 pStatus->SetAttribute("type", "ioport-rx");
972 pStatus->SetAttribute("ioport", port_m);
973 }
974
onDataReceived(const uint8_t * buf,unsigned int len)975 void RxCondition::onDataReceived(const uint8_t* buf, unsigned int len)
976 {
977 std::string rx(reinterpret_cast<const char*>(buf), len);
978 if (hex_m)
979 {
980 int i = 0;
981 std::ostringstream ss;
982 ss.setf(std::ios::hex, std::ios::basefield);
983 ss.fill('0');
984 while (i < len)
985 ss << std::setw(2) << int(buf[i++]);
986 rx = ss.str();
987 }
988 if (cl_m)
989 {
990 if (regexFlag_m)
991 {
992 int status;
993 size_t nmatch = regex_m.re_nsub+1;
994 status = regexec(®ex_m, rx.c_str(), nmatch, pmatch_m, 0);
995 if (status == 0)
996 {
997 logger_m.debugStream() << "RxCondition: expected message received: '" << rx << "'" << regex_m.re_nsub << endlog;
998 for (int i=0; i<nmatch; i++)
999 {
1000 Object* obj = objects_m[i];
1001 regmatch_t match = pmatch_m[i];
1002 logger_m.debugStream() << "subgroup: " << i << " '" << match.rm_so << ":" << match.rm_eo << "'" << endlog;
1003 if ((match.rm_so != (size_t)-1) && (obj)) {
1004 std::string newValue(rx.c_str(), match.rm_so, match.rm_eo - match.rm_so);
1005 logger_m.debugStream() << "RxCondition: new value " << newValue << " found in regex for object " << obj->getID() << endlog;
1006
1007 if (hex_m && newValue.length() <= 8)
1008 {
1009 unsigned int value;
1010 std::istringstream val(newValue);
1011 val >> std::hex >> value;
1012 std::stringstream out;
1013 out << value;
1014 newValue = out.str();
1015 }
1016 try
1017 {
1018 obj->setValue(newValue);
1019 }
1020 catch( ticpp::Exception& ex )
1021 {
1022 logger_m.errorStream() << "RxCondition: Error cannot set value '" << newValue << "' to object '" << obj->getID() << "'" << endlog;
1023 }
1024 }
1025 }
1026 value_m = true;
1027 cl_m->onChange(0);
1028 value_m = false;
1029 cl_m->onChange(0);
1030
1031 }
1032 }
1033 else
1034 {
1035 rx.resize(exp_m.length());
1036 if (exp_m == rx)
1037 {
1038 logger_m.debugStream() << "RxCondition: expected message received: '" << exp_m << "'" << endlog;
1039 value_m = true;
1040 cl_m->onChange(0);
1041 value_m = false;
1042 cl_m->onChange(0);
1043 }
1044 }
1045 }
1046 }
1047
ConnectCondition(ChangeListener * cl)1048 ConnectCondition::ConnectCondition(ChangeListener* cl) : value_m(false), cl_m(cl)
1049 {}
1050
~ConnectCondition()1051 ConnectCondition::~ConnectCondition()
1052 {
1053 IOPort* port = IOPortManager::instance()->getPort(port_m);
1054 if (port)
1055 port->removeConnectListener(this);
1056 }
1057
evaluate()1058 bool ConnectCondition::evaluate()
1059 {
1060 return value_m;
1061 }
1062
importXml(ticpp::Element * pConfig)1063 void ConnectCondition::importXml(ticpp::Element* pConfig)
1064 {
1065 if (!cl_m)
1066 throw ticpp::Exception("Connect condition on IO port is not supported in this context");
1067 port_m = pConfig->GetAttribute("ioport");
1068
1069 IOPort* port = IOPortManager::instance()->getPort(port_m);
1070 if (!port)
1071 {
1072 std::stringstream msg;
1073 msg << "ConnectCondition: IO Port ID not found: '" << port_m << "'" << std::endl;
1074 throw ticpp::Exception(msg.str());
1075 }
1076
1077 if (!port->mustConnect())
1078 {
1079 std::stringstream msg;
1080 msg << "ConnectCondition: Only serial and TCP port can use connectCondition" << std::endl;
1081 throw ticpp::Exception(msg.str());
1082 }
1083
1084 port->addConnectListener(this);
1085 }
1086
exportXml(ticpp::Element * pConfig)1087 void ConnectCondition::exportXml(ticpp::Element* pConfig)
1088 {
1089 pConfig->SetAttribute("type", "ioport-connect");
1090 pConfig->SetAttribute("ioport", port_m);
1091 }
1092
statusXml(ticpp::Element * pStatus)1093 void ConnectCondition::statusXml(ticpp::Element* pStatus)
1094 {
1095 pStatus->SetAttribute("type", "ioport-connect");
1096 pStatus->SetAttribute("ioport", port_m);
1097 }
1098
onConnect()1099 void ConnectCondition::onConnect()
1100 {
1101 logger_m.debugStream() << "ConnectCondition: IOPort '" << port_m << "' is connected" << endlog;
1102 value_m = true;
1103 cl_m->onChange(0);
1104 value_m = false;
1105 cl_m->onChange(0);
1106 }
1107