1 /* Copyright (C) 2014 InfiniDB, Inc.
2 Copyright (C) 2019 MariaDB Corporation
3
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 as published by the Free Software Foundation; version 2 of
7 the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17 MA 02110-1301, USA. */
18
19 #include <my_config.h>
20 #ifndef _MSC_VER
21 #include <unistd.h>
22 #endif
23 #include <string>
24 #include <iostream>
25 #include <stack>
26 #ifdef _MSC_VER
27 #include <unordered_map>
28 #include <unordered_set>
29 #include <stdio.h>
30 #else
31 #include <tr1/unordered_map>
32 #include <tr1/unordered_set>
33 #endif
34 #include <fstream>
35 #include <sstream>
36 #include <cerrno>
37 #include <cstring>
38 #include <time.h>
39 #include <cassert>
40 #include <vector>
41 #include <map>
42 #include <limits>
43 #if defined(__linux__)
44 #include <wait.h> //wait()
45 #elif defined(__FreeBSD__)
46 #include <sys/types.h>
47 #include <sys/stat.h> // For stat().
48 #include <sys/wait.h>
49 #include <sys/time.h>
50 #include <sys/resource.h>
51 #endif
52 using namespace std;
53
54 #include <boost/shared_ptr.hpp>
55 #include <boost/algorithm/string/case_conv.hpp>
56 #include <boost/regex.hpp>
57 #include <boost/thread.hpp>
58
59 #include "idb_mysql.h"
60
61 #define NEED_CALPONT_INTERFACE
62 #include "ha_mcs_impl.h"
63
64 #include "ha_mcs_impl_if.h"
65 using namespace cal_impl_if;
66
67 #include "calpontselectexecutionplan.h"
68 #include "logicoperator.h"
69 #include "parsetree.h"
70 using namespace execplan;
71
72 #include "dataconvert.h"
73 using namespace dataconvert;
74
75 #include "sm.h"
76 #include "ha_mcs_pushdown.h"
77
78 #include "bytestream.h"
79 #include "messagequeue.h"
80 using namespace messageqcpp;
81
82 #include "dmlpackage.h"
83 #include "calpontdmlpackage.h"
84 #include "insertdmlpackage.h"
85 #include "vendordmlstatement.h"
86 #include "calpontdmlfactory.h"
87 using namespace dmlpackage;
88
89 #include "dmlpackageprocessor.h"
90 using namespace dmlpackageprocessor;
91
92 #include "configcpp.h"
93 using namespace config;
94
95 #include "rowgroup.h"
96 using namespace rowgroup;
97
98 #include "brmtypes.h"
99 using namespace BRM;
100
101 #include "querystats.h"
102 using namespace querystats;
103
104 #include "calpontselectexecutionplan.h"
105 #include "calpontsystemcatalog.h"
106 #include "simplecolumn_int.h"
107 #include "simplecolumn_decimal.h"
108 #include "aggregatecolumn.h"
109 #include "constantcolumn.h"
110 #include "simplefilter.h"
111 #include "constantfilter.h"
112 #include "functioncolumn.h"
113 #include "arithmeticcolumn.h"
114 #include "arithmeticoperator.h"
115 #include "logicoperator.h"
116 #include "predicateoperator.h"
117 #include "rowcolumn.h"
118 #include "selectfilter.h"
119 using namespace execplan;
120
121 #include "joblisttypes.h"
122 using namespace joblist;
123
124 #include "cacheutils.h"
125
126 #include "errorcodes.h"
127 #include "idberrorinfo.h"
128 #include "errorids.h"
129 using namespace logging;
130
131 #include "resourcemanager.h"
132
133 #include "funcexp.h"
134 #include "functor.h"
135 using namespace funcexp;
136
137 #include "installdir.h"
138 #include "columnstoreversion.h"
139 #include "ha_mcs_sysvars.h"
140
// Forward declaration only: nonConstFunc() is not defined in this part of the
// file. NOTE(review): the definition presumably lives in another cal_impl_if
// source file -- confirm where before relying on its semantics.
namespace cal_impl_if
{
extern bool nonConstFunc(Item_func* ifp);
}
145
146 namespace
147 {
// Calpont vtable non-support error message.
// Warning text telling the user the query fell back from distributed mode to
// standard mode; its use sites are outside this part of the file.
const string infinidb_autoswitch_warning = "The query includes syntax that is not supported by MariaDB Columnstore distributed mode. The execution was switched to standard mode with downgraded performance.";

// copied from item_timefunc.cc
// NOTE(review): presumably indexed by the server's interval type enum, so the
// order would have to stay in sync with item_timefunc.cc -- confirm against
// the code that reads this table.
static const string interval_names[] =
{
    "year", "quarter", "month", "week", "day",
    "hour", "minute", "second", "microsecond",
    "year_month", "day_hour", "day_minute",
    "day_second", "hour_minute", "hour_second",
    "minute_second", "day_microsecond",
    "hour_microsecond", "minute_microsecond",
    "second_microsecond"
};

// NOTE(review): looks like a threshold on error codes for "not supported"
// errors; the comparison that uses it is not visible here -- confirm.
const unsigned NONSUPPORTED_ERR_THRESH = 2000;

// HDFS is never used nowadays, so don't bother
// (the ResourceManager lookup is left commented out and the flag is hard-wired
// to false).
bool useHdfs = false; // ResourceManager::instance()->useHdfs();
167
168 //convenience fcn
tid2sid(const uint32_t tid)169 inline uint32_t tid2sid(const uint32_t tid)
170 {
171 return CalpontSystemCatalog::idb_tid2sid(tid);
172 }
173
174
175 /**
176 @brief
177 Wrapper around logging facility.
178
179 @details
180 Reduces the boiler plate code.
181
182 Called from number of places(mostly DML) in
183 ha_mcs_impl.cpp().
184 */
log_this(THD * thd,const char * message,logging::LOG_TYPE log_type,unsigned sid)185 void log_this(THD *thd, const char *message,
186 logging::LOG_TYPE log_type, unsigned sid)
187 {
188 // corresponds with dbcon in SubsystemID vector
189 // in messagelog.cpp
190 unsigned int subSystemId = 24;
191 logging::LoggingID logid( subSystemId, sid, 0);
192 logging::Message::Args args1;
193 logging::Message msg(1);
194 args1.add(message);
195 msg.format( args1 );
196 Logger logger(logid.fSubsysID);
197 logger.logMessage(log_type, msg, logid);
198 }
199
200 /**
201 @brief
202 Forcely close a FEP connection.
203
204 @details
205 Plugin code opens network connection with ExMgr to
206 get:
207 the result of meta-data queries
208 the result of DML or DQL query in any mode
209 statistics
210 This code allows to explicitly close the connection
211 if any error happens using a non-existing protocol
212 code 0. This causes ExeMgr loop to drop the
213 connection.
214
215 Called from many places in ha_mcs_impl.cpp().
216 */
force_close_fep_conn(THD * thd,cal_connection_info * ci,bool check_prev_rc=false)217 void force_close_fep_conn(THD *thd, cal_connection_info* ci, bool check_prev_rc = false)
218 {
219 if (!ci->cal_conn_hndl)
220 {
221 return;
222 }
223
224 if(check_prev_rc && !ci->rc)
225 {
226 return;
227 }
228
229 // send ExeMgr an unknown signal to force him to close
230 // the connection
231 ByteStream msg;
232 ByteStream::quadbyte qb = 0;
233 msg << qb;
234
235 try
236 {
237 ci->cal_conn_hndl->exeMgr->write(msg);
238 }
239 catch (...)
240 {
241 // Add details into the message.
242 log_this(thd, "Exception in force_close_fep_conn().",
243 logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
244 }
245
246 sm::sm_cleanup(ci->cal_conn_hndl);
247 ci->cal_conn_hndl = 0;
248 }
249
storeNumericField(Field ** f,int64_t value,CalpontSystemCatalog::ColType & ct)250 void storeNumericField(Field** f, int64_t value, CalpontSystemCatalog::ColType& ct)
251 {
252 // unset null bit first
253 if ((*f)->null_ptr)
254 *(*f)->null_ptr &= ~(*f)->null_bit;
255
256 // For unsigned, use the ColType returned in the row rather than the
257 // unsigned_flag set by mysql. This is because mysql gets it wrong for SUM()
258 // Hopefully, in all other cases we get it right.
259 switch ((*f)->type())
260 {
261 case MYSQL_TYPE_NEWDECIMAL:
262 {
263 // @bug4388 stick to InfiniDB's scale in case mysql gives wrong scale due
264 // to create vtable limitation.
265 //if (f2->dec < ct.scale)
266 // f2->dec = ct.scale;
267
268 char buf[256];
269 dataconvert::DataConvert::decimalToString(value, (unsigned)ct.scale, buf, 256, ct.colDataType);
270 (*f)->store(buf, strlen(buf), (*f)->charset());
271 break;
272 }
273
274 case MYSQL_TYPE_TINY: //TINYINT type
275 {
276 Field_tiny* f2 = (Field_tiny*)*f;
277 longlong int_val = (longlong)value;
278 (*f)->store(int_val, f2->unsigned_flag);
279 break;
280 }
281
282 case MYSQL_TYPE_SHORT: //SMALLINT type
283 {
284 Field_short* f2 = (Field_short*)*f;
285 longlong int_val = (longlong)value;
286 (*f)->store(int_val, f2->unsigned_flag);
287 break;
288 }
289
290 case MYSQL_TYPE_INT24: //MEDINT type
291 {
292 Field_medium* f2 = (Field_medium*)*f;
293 longlong int_val = (longlong)value;
294 (*f)->store(int_val, f2->unsigned_flag);
295 break;
296 }
297
298 case MYSQL_TYPE_LONG: //INT type
299 {
300 Field_long* f2 = (Field_long*)*f;
301 longlong int_val = (longlong)value;
302 (*f)->store(int_val, f2->unsigned_flag);
303 break;
304 }
305
306 case MYSQL_TYPE_LONGLONG: //BIGINT type
307 {
308 Field_longlong* f2 = (Field_longlong*)*f;
309 longlong int_val = (longlong)value;
310 (*f)->store(int_val, f2->unsigned_flag);
311 break;
312 }
313
314 case MYSQL_TYPE_FLOAT: // FLOAT type
315 {
316 float float_val = *(float*)(&value);
317 (*f)->store(float_val);
318 break;
319 }
320
321 case MYSQL_TYPE_DOUBLE: // DOUBLE type
322 {
323 double double_val = *(double*)(&value);
324 (*f)->store(double_val);
325 break;
326 }
327
328 case MYSQL_TYPE_VARCHAR:
329 {
330 char tmp[25];
331 if (ct.colDataType == CalpontSystemCatalog::DECIMAL)
332 dataconvert::DataConvert::decimalToString(value, (unsigned)ct.scale, tmp, 25, ct.colDataType);
333 else
334 snprintf(tmp, 25, "%lld", (long long)value);
335
336 (*f)->store(tmp, strlen(tmp), (*f)->charset());
337 break;
338 }
339
340 default:
341 {
342 Field_longlong* f2 = (Field_longlong*)*f;
343 longlong int_val = (longlong)value;
344 (*f)->store(int_val, f2->unsigned_flag);
345 break;
346 }
347 }
348 }
349
350 //
351 // @bug 2244. Log exception related to lost connection to ExeMgr.
352 // Log exception error from calls to sm::tpl_scan_fetch in fetchNextRow()
353 //
tpl_scan_fetch_LogException(cal_table_info & ti,cal_connection_info * ci,std::exception * ex)354 void tpl_scan_fetch_LogException( cal_table_info& ti, cal_connection_info* ci, std::exception* ex)
355 {
356 time_t t = time(0);
357 char datestr[50];
358 ctime_r(&t, datestr);
359 datestr[ strlen(datestr) - 1 ] = '\0'; // strip off trailing newline
360
361 uint32_t sesID = 0;
362 string connHndl("No connection handle to use");
363
364 if (ti.conn_hndl)
365 {
366 connHndl = "ti connection used";
367 sesID = ti.conn_hndl->sessionID;
368 }
369
370 else if (ci->cal_conn_hndl)
371 {
372 connHndl = "ci connection used";
373 sesID = ci->cal_conn_hndl->sessionID;
374 }
375
376 int64_t rowsRet = -1;
377
378 if (ti.tpl_scan_ctx)
379 rowsRet = ti.tpl_scan_ctx->rowsreturned;
380
381 if (ex)
382 cerr << datestr << ": sm::tpl_scan_fetch error getting rows for sessionID: " <<
383 sesID << "; " << connHndl << "; rowsReturned: " << rowsRet <<
384 "; reason-" << ex->what() << endl;
385 else
386 cerr << datestr << ": sm::tpl_scan_fetch unknown error getting rows for sessionID: " <<
387 sesID << "; " << connHndl << "; rowsReturned: " << rowsRet << endl;
388 }
389
// Upper-case hexadecimal digits, indexed by nibble value (0..15).
const char hexdig[] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F', };

// Writes the upper-case hex representation of the l bytes starting at p into
// o: two characters per byte, high nibble first. No terminator is written,
// so o must have room for 2 * l characters. Always returns 0.
int vbin2hex(const uint8_t* p, const unsigned l, char* o)
{
    const uint8_t* const stop = p + l;
    char* dest = o;

    while (p != stop)
    {
        const uint8_t octet = *p;
        ++p;
        dest[0] = hexdig[octet >> 4];
        dest[1] = hexdig[octet & 0xf];
        dest += 2;
    }

    return 0;
}
402
403 // Table Map is used by both cond_push and table mode processing
404 // Entries made by cond_push don't have csep though.
405 // When
onlyOneTableinTM(cal_impl_if::cal_connection_info * ci)406 bool onlyOneTableinTM(cal_impl_if::cal_connection_info* ci)
407 {
408 size_t counter = 0;
409 for (auto &tableMapEntry: ci->tableMap)
410 {
411 if (tableMapEntry.second.csep)
412 counter++;
413 if (counter >= 1)
414 return false;
415 }
416
417 return true;
418 }
419
fetchNextRow(uchar * buf,cal_table_info & ti,cal_connection_info * ci,bool handler_flag=false)420 int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci, bool handler_flag = false)
421 {
422 int rc = HA_ERR_END_OF_FILE;
423 int num_attr = ti.msTablePtr->s->fields;
424 sm::status_t sm_stat;
425
426 try
427 {
428 if (ti.conn_hndl)
429 {
430 sm_stat = sm::tpl_scan_fetch(ti.tpl_scan_ctx, ti.conn_hndl);
431 }
432 else if (ci->cal_conn_hndl)
433 {
434 sm_stat = sm::tpl_scan_fetch(ti.tpl_scan_ctx, ci->cal_conn_hndl, (int*)(¤t_thd->killed));
435 }
436 else
437 throw runtime_error("internal error");
438 }
439 catch (std::exception& ex)
440 {
441 // @bug 2244. Always log this msg for now, as we try to track down when/why we are
442 // losing socket connection with ExeMgr
443 //#ifdef INFINIDB_DEBUG
444 tpl_scan_fetch_LogException( ti, ci, &ex);
445 //#endif
446 sm_stat = sm::CALPONT_INTERNAL_ERROR;
447 }
448 catch (...)
449 {
450 // @bug 2244. Always log this msg for now, as we try to track down when/why we are
451 // losing socket connection with ExeMgr
452 //#ifdef INFINIDB_DEBUG
453 tpl_scan_fetch_LogException( ti, ci, 0 );
454 //#endif
455 sm_stat = sm::CALPONT_INTERNAL_ERROR;
456 }
457
458 if (sm_stat == sm::STATUS_OK)
459 {
460 Field** f;
461 f = ti.msTablePtr->field;
462
463 //set all fields to null in null col bitmap
464 if (!handler_flag)
465 memset(buf, -1, ti.msTablePtr->s->null_bytes);
466 else
467 {
468 memset(ti.msTablePtr->null_flags, -1, ti.msTablePtr->s->null_bytes);
469 }
470
471 std::vector<CalpontSystemCatalog::ColType>& colTypes = ti.tpl_scan_ctx->ctp;
472 int64_t intColVal = 0;
473 uint64_t uintColVal = 0;
474 char tmp[256];
475
476 RowGroup* rowGroup = ti.tpl_scan_ctx->rowGroup;
477
478 // table mode mysql expects all columns of the table. mapping between columnoid and position in rowgroup
479 // set coltype.position to be the position in rowgroup. only set once.
480 if (ti.tpl_scan_ctx->rowsreturned == 0 &&
481 (ti.tpl_scan_ctx->traceFlags & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF))
482 {
483 for (uint32_t i = 0; i < rowGroup->getColumnCount(); i++)
484 {
485 int oid = rowGroup->getOIDs()[i];
486 int j = 0;
487
488 for (; j < num_attr; j++)
489 {
490 // mysql should haved eliminated duplicate projection columns
491 if (oid == colTypes[j].columnOID || oid == colTypes[j].ddn.dictOID)
492 {
493 colTypes[j].colPosition = i;
494 break;
495 }
496 }
497 }
498 }
499
500 rowgroup::Row row;
501 rowGroup->initRow(&row);
502 rowGroup->getRow(ti.tpl_scan_ctx->rowsreturned, &row);
503 int s;
504
505 for (int p = 0; p < num_attr; p++, f++)
506 {
507 //This col is going to be written
508 bitmap_set_bit(ti.msTablePtr->write_set, (*f)->field_index);
509
510 // get coltype if not there yet
511 if (colTypes[0].colWidth == 0)
512 {
513 for (short c = 0; c < num_attr; c++)
514 {
515 colTypes[c].colPosition = c;
516 colTypes[c].colWidth = rowGroup->getColumnWidth(c);
517 colTypes[c].colDataType = rowGroup->getColTypes()[c];
518 colTypes[c].columnOID = rowGroup->getOIDs()[c];
519 colTypes[c].scale = rowGroup->getScale()[c];
520 colTypes[c].precision = rowGroup->getPrecision()[c];
521 }
522 }
523
524 CalpontSystemCatalog::ColType colType(colTypes[p]);
525
526 // table mode handling
527 if (ti.tpl_scan_ctx->traceFlags & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF)
528 {
529 if (colType.colPosition == -1) // not projected by tuplejoblist
530 continue;
531 else
532 s = colType.colPosition;
533 }
534 else
535 {
536 s = p;
537 }
538
539 // precision == -16 is borrowed as skip null check indicator for bit ops.
540 if (row.isNullValue(s) && colType.precision != -16)
541 {
542 // @2835. Handle empty string and null confusion. store empty string for string column
543 if (colType.colDataType == CalpontSystemCatalog::CHAR ||
544 colType.colDataType == CalpontSystemCatalog::VARCHAR ||
545 colType.colDataType == CalpontSystemCatalog::VARBINARY)
546 {
547 (*f)->store(tmp, 0, (*f)->charset());
548 }
549
550 continue;
551 }
552
553 // fetch and store data
554 switch (colType.colDataType)
555 {
556 case CalpontSystemCatalog::DATE:
557 {
558 if ((*f)->null_ptr)
559 *(*f)->null_ptr &= ~(*f)->null_bit;
560
561 intColVal = row.getUintField<4>(s);
562 DataConvert::dateToString(intColVal, tmp, 255);
563 (*f)->store(tmp, strlen(tmp), (*f)->charset());
564 break;
565 }
566
567 case CalpontSystemCatalog::DATETIME:
568 {
569 if ((*f)->null_ptr)
570 *(*f)->null_ptr &= ~(*f)->null_bit;
571
572 intColVal = row.getUintField<8>(s);
573 DataConvert::datetimeToString(intColVal, tmp, 255, colType.precision);
574 (*f)->store(tmp, strlen(tmp), (*f)->charset());
575 break;
576 }
577
578 case CalpontSystemCatalog::TIME:
579 {
580 if ((*f)->null_ptr)
581 *(*f)->null_ptr &= ~(*f)->null_bit;
582
583 intColVal = row.getUintField<8>(s);
584 DataConvert::timeToString(intColVal, tmp, 255, colType.precision);
585 (*f)->store(tmp, strlen(tmp), (*f)->charset());
586 break;
587 }
588
589 case CalpontSystemCatalog::TIMESTAMP:
590 {
591 if ((*f)->null_ptr)
592 *(*f)->null_ptr &= ~(*f)->null_bit;
593
594 intColVal = row.getUintField<8>(s);
595 DataConvert::timestampToString(intColVal, tmp, 255, current_thd->variables.time_zone->get_name()->ptr(), colType.precision);
596 (*f)->store(tmp, strlen(tmp), (*f)->charset());
597 break;
598 }
599
600 case CalpontSystemCatalog::CHAR:
601 case CalpontSystemCatalog::VARCHAR:
602 {
603 switch (colType.colWidth)
604 {
605 case 1:
606 intColVal = row.getUintField<1>(s);
607 (*f)->store((char*)(&intColVal), strlen((char*)(&intColVal)), (*f)->charset());
608 break;
609
610 case 2:
611 intColVal = row.getUintField<2>(s);
612 (*f)->store((char*)(&intColVal), strlen((char*)(&intColVal)), (*f)->charset());
613 break;
614
615 case 4:
616 intColVal = row.getUintField<4>(s);
617 (*f)->store((char*)(&intColVal), strlen((char*)(&intColVal)), (*f)->charset());
618 break;
619
620 case 8:
621 //make sure we don't send strlen off into the weeds...
622 intColVal = row.getUintField<8>(s);
623 memcpy(tmp, &intColVal, 8);
624 tmp[8] = 0;
625 (*f)->store(tmp, strlen(tmp), (*f)->charset());
626 break;
627
628 default:
629 (*f)->store((const char*)row.getStringPointer(s), row.getStringLength(s), (*f)->charset());
630 }
631
632 if ((*f)->null_ptr)
633 *(*f)->null_ptr &= ~(*f)->null_bit;
634
635 break;
636 }
637
638 case CalpontSystemCatalog::VARBINARY:
639 {
640 if (get_varbin_always_hex(current_thd))
641 {
642 uint32_t l;
643 const uint8_t* p = row.getVarBinaryField(l, s);
644 uint32_t ll = l * 2;
645 boost::scoped_array<char> sca(new char[ll]);
646 vbin2hex(p, l, sca.get());
647 (*f)->store(sca.get(), ll, (*f)->charset());
648 }
649 else
650 (*f)->store((const char*)row.getVarBinaryField(s), row.getVarBinaryLength(s), (*f)->charset());
651
652 if ((*f)->null_ptr)
653 *(*f)->null_ptr &= ~(*f)->null_bit;
654
655 break;
656 }
657
658 case CalpontSystemCatalog::BIGINT:
659 {
660 intColVal = row.getIntField<8>(s);
661 storeNumericField(f, intColVal, colType);
662 break;
663 }
664
665 case CalpontSystemCatalog::UBIGINT:
666 {
667 uintColVal = row.getUintField<8>(s);
668 storeNumericField(f, uintColVal, colType);
669 break;
670 }
671
672 case CalpontSystemCatalog::INT:
673 case CalpontSystemCatalog::MEDINT:
674 {
675 intColVal = row.getIntField<4>(s);
676 storeNumericField(f, intColVal, colType);
677 break;
678 }
679
680 case CalpontSystemCatalog::UINT:
681 case CalpontSystemCatalog::UMEDINT:
682 {
683 uintColVal = row.getUintField<4>(s);
684 storeNumericField(f, uintColVal, colType);
685 break;
686 }
687
688 case CalpontSystemCatalog::SMALLINT:
689 {
690 intColVal = row.getIntField<2>(s);
691 storeNumericField(f, intColVal, colType);
692 break;
693 }
694
695 case CalpontSystemCatalog::USMALLINT:
696 {
697 uintColVal = row.getUintField<2>(s);
698 storeNumericField(f, uintColVal, colType);
699 break;
700 }
701
702 case CalpontSystemCatalog::TINYINT:
703 {
704 intColVal = row.getIntField<1>(s);
705 storeNumericField(f, intColVal, colType);
706 break;
707 }
708
709 case CalpontSystemCatalog::UTINYINT:
710 {
711 uintColVal = row.getUintField<1>(s);
712 storeNumericField(f, uintColVal, colType);
713 break;
714 }
715
716 //In this case, we're trying to load a double output column with float data. This is the
717 // case when you do sum(floatcol), e.g.
718 case CalpontSystemCatalog::FLOAT:
719 case CalpontSystemCatalog::UFLOAT:
720 {
721 float dl = row.getFloatField(s);
722
723 if (dl == std::numeric_limits<float>::infinity())
724 continue;
725
726 // bug 3485, reserve enough space for the longest float value
727 // -3.402823466E+38 to -1.175494351E-38, 0, and
728 // 1.175494351E-38 to 3.402823466E+38.
729 (*f)->field_length = 40;
730 (*f)->store(dl);
731
732 if ((*f)->null_ptr)
733 *(*f)->null_ptr &= ~(*f)->null_bit;
734
735 break;
736 }
737
738 case CalpontSystemCatalog::DOUBLE:
739 case CalpontSystemCatalog::UDOUBLE:
740 {
741 double dl = row.getDoubleField(s);
742
743 if (dl == std::numeric_limits<double>::infinity())
744 continue;
745
746 if ((*f)->type() == MYSQL_TYPE_NEWDECIMAL)
747 {
748 char buf[310];
749 // reserve enough space for the longest double value
750 // -1.7976931348623157E+308 to -2.2250738585072014E-308, 0, and
751 // 2.2250738585072014E-308 to 1.7976931348623157E+308.
752 snprintf(buf, 310, "%.18g", dl);
753 (*f)->store(buf, strlen(buf), (*f)->charset());
754 }
755 else
756 {
757 // The server converts dl=-0 to dl=0 in (*f)->store().
758 // This happens in the call to truncate_double().
759 // This is an unexpected behaviour, so we directly store the
760 // double value using the lower level float8store() function.
761 // TODO Remove this when (*f)->store() handles this properly.
762 (*f)->field_length = 310;
763 if (dl == 0)
764 float8store((*f)->ptr,dl);
765 else
766 (*f)->store(dl);
767 }
768 if ((*f)->null_ptr)
769 *(*f)->null_ptr &= ~(*f)->null_bit;
770
771 break;
772 }
773
774 case CalpontSystemCatalog::LONGDOUBLE:
775 {
776 long double dl = row.getLongDoubleField(s);
777 if (dl == std::numeric_limits<long double>::infinity())
778 {
779 continue;
780 }
781
782 if ((*f)->type() == MYSQL_TYPE_NEWDECIMAL)
783 {
784 char buf[310];
785 snprintf(buf, 310, "%.20Lg", dl);
786 (*f)->store(buf, strlen(buf), (*f)->charset());
787 }
788 else
789 {
790 // reserve enough space for the longest double value
791 // -1.7976931348623157E+308 to -2.2250738585072014E-308, 0, and
792 // 2.2250738585072014E-308 to 1.7976931348623157E+308.
793 (*f)->field_length = 310;
794 (*f)->store(static_cast<double>(dl));
795 }
796 if ((*f)->null_ptr)
797 *(*f)->null_ptr &= ~(*f)->null_bit;
798 break;
799 }
800
801 case CalpontSystemCatalog::DECIMAL:
802 case CalpontSystemCatalog::UDECIMAL:
803 {
804 intColVal = row.getIntField(s);
805 storeNumericField(f, intColVal, colType);
806 break;
807 }
808
809 case CalpontSystemCatalog::BLOB:
810 case CalpontSystemCatalog::TEXT:
811 {
812 Field_blob* f2 = (Field_blob*)*f;
813 f2->set_ptr(row.getVarBinaryLength(s), (unsigned char*)row.getVarBinaryField(s));
814
815 if ((*f)->null_ptr)
816 *(*f)->null_ptr &= ~(*f)->null_bit;
817
818 break;
819 }
820
821 default: // treat as int64
822 {
823 intColVal = row.getUintField<8>(s);
824 storeNumericField(f, intColVal, colType);
825 break;
826 }
827 }
828 }
829
830 ti.tpl_scan_ctx->rowsreturned++;
831 ti.c++;
832 #ifdef INFINIDB_DEBUG
833
834 if ((ti.c % 1000000) == 0)
835 cerr << "fetchNextRow so far table " << ti.msTablePtr->s->table_name.str << " rows = " << ti.c << endl;
836
837 #endif
838 ti.moreRows = true;
839 rc = 0;
840 }
841 else if (sm_stat == sm::SQL_NOT_FOUND)
842 {
843 IDEBUG( cerr << "fetchNextRow done for table " << ti.msTablePtr->s->table_name.str << " rows = " << ti.c << endl );
844 ti.c = 0;
845 ti.moreRows = false;
846 rc = HA_ERR_END_OF_FILE;
847 }
848 else if (sm_stat == sm::CALPONT_INTERNAL_ERROR)
849 {
850 ti.moreRows = false;
851 rc = ER_INTERNAL_ERROR;
852 ci->rc = rc;
853 }
854 else if ((uint32_t)sm_stat == logging::ERR_LOST_CONN_EXEMGR)
855 {
856 ti.moreRows = false;
857 rc = logging::ERR_LOST_CONN_EXEMGR;
858 sm::sm_init(tid2sid(current_thd->thread_id), &ci->cal_conn_hndl,
859 get_local_query(current_thd));
860 idbassert(ci->cal_conn_hndl != 0);
861 ci->rc = rc;
862 }
863 else if (sm_stat == sm::SQL_KILLED)
864 {
865 // query was aborted by the user. treat it the same as limit query. close
866 // connection after rnd_close.
867 ti.c = 0;
868 ti.moreRows = false;
869 rc = HA_ERR_END_OF_FILE;
870 ci->rc = rc;
871 }
872 else
873 {
874 ti.moreRows = false;
875 rc = sm_stat;
876 ci->rc = rc;
877 }
878
879 return rc;
880 }
881
makeUpdateScalarJoin(const ParseTree * n,void * obj)882 void makeUpdateScalarJoin(const ParseTree* n, void* obj)
883 {
884 TreeNode* tn = n->data();
885 SimpleFilter* sf = dynamic_cast<SimpleFilter*>(tn);
886
887 if (!sf)
888 return;
889
890 SimpleColumn* scLeft = dynamic_cast<SimpleColumn*>(sf->lhs());
891 SimpleColumn* scRight = dynamic_cast<SimpleColumn*>(sf->rhs());
892 CalpontSystemCatalog::TableAliasName* updatedTables = reinterpret_cast<CalpontSystemCatalog::TableAliasName*>(obj);
893
894 if ( scLeft && scRight )
895 {
896 if ( (strcasecmp(scLeft->tableName().c_str(), updatedTables->table.c_str()) == 0 ) && (strcasecmp(scLeft->schemaName().c_str(), updatedTables->schema.c_str()) == 0)
897 && (strcasecmp(scLeft->tableAlias().c_str(), updatedTables->alias.c_str()) == 0))
898 {
899 uint64_t lJoinInfo = sf->lhs()->joinInfo();
900 lJoinInfo |= JOIN_SCALAR;
901 //lJoinInfo |= JOIN_OUTER_SELECT;
902 //lJoinInfo |= JOIN_CORRELATED;
903 sf->lhs()->joinInfo(lJoinInfo);
904 }
905 else if ( (strcasecmp(scRight->tableName().c_str(), updatedTables->table.c_str()) == 0) && (strcasecmp(scRight->schemaName().c_str(), updatedTables->schema.c_str()) == 0)
906 && (strcasecmp(scRight->tableAlias().c_str(), updatedTables->alias.c_str()) == 0))
907 {
908 uint64_t rJoinInfo = sf->rhs()->joinInfo();
909 rJoinInfo |= JOIN_SCALAR;
910 //rJoinInfo |= JOIN_OUTER_SELECT;
911 //rJoinInfo |= JOIN_CORRELATED;
912 sf->rhs()->joinInfo(rJoinInfo);
913 }
914 else
915 return;
916 }
917 else
918 return;
919 }
920
makeUpdateSemiJoin(const ParseTree * n,void * obj)921 void makeUpdateSemiJoin(const ParseTree* n, void* obj)
922 {
923 TreeNode* tn = n->data();
924 SimpleFilter* sf = dynamic_cast<SimpleFilter*>(tn);
925
926 if (!sf)
927 return;
928
929 SimpleColumn* scLeft = dynamic_cast<SimpleColumn*>(sf->lhs());
930 SimpleColumn* scRight = dynamic_cast<SimpleColumn*>(sf->rhs());
931 CalpontSystemCatalog::TableAliasName* updatedTables = reinterpret_cast<CalpontSystemCatalog::TableAliasName*>(obj);
932
933 //@Bug 3279. Added a check for column filters.
934 if ( scLeft && scRight && (strcasecmp(scRight->tableAlias().c_str(), scLeft->tableAlias().c_str()) != 0))
935 {
936 if ( (strcasecmp(scLeft->tableName().c_str(), updatedTables->table.c_str()) == 0 ) && (strcasecmp(scLeft->schemaName().c_str(), updatedTables->schema.c_str()) == 0)
937 && (strcasecmp(scLeft->tableAlias().c_str(), updatedTables->alias.c_str()) == 0))
938 {
939 uint64_t lJoinInfo = sf->lhs()->joinInfo();
940 lJoinInfo |= JOIN_SEMI;
941 //lJoinInfo |= JOIN_OUTER_SELECT;
942 //lJoinInfo |= JOIN_CORRELATED;
943 sf->lhs()->joinInfo(lJoinInfo);
944 }
945 else if ( (strcasecmp(scRight->tableName().c_str(), updatedTables->table.c_str()) == 0) && (strcasecmp(scRight->schemaName().c_str(), updatedTables->schema.c_str()) == 0)
946 && (strcasecmp(scRight->tableAlias().c_str(), updatedTables->alias.c_str()) == 0))
947 {
948 uint64_t rJoinInfo = sf->rhs()->joinInfo();
949 rJoinInfo |= JOIN_SEMI;
950 //rJoinInfo |= JOIN_OUTER_SELECT;
951 //rJoinInfo |= JOIN_CORRELATED;
952 sf->rhs()->joinInfo(rJoinInfo);
953 }
954 else
955 return;
956 }
957 else
958 return;
959 }
960
getOnUpdateTimestampColumns(string & schema,string & tableName,int sessionID)961 vector<string> getOnUpdateTimestampColumns(string& schema, string& tableName, int sessionID)
962 {
963 vector<string> returnVal;
964 typedef CalpontSelectExecutionPlan::ColumnMap::value_type CMVT_;
965 boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
966 csc->identity(execplan::CalpontSystemCatalog::FE);
967 CalpontSystemCatalog::TableName aTableName;
968
969 // select columnname from calpontsys.syscolumn
970 // where schema = schema and tablename = tableName
971 // and datatype = 'timestamp'
972 // and defaultvalue = 'current_timestamp() ON UPDATE current_timestamp()'
973 CalpontSelectExecutionPlan csep;
974 CalpontSelectExecutionPlan::ReturnedColumnList returnedColumnList;
975 CalpontSelectExecutionPlan::FilterTokenList filterTokenList;
976 CalpontSelectExecutionPlan::ColumnMap colMap;
977
978 SessionManager sm;
979 BRM::TxnID txnID;
980 txnID = sm.getTxnID(sessionID);
981
982 if (!txnID.valid)
983 {
984 txnID.id = 0;
985 txnID.valid = true;
986 }
987
988 QueryContext verID;
989 verID = sm.verID();
990 csep.txnID(txnID.id);
991 csep.verID(verID);
992 csep.sessionID(sessionID);
993
994 string sysTable = "calpontsys.syscolumn.";
995 string firstCol = sysTable + "columnname";
996 SimpleColumn* c1 = new SimpleColumn(firstCol, sessionID);
997 string secondCol = sysTable + "schema";
998 SimpleColumn* c2 = new SimpleColumn(secondCol, sessionID);
999 string thirdCol = sysTable + "tablename";
1000 SimpleColumn* c3 = new SimpleColumn(thirdCol, sessionID);
1001 string fourthCol = sysTable + "datatype";
1002 SimpleColumn* c4 = new SimpleColumn(fourthCol, sessionID);
1003 string fifthCol = sysTable + "defaultvalue";
1004 SimpleColumn* c5 = new SimpleColumn(fifthCol, sessionID);
1005 SRCP srcp;
1006 srcp.reset(c1);
1007 colMap.insert(CMVT_(firstCol, srcp));
1008 srcp.reset(c2);
1009 colMap.insert(CMVT_(secondCol, srcp));
1010 srcp.reset(c3);
1011 colMap.insert(CMVT_(thirdCol, srcp));
1012 srcp.reset(c4);
1013 colMap.insert(CMVT_(fourthCol, srcp));
1014 srcp.reset(c5);
1015 colMap.insert(CMVT_(fifthCol, srcp));
1016 csep.columnMapNonStatic(colMap);
1017 srcp.reset(c1->clone());
1018 returnedColumnList.push_back(srcp);
1019 csep.returnedCols(returnedColumnList);
1020
1021 // Filters
1022 const SOP opeq(new Operator("="));
1023 SimpleFilter* f1 = new SimpleFilter (opeq,
1024 c2->clone(),
1025 new ConstantColumn(schema, ConstantColumn::LITERAL));
1026 filterTokenList.push_back(f1);
1027 filterTokenList.push_back(new Operator("and"));
1028
1029 SimpleFilter* f2 = new SimpleFilter (opeq,
1030 c3->clone(),
1031 new ConstantColumn(tableName, ConstantColumn::LITERAL));
1032 filterTokenList.push_back(f2);
1033 filterTokenList.push_back(new Operator("and"));
1034
1035 SimpleFilter* f3 = new SimpleFilter (opeq,
1036 c4->clone(),
1037 new ConstantColumn((uint64_t) execplan::CalpontSystemCatalog::TIMESTAMP, ConstantColumn::NUM));
1038 filterTokenList.push_back(f3);
1039 filterTokenList.push_back(new Operator("and"));
1040
1041 string defaultValue = "current_timestamp() ON UPDATE current_timestamp()";
1042 SimpleFilter* f4 = new SimpleFilter (opeq,
1043 c5->clone(),
1044 new ConstantColumn(defaultValue, ConstantColumn::LITERAL));
1045 filterTokenList.push_back(f4);
1046 csep.filterTokenList(filterTokenList);
1047
1048 CalpontSelectExecutionPlan::TableList tablelist;
1049 tablelist.push_back(make_aliastable("calpontsys", "syscolumn", ""));
1050 csep.tableList(tablelist);
1051
1052 boost::shared_ptr<messageqcpp::MessageQueueClient> exemgrClient (new messageqcpp::MessageQueueClient("ExeMgr1"));
1053 ByteStream msg, emsgBs;
1054 rowgroup::RGData rgData;
1055 ByteStream::quadbyte qb = 4;
1056 msg << qb;
1057 rowgroup::RowGroup* rowGroup = 0;
1058 uint32_t rowCount;
1059
1060 exemgrClient->write(msg);
1061 ByteStream msgPlan;
1062 csep.serialize(msgPlan);
1063 exemgrClient->write(msgPlan);
1064 msg.restart();
1065 msg = exemgrClient->read(); //error handling
1066 emsgBs = exemgrClient->read();
1067 ByteStream::quadbyte qb1;
1068
1069 if (emsgBs.length() == 0)
1070 {
1071 //exemgrClient->shutdown();
1072 //delete exemgrClient;
1073 //exemgrClient = 0;
1074 throw runtime_error("Lost conection to ExeMgr.");
1075 }
1076
1077 string emsgStr;
1078 emsgBs >> emsgStr;
1079
1080 if (msg.length() == 4)
1081 {
1082 msg >> qb1;
1083
1084 if (qb1 != 0)
1085 {
1086 //exemgrClient->shutdown();
1087 //delete exemgrClient;
1088 //exemgrClient = 0;
1089 throw runtime_error(emsgStr);
1090 }
1091 }
1092
1093 while (true)
1094 {
1095 msg.restart();
1096 msg = exemgrClient->read();
1097
1098 if ( msg.length() == 0 )
1099 {
1100 //exemgrClient->shutdown();
1101 //delete exemgrClient;
1102 //exemgrClient = 0;
1103 throw runtime_error("Lost conection to ExeMgr.");
1104 }
1105 else
1106 {
1107 if (!rowGroup)
1108 {
1109 //This is meta data
1110 rowGroup = new rowgroup::RowGroup();
1111 rowGroup->deserialize(msg);
1112 qb = 100;
1113 msg.restart();
1114 msg << qb;
1115 exemgrClient->write(msg);
1116 continue;
1117 }
1118
1119 rgData.deserialize(msg);
1120 rowGroup->setData(&rgData);
1121
1122 if (rowGroup->getStatus() != 0)
1123 {
1124 //msg.advance(rowGroup->getDataSize());
1125 msg >> emsgStr;
1126 //exemgrClient->shutdown();
1127 //delete exemgrClient;
1128 //exemgrClient = 0;
1129 throw runtime_error(emsgStr);
1130 }
1131
1132 rowCount = rowGroup->getRowCount();
1133 if (rowCount > 0)
1134 {
1135 rowgroup::Row row;
1136 rowGroup->initRow(&row);
1137 for (uint32_t i = 0; i < rowCount; i++)
1138 {
1139 rowGroup->getRow(i, &row);
1140 // we are only fetching a single column
1141 returnVal.push_back(row.getStringField(0));
1142 }
1143 }
1144 else
1145 {
1146 break;
1147 }
1148
1149 //exemgrClient->shutdown();
1150 //delete exemgrClient;
1151 //exemgrClient = 0;
1152 }
1153 }
1154
1155 return returnVal;
1156 }
1157
doUpdateDelete(THD * thd,gp_walk_info & gwi,const std::vector<COND * > & condStack)1158 uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector<COND*>& condStack)
1159 {
1160 if (get_fe_conn_info_ptr() == nullptr)
1161 set_fe_conn_info_ptr((void*)new cal_connection_info());
1162
1163 cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
1164
1165 if (ci->isSlaveNode && !thd->slave_thread)
1166 {
1167 string emsg = logging::IDBErrorInfo::instance()->errorMsg(ERR_DML_DDL_SLAVE);
1168 setError(thd, ER_CHECK_NOT_IMPLEMENTED, emsg);
1169 return ER_CHECK_NOT_IMPLEMENTED;
1170 }
1171
1172 //@Bug 4387. Check BRM status before start statement.
1173 boost::scoped_ptr<DBRM> dbrmp(new DBRM());
1174 int rc = dbrmp->isReadWrite();
1175
1176 if (rc != 0 )
1177 {
1178 setError(current_thd, ER_READ_ONLY_MODE, "Cannot execute the statement. DBRM is read only!");
1179 return ER_READ_ONLY_MODE;
1180 }
1181
1182 // stats start
1183 ci->stats.reset();
1184 ci->stats.setStartTime();
1185 if (thd->main_security_ctx.user)
1186 {
1187 ci->stats.fUser = thd->main_security_ctx.user;
1188 }
1189 else
1190 {
1191 ci->stats.fUser = "";
1192 }
1193
1194 if (thd->main_security_ctx.host)
1195 ci->stats.fHost = thd->main_security_ctx.host;
1196 else if (thd->main_security_ctx.host_or_ip)
1197 ci->stats.fHost = thd->main_security_ctx.host_or_ip;
1198 else
1199 ci->stats.fHost = "unknown";
1200
1201 try
1202 {
1203 ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser);
1204 }
1205 catch (std::exception& e)
1206 {
1207 string msg = string("Columnstore User Priority - ") + e.what();
1208 push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
1209 }
1210
1211 ci->stats.fSessionID = tid2sid(thd->thread_id);
1212
1213 LEX* lex = thd->lex;
1214 idbassert(lex != 0);
1215
1216 // Error out DELETE on VIEW. It's currently not supported.
1217 // @note DELETE on VIEW works naturally (for simple cases at least), but we choose to turn it off
1218 // for now - ZZ.
1219 TABLE_LIST* tables = thd->lex->query_tables;
1220
1221 for (; tables; tables = tables->next_local)
1222 {
1223 if (tables->view)
1224 {
1225 Message::Args args;
1226
1227 if (((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI))
1228 args.add("Update");
1229 #if 0
1230 else if (thd->rgi_slave && thd->rgi_slave->m_table_map.count() != 0)
1231 args.add("Row based replication event");
1232 #endif
1233 else
1234 args.add("Delete");
1235
1236 string emsg = logging::IDBErrorInfo::instance()->errorMsg(ERR_DML_VIEW, args);
1237 setError(current_thd, ER_CHECK_NOT_IMPLEMENTED, emsg);
1238 return ER_CHECK_NOT_IMPLEMENTED;
1239 }
1240
1241 /*
1242 #if (defined(_MSC_VER) && defined(_DEBUG)) || defined(SAFE_MUTEX)
1243 if ((strcmp((*tables->table->s->db_plugin)->name.str, "InfiniDB") != 0) && (strcmp((*tables->table->s->db_plugin)->name.str, "MEMORY") != 0) &&
1244 (tables->table->s->table_category != TABLE_CATEGORY_TEMPORARY) )
1245 #else
1246 if ((strcmp(tables->table->s->db_plugin->name.str, "InfiniDB") != 0) && (strcmp(tables->table->s->db_plugin->name.str, "MEMORY") != 0) &&
1247 (tables->table->s->table_category != TABLE_CATEGORY_TEMPORARY) )
1248 #endif
1249 {
1250 Message::Args args;
1251 args.add("Non Calpont table(s)");
1252 string emsg(IDBErrorInfo::instance()->errorMsg(ERR_DML_NOT_SUPPORT_FEATURE, args));
1253 setError(current_thd, ER_CHECK_NOT_IMPLEMENTED, emsg);
1254 return ER_CHECK_NOT_IMPLEMENTED;
1255 }
1256 */
1257 }
1258
1259 // @bug 1127. Re-construct update stmt using lex instead of using the original query.
1260 char* query_char = idb_mysql_query_str(thd);
1261 std::string dmlStmt;
1262 if (!query_char)
1263 {
1264 dmlStmt = "<Replication event>";
1265 }
1266 else
1267 {
1268 dmlStmt = query_char;
1269 }
1270 string schemaName;
1271 string tableName("");
1272 string aliasName("");
1273 UpdateSqlStatement updateSqlStmt;
1274 ColumnAssignmentList* colAssignmentListPtr = new ColumnAssignmentList();
1275 bool isFromCol = false;
1276 bool isFromSameTable = true;
1277 execplan::SCSEP updateCP(new execplan::CalpontSelectExecutionPlan());
1278
1279 updateCP->isDML(true);
1280
1281 //@Bug 2753. the memory already freed by destructor of UpdateSqlStatement
1282 if (((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI))
1283 {
1284 ColumnAssignment* columnAssignmentPtr;
1285 Item_field* item;
1286 List_iterator_fast<Item> field_it(thd->lex->first_select_lex()->item_list);
1287 List_iterator_fast<Item> value_it(thd->lex->value_list);
1288 updateCP->queryType(CalpontSelectExecutionPlan::UPDATE);
1289 ci->stats.fQueryType = updateCP->queryType();
1290 uint32_t cnt = 0;
1291 tr1::unordered_set<string> timeStampColumnNames;
1292
1293 while ((item = (Item_field*) field_it++))
1294 {
1295 cnt++;
1296
1297 string tmpTableName = bestTableName(item);
1298
1299 //@Bug 5312 populate aliasname with tablename if it is empty
1300 if (!item->table_name.str)
1301 aliasName = tmpTableName;
1302 else
1303 aliasName = item->table_name.str;
1304
1305 if (lower_case_table_names)
1306 {
1307 boost::algorithm::to_lower(aliasName);
1308 boost::algorithm::to_lower(tableName);
1309 boost::algorithm::to_lower(tmpTableName);
1310 }
1311
1312 if (strcasecmp(tableName.c_str(), "") == 0)
1313 {
1314 tableName = tmpTableName;
1315 }
1316 else if (strcmp(tableName.c_str(), tmpTableName.c_str()) != 0)
1317 {
1318 //@ Bug3326 error out for multi table update
1319 string emsg(IDBErrorInfo::instance()->errorMsg(ERR_UPDATE_NOT_SUPPORT_FEATURE));
1320 thd->raise_error_printf(ER_CHECK_NOT_IMPLEMENTED, emsg.c_str());
1321 ci->rc = -1;
1322 thd->set_row_count_func(0);
1323 return -1;
1324 }
1325
1326 if (!item->db_name.str)
1327 {
1328 //@Bug 5312. if subselect, wait until the schema info is available.
1329 if (thd->derived_tables_processing)
1330 return 0;
1331 else
1332 {
1333 thd->raise_error_printf(ER_CHECK_NOT_IMPLEMENTED, "The statement cannot be processed without schema.");
1334 ci->rc = -1;
1335 thd->set_row_count_func(0);
1336 return -1;
1337 }
1338 }
1339 else
1340 {
1341 schemaName = string(item->db_name.str);
1342 if (lower_case_table_names)
1343 {
1344 boost::algorithm::to_lower(schemaName);
1345 }
1346 }
1347 columnAssignmentPtr = new ColumnAssignment(item->name.str, "=", "");
1348 if (item->field_type() == MYSQL_TYPE_TIMESTAMP ||
1349 item->field_type() == MYSQL_TYPE_TIMESTAMP2)
1350 {
1351 timeStampColumnNames.insert(string(item->name.str));
1352 }
1353
1354 Item* value = value_it++;
1355
1356 if (value->type() == Item::CONST_ITEM)
1357 {
1358 if (value->cmp_type() == STRING_RESULT ||
1359 value->cmp_type() == DECIMAL_RESULT ||
1360 value->cmp_type() == REAL_RESULT)
1361 {
1362 //@Bug 2587 use val_str to replace value->name to get rid of 255 limit
1363 String val, *str;
1364 str = value->val_str(&val);
1365 columnAssignmentPtr->fScalarExpression.assign(str->ptr(), str->length());
1366 columnAssignmentPtr->fFromCol = false;
1367 }
1368 else if (value->cmp_type() == INT_RESULT)
1369 {
1370 std::ostringstream oss;
1371
1372 if (value->unsigned_flag)
1373 {
1374 oss << value->val_uint();
1375 }
1376 else
1377 {
1378 oss << value->val_int();
1379 }
1380
1381 columnAssignmentPtr->fScalarExpression = oss.str();
1382 columnAssignmentPtr->fFromCol = false;
1383 }
1384 }
1385 else if ( value->type() == Item::FUNC_ITEM )
1386 {
1387 //Bug 2092 handle negative values
1388 Item_func* ifp = (Item_func*)value;
1389
1390 if (ifp->result_type() == DECIMAL_RESULT)
1391 columnAssignmentPtr->fFuncScale = ifp->decimals; //decimal scale
1392
1393 vector <Item_field*> tmpVec;
1394 bool hasNonSupportItem = false;
1395 uint16_t parseInfo = 0;
1396 parse_item(ifp, tmpVec, hasNonSupportItem, parseInfo);
1397
1398 // const f&e evaluate here. @bug3513. Rule out special functions that takes
1399 // no argument but needs to be sent to the back end to process. Like rand(),
1400 // sysdate() etc.
1401 if (!hasNonSupportItem && !cal_impl_if::nonConstFunc(ifp) && tmpVec.size() == 0)
1402 {
1403 gp_walk_info gwi;
1404 gwi.thd = thd;
1405 SRCP srcp(buildReturnedColumn(value, gwi, gwi.fatalParseError));
1406 ConstantColumn* constCol = dynamic_cast<ConstantColumn*>(srcp.get());
1407
1408 if (constCol )
1409 {
1410 columnAssignmentPtr->fScalarExpression = constCol->constval();
1411 isFromCol = false;
1412 columnAssignmentPtr->fFromCol = false;
1413 }
1414 else
1415 {
1416 isFromCol = true;
1417 columnAssignmentPtr->fFromCol = true;
1418
1419 }
1420 }
1421 else
1422 {
1423 isFromCol = true;
1424 columnAssignmentPtr->fFromCol = true;
1425 }
1426
1427 if ( isFromCol )
1428 {
1429 string sectableName("");
1430 string secschemaName ("");
1431
1432 for ( unsigned i = 0; i < tmpVec.size(); i++ )
1433 {
1434 sectableName = bestTableName(tmpVec[i]);
1435
1436 if ( tmpVec[i]->db_name.str )
1437 {
1438 secschemaName = string(tmpVec[i]->db_name.str);
1439 }
1440
1441 if ( (strcasecmp(tableName.c_str(), sectableName.c_str()) != 0) ||
1442 (strcasecmp(schemaName.c_str(), secschemaName.c_str()) != 0))
1443 {
1444 isFromSameTable = false;
1445 break;
1446 }
1447 }
1448 }
1449 }
1450 else if ( value->type() == Item::FIELD_ITEM)
1451 {
1452 isFromCol = true;
1453 columnAssignmentPtr->fFromCol = true;
1454 Item_field* setIt = reinterpret_cast<Item_field*> (value);
1455 string sectableName = string(setIt->table_name.str);
1456
1457 if ( setIt->db_name.str ) //derived table
1458 {
1459 string secschemaName = string(setIt->db_name.str);
1460
1461 if ( (strcasecmp(tableName.c_str(), sectableName.c_str()) != 0) || (strcasecmp(schemaName.c_str(), secschemaName.c_str()) != 0))
1462 {
1463 isFromSameTable = false;
1464 }
1465 }
1466 else
1467 {
1468 isFromSameTable = false;
1469 }
1470 }
1471 else if ( value->type() == Item::NULL_ITEM )
1472 {
1473 columnAssignmentPtr->fScalarExpression = "";
1474 columnAssignmentPtr->fFromCol = false;
1475 columnAssignmentPtr->fIsNull = true;
1476 }
1477 else if ( value->type() == Item::SUBSELECT_ITEM )
1478 {
1479 isFromCol = true;
1480 columnAssignmentPtr->fFromCol = true;
1481 isFromSameTable = false;
1482 }
1483 //@Bug 4449 handle default value
1484 else if (value->type() == Item::DEFAULT_VALUE_ITEM)
1485 {
1486 Item_field* tmp = (Item_field*)value;
1487
1488 if (!tmp->field_name.length) //null
1489 {
1490 columnAssignmentPtr->fScalarExpression = "NULL";
1491 columnAssignmentPtr->fFromCol = false;
1492 }
1493 else
1494 {
1495 String val, *str;
1496 str = value->val_str(&val);
1497 columnAssignmentPtr->fScalarExpression.assign(str->ptr(), str->length());
1498 columnAssignmentPtr->fFromCol = false;
1499 }
1500 }
1501 else if (value->type() == Item::WINDOW_FUNC_ITEM)
1502 {
1503 setError(thd, ER_INTERNAL_ERROR,
1504 logging::IDBErrorInfo::instance()->errorMsg(ERR_WF_UPDATE));
1505 return ER_CHECK_NOT_IMPLEMENTED;
1506 }
1507 else
1508 {
1509 String val, *str;
1510 str = value->val_str(&val);
1511
1512 if (str)
1513 {
1514 columnAssignmentPtr->fScalarExpression.assign(str->ptr(), str->length());
1515 columnAssignmentPtr->fFromCol = false;
1516 }
1517 else
1518 {
1519 columnAssignmentPtr->fScalarExpression = "NULL";
1520 columnAssignmentPtr->fFromCol = false;
1521 }
1522 }
1523
1524 colAssignmentListPtr->push_back ( columnAssignmentPtr );
1525 }
1526
1527 // Support for on update current_timestamp() for timestamp fields
1528 // Query calpontsys.syscolumn to get all timestamp columns with
1529 // ON UPDATE current_timestamp() property
1530 vector<string> onUpdateTimeStampColumns = getOnUpdateTimestampColumns(schemaName, tableName, tid2sid(thd->thread_id));
1531 for (size_t i = 0; i < onUpdateTimeStampColumns.size(); i++)
1532 {
1533 if (timeStampColumnNames.find(onUpdateTimeStampColumns[i]) == timeStampColumnNames.end())
1534 {
1535 // DRRTUY * That is far from ideal.
1536 columnAssignmentPtr = new ColumnAssignment(string(onUpdateTimeStampColumns[i]), "=", "");
1537 struct timeval tv;
1538 char buf[64];
1539 gettimeofday(&tv, 0);
1540 MySQLTime time;
1541 gmtSecToMySQLTime(tv.tv_sec, time, thd->variables.time_zone->get_name()->ptr());
1542 sprintf(buf, "%04d-%02d-%02d %02d:%02d:%02d.%06ld", time.year, time.month, time.day, time.hour, time.minute, time.second, tv.tv_usec);
1543 columnAssignmentPtr->fScalarExpression = buf;
1544 colAssignmentListPtr->push_back ( columnAssignmentPtr );
1545 }
1546 }
1547 }
1548 #if 0
1549 else if (thd->rgi_slave && thd->rgi_slave->m_table_map.count() != 0)
1550 {
1551 string emsg = logging::IDBErrorInfo::instance()->errorMsg(ERR_RBR_EVENT);
1552 setError(current_thd, ER_CHECK_NOT_IMPLEMENTED, emsg);
1553 return ER_CHECK_NOT_IMPLEMENTED;
1554 }
1555 #endif
1556 else
1557 {
1558 updateCP->queryType(CalpontSelectExecutionPlan::DELETE);
1559 ci->stats.fQueryType = updateCP->queryType();
1560 }
1561
1562 //save table oid for commit/rollback to use
1563 uint32_t sessionID = tid2sid(thd->thread_id);
1564 boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
1565 csc->identity(execplan::CalpontSystemCatalog::FE);
1566 CalpontSystemCatalog::TableName aTableName;
1567 TABLE_LIST* first_table = 0;
1568
1569 if (( (thd->lex)->sql_command == SQLCOM_UPDATE ) || ( (thd->lex)->sql_command == SQLCOM_UPDATE_MULTI ) )
1570 {
1571 aTableName.schema = schemaName;
1572 aTableName.table = tableName;
1573 }
1574 else
1575 {
1576 first_table = (TABLE_LIST*) thd->lex->first_select_lex()->table_list.first;
1577 aTableName.schema = first_table->table->s->db.str;
1578 aTableName.table = first_table->table->s->table_name.str;
1579 }
1580 if (lower_case_table_names)
1581 {
1582 boost::algorithm::to_lower(aTableName.schema);
1583 boost::algorithm::to_lower(aTableName.table);
1584 }
1585
1586 CalpontDMLPackage* pDMLPackage = 0;
1587 // dmlStmt += ";";
1588 IDEBUG( cout << "STMT: " << dmlStmt << " and sessionID " << thd->thread_id << endl );
1589 VendorDMLStatement dmlStatement(dmlStmt, sessionID);
1590
1591 if (( (thd->lex)->sql_command == SQLCOM_UPDATE ) || ( (thd->lex)->sql_command == SQLCOM_UPDATE_MULTI ) )
1592 dmlStatement.set_DMLStatementType( DML_UPDATE );
1593 else
1594 dmlStatement.set_DMLStatementType( DML_DELETE );
1595
1596 TableName* qualifiedTablName = new TableName();
1597
1598
1599 UpdateSqlStatement updateStmt;
1600 //@Bug 2753. To make sure the memory is freed.
1601 updateStmt.fColAssignmentListPtr = colAssignmentListPtr;
1602
1603 if (( (thd->lex)->sql_command == SQLCOM_UPDATE ) || ( (thd->lex)->sql_command == SQLCOM_UPDATE_MULTI ) )
1604 {
1605 qualifiedTablName->fName = tableName;
1606 qualifiedTablName->fSchema = schemaName;
1607 updateStmt.fNamePtr = qualifiedTablName;
1608 pDMLPackage = CalpontDMLFactory::makeCalpontUpdatePackageFromMysqlBuffer( dmlStatement, updateStmt );
1609 }
1610 else if ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) //@Bug 6121 error out on multi tables delete.
1611 {
1612 if ( (thd->lex->first_select_lex()->join) != 0)
1613 {
1614 multi_delete* deleteTable = (multi_delete*)((thd->lex->first_select_lex()->join)->result);
1615 first_table = (TABLE_LIST*) deleteTable->get_tables();
1616
1617 if (deleteTable->get_num_of_tables() == 1)
1618 {
1619 schemaName = first_table->db.str;
1620 tableName = first_table->table_name.str;
1621 aliasName = first_table->alias.str;
1622 if (lower_case_table_names)
1623 {
1624 boost::algorithm::to_lower(schemaName);
1625 boost::algorithm::to_lower(tableName);
1626 boost::algorithm::to_lower(aliasName);
1627 }
1628 qualifiedTablName->fName = tableName;
1629 qualifiedTablName->fSchema = schemaName;
1630 pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(dmlStatement);
1631 }
1632 else
1633 {
1634 string emsg("Deleting rows from multiple tables in a single statement is currently not supported.");
1635 thd->raise_error_printf(ER_INTERNAL_ERROR, emsg.c_str());
1636 ci->rc = 1;
1637 thd->set_row_count_func(0);
1638 return 1;
1639 }
1640 }
1641 else
1642 {
1643 first_table = (TABLE_LIST*) thd->lex->first_select_lex()->table_list.first;
1644 schemaName = first_table->table->s->db.str;
1645 tableName = first_table->table->s->table_name.str;
1646 aliasName = first_table->alias.str;
1647 if (lower_case_table_names)
1648 {
1649 boost::algorithm::to_lower(schemaName);
1650 boost::algorithm::to_lower(tableName);
1651 boost::algorithm::to_lower(aliasName);
1652 }
1653 qualifiedTablName->fName = tableName;
1654 qualifiedTablName->fSchema = schemaName;
1655 pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(dmlStatement);
1656 }
1657 }
1658 else
1659 {
1660 first_table = (TABLE_LIST*) thd->lex->first_select_lex()->table_list.first;
1661 schemaName = first_table->table->s->db.str;
1662 tableName = first_table->table->s->table_name.str;
1663 aliasName = first_table->alias.str;
1664 if (lower_case_table_names)
1665 {
1666 boost::algorithm::to_lower(schemaName);
1667 boost::algorithm::to_lower(tableName);
1668 boost::algorithm::to_lower(aliasName);
1669 }
1670 qualifiedTablName->fName = tableName;
1671 qualifiedTablName->fSchema = schemaName;
1672 pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(dmlStatement);
1673 }
1674
1675 if (!pDMLPackage)
1676 {
1677 string emsg("Fatal parse error in vtable mode in DMLParser ");
1678 setError(thd, ER_INTERNAL_ERROR, emsg);
1679 return ER_INTERNAL_ERROR;
1680 }
1681
1682 pDMLPackage->set_TableName(tableName);
1683
1684 pDMLPackage->set_SchemaName(schemaName);
1685 pDMLPackage->set_TimeZone(thd->variables.time_zone->get_name()->ptr());
1686
1687 pDMLPackage->set_IsFromCol( true );
1688 //cout << " setting isFromCol to " << isFromCol << endl;
1689 std::string origStmt = dmlStmt;
1690 origStmt += ";";
1691 pDMLPackage->set_SQLStatement( origStmt );
1692
1693 //Save the item list
1694 List<Item> items;
1695 SELECT_LEX select_lex;
1696
1697 if (( (thd->lex)->sql_command == SQLCOM_UPDATE ) || ( (thd->lex)->sql_command == SQLCOM_UPDATE_MULTI ) )
1698 {
1699 items = (thd->lex->first_select_lex()->item_list);
1700 thd->lex->first_select_lex()->item_list = thd->lex->value_list;
1701 }
1702
1703 select_lex = *lex->first_select_lex();
1704
1705
1706 //@Bug 2808 Error out on order by or limit clause
1707 //@bug5096. support dml limit.
1708 if (( thd->lex->first_select_lex()->order_list.elements != 0 ) )
1709 {
1710 string emsg("DML Statement with order by clause is not currently supported.");
1711 thd->raise_error_printf(ER_INTERNAL_ERROR, emsg.c_str());
1712 ci->rc = 1;
1713 thd->set_row_count_func(0);
1714 return 1;
1715 }
1716
1717 {
1718 updateCP->subType (CalpontSelectExecutionPlan::SELECT_SUBS);
1719 //@Bug 2975.
1720 SessionManager sm;
1721 BRM::TxnID txnID;
1722 txnID = sm.getTxnID(sessionID);
1723
1724 if (!txnID.valid)
1725 {
1726 txnID.id = 0;
1727 txnID.valid = true;
1728 }
1729
1730 QueryContext verID;
1731 verID = sm.verID();
1732
1733 updateCP->txnID(txnID.id);
1734 updateCP->verID(verID);
1735 updateCP->sessionID(sessionID);
1736 char* query_char = idb_mysql_query_str(thd);
1737 std::string query_str;
1738 if (!query_char)
1739 {
1740 query_str = "<Replication event>";
1741 }
1742 else
1743 {
1744 query_str = query_char;
1745 }
1746 updateCP->data(query_str);
1747
1748 try
1749 {
1750 updateCP->priority( ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser));
1751 }
1752 catch (std::exception& e)
1753 {
1754 string msg = string("Columnstore User Priority - ") + e.what();
1755 push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
1756 }
1757
1758 gwi.clauseType = WHERE;
1759
1760 if (getSelectPlan(gwi, select_lex, updateCP, false, false, condStack) != 0) //@Bug 3030 Modify the error message for unsupported functions
1761 {
1762 if (gwi.cs_vtable_is_update_with_derive)
1763 {
1764 // @bug 4457. MySQL inconsistency! For some queries, some structures are only available
1765 // in the derived_tables_processing phase. So bypass the phase for DML only when the
1766 // execution plan cannot be successfully generated. Recover lex before returning.
1767 thd->lex->first_select_lex()->item_list = items;
1768 return 0;
1769 }
1770
1771 //check different error code
1772 // avoid double set IDB error
1773 string emsg;
1774
1775 if (gwi.parseErrorText.find("IDB-") == string::npos)
1776 {
1777 Message::Args args;
1778 args.add(gwi.parseErrorText);
1779 emsg = IDBErrorInfo::instance()->errorMsg(ER_INTERNAL_ERROR, args);
1780 }
1781 else
1782 {
1783 emsg = gwi.parseErrorText;
1784 }
1785
1786 thd->raise_error_printf(ER_INTERNAL_ERROR, emsg.c_str());
1787 ci->rc = -1;
1788 thd->set_row_count_func(0);
1789 return -1;
1790
1791 }
1792
1793 //cout<< "Plan before is " << endl << *updateCP << endl;
1794 //set the large side by putting the updated table at the first
1795 CalpontSelectExecutionPlan::TableList tbList = updateCP->tableList();
1796
1797 CalpontSelectExecutionPlan::TableList::iterator iter = tbList.begin();
1798 bool notFirst = false;
1799
1800 while ( iter != tbList.end() )
1801 {
1802 if ( ( iter != tbList.begin() ) && (iter->schema == schemaName) && ( iter->table == tableName ) && ( iter->alias == aliasName ) )
1803 {
1804 notFirst = true;
1805 tbList.erase(iter);
1806 break;
1807 }
1808
1809 iter++;
1810 }
1811
1812 if ( notFirst )
1813 {
1814 CalpontSystemCatalog::TableAliasName tn = make_aliastable(schemaName, tableName, aliasName);
1815 iter = tbList.begin();
1816 tbList.insert( iter, 1, tn );
1817 }
1818
1819 updateCP->tableList( tbList );
1820 // DRRTUY * this is a very optimistic assumption
1821 updateCP->overrideLargeSideEstimate( true );
1822 //loop through returnedcols to find out constant columns
1823 CalpontSelectExecutionPlan::ReturnedColumnList returnedCols = updateCP->returnedCols();
1824 CalpontSelectExecutionPlan::ReturnedColumnList::iterator coliter = returnedCols.begin();
1825
1826 while ( coliter != returnedCols.end() )
1827 {
1828 ConstantColumn* returnCol = dynamic_cast<ConstantColumn*>((*coliter).get());
1829
1830 if (returnCol )
1831 {
1832 returnedCols.erase(coliter);
1833 coliter = returnedCols.begin();
1834 }
1835 else
1836 coliter++;
1837 }
1838
1839 if ((updateCP->columnMap()).empty())
1840 throw runtime_error ("column map is empty!");
1841
1842 if (returnedCols.empty())
1843 returnedCols.push_back((updateCP->columnMap()).begin()->second);
1844
1845 //@Bug 6123. get the correct returned columnlist
1846 if (( (thd->lex)->sql_command == SQLCOM_DELETE ) || ( (thd->lex)->sql_command == SQLCOM_DELETE_MULTI ) )
1847 {
1848 returnedCols.clear();
1849 //choose the smallest column to project
1850 CalpontSystemCatalog::TableName deleteTableName;
1851 deleteTableName.schema = schemaName;
1852 deleteTableName.table = tableName;
1853 CalpontSystemCatalog::RIDList colrids;
1854
1855 try
1856 {
1857 colrids = csc->columnRIDs(deleteTableName, false, lower_case_table_names);
1858 }
1859 catch (IDBExcept& ie)
1860 {
1861 thd->raise_error_printf(ER_INTERNAL_ERROR, ie.what());
1862 ci->rc = -1;
1863 thd->set_row_count_func(0);
1864 return -1;
1865 }
1866
1867 int minColWidth = -1;
1868 int minWidthColOffset = 0;
1869
1870 for (unsigned int j = 0; j < colrids.size(); j++)
1871 {
1872 CalpontSystemCatalog::ColType ct = csc->colType(colrids[j].objnum);
1873
1874 if (ct.colDataType == CalpontSystemCatalog::VARBINARY)
1875 continue;
1876
1877 if (minColWidth == -1 || ct.colWidth < minColWidth)
1878 {
1879 minColWidth = ct.colWidth;
1880 minWidthColOffset = j;
1881 }
1882 }
1883
1884 CalpontSystemCatalog::TableColName tcn = csc->colName(colrids[minWidthColOffset].objnum);
1885 SimpleColumn* sc = new SimpleColumn(tcn.schema, tcn.table, tcn.column, csc->sessionID());
1886 sc->tableAlias(aliasName);
1887 sc->timeZone(thd->variables.time_zone->get_name()->ptr());
1888 sc->resultType(csc->colType(colrids[minWidthColOffset].objnum));
1889 SRCP srcp;
1890 srcp.reset(sc);
1891 returnedCols.push_back(srcp);
1892 //cout << "tablename:alias = " << tcn.table<<":"<<aliasName<<endl;
1893 }
1894
1895 updateCP->returnedCols( returnedCols );
1896
1897 if (( (thd->lex)->sql_command == SQLCOM_UPDATE ) || ( (thd->lex)->sql_command == SQLCOM_UPDATE_MULTI ) )
1898 {
1899 const ParseTree* ptsub = updateCP->filters();
1900
1901 if ( !isFromSameTable )
1902 {
1903 //cout << "set scalar" << endl;
1904 //walk tree to set scalar
1905 if (ptsub)
1906 ptsub->walk(makeUpdateScalarJoin, &tbList[0] );
1907 }
1908 else
1909 {
1910 //cout << "set semi" << endl;
1911 if (ptsub)
1912 ptsub->walk(makeUpdateSemiJoin, &tbList[0] );
1913 }
1914
1915 if (( (thd->lex)->sql_command == SQLCOM_UPDATE ) || ( (thd->lex)->sql_command == SQLCOM_UPDATE_MULTI ) )
1916 thd->lex->first_select_lex()->item_list = items;
1917 }
1918
1919 }
1920
1921 //cout<< "Plan is " << endl << *updateCP << endl;
1922 //updateCP->traceFlags(1);
1923 pDMLPackage->HasFilter(true);
1924 pDMLPackage->uuid(updateCP->uuid());
1925
1926 ByteStream bytestream, bytestream1;
1927 bytestream << sessionID;
1928 boost::shared_ptr<messageqcpp::ByteStream> plan = pDMLPackage->get_ExecutionPlan();
1929 updateCP->rmParms(ci->rmParms);
1930 updateCP->serialize(*plan);
1931 pDMLPackage->write(bytestream);
1932
1933 delete pDMLPackage;
1934
1935 ByteStream::byte b = 0;
1936 ByteStream::octbyte rows = 0;
1937 std::string errorMsg;
1938 long long dmlRowCount = 0;
1939
1940 if ( thd->killed > 0 )
1941 {
1942 return 0;
1943 }
1944
1945 //querystats::QueryStats stats;
1946 string tableLockInfo;
1947
1948 // Save the tableOid for the COMMIT | ROLLBACK
1949 CalpontSystemCatalog::ROPair roPair;
1950 try
1951 {
1952 roPair = csc->tableRID(aTableName);
1953 }
1954 catch (IDBExcept& ie)
1955 {
1956 setError(thd, ER_INTERNAL_ERROR, ie.what());
1957 return ER_INTERNAL_ERROR;
1958 }
1959 catch (std::exception& ex)
1960 {
1961 setError(thd, ER_INTERNAL_ERROR,
1962 logging::IDBErrorInfo::instance()->errorMsg(ERR_SYSTEM_CATALOG) + ex.what());
1963 return ER_INTERNAL_ERROR;
1964 }
1965 ci->tableOid = roPair.objnum;
1966
1967 // Send the request to DMLProc and wait for a response.
1968 try
1969 {
1970 timespec* tsp = 0;
1971 #ifndef _MSC_VER
1972 timespec ts;
1973 ts.tv_sec = 3L;
1974 ts.tv_nsec = 0L;
1975 tsp = &ts;
1976 #else
1977 //FIXME: @#$%^&! mysql has buggered up timespec!
1978 // The definition in my_pthread.h isn't the same as in winport/unistd.h...
1979 struct timespec_foo
1980 {
1981 long tv_sec;
1982 long tv_nsec;
1983 } ts_foo;
1984 ts_foo.tv_sec = 3;
1985 ts_foo.tv_nsec = 0;
1986 //This is only to get the compiler to not carp below at the read() call.
1987 // The messagequeue lib uses the correct struct
1988 tsp = reinterpret_cast<timespec*>(&ts_foo);
1989 #endif
1990 bool isTimeOut = true;
1991 int maxRetries = 2;
1992 std::string exMsg;
1993
1994 // We try twice to get a response from dmlproc.
1995 // Every (3) seconds, check for ctrl+c
1996 for (int retry = 0; bytestream1.length() == 0 && retry < maxRetries; ++ retry)
1997 {
1998 try
1999 {
2000 if (!ci->dmlProc)
2001 {
2002 ci->dmlProc = new MessageQueueClient("DMLProc");
2003 //cout << "doUpdateDelete start new DMLProc client " << ci->dmlProc << " for session " << sessionID << endl;
2004 }
2005 else
2006 {
2007 delete ci->dmlProc;
2008 ci->dmlProc = nullptr;
2009 ci->dmlProc = new MessageQueueClient("DMLProc");
2010 }
2011
2012 // Send the request to DMLProc
2013 ci->dmlProc->write(bytestream);
2014
2015 // Get an answer from DMLProc
2016 while (isTimeOut)
2017 {
2018 isTimeOut = false;
2019 bytestream1 = ci->dmlProc->read(tsp, &isTimeOut);
2020
2021 if (b == 0 && thd->killed > 0 && isTimeOut)
2022 {
2023 // We send the CTRL+C command to DMLProc out of band
2024 // (on a different connection) This will cause
2025 // DMLProc to stop processing and return an error on the
2026 // original connection which will cause a rollback.
2027 messageqcpp::MessageQueueClient ctrlCProc("DMLProc");
2028 //cout << "doUpdateDelete start new DMLProc client for ctrl-c " << " for session " << sessionID << endl;
2029 VendorDMLStatement cmdStmt( "CTRL+C", DML_COMMAND, sessionID);
2030 CalpontDMLPackage* pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(cmdStmt);
2031 pDMLPackage->set_TimeZone(thd->variables.time_zone->get_name()->ptr());
2032 ByteStream bytestream;
2033 bytestream << static_cast<uint32_t>(sessionID);
2034 pDMLPackage->write(bytestream);
2035 delete pDMLPackage;
2036 b = 1;
2037 retry = maxRetries;
2038 errorMsg = "Command canceled by user";
2039
2040 try
2041 {
2042 ctrlCProc.write(bytestream);
2043 }
2044 catch (runtime_error&)
2045 {
2046 errorMsg = "Lost connection to DMLProc while doing ctrl+c";
2047 }
2048 catch (...)
2049 {
2050 errorMsg = "Unknown error caught while doing ctrl+c";
2051 }
2052
2053 // break;
2054 }
2055 }
2056 }
2057 catch (runtime_error& ex)
2058 {
2059 // An exception causes a retry, so fall thru
2060 exMsg = ex.what();
2061 }
2062
2063 if (bytestream1.length() == 0 && thd->killed <= 0)
2064 {
2065 //cout << "line 1442. received 0 byte from DMLProc and retry = "<< retry << endl;
2066 // Seems dmlProc isn't playing. Reset it and try again.
2067 delete ci->dmlProc;
2068 ci->dmlProc = nullptr;
2069 isTimeOut = true; //@Bug 4742
2070 }
2071 }
2072
2073 if (bytestream1.length() == 0)
2074 {
2075 // If we didn't get anything, error
2076 b = 1;
2077
2078 if (exMsg.length() > 0)
2079 {
2080 errorMsg = exMsg;
2081 }
2082 else
2083 {
2084 errorMsg = "Lost connection to DMLProc";
2085 }
2086 }
2087 else
2088 {
2089 bytestream1 >> b;
2090 bytestream1 >> rows;
2091 bytestream1 >> errorMsg;
2092
2093 if (b == 0)
2094 {
2095 bytestream1 >> tableLockInfo;
2096 bytestream1 >> ci->queryStats;
2097 bytestream1 >> ci->extendedStats;
2098 bytestream1 >> ci->miniStats;
2099 ci->stats.unserialize(bytestream1);
2100 }
2101 }
2102
2103 dmlRowCount = rows;
2104
2105 if (thd->killed && b == 0)
2106 {
2107 b = dmlpackageprocessor::DMLPackageProcessor::JOB_CANCELED;
2108 errorMsg = "Command canceled by user";
2109 }
2110 }
2111 catch (runtime_error& ex)
2112 {
2113 cout << ex.what() << endl;
2114 b = 1;
2115 delete ci->dmlProc;
2116 ci->dmlProc = nullptr;
2117 errorMsg = ex.what();
2118 }
2119 catch ( ... )
2120 {
2121 //cout << "... exception while writing to DMLProc" << endl;
2122 b = 1;
2123 delete ci->dmlProc;
2124 ci->dmlProc = nullptr;
2125 errorMsg = "Unknown error caught";
2126 }
2127
2128 // If autocommit is on then go ahead and tell the engine to commit the transaction
2129 //@Bug 1960 If error occurs, the commit is just to release the active transaction.
2130 //@Bug 2241. Rollback transaction when it failed
2131 //@Bug 4605. If error, always rollback.
2132 if (b != dmlpackageprocessor::DMLPackageProcessor::ACTIVE_TRANSACTION_ERROR)
2133 {
2134 std::string command;
2135
2136 if ((useHdfs) && (b == 0))
2137 command = "COMMIT";
2138 else if ((useHdfs) && (b != 0))
2139 command = "ROLLBACK";
2140 else if ((b == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING) && thd->is_strict_mode())
2141 command = "ROLLBACK";
2142 else if ((!(current_thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) && (( b == 0 ) || (b == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)) )
2143 command = "COMMIT";
2144 else if (( b != 0 ) && (b != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING) )
2145 command = "ROLLBACK";
2146 else
2147 command = "";
2148
2149 if ( command != "")
2150 {
2151 VendorDMLStatement cmdStmt(command, DML_COMMAND, sessionID);
2152 CalpontDMLPackage* pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(cmdStmt);
2153 pDMLPackage->set_TimeZone(thd->variables.time_zone->get_name()->ptr());
2154 pDMLPackage->setTableOid (ci->tableOid);
2155 ByteStream bytestream;
2156 bytestream << static_cast<uint32_t>(sessionID);
2157 pDMLPackage->write(bytestream);
2158 delete pDMLPackage;
2159
2160 ByteStream::byte bc;
2161 std::string errMsg;
2162
2163 try
2164 {
2165 if (!ci->dmlProc)
2166 {
2167 ci->dmlProc = new MessageQueueClient("DMLProc");
2168 //cout << " doupdateDelete command use a new dml client " << ci->dmlProc << endl;
2169 }
2170
2171 ci->dmlProc->write(bytestream);
2172 bytestream1 = ci->dmlProc->read();
2173 bytestream1 >> bc;
2174 bytestream1 >> rows;
2175 bytestream1 >> errMsg;
2176
2177 if ( b == 0 )
2178 {
2179 b = bc;
2180 errorMsg = errMsg;
2181 }
2182 }
2183 catch (runtime_error&)
2184 {
2185 errorMsg = "Lost connection to DMLProc";
2186 b = 1;
2187 delete ci->dmlProc;
2188 ci->dmlProc = nullptr;
2189 }
2190 catch (...)
2191 {
2192 errorMsg = "Unknown error caught";
2193 b = 1;
2194 }
2195
2196 // Clear tableOid for the next SQL statement
2197 ci->tableOid = 0;
2198 }
2199 }
2200
2201 //@Bug 2241 Display an error message to user
2202
2203 if ( ( b != 0 ) && (b != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING))
2204 {
2205 //@Bug 2540. Set error status instead of warning
2206 thd->raise_error_printf(ER_INTERNAL_ERROR, errorMsg.c_str());
2207 ci->rc = b;
2208 rc = ER_INTERNAL_ERROR;
2209 }
2210
2211 if (b == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
2212 {
2213 if (thd->is_strict_mode())
2214 {
2215 thd->set_row_count_func(0);
2216 ci->rc = b;
2217 // Turn this on as MariaDB doesn't do it until the next phase
2218 thd->abort_on_warning = thd->is_strict_mode();
2219 rc = ER_INTERNAL_ERROR;
2220 }
2221 else
2222 {
2223 ci->affectedRows = dmlRowCount;
2224 }
2225
2226 push_warning(thd, Sql_condition::WARN_LEVEL_WARN, ER_WARN_DATA_OUT_OF_RANGE, errorMsg.c_str());
2227 }
2228 else
2229 {
2230 // if (dmlRowCount != 0) //Bug 5117. Handling self join.
2231 ci->affectedRows = dmlRowCount;
2232 //cout << " error status " << ci->rc << " and rowcount = " << dmlRowCount << endl;
2233 }
2234
2235 // insert query stats
2236 ci->stats.setEndTime();
2237
2238 try
2239 {
2240 ci->stats.insert();
2241 }
2242 catch (std::exception& e)
2243 {
2244 string msg = string("Columnstore Query Stats - ") + e.what();
2245 push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
2246 }
2247
2248 delete ci->dmlProc;
2249 ci->dmlProc = nullptr;
2250 return rc;
2251 }
2252
2253 } //anon namespace
2254
ha_mcs_impl_open(const char * name,int mode,uint32_t test_if_locked)2255 int ha_mcs_impl_open(const char* name, int mode, uint32_t test_if_locked)
2256 {
2257 IDEBUG ( cout << "ha_mcs_impl_open: " << name << ", " << mode << ", " << test_if_locked << endl );
2258 Config::makeConfig();
2259 return 0;
2260 }
2261
ha_mcs_impl_close(void)2262 int ha_mcs_impl_close(void)
2263 {
2264 IDEBUG( cout << "ha_mcs_impl_close" << endl );
2265 return 0;
2266 }
2267
ha_mcs_impl_discover_existence(const char * schema,const char * name)2268 int ha_mcs_impl_discover_existence(const char* schema, const char* name)
2269 {
2270 boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog();
2271
2272 try
2273 {
2274 const CalpontSystemCatalog::OID oid = csc->lookupTableOID(make_table(schema, name, lower_case_table_names));
2275
2276 if (oid)
2277 return 1;
2278 }
2279 catch ( ... )
2280 {
2281 }
2282
2283 return 0;
2284 }
2285
ha_mcs_impl_direct_update_delete_rows(bool execute,ha_rows * affected_rows,const std::vector<COND * > & condStack)2286 int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows, const std::vector<COND*>& condStack)
2287 {
2288 THD* thd = current_thd;
2289 int rc = 0;
2290 cal_impl_if::gp_walk_info gwi;
2291 gwi.thd = thd;
2292
2293 if (thd->slave_thread && !get_replication_slave(thd) && (
2294 thd->lex->sql_command == SQLCOM_INSERT ||
2295 thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
2296 thd->lex->sql_command == SQLCOM_UPDATE ||
2297 thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
2298 thd->lex->sql_command == SQLCOM_DELETE ||
2299 thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
2300 thd->lex->sql_command == SQLCOM_TRUNCATE ||
2301 thd->lex->sql_command == SQLCOM_LOAD))
2302 {
2303 if (affected_rows)
2304 *affected_rows = 0;
2305 return 0;
2306 }
2307
2308 if (execute)
2309 {
2310 rc = doUpdateDelete(thd, gwi, condStack);
2311 }
2312
2313 cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
2314 if (ci)
2315 {
2316 *affected_rows = ci->affectedRows;
2317 }
2318
2319 return rc;
2320 }
2321
/**
 * Prepares a table scan ("table mode") for `table`.
 *
 * Steps, in the order they appear below:
 *  1. Skip DML commands on a slave thread when get_replication_slave() is
 *     false (returns 0).
 *  2. (non-MSVC builds) Refuse to run while DBRM reports the system is not
 *     ready for queries.
 *  3. Store a non-null placeholder in the per-thread handlerton data if none
 *     is set yet.
 *  4. Return 0 for ALTER TABLE statements that do not use ENGINE=.
 *  5. For UPDATE/DELETE (single or multi) on a table that is not read-only,
 *     hand the whole statement to doUpdateDelete() and return its result.
 *  6. Otherwise: (re)create the connection handle for this table, build the
 *     execution plan if the table has none, send the plan to ExeMgr, open the
 *     scan handles and fill in the column types.
 *
 * @param table      server-side table object being scanned
 * @param condStack  condition stack; only forwarded to doUpdateDelete()
 * @return 0 on success or on any of the early exits, the result of
 *         doUpdateDelete() for step 5, ER_INTERNAL_ERROR on failure.
 */
