1 #include <unistd.h>
2 #include <iostream>
3 #include <string>
4 #include <cerrno>
5 using namespace std;
6 
7 #include <boost/scoped_ptr.hpp>
8 #include <boost/scoped_array.hpp>
9 using namespace boost;
10 
11 #ifndef _MSC_VER
12 #include "mcsconfig.h"
13 #endif
14 
15 #include "socktype.h"
16 
17 #include "exceptclasses.h"
18 
19 #include "socketio.h"
20 
21 #include "bytestream.h"
22 #include "messagequeue.h"
23 using namespace messageqcpp;
24 
25 #include "rowgroup.h"
26 using namespace rowgroup;
27 
28 namespace qfe
29 {
30 
processReturnedRows(MessageQueueClient * mqc,SockType fd)31 void processReturnedRows(MessageQueueClient* mqc, SockType fd)
32 {
33     scoped_ptr<MessageQueueClient> cleaner(mqc);
34     SBS sbs;
35     sbs = mqc->read();
36     //cerr << "got a bs of " << sbs->length() << " bytes" << endl;
37 
38     RowGroup rg;
39     rg.deserialize(*sbs);
40 
41     //cerr << "got a base rowgroup with rows of " << rg.getRowSize() << " bytes" << endl;
42     //cerr << rg.toString() << endl;
43 
44     ByteStream bs;
45     ByteStream::quadbyte tableOID = 100;
46     bs.reset();
47     bs << tableOID;
48     mqc->write(bs);
49 
50     sbs = mqc->read();
51     //cerr << "got a bs of " << sbs->length() << " bytes" << endl;
52     RGData rgd;
53     rgd.deserialize(*sbs, true);
54     rg.setData(&rgd);
55     //cerr << "got a rowgroup with: " << rg.getRowCount() << " rows" << endl;
56 
57     socketio::writeString(fd, "OK");
58     Row r;
59 
60     while (rg.getRowCount() > 0)
61     {
62         rg.initRow(&r);
63         rg.getRow(0, &r);
64         string csv;
65         bs.reset();
66 
67         for (unsigned i = 0; i < rg.getRowCount(); i++)
68         {
69             csv = r.toCSV();
70             bs << csv;
71             r.nextRow();
72         }
73 
74         //cerr << "writing " << bs.length() << " bytes back to client" << endl;
75         SockWriteFcn(fd, bs.buf(), bs.length());
76 
77         bs.reset();
78         bs << tableOID;
79         mqc->write(bs);
80 
81         sbs = mqc->read();
82         //cerr << "got a bs of " << sbs->length() << " bytes" << endl;
83         rgd.deserialize(*sbs, true);
84         rg.setData(&rgd);
85         //cerr << "got a rowgroup with: " << rg.getRowCount() << " rows" << endl;
86     }
87 
88     tableOID = 0;
89     bs.reset();
90     bs << tableOID;
91     mqc->write(bs);
92 
93     //sync with the client on end-of-results
94     SockWriteFcn(fd, &tableOID, 4);
95     SockReadFcn(fd, &tableOID, 4);
96 
97 }
98 
99 }
100 
101