1 /* Copyright (C) 2014 InfiniDB, Inc.
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17
18 #include <unistd.h>
19 #include <stdint.h>
20 #include <string>
21 using namespace std;
22
23 #include <boost/uuid/uuid.hpp>
24 #include <boost/uuid/uuid_io.hpp>
25 namespace bu = boost::uuids;
26
27 #include "queryteleserverparms.h"
28 #include "querytele_types.h"
29 #include "QueryTeleService.h"
30 #include "queryteleprotoimpl.h"
31 #include "telestats.h"
32
33 #include "queryteleclient.h"
34
35 namespace
36 {
37 using namespace querytele;
38
39 #define QT_ASSIGN_(x) out. __set_ ## x (qts. x)
qts2qt(const QueryTeleStats & qts)40 QueryTele qts2qt(const QueryTeleStats& qts)
41 {
42 QueryTele out;
43
44 out.query_uuid = bu::to_string(qts.query_uuid);
45
46 switch (qts.msg_type)
47 {
48 case QueryTeleStats::QT_SUMMARY:
49 out.msg_type = QTType::QT_SUMMARY;
50 break;
51
52 case QueryTeleStats::QT_PROGRESS:
53 out.msg_type = QTType::QT_PROGRESS;
54 break;
55
56 case QueryTeleStats::QT_START:
57 out.msg_type = QTType::QT_START;
58 break;
59
60 default:
61 out.msg_type = QTType::QT_INVALID;
62 break;
63 }
64
65 QT_ASSIGN_(max_mem_pct);
66 QT_ASSIGN_(num_files);
67 QT_ASSIGN_(phy_io);
68 QT_ASSIGN_(cache_io);
69 QT_ASSIGN_(msg_rcv_cnt);
70 QT_ASSIGN_(cp_blocks_skipped);
71 QT_ASSIGN_(msg_bytes_in);
72 QT_ASSIGN_(msg_bytes_out);
73 QT_ASSIGN_(rows);
74 QT_ASSIGN_(start_time);
75 QT_ASSIGN_(end_time);
76 QT_ASSIGN_(error_no);
77 QT_ASSIGN_(blocks_changed);
78 QT_ASSIGN_(session_id);
79 QT_ASSIGN_(query_type);
80 QT_ASSIGN_(query);
81 QT_ASSIGN_(user);
82 QT_ASSIGN_(host);
83 QT_ASSIGN_(priority);
84 QT_ASSIGN_(priority_level);
85 QT_ASSIGN_(system_name);
86 QT_ASSIGN_(module_name);
87 QT_ASSIGN_(local_query);
88 QT_ASSIGN_(schema_name);
89
90 return out;
91 }
92 #undef QT_ASSIGN_
93
94 #define QT_ASSIGN_(x) out. __set_ ## x (sts. x)
sts2st(const StepTeleStats & sts)95 StepTele sts2st(const StepTeleStats& sts)
96 {
97 StepTele out;
98
99 out.query_uuid = bu::to_string(sts.query_uuid);
100
101 switch (sts.msg_type)
102 {
103 case StepTeleStats::ST_SUMMARY:
104 out.msg_type = STType::ST_SUMMARY;
105 break;
106
107 case StepTeleStats::ST_PROGRESS:
108 out.msg_type = STType::ST_PROGRESS;
109 break;
110
111 case StepTeleStats::ST_START:
112 out.msg_type = STType::ST_START;
113 break;
114
115 default:
116 out.msg_type = STType::ST_INVALID;
117 break;
118 }
119
120 out.step_uuid = bu::to_string(sts.step_uuid);
121 QT_ASSIGN_(phy_io);
122 QT_ASSIGN_(cache_io);
123 QT_ASSIGN_(msg_rcv_cnt);
124 QT_ASSIGN_(cp_blocks_skipped);
125 QT_ASSIGN_(msg_bytes_in);
126 QT_ASSIGN_(msg_bytes_out);
127 QT_ASSIGN_(rows);
128 QT_ASSIGN_(start_time);
129 QT_ASSIGN_(end_time);
130 QT_ASSIGN_(total_units_of_work);
131 QT_ASSIGN_(units_of_work_completed);
132
133 return out;
134 }
135 #undef QT_ASSIGN_
136
137 #define QT_ASSIGN_(x) out. __set_ ## x (its. x)
its2it(const ImportTeleStats & its)138 ImportTele its2it(const ImportTeleStats& its)
139 {
140 ImportTele out;
141
142 out.job_uuid = bu::to_string(its.job_uuid);
143 out.import_uuid = bu::to_string(its.import_uuid);
144
145 switch (its.msg_type)
146 {
147 case ImportTeleStats::IT_SUMMARY:
148 out.msg_type = ITType::IT_SUMMARY;
149 break;
150
151 case ImportTeleStats::IT_PROGRESS:
152 out.msg_type = ITType::IT_PROGRESS;
153 break;
154
155 case ImportTeleStats::IT_START:
156 out.msg_type = ITType::IT_START;
157 break;
158
159 case ImportTeleStats::IT_TERM:
160 out.msg_type = ITType::IT_TERM;
161 break;
162
163 default:
164 out.msg_type = ITType::IT_INVALID;
165 break;
166 }
167
168 QT_ASSIGN_(start_time);
169 QT_ASSIGN_(end_time);
170 QT_ASSIGN_(table_list);
171 QT_ASSIGN_(rows_so_far);
172 QT_ASSIGN_(system_name);
173 QT_ASSIGN_(module_name);
174 QT_ASSIGN_(schema_name);
175
176 return out;
177 }
178 #undef QT_ASSIGN_
179
180 }
181
182 namespace querytele
183 {
184
QueryTeleClient(const QueryTeleServerParms & sp)185 QueryTeleClient::QueryTeleClient(const QueryTeleServerParms& sp) :
186 fProtoImpl(0),
187 fServerParms(sp)
188 {
189 if (fServerParms.host.empty() || fServerParms.port == 0) return;
190
191 fProtoImpl = new QueryTeleProtoImpl(fServerParms);
192 }
193
~QueryTeleClient()194 QueryTeleClient::~QueryTeleClient()
195 {
196 delete fProtoImpl;
197 }
198
QueryTeleClient(const QueryTeleClient & rhs)199 QueryTeleClient::QueryTeleClient(const QueryTeleClient& rhs) :
200 fProtoImpl(0)
201 {
202 fServerParms = rhs.fServerParms;
203
204 if (rhs.fProtoImpl)
205 {
206 fProtoImpl = new QueryTeleProtoImpl(*rhs.fProtoImpl);
207 }
208 }
209
operator =(const QueryTeleClient & rhs)210 QueryTeleClient& QueryTeleClient::operator=(const QueryTeleClient& rhs)
211 {
212 if (&rhs != this)
213 {
214 fProtoImpl = 0;
215 fServerParms = rhs.fServerParms;
216
217 if (rhs.fProtoImpl)
218 {
219 fProtoImpl = new QueryTeleProtoImpl(*rhs.fProtoImpl);
220 }
221 }
222
223 return *this;
224 }
225
serverParms(const QueryTeleServerParms & sp)226 void QueryTeleClient::serverParms(const QueryTeleServerParms& sp)
227 {
228 fServerParms = sp;
229 delete fProtoImpl;
230 fProtoImpl = 0;
231
232 if (fServerParms.host.empty() || fServerParms.port == 0) return;
233
234 fProtoImpl = new QueryTeleProtoImpl(fServerParms);
235 }
236
postQueryTele(const QueryTeleStats & qts)237 void QueryTeleClient::postQueryTele(const QueryTeleStats& qts)
238 {
239 if (!fProtoImpl) return;
240
241 QueryTele qtdata = qts2qt(qts);
242 fProtoImpl->enqQueryTele(qtdata);
243 }
244
245 #define QT_STYPE_CASE_(x) case StepTeleStats:: x: {stdata.step_type = StepType:: x; break;}
postStepTele(const StepTeleStats & sts)246 void QueryTeleClient::postStepTele(const StepTeleStats& sts)
247 {
248 if (!fProtoImpl) return;
249
250 StepTele stdata = sts2st(sts);
251
252 switch (fStepParms.stepType)
253 {
254 QT_STYPE_CASE_(T_HJS);
255 QT_STYPE_CASE_(T_DSS);
256 QT_STYPE_CASE_(T_CES);
257 QT_STYPE_CASE_(T_SQS);
258 QT_STYPE_CASE_(T_TAS);
259 QT_STYPE_CASE_(T_TNS);
260 QT_STYPE_CASE_(T_BPS);
261 QT_STYPE_CASE_(T_TCS);
262 QT_STYPE_CASE_(T_HVS);
263 QT_STYPE_CASE_(T_WFS);
264 QT_STYPE_CASE_(T_SAS);
265 QT_STYPE_CASE_(T_TUN);
266
267 default:
268 stdata.step_type = StepType::T_INVALID;
269 break;
270 }
271
272 fProtoImpl->enqStepTele(stdata);
273 }
274 #undef QT_STYPE_CASE_
275
postImportTele(const ImportTeleStats & its)276 void QueryTeleClient::postImportTele(const ImportTeleStats& its)
277 {
278 if (!fProtoImpl) return;
279
280 ImportTele itdata = its2it(its);
281 fProtoImpl->enqImportTele(itdata);
282 }
283
waitForQueues()284 void QueryTeleClient::waitForQueues()
285 {
286 if (fProtoImpl)
287 fProtoImpl->waitForQueues();
288 }
289
290 }
291
292