int ha_mcs::impl_rnd_init(TABLE* table, const std::vector<COND*>& condStack)
{
    IDEBUG( cout << "rnd_init for table " << table->s->table_name.str << endl );
    THD* thd = current_thd;

    // gwi is only consumed by doUpdateDelete() further down.
    gp_walk_info gwi;
    gwi.thd = thd;

    // Slave thread with get_replication_slave() false: nothing to do for DML commands.
    if (thd->slave_thread && !get_replication_slave(thd) && (
                thd->lex->sql_command == SQLCOM_INSERT ||
                thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
                thd->lex->sql_command == SQLCOM_UPDATE ||
                thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
                thd->lex->sql_command == SQLCOM_DELETE ||
                thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
                thd->lex->sql_command == SQLCOM_TRUNCATE ||
                thd->lex->sql_command == SQLCOM_LOAD))
        return 0;

    //check whether the system is ready to process statement.
#ifndef _MSC_VER
    // One DBRM instance is kept for the lifetime of the process (function-local static).
    static DBRM dbrm(true);
    int bSystemQueryReady = dbrm.getSystemQueryReady();

    if (bSystemQueryReady == 0)
    {
        // Still not ready
        setError(thd, ER_INTERNAL_ERROR, "The system is not yet ready to accept queries");
        return ER_INTERNAL_ERROR;
    }
    else if (bSystemQueryReady < 0)
    {
        // A negative value is reported to the user as "DBRM is not responding"
        setError(thd, ER_INTERNAL_ERROR, "DBRM is not responding. Cannot accept queries");
        return ER_INTERNAL_ERROR;
    }
#endif

    // Set this to close all outstanding FEP connections on
    // client disconnect in handlerton::closecon_handlerton().
    // 0x42 is only a non-null placeholder value here.
    if ( !thd_get_ha_data(thd, mcs_hton))
    {
        thd_set_ha_data(thd, mcs_hton, reinterpret_cast<void*>(0x42));
    }

    // Disabled code: would reject statements when the slave has row-based table maps.
#if 0
    if (thd->rgi_slave && thd->rgi_slave->m_table_map.count() != 0)
    {
        string emsg = logging::IDBErrorInfo::instance()->errorMsg(ERR_RBR_EVENT);
        setError(current_thd, ER_CHECK_NOT_IMPLEMENTED, emsg);
        return ER_CHECK_NOT_IMPLEMENTED;
    }
#endif

    // If ALTER TABLE and not ENGINE= we don't need rnd_init (gets us in a bad state)
    if ((thd->lex->sql_command == SQLCOM_ALTER_TABLE) && !(thd->lex->create_info.used_fields & HA_CREATE_USED_ENGINE))
    {
        return 0;
    }

    /*
      Update and delete code.
      Note, we may be updating/deleting a different table,
      and the current one is only needed for reading,
      e.g. cstab1 is needed for reading in this example:

      UPDATE innotab1 SET a=100 WHERE a NOT IN (SELECT a FROM cstab1 WHERE a=1);
    */
    if (!isReadOnly() && // make sure the current table is being modified
        (thd->lex->sql_command == SQLCOM_UPDATE ||
         thd->lex->sql_command == SQLCOM_DELETE ||
         thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
         thd->lex->sql_command == SQLCOM_UPDATE_MULTI))
        return doUpdateDelete(thd, gwi, condStack);

    // From here on: plain read scan. Get a session-specific system catalog.
    uint32_t sessionID = tid2sid(thd->thread_id);
    boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
    csc->identity(CalpontSystemCatalog::FE);

    // Lazily create the per-connection state.
    if (get_fe_conn_info_ptr() == nullptr)
        set_fe_conn_info_ptr((void*)new cal_connection_info());

    cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());

    idbassert(ci != 0);

    // Query was killed: close the front-end connection and report success.
    if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
    {
        force_close_fep_conn(thd, ci);
        return 0;
    }

    // All locals used after a `goto error` / `goto internal_error` are declared here.
    sm::tableid_t tableid = 0;
    cal_table_info ti;
    sm::cpsm_conhdl_t* hndl;
    SCSEP csep;

    // update traceFlags according to the autoswitch state.
    ci->traceFlags |= CalpontSelectExecutionPlan::TRACE_TUPLE_OFF;

    bool localQuery = get_local_query(thd);

    // table mode
    {
        // Work on a copy of the per-table info; it is written back to
        // ci->tableMap only after the plan was sent successfully.
        ti = ci->tableMap[table];

        // get connection handle for this table handler
        // re-establish table handle
        if (ti.conn_hndl)
        {
            sm::sm_cleanup(ti.conn_hndl);
            ti.conn_hndl = 0;
        }

        sm::sm_init(sessionID, &ti.conn_hndl, localQuery);
        ti.conn_hndl->csc = csc;
        hndl = ti.conn_hndl;

        try
        {
            ti.conn_hndl->connect();
        }
        catch (...)
        {
            setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_LOST_CONN_EXEMGR));
            CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
            goto error;
        }

        // get filter plan for table
        if (ti.csep.get() == 0)
        {
            ti.csep.reset(new CalpontSelectExecutionPlan());

            SessionManager sm;
            BRM::TxnID txnID;
            txnID = sm.getTxnID(sessionID);

            // No valid transaction for this session: use id 0 and mark it valid.
            if (!txnID.valid)
            {
                txnID.id = 0;
                txnID.valid = true;
            }

            QueryContext verID;
            verID = sm.verID();

            ti.csep->txnID(txnID.id);
            ti.csep->verID(verID);
            ti.csep->sessionID(sessionID);

            if (thd->db.length)
                ti.csep->schemaName(thd->db.str, lower_case_table_names);

            ti.csep->traceFlags(ci->traceFlags);
            ti.msTablePtr = table;

            // send plan whenever rnd_init is called
            cp_get_table_plan(thd, ti.csep, ti);
        }

        IDEBUG( cerr << table->s->table_name.str << " send plan:" << endl );
        IDEBUG( cerr << *ti.csep << endl );
        csep = ti.csep;

        // for ExeMgr logging sqltext. only log once for the query although multi plans may be sent
        // CS adds the ti into TM in the end of rnd_init thus we log the SQL
        // only once when there is no ti with csep.
        if (onlyOneTableinTM(ci))
        {
            ti.csep->data(idb_mysql_query_str(thd));
        }
        else
        {
            ti.csep->data("<part of the query executed in table mode>");
        }
    }

    // Send the plan to ExeMgr and read its status.
    {
        ByteStream msg;
        ByteStream emsgBs;

        // The loop is left by `break` on success, by `return` on a reported
        // error, or by `goto error` when reconnecting fails. Any exception
        // thrown while talking to ExeMgr triggers a reconnect and a retry.
        // NOTE(review): no retry limit is visible here — confirm this is intended.
        while (true)
        {
            try
            {
                // A quadbyte with value 4 is written first, followed by the serialized plan.
                ByteStream::quadbyte qb = 4;
                msg << qb;
                hndl->exeMgr->write(msg);
                msg.restart();
                csep->rmParms(ci->rmParms);

                //send plan
                csep->serialize(msg);
                hndl->exeMgr->write(msg);

                //get ExeMgr status back to indicate a vtable joblist success or not
                msg.restart();
                emsgBs.restart();
                msg = hndl->exeMgr->read();
                emsgBs = hndl->exeMgr->read();
                string emsg;

                // An empty reply on either read is treated as a lost connection.
                // NOTE(review): this return (and the `if (err)` return below) does
                // not go through the `error:` label, unlike the connect failures.
                if (msg.length() == 0 || emsgBs.length() == 0)
                {
                    emsg = "Lost connection to ExeMgr. Please contact your administrator";
                    setError(thd, ER_INTERNAL_ERROR, emsg);
                    return ER_INTERNAL_ERROR;
                }

                string emsgStr;
                emsgBs >> emsgStr;
                bool err = false;

                // A 4-byte status message carries the status code; non-zero means error.
                if (msg.length() == 4)
                {
                    msg >> qb;

                    if (qb != 0)
                    {
                        err = true;
                        // for makejoblist error, stats contains only error code and insert from here
                        // because table fetch is not started
                        ci->stats.setEndTime();
                        ci->stats.fQuery = csep->data();
                        ci->stats.fQueryType = csep->queryType();
                        ci->stats.fErrorNo = qb;

                        try
                        {
                            ci->stats.insert();
                        }
                        catch (std::exception& e)
                        {
                            // Note: this `msg` is a std::string that shadows the outer ByteStream `msg`.
                            string msg = string("Columnstore Query Stats - ") + e.what();
                            push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
                        }
                    }
                }
                else
                {
                    // Any other status length is treated as an error.
                    err = true;
                }

                if (err)
                {
                    setError(thd, ER_INTERNAL_ERROR, emsgStr);
                    return ER_INTERNAL_ERROR;
                }

                ci->rmParms.clear();

                // Publish the updated table info only after the plan was accepted.
                ci->tableMap[table] = ti;

                break;
            }
            catch (...)
            {
                // Drop the current handle, create and connect a fresh one, then retry the send.
                sm::sm_cleanup(hndl);
                hndl = 0;

                sm::sm_init(sessionID, &hndl, localQuery);
                idbassert(hndl != 0);
                hndl->csc = csc;

                ti.conn_hndl = hndl;

                try
                {
                    hndl->connect();
                }
                catch (...)
                {
                    setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_LOST_CONN_EXEMGR));
                    CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
                    goto error;
                }

                msg.restart();
            }
        }
    }

    // common path for both vtable select phase and table mode -- open scan handle
    ti = ci->tableMap[table];
    ti.msTablePtr = table;

    // Create the scan contexts on first use.
    if (ti.tpl_ctx == nullptr)
    {
        ti.tpl_ctx = new sm::cpsm_tplh_t();
        ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t());
    }

    // make sure rowgroup is null so the new meta data can be taken. This is for some case mysql
    // call rnd_init for a table more than once.
    ti.tpl_scan_ctx->rowGroup = nullptr;

    // NOTE(review): the try block only contains an assignment of a constant,
    // so the catch branch looks unreachable — confirm before relying on it.
    try
    {
        tableid = execplan::IDB_VTABLE_ID;
    }
    catch (...)
    {
        string emsg = "No table ID found for table " + string(table->s->table_name.str);
        setError(thd, ER_INTERNAL_ERROR, emsg);
        CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
        goto internal_error;
    }

    // Open the table handle and the scan handle on the connection.
    try
    {
        sm::tpl_open(tableid, ti.tpl_ctx, hndl);
        sm::tpl_scan_open(tableid, ti.tpl_scan_ctx, hndl);
    }
    catch (std::exception& e)
    {
        string emsg = "table can not be opened: " + string(e.what());
        setError(thd, ER_INTERNAL_ERROR, emsg);
        CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
        goto internal_error;
    }
    catch (...)
    {
        string emsg = "table can not be opened";
        setError(thd, ER_INTERNAL_ERROR, emsg);
        CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
        goto internal_error;
    }

    ti.tpl_scan_ctx->traceFlags = ci->traceFlags;

    // Column types are filled in only once per scan context.
    if ((ti.tpl_scan_ctx->ctp).size() == 0)
    {
        uint32_t num_attr = table->s->fields;

        // Pre-size ctp with default-constructed entries, one per server-side field.
        for (uint32_t i = 0; i < num_attr; i++)
        {
            CalpontSystemCatalog::ColType ctype;
            ti.tpl_scan_ctx->ctp.push_back(ctype);
        }

        // populate coltypes here for table mode because tableband gives treeoid for dictionary column
        {
            CalpontSystemCatalog::RIDList oidlist = csc->columnRIDs(make_table(table->s->db.str, table->s->table_name.str, lower_case_table_names), true);

            // The catalog and the server must agree on the number of columns.
            if (oidlist.size() != num_attr)
            {
                string emsg = "Size mismatch probably caused by front end out of sync";
                setError(thd, ER_INTERNAL_ERROR, emsg);
                CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
                goto internal_error;
            }

            // Store each column type at the slot given by its colPosition, then
            // overwrite the stored colPosition with -1.
            // NOTE(review): colPosition is used as an index without a bounds check —
            // presumably always < num_attr; verify.
            for (unsigned int j = 0; j < oidlist.size(); j++)
            {
                CalpontSystemCatalog::ColType ctype = csc->colType(oidlist[j].objnum);
                ti.tpl_scan_ctx->ctp[ctype.colPosition] = ctype;
                ti.tpl_scan_ctx->ctp[ctype.colPosition].colPosition = -1;
            }
        }
    }

    ci->tableMap[table] = ti;
    return 0;

    // Both labels below run the same cleanup of ci->cal_conn_hndl.
