1 /* Copyright (C) 2014 InfiniDB, Inc.
2 Copyright (C) 2016 MariaDB Corporation
3
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 as published by the Free Software Foundation; version 2 of
7 the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17 MA 02110-1301, USA. */
18
19 // $Id: writeengine.cpp 4737 2013-08-14 20:45:46Z bwilkinson $
20
/** @file writeengine.cpp
 * A wrapper class for the write engine to write information to files
 */
24 #include <cmath>
25 #include <cstdlib>
26 #include <unistd.h>
27 #include <boost/scoped_array.hpp>
28 #include <boost/scoped_ptr.hpp>
29 using namespace std;
30
31 #include "joblisttypes.h"
32
33 #define WRITEENGINEWRAPPER_DLLEXPORT
34 #include "writeengine.h"
35 #undef WRITEENGINEWRAPPER_DLLEXPORT
36
37 #include "we_convertor.h"
38 #include "we_log.h"
39 #include "we_simplesyslog.h"
40 #include "we_config.h"
41 #include "we_bulkrollbackmgr.h"
42 #include "brm.h"
43 #include "stopwatch.h"
44 #include "we_colop.h"
45 #include "we_type.h"
46
47 #include "we_colopcompress.h"
48 #include "we_dctnrycompress.h"
49 #include "cacheutils.h"
50 #include "calpontsystemcatalog.h"
51 #include "we_simplesyslog.h"
52 using namespace cacheutils;
53 using namespace logging;
54 using namespace BRM;
55 using namespace execplan;
56 #include "IDBDataFile.h"
57 #include "IDBPolicy.h"
58 #include "MonitorProcMem.h"
59 using namespace idbdatafile;
60
61 #ifdef _MSC_VER
62 #define isnan _isnan
63 #endif
64
65 namespace WriteEngine
66 //#define PROFILE 1
67
68 {
69 StopWatch timer;
70
71 /**@brief WriteEngineWrapper Constructor
72 */
WriteEngineWrapper()73 WriteEngineWrapper::WriteEngineWrapper() : m_opType(NOOP)
74 {
75 m_colOp[UN_COMPRESSED_OP] = new ColumnOpCompress0;
76 m_colOp[COMPRESSED_OP] = new ColumnOpCompress1;
77
78 m_dctnry[UN_COMPRESSED_OP] = new DctnryCompress0;
79 m_dctnry[COMPRESSED_OP] = new DctnryCompress1;
80 }
81
/**@brief Copy constructor.
 *
 * Copies only the operation type; each wrapper deliberately owns a fresh set
 * of column/dictionary handlers rather than sharing rhs's pointers, mirroring
 * the default constructor.
 */
WriteEngineWrapper::WriteEngineWrapper(const WriteEngineWrapper& rhs) : m_opType(rhs.m_opType)
{
    m_colOp[UN_COMPRESSED_OP] = new ColumnOpCompress0;
    m_dctnry[UN_COMPRESSED_OP] = new DctnryCompress0;

    m_colOp[COMPRESSED_OP] = new ColumnOpCompress1;
    m_dctnry[COMPRESSED_OP] = new DctnryCompress1;
}
90
/**@brief WriteEngineWrapper Destructor
 */
~WriteEngineWrapper()93 WriteEngineWrapper::~WriteEngineWrapper()
94 {
95 delete m_colOp[UN_COMPRESSED_OP];
96 delete m_colOp[COMPRESSED_OP];
97 delete m_dctnry[UN_COMPRESSED_OP];
98 delete m_dctnry[COMPRESSED_OP];
99 }
100
101 /**@brief Perform upfront initialization
102 */
init(unsigned subSystemID)103 /* static */ void WriteEngineWrapper::init(unsigned subSystemID)
104 {
105 SimpleSysLog::instance()->setLoggingID(logging::LoggingID(subSystemID));
106 Config::initConfigCache();
107 BRMWrapper::getInstance();
108
109 // Bug 5415 Add HDFS MemBuffer vs. FileBuffer decision logic.
110 config::Config* cf = config::Config::makeConfig();
111 //--------------------------------------------------------------------------
112 // Memory overload protection. This setting will cause the process to die should
113 // it, by itself, consume maxPct of total memory. Monitored in MonitorProcMem.
114 // Only used at the express direction of Field Support.
115 //--------------------------------------------------------------------------
116 int maxPct = 0; //disable by default
117 string strMaxPct = cf->getConfig("WriteEngine", "MaxPct");
118
119 if ( strMaxPct.length() != 0 )
120 maxPct = cf->uFromText(strMaxPct);
121
122 //--------------------------------------------------------------------------
123 // MemoryCheckPercent. This controls at what percent of total memory be consumed
124 // by all processes before we switch from HdfsRdwrMemBuffer to HdfsRdwrFileBuffer.
125 // This is only used in Hdfs installations.
126 //--------------------------------------------------------------------------
127 int checkPct = 95;
128 string strCheckPct = cf->getConfig("SystemConfig", "MemoryCheckPercent");
129
130 if ( strCheckPct.length() != 0 )
131 checkPct = cf->uFromText(strCheckPct);
132
133 //--------------------------------------------------------------------------
134 // If we're either HDFS, or maxPct is turned on, start the monitor thread.
135 // Otherwise, we don't need it, so don't waste the resources.
136 //--------------------------------------------------------------------------
137 if (maxPct > 0 || IDBPolicy::useHdfs())
138 {
139 new boost::thread(utils::MonitorProcMem(maxPct, checkPct, subSystemID));
140 }
141 }
142
143 /*@brief checkValid --Check input parameters are valid
144 */
145 /***********************************************************
146 * DESCRIPTION:
147 * Check input parameters are valid
148 * PARAMETERS:
149 * colStructList - column struct list
150 * colValueList - column value list
151 * ridList - rowid list
152 * RETURN:
153 * NO_ERROR if success
154 * others if something wrong in the checking process
155 ***********************************************************/
checkValid(const TxnID & txnid,const ColStructList & colStructList,const ColValueList & colValueList,const RIDList & ridList) const156 int WriteEngineWrapper::checkValid(const TxnID& txnid, const ColStructList& colStructList, const ColValueList& colValueList, const RIDList& ridList) const
157 {
158 ColTupleList curTupleList;
159 ColStructList::size_type structListSize;
160 ColValueList::size_type valListSize;
161 ColTupleList::size_type totalRow;
162
163 if (colStructList.size() == 0)
164 return ERR_STRUCT_EMPTY;
165
166 structListSize = colStructList.size() ;
167 valListSize = colValueList.size();
168
169 // if (colStructList.size() != colValueList.size())
170 if (structListSize != valListSize)
171 return ERR_STRUCT_VALUE_NOT_MATCH;
172
173 for (ColValueList::size_type i = 0; i < valListSize; i++)
174 {
175
176 curTupleList = static_cast<ColTupleList>(colValueList[i]);
177 totalRow = curTupleList.size();
178
179 if (ridList.size() > 0)
180 {
181 if (totalRow != ridList.size())
182 return ERR_ROWID_VALUE_NOT_MATCH;
183 }
184
185 } // end of for (int i = 0;
186
187 return NO_ERROR;
188 }
189
190 /*@brief findSmallestColumn --Find the smallest column for this table
191 */
192 /***********************************************************
193 * DESCRIPTION:
194 * Find the smallest column for this table
195 * PARAMETERS:
196 * lowColLen - returns smallest column width
197 * colId - returns smallest column id
198 * colStructList - column struct list
199 * RETURN:
200 * void
201 ***********************************************************/
findSmallestColumn(uint32_t & colId,ColStructList colStructList)202 void WriteEngineWrapper::findSmallestColumn(uint32_t& colId, ColStructList colStructList)
203 // MCOL-1675: find the smallest column width to calculate the RowID from so
204 // that all HWMs will be incremented by this operation
205 {
206 int32_t lowColLen = 8192;
207 for (uint32_t colIt = 0; colIt < colStructList.size(); colIt++)
208 {
209 if (colStructList[colIt].colWidth < lowColLen)
210 {
211 colId = colIt;
212 lowColLen = colStructList[colId].colWidth;
213 if ( lowColLen == 1 )
214 {
215 break;
216 }
217 }
218 }
219 }
220
221 /*@convertValArray - Convert interface values to internal values
222 */
223 /***********************************************************
224 * DESCRIPTION:
225 * Convert interface values to internal values
226 * PARAMETERS:
227 * colStructList - column struct list
228 * colValueList - column value list
229 * RETURN:
230 * none
231 * valArray - output value array
232 * nullArray - output null flag array
233 ***********************************************************/
convertValArray(const size_t totalRow,const ColType colType,ColTupleList & curTupleList,void * valArray,bool bFromList)234 void WriteEngineWrapper::convertValArray(const size_t totalRow, const ColType colType, ColTupleList& curTupleList, void* valArray, bool bFromList)
235 {
236 ColTuple curTuple;
237 ColTupleList::size_type i;
238
239 if (bFromList)
240 for (i = 0; i < curTupleList.size(); i++)
241 {
242 curTuple = curTupleList[i];
243 convertValue(colType, valArray, i, curTuple.data);
244 } // end of for (int i = 0
245 else
246 for (i = 0; i < totalRow; i++)
247 {
248 convertValue(colType, valArray, i, curTuple.data, false);
249 curTupleList.push_back(curTuple);
250 }
251 }
252
253 /*
254 * @brief Convert column value to its internal representation
255 */
convertValue(const ColType colType,void * value,boost::any & data)256 void WriteEngineWrapper::convertValue(const ColType colType, void* value, boost::any& data)
257 {
258 string curStr;
259 int size;
260
261 switch (colType)
262 {
263 case WriteEngine::WR_INT :
264 case WriteEngine::WR_MEDINT :
265 if (data.type() == typeid(int))
266 {
267 int val = boost::any_cast<int>(data);
268 size = sizeof(int);
269 memcpy(value, &val, size);
270 }
271 else
272 {
273 uint32_t val = boost::any_cast<uint32_t>(data);
274 size = sizeof(uint32_t);
275 memcpy(value, &val, size);
276 }
277
278 break;
279
280 case WriteEngine::WR_UINT :
281 case WriteEngine::WR_UMEDINT :
282 {
283 uint32_t val = boost::any_cast<uint32_t>(data);
284 size = sizeof(uint32_t);
285 memcpy(value, &val, size);
286 }
287 break;
288
289 case WriteEngine::WR_VARBINARY : // treat same as char for now
290 case WriteEngine::WR_CHAR :
291 case WriteEngine::WR_BLOB :
292 case WriteEngine::WR_TEXT :
293 curStr = boost::any_cast<string>(data);
294
295 if ((int) curStr.length() > MAX_COLUMN_BOUNDARY)
296 curStr = curStr.substr(0, MAX_COLUMN_BOUNDARY);
297
298 memcpy(value, curStr.c_str(), curStr.length());
299 break;
300
301 case WriteEngine::WR_FLOAT:
302 {
303 float val = boost::any_cast<float>(data);
304
305 //N.B.There is a bug in boost::any or in gcc where, if you store a nan, you will get back a nan,
306 // but not necessarily the same bits that you put in. This only seems to be for float (double seems
307 // to work).
308 if (isnan(val))
309 {
310 uint32_t ti = joblist::FLOATNULL;
311 float* tfp = (float*)&ti;
312 val = *tfp;
313 }
314
315 size = sizeof(float);
316 memcpy(value, &val, size);
317 }
318 break;
319
320 case WriteEngine::WR_DOUBLE:
321 {
322 double val = boost::any_cast<double>(data);
323 size = sizeof(double);
324 memcpy(value, &val, size);
325 }
326 break;
327
328 case WriteEngine::WR_SHORT:
329 {
330 short val = boost::any_cast<short>(data);
331 size = sizeof(short);
332 memcpy(value, &val, size);
333 }
334 break;
335
336 case WriteEngine::WR_USHORT:
337 {
338 uint16_t val = boost::any_cast<uint16_t>(data);
339 size = sizeof(uint16_t);
340 memcpy(value, &val, size);
341 }
342 break;
343
344 case WriteEngine::WR_BYTE:
345 {
346 char val = boost::any_cast<char>(data);
347 size = sizeof(char);
348 memcpy(value, &val, size);
349 }
350 break;
351
352 case WriteEngine::WR_UBYTE:
353 {
354 uint8_t val = boost::any_cast<uint8_t>(data);
355 size = sizeof(uint8_t);
356 memcpy(value, &val, size);
357 }
358 break;
359
360 case WriteEngine::WR_LONGLONG:
361 if (data.type() == typeid(long long))
362 {
363 long long val = boost::any_cast<long long>(data);
364 size = sizeof(long long);
365 memcpy(value, &val, size);
366 }
367 else
368 {
369 uint64_t val = boost::any_cast<uint64_t>(data);
370 size = sizeof(uint64_t);
371 memcpy(value, &val, size);
372 }
373
374 break;
375
376 case WriteEngine::WR_ULONGLONG:
377 {
378 uint64_t val = boost::any_cast<uint64_t>(data);
379 size = sizeof(uint64_t);
380 memcpy(value, &val, size);
381 }
382 break;
383
384 case WriteEngine::WR_TOKEN:
385 {
386 Token val = boost::any_cast<Token>(data);
387 size = sizeof(Token);
388 memcpy(value, &val, size);
389 }
390 break;
391
392 } // end of switch (colType)
393 } /*@convertValue - The base for converting values */
394
395 /***********************************************************
396 * DESCRIPTION:
397 * The base for converting values
398 * PARAMETERS:
399 * colType - data type
400 * pos - array position
401 * data - value
402 * RETURN:
403 * none
404 ***********************************************************/
convertValue(const ColType colType,void * valArray,const size_t pos,boost::any & data,bool fromList)405 void WriteEngineWrapper::convertValue(const ColType colType, void* valArray, const size_t pos, boost::any& data, bool fromList)
406 {
407 string curStr;
408 // ColTuple curTuple;
409
410 if (fromList)
411 {
412 switch (colType)
413 {
414 case WriteEngine::WR_INT :
415 case WriteEngine::WR_MEDINT :
416 if (data.type() == typeid(long))
417 ((int*)valArray)[pos] = static_cast<int>(boost::any_cast<long>(data));
418 else if (data.type() == typeid(int))
419 ((int*)valArray)[pos] = boost::any_cast<int>(data);
420 else
421 ((int*)valArray)[pos] = boost::any_cast<uint32_t>(data);
422
423 break;
424
425 case WriteEngine::WR_UINT :
426 case WriteEngine::WR_UMEDINT :
427 ((uint32_t*)valArray)[pos] = boost::any_cast<uint32_t>(data);
428 break;
429
430 case WriteEngine::WR_VARBINARY : // treat same as char for now
431 case WriteEngine::WR_CHAR :
432 case WriteEngine::WR_BLOB :
433 case WriteEngine::WR_TEXT :
434 curStr = boost::any_cast<string>(data);
435
436 if ((int) curStr.length() > MAX_COLUMN_BOUNDARY)
437 curStr = curStr.substr(0, MAX_COLUMN_BOUNDARY);
438
439 memcpy((char*)valArray + pos * MAX_COLUMN_BOUNDARY, curStr.c_str(), curStr.length());
440 break;
441
442 // case WriteEngine::WR_LONG : ((long*)valArray)[pos] = boost::any_cast<long>(curTuple.data);
443 // break;
444 case WriteEngine::WR_FLOAT:
445 ((float*)valArray)[pos] = boost::any_cast<float>(data);
446
447 if (isnan(((float*)valArray)[pos]))
448 {
449 uint32_t ti = joblist::FLOATNULL;
450 float* tfp = (float*)&ti;
451 ((float*)valArray)[pos] = *tfp;
452 }
453
454 break;
455
456 case WriteEngine::WR_DOUBLE:
457 ((double*)valArray)[pos] = boost::any_cast<double>(data);
458 break;
459
460 case WriteEngine::WR_SHORT:
461 ((short*)valArray)[pos] = boost::any_cast<short>(data);
462 break;
463
464 case WriteEngine::WR_USHORT:
465 ((uint16_t*)valArray)[pos] = boost::any_cast<uint16_t>(data);
466 break;
467
468 // case WriteEngine::WR_BIT: ((bool*)valArray)[pos] = boost::any_cast<bool>(data);
469 // break;
470 case WriteEngine::WR_BYTE:
471 ((char*)valArray)[pos] = boost::any_cast<char>(data);
472 break;
473
474 case WriteEngine::WR_UBYTE:
475 ((uint8_t*)valArray)[pos] = boost::any_cast<uint8_t>(data);
476 break;
477
478 case WriteEngine::WR_LONGLONG:
479 if (data.type() == typeid(long long))
480 ((long long*)valArray)[pos] = boost::any_cast<long long>(data);
481 else if (data.type() == typeid(long))
482 ((long long*)valArray)[pos] = (long long)boost::any_cast<long>(data);
483 else
484 ((long long*)valArray)[pos] = boost::any_cast<uint64_t>(data);
485
486 break;
487
488 case WriteEngine::WR_ULONGLONG:
489 ((uint64_t*)valArray)[pos] = boost::any_cast<uint64_t>(data);
490 break;
491
492 case WriteEngine::WR_TOKEN:
493 ((Token*)valArray)[pos] = boost::any_cast<Token>(data);
494 break;
495 } // end of switch (colType)
496 }
497 else
498 {
499 switch (colType)
500 {
501 case WriteEngine::WR_INT :
502 case WriteEngine::WR_MEDINT :
503 data = ((int*)valArray)[pos];
504 break;
505
506 case WriteEngine::WR_UINT :
507 case WriteEngine::WR_UMEDINT :
508 data = ((uint64_t*)valArray)[pos];
509 break;
510
511 case WriteEngine::WR_VARBINARY : // treat same as char for now
512 case WriteEngine::WR_CHAR :
513 case WriteEngine::WR_BLOB :
514 case WriteEngine::WR_TEXT :
515 char tmp[10];
516 memcpy(tmp, (char*)valArray + pos * 8, 8);
517 curStr = tmp;
518 data = curStr;
519 break;
520
521 // case WriteEngine::WR_LONG : ((long*)valArray)[pos] = boost::any_cast<long>(curTuple.data);
522 // break;
523 case WriteEngine::WR_FLOAT:
524 data = ((float*)valArray)[pos];
525 break;
526
527 case WriteEngine::WR_DOUBLE:
528 data = ((double*)valArray)[pos];
529 break;
530
531 case WriteEngine::WR_SHORT:
532 data = ((short*)valArray)[pos];
533 break;
534
535 case WriteEngine::WR_USHORT:
536 data = ((uint16_t*)valArray)[pos];
537 break;
538
539 // case WriteEngine::WR_BIT: data = ((bool*)valArray)[pos];
540 // break;
541 case WriteEngine::WR_BYTE:
542 data = ((char*)valArray)[pos];
543 break;
544
545 case WriteEngine::WR_UBYTE:
546 data = ((uint8_t*)valArray)[pos];
547 break;
548
549 case WriteEngine::WR_LONGLONG:
550 data = ((long long*)valArray)[pos];
551 break;
552
553 case WriteEngine::WR_ULONGLONG:
554 data = ((uint64_t*)valArray)[pos];
555 break;
556
557 case WriteEngine::WR_TOKEN:
558 data = ((Token*)valArray)[pos];
559 break;
560 } // end of switch (colType)
561 } // end of if
562 }
563
564 /*@createColumn - Create column files, including data and bitmap files
565 */
566 /***********************************************************
567 * DESCRIPTION:
568 * Create column files, including data and bitmap files
569 * PARAMETERS:
570 * dataOid - column data file id
571 * bitmapOid - column bitmap file id
572 * colWidth - column width
573 * dbRoot - DBRoot where file is to be located
574 * partition - Starting partition number for segment file path
575 * compressionType - compression type
576 * RETURN:
577 * NO_ERROR if success
578 * ERR_FILE_EXIST if file exists
579 * ERR_FILE_CREATE if something wrong in creating the file
580 ***********************************************************/
createColumn(const TxnID & txnid,const OID & dataOid,const CalpontSystemCatalog::ColDataType dataType,int dataWidth,uint16_t dbRoot,uint32_t partition,int compressionType)581 int WriteEngineWrapper::createColumn(
582 const TxnID& txnid,
583 const OID& dataOid,
584 const CalpontSystemCatalog::ColDataType dataType,
585 int dataWidth,
586 uint16_t dbRoot,
587 uint32_t partition,
588 int compressionType)
589 {
590 int rc;
591 Column curCol;
592
593 int compress_op = op(compressionType);
594 m_colOp[compress_op]->initColumn(curCol);
595 rc = m_colOp[compress_op]->createColumn(curCol, 0, dataWidth, dataType,
596 WriteEngine::WR_CHAR, (FID)dataOid, dbRoot, partition);
597
598 // This is optional, however, it's recommended to do so to free heap
599 // memory if assigned in the future
600 m_colOp[compress_op]->clearColumn(curCol);
601 std::map<FID, FID> oids;
602
603 if (rc == NO_ERROR)
604 rc = flushDataFiles(NO_ERROR, txnid, oids);
605
606 if (rc != NO_ERROR)
607 {
608 return rc;
609 }
610
611 RETURN_ON_ERROR(BRMWrapper::getInstance()->setLocalHWM(dataOid, partition, 0, 0));
612 // @bug 281 : fix for bug 281 - Add flush VM cache to clear all write buffer
613 //flushVMCache();
614 return rc;
615 }
616
617 //BUG931
618 /**
619 * @brief Fill column with default values
620 */
// BUG931: fill an ALTER TABLE..ADD COLUMN's new column with default values,
// sizing it from a reference column so both share the same extent layout.
// Returns NO_ERROR on success, otherwise a WriteEngine error code.
int WriteEngineWrapper::fillColumn(const TxnID& txnid, const OID& dataOid,
                                   const CalpontSystemCatalog::ColDataType dataType, int dataWidth,
                                   ColTuple defaultVal, const OID& refColOID,
                                   const CalpontSystemCatalog::ColDataType refColDataType,
                                   int refColWidth, int refCompressionType,
                                   bool isNULL, int compressionType,
                                   const string& defaultValStr,
                                   const OID& dictOid, bool autoincrement)
{
    int rc = NO_ERROR;
    Column newCol;
    Column refCol;
    ColType newColType;
    ColType refColType;
    // Scratch buffer holding the default value in internal binary form.
    boost::scoped_array<char> defVal(new char[MAX_COLUMN_BOUNDARY]);
    ColumnOp* colOpNewCol = m_colOp[op(compressionType)];
    ColumnOp* refColOp = m_colOp[op(refCompressionType)];
    Dctnry* dctnry = m_dctnry[op(compressionType)];
    colOpNewCol->initColumn(newCol);
    refColOp->initColumn(refCol);
    //boost::shared_ptr<Dctnry> dctnry;
    // boost::shared_ptr<ColumnOp> refColOp;
    // refColOp.reset(colOpRefCol);
    // dctnry.reset(dctOp);
    uint16_t dbRoot = 1; //not to be used
    int newDataWidth = dataWidth;
    //Convert HWM of the reference column for the new column
    //Bug 1703,1705
    bool isToken = false;

    // Wide strings and LOB types are stored via dictionary tokens.
    if (((dataType == CalpontSystemCatalog::VARCHAR) && (dataWidth > 7)) ||
            ((dataType == CalpontSystemCatalog::CHAR) && (dataWidth > 8)) ||
            (dataType == CalpontSystemCatalog::VARBINARY) ||
            (dataType == CalpontSystemCatalog::BLOB) ||
            (dataType == CalpontSystemCatalog::TEXT))
    {
        isToken = true;
    }

    Convertor::convertColType(dataType, newColType, isToken);

    // NOTE(review): isToken is not reset before this second check, so a
    // token-typed new column forces the reference column to be treated as a
    // token type as well — confirm this is the intended behavior.
    if (((refColDataType == CalpontSystemCatalog::VARCHAR) && (refColWidth > 7)) ||
            ((refColDataType == CalpontSystemCatalog::CHAR) && (refColWidth > 8)) ||
            (refColDataType == CalpontSystemCatalog::VARBINARY) ||
            (dataType == CalpontSystemCatalog::BLOB) ||
            (dataType == CalpontSystemCatalog::TEXT))
    {
        isToken = true;
    }

    newDataWidth = colOpNewCol->getCorrectRowWidth(dataType, dataWidth);
    // MCOL-1347 CS doubles the width for ALTER TABLE..ADD COLUMN
    if ( dataWidth < 4 && dataType == CalpontSystemCatalog::VARCHAR )
    {
        newDataWidth >>= 1;
    }

    Convertor::convertColType(refColDataType, refColType, isToken);
    refColOp->setColParam(refCol, 0, refColOp->getCorrectRowWidth(refColDataType, refColWidth),
                          refColDataType, refColType, (FID)refColOID, refCompressionType, dbRoot);
    colOpNewCol->setColParam(newCol, 0, newDataWidth,
                             dataType, newColType, (FID)dataOid, compressionType, dbRoot);

    int size = sizeof(Token);

    if (newColType == WriteEngine::WR_TOKEN)
    {
        // For token columns only the NULL token is materialized here.
        if (isNULL)
        {
            Token nullToken;
            memcpy(defVal.get(), &nullToken, size);
        }

        //Tokenization is done when we create dictionary file
    }
    else
        convertValue(newColType, defVal.get(), defaultVal.data);

    if (rc == NO_ERROR)
        rc = colOpNewCol->fillColumn(txnid, newCol, refCol, defVal.get(), dctnry, refColOp, dictOid, dataWidth, defaultValStr, autoincrement);

    // colOpNewCol->clearColumn(newCol);
    // colOpRefCol->clearColumn(refCol);

    // free(defVal);

    // flushing files is in colOp->fillColumn()
    // if (rc == NO_ERROR)
    //  rc = flushDataFiles();

    return rc;
}
713
// Delete rows by overwriting them with each column's empty-row marker value.
// Builds one single-tuple value list per column (the empty value) plus a
// matching placeholder dictionary entry, then delegates the actual write to
// updateColumnRec with m_opType temporarily set to DELETE.
// Returns NO_ERROR on success, ERR_STRUCT_EMPTY if inputs are empty.
int WriteEngineWrapper::deleteRow(const TxnID& txnid, vector<ColStructList>& colExtentsStruct, vector<void*>& colOldValueList,
                                  vector<RIDList>& ridLists, const int32_t tableOid)
{
    ColTuple curTuple;
    ColStruct curColStruct;
    DctnryStruct dctnryStruct;
    ColValueList colValueList;
    ColTupleList curTupleList;
    DctnryStructList dctnryStructList;
    DctnryValueList dctnryValueList;
    ColStructList colStructList;
    uint64_t emptyVal;
    int rc;
    string tmpStr("");
    vector<DctnryStructList> dctnryExtentsStruct;

    if (colExtentsStruct.size() == 0 || ridLists.size() == 0)
        return ERR_STRUCT_EMPTY;

    // set transaction id
    setTransId(txnid);
    unsigned numExtents = colExtentsStruct.size();

    for (unsigned extent = 0; extent < numExtents; extent++)
    {
        colStructList = colExtentsStruct[extent];

        for (ColStructList::size_type i = 0; i < colStructList.size(); i++)
        {
            curTupleList.clear();
            curColStruct = colStructList[i];
            // The "empty row" magic value for this column's type/width marks
            // the row as deleted.
            emptyVal = m_colOp[op(curColStruct.fCompressionType)]->
                       getEmptyRowValue(curColStruct.colDataType, curColStruct.colWidth);

            curTuple.data = emptyVal;
            //for (RIDList::size_type j = 0; j < ridLists[extent].size(); j++)
            //  curTupleList.push_back(curTuple);
            // A single tuple suffices; updateColumnRec applies it to every rid.
            curTupleList.push_back(curTuple);
            colValueList.push_back(curTupleList);

            // dctnryOid 0 indicates there is no dictionary file to update;
            // only the location fields are carried over.
            dctnryStruct.dctnryOid = 0;
            dctnryStruct.fColPartition = curColStruct.fColPartition;
            dctnryStruct.fColSegment = curColStruct.fColSegment;
            dctnryStruct.fColDbRoot = curColStruct.fColDbRoot;
            dctnryStruct.columnOid = colStructList[i].dataOid;
            dctnryStructList.push_back(dctnryStruct);

            // Placeholder NULL dictionary tuple to keep the value lists aligned
            // with dctnryStructList.
            DctnryTuple dctnryTuple;
            DctColTupleList dctColTuples;
            dctnryTuple.sigValue = (unsigned char*)tmpStr.c_str();
            dctnryTuple.sigSize = tmpStr.length();
            dctnryTuple.isNull = true;
            dctColTuples.push_back (dctnryTuple);
            dctnryValueList.push_back (dctColTuples);
        }

        dctnryExtentsStruct.push_back(dctnryStructList);
    }

    // unfortunately I don't have a better way to instruct without passing too many parameters
    m_opType = DELETE;
    rc = updateColumnRec(txnid, colExtentsStruct, colValueList, colOldValueList, ridLists, dctnryExtentsStruct, dctnryValueList, tableOid);
    m_opType = NOOP;

    return rc;
}
780
deleteBadRows(const TxnID & txnid,ColStructList & colStructs,RIDList & ridList,DctnryStructList & dctnryStructList)781 int WriteEngineWrapper::deleteBadRows(const TxnID& txnid, ColStructList& colStructs,
782 RIDList& ridList, DctnryStructList& dctnryStructList)
783 {
784 /* Need to scan all files including dictionary store files to check whether there is any bad chunks
785 *
786 */
787 int rc = 0;
788 Column curCol;
789 void* valArray = NULL;
790
791 for (unsigned i = 0; i < colStructs.size(); i++)
792 {
793 ColumnOp* colOp = m_colOp[op(colStructs[i].fCompressionType)];
794 unsigned needFixFiles = colStructs[i].tokenFlag ? 2 : 1;
795 colOp->initColumn(curCol);
796
797 for (unsigned j = 0; j < needFixFiles; j++)
798 {
799 if (j == 0)
800 {
801 colOp->setColParam(curCol, 0, colStructs[i].colWidth,
802 colStructs[i].colDataType, colStructs[i].colType, colStructs[i].dataOid,
803 colStructs[i].fCompressionType, colStructs[i].fColDbRoot,
804 colStructs[i].fColPartition, colStructs[i].fColSegment);
805
806 string segFile;
807 rc = colOp->openColumnFile(curCol, segFile, true, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file
808
809 if (rc != NO_ERROR) //If openFile fails, disk error or header error is assumed.
810 {
811 //report error and return.
812 std::ostringstream oss;
813 WErrorCodes ec;
814 string err = ec.errorString(rc);
815 oss << "Error opening file oid:dbroot:partition:segment = " << colStructs[i].dataOid << ":" << colStructs[i].fColDbRoot
816 << ":" << colStructs[i].fColPartition << ":" << colStructs[i].fColSegment << " and error code is " << rc << " with message " << err;
817 throw std::runtime_error(oss.str());
818 }
819
820 switch (colStructs[i].colType)
821 {
822 case WriteEngine::WR_INT:
823 case WriteEngine::WR_MEDINT:
824 valArray = (int*) calloc(sizeof(int), 1);
825 break;
826
827 case WriteEngine::WR_UINT:
828 case WriteEngine::WR_UMEDINT:
829 valArray = (uint32_t*) calloc(sizeof(uint32_t), 1);
830 break;
831
832 case WriteEngine::WR_VARBINARY : // treat same as char for now
833 case WriteEngine::WR_CHAR:
834 case WriteEngine::WR_BLOB:
835 case WriteEngine::WR_TEXT:
836 valArray = (char*) calloc(sizeof(char), 1 * MAX_COLUMN_BOUNDARY);
837 break;
838
839 case WriteEngine::WR_FLOAT:
840 valArray = (float*) calloc(sizeof(float), 1);
841 break;
842
843 case WriteEngine::WR_DOUBLE:
844 valArray = (double*) calloc(sizeof(double), 1);
845 break;
846
847 case WriteEngine::WR_BYTE:
848 valArray = (char*) calloc(sizeof(char), 1);
849 break;
850
851 case WriteEngine::WR_UBYTE:
852 valArray = (uint8_t*) calloc(sizeof(uint8_t), 1);
853 break;
854
855 case WriteEngine::WR_SHORT:
856 valArray = (short*) calloc(sizeof(short), 1);
857 break;
858
859 case WriteEngine::WR_USHORT:
860 valArray = (uint16_t*) calloc(sizeof(uint16_t), 1);
861 break;
862
863 case WriteEngine::WR_LONGLONG:
864 valArray = (long long*) calloc(sizeof(long long), 1);
865 break;
866
867 case WriteEngine::WR_ULONGLONG:
868 valArray = (uint64_t*) calloc(sizeof(uint64_t), 1);
869 break;
870
871 case WriteEngine::WR_TOKEN:
872 valArray = (Token*) calloc(sizeof(Token), 1);
873 break;
874 }
875
876 rc = colOp->writeRows(curCol, ridList.size(), ridList, valArray, 0, true);
877
878 if ( rc != NO_ERROR)
879 {
880 //read error is fixed in place
881 if (rc == ERR_COMP_COMPRESS) //write error
882 {
883
884 }
885
886 }
887
888 //flush files will be done in the end of fix.
889 colOp->clearColumn(curCol);
890
891 if (valArray != NULL)
892 free(valArray);
893 }
894 else //dictionary file. How to fix
895 {
896 //read headers out, uncompress the last chunk, if error, replace it with empty chunk.
897 Dctnry* dctnry = m_dctnry[op(dctnryStructList[i].fCompressionType)];
898 rc = dctnry->openDctnry(dctnryStructList[i].dctnryOid,
899 dctnryStructList[i].fColDbRoot, dctnryStructList[i].fColPartition,
900 dctnryStructList[i].fColSegment,
901 false);
902
903 rc = dctnry->checkFixLastDictChunk();
904 rc = dctnry->closeDctnry(true);
905
906 }
907 }
908 }
909
910 return rc;
911 }
912
913 /*@flushVMCache - Flush VM cache
914 */
915 /***********************************************************
916 * DESCRIPTION:
 *    Flush system VM cache
918 * PARAMETERS:
919 * none
920 * RETURN:
921 * none
922 ***********************************************************/
flushVMCache() const923 void WriteEngineWrapper::flushVMCache() const
924 {
925 // int fd = open("/proc/sys/vm/drop_caches", O_WRONLY);
926 // write(fd, "3", 1);
927 // close(fd);
928
929 }
930
931 /*@insertColumnRecs - Insert value(s) into a column
932 */
933 /***********************************************************
934 * DESCRIPTION:
935 * Insert values into columns (batchinsert)
936 * PARAMETERS:
937 * colStructList - column struct list
938 * colValueList - column value list
939 * RETURN:
940 * NO_ERROR if success
941 * others if something wrong in inserting the value
942 ***********************************************************/
943
insertColumnRecs(const TxnID & txnid,ColStructList & colStructList,ColValueList & colValueList,DctnryStructList & dctnryStructList,DictStrList & dictStrList,std::vector<boost::shared_ptr<DBRootExtentTracker>> & dbRootExtentTrackers,RBMetaWriter * fRBMetaWriter,bool bFirstExtentOnThisPM,bool insertSelect,bool isAutoCommitOn,OID tableOid,bool isFirstBatchPm)944 int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid,
945 ColStructList& colStructList,
946 ColValueList& colValueList,
947 DctnryStructList& dctnryStructList,
948 DictStrList& dictStrList,
949 std::vector<boost::shared_ptr<DBRootExtentTracker> >& dbRootExtentTrackers,
950 RBMetaWriter* fRBMetaWriter,
951 bool bFirstExtentOnThisPM,
952 bool insertSelect,
953 bool isAutoCommitOn,
954 OID tableOid,
955 bool isFirstBatchPm)
956 {
957 int rc;
958 RID* rowIdArray = NULL;
959 ColTupleList curTupleList;
960 Column curCol;
961 ColStruct curColStruct;
962 ColValueList colOldValueList;
963 ColValueList colNewValueList;
964 ColStructList newColStructList;
965 DctnryStructList newDctnryStructList;
966 HWM hwm = 0;
967 HWM oldHwm = 0;
968 HWM newHwm = 0;
969 ColTupleList::size_type totalRow;
970 ColStructList::size_type totalColumns;
971 uint64_t rowsLeft = 0;
972 bool newExtent = false;
973 RIDList ridList;
974 ColumnOp* colOp = NULL;
975
976 // Set tmp file suffix to modify HDFS db file
977 bool useTmpSuffix = false;
978
979 if (idbdatafile::IDBPolicy::useHdfs())
980 {
981 if (!bFirstExtentOnThisPM)
982 useTmpSuffix = true;
983 }
984
985 unsigned i = 0;
986 #ifdef PROFILE
987 StopWatch timer;
988 #endif
989
990 // debug information for testing
991 if (isDebug(DEBUG_2))
992 {
993 printf("\nIn wrapper insert\n");
994 printInputValue(colStructList, colValueList, ridList);
995 }
996
997 // end
998
999 //Convert data type and column width to write engine specific
1000 for (i = 0; i < colStructList.size(); i++)
1001 Convertor::convertColType(&colStructList[i]);
1002
1003 uint32_t colId = 0;
1004 // MCOL-1675: find the smallest column width to calculate the RowID from so
1005 // that all HWMs will be incremented by this operation
1006 findSmallestColumn(colId, colStructList);
1007
1008 // rc = checkValid(txnid, colStructList, colValueList, ridList);
1009 // if (rc != NO_ERROR)
1010 // return rc;
1011
1012 setTransId(txnid);
1013 uint16_t dbRoot, segmentNum;
1014 uint32_t partitionNum;
1015 string segFile;
1016 bool newFile;
1017 TableMetaData* tableMetaData = TableMetaData::makeTableMetaData(tableOid);
1018 //populate colStructList with file information
1019 IDBDataFile* pFile = NULL;
1020 std::vector<DBRootExtentInfo> extentInfo;
1021 int currentDBrootIdx = 0;
1022 std::vector<BRM::CreateStripeColumnExtentsArgOut> extents;
1023
1024 //--------------------------------------------------------------------------
1025 // For first batch on this PM:
1026 // o get starting extent from ExtentTracker, and allocate extent if needed
1027 // o construct colStructList and dctnryStructList accordingly
1028 // o save extent information in tableMetaData for future use
1029 // If not first batch on this PM:
1030 // o construct colStructList and dctnryStructList from tableMetaData
1031 //--------------------------------------------------------------------------
1032 if (isFirstBatchPm)
1033 {
1034 currentDBrootIdx = dbRootExtentTrackers[colId]->getCurrentDBRootIdx();
1035 extentInfo = dbRootExtentTrackers[colId]->getDBRootExtentList();
1036 dbRoot = extentInfo[currentDBrootIdx].fDbRoot;
1037 partitionNum = extentInfo[currentDBrootIdx].fPartition;
1038
1039 //----------------------------------------------------------------------
1040 // check whether this extent is the first on this PM
1041 //----------------------------------------------------------------------
1042 if (bFirstExtentOnThisPM)
1043 {
1044 //cout << "bFirstExtentOnThisPM is " << bFirstExtentOnThisPM << endl;
1045 std::vector<BRM::CreateStripeColumnExtentsArgIn> cols;
1046 BRM::CreateStripeColumnExtentsArgIn createStripeColumnExtentsArgIn;
1047
1048 for (i = 0; i < colStructList.size(); i++)
1049 {
1050 createStripeColumnExtentsArgIn.oid = colStructList[i].dataOid;
1051 createStripeColumnExtentsArgIn.width = colStructList[i].colWidth;
1052 createStripeColumnExtentsArgIn.colDataType = colStructList[i].colDataType;
1053 cols.push_back(createStripeColumnExtentsArgIn);
1054 }
1055
1056 rc = BRMWrapper::getInstance()->allocateStripeColExtents(cols, dbRoot, partitionNum, segmentNum, extents);
1057
1058 if (rc != NO_ERROR)
1059 return rc;
1060
1061 //Create column files
1062 BRM::CPInfoList_t cpinfoList;
1063 BRM::CPInfo cpInfo;
1064
1065 if (isUnsigned(colStructList[i].colDataType))
1066 {
1067 cpInfo.max = 0;
1068 cpInfo.min = static_cast<int64_t>(numeric_limits<uint64_t>::max());
1069 }
1070 else
1071 {
1072 cpInfo.max = numeric_limits<int64_t>::min();
1073 cpInfo.min = numeric_limits<int64_t>::max();
1074 }
1075
1076 cpInfo.seqNum = -1;
1077
1078 for ( i = 0; i < extents.size(); i++)
1079 {
1080 colOp = m_colOp[op(colStructList[i].fCompressionType)];
1081 colOp->initColumn(curCol);
1082 colOp->setColParam(curCol, colId, colStructList[i].colWidth, colStructList[i].colDataType,
1083 colStructList[i].colType, colStructList[i].dataOid, colStructList[i].fCompressionType,
1084 dbRoot, partitionNum, segmentNum);
1085 rc = colOp->extendColumn(curCol, false, extents[i].startBlkOffset, extents[i].startLbid, extents[i].allocSize, dbRoot,
1086 partitionNum, segmentNum, segFile, pFile, newFile);
1087
1088 if (rc != NO_ERROR)
1089 return rc;
1090
1091 //mark the extents to invalid
1092 cpInfo.firstLbid = extents[i].startLbid;
1093 cpinfoList.push_back(cpInfo);
1094 colStructList[i].fColPartition = partitionNum;
1095 colStructList[i].fColSegment = segmentNum;
1096 colStructList[i].fColDbRoot = dbRoot;
1097 dctnryStructList[i].fColPartition = partitionNum;
1098 dctnryStructList[i].fColSegment = segmentNum;
1099 dctnryStructList[i].fColDbRoot = dbRoot;
1100 }
1101
1102 //mark the extents to invalid
1103 rc = BRMWrapper::getInstance()->setExtentsMaxMin(cpinfoList);
1104
1105 if (rc != NO_ERROR)
1106 return rc;
1107
1108 //create corresponding dictionary files
1109 for (i = 0; i < dctnryStructList.size(); i++)
1110 {
1111 if (dctnryStructList[i].dctnryOid > 0)
1112 {
1113 rc = createDctnry(txnid, dctnryStructList[i].dctnryOid, dctnryStructList[i].colWidth, dbRoot, partitionNum,
1114 segmentNum, dctnryStructList[i].fCompressionType);
1115
1116 if ( rc != NO_ERROR)
1117 return rc;
1118 }
1119 }
1120 } // if ( bFirstExtentOnThisPM)
1121 else // if (!bFirstExtentOnThisPM)
1122 {
1123 std::vector<DBRootExtentInfo> tmpExtentInfo;
1124
1125 for (i = 0; i < dbRootExtentTrackers.size(); i++)
1126 {
1127 tmpExtentInfo = dbRootExtentTrackers[i]->getDBRootExtentList();
1128 colStructList[i].fColPartition = tmpExtentInfo[currentDBrootIdx].fPartition;
1129 colStructList[i].fColSegment = tmpExtentInfo[currentDBrootIdx].fSegment;
1130 colStructList[i].fColDbRoot = tmpExtentInfo[currentDBrootIdx].fDbRoot;
1131 //cout << "Load from dbrootExtenttracker oid:dbroot:part:seg = " <<colStructList[i].dataOid<<":"
1132 //<<colStructList[i].fColDbRoot<<":"<<colStructList[i].fColPartition<<":"<<colStructList[i].fColSegment<<endl;
1133 dctnryStructList[i].fColPartition = tmpExtentInfo[currentDBrootIdx].fPartition;
1134 dctnryStructList[i].fColSegment = tmpExtentInfo[currentDBrootIdx].fSegment;
1135 dctnryStructList[i].fColDbRoot = tmpExtentInfo[currentDBrootIdx].fDbRoot;
1136 }
1137 }
1138
1139 //----------------------------------------------------------------------
1140 // Save the extents info in tableMetaData
1141 //----------------------------------------------------------------------
1142 for (i = 0; i < colStructList.size(); i++)
1143 {
1144 ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[i].dataOid);
1145 ColExtsInfo::iterator it = aColExtsInfo.begin();
1146
1147 while (it != aColExtsInfo.end())
1148 {
1149 if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment))
1150 break;
1151
1152 it++;
1153 }
1154
1155 if (it == aColExtsInfo.end()) //add this one to the list
1156 {
1157 ColExtInfo aExt;
1158 aExt.dbRoot = colStructList[i].fColDbRoot;
1159 aExt.partNum = colStructList[i].fColPartition;
1160 aExt.segNum = colStructList[i].fColSegment;
1161 aExt.compType = colStructList[i].fCompressionType;
1162 aExt.isDict = false;
1163
1164 if (bFirstExtentOnThisPM)
1165 {
1166 aExt.hwm = extents[i].startBlkOffset;
1167 aExt.isNewExt = true;
1168 //cout << "adding a ext to metadata" << endl;
1169 }
1170 else
1171 {
1172 std::vector<DBRootExtentInfo> tmpExtentInfo;
1173 tmpExtentInfo = dbRootExtentTrackers[i]->getDBRootExtentList();
1174 aExt.isNewExt = false;
1175 aExt.hwm = tmpExtentInfo[currentDBrootIdx].fLocalHwm;
1176 //cout << "oid " << colStructList[i].dataOid << " gets hwm " << aExt.hwm << endl;
1177 }
1178
1179 aExt.current = true;
1180 aColExtsInfo.push_back(aExt);
1181 //cout << "get from extentinfo oid:hwm = " << colStructList[i].dataOid << ":" << aExt.hwm << endl;
1182 }
1183
1184 tableMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
1185 }
1186
1187 for (i = 0; i < dctnryStructList.size(); i++)
1188 {
1189 if (dctnryStructList[i].dctnryOid > 0)
1190 {
1191 ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(dctnryStructList[i].dctnryOid);
1192 ColExtsInfo::iterator it = aColExtsInfo.begin();
1193
1194 while (it != aColExtsInfo.end())
1195 {
1196 if ((it->dbRoot == dctnryStructList[i].fColDbRoot) && (it->partNum == dctnryStructList[i].fColPartition) && (it->segNum == dctnryStructList[i].fColSegment))
1197 break;
1198
1199 it++;
1200 }
1201
1202 if (it == aColExtsInfo.end()) //add this one to the list
1203 {
1204 ColExtInfo aExt;
1205 aExt.dbRoot = dctnryStructList[i].fColDbRoot;
1206 aExt.partNum = dctnryStructList[i].fColPartition;
1207 aExt.segNum = dctnryStructList[i].fColSegment;
1208 aExt.compType = dctnryStructList[i].fCompressionType;
1209 aExt.isDict = true;
1210 aColExtsInfo.push_back(aExt);
1211 }
1212
1213 tableMetaData->setColExtsInfo(dctnryStructList[i].dctnryOid, aColExtsInfo);
1214 }
1215 }
1216
1217 } // if (isFirstBatchPm)
1218 else //get the extent info from tableMetaData
1219 {
1220 ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[colId].dataOid);
1221 ColExtsInfo::iterator it = aColExtsInfo.begin();
1222
1223 while (it != aColExtsInfo.end())
1224 {
1225 if (it->current)
1226 break;
1227
1228 it++;
1229 }
1230
1231 if (it == aColExtsInfo.end())
1232 return 1;
1233
1234 for (i = 0; i < colStructList.size(); i++)
1235 {
1236 colStructList[i].fColPartition = it->partNum;
1237 colStructList[i].fColSegment = it->segNum;
1238 colStructList[i].fColDbRoot = it->dbRoot;
1239 dctnryStructList[i].fColPartition = it->partNum;
1240 dctnryStructList[i].fColSegment = it->segNum;
1241 dctnryStructList[i].fColDbRoot = it->dbRoot;
1242 }
1243 }
1244
1245 curTupleList = static_cast<ColTupleList>(colValueList[0]);
1246 totalRow = curTupleList.size();
1247 totalColumns = colStructList.size();
1248 rowIdArray = new RID[totalRow];
1249 // use scoped_array to ensure ptr deletion regardless of where we return
1250 boost::scoped_array<RID> rowIdArrayPtr(rowIdArray);
1251 memset(rowIdArray, 0, (sizeof(RID)*totalRow));
1252
1253 //--------------------------------------------------------------------------
1254 // allocate row id(s)
1255 //--------------------------------------------------------------------------
1256 curColStruct = colStructList[colId];
1257 colOp = m_colOp[op(curColStruct.fCompressionType)];
1258
1259 colOp->initColumn(curCol);
1260
1261 //Get the correct segment, partition, column file
1262 vector<ExtentInfo> colExtentInfo; //Save those empty extents in case of failure to rollback
1263 vector<ExtentInfo> dictExtentInfo; //Save those empty extents in case of failure to rollback
1264 vector<ExtentInfo> fileInfo;
1265 dbRoot = curColStruct.fColDbRoot;
1266 //use the first column to calculate row id
1267 ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[colId].dataOid);
1268 ColExtsInfo::iterator it = aColExtsInfo.begin();
1269
1270 while (it != aColExtsInfo.end())
1271 {
1272 if ((it->dbRoot == colStructList[colId].fColDbRoot) &&
1273 (it->partNum == colStructList[colId].fColPartition) &&
1274 (it->segNum == colStructList[colId].fColSegment) && it->current )
1275 {
1276 break;
1277 }
1278 it++;
1279 }
1280
1281 if (it != aColExtsInfo.end())
1282 {
1283 hwm = it->hwm;
1284 //cout << "Got from colextinfo hwm for oid " << colStructList[colId].dataOid << " is " << hwm << " and seg is " << colStructList[0].fColSegment << endl;
1285 }
1286
1287 oldHwm = hwm; //Save this info for rollback
1288 //need to pass real dbRoot, partition, and segment to setColParam
1289 colOp->setColParam(curCol, colId, curColStruct.colWidth, curColStruct.colDataType,
1290 curColStruct.colType, curColStruct.dataOid, curColStruct.fCompressionType,
1291 curColStruct.fColDbRoot, curColStruct.fColPartition, curColStruct.fColSegment);
1292 rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix); // @bug 5572 HDFS tmp file
1293
1294 if (rc != NO_ERROR)
1295 {
1296 return rc;
1297 }
1298
1299 //get hwm first
1300 // @bug 286 : fix for bug 286 - correct the typo in getHWM
1301 //RETURN_ON_ERROR(BRMWrapper::getInstance()->getHWM(curColStruct.dataOid, hwm));
1302
1303 Column newCol;
1304
1305 #ifdef PROFILE
1306 timer.start("allocRowId");
1307 #endif
1308 newColStructList = colStructList;
1309 newDctnryStructList = dctnryStructList;
1310 bool bUseStartExtent = true;
1311
1312 if (idbdatafile::IDBPolicy::useHdfs())
1313 insertSelect = true;
1314
1315 rc = colOp->allocRowId(txnid, bUseStartExtent,
1316 curCol, (uint64_t)totalRow, rowIdArray, hwm, newExtent, rowsLeft, newHwm, newFile,
1317 newColStructList, newDctnryStructList, dbRootExtentTrackers, insertSelect, true, tableOid, isFirstBatchPm);
1318
1319 //cout << "after allocrowid, total row = " <<totalRow << " newExtent is " << newExtent << endl;
1320 //cout << "column oid " << curColStruct.dataOid << " has hwm:newHwm = " << hwm <<":" << newHwm<< endl;
1321 if (rc != NO_ERROR) //Clean up is already done
1322 return rc;
1323
1324 #ifdef PROFILE
1325 timer.stop("allocRowId");
1326 #endif
1327
1328 //--------------------------------------------------------------------------
1329 // Expand initial abbreviated extent if any RID in 1st extent is > 256K.
1330 // if totalRow == rowsLeft, then not adding rows to 1st extent, so skip it.
1331 //--------------------------------------------------------------------------
1332 // DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated?
1333 if ((curCol.dataFile.fPartition == 0) &&
1334 (curCol.dataFile.fSegment == 0) &&
1335 ((totalRow - rowsLeft) > 0) &&
1336 (rowIdArray[totalRow - rowsLeft - 1] >= (RID)INITIAL_EXTENT_ROWS_TO_DISK))
1337 {
1338 for (unsigned k=0; k<colStructList.size(); k++)
1339 {
1340 // Skip the selected column
1341 if (k == colId)
1342 continue;
1343 Column expandCol;
1344 colOp = m_colOp[op(colStructList[k].fCompressionType)];
1345 colOp->setColParam(expandCol, 0,
1346 colStructList[k].colWidth,
1347 colStructList[k].colDataType,
1348 colStructList[k].colType,
1349 colStructList[k].dataOid,
1350 colStructList[k].fCompressionType,
1351 colStructList[k].fColDbRoot,
1352 colStructList[k].fColPartition,
1353 colStructList[k].fColSegment);
1354 rc = colOp->openColumnFile(expandCol, segFile, true); // @bug 5572 HDFS tmp file
1355
1356 if (rc == NO_ERROR)
1357 {
1358 if (colOp->abbreviatedExtent(expandCol.dataFile.pFile, colStructList[k].colWidth))
1359 {
1360 rc = colOp->expandAbbrevExtent(expandCol);
1361 }
1362 }
1363
1364 if (rc != NO_ERROR)
1365 {
1366 return rc;
1367 }
1368
1369 colOp->clearColumn(expandCol); // closes the file (if uncompressed)
1370 }
1371 }
1372
1373 //--------------------------------------------------------------------------
1374 // Tokenize data if needed
1375 //--------------------------------------------------------------------------
1376 if (insertSelect && isAutoCommitOn)
1377 BRMWrapper::setUseVb( false );
1378 else
1379 BRMWrapper::setUseVb( true );
1380
1381 dictStr::iterator dctStr_iter;
1382 ColTupleList::iterator col_iter;
1383
1384 for (i = 0; i < colStructList.size(); i++)
1385 {
1386 if (colStructList[i].tokenFlag)
1387 {
1388 dctStr_iter = dictStrList[i].begin();
1389 col_iter = colValueList[i].begin();
1390 Dctnry* dctnry = m_dctnry[op(dctnryStructList[i].fCompressionType)];
1391 rc = dctnry->openDctnry(dctnryStructList[i].dctnryOid,
1392 dctnryStructList[i].fColDbRoot, dctnryStructList[i].fColPartition,
1393 dctnryStructList[i].fColSegment,
1394 useTmpSuffix); // @bug 5572 HDFS tmp file
1395
1396 if (rc != NO_ERROR)
1397 {
1398 cout << "Error opening dctnry file " << dctnryStructList[i].dctnryOid << endl;
1399 return rc;
1400 }
1401
1402 for (uint32_t rows = 0; rows < (totalRow - rowsLeft); rows++)
1403 {
1404 if (dctStr_iter->length() == 0)
1405 {
1406 Token nullToken;
1407 col_iter->data = nullToken;
1408 }
1409 else
1410 {
1411 #ifdef PROFILE
1412 timer.start("tokenize");
1413 #endif
1414 DctnryTuple dctTuple;
1415 dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str();
1416 dctTuple.sigSize = dctStr_iter->length();
1417 dctTuple.isNull = false;
1418 rc = tokenize(txnid, dctTuple, dctnryStructList[i].fCompressionType);
1419
1420 if (rc != NO_ERROR)
1421 {
1422 dctnry->closeDctnry();
1423 return rc;
1424 }
1425
1426 #ifdef PROFILE
1427 timer.stop("tokenize");
1428 #endif
1429 col_iter->data = dctTuple.token;
1430 }
1431
1432 dctStr_iter++;
1433 col_iter++;
1434
1435 }
1436
1437 //close dictionary files
1438 rc = dctnry->closeDctnry(false);
1439
1440 if (rc != NO_ERROR)
1441 return rc;
1442
1443 if (newExtent)
1444 {
1445 //@Bug 4854 back up hwm chunk for the file to be modified
1446 if (fRBMetaWriter)
1447 fRBMetaWriter->backupDctnryHWMChunk(newDctnryStructList[i].dctnryOid, newDctnryStructList[i].fColDbRoot, newDctnryStructList[i].fColPartition, newDctnryStructList[i].fColSegment);
1448
1449 rc = dctnry->openDctnry(newDctnryStructList[i].dctnryOid,
1450 newDctnryStructList[i].fColDbRoot, newDctnryStructList[i].fColPartition,
1451 newDctnryStructList[i].fColSegment,
1452 false); // @bug 5572 HDFS tmp file
1453
1454 if (rc != NO_ERROR)
1455 return rc;
1456
1457 for (uint32_t rows = 0; rows < rowsLeft; rows++)
1458 {
1459 if (dctStr_iter->length() == 0)
1460 {
1461 Token nullToken;
1462 col_iter->data = nullToken;
1463 }
1464 else
1465 {
1466 #ifdef PROFILE
1467 timer.start("tokenize");
1468 #endif
1469 DctnryTuple dctTuple;
1470 dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str();
1471 dctTuple.sigSize = dctStr_iter->length();
1472 dctTuple.isNull = false;
1473 rc = tokenize(txnid, dctTuple, newDctnryStructList[i].fCompressionType);
1474
1475 if (rc != NO_ERROR)
1476 {
1477 dctnry->closeDctnry();
1478 return rc;
1479 }
1480
1481 #ifdef PROFILE
1482 timer.stop("tokenize");
1483 #endif
1484 col_iter->data = dctTuple.token;
1485 }
1486
1487 dctStr_iter++;
1488 col_iter++;
1489 }
1490
1491 //close dictionary files
1492 rc = dctnry->closeDctnry(false);
1493
1494 if (rc != NO_ERROR)
1495 return rc;
1496 }
1497 }
1498 }
1499
1500 if (insertSelect && isAutoCommitOn)
1501 BRMWrapper::setUseVb( false );
1502 else
1503 BRMWrapper::setUseVb( true );
1504
1505 //--------------------------------------------------------------------------
1506 // Update column info structure @Bug 1862 set hwm, and
1507 // Prepare ValueList for new extent (if applicable)
1508 //--------------------------------------------------------------------------
1509 //@Bug 2205 Check whether all rows go to the new extent
1510 RID lastRid = 0;
1511 RID lastRidNew = 0;
1512
1513 if (totalRow - rowsLeft > 0)
1514 {
1515 lastRid = rowIdArray[totalRow - rowsLeft - 1];
1516 lastRidNew = rowIdArray[totalRow - 1];
1517 }
1518 else
1519 {
1520 lastRid = 0;
1521 lastRidNew = rowIdArray[totalRow - 1];
1522 }
1523
1524 //cout << "rowid allocated is " << lastRid << endl;
1525 //if a new extent is created, all the columns in this table should have their own new extent
1526 //First column already processed
1527
1528 //@Bug 1701. Close the file (if uncompressed)
1529 m_colOp[op(curCol.compressionType)]->clearColumn(curCol);
1530 //cout << "Saving hwm info for new ext batch" << endl;
1531 //Update hwm to set them in the end
1532 bool succFlag = false;
1533 unsigned colWidth = 0;
1534 int curFbo = 0, curBio;
1535
1536 for (i = 0; i < totalColumns; i++)
1537 {
1538 //shoud be obtained from saved hwm
1539 aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[i].dataOid);
1540 it = aColExtsInfo.begin();
1541
1542 while (it != aColExtsInfo.end())
1543 {
1544 if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition)
1545 && (it->segNum == colStructList[i].fColSegment) && it->current)
1546 break;
1547
1548 it++;
1549 }
1550
1551 if (it != aColExtsInfo.end()) //update hwm info
1552 {
1553 oldHwm = it->hwm;
1554 }
1555
1556 // save hwm for the old extent
1557 colWidth = colStructList[i].colWidth;
1558 succFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);
1559
1560 //cout << "insertcolumnrec oid:rid:fbo:oldhwm = " << colStructList[i].dataOid << ":" << lastRid << ":" << curFbo << ":" << oldHwm << endl;
1561 if (succFlag)
1562 {
1563 if ((HWM)curFbo >= oldHwm)
1564 {
1565 it->hwm = (HWM)curFbo;
1566 }
1567
1568 //@Bug 4947. set current to false for old extent.
1569 if (newExtent)
1570 {
1571 it->current = false;
1572 }
1573
1574 //cout << "updated old ext info for oid " << colStructList[i].dataOid << " dbroot:part:seg:hwm:current = "
1575 //<< it->dbRoot<<":"<<it->partNum<<":"<<it->segNum<<":"<<it->hwm<<":"<< it->current<< " and newExtent is " << newExtent << endl;
1576 }
1577 else
1578 return ERR_INVALID_PARAM;
1579
1580 //update hwm for the new extent
1581 if (newExtent)
1582 {
1583 it = aColExtsInfo.begin();
1584
1585 while (it != aColExtsInfo.end())
1586 {
1587 if ((it->dbRoot == newColStructList[i].fColDbRoot) && (it->partNum == newColStructList[i].fColPartition)
1588 && (it->segNum == newColStructList[i].fColSegment) && it->current)
1589 break;
1590
1591 it++;
1592 }
1593
1594 succFlag = colOp->calculateRowId(lastRidNew, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);
1595
1596 if (succFlag)
1597 {
1598 if (it != aColExtsInfo.end())
1599 {
1600 it->hwm = (HWM)curFbo;
1601 //cout << "setting hwm to " << (int)curFbo <<" for seg " <<it->segNum << endl;
1602 it->current = true;
1603 }
1604 }
1605 else
1606 return ERR_INVALID_PARAM;
1607 }
1608
1609 tableMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
1610 }
1611
1612 //--------------------------------------------------------------------------
1613 //Prepare the valuelist for the new extent
1614 //--------------------------------------------------------------------------
1615 ColTupleList colTupleList;
1616 ColTupleList newColTupleList;
1617 ColTupleList firstPartTupleList;
1618
1619 for (unsigned i = 0; i < totalColumns; i++)
1620 {
1621 colTupleList = static_cast<ColTupleList>(colValueList[i]);
1622
1623 for (uint64_t j = rowsLeft; j > 0; j--)
1624 {
1625 newColTupleList.push_back(colTupleList[totalRow - j]);
1626 }
1627
1628 colNewValueList.push_back(newColTupleList);
1629 newColTupleList.clear();
1630
1631 //upate the oldvalue list for the old extent
1632 for (uint64_t j = 0; j < (totalRow - rowsLeft); j++)
1633 {
1634 firstPartTupleList.push_back(colTupleList[j]);
1635 }
1636
1637 colOldValueList.push_back(firstPartTupleList);
1638 firstPartTupleList.clear();
1639 }
1640
1641 // end of allocate row id
1642
1643 #ifdef PROFILE
1644 timer.start("writeColumnRec");
1645 #endif
1646 //cout << "Writing column record" << endl;
1647
1648 if (rc == NO_ERROR)
1649 {
1650 //----------------------------------------------------------------------
1651 //Mark extents invalid
1652 //----------------------------------------------------------------------
1653 vector<BRM::LBID_t> lbids;
1654 vector<CalpontSystemCatalog::ColDataType> colDataTypes;
1655 bool successFlag = true;
1656 unsigned width = 0;
1657 int curFbo = 0, curBio, lastFbo = -1;
1658
1659 if (isFirstBatchPm && (totalRow == rowsLeft))
1660 {}
1661 else
1662 {
1663 for (unsigned i = 0; i < colStructList.size(); i++)
1664 {
1665 colOp = m_colOp[op(colStructList[i].fCompressionType)];
1666 width = colStructList[i].colWidth;
1667 successFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / width, width, curFbo, curBio);
1668
1669 if (successFlag)
1670 {
1671 if (curFbo != lastFbo)
1672 {
1673 RETURN_ON_ERROR(AddLBIDtoList(txnid,
1674 lbids,
1675 colDataTypes,
1676 colStructList[i],
1677 curFbo));
1678 }
1679 }
1680 }
1681 }
1682
1683 if (lbids.size() > 0)
1684 rc = BRMWrapper::getInstance()->markExtentsInvalid(lbids, colDataTypes);
1685
1686 //----------------------------------------------------------------------
1687 // Write row(s) to database file(s)
1688 //----------------------------------------------------------------------
1689 rc = writeColumnRec(txnid, colStructList, colOldValueList, rowIdArray, newColStructList, colNewValueList, tableOid, useTmpSuffix); // @bug 5572 HDFS tmp file
1690
1691 if (rc == NO_ERROR)
1692 {
1693 if (dctnryStructList.size() > 0)
1694 {
1695 vector<BRM::OID_t> oids {static_cast<int32_t>(tableOid)};
1696 for (const DctnryStruct &dctnryStruct : dctnryStructList)
1697 {
1698 oids.push_back(dctnryStruct.dctnryOid);
1699 }
1700
1701 rc = flushOIDsFromCache(oids);
1702
1703 if (rc != 0)
1704 rc = ERR_BLKCACHE_FLUSH_LIST; // translate to WE error
1705 }
1706 }
1707 }
1708
1709 return rc;
1710 }
1711
insertColumnRecsBinary(const TxnID & txnid,ColStructList & colStructList,std::vector<uint64_t> & colValueList,DctnryStructList & dctnryStructList,DictStrList & dictStrList,std::vector<boost::shared_ptr<DBRootExtentTracker>> & dbRootExtentTrackers,RBMetaWriter * fRBMetaWriter,bool bFirstExtentOnThisPM,bool insertSelect,bool isAutoCommitOn,OID tableOid,bool isFirstBatchPm)1712 int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
1713 ColStructList& colStructList,
1714 std::vector<uint64_t>& colValueList,
1715 DctnryStructList& dctnryStructList,
1716 DictStrList& dictStrList,
1717 std::vector<boost::shared_ptr<DBRootExtentTracker> >& dbRootExtentTrackers,
1718 RBMetaWriter* fRBMetaWriter,
1719 bool bFirstExtentOnThisPM,
1720 bool insertSelect,
1721 bool isAutoCommitOn,
1722 OID tableOid,
1723 bool isFirstBatchPm)
1724 {
1725 int rc;
1726 RID* rowIdArray = NULL;
1727 Column curCol;
1728 ColStruct curColStruct;
1729 ColStructList newColStructList;
1730 std::vector<uint64_t> colNewValueList;
1731 DctnryStructList newDctnryStructList;
1732 HWM hwm = 0;
1733 HWM oldHwm = 0;
1734 HWM newHwm = 0;
1735 size_t totalRow;
1736 ColStructList::size_type totalColumns;
1737 uint64_t rowsLeft = 0;
1738 bool newExtent = false;
1739 RIDList ridList;
1740 ColumnOp* colOp = NULL;
1741 std::vector<BRM::LBID_t> dictLbids;
1742
1743 // Set tmp file suffix to modify HDFS db file
1744 bool useTmpSuffix = false;
1745
1746 if (idbdatafile::IDBPolicy::useHdfs())
1747 {
1748 if (!bFirstExtentOnThisPM)
1749 useTmpSuffix = true;
1750 }
1751
1752 unsigned i = 0;
1753 #ifdef PROFILE
1754 StopWatch timer;
1755 #endif
1756
1757 //Convert data type and column width to write engine specific
1758 for (i = 0; i < colStructList.size(); i++)
1759 Convertor::convertColType(&colStructList[i]);
1760
1761 uint32_t colId = 0;
1762 // MCOL-1675: find the smallest column width to calculate the RowID from so
1763 // that all HWMs will be incremented by this operation
1764 findSmallestColumn(colId, colStructList);
1765
1766
1767 // rc = checkValid(txnid, colStructList, colValueList, ridList);
1768 // if (rc != NO_ERROR)
1769 // return rc;
1770
1771 setTransId(txnid);
1772 uint16_t dbRoot, segmentNum;
1773 uint32_t partitionNum;
1774 string segFile;
1775 bool newFile;
1776 TableMetaData* tableMetaData = TableMetaData::makeTableMetaData(tableOid);
1777 //populate colStructList with file information
1778 IDBDataFile* pFile = NULL;
1779 std::vector<DBRootExtentInfo> extentInfo;
1780 int currentDBrootIdx = 0;
1781 std::vector<BRM::CreateStripeColumnExtentsArgOut> extents;
1782
1783 //--------------------------------------------------------------------------
1784 // For first batch on this PM:
1785 // o get starting extent from ExtentTracker, and allocate extent if needed
1786 // o construct colStructList and dctnryStructList accordingly
1787 // o save extent information in tableMetaData for future use
1788 // If not first batch on this PM:
1789 // o construct colStructList and dctnryStructList from tableMetaData
1790 //--------------------------------------------------------------------------
1791 if (isFirstBatchPm)
1792 {
1793 currentDBrootIdx = dbRootExtentTrackers[colId]->getCurrentDBRootIdx();
1794 extentInfo = dbRootExtentTrackers[colId]->getDBRootExtentList();
1795 dbRoot = extentInfo[currentDBrootIdx].fDbRoot;
1796 partitionNum = extentInfo[currentDBrootIdx].fPartition;
1797
1798 //----------------------------------------------------------------------
1799 // check whether this extent is the first on this PM
1800 //----------------------------------------------------------------------
1801 if (bFirstExtentOnThisPM)
1802 {
1803 //cout << "bFirstExtentOnThisPM is " << bFirstExtentOnThisPM << endl;
1804 std::vector<BRM::CreateStripeColumnExtentsArgIn> cols;
1805 BRM::CreateStripeColumnExtentsArgIn createStripeColumnExtentsArgIn;
1806
1807 for (i = 0; i < colStructList.size(); i++)
1808 {
1809 createStripeColumnExtentsArgIn.oid = colStructList[i].dataOid;
1810 createStripeColumnExtentsArgIn.width = colStructList[i].colWidth;
1811 createStripeColumnExtentsArgIn.colDataType = colStructList[i].colDataType;
1812 cols.push_back(createStripeColumnExtentsArgIn);
1813 }
1814
1815 rc = BRMWrapper::getInstance()->allocateStripeColExtents(cols, dbRoot, partitionNum, segmentNum, extents);
1816
1817 if (rc != NO_ERROR)
1818 return rc;
1819
1820 //Create column files
1821 BRM::CPInfoList_t cpinfoList;
1822 BRM::CPInfo cpInfo;
1823
1824 if (isUnsigned(colStructList[i].colDataType))
1825 {
1826 cpInfo.max = 0;
1827 cpInfo.min = static_cast<int64_t>(numeric_limits<uint64_t>::max());
1828 }
1829 else
1830 {
1831 cpInfo.max = numeric_limits<int64_t>::min();
1832 cpInfo.min = numeric_limits<int64_t>::max();
1833 }
1834
1835 cpInfo.seqNum = -1;
1836
1837 for ( i = 0; i < extents.size(); i++)
1838 {
1839 colOp = m_colOp[op(colStructList[i].fCompressionType)];
1840 colOp->initColumn(curCol);
1841 colOp->setColParam(curCol, 0, colStructList[i].colWidth, colStructList[i].colDataType,
1842 colStructList[i].colType, colStructList[i].dataOid, colStructList[i].fCompressionType,
1843 dbRoot, partitionNum, segmentNum);
1844 rc = colOp->extendColumn(curCol, false, extents[i].startBlkOffset, extents[i].startLbid, extents[i].allocSize, dbRoot,
1845 partitionNum, segmentNum, segFile, pFile, newFile);
1846
1847 if (rc != NO_ERROR)
1848 return rc;
1849
1850 //mark the extents to invalid
1851 cpInfo.firstLbid = extents[i].startLbid;
1852 cpinfoList.push_back(cpInfo);
1853 colStructList[i].fColPartition = partitionNum;
1854 colStructList[i].fColSegment = segmentNum;
1855 colStructList[i].fColDbRoot = dbRoot;
1856 dctnryStructList[i].fColPartition = partitionNum;
1857 dctnryStructList[i].fColSegment = segmentNum;
1858 dctnryStructList[i].fColDbRoot = dbRoot;
1859 }
1860
1861 //mark the extents to invalid
1862 rc = BRMWrapper::getInstance()->setExtentsMaxMin(cpinfoList);
1863
1864 if (rc != NO_ERROR)
1865 return rc;
1866
1867 //create corresponding dictionary files
1868 for (i = 0; i < dctnryStructList.size(); i++)
1869 {
1870 if (dctnryStructList[i].dctnryOid > 0)
1871 {
1872 rc = createDctnry(txnid, dctnryStructList[i].dctnryOid, dctnryStructList[i].colWidth, dbRoot, partitionNum,
1873 segmentNum, dctnryStructList[i].fCompressionType);
1874
1875 if ( rc != NO_ERROR)
1876 return rc;
1877 }
1878 }
1879 } // if ( bFirstExtentOnThisPM)
1880 else // if (!bFirstExtentOnThisPM)
1881 {
1882 std::vector<DBRootExtentInfo> tmpExtentInfo;
1883
1884 for (i = 0; i < dbRootExtentTrackers.size(); i++)
1885 {
1886 tmpExtentInfo = dbRootExtentTrackers[i]->getDBRootExtentList();
1887 colStructList[i].fColPartition = tmpExtentInfo[currentDBrootIdx].fPartition;
1888 colStructList[i].fColSegment = tmpExtentInfo[currentDBrootIdx].fSegment;
1889 colStructList[i].fColDbRoot = tmpExtentInfo[currentDBrootIdx].fDbRoot;
1890 //cout << "Load from dbrootExtenttracker oid:dbroot:part:seg = " <<colStructList[i].dataOid<<":"
1891 //<<colStructList[i].fColDbRoot<<":"<<colStructList[i].fColPartition<<":"<<colStructList[i].fColSegment<<endl;
1892 dctnryStructList[i].fColPartition = tmpExtentInfo[currentDBrootIdx].fPartition;
1893 dctnryStructList[i].fColSegment = tmpExtentInfo[currentDBrootIdx].fSegment;
1894 dctnryStructList[i].fColDbRoot = tmpExtentInfo[currentDBrootIdx].fDbRoot;
1895 }
1896 }
1897
1898 //----------------------------------------------------------------------
1899 // Save the extents info in tableMetaData
1900 //----------------------------------------------------------------------
1901 for (i = 0; i < colStructList.size(); i++)
1902 {
1903 ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[i].dataOid);
1904 ColExtsInfo::iterator it = aColExtsInfo.begin();
1905
1906 while (it != aColExtsInfo.end())
1907 {
1908 if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment))
1909 break;
1910
1911 it++;
1912 }
1913
1914 if (it == aColExtsInfo.end()) //add this one to the list
1915 {
1916 ColExtInfo aExt;
1917 aExt.dbRoot = colStructList[i].fColDbRoot;
1918 aExt.partNum = colStructList[i].fColPartition;
1919 aExt.segNum = colStructList[i].fColSegment;
1920 aExt.compType = colStructList[i].fCompressionType;
1921 aExt.isDict = false;
1922
1923 if (bFirstExtentOnThisPM)
1924 {
1925 aExt.hwm = extents[i].startBlkOffset;
1926 aExt.isNewExt = true;
1927 //cout << "adding a ext to metadata" << endl;
1928 }
1929 else
1930 {
1931 std::vector<DBRootExtentInfo> tmpExtentInfo;
1932 tmpExtentInfo = dbRootExtentTrackers[i]->getDBRootExtentList();
1933 aExt.isNewExt = false;
1934 aExt.hwm = tmpExtentInfo[currentDBrootIdx].fLocalHwm;
1935 //cout << "oid " << colStructList[i].dataOid << " gets hwm " << aExt.hwm << endl;
1936 }
1937
1938 aExt.current = true;
1939 aColExtsInfo.push_back(aExt);
1940 //cout << "get from extentinfo oid:hwm = " << colStructList[i].dataOid << ":" << aExt.hwm << endl;
1941 }
1942
1943 tableMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
1944 }
1945
1946 for (i = 0; i < dctnryStructList.size(); i++)
1947 {
1948 if (dctnryStructList[i].dctnryOid > 0)
1949 {
1950 ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(dctnryStructList[i].dctnryOid);
1951 ColExtsInfo::iterator it = aColExtsInfo.begin();
1952
1953 while (it != aColExtsInfo.end())
1954 {
1955 if ((it->dbRoot == dctnryStructList[i].fColDbRoot) && (it->partNum == dctnryStructList[i].fColPartition) && (it->segNum == dctnryStructList[i].fColSegment))
1956 break;
1957
1958 it++;
1959 }
1960
1961 if (it == aColExtsInfo.end()) //add this one to the list
1962 {
1963 ColExtInfo aExt;
1964 aExt.dbRoot = dctnryStructList[i].fColDbRoot;
1965 aExt.partNum = dctnryStructList[i].fColPartition;
1966 aExt.segNum = dctnryStructList[i].fColSegment;
1967 aExt.compType = dctnryStructList[i].fCompressionType;
1968 aExt.isDict = true;
1969 aColExtsInfo.push_back(aExt);
1970 }
1971
1972 tableMetaData->setColExtsInfo(dctnryStructList[i].dctnryOid, aColExtsInfo);
1973 }
1974 }
1975
1976 } // if (isFirstBatchPm)
1977 else //get the extent info from tableMetaData
1978 {
1979 ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[colId].dataOid);
1980 ColExtsInfo::iterator it = aColExtsInfo.begin();
1981
1982 while (it != aColExtsInfo.end())
1983 {
1984 if (it->current)
1985 break;
1986
1987 it++;
1988 }
1989
1990 if (it == aColExtsInfo.end())
1991 return 1;
1992
1993 for (i = 0; i < colStructList.size(); i++)
1994 {
1995 colStructList[i].fColPartition = it->partNum;
1996 colStructList[i].fColSegment = it->segNum;
1997 colStructList[i].fColDbRoot = it->dbRoot;
1998 dctnryStructList[i].fColPartition = it->partNum;
1999 dctnryStructList[i].fColSegment = it->segNum;
2000 dctnryStructList[i].fColDbRoot = it->dbRoot;
2001 }
2002 }
2003
2004 totalColumns = colStructList.size();
2005 totalRow = colValueList.size() / totalColumns;
2006 rowIdArray = new RID[totalRow];
2007 // use scoped_array to ensure ptr deletion regardless of where we return
2008 boost::scoped_array<RID> rowIdArrayPtr(rowIdArray);
2009 memset(rowIdArray, 0, (sizeof(RID)*totalRow));
2010
2011 //--------------------------------------------------------------------------
2012 // allocate row id(s)
2013 //--------------------------------------------------------------------------
2014
2015 curColStruct = colStructList[colId];
2016
2017 colOp = m_colOp[op(curColStruct.fCompressionType)];
2018
2019 colOp->initColumn(curCol);
2020
2021 //Get the correct segment, partition, column file
2022 vector<ExtentInfo> colExtentInfo; //Save those empty extents in case of failure to rollback
2023 vector<ExtentInfo> dictExtentInfo; //Save those empty extents in case of failure to rollback
2024 vector<ExtentInfo> fileInfo;
2025 dbRoot = curColStruct.fColDbRoot;
2026 //use the smallest column to calculate row id
2027 ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[colId].dataOid);
2028 ColExtsInfo::iterator it = aColExtsInfo.begin();
2029
2030 while (it != aColExtsInfo.end())
2031 {
2032 if ((it->dbRoot == colStructList[colId].fColDbRoot) && (it->partNum == colStructList[colId].fColPartition) && (it->segNum == colStructList[colId].fColSegment) && it->current )
2033 break;
2034
2035 it++;
2036 }
2037
2038 if (it != aColExtsInfo.end())
2039 {
2040 hwm = it->hwm;
2041 //cout << "Got from colextinfo hwm for oid " << colStructList[colId].dataOid << " is " << hwm << " and seg is " << colStructList[colId].fColSegment << endl;
2042 }
2043
2044 oldHwm = hwm; //Save this info for rollback
2045 //need to pass real dbRoot, partition, and segment to setColParam
2046 colOp->setColParam(curCol, colId, curColStruct.colWidth, curColStruct.colDataType,
2047 curColStruct.colType, curColStruct.dataOid, curColStruct.fCompressionType,
2048 curColStruct.fColDbRoot, curColStruct.fColPartition, curColStruct.fColSegment);
2049 rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix); // @bug 5572 HDFS tmp file
2050
2051 if (rc != NO_ERROR)
2052 {
2053 return rc;
2054 }
2055
2056 //get hwm first
2057 // @bug 286 : fix for bug 286 - correct the typo in getHWM
2058 //RETURN_ON_ERROR(BRMWrapper::getInstance()->getHWM(curColStruct.dataOid, hwm));
2059
2060 Column newCol;
2061
2062 #ifdef PROFILE
2063 timer.start("allocRowId");
2064 #endif
2065 newColStructList = colStructList;
2066 newDctnryStructList = dctnryStructList;
2067 bool bUseStartExtent = true;
2068
2069 if (idbdatafile::IDBPolicy::useHdfs())
2070 insertSelect = true;
2071
2072 rc = colOp->allocRowId(txnid, bUseStartExtent,
2073 curCol, (uint64_t)totalRow, rowIdArray, hwm, newExtent, rowsLeft, newHwm, newFile,
2074 newColStructList, newDctnryStructList, dbRootExtentTrackers, insertSelect, true, tableOid, isFirstBatchPm);
2075
2076 //cout << "after allocrowid, total row = " <<totalRow << " newExtent is " << newExtent << endl;
2077 // cout << "column oid " << curColStruct.dataOid << " has hwm:newHwm = " << hwm <<":" << newHwm<< endl;
2078 if (rc != NO_ERROR) //Clean up is already done
2079 return rc;
2080
2081 #ifdef PROFILE
2082 timer.stop("allocRowId");
2083 #endif
2084
2085 //--------------------------------------------------------------------------
2086 // Expand initial abbreviated extent if any RID in 1st extent is > 256K.
2087 // if totalRow == rowsLeft, then not adding rows to 1st extent, so skip it.
2088 //--------------------------------------------------------------------------
2089 // DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated?
2090 if ((curCol.dataFile.fPartition == 0) &&
2091 (curCol.dataFile.fSegment == 0) &&
2092 ((totalRow - rowsLeft) > 0) &&
2093 (rowIdArray[totalRow - rowsLeft - 1] >= (RID)INITIAL_EXTENT_ROWS_TO_DISK))
2094 {
2095 for (size_t k = 0; k < colStructList.size(); k++)
2096 {
2097 // Skip the selected column
2098 if (k == (size_t)colId)
2099 continue;
2100
2101 Column expandCol;
2102 colOp = m_colOp[op(colStructList[k].fCompressionType)];
2103 // Shouldn't we change 0 to colId here?
2104 colOp->setColParam(expandCol, 0,
2105 colStructList[k].colWidth,
2106 colStructList[k].colDataType,
2107 colStructList[k].colType,
2108 colStructList[k].dataOid,
2109 colStructList[k].fCompressionType,
2110 colStructList[k].fColDbRoot,
2111 colStructList[k].fColPartition,
2112 colStructList[k].fColSegment);
2113 rc = colOp->openColumnFile(expandCol, segFile, true); // @bug 5572 HDFS tmp file
2114
2115 if (rc == NO_ERROR)
2116 {
2117 if (colOp->abbreviatedExtent(expandCol.dataFile.pFile, colStructList[k].colWidth))
2118 {
2119 rc = colOp->expandAbbrevExtent(expandCol);
2120 }
2121 }
2122
2123 if (rc != NO_ERROR)
2124 {
2125 return rc;
2126 }
2127
2128 colOp->closeColumnFile(expandCol);
2129 }
2130 }
2131
2132 //--------------------------------------------------------------------------
2133 // Tokenize data if needed
2134 //--------------------------------------------------------------------------
2135 if (insertSelect && isAutoCommitOn)
2136 BRMWrapper::setUseVb( false );
2137 else
2138 BRMWrapper::setUseVb( true );
2139
2140 dictStr::iterator dctStr_iter;
2141 uint64_t* colValPtr;
2142 size_t rowsPerColumn = colValueList.size() / colStructList.size();
2143
2144 for (i = 0; i < colStructList.size(); i++)
2145 {
2146 if (colStructList[i].tokenFlag)
2147 {
2148 dctStr_iter = dictStrList[i].begin();
2149 Dctnry* dctnry = m_dctnry[op(dctnryStructList[i].fCompressionType)];
2150 rc = dctnry->openDctnry(dctnryStructList[i].dctnryOid,
2151 dctnryStructList[i].fColDbRoot, dctnryStructList[i].fColPartition,
2152 dctnryStructList[i].fColSegment,
2153 useTmpSuffix); // @bug 5572 HDFS tmp file
2154
2155 if (rc != NO_ERROR)
2156 {
2157 cout << "Error opening dctnry file " << dctnryStructList[i].dctnryOid << endl;
2158 return rc;
2159 }
2160
2161 for (uint32_t rows = 0; rows < (totalRow - rowsLeft); rows++)
2162 {
2163 colValPtr = &colValueList[(i * rowsPerColumn) + rows];
2164
2165 if (dctStr_iter->length() == 0)
2166 {
2167 Token nullToken;
2168 memcpy(colValPtr, &nullToken, 8);
2169 }
2170 else
2171 {
2172 #ifdef PROFILE
2173 timer.start("tokenize");
2174 #endif
2175 DctnryTuple dctTuple;
2176 dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str();
2177 dctTuple.sigSize = dctStr_iter->length();
2178 dctTuple.isNull = false;
2179 rc = tokenize(txnid, dctTuple, dctnryStructList[i].fCompressionType);
2180
2181 if (rc != NO_ERROR)
2182 {
2183 dctnry->closeDctnry();
2184 return rc;
2185 }
2186
2187 #ifdef PROFILE
2188 timer.stop("tokenize");
2189 #endif
2190 memcpy(colValPtr, &dctTuple.token, 8);
2191 dictLbids.push_back(dctTuple.token.fbo);
2192 }
2193
2194 dctStr_iter++;
2195
2196 }
2197
2198 //close dictionary files
2199 rc = dctnry->closeDctnry(false);
2200
2201 if (rc != NO_ERROR)
2202 return rc;
2203
2204 if (newExtent)
2205 {
2206 //@Bug 4854 back up hwm chunk for the file to be modified
2207 if (fRBMetaWriter)
2208 fRBMetaWriter->backupDctnryHWMChunk(newDctnryStructList[i].dctnryOid, newDctnryStructList[i].fColDbRoot, newDctnryStructList[i].fColPartition, newDctnryStructList[i].fColSegment);
2209
2210 rc = dctnry->openDctnry(newDctnryStructList[i].dctnryOid,
2211 newDctnryStructList[i].fColDbRoot, newDctnryStructList[i].fColPartition,
2212 newDctnryStructList[i].fColSegment,
2213 false); // @bug 5572 HDFS tmp file
2214
2215 if (rc != NO_ERROR)
2216 return rc;
2217
2218 for (uint32_t rows = 0; rows < rowsLeft; rows++)
2219 {
2220 colValPtr = &colValueList[(i * rowsPerColumn) + rows];
2221
2222 if (dctStr_iter->length() == 0)
2223 {
2224 Token nullToken;
2225 memcpy(colValPtr, &nullToken, 8);
2226 }
2227 else
2228 {
2229 #ifdef PROFILE
2230 timer.start("tokenize");
2231 #endif
2232 DctnryTuple dctTuple;
2233 dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str();
2234 dctTuple.sigSize = dctStr_iter->length();
2235 dctTuple.isNull = false;
2236 rc = tokenize(txnid, dctTuple, newDctnryStructList[i].fCompressionType);
2237
2238 if (rc != NO_ERROR)
2239 {
2240 dctnry->closeDctnry();
2241 return rc;
2242 }
2243
2244 #ifdef PROFILE
2245 timer.stop("tokenize");
2246 #endif
2247 memcpy(colValPtr, &dctTuple.token, 8);
2248 dictLbids.push_back(dctTuple.token.fbo);
2249 }
2250
2251 dctStr_iter++;
2252 }
2253
2254 //close dictionary files
2255 rc = dctnry->closeDctnry(false);
2256
2257 if (rc != NO_ERROR)
2258 return rc;
2259 }
2260 }
2261 }
2262
2263 if (insertSelect && isAutoCommitOn)
2264 BRMWrapper::setUseVb( false );
2265 else
2266 BRMWrapper::setUseVb( true );
2267
2268 //--------------------------------------------------------------------------
2269 // Update column info structure @Bug 1862 set hwm, and
2270 // Prepare ValueList for new extent (if applicable)
2271 //--------------------------------------------------------------------------
2272 //@Bug 2205 Check whether all rows go to the new extent
2273 RID lastRid = 0;
2274 RID lastRidNew = 0;
2275
2276 if (totalRow - rowsLeft > 0)
2277 {
2278 lastRid = rowIdArray[totalRow - rowsLeft - 1];
2279 lastRidNew = rowIdArray[totalRow - 1];
2280 }
2281 else
2282 {
2283 lastRid = 0;
2284 lastRidNew = rowIdArray[totalRow - 1];
2285 }
2286
2287 //cout << "rowid allocated is " << lastRid << endl;
2288 //if a new extent is created, all the columns in this table should have their own new extent
2289 //First column already processed
2290
2291 //@Bug 1701. Close the file (if uncompressed)
2292 m_colOp[op(curCol.compressionType)]->closeColumnFile(curCol);
2293 //cout << "Saving hwm info for new ext batch" << endl;
2294 //Update hwm to set them in the end
2295 bool succFlag = false;
2296 unsigned colWidth = 0;
2297 int curFbo = 0, curBio;
2298
2299 for (i = 0; i < totalColumns; i++)
2300 {
2301 //shoud be obtained from saved hwm
2302 aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[i].dataOid);
2303 it = aColExtsInfo.begin();
2304
2305 while (it != aColExtsInfo.end())
2306 {
2307 if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition)
2308 && (it->segNum == colStructList[i].fColSegment) && it->current)
2309 break;
2310
2311 it++;
2312 }
2313
2314 if (it != aColExtsInfo.end()) //update hwm info
2315 {
2316 oldHwm = it->hwm;
2317
2318 // save hwm for the old extent
2319 colWidth = colStructList[i].colWidth;
2320 succFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);
2321
2322 //cout << "insertcolumnrec oid:rid:fbo:oldhwm = " << colStructList[i].dataOid << ":" << lastRid << ":" << curFbo << ":" << oldHwm << endl;
2323 if (succFlag)
2324 {
2325 if ((HWM)curFbo >= oldHwm)
2326 {
2327 it->hwm = (HWM)curFbo;
2328 }
2329
2330 //@Bug 4947. set current to false for old extent.
2331 if (newExtent)
2332 {
2333 it->current = false;
2334 }
2335
2336 //cout << "updated old ext info for oid " << colStructList[i].dataOid << " dbroot:part:seg:hwm:current = "
2337 //<< it->dbRoot<<":"<<it->partNum<<":"<<it->segNum<<":"<<it->hwm<<":"<< it->current<< " and newExtent is " << newExtent << endl;
2338 }
2339 else
2340 return ERR_INVALID_PARAM;
2341
2342 }
2343
2344 //update hwm for the new extent
2345 if (newExtent)
2346 {
2347 it = aColExtsInfo.begin();
2348
2349 while (it != aColExtsInfo.end())
2350 {
2351 if ((it->dbRoot == newColStructList[i].fColDbRoot) && (it->partNum == newColStructList[i].fColPartition)
2352 && (it->segNum == newColStructList[i].fColSegment) && it->current)
2353 break;
2354
2355 it++;
2356 }
2357
2358 colWidth = newColStructList[i].colWidth;
2359 succFlag = colOp->calculateRowId(lastRidNew, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);
2360
2361 if (succFlag)
2362 {
2363 if (it != aColExtsInfo.end())
2364 {
2365 it->hwm = (HWM)curFbo;
2366 //cout << "setting hwm to " << (int)curFbo <<" for seg " <<it->segNum << endl;
2367 it->current = true;
2368 }
2369 }
2370 else
2371 return ERR_INVALID_PARAM;
2372 }
2373
2374 tableMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
2375 }
2376
2377 //--------------------------------------------------------------------------
2378 //Prepare the valuelist for the new extent
2379 //--------------------------------------------------------------------------
2380
2381 for (unsigned i = 1; i <= totalColumns; i++)
2382 {
2383 // Copy values to second value list
2384 for (uint64_t j = rowsLeft; j > 0; j--)
2385 {
2386 colNewValueList.push_back(colValueList[(totalRow * i) - j]);
2387 }
2388 }
2389
2390 // end of allocate row id
2391
2392 #ifdef PROFILE
2393 timer.start("writeColumnRec");
2394 #endif
2395 //cout << "Writing column record" << endl;
2396
2397 if (rc == NO_ERROR)
2398 {
2399 //----------------------------------------------------------------------
2400 //Mark extents invalid
2401 //----------------------------------------------------------------------
2402 vector<BRM::LBID_t> lbids;
2403 vector<CalpontSystemCatalog::ColDataType> colDataTypes;
2404 bool successFlag = true;
2405 unsigned width = 0;
2406 int curFbo = 0, curBio, lastFbo = -1;
2407
2408 if (isFirstBatchPm && (totalRow == rowsLeft))
2409 {}
2410 else
2411 {
2412 for (unsigned i = 0; i < colStructList.size(); i++)
2413 {
2414 colOp = m_colOp[op(colStructList[i].fCompressionType)];
2415 width = colStructList[i].colWidth;
2416 successFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / width, width, curFbo, curBio);
2417
2418 if (successFlag)
2419 {
2420 if (curFbo != lastFbo)
2421 {
2422 RETURN_ON_ERROR(AddLBIDtoList(txnid,
2423 lbids,
2424 colDataTypes,
2425 colStructList[i],
2426 curFbo));
2427 }
2428 }
2429 else
2430 return ERR_INVALID_PARAM;
2431 }
2432 }
2433
2434 // If we create a new extent for this batch
2435 for (unsigned i = 0; i < newColStructList.size(); i++)
2436 {
2437 colOp = m_colOp[op(newColStructList[i].fCompressionType)];
2438 width = newColStructList[i].colWidth;
2439 successFlag = colOp->calculateRowId(lastRidNew, BYTE_PER_BLOCK / width, width, curFbo, curBio);
2440
2441 if (successFlag)
2442 {
2443 if (curFbo != lastFbo)
2444 {
2445 RETURN_ON_ERROR(AddLBIDtoList(txnid,
2446 lbids,
2447 colDataTypes,
2448 newColStructList[i],
2449 curFbo));
2450 }
2451 }
2452 else
2453 return ERR_INVALID_PARAM;
2454 }
2455
2456 if (lbids.size() > 0)
2457 rc = BRMWrapper::getInstance()->markExtentsInvalid(lbids, colDataTypes);
2458
2459 //----------------------------------------------------------------------
2460 // Write row(s) to database file(s)
2461 //----------------------------------------------------------------------
2462 bool versioning = !(isAutoCommitOn && insertSelect);
2463 AddDictToList(txnid, dictLbids);
2464 rc = writeColumnRecBinary(txnid, colStructList, colValueList, rowIdArray, newColStructList, colNewValueList, tableOid, useTmpSuffix, versioning); // @bug 5572 HDFS tmp file
2465 }
2466
2467 return rc;
2468 }
2469
2470
insertColumnRec_SYS(const TxnID & txnid,ColStructList & colStructList,ColValueList & colValueList,DctnryStructList & dctnryStructList,DictStrList & dictStrList,const int32_t tableOid)2471 int WriteEngineWrapper::insertColumnRec_SYS(const TxnID& txnid,
2472 ColStructList& colStructList,
2473 ColValueList& colValueList,
2474 DctnryStructList& dctnryStructList,
2475 DictStrList& dictStrList,
2476 const int32_t tableOid)
2477 {
2478 int rc;
2479 RID* rowIdArray = NULL;
2480 ColTupleList curTupleList;
2481 Column curCol;
2482 ColStruct curColStruct;
2483 ColValueList colOldValueList;
2484 ColValueList colNewValueList;
2485 ColStructList newColStructList;
2486 DctnryStructList newDctnryStructList;
2487 HWM hwm = 0;
2488 HWM newHwm = 0;
2489 HWM oldHwm = 0;
2490 ColTupleList::size_type totalRow;
2491 ColStructList::size_type totalColumns;
2492 uint64_t rowsLeft = 0;
2493 bool newExtent = false;
2494 RIDList ridList;
2495 ColumnOp* colOp = NULL;
2496 uint32_t i = 0;
2497 #ifdef PROFILE
2498 StopWatch timer;
2499 #endif
2500
2501 // debug information for testing
2502 if (isDebug(DEBUG_2))
2503 {
2504 printf("\nIn wrapper insert\n");
2505 printInputValue(colStructList, colValueList, ridList);
2506 }
2507
2508 // end
2509
2510 //Convert data type and column width to write engine specific
2511 for (i = 0; i < colStructList.size(); i++)
2512 Convertor::convertColType(&colStructList[i]);
2513
2514 rc = checkValid(txnid, colStructList, colValueList, ridList);
2515
2516 if (rc != NO_ERROR)
2517 return rc;
2518
2519 setTransId(txnid);
2520
2521 curTupleList = static_cast<ColTupleList>(colValueList[0]);
2522 totalRow = curTupleList.size();
2523 totalColumns = colStructList.size();
2524 rowIdArray = new RID[totalRow];
2525 // use scoped_array to ensure ptr deletion regardless of where we return
2526 boost::scoped_array<RID> rowIdArrayPtr(rowIdArray);
2527 memset(rowIdArray, 0, (sizeof(RID)*totalRow));
2528
2529 // allocate row id(s)
2530 curColStruct = colStructList[0];
2531 colOp = m_colOp[op(curColStruct.fCompressionType)];
2532
2533 colOp->initColumn(curCol);
2534
2535 //Get the correct segment, partition, column file
2536 uint16_t dbRoot, segmentNum;
2537 uint32_t partitionNum;
2538 vector<ExtentInfo> colExtentInfo; //Save those empty extents in case of failure to rollback
2539 vector<ExtentInfo> dictExtentInfo; //Save those empty extents in case of failure to rollback
2540 vector<ExtentInfo> fileInfo;
2541 ExtentInfo info;
2542 //Don't search for empty space, always append to the end. May need to revisit this part
2543 dbRoot = curColStruct.fColDbRoot;
2544 int extState;
2545 bool extFound;
2546 RETURN_ON_ERROR(BRMWrapper::getInstance()->getLastHWM_DBroot(
2547 curColStruct.dataOid, dbRoot, partitionNum, segmentNum, hwm,
2548 extState, extFound));
2549
2550 for (i = 0; i < colStructList.size(); i++)
2551 {
2552 colStructList[i].fColPartition = partitionNum;
2553 colStructList[i].fColSegment = segmentNum;
2554 colStructList[i].fColDbRoot = dbRoot;
2555 }
2556
2557 oldHwm = hwm; //Save this info for rollback
2558 //need to pass real dbRoot, partition, and segment to setColParam
2559 colOp->setColParam(curCol, 0, curColStruct.colWidth, curColStruct.colDataType,
2560 curColStruct.colType, curColStruct.dataOid, curColStruct.fCompressionType,
2561 dbRoot, partitionNum, segmentNum);
2562
2563 string segFile;
2564 rc = colOp->openColumnFile(curCol, segFile, false); // @bug 5572 HDFS tmp file
2565
2566 if (rc != NO_ERROR)
2567 {
2568 return rc;
2569 }
2570
2571 //get hwm first
2572 // @bug 286 : fix for bug 286 - correct the typo in getHWM
2573 //RETURN_ON_ERROR(BRMWrapper::getInstance()->getHWM(curColStruct.dataOid, hwm));
2574
2575 //...Note that we are casting totalRow to int to be in sync with
2576 //...allocRowId(). So we are assuming that totalRow
2577 //...(curTupleList.size()) will fit into an int. We arleady made
2578 //...that assumption earlier in this method when we used totalRow
2579 //...in the call to calloc() to allocate rowIdArray.
2580 Column newCol;
2581 bool newFile;
2582
2583 #ifdef PROFILE
2584 timer.start("allocRowId");
2585 #endif
2586
2587 newColStructList = colStructList;
2588 newDctnryStructList = dctnryStructList;
2589 std::vector<boost::shared_ptr<DBRootExtentTracker> > dbRootExtentTrackers;
2590 bool bUseStartExtent = true;
2591 rc = colOp->allocRowId(txnid, bUseStartExtent,
2592 curCol, (uint64_t)totalRow, rowIdArray, hwm, newExtent, rowsLeft, newHwm, newFile, newColStructList, newDctnryStructList,
2593 dbRootExtentTrackers, false, false, 0);
2594
2595 if ((rc == ERR_FILE_DISK_SPACE) && newExtent)
2596 {
2597 for (i = 0; i < newColStructList.size(); i++)
2598 {
2599 info.oid = newColStructList[i].dataOid;
2600 info.partitionNum = newColStructList[i].fColPartition;
2601 info.segmentNum = newColStructList[i].fColSegment;
2602 info.dbRoot = newColStructList[i].fColDbRoot;
2603
2604 if (newFile)
2605 fileInfo.push_back (info);
2606
2607 colExtentInfo.push_back (info);
2608 }
2609
2610 int rc1 = BRMWrapper::getInstance()->deleteEmptyColExtents(colExtentInfo);
2611
2612 if ((rc1 == 0) && newFile)
2613 {
2614 rc1 = colOp->deleteFile(fileInfo[0].oid, fileInfo[0].dbRoot, fileInfo[0].partitionNum, fileInfo[0].segmentNum);
2615
2616 if ( rc1 != NO_ERROR)
2617 return rc;
2618
2619 FileOp fileOp;
2620
2621 for (i = 0; i < newDctnryStructList.size(); i++)
2622 {
2623 if (newDctnryStructList[i].dctnryOid > 0)
2624 {
2625 info.oid = newDctnryStructList[i].dctnryOid;
2626 info.partitionNum = newDctnryStructList[i].fColPartition;
2627 info.segmentNum = newDctnryStructList[i].fColSegment;
2628 info.dbRoot = newDctnryStructList[i].fColDbRoot;
2629 info.newFile = true;
2630 fileInfo.push_back (info);
2631 dictExtentInfo.push_back (info);
2632 }
2633 }
2634
2635 if (dictExtentInfo.size() > 0)
2636 {
2637 rc1 = BRMWrapper::getInstance()->deleteEmptyDictStoreExtents(dictExtentInfo);
2638
2639 if ( rc1 != NO_ERROR)
2640 return rc;
2641
2642 for (unsigned j = 0; j < fileInfo.size(); j++)
2643 {
2644 rc1 = fileOp.deleteFile(fileInfo[j].oid, fileInfo[j].dbRoot,
2645 fileInfo[j].partitionNum, fileInfo[j].segmentNum);
2646 }
2647 }
2648 }
2649 }
2650
2651 TableMetaData* aTableMetaData = TableMetaData::makeTableMetaData(tableOid);
2652
2653 //..Expand initial abbreviated extent if any RID in 1st extent is > 256K
2654 // DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated?
2655 if ((partitionNum == 0) &&
2656 (segmentNum == 0) &&
2657 ((totalRow - rowsLeft) > 0) &&
2658 (rowIdArray[totalRow - rowsLeft - 1] >= (RID)INITIAL_EXTENT_ROWS_TO_DISK))
2659 {
2660 for (size_t k = 1; k < colStructList.size(); k++)
2661 {
2662 Column expandCol;
2663 colOp = m_colOp[op(colStructList[k].fCompressionType)];
2664 colOp->setColParam(expandCol, 0,
2665 colStructList[k].colWidth,
2666 colStructList[k].colDataType,
2667 colStructList[k].colType,
2668 colStructList[k].dataOid,
2669 colStructList[k].fCompressionType,
2670 dbRoot,
2671 partitionNum,
2672 segmentNum);
2673 rc = colOp->openColumnFile(expandCol, segFile, false); // @bug 5572 HDFS tmp file
2674
2675 if (rc == NO_ERROR)
2676 {
2677 if (colOp->abbreviatedExtent(expandCol.dataFile.pFile, colStructList[k].colWidth))
2678 {
2679 rc = colOp->expandAbbrevExtent(expandCol);
2680 }
2681 }
2682
2683 if (rc != NO_ERROR)
2684 {
2685 if (newExtent)
2686 {
2687 //Remove the empty extent added to the first column
2688 int rc1 = BRMWrapper::getInstance()->
2689 deleteEmptyColExtents(colExtentInfo);
2690
2691 if ((rc1 == 0) && newFile)
2692 {
2693 rc1 = colOp->deleteFile(fileInfo[0].oid,
2694 fileInfo[0].dbRoot,
2695 fileInfo[0].partitionNum,
2696 fileInfo[0].segmentNum);
2697 }
2698 }
2699
2700 colOp->clearColumn(expandCol); // closes the file
2701 return rc;
2702 }
2703
2704 colOp->clearColumn(expandCol); // closes the file
2705 }
2706 }
2707
2708 BRMWrapper::setUseVb(true);
2709 //Tokenize data if needed
2710 dictStr::iterator dctStr_iter;
2711 ColTupleList::iterator col_iter;
2712
2713 for (i = 0; i < colStructList.size(); i++)
2714 {
2715 if (colStructList[i].tokenFlag)
2716 {
2717 dctStr_iter = dictStrList[i].begin();
2718 col_iter = colValueList[i].begin();
2719 Dctnry* dctnry = m_dctnry[op(dctnryStructList[i].fCompressionType)];
2720
2721 dctnryStructList[i].fColPartition = partitionNum;
2722 dctnryStructList[i].fColSegment = segmentNum;
2723 dctnryStructList[i].fColDbRoot = dbRoot;
2724 rc = dctnry->openDctnry(dctnryStructList[i].dctnryOid,
2725 dctnryStructList[i].fColDbRoot, dctnryStructList[i].fColPartition,
2726 dctnryStructList[i].fColSegment,
2727 false); // @bug 5572 HDFS tmp file
2728
2729 if (rc != NO_ERROR)
2730 return rc;
2731
2732 ColExtsInfo aColExtsInfo = aTableMetaData->getColExtsInfo(dctnryStructList[i].dctnryOid);
2733 ColExtsInfo::iterator it = aColExtsInfo.begin();
2734
2735 while (it != aColExtsInfo.end())
2736 {
2737 if ((it->dbRoot == dctnryStructList[i].fColDbRoot) && (it->partNum == dctnryStructList[i].fColPartition) && (it->segNum == dctnryStructList[i].fColSegment))
2738 break;
2739
2740 it++;
2741 }
2742
2743 if (it == aColExtsInfo.end()) //add this one to the list
2744 {
2745 ColExtInfo aExt;
2746 aExt.dbRoot = dctnryStructList[i].fColDbRoot;
2747 aExt.partNum = dctnryStructList[i].fColPartition;
2748 aExt.segNum = dctnryStructList[i].fColSegment;
2749 aExt.compType = dctnryStructList[i].fCompressionType;
2750 aExt.isDict = true;
2751 aColExtsInfo.push_back(aExt);
2752 aTableMetaData->setColExtsInfo(dctnryStructList[i].dctnryOid, aColExtsInfo);
2753 }
2754
2755 for (uint32_t rows = 0; rows < (totalRow - rowsLeft); rows++)
2756 {
2757 if (dctStr_iter->length() == 0)
2758 {
2759 Token nullToken;
2760 col_iter->data = nullToken;
2761 }
2762 else
2763 {
2764 #ifdef PROFILE
2765 timer.start("tokenize");
2766 #endif
2767 DctnryTuple dctTuple;
2768 dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str();
2769 dctTuple.sigSize = dctStr_iter->length();
2770 dctTuple.isNull = false;
2771 rc = tokenize(txnid, dctTuple, dctnryStructList[i].fCompressionType);
2772
2773 if (rc != NO_ERROR)
2774 {
2775 dctnry->closeDctnry();
2776 return rc;
2777 }
2778
2779 #ifdef PROFILE
2780 timer.stop("tokenize");
2781 #endif
2782 col_iter->data = dctTuple.token;
2783 }
2784
2785 dctStr_iter++;
2786 col_iter++;
2787
2788 }
2789
2790 //close dictionary files
2791 rc = dctnry->closeDctnry();
2792
2793 if (rc != NO_ERROR)
2794 return rc;
2795
2796 if (newExtent)
2797 {
2798 rc = dctnry->openDctnry(newDctnryStructList[i].dctnryOid,
2799 newDctnryStructList[i].fColDbRoot, newDctnryStructList[i].fColPartition,
2800 newDctnryStructList[i].fColSegment,
2801 false); // @bug 5572 HDFS tmp file
2802
2803 if (rc != NO_ERROR)
2804 return rc;
2805
2806 aColExtsInfo = aTableMetaData->getColExtsInfo(newDctnryStructList[i].dctnryOid);
2807 it = aColExtsInfo.begin();
2808
2809 while (it != aColExtsInfo.end())
2810 {
2811 if ((it->dbRoot == newDctnryStructList[i].fColDbRoot) && (it->partNum == newDctnryStructList[i].fColPartition) && (it->segNum == newDctnryStructList[i].fColSegment))
2812 break;
2813
2814 it++;
2815 }
2816
2817 if (it == aColExtsInfo.end()) //add this one to the list
2818 {
2819 ColExtInfo aExt;
2820 aExt.dbRoot = newDctnryStructList[i].fColDbRoot;
2821 aExt.partNum = newDctnryStructList[i].fColPartition;
2822 aExt.segNum = newDctnryStructList[i].fColSegment;
2823 aExt.compType = newDctnryStructList[i].fCompressionType;
2824 aExt.isDict = true;
2825 aColExtsInfo.push_back(aExt);
2826 aTableMetaData->setColExtsInfo(newDctnryStructList[i].dctnryOid, aColExtsInfo);
2827 }
2828
2829 for (uint32_t rows = 0; rows < rowsLeft; rows++)
2830 {
2831 if (dctStr_iter->length() == 0)
2832 {
2833 Token nullToken;
2834 col_iter->data = nullToken;
2835 }
2836 else
2837 {
2838 #ifdef PROFILE
2839 timer.start("tokenize");
2840 #endif
2841 DctnryTuple dctTuple;
2842 dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str();
2843 dctTuple.sigSize = dctStr_iter->length();
2844 dctTuple.isNull = false;
2845 rc = tokenize(txnid, dctTuple, newDctnryStructList[i].fCompressionType);
2846
2847 if (rc != NO_ERROR)
2848 {
2849 dctnry->closeDctnry();
2850 return rc;
2851 }
2852
2853 #ifdef PROFILE
2854 timer.stop("tokenize");
2855 #endif
2856 col_iter->data = dctTuple.token;
2857 }
2858
2859 dctStr_iter++;
2860 col_iter++;
2861 }
2862
2863 //close dictionary files
2864 rc = dctnry->closeDctnry();
2865
2866 if (rc != NO_ERROR)
2867 return rc;
2868 }
2869 }
2870 }
2871
2872
2873 //Update column info structure @Bug 1862 set hwm
2874 //@Bug 2205 Check whether all rows go to the new extent
2875 RID lastRid = 0;
2876 RID lastRidNew = 0;
2877
2878 if (totalRow - rowsLeft > 0)
2879 {
2880 lastRid = rowIdArray[totalRow - rowsLeft - 1];
2881 lastRidNew = rowIdArray[totalRow - 1];
2882 }
2883 else
2884 {
2885 lastRid = 0;
2886 lastRidNew = rowIdArray[totalRow - 1];
2887 }
2888
2889 //cout << "rowid allocated is " << lastRid << endl;
2890 //if a new extent is created, all the columns in this table should have their own new extent
2891
2892 //@Bug 1701. Close the file
2893 m_colOp[op(curCol.compressionType)]->clearColumn(curCol);
2894 std::vector<BulkSetHWMArg> hwmVecNewext;
2895 std::vector<BulkSetHWMArg> hwmVecOldext;
2896
2897 if (newExtent) //Save all hwms to set them later.
2898 {
2899 BulkSetHWMArg aHwmEntryNew;
2900 BulkSetHWMArg aHwmEntryOld;
2901 bool succFlag = false;
2902 unsigned colWidth = 0;
2903 int extState;
2904 bool extFound;
2905 int curFbo = 0, curBio;
2906
2907 for (i = 0; i < totalColumns; i++)
2908 {
2909 Column curColLocal;
2910 colOp->initColumn(curColLocal);
2911
2912 colOp = m_colOp[op(newColStructList[i].fCompressionType)];
2913 colOp->setColParam(curColLocal, 0,
2914 newColStructList[i].colWidth, newColStructList[i].colDataType,
2915 newColStructList[i].colType, newColStructList[i].dataOid,
2916 newColStructList[i].fCompressionType, dbRoot, partitionNum, segmentNum);
2917
2918 rc = BRMWrapper::getInstance()->getLastHWM_DBroot(
2919 curColLocal.dataFile.fid, dbRoot, partitionNum, segmentNum, oldHwm,
2920 extState, extFound);
2921
2922 info.oid = curColLocal.dataFile.fid;
2923 info.partitionNum = partitionNum;
2924 info.segmentNum = segmentNum;
2925 info.dbRoot = dbRoot;
2926 info.hwm = oldHwm;
2927 colExtentInfo.push_back(info);
2928 // @Bug 2714 need to set hwm for the old extent
2929 colWidth = colStructList[i].colWidth;
2930 succFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);
2931
2932 //cout << "insertcolumnrec oid:rid:fbo:hwm = " << colStructList[i].dataOid << ":" << lastRid << ":" << curFbo << ":" << hwm << endl;
2933 if (succFlag)
2934 {
2935 if ((HWM)curFbo > oldHwm)
2936 {
2937 aHwmEntryOld.oid = colStructList[i].dataOid;
2938 aHwmEntryOld.partNum = partitionNum;
2939 aHwmEntryOld.segNum = segmentNum;
2940 aHwmEntryOld.hwm = curFbo;
2941 hwmVecOldext.push_back(aHwmEntryOld);
2942 }
2943 }
2944 else
2945 return ERR_INVALID_PARAM;
2946
2947 colWidth = newColStructList[i].colWidth;
2948 succFlag = colOp->calculateRowId(lastRidNew, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);
2949
2950 if (succFlag)
2951 {
2952 aHwmEntryNew.oid = newColStructList[i].dataOid;
2953 aHwmEntryNew.partNum = newColStructList[i].fColPartition;
2954 aHwmEntryNew.segNum = newColStructList[i].fColSegment;
2955 aHwmEntryNew.hwm = curFbo;
2956 hwmVecNewext.push_back(aHwmEntryNew);
2957 }
2958
2959 m_colOp[op(curColLocal.compressionType)]->clearColumn(curColLocal);
2960 }
2961
2962 //Prepare the valuelist for the new extent
2963 ColTupleList colTupleList;
2964 ColTupleList newColTupleList;
2965 ColTupleList firstPartTupleList;
2966
2967 for (unsigned i = 0; i < totalColumns; i++)
2968 {
2969 colTupleList = static_cast<ColTupleList>(colValueList[i]);
2970
2971 for (uint64_t j = rowsLeft; j > 0; j--)
2972 {
2973 newColTupleList.push_back(colTupleList[totalRow - j]);
2974 }
2975
2976 colNewValueList.push_back(newColTupleList);
2977 newColTupleList.clear();
2978
2979 //update the old-value list for the old extent
2980 for (uint64_t j = 0; j < (totalRow - rowsLeft); j++)
2981 {
2982 firstPartTupleList.push_back(colTupleList[j]);
2983 }
2984
2985 colOldValueList.push_back(firstPartTupleList);
2986 firstPartTupleList.clear();
2987 }
2988 }
2989
2990 //Mark extents invalid
2991 vector<BRM::LBID_t> lbids;
2992 vector<CalpontSystemCatalog::ColDataType> colDataTypes;
2993 bool successFlag = true;
2994 unsigned width = 0;
2995 BRM::LBID_t lbid;
2996 int curFbo = 0, curBio, lastFbo = -1;
2997
2998 if (totalRow - rowsLeft > 0)
2999 {
3000 for (unsigned i = 0; i < colStructList.size(); i++)
3001 {
3002 colOp = m_colOp[op(colStructList[i].fCompressionType)];
3003 width = colStructList[i].colWidth;
3004 successFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / width, width, curFbo, curBio);
3005
3006 if (successFlag)
3007 {
3008 if (curFbo != lastFbo)
3009 {
3010 RETURN_ON_ERROR(BRMWrapper::getInstance()->getBrmInfo(
3011 colStructList[i].dataOid, colStructList[i].fColPartition,
3012 colStructList[i].fColSegment, curFbo, lbid));
3013 lbids.push_back((BRM::LBID_t)lbid);
3014 colDataTypes.push_back(colStructList[i].colDataType);
3015 }
3016 }
3017 }
3018 }
3019
3020 lastRid = rowIdArray[totalRow - 1];
3021
3022 for (unsigned i = 0; i < newColStructList.size(); i++)
3023 {
3024 colOp = m_colOp[op(newColStructList[i].fCompressionType)];
3025 width = newColStructList[i].colWidth;
3026 successFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / width, width, curFbo, curBio);
3027
3028 if (successFlag)
3029 {
3030 if (curFbo != lastFbo)
3031 {
3032 RETURN_ON_ERROR(AddLBIDtoList(txnid,
3033 lbids,
3034 colDataTypes,
3035 newColStructList[i],
3036 curFbo));
3037 }
3038 }
3039 }
3040
3041 //cout << "lbids size = " << lbids.size()<< endl;
3042 if (lbids.size() > 0)
3043 rc = BRMWrapper::getInstance()->markExtentsInvalid(lbids, colDataTypes);
3044
3045 if (rc == NO_ERROR)
3046 {
3047 // MCOL-66 The DBRM can't handle concurrent transactions to sys tables
3048 static boost::mutex dbrmMutex;
3049 boost::mutex::scoped_lock lk(dbrmMutex);
3050
3051 if (newExtent)
3052 {
3053 rc = writeColumnRec(txnid, colStructList, colOldValueList, rowIdArray, newColStructList, colNewValueList, tableOid, false); // @bug 5572 HDFS tmp file
3054 }
3055 else
3056 {
3057 rc = writeColumnRec(txnid, colStructList, colValueList, rowIdArray, newColStructList, colNewValueList, tableOid, false); // @bug 5572 HDFS tmp file
3058 }
3059 }
3060
3061 #ifdef PROFILE
3062 timer.stop("writeColumnRec");
3063 #endif
3064 // for (ColTupleList::size_type i = 0; i < totalRow; i++)
3065 // ridList.push_back((RID) rowIdArray[i]);
3066
3067 // if (rc == NO_ERROR)
3068 // rc = flushDataFiles(NO_ERROR);
3069
3070 if ( !newExtent )
3071 {
3072 //flushVMCache();
3073 bool succFlag = false;
3074 unsigned colWidth = 0;
3075 int extState;
3076 bool extFound;
3077 int curFbo = 0, curBio;
3078 std::vector<BulkSetHWMArg> hwmVec;
3079
3080 for (unsigned i = 0; i < totalColumns; i++)
3081 {
3082 //colOp = m_colOp[op(colStructList[i].fCompressionType)];
3083 //Set all columns hwm together
3084 BulkSetHWMArg aHwmEntry;
3085 RETURN_ON_ERROR(BRMWrapper::getInstance()->getLastHWM_DBroot(colStructList[i].dataOid, dbRoot, partitionNum, segmentNum, hwm,
3086 extState, extFound));
3087 colWidth = colStructList[i].colWidth;
3088 succFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);
3089
3090 //cout << "insertcolumnrec oid:rid:fbo:hwm = " << colStructList[i].dataOid << ":" << lastRid << ":" << curFbo << ":" << hwm << endl;
3091 if (succFlag)
3092 {
3093 if ((HWM)curFbo > hwm)
3094 {
3095 aHwmEntry.oid = colStructList[i].dataOid;
3096 aHwmEntry.partNum = partitionNum;
3097 aHwmEntry.segNum = segmentNum;
3098 aHwmEntry.hwm = curFbo;
3099 hwmVec.push_back(aHwmEntry);
3100 }
3101 }
3102 else
3103 return ERR_INVALID_PARAM;
3104 }
3105
3106 if (hwmVec.size() > 0 )
3107 {
3108 std::vector<BRM::CPInfoMerge> mergeCPDataArgs;
3109 RETURN_ON_ERROR(BRMWrapper::getInstance()->bulkSetHWMAndCP( hwmVec, mergeCPDataArgs));
3110 }
3111 }
3112
3113 if (newExtent)
3114 {
3115 #ifdef PROFILE
3116 timer.start("flushVMCache");
3117 #endif
3118 std::vector<BRM::CPInfoMerge> mergeCPDataArgs;
3119 RETURN_ON_ERROR(BRMWrapper::getInstance()->bulkSetHWMAndCP( hwmVecNewext, mergeCPDataArgs));
3120 RETURN_ON_ERROR(BRMWrapper::getInstance()->bulkSetHWMAndCP( hwmVecOldext, mergeCPDataArgs));
3121 //flushVMCache();
3122 #ifdef PROFILE
3123 timer.stop("flushVMCache");
3124 #endif
3125 }
3126
3127 #ifdef PROFILE
3128 timer.finish();
3129 #endif
3130 return rc;
3131 }
3132
insertColumnRec_Single(const TxnID & txnid,ColStructList & colStructList,ColValueList & colValueList,DctnryStructList & dctnryStructList,DictStrList & dictStrList,const int32_t tableOid)3133 int WriteEngineWrapper::insertColumnRec_Single(const TxnID& txnid,
3134 ColStructList& colStructList,
3135 ColValueList& colValueList,
3136 DctnryStructList& dctnryStructList,
3137 DictStrList& dictStrList,
3138 const int32_t tableOid)
3139 {
3140 int rc;
3141 RID* rowIdArray = NULL;
3142 ColTupleList curTupleList;
3143 Column curCol;
3144 ColStruct curColStruct;
3145 ColValueList colOldValueList;
3146 ColValueList colNewValueList;
3147 ColStructList newColStructList;
3148 DctnryStructList newDctnryStructList;
3149 HWM hwm = 0;
3150 HWM newHwm = 0;
3151 HWM oldHwm = 0;
3152 ColTupleList::size_type totalRow;
3153 ColStructList::size_type totalColumns;
3154 uint64_t rowsLeft = 0;
3155 bool newExtent = false;
3156 RIDList ridList;
3157 ColumnOp* colOp = NULL;
3158 uint32_t i = 0;
3159
3160 #ifdef PROFILE
3161 StopWatch timer;
3162 #endif
3163
3164 // debug information for testing
3165 if (isDebug(DEBUG_2))
3166 {
3167 printf("\nIn wrapper insert\n");
3168 printInputValue(colStructList, colValueList, ridList);
3169 }
3170
3171 // end
3172
3173 //Convert data type and column width to write engine specific
3174 for (i = 0; i < colStructList.size(); i++)
3175 Convertor::convertColType(&colStructList[i]);
3176
3177 uint32_t colId = 0;
3178 // MCOL-1675: find the smallest column width to calculate the RowID from so
3179 // that all HWMs will be incremented by this operation
3180 findSmallestColumn(colId, colStructList);
3181
3182 rc = checkValid(txnid, colStructList, colValueList, ridList);
3183
3184 if (rc != NO_ERROR)
3185 return rc;
3186
3187 setTransId(txnid);
3188
3189 curTupleList = static_cast<ColTupleList>(colValueList[0]);
3190 totalRow = curTupleList.size();
3191 totalColumns = colStructList.size();
3192 rowIdArray = new RID[totalRow];
3193 // use scoped_array to ensure ptr deletion regardless of where we return
3194 boost::scoped_array<RID> rowIdArrayPtr(rowIdArray);
3195 memset(rowIdArray, 0, (sizeof(RID)*totalRow));
3196
3197 //--------------------------------------------------------------------------
3198 // allocate row id(s)
3199 //--------------------------------------------------------------------------
3200 curColStruct = colStructList[colId];
3201 colOp = m_colOp[op(curColStruct.fCompressionType)];
3202
3203 colOp->initColumn(curCol);
3204
3205 //Get the correct segment, partition, column file
3206 uint16_t dbRoot;
3207 uint16_t segmentNum = 0;
3208 uint32_t partitionNum = 0;
3209 //Don't search for empty space, always append to the end. May revisit later
3210 dbRoot = curColStruct.fColDbRoot;
3211 int extState;
3212 bool bStartExtFound;
3213 bool bUseStartExtent = false;
3214 RETURN_ON_ERROR(BRMWrapper::getInstance()->getLastHWM_DBroot(
3215 curColStruct.dataOid, dbRoot, partitionNum, segmentNum, hwm,
3216 extState, bStartExtFound));
3217
3218 if ((bStartExtFound) && (extState == BRM::EXTENTAVAILABLE))
3219 bUseStartExtent = true;
3220
3221 for (i = 0; i < colStructList.size(); i++)
3222 {
3223 colStructList[i].fColPartition = partitionNum;
3224 colStructList[i].fColSegment = segmentNum;
3225 colStructList[i].fColDbRoot = dbRoot;
3226 }
3227
3228 for (i = 0; i < dctnryStructList.size(); i++)
3229 {
3230 dctnryStructList[i].fColPartition = partitionNum;
3231 dctnryStructList[i].fColSegment = segmentNum;
3232 dctnryStructList[i].fColDbRoot = dbRoot;
3233 }
3234
3235 oldHwm = hwm; //Save this info for rollback
3236 //need to pass real dbRoot, partition, and segment to setColParam
3237 colOp->setColParam(curCol, colId, curColStruct.colWidth, curColStruct.colDataType,
3238 curColStruct.colType, curColStruct.dataOid, curColStruct.fCompressionType,
3239 dbRoot, partitionNum, segmentNum);
3240
3241 string segFile;
3242
3243 if (bUseStartExtent)
3244 {
3245 rc = colOp->openColumnFile(curCol, segFile, true); // @bug 5572 HDFS tmp file
3246
3247 if (rc != NO_ERROR)
3248 {
3249 return rc;
3250 }
3251 }
3252
3253 bool newFile;
3254
3255 #ifdef PROFILE
3256 timer.start("allocRowId");
3257 #endif
3258 newColStructList = colStructList;
3259 newDctnryStructList = dctnryStructList;
3260 std::vector<boost::shared_ptr<DBRootExtentTracker> > dbRootExtentTrackers;
3261 rc = colOp->allocRowId(txnid, bUseStartExtent,
3262 curCol, (uint64_t)totalRow, rowIdArray, hwm, newExtent,
3263 rowsLeft, newHwm, newFile, newColStructList, newDctnryStructList,
3264 dbRootExtentTrackers, false, false, 0);
3265
3266 //--------------------------------------------------------------------------
3267 // Handle case where we ran out of disk space allocating a new extent.
3268 // Rollback extentmap and delete any db files that were created.
3269 //--------------------------------------------------------------------------
3270 if (rc != NO_ERROR)
3271 {
3272 if ((rc == ERR_FILE_DISK_SPACE) && newExtent)
3273 {
3274 vector<ExtentInfo> colExtentInfo;
3275 vector<ExtentInfo> dictExtentInfo;
3276 vector<ExtentInfo> fileInfo;
3277 ExtentInfo info;
3278
3279 for (i = 0; i < newColStructList.size(); i++)
3280 {
3281 info.oid = newColStructList[i].dataOid;
3282 info.partitionNum = newColStructList[i].fColPartition;
3283 info.segmentNum = newColStructList[i].fColSegment;
3284 info.dbRoot = newColStructList[i].fColDbRoot;
3285
3286 if (newFile)
3287 fileInfo.push_back (info);
3288
3289 colExtentInfo.push_back (info);
3290 }
3291
3292 int rc1 = BRMWrapper::getInstance()->deleteEmptyColExtents(colExtentInfo);
3293
3294 // Only rollback dictionary extents "if" store file is new
3295 if ((rc1 == 0) && newFile)
3296 {
3297 for (unsigned int j = 0; j < fileInfo.size(); j++)
3298 {
3299 // ignore return code and delete what we can
3300 rc1 = colOp->deleteFile(fileInfo[j].oid,
3301 fileInfo[j].dbRoot,
3302 fileInfo[j].partitionNum,
3303 fileInfo[j].segmentNum);
3304 }
3305
3306 fileInfo.clear();
3307
3308 for (i = 0; i < newDctnryStructList.size(); i++)
3309 {
3310 if (newDctnryStructList[i].dctnryOid > 0)
3311 {
3312 info.oid = newDctnryStructList[i].dctnryOid;
3313 info.partitionNum = newDctnryStructList[i].fColPartition;
3314 info.segmentNum = newDctnryStructList[i].fColSegment;
3315 info.dbRoot = newDctnryStructList[i].fColDbRoot;
3316 info.newFile = true;
3317 fileInfo.push_back (info);
3318 dictExtentInfo.push_back (info);
3319 }
3320 }
3321
3322 if (dictExtentInfo.size() > 0)
3323 {
3324 FileOp fileOp;
3325 rc1 = BRMWrapper::getInstance()->deleteEmptyDictStoreExtents(dictExtentInfo);
3326
3327 if ( rc1 != NO_ERROR)
3328 return rc;
3329
3330 for (unsigned j = 0; j < fileInfo.size(); j++)
3331 {
3332 rc1 = fileOp.deleteFile(fileInfo[j].oid,
3333 fileInfo[j].dbRoot,
3334 fileInfo[j].partitionNum,
3335 fileInfo[j].segmentNum);
3336 }
3337 }
3338 }
3339 } // disk space error allocating new extent
3340
3341 return rc;
3342 } // rc != NO_ERROR from call to allocRowID()
3343
3344 #ifdef PROFILE
3345 timer.stop("allocRowId");
3346 #endif
3347
3348 TableMetaData* aTableMetaData = TableMetaData::makeTableMetaData(tableOid);
3349
3350 //--------------------------------------------------------------------------
3351 // Expand initial abbreviated extent if any RID in 1st extent is > 256K.
3352 // if totalRow == rowsLeft, then not adding rows to 1st extent, so skip it.
3353 //--------------------------------------------------------------------------
3354 // DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated?
3355 if ((colStructList[colId].fColPartition == 0) &&
3356 (colStructList[colId].fColSegment == 0) &&
3357 ((totalRow - rowsLeft) > 0) &&
3358 (rowIdArray[totalRow - rowsLeft - 1] >= (RID)INITIAL_EXTENT_ROWS_TO_DISK))
3359 {
3360 for (unsigned k=0; k<colStructList.size(); k++)
3361 {
3362 if (k == colId)
3363 continue;
3364 Column expandCol;
3365 colOp = m_colOp[op(colStructList[k].fCompressionType)];
3366 colOp->setColParam(expandCol, 0,
3367 colStructList[k].colWidth,
3368 colStructList[k].colDataType,
3369 colStructList[k].colType,
3370 colStructList[k].dataOid,
3371 colStructList[k].fCompressionType,
3372 colStructList[k].fColDbRoot,
3373 colStructList[k].fColPartition,
3374 colStructList[k].fColSegment);
3375 rc = colOp->openColumnFile(expandCol, segFile, true); // @bug 5572 HDFS tmp file
3376
3377 if (rc == NO_ERROR)
3378 {
3379 if (colOp->abbreviatedExtent(
3380 expandCol.dataFile.pFile, colStructList[k].colWidth))
3381 {
3382 rc = colOp->expandAbbrevExtent(expandCol);
3383 }
3384 }
3385
3386 colOp->clearColumn(expandCol); // closes the file
3387
3388 if (rc != NO_ERROR)
3389 {
3390 return rc;
3391 }
3392 } // loop through columns
3393 } // if starting extent needs to be expanded
3394
3395 //--------------------------------------------------------------------------
3396 // Tokenize data if needed
3397 //--------------------------------------------------------------------------
3398 dictStr::iterator dctStr_iter;
3399 ColTupleList::iterator col_iter;
3400
3401 for (unsigned i = 0; i < colStructList.size(); i++)
3402 {
3403 if (colStructList[i].tokenFlag)
3404 {
3405 dctStr_iter = dictStrList[i].begin();
3406 col_iter = colValueList[i].begin();
3407 Dctnry* dctnry = m_dctnry[op(dctnryStructList[i].fCompressionType)];
3408
3409 ColExtsInfo aColExtsInfo = aTableMetaData->getColExtsInfo(dctnryStructList[i].dctnryOid);
3410 ColExtsInfo::iterator it = aColExtsInfo.begin();
3411
3412 if (bUseStartExtent)
3413 {
3414 rc = dctnry->openDctnry(dctnryStructList[i].dctnryOid,
3415 dctnryStructList[i].fColDbRoot,
3416 dctnryStructList[i].fColPartition,
3417 dctnryStructList[i].fColSegment,
3418 true); // @bug 5572 HDFS tmp file
3419
3420 if (rc != NO_ERROR)
3421 return rc;
3422
3423 while (it != aColExtsInfo.end())
3424 {
3425 if ((it->dbRoot == dctnryStructList[i].fColDbRoot) && (it->partNum == dctnryStructList[i].fColPartition) && (it->segNum == dctnryStructList[i].fColSegment))
3426 break;
3427
3428 it++;
3429 }
3430
3431 if (it == aColExtsInfo.end()) //add this one to the list
3432 {
3433 ColExtInfo aExt;
3434 aExt.dbRoot = dctnryStructList[i].fColDbRoot;
3435 aExt.partNum = dctnryStructList[i].fColPartition;
3436 aExt.segNum = dctnryStructList[i].fColSegment;
3437 aExt.compType = dctnryStructList[i].fCompressionType;
3438 aExt.isDict = true;
3439 aColExtsInfo.push_back(aExt);
3440 aTableMetaData->setColExtsInfo(dctnryStructList[i].dctnryOid, aColExtsInfo);
3441 }
3442
3443
3444 for (uint32_t rows = 0; rows < (totalRow - rowsLeft); rows++)
3445 {
3446 if (dctStr_iter->length() == 0)
3447 {
3448 Token nullToken;
3449 col_iter->data = nullToken;
3450 }
3451 else
3452 {
3453 #ifdef PROFILE
3454 timer.start("tokenize");
3455 #endif
3456 DctnryTuple dctTuple;
3457 dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str();
3458 dctTuple.sigSize = dctStr_iter->length();
3459 dctTuple.isNull = false;
3460 rc = tokenize(txnid,
3461 dctTuple,
3462 dctnryStructList[i].fCompressionType);
3463
3464 if (rc != NO_ERROR)
3465 {
3466 dctnry->closeDctnry();
3467 return rc;
3468 }
3469
3470 #ifdef PROFILE
3471 timer.stop("tokenize");
3472 #endif
3473 col_iter->data = dctTuple.token;
3474 }
3475
3476 dctStr_iter++;
3477 col_iter++;
3478 }
3479
3480 //close dictionary files
3481 rc = dctnry->closeDctnry();
3482
3483 if (rc != NO_ERROR)
3484 return rc;
3485 } // tokenize dictionary rows in 1st extent
3486
3487 if (newExtent)
3488 {
3489 rc = dctnry->openDctnry(newDctnryStructList[i].dctnryOid,
3490 newDctnryStructList[i].fColDbRoot,
3491 newDctnryStructList[i].fColPartition,
3492 newDctnryStructList[i].fColSegment,
3493 false); // @bug 5572 HDFS tmp file
3494
3495 if (rc != NO_ERROR)
3496 return rc;
3497
3498 aColExtsInfo = aTableMetaData->getColExtsInfo(newDctnryStructList[i].dctnryOid);
3499 it = aColExtsInfo.begin();
3500
3501 while (it != aColExtsInfo.end())
3502 {
3503 if ((it->dbRoot == newDctnryStructList[i].fColDbRoot) && (it->partNum == newDctnryStructList[i].fColPartition) && (it->segNum == newDctnryStructList[i].fColSegment))
3504 break;
3505
3506 it++;
3507 }
3508
3509 if (it == aColExtsInfo.end()) //add this one to the list
3510 {
3511 ColExtInfo aExt;
3512 aExt.dbRoot = newDctnryStructList[i].fColDbRoot;
3513 aExt.partNum = newDctnryStructList[i].fColPartition;
3514 aExt.segNum = newDctnryStructList[i].fColSegment;
3515 aExt.compType = newDctnryStructList[i].fCompressionType;
3516 aExt.isDict = true;
3517 aColExtsInfo.push_back(aExt);
3518 aTableMetaData->setColExtsInfo(newDctnryStructList[i].dctnryOid, aColExtsInfo);
3519 }
3520
3521 for (uint32_t rows = 0; rows < rowsLeft; rows++)
3522 {
3523 if (dctStr_iter->length() == 0)
3524 {
3525 Token nullToken;
3526 col_iter->data = nullToken;
3527 }
3528 else
3529 {
3530 #ifdef PROFILE
3531 timer.start("tokenize");
3532 #endif
3533 DctnryTuple dctTuple;
3534 dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str();
3535 dctTuple.sigSize = dctStr_iter->length();
3536 dctTuple.isNull = false;
3537 rc = tokenize(txnid,
3538 dctTuple,
3539 newDctnryStructList[i].fCompressionType);
3540
3541 if (rc != NO_ERROR)
3542 {
3543 dctnry->closeDctnry();
3544 return rc;
3545 }
3546
3547 #ifdef PROFILE
3548 timer.stop("tokenize");
3549 #endif
3550 col_iter->data = dctTuple.token;
3551 }
3552
3553 dctStr_iter++;
3554 col_iter++;
3555 }
3556
3557 //close dictionary files
3558 rc = dctnry->closeDctnry();
3559
3560 if (rc != NO_ERROR)
3561 return rc;
3562 } // tokenize dictionary rows in second extent
3563 } // tokenize dictionary columns
3564 } // loop through columns to see which ones need tokenizing
3565
3566 //----------------------------------------------------------------------
3567 // Update column info structure @Bug 1862 set hwm, and
3568 // Prepare ValueList for new extent (if applicable)
3569 //----------------------------------------------------------------------
3570 //@Bug 2205 Check whether all rows go to the new extent
3571 RID lastRid = 0;
3572 RID lastRidNew = 0;
3573
3574 if (totalRow - rowsLeft > 0)
3575 {
3576 lastRid = rowIdArray[totalRow - rowsLeft - 1];
3577 lastRidNew = rowIdArray[totalRow - 1];
3578 }
3579 else
3580 {
3581 lastRid = 0;
3582 lastRidNew = rowIdArray[totalRow - 1];
3583 }
3584
3585 //cout << "rowid allocated is " << lastRid << endl;
3586 //if a new extent is created, all the columns in this table should
3587 //have their own new extent
3588
3589 //@Bug 1701. Close the file
3590 if (bUseStartExtent)
3591 {
3592 m_colOp[op(curCol.compressionType)]->clearColumn(curCol);
3593 }
3594
3595 std::vector<BulkSetHWMArg> hwmVecNewext;
3596 std::vector<BulkSetHWMArg> hwmVecOldext;
3597
3598 if (newExtent) //Save all hwms to set them later.
3599 {
3600 BulkSetHWMArg aHwmEntryNew;
3601 BulkSetHWMArg aHwmEntryOld;
3602
3603 bool succFlag = false;
3604 unsigned colWidth = 0;
3605 int curFbo = 0, curBio;
3606
3607 for (i = 0; i < totalColumns; i++)
3608 {
3609 colOp = m_colOp[op(newColStructList[i].fCompressionType)];
3610
3611 // @Bug 2714 need to set hwm for the old extent
3612 colWidth = colStructList[i].colWidth;
3613 succFlag = colOp->calculateRowId(lastRid,
3614 BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);
3615
3616 //cout << "insertcolumnrec oid:rid:fbo:hwm = " <<
3617 //colStructList[i].dataOid << ":" << lastRid << ":" <<
3618 //curFbo << ":" << hwm << endl;
3619 if (succFlag)
3620 {
3621 if ((HWM)curFbo > oldHwm)
3622 {
3623 aHwmEntryOld.oid = colStructList[i].dataOid;
3624 aHwmEntryOld.partNum = colStructList[i].fColPartition;
3625 aHwmEntryOld.segNum = colStructList[i].fColSegment;
3626 aHwmEntryOld.hwm = curFbo;
3627 hwmVecOldext.push_back(aHwmEntryOld);
3628 }
3629 }
3630 else
3631 return ERR_INVALID_PARAM;
3632
3633 colWidth = newColStructList[i].colWidth;
3634 succFlag = colOp->calculateRowId(lastRidNew,
3635 BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);
3636
3637 if (succFlag)
3638 {
3639 aHwmEntryNew.oid = newColStructList[i].dataOid;
3640 aHwmEntryNew.partNum = newColStructList[i].fColPartition;
3641 aHwmEntryNew.segNum = newColStructList[i].fColSegment;
3642 aHwmEntryNew.hwm = curFbo;
3643 hwmVecNewext.push_back(aHwmEntryNew);
3644 }
3645 }
3646
3647 //----------------------------------------------------------------------
3648 // Prepare the valuelist for the new extent
3649 //----------------------------------------------------------------------
3650 ColTupleList colTupleList;
3651 ColTupleList newColTupleList;
3652 ColTupleList firstPartTupleList;
3653
3654 for (unsigned i = 0; i < totalColumns; i++)
3655 {
3656 colTupleList = static_cast<ColTupleList>(colValueList[i]);
3657
3658 for (uint64_t j = rowsLeft; j > 0; j--)
3659 {
3660 newColTupleList.push_back(colTupleList[totalRow - j]);
3661 }
3662
3663 colNewValueList.push_back(newColTupleList);
3664
3665 newColTupleList.clear();
3666
3667 //upate the oldvalue list for the old extent
3668 for (uint64_t j = 0; j < (totalRow - rowsLeft); j++)
3669 {
3670 firstPartTupleList.push_back(colTupleList[j]);
3671 }
3672
3673 colOldValueList.push_back(firstPartTupleList);
3674 firstPartTupleList.clear();
3675 }
3676 }
3677
3678 //--------------------------------------------------------------------------
3679 //Mark extents invalid
3680 //--------------------------------------------------------------------------
3681 vector<BRM::LBID_t> lbids;
3682 vector<CalpontSystemCatalog::ColDataType> colDataTypes;
3683 bool successFlag = true;
3684 unsigned width = 0;
3685 //BRM::LBID_t lbid;
3686 int curFbo = 0, curBio, lastFbo = -1;
3687 /* if (totalRow-rowsLeft > 0)
3688 {
3689 for (unsigned i = 0; i < colStructList.size(); i++)
3690 {
3691 colOp = m_colOp[op(colStructList[i].fCompressionType)];
3692 width = colStructList[i].colWidth;
3693 successFlag = colOp->calculateRowId(lastRid ,
3694 BYTE_PER_BLOCK/width, width, curFbo, curBio);
3695 if (successFlag) {
3696 if (curFbo != lastFbo) {
3697 RETURN_ON_ERROR(BRMWrapper::getInstance()->getBrmInfo(
3698 colStructList[i].dataOid,
3699 colStructList[i].fColPartition,
3700 colStructList[i].fColSegment, curFbo, lbid));
3701 lbids.push_back((BRM::LBID_t)lbid);
3702 colDataTypes.push_back(colStructList[i].colDataType);
3703 }
3704 }
3705 }
3706 }
3707 */
3708 lastRid = rowIdArray[totalRow - 1];
3709
3710 for (unsigned i = 0; i < colStructList.size(); i++)
3711 {
3712 colOp = m_colOp[op(colStructList[i].fCompressionType)];
3713 width = colStructList[i].colWidth;
3714 successFlag = colOp->calculateRowId(lastRid,
3715 BYTE_PER_BLOCK / width, width, curFbo, curBio);
3716
3717 if (successFlag)
3718 {
3719 if (curFbo != lastFbo)
3720 {
3721 colDataTypes.push_back(colStructList[i].colDataType);
3722 RETURN_ON_ERROR(AddLBIDtoList(txnid,
3723 lbids,
3724 colDataTypes,
3725 colStructList[i],
3726 curFbo));
3727 }
3728 }
3729 }
3730
3731 //cout << "lbids size = " << lbids.size()<< endl;
3732 if (lbids.size() > 0)
3733 rc = BRMWrapper::getInstance()->markExtentsInvalid(lbids, colDataTypes);
3734
3735 //--------------------------------------------------------------------------
3736 // Write row(s) to database file(s)
3737 //--------------------------------------------------------------------------
3738 #ifdef PROFILE
3739 timer.start("writeColumnRec");
3740 #endif
3741
3742 if (rc == NO_ERROR)
3743 {
3744 if (newExtent)
3745 {
3746 rc = writeColumnRec(txnid, colStructList, colOldValueList,
3747 rowIdArray, newColStructList, colNewValueList, tableOid,
3748 false); // @bug 5572 HDFS tmp file
3749 }
3750 else
3751 {
3752 rc = writeColumnRec(txnid, colStructList, colValueList,
3753 rowIdArray, newColStructList, colNewValueList, tableOid,
3754 true); // @bug 5572 HDFS tmp file
3755 }
3756 }
3757
3758 #ifdef PROFILE
3759 timer.stop("writeColumnRec");
3760 #endif
3761 // for (ColTupleList::size_type i = 0; i < totalRow; i++)
3762 // ridList.push_back((RID) rowIdArray[i]);
3763
3764 // if (rc == NO_ERROR)
3765 // rc = flushDataFiles(NO_ERROR);
3766
3767 //--------------------------------------------------------------------------
3768 // Update BRM
3769 //--------------------------------------------------------------------------
3770 if ( !newExtent )
3771 {
3772 //flushVMCache();
3773 bool succFlag = false;
3774 unsigned colWidth = 0;
3775 int extState;
3776 bool extFound;
3777 int curFbo = 0, curBio;
3778 std::vector<BulkSetHWMArg> hwmVec;
3779
3780 for (unsigned i = 0; i < totalColumns; i++)
3781 {
3782 //colOp = m_colOp[op(colStructList[i].fCompressionType)];
3783 //Set all columns hwm together
3784 BulkSetHWMArg aHwmEntry;
3785 RETURN_ON_ERROR(BRMWrapper::getInstance()->getLastHWM_DBroot(
3786 colStructList[i].dataOid,
3787 colStructList[i].fColDbRoot,
3788 colStructList[i].fColPartition,
3789 colStructList[i].fColSegment,
3790 hwm,
3791 extState, extFound));
3792 colWidth = colStructList[i].colWidth;
3793 succFlag = colOp->calculateRowId(lastRid,
3794 BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);
3795
3796 //cout << "insertcolumnrec oid:rid:fbo:hwm = " <<
3797 //colStructList[i].dataOid << ":" << lastRid << ":" <<
3798 //curFbo << ":" << hwm << endl;
3799 if (succFlag)
3800 {
3801 if ((HWM)curFbo > hwm)
3802 {
3803 aHwmEntry.oid = colStructList[i].dataOid;
3804 aHwmEntry.partNum = colStructList[i].fColPartition;
3805 aHwmEntry.segNum = colStructList[i].fColSegment;
3806 aHwmEntry.hwm = curFbo;
3807 hwmVec.push_back(aHwmEntry);
3808 }
3809 }
3810 else
3811 return ERR_INVALID_PARAM;
3812 }
3813
3814 std::vector<BRM::CPInfoMerge> mergeCPDataArgs;
3815 RETURN_ON_ERROR(BRMWrapper::getInstance()->bulkSetHWMAndCP(
3816 hwmVec, mergeCPDataArgs));
3817 }
3818 else // if (newExtent)
3819 {
3820 #ifdef PROFILE
3821 timer.start("flushVMCache");
3822 #endif
3823 std::vector<BRM::CPInfoMerge> mergeCPDataArgs;
3824
3825 if (hwmVecNewext.size() > 0)
3826 RETURN_ON_ERROR(BRMWrapper::getInstance()->bulkSetHWMAndCP(
3827 hwmVecNewext, mergeCPDataArgs));
3828
3829 if (hwmVecOldext.size() > 0)
3830 RETURN_ON_ERROR(BRMWrapper::getInstance()->bulkSetHWMAndCP(
3831 hwmVecOldext, mergeCPDataArgs));
3832
3833 //flushVMCache();
3834 #ifdef PROFILE
3835 timer.stop("flushVMCache");
3836 #endif
3837 }
3838
3839 #ifdef PROFILE
3840 timer.finish();
3841 #endif
3842 //flush PrimProc FD cache moved to we_dmlcommandproc.cpp
3843 /* ColsExtsInfoMap colsExtsInfoMap = aTableMetaData->getColsExtsInfoMap();
3844 ColsExtsInfoMap::iterator it = colsExtsInfoMap.begin();
3845 ColExtsInfo::iterator aIt;
3846 std::vector<BRM::FileInfo> files;
3847 BRM::FileInfo aFile;
3848 while (it != colsExtsInfoMap.end())
3849 {
3850 aIt = (it->second).begin();
3851 aFile.oid = it->first;
3852 //cout << "OID:" << aArg.oid;
3853 while (aIt != (it->second).end())
3854 {
3855 aFile.partitionNum = aIt->partNum;
3856 aFile.dbRoot =aIt->dbRoot;
3857 aFile.segmentNum = aIt->segNum;
3858 aFile.compType = aIt->compType;
3859 files.push_back(aFile);
3860 //cout <<"Added to files oid:dbroot:part:seg:compType = " << aFile.oid<<":"<<aFile.dbRoot<<":"<<aFile.partitionNum<<":"<<aFile.segmentNum
3861 //<<":"<<aFile.compType <<endl;
3862 aIt++;
3863 }
3864 it++;
3865 }
3866 if ((idbdatafile::IDBPolicy::useHdfs()) && (files.size() > 0))
3867 cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());
3868 TableMetaData::removeTableMetaData(tableOid); */
3869 return rc;
3870 }
3871
3872 /*@brief printInputValue - Print input value
3873 */
3874 /***********************************************************
3875 * DESCRIPTION:
3876 * Print input values (debug aid); value dump requires
3877 * debug level DEBUG_3
3878 * PARAMETERS:
3879 * colStructList - column struct list
3880 * colValueList - column value list
3881 * ridList - RID list
3882 * RETURN:
3883 * none
3884 ***********************************************************/
printInputValue(const ColStructList & colStructList,const ColValueList & colValueList,const RIDList & ridList) const3885 void WriteEngineWrapper::printInputValue(const ColStructList& colStructList,
3886 const ColValueList& colValueList,
3887 const RIDList& ridList) const
3888 {
3889 ColTupleList curTupleList;
3890 ColStruct curColStruct;
3891 ColTuple curTuple;
3892 string curStr;
3893 ColStructList::size_type i;
3894 ColTupleList::size_type j;
3895
3896 printf("\n=========================\n");
3897 // printf("\nTable OID : %d \n", tableOid);
3898
3899 printf("\nTotal RIDs: %zu\n", ridList.size());
3900
3901 for (i = 0; i < ridList.size(); i++)
3902 cout << "RID[" << i << "] : " << ridList[i] << "\n";
3903
3904 printf("\nTotal Columns: %zu\n", colStructList.size());
3905
3906
3907 for (i = 0; i < colStructList.size(); i++)
3908 {
3909 curColStruct = colStructList[i];
3910 curTupleList = colValueList[i];
3911
3912 printf("\nColumn[%zu]", i);
3913 printf("\nData file OID : %d \t", curColStruct.dataOid);
3914 printf("\tWidth : %d \t Type: %d", curColStruct.colWidth, curColStruct.colDataType);
3915 printf("\nTotal values : %zu \n", curTupleList.size());
3916
3917 for (j = 0; j < curTupleList.size(); j++)
3918 {
3919 curTuple = curTupleList[j];
3920
3921 try
3922 {
3923 if (curTuple.data.type() == typeid(int))
3924 curStr = boost::lexical_cast<string>(boost::any_cast<int>(curTuple.data));
3925 else if (curTuple.data.type() == typeid(float))
3926 curStr = boost::lexical_cast<string>(boost::any_cast<float>(curTuple.data));
3927 else if (curTuple.data.type() == typeid(long long))
3928 curStr = boost::lexical_cast<string>(boost::any_cast<long long>(curTuple.data));
3929 else if (curTuple.data.type() == typeid(double))
3930 curStr = boost::lexical_cast<string>(boost::any_cast<double>(curTuple.data));
3931 // else
3932 // if (curTuple.data.type() == typeid(bool))
3933 // curStr = boost::lexical_cast<string>(boost::any_cast<bool>(curTuple.data));
3934 else if (curTuple.data.type() == typeid(short))
3935 curStr = boost::lexical_cast<string>(boost::any_cast<short>(curTuple.data));
3936 else if (curTuple.data.type() == typeid(char))
3937 curStr = boost::lexical_cast<string>(boost::any_cast<char>(curTuple.data));
3938 else
3939 curStr = boost::any_cast<string>(curTuple.data);
3940 }
3941 catch (...)
3942 {
3943 }
3944
3945 if (isDebug(DEBUG_3))
3946 printf("Value[%zu] : %s\n", j, curStr.c_str());
3947 }
3948
3949 }
3950
3951 printf("\n=========================\n");
3952 }
3953
3954 /***********************************************************
3955 * DESCRIPTION:
3956 * Process version buffer before any write operation
3957 * PARAMETERS:
3958 * pFile - version buffer file; txnid - transaction id
3959 * colStruct - column struct; width - column width in bytes
3960 * totalRow - total number of rows
3961 * rowIdArray - rowid array; rangeList - (out) versioned LBID ranges
3962 * RETURN:
3963 * NO_ERROR if success
3964 * others if something wrong in inserting the value
3965 ***********************************************************/
processVersionBuffer(IDBDataFile * pFile,const TxnID & txnid,const ColStruct & colStruct,int width,int totalRow,const RID * rowIdArray,vector<LBIDRange> & rangeList)3966 int WriteEngineWrapper::processVersionBuffer(IDBDataFile* pFile, const TxnID& txnid,
3967 const ColStruct& colStruct, int width,
3968 int totalRow, const RID* rowIdArray, vector<LBIDRange>& rangeList)
3969 {
3970 if (idbdatafile::IDBPolicy::useHdfs())
3971 return 0;
3972
3973 RID curRowId;
3974 int rc = NO_ERROR;
3975 int curFbo = 0, curBio, lastFbo = -1;
3976 bool successFlag;
3977 BRM::LBID_t lbid;
3978 BRM::VER_t verId = (BRM::VER_t) txnid;
3979 vector<uint32_t> fboList;
3980 LBIDRange range;
3981 ColumnOp* colOp = m_colOp[op(colStruct.fCompressionType)];
3982
3983 for (int i = 0; i < totalRow; i++)
3984 {
3985 curRowId = rowIdArray[i];
3986 //cout << "processVersionBuffer got rid " << curRowId << endl;
3987 successFlag = colOp->calculateRowId(curRowId, BYTE_PER_BLOCK / width, width, curFbo, curBio);
3988
3989 if (successFlag)
3990 {
3991 if (curFbo != lastFbo)
3992 {
3993 //cout << "processVersionBuffer is processing lbid " << lbid << endl;
3994 RETURN_ON_ERROR(BRMWrapper::getInstance()->getBrmInfo(
3995 colStruct.dataOid, colStruct.fColPartition, colStruct.fColSegment, curFbo, lbid));
3996 //cout << "processVersionBuffer is processing lbid " << lbid << endl;
3997 fboList.push_back((uint32_t)curFbo);
3998 range.start = lbid;
3999 range.size = 1;
4000 rangeList.push_back(range);
4001 }
4002
4003 lastFbo = curFbo;
4004 }
4005 }
4006
4007 std::vector<VBRange> freeList;
4008 rc = BRMWrapper::getInstance()->
4009 writeVB(pFile, verId, colStruct.dataOid, fboList, rangeList, colOp, freeList, colStruct.fColDbRoot);
4010
4011 return rc;
4012 }
4013
processVersionBuffers(IDBDataFile * pFile,const TxnID & txnid,const ColStruct & colStruct,int width,int totalRow,const RIDList & ridList,vector<LBIDRange> & rangeList)4014 int WriteEngineWrapper::processVersionBuffers(IDBDataFile* pFile, const TxnID& txnid,
4015 const ColStruct& colStruct, int width,
4016 int totalRow, const RIDList& ridList,
4017 vector<LBIDRange>& rangeList)
4018 {
4019 if (idbdatafile::IDBPolicy::useHdfs())
4020 return 0;
4021
4022 RID curRowId;
4023 int rc = NO_ERROR;
4024 int curFbo = 0, curBio, lastFbo = -1;
4025 bool successFlag;
4026 BRM::LBID_t lbid;
4027 BRM::VER_t verId = (BRM::VER_t) txnid;
4028 LBIDRange range;
4029 vector<uint32_t> fboList;
4030 //vector<LBIDRange> rangeList;
4031 ColumnOp* colOp = m_colOp[op(colStruct.fCompressionType)];
4032
4033 for (int i = 0; i < totalRow; i++)
4034 {
4035 curRowId = ridList[i];
4036 //cout << "processVersionBuffer got rid " << curRowId << endl;
4037 successFlag = colOp->calculateRowId(curRowId, BYTE_PER_BLOCK / width, width, curFbo, curBio);
4038
4039 if (successFlag)
4040 {
4041 if (curFbo != lastFbo)
4042 {
4043 //cout << "processVersionBuffer is processing lbid " << lbid << endl;
4044 RETURN_ON_ERROR(BRMWrapper::getInstance()->getBrmInfo(
4045 colStruct.dataOid, colStruct.fColPartition, colStruct.fColSegment, curFbo, lbid));
4046 //cout << "processVersionBuffer is processing lbid " << lbid << endl;
4047 fboList.push_back((uint32_t)curFbo);
4048 range.start = lbid;
4049 range.size = 1;
4050 rangeList.push_back(range);
4051 }
4052
4053 lastFbo = curFbo;
4054 }
4055 }
4056
4057 //cout << "calling writeVB with blocks " << rangeList.size() << endl;
4058 std::vector<VBRange> freeList;
4059 rc = BRMWrapper::getInstance()->
4060 writeVB(pFile, verId, colStruct.dataOid, fboList, rangeList, colOp, freeList, colStruct.fColDbRoot);
4061
4062 return rc;
4063 }
4064
processBeginVBCopy(const TxnID & txnid,const vector<ColStruct> & colStructList,const RIDList & ridList,std::vector<VBRange> & freeList,vector<vector<uint32_t>> & fboLists,vector<vector<LBIDRange>> & rangeLists,vector<LBIDRange> & rangeListTot)4065 int WriteEngineWrapper::processBeginVBCopy(const TxnID& txnid, const vector<ColStruct>& colStructList, const RIDList& ridList,
4066 std::vector<VBRange>& freeList, vector<vector<uint32_t> >& fboLists, vector<vector<LBIDRange> >& rangeLists,
4067 vector<LBIDRange>& rangeListTot)
4068 {
4069 if (idbdatafile::IDBPolicy::useHdfs())
4070 return 0;
4071
4072 RID curRowId;
4073 int rc = NO_ERROR;
4074 int curFbo = 0, curBio, lastFbo = -1;
4075 bool successFlag;
4076 BRM::LBID_t lbid;
4077 LBIDRange range;
4078
4079 //StopWatch timer;
4080 // timer.start("calculation");
4081 for (uint32_t j = 0; j < colStructList.size(); j++)
4082 {
4083 vector<uint32_t> fboList;
4084 vector<LBIDRange> rangeList;
4085 lastFbo = -1;
4086 ColumnOp* colOp = m_colOp[op(colStructList[j].fCompressionType)];
4087
4088 ColStruct curColStruct = colStructList[j];
4089 Convertor::convertColType(&curColStruct);
4090
4091 for (uint32_t i = 0; i < ridList.size(); i++)
4092 {
4093 curRowId = ridList[i];
4094 //cout << "processVersionBuffer got rid " << curRowId << endl;
4095 successFlag = colOp->calculateRowId(curRowId, BYTE_PER_BLOCK / curColStruct.colWidth, curColStruct.colWidth, curFbo, curBio);
4096
4097 if (successFlag)
4098 {
4099 if (curFbo != lastFbo)
4100 {
4101 //cout << "processVersionBuffer is processing curFbo " << curFbo << endl;
4102 RETURN_ON_ERROR(BRMWrapper::getInstance()->getBrmInfo(
4103 colStructList[j].dataOid, colStructList[j].fColPartition, colStructList[j].fColSegment, curFbo, lbid));
4104 //cout << "beginVBCopy is processing lbid:transaction " << lbid <<":"<<txnid<< endl;
4105 fboList.push_back((uint32_t)curFbo);
4106 range.start = lbid;
4107 range.size = 1;
4108 rangeList.push_back(range);
4109 }
4110
4111 lastFbo = curFbo;
4112 }
4113 }
4114
4115 BRMWrapper::getInstance()->pruneLBIDList(txnid, &rangeList, &fboList);
4116 rangeLists.push_back(rangeList);
4117
4118 fboLists.push_back(fboList);
4119 rangeListTot.insert(rangeListTot.end(), rangeList.begin(), rangeList.end());
4120 }
4121
4122 if (rangeListTot.size() > 0)
4123 rc = BRMWrapper::getInstance()->getDbrmObject()->beginVBCopy(txnid, colStructList[0].fColDbRoot, rangeListTot, freeList);
4124
4125 //timer.stop("beginVBCopy");
4126 //timer.finish();
4127 return rc;
4128 }
4129
/**
 * @brief Process versioning for batch insert - only version the hwm block.
 */
