1 /* Copyright (C) 2014 InfiniDB, Inc.
2    Copyright (C) 2016 MariaDB Corporation
3 
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License
6    as published by the Free Software Foundation; version 2 of
7    the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17    MA 02110-1301, USA. */
18 
19 // $Id: writeengine.cpp 4737 2013-08-14 20:45:46Z bwilkinson $
20 
21 /** @writeengine.cpp
22  *   A wrapper class for the write engine to write information to files
23  */
24 #include <cmath>
25 #include <cstdlib>
26 #include <unistd.h>
27 #include <boost/scoped_array.hpp>
28 #include <boost/scoped_ptr.hpp>
29 using namespace std;
30 
31 #include "joblisttypes.h"
32 
33 #define WRITEENGINEWRAPPER_DLLEXPORT
34 #include "writeengine.h"
35 #undef WRITEENGINEWRAPPER_DLLEXPORT
36 
37 #include "we_convertor.h"
38 #include "we_log.h"
39 #include "we_simplesyslog.h"
40 #include "we_config.h"
41 #include "we_bulkrollbackmgr.h"
42 #include "brm.h"
43 #include "stopwatch.h"
44 #include "we_colop.h"
45 #include "we_type.h"
46 
47 #include "we_colopcompress.h"
48 #include "we_dctnrycompress.h"
49 #include "cacheutils.h"
50 #include "calpontsystemcatalog.h"
51 #include "we_simplesyslog.h"
52 using namespace cacheutils;
53 using namespace logging;
54 using namespace BRM;
55 using namespace execplan;
56 #include "IDBDataFile.h"
57 #include "IDBPolicy.h"
58 #include "MonitorProcMem.h"
59 using namespace idbdatafile;
60 
61 #ifdef _MSC_VER
62 #define isnan _isnan
63 #endif
64 
65 namespace WriteEngine
66 //#define PROFILE 1
67 
68 {
69 StopWatch timer;
70 
71 /**@brief WriteEngineWrapper Constructor
72 */
WriteEngineWrapper()73 WriteEngineWrapper::WriteEngineWrapper() :  m_opType(NOOP)
74 {
75     m_colOp[UN_COMPRESSED_OP] = new ColumnOpCompress0;
76     m_colOp[COMPRESSED_OP]    = new ColumnOpCompress1;
77 
78     m_dctnry[UN_COMPRESSED_OP] = new DctnryCompress0;
79     m_dctnry[COMPRESSED_OP]    = new DctnryCompress1;
80 }
81 
/**@brief WriteEngineWrapper copy constructor
 *
 * Copies only the pending operation type from @p rhs. The column and
 * dictionary operators are never shared: this instance allocates a fresh
 * set, because each wrapper deletes its own operators on destruction.
 *
 * NOTE(review): make sure copy assignment is handled too (declared private
 * or implemented). An implicit operator= would copy these owning pointers
 * and lead to a double delete.
 */
WriteEngineWrapper::WriteEngineWrapper(const WriteEngineWrapper& rhs)
    : m_opType(rhs.m_opType)
{
    // Column operators (uncompressed, then compressed).
    m_colOp[UN_COMPRESSED_OP]
        = new ColumnOpCompress0;
    m_colOp[COMPRESSED_OP]
        = new ColumnOpCompress1;

    // Dictionary operators (uncompressed, then compressed).
    m_dctnry[UN_COMPRESSED_OP]
        = new DctnryCompress0;
    m_dctnry[COMPRESSED_OP]
        = new DctnryCompress1;
}
90 
91 /**@brief WriteEngineWrapper Constructor
92 */
~WriteEngineWrapper()93 WriteEngineWrapper::~WriteEngineWrapper()
94 {
95     delete m_colOp[UN_COMPRESSED_OP];
96     delete m_colOp[COMPRESSED_OP];
97     delete m_dctnry[UN_COMPRESSED_OP];
98     delete m_dctnry[COMPRESSED_OP];
99 }
100 
101 /**@brief Perform upfront initialization
102 */
init(unsigned subSystemID)103 /* static */ void WriteEngineWrapper::init(unsigned subSystemID)
104 {
105     SimpleSysLog::instance()->setLoggingID(logging::LoggingID(subSystemID));
106     Config::initConfigCache();
107     BRMWrapper::getInstance();
108 
109     // Bug 5415 Add HDFS MemBuffer vs. FileBuffer decision logic.
110     config::Config* cf = config::Config::makeConfig();
111     //--------------------------------------------------------------------------
112     // Memory overload protection. This setting will cause the process to die should
113     // it, by itself, consume maxPct of total memory. Monitored in MonitorProcMem.
114     // Only used at the express direction of Field Support.
115     //--------------------------------------------------------------------------
116     int maxPct = 0; //disable by default
117     string strMaxPct = cf->getConfig("WriteEngine", "MaxPct");
118 
119     if ( strMaxPct.length() != 0 )
120         maxPct = cf->uFromText(strMaxPct);
121 
122     //--------------------------------------------------------------------------
123     // MemoryCheckPercent. This controls at what percent of total memory be consumed
124     // by all processes before we switch from HdfsRdwrMemBuffer to HdfsRdwrFileBuffer.
125     // This is only used in Hdfs installations.
126     //--------------------------------------------------------------------------
127     int checkPct = 95;
128     string strCheckPct = cf->getConfig("SystemConfig", "MemoryCheckPercent");
129 
130     if ( strCheckPct.length() != 0 )
131         checkPct = cf->uFromText(strCheckPct);
132 
133     //--------------------------------------------------------------------------
134     // If we're either HDFS, or maxPct is turned on, start the monitor thread.
135     // Otherwise, we don't need it, so don't waste the resources.
136     //--------------------------------------------------------------------------
137     if (maxPct > 0 || IDBPolicy::useHdfs())
138     {
139         new boost::thread(utils::MonitorProcMem(maxPct, checkPct, subSystemID));
140     }
141 }
142 
143 /*@brief checkValid --Check input parameters are valid
144  */
145 /***********************************************************
146  * DESCRIPTION:
147  *    Check input parameters are valid
148  * PARAMETERS:
149  *    colStructList - column struct list
150  *    colValueList - column value list
151  *    ridList - rowid list
152  * RETURN:
153  *    NO_ERROR if success
154  *    others if something wrong in the checking process
155  ***********************************************************/
checkValid(const TxnID & txnid,const ColStructList & colStructList,const ColValueList & colValueList,const RIDList & ridList) const156 int WriteEngineWrapper::checkValid(const TxnID& txnid, const ColStructList& colStructList, const ColValueList& colValueList, const RIDList& ridList) const
157 {
158     ColTupleList   curTupleList;
159     ColStructList::size_type structListSize;
160     ColValueList::size_type  valListSize;
161     ColTupleList::size_type  totalRow;
162 
163     if (colStructList.size() == 0)
164         return ERR_STRUCT_EMPTY;
165 
166     structListSize = colStructList.size() ;
167     valListSize = colValueList.size();
168 
169 //      if (colStructList.size() !=  colValueList.size())
170     if (structListSize != valListSize)
171         return ERR_STRUCT_VALUE_NOT_MATCH;
172 
173     for (ColValueList::size_type i = 0; i < valListSize; i++)
174     {
175 
176         curTupleList = static_cast<ColTupleList>(colValueList[i]);
177         totalRow = curTupleList.size();
178 
179         if (ridList.size() > 0)
180         {
181             if (totalRow != ridList.size())
182                 return ERR_ROWID_VALUE_NOT_MATCH;
183         }
184 
185     } // end of for (int i = 0;
186 
187     return NO_ERROR;
188 }
189 
190 /*@brief findSmallestColumn --Find the smallest column for this table
191  */
192 /***********************************************************
193  * DESCRIPTION:
194  *    Find the smallest column for this table
195  * PARAMETERS:
196  *    lowColLen - returns smallest column width
197  *    colId - returns smallest column id
198  *    colStructList - column struct list
199  * RETURN:
200  *  void
201  ***********************************************************/
findSmallestColumn(uint32_t & colId,ColStructList colStructList)202 void WriteEngineWrapper::findSmallestColumn(uint32_t& colId, ColStructList colStructList)
203 // MCOL-1675: find the smallest column width to calculate the RowID from so
204 // that all HWMs will be incremented by this operation
205 {
206     int32_t lowColLen = 8192;
207     for (uint32_t colIt = 0; colIt < colStructList.size(); colIt++)
208     {
209         if (colStructList[colIt].colWidth < lowColLen)
210         {
211             colId = colIt;
212             lowColLen = colStructList[colId].colWidth;
213             if ( lowColLen == 1 )
214             {
215                 break;
216             }
217         }
218     }
219 }
220 
221 /*@convertValArray -  Convert interface values to internal values
222  */
223 /***********************************************************
224  * DESCRIPTION:
225  *    Convert interface values to internal values
226  * PARAMETERS:
227  *    colStructList - column struct list
228  *    colValueList - column value list
229  * RETURN:
230  *    none
231  *    valArray - output value array
232  *    nullArray - output null flag array
233  ***********************************************************/
convertValArray(const size_t totalRow,const ColType colType,ColTupleList & curTupleList,void * valArray,bool bFromList)234 void WriteEngineWrapper::convertValArray(const size_t totalRow, const ColType colType, ColTupleList& curTupleList, void* valArray, bool bFromList)
235 {
236     ColTuple    curTuple;
237     ColTupleList::size_type i;
238 
239     if (bFromList)
240         for (i = 0; i < curTupleList.size(); i++)
241         {
242             curTuple = curTupleList[i];
243             convertValue(colType, valArray, i, curTuple.data);
244         } // end of for (int i = 0
245     else
246         for (i = 0; i < totalRow; i++)
247         {
248             convertValue(colType, valArray, i, curTuple.data, false);
249             curTupleList.push_back(curTuple);
250         }
251 }
252 
253 /*
254  * @brief Convert column value to its internal representation
255  */
convertValue(const ColType colType,void * value,boost::any & data)256 void WriteEngineWrapper::convertValue(const ColType colType, void* value, boost::any& data)
257 {
258     string curStr;
259     int size;
260 
261     switch (colType)
262     {
263         case WriteEngine::WR_INT :
264         case WriteEngine::WR_MEDINT :
265             if (data.type() == typeid(int))
266             {
267                 int val = boost::any_cast<int>(data);
268                 size = sizeof(int);
269                 memcpy(value, &val, size);
270             }
271             else
272             {
273                 uint32_t val = boost::any_cast<uint32_t>(data);
274                 size = sizeof(uint32_t);
275                 memcpy(value, &val, size);
276             }
277 
278             break;
279 
280         case WriteEngine::WR_UINT :
281         case WriteEngine::WR_UMEDINT :
282         {
283             uint32_t val = boost::any_cast<uint32_t>(data);
284             size = sizeof(uint32_t);
285             memcpy(value, &val, size);
286         }
287         break;
288 
289         case WriteEngine::WR_VARBINARY : // treat same as char for now
290         case WriteEngine::WR_CHAR :
291         case WriteEngine::WR_BLOB :
292         case WriteEngine::WR_TEXT :
293             curStr = boost::any_cast<string>(data);
294 
295             if ((int) curStr.length() > MAX_COLUMN_BOUNDARY)
296                 curStr = curStr.substr(0, MAX_COLUMN_BOUNDARY);
297 
298             memcpy(value, curStr.c_str(), curStr.length());
299             break;
300 
301         case WriteEngine::WR_FLOAT:
302         {
303             float val = boost::any_cast<float>(data);
304 
305 //N.B.There is a bug in boost::any or in gcc where, if you store a nan, you will get back a nan,
306 // but not necessarily the same bits that you put in. This only seems to be for float (double seems
307 // to work).
308             if (isnan(val))
309             {
310                 uint32_t ti = joblist::FLOATNULL;
311                 float* tfp = (float*)&ti;
312                 val = *tfp;
313             }
314 
315             size = sizeof(float);
316             memcpy(value, &val, size);
317         }
318         break;
319 
320         case WriteEngine::WR_DOUBLE:
321         {
322             double val = boost::any_cast<double>(data);
323             size = sizeof(double);
324             memcpy(value, &val, size);
325         }
326         break;
327 
328         case WriteEngine::WR_SHORT:
329         {
330             short val = boost::any_cast<short>(data);
331             size = sizeof(short);
332             memcpy(value, &val, size);
333         }
334         break;
335 
336         case WriteEngine::WR_USHORT:
337         {
338             uint16_t val = boost::any_cast<uint16_t>(data);
339             size = sizeof(uint16_t);
340             memcpy(value, &val, size);
341         }
342         break;
343 
344         case WriteEngine::WR_BYTE:
345         {
346             char val = boost::any_cast<char>(data);
347             size = sizeof(char);
348             memcpy(value, &val, size);
349         }
350         break;
351 
352         case WriteEngine::WR_UBYTE:
353         {
354             uint8_t val = boost::any_cast<uint8_t>(data);
355             size = sizeof(uint8_t);
356             memcpy(value, &val, size);
357         }
358         break;
359 
360         case WriteEngine::WR_LONGLONG:
361             if (data.type() == typeid(long long))
362             {
363                 long long val = boost::any_cast<long long>(data);
364                 size = sizeof(long long);
365                 memcpy(value, &val, size);
366             }
367             else
368             {
369                 uint64_t val = boost::any_cast<uint64_t>(data);
370                 size = sizeof(uint64_t);
371                 memcpy(value, &val, size);
372             }
373 
374             break;
375 
376         case WriteEngine::WR_ULONGLONG:
377         {
378             uint64_t val = boost::any_cast<uint64_t>(data);
379             size = sizeof(uint64_t);
380             memcpy(value, &val, size);
381         }
382         break;
383 
384         case WriteEngine::WR_TOKEN:
385         {
386             Token val = boost::any_cast<Token>(data);
387             size = sizeof(Token);
388             memcpy(value, &val, size);
389         }
390         break;
391 
392     } // end of switch (colType)
393 }  /*@convertValue -  The base for converting values */
394 
395 /***********************************************************
396  * DESCRIPTION:
397  *    The base for converting values
398  * PARAMETERS:
399  *    colType - data type
400  *    pos - array position
401  *    data - value
402  * RETURN:
403  *    none
404  ***********************************************************/
convertValue(const ColType colType,void * valArray,const size_t pos,boost::any & data,bool fromList)405 void WriteEngineWrapper::convertValue(const ColType colType, void* valArray, const size_t pos, boost::any& data, bool fromList)
406 {
407     string curStr;
408 //      ColTuple    curTuple;
409 
410     if (fromList)
411     {
412         switch (colType)
413         {
414             case WriteEngine::WR_INT :
415             case WriteEngine::WR_MEDINT :
416                 if (data.type() == typeid(long))
417                     ((int*)valArray)[pos] = static_cast<int>(boost::any_cast<long>(data));
418                 else if (data.type() == typeid(int))
419                     ((int*)valArray)[pos] = boost::any_cast<int>(data);
420                 else
421                     ((int*)valArray)[pos] = boost::any_cast<uint32_t>(data);
422 
423                 break;
424 
425             case WriteEngine::WR_UINT :
426             case WriteEngine::WR_UMEDINT :
427                 ((uint32_t*)valArray)[pos] = boost::any_cast<uint32_t>(data);
428                 break;
429 
430             case WriteEngine::WR_VARBINARY : // treat same as char for now
431             case WriteEngine::WR_CHAR :
432             case WriteEngine::WR_BLOB :
433             case WriteEngine::WR_TEXT :
434                 curStr = boost::any_cast<string>(data);
435 
436                 if ((int) curStr.length() > MAX_COLUMN_BOUNDARY)
437                     curStr = curStr.substr(0, MAX_COLUMN_BOUNDARY);
438 
439                 memcpy((char*)valArray + pos * MAX_COLUMN_BOUNDARY, curStr.c_str(), curStr.length());
440                 break;
441 
442 //            case WriteEngine::WR_LONG :   ((long*)valArray)[pos] = boost::any_cast<long>(curTuple.data);
443 //                                          break;
444             case WriteEngine::WR_FLOAT:
445                 ((float*)valArray)[pos] = boost::any_cast<float>(data);
446 
447                 if (isnan(((float*)valArray)[pos]))
448                 {
449                     uint32_t ti = joblist::FLOATNULL;
450                     float* tfp = (float*)&ti;
451                     ((float*)valArray)[pos] = *tfp;
452                 }
453 
454                 break;
455 
456             case WriteEngine::WR_DOUBLE:
457                 ((double*)valArray)[pos] = boost::any_cast<double>(data);
458                 break;
459 
460             case WriteEngine::WR_SHORT:
461                 ((short*)valArray)[pos] = boost::any_cast<short>(data);
462                 break;
463 
464             case WriteEngine::WR_USHORT:
465                 ((uint16_t*)valArray)[pos] = boost::any_cast<uint16_t>(data);
466                 break;
467 
468 //            case WriteEngine::WR_BIT:     ((bool*)valArray)[pos] = boost::any_cast<bool>(data);
469 //                                          break;
470             case WriteEngine::WR_BYTE:
471                 ((char*)valArray)[pos] = boost::any_cast<char>(data);
472                 break;
473 
474             case WriteEngine::WR_UBYTE:
475                 ((uint8_t*)valArray)[pos] = boost::any_cast<uint8_t>(data);
476                 break;
477 
478             case WriteEngine::WR_LONGLONG:
479                 if (data.type() == typeid(long long))
480                     ((long long*)valArray)[pos] = boost::any_cast<long long>(data);
481                 else if (data.type() == typeid(long))
482                     ((long long*)valArray)[pos] = (long long)boost::any_cast<long>(data);
483                 else
484                     ((long long*)valArray)[pos] = boost::any_cast<uint64_t>(data);
485 
486                 break;
487 
488             case WriteEngine::WR_ULONGLONG:
489                 ((uint64_t*)valArray)[pos] = boost::any_cast<uint64_t>(data);
490                 break;
491 
492             case WriteEngine::WR_TOKEN:
493                 ((Token*)valArray)[pos] = boost::any_cast<Token>(data);
494                 break;
495         } // end of switch (colType)
496     }
497     else
498     {
499         switch (colType)
500         {
501             case WriteEngine::WR_INT :
502             case WriteEngine::WR_MEDINT :
503                 data = ((int*)valArray)[pos];
504                 break;
505 
506             case WriteEngine::WR_UINT :
507             case WriteEngine::WR_UMEDINT :
508                 data = ((uint64_t*)valArray)[pos];
509                 break;
510 
511             case WriteEngine::WR_VARBINARY : // treat same as char for now
512             case WriteEngine::WR_CHAR :
513             case WriteEngine::WR_BLOB :
514             case WriteEngine::WR_TEXT :
515                 char tmp[10];
516                 memcpy(tmp, (char*)valArray + pos * 8, 8);
517                 curStr = tmp;
518                 data = curStr;
519                 break;
520 
521 //            case WriteEngine::WR_LONG :   ((long*)valArray)[pos] = boost::any_cast<long>(curTuple.data);
522 //                                          break;
523             case WriteEngine::WR_FLOAT:
524                 data = ((float*)valArray)[pos];
525                 break;
526 
527             case WriteEngine::WR_DOUBLE:
528                 data = ((double*)valArray)[pos];
529                 break;
530 
531             case WriteEngine::WR_SHORT:
532                 data = ((short*)valArray)[pos];
533                 break;
534 
535             case WriteEngine::WR_USHORT:
536                 data = ((uint16_t*)valArray)[pos];
537                 break;
538 
539 //            case WriteEngine::WR_BIT:     data = ((bool*)valArray)[pos];
540 //                                          break;
541             case WriteEngine::WR_BYTE:
542                 data = ((char*)valArray)[pos];
543                 break;
544 
545             case WriteEngine::WR_UBYTE:
546                 data = ((uint8_t*)valArray)[pos];
547                 break;
548 
549             case WriteEngine::WR_LONGLONG:
550                 data = ((long long*)valArray)[pos];
551                 break;
552 
553             case WriteEngine::WR_ULONGLONG:
554                 data = ((uint64_t*)valArray)[pos];
555                 break;
556 
557             case WriteEngine::WR_TOKEN:
558                 data = ((Token*)valArray)[pos];
559                 break;
560         } // end of switch (colType)
561     } // end of if
562 }
563 
564 /*@createColumn -  Create column files, including data and bitmap files
565  */
566 /***********************************************************
567  * DESCRIPTION:
568  *    Create column files, including data and bitmap files
569  * PARAMETERS:
570  *    dataOid - column data file id
571  *    bitmapOid - column bitmap file id
572  *    colWidth - column width
573  *    dbRoot   - DBRoot where file is to be located
574  *    partition - Starting partition number for segment file path
575  *     compressionType - compression type
576  * RETURN:
577  *    NO_ERROR if success
578  *    ERR_FILE_EXIST if file exists
579  *    ERR_FILE_CREATE if something wrong in creating the file
580  ***********************************************************/
createColumn(const TxnID & txnid,const OID & dataOid,const CalpontSystemCatalog::ColDataType dataType,int dataWidth,uint16_t dbRoot,uint32_t partition,int compressionType)581 int WriteEngineWrapper::createColumn(
582     const TxnID& txnid,
583     const OID& dataOid,
584     const CalpontSystemCatalog::ColDataType dataType,
585     int dataWidth,
586     uint16_t dbRoot,
587     uint32_t partition,
588     int compressionType)
589 {
590     int      rc;
591     Column   curCol;
592 
593     int compress_op = op(compressionType);
594     m_colOp[compress_op]->initColumn(curCol);
595     rc = m_colOp[compress_op]->createColumn(curCol, 0, dataWidth, dataType,
596                                             WriteEngine::WR_CHAR, (FID)dataOid, dbRoot, partition);
597 
598     // This is optional, however, it's recommended to do so to free heap
599     // memory if assigned in the future
600     m_colOp[compress_op]->clearColumn(curCol);
601     std::map<FID, FID> oids;
602 
603     if (rc == NO_ERROR)
604         rc = flushDataFiles(NO_ERROR, txnid, oids);
605 
606     if (rc != NO_ERROR)
607     {
608         return rc;
609     }
610 
611     RETURN_ON_ERROR(BRMWrapper::getInstance()->setLocalHWM(dataOid, partition, 0, 0));
612     // @bug 281 : fix for bug 281 - Add flush VM cache to clear all write buffer
613     //flushVMCache();
614     return rc;
615 }
616 
617 //BUG931
618 /**
619  * @brief Fill column with default values
620  */
fillColumn(const TxnID & txnid,const OID & dataOid,const CalpontSystemCatalog::ColDataType dataType,int dataWidth,ColTuple defaultVal,const OID & refColOID,const CalpontSystemCatalog::ColDataType refColDataType,int refColWidth,int refCompressionType,bool isNULL,int compressionType,const string & defaultValStr,const OID & dictOid,bool autoincrement)621 int WriteEngineWrapper::fillColumn(const TxnID& txnid, const OID& dataOid,
622                                    const CalpontSystemCatalog::ColDataType dataType, int dataWidth,
623                                    ColTuple defaultVal, const OID& refColOID,
624                                    const CalpontSystemCatalog::ColDataType refColDataType,
625                                    int refColWidth, int refCompressionType,
626                                    bool isNULL, int compressionType,
627                                    const string& defaultValStr,
628                                    const OID& dictOid, bool autoincrement)
629 {
630     int      rc = NO_ERROR;
631     Column   newCol;
632     Column   refCol;
633     ColType  newColType;
634     ColType  refColType;
635     boost::scoped_array<char> defVal(new char[MAX_COLUMN_BOUNDARY]);
636     ColumnOp* colOpNewCol = m_colOp[op(compressionType)];
637     ColumnOp* refColOp = m_colOp[op(refCompressionType)];
638     Dctnry*   dctnry  = m_dctnry[op(compressionType)];
639     colOpNewCol->initColumn(newCol);
640     refColOp->initColumn(refCol);
641     //boost::shared_ptr<Dctnry> dctnry;
642     // boost::shared_ptr<ColumnOp> refColOp;
643     // refColOp.reset(colOpRefCol);
644     // dctnry.reset(dctOp);
645     uint16_t dbRoot = 1;	//not to be used
646    int newDataWidth = dataWidth;
647     //Convert HWM of the reference column for the new column
648     //Bug 1703,1705
649     bool isToken = false;
650 
651     if (((dataType == CalpontSystemCatalog::VARCHAR) && (dataWidth > 7)) ||
652             ((dataType == CalpontSystemCatalog::CHAR) && (dataWidth > 8)) ||
653             (dataType == CalpontSystemCatalog::VARBINARY) ||
654             (dataType == CalpontSystemCatalog::BLOB) ||
655             (dataType == CalpontSystemCatalog::TEXT))
656     {
657         isToken = true;
658     }
659 
660     Convertor::convertColType(dataType, newColType, isToken);
661 
662     if (((refColDataType == CalpontSystemCatalog::VARCHAR) && (refColWidth > 7)) ||
663             ((refColDataType == CalpontSystemCatalog::CHAR) && (refColWidth > 8)) ||
664             (refColDataType == CalpontSystemCatalog::VARBINARY) ||
665             (dataType == CalpontSystemCatalog::BLOB) ||
666             (dataType == CalpontSystemCatalog::TEXT))
667     {
668         isToken = true;
669     }
670 
671     newDataWidth = colOpNewCol->getCorrectRowWidth(dataType, dataWidth);
672     // MCOL-1347 CS doubles the width for ALTER TABLE..ADD COLUMN
673     if ( dataWidth < 4 && dataType == CalpontSystemCatalog::VARCHAR )
674     {
675         newDataWidth >>= 1;
676     }
677 
678     Convertor::convertColType(refColDataType, refColType, isToken);
679     refColOp->setColParam(refCol, 0, refColOp->getCorrectRowWidth(refColDataType, refColWidth),
680                           refColDataType, refColType, (FID)refColOID, refCompressionType, dbRoot);
681    colOpNewCol->setColParam(newCol, 0, newDataWidth,
682                              dataType, newColType, (FID)dataOid, compressionType, dbRoot);
683 
684     int size = sizeof(Token);
685 
686     if (newColType == WriteEngine::WR_TOKEN)
687     {
688         if (isNULL)
689         {
690             Token nullToken;
691             memcpy(defVal.get(), &nullToken, size);
692         }
693 
694         //Tokenization is done when we create dictionary file
695     }
696     else
697         convertValue(newColType, defVal.get(), defaultVal.data);
698 
699     if (rc == NO_ERROR)
700         rc = colOpNewCol->fillColumn(txnid, newCol, refCol, defVal.get(), dctnry, refColOp, dictOid, dataWidth, defaultValStr, autoincrement);
701 
702 //   colOpNewCol->clearColumn(newCol);
703 //   colOpRefCol->clearColumn(refCol);
704 
705 //   free(defVal);
706 
707 // flushing files is in colOp->fillColumn()
708 // if (rc == NO_ERROR)
709 // rc = flushDataFiles();
710 
711     return rc;
712 }
713 
deleteRow(const TxnID & txnid,vector<ColStructList> & colExtentsStruct,vector<void * > & colOldValueList,vector<RIDList> & ridLists,const int32_t tableOid)714 int WriteEngineWrapper::deleteRow(const TxnID& txnid, vector<ColStructList>& colExtentsStruct, vector<void*>& colOldValueList,
715                                   vector<RIDList>& ridLists, const int32_t tableOid)
716 {
717     ColTuple         curTuple;
718     ColStruct        curColStruct;
719     DctnryStruct     dctnryStruct;
720     ColValueList     colValueList;
721     ColTupleList     curTupleList;
722     DctnryStructList dctnryStructList;
723     DctnryValueList  dctnryValueList;
724     ColStructList    colStructList;
725     uint64_t         emptyVal;
726     int              rc;
727     string           tmpStr("");
728     vector<DctnryStructList> dctnryExtentsStruct;
729 
730     if (colExtentsStruct.size() == 0 || ridLists.size() == 0)
731         return ERR_STRUCT_EMPTY;
732 
733     // set transaction id
734     setTransId(txnid);
735     unsigned numExtents = colExtentsStruct.size();
736 
737     for (unsigned extent = 0; extent < numExtents; extent++)
738     {
739         colStructList = colExtentsStruct[extent];
740 
741         for (ColStructList::size_type i = 0; i < colStructList.size(); i++)
742         {
743             curTupleList.clear();
744             curColStruct = colStructList[i];
745             emptyVal = m_colOp[op(curColStruct.fCompressionType)]->
746                        getEmptyRowValue(curColStruct.colDataType, curColStruct.colWidth);
747 
748             curTuple.data = emptyVal;
749             //for (RIDList::size_type j = 0; j < ridLists[extent].size(); j++)
750             //    curTupleList.push_back(curTuple);
751             curTupleList.push_back(curTuple);
752             colValueList.push_back(curTupleList);
753 
754             dctnryStruct.dctnryOid = 0;
755             dctnryStruct.fColPartition = curColStruct.fColPartition;
756             dctnryStruct.fColSegment = curColStruct.fColSegment;
757             dctnryStruct.fColDbRoot = curColStruct.fColDbRoot;
758             dctnryStruct.columnOid = colStructList[i].dataOid;
759             dctnryStructList.push_back(dctnryStruct);
760 
761             DctnryTuple dctnryTuple;
762             DctColTupleList dctColTuples;
763             dctnryTuple.sigValue = (unsigned char*)tmpStr.c_str();
764             dctnryTuple.sigSize = tmpStr.length();
765             dctnryTuple.isNull = true;
766             dctColTuples.push_back (dctnryTuple);
767             dctnryValueList.push_back (dctColTuples);
768         }
769 
770         dctnryExtentsStruct.push_back(dctnryStructList);
771     }
772 
773     // unfortunately I don't have a better way to instruct without passing too many parameters
774     m_opType = DELETE;
775     rc = updateColumnRec(txnid, colExtentsStruct, colValueList, colOldValueList, ridLists, dctnryExtentsStruct, dctnryValueList, tableOid);
776     m_opType = NOOP;
777 
778     return rc;
779 }
780 
deleteBadRows(const TxnID & txnid,ColStructList & colStructs,RIDList & ridList,DctnryStructList & dctnryStructList)781 int WriteEngineWrapper::deleteBadRows(const TxnID& txnid, ColStructList& colStructs,
782                                       RIDList& ridList, DctnryStructList& dctnryStructList)
783 {
784     /*  Need to scan all files including dictionary store files to check whether there is any bad chunks
785      *
786      */
787     int rc = 0;
788     Column         curCol;
789     void*          valArray = NULL;
790 
791     for (unsigned i = 0; i < colStructs.size(); i++)
792     {
793         ColumnOp* colOp = m_colOp[op(colStructs[i].fCompressionType)];
794         unsigned needFixFiles = colStructs[i].tokenFlag ? 2 : 1;
795         colOp->initColumn(curCol);
796 
797         for (unsigned j = 0; j < needFixFiles; j++)
798         {
799             if (j == 0)
800             {
801                 colOp->setColParam(curCol, 0, colStructs[i].colWidth,
802                                    colStructs[i].colDataType, colStructs[i].colType, colStructs[i].dataOid,
803                                    colStructs[i].fCompressionType, colStructs[i].fColDbRoot,
804                                    colStructs[i].fColPartition, colStructs[i].fColSegment);
805 
806                 string segFile;
807                 rc = colOp->openColumnFile(curCol, segFile, true, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file
808 
809                 if (rc != NO_ERROR) //If openFile fails, disk error or header error is assumed.
810                 {
811                     //report error and return.
812                     std::ostringstream oss;
813                     WErrorCodes ec;
814                     string err = ec.errorString(rc);
815                     oss << "Error opening file oid:dbroot:partition:segment = " << colStructs[i].dataOid << ":" << colStructs[i].fColDbRoot
816                         << ":" << colStructs[i].fColPartition << ":" << colStructs[i].fColSegment << " and error code is " << rc << " with message " << err;
817                     throw std::runtime_error(oss.str());
818                 }
819 
820                 switch (colStructs[i].colType)
821                 {
822                     case WriteEngine::WR_INT:
823                     case WriteEngine::WR_MEDINT:
824                         valArray = (int*) calloc(sizeof(int), 1);
825                         break;
826 
827                     case WriteEngine::WR_UINT:
828                     case WriteEngine::WR_UMEDINT:
829                         valArray = (uint32_t*) calloc(sizeof(uint32_t), 1);
830                         break;
831 
832                     case WriteEngine::WR_VARBINARY : // treat same as char for now
833                     case WriteEngine::WR_CHAR:
834                     case WriteEngine::WR_BLOB:
835                     case WriteEngine::WR_TEXT:
836                         valArray = (char*) calloc(sizeof(char), 1 * MAX_COLUMN_BOUNDARY);
837                         break;
838 
839                     case WriteEngine::WR_FLOAT:
840                         valArray = (float*) calloc(sizeof(float), 1);
841                         break;
842 
843                     case WriteEngine::WR_DOUBLE:
844                         valArray = (double*) calloc(sizeof(double), 1);
845                         break;
846 
847                     case WriteEngine::WR_BYTE:
848                         valArray = (char*) calloc(sizeof(char), 1);
849                         break;
850 
851                     case WriteEngine::WR_UBYTE:
852                         valArray = (uint8_t*) calloc(sizeof(uint8_t), 1);
853                         break;
854 
855                     case WriteEngine::WR_SHORT:
856                         valArray = (short*) calloc(sizeof(short), 1);
857                         break;
858 
859                     case WriteEngine::WR_USHORT:
860                         valArray = (uint16_t*) calloc(sizeof(uint16_t), 1);
861                         break;
862 
863                     case WriteEngine::WR_LONGLONG:
864                         valArray = (long long*) calloc(sizeof(long long), 1);
865                         break;
866 
867                     case WriteEngine::WR_ULONGLONG:
868                         valArray = (uint64_t*) calloc(sizeof(uint64_t), 1);
869                         break;
870 
871                     case WriteEngine::WR_TOKEN:
872                         valArray = (Token*) calloc(sizeof(Token), 1);
873                         break;
874                 }
875 
876                 rc = colOp->writeRows(curCol, ridList.size(), ridList, valArray, 0, true);
877 
878                 if ( rc != NO_ERROR)
879                 {
880                     //read error is fixed in place
881                     if (rc == ERR_COMP_COMPRESS) //write error
882                     {
883 
884                     }
885 
886                 }
887 
888                 //flush files will be done in the end of fix.
889                 colOp->clearColumn(curCol);
890 
891                 if (valArray != NULL)
892                     free(valArray);
893             }
894             else //dictionary file. How to fix
895             {
896                 //read headers out, uncompress the last chunk, if error, replace it with empty chunk.
897                 Dctnry* dctnry = m_dctnry[op(dctnryStructList[i].fCompressionType)];
898                 rc = dctnry->openDctnry(dctnryStructList[i].dctnryOid,
899                                         dctnryStructList[i].fColDbRoot, dctnryStructList[i].fColPartition,
900                                         dctnryStructList[i].fColSegment,
901                                         false);
902 
903                 rc =  dctnry->checkFixLastDictChunk();
904                 rc = dctnry->closeDctnry(true);
905 
906             }
907         }
908     }
909 
910     return rc;
911 }
912 
913 /*@flushVMCache - Flush VM cache
914 */
915 /***********************************************************
916  * DESCRIPTION:
917  *    Flush sytem VM cache
918  * PARAMETERS:
919  *    none
920  * RETURN:
921  *    none
922  ***********************************************************/
flushVMCache() const923 void WriteEngineWrapper::flushVMCache() const
924 {
925 //      int fd = open("/proc/sys/vm/drop_caches", O_WRONLY);
926 //      write(fd, "3", 1);
927 //      close(fd);
928 
929 }
930 
/*@insertColumnRecs - Insert value(s) into a column
*/
/***********************************************************
 * DESCRIPTION:
 *    Insert values into columns (batch insert)
 * PARAMETERS:
 *    colStructList - column struct list
 *    colValueList - column value list
 *    (see the signature for the remaining parameters)
 * RETURN:
 *    NO_ERROR if successful
 *    another error code if something went wrong while inserting the values
 ***********************************************************/
943 
insertColumnRecs(const TxnID & txnid,ColStructList & colStructList,ColValueList & colValueList,DctnryStructList & dctnryStructList,DictStrList & dictStrList,std::vector<boost::shared_ptr<DBRootExtentTracker>> & dbRootExtentTrackers,RBMetaWriter * fRBMetaWriter,bool bFirstExtentOnThisPM,bool insertSelect,bool isAutoCommitOn,OID tableOid,bool isFirstBatchPm)944 int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid,
945         ColStructList& colStructList,
946         ColValueList& colValueList,
947         DctnryStructList& dctnryStructList,
948         DictStrList& dictStrList,
949         std::vector<boost::shared_ptr<DBRootExtentTracker> >& dbRootExtentTrackers,
950         RBMetaWriter* fRBMetaWriter,
951         bool bFirstExtentOnThisPM,
952         bool insertSelect,
953         bool isAutoCommitOn,
954         OID tableOid,
955         bool isFirstBatchPm)
956 {
957     int            rc;
958     RID*           rowIdArray = NULL;
959     ColTupleList   curTupleList;
960     Column         curCol;
961     ColStruct      curColStruct;
962     ColValueList   colOldValueList;
963     ColValueList   colNewValueList;
964     ColStructList  newColStructList;
965     DctnryStructList newDctnryStructList;
966     HWM            hwm = 0;
967     HWM            oldHwm = 0;
968     HWM    		  newHwm = 0;
969     ColTupleList::size_type totalRow;
970     ColStructList::size_type totalColumns;
971     uint64_t rowsLeft = 0;
972     bool newExtent = false;
973     RIDList ridList;
974     ColumnOp* colOp = NULL;
975 
976     // Set tmp file suffix to modify HDFS db file
977     bool           useTmpSuffix = false;
978 
979     if (idbdatafile::IDBPolicy::useHdfs())
980     {
981         if (!bFirstExtentOnThisPM)
982             useTmpSuffix = true;
983     }
984 
985     unsigned i = 0;
986 #ifdef PROFILE
987     StopWatch timer;
988 #endif
989 
990     // debug information for testing
991     if (isDebug(DEBUG_2))
992     {
993         printf("\nIn wrapper insert\n");
994         printInputValue(colStructList, colValueList, ridList);
995     }
996 
997     // end
998 
999     //Convert data type and column width to write engine specific
1000     for (i = 0; i < colStructList.size(); i++)
1001         Convertor::convertColType(&colStructList[i]);
1002 
1003     uint32_t colId = 0;
1004     // MCOL-1675: find the smallest column width to calculate the RowID from so
1005     // that all HWMs will be incremented by this operation
1006     findSmallestColumn(colId, colStructList);
1007 
1008     // rc = checkValid(txnid, colStructList, colValueList, ridList);
1009     // if (rc != NO_ERROR)
1010     //   return rc;
1011 
1012     setTransId(txnid);
1013     uint16_t  dbRoot, segmentNum;
1014     uint32_t partitionNum;
1015     string    segFile;
1016     bool newFile;
1017     TableMetaData* tableMetaData = TableMetaData::makeTableMetaData(tableOid);
1018     //populate colStructList with file information
1019     IDBDataFile* pFile = NULL;
1020     std::vector<DBRootExtentInfo> extentInfo;
1021     int currentDBrootIdx = 0;
1022     std::vector<BRM::CreateStripeColumnExtentsArgOut> extents;
1023 
1024     //--------------------------------------------------------------------------
1025     // For first batch on this PM:
1026     //   o get starting extent from ExtentTracker, and allocate extent if needed
1027     //   o construct colStructList and dctnryStructList accordingly
1028     //   o save extent information in tableMetaData for future use
1029     // If not first batch on this PM:
1030     //   o construct colStructList and dctnryStructList from tableMetaData
1031     //--------------------------------------------------------------------------
1032     if (isFirstBatchPm)
1033     {
1034 		currentDBrootIdx = dbRootExtentTrackers[colId]->getCurrentDBRootIdx();
1035 		extentInfo = dbRootExtentTrackers[colId]->getDBRootExtentList();
1036         dbRoot = extentInfo[currentDBrootIdx].fDbRoot;
1037         partitionNum = extentInfo[currentDBrootIdx].fPartition;
1038 
1039         //----------------------------------------------------------------------
1040         // check whether this extent is the first on this PM
1041         //----------------------------------------------------------------------
1042         if (bFirstExtentOnThisPM)
1043         {
1044             //cout << "bFirstExtentOnThisPM is " << bFirstExtentOnThisPM << endl;
1045             std::vector<BRM::CreateStripeColumnExtentsArgIn> cols;
1046             BRM::CreateStripeColumnExtentsArgIn createStripeColumnExtentsArgIn;
1047 
1048             for (i = 0; i < colStructList.size(); i++)
1049             {
1050                 createStripeColumnExtentsArgIn.oid = colStructList[i].dataOid;
1051                 createStripeColumnExtentsArgIn.width = colStructList[i].colWidth;
1052                 createStripeColumnExtentsArgIn.colDataType = colStructList[i].colDataType;
1053                 cols.push_back(createStripeColumnExtentsArgIn);
1054             }
1055 
1056             rc = BRMWrapper::getInstance()->allocateStripeColExtents(cols, dbRoot, partitionNum, segmentNum, extents);
1057 
1058             if (rc != NO_ERROR)
1059                 return rc;
1060 
1061             //Create column files
1062             BRM::CPInfoList_t cpinfoList;
1063             BRM::CPInfo cpInfo;
1064 
1065             if (isUnsigned(colStructList[i].colDataType))
1066             {
1067                 cpInfo.max = 0;
1068                 cpInfo.min = static_cast<int64_t>(numeric_limits<uint64_t>::max());
1069             }
1070             else
1071             {
1072                 cpInfo.max = numeric_limits<int64_t>::min();
1073                 cpInfo.min = numeric_limits<int64_t>::max();
1074             }
1075 
1076             cpInfo.seqNum = -1;
1077 
1078             for ( i = 0; i < extents.size(); i++)
1079             {
1080                 colOp = m_colOp[op(colStructList[i].fCompressionType)];
1081                 colOp->initColumn(curCol);
1082 				colOp->setColParam(curCol, colId, colStructList[i].colWidth, colStructList[i].colDataType,
1083                                    colStructList[i].colType, colStructList[i].dataOid, colStructList[i].fCompressionType,
1084                                    dbRoot, partitionNum, segmentNum);
1085                 rc = colOp->extendColumn(curCol, false, extents[i].startBlkOffset, extents[i].startLbid, extents[i].allocSize, dbRoot,
1086                                          partitionNum, segmentNum, segFile, pFile, newFile);
1087 
1088                 if (rc != NO_ERROR)
1089                     return rc;
1090 
1091                 //mark the extents to invalid
1092                 cpInfo.firstLbid = extents[i].startLbid;
1093                 cpinfoList.push_back(cpInfo);
1094                 colStructList[i].fColPartition = partitionNum;
1095                 colStructList[i].fColSegment = segmentNum;
1096                 colStructList[i].fColDbRoot = dbRoot;
1097                 dctnryStructList[i].fColPartition = partitionNum;
1098                 dctnryStructList[i].fColSegment = segmentNum;
1099                 dctnryStructList[i].fColDbRoot = dbRoot;
1100             }
1101 
1102             //mark the extents to invalid
1103             rc = BRMWrapper::getInstance()->setExtentsMaxMin(cpinfoList);
1104 
1105             if (rc != NO_ERROR)
1106                 return rc;
1107 
1108             //create corresponding dictionary files
1109             for (i = 0; i < dctnryStructList.size(); i++)
1110             {
1111                 if (dctnryStructList[i].dctnryOid > 0)
1112                 {
1113                     rc = createDctnry(txnid, dctnryStructList[i].dctnryOid, dctnryStructList[i].colWidth, dbRoot, partitionNum,
1114                                       segmentNum, dctnryStructList[i].fCompressionType);
1115 
1116                     if ( rc != NO_ERROR)
1117                         return rc;
1118                 }
1119             }
1120         }    // if ( bFirstExtentOnThisPM)
1121         else // if (!bFirstExtentOnThisPM)
1122         {
1123             std::vector<DBRootExtentInfo> tmpExtentInfo;
1124 
1125             for (i = 0; i < dbRootExtentTrackers.size(); i++)
1126             {
1127                 tmpExtentInfo = dbRootExtentTrackers[i]->getDBRootExtentList();
1128                 colStructList[i].fColPartition =  tmpExtentInfo[currentDBrootIdx].fPartition;
1129                 colStructList[i].fColSegment = tmpExtentInfo[currentDBrootIdx].fSegment;
1130                 colStructList[i].fColDbRoot = tmpExtentInfo[currentDBrootIdx].fDbRoot;
1131                 //cout << "Load from dbrootExtenttracker oid:dbroot:part:seg = " <<colStructList[i].dataOid<<":"
1132                 //<<colStructList[i].fColDbRoot<<":"<<colStructList[i].fColPartition<<":"<<colStructList[i].fColSegment<<endl;
1133                 dctnryStructList[i].fColPartition = tmpExtentInfo[currentDBrootIdx].fPartition;
1134                 dctnryStructList[i].fColSegment = tmpExtentInfo[currentDBrootIdx].fSegment;
1135                 dctnryStructList[i].fColDbRoot = tmpExtentInfo[currentDBrootIdx].fDbRoot;
1136             }
1137         }
1138 
1139         //----------------------------------------------------------------------
1140         // Save the extents info in tableMetaData
1141         //----------------------------------------------------------------------
1142         for (i = 0; i < colStructList.size(); i++)
1143         {
1144             ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[i].dataOid);
1145             ColExtsInfo::iterator it = aColExtsInfo.begin();
1146 
1147             while (it != aColExtsInfo.end())
1148             {
1149                 if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment))
1150                     break;
1151 
1152                 it++;
1153             }
1154 
1155             if (it == aColExtsInfo.end()) //add this one to the list
1156             {
1157                 ColExtInfo aExt;
1158                 aExt.dbRoot = colStructList[i].fColDbRoot;
1159                 aExt.partNum = colStructList[i].fColPartition;
1160                 aExt.segNum = colStructList[i].fColSegment;
1161                 aExt.compType = colStructList[i].fCompressionType;
1162                 aExt.isDict = false;
1163 
1164                 if (bFirstExtentOnThisPM)
1165                 {
1166                     aExt.hwm = extents[i].startBlkOffset;
1167                     aExt.isNewExt = true;
1168                     //cout << "adding a ext to metadata" << endl;
1169                 }
1170                 else
1171                 {
1172                     std::vector<DBRootExtentInfo> tmpExtentInfo;
1173                     tmpExtentInfo = dbRootExtentTrackers[i]->getDBRootExtentList();
1174                     aExt.isNewExt = false;
1175                     aExt.hwm = tmpExtentInfo[currentDBrootIdx].fLocalHwm;
1176                     //cout << "oid " << colStructList[i].dataOid << " gets hwm " << aExt.hwm << endl;
1177                 }
1178 
1179                 aExt.current = true;
1180                 aColExtsInfo.push_back(aExt);
1181                 //cout << "get from extentinfo oid:hwm = " << colStructList[i].dataOid << ":" << aExt.hwm << endl;
1182             }
1183 
1184             tableMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
1185         }
1186 
1187         for (i = 0; i < dctnryStructList.size(); i++)
1188         {
1189             if (dctnryStructList[i].dctnryOid > 0)
1190             {
1191                 ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(dctnryStructList[i].dctnryOid);
1192                 ColExtsInfo::iterator it = aColExtsInfo.begin();
1193 
1194                 while (it != aColExtsInfo.end())
1195                 {
1196                     if ((it->dbRoot == dctnryStructList[i].fColDbRoot) && (it->partNum == dctnryStructList[i].fColPartition) && (it->segNum == dctnryStructList[i].fColSegment))
1197                         break;
1198 
1199                     it++;
1200                 }
1201 
1202                 if (it == aColExtsInfo.end()) //add this one to the list
1203                 {
1204                     ColExtInfo aExt;
1205                     aExt.dbRoot = dctnryStructList[i].fColDbRoot;
1206                     aExt.partNum = dctnryStructList[i].fColPartition;
1207                     aExt.segNum = dctnryStructList[i].fColSegment;
1208                     aExt.compType = dctnryStructList[i].fCompressionType;
1209                     aExt.isDict = true;
1210                     aColExtsInfo.push_back(aExt);
1211                 }
1212 
1213                 tableMetaData->setColExtsInfo(dctnryStructList[i].dctnryOid, aColExtsInfo);
1214             }
1215         }
1216 
1217     } // if (isFirstBatchPm)
1218     else //get the extent info from tableMetaData
1219     {
1220         ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[colId].dataOid);
1221         ColExtsInfo::iterator it = aColExtsInfo.begin();
1222 
1223         while (it != aColExtsInfo.end())
1224         {
1225             if (it->current)
1226                 break;
1227 
1228             it++;
1229         }
1230 
1231         if (it == aColExtsInfo.end())
1232             return 1;
1233 
1234         for (i = 0; i < colStructList.size(); i++)
1235         {
1236             colStructList[i].fColPartition = it->partNum;
1237             colStructList[i].fColSegment = it->segNum;
1238             colStructList[i].fColDbRoot = it->dbRoot;
1239             dctnryStructList[i].fColPartition = it->partNum;
1240             dctnryStructList[i].fColSegment = it->segNum;
1241             dctnryStructList[i].fColDbRoot = it->dbRoot;
1242         }
1243     }
1244 
1245     curTupleList = static_cast<ColTupleList>(colValueList[0]);
1246     totalRow = curTupleList.size();
1247     totalColumns = colStructList.size();
1248     rowIdArray = new RID[totalRow];
1249     // use scoped_array to ensure ptr deletion regardless of where we return
1250     boost::scoped_array<RID> rowIdArrayPtr(rowIdArray);
1251     memset(rowIdArray, 0, (sizeof(RID)*totalRow));
1252 
1253     //--------------------------------------------------------------------------
1254     // allocate row id(s)
1255     //--------------------------------------------------------------------------
1256    curColStruct = colStructList[colId];
1257     colOp = m_colOp[op(curColStruct.fCompressionType)];
1258 
1259     colOp->initColumn(curCol);
1260 
1261     //Get the correct segment, partition, column file
1262     vector<ExtentInfo> colExtentInfo; //Save those empty extents in case of failure to rollback
1263     vector<ExtentInfo> dictExtentInfo; //Save those empty extents in case of failure to rollback
1264     vector<ExtentInfo> fileInfo;
1265     dbRoot = curColStruct.fColDbRoot;
1266     //use the first column to calculate row id
1267    ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[colId].dataOid);
1268     ColExtsInfo::iterator it = aColExtsInfo.begin();
1269 
1270     while (it != aColExtsInfo.end())
1271     {
1272 		if ((it->dbRoot == colStructList[colId].fColDbRoot) &&
1273             (it->partNum == colStructList[colId].fColPartition) &&
1274             (it->segNum == colStructList[colId].fColSegment) && it->current )
1275         {
1276             break;
1277         }
1278         it++;
1279     }
1280 
1281     if (it != aColExtsInfo.end())
1282     {
1283         hwm = it->hwm;
1284 		//cout << "Got from colextinfo hwm for oid " << colStructList[colId].dataOid << " is " << hwm << " and seg is " << colStructList[0].fColSegment << endl;
1285     }
1286 
1287     oldHwm = hwm; //Save this info for rollback
1288     //need to pass real dbRoot, partition, and segment to setColParam
1289    colOp->setColParam(curCol, colId, curColStruct.colWidth, curColStruct.colDataType,
1290                        curColStruct.colType, curColStruct.dataOid, curColStruct.fCompressionType,
1291                        curColStruct.fColDbRoot, curColStruct.fColPartition, curColStruct.fColSegment);
1292     rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix); // @bug 5572 HDFS tmp file
1293 
1294     if (rc != NO_ERROR)
1295     {
1296         return rc;
1297     }
1298 
1299     //get hwm first
1300     // @bug 286 : fix for bug 286 - correct the typo in getHWM
1301     //RETURN_ON_ERROR(BRMWrapper::getInstance()->getHWM(curColStruct.dataOid, hwm));
1302 
1303     Column newCol;
1304 
1305 #ifdef PROFILE
1306     timer.start("allocRowId");
1307 #endif
1308     newColStructList = colStructList;
1309     newDctnryStructList = dctnryStructList;
1310     bool bUseStartExtent = true;
1311 
1312     if (idbdatafile::IDBPolicy::useHdfs())
1313         insertSelect = true;
1314 
1315     rc = colOp->allocRowId(txnid, bUseStartExtent,
1316                            curCol, (uint64_t)totalRow, rowIdArray, hwm, newExtent, rowsLeft, newHwm, newFile,
1317                            newColStructList, newDctnryStructList, dbRootExtentTrackers, insertSelect, true, tableOid, isFirstBatchPm);
1318 
1319     //cout << "after allocrowid, total row = " <<totalRow << " newExtent is " << newExtent << endl;
1320     //cout << "column oid " << curColStruct.dataOid << " has hwm:newHwm = " << hwm <<":" << newHwm<< endl;
1321     if (rc != NO_ERROR) //Clean up is already done
1322         return rc;
1323 
1324 #ifdef PROFILE
1325     timer.stop("allocRowId");
1326 #endif
1327 
1328     //--------------------------------------------------------------------------
1329     // Expand initial abbreviated extent if any RID in 1st extent is > 256K.
1330     // if totalRow == rowsLeft, then not adding rows to 1st extent, so skip it.
1331     //--------------------------------------------------------------------------
1332     // DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated?
1333     if ((curCol.dataFile.fPartition == 0) &&
1334             (curCol.dataFile.fSegment   == 0) &&
1335             ((totalRow - rowsLeft) > 0) &&
1336             (rowIdArray[totalRow - rowsLeft - 1] >= (RID)INITIAL_EXTENT_ROWS_TO_DISK))
1337     {
1338        for (unsigned k=0; k<colStructList.size(); k++)
1339         {
1340            // Skip the selected column
1341            if (k == colId)
1342                continue;
1343             Column expandCol;
1344             colOp = m_colOp[op(colStructList[k].fCompressionType)];
1345             colOp->setColParam(expandCol, 0,
1346                                colStructList[k].colWidth,
1347                                colStructList[k].colDataType,
1348                                colStructList[k].colType,
1349                                colStructList[k].dataOid,
1350                                colStructList[k].fCompressionType,
1351                                colStructList[k].fColDbRoot,
1352                                colStructList[k].fColPartition,
1353                                colStructList[k].fColSegment);
1354             rc = colOp->openColumnFile(expandCol, segFile, true); // @bug 5572 HDFS tmp file
1355 
1356             if (rc == NO_ERROR)
1357             {
1358                 if (colOp->abbreviatedExtent(expandCol.dataFile.pFile, colStructList[k].colWidth))
1359                 {
1360                     rc = colOp->expandAbbrevExtent(expandCol);
1361                 }
1362             }
1363 
1364             if (rc != NO_ERROR)
1365             {
1366                 return rc;
1367             }
1368 
1369             colOp->clearColumn(expandCol); // closes the file (if uncompressed)
1370         }
1371     }
1372 
1373     //--------------------------------------------------------------------------
1374     // Tokenize data if needed
1375     //--------------------------------------------------------------------------
1376     if (insertSelect && isAutoCommitOn)
1377         BRMWrapper::setUseVb( false );
1378     else
1379         BRMWrapper::setUseVb( true );
1380 
1381     dictStr::iterator dctStr_iter;
1382     ColTupleList::iterator col_iter;
1383 
1384     for (i = 0; i < colStructList.size(); i++)
1385     {
1386         if (colStructList[i].tokenFlag)
1387         {
1388             dctStr_iter = dictStrList[i].begin();
1389             col_iter = colValueList[i].begin();
1390             Dctnry* dctnry = m_dctnry[op(dctnryStructList[i].fCompressionType)];
1391             rc = dctnry->openDctnry(dctnryStructList[i].dctnryOid,
1392                                     dctnryStructList[i].fColDbRoot, dctnryStructList[i].fColPartition,
1393                                     dctnryStructList[i].fColSegment,
1394                                     useTmpSuffix); // @bug 5572 HDFS tmp file
1395 
1396             if (rc != NO_ERROR)
1397             {
1398                 cout << "Error opening dctnry file " << dctnryStructList[i].dctnryOid << endl;
1399                 return rc;
1400             }
1401 
1402             for (uint32_t     rows = 0; rows < (totalRow - rowsLeft); rows++)
1403             {
1404                 if (dctStr_iter->length() == 0)
1405                 {
1406                     Token nullToken;
1407                     col_iter->data = nullToken;
1408                 }
1409                 else
1410                 {
1411 #ifdef PROFILE
1412                     timer.start("tokenize");
1413 #endif
1414                     DctnryTuple dctTuple;
1415                     dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str();
1416                     dctTuple.sigSize = dctStr_iter->length();
1417                     dctTuple.isNull = false;
1418                     rc = tokenize(txnid, dctTuple, dctnryStructList[i].fCompressionType);
1419 
1420                     if (rc != NO_ERROR)
1421                     {
1422                         dctnry->closeDctnry();
1423                         return rc;
1424                     }
1425 
1426 #ifdef PROFILE
1427                     timer.stop("tokenize");
1428 #endif
1429                     col_iter->data = dctTuple.token;
1430                 }
1431 
1432                 dctStr_iter++;
1433                 col_iter++;
1434 
1435             }
1436 
1437             //close dictionary files
1438             rc = dctnry->closeDctnry(false);
1439 
1440             if (rc != NO_ERROR)
1441                 return rc;
1442 
1443             if (newExtent)
1444             {
1445                 //@Bug 4854 back up hwm chunk for the file to be modified
1446                 if (fRBMetaWriter)
1447                     fRBMetaWriter->backupDctnryHWMChunk(newDctnryStructList[i].dctnryOid, newDctnryStructList[i].fColDbRoot, newDctnryStructList[i].fColPartition, newDctnryStructList[i].fColSegment);
1448 
1449                 rc = dctnry->openDctnry(newDctnryStructList[i].dctnryOid,
1450                                         newDctnryStructList[i].fColDbRoot, newDctnryStructList[i].fColPartition,
1451                                         newDctnryStructList[i].fColSegment,
1452                                         false); // @bug 5572 HDFS tmp file
1453 
1454                 if (rc != NO_ERROR)
1455                     return rc;
1456 
1457                 for (uint32_t     rows = 0; rows < rowsLeft; rows++)
1458                 {
1459                     if (dctStr_iter->length() == 0)
1460                     {
1461                         Token nullToken;
1462                         col_iter->data = nullToken;
1463                     }
1464                     else
1465                     {
1466 #ifdef PROFILE
1467                         timer.start("tokenize");
1468 #endif
1469                         DctnryTuple dctTuple;
1470                         dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str();
1471                         dctTuple.sigSize = dctStr_iter->length();
1472                         dctTuple.isNull = false;
1473                         rc = tokenize(txnid, dctTuple, newDctnryStructList[i].fCompressionType);
1474 
1475                         if (rc != NO_ERROR)
1476                         {
1477                             dctnry->closeDctnry();
1478                             return rc;
1479                         }
1480 
1481 #ifdef PROFILE
1482                         timer.stop("tokenize");
1483 #endif
1484                         col_iter->data = dctTuple.token;
1485                     }
1486 
1487                     dctStr_iter++;
1488                     col_iter++;
1489                 }
1490 
1491                 //close dictionary files
1492                 rc = dctnry->closeDctnry(false);
1493 
1494                 if (rc != NO_ERROR)
1495                     return rc;
1496             }
1497         }
1498     }
1499 
1500     if (insertSelect && isAutoCommitOn)
1501         BRMWrapper::setUseVb( false );
1502     else
1503         BRMWrapper::setUseVb( true );
1504 
1505     //--------------------------------------------------------------------------
1506     // Update column info structure @Bug 1862 set hwm, and
1507     // Prepare ValueList for new extent (if applicable)
1508     //--------------------------------------------------------------------------
1509     //@Bug 2205 Check whether all rows go to the new extent
1510     RID lastRid = 0;
1511     RID lastRidNew = 0;
1512 
1513     if (totalRow - rowsLeft > 0)
1514     {
1515         lastRid = rowIdArray[totalRow - rowsLeft - 1];
1516         lastRidNew = rowIdArray[totalRow - 1];
1517     }
1518     else
1519     {
1520         lastRid = 0;
1521         lastRidNew = rowIdArray[totalRow - 1];
1522     }
1523 
1524     //cout << "rowid allocated is "  << lastRid << endl;
1525     //if a new extent is created, all the columns in this table should have their own new extent
1526     //First column already processed
1527 
1528     //@Bug 1701. Close the file (if uncompressed)
1529     m_colOp[op(curCol.compressionType)]->clearColumn(curCol);
1530     //cout << "Saving hwm info for new ext batch" << endl;
1531     //Update hwm to set them in the end
1532     bool succFlag = false;
1533     unsigned colWidth = 0;
1534     int      curFbo = 0, curBio;
1535 
1536     for (i = 0; i < totalColumns; i++)
1537     {
1538         //shoud be obtained from saved hwm
1539         aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[i].dataOid);
1540         it = aColExtsInfo.begin();
1541 
1542         while (it != aColExtsInfo.end())
1543         {
1544             if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition)
1545                     && (it->segNum == colStructList[i].fColSegment) && it->current)
1546                 break;
1547 
1548             it++;
1549         }
1550 
1551         if (it != aColExtsInfo.end()) //update hwm info
1552         {
1553             oldHwm = it->hwm;
1554         }
1555 
1556         // save hwm for the old extent
1557         colWidth = colStructList[i].colWidth;
1558         succFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);
1559 
1560         //cout << "insertcolumnrec   oid:rid:fbo:oldhwm = " << colStructList[i].dataOid << ":" << lastRid << ":" << curFbo << ":" << oldHwm << endl;
1561         if (succFlag)
1562         {
1563             if ((HWM)curFbo >= oldHwm)
1564             {
1565                 it->hwm = (HWM)curFbo;
1566             }
1567 
1568             //@Bug 4947. set current to false for old extent.
1569             if (newExtent)
1570             {
1571                 it->current = false;
1572             }
1573 
1574             //cout << "updated old ext info for oid " << colStructList[i].dataOid << " dbroot:part:seg:hwm:current = "
1575             //<< it->dbRoot<<":"<<it->partNum<<":"<<it->segNum<<":"<<it->hwm<<":"<< it->current<< " and newExtent is " << newExtent << endl;
1576         }
1577         else
1578             return ERR_INVALID_PARAM;
1579 
1580         //update hwm for the new extent
1581         if (newExtent)
1582         {
1583             it = aColExtsInfo.begin();
1584 
1585             while (it != aColExtsInfo.end())
1586             {
1587                 if ((it->dbRoot == newColStructList[i].fColDbRoot) && (it->partNum == newColStructList[i].fColPartition)
1588                         && (it->segNum == newColStructList[i].fColSegment) && it->current)
1589                     break;
1590 
1591                 it++;
1592             }
1593 
1594             succFlag = colOp->calculateRowId(lastRidNew, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);
1595 
1596             if (succFlag)
1597             {
1598                 if (it != aColExtsInfo.end())
1599                 {
1600                     it->hwm = (HWM)curFbo;
1601                     //cout << "setting hwm to " << (int)curFbo <<" for seg " <<it->segNum << endl;
1602                     it->current = true;
1603                 }
1604             }
1605             else
1606                 return ERR_INVALID_PARAM;
1607         }
1608 
1609         tableMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
1610     }
1611 
1612     //--------------------------------------------------------------------------
1613     //Prepare the valuelist for the new extent
1614     //--------------------------------------------------------------------------
1615     ColTupleList colTupleList;
1616     ColTupleList newColTupleList;
1617     ColTupleList firstPartTupleList;
1618 
1619     for (unsigned i = 0; i < totalColumns; i++)
1620     {
1621         colTupleList = static_cast<ColTupleList>(colValueList[i]);
1622 
1623         for (uint64_t j = rowsLeft; j > 0; j--)
1624         {
1625             newColTupleList.push_back(colTupleList[totalRow - j]);
1626         }
1627 
1628         colNewValueList.push_back(newColTupleList);
1629         newColTupleList.clear();
1630 
1631         //upate the oldvalue list for the old extent
1632         for (uint64_t j = 0; j < (totalRow - rowsLeft); j++)
1633         {
1634             firstPartTupleList.push_back(colTupleList[j]);
1635         }
1636 
1637         colOldValueList.push_back(firstPartTupleList);
1638         firstPartTupleList.clear();
1639     }
1640 
1641     // end of allocate row id
1642 
1643 #ifdef PROFILE
1644     timer.start("writeColumnRec");
1645 #endif
1646 //cout << "Writing column record" << endl;
1647 
1648     if (rc == NO_ERROR)
1649     {
1650         //----------------------------------------------------------------------
1651         //Mark extents invalid
1652         //----------------------------------------------------------------------
1653         vector<BRM::LBID_t> lbids;
1654         vector<CalpontSystemCatalog::ColDataType> colDataTypes;
1655         bool successFlag = true;
1656         unsigned width = 0;
1657         int         curFbo = 0, curBio, lastFbo = -1;
1658 
1659         if (isFirstBatchPm && (totalRow == rowsLeft))
1660         {}
1661         else
1662         {
1663             for (unsigned i = 0; i < colStructList.size(); i++)
1664             {
1665                 colOp = m_colOp[op(colStructList[i].fCompressionType)];
1666                 width = colStructList[i].colWidth;
1667                 successFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / width, width, curFbo, curBio);
1668 
1669                 if (successFlag)
1670                 {
1671                     if (curFbo != lastFbo)
1672                     {
1673                         RETURN_ON_ERROR(AddLBIDtoList(txnid,
1674                                                       lbids,
1675                                                       colDataTypes,
1676                                                       colStructList[i],
1677                                                       curFbo));
1678                     }
1679                 }
1680             }
1681         }
1682 
1683         if (lbids.size() > 0)
1684             rc = BRMWrapper::getInstance()->markExtentsInvalid(lbids, colDataTypes);
1685 
1686         //----------------------------------------------------------------------
1687         // Write row(s) to database file(s)
1688         //----------------------------------------------------------------------
1689         rc = writeColumnRec(txnid, colStructList, colOldValueList, rowIdArray, newColStructList, colNewValueList, tableOid, useTmpSuffix); // @bug 5572 HDFS tmp file
1690 
1691         if (rc == NO_ERROR)
1692         {
1693             if (dctnryStructList.size() > 0)
1694             {
1695                 vector<BRM::OID_t> oids {static_cast<int32_t>(tableOid)};
1696                 for (const DctnryStruct &dctnryStruct : dctnryStructList)
1697                 {
1698                     oids.push_back(dctnryStruct.dctnryOid);
1699                 }
1700 
1701                 rc = flushOIDsFromCache(oids);
1702 
1703                 if (rc != 0)
1704                     rc = ERR_BLKCACHE_FLUSH_LIST; // translate to WE error
1705             }
1706        }
1707     }
1708 
1709     return rc;
1710 }
1711 
insertColumnRecsBinary(const TxnID & txnid,ColStructList & colStructList,std::vector<uint64_t> & colValueList,DctnryStructList & dctnryStructList,DictStrList & dictStrList,std::vector<boost::shared_ptr<DBRootExtentTracker>> & dbRootExtentTrackers,RBMetaWriter * fRBMetaWriter,bool bFirstExtentOnThisPM,bool insertSelect,bool isAutoCommitOn,OID tableOid,bool isFirstBatchPm)1712 int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
1713         ColStructList& colStructList,
1714         std::vector<uint64_t>& colValueList,
1715         DctnryStructList& dctnryStructList,
1716         DictStrList& dictStrList,
1717         std::vector<boost::shared_ptr<DBRootExtentTracker> >& dbRootExtentTrackers,
1718         RBMetaWriter* fRBMetaWriter,
1719         bool bFirstExtentOnThisPM,
1720         bool insertSelect,
1721         bool isAutoCommitOn,
1722         OID tableOid,
1723         bool isFirstBatchPm)
1724 {
1725     int            rc;
1726     RID*           rowIdArray = NULL;
1727     Column         curCol;
1728     ColStruct      curColStruct;
1729     ColStructList  newColStructList;
1730     std::vector<uint64_t> colNewValueList;
1731     DctnryStructList newDctnryStructList;
1732     HWM            hwm = 0;
1733     HWM            oldHwm = 0;
1734     HWM    		  newHwm = 0;
1735     size_t totalRow;
1736     ColStructList::size_type totalColumns;
1737     uint64_t rowsLeft = 0;
1738     bool newExtent = false;
1739     RIDList ridList;
1740     ColumnOp* colOp = NULL;
1741     std::vector<BRM::LBID_t> dictLbids;
1742 
1743     // Set tmp file suffix to modify HDFS db file
1744     bool           useTmpSuffix = false;
1745 
1746     if (idbdatafile::IDBPolicy::useHdfs())
1747     {
1748         if (!bFirstExtentOnThisPM)
1749             useTmpSuffix = true;
1750     }
1751 
1752     unsigned i = 0;
1753 #ifdef PROFILE
1754     StopWatch timer;
1755 #endif
1756 
1757     //Convert data type and column width to write engine specific
1758     for (i = 0; i < colStructList.size(); i++)
1759         Convertor::convertColType(&colStructList[i]);
1760 
1761     uint32_t colId = 0;
1762     // MCOL-1675: find the smallest column width to calculate the RowID from so
1763     // that all HWMs will be incremented by this operation
1764     findSmallestColumn(colId, colStructList);
1765 
1766 
1767     // rc = checkValid(txnid, colStructList, colValueList, ridList);
1768     // if (rc != NO_ERROR)
1769     //   return rc;
1770 
1771     setTransId(txnid);
1772     uint16_t  dbRoot, segmentNum;
1773     uint32_t partitionNum;
1774     string    segFile;
1775     bool newFile;
1776     TableMetaData* tableMetaData = TableMetaData::makeTableMetaData(tableOid);
1777     //populate colStructList with file information
1778     IDBDataFile* pFile = NULL;
1779     std::vector<DBRootExtentInfo> extentInfo;
1780     int currentDBrootIdx = 0;
1781     std::vector<BRM::CreateStripeColumnExtentsArgOut> extents;
1782 
1783     //--------------------------------------------------------------------------
1784     // For first batch on this PM:
1785     //   o get starting extent from ExtentTracker, and allocate extent if needed
1786     //   o construct colStructList and dctnryStructList accordingly
1787     //   o save extent information in tableMetaData for future use
1788     // If not first batch on this PM:
1789     //   o construct colStructList and dctnryStructList from tableMetaData
1790     //--------------------------------------------------------------------------
1791     if (isFirstBatchPm)
1792     {
1793         currentDBrootIdx = dbRootExtentTrackers[colId]->getCurrentDBRootIdx();
1794         extentInfo = dbRootExtentTrackers[colId]->getDBRootExtentList();
1795         dbRoot = extentInfo[currentDBrootIdx].fDbRoot;
1796         partitionNum = extentInfo[currentDBrootIdx].fPartition;
1797 
1798         //----------------------------------------------------------------------
1799         // check whether this extent is the first on this PM
1800         //----------------------------------------------------------------------
1801         if (bFirstExtentOnThisPM)
1802         {
1803             //cout << "bFirstExtentOnThisPM is " << bFirstExtentOnThisPM << endl;
1804             std::vector<BRM::CreateStripeColumnExtentsArgIn> cols;
1805             BRM::CreateStripeColumnExtentsArgIn createStripeColumnExtentsArgIn;
1806 
1807             for (i = 0; i < colStructList.size(); i++)
1808             {
1809                 createStripeColumnExtentsArgIn.oid = colStructList[i].dataOid;
1810                 createStripeColumnExtentsArgIn.width = colStructList[i].colWidth;
1811                 createStripeColumnExtentsArgIn.colDataType = colStructList[i].colDataType;
1812                 cols.push_back(createStripeColumnExtentsArgIn);
1813             }
1814 
1815             rc = BRMWrapper::getInstance()->allocateStripeColExtents(cols, dbRoot, partitionNum, segmentNum, extents);
1816 
1817             if (rc != NO_ERROR)
1818                 return rc;
1819 
1820             //Create column files
1821             BRM::CPInfoList_t cpinfoList;
1822             BRM::CPInfo cpInfo;
1823 
1824             if (isUnsigned(colStructList[i].colDataType))
1825             {
1826                 cpInfo.max = 0;
1827                 cpInfo.min = static_cast<int64_t>(numeric_limits<uint64_t>::max());
1828             }
1829             else
1830             {
1831                 cpInfo.max = numeric_limits<int64_t>::min();
1832                 cpInfo.min = numeric_limits<int64_t>::max();
1833             }
1834 
1835             cpInfo.seqNum = -1;
1836 
1837             for ( i = 0; i < extents.size(); i++)
1838             {
1839                 colOp = m_colOp[op(colStructList[i].fCompressionType)];
1840                 colOp->initColumn(curCol);
1841                 colOp->setColParam(curCol, 0, colStructList[i].colWidth, colStructList[i].colDataType,
1842                                    colStructList[i].colType, colStructList[i].dataOid, colStructList[i].fCompressionType,
1843                                    dbRoot, partitionNum, segmentNum);
1844                 rc = colOp->extendColumn(curCol, false, extents[i].startBlkOffset, extents[i].startLbid, extents[i].allocSize, dbRoot,
1845                                          partitionNum, segmentNum, segFile, pFile, newFile);
1846 
1847                 if (rc != NO_ERROR)
1848                     return rc;
1849 
1850                 //mark the extents to invalid
1851                 cpInfo.firstLbid = extents[i].startLbid;
1852                 cpinfoList.push_back(cpInfo);
1853                 colStructList[i].fColPartition = partitionNum;
1854                 colStructList[i].fColSegment = segmentNum;
1855                 colStructList[i].fColDbRoot = dbRoot;
1856                 dctnryStructList[i].fColPartition = partitionNum;
1857                 dctnryStructList[i].fColSegment = segmentNum;
1858                 dctnryStructList[i].fColDbRoot = dbRoot;
1859             }
1860 
1861             //mark the extents to invalid
1862             rc = BRMWrapper::getInstance()->setExtentsMaxMin(cpinfoList);
1863 
1864             if (rc != NO_ERROR)
1865                 return rc;
1866 
1867             //create corresponding dictionary files
1868             for (i = 0; i < dctnryStructList.size(); i++)
1869             {
1870                 if (dctnryStructList[i].dctnryOid > 0)
1871                 {
1872                     rc = createDctnry(txnid, dctnryStructList[i].dctnryOid, dctnryStructList[i].colWidth, dbRoot, partitionNum,
1873                                       segmentNum, dctnryStructList[i].fCompressionType);
1874 
1875                     if ( rc != NO_ERROR)
1876                         return rc;
1877                 }
1878             }
1879         }    // if ( bFirstExtentOnThisPM)
1880         else // if (!bFirstExtentOnThisPM)
1881         {
1882             std::vector<DBRootExtentInfo> tmpExtentInfo;
1883 
1884             for (i = 0; i < dbRootExtentTrackers.size(); i++)
1885             {
1886                 tmpExtentInfo = dbRootExtentTrackers[i]->getDBRootExtentList();
1887                 colStructList[i].fColPartition =  tmpExtentInfo[currentDBrootIdx].fPartition;
1888                 colStructList[i].fColSegment = tmpExtentInfo[currentDBrootIdx].fSegment;
1889                 colStructList[i].fColDbRoot = tmpExtentInfo[currentDBrootIdx].fDbRoot;
1890                 //cout << "Load from dbrootExtenttracker oid:dbroot:part:seg = " <<colStructList[i].dataOid<<":"
1891                 //<<colStructList[i].fColDbRoot<<":"<<colStructList[i].fColPartition<<":"<<colStructList[i].fColSegment<<endl;
1892                 dctnryStructList[i].fColPartition = tmpExtentInfo[currentDBrootIdx].fPartition;
1893                 dctnryStructList[i].fColSegment = tmpExtentInfo[currentDBrootIdx].fSegment;
1894                 dctnryStructList[i].fColDbRoot = tmpExtentInfo[currentDBrootIdx].fDbRoot;
1895             }
1896         }
1897 
1898         //----------------------------------------------------------------------
1899         // Save the extents info in tableMetaData
1900         //----------------------------------------------------------------------
1901         for (i = 0; i < colStructList.size(); i++)
1902         {
1903             ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[i].dataOid);
1904             ColExtsInfo::iterator it = aColExtsInfo.begin();
1905 
1906             while (it != aColExtsInfo.end())
1907             {
1908                 if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment))
1909                     break;
1910 
1911                 it++;
1912             }
1913 
1914             if (it == aColExtsInfo.end()) //add this one to the list
1915             {
1916                 ColExtInfo aExt;
1917                 aExt.dbRoot = colStructList[i].fColDbRoot;
1918                 aExt.partNum = colStructList[i].fColPartition;
1919                 aExt.segNum = colStructList[i].fColSegment;
1920                 aExt.compType = colStructList[i].fCompressionType;
1921                 aExt.isDict = false;
1922 
1923                 if (bFirstExtentOnThisPM)
1924                 {
1925                     aExt.hwm = extents[i].startBlkOffset;
1926                     aExt.isNewExt = true;
1927                     //cout << "adding a ext to metadata" << endl;
1928                 }
1929                 else
1930                 {
1931                     std::vector<DBRootExtentInfo> tmpExtentInfo;
1932                     tmpExtentInfo = dbRootExtentTrackers[i]->getDBRootExtentList();
1933                     aExt.isNewExt = false;
1934                     aExt.hwm = tmpExtentInfo[currentDBrootIdx].fLocalHwm;
1935                     //cout << "oid " << colStructList[i].dataOid << " gets hwm " << aExt.hwm << endl;
1936                 }
1937 
1938                 aExt.current = true;
1939                 aColExtsInfo.push_back(aExt);
1940                 //cout << "get from extentinfo oid:hwm = " << colStructList[i].dataOid << ":" << aExt.hwm << endl;
1941             }
1942 
1943             tableMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
1944         }
1945 
1946         for (i = 0; i < dctnryStructList.size(); i++)
1947         {
1948             if (dctnryStructList[i].dctnryOid > 0)
1949             {
1950                 ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(dctnryStructList[i].dctnryOid);
1951                 ColExtsInfo::iterator it = aColExtsInfo.begin();
1952 
1953                 while (it != aColExtsInfo.end())
1954                 {
1955                     if ((it->dbRoot == dctnryStructList[i].fColDbRoot) && (it->partNum == dctnryStructList[i].fColPartition) && (it->segNum == dctnryStructList[i].fColSegment))
1956                         break;
1957 
1958                     it++;
1959                 }
1960 
1961                 if (it == aColExtsInfo.end()) //add this one to the list
1962                 {
1963                     ColExtInfo aExt;
1964                     aExt.dbRoot = dctnryStructList[i].fColDbRoot;
1965                     aExt.partNum = dctnryStructList[i].fColPartition;
1966                     aExt.segNum = dctnryStructList[i].fColSegment;
1967                     aExt.compType = dctnryStructList[i].fCompressionType;
1968                     aExt.isDict = true;
1969                     aColExtsInfo.push_back(aExt);
1970                 }
1971 
1972                 tableMetaData->setColExtsInfo(dctnryStructList[i].dctnryOid, aColExtsInfo);
1973             }
1974         }
1975 
1976     } // if (isFirstBatchPm)
1977     else //get the extent info from tableMetaData
1978     {
1979         ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[colId].dataOid);
1980         ColExtsInfo::iterator it = aColExtsInfo.begin();
1981 
1982         while (it != aColExtsInfo.end())
1983         {
1984             if (it->current)
1985                 break;
1986 
1987             it++;
1988         }
1989 
1990         if (it == aColExtsInfo.end())
1991             return 1;
1992 
1993         for (i = 0; i < colStructList.size(); i++)
1994         {
1995             colStructList[i].fColPartition = it->partNum;
1996             colStructList[i].fColSegment = it->segNum;
1997             colStructList[i].fColDbRoot = it->dbRoot;
1998             dctnryStructList[i].fColPartition = it->partNum;
1999             dctnryStructList[i].fColSegment = it->segNum;
2000             dctnryStructList[i].fColDbRoot = it->dbRoot;
2001         }
2002     }
2003 
2004     totalColumns = colStructList.size();
2005     totalRow = colValueList.size() / totalColumns;
2006     rowIdArray = new RID[totalRow];
2007     // use scoped_array to ensure ptr deletion regardless of where we return
2008     boost::scoped_array<RID> rowIdArrayPtr(rowIdArray);
2009     memset(rowIdArray, 0, (sizeof(RID)*totalRow));
2010 
2011     //--------------------------------------------------------------------------
2012     // allocate row id(s)
2013     //--------------------------------------------------------------------------
2014 
2015     curColStruct = colStructList[colId];
2016 
2017     colOp = m_colOp[op(curColStruct.fCompressionType)];
2018 
2019     colOp->initColumn(curCol);
2020 
2021     //Get the correct segment, partition, column file
2022     vector<ExtentInfo> colExtentInfo; //Save those empty extents in case of failure to rollback
2023     vector<ExtentInfo> dictExtentInfo; //Save those empty extents in case of failure to rollback
2024     vector<ExtentInfo> fileInfo;
2025     dbRoot = curColStruct.fColDbRoot;
2026     //use the smallest column to calculate row id
2027     ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[colId].dataOid);
2028     ColExtsInfo::iterator it = aColExtsInfo.begin();
2029 
2030     while (it != aColExtsInfo.end())
2031     {
2032         if ((it->dbRoot == colStructList[colId].fColDbRoot) && (it->partNum == colStructList[colId].fColPartition) && (it->segNum == colStructList[colId].fColSegment) && it->current )
2033             break;
2034 
2035         it++;
2036     }
2037 
2038     if (it != aColExtsInfo.end())
2039     {
2040         hwm = it->hwm;
2041         //cout << "Got from colextinfo hwm for oid " << colStructList[colId].dataOid << " is " << hwm << " and seg is " << colStructList[colId].fColSegment << endl;
2042     }
2043 
2044     oldHwm = hwm; //Save this info for rollback
2045     //need to pass real dbRoot, partition, and segment to setColParam
2046     colOp->setColParam(curCol, colId, curColStruct.colWidth, curColStruct.colDataType,
2047                        curColStruct.colType, curColStruct.dataOid, curColStruct.fCompressionType,
2048                        curColStruct.fColDbRoot, curColStruct.fColPartition, curColStruct.fColSegment);
2049     rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix); // @bug 5572 HDFS tmp file
2050 
2051     if (rc != NO_ERROR)
2052     {
2053         return rc;
2054     }
2055 
2056     //get hwm first
2057     // @bug 286 : fix for bug 286 - correct the typo in getHWM
2058     //RETURN_ON_ERROR(BRMWrapper::getInstance()->getHWM(curColStruct.dataOid, hwm));
2059 
2060     Column newCol;
2061 
2062 #ifdef PROFILE
2063     timer.start("allocRowId");
2064 #endif
2065     newColStructList = colStructList;
2066     newDctnryStructList = dctnryStructList;
2067     bool bUseStartExtent = true;
2068 
2069     if (idbdatafile::IDBPolicy::useHdfs())
2070         insertSelect = true;
2071 
2072     rc = colOp->allocRowId(txnid, bUseStartExtent,
2073                            curCol, (uint64_t)totalRow, rowIdArray, hwm, newExtent, rowsLeft, newHwm, newFile,
2074                            newColStructList, newDctnryStructList, dbRootExtentTrackers, insertSelect, true, tableOid, isFirstBatchPm);
2075 
2076     //cout << "after allocrowid, total row = " <<totalRow << " newExtent is " << newExtent << endl;
2077     // cout << "column oid " << curColStruct.dataOid << " has hwm:newHwm = " << hwm <<":" << newHwm<< endl;
2078     if (rc != NO_ERROR) //Clean up is already done
2079         return rc;
2080 
2081 #ifdef PROFILE
2082     timer.stop("allocRowId");
2083 #endif
2084 
2085     //--------------------------------------------------------------------------
2086     // Expand initial abbreviated extent if any RID in 1st extent is > 256K.
2087     // if totalRow == rowsLeft, then not adding rows to 1st extent, so skip it.
2088     //--------------------------------------------------------------------------
2089     // DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated?
2090     if ((curCol.dataFile.fPartition == 0) &&
2091             (curCol.dataFile.fSegment   == 0) &&
2092             ((totalRow - rowsLeft) > 0) &&
2093             (rowIdArray[totalRow - rowsLeft - 1] >= (RID)INITIAL_EXTENT_ROWS_TO_DISK))
2094     {
2095         for (size_t k = 0; k < colStructList.size(); k++)
2096         {
2097             // Skip the selected column
2098             if (k == (size_t)colId)
2099                 continue;
2100 
2101             Column expandCol;
2102            colOp = m_colOp[op(colStructList[k].fCompressionType)];
2103            // Shouldn't we change 0 to colId here?
2104             colOp->setColParam(expandCol, 0,
2105                                colStructList[k].colWidth,
2106                                colStructList[k].colDataType,
2107                                colStructList[k].colType,
2108                                colStructList[k].dataOid,
2109                                colStructList[k].fCompressionType,
2110                                colStructList[k].fColDbRoot,
2111                                colStructList[k].fColPartition,
2112                                colStructList[k].fColSegment);
2113             rc = colOp->openColumnFile(expandCol, segFile, true); // @bug 5572 HDFS tmp file
2114 
2115             if (rc == NO_ERROR)
2116             {
2117                 if (colOp->abbreviatedExtent(expandCol.dataFile.pFile, colStructList[k].colWidth))
2118                 {
2119                     rc = colOp->expandAbbrevExtent(expandCol);
2120                 }
2121             }
2122 
2123             if (rc != NO_ERROR)
2124             {
2125                 return rc;
2126             }
2127 
2128             colOp->closeColumnFile(expandCol);
2129         }
2130     }
2131 
2132     //--------------------------------------------------------------------------
2133     // Tokenize data if needed
2134     //--------------------------------------------------------------------------
2135     if (insertSelect && isAutoCommitOn)
2136         BRMWrapper::setUseVb( false );
2137     else
2138         BRMWrapper::setUseVb( true );
2139 
2140     dictStr::iterator dctStr_iter;
2141     uint64_t* colValPtr;
2142     size_t rowsPerColumn = colValueList.size() / colStructList.size();
2143 
2144     for (i = 0; i < colStructList.size(); i++)
2145     {
2146         if (colStructList[i].tokenFlag)
2147         {
2148             dctStr_iter = dictStrList[i].begin();
2149             Dctnry* dctnry = m_dctnry[op(dctnryStructList[i].fCompressionType)];
2150             rc = dctnry->openDctnry(dctnryStructList[i].dctnryOid,
2151                                     dctnryStructList[i].fColDbRoot, dctnryStructList[i].fColPartition,
2152                                     dctnryStructList[i].fColSegment,
2153                                     useTmpSuffix); // @bug 5572 HDFS tmp file
2154 
2155             if (rc != NO_ERROR)
2156             {
2157                 cout << "Error opening dctnry file " << dctnryStructList[i].dctnryOid << endl;
2158                 return rc;
2159             }
2160 
2161             for (uint32_t     rows = 0; rows < (totalRow - rowsLeft); rows++)
2162             {
2163                 colValPtr = &colValueList[(i * rowsPerColumn) + rows];
2164 
2165                 if (dctStr_iter->length() == 0)
2166                 {
2167                     Token nullToken;
2168                     memcpy(colValPtr, &nullToken, 8);
2169                 }
2170                 else
2171                 {
2172 #ifdef PROFILE
2173                     timer.start("tokenize");
2174 #endif
2175                     DctnryTuple dctTuple;
2176                     dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str();
2177                     dctTuple.sigSize = dctStr_iter->length();
2178                     dctTuple.isNull = false;
2179                     rc = tokenize(txnid, dctTuple, dctnryStructList[i].fCompressionType);
2180 
2181                     if (rc != NO_ERROR)
2182                     {
2183                         dctnry->closeDctnry();
2184                         return rc;
2185                     }
2186 
2187 #ifdef PROFILE
2188                     timer.stop("tokenize");
2189 #endif
2190                     memcpy(colValPtr, &dctTuple.token, 8);
2191                     dictLbids.push_back(dctTuple.token.fbo);
2192                 }
2193 
2194                 dctStr_iter++;
2195 
2196             }
2197 
2198             //close dictionary files
2199             rc = dctnry->closeDctnry(false);
2200 
2201             if (rc != NO_ERROR)
2202                 return rc;
2203 
2204             if (newExtent)
2205             {
2206                 //@Bug 4854 back up hwm chunk for the file to be modified
2207                 if (fRBMetaWriter)
2208                     fRBMetaWriter->backupDctnryHWMChunk(newDctnryStructList[i].dctnryOid, newDctnryStructList[i].fColDbRoot, newDctnryStructList[i].fColPartition, newDctnryStructList[i].fColSegment);
2209 
2210                 rc = dctnry->openDctnry(newDctnryStructList[i].dctnryOid,
2211                                         newDctnryStructList[i].fColDbRoot, newDctnryStructList[i].fColPartition,
2212                                         newDctnryStructList[i].fColSegment,
2213                                         false); // @bug 5572 HDFS tmp file
2214 
2215                 if (rc != NO_ERROR)
2216                     return rc;
2217 
2218                 for (uint32_t     rows = 0; rows < rowsLeft; rows++)
2219                 {
2220                     colValPtr = &colValueList[(i * rowsPerColumn) + rows];
2221 
2222                     if (dctStr_iter->length() == 0)
2223                     {
2224                         Token nullToken;
2225                         memcpy(colValPtr, &nullToken, 8);
2226                     }
2227                     else
2228                     {
2229 #ifdef PROFILE
2230                         timer.start("tokenize");
2231 #endif
2232                         DctnryTuple dctTuple;
2233                         dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str();
2234                         dctTuple.sigSize = dctStr_iter->length();
2235                         dctTuple.isNull = false;
2236                         rc = tokenize(txnid, dctTuple, newDctnryStructList[i].fCompressionType);
2237 
2238                         if (rc != NO_ERROR)
2239                         {
2240                             dctnry->closeDctnry();
2241                             return rc;
2242                         }
2243 
2244 #ifdef PROFILE
2245                         timer.stop("tokenize");
2246 #endif
2247                         memcpy(colValPtr, &dctTuple.token, 8);
2248                         dictLbids.push_back(dctTuple.token.fbo);
2249                     }
2250 
2251                     dctStr_iter++;
2252                 }
2253 
2254                 //close dictionary files
2255                 rc = dctnry->closeDctnry(false);
2256 
2257                 if (rc != NO_ERROR)
2258                     return rc;
2259             }
2260         }
2261     }
2262 
2263     if (insertSelect && isAutoCommitOn)
2264         BRMWrapper::setUseVb( false );
2265     else
2266         BRMWrapper::setUseVb( true );
2267 
2268     //--------------------------------------------------------------------------
2269     // Update column info structure @Bug 1862 set hwm, and
2270     // Prepare ValueList for new extent (if applicable)
2271     //--------------------------------------------------------------------------
2272     //@Bug 2205 Check whether all rows go to the new extent
2273     RID lastRid = 0;
2274     RID lastRidNew = 0;
2275 
2276     if (totalRow - rowsLeft > 0)
2277     {
2278         lastRid = rowIdArray[totalRow - rowsLeft - 1];
2279         lastRidNew = rowIdArray[totalRow - 1];
2280     }
2281     else
2282     {
2283         lastRid = 0;
2284         lastRidNew = rowIdArray[totalRow - 1];
2285     }
2286 
2287     //cout << "rowid allocated is "  << lastRid << endl;
2288     //if a new extent is created, all the columns in this table should have their own new extent
2289     //First column already processed
2290 
2291     //@Bug 1701. Close the file (if uncompressed)
2292     m_colOp[op(curCol.compressionType)]->closeColumnFile(curCol);
2293     //cout << "Saving hwm info for new ext batch" << endl;
2294     //Update hwm to set them in the end
2295     bool succFlag = false;
2296     unsigned colWidth = 0;
2297     int      curFbo = 0, curBio;
2298 
2299     for (i = 0; i < totalColumns; i++)
2300     {
2301         //shoud be obtained from saved hwm
2302         aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[i].dataOid);
2303         it = aColExtsInfo.begin();
2304 
2305         while (it != aColExtsInfo.end())
2306         {
2307             if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition)
2308                     && (it->segNum == colStructList[i].fColSegment) && it->current)
2309                 break;
2310 
2311             it++;
2312         }
2313 
2314         if (it != aColExtsInfo.end()) //update hwm info
2315         {
2316             oldHwm = it->hwm;
2317 
2318             // save hwm for the old extent
2319             colWidth = colStructList[i].colWidth;
2320             succFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);
2321 
2322             //cout << "insertcolumnrec   oid:rid:fbo:oldhwm = " << colStructList[i].dataOid << ":" << lastRid << ":" << curFbo << ":" << oldHwm << endl;
2323             if (succFlag)
2324             {
2325                 if ((HWM)curFbo >= oldHwm)
2326                 {
2327                     it->hwm = (HWM)curFbo;
2328                 }
2329 
2330                 //@Bug 4947. set current to false for old extent.
2331                 if (newExtent)
2332                 {
2333                     it->current = false;
2334                 }
2335 
2336                 //cout << "updated old ext info for oid " << colStructList[i].dataOid << " dbroot:part:seg:hwm:current = "
2337                 //<< it->dbRoot<<":"<<it->partNum<<":"<<it->segNum<<":"<<it->hwm<<":"<< it->current<< " and newExtent is " << newExtent << endl;
2338             }
2339             else
2340                 return ERR_INVALID_PARAM;
2341 
2342         }
2343 
2344         //update hwm for the new extent
2345         if (newExtent)
2346         {
2347             it = aColExtsInfo.begin();
2348 
2349             while (it != aColExtsInfo.end())
2350             {
2351                 if ((it->dbRoot == newColStructList[i].fColDbRoot) && (it->partNum == newColStructList[i].fColPartition)
2352                         && (it->segNum == newColStructList[i].fColSegment) && it->current)
2353                     break;
2354 
2355                 it++;
2356             }
2357 
2358             colWidth = newColStructList[i].colWidth;
2359             succFlag = colOp->calculateRowId(lastRidNew, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);
2360 
2361             if (succFlag)
2362             {
2363                 if (it != aColExtsInfo.end())
2364                 {
2365                     it->hwm = (HWM)curFbo;
2366                     //cout << "setting hwm to " << (int)curFbo <<" for seg " <<it->segNum << endl;
2367                     it->current = true;
2368                 }
2369             }
2370             else
2371                 return ERR_INVALID_PARAM;
2372         }
2373 
2374         tableMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
2375     }
2376 
2377     //--------------------------------------------------------------------------
2378     //Prepare the valuelist for the new extent
2379     //--------------------------------------------------------------------------
2380 
2381     for (unsigned i = 1; i <= totalColumns; i++)
2382     {
2383         // Copy values to second value list
2384         for (uint64_t j = rowsLeft; j > 0; j--)
2385         {
2386             colNewValueList.push_back(colValueList[(totalRow * i) - j]);
2387         }
2388     }
2389 
2390     // end of allocate row id
2391 
2392 #ifdef PROFILE
2393     timer.start("writeColumnRec");
2394 #endif
2395 //cout << "Writing column record" << endl;
2396 
2397     if (rc == NO_ERROR)
2398     {
2399         //----------------------------------------------------------------------
2400         //Mark extents invalid
2401         //----------------------------------------------------------------------
2402         vector<BRM::LBID_t> lbids;
2403         vector<CalpontSystemCatalog::ColDataType> colDataTypes;
2404         bool successFlag = true;
2405         unsigned width = 0;
2406         int         curFbo = 0, curBio, lastFbo = -1;
2407 
2408         if (isFirstBatchPm && (totalRow == rowsLeft))
2409         {}
2410         else
2411         {
2412             for (unsigned i = 0; i < colStructList.size(); i++)
2413             {
2414                 colOp = m_colOp[op(colStructList[i].fCompressionType)];
2415                 width = colStructList[i].colWidth;
2416                 successFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / width, width, curFbo, curBio);
2417 
2418                 if (successFlag)
2419                 {
2420                     if (curFbo != lastFbo)
2421                     {
2422                         RETURN_ON_ERROR(AddLBIDtoList(txnid,
2423                                                       lbids,
2424                                                       colDataTypes,
2425                                                       colStructList[i],
2426                                                       curFbo));
2427                     }
2428                 }
2429                 else
2430                     return ERR_INVALID_PARAM;
2431             }
2432         }
2433 
2434         // If we create a new extent for this batch
2435         for (unsigned i = 0; i < newColStructList.size(); i++)
2436         {
2437             colOp = m_colOp[op(newColStructList[i].fCompressionType)];
2438             width = newColStructList[i].colWidth;
2439             successFlag = colOp->calculateRowId(lastRidNew, BYTE_PER_BLOCK / width, width, curFbo, curBio);
2440 
2441             if (successFlag)
2442             {
2443                 if (curFbo != lastFbo)
2444                 {
2445                     RETURN_ON_ERROR(AddLBIDtoList(txnid,
2446                                                   lbids,
2447                                                   colDataTypes,
2448                                                   newColStructList[i],
2449                                                   curFbo));
2450                 }
2451             }
2452             else
2453                 return ERR_INVALID_PARAM;
2454         }
2455 
2456         if (lbids.size() > 0)
2457             rc = BRMWrapper::getInstance()->markExtentsInvalid(lbids, colDataTypes);
2458 
2459         //----------------------------------------------------------------------
2460         // Write row(s) to database file(s)
2461         //----------------------------------------------------------------------
2462         bool versioning = !(isAutoCommitOn && insertSelect);
2463         AddDictToList(txnid, dictLbids);
2464         rc = writeColumnRecBinary(txnid, colStructList, colValueList, rowIdArray, newColStructList, colNewValueList, tableOid, useTmpSuffix, versioning); // @bug 5572 HDFS tmp file
2465     }
2466 
2467     return rc;
2468 }
2469 
2470 
insertColumnRec_SYS(const TxnID & txnid,ColStructList & colStructList,ColValueList & colValueList,DctnryStructList & dctnryStructList,DictStrList & dictStrList,const int32_t tableOid)2471 int WriteEngineWrapper::insertColumnRec_SYS(const TxnID& txnid,
2472         ColStructList& colStructList,
2473         ColValueList& colValueList,
2474         DctnryStructList& dctnryStructList,
2475         DictStrList& dictStrList,
2476         const int32_t tableOid)
2477 {
2478     int            rc;
2479     RID*           rowIdArray = NULL;
2480     ColTupleList   curTupleList;
2481     Column         curCol;
2482     ColStruct      curColStruct;
2483     ColValueList   colOldValueList;
2484     ColValueList   colNewValueList;
2485     ColStructList  newColStructList;
2486     DctnryStructList newDctnryStructList;
2487     HWM            hwm = 0;
2488     HWM            newHwm = 0;
2489     HWM            oldHwm = 0;
2490     ColTupleList::size_type totalRow;
2491     ColStructList::size_type totalColumns;
2492     uint64_t rowsLeft = 0;
2493     bool newExtent = false;
2494     RIDList ridList;
2495     ColumnOp* colOp = NULL;
2496     uint32_t i = 0;
2497 #ifdef PROFILE
2498     StopWatch timer;
2499 #endif
2500 
2501     // debug information for testing
2502     if (isDebug(DEBUG_2))
2503     {
2504         printf("\nIn wrapper insert\n");
2505         printInputValue(colStructList, colValueList, ridList);
2506     }
2507 
2508     // end
2509 
2510     //Convert data type and column width to write engine specific
2511     for (i = 0; i < colStructList.size(); i++)
2512         Convertor::convertColType(&colStructList[i]);
2513 
2514     rc = checkValid(txnid, colStructList, colValueList, ridList);
2515 
2516     if (rc != NO_ERROR)
2517         return rc;
2518 
2519     setTransId(txnid);
2520 
2521     curTupleList = static_cast<ColTupleList>(colValueList[0]);
2522     totalRow = curTupleList.size();
2523     totalColumns = colStructList.size();
2524     rowIdArray = new RID[totalRow];
2525     // use scoped_array to ensure ptr deletion regardless of where we return
2526     boost::scoped_array<RID> rowIdArrayPtr(rowIdArray);
2527     memset(rowIdArray, 0, (sizeof(RID)*totalRow));
2528 
2529     // allocate row id(s)
2530     curColStruct = colStructList[0];
2531     colOp = m_colOp[op(curColStruct.fCompressionType)];
2532 
2533     colOp->initColumn(curCol);
2534 
2535     //Get the correct segment, partition, column file
2536     uint16_t dbRoot, segmentNum;
2537     uint32_t partitionNum;
2538     vector<ExtentInfo> colExtentInfo; //Save those empty extents in case of failure to rollback
2539     vector<ExtentInfo> dictExtentInfo; //Save those empty extents in case of failure to rollback
2540     vector<ExtentInfo> fileInfo;
2541     ExtentInfo info;
2542     //Don't search for empty space, always append to the end. May need to revisit this part
2543     dbRoot = curColStruct.fColDbRoot;
2544     int  extState;
2545     bool extFound;
2546     RETURN_ON_ERROR(BRMWrapper::getInstance()->getLastHWM_DBroot(
2547                         curColStruct.dataOid, dbRoot, partitionNum, segmentNum, hwm,
2548                         extState, extFound));
2549 
2550     for (i = 0; i < colStructList.size(); i++)
2551     {
2552         colStructList[i].fColPartition = partitionNum;
2553         colStructList[i].fColSegment = segmentNum;
2554         colStructList[i].fColDbRoot = dbRoot;
2555     }
2556 
2557     oldHwm = hwm; //Save this info for rollback
2558     //need to pass real dbRoot, partition, and segment to setColParam
2559     colOp->setColParam(curCol, 0, curColStruct.colWidth, curColStruct.colDataType,
2560                        curColStruct.colType, curColStruct.dataOid, curColStruct.fCompressionType,
2561                        dbRoot, partitionNum, segmentNum);
2562 
2563     string segFile;
2564     rc = colOp->openColumnFile(curCol, segFile, false); // @bug 5572 HDFS tmp file
2565 
2566     if (rc != NO_ERROR)
2567     {
2568         return rc;
2569     }
2570 
2571     //get hwm first
2572     // @bug 286 : fix for bug 286 - correct the typo in getHWM
2573     //RETURN_ON_ERROR(BRMWrapper::getInstance()->getHWM(curColStruct.dataOid, hwm));
2574 
2575     //...Note that we are casting totalRow to int to be in sync with
2576     //...allocRowId().  So we are assuming that totalRow
2577     //...(curTupleList.size()) will fit into an int.  We arleady made
2578     //...that assumption earlier in this method when we used totalRow
2579     //...in the call to calloc() to allocate rowIdArray.
2580     Column newCol;
2581     bool newFile;
2582 
2583 #ifdef PROFILE
2584     timer.start("allocRowId");
2585 #endif
2586 
2587     newColStructList = colStructList;
2588     newDctnryStructList = dctnryStructList;
2589     std::vector<boost::shared_ptr<DBRootExtentTracker> >   dbRootExtentTrackers;
2590     bool bUseStartExtent = true;
2591     rc = colOp->allocRowId(txnid, bUseStartExtent,
2592                            curCol, (uint64_t)totalRow, rowIdArray, hwm, newExtent, rowsLeft, newHwm, newFile, newColStructList, newDctnryStructList,
2593                            dbRootExtentTrackers, false, false, 0);
2594 
2595     if ((rc == ERR_FILE_DISK_SPACE) && newExtent)
2596     {
2597         for (i = 0; i < newColStructList.size(); i++)
2598         {
2599             info.oid = newColStructList[i].dataOid;
2600             info.partitionNum = newColStructList[i].fColPartition;
2601             info.segmentNum = newColStructList[i].fColSegment;
2602             info.dbRoot = newColStructList[i].fColDbRoot;
2603 
2604             if (newFile)
2605                 fileInfo.push_back (info);
2606 
2607             colExtentInfo.push_back (info);
2608         }
2609 
2610         int rc1 = BRMWrapper::getInstance()->deleteEmptyColExtents(colExtentInfo);
2611 
2612         if ((rc1 == 0) &&  newFile)
2613         {
2614             rc1 = colOp->deleteFile(fileInfo[0].oid, fileInfo[0].dbRoot, fileInfo[0].partitionNum, fileInfo[0].segmentNum);
2615 
2616             if ( rc1 != NO_ERROR)
2617                 return rc;
2618 
2619             FileOp fileOp;
2620 
2621             for (i = 0; i < newDctnryStructList.size(); i++)
2622             {
2623                 if (newDctnryStructList[i].dctnryOid > 0)
2624                 {
2625                     info.oid = newDctnryStructList[i].dctnryOid;
2626                     info.partitionNum = newDctnryStructList[i].fColPartition;
2627                     info.segmentNum = newDctnryStructList[i].fColSegment;
2628                     info.dbRoot = newDctnryStructList[i].fColDbRoot;
2629                     info.newFile = true;
2630                     fileInfo.push_back (info);
2631                     dictExtentInfo.push_back (info);
2632                 }
2633             }
2634 
2635             if (dictExtentInfo.size() > 0)
2636             {
2637                 rc1 = BRMWrapper::getInstance()->deleteEmptyDictStoreExtents(dictExtentInfo);
2638 
2639                 if ( rc1 != NO_ERROR)
2640                     return rc;
2641 
2642                 for (unsigned j = 0; j < fileInfo.size(); j++)
2643                 {
2644                     rc1 = fileOp.deleteFile(fileInfo[j].oid, fileInfo[j].dbRoot,
2645                                             fileInfo[j].partitionNum, fileInfo[j].segmentNum);
2646                 }
2647             }
2648         }
2649     }
2650 
2651     TableMetaData* aTableMetaData = TableMetaData::makeTableMetaData(tableOid);
2652 
2653     //..Expand initial abbreviated extent if any RID in 1st extent is > 256K
2654 // DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated?
2655     if ((partitionNum == 0) &&
2656             (segmentNum   == 0) &&
2657             ((totalRow - rowsLeft) > 0) &&
2658             (rowIdArray[totalRow - rowsLeft - 1] >= (RID)INITIAL_EXTENT_ROWS_TO_DISK))
2659     {
2660         for (size_t k = 1; k < colStructList.size(); k++)
2661         {
2662             Column expandCol;
2663             colOp = m_colOp[op(colStructList[k].fCompressionType)];
2664             colOp->setColParam(expandCol, 0,
2665                                colStructList[k].colWidth,
2666                                colStructList[k].colDataType,
2667                                colStructList[k].colType,
2668                                colStructList[k].dataOid,
2669                                colStructList[k].fCompressionType,
2670                                dbRoot,
2671                                partitionNum,
2672                                segmentNum);
2673             rc = colOp->openColumnFile(expandCol, segFile, false); // @bug 5572 HDFS tmp file
2674 
2675             if (rc == NO_ERROR)
2676             {
2677                 if (colOp->abbreviatedExtent(expandCol.dataFile.pFile, colStructList[k].colWidth))
2678                 {
2679                     rc = colOp->expandAbbrevExtent(expandCol);
2680                 }
2681             }
2682 
2683             if (rc != NO_ERROR)
2684             {
2685                 if (newExtent)
2686                 {
2687                     //Remove the empty extent added to the first column
2688                     int rc1 = BRMWrapper::getInstance()->
2689                               deleteEmptyColExtents(colExtentInfo);
2690 
2691                     if ((rc1 == 0) && newFile)
2692                     {
2693                         rc1 = colOp->deleteFile(fileInfo[0].oid,
2694                                                 fileInfo[0].dbRoot,
2695                                                 fileInfo[0].partitionNum,
2696                                                 fileInfo[0].segmentNum);
2697                     }
2698                 }
2699 
2700                 colOp->clearColumn(expandCol); // closes the file
2701                 return rc;
2702             }
2703 
2704             colOp->clearColumn(expandCol); // closes the file
2705         }
2706     }
2707 
2708     BRMWrapper::setUseVb(true);
2709     //Tokenize data if needed
2710     dictStr::iterator dctStr_iter;
2711     ColTupleList::iterator col_iter;
2712 
2713     for (i = 0; i < colStructList.size(); i++)
2714     {
2715         if (colStructList[i].tokenFlag)
2716         {
2717             dctStr_iter = dictStrList[i].begin();
2718             col_iter = colValueList[i].begin();
2719             Dctnry* dctnry = m_dctnry[op(dctnryStructList[i].fCompressionType)];
2720 
2721             dctnryStructList[i].fColPartition = partitionNum;
2722             dctnryStructList[i].fColSegment = segmentNum;
2723             dctnryStructList[i].fColDbRoot = dbRoot;
2724             rc = dctnry->openDctnry(dctnryStructList[i].dctnryOid,
2725                                     dctnryStructList[i].fColDbRoot, dctnryStructList[i].fColPartition,
2726                                     dctnryStructList[i].fColSegment,
2727                                     false); // @bug 5572 HDFS tmp file
2728 
2729             if (rc != NO_ERROR)
2730                 return rc;
2731 
2732             ColExtsInfo aColExtsInfo = aTableMetaData->getColExtsInfo(dctnryStructList[i].dctnryOid);
2733             ColExtsInfo::iterator it = aColExtsInfo.begin();
2734 
2735             while (it != aColExtsInfo.end())
2736             {
2737                 if ((it->dbRoot == dctnryStructList[i].fColDbRoot) && (it->partNum == dctnryStructList[i].fColPartition) && (it->segNum == dctnryStructList[i].fColSegment))
2738                     break;
2739 
2740                 it++;
2741             }
2742 
2743             if (it == aColExtsInfo.end()) //add this one to the list
2744             {
2745                 ColExtInfo aExt;
2746                 aExt.dbRoot = dctnryStructList[i].fColDbRoot;
2747                 aExt.partNum = dctnryStructList[i].fColPartition;
2748                 aExt.segNum = dctnryStructList[i].fColSegment;
2749                 aExt.compType = dctnryStructList[i].fCompressionType;
2750                 aExt.isDict = true;
2751                 aColExtsInfo.push_back(aExt);
2752                 aTableMetaData->setColExtsInfo(dctnryStructList[i].dctnryOid, aColExtsInfo);
2753             }
2754 
2755             for (uint32_t     rows = 0; rows < (totalRow - rowsLeft); rows++)
2756             {
2757                 if (dctStr_iter->length() == 0)
2758                 {
2759                     Token nullToken;
2760                     col_iter->data = nullToken;
2761                 }
2762                 else
2763                 {
2764 #ifdef PROFILE
2765                     timer.start("tokenize");
2766 #endif
2767                     DctnryTuple dctTuple;
2768                     dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str();
2769                     dctTuple.sigSize = dctStr_iter->length();
2770                     dctTuple.isNull = false;
2771                     rc = tokenize(txnid, dctTuple, dctnryStructList[i].fCompressionType);
2772 
2773                     if (rc != NO_ERROR)
2774                     {
2775                         dctnry->closeDctnry();
2776                         return rc;
2777                     }
2778 
2779 #ifdef PROFILE
2780                     timer.stop("tokenize");
2781 #endif
2782                     col_iter->data = dctTuple.token;
2783                 }
2784 
2785                 dctStr_iter++;
2786                 col_iter++;
2787 
2788             }
2789 
2790             //close dictionary files
2791             rc = dctnry->closeDctnry();
2792 
2793             if (rc != NO_ERROR)
2794                 return rc;
2795 
2796             if (newExtent)
2797             {
2798                 rc = dctnry->openDctnry(newDctnryStructList[i].dctnryOid,
2799                                         newDctnryStructList[i].fColDbRoot, newDctnryStructList[i].fColPartition,
2800                                         newDctnryStructList[i].fColSegment,
2801                                         false); // @bug 5572 HDFS tmp file
2802 
2803                 if (rc != NO_ERROR)
2804                     return rc;
2805 
2806                 aColExtsInfo = aTableMetaData->getColExtsInfo(newDctnryStructList[i].dctnryOid);
2807                 it = aColExtsInfo.begin();
2808 
2809                 while (it != aColExtsInfo.end())
2810                 {
2811                     if ((it->dbRoot == newDctnryStructList[i].fColDbRoot) && (it->partNum == newDctnryStructList[i].fColPartition) && (it->segNum == newDctnryStructList[i].fColSegment))
2812                         break;
2813 
2814                     it++;
2815                 }
2816 
2817                 if (it == aColExtsInfo.end()) //add this one to the list
2818                 {
2819                     ColExtInfo aExt;
2820                     aExt.dbRoot = newDctnryStructList[i].fColDbRoot;
2821                     aExt.partNum = newDctnryStructList[i].fColPartition;
2822                     aExt.segNum = newDctnryStructList[i].fColSegment;
2823                     aExt.compType = newDctnryStructList[i].fCompressionType;
2824                     aExt.isDict = true;
2825                     aColExtsInfo.push_back(aExt);
2826                     aTableMetaData->setColExtsInfo(newDctnryStructList[i].dctnryOid, aColExtsInfo);
2827                 }
2828 
2829                 for (uint32_t     rows = 0; rows < rowsLeft; rows++)
2830                 {
2831                     if (dctStr_iter->length() == 0)
2832                     {
2833                         Token nullToken;
2834                         col_iter->data = nullToken;
2835                     }
2836                     else
2837                     {
2838 #ifdef PROFILE
2839                         timer.start("tokenize");
2840 #endif
2841                         DctnryTuple dctTuple;
2842                         dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str();
2843                         dctTuple.sigSize = dctStr_iter->length();
2844                         dctTuple.isNull = false;
2845                         rc = tokenize(txnid, dctTuple, newDctnryStructList[i].fCompressionType);
2846 
2847                         if (rc != NO_ERROR)
2848                         {
2849                             dctnry->closeDctnry();
2850                             return rc;
2851                         }
2852 
2853 #ifdef PROFILE
2854                         timer.stop("tokenize");
2855 #endif
2856                         col_iter->data = dctTuple.token;
2857                     }
2858 
2859                     dctStr_iter++;
2860                     col_iter++;
2861                 }
2862 
2863                 //close dictionary files
2864                 rc = dctnry->closeDctnry();
2865 
2866                 if (rc != NO_ERROR)
2867                     return rc;
2868             }
2869         }
2870     }
2871 
2872 
2873     //Update column info structure @Bug 1862 set hwm
2874     //@Bug 2205 Check whether all rows go to the new extent
2875     RID lastRid = 0;
2876     RID lastRidNew = 0;
2877 
2878     if (totalRow - rowsLeft > 0)
2879     {
2880         lastRid = rowIdArray[totalRow - rowsLeft - 1];
2881         lastRidNew = rowIdArray[totalRow - 1];
2882     }
2883     else
2884     {
2885         lastRid = 0;
2886         lastRidNew = rowIdArray[totalRow - 1];
2887     }
2888 
2889     //cout << "rowid allocated is "  << lastRid << endl;
2890     //if a new extent is created, all the columns in this table should have their own new extent
2891 
2892     //@Bug 1701. Close the file
2893     m_colOp[op(curCol.compressionType)]->clearColumn(curCol);
2894     std::vector<BulkSetHWMArg> hwmVecNewext;
2895     std::vector<BulkSetHWMArg> hwmVecOldext;
2896 
2897     if (newExtent) //Save all hwms to set them later.
2898     {
2899         BulkSetHWMArg aHwmEntryNew;
2900         BulkSetHWMArg aHwmEntryOld;
2901         bool succFlag = false;
2902         unsigned colWidth = 0;
2903         int  extState;
2904         bool extFound;
2905         int      curFbo = 0, curBio;
2906 
2907         for (i = 0; i < totalColumns; i++)
2908         {
2909             Column         curColLocal;
2910             colOp->initColumn(curColLocal);
2911 
2912             colOp = m_colOp[op(newColStructList[i].fCompressionType)];
2913             colOp->setColParam(curColLocal, 0,
2914                                newColStructList[i].colWidth, newColStructList[i].colDataType,
2915                                newColStructList[i].colType, newColStructList[i].dataOid,
2916                                newColStructList[i].fCompressionType, dbRoot, partitionNum, segmentNum);
2917 
2918             rc = BRMWrapper::getInstance()->getLastHWM_DBroot(
2919                      curColLocal.dataFile.fid, dbRoot, partitionNum, segmentNum, oldHwm,
2920                      extState, extFound);
2921 
2922             info.oid = curColLocal.dataFile.fid;
2923             info.partitionNum = partitionNum;
2924             info.segmentNum = segmentNum;
2925             info.dbRoot = dbRoot;
2926             info.hwm = oldHwm;
2927             colExtentInfo.push_back(info);
2928             // @Bug 2714 need to set hwm for the old extent
2929             colWidth = colStructList[i].colWidth;
2930             succFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);
2931 
2932             //cout << "insertcolumnrec   oid:rid:fbo:hwm = " << colStructList[i].dataOid << ":" << lastRid << ":" << curFbo << ":" << hwm << endl;
2933             if (succFlag)
2934             {
2935                 if ((HWM)curFbo > oldHwm)
2936                 {
2937                     aHwmEntryOld.oid = colStructList[i].dataOid;
2938                     aHwmEntryOld.partNum = partitionNum;
2939                     aHwmEntryOld.segNum = segmentNum;
2940                     aHwmEntryOld.hwm = curFbo;
2941                     hwmVecOldext.push_back(aHwmEntryOld);
2942                 }
2943             }
2944             else
2945                 return ERR_INVALID_PARAM;
2946 
2947             colWidth = newColStructList[i].colWidth;
2948             succFlag = colOp->calculateRowId(lastRidNew, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);
2949 
2950             if (succFlag)
2951             {
2952                 aHwmEntryNew.oid = newColStructList[i].dataOid;
2953                 aHwmEntryNew.partNum = newColStructList[i].fColPartition;
2954                 aHwmEntryNew.segNum = newColStructList[i].fColSegment;
2955                 aHwmEntryNew.hwm = curFbo;
2956                 hwmVecNewext.push_back(aHwmEntryNew);
2957             }
2958 
2959             m_colOp[op(curColLocal.compressionType)]->clearColumn(curColLocal);
2960         }
2961 
2962         //Prepare the valuelist for the new extent
2963         ColTupleList colTupleList;
2964         ColTupleList newColTupleList;
2965         ColTupleList firstPartTupleList;
2966 
2967         for (unsigned i = 0; i < totalColumns; i++)
2968         {
2969             colTupleList = static_cast<ColTupleList>(colValueList[i]);
2970 
2971             for (uint64_t j = rowsLeft; j > 0; j--)
2972             {
2973                 newColTupleList.push_back(colTupleList[totalRow - j]);
2974             }
2975 
2976             colNewValueList.push_back(newColTupleList);
2977             newColTupleList.clear();
2978 
2979             //upate the oldvalue list for the old extent
2980             for (uint64_t j = 0; j < (totalRow - rowsLeft); j++)
2981             {
2982                 firstPartTupleList.push_back(colTupleList[j]);
2983             }
2984 
2985             colOldValueList.push_back(firstPartTupleList);
2986             firstPartTupleList.clear();
2987         }
2988     }
2989 
2990 //Mark extents invalid
2991     vector<BRM::LBID_t> lbids;
2992     vector<CalpontSystemCatalog::ColDataType> colDataTypes;
2993     bool successFlag = true;
2994     unsigned width = 0;
2995     BRM::LBID_t lbid;
2996     int         curFbo = 0, curBio, lastFbo = -1;
2997 
2998     if (totalRow - rowsLeft > 0)
2999     {
3000         for (unsigned i = 0; i < colStructList.size(); i++)
3001         {
3002             colOp = m_colOp[op(colStructList[i].fCompressionType)];
3003             width = colStructList[i].colWidth;
3004             successFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / width, width, curFbo, curBio);
3005 
3006             if (successFlag)
3007             {
3008                 if (curFbo != lastFbo)
3009                 {
3010                     RETURN_ON_ERROR(BRMWrapper::getInstance()->getBrmInfo(
3011                                         colStructList[i].dataOid, colStructList[i].fColPartition,
3012                                         colStructList[i].fColSegment, curFbo, lbid));
3013                     lbids.push_back((BRM::LBID_t)lbid);
3014                     colDataTypes.push_back(colStructList[i].colDataType);
3015                 }
3016             }
3017         }
3018     }
3019 
3020     lastRid = rowIdArray[totalRow - 1];
3021 
3022     for (unsigned i = 0; i < newColStructList.size(); i++)
3023     {
3024         colOp = m_colOp[op(newColStructList[i].fCompressionType)];
3025         width = newColStructList[i].colWidth;
3026         successFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / width, width, curFbo, curBio);
3027 
3028         if (successFlag)
3029         {
3030             if (curFbo != lastFbo)
3031             {
3032                 RETURN_ON_ERROR(AddLBIDtoList(txnid,
3033                                               lbids,
3034                                               colDataTypes,
3035                                               newColStructList[i],
3036                                               curFbo));
3037             }
3038         }
3039     }
3040 
3041     //cout << "lbids size = " << lbids.size()<< endl;
3042     if (lbids.size() > 0)
3043         rc = BRMWrapper::getInstance()->markExtentsInvalid(lbids, colDataTypes);
3044 
3045     if (rc == NO_ERROR)
3046     {
3047         // MCOL-66 The DBRM can't handle concurrent transactions to sys tables
3048         static boost::mutex dbrmMutex;
3049         boost::mutex::scoped_lock lk(dbrmMutex);
3050 
3051         if (newExtent)
3052         {
3053             rc = writeColumnRec(txnid, colStructList, colOldValueList, rowIdArray, newColStructList, colNewValueList, tableOid, false); // @bug 5572 HDFS tmp file
3054         }
3055         else
3056         {
3057             rc = writeColumnRec(txnid, colStructList, colValueList, rowIdArray, newColStructList, colNewValueList, tableOid, false); // @bug 5572 HDFS tmp file
3058         }
3059     }
3060 
3061 #ifdef PROFILE
3062     timer.stop("writeColumnRec");
3063 #endif
3064 //   for (ColTupleList::size_type  i = 0; i < totalRow; i++)
3065 //      ridList.push_back((RID) rowIdArray[i]);
3066 
3067     // if (rc == NO_ERROR)
3068     //   rc = flushDataFiles(NO_ERROR);
3069 
3070     if ( !newExtent )
3071     {
3072         //flushVMCache();
3073         bool succFlag = false;
3074         unsigned colWidth = 0;
3075         int  extState;
3076         bool extFound;
3077         int curFbo = 0, curBio;
3078         std::vector<BulkSetHWMArg> hwmVec;
3079 
3080         for (unsigned i = 0; i < totalColumns; i++)
3081         {
3082             //colOp = m_colOp[op(colStructList[i].fCompressionType)];
3083             //Set all columns hwm together
3084             BulkSetHWMArg aHwmEntry;
3085             RETURN_ON_ERROR(BRMWrapper::getInstance()->getLastHWM_DBroot(colStructList[i].dataOid, dbRoot, partitionNum, segmentNum, hwm,
3086                             extState, extFound));
3087             colWidth = colStructList[i].colWidth;
3088             succFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);
3089 
3090             //cout << "insertcolumnrec   oid:rid:fbo:hwm = " << colStructList[i].dataOid << ":" << lastRid << ":" << curFbo << ":" << hwm << endl;
3091             if (succFlag)
3092             {
3093                 if ((HWM)curFbo > hwm)
3094                 {
3095                     aHwmEntry.oid = colStructList[i].dataOid;
3096                     aHwmEntry.partNum = partitionNum;
3097                     aHwmEntry.segNum = segmentNum;
3098                     aHwmEntry.hwm = curFbo;
3099                     hwmVec.push_back(aHwmEntry);
3100                 }
3101             }
3102             else
3103                 return ERR_INVALID_PARAM;
3104         }
3105 
3106         if (hwmVec.size() > 0 )
3107         {
3108             std::vector<BRM::CPInfoMerge> mergeCPDataArgs;
3109             RETURN_ON_ERROR(BRMWrapper::getInstance()->bulkSetHWMAndCP( hwmVec, mergeCPDataArgs));
3110         }
3111     }
3112 
3113     if (newExtent)
3114     {
3115 #ifdef PROFILE
3116         timer.start("flushVMCache");
3117 #endif
3118         std::vector<BRM::CPInfoMerge> mergeCPDataArgs;
3119         RETURN_ON_ERROR(BRMWrapper::getInstance()->bulkSetHWMAndCP( hwmVecNewext, mergeCPDataArgs));
3120         RETURN_ON_ERROR(BRMWrapper::getInstance()->bulkSetHWMAndCP( hwmVecOldext, mergeCPDataArgs));
3121         //flushVMCache();
3122 #ifdef PROFILE
3123         timer.stop("flushVMCache");
3124 #endif
3125     }
3126 
3127 #ifdef PROFILE
3128     timer.finish();
3129 #endif
3130     return rc;
3131 }
3132 
insertColumnRec_Single(const TxnID & txnid,ColStructList & colStructList,ColValueList & colValueList,DctnryStructList & dctnryStructList,DictStrList & dictStrList,const int32_t tableOid)3133 int WriteEngineWrapper::insertColumnRec_Single(const TxnID& txnid,
3134         ColStructList& colStructList,
3135         ColValueList& colValueList,
3136         DctnryStructList& dctnryStructList,
3137         DictStrList& dictStrList,
3138         const int32_t tableOid)
3139 {
3140     int            rc;
3141     RID*           rowIdArray = NULL;
3142     ColTupleList   curTupleList;
3143     Column         curCol;
3144     ColStruct      curColStruct;
3145     ColValueList   colOldValueList;
3146     ColValueList   colNewValueList;
3147     ColStructList  newColStructList;
3148     DctnryStructList newDctnryStructList;
3149     HWM            hwm = 0;
3150     HWM            newHwm = 0;
3151     HWM            oldHwm = 0;
3152     ColTupleList::size_type totalRow;
3153     ColStructList::size_type totalColumns;
3154     uint64_t rowsLeft = 0;
3155     bool newExtent = false;
3156     RIDList ridList;
3157     ColumnOp* colOp = NULL;
3158     uint32_t i = 0;
3159 
3160 #ifdef PROFILE
3161     StopWatch timer;
3162 #endif
3163 
3164     // debug information for testing
3165     if (isDebug(DEBUG_2))
3166     {
3167         printf("\nIn wrapper insert\n");
3168         printInputValue(colStructList, colValueList, ridList);
3169     }
3170 
3171     // end
3172 
3173     //Convert data type and column width to write engine specific
3174     for (i = 0; i < colStructList.size(); i++)
3175         Convertor::convertColType(&colStructList[i]);
3176 
3177     uint32_t colId = 0;
3178     // MCOL-1675: find the smallest column width to calculate the RowID from so
3179     // that all HWMs will be incremented by this operation
3180     findSmallestColumn(colId, colStructList);
3181 
3182     rc = checkValid(txnid, colStructList, colValueList, ridList);
3183 
3184     if (rc != NO_ERROR)
3185         return rc;
3186 
3187     setTransId(txnid);
3188 
3189     curTupleList = static_cast<ColTupleList>(colValueList[0]);
3190     totalRow = curTupleList.size();
3191     totalColumns = colStructList.size();
3192     rowIdArray = new RID[totalRow];
3193     // use scoped_array to ensure ptr deletion regardless of where we return
3194     boost::scoped_array<RID> rowIdArrayPtr(rowIdArray);
3195     memset(rowIdArray, 0, (sizeof(RID)*totalRow));
3196 
3197     //--------------------------------------------------------------------------
3198     // allocate row id(s)
3199     //--------------------------------------------------------------------------
3200 	curColStruct = colStructList[colId];
3201     colOp = m_colOp[op(curColStruct.fCompressionType)];
3202 
3203     colOp->initColumn(curCol);
3204 
3205     //Get the correct segment, partition, column file
3206     uint16_t dbRoot;
3207     uint16_t segmentNum   = 0;
3208     uint32_t partitionNum = 0;
3209     //Don't search for empty space, always append to the end. May revisit later
3210     dbRoot = curColStruct.fColDbRoot;
3211     int  extState;
3212     bool bStartExtFound;
3213     bool bUseStartExtent = false;
3214     RETURN_ON_ERROR(BRMWrapper::getInstance()->getLastHWM_DBroot(
3215                         curColStruct.dataOid, dbRoot, partitionNum, segmentNum, hwm,
3216                         extState, bStartExtFound));
3217 
3218     if ((bStartExtFound) && (extState == BRM::EXTENTAVAILABLE))
3219         bUseStartExtent = true;
3220 
3221     for (i = 0; i < colStructList.size(); i++)
3222     {
3223         colStructList[i].fColPartition = partitionNum;
3224         colStructList[i].fColSegment   = segmentNum;
3225         colStructList[i].fColDbRoot    = dbRoot;
3226     }
3227 
3228     for (i = 0; i < dctnryStructList.size(); i++)
3229     {
3230         dctnryStructList[i].fColPartition = partitionNum;
3231         dctnryStructList[i].fColSegment   = segmentNum;
3232         dctnryStructList[i].fColDbRoot    = dbRoot;
3233     }
3234 
3235     oldHwm = hwm; //Save this info for rollback
3236     //need to pass real dbRoot, partition, and segment to setColParam
3237 	colOp->setColParam(curCol, colId, curColStruct.colWidth, curColStruct.colDataType,
3238                        curColStruct.colType, curColStruct.dataOid, curColStruct.fCompressionType,
3239                        dbRoot, partitionNum, segmentNum);
3240 
3241     string segFile;
3242 
3243     if (bUseStartExtent)
3244     {
3245         rc = colOp->openColumnFile(curCol, segFile, true); // @bug 5572 HDFS tmp file
3246 
3247         if (rc != NO_ERROR)
3248         {
3249             return rc;
3250         }
3251     }
3252 
3253     bool newFile;
3254 
3255 #ifdef PROFILE
3256     timer.start("allocRowId");
3257 #endif
3258     newColStructList = colStructList;
3259     newDctnryStructList = dctnryStructList;
3260     std::vector<boost::shared_ptr<DBRootExtentTracker> >  dbRootExtentTrackers;
3261     rc = colOp->allocRowId(txnid, bUseStartExtent,
3262                            curCol, (uint64_t)totalRow, rowIdArray, hwm, newExtent,
3263                            rowsLeft, newHwm, newFile, newColStructList, newDctnryStructList,
3264                            dbRootExtentTrackers, false, false, 0);
3265 
3266     //--------------------------------------------------------------------------
3267     // Handle case where we ran out of disk space allocating a new extent.
3268     // Rollback extentmap and delete any db files that were created.
3269     //--------------------------------------------------------------------------
3270     if (rc != NO_ERROR)
3271     {
3272         if ((rc == ERR_FILE_DISK_SPACE) && newExtent)
3273         {
3274             vector<ExtentInfo> colExtentInfo;
3275             vector<ExtentInfo> dictExtentInfo;
3276             vector<ExtentInfo> fileInfo;
3277             ExtentInfo info;
3278 
3279             for (i = 0; i < newColStructList.size(); i++)
3280             {
3281                 info.oid          = newColStructList[i].dataOid;
3282                 info.partitionNum = newColStructList[i].fColPartition;
3283                 info.segmentNum   = newColStructList[i].fColSegment;
3284                 info.dbRoot       = newColStructList[i].fColDbRoot;
3285 
3286                 if (newFile)
3287                     fileInfo.push_back (info);
3288 
3289                 colExtentInfo.push_back (info);
3290             }
3291 
3292             int rc1 = BRMWrapper::getInstance()->deleteEmptyColExtents(colExtentInfo);
3293 
3294             // Only rollback dictionary extents "if" store file is new
3295             if ((rc1 == 0) && newFile)
3296             {
3297                 for (unsigned int j = 0; j < fileInfo.size(); j++)
3298                 {
3299                     // ignore return code and delete what we can
3300                     rc1 = colOp->deleteFile(fileInfo[j].oid,
3301                                             fileInfo[j].dbRoot,
3302                                             fileInfo[j].partitionNum,
3303                                             fileInfo[j].segmentNum);
3304                 }
3305 
3306                 fileInfo.clear();
3307 
3308                 for (i = 0; i < newDctnryStructList.size(); i++)
3309                 {
3310                     if (newDctnryStructList[i].dctnryOid > 0)
3311                     {
3312                         info.oid          = newDctnryStructList[i].dctnryOid;
3313                         info.partitionNum = newDctnryStructList[i].fColPartition;
3314                         info.segmentNum   = newDctnryStructList[i].fColSegment;
3315                         info.dbRoot       = newDctnryStructList[i].fColDbRoot;
3316                         info.newFile      = true;
3317                         fileInfo.push_back (info);
3318                         dictExtentInfo.push_back (info);
3319                     }
3320                 }
3321 
3322                 if (dictExtentInfo.size() > 0)
3323                 {
3324                     FileOp fileOp;
3325                     rc1 = BRMWrapper::getInstance()->deleteEmptyDictStoreExtents(dictExtentInfo);
3326 
3327                     if ( rc1 != NO_ERROR)
3328                         return rc;
3329 
3330                     for (unsigned j = 0; j < fileInfo.size(); j++)
3331                     {
3332                         rc1 = fileOp.deleteFile(fileInfo[j].oid,
3333                                                 fileInfo[j].dbRoot,
3334                                                 fileInfo[j].partitionNum,
3335                                                 fileInfo[j].segmentNum);
3336                     }
3337                 }
3338             }
3339         } // disk space error allocating new extent
3340 
3341         return rc;
3342     }     // rc != NO_ERROR from call to allocRowID()
3343 
3344 #ifdef PROFILE
3345     timer.stop("allocRowId");
3346 #endif
3347 
3348     TableMetaData* aTableMetaData = TableMetaData::makeTableMetaData(tableOid);
3349 
3350     //--------------------------------------------------------------------------
3351     // Expand initial abbreviated extent if any RID in 1st extent is > 256K.
3352     // if totalRow == rowsLeft, then not adding rows to 1st extent, so skip it.
3353     //--------------------------------------------------------------------------
3354 // DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated?
3355 	if ((colStructList[colId].fColPartition == 0) &&
3356 		(colStructList[colId].fColSegment   == 0) &&
3357             ((totalRow - rowsLeft) > 0) &&
3358             (rowIdArray[totalRow - rowsLeft - 1] >= (RID)INITIAL_EXTENT_ROWS_TO_DISK))
3359     {
3360 		for (unsigned k=0; k<colStructList.size(); k++)
3361         {
3362             if (k == colId)
3363                continue;
3364             Column expandCol;
3365             colOp = m_colOp[op(colStructList[k].fCompressionType)];
3366             colOp->setColParam(expandCol, 0,
3367                                colStructList[k].colWidth,
3368                                colStructList[k].colDataType,
3369                                colStructList[k].colType,
3370                                colStructList[k].dataOid,
3371                                colStructList[k].fCompressionType,
3372                                colStructList[k].fColDbRoot,
3373                                colStructList[k].fColPartition,
3374                                colStructList[k].fColSegment);
3375             rc = colOp->openColumnFile(expandCol, segFile, true); // @bug 5572 HDFS tmp file
3376 
3377             if (rc == NO_ERROR)
3378             {
3379                 if (colOp->abbreviatedExtent(
3380                             expandCol.dataFile.pFile, colStructList[k].colWidth))
3381                 {
3382                     rc = colOp->expandAbbrevExtent(expandCol);
3383                 }
3384             }
3385 
3386             colOp->clearColumn(expandCol); // closes the file
3387 
3388             if (rc != NO_ERROR)
3389             {
3390                 return rc;
3391             }
3392         } // loop through columns
3393     }     // if starting extent needs to be expanded
3394 
3395     //--------------------------------------------------------------------------
3396     // Tokenize data if needed
3397     //--------------------------------------------------------------------------
3398     dictStr::iterator dctStr_iter;
3399     ColTupleList::iterator col_iter;
3400 
3401     for (unsigned i = 0; i < colStructList.size(); i++)
3402     {
3403         if (colStructList[i].tokenFlag)
3404         {
3405             dctStr_iter = dictStrList[i].begin();
3406             col_iter = colValueList[i].begin();
3407             Dctnry* dctnry = m_dctnry[op(dctnryStructList[i].fCompressionType)];
3408 
3409             ColExtsInfo aColExtsInfo = aTableMetaData->getColExtsInfo(dctnryStructList[i].dctnryOid);
3410             ColExtsInfo::iterator it = aColExtsInfo.begin();
3411 
3412             if (bUseStartExtent)
3413             {
3414                 rc = dctnry->openDctnry(dctnryStructList[i].dctnryOid,
3415                                         dctnryStructList[i].fColDbRoot,
3416                                         dctnryStructList[i].fColPartition,
3417                                         dctnryStructList[i].fColSegment,
3418                                         true); // @bug 5572 HDFS tmp file
3419 
3420                 if (rc != NO_ERROR)
3421                     return rc;
3422 
3423                 while (it != aColExtsInfo.end())
3424                 {
3425                     if ((it->dbRoot == dctnryStructList[i].fColDbRoot) && (it->partNum == dctnryStructList[i].fColPartition) && (it->segNum == dctnryStructList[i].fColSegment))
3426                         break;
3427 
3428                     it++;
3429                 }
3430 
3431                 if (it == aColExtsInfo.end()) //add this one to the list
3432                 {
3433                     ColExtInfo aExt;
3434                     aExt.dbRoot = dctnryStructList[i].fColDbRoot;
3435                     aExt.partNum = dctnryStructList[i].fColPartition;
3436                     aExt.segNum = dctnryStructList[i].fColSegment;
3437                     aExt.compType = dctnryStructList[i].fCompressionType;
3438                     aExt.isDict = true;
3439                     aColExtsInfo.push_back(aExt);
3440                     aTableMetaData->setColExtsInfo(dctnryStructList[i].dctnryOid, aColExtsInfo);
3441                 }
3442 
3443 
3444                 for (uint32_t rows = 0; rows < (totalRow - rowsLeft); rows++)
3445                 {
3446                     if (dctStr_iter->length() == 0)
3447                     {
3448                         Token nullToken;
3449                         col_iter->data = nullToken;
3450                     }
3451                     else
3452                     {
3453 #ifdef PROFILE
3454                         timer.start("tokenize");
3455 #endif
3456                         DctnryTuple dctTuple;
3457                         dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str();
3458                         dctTuple.sigSize = dctStr_iter->length();
3459                         dctTuple.isNull = false;
3460                         rc = tokenize(txnid,
3461                                       dctTuple,
3462                                       dctnryStructList[i].fCompressionType);
3463 
3464                         if (rc != NO_ERROR)
3465                         {
3466                             dctnry->closeDctnry();
3467                             return rc;
3468                         }
3469 
3470 #ifdef PROFILE
3471                         timer.stop("tokenize");
3472 #endif
3473                         col_iter->data = dctTuple.token;
3474                     }
3475 
3476                     dctStr_iter++;
3477                     col_iter++;
3478                 }
3479 
3480                 //close dictionary files
3481                 rc = dctnry->closeDctnry();
3482 
3483                 if (rc != NO_ERROR)
3484                     return rc;
3485             } // tokenize dictionary rows in 1st extent
3486 
3487             if (newExtent)
3488             {
3489                 rc = dctnry->openDctnry(newDctnryStructList[i].dctnryOid,
3490                                         newDctnryStructList[i].fColDbRoot,
3491                                         newDctnryStructList[i].fColPartition,
3492                                         newDctnryStructList[i].fColSegment,
3493                                         false); // @bug 5572 HDFS tmp file
3494 
3495                 if (rc != NO_ERROR)
3496                     return rc;
3497 
3498                 aColExtsInfo = aTableMetaData->getColExtsInfo(newDctnryStructList[i].dctnryOid);
3499                 it = aColExtsInfo.begin();
3500 
3501                 while (it != aColExtsInfo.end())
3502                 {
3503                     if ((it->dbRoot == newDctnryStructList[i].fColDbRoot) && (it->partNum == newDctnryStructList[i].fColPartition) && (it->segNum == newDctnryStructList[i].fColSegment))
3504                         break;
3505 
3506                     it++;
3507                 }
3508 
3509                 if (it == aColExtsInfo.end()) //add this one to the list
3510                 {
3511                     ColExtInfo aExt;
3512                     aExt.dbRoot = newDctnryStructList[i].fColDbRoot;
3513                     aExt.partNum = newDctnryStructList[i].fColPartition;
3514                     aExt.segNum = newDctnryStructList[i].fColSegment;
3515                     aExt.compType = newDctnryStructList[i].fCompressionType;
3516                     aExt.isDict = true;
3517                     aColExtsInfo.push_back(aExt);
3518                     aTableMetaData->setColExtsInfo(newDctnryStructList[i].dctnryOid, aColExtsInfo);
3519                 }
3520 
3521                 for (uint32_t rows = 0; rows < rowsLeft; rows++)
3522                 {
3523                     if (dctStr_iter->length() == 0)
3524                     {
3525                         Token nullToken;
3526                         col_iter->data = nullToken;
3527                     }
3528                     else
3529                     {
3530 #ifdef PROFILE
3531                         timer.start("tokenize");
3532 #endif
3533                         DctnryTuple dctTuple;
3534                         dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str();
3535                         dctTuple.sigSize = dctStr_iter->length();
3536                         dctTuple.isNull = false;
3537                         rc = tokenize(txnid,
3538                                       dctTuple,
3539                                       newDctnryStructList[i].fCompressionType);
3540 
3541                         if (rc != NO_ERROR)
3542                         {
3543                             dctnry->closeDctnry();
3544                             return rc;
3545                         }
3546 
3547 #ifdef PROFILE
3548                         timer.stop("tokenize");
3549 #endif
3550                         col_iter->data = dctTuple.token;
3551                     }
3552 
3553                     dctStr_iter++;
3554                     col_iter++;
3555                 }
3556 
3557                 //close dictionary files
3558                 rc = dctnry->closeDctnry();
3559 
3560                 if (rc != NO_ERROR)
3561                     return rc;
3562             } // tokenize dictionary rows in second extent
3563         }     // tokenize dictionary columns
3564     }         // loop through columns to see which ones need tokenizing
3565 
3566     //----------------------------------------------------------------------
3567     // Update column info structure @Bug 1862 set hwm, and
3568     // Prepare ValueList for new extent (if applicable)
3569     //----------------------------------------------------------------------
3570     //@Bug 2205 Check whether all rows go to the new extent
3571     RID lastRid = 0;
3572     RID lastRidNew = 0;
3573 
3574     if (totalRow - rowsLeft > 0)
3575     {
3576         lastRid = rowIdArray[totalRow - rowsLeft - 1];
3577         lastRidNew = rowIdArray[totalRow - 1];
3578     }
3579     else
3580     {
3581         lastRid = 0;
3582         lastRidNew = rowIdArray[totalRow - 1];
3583     }
3584 
3585     //cout << "rowid allocated is "  << lastRid << endl;
3586     //if a new extent is created, all the columns in this table should
3587     //have their own new extent
3588 
3589     //@Bug 1701. Close the file
3590     if (bUseStartExtent)
3591     {
3592         m_colOp[op(curCol.compressionType)]->clearColumn(curCol);
3593     }
3594 
3595     std::vector<BulkSetHWMArg> hwmVecNewext;
3596     std::vector<BulkSetHWMArg> hwmVecOldext;
3597 
3598     if (newExtent) //Save all hwms to set them later.
3599     {
3600         BulkSetHWMArg aHwmEntryNew;
3601         BulkSetHWMArg aHwmEntryOld;
3602 
3603         bool succFlag = false;
3604         unsigned colWidth = 0;
3605         int curFbo = 0, curBio;
3606 
3607         for (i = 0; i < totalColumns; i++)
3608         {
3609             colOp = m_colOp[op(newColStructList[i].fCompressionType)];
3610 
3611             // @Bug 2714 need to set hwm for the old extent
3612             colWidth = colStructList[i].colWidth;
3613             succFlag = colOp->calculateRowId(lastRid,
3614                                              BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);
3615 
3616             //cout << "insertcolumnrec   oid:rid:fbo:hwm = " <<
3617             //colStructList[i].dataOid << ":" << lastRid << ":" <<
3618             //curFbo << ":" << hwm << endl;
3619             if (succFlag)
3620             {
3621                 if ((HWM)curFbo > oldHwm)
3622                 {
3623                     aHwmEntryOld.oid     = colStructList[i].dataOid;
3624                     aHwmEntryOld.partNum = colStructList[i].fColPartition;
3625                     aHwmEntryOld.segNum  = colStructList[i].fColSegment;
3626                     aHwmEntryOld.hwm     = curFbo;
3627                     hwmVecOldext.push_back(aHwmEntryOld);
3628                 }
3629             }
3630             else
3631                 return ERR_INVALID_PARAM;
3632 
3633             colWidth = newColStructList[i].colWidth;
3634             succFlag = colOp->calculateRowId(lastRidNew,
3635                                              BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);
3636 
3637             if (succFlag)
3638             {
3639                 aHwmEntryNew.oid     = newColStructList[i].dataOid;
3640                 aHwmEntryNew.partNum = newColStructList[i].fColPartition;
3641                 aHwmEntryNew.segNum  = newColStructList[i].fColSegment;
3642                 aHwmEntryNew.hwm     = curFbo;
3643                 hwmVecNewext.push_back(aHwmEntryNew);
3644             }
3645         }
3646 
3647         //----------------------------------------------------------------------
3648         // Prepare the valuelist for the new extent
3649         //----------------------------------------------------------------------
3650         ColTupleList colTupleList;
3651         ColTupleList newColTupleList;
3652         ColTupleList firstPartTupleList;
3653 
3654         for (unsigned i = 0; i < totalColumns; i++)
3655         {
3656             colTupleList = static_cast<ColTupleList>(colValueList[i]);
3657 
3658             for (uint64_t j = rowsLeft; j > 0; j--)
3659             {
3660                 newColTupleList.push_back(colTupleList[totalRow - j]);
3661             }
3662 
3663             colNewValueList.push_back(newColTupleList);
3664 
3665             newColTupleList.clear();
3666 
3667             //upate the oldvalue list for the old extent
3668             for (uint64_t j = 0; j < (totalRow - rowsLeft); j++)
3669             {
3670                 firstPartTupleList.push_back(colTupleList[j]);
3671             }
3672 
3673             colOldValueList.push_back(firstPartTupleList);
3674             firstPartTupleList.clear();
3675         }
3676     }
3677 
3678     //--------------------------------------------------------------------------
3679     //Mark extents invalid
3680     //--------------------------------------------------------------------------
3681     vector<BRM::LBID_t> lbids;
3682     vector<CalpontSystemCatalog::ColDataType> colDataTypes;
3683     bool successFlag = true;
3684     unsigned width = 0;
3685     //BRM::LBID_t lbid;
3686     int curFbo = 0, curBio, lastFbo = -1;
3687     /*	if (totalRow-rowsLeft > 0)
3688     	{
3689     		for (unsigned i = 0; i < colStructList.size(); i++)
3690     		{
3691     			colOp = m_colOp[op(colStructList[i].fCompressionType)];
3692     			width = colStructList[i].colWidth;
3693     			successFlag = colOp->calculateRowId(lastRid ,
3694     				BYTE_PER_BLOCK/width, width, curFbo, curBio);
3695     			if (successFlag) {
3696     				if (curFbo != lastFbo) {
3697     					RETURN_ON_ERROR(BRMWrapper::getInstance()->getBrmInfo(
3698     						colStructList[i].dataOid,
3699     						colStructList[i].fColPartition,
3700     						colStructList[i].fColSegment, curFbo, lbid));
3701     					lbids.push_back((BRM::LBID_t)lbid);
3702     					colDataTypes.push_back(colStructList[i].colDataType);
3703     				}
3704     			}
3705     		}
3706     	}
3707     */
3708     lastRid = rowIdArray[totalRow - 1];
3709 
3710     for (unsigned i = 0; i < colStructList.size(); i++)
3711     {
3712         colOp = m_colOp[op(colStructList[i].fCompressionType)];
3713         width = colStructList[i].colWidth;
3714         successFlag = colOp->calculateRowId(lastRid,
3715                                             BYTE_PER_BLOCK / width, width, curFbo, curBio);
3716 
3717         if (successFlag)
3718         {
3719             if (curFbo != lastFbo)
3720             {
3721                 colDataTypes.push_back(colStructList[i].colDataType);
3722                 RETURN_ON_ERROR(AddLBIDtoList(txnid,
3723                                               lbids,
3724                                               colDataTypes,
3725                                               colStructList[i],
3726                                               curFbo));
3727             }
3728         }
3729     }
3730 
3731     //cout << "lbids size = " << lbids.size()<< endl;
3732     if (lbids.size() > 0)
3733         rc = BRMWrapper::getInstance()->markExtentsInvalid(lbids, colDataTypes);
3734 
3735     //--------------------------------------------------------------------------
3736     // Write row(s) to database file(s)
3737     //--------------------------------------------------------------------------
3738 #ifdef PROFILE
3739     timer.start("writeColumnRec");
3740 #endif
3741 
3742     if (rc == NO_ERROR)
3743     {
3744         if (newExtent)
3745         {
3746             rc = writeColumnRec(txnid, colStructList, colOldValueList,
3747                                 rowIdArray, newColStructList, colNewValueList, tableOid,
3748                                 false); // @bug 5572 HDFS tmp file
3749         }
3750         else
3751         {
3752             rc = writeColumnRec(txnid, colStructList, colValueList,
3753                                 rowIdArray, newColStructList, colNewValueList, tableOid,
3754                                 true); // @bug 5572 HDFS tmp file
3755         }
3756     }
3757 
3758 #ifdef PROFILE
3759     timer.stop("writeColumnRec");
3760 #endif
3761 //  for (ColTupleList::size_type  i = 0; i < totalRow; i++)
3762 //      ridList.push_back((RID) rowIdArray[i]);
3763 
3764 //  if (rc == NO_ERROR)
3765 //  rc = flushDataFiles(NO_ERROR);
3766 
3767     //--------------------------------------------------------------------------
3768     // Update BRM
3769     //--------------------------------------------------------------------------
3770     if ( !newExtent )
3771     {
3772         //flushVMCache();
3773         bool succFlag = false;
3774         unsigned colWidth = 0;
3775         int  extState;
3776         bool extFound;
3777         int curFbo = 0, curBio;
3778         std::vector<BulkSetHWMArg> hwmVec;
3779 
3780         for (unsigned i = 0; i < totalColumns; i++)
3781         {
3782             //colOp = m_colOp[op(colStructList[i].fCompressionType)];
3783             //Set all columns hwm together
3784             BulkSetHWMArg aHwmEntry;
3785             RETURN_ON_ERROR(BRMWrapper::getInstance()->getLastHWM_DBroot(
3786                                 colStructList[i].dataOid,
3787                                 colStructList[i].fColDbRoot,
3788                                 colStructList[i].fColPartition,
3789                                 colStructList[i].fColSegment,
3790                                 hwm,
3791                                 extState, extFound));
3792             colWidth = colStructList[i].colWidth;
3793             succFlag = colOp->calculateRowId(lastRid,
3794                                              BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);
3795 
3796             //cout << "insertcolumnrec   oid:rid:fbo:hwm = " <<
3797             //colStructList[i].dataOid << ":" << lastRid << ":" <<
3798             //curFbo << ":" << hwm << endl;
3799             if (succFlag)
3800             {
3801                 if ((HWM)curFbo > hwm)
3802                 {
3803                     aHwmEntry.oid     = colStructList[i].dataOid;
3804                     aHwmEntry.partNum = colStructList[i].fColPartition;
3805                     aHwmEntry.segNum  = colStructList[i].fColSegment;
3806                     aHwmEntry.hwm     = curFbo;
3807                     hwmVec.push_back(aHwmEntry);
3808                 }
3809             }
3810             else
3811                 return ERR_INVALID_PARAM;
3812         }
3813 
3814         std::vector<BRM::CPInfoMerge> mergeCPDataArgs;
3815         RETURN_ON_ERROR(BRMWrapper::getInstance()->bulkSetHWMAndCP(
3816                             hwmVec, mergeCPDataArgs));
3817     }
3818     else // if (newExtent)
3819     {
3820 #ifdef PROFILE
3821         timer.start("flushVMCache");
3822 #endif
3823         std::vector<BRM::CPInfoMerge> mergeCPDataArgs;
3824 
3825         if (hwmVecNewext.size() > 0)
3826             RETURN_ON_ERROR(BRMWrapper::getInstance()->bulkSetHWMAndCP(
3827                                 hwmVecNewext, mergeCPDataArgs));
3828 
3829         if (hwmVecOldext.size() > 0)
3830             RETURN_ON_ERROR(BRMWrapper::getInstance()->bulkSetHWMAndCP(
3831                                 hwmVecOldext, mergeCPDataArgs));
3832 
3833         //flushVMCache();
3834 #ifdef PROFILE
3835         timer.stop("flushVMCache");
3836 #endif
3837     }
3838 
3839 #ifdef PROFILE
3840     timer.finish();
3841 #endif
3842     //flush PrimProc FD cache moved to we_dmlcommandproc.cpp
3843     /*	ColsExtsInfoMap colsExtsInfoMap = aTableMetaData->getColsExtsInfoMap();
3844     	ColsExtsInfoMap::iterator it = colsExtsInfoMap.begin();
3845     	ColExtsInfo::iterator aIt;
3846     	std::vector<BRM::FileInfo> files;
3847     	BRM::FileInfo aFile;
3848     	while (it != colsExtsInfoMap.end())
3849     	{
3850     		aIt = (it->second).begin();
3851     		aFile.oid = it->first;
3852     		//cout << "OID:" << aArg.oid;
3853     		while (aIt != (it->second).end())
3854     		{
3855     			aFile.partitionNum = aIt->partNum;
3856     			aFile.dbRoot =aIt->dbRoot;
3857     			aFile.segmentNum = aIt->segNum;
3858     			aFile.compType = aIt->compType;
3859     			files.push_back(aFile);
3860     			//cout <<"Added to files oid:dbroot:part:seg:compType = " << aFile.oid<<":"<<aFile.dbRoot<<":"<<aFile.partitionNum<<":"<<aFile.segmentNum
3861     			//<<":"<<aFile.compType <<endl;
3862     			aIt++;
3863     		}
3864     		it++;
3865     	}
3866     	if ((idbdatafile::IDBPolicy::useHdfs()) && (files.size() > 0))
3867     		cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());
3868     	TableMetaData::removeTableMetaData(tableOid);	*/
3869     return rc;
3870 }
3871 
3872 /*@brief printInputValue - Print input value
3873 */
3874 /***********************************************************
3875  * DESCRIPTION:
3876  *    Print input value
3877  * PARAMETERS:
3878  *    tableOid - table object id
3879  *    colStructList - column struct list
3880  *    colValueList - column value list
3881  *    ridList - RID list
3882  * RETURN:
3883  *    none
3884  ***********************************************************/
printInputValue(const ColStructList & colStructList,const ColValueList & colValueList,const RIDList & ridList) const3885 void WriteEngineWrapper::printInputValue(const ColStructList& colStructList,
3886         const ColValueList& colValueList,
3887         const RIDList& ridList) const
3888 {
3889     ColTupleList   curTupleList;
3890     ColStruct      curColStruct;
3891     ColTuple       curTuple;
3892     string         curStr;
3893     ColStructList::size_type i;
3894     ColTupleList::size_type  j;
3895 
3896     printf("\n=========================\n");
3897 //      printf("\nTable OID : %d \n", tableOid);
3898 
3899     printf("\nTotal RIDs: %zu\n", ridList.size());
3900 
3901     for (i = 0; i < ridList.size(); i++)
3902         cout << "RID[" << i << "] : " << ridList[i] << "\n";
3903 
3904     printf("\nTotal Columns: %zu\n", colStructList.size());
3905 
3906 
3907     for (i = 0; i < colStructList.size(); i++)
3908     {
3909         curColStruct = colStructList[i];
3910         curTupleList = colValueList[i];
3911 
3912         printf("\nColumn[%zu]", i);
3913         printf("\nData file OID : %d \t", curColStruct.dataOid);
3914         printf("\tWidth : %d \t Type: %d", curColStruct.colWidth, curColStruct.colDataType);
3915         printf("\nTotal values : %zu \n", curTupleList.size());
3916 
3917         for (j = 0; j < curTupleList.size(); j++)
3918         {
3919             curTuple = curTupleList[j];
3920 
3921             try
3922             {
3923                 if (curTuple.data.type() == typeid(int))
3924                     curStr = boost::lexical_cast<string>(boost::any_cast<int>(curTuple.data));
3925                 else if (curTuple.data.type() == typeid(float))
3926                     curStr = boost::lexical_cast<string>(boost::any_cast<float>(curTuple.data));
3927                 else if (curTuple.data.type() == typeid(long long))
3928                     curStr = boost::lexical_cast<string>(boost::any_cast<long long>(curTuple.data));
3929                 else if (curTuple.data.type() == typeid(double))
3930                     curStr = boost::lexical_cast<string>(boost::any_cast<double>(curTuple.data));
3931 //               else
3932 //               if (curTuple.data.type() == typeid(bool))
3933 //                  curStr = boost::lexical_cast<string>(boost::any_cast<bool>(curTuple.data));
3934                 else if (curTuple.data.type() == typeid(short))
3935                     curStr = boost::lexical_cast<string>(boost::any_cast<short>(curTuple.data));
3936                 else if (curTuple.data.type() == typeid(char))
3937                     curStr = boost::lexical_cast<string>(boost::any_cast<char>(curTuple.data));
3938                 else
3939                     curStr = boost::any_cast<string>(curTuple.data);
3940             }
3941             catch (...)
3942             {
3943             }
3944 
3945             if (isDebug(DEBUG_3))
3946                 printf("Value[%zu] : %s\n", j, curStr.c_str());
3947         }
3948 
3949     }
3950 
3951     printf("\n=========================\n");
3952 }
3953 
3954 /***********************************************************
3955  * DESCRIPTION:
3956  *    Process version buffer before any write operation
3957  * PARAMETERS:
3958  *    txnid - transaction id
3959  *    oid - column oid
3960  *    totalRow - total number of rows
3961  *    rowIdArray - rowid array
3962  * RETURN:
3963  *    NO_ERROR if success
3964  *    others if something wrong in inserting the value
3965  ***********************************************************/
processVersionBuffer(IDBDataFile * pFile,const TxnID & txnid,const ColStruct & colStruct,int width,int totalRow,const RID * rowIdArray,vector<LBIDRange> & rangeList)3966 int WriteEngineWrapper::processVersionBuffer(IDBDataFile* pFile, const TxnID& txnid,
3967         const ColStruct& colStruct, int width,
3968         int totalRow, const RID* rowIdArray, vector<LBIDRange>&   rangeList)
3969 {
3970     if (idbdatafile::IDBPolicy::useHdfs())
3971         return 0;
3972 
3973     RID         curRowId;
3974     int         rc = NO_ERROR;
3975     int         curFbo = 0, curBio, lastFbo = -1;
3976     bool        successFlag;
3977     BRM::LBID_t lbid;
3978     BRM::VER_t  verId = (BRM::VER_t) txnid;
3979     vector<uint32_t> fboList;
3980     LBIDRange   range;
3981     ColumnOp* colOp = m_colOp[op(colStruct.fCompressionType)];
3982 
3983     for (int i = 0; i < totalRow; i++)
3984     {
3985         curRowId = rowIdArray[i];
3986         //cout << "processVersionBuffer got rid " << curRowId << endl;
3987         successFlag = colOp->calculateRowId(curRowId, BYTE_PER_BLOCK / width, width, curFbo, curBio);
3988 
3989         if (successFlag)
3990         {
3991             if (curFbo != lastFbo)
3992             {
3993                 //cout << "processVersionBuffer is processing lbid  " << lbid << endl;
3994                 RETURN_ON_ERROR(BRMWrapper::getInstance()->getBrmInfo(
3995                                     colStruct.dataOid, colStruct.fColPartition, colStruct.fColSegment, curFbo, lbid));
3996                 //cout << "processVersionBuffer is processing lbid  " << lbid << endl;
3997                 fboList.push_back((uint32_t)curFbo);
3998                 range.start = lbid;
3999                 range.size = 1;
4000                 rangeList.push_back(range);
4001             }
4002 
4003             lastFbo = curFbo;
4004         }
4005     }
4006 
4007     std::vector<VBRange> freeList;
4008     rc = BRMWrapper::getInstance()->
4009          writeVB(pFile, verId, colStruct.dataOid, fboList, rangeList, colOp, freeList, colStruct.fColDbRoot);
4010 
4011     return rc;
4012 }
4013 
processVersionBuffers(IDBDataFile * pFile,const TxnID & txnid,const ColStruct & colStruct,int width,int totalRow,const RIDList & ridList,vector<LBIDRange> & rangeList)4014 int WriteEngineWrapper::processVersionBuffers(IDBDataFile* pFile, const TxnID& txnid,
4015         const ColStruct& colStruct, int width,
4016         int totalRow, const RIDList& ridList,
4017         vector<LBIDRange>&    rangeList)
4018 {
4019     if (idbdatafile::IDBPolicy::useHdfs())
4020         return 0;
4021 
4022     RID         curRowId;
4023     int         rc = NO_ERROR;
4024     int         curFbo = 0, curBio, lastFbo = -1;
4025     bool        successFlag;
4026     BRM::LBID_t lbid;
4027     BRM::VER_t  verId = (BRM::VER_t) txnid;
4028     LBIDRange   range;
4029     vector<uint32_t> fboList;
4030     //vector<LBIDRange>   rangeList;
4031     ColumnOp* colOp = m_colOp[op(colStruct.fCompressionType)];
4032 
4033     for (int i = 0; i < totalRow; i++)
4034     {
4035         curRowId = ridList[i];
4036         //cout << "processVersionBuffer got rid " << curRowId << endl;
4037         successFlag = colOp->calculateRowId(curRowId, BYTE_PER_BLOCK / width, width, curFbo, curBio);
4038 
4039         if (successFlag)
4040         {
4041             if (curFbo != lastFbo)
4042             {
4043                 //cout << "processVersionBuffer is processing lbid  " << lbid << endl;
4044                 RETURN_ON_ERROR(BRMWrapper::getInstance()->getBrmInfo(
4045                                     colStruct.dataOid, colStruct.fColPartition, colStruct.fColSegment, curFbo, lbid));
4046                 //cout << "processVersionBuffer is processing lbid  " << lbid << endl;
4047                 fboList.push_back((uint32_t)curFbo);
4048                 range.start = lbid;
4049                 range.size = 1;
4050                 rangeList.push_back(range);
4051             }
4052 
4053             lastFbo = curFbo;
4054         }
4055     }
4056 
4057 //cout << "calling writeVB with blocks " << rangeList.size() << endl;
4058     std::vector<VBRange> freeList;
4059     rc = BRMWrapper::getInstance()->
4060          writeVB(pFile, verId, colStruct.dataOid, fboList, rangeList, colOp, freeList, colStruct.fColDbRoot);
4061 
4062     return rc;
4063 }
4064 
processBeginVBCopy(const TxnID & txnid,const vector<ColStruct> & colStructList,const RIDList & ridList,std::vector<VBRange> & freeList,vector<vector<uint32_t>> & fboLists,vector<vector<LBIDRange>> & rangeLists,vector<LBIDRange> & rangeListTot)4065 int WriteEngineWrapper::processBeginVBCopy(const TxnID& txnid, const vector<ColStruct>& colStructList, const RIDList& ridList,
4066         std::vector<VBRange>& freeList, vector<vector<uint32_t> >& fboLists, vector<vector<LBIDRange> >& rangeLists,
4067         vector<LBIDRange>&   rangeListTot)
4068 {
4069     if (idbdatafile::IDBPolicy::useHdfs())
4070         return 0;
4071 
4072     RID         curRowId;
4073     int         rc = NO_ERROR;
4074     int         curFbo = 0, curBio, lastFbo = -1;
4075     bool        successFlag;
4076     BRM::LBID_t lbid;
4077     LBIDRange   range;
4078 
4079 //StopWatch timer;
4080 // timer.start("calculation");
4081     for (uint32_t j = 0; j < colStructList.size(); j++)
4082     {
4083         vector<uint32_t> fboList;
4084         vector<LBIDRange>    rangeList;
4085         lastFbo = -1;
4086         ColumnOp* colOp = m_colOp[op(colStructList[j].fCompressionType)];
4087 
4088         ColStruct curColStruct = colStructList[j];
4089         Convertor::convertColType(&curColStruct);
4090 
4091         for (uint32_t i = 0; i < ridList.size(); i++)
4092         {
4093             curRowId = ridList[i];
4094             //cout << "processVersionBuffer got rid " << curRowId << endl;
4095             successFlag = colOp->calculateRowId(curRowId, BYTE_PER_BLOCK / curColStruct.colWidth, curColStruct.colWidth, curFbo, curBio);
4096 
4097             if (successFlag)
4098             {
4099                 if (curFbo != lastFbo)
4100                 {
4101                     //cout << "processVersionBuffer is processing curFbo  " << curFbo << endl;
4102                     RETURN_ON_ERROR(BRMWrapper::getInstance()->getBrmInfo(
4103                                         colStructList[j].dataOid, colStructList[j].fColPartition, colStructList[j].fColSegment, curFbo, lbid));
4104                     //cout << "beginVBCopy is processing lbid:transaction  " << lbid <<":"<<txnid<< endl;
4105                     fboList.push_back((uint32_t)curFbo);
4106                     range.start = lbid;
4107                     range.size = 1;
4108                     rangeList.push_back(range);
4109                 }
4110 
4111                 lastFbo = curFbo;
4112             }
4113         }
4114 
4115         BRMWrapper::getInstance()->pruneLBIDList(txnid, &rangeList, &fboList);
4116         rangeLists.push_back(rangeList);
4117 
4118         fboLists.push_back(fboList);
4119         rangeListTot.insert(rangeListTot.end(), rangeList.begin(), rangeList.end());
4120     }
4121 
4122     if (rangeListTot.size() > 0)
4123         rc = BRMWrapper::getInstance()->getDbrmObject()->beginVBCopy(txnid, colStructList[0].fColDbRoot, rangeListTot, freeList);
4124 
4125 //timer.stop("beginVBCopy");
4126 //timer.finish();
4127     return rc;
4128 }
4129 
4130 /**
4131 * @brief Process versioning for batch insert - only version the hwm block.
4132 */
4133 #if 0
4134 int WriteEngineWrapper::processBatchVersions(const TxnID& txnid, std::vector<Column> columns, std::vector<BRM::LBIDRange>&   rangeList)
4135 {
4136     int rc = 0;
4137     std::vector<DbFileOp*> fileOps;
4138 
4139     //open the column files
4140     for ( unsigned i = 0; i < columns.size(); i++)
4141     {
4142         ColumnOp* colOp = m_colOp[op(columns[i].compressionType)];
4143         Column curCol;
4144         // set params
4145         colOp->initColumn(curCol);
4146         ColType colType;
4147         Convertor::convertColType(columns[i].colDataType, colType);
4148         colOp->setColParam(curCol, 0, columns[i].colWidth,
4149                            columns[i].colDataType, colType, columns[i].dataFile.oid,
4150                            columns[i].compressionType,
4151                            columns[i].dataFile.fDbRoot, columns[i].dataFile.fPartition, columns[i].dataFile.fSegment);
4152         string segFile;
4153         rc = colOp->openColumnFile(curCol, segFile, IO_BUFF_SIZE);
4154 
4155         if (rc != NO_ERROR)
4156             break;
4157 
4158         columns[i].dataFile.pFile = curCol.dataFile.pFile;
4159         fileOps.push_back(colOp);
4160     }
4161 
4162     if ( rc == 0)
4163     {
4164         BRM::VER_t  verId = (BRM::VER_t) txnid;
4165         rc = BRMWrapper::getInstance()->writeBatchVBs(verId, columns, rangeList, fileOps);
4166     }
4167 
4168     //close files
4169     for ( unsigned i = 0; i < columns.size(); i++)
4170     {
4171         ColumnOp* colOp = dynamic_cast<ColumnOp*> (fileOps[i]);
4172         Column curCol;
4173         // set params
4174         colOp->initColumn(curCol);
4175         ColType colType;
4176         Convertor::convertColType(columns[i].colDataType, colType);
4177         colOp->setColParam(curCol, 0, columns[i].colWidth,
4178                            columns[i].colDataType, colType, columns[i].dataFile.oid,
4179                            columns[i].compressionType,
4180                            columns[i].dataFile.fDbRoot, columns[i].dataFile.fPartition, columns[i].dataFile.fSegment);
4181         curCol.dataFile.pFile = columns[i].dataFile.pFile;
4182         colOp->clearColumn(curCol);
4183     }
4184 
4185     return rc;
4186 }
4187 #endif
writeVBEnd(const TxnID & txnid,std::vector<BRM::LBIDRange> & rangeList)4188 void WriteEngineWrapper::writeVBEnd(const TxnID& txnid, std::vector<BRM::LBIDRange>&   rangeList)
4189 {
4190     if (idbdatafile::IDBPolicy::useHdfs())
4191         return;
4192 
4193     BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
4194 }
4195 
updateColumnRec(const TxnID & txnid,vector<ColStructList> & colExtentsStruct,ColValueList & colValueList,vector<void * > & colOldValueList,vector<RIDList> & ridLists,vector<DctnryStructList> & dctnryExtentsStruct,DctnryValueList & dctnryValueList,const int32_t tableOid)4196 int WriteEngineWrapper::updateColumnRec(const TxnID& txnid,
4197                                         vector<ColStructList>& colExtentsStruct,
4198                                         ColValueList& colValueList,
4199                                         vector<void*>& colOldValueList,
4200                                         vector<RIDList>& ridLists,
4201                                         vector<DctnryStructList>& dctnryExtentsStruct,
4202                                         DctnryValueList& dctnryValueList,
4203                                         const int32_t tableOid)
4204 {
4205     int            rc = 0;
4206     //RID*           rowIdArray = NULL;
4207     //RIDList::size_type i;
4208     unsigned numExtents = colExtentsStruct.size();
4209     // ColValueList tmpColValueList;
4210     RIDList::const_iterator ridsIter;
4211     ColStructList colStructList;
4212     DctnryStructList dctnryStructList;
4213     ColumnOp* colOp = NULL;
4214 
4215     for (unsigned extent = 0; extent < numExtents; extent++)
4216     {
4217         ridsIter = ridLists[extent].begin();
4218 
4219         //rowIdArray = (RID*)calloc(sizeof(RID), ridLists[extent].size());
4220 
4221         colStructList = colExtentsStruct[extent];
4222         dctnryStructList = dctnryExtentsStruct[extent];
4223 
4224         if (m_opType != DELETE)
4225         {
4226 
4227             /*            ColTuple colTuple;
4228                      ColTupleList colTupleList;
4229                      for (i=0; i < colValueList.size(); i++)
4230                      {
4231                          colTupleList = colValueList[i];
4232                          colTuple = colTupleList[0];
4233                          for (unsigned i = 1; i < ridLists[extent].size(); i++)
4234                          {
4235                              colTupleList.push_back(colTuple);
4236                          }
4237                          tmpColValueList.push_back(colTupleList);
4238                      }
4239             */
4240             //Tokenize data if needed
4241             vector<Token> tokenList;
4242 
4243             DctColTupleList::iterator dctCol_iter;
4244             ColTupleList::iterator col_iter;
4245 
4246             for (unsigned i = 0; i < colStructList.size(); i++)
4247             {
4248                 if (colStructList[i].tokenFlag)
4249                 {
4250                     // only need to tokenize once
4251                     dctCol_iter = dctnryValueList[i].begin();
4252                     //col_iter = colValueList[i].begin();
4253                     Token token;
4254 
4255                     if (!dctCol_iter->isNull)
4256                     {
4257                         RETURN_ON_ERROR(tokenize(
4258                                             txnid, dctnryStructList[i], *dctCol_iter, true)); // @bug 5572 HDFS tmp file
4259                         token = dctCol_iter->token;
4260 
4261 #ifdef PROFILE
4262 //timer.stop("tokenize");
4263 #endif
4264                     }
4265                     else
4266                     {
4267                         //if (dctnryStructList[i].dctnryOid == 2001)
4268                         //	std::cout << " got null token for string " << dctCol_iter->sigValue <<std::endl;
4269                     }
4270 
4271                     //if (dctnryStructList[i].dctnryOid == 2001)
4272                     //std::cout << " got token for string " << dctCol_iter->sigValue << " op:fbo = " << token.op <<":"<<token.fbo << std::endl;
4273                     tokenList.push_back(token);
4274                 }
4275             }
4276 
4277             int dicPos = 0;
4278 
4279             for (unsigned i = 0; i < colStructList.size(); i++)
4280             {
4281                 if (colStructList[i].tokenFlag)
4282                 {
4283                     // only need to tokenize once
4284                     col_iter = colValueList[i].begin();
4285 
4286                     while (col_iter != colValueList[i].end())
4287                     {
4288                         col_iter->data = tokenList[dicPos];
4289                         col_iter++;
4290                     }
4291 
4292                     dicPos++;
4293                 }
4294             }
4295         }
4296 
4297         RIDList::iterator rid_iter;
4298         /*    i = 0;
4299               while (rid_iter != ridLists[extent].end())
4300               {
4301                  rowIdArray[i] = *rid_iter;
4302                  rid_iter++;
4303                  i++;
4304               }
4305         */
4306         //Mark extents invalid
4307         //if (colStructList[0].dataOid < 3000) {
4308         vector<BRM::LBID_t> lbids;
4309         vector<CalpontSystemCatalog::ColDataType> colDataTypes;
4310         bool successFlag = true;
4311         unsigned width = 0;
4312         int      curFbo = 0, curBio, lastFbo = -1;
4313         rid_iter = ridLists[extent].begin();
4314         RID aRid = *rid_iter;
4315 
4316         for (unsigned j = 0; j < colStructList.size(); j++)
4317         {
4318             colOp = m_colOp[op(colStructList[j].fCompressionType)];
4319 
4320             if (colStructList[j].tokenFlag)
4321                 continue;
4322 
4323             width = colOp->getCorrectRowWidth(colStructList[j].colDataType, colStructList[j].colWidth);
4324             successFlag = colOp->calculateRowId(aRid, BYTE_PER_BLOCK / width, width, curFbo, curBio);
4325 
4326             if (successFlag)
4327             {
4328                 if (curFbo != lastFbo)
4329                 {
4330                     RETURN_ON_ERROR(AddLBIDtoList(txnid,
4331                                                   lbids,
4332                                                   colDataTypes,
4333                                                   colStructList[j],
4334                                                   curFbo));
4335                 }
4336             }
4337         }
4338 
4339         //cout << "lbids size = " << lbids.size()<< endl;
4340 //#ifdef PROFILE
4341 //timer.start("markExtentsInvalid");
4342 //#endif
4343         if (lbids.size() > 0)
4344             rc = BRMWrapper::getInstance()->markExtentsInvalid(lbids, colDataTypes);
4345 
4346         //}
4347 
4348         if ( m_opType != DELETE)
4349             m_opType = UPDATE;
4350 
4351         rc = writeColumnRec(txnid, colStructList, colValueList, colOldValueList,
4352                             ridLists[extent], tableOid, true, ridLists[extent].size());
4353 
4354 //    if (rowIdArray)
4355 //       free(rowIdArray);
4356 
4357         m_opType = NOOP;
4358 
4359         if (rc != NO_ERROR)
4360             break;
4361     }
4362 
4363     return rc;
4364 }
4365 
updateColumnRecs(const TxnID & txnid,vector<ColStruct> & colExtentsStruct,ColValueList & colValueList,const RIDList & ridLists,const int32_t tableOid)4366 int WriteEngineWrapper::updateColumnRecs(const TxnID& txnid,
4367         vector<ColStruct>& colExtentsStruct,
4368         ColValueList& colValueList,
4369         const RIDList& ridLists,
4370         const int32_t tableOid)
4371 {
4372     //Mark extents invalid
4373     //int rc = 0;
4374     //if (colExtentsStruct[0].dataOid < 3000)
4375     //{
4376     vector<BRM::LBID_t> lbids;
4377     vector<CalpontSystemCatalog::ColDataType> colDataTypes;
4378     ColumnOp* colOp = NULL;
4379     bool successFlag = true;
4380     unsigned width = 0;
4381     \
4382     int curFbo = 0, curBio, lastFbo = -1;
4383     RID aRid = ridLists[0];
4384     int rc = 0;
4385 
4386     for (unsigned j = 0; j < colExtentsStruct.size(); j++)
4387     {
4388         colOp = m_colOp[op(colExtentsStruct[j].fCompressionType)];
4389 
4390         if (colExtentsStruct[j].tokenFlag)
4391             continue;
4392 
4393         width = colOp->getCorrectRowWidth(colExtentsStruct[j].colDataType, colExtentsStruct[j].colWidth);
4394         successFlag = colOp->calculateRowId(aRid, BYTE_PER_BLOCK / width, width, curFbo, curBio);
4395 
4396         if (successFlag)
4397         {
4398             if (curFbo != lastFbo)
4399             {
4400                 RETURN_ON_ERROR(AddLBIDtoList(txnid,
4401                                               lbids,
4402                                               colDataTypes,
4403                                               colExtentsStruct[j],
4404                                               curFbo));
4405             }
4406         }
4407     }
4408 
4409     if (lbids.size() > 0)
4410     {
4411 //        cout << "BRMWrapper::getInstance()->markExtentsInvalid(lbids); " << lbids.size() << " lbids" << endl;
4412         rc = BRMWrapper::getInstance()->markExtentsInvalid(lbids, colDataTypes);
4413     }
4414 
4415     //}
4416     if ( m_opType != DELETE)
4417         m_opType = UPDATE;
4418 
4419     rc = writeColumnRecords (txnid, colExtentsStruct, colValueList, ridLists, tableOid);
4420     m_opType = NOOP;
4421     return rc;
4422 }
4423 
writeColumnRecords(const TxnID & txnid,vector<ColStruct> & colStructList,ColValueList & colValueList,const RIDList & ridLists,const int32_t tableOid,bool versioning)4424 int WriteEngineWrapper::writeColumnRecords(const TxnID& txnid,
4425         vector<ColStruct>& colStructList,
4426         ColValueList& colValueList,
4427         const RIDList& ridLists, const int32_t tableOid, bool versioning)
4428 {
4429     bool           bExcp;
4430     int            rc = 0;
4431     void*          valArray = NULL;
4432     Column         curCol;
4433     ColStruct      curColStruct;
4434     ColTupleList   curTupleList;
4435     ColStructList::size_type  totalColumn;
4436     ColStructList::size_type  i;
4437     ColTupleList::size_type   totalRow;
4438     setTransId(txnid);
4439     totalColumn = colStructList.size();
4440     totalRow = ridLists.size();
4441 
4442     TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);
4443 
4444     for (i = 0; i < totalColumn; i++)
4445     {
4446         valArray = NULL;
4447         curColStruct = colStructList[i];
4448         curTupleList = colValueList[i];
4449         ColumnOp* colOp = m_colOp[op(curColStruct.fCompressionType)];
4450 
4451         Convertor::convertColType(&curColStruct);
4452 
4453         // set params
4454         colOp->initColumn(curCol);
4455 
4456         colOp->setColParam(curCol, 0, curColStruct.colWidth,
4457                            curColStruct.colDataType, curColStruct.colType, curColStruct.dataOid,
4458                            curColStruct.fCompressionType,
4459                            curColStruct.fColDbRoot, curColStruct.fColPartition, curColStruct.fColSegment);
4460 
4461         ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(curColStruct.dataOid);
4462         ColExtsInfo::iterator it = aColExtsInfo.begin();
4463 
4464         while (it != aColExtsInfo.end())
4465         {
4466             if ((it->dbRoot == curColStruct.fColDbRoot) && (it->partNum == curColStruct.fColPartition) && (it->segNum == curColStruct.fColSegment))
4467                 break;
4468 
4469             it++;
4470         }
4471 
4472         if (it == aColExtsInfo.end()) //add this one to the list
4473         {
4474             ColExtInfo aExt;
4475             aExt.dbRoot = curColStruct.fColDbRoot;
4476             aExt.partNum = curColStruct.fColPartition;
4477             aExt.segNum = curColStruct.fColSegment;
4478             aExt.compType = curColStruct.fCompressionType;
4479             aExt.isDict = false;
4480             aColExtsInfo.push_back(aExt);
4481             aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
4482         }
4483 
4484         string segFile;
4485         rc = colOp->openColumnFile(curCol, segFile, true); // @bug 5572 HDFS tmp file
4486 
4487         if (rc != NO_ERROR)
4488             break;
4489 
4490         vector<LBIDRange>   rangeList;
4491 
4492         if (versioning)
4493         {
4494             rc = processVersionBuffers(curCol.dataFile.pFile, txnid, curColStruct,
4495                                        curColStruct.colWidth, totalRow, ridLists, rangeList);
4496         }
4497 
4498         if (rc != NO_ERROR)
4499         {
4500             if (curColStruct.fCompressionType == 0)
4501             {
4502                 curCol.dataFile.pFile->flush();
4503             }
4504 
4505             BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
4506             break;
4507         }
4508 
4509         switch (curColStruct.colType)
4510         {
4511             case WriteEngine::WR_INT:
4512             case WriteEngine::WR_MEDINT:
4513                 valArray = (int*) calloc(sizeof(int), totalRow);
4514                 break;
4515 
4516             case WriteEngine::WR_UINT:
4517             case WriteEngine::WR_UMEDINT:
4518                 valArray = (uint32_t*) calloc(sizeof(uint32_t), totalRow);
4519                 break;
4520 
4521             case WriteEngine::WR_VARBINARY : // treat same as char for now
4522             case WriteEngine::WR_CHAR:
4523             case WriteEngine::WR_BLOB:
4524             case WriteEngine::WR_TEXT:
4525                 valArray = (char*) calloc(sizeof(char), totalRow * MAX_COLUMN_BOUNDARY);
4526                 break;
4527 
4528             case WriteEngine::WR_FLOAT:
4529                 valArray = (float*) calloc(sizeof(float), totalRow);
4530                 break;
4531 
4532             case WriteEngine::WR_DOUBLE:
4533                 valArray = (double*) calloc(sizeof(double), totalRow);
4534                 break;
4535 
4536             case WriteEngine::WR_BYTE:
4537                 valArray = (char*) calloc(sizeof(char), totalRow);
4538                 break;
4539 
4540             case WriteEngine::WR_UBYTE:
4541                 valArray = (uint8_t*) calloc(sizeof(uint8_t), totalRow);
4542                 break;
4543 
4544             case WriteEngine::WR_SHORT:
4545                 valArray = (short*) calloc(sizeof(short), totalRow);
4546                 break;
4547 
4548             case WriteEngine::WR_USHORT:
4549                 valArray = (uint16_t*) calloc(sizeof(uint16_t), totalRow);
4550                 break;
4551 
4552             case WriteEngine::WR_LONGLONG:
4553                 valArray = (long long*) calloc(sizeof(long long), totalRow);
4554                 break;
4555 
4556             case WriteEngine::WR_ULONGLONG:
4557                 valArray = (uint64_t*) calloc(sizeof(uint64_t), totalRow);
4558                 break;
4559 
4560             case WriteEngine::WR_TOKEN:
4561                 valArray = (Token*) calloc(sizeof(Token), totalRow);
4562                 break;
4563         }
4564 
4565         // convert values to valArray
4566         bExcp = false;
4567 
4568         try
4569         {
4570             convertValArray(totalRow, curColStruct.colType, curTupleList, valArray);
4571         }
4572         catch (...)
4573         {
4574             bExcp = true;
4575         }
4576 
4577         if (bExcp)
4578         {
4579             BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
4580             return ERR_PARSING;
4581         }
4582 
4583 #ifdef PROFILE
4584         timer.start("writeRow ");
4585 #endif
4586         rc = colOp->writeRowsValues(curCol, totalRow, ridLists, valArray);
4587 #ifdef PROFILE
4588         timer.stop("writeRow ");
4589 #endif
4590         colOp->clearColumn(curCol);
4591 
4592         if (curColStruct.fCompressionType == 0)
4593         {
4594             std::vector<BRM::FileInfo> files;
4595             BRM::FileInfo aFile;
4596             aFile.partitionNum = curColStruct.fColPartition;
4597             aFile.dbRoot = curColStruct.fColDbRoot;;
4598             aFile.segmentNum = curColStruct.fColSegment;
4599             aFile.compType = curColStruct.fCompressionType;
4600             files.push_back(aFile);
4601 
4602             if (idbdatafile::IDBPolicy::useHdfs())
4603                 cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());
4604         }
4605 
4606         BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
4607 
4608         if (valArray != NULL)
4609             free(valArray);
4610 
4611         // check error
4612         if (rc != NO_ERROR)
4613             break;
4614     }
4615 
4616     return rc;
4617 }
4618 
4619 /*@brief writeColumnRec - Write values to a column
4620 */
4621 /***********************************************************
4622  * DESCRIPTION:
4623  *    Write values to a column
4624  * PARAMETERS:
4625  *    tableOid - table object id
4626  *    colStructList - column struct list
4627  *    colValueList - column value list
4628  *    colNewStructList - the new extent struct list
4629  *    colNewValueList - column value list for the new extent
4630  *    rowIdArray -  row id list
4631  *    useTmpSuffix - use temp suffix for db output file
4632  * RETURN:
4633  *    NO_ERROR if success
4634  *    others if something wrong in inserting the value
4635  ***********************************************************/
writeColumnRec(const TxnID & txnid,const ColStructList & colStructList,ColValueList & colValueList,RID * rowIdArray,const ColStructList & newColStructList,ColValueList & newColValueList,const int32_t tableOid,bool useTmpSuffix,bool versioning)4636 int WriteEngineWrapper::writeColumnRec(const TxnID& txnid,
4637                                        const ColStructList& colStructList,
4638                                        ColValueList& colValueList,
4639                                        RID* rowIdArray,
4640                                        const ColStructList& newColStructList,
4641                                        ColValueList& newColValueList,
4642                                        const int32_t tableOid,
4643                                        bool useTmpSuffix,
4644                                        bool versioning)
4645 {
4646     bool           bExcp;
4647     int            rc = 0;
4648     void*          valArray;
4649     string         segFile;
4650     Column         curCol;
4651     ColTupleList   oldTupleList;
4652     ColStructList::size_type  totalColumn;
4653     ColStructList::size_type  i;
4654     ColTupleList::size_type   totalRow1, totalRow2;
4655 
4656     setTransId(txnid);
4657 
4658     totalColumn = colStructList.size();
4659 #ifdef PROFILE
4660     StopWatch timer;
4661 #endif
4662 
4663     if (newColValueList.size() > 0)
4664     {
4665         totalRow1 = colValueList[0].size();
4666         totalRow2 = newColValueList[0].size();
4667     }
4668     else
4669     {
4670         totalRow1 = colValueList[0].size();
4671         totalRow2 = 0;
4672     }
4673 
4674     TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);
4675 
4676     for (i = 0; i < totalColumn; i++)
4677     {
4678         if (totalRow2 > 0)
4679         {
4680             RID* secondPart = rowIdArray + totalRow1;
4681 
4682             //@Bug 2205 Check if all rows go to the new extent
4683             if (totalRow1 > 0)
4684             {
4685                 //Write the first batch
4686                 valArray = NULL;
4687                 RID* firstPart = rowIdArray;
4688                 ColumnOp* colOp = m_colOp[op(colStructList[i].fCompressionType)];
4689 
4690                 // set params
4691                 colOp->initColumn(curCol);
4692                 // need to pass real dbRoot, partition, and segment to setColParam
4693                 colOp->setColParam(curCol, 0, colStructList[i].colWidth,
4694                                    colStructList[i].colDataType, colStructList[i].colType, colStructList[i].dataOid,
4695                                    colStructList[i].fCompressionType, colStructList[i].fColDbRoot,
4696                                    colStructList[i].fColPartition, colStructList[i].fColSegment);
4697 
4698                 ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(colStructList[i].dataOid);
4699                 ColExtsInfo::iterator it = aColExtsInfo.begin();
4700 
4701                 while (it != aColExtsInfo.end())
4702                 {
4703                     if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment))
4704                         break;
4705 
4706                     it++;
4707                 }
4708 
4709                 if (it == aColExtsInfo.end()) //add this one to the list
4710                 {
4711                     ColExtInfo aExt;
4712                     aExt.dbRoot = colStructList[i].fColDbRoot;
4713                     aExt.partNum = colStructList[i].fColPartition;
4714                     aExt.segNum = colStructList[i].fColSegment;
4715                     aExt.compType = colStructList[i].fCompressionType;
4716                     aColExtsInfo.push_back(aExt);
4717                     aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
4718                 }
4719 
4720                 rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file
4721 
4722                 if (rc != NO_ERROR)
4723                     break;
4724 
4725                 // handling versioning
4726                 vector<LBIDRange>   rangeList;
4727 
4728                 if (versioning)
4729                 {
4730                     rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i],
4731                                               colStructList[i].colWidth, totalRow1, firstPart, rangeList);
4732 
4733                     if (rc != NO_ERROR)
4734                     {
4735                         if (colStructList[i].fCompressionType == 0)
4736                         {
4737                             curCol.dataFile.pFile->flush();
4738                         }
4739 
4740                         BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
4741                         break;
4742                     }
4743                 }
4744 
4745                 //totalRow1 -= totalRow2;
4746                 // have to init the size here
4747                 // nullArray = (bool*) malloc(sizeof(bool) * totalRow);
4748                 switch (colStructList[i].colType)
4749                 {
4750                     case WriteEngine::WR_INT:
4751                     case WriteEngine::WR_MEDINT:
4752                         valArray = (int*) calloc(sizeof(int), totalRow1);
4753                         break;
4754 
4755                     case WriteEngine::WR_UINT:
4756                     case WriteEngine::WR_UMEDINT:
4757                         valArray = (uint32_t*) calloc(sizeof(uint32_t), totalRow1);
4758                         break;
4759 
4760                     case WriteEngine::WR_VARBINARY : // treat same as char for now
4761                     case WriteEngine::WR_CHAR:
4762                     case WriteEngine::WR_BLOB:
4763                     case WriteEngine::WR_TEXT:
4764                         valArray = (char*) calloc(sizeof(char), totalRow1 * MAX_COLUMN_BOUNDARY);
4765                         break;
4766 
4767                     case WriteEngine::WR_FLOAT:
4768                         valArray = (float*) calloc(sizeof(float), totalRow1);
4769                         break;
4770 
4771                     case WriteEngine::WR_DOUBLE:
4772                         valArray = (double*) calloc(sizeof(double), totalRow1);
4773                         break;
4774 
4775                     case WriteEngine::WR_BYTE:
4776                         valArray = (char*) calloc(sizeof(char), totalRow1);
4777                         break;
4778 
4779                     case WriteEngine::WR_UBYTE:
4780                         valArray = (uint8_t*) calloc(sizeof(uint8_t), totalRow1);
4781                         break;
4782 
4783                     case WriteEngine::WR_SHORT:
4784                         valArray = (short*) calloc(sizeof(short), totalRow1);
4785                         break;
4786 
4787                     case WriteEngine::WR_USHORT:
4788                         valArray = (uint16_t*) calloc(sizeof(uint16_t), totalRow1);
4789                         break;
4790 
4791                     case WriteEngine::WR_LONGLONG:
4792                         valArray = (long long*) calloc(sizeof(long long), totalRow1);
4793                         break;
4794 
4795                     case WriteEngine::WR_ULONGLONG:
4796                         valArray = (uint64_t*) calloc(sizeof(uint64_t), totalRow1);
4797                         break;
4798 
4799                     case WriteEngine::WR_TOKEN:
4800                         valArray = (Token*) calloc(sizeof(Token), totalRow1);
4801                         break;
4802                 }
4803 
4804                 // convert values to valArray
4805                 if (m_opType != DELETE)
4806                 {
4807                     bExcp = false;
4808 
4809                     try
4810                     {
4811                         convertValArray(totalRow1, colStructList[i].colType, colValueList[i], valArray);
4812                     }
4813                     catch (...)
4814                     {
4815                         bExcp = true;
4816                     }
4817 
4818                     if (bExcp)
4819                     {
4820                         if (versioning)
4821                             BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
4822 
4823                         return ERR_PARSING;
4824                     }
4825 
4826 #ifdef PROFILE
4827                     timer.start("writeRow ");
4828 #endif
4829                     rc = colOp->writeRow(curCol, totalRow1, firstPart, valArray);
4830 #ifdef PROFILE
4831                     timer.stop("writeRow ");
4832 #endif
4833                 }
4834                 else
4835                 {
4836 #ifdef PROFILE
4837                     timer.start("writeRow ");
4838 #endif
4839                     rc = colOp->writeRow(curCol, totalRow1, rowIdArray, valArray, true);
4840 #ifdef PROFILE
4841                     timer.stop("writeRow ");
4842 #endif
4843                 }
4844 
4845                 colOp->clearColumn(curCol);
4846 
4847                 if (versioning)
4848                     BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
4849 
4850                 if (valArray != NULL)
4851                     free(valArray);
4852 
4853                 // check error
4854                 if (rc != NO_ERROR)
4855                     break;
4856             }
4857 
4858             //Process the second batch
4859             valArray = NULL;
4860 
4861             ColumnOp* colOp = m_colOp[op(newColStructList[i].fCompressionType)];
4862 
4863             // set params
4864             colOp->initColumn(curCol);
4865             colOp->setColParam(curCol, 0, newColStructList[i].colWidth,
4866                                newColStructList[i].colDataType, newColStructList[i].colType, newColStructList[i].dataOid,
4867                                newColStructList[i].fCompressionType, newColStructList[i].fColDbRoot,
4868                                newColStructList[i].fColPartition, newColStructList[i].fColSegment);
4869 
4870             ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(newColStructList[i].dataOid);
4871             ColExtsInfo::iterator it = aColExtsInfo.begin();
4872 
4873             while (it != aColExtsInfo.end())
4874             {
4875                 if ((it->dbRoot == newColStructList[i].fColDbRoot) && (it->partNum == newColStructList[i].fColPartition) && (it->segNum == newColStructList[i].fColSegment))
4876                     break;
4877 
4878                 it++;
4879             }
4880 
4881             if (it == aColExtsInfo.end()) //add this one to the list
4882             {
4883                 ColExtInfo aExt;
4884                 aExt.dbRoot = newColStructList[i].fColDbRoot;
4885                 aExt.partNum = newColStructList[i].fColPartition;
4886                 aExt.segNum = newColStructList[i].fColSegment;
4887                 aExt.compType = newColStructList[i].fCompressionType;
4888                 aColExtsInfo.push_back(aExt);
4889                 aTbaleMetaData->setColExtsInfo(newColStructList[i].dataOid, aColExtsInfo);
4890             }
4891 
4892             // Pass "false" for hdfs tmp file flag.  Since we only allow 1
4893             // extent per segment file (with HDFS), we can assume a second
4894             // extent is going to a new file (and won't need tmp file).
4895             rc = colOp->openColumnFile(curCol, segFile, false, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file
4896 
4897             if (rc != NO_ERROR)
4898                 break;
4899 
4900             // handling versioning
4901             vector<LBIDRange>   rangeList;
4902 
4903             if (versioning)
4904             {
4905                 rc = processVersionBuffer(curCol.dataFile.pFile, txnid, newColStructList[i],
4906                                           newColStructList[i].colWidth, totalRow2, secondPart, rangeList);
4907 
4908                 if (rc != NO_ERROR)
4909                 {
4910                     if (newColStructList[i].fCompressionType == 0)
4911                     {
4912                         curCol.dataFile.pFile->flush();
4913                     }
4914 
4915                     BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
4916                     break;
4917                 }
4918             }
4919 
4920             //totalRow1 -= totalRow2;
4921             // have to init the size here
4922 //       nullArray = (bool*) malloc(sizeof(bool) * totalRow);
4923             switch (newColStructList[i].colType)
4924             {
4925                 case WriteEngine::WR_INT:
4926                 case WriteEngine::WR_MEDINT:
4927                     valArray = (int*) calloc(sizeof(int), totalRow2);
4928                     break;
4929 
4930                 case WriteEngine::WR_UINT:
4931                 case WriteEngine::WR_UMEDINT:
4932                     valArray = (uint32_t*) calloc(sizeof(uint32_t), totalRow2);
4933                     break;
4934 
4935                 case WriteEngine::WR_VARBINARY : // treat same as char for now
4936                 case WriteEngine::WR_CHAR:
4937                 case WriteEngine::WR_BLOB:
4938                 case WriteEngine::WR_TEXT:
4939                     valArray = (char*) calloc(sizeof(char), totalRow2 * MAX_COLUMN_BOUNDARY);
4940                     break;
4941 
4942                 case WriteEngine::WR_FLOAT:
4943                     valArray = (float*) calloc(sizeof(float), totalRow2);
4944                     break;
4945 
4946                 case WriteEngine::WR_DOUBLE:
4947                     valArray = (double*) calloc(sizeof(double), totalRow2);
4948                     break;
4949 
4950                 case WriteEngine::WR_BYTE:
4951                     valArray = (char*) calloc(sizeof(char), totalRow2);
4952                     break;
4953 
4954                 case WriteEngine::WR_UBYTE:
4955                     valArray = (uint8_t*) calloc(sizeof(uint8_t), totalRow2);
4956                     break;
4957 
4958                 case WriteEngine::WR_SHORT:
4959                     valArray = (short*) calloc(sizeof(short), totalRow2);
4960                     break;
4961 
4962                 case WriteEngine::WR_USHORT:
4963                     valArray = (uint16_t*) calloc(sizeof(uint16_t), totalRow2);
4964                     break;
4965 
4966                 case WriteEngine::WR_LONGLONG:
4967                     valArray = (long long*) calloc(sizeof(long long), totalRow2);
4968                     break;
4969 
4970                 case WriteEngine::WR_ULONGLONG:
4971                     valArray = (uint64_t*) calloc(sizeof(uint64_t), totalRow2);
4972                     break;
4973 
4974                 case WriteEngine::WR_TOKEN:
4975                     valArray = (Token*) calloc(sizeof(Token), totalRow2);
4976                     break;
4977             }
4978 
4979             // convert values to valArray
4980             if (m_opType != DELETE)
4981             {
4982                 bExcp = false;
4983 
4984                 try
4985                 {
4986                     convertValArray(totalRow2, newColStructList[i].colType, newColValueList[i], valArray);
4987                 }
4988                 catch (...)
4989                 {
4990                     bExcp = true;
4991                 }
4992 
4993                 if (bExcp)
4994                 {
4995                     if (versioning)
4996                         BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
4997 
4998                     return ERR_PARSING;
4999                 }
5000 
5001 #ifdef PROFILE
5002                 timer.start("writeRow ");
5003 #endif
5004                 rc = colOp->writeRow(curCol, totalRow2, secondPart, valArray);
5005 #ifdef PROFILE
5006                 timer.stop("writeRow ");
5007 #endif
5008             }
5009             else
5010             {
5011 #ifdef PROFILE
5012                 timer.start("writeRow ");
5013 #endif
5014                 rc = colOp->writeRow(curCol, totalRow2, rowIdArray, valArray, true);
5015 #ifdef PROFILE
5016                 timer.stop("writeRow ");
5017 #endif
5018             }
5019 
5020 
5021             colOp->clearColumn(curCol);
5022 
5023             if (versioning)
5024                 BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
5025 
5026             if (valArray != NULL)
5027                 free(valArray);
5028 
5029             // check error
5030             if (rc != NO_ERROR)
5031                 break;
5032         }
5033         else
5034         {
5035             valArray = NULL;
5036 
5037             ColumnOp* colOp = m_colOp[op(colStructList[i].fCompressionType)];
5038 
5039             // set params
5040             colOp->initColumn(curCol);
5041             colOp->setColParam(curCol, 0, colStructList[i].colWidth,
5042                                colStructList[i].colDataType, colStructList[i].colType, colStructList[i].dataOid,
5043                                colStructList[i].fCompressionType, colStructList[i].fColDbRoot,
5044                                colStructList[i].fColPartition, colStructList[i].fColSegment);
5045 
5046             rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file
5047 
5048             //cout << " Opened file oid " << curCol.dataFile.pFile << endl;
5049             if (rc != NO_ERROR)
5050                 break;
5051 
5052             ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(colStructList[i].dataOid);
5053             ColExtsInfo::iterator it = aColExtsInfo.begin();
5054 
5055             while (it != aColExtsInfo.end())
5056             {
5057                 if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment))
5058                     break;
5059 
5060                 it++;
5061             }
5062 
5063             if (it == aColExtsInfo.end()) //add this one to the list
5064             {
5065                 ColExtInfo aExt;
5066                 aExt.dbRoot = colStructList[i].fColDbRoot;
5067                 aExt.partNum = colStructList[i].fColPartition;
5068                 aExt.segNum = colStructList[i].fColSegment;
5069                 aExt.compType = colStructList[i].fCompressionType;
5070                 aColExtsInfo.push_back(aExt);
5071                 aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
5072             }
5073 
5074             // handling versioning
5075             vector<LBIDRange>   rangeList;
5076 
5077             if (versioning)
5078             {
5079                 rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i],
5080                                           colStructList[i].colWidth, totalRow1, rowIdArray, rangeList);
5081 
5082                 if (rc != NO_ERROR)
5083                 {
5084                     if (colStructList[i].fCompressionType == 0)
5085                     {
5086                         curCol.dataFile.pFile->flush();
5087                     }
5088 
5089                     BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
5090                     break;
5091                 }
5092             }
5093 
5094             // have to init the size here
5095 //       nullArray = (bool*) malloc(sizeof(bool) * totalRow);
5096             switch (colStructList[i].colType)
5097             {
5098                 case WriteEngine::WR_INT:
5099                 case WriteEngine::WR_MEDINT:
5100                     valArray = (int*) calloc(sizeof(int), totalRow1);
5101                     break;
5102 
5103                 case WriteEngine::WR_UINT:
5104                 case WriteEngine::WR_UMEDINT:
5105                     valArray = (uint32_t*) calloc(sizeof(uint32_t), totalRow1);
5106                     break;
5107 
5108                 case WriteEngine::WR_VARBINARY : // treat same as char for now
5109                 case WriteEngine::WR_CHAR:
5110                 case WriteEngine::WR_BLOB:
5111                 case WriteEngine::WR_TEXT:
5112                     valArray = (char*) calloc(sizeof(char), totalRow1 * MAX_COLUMN_BOUNDARY);
5113                     break;
5114 
5115                 case WriteEngine::WR_FLOAT:
5116                     valArray = (float*) calloc(sizeof(float), totalRow1);
5117                     break;
5118 
5119                 case WriteEngine::WR_DOUBLE:
5120                     valArray = (double*) calloc(sizeof(double), totalRow1);
5121                     break;
5122 
5123                 case WriteEngine::WR_BYTE:
5124                     valArray = (char*) calloc(sizeof(char), totalRow1);
5125                     break;
5126 
5127                 case WriteEngine::WR_UBYTE:
5128                     valArray = (uint8_t*) calloc(sizeof(uint8_t), totalRow1);
5129                     break;
5130 
5131                 case WriteEngine::WR_SHORT:
5132                     valArray = (short*) calloc(sizeof(short), totalRow1);
5133                     break;
5134 
5135                 case WriteEngine::WR_USHORT:
5136                     valArray = (uint16_t*) calloc(sizeof(uint16_t), totalRow1);
5137                     break;
5138 
5139                 case WriteEngine::WR_LONGLONG:
5140                     valArray = (long long*) calloc(sizeof(long long), totalRow1);
5141                     break;
5142 
5143                 case WriteEngine::WR_ULONGLONG:
5144                     valArray = (uint64_t*) calloc(sizeof(uint64_t), totalRow1);
5145                     break;
5146 
5147                 case WriteEngine::WR_TOKEN:
5148                     valArray = (Token*) calloc(sizeof(Token), totalRow1);
5149                     break;
5150             }
5151 
5152             // convert values to valArray
5153             if (m_opType != DELETE)
5154             {
5155                 bExcp = false;
5156 
5157                 try
5158                 {
5159                     convertValArray(totalRow1, colStructList[i].colType, colValueList[i], valArray);
5160                 }
5161                 catch (...)
5162                 {
5163                     bExcp = true;
5164                 }
5165 
5166                 if (bExcp)
5167                 {
5168                     if (versioning)
5169                         BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
5170 
5171                     return ERR_PARSING;
5172                 }
5173 
5174 #ifdef PROFILE
5175                 timer.start("writeRow ");
5176 #endif
5177                 rc = colOp->writeRow(curCol, totalRow1, rowIdArray, valArray);
5178 #ifdef PROFILE
5179                 timer.stop("writeRow ");
5180 #endif
5181             }
5182             else
5183             {
5184 #ifdef PROFILE
5185                 timer.start("writeRow ");
5186 #endif
5187                 rc = colOp->writeRow(curCol, totalRow1, rowIdArray, valArray, true);
5188 #ifdef PROFILE
5189                 timer.stop("writeRow ");
5190 #endif
5191             }
5192 
5193             colOp->clearColumn(curCol);
5194 
5195             if (versioning)
5196                 BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
5197 
5198             if (valArray != NULL)
5199                 free(valArray);
5200 
5201             // check error
5202             if (rc != NO_ERROR)
5203                 break;
5204         }
5205     } // end of for (i = 0
5206 
5207 #ifdef PROFILE
5208     timer.finish();
5209 #endif
5210     return rc;
5211 }
5212 
writeColumnRecBinary(const TxnID & txnid,const ColStructList & colStructList,std::vector<uint64_t> & colValueList,RID * rowIdArray,const ColStructList & newColStructList,std::vector<uint64_t> & newColValueList,const int32_t tableOid,bool useTmpSuffix,bool versioning)5213 int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid,
5214         const ColStructList& colStructList,
5215         std::vector<uint64_t>& colValueList,
5216         RID* rowIdArray,
5217         const ColStructList& newColStructList,
5218         std::vector<uint64_t>& newColValueList,
5219         const int32_t tableOid,
5220         bool useTmpSuffix,
5221         bool versioning)
5222 {
5223     int            rc = 0;
5224     void*          valArray = NULL;
5225     string         segFile;
5226     Column         curCol;
5227     ColStructList::size_type  totalColumn;
5228     ColStructList::size_type  i;
5229     size_t   totalRow1, totalRow2;
5230 
5231     setTransId(txnid);
5232 
5233     totalColumn = colStructList.size();
5234 #ifdef PROFILE
5235     StopWatch timer;
5236 #endif
5237 
5238     totalRow1 = colValueList.size() / totalColumn;
5239 
5240     if (newColValueList.size() > 0)
5241     {
5242         totalRow2 = newColValueList.size() / newColStructList.size();
5243         totalRow1 -= totalRow2;
5244     }
5245     else
5246     {
5247         totalRow2 = 0;
5248     }
5249 
5250     // It is possible totalRow1 is zero but totalRow2 has values
5251     if ((totalRow1 == 0) && (totalRow2 == 0))
5252         return rc;
5253 
5254     TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);
5255 
5256     if (totalRow1)
5257     {
5258         valArray = malloc(sizeof(uint64_t) * totalRow1);
5259 
5260         for (i = 0; i < totalColumn; i++)
5261         {
5262             //@Bug 2205 Check if all rows go to the new extent
5263             //Write the first batch
5264             RID* firstPart = rowIdArray;
5265             ColumnOp* colOp = m_colOp[op(colStructList[i].fCompressionType)];
5266 
5267             // set params
5268             colOp->initColumn(curCol);
5269             // need to pass real dbRoot, partition, and segment to setColParam
5270             colOp->setColParam(curCol, 0, colStructList[i].colWidth,
5271                                colStructList[i].colDataType, colStructList[i].colType, colStructList[i].dataOid,
5272                                colStructList[i].fCompressionType, colStructList[i].fColDbRoot,
5273                                colStructList[i].fColPartition, colStructList[i].fColSegment);
5274 
5275             ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(colStructList[i].dataOid);
5276             ColExtsInfo::iterator it = aColExtsInfo.begin();
5277 
5278             while (it != aColExtsInfo.end())
5279             {
5280                 if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment))
5281                     break;
5282 
5283                 it++;
5284             }
5285 
5286             if (it == aColExtsInfo.end()) //add this one to the list
5287             {
5288                 ColExtInfo aExt;
5289                 aExt.dbRoot = colStructList[i].fColDbRoot;
5290                 aExt.partNum = colStructList[i].fColPartition;
5291                 aExt.segNum = colStructList[i].fColSegment;
5292                 aExt.compType = colStructList[i].fCompressionType;
5293                 aColExtsInfo.push_back(aExt);
5294                 aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
5295             }
5296 
5297             rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file
5298 
5299             if (rc != NO_ERROR)
5300                 break;
5301 
5302             // handling versioning
5303             vector<LBIDRange>   rangeList;
5304 
5305             if (versioning)
5306             {
5307                 rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i],
5308                                           colStructList[i].colWidth, totalRow1, firstPart, rangeList);
5309 
5310                 if (rc != NO_ERROR)
5311                 {
5312                     if (colStructList[i].fCompressionType == 0)
5313                     {
5314                         curCol.dataFile.pFile->flush();
5315                     }
5316 
5317                     BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
5318                     break;
5319                 }
5320             }
5321 
5322             //totalRow1 -= totalRow2;
5323             // have to init the size here
5324             // nullArray = (bool*) malloc(sizeof(bool) * totalRow);
5325             uint8_t tmp8;
5326             uint16_t tmp16;
5327             uint32_t tmp32;
5328 
5329             for (size_t j = 0; j < totalRow1; j++)
5330             {
5331                 uint64_t curValue = colValueList[((totalRow1 + totalRow2) * i) + j];
5332 
5333                 switch (colStructList[i].colType)
5334                 {
5335                     case WriteEngine::WR_VARBINARY : // treat same as char for now
5336                     case WriteEngine::WR_CHAR:
5337                     case WriteEngine::WR_BLOB:
5338                     case WriteEngine::WR_TEXT:
5339                         ((uint64_t*)valArray)[j] = curValue;
5340                         break;
5341 
5342                     case WriteEngine::WR_INT:
5343                     case WriteEngine::WR_UINT:
5344                     case WriteEngine::WR_MEDINT:
5345                     case WriteEngine::WR_UMEDINT:
5346                     case WriteEngine::WR_FLOAT:
5347                         tmp32 = curValue;
5348                         ((uint32_t*)valArray)[j] = tmp32;
5349                         break;
5350 
5351                     case WriteEngine::WR_ULONGLONG:
5352                     case WriteEngine::WR_LONGLONG:
5353                     case WriteEngine::WR_DOUBLE:
5354                     case WriteEngine::WR_TOKEN:
5355                         ((uint64_t*)valArray)[j] = curValue;
5356                         break;
5357 
5358                     case WriteEngine::WR_BYTE:
5359                     case WriteEngine::WR_UBYTE:
5360                         tmp8 = curValue;
5361                         ((uint8_t*)valArray)[j] = tmp8;
5362                         break;
5363 
5364                     case WriteEngine::WR_SHORT:
5365                     case WriteEngine::WR_USHORT:
5366                         tmp16 = curValue;
5367                         ((uint16_t*)valArray)[j] = tmp16;
5368                         break;
5369                 }
5370             }
5371 
5372 
5373 #ifdef PROFILE
5374             timer.start("writeRow ");
5375 #endif
5376             rc = colOp->writeRow(curCol, totalRow1, firstPart, valArray);
5377 #ifdef PROFILE
5378             timer.stop("writeRow ");
5379 #endif
5380             colOp->closeColumnFile(curCol);
5381 
5382             if (versioning)
5383                 BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
5384 
5385             // check error
5386             if (rc != NO_ERROR)
5387                 break;
5388 
5389         } // end of for (i = 0
5390 
5391         if (valArray != NULL)
5392         {
5393             free(valArray);
5394             valArray = NULL;
5395         }
5396     }
5397 
5398     // MCOL-1176 - Write second extent
5399     if (totalRow2)
5400     {
5401         valArray = malloc(sizeof(uint64_t) * totalRow2);
5402 
5403         for (i = 0; i < newColStructList.size(); i++)
5404         {
5405             //@Bug 2205 Check if all rows go to the new extent
5406             //Write the first batch
5407             RID* secondPart = rowIdArray + totalRow1;
5408             ColumnOp* colOp = m_colOp[op(newColStructList[i].fCompressionType)];
5409 
5410             // set params
5411             colOp->initColumn(curCol);
5412             // need to pass real dbRoot, partition, and segment to setColParam
5413             colOp->setColParam(curCol, 0, newColStructList[i].colWidth,
5414                                newColStructList[i].colDataType, newColStructList[i].colType, newColStructList[i].dataOid,
5415                                newColStructList[i].fCompressionType, newColStructList[i].fColDbRoot,
5416                                newColStructList[i].fColPartition, newColStructList[i].fColSegment);
5417 
5418             ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(newColStructList[i].dataOid);
5419             ColExtsInfo::iterator it = aColExtsInfo.begin();
5420 
5421             while (it != aColExtsInfo.end())
5422             {
5423                 if ((it->dbRoot == newColStructList[i].fColDbRoot) && (it->partNum == newColStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment))
5424                     break;
5425 
5426                 it++;
5427             }
5428 
5429             if (it == aColExtsInfo.end()) //add this one to the list
5430             {
5431                 ColExtInfo aExt;
5432                 aExt.dbRoot = newColStructList[i].fColDbRoot;
5433                 aExt.partNum = newColStructList[i].fColPartition;
5434                 aExt.segNum = newColStructList[i].fColSegment;
5435                 aExt.compType = newColStructList[i].fCompressionType;
5436                 aColExtsInfo.push_back(aExt);
5437                 aTbaleMetaData->setColExtsInfo(newColStructList[i].dataOid, aColExtsInfo);
5438             }
5439 
5440             rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file
5441 
5442             if (rc != NO_ERROR)
5443                 break;
5444 
5445             // handling versioning
5446             vector<LBIDRange>   rangeList;
5447 
5448             if (versioning)
5449             {
5450                 rc = processVersionBuffer(curCol.dataFile.pFile, txnid, newColStructList[i],
5451                                           newColStructList[i].colWidth, totalRow2, secondPart, rangeList);
5452 
5453                 if (rc != NO_ERROR)
5454                 {
5455                     if (newColStructList[i].fCompressionType == 0)
5456                     {
5457                         curCol.dataFile.pFile->flush();
5458                     }
5459 
5460                     BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
5461                     break;
5462                 }
5463             }
5464 
5465             //totalRow1 -= totalRow2;
5466             // have to init the size here
5467             // nullArray = (bool*) malloc(sizeof(bool) * totalRow);
5468             uint8_t tmp8;
5469             uint16_t tmp16;
5470             uint32_t tmp32;
5471 
5472             for (size_t j = 0; j < totalRow2; j++)
5473             {
5474                 uint64_t curValue = newColValueList[(totalRow2 * i) + j];
5475 
5476                 switch (newColStructList[i].colType)
5477                 {
5478                     case WriteEngine::WR_VARBINARY : // treat same as char for now
5479                     case WriteEngine::WR_CHAR:
5480                     case WriteEngine::WR_BLOB:
5481                     case WriteEngine::WR_TEXT:
5482                         ((uint64_t*)valArray)[j] = curValue;
5483                         break;
5484 
5485                     case WriteEngine::WR_INT:
5486                     case WriteEngine::WR_UINT:
5487                     case WriteEngine::WR_MEDINT:
5488                     case WriteEngine::WR_UMEDINT:
5489                     case WriteEngine::WR_FLOAT:
5490                         tmp32 = curValue;
5491                         ((uint32_t*)valArray)[j] = tmp32;
5492                         break;
5493 
5494                     case WriteEngine::WR_ULONGLONG:
5495                     case WriteEngine::WR_LONGLONG:
5496                     case WriteEngine::WR_DOUBLE:
5497                     case WriteEngine::WR_TOKEN:
5498                         ((uint64_t*)valArray)[j] = curValue;
5499                         break;
5500 
5501                     case WriteEngine::WR_BYTE:
5502                     case WriteEngine::WR_UBYTE:
5503                         tmp8 = curValue;
5504                         ((uint8_t*)valArray)[j] = tmp8;
5505                         break;
5506 
5507                     case WriteEngine::WR_SHORT:
5508                     case WriteEngine::WR_USHORT:
5509                         tmp16 = curValue;
5510                         ((uint16_t*)valArray)[j] = tmp16;
5511                         break;
5512                 }
5513             }
5514 
5515 
5516 #ifdef PROFILE
5517             timer.start("writeRow ");
5518 #endif
5519             rc = colOp->writeRow(curCol, totalRow2, secondPart, valArray);
5520 #ifdef PROFILE
5521             timer.stop("writeRow ");
5522 #endif
5523             colOp->closeColumnFile(curCol);
5524 
5525             if (versioning)
5526                 BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
5527 
5528             // check error
5529             if (rc != NO_ERROR)
5530                 break;
5531 
5532         } // end of for (i = 0
5533     }
5534 
5535     if (valArray != NULL)
5536         free(valArray);
5537 
5538 
5539 #ifdef PROFILE
5540     timer.finish();
5541 #endif
5542     return rc;
5543 }
5544 
5545 
writeColumnRec(const TxnID & txnid,const ColStructList & colStructList,const ColValueList & colValueList,vector<void * > & colOldValueList,const RIDList & ridList,const int32_t tableOid,bool convertStructFlag,ColTupleList::size_type nRows)5546 int WriteEngineWrapper::writeColumnRec(const TxnID& txnid,
5547                                        const ColStructList& colStructList,
5548                                        const ColValueList& colValueList,
5549                                        vector<void*>& colOldValueList,
5550                                        const RIDList& ridList,
5551                                        const int32_t tableOid,
5552                                        bool convertStructFlag,
5553                                        ColTupleList::size_type nRows)
5554 {
5555     bool           bExcp;
5556     int            rc = 0;
5557     void*          valArray = NULL;
5558     Column         curCol;
5559     ColStruct      curColStruct;
5560     ColTupleList   curTupleList, oldTupleList;
5561     ColStructList::size_type  totalColumn;
5562     ColStructList::size_type  i;
5563     ColTupleList::size_type   totalRow;
5564 
5565     setTransId(txnid);
5566     colOldValueList.clear();
5567     totalColumn = colStructList.size();
5568     totalRow = nRows;
5569 
5570 #ifdef PROFILE
5571     StopWatch timer;
5572 #endif
5573 
5574     vector<LBIDRange>   rangeListTot;
5575     std::vector<VBRange> freeList;
5576     vector<vector<uint32_t> > fboLists;
5577     vector<vector<LBIDRange> > rangeLists;
5578     rc = processBeginVBCopy(txnid, colStructList, ridList, freeList, fboLists, rangeLists, rangeListTot);
5579 
5580     if (rc != NO_ERROR)
5581     {
5582         if (rangeListTot.size() > 0)
5583             BRMWrapper::getInstance()->writeVBEnd(txnid, rangeListTot);
5584 
5585         switch (rc)
5586         {
5587             case BRM::ERR_DEADLOCK:
5588                 return ERR_BRM_DEAD_LOCK;
5589 
5590             case BRM::ERR_VBBM_OVERFLOW:
5591                 return ERR_BRM_VB_OVERFLOW;
5592 
5593             case BRM::ERR_NETWORK:
5594                 return ERR_BRM_NETWORK;
5595 
5596             case BRM::ERR_READONLY:
5597                 return ERR_BRM_READONLY;
5598 
5599             default:
5600                 return ERR_BRM_BEGIN_COPY;
5601         }
5602     }
5603 
5604     VBRange aRange;
5605     uint32_t blocksProcessedThisOid = 0;
5606     uint32_t blocksProcessed = 0;
5607     std::vector<BRM::FileInfo> files;
5608     TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);
5609 
5610     for (i = 0; i < totalColumn; i++)
5611     {
5612         valArray = NULL;
5613         curColStruct = colStructList[i];
5614         curTupleList = colValueList[i]; //same value for all rows
5615         ColumnOp* colOp = m_colOp[op(curColStruct.fCompressionType)];
5616 
5617         // convert column data type
5618         if (convertStructFlag)
5619             Convertor::convertColType(&curColStruct);
5620 
5621         // set params
5622         colOp->initColumn(curCol);
5623         colOp->setColParam(curCol, 0, curColStruct.colWidth,
5624                            curColStruct.colDataType, curColStruct.colType, curColStruct.dataOid,
5625                            curColStruct.fCompressionType, curColStruct.fColDbRoot,
5626                            curColStruct.fColPartition, curColStruct.fColSegment);
5627 
5628 
5629         ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(curColStruct.dataOid);
5630         ColExtsInfo::iterator it = aColExtsInfo.begin();
5631 
5632         while (it != aColExtsInfo.end())
5633         {
5634             if ((it->dbRoot == curColStruct.fColDbRoot) && (it->partNum == curColStruct.fColPartition) && (it->segNum == curColStruct.fColSegment))
5635                 break;
5636 
5637             it++;
5638         }
5639 
5640         if (it == aColExtsInfo.end()) //add this one to the list
5641         {
5642             ColExtInfo aExt;
5643             aExt.dbRoot = curColStruct.fColDbRoot;
5644             aExt.partNum = curColStruct.fColPartition;
5645             aExt.segNum = curColStruct.fColSegment;
5646             aExt.compType = curColStruct.fCompressionType;
5647             aColExtsInfo.push_back(aExt);
5648             aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
5649         }
5650 
5651         string segFile;
5652         rc = colOp->openColumnFile(curCol, segFile, true, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file
5653 
5654         if (rc != NO_ERROR)
5655             break;
5656 
5657         if (curColStruct.fCompressionType == 0)
5658         {
5659             BRM::FileInfo aFile;
5660             aFile.oid = curColStruct.dataOid;
5661             aFile.partitionNum = curColStruct.fColPartition;
5662             aFile.dbRoot = curColStruct.fColDbRoot;;
5663             aFile.segmentNum = curColStruct.fColSegment;
5664             aFile.compType = curColStruct.fCompressionType;
5665             files.push_back(aFile);
5666         }
5667 
5668         // handling versioning
5669         //cout << " pass to processVersionBuffer rid " << rowIdArray[0] << endl;
5670         //cout << "dataOid:fColPartition = " << curColStruct.dataOid << ":" << curColStruct.fColPartition << endl;
5671 //timer.start("processVersionBuffers");
5672         //vector<LBIDRange>   rangeList;
5673         // rc = processVersionBuffers(curCol.dataFile.pFile, txnid, curColStruct, curColStruct.colWidth, totalRow, ridList, rangeList);
5674         std::vector<VBRange> curFreeList;
5675         uint32_t blockUsed = 0;
5676 
5677         if (!idbdatafile::IDBPolicy::useHdfs())
5678         {
5679             if (rangeListTot.size() > 0)
5680             {
5681                 if (freeList[0].size >= (blocksProcessed + rangeLists[i].size()))
5682                 {
5683                     aRange.vbOID = freeList[0].vbOID;
5684                     aRange.vbFBO = freeList[0].vbFBO + blocksProcessed;
5685                     aRange.size = rangeLists[i].size();
5686                     curFreeList.push_back(aRange);
5687                     //cout << "range size = " << aRange.size <<" and blocksProcessed = " << blocksProcessed<< endl;
5688                 }
5689                 else
5690                 {
5691                     aRange.vbOID = freeList[0].vbOID;
5692                     aRange.vbFBO = freeList[0].vbFBO + blocksProcessed;
5693                     aRange.size = freeList[0].size - blocksProcessed;
5694                     blockUsed = aRange.size;
5695                     curFreeList.push_back(aRange);
5696 
5697                     if (freeList.size() > 1)
5698                     {
5699                         aRange.vbOID = freeList[1].vbOID;
5700                         aRange.vbFBO = freeList[1].vbFBO + blocksProcessedThisOid;
5701                         aRange.size = rangeLists[i].size() - blockUsed;
5702                         curFreeList.push_back(aRange);
5703                         blocksProcessedThisOid += aRange.size;
5704                     }
5705                     else
5706                     {
5707                         rc = 1;
5708                         break;
5709                     }
5710 
5711                     //cout << "curFreeList size = " << curFreeList.size() << endl;
5712 
5713                 }
5714 
5715                 blocksProcessed += rangeLists[i].size();
5716 
5717                 //timer.start("Delete:writeVB");
5718                 rc = BRMWrapper::getInstance()->
5719                      writeVB(curCol.dataFile.pFile, (BRM::VER_t)txnid,
5720                              curColStruct.dataOid, fboLists[i], rangeLists[i],
5721                              colOp, curFreeList, curColStruct.fColDbRoot, true);
5722             }
5723         }
5724 
5725         //timer.stop("Delete:writeVB");
5726 //timer.stop("processVersionBuffers");
5727         // cout << " rc for processVersionBuffer is " << rc << endl;
5728         if (rc != NO_ERROR)
5729         {
5730             if (curColStruct.fCompressionType == 0)
5731             {
5732                 curCol.dataFile.pFile->flush();
5733             }
5734 
5735             if (rangeListTot.size() > 0)
5736                 BRMWrapper::getInstance()->writeVBEnd(txnid, rangeListTot);
5737 
5738             break;
5739         }
5740 
5741         switch (curColStruct.colType)
5742         {
5743             case WriteEngine::WR_INT:
5744             case WriteEngine::WR_MEDINT:
5745                 valArray = (int*) calloc(sizeof(int), 1);
5746                 break;
5747 
5748             case WriteEngine::WR_UINT:
5749             case WriteEngine::WR_UMEDINT:
5750                 valArray = (uint32_t*) calloc(sizeof(uint32_t), 1);
5751                 break;
5752 
5753             case WriteEngine::WR_VARBINARY : // treat same as char for now
5754             case WriteEngine::WR_CHAR:
5755             case WriteEngine::WR_BLOB:
5756             case WriteEngine::WR_TEXT:
5757                 valArray = (char*) calloc(sizeof(char), 1 * MAX_COLUMN_BOUNDARY);
5758                 break;
5759 
5760             case WriteEngine::WR_FLOAT:
5761                 valArray = (float*) calloc(sizeof(float), 1);
5762                 break;
5763 
5764             case WriteEngine::WR_DOUBLE:
5765                 valArray = (double*) calloc(sizeof(double), 1);
5766                 break;
5767 
5768             case WriteEngine::WR_BYTE:
5769                 valArray = (char*) calloc(sizeof(char), 1);
5770                 break;
5771 
5772             case WriteEngine::WR_UBYTE:
5773                 valArray = (uint8_t*) calloc(sizeof(uint8_t), 1);
5774                 break;
5775 
5776             case WriteEngine::WR_SHORT:
5777                 valArray = (short*) calloc(sizeof(short), 1);
5778                 break;
5779 
5780             case WriteEngine::WR_USHORT:
5781                 valArray = (uint16_t*) calloc(sizeof(uint16_t), 1);
5782                 break;
5783 
5784             case WriteEngine::WR_LONGLONG:
5785                 valArray = (long long*) calloc(sizeof(long long), 1);
5786                 break;
5787 
5788             case WriteEngine::WR_ULONGLONG:
5789                 valArray = (uint64_t*) calloc(sizeof(uint64_t), 1);
5790                 break;
5791 
5792             case WriteEngine::WR_TOKEN:
5793                 valArray = (Token*) calloc(sizeof(Token), 1);
5794                 break;
5795         }
5796 
5797         // convert values to valArray
5798         if (m_opType != DELETE)
5799         {
5800             bExcp = false;
5801             ColTuple    curTuple;
5802             curTuple = curTupleList[0];
5803 
5804             try
5805             {
5806                 convertValue(curColStruct.colType, valArray, curTuple.data);
5807             }
5808             catch (...)
5809             {
5810                 bExcp = true;
5811             }
5812 
5813             if (bExcp)
5814             {
5815                 if (rangeListTot.size() > 0)
5816                     BRMWrapper::getInstance()->writeVBEnd(txnid, rangeListTot);
5817 
5818                 return ERR_PARSING;
5819             }
5820 
5821 #ifdef PROFILE
5822             timer.start("writeRow ");
5823 #endif
5824             rc = colOp->writeRows(curCol, totalRow, ridList, valArray);
5825 #ifdef PROFILE
5826             timer.stop("writeRow ");
5827 #endif
5828         }
5829         else
5830         {
5831 #ifdef PROFILE
5832             timer.start("writeRows ");
5833 #endif
5834             rc = colOp->writeRows(curCol, totalRow, ridList, valArray, 0, true);
5835 #ifdef PROFILE
5836             timer.stop("writeRows ");
5837 #endif
5838         }
5839 
5840 //     colOldValueList.push_back(oldValArray);
5841 //timer.start("Delete:closefile");
5842         colOp->clearColumn(curCol);
5843 
5844 //timer.stop("Delete:closefile");
5845         if (valArray != NULL)
5846             free(valArray);
5847 
5848         // check error
5849         if (rc != NO_ERROR)
5850             break;
5851 
5852     } // end of for (i = 0)
5853 
5854 // timer.start("Delete:purgePrimProcFdCache");
5855     if ((idbdatafile::IDBPolicy::useHdfs()) && (files.size() > 0))
5856         cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());
5857 
5858 //if (idbdatafile::IDBPolicy::useHdfs())
5859 //			cacheutils::dropPrimProcFdCache();
5860 //timer.stop("Delete:purgePrimProcFdCache");
5861     if (rangeListTot.size() > 0)
5862         BRMWrapper::getInstance()->writeVBEnd(txnid, rangeListTot);
5863 
5864 //timer.stop("Delete:writecolrec");
5865 //#ifdef PROFILE
5866 //timer.finish();
5867 //#endif
5868     return rc;
5869 }
5870 
5871 /*@brief tokenize - return a token for a given signature and size
5872 */
5873 /***********************************************************
5874  * DESCRIPTION:
5875  *  return a token for a given signature and size
5876  *  If it is not in the dictionary, the signature
5877  *  will be added to the dictionary and the index tree
5878  *  If it is already in the dictionary, then
5879  *  the token will be returned
5880  *  This function does not open and close files.
 *  Callers must open and close the dictionary file themselves
 *  (openDctnry / closeDctnry).
5882  * PARAMETERS:
5883  *  DctnryTuple& dctnryTuple - holds the sigValue, sigSize and token
5884  * RETURN:
5885  *    NO_ERROR if success
5886  *    others if something wrong in inserting the value
5887  ***********************************************************/
tokenize(const TxnID & txnid,DctnryTuple & dctnryTuple,int ct)5888 int WriteEngineWrapper::tokenize(const TxnID& txnid, DctnryTuple& dctnryTuple, int ct)
5889 {
5890     int cop = op(ct);
5891     m_dctnry[cop]->setTransId(txnid);
5892     //cout << "Tokenizing dctnryTuple.sigValue " << dctnryTuple.sigValue << endl;
5893     return m_dctnry[cop]->updateDctnry(dctnryTuple.sigValue, dctnryTuple.sigSize, dctnryTuple.token);
5894 }
5895 
5896 /*@brief tokenize - return a token for a given signature and size
5897  *                          accept OIDs as input
5898 */
5899 /***********************************************************
 * DESCRIPTION:
 *  Return the token for a given signature and size.
 *  If the signature is not yet in the dictionary, it is added
 *  to the dictionary and the index tree; if it is already
 *  present, its existing token is returned.
 *  Unlike tokenize(txnid, dctnryTuple, ct), this overload opens
 *  the dictionary file itself and closes it again, even when
 *  inserting the value fails.
 * PARAMETERS:
 *  DctnryStruct& dctnryStruct - dictionary OID, DBRoot, partition,
 *                               segment and compression type of the
 *                               dictionary file to use
 *  DctnryTuple& dctnryTuple - holds the sigValue, sigSize and token
 *  bool useTmpSuffix - passed to openDctnry (@bug 5572 HDFS tmp file)
 * RETURN:
 *    NO_ERROR if success
 *    otherwise the error from opening the file, inserting the value,
 *    or closing the file
5913  ***********************************************************/
tokenize(const TxnID & txnid,DctnryStruct & dctnryStruct,DctnryTuple & dctnryTuple,bool useTmpSuffix)5914 int WriteEngineWrapper::tokenize(const TxnID& txnid,
5915                                  DctnryStruct& dctnryStruct,
5916                                  DctnryTuple& dctnryTuple,
5917                                  bool useTmpSuffix) // @bug 5572 HDFS tmp file
5918 {
5919     //find the corresponding column segment file the token is going to be inserted.
5920 
5921     Dctnry* dctnry = m_dctnry[op(dctnryStruct.fCompressionType)];
5922     int rc = dctnry->openDctnry(dctnryStruct.dctnryOid,
5923                                 dctnryStruct.fColDbRoot, dctnryStruct.fColPartition,
5924                                 dctnryStruct.fColSegment,
5925                                 useTmpSuffix); // @bug 5572 TBD
5926 
5927     if (rc != NO_ERROR)
5928         return rc;
5929 
5930     rc = tokenize(txnid, dctnryTuple, dctnryStruct.fCompressionType);
5931 
5932     int rc2 = dctnry->closeDctnry(true); // close file, even if tokenize() fails
5933 
5934     if ((rc == NO_ERROR) && (rc2 != NO_ERROR))
5935         rc = rc2;
5936 
5937     return rc;
5938 }
5939 
5940 /***********************************************************
 * DESCRIPTION:
 *    Create a dictionary store file, using the Dctnry instance
 *    that matches the given compression type
 * PARAMETERS:
 *    txnid - transaction id (not used by this function)
 *    dctnryOid - dictionary store file OID
 *    colWidth - column width
 *    dbRoot   - DBRoot where file is to be located
 *    partiotion - Starting partition number for segment file path
 *    segment - segment number
 *    compressionType - compression type
 * RETURN:
 *    NO_ERROR if success
 *    otherwise the error returned by Dctnry::createDctnry, e.g.
 *    ERR_FILE_EXIST if file exists, or
 *    ERR_FILE_CREATE if something went wrong creating the file
5955  ***********************************************************/
createDctnry(const TxnID & txnid,const OID & dctnryOid,int colWidth,uint16_t dbRoot,uint32_t partiotion,uint16_t segment,int compressionType)5956 int WriteEngineWrapper::createDctnry(const TxnID& txnid,
5957                                      const OID& dctnryOid,
5958                                      int colWidth,
5959                                      uint16_t dbRoot,
5960                                      uint32_t partiotion,
5961                                      uint16_t segment,
5962                                      int compressionType)
5963 {
5964     BRM::LBID_t startLbid;
5965     return m_dctnry[op(compressionType)]->
5966            createDctnry( dctnryOid, colWidth, dbRoot, partiotion, segment, startLbid);
5967 }
5968 
convertRidToColumn(RID & rid,uint16_t & dbRoot,uint32_t & partition,uint16_t & segment,RID filesPerColumnPartition,RID extentsPerSegmentFile,RID extentRows,uint16_t startDBRoot,unsigned dbrootCnt)5969 int WriteEngineWrapper::convertRidToColumn (RID& rid, uint16_t& dbRoot, uint32_t& partition,
5970         uint16_t& segment, RID filesPerColumnPartition,
5971         RID  extentsPerSegmentFile, RID extentRows,
5972         uint16_t startDBRoot, unsigned dbrootCnt)
5973 {
5974     int rc = 0;
5975     partition = rid / (filesPerColumnPartition * extentsPerSegmentFile * extentRows);
5976 
5977     segment = (((rid % (filesPerColumnPartition * extentsPerSegmentFile * extentRows)) / extentRows)) % filesPerColumnPartition;
5978 
5979     dbRoot = ((startDBRoot - 1 + segment) % dbrootCnt) + 1;
5980 
5981     //Calculate the relative rid for this segment file
5982     RID relRidInPartition = rid - ((RID)partition * (RID)filesPerColumnPartition * (RID)extentsPerSegmentFile * (RID)extentRows);
5983     assert (relRidInPartition <= (RID)filesPerColumnPartition * (RID)extentsPerSegmentFile * (RID)extentRows);
5984     uint32_t numExtentsInThisPart = relRidInPartition / extentRows;
5985     unsigned numExtentsInThisSegPart = numExtentsInThisPart / filesPerColumnPartition;
5986     RID relRidInThisExtent = relRidInPartition - numExtentsInThisPart * extentRows;
5987     rid = relRidInThisExtent +  numExtentsInThisSegPart * extentRows;
5988     return rc;
5989 }
5990 
5991 /***********************************************************
5992  * DESCRIPTION:
5993  *    Clears table lock for the specified table lock ID.
5994  * PARAMETERS:
5995  *    lockID - table lock to be released
5996  *    errMsg - if error occurs, this is the return error message
5997  * RETURN:
5998  *    NO_ERROR if operation is successful
5999  ***********************************************************/
clearTableLockOnly(uint64_t lockID,std::string & errMsg)6000 int WriteEngineWrapper::clearTableLockOnly(
6001     uint64_t     lockID,
6002     std::string& errMsg)
6003 {
6004     bool bReleased;
6005 
6006     int rc = BRMWrapper::getInstance()->releaseTableLock( lockID,
6007              bReleased, errMsg);
6008 
6009     return rc;
6010 }
6011 
6012 /***********************************************************
6013  * DESCRIPTION:
6014  *    Rolls back the state of the extentmap and database files for the
6015  *    specified table OID, using the metadata previously saved to disk.
6016  *    Also clears the table lock for the specified table OID.
6017  * PARAMETERS:
6018  *    tableOid - table OID to be rolled back
6019  *    lockID   - table lock corresponding to tableOid
6020  *    tableName - table name associated with tableOid
6021  *    applName - application that is driving this bulk rollback
6022  *    debugConsole - enable debug logging to the console
6023  *    errorMsg - error message explaining any rollback failure
6024  * RETURN:
 *    NO_ERROR if rollback completed successfully
6026  ***********************************************************/
bulkRollback(OID tableOid,uint64_t lockID,const std::string & tableName,const std::string & applName,bool debugConsole,string & errorMsg)6027 int WriteEngineWrapper::bulkRollback(OID   tableOid,
6028                                      uint64_t lockID,
6029                                      const std::string& tableName,
6030                                      const std::string& applName,
6031                                      bool debugConsole, string& errorMsg)
6032 {
6033     errorMsg.clear();
6034 
6035     BulkRollbackMgr rollbackMgr(tableOid, lockID, tableName, applName);
6036 
6037     if (debugConsole)
6038         rollbackMgr.setDebugConsole(true);
6039 
6040     // We used to pass "false" to not keep (delete) the metafiles at the end of
6041     // the rollback.  But after the transition to sharedNothing, we pass "true"
6042     // to initially keep these files.  The metafiles are deleted later, only
6043     // after all the distributed bulk rollbacks are successfully completed.
6044     int rc = rollbackMgr.rollback( true );
6045 
6046     if (rc != NO_ERROR)
6047         errorMsg = rollbackMgr.getErrorMsg();
6048 
6049     // Ignore the return code for now; more important to base rc on the
6050     // success or failure of the previous work
6051     BRMWrapper::getInstance()->takeSnapshot();
6052 
6053     return rc;
6054 }
6055 
rollbackCommon(const TxnID & txnid,int sessionId)6056 int WriteEngineWrapper::rollbackCommon(const TxnID& txnid, int sessionId)
6057 {
6058     //Remove the unwanted tmp files and recover compressed chunks.
6059     string prefix;
6060 
6061     // BUG 4312
6062     RemoveTxnFromLBIDMap(txnid);
6063     RemoveTxnFromDictMap(txnid);
6064 
6065     config::Config* config = config::Config::makeConfig();
6066     prefix = config->getConfig("SystemConfig", "DBRMRoot");
6067 
6068     if (prefix.length() == 0)
6069     {
6070         cerr << "Need a valid DBRMRoot entry in Calpont configuation file";
6071         return -1;
6072     }
6073 
6074     uint64_t pos =  prefix.find_last_of ("/") ;
6075     std::string aDMLLogFileName;
6076 
6077     if (pos != string::npos)
6078     {
6079         aDMLLogFileName = prefix.substr(0, pos + 1); //Get the file path
6080     }
6081     else
6082     {
6083         logging::Message::Args args;
6084         args.add("RollbackTran cannot find the dbrm directory for the DML log file");
6085         SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_CRITICAL, logging::M0007);
6086         return -1;
6087 
6088     }
6089 
6090     std::ostringstream oss;
6091     oss << txnid << "_" << Config::getLocalModuleID();
6092     aDMLLogFileName += "DMLLog_" + oss.str();
6093 
6094     if (IDBPolicy::exists(aDMLLogFileName.c_str()))
6095     {
6096         // TODO-for now the DML log file will always be in a local
6097         // filesystem since IDBDataFile doesn't have any support for
6098         // a cpp iostream interface.  need to decide if this is ok.
6099         boost::scoped_ptr<IDBDataFile> aDMLLogFile(IDBDataFile::open(
6100                     IDBPolicy::getType(aDMLLogFileName.c_str(),
6101                                        IDBPolicy::WRITEENG),
6102                     aDMLLogFileName.c_str(), "r", 0));
6103 
6104         if (aDMLLogFile) //need recover
6105         {
6106             ssize_t fileSize = aDMLLogFile->size();
6107             boost::scoped_array<char> buf(new char[fileSize]);
6108 
6109             if (aDMLLogFile->read(buf.get(), fileSize) != fileSize)
6110                 return ERR_FILE_READ;
6111 
6112             std::istringstream strstream(string(buf.get(), fileSize));
6113             std::string backUpFileType;
6114             std::string filename;
6115             int64_t size;
6116             int64_t offset;
6117 
6118             while (strstream >> backUpFileType >> filename >> size >> offset)
6119             {
6120                 //cout << "Found: " <<  backUpFileType << " name " << filename << "size: " << size << " offset: " << offset << endl;
6121                 std::ostringstream oss;
6122                 oss << "RollbackTran found " <<  backUpFileType << " name " << filename << " size: " << size << " offset: " << offset;
6123                 logging::Message::Args args;
6124                 args.add(oss.str());
6125                 SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_INFO, logging::M0007);
6126 
6127                 if (backUpFileType.compare("rlc") == 0)
6128                 {
6129                     //remove the rlc file
6130                     filename += ".rlc";
6131                     //cout << " File removed: " << filename << endl;
6132                     IDBPolicy::remove(filename.c_str());
6133                     logging::Message::Args args1;
6134                     args1.add(filename);
6135                     args1.add(" is removed.");
6136                     SimpleSysLog::instance()->logMsg(args1, logging::LOG_TYPE_INFO, logging::M0007);
6137                 }
6138                 else if (backUpFileType.compare("tmp") == 0)
6139                 {
6140                     int rc = NO_ERROR;
6141                     string orig(filename + ".orig");
6142 
6143                     // restore the orig file
6144                     if (IDBPolicy::exists(orig.c_str()))
6145                     {
6146                         // not likely both cdf and tmp exist
6147                         if (IDBPolicy::exists(filename.c_str()) &&
6148                                 IDBPolicy::remove(filename.c_str()) != 0)
6149                             rc = ERR_COMP_REMOVE_FILE;
6150 
6151                         if (rc == NO_ERROR && IDBPolicy::rename(orig.c_str(), filename.c_str()) != 0)
6152                             rc = ERR_COMP_RENAME_FILE;
6153                     }
6154 
6155                     // remove the tmp file
6156                     string tmp(filename + ".tmp");
6157 
6158                     if (rc == NO_ERROR && IDBPolicy::exists(tmp.c_str()) &&
6159                             IDBPolicy::remove(tmp.c_str()) != 0)
6160                         rc = ERR_COMP_REMOVE_FILE;
6161 
6162                     // remove the chunk shifting helper
6163                     string rlc(filename + ".rlc");
6164 
6165                     if (rc == NO_ERROR && IDBPolicy::exists(rlc.c_str()) &&
6166                             IDBPolicy::remove(rlc.c_str()) != 0)
6167                         rc = ERR_COMP_REMOVE_FILE;
6168 
6169                     logging::Message::Args args1;
6170                     args1.add(filename);
6171 
6172                     if (rc == NO_ERROR)
6173                     {
6174                         args1.add(" is restored.");
6175                         SimpleSysLog::instance()->logMsg(args1,
6176                                                          logging::LOG_TYPE_INFO, logging::M0007);
6177                     }
6178                     else
6179                     {
6180                         args1.add(" may not restored: ");
6181                         args1.add(rc);
6182                         SimpleSysLog::instance()->logMsg(args1,
6183                                                          logging::LOG_TYPE_CRITICAL, logging::M0007);
6184 
6185                         return rc;
6186                     }
6187                 }
6188                 else
6189                 {
6190                     //copy back to the data file
6191                     std::string backFileName(filename);
6192 
6193                     if (backUpFileType.compare("chk") == 0 )
6194                         backFileName += ".chk";
6195                     else
6196                         backFileName += ".hdr";
6197 
6198                     //cout << "Rollback found file " << backFileName << endl;
6199                     IDBDataFile* sourceFile = IDBDataFile::open(
6200                                                   IDBPolicy::getType(backFileName.c_str(), IDBPolicy::WRITEENG),
6201                                                   backFileName.c_str(), "r", 0);
6202                     IDBDataFile* targetFile = IDBDataFile::open(
6203                                                   IDBPolicy::getType(filename.c_str(), IDBPolicy::WRITEENG),
6204                                                   filename.c_str(), "r+", 0);
6205 
6206                     size_t byteRead;
6207                     unsigned char* readBuf = new unsigned char[size];
6208                     boost::scoped_array<unsigned char> readBufPtr( readBuf );
6209 
6210                     if ( sourceFile != NULL )
6211                     {
6212                         int rc = sourceFile->seek( 0, 0 );
6213 
6214                         if (rc)
6215                             return ERR_FILE_SEEK;
6216 
6217                         byteRead = sourceFile->read( readBuf, size );
6218 
6219                         if ( (int) byteRead != size )
6220                         {
6221                             logging::Message::Args args6;
6222                             args6.add("Rollback cannot read backup file ");
6223                             args6.add(backFileName);
6224                             SimpleSysLog::instance()->logMsg(args6, logging::LOG_TYPE_ERROR, logging::M0007);
6225                             return ERR_FILE_READ;
6226                         }
6227                     }
6228                     else
6229                     {
6230                         logging::Message::Args args5;
6231                         args5.add("Rollback cannot open backup file ");
6232                         args5.add(backFileName);
6233                         SimpleSysLog::instance()->logMsg(args5, logging::LOG_TYPE_ERROR, logging::M0007);
6234                         return ERR_FILE_NULL;
6235                     }
6236 
6237                     size_t byteWrite;
6238 
6239                     if ( targetFile != NULL )
6240                     {
6241                         int rc = targetFile->seek( offset, 0 );
6242 
6243                         if (rc)
6244                             return ERR_FILE_SEEK;
6245 
6246                         byteWrite = targetFile->write( readBuf, size );
6247 
6248                         if ( (int) byteWrite != size )
6249                         {
6250                             logging::Message::Args args3;
6251                             args3.add("Rollback cannot copy to file ");
6252                             args3.add(filename);
6253                             args3.add( "from file ");
6254                             args3.add(backFileName);
6255                             SimpleSysLog::instance()->logMsg(args3, logging::LOG_TYPE_ERROR, logging::M0007);
6256 
6257                             return ERR_FILE_WRITE;
6258                         }
6259                     }
6260                     else
6261                     {
6262                         logging::Message::Args args4;
6263                         args4.add("Rollback cannot open target file ");
6264                         args4.add(filename);
6265                         SimpleSysLog::instance()->logMsg(args4, logging::LOG_TYPE_ERROR, logging::M0007);
6266                         return ERR_FILE_NULL;
6267                     }
6268 
6269                     //cout << "Rollback copied to file " << filename << " from file " << backFileName << endl;
6270 
6271                     delete targetFile;
6272                     delete sourceFile;
6273                     IDBPolicy::remove( backFileName.c_str() );
6274                     logging::Message::Args arg1;
6275                     arg1.add("Rollback copied to file ");
6276                     arg1.add(filename);
6277                     arg1.add( "from file ");
6278                     arg1.add(backFileName);
6279                     SimpleSysLog::instance()->logMsg(arg1, logging::LOG_TYPE_INFO, logging::M0007);
6280                 }
6281             }
6282         }
6283 
6284         IDBPolicy::remove(aDMLLogFileName.c_str());
6285     }
6286 
6287     return 0;
6288 
6289 }
6290 
rollbackTran(const TxnID & txnid,int sessionId)6291 int WriteEngineWrapper::rollbackTran(const TxnID& txnid, int sessionId)
6292 {
6293     if ( rollbackCommon( txnid, sessionId ) != 0 )
6294         return -1;
6295 
6296     return BRMWrapper::getInstance()->rollBack(txnid, sessionId);
6297 }
6298 
rollbackBlocks(const TxnID & txnid,int sessionId)6299 int WriteEngineWrapper::rollbackBlocks(const TxnID& txnid, int sessionId)
6300 {
6301     if ( rollbackCommon( txnid, sessionId ) != 0 )
6302         return -1;
6303 
6304     return BRMWrapper::getInstance()->rollBackBlocks(txnid, sessionId);
6305 }
6306 
rollbackVersion(const TxnID & txnid,int sessionId)6307 int WriteEngineWrapper::rollbackVersion(const TxnID& txnid, int sessionId)
6308 {
6309     // BUG 4312
6310     RemoveTxnFromLBIDMap(txnid);
6311     RemoveTxnFromDictMap(txnid);
6312 
6313     return BRMWrapper::getInstance()->rollBackVersion(txnid, sessionId);
6314 }
6315 
updateNextValue(const TxnID txnId,const OID & columnoid,const uint64_t nextVal,const uint32_t sessionID,const uint16_t dbRoot)6316 int WriteEngineWrapper::updateNextValue(const TxnID txnId, const OID& columnoid, const uint64_t nextVal, const uint32_t sessionID, const uint16_t dbRoot)
6317 {
6318     int rc = NO_ERROR;
6319     boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr;
6320     RIDList ridList;
6321     ColValueList colValueList;
6322     WriteEngine::ColTupleList colTuples;
6323     ColStructList colStructList;
6324     WriteEngine::ColStruct colStruct;
6325     colStruct.dataOid = OID_SYSCOLUMN_NEXTVALUE;
6326     colStruct.colWidth = 8;
6327     colStruct.tokenFlag = false;
6328     colStruct.colDataType =  CalpontSystemCatalog::UBIGINT;
6329     colStruct.fColDbRoot = dbRoot;
6330 
6331     if (idbdatafile::IDBPolicy::useHdfs())
6332         colStruct.fCompressionType = 2;
6333 
6334     colStructList.push_back(colStruct);
6335     ColTuple colTuple;
6336     systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
6337     systemCatalogPtr->identity(CalpontSystemCatalog::EC);
6338     CalpontSystemCatalog::ROPair ropair;
6339 
6340     try
6341     {
6342         ropair = systemCatalogPtr->nextAutoIncrRid(columnoid);
6343     }
6344     catch (...)
6345     {
6346         rc = ERR_AUTOINC_RID;
6347     }
6348 
6349     if (rc != NO_ERROR)
6350         return rc;
6351 
6352     ridList.push_back(ropair.rid);
6353     colTuple.data = nextVal;
6354     colTuples.push_back(colTuple);
6355     colValueList.push_back(colTuples);
6356     //TxnID txnid;
6357     rc = writeColumnRecords(txnId, colStructList, colValueList, ridList, SYSCOLUMN_BASE, false);
6358 
6359     if (rc != NO_ERROR)
6360         return rc;
6361 
6362     //flush PrimProc cache
6363     vector<LBID_t> blockList;
6364     BRM::LBIDRange_v lbidRanges;
6365     rc = BRMWrapper::getInstance()->lookupLbidRanges(OID_SYSCOLUMN_NEXTVALUE,
6366             lbidRanges);
6367 
6368     if (rc != NO_ERROR)
6369         return rc;
6370 
6371     LBIDRange_v::iterator it;
6372 
6373     for (it = lbidRanges.begin(); it != lbidRanges.end(); it++)
6374     {
6375         for (LBID_t  lbid = it->start; lbid < (it->start + it->size); lbid++)
6376         {
6377             blockList.push_back(lbid);
6378         }
6379     }
6380 
6381     //Bug 5459 Flush FD cache
6382     std::vector<BRM::FileInfo> files;
6383     BRM::FileInfo aFile;
6384     aFile.oid = colStruct.dataOid;
6385     aFile.partitionNum = colStruct.fColPartition;
6386     aFile.dbRoot = colStruct.fColDbRoot;;
6387     aFile.segmentNum = colStruct.fColSegment;
6388     aFile.compType = colStruct.fCompressionType;
6389     files.push_back(aFile);
6390 
6391     if (idbdatafile::IDBPolicy::useHdfs())
6392         cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());
6393 
6394     rc = cacheutils::flushPrimProcAllverBlocks (blockList);
6395 
6396     if (rc != 0)
6397         rc = ERR_BLKCACHE_FLUSH_LIST; // translate to WE error
6398 
6399     return rc;
6400 }
6401 
6402 /***********************************************************
6403  * DESCRIPTION:
6404  *    Flush compressed files in chunk manager
6405  * PARAMETERS:
6406  *    none
6407  * RETURN:
6408  *    none
6409  ***********************************************************/
flushDataFiles(int rc,const TxnID txnId,std::map<FID,FID> & columnOids)6410 int WriteEngineWrapper::flushDataFiles(int rc, const TxnID txnId, std::map<FID, FID>& columnOids)
6411 {
6412     RemoveTxnFromLBIDMap(txnId);
6413     RemoveTxnFromDictMap(txnId);
6414 
6415     for (int i = 0; i < TOTAL_COMPRESS_OP; i++)
6416     {
6417         int rc1 = m_colOp[i]->flushFile(rc, columnOids);
6418         int rc2 = m_dctnry[i]->flushFile(rc, columnOids);
6419 
6420         if (rc == NO_ERROR)
6421         {
6422             rc = (rc1 != NO_ERROR) ? rc1 : rc2;
6423         }
6424     }
6425 
6426     return rc;
6427 }
6428 
AddDictToList(const TxnID txnid,std::vector<BRM::LBID_t> & lbids)6429 void WriteEngineWrapper::AddDictToList(const TxnID txnid,
6430                                        std::vector<BRM::LBID_t>& lbids)
6431 {
6432     std::tr1::unordered_map<TxnID, dictLBIDRec_t>::iterator mapIter;
6433 
6434     mapIter = m_dictLBIDMap.find(txnid);
6435 
6436     if (mapIter == m_dictLBIDMap.end())
6437     {
6438         dictLBIDRec_t tempRecord;
6439         tempRecord.insert(lbids.begin(), lbids.end());
6440         m_dictLBIDMap[txnid] = tempRecord;
6441         return;
6442     }
6443     else
6444     {
6445         dictLBIDRec_t& txnRecord = mapIter->second;
6446         txnRecord.insert(lbids.begin(), lbids.end());
6447     }
6448 
6449 }
6450 
6451 /***********************************************************
6452  * DESCRIPTION:
6453  *    Add an lbid to a list of lbids for sending to markExtentsInvalid.
6454  *    However, rather than storing each lbid, store only unique first
6455  *    lbids. This is an optimization to prevent invalidating the same
6456  *    extents over and over.
6457  * PARAMETERS:
6458  *    txnid - the lbid list is per txn. We use this to keep transactions
6459  *            seperated.
6460  *    lbids - the current list of lbids. We add to this list
6461  *            if the discovered lbid is in a new extent.
6462  *   These next are needed for dbrm to get the lbid
6463  *    oid       -the table oid.
6464  *    colPartition - the table column partition
6465  *    segment   - table segment
6466  *    fbo       - file block offset
6467  * RETURN: 0 => OK. -1 => error
6468  ***********************************************************/
AddLBIDtoList(const TxnID txnid,std::vector<BRM::LBID_t> & lbids,std::vector<CalpontSystemCatalog::ColDataType> & colDataTypes,const ColStruct & colStruct,const int fbo)6469 int WriteEngineWrapper::AddLBIDtoList(const TxnID     txnid,
6470                                       std::vector<BRM::LBID_t>& lbids,
6471                                       std::vector<CalpontSystemCatalog::ColDataType>& colDataTypes,
6472                                       const ColStruct& colStruct,
6473                                       const int       fbo)
6474 {
6475     int rtn = 0;
6476 
6477     BRM::LBID_t     startingLBID;
6478     SP_TxnLBIDRec_t spTxnLBIDRec;
6479     std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t>::iterator mapIter;
6480 
6481     // Find the set of extent starting LBIDs for this transaction. If not found, then create it.
6482     mapIter = m_txnLBIDMap.find(txnid);
6483 
6484     if (mapIter == m_txnLBIDMap.end())
6485     {
6486         // This is a new transaction.
6487         SP_TxnLBIDRec_t  sptemp(new TxnLBIDRec);
6488         spTxnLBIDRec = sptemp;
6489         m_txnLBIDMap[txnid] = spTxnLBIDRec;
6490 //        cout << "New transaction entry " << txnid << " transaction count " << m_txnLBIDMap.size() << endl;
6491     }
6492     else
6493     {
6494         spTxnLBIDRec = (*mapIter).second;
6495     }
6496 
6497     // Get the extent starting lbid given all these values (startingLBID is an out parameter).
6498     rtn = BRMWrapper::getInstance()->getStartLbid(colStruct.dataOid, colStruct.fColPartition,
6499             colStruct.fColSegment, fbo, startingLBID);
6500 
6501     if (rtn != 0)
6502         return -1;
6503 
6504     if (spTxnLBIDRec->m_LBIDMap.find(startingLBID) == spTxnLBIDRec->m_LBIDMap.end())
6505     {
6506         // Not found in the map. This must be a new extent. Add it to the list.
6507 //        cout << "Adding lbid " << startingLBID << " to txn " << txnid << endl;
6508         spTxnLBIDRec->AddLBID(startingLBID);
6509         lbids.push_back((BRM::LBID_t)startingLBID);
6510         colDataTypes.push_back(colStruct.colDataType);
6511     }
6512     else
6513     {
6514         ++spTxnLBIDRec->m_squashedLbids;
6515     }
6516 
6517     // If the starting LBID list has grown to more than 2000, truncate.
6518     // This is the purpose of the seqnum. If spTxnLBIDRec->m_lastSeqnum
6519     // is divisible by 1000 and size() > 1000, get rid of everything older
6520     // than the last 1000 entries. This is to save memory in large
6521     // transactions. We assume older extents are unlikely to be hit again.
6522     if (spTxnLBIDRec->m_lastSeqnum % 1000 == 0
6523             && spTxnLBIDRec->m_LBIDMap.size() > 1000)
6524     {
6525 //        cout << "Trimming the LBID list for " << txnid << ". LBID count is " << spTxnLBIDRec->m_LBIDMap.size() << endl;
6526         uint32_t firstDrop = spTxnLBIDRec->m_lastSeqnum - 1000;
6527         std::tr1::unordered_map<BRM::LBID_t, uint32_t>::iterator iter;
6528 
6529         for (iter = spTxnLBIDRec->m_LBIDMap.begin(); iter != spTxnLBIDRec->m_LBIDMap.end();)
6530         {
6531             if ((*iter).second < firstDrop)
6532             {
6533                 iter = spTxnLBIDRec->m_LBIDMap.erase(iter);
6534             }
6535             else
6536             {
6537                 ++iter;
6538             }
6539         }
6540 
6541 //        cout << "LBID count is now" << spTxnLBIDRec->m_LBIDMap.size() << endl;
6542     }
6543 
6544     return rtn;
6545 }
6546 
RemoveTxnFromDictMap(const TxnID txnid)6547 void WriteEngineWrapper::RemoveTxnFromDictMap(const TxnID txnid)
6548 {
6549     std::tr1::unordered_map<TxnID, dictLBIDRec_t>::iterator mapIter;
6550 
6551     mapIter = m_dictLBIDMap.find(txnid);
6552 
6553     if (mapIter != m_dictLBIDMap.end())
6554     {
6555         m_dictLBIDMap.erase(txnid);
6556     }
6557 }
6558 
6559 /***********************************************************
6560  * DESCRIPTION:
6561  *    Remove a transaction LBID list from the LBID map
6562  *    Called when a transaction ends, either commit or rollback
6563  * PARAMETERS:
6564  *    txnid - the transaction to remove.
6565  * RETURN:
6566  *    0 => success or not found, -1 => error
6567  ***********************************************************/
RemoveTxnFromLBIDMap(const TxnID txnid)6568 int WriteEngineWrapper::RemoveTxnFromLBIDMap(const TxnID txnid)
6569 {
6570     int rtn = 0;
6571     std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t>::iterator mapIter;
6572 
6573     // Find the set of extent starting LBIDs for this transaction. If not found, then create it.
6574     try
6575     {
6576         mapIter = m_txnLBIDMap.find(txnid);
6577 
6578         if (mapIter != m_txnLBIDMap.end())
6579         {
6580             SP_TxnLBIDRec_t spTxnLBIDRec = (*mapIter).second;
6581             // Debug
6582 //            cout << "Remove transaction entry " << txnid << " transaction count " << m_txnLBIDMap.size() << endl;
6583 //            cout << "    count = " << spTxnLBIDRec->m_LBIDMap.size() <<
6584 //                    ", lastSeqnum = " <<  spTxnLBIDRec->m_lastSeqnum <<
6585 //                    ", squashed lbids = " << spTxnLBIDRec->m_squashedLbids << endl;
6586             m_txnLBIDMap.erase(txnid);   // spTxnLBIDRec is auto-destroyed
6587         }
6588     }
6589     catch (...)
6590     {
6591         rtn = -1;
6592     }
6593 
6594     return rtn;
6595 }
6596 
6597 
6598 } //end of namespace
6599 // vim:ts=4 sw=4:
6600 
6601