1 /**
2 * nettest.cpp
3 * This file is part of the YATE Project http://YATE.null.ro
4 *
5 * Network and socket performance test module
6 *
7 * Yet Another Telephony Engine - a fully featured software PBX and IVR
8 * Copyright (C) 2004-2014 Null Team
9 * Author: Marian Podgoreanu
10 *
11 * This software is distributed under multiple licenses;
12 * see the COPYING file in the main directory for licensing
13 * information for this specific distribution.
14 *
15 * This use of this software may be subject to additional restrictions.
16 * See the LEGAL file in the main directory for details.
17 *
18 * This program is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
21 */
22
23 #include <yatephone.h>
24
25 #include <string.h>
26 #include <stdio.h>
27
28 #include <sys/time.h>
29 #include <sys/types.h>
30 #include <unistd.h>
31
32 using namespace TelEngine;
33 namespace { // anonymous
34
35 class Statistics; //
36 class NTTest; //
37 class NTWorkerContainer; // A set of workers
38 class NTWorker; //
39 class NTWriter; //
40 class NTReader; //
41 class NTSelectReader; //
42 class NTPlugin; // The module
43
44 /**
45 * This class holds an fd_set
46 * @short MultiSelect private data
47 */
48 class PrivateFDSet
49 {
50 public:
isset(int handle)51 inline bool isset(int handle)
52 { return 0 != FD_ISSET(handle,&set); }
add(int handle)53 inline void add(int handle)
54 { FD_SET(handle,&set); }
reset()55 inline void reset()
56 { FD_ZERO(&set); }
57 fd_set set;
58 };
59
60 /**
61 * This class encapsulates a select for a set of file descriptors.
62 * File descriptors can be appended to wait for data to be read or write or
63 * wait for an exception to occur
64 * @short A multiple file descriptor select
65 */
66 class FDSetSelect
67 {
68 public:
69 /**
70 * Constructor
71 */
72 FDSetSelect();
73
74 /**
75 * Destructor. Release private data
76 */
77 ~FDSetSelect();
78
79 /**
80 * Check if data is available for read
81 * This method should be called after @ref select() returns
82 * @param handle File descriptor to check
83 * @return True if there is any data available for the given file descriptor
84 */
85 bool canRead(int handle) const;
86
87 /**
88 * Check if a file descriptor can be used to write data
89 * This method should be called after @ref select() returns
90 * @param handle File descriptor to check
91 * @return True if data can be written using the given file descriptor
92 */
93 bool canWrite(int handle) const;
94
95 /**
96 * Check if there is a pending event for a given file descriptor
97 * This method should be called after @ref select() returns
98 * @param handle File descriptor to check
99 * @return True if there is a pending event for the given file descriptor
100 */
101 bool hasEvent(int handle) const;
102
103 /**
104 * Append a file descriptor to read, write and/or event set.
105 * This method shouldn't be called while in select
106 * @param handle File descriptor to append
107 * @param read True to append to the read set (wait to receive data)
108 * @param write True to append to the write set (check if the handle can be used to write data)
109 * @param event True to append to the event set (check exceptions)
110 * @return False if handle is invalid or target set is missing (all flags are false)
111 */
112 bool add(int handle, bool read, bool write, bool event);
113
114 /**
115 * Reset all file descriptor sets.
116 * This method shouldn't be called while in select
117 */
118 void reset();
119
120 /**
121 * Start waiting for a file descriptor state change
122 * @param uSec The select timeout in microseconds (can be 0 to wait until a file descriptor get set)
123 * @return The number of file descriptors whose state changed or negative on error
124 */
125 int select(unsigned int uSec);
126
127 private:
128 PrivateFDSet* m_read; // Read set
129 PrivateFDSet* m_write; // Write set
130 PrivateFDSet* m_event; // Event set
131 PrivateFDSet* m_crtR; // Current event set for read/write/events
132 PrivateFDSet* m_crtW; //
133 PrivateFDSet* m_crtE; //
134 int m_maxHandle; // Maximum handle value in current set(s)
135 bool m_selectError; // Flag used to output errors
136 };
137
138 // Statistics class
139 class Statistics
140 {
141 public:
Statistics()142 inline Statistics()
143 { reset(); }
reset()144 inline void reset() {
145 msStart = Time::msecNow();
146 msStop = 0;
147 packets = totalBytes = errors = lostBytes = 0;
148 stopped = 0;
149 }
success(unsigned int bytes)150 inline void success(unsigned int bytes) {
151 packets++;
152 totalBytes += bytes;
153 }
failure(unsigned int bytes)154 inline void failure(unsigned int bytes) {
155 packets++;
156 errors++;
157 lostBytes += bytes;
158 }
operator +=(const Statistics & src)159 inline Statistics& operator+=(const Statistics& src) {
160 packets += src.packets;
161 totalBytes += src.totalBytes;
162 errors += src.errors;
163 lostBytes += src.lostBytes;
164 stopped += src.stopped;
165 return *this;
166 }
167
168 void output(String& dest);
169
170 u_int64_t msStart;
171 u_int64_t msStop;
172 u_int64_t packets;
173 u_int64_t totalBytes;
174 u_int64_t errors;
175 u_int64_t lostBytes;
176 unsigned int stopped;
177 };
178
179 class NTTest : public Mutex, public GenObject, public DebugEnabler
180 {
181 public:
182 NTTest(const char* name);
~NTTest()183 virtual ~NTTest()
184 { stop(); }
destruct()185 virtual void destruct()
186 { stop(); GenObject::destruct(); }
send() const187 inline bool send() const
188 { return m_send; }
localip() const189 inline const String& localip() const
190 { return m_localip; }
remoteip() const191 inline const String& remoteip() const
192 { return m_remoteip; }
packetLen() const193 inline unsigned int packetLen() const
194 { return m_packetLen; }
interval() const195 inline unsigned int interval() const
196 { return m_interval; }
lifetime() const197 inline unsigned int lifetime() const
198 { return m_lifetime; }
packetCount() const199 inline unsigned int packetCount() const
200 { return m_packetCount; }
selectTimeout() const201 inline int selectTimeout() const
202 { return m_selectTimeout; }
203 bool init(NamedList& params);
204 void start();
205 void stop();
206 void addWorker();
207 void removeWorker(NTWorker* worker);
208 private:
209 Mutex m_mutex;
210 String m_id;
211 String m_localip;
212 String m_remoteip;
213 int m_port;
214 unsigned int m_threads;
215 bool m_send;
216 unsigned int m_packetLen;
217 unsigned int m_interval;
218 unsigned int m_lifetime;
219 unsigned int m_packetCount;
220 ObjList m_containers;
221 unsigned int m_workerCount;
222 int m_selectTimeout;
223 Statistics m_localStats;
224 };
225
226 class NTWorkerContainer : public Mutex, public GenObject, public DebugEnabler
227 {
228 friend class SelectThread;
229 public:
230 NTWorkerContainer(NTTest* test, unsigned int threads, const char* id);
test()231 inline NTTest* test()
232 { return m_test; }
233 void start(int& port);
234 void stop();
235 void addWorker(NTWorker* worker);
236 void removeWorker(NTWorker* worker);
237 private:
238 String m_id;
239 NTTest* m_test;
240 unsigned int m_workerCount;
241 unsigned int m_threads;
242 ObjList m_workers;
243 };
244
245 class NTWorker : public Thread, public GenObject
246 {
247 public:
248 NTWorker(NTWorkerContainer* container, int port, const char* name = "NTWorker");
249 ~NTWorker();
counters() const250 const Statistics& counters() const
251 { return m_counters; }
252 protected:
253 bool initSocket(Socket* sock = 0, SocketAddr* addr = 0);
254 protected:
255 NTWorkerContainer* m_container;
256 NTTest* m_test;
257 u_int64_t m_timeToDie;
258 Socket m_socket;
259 SocketAddr m_addr;
260 Statistics m_counters;
261 };
262
263 class NTWriter : public NTWorker
264 {
265 public:
NTWriter(NTWorkerContainer * container,int port)266 inline NTWriter(NTWorkerContainer* container, int port)
267 : NTWorker(container,port),
268 m_timeToSend(0)
269 {}
270 virtual void run();
271 private:
272 u_int64_t m_timeToSend;
273 };
274
275 class NTReader : public NTWorker
276 {
277 public:
NTReader(NTWorkerContainer * container,int port)278 inline NTReader(NTWorkerContainer* container, int port)
279 : NTWorker(container,port)
280 {}
281 virtual void run();
282 };
283
284 class NTSelectReader : public NTWorker
285 {
286 public:
287 NTSelectReader(NTWorkerContainer* container, int& port, unsigned int count);
288 virtual ~NTSelectReader();
289 virtual void run();
290 private:
291 Socket* m_sockets;
292 unsigned int m_count;
293 };
294
295 class NTPlugin : public Module
296 {
297 public:
298 NTPlugin();
299 virtual ~NTPlugin();
300 virtual void initialize();
301 virtual bool received(Message& msg, int id);
302 private:
303 bool m_first;
304 };
305
306 // Static data
307 static DataBlock s_stopPattern;
308 static NTTest* s_test = 0;
309 // Config
310 static String s_localip;
311 static unsigned int s_packetLen = 320;
312 static unsigned int s_interval = 20;
313 static unsigned int s_lifetime = 60;
314 static unsigned long s_sleep = 2;
315 // Plugin
316 static NTPlugin plugin;
317
318
319 /**
320 * FDSetSelect
321 */
322 // Constructor
FDSetSelect()323 FDSetSelect::FDSetSelect()
324 : m_read(new PrivateFDSet),
325 m_write(new PrivateFDSet),
326 m_event(new PrivateFDSet),
327 m_crtR(0),
328 m_crtW(0),
329 m_crtE(0),
330 m_maxHandle(Socket::invalidHandle()),
331 m_selectError(false)
332 {
333 }
334
335 // Release private data
~FDSetSelect()336 FDSetSelect::~FDSetSelect()
337 {
338 delete m_read;
339 delete m_write;
340 delete m_event;
341 }
342
343 // Check if data is available for read
canRead(int handle) const344 bool FDSetSelect::canRead(int handle) const
345 {
346 return m_read->isset(handle);
347 }
348
349 // Check if a file descriptor can be used to write data
canWrite(int handle) const350 bool FDSetSelect::canWrite(int handle) const
351 {
352 return m_write->isset(handle);
353 }
354
355
356 // Check if there is a pending event for a given file descriptor
hasEvent(int handle) const357 bool FDSetSelect::hasEvent(int handle) const
358 {
359 return m_event->isset(handle);
360 }
361
362 // Append a file descriptor to read, write and/or event set.
363 // Return false if handle is invalid or target set is missing (all flags are false)
add(int handle,bool read,bool write,bool event)364 bool FDSetSelect::add(int handle, bool read, bool write, bool event)
365 {
366 if (!(read || write || event) || handle == Socket::invalidHandle() ||
367 !Socket::canSelect(handle))
368 return false;
369 if (read) {
370 m_read->add(handle);
371 m_crtR = m_read;
372 }
373 if (write) {
374 m_write->add(handle);
375 m_crtW = m_write;
376 }
377 if (event) {
378 m_event->add(handle);
379 m_crtE = m_event;
380 }
381 if (m_maxHandle < handle)
382 m_maxHandle = handle;
383 return true;
384 }
385
386 // Reset all file descriptor sets
reset()387 void FDSetSelect::reset()
388 {
389 m_read->reset();
390 m_write->reset();
391 m_event->reset();
392 m_crtR = m_crtW = m_crtE = 0;
393 m_maxHandle = Socket::invalidHandle();
394 }
395
396 // Start waiting for a file descriptor state change
select(unsigned int uSec)397 int FDSetSelect::select(unsigned int uSec)
398 {
399 if (m_maxHandle == Socket::invalidHandle())
400 return 0;
401 struct timeval t;
402 t.tv_sec = 0;
403 t.tv_usec = (uSec > 0) ? uSec : 0;
404 m_selectError = false;
405 int result = ::select(m_maxHandle+1,&m_crtR->set,&m_crtW->set,&m_crtE->set,&t);
406 if (result >= 0) {
407 XDebug(DebugAll,"FDSetSelect got %d handlers [%p]",result,this);
408 return result;
409 }
410 bool canRetry = (errno == EAGAIN || errno == EINTR || errno == EBADF);
411 if (!(canRetry || m_selectError)) {
412 Debug(DebugWarn,"FDSetSelect failed: %d: %s [%p]",errno,::strerror(errno),this);
413 m_selectError = true;
414 }
415 return -1;
416 }
417
418
419 /**
420 * Statistics
421 */
output(String & dest)422 void Statistics::output(String& dest)
423 {
424 dest << "=================================================================";
425 #define stat_set64(text,val) \
426 ::sprintf(buf,FMT64,val); \
427 dest << "\r\n" << text << buf;
428 char buf[128];
429 stat_set64("Packets: ",packets);
430 stat_set64("Total (bytes): ",totalBytes);
431 stat_set64("Errors: ",errors);
432 stat_set64("Lost (bytes): ",lostBytes);
433 dest <<"\r\nStopped: " << stopped;
434 u_int64_t stop = msStop ? msStop : Time::msecNow();
435 u_int64_t lenMsec = stop - msStart;
436 u_int64_t lenSec = lenMsec / 1000;
437 if (!lenSec)
438 lenSec = 1;
439 stat_set64("Test length (ms): ",lenMsec);
440 stat_set64("Ratio (Mb/s): ",totalBytes / lenSec * 8 / 1000000);
441 stat_set64("Ratio (packets/s): ",packets/lenSec);
442 #undef stat_set64
443 dest << "\r\n=================================================================";
444 }
445
446
447 /**
448 * NTTest
449 */
NTTest(const char * name)450 NTTest::NTTest(const char* name)
451 : Mutex(true),
452 m_mutex(true),
453 m_port(0),
454 m_threads(0),
455 m_send(true),
456 m_packetLen(0),
457 m_interval(0),
458 m_lifetime(0),
459 m_packetCount(0),
460 m_workerCount(0),
461 m_selectTimeout(-1)
462 {
463 debugChain(&plugin);
464 m_id << plugin.debugName() << "/" << name;
465 debugName(m_id);
466 }
467
init(NamedList & params)468 bool NTTest::init(NamedList& params)
469 {
470 Lock2 lock(*this,m_mutex);
471 stop();
472
473 m_localip = params.getValue("localip",s_localip);
474 if (m_localip.null()) {
475 Debug(this,DebugNote,"Empty localip in section '%s'",debugName());
476 return false;
477 }
478 m_remoteip = params.getValue("remoteip");
479 if (m_remoteip.null()) {
480 Debug(this,DebugNote,"Empty remoteip in section '%s'",debugName());
481 return false;
482 }
483
484 String tmp = params.getValue("port");
485 m_port = tmp.toInteger(0);
486 if (!m_port) {
487 Debug(this,DebugNote,"Invalid port=%s in section '%s'",
488 tmp.c_str(),debugName());
489 return false;
490 }
491
492 m_threads = params.getIntValue("threads",1);
493 if (m_threads < 1)
494 m_threads = 1;
495
496 m_send = params.getBoolValue("send",true);
497
498 m_packetLen = params.getIntValue("packetlen",s_packetLen);
499 if (m_packetLen < 16)
500 m_packetLen = 16;
501 else if (m_packetLen > 1400)
502 m_packetLen = 1400;
503 m_interval = params.getIntValue("interval",s_interval);
504 if (m_interval < 1)
505 m_interval = 1;
506 else if (m_interval > 120)
507 m_interval = 120;
508 m_lifetime = params.getIntValue("lifetime",s_lifetime);
509 bool sendAllPackets = params.getBoolValue("sendallpackets",true);
510 if (sendAllPackets)
511 m_packetCount = m_lifetime * 1000 / m_interval;
512 else
513 m_packetCount = 0;
514 m_selectTimeout = params.getIntValue("select-timeout",-1);
515
516 m_containers.clear();
517 int workersets = params.getIntValue("workersets",1);
518 if (workersets < 1 || (unsigned int)workersets > m_threads)
519 workersets = 1;
520 unsigned int nFull = 0;
521 unsigned int nRest = 0;
522 if (workersets == 1)
523 nRest = m_threads;
524 else {
525 nFull = m_threads / (workersets - 1);
526 nRest = m_threads - nFull * (workersets - 1);
527 }
528 for (int i = 1; i <= workersets; i++) {
529 String id;
530 id << m_id << "/" << i;
531 if (i < workersets)
532 m_containers.append(new NTWorkerContainer(this,nFull,id));
533 else
534 m_containers.append(new NTWorkerContainer(this,nRest,id));
535 }
536
537 unsigned int sock = m_threads;
538 if (m_selectTimeout >= 0)
539 m_threads = workersets;
540
541 tmp = "";
542 tmp << "\r\nAction: " << (m_send ? "send" : "recv");
543 if (m_selectTimeout >= 0)
544 tmp << "\r\nSockets: " << sock;
545 else
546 tmp << "\r\nThreads: " << m_threads;
547 tmp << "\r\nLocal address: " << m_localip;
548 tmp << "\r\nRemote address: " << m_remoteip;
549 tmp << "\r\nPort: " << m_port;
550 tmp << "\r\nPacket length: " << m_packetLen;
551 tmp << "\r\nPackets: " << m_packetCount;
552 tmp << "\r\nInterval: " << m_interval << "ms";
553 tmp << "\r\nLifetime: " << m_lifetime << "s";
554 tmp << "\r\nWorker sets: " << workersets;
555 tmp << "\r\nSelect timeout: " << m_selectTimeout << (m_selectTimeout < 0 ? " (not used)" : "us");
556 Debug(this,DebugInfo,"Initialized:%s",tmp.c_str());
557 return true;
558 }
559
start()560 void NTTest::start()
561 {
562 Lock lock(m_mutex);
563 stop();
564 DDebug(this,DebugAll,"Starting");
565 m_localStats.reset();
566 int port = m_port;
567 for (ObjList* o = m_containers.skipNull(); o; o = o->skipNext())
568 (static_cast<NTWorkerContainer*>(o->get()))->start(port);
569 }
570
stop()571 void NTTest::stop()
572 {
573 Lock lock(m_mutex);
574 DDebug(this,DebugAll,"Stopping %u workers",m_workerCount);
575 for (ObjList* o = m_containers.skipNull(); o; o = o->skipNext())
576 (static_cast<NTWorkerContainer*>(o->get()))->stop();
577 }
578
addWorker()579 void NTTest::addWorker()
580 {
581 Lock lock(this);
582 m_workerCount++;
583 if (m_workerCount == m_threads)
584 Debug(this,DebugAll,"Created %u workers",m_workerCount);
585 }
586
removeWorker(NTWorker * worker)587 void NTTest::removeWorker(NTWorker* worker)
588 {
589 Lock lock(this);
590 if (!(worker && m_workerCount))
591 return;
592 m_localStats += worker->counters();
593 m_workerCount--;
594 if (m_workerCount)
595 return;
596 lock.drop();
597 m_localStats.msStop = Time::msecNow();
598 String tmp;
599 m_localStats.output(tmp);
600 Debug(this,DebugInfo,"No more workers. Local statistics:\r\n%s",tmp.c_str());
601 }
602
603
604 /**
605 * NTWorkerContainer
606 */
NTWorkerContainer(NTTest * test,unsigned int threads,const char * id)607 NTWorkerContainer::NTWorkerContainer(NTTest* test, unsigned int threads, const char* id)
608 : Mutex(true),
609 m_id(id),
610 m_test(test),
611 m_workerCount(0),
612 m_threads(threads)
613 {
614 debugName(m_id);
615 debugChain(m_test);
616 }
617
start(int & port)618 void NTWorkerContainer::start(int& port)
619 {
620 Lock lock(this);
621 stop();
622 if (!m_test)
623 return;
624 lock.drop();
625 DDebug(this,DebugAll,"Starting");
626 if (m_test->selectTimeout() >= 0)
627 (new NTSelectReader(this,port,m_threads))->startup();
628 else
629 for (unsigned int i = 0; i < m_threads; i++, port++)
630 if (m_test->send())
631 (new NTWriter(this,port))->startup();
632 else
633 (new NTReader(this,port))->startup();
634 }
635
stop()636 void NTWorkerContainer::stop()
637 {
638 Lock l(this);
639 DDebug(this,DebugAll,"Stopping %u workers",m_workerCount);
640 if (!m_workerCount)
641 return;
642 if (m_workerCount) {
643 ListIterator iterw(m_workers);
644 for (GenObject* o = 0; 0 != (o = iterw.get());)
645 (static_cast<NTWorker*>(o))->cancel(false);
646 }
647 l.drop();
648 while (m_workerCount)
649 Thread::yield();
650 DDebug(this,DebugAll,"Stopped");
651 }
652
addWorker(NTWorker * worker)653 void NTWorkerContainer::addWorker(NTWorker* worker)
654 {
655 Lock lock(this);
656 if (!worker)
657 return;
658 ObjList* obj = m_workers.append(worker);
659 if (!obj)
660 return;
661 m_workerCount++;
662 obj->setDelete(false);
663 if (m_workerCount >= m_threads)
664 DDebug(this,DebugAll,"Created %u workers",m_workerCount);
665 lock.drop();
666 if (m_test)
667 m_test->addWorker();
668 }
669
removeWorker(NTWorker * worker)670 void NTWorkerContainer::removeWorker(NTWorker* worker)
671 {
672 Lock lock(this);
673 if (!(worker && m_workerCount))
674 return;
675 m_workers.remove(worker,false);
676 if (m_workerCount)
677 m_workerCount--;
678 if (!m_workerCount)
679 DDebug(this,DebugAll,"No more workers");
680 lock.drop();
681 if (m_test)
682 m_test->removeWorker(worker);
683 }
684
685
686 /**
687 * NTWorker
688 */
NTWorker(NTWorkerContainer * container,int port,const char * name)689 NTWorker::NTWorker(NTWorkerContainer* container, int port, const char* name)
690 : Thread(name),
691 m_container(container),
692 m_test(container ? container->test() : 0),
693 m_timeToDie(0),
694 m_addr(AF_INET)
695 {
696 if (!(m_container && m_test))
697 return;
698 container->addWorker(this);
699 m_addr.host(m_test->send() ? m_test->remoteip() : m_test->localip());
700 m_addr.port(port);
701 if (!m_test->packetCount() && m_test->lifetime())
702 m_timeToDie = Time::msecNow() + m_test->lifetime() * 1000;
703 }
704
~NTWorker()705 NTWorker::~NTWorker()
706 {
707 if (m_socket.valid()) {
708 m_socket.setLinger(-1);
709 m_socket.terminate();
710 }
711 if (m_container)
712 m_container->removeWorker(this);
713 }
714
initSocket(Socket * sock,SocketAddr * addr)715 bool NTWorker::initSocket(Socket* sock, SocketAddr* addr)
716 {
717 if (!(m_container && m_test))
718 return false;
719 if (!sock) {
720 sock = &m_socket;
721 addr = &m_addr;
722 }
723 if (!sock->create(addr->family(),SOCK_DGRAM)) {
724 Debug(m_container,DebugNote,"Failed to create socket: %d '%s' [%p]",
725 sock->error(),::strerror(sock->error()),this);
726 return false;
727 }
728 if (!m_test->send() && !sock->bind(*addr)) {
729 Debug(m_container,DebugNote,"Failed to bind socket on port %d: %d '%s' [%p]",
730 addr->port(),sock->error(),::strerror(sock->error()),this);
731 return false;
732 }
733 sock->setBlocking(false);
734 return true;
735 }
736
737
738 /**
739 * NTWriter
740 */
run()741 void NTWriter::run()
742 {
743 if (!initSocket())
744 return;
745 unsigned char buf[m_test->packetLen()];
746 buf[0] = 1;
747 while (true) {
748 u_int64_t now = Time::msecNow();
749 if (now < m_timeToSend) {
750 Thread::msleep(s_sleep,true);
751 continue;
752 }
753
754 bool die = false;
755 if (m_test->packetCount())
756 die = m_counters.packets >= m_test->packetCount();
757 else
758 die = m_timeToDie && now > m_timeToDie;
759 if (die) {
760 Thread::msleep(5,true);
761 if (0 < m_socket.sendTo(s_stopPattern.data(),s_stopPattern.length(),m_addr))
762 m_counters.stopped = 1;
763 break;
764 }
765
766 Thread::check(true);
767 m_timeToSend = now + m_test->interval();
768 int w = m_socket.sendTo(buf,m_test->packetLen(),m_addr);
769 if (w != m_socket.socketError() || m_socket.canRetry()) {
770 if (w == m_socket.socketError())
771 continue;
772 if (w)
773 m_counters.success(w);
774 if ((unsigned int)w < m_test->packetLen())
775 m_counters.failure(m_test->packetLen() - w);
776 continue;
777 }
778 Debug(m_container,DebugNote,"SEND error dest='%s:%d': %d '%s' [%p]",
779 m_addr.host().c_str(),m_addr.port(),
780 m_socket.error(),::strerror(m_socket.error()),this);
781 m_counters.failure(m_test->packetLen());
782 }
783 }
784
785
786 /**
787 * NTReader
788 */
run()789 void NTReader::run()
790 {
791 if (!initSocket())
792 return;
793 unsigned char buf[m_test->packetLen()];
794 SocketAddr addr;
795 while (true) {
796 if (m_timeToDie && (Time::msecNow() > m_timeToDie))
797 break;
798 Thread::msleep(s_sleep,true);
799 int r = m_socket.recvFrom(buf,m_test->packetLen(),addr);
800 if (r > 0) {
801 if (buf[0] == 0) {
802 m_counters.stopped = 1;
803 break;
804 }
805 m_counters.success(r);
806 continue;
807 }
808 if (r == 0 || (r == m_socket.socketError() && m_socket.canRetry()))
809 continue;
810 Debug(m_container,DebugNote,"RECV error src='%s:%d': %d '%s' [%p]",
811 addr.host().c_str(),addr.port(),
812 m_socket.error(),::strerror(m_socket.error()),this);
813 m_counters.failure(0);
814 }
815 }
816
817
818 /**
819 * NTSelectReader
820 */
NTSelectReader(NTWorkerContainer * container,int & port,unsigned int count)821 NTSelectReader::NTSelectReader(NTWorkerContainer* container, int& port, unsigned int count)
822 : NTWorker(container,0,"NTSelectReader"),
823 m_sockets(0),
824 m_count(count)
825 {
826 DDebug(container,DebugAll,"NTSelectReader sockets=%u",count);
827 m_sockets = new Socket[m_count];
828 unsigned int ok = 0;
829 for (unsigned int i = 0; i < count; i++, port++) {
830 SocketAddr addr(AF_INET);
831 addr.host(m_test->localip());
832 addr.port(port);
833 if (initSocket(&m_sockets[i],&addr))
834 ok++;
835 }
836 if (!ok) {
837 Debug(container,DebugNote,"NTSelectReader: Bind or create failed for all sockets");
838 delete[] m_sockets;
839 m_sockets = 0;
840 m_count = 0;
841 }
842 }
843
~NTSelectReader()844 NTSelectReader::~NTSelectReader()
845 {
846 if (!m_sockets)
847 return;
848 for (unsigned int i = 0; i < m_count; i++)
849 if (m_sockets[i].valid()) {
850 m_sockets[i].setLinger(-1);
851 m_sockets[i].terminate();
852 }
853 delete[] m_sockets;
854 }
855
run()856 void NTSelectReader::run()
857 {
858 if (!m_count || !m_test || m_test->send())
859 return;
860 DDebug(m_container,DebugAll,"Select reader worker started");
861 unsigned char buf[m_test->packetLen()];
862 SocketAddr addr;
863 int ok = 0;
864 FDSetSelect set;
865 while (true) {
866 if (m_counters.stopped == m_count)
867 break;
868 if (m_timeToDie && (Time::msecNow() > m_timeToDie))
869 break;
870 set.reset();
871 for (unsigned int i = 0; i < m_count; i++)
872 set.add(m_sockets[i].handle(),true,false,false);
873 ok = set.select(m_container->test()->selectTimeout());
874 if (ok <= 0) {
875 if (!m_test->selectTimeout())
876 Thread::msleep(1,true);
877 continue;
878 }
879 for (unsigned int i = 0; i < m_count; i++) {
880 if (!(m_sockets[i].valid() && set.canRead(m_sockets[i].handle())))
881 continue;
882 int r = m_sockets[i].recvFrom(buf,m_test->packetLen(),addr);
883 if (r > 0) {
884 if (buf[0]) {
885 if ((unsigned int)r != m_test->packetLen())
886 Debug(m_container,DebugMild,"RECV %u expected=%u [%p]",
887 r,m_test->packetLen(),this);
888 m_counters.success(r);
889 }
890 else {
891 m_sockets[i].setLinger(-1);
892 m_sockets[i].terminate();
893 m_counters.stopped++;
894 }
895 continue;
896 }
897 m_counters.failure(0);
898 if (r == 0 || (r == m_sockets[i].socketError() && m_sockets[i].canRetry()))
899 continue;
900 Debug(m_container,DebugNote,"RECV error src='%s:%d': %d '%s' [%p]",
901 addr.host().c_str(),addr.port(),
902 m_sockets[i].error(),::strerror(m_sockets[i].error()),this);
903 }
904 }
905 }
906
907
908 /**
909 * Plugin
910 */
NTPlugin()911 NTPlugin::NTPlugin()
912 : Module("nettest","misc"), m_first(true)
913 {
914 Output("Loaded module Network Test");
915 }
916
~NTPlugin()917 NTPlugin::~NTPlugin()
918 {
919 Output("Unloading module Network Test");
920 }
921
initialize()922 void NTPlugin::initialize()
923 {
924 Output("Initializing module Network Test");
925
926 debugLevel(10);
927
928 if (m_first) {
929 m_first = false;
930 setup();
931 installRelay(Halt);
932 }
933
934 // Reset statistics
935 lock();
936 TelEngine::destruct(s_test);
937
938 // Get new values from config
939 Configuration cfg(Engine::configFile("nettest"));
940 NamedList* general = cfg.getSection("general");
941 NamedList dummy("");
942 if (!general)
943 general = &dummy;
944 s_localip = general->getValue("localip");
945 s_packetLen = general->getIntValue("packetlen",320);
946 if (s_packetLen < 16)
947 s_packetLen = 16;
948 else if (s_packetLen > 1400)
949 s_packetLen = 1400;
950 s_interval = general->getIntValue("interval",20);
951 if (s_interval < 1)
952 s_interval = 1;
953 else if (s_interval > 120)
954 s_interval = 120;
955 s_lifetime = general->getIntValue("lifetime",60);
956 s_sleep = cfg.getIntValue("general","sleep",2);
957 if (s_sleep < 1)
958 s_sleep = 1;
959 else if (s_sleep > 10)
960 s_sleep = 10;
961 s_stopPattern.assign(0,s_packetLen);
962
963 Debug(this,DebugInfo,
964 "Init: localip=%s packet=%u interval=%ums lifetime=%us",
965 s_localip.c_str(),s_packetLen,s_interval,s_lifetime);
966
967 unsigned int n = cfg.sections();
968 for (unsigned int i = 0; i < n; i++) {
969 NamedList* sect = cfg.getSection(i);
970 if (!sect || sect->null() || *sect == "general")
971 continue;
972
973 s_test = new NTTest(*sect);
974 if (!s_test->init(*sect)) {
975 Debug(this,DebugNote,"Failed to init test from section '%s'",sect->c_str());
976 TelEngine::destruct(s_test);
977 continue;
978 }
979
980 s_test->start();
981 break;
982 }
983
984 unlock();
985 }
986
received(Message & msg,int id)987 bool NTPlugin::received(Message& msg, int id)
988 {
989 if (id == Halt)
990 TelEngine::destruct(s_test);
991 return Module::received(msg,id);
992 }
993
994 }; // anonymous namespace
995
996 /* vi: set ts=8 sw=4 sts=4 noet: */
997