1 #include <unistd.h>
2 #include <iostream>
3 #include <string>
4 #include <cerrno>
5 using namespace std;
6
7 #include <boost/scoped_ptr.hpp>
8 #include <boost/scoped_array.hpp>
9 using namespace boost;
10
11 #ifndef _MSC_VER
12 #include "mcsconfig.h"
13 #endif
14
15 #include "socktype.h"
16
17 #include "exceptclasses.h"
18
19 #include "socketio.h"
20
21 #include "bytestream.h"
22 #include "messagequeue.h"
23 using namespace messageqcpp;
24
25 #include "rowgroup.h"
26 using namespace rowgroup;
27
28 namespace qfe
29 {
30
processReturnedRows(MessageQueueClient * mqc,SockType fd)31 void processReturnedRows(MessageQueueClient* mqc, SockType fd)
32 {
33 scoped_ptr<MessageQueueClient> cleaner(mqc);
34 SBS sbs;
35 sbs = mqc->read();
36 //cerr << "got a bs of " << sbs->length() << " bytes" << endl;
37
38 RowGroup rg;
39 rg.deserialize(*sbs);
40
41 //cerr << "got a base rowgroup with rows of " << rg.getRowSize() << " bytes" << endl;
42 //cerr << rg.toString() << endl;
43
44 ByteStream bs;
45 ByteStream::quadbyte tableOID = 100;
46 bs.reset();
47 bs << tableOID;
48 mqc->write(bs);
49
50 sbs = mqc->read();
51 //cerr << "got a bs of " << sbs->length() << " bytes" << endl;
52 RGData rgd;
53 rgd.deserialize(*sbs, true);
54 rg.setData(&rgd);
55 //cerr << "got a rowgroup with: " << rg.getRowCount() << " rows" << endl;
56
57 socketio::writeString(fd, "OK");
58 Row r;
59
60 while (rg.getRowCount() > 0)
61 {
62 rg.initRow(&r);
63 rg.getRow(0, &r);
64 string csv;
65 bs.reset();
66
67 for (unsigned i = 0; i < rg.getRowCount(); i++)
68 {
69 csv = r.toCSV();
70 bs << csv;
71 r.nextRow();
72 }
73
74 //cerr << "writing " << bs.length() << " bytes back to client" << endl;
75 SockWriteFcn(fd, bs.buf(), bs.length());
76
77 bs.reset();
78 bs << tableOID;
79 mqc->write(bs);
80
81 sbs = mqc->read();
82 //cerr << "got a bs of " << sbs->length() << " bytes" << endl;
83 rgd.deserialize(*sbs, true);
84 rg.setData(&rgd);
85 //cerr << "got a rowgroup with: " << rg.getRowCount() << " rows" << endl;
86 }
87
88 tableOID = 0;
89 bs.reset();
90 bs << tableOID;
91 mqc->write(bs);
92
93 //sync with the client on end-of-results
94 SockWriteFcn(fd, &tableOID, 4);
95 SockReadFcn(fd, &tableOID, 4);
96
97 }
98
99 }
100
101