1 
2 /* Web Polygraph       http://www.web-polygraph.org/
3  * Copyright 2003-2011 The Measurement Factory
4  * Licensed under the Apache License, Version 2.0 */
5 
6 #include "base/polygraph.h"
7 
8 #include "xstd/h/iostream.h"
9 #include "xstd/h/sstream.h"
10 #include "xstd/h/iomanip.h"
11 #include "base/polyVersion.h"
12 
// do not try to compile polymon if curses is not supported
14 #if defined(HAVE_CONFIG_H) && (!defined(HAVE_NCURSES_H) || !defined(HAVE_LIBNCURSES))
main()15 	int main() {
16 		cerr << "polymon could not be built without "
17 			<< "a properly installed ncurses library" << endl;
18 		return -1;
19 	}
20 #else
21 
22 #include "xstd/h/math.h"
23 #include "xstd/h/os_std.h"
24 #include "xstd/h/string.h"
25 #include "xstd/h/signal.h"
26 #include "xstd/h/netinet.h"
27 
28 #include <stdlib.h>
29 
30 #if defined(sun)
31 #define bool BOOL
32 #endif
33 #include <ncurses.h>
34 // cleanup after ncurses.h
35 #if defined(bool)
36 #undef bool
37 #endif
38 #if defined(clear)
39 #undef clear
40 #endif
41 #if defined(timeout)
42 #undef timeout
43 #endif
44 
45 #include "xstd/Assert.h"
46 #include "xstd/Time.h"
47 #include "xstd/Socket.h"
48 #include "xstd/Select.h"
49 #include "xstd/Poll.h"
50 #include "xstd/Epoll.h"
51 #include "xstd/AlarmClock.h"
52 #include "xstd/History.h"
53 #include "base/AggrStat.h"
54 #include "base/polyLogCats.h"
55 #include "runtime/NotifMsg.h"
56 
57 
58 
59 // takes care of the keyboard input
60 class KbdMonitor: public FileScanUser {
61 	public:
62 		KbdMonitor(int aFD);
63 		virtual ~KbdMonitor();
64 
65 		virtual void noteReadReady(int fd);
66 
67 	public:
68 		int theFD;
69 		FileScanReserv theReserv;
70 };
71 
72 // handles incoming messages
73 class MsgMonitor: public FileScanUser {
74 	public:
75 		MsgMonitor(Socket aSock);
76 		virtual ~MsgMonitor();
77 
78 		virtual void noteReadReady(int fd);
79 
80 	public:
81 		Socket theSock;
82 		FileScanReserv theReserv;
83 
84 		char theBuf[sizeof(StatusFwdMsg)*64];
85 		int theSize;
86 };
87 
88 // handles periodic operations
89 class Ticker: public AlarmUser {
90 	public:
91 		Ticker(Time aPeriod);
92 		virtual ~Ticker();
93 
94 		virtual void wakeUp(const Alarm &alarm);
95 
96 	public:
97 		Time thePeriod;
98 };
99 
100 class HostFilter;
101 
102 // maintains info about a host
103 class Host {
104 	public:
105 		typedef History<StatusFwdMsg> Log;
106 
107 	public:
108 		Host(const NetAddr &anAddr);
109 
addr() const110 		const NetAddr &addr() const { return theAddr; }
111 		// XXX pre-IPv6 code returned 's_addr' as an int
id() const112 		int id() const { return theAddr.addrN().octet(0); }
113 		int logCat() const;
114 		// XXX pre-IPv6 code returned 's_addr' as an int
lna() const115 		int lna() const { return theAddr.addrN().octet(0); }
log() const116 		const Log &log() const { return theLog; }
117 		const char *runLabel() const;
118 
119 		bool busy() const;
selected() const120 		bool selected() const { return isSelected; }
isClient() const121 		bool isClient() const { return logCat() == lgcCltSide; }
isServer() const122 		bool isServer() const { return logCat() == lgcSrvSide; }
123 
124 		void noteMsg(const StatusFwdMsg &msg); // { theLog.insert(msg); }
selected(bool be)125 		void selected(bool be) { isSelected = be; }
126 
127 		bool matches(const HostFilter &sel) const;
128 
129 	protected:
130 		NetAddr theAddr; // unique host address
131 		Log theLog;      // notification messasge history
132 		bool isSelected;
133 };
134 
// used to select groups of hosts; a new filter has both positions at zero
struct HostFilter {
	struct {
		int pos;
	} lbl;          // experiment label based selection
	struct {
		int pos;
	} logCat;           // category based selection (aka "side")

