1 /* Copyright (C) 2014 InfiniDB, Inc.
2    Copyright (C) 2016 MariaDB Corporation
3 
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License
6    as published by the Free Software Foundation; version 2 of
7    the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17    MA 02110-1301, USA. */
18 
19 // $Id: we_dmlcommandproc.cpp 3082 2011-09-26 22:00:38Z chao $
20 
21 #include <unistd.h>
22 using namespace std;
23 #include "bytestream.h"
24 using namespace messageqcpp;
25 
26 #include "we_messages.h"
27 #include "we_message_handlers.h"
28 #include "../../dbcon/dmlpackage/dmlpkg.h"
29 #include "we_dmlcommandproc.h"
30 using namespace dmlpackage;
31 #include "dmlpackageprocessor.h"
32 using namespace dmlpackageprocessor;
33 #include "dataconvert.h"
34 using namespace dataconvert;
35 #include "calpontsystemcatalog.h"
36 #include "sessionmanager.h"
37 using namespace execplan;
38 #include "messagelog.h"
39 #include "stopwatch.h"
40 #include "idberrorinfo.h"
41 #include "errorids.h"
42 using namespace logging;
43 
44 #include "brmtypes.h"
45 using namespace BRM;
46 #include "we_tablemetadata.h"
47 #include "we_dbrootextenttracker.h"
48 #include "we_bulkrollbackmgr.h"
49 #include "we_define.h"
50 #include "we_confirmhdfsdbfile.h"
51 #include "cacheutils.h"
52 #include "IDBDataFile.h"
53 #include "IDBPolicy.h"
54 
55 #include "checks.h"
56 
57 namespace WriteEngine
58 {
59 //StopWatch timer;
WE_DMLCommandProc()60 WE_DMLCommandProc::WE_DMLCommandProc()
61 {
62     fIsFirstBatchPm = true;
63     filesPerColumnPartition = 8;
64     extentsPerSegmentFile = 1;
65     dbrootCnt = 1;
66     extentRows = 0x800000;
67     config::Config* cf = config::Config::makeConfig();
68     string fpc = cf->getConfig("ExtentMap", "FilesPerColumnPartition");
69 
70     if (fpc.length() != 0)
71         filesPerColumnPartition = cf->uFromText(fpc);
72 
73     string epsf = cf->getConfig("ExtentMap", "ExtentsPerSegmentFile");
74 
75     if (epsf.length() != 0)
76         extentsPerSegmentFile = cf->uFromText(epsf);
77 
78     string dbct = cf->getConfig("SystemConfig", "DBRootCount");
79 
80     if (dbct.length() != 0)
81         dbrootCnt = cf->uFromText(dbct);
82 }
83 
// Copy constructor.
//
// Copies the first-batch flag and the extent-layout settings from rhs, and
// gives the new object its own "BatchInsert" rollback meta writer rather
// than sharing the one held by rhs.
//
// The layout settings are copied explicitly: the default constructor is not
// run for a copy, so leaving them out would leave them uninitialized in the
// new object.
WE_DMLCommandProc::WE_DMLCommandProc(const WE_DMLCommandProc& rhs)
{
    fIsFirstBatchPm = rhs.fIsFirstBatchPm;
    filesPerColumnPartition = rhs.filesPerColumnPartition;
    extentsPerSegmentFile = rhs.extentsPerSegmentFile;
    dbrootCnt = rhs.dbrootCnt;
    extentRows = rhs.extentRows;
    fRBMetaWriter.reset(new RBMetaWriter("BatchInsert", NULL));
}
89 
~WE_DMLCommandProc()90 WE_DMLCommandProc::~WE_DMLCommandProc()
91 {
92     dbRootExtTrackerVec.clear();
93 }
94 
processSingleInsert(messageqcpp::ByteStream & bs,std::string & err)95 uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std::string& err)
96 {
97     uint8_t rc = 0;
98     err.clear();
99     InsertDMLPackage insertPkg;
100     ByteStream::quadbyte tmp32;
101     bs >> tmp32;
102     BRM::TxnID txnid;
103     txnid.valid = true;
104     txnid.id  = tmp32;
105     bs >> tmp32;
106     uint32_t dbroot = tmp32;
107 
108     //cout << "processSingleInsert received bytestream length " << bs.length() << endl;
109 
110     messageqcpp::ByteStream::byte packageType;
111     bs >> packageType;
112     insertPkg.read( bs);
113     uint32_t sessionId = insertPkg.get_SessionID();
114     //cout << " processSingleInsert for session " << sessionId << endl;
115     DMLTable* tablePtr = insertPkg.get_Table();
116     RowList rows = tablePtr->get_RowList();
117 
118     WriteEngine::ColStructList colStructs;
119     WriteEngine::DctnryStructList dctnryStructList;
120     WriteEngine::DctnryValueList dctnryValueList;
121     WriteEngine::ColValueList colValuesList;
122     WriteEngine::DictStrList dicStringList ;
123     CalpontSystemCatalog::TableName tableName;
124     CalpontSystemCatalog::TableColName tableColName;
125     tableName.table = tableColName.table = tablePtr->get_TableName();
126     tableName.schema = tableColName.schema = tablePtr->get_SchemaName();
127 
128     boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionId);
129     systemCatalogPtr->identity(CalpontSystemCatalog::EC);
130     CalpontSystemCatalog::ROPair tableRoPair;
131     std::vector<string> colNames;
132     bool isWarningSet = false;
133 
134     try
135     {
136         tableRoPair = systemCatalogPtr->tableRID(tableName);
137 
138         if (rows.size())
139         {
140             Row* rowPtr = rows.at(0);
141             ColumnList columns = rowPtr->get_ColumnList();
142             ColumnList::const_iterator column_iterator = columns.begin();
143 
144             while (column_iterator != columns.end())
145             {
146                 DMLColumn* columnPtr = *column_iterator;
147                 tableColName.column = columnPtr->get_Name();
148                 CalpontSystemCatalog::ROPair roPair = systemCatalogPtr->columnRID(tableColName);
149 
150                 CalpontSystemCatalog::OID oid = systemCatalogPtr->lookupOID(tableColName);
151 
152                 CalpontSystemCatalog::ColType colType;
153                 colType = systemCatalogPtr->colType(oid);
154 
155                 WriteEngine::ColStruct colStruct;
156                 colStruct.fColDbRoot = dbroot;
157                 WriteEngine::DctnryStruct dctnryStruct;
158                 dctnryStruct.fColDbRoot = dbroot;
159                 colStruct.dataOid = roPair.objnum;
160                 colStruct.tokenFlag = false;
161                 colStruct.fCompressionType = colType.compressionType;
162 
163                 // Token
164                 if ( isDictCol(colType) )
165                 {
166                     colStruct.colWidth = 8;
167                     colStruct.tokenFlag = true;
168                 }
169                 else
170                 {
171                     colStruct.colWidth = colType.colWidth;
172                 }
173 
174                 colStruct.colDataType = colType.colDataType;
175 
176                 if (colStruct.tokenFlag)
177                 {
178                     dctnryStruct.dctnryOid = colType.ddn.dictOID;
179                     dctnryStruct.columnOid = colStruct.dataOid;
180                     dctnryStruct.fCompressionType = colType.compressionType;
181                     dctnryStruct.colWidth = colType.colWidth;
182                 }
183                 else
184                 {
185                     dctnryStruct.dctnryOid = 0;
186                     dctnryStruct.columnOid = colStruct.dataOid;
187                     dctnryStruct.fCompressionType = colType.compressionType;
188                     dctnryStruct.colWidth = colType.colWidth;
189                 }
190 
191                 colStructs.push_back(colStruct);
192                 dctnryStructList.push_back(dctnryStruct);
193 
194                 ++column_iterator;
195             }
196 
197             unsigned int numcols = rowPtr->get_NumberOfColumns();
198             std::string tmpStr("");
199 
200             for (unsigned int i = 0; i < numcols; i++)
201             {
202 
203                 WriteEngine::ColTupleList colTuples;
204                 WriteEngine::DctColTupleList dctColTuples;
205                 RowList::const_iterator row_iterator = rows.begin();
206 
207                 while (row_iterator != rows.end())
208                 {
209                     Row* rowPtr = *row_iterator;
210                     const DMLColumn* columnPtr = rowPtr->get_ColumnAt(i);
211 
212                     tableColName.column = columnPtr->get_Name();
213                     CalpontSystemCatalog::OID oid = systemCatalogPtr->lookupOID(tableColName);
214 
215                     CalpontSystemCatalog::ColType colType;
216                     colType = systemCatalogPtr->colType(oid);
217 
218                     boost::any datavalue;
219                     bool isNULL = false;
220                     bool pushWarning = false;
221                     std::vector<std::string> origVals;
222                     origVals = columnPtr->get_DataVector();
223                     WriteEngine::dictStr dicStrings;
224 
225                     // token
226                     if ( isDictCol(colType) )
227                     {
228                         for ( uint32_t i = 0; i < origVals.size(); i++ )
229                         {
230                             tmpStr = origVals[i];
231 
232                             isNULL = columnPtr->get_isnull();
233 
234                             if ( isNULL || ( tmpStr.length() == 0 ) )
235                                 isNULL = true;
236                             else
237                                 isNULL = false;
238 
239                             if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
240                             {
241                                 if (isNULL && colType.defaultValue.empty()) //error out
242                                 {
243                                     Message::Args args;
244                                     args.add(tableColName.column);
245                                     err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
246                                     rc = 1;
247                                     return rc;
248                                 }
249                                 else if (isNULL && !(colType.defaultValue.empty()))
250                                 {
251                                     tmpStr = colType.defaultValue;
252                                 }
253                             }
254 
255                             if ( tmpStr.length() > (unsigned int)colType.colWidth )
256                             {
257                                 tmpStr = tmpStr.substr(0, colType.colWidth);
258 
259                                 if ( !pushWarning )
260                                 {
261                                     pushWarning = true;
262                                     isWarningSet = true;
263 
264                                     if ((rc != NO_ERROR) && (rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING))
265                                         rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
266 
267                                     colNames.push_back(tableColName.column);
268                                 }
269                             }
270 
271                             WriteEngine::ColTuple colTuple;
272                             colTuple.data = datavalue;
273 
274                             colTuples.push_back(colTuple);
275                             //@Bug 2515. Only pass string values to write engine
276                             dicStrings.push_back( tmpStr );
277                         }
278 
279                         colValuesList.push_back(colTuples);
280                         //@Bug 2515. Only pass string values to write engine
281                         dicStringList.push_back( dicStrings );
282                     }
283                     else
284                     {
285                         string x;
286                         std::string indata;
287 
288                         for ( uint32_t i = 0; i < origVals.size(); i++ )
289                         {
290                             indata = origVals[i];
291 
292                             isNULL = columnPtr->get_isnull();
293 
294                             if ( isNULL || ( indata.length() == 0 ) )
295                                 isNULL = true;
296                             else
297                                 isNULL = false;
298 
299                             //check if autoincrement column and value is 0 or null
300                             uint64_t nextVal = 1;
301 
302                             if (colType.autoincrement)
303                             {
304                                 try
305                                 {
306                                     nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
307                                     fDbrm.startAISequence(oid, nextVal, colType.colWidth, colType.colDataType);
308                                 }
309                                 catch (std::exception& ex)
310                                 {
311                                     err = ex.what();
312                                     rc = 1;
313                                     return rc;
314                                 }
315                             }
316 
317                             if (colType.autoincrement && ( isNULL || (indata.compare("0") == 0)))
318                             {
319                                 try
320                                 {
321                                     bool reserved = fDbrm.getAIRange(oid, 1, &nextVal);
322 
323                                     if (!reserved)
324                                     {
325                                         err = IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT);
326                                         rc = 1;
327                                         return rc;
328                                     }
329                                 }
330                                 catch (std::exception& ex)
331                                 {
332                                     err = ex.what();
333                                     rc = 1;
334                                     return rc;
335                                 }
336 
337                                 ostringstream oss;
338                                 oss << nextVal;
339                                 indata = oss.str();
340                                 isNULL = false;
341                             }
342 
343                             if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
344                             {
345                                 if (isNULL && colType.defaultValue.empty()) //error out
346                                 {
347                                     Message::Args args;
348                                     args.add(tableColName.column);
349                                     err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
350                                     rc = 1;
351                                     return rc;
352                                 }
353                                 else if (isNULL && !(colType.defaultValue.empty()))
354                                 {
355                                     indata = colType.defaultValue;
356                                     isNULL = false;
357                                 }
358                             }
359 
360                             try
361                             {
362                                 datavalue = DataConvert::convertColumnData(colType, indata, pushWarning, insertPkg.get_TimeZone(), isNULL, false, false);
363                             }
364                             catch (exception&)
365                             {
366                                 rc = 1;
367                                 Message::Args args;
368                                 args.add(string("'") + indata + string("'"));
369                                 err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
370                             }
371 
372                             //@Bug 1806
373                             if (rc != NO_ERROR && rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
374                             {
375                                 return rc;
376                             }
377 
378                             if ( pushWarning)
379                             {
380                                 if (!isWarningSet)
381                                     isWarningSet = true;
382 
383                                 if ( rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING )
384                                     rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
385 
386                                 colNames.push_back(tableColName.column);
387                             }
388 
389                             WriteEngine::ColTuple colTuple;
390                             colTuple.data = datavalue;
391 
392                             colTuples.push_back(colTuple);
393                             //@Bug 2515. Only pass string values to write engine
394                             dicStrings.push_back( tmpStr );
395                         }
396 
397                         colValuesList.push_back(colTuples);
398                         dicStringList.push_back( dicStrings );
399                     }
400 
401                     ++row_iterator;
402                 }
403             }
404         }
405     }
406     catch (exception& ex)
407     {
408         rc = 1;
409         err = ex.what();
410         return rc;
411     }
412 
413     // call the write engine to write the rows
414     int error = NO_ERROR;
415     //fWriteEngine.setDebugLevel(WriteEngine::DEBUG_3);
416     //cout << "inserting a row with transaction id " << txnid.id << endl;
417     fWEWrapper.setIsInsert(true);
418     fWEWrapper.setBulkFlag(true);
419     fWEWrapper.setTransId(txnid.id);
420     //For hdfs use only
421     uint32_t  tblOid = tableRoPair.objnum;
422 
423     if (idbdatafile::IDBPolicy::useHdfs())
424     {
425 
426         std::vector<Column> columns;
427         DctnryStructList dctnryList;
428         CalpontSystemCatalog::ColType colType;
429         std::vector<DBRootExtentInfo> colDBRootExtentInfo;
430         Convertor convertor;
431         dbRootExtTrackerVec.clear();
432         fRBMetaWriter.reset(new RBMetaWriter("SingleInsert", NULL));
433         CalpontSystemCatalog::RIDList ridList;
434 
435         try
436         {
437             ridList = systemCatalogPtr->columnRIDs(tableName, true);
438             std::vector<OID>    dctnryStoreOids(ridList.size()) ;
439             std::vector<BRM::EmDbRootHWMInfo_v> dbRootHWMInfoColVec(ridList.size());
440             bool bFirstExtentOnThisPM = false;
441 
442             // First gather HWM BRM information for all columns
443             std::vector<int> colWidths;
444 
445             for (unsigned i = 0; i < ridList.size(); i++)
446             {
447                 rc = BRMWrapper::getInstance()->getDbRootHWMInfo(ridList[i].objnum, dbRootHWMInfoColVec[i]);
448                 //need handle error
449 
450                 CalpontSystemCatalog::ColType colType2 = systemCatalogPtr->colType(ridList[i].objnum);
451                 colWidths.push_back( convertor.getCorrectRowWidth(
452                                          colType2.colDataType, colType2.colWidth) );
453             }
454 
455             for (unsigned i = 0; i < ridList.size(); i++)
456             {
457                 // Find DBRoot/segment file where we want to start adding rows
458                 colType = systemCatalogPtr->colType(ridList[i].objnum);
459                 boost::shared_ptr<DBRootExtentTracker> pDBRootExtentTracker (new DBRootExtentTracker(ridList[i].objnum,
460                         colWidths, dbRootHWMInfoColVec, i, 0) );
461                 dbRootExtTrackerVec.push_back( pDBRootExtentTracker );
462                 DBRootExtentInfo dbRootExtent;
463                 std::string trkErrMsg;
464                 bool bEmptyPM;
465 
466                 if (i == 0)
467                     rc = pDBRootExtentTracker->selectFirstSegFile(dbRootExtent, bFirstExtentOnThisPM, bEmptyPM, trkErrMsg);
468                 else
469                     pDBRootExtentTracker->assignFirstSegFile(*(dbRootExtTrackerVec[0].get()), dbRootExtent);
470 
471                 colDBRootExtentInfo.push_back(dbRootExtent);
472 
473                 Column aColumn;
474                 aColumn.colWidth = convertor.getCorrectRowWidth(colType.colDataType, colType.colWidth);
475                 aColumn.colDataType = colType.colDataType;
476                 aColumn.compressionType = colType.compressionType;
477                 aColumn.dataFile.oid = ridList[i].objnum;
478                 aColumn.dataFile.fPartition = dbRootExtent.fPartition;
479                 aColumn.dataFile.fSegment = dbRootExtent.fSegment;
480                 aColumn.dataFile.fDbRoot = dbRootExtent.fDbRoot;
481                 aColumn.dataFile.hwm = dbRootExtent.fLocalHwm;
482                 columns.push_back(aColumn);
483 
484                 if ((colType.compressionType > 0) && (colType.ddn.dictOID > 0))
485                 {
486                     DctnryStruct aDctnry;
487                     aDctnry.dctnryOid = colType.ddn.dictOID;
488                     aDctnry.fColPartition = dbRootExtent.fPartition;
489                     aDctnry.fColSegment = dbRootExtent.fSegment;
490                     aDctnry.fColDbRoot = dbRootExtent.fDbRoot;
491                     dctnryList.push_back(aDctnry);
492                 }
493 
494                 if (colType.ddn.dictOID > 0)
495                 {
496                     dctnryStoreOids[i] = colType.ddn.dictOID;
497                 }
498                 else
499                 {
500                     dctnryStoreOids[i] = 0;
501                 }
502             }
503 
504             fRBMetaWriter->init(tblOid, tableName.table);
505             fRBMetaWriter->saveBulkRollbackMetaData(columns, dctnryStoreOids, dbRootHWMInfoColVec);
506 
507             //cout << "Backing up hwm chunks" << endl;
508             for (unsigned i = 0; i < dctnryList.size(); i++) //back up chunks for compressed dictionary
509             {
510                 // @bug 5572 HDFS tmp file - Ignoring return flag, don't need in this context
511                 fRBMetaWriter->backupDctnryHWMChunk(
512                     dctnryList[i].dctnryOid, dctnryList[i].fColDbRoot, dctnryList[i].fColPartition, dctnryList[i].fColSegment);
513             }
514         }
515         catch (std::exception& ex)
516         {
517             err = ex.what();
518             rc = 1;
519             return rc;
520         }
521     }
522 
523     if (colValuesList[0].size() > 0)
524     {
525         if (NO_ERROR !=
526                 (error = fWEWrapper.insertColumnRec_Single(txnid.id, colStructs, colValuesList, dctnryStructList, dicStringList, tableRoPair.objnum)))
527         {
528             if (error == ERR_BRM_DEAD_LOCK)
529             {
530                 rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR;
531                 WErrorCodes ec;
532                 err = ec.errorString(error);
533             }
534             else if ( error == ERR_BRM_VB_OVERFLOW )
535             {
536                 rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR;
537                 err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW);
538             }
539             else
540             {
541                 rc = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR;
542                 WErrorCodes ec;
543                 err = ec.errorString(error);
544             }
545         }
546     }
547 
548     std::map<uint32_t, uint32_t> oids;
549     std::vector<BRM::OID_t>  oidsToFlush;
550 
551     for ( unsigned i = 0; i < colStructs.size(); i++)
552     {
553         oids[colStructs[i].dataOid] = colStructs[i].dataOid;
554         oidsToFlush.push_back(colStructs[i].dataOid);
555     }
556 
557     for (unsigned i = 0; i < dctnryStructList.size(); i++)
558     {
559         oids[dctnryStructList[i].dctnryOid] = dctnryStructList[i].dctnryOid;
560         oidsToFlush.push_back(dctnryStructList[i].dctnryOid);
561     }
562 
563     fWEWrapper.setTransId(txnid.id);
564     vector<LBID_t> lbidList;
565 
566     if (idbdatafile::IDBPolicy::useHdfs())
567     {
568         //save the extent info to mark them invalid, after flush, the meta file will be gone.
569         std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t>::iterator mapIter;
570         std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t>	m_txnLBIDMap = fWEWrapper.getTxnMap();
571 
572         try
573         {
574             mapIter = m_txnLBIDMap.find(txnid.id);
575 
576             if (mapIter != m_txnLBIDMap.end())
577             {
578                 SP_TxnLBIDRec_t spTxnLBIDRec = (*mapIter).second;
579                 std::tr1::unordered_map<BRM::LBID_t, uint32_t> ::iterator listIter = spTxnLBIDRec->m_LBIDMap.begin();
580 
581                 while (listIter != spTxnLBIDRec->m_LBIDMap.end())
582                 {
583                     lbidList.push_back(listIter->first);
584                     listIter++;
585                 }
586             }
587         }
588         catch (...) {}
589     }
590 
591     //flush files
592     // @bug5333, up to here, rc may have an error code already, don't overwrite it.
593     int flushChunksRc = fWEWrapper.flushChunks(0, oids);  // why not pass rc to flushChunks?
594 
595     if (flushChunksRc != NO_ERROR)
596     {
597         WErrorCodes ec;
598         std::ostringstream ossErr;
599         ossErr << "Error flushing chunks for table " << tableName <<
600                "; " << ec.errorString(flushChunksRc);
601 
602         // Append to errmsg in case we already have an error
603         if (err.length() > 0)
604             err += "; ";
605 
606         err += ossErr.str();
607 
608         if (error == NO_ERROR)
609             error = flushChunksRc;
610 
611         if ((rc == NO_ERROR) || (rc == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING))
612             rc = 1;  // return hardcoded 1 as the above
613     }
614 
615     // Confirm HDFS DB file changes "only" if no error up to this point
616     if (idbdatafile::IDBPolicy::useHdfs())
617     {
618         if (error == NO_ERROR)
619         {
620             std::string eMsg;
621             ConfirmHdfsDbFile confirmHdfs;
622             error = confirmHdfs.confirmDbFileListFromMetaFile(tblOid, eMsg);
623 
624             if (error != NO_ERROR)
625             {
626                 ostringstream ossErr;
627                 ossErr << "Error confirming changes to table " <<
628                        tableName << "; " << eMsg;
629                 err = ossErr.str();
630                 rc  = 1;
631             }
632             else // Perform extra cleanup that is necessary for HDFS
633             {
634                 std::string eMsg;
635                 ConfirmHdfsDbFile confirmHdfs;
636                 int confirmRc2 = confirmHdfs.endDbFileListFromMetaFile(
637                                      tblOid, true, eMsg);
638 
639                 if (confirmRc2 != NO_ERROR)
640                 {
641                     // Might want to log this error, but don't think we need
642                     // to report as fatal, since all changes were confirmed.
643                 }
644 
645                 //flush PrimProc FD cache
646                 TableMetaData* aTableMetaData = TableMetaData::makeTableMetaData(tblOid);
647                 ColsExtsInfoMap colsExtsInfoMap = aTableMetaData->getColsExtsInfoMap();
648                 ColsExtsInfoMap::iterator it = colsExtsInfoMap.begin();
649                 ColExtsInfo::iterator aIt;
650                 std::vector<BRM::FileInfo> files;
651                 BRM::FileInfo aFile;
652 
653                 while (it != colsExtsInfoMap.end())
654                 {
655                     aIt = (it->second).begin();
656                     aFile.oid = it->first;
657 
658                     while (aIt != (it->second).end())
659                     {
660                         aFile.partitionNum = aIt->partNum;
661                         aFile.dbRoot = aIt->dbRoot;
662                         aFile.segmentNum = aIt->segNum;
663                         aFile.compType = aIt->compType;
664                         files.push_back(aFile);
665                         aIt++;
666                     }
667 
668                     it++;
669                 }
670 
671                 if (files.size() > 0)
672                     cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());
673 
674                 cacheutils::flushOIDsFromCache(oidsToFlush);
675                 fDbrm.invalidateUncommittedExtentLBIDs(0, &lbidList);
676 
677                 try
678                 {
679                     BulkRollbackMgr::deleteMetaFile( tblOid );
680                 }
681                 catch (exception& ex)
682                 {
683                     err = ex.what();
684                     rc = 1;
685                 }
686             }
687         } // (error == NO_ERROR) through call to flushChunks()
688 
689         if (error != NO_ERROR) // rollback
690         {
691             string applName ("SingleInsert");
692             fWEWrapper.bulkRollback(tblOid, txnid.id, tableName.toString(),
693                                     applName, false, err);
694             BulkRollbackMgr::deleteMetaFile( tblOid );
695         }
696     } // extra hdfs steps
697 
698     fWEWrapper.setIsInsert(false);
699     fWEWrapper.setBulkFlag(false);
700     TableMetaData::removeTableMetaData(tblOid);
701 
702     if ((rc == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING) || isWarningSet)
703     {
704         if (rc == NO_ERROR)
705             rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
706 
707         Message::Args args;
708         string cols = "'" + colNames[0] + "'";
709 
710         for (unsigned i = 1; i < colNames.size(); i++)
711         {
712             cols = cols + ", " +  "'" + colNames[i] + "'";
713         }
714 
715         args.add(cols);
716         err = IDBErrorInfo::instance()->errorMsg(WARN_DATA_TRUNC, args);
717 
718         // Strict mode enabled, so rollback on warning
719         if (insertPkg.get_isWarnToError())
720         {
721             string applName ("SingleInsert");
722             fWEWrapper.bulkRollback(tblOid, txnid.id, tableName.toString(),
723                                     applName, false, err);
724             BulkRollbackMgr::deleteMetaFile( tblOid );
725         }
726     }
727 
728     // MCOL-1495 Remove fCatalogMap entries CS won't use anymore.
729     CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId);
730     CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId | 0x80000000);
731 
732     return rc;
733 }
734 
commitVersion(ByteStream & bs,std::string & err)735 uint8_t WE_DMLCommandProc::commitVersion(ByteStream& bs, std::string& err)
736 {
737     int rc = 0;
738     uint32_t tmp32;
739     int txnID;
740 
741     bs >> tmp32;
742     txnID = tmp32;
743     //cout << "processing commit txnid = " << txnID << endl;
744     rc = fWEWrapper.commit(txnID);
745 
746     if (rc != 0)
747     {
748         WErrorCodes ec;
749         ostringstream oss;
750         oss << "WE: Error commiting transaction; "  << txnID << ec.errorString(rc) << endl;
751         err = oss.str();
752     }
753 
754     return rc;
755 }
756 
rollbackBlocks(ByteStream & bs,std::string & err)757 uint8_t WE_DMLCommandProc::rollbackBlocks(ByteStream& bs, std::string& err)
758 {
759     int rc = 0;
760     uint32_t sessionID, tmp32;;
761     int txnID;
762     bs >> sessionID;
763     bs >> tmp32;
764     txnID = tmp32;
765 
766     //cout << "processing rollbackBlocks txnid = " << txnID << endl;
767     try
768     {
769         rc = fWEWrapper.rollbackBlocks(txnID, sessionID);
770     }
771     catch (std::exception& ex)
772     {
773         rc = 1;
774         err = ex.what();
775     }
776 
777     return rc;
778 }
779 
rollbackVersion(ByteStream & bs,std::string & err)780 uint8_t WE_DMLCommandProc::rollbackVersion(ByteStream& bs, std::string& err)
781 {
782     int rc = 0;
783     uint32_t sessionID, tmp32;
784     int txnID;
785     bs >> sessionID;
786     bs >> tmp32;
787     txnID = tmp32;
788     //cout << "processing rollbackVersion txnid = " << txnID << endl;
789     rc = fWEWrapper.rollbackVersion(txnID, sessionID);
790 
791     if (rc != 0)
792     {
793         WErrorCodes ec;
794         ostringstream oss;
795         oss << "WE: Error rolling back transaction "  << txnID << " for session " <<  sessionID << "; " << ec.errorString(rc) << endl;
796         err = oss.str();
797     }
798 
799     return rc;
800 }
801 
processBatchInsert(messageqcpp::ByteStream & bs,std::string & err,ByteStream::quadbyte & PMId)802 uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std::string& err, ByteStream::quadbyte& PMId)
803 {
804     int rc = 0;
805     //cout << "processBatchInsert received bytestream length " << bs.length() << endl;
806 
807     InsertDMLPackage insertPkg;
808     ByteStream::quadbyte tmp32;
809     bs >> tmp32;
810     //cout << "processBatchInsert got transaction id " << tmp32 << endl;
811     bs >> PMId;
812     //cout << "processBatchInsert gor PMId " << PMId << endl;
813     insertPkg.read( bs);
814     uint32_t sessionId = insertPkg.get_SessionID();
815     //cout << " processBatchInsert for session " << sessionId << endl;
816     DMLTable* tablePtr = insertPkg.get_Table();
817     bool isAutocommitOn = insertPkg.get_isAutocommitOn();
818 
819     if (idbdatafile::IDBPolicy::useHdfs())
820         isAutocommitOn = true;
821 
822     //cout << "This session isAutocommitOn is " << isAutocommitOn << endl;
823     BRM::TxnID txnid;
824     txnid.id  = tmp32;
825     txnid.valid = true;
826     RowList rows = tablePtr->get_RowList();
827     bool isInsertSelect = insertPkg.get_isInsertSelect();
828 
829     WriteEngine::ColStructList colStructs;
830     WriteEngine::DctnryStructList dctnryStructList;
831     WriteEngine::DctnryValueList dctnryValueList;
832     WriteEngine::ColValueList colValuesList;
833     WriteEngine::DictStrList dicStringList ;
834     CalpontSystemCatalog::TableName tableName;
835     CalpontSystemCatalog::TableColName tableColName;
836     tableName.table = tableColName.table = tablePtr->get_TableName();
837     tableName.schema = tableColName.schema = tablePtr->get_SchemaName();
838     boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionId);
839     systemCatalogPtr->identity(CalpontSystemCatalog::EC);
840     CalpontSystemCatalog::ROPair roPair;
841     CalpontSystemCatalog::RIDList ridList;
842     CalpontSystemCatalog::DictOIDList dictOids;
843 
844     try
845     {
846         ridList = systemCatalogPtr->columnRIDs(tableName, true);
847         roPair = systemCatalogPtr->tableRID( tableName);
848     }
849     catch (std::exception& ex)
850     {
851         err = ex.what();
852         rc = 1;
853         return rc;
854     }
855 
856 
857     std::vector<OID>    dctnryStoreOids(ridList.size()) ;
858     std::vector<Column> columns;
859     DctnryStructList dctnryList;
860     std::vector<BRM::EmDbRootHWMInfo_v> dbRootHWMInfoColVec(ridList.size());
861 
862     uint32_t  tblOid = roPair.objnum;
863     CalpontSystemCatalog::ColType colType;
864     std::vector<DBRootExtentInfo> colDBRootExtentInfo;
865     bool bFirstExtentOnThisPM = false;
866     Convertor convertor;
867 
868     if ( fIsFirstBatchPm )
869     {
870         dbRootExtTrackerVec.clear();
871 
872         if (isAutocommitOn || ((fRBMetaWriter.get() == NULL) && (!isAutocommitOn)))
873             fRBMetaWriter.reset(new RBMetaWriter("BatchInsert", NULL));
874 
875         fWEWrapper.setIsInsert(true);
876         fWEWrapper.setBulkFlag(true);
877         fWEWrapper.setTransId(txnid.id);
878 
879         try
880         {
881             // First gather HWM BRM information for all columns
882             std::vector<int> colWidths;
883 
884             for (unsigned i = 0; i < ridList.size(); i++)
885             {
886                 rc = BRMWrapper::getInstance()->getDbRootHWMInfo(ridList[i].objnum, dbRootHWMInfoColVec[i]);
887                 //need handle error
888 
889                 CalpontSystemCatalog::ColType colType2 = systemCatalogPtr->colType(ridList[i].objnum);
890                 colWidths.push_back( convertor.getCorrectRowWidth(
891                                          colType2.colDataType, colType2.colWidth) );
892             }
893 
894             for (unsigned i = 0; i < ridList.size(); i++)
895             {
896                 // Find DBRoot/segment file where we want to start adding rows
897                 colType = systemCatalogPtr->colType(ridList[i].objnum);
898                 boost::shared_ptr<DBRootExtentTracker> pDBRootExtentTracker (new DBRootExtentTracker(ridList[i].objnum,
899                         colWidths, dbRootHWMInfoColVec, i, 0) );
900                 dbRootExtTrackerVec.push_back( pDBRootExtentTracker );
901                 DBRootExtentInfo dbRootExtent;
902                 std::string trkErrMsg;
903                 bool bEmptyPM;
904 
905                 if (i == 0)
906                 {
907                     rc = pDBRootExtentTracker->selectFirstSegFile(dbRootExtent, bFirstExtentOnThisPM, bEmptyPM, trkErrMsg);
908                     /*	cout << "bEmptyPM = " << (int) bEmptyPM << " bFirstExtentOnThisPM= " << (int)bFirstExtentOnThisPM <<
909                     	" oid:dbroot:hwm = " << ridList[i].objnum << ":"<<dbRootExtent.fDbRoot << ":"
910                     	<<":"<<dbRootExtent.fLocalHwm << " err = " << trkErrMsg << endl; */
911                 }
912                 else
913                     pDBRootExtentTracker->assignFirstSegFile(*(dbRootExtTrackerVec[0].get()), dbRootExtent);
914 
915 
916                 colDBRootExtentInfo.push_back(dbRootExtent);
917 
918                 Column aColumn;
919                 aColumn.colWidth = convertor.getCorrectRowWidth(colType.colDataType, colType.colWidth);
920                 aColumn.colDataType = colType.colDataType;
921                 aColumn.compressionType = colType.compressionType;
922                 aColumn.dataFile.oid = ridList[i].objnum;
923                 aColumn.dataFile.fPartition = dbRootExtent.fPartition;
924                 aColumn.dataFile.fSegment = dbRootExtent.fSegment;
925                 aColumn.dataFile.fDbRoot = dbRootExtent.fDbRoot;
926                 aColumn.dataFile.hwm = dbRootExtent.fLocalHwm;
927                 columns.push_back(aColumn);
928 
929                 if ((colType.compressionType > 0) && (colType.ddn.dictOID > 0))
930                 {
931                     DctnryStruct aDctnry;
932                     aDctnry.dctnryOid = colType.ddn.dictOID;
933                     aDctnry.fColPartition = dbRootExtent.fPartition;
934                     aDctnry.fColSegment = dbRootExtent.fSegment;
935                     aDctnry.fColDbRoot = dbRootExtent.fDbRoot;
936                     dctnryList.push_back(aDctnry);
937                 }
938 
939                 if (colType.ddn.dictOID > 0)
940                 {
941                     dctnryStoreOids[i] = colType.ddn.dictOID;
942                 }
943                 else
944                 {
945                     dctnryStoreOids[i] = 0;
946                 }
947             }
948         }
949         catch (std::exception& ex)
950         {
951             err = ex.what();
952             rc = 1;
953             return rc;
954         }
955 
956         //@Bug 5996 validate hwm before starts
957         rc = validateColumnHWMs(ridList, systemCatalogPtr, colDBRootExtentInfo, "Starting");
958 
959         if ( rc != 0)
960         {
961             WErrorCodes ec;
962             err = ec.errorString(rc);
963             err += " Check err.log for detailed information.";
964             fIsFirstBatchPm = false;
965             rc = 1;
966             return rc;
967         }
968     }
969 
970     std::vector<BRM::LBIDRange>   rangeList;
971 
972     // use of MetaFile for bulk rollback support
973     if ( fIsFirstBatchPm && isAutocommitOn)
974     {
975         //save meta data, version last block for each dbroot at the start of batch insert
976         try
977         {
978             fRBMetaWriter->init(tblOid, tableName.table);
979             fRBMetaWriter->saveBulkRollbackMetaData(columns, dctnryStoreOids, dbRootHWMInfoColVec);
980 
981             //cout << "Saved meta files" << endl;
982             if (!bFirstExtentOnThisPM)
983             {
984                 //cout << "Backing up hwm chunks" << endl;
985                 for (unsigned i = 0; i < dctnryList.size(); i++) //back up chunks for compressed dictionary
986                 {
987                     // @bug 5572 HDFS tmp file - Ignoring return flag, don't need in this context
988                     fRBMetaWriter->backupDctnryHWMChunk(
989                         dctnryList[i].dctnryOid, dctnryList[i].fColDbRoot, dctnryList[i].fColPartition, dctnryList[i].fColSegment);
990                 }
991             }
992         }
993         catch (WeException& ex) // catch exception to close file, then rethrow
994         {
995             rc = 1;
996             err =  ex.what();
997         }
998 
999         //Do versioning. Currently, we only version columns, not strings. If there is a design change, this will need to be re-visited
1000         if ( rc != 0)
1001             return rc;
1002 
1003     }
1004 
1005     std::vector<string> colNames;
1006     bool isWarningSet = false;
1007 
1008     if (rows.size())
1009     {
1010         Row* rowPtr = rows.at(0);
1011         ColumnList columns = rowPtr->get_ColumnList();
1012         ColumnList::const_iterator column_iterator = columns.begin();
1013 
1014         try
1015         {
1016             while (column_iterator != columns.end())
1017             {
1018                 DMLColumn* columnPtr = *column_iterator;
1019                 tableColName.column = columnPtr->get_Name();
1020                 CalpontSystemCatalog::ROPair roPair = systemCatalogPtr->columnRID(tableColName);
1021 
1022                 CalpontSystemCatalog::OID oid = systemCatalogPtr->lookupOID(tableColName);
1023 
1024                 CalpontSystemCatalog::ColType colType;
1025                 colType = systemCatalogPtr->colType(oid);
1026 
1027                 WriteEngine::ColStruct colStruct;
1028                 WriteEngine::DctnryStruct dctnryStruct;
1029                 colStruct.dataOid = roPair.objnum;
1030                 colStruct.tokenFlag = false;
1031                 colStruct.fCompressionType = colType.compressionType;
1032 
1033                 // Token
1034                 if ( isDictCol(colType) )
1035                 {
1036                     colStruct.colWidth = 8;
1037                     colStruct.tokenFlag = true;
1038                 }
1039                 else
1040                 {
1041                     colStruct.colWidth = colType.colWidth;
1042                 }
1043 
1044                 colStruct.colDataType = colType.colDataType;
1045 
1046                 if (colStruct.tokenFlag)
1047                 {
1048                     dctnryStruct.dctnryOid = colType.ddn.dictOID;
1049                     dctnryStruct.columnOid = colStruct.dataOid;
1050                     dctnryStruct.fCompressionType = colType.compressionType;
1051                     dctnryStruct.colWidth = colType.colWidth;
1052                 }
1053                 else
1054                 {
1055                     dctnryStruct.dctnryOid = 0;
1056                     dctnryStruct.columnOid = colStruct.dataOid;
1057                     dctnryStruct.fCompressionType = colType.compressionType;
1058                     dctnryStruct.colWidth = colType.colWidth;
1059                 }
1060 
1061                 colStructs.push_back(colStruct);
1062                 dctnryStructList.push_back(dctnryStruct);
1063 
1064                 ++column_iterator;
1065             }
1066         }
1067         catch (std::exception& ex)
1068         {
1069             err = ex.what();
1070             rc = 1;
1071             return rc;
1072         }
1073 
1074         unsigned int numcols = rowPtr->get_NumberOfColumns();
1075         std::string tmpStr("");
1076 
1077         try
1078         {
1079             for (unsigned int i = 0; i < numcols; i++)
1080             {
1081                 WriteEngine::ColTupleList colTuples;
1082                 WriteEngine::DctColTupleList dctColTuples;
1083                 RowList::const_iterator row_iterator = rows.begin();
1084                 bool pushWarning = false;
1085 
1086                 while (row_iterator != rows.end())
1087                 {
1088                     Row* rowPtr = *row_iterator;
1089                     const DMLColumn* columnPtr = rowPtr->get_ColumnAt(i);
1090 
1091                     tableColName.column = columnPtr->get_Name();
1092                     CalpontSystemCatalog::OID oid = systemCatalogPtr->lookupOID(tableColName);
1093 
1094                     CalpontSystemCatalog::ColType colType;
1095                     colType = systemCatalogPtr->colType(oid);
1096 
1097                     boost::any datavalue;
1098                     bool isNULL = false;
1099                     std::vector<std::string> origVals;
1100                     origVals = columnPtr->get_DataVector();
1101                     WriteEngine::dictStr dicStrings;
1102 
1103                     // token
1104                     if ( isDictCol(colType) )
1105                     {
1106                         for ( uint32_t i = 0; i < origVals.size(); i++ )
1107                         {
1108                             tmpStr = origVals[i];
1109 
1110                             if ( tmpStr.length() == 0 )
1111                                 isNULL = true;
1112                             else
1113                                 isNULL = false;
1114 
1115                             if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
1116                             {
1117                                 if (isNULL && colType.defaultValue.empty()) //error out
1118                                 {
1119                                     Message::Args args;
1120                                     args.add(tableColName.column);
1121                                     err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
1122                                     rc = 1;
1123                                     return rc;
1124                                 }
1125                                 else if (isNULL && !(colType.defaultValue.empty()))
1126                                 {
1127                                     tmpStr = colType.defaultValue;
1128                                 }
1129                             }
1130 
1131                             if ( tmpStr.length() > (unsigned int)colType.colWidth )
1132                             {
1133                                 tmpStr = tmpStr.substr(0, colType.colWidth);
1134 
1135                                 if ( !pushWarning )
1136                                     pushWarning = true;
1137                             }
1138 
1139                             WriteEngine::ColTuple colTuple;
1140                             colTuple.data = datavalue;
1141 
1142                             colTuples.push_back(colTuple);
1143                             //@Bug 2515. Only pass string values to write engine
1144                             dicStrings.push_back( tmpStr );
1145                         }
1146 
1147                         colValuesList.push_back(colTuples);
1148                         //@Bug 2515. Only pass string values to write engine
1149                         dicStringList.push_back( dicStrings );
1150                     }
1151                     else
1152                     {
1153                         string x;
1154                         std::string indata;
1155                         //scan once to check how many autoincrement value needed
1156                         uint32_t nextValNeeded = 0;
1157                         uint64_t nextVal = 1;
1158 
1159                         if (colType.autoincrement)
1160                         {
1161                             try
1162                             {
1163                                 nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
1164                                 fDbrm.startAISequence(oid, nextVal, colType.colWidth, colType.colDataType);
1165                             }
1166                             catch (std::exception& ex)
1167                             {
1168                                 err = ex.what();
1169                                 rc = 1;
1170                                 return rc;
1171                             }
1172 
1173                             for ( uint32_t i = 0; i < origVals.size(); i++ )
1174                             {
1175                                 indata = origVals[i];
1176 
1177                                 if ( indata.length() == 0 )
1178                                     isNULL = true;
1179                                 else
1180                                     isNULL = false;
1181 
1182                                 if ( isNULL || (indata.compare("0") == 0))
1183                                     nextValNeeded++;
1184                             }
1185                         }
1186 
1187                         if (nextValNeeded > 0) //reserve next value
1188                         {
1189                             try
1190                             {
1191                                 bool reserved = fDbrm.getAIRange(oid, nextValNeeded, &nextVal);
1192 
1193                                 if (!reserved)
1194                                 {
1195                                     err = IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT);
1196                                     rc = 1;
1197                                     return rc;
1198                                 }
1199                             }
1200                             catch (std::exception& ex)
1201                             {
1202                                 err = ex.what();
1203                                 rc = 1;
1204                                 return rc;
1205                             }
1206                         }
1207 
1208                         for ( uint32_t i = 0; i < origVals.size(); i++ )
1209                         {
1210                             indata = origVals[i];
1211 
1212                             if ( indata.length() == 0 )
1213                                 isNULL = true;
1214                             else
1215                                 isNULL = false;
1216 
1217                             //check if autoincrement column and value is 0 or null
1218                             if (colType.autoincrement && ( isNULL || (indata.compare("0") == 0)))
1219                             {
1220                                 ostringstream oss;
1221                                 oss << nextVal++;
1222                                 indata = oss.str();
1223                                 isNULL = false;
1224                             }
1225 
1226                             if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
1227                             {
1228                                 if (isNULL && colType.defaultValue.empty()) //error out
1229                                 {
1230                                     Message::Args args;
1231                                     args.add(tableColName.column);
1232                                     err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
1233                                     rc = 1;
1234                                     return rc;
1235                                 }
1236                                 else if (isNULL && !(colType.defaultValue.empty()))
1237                                 {
1238                                     indata = colType.defaultValue;
1239                                     isNULL = false;
1240                                 }
1241                             }
1242 
1243                             try
1244                             {
1245                                 datavalue = DataConvert::convertColumnData(colType, indata, pushWarning, insertPkg.get_TimeZone(), isNULL, false, false);
1246                             }
1247                             catch (exception&)
1248                             {
1249                                 rc = 1;
1250                                 Message::Args args;
1251                                 args.add(string("'") + indata + string("'"));
1252                                 err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
1253                             }
1254 
1255                             //@Bug 1806
1256                             if (rc != NO_ERROR && rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
1257                             {
1258                                 return rc;
1259                             }
1260 
1261                             if ( pushWarning && ( rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING ) )
1262                                 rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
1263 
1264                             WriteEngine::ColTuple colTuple;
1265                             colTuple.data = datavalue;
1266 
1267                             colTuples.push_back(colTuple);
1268                             //@Bug 2515. Only pass string values to write engine
1269                             dicStrings.push_back( tmpStr );
1270                         }
1271 
1272                         colValuesList.push_back(colTuples);
1273                         dicStringList.push_back( dicStrings );
1274                     }
1275 
1276                     ++row_iterator;
1277                 }
1278 
1279                 if (pushWarning)
1280                 {
1281                     colNames.push_back(tableColName.column);
1282                     isWarningSet = true;
1283                 }
1284             }
1285         }
1286         catch (std::exception& ex)
1287         {
1288             err = ex.what();
1289             rc = 1;
1290             return rc;
1291         }
1292     }
1293 
1294     // call the write engine to write the rows
1295     int error = NO_ERROR;
1296 
1297     //fWriteEngine.setDebugLevel(WriteEngine::DEBUG_3);
1298     //cout << "Batch inserting a row with transaction id " << txnid.id << endl;
1299     if (colValuesList.size() > 0)
1300     {
1301         if (colValuesList[0].size() > 0)
1302         {
1303             /* Begin-Disable use of MetaFile for bulk rollback support;
1304                Use alternate call below that passes 0 ptr for RBMetaWriter
1305             			if (NO_ERROR !=
1306             			(error = fWEWrapper.insertColumnRecs(txnid.id, colStructs, colValuesList, dctnryStructList, dicStringList,
1307             						dbRootExtTrackerVec, fRBMetaWriter.get(), bFirstExtentOnThisPM, isInsertSelect, 0, roPair.objnum, fIsFirstBatchPm)))
1308             End-Disable use of MetaFile for bulk rollback support
1309             */
1310 
1311             if (NO_ERROR !=
1312                     (error = fWEWrapper.insertColumnRecs(txnid.id, colStructs, colValuesList, dctnryStructList, dicStringList,
1313                              dbRootExtTrackerVec, 0, bFirstExtentOnThisPM, isInsertSelect, isAutocommitOn, roPair.objnum, fIsFirstBatchPm)))
1314             {
1315                 if (error == ERR_BRM_DEAD_LOCK)
1316                 {
1317                     rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR;
1318                     WErrorCodes ec;
1319                     err = ec.errorString(error);
1320                 }
1321                 else if ( error == ERR_BRM_VB_OVERFLOW )
1322                 {
1323                     rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR;
1324                     err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW);
1325                 }
1326                 else
1327                 {
1328                     rc = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR;
1329                     WErrorCodes ec;
1330                     err = ec.errorString(error);
1331                 }
1332             }
1333         }
1334     }
1335 
1336     if (fIsFirstBatchPm && isAutocommitOn)
1337     {
1338         //fWEWrapper.writeVBEnd(txnid.id, rangeList);
1339         fIsFirstBatchPm = false;
1340     }
1341     else if (fIsFirstBatchPm)
1342     {
1343         fIsFirstBatchPm = false;
1344     }
1345 
1346     if ( isWarningSet && ( rc == NO_ERROR ) )
1347     {
1348         rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
1349         //cout << "Got warning" << endl;
1350         Message::Args args;
1351         string cols = "'" + colNames[0] + "'";
1352 
1353         for (unsigned i = 1; i < colNames.size(); i++)
1354         {
1355             cols = cols + ", " +  "'" + colNames[i] + "'";
1356         }
1357 
1358         args.add(cols);
1359         err = IDBErrorInfo::instance()->errorMsg(WARN_DATA_TRUNC, args);
1360 
1361         // Strict mode enabled, so rollback on warning
1362         if (insertPkg.get_isWarnToError())
1363         {
1364             string applName ("BatchInsert");
1365             fWEWrapper.bulkRollback(tblOid, txnid.id, tableName.toString(),
1366                                     applName, false, err);
1367             BulkRollbackMgr::deleteMetaFile( tblOid );
1368         }
1369     }
1370 
1371     // MCOL-1495 Remove fCatalogMap entries CS won't use anymore.
1372     CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId);
1373     CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId | 0x80000000);
1374     return rc;
1375 }
1376 
processBatchInsertBinary(messageqcpp::ByteStream & bs,std::string & err,ByteStream::quadbyte & PMId)1377 uint8_t WE_DMLCommandProc::processBatchInsertBinary(messageqcpp::ByteStream& bs, std::string& err, ByteStream::quadbyte& PMId)
1378 {
1379     int rc = 0;
1380     //cout << "processBatchInsert received bytestream length " << bs.length() << endl;
1381 
1382     ByteStream::quadbyte tmp32;
1383     ByteStream::byte tmp8;
1384     bs >> tmp32;
1385     //cout << "processBatchInsert got transaction id " << tmp32 << endl;
1386     bs >> PMId;
1387     //cout << "processBatchInsert gor PMId " << PMId << endl;
1388     uint32_t sessionId;
1389     bs >> sessionId;
1390     //cout << " processBatchInsert for session " << sessionId << endl;
1391     bool isAutocommitOn;
1392     bs >> tmp8;
1393     isAutocommitOn = tmp8;
1394 
1395     if (idbdatafile::IDBPolicy::useHdfs())
1396         isAutocommitOn = true;
1397 
1398     //cout << "This session isAutocommitOn is " << isAutocommitOn << endl;
1399     BRM::TxnID txnid;
1400     txnid.id  = tmp32;
1401     txnid.valid = true;
1402     bool isInsertSelect;
1403     bs >> tmp8;
1404     // For insert select, skip the hwm block and start inserting from the next block
1405     // to avoid self insert issue.
1406     //For batch insert: if not first batch, use the saved last rid to start adding rows.
1407     isInsertSelect = tmp8;
1408 
1409     WriteEngine::ColStructList colStructs;
1410     WriteEngine::DctnryStructList dctnryStructList;
1411     WriteEngine::DctnryValueList dctnryValueList;
1412     std::vector<uint64_t> colValuesList;
1413     WriteEngine::DictStrList dicStringList ;
1414     CalpontSystemCatalog::TableName tableName;
1415     CalpontSystemCatalog::TableColName tableColName;
1416     bs >> tableColName.table;
1417     bs >> tableColName.schema;
1418     tableName.table = tableColName.table;
1419     tableName.schema = tableColName.schema;
1420     boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionId);
1421     systemCatalogPtr->identity(CalpontSystemCatalog::EC);
1422     CalpontSystemCatalog::ROPair roPair;
1423     CalpontSystemCatalog::RIDList ridList;
1424     CalpontSystemCatalog::DictOIDList dictOids;
1425 
1426     try
1427     {
1428         ridList = systemCatalogPtr->columnRIDs(tableName, true);
1429         roPair = systemCatalogPtr->tableRID( tableName);
1430     }
1431     catch (std::exception& ex)
1432     {
1433         err = ex.what();
1434         rc = 1;
1435         return rc;
1436     }
1437 
1438 
1439     std::vector<OID>    dctnryStoreOids(ridList.size()) ;
1440     std::vector<Column> columns;
1441     DctnryStructList dctnryList;
1442     std::vector<BRM::EmDbRootHWMInfo_v> dbRootHWMInfoColVec(ridList.size());
1443 
1444     uint32_t  tblOid = roPair.objnum;
1445     CalpontSystemCatalog::ColType colType;
1446     std::vector<DBRootExtentInfo> colDBRootExtentInfo;
1447     bool bFirstExtentOnThisPM = false;
1448     Convertor convertor;
1449 
1450     if ( fIsFirstBatchPm )
1451     {
1452         dbRootExtTrackerVec.clear();
1453 
1454         if (isAutocommitOn || ((fRBMetaWriter.get() == NULL) && (!isAutocommitOn)))
1455             fRBMetaWriter.reset(new RBMetaWriter("BatchInsert", NULL));
1456 
1457         fWEWrapper.setIsInsert(true);
1458         fWEWrapper.setBulkFlag(true);
1459         fWEWrapper.setTransId(txnid.id);
1460 
1461         try
1462         {
1463             // First gather HWM BRM information for all columns
1464             std::vector<int> colWidths;
1465 
1466             for (unsigned i = 0; i < ridList.size(); i++)
1467             {
1468                 rc = BRMWrapper::getInstance()->getDbRootHWMInfo(ridList[i].objnum, dbRootHWMInfoColVec[i]);
1469                 //need handle error
1470 
1471                 CalpontSystemCatalog::ColType colType2 = systemCatalogPtr->colType(ridList[i].objnum);
1472                 colWidths.push_back( convertor.getCorrectRowWidth(
1473                                          colType2.colDataType, colType2.colWidth) );
1474             }
1475 
1476             for (unsigned i = 0; i < ridList.size(); i++)
1477             {
1478                 // Find DBRoot/segment file where we want to start adding rows
1479                 colType = systemCatalogPtr->colType(ridList[i].objnum);
1480                 boost::shared_ptr<DBRootExtentTracker> pDBRootExtentTracker (new DBRootExtentTracker(ridList[i].objnum,
1481                         colWidths, dbRootHWMInfoColVec, i, 0) );
1482                 dbRootExtTrackerVec.push_back( pDBRootExtentTracker );
1483                 DBRootExtentInfo dbRootExtent;
1484                 std::string trkErrMsg;
1485                 bool bEmptyPM;
1486 
1487                 if (i == 0)
1488                 {
1489                     rc = pDBRootExtentTracker->selectFirstSegFile(dbRootExtent, bFirstExtentOnThisPM, bEmptyPM, trkErrMsg);
1490                     /*	cout << "bEmptyPM = " << (int) bEmptyPM << " bFirstExtentOnThisPM= " << (int)bFirstExtentOnThisPM <<
1491                     	" oid:dbroot:hwm = " << ridList[i].objnum << ":"<<dbRootExtent.fDbRoot << ":"
1492                     	<<":"<<dbRootExtent.fLocalHwm << " err = " << trkErrMsg << endl; */
1493                 }
1494                 else
1495                     pDBRootExtentTracker->assignFirstSegFile(*(dbRootExtTrackerVec[0].get()), dbRootExtent);
1496 
1497 
1498                 colDBRootExtentInfo.push_back(dbRootExtent);
1499 
1500                 Column aColumn;
1501                 aColumn.colWidth = convertor.getCorrectRowWidth(colType.colDataType, colType.colWidth);
1502                 aColumn.colDataType = colType.colDataType;
1503                 aColumn.compressionType = colType.compressionType;
1504                 aColumn.dataFile.oid = ridList[i].objnum;
1505                 aColumn.dataFile.fPartition = dbRootExtent.fPartition;
1506                 aColumn.dataFile.fSegment = dbRootExtent.fSegment;
1507                 aColumn.dataFile.fDbRoot = dbRootExtent.fDbRoot;
1508                 aColumn.dataFile.hwm = dbRootExtent.fLocalHwm;
1509                 columns.push_back(aColumn);
1510 
1511                 if ((colType.compressionType > 0) && (colType.ddn.dictOID > 0))
1512                 {
1513                     DctnryStruct aDctnry;
1514                     aDctnry.dctnryOid = colType.ddn.dictOID;
1515                     aDctnry.fColPartition = dbRootExtent.fPartition;
1516                     aDctnry.fColSegment = dbRootExtent.fSegment;
1517                     aDctnry.fColDbRoot = dbRootExtent.fDbRoot;
1518                     dctnryList.push_back(aDctnry);
1519                 }
1520 
1521                 if (colType.ddn.dictOID > 0)
1522                 {
1523                     dctnryStoreOids[i] = colType.ddn.dictOID;
1524                 }
1525                 else
1526                 {
1527                     dctnryStoreOids[i] = 0;
1528                 }
1529             }
1530         }
1531         catch (std::exception& ex)
1532         {
1533             err = ex.what();
1534             rc = 1;
1535             return rc;
1536         }
1537 
1538         //@Bug 5996 validate hwm before starts
1539         rc = validateColumnHWMs(ridList, systemCatalogPtr, colDBRootExtentInfo, "Starting");
1540 
1541         if ( rc != 0)
1542         {
1543             WErrorCodes ec;
1544             err = ec.errorString(rc);
1545             err += " Check err.log for detailed information.";
1546             fIsFirstBatchPm = false;
1547             rc = 1;
1548             return rc;
1549         }
1550     }
1551 
1552     std::vector<BRM::LBIDRange>   rangeList;
1553 
1554     // use of MetaFile for bulk rollback support
1555     if ( fIsFirstBatchPm && isAutocommitOn)
1556     {
1557         //save meta data, version last block for each dbroot at the start of batch insert
1558         try
1559         {
1560             fRBMetaWriter->init(tblOid, tableName.table);
1561             fRBMetaWriter->saveBulkRollbackMetaData(columns, dctnryStoreOids, dbRootHWMInfoColVec);
1562 
1563             //cout << "Saved meta files" << endl;
1564             if (!bFirstExtentOnThisPM)
1565             {
1566                 //cout << "Backing up hwm chunks" << endl;
1567                 for (unsigned i = 0; i < dctnryList.size(); i++) //back up chunks for compressed dictionary
1568                 {
1569                     // @bug 5572 HDFS tmp file - Ignoring return flag, don't need in this context
1570                     fRBMetaWriter->backupDctnryHWMChunk(
1571                         dctnryList[i].dctnryOid, dctnryList[i].fColDbRoot, dctnryList[i].fColPartition, dctnryList[i].fColSegment);
1572                 }
1573             }
1574         }
1575         catch (WeException& ex) // catch exception to close file, then rethrow
1576         {
1577             rc = 1;
1578             err =  ex.what();
1579         }
1580 
1581         //Do versioning. Currently, we only version columns, not strings. If there is a design change, this will need to be re-visited
1582         if ( rc != 0)
1583             return rc;
1584 
1585     }
1586 
1587     std::vector<string> colNames;
1588     bool isWarningSet = false;
1589     uint32_t columnCount;
1590     bs >> columnCount;
1591 
1592     if (columnCount)
1593     {
1594         try
1595         {
1596             for (uint32_t current_column = 0; current_column < columnCount; current_column++)
1597             {
1598                 uint32_t tmp32;
1599                 std::string colName;
1600                 bs >> tmp32;
1601                 bs >> colName;
1602                 colNames.push_back(colName);
1603                 CalpontSystemCatalog::OID oid = tmp32;
1604 
1605                 CalpontSystemCatalog::ColType colType;
1606                 colType = systemCatalogPtr->colType(oid);
1607 
1608                 WriteEngine::ColStruct colStruct;
1609                 WriteEngine::DctnryStruct dctnryStruct;
1610                 colStruct.dataOid = oid;
1611                 colStruct.tokenFlag = false;
1612                 colStruct.fCompressionType = colType.compressionType;
1613 
1614                 // Token
1615                 if ( isDictCol(colType) )
1616                 {
1617                     colStruct.colWidth = 8;
1618                     colStruct.tokenFlag = true;
1619                 }
1620                 else
1621                 {
1622                     colStruct.colWidth = colType.colWidth;
1623                 }
1624 
1625                 colStruct.colDataType = colType.colDataType;
1626 
1627                 if (colStruct.tokenFlag)
1628                 {
1629                     dctnryStruct.dctnryOid = colType.ddn.dictOID;
1630                     dctnryStruct.columnOid = colStruct.dataOid;
1631                     dctnryStruct.fCompressionType = colType.compressionType;
1632                     dctnryStruct.colWidth = colType.colWidth;
1633                 }
1634                 else
1635                 {
1636                     dctnryStruct.dctnryOid = 0;
1637                     dctnryStruct.columnOid = colStruct.dataOid;
1638                     dctnryStruct.fCompressionType = colType.compressionType;
1639                     dctnryStruct.colWidth = colType.colWidth;
1640                 }
1641 
1642                 colStructs.push_back(colStruct);
1643                 dctnryStructList.push_back(dctnryStruct);
1644 
1645             }
1646         }
1647         catch (std::exception& ex)
1648         {
1649             err = ex.what();
1650             rc = 1;
1651             return rc;
1652         }
1653 
1654         std::string tmpStr("");
1655         uint32_t valuesPerColumn;
1656         bs >> valuesPerColumn;
1657         colValuesList.reserve(columnCount * valuesPerColumn);
1658 
1659         try
1660         {
1661             bool pushWarning = false;
1662 
1663             for (uint32_t j = 0; j < columnCount; j++)
1664             {
1665                 WriteEngine::DctColTupleList dctColTuples;
1666                 tableColName.column = colNames[j];
1667                 CalpontSystemCatalog::OID oid = colStructs[j].dataOid;
1668 
1669                 CalpontSystemCatalog::ColType colType;
1670                 colType = systemCatalogPtr->colType(oid);
1671 
1672                 bool isNULL = false;
1673                 WriteEngine::dictStr dicStrings;
1674 
1675                 // token
1676                 if ( isDictCol(colType) )
1677                 {
1678                     for ( uint32_t i = 0; i < valuesPerColumn; i++ )
1679                     {
1680                         bs >> tmp8;
1681                         isNULL = tmp8;
1682                         bs >> tmpStr;
1683 
1684                         if ( tmpStr.length() == 0 )
1685                             isNULL = true;
1686 
1687                         if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
1688                         {
1689                             if (isNULL && colType.defaultValue.empty()) //error out
1690                             {
1691                                 Message::Args args;
1692                                 args.add(tableColName.column);
1693                                 err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
1694                                 rc = 1;
1695                                 return rc;
1696                             }
1697                             else if (isNULL && !(colType.defaultValue.empty()))
1698                             {
1699                                 tmpStr = colType.defaultValue;
1700                             }
1701                         }
1702 
1703                         if ( tmpStr.length() > (unsigned int)colType.colWidth )
1704                         {
1705                             tmpStr = tmpStr.substr(0, colType.colWidth);
1706 
1707                             if ( !pushWarning )
1708                                 pushWarning = true;
1709                         }
1710 
1711                         colValuesList.push_back(0);
1712                         //@Bug 2515. Only pass string values to write engine
1713                         dicStrings.push_back( tmpStr );
1714                     }
1715 
1716                     //@Bug 2515. Only pass string values to write engine
1717                     dicStringList.push_back( dicStrings );
1718                 }
1719                 else
1720                 {
1721                     string x;
1722                     //scan once to check how many autoincrement value needed
1723                     uint32_t nextValNeeded = 0;
1724                     uint64_t nextVal = 1;
1725 
1726                     if (colType.autoincrement)
1727                     {
1728                         try
1729                         {
1730                             nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
1731                             fDbrm.startAISequence(oid, nextVal, colType.colWidth, colType.colDataType);
1732                         }
1733                         catch (std::exception& ex)
1734                         {
1735                             err = ex.what();
1736                             rc = 1;
1737                             return rc;
1738                         }
1739                     }
1740 
1741                     for ( uint32_t i = 0; i < valuesPerColumn; i++ )
1742                     {
1743                         bs >> tmp8;
1744                         isNULL = tmp8;
1745 
1746                         uint8_t val8;
1747                         uint16_t val16;
1748                         uint32_t val32;
1749                         uint64_t val64;
1750                         uint64_t colValue;
1751                         float valF;
1752                         double valD;
1753                         std::string valStr;
1754                         bool valZero = false; // Needed for autoinc check
1755 
1756                         switch (colType.colDataType)
1757                         {
1758                             case execplan::CalpontSystemCatalog::TINYINT:
1759                             case execplan::CalpontSystemCatalog::UTINYINT:
1760                                 bs >> val8;
1761 
1762                                 if (val8 == 0)
1763                                     valZero = true;
1764 
1765                                 colValue = val8;
1766                                 break;
1767 
1768                             case execplan::CalpontSystemCatalog::SMALLINT:
1769                             case execplan::CalpontSystemCatalog::USMALLINT:
1770                                 bs >> val16;
1771 
1772                                 if (val16 == 0)
1773                                     valZero = true;
1774 
1775                                 colValue = val16;
1776                                 break;
1777 
1778                             case execplan::CalpontSystemCatalog::DATE:
1779                             case execplan::CalpontSystemCatalog::MEDINT:
1780                             case execplan::CalpontSystemCatalog::INT:
1781                             case execplan::CalpontSystemCatalog::UMEDINT:
1782                             case execplan::CalpontSystemCatalog::UINT:
1783                                 bs >> val32;
1784 
1785                                 if (val32 == 0)
1786                                     valZero = true;
1787 
1788                                 colValue = val32;
1789                                 break;
1790 
1791                             case execplan::CalpontSystemCatalog::BIGINT:
1792                             case execplan::CalpontSystemCatalog::DATETIME:
1793                             case execplan::CalpontSystemCatalog::TIME:
1794                             case execplan::CalpontSystemCatalog::TIMESTAMP:
1795                             case execplan::CalpontSystemCatalog::UBIGINT:
1796                                 bs >> val64;
1797 
1798                                 if (val64 == 0)
1799                                     valZero = true;
1800 
1801                                 colValue = val64;
1802                                 break;
1803 
1804                             case execplan::CalpontSystemCatalog::DECIMAL:
1805                                 switch (colType.colWidth)
1806                                 {
1807                                     case 1:
1808                                     {
1809                                         bs >> val8;
1810                                         colValue = val8;
1811                                         break;
1812                                     }
1813 
1814                                     case 2:
1815                                     {
1816                                         bs >> val16;
1817                                         colValue = val16;
1818                                         break;
1819                                     }
1820 
1821                                     case 4:
1822                                     {
1823                                         bs >> val32;
1824                                         colValue = val32;
1825                                         break;
1826                                     }
1827 
1828                                     default:
1829                                     {
1830                                         bs >> val64;
1831                                         colValue = val64;
1832                                         break;
1833                                     }
1834                                 }
1835 
1836                                 break;
1837 
1838                             case execplan::CalpontSystemCatalog::UDECIMAL:
1839 
1840                                 // UDECIMAL numbers may not be negative
1841                                 if (colType.colWidth == 1)
1842                                 {
1843                                     bs >> val8;
1844 
1845                                     // FIXME: IDK what would it mean if valN are unsigned
1846                                     if (utils::is_negative(val8) &&
1847                                             val8 != joblist::TINYINTEMPTYROW &&
1848                                             val8 != joblist::TINYINTNULL)
1849                                     {
1850                                         val8 = 0;
1851                                         pushWarning = true;
1852                                     }
1853 
1854                                     colValue = val8;
1855                                 }
1856                                 else if (colType.colWidth == 2)
1857                                 {
1858                                     bs >> val16;
1859 
1860                                     if (utils::is_negative(val16) &&
1861                                             val16 != joblist::SMALLINTEMPTYROW &&
1862                                             val16 != joblist::SMALLINTNULL)
1863                                     {
1864                                         val16 = 0;
1865                                         pushWarning = true;
1866                                     }
1867 
1868                                     colValue = val16;
1869                                 }
1870                                 else if (colType.colWidth == 4)
1871                                 {
1872                                     bs >> val32;
1873 
1874                                     if (utils::is_negative(val32) &&
1875                                             val32 != joblist::INTEMPTYROW &&
1876                                             val32 != joblist::INTNULL)
1877                                     {
1878                                         val32 = 0;
1879                                         pushWarning = true;
1880                                     }
1881 
1882                                     colValue = val32;
1883                                 }
1884                                 else if (colType.colWidth == 8)
1885                                 {
1886                                     bs >> val64;
1887 
1888                                     if (utils::is_negative(val64) &&
1889                                             val64 != joblist::BIGINTEMPTYROW &&
1890                                             val64 != joblist::BIGINTNULL)
1891                                     {
1892                                         val64 = 0;
1893                                         pushWarning = true;
1894                                     }
1895 
1896                                     colValue = val64;
1897                                 }
1898 
1899                                 break;
1900 
1901                             case execplan::CalpontSystemCatalog::DOUBLE:
1902                                 bs >> val64;
1903                                 colValue = val64;
1904                                 break;
1905 
1906                             case execplan::CalpontSystemCatalog::UDOUBLE:
1907                                 bs >> val64;
1908                                 memcpy(&valD, &val64, 8);
1909 
1910                                 if (valD < 0.0 && valD != joblist::DOUBLEEMPTYROW && valD != joblist::DOUBLENULL)
1911                                 {
1912                                     valD = 0.0;
1913                                     pushWarning = true;
1914                                 }
1915 
1916                                 colValue = val64;
1917                                 break;
1918 
1919                             case execplan::CalpontSystemCatalog::FLOAT:
1920                                 bs >> val32;
1921                                 colValue = val32;
1922                                 break;
1923 
1924                             case execplan::CalpontSystemCatalog::UFLOAT:
1925                                 bs >> val32;
1926                                 memcpy(&valF, &val32, 4);
1927 
1928                                 if (valF < 0.0 && valF != joblist::FLOATEMPTYROW && valF != joblist::FLOATNULL)
1929                                 {
1930                                     valF = 0.0;
1931                                     pushWarning = true;
1932                                 }
1933 
1934                                 colValue = val32;
1935                                 break;
1936 
1937                             case execplan::CalpontSystemCatalog::CHAR:
1938                             case execplan::CalpontSystemCatalog::VARCHAR:
1939                             case execplan::CalpontSystemCatalog::TEXT:
1940                             case execplan::CalpontSystemCatalog::BLOB:
1941                                 bs >> valStr;
1942 
1943                                 if (valStr.length() > (unsigned int)colType.colWidth)
1944                                 {
1945                                     valStr = valStr.substr(0, colType.colWidth);
1946                                     pushWarning = true;
1947                                 }
1948                                 else
1949                                 {
1950                                     if ( (unsigned int)colType.colWidth > valStr.length())
1951                                     {
1952                                         //Pad null character to the string
1953                                         valStr.resize(colType.colWidth, 0);
1954                                     }
1955                                 }
1956 
1957                                 // FIXME: colValue is uint64_t (8 bytes)
1958                                 memcpy(&colValue, valStr.c_str(), valStr.length());
1959                                 break;
1960 
1961                             default:
1962                                 rc = 1;
1963                                 err = IDBErrorInfo::instance()->errorMsg(ERR_DATATYPE_NOT_SUPPORT);
1964                                 break;
1965                         }
1966 
1967                         //check if autoincrement column and value is 0 or null
1968                         if (colType.autoincrement && ( isNULL || valZero))
1969                         {
1970                             ostringstream oss;
1971                             oss << nextVal++;
1972                             isNULL = false;
1973 
1974                             try
1975                             {
1976                                 nextValNeeded++;
1977                                 bool reserved = fDbrm.getAIRange(oid, nextValNeeded, &nextVal);
1978 
1979                                 if (!reserved)
1980                                 {
1981                                     err = IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT);
1982                                     rc = 1;
1983                                     return rc;
1984                                 }
1985                             }
1986                             catch (std::exception& ex)
1987                             {
1988                                 err = ex.what();
1989                                 rc = 1;
1990                                 return rc;
1991                             }
1992 
1993                             colValue = nextVal;
1994                         }
1995 
1996                         if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
1997                         {
1998                             if (isNULL && colType.defaultValue.empty()) //error out
1999                             {
2000                                 Message::Args args;
2001                                 args.add(tableColName.column);
2002                                 err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
2003                                 rc = 1;
2004                                 return rc;
2005                             }
2006                             else if (isNULL && !(colType.defaultValue.empty()))
2007                             {
2008                                 memcpy(&colValue, colType.defaultValue.c_str(), colType.defaultValue.length());
2009                                 isNULL = false;
2010                             }
2011                         }
2012 
2013                         //@Bug 1806
2014                         if (rc != NO_ERROR && rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
2015                         {
2016                             return rc;
2017                         }
2018 
2019                         if ( pushWarning && ( rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING ) )
2020                             rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
2021 
2022 
2023                         colValuesList.push_back(colValue);
2024                         //@Bug 2515. Only pass string values to write engine
2025                         dicStrings.push_back( valStr );
2026                     }
2027 
2028                     dicStringList.push_back( dicStrings );
2029                 }
2030 
2031                 if (pushWarning)
2032                 {
2033                     colNames.push_back(tableColName.column);
2034                     isWarningSet = true;
2035                 }
2036             }
2037         }
2038         catch (std::exception& ex)
2039         {
2040             err = ex.what();
2041             rc = 1;
2042             return rc;
2043         }
2044     }
2045 
2046     // call the write engine to write the rows
2047     int error = NO_ERROR;
2048 
2049     //fWriteEngine.setDebugLevel(WriteEngine::DEBUG_3);
2050     //cout << "Batch inserting a row with transaction id " << txnid.id << endl;
2051     if (colValuesList.size() > 0)
2052     {
2053         if (NO_ERROR !=
2054                 (error = fWEWrapper.insertColumnRecsBinary(txnid.id, colStructs, colValuesList, dctnryStructList, dicStringList,
2055                          dbRootExtTrackerVec, 0, bFirstExtentOnThisPM, isInsertSelect, isAutocommitOn, roPair.objnum, fIsFirstBatchPm)))
2056         {
2057             if (error == ERR_BRM_DEAD_LOCK)
2058             {
2059                 rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR;
2060                 WErrorCodes ec;
2061                 err = ec.errorString(error);
2062             }
2063             else if ( error == ERR_BRM_VB_OVERFLOW )
2064             {
2065                 rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR;
2066                 err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW);
2067             }
2068             else
2069             {
2070                 rc = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR;
2071                 WErrorCodes ec;
2072                 err = ec.errorString(error);
2073             }
2074         }
2075     }
2076 
2077     if (fIsFirstBatchPm && isAutocommitOn)
2078     {
2079         //fWEWrapper.writeVBEnd(txnid.id, rangeList);
2080         fIsFirstBatchPm = false;
2081     }
2082     else if (fIsFirstBatchPm)
2083     {
2084         fIsFirstBatchPm = false;
2085     }
2086 
2087     if ( isWarningSet && ( rc == NO_ERROR ) )
2088     {
2089         rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
2090         //cout << "Got warning" << endl;
2091         Message::Args args;
2092         string cols = "'" + colNames[0] + "'";
2093 
2094         for (unsigned i = 1; i < colNames.size(); i++)
2095         {
2096             cols = cols + ", " +  "'" + colNames[i] + "'";
2097         }
2098 
2099         args.add(cols);
2100         err = IDBErrorInfo::instance()->errorMsg(WARN_DATA_TRUNC, args);
2101 
2102    }
2103 
2104     // MCOL-1495 Remove fCatalogMap entries CS won't use anymore.
2105     CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId);
2106     CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId | 0x80000000);
2107     return rc;
2108 }
2109 
2110 
commitBatchAutoOn(messageqcpp::ByteStream & bs,std::string & err)2111 uint8_t WE_DMLCommandProc::commitBatchAutoOn(messageqcpp::ByteStream& bs, std::string& err)
2112 {
2113     uint8_t rc = 0;
2114     //need to commit the versioned blocks, set hwm, update casual partition, send back to DMLProc to set them
2115     //cout << " in commiting autocommit on batch insert " << endl;
2116     uint32_t tmp32, tableOid, sessionId;
2117     int txnID;
2118 
2119     bs >> tmp32;
2120     txnID = tmp32;
2121     bs >> tmp32;
2122     tableOid = tmp32;
2123     bs >> tmp32;
2124     sessionId = tmp32;
2125 
2126     BRM::DBRM dbrm;
2127 
2128     std::vector<BRM::BulkSetHWMArg> setHWMArgs;
2129     TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);
2130     ColsExtsInfoMap colsExtsInfoMap = aTbaleMetaData->getColsExtsInfoMap();
2131     ColsExtsInfoMap::iterator it = colsExtsInfoMap.begin();
2132     ColExtsInfo::iterator aIt;
2133     BulkSetHWMArg aArg;
2134     std::vector<BRM::FileInfo> files;
2135     BRM::FileInfo aFile;
2136 
2137     while (it != colsExtsInfoMap.end())
2138     {
2139         aIt = (it->second).begin();
2140         aArg.oid = it->first;
2141         aFile.oid = it->first;
2142 
2143         //cout << "OID:" << aArg.oid;
2144         while (aIt != (it->second).end())
2145         {
2146             aArg.partNum = aIt->partNum;
2147             aArg.segNum = aIt->segNum;
2148             aArg.hwm = aIt->hwm;
2149 
2150             if (!aIt->isDict)
2151                 setHWMArgs.push_back(aArg);
2152 
2153             aFile.partitionNum = aIt->partNum;
2154             aFile.dbRoot = aIt->dbRoot;
2155             aFile.segmentNum = aIt->segNum;
2156             aFile.compType = aIt->compType;
2157             //cout <<"Added to files oid:dbroot:part:seg:compType = " << aFile.oid<<":"<<aFile.dbRoot<<":"<<aFile.partitionNum<<":"<<aFile.segmentNum
2158             //<<":"<<aFile.compType <<endl;
2159             files.push_back(aFile);
2160             aIt++;
2161         }
2162 
2163         it++;
2164     }
2165 
2166     bs.restart();
2167     //cout << " serialized setHWMArgs size = " << setHWMArgs.size() << endl;
2168     serializeInlineVector (bs, setHWMArgs);
2169 
2170     //flush files
2171     //cout << "flush files when autocommit on" << endl;
2172     fWEWrapper.setIsInsert(true);
2173     fWEWrapper.setBulkFlag(true);
2174 
2175     std::map<uint32_t, uint32_t> oids;
2176     boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
2177         CalpontSystemCatalog::makeCalpontSystemCatalog(sessionId);
2178 
2179     CalpontSystemCatalog::TableName aTableName =  systemCatalogPtr->tableName(tableOid);
2180     CalpontSystemCatalog::RIDList ridList;
2181 
2182     try
2183     {
2184         ridList = systemCatalogPtr->columnRIDs(aTableName, true);
2185     }
2186     catch (std::exception& ex)
2187     {
2188         err = ex.what();
2189         rc = 1;
2190         return rc;
2191     }
2192 
2193     for (unsigned i = 0; i < ridList.size(); i++)
2194     {
2195         oids[ridList[i].objnum] = ridList[i].objnum;
2196     }
2197 
2198     CalpontSystemCatalog::DictOIDList dictOids;
2199 
2200     try
2201     {
2202         dictOids = systemCatalogPtr->dictOIDs(aTableName);
2203     }
2204     catch (std::exception& ex)
2205     {
2206         err = ex.what();
2207         rc = 1;
2208         return rc;
2209     }
2210 
2211     for (unsigned i = 0; i < dictOids.size(); i++)
2212     {
2213         oids[dictOids[i].dictOID] = dictOids[i].dictOID;
2214     }
2215 
2216     fWEWrapper.setTransId(txnID);
2217 
2218     fWEWrapper.setIsInsert(false);
2219     fWEWrapper.setBulkFlag(false);
2220 
2221     fIsFirstBatchPm = true;
2222 
2223     if ((idbdatafile::IDBPolicy::useHdfs()) && (files.size() > 0) )
2224         cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());
2225 
2226     TableMetaData::removeTableMetaData(tableOid);
2227 
2228     // MCOL-1160 For API bulk insert flush the PrimProc cached dictionary
2229     // blocks tounched
2230     std::tr1::unordered_map<TxnID, dictLBIDRec_t>::iterator mapIter;
2231     mapIter = fWEWrapper.getDictMap().find(txnID);
2232 
2233     if (mapIter != fWEWrapper.getDictMap().end())
2234     {
2235         std::set<BRM::LBID_t>::iterator lbidIter;
2236         std::vector<BRM::LBID_t> dictFlushBlks;
2237         cerr << "API Flushing blocks: ";
2238         for (lbidIter = (*mapIter).second.begin(); lbidIter != (*mapIter).second.end(); lbidIter++)
2239         {
2240             cerr << *lbidIter << ", ";
2241             dictFlushBlks.push_back((*lbidIter));
2242         }
2243         cerr << endl;
2244         cacheutils::flushPrimProcAllverBlocks(dictFlushBlks);
2245         fWEWrapper.getDictMap().erase(txnID);
2246     }
2247 
2248     // MCOL-1495 Remove fCatalogMap entries CS won't use anymore.
2249     CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId);
2250     CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId | 0x80000000);
2251     return rc;
2252 }
2253 
processBatchInsertHwm(messageqcpp::ByteStream & bs,std::string & err)2254 uint8_t WE_DMLCommandProc::processBatchInsertHwm(messageqcpp::ByteStream& bs, std::string& err)
2255 {
2256     uint8_t tmp8, rc = 0;
2257     err.clear();
2258     //set hwm for autocommit off
2259     uint32_t tmp32, tableOid;
2260     int txnID;
2261     bool isAutoCommitOn;
2262 
2263     bs >> tmp32;
2264     txnID = tmp32;
2265     bs >> tmp8;
2266     isAutoCommitOn = (tmp8 != 0);
2267     bs >> tmp32;
2268     tableOid = tmp32;
2269     bs >> tmp8;
2270     //cout << "processBatchInsertHwm: tranid:isAutoCommitOn = " <<txnID <<":"<< (int)isAutoCommitOn << endl;
2271     std::vector<BRM::FileInfo> files;
2272     std::vector<BRM::OID_t>  oidsToFlush;
2273 
2274     BRM::FileInfo aFile;
2275     //BRM::FileInfo curFile;
2276     TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);
2277     ColsExtsInfoMap colsExtsInfoMap = aTbaleMetaData->getColsExtsInfoMap();
2278     ColsExtsInfoMap::iterator it = colsExtsInfoMap.begin();
2279     ColExtsInfo::iterator aIt;
2280     CalpontSystemCatalog::RIDList ridList;
2281     CalpontSystemCatalog::ROPair roPair;
2282     std::vector<DBRootExtentInfo> colDBRootExtentInfo;
2283     DBRootExtentInfo aExtentInfo;
2284 
2285     while (it != colsExtsInfoMap.end())
2286     {
2287         aIt = (it->second).begin();
2288         aFile.oid = it->first;
2289         oidsToFlush.push_back(aFile.oid);
2290         roPair.objnum = aFile.oid;
2291         aExtentInfo.fPartition = 0;
2292         aExtentInfo.fDbRoot = 0;
2293         aExtentInfo.fSegment = 0;
2294         aExtentInfo.fLocalHwm = 0;
2295         bool isDict = false;
2296 
2297         while (aIt != (it->second).end())
2298         {
2299             aFile.partitionNum = aIt->partNum;
2300             aFile.dbRoot = aIt->dbRoot;
2301             aFile.segmentNum = aIt->segNum;
2302             aFile.compType = aIt->compType;
2303             files.push_back(aFile);
2304 
2305             if (!aIt->isDict)
2306             {
2307                 if ((aIt->partNum > aExtentInfo.fPartition) || ((aIt->partNum == aExtentInfo.fPartition) && (aIt->segNum > aExtentInfo.fSegment)) ||
2308                         ((aIt->partNum == aExtentInfo.fPartition) && (aIt->segNum == aExtentInfo.fSegment) && (aIt->segNum > aExtentInfo.fLocalHwm )))
2309                 {
2310                     aExtentInfo.fPartition = aIt->partNum;
2311                     aExtentInfo.fDbRoot = aIt->dbRoot;
2312                     aExtentInfo.fSegment = aIt->segNum;
2313                     aExtentInfo.fLocalHwm = aIt->hwm;
2314                 }
2315             }
2316             else
2317             {
2318                 isDict = true;
2319             }
2320 
2321             aIt++;
2322         }
2323 
2324         if (!isDict)
2325         {
2326             ridList.push_back(roPair);
2327             colDBRootExtentInfo.push_back(aExtentInfo);
2328         }
2329 
2330         it++;
2331     }
2332 
2333     //@Bug 5996. Validate hwm before set them
2334     boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(0);
2335     systemCatalogPtr->identity(CalpontSystemCatalog::EC);
2336 
2337     try
2338     {
2339         CalpontSystemCatalog::TableName tableName = systemCatalogPtr->tableName(tableOid);
2340         ridList = systemCatalogPtr->columnRIDs(tableName);
2341     }
2342     catch (exception& ex)
2343     {
2344         err = ex.what();
2345         rc = 1;
2346         TableMetaData::removeTableMetaData(tableOid);
2347 
2348         fIsFirstBatchPm = true;
2349         //cout << "flush files when autocommit off" << endl;
2350         fWEWrapper.setIsInsert(true);
2351         fWEWrapper.setBulkFlag(true);
2352         return rc;
2353     }
2354 
2355     rc = validateColumnHWMs(ridList, systemCatalogPtr, colDBRootExtentInfo, "Ending");
2356 
2357     if ( rc != 0)
2358     {
2359         WErrorCodes ec;
2360         err = ec.errorString(rc);
2361         err += " Check err.log for detailed information.";
2362         TableMetaData::removeTableMetaData(tableOid);
2363 
2364         fIsFirstBatchPm = true;
2365         fWEWrapper.setIsInsert(true);
2366         fWEWrapper.setBulkFlag(true);
2367         rc = 1;
2368         return rc;
2369     }
2370 
2371     try
2372     {
2373         if (isAutoCommitOn)
2374         {
2375             bs.restart();
2376 
2377             if (fWEWrapper.getIsInsert())
2378             {
2379                 // @bug5333, up to here, rc == 0, but flushchunk may fail.
2380                 rc = processBatchInsertHwmFlushChunks(tableOid, txnID, files, oidsToFlush, err);
2381             }
2382 
2383             if ( tmp8 != 0)
2384                 TableMetaData::removeTableMetaData(tableOid);
2385 
2386             return rc; // will set hwm with version commit.
2387         }
2388     }
2389     catch (exception& ex)
2390     {
2391         err = ex.what();
2392         rc = 1;
2393         return rc;
2394     }
2395 
2396     // Handle case where isAutoCommitOn is false
2397     BRM::DBRM dbrm;
2398     //cout << " In processBatchInsertHwm setting hwm" << endl;
2399     std::vector<BRM::BulkSetHWMArg> setHWMArgs;
2400     it = colsExtsInfoMap.begin();
2401     BulkSetHWMArg aArg;
2402 
2403     while (it != colsExtsInfoMap.end())
2404     {
2405         aIt = (it->second).begin();
2406         aArg.oid = it->first;
2407 
2408         //cout << "for oid " << aArg.oid << endl;
2409         while (aIt != (it->second).end())
2410         {
2411             aArg.partNum = aIt->partNum;
2412             aArg.segNum = aIt->segNum;
2413             aArg.hwm = aIt->hwm;
2414 
2415             //@Bug 6029 dictionary store files already set hwm.
2416             if (!aIt->isDict)
2417                 setHWMArgs.push_back(aArg);
2418 
2419             aIt++;
2420         }
2421 
2422         it++;
2423     }
2424 
2425     TableMetaData::removeTableMetaData(tableOid);
2426 
2427     fIsFirstBatchPm = true;
2428     //cout << "flush files when autocommit off" << endl;
2429     fWEWrapper.setIsInsert(true);
2430     fWEWrapper.setBulkFlag(true);
2431 
2432     rc = processBatchInsertHwmFlushChunks(tableOid, txnID,
2433                                           files, oidsToFlush, err);
2434     bs.restart();
2435 
2436     try
2437     {
2438         serializeInlineVector (bs, setHWMArgs);
2439     }
2440     catch (exception& ex)
2441     {
2442         // Append to errmsg in case we already have an error
2443         if (err.length() > 0)
2444             err += "; ";
2445 
2446         err += ex.what();
2447         rc = 1;
2448         return rc;
2449     }
2450 
2451     //cout << "flush is called for transaction " << txnID << endl;
2452 
2453     return rc;
2454 }
2455 
2456 //------------------------------------------------------------------------------
2457 // Flush chunks for the specified table and transaction.
2458 // Also confirms changes to DB files (for hdfs).
2459 // files vector represents list of files to be purged from PrimProc cache.
2460 // oid2ToFlush  represents list of oids  to be flushed from PrimProc cache.
2461 // Afterwords, the following attributes are reset as follows:
2462 //   fWEWrapper.setIsInsert(false);
2463 //   fWEWrapper.setBulkFlag(false);
2464 //   fIsFirstBatchPm = true;
2465 // returns 0 for success; returns 1 if error occurs
2466 //------------------------------------------------------------------------------
processBatchInsertHwmFlushChunks(uint32_t tblOid,int txnID,const std::vector<BRM::FileInfo> & files,const std::vector<BRM::OID_t> & oidsToFlush,std::string & err)2467 uint8_t WE_DMLCommandProc::processBatchInsertHwmFlushChunks(
2468     uint32_t tblOid, int txnID,
2469     const std::vector<BRM::FileInfo>& files,
2470     const std::vector<BRM::OID_t>& oidsToFlush,
2471     std::string& err)
2472 {
2473     uint8_t rc = 0;
2474     std::map<uint32_t, uint32_t>       oids;
2475     CalpontSystemCatalog::TableName   aTableName;
2476     CalpontSystemCatalog::RIDList     ridList;
2477     CalpontSystemCatalog::DictOIDList dictOids;
2478 
2479     try
2480     {
2481         boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
2482             CalpontSystemCatalog::makeCalpontSystemCatalog(txnID);
2483         aTableName = systemCatalogPtr->tableName(tblOid);
2484         ridList    = systemCatalogPtr->columnRIDs(aTableName, true);
2485         dictOids   = systemCatalogPtr->dictOIDs(aTableName);
2486     }
2487     catch (exception& ex)
2488     {
2489         std::ostringstream ossErr;
2490         ossErr << "System Catalog error for table OID " << tblOid;
2491 
2492         // Include tbl name in msg unless exception occurred before we got it
2493         if (aTableName.table.length() > 0)
2494             ossErr << '(' << aTableName << ')';
2495 
2496         ossErr << "; " << ex.what();
2497         err = ossErr.str();
2498         rc = 1;
2499         return rc;
2500     }
2501 
2502     for (unsigned i = 0; i < ridList.size(); i++)
2503     {
2504         oids[ridList[i].objnum] = ridList[i].objnum;
2505     }
2506 
2507     for (unsigned i = 0; i < dictOids.size(); i++)
2508     {
2509         oids[dictOids[i].dictOID] = dictOids[i].dictOID;
2510     }
2511 
2512     fWEWrapper.setTransId(txnID);
2513 
2514     // @bug5333, up to here, rc == 0, but flushchunk may fail.
2515     rc = fWEWrapper.flushChunks(0, oids);
2516 
2517     if (rc == NO_ERROR)
2518     {
2519         // Confirm changes to db files "only" if no error up to this point
2520         if (idbdatafile::IDBPolicy::useHdfs())
2521         {
2522             std::string eMsg;
2523             ConfirmHdfsDbFile confirmHdfs;
2524             int confirmDbRc = confirmHdfs.confirmDbFileListFromMetaFile(
2525                                   tblOid, eMsg);
2526 
2527             if (confirmDbRc == NO_ERROR)
2528             {
2529                 int endDbRc = confirmHdfs.endDbFileListFromMetaFile(
2530                                   tblOid, true, eMsg);
2531 
2532                 if (endDbRc != NO_ERROR)
2533                 {
2534                     // Might want to log this error, but don't think we
2535                     // need to report as fatal, as all changes were confirmed.
2536                 }
2537 
2538                 if (files.size() > 0)
2539                     cacheutils::purgePrimProcFdCache(files,
2540                                                      Config::getLocalModuleID());
2541 
2542                 cacheutils::flushOIDsFromCache(oidsToFlush);
2543             }
2544             else
2545             {
2546                 ostringstream ossErr;
2547                 ossErr << "Error confirming changes to table " <<
2548                        aTableName << "; " << eMsg;
2549                 err = ossErr.str();
2550                 rc = 1; // reset to 1
2551             }
2552         }
2553     }
2554     else // flushChunks error
2555     {
2556         WErrorCodes ec;
2557         std::ostringstream ossErr;
2558         ossErr << "Error flushing chunks for table " << aTableName <<
2559                "; " << ec.errorString(rc);
2560         err = ossErr.str();
2561         rc = 1; // reset to 1
2562     }
2563 
2564     fWEWrapper.setIsInsert(false);
2565     fWEWrapper.setBulkFlag(false);
2566     fIsFirstBatchPm = true;
2567 
2568     return rc;
2569 }
2570 
commitBatchAutoOff(messageqcpp::ByteStream & bs,std::string & err)2571 uint8_t WE_DMLCommandProc::commitBatchAutoOff(messageqcpp::ByteStream& bs, std::string& err)
2572 {
2573     uint8_t rc = 0;
2574     //commit all versioned blocks, set hwm, update casual partition
2575     return rc;
2576 }
2577 
rollbackBatchAutoOn(messageqcpp::ByteStream & bs,std::string & err)2578 uint8_t WE_DMLCommandProc::rollbackBatchAutoOn(messageqcpp::ByteStream& bs, std::string& err)
2579 {
2580     uint8_t rc = 0;
2581     uint32_t tmp32, tableOid, sessionID;
2582     uint64_t lockID;
2583     bs >> sessionID;
2584     bs >> lockID;
2585     bs >> tmp32;
2586     tableOid = tmp32;
2587     //Bulkrollback
2588     boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
2589         CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
2590     CalpontSystemCatalog::TableName aTableName;
2591 
2592     try
2593     {
2594         aTableName = systemCatalogPtr->tableName(tableOid);
2595     }
2596     catch ( ... )
2597     {
2598         err = "No such table for oid " + tableOid;
2599         rc = 1;
2600         return rc;
2601     }
2602 
2603     string table = aTableName.schema + "." + aTableName.table;
2604     string applName ("BatchInsert");
2605     rc = fWEWrapper.bulkRollback(tableOid, lockID, table, applName, false, err);
2606     fIsFirstBatchPm = true;
2607     TableMetaData::removeTableMetaData(tableOid);
2608     return rc;
2609 }
2610 
rollbackBatchAutoOff(messageqcpp::ByteStream & bs,std::string & err)2611 uint8_t WE_DMLCommandProc::rollbackBatchAutoOff(messageqcpp::ByteStream& bs, std::string& err)
2612 {
2613     uint8_t rc = 0;
2614     //Rollbacked all versioned blocks
2615     return rc;
2616 }
processUpdate(messageqcpp::ByteStream & bs,std::string & err,ByteStream::quadbyte & PMId,uint64_t & blocksChanged)2617 uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs,
2618         std::string& err,
2619         ByteStream::quadbyte& PMId,
2620         uint64_t& blocksChanged)
2621 {
2622     uint8_t rc = 0;
2623     //cout << " In processUpdate" << endl;
2624     uint32_t tmp32, sessionID;
2625     TxnID txnId;
2626     bs >> PMId;
2627     bs >> tmp32;
2628     txnId = tmp32;
2629     fWEWrapper.setIsInsert(false);
2630     fWEWrapper.setBulkFlag(false);
2631     fWEWrapper.setTransId(txnId);
2632 
2633     if (!rowGroups[txnId]) //meta data
2634     {
2635         rowGroups[txnId] = new rowgroup::RowGroup();
2636         rowGroups[txnId]->deserialize(bs);
2637         uint8_t pkgType;
2638         bs >> pkgType;
2639         cpackages[txnId].read(bs);
2640         //cout << "Processed meta data in update" << endl;
2641 
2642         rc = fWEWrapper.startTransaction(txnId);
2643 
2644         if (rc != NO_ERROR)
2645         {
2646             WErrorCodes ec;
2647             err = ec.errorString(rc);
2648         }
2649 
2650         return rc;
2651     }
2652 
2653     bool pushWarning = false;
2654     rowgroup::RGData rgData;
2655     rgData.deserialize(bs);
2656     rowGroups[txnId]->setData(&rgData);
2657     //rowGroups[txnId]->setData(const_cast<uint8_t*>(bs.buf()));
2658     //get rows and values
2659     rowgroup::Row row;
2660     rowGroups[txnId]->initRow(&row);
2661     string value("");
2662     uint32_t rowsThisRowgroup = rowGroups[txnId]->getRowCount();
2663     uint32_t columnsSelected = rowGroups[txnId]->getColumnCount();
2664     std::vector<execplan::CalpontSystemCatalog::ColDataType> fetchColTypes =  rowGroups[txnId]->getColTypes();
2665     std::vector<uint32_t> fetchColScales = rowGroups[txnId]->getScale();
2666     std::vector<uint32_t> fetchColColwidths;
2667 
2668     for (uint32_t i = 0; i < columnsSelected; i++)
2669     {
2670         fetchColColwidths.push_back(rowGroups[txnId]->getColumnWidth(i));
2671     }
2672 
2673     WriteEngine::ColTupleList aColList;
2674     WriteEngine::ColTuple colTuple;
2675     WriteEngine::ColStructList colStructList;
2676     WriteEngine::ColStruct colStruct;
2677     WriteEngine::ColValueList colValueList;
2678     WriteEngine::RIDList  rowIDLists;
2679 
2680     WriteEngine::DctnryStructList dctnryStructList;
2681     WriteEngine::DctnryStruct dctnryStruct;
2682     WriteEngine::DctnryValueList dctnryValueList;
2683 
2684     CalpontSystemCatalog::TableName tableName;
2685     CalpontSystemCatalog::TableColName tableColName;
2686     DMLTable* tablePtr =  cpackages[txnId].get_Table();
2687     RowList rows = tablePtr->get_RowList();
2688     dmlpackage::ColumnList columnsUpdated = rows[0]->get_ColumnList();
2689     tableColName.table = tableName.table = tablePtr->get_TableName();
2690     tableColName.schema = tableName.schema = tablePtr->get_SchemaName();
2691     tableColName.column  = columnsUpdated[0]->get_Name();
2692 
2693     sessionID = cpackages[txnId].get_SessionID();
2694 
2695     boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
2696         CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
2697     CalpontSystemCatalog::OID oid = 0;
2698     CalpontSystemCatalog::ROPair tableRO;
2699 
2700     std::string timeZone = cpackages[txnId].get_TimeZone();
2701 
2702     try
2703     {
2704         tableRO =  systemCatalogPtr->tableRID(tableName);
2705         oid = systemCatalogPtr->lookupOID(tableColName);
2706     }
2707     catch  (std::exception& ex)
2708     {
2709         rc = 1;
2710         ostringstream oss;
2711         oss << "lookupOID got exception " << ex.what() << " with column " << tableColName.schema << "." + tableColName.table << "." << tableColName.column;
2712         err = oss.str();
2713     }
2714     catch ( ... )
2715     {
2716         rc = 1;
2717         ostringstream oss;
2718         oss << "lookupOID got unknown exception with column " << tableColName.schema << "." << tableColName.table << "." << tableColName.column;
2719         err =  oss.str();
2720     }
2721 
2722     if (rc != 0)
2723         return rc;
2724 
2725     rowGroups[txnId]->getRow(0, &row);
2726     CalpontSystemCatalog::RID rid = row.getRid();
2727     uint16_t dbRoot, segment, blockNum;
2728     uint32_t partition;
2729     uint8_t extentNum;
2730     //Get the file information from rowgroup
2731     dbRoot = rowGroups[txnId]->getDBRoot();
2732     rowGroups[txnId]->getLocation(&partition, &segment, &extentNum, &blockNum);
2733     colStruct.fColPartition = partition;
2734     colStruct.fColSegment = segment;
2735     colStruct.fColDbRoot = dbRoot;
2736     dctnryStruct.fColPartition = partition;
2737     dctnryStruct.fColSegment = segment;
2738     dctnryStruct.fColDbRoot = dbRoot;
2739     TableMetaData* aTableMetaData = TableMetaData::makeTableMetaData(tableRO.objnum);
2740     //Build to be updated column structure and values
2741     int error = 0;
2742     unsigned fetchColPos = 0;
2743     bool ridsFetched = false;
2744     bool isNull = false;
2745     boost::any datavalue;
2746     int64_t intColVal = 0;
2747     //timer.start("fetch values");
2748     std::vector<string> colNames;
2749 
2750     // for query stats
2751     boost::scoped_array<CalpontSystemCatalog::ColType> colTypes(new CalpontSystemCatalog::ColType[columnsUpdated.size()]);
2752     boost::scoped_array<int> preBlkNums(new int[columnsUpdated.size()]);
2753     boost::scoped_array<OID> oids(new OID[columnsUpdated.size()]);
2754 
2755     BRMWrapper::setUseVb(true);
2756     for (unsigned int j = 0; j < columnsUpdated.size(); j++)
2757     {
2758         //timer.start("lookupsyscat");
2759         tableColName.column  = columnsUpdated[j]->get_Name();
2760 
2761         try
2762         {
2763             oids[j] = systemCatalogPtr->lookupOID(tableColName);
2764             colTypes[j] = systemCatalogPtr->colType(oids[j]);
2765         }
2766         catch  (std::exception& ex)
2767         {
2768             rc = 1;
2769             ostringstream oss;
2770             oss << "colType got exception " << ex.what() << " with column oid " << oid;
2771             err = oss.str();
2772         }
2773         catch ( ... )
2774         {
2775             rc = 1;
2776             ostringstream oss;
2777             oss << "colType got unknown exception with column oid " << oid;
2778             err = oss.str();
2779         }
2780 
2781         if (rc != 0)
2782             return rc;
2783 
2784         preBlkNums[j] = -1;
2785     }
2786 
2787     for (unsigned int j = 0; j < columnsUpdated.size(); j++)
2788     {
2789         /*		WriteEngine::ColTupleList colTupleList;
2790         		//timer.start("lookupsyscat");
2791         		tableColName.column  = columnsUpdated[j]->get_Name();
2792         		try
2793         		{
2794         			oid = systemCatalogPtr->lookupOID(tableColName);
2795         		}
2796         		catch  (std::exception& ex)
2797         		{
2798         			rc = 1;
2799         			ostringstream oss;
2800         			oss << "lookupOID got exception " << ex.what() << " with column " << tableColName.schema << "." << tableColName.table << "." << tableColName.column;
2801         			err = oss.str();
2802         		}
2803         		catch ( ... )
2804         		{
2805         			rc = 1;
2806         			ostringstream oss;
2807         			oss <<  "lookupOID got unknown exception with column " << tableColName.schema << "." << tableColName.table << "." << tableColName.column;
2808         			err = oss.str();
2809         		}
2810 
2811         		if (rc != 0)
2812         			return rc;
2813 
2814         		CalpontSystemCatalog::ColType colType;
2815         		try
2816         		{
2817         			colType = systemCatalogPtr->colType(oid);
2818         		}
2819         		catch  (std::exception& ex)
2820         		{
2821         			rc = 1;
2822         			ostringstream oss;
2823         			oss << "colType got exception " << ex.what() << " with column oid " << oid;
2824         			err = oss.str();
2825         		}
2826         		catch ( ... )
2827         		{
2828         			rc = 1;
2829         			ostringstream oss;
2830         			oss << "colType got unknown exception with column oid " << oid;
2831         			err = oss.str();
2832         		}
2833 
2834         		if (rc !=0)
2835         			return rc;
2836         		*/
2837         WriteEngine::ColTupleList colTupleList;
2838         CalpontSystemCatalog::ColType colType = colTypes[j];
2839         oid = oids[j];
2840         colStruct.dataOid = oid;
2841         colStruct.colDataType = colType.colDataType;
2842         colStruct.tokenFlag = false;
2843         colStruct.fCompressionType = colType.compressionType;
2844         tableColName.column  = columnsUpdated[j]->get_Name();
2845 
2846         if ( !ridsFetched)
2847         {
2848             // querystats
2849             uint64_t relativeRID = 0;
2850 
2851             for (unsigned i = 0; i < rowsThisRowgroup; i++)
2852             {
2853                 rowGroups[txnId]->getRow(i, &row);
2854                 rid = row.getRid();
2855                 relativeRID = rid - rowGroups[txnId]->getBaseRid();
2856                 rid = relativeRID;
2857                 convertToRelativeRid (rid, extentNum, blockNum);
2858                 rowIDLists.push_back(rid);
2859                 uint32_t colWidth = (colTypes[j].colWidth > 8 ? 8 : colTypes[j].colWidth);
2860 		int rrid = (int) relativeRID / (BYTE_PER_BLOCK / colWidth);
2861                 // populate stats.blocksChanged
2862 		if (rrid > preBlkNums[j])
2863                 {
2864 			preBlkNums[j] = rrid ;
2865                         blocksChanged++;
2866 		}
2867 
2868             }
2869 
2870             ridsFetched = true;
2871         }
2872 
2873         bool pushWarn = false;
2874         bool nameNeeded = false;
2875 
2876         if (isDictCol(colType))
2877         {
2878             colStruct.colWidth = 8;
2879             colStruct.tokenFlag = true;
2880             dctnryStruct.dctnryOid = colType.ddn.dictOID;
2881             dctnryStruct.columnOid = colStruct.dataOid;
2882             dctnryStruct.fCompressionType = colType.compressionType;
2883             dctnryStruct.colWidth = colType.colWidth;
2884 
2885             if (NO_ERROR != (error = fWEWrapper.openDctnry (txnId, dctnryStruct, false))) // @bug 5572 HDFS tmp file
2886             {
2887                 WErrorCodes ec;
2888                 err = ec.errorString(error);
2889                 rc = error;
2890                 return rc;
2891             }
2892 
2893             ColExtsInfo aColExtsInfo = aTableMetaData->getColExtsInfo(dctnryStruct.dctnryOid);
2894             ColExtsInfo::iterator it = aColExtsInfo.begin();
2895 
2896             while (it != aColExtsInfo.end())
2897             {
2898                 if ((it->dbRoot == dctnryStruct.fColDbRoot) && (it->partNum == dctnryStruct.fColPartition) && (it->segNum == dctnryStruct.fColSegment))
2899                     break;
2900 
2901                 it++;
2902             }
2903 
2904             if (it == aColExtsInfo.end()) //add this one to the list
2905             {
2906                 ColExtInfo aExt;
2907                 aExt.dbRoot = dctnryStruct.fColDbRoot;
2908                 aExt.partNum = dctnryStruct.fColPartition;
2909                 aExt.segNum = dctnryStruct.fColSegment;
2910                 aExt.compType = dctnryStruct.fCompressionType;
2911                 aExt.isDict = true;
2912                 aColExtsInfo.push_back(aExt);
2913             }
2914 
2915             aTableMetaData->setColExtsInfo(dctnryStruct.dctnryOid, aColExtsInfo);
2916 
2917 
2918             if (columnsUpdated[j]->get_isFromCol())
2919             {
2920                 for (unsigned i = 0; i < rowsThisRowgroup; i++)
2921                 {
2922 
2923                     rowGroups[txnId]->getRow(i, &row);
2924 
2925                     if (row.isNullValue(fetchColPos))
2926                     {
2927                         if ((colType.defaultValue.length() <= 0) && (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT))
2928                         {
2929                             rc = 1;
2930                             Message::Args args;
2931                             args.add(tableColName.column);
2932                             err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
2933                             return rc;
2934                         }
2935                         else if (colType.defaultValue.length() > 0)
2936                         {
2937                             value = colType.defaultValue;
2938 
2939                             if (value.length() > (unsigned int)colType.colWidth)
2940                             {
2941                                 value = value.substr(0, colType.colWidth);
2942                                 pushWarn = true;
2943 
2944                                 if (!pushWarning)
2945                                 {
2946                                     pushWarning = true;
2947                                 }
2948 
2949                                 if (pushWarn)
2950                                     nameNeeded = true;
2951                             }
2952 
2953                             WriteEngine::DctnryTuple dctTuple;
2954                             dctTuple.sigValue = (unsigned char*)value.c_str();
2955                             dctTuple.sigSize = value.length();
2956                             dctTuple.isNull = false;
2957 
2958                             error = fWEWrapper.tokenize(txnId, dctTuple, colType.compressionType);
2959 
2960                             if (error != NO_ERROR)
2961                             {
2962                                 fWEWrapper.closeDctnry(txnId, colType.compressionType);
2963                                 return false;
2964                             }
2965 
2966                             colTuple.data = dctTuple.token;
2967                             colTupleList.push_back (colTuple);
2968                         }
2969                         else
2970                         {
2971                             WriteEngine::Token nullToken;
2972                             colTuple.data = nullToken;
2973                             colTupleList.push_back (colTuple);
2974                         }
2975 
2976                         continue;
2977                     }
2978 
2979                     switch (fetchColTypes[fetchColPos])
2980                     {
2981                         case CalpontSystemCatalog::DATE:
2982                         {
2983                             intColVal = row.getUintField<4>(fetchColPos);
2984                             value = DataConvert::dateToString(intColVal);
2985                             break;
2986                         }
2987 
2988                         case CalpontSystemCatalog::DATETIME:
2989                         {
2990                             intColVal = row.getUintField<8>(fetchColPos);
2991                             value = DataConvert::datetimeToString(intColVal, colType.precision);
2992                             break;
2993                         }
2994 
2995                         case CalpontSystemCatalog::TIMESTAMP:
2996                         {
2997                             intColVal = row.getUintField<8>(fetchColPos);
2998                             value = DataConvert::timestampToString(intColVal, timeZone, colType.precision);
2999                             break;
3000                         }
3001 
3002                         case CalpontSystemCatalog::TIME:
3003                         {
3004                             intColVal = row.getIntField<8>(fetchColPos);
3005                             value = DataConvert::timeToString(intColVal, colType.precision);
3006                             break;
3007                         }
3008 
3009                         case CalpontSystemCatalog::CHAR:
3010                         case CalpontSystemCatalog::VARCHAR:
3011                         {
3012                             value = row.getStringField(fetchColPos);
3013                             unsigned i = strlen(value.c_str());
3014                             value = value.substr(0, i);
3015                             break;
3016                         }
3017 
3018                         case CalpontSystemCatalog::VARBINARY:
3019                         case CalpontSystemCatalog::BLOB:
3020                         case CalpontSystemCatalog::TEXT:
3021                         {
3022                             value = row.getVarBinaryStringField(fetchColPos);
3023                             break;
3024                         }
3025 
3026                         case CalpontSystemCatalog::DECIMAL:
3027                         case CalpontSystemCatalog::UDECIMAL:
3028                         {
3029                             // decimal width > 8 cannot be stored in an integer
3030                             if (fetchColColwidths[fetchColPos] > 8)
3031                             {
3032                                 value = row.getStringField(fetchColPos);
3033                                 unsigned i = strlen(value.c_str());
3034                                 value = value.substr(0, i);
3035                                 break;
3036                             }
3037 
3038                             // else
3039                             //     fall through to integer cases
3040                         }
3041                         /* fall through */
3042 
3043                         case CalpontSystemCatalog::BIGINT:
3044                         case CalpontSystemCatalog::UBIGINT:
3045                         case CalpontSystemCatalog::INT:
3046                         case CalpontSystemCatalog::UINT:
3047                         case CalpontSystemCatalog::MEDINT:
3048                         case CalpontSystemCatalog::UMEDINT:
3049                         case CalpontSystemCatalog::SMALLINT:
3050                         case CalpontSystemCatalog::USMALLINT:
3051                         case CalpontSystemCatalog::TINYINT:
3052                         case CalpontSystemCatalog::UTINYINT:
3053                         {
3054                             {
3055                                 intColVal = row.getIntField(fetchColPos);
3056 
3057                                 if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UDECIMAL
3058                                         && intColVal < 0)
3059                                 {
3060                                     intColVal = 0;
3061                                 }
3062 
3063                                 if (fetchColScales[fetchColPos] <= 0)
3064                                 {
3065                                     ostringstream os;
3066 
3067                                     if (isUnsigned(fetchColTypes[fetchColPos]))
3068                                         os << static_cast<uint64_t>(intColVal);
3069                                     else
3070                                         os << intColVal;
3071 
3072                                     value = os.str();
3073                                 }
3074                                 else
3075                                 {
3076                                     const int ctmp_size = 65 + 1 + 1 + 1;
3077                                     char ctmp[ctmp_size] = {0};
3078                                     DataConvert::decimalToString(
3079                                         intColVal, fetchColScales[fetchColPos],
3080                                         ctmp, ctmp_size, fetchColTypes[fetchColPos]);
3081                                     value = ctmp;  // null termination by decimalToString
3082                                 }
3083                             }
3084                             break;
3085                         }
3086 
3087                         //In this case, we're trying to load a double output column with float data. This is the
3088                         // case when you do sum(floatcol), e.g.
3089                         case CalpontSystemCatalog::FLOAT:
3090                         case CalpontSystemCatalog::UFLOAT:
3091                         {
3092                             float dl = row.getFloatField(fetchColPos);
3093 
3094                             if (dl == std::numeric_limits<float>::infinity())
3095                                 continue;
3096 
3097                             if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UFLOAT && dl < 0.0)
3098                             {
3099                                 dl = 0.0;
3100                             }
3101 
3102                             ostringstream os;
3103                             //@Bug 3350 fix the precision.
3104                             os << setprecision(7) << dl;
3105                             value = os.str();
3106                             break;
3107                         }
3108 
3109                         case CalpontSystemCatalog::DOUBLE:
3110                         case CalpontSystemCatalog::UDOUBLE:
3111                         {
3112                             double dl = row.getDoubleField(fetchColPos);
3113 
3114                             if (dl == std::numeric_limits<double>::infinity())
3115                                 continue;
3116 
3117                             if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UDOUBLE && dl < 0.0)
3118                             {
3119                                 dl = 0.0;
3120                             }
3121 
3122                             ostringstream os;
3123                             //@Bug 3350 fix the precision.
3124                             os << setprecision(16) << dl;
3125                             value = os.str();
3126                             break;
3127                         }
3128 
3129                         case CalpontSystemCatalog::LONGDOUBLE:
3130                         {
3131                             long double dll = row.getLongDoubleField(fetchColPos);
3132 
3133                             if (dll == std::numeric_limits<long double>::infinity())
3134                                 continue;
3135 
3136                             ostringstream os;
3137                             //@Bug 3350 fix the precision.
3138                             os << setprecision(19) << dll;
3139                             value = os.str();
3140                             break;
3141                         }
3142 
3143                         default:	// treat as int64
3144                         {
3145                             ostringstream os;
3146                             intColVal = row.getUintField<8>(fetchColPos);
3147                             os << intColVal;
3148                             value = os.str();
3149                             break;
3150                         }
3151                     }
3152 
3153                     uint32_t funcScale = columnsUpdated[j]->get_funcScale();
3154 
3155                     if (funcScale != 0)
3156                     {
3157                         string::size_type pos = value.find_first_of("."); //decimal point
3158 
3159                         if ( pos >= value.length() )
3160                             value.insert(value.length(), ".");
3161 
3162                         //padding 0 if needed
3163                         pos = value.find_first_of(".");
3164                         uint32_t digitsAfterPoint = value.length() - pos - 1;
3165 
3166                         if (digitsAfterPoint < funcScale)
3167                         {
3168                             for (uint32_t i = 0; i < (funcScale - digitsAfterPoint); i++)
3169                                 value += "0";
3170                         }
3171                     }
3172 
3173                     //check data length
3174                     //trim the string if needed
3175                     if (value.length() > (unsigned int)colType.colWidth)
3176                     {
3177                         value = value.substr(0, colType.colWidth);
3178 
3179                         if (!pushWarn)
3180                             pushWarn = true;
3181 
3182                         if (!pushWarning)
3183                             pushWarning = true;
3184 
3185                         if (pushWarn)
3186                             nameNeeded = true;
3187                     }
3188 
3189                     WriteEngine::DctnryTuple dctTuple;
3190                     dctTuple.sigValue = (unsigned char*)value.c_str();
3191                     dctTuple.sigSize = value.length();
3192                     dctTuple.isNull = false;
3193 
3194                     error = fWEWrapper.tokenize(txnId, dctTuple, colType.compressionType);
3195 
3196                     if (error != NO_ERROR)
3197                     {
3198                         fWEWrapper.closeDctnry(txnId, colType.compressionType);
3199                         rc = error;
3200                         WErrorCodes ec;
3201                         err = ec.errorString(error);
3202                         return rc;
3203                     }
3204 
3205                     colTuple.data = dctTuple.token;
3206                     colTupleList.push_back (colTuple);
3207                 }
3208 
3209                 if (colType.compressionType == 0)
3210                     fWEWrapper.closeDctnry(txnId, colType.compressionType, true);
3211                 else
3212                     fWEWrapper.closeDctnry(txnId, colType.compressionType, false);
3213 
3214                 fetchColPos++;
3215             }
3216             else //constant
3217             {
3218                 if (columnsUpdated[j]->get_isnull())
3219                 {
3220                     if ((colType.defaultValue.length() <= 0) && (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT))
3221                     {
3222                         rc = 1;
3223                         Message::Args args;
3224                         args.add(tableColName.column);
3225                         err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
3226                         return rc;
3227                     }
3228                     else if (colType.defaultValue.length() > 0)
3229                     {
3230                         value = colType.defaultValue;
3231 
3232                         if (value.length() > (unsigned int)colType.colWidth)
3233                         {
3234                             value = value.substr(0, colType.colWidth);
3235                             pushWarn = true;
3236 
3237                             if (!pushWarning)
3238                             {
3239                                 pushWarning = true;
3240                             }
3241 
3242                             if (pushWarn)
3243                                 nameNeeded = true;
3244                         }
3245 
3246                         WriteEngine::DctnryTuple dctTuple;
3247                         dctTuple.sigValue = (unsigned char*)value.c_str();
3248                         dctTuple.sigSize = value.length();
3249                         dctTuple.isNull = false;
3250                         error = fWEWrapper.tokenize(txnId, dctTuple, colType.compressionType);
3251 
3252                         if (error != NO_ERROR)
3253                         {
3254                             fWEWrapper.closeDctnry(txnId, colType.compressionType);
3255                             rc = error;
3256                             WErrorCodes ec;
3257                             err = ec.errorString(error);
3258                             return rc;
3259                         }
3260 
3261                         colTuple.data = dctTuple.token;
3262 
3263                         if (colType.compressionType == 0)
3264                             fWEWrapper.closeDctnry(txnId, colType.compressionType, true);
3265                         else
3266                             fWEWrapper.closeDctnry(txnId, colType.compressionType, false); // Constant only need to tokenize once.
3267                     }
3268                     else
3269                     {
3270                         WriteEngine::Token nullToken;
3271                         colTuple.data = nullToken;
3272                     }
3273                 }
3274                 else
3275                 {
3276                     value = columnsUpdated[j]->get_Data();
3277 
3278                     if (value.length() > (unsigned int)colType.colWidth)
3279                     {
3280                         value = value.substr(0, colType.colWidth);
3281                         pushWarn = true;
3282 
3283                         if (!pushWarning)
3284                         {
3285                             pushWarning = true;
3286                         }
3287 
3288                         if (pushWarn)
3289                             nameNeeded = true;
3290                     }
3291 
3292                     WriteEngine::DctnryTuple dctTuple;
3293                     dctTuple.sigValue = (unsigned char*)value.c_str();
3294                     dctTuple.sigSize = value.length();
3295                     dctTuple.isNull = false;
3296                     error = fWEWrapper.tokenize(txnId, dctTuple, colType.compressionType);
3297 
3298                     if (error != NO_ERROR)
3299                     {
3300                         fWEWrapper.closeDctnry(txnId, colType.compressionType);
3301                         rc = error;
3302                         WErrorCodes ec;
3303                         err = ec.errorString(error);
3304                         return rc;
3305                     }
3306 
3307                     colTuple.data = dctTuple.token;
3308 
3309                     if (colType.compressionType == 0)
3310                         fWEWrapper.closeDctnry(txnId, colType.compressionType, true);
3311                     else
3312                         fWEWrapper.closeDctnry(txnId, colType.compressionType, false); // Constant only need to tokenize once.
3313                 }
3314 
3315                 for (unsigned row = 0; row < rowsThisRowgroup; row++)
3316                     colTupleList.push_back (colTuple);
3317             }
3318         }
3319         else //Non dictionary column
3320         {
3321             colStruct.colWidth = colType.colWidth;
3322 
3323             if (columnsUpdated[j]->get_isFromCol())
3324             {
3325                 for (unsigned i = 0; i < rowsThisRowgroup; i++)
3326                 {
3327                     rowGroups[txnId]->getRow(i, &row);
3328 
3329                     if (row.isNullValue(fetchColPos))
3330                     {
3331                         isNull = true;
3332                         value = "";
3333                     }
3334                     else
3335                     {
3336                         isNull = false;
3337 
3338                         switch (fetchColTypes[fetchColPos])
3339                         {
3340                             case CalpontSystemCatalog::DATE:
3341                             {
3342                                 intColVal = row.getUintField<4>(fetchColPos);
3343                                 value = DataConvert::dateToString(intColVal);
3344                                 break;
3345                             }
3346 
3347                             case CalpontSystemCatalog::DATETIME:
3348                             {
3349                                 intColVal = row.getUintField<8>(fetchColPos);
3350                                 value = DataConvert::datetimeToString(intColVal, colType.precision);
3351                                 break;
3352                             }
3353 
3354                             case CalpontSystemCatalog::TIMESTAMP:
3355                             {
3356                                 intColVal = row.getUintField<8>(fetchColPos);
3357                                 value = DataConvert::timestampToString(intColVal, timeZone, colType.precision);
3358                                 break;
3359                             }
3360 
3361                             case CalpontSystemCatalog::TIME:
3362                             {
3363                                 intColVal = row.getIntField<8>(fetchColPos);
3364                                 value = DataConvert::timeToString(intColVal, colType.precision);
3365                                 break;
3366                             }
3367 
3368                             case CalpontSystemCatalog::CHAR:
3369                             case CalpontSystemCatalog::VARCHAR:
3370                             {
3371                                 value = row.getStringField(fetchColPos);
3372                                 unsigned i = strlen(value.c_str());
3373                                 value = value.substr(0, i);
3374                                 break;
3375                             }
3376 
3377                             case CalpontSystemCatalog::VARBINARY:
3378                             case CalpontSystemCatalog::BLOB:
3379                             case CalpontSystemCatalog::TEXT:
3380                             {
3381                                 value = row.getVarBinaryStringField(fetchColPos);
3382                                 break;
3383                             }
3384 
3385                             case CalpontSystemCatalog::DECIMAL:
3386                             case CalpontSystemCatalog::UDECIMAL:
3387                             {
3388                                 // decimal width > 8 cannot be stored in an integer
3389                                 if (fetchColColwidths[fetchColPos] > 8)
3390                                 {
3391                                     value = row.getStringField(fetchColPos);
3392                                     unsigned i = strlen(value.c_str());
3393                                     value = value.substr(0, i);
3394                                     break;
3395                                 }
3396 
3397                                 // else
3398                                 //     fall through to integer cases
3399                             }
3400                             /* fall through */
3401 
3402                             case CalpontSystemCatalog::BIGINT:
3403                             case CalpontSystemCatalog::UBIGINT:
3404                             case CalpontSystemCatalog::INT:
3405                             case CalpontSystemCatalog::UINT:
3406                             case CalpontSystemCatalog::MEDINT:
3407                             case CalpontSystemCatalog::UMEDINT:
3408                             case CalpontSystemCatalog::SMALLINT:
3409                             case CalpontSystemCatalog::USMALLINT:
3410                             case CalpontSystemCatalog::TINYINT:
3411                             case CalpontSystemCatalog::UTINYINT:
3412                             {
3413                                 {
3414                                     intColVal = row.getIntField(fetchColPos);
3415 
3416                                     if (fetchColTypes[fetchColPos] ==
3417                                             CalpontSystemCatalog::UDECIMAL
3418                                             && intColVal < 0)
3419                                     {
3420                                         intColVal = 0;
3421                                     }
3422 
3423                                     if (fetchColScales[fetchColPos] <= 0)
3424                                     {
3425                                         ostringstream os;
3426 
3427                                         if (isUnsigned(fetchColTypes[fetchColPos]))
3428                                             os << static_cast<uint64_t>(intColVal);
3429                                         else
3430                                             os << intColVal;
3431 
3432                                         value = os.str();
3433                                     }
3434                                     else
3435                                     {
3436                                         const int ctmp_size = 65 + 1 + 1 + 1;
3437                                         char ctmp[ctmp_size] = {0};
3438                                         DataConvert::decimalToString(
3439                                             intColVal, fetchColScales[fetchColPos],
3440                                             ctmp, ctmp_size, fetchColTypes[fetchColPos]);
3441                                         value = ctmp;  // null termination by decimalToString
3442                                     }
3443                                 }
3444                                 break;
3445                             }
3446 
3447                             //In this case, we're trying to load a double output column with float data. This is the
3448                             // case when you do sum(floatcol), e.g.
3449                             case CalpontSystemCatalog::FLOAT:
3450                             case CalpontSystemCatalog::UFLOAT:
3451                             {
3452                                 float dl = row.getFloatField(fetchColPos);
3453 
3454                                 if (dl == std::numeric_limits<float>::infinity())
3455                                     continue;
3456 
3457                                 if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UFLOAT && dl < 0.0)
3458                                 {
3459                                     dl = 0.0;
3460                                 }
3461 
3462                                 ostringstream os;
3463                                 //@Bug 3350 fix the precision.
3464                                 os << setprecision(7) << dl;
3465                                 value = os.str();
3466                                 break;
3467                             }
3468 
3469                             case CalpontSystemCatalog::DOUBLE:
3470                             case CalpontSystemCatalog::UDOUBLE:
3471                             {
3472                                 double dl = row.getDoubleField(fetchColPos);
3473 
3474                                 if (dl == std::numeric_limits<double>::infinity())
3475                                     continue;
3476 
3477                                 if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UDOUBLE && dl < 0.0)
3478                                 {
3479                                     dl = 0.0;
3480                                 }
3481 
3482                                 ostringstream os;
3483                                 //@Bug 3350 fix the precision.
3484                                 os << setprecision(16) << dl;
3485                                 value = os.str();
3486                                 break;
3487                             }
3488 
3489                             case CalpontSystemCatalog::LONGDOUBLE:
3490                             {
3491                                 long double dll = row.getLongDoubleField(fetchColPos);
3492 
3493                                 if (dll == std::numeric_limits<long double>::infinity())
3494                                     continue;
3495 
3496                                 ostringstream os;
3497                                 //@Bug 3350 fix the precision.
3498                                 os << setprecision(19) << dll;
3499                                 value = os.str();
3500                                 break;
3501                             }
3502 
3503                             default:	// treat as int64
3504                             {
3505                                 ostringstream os;
3506                                 intColVal = row.getUintField<8>(fetchColPos);
3507                                 os << intColVal;
3508                                 value = os.str();
3509                                 break;
3510                             }
3511                         }
3512                     }
3513 
3514                     uint32_t funcScale = columnsUpdated[j]->get_funcScale();
3515 
3516                     if (funcScale != 0)
3517                     {
3518                         string::size_type pos = value.find_first_of("."); //decimal point
3519 
3520                         if ( pos >= value.length() )
3521                             value.insert(value.length(), ".");
3522 
3523                         //padding 0 if needed
3524                         pos = value.find_first_of(".");
3525                         uint32_t digitsAfterPoint = value.length() - pos - 1;
3526 
3527                         if (digitsAfterPoint < funcScale)
3528                         {
3529                             for (uint32_t i = 0; i < (funcScale - digitsAfterPoint); i++)
3530                                 value += "0";
3531                         }
3532                     }
3533 
3534                     //Check NOT NULL constraint and default value
3535                     if ((isNull) && (colType.defaultValue.length() <= 0) && (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT))
3536                     {
3537                         rc = 1;
3538                         Message::Args args;
3539                         args.add(tableColName.column);
3540                         err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
3541                         return rc;
3542                     }
3543                     else if ((isNull) && (colType.defaultValue.length() > 0))
3544                     {
3545                         isNull = false;
3546                         bool oneWarn = false;
3547 
3548                         try
3549                         {
3550                             datavalue = DataConvert::convertColumnData(colType, colType.defaultValue, pushWarn, timeZone, isNull, false, false);
3551                         }
3552                         catch (exception&)
3553                         {
3554                             //@Bug 2624. Error out on conversion failure
3555                             rc = 1;
3556                             Message::Args args;
3557                             args.add(string("'") + colType.defaultValue + string("'"));
3558                             err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
3559                         }
3560 
3561                         if ((pushWarn) && (!oneWarn))
3562                             oneWarn = true;
3563 
3564                         colTuple.data = datavalue;
3565                         colTupleList.push_back (colTuple);
3566 
3567                         if (oneWarn)
3568                             pushWarn = true;
3569 
3570                         if (!pushWarning)
3571                         {
3572                             pushWarning = pushWarn;
3573                         }
3574 
3575                         if (pushWarn)
3576                             nameNeeded = true;
3577                     }
3578                     else
3579                     {
3580                         try
3581                         {
3582                             datavalue = DataConvert::convertColumnData(colType, value, pushWarn, timeZone, isNull, false, false);
3583                         }
3584                         catch (exception&)
3585                         {
3586                             //@Bug 2624. Error out on conversion failure
3587                             rc = 1;
3588                             Message::Args args;
3589                             args.add(string("'") + value + string("'"));
3590                             err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
3591                             return rc;
3592                         }
3593 
3594                         colTuple.data = datavalue;
3595                         colTupleList.push_back (colTuple);
3596 
3597                         if (!pushWarning)
3598                         {
3599                             pushWarning = pushWarn;
3600                         }
3601 
3602                         if (pushWarn)
3603                             nameNeeded = true;
3604                     }
3605                 }
3606 
3607                 fetchColPos++;
3608             }
3609             else //constant column
3610             {
3611                 if (columnsUpdated[j]->get_isnull())
3612                 {
3613                     isNull = true;
3614                 }
3615                 else
3616                 {
3617                     isNull = false;
3618                 }
3619 
3620                 string inData (columnsUpdated[j]->get_Data());
3621 
3622                 if (((colType.colDataType == execplan::CalpontSystemCatalog::DATE) && (inData == "0000-00-00")) ||
3623                         ((colType.colDataType == execplan::CalpontSystemCatalog::DATETIME) && (inData == "0000-00-00 00:00:00")) ||
3624                         ((colType.colDataType == execplan::CalpontSystemCatalog::TIMESTAMP) && (inData == "0000-00-00 00:00:00")))
3625                 {
3626                     isNull = true;
3627                 }
3628 
3629                 uint64_t nextVal = 0;
3630 
3631                 if (colType.autoincrement)
3632                 {
3633                     try
3634                     {
3635                         nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
3636                         fDbrm.startAISequence(oid, nextVal, colType.colWidth, colType.colDataType);
3637                     }
3638                     catch (std::exception& ex)
3639                     {
3640                         err = ex.what();
3641                         rc = 1;
3642                         return rc;
3643                     }
3644                 }
3645 
3646                 if (colType.autoincrement && ( isNull || (inData.compare("0") == 0)))
3647                 {
3648                     //reserve nextVal
3649                     try
3650                     {
3651                         bool reserved = fDbrm.getAIRange(oid, rowsThisRowgroup, &nextVal);
3652 
3653                         if (!reserved)
3654                         {
3655                             err = IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT);
3656                             rc = 1;
3657                             return rc;
3658                         }
3659                     }
3660                     catch (std::exception& ex)
3661                     {
3662                         err = ex.what();
3663                         rc = 1;
3664                         return rc;
3665                     }
3666 
3667                     isNull = false;
3668                     bool oneWarn = false;
3669 
3670                     for (unsigned row = 0; row < rowsThisRowgroup; row++)
3671                     {
3672 
3673                         ostringstream oss;
3674                         oss << nextVal++;
3675                         inData = oss.str();
3676 
3677                         try
3678                         {
3679                             datavalue = DataConvert::convertColumnData(colType, inData, pushWarn, timeZone, isNull, false, false);
3680                         }
3681                         catch (exception&)
3682                         {
3683                             //@Bug 2624. Error out on conversion failure
3684                             rc = 1;
3685                             Message::Args args;
3686                             args.add(string("'") + inData + string("'"));
3687                             err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
3688                         }
3689 
3690                         if ((pushWarn) && (!oneWarn))
3691                             oneWarn = true;
3692 
3693                         colTuple.data = datavalue;
3694                         colTupleList.push_back (colTuple);
3695                     }
3696 
3697                     if (oneWarn)
3698                         pushWarn = true;
3699 
3700                     if (!pushWarning)
3701                     {
3702                         pushWarning = pushWarn;
3703                     }
3704 
3705                     if (pushWarn)
3706                         nameNeeded = true;
3707                 }
3708                 else if (isNull && (colType.defaultValue.length() <= 0) && (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT))
3709                 {
3710                     rc = 1;
3711                     Message::Args args;
3712                     args.add(tableColName.column);
3713                     err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
3714                     return rc;
3715                 }
3716                 else if (isNull && (colType.defaultValue.length() > 0))
3717                 {
3718                     isNull = false;
3719                     bool oneWarn = false;
3720 
3721                     for (unsigned row = 0; row < rowsThisRowgroup; row++)
3722                     {
3723                         try
3724                         {
3725                             datavalue = DataConvert::convertColumnData(colType, colType.defaultValue, pushWarn, timeZone, isNull, false, false);
3726                         }
3727                         catch (exception&)
3728                         {
3729                             //@Bug 2624. Error out on conversion failure
3730                             rc = 1;
3731                             Message::Args args;
3732                             args.add(string("'") + colType.defaultValue + string("'"));
3733                             err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
3734                         }
3735 
3736                         if ((pushWarn) && (!oneWarn))
3737                             oneWarn = true;
3738 
3739                         colTuple.data = datavalue;
3740                         colTupleList.push_back (colTuple);
3741                     }
3742 
3743                     if (oneWarn)
3744                         pushWarn = true;
3745 
3746                     if (!pushWarning)
3747                     {
3748                         pushWarning = pushWarn;
3749                     }
3750 
3751                     if (pushWarn)
3752                         nameNeeded = true;
3753                 }
3754                 else
3755                 {
3756                     try
3757                     {
3758                         datavalue = DataConvert::convertColumnData(colType, inData, pushWarn, timeZone, isNull, false, true);
3759                     }
3760                     catch (exception& ex)
3761                     {
3762                         //@Bug 2624. Error out on conversion failure
3763                         rc = 1;
3764                         cout << ex.what() << endl;
3765                         Message::Args args;
3766                         args.add(string("'") + inData + string("'"));
3767                         err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
3768                         return rc;
3769                     }
3770 
3771                     colTuple.data = datavalue;
3772 
3773                     if (!pushWarning)
3774                     {
3775                         pushWarning = pushWarn;
3776                     }
3777 
3778                     if (pushWarn)
3779                         nameNeeded = true;
3780 
3781                     for (unsigned row = 0; row < rowsThisRowgroup; row++)
3782                         colTupleList.push_back (colTuple);
3783                 }
3784             }
3785         }
3786 
3787         if (nameNeeded)
3788         {
3789             colNames.push_back(tableColName.column);
3790         }
3791 
3792         colStructList.push_back(colStruct);
3793         colValueList.push_back (colTupleList);
3794     } //end of bulding values and column structure.
3795 
3796     //timer.stop("fetch values");
3797     if (rowIDLists.size() > 0)
3798     {
3799         error = fWEWrapper.updateColumnRecs(txnId, colStructList, colValueList,  rowIDLists, tableRO.objnum);
3800     }
3801 
3802     if (error != NO_ERROR)
3803     {
3804         rc = error;
3805         WErrorCodes ec;
3806         err = ec.errorString(error);
3807 
3808         if (error == ERR_BRM_DEAD_LOCK)
3809         {
3810             rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR;
3811         }
3812         else if (error == ERR_BRM_VB_OVERFLOW)
3813         {
3814             rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR;
3815             err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW);
3816         }
3817     }
3818 
3819     if (pushWarning)
3820     {
3821         rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
3822         Message::Args args;
3823         string cols = "'" + colNames[0] + "'";
3824 
3825         for (unsigned i = 1; i < colNames.size(); i++)
3826         {
3827             cols = cols + ", " +  "'" + colNames[i] + "'";
3828         }
3829 
3830         args.add(cols);
3831         err = IDBErrorInfo::instance()->errorMsg(WARN_DATA_TRUNC, args);
3832     }
3833 
3834     return rc;
3835 }
3836 
getWrittenLbids(messageqcpp::ByteStream & bs,std::string & err,ByteStream::quadbyte & PMId)3837 uint8_t WE_DMLCommandProc::getWrittenLbids(messageqcpp::ByteStream& bs, std::string& err, ByteStream::quadbyte& PMId)
3838 {
3839     uint8_t rc = 0;
3840     uint32_t txnId;
3841     vector<LBID_t> lbidList;
3842 
3843     bs >> txnId;
3844     std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t>::iterator mapIter;
3845     std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t>  m_txnLBIDMap = fWEWrapper.getTxnMap();
3846 
3847     try
3848     {
3849         mapIter = m_txnLBIDMap.find(txnId);
3850 
3851         if (mapIter != m_txnLBIDMap.end())
3852         {
3853             SP_TxnLBIDRec_t spTxnLBIDRec = (*mapIter).second;
3854             std::tr1::unordered_map<BRM::LBID_t, uint32_t> ::iterator listIter = spTxnLBIDRec->m_LBIDMap.begin();
3855 
3856             while (listIter != spTxnLBIDRec->m_LBIDMap.end())
3857             {
3858                 lbidList.push_back(listIter->first);
3859                 listIter++;
3860             }
3861         }
3862     }
3863     catch (...) {}
3864 
3865     bs.restart();
3866 
3867     try
3868     {
3869         serializeInlineVector (bs, lbidList);
3870     }
3871     catch (exception& ex)
3872     {
3873         // Append to errmsg in case we already have an error
3874         if (err.length() > 0)
3875             err += "; ";
3876 
3877         err += ex.what();
3878         rc = 1;
3879         return rc;
3880     }
3881 
3882     return rc;
3883 }
3884 
processFlushFiles(messageqcpp::ByteStream & bs,std::string & err)3885 uint8_t WE_DMLCommandProc::processFlushFiles(messageqcpp::ByteStream& bs, std::string& err)
3886 {
3887     uint8_t rc = 0;
3888     uint32_t flushCode, txnId, tableOid;
3889     int error;
3890     bs >> flushCode;
3891     bs >> txnId;
3892     bs >> tableOid;
3893     std::map<uint32_t, uint32_t> oids;
3894     CalpontSystemCatalog::TableName aTableName;
3895     CalpontSystemCatalog::RIDList ridList;
3896     boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
3897         CalpontSystemCatalog::makeCalpontSystemCatalog(txnId);
3898     // execplan::CalpontSystemCatalog::ColType colType;
3899     CalpontSystemCatalog::DictOIDList dictOids;
3900 
3901     if (tableOid >= 3000)
3902     {
3903         try
3904         {
3905             aTableName =  systemCatalogPtr->tableName(tableOid);
3906         }
3907         catch ( ... )
3908         {
3909             err = "Systemcatalog error for tableoid " + tableOid;
3910             rc = 1;
3911             return rc;
3912         }
3913 
3914         dictOids = systemCatalogPtr->dictOIDs(aTableName);
3915 
3916         for (unsigned i = 0; i < dictOids.size(); i++)
3917         {
3918             oids[dictOids[i].dictOID] = dictOids[i].dictOID;
3919         }
3920 
3921         //if (dictOids.size() > 0)
3922         //	colType = systemCatalogPtr->colTypeDct(dictOids[0].dictOID);
3923     }
3924 
3925 
3926     fWEWrapper.setIsInsert(false);
3927     fWEWrapper.setBulkFlag(false);
3928     vector<LBID_t> lbidList;
3929 
3930     if (idbdatafile::IDBPolicy::useHdfs())
3931     {
3932         //save the extent info to mark them invalid, after flush, the meta file will be gone.
3933         std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t>::iterator mapIter;
3934         std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t>  m_txnLBIDMap = fWEWrapper.getTxnMap();
3935 
3936         try
3937         {
3938             mapIter = m_txnLBIDMap.find(txnId);
3939 
3940             if (mapIter != m_txnLBIDMap.end())
3941             {
3942                 SP_TxnLBIDRec_t spTxnLBIDRec = (*mapIter).second;
3943                 std::tr1::unordered_map<BRM::LBID_t, uint32_t> ::iterator listIter = spTxnLBIDRec->m_LBIDMap.begin();
3944 
3945                 while (listIter != spTxnLBIDRec->m_LBIDMap.end())
3946                 {
3947                     lbidList.push_back(listIter->first);
3948                     listIter++;
3949                 }
3950             }
3951         }
3952         catch (...) {}
3953     }
3954 
3955     error = fWEWrapper.flushDataFiles(flushCode, txnId, oids);
3956 
3957 //No need to close files, flushDataFile will close them.
3958     //if (((colType.compressionType > 0 ) && (dictOids.size() > 0)) || (idbdatafile::IDBPolicy::useHdfs()))
3959     //	fWEWrapper.closeDctnry(txnId, colType.compressionType, true);
3960     if (error != NO_ERROR)
3961     {
3962         rc = error;
3963         WErrorCodes ec;
3964         err = ec.errorString(error);
3965     }
3966 
3967     //erase rowgroup from the rowGroup map
3968     if (rowGroups[txnId])
3969     {
3970         delete rowGroups[txnId];
3971         rowGroups[txnId] = 0;
3972     }
3973 
3974     TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);
3975     ColsExtsInfoMap colsExtsInfoMap = aTbaleMetaData->getColsExtsInfoMap();
3976     ColsExtsInfoMap::iterator it = colsExtsInfoMap.begin();
3977     ColExtsInfo::iterator aIt;
3978     std::vector<BRM::FileInfo> files;
3979     std::vector<BRM::OID_t>  oidsToFlush;
3980     BRM::FileInfo aFile;
3981 
3982     while (it != colsExtsInfoMap.end())
3983     {
3984         aIt = (it->second).begin();
3985         aFile.oid = it->first;
3986         oidsToFlush.push_back(aFile.oid);
3987 
3988         while (aIt != (it->second).end())
3989         {
3990             aFile.partitionNum = aIt->partNum;
3991             aFile.dbRoot = aIt->dbRoot;
3992             aFile.segmentNum = aIt->segNum;
3993             aFile.compType = aIt->compType;
3994             files.push_back(aFile);
3995             //cout <<"Added to files oid:dbroot:part:seg:compType = " << aFile.oid<<":"<<aFile.dbRoot<<":"<<aFile.partitionNum<<":"<<aFile.segmentNum
3996             //<<":"<<aFile.compType <<endl;
3997             aIt++;
3998         }
3999 
4000         it++;
4001     }
4002 
4003     if (idbdatafile::IDBPolicy::useHdfs())
4004     {
4005         rc = fWEWrapper.confirmTransaction(txnId);
4006         //@Bug 5700. Purge FD cache after file swap
4007         cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());
4008         cacheutils::flushOIDsFromCache(oidsToFlush);
4009         fDbrm.invalidateUncommittedExtentLBIDs(0, &lbidList);
4010     }
4011 
4012     //cout << "Purged files.size:moduleId = " << files.size() << ":"<<Config::getLocalModuleID() << endl;
4013     //if (idbdatafile::IDBPolicy::useHdfs())
4014     //		cacheutils::dropPrimProcFdCache();
4015     TableMetaData::removeTableMetaData(tableOid);
4016 
4017     // MCOL-1495 Remove fCatalogMap entries CS won't use anymore.
4018     CalpontSystemCatalog::removeCalpontSystemCatalog(txnId);
4019     CalpontSystemCatalog::removeCalpontSystemCatalog(txnId | 0x80000000);
4020     return rc;
4021 }
4022 
processDelete(messageqcpp::ByteStream & bs,std::string & err,ByteStream::quadbyte & PMId,uint64_t & blocksChanged)4023 uint8_t WE_DMLCommandProc::processDelete(messageqcpp::ByteStream& bs,
4024         std::string& err,
4025         ByteStream::quadbyte& PMId,
4026         uint64_t& blocksChanged)
4027 {
4028     uint8_t rc = 0;
4029     //cout << " In processDelete" << endl;
4030     uint32_t tmp32, sessionID;
4031     TxnID txnId;
4032     bs >> PMId;
4033     bs >> sessionID;
4034     bs >> tmp32;
4035     txnId = tmp32;
4036     string schema, tableName;
4037     bs >> schema;
4038     bs >> tableName;
4039     fWEWrapper.setIsInsert(false);
4040     fWEWrapper.setBulkFlag(false);
4041     fWEWrapper.setTransId(txnId);
4042 
4043     if (!rowGroups[txnId]) //meta data
4044     {
4045         rowGroups[txnId] = new rowgroup::RowGroup();
4046         rowGroups[txnId]->deserialize(bs);
4047         //If hdfs, call chunkmanager to set up
4048 
4049         rc = fWEWrapper.startTransaction(txnId);
4050 
4051         if (rc != NO_ERROR)
4052         {
4053             WErrorCodes ec;
4054             err = ec.errorString(rc);
4055         }
4056 
4057         return rc;
4058     }
4059 
4060     rowgroup::RGData rgData;
4061     rgData.deserialize(bs);
4062     rowGroups[txnId]->setData(&rgData);
4063     //rowGroups[txnId]->setData(const_cast<uint8_t*>(bs.buf()));
4064     //get row ids
4065     rowgroup::Row row;
4066     rowGroups[txnId]->initRow(&row);
4067     WriteEngine::RIDList  rowIDList;
4068     CalpontSystemCatalog::RID rid;
4069     uint32_t rowsThisRowgroup = rowGroups[txnId]->getRowCount();
4070     uint16_t dbRoot, segment, blockNum;
4071     uint32_t partition;
4072     uint8_t extentNum;
4073     CalpontSystemCatalog::TableName aTableName;
4074     aTableName.schema = schema;
4075     aTableName.table = tableName;
4076     boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
4077         CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
4078     CalpontSystemCatalog::ROPair roPair;
4079 
4080     CalpontSystemCatalog::RIDList tableRidList;
4081 
4082     try
4083     {
4084         roPair = systemCatalogPtr->tableRID( aTableName);
4085         tableRidList = systemCatalogPtr->columnRIDs(aTableName, true);
4086     }
4087     catch (exception& ex)
4088     {
4089         err = ex.what();
4090         rc = 1;
4091         return rc;
4092 
4093     }
4094 
4095     // querystats
4096     uint64_t relativeRID = 0;
4097     boost::scoped_array<int> preBlkNums(new int[row.getColumnCount()]);
4098     boost::scoped_array<uint32_t> colWidth(new uint32_t[row.getColumnCount()]);
4099 
4100     // initialize
4101     for (uint32_t j = 0; j < row.getColumnCount(); j++)
4102     {
4103         preBlkNums[j] = -1;
4104         colWidth[j] = (row.getColumnWidth(j) >= 8 ? 8 : row.getColumnWidth(j));
4105     }
4106 
4107     //Get the file information from rowgroup
4108     dbRoot = rowGroups[txnId]->getDBRoot();
4109     rowGroups[txnId]->getLocation(&partition, &segment, &extentNum, &blockNum);
4110     WriteEngine::ColStructList colStructList;
4111     WriteEngine::ColStruct colStruct;
4112     colStruct.fColPartition = partition;
4113     colStruct.fColSegment = segment;
4114     colStruct.fColDbRoot = dbRoot;
4115 
4116     for (unsigned i = 0; i < rowsThisRowgroup; i++)
4117     {
4118         rowGroups[txnId]->getRow(i, &row);
4119         rid = row.getRid();
4120         relativeRID = rid - rowGroups[txnId]->getBaseRid();
4121         rid = relativeRID;
4122         convertToRelativeRid (rid, extentNum, blockNum);
4123         rowIDList.push_back(rid);
4124 
4125         // populate stats.blocksChanged
4126         for (uint32_t j = 0; j < row.getColumnCount(); j++)
4127         {
4128             if ((int)(relativeRID / (BYTE_PER_BLOCK / colWidth[j])) > preBlkNums[j])
4129             {
4130                 blocksChanged++;
4131                 preBlkNums[j] = relativeRID / (BYTE_PER_BLOCK / colWidth[j]);
4132             }
4133         }
4134     }
4135 
4136     try
4137     {
4138         for (unsigned i = 0; i < tableRidList.size(); i++)
4139         {
4140             CalpontSystemCatalog::ColType colType;
4141             colType = systemCatalogPtr->colType( tableRidList[i].objnum );
4142             colStruct.dataOid = tableRidList[i].objnum;
4143             colStruct.tokenFlag = false;
4144             colStruct.fCompressionType = colType.compressionType;
4145 
4146             if (colType.colWidth > 8)  //token
4147             {
4148                 colStruct.colWidth = 8;
4149                 colStruct.tokenFlag = true;
4150             }
4151             else
4152             {
4153                 colStruct.colWidth = colType.colWidth;
4154             }
4155 
4156             colStruct.colDataType = colType.colDataType;
4157 
4158             colStructList.push_back( colStruct );
4159         }
4160     }
4161     catch (exception& ex)
4162     {
4163         err = ex.what();
4164         rc = 1;
4165         return rc;
4166     }
4167 
4168     std::vector<ColStructList> colExtentsStruct;
4169     std::vector<void*> colOldValueList;
4170     std::vector<RIDList> ridLists;
4171     colExtentsStruct.push_back(colStructList);
4172     ridLists.push_back(rowIDList);
4173     int error = 0;
4174 
4175     error = fWEWrapper.deleteRow( txnId, colExtentsStruct, colOldValueList, ridLists, roPair.objnum );
4176 
4177     if (error != NO_ERROR)
4178     {
4179         rc = error;
4180         //cout << "WE Error code " << error << endl;
4181         WErrorCodes ec;
4182         err = ec.errorString(error);
4183 
4184         if (error == ERR_BRM_DEAD_LOCK)
4185         {
4186             rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR;
4187         }
4188         else if (error == ERR_BRM_VB_OVERFLOW)
4189         {
4190             rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR;
4191             err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW);
4192         }
4193     }
4194 
4195     return rc;
4196 }
4197 
processRemoveMeta(messageqcpp::ByteStream & bs,std::string & err)4198 uint8_t WE_DMLCommandProc::processRemoveMeta(messageqcpp::ByteStream& bs, std::string& err)
4199 {
4200     uint8_t rc = 0;
4201     uint32_t	tableOID;
4202 
4203     try
4204     {
4205         bs >> tableOID;
4206         //std::cout << ": tableOID-" << tableOID << std::endl;
4207 
4208         BulkRollbackMgr::deleteMetaFile( tableOID );
4209     }
4210     catch (exception& ex)
4211     {
4212         err = ex.what();
4213         rc = 1;
4214     }
4215 
4216     return rc;
4217 }
4218 
4219 //------------------------------------------------------------------------------
4220 // Process bulk rollback command
4221 //------------------------------------------------------------------------------
processBulkRollback(messageqcpp::ByteStream & bs,std::string & err)4222 uint8_t WE_DMLCommandProc::processBulkRollback(messageqcpp::ByteStream& bs,
4223         std::string& err)
4224 {
4225     uint8_t rc = 0;
4226     err.clear();
4227 
4228     try
4229     {
4230         uint32_t    tableOID;
4231         uint64_t    tableLockID;
4232         std::string tableName;
4233         std::string appName;
4234 
4235         // May want to eventually comment out this logging to stdout,
4236         // but it shouldn't hurt to keep in here.
4237         std::cout << "processBulkRollback";
4238         bs >> tableLockID;
4239         //std::cout << ": tableLock-"<< tableLockID;
4240 
4241         bs >> tableOID;
4242         //std::cout << "; tableOID-" << tableOID;
4243 
4244         bs >> tableName;
4245 
4246         if (tableName.length() == 0)
4247         {
4248             boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
4249                 CalpontSystemCatalog::makeCalpontSystemCatalog(0);
4250             CalpontSystemCatalog::TableName aTableName = systemCatalogPtr->tableName(tableOID);
4251             tableName = aTableName.toString();
4252 
4253         }
4254 
4255         std::cout << "; table-"    << tableName;
4256 
4257         bs >> appName;
4258         std::cout << "; app-"      << appName << std::endl;
4259 
4260         int we_rc = fWEWrapper.bulkRollback(
4261                         tableOID,
4262                         tableLockID,
4263                         tableName,
4264                         appName,
4265                         false, // no extra debug logging to the console
4266                         err );
4267 
4268         if (we_rc != NO_ERROR)
4269             rc = 2;
4270     }
4271     catch (exception& ex)
4272     {
4273         std::cout << "processBulkRollback: exception-" << ex.what() << std::endl;
4274         err = ex.what();
4275         rc  = 1;
4276     }
4277 
4278     return rc;
4279 }
4280 
4281 //------------------------------------------------------------------------------
4282 // Process bulk rollback cleanup command (deletes bulk rollback meta data files)
4283 //------------------------------------------------------------------------------
processBulkRollbackCleanup(messageqcpp::ByteStream & bs,std::string & err)4284 uint8_t WE_DMLCommandProc::processBulkRollbackCleanup(
4285     messageqcpp::ByteStream& bs,
4286     std::string& err)
4287 {
4288     uint8_t rc = 0;
4289     err.clear();
4290 
4291     try
4292     {
4293         uint32_t tableOID;
4294 
4295         // May want to eventually comment out this logging to stdout,
4296         // but it shouldn't hurt to keep in here.
4297         std::cout << "processBulkRollbackCleanup";
4298         bs >> tableOID;
4299         std::cout << ": tableOID-" << tableOID << std::endl;
4300 
4301         BulkRollbackMgr::deleteMetaFile( tableOID );
4302     }
4303     catch (exception& ex)
4304     {
4305         std::cout << "processBulkRollbackCleanup: exception-" << ex.what() <<
4306                   std::endl;
4307         err = ex.what();
4308         rc  = 1;
4309     }
4310 
4311     return rc;
4312 }
4313 
updateSyscolumnNextval(ByteStream & bs,std::string & err)4314 uint8_t WE_DMLCommandProc::updateSyscolumnNextval(ByteStream& bs, std::string& err)
4315 {
4316     uint32_t columnOid, sessionID;
4317     uint64_t nextVal;
4318     int rc = 0;
4319     bs >> columnOid;
4320     bs >> nextVal;
4321     bs >> sessionID;
4322     uint16_t dbRoot;
4323     std::map<uint32_t, uint32_t> oids;
4324     //std::vector<BRM::OID_t>  oidsToFlush;
4325     oids[columnOid] = columnOid;
4326     //oidsToFlush.push_back(columnOid);
4327     BRM::OID_t oid = 1021;
4328     fDbrm.getSysCatDBRoot(oid, dbRoot);
4329     fWEWrapper.setTransId(sessionID);
4330     fWEWrapper.setBulkFlag(false);
4331     fWEWrapper.startTransaction(sessionID);
4332     //cout << "updateSyscolumnNextval startTransaction id " << sessionID << endl;
4333     rc = fWEWrapper.updateNextValue(sessionID, columnOid, nextVal, sessionID, dbRoot);
4334 
4335     if (rc != 0)
4336     {
4337         err = "Error in WE::updateNextValue";
4338         rc = 1;
4339     }
4340 
4341     if (idbdatafile::IDBPolicy::useHdfs())
4342     {
4343         cout << "updateSyscolumnNextval flushDataFiles  " <<  endl;
4344         int rc1 = fWEWrapper.flushDataFiles(rc, sessionID, oids);
4345 
4346         if ((rc == 0) && ( rc1 == 0))
4347         {
4348             cout << "updateSyscolumnNextval confirmTransaction rc =0  " << endl;
4349             rc1 = fWEWrapper.confirmTransaction(sessionID);
4350             cout << "updateSyscolumnNextval confirmTransaction return code is  " << rc1 << endl;
4351 
4352             if ( rc1 == NO_ERROR)
4353                 rc1 = fWEWrapper.endTransaction(sessionID, true);
4354             else
4355                 fWEWrapper.endTransaction(sessionID, false);
4356         }
4357         else
4358         {
4359             cout << "updateSyscolumnNextval endTransaction with error  " << endl;
4360             fWEWrapper.endTransaction(sessionID, false);
4361 
4362         }
4363 
4364         if ( rc == NO_ERROR)
4365             rc = rc1;
4366     }
4367 
4368     //if (idbdatafile::IDBPolicy::useHdfs())
4369     //	cacheutils::flushOIDsFromCache(oidsToFlush);
4370     return rc;
4371 }
4372 
processPurgeFDCache(ByteStream & bs,std::string & err)4373 uint8_t WE_DMLCommandProc::processPurgeFDCache(ByteStream& bs, std::string& err)
4374 {
4375     int rc = 0;
4376     uint32_t tableOid;
4377     bs >> tableOid;
4378     TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);
4379     ColsExtsInfoMap colsExtsInfoMap = aTbaleMetaData->getColsExtsInfoMap();
4380     ColsExtsInfoMap::iterator it = colsExtsInfoMap.begin();
4381     ColExtsInfo::iterator aIt;
4382     std::vector<BRM::FileInfo> files;
4383     BRM::FileInfo aFile;
4384 
4385     while (it != colsExtsInfoMap.end())
4386     {
4387         aIt = (it->second).begin();
4388         aFile.oid = it->first;
4389 
4390         while (aIt != (it->second).end())
4391         {
4392             aFile.partitionNum = aIt->partNum;
4393             aFile.dbRoot = aIt->dbRoot;
4394             aFile.segmentNum = aIt->segNum;
4395             aFile.compType = aIt->compType;
4396             files.push_back(aFile);
4397             //cout <<"Added to files oid:dbroot:part:seg:compType = " << aFile.oid<<":"<<aFile.dbRoot<<":"<<aFile.partitionNum<<":"<<aFile.segmentNum
4398             //<<":"<<aFile.compType <<endl;
4399             aIt++;
4400         }
4401 
4402         it++;
4403     }
4404 
4405     if ((idbdatafile::IDBPolicy::useHdfs()) && (files.size() > 0) )
4406         cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());
4407 
4408     TableMetaData::removeTableMetaData(tableOid);
4409     return rc;
4410 }
4411 
processFixRows(messageqcpp::ByteStream & bs,std::string & err,ByteStream::quadbyte & PMId)4412 uint8_t WE_DMLCommandProc::processFixRows(messageqcpp::ByteStream& bs,
4413         std::string& err,
4414         ByteStream::quadbyte& PMId)
4415 {
4416     uint8_t rc = 0;
4417     //cout << " In processFixRows" << endl;
4418     uint32_t tmp32;
4419     uint64_t sessionID;
4420     uint16_t dbRoot, segment;
4421     uint32_t partition;
4422     string schema, tableName;
4423     TxnID txnId;
4424     uint8_t tmp8;
4425     bool firstBatch = false;
4426     WriteEngine::RIDList  rowIDList;
4427     bs >> PMId;
4428     bs >> tmp8;
4429     firstBatch = (tmp8 != 0);
4430     bs >> sessionID;
4431     bs >> tmp32;
4432     txnId = tmp32;
4433 
4434     bs >> schema;
4435     bs >> tableName;
4436     bs >> dbRoot;
4437     bs >> partition;
4438     bs >> segment;
4439 
4440     deserializeInlineVector(bs, rowIDList);
4441 
4442     //Need to identify whether this is the first batch to start transaction.
4443     if (firstBatch)
4444     {
4445         rc = fWEWrapper.startTransaction(txnId);
4446 
4447         if (rc != NO_ERROR)
4448         {
4449             WErrorCodes ec;
4450             err = ec.errorString(rc);
4451             return rc;
4452         }
4453 
4454         fWEWrapper.setIsInsert(false);
4455         fWEWrapper.setBulkFlag(false);
4456         fWEWrapper.setTransId(txnId);
4457         fWEWrapper.setFixFlag(true);
4458     }
4459 
4460 
4461     CalpontSystemCatalog::TableName aTableName;
4462     aTableName.schema = schema;
4463     aTableName.table = tableName;
4464     boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
4465         CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
4466 
4467     CalpontSystemCatalog::ROPair roPair;
4468 
4469     CalpontSystemCatalog::RIDList tableRidList;
4470 
4471     try
4472     {
4473         roPair = systemCatalogPtr->tableRID( aTableName);
4474         tableRidList = systemCatalogPtr->columnRIDs(aTableName, true);
4475     }
4476     catch (exception& ex)
4477     {
4478         err = ex.what();
4479         rc = 1;
4480         return rc;
4481 
4482     }
4483 
4484     WriteEngine::ColStructList colStructList;
4485     WriteEngine::ColStruct colStruct;
4486     WriteEngine::DctnryStructList dctnryStructList;
4487     /*	colStruct.fColPartition = partition;
4488     	colStruct.fColSegment = segment;
4489     	colStruct.fColDbRoot = dbRoot; */
4490     colStruct.fColPartition = 0;
4491     colStruct.fColSegment = 0;
4492     colStruct.fColDbRoot = 3;
4493     //should we always scan dictionary store files?
4494 
4495     try
4496     {
4497         for (unsigned i = 0; i < tableRidList.size(); i++)
4498         {
4499             CalpontSystemCatalog::ColType colType;
4500             colType = systemCatalogPtr->colType( tableRidList[i].objnum );
4501             colStruct.dataOid = tableRidList[i].objnum;
4502             colStruct.tokenFlag = false;
4503             colStruct.fCompressionType = colType.compressionType;
4504             WriteEngine::DctnryStruct dctnryStruct;
4505             dctnryStruct.fColDbRoot = colStruct.fColDbRoot;
4506             dctnryStruct.fColPartition = colStruct.fColPartition;
4507             dctnryStruct.fColSegment = colStruct.fColSegment;
4508             dctnryStruct.fCompressionType = colStruct.fCompressionType;
4509             dctnryStruct.dctnryOid = 0;
4510 
4511             if (colType.colWidth > 8)         //token
4512             {
4513                 colStruct.colWidth = 8;
4514                 colStruct.tokenFlag = true;
4515                 dctnryStruct.dctnryOid = colType.ddn.dictOID;
4516             }
4517             else
4518             {
4519                 colStruct.colWidth = colType.colWidth;
4520             }
4521 
4522             colStruct.colDataType = colType.colDataType;
4523 
4524             colStructList.push_back( colStruct );
4525             dctnryStructList.push_back(dctnryStruct);
4526         }
4527     }
4528     catch (exception& ex)
4529     {
4530         err = ex.what();
4531         rc = 1;
4532         return rc;
4533 
4534     }
4535 
4536     int error = 0;
4537 
4538     try
4539     {
4540         error = fWEWrapper.deleteBadRows( txnId, colStructList, rowIDList, dctnryStructList);
4541 
4542         if (error != NO_ERROR)
4543         {
4544             rc = error;
4545             //cout << "WE Error code " << error << endl;
4546             WErrorCodes ec;
4547             err = ec.errorString(error);
4548 
4549             if (error == ERR_BRM_DEAD_LOCK)
4550             {
4551                 rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR;
4552             }
4553             else if (error == ERR_BRM_VB_OVERFLOW)
4554             {
4555                 rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR;
4556                 err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW);
4557             }
4558         }
4559     }
4560     catch (std::exception& ex)
4561     {
4562         err = ex.what();
4563         rc = 1;
4564     }
4565 
4566     //cout << "WES return rc " << (int)rc << " with msg " << err << endl;
4567     return rc;
4568 }
4569 
processEndTransaction(ByteStream & bs,std::string & err)4570 uint8_t WE_DMLCommandProc::processEndTransaction(ByteStream& bs, std::string& err)
4571 {
4572     int rc = 0;
4573     ByteStream::byte tmp8;
4574     bool success;
4575     uint32_t txnid;
4576     bs >> txnid;
4577     bs >> tmp8;
4578     success = (tmp8 != 0);
4579     rc = fWEWrapper.endTransaction(txnid, success);
4580 
4581     if (rc != NO_ERROR)
4582     {
4583         WErrorCodes ec;
4584         err = ec.errorString(rc);
4585     }
4586 
4587     return rc;
4588 
4589 }
4590 //------------------------------------------------------------------------------
4591 // Validates the correctness of the current HWMs for this table.
4592 // The HWMs for all the 1 byte columns should be identical.  Same goes
4593 // for all the 2 byte columns, etc.  The 2 byte column HWMs should be
4594 // "roughly" (but not necessarily exactly) twice that of a 1 byte column.
4595 // Same goes for the 4 byte column HWMs vs their 2 byte counterparts, etc.
// ridList - column OIDs used to look up each column's width for the validation.
4597 // segFileInfo - Vector of File objects carrying current DBRoot, partition,
4598 //            HWM, etc to be validated for the columns belonging to jobTable.
4599 // stage    - Current stage we are validating.  "Starting" or "Ending".
4600 //------------------------------------------------------------------------------
validateColumnHWMs(CalpontSystemCatalog::RIDList & ridList,boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr,const std::vector<DBRootExtentInfo> & segFileInfo,const char * stage)4601 int WE_DMLCommandProc::validateColumnHWMs(
4602     CalpontSystemCatalog::RIDList& ridList,
4603     boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr,
4604     const std::vector<DBRootExtentInfo>& segFileInfo,
4605     const char* stage )
4606 {
4607     int rc = NO_ERROR;
4608 
4609     if ((!fIsFirstBatchPm) && (strcmp(stage, "Starting") == 0))
4610         return rc;
4611 
4612     // Used to track first 1-byte, 2-byte, 4-byte, and 8-byte columns in table
4613     int byte1First = -1;
4614     int byte2First = -1;
4615     int byte4First = -1;
4616     int byte8First = -1;
4617 
4618     // Make sure the HWMs for all 1-byte columns match; same for all 2-byte,
4619     // 4-byte, and 8-byte columns as well.
4620     CalpontSystemCatalog::ColType colType;
4621     Convertor convertor;
4622 
4623     for (unsigned k = 0; k < segFileInfo.size(); k++)
4624     {
4625         int k1 = 0;
4626 
4627         // Find out column width
4628         colType = systemCatalogPtr->colType(ridList[k].objnum);
4629         colType.colWidth = convertor.getCorrectRowWidth(colType.colDataType, colType.colWidth);
4630 
4631         // Find the first 1-byte, 2-byte, 4-byte, and 8-byte columns.
4632         // Use those as our reference HWM for the respective column widths.
4633         switch ( colType.colWidth )
4634         {
4635             case 1:
4636             {
4637                 if (byte1First == -1)
4638                     byte1First = k;
4639 
4640                 k1 = byte1First;
4641                 break;
4642             }
4643 
4644             case 2:
4645             {
4646                 if (byte2First == -1)
4647                     byte2First = k;
4648 
4649                 k1 = byte2First;
4650                 break;
4651             }
4652 
4653             case 4:
4654             {
4655                 if (byte4First == -1)
4656                     byte4First = k;
4657 
4658                 k1 = byte4First;
4659                 break;
4660             }
4661 
4662             case 8:
4663             default:
4664             {
4665                 if (byte8First == -1)
4666                     byte8First = k;
4667 
4668                 k1 = byte8First;
4669                 break;
4670             }
4671         } // end of switch based on column width (1,2,4, or 8)
4672 
4673 //std::cout << "dbg: comparing0 " << stage << " refcol-" << k1 <<
4674 //  "; wid-" << jobColK1.width << "; hwm-" << segFileInfo[k1].fLocalHwm <<
4675 //  " <to> col-" << k <<
4676 //  "; wid-" << jobColK.width << " ; hwm-"<<segFileInfo[k].fLocalHwm<<std::endl;
4677 
4678         // Validate that the HWM for this column (k) matches that of the
4679         // corresponding reference column with the same width.
4680         if ((segFileInfo[k1].fDbRoot    != segFileInfo[k].fDbRoot)    ||
4681                 (segFileInfo[k1].fPartition != segFileInfo[k].fPartition) ||
4682                 (segFileInfo[k1].fSegment   != segFileInfo[k].fSegment)   ||
4683                 (segFileInfo[k1].fLocalHwm  != segFileInfo[k].fLocalHwm))
4684         {
4685             CalpontSystemCatalog::ColType colType2;
4686             colType2 = systemCatalogPtr->colType(ridList[k1].objnum);
4687             ostringstream oss;
4688             oss << stage << " HWMs do not match for"
4689                 " OID1-"       << ridList[k1].objnum              <<
4690                 "; DBRoot-"    << segFileInfo[k1].fDbRoot      <<
4691                 "; partition-" << segFileInfo[k1].fPartition   <<
4692                 "; segment-"   << segFileInfo[k1].fSegment     <<
4693                 "; hwm-"       << segFileInfo[k1].fLocalHwm    <<
4694                 "; width-"     << colType2.colWidth << ':' << std::endl <<
4695                 " and OID2-"   << ridList[k].objnum               <<
4696                 "; DBRoot-"    << segFileInfo[k].fDbRoot       <<
4697                 "; partition-" << segFileInfo[k].fPartition    <<
4698                 "; segment-"   << segFileInfo[k].fSegment      <<
4699                 "; hwm-"       << segFileInfo[k].fLocalHwm     <<
4700                 "; width-"     << colType.colWidth;
4701             fLog.logMsg( oss.str(), ERR_UNKNOWN, MSGLVL_ERROR );
4702             return ERR_BRM_HWMS_NOT_EQUAL;
4703         }
4704 
4705         // HWM DBRoot, partition, and segment number should match for all
4706         // columns; so compare DBRoot, part#, and seg# with first column.
4707         if ((segFileInfo[0].fDbRoot    != segFileInfo[k].fDbRoot)    ||
4708                 (segFileInfo[0].fPartition != segFileInfo[k].fPartition) ||
4709                 (segFileInfo[0].fSegment   != segFileInfo[k].fSegment))
4710         {
4711             CalpontSystemCatalog::ColType colType2;
4712             colType2 = systemCatalogPtr->colType(ridList[0].objnum);
4713             ostringstream oss;
4714             oss << stage << " HWM DBRoot,Part#, or Seg# do not match for"
4715                 " OID1-"       << ridList[0].objnum               <<
4716                 "; DBRoot-"    << segFileInfo[0].fDbRoot       <<
4717                 "; partition-" << segFileInfo[0].fPartition    <<
4718                 "; segment-"   << segFileInfo[0].fSegment      <<
4719                 "; hwm-"       << segFileInfo[0].fLocalHwm     <<
4720                 "; width-"     << colType2.colWidth << ':' << std::endl <<
4721                 " and OID2-"   << ridList[k].objnum               <<
4722                 "; DBRoot-"    << segFileInfo[k].fDbRoot       <<
4723                 "; partition-" << segFileInfo[k].fPartition    <<
4724                 "; segment-"   << segFileInfo[k].fSegment      <<
4725                 "; hwm-"       << segFileInfo[k].fLocalHwm     <<
4726                 "; width-"     << colType.colWidth;
4727             fLog.logMsg( oss.str(), ERR_UNKNOWN, MSGLVL_ERROR );
4728             return ERR_BRM_HWMS_NOT_EQUAL;
4729         }
4730     } // end of loop to compare all 1-byte HWMs, 2-byte HWMs, etc.
4731 
4732     // Validate/compare HWM for 1-byte column in relation to 2-byte column, etc.
4733     // Without knowing the exact row count, we can't extrapolate the exact HWM
4734     // for the wider column, but we can narrow it down to an expected range.
4735     int refCol = 0;
4736     int colIdx = 0;
4737 
4738     // Validate/compare HWMs given a 1-byte column as a starting point
4739     if (byte1First >= 0)
4740     {
4741         refCol = byte1First;
4742 
4743         if (byte2First >= 0)
4744         {
4745             HWM hwmLo = segFileInfo[byte1First].fLocalHwm * 2;
4746             HWM hwmHi = hwmLo + 1;
4747 
4748             if ((segFileInfo[byte2First].fLocalHwm < hwmLo) ||
4749                     (segFileInfo[byte2First].fLocalHwm > hwmHi))
4750             {
4751                 colIdx = byte2First;
4752                 rc     = ERR_BRM_HWMS_OUT_OF_SYNC;
4753                 goto errorCheck;
4754             }
4755         }
4756 
4757         if (byte4First >= 0)
4758         {
4759             HWM hwmLo = segFileInfo[byte1First].fLocalHwm * 4;
4760             HWM hwmHi = hwmLo + 3;
4761 
4762             if ((segFileInfo[byte4First].fLocalHwm < hwmLo) ||
4763                     (segFileInfo[byte4First].fLocalHwm > hwmHi))
4764             {
4765                 colIdx = byte4First;
4766                 rc     = ERR_BRM_HWMS_OUT_OF_SYNC;
4767                 goto errorCheck;
4768             }
4769         }
4770 
4771         if (byte8First >= 0)
4772         {
4773             HWM hwmLo = segFileInfo[byte1First].fLocalHwm * 8;
4774             HWM hwmHi = hwmLo + 7;
4775 
4776             if ((segFileInfo[byte8First].fLocalHwm < hwmLo) ||
4777                     (segFileInfo[byte8First].fLocalHwm > hwmHi))
4778             {
4779                 colIdx = byte8First;
4780                 rc     = ERR_BRM_HWMS_OUT_OF_SYNC;
4781                 goto errorCheck;
4782             }
4783         }
4784     }
4785 
4786     // Validate/compare HWMs given a 2-byte column as a starting point
4787     if (byte2First >= 0)
4788     {
4789         refCol = byte2First;
4790 
4791         if (byte4First >= 0)
4792         {
4793             HWM hwmLo = segFileInfo[byte2First].fLocalHwm * 2;
4794             HWM hwmHi = hwmLo + 1;
4795 
4796             if ((segFileInfo[byte4First].fLocalHwm < hwmLo) ||
4797                     (segFileInfo[byte4First].fLocalHwm > hwmHi))
4798             {
4799                 colIdx = byte4First;
4800                 rc     = ERR_BRM_HWMS_OUT_OF_SYNC;
4801                 goto errorCheck;
4802             }
4803         }
4804 
4805         if (byte8First >= 0)
4806         {
4807             HWM hwmLo = segFileInfo[byte2First].fLocalHwm * 4;
4808             HWM hwmHi = hwmLo + 3;
4809 
4810             if ((segFileInfo[byte8First].fLocalHwm < hwmLo) ||
4811                     (segFileInfo[byte8First].fLocalHwm > hwmHi))
4812             {
4813                 colIdx = byte8First;
4814                 rc     = ERR_BRM_HWMS_OUT_OF_SYNC;
4815                 goto errorCheck;
4816             }
4817         }
4818     }
4819 
4820     // Validate/compare HWMs given a 4-byte column as a starting point
4821     if (byte4First >= 0)
4822     {
4823         refCol = byte4First;
4824 
4825         if (byte8First >= 0)
4826         {
4827             HWM hwmLo = segFileInfo[byte4First].fLocalHwm * 2;
4828             HWM hwmHi = hwmLo + 1;
4829 
4830             if ((segFileInfo[byte8First].fLocalHwm < hwmLo) ||
4831                     (segFileInfo[byte8First].fLocalHwm > hwmHi))
4832             {
4833                 colIdx = byte8First;
4834                 rc     = ERR_BRM_HWMS_OUT_OF_SYNC;
4835                 goto errorCheck;
4836             }
4837         }
4838     }
4839 
4840 // To avoid repeating this message 6 times in the preceding source code, we
4841 // use the "dreaded" goto to branch to this single place for error handling.
4842 errorCheck:
4843 
4844     if (rc != NO_ERROR)
4845     {
4846         CalpontSystemCatalog::ColType colType1, colType2;
4847         colType1 = systemCatalogPtr->colType(ridList[refCol].objnum);
4848         colType1.colWidth = convertor.getCorrectRowWidth(
4849                                 colType1.colDataType, colType1.colWidth);
4850 
4851         colType2 = systemCatalogPtr->colType(ridList[colIdx].objnum);
4852         colType2.colWidth = convertor.getCorrectRowWidth(
4853                                 colType2.colDataType, colType2.colWidth);
4854 
4855         ostringstream oss;
4856         oss << stage << " HWMs are not in sync for"
4857             " OID1-"       << ridList[refCol].objnum                <<
4858             "; DBRoot-"    << segFileInfo[refCol].fDbRoot      <<
4859             "; partition-" << segFileInfo[refCol].fPartition   <<
4860             "; segment-"   << segFileInfo[refCol].fSegment     <<
4861             "; hwm-"       << segFileInfo[refCol].fLocalHwm    <<
4862             "; width-"     << colType1.colWidth << ':' << std::endl <<
4863             " and OID2-"   << ridList[colIdx].objnum                 <<
4864             "; DBRoot-"    << segFileInfo[colIdx].fDbRoot      <<
4865             "; partition-" << segFileInfo[colIdx].fPartition   <<
4866             "; segment-"   << segFileInfo[colIdx].fSegment     <<
4867             "; hwm-"       << segFileInfo[colIdx].fLocalHwm    <<
4868             "; width-"     << colType2.colWidth;
4869         fLog.logMsg( oss.str(), ERR_UNKNOWN, MSGLVL_ERROR );
4870     }
4871 
4872     return rc;
4873 }
4874 }
4875