1 /* Copyright (C) 2014 InfiniDB, Inc.
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17
18 #include <iostream>
19 #include <string>
20 #include <ctime>
21 using namespace std;
22
23 #include <boost/shared_ptr.hpp>
24 using namespace boost;
25
26 #include "thrift/TProcessor.h"
27 #include "thrift/protocol/TProtocol.h"
28 #include "thrift/protocol/TBinaryProtocol.h"
29 #include "thrift/transport/TServerTransport.h"
30 #include "thrift/transport/TServerSocket.h"
31 #include "thrift/transport/TTransport.h"
32 #include "thrift/transport/TBufferTransports.h"
33 #include "thrift/server/TSimpleServer.h"
34 namespace at = apache::thrift;
35 namespace atp = at::protocol;
36 namespace att = at::transport;
37 namespace ats = at::server;
38
39 #include "QueryTeleService.h"
40 using namespace querytele;
41
42 namespace
43 {
44
45 class QueryTeleServiceHandler : public QueryTeleServiceIf
46 {
47 public:
48 void postQuery(const QueryTele&);
49 void postStep(const StepTele&);
50 void postImport(const ImportTele&);
51
52 protected:
53
54 private:
55
56 };
57
st2str(enum StepType::type t)58 const string st2str(enum StepType::type t)
59 {
60 switch (t)
61 {
62 case StepType::T_HJS:
63 return "HJS";
64
65 case StepType::T_DSS:
66 return "DSS";
67
68 case StepType::T_CES:
69 return "CES";
70
71 case StepType::T_SQS:
72 return "SQS";
73
74 case StepType::T_TAS:
75 return "TAS";
76
77 case StepType::T_TNS:
78 return "TNS";
79
80 case StepType::T_BPS:
81 return "BPS";
82
83 case StepType::T_TCS:
84 return "TCS";
85
86 case StepType::T_HVS:
87 return "HVS";
88
89 case StepType::T_WFS:
90 return "WFS";
91
92 case StepType::T_SAS:
93 return "SAS";
94
95 case StepType::T_TUN:
96 return "TUN";
97
98 default:
99 return "INV";
100 }
101
102 return "INV";
103 }
104
postQuery(const QueryTele & qt)105 void QueryTeleServiceHandler::postQuery(const QueryTele& qt)
106 {
107 cout << "postQuery: " << endl;
108 cout << " uuid: " << qt.query_uuid << endl;
109
110 if (qt.msg_type == QTType::QT_SUMMARY)
111 cout << " mt: SUMMARY" << endl;
112 else if (qt.msg_type == QTType::QT_START)
113 cout << " mt: START" << endl;
114 else
115 cout << " mt: PROGRESS" << endl;
116
117 cout << " qry: " << qt.query << endl;
118 cout << " mmpct: " << qt.max_mem_pct << endl;
119 cout << " cache: " << qt.cache_io << endl;
120 cout << " nmsgs: " << qt.msg_rcv_cnt << endl;
121 cout << " rows: " << qt.rows << endl;
122 cout << " qt: " << qt.query_type << endl;
123 int64_t tt = qt.start_time;
124 cout << " st: (" << tt << ") ";
125 tt /= 1000;
126 cout << ctime(&tt);
127 tt = qt.end_time;
128 cout << " et: (" << tt << ") ";
129 tt /= 1000;
130 cout << ctime(&tt);
131 cout << " sn: " << qt.system_name << endl;
132 cout << " mn: " << qt.module_name << endl;
133 cout << " lq: " << qt.local_query << endl;
134 cout << " dn: " << qt.schema_name << endl;
135 cout << endl;
136 }
137
postStep(const StepTele & qt)138 void QueryTeleServiceHandler::postStep(const StepTele& qt)
139 {
140 cout << "postStep: " << endl;
141 cout << " quuid: " << qt.query_uuid << endl;
142 cout << " uuid: " << qt.step_uuid << endl;
143
144 if (qt.msg_type == STType::ST_SUMMARY)
145 cout << " mt: SUMMARY" << endl;
146 else if (qt.msg_type == STType::ST_START)
147 cout << " mt: START" << endl;
148 else
149 cout << " mt: PROGRESS" << endl;
150
151 cout << " st: " << st2str(qt.step_type) << endl;
152 cout << " cache: " << qt.cache_io << endl;
153 cout << " nmsgs: " << qt.msg_rcv_cnt << endl;
154 cout << " rows: " << qt.rows << endl;
155
156 if (qt.total_units_of_work > 0)
157 cout << " pct: " << qt.units_of_work_completed * 100 / qt.total_units_of_work << endl;
158 else
159 cout << " pct: n/a" << endl;
160
161 int64_t tt = qt.start_time;
162 cout << " st: (" << tt << ") ";
163 tt /= 1000;
164 cout << ctime(&tt);
165 tt = qt.end_time;
166 cout << " et: (" << tt << ") ";
167 tt /= 1000;
168 cout << ctime(&tt);
169 cout << endl;
170 }
171
postImport(const ImportTele & qt)172 void QueryTeleServiceHandler::postImport(const ImportTele& qt)
173 {
174 cout << "importStep: " << endl;
175 cout << " juuid: " << qt.job_uuid << endl;
176 cout << " iuuid: " << qt.import_uuid << endl;
177
178 if (qt.msg_type == ITType::IT_SUMMARY)
179 cout << " mt: SUMMARY" << endl;
180 else if (qt.msg_type == ITType::IT_START)
181 cout << " mt: START" << endl;
182 else if (qt.msg_type == ITType::IT_TERM)
183 cout << " mt: TERM" << endl;
184 else
185 cout << " mt: PROGRESS" << endl;
186
187 if (qt.table_list.empty())
188 cout << " tn: " << "(empty)" << endl;
189 else
190 cout << " tn: " << qt.table_list[0] << endl;
191
192 if (qt.rows_so_far.empty())
193 cout << " rows: " << "(empty)" << endl;
194 else
195 cout << " rows: " << qt.rows_so_far[0] << endl;
196
197 int64_t tt = qt.start_time;
198 cout << " st: (" << tt << ") ";
199 tt /= 1000;
200 cout << ctime(&tt);
201 tt = qt.end_time;
202 cout << " et: (" << tt << ") ";
203 tt /= 1000;
204 cout << ctime(&tt);
205 cout << " sn: " << qt.system_name << endl;
206 cout << " mn: " << qt.module_name << endl;
207 cout << " dn: " << qt.schema_name << endl;
208 cout << endl;
209 }
210
211 }
212
main(int argc,char ** argv)213 int main(int argc, char** argv)
214 {
215
216 shared_ptr<atp::TProtocolFactory> protocolFactory(new atp::TBinaryProtocolFactory());
217 shared_ptr<QueryTeleServiceHandler> handler(new QueryTeleServiceHandler());
218 shared_ptr<at::TProcessor> processor(new QueryTeleServiceProcessor(handler));
219 shared_ptr<att::TServerTransport> serverTransport(new att::TServerSocket(9990));
220 shared_ptr<att::TTransportFactory> transportFactory(new att::TBufferedTransportFactory());
221
222 ats::TSimpleServer server(processor,
223 serverTransport,
224 transportFactory,
225 protocolFactory);
226
227
228 cout << "Starting the server..." << endl;
229 server.serve();
230 cout << "done." << endl;
231 return 0;
232 }
233
234