#if 0
// NOTE(review): dead code — compiled out by the surrounding #if 0.
// Kept for reference; if ever re-enabled, re-review error handling
// (files opened before a failure are only closed via the trailing
// clearColumn loop, which assumes fileOps and columns stay parallel).
int WriteEngineWrapper::processBatchVersions(const TxnID& txnid, std::vector<Column> columns, std::vector<BRM::LBIDRange>& rangeList)
{
    int rc = 0;
    std::vector<DbFileOp*> fileOps;

    //open the column files
    for ( unsigned i = 0; i < columns.size(); i++)
    {
        ColumnOp* colOp = m_colOp[op(columns[i].compressionType)];
        Column curCol;
        // set params
        colOp->initColumn(curCol);
        ColType colType;
        Convertor::convertColType(columns[i].colDataType, colType);
        colOp->setColParam(curCol, 0, columns[i].colWidth,
                           columns[i].colDataType, colType, columns[i].dataFile.oid,
                           columns[i].compressionType,
                           columns[i].dataFile.fDbRoot, columns[i].dataFile.fPartition, columns[i].dataFile.fSegment);
        string segFile;
        rc = colOp->openColumnFile(curCol, segFile, IO_BUFF_SIZE);

        if (rc != NO_ERROR)
            break;

        columns[i].dataFile.pFile = curCol.dataFile.pFile;
        fileOps.push_back(colOp);
    }

    if ( rc == 0)
    {
        BRM::VER_t verId = (BRM::VER_t) txnid;
        rc = BRMWrapper::getInstance()->writeBatchVBs(verId, columns, rangeList, fileOps);
    }

    //close files
    for ( unsigned i = 0; i < columns.size(); i++)
    {
        ColumnOp* colOp = dynamic_cast<ColumnOp*> (fileOps[i]);
        Column curCol;
        // set params
        colOp->initColumn(curCol);
        ColType colType;
        Convertor::convertColType(columns[i].colDataType, colType);
        colOp->setColParam(curCol, 0, columns[i].colWidth,
                           columns[i].colDataType, colType, columns[i].dataFile.oid,
                           columns[i].compressionType,
                           columns[i].dataFile.fDbRoot, columns[i].dataFile.fPartition, columns[i].dataFile.fSegment);
        curCol.dataFile.pFile = columns[i].dataFile.pFile;
        colOp->clearColumn(curCol);
    }

    return rc;
}
#endif
writeVBEnd(const TxnID & txnid,std::vector<BRM::LBIDRange> & rangeList)4188 void WriteEngineWrapper::writeVBEnd(const TxnID& txnid, std::vector<BRM::LBIDRange>& rangeList)
4189 {
4190 if (idbdatafile::IDBPolicy::useHdfs())
4191 return;
4192
4193 BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
4194 }
4195
/***********************************************************
 * DESCRIPTION:
 *    Apply an update (or delete, per m_opType) to the given rows, one
 *    extent at a time.  For each extent it:
 *      1) tokenizes dictionary values and patches the resulting tokens
 *         into the matching column value lists (skipped for DELETE),
 *      2) invalidates casual-partition info for every affected
 *         non-dictionary column extent, and
 *      3) writes the column values via writeColumnRec().
 * PARAMETERS:
 *    txnid               - transaction id
 *    colExtentsStruct    - per-extent list of column structs
 *    colValueList        - column values (shared across extents)
 *    colOldValueList     - passed through to writeColumnRec()
 *    ridLists            - per-extent list of row ids to update
 *    dctnryExtentsStruct - per-extent list of dictionary structs
 *    dctnryValueList     - dictionary (signature) values to tokenize
 *    tableOid            - table object id
 * RETURN:
 *    NO_ERROR if success; the first failing extent's error code otherwise
 ***********************************************************/
