1 /* Copyright (C) 2014 InfiniDB, Inc.
2    Copyright (C) 2019 MariaDB Corporation
3 
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License
6    as published by the Free Software Foundation; version 2 of
7    the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17    MA 02110-1301, USA. */
18 
19 /*
20  * $Id: ha_mcs_dml.cpp 9711 2013-07-23 21:01:27Z chao $
21  */
22 
23 #include <my_config.h>
24 #include <string>
25 #include <iostream>
26 #include <stack>
27 #ifdef _MSC_VER
28 #include <unordered_map>
29 #include <unordered_set>
30 #else
31 #include <tr1/unordered_map>
32 #include <tr1/unordered_set>
33 #endif
34 #include <fstream>
35 #include <sstream>
36 #include <cerrno>
37 #include <cstring>
38 using namespace std;
39 
40 #include <boost/shared_ptr.hpp>
41 using namespace boost;
42 
43 #include "idb_mysql.h"
44 
45 #define NEED_CALPONT_INTERFACE
46 #include "ha_mcs_impl.h"
47 
48 #include "ha_mcs_impl_if.h"
49 using namespace cal_impl_if;
50 
51 #include "vendordmlstatement.h"
52 #include "calpontdmlpackage.h"
53 #include "calpontdmlfactory.h"
54 using namespace dmlpackage;
55 
56 #include "dmlpackageprocessor.h"
57 using namespace dmlpackageprocessor;
58 
59 #include "dataconvert.h"
60 using namespace dataconvert;
61 
62 #include "bytestream.h"
63 using namespace messageqcpp;
64 
65 #include "configcpp.h"
66 using namespace config;
67 
68 #include "calpontsystemcatalog.h"
69 using namespace execplan;
70 
71 #include "resourcemanager.h"
72 using namespace joblist;
73 //#include "stopwatch.h"
74 //using namespace logging;
75 
76 #include "dbrm.h"
77 
78 namespace
79 {
80 #define BATCH_INSERT_GROUP_ROWS_FOR_CACHE 100000
81 uint64_t fBatchInsertGroupRows = 0; // ResourceManager::instance()->getRowsPerBatch();
82 // HDFS is never used nowadays, so don't bother
83 bool useHdfs = false; // ResourceManager::instance()->useHdfs();
84 
85 //convenience fcn
tid2sid(const uint32_t tid)86 inline uint32_t tid2sid(const uint32_t tid)
87 {
88     return execplan::CalpontSystemCatalog::idb_tid2sid(tid);
89 }
90 
91 //StopWatch timer;
buildValueList(TABLE * table,cal_connection_info & ci)92 uint32_t buildValueList (TABLE* table, cal_connection_info& ci )
93 {
94     char attribute_buffer[1024];
95     String attribute(attribute_buffer, sizeof(attribute_buffer),
96                      &my_charset_bin);
97     uint32_t size = 0;
98     int columnPos = 0;
99     double dbval;
100     ci.nullValuesBitset.reset();
101 
102     for (Field** field = table->field; *field; field++)
103     {
104         if ((*field)->is_null())
105         {
106             ci.tableValuesMap[columnPos].push_back (""); //currently, empty string is treated as null.
107             ci.nullValuesBitset[columnPos] = true;
108         }
109         else
110         {
111             bitmap_set_bit(table->read_set, (*field)->field_index);
112             ci.nullValuesBitset[columnPos] = false;
113 
114             // @bug 3798 get real value for float/double type
115             if ((*field)->result_type() == REAL_RESULT)
116             {
117                 dbval = (*field)->val_real();
118                 //int maxlen = (*field)->max_display_length();
119                 const unsigned maxlen = 1024 + 1 + 1 + 1; //1 for leading zero, 1 for dp, 1 for null
120                 char buf[maxlen];
121                 memset(buf, 0, maxlen);
122                 snprintf(buf, maxlen, "%.1024f", dbval);
123                 ci.tableValuesMap[columnPos].push_back(buf);
124             }
125             else
126             {
127                 //fetch different data type
128                 (*field)->val_str(&attribute, &attribute);
129 
130                 if (attribute.length() == 0)
131                 {
132                     ci.tableValuesMap[columnPos].push_back (""); //currently, empty string is treated as null.
133                 }
134                 else
135                 {
136                     string val(attribute.ptr(), attribute.length());
137                     ci.tableValuesMap[columnPos].push_back(val);
138                 }
139             }
140         }
141 
142         ci.colNameList.push_back((*field)->field_name.str);
143 
144         columnPos++;
145     }
146 
147     size = ci.tableValuesMap[0].size();
148     return size;
149 }
150 
151 
ProcessCommandStatement(THD * thd,string & dmlStatement,cal_connection_info & ci,std::string schema="")152 int ProcessCommandStatement(THD* thd, string& dmlStatement, cal_connection_info& ci, std::string schema = "")
153 {
154     int rc = 0;
155 
156     ulong sessionID = tid2sid(thd->thread_id);
157 
158     CalpontDMLPackage* pDMLPackage;
159 
160     //@Bug 2721 and 2722. Log the statement before issuing commit/rollback
161     if ( dmlStatement == "LOGGING" )
162     {
163         char* query_char = idb_mysql_query_str(thd);
164         std::string query_str;
165         if (!query_char)
166         {
167             query_str = "<Replication event>";
168         }
169         else
170         {
171             query_str = query_char;
172         }
173         VendorDMLStatement cmdStmt(query_str, DML_COMMAND, sessionID);
174         cmdStmt.set_Logging( false );
175         pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(cmdStmt);
176         pDMLPackage->set_Logging( false );
177         pDMLPackage->set_SchemaName( schema );
178     }
179     else
180     {
181         VendorDMLStatement cmdStmt(dmlStatement, DML_COMMAND, sessionID);
182         pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(cmdStmt);
183     }
184 
185     pDMLPackage->setTableOid (ci.tableOid);
186 
187     if (!ci.singleInsert)
188     {
189         pDMLPackage->set_isBatchInsert(true);
190     }
191 
192     if (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
193         pDMLPackage->set_isAutocommitOn(true);
194 
195     if (useHdfs)
196         pDMLPackage->set_isAutocommitOn(true);
197 
198     ByteStream bytestream;
199     bytestream << static_cast<uint32_t>(sessionID);
200 
201     pDMLPackage->write(bytestream);
202     delete pDMLPackage;
203 
204     ByteStream::byte b = 0;
205     string errormsg;
206     ByteStream::octbyte rows;
207 
208     try
209     {
210         ci.dmlProc->write(bytestream);
211         bytestream = ci.dmlProc->read();
212 
213         if ( bytestream.length() == 0 )
214         {
215             rc = 1;
216             thd->killed = KILL_QUERY;
217             thd->get_stmt_da()->set_overwrite_status(true);
218 
219             thd->raise_error_printf(ER_INTERNAL_ERROR, "Lost connection to DMLProc [1]");
220         }
221         else
222         {
223             bytestream >> b;
224             bytestream >> rows;
225             bytestream >> errormsg;
226         }
227     }
228     catch (runtime_error&)
229     {
230         rc = 1 ;
231         thd->killed = KILL_QUERY;
232         thd->get_stmt_da()->set_overwrite_status(true);
233 
234         thd->raise_error_printf(ER_INTERNAL_ERROR, "Lost connection to DMLProc [2]");
235     }
236     catch (...)
237     {
238         rc = 1;
239         thd->killed = KILL_QUERY;
240         thd->get_stmt_da()->set_overwrite_status(true);
241 
242         thd->raise_error_printf(ER_INTERNAL_ERROR, "Caught unknown error");
243     }
244 
245     if (( b != 0 ) && (!thd->get_stmt_da()->is_set()))
246     {
247         rc = 1;
248         thd->killed = KILL_QUERY;
249         thd->raise_error_printf(ER_INTERNAL_ERROR, errormsg.c_str());
250     }
251 
252     delete ci.dmlProc;
253     ci.dmlProc = NULL;
254     return rc;
255 }
256 
doProcessInsertValues(TABLE * table,uint32_t size,cal_connection_info & ci,bool lastBatch=false)257 int doProcessInsertValues ( TABLE* table, uint32_t size, cal_connection_info& ci, bool lastBatch = false )
258 {
259     THD* thd = current_thd;
260     uint32_t sessionID = tid2sid(thd->thread_id);
261 
262     int rc = 0;
263 
264     char* query_char = idb_mysql_query_str(thd);
265     std::string query_str;
266     if (!query_char)
267     {
268         query_str = "<Replication event>";
269     }
270     else
271     {
272         query_str = query_char;
273     }
274 
275     VendorDMLStatement dmlStmts(query_str, DML_INSERT, table->s->table_name.str,
276                                 table->s->db.str, size, ci.colNameList.size(), ci.colNameList,
277                                 ci.tableValuesMap, ci.nullValuesBitset, sessionID);
278 
279     CalpontDMLPackage* pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(dmlStmts);
280     //@Bug 2466 Move the clean up earlier to avoid the second insert in another session to get the data
281     ci.tableValuesMap.clear();
282     ci.colNameList.clear();
283 
284     if (!pDMLPackage)
285     {
286         rc = -1;
287         string emsg("Calpont DML package cannot build. ");
288         thd->get_stmt_da()->set_overwrite_status(true);
289         thd->raise_error_printf(ER_INTERNAL_ERROR, emsg.c_str());
290         return rc;
291     }
292 
293     //@Bug 2721 and 2722 log batch insert statement only once in the bebug file.
294     if (( ( ci.isLoaddataInfile ) || ((ci.rowsHaveInserted + size) < ci.bulkInsertRows ) ) && (ci.rowsHaveInserted > 0))
295     {
296         pDMLPackage->set_Logging( false );
297         pDMLPackage->set_Logending( false );
298     }
299     else if (( ( ci.isLoaddataInfile ) || ((ci.rowsHaveInserted + size) < ci.bulkInsertRows ) ) && (ci.rowsHaveInserted == 0))
300     {
301         pDMLPackage->set_Logging( true );
302         pDMLPackage->set_Logending( false );
303     }
304 
305     if ( ci.singleInsert )
306     {
307         pDMLPackage->set_Logging( true );
308         pDMLPackage->set_Logending( true );
309     }
310 
311     if ( !ci.singleInsert )
312     {
313         pDMLPackage->set_isBatchInsert( true );
314     }
315 
316     if (thd->is_strict_mode())
317     {
318         pDMLPackage->set_isWarnToError( true );
319     }
320 
321     pDMLPackage->setTableOid (ci.tableOid);
322 
323     if (lastBatch)
324     {
325         pDMLPackage->set_Logending( true );
326 
327     }
328 
329     if (lastBatch && (ci.rowsHaveInserted > 0))
330         pDMLPackage->set_Logging( false );
331 
332     std::string name = table->s->table_name.str;
333     pDMLPackage->set_TableName(name);
334     name = table->s->db.str;
335     pDMLPackage->set_SchemaName(name);
336     pDMLPackage->set_TimeZone(thd->variables.time_zone->get_name()->ptr());
337 
338     if (thd->lex->sql_command == SQLCOM_INSERT_SELECT)
339         pDMLPackage->set_isInsertSelect(true);
340 
341     //Carry session autocommit info in the pkg to use in DMLProc
342     //cout << "Thread options = "  << thd->variables.option_bits << " and  OPTION_NOT_AUTOCOMMIT:OPTION_BEGIN = " << OPTION_NOT_AUTOCOMMIT << ":" << OPTION_BEGIN << endl;
343     if (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
344     {
345         //cout << "autocommit is on" << endl;
346         pDMLPackage->set_isAutocommitOn(true);
347     }
348     else if (useHdfs)
349     {
350         pDMLPackage->set_isAutocommitOn(true);
351     }
352 
353     ByteStream bytestream, bytestreamRcv;
354     bytestream << sessionID;
355 
356     pDMLPackage->write(bytestream);
357     delete pDMLPackage;
358 
359     ByteStream::byte b = 0;
360     string errormsg;
361     ByteStream::octbyte rows;
362 
363     try
364     {
365         ci.dmlProc->write(bytestream);
366         bytestreamRcv = ci.dmlProc->read();
367 
368         if ( bytestreamRcv.length() == 0 )
369         {
370             //check if it is first batch and DMLProc restarted. Only this case, get a new client and resend
371             if (ci.rowsHaveInserted == 0)
372             {
373                 delete ci.dmlProc;
374                 ci.dmlProc = new MessageQueueClient("DMLProc");
375 
376                 //cout << "doProcessInsertValues starts a client " << ci.dmlProc << " for session " << thd->thread_id << endl;
377                 try
378                 {
379                     ci.dmlProc->write(bytestream);
380                     bytestreamRcv = ci.dmlProc->read();
381 
382                     if ( bytestreamRcv.length() == 0 )
383                     {
384                         rc = -1;
385                         b = 1;
386                         errormsg = "Lost connection to DMLProc [3]";
387                     }
388                     else
389                     {
390                         bytestreamRcv >> b;
391                         bytestreamRcv >> rows;
392                         bytestreamRcv >> errormsg;
393                         rc = b;
394                     }
395                 }
396                 catch (runtime_error&)
397                 {
398                     rc = -1;
399                     thd->get_stmt_da()->set_overwrite_status(true);
400                     errormsg = "Lost connection to DMLProc [4]";
401                     b = 1;
402                 }
403             }
404         }
405         else
406         {
407             bytestreamRcv >> b;
408             bytestreamRcv >> rows;
409             bytestreamRcv >> errormsg;
410             rc = b;
411         }
412     }
413     catch (std::exception& rex)
414     {
415         //check if it is first batch and DMLProc restarted. Only this case, get a new client and resend
416         if (ci.rowsHaveInserted == 0)
417         {
418             delete ci.dmlProc;
419             ci.dmlProc = new MessageQueueClient("DMLProc");
420 
421             //cout << "doProcessInsertValues exception starts a client " << ci.dmlProc << " for session " << thd->thread_id << endl;
422             try
423             {
424                 ci.dmlProc->write(bytestream);
425                 bytestreamRcv = ci.dmlProc->read();
426 
427                 if ( bytestreamRcv.length() == 0 )
428                 {
429                     rc = -1;
430                     b = 1;
431                     errormsg = string("Lost connection to DMLProc after getting a new client [1:") + rex.what() + "]";
432                 }
433                 else
434                 {
435                     bytestreamRcv >> b;
436                     bytestreamRcv >> rows;
437                     bytestreamRcv >> errormsg;
438                     rc = b;
439                 }
440             }
441             catch (std::exception& rrex)
442             {
443                 rc = -1;
444                 thd->get_stmt_da()->set_overwrite_status(true);
445                 errormsg = string("Lost connection to DMLProc after getting a new client [2:") + rex.what() + " then " + rrex.what() + "]";
446                 b = 1;
447             }
448         }
449         else //really lost connection
450         {
451             rc = -1;
452             thd->get_stmt_da()->set_overwrite_status(true);
453             errormsg = string("Lost connection to DMLProc really [1:") + rex.what() + "]";
454             b = 1;
455         }
456     }
457     catch (...)
458     {
459         rc = -1;
460         thd->get_stmt_da()->set_overwrite_status(true);
461         errormsg = "Unknown error caught";
462         b = 1;
463     }
464 
465 
466     if ((b != 0) && (b != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING))
467     {
468         thd->get_stmt_da()->set_overwrite_status(true);
469         thd->raise_error_printf(ER_INTERNAL_ERROR, errormsg.c_str());
470 
471     }
472 
473     if ( b == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING )
474     {
475         if (!thd->is_strict_mode())
476         {
477             rc = 0;
478         }
479 
480         push_warning(thd, Sql_condition::WARN_LEVEL_WARN, ER_WARN_DATA_OUT_OF_RANGE, errormsg.c_str());
481     }
482 
483     if ( rc != 0 )
484         ci.rc = rc;
485 
486     if ( b == dmlpackageprocessor::DMLPackageProcessor::ACTIVE_TRANSACTION_ERROR )
487         rc = b;
488 
489     return rc;
490 }
491 
492 }
493 
ha_mcs_impl_write_last_batch(TABLE * table,cal_connection_info & ci,bool abort)494 int ha_mcs_impl_write_last_batch(TABLE* table, cal_connection_info& ci, bool abort)
495 {
496     int rc = 0;
497     THD* thd = current_thd;
498     std::string command;
499     uint32_t size =  ci.tableValuesMap[0].size();
500     //@Bug 2468. Add a logging statement command
501     command = "COMMIT";
502     std::string schema;
503     schema = table->s->db.str;
504 
505     //@Bug 6112. if no row to be insert and no rows have been inserted, no need to send to DMLProc
506     if ((size == 0) && (ci.rowsHaveInserted == 0))
507         return rc;
508 
509     //@Bug 2715 Check the saved error code.
510     //@Bug 4516 always send the last package to allow DMLProc receive all messages from WES
511     if (( ci.rc != 0 ) || abort )
512     {
513         if (abort) //@Bug 5285. abort is different from error, dmlproc only clean up when erroring out
514             rc = doProcessInsertValues( table, size, ci, true);
515 
516         //@Bug 2722 Log the statement into datamod log
517         //@Bug 4605 if error, rollback and no need to check whether the session is autocommit off
518 
519         command = "ROLLBACK";
520         rc = ProcessCommandStatement ( thd, command, ci, schema );
521         rc = ci.rc;
522         ci.rc = 0;
523 
524         if (size > 0 )
525         {
526             ci.tableValuesMap.clear();
527             ci.colNameList.clear();
528         }
529 
530         return rc;
531     }
532     else
533     {
534         rc = doProcessInsertValues( table, size, ci, true);
535     }
536 
537     if ( abort )
538     {
539         rc = 1;
540         thd->get_stmt_da()->set_overwrite_status(true);
541         std::string errormsg = "statement is aborted.";
542         thd->raise_error_printf(ER_INTERNAL_ERROR, errormsg.c_str());
543     }
544 
545     if ( rc == dmlpackageprocessor::DMLPackageProcessor::ACTIVE_TRANSACTION_ERROR  )
546         return rc;
547 
548     //@Bug 4605
549     int rc1 = 0;
550     if ( (rc == 0) && !abort && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))))
551     {
552         ci.rowsHaveInserted += size;
553         command = "COMMIT";
554         rc1 = ProcessCommandStatement ( thd, command, ci, schema );
555     }
556     else if (useHdfs)
557     {
558         ci.rowsHaveInserted += size;
559         command = "COMMIT";
560         rc1 = ProcessCommandStatement ( thd, command, ci, schema );
561     }
562     else if (( rc != 0) || abort )
563     {
564         command = "ROLLBACK";
565         rc1 = ProcessCommandStatement ( thd, command, ci, schema );
566     }
567     rc = max(rc, rc1);
568 
569     return rc;
570 
571 }
572 
ha_mcs_impl_write_row_(const uchar * buf,TABLE * table,cal_connection_info & ci,ha_rows & rowsInserted)573 int ha_mcs_impl_write_row_(const uchar* buf, TABLE* table, cal_connection_info& ci, ha_rows& rowsInserted)
574 {
575     int rc = 0;
576     //timer.start( "buildValueList");
577     ci.colNameList.clear();
578     THD* thd = current_thd;
579     uint32_t size = 0;
580     std::string schema;
581     schema = table->s->db.str;
582 
583     //@Bug 2086 Added syntax check for '\0'
584     try
585     {
586         size = buildValueList ( table, ci );
587     }
588     catch (runtime_error& rex)
589     {
590         rc = 1;
591         ci.rc = rc; //@Bug 2790 Save the error infomation.
592         thd->get_stmt_da()->set_overwrite_status(true);
593         thd->raise_error_printf(ER_INTERNAL_ERROR, rex.what());
594         return rc;
595     }
596 
597     if (fBatchInsertGroupRows == 0)
598     {
599         fBatchInsertGroupRows = ResourceManager::instance()->getRowsPerBatch();
600     }
601 
602     //timer.stop( "buildValueList");
603     if ( ci.singleInsert   // Single insert
604          || (( ci.bulkInsertRows > 0 ) && (( ( ci.rowsHaveInserted + size) >= ci.bulkInsertRows )
605              || ( (!ci.isCacheInsert && size >= fBatchInsertGroupRows) || (ci.isCacheInsert && size >= BATCH_INSERT_GROUP_ROWS_FOR_CACHE) )) )
606          //Insert with mutilple value case: processed batch by batch. Last batch is sent also.
607          || (( ci.bulkInsertRows == 0 ) && ( (!ci.isCacheInsert && size >= fBatchInsertGroupRows)
608              || (ci.isCacheInsert && size >= BATCH_INSERT_GROUP_ROWS_FOR_CACHE) ) )  ) // Load data in file is processed batch by batch
609     {
610         //timer.start( "DMLProc takes");
611         //cout <<" sending a batch to DMLProc ... The size is " << size << "  the current bulkInsertRows = " <<  ci.bulkInsertRows << endl;
612         //Build dmlpackage
613         if (( ci.bulkInsertRows > 0 ) && ( ( ci.rowsHaveInserted + size) >= ci.bulkInsertRows ))
614         {
615             rc = doProcessInsertValues( table, size, ci, true );
616         }
617         else
618         {
619             rc = doProcessInsertValues( table, size, ci );
620         }
621 
622         if ( rc == 0 )
623             rowsInserted = size;
624         else
625             ci.rc = rc;
626 
627         //@Bug 2481. The current active transaction needs to be released if autocommit is on
628         //@Bug 2438 Added a check for batchinsert's last batch to send commit if autocommit is on
629         std::string command;
630 
631         if ( ci.singleInsert  || ( (ci.bulkInsertRows > 0 ) && (( ci.rowsHaveInserted + size) >= ci.bulkInsertRows ) ) )
632         {
633             if ( thd->killed > 0 )
634             {
635                 command = "ROLLBACK";
636                 rc = ProcessCommandStatement ( thd, command, ci, schema );
637             }
638             else if (rc != dmlpackageprocessor::DMLPackageProcessor::ACTIVE_TRANSACTION_ERROR)
639             {
640                 //@Bug 4605
641                 int rc1 = 0;
642                 if ( rc != 0 )
643                 {
644                     command = "ROLLBACK";
645                     rc1 = ProcessCommandStatement ( thd, command, ci, schema );
646                 }
647                 else if (( rc == 0 ) && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))))
648                 {
649                     command = "COMMIT";
650                     rc1 = ProcessCommandStatement ( thd, command, ci, schema );
651                 }
652                 else if (useHdfs)
653                 {
654                     command = "COMMIT";
655                     rc1 = ProcessCommandStatement ( thd, command, ci, schema );
656                 }
657                 rc = max(rc, rc1);
658             }
659         }
660 
661         //timer.stop( "DMLProc takes");
662         //timer.finish();
663         return rc;
664 
665     }
666     else
667     {
668         return rc;
669     }
670 }
671 
ha_mcs_impl_write_batch_row_(const uchar * buf,TABLE * table,cal_impl_if::cal_connection_info & ci)672 int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::cal_connection_info& ci)
673 {
674     ByteStream rowData;
675     int rc = 0;
676     //std::ostringstream  data;
677     bool nullVal = false;
678     const uchar* bufHdr = buf;	// bit flag indicating a field is null. Only those fields that are nullable are represented.
679     int32_t headerByte = 0; // Current byte in the bufHdr
680     int32_t headerBit = 0;  // current bit in the bufHdr current byte.
681     uint16_t colpos = 0;
682     buf  = buf + ci.headerLength;  // Number of bytes used for null bits.
683     //@Bug 6122 if all columns have not null constraint, there is no information in the header
684     std::string escape;
685     char nullBits = *bufHdr++;
686 
687     if (!ci.useXbit)
688     {
689         // Skip the first bit. For some reason, mysql reserves the first bit of the first byte, unless there's a varchar column in the table.
690         nullBits = nullBits >> 1;
691         ++headerBit;
692     }
693 
694     while (colpos < ci.columnTypes.size()) //test bitmap for null values
695     {
696         uint8_t numLoop = 7;
697 
698         if ((ci.useXbit) || (colpos > 6))
699             numLoop++;
700 
701         for (uint8_t i = 0; i < numLoop; i++)
702         {
703             if (colpos == ci.columnTypes.size())
704                 break;
705 
706             //if a column has not null constraint, it will not be in the bit map
707             if (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT)
708             {
709                 if (ci.headerLength > 0 && headerByte >= ci.headerLength)
710                 {
711                     // We've used more null bits than allowed. Something is seriously wrong.
712                     std::string errormsg = "Null bit header is wrong size";
713                     setError(current_thd, ER_INTERNAL_ERROR, errormsg);
714                     return -1;
715                 }
716 
717                 nullVal = nullBits & 0x01;
718                 nullBits = nullBits >> 1;
719                 ++headerBit;
720 
721                 if (headerBit == 8)
722                 {
723                     nullBits = *bufHdr++;
724                     headerBit = 0;
725                     ++headerByte;
726                 }
727             }
728             else
729             {
730                 nullVal = false;
731             }
732 
733             switch (ci.columnTypes[colpos].colDataType)
734             {
735                 case CalpontSystemCatalog::DATE: //date fetch
736                 {
737 
738                     if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
739                     {
740                         fprintf(ci.filePtr, "%c", ci.delimiter);
741                     }
742                     else
743                     {
744                         const uchar* tmp1 = buf;
745                         uint32_t tmp = (tmp1[2] << 16) + (tmp1[1] << 8) + tmp1[0];
746 
747                         int day = tmp & 0x0000001fl;
748                         int month = (tmp >> 5) & 0x0000000fl;
749                         int year = tmp >> 9;
750                         fprintf(ci.filePtr, "%04d-%02d-%02d%c", year, month, day, ci.delimiter);
751                     }
752 
753                     buf += 3;
754                     break;
755                 }
756 
757                 case CalpontSystemCatalog::DATETIME:
758                 {
759                     if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
760                     {
761                         fprintf(ci.filePtr, "%c", ci.delimiter);
762 
763                         if (table->field[colpos]->real_type() == MYSQL_TYPE_DATETIME2)
764                             buf += table->field[colpos]->pack_length();
765                         else
766                             buf += 8;
767                     }
768                     else
769                     {
770                         if (table->field[colpos]->real_type() == MYSQL_TYPE_DATETIME2)
771                         {
772                             // mariadb 10.1 compatibility -- MYSQL_TYPE_DATETIME2 introduced in mysql 5.6
773                             MYSQL_TIME ltime;
774                             const uchar* pos = buf;
775                             longlong tmp = my_datetime_packed_from_binary(pos, table->field[colpos]->decimals());
776                             TIME_from_longlong_datetime_packed(&ltime, tmp);
777 
778                             if (!ltime.second_part)
779                             {
780                                 fprintf(ci.filePtr, "%04d-%02d-%02d %02d:%02d:%02d%c",
781                                         ltime.year, ltime.month, ltime.day,
782                                         ltime.hour, ltime.minute, ltime.second, ci.delimiter);
783                             }
784                             else
785                             {
786                                 fprintf(ci.filePtr, "%04d-%02d-%02d %02d:%02d:%02d.%ld%c",
787                                         ltime.year, ltime.month, ltime.day,
788                                         ltime.hour, ltime.minute, ltime.second,
789                                         ltime.second_part, ci.delimiter);
790                             }
791 
792                             buf += table->field[colpos]->pack_length();
793                         }
794                         else
795                         {
796                             long long value = *((long long*) buf);
797                             long datePart = (long) (value / 1000000ll);
798                             int day = datePart % 100;
799                             int month = (datePart / 100) % 100;
800                             int year = datePart / 10000;
801                             fprintf(ci.filePtr, "%04d-%02d-%02d ", year, month, day);
802 
803                             long timePart = (long) (value - (long long) datePart * 1000000ll);
804                             int second = timePart % 100;
805                             int min = (timePart / 100) % 100;
806                             int hour = timePart / 10000;
807                             fprintf(ci.filePtr, "%02d:%02d:%02d%c", hour, min, second, ci.delimiter);
808                             buf += 8;
809                         }
810                     }
811 
812                     break;
813                 }
814 
815                 case CalpontSystemCatalog::TIME:
816                 {
817                     if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
818                     {
819                         fprintf(ci.filePtr, "%c", ci.delimiter);
820 
821                         buf += table->field[colpos]->pack_length();
822                     }
823                     else
824                     {
825                         MYSQL_TIME ltime;
826                         const uchar* pos = buf;
827                         longlong tmp = my_time_packed_from_binary(pos, table->field[colpos]->decimals());
828                         TIME_from_longlong_time_packed(&ltime, tmp);
829 
830                         if (ltime.neg)
831                         {
832                             fprintf(ci.filePtr, "-");
833                         }
834 
835                         if (!ltime.second_part)
836                         {
837                             fprintf(ci.filePtr, "%02d:%02d:%02d%c",
838                                     ltime.hour, ltime.minute, ltime.second, ci.delimiter);
839                         }
840                         else
841                         {
842                             fprintf(ci.filePtr, "%02d:%02d:%02d.%ld%c",
843                                     ltime.hour, ltime.minute, ltime.second,
844                                     ltime.second_part, ci.delimiter);
845                         }
846 
847                         buf += table->field[colpos]->pack_length();
848                     }
849 
850                     break;
851                 }
852 
853                 case CalpontSystemCatalog::TIMESTAMP:
854                 {
855                     if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
856                     {
857                         fprintf(ci.filePtr, "%c", ci.delimiter);
858                     }
859                     else
860                     {
861                         const uchar* pos = buf;
862                         struct timeval tm;
863                         my_timestamp_from_binary(&tm, pos, table->field[colpos]->decimals());
864 
865                         MySQLTime time;
866                         gmtSecToMySQLTime(tm.tv_sec, time, current_thd->variables.time_zone->get_name()->ptr());
867 
868                         if (!tm.tv_usec)
869                         {
870                             fprintf(ci.filePtr, "%04d-%02d-%02d %02d:%02d:%02d%c",
871                                     time.year, time.month, time.day,
872                                     time.hour, time.minute, time.second, ci.delimiter);
873                         }
874                         else
875                         {
876                             fprintf(ci.filePtr, "%04d-%02d-%02d %02d:%02d:%02d.%ld%c",
877                                     time.year, time.month, time.day,
878                                     time.hour, time.minute, time.second,
879                                     tm.tv_usec, ci.delimiter);
880                         }
881                     }
882 
883                     buf += table->field[colpos]->pack_length();
884 
885                     break;
886                 }
887                 case CalpontSystemCatalog::BIGINT:
888                 {
889                     if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
890                         fprintf(ci.filePtr, "%c", ci.delimiter);
891                     else
892                         fprintf(ci.filePtr, "%lld%c", *((long long*)buf), ci.delimiter);
893 
894                     buf += 8;
895                     break;
896                 }
897 
898                 case CalpontSystemCatalog::UBIGINT:
899                 {
900                     if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
901                         fprintf(ci.filePtr, "%c", ci.delimiter);
902                     else
903                         fprintf(ci.filePtr, "%llu%c", *((long long unsigned*)buf), ci.delimiter);
904 
905                     buf += 8;
906                     break;
907                 }
908 
909                 case CalpontSystemCatalog::INT:
910                 case CalpontSystemCatalog::MEDINT:
911                 {
912                     if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
913                         fprintf(ci.filePtr, "%c", ci.delimiter);
914                     else
915                         fprintf(ci.filePtr, "%d%c", *((int32_t*)buf), ci.delimiter);
916 
917                     buf += 4;
918                     break;
919                 }
920 
921                 case CalpontSystemCatalog::UINT:
922                 case CalpontSystemCatalog::UMEDINT:
923                 {
924                     if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
925                         fprintf(ci.filePtr, "%c", ci.delimiter);
926                     else
927                     {
928                         fprintf(ci.filePtr, "%u%c", *((uint32_t*)buf), ci.delimiter);
929                         //printf("%u|", *((uint32_t*)buf));
930                         //cout << *((uint32_t*)buf) << endl;
931                     }
932 
933                     buf += 4;
934                     break;
935                 }
936 
937                 case CalpontSystemCatalog::SMALLINT:
938                 {
939                     if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
940                         fprintf(ci.filePtr, "%c", ci.delimiter);
941                     else
942                         fprintf(ci.filePtr, "%d%c", *((int16_t*)buf), ci.delimiter);
943 
944                     buf += 2;
945                     break;
946                 }
947 
948                 case CalpontSystemCatalog::USMALLINT:
949                 {
950                     if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
951                         fprintf(ci.filePtr, "%c", ci.delimiter);
952                     else
953                         fprintf(ci.filePtr, "%u%c", *((uint16_t*)buf), ci.delimiter);
954 
955                     buf += 2;
956                     break;
957                 }
958 
959                 case CalpontSystemCatalog::TINYINT:
960                 {
961                     if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
962                         fprintf(ci.filePtr, "%c", ci.delimiter);
963                     else
964                         fprintf(ci.filePtr, "%d%c", *((int8_t*)buf), ci.delimiter);
965 
966                     buf += 1;
967                     break;
968                 }
969 
970                 case CalpontSystemCatalog::UTINYINT:
971                 {
972                     if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
973                         fprintf(ci.filePtr, "%c", ci.delimiter);
974                     else
975                         fprintf(ci.filePtr, "%u%c", *((uint8_t*)buf), ci.delimiter);
976 
977                     buf += 1;
978                     break;
979                 }
980 
981                 case CalpontSystemCatalog::FLOAT:
982                 case CalpontSystemCatalog::UFLOAT:
983                 {
984                     if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
985                         fprintf(ci.filePtr, "%c", ci.delimiter);
986                     else
987                     {
988                         float val = *((float*)buf);
989 
990                         if ((fabs(val) > (1.0 / IDB_pow[4])) && (fabs(val) < (float) IDB_pow[6]))
991                         {
992                             fprintf(ci.filePtr, "%.7f%c", val, ci.delimiter);
993                         }
994                         else
995                         {
996                             fprintf(ci.filePtr, "%e%c", val, ci.delimiter);
997                         }
998 
999 
1000                         //fprintf(ci.filePtr, "%.7g|", *((float*)buf));
1001                         //printf("%.7f|", *((float*)buf));
1002                     }
1003 
1004                     buf += 4;
1005                     break;
1006                 }
1007 
1008                 case CalpontSystemCatalog::DOUBLE:
1009                 case CalpontSystemCatalog::UDOUBLE:
1010                 {
1011                     if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
1012                         fprintf(ci.filePtr, "%c", ci.delimiter);
1013                     else
1014                     {
1015                         fprintf(ci.filePtr, "%.15g%c", *((double*)buf), ci.delimiter);
1016                         //printf("%.15g|", *((double*)buf));
1017                     }
1018 
1019                     buf += 8;
1020                     break;
1021                 }
1022 
1023                 case CalpontSystemCatalog::LONGDOUBLE:
1024                 {
1025                     if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
1026                         fprintf(ci.filePtr, "%c", ci.delimiter);
1027                     else
1028                     {
1029                         fprintf(ci.filePtr, "%.15Lg%c", *((long double*)buf), ci.delimiter);
1030                         //printf("%.15g|", *((double*)buf));
1031                     }
1032 
1033                     buf += 8;
1034                     break;
1035                 }
1036 
1037                 case CalpontSystemCatalog::DECIMAL:
1038                 case CalpontSystemCatalog::UDECIMAL:
1039                 {
1040                     uint bytesBefore = 1;
1041                     uint totalBytes = 9;
1042 
1043                     switch (ci.columnTypes[colpos].precision)
1044                     {
1045                         case 18:
1046                         case 17:
1047                         case 16:
1048                         {
1049                             totalBytes = 8;
1050                             break;
1051                         }
1052 
1053                         case 15:
1054                         case 14:
1055                         {
1056                             totalBytes = 7;
1057                             break;
1058                         }
1059 
1060                         case 13:
1061                         case 12:
1062                         {
1063                             totalBytes =  6;
1064                             break;
1065                         }
1066 
1067                         case 11:
1068                         {
1069                             totalBytes =  5;
1070                             break;
1071                         }
1072 
1073                         case 10:
1074                         {
1075                             totalBytes =  5;
1076                             break;
1077                         }
1078 
1079                         case 9:
1080                         case 8:
1081                         case 7:
1082                         {
1083                             totalBytes =  4;
1084                             break;
1085                         }
1086 
1087                         case 6:
1088                         case 5:
1089                         {
1090                             totalBytes = 3;
1091                             break;
1092                         }
1093 
1094                         case 4:
1095                         case 3:
1096                         {
1097                             totalBytes =  2;
1098                             break;
1099                         }
1100 
1101                         case 2:
1102                         case 1:
1103                         {
1104                             totalBytes = 1;
1105                             break;
1106                         }
1107 
1108                         default:
1109                             break;
1110                     }
1111 
1112                     switch (ci.columnTypes[colpos].scale)
1113                     {
1114                         case 0:
1115                         {
1116                             bytesBefore = totalBytes;
1117                             break;
1118                         }
1119 
1120                         case 1: //1 byte for digits after decimal point
1121                         {
1122                             if ((ci.columnTypes[colpos].precision != 16) && (ci.columnTypes[colpos].precision != 14)
1123                                     && (ci.columnTypes[colpos].precision != 12) && (ci.columnTypes[colpos].precision != 10)
1124                                     && (ci.columnTypes[colpos].precision != 7) && (ci.columnTypes[colpos].precision != 5)
1125                                     && (ci.columnTypes[colpos].precision != 3) && (ci.columnTypes[colpos].precision != 1))
1126                                 totalBytes++;
1127 
1128                             bytesBefore = totalBytes - 1;
1129                             break;
1130                         }
1131 
1132                         case 2: //1 byte for digits after decimal point
1133                         {
1134                             if ((ci.columnTypes[colpos].precision == 18) || (ci.columnTypes[colpos].precision == 9))
1135                                 totalBytes++;
1136 
1137                             bytesBefore = totalBytes - 1;
1138                             break;
1139                         }
1140 
1141                         case 3: //2 bytes for digits after decimal point
1142                         {
1143                             if ((ci.columnTypes[colpos].precision != 16) && (ci.columnTypes[colpos].precision != 14)
1144                                     && (ci.columnTypes[colpos].precision != 12) && (ci.columnTypes[colpos].precision != 7)
1145                                     && (ci.columnTypes[colpos].precision != 5) && (ci.columnTypes[colpos].precision != 3))
1146                                 totalBytes++;
1147 
1148                             bytesBefore = totalBytes - 2;
1149                             break;
1150                         }
1151 
1152                         case 4:
1153                         {
1154                             if ((ci.columnTypes[colpos].precision == 18) || (ci.columnTypes[colpos].precision == 11)
1155                                     || (ci.columnTypes[colpos].precision == 9))
1156                                 totalBytes++;
1157 
1158                             bytesBefore = totalBytes - 2;
1159                             break;
1160 
1161                         }
1162 
1163                         case 5:
1164                         {
1165                             if ((ci.columnTypes[colpos].precision != 16) && (ci.columnTypes[colpos].precision != 14)
1166                                     && (ci.columnTypes[colpos].precision != 7) && (ci.columnTypes[colpos].precision != 5))
1167                                 totalBytes++;
1168 
1169                             bytesBefore = totalBytes - 3;
1170                             break;
1171                         }
1172 
1173                         case 6:
1174                         {
1175                             if ((ci.columnTypes[colpos].precision == 18) || (ci.columnTypes[colpos].precision == 13)
1176                                     || (ci.columnTypes[colpos].precision == 11) || (ci.columnTypes[colpos].precision == 9))
1177                                 totalBytes++;
1178 
1179                             bytesBefore = totalBytes - 3;
1180                             break;
1181                         }
1182 
1183                         case 7:
1184                         {
1185                             if ((ci.columnTypes[colpos].precision != 16) && (ci.columnTypes[colpos].precision != 7))
1186                                 totalBytes++;
1187 
1188                             bytesBefore = totalBytes - 4;
1189                             break;
1190                         }
1191 
1192                         case 8:
1193                         {
1194                             if ((ci.columnTypes[colpos].precision == 18) || (ci.columnTypes[colpos].precision == 15)
1195                                     || (ci.columnTypes[colpos].precision == 13) || (ci.columnTypes[colpos].precision == 11)
1196                                     || (ci.columnTypes[colpos].precision == 9))
1197                                 totalBytes++;
1198 
1199                             bytesBefore = totalBytes - 4;;
1200                             break;
1201                         }
1202 
1203                         case 9:
1204                         {
1205                             bytesBefore = totalBytes - 4;;
1206                             break;
1207                         }
1208 
1209                         case 10:
1210                         {
1211                             if ((ci.columnTypes[colpos].precision != 16) && (ci.columnTypes[colpos].precision != 14)
1212                                     && (ci.columnTypes[colpos].precision != 12) && (ci.columnTypes[colpos].precision != 10))
1213                                 totalBytes++;
1214 
1215                             bytesBefore = totalBytes - 5;;
1216                             break;
1217                         }
1218 
1219                         case 11:
1220                         {
1221                             if (ci.columnTypes[colpos].precision == 18)
1222                                 totalBytes++;
1223 
1224                             bytesBefore = totalBytes - 5;
1225                             break;
1226                         }
1227 
1228                         case 12:
1229                         {
1230                             if ((ci.columnTypes[colpos].precision != 16) && (ci.columnTypes[colpos].precision != 14)
1231                                     && (ci.columnTypes[colpos].precision != 12))
1232                                 totalBytes++;
1233 
1234                             bytesBefore = totalBytes - 6;
1235                             break;
1236                         }
1237 
1238                         case 13:
1239                         {
1240                             if (ci.columnTypes[colpos].precision == 18)
1241                                 totalBytes++;
1242 
1243                             bytesBefore = totalBytes - 6;
1244                             break;
1245                         }
1246 
1247                         case 14:
1248                         {
1249                             if ((ci.columnTypes[colpos].precision != 16) && (ci.columnTypes[colpos].precision != 14))
1250                                 totalBytes++;
1251 
1252                             bytesBefore = totalBytes - 7;
1253                             break;
1254                         }
1255 
1256                         case 15:
1257                         {
1258                             if (ci.columnTypes[colpos].precision == 18)
1259                                 totalBytes++;
1260 
1261                             bytesBefore = totalBytes - 7;
1262                             break;
1263                         }
1264 
1265                         case 16:
1266                         {
1267                             if (ci.columnTypes[colpos].precision != 16)
1268                                 totalBytes++;
1269 
1270                             bytesBefore = totalBytes - 8;
1271                             break;
1272                         }
1273 
1274                         case 17:
1275                         {
1276                             if (ci.columnTypes[colpos].precision == 18)
1277                                 totalBytes++;
1278 
1279                             bytesBefore = totalBytes - 8;
1280                             break;
1281                         }
1282 
1283                         case 18:
1284                         {
1285                             bytesBefore = totalBytes - 8;
1286                             break;
1287                         }
1288 
1289                         default:
1290                             break;
1291                     }
1292 
1293                     if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
1294                     {
1295                         fprintf(ci.filePtr, "%c", ci.delimiter);
1296                         //printf("|");
1297                     }
1298                     else
1299                     {
1300                         uint32_t mask [5] = {0, 0xFF, 0xFFFF, 0xFFFFFF, 0xFFFFFFFF};
1301                         char neg = '-';
1302 
1303                         if (ci.columnTypes[colpos].scale == 0)
1304                         {
1305                             const uchar* tmpBuf = buf;
1306                             //test flag bit for sign
1307                             bool posNum  = tmpBuf[0] & (0x80);
1308                             uchar tmpChr = tmpBuf[0];
1309                             tmpChr ^= 0x80; //flip the bit
1310                             int32_t tmp1 = tmpChr;
1311 
1312                             if (totalBytes > 4)
1313                             {
1314                                 for (uint i = 1; i < (totalBytes - 4); i++)
1315                                 {
1316                                     tmp1 = (tmp1 << 8) + tmpBuf[i];
1317                                 }
1318 
1319                                 if (( tmp1 != 0 ) && (tmp1 != -1))
1320                                 {
1321                                     if (!posNum)
1322                                     {
1323                                         tmp1 = mask[totalBytes - 4] - tmp1;
1324 
1325                                         if (tmp1 != 0)
1326                                         {
1327                                             fprintf(ci.filePtr, "%c", neg);
1328                                             //printf("%c", neg);
1329                                         }
1330                                     }
1331 
1332                                     if (tmp1 != 0)
1333                                     {
1334                                         fprintf(ci.filePtr, "%d", tmp1);
1335                                         ////printf("%d", tmp1);
1336                                     }
1337                                 }
1338 
1339                                 int32_t tmp2 = tmpBuf[totalBytes - 4];
1340 
1341                                 for (uint i = (totalBytes - 3); i < totalBytes; i++)
1342                                 {
1343                                     tmp2 = (tmp2 << 8) + tmpBuf[i];
1344                                 }
1345 
1346                                 if ( tmp1 != 0 )
1347                                 {
1348                                     if (!posNum)
1349                                     {
1350                                         tmp2 = mask[4] - tmp2;
1351 
1352                                         if (tmp1 == -1)
1353                                         {
1354                                             fprintf(ci.filePtr, "%c", neg);
1355                                             fprintf(ci.filePtr, "%d%c", tmp2, ci.delimiter);
1356                                             ////printf("%c", neg);
1357                                             //////printf( "%d|", tmp2);
1358                                         }
1359                                         else
1360                                         {
1361                                             fprintf(ci.filePtr, "%09u%c", tmp2, ci.delimiter);
1362                                             ////printf("%09u|", tmp2);
1363                                         }
1364                                     }
1365                                     else
1366                                     {
1367                                         fprintf(ci.filePtr, "%09u%c", tmp2, ci.delimiter);
1368                                         //printf("%09u|", tmp2);
1369                                     }
1370                                 }
1371                                 else
1372                                 {
1373                                     if (!posNum)
1374                                     {
1375                                         tmp2 = mask[4] - tmp2;
1376                                         fprintf(ci.filePtr, "%c", neg);
1377                                         //printf("%c", neg);
1378                                     }
1379 
1380                                     fprintf(ci.filePtr, "%d%c", tmp2, ci.delimiter);
1381                                     //printf("%d|", tmp2);
1382                                 }
1383                             }
1384                             else
1385                             {
1386                                 for (uint i = 1; i < totalBytes; i++)
1387                                 {
1388                                     tmp1 = (tmp1 << 8) + tmpBuf[i];
1389                                 }
1390 
1391                                 if (!posNum)
1392                                 {
1393                                     tmp1 = mask[totalBytes] - tmp1;
1394                                     fprintf(ci.filePtr, "%c", neg);
1395                                     //printf("%c", neg);
1396                                 }
1397 
1398                                 fprintf(ci.filePtr, "%d%c", tmp1, ci.delimiter);
1399                                 //printf("%d|", tmp1);
1400                             }
1401                         }
1402                         else
1403                         {
1404                             const uchar* tmpBuf = buf;
1405                             //test flag bit for sign
1406                             bool posNum  = tmpBuf[0] & (0x80);
1407                             uchar tmpChr = tmpBuf[0];
1408                             tmpChr ^= 0x80; //flip the bit
1409                             int32_t tmp1 = tmpChr;
1410 
1411                             //fetch the digits before decimal point
1412                             if (bytesBefore == 0)
1413                             {
1414                                 if (!posNum)
1415                                 {
1416                                     fprintf(ci.filePtr, "%c", neg);
1417                                     //printf("%c", neg);
1418                                 }
1419 
1420                                 fprintf(ci.filePtr, "0.");
1421                                 //printf("0.");
1422                             }
1423                             else if (bytesBefore > 4)
1424                             {
1425                                 for (uint i = 1; i < (bytesBefore - 4); i++)
1426                                 {
1427                                     tmp1 = (tmp1 << 8) + tmpBuf[i];
1428                                 }
1429 
1430                                 if (!posNum)
1431                                 {
1432                                     tmp1 = mask[bytesBefore - 4] - tmp1;
1433                                 }
1434 
1435                                 if (( tmp1 != 0 ) && (tmp1 != -1))
1436                                 {
1437                                     if (!posNum)
1438                                     {
1439                                         fprintf(ci.filePtr, "%c", neg);
1440                                         //printf("%c", neg);
1441                                     }
1442 
1443                                     fprintf(ci.filePtr, "%d", tmp1);
1444                                     //printf("%d", tmp1);
1445                                 }
1446 
1447                                 tmpBuf += (bytesBefore - 4);
1448                                 int32_t tmp2 = *((int32_t*)tmpBuf);
1449                                 tmp2 = ntohl(tmp2);
1450 
1451                                 if ( tmp1 != 0 )
1452                                 {
1453                                     if (!posNum)
1454                                     {
1455                                         tmp2 = mask[4] - tmp2;
1456                                     }
1457 
1458                                     if (tmp1 == -1)
1459                                     {
1460                                         fprintf(ci.filePtr, "%c", neg);
1461                                         fprintf(ci.filePtr, "%d.", tmp2);
1462                                         //printf("%c", neg);
1463                                         //printf("%d.", tmp2);
1464                                     }
1465                                     else
1466                                     {
1467                                         fprintf(ci.filePtr, "%09u.", tmp2);
1468                                         //printf("%09u.", tmp2);
1469                                     }
1470                                 }
1471                                 else
1472                                 {
1473                                     if (!posNum)
1474                                     {
1475                                         tmp2 = mask[4] - tmp2;
1476                                         fprintf(ci.filePtr, "%c", neg);
1477                                         //printf("%c", neg);
1478                                     }
1479 
1480                                     fprintf(ci.filePtr, "%d.", tmp2);
1481                                     //printf("%d.", tmp2);
1482                                 }
1483                             }
1484                             else
1485                             {
1486                                 for (uint i = 1; i < bytesBefore; i++)
1487                                 {
1488                                     tmp1 = (tmp1 << 8) + tmpBuf[i];
1489                                 }
1490 
1491                                 if (!posNum)
1492                                 {
1493                                     tmp1 = mask[bytesBefore] - tmp1;
1494                                     fprintf(ci.filePtr, "%c", neg);
1495                                     //printf("%c", neg);
1496                                 }
1497 
1498                                 fprintf(ci.filePtr, "%d.", tmp1);
1499                                 //printf("%d.", tmp1);
1500                             }
1501 
1502                             //fetch the digits after decimal point
1503                             int32_t tmp2 = 0;
1504 
1505                             if (bytesBefore > 4)
1506                                 tmpBuf += 4;
1507                             else
1508                                 tmpBuf += bytesBefore;
1509 
1510                             tmp2 = tmpBuf[0];
1511 
1512                             if ((totalBytes - bytesBefore) < 5)
1513                             {
1514                                 for (uint j = 1; j < (totalBytes - bytesBefore); j++)
1515                                 {
1516                                     tmp2 = (tmp2 << 8) + tmpBuf[j];
1517                                 }
1518 
1519                                 int8_t digits = ci.columnTypes[colpos].scale - 9; //9 digits is a 4 bytes chunk
1520 
1521                                 if ( digits <= 0 )
1522                                     digits = ci.columnTypes[colpos].scale;
1523 
1524                                 if (!posNum)
1525                                 {
1526                                     tmp2 = mask[totalBytes - bytesBefore] - tmp2;
1527                                 }
1528 
1529                                 fprintf(ci.filePtr, "%0*u%c", digits, tmp2, ci.delimiter);
1530                                 //printf("%0*u|", digits, tmp2);
1531                             }
1532                             else
1533                             {
1534                                 for (uint j = 1; j < 4; j++)
1535                                 {
1536                                     tmp2 = (tmp2 << 8) +  tmpBuf[j];
1537                                 }
1538 
1539                                 if (!posNum)
1540                                 {
1541                                     tmp2 = mask[4] - tmp2;
1542                                 }
1543 
1544                                 fprintf(ci.filePtr, "%09u", tmp2);
1545                                 //printf("%09u", tmp2);
1546 
1547                                 tmpBuf += 4;
1548                                 int32_t tmp3 = tmpBuf[0];
1549 
1550                                 for (uint j = 1; j < (totalBytes - bytesBefore - 4); j++)
1551                                 {
1552                                     tmp3 = (tmp3 << 8) +  tmpBuf[j];
1553                                 }
1554 
1555                                 int8_t digits = ci.columnTypes[colpos].scale - 9; //9 digits is a 4 bytes chunk
1556 
1557                                 if ( digits < 0 )
1558                                     digits = ci.columnTypes[colpos].scale;
1559 
1560                                 if (!posNum)
1561                                 {
1562                                     tmp3 = mask[totalBytes - bytesBefore - 4] - tmp3;
1563                                 }
1564 
1565                                 fprintf(ci.filePtr, "%0*u%c", digits, tmp3, ci.delimiter);
1566                                 //printf("%0*u|", digits, tmp3);
1567                             }
1568                         }
1569                     }
1570 
1571                     buf += totalBytes;
1572                     break;
1573                 }
1574 
1575                 case CalpontSystemCatalog::CHAR:
1576                 case CalpontSystemCatalog::TEXT:
1577                 case CalpontSystemCatalog::VARCHAR:
1578                 case CalpontSystemCatalog::BLOB:
1579                 case CalpontSystemCatalog::VARBINARY:
1580                 {
1581                     Field* fieldPtr = table->field[colpos];
1582                     if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
1583                     {
1584                         fprintf(ci.filePtr, "%c", ci.delimiter);
1585                     }
1586                     else
1587                     {
1588                         String value;
1589                         // We need to set table->read_set for a field first.
1590                         // This happens in ha_mcs_impl_start_bulk_insert().
1591                         fieldPtr->val_str(&value);
1592                         if (UNLIKELY(execplan::isBlobOrVarbinary(ci.columnTypes[colpos].colDataType)))
1593                         {
1594                             const char* ptr = value.ptr();
1595                             for (uint32_t i = 0; i < value.length(); i++)
1596                             {
1597                                 fprintf(ci.filePtr, "%02x", *(uint8_t*)(ptr+i));
1598                             }
1599                             fprintf(ci.filePtr, "%c", ci.delimiter);
1600                         }
1601                         else
1602                         {
1603                             escape.assign(value.ptr(), value.length());
1604                             boost::replace_all(escape, "\\", "\\\\");
1605                             fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(),
1606                                 escape.c_str(), ci.enclosed_by, ci.delimiter);
1607                         }
1608                     }
1609                     buf+= fieldPtr->pack_length();
1610                     break;
1611                 }
1612                 default:	// treat as int64
1613                 {
1614                     break;
1615                 }
1616             }
1617 
1618             colpos++;
1619         }
1620     }
1621 
1622     rc = fprintf(ci.filePtr, "\n"); //@bug 6077 check whether thhe pipe is still open
1623 
1624     if ( rc < 0)
1625         rc = -1;
1626     else
1627         rc = 0;
1628 
1629     return rc;
1630 }
1631 
ha_mcs_impl_viewtablelock(cal_impl_if::cal_connection_info & ci,execplan::CalpontSystemCatalog::TableName & tablename)1632 std::string  ha_mcs_impl_viewtablelock( cal_impl_if::cal_connection_info& ci, execplan::CalpontSystemCatalog::TableName& tablename)
1633 {
1634     THD* thd = current_thd;
1635     ulong sessionID = tid2sid(thd->thread_id);
1636     CalpontDMLPackage* pDMLPackage;
1637     std::string dmlStatement( "VIEWTABLELOCK" );
1638     VendorDMLStatement cmdStmt(dmlStatement, DML_COMMAND, sessionID);
1639     pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(cmdStmt);
1640     if (lower_case_table_names)
1641     {
1642         boost::algorithm::to_lower(tablename.schema);
1643         boost::algorithm::to_lower(tablename.table);
1644     }
1645     pDMLPackage->set_SchemaName (tablename.schema);
1646     pDMLPackage->set_TableName (tablename.table);
1647 
1648     ByteStream bytestream;
1649     bytestream << static_cast<uint32_t>(sessionID);
1650     pDMLPackage->write(bytestream);
1651     delete pDMLPackage;
1652 
1653     ByteStream::byte b = 0;
1654     ByteStream::octbyte rows;
1655     std::string errorMsg;
1656     std::string tableLockInfo;
1657     //int dmlRowCount = 0;
1658 
1659     try
1660     {
1661         ci.dmlProc->write(bytestream);
1662         bytestream = ci.dmlProc->read();
1663 
1664         if ( bytestream.length() == 0 )
1665         {
1666             thd->get_stmt_da()->set_overwrite_status(true);
1667             thd->raise_error_printf(ER_INTERNAL_ERROR, "Lost connection to DMLProc [5]");
1668         }
1669         else
1670         {
1671             bytestream >> b;
1672             bytestream >> rows;
1673             bytestream >> errorMsg;
1674             bytestream >> tableLockInfo;
1675         }
1676 
1677     }
1678     catch (runtime_error&)
1679     {
1680         thd->get_stmt_da()->set_overwrite_status(true);
1681         thd->raise_error_printf(ER_INTERNAL_ERROR, "Lost connection to DMLProc [6]");
1682     }
1683     catch (...)
1684     {
1685         thd->get_stmt_da()->set_overwrite_status(true);
1686         thd->raise_error_printf(ER_INTERNAL_ERROR, "Caught unknown error");
1687     }
1688 
1689     if ( b != 0 )
1690         tableLockInfo = errorMsg;
1691 
1692     return tableLockInfo;
1693 
1694 }
1695 
1696 //------------------------------------------------------------------------------
1697 // Clear the table lock associated with the specified table lock id.
1698 // Any bulk rollback that is pending will be applied before the table
1699 // lock is released.
1700 //------------------------------------------------------------------------------
ha_mcs_impl_cleartablelock(cal_impl_if::cal_connection_info & ci,uint64_t tableLockID)1701 std::string  ha_mcs_impl_cleartablelock(
1702     cal_impl_if::cal_connection_info& ci,
1703     uint64_t tableLockID)
1704 {
1705     execplan::CalpontSystemCatalog::TableName tblName;
1706     THD* thd        = current_thd;
1707     ulong sessionID = tid2sid(thd->thread_id);
1708     std::string tableLockInfo;
1709     BRM::TableLockInfo lockInfo;
1710 
1711     // Perform preliminary setup.  CalpontDMLPackage expects schema and table
1712     // name to be provided, so we get the table OID for the specified table
1713     // lock, and then get the table name for the applicable table OID.
1714     std::string prelimTask;
1715 
1716     try
1717     {
1718         BRM::DBRM brm;
1719         prelimTask = "getting table locks from BRM.";
1720         bool getLockInfo = brm.getTableLockInfo(tableLockID, &lockInfo);
1721 
1722         if (!getLockInfo)
1723         {
1724             tableLockInfo = "No table lock found for specified table lock ID";
1725             return tableLockInfo;
1726         }
1727 
1728         boost::shared_ptr<execplan::CalpontSystemCatalog> csc =
1729             execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
1730         csc->identity(execplan::CalpontSystemCatalog::FE);
1731 
1732         prelimTask = "getting table name from system catalog.";
1733         tblName    = csc->tableName( lockInfo.tableOID );
1734     }
1735     catch (std::exception& ex)
1736     {
1737         std::string eMsg(ex.what());
1738         eMsg += " Error ";
1739         eMsg += prelimTask;
1740 
1741         thd->get_stmt_da()->set_overwrite_status(true);
1742         thd->raise_error_printf(ER_INTERNAL_ERROR, eMsg.c_str());
1743         return tableLockInfo;
1744     }
1745     catch (...)
1746     {
1747         std::string eMsg(" Error ");
1748         eMsg += prelimTask;
1749 
1750         thd->get_stmt_da()->set_overwrite_status(true);
1751         thd->raise_error_printf(ER_INTERNAL_ERROR, eMsg.c_str());
1752         return tableLockInfo;
1753     }
1754 
1755     std::string dmlStatement( "CLEARTABLELOCK" );
1756     VendorDMLStatement cmdStmt(dmlStatement, DML_COMMAND, sessionID);
1757     CalpontDMLPackage* pDMLPackage =
1758         CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer( cmdStmt );
1759     pDMLPackage->set_SchemaName(tblName.schema);
1760     pDMLPackage->set_TableName (tblName.table );
1761 
1762     // Table lock ID is passed in the SQL statement attribute
1763     std::ostringstream lockIDString;
1764     lockIDString << tableLockID;
1765     pDMLPackage->set_SQLStatement( lockIDString.str() );
1766 
1767     ByteStream bytestream;
1768     bytestream << static_cast<uint32_t>(sessionID);
1769     pDMLPackage->write(bytestream);
1770     delete pDMLPackage;
1771 
1772     ByteStream::byte b = 0;
1773     ByteStream::octbyte rows;
1774     std::string errorMsg;
1775 
1776     try
1777     {
1778         ci.dmlProc->write(bytestream);
1779         bytestream = ci.dmlProc->read();
1780 
1781         if ( bytestream.length() == 0 )
1782         {
1783             thd->get_stmt_da()->set_overwrite_status(true);
1784             thd->raise_error_printf(ER_INTERNAL_ERROR, "Lost connection to DMLProc [7]");
1785         }
1786         else
1787         {
1788             bytestream >> b;
1789             bytestream >> rows;
1790             bytestream >> errorMsg;
1791             bytestream >> tableLockInfo;
1792         }
1793 
1794     }
1795     catch (runtime_error&)
1796     {
1797         thd->get_stmt_da()->set_overwrite_status(true);
1798         thd->raise_error_printf(ER_INTERNAL_ERROR, "Lost connection to DMLProc [8]");
1799     }
1800     catch (...)
1801     {
1802         thd->get_stmt_da()->set_overwrite_status(true);
1803         thd->raise_error_printf(ER_INTERNAL_ERROR, "Caught unknown error");
1804     }
1805 
1806     //@Bug 2606. Send error message back to sql session
1807     if ( b != 0 )
1808         tableLockInfo = errorMsg;
1809 
1810     return tableLockInfo;
1811 }
1812 
ha_mcs_impl_commit_(handlerton * hton,THD * thd,bool all,cal_connection_info & ci)1813 int ha_mcs_impl_commit_ (handlerton* hton, THD* thd, bool all, cal_connection_info& ci )
1814 {
1815     if (thd->slave_thread && !get_replication_slave(thd))
1816         return 0;
1817 
1818     int rc = 0;
1819 
1820     std::string command("COMMIT");
1821 #ifdef INFINIDB_DEBUG
1822     cout << "COMMIT" << endl;
1823 #endif
1824     rc = ProcessCommandStatement(thd, command, ci);
1825     return rc;
1826 }
1827 
ha_mcs_impl_rollback_(handlerton * hton,THD * thd,bool all,cal_connection_info & ci)1828 int ha_mcs_impl_rollback_ (handlerton* hton, THD* thd, bool all, cal_connection_info& ci)
1829 {
1830     int rc = 0;
1831 #ifdef INFINIDB_DEBUG
1832     cout << "ROLLBACK" << endl;
1833 #endif
1834 
1835     if (useHdfs)
1836     {
1837         string msg = string("Some non-transactional changed tables couldn't be rolled back");
1838         //	cout << "Some non-transactional changed tables couldn't be rolled back" << endl;
1839         push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 1196, msg.c_str());
1840         return rc;
1841     }
1842 
1843     std::string command("ROLLBACK");
1844     rc = ProcessCommandStatement(thd, command, ci);
1845     return rc;
1846 }
1847 
ha_mcs_impl_close_connection_(handlerton * hton,THD * thd,cal_connection_info & ci)1848 int ha_mcs_impl_close_connection_ (handlerton* hton, THD* thd, cal_connection_info& ci )
1849 {
1850     int rc = 0;
1851 #ifdef INFINIDB_DEBUG
1852     cout << "Close connection session ID " << thd->thread_id << endl;
1853 #endif
1854 
1855     if ( !ci.dmlProc )
1856     {
1857         return rc;
1858     }
1859 
1860     std::string command("CLEANUP");
1861     rc = ProcessCommandStatement(thd, command, ci);
1862     // @bug 1622. remove calpontsystemcatalog and close the socket when session quit.
1863     // @info when Calpont process a select query, an alter table phase is involved in
1864     // the vtable design, which will auto start a transaction. when autocommit on (by default), rollback is automically called
1865     // when session quit. rollback can also be called by user explicitly to rollback
1866     // a transaction. Under either situation, system catalog cache for this session should
1867     // be removed
1868     return rc;
1869 }
1870 
1871 // vim:ts=4 sw=4:
1872 
1873