1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 #include <iostream>
19 #include <string>
20 #include <ctime>
21 using namespace std;
22 
23 #include <boost/shared_ptr.hpp>
24 using namespace boost;
25 
26 #include "thrift/TProcessor.h"
27 #include "thrift/protocol/TProtocol.h"
28 #include "thrift/protocol/TBinaryProtocol.h"
29 #include "thrift/transport/TServerTransport.h"
30 #include "thrift/transport/TServerSocket.h"
31 #include "thrift/transport/TTransport.h"
32 #include "thrift/transport/TBufferTransports.h"
33 #include "thrift/server/TSimpleServer.h"
34 namespace at = apache::thrift;
35 namespace atp = at::protocol;
36 namespace att = at::transport;
37 namespace ats = at::server;
38 
39 #include "QueryTeleService.h"
40 using namespace querytele;
41 
42 namespace
43 {
44 
45 class QueryTeleServiceHandler : public QueryTeleServiceIf
46 {
47 public:
48     void postQuery(const QueryTele&);
49     void postStep(const StepTele&);
50     void postImport(const ImportTele&);
51 
52 protected:
53 
54 private:
55 
56 };
57 
st2str(enum StepType::type t)58 const string st2str(enum StepType::type t)
59 {
60     switch (t)
61     {
62         case StepType::T_HJS:
63             return "HJS";
64 
65         case StepType::T_DSS:
66             return "DSS";
67 
68         case StepType::T_CES:
69             return "CES";
70 
71         case StepType::T_SQS:
72             return "SQS";
73 
74         case StepType::T_TAS:
75             return "TAS";
76 
77         case StepType::T_TNS:
78             return "TNS";
79 
80         case StepType::T_BPS:
81             return "BPS";
82 
83         case StepType::T_TCS:
84             return "TCS";
85 
86         case StepType::T_HVS:
87             return "HVS";
88 
89         case StepType::T_WFS:
90             return "WFS";
91 
92         case StepType::T_SAS:
93             return "SAS";
94 
95         case StepType::T_TUN:
96             return "TUN";
97 
98         default:
99             return "INV";
100     }
101 
102     return "INV";
103 }
104 
postQuery(const QueryTele & qt)105 void QueryTeleServiceHandler::postQuery(const QueryTele& qt)
106 {
107     cout << "postQuery: " << endl;
108     cout << "  uuid: " << qt.query_uuid << endl;
109 
110     if (qt.msg_type == QTType::QT_SUMMARY)
111         cout << "  mt: SUMMARY" << endl;
112     else if (qt.msg_type == QTType::QT_START)
113         cout << "  mt: START" << endl;
114     else
115         cout << "  mt: PROGRESS" << endl;
116 
117     cout << "  qry: " << qt.query << endl;
118     cout << "  mmpct: " << qt.max_mem_pct << endl;
119     cout << "  cache: " << qt.cache_io << endl;
120     cout << "  nmsgs: " << qt.msg_rcv_cnt << endl;
121     cout << "  rows: " << qt.rows << endl;
122     cout << "  qt: " << qt.query_type << endl;
123     int64_t tt = qt.start_time;
124     cout << "  st: (" << tt << ") ";
125     tt /= 1000;
126     cout << ctime(&tt);
127     tt = qt.end_time;
128     cout << "  et: (" << tt << ") ";
129     tt /= 1000;
130     cout << ctime(&tt);
131     cout << "  sn: " << qt.system_name << endl;
132     cout << "  mn: " << qt.module_name << endl;
133     cout << "  lq: " << qt.local_query << endl;
134     cout << "  dn: " << qt.schema_name << endl;
135     cout << endl;
136 }
137 
postStep(const StepTele & qt)138 void QueryTeleServiceHandler::postStep(const StepTele& qt)
139 {
140     cout << "postStep: " << endl;
141     cout << "  quuid: " << qt.query_uuid << endl;
142     cout << "  uuid: " << qt.step_uuid << endl;
143 
144     if (qt.msg_type == STType::ST_SUMMARY)
145         cout << "  mt: SUMMARY" << endl;
146     else if (qt.msg_type == STType::ST_START)
147         cout << "  mt: START" << endl;
148     else
149         cout << "  mt: PROGRESS" << endl;
150 
151     cout << "  st: " << st2str(qt.step_type) << endl;
152     cout << "  cache: " << qt.cache_io << endl;
153     cout << "  nmsgs: " << qt.msg_rcv_cnt << endl;
154     cout << "  rows: " << qt.rows << endl;
155 
156     if (qt.total_units_of_work > 0)
157         cout << "  pct: " << qt.units_of_work_completed * 100 / qt.total_units_of_work << endl;
158     else
159         cout << "  pct: n/a" << endl;
160 
161     int64_t tt = qt.start_time;
162     cout << "  st: (" << tt << ") ";
163     tt /= 1000;
164     cout << ctime(&tt);
165     tt = qt.end_time;
166     cout << "  et: (" << tt << ") ";
167     tt /= 1000;
168     cout << ctime(&tt);
169     cout << endl;
170 }
171 
postImport(const ImportTele & qt)172 void QueryTeleServiceHandler::postImport(const ImportTele& qt)
173 {
174     cout << "importStep: " << endl;
175     cout << "  juuid: " << qt.job_uuid << endl;
176     cout << "  iuuid: " << qt.import_uuid << endl;
177 
178     if (qt.msg_type == ITType::IT_SUMMARY)
179         cout << "  mt: SUMMARY" << endl;
180     else if (qt.msg_type == ITType::IT_START)
181         cout << "  mt: START" << endl;
182     else if (qt.msg_type == ITType::IT_TERM)
183         cout << "  mt: TERM" << endl;
184     else
185         cout << "  mt: PROGRESS" << endl;
186 
187     if (qt.table_list.empty())
188         cout << "  tn: " << "(empty)" << endl;
189     else
190         cout << "  tn: " << qt.table_list[0] << endl;
191 
192     if (qt.rows_so_far.empty())
193         cout << "  rows: " << "(empty)" << endl;
194     else
195         cout << "  rows: " << qt.rows_so_far[0] << endl;
196 
197     int64_t tt = qt.start_time;
198     cout << "  st: (" << tt << ") ";
199     tt /= 1000;
200     cout << ctime(&tt);
201     tt = qt.end_time;
202     cout << "  et: (" << tt << ") ";
203     tt /= 1000;
204     cout << ctime(&tt);
205     cout << "  sn: " << qt.system_name << endl;
206     cout << "  mn: " << qt.module_name << endl;
207     cout << "  dn: " << qt.schema_name << endl;
208     cout << endl;
209 }
210 
211 }
212 
main(int argc,char ** argv)213 int main(int argc, char** argv)
214 {
215 
216     shared_ptr<atp::TProtocolFactory> protocolFactory(new atp::TBinaryProtocolFactory());
217     shared_ptr<QueryTeleServiceHandler> handler(new QueryTeleServiceHandler());
218     shared_ptr<at::TProcessor> processor(new QueryTeleServiceProcessor(handler));
219     shared_ptr<att::TServerTransport> serverTransport(new att::TServerSocket(9990));
220     shared_ptr<att::TTransportFactory> transportFactory(new att::TBufferedTransportFactory());
221 
222     ats::TSimpleServer server(processor,
223                               serverTransport,
224                               transportFactory,
225                               protocolFactory);
226 
227 
228     cout << "Starting the server..." << endl;
229     server.serve();
230     cout << "done." << endl;
231     return 0;
232 }
233 
234