int WriteEngineWrapper::updateColumnRec(const TxnID& txnid,
                                        vector<ColStructList>& colExtentsStruct,
                                        ColValueList& colValueList,
                                        vector<void*>& colOldValueList,
                                        vector<RIDList>& ridLists,
                                        vector<DctnryStructList>& dctnryExtentsStruct,
                                        DctnryValueList& dctnryValueList,
                                        const int32_t tableOid)
{
    int rc = 0;
    //RID* rowIdArray = NULL;
    //RIDList::size_type i;
    unsigned numExtents = colExtentsStruct.size();
    // ColValueList tmpColValueList;
    RIDList::const_iterator ridsIter;
    ColStructList colStructList;
    DctnryStructList dctnryStructList;
    ColumnOp* colOp = NULL;

    for (unsigned extent = 0; extent < numExtents; extent++)
    {
        ridsIter = ridLists[extent].begin();

        //rowIdArray = (RID*)calloc(sizeof(RID), ridLists[extent].size());

        colStructList = colExtentsStruct[extent];
        dctnryStructList = dctnryExtentsStruct[extent];

        // DELETE does not write new values, so dictionary tokenization is
        // only needed for updates.
        if (m_opType != DELETE)
        {

            /* ColTuple colTuple;
                ColTupleList colTupleList;
                for (i=0; i < colValueList.size(); i++)
                {
                    colTupleList = colValueList[i];
                    colTuple = colTupleList[0];
                    for (unsigned i = 1; i < ridLists[extent].size(); i++)
                    {
                        colTupleList.push_back(colTuple);
                    }
                    tmpColValueList.push_back(colTupleList);
                }
            */
            //Tokenize data if needed
            vector<Token> tokenList;

            DctColTupleList::iterator dctCol_iter;
            ColTupleList::iterator col_iter;

            // Pass 1: tokenize each dictionary column's value once.  A
            // NULL signature keeps a default-constructed Token.
            for (unsigned i = 0; i < colStructList.size(); i++)
            {
                if (colStructList[i].tokenFlag)
                {
                    // only need to tokenize once
                    dctCol_iter = dctnryValueList[i].begin();
                    //col_iter = colValueList[i].begin();
                    Token token;

                    if (!dctCol_iter->isNull)
                    {
                        RETURN_ON_ERROR(tokenize(
                                            txnid, dctnryStructList[i], *dctCol_iter, true)); // @bug 5572 HDFS tmp file
                        token = dctCol_iter->token;

#ifdef PROFILE
                        //timer.stop("tokenize");
#endif
                    }
                    else
                    {
                        //if (dctnryStructList[i].dctnryOid == 2001)
                        //	std::cout << " got null token for string " << dctCol_iter->sigValue <<std::endl;
                    }

                    //if (dctnryStructList[i].dctnryOid == 2001)
                    //std::cout << " got token for string " << dctCol_iter->sigValue << " op:fbo = " << token.op <<":"<<token.fbo << std::endl;
                    tokenList.push_back(token);
                }
            }

            int dicPos = 0;

            // Pass 2: copy each column's token into every row's value so
            // all updated rows of the column carry the same token.
            for (unsigned i = 0; i < colStructList.size(); i++)
            {
                if (colStructList[i].tokenFlag)
                {
                    // only need to tokenize once
                    col_iter = colValueList[i].begin();

                    while (col_iter != colValueList[i].end())
                    {
                        col_iter->data = tokenList[dicPos];
                        col_iter++;
                    }

                    dicPos++;
                }
            }
        }

        RIDList::iterator rid_iter;
        /*	i = 0;
            while (rid_iter != ridLists[extent].end())
            {
                rowIdArray[i] = *rid_iter;
                rid_iter++;
                i++;
            }
        */
        //Mark extents invalid
        //if (colStructList[0].dataOid < 3000) {
        vector<BRM::LBID_t> lbids;
        vector<CalpontSystemCatalog::ColDataType> colDataTypes;
        bool successFlag = true;
        unsigned width = 0;
        int curFbo = 0, curBio, lastFbo = -1;
        rid_iter = ridLists[extent].begin();
        // Only the first rid is used to locate the extent — presumably
        // every rid in ridLists[extent] lies in the same extent (implied
        // by the per-extent grouping; confirm with callers).
        RID aRid = *rid_iter;

        for (unsigned j = 0; j < colStructList.size(); j++)
        {
            colOp = m_colOp[op(colStructList[j].fCompressionType)];

            // Token columns have no casual-partition info to invalidate.
            if (colStructList[j].tokenFlag)
                continue;

            width = colOp->getCorrectRowWidth(colStructList[j].colDataType, colStructList[j].colWidth);
            successFlag = colOp->calculateRowId(aRid, BYTE_PER_BLOCK / width, width, curFbo, curBio);

            if (successFlag)
            {
                if (curFbo != lastFbo)
                {
                    RETURN_ON_ERROR(AddLBIDtoList(txnid,
                                                  lbids,
                                                  colDataTypes,
                                                  colStructList[j],
                                                  curFbo));
                }
            }
        }

        //cout << "lbids size = " << lbids.size()<< endl;
//#ifdef PROFILE
//timer.start("markExtentsInvalid");
//#endif
        if (lbids.size() > 0)
            rc = BRMWrapper::getInstance()->markExtentsInvalid(lbids, colDataTypes);

        //}

        // Record the operation type for the write; restored to NOOP below.
        if ( m_opType != DELETE)
            m_opType = UPDATE;

        rc = writeColumnRec(txnid, colStructList, colValueList, colOldValueList,
                            ridLists[extent], tableOid, true, ridLists[extent].size());

        // if (rowIdArray)
        //	free(rowIdArray);

        m_opType = NOOP;

        if (rc != NO_ERROR)
            break;
    }

    return rc;
}
4365
updateColumnRecs(const TxnID & txnid,vector<ColStruct> & colExtentsStruct,ColValueList & colValueList,const RIDList & ridLists,const int32_t tableOid)4366 int WriteEngineWrapper::updateColumnRecs(const TxnID& txnid,
4367 vector<ColStruct>& colExtentsStruct,
4368 ColValueList& colValueList,
4369 const RIDList& ridLists,
4370 const int32_t tableOid)
4371 {
4372 //Mark extents invalid
4373 //int rc = 0;
4374 //if (colExtentsStruct[0].dataOid < 3000)
4375 //{
4376 vector<BRM::LBID_t> lbids;
4377 vector<CalpontSystemCatalog::ColDataType> colDataTypes;
4378 ColumnOp* colOp = NULL;
4379 bool successFlag = true;
4380 unsigned width = 0;
4381 \
4382 int curFbo = 0, curBio, lastFbo = -1;
4383 RID aRid = ridLists[0];
4384 int rc = 0;
4385
4386 for (unsigned j = 0; j < colExtentsStruct.size(); j++)
4387 {
4388 colOp = m_colOp[op(colExtentsStruct[j].fCompressionType)];
4389
4390 if (colExtentsStruct[j].tokenFlag)
4391 continue;
4392
4393 width = colOp->getCorrectRowWidth(colExtentsStruct[j].colDataType, colExtentsStruct[j].colWidth);
4394 successFlag = colOp->calculateRowId(aRid, BYTE_PER_BLOCK / width, width, curFbo, curBio);
4395
4396 if (successFlag)
4397 {
4398 if (curFbo != lastFbo)
4399 {
4400 RETURN_ON_ERROR(AddLBIDtoList(txnid,
4401 lbids,
4402 colDataTypes,
4403 colExtentsStruct[j],
4404 curFbo));
4405 }
4406 }
4407 }
4408
4409 if (lbids.size() > 0)
4410 {
4411 // cout << "BRMWrapper::getInstance()->markExtentsInvalid(lbids); " << lbids.size() << " lbids" << endl;
4412 rc = BRMWrapper::getInstance()->markExtentsInvalid(lbids, colDataTypes);
4413 }
4414
4415 //}
4416 if ( m_opType != DELETE)
4417 m_opType = UPDATE;
4418
4419 rc = writeColumnRecords (txnid, colExtentsStruct, colValueList, ridLists, tableOid);
4420 m_opType = NOOP;
4421 return rc;
4422 }
4423
writeColumnRecords(const TxnID & txnid,vector<ColStruct> & colStructList,ColValueList & colValueList,const RIDList & ridLists,const int32_t tableOid,bool versioning)4424 int WriteEngineWrapper::writeColumnRecords(const TxnID& txnid,
4425 vector<ColStruct>& colStructList,
4426 ColValueList& colValueList,
4427 const RIDList& ridLists, const int32_t tableOid, bool versioning)
4428 {
4429 bool bExcp;
4430 int rc = 0;
4431 void* valArray = NULL;
4432 Column curCol;
4433 ColStruct curColStruct;
4434 ColTupleList curTupleList;
4435 ColStructList::size_type totalColumn;
4436 ColStructList::size_type i;
4437 ColTupleList::size_type totalRow;
4438 setTransId(txnid);
4439 totalColumn = colStructList.size();
4440 totalRow = ridLists.size();
4441
4442 TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);
4443
4444 for (i = 0; i < totalColumn; i++)
4445 {
4446 valArray = NULL;
4447 curColStruct = colStructList[i];
4448 curTupleList = colValueList[i];
4449 ColumnOp* colOp = m_colOp[op(curColStruct.fCompressionType)];
4450
4451 Convertor::convertColType(&curColStruct);
4452
4453 // set params
4454 colOp->initColumn(curCol);
4455
4456 colOp->setColParam(curCol, 0, curColStruct.colWidth,
4457 curColStruct.colDataType, curColStruct.colType, curColStruct.dataOid,
4458 curColStruct.fCompressionType,
4459 curColStruct.fColDbRoot, curColStruct.fColPartition, curColStruct.fColSegment);
4460
4461 ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(curColStruct.dataOid);
4462 ColExtsInfo::iterator it = aColExtsInfo.begin();
4463
4464 while (it != aColExtsInfo.end())
4465 {
4466 if ((it->dbRoot == curColStruct.fColDbRoot) && (it->partNum == curColStruct.fColPartition) && (it->segNum == curColStruct.fColSegment))
4467 break;
4468
4469 it++;
4470 }
4471
4472 if (it == aColExtsInfo.end()) //add this one to the list
4473 {
4474 ColExtInfo aExt;
4475 aExt.dbRoot = curColStruct.fColDbRoot;
4476 aExt.partNum = curColStruct.fColPartition;
4477 aExt.segNum = curColStruct.fColSegment;
4478 aExt.compType = curColStruct.fCompressionType;
4479 aExt.isDict = false;
4480 aColExtsInfo.push_back(aExt);
4481 aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
4482 }
4483
4484 string segFile;
4485 rc = colOp->openColumnFile(curCol, segFile, true); // @bug 5572 HDFS tmp file
4486
4487 if (rc != NO_ERROR)
4488 break;
4489
4490 vector<LBIDRange> rangeList;
4491
4492 if (versioning)
4493 {
4494 rc = processVersionBuffers(curCol.dataFile.pFile, txnid, curColStruct,
4495 curColStruct.colWidth, totalRow, ridLists, rangeList);
4496 }
4497
4498 if (rc != NO_ERROR)
4499 {
4500 if (curColStruct.fCompressionType == 0)
4501 {
4502 curCol.dataFile.pFile->flush();
4503 }
4504
4505 BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
4506 break;
4507 }
4508
4509 switch (curColStruct.colType)
4510 {
4511 case WriteEngine::WR_INT:
4512 case WriteEngine::WR_MEDINT:
4513 valArray = (int*) calloc(sizeof(int), totalRow);
4514 break;
4515
4516 case WriteEngine::WR_UINT:
4517 case WriteEngine::WR_UMEDINT:
4518 valArray = (uint32_t*) calloc(sizeof(uint32_t), totalRow);
4519 break;
4520
4521 case WriteEngine::WR_VARBINARY : // treat same as char for now
4522 case WriteEngine::WR_CHAR:
4523 case WriteEngine::WR_BLOB:
4524 case WriteEngine::WR_TEXT:
4525 valArray = (char*) calloc(sizeof(char), totalRow * MAX_COLUMN_BOUNDARY);
4526 break;
4527
4528 case WriteEngine::WR_FLOAT:
4529 valArray = (float*) calloc(sizeof(float), totalRow);
4530 break;
4531
4532 case WriteEngine::WR_DOUBLE:
4533 valArray = (double*) calloc(sizeof(double), totalRow);
4534 break;
4535
4536 case WriteEngine::WR_BYTE:
4537 valArray = (char*) calloc(sizeof(char), totalRow);
4538 break;
4539
4540 case WriteEngine::WR_UBYTE:
4541 valArray = (uint8_t*) calloc(sizeof(uint8_t), totalRow);
4542 break;
4543
4544 case WriteEngine::WR_SHORT:
4545 valArray = (short*) calloc(sizeof(short), totalRow);
4546 break;
4547
4548 case WriteEngine::WR_USHORT:
4549 valArray = (uint16_t*) calloc(sizeof(uint16_t), totalRow);
4550 break;
4551
4552 case WriteEngine::WR_LONGLONG:
4553 valArray = (long long*) calloc(sizeof(long long), totalRow);
4554 break;
4555
4556 case WriteEngine::WR_ULONGLONG:
4557 valArray = (uint64_t*) calloc(sizeof(uint64_t), totalRow);
4558 break;
4559
4560 case WriteEngine::WR_TOKEN:
4561 valArray = (Token*) calloc(sizeof(Token), totalRow);
4562 break;
4563 }
4564
4565 // convert values to valArray
4566 bExcp = false;
4567
4568 try
4569 {
4570 convertValArray(totalRow, curColStruct.colType, curTupleList, valArray);
4571 }
4572 catch (...)
4573 {
4574 bExcp = true;
4575 }
4576
4577 if (bExcp)
4578 {
4579 BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
4580 return ERR_PARSING;
4581 }
4582
4583 #ifdef PROFILE
4584 timer.start("writeRow ");
4585 #endif
4586 rc = colOp->writeRowsValues(curCol, totalRow, ridLists, valArray);
4587 #ifdef PROFILE
4588 timer.stop("writeRow ");
4589 #endif
4590 colOp->clearColumn(curCol);
4591
4592 if (curColStruct.fCompressionType == 0)
4593 {
4594 std::vector<BRM::FileInfo> files;
4595 BRM::FileInfo aFile;
4596 aFile.partitionNum = curColStruct.fColPartition;
4597 aFile.dbRoot = curColStruct.fColDbRoot;;
4598 aFile.segmentNum = curColStruct.fColSegment;
4599 aFile.compType = curColStruct.fCompressionType;
4600 files.push_back(aFile);
4601
4602 if (idbdatafile::IDBPolicy::useHdfs())
4603 cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());
4604 }
4605
4606 BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
4607
4608 if (valArray != NULL)
4609 free(valArray);
4610
4611 // check error
4612 if (rc != NO_ERROR)
4613 break;
4614 }
4615
4616 return rc;
4617 }
4618
4619 /*@brief writeColumnRec - Write values to a column
4620 */
4621 /***********************************************************
4622 * DESCRIPTION:
4623 * Write values to a column
4624 * PARAMETERS:
4625 * tableOid - table object id
4626 * colStructList - column struct list
4627 * colValueList - column value list
4628 * colNewStructList - the new extent struct list
4629 * colNewValueList - column value list for the new extent
4630 * rowIdArray - row id list
4631 * useTmpSuffix - use temp suffix for db output file
4632 * RETURN:
4633 * NO_ERROR if success
4634 * others if something wrong in inserting the value
4635 ***********************************************************/
writeColumnRec(const TxnID & txnid,const ColStructList & colStructList,ColValueList & colValueList,RID * rowIdArray,const ColStructList & newColStructList,ColValueList & newColValueList,const int32_t tableOid,bool useTmpSuffix,bool versioning)4636 int WriteEngineWrapper::writeColumnRec(const TxnID& txnid,
4637 const ColStructList& colStructList,
4638 ColValueList& colValueList,
4639 RID* rowIdArray,
4640 const ColStructList& newColStructList,
4641 ColValueList& newColValueList,
4642 const int32_t tableOid,
4643 bool useTmpSuffix,
4644 bool versioning)
4645 {
4646 bool bExcp;
4647 int rc = 0;
4648 void* valArray;
4649 string segFile;
4650 Column curCol;
4651 ColTupleList oldTupleList;
4652 ColStructList::size_type totalColumn;
4653 ColStructList::size_type i;
4654 ColTupleList::size_type totalRow1, totalRow2;
4655
4656 setTransId(txnid);
4657
4658 totalColumn = colStructList.size();
4659 #ifdef PROFILE
4660 StopWatch timer;
4661 #endif
4662
4663 if (newColValueList.size() > 0)
4664 {
4665 totalRow1 = colValueList[0].size();
4666 totalRow2 = newColValueList[0].size();
4667 }
4668 else
4669 {
4670 totalRow1 = colValueList[0].size();
4671 totalRow2 = 0;
4672 }
4673
4674 TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);
4675
4676 for (i = 0; i < totalColumn; i++)
4677 {
4678 if (totalRow2 > 0)
4679 {
4680 RID* secondPart = rowIdArray + totalRow1;
4681
4682 //@Bug 2205 Check if all rows go to the new extent
4683 if (totalRow1 > 0)
4684 {
4685 //Write the first batch
4686 valArray = NULL;
4687 RID* firstPart = rowIdArray;
4688 ColumnOp* colOp = m_colOp[op(colStructList[i].fCompressionType)];
4689
4690 // set params
4691 colOp->initColumn(curCol);
4692 // need to pass real dbRoot, partition, and segment to setColParam
4693 colOp->setColParam(curCol, 0, colStructList[i].colWidth,
4694 colStructList[i].colDataType, colStructList[i].colType, colStructList[i].dataOid,
4695 colStructList[i].fCompressionType, colStructList[i].fColDbRoot,
4696 colStructList[i].fColPartition, colStructList[i].fColSegment);
4697
4698 ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(colStructList[i].dataOid);
4699 ColExtsInfo::iterator it = aColExtsInfo.begin();
4700
4701 while (it != aColExtsInfo.end())
4702 {
4703 if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment))
4704 break;
4705
4706 it++;
4707 }
4708
4709 if (it == aColExtsInfo.end()) //add this one to the list
4710 {
4711 ColExtInfo aExt;
4712 aExt.dbRoot = colStructList[i].fColDbRoot;
4713 aExt.partNum = colStructList[i].fColPartition;
4714 aExt.segNum = colStructList[i].fColSegment;
4715 aExt.compType = colStructList[i].fCompressionType;
4716 aColExtsInfo.push_back(aExt);
4717 aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
4718 }
4719
4720 rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file
4721
4722 if (rc != NO_ERROR)
4723 break;
4724
4725 // handling versioning
4726 vector<LBIDRange> rangeList;
4727
4728 if (versioning)
4729 {
4730 rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i],
4731 colStructList[i].colWidth, totalRow1, firstPart, rangeList);
4732
4733 if (rc != NO_ERROR)
4734 {
4735 if (colStructList[i].fCompressionType == 0)
4736 {
4737 curCol.dataFile.pFile->flush();
4738 }
4739
4740 BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
4741 break;
4742 }
4743 }
4744
4745 //totalRow1 -= totalRow2;
4746 // have to init the size here
4747 // nullArray = (bool*) malloc(sizeof(bool) * totalRow);
4748 switch (colStructList[i].colType)
4749 {
4750 case WriteEngine::WR_INT:
4751 case WriteEngine::WR_MEDINT:
4752 valArray = (int*) calloc(sizeof(int), totalRow1);
4753 break;
4754
4755 case WriteEngine::WR_UINT:
4756 case WriteEngine::WR_UMEDINT:
4757 valArray = (uint32_t*) calloc(sizeof(uint32_t), totalRow1);
4758 break;
4759
4760 case WriteEngine::WR_VARBINARY : // treat same as char for now
4761 case WriteEngine::WR_CHAR:
4762 case WriteEngine::WR_BLOB:
4763 case WriteEngine::WR_TEXT:
4764 valArray = (char*) calloc(sizeof(char), totalRow1 * MAX_COLUMN_BOUNDARY);
4765 break;
4766
4767 case WriteEngine::WR_FLOAT:
4768 valArray = (float*) calloc(sizeof(float), totalRow1);
4769 break;
4770
4771 case WriteEngine::WR_DOUBLE:
4772 valArray = (double*) calloc(sizeof(double), totalRow1);
4773 break;
4774
4775 case WriteEngine::WR_BYTE:
4776 valArray = (char*) calloc(sizeof(char), totalRow1);
4777 break;
4778
4779 case WriteEngine::WR_UBYTE:
4780 valArray = (uint8_t*) calloc(sizeof(uint8_t), totalRow1);
4781 break;
4782
4783 case WriteEngine::WR_SHORT:
4784 valArray = (short*) calloc(sizeof(short), totalRow1);
4785 break;
4786
4787 case WriteEngine::WR_USHORT:
4788 valArray = (uint16_t*) calloc(sizeof(uint16_t), totalRow1);
4789 break;
4790
4791 case WriteEngine::WR_LONGLONG:
4792 valArray = (long long*) calloc(sizeof(long long), totalRow1);
4793 break;
4794
4795 case WriteEngine::WR_ULONGLONG:
4796 valArray = (uint64_t*) calloc(sizeof(uint64_t), totalRow1);
4797 break;
4798
4799 case WriteEngine::WR_TOKEN:
4800 valArray = (Token*) calloc(sizeof(Token), totalRow1);
4801 break;
4802 }
4803
4804 // convert values to valArray
4805 if (m_opType != DELETE)
4806 {
4807 bExcp = false;
4808
4809 try
4810 {
4811 convertValArray(totalRow1, colStructList[i].colType, colValueList[i], valArray);
4812 }
4813 catch (...)
4814 {
4815 bExcp = true;
4816 }
4817
4818 if (bExcp)
4819 {
4820 if (versioning)
4821 BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
4822
4823 return ERR_PARSING;
4824 }
4825
4826 #ifdef PROFILE
4827 timer.start("writeRow ");
4828 #endif
4829 rc = colOp->writeRow(curCol, totalRow1, firstPart, valArray);
4830 #ifdef PROFILE
4831 timer.stop("writeRow ");
4832 #endif
4833 }
4834 else
4835 {
4836 #ifdef PROFILE
4837 timer.start("writeRow ");
4838 #endif
4839 rc = colOp->writeRow(curCol, totalRow1, rowIdArray, valArray, true);
4840 #ifdef PROFILE
4841 timer.stop("writeRow ");
4842 #endif
4843 }
4844
4845 colOp->clearColumn(curCol);
4846
4847 if (versioning)
4848 BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
4849
4850 if (valArray != NULL)
4851 free(valArray);
4852
4853 // check error
4854 if (rc != NO_ERROR)
4855 break;
4856 }
4857
4858 //Process the second batch
4859 valArray = NULL;
4860
4861 ColumnOp* colOp = m_colOp[op(newColStructList[i].fCompressionType)];
4862
4863 // set params
4864 colOp->initColumn(curCol);
4865 colOp->setColParam(curCol, 0, newColStructList[i].colWidth,
4866 newColStructList[i].colDataType, newColStructList[i].colType, newColStructList[i].dataOid,
4867 newColStructList[i].fCompressionType, newColStructList[i].fColDbRoot,
4868 newColStructList[i].fColPartition, newColStructList[i].fColSegment);
4869
4870 ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(newColStructList[i].dataOid);
4871 ColExtsInfo::iterator it = aColExtsInfo.begin();
4872
4873 while (it != aColExtsInfo.end())
4874 {
4875 if ((it->dbRoot == newColStructList[i].fColDbRoot) && (it->partNum == newColStructList[i].fColPartition) && (it->segNum == newColStructList[i].fColSegment))
4876 break;
4877
4878 it++;
4879 }
4880
4881 if (it == aColExtsInfo.end()) //add this one to the list
4882 {
4883 ColExtInfo aExt;
4884 aExt.dbRoot = newColStructList[i].fColDbRoot;
4885 aExt.partNum = newColStructList[i].fColPartition;
4886 aExt.segNum = newColStructList[i].fColSegment;
4887 aExt.compType = newColStructList[i].fCompressionType;
4888 aColExtsInfo.push_back(aExt);
4889 aTbaleMetaData->setColExtsInfo(newColStructList[i].dataOid, aColExtsInfo);
4890 }
4891
4892 // Pass "false" for hdfs tmp file flag. Since we only allow 1
4893 // extent per segment file (with HDFS), we can assume a second
4894 // extent is going to a new file (and won't need tmp file).
4895 rc = colOp->openColumnFile(curCol, segFile, false, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file
4896
4897 if (rc != NO_ERROR)
4898 break;
4899
4900 // handling versioning
4901 vector<LBIDRange> rangeList;
4902
4903 if (versioning)
4904 {
4905 rc = processVersionBuffer(curCol.dataFile.pFile, txnid, newColStructList[i],
4906 newColStructList[i].colWidth, totalRow2, secondPart, rangeList);
4907
4908 if (rc != NO_ERROR)
4909 {
4910 if (newColStructList[i].fCompressionType == 0)
4911 {
4912 curCol.dataFile.pFile->flush();
4913 }
4914
4915 BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
4916 break;
4917 }
4918 }
4919
4920 //totalRow1 -= totalRow2;
4921 // have to init the size here
4922 // nullArray = (bool*) malloc(sizeof(bool) * totalRow);
4923 switch (newColStructList[i].colType)
4924 {
4925 case WriteEngine::WR_INT:
4926 case WriteEngine::WR_MEDINT:
4927 valArray = (int*) calloc(sizeof(int), totalRow2);
4928 break;
4929
4930 case WriteEngine::WR_UINT:
4931 case WriteEngine::WR_UMEDINT:
4932 valArray = (uint32_t*) calloc(sizeof(uint32_t), totalRow2);
4933 break;
4934
4935 case WriteEngine::WR_VARBINARY : // treat same as char for now
4936 case WriteEngine::WR_CHAR:
4937 case WriteEngine::WR_BLOB:
4938 case WriteEngine::WR_TEXT:
4939 valArray = (char*) calloc(sizeof(char), totalRow2 * MAX_COLUMN_BOUNDARY);
4940 break;
4941
4942 case WriteEngine::WR_FLOAT:
4943 valArray = (float*) calloc(sizeof(float), totalRow2);
4944 break;
4945
4946 case WriteEngine::WR_DOUBLE:
4947 valArray = (double*) calloc(sizeof(double), totalRow2);
4948 break;
4949
4950 case WriteEngine::WR_BYTE:
4951 valArray = (char*) calloc(sizeof(char), totalRow2);
4952 break;
4953
4954 case WriteEngine::WR_UBYTE:
4955 valArray = (uint8_t*) calloc(sizeof(uint8_t), totalRow2);
4956 break;
4957
4958 case WriteEngine::WR_SHORT:
4959 valArray = (short*) calloc(sizeof(short), totalRow2);
4960 break;
4961
4962 case WriteEngine::WR_USHORT:
4963 valArray = (uint16_t*) calloc(sizeof(uint16_t), totalRow2);
4964 break;
4965
4966 case WriteEngine::WR_LONGLONG:
4967 valArray = (long long*) calloc(sizeof(long long), totalRow2);
4968 break;
4969
4970 case WriteEngine::WR_ULONGLONG:
4971 valArray = (uint64_t*) calloc(sizeof(uint64_t), totalRow2);
4972 break;
4973
4974 case WriteEngine::WR_TOKEN:
4975 valArray = (Token*) calloc(sizeof(Token), totalRow2);
4976 break;
4977 }
4978
4979 // convert values to valArray
4980 if (m_opType != DELETE)
4981 {
4982 bExcp = false;
4983
4984 try
4985 {
4986 convertValArray(totalRow2, newColStructList[i].colType, newColValueList[i], valArray);
4987 }
4988 catch (...)
4989 {
4990 bExcp = true;
4991 }
4992
4993 if (bExcp)
4994 {
4995 if (versioning)
4996 BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
4997
4998 return ERR_PARSING;
4999 }
5000
5001 #ifdef PROFILE
5002 timer.start("writeRow ");
5003 #endif
5004 rc = colOp->writeRow(curCol, totalRow2, secondPart, valArray);
5005 #ifdef PROFILE
5006 timer.stop("writeRow ");
5007 #endif
5008 }
5009 else
5010 {
5011 #ifdef PROFILE
5012 timer.start("writeRow ");
5013 #endif
5014 rc = colOp->writeRow(curCol, totalRow2, rowIdArray, valArray, true);
5015 #ifdef PROFILE
5016 timer.stop("writeRow ");
5017 #endif
5018 }
5019
5020
5021 colOp->clearColumn(curCol);
5022
5023 if (versioning)
5024 BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
5025
5026 if (valArray != NULL)
5027 free(valArray);
5028
5029 // check error
5030 if (rc != NO_ERROR)
5031 break;
5032 }
5033 else
5034 {
5035 valArray = NULL;
5036
5037 ColumnOp* colOp = m_colOp[op(colStructList[i].fCompressionType)];
5038
5039 // set params
5040 colOp->initColumn(curCol);
5041 colOp->setColParam(curCol, 0, colStructList[i].colWidth,
5042 colStructList[i].colDataType, colStructList[i].colType, colStructList[i].dataOid,
5043 colStructList[i].fCompressionType, colStructList[i].fColDbRoot,
5044 colStructList[i].fColPartition, colStructList[i].fColSegment);
5045
5046 rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file
5047
5048 //cout << " Opened file oid " << curCol.dataFile.pFile << endl;
5049 if (rc != NO_ERROR)
5050 break;
5051
5052 ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(colStructList[i].dataOid);
5053 ColExtsInfo::iterator it = aColExtsInfo.begin();
5054
5055 while (it != aColExtsInfo.end())
5056 {
5057 if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment))
5058 break;
5059
5060 it++;
5061 }
5062
5063 if (it == aColExtsInfo.end()) //add this one to the list
5064 {
5065 ColExtInfo aExt;
5066 aExt.dbRoot = colStructList[i].fColDbRoot;
5067 aExt.partNum = colStructList[i].fColPartition;
5068 aExt.segNum = colStructList[i].fColSegment;
5069 aExt.compType = colStructList[i].fCompressionType;
5070 aColExtsInfo.push_back(aExt);
5071 aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
5072 }
5073
5074 // handling versioning
5075 vector<LBIDRange> rangeList;
5076
5077 if (versioning)
5078 {
5079 rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i],
5080 colStructList[i].colWidth, totalRow1, rowIdArray, rangeList);
5081
5082 if (rc != NO_ERROR)
5083 {
5084 if (colStructList[i].fCompressionType == 0)
5085 {
5086 curCol.dataFile.pFile->flush();
5087 }
5088
5089 BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
5090 break;
5091 }
5092 }
5093
5094 // have to init the size here
5095 // nullArray = (bool*) malloc(sizeof(bool) * totalRow);
5096 switch (colStructList[i].colType)
5097 {
5098 case WriteEngine::WR_INT:
5099 case WriteEngine::WR_MEDINT:
5100 valArray = (int*) calloc(sizeof(int), totalRow1);
5101 break;
5102
5103 case WriteEngine::WR_UINT:
5104 case WriteEngine::WR_UMEDINT:
5105 valArray = (uint32_t*) calloc(sizeof(uint32_t), totalRow1);
5106 break;
5107
5108 case WriteEngine::WR_VARBINARY : // treat same as char for now
5109 case WriteEngine::WR_CHAR:
5110 case WriteEngine::WR_BLOB:
5111 case WriteEngine::WR_TEXT:
5112 valArray = (char*) calloc(sizeof(char), totalRow1 * MAX_COLUMN_BOUNDARY);
5113 break;
5114
5115 case WriteEngine::WR_FLOAT:
5116 valArray = (float*) calloc(sizeof(float), totalRow1);
5117 break;
5118
5119 case WriteEngine::WR_DOUBLE:
5120 valArray = (double*) calloc(sizeof(double), totalRow1);
5121 break;
5122
5123 case WriteEngine::WR_BYTE:
5124 valArray = (char*) calloc(sizeof(char), totalRow1);
5125 break;
5126
5127 case WriteEngine::WR_UBYTE:
5128 valArray = (uint8_t*) calloc(sizeof(uint8_t), totalRow1);
5129 break;
5130
5131 case WriteEngine::WR_SHORT:
5132 valArray = (short*) calloc(sizeof(short), totalRow1);
5133 break;
5134
5135 case WriteEngine::WR_USHORT:
5136 valArray = (uint16_t*) calloc(sizeof(uint16_t), totalRow1);
5137 break;
5138
5139 case WriteEngine::WR_LONGLONG:
5140 valArray = (long long*) calloc(sizeof(long long), totalRow1);
5141 break;
5142
5143 case WriteEngine::WR_ULONGLONG:
5144 valArray = (uint64_t*) calloc(sizeof(uint64_t), totalRow1);
5145 break;
5146
5147 case WriteEngine::WR_TOKEN:
5148 valArray = (Token*) calloc(sizeof(Token), totalRow1);
5149 break;
5150 }
5151
5152 // convert values to valArray
5153 if (m_opType != DELETE)
5154 {
5155 bExcp = false;
5156
5157 try
5158 {
5159 convertValArray(totalRow1, colStructList[i].colType, colValueList[i], valArray);
5160 }
5161 catch (...)
5162 {
5163 bExcp = true;
5164 }
5165
5166 if (bExcp)
5167 {
5168 if (versioning)
5169 BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
5170
5171 return ERR_PARSING;
5172 }
5173
5174 #ifdef PROFILE
5175 timer.start("writeRow ");
5176 #endif
5177 rc = colOp->writeRow(curCol, totalRow1, rowIdArray, valArray);
5178 #ifdef PROFILE
5179 timer.stop("writeRow ");
5180 #endif
5181 }
5182 else
5183 {
5184 #ifdef PROFILE
5185 timer.start("writeRow ");
5186 #endif
5187 rc = colOp->writeRow(curCol, totalRow1, rowIdArray, valArray, true);
5188 #ifdef PROFILE
5189 timer.stop("writeRow ");
5190 #endif
5191 }
5192
5193 colOp->clearColumn(curCol);
5194
5195 if (versioning)
5196 BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
5197
5198 if (valArray != NULL)
5199 free(valArray);
5200
5201 // check error
5202 if (rc != NO_ERROR)
5203 break;
5204 }
5205 } // end of for (i = 0
5206
5207 #ifdef PROFILE
5208 timer.finish();
5209 #endif
5210 return rc;
5211 }
5212
writeColumnRecBinary(const TxnID & txnid,const ColStructList & colStructList,std::vector<uint64_t> & colValueList,RID * rowIdArray,const ColStructList & newColStructList,std::vector<uint64_t> & newColValueList,const int32_t tableOid,bool useTmpSuffix,bool versioning)5213 int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid,
5214 const ColStructList& colStructList,
5215 std::vector<uint64_t>& colValueList,
5216 RID* rowIdArray,
5217 const ColStructList& newColStructList,
5218 std::vector<uint64_t>& newColValueList,
5219 const int32_t tableOid,
5220 bool useTmpSuffix,
5221 bool versioning)
5222 {
5223 int rc = 0;
5224 void* valArray = NULL;
5225 string segFile;
5226 Column curCol;
5227 ColStructList::size_type totalColumn;
5228 ColStructList::size_type i;
5229 size_t totalRow1, totalRow2;
5230
5231 setTransId(txnid);
5232
5233 totalColumn = colStructList.size();
5234 #ifdef PROFILE
5235 StopWatch timer;
5236 #endif
5237
5238 totalRow1 = colValueList.size() / totalColumn;
5239
5240 if (newColValueList.size() > 0)
5241 {
5242 totalRow2 = newColValueList.size() / newColStructList.size();
5243 totalRow1 -= totalRow2;
5244 }
5245 else
5246 {
5247 totalRow2 = 0;
5248 }
5249
5250 // It is possible totalRow1 is zero but totalRow2 has values
5251 if ((totalRow1 == 0) && (totalRow2 == 0))
5252 return rc;
5253
5254 TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);
5255
5256 if (totalRow1)
5257 {
5258 valArray = malloc(sizeof(uint64_t) * totalRow1);
5259
5260 for (i = 0; i < totalColumn; i++)
5261 {
5262 //@Bug 2205 Check if all rows go to the new extent
5263 //Write the first batch
5264 RID* firstPart = rowIdArray;
5265 ColumnOp* colOp = m_colOp[op(colStructList[i].fCompressionType)];
5266
5267 // set params
5268 colOp->initColumn(curCol);
5269 // need to pass real dbRoot, partition, and segment to setColParam
5270 colOp->setColParam(curCol, 0, colStructList[i].colWidth,
5271 colStructList[i].colDataType, colStructList[i].colType, colStructList[i].dataOid,
5272 colStructList[i].fCompressionType, colStructList[i].fColDbRoot,
5273 colStructList[i].fColPartition, colStructList[i].fColSegment);
5274
5275 ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(colStructList[i].dataOid);
5276 ColExtsInfo::iterator it = aColExtsInfo.begin();
5277
5278 while (it != aColExtsInfo.end())
5279 {
5280 if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment))
5281 break;
5282
5283 it++;
5284 }
5285
5286 if (it == aColExtsInfo.end()) //add this one to the list
5287 {
5288 ColExtInfo aExt;
5289 aExt.dbRoot = colStructList[i].fColDbRoot;
5290 aExt.partNum = colStructList[i].fColPartition;
5291 aExt.segNum = colStructList[i].fColSegment;
5292 aExt.compType = colStructList[i].fCompressionType;
5293 aColExtsInfo.push_back(aExt);
5294 aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
5295 }
5296
5297 rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file
5298
5299 if (rc != NO_ERROR)
5300 break;
5301
5302 // handling versioning
5303 vector<LBIDRange> rangeList;
5304
5305 if (versioning)
5306 {
5307 rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i],
5308 colStructList[i].colWidth, totalRow1, firstPart, rangeList);
5309
5310 if (rc != NO_ERROR)
5311 {
5312 if (colStructList[i].fCompressionType == 0)
5313 {
5314 curCol.dataFile.pFile->flush();
5315 }
5316
5317 BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
5318 break;
5319 }
5320 }
5321
5322 //totalRow1 -= totalRow2;
5323 // have to init the size here
5324 // nullArray = (bool*) malloc(sizeof(bool) * totalRow);
5325 uint8_t tmp8;
5326 uint16_t tmp16;
5327 uint32_t tmp32;
5328
5329 for (size_t j = 0; j < totalRow1; j++)
5330 {
5331 uint64_t curValue = colValueList[((totalRow1 + totalRow2) * i) + j];
5332
5333 switch (colStructList[i].colType)
5334 {
5335 case WriteEngine::WR_VARBINARY : // treat same as char for now
5336 case WriteEngine::WR_CHAR:
5337 case WriteEngine::WR_BLOB:
5338 case WriteEngine::WR_TEXT:
5339 ((uint64_t*)valArray)[j] = curValue;
5340 break;
5341
5342 case WriteEngine::WR_INT:
5343 case WriteEngine::WR_UINT:
5344 case WriteEngine::WR_MEDINT:
5345 case WriteEngine::WR_UMEDINT:
5346 case WriteEngine::WR_FLOAT:
5347 tmp32 = curValue;
5348 ((uint32_t*)valArray)[j] = tmp32;
5349 break;
5350
5351 case WriteEngine::WR_ULONGLONG:
5352 case WriteEngine::WR_LONGLONG:
5353 case WriteEngine::WR_DOUBLE:
5354 case WriteEngine::WR_TOKEN:
5355 ((uint64_t*)valArray)[j] = curValue;
5356 break;
5357
5358 case WriteEngine::WR_BYTE:
5359 case WriteEngine::WR_UBYTE:
5360 tmp8 = curValue;
5361 ((uint8_t*)valArray)[j] = tmp8;
5362 break;
5363
5364 case WriteEngine::WR_SHORT:
5365 case WriteEngine::WR_USHORT:
5366 tmp16 = curValue;
5367 ((uint16_t*)valArray)[j] = tmp16;
5368 break;
5369 }
5370 }
5371
5372
5373 #ifdef PROFILE
5374 timer.start("writeRow ");
5375 #endif
5376 rc = colOp->writeRow(curCol, totalRow1, firstPart, valArray);
5377 #ifdef PROFILE
5378 timer.stop("writeRow ");
5379 #endif
5380 colOp->closeColumnFile(curCol);
5381
5382 if (versioning)
5383 BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
5384
5385 // check error
5386 if (rc != NO_ERROR)
5387 break;
5388
5389 } // end of for (i = 0
5390
5391 if (valArray != NULL)
5392 {
5393 free(valArray);
5394 valArray = NULL;
5395 }
5396 }
5397
5398 // MCOL-1176 - Write second extent
5399 if (totalRow2)
5400 {
5401 valArray = malloc(sizeof(uint64_t) * totalRow2);
5402
5403 for (i = 0; i < newColStructList.size(); i++)
5404 {
5405 //@Bug 2205 Check if all rows go to the new extent
5406 //Write the first batch
5407 RID* secondPart = rowIdArray + totalRow1;
5408 ColumnOp* colOp = m_colOp[op(newColStructList[i].fCompressionType)];
5409
5410 // set params
5411 colOp->initColumn(curCol);
5412 // need to pass real dbRoot, partition, and segment to setColParam
5413 colOp->setColParam(curCol, 0, newColStructList[i].colWidth,
5414 newColStructList[i].colDataType, newColStructList[i].colType, newColStructList[i].dataOid,
5415 newColStructList[i].fCompressionType, newColStructList[i].fColDbRoot,
5416 newColStructList[i].fColPartition, newColStructList[i].fColSegment);
5417
5418 ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(newColStructList[i].dataOid);
5419 ColExtsInfo::iterator it = aColExtsInfo.begin();
5420
5421 while (it != aColExtsInfo.end())
5422 {
5423 if ((it->dbRoot == newColStructList[i].fColDbRoot) && (it->partNum == newColStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment))
5424 break;
5425
5426 it++;
5427 }
5428
5429 if (it == aColExtsInfo.end()) //add this one to the list
5430 {
5431 ColExtInfo aExt;
5432 aExt.dbRoot = newColStructList[i].fColDbRoot;
5433 aExt.partNum = newColStructList[i].fColPartition;
5434 aExt.segNum = newColStructList[i].fColSegment;
5435 aExt.compType = newColStructList[i].fCompressionType;
5436 aColExtsInfo.push_back(aExt);
5437 aTbaleMetaData->setColExtsInfo(newColStructList[i].dataOid, aColExtsInfo);
5438 }
5439
5440 rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file
5441
5442 if (rc != NO_ERROR)
5443 break;
5444
5445 // handling versioning
5446 vector<LBIDRange> rangeList;
5447
5448 if (versioning)
5449 {
5450 rc = processVersionBuffer(curCol.dataFile.pFile, txnid, newColStructList[i],
5451 newColStructList[i].colWidth, totalRow2, secondPart, rangeList);
5452
5453 if (rc != NO_ERROR)
5454 {
5455 if (newColStructList[i].fCompressionType == 0)
5456 {
5457 curCol.dataFile.pFile->flush();
5458 }
5459
5460 BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
5461 break;
5462 }
5463 }
5464
5465 //totalRow1 -= totalRow2;
5466 // have to init the size here
5467 // nullArray = (bool*) malloc(sizeof(bool) * totalRow);
5468 uint8_t tmp8;
5469 uint16_t tmp16;
5470 uint32_t tmp32;
5471
5472 for (size_t j = 0; j < totalRow2; j++)
5473 {
5474 uint64_t curValue = newColValueList[(totalRow2 * i) + j];
5475
5476 switch (newColStructList[i].colType)
5477 {
5478 case WriteEngine::WR_VARBINARY : // treat same as char for now
5479 case WriteEngine::WR_CHAR:
5480 case WriteEngine::WR_BLOB:
5481 case WriteEngine::WR_TEXT:
5482 ((uint64_t*)valArray)[j] = curValue;
5483 break;
5484
5485 case WriteEngine::WR_INT:
5486 case WriteEngine::WR_UINT:
5487 case WriteEngine::WR_MEDINT:
5488 case WriteEngine::WR_UMEDINT:
5489 case WriteEngine::WR_FLOAT:
5490 tmp32 = curValue;
5491 ((uint32_t*)valArray)[j] = tmp32;
5492 break;
5493
5494 case WriteEngine::WR_ULONGLONG:
5495 case WriteEngine::WR_LONGLONG:
5496 case WriteEngine::WR_DOUBLE:
5497 case WriteEngine::WR_TOKEN:
5498 ((uint64_t*)valArray)[j] = curValue;
5499 break;
5500
5501 case WriteEngine::WR_BYTE:
5502 case WriteEngine::WR_UBYTE:
5503 tmp8 = curValue;
5504 ((uint8_t*)valArray)[j] = tmp8;
5505 break;
5506
5507 case WriteEngine::WR_SHORT:
5508 case WriteEngine::WR_USHORT:
5509 tmp16 = curValue;
5510 ((uint16_t*)valArray)[j] = tmp16;
5511 break;
5512 }
5513 }
5514
5515
5516 #ifdef PROFILE
5517 timer.start("writeRow ");
5518 #endif
5519 rc = colOp->writeRow(curCol, totalRow2, secondPart, valArray);
5520 #ifdef PROFILE
5521 timer.stop("writeRow ");
5522 #endif
5523 colOp->closeColumnFile(curCol);
5524
5525 if (versioning)
5526 BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
5527
5528 // check error
5529 if (rc != NO_ERROR)
5530 break;
5531
5532 } // end of for (i = 0
5533 }
5534
5535 if (valArray != NULL)
5536 free(valArray);
5537
5538
5539 #ifdef PROFILE
5540 timer.finish();
5541 #endif
5542 return rc;
5543 }
5544
5545
writeColumnRec(const TxnID & txnid,const ColStructList & colStructList,const ColValueList & colValueList,vector<void * > & colOldValueList,const RIDList & ridList,const int32_t tableOid,bool convertStructFlag,ColTupleList::size_type nRows)5546 int WriteEngineWrapper::writeColumnRec(const TxnID& txnid,
5547 const ColStructList& colStructList,
5548 const ColValueList& colValueList,
5549 vector<void*>& colOldValueList,
5550 const RIDList& ridList,
5551 const int32_t tableOid,
5552 bool convertStructFlag,
5553 ColTupleList::size_type nRows)
5554 {
5555 bool bExcp;
5556 int rc = 0;
5557 void* valArray = NULL;
5558 Column curCol;
5559 ColStruct curColStruct;
5560 ColTupleList curTupleList, oldTupleList;
5561 ColStructList::size_type totalColumn;
5562 ColStructList::size_type i;
5563 ColTupleList::size_type totalRow;
5564
5565 setTransId(txnid);
5566 colOldValueList.clear();
5567 totalColumn = colStructList.size();
5568 totalRow = nRows;
5569
5570 #ifdef PROFILE
5571 StopWatch timer;
5572 #endif
5573
5574 vector<LBIDRange> rangeListTot;
5575 std::vector<VBRange> freeList;
5576 vector<vector<uint32_t> > fboLists;
5577 vector<vector<LBIDRange> > rangeLists;
5578 rc = processBeginVBCopy(txnid, colStructList, ridList, freeList, fboLists, rangeLists, rangeListTot);
5579
5580 if (rc != NO_ERROR)
5581 {
5582 if (rangeListTot.size() > 0)
5583 BRMWrapper::getInstance()->writeVBEnd(txnid, rangeListTot);
5584
5585 switch (rc)
5586 {
5587 case BRM::ERR_DEADLOCK:
5588 return ERR_BRM_DEAD_LOCK;
5589
5590 case BRM::ERR_VBBM_OVERFLOW:
5591 return ERR_BRM_VB_OVERFLOW;
5592
5593 case BRM::ERR_NETWORK:
5594 return ERR_BRM_NETWORK;
5595
5596 case BRM::ERR_READONLY:
5597 return ERR_BRM_READONLY;
5598
5599 default:
5600 return ERR_BRM_BEGIN_COPY;
5601 }
5602 }
5603
5604 VBRange aRange;
5605 uint32_t blocksProcessedThisOid = 0;
5606 uint32_t blocksProcessed = 0;
5607 std::vector<BRM::FileInfo> files;
5608 TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);
5609
5610 for (i = 0; i < totalColumn; i++)
5611 {
5612 valArray = NULL;
5613 curColStruct = colStructList[i];
5614 curTupleList = colValueList[i]; //same value for all rows
5615 ColumnOp* colOp = m_colOp[op(curColStruct.fCompressionType)];
5616
5617 // convert column data type
5618 if (convertStructFlag)
5619 Convertor::convertColType(&curColStruct);
5620
5621 // set params
5622 colOp->initColumn(curCol);
5623 colOp->setColParam(curCol, 0, curColStruct.colWidth,
5624 curColStruct.colDataType, curColStruct.colType, curColStruct.dataOid,
5625 curColStruct.fCompressionType, curColStruct.fColDbRoot,
5626 curColStruct.fColPartition, curColStruct.fColSegment);
5627
5628
5629 ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(curColStruct.dataOid);
5630 ColExtsInfo::iterator it = aColExtsInfo.begin();
5631
5632 while (it != aColExtsInfo.end())
5633 {
5634 if ((it->dbRoot == curColStruct.fColDbRoot) && (it->partNum == curColStruct.fColPartition) && (it->segNum == curColStruct.fColSegment))
5635 break;
5636
5637 it++;
5638 }
5639
5640 if (it == aColExtsInfo.end()) //add this one to the list
5641 {
5642 ColExtInfo aExt;
5643 aExt.dbRoot = curColStruct.fColDbRoot;
5644 aExt.partNum = curColStruct.fColPartition;
5645 aExt.segNum = curColStruct.fColSegment;
5646 aExt.compType = curColStruct.fCompressionType;
5647 aColExtsInfo.push_back(aExt);
5648 aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
5649 }
5650
5651 string segFile;
5652 rc = colOp->openColumnFile(curCol, segFile, true, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file
5653
5654 if (rc != NO_ERROR)
5655 break;
5656
5657 if (curColStruct.fCompressionType == 0)
5658 {
5659 BRM::FileInfo aFile;
5660 aFile.oid = curColStruct.dataOid;
5661 aFile.partitionNum = curColStruct.fColPartition;
5662 aFile.dbRoot = curColStruct.fColDbRoot;;
5663 aFile.segmentNum = curColStruct.fColSegment;
5664 aFile.compType = curColStruct.fCompressionType;
5665 files.push_back(aFile);
5666 }
5667
5668 // handling versioning
5669 //cout << " pass to processVersionBuffer rid " << rowIdArray[0] << endl;
5670 //cout << "dataOid:fColPartition = " << curColStruct.dataOid << ":" << curColStruct.fColPartition << endl;
5671 //timer.start("processVersionBuffers");
5672 //vector<LBIDRange> rangeList;
5673 // rc = processVersionBuffers(curCol.dataFile.pFile, txnid, curColStruct, curColStruct.colWidth, totalRow, ridList, rangeList);
5674 std::vector<VBRange> curFreeList;
5675 uint32_t blockUsed = 0;
5676
5677 if (!idbdatafile::IDBPolicy::useHdfs())
5678 {
5679 if (rangeListTot.size() > 0)
5680 {
5681 if (freeList[0].size >= (blocksProcessed + rangeLists[i].size()))
5682 {
5683 aRange.vbOID = freeList[0].vbOID;
5684 aRange.vbFBO = freeList[0].vbFBO + blocksProcessed;
5685 aRange.size = rangeLists[i].size();
5686 curFreeList.push_back(aRange);
5687 //cout << "range size = " << aRange.size <<" and blocksProcessed = " << blocksProcessed<< endl;
5688 }
5689 else
5690 {
5691 aRange.vbOID = freeList[0].vbOID;
5692 aRange.vbFBO = freeList[0].vbFBO + blocksProcessed;
5693 aRange.size = freeList[0].size - blocksProcessed;
5694 blockUsed = aRange.size;
5695 curFreeList.push_back(aRange);
5696
5697 if (freeList.size() > 1)
5698 {
5699 aRange.vbOID = freeList[1].vbOID;
5700 aRange.vbFBO = freeList[1].vbFBO + blocksProcessedThisOid;
5701 aRange.size = rangeLists[i].size() - blockUsed;
5702 curFreeList.push_back(aRange);
5703 blocksProcessedThisOid += aRange.size;
5704 }
5705 else
5706 {
5707 rc = 1;
5708 break;
5709 }
5710
5711 //cout << "curFreeList size = " << curFreeList.size() << endl;
5712
5713 }
5714
5715 blocksProcessed += rangeLists[i].size();
5716
5717 //timer.start("Delete:writeVB");
5718 rc = BRMWrapper::getInstance()->
5719 writeVB(curCol.dataFile.pFile, (BRM::VER_t)txnid,
5720 curColStruct.dataOid, fboLists[i], rangeLists[i],
5721 colOp, curFreeList, curColStruct.fColDbRoot, true);
5722 }
5723 }
5724
5725 //timer.stop("Delete:writeVB");
5726 //timer.stop("processVersionBuffers");
5727 // cout << " rc for processVersionBuffer is " << rc << endl;
5728 if (rc != NO_ERROR)
5729 {
5730 if (curColStruct.fCompressionType == 0)
5731 {
5732 curCol.dataFile.pFile->flush();
5733 }
5734
5735 if (rangeListTot.size() > 0)
5736 BRMWrapper::getInstance()->writeVBEnd(txnid, rangeListTot);
5737
5738 break;
5739 }
5740
5741 switch (curColStruct.colType)
5742 {
5743 case WriteEngine::WR_INT:
5744 case WriteEngine::WR_MEDINT:
5745 valArray = (int*) calloc(sizeof(int), 1);
5746 break;
5747
5748 case WriteEngine::WR_UINT:
5749 case WriteEngine::WR_UMEDINT:
5750 valArray = (uint32_t*) calloc(sizeof(uint32_t), 1);
5751 break;
5752
5753 case WriteEngine::WR_VARBINARY : // treat same as char for now
5754 case WriteEngine::WR_CHAR:
5755 case WriteEngine::WR_BLOB:
5756 case WriteEngine::WR_TEXT:
5757 valArray = (char*) calloc(sizeof(char), 1 * MAX_COLUMN_BOUNDARY);
5758 break;
5759
5760 case WriteEngine::WR_FLOAT:
5761 valArray = (float*) calloc(sizeof(float), 1);
5762 break;
5763
5764 case WriteEngine::WR_DOUBLE:
5765 valArray = (double*) calloc(sizeof(double), 1);
5766 break;
5767
5768 case WriteEngine::WR_BYTE:
5769 valArray = (char*) calloc(sizeof(char), 1);
5770 break;
5771
5772 case WriteEngine::WR_UBYTE:
5773 valArray = (uint8_t*) calloc(sizeof(uint8_t), 1);
5774 break;
5775
5776 case WriteEngine::WR_SHORT:
5777 valArray = (short*) calloc(sizeof(short), 1);
5778 break;
5779
5780 case WriteEngine::WR_USHORT:
5781 valArray = (uint16_t*) calloc(sizeof(uint16_t), 1);
5782 break;
5783
5784 case WriteEngine::WR_LONGLONG:
5785 valArray = (long long*) calloc(sizeof(long long), 1);
5786 break;
5787
5788 case WriteEngine::WR_ULONGLONG:
5789 valArray = (uint64_t*) calloc(sizeof(uint64_t), 1);
5790 break;
5791
5792 case WriteEngine::WR_TOKEN:
5793 valArray = (Token*) calloc(sizeof(Token), 1);
5794 break;
5795 }
5796
5797 // convert values to valArray
5798 if (m_opType != DELETE)
5799 {
5800 bExcp = false;
5801 ColTuple curTuple;
5802 curTuple = curTupleList[0];
5803
5804 try
5805 {
5806 convertValue(curColStruct.colType, valArray, curTuple.data);
5807 }
5808 catch (...)
5809 {
5810 bExcp = true;
5811 }
5812
5813 if (bExcp)
5814 {
5815 if (rangeListTot.size() > 0)
5816 BRMWrapper::getInstance()->writeVBEnd(txnid, rangeListTot);
5817
5818 return ERR_PARSING;
5819 }
5820
5821 #ifdef PROFILE
5822 timer.start("writeRow ");
5823 #endif
5824 rc = colOp->writeRows(curCol, totalRow, ridList, valArray);
5825 #ifdef PROFILE
5826 timer.stop("writeRow ");
5827 #endif
5828 }
5829 else
5830 {
5831 #ifdef PROFILE
5832 timer.start("writeRows ");
5833 #endif
5834 rc = colOp->writeRows(curCol, totalRow, ridList, valArray, 0, true);
5835 #ifdef PROFILE
5836 timer.stop("writeRows ");
5837 #endif
5838 }
5839
5840 // colOldValueList.push_back(oldValArray);
5841 //timer.start("Delete:closefile");
5842 colOp->clearColumn(curCol);
5843
5844 //timer.stop("Delete:closefile");
5845 if (valArray != NULL)
5846 free(valArray);
5847
5848 // check error
5849 if (rc != NO_ERROR)
5850 break;
5851
5852 } // end of for (i = 0)
5853
5854 // timer.start("Delete:purgePrimProcFdCache");
5855 if ((idbdatafile::IDBPolicy::useHdfs()) && (files.size() > 0))
5856 cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());
5857
5858 //if (idbdatafile::IDBPolicy::useHdfs())
5859 // cacheutils::dropPrimProcFdCache();
5860 //timer.stop("Delete:purgePrimProcFdCache");
5861 if (rangeListTot.size() > 0)
5862 BRMWrapper::getInstance()->writeVBEnd(txnid, rangeListTot);
5863
5864 //timer.stop("Delete:writecolrec");
5865 //#ifdef PROFILE
5866 //timer.finish();
5867 //#endif
5868 return rc;
5869 }
5870
5871 /*@brief tokenize - return a token for a given signature and size
5872 */
5873 /***********************************************************
5874 * DESCRIPTION:
5875 * return a token for a given signature and size
5876 * If it is not in the dictionary, the signature
5877 * will be added to the dictionary and the index tree
5878 * If it is already in the dictionary, then
5879 * the token will be returned
5880 * This function does not open and close files.
5881 * users need to use openDctnry and CloseDctnry
5882 * PARAMETERS:
5883 * DctnryTuple& dctnryTuple - holds the sigValue, sigSize and token
5884 * RETURN:
5885 * NO_ERROR if success
5886 * others if something wrong in inserting the value
5887 ***********************************************************/
tokenize(const TxnID & txnid,DctnryTuple & dctnryTuple,int ct)5888 int WriteEngineWrapper::tokenize(const TxnID& txnid, DctnryTuple& dctnryTuple, int ct)
5889 {
5890 int cop = op(ct);
5891 m_dctnry[cop]->setTransId(txnid);
5892 //cout << "Tokenizing dctnryTuple.sigValue " << dctnryTuple.sigValue << endl;
5893 return m_dctnry[cop]->updateDctnry(dctnryTuple.sigValue, dctnryTuple.sigSize, dctnryTuple.token);
5894 }
5895
5896 /*@brief tokenize - return a token for a given signature and size
5897 * accept OIDs as input
5898 */
5899 /***********************************************************
5900 * DESCRIPTION:
5901 * Token for a given signature and size
5902 * If it is not in the dictionary, the signature
5903 * will be added to the dictionary and the index tree
5904 * If it is already in the dictionary, then
5905 * the token will be returned
5906 * PARAMETERS:
5907 * DctnryTuple& dctnryTuple - holds the sigValue, sigSize and token
5908 * DctnryStruct dctnryStruct- contain the 3 OID for dictionary,
5909 * tree and list.
5910 * RETURN:
5911 * NO_ERROR if success
5912 * others if something wrong in inserting the value
5913 ***********************************************************/
tokenize(const TxnID & txnid,DctnryStruct & dctnryStruct,DctnryTuple & dctnryTuple,bool useTmpSuffix)5914 int WriteEngineWrapper::tokenize(const TxnID& txnid,
5915 DctnryStruct& dctnryStruct,
5916 DctnryTuple& dctnryTuple,
5917 bool useTmpSuffix) // @bug 5572 HDFS tmp file
5918 {
5919 //find the corresponding column segment file the token is going to be inserted.
5920
5921 Dctnry* dctnry = m_dctnry[op(dctnryStruct.fCompressionType)];
5922 int rc = dctnry->openDctnry(dctnryStruct.dctnryOid,
5923 dctnryStruct.fColDbRoot, dctnryStruct.fColPartition,
5924 dctnryStruct.fColSegment,
5925 useTmpSuffix); // @bug 5572 TBD
5926
5927 if (rc != NO_ERROR)
5928 return rc;
5929
5930 rc = tokenize(txnid, dctnryTuple, dctnryStruct.fCompressionType);
5931
5932 int rc2 = dctnry->closeDctnry(true); // close file, even if tokenize() fails
5933
5934 if ((rc == NO_ERROR) && (rc2 != NO_ERROR))
5935 rc = rc2;
5936
5937 return rc;
5938 }
5939
5940 /***********************************************************
5941 * DESCRIPTION:
5942 * Create column files, including data and bitmap files
5943 * PARAMETERS:
5944 * dataOid - column data file id
5945 * bitmapOid - column bitmap file id
5946 * colWidth - column width
5947 * dbRoot - DBRoot where file is to be located
5948 * partition - Starting partition number for segment file path
5949 * segment - segment number
5950 * compressionType - compression type
5951 * RETURN:
5952 * NO_ERROR if success
5953 * ERR_FILE_EXIST if file exists
5954 * ERR_FILE_CREATE if something wrong in creating the file
5955 ***********************************************************/
createDctnry(const TxnID & txnid,const OID & dctnryOid,int colWidth,uint16_t dbRoot,uint32_t partiotion,uint16_t segment,int compressionType)5956 int WriteEngineWrapper::createDctnry(const TxnID& txnid,
5957 const OID& dctnryOid,
5958 int colWidth,
5959 uint16_t dbRoot,
5960 uint32_t partiotion,
5961 uint16_t segment,
5962 int compressionType)
5963 {
5964 BRM::LBID_t startLbid;
5965 return m_dctnry[op(compressionType)]->
5966 createDctnry( dctnryOid, colWidth, dbRoot, partiotion, segment, startLbid);
5967 }
5968
convertRidToColumn(RID & rid,uint16_t & dbRoot,uint32_t & partition,uint16_t & segment,RID filesPerColumnPartition,RID extentsPerSegmentFile,RID extentRows,uint16_t startDBRoot,unsigned dbrootCnt)5969 int WriteEngineWrapper::convertRidToColumn (RID& rid, uint16_t& dbRoot, uint32_t& partition,
5970 uint16_t& segment, RID filesPerColumnPartition,
5971 RID extentsPerSegmentFile, RID extentRows,
5972 uint16_t startDBRoot, unsigned dbrootCnt)
5973 {
5974 int rc = 0;
5975 partition = rid / (filesPerColumnPartition * extentsPerSegmentFile * extentRows);
5976
5977 segment = (((rid % (filesPerColumnPartition * extentsPerSegmentFile * extentRows)) / extentRows)) % filesPerColumnPartition;
5978
5979 dbRoot = ((startDBRoot - 1 + segment) % dbrootCnt) + 1;
5980
5981 //Calculate the relative rid for this segment file
5982 RID relRidInPartition = rid - ((RID)partition * (RID)filesPerColumnPartition * (RID)extentsPerSegmentFile * (RID)extentRows);
5983 assert (relRidInPartition <= (RID)filesPerColumnPartition * (RID)extentsPerSegmentFile * (RID)extentRows);
5984 uint32_t numExtentsInThisPart = relRidInPartition / extentRows;
5985 unsigned numExtentsInThisSegPart = numExtentsInThisPart / filesPerColumnPartition;
5986 RID relRidInThisExtent = relRidInPartition - numExtentsInThisPart * extentRows;
5987 rid = relRidInThisExtent + numExtentsInThisSegPart * extentRows;
5988 return rc;
5989 }
5990
5991 /***********************************************************
5992 * DESCRIPTION:
5993 * Clears table lock for the specified table lock ID.
5994 * PARAMETERS:
5995 * lockID - table lock to be released
5996 * errMsg - if error occurs, this is the return error message
5997 * RETURN:
5998 * NO_ERROR if operation is successful
5999 ***********************************************************/
clearTableLockOnly(uint64_t lockID,std::string & errMsg)6000 int WriteEngineWrapper::clearTableLockOnly(
6001 uint64_t lockID,
6002 std::string& errMsg)
6003 {
6004 bool bReleased;
6005
6006 int rc = BRMWrapper::getInstance()->releaseTableLock( lockID,
6007 bReleased, errMsg);
6008
6009 return rc;
6010 }
6011
6012 /***********************************************************
6013 * DESCRIPTION:
6014 * Rolls back the state of the extentmap and database files for the
6015 * specified table OID, using the metadata previously saved to disk.
6016 * Also clears the table lock for the specified table OID.
6017 * PARAMETERS:
6018 * tableOid - table OID to be rolled back
6019 * lockID - table lock corresponding to tableOid
6020 * tableName - table name associated with tableOid
6021 * applName - application that is driving this bulk rollback
6022 * debugConsole - enable debug logging to the console
6023 * errorMsg - error message explaining any rollback failure
6024 * RETURN:
 *    NO_ERROR if rollback completed successfully
6026 ***********************************************************/
bulkRollback(OID tableOid,uint64_t lockID,const std::string & tableName,const std::string & applName,bool debugConsole,string & errorMsg)6027 int WriteEngineWrapper::bulkRollback(OID tableOid,
6028 uint64_t lockID,
6029 const std::string& tableName,
6030 const std::string& applName,
6031 bool debugConsole, string& errorMsg)
6032 {
6033 errorMsg.clear();
6034
6035 BulkRollbackMgr rollbackMgr(tableOid, lockID, tableName, applName);
6036
6037 if (debugConsole)
6038 rollbackMgr.setDebugConsole(true);
6039
6040 // We used to pass "false" to not keep (delete) the metafiles at the end of
6041 // the rollback. But after the transition to sharedNothing, we pass "true"
6042 // to initially keep these files. The metafiles are deleted later, only
6043 // after all the distributed bulk rollbacks are successfully completed.
6044 int rc = rollbackMgr.rollback( true );
6045
6046 if (rc != NO_ERROR)
6047 errorMsg = rollbackMgr.getErrorMsg();
6048
6049 // Ignore the return code for now; more important to base rc on the
6050 // success or failure of the previous work
6051 BRMWrapper::getInstance()->takeSnapshot();
6052
6053 return rc;
6054 }
6055
/***********************************************************
 * DESCRIPTION:
 *    Common rollback work shared by rollbackTran() and rollbackBlocks():
 *    drops the transaction's cached LBID/dictionary bookkeeping, then, if
 *    a per-module DML log file exists, replays it to remove leftover tmp
 *    files and restore backed-up compressed chunks/headers.
 * PARAMETERS:
 *    txnid     - transaction being rolled back
 *    sessionId - session owning the transaction (unused here)
 * RETURN:
 *    0 if successful; -1 or a WE error code on failure
 ***********************************************************/
