1 /* Copyright (C) 2014 InfiniDB, Inc.
2 Copyright (C) 2016 MariaDB Corporation
3
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 as published by the Free Software Foundation; version 2 of
7 the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17 MA 02110-1301, USA. */
18
19 // $Id: we_dmlcommandproc.cpp 3082 2011-09-26 22:00:38Z chao $
20
21 #include <unistd.h>
22 using namespace std;
23 #include "bytestream.h"
24 using namespace messageqcpp;
25
26 #include "we_messages.h"
27 #include "we_message_handlers.h"
28 #include "../../dbcon/dmlpackage/dmlpkg.h"
29 #include "we_dmlcommandproc.h"
30 using namespace dmlpackage;
31 #include "dmlpackageprocessor.h"
32 using namespace dmlpackageprocessor;
33 #include "dataconvert.h"
34 using namespace dataconvert;
35 #include "calpontsystemcatalog.h"
36 #include "sessionmanager.h"
37 using namespace execplan;
38 #include "messagelog.h"
39 #include "stopwatch.h"
40 #include "idberrorinfo.h"
41 #include "errorids.h"
42 using namespace logging;
43
44 #include "brmtypes.h"
45 using namespace BRM;
46 #include "we_tablemetadata.h"
47 #include "we_dbrootextenttracker.h"
48 #include "we_bulkrollbackmgr.h"
49 #include "we_define.h"
50 #include "we_confirmhdfsdbfile.h"
51 #include "cacheutils.h"
52 #include "IDBDataFile.h"
53 #include "IDBPolicy.h"
54
55 #include "checks.h"
56
57 namespace WriteEngine
58 {
59 //StopWatch timer;
WE_DMLCommandProc()60 WE_DMLCommandProc::WE_DMLCommandProc()
61 {
62 fIsFirstBatchPm = true;
63 filesPerColumnPartition = 8;
64 extentsPerSegmentFile = 1;
65 dbrootCnt = 1;
66 extentRows = 0x800000;
67 config::Config* cf = config::Config::makeConfig();
68 string fpc = cf->getConfig("ExtentMap", "FilesPerColumnPartition");
69
70 if (fpc.length() != 0)
71 filesPerColumnPartition = cf->uFromText(fpc);
72
73 string epsf = cf->getConfig("ExtentMap", "ExtentsPerSegmentFile");
74
75 if (epsf.length() != 0)
76 extentsPerSegmentFile = cf->uFromText(epsf);
77
78 string dbct = cf->getConfig("SystemConfig", "DBRootCount");
79
80 if (dbct.length() != 0)
81 dbrootCnt = cf->uFromText(dbct);
82 }
83
// Copy constructor.
//
// Copies the first-batch flag and the extent-layout settings from rhs and
// gives the new object its own "BatchInsert" rollback meta writer. The
// rollback meta writer and the DBRoot extent trackers of rhs are not copied.
//
// The extent-layout settings were previously left uninitialized in the copy;
// they are now taken from rhs so the copy behaves like its source.
WE_DMLCommandProc::WE_DMLCommandProc(const WE_DMLCommandProc& rhs)
{
    fIsFirstBatchPm = rhs.fIsFirstBatchPm;
    filesPerColumnPartition = rhs.filesPerColumnPartition;
    extentsPerSegmentFile = rhs.extentsPerSegmentFile;
    dbrootCnt = rhs.dbrootCnt;
    extentRows = rhs.extentRows;
    fRBMetaWriter.reset(new RBMetaWriter("BatchInsert", NULL));
}
89
~WE_DMLCommandProc()90 WE_DMLCommandProc::~WE_DMLCommandProc()
91 {
92 dbRootExtTrackerVec.clear();
93 }
94
processSingleInsert(messageqcpp::ByteStream & bs,std::string & err)95 uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std::string& err)
96 {
97 uint8_t rc = 0;
98 err.clear();
99 InsertDMLPackage insertPkg;
100 ByteStream::quadbyte tmp32;
101 bs >> tmp32;
102 BRM::TxnID txnid;
103 txnid.valid = true;
104 txnid.id = tmp32;
105 bs >> tmp32;
106 uint32_t dbroot = tmp32;
107
108 //cout << "processSingleInsert received bytestream length " << bs.length() << endl;
109
110 messageqcpp::ByteStream::byte packageType;
111 bs >> packageType;
112 insertPkg.read( bs);
113 uint32_t sessionId = insertPkg.get_SessionID();
114 //cout << " processSingleInsert for session " << sessionId << endl;
115 DMLTable* tablePtr = insertPkg.get_Table();
116 RowList rows = tablePtr->get_RowList();
117
118 WriteEngine::ColStructList colStructs;
119 WriteEngine::DctnryStructList dctnryStructList;
120 WriteEngine::DctnryValueList dctnryValueList;
121 WriteEngine::ColValueList colValuesList;
122 WriteEngine::DictStrList dicStringList ;
123 CalpontSystemCatalog::TableName tableName;
124 CalpontSystemCatalog::TableColName tableColName;
125 tableName.table = tableColName.table = tablePtr->get_TableName();
126 tableName.schema = tableColName.schema = tablePtr->get_SchemaName();
127
128 boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionId);
129 systemCatalogPtr->identity(CalpontSystemCatalog::EC);
130 CalpontSystemCatalog::ROPair tableRoPair;
131 std::vector<string> colNames;
132 bool isWarningSet = false;
133
134 try
135 {
136 tableRoPair = systemCatalogPtr->tableRID(tableName);
137
138 if (rows.size())
139 {
140 Row* rowPtr = rows.at(0);
141 ColumnList columns = rowPtr->get_ColumnList();
142 ColumnList::const_iterator column_iterator = columns.begin();
143
144 while (column_iterator != columns.end())
145 {
146 DMLColumn* columnPtr = *column_iterator;
147 tableColName.column = columnPtr->get_Name();
148 CalpontSystemCatalog::ROPair roPair = systemCatalogPtr->columnRID(tableColName);
149
150 CalpontSystemCatalog::OID oid = systemCatalogPtr->lookupOID(tableColName);
151
152 CalpontSystemCatalog::ColType colType;
153 colType = systemCatalogPtr->colType(oid);
154
155 WriteEngine::ColStruct colStruct;
156 colStruct.fColDbRoot = dbroot;
157 WriteEngine::DctnryStruct dctnryStruct;
158 dctnryStruct.fColDbRoot = dbroot;
159 colStruct.dataOid = roPair.objnum;
160 colStruct.tokenFlag = false;
161 colStruct.fCompressionType = colType.compressionType;
162
163 // Token
164 if ( isDictCol(colType) )
165 {
166 colStruct.colWidth = 8;
167 colStruct.tokenFlag = true;
168 }
169 else
170 {
171 colStruct.colWidth = colType.colWidth;
172 }
173
174 colStruct.colDataType = colType.colDataType;
175
176 if (colStruct.tokenFlag)
177 {
178 dctnryStruct.dctnryOid = colType.ddn.dictOID;
179 dctnryStruct.columnOid = colStruct.dataOid;
180 dctnryStruct.fCompressionType = colType.compressionType;
181 dctnryStruct.colWidth = colType.colWidth;
182 }
183 else
184 {
185 dctnryStruct.dctnryOid = 0;
186 dctnryStruct.columnOid = colStruct.dataOid;
187 dctnryStruct.fCompressionType = colType.compressionType;
188 dctnryStruct.colWidth = colType.colWidth;
189 }
190
191 colStructs.push_back(colStruct);
192 dctnryStructList.push_back(dctnryStruct);
193
194 ++column_iterator;
195 }
196
197 unsigned int numcols = rowPtr->get_NumberOfColumns();
198 std::string tmpStr("");
199
200 for (unsigned int i = 0; i < numcols; i++)
201 {
202
203 WriteEngine::ColTupleList colTuples;
204 WriteEngine::DctColTupleList dctColTuples;
205 RowList::const_iterator row_iterator = rows.begin();
206
207 while (row_iterator != rows.end())
208 {
209 Row* rowPtr = *row_iterator;
210 const DMLColumn* columnPtr = rowPtr->get_ColumnAt(i);
211
212 tableColName.column = columnPtr->get_Name();
213 CalpontSystemCatalog::OID oid = systemCatalogPtr->lookupOID(tableColName);
214
215 CalpontSystemCatalog::ColType colType;
216 colType = systemCatalogPtr->colType(oid);
217
218 boost::any datavalue;
219 bool isNULL = false;
220 bool pushWarning = false;
221 std::vector<std::string> origVals;
222 origVals = columnPtr->get_DataVector();
223 WriteEngine::dictStr dicStrings;
224
225 // token
226 if ( isDictCol(colType) )
227 {
228 for ( uint32_t i = 0; i < origVals.size(); i++ )
229 {
230 tmpStr = origVals[i];
231
232 isNULL = columnPtr->get_isnull();
233
234 if ( isNULL || ( tmpStr.length() == 0 ) )
235 isNULL = true;
236 else
237 isNULL = false;
238
239 if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
240 {
241 if (isNULL && colType.defaultValue.empty()) //error out
242 {
243 Message::Args args;
244 args.add(tableColName.column);
245 err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
246 rc = 1;
247 return rc;
248 }
249 else if (isNULL && !(colType.defaultValue.empty()))
250 {
251 tmpStr = colType.defaultValue;
252 }
253 }
254
255 if ( tmpStr.length() > (unsigned int)colType.colWidth )
256 {
257 tmpStr = tmpStr.substr(0, colType.colWidth);
258
259 if ( !pushWarning )
260 {
261 pushWarning = true;
262 isWarningSet = true;
263
264 if ((rc != NO_ERROR) && (rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING))
265 rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
266
267 colNames.push_back(tableColName.column);
268 }
269 }
270
271 WriteEngine::ColTuple colTuple;
272 colTuple.data = datavalue;
273
274 colTuples.push_back(colTuple);
275 //@Bug 2515. Only pass string values to write engine
276 dicStrings.push_back( tmpStr );
277 }
278
279 colValuesList.push_back(colTuples);
280 //@Bug 2515. Only pass string values to write engine
281 dicStringList.push_back( dicStrings );
282 }
283 else
284 {
285 string x;
286 std::string indata;
287
288 for ( uint32_t i = 0; i < origVals.size(); i++ )
289 {
290 indata = origVals[i];
291
292 isNULL = columnPtr->get_isnull();
293
294 if ( isNULL || ( indata.length() == 0 ) )
295 isNULL = true;
296 else
297 isNULL = false;
298
299 //check if autoincrement column and value is 0 or null
300 uint64_t nextVal = 1;
301
302 if (colType.autoincrement)
303 {
304 try
305 {
306 nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
307 fDbrm.startAISequence(oid, nextVal, colType.colWidth, colType.colDataType);
308 }
309 catch (std::exception& ex)
310 {
311 err = ex.what();
312 rc = 1;
313 return rc;
314 }
315 }
316
317 if (colType.autoincrement && ( isNULL || (indata.compare("0") == 0)))
318 {
319 try
320 {
321 bool reserved = fDbrm.getAIRange(oid, 1, &nextVal);
322
323 if (!reserved)
324 {
325 err = IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT);
326 rc = 1;
327 return rc;
328 }
329 }
330 catch (std::exception& ex)
331 {
332 err = ex.what();
333 rc = 1;
334 return rc;
335 }
336
337 ostringstream oss;
338 oss << nextVal;
339 indata = oss.str();
340 isNULL = false;
341 }
342
343 if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
344 {
345 if (isNULL && colType.defaultValue.empty()) //error out
346 {
347 Message::Args args;
348 args.add(tableColName.column);
349 err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
350 rc = 1;
351 return rc;
352 }
353 else if (isNULL && !(colType.defaultValue.empty()))
354 {
355 indata = colType.defaultValue;
356 isNULL = false;
357 }
358 }
359
360 try
361 {
362 datavalue = DataConvert::convertColumnData(colType, indata, pushWarning, insertPkg.get_TimeZone(), isNULL, false, false);
363 }
364 catch (exception&)
365 {
366 rc = 1;
367 Message::Args args;
368 args.add(string("'") + indata + string("'"));
369 err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
370 }
371
372 //@Bug 1806
373 if (rc != NO_ERROR && rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
374 {
375 return rc;
376 }
377
378 if ( pushWarning)
379 {
380 if (!isWarningSet)
381 isWarningSet = true;
382
383 if ( rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING )
384 rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
385
386 colNames.push_back(tableColName.column);
387 }
388
389 WriteEngine::ColTuple colTuple;
390 colTuple.data = datavalue;
391
392 colTuples.push_back(colTuple);
393 //@Bug 2515. Only pass string values to write engine
394 dicStrings.push_back( tmpStr );
395 }
396
397 colValuesList.push_back(colTuples);
398 dicStringList.push_back( dicStrings );
399 }
400
401 ++row_iterator;
402 }
403 }
404 }
405 }
406 catch (exception& ex)
407 {
408 rc = 1;
409 err = ex.what();
410 return rc;
411 }
412
413 // call the write engine to write the rows
414 int error = NO_ERROR;
415 //fWriteEngine.setDebugLevel(WriteEngine::DEBUG_3);
416 //cout << "inserting a row with transaction id " << txnid.id << endl;
417 fWEWrapper.setIsInsert(true);
418 fWEWrapper.setBulkFlag(true);
419 fWEWrapper.setTransId(txnid.id);
420 //For hdfs use only
421 uint32_t tblOid = tableRoPair.objnum;
422
423 if (idbdatafile::IDBPolicy::useHdfs())
424 {
425
426 std::vector<Column> columns;
427 DctnryStructList dctnryList;
428 CalpontSystemCatalog::ColType colType;
429 std::vector<DBRootExtentInfo> colDBRootExtentInfo;
430 Convertor convertor;
431 dbRootExtTrackerVec.clear();
432 fRBMetaWriter.reset(new RBMetaWriter("SingleInsert", NULL));
433 CalpontSystemCatalog::RIDList ridList;
434
435 try
436 {
437 ridList = systemCatalogPtr->columnRIDs(tableName, true);
438 std::vector<OID> dctnryStoreOids(ridList.size()) ;
439 std::vector<BRM::EmDbRootHWMInfo_v> dbRootHWMInfoColVec(ridList.size());
440 bool bFirstExtentOnThisPM = false;
441
442 // First gather HWM BRM information for all columns
443 std::vector<int> colWidths;
444
445 for (unsigned i = 0; i < ridList.size(); i++)
446 {
447 rc = BRMWrapper::getInstance()->getDbRootHWMInfo(ridList[i].objnum, dbRootHWMInfoColVec[i]);
448 //need handle error
449
450 CalpontSystemCatalog::ColType colType2 = systemCatalogPtr->colType(ridList[i].objnum);
451 colWidths.push_back( convertor.getCorrectRowWidth(
452 colType2.colDataType, colType2.colWidth) );
453 }
454
455 for (unsigned i = 0; i < ridList.size(); i++)
456 {
457 // Find DBRoot/segment file where we want to start adding rows
458 colType = systemCatalogPtr->colType(ridList[i].objnum);
459 boost::shared_ptr<DBRootExtentTracker> pDBRootExtentTracker (new DBRootExtentTracker(ridList[i].objnum,
460 colWidths, dbRootHWMInfoColVec, i, 0) );
461 dbRootExtTrackerVec.push_back( pDBRootExtentTracker );
462 DBRootExtentInfo dbRootExtent;
463 std::string trkErrMsg;
464 bool bEmptyPM;
465
466 if (i == 0)
467 rc = pDBRootExtentTracker->selectFirstSegFile(dbRootExtent, bFirstExtentOnThisPM, bEmptyPM, trkErrMsg);
468 else
469 pDBRootExtentTracker->assignFirstSegFile(*(dbRootExtTrackerVec[0].get()), dbRootExtent);
470
471 colDBRootExtentInfo.push_back(dbRootExtent);
472
473 Column aColumn;
474 aColumn.colWidth = convertor.getCorrectRowWidth(colType.colDataType, colType.colWidth);
475 aColumn.colDataType = colType.colDataType;
476 aColumn.compressionType = colType.compressionType;
477 aColumn.dataFile.oid = ridList[i].objnum;
478 aColumn.dataFile.fPartition = dbRootExtent.fPartition;
479 aColumn.dataFile.fSegment = dbRootExtent.fSegment;
480 aColumn.dataFile.fDbRoot = dbRootExtent.fDbRoot;
481 aColumn.dataFile.hwm = dbRootExtent.fLocalHwm;
482 columns.push_back(aColumn);
483
484 if ((colType.compressionType > 0) && (colType.ddn.dictOID > 0))
485 {
486 DctnryStruct aDctnry;
487 aDctnry.dctnryOid = colType.ddn.dictOID;
488 aDctnry.fColPartition = dbRootExtent.fPartition;
489 aDctnry.fColSegment = dbRootExtent.fSegment;
490 aDctnry.fColDbRoot = dbRootExtent.fDbRoot;
491 dctnryList.push_back(aDctnry);
492 }
493
494 if (colType.ddn.dictOID > 0)
495 {
496 dctnryStoreOids[i] = colType.ddn.dictOID;
497 }
498 else
499 {
500 dctnryStoreOids[i] = 0;
501 }
502 }
503
504 fRBMetaWriter->init(tblOid, tableName.table);
505 fRBMetaWriter->saveBulkRollbackMetaData(columns, dctnryStoreOids, dbRootHWMInfoColVec);
506
507 //cout << "Backing up hwm chunks" << endl;
508 for (unsigned i = 0; i < dctnryList.size(); i++) //back up chunks for compressed dictionary
509 {
510 // @bug 5572 HDFS tmp file - Ignoring return flag, don't need in this context
511 fRBMetaWriter->backupDctnryHWMChunk(
512 dctnryList[i].dctnryOid, dctnryList[i].fColDbRoot, dctnryList[i].fColPartition, dctnryList[i].fColSegment);
513 }
514 }
515 catch (std::exception& ex)
516 {
517 err = ex.what();
518 rc = 1;
519 return rc;
520 }
521 }
522
523 if (colValuesList[0].size() > 0)
524 {
525 if (NO_ERROR !=
526 (error = fWEWrapper.insertColumnRec_Single(txnid.id, colStructs, colValuesList, dctnryStructList, dicStringList, tableRoPair.objnum)))
527 {
528 if (error == ERR_BRM_DEAD_LOCK)
529 {
530 rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR;
531 WErrorCodes ec;
532 err = ec.errorString(error);
533 }
534 else if ( error == ERR_BRM_VB_OVERFLOW )
535 {
536 rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR;
537 err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW);
538 }
539 else
540 {
541 rc = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR;
542 WErrorCodes ec;
543 err = ec.errorString(error);
544 }
545 }
546 }
547
548 std::map<uint32_t, uint32_t> oids;
549 std::vector<BRM::OID_t> oidsToFlush;
550
551 for ( unsigned i = 0; i < colStructs.size(); i++)
552 {
553 oids[colStructs[i].dataOid] = colStructs[i].dataOid;
554 oidsToFlush.push_back(colStructs[i].dataOid);
555 }
556
557 for (unsigned i = 0; i < dctnryStructList.size(); i++)
558 {
559 oids[dctnryStructList[i].dctnryOid] = dctnryStructList[i].dctnryOid;
560 oidsToFlush.push_back(dctnryStructList[i].dctnryOid);
561 }
562
563 fWEWrapper.setTransId(txnid.id);
564 vector<LBID_t> lbidList;
565
566 if (idbdatafile::IDBPolicy::useHdfs())
567 {
568 //save the extent info to mark them invalid, after flush, the meta file will be gone.
569 std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t>::iterator mapIter;
570 std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t> m_txnLBIDMap = fWEWrapper.getTxnMap();
571
572 try
573 {
574 mapIter = m_txnLBIDMap.find(txnid.id);
575
576 if (mapIter != m_txnLBIDMap.end())
577 {
578 SP_TxnLBIDRec_t spTxnLBIDRec = (*mapIter).second;
579 std::tr1::unordered_map<BRM::LBID_t, uint32_t> ::iterator listIter = spTxnLBIDRec->m_LBIDMap.begin();
580
581 while (listIter != spTxnLBIDRec->m_LBIDMap.end())
582 {
583 lbidList.push_back(listIter->first);
584 listIter++;
585 }
586 }
587 }
588 catch (...) {}
589 }
590
591 //flush files
592 // @bug5333, up to here, rc may have an error code already, don't overwrite it.
593 int flushChunksRc = fWEWrapper.flushChunks(0, oids); // why not pass rc to flushChunks?
594
595 if (flushChunksRc != NO_ERROR)
596 {
597 WErrorCodes ec;
598 std::ostringstream ossErr;
599 ossErr << "Error flushing chunks for table " << tableName <<
600 "; " << ec.errorString(flushChunksRc);
601
602 // Append to errmsg in case we already have an error
603 if (err.length() > 0)
604 err += "; ";
605
606 err += ossErr.str();
607
608 if (error == NO_ERROR)
609 error = flushChunksRc;
610
611 if ((rc == NO_ERROR) || (rc == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING))
612 rc = 1; // return hardcoded 1 as the above
613 }
614
615 // Confirm HDFS DB file changes "only" if no error up to this point
616 if (idbdatafile::IDBPolicy::useHdfs())
617 {
618 if (error == NO_ERROR)
619 {
620 std::string eMsg;
621 ConfirmHdfsDbFile confirmHdfs;
622 error = confirmHdfs.confirmDbFileListFromMetaFile(tblOid, eMsg);
623
624 if (error != NO_ERROR)
625 {
626 ostringstream ossErr;
627 ossErr << "Error confirming changes to table " <<
628 tableName << "; " << eMsg;
629 err = ossErr.str();
630 rc = 1;
631 }
632 else // Perform extra cleanup that is necessary for HDFS
633 {
634 std::string eMsg;
635 ConfirmHdfsDbFile confirmHdfs;
636 int confirmRc2 = confirmHdfs.endDbFileListFromMetaFile(
637 tblOid, true, eMsg);
638
639 if (confirmRc2 != NO_ERROR)
640 {
641 // Might want to log this error, but don't think we need
642 // to report as fatal, since all changes were confirmed.
643 }
644
645 //flush PrimProc FD cache
646 TableMetaData* aTableMetaData = TableMetaData::makeTableMetaData(tblOid);
647 ColsExtsInfoMap colsExtsInfoMap = aTableMetaData->getColsExtsInfoMap();
648 ColsExtsInfoMap::iterator it = colsExtsInfoMap.begin();
649 ColExtsInfo::iterator aIt;
650 std::vector<BRM::FileInfo> files;
651 BRM::FileInfo aFile;
652
653 while (it != colsExtsInfoMap.end())
654 {
655 aIt = (it->second).begin();
656 aFile.oid = it->first;
657
658 while (aIt != (it->second).end())
659 {
660 aFile.partitionNum = aIt->partNum;
661 aFile.dbRoot = aIt->dbRoot;
662 aFile.segmentNum = aIt->segNum;
663 aFile.compType = aIt->compType;
664 files.push_back(aFile);
665 aIt++;
666 }
667
668 it++;
669 }
670
671 if (files.size() > 0)
672 cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());
673
674 cacheutils::flushOIDsFromCache(oidsToFlush);
675 fDbrm.invalidateUncommittedExtentLBIDs(0, &lbidList);
676
677 try
678 {
679 BulkRollbackMgr::deleteMetaFile( tblOid );
680 }
681 catch (exception& ex)
682 {
683 err = ex.what();
684 rc = 1;
685 }
686 }
687 } // (error == NO_ERROR) through call to flushChunks()
688
689 if (error != NO_ERROR) // rollback
690 {
691 string applName ("SingleInsert");
692 fWEWrapper.bulkRollback(tblOid, txnid.id, tableName.toString(),
693 applName, false, err);
694 BulkRollbackMgr::deleteMetaFile( tblOid );
695 }
696 } // extra hdfs steps
697
698 fWEWrapper.setIsInsert(false);
699 fWEWrapper.setBulkFlag(false);
700 TableMetaData::removeTableMetaData(tblOid);
701
702 if ((rc == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING) || isWarningSet)
703 {
704 if (rc == NO_ERROR)
705 rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
706
707 Message::Args args;
708 string cols = "'" + colNames[0] + "'";
709
710 for (unsigned i = 1; i < colNames.size(); i++)
711 {
712 cols = cols + ", " + "'" + colNames[i] + "'";
713 }
714
715 args.add(cols);
716 err = IDBErrorInfo::instance()->errorMsg(WARN_DATA_TRUNC, args);
717
718 // Strict mode enabled, so rollback on warning
719 if (insertPkg.get_isWarnToError())
720 {
721 string applName ("SingleInsert");
722 fWEWrapper.bulkRollback(tblOid, txnid.id, tableName.toString(),
723 applName, false, err);
724 BulkRollbackMgr::deleteMetaFile( tblOid );
725 }
726 }
727
728 // MCOL-1495 Remove fCatalogMap entries CS won't use anymore.
729 CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId);
730 CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId | 0x80000000);
731
732 return rc;
733 }
734
commitVersion(ByteStream & bs,std::string & err)735 uint8_t WE_DMLCommandProc::commitVersion(ByteStream& bs, std::string& err)
736 {
737 int rc = 0;
738 uint32_t tmp32;
739 int txnID;
740
741 bs >> tmp32;
742 txnID = tmp32;
743 //cout << "processing commit txnid = " << txnID << endl;
744 rc = fWEWrapper.commit(txnID);
745
746 if (rc != 0)
747 {
748 WErrorCodes ec;
749 ostringstream oss;
750 oss << "WE: Error commiting transaction; " << txnID << ec.errorString(rc) << endl;
751 err = oss.str();
752 }
753
754 return rc;
755 }
756
rollbackBlocks(ByteStream & bs,std::string & err)757 uint8_t WE_DMLCommandProc::rollbackBlocks(ByteStream& bs, std::string& err)
758 {
759 int rc = 0;
760 uint32_t sessionID, tmp32;;
761 int txnID;
762 bs >> sessionID;
763 bs >> tmp32;
764 txnID = tmp32;
765
766 //cout << "processing rollbackBlocks txnid = " << txnID << endl;
767 try
768 {
769 rc = fWEWrapper.rollbackBlocks(txnID, sessionID);
770 }
771 catch (std::exception& ex)
772 {
773 rc = 1;
774 err = ex.what();
775 }
776
777 return rc;
778 }
779
rollbackVersion(ByteStream & bs,std::string & err)780 uint8_t WE_DMLCommandProc::rollbackVersion(ByteStream& bs, std::string& err)
781 {
782 int rc = 0;
783 uint32_t sessionID, tmp32;
784 int txnID;
785 bs >> sessionID;
786 bs >> tmp32;
787 txnID = tmp32;
788 //cout << "processing rollbackVersion txnid = " << txnID << endl;
789 rc = fWEWrapper.rollbackVersion(txnID, sessionID);
790
791 if (rc != 0)
792 {
793 WErrorCodes ec;
794 ostringstream oss;
795 oss << "WE: Error rolling back transaction " << txnID << " for session " << sessionID << "; " << ec.errorString(rc) << endl;
796 err = oss.str();
797 }
798
799 return rc;
800 }
801
processBatchInsert(messageqcpp::ByteStream & bs,std::string & err,ByteStream::quadbyte & PMId)802 uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std::string& err, ByteStream::quadbyte& PMId)
803 {
804 int rc = 0;
805 //cout << "processBatchInsert received bytestream length " << bs.length() << endl;
806
807 InsertDMLPackage insertPkg;
808 ByteStream::quadbyte tmp32;
809 bs >> tmp32;
810 //cout << "processBatchInsert got transaction id " << tmp32 << endl;
811 bs >> PMId;
812 //cout << "processBatchInsert gor PMId " << PMId << endl;
813 insertPkg.read( bs);
814 uint32_t sessionId = insertPkg.get_SessionID();
815 //cout << " processBatchInsert for session " << sessionId << endl;
816 DMLTable* tablePtr = insertPkg.get_Table();
817 bool isAutocommitOn = insertPkg.get_isAutocommitOn();
818
819 if (idbdatafile::IDBPolicy::useHdfs())
820 isAutocommitOn = true;
821
822 //cout << "This session isAutocommitOn is " << isAutocommitOn << endl;
823 BRM::TxnID txnid;
824 txnid.id = tmp32;
825 txnid.valid = true;
826 RowList rows = tablePtr->get_RowList();
827 bool isInsertSelect = insertPkg.get_isInsertSelect();
828
829 WriteEngine::ColStructList colStructs;
830 WriteEngine::DctnryStructList dctnryStructList;
831 WriteEngine::DctnryValueList dctnryValueList;
832 WriteEngine::ColValueList colValuesList;
833 WriteEngine::DictStrList dicStringList ;
834 CalpontSystemCatalog::TableName tableName;
835 CalpontSystemCatalog::TableColName tableColName;
836 tableName.table = tableColName.table = tablePtr->get_TableName();
837 tableName.schema = tableColName.schema = tablePtr->get_SchemaName();
838 boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionId);
839 systemCatalogPtr->identity(CalpontSystemCatalog::EC);
840 CalpontSystemCatalog::ROPair roPair;
841 CalpontSystemCatalog::RIDList ridList;
842 CalpontSystemCatalog::DictOIDList dictOids;
843
844 try
845 {
846 ridList = systemCatalogPtr->columnRIDs(tableName, true);
847 roPair = systemCatalogPtr->tableRID( tableName);
848 }
849 catch (std::exception& ex)
850 {
851 err = ex.what();
852 rc = 1;
853 return rc;
854 }
855
856
857 std::vector<OID> dctnryStoreOids(ridList.size()) ;
858 std::vector<Column> columns;
859 DctnryStructList dctnryList;
860 std::vector<BRM::EmDbRootHWMInfo_v> dbRootHWMInfoColVec(ridList.size());
861
862 uint32_t tblOid = roPair.objnum;
863 CalpontSystemCatalog::ColType colType;
864 std::vector<DBRootExtentInfo> colDBRootExtentInfo;
865 bool bFirstExtentOnThisPM = false;
866 Convertor convertor;
867
868 if ( fIsFirstBatchPm )
869 {
870 dbRootExtTrackerVec.clear();
871
872 if (isAutocommitOn || ((fRBMetaWriter.get() == NULL) && (!isAutocommitOn)))
873 fRBMetaWriter.reset(new RBMetaWriter("BatchInsert", NULL));
874
875 fWEWrapper.setIsInsert(true);
876 fWEWrapper.setBulkFlag(true);
877 fWEWrapper.setTransId(txnid.id);
878
879 try
880 {
881 // First gather HWM BRM information for all columns
882 std::vector<int> colWidths;
883
884 for (unsigned i = 0; i < ridList.size(); i++)
885 {
886 rc = BRMWrapper::getInstance()->getDbRootHWMInfo(ridList[i].objnum, dbRootHWMInfoColVec[i]);
887 //need handle error
888
889 CalpontSystemCatalog::ColType colType2 = systemCatalogPtr->colType(ridList[i].objnum);
890 colWidths.push_back( convertor.getCorrectRowWidth(
891 colType2.colDataType, colType2.colWidth) );
892 }
893
894 for (unsigned i = 0; i < ridList.size(); i++)
895 {
896 // Find DBRoot/segment file where we want to start adding rows
897 colType = systemCatalogPtr->colType(ridList[i].objnum);
898 boost::shared_ptr<DBRootExtentTracker> pDBRootExtentTracker (new DBRootExtentTracker(ridList[i].objnum,
899 colWidths, dbRootHWMInfoColVec, i, 0) );
900 dbRootExtTrackerVec.push_back( pDBRootExtentTracker );
901 DBRootExtentInfo dbRootExtent;
902 std::string trkErrMsg;
903 bool bEmptyPM;
904
905 if (i == 0)
906 {
907 rc = pDBRootExtentTracker->selectFirstSegFile(dbRootExtent, bFirstExtentOnThisPM, bEmptyPM, trkErrMsg);
908 /* cout << "bEmptyPM = " << (int) bEmptyPM << " bFirstExtentOnThisPM= " << (int)bFirstExtentOnThisPM <<
909 " oid:dbroot:hwm = " << ridList[i].objnum << ":"<<dbRootExtent.fDbRoot << ":"
910 <<":"<<dbRootExtent.fLocalHwm << " err = " << trkErrMsg << endl; */
911 }
912 else
913 pDBRootExtentTracker->assignFirstSegFile(*(dbRootExtTrackerVec[0].get()), dbRootExtent);
914
915
916 colDBRootExtentInfo.push_back(dbRootExtent);
917
918 Column aColumn;
919 aColumn.colWidth = convertor.getCorrectRowWidth(colType.colDataType, colType.colWidth);
920 aColumn.colDataType = colType.colDataType;
921 aColumn.compressionType = colType.compressionType;
922 aColumn.dataFile.oid = ridList[i].objnum;
923 aColumn.dataFile.fPartition = dbRootExtent.fPartition;
924 aColumn.dataFile.fSegment = dbRootExtent.fSegment;
925 aColumn.dataFile.fDbRoot = dbRootExtent.fDbRoot;
926 aColumn.dataFile.hwm = dbRootExtent.fLocalHwm;
927 columns.push_back(aColumn);
928
929 if ((colType.compressionType > 0) && (colType.ddn.dictOID > 0))
930 {
931 DctnryStruct aDctnry;
932 aDctnry.dctnryOid = colType.ddn.dictOID;
933 aDctnry.fColPartition = dbRootExtent.fPartition;
934 aDctnry.fColSegment = dbRootExtent.fSegment;
935 aDctnry.fColDbRoot = dbRootExtent.fDbRoot;
936 dctnryList.push_back(aDctnry);
937 }
938
939 if (colType.ddn.dictOID > 0)
940 {
941 dctnryStoreOids[i] = colType.ddn.dictOID;
942 }
943 else
944 {
945 dctnryStoreOids[i] = 0;
946 }
947 }
948 }
949 catch (std::exception& ex)
950 {
951 err = ex.what();
952 rc = 1;
953 return rc;
954 }
955
956 //@Bug 5996 validate hwm before starts
957 rc = validateColumnHWMs(ridList, systemCatalogPtr, colDBRootExtentInfo, "Starting");
958
959 if ( rc != 0)
960 {
961 WErrorCodes ec;
962 err = ec.errorString(rc);
963 err += " Check err.log for detailed information.";
964 fIsFirstBatchPm = false;
965 rc = 1;
966 return rc;
967 }
968 }
969
970 std::vector<BRM::LBIDRange> rangeList;
971
972 // use of MetaFile for bulk rollback support
973 if ( fIsFirstBatchPm && isAutocommitOn)
974 {
975 //save meta data, version last block for each dbroot at the start of batch insert
976 try
977 {
978 fRBMetaWriter->init(tblOid, tableName.table);
979 fRBMetaWriter->saveBulkRollbackMetaData(columns, dctnryStoreOids, dbRootHWMInfoColVec);
980
981 //cout << "Saved meta files" << endl;
982 if (!bFirstExtentOnThisPM)
983 {
984 //cout << "Backing up hwm chunks" << endl;
985 for (unsigned i = 0; i < dctnryList.size(); i++) //back up chunks for compressed dictionary
986 {
987 // @bug 5572 HDFS tmp file - Ignoring return flag, don't need in this context
988 fRBMetaWriter->backupDctnryHWMChunk(
989 dctnryList[i].dctnryOid, dctnryList[i].fColDbRoot, dctnryList[i].fColPartition, dctnryList[i].fColSegment);
990 }
991 }
992 }
993 catch (WeException& ex) // catch exception to close file, then rethrow
994 {
995 rc = 1;
996 err = ex.what();
997 }
998
999 //Do versioning. Currently, we only version columns, not strings. If there is a design change, this will need to be re-visited
1000 if ( rc != 0)
1001 return rc;
1002
1003 }
1004
1005 std::vector<string> colNames;
1006 bool isWarningSet = false;
1007
1008 if (rows.size())
1009 {
1010 Row* rowPtr = rows.at(0);
1011 ColumnList columns = rowPtr->get_ColumnList();
1012 ColumnList::const_iterator column_iterator = columns.begin();
1013
1014 try
1015 {
1016 while (column_iterator != columns.end())
1017 {
1018 DMLColumn* columnPtr = *column_iterator;
1019 tableColName.column = columnPtr->get_Name();
1020 CalpontSystemCatalog::ROPair roPair = systemCatalogPtr->columnRID(tableColName);
1021
1022 CalpontSystemCatalog::OID oid = systemCatalogPtr->lookupOID(tableColName);
1023
1024 CalpontSystemCatalog::ColType colType;
1025 colType = systemCatalogPtr->colType(oid);
1026
1027 WriteEngine::ColStruct colStruct;
1028 WriteEngine::DctnryStruct dctnryStruct;
1029 colStruct.dataOid = roPair.objnum;
1030 colStruct.tokenFlag = false;
1031 colStruct.fCompressionType = colType.compressionType;
1032
1033 // Token
1034 if ( isDictCol(colType) )
1035 {
1036 colStruct.colWidth = 8;
1037 colStruct.tokenFlag = true;
1038 }
1039 else
1040 {
1041 colStruct.colWidth = colType.colWidth;
1042 }
1043
1044 colStruct.colDataType = colType.colDataType;
1045
1046 if (colStruct.tokenFlag)
1047 {
1048 dctnryStruct.dctnryOid = colType.ddn.dictOID;
1049 dctnryStruct.columnOid = colStruct.dataOid;
1050 dctnryStruct.fCompressionType = colType.compressionType;
1051 dctnryStruct.colWidth = colType.colWidth;
1052 }
1053 else
1054 {
1055 dctnryStruct.dctnryOid = 0;
1056 dctnryStruct.columnOid = colStruct.dataOid;
1057 dctnryStruct.fCompressionType = colType.compressionType;
1058 dctnryStruct.colWidth = colType.colWidth;
1059 }
1060
1061 colStructs.push_back(colStruct);
1062 dctnryStructList.push_back(dctnryStruct);
1063
1064 ++column_iterator;
1065 }
1066 }
1067 catch (std::exception& ex)
1068 {
1069 err = ex.what();
1070 rc = 1;
1071 return rc;
1072 }
1073
1074 unsigned int numcols = rowPtr->get_NumberOfColumns();
1075 std::string tmpStr("");
1076
1077 try
1078 {
1079 for (unsigned int i = 0; i < numcols; i++)
1080 {
1081 WriteEngine::ColTupleList colTuples;
1082 WriteEngine::DctColTupleList dctColTuples;
1083 RowList::const_iterator row_iterator = rows.begin();
1084 bool pushWarning = false;
1085
1086 while (row_iterator != rows.end())
1087 {
1088 Row* rowPtr = *row_iterator;
1089 const DMLColumn* columnPtr = rowPtr->get_ColumnAt(i);
1090
1091 tableColName.column = columnPtr->get_Name();
1092 CalpontSystemCatalog::OID oid = systemCatalogPtr->lookupOID(tableColName);
1093
1094 CalpontSystemCatalog::ColType colType;
1095 colType = systemCatalogPtr->colType(oid);
1096
1097 boost::any datavalue;
1098 bool isNULL = false;
1099 std::vector<std::string> origVals;
1100 origVals = columnPtr->get_DataVector();
1101 WriteEngine::dictStr dicStrings;
1102
1103 // token
1104 if ( isDictCol(colType) )
1105 {
1106 for ( uint32_t i = 0; i < origVals.size(); i++ )
1107 {
1108 tmpStr = origVals[i];
1109
1110 if ( tmpStr.length() == 0 )
1111 isNULL = true;
1112 else
1113 isNULL = false;
1114
1115 if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
1116 {
1117 if (isNULL && colType.defaultValue.empty()) //error out
1118 {
1119 Message::Args args;
1120 args.add(tableColName.column);
1121 err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
1122 rc = 1;
1123 return rc;
1124 }
1125 else if (isNULL && !(colType.defaultValue.empty()))
1126 {
1127 tmpStr = colType.defaultValue;
1128 }
1129 }
1130
1131 if ( tmpStr.length() > (unsigned int)colType.colWidth )
1132 {
1133 tmpStr = tmpStr.substr(0, colType.colWidth);
1134
1135 if ( !pushWarning )
1136 pushWarning = true;
1137 }
1138
1139 WriteEngine::ColTuple colTuple;
1140 colTuple.data = datavalue;
1141
1142 colTuples.push_back(colTuple);
1143 //@Bug 2515. Only pass string values to write engine
1144 dicStrings.push_back( tmpStr );
1145 }
1146
1147 colValuesList.push_back(colTuples);
1148 //@Bug 2515. Only pass string values to write engine
1149 dicStringList.push_back( dicStrings );
1150 }
1151 else
1152 {
1153 string x;
1154 std::string indata;
1155 //scan once to check how many autoincrement value needed
1156 uint32_t nextValNeeded = 0;
1157 uint64_t nextVal = 1;
1158
1159 if (colType.autoincrement)
1160 {
1161 try
1162 {
1163 nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
1164 fDbrm.startAISequence(oid, nextVal, colType.colWidth, colType.colDataType);
1165 }
1166 catch (std::exception& ex)
1167 {
1168 err = ex.what();
1169 rc = 1;
1170 return rc;
1171 }
1172
1173 for ( uint32_t i = 0; i < origVals.size(); i++ )
1174 {
1175 indata = origVals[i];
1176
1177 if ( indata.length() == 0 )
1178 isNULL = true;
1179 else
1180 isNULL = false;
1181
1182 if ( isNULL || (indata.compare("0") == 0))
1183 nextValNeeded++;
1184 }
1185 }
1186
1187 if (nextValNeeded > 0) //reserve next value
1188 {
1189 try
1190 {
1191 bool reserved = fDbrm.getAIRange(oid, nextValNeeded, &nextVal);
1192
1193 if (!reserved)
1194 {
1195 err = IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT);
1196 rc = 1;
1197 return rc;
1198 }
1199 }
1200 catch (std::exception& ex)
1201 {
1202 err = ex.what();
1203 rc = 1;
1204 return rc;
1205 }
1206 }
1207
1208 for ( uint32_t i = 0; i < origVals.size(); i++ )
1209 {
1210 indata = origVals[i];
1211
1212 if ( indata.length() == 0 )
1213 isNULL = true;
1214 else
1215 isNULL = false;
1216
1217 //check if autoincrement column and value is 0 or null
1218 if (colType.autoincrement && ( isNULL || (indata.compare("0") == 0)))
1219 {
1220 ostringstream oss;
1221 oss << nextVal++;
1222 indata = oss.str();
1223 isNULL = false;
1224 }
1225
1226 if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
1227 {
1228 if (isNULL && colType.defaultValue.empty()) //error out
1229 {
1230 Message::Args args;
1231 args.add(tableColName.column);
1232 err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
1233 rc = 1;
1234 return rc;
1235 }
1236 else if (isNULL && !(colType.defaultValue.empty()))
1237 {
1238 indata = colType.defaultValue;
1239 isNULL = false;
1240 }
1241 }
1242
1243 try
1244 {
1245 datavalue = DataConvert::convertColumnData(colType, indata, pushWarning, insertPkg.get_TimeZone(), isNULL, false, false);
1246 }
1247 catch (exception&)
1248 {
1249 rc = 1;
1250 Message::Args args;
1251 args.add(string("'") + indata + string("'"));
1252 err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
1253 }
1254
1255 //@Bug 1806
1256 if (rc != NO_ERROR && rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
1257 {
1258 return rc;
1259 }
1260
1261 if ( pushWarning && ( rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING ) )
1262 rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
1263
1264 WriteEngine::ColTuple colTuple;
1265 colTuple.data = datavalue;
1266
1267 colTuples.push_back(colTuple);
1268 //@Bug 2515. Only pass string values to write engine
1269 dicStrings.push_back( tmpStr );
1270 }
1271
1272 colValuesList.push_back(colTuples);
1273 dicStringList.push_back( dicStrings );
1274 }
1275
1276 ++row_iterator;
1277 }
1278
1279 if (pushWarning)
1280 {
1281 colNames.push_back(tableColName.column);
1282 isWarningSet = true;
1283 }
1284 }
1285 }
1286 catch (std::exception& ex)
1287 {
1288 err = ex.what();
1289 rc = 1;
1290 return rc;
1291 }
1292 }
1293
1294 // call the write engine to write the rows
1295 int error = NO_ERROR;
1296
1297 //fWriteEngine.setDebugLevel(WriteEngine::DEBUG_3);
1298 //cout << "Batch inserting a row with transaction id " << txnid.id << endl;
1299 if (colValuesList.size() > 0)
1300 {
1301 if (colValuesList[0].size() > 0)
1302 {
1303 /* Begin-Disable use of MetaFile for bulk rollback support;
1304 Use alternate call below that passes 0 ptr for RBMetaWriter
1305 if (NO_ERROR !=
1306 (error = fWEWrapper.insertColumnRecs(txnid.id, colStructs, colValuesList, dctnryStructList, dicStringList,
1307 dbRootExtTrackerVec, fRBMetaWriter.get(), bFirstExtentOnThisPM, isInsertSelect, 0, roPair.objnum, fIsFirstBatchPm)))
1308 End-Disable use of MetaFile for bulk rollback support
1309 */
1310
1311 if (NO_ERROR !=
1312 (error = fWEWrapper.insertColumnRecs(txnid.id, colStructs, colValuesList, dctnryStructList, dicStringList,
1313 dbRootExtTrackerVec, 0, bFirstExtentOnThisPM, isInsertSelect, isAutocommitOn, roPair.objnum, fIsFirstBatchPm)))
1314 {
1315 if (error == ERR_BRM_DEAD_LOCK)
1316 {
1317 rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR;
1318 WErrorCodes ec;
1319 err = ec.errorString(error);
1320 }
1321 else if ( error == ERR_BRM_VB_OVERFLOW )
1322 {
1323 rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR;
1324 err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW);
1325 }
1326 else
1327 {
1328 rc = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR;
1329 WErrorCodes ec;
1330 err = ec.errorString(error);
1331 }
1332 }
1333 }
1334 }
1335
1336 if (fIsFirstBatchPm && isAutocommitOn)
1337 {
1338 //fWEWrapper.writeVBEnd(txnid.id, rangeList);
1339 fIsFirstBatchPm = false;
1340 }
1341 else if (fIsFirstBatchPm)
1342 {
1343 fIsFirstBatchPm = false;
1344 }
1345
1346 if ( isWarningSet && ( rc == NO_ERROR ) )
1347 {
1348 rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
1349 //cout << "Got warning" << endl;
1350 Message::Args args;
1351 string cols = "'" + colNames[0] + "'";
1352
1353 for (unsigned i = 1; i < colNames.size(); i++)
1354 {
1355 cols = cols + ", " + "'" + colNames[i] + "'";
1356 }
1357
1358 args.add(cols);
1359 err = IDBErrorInfo::instance()->errorMsg(WARN_DATA_TRUNC, args);
1360
1361 // Strict mode enabled, so rollback on warning
1362 if (insertPkg.get_isWarnToError())
1363 {
1364 string applName ("BatchInsert");
1365 fWEWrapper.bulkRollback(tblOid, txnid.id, tableName.toString(),
1366 applName, false, err);
1367 BulkRollbackMgr::deleteMetaFile( tblOid );
1368 }
1369 }
1370
1371 // MCOL-1495 Remove fCatalogMap entries CS won't use anymore.
1372 CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId);
1373 CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId | 0x80000000);
1374 return rc;
1375 }
1376
processBatchInsertBinary(messageqcpp::ByteStream & bs,std::string & err,ByteStream::quadbyte & PMId)1377 uint8_t WE_DMLCommandProc::processBatchInsertBinary(messageqcpp::ByteStream& bs, std::string& err, ByteStream::quadbyte& PMId)
1378 {
1379 int rc = 0;
1380 //cout << "processBatchInsert received bytestream length " << bs.length() << endl;
1381
1382 ByteStream::quadbyte tmp32;
1383 ByteStream::byte tmp8;
1384 bs >> tmp32;
1385 //cout << "processBatchInsert got transaction id " << tmp32 << endl;
1386 bs >> PMId;
1387 //cout << "processBatchInsert gor PMId " << PMId << endl;
1388 uint32_t sessionId;
1389 bs >> sessionId;
1390 //cout << " processBatchInsert for session " << sessionId << endl;
1391 bool isAutocommitOn;
1392 bs >> tmp8;
1393 isAutocommitOn = tmp8;
1394
1395 if (idbdatafile::IDBPolicy::useHdfs())
1396 isAutocommitOn = true;
1397
1398 //cout << "This session isAutocommitOn is " << isAutocommitOn << endl;
1399 BRM::TxnID txnid;
1400 txnid.id = tmp32;
1401 txnid.valid = true;
1402 bool isInsertSelect;
1403 bs >> tmp8;
1404 // For insert select, skip the hwm block and start inserting from the next block
1405 // to avoid self insert issue.
1406 //For batch insert: if not first batch, use the saved last rid to start adding rows.
1407 isInsertSelect = tmp8;
1408
1409 WriteEngine::ColStructList colStructs;
1410 WriteEngine::DctnryStructList dctnryStructList;
1411 WriteEngine::DctnryValueList dctnryValueList;
1412 std::vector<uint64_t> colValuesList;
1413 WriteEngine::DictStrList dicStringList ;
1414 CalpontSystemCatalog::TableName tableName;
1415 CalpontSystemCatalog::TableColName tableColName;
1416 bs >> tableColName.table;
1417 bs >> tableColName.schema;
1418 tableName.table = tableColName.table;
1419 tableName.schema = tableColName.schema;
1420 boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionId);
1421 systemCatalogPtr->identity(CalpontSystemCatalog::EC);
1422 CalpontSystemCatalog::ROPair roPair;
1423 CalpontSystemCatalog::RIDList ridList;
1424 CalpontSystemCatalog::DictOIDList dictOids;
1425
1426 try
1427 {
1428 ridList = systemCatalogPtr->columnRIDs(tableName, true);
1429 roPair = systemCatalogPtr->tableRID( tableName);
1430 }
1431 catch (std::exception& ex)
1432 {
1433 err = ex.what();
1434 rc = 1;
1435 return rc;
1436 }
1437
1438
1439 std::vector<OID> dctnryStoreOids(ridList.size()) ;
1440 std::vector<Column> columns;
1441 DctnryStructList dctnryList;
1442 std::vector<BRM::EmDbRootHWMInfo_v> dbRootHWMInfoColVec(ridList.size());
1443
1444 uint32_t tblOid = roPair.objnum;
1445 CalpontSystemCatalog::ColType colType;
1446 std::vector<DBRootExtentInfo> colDBRootExtentInfo;
1447 bool bFirstExtentOnThisPM = false;
1448 Convertor convertor;
1449
1450 if ( fIsFirstBatchPm )
1451 {
1452 dbRootExtTrackerVec.clear();
1453
1454 if (isAutocommitOn || ((fRBMetaWriter.get() == NULL) && (!isAutocommitOn)))
1455 fRBMetaWriter.reset(new RBMetaWriter("BatchInsert", NULL));
1456
1457 fWEWrapper.setIsInsert(true);
1458 fWEWrapper.setBulkFlag(true);
1459 fWEWrapper.setTransId(txnid.id);
1460
1461 try
1462 {
1463 // First gather HWM BRM information for all columns
1464 std::vector<int> colWidths;
1465
1466 for (unsigned i = 0; i < ridList.size(); i++)
1467 {
1468 rc = BRMWrapper::getInstance()->getDbRootHWMInfo(ridList[i].objnum, dbRootHWMInfoColVec[i]);
1469 //need handle error
1470
1471 CalpontSystemCatalog::ColType colType2 = systemCatalogPtr->colType(ridList[i].objnum);
1472 colWidths.push_back( convertor.getCorrectRowWidth(
1473 colType2.colDataType, colType2.colWidth) );
1474 }
1475
1476 for (unsigned i = 0; i < ridList.size(); i++)
1477 {
1478 // Find DBRoot/segment file where we want to start adding rows
1479 colType = systemCatalogPtr->colType(ridList[i].objnum);
1480 boost::shared_ptr<DBRootExtentTracker> pDBRootExtentTracker (new DBRootExtentTracker(ridList[i].objnum,
1481 colWidths, dbRootHWMInfoColVec, i, 0) );
1482 dbRootExtTrackerVec.push_back( pDBRootExtentTracker );
1483 DBRootExtentInfo dbRootExtent;
1484 std::string trkErrMsg;
1485 bool bEmptyPM;
1486
1487 if (i == 0)
1488 {
1489 rc = pDBRootExtentTracker->selectFirstSegFile(dbRootExtent, bFirstExtentOnThisPM, bEmptyPM, trkErrMsg);
1490 /* cout << "bEmptyPM = " << (int) bEmptyPM << " bFirstExtentOnThisPM= " << (int)bFirstExtentOnThisPM <<
1491 " oid:dbroot:hwm = " << ridList[i].objnum << ":"<<dbRootExtent.fDbRoot << ":"
1492 <<":"<<dbRootExtent.fLocalHwm << " err = " << trkErrMsg << endl; */
1493 }
1494 else
1495 pDBRootExtentTracker->assignFirstSegFile(*(dbRootExtTrackerVec[0].get()), dbRootExtent);
1496
1497
1498 colDBRootExtentInfo.push_back(dbRootExtent);
1499
1500 Column aColumn;
1501 aColumn.colWidth = convertor.getCorrectRowWidth(colType.colDataType, colType.colWidth);
1502 aColumn.colDataType = colType.colDataType;
1503 aColumn.compressionType = colType.compressionType;
1504 aColumn.dataFile.oid = ridList[i].objnum;
1505 aColumn.dataFile.fPartition = dbRootExtent.fPartition;
1506 aColumn.dataFile.fSegment = dbRootExtent.fSegment;
1507 aColumn.dataFile.fDbRoot = dbRootExtent.fDbRoot;
1508 aColumn.dataFile.hwm = dbRootExtent.fLocalHwm;
1509 columns.push_back(aColumn);
1510
1511 if ((colType.compressionType > 0) && (colType.ddn.dictOID > 0))
1512 {
1513 DctnryStruct aDctnry;
1514 aDctnry.dctnryOid = colType.ddn.dictOID;
1515 aDctnry.fColPartition = dbRootExtent.fPartition;
1516 aDctnry.fColSegment = dbRootExtent.fSegment;
1517 aDctnry.fColDbRoot = dbRootExtent.fDbRoot;
1518 dctnryList.push_back(aDctnry);
1519 }
1520
1521 if (colType.ddn.dictOID > 0)
1522 {
1523 dctnryStoreOids[i] = colType.ddn.dictOID;
1524 }
1525 else
1526 {
1527 dctnryStoreOids[i] = 0;
1528 }
1529 }
1530 }
1531 catch (std::exception& ex)
1532 {
1533 err = ex.what();
1534 rc = 1;
1535 return rc;
1536 }
1537
1538 //@Bug 5996 validate hwm before starts
1539 rc = validateColumnHWMs(ridList, systemCatalogPtr, colDBRootExtentInfo, "Starting");
1540
1541 if ( rc != 0)
1542 {
1543 WErrorCodes ec;
1544 err = ec.errorString(rc);
1545 err += " Check err.log for detailed information.";
1546 fIsFirstBatchPm = false;
1547 rc = 1;
1548 return rc;
1549 }
1550 }
1551
1552 std::vector<BRM::LBIDRange> rangeList;
1553
1554 // use of MetaFile for bulk rollback support
1555 if ( fIsFirstBatchPm && isAutocommitOn)
1556 {
1557 //save meta data, version last block for each dbroot at the start of batch insert
1558 try
1559 {
1560 fRBMetaWriter->init(tblOid, tableName.table);
1561 fRBMetaWriter->saveBulkRollbackMetaData(columns, dctnryStoreOids, dbRootHWMInfoColVec);
1562
1563 //cout << "Saved meta files" << endl;
1564 if (!bFirstExtentOnThisPM)
1565 {
1566 //cout << "Backing up hwm chunks" << endl;
1567 for (unsigned i = 0; i < dctnryList.size(); i++) //back up chunks for compressed dictionary
1568 {
1569 // @bug 5572 HDFS tmp file - Ignoring return flag, don't need in this context
1570 fRBMetaWriter->backupDctnryHWMChunk(
1571 dctnryList[i].dctnryOid, dctnryList[i].fColDbRoot, dctnryList[i].fColPartition, dctnryList[i].fColSegment);
1572 }
1573 }
1574 }
1575 catch (WeException& ex) // catch exception to close file, then rethrow
1576 {
1577 rc = 1;
1578 err = ex.what();
1579 }
1580
1581 //Do versioning. Currently, we only version columns, not strings. If there is a design change, this will need to be re-visited
1582 if ( rc != 0)
1583 return rc;
1584
1585 }
1586
1587 std::vector<string> colNames;
1588 bool isWarningSet = false;
1589 uint32_t columnCount;
1590 bs >> columnCount;
1591
1592 if (columnCount)
1593 {
1594 try
1595 {
1596 for (uint32_t current_column = 0; current_column < columnCount; current_column++)
1597 {
1598 uint32_t tmp32;
1599 std::string colName;
1600 bs >> tmp32;
1601 bs >> colName;
1602 colNames.push_back(colName);
1603 CalpontSystemCatalog::OID oid = tmp32;
1604
1605 CalpontSystemCatalog::ColType colType;
1606 colType = systemCatalogPtr->colType(oid);
1607
1608 WriteEngine::ColStruct colStruct;
1609 WriteEngine::DctnryStruct dctnryStruct;
1610 colStruct.dataOid = oid;
1611 colStruct.tokenFlag = false;
1612 colStruct.fCompressionType = colType.compressionType;
1613
1614 // Token
1615 if ( isDictCol(colType) )
1616 {
1617 colStruct.colWidth = 8;
1618 colStruct.tokenFlag = true;
1619 }
1620 else
1621 {
1622 colStruct.colWidth = colType.colWidth;
1623 }
1624
1625 colStruct.colDataType = colType.colDataType;
1626
1627 if (colStruct.tokenFlag)
1628 {
1629 dctnryStruct.dctnryOid = colType.ddn.dictOID;
1630 dctnryStruct.columnOid = colStruct.dataOid;
1631 dctnryStruct.fCompressionType = colType.compressionType;
1632 dctnryStruct.colWidth = colType.colWidth;
1633 }
1634 else
1635 {
1636 dctnryStruct.dctnryOid = 0;
1637 dctnryStruct.columnOid = colStruct.dataOid;
1638 dctnryStruct.fCompressionType = colType.compressionType;
1639 dctnryStruct.colWidth = colType.colWidth;
1640 }
1641
1642 colStructs.push_back(colStruct);
1643 dctnryStructList.push_back(dctnryStruct);
1644
1645 }
1646 }
1647 catch (std::exception& ex)
1648 {
1649 err = ex.what();
1650 rc = 1;
1651 return rc;
1652 }
1653
1654 std::string tmpStr("");
1655 uint32_t valuesPerColumn;
1656 bs >> valuesPerColumn;
1657 colValuesList.reserve(columnCount * valuesPerColumn);
1658
1659 try
1660 {
1661 bool pushWarning = false;
1662
1663 for (uint32_t j = 0; j < columnCount; j++)
1664 {
1665 WriteEngine::DctColTupleList dctColTuples;
1666 tableColName.column = colNames[j];
1667 CalpontSystemCatalog::OID oid = colStructs[j].dataOid;
1668
1669 CalpontSystemCatalog::ColType colType;
1670 colType = systemCatalogPtr->colType(oid);
1671
1672 bool isNULL = false;
1673 WriteEngine::dictStr dicStrings;
1674
1675 // token
1676 if ( isDictCol(colType) )
1677 {
1678 for ( uint32_t i = 0; i < valuesPerColumn; i++ )
1679 {
1680 bs >> tmp8;
1681 isNULL = tmp8;
1682 bs >> tmpStr;
1683
1684 if ( tmpStr.length() == 0 )
1685 isNULL = true;
1686
1687 if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
1688 {
1689 if (isNULL && colType.defaultValue.empty()) //error out
1690 {
1691 Message::Args args;
1692 args.add(tableColName.column);
1693 err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
1694 rc = 1;
1695 return rc;
1696 }
1697 else if (isNULL && !(colType.defaultValue.empty()))
1698 {
1699 tmpStr = colType.defaultValue;
1700 }
1701 }
1702
1703 if ( tmpStr.length() > (unsigned int)colType.colWidth )
1704 {
1705 tmpStr = tmpStr.substr(0, colType.colWidth);
1706
1707 if ( !pushWarning )
1708 pushWarning = true;
1709 }
1710
1711 colValuesList.push_back(0);
1712 //@Bug 2515. Only pass string values to write engine
1713 dicStrings.push_back( tmpStr );
1714 }
1715
1716 //@Bug 2515. Only pass string values to write engine
1717 dicStringList.push_back( dicStrings );
1718 }
1719 else
1720 {
1721 string x;
1722 //scan once to check how many autoincrement value needed
1723 uint32_t nextValNeeded = 0;
1724 uint64_t nextVal = 1;
1725
1726 if (colType.autoincrement)
1727 {
1728 try
1729 {
1730 nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
1731 fDbrm.startAISequence(oid, nextVal, colType.colWidth, colType.colDataType);
1732 }
1733 catch (std::exception& ex)
1734 {
1735 err = ex.what();
1736 rc = 1;
1737 return rc;
1738 }
1739 }
1740
1741 for ( uint32_t i = 0; i < valuesPerColumn; i++ )
1742 {
1743 bs >> tmp8;
1744 isNULL = tmp8;
1745
1746 uint8_t val8;
1747 uint16_t val16;
1748 uint32_t val32;
1749 uint64_t val64;
1750 uint64_t colValue;
1751 float valF;
1752 double valD;
1753 std::string valStr;
1754 bool valZero = false; // Needed for autoinc check
1755
1756 switch (colType.colDataType)
1757 {
1758 case execplan::CalpontSystemCatalog::TINYINT:
1759 case execplan::CalpontSystemCatalog::UTINYINT:
1760 bs >> val8;
1761
1762 if (val8 == 0)
1763 valZero = true;
1764
1765 colValue = val8;
1766 break;
1767
1768 case execplan::CalpontSystemCatalog::SMALLINT:
1769 case execplan::CalpontSystemCatalog::USMALLINT:
1770 bs >> val16;
1771
1772 if (val16 == 0)
1773 valZero = true;
1774
1775 colValue = val16;
1776 break;
1777
1778 case execplan::CalpontSystemCatalog::DATE:
1779 case execplan::CalpontSystemCatalog::MEDINT:
1780 case execplan::CalpontSystemCatalog::INT:
1781 case execplan::CalpontSystemCatalog::UMEDINT:
1782 case execplan::CalpontSystemCatalog::UINT:
1783 bs >> val32;
1784
1785 if (val32 == 0)
1786 valZero = true;
1787
1788 colValue = val32;
1789 break;
1790
1791 case execplan::CalpontSystemCatalog::BIGINT:
1792 case execplan::CalpontSystemCatalog::DATETIME:
1793 case execplan::CalpontSystemCatalog::TIME:
1794 case execplan::CalpontSystemCatalog::TIMESTAMP:
1795 case execplan::CalpontSystemCatalog::UBIGINT:
1796 bs >> val64;
1797
1798 if (val64 == 0)
1799 valZero = true;
1800
1801 colValue = val64;
1802 break;
1803
1804 case execplan::CalpontSystemCatalog::DECIMAL:
1805 switch (colType.colWidth)
1806 {
1807 case 1:
1808 {
1809 bs >> val8;
1810 colValue = val8;
1811 break;
1812 }
1813
1814 case 2:
1815 {
1816 bs >> val16;
1817 colValue = val16;
1818 break;
1819 }
1820
1821 case 4:
1822 {
1823 bs >> val32;
1824 colValue = val32;
1825 break;
1826 }
1827
1828 default:
1829 {
1830 bs >> val64;
1831 colValue = val64;
1832 break;
1833 }
1834 }
1835
1836 break;
1837
1838 case execplan::CalpontSystemCatalog::UDECIMAL:
1839
1840 // UDECIMAL numbers may not be negative
1841 if (colType.colWidth == 1)
1842 {
1843 bs >> val8;
1844
1845 // FIXME: IDK what would it mean if valN are unsigned
1846 if (utils::is_negative(val8) &&
1847 val8 != joblist::TINYINTEMPTYROW &&
1848 val8 != joblist::TINYINTNULL)
1849 {
1850 val8 = 0;
1851 pushWarning = true;
1852 }
1853
1854 colValue = val8;
1855 }
1856 else if (colType.colWidth == 2)
1857 {
1858 bs >> val16;
1859
1860 if (utils::is_negative(val16) &&
1861 val16 != joblist::SMALLINTEMPTYROW &&
1862 val16 != joblist::SMALLINTNULL)
1863 {
1864 val16 = 0;
1865 pushWarning = true;
1866 }
1867
1868 colValue = val16;
1869 }
1870 else if (colType.colWidth == 4)
1871 {
1872 bs >> val32;
1873
1874 if (utils::is_negative(val32) &&
1875 val32 != joblist::INTEMPTYROW &&
1876 val32 != joblist::INTNULL)
1877 {
1878 val32 = 0;
1879 pushWarning = true;
1880 }
1881
1882 colValue = val32;
1883 }
1884 else if (colType.colWidth == 8)
1885 {
1886 bs >> val64;
1887
1888 if (utils::is_negative(val64) &&
1889 val64 != joblist::BIGINTEMPTYROW &&
1890 val64 != joblist::BIGINTNULL)
1891 {
1892 val64 = 0;
1893 pushWarning = true;
1894 }
1895
1896 colValue = val64;
1897 }
1898
1899 break;
1900
1901 case execplan::CalpontSystemCatalog::DOUBLE:
1902 bs >> val64;
1903 colValue = val64;
1904 break;
1905
1906 case execplan::CalpontSystemCatalog::UDOUBLE:
1907 bs >> val64;
1908 memcpy(&valD, &val64, 8);
1909
1910 if (valD < 0.0 && valD != joblist::DOUBLEEMPTYROW && valD != joblist::DOUBLENULL)
1911 {
1912 valD = 0.0;
1913 pushWarning = true;
1914 }
1915
1916 colValue = val64;
1917 break;
1918
1919 case execplan::CalpontSystemCatalog::FLOAT:
1920 bs >> val32;
1921 colValue = val32;
1922 break;
1923
1924 case execplan::CalpontSystemCatalog::UFLOAT:
1925 bs >> val32;
1926 memcpy(&valF, &val32, 4);
1927
1928 if (valF < 0.0 && valF != joblist::FLOATEMPTYROW && valF != joblist::FLOATNULL)
1929 {
1930 valF = 0.0;
1931 pushWarning = true;
1932 }
1933
1934 colValue = val32;
1935 break;
1936
1937 case execplan::CalpontSystemCatalog::CHAR:
1938 case execplan::CalpontSystemCatalog::VARCHAR:
1939 case execplan::CalpontSystemCatalog::TEXT:
1940 case execplan::CalpontSystemCatalog::BLOB:
1941 bs >> valStr;
1942
1943 if (valStr.length() > (unsigned int)colType.colWidth)
1944 {
1945 valStr = valStr.substr(0, colType.colWidth);
1946 pushWarning = true;
1947 }
1948 else
1949 {
1950 if ( (unsigned int)colType.colWidth > valStr.length())
1951 {
1952 //Pad null character to the string
1953 valStr.resize(colType.colWidth, 0);
1954 }
1955 }
1956
1957 // FIXME: colValue is uint64_t (8 bytes)
1958 memcpy(&colValue, valStr.c_str(), valStr.length());
1959 break;
1960
1961 default:
1962 rc = 1;
1963 err = IDBErrorInfo::instance()->errorMsg(ERR_DATATYPE_NOT_SUPPORT);
1964 break;
1965 }
1966
1967 //check if autoincrement column and value is 0 or null
1968 if (colType.autoincrement && ( isNULL || valZero))
1969 {
1970 ostringstream oss;
1971 oss << nextVal++;
1972 isNULL = false;
1973
1974 try
1975 {
1976 nextValNeeded++;
1977 bool reserved = fDbrm.getAIRange(oid, nextValNeeded, &nextVal);
1978
1979 if (!reserved)
1980 {
1981 err = IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT);
1982 rc = 1;
1983 return rc;
1984 }
1985 }
1986 catch (std::exception& ex)
1987 {
1988 err = ex.what();
1989 rc = 1;
1990 return rc;
1991 }
1992
1993 colValue = nextVal;
1994 }
1995
1996 if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
1997 {
1998 if (isNULL && colType.defaultValue.empty()) //error out
1999 {
2000 Message::Args args;
2001 args.add(tableColName.column);
2002 err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
2003 rc = 1;
2004 return rc;
2005 }
2006 else if (isNULL && !(colType.defaultValue.empty()))
2007 {
2008 memcpy(&colValue, colType.defaultValue.c_str(), colType.defaultValue.length());
2009 isNULL = false;
2010 }
2011 }
2012
2013 //@Bug 1806
2014 if (rc != NO_ERROR && rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
2015 {
2016 return rc;
2017 }
2018
2019 if ( pushWarning && ( rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING ) )
2020 rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
2021
2022
2023 colValuesList.push_back(colValue);
2024 //@Bug 2515. Only pass string values to write engine
2025 dicStrings.push_back( valStr );
2026 }
2027
2028 dicStringList.push_back( dicStrings );
2029 }
2030
2031 if (pushWarning)
2032 {
2033 colNames.push_back(tableColName.column);
2034 isWarningSet = true;
2035 }
2036 }
2037 }
2038 catch (std::exception& ex)
2039 {
2040 err = ex.what();
2041 rc = 1;
2042 return rc;
2043 }
2044 }
2045
2046 // call the write engine to write the rows
2047 int error = NO_ERROR;
2048
2049 //fWriteEngine.setDebugLevel(WriteEngine::DEBUG_3);
2050 //cout << "Batch inserting a row with transaction id " << txnid.id << endl;
2051 if (colValuesList.size() > 0)
2052 {
2053 if (NO_ERROR !=
2054 (error = fWEWrapper.insertColumnRecsBinary(txnid.id, colStructs, colValuesList, dctnryStructList, dicStringList,
2055 dbRootExtTrackerVec, 0, bFirstExtentOnThisPM, isInsertSelect, isAutocommitOn, roPair.objnum, fIsFirstBatchPm)))
2056 {
2057 if (error == ERR_BRM_DEAD_LOCK)
2058 {
2059 rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR;
2060 WErrorCodes ec;
2061 err = ec.errorString(error);
2062 }
2063 else if ( error == ERR_BRM_VB_OVERFLOW )
2064 {
2065 rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR;
2066 err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW);
2067 }
2068 else
2069 {
2070 rc = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR;
2071 WErrorCodes ec;
2072 err = ec.errorString(error);
2073 }
2074 }
2075 }
2076
2077 if (fIsFirstBatchPm && isAutocommitOn)
2078 {
2079 //fWEWrapper.writeVBEnd(txnid.id, rangeList);
2080 fIsFirstBatchPm = false;
2081 }
2082 else if (fIsFirstBatchPm)
2083 {
2084 fIsFirstBatchPm = false;
2085 }
2086
2087 if ( isWarningSet && ( rc == NO_ERROR ) )
2088 {
2089 rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
2090 //cout << "Got warning" << endl;
2091 Message::Args args;
2092 string cols = "'" + colNames[0] + "'";
2093
2094 for (unsigned i = 1; i < colNames.size(); i++)
2095 {
2096 cols = cols + ", " + "'" + colNames[i] + "'";
2097 }
2098
2099 args.add(cols);
2100 err = IDBErrorInfo::instance()->errorMsg(WARN_DATA_TRUNC, args);
2101
2102 }
2103
2104 // MCOL-1495 Remove fCatalogMap entries CS won't use anymore.
2105 CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId);
2106 CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId | 0x80000000);
2107 return rc;
2108 }
2109
2110
commitBatchAutoOn(messageqcpp::ByteStream & bs,std::string & err)2111 uint8_t WE_DMLCommandProc::commitBatchAutoOn(messageqcpp::ByteStream& bs, std::string& err)
2112 {
2113 uint8_t rc = 0;
2114 //need to commit the versioned blocks, set hwm, update casual partition, send back to DMLProc to set them
2115 //cout << " in commiting autocommit on batch insert " << endl;
2116 uint32_t tmp32, tableOid, sessionId;
2117 int txnID;
2118
2119 bs >> tmp32;
2120 txnID = tmp32;
2121 bs >> tmp32;
2122 tableOid = tmp32;
2123 bs >> tmp32;
2124 sessionId = tmp32;
2125
2126 BRM::DBRM dbrm;
2127
2128 std::vector<BRM::BulkSetHWMArg> setHWMArgs;
2129 TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);
2130 ColsExtsInfoMap colsExtsInfoMap = aTbaleMetaData->getColsExtsInfoMap();
2131 ColsExtsInfoMap::iterator it = colsExtsInfoMap.begin();
2132 ColExtsInfo::iterator aIt;
2133 BulkSetHWMArg aArg;
2134 std::vector<BRM::FileInfo> files;
2135 BRM::FileInfo aFile;
2136
2137 while (it != colsExtsInfoMap.end())
2138 {
2139 aIt = (it->second).begin();
2140 aArg.oid = it->first;
2141 aFile.oid = it->first;
2142
2143 //cout << "OID:" << aArg.oid;
2144 while (aIt != (it->second).end())
2145 {
2146 aArg.partNum = aIt->partNum;
2147 aArg.segNum = aIt->segNum;
2148 aArg.hwm = aIt->hwm;
2149
2150 if (!aIt->isDict)
2151 setHWMArgs.push_back(aArg);
2152
2153 aFile.partitionNum = aIt->partNum;
2154 aFile.dbRoot = aIt->dbRoot;
2155 aFile.segmentNum = aIt->segNum;
2156 aFile.compType = aIt->compType;
2157 //cout <<"Added to files oid:dbroot:part:seg:compType = " << aFile.oid<<":"<<aFile.dbRoot<<":"<<aFile.partitionNum<<":"<<aFile.segmentNum
2158 //<<":"<<aFile.compType <<endl;
2159 files.push_back(aFile);
2160 aIt++;
2161 }
2162
2163 it++;
2164 }
2165
2166 bs.restart();
2167 //cout << " serialized setHWMArgs size = " << setHWMArgs.size() << endl;
2168 serializeInlineVector (bs, setHWMArgs);
2169
2170 //flush files
2171 //cout << "flush files when autocommit on" << endl;
2172 fWEWrapper.setIsInsert(true);
2173 fWEWrapper.setBulkFlag(true);
2174
2175 std::map<uint32_t, uint32_t> oids;
2176 boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
2177 CalpontSystemCatalog::makeCalpontSystemCatalog(sessionId);
2178
2179 CalpontSystemCatalog::TableName aTableName = systemCatalogPtr->tableName(tableOid);
2180 CalpontSystemCatalog::RIDList ridList;
2181
2182 try
2183 {
2184 ridList = systemCatalogPtr->columnRIDs(aTableName, true);
2185 }
2186 catch (std::exception& ex)
2187 {
2188 err = ex.what();
2189 rc = 1;
2190 return rc;
2191 }
2192
2193 for (unsigned i = 0; i < ridList.size(); i++)
2194 {
2195 oids[ridList[i].objnum] = ridList[i].objnum;
2196 }
2197
2198 CalpontSystemCatalog::DictOIDList dictOids;
2199
2200 try
2201 {
2202 dictOids = systemCatalogPtr->dictOIDs(aTableName);
2203 }
2204 catch (std::exception& ex)
2205 {
2206 err = ex.what();
2207 rc = 1;
2208 return rc;
2209 }
2210
2211 for (unsigned i = 0; i < dictOids.size(); i++)
2212 {
2213 oids[dictOids[i].dictOID] = dictOids[i].dictOID;
2214 }
2215
2216 fWEWrapper.setTransId(txnID);
2217
2218 fWEWrapper.setIsInsert(false);
2219 fWEWrapper.setBulkFlag(false);
2220
2221 fIsFirstBatchPm = true;
2222
2223 if ((idbdatafile::IDBPolicy::useHdfs()) && (files.size() > 0) )
2224 cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());
2225
2226 TableMetaData::removeTableMetaData(tableOid);
2227
2228 // MCOL-1160 For API bulk insert flush the PrimProc cached dictionary
2229 // blocks tounched
2230 std::tr1::unordered_map<TxnID, dictLBIDRec_t>::iterator mapIter;
2231 mapIter = fWEWrapper.getDictMap().find(txnID);
2232
2233 if (mapIter != fWEWrapper.getDictMap().end())
2234 {
2235 std::set<BRM::LBID_t>::iterator lbidIter;
2236 std::vector<BRM::LBID_t> dictFlushBlks;
2237 cerr << "API Flushing blocks: ";
2238 for (lbidIter = (*mapIter).second.begin(); lbidIter != (*mapIter).second.end(); lbidIter++)
2239 {
2240 cerr << *lbidIter << ", ";
2241 dictFlushBlks.push_back((*lbidIter));
2242 }
2243 cerr << endl;
2244 cacheutils::flushPrimProcAllverBlocks(dictFlushBlks);
2245 fWEWrapper.getDictMap().erase(txnID);
2246 }
2247
2248 // MCOL-1495 Remove fCatalogMap entries CS won't use anymore.
2249 CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId);
2250 CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId | 0x80000000);
2251 return rc;
2252 }
2253
processBatchInsertHwm(messageqcpp::ByteStream & bs,std::string & err)2254 uint8_t WE_DMLCommandProc::processBatchInsertHwm(messageqcpp::ByteStream& bs, std::string& err)
2255 {
2256 uint8_t tmp8, rc = 0;
2257 err.clear();
2258 //set hwm for autocommit off
2259 uint32_t tmp32, tableOid;
2260 int txnID;
2261 bool isAutoCommitOn;
2262
2263 bs >> tmp32;
2264 txnID = tmp32;
2265 bs >> tmp8;
2266 isAutoCommitOn = (tmp8 != 0);
2267 bs >> tmp32;
2268 tableOid = tmp32;
2269 bs >> tmp8;
2270 //cout << "processBatchInsertHwm: tranid:isAutoCommitOn = " <<txnID <<":"<< (int)isAutoCommitOn << endl;
2271 std::vector<BRM::FileInfo> files;
2272 std::vector<BRM::OID_t> oidsToFlush;
2273
2274 BRM::FileInfo aFile;
2275 //BRM::FileInfo curFile;
2276 TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);
2277 ColsExtsInfoMap colsExtsInfoMap = aTbaleMetaData->getColsExtsInfoMap();
2278 ColsExtsInfoMap::iterator it = colsExtsInfoMap.begin();
2279 ColExtsInfo::iterator aIt;
2280 CalpontSystemCatalog::RIDList ridList;
2281 CalpontSystemCatalog::ROPair roPair;
2282 std::vector<DBRootExtentInfo> colDBRootExtentInfo;
2283 DBRootExtentInfo aExtentInfo;
2284
2285 while (it != colsExtsInfoMap.end())
2286 {
2287 aIt = (it->second).begin();
2288 aFile.oid = it->first;
2289 oidsToFlush.push_back(aFile.oid);
2290 roPair.objnum = aFile.oid;
2291 aExtentInfo.fPartition = 0;
2292 aExtentInfo.fDbRoot = 0;
2293 aExtentInfo.fSegment = 0;
2294 aExtentInfo.fLocalHwm = 0;
2295 bool isDict = false;
2296
2297 while (aIt != (it->second).end())
2298 {
2299 aFile.partitionNum = aIt->partNum;
2300 aFile.dbRoot = aIt->dbRoot;
2301 aFile.segmentNum = aIt->segNum;
2302 aFile.compType = aIt->compType;
2303 files.push_back(aFile);
2304
2305 if (!aIt->isDict)
2306 {
2307 if ((aIt->partNum > aExtentInfo.fPartition) || ((aIt->partNum == aExtentInfo.fPartition) && (aIt->segNum > aExtentInfo.fSegment)) ||
2308 ((aIt->partNum == aExtentInfo.fPartition) && (aIt->segNum == aExtentInfo.fSegment) && (aIt->segNum > aExtentInfo.fLocalHwm )))
2309 {
2310 aExtentInfo.fPartition = aIt->partNum;
2311 aExtentInfo.fDbRoot = aIt->dbRoot;
2312 aExtentInfo.fSegment = aIt->segNum;
2313 aExtentInfo.fLocalHwm = aIt->hwm;
2314 }
2315 }
2316 else
2317 {
2318 isDict = true;
2319 }
2320
2321 aIt++;
2322 }
2323
2324 if (!isDict)
2325 {
2326 ridList.push_back(roPair);
2327 colDBRootExtentInfo.push_back(aExtentInfo);
2328 }
2329
2330 it++;
2331 }
2332
2333 //@Bug 5996. Validate hwm before set them
2334 boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(0);
2335 systemCatalogPtr->identity(CalpontSystemCatalog::EC);
2336
2337 try
2338 {
2339 CalpontSystemCatalog::TableName tableName = systemCatalogPtr->tableName(tableOid);
2340 ridList = systemCatalogPtr->columnRIDs(tableName);
2341 }
2342 catch (exception& ex)
2343 {
2344 err = ex.what();
2345 rc = 1;
2346 TableMetaData::removeTableMetaData(tableOid);
2347
2348 fIsFirstBatchPm = true;
2349 //cout << "flush files when autocommit off" << endl;
2350 fWEWrapper.setIsInsert(true);
2351 fWEWrapper.setBulkFlag(true);
2352 return rc;
2353 }
2354
2355 rc = validateColumnHWMs(ridList, systemCatalogPtr, colDBRootExtentInfo, "Ending");
2356
2357 if ( rc != 0)
2358 {
2359 WErrorCodes ec;
2360 err = ec.errorString(rc);
2361 err += " Check err.log for detailed information.";
2362 TableMetaData::removeTableMetaData(tableOid);
2363
2364 fIsFirstBatchPm = true;
2365 fWEWrapper.setIsInsert(true);
2366 fWEWrapper.setBulkFlag(true);
2367 rc = 1;
2368 return rc;
2369 }
2370
2371 try
2372 {
2373 if (isAutoCommitOn)
2374 {
2375 bs.restart();
2376
2377 if (fWEWrapper.getIsInsert())
2378 {
2379 // @bug5333, up to here, rc == 0, but flushchunk may fail.
2380 rc = processBatchInsertHwmFlushChunks(tableOid, txnID, files, oidsToFlush, err);
2381 }
2382
2383 if ( tmp8 != 0)
2384 TableMetaData::removeTableMetaData(tableOid);
2385
2386 return rc; // will set hwm with version commit.
2387 }
2388 }
2389 catch (exception& ex)
2390 {
2391 err = ex.what();
2392 rc = 1;
2393 return rc;
2394 }
2395
2396 // Handle case where isAutoCommitOn is false
2397 BRM::DBRM dbrm;
2398 //cout << " In processBatchInsertHwm setting hwm" << endl;
2399 std::vector<BRM::BulkSetHWMArg> setHWMArgs;
2400 it = colsExtsInfoMap.begin();
2401 BulkSetHWMArg aArg;
2402
2403 while (it != colsExtsInfoMap.end())
2404 {
2405 aIt = (it->second).begin();
2406 aArg.oid = it->first;
2407
2408 //cout << "for oid " << aArg.oid << endl;
2409 while (aIt != (it->second).end())
2410 {
2411 aArg.partNum = aIt->partNum;
2412 aArg.segNum = aIt->segNum;
2413 aArg.hwm = aIt->hwm;
2414
2415 //@Bug 6029 dictionary store files already set hwm.
2416 if (!aIt->isDict)
2417 setHWMArgs.push_back(aArg);
2418
2419 aIt++;
2420 }
2421
2422 it++;
2423 }
2424
2425 TableMetaData::removeTableMetaData(tableOid);
2426
2427 fIsFirstBatchPm = true;
2428 //cout << "flush files when autocommit off" << endl;
2429 fWEWrapper.setIsInsert(true);
2430 fWEWrapper.setBulkFlag(true);
2431
2432 rc = processBatchInsertHwmFlushChunks(tableOid, txnID,
2433 files, oidsToFlush, err);
2434 bs.restart();
2435
2436 try
2437 {
2438 serializeInlineVector (bs, setHWMArgs);
2439 }
2440 catch (exception& ex)
2441 {
2442 // Append to errmsg in case we already have an error
2443 if (err.length() > 0)
2444 err += "; ";
2445
2446 err += ex.what();
2447 rc = 1;
2448 return rc;
2449 }
2450
2451 //cout << "flush is called for transaction " << txnID << endl;
2452
2453 return rc;
2454 }
2455
2456 //------------------------------------------------------------------------------
2457 // Flush chunks for the specified table and transaction.
2458 // Also confirms changes to DB files (for hdfs).
2459 // files vector represents list of files to be purged from PrimProc cache.
2460 // oid2ToFlush represents list of oids to be flushed from PrimProc cache.
2461 // Afterwords, the following attributes are reset as follows:
2462 // fWEWrapper.setIsInsert(false);
2463 // fWEWrapper.setBulkFlag(false);
2464 // fIsFirstBatchPm = true;
2465 // returns 0 for success; returns 1 if error occurs
2466 //------------------------------------------------------------------------------
processBatchInsertHwmFlushChunks(uint32_t tblOid,int txnID,const std::vector<BRM::FileInfo> & files,const std::vector<BRM::OID_t> & oidsToFlush,std::string & err)2467 uint8_t WE_DMLCommandProc::processBatchInsertHwmFlushChunks(
2468 uint32_t tblOid, int txnID,
2469 const std::vector<BRM::FileInfo>& files,
2470 const std::vector<BRM::OID_t>& oidsToFlush,
2471 std::string& err)
2472 {
2473 uint8_t rc = 0;
2474 std::map<uint32_t, uint32_t> oids;
2475 CalpontSystemCatalog::TableName aTableName;
2476 CalpontSystemCatalog::RIDList ridList;
2477 CalpontSystemCatalog::DictOIDList dictOids;
2478
2479 try
2480 {
2481 boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
2482 CalpontSystemCatalog::makeCalpontSystemCatalog(txnID);
2483 aTableName = systemCatalogPtr->tableName(tblOid);
2484 ridList = systemCatalogPtr->columnRIDs(aTableName, true);
2485 dictOids = systemCatalogPtr->dictOIDs(aTableName);
2486 }
2487 catch (exception& ex)
2488 {
2489 std::ostringstream ossErr;
2490 ossErr << "System Catalog error for table OID " << tblOid;
2491
2492 // Include tbl name in msg unless exception occurred before we got it
2493 if (aTableName.table.length() > 0)
2494 ossErr << '(' << aTableName << ')';
2495
2496 ossErr << "; " << ex.what();
2497 err = ossErr.str();
2498 rc = 1;
2499 return rc;
2500 }
2501
2502 for (unsigned i = 0; i < ridList.size(); i++)
2503 {
2504 oids[ridList[i].objnum] = ridList[i].objnum;
2505 }
2506
2507 for (unsigned i = 0; i < dictOids.size(); i++)
2508 {
2509 oids[dictOids[i].dictOID] = dictOids[i].dictOID;
2510 }
2511
2512 fWEWrapper.setTransId(txnID);
2513
2514 // @bug5333, up to here, rc == 0, but flushchunk may fail.
2515 rc = fWEWrapper.flushChunks(0, oids);
2516
2517 if (rc == NO_ERROR)
2518 {
2519 // Confirm changes to db files "only" if no error up to this point
2520 if (idbdatafile::IDBPolicy::useHdfs())
2521 {
2522 std::string eMsg;
2523 ConfirmHdfsDbFile confirmHdfs;
2524 int confirmDbRc = confirmHdfs.confirmDbFileListFromMetaFile(
2525 tblOid, eMsg);
2526
2527 if (confirmDbRc == NO_ERROR)
2528 {
2529 int endDbRc = confirmHdfs.endDbFileListFromMetaFile(
2530 tblOid, true, eMsg);
2531
2532 if (endDbRc != NO_ERROR)
2533 {
2534 // Might want to log this error, but don't think we
2535 // need to report as fatal, as all changes were confirmed.
2536 }
2537
2538 if (files.size() > 0)
2539 cacheutils::purgePrimProcFdCache(files,
2540 Config::getLocalModuleID());
2541
2542 cacheutils::flushOIDsFromCache(oidsToFlush);
2543 }
2544 else
2545 {
2546 ostringstream ossErr;
2547 ossErr << "Error confirming changes to table " <<
2548 aTableName << "; " << eMsg;
2549 err = ossErr.str();
2550 rc = 1; // reset to 1
2551 }
2552 }
2553 }
2554 else // flushChunks error
2555 {
2556 WErrorCodes ec;
2557 std::ostringstream ossErr;
2558 ossErr << "Error flushing chunks for table " << aTableName <<
2559 "; " << ec.errorString(rc);
2560 err = ossErr.str();
2561 rc = 1; // reset to 1
2562 }
2563
2564 fWEWrapper.setIsInsert(false);
2565 fWEWrapper.setBulkFlag(false);
2566 fIsFirstBatchPm = true;
2567
2568 return rc;
2569 }
2570
commitBatchAutoOff(messageqcpp::ByteStream & bs,std::string & err)2571 uint8_t WE_DMLCommandProc::commitBatchAutoOff(messageqcpp::ByteStream& bs, std::string& err)
2572 {
2573 uint8_t rc = 0;
2574 //commit all versioned blocks, set hwm, update casual partition
2575 return rc;
2576 }
2577
rollbackBatchAutoOn(messageqcpp::ByteStream & bs,std::string & err)2578 uint8_t WE_DMLCommandProc::rollbackBatchAutoOn(messageqcpp::ByteStream& bs, std::string& err)
2579 {
2580 uint8_t rc = 0;
2581 uint32_t tmp32, tableOid, sessionID;
2582 uint64_t lockID;
2583 bs >> sessionID;
2584 bs >> lockID;
2585 bs >> tmp32;
2586 tableOid = tmp32;
2587 //Bulkrollback
2588 boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
2589 CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
2590 CalpontSystemCatalog::TableName aTableName;
2591
2592 try
2593 {
2594 aTableName = systemCatalogPtr->tableName(tableOid);
2595 }
2596 catch ( ... )
2597 {
2598 err = "No such table for oid " + tableOid;
2599 rc = 1;
2600 return rc;
2601 }
2602
2603 string table = aTableName.schema + "." + aTableName.table;
2604 string applName ("BatchInsert");
2605 rc = fWEWrapper.bulkRollback(tableOid, lockID, table, applName, false, err);
2606 fIsFirstBatchPm = true;
2607 TableMetaData::removeTableMetaData(tableOid);
2608 return rc;
2609 }
2610
rollbackBatchAutoOff(messageqcpp::ByteStream & bs,std::string & err)2611 uint8_t WE_DMLCommandProc::rollbackBatchAutoOff(messageqcpp::ByteStream& bs, std::string& err)
2612 {
2613 uint8_t rc = 0;
2614 //Rollbacked all versioned blocks
2615 return rc;
2616 }
processUpdate(messageqcpp::ByteStream & bs,std::string & err,ByteStream::quadbyte & PMId,uint64_t & blocksChanged)2617 uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs,
2618 std::string& err,
2619 ByteStream::quadbyte& PMId,
2620 uint64_t& blocksChanged)
2621 {
2622 uint8_t rc = 0;
2623 //cout << " In processUpdate" << endl;
2624 uint32_t tmp32, sessionID;
2625 TxnID txnId;
2626 bs >> PMId;
2627 bs >> tmp32;
2628 txnId = tmp32;
2629 fWEWrapper.setIsInsert(false);
2630 fWEWrapper.setBulkFlag(false);
2631 fWEWrapper.setTransId(txnId);
2632
2633 if (!rowGroups[txnId]) //meta data
2634 {
2635 rowGroups[txnId] = new rowgroup::RowGroup();
2636 rowGroups[txnId]->deserialize(bs);
2637 uint8_t pkgType;
2638 bs >> pkgType;
2639 cpackages[txnId].read(bs);
2640 //cout << "Processed meta data in update" << endl;
2641
2642 rc = fWEWrapper.startTransaction(txnId);
2643
2644 if (rc != NO_ERROR)
2645 {
2646 WErrorCodes ec;
2647 err = ec.errorString(rc);
2648 }
2649
2650 return rc;
2651 }
2652
2653 bool pushWarning = false;
2654 rowgroup::RGData rgData;
2655 rgData.deserialize(bs);
2656 rowGroups[txnId]->setData(&rgData);
2657 //rowGroups[txnId]->setData(const_cast<uint8_t*>(bs.buf()));
2658 //get rows and values
2659 rowgroup::Row row;
2660 rowGroups[txnId]->initRow(&row);
2661 string value("");
2662 uint32_t rowsThisRowgroup = rowGroups[txnId]->getRowCount();
2663 uint32_t columnsSelected = rowGroups[txnId]->getColumnCount();
2664 std::vector<execplan::CalpontSystemCatalog::ColDataType> fetchColTypes = rowGroups[txnId]->getColTypes();
2665 std::vector<uint32_t> fetchColScales = rowGroups[txnId]->getScale();
2666 std::vector<uint32_t> fetchColColwidths;
2667
2668 for (uint32_t i = 0; i < columnsSelected; i++)
2669 {
2670 fetchColColwidths.push_back(rowGroups[txnId]->getColumnWidth(i));
2671 }
2672
2673 WriteEngine::ColTupleList aColList;
2674 WriteEngine::ColTuple colTuple;
2675 WriteEngine::ColStructList colStructList;
2676 WriteEngine::ColStruct colStruct;
2677 WriteEngine::ColValueList colValueList;
2678 WriteEngine::RIDList rowIDLists;
2679
2680 WriteEngine::DctnryStructList dctnryStructList;
2681 WriteEngine::DctnryStruct dctnryStruct;
2682 WriteEngine::DctnryValueList dctnryValueList;
2683
2684 CalpontSystemCatalog::TableName tableName;
2685 CalpontSystemCatalog::TableColName tableColName;
2686 DMLTable* tablePtr = cpackages[txnId].get_Table();
2687 RowList rows = tablePtr->get_RowList();
2688 dmlpackage::ColumnList columnsUpdated = rows[0]->get_ColumnList();
2689 tableColName.table = tableName.table = tablePtr->get_TableName();
2690 tableColName.schema = tableName.schema = tablePtr->get_SchemaName();
2691 tableColName.column = columnsUpdated[0]->get_Name();
2692
2693 sessionID = cpackages[txnId].get_SessionID();
2694
2695 boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
2696 CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
2697 CalpontSystemCatalog::OID oid = 0;
2698 CalpontSystemCatalog::ROPair tableRO;
2699
2700 std::string timeZone = cpackages[txnId].get_TimeZone();
2701
2702 try
2703 {
2704 tableRO = systemCatalogPtr->tableRID(tableName);
2705 oid = systemCatalogPtr->lookupOID(tableColName);
2706 }
2707 catch (std::exception& ex)
2708 {
2709 rc = 1;
2710 ostringstream oss;
2711 oss << "lookupOID got exception " << ex.what() << " with column " << tableColName.schema << "." + tableColName.table << "." << tableColName.column;
2712 err = oss.str();
2713 }
2714 catch ( ... )
2715 {
2716 rc = 1;
2717 ostringstream oss;
2718 oss << "lookupOID got unknown exception with column " << tableColName.schema << "." << tableColName.table << "." << tableColName.column;
2719 err = oss.str();
2720 }
2721
2722 if (rc != 0)
2723 return rc;
2724
2725 rowGroups[txnId]->getRow(0, &row);
2726 CalpontSystemCatalog::RID rid = row.getRid();
2727 uint16_t dbRoot, segment, blockNum;
2728 uint32_t partition;
2729 uint8_t extentNum;
2730 //Get the file information from rowgroup
2731 dbRoot = rowGroups[txnId]->getDBRoot();
2732 rowGroups[txnId]->getLocation(&partition, &segment, &extentNum, &blockNum);
2733 colStruct.fColPartition = partition;
2734 colStruct.fColSegment = segment;
2735 colStruct.fColDbRoot = dbRoot;
2736 dctnryStruct.fColPartition = partition;
2737 dctnryStruct.fColSegment = segment;
2738 dctnryStruct.fColDbRoot = dbRoot;
2739 TableMetaData* aTableMetaData = TableMetaData::makeTableMetaData(tableRO.objnum);
2740 //Build to be updated column structure and values
2741 int error = 0;
2742 unsigned fetchColPos = 0;
2743 bool ridsFetched = false;
2744 bool isNull = false;
2745 boost::any datavalue;
2746 int64_t intColVal = 0;
2747 //timer.start("fetch values");
2748 std::vector<string> colNames;
2749
2750 // for query stats
2751 boost::scoped_array<CalpontSystemCatalog::ColType> colTypes(new CalpontSystemCatalog::ColType[columnsUpdated.size()]);
2752 boost::scoped_array<int> preBlkNums(new int[columnsUpdated.size()]);
2753 boost::scoped_array<OID> oids(new OID[columnsUpdated.size()]);
2754
2755 BRMWrapper::setUseVb(true);
2756 for (unsigned int j = 0; j < columnsUpdated.size(); j++)
2757 {
2758 //timer.start("lookupsyscat");
2759 tableColName.column = columnsUpdated[j]->get_Name();
2760
2761 try
2762 {
2763 oids[j] = systemCatalogPtr->lookupOID(tableColName);
2764 colTypes[j] = systemCatalogPtr->colType(oids[j]);
2765 }
2766 catch (std::exception& ex)
2767 {
2768 rc = 1;
2769 ostringstream oss;
2770 oss << "colType got exception " << ex.what() << " with column oid " << oid;
2771 err = oss.str();
2772 }
2773 catch ( ... )
2774 {
2775 rc = 1;
2776 ostringstream oss;
2777 oss << "colType got unknown exception with column oid " << oid;
2778 err = oss.str();
2779 }
2780
2781 if (rc != 0)
2782 return rc;
2783
2784 preBlkNums[j] = -1;
2785 }
2786
2787 for (unsigned int j = 0; j < columnsUpdated.size(); j++)
2788 {
2789 /* WriteEngine::ColTupleList colTupleList;
2790 //timer.start("lookupsyscat");
2791 tableColName.column = columnsUpdated[j]->get_Name();
2792 try
2793 {
2794 oid = systemCatalogPtr->lookupOID(tableColName);
2795 }
2796 catch (std::exception& ex)
2797 {
2798 rc = 1;
2799 ostringstream oss;
2800 oss << "lookupOID got exception " << ex.what() << " with column " << tableColName.schema << "." << tableColName.table << "." << tableColName.column;
2801 err = oss.str();
2802 }
2803 catch ( ... )
2804 {
2805 rc = 1;
2806 ostringstream oss;
2807 oss << "lookupOID got unknown exception with column " << tableColName.schema << "." << tableColName.table << "." << tableColName.column;
2808 err = oss.str();
2809 }
2810
2811 if (rc != 0)
2812 return rc;
2813
2814 CalpontSystemCatalog::ColType colType;
2815 try
2816 {
2817 colType = systemCatalogPtr->colType(oid);
2818 }
2819 catch (std::exception& ex)
2820 {
2821 rc = 1;
2822 ostringstream oss;
2823 oss << "colType got exception " << ex.what() << " with column oid " << oid;
2824 err = oss.str();
2825 }
2826 catch ( ... )
2827 {
2828 rc = 1;
2829 ostringstream oss;
2830 oss << "colType got unknown exception with column oid " << oid;
2831 err = oss.str();
2832 }
2833
2834 if (rc !=0)
2835 return rc;
2836 */
2837 WriteEngine::ColTupleList colTupleList;
2838 CalpontSystemCatalog::ColType colType = colTypes[j];
2839 oid = oids[j];
2840 colStruct.dataOid = oid;
2841 colStruct.colDataType = colType.colDataType;
2842 colStruct.tokenFlag = false;
2843 colStruct.fCompressionType = colType.compressionType;
2844 tableColName.column = columnsUpdated[j]->get_Name();
2845
2846 if ( !ridsFetched)
2847 {
2848 // querystats
2849 uint64_t relativeRID = 0;
2850
2851 for (unsigned i = 0; i < rowsThisRowgroup; i++)
2852 {
2853 rowGroups[txnId]->getRow(i, &row);
2854 rid = row.getRid();
2855 relativeRID = rid - rowGroups[txnId]->getBaseRid();
2856 rid = relativeRID;
2857 convertToRelativeRid (rid, extentNum, blockNum);
2858 rowIDLists.push_back(rid);
2859 uint32_t colWidth = (colTypes[j].colWidth > 8 ? 8 : colTypes[j].colWidth);
2860 int rrid = (int) relativeRID / (BYTE_PER_BLOCK / colWidth);
2861 // populate stats.blocksChanged
2862 if (rrid > preBlkNums[j])
2863 {
2864 preBlkNums[j] = rrid ;
2865 blocksChanged++;
2866 }
2867
2868 }
2869
2870 ridsFetched = true;
2871 }
2872
2873 bool pushWarn = false;
2874 bool nameNeeded = false;
2875
2876 if (isDictCol(colType))
2877 {
2878 colStruct.colWidth = 8;
2879 colStruct.tokenFlag = true;
2880 dctnryStruct.dctnryOid = colType.ddn.dictOID;
2881 dctnryStruct.columnOid = colStruct.dataOid;
2882 dctnryStruct.fCompressionType = colType.compressionType;
2883 dctnryStruct.colWidth = colType.colWidth;
2884
2885 if (NO_ERROR != (error = fWEWrapper.openDctnry (txnId, dctnryStruct, false))) // @bug 5572 HDFS tmp file
2886 {
2887 WErrorCodes ec;
2888 err = ec.errorString(error);
2889 rc = error;
2890 return rc;
2891 }
2892
2893 ColExtsInfo aColExtsInfo = aTableMetaData->getColExtsInfo(dctnryStruct.dctnryOid);
2894 ColExtsInfo::iterator it = aColExtsInfo.begin();
2895
2896 while (it != aColExtsInfo.end())
2897 {
2898 if ((it->dbRoot == dctnryStruct.fColDbRoot) && (it->partNum == dctnryStruct.fColPartition) && (it->segNum == dctnryStruct.fColSegment))
2899 break;
2900
2901 it++;
2902 }
2903
2904 if (it == aColExtsInfo.end()) //add this one to the list
2905 {
2906 ColExtInfo aExt;
2907 aExt.dbRoot = dctnryStruct.fColDbRoot;
2908 aExt.partNum = dctnryStruct.fColPartition;
2909 aExt.segNum = dctnryStruct.fColSegment;
2910 aExt.compType = dctnryStruct.fCompressionType;
2911 aExt.isDict = true;
2912 aColExtsInfo.push_back(aExt);
2913 }
2914
2915 aTableMetaData->setColExtsInfo(dctnryStruct.dctnryOid, aColExtsInfo);
2916
2917
2918 if (columnsUpdated[j]->get_isFromCol())
2919 {
2920 for (unsigned i = 0; i < rowsThisRowgroup; i++)
2921 {
2922
2923 rowGroups[txnId]->getRow(i, &row);
2924
2925 if (row.isNullValue(fetchColPos))
2926 {
2927 if ((colType.defaultValue.length() <= 0) && (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT))
2928 {
2929 rc = 1;
2930 Message::Args args;
2931 args.add(tableColName.column);
2932 err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
2933 return rc;
2934 }
2935 else if (colType.defaultValue.length() > 0)
2936 {
2937 value = colType.defaultValue;
2938
2939 if (value.length() > (unsigned int)colType.colWidth)
2940 {
2941 value = value.substr(0, colType.colWidth);
2942 pushWarn = true;
2943
2944 if (!pushWarning)
2945 {
2946 pushWarning = true;
2947 }
2948
2949 if (pushWarn)
2950 nameNeeded = true;
2951 }
2952
2953 WriteEngine::DctnryTuple dctTuple;
2954 dctTuple.sigValue = (unsigned char*)value.c_str();
2955 dctTuple.sigSize = value.length();
2956 dctTuple.isNull = false;
2957
2958 error = fWEWrapper.tokenize(txnId, dctTuple, colType.compressionType);
2959
2960 if (error != NO_ERROR)
2961 {
2962 fWEWrapper.closeDctnry(txnId, colType.compressionType);
2963 return false;
2964 }
2965
2966 colTuple.data = dctTuple.token;
2967 colTupleList.push_back (colTuple);
2968 }
2969 else
2970 {
2971 WriteEngine::Token nullToken;
2972 colTuple.data = nullToken;
2973 colTupleList.push_back (colTuple);
2974 }
2975
2976 continue;
2977 }
2978
2979 switch (fetchColTypes[fetchColPos])
2980 {
2981 case CalpontSystemCatalog::DATE:
2982 {
2983 intColVal = row.getUintField<4>(fetchColPos);
2984 value = DataConvert::dateToString(intColVal);
2985 break;
2986 }
2987
2988 case CalpontSystemCatalog::DATETIME:
2989 {
2990 intColVal = row.getUintField<8>(fetchColPos);
2991 value = DataConvert::datetimeToString(intColVal, colType.precision);
2992 break;
2993 }
2994
2995 case CalpontSystemCatalog::TIMESTAMP:
2996 {
2997 intColVal = row.getUintField<8>(fetchColPos);
2998 value = DataConvert::timestampToString(intColVal, timeZone, colType.precision);
2999 break;
3000 }
3001
3002 case CalpontSystemCatalog::TIME:
3003 {
3004 intColVal = row.getIntField<8>(fetchColPos);
3005 value = DataConvert::timeToString(intColVal, colType.precision);
3006 break;
3007 }
3008
3009 case CalpontSystemCatalog::CHAR:
3010 case CalpontSystemCatalog::VARCHAR:
3011 {
3012 value = row.getStringField(fetchColPos);
3013 unsigned i = strlen(value.c_str());
3014 value = value.substr(0, i);
3015 break;
3016 }
3017
3018 case CalpontSystemCatalog::VARBINARY:
3019 case CalpontSystemCatalog::BLOB:
3020 case CalpontSystemCatalog::TEXT:
3021 {
3022 value = row.getVarBinaryStringField(fetchColPos);
3023 break;
3024 }
3025
3026 case CalpontSystemCatalog::DECIMAL:
3027 case CalpontSystemCatalog::UDECIMAL:
3028 {
3029 // decimal width > 8 cannot be stored in an integer
3030 if (fetchColColwidths[fetchColPos] > 8)
3031 {
3032 value = row.getStringField(fetchColPos);
3033 unsigned i = strlen(value.c_str());
3034 value = value.substr(0, i);
3035 break;
3036 }
3037
3038 // else
3039 // fall through to integer cases
3040 }
3041 /* fall through */
3042
3043 case CalpontSystemCatalog::BIGINT:
3044 case CalpontSystemCatalog::UBIGINT:
3045 case CalpontSystemCatalog::INT:
3046 case CalpontSystemCatalog::UINT:
3047 case CalpontSystemCatalog::MEDINT:
3048 case CalpontSystemCatalog::UMEDINT:
3049 case CalpontSystemCatalog::SMALLINT:
3050 case CalpontSystemCatalog::USMALLINT:
3051 case CalpontSystemCatalog::TINYINT:
3052 case CalpontSystemCatalog::UTINYINT:
3053 {
3054 {
3055 intColVal = row.getIntField(fetchColPos);
3056
3057 if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UDECIMAL
3058 && intColVal < 0)
3059 {
3060 intColVal = 0;
3061 }
3062
3063 if (fetchColScales[fetchColPos] <= 0)
3064 {
3065 ostringstream os;
3066
3067 if (isUnsigned(fetchColTypes[fetchColPos]))
3068 os << static_cast<uint64_t>(intColVal);
3069 else
3070 os << intColVal;
3071
3072 value = os.str();
3073 }
3074 else
3075 {
3076 const int ctmp_size = 65 + 1 + 1 + 1;
3077 char ctmp[ctmp_size] = {0};
3078 DataConvert::decimalToString(
3079 intColVal, fetchColScales[fetchColPos],
3080 ctmp, ctmp_size, fetchColTypes[fetchColPos]);
3081 value = ctmp; // null termination by decimalToString
3082 }
3083 }
3084 break;
3085 }
3086
3087 //In this case, we're trying to load a double output column with float data. This is the
3088 // case when you do sum(floatcol), e.g.
3089 case CalpontSystemCatalog::FLOAT:
3090 case CalpontSystemCatalog::UFLOAT:
3091 {
3092 float dl = row.getFloatField(fetchColPos);
3093
3094 if (dl == std::numeric_limits<float>::infinity())
3095 continue;
3096
3097 if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UFLOAT && dl < 0.0)
3098 {
3099 dl = 0.0;
3100 }
3101
3102 ostringstream os;
3103 //@Bug 3350 fix the precision.
3104 os << setprecision(7) << dl;
3105 value = os.str();
3106 break;
3107 }
3108
3109 case CalpontSystemCatalog::DOUBLE:
3110 case CalpontSystemCatalog::UDOUBLE:
3111 {
3112 double dl = row.getDoubleField(fetchColPos);
3113
3114 if (dl == std::numeric_limits<double>::infinity())
3115 continue;
3116
3117 if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UDOUBLE && dl < 0.0)
3118 {
3119 dl = 0.0;
3120 }
3121
3122 ostringstream os;
3123 //@Bug 3350 fix the precision.
3124 os << setprecision(16) << dl;
3125 value = os.str();
3126 break;
3127 }
3128
3129 case CalpontSystemCatalog::LONGDOUBLE:
3130 {
3131 long double dll = row.getLongDoubleField(fetchColPos);
3132
3133 if (dll == std::numeric_limits<long double>::infinity())
3134 continue;
3135
3136 ostringstream os;
3137 //@Bug 3350 fix the precision.
3138 os << setprecision(19) << dll;
3139 value = os.str();
3140 break;
3141 }
3142
3143 default: // treat as int64
3144 {
3145 ostringstream os;
3146 intColVal = row.getUintField<8>(fetchColPos);
3147 os << intColVal;
3148 value = os.str();
3149 break;
3150 }
3151 }
3152
3153 uint32_t funcScale = columnsUpdated[j]->get_funcScale();
3154
3155 if (funcScale != 0)
3156 {
3157 string::size_type pos = value.find_first_of("."); //decimal point
3158
3159 if ( pos >= value.length() )
3160 value.insert(value.length(), ".");
3161
3162 //padding 0 if needed
3163 pos = value.find_first_of(".");
3164 uint32_t digitsAfterPoint = value.length() - pos - 1;
3165
3166 if (digitsAfterPoint < funcScale)
3167 {
3168 for (uint32_t i = 0; i < (funcScale - digitsAfterPoint); i++)
3169 value += "0";
3170 }
3171 }
3172
3173 //check data length
3174 //trim the string if needed
3175 if (value.length() > (unsigned int)colType.colWidth)
3176 {
3177 value = value.substr(0, colType.colWidth);
3178
3179 if (!pushWarn)
3180 pushWarn = true;
3181
3182 if (!pushWarning)
3183 pushWarning = true;
3184
3185 if (pushWarn)
3186 nameNeeded = true;
3187 }
3188
3189 WriteEngine::DctnryTuple dctTuple;
3190 dctTuple.sigValue = (unsigned char*)value.c_str();
3191 dctTuple.sigSize = value.length();
3192 dctTuple.isNull = false;
3193
3194 error = fWEWrapper.tokenize(txnId, dctTuple, colType.compressionType);
3195
3196 if (error != NO_ERROR)
3197 {
3198 fWEWrapper.closeDctnry(txnId, colType.compressionType);
3199 rc = error;
3200 WErrorCodes ec;
3201 err = ec.errorString(error);
3202 return rc;
3203 }
3204
3205 colTuple.data = dctTuple.token;
3206 colTupleList.push_back (colTuple);
3207 }
3208
3209 if (colType.compressionType == 0)
3210 fWEWrapper.closeDctnry(txnId, colType.compressionType, true);
3211 else
3212 fWEWrapper.closeDctnry(txnId, colType.compressionType, false);
3213
3214 fetchColPos++;
3215 }
3216 else //constant
3217 {
3218 if (columnsUpdated[j]->get_isnull())
3219 {
3220 if ((colType.defaultValue.length() <= 0) && (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT))
3221 {
3222 rc = 1;
3223 Message::Args args;
3224 args.add(tableColName.column);
3225 err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
3226 return rc;
3227 }
3228 else if (colType.defaultValue.length() > 0)
3229 {
3230 value = colType.defaultValue;
3231
3232 if (value.length() > (unsigned int)colType.colWidth)
3233 {
3234 value = value.substr(0, colType.colWidth);
3235 pushWarn = true;
3236
3237 if (!pushWarning)
3238 {
3239 pushWarning = true;
3240 }
3241
3242 if (pushWarn)
3243 nameNeeded = true;
3244 }
3245
3246 WriteEngine::DctnryTuple dctTuple;
3247 dctTuple.sigValue = (unsigned char*)value.c_str();
3248 dctTuple.sigSize = value.length();
3249 dctTuple.isNull = false;
3250 error = fWEWrapper.tokenize(txnId, dctTuple, colType.compressionType);
3251
3252 if (error != NO_ERROR)
3253 {
3254 fWEWrapper.closeDctnry(txnId, colType.compressionType);
3255 rc = error;
3256 WErrorCodes ec;
3257 err = ec.errorString(error);
3258 return rc;
3259 }
3260
3261 colTuple.data = dctTuple.token;
3262
3263 if (colType.compressionType == 0)
3264 fWEWrapper.closeDctnry(txnId, colType.compressionType, true);
3265 else
3266 fWEWrapper.closeDctnry(txnId, colType.compressionType, false); // Constant only need to tokenize once.
3267 }
3268 else
3269 {
3270 WriteEngine::Token nullToken;
3271 colTuple.data = nullToken;
3272 }
3273 }
3274 else
3275 {
3276 value = columnsUpdated[j]->get_Data();
3277
3278 if (value.length() > (unsigned int)colType.colWidth)
3279 {
3280 value = value.substr(0, colType.colWidth);
3281 pushWarn = true;
3282
3283 if (!pushWarning)
3284 {
3285 pushWarning = true;
3286 }
3287
3288 if (pushWarn)
3289 nameNeeded = true;
3290 }
3291
3292 WriteEngine::DctnryTuple dctTuple;
3293 dctTuple.sigValue = (unsigned char*)value.c_str();
3294 dctTuple.sigSize = value.length();
3295 dctTuple.isNull = false;
3296 error = fWEWrapper.tokenize(txnId, dctTuple, colType.compressionType);
3297
3298 if (error != NO_ERROR)
3299 {
3300 fWEWrapper.closeDctnry(txnId, colType.compressionType);
3301 rc = error;
3302 WErrorCodes ec;
3303 err = ec.errorString(error);
3304 return rc;
3305 }
3306
3307 colTuple.data = dctTuple.token;
3308
3309 if (colType.compressionType == 0)
3310 fWEWrapper.closeDctnry(txnId, colType.compressionType, true);
3311 else
3312 fWEWrapper.closeDctnry(txnId, colType.compressionType, false); // Constant only need to tokenize once.
3313 }
3314
3315 for (unsigned row = 0; row < rowsThisRowgroup; row++)
3316 colTupleList.push_back (colTuple);
3317 }
3318 }
3319 else //Non dictionary column
3320 {
3321 colStruct.colWidth = colType.colWidth;
3322
3323 if (columnsUpdated[j]->get_isFromCol())
3324 {
3325 for (unsigned i = 0; i < rowsThisRowgroup; i++)
3326 {
3327 rowGroups[txnId]->getRow(i, &row);
3328
3329 if (row.isNullValue(fetchColPos))
3330 {
3331 isNull = true;
3332 value = "";
3333 }
3334 else
3335 {
3336 isNull = false;
3337
3338 switch (fetchColTypes[fetchColPos])
3339 {
3340 case CalpontSystemCatalog::DATE:
3341 {
3342 intColVal = row.getUintField<4>(fetchColPos);
3343 value = DataConvert::dateToString(intColVal);
3344 break;
3345 }
3346
3347 case CalpontSystemCatalog::DATETIME:
3348 {
3349 intColVal = row.getUintField<8>(fetchColPos);
3350 value = DataConvert::datetimeToString(intColVal, colType.precision);
3351 break;
3352 }
3353
3354 case CalpontSystemCatalog::TIMESTAMP:
3355 {
3356 intColVal = row.getUintField<8>(fetchColPos);
3357 value = DataConvert::timestampToString(intColVal, timeZone, colType.precision);
3358 break;
3359 }
3360
3361 case CalpontSystemCatalog::TIME:
3362 {
3363 intColVal = row.getIntField<8>(fetchColPos);
3364 value = DataConvert::timeToString(intColVal, colType.precision);
3365 break;
3366 }
3367
3368 case CalpontSystemCatalog::CHAR:
3369 case CalpontSystemCatalog::VARCHAR:
3370 {
3371 value = row.getStringField(fetchColPos);
3372 unsigned i = strlen(value.c_str());
3373 value = value.substr(0, i);
3374 break;
3375 }
3376
3377 case CalpontSystemCatalog::VARBINARY:
3378 case CalpontSystemCatalog::BLOB:
3379 case CalpontSystemCatalog::TEXT:
3380 {
3381 value = row.getVarBinaryStringField(fetchColPos);
3382 break;
3383 }
3384
3385 case CalpontSystemCatalog::DECIMAL:
3386 case CalpontSystemCatalog::UDECIMAL:
3387 {
3388 // decimal width > 8 cannot be stored in an integer
3389 if (fetchColColwidths[fetchColPos] > 8)
3390 {
3391 value = row.getStringField(fetchColPos);
3392 unsigned i = strlen(value.c_str());
3393 value = value.substr(0, i);
3394 break;
3395 }
3396
3397 // else
3398 // fall through to integer cases
3399 }
3400 /* fall through */
3401
3402 case CalpontSystemCatalog::BIGINT:
3403 case CalpontSystemCatalog::UBIGINT:
3404 case CalpontSystemCatalog::INT:
3405 case CalpontSystemCatalog::UINT:
3406 case CalpontSystemCatalog::MEDINT:
3407 case CalpontSystemCatalog::UMEDINT:
3408 case CalpontSystemCatalog::SMALLINT:
3409 case CalpontSystemCatalog::USMALLINT:
3410 case CalpontSystemCatalog::TINYINT:
3411 case CalpontSystemCatalog::UTINYINT:
3412 {
3413 {
3414 intColVal = row.getIntField(fetchColPos);
3415
3416 if (fetchColTypes[fetchColPos] ==
3417 CalpontSystemCatalog::UDECIMAL
3418 && intColVal < 0)
3419 {
3420 intColVal = 0;
3421 }
3422
3423 if (fetchColScales[fetchColPos] <= 0)
3424 {
3425 ostringstream os;
3426
3427 if (isUnsigned(fetchColTypes[fetchColPos]))
3428 os << static_cast<uint64_t>(intColVal);
3429 else
3430 os << intColVal;
3431
3432 value = os.str();
3433 }
3434 else
3435 {
3436 const int ctmp_size = 65 + 1 + 1 + 1;
3437 char ctmp[ctmp_size] = {0};
3438 DataConvert::decimalToString(
3439 intColVal, fetchColScales[fetchColPos],
3440 ctmp, ctmp_size, fetchColTypes[fetchColPos]);
3441 value = ctmp; // null termination by decimalToString
3442 }
3443 }
3444 break;
3445 }
3446
3447 //In this case, we're trying to load a double output column with float data. This is the
3448 // case when you do sum(floatcol), e.g.
3449 case CalpontSystemCatalog::FLOAT:
3450 case CalpontSystemCatalog::UFLOAT:
3451 {
3452 float dl = row.getFloatField(fetchColPos);
3453
3454 if (dl == std::numeric_limits<float>::infinity())
3455 continue;
3456
3457 if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UFLOAT && dl < 0.0)
3458 {
3459 dl = 0.0;
3460 }
3461
3462 ostringstream os;
3463 //@Bug 3350 fix the precision.
3464 os << setprecision(7) << dl;
3465 value = os.str();
3466 break;
3467 }
3468
3469 case CalpontSystemCatalog::DOUBLE:
3470 case CalpontSystemCatalog::UDOUBLE:
3471 {
3472 double dl = row.getDoubleField(fetchColPos);
3473
3474 if (dl == std::numeric_limits<double>::infinity())
3475 continue;
3476
3477 if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UDOUBLE && dl < 0.0)
3478 {
3479 dl = 0.0;
3480 }
3481
3482 ostringstream os;
3483 //@Bug 3350 fix the precision.
3484 os << setprecision(16) << dl;
3485 value = os.str();
3486 break;
3487 }
3488
3489 case CalpontSystemCatalog::LONGDOUBLE:
3490 {
3491 long double dll = row.getLongDoubleField(fetchColPos);
3492
3493 if (dll == std::numeric_limits<long double>::infinity())
3494 continue;
3495
3496 ostringstream os;
3497 //@Bug 3350 fix the precision.
3498 os << setprecision(19) << dll;
3499 value = os.str();
3500 break;
3501 }
3502
3503 default: // treat as int64
3504 {
3505 ostringstream os;
3506 intColVal = row.getUintField<8>(fetchColPos);
3507 os << intColVal;
3508 value = os.str();
3509 break;
3510 }
3511 }
3512 }
3513
3514 uint32_t funcScale = columnsUpdated[j]->get_funcScale();
3515
3516 if (funcScale != 0)
3517 {
3518 string::size_type pos = value.find_first_of("."); //decimal point
3519
3520 if ( pos >= value.length() )
3521 value.insert(value.length(), ".");
3522
3523 //padding 0 if needed
3524 pos = value.find_first_of(".");
3525 uint32_t digitsAfterPoint = value.length() - pos - 1;
3526
3527 if (digitsAfterPoint < funcScale)
3528 {
3529 for (uint32_t i = 0; i < (funcScale - digitsAfterPoint); i++)
3530 value += "0";
3531 }
3532 }
3533
3534 //Check NOT NULL constraint and default value
3535 if ((isNull) && (colType.defaultValue.length() <= 0) && (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT))
3536 {
3537 rc = 1;
3538 Message::Args args;
3539 args.add(tableColName.column);
3540 err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
3541 return rc;
3542 }
3543 else if ((isNull) && (colType.defaultValue.length() > 0))
3544 {
3545 isNull = false;
3546 bool oneWarn = false;
3547
3548 try
3549 {
3550 datavalue = DataConvert::convertColumnData(colType, colType.defaultValue, pushWarn, timeZone, isNull, false, false);
3551 }
3552 catch (exception&)
3553 {
3554 //@Bug 2624. Error out on conversion failure
3555 rc = 1;
3556 Message::Args args;
3557 args.add(string("'") + colType.defaultValue + string("'"));
3558 err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
3559 }
3560
3561 if ((pushWarn) && (!oneWarn))
3562 oneWarn = true;
3563
3564 colTuple.data = datavalue;
3565 colTupleList.push_back (colTuple);
3566
3567 if (oneWarn)
3568 pushWarn = true;
3569
3570 if (!pushWarning)
3571 {
3572 pushWarning = pushWarn;
3573 }
3574
3575 if (pushWarn)
3576 nameNeeded = true;
3577 }
3578 else
3579 {
3580 try
3581 {
3582 datavalue = DataConvert::convertColumnData(colType, value, pushWarn, timeZone, isNull, false, false);
3583 }
3584 catch (exception&)
3585 {
3586 //@Bug 2624. Error out on conversion failure
3587 rc = 1;
3588 Message::Args args;
3589 args.add(string("'") + value + string("'"));
3590 err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
3591 return rc;
3592 }
3593
3594 colTuple.data = datavalue;
3595 colTupleList.push_back (colTuple);
3596
3597 if (!pushWarning)
3598 {
3599 pushWarning = pushWarn;
3600 }
3601
3602 if (pushWarn)
3603 nameNeeded = true;
3604 }
3605 }
3606
3607 fetchColPos++;
3608 }
3609 else //constant column
3610 {
3611 if (columnsUpdated[j]->get_isnull())
3612 {
3613 isNull = true;
3614 }
3615 else
3616 {
3617 isNull = false;
3618 }
3619
3620 string inData (columnsUpdated[j]->get_Data());
3621
3622 if (((colType.colDataType == execplan::CalpontSystemCatalog::DATE) && (inData == "0000-00-00")) ||
3623 ((colType.colDataType == execplan::CalpontSystemCatalog::DATETIME) && (inData == "0000-00-00 00:00:00")) ||
3624 ((colType.colDataType == execplan::CalpontSystemCatalog::TIMESTAMP) && (inData == "0000-00-00 00:00:00")))
3625 {
3626 isNull = true;
3627 }
3628
3629 uint64_t nextVal = 0;
3630
3631 if (colType.autoincrement)
3632 {
3633 try
3634 {
3635 nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
3636 fDbrm.startAISequence(oid, nextVal, colType.colWidth, colType.colDataType);
3637 }
3638 catch (std::exception& ex)
3639 {
3640 err = ex.what();
3641 rc = 1;
3642 return rc;
3643 }
3644 }
3645
3646 if (colType.autoincrement && ( isNull || (inData.compare("0") == 0)))
3647 {
3648 //reserve nextVal
3649 try
3650 {
3651 bool reserved = fDbrm.getAIRange(oid, rowsThisRowgroup, &nextVal);
3652
3653 if (!reserved)
3654 {
3655 err = IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT);
3656 rc = 1;
3657 return rc;
3658 }
3659 }
3660 catch (std::exception& ex)
3661 {
3662 err = ex.what();
3663 rc = 1;
3664 return rc;
3665 }
3666
3667 isNull = false;
3668 bool oneWarn = false;
3669
3670 for (unsigned row = 0; row < rowsThisRowgroup; row++)
3671 {
3672
3673 ostringstream oss;
3674 oss << nextVal++;
3675 inData = oss.str();
3676
3677 try
3678 {
3679 datavalue = DataConvert::convertColumnData(colType, inData, pushWarn, timeZone, isNull, false, false);
3680 }
3681 catch (exception&)
3682 {
3683 //@Bug 2624. Error out on conversion failure
3684 rc = 1;
3685 Message::Args args;
3686 args.add(string("'") + inData + string("'"));
3687 err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
3688 }
3689
3690 if ((pushWarn) && (!oneWarn))
3691 oneWarn = true;
3692
3693 colTuple.data = datavalue;
3694 colTupleList.push_back (colTuple);
3695 }
3696
3697 if (oneWarn)
3698 pushWarn = true;
3699
3700 if (!pushWarning)
3701 {
3702 pushWarning = pushWarn;
3703 }
3704
3705 if (pushWarn)
3706 nameNeeded = true;
3707 }
3708 else if (isNull && (colType.defaultValue.length() <= 0) && (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT))
3709 {
3710 rc = 1;
3711 Message::Args args;
3712 args.add(tableColName.column);
3713 err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
3714 return rc;
3715 }
3716 else if (isNull && (colType.defaultValue.length() > 0))
3717 {
3718 isNull = false;
3719 bool oneWarn = false;
3720
3721 for (unsigned row = 0; row < rowsThisRowgroup; row++)
3722 {
3723 try
3724 {
3725 datavalue = DataConvert::convertColumnData(colType, colType.defaultValue, pushWarn, timeZone, isNull, false, false);
3726 }
3727 catch (exception&)
3728 {
3729 //@Bug 2624. Error out on conversion failure
3730 rc = 1;
3731 Message::Args args;
3732 args.add(string("'") + colType.defaultValue + string("'"));
3733 err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
3734 }
3735
3736 if ((pushWarn) && (!oneWarn))
3737 oneWarn = true;
3738
3739 colTuple.data = datavalue;
3740 colTupleList.push_back (colTuple);
3741 }
3742
3743 if (oneWarn)
3744 pushWarn = true;
3745
3746 if (!pushWarning)
3747 {
3748 pushWarning = pushWarn;
3749 }
3750
3751 if (pushWarn)
3752 nameNeeded = true;
3753 }
3754 else
3755 {
3756 try
3757 {
3758 datavalue = DataConvert::convertColumnData(colType, inData, pushWarn, timeZone, isNull, false, true);
3759 }
3760 catch (exception& ex)
3761 {
3762 //@Bug 2624. Error out on conversion failure
3763 rc = 1;
3764 cout << ex.what() << endl;
3765 Message::Args args;
3766 args.add(string("'") + inData + string("'"));
3767 err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
3768 return rc;
3769 }
3770
3771 colTuple.data = datavalue;
3772
3773 if (!pushWarning)
3774 {
3775 pushWarning = pushWarn;
3776 }
3777
3778 if (pushWarn)
3779 nameNeeded = true;
3780
3781 for (unsigned row = 0; row < rowsThisRowgroup; row++)
3782 colTupleList.push_back (colTuple);
3783 }
3784 }
3785 }
3786
3787 if (nameNeeded)
3788 {
3789 colNames.push_back(tableColName.column);
3790 }
3791
3792 colStructList.push_back(colStruct);
3793 colValueList.push_back (colTupleList);
3794 } //end of bulding values and column structure.
3795
3796 //timer.stop("fetch values");
3797 if (rowIDLists.size() > 0)
3798 {
3799 error = fWEWrapper.updateColumnRecs(txnId, colStructList, colValueList, rowIDLists, tableRO.objnum);
3800 }
3801
3802 if (error != NO_ERROR)
3803 {
3804 rc = error;
3805 WErrorCodes ec;
3806 err = ec.errorString(error);
3807
3808 if (error == ERR_BRM_DEAD_LOCK)
3809 {
3810 rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR;
3811 }
3812 else if (error == ERR_BRM_VB_OVERFLOW)
3813 {
3814 rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR;
3815 err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW);
3816 }
3817 }
3818
3819 if (pushWarning)
3820 {
3821 rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
3822 Message::Args args;
3823 string cols = "'" + colNames[0] + "'";
3824
3825 for (unsigned i = 1; i < colNames.size(); i++)
3826 {
3827 cols = cols + ", " + "'" + colNames[i] + "'";
3828 }
3829
3830 args.add(cols);
3831 err = IDBErrorInfo::instance()->errorMsg(WARN_DATA_TRUNC, args);
3832 }
3833
3834 return rc;
3835 }
3836
getWrittenLbids(messageqcpp::ByteStream & bs,std::string & err,ByteStream::quadbyte & PMId)3837 uint8_t WE_DMLCommandProc::getWrittenLbids(messageqcpp::ByteStream& bs, std::string& err, ByteStream::quadbyte& PMId)
3838 {
3839 uint8_t rc = 0;
3840 uint32_t txnId;
3841 vector<LBID_t> lbidList;
3842
3843 bs >> txnId;
3844 std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t>::iterator mapIter;
3845 std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t> m_txnLBIDMap = fWEWrapper.getTxnMap();
3846
3847 try
3848 {
3849 mapIter = m_txnLBIDMap.find(txnId);
3850
3851 if (mapIter != m_txnLBIDMap.end())
3852 {
3853 SP_TxnLBIDRec_t spTxnLBIDRec = (*mapIter).second;
3854 std::tr1::unordered_map<BRM::LBID_t, uint32_t> ::iterator listIter = spTxnLBIDRec->m_LBIDMap.begin();
3855
3856 while (listIter != spTxnLBIDRec->m_LBIDMap.end())
3857 {
3858 lbidList.push_back(listIter->first);
3859 listIter++;
3860 }
3861 }
3862 }
3863 catch (...) {}
3864
3865 bs.restart();
3866
3867 try
3868 {
3869 serializeInlineVector (bs, lbidList);
3870 }
3871 catch (exception& ex)
3872 {
3873 // Append to errmsg in case we already have an error
3874 if (err.length() > 0)
3875 err += "; ";
3876
3877 err += ex.what();
3878 rc = 1;
3879 return rc;
3880 }
3881
3882 return rc;
3883 }
3884
processFlushFiles(messageqcpp::ByteStream & bs,std::string & err)3885 uint8_t WE_DMLCommandProc::processFlushFiles(messageqcpp::ByteStream& bs, std::string& err)
3886 {
3887 uint8_t rc = 0;
3888 uint32_t flushCode, txnId, tableOid;
3889 int error;
3890 bs >> flushCode;
3891 bs >> txnId;
3892 bs >> tableOid;
3893 std::map<uint32_t, uint32_t> oids;
3894 CalpontSystemCatalog::TableName aTableName;
3895 CalpontSystemCatalog::RIDList ridList;
3896 boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
3897 CalpontSystemCatalog::makeCalpontSystemCatalog(txnId);
3898 // execplan::CalpontSystemCatalog::ColType colType;
3899 CalpontSystemCatalog::DictOIDList dictOids;
3900
3901 if (tableOid >= 3000)
3902 {
3903 try
3904 {
3905 aTableName = systemCatalogPtr->tableName(tableOid);
3906 }
3907 catch ( ... )
3908 {
3909 err = "Systemcatalog error for tableoid " + tableOid;
3910 rc = 1;
3911 return rc;
3912 }
3913
3914 dictOids = systemCatalogPtr->dictOIDs(aTableName);
3915
3916 for (unsigned i = 0; i < dictOids.size(); i++)
3917 {
3918 oids[dictOids[i].dictOID] = dictOids[i].dictOID;
3919 }
3920
3921 //if (dictOids.size() > 0)
3922 // colType = systemCatalogPtr->colTypeDct(dictOids[0].dictOID);
3923 }
3924
3925
3926 fWEWrapper.setIsInsert(false);
3927 fWEWrapper.setBulkFlag(false);
3928 vector<LBID_t> lbidList;
3929
3930 if (idbdatafile::IDBPolicy::useHdfs())
3931 {
3932 //save the extent info to mark them invalid, after flush, the meta file will be gone.
3933 std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t>::iterator mapIter;
3934 std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t> m_txnLBIDMap = fWEWrapper.getTxnMap();
3935
3936 try
3937 {
3938 mapIter = m_txnLBIDMap.find(txnId);
3939
3940 if (mapIter != m_txnLBIDMap.end())
3941 {
3942 SP_TxnLBIDRec_t spTxnLBIDRec = (*mapIter).second;
3943 std::tr1::unordered_map<BRM::LBID_t, uint32_t> ::iterator listIter = spTxnLBIDRec->m_LBIDMap.begin();
3944
3945 while (listIter != spTxnLBIDRec->m_LBIDMap.end())
3946 {
3947 lbidList.push_back(listIter->first);
3948 listIter++;
3949 }
3950 }
3951 }
3952 catch (...) {}
3953 }
3954
3955 error = fWEWrapper.flushDataFiles(flushCode, txnId, oids);
3956
3957 //No need to close files, flushDataFile will close them.
3958 //if (((colType.compressionType > 0 ) && (dictOids.size() > 0)) || (idbdatafile::IDBPolicy::useHdfs()))
3959 // fWEWrapper.closeDctnry(txnId, colType.compressionType, true);
3960 if (error != NO_ERROR)
3961 {
3962 rc = error;
3963 WErrorCodes ec;
3964 err = ec.errorString(error);
3965 }
3966
3967 //erase rowgroup from the rowGroup map
3968 if (rowGroups[txnId])
3969 {
3970 delete rowGroups[txnId];
3971 rowGroups[txnId] = 0;
3972 }
3973
3974 TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);
3975 ColsExtsInfoMap colsExtsInfoMap = aTbaleMetaData->getColsExtsInfoMap();
3976 ColsExtsInfoMap::iterator it = colsExtsInfoMap.begin();
3977 ColExtsInfo::iterator aIt;
3978 std::vector<BRM::FileInfo> files;
3979 std::vector<BRM::OID_t> oidsToFlush;
3980 BRM::FileInfo aFile;
3981
3982 while (it != colsExtsInfoMap.end())
3983 {
3984 aIt = (it->second).begin();
3985 aFile.oid = it->first;
3986 oidsToFlush.push_back(aFile.oid);
3987
3988 while (aIt != (it->second).end())
3989 {
3990 aFile.partitionNum = aIt->partNum;
3991 aFile.dbRoot = aIt->dbRoot;
3992 aFile.segmentNum = aIt->segNum;
3993 aFile.compType = aIt->compType;
3994 files.push_back(aFile);
3995 //cout <<"Added to files oid:dbroot:part:seg:compType = " << aFile.oid<<":"<<aFile.dbRoot<<":"<<aFile.partitionNum<<":"<<aFile.segmentNum
3996 //<<":"<<aFile.compType <<endl;
3997 aIt++;
3998 }
3999
4000 it++;
4001 }
4002
4003 if (idbdatafile::IDBPolicy::useHdfs())
4004 {
4005 rc = fWEWrapper.confirmTransaction(txnId);
4006 //@Bug 5700. Purge FD cache after file swap
4007 cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());
4008 cacheutils::flushOIDsFromCache(oidsToFlush);
4009 fDbrm.invalidateUncommittedExtentLBIDs(0, &lbidList);
4010 }
4011
4012 //cout << "Purged files.size:moduleId = " << files.size() << ":"<<Config::getLocalModuleID() << endl;
4013 //if (idbdatafile::IDBPolicy::useHdfs())
4014 // cacheutils::dropPrimProcFdCache();
4015 TableMetaData::removeTableMetaData(tableOid);
4016
4017 // MCOL-1495 Remove fCatalogMap entries CS won't use anymore.
4018 CalpontSystemCatalog::removeCalpontSystemCatalog(txnId);
4019 CalpontSystemCatalog::removeCalpontSystemCatalog(txnId | 0x80000000);
4020 return rc;
4021 }
4022
processDelete(messageqcpp::ByteStream & bs,std::string & err,ByteStream::quadbyte & PMId,uint64_t & blocksChanged)4023 uint8_t WE_DMLCommandProc::processDelete(messageqcpp::ByteStream& bs,
4024 std::string& err,
4025 ByteStream::quadbyte& PMId,
4026 uint64_t& blocksChanged)
4027 {
4028 uint8_t rc = 0;
4029 //cout << " In processDelete" << endl;
4030 uint32_t tmp32, sessionID;
4031 TxnID txnId;
4032 bs >> PMId;
4033 bs >> sessionID;
4034 bs >> tmp32;
4035 txnId = tmp32;
4036 string schema, tableName;
4037 bs >> schema;
4038 bs >> tableName;
4039 fWEWrapper.setIsInsert(false);
4040 fWEWrapper.setBulkFlag(false);
4041 fWEWrapper.setTransId(txnId);
4042
4043 if (!rowGroups[txnId]) //meta data
4044 {
4045 rowGroups[txnId] = new rowgroup::RowGroup();
4046 rowGroups[txnId]->deserialize(bs);
4047 //If hdfs, call chunkmanager to set up
4048
4049 rc = fWEWrapper.startTransaction(txnId);
4050
4051 if (rc != NO_ERROR)
4052 {
4053 WErrorCodes ec;
4054 err = ec.errorString(rc);
4055 }
4056
4057 return rc;
4058 }
4059
4060 rowgroup::RGData rgData;
4061 rgData.deserialize(bs);
4062 rowGroups[txnId]->setData(&rgData);
4063 //rowGroups[txnId]->setData(const_cast<uint8_t*>(bs.buf()));
4064 //get row ids
4065 rowgroup::Row row;
4066 rowGroups[txnId]->initRow(&row);
4067 WriteEngine::RIDList rowIDList;
4068 CalpontSystemCatalog::RID rid;
4069 uint32_t rowsThisRowgroup = rowGroups[txnId]->getRowCount();
4070 uint16_t dbRoot, segment, blockNum;
4071 uint32_t partition;
4072 uint8_t extentNum;
4073 CalpontSystemCatalog::TableName aTableName;
4074 aTableName.schema = schema;
4075 aTableName.table = tableName;
4076 boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
4077 CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
4078 CalpontSystemCatalog::ROPair roPair;
4079
4080 CalpontSystemCatalog::RIDList tableRidList;
4081
4082 try
4083 {
4084 roPair = systemCatalogPtr->tableRID( aTableName);
4085 tableRidList = systemCatalogPtr->columnRIDs(aTableName, true);
4086 }
4087 catch (exception& ex)
4088 {
4089 err = ex.what();
4090 rc = 1;
4091 return rc;
4092
4093 }
4094
4095 // querystats
4096 uint64_t relativeRID = 0;
4097 boost::scoped_array<int> preBlkNums(new int[row.getColumnCount()]);
4098 boost::scoped_array<uint32_t> colWidth(new uint32_t[row.getColumnCount()]);
4099
4100 // initialize
4101 for (uint32_t j = 0; j < row.getColumnCount(); j++)
4102 {
4103 preBlkNums[j] = -1;
4104 colWidth[j] = (row.getColumnWidth(j) >= 8 ? 8 : row.getColumnWidth(j));
4105 }
4106
4107 //Get the file information from rowgroup
4108 dbRoot = rowGroups[txnId]->getDBRoot();
4109 rowGroups[txnId]->getLocation(&partition, &segment, &extentNum, &blockNum);
4110 WriteEngine::ColStructList colStructList;
4111 WriteEngine::ColStruct colStruct;
4112 colStruct.fColPartition = partition;
4113 colStruct.fColSegment = segment;
4114 colStruct.fColDbRoot = dbRoot;
4115
4116 for (unsigned i = 0; i < rowsThisRowgroup; i++)
4117 {
4118 rowGroups[txnId]->getRow(i, &row);
4119 rid = row.getRid();
4120 relativeRID = rid - rowGroups[txnId]->getBaseRid();
4121 rid = relativeRID;
4122 convertToRelativeRid (rid, extentNum, blockNum);
4123 rowIDList.push_back(rid);
4124
4125 // populate stats.blocksChanged
4126 for (uint32_t j = 0; j < row.getColumnCount(); j++)
4127 {
4128 if ((int)(relativeRID / (BYTE_PER_BLOCK / colWidth[j])) > preBlkNums[j])
4129 {
4130 blocksChanged++;
4131 preBlkNums[j] = relativeRID / (BYTE_PER_BLOCK / colWidth[j]);
4132 }
4133 }
4134 }
4135
4136 try
4137 {
4138 for (unsigned i = 0; i < tableRidList.size(); i++)
4139 {
4140 CalpontSystemCatalog::ColType colType;
4141 colType = systemCatalogPtr->colType( tableRidList[i].objnum );
4142 colStruct.dataOid = tableRidList[i].objnum;
4143 colStruct.tokenFlag = false;
4144 colStruct.fCompressionType = colType.compressionType;
4145
4146 if (colType.colWidth > 8) //token
4147 {
4148 colStruct.colWidth = 8;
4149 colStruct.tokenFlag = true;
4150 }
4151 else
4152 {
4153 colStruct.colWidth = colType.colWidth;
4154 }
4155
4156 colStruct.colDataType = colType.colDataType;
4157
4158 colStructList.push_back( colStruct );
4159 }
4160 }
4161 catch (exception& ex)
4162 {
4163 err = ex.what();
4164 rc = 1;
4165 return rc;
4166 }
4167
4168 std::vector<ColStructList> colExtentsStruct;
4169 std::vector<void*> colOldValueList;
4170 std::vector<RIDList> ridLists;
4171 colExtentsStruct.push_back(colStructList);
4172 ridLists.push_back(rowIDList);
4173 int error = 0;
4174
4175 error = fWEWrapper.deleteRow( txnId, colExtentsStruct, colOldValueList, ridLists, roPair.objnum );
4176
4177 if (error != NO_ERROR)
4178 {
4179 rc = error;
4180 //cout << "WE Error code " << error << endl;
4181 WErrorCodes ec;
4182 err = ec.errorString(error);
4183
4184 if (error == ERR_BRM_DEAD_LOCK)
4185 {
4186 rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR;
4187 }
4188 else if (error == ERR_BRM_VB_OVERFLOW)
4189 {
4190 rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR;
4191 err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW);
4192 }
4193 }
4194
4195 return rc;
4196 }
4197
processRemoveMeta(messageqcpp::ByteStream & bs,std::string & err)4198 uint8_t WE_DMLCommandProc::processRemoveMeta(messageqcpp::ByteStream& bs, std::string& err)
4199 {
4200 uint8_t rc = 0;
4201 uint32_t tableOID;
4202
4203 try
4204 {
4205 bs >> tableOID;
4206 //std::cout << ": tableOID-" << tableOID << std::endl;
4207
4208 BulkRollbackMgr::deleteMetaFile( tableOID );
4209 }
4210 catch (exception& ex)
4211 {
4212 err = ex.what();
4213 rc = 1;
4214 }
4215
4216 return rc;
4217 }
4218
4219 //------------------------------------------------------------------------------
4220 // Process bulk rollback command
4221 //------------------------------------------------------------------------------
processBulkRollback(messageqcpp::ByteStream & bs,std::string & err)4222 uint8_t WE_DMLCommandProc::processBulkRollback(messageqcpp::ByteStream& bs,
4223 std::string& err)
4224 {
4225 uint8_t rc = 0;
4226 err.clear();
4227
4228 try
4229 {
4230 uint32_t tableOID;
4231 uint64_t tableLockID;
4232 std::string tableName;
4233 std::string appName;
4234
4235 // May want to eventually comment out this logging to stdout,
4236 // but it shouldn't hurt to keep in here.
4237 std::cout << "processBulkRollback";
4238 bs >> tableLockID;
4239 //std::cout << ": tableLock-"<< tableLockID;
4240
4241 bs >> tableOID;
4242 //std::cout << "; tableOID-" << tableOID;
4243
4244 bs >> tableName;
4245
4246 if (tableName.length() == 0)
4247 {
4248 boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
4249 CalpontSystemCatalog::makeCalpontSystemCatalog(0);
4250 CalpontSystemCatalog::TableName aTableName = systemCatalogPtr->tableName(tableOID);
4251 tableName = aTableName.toString();
4252
4253 }
4254
4255 std::cout << "; table-" << tableName;
4256
4257 bs >> appName;
4258 std::cout << "; app-" << appName << std::endl;
4259
4260 int we_rc = fWEWrapper.bulkRollback(
4261 tableOID,
4262 tableLockID,
4263 tableName,
4264 appName,
4265 false, // no extra debug logging to the console
4266 err );
4267
4268 if (we_rc != NO_ERROR)
4269 rc = 2;
4270 }
4271 catch (exception& ex)
4272 {
4273 std::cout << "processBulkRollback: exception-" << ex.what() << std::endl;
4274 err = ex.what();
4275 rc = 1;
4276 }
4277
4278 return rc;
4279 }
4280
4281 //------------------------------------------------------------------------------
4282 // Process bulk rollback cleanup command (deletes bulk rollback meta data files)
4283 //------------------------------------------------------------------------------
processBulkRollbackCleanup(messageqcpp::ByteStream & bs,std::string & err)4284 uint8_t WE_DMLCommandProc::processBulkRollbackCleanup(
4285 messageqcpp::ByteStream& bs,
4286 std::string& err)
4287 {
4288 uint8_t rc = 0;
4289 err.clear();
4290
4291 try
4292 {
4293 uint32_t tableOID;
4294
4295 // May want to eventually comment out this logging to stdout,
4296 // but it shouldn't hurt to keep in here.
4297 std::cout << "processBulkRollbackCleanup";
4298 bs >> tableOID;
4299 std::cout << ": tableOID-" << tableOID << std::endl;
4300
4301 BulkRollbackMgr::deleteMetaFile( tableOID );
4302 }
4303 catch (exception& ex)
4304 {
4305 std::cout << "processBulkRollbackCleanup: exception-" << ex.what() <<
4306 std::endl;
4307 err = ex.what();
4308 rc = 1;
4309 }
4310
4311 return rc;
4312 }
4313
updateSyscolumnNextval(ByteStream & bs,std::string & err)4314 uint8_t WE_DMLCommandProc::updateSyscolumnNextval(ByteStream& bs, std::string& err)
4315 {
4316 uint32_t columnOid, sessionID;
4317 uint64_t nextVal;
4318 int rc = 0;
4319 bs >> columnOid;
4320 bs >> nextVal;
4321 bs >> sessionID;
4322 uint16_t dbRoot;
4323 std::map<uint32_t, uint32_t> oids;
4324 //std::vector<BRM::OID_t> oidsToFlush;
4325 oids[columnOid] = columnOid;
4326 //oidsToFlush.push_back(columnOid);
4327 BRM::OID_t oid = 1021;
4328 fDbrm.getSysCatDBRoot(oid, dbRoot);
4329 fWEWrapper.setTransId(sessionID);
4330 fWEWrapper.setBulkFlag(false);
4331 fWEWrapper.startTransaction(sessionID);
4332 //cout << "updateSyscolumnNextval startTransaction id " << sessionID << endl;
4333 rc = fWEWrapper.updateNextValue(sessionID, columnOid, nextVal, sessionID, dbRoot);
4334
4335 if (rc != 0)
4336 {
4337 err = "Error in WE::updateNextValue";
4338 rc = 1;
4339 }
4340
4341 if (idbdatafile::IDBPolicy::useHdfs())
4342 {
4343 cout << "updateSyscolumnNextval flushDataFiles " << endl;
4344 int rc1 = fWEWrapper.flushDataFiles(rc, sessionID, oids);
4345
4346 if ((rc == 0) && ( rc1 == 0))
4347 {
4348 cout << "updateSyscolumnNextval confirmTransaction rc =0 " << endl;
4349 rc1 = fWEWrapper.confirmTransaction(sessionID);
4350 cout << "updateSyscolumnNextval confirmTransaction return code is " << rc1 << endl;
4351
4352 if ( rc1 == NO_ERROR)
4353 rc1 = fWEWrapper.endTransaction(sessionID, true);
4354 else
4355 fWEWrapper.endTransaction(sessionID, false);
4356 }
4357 else
4358 {
4359 cout << "updateSyscolumnNextval endTransaction with error " << endl;
4360 fWEWrapper.endTransaction(sessionID, false);
4361
4362 }
4363
4364 if ( rc == NO_ERROR)
4365 rc = rc1;
4366 }
4367
4368 //if (idbdatafile::IDBPolicy::useHdfs())
4369 // cacheutils::flushOIDsFromCache(oidsToFlush);
4370 return rc;
4371 }
4372
processPurgeFDCache(ByteStream & bs,std::string & err)4373 uint8_t WE_DMLCommandProc::processPurgeFDCache(ByteStream& bs, std::string& err)
4374 {
4375 int rc = 0;
4376 uint32_t tableOid;
4377 bs >> tableOid;
4378 TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);
4379 ColsExtsInfoMap colsExtsInfoMap = aTbaleMetaData->getColsExtsInfoMap();
4380 ColsExtsInfoMap::iterator it = colsExtsInfoMap.begin();
4381 ColExtsInfo::iterator aIt;
4382 std::vector<BRM::FileInfo> files;
4383 BRM::FileInfo aFile;
4384
4385 while (it != colsExtsInfoMap.end())
4386 {
4387 aIt = (it->second).begin();
4388 aFile.oid = it->first;
4389
4390 while (aIt != (it->second).end())
4391 {
4392 aFile.partitionNum = aIt->partNum;
4393 aFile.dbRoot = aIt->dbRoot;
4394 aFile.segmentNum = aIt->segNum;
4395 aFile.compType = aIt->compType;
4396 files.push_back(aFile);
4397 //cout <<"Added to files oid:dbroot:part:seg:compType = " << aFile.oid<<":"<<aFile.dbRoot<<":"<<aFile.partitionNum<<":"<<aFile.segmentNum
4398 //<<":"<<aFile.compType <<endl;
4399 aIt++;
4400 }
4401
4402 it++;
4403 }
4404
4405 if ((idbdatafile::IDBPolicy::useHdfs()) && (files.size() > 0) )
4406 cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());
4407
4408 TableMetaData::removeTableMetaData(tableOid);
4409 return rc;
4410 }
4411
processFixRows(messageqcpp::ByteStream & bs,std::string & err,ByteStream::quadbyte & PMId)4412 uint8_t WE_DMLCommandProc::processFixRows(messageqcpp::ByteStream& bs,
4413 std::string& err,
4414 ByteStream::quadbyte& PMId)
4415 {
4416 uint8_t rc = 0;
4417 //cout << " In processFixRows" << endl;
4418 uint32_t tmp32;
4419 uint64_t sessionID;
4420 uint16_t dbRoot, segment;
4421 uint32_t partition;
4422 string schema, tableName;
4423 TxnID txnId;
4424 uint8_t tmp8;
4425 bool firstBatch = false;
4426 WriteEngine::RIDList rowIDList;
4427 bs >> PMId;
4428 bs >> tmp8;
4429 firstBatch = (tmp8 != 0);
4430 bs >> sessionID;
4431 bs >> tmp32;
4432 txnId = tmp32;
4433
4434 bs >> schema;
4435 bs >> tableName;
4436 bs >> dbRoot;
4437 bs >> partition;
4438 bs >> segment;
4439
4440 deserializeInlineVector(bs, rowIDList);
4441
4442 //Need to identify whether this is the first batch to start transaction.
4443 if (firstBatch)
4444 {
4445 rc = fWEWrapper.startTransaction(txnId);
4446
4447 if (rc != NO_ERROR)
4448 {
4449 WErrorCodes ec;
4450 err = ec.errorString(rc);
4451 return rc;
4452 }
4453
4454 fWEWrapper.setIsInsert(false);
4455 fWEWrapper.setBulkFlag(false);
4456 fWEWrapper.setTransId(txnId);
4457 fWEWrapper.setFixFlag(true);
4458 }
4459
4460
4461 CalpontSystemCatalog::TableName aTableName;
4462 aTableName.schema = schema;
4463 aTableName.table = tableName;
4464 boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
4465 CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
4466
4467 CalpontSystemCatalog::ROPair roPair;
4468
4469 CalpontSystemCatalog::RIDList tableRidList;
4470
4471 try
4472 {
4473 roPair = systemCatalogPtr->tableRID( aTableName);
4474 tableRidList = systemCatalogPtr->columnRIDs(aTableName, true);
4475 }
4476 catch (exception& ex)
4477 {
4478 err = ex.what();
4479 rc = 1;
4480 return rc;
4481
4482 }
4483
4484 WriteEngine::ColStructList colStructList;
4485 WriteEngine::ColStruct colStruct;
4486 WriteEngine::DctnryStructList dctnryStructList;
4487 /* colStruct.fColPartition = partition;
4488 colStruct.fColSegment = segment;
4489 colStruct.fColDbRoot = dbRoot; */
4490 colStruct.fColPartition = 0;
4491 colStruct.fColSegment = 0;
4492 colStruct.fColDbRoot = 3;
4493 //should we always scan dictionary store files?
4494
4495 try
4496 {
4497 for (unsigned i = 0; i < tableRidList.size(); i++)
4498 {
4499 CalpontSystemCatalog::ColType colType;
4500 colType = systemCatalogPtr->colType( tableRidList[i].objnum );
4501 colStruct.dataOid = tableRidList[i].objnum;
4502 colStruct.tokenFlag = false;
4503 colStruct.fCompressionType = colType.compressionType;
4504 WriteEngine::DctnryStruct dctnryStruct;
4505 dctnryStruct.fColDbRoot = colStruct.fColDbRoot;
4506 dctnryStruct.fColPartition = colStruct.fColPartition;
4507 dctnryStruct.fColSegment = colStruct.fColSegment;
4508 dctnryStruct.fCompressionType = colStruct.fCompressionType;
4509 dctnryStruct.dctnryOid = 0;
4510
4511 if (colType.colWidth > 8) //token
4512 {
4513 colStruct.colWidth = 8;
4514 colStruct.tokenFlag = true;
4515 dctnryStruct.dctnryOid = colType.ddn.dictOID;
4516 }
4517 else
4518 {
4519 colStruct.colWidth = colType.colWidth;
4520 }
4521
4522 colStruct.colDataType = colType.colDataType;
4523
4524 colStructList.push_back( colStruct );
4525 dctnryStructList.push_back(dctnryStruct);
4526 }
4527 }
4528 catch (exception& ex)
4529 {
4530 err = ex.what();
4531 rc = 1;
4532 return rc;
4533
4534 }
4535
4536 int error = 0;
4537
4538 try
4539 {
4540 error = fWEWrapper.deleteBadRows( txnId, colStructList, rowIDList, dctnryStructList);
4541
4542 if (error != NO_ERROR)
4543 {
4544 rc = error;
4545 //cout << "WE Error code " << error << endl;
4546 WErrorCodes ec;
4547 err = ec.errorString(error);
4548
4549 if (error == ERR_BRM_DEAD_LOCK)
4550 {
4551 rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR;
4552 }
4553 else if (error == ERR_BRM_VB_OVERFLOW)
4554 {
4555 rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR;
4556 err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW);
4557 }
4558 }
4559 }
4560 catch (std::exception& ex)
4561 {
4562 err = ex.what();
4563 rc = 1;
4564 }
4565
4566 //cout << "WES return rc " << (int)rc << " with msg " << err << endl;
4567 return rc;
4568 }
4569
processEndTransaction(ByteStream & bs,std::string & err)4570 uint8_t WE_DMLCommandProc::processEndTransaction(ByteStream& bs, std::string& err)
4571 {
4572 int rc = 0;
4573 ByteStream::byte tmp8;
4574 bool success;
4575 uint32_t txnid;
4576 bs >> txnid;
4577 bs >> tmp8;
4578 success = (tmp8 != 0);
4579 rc = fWEWrapper.endTransaction(txnid, success);
4580
4581 if (rc != NO_ERROR)
4582 {
4583 WErrorCodes ec;
4584 err = ec.errorString(rc);
4585 }
4586
4587 return rc;
4588
4589 }
4590 //------------------------------------------------------------------------------
4591 // Validates the correctness of the current HWMs for this table.
4592 // The HWMs for all the 1 byte columns should be identical. Same goes
4593 // for all the 2 byte columns, etc. The 2 byte column HWMs should be
4594 // "roughly" (but not necessarily exactly) twice that of a 1 byte column.
4595 // Same goes for the 4 byte column HWMs vs their 2 byte counterparts, etc.
4596 // ridList - columns oids to be used to get column width on to use with validation.
4597 // segFileInfo - Vector of File objects carrying current DBRoot, partition,
4598 // HWM, etc to be validated for the columns belonging to jobTable.
4599 // stage - Current stage we are validating. "Starting" or "Ending".
4600 //------------------------------------------------------------------------------
validateColumnHWMs(CalpontSystemCatalog::RIDList & ridList,boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr,const std::vector<DBRootExtentInfo> & segFileInfo,const char * stage)4601 int WE_DMLCommandProc::validateColumnHWMs(
4602 CalpontSystemCatalog::RIDList& ridList,
4603 boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr,
4604 const std::vector<DBRootExtentInfo>& segFileInfo,
4605 const char* stage )
4606 {
4607 int rc = NO_ERROR;
4608
4609 if ((!fIsFirstBatchPm) && (strcmp(stage, "Starting") == 0))
4610 return rc;
4611
4612 // Used to track first 1-byte, 2-byte, 4-byte, and 8-byte columns in table
4613 int byte1First = -1;
4614 int byte2First = -1;
4615 int byte4First = -1;
4616 int byte8First = -1;
4617
4618 // Make sure the HWMs for all 1-byte columns match; same for all 2-byte,
4619 // 4-byte, and 8-byte columns as well.
4620 CalpontSystemCatalog::ColType colType;
4621 Convertor convertor;
4622
4623 for (unsigned k = 0; k < segFileInfo.size(); k++)
4624 {
4625 int k1 = 0;
4626
4627 // Find out column width
4628 colType = systemCatalogPtr->colType(ridList[k].objnum);
4629 colType.colWidth = convertor.getCorrectRowWidth(colType.colDataType, colType.colWidth);
4630
4631 // Find the first 1-byte, 2-byte, 4-byte, and 8-byte columns.
4632 // Use those as our reference HWM for the respective column widths.
4633 switch ( colType.colWidth )
4634 {
4635 case 1:
4636 {
4637 if (byte1First == -1)
4638 byte1First = k;
4639
4640 k1 = byte1First;
4641 break;
4642 }
4643
4644 case 2:
4645 {
4646 if (byte2First == -1)
4647 byte2First = k;
4648
4649 k1 = byte2First;
4650 break;
4651 }
4652
4653 case 4:
4654 {
4655 if (byte4First == -1)
4656 byte4First = k;
4657
4658 k1 = byte4First;
4659 break;
4660 }
4661
4662 case 8:
4663 default:
4664 {
4665 if (byte8First == -1)
4666 byte8First = k;
4667
4668 k1 = byte8First;
4669 break;
4670 }
4671 } // end of switch based on column width (1,2,4, or 8)
4672
4673 //std::cout << "dbg: comparing0 " << stage << " refcol-" << k1 <<
4674 // "; wid-" << jobColK1.width << "; hwm-" << segFileInfo[k1].fLocalHwm <<
4675 // " <to> col-" << k <<
4676 // "; wid-" << jobColK.width << " ; hwm-"<<segFileInfo[k].fLocalHwm<<std::endl;
4677
4678 // Validate that the HWM for this column (k) matches that of the
4679 // corresponding reference column with the same width.
4680 if ((segFileInfo[k1].fDbRoot != segFileInfo[k].fDbRoot) ||
4681 (segFileInfo[k1].fPartition != segFileInfo[k].fPartition) ||
4682 (segFileInfo[k1].fSegment != segFileInfo[k].fSegment) ||
4683 (segFileInfo[k1].fLocalHwm != segFileInfo[k].fLocalHwm))
4684 {
4685 CalpontSystemCatalog::ColType colType2;
4686 colType2 = systemCatalogPtr->colType(ridList[k1].objnum);
4687 ostringstream oss;
4688 oss << stage << " HWMs do not match for"
4689 " OID1-" << ridList[k1].objnum <<
4690 "; DBRoot-" << segFileInfo[k1].fDbRoot <<
4691 "; partition-" << segFileInfo[k1].fPartition <<
4692 "; segment-" << segFileInfo[k1].fSegment <<
4693 "; hwm-" << segFileInfo[k1].fLocalHwm <<
4694 "; width-" << colType2.colWidth << ':' << std::endl <<
4695 " and OID2-" << ridList[k].objnum <<
4696 "; DBRoot-" << segFileInfo[k].fDbRoot <<
4697 "; partition-" << segFileInfo[k].fPartition <<
4698 "; segment-" << segFileInfo[k].fSegment <<
4699 "; hwm-" << segFileInfo[k].fLocalHwm <<
4700 "; width-" << colType.colWidth;
4701 fLog.logMsg( oss.str(), ERR_UNKNOWN, MSGLVL_ERROR );
4702 return ERR_BRM_HWMS_NOT_EQUAL;
4703 }
4704
4705 // HWM DBRoot, partition, and segment number should match for all
4706 // columns; so compare DBRoot, part#, and seg# with first column.
4707 if ((segFileInfo[0].fDbRoot != segFileInfo[k].fDbRoot) ||
4708 (segFileInfo[0].fPartition != segFileInfo[k].fPartition) ||
4709 (segFileInfo[0].fSegment != segFileInfo[k].fSegment))
4710 {
4711 CalpontSystemCatalog::ColType colType2;
4712 colType2 = systemCatalogPtr->colType(ridList[0].objnum);
4713 ostringstream oss;
4714 oss << stage << " HWM DBRoot,Part#, or Seg# do not match for"
4715 " OID1-" << ridList[0].objnum <<
4716 "; DBRoot-" << segFileInfo[0].fDbRoot <<
4717 "; partition-" << segFileInfo[0].fPartition <<
4718 "; segment-" << segFileInfo[0].fSegment <<
4719 "; hwm-" << segFileInfo[0].fLocalHwm <<
4720 "; width-" << colType2.colWidth << ':' << std::endl <<
4721 " and OID2-" << ridList[k].objnum <<
4722 "; DBRoot-" << segFileInfo[k].fDbRoot <<
4723 "; partition-" << segFileInfo[k].fPartition <<
4724 "; segment-" << segFileInfo[k].fSegment <<
4725 "; hwm-" << segFileInfo[k].fLocalHwm <<
4726 "; width-" << colType.colWidth;
4727 fLog.logMsg( oss.str(), ERR_UNKNOWN, MSGLVL_ERROR );
4728 return ERR_BRM_HWMS_NOT_EQUAL;
4729 }
4730 } // end of loop to compare all 1-byte HWMs, 2-byte HWMs, etc.
4731
4732 // Validate/compare HWM for 1-byte column in relation to 2-byte column, etc.
4733 // Without knowing the exact row count, we can't extrapolate the exact HWM
4734 // for the wider column, but we can narrow it down to an expected range.
4735 int refCol = 0;
4736 int colIdx = 0;
4737
4738 // Validate/compare HWMs given a 1-byte column as a starting point
4739 if (byte1First >= 0)
4740 {
4741 refCol = byte1First;
4742
4743 if (byte2First >= 0)
4744 {
4745 HWM hwmLo = segFileInfo[byte1First].fLocalHwm * 2;
4746 HWM hwmHi = hwmLo + 1;
4747
4748 if ((segFileInfo[byte2First].fLocalHwm < hwmLo) ||
4749 (segFileInfo[byte2First].fLocalHwm > hwmHi))
4750 {
4751 colIdx = byte2First;
4752 rc = ERR_BRM_HWMS_OUT_OF_SYNC;
4753 goto errorCheck;
4754 }
4755 }
4756
4757 if (byte4First >= 0)
4758 {
4759 HWM hwmLo = segFileInfo[byte1First].fLocalHwm * 4;
4760 HWM hwmHi = hwmLo + 3;
4761
4762 if ((segFileInfo[byte4First].fLocalHwm < hwmLo) ||
4763 (segFileInfo[byte4First].fLocalHwm > hwmHi))
4764 {
4765 colIdx = byte4First;
4766 rc = ERR_BRM_HWMS_OUT_OF_SYNC;
4767 goto errorCheck;
4768 }
4769 }
4770
4771 if (byte8First >= 0)
4772 {
4773 HWM hwmLo = segFileInfo[byte1First].fLocalHwm * 8;
4774 HWM hwmHi = hwmLo + 7;
4775
4776 if ((segFileInfo[byte8First].fLocalHwm < hwmLo) ||
4777 (segFileInfo[byte8First].fLocalHwm > hwmHi))
4778 {
4779 colIdx = byte8First;
4780 rc = ERR_BRM_HWMS_OUT_OF_SYNC;
4781 goto errorCheck;
4782 }
4783 }
4784 }
4785
4786 // Validate/compare HWMs given a 2-byte column as a starting point
4787 if (byte2First >= 0)
4788 {
4789 refCol = byte2First;
4790
4791 if (byte4First >= 0)
4792 {
4793 HWM hwmLo = segFileInfo[byte2First].fLocalHwm * 2;
4794 HWM hwmHi = hwmLo + 1;
4795
4796 if ((segFileInfo[byte4First].fLocalHwm < hwmLo) ||
4797 (segFileInfo[byte4First].fLocalHwm > hwmHi))
4798 {
4799 colIdx = byte4First;
4800 rc = ERR_BRM_HWMS_OUT_OF_SYNC;
4801 goto errorCheck;
4802 }
4803 }
4804
4805 if (byte8First >= 0)
4806 {
4807 HWM hwmLo = segFileInfo[byte2First].fLocalHwm * 4;
4808 HWM hwmHi = hwmLo + 3;
4809
4810 if ((segFileInfo[byte8First].fLocalHwm < hwmLo) ||
4811 (segFileInfo[byte8First].fLocalHwm > hwmHi))
4812 {
4813 colIdx = byte8First;
4814 rc = ERR_BRM_HWMS_OUT_OF_SYNC;
4815 goto errorCheck;
4816 }
4817 }
4818 }
4819
4820 // Validate/compare HWMs given a 4-byte column as a starting point
4821 if (byte4First >= 0)
4822 {
4823 refCol = byte4First;
4824
4825 if (byte8First >= 0)
4826 {
4827 HWM hwmLo = segFileInfo[byte4First].fLocalHwm * 2;
4828 HWM hwmHi = hwmLo + 1;
4829
4830 if ((segFileInfo[byte8First].fLocalHwm < hwmLo) ||
4831 (segFileInfo[byte8First].fLocalHwm > hwmHi))
4832 {
4833 colIdx = byte8First;
4834 rc = ERR_BRM_HWMS_OUT_OF_SYNC;
4835 goto errorCheck;
4836 }
4837 }
4838 }
4839
4840 // To avoid repeating this message 6 times in the preceding source code, we
4841 // use the "dreaded" goto to branch to this single place for error handling.
4842 errorCheck:
4843
4844 if (rc != NO_ERROR)
4845 {
4846 CalpontSystemCatalog::ColType colType1, colType2;
4847 colType1 = systemCatalogPtr->colType(ridList[refCol].objnum);
4848 colType1.colWidth = convertor.getCorrectRowWidth(
4849 colType1.colDataType, colType1.colWidth);
4850
4851 colType2 = systemCatalogPtr->colType(ridList[colIdx].objnum);
4852 colType2.colWidth = convertor.getCorrectRowWidth(
4853 colType2.colDataType, colType2.colWidth);
4854
4855 ostringstream oss;
4856 oss << stage << " HWMs are not in sync for"
4857 " OID1-" << ridList[refCol].objnum <<
4858 "; DBRoot-" << segFileInfo[refCol].fDbRoot <<
4859 "; partition-" << segFileInfo[refCol].fPartition <<
4860 "; segment-" << segFileInfo[refCol].fSegment <<
4861 "; hwm-" << segFileInfo[refCol].fLocalHwm <<
4862 "; width-" << colType1.colWidth << ':' << std::endl <<
4863 " and OID2-" << ridList[colIdx].objnum <<
4864 "; DBRoot-" << segFileInfo[colIdx].fDbRoot <<
4865 "; partition-" << segFileInfo[colIdx].fPartition <<
4866 "; segment-" << segFileInfo[colIdx].fSegment <<
4867 "; hwm-" << segFileInfo[colIdx].fLocalHwm <<
4868 "; width-" << colType2.colWidth;
4869 fLog.logMsg( oss.str(), ERR_UNKNOWN, MSGLVL_ERROR );
4870 }
4871
4872 return rc;
4873 }
4874 }
4875