1 /* Copyright (C) 2014 InfiniDB, Inc.
2    Copyright (C) 2019 MariaDB Corporation
3 
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License
6    as published by the Free Software Foundation; version 2 of
7    the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17    MA 02110-1301, USA. */
18 
19 #include <my_config.h>
20 #ifndef _MSC_VER
21 #include <unistd.h>
22 #endif
23 #include <string>
24 #include <iostream>
25 #include <stack>
26 #ifdef _MSC_VER
27 #include <unordered_map>
28 #include <unordered_set>
29 #include <stdio.h>
30 #else
31 #include <tr1/unordered_map>
32 #include <tr1/unordered_set>
33 #endif
34 #include <fstream>
35 #include <sstream>
36 #include <cerrno>
37 #include <cstring>
38 #include <time.h>
39 #include <cassert>
40 #include <vector>
41 #include <map>
42 #include <limits>
43 #if defined(__linux__)
44 #include <wait.h>			//wait()
45 #elif defined(__FreeBSD__)
46 #include <sys/types.h>
47 #include <sys/stat.h>   	// For stat().
48 #include <sys/wait.h>
49 #include <sys/time.h>
50 #include <sys/resource.h>
51 #endif
52 using namespace std;
53 
54 #include <boost/shared_ptr.hpp>
55 #include <boost/algorithm/string/case_conv.hpp>
56 #include <boost/regex.hpp>
57 #include <boost/thread.hpp>
58 
59 #include "idb_mysql.h"
60 
61 #define NEED_CALPONT_INTERFACE
62 #include "ha_mcs_impl.h"
63 
64 #include "ha_mcs_impl_if.h"
65 using namespace cal_impl_if;
66 
67 #include "calpontselectexecutionplan.h"
68 #include "logicoperator.h"
69 #include "parsetree.h"
70 using namespace execplan;
71 
72 #include "dataconvert.h"
73 using namespace dataconvert;
74 
75 #include "sm.h"
76 #include "ha_mcs_pushdown.h"
77 
78 #include "bytestream.h"
79 #include "messagequeue.h"
80 using namespace messageqcpp;
81 
82 #include "dmlpackage.h"
83 #include "calpontdmlpackage.h"
84 #include "insertdmlpackage.h"
85 #include "vendordmlstatement.h"
86 #include "calpontdmlfactory.h"
87 using namespace dmlpackage;
88 
89 #include "dmlpackageprocessor.h"
90 using namespace dmlpackageprocessor;
91 
92 #include "configcpp.h"
93 using namespace config;
94 
95 #include "rowgroup.h"
96 using namespace rowgroup;
97 
98 #include "brmtypes.h"
99 using namespace BRM;
100 
101 #include "querystats.h"
102 using namespace querystats;
103 
104 #include "calpontselectexecutionplan.h"
105 #include "calpontsystemcatalog.h"
106 #include "simplecolumn_int.h"
107 #include "simplecolumn_decimal.h"
108 #include "aggregatecolumn.h"
109 #include "constantcolumn.h"
110 #include "simplefilter.h"
111 #include "constantfilter.h"
112 #include "functioncolumn.h"
113 #include "arithmeticcolumn.h"
114 #include "arithmeticoperator.h"
115 #include "logicoperator.h"
116 #include "predicateoperator.h"
117 #include "rowcolumn.h"
118 #include "selectfilter.h"
119 using namespace execplan;
120 
121 #include "joblisttypes.h"
122 using namespace joblist;
123 
124 #include "cacheutils.h"
125 
126 #include "errorcodes.h"
127 #include "idberrorinfo.h"
128 #include "errorids.h"
129 using namespace logging;
130 
131 #include "resourcemanager.h"
132 
133 #include "funcexp.h"
134 #include "functor.h"
135 using namespace funcexp;
136 
137 #include "installdir.h"
138 #include "columnstoreversion.h"
139 #include "ha_mcs_sysvars.h"
140 
141 namespace cal_impl_if
142 {
143 extern bool nonConstFunc(Item_func* ifp);
144 }
145 
146 namespace
147 {
// Message text for queries that are not supported in distributed (vtable)
// mode and were switched to standard mode.
const std::string infinidb_autoswitch_warning = "The query includes syntax that is not supported by MariaDB Columnstore distributed mode. The execution was switched to standard mode with downgraded performance.";

// Names of the interval units (copied from item_timefunc.cc).
// The order of the entries is significant and must not be changed.
static const std::string interval_names[] =
{
    "year",
    "quarter",
    "month",
    "week",
    "day",
    "hour",
    "minute",
    "second",
    "microsecond",
    "year_month",
    "day_hour",
    "day_minute",
    "day_second",
    "hour_minute",
    "hour_second",
    "minute_second",
    "day_microsecond",
    "hour_microsecond",
    "minute_microsecond",
    "second_microsecond"
};

// Threshold value for non-supported error codes.
const unsigned NONSUPPORTED_ERR_THRESH = 2000;

// HDFS is never used nowadays, so the flag is fixed to false instead of
// being read from ResourceManager::instance()->useHdfs().
bool useHdfs = false;
167 
168 //convenience fcn
tid2sid(const uint32_t tid)169 inline uint32_t tid2sid(const uint32_t tid)
170 {
171     return CalpontSystemCatalog::idb_tid2sid(tid);
172 }
173 
174 
175 /**
176   @brief
177   Wrapper around logging facility.
178 
179   @details
180   Reduces the boiler plate code.
181 
182   Called from number of places(mostly DML) in
183   ha_mcs_impl.cpp().
184 */
log_this(THD * thd,const char * message,logging::LOG_TYPE log_type,unsigned sid)185 void log_this(THD *thd, const char *message,
186     logging::LOG_TYPE log_type, unsigned sid)
187 {
188     // corresponds with dbcon in SubsystemID vector
189     // in messagelog.cpp
190     unsigned int subSystemId = 24;
191     logging::LoggingID logid( subSystemId, sid, 0);
192     logging::Message::Args args1;
193     logging::Message msg(1);
194     args1.add(message);
195     msg.format( args1 );
196     Logger logger(logid.fSubsysID);
197     logger.logMessage(log_type, msg, logid);
198 }
199 
200 /**
201   @brief
202   Forcely close a FEP connection.
203 
204   @details
205   Plugin code opens network connection with ExMgr to
206   get:
207     the result of meta-data queries
208     the result of DML or DQL query in any mode
209     statistics
210   This code allows to explicitly close the connection
211   if any error happens using a non-existing protocol
212   code 0. This causes ExeMgr loop to drop the
213   connection.
214 
215   Called from many places in ha_mcs_impl.cpp().
216 */
force_close_fep_conn(THD * thd,cal_connection_info * ci,bool check_prev_rc=false)217 void force_close_fep_conn(THD *thd, cal_connection_info* ci, bool check_prev_rc = false)
218 {
219     if (!ci->cal_conn_hndl)
220     {
221         return;
222     }
223 
224     if(check_prev_rc && !ci->rc)
225     {
226         return;
227     }
228 
229     // send ExeMgr an unknown signal to force him to close
230     // the connection
231     ByteStream msg;
232     ByteStream::quadbyte qb = 0;
233     msg << qb;
234 
235     try
236     {
237         ci->cal_conn_hndl->exeMgr->write(msg);
238     }
239     catch (...)
240     {
241         // Add details into the message.
242         log_this(thd, "Exception in force_close_fep_conn().",
243             logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
244     }
245 
246     sm::sm_cleanup(ci->cal_conn_hndl);
247     ci->cal_conn_hndl = 0;
248 }
249 
storeNumericField(Field ** f,int64_t value,CalpontSystemCatalog::ColType & ct)250 void storeNumericField(Field** f, int64_t value, CalpontSystemCatalog::ColType& ct)
251 {
252     // unset null bit first
253     if ((*f)->null_ptr)
254         *(*f)->null_ptr &= ~(*f)->null_bit;
255 
256     // For unsigned, use the ColType returned in the row rather than the
257     // unsigned_flag set by mysql. This is because mysql gets it wrong for SUM()
258     // Hopefully, in all other cases we get it right.
259     switch ((*f)->type())
260     {
261         case MYSQL_TYPE_NEWDECIMAL:
262         {
263             // @bug4388 stick to InfiniDB's scale in case mysql gives wrong scale due
264             // to create vtable limitation.
265             //if (f2->dec < ct.scale)
266             //    f2->dec = ct.scale;
267 
268             char buf[256];
269             dataconvert::DataConvert::decimalToString(value, (unsigned)ct.scale, buf, 256, ct.colDataType);
270             (*f)->store(buf, strlen(buf), (*f)->charset());
271             break;
272         }
273 
274         case MYSQL_TYPE_TINY: //TINYINT type
275         {
276             Field_tiny* f2 = (Field_tiny*)*f;
277             longlong int_val = (longlong)value;
278             (*f)->store(int_val, f2->unsigned_flag);
279             break;
280         }
281 
282         case MYSQL_TYPE_SHORT: //SMALLINT type
283         {
284             Field_short* f2 = (Field_short*)*f;
285             longlong int_val = (longlong)value;
286             (*f)->store(int_val, f2->unsigned_flag);
287             break;
288         }
289 
290         case MYSQL_TYPE_INT24: //MEDINT type
291         {
292             Field_medium* f2 = (Field_medium*)*f;
293             longlong int_val = (longlong)value;
294             (*f)->store(int_val, f2->unsigned_flag);
295             break;
296         }
297 
298         case MYSQL_TYPE_LONG: //INT type
299         {
300             Field_long* f2 = (Field_long*)*f;
301             longlong int_val = (longlong)value;
302             (*f)->store(int_val, f2->unsigned_flag);
303             break;
304         }
305 
306         case MYSQL_TYPE_LONGLONG: //BIGINT type
307         {
308             Field_longlong* f2 = (Field_longlong*)*f;
309             longlong int_val = (longlong)value;
310             (*f)->store(int_val, f2->unsigned_flag);
311             break;
312         }
313 
314         case MYSQL_TYPE_FLOAT: // FLOAT type
315         {
316             float float_val = *(float*)(&value);
317             (*f)->store(float_val);
318             break;
319         }
320 
321         case MYSQL_TYPE_DOUBLE: // DOUBLE type
322         {
323             double double_val = *(double*)(&value);
324             (*f)->store(double_val);
325             break;
326         }
327 
328         case MYSQL_TYPE_VARCHAR:
329         {
330             char tmp[25];
331             if (ct.colDataType == CalpontSystemCatalog::DECIMAL)
332                 dataconvert::DataConvert::decimalToString(value, (unsigned)ct.scale, tmp, 25, ct.colDataType);
333             else
334                 snprintf(tmp, 25, "%lld", (long long)value);
335 
336             (*f)->store(tmp, strlen(tmp), (*f)->charset());
337             break;
338         }
339 
340         default:
341         {
342             Field_longlong* f2 = (Field_longlong*)*f;
343             longlong int_val = (longlong)value;
344             (*f)->store(int_val, f2->unsigned_flag);
345             break;
346         }
347     }
348 }
349 
350 //
351 // @bug 2244. Log exception related to lost connection to ExeMgr.
352 // Log exception error from calls to sm::tpl_scan_fetch in fetchNextRow()
353 //
tpl_scan_fetch_LogException(cal_table_info & ti,cal_connection_info * ci,std::exception * ex)354 void tpl_scan_fetch_LogException( cal_table_info& ti, cal_connection_info* ci, std::exception* ex)
355 {
356     time_t t = time(0);
357     char datestr[50];
358     ctime_r(&t, datestr);
359     datestr[ strlen(datestr) - 1 ] = '\0'; // strip off trailing newline
360 
361     uint32_t sesID   = 0;
362     string connHndl("No connection handle to use");
363 
364     if (ti.conn_hndl)
365     {
366         connHndl = "ti connection used";
367         sesID = ti.conn_hndl->sessionID;
368     }
369 
370     else if (ci->cal_conn_hndl)
371     {
372         connHndl = "ci connection used";
373         sesID = ci->cal_conn_hndl->sessionID;
374     }
375 
376     int64_t rowsRet = -1;
377 
378     if (ti.tpl_scan_ctx)
379         rowsRet = ti.tpl_scan_ctx->rowsreturned;
380 
381     if (ex)
382         cerr << datestr << ": sm::tpl_scan_fetch error getting rows for sessionID: " <<
383              sesID << "; " << connHndl << "; rowsReturned: " << rowsRet <<
384              "; reason-" << ex->what() << endl;
385     else
386         cerr << datestr << ": sm::tpl_scan_fetch unknown error getting rows for sessionID: " <<
387              sesID << "; " << connHndl << "; rowsReturned: " << rowsRet << endl;
388 }
389 
// Upper case hexadecimal digits, indexed by the value of a nibble (0..15).
const char hexdig[] =
{
    '0', '1', '2', '3', '4', '5', '6', '7',
    '8', '9', 'A', 'B', 'C', 'D', 'E', 'F',
};

// Converts the 'l' bytes starting at 'p' into 2 * l upper case hexadecimal
// characters written to 'o' (high nibble first). No terminating NUL is
// written; 'o' must provide room for 2 * l characters. Always returns 0.
int vbin2hex(const uint8_t* p, const unsigned l, char* o)
{
    const uint8_t* src = p;
    char* dst = o;

    for (unsigned remaining = l; remaining != 0; --remaining)
    {
        const unsigned byte = *src++;
        *dst++ = hexdig[byte >> 4];
        *dst++ = hexdig[byte & 0xf];
    }

    return 0;
}
402 
403 // Table Map is used by both cond_push and table mode processing
404 // Entries made by cond_push don't have csep though.
405 // When
onlyOneTableinTM(cal_impl_if::cal_connection_info * ci)406 bool onlyOneTableinTM(cal_impl_if::cal_connection_info* ci)
407 {
408     size_t counter = 0;
409     for (auto &tableMapEntry: ci->tableMap)
410     {
411         if (tableMapEntry.second.csep)
412             counter++;
413         if (counter >= 1)
414             return false;
415     }
416 
417     return true;
418 }
419 
fetchNextRow(uchar * buf,cal_table_info & ti,cal_connection_info * ci,bool handler_flag=false)420 int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci, bool handler_flag = false)
421 {
422     int rc = HA_ERR_END_OF_FILE;
423     int num_attr = ti.msTablePtr->s->fields;
424     sm::status_t sm_stat;
425 
426     try
427     {
428         if (ti.conn_hndl)
429         {
430             sm_stat = sm::tpl_scan_fetch(ti.tpl_scan_ctx, ti.conn_hndl);
431         }
432         else if (ci->cal_conn_hndl)
433         {
434             sm_stat = sm::tpl_scan_fetch(ti.tpl_scan_ctx, ci->cal_conn_hndl, (int*)(&current_thd->killed));
435         }
436         else
437             throw runtime_error("internal error");
438     }
439     catch (std::exception& ex)
440     {
441 // @bug 2244. Always log this msg for now, as we try to track down when/why we are
442 //			losing socket connection with ExeMgr
443 //#ifdef INFINIDB_DEBUG
444         tpl_scan_fetch_LogException( ti, ci, &ex);
445 //#endif
446         sm_stat = sm::CALPONT_INTERNAL_ERROR;
447     }
448     catch (...)
449     {
450 // @bug 2244. Always log this msg for now, as we try to track down when/why we are
451 //			losing socket connection with ExeMgr
452 //#ifdef INFINIDB_DEBUG
453         tpl_scan_fetch_LogException( ti, ci, 0 );
454 //#endif
455         sm_stat = sm::CALPONT_INTERNAL_ERROR;
456     }
457 
458     if (sm_stat == sm::STATUS_OK)
459     {
460         Field** f;
461         f = ti.msTablePtr->field;
462 
463         //set all fields to null in null col bitmap
464         if (!handler_flag)
465             memset(buf, -1, ti.msTablePtr->s->null_bytes);
466         else
467         {
468             memset(ti.msTablePtr->null_flags, -1, ti.msTablePtr->s->null_bytes);
469         }
470 
471         std::vector<CalpontSystemCatalog::ColType>& colTypes = ti.tpl_scan_ctx->ctp;
472         int64_t intColVal = 0;
473         uint64_t uintColVal = 0;
474         char tmp[256];
475 
476         RowGroup* rowGroup = ti.tpl_scan_ctx->rowGroup;
477 
478         // table mode mysql expects all columns of the table. mapping between columnoid and position in rowgroup
479         // set coltype.position to be the position in rowgroup. only set once.
480         if (ti.tpl_scan_ctx->rowsreturned == 0 &&
481                 (ti.tpl_scan_ctx->traceFlags & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF))
482         {
483             for (uint32_t i = 0; i < rowGroup->getColumnCount(); i++)
484             {
485                 int oid = rowGroup->getOIDs()[i];
486                 int j = 0;
487 
488                 for (; j < num_attr; j++)
489                 {
490                     // mysql should haved eliminated duplicate projection columns
491                     if (oid == colTypes[j].columnOID || oid == colTypes[j].ddn.dictOID)
492                     {
493                         colTypes[j].colPosition = i;
494                         break;
495                     }
496                 }
497             }
498         }
499 
500         rowgroup::Row row;
501         rowGroup->initRow(&row);
502         rowGroup->getRow(ti.tpl_scan_ctx->rowsreturned, &row);
503         int s;
504 
505         for (int p = 0; p < num_attr; p++, f++)
506         {
507             //This col is going to be written
508             bitmap_set_bit(ti.msTablePtr->write_set, (*f)->field_index);
509 
510             // get coltype if not there yet
511             if (colTypes[0].colWidth == 0)
512             {
513                 for (short c = 0; c < num_attr; c++)
514                 {
515                     colTypes[c].colPosition = c;
516                     colTypes[c].colWidth = rowGroup->getColumnWidth(c);
517                     colTypes[c].colDataType = rowGroup->getColTypes()[c];
518                     colTypes[c].columnOID = rowGroup->getOIDs()[c];
519                     colTypes[c].scale = rowGroup->getScale()[c];
520                     colTypes[c].precision = rowGroup->getPrecision()[c];
521                 }
522             }
523 
524             CalpontSystemCatalog::ColType colType(colTypes[p]);
525 
526             // table mode handling
527             if (ti.tpl_scan_ctx->traceFlags & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF)
528             {
529                 if (colType.colPosition == -1) // not projected by tuplejoblist
530                     continue;
531                 else
532                     s = colType.colPosition;
533             }
534             else
535             {
536                 s = p;
537             }
538 
539             // precision == -16 is borrowed as skip null check indicator for bit ops.
540             if (row.isNullValue(s) && colType.precision != -16)
541             {
542                 // @2835. Handle empty string and null confusion. store empty string for string column
543                 if (colType.colDataType == CalpontSystemCatalog::CHAR ||
544                         colType.colDataType == CalpontSystemCatalog::VARCHAR ||
545                         colType.colDataType == CalpontSystemCatalog::VARBINARY)
546                 {
547                     (*f)->store(tmp, 0, (*f)->charset());
548                 }
549 
550                 continue;
551             }
552 
553             // fetch and store data
554             switch (colType.colDataType)
555             {
556                 case CalpontSystemCatalog::DATE:
557                 {
558                     if ((*f)->null_ptr)
559                         *(*f)->null_ptr &= ~(*f)->null_bit;
560 
561                     intColVal = row.getUintField<4>(s);
562                     DataConvert::dateToString(intColVal, tmp, 255);
563                     (*f)->store(tmp, strlen(tmp), (*f)->charset());
564                     break;
565                 }
566 
567                 case CalpontSystemCatalog::DATETIME:
568                 {
569                     if ((*f)->null_ptr)
570                         *(*f)->null_ptr &= ~(*f)->null_bit;
571 
572                     intColVal = row.getUintField<8>(s);
573                     DataConvert::datetimeToString(intColVal, tmp, 255, colType.precision);
574                     (*f)->store(tmp, strlen(tmp), (*f)->charset());
575                     break;
576                 }
577 
578                 case CalpontSystemCatalog::TIME:
579                 {
580                     if ((*f)->null_ptr)
581                         *(*f)->null_ptr &= ~(*f)->null_bit;
582 
583                     intColVal = row.getUintField<8>(s);
584                     DataConvert::timeToString(intColVal, tmp, 255, colType.precision);
585                     (*f)->store(tmp, strlen(tmp), (*f)->charset());
586                     break;
587                 }
588 
589                 case CalpontSystemCatalog::TIMESTAMP:
590                 {
591                     if ((*f)->null_ptr)
592                         *(*f)->null_ptr &= ~(*f)->null_bit;
593 
594                     intColVal = row.getUintField<8>(s);
595                     DataConvert::timestampToString(intColVal, tmp, 255, current_thd->variables.time_zone->get_name()->ptr(), colType.precision);
596                     (*f)->store(tmp, strlen(tmp), (*f)->charset());
597                     break;
598                 }
599 
600                 case CalpontSystemCatalog::CHAR:
601                 case CalpontSystemCatalog::VARCHAR:
602                 {
603                     switch (colType.colWidth)
604                     {
605                         case 1:
606                             intColVal = row.getUintField<1>(s);
607                             (*f)->store((char*)(&intColVal), strlen((char*)(&intColVal)), (*f)->charset());
608                             break;
609 
610                         case 2:
611                             intColVal = row.getUintField<2>(s);
612                             (*f)->store((char*)(&intColVal), strlen((char*)(&intColVal)), (*f)->charset());
613                             break;
614 
615                         case 4:
616                             intColVal = row.getUintField<4>(s);
617                             (*f)->store((char*)(&intColVal), strlen((char*)(&intColVal)), (*f)->charset());
618                             break;
619 
620                         case 8:
621                             //make sure we don't send strlen off into the weeds...
622                             intColVal = row.getUintField<8>(s);
623                             memcpy(tmp, &intColVal, 8);
624                             tmp[8] = 0;
625                             (*f)->store(tmp, strlen(tmp), (*f)->charset());
626                             break;
627 
628                         default:
629                             (*f)->store((const char*)row.getStringPointer(s), row.getStringLength(s), (*f)->charset());
630                     }
631 
632                     if ((*f)->null_ptr)
633                         *(*f)->null_ptr &= ~(*f)->null_bit;
634 
635                     break;
636                 }
637 
638                 case CalpontSystemCatalog::VARBINARY:
639                 {
640                     if (get_varbin_always_hex(current_thd))
641                     {
642                         uint32_t l;
643                         const uint8_t* p = row.getVarBinaryField(l, s);
644                         uint32_t ll = l * 2;
645                         boost::scoped_array<char> sca(new char[ll]);
646                         vbin2hex(p, l, sca.get());
647                         (*f)->store(sca.get(), ll, (*f)->charset());
648                     }
649                     else
650                         (*f)->store((const char*)row.getVarBinaryField(s), row.getVarBinaryLength(s), (*f)->charset());
651 
652                     if ((*f)->null_ptr)
653                         *(*f)->null_ptr &= ~(*f)->null_bit;
654 
655                     break;
656                 }
657 
658                 case CalpontSystemCatalog::BIGINT:
659                 {
660                     intColVal = row.getIntField<8>(s);
661                     storeNumericField(f, intColVal, colType);
662                     break;
663                 }
664 
665                 case CalpontSystemCatalog::UBIGINT:
666                 {
667                     uintColVal = row.getUintField<8>(s);
668                     storeNumericField(f, uintColVal, colType);
669                     break;
670                 }
671 
672                 case CalpontSystemCatalog::INT:
673                 case CalpontSystemCatalog::MEDINT:
674                 {
675                     intColVal = row.getIntField<4>(s);
676                     storeNumericField(f, intColVal, colType);
677                     break;
678                 }
679 
680                 case CalpontSystemCatalog::UINT:
681                 case CalpontSystemCatalog::UMEDINT:
682                 {
683                     uintColVal = row.getUintField<4>(s);
684                     storeNumericField(f, uintColVal, colType);
685                     break;
686                 }
687 
688                 case CalpontSystemCatalog::SMALLINT:
689                 {
690                     intColVal = row.getIntField<2>(s);
691                     storeNumericField(f, intColVal, colType);
692                     break;
693                 }
694 
695                 case CalpontSystemCatalog::USMALLINT:
696                 {
697                     uintColVal = row.getUintField<2>(s);
698                     storeNumericField(f, uintColVal, colType);
699                     break;
700                 }
701 
702                 case CalpontSystemCatalog::TINYINT:
703                 {
704                     intColVal = row.getIntField<1>(s);
705                     storeNumericField(f, intColVal, colType);
706                     break;
707                 }
708 
709                 case CalpontSystemCatalog::UTINYINT:
710                 {
711                     uintColVal = row.getUintField<1>(s);
712                     storeNumericField(f, uintColVal, colType);
713                     break;
714                 }
715 
716                 //In this case, we're trying to load a double output column with float data. This is the
717                 // case when you do sum(floatcol), e.g.
718                 case CalpontSystemCatalog::FLOAT:
719                 case CalpontSystemCatalog::UFLOAT:
720                 {
721                     float dl = row.getFloatField(s);
722 
723                     if (dl == std::numeric_limits<float>::infinity())
724                         continue;
725 
726                     // bug 3485, reserve enough space for the longest float value
727                     // -3.402823466E+38 to -1.175494351E-38, 0, and
728                     // 1.175494351E-38 to 3.402823466E+38.
729                     (*f)->field_length = 40;
730                     (*f)->store(dl);
731 
732                     if ((*f)->null_ptr)
733                         *(*f)->null_ptr &= ~(*f)->null_bit;
734 
735                     break;
736                 }
737 
738                 case CalpontSystemCatalog::DOUBLE:
739                 case CalpontSystemCatalog::UDOUBLE:
740                 {
741                     double dl = row.getDoubleField(s);
742 
743                     if (dl == std::numeric_limits<double>::infinity())
744                         continue;
745 
746                     if ((*f)->type() == MYSQL_TYPE_NEWDECIMAL)
747                     {
748                         char buf[310];
749                         // reserve enough space for the longest double value
750                         // -1.7976931348623157E+308 to -2.2250738585072014E-308, 0, and
751                         // 2.2250738585072014E-308 to 1.7976931348623157E+308.
752                         snprintf(buf, 310, "%.18g", dl);
753                         (*f)->store(buf, strlen(buf), (*f)->charset());
754                     }
755                     else
756                     {
757                         // The server converts dl=-0 to dl=0 in (*f)->store().
758                         // This happens in the call to truncate_double().
759                         // This is an unexpected behaviour, so we directly store the
760                         // double value using the lower level float8store() function.
761                         // TODO Remove this when (*f)->store() handles this properly.
762                         (*f)->field_length = 310;
763                         if (dl == 0)
764                             float8store((*f)->ptr,dl);
765                         else
766                             (*f)->store(dl);
767                     }
768                     if ((*f)->null_ptr)
769                         *(*f)->null_ptr &= ~(*f)->null_bit;
770 
771                     break;
772                 }
773 
774                 case CalpontSystemCatalog::LONGDOUBLE:
775                 {
776                     long double dl = row.getLongDoubleField(s);
777                     if (dl == std::numeric_limits<long double>::infinity())
778                     {
779                         continue;
780                     }
781 
782                     if ((*f)->type() == MYSQL_TYPE_NEWDECIMAL)
783                     {
784                         char buf[310];
785                         snprintf(buf, 310, "%.20Lg", dl);
786                         (*f)->store(buf, strlen(buf), (*f)->charset());
787                     }
788                     else
789                     {
790                         // reserve enough space for the longest double value
791                         // -1.7976931348623157E+308 to -2.2250738585072014E-308, 0, and
792                         // 2.2250738585072014E-308 to 1.7976931348623157E+308.
793                         (*f)->field_length = 310;
794                         (*f)->store(static_cast<double>(dl));
795                     }
796                     if ((*f)->null_ptr)
797                         *(*f)->null_ptr &= ~(*f)->null_bit;
798                     break;
799                 }
800 
801                 case CalpontSystemCatalog::DECIMAL:
802                 case CalpontSystemCatalog::UDECIMAL:
803                 {
804                     intColVal = row.getIntField(s);
805                     storeNumericField(f, intColVal, colType);
806                     break;
807                 }
808 
809                 case CalpontSystemCatalog::BLOB:
810                 case CalpontSystemCatalog::TEXT:
811                 {
812                     Field_blob* f2 = (Field_blob*)*f;
813                     f2->set_ptr(row.getVarBinaryLength(s), (unsigned char*)row.getVarBinaryField(s));
814 
815                     if ((*f)->null_ptr)
816                         *(*f)->null_ptr &= ~(*f)->null_bit;
817 
818                     break;
819                 }
820 
821                 default:	// treat as int64
822                 {
823                     intColVal = row.getUintField<8>(s);
824                     storeNumericField(f, intColVal, colType);
825                     break;
826                 }
827             }
828         }
829 
830         ti.tpl_scan_ctx->rowsreturned++;
831         ti.c++;
832 #ifdef INFINIDB_DEBUG
833 
834         if ((ti.c % 1000000) == 0)
835             cerr << "fetchNextRow so far table " << ti.msTablePtr->s->table_name.str << " rows = " << ti.c << endl;
836 
837 #endif
838         ti.moreRows = true;
839         rc = 0;
840     }
841     else if (sm_stat == sm::SQL_NOT_FOUND)
842     {
843         IDEBUG( cerr << "fetchNextRow done for table " << ti.msTablePtr->s->table_name.str << " rows = " << ti.c << endl );
844         ti.c = 0;
845         ti.moreRows = false;
846         rc = HA_ERR_END_OF_FILE;
847     }
848     else if (sm_stat == sm::CALPONT_INTERNAL_ERROR)
849     {
850         ti.moreRows = false;
851         rc = ER_INTERNAL_ERROR;
852         ci->rc = rc;
853     }
854     else if ((uint32_t)sm_stat == logging::ERR_LOST_CONN_EXEMGR)
855     {
856         ti.moreRows = false;
857         rc = logging::ERR_LOST_CONN_EXEMGR;
858         sm::sm_init(tid2sid(current_thd->thread_id), &ci->cal_conn_hndl,
859                     get_local_query(current_thd));
860         idbassert(ci->cal_conn_hndl != 0);
861         ci->rc = rc;
862     }
863     else if (sm_stat == sm::SQL_KILLED)
864     {
865         // query was aborted by the user. treat it the same as limit query. close
866         // connection after rnd_close.
867         ti.c = 0;
868         ti.moreRows = false;
869         rc = HA_ERR_END_OF_FILE;
870         ci->rc = rc;
871     }
872     else
873     {
874         ti.moreRows = false;
875         rc = sm_stat;
876         ci->rc = rc;
877     }
878 
879     return rc;
880 }
881 
makeUpdateScalarJoin(const ParseTree * n,void * obj)882 void makeUpdateScalarJoin(const ParseTree* n, void* obj)
883 {
884     TreeNode* tn = n->data();
885     SimpleFilter* sf = dynamic_cast<SimpleFilter*>(tn);
886 
887     if (!sf)
888         return;
889 
890     SimpleColumn* scLeft = dynamic_cast<SimpleColumn*>(sf->lhs());
891     SimpleColumn* scRight = dynamic_cast<SimpleColumn*>(sf->rhs());
892     CalpontSystemCatalog::TableAliasName* updatedTables = reinterpret_cast<CalpontSystemCatalog::TableAliasName*>(obj);
893 
894     if ( scLeft && scRight )
895     {
896         if ( (strcasecmp(scLeft->tableName().c_str(), updatedTables->table.c_str()) == 0 ) && (strcasecmp(scLeft->schemaName().c_str(), updatedTables->schema.c_str()) == 0)
897                 && (strcasecmp(scLeft->tableAlias().c_str(), updatedTables->alias.c_str()) == 0))
898         {
899             uint64_t lJoinInfo = sf->lhs()->joinInfo();
900             lJoinInfo |= JOIN_SCALAR;
901             //lJoinInfo |= JOIN_OUTER_SELECT;
902             //lJoinInfo |= JOIN_CORRELATED;
903             sf->lhs()->joinInfo(lJoinInfo);
904         }
905         else if ( (strcasecmp(scRight->tableName().c_str(), updatedTables->table.c_str()) == 0) && (strcasecmp(scRight->schemaName().c_str(), updatedTables->schema.c_str()) == 0)
906                   && (strcasecmp(scRight->tableAlias().c_str(), updatedTables->alias.c_str()) == 0))
907         {
908             uint64_t rJoinInfo = sf->rhs()->joinInfo();
909             rJoinInfo |= JOIN_SCALAR;
910             //rJoinInfo |= JOIN_OUTER_SELECT;
911             //rJoinInfo |= JOIN_CORRELATED;
912             sf->rhs()->joinInfo(rJoinInfo);
913         }
914         else
915             return;
916     }
917     else
918         return;
919 }
920 
makeUpdateSemiJoin(const ParseTree * n,void * obj)921 void makeUpdateSemiJoin(const ParseTree* n, void* obj)
922 {
923     TreeNode* tn = n->data();
924     SimpleFilter* sf = dynamic_cast<SimpleFilter*>(tn);
925 
926     if (!sf)
927         return;
928 
929     SimpleColumn* scLeft = dynamic_cast<SimpleColumn*>(sf->lhs());
930     SimpleColumn* scRight = dynamic_cast<SimpleColumn*>(sf->rhs());
931     CalpontSystemCatalog::TableAliasName* updatedTables = reinterpret_cast<CalpontSystemCatalog::TableAliasName*>(obj);
932 
933     //@Bug 3279. Added a check for column filters.
934     if ( scLeft && scRight && (strcasecmp(scRight->tableAlias().c_str(), scLeft->tableAlias().c_str()) != 0))
935     {
936         if ( (strcasecmp(scLeft->tableName().c_str(), updatedTables->table.c_str()) == 0 ) && (strcasecmp(scLeft->schemaName().c_str(), updatedTables->schema.c_str()) == 0)
937                 && (strcasecmp(scLeft->tableAlias().c_str(), updatedTables->alias.c_str()) == 0))
938         {
939             uint64_t lJoinInfo = sf->lhs()->joinInfo();
940             lJoinInfo |= JOIN_SEMI;
941             //lJoinInfo |= JOIN_OUTER_SELECT;
942             //lJoinInfo |= JOIN_CORRELATED;
943             sf->lhs()->joinInfo(lJoinInfo);
944         }
945         else if ( (strcasecmp(scRight->tableName().c_str(), updatedTables->table.c_str()) == 0) && (strcasecmp(scRight->schemaName().c_str(), updatedTables->schema.c_str()) == 0)
946                   && (strcasecmp(scRight->tableAlias().c_str(), updatedTables->alias.c_str()) == 0))
947         {
948             uint64_t rJoinInfo = sf->rhs()->joinInfo();
949             rJoinInfo |= JOIN_SEMI;
950             //rJoinInfo |= JOIN_OUTER_SELECT;
951             //rJoinInfo |= JOIN_CORRELATED;
952             sf->rhs()->joinInfo(rJoinInfo);
953         }
954         else
955             return;
956     }
957     else
958         return;
959 }
960 
getOnUpdateTimestampColumns(string & schema,string & tableName,int sessionID)961 vector<string> getOnUpdateTimestampColumns(string& schema, string& tableName, int sessionID)
962 {
963     vector<string> returnVal;
964     typedef CalpontSelectExecutionPlan::ColumnMap::value_type CMVT_;
965     boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
966     csc->identity(execplan::CalpontSystemCatalog::FE);
967     CalpontSystemCatalog::TableName aTableName;
968 
969     // select columnname from calpontsys.syscolumn
970     // where schema = schema and tablename = tableName
971     // and datatype = 'timestamp'
972     // and defaultvalue = 'current_timestamp() ON UPDATE current_timestamp()'
973     CalpontSelectExecutionPlan csep;
974     CalpontSelectExecutionPlan::ReturnedColumnList returnedColumnList;
975     CalpontSelectExecutionPlan::FilterTokenList filterTokenList;
976     CalpontSelectExecutionPlan::ColumnMap colMap;
977 
978     SessionManager sm;
979     BRM::TxnID txnID;
980     txnID = sm.getTxnID(sessionID);
981 
982     if (!txnID.valid)
983     {
984         txnID.id = 0;
985         txnID.valid = true;
986     }
987 
988     QueryContext verID;
989     verID = sm.verID();
990     csep.txnID(txnID.id);
991     csep.verID(verID);
992     csep.sessionID(sessionID);
993 
994     string sysTable = "calpontsys.syscolumn.";
995     string firstCol = sysTable + "columnname";
996     SimpleColumn* c1 = new SimpleColumn(firstCol, sessionID);
997     string secondCol = sysTable + "schema";
998     SimpleColumn* c2 = new SimpleColumn(secondCol, sessionID);
999     string thirdCol = sysTable + "tablename";
1000     SimpleColumn* c3 = new SimpleColumn(thirdCol, sessionID);
1001     string fourthCol = sysTable + "datatype";
1002     SimpleColumn* c4 = new SimpleColumn(fourthCol, sessionID);
1003     string fifthCol = sysTable + "defaultvalue";
1004     SimpleColumn* c5 = new SimpleColumn(fifthCol, sessionID);
1005     SRCP srcp;
1006     srcp.reset(c1);
1007     colMap.insert(CMVT_(firstCol, srcp));
1008     srcp.reset(c2);
1009     colMap.insert(CMVT_(secondCol, srcp));
1010     srcp.reset(c3);
1011     colMap.insert(CMVT_(thirdCol, srcp));
1012     srcp.reset(c4);
1013     colMap.insert(CMVT_(fourthCol, srcp));
1014     srcp.reset(c5);
1015     colMap.insert(CMVT_(fifthCol, srcp));
1016     csep.columnMapNonStatic(colMap);
1017     srcp.reset(c1->clone());
1018     returnedColumnList.push_back(srcp);
1019     csep.returnedCols(returnedColumnList);
1020 
1021     // Filters
1022     const SOP opeq(new Operator("="));
1023     SimpleFilter* f1 = new SimpleFilter (opeq,
1024                                          c2->clone(),
1025                                          new ConstantColumn(schema, ConstantColumn::LITERAL));
1026     filterTokenList.push_back(f1);
1027     filterTokenList.push_back(new Operator("and"));
1028 
1029     SimpleFilter* f2 = new SimpleFilter (opeq,
1030                                          c3->clone(),
1031                                          new ConstantColumn(tableName, ConstantColumn::LITERAL));
1032     filterTokenList.push_back(f2);
1033     filterTokenList.push_back(new Operator("and"));
1034 
1035     SimpleFilter* f3 = new SimpleFilter (opeq,
1036                                          c4->clone(),
1037                                          new ConstantColumn((uint64_t) execplan::CalpontSystemCatalog::TIMESTAMP, ConstantColumn::NUM));
1038     filterTokenList.push_back(f3);
1039     filterTokenList.push_back(new Operator("and"));
1040 
1041     string defaultValue = "current_timestamp() ON UPDATE current_timestamp()";
1042     SimpleFilter* f4 = new SimpleFilter (opeq,
1043                                          c5->clone(),
1044                                          new ConstantColumn(defaultValue, ConstantColumn::LITERAL));
1045     filterTokenList.push_back(f4);
1046     csep.filterTokenList(filterTokenList);
1047 
1048     CalpontSelectExecutionPlan::TableList tablelist;
1049     tablelist.push_back(make_aliastable("calpontsys", "syscolumn", ""));
1050     csep.tableList(tablelist);
1051 
1052     boost::shared_ptr<messageqcpp::MessageQueueClient> exemgrClient (new messageqcpp::MessageQueueClient("ExeMgr1"));
1053     ByteStream msg, emsgBs;
1054     rowgroup::RGData rgData;
1055     ByteStream::quadbyte qb = 4;
1056     msg << qb;
1057     rowgroup::RowGroup* rowGroup = 0;
1058     uint32_t rowCount;
1059 
1060     exemgrClient->write(msg);
1061     ByteStream msgPlan;
1062     csep.serialize(msgPlan);
1063     exemgrClient->write(msgPlan);
1064     msg.restart();
1065     msg = exemgrClient->read(); //error handling
1066     emsgBs = exemgrClient->read();
1067     ByteStream::quadbyte qb1;
1068 
1069     if (emsgBs.length() == 0)
1070     {
1071         //exemgrClient->shutdown();
1072         //delete exemgrClient;
1073         //exemgrClient = 0;
1074         throw runtime_error("Lost conection to ExeMgr.");
1075     }
1076 
1077     string emsgStr;
1078     emsgBs >> emsgStr;
1079 
1080     if (msg.length() == 4)
1081     {
1082         msg >> qb1;
1083 
1084         if (qb1 != 0)
1085         {
1086             //exemgrClient->shutdown();
1087             //delete exemgrClient;
1088             //exemgrClient = 0;
1089             throw runtime_error(emsgStr);
1090         }
1091     }
1092 
1093     while (true)
1094     {
1095         msg.restart();
1096         msg = exemgrClient->read();
1097 
1098         if ( msg.length() == 0 )
1099         {
1100             //exemgrClient->shutdown();
1101             //delete exemgrClient;
1102             //exemgrClient = 0;
1103             throw runtime_error("Lost conection to ExeMgr.");
1104         }
1105         else
1106         {
1107             if (!rowGroup)
1108             {
1109                 //This is mete data
1110                 rowGroup = new rowgroup::RowGroup();
1111                 rowGroup->deserialize(msg);
1112                 qb = 100;
1113                 msg.restart();
1114                 msg << qb;
1115                 exemgrClient->write(msg);
1116                 continue;
1117             }
1118 
1119             rgData.deserialize(msg);
1120             rowGroup->setData(&rgData);
1121 
1122             if (rowGroup->getStatus() != 0)
1123             {
1124                 //msg.advance(rowGroup->getDataSize());
1125                 msg >> emsgStr;
1126                 //exemgrClient->shutdown();
1127                 //delete exemgrClient;
1128                 //exemgrClient = 0;
1129                 throw runtime_error(emsgStr);
1130             }
1131 
1132             rowCount = rowGroup->getRowCount();
1133             if (rowCount > 0)
1134             {
1135                 rowgroup::Row row;
1136                 rowGroup->initRow(&row);
1137                 for (uint32_t i = 0; i < rowCount; i++)
1138                 {
1139                     rowGroup->getRow(i, &row);
1140                     // we are only fetching a single column
1141                     returnVal.push_back(row.getStringField(0));
1142                 }
1143             }
1144             else
1145             {
1146                 break;
1147             }
1148 
1149             //exemgrClient->shutdown();
1150             //delete exemgrClient;
1151             //exemgrClient = 0;
1152         }
1153     }
1154 
1155     return returnVal;
1156 }
1157 
doUpdateDelete(THD * thd,gp_walk_info & gwi,const std::vector<COND * > & condStack)1158 uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector<COND*>& condStack)
1159 {
1160     if (get_fe_conn_info_ptr() == nullptr)
1161         set_fe_conn_info_ptr((void*)new cal_connection_info());
1162 
1163     cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
1164 
1165     if (ci->isSlaveNode && !thd->slave_thread)
1166     {
1167         string emsg = logging::IDBErrorInfo::instance()->errorMsg(ERR_DML_DDL_SLAVE);
1168         setError(thd, ER_CHECK_NOT_IMPLEMENTED, emsg);
1169         return ER_CHECK_NOT_IMPLEMENTED;
1170     }
1171 
1172     //@Bug 4387. Check BRM status before start statement.
1173     boost::scoped_ptr<DBRM> dbrmp(new DBRM());
1174     int rc = dbrmp->isReadWrite();
1175 
1176     if (rc != 0 )
1177     {
1178         setError(current_thd, ER_READ_ONLY_MODE, "Cannot execute the statement. DBRM is read only!");
1179         return ER_READ_ONLY_MODE;
1180     }
1181 
1182     // stats start
1183     ci->stats.reset();
1184     ci->stats.setStartTime();
1185     if (thd->main_security_ctx.user)
1186     {
1187         ci->stats.fUser = thd->main_security_ctx.user;
1188     }
1189     else
1190     {
1191         ci->stats.fUser = "";
1192     }
1193 
1194     if (thd->main_security_ctx.host)
1195         ci->stats.fHost = thd->main_security_ctx.host;
1196     else if (thd->main_security_ctx.host_or_ip)
1197         ci->stats.fHost = thd->main_security_ctx.host_or_ip;
1198     else
1199         ci->stats.fHost = "unknown";
1200 
1201     try
1202     {
1203         ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser);
1204     }
1205     catch (std::exception& e)
1206     {
1207         string msg = string("Columnstore User Priority - ") + e.what();
1208         push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
1209     }
1210 
1211     ci->stats.fSessionID = tid2sid(thd->thread_id);
1212 
1213     LEX* lex = thd->lex;
1214     idbassert(lex != 0);
1215 
1216     // Error out DELETE on VIEW. It's currently not supported.
1217     // @note DELETE on VIEW works natually (for simple cases at least), but we choose to turn it off
1218     // for now - ZZ.
1219     TABLE_LIST* tables = thd->lex->query_tables;
1220 
1221     for (; tables; tables = tables->next_local)
1222     {
1223         if (tables->view)
1224         {
1225             Message::Args args;
1226 
1227             if (((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI))
1228                 args.add("Update");
1229 #if 0
1230             else if (thd->rgi_slave && thd->rgi_slave->m_table_map.count() != 0)
1231                 args.add("Row based replication event");
1232 #endif
1233             else
1234                 args.add("Delete");
1235 
1236             string emsg = logging::IDBErrorInfo::instance()->errorMsg(ERR_DML_VIEW, args);
1237             setError(current_thd, ER_CHECK_NOT_IMPLEMENTED, emsg);
1238             return ER_CHECK_NOT_IMPLEMENTED;
1239         }
1240 
1241         /*
1242         #if (defined(_MSC_VER) && defined(_DEBUG)) || defined(SAFE_MUTEX)
1243         		if ((strcmp((*tables->table->s->db_plugin)->name.str, "InfiniDB") != 0) && (strcmp((*tables->table->s->db_plugin)->name.str, "MEMORY") != 0) &&
1244         					   (tables->table->s->table_category != TABLE_CATEGORY_TEMPORARY) )
1245         #else
1246         		if ((strcmp(tables->table->s->db_plugin->name.str, "InfiniDB") != 0) && (strcmp(tables->table->s->db_plugin->name.str, "MEMORY") != 0) &&
1247         					   (tables->table->s->table_category != TABLE_CATEGORY_TEMPORARY) )
1248         #endif
1249         		{
1250         			Message::Args args;
1251         			args.add("Non Calpont table(s)");
1252         			string emsg(IDBErrorInfo::instance()->errorMsg(ERR_DML_NOT_SUPPORT_FEATURE, args));
1253         			setError(current_thd, ER_CHECK_NOT_IMPLEMENTED, emsg);
1254         			return ER_CHECK_NOT_IMPLEMENTED;
1255         		}
1256         */
1257     }
1258 
1259     // @bug 1127. Re-construct update stmt using lex instead of using the original query.
1260     char* query_char = idb_mysql_query_str(thd);
1261     std::string dmlStmt;
1262     if (!query_char)
1263     {
1264         dmlStmt = "<Replication event>";
1265     }
1266     else
1267     {
1268         dmlStmt = query_char;
1269     }
1270     string schemaName;
1271     string tableName("");
1272     string aliasName("");
1273     UpdateSqlStatement updateSqlStmt;
1274     ColumnAssignmentList* colAssignmentListPtr = new ColumnAssignmentList();
1275     bool isFromCol = false;
1276     bool isFromSameTable = true;
1277     execplan::SCSEP updateCP(new execplan::CalpontSelectExecutionPlan());
1278 
1279     updateCP->isDML(true);
1280 
1281     //@Bug 2753. the memory already freed by destructor of UpdateSqlStatement
1282     if (((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI))
1283     {
1284         ColumnAssignment* columnAssignmentPtr;
1285         Item_field* item;
1286         List_iterator_fast<Item> field_it(thd->lex->first_select_lex()->item_list);
1287         List_iterator_fast<Item> value_it(thd->lex->value_list);
1288         updateCP->queryType(CalpontSelectExecutionPlan::UPDATE);
1289         ci->stats.fQueryType = updateCP->queryType();
1290         uint32_t cnt = 0;
1291         tr1::unordered_set<string> timeStampColumnNames;
1292 
1293         while ((item = (Item_field*) field_it++))
1294         {
1295             cnt++;
1296 
1297             string tmpTableName = bestTableName(item);
1298 
1299             //@Bug 5312 populate aliasname with tablename if it is empty
1300             if (!item->table_name.str)
1301                 aliasName = tmpTableName;
1302             else
1303                 aliasName = item->table_name.str;
1304 
1305             if (lower_case_table_names)
1306             {
1307                 boost::algorithm::to_lower(aliasName);
1308                 boost::algorithm::to_lower(tableName);
1309                 boost::algorithm::to_lower(tmpTableName);
1310             }
1311 
1312             if (strcasecmp(tableName.c_str(), "") == 0)
1313             {
1314                 tableName = tmpTableName;
1315             }
1316             else if (strcmp(tableName.c_str(), tmpTableName.c_str()) != 0)
1317             {
1318                 //@ Bug3326 error out for multi table update
1319                 string emsg(IDBErrorInfo::instance()->errorMsg(ERR_UPDATE_NOT_SUPPORT_FEATURE));
1320                 thd->raise_error_printf(ER_CHECK_NOT_IMPLEMENTED, emsg.c_str());
1321                 ci->rc = -1;
1322                 thd->set_row_count_func(0);
1323                 return -1;
1324             }
1325 
1326             if (!item->db_name.str)
1327             {
1328                 //@Bug 5312. if subselect, wait until the schema info is available.
1329                 if (thd->derived_tables_processing)
1330                     return 0;
1331                 else
1332                 {
1333                     thd->raise_error_printf(ER_CHECK_NOT_IMPLEMENTED, "The statement cannot be processed without schema.");
1334                     ci->rc = -1;
1335                     thd->set_row_count_func(0);
1336                     return -1;
1337                 }
1338             }
1339             else
1340             {
1341                 schemaName = string(item->db_name.str);
1342                 if (lower_case_table_names)
1343                 {
1344                     boost::algorithm::to_lower(schemaName);
1345                 }
1346             }
1347             columnAssignmentPtr = new ColumnAssignment(item->name.str, "=", "");
1348             if (item->field_type() == MYSQL_TYPE_TIMESTAMP ||
1349                 item->field_type() == MYSQL_TYPE_TIMESTAMP2)
1350             {
1351                 timeStampColumnNames.insert(string(item->name.str));
1352             }
1353 
1354             Item* value = value_it++;
1355 
1356             if (value->type() ==  Item::CONST_ITEM)
1357             {
1358                 if (value->cmp_type() == STRING_RESULT ||
1359                     value->cmp_type() == DECIMAL_RESULT ||
1360                     value->cmp_type() == REAL_RESULT)
1361                 {
1362                     //@Bug 2587 use val_str to replace value->name to get rid of 255 limit
1363                     String val, *str;
1364                     str = value->val_str(&val);
1365                     columnAssignmentPtr->fScalarExpression.assign(str->ptr(), str->length());
1366                     columnAssignmentPtr->fFromCol = false;
1367                 }
1368                 else if (value->cmp_type() ==  INT_RESULT)
1369                 {
1370                     std::ostringstream oss;
1371 
1372                     if (value->unsigned_flag)
1373                     {
1374                         oss << value->val_uint();
1375                     }
1376                     else
1377                     {
1378                         oss << value->val_int();
1379                     }
1380 
1381                     columnAssignmentPtr->fScalarExpression = oss.str();
1382                     columnAssignmentPtr->fFromCol = false;
1383                 }
1384             }
1385            else if ( value->type() ==  Item::FUNC_ITEM )
1386             {
1387                 //Bug 2092 handle negative values
1388                 Item_func* ifp = (Item_func*)value;
1389 
1390                 if (ifp->result_type() == DECIMAL_RESULT)
1391                     columnAssignmentPtr->fFuncScale = ifp->decimals; //decimal scale
1392 
1393                 vector <Item_field*> tmpVec;
1394                 bool hasNonSupportItem = false;
1395                 uint16_t parseInfo = 0;
1396                 parse_item(ifp, tmpVec, hasNonSupportItem, parseInfo);
1397 
1398                 // const f&e evaluate here. @bug3513. Rule out special functions that takes
1399                 // no argument but needs to be sent to the back end to process. Like rand(),
1400                 // sysdate() etc.
1401                 if (!hasNonSupportItem && !cal_impl_if::nonConstFunc(ifp) && tmpVec.size() == 0)
1402                 {
1403                     gp_walk_info gwi;
1404                     gwi.thd = thd;
1405                     SRCP srcp(buildReturnedColumn(value, gwi, gwi.fatalParseError));
1406                     ConstantColumn* constCol = dynamic_cast<ConstantColumn*>(srcp.get());
1407 
1408                     if (constCol )
1409                     {
1410                         columnAssignmentPtr->fScalarExpression  = constCol->constval();
1411                         isFromCol = false;
1412                         columnAssignmentPtr->fFromCol = false;
1413                     }
1414                     else
1415                     {
1416                         isFromCol = true;
1417                         columnAssignmentPtr->fFromCol = true;
1418 
1419                     }
1420                 }
1421                 else
1422                 {
1423                     isFromCol = true;
1424                     columnAssignmentPtr->fFromCol = true;
1425                 }
1426 
1427                 if ( isFromCol )
1428                 {
1429                     string sectableName("");
1430                     string secschemaName ("");
1431 
1432                     for ( unsigned i = 0; i < tmpVec.size(); i++ )
1433                     {
1434                         sectableName = bestTableName(tmpVec[i]);
1435 
1436                         if ( tmpVec[i]->db_name.str )
1437                         {
1438                             secschemaName = string(tmpVec[i]->db_name.str);
1439                         }
1440 
1441                         if ( (strcasecmp(tableName.c_str(), sectableName.c_str()) != 0) ||
1442                                 (strcasecmp(schemaName.c_str(), secschemaName.c_str()) != 0))
1443                         {
1444                             isFromSameTable = false;
1445                             break;
1446                         }
1447                     }
1448                 }
1449             }
1450             else if ( value->type() ==  Item::FIELD_ITEM)
1451             {
1452                 isFromCol = true;
1453                 columnAssignmentPtr->fFromCol = true;
1454                 Item_field* setIt = reinterpret_cast<Item_field*> (value);
1455                 string sectableName = string(setIt->table_name.str);
1456 
1457                 if ( setIt->db_name.str ) //derived table
1458                 {
1459                     string secschemaName = string(setIt->db_name.str);
1460 
1461                     if ( (strcasecmp(tableName.c_str(), sectableName.c_str()) != 0) || (strcasecmp(schemaName.c_str(), secschemaName.c_str()) != 0))
1462                     {
1463                         isFromSameTable = false;
1464                     }
1465                 }
1466                 else
1467                 {
1468                     isFromSameTable = false;
1469                 }
1470             }
1471             else if ( value->type() ==  Item::NULL_ITEM )
1472             {
1473                 columnAssignmentPtr->fScalarExpression = "";
1474                 columnAssignmentPtr->fFromCol = false;
1475                 columnAssignmentPtr->fIsNull = true;
1476             }
1477             else if ( value->type() == Item::SUBSELECT_ITEM )
1478             {
1479                 isFromCol = true;
1480                 columnAssignmentPtr->fFromCol = true;
1481                isFromSameTable = false;
1482             }
1483             //@Bug 4449 handle default value
1484             else if (value->type() == Item::DEFAULT_VALUE_ITEM)
1485             {
1486                 Item_field* tmp = (Item_field*)value;
1487 
1488                 if (!tmp->field_name.length) //null
1489                 {
1490                     columnAssignmentPtr->fScalarExpression = "NULL";
1491                     columnAssignmentPtr->fFromCol = false;
1492                 }
1493                 else
1494                 {
1495                     String val, *str;
1496                     str = value->val_str(&val);
1497                     columnAssignmentPtr->fScalarExpression.assign(str->ptr(), str->length());
1498                     columnAssignmentPtr->fFromCol = false;
1499                 }
1500             }
1501             else if (value->type() == Item::WINDOW_FUNC_ITEM)
1502             {
1503                 setError(thd, ER_INTERNAL_ERROR,
1504                          logging::IDBErrorInfo::instance()->errorMsg(ERR_WF_UPDATE));
1505                 return ER_CHECK_NOT_IMPLEMENTED;
1506             }
1507             else
1508             {
1509                 String val, *str;
1510                 str = value->val_str(&val);
1511 
1512                 if (str)
1513                 {
1514                     columnAssignmentPtr->fScalarExpression.assign(str->ptr(), str->length());
1515                     columnAssignmentPtr->fFromCol = false;
1516                 }
1517                 else
1518                 {
1519                     columnAssignmentPtr->fScalarExpression = "NULL";
1520                     columnAssignmentPtr->fFromCol = false;
1521                 }
1522             }
1523 
1524             colAssignmentListPtr->push_back ( columnAssignmentPtr );
1525         }
1526 
1527         // Support for on update current_timestamp() for timestamp fields
1528         // Query calpontsys.syscolumn to get all timestamp columns with
1529         // ON UPDATE current_timestamp() property
1530         vector<string> onUpdateTimeStampColumns = getOnUpdateTimestampColumns(schemaName, tableName, tid2sid(thd->thread_id));
1531         for (size_t i = 0; i < onUpdateTimeStampColumns.size(); i++)
1532         {
1533             if (timeStampColumnNames.find(onUpdateTimeStampColumns[i]) == timeStampColumnNames.end())
1534             {
1535                 // DRRTUY * That is far from ideal.
1536                 columnAssignmentPtr = new ColumnAssignment(string(onUpdateTimeStampColumns[i]), "=", "");
1537                 struct timeval tv;
1538                 char buf[64];
1539                 gettimeofday(&tv, 0);
1540                 MySQLTime time;
1541                 gmtSecToMySQLTime(tv.tv_sec, time, thd->variables.time_zone->get_name()->ptr());
1542                 sprintf(buf, "%04d-%02d-%02d %02d:%02d:%02d.%06ld", time.year, time.month, time.day, time.hour, time.minute, time.second, tv.tv_usec);
1543                 columnAssignmentPtr->fScalarExpression = buf;
1544                 colAssignmentListPtr->push_back ( columnAssignmentPtr );
1545             }
1546         }
1547     }
1548 #if 0
1549     else if (thd->rgi_slave && thd->rgi_slave->m_table_map.count() != 0)
1550     {
1551         string emsg = logging::IDBErrorInfo::instance()->errorMsg(ERR_RBR_EVENT);
1552         setError(current_thd, ER_CHECK_NOT_IMPLEMENTED, emsg);
1553         return ER_CHECK_NOT_IMPLEMENTED;
1554     }
1555 #endif
1556     else
1557     {
1558         updateCP->queryType(CalpontSelectExecutionPlan::DELETE);
1559         ci->stats.fQueryType = updateCP->queryType();
1560     }
1561 
1562     //save table oid for commit/rollback to use
1563     uint32_t sessionID = tid2sid(thd->thread_id);
1564     boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
1565     csc->identity(execplan::CalpontSystemCatalog::FE);
1566     CalpontSystemCatalog::TableName aTableName;
1567     TABLE_LIST* first_table = 0;
1568 
1569     if (( (thd->lex)->sql_command == SQLCOM_UPDATE ) || ( (thd->lex)->sql_command == SQLCOM_UPDATE_MULTI ) )
1570     {
1571         aTableName.schema = schemaName;
1572         aTableName.table = tableName;
1573     }
1574     else
1575     {
1576         first_table = (TABLE_LIST*) thd->lex->first_select_lex()->table_list.first;
1577         aTableName.schema = first_table->table->s->db.str;
1578         aTableName.table = first_table->table->s->table_name.str;
1579     }
1580     if (lower_case_table_names)
1581     {
1582         boost::algorithm::to_lower(aTableName.schema);
1583         boost::algorithm::to_lower(aTableName.table);
1584     }
1585 
1586     CalpontDMLPackage* pDMLPackage = 0;
1587 //	dmlStmt += ";";
1588     IDEBUG( cout << "STMT: " << dmlStmt << " and sessionID " << thd->thread_id <<  endl );
1589     VendorDMLStatement dmlStatement(dmlStmt, sessionID);
1590 
1591     if (( (thd->lex)->sql_command == SQLCOM_UPDATE ) || ( (thd->lex)->sql_command == SQLCOM_UPDATE_MULTI ) )
1592         dmlStatement.set_DMLStatementType( DML_UPDATE );
1593     else
1594         dmlStatement.set_DMLStatementType( DML_DELETE );
1595 
1596     TableName* qualifiedTablName = new TableName();
1597 
1598 
1599     UpdateSqlStatement updateStmt;
1600     //@Bug 2753. To make sure the momory is freed.
1601     updateStmt.fColAssignmentListPtr = colAssignmentListPtr;
1602 
1603     if (( (thd->lex)->sql_command == SQLCOM_UPDATE ) || ( (thd->lex)->sql_command == SQLCOM_UPDATE_MULTI ) )
1604     {
1605         qualifiedTablName->fName = tableName;
1606         qualifiedTablName->fSchema = schemaName;
1607         updateStmt.fNamePtr = qualifiedTablName;
1608         pDMLPackage = CalpontDMLFactory::makeCalpontUpdatePackageFromMysqlBuffer( dmlStatement, updateStmt );
1609     }
1610     else if ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) //@Bug 6121 error out on multi tables delete.
1611     {
1612         if ( (thd->lex->first_select_lex()->join) != 0)
1613         {
1614             multi_delete* deleteTable = (multi_delete*)((thd->lex->first_select_lex()->join)->result);
1615             first_table = (TABLE_LIST*) deleteTable->get_tables();
1616 
1617             if (deleteTable->get_num_of_tables() == 1)
1618             {
1619                 schemaName = first_table->db.str;
1620                 tableName = first_table->table_name.str;
1621                 aliasName = first_table->alias.str;
1622                 if (lower_case_table_names)
1623                 {
1624                     boost::algorithm::to_lower(schemaName);
1625                     boost::algorithm::to_lower(tableName);
1626                     boost::algorithm::to_lower(aliasName);
1627                 }
1628                 qualifiedTablName->fName = tableName;
1629                 qualifiedTablName->fSchema = schemaName;
1630                 pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(dmlStatement);
1631             }
1632             else
1633             {
1634                 string emsg("Deleting rows from multiple tables in a single statement is currently not supported.");
1635                 thd->raise_error_printf(ER_INTERNAL_ERROR, emsg.c_str());
1636                 ci->rc = 1;
1637                 thd->set_row_count_func(0);
1638                 return 1;
1639             }
1640         }
1641         else
1642         {
1643             first_table = (TABLE_LIST*) thd->lex->first_select_lex()->table_list.first;
1644             schemaName = first_table->table->s->db.str;
1645             tableName = first_table->table->s->table_name.str;
1646             aliasName = first_table->alias.str;
1647             if (lower_case_table_names)
1648             {
1649                 boost::algorithm::to_lower(schemaName);
1650                 boost::algorithm::to_lower(tableName);
1651                 boost::algorithm::to_lower(aliasName);
1652             }
1653             qualifiedTablName->fName = tableName;
1654             qualifiedTablName->fSchema = schemaName;
1655             pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(dmlStatement);
1656         }
1657     }
1658     else
1659     {
1660         first_table = (TABLE_LIST*) thd->lex->first_select_lex()->table_list.first;
1661         schemaName = first_table->table->s->db.str;
1662         tableName = first_table->table->s->table_name.str;
1663         aliasName = first_table->alias.str;
1664         if (lower_case_table_names)
1665         {
1666             boost::algorithm::to_lower(schemaName);
1667             boost::algorithm::to_lower(tableName);
1668             boost::algorithm::to_lower(aliasName);
1669         }
1670         qualifiedTablName->fName = tableName;
1671         qualifiedTablName->fSchema = schemaName;
1672         pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(dmlStatement);
1673     }
1674 
1675     if (!pDMLPackage)
1676     {
1677         string emsg("Fatal parse error in vtable mode in DMLParser ");
1678         setError(thd, ER_INTERNAL_ERROR, emsg);
1679         return ER_INTERNAL_ERROR;
1680     }
1681 
1682     pDMLPackage->set_TableName(tableName);
1683 
1684     pDMLPackage->set_SchemaName(schemaName);
1685     pDMLPackage->set_TimeZone(thd->variables.time_zone->get_name()->ptr());
1686 
1687     pDMLPackage->set_IsFromCol( true );
1688     //cout << " setting 	isFromCol to " << isFromCol << endl;
1689     std::string origStmt = dmlStmt;
1690     origStmt += ";";
1691     pDMLPackage->set_SQLStatement( origStmt );
1692 
1693     //Save the item list
1694     List<Item> items;
1695     SELECT_LEX select_lex;
1696 
1697     if (( (thd->lex)->sql_command == SQLCOM_UPDATE ) || ( (thd->lex)->sql_command == SQLCOM_UPDATE_MULTI ) )
1698     {
1699         items = (thd->lex->first_select_lex()->item_list);
1700         thd->lex->first_select_lex()->item_list = thd->lex->value_list;
1701     }
1702 
1703     select_lex = *lex->first_select_lex();
1704 
1705 
1706     //@Bug 2808 Error out on order by or limit clause
1707     //@bug5096. support dml limit.
1708     if (( thd->lex->first_select_lex()->order_list.elements != 0 ) )
1709     {
1710         string emsg("DML Statement with order by clause is not currently supported.");
1711         thd->raise_error_printf(ER_INTERNAL_ERROR, emsg.c_str());
1712         ci->rc = 1;
1713         thd->set_row_count_func(0);
1714         return 1;
1715     }
1716 
1717     {
1718         updateCP->subType (CalpontSelectExecutionPlan::SELECT_SUBS);
1719         //@Bug 2975.
1720         SessionManager sm;
1721         BRM::TxnID txnID;
1722         txnID = sm.getTxnID(sessionID);
1723 
1724         if (!txnID.valid)
1725         {
1726             txnID.id = 0;
1727             txnID.valid = true;
1728         }
1729 
1730         QueryContext verID;
1731         verID = sm.verID();
1732 
1733         updateCP->txnID(txnID.id);
1734         updateCP->verID(verID);
1735         updateCP->sessionID(sessionID);
1736         char* query_char = idb_mysql_query_str(thd);
1737         std::string query_str;
1738         if (!query_char)
1739         {
1740             query_str = "<Replication event>";
1741         }
1742         else
1743         {
1744             query_str = query_char;
1745         }
1746         updateCP->data(query_str);
1747 
1748         try
1749         {
1750             updateCP->priority(	ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser));
1751         }
1752         catch (std::exception& e)
1753         {
1754             string msg = string("Columnstore User Priority - ") + e.what();
1755             push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
1756         }
1757 
1758         gwi.clauseType = WHERE;
1759 
1760         if (getSelectPlan(gwi, select_lex, updateCP, false, false, condStack) != 0) //@Bug 3030 Modify the error message for unsupported functions
1761         {
1762             if (gwi.cs_vtable_is_update_with_derive)
1763             {
1764                 // @bug 4457. MySQL inconsistence! for some queries, some structures are only available
1765                 // in the derived_tables_processing phase. So by pass the phase for DML only when the
1766                 // execution plan can not be successfully generated. recover lex before returning;
1767                 thd->lex->first_select_lex()->item_list = items;
1768                 return 0;
1769             }
1770 
1771             //check different error code
1772             // avoid double set IDB error
1773             string emsg;
1774 
1775             if (gwi.parseErrorText.find("IDB-") == string::npos)
1776             {
1777                 Message::Args args;
1778                 args.add(gwi.parseErrorText);
1779                 emsg = IDBErrorInfo::instance()->errorMsg(ER_INTERNAL_ERROR, args);
1780             }
1781             else
1782             {
1783                 emsg = gwi.parseErrorText;
1784             }
1785 
1786             thd->raise_error_printf(ER_INTERNAL_ERROR, emsg.c_str());
1787             ci->rc = -1;
1788             thd->set_row_count_func(0);
1789             return -1;
1790 
1791         }
1792 
1793 //cout<< "Plan before is " << endl << *updateCP << endl;
1794         //set the large side by putting the updated table at the first
1795         CalpontSelectExecutionPlan::TableList tbList = updateCP->tableList();
1796 
1797         CalpontSelectExecutionPlan::TableList::iterator iter = tbList.begin();
1798         bool notFirst = false;
1799 
1800         while ( iter != tbList.end() )
1801         {
1802             if ( ( iter != tbList.begin() ) && (iter->schema == schemaName) && ( iter->table == tableName ) && ( iter->alias == aliasName ) )
1803             {
1804                 notFirst = true;
1805                 tbList.erase(iter);
1806                 break;
1807             }
1808 
1809             iter++;
1810         }
1811 
1812         if ( notFirst )
1813         {
1814             CalpontSystemCatalog::TableAliasName tn = make_aliastable(schemaName, tableName, aliasName);
1815             iter = tbList.begin();
1816             tbList.insert( iter, 1, tn );
1817         }
1818 
1819         updateCP->tableList( tbList );
        // DRRTUY * this is a very optimistic assumption
1821         updateCP->overrideLargeSideEstimate( true );
1822         //loop through returnedcols to find out constant columns
1823         CalpontSelectExecutionPlan::ReturnedColumnList returnedCols = updateCP->returnedCols();
1824         CalpontSelectExecutionPlan::ReturnedColumnList::iterator coliter = returnedCols.begin();
1825 
1826         while ( coliter != returnedCols.end() )
1827         {
1828             ConstantColumn* returnCol = dynamic_cast<ConstantColumn*>((*coliter).get());
1829 
1830             if (returnCol )
1831             {
1832                 returnedCols.erase(coliter);
1833                 coliter = returnedCols.begin();
1834             }
1835             else
1836                 coliter++;
1837         }
1838 
1839         if ((updateCP->columnMap()).empty())
1840             throw runtime_error ("column map is empty!");
1841 
1842         if (returnedCols.empty())
1843             returnedCols.push_back((updateCP->columnMap()).begin()->second);
1844 
1845         //@Bug 6123. get the correct returned columnlist
1846         if (( (thd->lex)->sql_command == SQLCOM_DELETE ) || ( (thd->lex)->sql_command == SQLCOM_DELETE_MULTI ) )
1847         {
1848             returnedCols.clear();
1849             //choose the smallest column to project
1850             CalpontSystemCatalog::TableName deleteTableName;
1851             deleteTableName.schema = schemaName;
1852             deleteTableName.table = tableName;
1853             CalpontSystemCatalog::RIDList colrids;
1854 
1855             try
1856             {
1857                 colrids = csc->columnRIDs(deleteTableName, false, lower_case_table_names);
1858             }
1859             catch (IDBExcept& ie)
1860             {
1861                 thd->raise_error_printf(ER_INTERNAL_ERROR, ie.what());
1862                 ci->rc = -1;
1863                 thd->set_row_count_func(0);
1864                 return -1;
1865             }
1866 
1867             int minColWidth = -1;
1868             int minWidthColOffset = 0;
1869 
1870             for (unsigned int j = 0; j < colrids.size(); j++)
1871             {
1872                 CalpontSystemCatalog::ColType ct = csc->colType(colrids[j].objnum);
1873 
1874                 if (ct.colDataType == CalpontSystemCatalog::VARBINARY)
1875                     continue;
1876 
1877                 if (minColWidth == -1 || ct.colWidth < minColWidth)
1878                 {
1879                     minColWidth = ct.colWidth;
1880                     minWidthColOffset = j;
1881                 }
1882             }
1883 
1884             CalpontSystemCatalog::TableColName tcn = csc->colName(colrids[minWidthColOffset].objnum);
1885             SimpleColumn* sc = new SimpleColumn(tcn.schema, tcn.table, tcn.column, csc->sessionID());
1886             sc->tableAlias(aliasName);
1887             sc->timeZone(thd->variables.time_zone->get_name()->ptr());
1888             sc->resultType(csc->colType(colrids[minWidthColOffset].objnum));
1889             SRCP srcp;
1890             srcp.reset(sc);
1891             returnedCols.push_back(srcp);
1892             //cout << "tablename:alias = " << tcn.table<<":"<<aliasName<<endl;
1893         }
1894 
1895         updateCP->returnedCols( returnedCols );
1896 
1897         if (( (thd->lex)->sql_command == SQLCOM_UPDATE ) || ( (thd->lex)->sql_command == SQLCOM_UPDATE_MULTI ) )
1898         {
1899             const ParseTree* ptsub = updateCP->filters();
1900 
1901             if ( !isFromSameTable )
1902             {
1903                 //cout << "set scalar" << endl;
1904                 //walk tree to set scalar
1905                 if (ptsub)
1906                     ptsub->walk(makeUpdateScalarJoin, &tbList[0] );
1907             }
1908             else
1909             {
1910                 //cout << "set semi" << endl;
1911                 if (ptsub)
1912                     ptsub->walk(makeUpdateSemiJoin, &tbList[0] );
1913             }
1914 
1915             if (( (thd->lex)->sql_command == SQLCOM_UPDATE ) || ( (thd->lex)->sql_command == SQLCOM_UPDATE_MULTI ) )
1916                 thd->lex->first_select_lex()->item_list = items;
1917         }
1918 
1919     }
1920 
1921     //cout<< "Plan is " << endl << *updateCP << endl;
1922     //updateCP->traceFlags(1);
1923     pDMLPackage->HasFilter(true);
1924     pDMLPackage->uuid(updateCP->uuid());
1925 
1926     ByteStream bytestream, bytestream1;
1927     bytestream << sessionID;
1928     boost::shared_ptr<messageqcpp::ByteStream> plan = pDMLPackage->get_ExecutionPlan();
1929     updateCP->rmParms(ci->rmParms);
1930     updateCP->serialize(*plan);
1931     pDMLPackage->write(bytestream);
1932 
1933     delete pDMLPackage;
1934 
1935     ByteStream::byte b = 0;
1936     ByteStream::octbyte rows = 0;
1937     std::string errorMsg;
1938     long long dmlRowCount = 0;
1939 
1940     if ( thd->killed > 0 )
1941     {
1942         return 0;
1943     }
1944 
1945     //querystats::QueryStats stats;
1946     string tableLockInfo;
1947 
1948     // Save the tableOid for the COMMIT | ROLLBACK
1949     CalpontSystemCatalog::ROPair roPair;
1950     try
1951     {
1952         roPair = csc->tableRID(aTableName);
1953     }
1954     catch (IDBExcept& ie)
1955     {
1956         setError(thd, ER_INTERNAL_ERROR, ie.what());
1957         return ER_INTERNAL_ERROR;
1958     }
1959     catch (std::exception& ex)
1960     {
1961         setError(thd, ER_INTERNAL_ERROR,
1962                  logging::IDBErrorInfo::instance()->errorMsg(ERR_SYSTEM_CATALOG) + ex.what());
1963         return ER_INTERNAL_ERROR;
1964     }
1965     ci->tableOid = roPair.objnum;
1966 
1967     // Send the request to DMLProc and wait for a response.
1968     try
1969     {
1970         timespec* tsp = 0;
1971 #ifndef _MSC_VER
1972         timespec ts;
1973         ts.tv_sec = 3L;
1974         ts.tv_nsec = 0L;
1975         tsp = &ts;
1976 #else
1977         //FIXME: @#$%^&! mysql has buggered up timespec!
1978         // The definition in my_pthread.h isn't the same as in winport/unistd.h...
1979         struct timespec_foo
1980         {
1981             long tv_sec;
1982             long tv_nsec;
1983         } ts_foo;
1984         ts_foo.tv_sec = 3;
1985         ts_foo.tv_nsec = 0;
1986         //This is only to get the compiler to not carp below at the read() call.
1987         // The messagequeue lib uses the correct struct
1988         tsp = reinterpret_cast<timespec*>(&ts_foo);
1989 #endif
1990         bool isTimeOut = true;
1991         int maxRetries = 2;
1992         std::string exMsg;
1993 
1994         // We try twice to get a response from dmlproc.
1995         // Every (3) seconds, check for ctrl+c
1996         for (int retry = 0; bytestream1.length() == 0 && retry < maxRetries; ++ retry)
1997         {
1998             try
1999             {
2000                 if (!ci->dmlProc)
2001                 {
2002                     ci->dmlProc = new MessageQueueClient("DMLProc");
2003                     //cout << "doUpdateDelete start new DMLProc client " << ci->dmlProc << " for session " << sessionID << endl;
2004                 }
2005                 else
2006                 {
2007                     delete ci->dmlProc;
2008                     ci->dmlProc = nullptr;
2009                     ci->dmlProc = new MessageQueueClient("DMLProc");
2010                 }
2011 
2012                 // Send the request to DMLProc
2013                 ci->dmlProc->write(bytestream);
2014 
2015                 // Get an answer from DMLProc
2016                 while (isTimeOut)
2017                 {
2018                     isTimeOut = false;
2019                     bytestream1 = ci->dmlProc->read(tsp, &isTimeOut);
2020 
2021                     if (b == 0 && thd->killed > 0 && isTimeOut)
2022                     {
2023                         // We send the CTRL+C command to DMLProc out of band
2024                         // (on a different connection) This will cause
2025                         // DMLProc to stop processing and return an error on the
2026                         // original connection which will cause a rollback.
2027                         messageqcpp::MessageQueueClient ctrlCProc("DMLProc");
2028                         //cout << "doUpdateDelete start new DMLProc client for ctrl-c " <<  " for session " << sessionID << endl;
2029                         VendorDMLStatement cmdStmt( "CTRL+C", DML_COMMAND, sessionID);
2030                         CalpontDMLPackage* pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(cmdStmt);
2031                         pDMLPackage->set_TimeZone(thd->variables.time_zone->get_name()->ptr());
2032                         ByteStream bytestream;
2033                         bytestream << static_cast<uint32_t>(sessionID);
2034                         pDMLPackage->write(bytestream);
2035                         delete pDMLPackage;
2036                         b = 1;
2037                         retry = maxRetries;
2038                         errorMsg = "Command canceled by user";
2039 
2040                         try
2041                         {
2042                             ctrlCProc.write(bytestream);
2043                         }
2044                         catch (runtime_error&)
2045                         {
2046                             errorMsg = "Lost connection to DMLProc while doing ctrl+c";
2047                         }
2048                         catch (...)
2049                         {
2050                             errorMsg = "Unknown error caught while doing ctrl+c";
2051                         }
2052 
2053 //						break;
2054                     }
2055                 }
2056             }
2057             catch (runtime_error& ex)
2058             {
2059                 // An exception causes a retry, so fall thru
2060                 exMsg = ex.what();
2061             }
2062 
2063             if (bytestream1.length() == 0 && thd->killed <= 0)
2064             {
2065                 //cout << "line 1442. received 0 byte from DMLProc and retry = "<< retry << endl;
2066                 // Seems dmlProc isn't playing. Reset it and try again.
2067                 delete ci->dmlProc;
2068                 ci->dmlProc = nullptr;
2069                 isTimeOut = true; //@Bug 4742
2070             }
2071         }
2072 
2073         if (bytestream1.length() == 0)
2074         {
2075             // If we didn't get anything, error
2076             b = 1;
2077 
2078             if (exMsg.length() > 0)
2079             {
2080                 errorMsg = exMsg;
2081             }
2082             else
2083             {
2084                 errorMsg = "Lost connection to DMLProc";
2085             }
2086         }
2087         else
2088         {
2089             bytestream1 >> b;
2090             bytestream1 >> rows;
2091             bytestream1 >> errorMsg;
2092 
2093             if (b == 0)
2094             {
2095                 bytestream1 >> tableLockInfo;
2096                 bytestream1 >> ci->queryStats;
2097                 bytestream1 >> ci->extendedStats;
2098                 bytestream1 >> ci->miniStats;
2099                 ci->stats.unserialize(bytestream1);
2100             }
2101         }
2102 
2103         dmlRowCount = rows;
2104 
2105         if (thd->killed && b == 0)
2106         {
2107             b = dmlpackageprocessor::DMLPackageProcessor::JOB_CANCELED;
2108             errorMsg = "Command canceled by user";
2109         }
2110     }
2111     catch (runtime_error& ex)
2112     {
2113         cout << ex.what() << endl;
2114         b = 1;
2115         delete ci->dmlProc;
2116         ci->dmlProc = nullptr;
2117         errorMsg = ex.what();
2118     }
2119     catch ( ... )
2120     {
2121         //cout << "... exception while writing to DMLProc" << endl;
2122         b = 1;
2123         delete ci->dmlProc;
2124         ci->dmlProc = nullptr;
2125         errorMsg =  "Unknown error caught";
2126     }
2127 
2128     // If autocommit is on then go ahead and tell the engine to commit the transaction
2129     //@Bug 1960 If error occurs, the commit is just to release the active transaction.
2130     //@Bug 2241. Rollback transaction when it failed
2131     //@Bug 4605. If error, always rollback.
2132     if (b != dmlpackageprocessor::DMLPackageProcessor::ACTIVE_TRANSACTION_ERROR)
2133     {
2134         std::string command;
2135 
2136         if ((useHdfs) && (b == 0))
2137             command = "COMMIT";
2138         else if ((useHdfs) && (b != 0))
2139             command = "ROLLBACK";
2140         else if ((b == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING) && thd->is_strict_mode())
2141             command = "ROLLBACK";
2142         else if ((!(current_thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) && (( b == 0 ) || (b == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)) )
2143             command = "COMMIT";
2144         else if (( b != 0 ) && (b != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING) )
2145             command = "ROLLBACK";
2146         else
2147             command = "";
2148 
2149         if ( command != "")
2150         {
2151             VendorDMLStatement cmdStmt(command, DML_COMMAND, sessionID);
2152             CalpontDMLPackage* pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(cmdStmt);
2153             pDMLPackage->set_TimeZone(thd->variables.time_zone->get_name()->ptr());
2154             pDMLPackage->setTableOid (ci->tableOid);
2155             ByteStream bytestream;
2156             bytestream << static_cast<uint32_t>(sessionID);
2157             pDMLPackage->write(bytestream);
2158             delete pDMLPackage;
2159 
2160             ByteStream::byte bc;
2161             std::string errMsg;
2162 
2163             try
2164             {
2165                 if (!ci->dmlProc)
2166                 {
2167                     ci->dmlProc = new MessageQueueClient("DMLProc");
2168                     //cout << " doupdateDelete command use a new dml client " << ci->dmlProc << endl;
2169                 }
2170 
2171                 ci->dmlProc->write(bytestream);
2172                 bytestream1 = ci->dmlProc->read();
2173                 bytestream1 >> bc;
2174                 bytestream1 >> rows;
2175                 bytestream1 >> errMsg;
2176 
2177                 if ( b == 0 )
2178                 {
2179                     b = bc;
2180                     errorMsg = errMsg;
2181                 }
2182             }
2183             catch (runtime_error&)
2184             {
2185                 errorMsg = "Lost connection to DMLProc";
2186                 b = 1;
2187                 delete ci->dmlProc;
2188                 ci->dmlProc = nullptr;
2189             }
2190             catch (...)
2191             {
2192                 errorMsg = "Unknown error caught";
2193                 b = 1;
2194             }
2195 
2196             // Clear tableOid for the next SQL statement
2197             ci->tableOid = 0;
2198         }
2199     }
2200 
2201     //@Bug 2241 Display an error message to user
2202 
2203     if ( ( b != 0 ) && (b != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING))
2204     {
2205         //@Bug 2540. Set error status instead of warning
2206         thd->raise_error_printf(ER_INTERNAL_ERROR, errorMsg.c_str());
2207         ci->rc = b;
2208         rc = ER_INTERNAL_ERROR;
2209     }
2210 
2211     if (b == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
2212     {
2213         if (thd->is_strict_mode())
2214         {
2215             thd->set_row_count_func(0);
2216             ci->rc = b;
2217             // Turn this on as MariaDB doesn't do it until the next phase
2218             thd->abort_on_warning = thd->is_strict_mode();
2219             rc = ER_INTERNAL_ERROR;
2220         }
2221         else
2222         {
2223             ci->affectedRows = dmlRowCount;
2224         }
2225 
2226         push_warning(thd, Sql_condition::WARN_LEVEL_WARN, ER_WARN_DATA_OUT_OF_RANGE, errorMsg.c_str());
2227     }
2228     else
2229     {
2230 //		if (dmlRowCount != 0) //Bug 5117. Handling self join.
2231             ci->affectedRows = dmlRowCount;
2232         //cout << " error status " << ci->rc << " and rowcount = " << dmlRowCount << endl;
2233     }
2234 
2235     // insert query stats
2236     ci->stats.setEndTime();
2237 
2238     try
2239     {
2240         ci->stats.insert();
2241     }
2242     catch (std::exception& e)
2243     {
2244         string msg = string("Columnstore Query Stats - ") + e.what();
2245         push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
2246     }
2247 
2248     delete ci->dmlProc;
2249     ci->dmlProc = nullptr;
2250     return rc;
2251 }
2252 
2253 } //anon namespace
2254 
ha_mcs_impl_open(const char * name,int mode,uint32_t test_if_locked)2255 int ha_mcs_impl_open(const char* name, int mode, uint32_t test_if_locked)
2256 {
2257     IDEBUG ( cout << "ha_mcs_impl_open: " << name << ", " << mode << ", " << test_if_locked << endl );
2258     Config::makeConfig();
2259     return 0;
2260 }
2261 
ha_mcs_impl_close(void)2262 int ha_mcs_impl_close(void)
2263 {
2264     IDEBUG( cout << "ha_mcs_impl_close" << endl );
2265     return 0;
2266 }
2267 
ha_mcs_impl_discover_existence(const char * schema,const char * name)2268 int ha_mcs_impl_discover_existence(const char* schema, const char* name)
2269 {
2270     boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog();
2271 
2272     try
2273     {
2274         const CalpontSystemCatalog::OID oid = csc->lookupTableOID(make_table(schema, name, lower_case_table_names));
2275 
2276         if (oid)
2277             return 1;
2278     }
2279     catch ( ... )
2280     {
2281     }
2282 
2283     return 0;
2284 }
2285 
ha_mcs_impl_direct_update_delete_rows(bool execute,ha_rows * affected_rows,const std::vector<COND * > & condStack)2286 int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows, const std::vector<COND*>& condStack)
2287 {
2288     THD* thd = current_thd;
2289     int rc = 0;
2290     cal_impl_if::gp_walk_info gwi;
2291     gwi.thd = thd;
2292 
2293     if (thd->slave_thread && !get_replication_slave(thd) && (
2294                 thd->lex->sql_command == SQLCOM_INSERT ||
2295                 thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
2296                 thd->lex->sql_command == SQLCOM_UPDATE ||
2297                 thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
2298                 thd->lex->sql_command == SQLCOM_DELETE ||
2299                 thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
2300                 thd->lex->sql_command == SQLCOM_TRUNCATE ||
2301                 thd->lex->sql_command == SQLCOM_LOAD))
2302     {
2303         if (affected_rows)
2304             *affected_rows = 0;
2305         return 0;
2306     }
2307 
2308     if (execute)
2309     {
2310         rc = doUpdateDelete(thd, gwi, condStack);
2311     }
2312 
2313     cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
2314     if (ci)
2315     {
2316         *affected_rows = ci->affectedRows;
2317     }
2318 
2319     return rc;
2320 }
2321 
impl_rnd_init(TABLE * table,const std::vector<COND * > & condStack)2322 int ha_mcs::impl_rnd_init(TABLE* table, const std::vector<COND*>& condStack)
2323 {
2324     IDEBUG( cout << "rnd_init for table " << table->s->table_name.str << endl );
2325     THD* thd = current_thd;
2326 
2327     gp_walk_info gwi;
2328     gwi.thd = thd;
2329 
2330     if (thd->slave_thread && !get_replication_slave(thd) && (
2331                 thd->lex->sql_command == SQLCOM_INSERT ||
2332                 thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
2333                 thd->lex->sql_command == SQLCOM_UPDATE ||
2334                 thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
2335                 thd->lex->sql_command == SQLCOM_DELETE ||
2336                 thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
2337                 thd->lex->sql_command == SQLCOM_TRUNCATE ||
2338                 thd->lex->sql_command == SQLCOM_LOAD))
2339         return 0;
2340 
2341     //check whether the system is ready to process statement.
2342 #ifndef _MSC_VER
2343     static DBRM dbrm(true);
2344     int bSystemQueryReady = dbrm.getSystemQueryReady();
2345 
2346     if (bSystemQueryReady == 0)
2347     {
2348         // Still not ready
2349         setError(thd, ER_INTERNAL_ERROR, "The system is not yet ready to accept queries");
2350         return ER_INTERNAL_ERROR;
2351     }
2352     else if (bSystemQueryReady < 0)
2353     {
        // Negative status: DBRM did not respond
2355         setError(thd, ER_INTERNAL_ERROR, "DBRM is not responding. Cannot accept queries");
2356         return ER_INTERNAL_ERROR;
2357     }
2358 #endif
2359 
2360     // Set this to close all outstanding FEP connections on
2361     // client disconnect in handlerton::closecon_handlerton().
2362     if ( !thd_get_ha_data(thd, mcs_hton))
2363     {
2364         thd_set_ha_data(thd, mcs_hton, reinterpret_cast<void*>(0x42));
2365     }
2366 
2367 #if 0
2368     if (thd->rgi_slave && thd->rgi_slave->m_table_map.count() != 0)
2369     {
2370         string emsg = logging::IDBErrorInfo::instance()->errorMsg(ERR_RBR_EVENT);
2371         setError(current_thd, ER_CHECK_NOT_IMPLEMENTED, emsg);
2372         return ER_CHECK_NOT_IMPLEMENTED;
2373     }
2374 #endif
2375 
2376     // If ALTER TABLE and not ENGINE= we don't need rnd_init (gets us in a bad state)
2377     if ((thd->lex->sql_command == SQLCOM_ALTER_TABLE) && !(thd->lex->create_info.used_fields & HA_CREATE_USED_ENGINE))
2378     {
2379         return 0;
2380     }
2381 
2382     /*
2383       Update and delete code.
2384       Note, we may be updating/deleting a different table,
2385       and the current one is only needed for reading,
2386       e.g. cstab1 is needed for reading in this example:
2387 
2388       UPDATE innotab1 SET a=100 WHERE a NOT IN (SELECT a FROM cstab1 WHERE a=1);
2389     */
2390     if (!isReadOnly() && // make sure the current table is being modified
2391         (thd->lex->sql_command == SQLCOM_UPDATE ||
2392          thd->lex->sql_command == SQLCOM_DELETE ||
2393          thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
2394          thd->lex->sql_command == SQLCOM_UPDATE_MULTI))
2395         return doUpdateDelete(thd, gwi, condStack);
2396 
2397     uint32_t sessionID = tid2sid(thd->thread_id);
2398     boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
2399     csc->identity(CalpontSystemCatalog::FE);
2400 
2401     if (get_fe_conn_info_ptr() == nullptr)
2402         set_fe_conn_info_ptr((void*)new cal_connection_info());
2403 
2404     cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
2405 
2406     idbassert(ci != 0);
2407 
2408     if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
2409     {
2410         force_close_fep_conn(thd, ci);
2411         return 0;
2412     }
2413 
2414     sm::tableid_t tableid = 0;
2415     cal_table_info ti;
2416     sm::cpsm_conhdl_t* hndl;
2417     SCSEP csep;
2418 
2419     // update traceFlags according to the autoswitch state.
2420     ci->traceFlags |= CalpontSelectExecutionPlan::TRACE_TUPLE_OFF;
2421 
2422     bool localQuery = get_local_query(thd);
2423 
2424     // table mode
2425     {
2426         ti = ci->tableMap[table];
2427 
2428         // get connection handle for this table handler
2429         // re-establish table handle
2430         if (ti.conn_hndl)
2431         {
2432             sm::sm_cleanup(ti.conn_hndl);
2433             ti.conn_hndl = 0;
2434         }
2435 
2436         sm::sm_init(sessionID, &ti.conn_hndl, localQuery);
2437         ti.conn_hndl->csc = csc;
2438         hndl = ti.conn_hndl;
2439 
2440         try
2441         {
2442             ti.conn_hndl->connect();
2443         }
2444         catch (...)
2445         {
2446             setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_LOST_CONN_EXEMGR));
2447             CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
2448             goto error;
2449         }
2450 
2451         // get filter plan for table
2452         if (ti.csep.get() == 0)
2453         {
2454             ti.csep.reset(new CalpontSelectExecutionPlan());
2455 
2456             SessionManager sm;
2457             BRM::TxnID txnID;
2458             txnID = sm.getTxnID(sessionID);
2459 
2460             if (!txnID.valid)
2461             {
2462                 txnID.id = 0;
2463                 txnID.valid = true;
2464             }
2465 
2466             QueryContext verID;
2467             verID = sm.verID();
2468 
2469             ti.csep->txnID(txnID.id);
2470             ti.csep->verID(verID);
2471             ti.csep->sessionID(sessionID);
2472 
2473             if (thd->db.length)
2474                 ti.csep->schemaName(thd->db.str, lower_case_table_names);
2475 
2476             ti.csep->traceFlags(ci->traceFlags);
2477             ti.msTablePtr = table;
2478 
2479             // send plan whenever rnd_init is called
2480             cp_get_table_plan(thd, ti.csep, ti);
2481         }
2482 
2483         IDEBUG( cerr << table->s->table_name.str << " send plan:" << endl );
2484         IDEBUG( cerr << *ti.csep << endl );
2485         csep = ti.csep;
2486 
2487         // for ExeMgr logging sqltext. only log once for the query although multi plans may be sent
2488         // CS adds the ti into TM in the end of rnd_init thus we log the SQL
2489         // only once when there is no ti with csep.
2490         if (onlyOneTableinTM(ci))
2491         {
2492             ti.csep->data(idb_mysql_query_str(thd));
2493         }
2494         else
2495         {
2496             ti.csep->data("<part of the query executed in table mode>");
2497         }
2498     }
2499 
2500     {
2501         ByteStream msg;
2502         ByteStream emsgBs;
2503 
2504         while (true)
2505         {
2506             try
2507             {
2508                 ByteStream::quadbyte qb = 4;
2509                 msg << qb;
2510                 hndl->exeMgr->write(msg);
2511                 msg.restart();
2512                 csep->rmParms(ci->rmParms);
2513 
2514                 //send plan
2515                 csep->serialize(msg);
2516                 hndl->exeMgr->write(msg);
2517 
2518                 //get ExeMgr status back to indicate a vtable joblist success or not
2519                 msg.restart();
2520                 emsgBs.restart();
2521                 msg = hndl->exeMgr->read();
2522                 emsgBs = hndl->exeMgr->read();
2523                 string emsg;
2524 
2525                 if (msg.length() == 0 || emsgBs.length() == 0)
2526                 {
2527                     emsg = "Lost connection to ExeMgr. Please contact your administrator";
2528                     setError(thd, ER_INTERNAL_ERROR, emsg);
2529                     return ER_INTERNAL_ERROR;
2530                 }
2531 
2532                 string emsgStr;
2533                 emsgBs >> emsgStr;
2534                 bool err = false;
2535 
2536                 if (msg.length() == 4)
2537                 {
2538                     msg >> qb;
2539 
2540                     if (qb != 0)
2541                     {
2542                         err = true;
2543                         // for makejoblist error, stats contains only error code and insert from here
2544                         // because table fetch is not started
2545                         ci->stats.setEndTime();
2546                         ci->stats.fQuery = csep->data();
2547                         ci->stats.fQueryType = csep->queryType();
2548                         ci->stats.fErrorNo = qb;
2549 
2550                         try
2551                         {
2552                             ci->stats.insert();
2553                         }
2554                         catch (std::exception& e)
2555                         {
2556                             string msg = string("Columnstore Query Stats - ") + e.what();
2557                             push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
2558                         }
2559                     }
2560                 }
2561                 else
2562                 {
2563                     err = true;
2564                 }
2565 
2566                 if (err)
2567                 {
2568                     setError(thd, ER_INTERNAL_ERROR, emsgStr);
2569                     return ER_INTERNAL_ERROR;
2570                 }
2571 
2572                 ci->rmParms.clear();
2573 
2574                 ci->tableMap[table] = ti;
2575 
2576                 break;
2577             }
2578             catch (...)
2579             {
2580                 sm::sm_cleanup(hndl);
2581                 hndl = 0;
2582 
2583                 sm::sm_init(sessionID, &hndl, localQuery);
2584                 idbassert(hndl != 0);
2585                 hndl->csc = csc;
2586 
2587                 ti.conn_hndl = hndl;
2588 
2589                 try
2590                 {
2591                     hndl->connect();
2592                 }
2593                 catch (...)
2594                 {
2595                     setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_LOST_CONN_EXEMGR));
2596                     CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
2597                     goto error;
2598                 }
2599 
2600                 msg.restart();
2601             }
2602         }
2603     }
2604 
2605     // common path for both vtable select phase and table mode -- open scan handle
2606     ti = ci->tableMap[table];
2607     ti.msTablePtr = table;
2608 
2609     if (ti.tpl_ctx == nullptr)
2610     {
2611         ti.tpl_ctx = new sm::cpsm_tplh_t();
2612         ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t());
2613     }
2614 
2615     // make sure rowgroup is null so the new meta data can be taken. This is for some case mysql
2616     // call rnd_init for a table more than once.
2617     ti.tpl_scan_ctx->rowGroup = nullptr;
2618 
2619     try
2620     {
2621         tableid = execplan::IDB_VTABLE_ID;
2622     }
2623     catch (...)
2624     {
2625         string emsg = "No table ID found for table " + string(table->s->table_name.str);
2626         setError(thd, ER_INTERNAL_ERROR, emsg);
2627         CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
2628         goto internal_error;
2629     }
2630 
2631     try
2632     {
2633         sm::tpl_open(tableid, ti.tpl_ctx, hndl);
2634         sm::tpl_scan_open(tableid, ti.tpl_scan_ctx, hndl);
2635     }
2636     catch (std::exception& e)
2637     {
2638         string emsg = "table can not be opened: " + string(e.what());
2639         setError(thd, ER_INTERNAL_ERROR, emsg);
2640         CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
2641         goto internal_error;
2642     }
2643     catch (...)
2644     {
2645         string emsg = "table can not be opened";
2646         setError(thd, ER_INTERNAL_ERROR, emsg);
2647         CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
2648         goto internal_error;
2649     }
2650 
2651     ti.tpl_scan_ctx->traceFlags = ci->traceFlags;
2652 
2653     if ((ti.tpl_scan_ctx->ctp).size() == 0)
2654     {
2655         uint32_t num_attr = table->s->fields;
2656 
2657         for (uint32_t i = 0; i < num_attr; i++)
2658         {
2659             CalpontSystemCatalog::ColType ctype;
2660             ti.tpl_scan_ctx->ctp.push_back(ctype);
2661         }
2662 
2663         // populate coltypes here for table mode because tableband gives treeoid for dictionary column
2664         {
2665             CalpontSystemCatalog::RIDList oidlist = csc->columnRIDs(make_table(table->s->db.str, table->s->table_name.str, lower_case_table_names), true);
2666 
2667             if (oidlist.size() != num_attr)
2668             {
2669                 string emsg = "Size mismatch probably caused by front end out of sync";
2670                 setError(thd, ER_INTERNAL_ERROR, emsg);
2671                 CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
2672                 goto internal_error;
2673             }
2674 
2675             for (unsigned int j = 0; j < oidlist.size(); j++)
2676             {
2677                 CalpontSystemCatalog::ColType ctype = csc->colType(oidlist[j].objnum);
2678                 ti.tpl_scan_ctx->ctp[ctype.colPosition] = ctype;
2679                 ti.tpl_scan_ctx->ctp[ctype.colPosition].colPosition = -1;
2680             }
2681         }
2682     }
2683 
2684     ci->tableMap[table] = ti;
2685     return 0;
2686 
2687 error:
2688     // CS doesn't need to close the actual sockets
2689     // b/c it tries to reuse it running next query.
2690     if (ci->cal_conn_hndl)
2691     {
2692         sm::sm_cleanup(ci->cal_conn_hndl);
2693         ci->cal_conn_hndl = 0;
2694     }
2695 
2696     return ER_INTERNAL_ERROR;
2697 
2698 internal_error:
2699 
2700     if (ci->cal_conn_hndl)
2701     {
2702         sm::sm_cleanup(ci->cal_conn_hndl);
2703         ci->cal_conn_hndl = 0;
2704     }
2705 
2706     return ER_INTERNAL_ERROR;
2707 }
2708 
ha_mcs_impl_rnd_next(uchar * buf,TABLE * table)2709 int ha_mcs_impl_rnd_next(uchar* buf, TABLE* table)
2710 {
2711     THD* thd = current_thd;
2712 
2713     if (thd->slave_thread && !get_replication_slave(thd) && (
2714                 thd->lex->sql_command == SQLCOM_INSERT ||
2715                 thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
2716                 thd->lex->sql_command == SQLCOM_UPDATE ||
2717                 thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
2718                 thd->lex->sql_command == SQLCOM_DELETE ||
2719                 thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
2720                 thd->lex->sql_command == SQLCOM_TRUNCATE ||
2721                 thd->lex->sql_command == SQLCOM_LOAD))
2722         return HA_ERR_END_OF_FILE;
2723 
2724     if (((thd->lex)->sql_command == SQLCOM_UPDATE)  || ((thd->lex)->sql_command == SQLCOM_DELETE) ||
2725             ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI))
2726         return HA_ERR_END_OF_FILE;
2727 
2728     // @bug 2547
2729     // MCOL-2178 This variable can never be true in the scope of this function
2730     //    if (MIGR::infinidb_vtable.impossibleWhereOnUnion)
2731     //        return HA_ERR_END_OF_FILE;
2732 
2733     if (get_fe_conn_info_ptr() == nullptr)
2734         set_fe_conn_info_ptr((void*)new cal_connection_info());
2735 
2736     cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
2737 
2738     if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
2739     {
2740         force_close_fep_conn(thd, ci);
2741         return 0;
2742     }
2743 
2744     if (ci->alterTableState > 0) return HA_ERR_END_OF_FILE;
2745 
2746     cal_table_info ti;
2747     ti = ci->tableMap[table];
2748     int rc = HA_ERR_END_OF_FILE;
2749 
2750     if (!ti.tpl_ctx || !ti.tpl_scan_ctx)
2751     {
2752         CalpontSystemCatalog::removeCalpontSystemCatalog(tid2sid(thd->thread_id));
2753         return ER_INTERNAL_ERROR;
2754     }
2755 
2756     idbassert(ti.msTablePtr == table);
2757 
2758     try
2759     {
2760         rc = fetchNextRow(buf, ti, ci);
2761     }
2762     catch (std::exception& e)
2763     {
2764         string emsg = string("Error while fetching from ExeMgr: ") + e.what();
2765         setError(thd, ER_INTERNAL_ERROR, emsg);
2766         CalpontSystemCatalog::removeCalpontSystemCatalog(tid2sid(thd->thread_id));
2767         return ER_INTERNAL_ERROR;
2768     }
2769 
2770     ci->tableMap[table] = ti;
2771 
2772     if (rc != 0 && rc != HA_ERR_END_OF_FILE)
2773     {
2774         string emsg;
2775 
2776         // remove this check when all error handling migrated to the new framework.
2777         if (rc >= 1000)
2778             emsg = ti.tpl_scan_ctx->errMsg;
2779         else
2780         {
2781             logging::ErrorCodes errorcodes;
2782             emsg = errorcodes.errorString(rc);
2783         }
2784 
2785         setError(thd, ER_INTERNAL_ERROR, emsg);
2786         //setError(thd, ER_INTERNAL_ERROR, "testing");
2787         ci->stats.fErrorNo = rc;
2788         CalpontSystemCatalog::removeCalpontSystemCatalog(tid2sid(thd->thread_id));
2789         rc = ER_INTERNAL_ERROR;
2790     }
2791 
2792     return rc;
2793 }
2794 
ha_mcs_impl_rnd_end(TABLE * table,bool is_pushdown_hand)2795 int ha_mcs_impl_rnd_end(TABLE* table, bool is_pushdown_hand)
2796 {
2797     int rc = 0;
2798     THD* thd = current_thd;
2799 
2800     if (thd->slave_thread && !get_replication_slave(thd) && (
2801                 thd->lex->sql_command == SQLCOM_INSERT ||
2802                 thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
2803                 thd->lex->sql_command == SQLCOM_UPDATE ||
2804                 thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
2805                 thd->lex->sql_command == SQLCOM_DELETE ||
2806                 thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
2807                 thd->lex->sql_command == SQLCOM_TRUNCATE ||
2808                 thd->lex->sql_command == SQLCOM_LOAD))
2809         return 0;
2810 
2811     if ( (thd->lex)->sql_command == SQLCOM_ALTER_TABLE )
2812         return rc;
2813 
2814     if (((thd->lex)->sql_command == SQLCOM_UPDATE)  ||
2815             ((thd->lex)->sql_command == SQLCOM_DELETE) ||
2816             ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) ||
2817             ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI))
2818         return rc;
2819 
2820     // MCOL-2178 isUnion member only assigned, never used
2821     //    if (is_pushdown_hand)
2822     //    {
2823     //        MIGR::infinidb_vtable.isUnion = false;
2824     //    }
2825     cal_connection_info* ci = nullptr;
2826 
2827     if (get_fe_conn_info_ptr() != NULL)
2828         ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
2829 
2830     if (!ci)
2831     {
2832         set_fe_conn_info_ptr((void*)new cal_connection_info());
2833         ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
2834     }
2835 
2836     if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
2837     {
2838         force_close_fep_conn(thd, ci);
2839         // clear querystats because no query stats available for cancelled query
2840         ci->queryStats = "";
2841         return 0;
2842     }
2843 
2844     IDEBUG( cerr << "rnd_end for table " << table->s->table_name.str << endl );
2845 
2846     cal_table_info ti = ci->tableMap[table];
2847     sm::cpsm_conhdl_t* hndl;
2848 
2849     if (!is_pushdown_hand)
2850         hndl = ti.conn_hndl;
2851     else
2852         hndl = ci->cal_conn_hndl;
2853 
2854     if (ti.tpl_ctx)
2855     {
2856         if (ti.tpl_scan_ctx.get())
2857         {
2858             try
2859             {
2860                 sm::tpl_scan_close(ti.tpl_scan_ctx);
2861             }
2862             catch (...)
2863             {
2864                 rc = ER_INTERNAL_ERROR;
2865             }
2866         }
2867 
2868         ti.tpl_scan_ctx.reset();
2869 
2870         try
2871         {
2872             {
2873                 bool ask_4_stats = (ci->traceFlags) ? true : false;
2874                 sm::tpl_close(ti.tpl_ctx, &hndl, ci->stats, ask_4_stats);
2875             }
2876 
2877             // set conn hndl back. could be changed in tpl_close
2878             if (!is_pushdown_hand)
2879                 ti.conn_hndl = hndl;
2880             else
2881                 ci->cal_conn_hndl = hndl;
2882 
2883             ti.tpl_ctx = 0;
2884         }
2885         catch (IDBExcept& e)
2886         {
2887             if (e.errorCode() == ERR_CROSS_ENGINE_CONNECT || e.errorCode() == ERR_CROSS_ENGINE_CONFIG)
2888             {
2889                 string msg = string("Columnstore Query Stats - ") + e.what();
2890                 push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
2891             }
2892             else
2893             {
2894                 setError(thd, ER_INTERNAL_ERROR, e.what());
2895                 rc = ER_INTERNAL_ERROR;
2896             }
2897         }
2898         catch (std::exception& e)
2899         {
2900             setError(thd, ER_INTERNAL_ERROR, e.what());
2901             rc = ER_INTERNAL_ERROR;
2902         }
2903         catch (...)
2904         {
2905             setError(thd, ER_INTERNAL_ERROR, "Internal error throwed in rnd_end");
2906             rc = ER_INTERNAL_ERROR;
2907         }
2908     }
2909 
2910     ti.tpl_ctx = 0;
2911 
2912     ci->tableMap[table] = ti;
2913 
2914     // push warnings from CREATE phase
2915     if (!ci->warningMsg.empty())
2916         push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, ci->warningMsg.c_str());
2917 
2918     ci->warningMsg.clear();
2919     // reset expressionId just in case
2920     ci->expressionId = 0;
2921 
2922     thd_set_ha_data(thd, mcs_hton, reinterpret_cast<void*>(ci));
2923 
2924     return rc;
2925 }
2926 
ha_mcs_impl_create(const char * name,TABLE * table_arg,HA_CREATE_INFO * create_info)2927 int ha_mcs_impl_create(const char* name, TABLE* table_arg, HA_CREATE_INFO* create_info)
2928 {
2929     THD* thd = current_thd;
2930 
2931     if (get_fe_conn_info_ptr() == nullptr)
2932         set_fe_conn_info_ptr((void*)new cal_connection_info());
2933 
2934     cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
2935 
2936     //@Bug 1948. Mysql calls create table to create a new table with new signature.
2937     if (ci->alterTableState > 0) return 0;
2938 
2939     // Just to be sure
2940     if (!table_arg)
2941     {
2942         setError(thd, ER_INTERNAL_ERROR, "ha_mcs_impl_create_: table_arg is NULL");
2943         return 1;
2944     }
2945 
2946     if (!table_arg->s)
2947     {
2948         setError(thd, ER_INTERNAL_ERROR, "ha_mcs_impl_create_: table_arg->s is NULL");
2949         return 1;
2950     }
2951 
2952     int rc = ha_mcs_impl_create_(name, table_arg, create_info, *ci);
2953 
2954     return rc;
2955 }
2956 
ha_mcs_impl_delete_table(const char * name)2957 int ha_mcs_impl_delete_table(const char* name)
2958 {
2959     THD* thd = current_thd;
2960     char* dbName = nullptr;
2961 
2962     if (!name)
2963     {
2964         setError(thd, ER_INTERNAL_ERROR, "Drop Table with NULL name not permitted");
2965         return 1;
2966     }
2967 
2968     //if this is an InfiniDB tmp table ('#sql*.frm') just leave...
2969     if (!memcmp((uchar*)name, tmp_file_prefix, tmp_file_prefix_length)) return 0;
2970 
2971     if (get_fe_conn_info_ptr() == nullptr)
2972         set_fe_conn_info_ptr((void*)new cal_connection_info());
2973 
2974     cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
2975 
2976     if (!thd) return 0;
2977 
2978     if (!thd->lex) return 0;
2979 
2980     if (!idb_mysql_query_str(thd)) return 0;
2981 
2982     if (thd->lex->sql_command == SQLCOM_DROP_DB)
2983     {
2984         dbName = const_cast<char*>(thd->lex->name.str);
2985     }
2986     else
2987     {
2988         TABLE_LIST* first_table = (TABLE_LIST*) thd->lex->first_select_lex()->table_list.first;
2989         dbName = const_cast<char*>(first_table->db.str);
2990     }
2991 
2992     if (!dbName)
2993     {
2994         setError(thd, ER_INTERNAL_ERROR, "Drop Table with NULL schema not permitted");
2995         return 1;
2996     }
2997 
2998     if (!ci) return 0;
2999 
3000     //@Bug 1948,2306. if alter table want to drop the old table, InfiniDB does not need to drop.
3001     if ( ci->isAlter )
3002     {
3003         ci->isAlter = false;
3004         return 0;
3005     }
3006 
3007     int rc = ha_mcs_impl_delete_table_(dbName, name, *ci);
3008     return rc;
3009 }
ha_mcs_impl_write_row(const uchar * buf,TABLE * table,uint64_t rows_changed)3010 int ha_mcs_impl_write_row(const uchar* buf, TABLE* table, uint64_t rows_changed)
3011 {
3012     THD* thd = current_thd;
3013 
3014     if (thd->slave_thread && !get_replication_slave(thd))
3015         return 0;
3016 
3017     // Error out INSERT on VIEW. It's currently not supported.
3018     // @note INSERT on VIEW works natually (for simple cases at least), but we choose to turn it off
3019     // for now - ZZ.
3020 
3021     if (thd->lex->query_tables->view)
3022     {
3023         Message::Args args;
3024         args.add("Insert");
3025         string emsg = logging::IDBErrorInfo::instance()->errorMsg(ERR_DML_VIEW, args);
3026         setError(current_thd, ER_CHECK_NOT_IMPLEMENTED, emsg);
3027         return ER_CHECK_NOT_IMPLEMENTED;
3028     }
3029 
3030     if (get_fe_conn_info_ptr() == nullptr)
3031         set_fe_conn_info_ptr((void*)new cal_connection_info());
3032 
3033     cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
3034 
3035     // At the beginning of insert, make sure there are no
3036     // left-over values from a previously possibly failed insert.
3037     if (rows_changed == 0)
3038         ci->tableValuesMap.clear();
3039 
3040     if (ci->alterTableState > 0)
3041         return 0;
3042 
3043     ha_rows rowsInserted = 0;
3044     int rc = 0;
3045 
3046     // ci->useCpimport = 2 means ALWAYS use cpimport, whether it's in a
3047     // transaction or not. User should use this option very carefully since
3048     // cpimport currently does not support rollbacks
3049     if (((ci->useCpimport == 2) ||
3050          ((ci->useCpimport == 1) && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))))) &&
3051         (!ci->singleInsert) &&
3052         ((ci->isLoaddataInfile) ||
3053          ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) ||
3054          ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || ci->isCacheInsert) )
3055     {
3056         rc = ha_mcs_impl_write_batch_row_(buf, table, *ci);
3057     }
3058     else
3059     {
3060         if ( !ci->dmlProc )
3061         {
3062             ci->dmlProc = new MessageQueueClient("DMLProc");
3063             //cout << "write_row starts a client " << ci->dmlProc << " for session " << thd->thread_id << endl;
3064         }
3065 
3066         rc = ha_mcs_impl_write_row_(buf, table, *ci, rowsInserted);
3067 
3068     }
3069 
3070     //@Bug 2438 Added a variable rowsHaveInserted to keep track of how many rows have been inserted already.
3071     if ( !ci->singleInsert && ( rc == 0 ) && ( rowsInserted > 0 ))
3072     {
3073         ci->rowsHaveInserted += rowsInserted;
3074     }
3075 
3076     return rc;
3077 }
3078 
ha_mcs_impl_update_row()3079 int ha_mcs_impl_update_row()
3080 {
3081     if (get_fe_conn_info_ptr() == nullptr)
3082         set_fe_conn_info_ptr((void*)new cal_connection_info());
3083 
3084     cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
3085     int rc = ci->rc;
3086 
3087     if ( rc != 0 )
3088         ci->rc = 0;
3089 
3090     return ( rc );
3091 }
3092 
ha_mcs_impl_delete_row()3093 int ha_mcs_impl_delete_row()
3094 {
3095     if (get_fe_conn_info_ptr() == nullptr)
3096         set_fe_conn_info_ptr((void*)new cal_connection_info());
3097 
3098     cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
3099     int rc = ci->rc;
3100 
3101     if ( rc != 0 )
3102         ci->rc = 0;
3103 
3104     return ( rc );
3105 }
3106 
ha_mcs_impl_start_bulk_insert(ha_rows rows,TABLE * table,bool is_cache_insert)3107 void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_insert)
3108 {
3109     THD* thd = current_thd;
3110 
3111     if (thd->slave_thread && !get_replication_slave(thd))
3112         return;
3113 
3114     if (get_fe_conn_info_ptr() == nullptr)
3115         set_fe_conn_info_ptr((void*)new cal_connection_info());
3116 
3117     cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
3118 
3119     // clear rows variable
3120     ci->rowsHaveInserted = 0;
3121 
3122     if (ci->alterTableState > 0)
3123         return;
3124 
3125     //@bug 5660. Error out DDL/DML on slave node, or on local query node
3126     if (ci->isSlaveNode)
3127     {
3128         string emsg = logging::IDBErrorInfo::instance()->errorMsg(ERR_DML_DDL_SLAVE);
3129         setError(current_thd, ER_CHECK_NOT_IMPLEMENTED, emsg);
3130         return;
3131     }
3132 
3133     //@bug 4771. reject REPLACE key word
3134     if ((thd->lex)->sql_command == SQLCOM_REPLACE_SELECT)
3135     {
3136         setError(current_thd, ER_CHECK_NOT_IMPLEMENTED, "REPLACE statement is not supported in Columnstore.");
3137     }
3138 
3139     boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(tid2sid(thd->thread_id));
3140     csc->identity(execplan::CalpontSystemCatalog::FE);
3141 
3142     //@Bug 2515.
3143     //Check command instead of vtable state
3144     if ((thd->lex)->sql_command == SQLCOM_INSERT)
3145     {
3146         string insertStmt = idb_mysql_query_str(thd);
3147         boost::algorithm::to_lower(insertStmt);
3148         string intoStr("into");
3149         size_t found = insertStmt.find(intoStr);
3150 
3151         if (found != string::npos)
3152             insertStmt.erase(found);
3153 
3154         found = insertStmt.find("ignore");
3155 
3156         if (found != string::npos)
3157         {
3158             setError(current_thd, ER_CHECK_NOT_IMPLEMENTED, "IGNORE option in insert statement is not supported in Columnstore.");
3159         }
3160 
3161         if ( rows > 1 )
3162         {
3163             ci->singleInsert = false;
3164         }
3165     }
3166     else if ( (thd->lex)->sql_command == SQLCOM_LOAD || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT)
3167     {
3168         ci->singleInsert = false;
3169         ci->isLoaddataInfile = true;
3170     }
3171 
3172     if (is_cache_insert && (thd->lex)->sql_command != SQLCOM_INSERT_SELECT)
3173     {
3174         ci->isCacheInsert = true;
3175 
3176         if (rows > 1)
3177             ci->singleInsert = false;
3178     }
3179 
3180     ci->bulkInsertRows = rows;
3181 
3182     if ((((thd->lex)->sql_command == SQLCOM_INSERT) ||
3183             ((thd->lex)->sql_command == SQLCOM_LOAD) ||
3184             (thd->lex)->sql_command == SQLCOM_INSERT_SELECT ||
3185             ci->isCacheInsert) && !ci->singleInsert )
3186     {
3187         ci->useCpimport = get_use_import_for_batchinsert(thd);
3188 
3189         if (((thd->lex)->sql_command == SQLCOM_INSERT) && (rows > 0))
3190             ci->useCpimport = 0;
3191 
3192         // For now, disable cpimport for cache inserts
3193         if (ci->isCacheInsert)
3194             ci->useCpimport = 0;
3195 
3196         // ci->useCpimport = 2 means ALWAYS use cpimport, whether it's in a
3197         // transaction or not. User should use this option very carefully since
3198         // cpimport currently does not support rollbacks
3199         if ((ci->useCpimport == 2) ||
3200             ((ci->useCpimport == 1) && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))))) //If autocommit on batch insert will use cpimport to load data
3201         {
3202             //store table info to connection info
3203             CalpontSystemCatalog::TableName tableName;
3204             tableName.schema = table->s->db.str;
3205             tableName.table = table->s->table_name.str;
3206             ci->useXbit = false;
3207             CalpontSystemCatalog::RIDList colrids;
3208 
3209             try
3210             {
3211                 colrids = csc->columnRIDs(tableName, false, lower_case_table_names);
3212             }
3213             catch (IDBExcept& ie)
3214             {
3215                 // TODO Can't use ERR_UNKNOWN_TABLE because it needs two
3216                 // arguments to format. Update setError to take vararg.
3217 //				setError(thd, ER_UNKNOWN_TABLE, ie.what());
3218                 setError(thd, ER_INTERNAL_ERROR, ie.what());
3219                 ci->rc = 5;
3220                 ci->singleInsert = true;
3221                 return;
3222             }
3223 
3224             ci->useXbit = table->s->db_options_in_use & HA_OPTION_PACK_RECORD;
3225 
3226             // TODO: This needs a proper fix.
3227             if (is_cache_insert)
3228                 ci->useXbit = false;
3229 
3230             //@bug 6122 Check how many columns have not null constraint. columnn with not null constraint will not show up in header.
3231             unsigned int numberNotNull = 0;
3232 
3233             for (unsigned int j = 0; j < colrids.size(); j++)
3234             {
3235                 CalpontSystemCatalog::ColType ctype = csc->colType(colrids[j].objnum);
3236                 ci->columnTypes.push_back(ctype);
3237 
3238                 if ((( ctype.colDataType == CalpontSystemCatalog::VARCHAR ) || ( ctype.colDataType == CalpontSystemCatalog::VARBINARY )) && !ci->useXbit )
3239                     ci->useXbit = true;
3240 
3241                 if (ctype.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
3242                     numberNotNull++;
3243             }
3244 
3245             // The length of the record header is:(1 + number of columns + 7) / 8 bytes
3246             if (ci->useXbit)
3247                 ci->headerLength = (1 + colrids.size() + 7 - 1 - numberNotNull) / 8; //xbit is used
3248             else
3249                 ci->headerLength = (1 + colrids.size() + 7 - numberNotNull) / 8;
3250 
3251             //Log the statement to debug.log
3252             {
3253                 ostringstream oss;
3254                 oss << "Start SQL statement: " << idb_mysql_query_str(thd) << "; |" << table->s->db.str << "|";
3255                 log_this(thd, oss.str().c_str(), logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
3256             }
3257 
3258             //start process cpimport mode 1
3259             ci->mysqld_pid = getpid();
3260 
3261             //get delimiter
3262             if (char(get_import_for_batchinsert_delimiter(thd)) != '\007')
3263                 ci->delimiter = char(get_import_for_batchinsert_delimiter(thd));
3264             else
3265                 ci->delimiter = '\007';
3266 
3267             //get enclosed by
3268             if (char(get_import_for_batchinsert_enclosed_by(thd)) != 8)
3269                 ci->enclosed_by = char(get_import_for_batchinsert_enclosed_by(thd));
3270             else
3271                 ci->enclosed_by = 8;
3272 
3273             //cout << "current set up is usecpimport:delimiter = " << (int)ci->useCpimport<<":"<<	ci->delimiter <<endl;
3274             //set up for cpimport
3275             std::vector<char*> Cmds;
3276             std::string aCmdLine;
3277             std::string aTmpDir(startup::StartUp::tmpDir());
3278 
3279             //If local module type is not PM and Local PM query is set, error out
3280             char escapechar[2] = "";
3281 
3282             if (ci->enclosed_by == 34)	// Double quotes
3283                 strcat(escapechar, "\\");
3284 
3285             if (get_local_query(thd))
3286             {
3287                 const auto oamcache = oam::OamCache::makeOamCache();
3288                 int localModuleId = oamcache->getLocalPMId();
3289 
3290                 if (localModuleId == 0)
3291                 {
3292                     setError(current_thd, ER_INTERNAL_ERROR, logging::IDBErrorInfo::instance()->errorMsg(ERR_LOCAL_QUERY_UM));
3293                     ci->singleInsert = true;
3294                     log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG,
3295                     tid2sid(thd->thread_id));
3296                     return;
3297                 }
3298                 else
3299                 {
3300 #ifdef _MSC_VER
3301                     aCmdLine = "cpimport.exe -N -P " + to_string(localModuleId) + " -s " + ci->delimiter + " -e 0" + " -E " + escapechar + ci->enclosed_by + " ";
3302 #else
3303                     aCmdLine = "cpimport -m 1 -N -P " + boost::to_string(localModuleId) + " -s " + ci->delimiter + " -e 0" + " -T " + thd->variables.time_zone->get_name()->ptr() + " -E " + escapechar + ci->enclosed_by + " ";
3304 #endif
3305                 }
3306             }
3307             else
3308             {
3309 #ifdef _MSC_VER
3310                 aCmdLine = "cpimport.exe -N -s " + ci->delimiter + " -e 0" + " -E " + escapechar + ci->enclosed_by + " ";
3311 #else
3312                 aCmdLine = std::string("cpimport -m 1 -N -s ") + ci->delimiter + " -e 0" + " -T " + thd->variables.time_zone->get_name()->ptr() + " -E " + escapechar + ci->enclosed_by + " ";
3313 #endif
3314             }
3315 
3316             aCmdLine = aCmdLine + table->s->db.str + " " + table->s->table_name.str ;
3317 
3318             std::istringstream ss(aCmdLine);
3319             std::string arg;
3320             std::vector<std::string> v2(20, "");
3321             unsigned int i = 0;
3322 
3323             while (ss >> arg)
3324             {
3325                 v2[i++] = arg;
3326             }
3327 
3328             for (unsigned int j = 0; j < i; ++j)
3329             {
3330                 Cmds.push_back(const_cast<char*>(v2[j].c_str()));
3331             }
3332 
3333             Cmds.push_back(0); //null terminate
3334 
3335 #ifdef _MSC_VER
3336             BOOL bSuccess = false;
3337             BOOL bInitialized = false;
3338             SECURITY_ATTRIBUTES saAttr;
3339             saAttr.nLength = sizeof(SECURITY_ATTRIBUTES);
3340             saAttr.bInheritHandle = TRUE;
3341             saAttr.lpSecurityDescriptor = nullptr;
3342             HANDLE handleList[2];
3343             const char* pSectionMsg;
3344             bSuccess = true;
3345 
3346             // Create a pipe for the child process's STDIN.
3347             if (bSuccess)
3348             {
3349                 pSectionMsg = "Create Stdin";
3350                 bSuccess = CreatePipe(&ci->cpimport_stdin_Rd, &ci->cpimport_stdin_Wr, &saAttr, 65536);
3351 
3352                 // Ensure the write handle to the pipe for STDIN is not inherited.
3353                 if (bSuccess)
3354                 {
3355                     pSectionMsg = "SetHandleInformation(stdin)";
3356                     bSuccess = SetHandleInformation(ci->cpimport_stdin_Wr, HANDLE_FLAG_INHERIT, 0);
3357                 }
3358             }
3359 
3360             // Launch cpimport
3361             LPPROC_THREAD_ATTRIBUTE_LIST lpAttributeList = nullptr;
3362             SIZE_T attrSize = 0;
3363             STARTUPINFOEX siStartInfo;
3364 
3365             // To ensure the child only inherits the STDIN and STDOUT Handles, we add a list of
3366             // Handles that can be inherited to the call to CreateProcess
3367             if (bSuccess)
3368             {
3369                 pSectionMsg = "InitializeProcThreadAttributeList(NULL)";
3370                 bSuccess = InitializeProcThreadAttributeList(NULL, 1, 0, &attrSize) ||
3371                            GetLastError() == ERROR_INSUFFICIENT_BUFFER; // Asks how much buffer to alloc
3372             }
3373 
3374             if (bSuccess)
3375             {
3376                 pSectionMsg = "HeapAlloc for AttrList";
3377                 lpAttributeList = reinterpret_cast<LPPROC_THREAD_ATTRIBUTE_LIST>
3378                                   (HeapAlloc(GetProcessHeap(), 0, attrSize));
3379                 bSuccess = lpAttributeList != nullptr;
3380             }
3381 
3382             if (bSuccess)
3383             {
3384                 pSectionMsg = "InitializeProcThreadAttributeList";
3385                 bSuccess = InitializeProcThreadAttributeList(lpAttributeList, 1, 0, &attrSize);
3386             }
3387 
3388             if (bSuccess)
3389             {
3390                 pSectionMsg = "UpdateProcThreadAttribute";
3391                 bInitialized = true;
3392                 handleList[0] = ci->cpimport_stdin_Rd;
3393                 bSuccess = UpdateProcThreadAttribute(lpAttributeList,
3394                                                      0, PROC_THREAD_ATTRIBUTE_HANDLE_LIST,
3395                                                      handleList, sizeof(HANDLE), NULL, NULL);
3396             }
3397 
3398             if (bSuccess)
3399             {
3400                 pSectionMsg = "CreateProcess";
3401                 // In order for GenerateConsoleCtrlEvent (used when job is canceled) to work,
3402                 // this process must have a Console, which Services don't have. We create this
3403                 // when we create the child process. Once created, we leave it around for next time.
3404                 // AllocConsole will silently fail if it already exists, so no pain.
3405                 AllocConsole();
3406                 // Set up members of the PROCESS_INFORMATION structure.
3407                 memset(&ci->cpimportProcInfo, 0, sizeof(PROCESS_INFORMATION));
3408 
3409                 // Set up members of the STARTUPINFOEX structure.
3410                 // This structure specifies the STDIN and STDOUT handles for redirection.
3411                 memset(&siStartInfo, 0, sizeof(STARTUPINFOEX));
3412                 siStartInfo.StartupInfo.cb = sizeof(STARTUPINFOEX);
3413                 siStartInfo.lpAttributeList = lpAttributeList;
3414                 siStartInfo.StartupInfo.hStdError = nullptr;
3415                 siStartInfo.StartupInfo.hStdOutput = nullptr;
3416                 siStartInfo.StartupInfo.hStdInput = ci->cpimport_stdin_Rd;
3417                 siStartInfo.StartupInfo.dwFlags |= STARTF_USESTDHANDLES;
3418                 // Create the child process.
3419                 bSuccess = CreateProcess(NULL,         // program. NULL means use command line
3420                                          const_cast<LPSTR>(aCmdLine.c_str()), // command line
3421                                          NULL,          // process security attributes
3422                                          NULL,          // primary thread security attributes
3423                                          TRUE,          // handles are inherited
3424                                          EXTENDED_STARTUPINFO_PRESENT | CREATE_NEW_PROCESS_GROUP, // creation flags
3425                                          NULL,          // use parent's environment
3426                                          NULL,          // use parent's current directory
3427                                          &siStartInfo.StartupInfo,  // STARTUPINFO pointer
3428                                          &ci->cpimportProcInfo);  // receives PROCESS_INFORMATION
3429 
3430             }
3431 
3432             // We need to clean up the memory created by InitializeProcThreadAttributeList
3433             // and HeapAlloc
3434             if (bInitialized)
3435                 DeleteProcThreadAttributeList(lpAttributeList);
3436 
3437             if (lpAttributeList)
3438                 HeapFree(GetProcessHeap(), 0, lpAttributeList);
3439 
3440             if (!bSuccess)
3441             {
3442                 // If an error occurs, Log and return.
3443                 int errnum = GetLastError();
3444                 char errmsg[512];
3445                 FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, errnum, 0, errmsg, 512, NULL);
3446                 ostringstream oss;
3447                 oss << " : Error in " << pSectionMsg << " (errno-" <<
3448                     errnum << "); " << errmsg;
3449                 setError(current_thd, ER_INTERNAL_ERROR, oss.str());
3450                 ci->singleInsert = true;
3451                 log_this(thd, oss.str(), logging::LOG_TYPE_ERROR, tid2sid(thd->thread_id));
3452                 log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
3453                 return;
3454             }
3455 
3456             // Close the read handle that the child is using. We won't be needing this.
3457             CloseHandle(ci->cpimport_stdin_Rd);
3458             // The write functions all want a FILE*
3459             ci->fdt[1] = _open_osfhandle((intptr_t)ci->cpimport_stdin_Wr, _O_APPEND);
3460             ci->filePtr = _fdopen(ci->fdt[1], "w");
3461 #else
3462             long maxFD = -1;
3463             maxFD = sysconf(_SC_OPEN_MAX);
3464 
3465             if (pipe(ci->fdt) == -1)
3466             {
3467                 int errnum = errno;
3468                 ostringstream oss;
3469                 oss << " : Error in creating pipe (errno-" <<
3470                     errnum << "); " << strerror(errnum);
3471                 setError(current_thd, ER_INTERNAL_ERROR, oss.str());
3472                 ci->singleInsert = true;
3473                 log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
3474                 return;
3475             }
3476 
3477             //cout << "maxFD = " << maxFD <<endl;
3478             errno = 0;
3479             pid_t aChPid = fork();
3480 
3481             if (aChPid == -1)	//an error caused
3482             {
3483                 int errnum = errno;
3484                 ostringstream oss;
3485                 oss << " : Error in forking cpimport.bin (errno-" <<
3486                     errnum << "); " << strerror(errnum);
3487                 setError(current_thd, ER_INTERNAL_ERROR, oss.str());
3488                 ci->singleInsert = true;
3489                 log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
3490                 return;
3491             }
3492             else if (aChPid == 0) // we are in child
3493             {
3494                 for (int i = 0; i < maxFD; i++)
3495                 {
3496                     if (i != ci->fdt[0])
3497                         close(i);
3498                 }
3499 
3500                 errno = 0;
3501 
3502                 if (dup2(ci->fdt[0], 0) < 0)	//make stdin be the reading end of the pipe
3503                 {
3504                     setError(current_thd, ER_INTERNAL_ERROR, "dup2 failed");
3505                     ci->singleInsert = true;
3506                     exit (1);
3507                 }
3508 
3509                 close(ci->fdt[0]);	// will trigger an EOF on stdin
3510                 ci->fdt[0] = -1;
3511                 open("/dev/null", O_WRONLY);
3512                 open("/dev/null", O_WRONLY);
3513                 errno = 0;
3514                 execvp(Cmds[0], &Cmds[0]);	//NOTE - works with full Path
3515 
3516                 int execvErrno = errno;
3517 
3518                 ostringstream oss;
3519                 oss << " : execvp error: cpimport.bin invocation failed; "
3520                     << "(errno-" << errno << "); " << strerror(execvErrno) <<
3521                     "; Check file and try invoking locally.";
3522                 cout << oss.str();
3523 
3524                 setError(current_thd, ER_INTERNAL_ERROR, "Forking process cpimport failed.");
3525                 ci->singleInsert = true;
3526                 log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG,
3527                 tid2sid(thd->thread_id));
3528                 exit(1);
3529             }
3530             else	// parent
3531             {
3532                 ci->filePtr = fdopen(ci->fdt[1], "w");
3533                 ci->cpimport_pid = aChPid;	// This is the child PID
3534                 close(ci->fdt[0]);	//close the READER of PARENT
3535                 ci->fdt[0] = -1;
3536                 // now we can send all the data thru FIFO[1], writer of PARENT
3537             }
3538             // Set read_set used for bulk insertion of Fields inheriting
3539             //from Field_blob|Field_varstring
3540             bitmap_set_all(table->read_set);
3541 #endif
3542         }
3543         else
3544         {
3545             if (!ci->dmlProc)
3546             {
3547                 ci->dmlProc = new MessageQueueClient("DMLProc");
3548             }
3549         }
3550     }
3551 
3552     //Save table oid for commit to use
3553     if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) ||  ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || ci->isCacheInsert)
3554     {
3555         // query stats. only collect execution time and rows inserted for insert/load_data_infile
3556         ci->stats.reset();
3557         ci->stats.setStartTime();
3558         if (thd->main_security_ctx.user)
3559         {
3560             ci->stats.fUser = thd->main_security_ctx.user;
3561         }
3562         else
3563         {
3564             ci->stats.fUser = "";
3565         }
3566 
3567         if (thd->main_security_ctx.host)
3568             ci->stats.fHost = thd->main_security_ctx.host;
3569         else if (thd->main_security_ctx.host_or_ip)
3570             ci->stats.fHost = thd->main_security_ctx.host_or_ip;
3571         else
3572             ci->stats.fHost = "unknown";
3573 
3574         ci->stats.fSessionID = thd->thread_id;
3575         ci->stats.fQuery = idb_mysql_query_str(thd);
3576 
3577         try
3578         {
3579             ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser);
3580         }
3581         catch (std::exception& e)
3582         {
3583             string msg = string("Columnstore User Priority - ") + e.what();
3584             push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
3585         }
3586 
3587         if ((thd->lex)->sql_command == SQLCOM_INSERT)
3588             ci->stats.fQueryType = CalpontSelectExecutionPlan::queryTypeToString(CalpontSelectExecutionPlan::INSERT);
3589         else if ((thd->lex)->sql_command == SQLCOM_LOAD)
3590             ci->stats.fQueryType = CalpontSelectExecutionPlan::queryTypeToString(CalpontSelectExecutionPlan::LOAD_DATA_INFILE);
3591 
3592         //@Bug 4387. Check BRM status before start statement.
3593         boost::scoped_ptr<DBRM> dbrmp(new DBRM());
3594         int rc = dbrmp->isReadWrite();
3595 
3596         if (rc != 0 )
3597         {
3598             setError(current_thd, ER_READ_ONLY_MODE, "Cannot execute the statement. DBRM is read only!");
3599             ci->rc = rc;
3600             ci->singleInsert = true;
3601             bitmap_clear_all(table->read_set);
3602             return;
3603         }
3604 
3605         uint32_t stateFlags;
3606         dbrmp->getSystemState(stateFlags);
3607 
3608         if (stateFlags & SessionManagerServer::SS_SUSPENDED)
3609         {
3610             setError(current_thd, ER_INTERNAL_ERROR, "Writing to the database is disabled.");
3611             bitmap_clear_all(table->read_set);
3612             return;
3613         }
3614 
3615         CalpontSystemCatalog::TableName tableName;
3616         tableName.schema = table->s->db.str;
3617         tableName.table = table->s->table_name.str;
3618 
3619         try
3620         {
3621             CalpontSystemCatalog::ROPair roPair = csc->tableRID(tableName, lower_case_table_names);
3622             ci->tableOid = roPair.objnum;
3623         }
3624         catch (IDBExcept& ie)
3625         {
3626             setError(thd, ER_INTERNAL_ERROR, ie.what());
3627             bitmap_clear_all(table->read_set);
3628         }
3629         catch (std::exception& ex)
3630         {
3631             bitmap_clear_all(table->read_set);
3632             setError(thd, ER_INTERNAL_ERROR,
3633                      logging::IDBErrorInfo::instance()->errorMsg(ERR_SYSTEM_CATALOG) + ex.what());
3634         }
3635     }
3636 
3637     if ( ci->rc != 0 )
3638         ci->rc = 0;
3639 }
3640 
3641 
3642 
ha_mcs_impl_end_bulk_insert(bool abort,TABLE * table)3643 int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table)
3644 {
3645     // Clear read_set used for bulk insertion of Fields inheriting
3646     //from Field_blob|Field_varstring
3647     bitmap_clear_all(table->read_set);
3648     THD* thd = current_thd;
3649 
3650     if (thd->slave_thread && !get_replication_slave(thd))
3651         return 0;
3652 
3653     std::string aTmpDir(startup::StartUp::tmpDir());
3654 
3655     if (get_fe_conn_info_ptr() == nullptr)
3656         set_fe_conn_info_ptr((void*)new cal_connection_info());
3657 
3658     cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
3659 
3660     int rc = 0;
3661 
3662     if (ci->rc == 5) //read only dbrm
3663         return rc;
3664 
3665     // @bug 2378. do not enter for select, reset singleInsert flag after multiple insert.
3666     // @bug 2515. Check command intead of vtable state
3667     if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) ||  ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT || ci->isCacheInsert) && !ci->singleInsert )
3668     {
3669         if (((ci->useCpimport == 2) ||
3670              ((ci->useCpimport == 1) && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))))) &&
3671             (!ci->singleInsert) &&
3672             ((ci->isLoaddataInfile) ||
3673              ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) ||
3674              ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || ci->isCacheInsert) )
3675         {
3676 #ifdef _MSC_VER
3677             if (thd->killed > 0)
3678             {
3679                 errno = 0;
3680                 // GenerateConsoleCtrlEvent sends a signal to cpimport
3681                 BOOL brtn = GenerateConsoleCtrlEvent(CTRL_BREAK_EVENT, ci->cpimportProcInfo.dwProcessId);
3682 
3683                 if (!brtn)
3684                 {
3685                     int errnum = GetLastError();
3686                     char errmsg[512];
3687                     FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, errnum, 0, errmsg, 512, NULL);
3688                     ostringstream oss;
3689                     oss << "GenerateConsoleCtrlEvent: (errno-" << errnum << "); " << errmsg;
3690                     log_this(thd, oss.str(), logging::LOG_TYPE_DEBUG,0);
3691                 }
3692 
3693                 // Close handles to the cpimport process and its primary thread.
3694                 fclose (ci->filePtr);
3695                 ci->filePtr = 0;
3696                 ci->fdt[1] = -1;
3697                 CloseHandle(ci->cpimportProcInfo.hProcess);
3698                 CloseHandle(ci->cpimportProcInfo.hThread);
3699                 WaitForSingleObject(ci->cpimportProcInfo.hProcess, INFINITE);
3700             }
3701 
3702 #else
3703 
3704             if ( (thd->killed > 0) && (ci->cpimport_pid > 0) ) //handle CTRL-C
3705             {
3706                 //cout << "sending ctrl-c to cpimport" << endl;
3707                 errno = 0;
3708                 kill( ci->cpimport_pid, SIGUSR1 );
3709                 fclose (ci->filePtr);
3710                 ci->filePtr = 0;
3711                 ci->fdt[1] = -1;
3712                 int aStatus;
3713                 waitpid(ci->cpimport_pid, &aStatus, 0); // wait until cpimport finishs
3714             }
3715 
3716 #endif
3717             else
3718             {
3719                 //tear down cpimport
3720 #ifdef _MSC_VER
3721                 fclose (ci->filePtr);
3722                 ci->filePtr = 0;
3723                 ci->fdt[1] = -1;
3724                 DWORD exitCode;
3725                 WaitForSingleObject(ci->cpimportProcInfo.hProcess, INFINITE);
3726                 GetExitCodeProcess(ci->cpimportProcInfo.hProcess, &exitCode);
3727 
3728                 if (exitCode != 0)
3729                 {
3730                     rc = 1;
3731                     setError(thd, ER_INTERNAL_ERROR, "load failed. The detailed error information is listed in InfiniDBLog.txt.");
3732                 }
3733 
3734                 // Close handles to the cpimport process and its primary thread.
3735                 CloseHandle(ci->cpimportProcInfo.hProcess);
3736                 CloseHandle(ci->cpimportProcInfo.hThread);
3737 #else
3738                 fclose (ci->filePtr);
3739                 ci->filePtr = 0;
3740                 ci->fdt[1] = -1;
3741                 int aStatus;
3742                 pid_t aPid = waitpid(ci->cpimport_pid, &aStatus, 0); // wait until cpimport finishs
3743 
3744                 if ((aPid == ci->cpimport_pid) || (aPid == -1))
3745                 {
3746                     ci->cpimport_pid = 0;
3747 
3748                     if ((WIFEXITED(aStatus)) && (WEXITSTATUS(aStatus) == 0))
3749                     {
3750                         //cout << "\tCpimport exit on success" << endl;
3751                     }
3752                     else
3753                     {
3754                         if (WEXITSTATUS(aStatus) == 2)
3755                         {
3756                             rc = 1;
3757                             ifstream dmlFile;
3758                             ostringstream oss;
3759                             oss << aTmpDir << ci->tableOid << ".txt";
3760                             dmlFile.open(oss.str().c_str());
3761 
3762                             if (dmlFile.is_open())
3763                             {
3764                                 string line;
3765                                 getline(dmlFile, line);
3766                                 setError(thd, ER_INTERNAL_ERROR, line);
3767                                 dmlFile.close();
3768                                 remove (oss.str().c_str());
3769                             }
3770                         }
3771                         else
3772                         {
3773                             rc = 1;
3774                             ifstream dmlFile;
3775                             ostringstream oss;
3776                             oss << aTmpDir << ci->tableOid << ".txt";
3777                             dmlFile.open(oss.str().c_str());
3778 
3779                             if (dmlFile.is_open())
3780                             {
3781                                 string line;
3782                                 getline(dmlFile, line);
3783                                 setError(thd, ER_INTERNAL_ERROR, line);
3784                                 dmlFile.close();
3785                                 remove (oss.str().c_str());
3786                             }
3787                             else
3788                                 setError(thd, ER_INTERNAL_ERROR, "load failed. The detailed error information is listed in err.log.");
3789                         }
3790                     }
3791                 }
3792 
3793 #endif
3794                 if ( rc == 0)
3795                 {
3796                     log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
3797                 }
3798                 else
3799                 {
3800                     log_this(thd, "End SQL statement with error", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
3801                 }
3802 
3803                 ci->columnTypes.clear();
3804                 //get extra warning count if any
3805                 ifstream dmlFile;
3806                 ostringstream oss;
3807                 oss << aTmpDir << ci->tableOid << ".txt";
3808                 dmlFile.open(oss.str().c_str());
3809                 int totalWarnCount = 0;
3810                 int colWarns = 0;
3811                 string line;
3812 
3813                 if (dmlFile.is_open())
3814                 {
3815                     while (getline(dmlFile, line))
3816                     {
3817                         colWarns = atoi(line.c_str());
3818                         totalWarnCount += colWarns;
3819                     }
3820 
3821                     dmlFile.close();
3822                     remove (oss.str().c_str());
3823 
3824                     for (int i = 0; i < totalWarnCount; i++)
3825                         push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, "Values saturated");
3826                 }
3827             }
3828         }
3829         else
3830         {
3831             if ( thd->killed > 0 )
3832                 abort = true;
3833 
3834             if ( !ci->dmlProc )
3835             {
3836                 ci->dmlProc = new MessageQueueClient("DMLProc");
3837                 //cout << "end_bulk_insert starts a client " << ci->dmlProc << " for session " << thd->thread_id << endl;
3838             }
3839 
3840             if (((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || ((thd->lex)->sql_command == SQLCOM_LOAD))
3841                 rc = ha_mcs_impl_write_last_batch(table, *ci, abort);
3842         }
3843     }
3844 
3845     // populate query stats for insert and load data infile. insert select has
3846     // stats entered in sm already
3847     if (((thd->lex)->sql_command == SQLCOM_INSERT) ||
3848         ((thd->lex)->sql_command == SQLCOM_LOAD) ||
3849         ci->isCacheInsert)
3850     {
3851         ci->stats.setEndTime();
3852         ci->stats.fErrorNo = rc;
3853 
3854         if (ci->singleInsert)
3855             ci->stats.fRows = 1;
3856         else
3857             ci->stats.fRows = ci->rowsHaveInserted;
3858 
3859         try
3860         {
3861             ci->stats.insert();
3862         }
3863         catch (std::exception& e)
3864         {
3865             string msg = string("Columnstore Query Stats - ") + e.what();
3866             push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
3867         }
3868     }
3869 
3870     // MCOL-4002 We earlier had these re-initializations set only for
3871     // non-transactions, i.e.:
3872     // !(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
3873     // However, we should be resetting these members anyways.
3874     ci->singleInsert = true; // reset the flag
3875     ci->isLoaddataInfile = false;
3876     ci->isCacheInsert = false;
3877     ci->tableOid = 0;
3878     ci->rowsHaveInserted = 0;
3879     ci->useCpimport = 1;
3880 
3881     return rc;
3882 }
3883 
ha_mcs_impl_commit(handlerton * hton,THD * thd,bool all)3884 int ha_mcs_impl_commit (handlerton* hton, THD* thd, bool all)
3885 {
3886     if (get_fe_conn_info_ptr() == nullptr)
3887         set_fe_conn_info_ptr((void*)new cal_connection_info());
3888 
3889     cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
3890 
3891     if (ci->isAlter)
3892         return 0;
3893 
3894     //@Bug 5823 check if any active transaction for this session
3895     boost::scoped_ptr<DBRM> dbrmp(new DBRM());
3896     BRM::TxnID txnId = dbrmp->getTxnID(tid2sid(thd->thread_id));
3897 
3898     if (!txnId.valid)
3899         return 0;
3900 
3901     if ( !ci->dmlProc )
3902     {
3903         ci->dmlProc = new MessageQueueClient("DMLProc");
3904         //cout << "commit starts a client " << ci->dmlProc << " for session " << thd->thread_id << endl;
3905     }
3906 
3907     int rc = ha_mcs_impl_commit_(hton, thd, all, *ci);
3908     thd->server_status &= ~SERVER_STATUS_IN_TRANS;
3909     ci->singleInsert = true; // reset the flag
3910     ci->isLoaddataInfile = false;
3911     ci->isCacheInsert = false;
3912     ci->tableOid = 0;
3913     ci->rowsHaveInserted = 0;
3914     return rc;
3915 }
3916 
ha_mcs_impl_rollback(handlerton * hton,THD * thd,bool all)3917 int ha_mcs_impl_rollback (handlerton* hton, THD* thd, bool all)
3918 {
3919     if (get_fe_conn_info_ptr() == nullptr)
3920         set_fe_conn_info_ptr((void*)new cal_connection_info());
3921 
3922     cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
3923 
3924     if ( !ci->dmlProc )
3925     {
3926 
3927         ci->dmlProc = new MessageQueueClient("DMLProc");
3928     }
3929 
3930     int rc = ha_mcs_impl_rollback_(hton, thd, all, *ci);
3931     ci->singleInsert = true; // reset the flag
3932     ci->isLoaddataInfile = false;
3933     ci->isCacheInsert = false;
3934     ci->tableOid = 0;
3935     ci->rowsHaveInserted = 0;
3936     thd->server_status &= ~SERVER_STATUS_IN_TRANS;
3937     return rc;
3938 }
3939 
ha_mcs_impl_close_connection(handlerton * hton,THD * thd)3940 int ha_mcs_impl_close_connection (handlerton* hton, THD* thd)
3941 {
3942     if (!thd) return 0;
3943 
3944     if (thd->thread_id == 0)
3945         return 0;
3946 
3947     execplan::CalpontSystemCatalog::removeCalpontSystemCatalog(tid2sid(thd->thread_id));
3948 
3949     // MCOL-3247 Use THD::ha_data as a per-plugin per-session
3950     // storage. Filled in external_lock when we remove a lock
3951     // from vtable(lock_type = 2)
3952     // An ugly way. I will use ha_data w/o external_lock.
3953     // This in MCOL-2178
3954     cal_connection_info* ci = nullptr;
3955     if(thd_get_ha_data(thd, hton) != (void*)0x42) // 0x42 is the magic CS sets when setup hton
3956     {
3957         ci = reinterpret_cast<cal_connection_info*>(thd_get_ha_data(thd, hton));
3958     }
3959 
3960     if (!ci) return 0;
3961 
3962     int rc = 0;
3963 
3964     if ( ci->dmlProc )
3965     {
3966         rc = ha_mcs_impl_close_connection_(hton, thd, *ci);
3967         delete ci->dmlProc;
3968         ci->dmlProc = nullptr;
3969     }
3970 
3971     if (ci->cal_conn_hndl)
3972     {
3973         sm::sm_cleanup(ci->cal_conn_hndl);
3974         ci->cal_conn_hndl = 0;
3975     }
3976 
3977     return rc;
3978 }
3979 
ha_mcs_impl_rename_table(const char * from,const char * to)3980 int ha_mcs_impl_rename_table(const char* from, const char* to)
3981 {
3982     IDEBUG( cout << "ha_mcs_impl_rename_table: " << from << " => " << to << endl );
3983 
3984     if (get_fe_conn_info_ptr() == nullptr)
3985         set_fe_conn_info_ptr((void*)new cal_connection_info());
3986 
3987     cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
3988 
3989     //@Bug 1948. Alter table call rename table twice
3990     if ( ci->alterTableState == cal_connection_info::ALTER_FIRST_RENAME )
3991     {
3992         ci->alterTableState = cal_connection_info::ALTER_SECOND_RENAME;
3993         IDEBUG( cout << "ha_mcs_impl_rename_table: was in state ALTER_FIRST_RENAME, now in ALTER_SECOND_RENAME" << endl );
3994         return 0;
3995     }
3996     else if (ci->alterTableState == cal_connection_info::ALTER_SECOND_RENAME)
3997     {
3998         ci->alterTableState = cal_connection_info::NOT_ALTER;
3999         IDEBUG( cout << "ha_mcs_impl_rename_table: was in state ALTER_SECOND_RENAME, now in NOT_ALTER" << endl );
4000         return 0;
4001     }
4002 
4003     int rc = ha_mcs_impl_rename_table_(from, to, *ci);
4004     return rc;
4005 }
4006 
ha_mcs_impl_delete_row(const uchar * buf)4007 int ha_mcs_impl_delete_row(const uchar* buf)
4008 {
4009     IDEBUG( cout << "ha_mcs_impl_delete_row" << endl );
4010     return 0;
4011 }
4012 
ha_mcs_impl_cond_push(COND * cond,TABLE * table,std::vector<COND * > & condStack)4013 COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table, std::vector<COND*>& condStack)
4014 {
4015     THD* thd = current_thd;
4016 
4017     if (((thd->lex)->sql_command == SQLCOM_UPDATE) ||
4018             ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI) ||
4019             ((thd->lex)->sql_command == SQLCOM_DELETE) ||
4020             ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI))
4021     {
4022         condStack.push_back(cond);
4023         return nullptr;
4024     }
4025 
4026     string alias;
4027     alias.assign(table->alias.ptr(), table->alias.length());
4028     IDEBUG( cout << "ha_mcs_impl_cond_push: " << alias << endl );
4029 
4030     if (get_fe_conn_info_ptr() == nullptr)
4031         set_fe_conn_info_ptr((void*)new cal_connection_info());
4032 
4033     cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
4034 
4035     cal_table_info ti = ci->tableMap[table];
4036 
4037 #ifdef DEBUG_WALK_COND
4038     {
4039         gp_walk_info gwi;
4040         gwi.condPush = true;
4041         gwi.sessionid = tid2sid(thd->thread_id);
4042         cout << "------------------ cond push -----------------------" << endl;
4043         cond->traverse_cond(debug_walk, &gwi, Item::POSTFIX);
4044         cout << "------------------------------------------------\n" << endl;
4045     }
4046 #endif
4047 
4048     if (!ti.csep)
4049     {
4050         if (!ti.condInfo)
4051             ti.condInfo = new gp_walk_info();
4052 
4053         gp_walk_info* gwi = ti.condInfo;
4054         gwi->dropCond = false;
4055         gwi->fatalParseError = false;
4056         gwi->condPush = true;
4057         gwi->thd = thd;
4058         gwi->sessionid = tid2sid(thd->thread_id);
4059         cond->traverse_cond(gp_walk, gwi, Item::POSTFIX);
4060         ci->tableMap[table] = ti;
4061 
4062         if (gwi->fatalParseError)
4063         {
4064             IDEBUG( cout << gwi->parseErrorText << endl );
4065 
4066             if (ti.condInfo)
4067             {
4068                 delete ti.condInfo;
4069                 ti.condInfo = nullptr;
4070                 ci->tableMap[table] = ti;
4071             }
4072 
4073             return cond;
4074         }
4075 
4076         if (gwi->dropCond)
4077         {
4078             return cond;
4079         }
4080         else
4081         {
4082             return nullptr;
4083         }
4084     }
4085 
4086     return cond;
4087 }
4088 
impl_external_lock(THD * thd,TABLE * table,int lock_type)4089 int ha_mcs::impl_external_lock(THD* thd, TABLE* table, int lock_type)
4090 {
4091     // @bug 3014. Error out locking table command. IDB does not support it now.
4092     if (thd->lex->sql_command == SQLCOM_LOCK_TABLES)
4093     {
4094         setError(current_thd, ER_CHECK_NOT_IMPLEMENTED,
4095                  logging::IDBErrorInfo::instance()->errorMsg(ERR_LOCK_TABLE));
4096         return ER_CHECK_NOT_IMPLEMENTED;
4097     }
4098 
4099     // @info called for every table at the beginning and at the end of a query.
4100     // used for cleaning up the tableinfo.
4101     string alias;
4102     alias.assign(table->alias.ptr(), table->alias.length());
4103     IDEBUG( cout << "external_lock for " << alias << endl );
4104 
4105     if (get_fe_conn_info_ptr() == nullptr)
4106         set_fe_conn_info_ptr((void*)new cal_connection_info());
4107 
4108     cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
4109 
4110     if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
4111     {
4112         ci->physTablesList.clear();
4113         ci->tableMap.clear();
4114         force_close_fep_conn(thd, ci);
4115         return 0;
4116     }
4117 
4118     m_lock_type= lock_type;
4119 
4120     CalTableMap::iterator mapiter = ci->tableMap.find(table);
4121     // make sure this is a release lock (2nd) call called in
4122     // the table mode.
4123     if (mapiter != ci->tableMap.end()
4124         && (mapiter->second.condInfo || mapiter->second.csep)
4125         && lock_type == 2)
4126     {
4127         // CS ends up processing query with handlers
4128         // table mode
4129         if (mapiter->second.conn_hndl)
4130         {
4131             if (ci->traceFlags & 1)
4132                 push_warning(thd, Sql_condition::WARN_LEVEL_NOTE, 9999, mapiter->second.conn_hndl->queryStats.c_str());
4133 
4134             ci->queryStats = mapiter->second.conn_hndl->queryStats;
4135             ci->extendedStats = mapiter->second.conn_hndl->extendedStats;
4136             ci->miniStats = mapiter->second.conn_hndl->miniStats;
4137             sm::sm_cleanup(mapiter->second.conn_hndl);
4138             mapiter->second.conn_hndl = nullptr;
4139         }
4140 
4141         if (mapiter->second.condInfo)
4142         {
4143             delete mapiter->second.condInfo;
4144             mapiter->second.condInfo = nullptr;
4145         }
4146 
4147         // MCOL-2178 Check for tableMap size to set this only once.
4148         ci->queryState = 0;
4149         // Clean up the tableMap and physTablesList
4150         ci->tableMap.erase(table);
4151         ci->physTablesList.erase(table);
4152         thd->variables.in_subquery_conversion_threshold = IN_SUBQUERY_CONVERSION_THRESHOLD;
4153         restore_optimizer_flags(thd);
4154     }
4155     else
4156     {
4157         if (lock_type == 0)
4158         {
4159             ci->physTablesList.insert(table);
4160             // MCOL-2178 Disable Conversion of Big IN Predicates Into Subqueries
4161             thd->variables.in_subquery_conversion_threshold=~ 0;
4162             // Early optimizer_switch changes to avoid unsupported opt-s.
4163             mutate_optimizer_flags(thd);
4164         }
4165         else if (lock_type == 2)
4166         {
4167             std::set<TABLE*>::iterator iter = ci->physTablesList.find(table);
4168             if (iter != ci->physTablesList.end())
4169             {
4170                 ci->physTablesList.erase(table);
4171             }
4172 
4173             // CS ends up processing query with handlers
4174             if (iter != ci->physTablesList.end() && ci->physTablesList.empty())
4175             {
4176                 if (!ci->cal_conn_hndl)
4177                     return 0;
4178 
4179                 if (ci->traceFlags & 1)
4180                     push_warning(thd, Sql_condition::WARN_LEVEL_NOTE, 9999, ci->cal_conn_hndl->queryStats.c_str());
4181 
4182                 ci->queryStats = ci->cal_conn_hndl->queryStats;
4183                 ci->extendedStats = ci->cal_conn_hndl->extendedStats;
4184                 ci->miniStats = ci->cal_conn_hndl->miniStats;
4185                 ci->queryState = 0;
4186                 // MCOL-3247 Use THD::ha_data as a per-plugin per-session
4187                 // storage for cal_conn_hndl to use it later in close_connection
4188                 thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr());
4189                 // Clean up all tableMap entries made by cond_push
4190                 for (auto &tme: ci->tableMap)
4191                 {
4192                     if (tme.second.condInfo)
4193                     {
4194                         delete tme.second.condInfo;
4195                         tme.second.condInfo= nullptr;
4196                     }
4197                 }
4198                 ci->tableMap.clear();
4199                 // MCOL-2178 Enable Conversion of Big IN Predicates Into Subqueries
4200                 thd->variables.in_subquery_conversion_threshold = IN_SUBQUERY_CONVERSION_THRESHOLD;
4201                 restore_optimizer_flags(thd);
4202             }
4203 
4204         }
4205     }
4206 
4207     return 0;
4208 }
4209 
4210 // for sorting length exceeds blob limit. Just error out for now.
ha_mcs_impl_rnd_pos(uchar * buf,uchar * pos)4211 int ha_mcs_impl_rnd_pos(uchar* buf, uchar* pos)
4212 {
4213     IDEBUG( cout << "ha_mcs_impl_rnd_pos" << endl);
4214     string emsg = logging::IDBErrorInfo::instance()->errorMsg(ERR_ORDERBY_TOO_BIG);
4215     setError(current_thd, ER_INTERNAL_ERROR, emsg);
4216     return ER_INTERNAL_ERROR;
4217 }
4218 
4219 /*@brief ha_mcs_impl_group_by_init - Get data for MariaDB group_by
4220     pushdown handler */
4221 /***********************************************************
4222  * DESCRIPTION:
4223  * Prepares data for group_by_handler::next_row() calls.
4224  * PARAMETERS:
4225  *    group_hand - group by handler, that preserves initial table and items lists. .
4226  *    table - TABLE pointer The table to save the result set into.
4227  * RETURN:
4228  *    0 if success
4229  *    others if something went wrong whilst getting the result set
4230  ***********************************************************/
ha_mcs_impl_group_by_init(mcs_handler_info * handler_info,TABLE * table)4231 int ha_mcs_impl_group_by_init(mcs_handler_info *handler_info, TABLE* table)
4232 {
4233     ha_mcs_group_by_handler *group_hand=
4234       reinterpret_cast<ha_mcs_group_by_handler*>(handler_info->hndl_ptr);
4235     string tableName = group_hand->table_list->table->s->table_name.str;
4236     IDEBUG( cout << "group_by_init for table " << tableName << endl );
4237     THD* thd = current_thd;
4238 
4239     //check whether the system is ready to process statement.
4240 #ifndef _MSC_VER
4241     static DBRM dbrm(true);
4242     int bSystemQueryReady = dbrm.getSystemQueryReady();
4243 
4244     if (bSystemQueryReady == 0)
4245     {
4246         // Still not ready
4247         setError(thd, ER_INTERNAL_ERROR, "The system is not yet ready to accept queries");
4248         return ER_INTERNAL_ERROR;
4249     }
4250     else if (bSystemQueryReady < 0)
4251     {
4252         // Still not ready
4253         setError(thd, ER_INTERNAL_ERROR, "DBRM is not responding. Cannot accept queries");
4254         return ER_INTERNAL_ERROR;
4255     }
4256 
4257 #endif
4258 
4259     uint32_t sessionID = tid2sid(thd->thread_id);
4260     boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
4261     csc->identity(CalpontSystemCatalog::FE);
4262 
4263     if (get_fe_conn_info_ptr() == nullptr)
4264         set_fe_conn_info_ptr((void*)new cal_connection_info());
4265 
4266     cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
4267 
4268     idbassert(ci != 0);
4269 
4270 
4271     if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
4272     {
4273         force_close_fep_conn(thd, ci);
4274         return 0;
4275     }
4276 
4277     sm::tableid_t tableid = 0;
4278     cal_table_info ti;
4279     cal_group_info gi;
4280     sm::cpsm_conhdl_t* hndl;
4281     SCSEP csep;
4282 
4283     bool localQuery = get_local_query(thd);
4284 
4285     {
4286         ci->stats.reset(); // reset query stats
4287         ci->stats.setStartTime();
4288         if (thd->main_security_ctx.user)
4289         {
4290             ci->stats.fUser = thd->main_security_ctx.user;
4291         }
4292         else
4293         {
4294             ci->stats.fUser = "";
4295         }
4296 
4297         if (thd->main_security_ctx.host)
4298             ci->stats.fHost = thd->main_security_ctx.host;
4299         else if (thd->main_security_ctx.host_or_ip)
4300             ci->stats.fHost = thd->main_security_ctx.host_or_ip;
4301         else
4302             ci->stats.fHost = "unknown";
4303 
4304         try
4305         {
4306             ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser);
4307         }
4308         catch (std::exception& e)
4309         {
4310             string msg = string("Columnstore User Priority - ") + e.what();
4311             ci->warningMsg = msg;
4312         }
4313 
4314         // If the previous query has error and
4315         // this is not a subquery run by the server(MCOL-1601)
4316         // re-establish the connection
4317         if (ci->queryState != 0)
4318         {
4319             if( ci->cal_conn_hndl_st.size() == 0 )
4320                 sm::sm_cleanup(ci->cal_conn_hndl);
4321             ci->cal_conn_hndl = 0;
4322         }
4323 
4324         sm::sm_init(sessionID, &ci->cal_conn_hndl, localQuery);
4325         idbassert(ci->cal_conn_hndl != 0);
4326         ci->cal_conn_hndl->csc = csc;
4327         idbassert(ci->cal_conn_hndl->exeMgr != 0);
4328 
4329         try
4330         {
4331             ci->cal_conn_hndl->connect();
4332         }
4333         catch (...)
4334         {
4335             setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_LOST_CONN_EXEMGR));
4336             CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
4337             goto error;
4338         }
4339 
4340         hndl = ci->cal_conn_hndl;
4341 
4342         ci->cal_conn_hndl_st.push(ci->cal_conn_hndl);
4343         if (!csep)
4344             csep.reset(new CalpontSelectExecutionPlan());
4345 
4346         SessionManager sm;
4347         BRM::TxnID txnID;
4348         txnID = sm.getTxnID(sessionID);
4349 
4350         if (!txnID.valid)
4351         {
4352             txnID.id = 0;
4353             txnID.valid = true;
4354         }
4355 
4356         QueryContext verID;
4357         verID = sm.verID();
4358 
4359         csep->txnID(txnID.id);
4360         csep->verID(verID);
4361         csep->sessionID(sessionID);
4362 
4363         if (group_hand->table_list->db.length)
4364             csep->schemaName(group_hand->table_list->db.str, lower_case_table_names);
4365 
4366         csep->traceFlags(ci->traceFlags);
4367 
4368         // MCOL-1052 Send Items lists down to the optimizer.
4369         gi.groupByTables = group_hand->table_list;
4370         gi.groupByFields = group_hand->select;
4371         gi.groupByWhere = group_hand->where;
4372         gi.groupByGroup = group_hand->group_by;
4373         gi.groupByOrder = group_hand->order_by;
4374         gi.groupByHaving = group_hand->having;
4375         gi.groupByDistinct = group_hand->distinct;
4376 
4377         // MCOL-1052 Send pushed conditions here, since server could omit GROUP BY
4378         // items in case of = or IN functions used on GROUP BY columns.
4379         {
4380             CalTableMap::iterator mapiter;
4381             execplan::CalpontSelectExecutionPlan::ColumnMap::iterator colMapIter;
4382             execplan::CalpontSelectExecutionPlan::ColumnMap::iterator condColMapIter;
4383             execplan::ParseTree* ptIt;
4384 
4385             for (TABLE_LIST* tl = gi.groupByTables; tl; tl = tl->next_local)
4386             {
4387                 mapiter = ci->tableMap.find(tl->table);
4388 
4389                 if (mapiter != ci->tableMap.end() && mapiter->second.condInfo != NULL
4390                         && mapiter->second.condInfo->condPush)
4391                 {
4392                     while (!mapiter->second.condInfo->ptWorkStack.empty())
4393                     {
4394                         ptIt = mapiter->second.condInfo->ptWorkStack.top();
4395                         mapiter->second.condInfo->ptWorkStack.pop();
4396                         gi.pushedPts.push_back(ptIt);
4397                     }
4398                 }
4399             }
4400         }
4401         // send plan whenever group_init is called
4402         int status = cp_get_group_plan(thd, csep, gi);
4403 
4404         // Never proceed if status != 0 to avoid empty DA
4405         // crashes on later stages
4406         if (status != 0)
4407             goto internal_error;
4408 
4409         // @bug 2547. don't need to send the plan if it's impossible where for all unions.
4410         // MCOL-2178 commenting the below out since cp_get_group_plan does not modify this variable
4411         // which has a default value of false
4412         //if (MIGR::infinidb_vtable.impossibleWhereOnUnion)
4413         //  return 0;
4414 
4415         string query;
4416         // Set the query text only once if the server executes
4417         // subqueries separately.
4418         if(ci->queryState)
4419             query.assign("<subquery of the previous>");
4420         else
4421             query.assign(thd->query_string.str(), thd->query_string.length());
4422         csep->data(query);
4423 
4424         try
4425         {
4426             csep->priority(	ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser));
4427         }
4428         catch (std::exception& e)
4429         {
4430             string msg = string("Columnstore User Priority - ") + e.what();
4431             push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
4432         }
4433 
4434 #ifdef PLAN_HEX_FILE
4435         // plan serialization
4436         string tmpDir = aTmpDir + "/li1-plan.hex";
4437 
4438         ifstream ifs(tmpDir);
4439         ByteStream bs1;
4440         ifs >> bs1;
4441         ifs.close();
4442         csep->unserialize(bs1);
4443 #endif
4444 
4445         if (ci->traceFlags & 1)
4446         {
4447             cerr << "---------------- EXECUTION PLAN ----------------" << endl;
4448             cerr << *csep << endl ;
4449             cerr << "-------------- EXECUTION PLAN END --------------\n" << endl;
4450         }
4451         else
4452         {
4453             IDEBUG( cout << "---------------- EXECUTION PLAN ----------------" << endl );
4454             IDEBUG( cerr << *csep << endl );
4455             IDEBUG( cout << "-------------- EXECUTION PLAN END --------------\n" << endl );
4456         }
4457     }// end of execution plan generation
4458 
4459     {
4460         ByteStream msg;
4461         ByteStream emsgBs;
4462 
4463         while (true)
4464         {
4465             try
4466             {
4467                 ByteStream::quadbyte qb = 4;
4468                 msg << qb;
4469                 hndl->exeMgr->write(msg);
4470                 msg.restart();
4471                 csep->rmParms(ci->rmParms);
4472 
4473                 //send plan
4474                 csep->serialize(msg);
4475                 hndl->exeMgr->write(msg);
4476 
4477                 //get ExeMgr status back to indicate a vtable joblist success or not
4478                 msg.restart();
4479                 emsgBs.restart();
4480                 msg = hndl->exeMgr->read();
4481                 emsgBs = hndl->exeMgr->read();
4482                 string emsg;
4483 
4484                 if (msg.length() == 0 || emsgBs.length() == 0)
4485                 {
4486                     emsg = "Lost connection to ExeMgr. Please contact your administrator";
4487                     setError(thd, ER_INTERNAL_ERROR, emsg);
4488                     return ER_INTERNAL_ERROR;
4489                 }
4490 
4491                 string emsgStr;
4492                 emsgBs >> emsgStr;
4493                 bool err = false;
4494 
4495                 if (msg.length() == 4)
4496                 {
4497                     msg >> qb;
4498 
4499                     if (qb != 0)
4500                     {
4501                         err = true;
4502                         // for makejoblist error, stats contains only error code and insert from here
4503                         // because table fetch is not started
4504                         ci->stats.setEndTime();
4505                         ci->stats.fQuery = csep->data();
4506                         ci->stats.fQueryType = csep->queryType();
4507                         ci->stats.fErrorNo = qb;
4508 
4509                         try
4510                         {
4511                             ci->stats.insert();
4512                         }
4513                         catch (std::exception& e)
4514                         {
4515                             string msg = string("Columnstore Query Stats - ") + e.what();
4516                             push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
4517                         }
4518                     }
4519                 }
4520                 else
4521                 {
4522                     err = true;
4523                 }
4524 
4525                 if (err)
4526                 {
4527                     setError(thd, ER_INTERNAL_ERROR, emsgStr);
4528                     return ER_INTERNAL_ERROR;
4529                 }
4530 
4531                 ci->rmParms.clear();
4532 
4533                 ci->queryState = 1;
4534 
4535                 break;
4536             }
4537             catch (...)
4538             {
4539                 sm::sm_cleanup(hndl);
4540                 hndl = 0;
4541 
4542                 sm::sm_init(sessionID, &hndl, localQuery);
4543                 idbassert(hndl != 0);
4544                 hndl->csc = csc;
4545 
4546                 ci->cal_conn_hndl = hndl;
4547                 ci->cal_conn_hndl_st.pop();
4548                 ci->cal_conn_hndl_st.push(ci->cal_conn_hndl);
4549                 try
4550                 {
4551                     hndl->connect();
4552                 }
4553                 catch (...)
4554                 {
4555                     setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_LOST_CONN_EXEMGR));
4556                     CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
4557                     goto error;
4558                 }
4559 
4560                 msg.restart();
4561             }
4562         }
4563     }
4564 
4565     // set query state to be in_process. Sometimes mysql calls rnd_init multiple
4566     // times, this makes sure plan only being generated and sent once. It will be
4567     // reset when query finishes in sm::end_query
4568 
4569     // common path for both vtable select phase and table mode -- open scan handle
4570     ti = ci->tableMap[table];
4571     ti.msTablePtr = table;
4572 
4573     {
4574         // MCOL-1601 Using stacks of ExeMgr conn hndls, table and scan contexts.
4575         ti.tpl_ctx = new sm::cpsm_tplh_t();
4576         ti.tpl_ctx_st.push(ti.tpl_ctx);
4577         ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t());
4578         ti.tpl_scan_ctx_st.push(ti.tpl_scan_ctx);
4579 
4580         // make sure rowgroup is null so the new meta data can be taken. This is for some case mysql
4581         // call rnd_init for a table more than once.
4582         ti.tpl_scan_ctx->rowGroup = nullptr;
4583 
4584         try
4585         {
4586             tableid = execplan::IDB_VTABLE_ID;
4587         }
4588         catch (...)
4589         {
4590             string emsg = "No table ID found for table " + string(table->s->table_name.str);
4591             setError(thd, ER_INTERNAL_ERROR, emsg);
4592             CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
4593             goto internal_error;
4594         }
4595 
4596         try
4597         {
4598             sm::tpl_open(tableid, ti.tpl_ctx, hndl);
4599             sm::tpl_scan_open(tableid, ti.tpl_scan_ctx, hndl);
4600         }
4601         catch (std::exception& e)
4602         {
4603             string emsg = "table can not be opened: " + string(e.what());
4604             setError(thd, ER_INTERNAL_ERROR, emsg);
4605             CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
4606             goto internal_error;
4607         }
4608         catch (...)
4609         {
4610             string emsg = "table can not be opened";
4611             setError(thd, ER_INTERNAL_ERROR, emsg);
4612             CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
4613             goto internal_error;
4614         }
4615 
4616         ti.tpl_scan_ctx->traceFlags = ci->traceFlags;
4617 
4618         if ((ti.tpl_scan_ctx->ctp).size() == 0)
4619         {
4620             uint32_t num_attr = table->s->fields;
4621 
4622             for (uint32_t i = 0; i < num_attr; i++)
4623             {
4624                 CalpontSystemCatalog::ColType ctype;
4625                 ti.tpl_scan_ctx->ctp.push_back(ctype);
4626             }
4627         }
4628     }
4629 
4630     ci->tableMap[table] = ti;
4631     return 0;
4632 
4633 error:
4634 
4635     if (ci->cal_conn_hndl)
4636     {
4637         // end_query() should be called here.
4638         sm::sm_cleanup(ci->cal_conn_hndl);
4639         ci->cal_conn_hndl = 0;
4640     }
4641 
4642     // do we need to close all connection handle of the table map?
4643     return ER_INTERNAL_ERROR;
4644 
4645 internal_error:
4646 
4647     if (ci->cal_conn_hndl)
4648     {
4649         // end_query() should be called here.
4650         sm::sm_cleanup(ci->cal_conn_hndl);
4651         ci->cal_conn_hndl = 0;
4652     }
4653 
4654     return ER_INTERNAL_ERROR;
4655 }
4656 
4657 /*@brief ha_mcs_impl_group_by_next - Return result set for MariaDB group_by
4658     pushdown handler
4659 */
4660 /***********************************************************
4661  * DESCRIPTION:
4662  * Return a result record for each group_by_handler::next_row() call.
4663  * PARAMETERS:
4664  *    group_hand - group by handler, that preserves initial table and items lists. .
4665  *    table - TABLE pointer The table to save the result set in.
4666  * RETURN:
4667  *    0 if success
4668  *    HA_ERR_END_OF_FILE if the record set has come to an end
4669  *    others if something went wrong whilst getting the result set
4670  ***********************************************************/
ha_mcs_impl_group_by_next(TABLE * table)4671 int ha_mcs_impl_group_by_next(TABLE* table)
4672 {
4673     THD* thd = current_thd;
4674 
4675     if (((thd->lex)->sql_command == SQLCOM_UPDATE)  || ((thd->lex)->sql_command == SQLCOM_DELETE) ||
4676             ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI))
4677         return HA_ERR_END_OF_FILE;
4678 
4679     if (thd->slave_thread && !get_replication_slave(thd) && (
4680                 thd->lex->sql_command == SQLCOM_INSERT ||
4681                 thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
4682                 thd->lex->sql_command == SQLCOM_UPDATE ||
4683                 thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
4684                 thd->lex->sql_command == SQLCOM_DELETE ||
4685                 thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
4686                 thd->lex->sql_command == SQLCOM_TRUNCATE ||
4687                 thd->lex->sql_command == SQLCOM_LOAD))
4688         return HA_ERR_END_OF_FILE;
4689 
4690     // @bug 2547
4691     //  MCOL-2178
4692     //  if (MIGR::infinidb_vtable.impossibleWhereOnUnion)
4693     //    return HA_ERR_END_OF_FILE;
4694 
4695     if (get_fe_conn_info_ptr() == nullptr)
4696         set_fe_conn_info_ptr((void*)new cal_connection_info());
4697 
4698     cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
4699 
4700     if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
4701     {
4702         force_close_fep_conn(thd, ci);
4703         return 0;
4704     }
4705 
4706     if (ci->alterTableState > 0) return HA_ERR_END_OF_FILE;
4707 
4708     cal_table_info ti;
4709     ti = ci->tableMap[table];
4710     int rc = HA_ERR_END_OF_FILE;
4711 
4712     if (!ti.tpl_ctx || !ti.tpl_scan_ctx)
4713     {
4714         CalpontSystemCatalog::removeCalpontSystemCatalog(tid2sid(thd->thread_id));
4715         return ER_INTERNAL_ERROR;
4716     }
4717 
4718     idbassert(ti.msTablePtr == table);
4719 
4720     try
4721     {
4722         // fetchNextRow interface forces to use buf.
4723         unsigned char buf;
4724         rc = fetchNextRow(&buf, ti, ci, true);
4725     }
4726     catch (std::exception& e)
4727     {
4728         string emsg = string("Error while fetching from ExeMgr: ") + e.what();
4729         setError(thd, ER_INTERNAL_ERROR, emsg);
4730         CalpontSystemCatalog::removeCalpontSystemCatalog(tid2sid(thd->thread_id));
4731         return ER_INTERNAL_ERROR;
4732     }
4733 
4734     ci->tableMap[table] = ti;
4735 
4736     if (rc != 0 && rc != HA_ERR_END_OF_FILE)
4737     {
4738         string emsg;
4739 
4740         // remove this check when all error handling migrated to the new framework.
4741         if (rc >= 1000)
4742             emsg = ti.tpl_scan_ctx->errMsg;
4743         else
4744         {
4745             logging::ErrorCodes errorcodes;
4746             emsg = errorcodes.errorString(rc);
4747         }
4748 
4749         setError(thd, ER_INTERNAL_ERROR, emsg);
4750         ci->stats.fErrorNo = rc;
4751         CalpontSystemCatalog::removeCalpontSystemCatalog(tid2sid(thd->thread_id));
4752         rc = ER_INTERNAL_ERROR;
4753     }
4754 
4755     return rc;
4756 }
4757 
ha_mcs_impl_group_by_end(TABLE * table)4758 int ha_mcs_impl_group_by_end(TABLE* table)
4759 {
4760     int rc = 0;
4761     THD* thd = current_thd;
4762 
4763     if (thd->slave_thread && !get_replication_slave(thd) && (
4764                 thd->lex->sql_command == SQLCOM_INSERT ||
4765                 thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
4766                 thd->lex->sql_command == SQLCOM_UPDATE ||
4767                 thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
4768                 thd->lex->sql_command == SQLCOM_DELETE ||
4769                 thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
4770                 thd->lex->sql_command == SQLCOM_TRUNCATE ||
4771                 thd->lex->sql_command == SQLCOM_LOAD))
4772         return 0;
4773 
4774     // MCOL-2178 isUnion member only assigned, never used
4775     //    MIGR::infinidb_vtable.isUnion = false;
4776 
4777     cal_connection_info* ci = nullptr;
4778 
4779     if (get_fe_conn_info_ptr() != NULL)
4780         ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
4781 
4782     if (!ci)
4783     {
4784         set_fe_conn_info_ptr((void*)new cal_connection_info());
4785         ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
4786     }
4787 
4788     if (((thd->lex)->sql_command == SQLCOM_INSERT) ||
4789             ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) )
4790     {
4791         force_close_fep_conn(thd, ci, true); // with checking prev command rc
4792         return rc;
4793     }
4794 
4795     if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
4796     {
4797         force_close_fep_conn(thd, ci);
4798         // clear querystats because no query stats available for cancelled query
4799         ci->queryStats = "";
4800         // Poping next ExeMgr connection out of the stack
4801         if ( ci->cal_conn_hndl_st.size() )
4802         {
4803             ci->cal_conn_hndl_st.pop();
4804             if ( ci->cal_conn_hndl_st.size() )
4805                 ci->cal_conn_hndl = ci->cal_conn_hndl_st.top();
4806         }
4807 
4808         return 0;
4809     }
4810 
4811     IDEBUG( cerr << "group_by_end for table " << table->s->table_name.str << endl );
4812 
4813     cal_table_info ti = ci->tableMap[table];
4814     sm::cpsm_conhdl_t* hndl;
4815     bool clearScanCtx = false;
4816 
4817     hndl = ci->cal_conn_hndl;
4818 
4819     if (ti.tpl_ctx)
4820     {
4821         if (ti.tpl_scan_ctx.get())
4822         {
4823             clearScanCtx = ( (ti.tpl_scan_ctx.get()->rowsreturned) &&
4824                 ti.tpl_scan_ctx.get()->rowsreturned == ti.tpl_scan_ctx.get()->getRowCount() );
4825             try
4826             {
4827                 sm::tpl_scan_close(ti.tpl_scan_ctx);
4828             }
4829             catch (...)
4830             {
4831                 rc = ER_INTERNAL_ERROR;
4832             }
4833         }
4834 
4835         ti.tpl_scan_ctx.reset();
4836         if ( ti.tpl_scan_ctx_st.size() )
4837         {
4838             ti.tpl_scan_ctx_st.pop();
4839             if ( ti.tpl_scan_ctx_st.size() )
4840                 ti.tpl_scan_ctx =  ti.tpl_scan_ctx_st.top();
4841         }
4842         try
4843         {
4844             if(hndl)
4845             {
4846                 {
4847                     bool ask_4_stats = (ci->traceFlags) ? true : false;
4848                     sm::tpl_close(ti.tpl_ctx, &hndl, ci->stats, ask_4_stats, clearScanCtx);
4849                 }
4850 // Normaly stats variables are set in external_lock method but we set it here
4851 // since they we pretend we are in vtable_disabled mode and the stats vars won't be set.
4852 // We sum the stats up here since server could run a number of
4853 // queries e.g. each for a subquery in a filter.
4854                 if(hndl)
4855                 {
4856                     if (hndl->queryStats.length())
4857                         ci->queryStats += hndl->queryStats;
4858                     if (hndl->extendedStats.length())
4859                         ci->extendedStats += hndl->extendedStats;
4860                     if (hndl->miniStats.length())
4861                         ci->miniStats += hndl->miniStats;
4862                 }
4863             }
4864 
4865             ci->cal_conn_hndl = hndl;
4866 
4867             ti.tpl_ctx = 0;
4868         }
4869         catch (IDBExcept& e)
4870         {
4871             if (e.errorCode() == ERR_CROSS_ENGINE_CONNECT || e.errorCode() == ERR_CROSS_ENGINE_CONFIG)
4872             {
4873                 string msg = string("Columnstore Query Stats - ") + e.what();
4874                 push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
4875             }
4876             else
4877             {
4878                 setError(thd, ER_INTERNAL_ERROR, e.what());
4879                 rc = ER_INTERNAL_ERROR;
4880             }
4881         }
4882         catch (std::exception& e)
4883         {
4884             setError(thd, ER_INTERNAL_ERROR, e.what());
4885             rc = ER_INTERNAL_ERROR;
4886         }
4887         catch (...)
4888         {
4889             setError(thd, ER_INTERNAL_ERROR, "Internal error throwed in group_by_end");
4890             rc = ER_INTERNAL_ERROR;
4891         }
4892     }
4893 
4894     ti.tpl_ctx = 0;
4895 
4896     if ( ti.tpl_ctx_st.size() )
4897     {
4898         ti.tpl_ctx_st.pop();
4899         if ( ti.tpl_ctx_st.size() )
4900             ti.tpl_ctx = ti.tpl_ctx_st.top();
4901     }
4902 
4903     if ( ci->cal_conn_hndl_st.size() )
4904     {
4905         ci->cal_conn_hndl_st.pop();
4906         if ( ci->cal_conn_hndl_st.size() )
4907             ci->cal_conn_hndl = ci->cal_conn_hndl_st.top();
4908     }
4909 
4910     ci->tableMap[table] = ti;
4911 
4912     // push warnings from CREATE phase
4913     if (!ci->warningMsg.empty())
4914         push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, ci->warningMsg.c_str());
4915 
4916     ci->warningMsg.clear();
4917     // reset expressionId just in case
4918     ci->expressionId = 0;
4919     return rc;
4920 }
4921 
4922 
4923 /*@brief  Initiate the query for derived_handler           */
4924 /***********************************************************
4925  * DESCRIPTION:
4926  * Execute the query and saves derived table query.
4927  * There is an extra handler argument so I ended up with a
4928  * new init function. The code is a copy of
4929  * ha_mcs_impl_rnd_init() mostly.
4930  * PARAMETERS:
4931  * mcs_handler_info* pnt to an envelope struct
4932  * TABLE* table - dest table to put the results into
4933  * RETURN:
4934  *    rc as int
4935  ***********************************************************/
ha_mcs_impl_pushdown_init(mcs_handler_info * handler_info,TABLE * table)4936 int ha_mcs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table)
4937 {
4938     IDEBUG( cout << "pushdown_init for table " << endl );
4939     THD* thd = current_thd;
4940 
4941     if (thd->slave_thread && !get_replication_slave(thd) && (
4942                 thd->lex->sql_command == SQLCOM_INSERT ||
4943                 thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
4944                 thd->lex->sql_command == SQLCOM_UPDATE ||
4945                 thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
4946                 thd->lex->sql_command == SQLCOM_DELETE ||
4947                 thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
4948                 thd->lex->sql_command == SQLCOM_TRUNCATE ||
4949                 thd->lex->sql_command == SQLCOM_LOAD))
4950         return 0;
4951 
4952     gp_walk_info gwi;
4953     gwi.thd = thd;
4954     bool err = false;
4955 
4956     //check whether the system is ready to process statement.
4957 #ifndef _MSC_VER
4958     static DBRM dbrm(true);
4959     int bSystemQueryReady = dbrm.getSystemQueryReady();
4960 
4961     if (bSystemQueryReady == 0)
4962     {
4963         // Still not ready
4964         setError(thd, ER_INTERNAL_ERROR, "The system is not yet ready to accept queries");
4965         return ER_INTERNAL_ERROR;
4966     }
4967     else if (bSystemQueryReady < 0)
4968     {
4969         // Still not ready
4970         setError(thd, ER_INTERNAL_ERROR, "DBRM is not responding. Cannot accept queries");
4971         return ER_INTERNAL_ERROR;
4972     }
4973 #endif
4974 
4975     // Set this to close all outstanding FEP connections on
4976     // client disconnect in handlerton::closecon_handlerton().
4977     if ( !thd_get_ha_data(thd, mcs_hton))
4978     {
4979         thd_set_ha_data(thd, mcs_hton, reinterpret_cast<void*>(0x42));
4980     }
4981 
4982     if ( (thd->lex)->sql_command == SQLCOM_ALTER_TABLE )
4983     {
4984         return 0;
4985     }
4986 
4987     // MCOL-4023 We need to test this code path.
4988     //Update and delete code
4989     if ( ((thd->lex)->sql_command == SQLCOM_UPDATE)  || ((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI))
4990         return doUpdateDelete(thd, gwi, std::vector<COND*>());
4991 
4992     uint32_t sessionID = tid2sid(thd->thread_id);
4993     boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
4994     csc->identity(CalpontSystemCatalog::FE);
4995 
4996     if (!get_fe_conn_info_ptr())
4997         set_fe_conn_info_ptr(reinterpret_cast<void*>(new cal_connection_info(), thd));
4998 
4999     cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
5000 
5001     idbassert(ci != 0);
5002 
5003     if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
5004     {
5005         if (ci->cal_conn_hndl)
5006         {
5007             // send ExeMgr a signal before closing the connection
5008             ByteStream msg;
5009             ByteStream::quadbyte qb = 0;
5010             msg << qb;
5011 
5012             try
5013             {
5014                 ci->cal_conn_hndl->exeMgr->write(msg);
5015             }
5016             catch (...)
5017             {
5018                 // canceling query. ignore connection failure.
5019             }
5020 
5021             sm::sm_cleanup(ci->cal_conn_hndl);
5022             ci->cal_conn_hndl = 0;
5023         }
5024 
5025         return 0;
5026     }
5027 
5028     sm::tableid_t tableid = 0;
5029     cal_table_info ti;
5030     sm::cpsm_conhdl_t* hndl;
5031     SCSEP csep;
5032     // Declare handlers ptrs in this scope for future use.
5033     select_handler* sh = nullptr;
5034     derived_handler* dh = nullptr;
5035 
5036     // update traceFlags according to the autoswitch state.
5037     ci->traceFlags = (ci->traceFlags | CalpontSelectExecutionPlan::TRACE_TUPLE_OFF)^
5038                      CalpontSelectExecutionPlan::TRACE_TUPLE_OFF;
5039 
5040     bool localQuery = (get_local_query(thd) > 0 ? true : false);
5041 
5042     {
5043         ci->stats.reset(); // reset query stats
5044         ci->stats.setStartTime();
5045         if (thd->main_security_ctx.user)
5046         {
5047             ci->stats.fUser = thd->main_security_ctx.user;
5048         }
5049         else
5050         {
5051             ci->stats.fUser = "";
5052         }
5053 
5054         if (thd->main_security_ctx.host)
5055             ci->stats.fHost = thd->main_security_ctx.host;
5056         else if (thd->main_security_ctx.host_or_ip)
5057             ci->stats.fHost = thd->main_security_ctx.host_or_ip;
5058         else
5059             ci->stats.fHost = "unknown";
5060 
5061         try
5062         {
5063             ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser);
5064         }
5065         catch (std::exception& e)
5066         {
5067             string msg = string("Columnstore User Priority - ") + e.what();
5068             ci->warningMsg = msg;
5069         }
5070 
5071         // if the previous query has error, re-establish the connection
5072         if (ci->queryState != 0)
5073         {
5074             sm::sm_cleanup(ci->cal_conn_hndl);
5075             ci->cal_conn_hndl = 0;
5076         }
5077 
5078         sm::sm_init(sessionID, &ci->cal_conn_hndl, localQuery);
5079         idbassert(ci->cal_conn_hndl != 0);
5080         ci->cal_conn_hndl->csc = csc;
5081         idbassert(ci->cal_conn_hndl->exeMgr != 0);
5082 
5083         try
5084         {
5085             ci->cal_conn_hndl->connect();
5086         }
5087         catch (...)
5088         {
5089             setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_LOST_CONN_EXEMGR));
5090             CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
5091             goto error;
5092         }
5093 
5094         hndl = ci->cal_conn_hndl;
5095 
5096         IDEBUG( std::cout << idb_mysql_query_str(thd) << std::endl );
5097 
5098         {
5099             if (!csep)
5100                 csep.reset(new CalpontSelectExecutionPlan());
5101 
5102             SessionManager sm;
5103             BRM::TxnID txnID;
5104             txnID = sm.getTxnID(sessionID);
5105 
5106             if (!txnID.valid)
5107             {
5108                 txnID.id = 0;
5109                 txnID.valid = true;
5110             }
5111 
5112             QueryContext verID;
5113             verID = sm.verID();
5114 
5115             csep->txnID(txnID.id);
5116             csep->verID(verID);
5117             csep->sessionID(sessionID);
5118 
5119             if (thd->db.length)
5120                 csep->schemaName(thd->db.str, lower_case_table_names);
5121 
5122             csep->traceFlags(ci->traceFlags);
5123 
5124             // cast the handler and get a plan.
5125             int status = 42;
5126             if (handler_info->hndl_type == mcs_handler_types_t::SELECT)
5127             {
5128                 sh = reinterpret_cast<select_handler*>(handler_info->hndl_ptr);
5129                 status = cs_get_select_plan(sh, thd, csep, gwi);
5130             }
5131             else if (handler_info->hndl_type == DERIVED)
5132             {
5133                 dh = reinterpret_cast<derived_handler*>(handler_info->hndl_ptr);
5134                 status = cs_get_derived_plan(dh, thd, csep, gwi);
5135             }
5136 
5137             // Return an error to avoid MDB crash later in end_statement
5138             if (status != 0)
5139                 goto internal_error;
5140 
5141             string query;
5142             query.assign(idb_mysql_query_str(thd));
5143             csep->data(query);
5144 
5145             try
5146             {
5147                 csep->priority(	ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser));
5148             }
5149             catch (std::exception& e)
5150             {
5151                 string msg = string("Columnstore User Priority - ") + e.what();
5152                 push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
5153             }
5154 
5155 // DRRTUY Make this runtime configureable
5156 #ifdef PLAN_HEX_FILE
5157             // plan serialization
5158             ifstream ifs("/tmp/li1-plan.hex");
5159             ByteStream bs1;
5160             ifs >> bs1;
5161             ifs.close();
5162             csep->unserialize(bs1);
5163 #endif
5164 
5165             if (ci->traceFlags & 1)
5166             {
5167                 cerr << "---------------- EXECUTION PLAN ----------------" << endl;
5168                 cerr << *csep << endl ;
5169                 cerr << "-------------- EXECUTION PLAN END --------------\n" << endl;
5170             }
5171             else
5172             {
5173                 IDEBUG( cout << "---------------- EXECUTION PLAN ----------------" << endl );
5174                 IDEBUG( cerr << *csep << endl );
5175                 IDEBUG( cout << "-------------- EXECUTION PLAN END --------------\n" << endl );
5176             }
5177         }
5178     }// end of execution plan generation
5179 
5180     {
5181         ByteStream msg;
5182         ByteStream emsgBs;
5183 
5184         while (true)
5185         {
5186             try
5187             {
5188                 ByteStream::quadbyte qb = 4;
5189                 msg << qb;
5190                 hndl->exeMgr->write(msg);
5191                 msg.restart();
5192                 csep->rmParms(ci->rmParms);
5193 
5194                 //send plan
5195                 csep->serialize(msg);
5196                 hndl->exeMgr->write(msg);
5197 
5198                 //get ExeMgr status back to indicate a vtable joblist success or not
5199                 msg.restart();
5200                 emsgBs.restart();
5201                 msg = hndl->exeMgr->read();
5202                 emsgBs = hndl->exeMgr->read();
5203                 string emsg;
5204 
5205                 if (msg.length() == 0 || emsgBs.length() == 0)
5206                 {
5207                     emsg = "Lost connection to ExeMgr. Please contact your administrator";
5208                     setError(thd, ER_INTERNAL_ERROR, emsg);
5209                     return ER_INTERNAL_ERROR;
5210                 }
5211 
5212                 string emsgStr;
5213                 emsgBs >> emsgStr;
5214 
5215                 if (msg.length() == 4)
5216                 {
5217                     msg >> qb;
5218 
5219                     if (qb != 0)
5220                     {
5221                         err = true;
5222                         // for makejoblist error, stats contains only error code and insert from here
5223                         // because table fetch is not started
5224                         ci->stats.setEndTime();
5225                         ci->stats.fQuery = csep->data();
5226                         ci->stats.fQueryType = csep->queryType();
5227                         ci->stats.fErrorNo = qb;
5228 
5229                         try
5230                         {
5231                             ci->stats.insert();
5232                         }
5233                         catch (std::exception& e)
5234                         {
5235                             string msg = string("Columnstore Query Stats - ") + e.what();
5236                             push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
5237                         }
5238                     }
5239                 }
5240                 else
5241                 {
5242                     err = true;
5243                 }
5244 
5245                 if (err)
5246                 {
5247                     // CS resets error in create_SH() if fallback is enabled
5248                     setError(thd, ER_INTERNAL_ERROR, emsgStr);
5249                     goto internal_error;
5250                 }
5251 
5252                 ci->rmParms.clear();
5253 
5254                 // SH will initiate SM in select_next() only
5255                 if (!sh)
5256                     ci->queryState= sm::QUERY_IN_PROCESS;
5257 
5258                 break;
5259             }
5260             catch (...)
5261             {
5262                 sm::sm_cleanup(hndl);
5263                 hndl = 0;
5264 
5265                 sm::sm_init(sessionID, &hndl, localQuery);
5266                 idbassert(hndl != 0);
5267                 hndl->csc = csc;
5268 
5269                 ci->cal_conn_hndl = hndl;
5270 
5271                 try
5272                 {
5273                     hndl->connect();
5274                 }
5275                 catch (...)
5276                 {
5277                     setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_LOST_CONN_EXEMGR));
5278                     CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
5279                     goto error;
5280                 }
5281 
5282                 msg.restart();
5283             }
5284         }
5285     }
5286 
5287     // set query state to be in_process. Sometimes mysql calls rnd_init multiple
5288     // times, this makes sure plan only being generated and sent once. It will be
5289     // reset when query finishes in sm::end_query
5290 
5291     // common path for both vtable select phase and table mode -- open scan handle
5292     ti = ci->tableMap[table];
5293     // This is the server's temp table for the result.
5294     if(sh)
5295     {
5296         ti.msTablePtr = sh->table;
5297     }
5298     else
5299     {
5300         ti.msTablePtr = dh->table;
5301     }
5302 
5303     // For SH CS creates SM environment inside select_next().
5304     // This allows us to try and fail with SH.
5305     if (!sh)
5306     {
5307         if (ti.tpl_ctx == 0)
5308         {
5309             ti.tpl_ctx = new sm::cpsm_tplh_t();
5310             ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t());
5311         }
5312 
5313         // make sure rowgroup is null so the new meta data can be taken. This is for some case mysql
5314         // call rnd_init for a table more than once.
5315         ti.tpl_scan_ctx->rowGroup = nullptr;
5316 
5317         try
5318         {
5319             tableid = execplan::IDB_VTABLE_ID;
5320         }
5321         catch (...)
5322         {
5323             string emsg = "No table ID found for table " + string(table->s->table_name.str);
5324             setError(thd, ER_INTERNAL_ERROR, emsg);
5325             CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
5326             goto internal_error;
5327         }
5328 
5329         try
5330         {
5331             sm::tpl_open(tableid, ti.tpl_ctx, hndl);
5332             sm::tpl_scan_open(tableid, ti.tpl_scan_ctx, hndl);
5333         }
5334         catch (std::exception& e)
5335         {
5336             string emsg = "table can not be opened: " + string(e.what());
5337             setError(thd, ER_INTERNAL_ERROR, emsg);
5338             CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
5339             goto internal_error;
5340         }
5341         catch (...)
5342         {
5343             string emsg = "table can not be opened";
5344             setError(thd, ER_INTERNAL_ERROR, emsg);
5345             CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
5346             goto internal_error;
5347         }
5348 
5349         ti.tpl_scan_ctx->traceFlags = ci->traceFlags;
5350 
5351         if ((ti.tpl_scan_ctx->ctp).size() == 0)
5352         {
5353             uint32_t num_attr = table->s->fields;
5354 
5355             for (uint32_t i = 0; i < num_attr; i++)
5356             {
5357                 CalpontSystemCatalog::ColType ctype;
5358                 ti.tpl_scan_ctx->ctp.push_back(ctype);
5359             }
5360         }
5361         ci->tableMap[table] = ti;
5362     }
5363 
5364     return 0;
5365 
5366 error:
5367 
5368     if (ci->cal_conn_hndl)
5369     {
5370         sm::sm_cleanup(ci->cal_conn_hndl);
5371         ci->cal_conn_hndl = 0;
5372     }
5373 
5374     // do we need to close all connection handle of the table map
5375     return ER_INTERNAL_ERROR;
5376 
5377 internal_error:
5378 
5379     if (ci->cal_conn_hndl)
5380     {
5381         sm::sm_cleanup(ci->cal_conn_hndl);
5382         ci->cal_conn_hndl = 0;
5383     }
5384 
5385     return ER_INTERNAL_ERROR;
5386 }
5387 
ha_mcs_impl_select_next(uchar * buf,TABLE * table)5388 int ha_mcs_impl_select_next(uchar* buf, TABLE* table)
5389 {
5390     THD* thd = current_thd;
5391 
5392     if (thd->slave_thread && !get_replication_slave(thd) && (
5393                 thd->lex->sql_command == SQLCOM_INSERT ||
5394                 thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
5395                 thd->lex->sql_command == SQLCOM_UPDATE ||
5396                 thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
5397                 thd->lex->sql_command == SQLCOM_DELETE ||
5398                 thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
5399                 thd->lex->sql_command == SQLCOM_TRUNCATE ||
5400                 thd->lex->sql_command == SQLCOM_LOAD))
5401         return HA_ERR_END_OF_FILE;
5402 
5403     if (((thd->lex)->sql_command == SQLCOM_UPDATE)  || ((thd->lex)->sql_command == SQLCOM_DELETE) ||
5404             ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI))
5405         return HA_ERR_END_OF_FILE;
5406 
5407     if (get_fe_conn_info_ptr() == nullptr)
5408         set_fe_conn_info_ptr((void*)new cal_connection_info());
5409 
5410     cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
5411 
5412     int rc = HA_ERR_END_OF_FILE;
5413 
5414     // @bug 2547
5415     // MCOL-2178 This variable can never be true in the scope of this function
5416     //    if (MIGR::infinidb_vtable.impossibleWhereOnUnion)
5417     //        return HA_ERR_END_OF_FILE;
5418 
5419     if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
5420     {
5421         force_close_fep_conn(thd, ci);
5422         return 0;
5423     }
5424 
5425     if (ci->alterTableState > 0) return rc;
5426 
5427     cal_table_info ti;
5428     ti= ci->tableMap[table];
5429     // This is the server's temp table for the result.
5430     ti.msTablePtr= table;
5431     sm::tableid_t tableid= execplan::IDB_VTABLE_ID;
5432     sm::cpsm_conhdl_t* hndl= ci->cal_conn_hndl;
5433 
5434     if (!ti.tpl_ctx || !ti.tpl_scan_ctx || (hndl && hndl->queryState == sm::NO_QUERY))
5435     {
5436         if (ti.tpl_ctx == 0)
5437         {
5438             ti.tpl_ctx = new sm::cpsm_tplh_t();
5439             ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t());
5440         }
5441 
5442         // make sure rowgroup is null so the new meta data can be taken. This is for some case mysql
5443         // call rnd_init for a table more than once.
5444         ti.tpl_scan_ctx->rowGroup = nullptr;
5445 
5446         try
5447         {
5448             sm::tpl_open(tableid, ti.tpl_ctx, hndl);
5449             sm::tpl_scan_open(tableid, ti.tpl_scan_ctx, hndl);
5450         }
5451         catch (std::exception& e)
5452         {
5453             uint32_t sessionID = tid2sid(thd->thread_id);
5454             string emsg = "table can not be opened: " + string(e.what());
5455             setError(thd, ER_INTERNAL_ERROR, emsg);
5456             CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
5457             goto internal_error;
5458         }
5459         catch (...)
5460         {
5461             uint32_t sessionID = tid2sid(thd->thread_id);
5462             string emsg = "table can not be opened";
5463             setError(thd, ER_INTERNAL_ERROR, emsg);
5464             CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
5465             goto internal_error;
5466         }
5467 
5468         ti.tpl_scan_ctx->traceFlags = ci->traceFlags;
5469 
5470         if ((ti.tpl_scan_ctx->ctp).size() == 0)
5471         {
5472             uint32_t num_attr = table->s->fields;
5473 
5474             for (uint32_t i = 0; i < num_attr; i++)
5475             {
5476                 CalpontSystemCatalog::ColType ctype;
5477                 ti.tpl_scan_ctx->ctp.push_back(ctype);
5478             }
5479         }
5480         ci->tableMap[table] = ti;
5481         hndl->queryState= sm::QUERY_IN_PROCESS;
5482     }
5483 
5484     if (!ti.tpl_ctx || !ti.tpl_scan_ctx)
5485     {
5486         uint32_t sessionID = tid2sid(thd->thread_id);
5487         CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
5488         return ER_INTERNAL_ERROR;
5489     }
5490 
5491     idbassert(ti.msTablePtr == table);
5492 
5493     try
5494     {
5495         rc = fetchNextRow(buf, ti, ci);
5496     }
5497     catch (std::exception& e)
5498     {
5499         uint32_t sessionID = tid2sid(thd->thread_id);
5500         string emsg = string("Error while fetching from ExeMgr: ") + e.what();
5501         setError(thd, ER_INTERNAL_ERROR, emsg);
5502         CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
5503         return ER_INTERNAL_ERROR;
5504     }
5505 
5506     ci->tableMap[table]= ti;
5507 
5508     if (rc != 0 && rc != HA_ERR_END_OF_FILE)
5509     {
5510         string emsg;
5511 
5512         // remove this check when all error handling migrated to the new framework.
5513         if (rc >= 1000)
5514             emsg = ti.tpl_scan_ctx->errMsg;
5515         else
5516         {
5517             logging::ErrorCodes errorcodes;
5518             emsg = errorcodes.errorString(rc);
5519         }
5520 
5521         uint32_t sessionID = tid2sid(thd->thread_id);
5522         setError(thd, ER_INTERNAL_ERROR, emsg);
5523         ci->stats.fErrorNo = rc;
5524         CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
5525         rc = ER_INTERNAL_ERROR;
5526     }
5527 
5528     return rc;
5529 
5530 internal_error:
5531 
5532     if (ci->cal_conn_hndl)
5533     {
5534         sm::sm_cleanup(ci->cal_conn_hndl);
5535         ci->cal_conn_hndl = 0;
5536     }
5537 
5538     return ER_INTERNAL_ERROR;
5539 
5540 }
5541 
5542 // vim:sw=4 ts=4:
5543