	// zero the members one by one instead of memset()ing *this: the latter
	// is flagged for types with a user-provided constructor and would
	// silently break if a non-POD member is ever added
	HostFilter() {
		lbl.pos = 0;
		logCat.pos = 0;
	}
};
146 
147 
148 // used in some "matrix" windows to remember screen content
149 class WinMatrix {
150 	public:
151 		WinMatrix(int aMaxY, int aMaxX);
152 
operator ()(int y,int x) const153 		double operator ()(int y, int x) const { return theImage[safePos(y,x)]; }
operator ()(int y,int x)154 		double &operator ()(int y, int x) { return theImage[safePos(y,x)]; }
155 
156 	protected:
157 		int safePos(int y, int x) const;
158 
159 	protected:
160 		Array<double> theImage;
161 		int theMaxY;
162 		int theMaxX;
163 };
164 
165 // WINDOW wrapper with a title and event handlers
166 class Window {
167 	public:
168 		typedef void (Window::*EventHandler)(const Host &);
169 
170 	public:
171 		Window(const char *aTitle);
172 		virtual ~Window();
173 
noteAdd(const Host &)174 		virtual void noteAdd(const Host &) {}
noteDel(const Host &)175 		virtual void noteDel(const Host &) {}
noteUpd(const Host &)176 		virtual void noteUpd(const Host &) {}
177 
178 	private:
179 		// disable copying
180 		Window(const Window &);
181 		Window &operator =(const Window &);
182 
183 	public:
184 		WINDOW *const handle;
185 		const char *title;
186 };
187 
188 // general monitor information
189 class InfoWindow: public Window {
190 	public:
191 		InfoWindow(const char *aTitle);
192 
193 		virtual void noteAdd(const Host &);
194 		virtual void noteDel(const Host &);
195 		virtual void noteUpd(const Host &);
196 
197 	protected:
198 		void update();
199 
200 	protected:
201 		int theMsgCnt;       // msgs seen so far
202 		int theListnCnt;     // derived from CopyCnt in the fwded msg
203 };
204 
205 // help window
206 class HelpWindow: public Window {
207 	public:
208 		HelpWindow();
209 
210 	protected:
211 		void update();
212 };
213 
214 // summary information about SmxWindows
215 class MsgSum;
216 class SumWindow: public Window {
217 	public:
218 		SumWindow(const char *aTitle);
219 
220 		virtual void noteAdd(const Host &);
221 		virtual void noteDel(const Host &);
222 		virtual void noteUpd(const Host &);
223 
224 	protected:
225 		void update();
226 		void displaySide(int &y, int x, const MsgSum &sum);
227 		void displayLine(int y, int x, const char *label, const AggrStat &stats, const char *meas, double scale = 1);
228 };
229 
230 // single measurement matrix window
231 class SmxWindow: public Window {
232 	public:
233 		SmxWindow(const char *aTitle);
234 
235 		virtual void noteAdd(const Host &);
236 		virtual void noteDel(const Host &);
237 		virtual void noteUpd(const Host &);
238 
239 	protected:
240 		virtual void displayMsg(int y, int x, const StatusFwdMsg &msg) = 0;
241 		virtual void eraseSlot(int y, int x);
242 
243 	protected:
244 		static int Host2X(const Host &host);
245 		static int Host2Y(const Host &host);
246 		static int Id2X(int id);
247 		static int Id2Y(int id);
248 };
249 
250 // number matrix window
251 class NumxWindow: public SmxWindow {
252 	public:
253 		NumxWindow(const char *aTitle, const char *aFmt);
254 
255 		virtual void noteAdd(const Host &);
256 		virtual void noteDel(const Host &);
257 		virtual void noteUpd(const Host &);
258 
259 	protected:
260 		virtual void displayMsg(int y, int x, const StatusFwdMsg &msg);
261 		virtual void eraseSlot(int y, int x);
262 
263 		virtual double msg2num(const StatusFwdMsg &msg) = 0;
264 
265 		void update();
266 
267 	protected:
268 		const char *theFmt;  // format to use for output
269 		WinMatrix theMatrix; // currently displayed values
270 		double theNumSum;    // accumulator to compute averages and sums
271 		int theNumCnt;       // number of hosts contributed to sum
272 };
273 
274 class MsgSum {
275 	public:
276 		MsgSum();
277 
hostCount() const278 		Counter hostCount() const { return theReqRate.count(); }
279 
280 		MsgSum &operator +=(const StatusFwdMsg &msg);
281 
282 	public:
283 		String theLabels;
284 		AggrStat theReqRate;
285 		AggrStat theRepRate;
286 		AggrStat theBwidth;
287 		AggrStat theRespTime;
288 		AggrStat theDHR;
289 		AggrStat theConnUse;
290 		AggrStat theErrRatio;
291 		AggrStat theXactTotCnt;
292 		AggrStat theTotErrRatio;
293 		AggrStat theSockInstCnt;
294 };
295 
296 /* smx windows for all attributes */
297 
298 struct RunLabelSmxWin: public SmxWindow {
RunLabelSmxWinRunLabelSmxWin299 	RunLabelSmxWin(const char *aTitle): SmxWindow(aTitle) {}
displayMsgRunLabelSmxWin300 	virtual void displayMsg(int y, int x, const StatusFwdMsg &msg) {
301 		mvwprintw(handle, y, x, (char*)"%7.6s", msg.theLabel);
302 	}
303 };
304 
305 struct RunTimeSmxWin: public SmxWindow {
RunTimeSmxWinRunTimeSmxWin306 	RunTimeSmxWin(const char *aTitle): SmxWindow(aTitle) {}
307 	virtual void displayMsg(int y, int x, const StatusFwdMsg &msg);
308 };
309 
310 struct ReqRateSmxWin: public NumxWindow {
ReqRateSmxWinReqRateSmxWin311 	ReqRateSmxWin(const char *aTitle): NumxWindow(aTitle, "%7.1f") {}
msg2numReqRateSmxWin312 	virtual double msg2num(const StatusFwdMsg &msg) {
313 		return (double)msg.theReqRate;
314 	}
315 };
316 
317 struct RepRateSmxWin: public NumxWindow {
RepRateSmxWinRepRateSmxWin318 	RepRateSmxWin(const char *aTitle): NumxWindow(aTitle, "%7.1f") {}
msg2numRepRateSmxWin319 	virtual double msg2num(const StatusFwdMsg &msg) {
320 		return (double)msg.theRepRate;
321 	}
322 };
323 
324 struct BwidthSmxWin: public NumxWindow {
BwidthSmxWinBwidthSmxWin325 	BwidthSmxWin(const char *aTitle): NumxWindow(aTitle, "%7.1f") {}
msg2numBwidthSmxWin326 	virtual double msg2num(const StatusFwdMsg &msg) {
327 		return (double)msg.theBwidth/(1024.*1024/8);
328 	}
329 };
330 
331 struct RespTimeSmxWin: public NumxWindow {
RespTimeSmxWinRespTimeSmxWin332 	RespTimeSmxWin(const char *aTitle): NumxWindow(aTitle, "%7.0f") {}
msg2numRespTimeSmxWin333 	virtual double msg2num(const StatusFwdMsg &msg) {
334 		return msg.theRespTime.msec();
335 	}
336 };
337 
338 struct DHRSmxWin: public NumxWindow {
DHRSmxWinDHRSmxWin339 	DHRSmxWin(const char *aTitle): NumxWindow(aTitle, "%7.2f") {}
msg2numDHRSmxWin340 	virtual double msg2num(const StatusFwdMsg &msg) {
341 		return Max(100*(double)msg.theDHR, -1.);
342 	}
343 };
344 
345 struct ConnUseSmxWin: public NumxWindow {
ConnUseSmxWinConnUseSmxWin346 	ConnUseSmxWin(const char *aTitle): NumxWindow(aTitle, "%7.2f") {}
msg2numConnUseSmxWin347 	virtual double msg2num(const StatusFwdMsg &msg) {
348 		return msg.theConnUse;
349 	}
350 };
351 
352 struct ErrRatioSmxWin: public NumxWindow {
ErrRatioSmxWinErrRatioSmxWin353 	ErrRatioSmxWin(const char *aTitle): NumxWindow(aTitle, "%7.2f") {}
msg2numErrRatioSmxWin354 	virtual double msg2num(const StatusFwdMsg &msg) {
355 		return Max(100*(double)msg.theErrRatio, -1.);
356 	}
357 };
358 
359 struct XactTotCntSmxWin: public NumxWindow {
XactTotCntSmxWinXactTotCntSmxWin360 	XactTotCntSmxWin(const char *aTitle): NumxWindow(aTitle, "%7.0f") {}
msg2numXactTotCntSmxWin361 	virtual double msg2num(const StatusFwdMsg &msg) {
362 		return msg.theXactTotCnt/1000.;
363 	}
364 };
365 
366 struct ErrTotCntSmxWin: public NumxWindow {
ErrTotCntSmxWinErrTotCntSmxWin367 	ErrTotCntSmxWin(const char *aTitle): NumxWindow(aTitle, "%7.2f") {}
msg2numErrTotCntSmxWin368 	virtual double msg2num(const StatusFwdMsg &msg) {
369 		return msg.theErrTotCnt/1000.;
370 	}
371 };
372 
373 struct ErrTotRatioSmxWin: public NumxWindow {
ErrTotRatioSmxWinErrTotRatioSmxWin374 	ErrTotRatioSmxWin(const char *aTitle): NumxWindow(aTitle, "%7.2f") {}
msg2numErrTotRatioSmxWin375 	virtual double msg2num(const StatusFwdMsg &msg) {
376 		return Percent(msg.theErrTotCnt, msg.theXactTotCnt);
377 	}
378 };
379 
380 struct SockInstCntSmxWin: public NumxWindow {
SockInstCntSmxWinSockInstCntSmxWin381 	SockInstCntSmxWin(const char *aTitle): NumxWindow(aTitle, "%7.0f") {}
msg2numSockInstCntSmxWin382 	virtual double msg2num(const StatusFwdMsg &msg) {
383 		return msg.theSockInstCnt;
384 	}
385 };
386 
387 class MsgGapSmxWin: public SmxWindow {
388 	public:
MsgGapSmxWin(const char * aTitle)389 		MsgGapSmxWin(const char *aTitle): SmxWindow(aTitle) {}
390 
391 		virtual void noteUpd(const Host &host);
392 
393 	protected:
394 		virtual void displayMsg(int y, int x, const StatusFwdMsg &msg);
395 
396 	protected:
397 		StatusFwdMsg theLastMsg;
398 };
399 
400 
401 /* globals */
402 
// dispatcher address; reported in the info window and in read errors
static NetAddr TheDisp;

