1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 #include <queue>
19 #include <sstream>
20 #include <fstream>
21 using namespace std;
22 
23 #define BOOST_DISABLE_ASSERTS
24 #include <boost/thread.hpp>
25 #include <boost/shared_ptr.hpp>
26 
27 #include "thrift/transport/TSocket.h"
28 #include "thrift/transport/TBufferTransports.h"
29 namespace att = apache::thrift::transport;
30 
31 #include "thrift/protocol/TBinaryProtocol.h"
32 namespace atp = apache::thrift::protocol;
33 
34 #include "atomicops.h"
35 
36 #include "queryteleserverparms.h"
37 #include "querytele_types.h"
38 #include "QueryTeleService.h"
39 
40 #include "queryteleprotoimpl.h"
41 
42 namespace
43 {
// Soft cap on the number of pending records in each telemetry queue. The enqueue
// functions compare the queue size against this value before pushing a new record.
const std::size_t MaxQueueElems = 1000;
45 
46 template <class T>
47 struct TsTeleQueue
48 {
49     typedef std::queue<T> TeleQueue;
50 
51     TeleQueue queue;
52     boost::mutex queueMtx;
53 };
54 
55 TsTeleQueue<querytele::StepTele> stQueue;
56 TsTeleQueue<querytele::QueryTele> qtQueue;
57 TsTeleQueue<querytele::ImportTele> itQueue;
58 
59 volatile bool isInited = false;
60 boost::mutex initMux;
61 
62 boost::shared_ptr<att::TSocket> fSocket;
63 boost::shared_ptr<att::TBufferedTransport> fTransport;
64 boost::shared_ptr<atp::TBinaryProtocol> fProtocol;
65 
66 querytele::StepTele gLastStep;
67 
// Counters describing records that were not enqueued. All start at zero.
struct QStats
{
    QStats()
        : qtqueuedrops(0), stqueuedrops(0), stqueuedups(0), itqueuedrops(0)
    {
    }

    int qtqueuedrops;  // query records rejected because the query queue was full
    int stqueuedrops;  // step records rejected because the step queue was full
    int stqueuedups;   // step records skipped as duplicates of the previous one
    int itqueuedrops;  // import records rejected because the import queue was full
};

// Process-wide counter instance updated by the enq*Tele() members.
QStats fQStats;
82 
83 #ifdef QUERY_TELE_DEBUG
// Builds the name of the debug trace file for the calling thread:
// "/tmp/qt-consumer-<process id>-<thread id>".
std::string get_trace_file()
{
#ifdef _MSC_VER
    const DWORD tid = GetCurrentThreadId();
#else
    const pthread_t tid = pthread_self();
#endif
    const pid_t processId = getpid();

    std::ostringstream path;
    path << "/tmp/qt-consumer-" << processId << "-" << tid;
    return path.str();
}
97 
log_query(const querytele::QueryTele & qtdata)98 void log_query(const querytele::QueryTele& qtdata)
99 {
100     ofstream trace(get_trace_file().c_str(), ios::out | ios::app);
101     trace << "Query,"
102           <<  qtdata.query_uuid << ","
103           << ","; // skip step uuid
104 
105     if (qtdata.msg_type == querytele::QTType::QT_SUMMARY)
106         trace << "SUMMARY,";
107     else if (qtdata.msg_type == querytele::QTType::QT_START)
108         trace << "START,";
109     else
110         trace << "PROGRESS,";
111 
112     trace << ","; // sktp step type
113 
114     trace << qtdata.start_time << ",";
115     trace << qtdata.end_time << ",";
116 
117     trace << qtdata.cache_io << ",";
118     trace << qtdata.msg_rcv_cnt << ",";
119     trace << qtdata.rows << ",";
120     trace << qtdata.max_mem_pct << ",";
121 
122     trace << qtdata.query_type << ",";
123     trace << qtdata.schema_name << ",";
124     trace << qtdata.query << ",";
125     trace << qtdata.system_name;
126     trace << endl;
127     trace.close();
128 }
129 #endif
130 
131 #ifdef QUERY_TELE_DEBUG
st2str(enum querytele::StepType::type t)132 const string st2str(enum querytele::StepType::type t)
133 {
134     switch (t)
135     {
136         case querytele::StepType::T_HJS:
137             return "HJS";
138 
139         case querytele::StepType::T_DSS:
140             return "DSS";
141 
142         case querytele::StepType::T_CES:
143             return "CES";
144 
145         case querytele::StepType::T_SQS:
146             return "SQS";
147 
148         case querytele::StepType::T_TAS:
149             return "TAS";
150 
151         case querytele::StepType::T_TNS:
152             return "TNS";
153 
154         case querytele::StepType::T_BPS:
155             return "BPS";
156 
157         case querytele::StepType::T_TCS:
158             return "TCS";
159 
160         case querytele::StepType::T_HVS:
161             return "HVS";
162 
163         case querytele::StepType::T_WFS:
164             return "WFS";
165 
166         case querytele::StepType::T_SAS:
167             return "SAS";
168 
169         case querytele::StepType::T_TUN:
170             return "TUN";
171 
172         default:
173             return "INV";
174     }
175 
176     return "INV";
177 }
178 #endif
179 
180 #ifdef QUERY_TELE_DEBUG
log_step(const querytele::StepTele & stdata)181 void log_step(const querytele::StepTele& stdata)
182 {
183     ofstream trace(get_trace_file().c_str(), ios::out | ios::app);
184 
185     trace << "Step,"
186           << stdata.query_uuid << ","
187           << stdata.step_uuid << ",";
188 
189     if (stdata.msg_type == querytele::STType::ST_SUMMARY)
190         trace << "SUMMARY,";
191     else if (stdata.msg_type == querytele::STType::ST_START)
192         trace << "START,";
193     else
194         trace << "PROGRESS,";
195 
196     trace << st2str(stdata.step_type) << ",";
197 
198     trace << stdata.start_time << ",";
199     trace << stdata.end_time << ",";
200 
201     trace << stdata.cache_io << ",";
202     trace << stdata.msg_rcv_cnt << ",";
203     trace << stdata.rows << ",";
204 
205     if (stdata.total_units_of_work > 0)
206         trace << stdata.units_of_work_completed * 100 / stdata.total_units_of_work << ",";
207     else
208         trace << "-1,";
209 
210     trace << ",,,"; // skip qtype, schemo, etc.
211     trace << fQStats.stqueuedrops << "," << fQStats.stqueuedups << "," << stQueue.queue.size();
212     trace << endl;
213     trace.close();
214 }
215 #endif
216 
TeleConsumer()217 void TeleConsumer()
218 {
219     bool didSomeWork = false;
220     boost::mutex::scoped_lock itlk(itQueue.queueMtx, boost::defer_lock);
221     boost::mutex::scoped_lock qtlk(qtQueue.queueMtx, boost::defer_lock);
222     boost::mutex::scoped_lock stlk(stQueue.queueMtx, boost::defer_lock);
223     querytele::QueryTeleServiceClient client(fProtocol);
224 
225     try
226     {
227         for (;;)
228         {
229             didSomeWork = false;
230 
231             itlk.lock();
232 
233             // Empty the import queue first...
234             while (!itQueue.queue.empty())
235             {
236                 querytele::ImportTele itdata = itQueue.queue.front();
237                 itQueue.queue.pop();
238                 itlk.unlock();
239 
240                 try
241                 {
242                     fTransport->open();
243                     client.postImport(itdata);
244                     fTransport->close();
245                 }
246                 catch (...)
247                 {
248                     try
249                     {
250                         fTransport->close();
251                     }
252                     catch (...)
253                     {
254                     }
255                 }
256 
257                 didSomeWork = true;
258                 itlk.lock();
259             }
260 
261             itlk.unlock();
262 
263             qtlk.lock();
264 
265             // Now empty the query queue...
266             while (!qtQueue.queue.empty())
267             {
268                 querytele::QueryTele qtdata = qtQueue.queue.front();
269                 qtQueue.queue.pop();
270                 qtlk.unlock();
271 
272                 try
273                 {
274                     fTransport->open();
275 #ifdef QUERY_TELE_DEBUG
276                     log_query(qtdata);
277 #endif
278                     client.postQuery(qtdata);
279                     fTransport->close();
280                 }
281                 catch (...)
282                 {
283                     try
284                     {
285                         fTransport->close();
286                     }
287                     catch (...)
288                     {
289                     }
290                 }
291 
292                 didSomeWork = true;
293                 qtlk.lock();
294             }
295 
296             qtlk.unlock();
297 
298             stlk.lock();
299 
300             // Finally empty the step queue...
301             // @bug6088 - Added check for query queue and import queue in while statment below to
302             //            keep the step logs from starving the query and import logs.
303             while (!stQueue.queue.empty() && qtQueue.queue.empty() && itQueue.queue.empty())
304             {
305                 querytele::StepTele stdata = stQueue.queue.front();
306                 stQueue.queue.pop();
307                 stlk.unlock();
308 
309                 try
310                 {
311                     fTransport->open();
312 #ifdef QUERY_TELE_DEBUG
313                     log_step(stdata);
314 #endif
315                     client.postStep(stdata);
316                     fTransport->close();
317                 }
318                 catch (...)
319                 {
320                     try
321                     {
322                         fTransport->close();
323                     }
324                     catch (...)
325                     {
326                     }
327                 }
328 
329                 didSomeWork = true;
330                 stlk.lock();
331             }
332 
333             stlk.unlock();
334 
335             if (!didSomeWork)
336             {
337                 usleep(50000);
338             }
339         }
340     }
341     catch (...)
342     {
343         //we're probably shutting down, just let this thread die quietly...
344     }
345 }
346 
347 boost::thread* consThd;
348 
349 }
350 
351 namespace querytele
352 {
353 
QueryTeleProtoImpl(const QueryTeleServerParms & sp)354 QueryTeleProtoImpl::QueryTeleProtoImpl(const QueryTeleServerParms& sp) :
355     fServerParms(sp)
356 {
357     if (fServerParms.host.empty() || fServerParms.port == 0) return;
358 
359     boost::mutex::scoped_lock lk(initMux);
360 
361     atomicops::atomicMb();
362 
363     if (isInited) return;
364 
365     fSocket.reset(new att::TSocket(fServerParms.host, fServerParms.port));
366     fTransport.reset(new att::TBufferedTransport(fSocket));
367     fProtocol.reset(new atp::TBinaryProtocol(fTransport));
368 
369     consThd = new boost::thread(&TeleConsumer);
370 
371     atomicops::atomicMb();
372     isInited = true;
373 }
374 
enqStepTele(const StepTele & stdata)375 int QueryTeleProtoImpl::enqStepTele(const StepTele& stdata)
376 {
377     try
378     {
379         boost::mutex::scoped_lock lk(stQueue.queueMtx);
380 
381         // @bug6088 - Added conditions below to always log progress SUMMARY and START messages to avoid completed queries showing up with progress 0
382         //            and no steps.
383         if (stQueue.queue.size() >= MaxQueueElems && stdata.msg_type != querytele::STType::ST_SUMMARY && stdata.msg_type != querytele::STType::ST_START)
384         {
385             fQStats.stqueuedrops++;
386             return -1;
387         }
388 
389         if ( stdata.step_uuid != gLastStep.step_uuid ||
390                 stdata.msg_type != gLastStep.msg_type ||
391                 stdata.step_type != gLastStep.step_type ||
392                 stdata.total_units_of_work != gLastStep.total_units_of_work ||
393                 stdata.units_of_work_completed != gLastStep.units_of_work_completed )
394         {
395             stQueue.queue.push(stdata);
396             gLastStep = stdata;
397         }
398         else
399         {
400             fQStats.stqueuedups++;
401         }
402     }
403     catch (...)
404     {
405         return -2;
406     }
407 
408     return 0;
409 }
410 
enqQueryTele(const QueryTele & qtdata)411 int QueryTeleProtoImpl::enqQueryTele(const QueryTele& qtdata)
412 {
413     try
414     {
415         boost::mutex::scoped_lock lk(qtQueue.queueMtx);
416 
417         if (qtQueue.queue.size() >= MaxQueueElems)
418         {
419             fQStats.qtqueuedrops++;
420             return -1;
421         }
422 
423         qtQueue.queue.push(qtdata);
424     }
425     catch (...)
426     {
427         return -2;
428     }
429 
430     return 0;
431 }
432 
enqImportTele(const ImportTele & itdata)433 int QueryTeleProtoImpl::enqImportTele(const ImportTele& itdata)
434 {
435     try
436     {
437         boost::mutex::scoped_lock lk(itQueue.queueMtx);
438 
439         if (itQueue.queue.size() >= MaxQueueElems)
440         {
441             fQStats.itqueuedrops++;
442             return -1;
443         }
444 
445         itQueue.queue.push(itdata);
446     }
447     catch (...)
448     {
449         return -2;
450     }
451 
452     return 0;
453 }
454 
waitForQueues()455 int QueryTeleProtoImpl::waitForQueues()
456 {
457     try
458     {
459         boost::mutex::scoped_lock lk(itQueue.queueMtx);
460 
461         while (!itQueue.queue.empty())
462         {
463             lk.unlock();
464             usleep(100000);
465             lk.lock();
466         }
467     }
468     catch (...)
469     {
470         return -1;
471     }
472 
473     return 0;
474 }
475 
476 } //namespace querytele
477 
478