1 /* Copyright (C) 2014 InfiniDB, Inc.
2 Copyright (C) 2019 MariaDB Corporation
3
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 as published by the Free Software Foundation; version 2 of
7 the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17 MA 02110-1301, USA. */
18
19 /*
20 * $Id: ha_mcs_dml.cpp 9711 2013-07-23 21:01:27Z chao $
21 */
22
23 #include <my_config.h>
24 #include <string>
25 #include <iostream>
26 #include <stack>
27 #ifdef _MSC_VER
28 #include <unordered_map>
29 #include <unordered_set>
30 #else
31 #include <tr1/unordered_map>
32 #include <tr1/unordered_set>
33 #endif
34 #include <fstream>
35 #include <sstream>
36 #include <cerrno>
37 #include <cstring>
38 using namespace std;
39
40 #include <boost/shared_ptr.hpp>
41 using namespace boost;
42
43 #include "idb_mysql.h"
44
45 #define NEED_CALPONT_INTERFACE
46 #include "ha_mcs_impl.h"
47
48 #include "ha_mcs_impl_if.h"
49 using namespace cal_impl_if;
50
51 #include "vendordmlstatement.h"
52 #include "calpontdmlpackage.h"
53 #include "calpontdmlfactory.h"
54 using namespace dmlpackage;
55
56 #include "dmlpackageprocessor.h"
57 using namespace dmlpackageprocessor;
58
59 #include "dataconvert.h"
60 using namespace dataconvert;
61
62 #include "bytestream.h"
63 using namespace messageqcpp;
64
65 #include "configcpp.h"
66 using namespace config;
67
68 #include "calpontsystemcatalog.h"
69 using namespace execplan;
70
71 #include "resourcemanager.h"
72 using namespace joblist;
73 //#include "stopwatch.h"
74 //using namespace logging;
75
76 #include "dbrm.h"
77
78 namespace
79 {
// Row threshold used instead of fBatchInsertGroupRows when ci.isCacheInsert is set.
#define BATCH_INSERT_GROUP_ROWS_FOR_CACHE 100000

// HDFS is never used nowadays, so this is hard-wired to false rather than
// queried from ResourceManager::instance()->useHdfs().
bool useHdfs(false);

// Row threshold for a regular (non-cache) insert batch. Starts at 0 and is
// filled in from ResourceManager::instance()->getRowsPerBatch() on first use.
uint64_t fBatchInsertGroupRows(0);
84
85 //convenience fcn
tid2sid(const uint32_t tid)86 inline uint32_t tid2sid(const uint32_t tid)
87 {
88 return execplan::CalpontSystemCatalog::idb_tid2sid(tid);
89 }
90
91 //StopWatch timer;
buildValueList(TABLE * table,cal_connection_info & ci)92 uint32_t buildValueList (TABLE* table, cal_connection_info& ci )
93 {
94 char attribute_buffer[1024];
95 String attribute(attribute_buffer, sizeof(attribute_buffer),
96 &my_charset_bin);
97 uint32_t size = 0;
98 int columnPos = 0;
99 double dbval;
100 ci.nullValuesBitset.reset();
101
102 for (Field** field = table->field; *field; field++)
103 {
104 if ((*field)->is_null())
105 {
106 ci.tableValuesMap[columnPos].push_back (""); //currently, empty string is treated as null.
107 ci.nullValuesBitset[columnPos] = true;
108 }
109 else
110 {
111 bitmap_set_bit(table->read_set, (*field)->field_index);
112 ci.nullValuesBitset[columnPos] = false;
113
114 // @bug 3798 get real value for float/double type
115 if ((*field)->result_type() == REAL_RESULT)
116 {
117 dbval = (*field)->val_real();
118 //int maxlen = (*field)->max_display_length();
119 const unsigned maxlen = 1024 + 1 + 1 + 1; //1 for leading zero, 1 for dp, 1 for null
120 char buf[maxlen];
121 memset(buf, 0, maxlen);
122 snprintf(buf, maxlen, "%.1024f", dbval);
123 ci.tableValuesMap[columnPos].push_back(buf);
124 }
125 else
126 {
127 //fetch different data type
128 (*field)->val_str(&attribute, &attribute);
129
130 if (attribute.length() == 0)
131 {
132 ci.tableValuesMap[columnPos].push_back (""); //currently, empty string is treated as null.
133 }
134 else
135 {
136 string val(attribute.ptr(), attribute.length());
137 ci.tableValuesMap[columnPos].push_back(val);
138 }
139 }
140 }
141
142 ci.colNameList.push_back((*field)->field_name.str);
143
144 columnPos++;
145 }
146
147 size = ci.tableValuesMap[0].size();
148 return size;
149 }
150
151
ProcessCommandStatement(THD * thd,string & dmlStatement,cal_connection_info & ci,std::string schema="")152 int ProcessCommandStatement(THD* thd, string& dmlStatement, cal_connection_info& ci, std::string schema = "")
153 {
154 int rc = 0;
155
156 ulong sessionID = tid2sid(thd->thread_id);
157
158 CalpontDMLPackage* pDMLPackage;
159
160 //@Bug 2721 and 2722. Log the statement before issuing commit/rollback
161 if ( dmlStatement == "LOGGING" )
162 {
163 char* query_char = idb_mysql_query_str(thd);
164 std::string query_str;
165 if (!query_char)
166 {
167 query_str = "<Replication event>";
168 }
169 else
170 {
171 query_str = query_char;
172 }
173 VendorDMLStatement cmdStmt(query_str, DML_COMMAND, sessionID);
174 cmdStmt.set_Logging( false );
175 pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(cmdStmt);
176 pDMLPackage->set_Logging( false );
177 pDMLPackage->set_SchemaName( schema );
178 }
179 else
180 {
181 VendorDMLStatement cmdStmt(dmlStatement, DML_COMMAND, sessionID);
182 pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(cmdStmt);
183 }
184
185 pDMLPackage->setTableOid (ci.tableOid);
186
187 if (!ci.singleInsert)
188 {
189 pDMLPackage->set_isBatchInsert(true);
190 }
191
192 if (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
193 pDMLPackage->set_isAutocommitOn(true);
194
195 if (useHdfs)
196 pDMLPackage->set_isAutocommitOn(true);
197
198 ByteStream bytestream;
199 bytestream << static_cast<uint32_t>(sessionID);
200
201 pDMLPackage->write(bytestream);
202 delete pDMLPackage;
203
204 ByteStream::byte b = 0;
205 string errormsg;
206 ByteStream::octbyte rows;
207
208 try
209 {
210 ci.dmlProc->write(bytestream);
211 bytestream = ci.dmlProc->read();
212
213 if ( bytestream.length() == 0 )
214 {
215 rc = 1;
216 thd->killed = KILL_QUERY;
217 thd->get_stmt_da()->set_overwrite_status(true);
218
219 thd->raise_error_printf(ER_INTERNAL_ERROR, "Lost connection to DMLProc [1]");
220 }
221 else
222 {
223 bytestream >> b;
224 bytestream >> rows;
225 bytestream >> errormsg;
226 }
227 }
228 catch (runtime_error&)
229 {
230 rc = 1 ;
231 thd->killed = KILL_QUERY;
232 thd->get_stmt_da()->set_overwrite_status(true);
233
234 thd->raise_error_printf(ER_INTERNAL_ERROR, "Lost connection to DMLProc [2]");
235 }
236 catch (...)
237 {
238 rc = 1;
239 thd->killed = KILL_QUERY;
240 thd->get_stmt_da()->set_overwrite_status(true);
241
242 thd->raise_error_printf(ER_INTERNAL_ERROR, "Caught unknown error");
243 }
244
245 if (( b != 0 ) && (!thd->get_stmt_da()->is_set()))
246 {
247 rc = 1;
248 thd->killed = KILL_QUERY;
249 thd->raise_error_printf(ER_INTERNAL_ERROR, errormsg.c_str());
250 }
251
252 delete ci.dmlProc;
253 ci.dmlProc = NULL;
254 return rc;
255 }
256
doProcessInsertValues(TABLE * table,uint32_t size,cal_connection_info & ci,bool lastBatch=false)257 int doProcessInsertValues ( TABLE* table, uint32_t size, cal_connection_info& ci, bool lastBatch = false )
258 {
259 THD* thd = current_thd;
260 uint32_t sessionID = tid2sid(thd->thread_id);
261
262 int rc = 0;
263
264 char* query_char = idb_mysql_query_str(thd);
265 std::string query_str;
266 if (!query_char)
267 {
268 query_str = "<Replication event>";
269 }
270 else
271 {
272 query_str = query_char;
273 }
274
275 VendorDMLStatement dmlStmts(query_str, DML_INSERT, table->s->table_name.str,
276 table->s->db.str, size, ci.colNameList.size(), ci.colNameList,
277 ci.tableValuesMap, ci.nullValuesBitset, sessionID);
278
279 CalpontDMLPackage* pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(dmlStmts);
280 //@Bug 2466 Move the clean up earlier to avoid the second insert in another session to get the data
281 ci.tableValuesMap.clear();
282 ci.colNameList.clear();
283
284 if (!pDMLPackage)
285 {
286 rc = -1;
287 string emsg("Calpont DML package cannot build. ");
288 thd->get_stmt_da()->set_overwrite_status(true);
289 thd->raise_error_printf(ER_INTERNAL_ERROR, emsg.c_str());
290 return rc;
291 }
292
293 //@Bug 2721 and 2722 log batch insert statement only once in the bebug file.
294 if (( ( ci.isLoaddataInfile ) || ((ci.rowsHaveInserted + size) < ci.bulkInsertRows ) ) && (ci.rowsHaveInserted > 0))
295 {
296 pDMLPackage->set_Logging( false );
297 pDMLPackage->set_Logending( false );
298 }
299 else if (( ( ci.isLoaddataInfile ) || ((ci.rowsHaveInserted + size) < ci.bulkInsertRows ) ) && (ci.rowsHaveInserted == 0))
300 {
301 pDMLPackage->set_Logging( true );
302 pDMLPackage->set_Logending( false );
303 }
304
305 if ( ci.singleInsert )
306 {
307 pDMLPackage->set_Logging( true );
308 pDMLPackage->set_Logending( true );
309 }
310
311 if ( !ci.singleInsert )
312 {
313 pDMLPackage->set_isBatchInsert( true );
314 }
315
316 if (thd->is_strict_mode())
317 {
318 pDMLPackage->set_isWarnToError( true );
319 }
320
321 pDMLPackage->setTableOid (ci.tableOid);
322
323 if (lastBatch)
324 {
325 pDMLPackage->set_Logending( true );
326
327 }
328
329 if (lastBatch && (ci.rowsHaveInserted > 0))
330 pDMLPackage->set_Logging( false );
331
332 std::string name = table->s->table_name.str;
333 pDMLPackage->set_TableName(name);
334 name = table->s->db.str;
335 pDMLPackage->set_SchemaName(name);
336 pDMLPackage->set_TimeZone(thd->variables.time_zone->get_name()->ptr());
337
338 if (thd->lex->sql_command == SQLCOM_INSERT_SELECT)
339 pDMLPackage->set_isInsertSelect(true);
340
341 //Carry session autocommit info in the pkg to use in DMLProc
342 //cout << "Thread options = " << thd->variables.option_bits << " and OPTION_NOT_AUTOCOMMIT:OPTION_BEGIN = " << OPTION_NOT_AUTOCOMMIT << ":" << OPTION_BEGIN << endl;
343 if (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
344 {
345 //cout << "autocommit is on" << endl;
346 pDMLPackage->set_isAutocommitOn(true);
347 }
348 else if (useHdfs)
349 {
350 pDMLPackage->set_isAutocommitOn(true);
351 }
352
353 ByteStream bytestream, bytestreamRcv;
354 bytestream << sessionID;
355
356 pDMLPackage->write(bytestream);
357 delete pDMLPackage;
358
359 ByteStream::byte b = 0;
360 string errormsg;
361 ByteStream::octbyte rows;
362
363 try
364 {
365 ci.dmlProc->write(bytestream);
366 bytestreamRcv = ci.dmlProc->read();
367
368 if ( bytestreamRcv.length() == 0 )
369 {
370 //check if it is first batch and DMLProc restarted. Only this case, get a new client and resend
371 if (ci.rowsHaveInserted == 0)
372 {
373 delete ci.dmlProc;
374 ci.dmlProc = new MessageQueueClient("DMLProc");
375
376 //cout << "doProcessInsertValues starts a client " << ci.dmlProc << " for session " << thd->thread_id << endl;
377 try
378 {
379 ci.dmlProc->write(bytestream);
380 bytestreamRcv = ci.dmlProc->read();
381
382 if ( bytestreamRcv.length() == 0 )
383 {
384 rc = -1;
385 b = 1;
386 errormsg = "Lost connection to DMLProc [3]";
387 }
388 else
389 {
390 bytestreamRcv >> b;
391 bytestreamRcv >> rows;
392 bytestreamRcv >> errormsg;
393 rc = b;
394 }
395 }
396 catch (runtime_error&)
397 {
398 rc = -1;
399 thd->get_stmt_da()->set_overwrite_status(true);
400 errormsg = "Lost connection to DMLProc [4]";
401 b = 1;
402 }
403 }
404 }
405 else
406 {
407 bytestreamRcv >> b;
408 bytestreamRcv >> rows;
409 bytestreamRcv >> errormsg;
410 rc = b;
411 }
412 }
413 catch (std::exception& rex)
414 {
415 //check if it is first batch and DMLProc restarted. Only this case, get a new client and resend
416 if (ci.rowsHaveInserted == 0)
417 {
418 delete ci.dmlProc;
419 ci.dmlProc = new MessageQueueClient("DMLProc");
420
421 //cout << "doProcessInsertValues exception starts a client " << ci.dmlProc << " for session " << thd->thread_id << endl;
422 try
423 {
424 ci.dmlProc->write(bytestream);
425 bytestreamRcv = ci.dmlProc->read();
426
427 if ( bytestreamRcv.length() == 0 )
428 {
429 rc = -1;
430 b = 1;
431 errormsg = string("Lost connection to DMLProc after getting a new client [1:") + rex.what() + "]";
432 }
433 else
434 {
435 bytestreamRcv >> b;
436 bytestreamRcv >> rows;
437 bytestreamRcv >> errormsg;
438 rc = b;
439 }
440 }
441 catch (std::exception& rrex)
442 {
443 rc = -1;
444 thd->get_stmt_da()->set_overwrite_status(true);
445 errormsg = string("Lost connection to DMLProc after getting a new client [2:") + rex.what() + " then " + rrex.what() + "]";
446 b = 1;
447 }
448 }
449 else //really lost connection
450 {
451 rc = -1;
452 thd->get_stmt_da()->set_overwrite_status(true);
453 errormsg = string("Lost connection to DMLProc really [1:") + rex.what() + "]";
454 b = 1;
455 }
456 }
457 catch (...)
458 {
459 rc = -1;
460 thd->get_stmt_da()->set_overwrite_status(true);
461 errormsg = "Unknown error caught";
462 b = 1;
463 }
464
465
466 if ((b != 0) && (b != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING))
467 {
468 thd->get_stmt_da()->set_overwrite_status(true);
469 thd->raise_error_printf(ER_INTERNAL_ERROR, errormsg.c_str());
470
471 }
472
473 if ( b == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING )
474 {
475 if (!thd->is_strict_mode())
476 {
477 rc = 0;
478 }
479
480 push_warning(thd, Sql_condition::WARN_LEVEL_WARN, ER_WARN_DATA_OUT_OF_RANGE, errormsg.c_str());
481 }
482
483 if ( rc != 0 )
484 ci.rc = rc;
485
486 if ( b == dmlpackageprocessor::DMLPackageProcessor::ACTIVE_TRANSACTION_ERROR )
487 rc = b;
488
489 return rc;
490 }
491
492 }
493
ha_mcs_impl_write_last_batch(TABLE * table,cal_connection_info & ci,bool abort)494 int ha_mcs_impl_write_last_batch(TABLE* table, cal_connection_info& ci, bool abort)
495 {
496 int rc = 0;
497 THD* thd = current_thd;
498 std::string command;
499 uint32_t size = ci.tableValuesMap[0].size();
500 //@Bug 2468. Add a logging statement command
501 command = "COMMIT";
502 std::string schema;
503 schema = table->s->db.str;
504
505 //@Bug 6112. if no row to be insert and no rows have been inserted, no need to send to DMLProc
506 if ((size == 0) && (ci.rowsHaveInserted == 0))
507 return rc;
508
509 //@Bug 2715 Check the saved error code.
510 //@Bug 4516 always send the last package to allow DMLProc receive all messages from WES
511 if (( ci.rc != 0 ) || abort )
512 {
513 if (abort) //@Bug 5285. abort is different from error, dmlproc only clean up when erroring out
514 rc = doProcessInsertValues( table, size, ci, true);
515
516 //@Bug 2722 Log the statement into datamod log
517 //@Bug 4605 if error, rollback and no need to check whether the session is autocommit off
518
519 command = "ROLLBACK";
520 rc = ProcessCommandStatement ( thd, command, ci, schema );
521 rc = ci.rc;
522 ci.rc = 0;
523
524 if (size > 0 )
525 {
526 ci.tableValuesMap.clear();
527 ci.colNameList.clear();
528 }
529
530 return rc;
531 }
532 else
533 {
534 rc = doProcessInsertValues( table, size, ci, true);
535 }
536
537 if ( abort )
538 {
539 rc = 1;
540 thd->get_stmt_da()->set_overwrite_status(true);
541 std::string errormsg = "statement is aborted.";
542 thd->raise_error_printf(ER_INTERNAL_ERROR, errormsg.c_str());
543 }
544
545 if ( rc == dmlpackageprocessor::DMLPackageProcessor::ACTIVE_TRANSACTION_ERROR )
546 return rc;
547
548 //@Bug 4605
549 int rc1 = 0;
550 if ( (rc == 0) && !abort && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))))
551 {
552 ci.rowsHaveInserted += size;
553 command = "COMMIT";
554 rc1 = ProcessCommandStatement ( thd, command, ci, schema );
555 }
556 else if (useHdfs)
557 {
558 ci.rowsHaveInserted += size;
559 command = "COMMIT";
560 rc1 = ProcessCommandStatement ( thd, command, ci, schema );
561 }
562 else if (( rc != 0) || abort )
563 {
564 command = "ROLLBACK";
565 rc1 = ProcessCommandStatement ( thd, command, ci, schema );
566 }
567 rc = max(rc, rc1);
568
569 return rc;
570
571 }
572
ha_mcs_impl_write_row_(const uchar * buf,TABLE * table,cal_connection_info & ci,ha_rows & rowsInserted)573 int ha_mcs_impl_write_row_(const uchar* buf, TABLE* table, cal_connection_info& ci, ha_rows& rowsInserted)
574 {
575 int rc = 0;
576 //timer.start( "buildValueList");
577 ci.colNameList.clear();
578 THD* thd = current_thd;
579 uint32_t size = 0;
580 std::string schema;
581 schema = table->s->db.str;
582
583 //@Bug 2086 Added syntax check for '\0'
584 try
585 {
586 size = buildValueList ( table, ci );
587 }
588 catch (runtime_error& rex)
589 {
590 rc = 1;
591 ci.rc = rc; //@Bug 2790 Save the error infomation.
592 thd->get_stmt_da()->set_overwrite_status(true);
593 thd->raise_error_printf(ER_INTERNAL_ERROR, rex.what());
594 return rc;
595 }
596
597 if (fBatchInsertGroupRows == 0)
598 {
599 fBatchInsertGroupRows = ResourceManager::instance()->getRowsPerBatch();
600 }
601
602 //timer.stop( "buildValueList");
603 if ( ci.singleInsert // Single insert
604 || (( ci.bulkInsertRows > 0 ) && (( ( ci.rowsHaveInserted + size) >= ci.bulkInsertRows )
605 || ( (!ci.isCacheInsert && size >= fBatchInsertGroupRows) || (ci.isCacheInsert && size >= BATCH_INSERT_GROUP_ROWS_FOR_CACHE) )) )
606 //Insert with mutilple value case: processed batch by batch. Last batch is sent also.
607 || (( ci.bulkInsertRows == 0 ) && ( (!ci.isCacheInsert && size >= fBatchInsertGroupRows)
608 || (ci.isCacheInsert && size >= BATCH_INSERT_GROUP_ROWS_FOR_CACHE) ) ) ) // Load data in file is processed batch by batch
609 {
610 //timer.start( "DMLProc takes");
611 //cout <<" sending a batch to DMLProc ... The size is " << size << " the current bulkInsertRows = " << ci.bulkInsertRows << endl;
612 //Build dmlpackage
613 if (( ci.bulkInsertRows > 0 ) && ( ( ci.rowsHaveInserted + size) >= ci.bulkInsertRows ))
614 {
615 rc = doProcessInsertValues( table, size, ci, true );
616 }
617 else
618 {
619 rc = doProcessInsertValues( table, size, ci );
620 }
621
622 if ( rc == 0 )
623 rowsInserted = size;
624 else
625 ci.rc = rc;
626
627 //@Bug 2481. The current active transaction needs to be released if autocommit is on
628 //@Bug 2438 Added a check for batchinsert's last batch to send commit if autocommit is on
629 std::string command;
630
631 if ( ci.singleInsert || ( (ci.bulkInsertRows > 0 ) && (( ci.rowsHaveInserted + size) >= ci.bulkInsertRows ) ) )
632 {
633 if ( thd->killed > 0 )
634 {
635 command = "ROLLBACK";
636 rc = ProcessCommandStatement ( thd, command, ci, schema );
637 }
638 else if (rc != dmlpackageprocessor::DMLPackageProcessor::ACTIVE_TRANSACTION_ERROR)
639 {
640 //@Bug 4605
641 int rc1 = 0;
642 if ( rc != 0 )
643 {
644 command = "ROLLBACK";
645 rc1 = ProcessCommandStatement ( thd, command, ci, schema );
646 }
647 else if (( rc == 0 ) && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))))
648 {
649 command = "COMMIT";
650 rc1 = ProcessCommandStatement ( thd, command, ci, schema );
651 }
652 else if (useHdfs)
653 {
654 command = "COMMIT";
655 rc1 = ProcessCommandStatement ( thd, command, ci, schema );
656 }
657 rc = max(rc, rc1);
658 }
659 }
660
661 //timer.stop( "DMLProc takes");
662 //timer.finish();
663 return rc;
664
665 }
666 else
667 {
668 return rc;
669 }
670 }
671
ha_mcs_impl_write_batch_row_(const uchar * buf,TABLE * table,cal_impl_if::cal_connection_info & ci)672 int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::cal_connection_info& ci)
673 {
674 ByteStream rowData;
675 int rc = 0;
676 //std::ostringstream data;
677 bool nullVal = false;
678 const uchar* bufHdr = buf; // bit flag indicating a field is null. Only those fields that are nullable are represented.
679 int32_t headerByte = 0; // Current byte in the bufHdr
680 int32_t headerBit = 0; // current bit in the bufHdr current byte.
681 uint16_t colpos = 0;
682 buf = buf + ci.headerLength; // Number of bytes used for null bits.
683 //@Bug 6122 if all columns have not null constraint, there is no information in the header
684 std::string escape;
685 char nullBits = *bufHdr++;
686
687 if (!ci.useXbit)
688 {
689 // Skip the first bit. For some reason, mysql reserves the first bit of the first byte, unless there's a varchar column in the table.
690 nullBits = nullBits >> 1;
691 ++headerBit;
692 }
693
694 while (colpos < ci.columnTypes.size()) //test bitmap for null values
695 {
696 uint8_t numLoop = 7;
697
698 if ((ci.useXbit) || (colpos > 6))
699 numLoop++;
700
701 for (uint8_t i = 0; i < numLoop; i++)
702 {
703 if (colpos == ci.columnTypes.size())
704 break;
705
706 //if a column has not null constraint, it will not be in the bit map
707 if (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT)
708 {
709 if (ci.headerLength > 0 && headerByte >= ci.headerLength)
710 {
711 // We've used more null bits than allowed. Something is seriously wrong.
712 std::string errormsg = "Null bit header is wrong size";
713 setError(current_thd, ER_INTERNAL_ERROR, errormsg);
714 return -1;
715 }
716
717 nullVal = nullBits & 0x01;
718 nullBits = nullBits >> 1;
719 ++headerBit;
720
721 if (headerBit == 8)
722 {
723 nullBits = *bufHdr++;
724 headerBit = 0;
725 ++headerByte;
726 }
727 }
728 else
729 {
730 nullVal = false;
731 }
732
733 switch (ci.columnTypes[colpos].colDataType)
734 {
735 case CalpontSystemCatalog::DATE: //date fetch
736 {
737
738 if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
739 {
740 fprintf(ci.filePtr, "%c", ci.delimiter);
741 }
742 else
743 {
744 const uchar* tmp1 = buf;
745 uint32_t tmp = (tmp1[2] << 16) + (tmp1[1] << 8) + tmp1[0];
746
747 int day = tmp & 0x0000001fl;
748 int month = (tmp >> 5) & 0x0000000fl;
749 int year = tmp >> 9;
750 fprintf(ci.filePtr, "%04d-%02d-%02d%c", year, month, day, ci.delimiter);
751 }
752
753 buf += 3;
754 break;
755 }
756
757 case CalpontSystemCatalog::DATETIME:
758 {
759 if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
760 {
761 fprintf(ci.filePtr, "%c", ci.delimiter);
762
763 if (table->field[colpos]->real_type() == MYSQL_TYPE_DATETIME2)
764 buf += table->field[colpos]->pack_length();
765 else
766 buf += 8;
767 }
768 else
769 {
770 if (table->field[colpos]->real_type() == MYSQL_TYPE_DATETIME2)
771 {
772 // mariadb 10.1 compatibility -- MYSQL_TYPE_DATETIME2 introduced in mysql 5.6
773 MYSQL_TIME ltime;
774 const uchar* pos = buf;
775 longlong tmp = my_datetime_packed_from_binary(pos, table->field[colpos]->decimals());
776 TIME_from_longlong_datetime_packed(<ime, tmp);
777
778 if (!ltime.second_part)
779 {
780 fprintf(ci.filePtr, "%04d-%02d-%02d %02d:%02d:%02d%c",
781 ltime.year, ltime.month, ltime.day,
782 ltime.hour, ltime.minute, ltime.second, ci.delimiter);
783 }
784 else
785 {
786 fprintf(ci.filePtr, "%04d-%02d-%02d %02d:%02d:%02d.%ld%c",
787 ltime.year, ltime.month, ltime.day,
788 ltime.hour, ltime.minute, ltime.second,
789 ltime.second_part, ci.delimiter);
790 }
791
792 buf += table->field[colpos]->pack_length();
793 }
794 else
795 {
796 long long value = *((long long*) buf);
797 long datePart = (long) (value / 1000000ll);
798 int day = datePart % 100;
799 int month = (datePart / 100) % 100;
800 int year = datePart / 10000;
801 fprintf(ci.filePtr, "%04d-%02d-%02d ", year, month, day);
802
803 long timePart = (long) (value - (long long) datePart * 1000000ll);
804 int second = timePart % 100;
805 int min = (timePart / 100) % 100;
806 int hour = timePart / 10000;
807 fprintf(ci.filePtr, "%02d:%02d:%02d%c", hour, min, second, ci.delimiter);
808 buf += 8;
809 }
810 }
811
812 break;
813 }
814
815 case CalpontSystemCatalog::TIME:
816 {
817 if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
818 {
819 fprintf(ci.filePtr, "%c", ci.delimiter);
820
821 buf += table->field[colpos]->pack_length();
822 }
823 else
824 {
825 MYSQL_TIME ltime;
826 const uchar* pos = buf;
827 longlong tmp = my_time_packed_from_binary(pos, table->field[colpos]->decimals());
828 TIME_from_longlong_time_packed(<ime, tmp);
829
830 if (ltime.neg)
831 {
832 fprintf(ci.filePtr, "-");
833 }
834
835 if (!ltime.second_part)
836 {
837 fprintf(ci.filePtr, "%02d:%02d:%02d%c",
838 ltime.hour, ltime.minute, ltime.second, ci.delimiter);
839 }
840 else
841 {
842 fprintf(ci.filePtr, "%02d:%02d:%02d.%ld%c",
843 ltime.hour, ltime.minute, ltime.second,
844 ltime.second_part, ci.delimiter);
845 }
846
847 buf += table->field[colpos]->pack_length();
848 }
849
850 break;
851 }
852
853 case CalpontSystemCatalog::TIMESTAMP:
854 {
855 if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
856 {
857 fprintf(ci.filePtr, "%c", ci.delimiter);
858 }
859 else
860 {
861 const uchar* pos = buf;
862 struct timeval tm;
863 my_timestamp_from_binary(&tm, pos, table->field[colpos]->decimals());
864
865 MySQLTime time;
866 gmtSecToMySQLTime(tm.tv_sec, time, current_thd->variables.time_zone->get_name()->ptr());
867
868 if (!tm.tv_usec)
869 {
870 fprintf(ci.filePtr, "%04d-%02d-%02d %02d:%02d:%02d%c",
871 time.year, time.month, time.day,
872 time.hour, time.minute, time.second, ci.delimiter);
873 }
874 else
875 {
876 fprintf(ci.filePtr, "%04d-%02d-%02d %02d:%02d:%02d.%ld%c",
877 time.year, time.month, time.day,
878 time.hour, time.minute, time.second,
879 tm.tv_usec, ci.delimiter);
880 }
881 }
882
883 buf += table->field[colpos]->pack_length();
884
885 break;
886 }
887 case CalpontSystemCatalog::BIGINT:
888 {
889 if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
890 fprintf(ci.filePtr, "%c", ci.delimiter);
891 else
892 fprintf(ci.filePtr, "%lld%c", *((long long*)buf), ci.delimiter);
893
894 buf += 8;
895 break;
896 }
897
898 case CalpontSystemCatalog::UBIGINT:
899 {
900 if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
901 fprintf(ci.filePtr, "%c", ci.delimiter);
902 else
903 fprintf(ci.filePtr, "%llu%c", *((long long unsigned*)buf), ci.delimiter);
904
905 buf += 8;
906 break;
907 }
908
909 case CalpontSystemCatalog::INT:
910 case CalpontSystemCatalog::MEDINT:
911 {
912 if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
913 fprintf(ci.filePtr, "%c", ci.delimiter);
914 else
915 fprintf(ci.filePtr, "%d%c", *((int32_t*)buf), ci.delimiter);
916
917 buf += 4;
918 break;
919 }
920
921 case CalpontSystemCatalog::UINT:
922 case CalpontSystemCatalog::UMEDINT:
923 {
924 if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
925 fprintf(ci.filePtr, "%c", ci.delimiter);
926 else
927 {
928 fprintf(ci.filePtr, "%u%c", *((uint32_t*)buf), ci.delimiter);
929 //printf("%u|", *((uint32_t*)buf));
930 //cout << *((uint32_t*)buf) << endl;
931 }
932
933 buf += 4;
934 break;
935 }
936
937 case CalpontSystemCatalog::SMALLINT:
938 {
939 if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
940 fprintf(ci.filePtr, "%c", ci.delimiter);
941 else
942 fprintf(ci.filePtr, "%d%c", *((int16_t*)buf), ci.delimiter);
943
944 buf += 2;
945 break;
946 }
947
948 case CalpontSystemCatalog::USMALLINT:
949 {
950 if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
951 fprintf(ci.filePtr, "%c", ci.delimiter);
952 else
953 fprintf(ci.filePtr, "%u%c", *((uint16_t*)buf), ci.delimiter);
954
955 buf += 2;
956 break;
957 }
958
959 case CalpontSystemCatalog::TINYINT:
960 {
961 if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
962 fprintf(ci.filePtr, "%c", ci.delimiter);
963 else
964 fprintf(ci.filePtr, "%d%c", *((int8_t*)buf), ci.delimiter);
965
966 buf += 1;
967 break;
968 }
969
970 case CalpontSystemCatalog::UTINYINT:
971 {
972 if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
973 fprintf(ci.filePtr, "%c", ci.delimiter);
974 else
975 fprintf(ci.filePtr, "%u%c", *((uint8_t*)buf), ci.delimiter);
976
977 buf += 1;
978 break;
979 }
980
981 case CalpontSystemCatalog::FLOAT:
982 case CalpontSystemCatalog::UFLOAT:
983 {
984 if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
985 fprintf(ci.filePtr, "%c", ci.delimiter);
986 else
987 {
988 float val = *((float*)buf);
989
990 if ((fabs(val) > (1.0 / IDB_pow[4])) && (fabs(val) < (float) IDB_pow[6]))
991 {
992 fprintf(ci.filePtr, "%.7f%c", val, ci.delimiter);
993 }
994 else
995 {
996 fprintf(ci.filePtr, "%e%c", val, ci.delimiter);
997 }
998
999
1000 //fprintf(ci.filePtr, "%.7g|", *((float*)buf));
1001 //printf("%.7f|", *((float*)buf));
1002 }
1003
1004 buf += 4;
1005 break;
1006 }
1007
1008 case CalpontSystemCatalog::DOUBLE:
1009 case CalpontSystemCatalog::UDOUBLE:
1010 {
1011 if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
1012 fprintf(ci.filePtr, "%c", ci.delimiter);
1013 else
1014 {
1015 fprintf(ci.filePtr, "%.15g%c", *((double*)buf), ci.delimiter);
1016 //printf("%.15g|", *((double*)buf));
1017 }
1018
1019 buf += 8;
1020 break;
1021 }
1022
1023 case CalpontSystemCatalog::LONGDOUBLE:
1024 {
1025 if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
1026 fprintf(ci.filePtr, "%c", ci.delimiter);
1027 else
1028 {
1029 fprintf(ci.filePtr, "%.15Lg%c", *((long double*)buf), ci.delimiter);
1030 //printf("%.15g|", *((double*)buf));
1031 }
1032
1033 buf += 8;
1034 break;
1035 }
1036
1037 case CalpontSystemCatalog::DECIMAL:
1038 case CalpontSystemCatalog::UDECIMAL:
1039 {
1040 uint bytesBefore = 1;
1041 uint totalBytes = 9;
1042
1043 switch (ci.columnTypes[colpos].precision)
1044 {
1045 case 18:
1046 case 17:
1047 case 16:
1048 {
1049 totalBytes = 8;
1050 break;
1051 }
1052
1053 case 15:
1054 case 14:
1055 {
1056 totalBytes = 7;
1057 break;
1058 }
1059
1060 case 13:
1061 case 12:
1062 {
1063 totalBytes = 6;
1064 break;
1065 }
1066
1067 case 11:
1068 {
1069 totalBytes = 5;
1070 break;
1071 }
1072
1073 case 10:
1074 {
1075 totalBytes = 5;
1076 break;
1077 }
1078
1079 case 9:
1080 case 8:
1081 case 7:
1082 {
1083 totalBytes = 4;
1084 break;
1085 }
1086
1087 case 6:
1088 case 5:
1089 {
1090 totalBytes = 3;
1091 break;
1092 }
1093
1094 case 4:
1095 case 3:
1096 {
1097 totalBytes = 2;
1098 break;
1099 }
1100
1101 case 2:
1102 case 1:
1103 {
1104 totalBytes = 1;
1105 break;
1106 }
1107
1108 default:
1109 break;
1110 }
1111
1112 switch (ci.columnTypes[colpos].scale)
1113 {
1114 case 0:
1115 {
1116 bytesBefore = totalBytes;
1117 break;
1118 }
1119
1120 case 1: //1 byte for digits after decimal point
1121 {
1122 if ((ci.columnTypes[colpos].precision != 16) && (ci.columnTypes[colpos].precision != 14)
1123 && (ci.columnTypes[colpos].precision != 12) && (ci.columnTypes[colpos].precision != 10)
1124 && (ci.columnTypes[colpos].precision != 7) && (ci.columnTypes[colpos].precision != 5)
1125 && (ci.columnTypes[colpos].precision != 3) && (ci.columnTypes[colpos].precision != 1))
1126 totalBytes++;
1127
1128 bytesBefore = totalBytes - 1;
1129 break;
1130 }
1131
1132 case 2: //1 byte for digits after decimal point
1133 {
1134 if ((ci.columnTypes[colpos].precision == 18) || (ci.columnTypes[colpos].precision == 9))
1135 totalBytes++;
1136
1137 bytesBefore = totalBytes - 1;
1138 break;
1139 }
1140
1141 case 3: //2 bytes for digits after decimal point
1142 {
1143 if ((ci.columnTypes[colpos].precision != 16) && (ci.columnTypes[colpos].precision != 14)
1144 && (ci.columnTypes[colpos].precision != 12) && (ci.columnTypes[colpos].precision != 7)
1145 && (ci.columnTypes[colpos].precision != 5) && (ci.columnTypes[colpos].precision != 3))
1146 totalBytes++;
1147
1148 bytesBefore = totalBytes - 2;
1149 break;
1150 }
1151
1152 case 4:
1153 {
1154 if ((ci.columnTypes[colpos].precision == 18) || (ci.columnTypes[colpos].precision == 11)
1155 || (ci.columnTypes[colpos].precision == 9))
1156 totalBytes++;
1157
1158 bytesBefore = totalBytes - 2;
1159 break;
1160
1161 }
1162
1163 case 5:
1164 {
1165 if ((ci.columnTypes[colpos].precision != 16) && (ci.columnTypes[colpos].precision != 14)
1166 && (ci.columnTypes[colpos].precision != 7) && (ci.columnTypes[colpos].precision != 5))
1167 totalBytes++;
1168
1169 bytesBefore = totalBytes - 3;
1170 break;
1171 }
1172
1173 case 6:
1174 {
1175 if ((ci.columnTypes[colpos].precision == 18) || (ci.columnTypes[colpos].precision == 13)
1176 || (ci.columnTypes[colpos].precision == 11) || (ci.columnTypes[colpos].precision == 9))
1177 totalBytes++;
1178
1179 bytesBefore = totalBytes - 3;
1180 break;
1181 }
1182
1183 case 7:
1184 {
1185 if ((ci.columnTypes[colpos].precision != 16) && (ci.columnTypes[colpos].precision != 7))
1186 totalBytes++;
1187
1188 bytesBefore = totalBytes - 4;
1189 break;
1190 }
1191
1192 case 8:
1193 {
1194 if ((ci.columnTypes[colpos].precision == 18) || (ci.columnTypes[colpos].precision == 15)
1195 || (ci.columnTypes[colpos].precision == 13) || (ci.columnTypes[colpos].precision == 11)
1196 || (ci.columnTypes[colpos].precision == 9))
1197 totalBytes++;
1198
1199 bytesBefore = totalBytes - 4;;
1200 break;
1201 }
1202
1203 case 9:
1204 {
1205 bytesBefore = totalBytes - 4;;
1206 break;
1207 }
1208
1209 case 10:
1210 {
1211 if ((ci.columnTypes[colpos].precision != 16) && (ci.columnTypes[colpos].precision != 14)
1212 && (ci.columnTypes[colpos].precision != 12) && (ci.columnTypes[colpos].precision != 10))
1213 totalBytes++;
1214
1215 bytesBefore = totalBytes - 5;;
1216 break;
1217 }
1218
1219 case 11:
1220 {
1221 if (ci.columnTypes[colpos].precision == 18)
1222 totalBytes++;
1223
1224 bytesBefore = totalBytes - 5;
1225 break;
1226 }
1227
1228 case 12:
1229 {
1230 if ((ci.columnTypes[colpos].precision != 16) && (ci.columnTypes[colpos].precision != 14)
1231 && (ci.columnTypes[colpos].precision != 12))
1232 totalBytes++;
1233
1234 bytesBefore = totalBytes - 6;
1235 break;
1236 }
1237
1238 case 13:
1239 {
1240 if (ci.columnTypes[colpos].precision == 18)
1241 totalBytes++;
1242
1243 bytesBefore = totalBytes - 6;
1244 break;
1245 }
1246
1247 case 14:
1248 {
1249 if ((ci.columnTypes[colpos].precision != 16) && (ci.columnTypes[colpos].precision != 14))
1250 totalBytes++;
1251
1252 bytesBefore = totalBytes - 7;
1253 break;
1254 }
1255
1256 case 15:
1257 {
1258 if (ci.columnTypes[colpos].precision == 18)
1259 totalBytes++;
1260
1261 bytesBefore = totalBytes - 7;
1262 break;
1263 }
1264
1265 case 16:
1266 {
1267 if (ci.columnTypes[colpos].precision != 16)
1268 totalBytes++;
1269
1270 bytesBefore = totalBytes - 8;
1271 break;
1272 }
1273
1274 case 17:
1275 {
1276 if (ci.columnTypes[colpos].precision == 18)
1277 totalBytes++;
1278
1279 bytesBefore = totalBytes - 8;
1280 break;
1281 }
1282
1283 case 18:
1284 {
1285 bytesBefore = totalBytes - 8;
1286 break;
1287 }
1288
1289 default:
1290 break;
1291 }
1292
1293 if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
1294 {
1295 fprintf(ci.filePtr, "%c", ci.delimiter);
1296 //printf("|");
1297 }
1298 else
1299 {
1300 uint32_t mask [5] = {0, 0xFF, 0xFFFF, 0xFFFFFF, 0xFFFFFFFF};
1301 char neg = '-';
1302
1303 if (ci.columnTypes[colpos].scale == 0)
1304 {
1305 const uchar* tmpBuf = buf;
1306 //test flag bit for sign
1307 bool posNum = tmpBuf[0] & (0x80);
1308 uchar tmpChr = tmpBuf[0];
1309 tmpChr ^= 0x80; //flip the bit
1310 int32_t tmp1 = tmpChr;
1311
1312 if (totalBytes > 4)
1313 {
1314 for (uint i = 1; i < (totalBytes - 4); i++)
1315 {
1316 tmp1 = (tmp1 << 8) + tmpBuf[i];
1317 }
1318
1319 if (( tmp1 != 0 ) && (tmp1 != -1))
1320 {
1321 if (!posNum)
1322 {
1323 tmp1 = mask[totalBytes - 4] - tmp1;
1324
1325 if (tmp1 != 0)
1326 {
1327 fprintf(ci.filePtr, "%c", neg);
1328 //printf("%c", neg);
1329 }
1330 }
1331
1332 if (tmp1 != 0)
1333 {
1334 fprintf(ci.filePtr, "%d", tmp1);
1335 ////printf("%d", tmp1);
1336 }
1337 }
1338
1339 int32_t tmp2 = tmpBuf[totalBytes - 4];
1340
1341 for (uint i = (totalBytes - 3); i < totalBytes; i++)
1342 {
1343 tmp2 = (tmp2 << 8) + tmpBuf[i];
1344 }
1345
1346 if ( tmp1 != 0 )
1347 {
1348 if (!posNum)
1349 {
1350 tmp2 = mask[4] - tmp2;
1351
1352 if (tmp1 == -1)
1353 {
1354 fprintf(ci.filePtr, "%c", neg);
1355 fprintf(ci.filePtr, "%d%c", tmp2, ci.delimiter);
1356 ////printf("%c", neg);
1357 //////printf( "%d|", tmp2);
1358 }
1359 else
1360 {
1361 fprintf(ci.filePtr, "%09u%c", tmp2, ci.delimiter);
1362 ////printf("%09u|", tmp2);
1363 }
1364 }
1365 else
1366 {
1367 fprintf(ci.filePtr, "%09u%c", tmp2, ci.delimiter);
1368 //printf("%09u|", tmp2);
1369 }
1370 }
1371 else
1372 {
1373 if (!posNum)
1374 {
1375 tmp2 = mask[4] - tmp2;
1376 fprintf(ci.filePtr, "%c", neg);
1377 //printf("%c", neg);
1378 }
1379
1380 fprintf(ci.filePtr, "%d%c", tmp2, ci.delimiter);
1381 //printf("%d|", tmp2);
1382 }
1383 }
1384 else
1385 {
1386 for (uint i = 1; i < totalBytes; i++)
1387 {
1388 tmp1 = (tmp1 << 8) + tmpBuf[i];
1389 }
1390
1391 if (!posNum)
1392 {
1393 tmp1 = mask[totalBytes] - tmp1;
1394 fprintf(ci.filePtr, "%c", neg);
1395 //printf("%c", neg);
1396 }
1397
1398 fprintf(ci.filePtr, "%d%c", tmp1, ci.delimiter);
1399 //printf("%d|", tmp1);
1400 }
1401 }
1402 else
1403 {
1404 const uchar* tmpBuf = buf;
1405 //test flag bit for sign
1406 bool posNum = tmpBuf[0] & (0x80);
1407 uchar tmpChr = tmpBuf[0];
1408 tmpChr ^= 0x80; //flip the bit
1409 int32_t tmp1 = tmpChr;
1410
1411 //fetch the digits before decimal point
1412 if (bytesBefore == 0)
1413 {
1414 if (!posNum)
1415 {
1416 fprintf(ci.filePtr, "%c", neg);
1417 //printf("%c", neg);
1418 }
1419
1420 fprintf(ci.filePtr, "0.");
1421 //printf("0.");
1422 }
1423 else if (bytesBefore > 4)
1424 {
1425 for (uint i = 1; i < (bytesBefore - 4); i++)
1426 {
1427 tmp1 = (tmp1 << 8) + tmpBuf[i];
1428 }
1429
1430 if (!posNum)
1431 {
1432 tmp1 = mask[bytesBefore - 4] - tmp1;
1433 }
1434
1435 if (( tmp1 != 0 ) && (tmp1 != -1))
1436 {
1437 if (!posNum)
1438 {
1439 fprintf(ci.filePtr, "%c", neg);
1440 //printf("%c", neg);
1441 }
1442
1443 fprintf(ci.filePtr, "%d", tmp1);
1444 //printf("%d", tmp1);
1445 }
1446
1447 tmpBuf += (bytesBefore - 4);
1448 int32_t tmp2 = *((int32_t*)tmpBuf);
1449 tmp2 = ntohl(tmp2);
1450
1451 if ( tmp1 != 0 )
1452 {
1453 if (!posNum)
1454 {
1455 tmp2 = mask[4] - tmp2;
1456 }
1457
1458 if (tmp1 == -1)
1459 {
1460 fprintf(ci.filePtr, "%c", neg);
1461 fprintf(ci.filePtr, "%d.", tmp2);
1462 //printf("%c", neg);
1463 //printf("%d.", tmp2);
1464 }
1465 else
1466 {
1467 fprintf(ci.filePtr, "%09u.", tmp2);
1468 //printf("%09u.", tmp2);
1469 }
1470 }
1471 else
1472 {
1473 if (!posNum)
1474 {
1475 tmp2 = mask[4] - tmp2;
1476 fprintf(ci.filePtr, "%c", neg);
1477 //printf("%c", neg);
1478 }
1479
1480 fprintf(ci.filePtr, "%d.", tmp2);
1481 //printf("%d.", tmp2);
1482 }
1483 }
1484 else
1485 {
1486 for (uint i = 1; i < bytesBefore; i++)
1487 {
1488 tmp1 = (tmp1 << 8) + tmpBuf[i];
1489 }
1490
1491 if (!posNum)
1492 {
1493 tmp1 = mask[bytesBefore] - tmp1;
1494 fprintf(ci.filePtr, "%c", neg);
1495 //printf("%c", neg);
1496 }
1497
1498 fprintf(ci.filePtr, "%d.", tmp1);
1499 //printf("%d.", tmp1);
1500 }
1501
1502 //fetch the digits after decimal point
1503 int32_t tmp2 = 0;
1504
1505 if (bytesBefore > 4)
1506 tmpBuf += 4;
1507 else
1508 tmpBuf += bytesBefore;
1509
1510 tmp2 = tmpBuf[0];
1511
1512 if ((totalBytes - bytesBefore) < 5)
1513 {
1514 for (uint j = 1; j < (totalBytes - bytesBefore); j++)
1515 {
1516 tmp2 = (tmp2 << 8) + tmpBuf[j];
1517 }
1518
1519 int8_t digits = ci.columnTypes[colpos].scale - 9; //9 digits is a 4 bytes chunk
1520
1521 if ( digits <= 0 )
1522 digits = ci.columnTypes[colpos].scale;
1523
1524 if (!posNum)
1525 {
1526 tmp2 = mask[totalBytes - bytesBefore] - tmp2;
1527 }
1528
1529 fprintf(ci.filePtr, "%0*u%c", digits, tmp2, ci.delimiter);
1530 //printf("%0*u|", digits, tmp2);
1531 }
1532 else
1533 {
1534 for (uint j = 1; j < 4; j++)
1535 {
1536 tmp2 = (tmp2 << 8) + tmpBuf[j];
1537 }
1538
1539 if (!posNum)
1540 {
1541 tmp2 = mask[4] - tmp2;
1542 }
1543
1544 fprintf(ci.filePtr, "%09u", tmp2);
1545 //printf("%09u", tmp2);
1546
1547 tmpBuf += 4;
1548 int32_t tmp3 = tmpBuf[0];
1549
1550 for (uint j = 1; j < (totalBytes - bytesBefore - 4); j++)
1551 {
1552 tmp3 = (tmp3 << 8) + tmpBuf[j];
1553 }
1554
1555 int8_t digits = ci.columnTypes[colpos].scale - 9; //9 digits is a 4 bytes chunk
1556
1557 if ( digits < 0 )
1558 digits = ci.columnTypes[colpos].scale;
1559
1560 if (!posNum)
1561 {
1562 tmp3 = mask[totalBytes - bytesBefore - 4] - tmp3;
1563 }
1564
1565 fprintf(ci.filePtr, "%0*u%c", digits, tmp3, ci.delimiter);
1566 //printf("%0*u|", digits, tmp3);
1567 }
1568 }
1569 }
1570
1571 buf += totalBytes;
1572 break;
1573 }
1574
1575 case CalpontSystemCatalog::CHAR:
1576 case CalpontSystemCatalog::TEXT:
1577 case CalpontSystemCatalog::VARCHAR:
1578 case CalpontSystemCatalog::BLOB:
1579 case CalpontSystemCatalog::VARBINARY:
1580 {
1581 Field* fieldPtr = table->field[colpos];
1582 if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
1583 {
1584 fprintf(ci.filePtr, "%c", ci.delimiter);
1585 }
1586 else
1587 {
1588 String value;
1589 // We need to set table->read_set for a field first.
1590 // This happens in ha_mcs_impl_start_bulk_insert().
1591 fieldPtr->val_str(&value);
1592 if (UNLIKELY(execplan::isBlobOrVarbinary(ci.columnTypes[colpos].colDataType)))
1593 {
1594 const char* ptr = value.ptr();
1595 for (uint32_t i = 0; i < value.length(); i++)
1596 {
1597 fprintf(ci.filePtr, "%02x", *(uint8_t*)(ptr+i));
1598 }
1599 fprintf(ci.filePtr, "%c", ci.delimiter);
1600 }
1601 else
1602 {
1603 escape.assign(value.ptr(), value.length());
1604 boost::replace_all(escape, "\\", "\\\\");
1605 fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(),
1606 escape.c_str(), ci.enclosed_by, ci.delimiter);
1607 }
1608 }
1609 buf+= fieldPtr->pack_length();
1610 break;
1611 }
1612 default: // treat as int64
1613 {
1614 break;
1615 }
1616 }
1617
1618 colpos++;
1619 }
1620 }
1621
1622 rc = fprintf(ci.filePtr, "\n"); //@bug 6077 check whether thhe pipe is still open
1623
1624 if ( rc < 0)
1625 rc = -1;
1626 else
1627 rc = 0;
1628
1629 return rc;
1630 }
1631
ha_mcs_impl_viewtablelock(cal_impl_if::cal_connection_info & ci,execplan::CalpontSystemCatalog::TableName & tablename)1632 std::string ha_mcs_impl_viewtablelock( cal_impl_if::cal_connection_info& ci, execplan::CalpontSystemCatalog::TableName& tablename)
1633 {
1634 THD* thd = current_thd;
1635 ulong sessionID = tid2sid(thd->thread_id);
1636 CalpontDMLPackage* pDMLPackage;
1637 std::string dmlStatement( "VIEWTABLELOCK" );
1638 VendorDMLStatement cmdStmt(dmlStatement, DML_COMMAND, sessionID);
1639 pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(cmdStmt);
1640 if (lower_case_table_names)
1641 {
1642 boost::algorithm::to_lower(tablename.schema);
1643 boost::algorithm::to_lower(tablename.table);
1644 }
1645 pDMLPackage->set_SchemaName (tablename.schema);
1646 pDMLPackage->set_TableName (tablename.table);
1647
1648 ByteStream bytestream;
1649 bytestream << static_cast<uint32_t>(sessionID);
1650 pDMLPackage->write(bytestream);
1651 delete pDMLPackage;
1652
1653 ByteStream::byte b = 0;
1654 ByteStream::octbyte rows;
1655 std::string errorMsg;
1656 std::string tableLockInfo;
1657 //int dmlRowCount = 0;
1658
1659 try
1660 {
1661 ci.dmlProc->write(bytestream);
1662 bytestream = ci.dmlProc->read();
1663
1664 if ( bytestream.length() == 0 )
1665 {
1666 thd->get_stmt_da()->set_overwrite_status(true);
1667 thd->raise_error_printf(ER_INTERNAL_ERROR, "Lost connection to DMLProc [5]");
1668 }
1669 else
1670 {
1671 bytestream >> b;
1672 bytestream >> rows;
1673 bytestream >> errorMsg;
1674 bytestream >> tableLockInfo;
1675 }
1676
1677 }
1678 catch (runtime_error&)
1679 {
1680 thd->get_stmt_da()->set_overwrite_status(true);
1681 thd->raise_error_printf(ER_INTERNAL_ERROR, "Lost connection to DMLProc [6]");
1682 }
1683 catch (...)
1684 {
1685 thd->get_stmt_da()->set_overwrite_status(true);
1686 thd->raise_error_printf(ER_INTERNAL_ERROR, "Caught unknown error");
1687 }
1688
1689 if ( b != 0 )
1690 tableLockInfo = errorMsg;
1691
1692 return tableLockInfo;
1693
1694 }
1695
1696 //------------------------------------------------------------------------------
1697 // Clear the table lock associated with the specified table lock id.
1698 // Any bulk rollback that is pending will be applied before the table
1699 // lock is released.
1700 //------------------------------------------------------------------------------
ha_mcs_impl_cleartablelock(cal_impl_if::cal_connection_info & ci,uint64_t tableLockID)1701 std::string ha_mcs_impl_cleartablelock(
1702 cal_impl_if::cal_connection_info& ci,
1703 uint64_t tableLockID)
1704 {
1705 execplan::CalpontSystemCatalog::TableName tblName;
1706 THD* thd = current_thd;
1707 ulong sessionID = tid2sid(thd->thread_id);
1708 std::string tableLockInfo;
1709 BRM::TableLockInfo lockInfo;
1710
1711 // Perform preliminary setup. CalpontDMLPackage expects schema and table
1712 // name to be provided, so we get the table OID for the specified table
1713 // lock, and then get the table name for the applicable table OID.
1714 std::string prelimTask;
1715
1716 try
1717 {
1718 BRM::DBRM brm;
1719 prelimTask = "getting table locks from BRM.";
1720 bool getLockInfo = brm.getTableLockInfo(tableLockID, &lockInfo);
1721
1722 if (!getLockInfo)
1723 {
1724 tableLockInfo = "No table lock found for specified table lock ID";
1725 return tableLockInfo;
1726 }
1727
1728 boost::shared_ptr<execplan::CalpontSystemCatalog> csc =
1729 execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
1730 csc->identity(execplan::CalpontSystemCatalog::FE);
1731
1732 prelimTask = "getting table name from system catalog.";
1733 tblName = csc->tableName( lockInfo.tableOID );
1734 }
1735 catch (std::exception& ex)
1736 {
1737 std::string eMsg(ex.what());
1738 eMsg += " Error ";
1739 eMsg += prelimTask;
1740
1741 thd->get_stmt_da()->set_overwrite_status(true);
1742 thd->raise_error_printf(ER_INTERNAL_ERROR, eMsg.c_str());
1743 return tableLockInfo;
1744 }
1745 catch (...)
1746 {
1747 std::string eMsg(" Error ");
1748 eMsg += prelimTask;
1749
1750 thd->get_stmt_da()->set_overwrite_status(true);
1751 thd->raise_error_printf(ER_INTERNAL_ERROR, eMsg.c_str());
1752 return tableLockInfo;
1753 }
1754
1755 std::string dmlStatement( "CLEARTABLELOCK" );
1756 VendorDMLStatement cmdStmt(dmlStatement, DML_COMMAND, sessionID);
1757 CalpontDMLPackage* pDMLPackage =
1758 CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer( cmdStmt );
1759 pDMLPackage->set_SchemaName(tblName.schema);
1760 pDMLPackage->set_TableName (tblName.table );
1761
1762 // Table lock ID is passed in the SQL statement attribute
1763 std::ostringstream lockIDString;
1764 lockIDString << tableLockID;
1765 pDMLPackage->set_SQLStatement( lockIDString.str() );
1766
1767 ByteStream bytestream;
1768 bytestream << static_cast<uint32_t>(sessionID);
1769 pDMLPackage->write(bytestream);
1770 delete pDMLPackage;
1771
1772 ByteStream::byte b = 0;
1773 ByteStream::octbyte rows;
1774 std::string errorMsg;
1775
1776 try
1777 {
1778 ci.dmlProc->write(bytestream);
1779 bytestream = ci.dmlProc->read();
1780
1781 if ( bytestream.length() == 0 )
1782 {
1783 thd->get_stmt_da()->set_overwrite_status(true);
1784 thd->raise_error_printf(ER_INTERNAL_ERROR, "Lost connection to DMLProc [7]");
1785 }
1786 else
1787 {
1788 bytestream >> b;
1789 bytestream >> rows;
1790 bytestream >> errorMsg;
1791 bytestream >> tableLockInfo;
1792 }
1793
1794 }
1795 catch (runtime_error&)
1796 {
1797 thd->get_stmt_da()->set_overwrite_status(true);
1798 thd->raise_error_printf(ER_INTERNAL_ERROR, "Lost connection to DMLProc [8]");
1799 }
1800 catch (...)
1801 {
1802 thd->get_stmt_da()->set_overwrite_status(true);
1803 thd->raise_error_printf(ER_INTERNAL_ERROR, "Caught unknown error");
1804 }
1805
1806 //@Bug 2606. Send error message back to sql session
1807 if ( b != 0 )
1808 tableLockInfo = errorMsg;
1809
1810 return tableLockInfo;
1811 }
1812
ha_mcs_impl_commit_(handlerton * hton,THD * thd,bool all,cal_connection_info & ci)1813 int ha_mcs_impl_commit_ (handlerton* hton, THD* thd, bool all, cal_connection_info& ci )
1814 {
1815 if (thd->slave_thread && !get_replication_slave(thd))
1816 return 0;
1817
1818 int rc = 0;
1819
1820 std::string command("COMMIT");
1821 #ifdef INFINIDB_DEBUG
1822 cout << "COMMIT" << endl;
1823 #endif
1824 rc = ProcessCommandStatement(thd, command, ci);
1825 return rc;
1826 }
1827
ha_mcs_impl_rollback_(handlerton * hton,THD * thd,bool all,cal_connection_info & ci)1828 int ha_mcs_impl_rollback_ (handlerton* hton, THD* thd, bool all, cal_connection_info& ci)
1829 {
1830 int rc = 0;
1831 #ifdef INFINIDB_DEBUG
1832 cout << "ROLLBACK" << endl;
1833 #endif
1834
1835 if (useHdfs)
1836 {
1837 string msg = string("Some non-transactional changed tables couldn't be rolled back");
1838 // cout << "Some non-transactional changed tables couldn't be rolled back" << endl;
1839 push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 1196, msg.c_str());
1840 return rc;
1841 }
1842
1843 std::string command("ROLLBACK");
1844 rc = ProcessCommandStatement(thd, command, ci);
1845 return rc;
1846 }
1847
ha_mcs_impl_close_connection_(handlerton * hton,THD * thd,cal_connection_info & ci)1848 int ha_mcs_impl_close_connection_ (handlerton* hton, THD* thd, cal_connection_info& ci )
1849 {
1850 int rc = 0;
1851 #ifdef INFINIDB_DEBUG
1852 cout << "Close connection session ID " << thd->thread_id << endl;
1853 #endif
1854
1855 if ( !ci.dmlProc )
1856 {
1857 return rc;
1858 }
1859
1860 std::string command("CLEANUP");
1861 rc = ProcessCommandStatement(thd, command, ci);
1862 // @bug 1622. remove calpontsystemcatalog and close the socket when session quit.
1863 // @info when Calpont process a select query, an alter table phase is involved in
1864 // the vtable design, which will auto start a transaction. when autocommit on (by default), rollback is automically called
1865 // when session quit. rollback can also be called by user explicitly to rollback
1866 // a transaction. Under either situation, system catalog cache for this session should
1867 // be removed
1868 return rc;
1869 }
1870
1871 // vim:ts=4 sw=4:
1872
1873