// the monitors register their descriptors here
static FileScanner *TheScanner = 0;
// set when the user quits or the dispatcher connection fails/closes
static bool DoShutdown = false;

// known hosts; a slot is nil after its host is deleted and may be reused
static PtrArray<Host*> TheHosts;
static Array<Host*> TheHostIdx; // address -> host map
// hosts added minus hosts deleted
static int TheBusyHostCnt = 0;
// run labels; the label loops start at position 1, and a vacated
// position holds an empty string until it is reused
static PtrArray<String*> TheLabels;
static int TheUniqLblCnt = 0;

static PtrArray<Window*> TheWins;
static int TheWinPos = 0; // current window
static Window *TheInfoWin = 0; // special

static Window *TheHelpWin = 0; // help window
static bool ShowingHelp = false; // true iff help window is shown

// current host selection criteria and their on-screen description
static HostFilter TheFilter;
static char TheFiltWarn[80] = "";
// number of hosts matching TheFilter
static int TheSelHostCnt = 0;

// matrix window geometry: the first slot is at (TheYMargin, TheXMargin);
// slots are TheXCellWidth columns and TheYCellWidth lines apart
static const int TheXMargin = 5;
static const int TheYMargin = 4;
static const int TheXCellWidth = 7;
static const int TheYCellWidth = 2;
429 
430 static void NoteMsg(const StatusFwdMsg &msg);
431 static void Broadcast(Window::EventHandler weh, const Host &host);
432 static void DeleteIdleHosts();
433 static bool AddFirstLabel(const Host *skip, const String &l);
434 static bool DelLastLabel(const Host *skip, const String &l);
435 static void SelectHost(Host *host);
436 static int SwitchWin(int idx);
437 static int SwitchLblFilter(int idx);
438 static int SwitchCatFilter(int idx);
439 static void BuildFiltWarn(ostream &os);
440 
// rounds with rint() and converts the result to int
inline int Dbl2Int(double v) {
	const double rounded = rint(v);
	return static_cast<int>(rounded);
}
442 
443 
444 /* KbdMonitor */
445 
KbdMonitor(int aFD)446 KbdMonitor::KbdMonitor(int aFD): theFD(aFD) {
447 	theReserv = TheScanner->setFD(theFD, dirRead, this);
448 }
449 
~KbdMonitor()450 KbdMonitor::~KbdMonitor() {
451 	if (theReserv)
452 		TheScanner->clearRes(theReserv);
453 }
454 
noteReadReady(int)455 void KbdMonitor::noteReadReady(int) {
456 
457 	switch (getch()) {
458 		case '0': SwitchWin(0); break;
459 
460 		case KEY_LEFT: SwitchWin(TheWinPos-1); break;
461 		case KEY_RIGHT: SwitchWin(TheWinPos+1); break;
462 
463 		case KEY_DOWN: SwitchLblFilter(TheFilter.lbl.pos - 1); break;
464 		case KEY_UP: SwitchLblFilter(TheFilter.lbl.pos + 1); break;
465 
466 		case 'H':
467 		case 'h':
468 			if (!ShowingHelp) {
469 				ShowingHelp = true;
470 				touchwin(TheHelpWin->handle);
471 				wrefresh(TheHelpWin->handle);
472 				return;
473 			} else
474 				SwitchWin(TheWinPos);
475 			break;
476 
477 		case 'S': SwitchCatFilter(TheFilter.logCat.pos - 1); break;
478 		case 's': SwitchCatFilter(TheFilter.logCat.pos + 1); break;
479 
480 		case 'R': SwitchLblFilter(0); SwitchCatFilter(0); DeleteIdleHosts(); // fall through
481 		case 'r': clearok(curscr, true); SwitchWin(TheWinPos); break;
482 
483 		case 'q':
484 		case 'Q': DoShutdown = true; break;
485 
486 		default: return;
487 	}
488 
489 	if (ShowingHelp)
490 		ShowingHelp = false;
491 
492 	wrefresh(TheWins[TheWinPos]->handle);
493 }
494 
495 /* MsgMonitor */
496 
MsgMonitor(Socket aSock)497 MsgMonitor::MsgMonitor(Socket aSock): theSock(aSock), theSize(0) {
498 	theReserv = TheScanner->setFD(theSock.fd(), dirRead, this);
499 }
500 
~MsgMonitor()501 MsgMonitor::~MsgMonitor() {
502 	if (theReserv)
503 		TheScanner->clearRes(theReserv);
504 	if (theSock)
505 		theSock.close();
506 }
507 
noteReadReady(int)508 void MsgMonitor::noteReadReady(int) {
509 	static const int msgsz = sizeof(StatusFwdMsg);
510 
511 	const int sz = theSock.read(theBuf + theSize, sizeof(theBuf) - theSize);
512 
513 	if (sz < 0) {
514 		if (const Error err = Error::LastExcept(EWOULDBLOCK)) {
515 			cerr << "failed to read from dispatcher at " << TheDisp << ": " << err << endl;
516 			DoShutdown = true;
517 		}
518 		return;
519 	}
520 
521 	if (sz == 0) {
522 		cerr << "dispatcher at " << TheDisp << " quit." << endl;
523 		DoShutdown = true;
524 		return;
525 	}
526 
527 	theSize += sz;
528 
529 	while (theSize >= msgsz) {
530 		StatusFwdMsg msg;
531 		memcpy(&msg, theBuf, msgsz);
532 		theSize -= msgsz;
533 		memmove(theBuf, theBuf + msgsz, theSize);
534 
535 		// handle the message
536 		msg.ntoh();
537 		msg.theRcvTime = TheClock;
538 		NoteMsg(msg);
539 	}
540 	Assert(theSize >= 0);
541 
542 	wrefresh(TheWins[TheWinPos]->handle);
543 }
544 
545 
546 /* Ticker */
547 
Ticker(Time aPeriod)548 Ticker::Ticker(Time aPeriod): thePeriod(aPeriod) {
549 	sleepFor(thePeriod);
550 }
551 
~Ticker()552 Ticker::~Ticker() {
553 }
554 
wakeUp(const Alarm & alarm)555 void Ticker::wakeUp(const Alarm &alarm) {
556 	AlarmUser::wakeUp(alarm);
557 
558 	DeleteIdleHosts();
559 
560 	sleepFor(thePeriod);
561 }
562 
563 
564 /* Host */
565 
Host(const NetAddr & anAddr)566 Host::Host(const NetAddr &anAddr): theAddr(anAddr), theLog(2), isSelected(false) {
567 }
568 
logCat() const569 int Host::logCat() const {
570 	return theLog.depth() ? theLog[0].theCat : lgcAll;
571 }
572 
runLabel() const573 const char *Host::runLabel() const {
574 	return theLog.depth() ? theLog[0].theLabel : 0;
575 }
576 
busy() const577 bool Host::busy() const {
578 	return theLog.depth() && theLog[0].theRcvTime > TheClock.time() - Time::Sec(90);
579 }
580 
noteMsg(const StatusFwdMsg & msg)581 void Host::noteMsg(const StatusFwdMsg &msg) {
582 	theLog.insert(msg);
583 }
584 
matches(const HostFilter & filter) const585 bool Host::matches(const HostFilter &filter) const {
586 	if (filter.logCat.pos != lgcAll && filter.logCat.pos != logCat())
587 		return false;
588 	if (const int l = filter.lbl.pos)
589 		return theLog.depth() && *TheLabels[l] == theLog[0].theLabel;
590 	return true;
591 }
592 
593 
594 /* WinMatrix */
595 
WinMatrix(int aMaxY,int aMaxX)596 WinMatrix::WinMatrix(int aMaxY, int aMaxX): theMaxY(aMaxY), theMaxX(aMaxX) {
597 	theImage.resize((aMaxY+1) * (aMaxX+1));
598 }
599 
safePos(int y,int x) const600 int WinMatrix::safePos(int y, int x) const {
601 	Assert(y <= theMaxY && x <= theMaxX);
602 	const int p = y*(theMaxX+1) + x;
603 	Assert(p < theImage.count());
604 	return p;
605 }
606 
607 /* Window */
608 
Window(const char * aTitle)609 Window::Window(const char *aTitle): handle(newwin(0,0,0,0)), title(aTitle) {
610 	leaveok(handle, true); // do not bother about cursor pos
611 
612 	int maxx, maxy;
613 	(void)maxy; // silence g++ "set but not used" warning
614 	getmaxyx(handle, maxy, maxx);
615 
616 	werase(handle);
617 
618 	// print window title
619 	wattron(handle, A_BOLD);
620 	wattron(handle, A_UNDERLINE);
621 	mvwaddstr(handle, 0, (maxx - strlen(title))/2, (char*)title);
622 	wattroff(handle, A_UNDERLINE);
623 	wattroff(handle, A_BOLD);
624 }
625 
~Window()626 Window::~Window() {
627 	delwin(handle);
628 }
629 
630 
631 /* InfoWindow */
632 
InfoWindow(char const * aTitle)633 InfoWindow::InfoWindow(char const *aTitle): Window(aTitle),
634 	theMsgCnt(0), theListnCnt(0) {
635 	update();
636 }
637 
update()638 void InfoWindow::update() {
639 	const int x = 2;
640 	int y = 3;
641 	mvwprintw(handle, y++, x, (char*)"%20s %s:%d", "dispatcher:", TheDisp.addrA().cstr(), TheDisp.port());
642 	mvwprintw(handle, y++, x, (char*)"%20s %7d", "listeners:", theListnCnt);
643 	y++;
644 	mvwprintw(handle, y++, x, (char*)"%20s %7d", "messages received:", theMsgCnt);
645 	mvwprintw(handle, y++, x, (char*)"%20s %7d", "mean message size:", sizeof(StatusNotifMsg));
646 	mvwprintw(handle, y++, x, (char*)"%20s %7d", "selected hosts:", TheSelHostCnt);
647 	mvwprintw(handle, y++, x, (char*)"%20s %7d", "busy hosts:", TheBusyHostCnt);
648 	mvwprintw(handle, y++, x, (char*)"%20s %7d", "experiments:", TheUniqLblCnt);
649 }
650 
noteAdd(const Host & host)651 void InfoWindow::noteAdd(const Host &host) {
652 	Window::noteAdd(host);
653 	update();
654 }
655 
noteDel(const Host & host)656 void InfoWindow::noteDel(const Host &host) {
657 	Window::noteDel(host);
658 	update();
659 }
660 
noteUpd(const Host & host)661 void InfoWindow::noteUpd(const Host &host) {
662 	Window::noteUpd(host);
663 	if (host.log().depth()) {
664 		theMsgCnt++;
665 		theListnCnt = host.log()[0].theCopyCnt;
666 	}
667 	update();
668 }
669 
670 
671 /* HelpWindow */
672 
HelpWindow()673 HelpWindow::HelpWindow(): Window("Help") {
674 	update();
675 }
676 
update()677 void HelpWindow::update() {
678 	const int x = 2;
679 	int y = 2;
680 	mvwprintw(handle, y++, x, "0:\t\tMonitor Info.");
681 	mvwprintw(handle, y++, x, "LEFT/RIGHT:\tNext/previous tab.");
682 	mvwprintw(handle, y++, x, "UP/DOWN:\tChange label filters.");
683 	mvwprintw(handle, y++, x, "h:\t\tShow/close this help tab.");
684 	mvwprintw(handle, y++, x, "S/s:\t\tChange client/server-side filters.");
685 	mvwprintw(handle, y++, x, "R:\t\tReset filters, remove idle hosts, and refresh screen.");
686 	mvwprintw(handle, y++, x, "r:\t\tRefresh screen.");
687 	mvwprintw(handle, y++, x, "Q/q:\t\tQuit.");
688 }
689 
690 
691 /* SumWindow */
692 
SumWindow(char const * aTitle)693 SumWindow::SumWindow(char const *aTitle): Window(aTitle) {
694 	update();
695 }
696 
update()697 void SumWindow::update() {
698 
699 	// collect summary info from hosts
700 	MsgSum cltSum, srvSum;
701 	for (int i = 0; i < TheHosts.count(); ++i) {
702 		if (const Host *host = TheHosts[i]) {
703 			if (host->log().depth()) {
704 				const StatusFwdMsg &msg = host->log()[0];
705 				if (host->isClient())
706 					cltSum += msg;
707 				else
708 					srvSum += msg;
709 			}
710 		}
711 	}
712 
713 	const int x = 2;
714 
715 	int y = 2;
716 	mvwprintw(handle, y++, x, (char*)"%s (%d hosts)", "Client side:", cltSum.hostCount());
717 	displaySide(y, x+2, cltSum);
718 
719 	y += 1;
720 	mvwprintw(handle, y++, x, (char*)"%s (%d hosts)", "Server side:", srvSum.hostCount());
721 	displaySide(y, x+2, srvSum);
722 
723 	y += 1;
724 	mvwprintw(handle, y++, x+2, (char*)"%20s %7s %8s %9s %9s %s",
725 		"measurement:", "min", "mean", "max", "sum", "unit");
726 }
727 
displaySide(int & y,int x,const MsgSum & sum)728 void SumWindow::displaySide(int &y, int x, const MsgSum &sum) {
729 	//mvwprintw(handle, y++, x, (char*)"%20s %s", "labels:",
730 	//	(const char*)sum.theLabels);
731 
732 	displayLine(y++, x, "load:", sum.theReqRate, "req/sec");
733 	displayLine(y++, x, "throughput:", sum.theRepRate, "rep/sec");
734 	//displayLine(y++, x, "bandwidth:", sum.theBwidth, "Mbit/sec", 1024.*1024/8);
735 
736 	displayLine(y++, x, "response time:", sum.theRespTime, "msec");
737 	displayLine(y++, x, "DHR:", sum.theDHR, "%");
738 	//displayLine(y++, x, "connection use:", sum.theConnUse, "xact/conn");
739 
740 	displayLine(y++, x, "errors total:", sum.theTotErrRatio, "%");
741 	//displayLine(y++, x, "errors now:", sum.theErrRatio, "%");
742 	displayLine(y++, x, "xactions total:", sum.theXactTotCnt, "x10^6", 1e6);
743 	displayLine(y++, x, "open sockets now:", sum.theSockInstCnt, "");
744 }
745 
displayLine(int y,int x,const char * label,const AggrStat & stats,const char * meas,double scale)746 void SumWindow::displayLine(int y, int x, const char *label, const AggrStat &stats, const char *meas, double scale) {
747 	mvwprintw(handle, y, x, (char*)"%20s %7d %8d %9d %9d %s",
748 		label,
749 		Dbl2Int(stats.min()/scale),
750 		Dbl2Int(stats.mean()/scale),
751 		Dbl2Int(stats.max()/scale),
752 		Dbl2Int(stats.sum()/scale),
753 		meas);
754 }
755 
noteAdd(const Host & host)756 void SumWindow::noteAdd(const Host &host) {
757 	Window::noteAdd(host);
758 	update();
759 }
760 
noteDel(const Host & host)761 void SumWindow::noteDel(const Host &host) {
762 	Window::noteDel(host);
763 	update();
764 }
765 
noteUpd(const Host & host)766 void SumWindow::noteUpd(const Host &host) {
767 	Window::noteUpd(host);
768 	update();
769 }
770 
771 
772 /* SmxWindow */
773 
SmxWindow(char const * aTitle)774 SmxWindow::SmxWindow(char const *aTitle): Window(aTitle) {
775 
776 	wattron(handle, A_BOLD);
777 
778 	// horizontal lables
779 	for (int i = 0; i < 10; ++i)
780 		mvwprintw(handle, 2, Id2X(i), (char*)"%7d", i+1);
781 
782 	// vertical labels
783 	for (int i = 0; i < 100; i += 10)
784 		mvwprintw(handle, Id2Y(i), 0, (char*)"%2d", i);
785 
786 	wattroff(handle, A_BOLD);
787 }
788 
noteAdd(const Host & host)789 void SmxWindow::noteAdd(const Host &host) {
790 	Window::noteAdd(host);
791 	displayMsg(Host2Y(host), Host2X(host), host.log()[0]);
792 }
793 
noteDel(const Host & host)794 void SmxWindow::noteDel(const Host &host) {
795 	Window::noteDel(host);
796 	eraseSlot(Host2Y(host), Host2X(host));
797 }
798 
noteUpd(const Host & host)799 void SmxWindow::noteUpd(const Host &host) {
800 	Window::noteUpd(host);
801 	displayMsg(Host2Y(host), Host2X(host), host.log()[0]);
802 }
803 
eraseSlot(int y,int x)804 void SmxWindow::eraseSlot(int y, int x) {
805 	mvwprintw(handle, y, x, (char*)"%7s ", "");
806 }
807 
Host2Y(const Host & host)808 int SmxWindow::Host2Y(const Host &host) {
809 	return Id2Y((host.lna() & 255) - 1);
810 }
811 
Host2X(const Host & host)812 int SmxWindow::Host2X(const Host &host) {
813 	return Id2X((host.lna() & 255) - 1);
814 }
815 
Id2Y(int host_id)816 int SmxWindow::Id2Y(int host_id) {
817 	return TheYMargin + ((host_id % 100) / 10) * TheYCellWidth;
818 }
819 
Id2X(int host_id)820 int SmxWindow::Id2X(int host_id) {
821 	return TheXMargin + (host_id % 10) * TheXCellWidth;
822 }
823 
824 
825 /* NumxWindow */
826 
NumxWindow(const char * aTitle,const char * aFmt)827 NumxWindow::NumxWindow(const char *aTitle, const char *aFmt):
828 	SmxWindow(aTitle), theFmt(aFmt), theMatrix(Id2Y(99),Id2X(99)),
829 	theNumSum(0), theNumCnt(0) {
830 
831 	update();
832 }
833 
noteAdd(const Host & host)834 void NumxWindow::noteAdd(const Host &host) {
835 	theNumCnt++;
836 	SmxWindow::noteAdd(host);
837 }
838 
noteDel(const Host & host)839 void NumxWindow::noteDel(const Host &host) {
840 	theNumCnt--;
841 	SmxWindow::noteDel(host);
842 }
843 
noteUpd(const Host & host)844 void NumxWindow::noteUpd(const Host &host) {
845 	SmxWindow::noteUpd(host);
846 }
847 
displayMsg(int y,int x,const StatusFwdMsg & msg)848 void NumxWindow::displayMsg(int y, int x, const StatusFwdMsg &msg) {
849 	const double n = msg2num(msg);
850 	theNumSum -= theMatrix(y, x);
851 	mvwprintw(handle, y, x, (char*)theFmt, n);
852 	theMatrix(y, x) = n;
853 	theNumSum += n;
854 	update();
855 }
856 
eraseSlot(int y,int x)857 void NumxWindow::eraseSlot(int y, int x) {
858 	theNumSum -= theMatrix(y, x);
859 	SmxWindow::eraseSlot(y, x);
860 	theMatrix(y, x) = 0;
861 	update();
862 }
863 
update()864 void NumxWindow::update() {
865 	int maxx, maxy;
866 	getmaxyx(handle, maxy, maxx);
867 
868     int y = maxy-1;
869 
870 	wattron(handle, A_BOLD);
871 	mvwaddstr(handle, y, Id2X(2) + TheXCellWidth-4, (char*)"cnt:");
872 	mvwaddstr(handle, y, Id2X(4) + TheXCellWidth-4, (char*)"avg:");
873 	mvwaddstr(handle, y, Id2X(6) + TheXCellWidth-4, (char*)"sum:");
874 	wattroff(handle, A_BOLD);
875 
876 	mvwprintw(handle, y, Id2X(3), (char*)"%7d", theNumCnt);
877 	mvwprintw(handle, y, Id2X(5), (char*)theFmt, Ratio(theNumSum,theNumCnt));
878 	mvwprintw(handle, y, Id2X(7), (char*)theFmt, theNumSum);
879 }
880 
881 
882 /* RunTimeSmxWin */
883 
displayMsg(int y,int x,const StatusFwdMsg & msg)884 void RunTimeSmxWin::displayMsg(int y, int x, const StatusFwdMsg &msg) {
885 	const int sec = (msg.theSndTime - msg.theStartTime).sec();
886 	const int min = (sec/60) % 60;
887 	const int hour = sec/3600;
888 	if (hour > 0)
889 		mvwprintw(handle, y, x, (char*)"%4d:%02d", hour, min);
890 	else
891 		mvwprintw(handle, y, x, (char*)"%4d.%02d", min, sec%60);
892 }
893 
894 
895 /* MsgGapSmxWin */
896 
noteUpd(const Host & host)897 void MsgGapSmxWin::noteUpd(const Host &host) {
898 	if (host.log().depth() > 1)
899 		theLastMsg = host.log()[1];
900 	SmxWindow::noteUpd(host);
901 }
902 
displayMsg(int y,int x,const StatusFwdMsg & msg)903 void MsgGapSmxWin::displayMsg(int y, int x, const StatusFwdMsg &msg) {
904 	if (theLastMsg.theRcvTime >= 0)
905 		mvwprintw(handle, y, x, (char*)"%7d",
906 			(msg.theRcvTime - theLastMsg.theRcvTime).sec());
907 	else
908 		mvwprintw(handle, y, x, (char*)"%7s", "?");
909 }
910 
MsgSum()911 MsgSum::MsgSum(): theLabels("") {
912 }
913 
operator +=(const StatusFwdMsg & msg)914 MsgSum &MsgSum::operator +=(const StatusFwdMsg &msg) {
915 	/*if (msg.theLabel && !theLabels.str(msg.theLabel)) { // XXX: not sufficient
916 		if (theLabels)
917 			theLabels += ",";
918 		theLabels += msg.theLabel;
919 	}*/
920 
921 	theReqRate.record(Dbl2Int(msg.theReqRate));
922 	theRepRate.record(Dbl2Int(msg.theRepRate));
923 	theBwidth.record(Dbl2Int(msg.theBwidth));
924 	theRespTime.record(msg.theRespTime.msec());
925 	theDHR.record(Dbl2Int(msg.theDHR*100));
926 	theConnUse.record(Dbl2Int(msg.theConnUse));
927 	theErrRatio.record(Dbl2Int(msg.theErrRatio*100));
928 	theXactTotCnt.record(msg.theXactTotCnt);
929 	theTotErrRatio.record(
930 		Dbl2Int(Percent(msg.theErrTotCnt, msg.theErrTotCnt+msg.theXactTotCnt)));
931 	theSockInstCnt.record(msg.theSockInstCnt);
932 
933 	return *this;
934 }
935 
936 /* local routines */
937 
938 static
AddHost(const NetAddr & addr)939 Host *AddHost(const NetAddr &addr) {
940 	Host *host = new Host(addr);
941 	Host *foundPos = 0;
942 	for (int i = 0; !foundPos && i < TheHosts.count(); ++i) {
943 		if (!TheHosts[i])
944 			foundPos = TheHosts[i] = host;
945 	}
946 	if (!foundPos)
947 		TheHosts.append(host);
948 	TheBusyHostCnt++;
949 	return host;
950 }
951 
952 static
NoteMsg(const StatusFwdMsg & msg)953 void NoteMsg(const StatusFwdMsg &msg) {
954 	//clog << "from " << inet_ntoa(from.sin_addr) << ": " << TheMsg.buf << endl;
955 
956 	NetAddr from(msg.theSndAddr.addr, msg.theSndAddr.port);
957 
958 	// find corresponding host
959 	// XXX pre-IPv6 code used 's_addr' as an int
960 	const int hidx = from.addrN().octet(0) % TheHostIdx.count();
961 	Host *host = TheHostIdx[hidx];
962 
963 	if (host && host->addr() != from) { // collision
964 		host = 0;
965 		// find using linear search
966 		for (int i = 0; !host && i < TheHosts.count(); ++i) {
967 			if (TheHosts[i] && TheHosts[i]->addr() == from)
968 				host = TheHosts[i];
969 		}
970 	}
971 
972 	if (!host) {
973 		host = AddHost(from);
974 		if (!TheHostIdx[hidx])
975 			TheHostIdx[hidx] = host;
976 		AddFirstLabel(host, msg.theLabel);
977 	}
978 
979 	host->noteMsg(msg);
980 
981 	SelectHost(host); // check select status
982 
983 	Broadcast(&Window::noteUpd, *host);
984 }
985 
986 static
Broadcast(Window::EventHandler weh,const Host & host)987 void Broadcast(Window::EventHandler weh, const Host &host) {
988 	if (!host.selected())
989 		(TheInfoWin->*weh)(host); // info window gets all events
990 	else
991 		for (int w = 0; w < TheWins.count(); ++w)
992 			(TheWins[w]->*weh)(host);
993 }
994 
#if 0
// disabled: would deliver the given event for every known host
static
void Broadcast(Window::EventHandler weh) {
	for (int h = 0; h < TheHosts.count(); ++h) {
		if (TheHosts[h])
			Broadcast(weh, *TheHosts[h]);
	}
}
#endif
1004 
1005 
1006 static
DeleteIdleHosts()1007 void DeleteIdleHosts() {
1008 	// we will these from scratch:
1009 	TheHostIdx.clear();
1010 	TheSelHostCnt = 0;
1011 
1012 	for (int h = 0; h < TheHosts.count(); ++h) {
1013 		if (Host *host = TheHosts[h]) {
1014 			if (host->busy()) {
1015 				// XXX pre-IPv6 code used 's_addr' as an int
1016 				const int idx = host->addr().addrN().octet(0) % TheHostIdx.count();
1017 				TheHostIdx[idx] = host;
1018 				if (host->selected())
1019 					TheSelHostCnt++;
1020 			} else {
1021 				TheBusyHostCnt--;
1022 				Broadcast(&Window::noteDel, *host);
1023 				DelLastLabel(host, host->runLabel());
1024 				delete host;
1025 				TheHosts[h] = 0;
1026 			}
1027 		}
1028 	}
1029 }
1030 
1031 static
AddFirstLabel(const Host * skip,const String & l)1032 bool AddFirstLabel(const Host *skip, const String &l) {
1033 	if (!l)
1034 		return false; // empty label
1035 
1036 	for (int h = 0; h < TheHosts.count(); ++h) {
1037 		if (TheHosts[h] == skip)
1038 			continue;
1039 		if (TheHosts[h] && l == TheHosts[h]->runLabel())
1040 			return false; // not first label
1041 	}
1042 	TheUniqLblCnt++;
1043 	for (int i = 1; i < TheLabels.count(); ++i) {
1044 		if (!*TheLabels[i]) {
1045 			*TheLabels[i] = l;
1046 			return true;
1047 		}
1048 	}
1049 	TheLabels.append(new String(l));
1050 	return false;
1051 }
1052 
1053 static
DelLastLabel(const Host * skip,const String & l)1054 bool DelLastLabel(const Host *skip, const String &l) {
1055 	for (int h = 0; h < TheHosts.count(); ++h) {
1056 		if (TheHosts[h] == skip)
1057 			continue;
1058 		if (TheHosts[h] && l == TheHosts[h]->runLabel())
1059 			return false; // not last label
1060 	}
1061 	TheUniqLblCnt--;
1062 	for (int i = 1; i < TheLabels.count(); ++i) {
1063 		if (*TheLabels[i] == l) {
1064 			*TheLabels[i] = 0;
1065 			if (i == TheLabels.count()-1)
1066 				delete TheLabels.pop();
1067 			return true;
1068 		}
1069 	}
1070 	Assert(0);
1071 	return false;
1072 }
1073 
1074 static
SwitchWin(int idx)1075 int SwitchWin(int idx) {
1076 	TheWinPos = (idx + TheWins.count()) % TheWins.count();
1077 	touchwin(TheWins[TheWinPos]->handle);
1078 	return TheWinPos;
1079 }
1080 
1081 static
SelectHost(Host * host)1082 void SelectHost(Host *host) {
1083 	const bool wasSel = host->selected();
1084 	const bool isSel = host->matches(TheFilter);
1085 	if (!wasSel && isSel) {
1086 		host->selected(isSel);
1087 		TheSelHostCnt++;
1088 		Broadcast(&Window::noteAdd, *host);
1089 	} else
1090 	if (wasSel && !isSel) {
1091 		TheSelHostCnt--;
1092 		Broadcast(&Window::noteDel, *host);
1093 		host->selected(isSel);
1094 	}
1095 }
1096 
1097 static
SelectHosts()1098 void SelectHosts() {
1099 	for (int h = 0; h < TheHosts.count(); ++h) {
1100 		if (Host *host = TheHosts[h])
1101 			SelectHost(host);
1102 	}
1103 
1104 	// update selection warnings
1105 	ofixedstream s(TheFiltWarn, sizeof(TheFiltWarn));
1106 	BuildFiltWarn(s);
1107 	for (int w = 0; w < TheWins.count(); ++w) {
1108 		if (strlen(TheFiltWarn)) {
1109 			wattron(TheWins[w]->handle, A_BOLD);
1110 			mvwaddstr(TheWins[w]->handle, 1, 0, (char*)"Filters:");
1111 			wattroff(TheWins[w]->handle, A_BOLD);
1112 			mvwprintw(TheWins[w]->handle, 1, 10, (char*)"%-20s", TheFiltWarn);
1113 		} else {
1114 			wattroff(TheWins[w]->handle, A_BOLD);
1115 			mvwprintw(TheWins[w]->handle, 1, 0, (char*)"%-30s", "");
1116 		}
1117 	}
1118 
1119 	touchwin(TheWins[TheWinPos]->handle);
1120 }
1121 
1122 static
SwitchLblFilter(int idx)1123 int SwitchLblFilter(int idx) {
1124 	TheFilter.lbl.pos = (idx + TheLabels.count()) % TheLabels.count();
1125 
1126 	if (TheFilter.lbl.pos)
1127 		for (int i = 0; !*TheLabels[TheFilter.lbl.pos] && i < TheLabels.count(); ++i) {
1128 			TheFilter.lbl.pos++;
1129 			TheFilter.lbl.pos %= TheLabels.count();
1130 		}
1131 
1132 	SelectHosts();
1133 	return TheFilter.lbl.pos;
1134 }
1135 
1136 static
SwitchCatFilter(int idx)1137 int SwitchCatFilter(int idx) {
1138 	TheFilter.logCat.pos = (idx + lgcEnd) % lgcEnd;
1139 	SelectHosts();
1140 	return TheFilter.logCat.pos;
1141 }
1142 
1143 static
BuildFiltWarn(ostream & os)1144 void BuildFiltWarn(ostream &os) {
1145 	if (TheFilter.lbl.pos) {
1146 		Assert(*TheLabels[TheFilter.lbl.pos]);
1147 		os << *TheLabels[TheFilter.lbl.pos];
1148 	}
1149 
1150 	if (TheFilter.logCat.pos) {
1151 		if (TheFilter.lbl.pos)
1152 			os << ',';
1153 		os << (TheFilter.logCat.pos == lgcCltSide ?
1154 			"clt" : "srv");
1155 	}
1156 	os << ends;
1157 }
1158 
main(int argc,char * argv[])1159 int main(int argc, char *argv[]) {
1160 	(void)PolyVersion();
1161 
1162 	if (argc == 2 && String("--help") == argv[1]) {
1163 		cout << "Usage: " << argv[0]
1164 			<< " [--help] [udp2tcpd_ip [udp2tcpd_port]]"
1165 			<< endl;
1166 		return 0;
1167 	}
1168 
1169 	const char *disph = argc >= 2 ? argv[1] : "127.0.0.1";
1170 	const int dispp = argc >= 3 ? atoi(argv[2]) : 18256;
1171 	TheDisp = NetAddr(disph, dispp);
1172 
1173 	Socket sock;
1174 	Must(sock.create(TheDisp.addrN().family()));
1175 	if (!Should(sock.connect(TheDisp))) {
1176 		cerr << "failed to connect to udp2tcp dispatcher at " << TheDisp << endl;
1177 		return -2;
1178 	}
1179 	Must(sock.blocking(false));
1180 
1181 	Must(initscr());
1182 	cbreak();
1183 	noecho();
1184 	nonl();
1185 	intrflush(stdscr,false);
1186 	keypad(stdscr,true);
1187 	curs_set(0);
1188 	wtimeout(stdscr, 0); // delay for cusrses input, msec; zero == nonblocking
1189 
1190 	TheHostIdx.resize(256);
1191 	TheLabels.append(new String);
1192 
1193 	TheWins.append(TheInfoWin = new InfoWindow("Monitor Info"));
1194 	TheWins.append(new SumWindow("Summary Information"));
1195 	TheWins.append(new RunLabelSmxWin("Experiment Label"));
1196 	TheWins.append(new RunTimeSmxWin("Run time [h:m.s]"));
1197 	TheWins.append(new ReqRateSmxWin("Load [requests/sec]"));
1198 	TheWins.append(new RepRateSmxWin("Throughput [replies/sec]"));
1199 	TheWins.append(new BwidthSmxWin("Network Bandwidth [Mbps,replies]"));
1200 	TheWins.append(new RespTimeSmxWin("Response Time [msec]"));
1201 	TheWins.append(new DHRSmxWin("DHR [%]"));
1202 	TheWins.append(new ConnUseSmxWin("Connection Use [xact/conn]"));
1203 	TheWins.append(new ErrTotRatioSmxWin("Errors Total [%]"));
1204 	TheWins.append(new ErrTotCntSmxWin("Errors Total Count x 1000"));
1205 	TheWins.append(new XactTotCntSmxWin("Total Xaction Count x 1000"));
1206 	TheWins.append(new ErrRatioSmxWin("Errors Now [%]"));
1207 	TheWins.append(new SockInstCntSmxWin("Open Sockets Now"));
1208 	TheWins.append(new MsgGapSmxWin("Message Gap [sec]"));
1209 
1210 	TheHelpWin = new HelpWindow;
1211 
1212 	signal(SIGPIPE, SIG_IGN);
1213 	Clock::Update();
1214 
1215 	TheScanner = new PG_PREFFERED_FILE_SCANNER;
1216 	TheScanner->configure();
1217 
1218 	MsgMonitor *msgSrv = new MsgMonitor(sock);
1219 	KbdMonitor *kbdSrv = new KbdMonitor(0); // XXX: how to get stdin fd?
1220 	Ticker *ticker = new Ticker(Time::Sec(60));
1221 
1222 	SwitchWin(0);
1223 	wrefresh(TheWins[TheWinPos]->handle);
1224 
1225 	while (!DoShutdown) {
1226 		Clock::Update();
1227 		Time tout = TheAlarmClock.on() ?
1228 			TheAlarmClock.timeLeft() : Time();
1229 		TheScanner->scan(TheAlarmClock.on() ? 0 : &tout);
1230 	}
1231 
1232 	delete ticker;
1233 	delete msgSrv;
1234 	delete kbdSrv;
1235 	delete TheHelpWin;
1236 	delete TheScanner;
1237 
1238 	endwin();
1239 
1240 	return 0;
1241 }
1242 
1243 
1244 #endif /* HAVE_NCURSES_H */
1245