1 
2 /* Web Polygraph       http://www.web-polygraph.org/
3  * Copyright 2003-2011 The Measurement Factory
4  * Licensed under the Apache License, Version 2.0 */
5 
6 #include "base/polygraph.h"
7 
8 #include "runtime/polyBcastChannels.h"
9 #include "probe/ProbeXact.h"
10 #include "probe/PolyProbe.h"
11 #include "probe/ProbeStatMgr.h"
12 
ProbeXact(const NetAddr & aCltHost,const NetAddr & aSrvHost,int fd)13 ProbeXact::ProbeXact(const NetAddr &aCltHost,const NetAddr &aSrvHost, int fd):
14 	theCltHost(aCltHost), theSrvHost(aSrvHost), theSock(fd), theStats(0), theRdCount(0) {
15 }
16 
~ProbeXact()17 ProbeXact::~ProbeXact() {
18 	if (theReadR)
19 		TheFileScanner->clearRes(theReadR);
20 	if (theWriteR)
21 		TheFileScanner->clearRes(theWriteR);
22 	if (theSock.fd() >= 0)
23 		theSock.close();
24 }
25 
exec()26 void ProbeXact::exec() {
27 	theReadR = TheFileScanner->setFD(theSock.fd(), dirRead, this);
28 	theWriteR = TheFileScanner->setFD(theSock.fd(), dirWrite, this);
29 
30 	theStats = TheProbeStatMgr.stats(cltHost(), srvHost());
31 	Assert(theStats);
32 
33 	theChannels.append(ThePhasesEndChannel);
34 	startListen();
35 }
36 
finish(const Error & err)37 void ProbeXact::finish(const Error &err) {
38 	Assert(owner());
39 	if (err)
40 		cerr << cltHost() << "<->" << srvHost() << " : error: " << err << endl;
41 	owner()->noteXactDone(this); // will destroy
42 }
43 
noteReadReady(int fd)44 void ProbeXact::noteReadReady(int fd) {
45 	Assert(fd == theSock.fd()); // remove later
46 
47 	static char buf[16*1024];
48 	buf[0] = (char)0;
49 	const Size sz = theSock.read(buf, sizeof(buf));
50 	theRdCount++;
51 
52 	if (sz < 0) {
53 		finish(Error::Last());
54 	} else
55 	if (sz == 0) {
56 		finish(Error::None());
57 	} else
58 	if (theRdCount == 1 && buf[0]) {
59 		ThePolyProbe->sendStats(theSock, theCltHost);
60 		finish(Error::None());
61 	} else {
62 		theStats->recordRead(sz);
63 	}
64 }
65 
noteWriteReady(int fd)66 void ProbeXact::noteWriteReady(int fd) {
67 	Assert(fd == theSock.fd()); // remove later
68 
69 	static char buf[16*1024];
70 	const Size sz = theSock.write(buf, sizeof(buf));
71 
72 	if (sz < 0)
73 		finish(Error::Last());
74 	else
75 		theStats->recordWrite(sz);
76 }
77 
noteInfoEvent(BcastChannel * ch,InfoEvent ev)78 void ProbeXact::noteInfoEvent(BcastChannel *ch, InfoEvent ev) {
79 	Assert(ch == ThePhasesEndChannel);
80 	Assert(ev == BcastRcver::ieNone);
81 	if (theSock.fd() >= 0) {
82 		theSock.close();
83 		TheFileScanner->clearRes(theReadR);
84 		TheFileScanner->clearRes(theWriteR);
85 	}
86 	finish(Error::None());
87 }
88