int WriteEngineWrapper::rollbackCommon(const TxnID& txnid, int sessionId)
{
    //Remove the unwanted tmp files and recover compressed chunks.
    string prefix;

    // BUG 4312
    RemoveTxnFromLBIDMap(txnid);
    RemoveTxnFromDictMap(txnid);

    config::Config* config = config::Config::makeConfig();
    prefix = config->getConfig("SystemConfig", "DBRMRoot");

    if (prefix.length() == 0)
    {
        cerr << "Need a valid DBRMRoot entry in Calpont configuation file";
        return -1;
    }

    uint64_t pos = prefix.find_last_of ("/") ;
    std::string aDMLLogFileName;

    if (pos != string::npos)
    {
        aDMLLogFileName = prefix.substr(0, pos + 1); //Get the file path
    }
    else
    {
        logging::Message::Args args;
        args.add("RollbackTran cannot find the dbrm directory for the DML log file");
        SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_CRITICAL, logging::M0007);
        return -1;
    }

    // The log file is named DMLLog_<txnid>_<localModuleId>
    std::ostringstream oss;
    oss << txnid << "_" << Config::getLocalModuleID();
    aDMLLogFileName += "DMLLog_" + oss.str();

    if (IDBPolicy::exists(aDMLLogFileName.c_str()))
    {
        // TODO-for now the DML log file will always be in a local
        // filesystem since IDBDataFile doesn't have any support for
        // a cpp iostream interface. need to decide if this is ok.
        boost::scoped_ptr<IDBDataFile> aDMLLogFile(IDBDataFile::open(
                IDBPolicy::getType(aDMLLogFileName.c_str(),
                                   IDBPolicy::WRITEENG),
                aDMLLogFileName.c_str(), "r", 0));

        if (aDMLLogFile) //need recover
        {
            ssize_t fileSize = aDMLLogFile->size();

            // Guard against a failed size() query (negative) before
            // using it as an allocation size.
            if (fileSize < 0)
                return ERR_FILE_READ;

            boost::scoped_array<char> buf(new char[fileSize]);

            if (aDMLLogFile->read(buf.get(), fileSize) != fileSize)
                return ERR_FILE_READ;

            std::istringstream strstream(string(buf.get(), fileSize));
            std::string backUpFileType;
            std::string filename;
            int64_t size;
            int64_t offset;

            // Each log record is: <type> <filename> <size> <offset>
            while (strstream >> backUpFileType >> filename >> size >> offset)
            {
                std::ostringstream oss;
                oss << "RollbackTran found " << backUpFileType << " name " << filename << " size: " << size << " offset: " << offset;
                logging::Message::Args args;
                args.add(oss.str());
                SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_INFO, logging::M0007);

                if (backUpFileType.compare("rlc") == 0)
                {
                    //remove the rlc file
                    filename += ".rlc";
                    IDBPolicy::remove(filename.c_str());
                    logging::Message::Args args1;
                    args1.add(filename);
                    args1.add(" is removed.");
                    SimpleSysLog::instance()->logMsg(args1, logging::LOG_TYPE_INFO, logging::M0007);
                }
                else if (backUpFileType.compare("tmp") == 0)
                {
                    int rc = NO_ERROR;
                    string orig(filename + ".orig");

                    // restore the orig file
                    if (IDBPolicy::exists(orig.c_str()))
                    {
                        // not likely both cdf and tmp exist
                        if (IDBPolicy::exists(filename.c_str()) &&
                                IDBPolicy::remove(filename.c_str()) != 0)
                            rc = ERR_COMP_REMOVE_FILE;

                        if (rc == NO_ERROR && IDBPolicy::rename(orig.c_str(), filename.c_str()) != 0)
                            rc = ERR_COMP_RENAME_FILE;
                    }

                    // remove the tmp file
                    string tmp(filename + ".tmp");

                    if (rc == NO_ERROR && IDBPolicy::exists(tmp.c_str()) &&
                            IDBPolicy::remove(tmp.c_str()) != 0)
                        rc = ERR_COMP_REMOVE_FILE;

                    // remove the chunk shifting helper
                    string rlc(filename + ".rlc");

                    if (rc == NO_ERROR && IDBPolicy::exists(rlc.c_str()) &&
                            IDBPolicy::remove(rlc.c_str()) != 0)
                        rc = ERR_COMP_REMOVE_FILE;

                    logging::Message::Args args1;
                    args1.add(filename);

                    if (rc == NO_ERROR)
                    {
                        args1.add(" is restored.");
                        SimpleSysLog::instance()->logMsg(args1,
                                                         logging::LOG_TYPE_INFO, logging::M0007);
                    }
                    else
                    {
                        args1.add(" may not restored: ");
                        args1.add(rc);
                        SimpleSysLog::instance()->logMsg(args1,
                                                         logging::LOG_TYPE_CRITICAL, logging::M0007);

                        return rc;
                    }
                }
                else
                {
                    //copy back to the data file
                    std::string backFileName(filename);

                    if (backUpFileType.compare("chk") == 0 )
                        backFileName += ".chk";
                    else
                        backFileName += ".hdr";

                    // scoped_ptr so the handles are closed on every early
                    // error return below (the old raw pointers leaked).
                    boost::scoped_ptr<IDBDataFile> sourceFile(IDBDataFile::open(
                            IDBPolicy::getType(backFileName.c_str(), IDBPolicy::WRITEENG),
                            backFileName.c_str(), "r", 0));
                    boost::scoped_ptr<IDBDataFile> targetFile(IDBDataFile::open(
                            IDBPolicy::getType(filename.c_str(), IDBPolicy::WRITEENG),
                            filename.c_str(), "r+", 0));

                    boost::scoped_array<unsigned char> readBuf(new unsigned char[size]);
                    size_t byteRead;

                    if (sourceFile)
                    {
                        int rc = sourceFile->seek( 0, 0 );

                        if (rc)
                            return ERR_FILE_SEEK;

                        byteRead = sourceFile->read( readBuf.get(), size );

                        if ( (int) byteRead != size )
                        {
                            logging::Message::Args args6;
                            args6.add("Rollback cannot read backup file ");
                            args6.add(backFileName);
                            SimpleSysLog::instance()->logMsg(args6, logging::LOG_TYPE_ERROR, logging::M0007);
                            return ERR_FILE_READ;
                        }
                    }
                    else
                    {
                        logging::Message::Args args5;
                        args5.add("Rollback cannot open backup file ");
                        args5.add(backFileName);
                        SimpleSysLog::instance()->logMsg(args5, logging::LOG_TYPE_ERROR, logging::M0007);
                        return ERR_FILE_NULL;
                    }

                    size_t byteWrite;

                    if (targetFile)
                    {
                        int rc = targetFile->seek( offset, 0 );

                        if (rc)
                            return ERR_FILE_SEEK;

                        byteWrite = targetFile->write( readBuf.get(), size );

                        if ( (int) byteWrite != size )
                        {
                            logging::Message::Args args3;
                            args3.add("Rollback cannot copy to file ");
                            args3.add(filename);
                            args3.add( "from file ");
                            args3.add(backFileName);
                            SimpleSysLog::instance()->logMsg(args3, logging::LOG_TYPE_ERROR, logging::M0007);

                            return ERR_FILE_WRITE;
                        }
                    }
                    else
                    {
                        logging::Message::Args args4;
                        args4.add("Rollback cannot open target file ");
                        args4.add(filename);
                        SimpleSysLog::instance()->logMsg(args4, logging::LOG_TYPE_ERROR, logging::M0007);
                        return ERR_FILE_NULL;
                    }

                    // Close both files (flushing the target) before
                    // removing the backup, matching the old delete order.
                    targetFile.reset();
                    sourceFile.reset();
                    IDBPolicy::remove( backFileName.c_str() );
                    logging::Message::Args arg1;
                    arg1.add("Rollback copied to file ");
                    arg1.add(filename);
                    arg1.add( "from file ");
                    arg1.add(backFileName);
                    SimpleSysLog::instance()->logMsg(arg1, logging::LOG_TYPE_INFO, logging::M0007);
                }
            }
        }

        IDBPolicy::remove(aDMLLogFileName.c_str());
    }

    return 0;
}
6290
rollbackTran(const TxnID & txnid,int sessionId)6291 int WriteEngineWrapper::rollbackTran(const TxnID& txnid, int sessionId)
6292 {
6293 if ( rollbackCommon( txnid, sessionId ) != 0 )
6294 return -1;
6295
6296 return BRMWrapper::getInstance()->rollBack(txnid, sessionId);
6297 }
6298
rollbackBlocks(const TxnID & txnid,int sessionId)6299 int WriteEngineWrapper::rollbackBlocks(const TxnID& txnid, int sessionId)
6300 {
6301 if ( rollbackCommon( txnid, sessionId ) != 0 )
6302 return -1;
6303
6304 return BRMWrapper::getInstance()->rollBackBlocks(txnid, sessionId);
6305 }
6306
rollbackVersion(const TxnID & txnid,int sessionId)6307 int WriteEngineWrapper::rollbackVersion(const TxnID& txnid, int sessionId)
6308 {
6309 // BUG 4312
6310 RemoveTxnFromLBIDMap(txnid);
6311 RemoveTxnFromDictMap(txnid);
6312
6313 return BRMWrapper::getInstance()->rollBackVersion(txnid, sessionId);
6314 }
6315
updateNextValue(const TxnID txnId,const OID & columnoid,const uint64_t nextVal,const uint32_t sessionID,const uint16_t dbRoot)6316 int WriteEngineWrapper::updateNextValue(const TxnID txnId, const OID& columnoid, const uint64_t nextVal, const uint32_t sessionID, const uint16_t dbRoot)
6317 {
6318 int rc = NO_ERROR;
6319 boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr;
6320 RIDList ridList;
6321 ColValueList colValueList;
6322 WriteEngine::ColTupleList colTuples;
6323 ColStructList colStructList;
6324 WriteEngine::ColStruct colStruct;
6325 colStruct.dataOid = OID_SYSCOLUMN_NEXTVALUE;
6326 colStruct.colWidth = 8;
6327 colStruct.tokenFlag = false;
6328 colStruct.colDataType = CalpontSystemCatalog::UBIGINT;
6329 colStruct.fColDbRoot = dbRoot;
6330
6331 if (idbdatafile::IDBPolicy::useHdfs())
6332 colStruct.fCompressionType = 2;
6333
6334 colStructList.push_back(colStruct);
6335 ColTuple colTuple;
6336 systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
6337 systemCatalogPtr->identity(CalpontSystemCatalog::EC);
6338 CalpontSystemCatalog::ROPair ropair;
6339
6340 try
6341 {
6342 ropair = systemCatalogPtr->nextAutoIncrRid(columnoid);
6343 }
6344 catch (...)
6345 {
6346 rc = ERR_AUTOINC_RID;
6347 }
6348
6349 if (rc != NO_ERROR)
6350 return rc;
6351
6352 ridList.push_back(ropair.rid);
6353 colTuple.data = nextVal;
6354 colTuples.push_back(colTuple);
6355 colValueList.push_back(colTuples);
6356 //TxnID txnid;
6357 rc = writeColumnRecords(txnId, colStructList, colValueList, ridList, SYSCOLUMN_BASE, false);
6358
6359 if (rc != NO_ERROR)
6360 return rc;
6361
6362 //flush PrimProc cache
6363 vector<LBID_t> blockList;
6364 BRM::LBIDRange_v lbidRanges;
6365 rc = BRMWrapper::getInstance()->lookupLbidRanges(OID_SYSCOLUMN_NEXTVALUE,
6366 lbidRanges);
6367
6368 if (rc != NO_ERROR)
6369 return rc;
6370
6371 LBIDRange_v::iterator it;
6372
6373 for (it = lbidRanges.begin(); it != lbidRanges.end(); it++)
6374 {
6375 for (LBID_t lbid = it->start; lbid < (it->start + it->size); lbid++)
6376 {
6377 blockList.push_back(lbid);
6378 }
6379 }
6380
6381 //Bug 5459 Flush FD cache
6382 std::vector<BRM::FileInfo> files;
6383 BRM::FileInfo aFile;
6384 aFile.oid = colStruct.dataOid;
6385 aFile.partitionNum = colStruct.fColPartition;
6386 aFile.dbRoot = colStruct.fColDbRoot;;
6387 aFile.segmentNum = colStruct.fColSegment;
6388 aFile.compType = colStruct.fCompressionType;
6389 files.push_back(aFile);
6390
6391 if (idbdatafile::IDBPolicy::useHdfs())
6392 cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());
6393
6394 rc = cacheutils::flushPrimProcAllverBlocks (blockList);
6395
6396 if (rc != 0)
6397 rc = ERR_BLKCACHE_FLUSH_LIST; // translate to WE error
6398
6399 return rc;
6400 }
6401
/***********************************************************
 * DESCRIPTION:
 *    Flush compressed files in chunk manager, and remove the
 *    transaction's cached LBID/dictionary bookkeeping.
 * PARAMETERS:
 *    rc         - incoming return code, forwarded to each flush
 *    txnId      - transaction whose cached state is removed
 *    columnOids - column OIDs whose files are to be flushed
 * RETURN:
 *    NO_ERROR if successful, else the first flush error encountered
flushDataFiles(int rc,const TxnID txnId,std::map<FID,FID> & columnOids)6410 int WriteEngineWrapper::flushDataFiles(int rc, const TxnID txnId, std::map<FID, FID>& columnOids)
6411 {
6412 RemoveTxnFromLBIDMap(txnId);
6413 RemoveTxnFromDictMap(txnId);
6414
6415 for (int i = 0; i < TOTAL_COMPRESS_OP; i++)
6416 {
6417 int rc1 = m_colOp[i]->flushFile(rc, columnOids);
6418 int rc2 = m_dctnry[i]->flushFile(rc, columnOids);
6419
6420 if (rc == NO_ERROR)
6421 {
6422 rc = (rc1 != NO_ERROR) ? rc1 : rc2;
6423 }
6424 }
6425
6426 return rc;
6427 }
6428
AddDictToList(const TxnID txnid,std::vector<BRM::LBID_t> & lbids)6429 void WriteEngineWrapper::AddDictToList(const TxnID txnid,
6430 std::vector<BRM::LBID_t>& lbids)
6431 {
6432 std::tr1::unordered_map<TxnID, dictLBIDRec_t>::iterator mapIter;
6433
6434 mapIter = m_dictLBIDMap.find(txnid);
6435
6436 if (mapIter == m_dictLBIDMap.end())
6437 {
6438 dictLBIDRec_t tempRecord;
6439 tempRecord.insert(lbids.begin(), lbids.end());
6440 m_dictLBIDMap[txnid] = tempRecord;
6441 return;
6442 }
6443 else
6444 {
6445 dictLBIDRec_t& txnRecord = mapIter->second;
6446 txnRecord.insert(lbids.begin(), lbids.end());
6447 }
6448
6449 }
6450
6451 /***********************************************************
6452 * DESCRIPTION:
6453 * Add an lbid to a list of lbids for sending to markExtentsInvalid.
6454 * However, rather than storing each lbid, store only unique first
6455 * lbids. This is an optimization to prevent invalidating the same
6456 * extents over and over.
6457 * PARAMETERS:
6458 * txnid - the lbid list is per txn. We use this to keep transactions
 *           separated.
6460 * lbids - the current list of lbids. We add to this list
6461 * if the discovered lbid is in a new extent.
6462 * These next are needed for dbrm to get the lbid
6463 * oid -the table oid.
6464 * colPartition - the table column partition
6465 * segment - table segment
6466 * fbo - file block offset
6467 * RETURN: 0 => OK. -1 => error
6468 ***********************************************************/
AddLBIDtoList(const TxnID txnid,std::vector<BRM::LBID_t> & lbids,std::vector<CalpontSystemCatalog::ColDataType> & colDataTypes,const ColStruct & colStruct,const int fbo)6469 int WriteEngineWrapper::AddLBIDtoList(const TxnID txnid,
6470 std::vector<BRM::LBID_t>& lbids,
6471 std::vector<CalpontSystemCatalog::ColDataType>& colDataTypes,
6472 const ColStruct& colStruct,
6473 const int fbo)
6474 {
6475 int rtn = 0;
6476
6477 BRM::LBID_t startingLBID;
6478 SP_TxnLBIDRec_t spTxnLBIDRec;
6479 std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t>::iterator mapIter;
6480
6481 // Find the set of extent starting LBIDs for this transaction. If not found, then create it.
6482 mapIter = m_txnLBIDMap.find(txnid);
6483
6484 if (mapIter == m_txnLBIDMap.end())
6485 {
6486 // This is a new transaction.
6487 SP_TxnLBIDRec_t sptemp(new TxnLBIDRec);
6488 spTxnLBIDRec = sptemp;
6489 m_txnLBIDMap[txnid] = spTxnLBIDRec;
6490 // cout << "New transaction entry " << txnid << " transaction count " << m_txnLBIDMap.size() << endl;
6491 }
6492 else
6493 {
6494 spTxnLBIDRec = (*mapIter).second;
6495 }
6496
6497 // Get the extent starting lbid given all these values (startingLBID is an out parameter).
6498 rtn = BRMWrapper::getInstance()->getStartLbid(colStruct.dataOid, colStruct.fColPartition,
6499 colStruct.fColSegment, fbo, startingLBID);
6500
6501 if (rtn != 0)
6502 return -1;
6503
6504 if (spTxnLBIDRec->m_LBIDMap.find(startingLBID) == spTxnLBIDRec->m_LBIDMap.end())
6505 {
6506 // Not found in the map. This must be a new extent. Add it to the list.
6507 // cout << "Adding lbid " << startingLBID << " to txn " << txnid << endl;
6508 spTxnLBIDRec->AddLBID(startingLBID);
6509 lbids.push_back((BRM::LBID_t)startingLBID);
6510 colDataTypes.push_back(colStruct.colDataType);
6511 }
6512 else
6513 {
6514 ++spTxnLBIDRec->m_squashedLbids;
6515 }
6516
6517 // If the starting LBID list has grown to more than 2000, truncate.
6518 // This is the purpose of the seqnum. If spTxnLBIDRec->m_lastSeqnum
6519 // is divisible by 1000 and size() > 1000, get rid of everything older
6520 // than the last 1000 entries. This is to save memory in large
6521 // transactions. We assume older extents are unlikely to be hit again.
6522 if (spTxnLBIDRec->m_lastSeqnum % 1000 == 0
6523 && spTxnLBIDRec->m_LBIDMap.size() > 1000)
6524 {
6525 // cout << "Trimming the LBID list for " << txnid << ". LBID count is " << spTxnLBIDRec->m_LBIDMap.size() << endl;
6526 uint32_t firstDrop = spTxnLBIDRec->m_lastSeqnum - 1000;
6527 std::tr1::unordered_map<BRM::LBID_t, uint32_t>::iterator iter;
6528
6529 for (iter = spTxnLBIDRec->m_LBIDMap.begin(); iter != spTxnLBIDRec->m_LBIDMap.end();)
6530 {
6531 if ((*iter).second < firstDrop)
6532 {
6533 iter = spTxnLBIDRec->m_LBIDMap.erase(iter);
6534 }
6535 else
6536 {
6537 ++iter;
6538 }
6539 }
6540
6541 // cout << "LBID count is now" << spTxnLBIDRec->m_LBIDMap.size() << endl;
6542 }
6543
6544 return rtn;
6545 }
6546
RemoveTxnFromDictMap(const TxnID txnid)6547 void WriteEngineWrapper::RemoveTxnFromDictMap(const TxnID txnid)
6548 {
6549 std::tr1::unordered_map<TxnID, dictLBIDRec_t>::iterator mapIter;
6550
6551 mapIter = m_dictLBIDMap.find(txnid);
6552
6553 if (mapIter != m_dictLBIDMap.end())
6554 {
6555 m_dictLBIDMap.erase(txnid);
6556 }
6557 }
6558
6559 /***********************************************************
6560 * DESCRIPTION:
6561 * Remove a transaction LBID list from the LBID map
6562 * Called when a transaction ends, either commit or rollback
6563 * PARAMETERS:
6564 * txnid - the transaction to remove.
6565 * RETURN:
6566 * 0 => success or not found, -1 => error
6567 ***********************************************************/
RemoveTxnFromLBIDMap(const TxnID txnid)6568 int WriteEngineWrapper::RemoveTxnFromLBIDMap(const TxnID txnid)
6569 {
6570 int rtn = 0;
6571 std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t>::iterator mapIter;
6572
6573 // Find the set of extent starting LBIDs for this transaction. If not found, then create it.
6574 try
6575 {
6576 mapIter = m_txnLBIDMap.find(txnid);
6577
6578 if (mapIter != m_txnLBIDMap.end())
6579 {
6580 SP_TxnLBIDRec_t spTxnLBIDRec = (*mapIter).second;
6581 // Debug
6582 // cout << "Remove transaction entry " << txnid << " transaction count " << m_txnLBIDMap.size() << endl;
6583 // cout << " count = " << spTxnLBIDRec->m_LBIDMap.size() <<
6584 // ", lastSeqnum = " << spTxnLBIDRec->m_lastSeqnum <<
6585 // ", squashed lbids = " << spTxnLBIDRec->m_squashedLbids << endl;
6586 m_txnLBIDMap.erase(txnid); // spTxnLBIDRec is auto-destroyed
6587 }
6588 }
6589 catch (...)
6590 {
6591 rtn = -1;
6592 }
6593
6594 return rtn;
6595 }
6596
6597
6598 } //end of namespace
6599 // vim:ts=4 sw=4:
6600
6601