error:
    // CS doesn't need to close the actual sockets
    // b/c it tries to reuse it running next query.
    if (ci->cal_conn_hndl)
    {
        sm::sm_cleanup(ci->cal_conn_hndl);
        ci->cal_conn_hndl = 0;
    }

    return ER_INTERNAL_ERROR;

internal_error:

    if (ci->cal_conn_hndl)
    {
        sm::sm_cleanup(ci->cal_conn_hndl);
        ci->cal_conn_hndl = 0;
    }

    return ER_INTERNAL_ERROR;
}
2708
ha_mcs_impl_rnd_next(uchar * buf,TABLE * table)2709 int ha_mcs_impl_rnd_next(uchar* buf, TABLE* table)
2710 {
2711 THD* thd = current_thd;
2712
2713 if (thd->slave_thread && !get_replication_slave(thd) && (
2714 thd->lex->sql_command == SQLCOM_INSERT ||
2715 thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
2716 thd->lex->sql_command == SQLCOM_UPDATE ||
2717 thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
2718 thd->lex->sql_command == SQLCOM_DELETE ||
2719 thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
2720 thd->lex->sql_command == SQLCOM_TRUNCATE ||
2721 thd->lex->sql_command == SQLCOM_LOAD))
2722 return HA_ERR_END_OF_FILE;
2723
2724 if (((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) ||
2725 ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI))
2726 return HA_ERR_END_OF_FILE;
2727
2728 // @bug 2547
2729 // MCOL-2178 This variable can never be true in the scope of this function
2730 // if (MIGR::infinidb_vtable.impossibleWhereOnUnion)
2731 // return HA_ERR_END_OF_FILE;
2732
2733 if (get_fe_conn_info_ptr() == nullptr)
2734 set_fe_conn_info_ptr((void*)new cal_connection_info());
2735
2736 cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
2737
2738 if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
2739 {
2740 force_close_fep_conn(thd, ci);
2741 return 0;
2742 }
2743
2744 if (ci->alterTableState > 0) return HA_ERR_END_OF_FILE;
2745
2746 cal_table_info ti;
2747 ti = ci->tableMap[table];
2748 int rc = HA_ERR_END_OF_FILE;
2749
2750 if (!ti.tpl_ctx || !ti.tpl_scan_ctx)
2751 {
2752 CalpontSystemCatalog::removeCalpontSystemCatalog(tid2sid(thd->thread_id));
2753 return ER_INTERNAL_ERROR;
2754 }
2755
2756 idbassert(ti.msTablePtr == table);
2757
2758 try
2759 {
2760 rc = fetchNextRow(buf, ti, ci);
2761 }
2762 catch (std::exception& e)
2763 {
2764 string emsg = string("Error while fetching from ExeMgr: ") + e.what();
2765 setError(thd, ER_INTERNAL_ERROR, emsg);
2766 CalpontSystemCatalog::removeCalpontSystemCatalog(tid2sid(thd->thread_id));
2767 return ER_INTERNAL_ERROR;
2768 }
2769
2770 ci->tableMap[table] = ti;
2771
2772 if (rc != 0 && rc != HA_ERR_END_OF_FILE)
2773 {
2774 string emsg;
2775
2776 // remove this check when all error handling migrated to the new framework.
2777 if (rc >= 1000)
2778 emsg = ti.tpl_scan_ctx->errMsg;
2779 else
2780 {
2781 logging::ErrorCodes errorcodes;
2782 emsg = errorcodes.errorString(rc);
2783 }
2784
2785 setError(thd, ER_INTERNAL_ERROR, emsg);
2786 //setError(thd, ER_INTERNAL_ERROR, "testing");
2787 ci->stats.fErrorNo = rc;
2788 CalpontSystemCatalog::removeCalpontSystemCatalog(tid2sid(thd->thread_id));
2789 rc = ER_INTERNAL_ERROR;
2790 }
2791
2792 return rc;
2793 }
2794
ha_mcs_impl_rnd_end(TABLE * table,bool is_pushdown_hand)2795 int ha_mcs_impl_rnd_end(TABLE* table, bool is_pushdown_hand)
2796 {
2797 int rc = 0;
2798 THD* thd = current_thd;
2799
2800 if (thd->slave_thread && !get_replication_slave(thd) && (
2801 thd->lex->sql_command == SQLCOM_INSERT ||
2802 thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
2803 thd->lex->sql_command == SQLCOM_UPDATE ||
2804 thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
2805 thd->lex->sql_command == SQLCOM_DELETE ||
2806 thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
2807 thd->lex->sql_command == SQLCOM_TRUNCATE ||
2808 thd->lex->sql_command == SQLCOM_LOAD))
2809 return 0;
2810
2811 if ( (thd->lex)->sql_command == SQLCOM_ALTER_TABLE )
2812 return rc;
2813
2814 if (((thd->lex)->sql_command == SQLCOM_UPDATE) ||
2815 ((thd->lex)->sql_command == SQLCOM_DELETE) ||
2816 ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) ||
2817 ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI))
2818 return rc;
2819
2820 // MCOL-2178 isUnion member only assigned, never used
2821 // if (is_pushdown_hand)
2822 // {
2823 // MIGR::infinidb_vtable.isUnion = false;
2824 // }
2825 cal_connection_info* ci = nullptr;
2826
2827 if (get_fe_conn_info_ptr() != NULL)
2828 ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
2829
2830 if (!ci)
2831 {
2832 set_fe_conn_info_ptr((void*)new cal_connection_info());
2833 ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
2834 }
2835
2836 if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
2837 {
2838 force_close_fep_conn(thd, ci);
2839 // clear querystats because no query stats available for cancelled query
2840 ci->queryStats = "";
2841 return 0;
2842 }
2843
2844 IDEBUG( cerr << "rnd_end for table " << table->s->table_name.str << endl );
2845
2846 cal_table_info ti = ci->tableMap[table];
2847 sm::cpsm_conhdl_t* hndl;
2848
2849 if (!is_pushdown_hand)
2850 hndl = ti.conn_hndl;
2851 else
2852 hndl = ci->cal_conn_hndl;
2853
2854 if (ti.tpl_ctx)
2855 {
2856 if (ti.tpl_scan_ctx.get())
2857 {
2858 try
2859 {
2860 sm::tpl_scan_close(ti.tpl_scan_ctx);
2861 }
2862 catch (...)
2863 {
2864 rc = ER_INTERNAL_ERROR;
2865 }
2866 }
2867
2868 ti.tpl_scan_ctx.reset();
2869
2870 try
2871 {
2872 {
2873 bool ask_4_stats = (ci->traceFlags) ? true : false;
2874 sm::tpl_close(ti.tpl_ctx, &hndl, ci->stats, ask_4_stats);
2875 }
2876
2877 // set conn hndl back. could be changed in tpl_close
2878 if (!is_pushdown_hand)
2879 ti.conn_hndl = hndl;
2880 else
2881 ci->cal_conn_hndl = hndl;
2882
2883 ti.tpl_ctx = 0;
2884 }
2885 catch (IDBExcept& e)
2886 {
2887 if (e.errorCode() == ERR_CROSS_ENGINE_CONNECT || e.errorCode() == ERR_CROSS_ENGINE_CONFIG)
2888 {
2889 string msg = string("Columnstore Query Stats - ") + e.what();
2890 push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
2891 }
2892 else
2893 {
2894 setError(thd, ER_INTERNAL_ERROR, e.what());
2895 rc = ER_INTERNAL_ERROR;
2896 }
2897 }
2898 catch (std::exception& e)
2899 {
2900 setError(thd, ER_INTERNAL_ERROR, e.what());
2901 rc = ER_INTERNAL_ERROR;
2902 }
2903 catch (...)
2904 {
2905 setError(thd, ER_INTERNAL_ERROR, "Internal error throwed in rnd_end");
2906 rc = ER_INTERNAL_ERROR;
2907 }
2908 }
2909
2910 ti.tpl_ctx = 0;
2911
2912 ci->tableMap[table] = ti;
2913
2914 // push warnings from CREATE phase
2915 if (!ci->warningMsg.empty())
2916 push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, ci->warningMsg.c_str());
2917
2918 ci->warningMsg.clear();
2919 // reset expressionId just in case
2920 ci->expressionId = 0;
2921
2922 thd_set_ha_data(thd, mcs_hton, reinterpret_cast<void*>(ci));
2923
2924 return rc;
2925 }
2926
ha_mcs_impl_create(const char * name,TABLE * table_arg,HA_CREATE_INFO * create_info)2927 int ha_mcs_impl_create(const char* name, TABLE* table_arg, HA_CREATE_INFO* create_info)
2928 {
2929 THD* thd = current_thd;
2930
2931 if (get_fe_conn_info_ptr() == nullptr)
2932 set_fe_conn_info_ptr((void*)new cal_connection_info());
2933
2934 cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
2935
2936 //@Bug 1948. Mysql calls create table to create a new table with new signature.
2937 if (ci->alterTableState > 0) return 0;
2938
2939 // Just to be sure
2940 if (!table_arg)
2941 {
2942 setError(thd, ER_INTERNAL_ERROR, "ha_mcs_impl_create_: table_arg is NULL");
2943 return 1;
2944 }
2945
2946 if (!table_arg->s)
2947 {
2948 setError(thd, ER_INTERNAL_ERROR, "ha_mcs_impl_create_: table_arg->s is NULL");
2949 return 1;
2950 }
2951
2952 int rc = ha_mcs_impl_create_(name, table_arg, create_info, *ci);
2953
2954 return rc;
2955 }
2956
ha_mcs_impl_delete_table(const char * name)2957 int ha_mcs_impl_delete_table(const char* name)
2958 {
2959 THD* thd = current_thd;
2960 char* dbName = nullptr;
2961
2962 if (!name)
2963 {
2964 setError(thd, ER_INTERNAL_ERROR, "Drop Table with NULL name not permitted");
2965 return 1;
2966 }
2967
2968 //if this is an InfiniDB tmp table ('#sql*.frm') just leave...
2969 if (!memcmp((uchar*)name, tmp_file_prefix, tmp_file_prefix_length)) return 0;
2970
2971 if (get_fe_conn_info_ptr() == nullptr)
2972 set_fe_conn_info_ptr((void*)new cal_connection_info());
2973
2974 cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
2975
2976 if (!thd) return 0;
2977
2978 if (!thd->lex) return 0;
2979
2980 if (!idb_mysql_query_str(thd)) return 0;
2981
2982 if (thd->lex->sql_command == SQLCOM_DROP_DB)
2983 {
2984 dbName = const_cast<char*>(thd->lex->name.str);
2985 }
2986 else
2987 {
2988 TABLE_LIST* first_table = (TABLE_LIST*) thd->lex->first_select_lex()->table_list.first;
2989 dbName = const_cast<char*>(first_table->db.str);
2990 }
2991
2992 if (!dbName)
2993 {
2994 setError(thd, ER_INTERNAL_ERROR, "Drop Table with NULL schema not permitted");
2995 return 1;
2996 }
2997
2998 if (!ci) return 0;
2999
3000 //@Bug 1948,2306. if alter table want to drop the old table, InfiniDB does not need to drop.
3001 if ( ci->isAlter )
3002 {
3003 ci->isAlter = false;
3004 return 0;
3005 }
3006
3007 int rc = ha_mcs_impl_delete_table_(dbName, name, *ci);
3008 return rc;
3009 }
ha_mcs_impl_write_row(const uchar * buf,TABLE * table,uint64_t rows_changed)3010 int ha_mcs_impl_write_row(const uchar* buf, TABLE* table, uint64_t rows_changed)
3011 {
3012 THD* thd = current_thd;
3013
3014 if (thd->slave_thread && !get_replication_slave(thd))
3015 return 0;
3016
3017 // Error out INSERT on VIEW. It's currently not supported.
3018 // @note INSERT on VIEW works natually (for simple cases at least), but we choose to turn it off
3019 // for now - ZZ.
3020
3021 if (thd->lex->query_tables->view)
3022 {
3023 Message::Args args;
3024 args.add("Insert");
3025 string emsg = logging::IDBErrorInfo::instance()->errorMsg(ERR_DML_VIEW, args);
3026 setError(current_thd, ER_CHECK_NOT_IMPLEMENTED, emsg);
3027 return ER_CHECK_NOT_IMPLEMENTED;
3028 }
3029
3030 if (get_fe_conn_info_ptr() == nullptr)
3031 set_fe_conn_info_ptr((void*)new cal_connection_info());
3032
3033 cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
3034
3035 // At the beginning of insert, make sure there are no
3036 // left-over values from a previously possibly failed insert.
3037 if (rows_changed == 0)
3038 ci->tableValuesMap.clear();
3039
3040 if (ci->alterTableState > 0)
3041 return 0;
3042
3043 ha_rows rowsInserted = 0;
3044 int rc = 0;
3045
3046 // ci->useCpimport = 2 means ALWAYS use cpimport, whether it's in a
3047 // transaction or not. User should use this option very carefully since
3048 // cpimport currently does not support rollbacks
3049 if (((ci->useCpimport == 2) ||
3050 ((ci->useCpimport == 1) && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))))) &&
3051 (!ci->singleInsert) &&
3052 ((ci->isLoaddataInfile) ||
3053 ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) ||
3054 ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || ci->isCacheInsert) )
3055 {
3056 rc = ha_mcs_impl_write_batch_row_(buf, table, *ci);
3057 }
3058 else
3059 {
3060 if ( !ci->dmlProc )
3061 {
3062 ci->dmlProc = new MessageQueueClient("DMLProc");
3063 //cout << "write_row starts a client " << ci->dmlProc << " for session " << thd->thread_id << endl;
3064 }
3065
3066 rc = ha_mcs_impl_write_row_(buf, table, *ci, rowsInserted);
3067
3068 }
3069
3070 //@Bug 2438 Added a variable rowsHaveInserted to keep track of how many rows have been inserted already.
3071 if ( !ci->singleInsert && ( rc == 0 ) && ( rowsInserted > 0 ))
3072 {
3073 ci->rowsHaveInserted += rowsInserted;
3074 }
3075
3076 return rc;
3077 }
3078
ha_mcs_impl_update_row()3079 int ha_mcs_impl_update_row()
3080 {
3081 if (get_fe_conn_info_ptr() == nullptr)
3082 set_fe_conn_info_ptr((void*)new cal_connection_info());
3083
3084 cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
3085 int rc = ci->rc;
3086
3087 if ( rc != 0 )
3088 ci->rc = 0;
3089
3090 return ( rc );
3091 }
3092
ha_mcs_impl_delete_row()3093 int ha_mcs_impl_delete_row()
3094 {
3095 if (get_fe_conn_info_ptr() == nullptr)
3096 set_fe_conn_info_ptr((void*)new cal_connection_info());
3097
3098 cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
3099 int rc = ci->rc;
3100
3101 if ( rc != 0 )
3102 ci->rc = 0;
3103
3104 return ( rc );
3105 }
3106
ha_mcs_impl_start_bulk_insert(ha_rows rows,TABLE * table,bool is_cache_insert)3107 void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_insert)
3108 {
3109 THD* thd = current_thd;
3110
3111 if (thd->slave_thread && !get_replication_slave(thd))
3112 return;
3113
3114 if (get_fe_conn_info_ptr() == nullptr)
3115 set_fe_conn_info_ptr((void*)new cal_connection_info());
3116
3117 cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
3118
3119 // clear rows variable
3120 ci->rowsHaveInserted = 0;
3121
3122 if (ci->alterTableState > 0)
3123 return;
3124
3125 //@bug 5660. Error out DDL/DML on slave node, or on local query node
3126 if (ci->isSlaveNode)
3127 {
3128 string emsg = logging::IDBErrorInfo::instance()->errorMsg(ERR_DML_DDL_SLAVE);
3129 setError(current_thd, ER_CHECK_NOT_IMPLEMENTED, emsg);
3130 return;
3131 }
3132
3133 //@bug 4771. reject REPLACE key word
3134 if ((thd->lex)->sql_command == SQLCOM_REPLACE_SELECT)
3135 {
3136 setError(current_thd, ER_CHECK_NOT_IMPLEMENTED, "REPLACE statement is not supported in Columnstore.");
3137 }
3138
3139 boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(tid2sid(thd->thread_id));
3140 csc->identity(execplan::CalpontSystemCatalog::FE);
3141
3142 //@Bug 2515.
3143 //Check command instead of vtable state
3144 if ((thd->lex)->sql_command == SQLCOM_INSERT)
3145 {
3146 string insertStmt = idb_mysql_query_str(thd);
3147 boost::algorithm::to_lower(insertStmt);
3148 string intoStr("into");
3149 size_t found = insertStmt.find(intoStr);
3150
3151 if (found != string::npos)
3152 insertStmt.erase(found);
3153
3154 found = insertStmt.find("ignore");
3155
3156 if (found != string::npos)
3157 {
3158 setError(current_thd, ER_CHECK_NOT_IMPLEMENTED, "IGNORE option in insert statement is not supported in Columnstore.");
3159 }
3160
3161 if ( rows > 1 )
3162 {
3163 ci->singleInsert = false;
3164 }
3165 }
3166 else if ( (thd->lex)->sql_command == SQLCOM_LOAD || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT)
3167 {
3168 ci->singleInsert = false;
3169 ci->isLoaddataInfile = true;
3170 }
3171
3172 if (is_cache_insert && (thd->lex)->sql_command != SQLCOM_INSERT_SELECT)
3173 {
3174 ci->isCacheInsert = true;
3175
3176 if (rows > 1)
3177 ci->singleInsert = false;
3178 }
3179
3180 ci->bulkInsertRows = rows;
3181
3182 if ((((thd->lex)->sql_command == SQLCOM_INSERT) ||
3183 ((thd->lex)->sql_command == SQLCOM_LOAD) ||
3184 (thd->lex)->sql_command == SQLCOM_INSERT_SELECT ||
3185 ci->isCacheInsert) && !ci->singleInsert )
3186 {
3187 ci->useCpimport = get_use_import_for_batchinsert(thd);
3188
3189 if (((thd->lex)->sql_command == SQLCOM_INSERT) && (rows > 0))
3190 ci->useCpimport = 0;
3191
3192 // For now, disable cpimport for cache inserts
3193 if (ci->isCacheInsert)
3194 ci->useCpimport = 0;
3195
3196 // ci->useCpimport = 2 means ALWAYS use cpimport, whether it's in a
3197 // transaction or not. User should use this option very carefully since
3198 // cpimport currently does not support rollbacks
3199 if ((ci->useCpimport == 2) ||
3200 ((ci->useCpimport == 1) && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))))) //If autocommit on batch insert will use cpimport to load data
3201 {
3202 //store table info to connection info
3203 CalpontSystemCatalog::TableName tableName;
3204 tableName.schema = table->s->db.str;
3205 tableName.table = table->s->table_name.str;
3206 ci->useXbit = false;
3207 CalpontSystemCatalog::RIDList colrids;
3208
3209 try
3210 {
3211 colrids = csc->columnRIDs(tableName, false, lower_case_table_names);
3212 }
3213 catch (IDBExcept& ie)
3214 {
3215 // TODO Can't use ERR_UNKNOWN_TABLE because it needs two
3216 // arguments to format. Update setError to take vararg.
3217 // setError(thd, ER_UNKNOWN_TABLE, ie.what());
3218 setError(thd, ER_INTERNAL_ERROR, ie.what());
3219 ci->rc = 5;
3220 ci->singleInsert = true;
3221 return;
3222 }
3223
3224 ci->useXbit = table->s->db_options_in_use & HA_OPTION_PACK_RECORD;
3225
3226 // TODO: This needs a proper fix.
3227 if (is_cache_insert)
3228 ci->useXbit = false;
3229
3230 //@bug 6122 Check how many columns have not null constraint. columnn with not null constraint will not show up in header.
3231 unsigned int numberNotNull = 0;
3232
3233 for (unsigned int j = 0; j < colrids.size(); j++)
3234 {
3235 CalpontSystemCatalog::ColType ctype = csc->colType(colrids[j].objnum);
3236 ci->columnTypes.push_back(ctype);
3237
3238 if ((( ctype.colDataType == CalpontSystemCatalog::VARCHAR ) || ( ctype.colDataType == CalpontSystemCatalog::VARBINARY )) && !ci->useXbit )
3239 ci->useXbit = true;
3240
3241 if (ctype.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
3242 numberNotNull++;
3243 }
3244
3245 // The length of the record header is:(1 + number of columns + 7) / 8 bytes
3246 if (ci->useXbit)
3247 ci->headerLength = (1 + colrids.size() + 7 - 1 - numberNotNull) / 8; //xbit is used
3248 else
3249 ci->headerLength = (1 + colrids.size() + 7 - numberNotNull) / 8;
3250
3251 //Log the statement to debug.log
3252 {
3253 ostringstream oss;
3254 oss << "Start SQL statement: " << idb_mysql_query_str(thd) << "; |" << table->s->db.str << "|";
3255 log_this(thd, oss.str().c_str(), logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
3256 }
3257
3258 //start process cpimport mode 1
3259 ci->mysqld_pid = getpid();
3260
3261 //get delimiter
3262 if (char(get_import_for_batchinsert_delimiter(thd)) != '\007')
3263 ci->delimiter = char(get_import_for_batchinsert_delimiter(thd));
3264 else
3265 ci->delimiter = '\007';
3266
3267 //get enclosed by
3268 if (char(get_import_for_batchinsert_enclosed_by(thd)) != 8)
3269 ci->enclosed_by = char(get_import_for_batchinsert_enclosed_by(thd));
3270 else
3271 ci->enclosed_by = 8;
3272
3273 //cout << "current set up is usecpimport:delimiter = " << (int)ci->useCpimport<<":"<< ci->delimiter <<endl;
3274 //set up for cpimport
3275 std::vector<char*> Cmds;
3276 std::string aCmdLine;
3277 std::string aTmpDir(startup::StartUp::tmpDir());
3278
3279 //If local module type is not PM and Local PM query is set, error out
3280 char escapechar[2] = "";
3281
3282 if (ci->enclosed_by == 34) // Double quotes
3283 strcat(escapechar, "\\");
3284
3285 if (get_local_query(thd))
3286 {
3287 const auto oamcache = oam::OamCache::makeOamCache();
3288 int localModuleId = oamcache->getLocalPMId();
3289
3290 if (localModuleId == 0)
3291 {
3292 setError(current_thd, ER_INTERNAL_ERROR, logging::IDBErrorInfo::instance()->errorMsg(ERR_LOCAL_QUERY_UM));
3293 ci->singleInsert = true;
3294 log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG,
3295 tid2sid(thd->thread_id));
3296 return;
3297 }
3298 else
3299 {
3300 #ifdef _MSC_VER
3301 aCmdLine = "cpimport.exe -N -P " + to_string(localModuleId) + " -s " + ci->delimiter + " -e 0" + " -E " + escapechar + ci->enclosed_by + " ";
3302 #else
3303 aCmdLine = "cpimport -m 1 -N -P " + boost::to_string(localModuleId) + " -s " + ci->delimiter + " -e 0" + " -T " + thd->variables.time_zone->get_name()->ptr() + " -E " + escapechar + ci->enclosed_by + " ";
3304 #endif
3305 }
3306 }
3307 else
3308 {
3309 #ifdef _MSC_VER
3310 aCmdLine = "cpimport.exe -N -s " + ci->delimiter + " -e 0" + " -E " + escapechar + ci->enclosed_by + " ";
3311 #else
3312 aCmdLine = std::string("cpimport -m 1 -N -s ") + ci->delimiter + " -e 0" + " -T " + thd->variables.time_zone->get_name()->ptr() + " -E " + escapechar + ci->enclosed_by + " ";
3313 #endif
3314 }
3315
3316 aCmdLine = aCmdLine + table->s->db.str + " " + table->s->table_name.str ;
3317
3318 std::istringstream ss(aCmdLine);
3319 std::string arg;
3320 std::vector<std::string> v2(20, "");
3321 unsigned int i = 0;
3322
3323 while (ss >> arg)
3324 {
3325 v2[i++] = arg;
3326 }
3327
3328 for (unsigned int j = 0; j < i; ++j)
3329 {
3330 Cmds.push_back(const_cast<char*>(v2[j].c_str()));
3331 }
3332
3333 Cmds.push_back(0); //null terminate
3334
3335 #ifdef _MSC_VER
3336 BOOL bSuccess = false;
3337 BOOL bInitialized = false;
3338 SECURITY_ATTRIBUTES saAttr;
3339 saAttr.nLength = sizeof(SECURITY_ATTRIBUTES);
3340 saAttr.bInheritHandle = TRUE;
3341 saAttr.lpSecurityDescriptor = nullptr;
3342 HANDLE handleList[2];
3343 const char* pSectionMsg;
3344 bSuccess = true;
3345
3346 // Create a pipe for the child process's STDIN.
3347 if (bSuccess)
3348 {
3349 pSectionMsg = "Create Stdin";
3350 bSuccess = CreatePipe(&ci->cpimport_stdin_Rd, &ci->cpimport_stdin_Wr, &saAttr, 65536);
3351
3352 // Ensure the write handle to the pipe for STDIN is not inherited.
3353 if (bSuccess)
3354 {
3355 pSectionMsg = "SetHandleInformation(stdin)";
3356 bSuccess = SetHandleInformation(ci->cpimport_stdin_Wr, HANDLE_FLAG_INHERIT, 0);
3357 }
3358 }
3359
3360 // Launch cpimport
3361 LPPROC_THREAD_ATTRIBUTE_LIST lpAttributeList = nullptr;
3362 SIZE_T attrSize = 0;
3363 STARTUPINFOEX siStartInfo;
3364
3365 // To ensure the child only inherits the STDIN and STDOUT Handles, we add a list of
3366 // Handles that can be inherited to the call to CreateProcess
3367 if (bSuccess)
3368 {
3369 pSectionMsg = "InitializeProcThreadAttributeList(NULL)";
3370 bSuccess = InitializeProcThreadAttributeList(NULL, 1, 0, &attrSize) ||
3371 GetLastError() == ERROR_INSUFFICIENT_BUFFER; // Asks how much buffer to alloc
3372 }
3373
3374 if (bSuccess)
3375 {
3376 pSectionMsg = "HeapAlloc for AttrList";
3377 lpAttributeList = reinterpret_cast<LPPROC_THREAD_ATTRIBUTE_LIST>
3378 (HeapAlloc(GetProcessHeap(), 0, attrSize));
3379 bSuccess = lpAttributeList != nullptr;
3380 }
3381
3382 if (bSuccess)
3383 {
3384 pSectionMsg = "InitializeProcThreadAttributeList";
3385 bSuccess = InitializeProcThreadAttributeList(lpAttributeList, 1, 0, &attrSize);
3386 }
3387
3388 if (bSuccess)
3389 {
3390 pSectionMsg = "UpdateProcThreadAttribute";
3391 bInitialized = true;
3392 handleList[0] = ci->cpimport_stdin_Rd;
3393 bSuccess = UpdateProcThreadAttribute(lpAttributeList,
3394 0, PROC_THREAD_ATTRIBUTE_HANDLE_LIST,
3395 handleList, sizeof(HANDLE), NULL, NULL);
3396 }
3397
3398 if (bSuccess)
3399 {
3400 pSectionMsg = "CreateProcess";
3401 // In order for GenerateConsoleCtrlEvent (used when job is canceled) to work,
3402 // this process must have a Console, which Services don't have. We create this
3403 // when we create the child process. Once created, we leave it around for next time.
3404 // AllocConsole will silently fail if it already exists, so no pain.
3405 AllocConsole();
3406 // Set up members of the PROCESS_INFORMATION structure.
3407 memset(&ci->cpimportProcInfo, 0, sizeof(PROCESS_INFORMATION));
3408
3409 // Set up members of the STARTUPINFOEX structure.
3410 // This structure specifies the STDIN and STDOUT handles for redirection.
3411 memset(&siStartInfo, 0, sizeof(STARTUPINFOEX));
3412 siStartInfo.StartupInfo.cb = sizeof(STARTUPINFOEX);
3413 siStartInfo.lpAttributeList = lpAttributeList;
3414 siStartInfo.StartupInfo.hStdError = nullptr;
3415 siStartInfo.StartupInfo.hStdOutput = nullptr;
3416 siStartInfo.StartupInfo.hStdInput = ci->cpimport_stdin_Rd;
3417 siStartInfo.StartupInfo.dwFlags |= STARTF_USESTDHANDLES;
3418 // Create the child process.
3419 bSuccess = CreateProcess(NULL, // program. NULL means use command line
3420 const_cast<LPSTR>(aCmdLine.c_str()), // command line
3421 NULL, // process security attributes
3422 NULL, // primary thread security attributes
3423 TRUE, // handles are inherited
3424 EXTENDED_STARTUPINFO_PRESENT | CREATE_NEW_PROCESS_GROUP, // creation flags
3425 NULL, // use parent's environment
3426 NULL, // use parent's current directory
3427 &siStartInfo.StartupInfo, // STARTUPINFO pointer
3428 &ci->cpimportProcInfo); // receives PROCESS_INFORMATION
3429
3430 }
3431
3432 // We need to clean up the memory created by InitializeProcThreadAttributeList
3433 // and HeapAlloc
3434 if (bInitialized)
3435 DeleteProcThreadAttributeList(lpAttributeList);
3436
3437 if (lpAttributeList)
3438 HeapFree(GetProcessHeap(), 0, lpAttributeList);
3439
3440 if (!bSuccess)
3441 {
3442 // If an error occurs, Log and return.
3443 int errnum = GetLastError();
3444 char errmsg[512];
3445 FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, errnum, 0, errmsg, 512, NULL);
3446 ostringstream oss;
3447 oss << " : Error in " << pSectionMsg << " (errno-" <<
3448 errnum << "); " << errmsg;
3449 setError(current_thd, ER_INTERNAL_ERROR, oss.str());
3450 ci->singleInsert = true;
3451 log_this(thd, oss.str(), logging::LOG_TYPE_ERROR, tid2sid(thd->thread_id));
3452 log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
3453 return;
3454 }
3455
3456 // Close the read handle that the child is using. We won't be needing this.
3457 CloseHandle(ci->cpimport_stdin_Rd);
3458 // The write functions all want a FILE*
3459 ci->fdt[1] = _open_osfhandle((intptr_t)ci->cpimport_stdin_Wr, _O_APPEND);
3460 ci->filePtr = _fdopen(ci->fdt[1], "w");
3461 #else
3462 long maxFD = -1;
3463 maxFD = sysconf(_SC_OPEN_MAX);
3464
3465 if (pipe(ci->fdt) == -1)
3466 {
3467 int errnum = errno;
3468 ostringstream oss;
3469 oss << " : Error in creating pipe (errno-" <<
3470 errnum << "); " << strerror(errnum);
3471 setError(current_thd, ER_INTERNAL_ERROR, oss.str());
3472 ci->singleInsert = true;
3473 log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
3474 return;
3475 }
3476
3477 //cout << "maxFD = " << maxFD <<endl;
3478 errno = 0;
3479 pid_t aChPid = fork();
3480
3481 if (aChPid == -1) //an error caused
3482 {
3483 int errnum = errno;
3484 ostringstream oss;
3485 oss << " : Error in forking cpimport.bin (errno-" <<
3486 errnum << "); " << strerror(errnum);
3487 setError(current_thd, ER_INTERNAL_ERROR, oss.str());
3488 ci->singleInsert = true;
3489 log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
3490 return;
3491 }
3492 else if (aChPid == 0) // we are in child
3493 {
3494 for (int i = 0; i < maxFD; i++)
3495 {
3496 if (i != ci->fdt[0])
3497 close(i);
3498 }
3499
3500 errno = 0;
3501
3502 if (dup2(ci->fdt[0], 0) < 0) //make stdin be the reading end of the pipe
3503 {
3504 setError(current_thd, ER_INTERNAL_ERROR, "dup2 failed");
3505 ci->singleInsert = true;
3506 exit (1);
3507 }
3508
3509 close(ci->fdt[0]); // will trigger an EOF on stdin
3510 ci->fdt[0] = -1;
3511 open("/dev/null", O_WRONLY);
3512 open("/dev/null", O_WRONLY);
3513 errno = 0;
3514 execvp(Cmds[0], &Cmds[0]); //NOTE - works with full Path
3515
3516 int execvErrno = errno;
3517
3518 ostringstream oss;
3519 oss << " : execvp error: cpimport.bin invocation failed; "
3520 << "(errno-" << errno << "); " << strerror(execvErrno) <<
3521 "; Check file and try invoking locally.";
3522 cout << oss.str();
3523
3524 setError(current_thd, ER_INTERNAL_ERROR, "Forking process cpimport failed.");
3525 ci->singleInsert = true;
3526 log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG,
3527 tid2sid(thd->thread_id));
3528 exit(1);
3529 }
3530 else // parent
3531 {
3532 ci->filePtr = fdopen(ci->fdt[1], "w");
3533 ci->cpimport_pid = aChPid; // This is the child PID
3534 close(ci->fdt[0]); //close the READER of PARENT
3535 ci->fdt[0] = -1;
3536 // now we can send all the data thru FIFO[1], writer of PARENT
3537 }
3538 // Set read_set used for bulk insertion of Fields inheriting
3539 //from Field_blob|Field_varstring
3540 bitmap_set_all(table->read_set);
3541 #endif
3542 }
3543 else
3544 {
3545 if (!ci->dmlProc)
3546 {
3547 ci->dmlProc = new MessageQueueClient("DMLProc");
3548 }
3549 }
3550 }
3551
3552 //Save table oid for commit to use
3553 if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || ci->isCacheInsert)
3554 {
3555 // query stats. only collect execution time and rows inserted for insert/load_data_infile
3556 ci->stats.reset();
3557 ci->stats.setStartTime();
3558 if (thd->main_security_ctx.user)
3559 {
3560 ci->stats.fUser = thd->main_security_ctx.user;
3561 }
3562 else
3563 {
3564 ci->stats.fUser = "";
3565 }
3566
3567 if (thd->main_security_ctx.host)
3568 ci->stats.fHost = thd->main_security_ctx.host;
3569 else if (thd->main_security_ctx.host_or_ip)
3570 ci->stats.fHost = thd->main_security_ctx.host_or_ip;
3571 else
3572 ci->stats.fHost = "unknown";
3573
3574 ci->stats.fSessionID = thd->thread_id;
3575 ci->stats.fQuery = idb_mysql_query_str(thd);
3576
3577 try
3578 {
3579 ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser);
3580 }
3581 catch (std::exception& e)
3582 {
3583 string msg = string("Columnstore User Priority - ") + e.what();
3584 push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
3585 }
3586
3587 if ((thd->lex)->sql_command == SQLCOM_INSERT)
3588 ci->stats.fQueryType = CalpontSelectExecutionPlan::queryTypeToString(CalpontSelectExecutionPlan::INSERT);
3589 else if ((thd->lex)->sql_command == SQLCOM_LOAD)
3590 ci->stats.fQueryType = CalpontSelectExecutionPlan::queryTypeToString(CalpontSelectExecutionPlan::LOAD_DATA_INFILE);
3591
3592 //@Bug 4387. Check BRM status before start statement.
3593 boost::scoped_ptr<DBRM> dbrmp(new DBRM());
3594 int rc = dbrmp->isReadWrite();
3595
3596 if (rc != 0 )
3597 {
3598 setError(current_thd, ER_READ_ONLY_MODE, "Cannot execute the statement. DBRM is read only!");
3599 ci->rc = rc;
3600 ci->singleInsert = true;
3601 bitmap_clear_all(table->read_set);
3602 return;
3603 }
3604
3605 uint32_t stateFlags;
3606 dbrmp->getSystemState(stateFlags);
3607
3608 if (stateFlags & SessionManagerServer::SS_SUSPENDED)
3609 {
3610 setError(current_thd, ER_INTERNAL_ERROR, "Writing to the database is disabled.");
3611 bitmap_clear_all(table->read_set);
3612 return;
3613 }
3614
3615 CalpontSystemCatalog::TableName tableName;
3616 tableName.schema = table->s->db.str;
3617 tableName.table = table->s->table_name.str;
3618
3619 try
3620 {
3621 CalpontSystemCatalog::ROPair roPair = csc->tableRID(tableName, lower_case_table_names);
3622 ci->tableOid = roPair.objnum;
3623 }
3624 catch (IDBExcept& ie)
3625 {
3626 setError(thd, ER_INTERNAL_ERROR, ie.what());
3627 bitmap_clear_all(table->read_set);
3628 }
3629 catch (std::exception& ex)
3630 {
3631 bitmap_clear_all(table->read_set);
3632 setError(thd, ER_INTERNAL_ERROR,
3633 logging::IDBErrorInfo::instance()->errorMsg(ERR_SYSTEM_CATALOG) + ex.what());
3634 }
3635 }
3636
3637 if ( ci->rc != 0 )
3638 ci->rc = 0;
3639 }
3640
3641
3642
ha_mcs_impl_end_bulk_insert(bool abort,TABLE * table)3643 int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table)
3644 {
3645 // Clear read_set used for bulk insertion of Fields inheriting
3646 //from Field_blob|Field_varstring
3647 bitmap_clear_all(table->read_set);
3648 THD* thd = current_thd;
3649
3650 if (thd->slave_thread && !get_replication_slave(thd))
3651 return 0;
3652
3653 std::string aTmpDir(startup::StartUp::tmpDir());
3654
3655 if (get_fe_conn_info_ptr() == nullptr)
3656 set_fe_conn_info_ptr((void*)new cal_connection_info());
3657
3658 cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
3659
3660 int rc = 0;
3661
3662 if (ci->rc == 5) //read only dbrm
3663 return rc;
3664
3665 // @bug 2378. do not enter for select, reset singleInsert flag after multiple insert.
3666 // @bug 2515. Check command intead of vtable state
3667 if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT || ci->isCacheInsert) && !ci->singleInsert )
3668 {
3669 if (((ci->useCpimport == 2) ||
3670 ((ci->useCpimport == 1) && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))))) &&
3671 (!ci->singleInsert) &&
3672 ((ci->isLoaddataInfile) ||
3673 ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) ||
3674 ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || ci->isCacheInsert) )
3675 {
3676 #ifdef _MSC_VER
3677 if (thd->killed > 0)
3678 {
3679 errno = 0;
3680 // GenerateConsoleCtrlEvent sends a signal to cpimport
3681 BOOL brtn = GenerateConsoleCtrlEvent(CTRL_BREAK_EVENT, ci->cpimportProcInfo.dwProcessId);
3682
3683 if (!brtn)
3684 {
3685 int errnum = GetLastError();
3686 char errmsg[512];
3687 FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, errnum, 0, errmsg, 512, NULL);
3688 ostringstream oss;
3689 oss << "GenerateConsoleCtrlEvent: (errno-" << errnum << "); " << errmsg;
3690 log_this(thd, oss.str(), logging::LOG_TYPE_DEBUG,0);
3691 }
3692
3693 // Close handles to the cpimport process and its primary thread.
3694 fclose (ci->filePtr);
3695 ci->filePtr = 0;
3696 ci->fdt[1] = -1;
3697 CloseHandle(ci->cpimportProcInfo.hProcess);
3698 CloseHandle(ci->cpimportProcInfo.hThread);
3699 WaitForSingleObject(ci->cpimportProcInfo.hProcess, INFINITE);
3700 }
3701
3702 #else
3703
3704 if ( (thd->killed > 0) && (ci->cpimport_pid > 0) ) //handle CTRL-C
3705 {
3706 //cout << "sending ctrl-c to cpimport" << endl;
3707 errno = 0;
3708 kill( ci->cpimport_pid, SIGUSR1 );
3709 fclose (ci->filePtr);
3710 ci->filePtr = 0;
3711 ci->fdt[1] = -1;
3712 int aStatus;
3713 waitpid(ci->cpimport_pid, &aStatus, 0); // wait until cpimport finishs
3714 }
3715
3716 #endif
3717 else
3718 {
3719 //tear down cpimport
3720 #ifdef _MSC_VER
3721 fclose (ci->filePtr);
3722 ci->filePtr = 0;
3723 ci->fdt[1] = -1;
3724 DWORD exitCode;
3725 WaitForSingleObject(ci->cpimportProcInfo.hProcess, INFINITE);
3726 GetExitCodeProcess(ci->cpimportProcInfo.hProcess, &exitCode);
3727
3728 if (exitCode != 0)
3729 {
3730 rc = 1;
3731 setError(thd, ER_INTERNAL_ERROR, "load failed. The detailed error information is listed in InfiniDBLog.txt.");
3732 }
3733
3734 // Close handles to the cpimport process and its primary thread.
3735 CloseHandle(ci->cpimportProcInfo.hProcess);
3736 CloseHandle(ci->cpimportProcInfo.hThread);
3737 #else
3738 fclose (ci->filePtr);
3739 ci->filePtr = 0;
3740 ci->fdt[1] = -1;
3741 int aStatus;
3742 pid_t aPid = waitpid(ci->cpimport_pid, &aStatus, 0); // wait until cpimport finishs
3743
3744 if ((aPid == ci->cpimport_pid) || (aPid == -1))
3745 {
3746 ci->cpimport_pid = 0;
3747
3748 if ((WIFEXITED(aStatus)) && (WEXITSTATUS(aStatus) == 0))
3749 {
3750 //cout << "\tCpimport exit on success" << endl;
3751 }
3752 else
3753 {
3754 if (WEXITSTATUS(aStatus) == 2)
3755 {
3756 rc = 1;
3757 ifstream dmlFile;
3758 ostringstream oss;
3759 oss << aTmpDir << ci->tableOid << ".txt";
3760 dmlFile.open(oss.str().c_str());
3761
3762 if (dmlFile.is_open())
3763 {
3764 string line;
3765 getline(dmlFile, line);
3766 setError(thd, ER_INTERNAL_ERROR, line);
3767 dmlFile.close();
3768 remove (oss.str().c_str());
3769 }
3770 }
3771 else
3772 {
3773 rc = 1;
3774 ifstream dmlFile;
3775 ostringstream oss;
3776 oss << aTmpDir << ci->tableOid << ".txt";
3777 dmlFile.open(oss.str().c_str());
3778
3779 if (dmlFile.is_open())
3780 {
3781 string line;
3782 getline(dmlFile, line);
3783 setError(thd, ER_INTERNAL_ERROR, line);
3784 dmlFile.close();
3785 remove (oss.str().c_str());
3786 }
3787 else
3788 setError(thd, ER_INTERNAL_ERROR, "load failed. The detailed error information is listed in err.log.");
3789 }
3790 }
3791 }
3792
3793 #endif
3794 if ( rc == 0)
3795 {
3796 log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
3797 }
3798 else
3799 {
3800 log_this(thd, "End SQL statement with error", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
3801 }
3802
3803 ci->columnTypes.clear();
3804 //get extra warning count if any
3805 ifstream dmlFile;
3806 ostringstream oss;
3807 oss << aTmpDir << ci->tableOid << ".txt";
3808 dmlFile.open(oss.str().c_str());
3809 int totalWarnCount = 0;
3810 int colWarns = 0;
3811 string line;
3812
3813 if (dmlFile.is_open())
3814 {
3815 while (getline(dmlFile, line))
3816 {
3817 colWarns = atoi(line.c_str());
3818 totalWarnCount += colWarns;
3819 }
3820
3821 dmlFile.close();
3822 remove (oss.str().c_str());
3823
3824 for (int i = 0; i < totalWarnCount; i++)
3825 push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, "Values saturated");
3826 }
3827 }
3828 }
3829 else
3830 {
3831 if ( thd->killed > 0 )
3832 abort = true;
3833
3834 if ( !ci->dmlProc )
3835 {
3836 ci->dmlProc = new MessageQueueClient("DMLProc");
3837 //cout << "end_bulk_insert starts a client " << ci->dmlProc << " for session " << thd->thread_id << endl;
3838 }
3839
3840 if (((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || ((thd->lex)->sql_command == SQLCOM_LOAD))
3841 rc = ha_mcs_impl_write_last_batch(table, *ci, abort);
3842 }
3843 }
3844
3845 // populate query stats for insert and load data infile. insert select has
3846 // stats entered in sm already
3847 if (((thd->lex)->sql_command == SQLCOM_INSERT) ||
3848 ((thd->lex)->sql_command == SQLCOM_LOAD) ||
3849 ci->isCacheInsert)
3850 {
3851 ci->stats.setEndTime();
3852 ci->stats.fErrorNo = rc;
3853
3854 if (ci->singleInsert)
3855 ci->stats.fRows = 1;
3856 else
3857 ci->stats.fRows = ci->rowsHaveInserted;
3858
3859 try
3860 {
3861 ci->stats.insert();
3862 }
3863 catch (std::exception& e)
3864 {
3865 string msg = string("Columnstore Query Stats - ") + e.what();
3866 push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
3867 }
3868 }
3869
3870 // MCOL-4002 We earlier had these re-initializations set only for
3871 // non-transactions, i.e.:
3872 // !(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
3873 // However, we should be resetting these members anyways.
3874 ci->singleInsert = true; // reset the flag
3875 ci->isLoaddataInfile = false;
3876 ci->isCacheInsert = false;
3877 ci->tableOid = 0;
3878 ci->rowsHaveInserted = 0;
3879 ci->useCpimport = 1;
3880
3881 return rc;
3882 }
3883
ha_mcs_impl_commit(handlerton * hton,THD * thd,bool all)3884 int ha_mcs_impl_commit (handlerton* hton, THD* thd, bool all)
3885 {
3886 if (get_fe_conn_info_ptr() == nullptr)
3887 set_fe_conn_info_ptr((void*)new cal_connection_info());
3888
3889 cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
3890
3891 if (ci->isAlter)
3892 return 0;
3893
3894 //@Bug 5823 check if any active transaction for this session
3895 boost::scoped_ptr<DBRM> dbrmp(new DBRM());
3896 BRM::TxnID txnId = dbrmp->getTxnID(tid2sid(thd->thread_id));
3897
3898 if (!txnId.valid)
3899 return 0;
3900
3901 if ( !ci->dmlProc )
3902 {
3903 ci->dmlProc = new MessageQueueClient("DMLProc");
3904 //cout << "commit starts a client " << ci->dmlProc << " for session " << thd->thread_id << endl;
3905 }
3906
3907 int rc = ha_mcs_impl_commit_(hton, thd, all, *ci);
3908 thd->server_status &= ~SERVER_STATUS_IN_TRANS;
3909 ci->singleInsert = true; // reset the flag
3910 ci->isLoaddataInfile = false;
3911 ci->isCacheInsert = false;
3912 ci->tableOid = 0;
3913 ci->rowsHaveInserted = 0;
3914 return rc;
3915 }
3916
ha_mcs_impl_rollback(handlerton * hton,THD * thd,bool all)3917 int ha_mcs_impl_rollback (handlerton* hton, THD* thd, bool all)
3918 {
3919 if (get_fe_conn_info_ptr() == nullptr)
3920 set_fe_conn_info_ptr((void*)new cal_connection_info());
3921
3922 cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
3923
3924 if ( !ci->dmlProc )
3925 {
3926
3927 ci->dmlProc = new MessageQueueClient("DMLProc");
3928 }
3929
3930 int rc = ha_mcs_impl_rollback_(hton, thd, all, *ci);
3931 ci->singleInsert = true; // reset the flag
3932 ci->isLoaddataInfile = false;
3933 ci->isCacheInsert = false;
3934 ci->tableOid = 0;
3935 ci->rowsHaveInserted = 0;
3936 thd->server_status &= ~SERVER_STATUS_IN_TRANS;
3937 return rc;
3938 }
3939
ha_mcs_impl_close_connection(handlerton * hton,THD * thd)3940 int ha_mcs_impl_close_connection (handlerton* hton, THD* thd)
3941 {
3942 if (!thd) return 0;
3943
3944 if (thd->thread_id == 0)
3945 return 0;
3946
3947 execplan::CalpontSystemCatalog::removeCalpontSystemCatalog(tid2sid(thd->thread_id));
3948
3949 // MCOL-3247 Use THD::ha_data as a per-plugin per-session
3950 // storage. Filled in external_lock when we remove a lock
3951 // from vtable(lock_type = 2)
3952 // An ugly way. I will use ha_data w/o external_lock.
3953 // This in MCOL-2178
3954 cal_connection_info* ci = nullptr;
3955 if(thd_get_ha_data(thd, hton) != (void*)0x42) // 0x42 is the magic CS sets when setup hton
3956 {
3957 ci = reinterpret_cast<cal_connection_info*>(thd_get_ha_data(thd, hton));
3958 }
3959
3960 if (!ci) return 0;
3961
3962 int rc = 0;
3963
3964 if ( ci->dmlProc )
3965 {
3966 rc = ha_mcs_impl_close_connection_(hton, thd, *ci);
3967 delete ci->dmlProc;
3968 ci->dmlProc = nullptr;
3969 }
3970
3971 if (ci->cal_conn_hndl)
3972 {
3973 sm::sm_cleanup(ci->cal_conn_hndl);
3974 ci->cal_conn_hndl = 0;
3975 }
3976
3977 return rc;
3978 }
3979
ha_mcs_impl_rename_table(const char * from,const char * to)3980 int ha_mcs_impl_rename_table(const char* from, const char* to)
3981 {
3982 IDEBUG( cout << "ha_mcs_impl_rename_table: " << from << " => " << to << endl );
3983
3984 if (get_fe_conn_info_ptr() == nullptr)
3985 set_fe_conn_info_ptr((void*)new cal_connection_info());
3986
3987 cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
3988
3989 //@Bug 1948. Alter table call rename table twice
3990 if ( ci->alterTableState == cal_connection_info::ALTER_FIRST_RENAME )
3991 {
3992 ci->alterTableState = cal_connection_info::ALTER_SECOND_RENAME;
3993 IDEBUG( cout << "ha_mcs_impl_rename_table: was in state ALTER_FIRST_RENAME, now in ALTER_SECOND_RENAME" << endl );
3994 return 0;
3995 }
3996 else if (ci->alterTableState == cal_connection_info::ALTER_SECOND_RENAME)
3997 {
3998 ci->alterTableState = cal_connection_info::NOT_ALTER;
3999 IDEBUG( cout << "ha_mcs_impl_rename_table: was in state ALTER_SECOND_RENAME, now in NOT_ALTER" << endl );
4000 return 0;
4001 }
4002
4003 int rc = ha_mcs_impl_rename_table_(from, to, *ci);
4004 return rc;
4005 }
4006
ha_mcs_impl_delete_row(const uchar * buf)4007 int ha_mcs_impl_delete_row(const uchar* buf)
4008 {
4009 IDEBUG( cout << "ha_mcs_impl_delete_row" << endl );
4010 return 0;
4011 }
4012
ha_mcs_impl_cond_push(COND * cond,TABLE * table,std::vector<COND * > & condStack)4013 COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table, std::vector<COND*>& condStack)
4014 {
4015 THD* thd = current_thd;
4016
4017 if (((thd->lex)->sql_command == SQLCOM_UPDATE) ||
4018 ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI) ||
4019 ((thd->lex)->sql_command == SQLCOM_DELETE) ||
4020 ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI))
4021 {
4022 condStack.push_back(cond);
4023 return nullptr;
4024 }
4025
4026 string alias;
4027 alias.assign(table->alias.ptr(), table->alias.length());
4028 IDEBUG( cout << "ha_mcs_impl_cond_push: " << alias << endl );
4029
4030 if (get_fe_conn_info_ptr() == nullptr)
4031 set_fe_conn_info_ptr((void*)new cal_connection_info());
4032
4033 cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
4034
4035 cal_table_info ti = ci->tableMap[table];
4036
4037 #ifdef DEBUG_WALK_COND
4038 {
4039 gp_walk_info gwi;
4040 gwi.condPush = true;
4041 gwi.sessionid = tid2sid(thd->thread_id);
4042 cout << "------------------ cond push -----------------------" << endl;
4043 cond->traverse_cond(debug_walk, &gwi, Item::POSTFIX);
4044 cout << "------------------------------------------------\n" << endl;
4045 }
4046 #endif
4047
4048 if (!ti.csep)
4049 {
4050 if (!ti.condInfo)
4051 ti.condInfo = new gp_walk_info();
4052
4053 gp_walk_info* gwi = ti.condInfo;
4054 gwi->dropCond = false;
4055 gwi->fatalParseError = false;
4056 gwi->condPush = true;
4057 gwi->thd = thd;
4058 gwi->sessionid = tid2sid(thd->thread_id);
4059 cond->traverse_cond(gp_walk, gwi, Item::POSTFIX);
4060 ci->tableMap[table] = ti;
4061
4062 if (gwi->fatalParseError)
4063 {
4064 IDEBUG( cout << gwi->parseErrorText << endl );
4065
4066 if (ti.condInfo)
4067 {
4068 delete ti.condInfo;
4069 ti.condInfo = nullptr;
4070 ci->tableMap[table] = ti;
4071 }
4072
4073 return cond;
4074 }
4075
4076 if (gwi->dropCond)
4077 {
4078 return cond;
4079 }
4080 else
4081 {
4082 return nullptr;
4083 }
4084 }
4085
4086 return cond;
4087 }
4088
impl_external_lock(THD * thd,TABLE * table,int lock_type)4089 int ha_mcs::impl_external_lock(THD* thd, TABLE* table, int lock_type)
4090 {
4091 // @bug 3014. Error out locking table command. IDB does not support it now.
4092 if (thd->lex->sql_command == SQLCOM_LOCK_TABLES)
4093 {
4094 setError(current_thd, ER_CHECK_NOT_IMPLEMENTED,
4095 logging::IDBErrorInfo::instance()->errorMsg(ERR_LOCK_TABLE));
4096 return ER_CHECK_NOT_IMPLEMENTED;
4097 }
4098
4099 // @info called for every table at the beginning and at the end of a query.
4100 // used for cleaning up the tableinfo.
4101 string alias;
4102 alias.assign(table->alias.ptr(), table->alias.length());
4103 IDEBUG( cout << "external_lock for " << alias << endl );
4104
4105 if (get_fe_conn_info_ptr() == nullptr)
4106 set_fe_conn_info_ptr((void*)new cal_connection_info());
4107
4108 cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
4109
4110 if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
4111 {
4112 ci->physTablesList.clear();
4113 ci->tableMap.clear();
4114 force_close_fep_conn(thd, ci);
4115 return 0;
4116 }
4117
4118 m_lock_type= lock_type;
4119
4120 CalTableMap::iterator mapiter = ci->tableMap.find(table);
4121 // make sure this is a release lock (2nd) call called in
4122 // the table mode.
4123 if (mapiter != ci->tableMap.end()
4124 && (mapiter->second.condInfo || mapiter->second.csep)
4125 && lock_type == 2)
4126 {
4127 // CS ends up processing query with handlers
4128 // table mode
4129 if (mapiter->second.conn_hndl)
4130 {
4131 if (ci->traceFlags & 1)
4132 push_warning(thd, Sql_condition::WARN_LEVEL_NOTE, 9999, mapiter->second.conn_hndl->queryStats.c_str());
4133
4134 ci->queryStats = mapiter->second.conn_hndl->queryStats;
4135 ci->extendedStats = mapiter->second.conn_hndl->extendedStats;
4136 ci->miniStats = mapiter->second.conn_hndl->miniStats;
4137 sm::sm_cleanup(mapiter->second.conn_hndl);
4138 mapiter->second.conn_hndl = nullptr;
4139 }
4140
4141 if (mapiter->second.condInfo)
4142 {
4143 delete mapiter->second.condInfo;
4144 mapiter->second.condInfo = nullptr;
4145 }
4146
4147 // MCOL-2178 Check for tableMap size to set this only once.
4148 ci->queryState = 0;
4149 // Clean up the tableMap and physTablesList
4150 ci->tableMap.erase(table);
4151 ci->physTablesList.erase(table);
4152 thd->variables.in_subquery_conversion_threshold = IN_SUBQUERY_CONVERSION_THRESHOLD;
4153 restore_optimizer_flags(thd);
4154 }
4155 else
4156 {
4157 if (lock_type == 0)
4158 {
4159 ci->physTablesList.insert(table);
4160 // MCOL-2178 Disable Conversion of Big IN Predicates Into Subqueries
4161 thd->variables.in_subquery_conversion_threshold=~ 0;
4162 // Early optimizer_switch changes to avoid unsupported opt-s.
4163 mutate_optimizer_flags(thd);
4164 }
4165 else if (lock_type == 2)
4166 {
4167 std::set<TABLE*>::iterator iter = ci->physTablesList.find(table);
4168 if (iter != ci->physTablesList.end())
4169 {
4170 ci->physTablesList.erase(table);
4171 }
4172
4173 // CS ends up processing query with handlers
4174 if (iter != ci->physTablesList.end() && ci->physTablesList.empty())
4175 {
4176 if (!ci->cal_conn_hndl)
4177 return 0;
4178
4179 if (ci->traceFlags & 1)
4180 push_warning(thd, Sql_condition::WARN_LEVEL_NOTE, 9999, ci->cal_conn_hndl->queryStats.c_str());
4181
4182 ci->queryStats = ci->cal_conn_hndl->queryStats;
4183 ci->extendedStats = ci->cal_conn_hndl->extendedStats;
4184 ci->miniStats = ci->cal_conn_hndl->miniStats;
4185 ci->queryState = 0;
4186 // MCOL-3247 Use THD::ha_data as a per-plugin per-session
4187 // storage for cal_conn_hndl to use it later in close_connection
4188 thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr());
4189 // Clean up all tableMap entries made by cond_push
4190 for (auto &tme: ci->tableMap)
4191 {
4192 if (tme.second.condInfo)
4193 {
4194 delete tme.second.condInfo;
4195 tme.second.condInfo= nullptr;
4196 }
4197 }
4198 ci->tableMap.clear();
4199 // MCOL-2178 Enable Conversion of Big IN Predicates Into Subqueries
4200 thd->variables.in_subquery_conversion_threshold = IN_SUBQUERY_CONVERSION_THRESHOLD;
4201 restore_optimizer_flags(thd);
4202 }
4203
4204 }
4205 }
4206
4207 return 0;
4208 }
4209
4210 // for sorting length exceeds blob limit. Just error out for now.
ha_mcs_impl_rnd_pos(uchar * buf,uchar * pos)4211 int ha_mcs_impl_rnd_pos(uchar* buf, uchar* pos)
4212 {
4213 IDEBUG( cout << "ha_mcs_impl_rnd_pos" << endl);
4214 string emsg = logging::IDBErrorInfo::instance()->errorMsg(ERR_ORDERBY_TOO_BIG);
4215 setError(current_thd, ER_INTERNAL_ERROR, emsg);
4216 return ER_INTERNAL_ERROR;
4217 }
4218
4219 /*@brief ha_mcs_impl_group_by_init - Get data for MariaDB group_by
4220 pushdown handler */
4221 /***********************************************************
4222 * DESCRIPTION:
4223 * Prepares data for group_by_handler::next_row() calls.
4224 * PARAMETERS:
4225 * group_hand - group by handler, that preserves initial table and items lists. .
4226 * table - TABLE pointer The table to save the result set into.
4227 * RETURN:
4228 * 0 if success
4229 * others if something went wrong whilst getting the result set
4230 ***********************************************************/
ha_mcs_impl_group_by_init(mcs_handler_info * handler_info,TABLE * table)4231 int ha_mcs_impl_group_by_init(mcs_handler_info *handler_info, TABLE* table)
4232 {
4233 ha_mcs_group_by_handler *group_hand=
4234 reinterpret_cast<ha_mcs_group_by_handler*>(handler_info->hndl_ptr);
4235 string tableName = group_hand->table_list->table->s->table_name.str;
4236 IDEBUG( cout << "group_by_init for table " << tableName << endl );
4237 THD* thd = current_thd;
4238
4239 //check whether the system is ready to process statement.
4240 #ifndef _MSC_VER
4241 static DBRM dbrm(true);
4242 int bSystemQueryReady = dbrm.getSystemQueryReady();
4243
4244 if (bSystemQueryReady == 0)
4245 {
4246 // Still not ready
4247 setError(thd, ER_INTERNAL_ERROR, "The system is not yet ready to accept queries");
4248 return ER_INTERNAL_ERROR;
4249 }
4250 else if (bSystemQueryReady < 0)
4251 {
4252 // Still not ready
4253 setError(thd, ER_INTERNAL_ERROR, "DBRM is not responding. Cannot accept queries");
4254 return ER_INTERNAL_ERROR;
4255 }
4256
4257 #endif
4258
4259 uint32_t sessionID = tid2sid(thd->thread_id);
4260 boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
4261 csc->identity(CalpontSystemCatalog::FE);
4262
4263 if (get_fe_conn_info_ptr() == nullptr)
4264 set_fe_conn_info_ptr((void*)new cal_connection_info());
4265
4266 cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
4267
4268 idbassert(ci != 0);
4269
4270
4271 if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
4272 {
4273 force_close_fep_conn(thd, ci);
4274 return 0;
4275 }
4276
4277 sm::tableid_t tableid = 0;
4278 cal_table_info ti;
4279 cal_group_info gi;
4280 sm::cpsm_conhdl_t* hndl;
4281 SCSEP csep;
4282
4283 bool localQuery = get_local_query(thd);
4284
4285 {
4286 ci->stats.reset(); // reset query stats
4287 ci->stats.setStartTime();
4288 if (thd->main_security_ctx.user)
4289 {
4290 ci->stats.fUser = thd->main_security_ctx.user;
4291 }
4292 else
4293 {
4294 ci->stats.fUser = "";
4295 }
4296
4297 if (thd->main_security_ctx.host)
4298 ci->stats.fHost = thd->main_security_ctx.host;
4299 else if (thd->main_security_ctx.host_or_ip)
4300 ci->stats.fHost = thd->main_security_ctx.host_or_ip;
4301 else
4302 ci->stats.fHost = "unknown";
4303
4304 try
4305 {
4306 ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser);
4307 }
4308 catch (std::exception& e)
4309 {
4310 string msg = string("Columnstore User Priority - ") + e.what();
4311 ci->warningMsg = msg;
4312 }
4313
4314 // If the previous query has error and
4315 // this is not a subquery run by the server(MCOL-1601)
4316 // re-establish the connection
4317 if (ci->queryState != 0)
4318 {
4319 if( ci->cal_conn_hndl_st.size() == 0 )
4320 sm::sm_cleanup(ci->cal_conn_hndl);
4321 ci->cal_conn_hndl = 0;
4322 }
4323
4324 sm::sm_init(sessionID, &ci->cal_conn_hndl, localQuery);
4325 idbassert(ci->cal_conn_hndl != 0);
4326 ci->cal_conn_hndl->csc = csc;
4327 idbassert(ci->cal_conn_hndl->exeMgr != 0);
4328
4329 try
4330 {
4331 ci->cal_conn_hndl->connect();
4332 }
4333 catch (...)
4334 {
4335 setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_LOST_CONN_EXEMGR));
4336 CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
4337 goto error;
4338 }
4339
4340 hndl = ci->cal_conn_hndl;
4341
4342 ci->cal_conn_hndl_st.push(ci->cal_conn_hndl);
4343 if (!csep)
4344 csep.reset(new CalpontSelectExecutionPlan());
4345
4346 SessionManager sm;
4347 BRM::TxnID txnID;
4348 txnID = sm.getTxnID(sessionID);
4349
4350 if (!txnID.valid)
4351 {
4352 txnID.id = 0;
4353 txnID.valid = true;
4354 }
4355
4356 QueryContext verID;
4357 verID = sm.verID();
4358
4359 csep->txnID(txnID.id);
4360 csep->verID(verID);
4361 csep->sessionID(sessionID);
4362
4363 if (group_hand->table_list->db.length)
4364 csep->schemaName(group_hand->table_list->db.str, lower_case_table_names);
4365
4366 csep->traceFlags(ci->traceFlags);
4367
4368 // MCOL-1052 Send Items lists down to the optimizer.
4369 gi.groupByTables = group_hand->table_list;
4370 gi.groupByFields = group_hand->select;
4371 gi.groupByWhere = group_hand->where;
4372 gi.groupByGroup = group_hand->group_by;
4373 gi.groupByOrder = group_hand->order_by;
4374 gi.groupByHaving = group_hand->having;
4375 gi.groupByDistinct = group_hand->distinct;
4376
4377 // MCOL-1052 Send pushed conditions here, since server could omit GROUP BY
4378 // items in case of = or IN functions used on GROUP BY columns.
4379 {
4380 CalTableMap::iterator mapiter;
4381 execplan::CalpontSelectExecutionPlan::ColumnMap::iterator colMapIter;
4382 execplan::CalpontSelectExecutionPlan::ColumnMap::iterator condColMapIter;
4383 execplan::ParseTree* ptIt;
4384
4385 for (TABLE_LIST* tl = gi.groupByTables; tl; tl = tl->next_local)
4386 {
4387 mapiter = ci->tableMap.find(tl->table);
4388
4389 if (mapiter != ci->tableMap.end() && mapiter->second.condInfo != NULL
4390 && mapiter->second.condInfo->condPush)
4391 {
4392 while (!mapiter->second.condInfo->ptWorkStack.empty())
4393 {
4394 ptIt = mapiter->second.condInfo->ptWorkStack.top();
4395 mapiter->second.condInfo->ptWorkStack.pop();
4396 gi.pushedPts.push_back(ptIt);
4397 }
4398 }
4399 }
4400 }
4401 // send plan whenever group_init is called
4402 int status = cp_get_group_plan(thd, csep, gi);
4403
4404 // Never proceed if status != 0 to avoid empty DA
4405 // crashes on later stages
4406 if (status != 0)
4407 goto internal_error;
4408
4409 // @bug 2547. don't need to send the plan if it's impossible where for all unions.
4410 // MCOL-2178 commenting the below out since cp_get_group_plan does not modify this variable
4411 // which has a default value of false
4412 //if (MIGR::infinidb_vtable.impossibleWhereOnUnion)
4413 // return 0;
4414
4415 string query;
4416 // Set the query text only once if the server executes
4417 // subqueries separately.
4418 if(ci->queryState)
4419 query.assign("<subquery of the previous>");
4420 else
4421 query.assign(thd->query_string.str(), thd->query_string.length());
4422 csep->data(query);
4423
4424 try
4425 {
4426 csep->priority( ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser));
4427 }
4428 catch (std::exception& e)
4429 {
4430 string msg = string("Columnstore User Priority - ") + e.what();
4431 push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
4432 }
4433
4434 #ifdef PLAN_HEX_FILE
4435 // plan serialization
4436 string tmpDir = aTmpDir + "/li1-plan.hex";
4437
4438 ifstream ifs(tmpDir);
4439 ByteStream bs1;
4440 ifs >> bs1;
4441 ifs.close();
4442 csep->unserialize(bs1);
4443 #endif
4444
4445 if (ci->traceFlags & 1)
4446 {
4447 cerr << "---------------- EXECUTION PLAN ----------------" << endl;
4448 cerr << *csep << endl ;
4449 cerr << "-------------- EXECUTION PLAN END --------------\n" << endl;
4450 }
4451 else
4452 {
4453 IDEBUG( cout << "---------------- EXECUTION PLAN ----------------" << endl );
4454 IDEBUG( cerr << *csep << endl );
4455 IDEBUG( cout << "-------------- EXECUTION PLAN END --------------\n" << endl );
4456 }
4457 }// end of execution plan generation
4458
4459 {
4460 ByteStream msg;
4461 ByteStream emsgBs;
4462
4463 while (true)
4464 {
4465 try
4466 {
4467 ByteStream::quadbyte qb = 4;
4468 msg << qb;
4469 hndl->exeMgr->write(msg);
4470 msg.restart();
4471 csep->rmParms(ci->rmParms);
4472
4473 //send plan
4474 csep->serialize(msg);
4475 hndl->exeMgr->write(msg);
4476
4477 //get ExeMgr status back to indicate a vtable joblist success or not
4478 msg.restart();
4479 emsgBs.restart();
4480 msg = hndl->exeMgr->read();
4481 emsgBs = hndl->exeMgr->read();
4482 string emsg;
4483
4484 if (msg.length() == 0 || emsgBs.length() == 0)
4485 {
4486 emsg = "Lost connection to ExeMgr. Please contact your administrator";
4487 setError(thd, ER_INTERNAL_ERROR, emsg);
4488 return ER_INTERNAL_ERROR;
4489 }
4490
4491 string emsgStr;
4492 emsgBs >> emsgStr;
4493 bool err = false;
4494
4495 if (msg.length() == 4)
4496 {
4497 msg >> qb;
4498
4499 if (qb != 0)
4500 {
4501 err = true;
4502 // for makejoblist error, stats contains only error code and insert from here
4503 // because table fetch is not started
4504 ci->stats.setEndTime();
4505 ci->stats.fQuery = csep->data();
4506 ci->stats.fQueryType = csep->queryType();
4507 ci->stats.fErrorNo = qb;
4508
4509 try
4510 {
4511 ci->stats.insert();
4512 }
4513 catch (std::exception& e)
4514 {
4515 string msg = string("Columnstore Query Stats - ") + e.what();
4516 push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
4517 }
4518 }
4519 }
4520 else
4521 {
4522 err = true;
4523 }
4524
4525 if (err)
4526 {
4527 setError(thd, ER_INTERNAL_ERROR, emsgStr);
4528 return ER_INTERNAL_ERROR;
4529 }
4530
4531 ci->rmParms.clear();
4532
4533 ci->queryState = 1;
4534
4535 break;
4536 }
4537 catch (...)
4538 {
4539 sm::sm_cleanup(hndl);
4540 hndl = 0;
4541
4542 sm::sm_init(sessionID, &hndl, localQuery);
4543 idbassert(hndl != 0);
4544 hndl->csc = csc;
4545
4546 ci->cal_conn_hndl = hndl;
4547 ci->cal_conn_hndl_st.pop();
4548 ci->cal_conn_hndl_st.push(ci->cal_conn_hndl);
4549 try
4550 {
4551 hndl->connect();
4552 }
4553 catch (...)
4554 {
4555 setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_LOST_CONN_EXEMGR));
4556 CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
4557 goto error;
4558 }
4559
4560 msg.restart();
4561 }
4562 }
4563 }
4564
4565 // set query state to be in_process. Sometimes mysql calls rnd_init multiple
4566 // times, this makes sure plan only being generated and sent once. It will be
4567 // reset when query finishes in sm::end_query
4568
4569 // common path for both vtable select phase and table mode -- open scan handle
4570 ti = ci->tableMap[table];
4571 ti.msTablePtr = table;
4572
4573 {
4574 // MCOL-1601 Using stacks of ExeMgr conn hndls, table and scan contexts.
4575 ti.tpl_ctx = new sm::cpsm_tplh_t();
4576 ti.tpl_ctx_st.push(ti.tpl_ctx);
4577 ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t());
4578 ti.tpl_scan_ctx_st.push(ti.tpl_scan_ctx);
4579
4580 // make sure rowgroup is null so the new meta data can be taken. This is for some case mysql
4581 // call rnd_init for a table more than once.
4582 ti.tpl_scan_ctx->rowGroup = nullptr;
4583
4584 try
4585 {
4586 tableid = execplan::IDB_VTABLE_ID;
4587 }
4588 catch (...)
4589 {
4590 string emsg = "No table ID found for table " + string(table->s->table_name.str);
4591 setError(thd, ER_INTERNAL_ERROR, emsg);
4592 CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
4593 goto internal_error;
4594 }
4595
4596 try
4597 {
4598 sm::tpl_open(tableid, ti.tpl_ctx, hndl);
4599 sm::tpl_scan_open(tableid, ti.tpl_scan_ctx, hndl);
4600 }
4601 catch (std::exception& e)
4602 {
4603 string emsg = "table can not be opened: " + string(e.what());
4604 setError(thd, ER_INTERNAL_ERROR, emsg);
4605 CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
4606 goto internal_error;
4607 }
4608 catch (...)
4609 {
4610 string emsg = "table can not be opened";
4611 setError(thd, ER_INTERNAL_ERROR, emsg);
4612 CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
4613 goto internal_error;
4614 }
4615
4616 ti.tpl_scan_ctx->traceFlags = ci->traceFlags;
4617
4618 if ((ti.tpl_scan_ctx->ctp).size() == 0)
4619 {
4620 uint32_t num_attr = table->s->fields;
4621
4622 for (uint32_t i = 0; i < num_attr; i++)
4623 {
4624 CalpontSystemCatalog::ColType ctype;
4625 ti.tpl_scan_ctx->ctp.push_back(ctype);
4626 }
4627 }
4628 }
4629
4630 ci->tableMap[table] = ti;
4631 return 0;
4632
4633 error:
4634
4635 if (ci->cal_conn_hndl)
4636 {
4637 // end_query() should be called here.
4638 sm::sm_cleanup(ci->cal_conn_hndl);
4639 ci->cal_conn_hndl = 0;
4640 }
4641
4642 // do we need to close all connection handle of the table map?
4643 return ER_INTERNAL_ERROR;
4644
4645 internal_error:
4646
4647 if (ci->cal_conn_hndl)
4648 {
4649 // end_query() should be called here.
4650 sm::sm_cleanup(ci->cal_conn_hndl);
4651 ci->cal_conn_hndl = 0;
4652 }
4653
4654 return ER_INTERNAL_ERROR;
4655 }
4656
4657 /*@brief ha_mcs_impl_group_by_next - Return result set for MariaDB group_by
4658 pushdown handler
4659 */
4660 /***********************************************************
4661 * DESCRIPTION:
4662 * Return a result record for each group_by_handler::next_row() call.
4663 * PARAMETERS:
4664 * group_hand - group by handler, that preserves initial table and items lists. .
4665 * table - TABLE pointer The table to save the result set in.
4666 * RETURN:
4667 * 0 if success
4668 * HA_ERR_END_OF_FILE if the record set has come to an end
4669 * others if something went wrong whilst getting the result set
4670 ***********************************************************/
ha_mcs_impl_group_by_next(TABLE * table)4671 int ha_mcs_impl_group_by_next(TABLE* table)
4672 {
4673 THD* thd = current_thd;
4674
4675 if (((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) ||
4676 ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI))
4677 return HA_ERR_END_OF_FILE;
4678
4679 if (thd->slave_thread && !get_replication_slave(thd) && (
4680 thd->lex->sql_command == SQLCOM_INSERT ||
4681 thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
4682 thd->lex->sql_command == SQLCOM_UPDATE ||
4683 thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
4684 thd->lex->sql_command == SQLCOM_DELETE ||
4685 thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
4686 thd->lex->sql_command == SQLCOM_TRUNCATE ||
4687 thd->lex->sql_command == SQLCOM_LOAD))
4688 return HA_ERR_END_OF_FILE;
4689
4690 // @bug 2547
4691 // MCOL-2178
4692 // if (MIGR::infinidb_vtable.impossibleWhereOnUnion)
4693 // return HA_ERR_END_OF_FILE;
4694
4695 if (get_fe_conn_info_ptr() == nullptr)
4696 set_fe_conn_info_ptr((void*)new cal_connection_info());
4697
4698 cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
4699
4700 if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
4701 {
4702 force_close_fep_conn(thd, ci);
4703 return 0;
4704 }
4705
4706 if (ci->alterTableState > 0) return HA_ERR_END_OF_FILE;
4707
4708 cal_table_info ti;
4709 ti = ci->tableMap[table];
4710 int rc = HA_ERR_END_OF_FILE;
4711
4712 if (!ti.tpl_ctx || !ti.tpl_scan_ctx)
4713 {
4714 CalpontSystemCatalog::removeCalpontSystemCatalog(tid2sid(thd->thread_id));
4715 return ER_INTERNAL_ERROR;
4716 }
4717
4718 idbassert(ti.msTablePtr == table);
4719
4720 try
4721 {
4722 // fetchNextRow interface forces to use buf.
4723 unsigned char buf;
4724 rc = fetchNextRow(&buf, ti, ci, true);
4725 }
4726 catch (std::exception& e)
4727 {
4728 string emsg = string("Error while fetching from ExeMgr: ") + e.what();
4729 setError(thd, ER_INTERNAL_ERROR, emsg);
4730 CalpontSystemCatalog::removeCalpontSystemCatalog(tid2sid(thd->thread_id));
4731 return ER_INTERNAL_ERROR;
4732 }
4733
4734 ci->tableMap[table] = ti;
4735
4736 if (rc != 0 && rc != HA_ERR_END_OF_FILE)
4737 {
4738 string emsg;
4739
4740 // remove this check when all error handling migrated to the new framework.
4741 if (rc >= 1000)
4742 emsg = ti.tpl_scan_ctx->errMsg;
4743 else
4744 {
4745 logging::ErrorCodes errorcodes;
4746 emsg = errorcodes.errorString(rc);
4747 }
4748
4749 setError(thd, ER_INTERNAL_ERROR, emsg);
4750 ci->stats.fErrorNo = rc;
4751 CalpontSystemCatalog::removeCalpontSystemCatalog(tid2sid(thd->thread_id));
4752 rc = ER_INTERNAL_ERROR;
4753 }
4754
4755 return rc;
4756 }
4757
ha_mcs_impl_group_by_end(TABLE * table)4758 int ha_mcs_impl_group_by_end(TABLE* table)
4759 {
4760 int rc = 0;
4761 THD* thd = current_thd;
4762
4763 if (thd->slave_thread && !get_replication_slave(thd) && (
4764 thd->lex->sql_command == SQLCOM_INSERT ||
4765 thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
4766 thd->lex->sql_command == SQLCOM_UPDATE ||
4767 thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
4768 thd->lex->sql_command == SQLCOM_DELETE ||
4769 thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
4770 thd->lex->sql_command == SQLCOM_TRUNCATE ||
4771 thd->lex->sql_command == SQLCOM_LOAD))
4772 return 0;
4773
4774 // MCOL-2178 isUnion member only assigned, never used
4775 // MIGR::infinidb_vtable.isUnion = false;
4776
4777 cal_connection_info* ci = nullptr;
4778
4779 if (get_fe_conn_info_ptr() != NULL)
4780 ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
4781
4782 if (!ci)
4783 {
4784 set_fe_conn_info_ptr((void*)new cal_connection_info());
4785 ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
4786 }
4787
4788 if (((thd->lex)->sql_command == SQLCOM_INSERT) ||
4789 ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) )
4790 {
4791 force_close_fep_conn(thd, ci, true); // with checking prev command rc
4792 return rc;
4793 }
4794
4795 if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
4796 {
4797 force_close_fep_conn(thd, ci);
4798 // clear querystats because no query stats available for cancelled query
4799 ci->queryStats = "";
4800 // Poping next ExeMgr connection out of the stack
4801 if ( ci->cal_conn_hndl_st.size() )
4802 {
4803 ci->cal_conn_hndl_st.pop();
4804 if ( ci->cal_conn_hndl_st.size() )
4805 ci->cal_conn_hndl = ci->cal_conn_hndl_st.top();
4806 }
4807
4808 return 0;
4809 }
4810
4811 IDEBUG( cerr << "group_by_end for table " << table->s->table_name.str << endl );
4812
4813 cal_table_info ti = ci->tableMap[table];
4814 sm::cpsm_conhdl_t* hndl;
4815 bool clearScanCtx = false;
4816
4817 hndl = ci->cal_conn_hndl;
4818
4819 if (ti.tpl_ctx)
4820 {
4821 if (ti.tpl_scan_ctx.get())
4822 {
4823 clearScanCtx = ( (ti.tpl_scan_ctx.get()->rowsreturned) &&
4824 ti.tpl_scan_ctx.get()->rowsreturned == ti.tpl_scan_ctx.get()->getRowCount() );
4825 try
4826 {
4827 sm::tpl_scan_close(ti.tpl_scan_ctx);
4828 }
4829 catch (...)
4830 {
4831 rc = ER_INTERNAL_ERROR;
4832 }
4833 }
4834
4835 ti.tpl_scan_ctx.reset();
4836 if ( ti.tpl_scan_ctx_st.size() )
4837 {
4838 ti.tpl_scan_ctx_st.pop();
4839 if ( ti.tpl_scan_ctx_st.size() )
4840 ti.tpl_scan_ctx = ti.tpl_scan_ctx_st.top();
4841 }
4842 try
4843 {
4844 if(hndl)
4845 {
4846 {
4847 bool ask_4_stats = (ci->traceFlags) ? true : false;
4848 sm::tpl_close(ti.tpl_ctx, &hndl, ci->stats, ask_4_stats, clearScanCtx);
4849 }
4850 // Normaly stats variables are set in external_lock method but we set it here
4851 // since they we pretend we are in vtable_disabled mode and the stats vars won't be set.
4852 // We sum the stats up here since server could run a number of
4853 // queries e.g. each for a subquery in a filter.
4854 if(hndl)
4855 {
4856 if (hndl->queryStats.length())
4857 ci->queryStats += hndl->queryStats;
4858 if (hndl->extendedStats.length())
4859 ci->extendedStats += hndl->extendedStats;
4860 if (hndl->miniStats.length())
4861 ci->miniStats += hndl->miniStats;
4862 }
4863 }
4864
4865 ci->cal_conn_hndl = hndl;
4866
4867 ti.tpl_ctx = 0;
4868 }
4869 catch (IDBExcept& e)
4870 {
4871 if (e.errorCode() == ERR_CROSS_ENGINE_CONNECT || e.errorCode() == ERR_CROSS_ENGINE_CONFIG)
4872 {
4873 string msg = string("Columnstore Query Stats - ") + e.what();
4874 push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
4875 }
4876 else
4877 {
4878 setError(thd, ER_INTERNAL_ERROR, e.what());
4879 rc = ER_INTERNAL_ERROR;
4880 }
4881 }
4882 catch (std::exception& e)
4883 {
4884 setError(thd, ER_INTERNAL_ERROR, e.what());
4885 rc = ER_INTERNAL_ERROR;
4886 }
4887 catch (...)
4888 {
4889 setError(thd, ER_INTERNAL_ERROR, "Internal error throwed in group_by_end");
4890 rc = ER_INTERNAL_ERROR;
4891 }
4892 }
4893
4894 ti.tpl_ctx = 0;
4895
4896 if ( ti.tpl_ctx_st.size() )
4897 {
4898 ti.tpl_ctx_st.pop();
4899 if ( ti.tpl_ctx_st.size() )
4900 ti.tpl_ctx = ti.tpl_ctx_st.top();
4901 }
4902
4903 if ( ci->cal_conn_hndl_st.size() )
4904 {
4905 ci->cal_conn_hndl_st.pop();
4906 if ( ci->cal_conn_hndl_st.size() )
4907 ci->cal_conn_hndl = ci->cal_conn_hndl_st.top();
4908 }
4909
4910 ci->tableMap[table] = ti;
4911
4912 // push warnings from CREATE phase
4913 if (!ci->warningMsg.empty())
4914 push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, ci->warningMsg.c_str());
4915
4916 ci->warningMsg.clear();
4917 // reset expressionId just in case
4918 ci->expressionId = 0;
4919 return rc;
4920 }
4921
4922
4923 /*@brief Initiate the query for derived_handler */
4924 /***********************************************************
4925 * DESCRIPTION:
4926 * Execute the query and saves derived table query.
4927 * There is an extra handler argument so I ended up with a
4928 * new init function. The code is a copy of
4929 * ha_mcs_impl_rnd_init() mostly.
4930 * PARAMETERS:
4931 * mcs_handler_info* pnt to an envelope struct
4932 * TABLE* table - dest table to put the results into
4933 * RETURN:
4934 * rc as int
4935 ***********************************************************/
ha_mcs_impl_pushdown_init(mcs_handler_info * handler_info,TABLE * table)4936 int ha_mcs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table)
4937 {
4938 IDEBUG( cout << "pushdown_init for table " << endl );
4939 THD* thd = current_thd;
4940
4941 if (thd->slave_thread && !get_replication_slave(thd) && (
4942 thd->lex->sql_command == SQLCOM_INSERT ||
4943 thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
4944 thd->lex->sql_command == SQLCOM_UPDATE ||
4945 thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
4946 thd->lex->sql_command == SQLCOM_DELETE ||
4947 thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
4948 thd->lex->sql_command == SQLCOM_TRUNCATE ||
4949 thd->lex->sql_command == SQLCOM_LOAD))
4950 return 0;
4951
4952 gp_walk_info gwi;
4953 gwi.thd = thd;
4954 bool err = false;
4955
4956 //check whether the system is ready to process statement.
4957 #ifndef _MSC_VER
4958 static DBRM dbrm(true);
4959 int bSystemQueryReady = dbrm.getSystemQueryReady();
4960
4961 if (bSystemQueryReady == 0)
4962 {
4963 // Still not ready
4964 setError(thd, ER_INTERNAL_ERROR, "The system is not yet ready to accept queries");
4965 return ER_INTERNAL_ERROR;
4966 }
4967 else if (bSystemQueryReady < 0)
4968 {
4969 // Still not ready
4970 setError(thd, ER_INTERNAL_ERROR, "DBRM is not responding. Cannot accept queries");
4971 return ER_INTERNAL_ERROR;
4972 }
4973 #endif
4974
4975 // Set this to close all outstanding FEP connections on
4976 // client disconnect in handlerton::closecon_handlerton().
4977 if ( !thd_get_ha_data(thd, mcs_hton))
4978 {
4979 thd_set_ha_data(thd, mcs_hton, reinterpret_cast<void*>(0x42));
4980 }
4981
4982 if ( (thd->lex)->sql_command == SQLCOM_ALTER_TABLE )
4983 {
4984 return 0;
4985 }
4986
4987 // MCOL-4023 We need to test this code path.
4988 //Update and delete code
4989 if ( ((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI))
4990 return doUpdateDelete(thd, gwi, std::vector<COND*>());
4991
4992 uint32_t sessionID = tid2sid(thd->thread_id);
4993 boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
4994 csc->identity(CalpontSystemCatalog::FE);
4995
4996 if (!get_fe_conn_info_ptr())
4997 set_fe_conn_info_ptr(reinterpret_cast<void*>(new cal_connection_info(), thd));
4998
4999 cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
5000
5001 idbassert(ci != 0);
5002
5003 if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
5004 {
5005 if (ci->cal_conn_hndl)
5006 {
5007 // send ExeMgr a signal before closing the connection
5008 ByteStream msg;
5009 ByteStream::quadbyte qb = 0;
5010 msg << qb;
5011
5012 try
5013 {
5014 ci->cal_conn_hndl->exeMgr->write(msg);
5015 }
5016 catch (...)
5017 {
5018 // canceling query. ignore connection failure.
5019 }
5020
5021 sm::sm_cleanup(ci->cal_conn_hndl);
5022 ci->cal_conn_hndl = 0;
5023 }
5024
5025 return 0;
5026 }
5027
5028 sm::tableid_t tableid = 0;
5029 cal_table_info ti;
5030 sm::cpsm_conhdl_t* hndl;
5031 SCSEP csep;
5032 // Declare handlers ptrs in this scope for future use.
5033 select_handler* sh = nullptr;
5034 derived_handler* dh = nullptr;
5035
5036 // update traceFlags according to the autoswitch state.
5037 ci->traceFlags = (ci->traceFlags | CalpontSelectExecutionPlan::TRACE_TUPLE_OFF)^
5038 CalpontSelectExecutionPlan::TRACE_TUPLE_OFF;
5039
5040 bool localQuery = (get_local_query(thd) > 0 ? true : false);
5041
5042 {
5043 ci->stats.reset(); // reset query stats
5044 ci->stats.setStartTime();
5045 if (thd->main_security_ctx.user)
5046 {
5047 ci->stats.fUser = thd->main_security_ctx.user;
5048 }
5049 else
5050 {
5051 ci->stats.fUser = "";
5052 }
5053
5054 if (thd->main_security_ctx.host)
5055 ci->stats.fHost = thd->main_security_ctx.host;
5056 else if (thd->main_security_ctx.host_or_ip)
5057 ci->stats.fHost = thd->main_security_ctx.host_or_ip;
5058 else
5059 ci->stats.fHost = "unknown";
5060
5061 try
5062 {
5063 ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser);
5064 }
5065 catch (std::exception& e)
5066 {
5067 string msg = string("Columnstore User Priority - ") + e.what();
5068 ci->warningMsg = msg;
5069 }
5070
5071 // if the previous query has error, re-establish the connection
5072 if (ci->queryState != 0)
5073 {
5074 sm::sm_cleanup(ci->cal_conn_hndl);
5075 ci->cal_conn_hndl = 0;
5076 }
5077
5078 sm::sm_init(sessionID, &ci->cal_conn_hndl, localQuery);
5079 idbassert(ci->cal_conn_hndl != 0);
5080 ci->cal_conn_hndl->csc = csc;
5081 idbassert(ci->cal_conn_hndl->exeMgr != 0);
5082
5083 try
5084 {
5085 ci->cal_conn_hndl->connect();
5086 }
5087 catch (...)
5088 {
5089 setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_LOST_CONN_EXEMGR));
5090 CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
5091 goto error;
5092 }
5093
5094 hndl = ci->cal_conn_hndl;
5095
5096 IDEBUG( std::cout << idb_mysql_query_str(thd) << std::endl );
5097
5098 {
5099 if (!csep)
5100 csep.reset(new CalpontSelectExecutionPlan());
5101
5102 SessionManager sm;
5103 BRM::TxnID txnID;
5104 txnID = sm.getTxnID(sessionID);
5105
5106 if (!txnID.valid)
5107 {
5108 txnID.id = 0;
5109 txnID.valid = true;
5110 }
5111
5112 QueryContext verID;
5113 verID = sm.verID();
5114
5115 csep->txnID(txnID.id);
5116 csep->verID(verID);
5117 csep->sessionID(sessionID);
5118
5119 if (thd->db.length)
5120 csep->schemaName(thd->db.str, lower_case_table_names);
5121
5122 csep->traceFlags(ci->traceFlags);
5123
5124 // cast the handler and get a plan.
5125 int status = 42;
5126 if (handler_info->hndl_type == mcs_handler_types_t::SELECT)
5127 {
5128 sh = reinterpret_cast<select_handler*>(handler_info->hndl_ptr);
5129 status = cs_get_select_plan(sh, thd, csep, gwi);
5130 }
5131 else if (handler_info->hndl_type == DERIVED)
5132 {
5133 dh = reinterpret_cast<derived_handler*>(handler_info->hndl_ptr);
5134 status = cs_get_derived_plan(dh, thd, csep, gwi);
5135 }
5136
5137 // Return an error to avoid MDB crash later in end_statement
5138 if (status != 0)
5139 goto internal_error;
5140
5141 string query;
5142 query.assign(idb_mysql_query_str(thd));
5143 csep->data(query);
5144
5145 try
5146 {
5147 csep->priority( ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser));
5148 }
5149 catch (std::exception& e)
5150 {
5151 string msg = string("Columnstore User Priority - ") + e.what();
5152 push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
5153 }
5154
5155 // DRRTUY Make this runtime configureable
5156 #ifdef PLAN_HEX_FILE
5157 // plan serialization
5158 ifstream ifs("/tmp/li1-plan.hex");
5159 ByteStream bs1;
5160 ifs >> bs1;
5161 ifs.close();
5162 csep->unserialize(bs1);
5163 #endif
5164
5165 if (ci->traceFlags & 1)
5166 {
5167 cerr << "---------------- EXECUTION PLAN ----------------" << endl;
5168 cerr << *csep << endl ;
5169 cerr << "-------------- EXECUTION PLAN END --------------\n" << endl;
5170 }
5171 else
5172 {
5173 IDEBUG( cout << "---------------- EXECUTION PLAN ----------------" << endl );
5174 IDEBUG( cerr << *csep << endl );
5175 IDEBUG( cout << "-------------- EXECUTION PLAN END --------------\n" << endl );
5176 }
5177 }
5178 }// end of execution plan generation
5179
5180 {
5181 ByteStream msg;
5182 ByteStream emsgBs;
5183
5184 while (true)
5185 {
5186 try
5187 {
5188 ByteStream::quadbyte qb = 4;
5189 msg << qb;
5190 hndl->exeMgr->write(msg);
5191 msg.restart();
5192 csep->rmParms(ci->rmParms);
5193
5194 //send plan
5195 csep->serialize(msg);
5196 hndl->exeMgr->write(msg);
5197
5198 //get ExeMgr status back to indicate a vtable joblist success or not
5199 msg.restart();
5200 emsgBs.restart();
5201 msg = hndl->exeMgr->read();
5202 emsgBs = hndl->exeMgr->read();
5203 string emsg;
5204
5205 if (msg.length() == 0 || emsgBs.length() == 0)
5206 {
5207 emsg = "Lost connection to ExeMgr. Please contact your administrator";
5208 setError(thd, ER_INTERNAL_ERROR, emsg);
5209 return ER_INTERNAL_ERROR;
5210 }
5211
5212 string emsgStr;
5213 emsgBs >> emsgStr;
5214
5215 if (msg.length() == 4)
5216 {
5217 msg >> qb;
5218
5219 if (qb != 0)
5220 {
5221 err = true;
5222 // for makejoblist error, stats contains only error code and insert from here
5223 // because table fetch is not started
5224 ci->stats.setEndTime();
5225 ci->stats.fQuery = csep->data();
5226 ci->stats.fQueryType = csep->queryType();
5227 ci->stats.fErrorNo = qb;
5228
5229 try
5230 {
5231 ci->stats.insert();
5232 }
5233 catch (std::exception& e)
5234 {
5235 string msg = string("Columnstore Query Stats - ") + e.what();
5236 push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
5237 }
5238 }
5239 }
5240 else
5241 {
5242 err = true;
5243 }
5244
5245 if (err)
5246 {
5247 // CS resets error in create_SH() if fallback is enabled
5248 setError(thd, ER_INTERNAL_ERROR, emsgStr);
5249 goto internal_error;
5250 }
5251
5252 ci->rmParms.clear();
5253
5254 // SH will initiate SM in select_next() only
5255 if (!sh)
5256 ci->queryState= sm::QUERY_IN_PROCESS;
5257
5258 break;
5259 }
5260 catch (...)
5261 {
5262 sm::sm_cleanup(hndl);
5263 hndl = 0;
5264
5265 sm::sm_init(sessionID, &hndl, localQuery);
5266 idbassert(hndl != 0);
5267 hndl->csc = csc;
5268
5269 ci->cal_conn_hndl = hndl;
5270
5271 try
5272 {
5273 hndl->connect();
5274 }
5275 catch (...)
5276 {
5277 setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_LOST_CONN_EXEMGR));
5278 CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
5279 goto error;
5280 }
5281
5282 msg.restart();
5283 }
5284 }
5285 }
5286
5287 // set query state to be in_process. Sometimes mysql calls rnd_init multiple
5288 // times, this makes sure plan only being generated and sent once. It will be
5289 // reset when query finishes in sm::end_query
5290
5291 // common path for both vtable select phase and table mode -- open scan handle
5292 ti = ci->tableMap[table];
5293 // This is the server's temp table for the result.
5294 if(sh)
5295 {
5296 ti.msTablePtr = sh->table;
5297 }
5298 else
5299 {
5300 ti.msTablePtr = dh->table;
5301 }
5302
5303 // For SH CS creates SM environment inside select_next().
5304 // This allows us to try and fail with SH.
5305 if (!sh)
5306 {
5307 if (ti.tpl_ctx == 0)
5308 {
5309 ti.tpl_ctx = new sm::cpsm_tplh_t();
5310 ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t());
5311 }
5312
5313 // make sure rowgroup is null so the new meta data can be taken. This is for some case mysql
5314 // call rnd_init for a table more than once.
5315 ti.tpl_scan_ctx->rowGroup = nullptr;
5316
5317 try
5318 {
5319 tableid = execplan::IDB_VTABLE_ID;
5320 }
5321 catch (...)
5322 {
5323 string emsg = "No table ID found for table " + string(table->s->table_name.str);
5324 setError(thd, ER_INTERNAL_ERROR, emsg);
5325 CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
5326 goto internal_error;
5327 }
5328
5329 try
5330 {
5331 sm::tpl_open(tableid, ti.tpl_ctx, hndl);
5332 sm::tpl_scan_open(tableid, ti.tpl_scan_ctx, hndl);
5333 }
5334 catch (std::exception& e)
5335 {
5336 string emsg = "table can not be opened: " + string(e.what());
5337 setError(thd, ER_INTERNAL_ERROR, emsg);
5338 CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
5339 goto internal_error;
5340 }
5341 catch (...)
5342 {
5343 string emsg = "table can not be opened";
5344 setError(thd, ER_INTERNAL_ERROR, emsg);
5345 CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
5346 goto internal_error;
5347 }
5348
5349 ti.tpl_scan_ctx->traceFlags = ci->traceFlags;
5350
5351 if ((ti.tpl_scan_ctx->ctp).size() == 0)
5352 {
5353 uint32_t num_attr = table->s->fields;
5354
5355 for (uint32_t i = 0; i < num_attr; i++)
5356 {
5357 CalpontSystemCatalog::ColType ctype;
5358 ti.tpl_scan_ctx->ctp.push_back(ctype);
5359 }
5360 }
5361 ci->tableMap[table] = ti;
5362 }
5363
5364 return 0;
5365
5366 error:
5367
5368 if (ci->cal_conn_hndl)
5369 {
5370 sm::sm_cleanup(ci->cal_conn_hndl);
5371 ci->cal_conn_hndl = 0;
5372 }
5373
5374 // do we need to close all connection handle of the table map
5375 return ER_INTERNAL_ERROR;
5376
5377 internal_error:
5378
5379 if (ci->cal_conn_hndl)
5380 {
5381 sm::sm_cleanup(ci->cal_conn_hndl);
5382 ci->cal_conn_hndl = 0;
5383 }
5384
5385 return ER_INTERNAL_ERROR;
5386 }
5387
ha_mcs_impl_select_next(uchar * buf,TABLE * table)5388 int ha_mcs_impl_select_next(uchar* buf, TABLE* table)
5389 {
5390 THD* thd = current_thd;
5391
5392 if (thd->slave_thread && !get_replication_slave(thd) && (
5393 thd->lex->sql_command == SQLCOM_INSERT ||
5394 thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
5395 thd->lex->sql_command == SQLCOM_UPDATE ||
5396 thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
5397 thd->lex->sql_command == SQLCOM_DELETE ||
5398 thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
5399 thd->lex->sql_command == SQLCOM_TRUNCATE ||
5400 thd->lex->sql_command == SQLCOM_LOAD))
5401 return HA_ERR_END_OF_FILE;
5402
5403 if (((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) ||
5404 ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI))
5405 return HA_ERR_END_OF_FILE;
5406
5407 if (get_fe_conn_info_ptr() == nullptr)
5408 set_fe_conn_info_ptr((void*)new cal_connection_info());
5409
5410 cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
5411
5412 int rc = HA_ERR_END_OF_FILE;
5413
5414 // @bug 2547
5415 // MCOL-2178 This variable can never be true in the scope of this function
5416 // if (MIGR::infinidb_vtable.impossibleWhereOnUnion)
5417 // return HA_ERR_END_OF_FILE;
5418
5419 if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
5420 {
5421 force_close_fep_conn(thd, ci);
5422 return 0;
5423 }
5424
5425 if (ci->alterTableState > 0) return rc;
5426
5427 cal_table_info ti;
5428 ti= ci->tableMap[table];
5429 // This is the server's temp table for the result.
5430 ti.msTablePtr= table;
5431 sm::tableid_t tableid= execplan::IDB_VTABLE_ID;
5432 sm::cpsm_conhdl_t* hndl= ci->cal_conn_hndl;
5433
5434 if (!ti.tpl_ctx || !ti.tpl_scan_ctx || (hndl && hndl->queryState == sm::NO_QUERY))
5435 {
5436 if (ti.tpl_ctx == 0)
5437 {
5438 ti.tpl_ctx = new sm::cpsm_tplh_t();
5439 ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t());
5440 }
5441
5442 // make sure rowgroup is null so the new meta data can be taken. This is for some case mysql
5443 // call rnd_init for a table more than once.
5444 ti.tpl_scan_ctx->rowGroup = nullptr;
5445
5446 try
5447 {
5448 sm::tpl_open(tableid, ti.tpl_ctx, hndl);
5449 sm::tpl_scan_open(tableid, ti.tpl_scan_ctx, hndl);
5450 }
5451 catch (std::exception& e)
5452 {
5453 uint32_t sessionID = tid2sid(thd->thread_id);
5454 string emsg = "table can not be opened: " + string(e.what());
5455 setError(thd, ER_INTERNAL_ERROR, emsg);
5456 CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
5457 goto internal_error;
5458 }
5459 catch (...)
5460 {
5461 uint32_t sessionID = tid2sid(thd->thread_id);
5462 string emsg = "table can not be opened";
5463 setError(thd, ER_INTERNAL_ERROR, emsg);
5464 CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
5465 goto internal_error;
5466 }
5467
5468 ti.tpl_scan_ctx->traceFlags = ci->traceFlags;
5469
5470 if ((ti.tpl_scan_ctx->ctp).size() == 0)
5471 {
5472 uint32_t num_attr = table->s->fields;
5473
5474 for (uint32_t i = 0; i < num_attr; i++)
5475 {
5476 CalpontSystemCatalog::ColType ctype;
5477 ti.tpl_scan_ctx->ctp.push_back(ctype);
5478 }
5479 }
5480 ci->tableMap[table] = ti;
5481 hndl->queryState= sm::QUERY_IN_PROCESS;
5482 }
5483
5484 if (!ti.tpl_ctx || !ti.tpl_scan_ctx)
5485 {
5486 uint32_t sessionID = tid2sid(thd->thread_id);
5487 CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
5488 return ER_INTERNAL_ERROR;
5489 }
5490
5491 idbassert(ti.msTablePtr == table);
5492
5493 try
5494 {
5495 rc = fetchNextRow(buf, ti, ci);
5496 }
5497 catch (std::exception& e)
5498 {
5499 uint32_t sessionID = tid2sid(thd->thread_id);
5500 string emsg = string("Error while fetching from ExeMgr: ") + e.what();
5501 setError(thd, ER_INTERNAL_ERROR, emsg);
5502 CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
5503 return ER_INTERNAL_ERROR;
5504 }
5505
5506 ci->tableMap[table]= ti;
5507
5508 if (rc != 0 && rc != HA_ERR_END_OF_FILE)
5509 {
5510 string emsg;
5511
5512 // remove this check when all error handling migrated to the new framework.
5513 if (rc >= 1000)
5514 emsg = ti.tpl_scan_ctx->errMsg;
5515 else
5516 {
5517 logging::ErrorCodes errorcodes;
5518 emsg = errorcodes.errorString(rc);
5519 }
5520
5521 uint32_t sessionID = tid2sid(thd->thread_id);
5522 setError(thd, ER_INTERNAL_ERROR, emsg);
5523 ci->stats.fErrorNo = rc;
5524 CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
5525 rc = ER_INTERNAL_ERROR;
5526 }
5527
5528 return rc;
5529
5530 internal_error:
5531
5532 if (ci->cal_conn_hndl)
5533 {
5534 sm::sm_cleanup(ci->cal_conn_hndl);
5535 ci->cal_conn_hndl = 0;
5536 }
5537
5538 return ER_INTERNAL_ERROR;
5539
5540 }
5541
5542 // vim:sw=4 ts=4:
5543