1 /* $Header: /var/cvs/mbdyn/mbdyn/mbdyn-1.0/mbdyn/base/socketstreamdrive.cc,v 1.65 2017/08/28 10:34:39 masarati Exp $ */
2 /*
3 * MBDyn (C) is a multibody analysis code.
4 * http://www.mbdyn.org
5 *
6 * Copyright (C) 1996-2017
7 *
8 * Pierangelo Masarati <masarati@aero.polimi.it>
9 * Paolo Mantegazza <mantegazza@aero.polimi.it>
10 *
11 * Dipartimento di Ingegneria Aerospaziale - Politecnico di Milano
12 * via La Masa, 34 - 20156 Milano, Italy
13 * http://www.aero.polimi.it
14 *
15 * Changing this copyright notice is forbidden.
16 *
17 * This program is free software; you can redistribute it and/or modify
18 * it under the terms of the GNU General Public License as published by
19 * the Free Software Foundation (version 2 of the License).
20 *
21 *
22 * This program is distributed in the hope that it will be useful,
23 * but WITHOUT ANY WARRANTY; without even the implied warranty of
24 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
25 * GNU General Public License for more details.
26 *
27 * You should have received a copy of the GNU General Public License
28 * along with this program; if not, write to the Free Software
29 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
30 */
31
32 /*
33 * Michele Attolico <attolico@aero.polimi.it>
34 */
35
36 /*
37 * (Portions)
38 *
39 * AUTHOR: Dr. Rudolf Jaeger <rudijaeger@yahoo.com>
40 * Copyright (C) 2008 all rights reserved.
41 *
42 * The copyright of this patch is trasferred
43 * to Pierangelo Masarati and Paolo Mantegazza
44 * for use in the software MBDyn as described
45 * in the GNU Public License version 2.1
46 */
47
48 #include "mbconfig.h" /* This goes first in every *.c,*.cc file */
49
50 #include "dataman.h"
51 #include "filedrv.h"
52 #include "streamdrive.h"
53 #include "socketstreamdrive.h"
54
55 #ifdef USE_SOCKET
56
57 #include "sock.h"
58
59 #include <string.h>
60 #include <sys/types.h>
61 #include <sys/select.h>
62 #include <sys/socket.h>
63 #include <netinet/in.h>
64 #include <netdb.h>
65 #include <stdio.h>
66 #include <stdlib.h>
67 #include <errno.h>
68 #include <sys/un.h>
69 #include <arpa/inet.h>
70
71 #include "rtai_in_drive.h"
72
73 #define DEFAULT_PORT 9012 /* intentionally unassigned by IANA */
74 #define DEFAULT_HOST "127.0.0.1"
75
SocketStreamDrive(unsigned int uL,const DriveHandler * pDH,UseSocket * pUS,bool c,const std::string & sFileName,integer nd,const std::vector<doublereal> & v0,StreamDrive::Modifier * pMod,unsigned int ie,bool bReceiveFirst,int flags,const struct timeval & st,StreamDriveEcho * pSDE)76 SocketStreamDrive::SocketStreamDrive(unsigned int uL,
77 const DriveHandler* pDH,
78 UseSocket *pUS, bool c,
79 const std::string& sFileName,
80 integer nd, const std::vector<doublereal>& v0,
81 StreamDrive::Modifier *pMod,
82 unsigned int ie, bool bReceiveFirst,
83 int flags,
84 const struct timeval& st,
85 StreamDriveEcho *pSDE)
86 : StreamDrive(uL, pDH, sFileName, nd, v0, c, pMod),
87 InputEvery(ie), bReceiveFirst(bReceiveFirst), InputCounter(ie - 1),
88 pUS(pUS), recv_flags(flags),
89 SocketTimeout(st),
90 pSDE(pSDE)
91 {
92 // NOTE: InputCounter is set to InputEvery - 1 so that input
93 // is expected at initialization (initial time) and then every
94 // InputEvery steps; for example, for InputEvery == 4, input
95 // is expected at:
96 // initial time
97 // initial time + 4 * timestep
98 // initial time + 8 * timestep
99 ASSERT(InputEvery > 0);
100
101 if (!bReceiveFirst) {
102 InputCounter -= InputEvery;
103 }
104
105 if (pSDE) {
106 pSDE->Init("SocketStreamDrive", uLabel, nd);
107 }
108 }
109
~SocketStreamDrive(void)110 SocketStreamDrive::~SocketStreamDrive(void)
111 {
112 if (pUS != 0) {
113 SAFEDELETE(pUS);
114 }
115
116 if (pSDE != 0) {
117 delete pSDE;
118 }
119 }
120
121 /* Scrive il contributo del DriveCaller al file di restart */
122 std::ostream&
Restart(std::ostream & out) const123 SocketStreamDrive::Restart(std::ostream& out) const
124 {
125 out << " file: " << uLabel << ", socket stream,"
126 " stream drive name, \"" << sFileName << "\"";
127 pUS->Restart(out);
128 return out << ", " << iNumDrives << ";" << std::endl;
129 }
130
131 void
ServePending(const doublereal & t)132 SocketStreamDrive::ServePending(const doublereal& t)
133 {
134
135 // by now, an abandoned drive is not read any more;
136 // should we retry or what?
137 if (pUS->Abandoned()) {
138 silent_cout("SocketStreamDrive(" << sFileName << "): "
139 "abandoned" << std::endl);
140 return;
141 }
142
143 ASSERT(pUS->Connected());
144
145 /* read only every InputEvery steps */
146 InputCounter++;
147 if (InputCounter != InputEvery) {
148 return;
149 }
150 InputCounter = 0;
151
152 int sock_nr = pUS->GetSock();
153 ssize_t rc = -1;
154 // Use socket timeout if set in input file; Default: 0
155 if (SocketTimeout.tv_sec || SocketTimeout.tv_usec) {
156 // Use Select() on the socket for automatic shutdown if
157 // socket clients fail.
158 fd_set readfds;
159
160 // Clear the set
161 FD_ZERO(&readfds);
162
163 // Add descriptors to the set
164 FD_SET(sock_nr, &readfds);
165
166 // Copy timeout because select(2) may overwrite it
167 struct timeval tv = SocketTimeout;
168
169 // Call select
170 rc = select(sock_nr + 1, &readfds, NULL, NULL, &tv);
171 switch (rc) {
172 case -1: {
173 int save_errno = errno;
174 char *err_msg = strerror(save_errno);
175
176 silent_cout("SocketStreamDrive"
177 "(" << sFileName << "): select failed"
178 << " (" << save_errno << ": "
179 << err_msg << ")" << std::endl);
180 throw ErrGeneric(MBDYN_EXCEPT_ARGS);
181 }
182
183 case 0:
184 silent_cout("SocketStreamDrive"
185 "(" << sFileName << "): select timed out"
186 << std::endl);
187 throw ErrGeneric(MBDYN_EXCEPT_ARGS);
188
189 default:
190 if (!FD_ISSET(sock_nr, &readfds)) {
191 silent_cout("SocketStreamDrive"
192 "(" << sFileName << "): "
193 "socket " << sock_nr << " reset"
194 << std::endl);
195 throw ErrGeneric(MBDYN_EXCEPT_ARGS);
196 }
197 }
198 }
199
200 // Read data
201 // NOTE: flags __SHOULD__ contain MSG_WAITALL;
202 // however, it is not defined on some platforms (e.g. Cygwin)
203 // TODO: needs work for network independence!
204 rc = pUS->recv(&buf[0], size, recv_flags);
205
206 /* FIXME: no receive at first step? */
207 switch (rc) {
208 case 0:
209 do_abandon:;
210 silent_cout("SocketStreamDrive(" << sFileName << "): "
211 << "communication closed by host; abandoning..."
212 << std::endl);
213 pUS->Abandon();
214 break;
215
216 case -1: {
217 int save_errno = errno;
218
219 // some errno values may be legal
220 switch (save_errno) {
221 case EAGAIN:
222 if (recv_flags & MSG_DONTWAIT) {
223 // non-blocking
224 return;
225 }
226 break;
227
228 case ECONNRESET:
229 goto do_abandon;
230 }
231
232 char *err_msg = strerror(save_errno);
233 silent_cout("SocketStreamDrive(" << sFileName << ") failed "
234 "(" << save_errno << ": " << err_msg << ")"
235 << std::endl);
236 throw ErrGeneric(MBDYN_EXCEPT_ARGS);
237 }
238
239 default:
240 if (pSDE) {
241 pSDE->EchoPrepare(&pdVal[1], iNumDrives);
242 }
243
244 // copy values from buffer
245 pMod->Modify(&pdVal[1], &buf[0]);
246
247 if (pSDE) {
248 pSDE->Echo(&pdVal[1], iNumDrives);
249 }
250 break;
251 }
252 }
253
254
255 /* legge i drivers tipo stream */
256
257 static Drive *
ReadStreamDrive(const DataManager * pDM,MBDynParser & HP,unsigned uLabel)258 ReadStreamDrive(const DataManager *pDM, MBDynParser& HP, unsigned uLabel)
259 {
260 bool bGotCreate(false);
261 bool bCreate(false);
262 unsigned short int port = -1;
263 std::string name;
264 std::string host;
265 std::string path;
266
267 if (HP.IsKeyWord("name") || HP.IsKeyWord("stream" "drive" "name")) {
268 const char *m = HP.GetStringWithDelims();
269 if (m == 0) {
270 silent_cerr("SocketStreamDrive(" << uLabel << "): "
271 "unable to read stream drive name "
272 "at line " << HP.GetLineData()
273 << std::endl);
274 throw ErrGeneric(MBDYN_EXCEPT_ARGS);
275
276 }
277
278 name = m;
279
280 } else {
281 silent_cerr("SocketStreamDrive(" << uLabel << "):"
282 "missing stream drive name "
283 "at line " << HP.GetLineData()
284 << std::endl);
285 throw ErrGeneric(MBDYN_EXCEPT_ARGS);
286 }
287
288 if (HP.IsKeyWord("create")) {
289 bGotCreate = true;
290 if (!HP.GetYesNo(bCreate)) {
291 silent_cerr("SocketStreamDrive"
292 "(" << uLabel << ", \"" << name << "\"): "
293 "\"create\" must be either \"yes\" or \"no\" "
294 "at line " << HP.GetLineData()
295 << std::endl);
296 throw ErrGeneric(MBDYN_EXCEPT_ARGS);
297 }
298 }
299
300 if (HP.IsKeyWord("local") || HP.IsKeyWord("path")) {
301 const char *m = HP.GetFileName();
302
303 if (m == 0) {
304 silent_cerr("SocketStreamDrive"
305 "(" << uLabel << ", \"" << name << "\"): "
306 "unable to read local path"
307 "at line " << HP.GetLineData()
308 << std::endl);
309 throw ErrGeneric(MBDYN_EXCEPT_ARGS);
310 }
311
312 path = m;
313 }
314
315 if (HP.IsKeyWord("port")) {
316 if (!path.empty()) {
317 silent_cerr("SocketStreamDrive"
318 "(" << uLabel << ", \"" << name << "\"): "
319 "cannot specify port "
320 "for a local socket "
321 "at line " << HP.GetLineData()
322 << std::endl);
323 throw ErrGeneric(MBDYN_EXCEPT_ARGS);
324 }
325
326 int p = HP.GetInt();
327 /* Da sistemare da qui */
328 #ifdef IPPORT_USERRESERVED
329 if (p <= IPPORT_USERRESERVED) {
330 silent_cerr("SocketStreamDrive"
331 "(" << uLabel << ", \"" << name << "\"): "
332 "cannot listen on reserved port "
333 << port << ": less than "
334 "IPPORT_USERRESERVED=" << IPPORT_USERRESERVED
335 << " at line " << HP.GetLineData()
336 << std::endl);
337 throw ErrGeneric(MBDYN_EXCEPT_ARGS);
338 }
339 /* if #undef'd, don't bother checking;
340 * the OS will do it for us */
341 #endif /* IPPORT_USERRESERVED */
342
343 port = p;
344 }
345
346
347 if (HP.IsKeyWord("host")) {
348 if (!path.empty()) {
349 silent_cerr("SocketStreamDrive"
350 "(" << uLabel << ", \"" << name << "\"): "
351 "cannot specify host for a local socket "
352 "at line " << HP.GetLineData()
353 << std::endl);
354 throw ErrGeneric(MBDYN_EXCEPT_ARGS);
355 }
356
357 const char *h;
358
359 h = HP.GetStringWithDelims();
360 if (h == 0) {
361 silent_cerr("SocketStreamDrive"
362 "(" << uLabel << ", \"" << name << "\"): "
363 "unable to read host "
364 "at line " << HP.GetLineData()
365 << std::endl);
366 throw ErrGeneric(MBDYN_EXCEPT_ARGS);
367 }
368
369 host = h;
370
371 } else if (path.empty() && !bCreate) {
372 silent_cerr("SocketStreamDrive"
373 "(" << uLabel << ", \"" << name << "\"): "
374 "host undefined, "
375 "using default \"" << DEFAULT_HOST "\" "
376 "at line " << HP.GetLineData()
377 << std::endl);
378 host = DEFAULT_HOST;
379 }
380
381 int socket_type = SOCK_STREAM;
382 if (HP.IsKeyWord("socket" "type")) {
383 if (HP.IsKeyWord("udp")) {
384 socket_type = SOCK_DGRAM;
385 if (!bGotCreate) {
386 bCreate = true;
387 }
388
389 } else if (!HP.IsKeyWord("tcp")) {
390 silent_cerr("SocketStreamDrive(" << uLabel << ", \"" << name << "\"): "
391 "invalid socket type "
392 "at line " << HP.GetLineData() << std::endl);
393 throw ErrGeneric(MBDYN_EXCEPT_ARGS);
394 }
395 }
396
397 if ((socket_type == SOCK_DGRAM) && !bCreate) {
398 silent_cerr("SocketStreamDrive(" << uLabel << ", \"" << name << "\"): "
399 "socket type=udp incompatible with create=no "
400 "at line " << HP.GetLineData() << std::endl);
401 throw ErrGeneric(MBDYN_EXCEPT_ARGS);
402 }
403
404 // we want to block until the whole chunk is received
405 int flags = 0;
406 #ifdef MSG_WAITALL
407 flags |= MSG_WAITALL;
408 #endif // MSG_WAITALL
409
410 while (HP.IsArg()) {
411 if (HP.IsKeyWord("signal")) {
412 #ifdef MSG_NOSIGNAL
413 flags &= ~MSG_NOSIGNAL;
414 #else // ! MSG_NOSIGNAL
415 silent_cout("SocketStreamDrive"
416 "(" << uLabel << ", \"" << name << "\"): "
417 "MSG_NOSIGNAL not defined (ignored) "
418 "at line " << HP.GetLineData()
419 << std::endl);
420 #endif // ! MSG_NOSIGNAL
421
422 // not honored by recv(2)
423 } else if (HP.IsKeyWord("no" "signal")) {
424 #ifdef MSG_NOSIGNAL
425 flags |= MSG_NOSIGNAL;
426 #else // ! MSG_NOSIGNAL
427 silent_cout("SocketStreamDrive"
428 "(" << uLabel << ", \"" << name << "\"): "
429 "MSG_NOSIGNAL not defined (ignored) "
430 "at line " << HP.GetLineData()
431 << std::endl);
432 #endif // ! MSG_NOSIGNAL
433
434 } else if (HP.IsKeyWord("blocking")) {
435 // not honored by recv(2)?
436 flags |= MSG_WAITALL;
437 flags &= ~MSG_DONTWAIT;
438
439 } else if (HP.IsKeyWord("non" "blocking")) {
440 // not honored by recv(2)?
441 flags &= ~MSG_WAITALL;
442 flags |= MSG_DONTWAIT;
443
444 } else {
445 break;
446 }
447 }
448
449 unsigned int InputEvery = 1;
450 if (HP.IsKeyWord("input" "every")) {
451 int i = HP.GetInt();
452 if (i <= 0) {
453 silent_cerr("SocketStreamDrive"
454 "(" << uLabel << ", \"" << name << "\"): "
455 "invalid \"input every\" value " << i
456 << " at line " << HP.GetLineData()
457 << std::endl);
458 throw ErrGeneric(MBDYN_EXCEPT_ARGS);
459 }
460 InputEvery = (unsigned int)i;
461 }
462
463 bool bReceiveFirst(true);
464 if (HP.IsKeyWord("receive" "first")) {
465 if (!HP.GetYesNo(bReceiveFirst)) {
466 silent_cerr("SocketStreamDrive"
467 "(" << uLabel << ", \"" << name << "\"): "
468 "\"receive first\" must be either \"yes\" or \"no\" "
469 << "at line " << HP.GetLineData()
470 << std::endl);
471 throw ErrGeneric(MBDYN_EXCEPT_ARGS);
472 }
473 }
474
475 struct timeval SocketTimeout = { 0, 0 };
476 if (HP.IsKeyWord("timeout")) {
477 doublereal st = HP.GetReal();
478 if (st < 0) {
479 silent_cerr("SocketStreamDrive"
480 "(" << uLabel << ", \"" << name << "\"): "
481 "invalid socket timeout value " << st
482 << " at line " << HP.GetLineData()
483 << std::endl);
484 throw ErrGeneric(MBDYN_EXCEPT_ARGS);
485 }
486 SocketTimeout.tv_sec = long(st);
487 SocketTimeout.tv_usec = long((st - SocketTimeout.tv_sec)*1000000);
488 }
489
490 pedantic_cout("SocketStreamDrive"
491 "(" << uLabel << ", \"" << name << "\"): "
492 "timeout: " << SocketTimeout.tv_sec << "s "
493 << SocketTimeout.tv_usec << "ns" << std::endl);
494
495 StreamDriveEcho *pSDE = ReadStreamDriveEcho(pDM, HP);
496
497 /* Luca Conti edits - GSOC 2017 */
498
499 std::vector<doublereal> v0;
500 StreamDrive::Modifier *pMod(0);
501 int idrives;
502
503 const char *s = HP.IsWord(fileDriveContentTypeWordSet);
504 if (s != NULL) {
505 FileDriveContentTypeMap::iterator it = fileDriveContentTypeMap.find(std::string(s));
506 pMod = it->second->Read(v0, HP, idrives);
507 } else {
508 idrives = HP.GetInt();
509 if (idrives <= 0) {
510 silent_cerr("SocketStreamDrive"
511 "(" << uLabel << ", \"" << name << "\"): "
512 "illegal number of channels " << idrives
513 << " at line " << HP.GetLineData()
514 << std::endl);
515 throw ErrGeneric(MBDYN_EXCEPT_ARGS);
516 }
517
518 if (HP.IsKeyWord("initial" "values")) {
519 v0.resize(idrives);
520 for (int i = 0; i < idrives; i++) {
521 v0[i] = HP.GetReal();
522 }
523 }
524
525 if (HP.IsKeyWord("modifier")) {
526 pMod = ReadStreamDriveModifier(HP, idrives);
527 }
528 }
529
530 UseSocket *pUS = 0;
531
532
533 // .log file output
534 std::ostream& out = pDM->GetLogFile();
535 out << "filedriver: " << uLabel << " stream";
536
537 if (path.empty()) {
538 if (port == (unsigned short int)(-1)) {
539 port = DEFAULT_PORT;
540 }
541 SAFENEWWITHCONSTRUCTOR(pUS, UseInetSocket, UseInetSocket(host, port, socket_type, bCreate));
542
543 // .log file output
544 out
545 << " INET"
546 << " " << name
547 << " " << host
548 << " " << port;
549
550 } else {
551 SAFENEWWITHCONSTRUCTOR(pUS, UseLocalSocket, UseLocalSocket(path, socket_type, bCreate));
552 out
553 << " UNIX"
554 << " " << name
555 << " " << path;
556 }
557
558 if (socket_type == SOCK_STREAM) {
559 out << " tcp";
560 } else {
561 out << " udp";
562 }
563
564 out << " " << bCreate;
565
566
567 if ((socket_type == SOCK_STREAM) && bCreate) {
568 const_cast<DataManager *>(pDM)->RegisterSocketUser(pUS);
569
570 } else {
571 pUS->Connect();
572 }
573
574 Drive* pDr = 0;
575 SAFENEWWITHCONSTRUCTOR(pDr, SocketStreamDrive,
576 SocketStreamDrive(uLabel,
577 pDM->pGetDrvHdl(), pUS, bCreate,
578 name, idrives, v0, pMod,
579 InputEvery, bReceiveFirst,
580 flags, SocketTimeout,
581 pSDE));
582 #ifdef MSG_NOSIGNAL
583 if (flags & ~MSG_NOSIGNAL) {
584 out << " " << true;
585 } else {
586 #endif // MSG_NOSIGNAL
587 out << " " << false;
588 #ifdef MSG_NOSIGNAL
589 }
590 #endif
591
592 #ifdef MSG_DONTWAIT
593 if (flags & MSG_DONTWAIT) {
594 out << " " << true;
595 } else {
596 #endif // MSG_DONTWAIT
597 out << " " << false;
598 #ifdef MSG_DONTWAIT
599 }
600 #endif
601
602 out
603 << " " << InputEvery
604 << " " << bReceiveFirst
605 << " " << SocketTimeout.tv_sec
606 << " " << idrives;
607
608 for (std::vector<doublereal>::iterator it = v0.begin(); it != v0.end(); ++it)
609 {
610 out << " " << (*it);
611 }
612
613 out << std::endl;
614
615 return pDr;
616 }
617
618 #endif // USE_SOCKET
619
620
621 Drive *
Read(unsigned uLabel,const DataManager * pDM,MBDynParser & HP)622 StreamDR::Read(unsigned uLabel, const DataManager *pDM, MBDynParser& HP)
623 {
624 Drive *pDr = 0;
625
626 if (!s.empty()) {
627 pedantic_cout("\"" << s << "\" is deprecated; "
628 "use \"stream\" instead at line "
629 << HP.GetLineData() << std::endl);
630 }
631
632 #ifdef USE_RTAI
633 if (::rtmbdyn_rtai_task != NULL){
634 silent_cout("starting RTMBDyn drive " << uLabel << std::endl);
635 pDr = ReadRTMBDynInDrive(pDM, HP, uLabel);
636 } else
637 #endif /* USE_RTAI */
638 {
639 #ifdef USE_SOCKET
640 silent_cout("starting stream drive " << uLabel << std::endl);
641 pDr = ReadStreamDrive(pDM, HP, uLabel);
642 #else // ! USE_SOCKET
643 silent_cerr("stream drive " << uLabel
644 << " not allowed at line " << HP.GetLineData()
645 << " because apparently the current architecture "
646 "does not support sockets" << std::endl);
647 throw ErrGeneric(MBDYN_EXCEPT_ARGS);
648 #endif // ! USE_SOCKET
649 }
650
651 return pDr;
652 }
653
654 /* Luca Conti edits - GSOC 2017 */
655
656 FileDriveContentTypeMap fileDriveContentTypeMap;
657 FileDriveContentTypeWordSetType fileDriveContentTypeWordSet;
658
659 /* file drive content type parsing checker: allows the parser
660 to understand if the next keyword is a content type */
IsWord(const std::string & s) const661 bool FileDriveContentTypeWordSetType::IsWord(const std::string& s) const {
662 return fileDriveContentTypeMap.find(std::string(s)) != fileDriveContentTypeMap.end();
663 };
664
665 /* registration function: call it to register a new content type*/
SetFileDriveContentType(const char * name,FileDriveContentTypeReader * rf)666 bool SetFileDriveContentType(const char *name, FileDriveContentTypeReader *rf) {
667 pedantic_cout("registering file drive content type \"" << name << "\""
668 << std::endl );
669 return fileDriveContentTypeMap.insert(FileDriveContentTypeMap::value_type(name, rf)).second;
670 }
671
672 /*deallocation of all content types in fileDriveContentTypeMap, if any was added*/
DestroyFileDriveContentTypes(void)673 void DestroyFileDriveContentTypes(void) {
674 for (FileDriveContentTypeMap::iterator i = fileDriveContentTypeMap.begin(); i != fileDriveContentTypeMap.end(); ++i) {
675 delete i->second;
676 }
677 fileDriveContentTypeMap.clear();
678 }
679