1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 #include <unistd.h>
19 #include <stdint.h>
20 #include <string>
21 using namespace std;
22 
23 #include <boost/uuid/uuid.hpp>
24 #include <boost/uuid/uuid_io.hpp>
25 namespace bu = boost::uuids;
26 
27 #include "queryteleserverparms.h"
28 #include "querytele_types.h"
29 #include "QueryTeleService.h"
30 #include "queryteleprotoimpl.h"
31 #include "telestats.h"
32 
33 #include "queryteleclient.h"
34 
35 namespace
36 {
37 using namespace querytele;
38 
39 #define QT_ASSIGN_(x) out. __set_ ## x (qts. x)
qts2qt(const QueryTeleStats & qts)40 QueryTele qts2qt(const QueryTeleStats& qts)
41 {
42     QueryTele out;
43 
44     out.query_uuid = bu::to_string(qts.query_uuid);
45 
46     switch (qts.msg_type)
47     {
48         case QueryTeleStats::QT_SUMMARY:
49             out.msg_type = QTType::QT_SUMMARY;
50             break;
51 
52         case QueryTeleStats::QT_PROGRESS:
53             out.msg_type = QTType::QT_PROGRESS;
54             break;
55 
56         case QueryTeleStats::QT_START:
57             out.msg_type = QTType::QT_START;
58             break;
59 
60         default:
61             out.msg_type = QTType::QT_INVALID;
62             break;
63     }
64 
65     QT_ASSIGN_(max_mem_pct);
66     QT_ASSIGN_(num_files);
67     QT_ASSIGN_(phy_io);
68     QT_ASSIGN_(cache_io);
69     QT_ASSIGN_(msg_rcv_cnt);
70     QT_ASSIGN_(cp_blocks_skipped);
71     QT_ASSIGN_(msg_bytes_in);
72     QT_ASSIGN_(msg_bytes_out);
73     QT_ASSIGN_(rows);
74     QT_ASSIGN_(start_time);
75     QT_ASSIGN_(end_time);
76     QT_ASSIGN_(error_no);
77     QT_ASSIGN_(blocks_changed);
78     QT_ASSIGN_(session_id);
79     QT_ASSIGN_(query_type);
80     QT_ASSIGN_(query);
81     QT_ASSIGN_(user);
82     QT_ASSIGN_(host);
83     QT_ASSIGN_(priority);
84     QT_ASSIGN_(priority_level);
85     QT_ASSIGN_(system_name);
86     QT_ASSIGN_(module_name);
87     QT_ASSIGN_(local_query);
88     QT_ASSIGN_(schema_name);
89 
90     return out;
91 }
92 #undef QT_ASSIGN_
93 
94 #define QT_ASSIGN_(x) out. __set_ ## x (sts. x)
sts2st(const StepTeleStats & sts)95 StepTele sts2st(const StepTeleStats& sts)
96 {
97     StepTele out;
98 
99     out.query_uuid = bu::to_string(sts.query_uuid);
100 
101     switch (sts.msg_type)
102     {
103         case StepTeleStats::ST_SUMMARY:
104             out.msg_type = STType::ST_SUMMARY;
105             break;
106 
107         case StepTeleStats::ST_PROGRESS:
108             out.msg_type = STType::ST_PROGRESS;
109             break;
110 
111         case StepTeleStats::ST_START:
112             out.msg_type = STType::ST_START;
113             break;
114 
115         default:
116             out.msg_type = STType::ST_INVALID;
117             break;
118     }
119 
120     out.step_uuid = bu::to_string(sts.step_uuid);
121     QT_ASSIGN_(phy_io);
122     QT_ASSIGN_(cache_io);
123     QT_ASSIGN_(msg_rcv_cnt);
124     QT_ASSIGN_(cp_blocks_skipped);
125     QT_ASSIGN_(msg_bytes_in);
126     QT_ASSIGN_(msg_bytes_out);
127     QT_ASSIGN_(rows);
128     QT_ASSIGN_(start_time);
129     QT_ASSIGN_(end_time);
130     QT_ASSIGN_(total_units_of_work);
131     QT_ASSIGN_(units_of_work_completed);
132 
133     return out;
134 }
135 #undef QT_ASSIGN_
136 
137 #define QT_ASSIGN_(x) out. __set_ ## x (its. x)
its2it(const ImportTeleStats & its)138 ImportTele its2it(const ImportTeleStats& its)
139 {
140     ImportTele out;
141 
142     out.job_uuid = bu::to_string(its.job_uuid);
143     out.import_uuid = bu::to_string(its.import_uuid);
144 
145     switch (its.msg_type)
146     {
147         case ImportTeleStats::IT_SUMMARY:
148             out.msg_type = ITType::IT_SUMMARY;
149             break;
150 
151         case ImportTeleStats::IT_PROGRESS:
152             out.msg_type = ITType::IT_PROGRESS;
153             break;
154 
155         case ImportTeleStats::IT_START:
156             out.msg_type = ITType::IT_START;
157             break;
158 
159         case ImportTeleStats::IT_TERM:
160             out.msg_type = ITType::IT_TERM;
161             break;
162 
163         default:
164             out.msg_type = ITType::IT_INVALID;
165             break;
166     }
167 
168     QT_ASSIGN_(start_time);
169     QT_ASSIGN_(end_time);
170     QT_ASSIGN_(table_list);
171     QT_ASSIGN_(rows_so_far);
172     QT_ASSIGN_(system_name);
173     QT_ASSIGN_(module_name);
174     QT_ASSIGN_(schema_name);
175 
176     return out;
177 }
178 #undef QT_ASSIGN_
179 
180 }
181 
182 namespace querytele
183 {
184 
QueryTeleClient(const QueryTeleServerParms & sp)185 QueryTeleClient::QueryTeleClient(const QueryTeleServerParms& sp) :
186     fProtoImpl(0),
187     fServerParms(sp)
188 {
189     if (fServerParms.host.empty() || fServerParms.port == 0) return;
190 
191     fProtoImpl = new QueryTeleProtoImpl(fServerParms);
192 }
193 
~QueryTeleClient()194 QueryTeleClient::~QueryTeleClient()
195 {
196     delete fProtoImpl;
197 }
198 
QueryTeleClient(const QueryTeleClient & rhs)199 QueryTeleClient::QueryTeleClient(const QueryTeleClient& rhs) :
200     fProtoImpl(0)
201 {
202     fServerParms = rhs.fServerParms;
203 
204     if (rhs.fProtoImpl)
205     {
206         fProtoImpl = new QueryTeleProtoImpl(*rhs.fProtoImpl);
207     }
208 }
209 
operator =(const QueryTeleClient & rhs)210 QueryTeleClient& QueryTeleClient::operator=(const QueryTeleClient& rhs)
211 {
212     if (&rhs != this)
213     {
214         fProtoImpl = 0;
215         fServerParms = rhs.fServerParms;
216 
217         if (rhs.fProtoImpl)
218         {
219             fProtoImpl = new QueryTeleProtoImpl(*rhs.fProtoImpl);
220         }
221     }
222 
223     return *this;
224 }
225 
serverParms(const QueryTeleServerParms & sp)226 void QueryTeleClient::serverParms(const QueryTeleServerParms& sp)
227 {
228     fServerParms = sp;
229     delete fProtoImpl;
230     fProtoImpl = 0;
231 
232     if (fServerParms.host.empty() || fServerParms.port == 0) return;
233 
234     fProtoImpl = new QueryTeleProtoImpl(fServerParms);
235 }
236 
postQueryTele(const QueryTeleStats & qts)237 void QueryTeleClient::postQueryTele(const QueryTeleStats& qts)
238 {
239     if (!fProtoImpl) return;
240 
241     QueryTele qtdata = qts2qt(qts);
242     fProtoImpl->enqQueryTele(qtdata);
243 }
244 
245 #define QT_STYPE_CASE_(x) case StepTeleStats:: x: {stdata.step_type = StepType:: x; break;}
postStepTele(const StepTeleStats & sts)246 void QueryTeleClient::postStepTele(const StepTeleStats& sts)
247 {
248     if (!fProtoImpl) return;
249 
250     StepTele stdata = sts2st(sts);
251 
252     switch (fStepParms.stepType)
253     {
254             QT_STYPE_CASE_(T_HJS);
255             QT_STYPE_CASE_(T_DSS);
256             QT_STYPE_CASE_(T_CES);
257             QT_STYPE_CASE_(T_SQS);
258             QT_STYPE_CASE_(T_TAS);
259             QT_STYPE_CASE_(T_TNS);
260             QT_STYPE_CASE_(T_BPS);
261             QT_STYPE_CASE_(T_TCS);
262             QT_STYPE_CASE_(T_HVS);
263             QT_STYPE_CASE_(T_WFS);
264             QT_STYPE_CASE_(T_SAS);
265             QT_STYPE_CASE_(T_TUN);
266 
267         default:
268             stdata.step_type = StepType::T_INVALID;
269             break;
270     }
271 
272     fProtoImpl->enqStepTele(stdata);
273 }
274 #undef QT_STYPE_CASE_
275 
postImportTele(const ImportTeleStats & its)276 void QueryTeleClient::postImportTele(const ImportTeleStats& its)
277 {
278     if (!fProtoImpl) return;
279 
280     ImportTele itdata = its2it(its);
281     fProtoImpl->enqImportTele(itdata);
282 }
283 
waitForQueues()284 void QueryTeleClient::waitForQueues()
285 {
286     if (fProtoImpl)
287         fProtoImpl->waitForQueues();
288 }
289 
290 }
291 
292