1
2 /* Web Polygraph http://www.web-polygraph.org/
3 * Copyright 2003-2011 The Measurement Factory
4 * Licensed under the Apache License, Version 2.0 */
5
6 #include "base/polygraph.h"
7
8 #include "runtime/polyBcastChannels.h"
9 #include "probe/ProbeXact.h"
10 #include "probe/PolyProbe.h"
11 #include "probe/ProbeStatMgr.h"
12
ProbeXact(const NetAddr & aCltHost,const NetAddr & aSrvHost,int fd)13 ProbeXact::ProbeXact(const NetAddr &aCltHost,const NetAddr &aSrvHost, int fd):
14 theCltHost(aCltHost), theSrvHost(aSrvHost), theSock(fd), theStats(0), theRdCount(0) {
15 }
16
~ProbeXact()17 ProbeXact::~ProbeXact() {
18 if (theReadR)
19 TheFileScanner->clearRes(theReadR);
20 if (theWriteR)
21 TheFileScanner->clearRes(theWriteR);
22 if (theSock.fd() >= 0)
23 theSock.close();
24 }
25
exec()26 void ProbeXact::exec() {
27 theReadR = TheFileScanner->setFD(theSock.fd(), dirRead, this);
28 theWriteR = TheFileScanner->setFD(theSock.fd(), dirWrite, this);
29
30 theStats = TheProbeStatMgr.stats(cltHost(), srvHost());
31 Assert(theStats);
32
33 theChannels.append(ThePhasesEndChannel);
34 startListen();
35 }
36
finish(const Error & err)37 void ProbeXact::finish(const Error &err) {
38 Assert(owner());
39 if (err)
40 cerr << cltHost() << "<->" << srvHost() << " : error: " << err << endl;
41 owner()->noteXactDone(this); // will destroy
42 }
43
noteReadReady(int fd)44 void ProbeXact::noteReadReady(int fd) {
45 Assert(fd == theSock.fd()); // remove later
46
47 static char buf[16*1024];
48 buf[0] = (char)0;
49 const Size sz = theSock.read(buf, sizeof(buf));
50 theRdCount++;
51
52 if (sz < 0) {
53 finish(Error::Last());
54 } else
55 if (sz == 0) {
56 finish(Error::None());
57 } else
58 if (theRdCount == 1 && buf[0]) {
59 ThePolyProbe->sendStats(theSock, theCltHost);
60 finish(Error::None());
61 } else {
62 theStats->recordRead(sz);
63 }
64 }
65
noteWriteReady(int fd)66 void ProbeXact::noteWriteReady(int fd) {
67 Assert(fd == theSock.fd()); // remove later
68
69 static char buf[16*1024];
70 const Size sz = theSock.write(buf, sizeof(buf));
71
72 if (sz < 0)
73 finish(Error::Last());
74 else
75 theStats->recordWrite(sz);
76 }
77
noteInfoEvent(BcastChannel * ch,InfoEvent ev)78 void ProbeXact::noteInfoEvent(BcastChannel *ch, InfoEvent ev) {
79 Assert(ch == ThePhasesEndChannel);
80 Assert(ev == BcastRcver::ieNone);
81 if (theSock.fd() >= 0) {
82 theSock.close();
83 TheFileScanner->clearRes(theReadR);
84 TheFileScanner->clearRes(theWriteR);
85 }
86 finish(Error::None());
87 }
88