1 /**
2  * nettest.cpp
3  * This file is part of the YATE Project http://YATE.null.ro
4  *
5  * Network and socket performance test module
6  *
7  * Yet Another Telephony Engine - a fully featured software PBX and IVR
8  * Copyright (C) 2004-2014 Null Team
9  * Author: Marian Podgoreanu
10  *
11  * This software is distributed under multiple licenses;
12  * see the COPYING file in the main directory for licensing
13  * information for this specific distribution.
14  *
15  * This use of this software may be subject to additional restrictions.
16  * See the LEGAL file in the main directory for details.
17  *
18  * This program is distributed in the hope that it will be useful,
19  * but WITHOUT ANY WARRANTY; without even the implied warranty of
20  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
21  */
22 
23 #include <yatephone.h>
24 
25 #include <string.h>
26 #include <stdio.h>
27 
28 #include <sys/time.h>
29 #include <sys/types.h>
30 #include <unistd.h>
31 
32 using namespace TelEngine;
33 namespace { // anonymous
34 
class Statistics;                        // Packet and byte counters of a test or worker
class NTTest;                            // A test built from a configuration section
class NTWorkerContainer;                 // A set of workers
class NTWorker;                          // Base class of the worker threads
class NTWriter;                          // Worker sending packets
class NTReader;                          // Worker receiving packets on its own socket
class NTSelectReader;                    // Worker receiving packets on several sockets using select()
class NTPlugin;                          // The module
43 
/**
 * Thin wrapper around a single fd_set
 * @short MultiSelect private data
 */
class PrivateFDSet
{
public:
    // Check if a handle is currently a member of the set
    inline bool isset(int handle)
    {
	if (FD_ISSET(handle,&set))
	    return true;
	return false;
    }
    // Insert a handle in the set
    inline void add(int handle)
    {
	FD_SET(handle,&set);
    }
    // Remove all handles from the set
    inline void reset()
    {
	FD_ZERO(&set);
    }
    fd_set set;                          // The wrapped descriptor set
};
59 
/**
 * This class encapsulates a select for a set of file descriptors.
 * File descriptors can be appended to wait for data to be read or written or
 *  to wait for an exception to occur
 * @short A multiple file descriptor select
 */
class FDSetSelect
{
public:
    /**
     * Constructor. Allocates the private read, write and event sets
     */
    FDSetSelect();

    /**
     * Destructor. Release private data
     */
    ~FDSetSelect();

    /**
     * Check if data is available for read
     * This method should be called after @ref select() returns
     * @param handle File descriptor to check
     * @return True if the given file descriptor is set in the read set
     */
    bool canRead(int handle) const;

    /**
     * Check if a file descriptor can be used to write data
     * This method should be called after @ref select() returns
     * @param handle File descriptor to check
     * @return True if the given file descriptor is set in the write set
     */
    bool canWrite(int handle) const;

    /**
     * Check if there is a pending event for a given file descriptor
     * This method should be called after @ref select() returns
     * @param handle File descriptor to check
     * @return True if the given file descriptor is set in the event set
     */
    bool hasEvent(int handle) const;

    /**
     * Append a file descriptor to read, write and/or event set.
     * This method shouldn't be called while in select
     * @param handle File descriptor to append
     * @param read True to append to the read set (wait to receive data)
     * @param write True to append to the write set (check if the handle can be used to write data)
     * @param event True to append to the event set (check exceptions)
     * @return False if handle is invalid, can't be used with select or
     *  target set is missing (all flags are false)
     */
    bool add(int handle, bool read, bool write, bool event);

    /**
     * Reset all file descriptor sets.
     * This method shouldn't be called while in select
     */
    void reset();

    /**
     * Start waiting for a file descriptor state change
     * @param uSec The select timeout in microseconds. The value is handed to the
     *  system select() as its timeout, a 0 value is passed as a zero timeout
     *  (NOTE(review): a zero timeout makes the system select() return without
     *  waiting - confirm against the platform documentation)
     * @return The number of file descriptors whose state changed, 0 if no
     *  file descriptor was added, negative on error
     */
    int select(unsigned int uSec);

private:
    PrivateFDSet* m_read;                // Read set
    PrivateFDSet* m_write;               // Write set
    PrivateFDSet* m_event;               // Event set
    PrivateFDSet* m_crtR;                // Sets given to select for read/write/events:
    PrivateFDSet* m_crtW;                //  each one points to the matching set after a handle
    PrivateFDSet* m_crtE;                //  was added to it and is 0 after reset()
    int m_maxHandle;                     // Maximum handle value in current set(s)
    bool m_selectError;                  // Flag used to output errors
};
137 
138 // Statistics class
139 class Statistics
140 {
141 public:
Statistics()142     inline Statistics()
143 	{ reset(); }
reset()144     inline void reset() {
145 	    msStart = Time::msecNow();
146 	    msStop = 0;
147 	    packets = totalBytes = errors = lostBytes = 0;
148 	    stopped = 0;
149 	}
success(unsigned int bytes)150     inline void success(unsigned int bytes) {
151 	    packets++;
152 	    totalBytes += bytes;
153 	}
failure(unsigned int bytes)154     inline void failure(unsigned int bytes) {
155 	    packets++;
156 	    errors++;
157 	    lostBytes += bytes;
158 	}
operator +=(const Statistics & src)159     inline Statistics& operator+=(const Statistics& src) {
160 	    packets += src.packets;
161 	    totalBytes += src.totalBytes;
162 	    errors += src.errors;
163 	    lostBytes += src.lostBytes;
164 	    stopped += src.stopped;
165 	    return *this;
166 	}
167 
168     void output(String& dest);
169 
170     u_int64_t msStart;
171     u_int64_t msStop;
172     u_int64_t packets;
173     u_int64_t totalBytes;
174     u_int64_t errors;
175     u_int64_t lostBytes;
176     unsigned int stopped;
177 };
178 
// A test: holds the parameters read from a configuration section and the
//  worker containers running it.
// The object's own mutex is taken when the worker count and local statistics
//  are updated, m_mutex is taken when the test is initialized, started or stopped
class NTTest : public Mutex, public GenObject, public DebugEnabler
{
public:
    // Constructor. Build the debug name from the module name and the given name
    NTTest(const char* name);
    // Destructor. Stop the workers
    virtual ~NTTest()
	{ stop(); }
    // Stop the workers before destroying the object
    virtual void destruct()
	{ stop(); GenObject::destruct(); }
    // Test direction: true to send packets, false to receive
    inline bool send() const
	{ return m_send; }
    // Local IP address (used by the workers when receiving)
    inline const String& localip() const
	{ return m_localip; }
    // Remote IP address (used by the workers when sending)
    inline const String& remoteip() const
	{ return m_remoteip; }
    // Packet length in bytes
    inline unsigned int packetLen() const
	{ return m_packetLen; }
    // Interval between sent packets in milliseconds
    inline unsigned int interval() const
	{ return m_interval; }
    // Test lifetime in seconds
    inline unsigned int lifetime() const
	{ return m_lifetime; }
    // Number of packets each writer has to send, 0 to use the lifetime
    inline unsigned int packetCount() const
	{ return m_packetCount; }
    // Select timeout in microseconds, negative if select is not used
    inline int selectTimeout() const
	{ return m_selectTimeout; }
    // Stop the test and (re)load its parameters. Return false on invalid parameters
    bool init(NamedList& params);
    // Restart the test: start all worker containers
    void start();
    // Stop all worker containers
    void stop();
    // Count a newly created worker
    void addWorker();
    // Uncount a worker and collect its counters.
    // Print the statistics when the last worker is removed
    void removeWorker(NTWorker* worker);
private:
    Mutex m_mutex;                       // Taken by init(), start() and stop()
    String m_id;                         // Debug name
    String m_localip;                    // Local IP address
    String m_remoteip;                   // Remote IP address
    int m_port;                          // First port to use
    unsigned int m_threads;              // Expected number of worker threads
    bool m_send;                         // Send (true) or receive (false)
    unsigned int m_packetLen;            // Packet length in bytes
    unsigned int m_interval;             // Send interval in milliseconds
    unsigned int m_lifetime;             // Lifetime in seconds
    unsigned int m_packetCount;          // Packets to send by each writer
    ObjList m_containers;                // The worker sets
    unsigned int m_workerCount;          // Current number of workers
    int m_selectTimeout;                 // Select timeout in microseconds, negative if not used
    Statistics m_localStats;             // Sum of the counters of the removed workers
};
225 
// A set of workers belonging to a test.
// The object's mutex is taken when the worker list and count are changed
class NTWorkerContainer : public Mutex, public GenObject, public DebugEnabler
{
    friend class SelectThread;
public:
    // Constructor
    // test: The test owning this set
    // threads: The number of threads (or sockets when select is used) of this set
    // id: Debug name
    NTWorkerContainer(NTTest* test, unsigned int threads, const char* id);
    // Retrieve the test owning this set
    inline NTTest* test()
	{ return m_test; }
    // Stop the current workers and create new ones.
    // port: First port to use, advanced with each port given to a worker
    void start(int& port);
    // Cancel all workers and wait for them to terminate
    void stop();
    // Append a worker to the list (the list doesn't own it) and notify the test
    void addWorker(NTWorker* worker);
    // Remove a worker from the list without deleting it and notify the test
    void removeWorker(NTWorker* worker);
private:
    String m_id;                         // Debug name
    NTTest* m_test;                      // The test owning this set
    unsigned int m_workerCount;          // Current number of workers
    unsigned int m_threads;              // Configured number of threads/sockets
    ObjList m_workers;                   // The workers (not owned)
};
244 
// Base class of the worker threads.
// A worker adds itself to its container when built and removes itself when destroyed
class NTWorker : public Thread, public GenObject
{
public:
    // Constructor
    // container: The set owning this worker
    // port: Remote port when sending, local port when receiving
    // name: Thread name
    NTWorker(NTWorkerContainer* container, int port, const char* name = "NTWorker");
    // Destructor. Terminate the socket and remove the worker from its container
    ~NTWorker();
    // Retrieve the worker's counters
    const Statistics& counters() const
	{ return m_counters; }
protected:
    // Create a datagram socket, bind it when receiving and set it non blocking.
    // The worker's own socket and address are used if sock is 0
    bool initSocket(Socket* sock = 0, SocketAddr* addr = 0);
protected:
    NTWorkerContainer* m_container;      // The set owning this worker
    NTTest* m_test;                      // The test (taken from container)
    u_int64_t m_timeToDie;               // Time to terminate in milliseconds, 0 if not used
    Socket m_socket;                     // The worker's socket
    SocketAddr m_addr;                   // Remote (send) or local (receive) address
    Statistics m_counters;               // The worker's counters
};
262 
// Worker sending packets to the remote address at the test's interval
class NTWriter : public NTWorker
{
public:
    // Constructor
    // container: The set owning this worker
    // port: Remote port to send to
    inline NTWriter(NTWorkerContainer* container, int port)
	: NTWorker(container,port),
	m_timeToSend(0)
	{}
    // Thread's entry point: the send loop
    virtual void run();
private:
    u_int64_t m_timeToSend;              // Time to send the next packet in milliseconds
};
274 
// Worker receiving packets on its own socket
class NTReader : public NTWorker
{
public:
    // Constructor
    // container: The set owning this worker
    // port: Local port to bind on
    inline NTReader(NTWorkerContainer* container, int port)
	: NTWorker(container,port)
	{}
    // Thread's entry point: the receive loop
    virtual void run();
};
283 
// Worker receiving packets on several sockets using select()
class NTSelectReader : public NTWorker
{
public:
    // Constructor. Create and bind the sockets
    // container: The set owning this worker
    // port: First local port to bind on, advanced by one for each socket
    // count: The number of sockets to create
    NTSelectReader(NTWorkerContainer* container, int& port, unsigned int count);
    // Destructor. Terminate and release the sockets
    virtual ~NTSelectReader();
    // Thread's entry point: the select/receive loop
    virtual void run();
private:
    Socket* m_sockets;                   // Array of m_count sockets, 0 if none could be initialized
    unsigned int m_count;                // The number of sockets in array
};
294 
// The module: loads the configuration and runs the test
class NTPlugin : public Module
{
public:
    NTPlugin();
    virtual ~NTPlugin();
    // (Re)load the configuration and restart the test
    virtual void initialize();
    // Message handler: release the test on engine halt
    virtual bool received(Message& msg, int id);
private:
    bool m_first;                        // True until the first initialize() call
};
305 
// Static data
static DataBlock s_stopPattern;          // Packet sent by writers to signal the end of the test
static NTTest* s_test = 0;               // The current test
// Config: values set from the 'general' configuration section when the module is initialized
static String s_localip;                 // Default local IP address
static unsigned int s_packetLen = 320;   // Default packet length in bytes (kept between 16 and 1400)
static unsigned int s_interval = 20;     // Default send interval in milliseconds (kept between 1 and 120)
static unsigned int s_lifetime = 60;     // Default test lifetime in seconds
static unsigned long s_sleep = 2;        // Workers' idle sleep in milliseconds (kept between 1 and 10)
// Plugin
static NTPlugin plugin;
317 
318 
319 /**
320  * FDSetSelect
321  */
322 // Constructor
FDSetSelect()323 FDSetSelect::FDSetSelect()
324     : m_read(new PrivateFDSet),
325     m_write(new PrivateFDSet),
326     m_event(new PrivateFDSet),
327     m_crtR(0),
328     m_crtW(0),
329     m_crtE(0),
330     m_maxHandle(Socket::invalidHandle()),
331     m_selectError(false)
332 {
333 }
334 
335 // Release private data
~FDSetSelect()336 FDSetSelect::~FDSetSelect()
337 {
338     delete m_read;
339     delete m_write;
340     delete m_event;
341 }
342 
343 // Check if data is available for read
canRead(int handle) const344 bool FDSetSelect::canRead(int handle) const
345 {
346     return m_read->isset(handle);
347 }
348 
349 // Check if a file descriptor can be used to write data
canWrite(int handle) const350 bool FDSetSelect::canWrite(int handle) const
351 {
352     return m_write->isset(handle);
353 }
354 
355 
356 // Check if there is a pending event for a given file descriptor
hasEvent(int handle) const357 bool FDSetSelect::hasEvent(int handle) const
358 {
359     return m_event->isset(handle);
360 }
361 
362 // Append a file descriptor to read, write and/or event set.
363 // Return false if handle is invalid or target set is missing (all flags are false)
add(int handle,bool read,bool write,bool event)364 bool FDSetSelect::add(int handle, bool read, bool write, bool event)
365 {
366     if (!(read || write || event) || handle == Socket::invalidHandle() ||
367 	!Socket::canSelect(handle))
368 	return false;
369     if (read) {
370 	m_read->add(handle);
371 	m_crtR = m_read;
372     }
373     if (write) {
374 	m_write->add(handle);
375 	m_crtW = m_write;
376     }
377     if (event) {
378 	m_event->add(handle);
379 	m_crtE = m_event;
380     }
381     if (m_maxHandle < handle)
382 	m_maxHandle = handle;
383     return true;
384 }
385 
386 // Reset all file descriptor sets
reset()387 void FDSetSelect::reset()
388 {
389     m_read->reset();
390     m_write->reset();
391     m_event->reset();
392     m_crtR = m_crtW = m_crtE = 0;
393     m_maxHandle = Socket::invalidHandle();
394 }
395 
396 // Start waiting for a file descriptor state change
select(unsigned int uSec)397 int FDSetSelect::select(unsigned int uSec)
398 {
399     if (m_maxHandle == Socket::invalidHandle())
400 	return 0;
401     struct timeval t;
402     t.tv_sec = 0;
403     t.tv_usec = (uSec > 0) ? uSec : 0;
404     m_selectError = false;
405     int result = ::select(m_maxHandle+1,&m_crtR->set,&m_crtW->set,&m_crtE->set,&t);
406     if (result >= 0) {
407 	XDebug(DebugAll,"FDSetSelect got %d handlers [%p]",result,this);
408 	return result;
409     }
410     bool canRetry = (errno == EAGAIN || errno == EINTR || errno == EBADF);
411     if (!(canRetry || m_selectError)) {
412 	Debug(DebugWarn,"FDSetSelect failed: %d: %s [%p]",errno,::strerror(errno),this);
413 	m_selectError = true;
414     }
415     return -1;
416 }
417 
418 
419 /**
420  * Statistics
421  */
output(String & dest)422 void Statistics::output(String& dest)
423 {
424     dest << "=================================================================";
425 #define stat_set64(text,val) \
426     ::sprintf(buf,FMT64,val); \
427     dest << "\r\n" << text << buf;
428     char buf[128];
429     stat_set64("Packets:           ",packets);
430     stat_set64("Total (bytes):     ",totalBytes);
431     stat_set64("Errors:            ",errors);
432     stat_set64("Lost (bytes):      ",lostBytes);
433     dest <<"\r\nStopped:           " << stopped;
434     u_int64_t stop = msStop ? msStop : Time::msecNow();
435     u_int64_t lenMsec = stop - msStart;
436     u_int64_t lenSec = lenMsec / 1000;
437     if (!lenSec)
438 	lenSec = 1;
439     stat_set64("Test length (ms):  ",lenMsec);
440     stat_set64("Ratio (Mb/s):      ",totalBytes / lenSec * 8 / 1000000);
441     stat_set64("Ratio (packets/s): ",packets/lenSec);
442 #undef stat_set64
443     dest << "\r\n=================================================================";
444 }
445 
446 
447 /**
448  * NTTest
449  */
NTTest(const char * name)450 NTTest::NTTest(const char* name)
451     : Mutex(true),
452     m_mutex(true),
453     m_port(0),
454     m_threads(0),
455     m_send(true),
456     m_packetLen(0),
457     m_interval(0),
458     m_lifetime(0),
459     m_packetCount(0),
460     m_workerCount(0),
461     m_selectTimeout(-1)
462 {
463     debugChain(&plugin);
464     m_id << plugin.debugName() << "/" << name;
465     debugName(m_id);
466 }
467 
init(NamedList & params)468 bool NTTest::init(NamedList& params)
469 {
470     Lock2 lock(*this,m_mutex);
471     stop();
472 
473     m_localip = params.getValue("localip",s_localip);
474     if (m_localip.null()) {
475 	Debug(this,DebugNote,"Empty localip in section '%s'",debugName());
476 	return false;
477     }
478     m_remoteip = params.getValue("remoteip");
479     if (m_remoteip.null()) {
480 	Debug(this,DebugNote,"Empty remoteip in section '%s'",debugName());
481 	return false;
482     }
483 
484     String tmp = params.getValue("port");
485     m_port = tmp.toInteger(0);
486     if (!m_port) {
487 	Debug(this,DebugNote,"Invalid port=%s in section '%s'",
488 	    tmp.c_str(),debugName());
489 	return false;
490     }
491 
492     m_threads = params.getIntValue("threads",1);
493     if (m_threads < 1)
494 	m_threads = 1;
495 
496     m_send = params.getBoolValue("send",true);
497 
498     m_packetLen = params.getIntValue("packetlen",s_packetLen);
499     if (m_packetLen < 16)
500 	m_packetLen = 16;
501     else if (m_packetLen > 1400)
502 	m_packetLen = 1400;
503     m_interval = params.getIntValue("interval",s_interval);
504     if (m_interval < 1)
505 	m_interval = 1;
506     else if (m_interval > 120)
507 	m_interval = 120;
508     m_lifetime = params.getIntValue("lifetime",s_lifetime);
509     bool sendAllPackets = params.getBoolValue("sendallpackets",true);
510     if (sendAllPackets)
511 	m_packetCount = m_lifetime * 1000 / m_interval;
512     else
513 	m_packetCount = 0;
514     m_selectTimeout = params.getIntValue("select-timeout",-1);
515 
516     m_containers.clear();
517     int workersets = params.getIntValue("workersets",1);
518     if (workersets < 1 || (unsigned int)workersets > m_threads)
519 	workersets = 1;
520     unsigned int nFull = 0;
521     unsigned int nRest = 0;
522     if (workersets == 1)
523 	nRest = m_threads;
524     else {
525 	nFull = m_threads / (workersets - 1);
526 	nRest = m_threads - nFull * (workersets - 1);
527     }
528     for (int i = 1; i <= workersets; i++) {
529 	String id;
530 	id << m_id << "/" << i;
531 	if (i < workersets)
532 	    m_containers.append(new NTWorkerContainer(this,nFull,id));
533 	else
534 	    m_containers.append(new NTWorkerContainer(this,nRest,id));
535     }
536 
537     unsigned int sock = m_threads;
538     if (m_selectTimeout >= 0)
539 	m_threads = workersets;
540 
541     tmp = "";
542     tmp << "\r\nAction:         " << (m_send ? "send" : "recv");
543     if (m_selectTimeout >= 0)
544 	tmp << "\r\nSockets:        " << sock;
545     else
546 	tmp << "\r\nThreads:        " << m_threads;
547     tmp << "\r\nLocal address:  " << m_localip;
548     tmp << "\r\nRemote address: " << m_remoteip;
549     tmp << "\r\nPort:           " << m_port;
550     tmp << "\r\nPacket length:  " << m_packetLen;
551     tmp << "\r\nPackets:        " << m_packetCount;
552     tmp << "\r\nInterval:       " << m_interval << "ms";
553     tmp << "\r\nLifetime:       " << m_lifetime << "s";
554     tmp << "\r\nWorker sets:    " << workersets;
555     tmp << "\r\nSelect timeout: " << m_selectTimeout << (m_selectTimeout < 0 ? " (not used)" : "us");
556     Debug(this,DebugInfo,"Initialized:%s",tmp.c_str());
557     return true;
558 }
559 
start()560 void NTTest::start()
561 {
562     Lock lock(m_mutex);
563     stop();
564     DDebug(this,DebugAll,"Starting");
565     m_localStats.reset();
566     int port = m_port;
567     for (ObjList* o = m_containers.skipNull(); o; o = o->skipNext())
568 	(static_cast<NTWorkerContainer*>(o->get()))->start(port);
569 }
570 
stop()571 void NTTest::stop()
572 {
573     Lock lock(m_mutex);
574     DDebug(this,DebugAll,"Stopping %u workers",m_workerCount);
575     for (ObjList* o = m_containers.skipNull(); o; o = o->skipNext())
576 	(static_cast<NTWorkerContainer*>(o->get()))->stop();
577 }
578 
addWorker()579 void NTTest::addWorker()
580 {
581     Lock lock(this);
582     m_workerCount++;
583     if (m_workerCount == m_threads)
584 	Debug(this,DebugAll,"Created %u workers",m_workerCount);
585 }
586 
removeWorker(NTWorker * worker)587 void NTTest::removeWorker(NTWorker* worker)
588 {
589     Lock lock(this);
590     if (!(worker && m_workerCount))
591 	return;
592     m_localStats += worker->counters();
593     m_workerCount--;
594     if (m_workerCount)
595 	return;
596     lock.drop();
597     m_localStats.msStop = Time::msecNow();
598     String tmp;
599     m_localStats.output(tmp);
600     Debug(this,DebugInfo,"No more workers. Local statistics:\r\n%s",tmp.c_str());
601 }
602 
603 
604 /**
605  * NTWorkerContainer
606  */
NTWorkerContainer(NTTest * test,unsigned int threads,const char * id)607 NTWorkerContainer::NTWorkerContainer(NTTest* test, unsigned int threads, const char* id)
608     : Mutex(true),
609     m_id(id),
610     m_test(test),
611     m_workerCount(0),
612     m_threads(threads)
613 {
614     debugName(m_id);
615     debugChain(m_test);
616 }
617 
start(int & port)618 void NTWorkerContainer::start(int& port)
619 {
620     Lock lock(this);
621     stop();
622     if (!m_test)
623 	return;
624     lock.drop();
625     DDebug(this,DebugAll,"Starting");
626     if (m_test->selectTimeout() >= 0)
627 	(new NTSelectReader(this,port,m_threads))->startup();
628     else
629 	for (unsigned int i = 0; i < m_threads; i++, port++)
630 	    if (m_test->send())
631 		(new NTWriter(this,port))->startup();
632 	    else
633 		(new NTReader(this,port))->startup();
634 }
635 
stop()636 void NTWorkerContainer::stop()
637 {
638     Lock l(this);
639     DDebug(this,DebugAll,"Stopping %u workers",m_workerCount);
640     if (!m_workerCount)
641 	return;
642     if (m_workerCount) {
643 	ListIterator iterw(m_workers);
644 	for (GenObject* o = 0; 0 != (o = iterw.get());)
645 	    (static_cast<NTWorker*>(o))->cancel(false);
646     }
647     l.drop();
648     while (m_workerCount)
649 	Thread::yield();
650     DDebug(this,DebugAll,"Stopped");
651 }
652 
addWorker(NTWorker * worker)653 void NTWorkerContainer::addWorker(NTWorker* worker)
654 {
655     Lock lock(this);
656     if (!worker)
657 	return;
658     ObjList* obj = m_workers.append(worker);
659     if (!obj)
660 	return;
661     m_workerCount++;
662     obj->setDelete(false);
663     if (m_workerCount >= m_threads)
664 	DDebug(this,DebugAll,"Created %u workers",m_workerCount);
665     lock.drop();
666     if (m_test)
667 	m_test->addWorker();
668 }
669 
removeWorker(NTWorker * worker)670 void NTWorkerContainer::removeWorker(NTWorker* worker)
671 {
672     Lock lock(this);
673     if (!(worker && m_workerCount))
674 	return;
675     m_workers.remove(worker,false);
676     if (m_workerCount)
677 	m_workerCount--;
678     if (!m_workerCount)
679 	DDebug(this,DebugAll,"No more workers");
680     lock.drop();
681     if (m_test)
682 	m_test->removeWorker(worker);
683 }
684 
685 
686 /**
687  * NTWorker
688  */
NTWorker(NTWorkerContainer * container,int port,const char * name)689 NTWorker::NTWorker(NTWorkerContainer* container, int port, const char* name)
690     : Thread(name),
691     m_container(container),
692     m_test(container ? container->test() : 0),
693     m_timeToDie(0),
694     m_addr(AF_INET)
695 {
696     if (!(m_container && m_test))
697 	return;
698     container->addWorker(this);
699     m_addr.host(m_test->send() ? m_test->remoteip() : m_test->localip());
700     m_addr.port(port);
701     if (!m_test->packetCount() && m_test->lifetime())
702 	m_timeToDie = Time::msecNow() + m_test->lifetime() * 1000;
703 }
704 
~NTWorker()705 NTWorker::~NTWorker()
706 {
707     if (m_socket.valid()) {
708 	m_socket.setLinger(-1);
709 	m_socket.terminate();
710     }
711     if (m_container)
712 	m_container->removeWorker(this);
713 }
714 
initSocket(Socket * sock,SocketAddr * addr)715 bool NTWorker::initSocket(Socket* sock, SocketAddr* addr)
716 {
717     if (!(m_container && m_test))
718 	return false;
719     if (!sock) {
720 	sock = &m_socket;
721 	addr = &m_addr;
722     }
723     if (!sock->create(addr->family(),SOCK_DGRAM)) {
724 	Debug(m_container,DebugNote,"Failed to create socket: %d '%s' [%p]",
725 	    sock->error(),::strerror(sock->error()),this);
726 	return false;
727     }
728     if (!m_test->send() && !sock->bind(*addr)) {
729 	Debug(m_container,DebugNote,"Failed to bind socket on port %d: %d '%s' [%p]",
730 	    addr->port(),sock->error(),::strerror(sock->error()),this);
731 	return false;
732     }
733     sock->setBlocking(false);
734     return true;
735 }
736 
737 
738 /**
739  * NTWriter
740  */
run()741 void NTWriter::run()
742 {
743     if (!initSocket())
744 	return;
745     unsigned char buf[m_test->packetLen()];
746     buf[0] = 1;
747     while (true) {
748 	u_int64_t now = Time::msecNow();
749 	if (now < m_timeToSend) {
750 	    Thread::msleep(s_sleep,true);
751 	    continue;
752 	}
753 
754 	bool die = false;
755 	if (m_test->packetCount())
756 	    die = m_counters.packets >= m_test->packetCount();
757 	else
758 	    die = m_timeToDie && now > m_timeToDie;
759 	if (die) {
760 	    Thread::msleep(5,true);
761 	    if (0 < m_socket.sendTo(s_stopPattern.data(),s_stopPattern.length(),m_addr))
762 		m_counters.stopped = 1;
763 	    break;
764 	}
765 
766 	Thread::check(true);
767 	m_timeToSend = now + m_test->interval();
768 	int w = m_socket.sendTo(buf,m_test->packetLen(),m_addr);
769 	if (w != m_socket.socketError() || m_socket.canRetry()) {
770 	    if (w == m_socket.socketError())
771 		continue;
772 	    if (w)
773 		m_counters.success(w);
774 	    if ((unsigned int)w < m_test->packetLen())
775 		m_counters.failure(m_test->packetLen() - w);
776 	    continue;
777 	}
778 	Debug(m_container,DebugNote,"SEND error dest='%s:%d': %d '%s' [%p]",
779 	    m_addr.host().c_str(),m_addr.port(),
780 	    m_socket.error(),::strerror(m_socket.error()),this);
781 	m_counters.failure(m_test->packetLen());
782     }
783 }
784 
785 
786 /**
787  * NTReader
788  */
run()789 void NTReader::run()
790 {
791     if (!initSocket())
792 	return;
793     unsigned char buf[m_test->packetLen()];
794     SocketAddr addr;
795     while (true) {
796 	if (m_timeToDie && (Time::msecNow() > m_timeToDie))
797 	    break;
798 	Thread::msleep(s_sleep,true);
799 	int r = m_socket.recvFrom(buf,m_test->packetLen(),addr);
800 	if (r > 0) {
801 	    if (buf[0] == 0) {
802 		m_counters.stopped = 1;
803 		break;
804 	    }
805 	    m_counters.success(r);
806 	    continue;
807 	}
808 	if (r == 0 || (r == m_socket.socketError() && m_socket.canRetry()))
809 	    continue;
810 	Debug(m_container,DebugNote,"RECV error src='%s:%d': %d '%s' [%p]",
811 	    addr.host().c_str(),addr.port(),
812 	    m_socket.error(),::strerror(m_socket.error()),this);
813 	m_counters.failure(0);
814     }
815 }
816 
817 
818 /**
819  * NTSelectReader
820  */
NTSelectReader(NTWorkerContainer * container,int & port,unsigned int count)821 NTSelectReader::NTSelectReader(NTWorkerContainer* container, int& port, unsigned int count)
822     : NTWorker(container,0,"NTSelectReader"),
823     m_sockets(0),
824     m_count(count)
825 {
826     DDebug(container,DebugAll,"NTSelectReader sockets=%u",count);
827     m_sockets = new Socket[m_count];
828     unsigned int ok = 0;
829     for (unsigned int i = 0; i < count; i++, port++) {
830 	SocketAddr addr(AF_INET);
831 	addr.host(m_test->localip());
832 	addr.port(port);
833 	if (initSocket(&m_sockets[i],&addr))
834 	    ok++;
835     }
836     if (!ok) {
837 	Debug(container,DebugNote,"NTSelectReader: Bind or create failed for all sockets");
838 	delete[] m_sockets;
839 	m_sockets = 0;
840 	m_count = 0;
841     }
842 }
843 
~NTSelectReader()844 NTSelectReader::~NTSelectReader()
845 {
846     if (!m_sockets)
847 	return;
848     for (unsigned int i = 0; i < m_count; i++)
849 	if (m_sockets[i].valid()) {
850 	    m_sockets[i].setLinger(-1);
851 	    m_sockets[i].terminate();
852 	}
853     delete[] m_sockets;
854 }
855 
run()856 void NTSelectReader::run()
857 {
858     if (!m_count || !m_test || m_test->send())
859 	return;
860     DDebug(m_container,DebugAll,"Select reader worker started");
861     unsigned char buf[m_test->packetLen()];
862     SocketAddr addr;
863     int ok = 0;
864     FDSetSelect set;
865     while (true) {
866 	if (m_counters.stopped == m_count)
867 	    break;
868 	if (m_timeToDie && (Time::msecNow() > m_timeToDie))
869 	    break;
870 	set.reset();
871 	for (unsigned int i = 0; i < m_count; i++)
872 	    set.add(m_sockets[i].handle(),true,false,false);
873 	ok = set.select(m_container->test()->selectTimeout());
874 	if (ok <= 0) {
875 	    if (!m_test->selectTimeout())
876 		Thread::msleep(1,true);
877 	    continue;
878 	}
879 	for (unsigned int i = 0; i < m_count; i++) {
880 	    if (!(m_sockets[i].valid() && set.canRead(m_sockets[i].handle())))
881 		continue;
882 	    int r = m_sockets[i].recvFrom(buf,m_test->packetLen(),addr);
883 	    if (r > 0) {
884 		if (buf[0]) {
885 		    if ((unsigned int)r != m_test->packetLen())
886 			Debug(m_container,DebugMild,"RECV %u expected=%u [%p]",
887 			    r,m_test->packetLen(),this);
888 		    m_counters.success(r);
889 		}
890 		else {
891 		    m_sockets[i].setLinger(-1);
892 		    m_sockets[i].terminate();
893 		    m_counters.stopped++;
894 		}
895 		continue;
896 	    }
897 	    m_counters.failure(0);
898 	    if (r == 0 || (r == m_sockets[i].socketError() && m_sockets[i].canRetry()))
899 		continue;
900 	    Debug(m_container,DebugNote,"RECV error src='%s:%d': %d '%s' [%p]",
901 		addr.host().c_str(),addr.port(),
902 		m_sockets[i].error(),::strerror(m_sockets[i].error()),this);
903 	}
904     }
905 }
906 
907 
908 /**
909  * Plugin
910  */
NTPlugin()911 NTPlugin::NTPlugin()
912     : Module("nettest","misc"), m_first(true)
913 {
914     Output("Loaded module Network Test");
915 }
916 
~NTPlugin()917 NTPlugin::~NTPlugin()
918 {
919     Output("Unloading module Network Test");
920 }
921 
initialize()922 void NTPlugin::initialize()
923 {
924     Output("Initializing module Network Test");
925 
926     debugLevel(10);
927 
928     if (m_first) {
929 	m_first = false;
930 	setup();
931 	installRelay(Halt);
932     }
933 
934     // Reset statistics
935     lock();
936     TelEngine::destruct(s_test);
937 
938     // Get new values from config
939     Configuration cfg(Engine::configFile("nettest"));
940     NamedList* general = cfg.getSection("general");
941     NamedList dummy("");
942     if (!general)
943 	general = &dummy;
944     s_localip = general->getValue("localip");
945     s_packetLen = general->getIntValue("packetlen",320);
946     if (s_packetLen < 16)
947 	s_packetLen = 16;
948     else if (s_packetLen > 1400)
949 	s_packetLen = 1400;
950     s_interval = general->getIntValue("interval",20);
951     if (s_interval < 1)
952 	s_interval = 1;
953     else if (s_interval > 120)
954 	s_interval = 120;
955     s_lifetime = general->getIntValue("lifetime",60);
956     s_sleep = cfg.getIntValue("general","sleep",2);
957     if (s_sleep < 1)
958 	s_sleep = 1;
959     else if (s_sleep > 10)
960 	s_sleep = 10;
961     s_stopPattern.assign(0,s_packetLen);
962 
963     Debug(this,DebugInfo,
964 	"Init: localip=%s packet=%u interval=%ums lifetime=%us",
965 	s_localip.c_str(),s_packetLen,s_interval,s_lifetime);
966 
967     unsigned int n = cfg.sections();
968     for (unsigned int i = 0; i < n; i++) {
969 	NamedList* sect = cfg.getSection(i);
970 	if (!sect || sect->null() || *sect == "general")
971 	    continue;
972 
973 	s_test = new NTTest(*sect);
974 	if (!s_test->init(*sect)) {
975 	    Debug(this,DebugNote,"Failed to init test from section '%s'",sect->c_str());
976 	    TelEngine::destruct(s_test);
977 	    continue;
978 	}
979 
980 	s_test->start();
981 	break;
982     }
983 
984     unlock();
985 }
986 
received(Message & msg,int id)987 bool NTPlugin::received(Message& msg, int id)
988 {
989     if (id == Halt)
990 	TelEngine::destruct(s_test);
991     return Module::received(msg,id);
992 }
993 
994 }; // anonymous namespace
995 
996 /* vi: set ts=8 sw=4 sts=4 noet: */
997