1 /* Copyright (C) 2014 InfiniDB, Inc.
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17
18 /*******************************************************************************
19 * $Id: we_tableinfo.cpp 4648 2013-05-29 21:42:40Z rdempsey $
20 *
21 *******************************************************************************/
22 /** @file */
23
24 #include "we_tableinfo.h"
25 #include "we_bulkstatus.h"
26 #include "we_bulkload.h"
27
28 #include <sstream>
29 #include <sys/time.h>
30 #include <ctime>
31 #include <unistd.h>
32 #include <sys/types.h>
33 #include <cstdio>
34 #include <cerrno>
35 #include <cstring>
36 #include <utility>
37 // @bug 2099+
38 #include <iostream>
39 #include <libmarias3/marias3.h>
40 #ifdef _MSC_VER
41 #include <stdlib.h>
42 #else
43 #include <string.h>
44 #endif
45 using namespace std;
46
47 // @bug 2099-
48 #include <boost/filesystem/path.hpp>
49 using namespace boost;
50
51 #include "we_config.h"
52 #include "we_simplesyslog.h"
53 #include "we_bulkrollbackmgr.h"
54 #include "we_confirmhdfsdbfile.h"
55
56 #include "querytele.h"
57 using namespace querytele;
58
59 #include "oamcache.h"
60 #include "cacheutils.h"
61
namespace
{
// File-name suffix used for reject (bad) data files.
const std::string BAD_FILE_SUFFIX(".bad");

// File-name suffix used for job error (reason) files.
const std::string ERR_FILE_SUFFIX(".err");

// ANSI SGR escape sequences that turn bold terminal output on / back off.
const std::string BOLD_START("\033[0;1m");
const std::string BOLD_STOP("\033[0;39m");
}  // namespace
69
70 namespace WriteEngine
71 {
72 //------------------------------------------------------------------------------
73 // Puts the current thread to sleep for the specified number of milliseconds.
74 // (Ex: used to wait for a Read buffer to become available.)
75 //------------------------------------------------------------------------------
sleepMS(long ms)76 void TableInfo::sleepMS(long ms)
77 {
78 struct timespec rm_ts;
79
80 rm_ts.tv_sec = ms / 1000;
81 rm_ts.tv_nsec = ms % 1000 * 1000000;
82 #ifdef _MSC_VER
83 Sleep(ms);
84 #else
85 struct timespec abs_ts;
86
87 do
88 {
89 abs_ts.tv_sec = rm_ts.tv_sec;
90 abs_ts.tv_nsec = rm_ts.tv_nsec;
91 }
92 while (nanosleep(&abs_ts, &rm_ts) < 0);
93
94 #endif
95 }
96
97 //------------------------------------------------------------------------------
98 // TableInfo constructor
99 //------------------------------------------------------------------------------
TableInfo(Log * logger,const BRM::TxnID txnID,const string & processName,OID tableOID,const string & tableName,bool bKeepRbMetaFile)100 TableInfo::TableInfo(Log* logger, const BRM::TxnID txnID,
101 const string& processName,
102 OID tableOID,
103 const string& tableName,
104 bool bKeepRbMetaFile) :
105 fTableId(-1), fBufferSize(0), fFileBufSize(0),
106 fStatusTI(WriteEngine::NEW),
107 fReadBufCount(0), fNumberOfColumns(0),
108 fHandle(NULL), fCurrentReadBuffer(0), fTotalReadRows(0),
109 fTotalErrRows(0), fMaxErrorRows(5),
110 fLastBufferId(-1), fFileBuffer(NULL), fCurrentParseBuffer(0),
111 fNumberOfColsParsed(0), fLocker(-1), fTableName(tableName),
112 fTableOID(tableOID), fJobId(0), fLog(logger), fTxnID(txnID),
113 fRBMetaWriter(processName, logger),
114 fProcessName(processName),
115 fKeepRbMetaFile(bKeepRbMetaFile),
116 fbTruncationAsError(false),
117 fImportDataMode(IMPORT_DATA_TEXT),
118 fTimeZone("SYSTEM"),
119 fTableLocked(false),
120 fReadFromStdin(false),
121 fReadFromS3(false),
122 fNullStringMode(false),
123 fEnclosedByChar('\0'),
124 fEscapeChar('\\'),
125 fProcessingBegun(false),
126 fBulkMode(BULK_MODE_LOCAL),
127 fBRMReporter(logger, tableName),
128 fTableLockID(0),
129 fRejectDataCnt(0),
130 fRejectErrCnt(0),
131 fExtentStrAlloc(tableOID, logger),
132 fOamCachePtr(oam::OamCache::makeOamCache())
133 {
134 fBuffers.clear();
135 fColumns.clear();
136 fStartTime.tv_sec = 0;
137 fStartTime.tv_usec = 0;
138 string teleServerHost(config::Config::makeConfig()->getConfig("QueryTele", "Host"));
139
140 if (!teleServerHost.empty())
141 {
142 int teleServerPort = config::Config::fromText(config::Config::makeConfig()->getConfig("QueryTele", "Port"));
143
144 if (teleServerPort > 0)
145 {
146 fQtc.serverParms(QueryTeleServerParms(teleServerHost, teleServerPort));
147 }
148 }
149 }
150
151 //------------------------------------------------------------------------------
152 // TableInfo destructor
153 //------------------------------------------------------------------------------
~TableInfo()154 TableInfo::~TableInfo()
155 {
156 fBRMReporter.sendErrMsgToFile(fBRMRptFileName);
157 freeProcessingBuffers();
158 }
159
160 //------------------------------------------------------------------------------
161 // Frees up processing buffer memory. We don't reset fReadBufCount to 0,
162 // because BulkLoad::lockColumnForParse() is calling getNumberOfBuffers()
163 // and dividing by the return value. So best not to risk returning 0.
164 // Once we get far enough to call this freeProcessingBuffers() function,
165 // the application code obviously better be completely through accessing
166 // fBuffers and fColumns.
167 //------------------------------------------------------------------------------
freeProcessingBuffers()168 void TableInfo::freeProcessingBuffers()
169 {
170 // fLog->logMsg(
171 // string("Releasing TableInfo Buffer for ")+fTableName,
172 // MSGLVL_INFO1);
173 fBuffers.clear();
174 fColumns.clear();
175 fNumberOfColumns = 0;
176 }
177
178 //------------------------------------------------------------------------------
179 // Close any database column or dictionary store files left open for this table.
180 // Under "normal" circumstances, there should be no files left open when we
181 // reach the end of the job, but in some error cases, the parsing threads may
182 // bail out without closing a file. So this function is called as part of
183 // EOJ cleanup for any tables that are still holding a table lock.
184 //
185 // Files will automatically get closed when the program terminates, but when
186 // we are preparing for a bulk rollback, we want to explicitly close the files
187 // before we "reopen" them and start rolling back the contents of the files.
188 //
// For mode1 and mode2 imports, cpimport.bin does not lock the table or perform
// a bulk rollback, and closeOpenDbFiles() is not called. We instead rely on
// program termination to implicitly close the files.
192 //------------------------------------------------------------------------------
closeOpenDbFiles()193 void TableInfo::closeOpenDbFiles()
194 {
195 ostringstream oss;
196 oss << "Closing DB files for table " << fTableName <<
197 ", left open by abnormal termination.";
198 fLog->logMsg( oss.str(), MSGLVL_INFO2 );
199
200 for (unsigned int k = 0; k < fColumns.size(); k++)
201 {
202 stringstream oss1;
203 oss1 << "Closing DB column file for: " <<
204 fColumns[k].column.colName <<
205 " (OID-" << fColumns[k].column.mapOid << ")";
206 fLog->logMsg( oss1.str(), MSGLVL_INFO2 );
207 fColumns[k].closeColumnFile(false, true);
208
209 if (fColumns[k].column.colType == COL_TYPE_DICT)
210 {
211 stringstream oss2;
212 oss2 << "Closing DB store file for: " <<
213 fColumns[k].column.colName <<
214 " (OID-" << fColumns[k].column.dctnry.dctnryOid << ")";
215 fLog->logMsg( oss2.str(), MSGLVL_INFO2 );
216 fColumns[k].closeDctnryStore(true);
217 }
218 }
219 }
220
221 //------------------------------------------------------------------------------
222 // Locks this table for reading to the specified thread (locker) "if" the table
223 // has not yet been assigned to a read thread.
224 //------------------------------------------------------------------------------
lockForRead(const int & locker)225 bool TableInfo::lockForRead(const int& locker)
226 {
227 boost::mutex::scoped_lock lock(fSyncUpdatesTI);
228
229 if (fLocker == -1)
230 {
231 if (fStatusTI == WriteEngine::NEW )
232 {
233 fLocker = locker;
234 return true;
235 }
236 }
237
238 return false;
239 }
240
241 //------------------------------------------------------------------------------
242 // Loop thru reading the import file(s) assigned to this TableInfo object.
243 //------------------------------------------------------------------------------
readTableData()244 int TableInfo::readTableData( )
245 {
246 RID validTotalRows = 0;
247 RID totalRowsPerInputFile = 0;
248 int filesTBProcessed = fLoadFileList.size();
249 int fileCounter = 0;
250 unsigned long long qtSentAt = 0;
251
252 if (fHandle == NULL)
253 {
254
255 fFileName = fLoadFileList[fileCounter];
256 int rc = openTableFile();
257
258 if (rc != NO_ERROR)
259 {
260 // Mark the table status as error and exit.
261 boost::mutex::scoped_lock lock(fSyncUpdatesTI);
262 fStatusTI = WriteEngine::ERR;
263 return rc;
264 }
265
266 fileCounter++;
267 }
268
269 timeval readStart;
270 gettimeofday(&readStart, NULL);
271 ostringstream ossStartMsg;
272 ossStartMsg << "Start reading and loading table " << fTableName;
273 fLog->logMsg( ossStartMsg.str(), MSGLVL_INFO2 );
274 fProcessingBegun = true;
275
276 ImportTeleStats its;
277 its.job_uuid = fJobUUID;
278 its.import_uuid = QueryTeleClient::genUUID();
279 its.msg_type = ImportTeleStats::IT_START;
280 its.start_time = QueryTeleClient::timeNowms();
281 its.table_list.push_back(fTableName);
282 its.rows_so_far.push_back(0);
283 its.system_name = fOamCachePtr->getSystemName();
284 its.module_name = fOamCachePtr->getModuleName();
285 string tn = getTableName();
286 its.schema_name = string(tn, 0, tn.find('.'));
287 fQtc.postImportTele(its);
288
289 //
290 // LOOP to read all the import data for this table
291 //
292 while (true)
293 {
294 // See if JobStatus has been set to terminate by another thread
295 if (BulkStatus::getJobStatus() == EXIT_FAILURE)
296 {
297 boost::mutex::scoped_lock lock(fSyncUpdatesTI);
298 fStartTime = readStart;
299 fStatusTI = WriteEngine::ERR;
300 its.msg_type = ImportTeleStats::IT_TERM;
301 its.rows_so_far.pop_back();
302 its.rows_so_far.push_back(0);
303 fQtc.postImportTele(its);
304 throw SecondaryShutdownException( "TableInfo::"
305 "readTableData(1) responding to job termination");
306 }
307
308 // @bug 3271: Conditionally compile the thread deadlock debug logging
309 #ifdef DEADLOCK_DEBUG
310 // @bug2099+. Temp hack to diagnose deadlock.
311 struct timeval tvStart;
312 gettimeofday(&tvStart, 0);
313 bool report = false;
314 bool reported = false;
315 // @bug2099-
316 #else
317 const bool report = false;
318 #endif
319
320 #ifdef PROFILE
321 Stats::startReadEvent(WE_STATS_WAIT_FOR_READ_BUF);
322 #endif
323
324 //
325 // LOOP to wait for, and read, the next avail BulkLoadBuffer object
326 //
327 while (!isBufferAvailable(report))
328 {
329 // See if JobStatus has been set to terminate by another thread
330 if (BulkStatus::getJobStatus() == EXIT_FAILURE)
331 {
332 boost::mutex::scoped_lock lock(fSyncUpdatesTI);
333 fStartTime = readStart;
334 fStatusTI = WriteEngine::ERR;
335 its.msg_type = ImportTeleStats::IT_TERM;
336 its.rows_so_far.pop_back();
337 its.rows_so_far.push_back(0);
338 fQtc.postImportTele(its);
339 throw SecondaryShutdownException( "TableInfo::"
340 "readTableData(2) responding to job termination");
341 }
342
343 // Sleep and check the condition again.
344 sleepMS(1);
345 #ifdef DEADLOCK_DEBUG
346
347 // @bug2099+
348 if (report) report = false; // report one time.
349
350 if (!reported)
351 {
352 struct timeval tvNow;
353 gettimeofday(&tvNow, 0);
354
355 if ((tvNow.tv_sec - tvStart.tv_sec) > 100)
356 {
357 time_t t = time(0);
358 char timeString[50];
359 ctime_r(&t, timeString);
360 timeString[ strlen(timeString) - 1 ] = '\0';
361 ostringstream oss;
362 oss << endl << timeString << ": " <<
363 "TableInfo::readTableData: " << fTableName <<
364 "; Diff is " << (tvNow.tv_sec - tvStart.tv_sec) <<
365 endl;
366 cout << oss.str();
367 cout.flush();
368 report = true;
369 reported = true;
370 }
371 }
372
373 // @bug2099-
374 #endif
375 }
376
377 #ifdef PROFILE
378 Stats::stopReadEvent(WE_STATS_WAIT_FOR_READ_BUF);
379 Stats::startReadEvent(WE_STATS_READ_INTO_BUF);
380 #endif
381
382 int readBufNo = fCurrentReadBuffer;
383 int prevReadBuf = (fCurrentReadBuffer - 1);
384
385 if (prevReadBuf < 0)
386 prevReadBuf = fReadBufCount + prevReadBuf;
387
388 // We keep a running total of read errors; fMaxErrorRows specifies
389 // the error limit. Here's where we see how many more errors we
390 // still have below the limit, and we pass this to fillFromFile().
391 unsigned allowedErrCntThisCall =
392 ( (fMaxErrorRows > fTotalErrRows) ?
393 (fMaxErrorRows - fTotalErrRows) : 0 );
394
395 // Fill in the specified buffer.
396 // fTotalReadRowsPerInputFile is ongoing total number of rows read,
397 // per input file.
398 // validTotalRows is ongoing total of valid rows read for all files
399 // pertaining to this DB table.
400 int readRc;
401 if (fReadFromS3)
402 {
403 readRc = fBuffers[readBufNo].fillFromMemory(
404 fBuffers[prevReadBuf], fFileBuffer, fS3ReadLength, &fS3ParseLength,
405 totalRowsPerInputFile, validTotalRows, fColumns,
406 allowedErrCntThisCall);
407 }
408 else
409 {
410 readRc = fBuffers[readBufNo].fillFromFile(
411 fBuffers[prevReadBuf], fHandle, totalRowsPerInputFile,
412 validTotalRows, fColumns, allowedErrCntThisCall);
413 }
414
415 if (readRc != NO_ERROR)
416 {
417 // error occurred.
418 // need to exit.
419 // mark the table status as error and exit.
420 {
421 boost::mutex::scoped_lock lock(fSyncUpdatesTI);
422 fStartTime = readStart;
423 fStatusTI = WriteEngine::ERR;
424 fBuffers[readBufNo].setStatusBLB(WriteEngine::ERR);
425 }
426 closeTableFile();
427
428 // Error occurred on next row not read, so increment
429 // totalRowsPerInputFile row count for the error msg
430 WErrorCodes ec;
431 ostringstream oss;
432 oss << "Error reading import file " << fFileName <<
433 "; near line " << totalRowsPerInputFile + 1 << "; " <<
434 ec.errorString(readRc);
435 fLog->logMsg( oss.str(), readRc, MSGLVL_ERROR);
436
437 its.msg_type = ImportTeleStats::IT_TERM;
438 its.rows_so_far.pop_back();
439 its.rows_so_far.push_back(0);
440 fQtc.postImportTele(its);
441
442 return readRc;
443 }
444
445 #ifdef PROFILE
446 Stats::stopReadEvent(WE_STATS_READ_INTO_BUF);
447 #endif
448 its.msg_type = ImportTeleStats::IT_PROGRESS;
449 its.rows_so_far.pop_back();
450 its.rows_so_far.push_back(totalRowsPerInputFile);
451 unsigned long long thisRows = static_cast<unsigned long long>(totalRowsPerInputFile);
452 thisRows /= 1000000;
453
454 if (thisRows > qtSentAt)
455 {
456 fQtc.postImportTele(its);
457 qtSentAt = thisRows;
458 }
459
460 // Check if there were any errors in the read data.
461 // if yes, copy it to the error list.
462 // if the number of errors is greater than the maximum error count
463 // mark the table status as error and exit.
464 // call the method to copy the errors
465 writeErrorList( &fBuffers[readBufNo].getErrorRows(),
466 &fBuffers[readBufNo].getExactErrorRows(), false );
467 fBuffers[readBufNo].clearErrRows();
468
469 if (fTotalErrRows > fMaxErrorRows)
470 {
471 // flush the reject data file and output the rejected rows
472 // flush err file and output the rejected row id and the reason.
473 writeErrorList( 0, 0, true );
474
475 // number of errors > maximum allowed. hence return error.
476 {
477 boost::mutex::scoped_lock lock(fSyncUpdatesTI);
478 fStartTime = readStart;
479 fStatusTI = WriteEngine::ERR;
480 fBuffers[readBufNo].setStatusBLB(WriteEngine::ERR);
481 }
482 closeTableFile();
483 ostringstream oss5;
484 oss5 << "Actual error row count(" << fTotalErrRows <<
485 ") exceeds the max error rows(" << fMaxErrorRows <<
486 ") allowed for table " << fTableName;
487 fLog->logMsg(oss5.str(), ERR_BULK_MAX_ERR_NUM, MSGLVL_ERROR);
488
489 // List Err and Bad files to report file (if applicable)
490 fBRMReporter.rptMaxErrJob( fBRMRptFileName, fErrFiles, fBadFiles );
491
492 its.msg_type = ImportTeleStats::IT_TERM;
493 its.rows_so_far.pop_back();
494 its.rows_so_far.push_back(0);
495 fQtc.postImportTele(its);
496
497 return ERR_BULK_MAX_ERR_NUM;
498 }
499
500 // mark the buffer status as read complete.
501 {
502 #ifdef PROFILE
503 Stats::startReadEvent(WE_STATS_WAIT_TO_COMPLETE_READ);
504 #endif
505 boost::mutex::scoped_lock lock(fSyncUpdatesTI);
506 #ifdef PROFILE
507 Stats::stopReadEvent(WE_STATS_WAIT_TO_COMPLETE_READ);
508 Stats::startReadEvent(WE_STATS_COMPLETING_READ);
509 #endif
510
511 fStartTime = readStart;
512 fBuffers[readBufNo].setStatusBLB(WriteEngine::READ_COMPLETE);
513
514 fCurrentReadBuffer = (fCurrentReadBuffer + 1) % fReadBufCount;
515
516 // bufferCount++;
517 if ( (fHandle && feof(fHandle)) || (fReadFromS3 && (fS3ReadLength == fS3ParseLength)) )
518 {
519 timeval readFinished;
520 gettimeofday(&readFinished, NULL);
521
522 closeTableFile();
523
524 if (fReadFromStdin)
525 {
526 fLog->logMsg( "Finished loading " + fTableName + " from STDIN" +
527 ", Time taken = " + Convertor::int2Str((int)
528 (readFinished.tv_sec - readStart.tv_sec)) +
529 " seconds",
530 //" seconds; bufferCount-"+Convertor::int2Str(bufferCount),
531 MSGLVL_INFO2 );
532 }
533 else if(fReadFromS3)
534 {
535 fLog->logMsg( "Finished loading " + fTableName + " from S3" +
536 ", Time taken = " + Convertor::int2Str((int)
537 (readFinished.tv_sec - readStart.tv_sec)) +
538 " seconds",
539 //" seconds; bufferCount-"+Convertor::int2Str(bufferCount),
540 MSGLVL_INFO2 );
541 }
542 else
543 {
544 fLog->logMsg( "Finished reading file " + fFileName +
545 ", Time taken = " + Convertor::int2Str((int)
546 (readFinished.tv_sec - readStart.tv_sec)) +
547 " seconds",
548 //" seconds; bufferCount-"+Convertor::int2Str(bufferCount),
549 MSGLVL_INFO2 );
550 }
551
552 // flush the reject data file and output the rejected rows
553 // flush err file and output the rejected row id and the reason.
554 writeErrorList( 0, 0, true );
555
556 // If > 1 file for this table, then open next file in the list
557 if ( fileCounter < filesTBProcessed )
558 {
559 fFileName = fLoadFileList[fileCounter];
560 int rc = openTableFile();
561
562 if (rc != NO_ERROR)
563 {
564 // Mark the table status as error and exit.
565 fStatusTI = WriteEngine::ERR;
566 return rc;
567 }
568
569 fileCounter++;
570 fTotalReadRows += totalRowsPerInputFile;
571 totalRowsPerInputFile = 0;
572 }
573 else // All files read for this table; break out of read loop
574 {
575 fStatusTI = WriteEngine::READ_COMPLETE;
576 fLastBufferId = readBufNo;
577 fTotalReadRows += totalRowsPerInputFile;
578 break;
579 }
580
581 gettimeofday(&readStart, NULL);
582 } // reached EOF
583
584 #ifdef PROFILE
585 Stats::stopReadEvent(WE_STATS_COMPLETING_READ);
586 #endif
587 } // mark buffer status as read-complete within scope of a mutex
588 } // loop to read all data for this table
589
590 its.msg_type = ImportTeleStats::IT_SUMMARY;
591 its.end_time = QueryTeleClient::timeNowms();
592 its.rows_so_far.pop_back();
593 its.rows_so_far.push_back(fTotalReadRows);
594 fQtc.postImportTele(its);
595 fQtc.waitForQueues();
596
597 return NO_ERROR;
598 }
599
600 //------------------------------------------------------------------------------
601 // writeErrorList()
602 // errorRows - vector of row numbers and corresponding error messages
603 // errorDatRows - vector of bad rows that have been rejected
604 //
605 // Adds errors pertaining to a specific buffer, to the cumulative list of
606 // errors to be reported to the user.
607 //------------------------------------------------------------------------------
writeErrorList(const std::vector<std::pair<RID,std::string>> * errorRows,const std::vector<std::string> * errorDatRows,bool bCloseFile)608 void TableInfo::writeErrorList(const std::vector< std::pair<RID,
609 std::string> >* errorRows,
610 const std::vector<std::string>* errorDatRows,
611 bool bCloseFile)
612 {
613 size_t errorRowsCount = 0;
614 size_t errorDatRowsCount = 0;
615
616 if (errorRows)
617 errorRowsCount = errorRows->size();
618
619 if (errorDatRows)
620 errorDatRowsCount = errorDatRows->size();
621
622 if ((errorRowsCount > 0) ||
623 (errorDatRowsCount > 0) ||
624 (bCloseFile))
625 {
626 boost::mutex::scoped_lock lock(fErrorRptInfoMutex);
627
628 if ((errorRowsCount > 0) || (bCloseFile))
629 writeErrReason(errorRows, bCloseFile);
630
631 if ((errorDatRowsCount > 0) || (bCloseFile))
632 writeBadRows (errorDatRows, bCloseFile);
633
634 fTotalErrRows += errorRowsCount;
635 }
636 }
637
638 //------------------------------------------------------------------------------
639 // Parse the specified column (columnId) in the specified buffer (bufferId).
640 //------------------------------------------------------------------------------
parseColumn(const int & columnId,const int & bufferId,double & processingTime)641 int TableInfo::parseColumn(const int& columnId, const int& bufferId,
642 double& processingTime)
643 {
644 // parse the column
645 // note the time and update the column's last processing time
646 timeval parseStart, parseEnd;
647 gettimeofday(&parseStart, NULL);
648
649 // Will need to check whether the column needs to extend.
650 // If size of the file is less than the required size, extend the column
651 int rc = fBuffers[bufferId].parse(fColumns[columnId]);
652 gettimeofday(&parseEnd, NULL);
653
654 processingTime = (parseEnd.tv_usec / 1000 + parseEnd.tv_sec * 1000) -
655 (parseStart.tv_usec / 1000 + parseStart.tv_sec * 1000);
656
657 return rc;
658 }
659
660 //------------------------------------------------------------------------------
661 // Mark the specified column (columnId) in the specified buffer (bufferId) as
662 // PARSE_COMPLETE. If this is the last column to be parsed for this buffer,
663 // then mark the buffer as PARSE_COMPLETE.
664 // If the last buffer for this table has been read (fLastBufferId != -1), then
665 // see if all the data for columnId has been parsed for all the buffers, in
666 // which case we are finished parsing columnId.
667 // If this is the last column to finish parsing for this table, then mark the
668 // table status as PARSE_COMPLETE.
669 //------------------------------------------------------------------------------
setParseComplete(const int & columnId,const int & bufferId,double processingTime)670 int TableInfo::setParseComplete(const int& columnId,
671 const int& bufferId,
672 double processingTime)
673 {
674 boost::mutex::scoped_lock lock(fSyncUpdatesTI);
675
676 // Check table status in case race condition results in this function
677 // being called after fStatusTI was set to ERR by another thread.
678 if (fStatusTI == WriteEngine::ERR)
679 return ERR_UNKNOWN;
680
681 fColumns[columnId].lastProcessingTime = processingTime;
682 #ifdef PROFILE
683 fColumns[columnId].totalProcessingTime += processingTime;
684 #endif
685
686 // Set buffer status to complete if setColumnStatus indicates that
687 // all the columns are complete
688 if (fBuffers[bufferId].setColumnStatus(
689 columnId, WriteEngine::PARSE_COMPLETE))
690 fBuffers[bufferId].setStatusBLB( WriteEngine::PARSE_COMPLETE );
691
692 // fLastBufferId != -1 means the Read thread has read the last
693 // buffer for this table
694 if (fLastBufferId != -1)
695 {
696 // check if the status of the column in all the fBuffers is parse
697 // complete then update the column status as parse complete.
698 bool allBuffersDoneForAColumn = true;
699
700 for (int i = 0; i < fReadBufCount; ++i)
701 {
702 // check the status of the column in this buffer.
703 Status bufferStatus = fBuffers[i].getStatusBLB();
704
705 if ( (bufferStatus == WriteEngine::READ_COMPLETE) ||
706 (bufferStatus == WriteEngine::PARSE_COMPLETE) )
707 {
708 if (fBuffers[i].getColumnStatus(columnId) !=
709 WriteEngine::PARSE_COMPLETE)
710 {
711 allBuffersDoneForAColumn = false;
712 break;
713 }
714 }
715 }
716
717 // allBuffersDoneForAColumn==TRUE means we are finished parsing columnId
718 if (allBuffersDoneForAColumn)
719 {
720 // Accumulate list of HWM dictionary blocks to be flushed from cache
721 std::vector<BRM::LBID_t> dictBlksToFlush;
722 fColumns[columnId].getDictFlushBlks( dictBlksToFlush );
723
724 for (unsigned kk = 0; kk < dictBlksToFlush.size(); kk++)
725 {
726 fDictFlushBlks.push_back( dictBlksToFlush[kk] );
727 }
728
729 int rc = fColumns[columnId].finishParsing( );
730
731 if (rc != NO_ERROR)
732 {
733 WErrorCodes ec;
734 ostringstream oss;
735 oss << "setParseComplete completion error; "
736 "Failed to load table: " <<
737 fTableName << "; " << ec.errorString(rc);
738 fLog->logMsg( oss.str(), rc, MSGLVL_ERROR);
739 fStatusTI = WriteEngine::ERR;
740 return rc;
741 }
742
743 fNumberOfColsParsed++;
744
745 //
746 // If all columns have been parsed, then finished with this tbl
747 //
748 if (fNumberOfColsParsed >= fNumberOfColumns)
749 {
750 // After closing the column and dictionary store files,
751 // flush any updated dictionary blocks in PrimProc.
752 // We only do this for non-HDFS. For HDFS we don't want
753 // to flush till "after" we have "confirmed" all the file
754 // changes, which flushes the changes to disk.
755 if (!idbdatafile::IDBPolicy::useHdfs())
756 {
757 if (fDictFlushBlks.size() > 0)
758 {
759 #ifdef PROFILE
760 Stats::startParseEvent(WE_STATS_FLUSH_PRIMPROC_BLOCKS);
761 #endif
762 if (fLog->isDebug(DEBUG_2))
763 {
764 ostringstream oss;
765 oss << "Dictionary cache flush: ";
766 for (uint32_t i = 0; i < fDictFlushBlks.size(); i++)
767 {
768 oss << fDictFlushBlks[i] << ", ";
769 }
770 oss << endl;
771 fLog->logMsg( oss.str(), MSGLVL_INFO1 );
772 }
773 cacheutils::flushPrimProcAllverBlocks(fDictFlushBlks);
774 #ifdef PROFILE
775 Stats::stopParseEvent(WE_STATS_FLUSH_PRIMPROC_BLOCKS);
776 #endif
777 fDictFlushBlks.clear();
778 }
779 }
780
781 // Update auto-increment next value if applicable.
782 rc = synchronizeAutoInc( );
783
784 if (rc != NO_ERROR)
785 {
786 WErrorCodes ec;
787 ostringstream oss;
788 oss << "setParseComplete: autoInc update error; "
789 "Failed to load table: " << fTableName <<
790 "; " << ec.errorString(rc);
791 fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
792 fStatusTI = WriteEngine::ERR;
793 return rc;
794 }
795
796 //..Validate that all the HWM's are consistent and in-sync
797 std::vector<DBRootExtentInfo> segFileInfo;
798
799 for (unsigned i = 0; i < fColumns.size(); ++i)
800 {
801 DBRootExtentInfo extentInfo;
802 fColumns[i].getSegFileInfo( extentInfo );
803 segFileInfo.push_back( extentInfo );
804 }
805
806 rc = validateColumnHWMs( 0, segFileInfo, "Ending" );
807
808 if (rc != NO_ERROR)
809 {
810 WErrorCodes ec;
811 ostringstream oss;
812 oss << "setParseComplete: HWM validation error; "
813 "Failed to load table: " << fTableName <<
814 "; " << ec.errorString(rc);
815 fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
816 fStatusTI = WriteEngine::ERR;
817
818 ostringstream oss2;
819 oss2 << "Ending HWMs for table " << fTableName << ": ";
820
821 for (unsigned int n = 0; n < fColumns.size(); n++)
822 {
823 oss2 << std::endl;
824 oss2 << " " << fColumns[n].column.colName <<
825 "; DBRoot/part/seg/hwm: " <<
826 segFileInfo[n].fDbRoot <<
827 "/" << segFileInfo[n].fPartition <<
828 "/" << segFileInfo[n].fSegment <<
829 "/" << segFileInfo[n].fLocalHwm;
830 }
831
832 fLog->logMsg(oss2.str(), MSGLVL_INFO1);
833
834 return rc;
835 }
836
837 //..Confirm changes to DB files (necessary for HDFS)
838 rc = confirmDBFileChanges( );
839
840 if (rc != NO_ERROR)
841 {
842 WErrorCodes ec;
843 ostringstream oss;
844 oss << "setParseComplete: Error confirming DB changes; "
845 "Failed to load table: " << fTableName <<
846 "; " << ec.errorString(rc);
847 fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
848 fStatusTI = WriteEngine::ERR;
849 return rc;
850 }
851
852 //..Update BRM with HWM and Casual Partition info, etc.
853 rc = finishBRM( );
854
855 if (rc != NO_ERROR)
856 {
857 WErrorCodes ec;
858 ostringstream oss;
859 oss << "setParseComplete: BRM error; "
860 "Failed to load table: " << fTableName <<
861 "; " << ec.errorString(rc);
862 fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
863 fStatusTI = WriteEngine::ERR;
864 return rc;
865 }
866
867 // Change table lock state to CLEANUP
868 rc = changeTableLockState( );
869
870 if (rc != NO_ERROR)
871 {
872 WErrorCodes ec;
873 ostringstream oss;
874 oss << "setParseComplete: table lock state change error; "
875 "Table load completed: " << fTableName << "; " <<
876 ec.errorString(rc);
877 fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
878 fStatusTI = WriteEngine::ERR;
879 return rc;
880 }
881
882 // Finished with this table, so delete bulk rollback
883 // meta data file and release the table lock.
884 deleteTempDBFileChanges();
885 deleteMetaDataRollbackFile();
886
887 rc = releaseTableLock( );
888
889 if (rc != NO_ERROR)
890 {
891 WErrorCodes ec;
892 ostringstream oss;
893 oss << "setParseComplete: table lock release error; "
894 "Failed to load table: " << fTableName << "; " <<
895 ec.errorString(rc);
896 fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
897 fStatusTI = WriteEngine::ERR;
898 return rc;
899 }
900
901 #ifdef PROFILE
902
903 // Loop through columns again to print out the elapsed
904 // parse times
905 for (unsigned i = 0; i < fColumns.size(); ++i)
906 {
907 ostringstream ossColTime;
908 ossColTime << "Column " << i << "; OID-" <<
909 fColumns[i].column.mapOid << "; parseTime-" <<
910 (fColumns[i].totalProcessingTime / 1000.0) <<
911 " seconds";
912 fLog->logMsg(ossColTime.str(), MSGLVL_INFO1);
913 }
914
915 #endif
916
917 timeval endTime;
918 gettimeofday(&endTime, 0);
919 double elapsedTime =
920 (endTime.tv_sec + (endTime.tv_usec / 1000000.0)) -
921 (fStartTime.tv_sec + (fStartTime.tv_usec / 1000000.0));
922
923 fStatusTI = WriteEngine::PARSE_COMPLETE;
924 reportTotals(elapsedTime);
925
926 // Reduce memory use by allocating and releasing as needed
927 freeProcessingBuffers();
928
929 } // end of if (fNumberOfColsParsed >= fNumberOfColumns)
930 } // end of if (allBuffersDoneForAColumn)
931 } // end of if (fLastBufferId != -1)
932
933 // If we finished parsing the buffer associated with currentParseBuffer,
934 // but have not finshed the entire table, then advance currentParseBuffer.
935 if ((fStatusTI != WriteEngine::PARSE_COMPLETE) &&
936 (fBuffers[bufferId].getStatusBLB() == WriteEngine::PARSE_COMPLETE))
937 {
938 // Find the BulkLoadBuffer object that is next in line to be parsed
939 // and assign fCurrentParseBuffer accordingly. Break out of the
940 // loop if we wrap all the way around and catch up with the current-
941 // Read buffer.
942 if (bufferId == fCurrentParseBuffer)
943 {
944 int currentParseBuffer = fCurrentParseBuffer;
945
946 while (fBuffers[currentParseBuffer].getStatusBLB() ==
947 WriteEngine::PARSE_COMPLETE )
948 {
949 currentParseBuffer = (currentParseBuffer + 1) %
950 fReadBufCount;
951 fCurrentParseBuffer = currentParseBuffer;
952
953 if (fCurrentParseBuffer == fCurrentReadBuffer) break;
954 }
955 }
956 }
957
958 return NO_ERROR;
959 }
960
961 //------------------------------------------------------------------------------
962 // Report summary totals to applicable destination (stdout, cpimport.bin log
963 // file, BRMReport file (for mode1) etc).
964 // elapsedTime is number of seconds taken to import this table.
965 //------------------------------------------------------------------------------
reportTotals(double elapsedTime)966 void TableInfo::reportTotals(double elapsedTime)
967 {
968 ostringstream oss1;
969 oss1 << "For table " << fTableName <<
970 ": " << fTotalReadRows << " rows processed and " <<
971 (fTotalReadRows - fTotalErrRows) << " rows inserted.";
972
973 fLog->logMsg(oss1.str(), MSGLVL_INFO1);
974
975 ostringstream oss2;
976 oss2 << "For table " << fTableName << ": " <<
977 "Elapsed time to load this table: " <<
978 elapsedTime << " secs";
979
980 fLog->logMsg(oss2.str(), MSGLVL_INFO2);
981
982 // @bug 3504: Loop through columns to print saturation counts
983 std::vector<boost::tuple<execplan::CalpontSystemCatalog::ColDataType, uint64_t, uint64_t> > satCounts;
984
985 for (unsigned i = 0; i < fColumns.size(); ++i)
986 {
987 //std::string colName(fTableName);
988 // colName += '.';
989 // colName += fColumns[i].column.colName;
990 long long satCount = fColumns[i].saturatedCnt();
991
992 satCounts.push_back(boost::make_tuple(fColumns[i].column.dataType,
993 fColumns[i].column.mapOid,
994 satCount));
995
996 if (satCount > 0)
997 {
998 // @bug 3375: report invalid dates/times set to null
999 ostringstream ossSatCnt;
1000 ossSatCnt << "Column " << fTableName << '.' <<
1001 fColumns[i].column.colName << "; Number of ";
1002
1003 if (fColumns[i].column.dataType == execplan::CalpontSystemCatalog::DATE)
1004 {
1005 ossSatCnt <<
1006 "invalid dates replaced with zero value : ";
1007 }
1008 else if (fColumns[i].column.dataType ==
1009 execplan::CalpontSystemCatalog::DATETIME)
1010 {
1011 //bug5383
1012 ossSatCnt <<
1013 "invalid date/times replaced with zero value : ";
1014 }
1015
1016 else if (fColumns[i].column.dataType == execplan::CalpontSystemCatalog::TIMESTAMP)
1017 {
1018 ossSatCnt <<
1019 "invalid timestamps replaced with zero value : ";
1020 }
1021 else if (fColumns[i].column.dataType == execplan::CalpontSystemCatalog::TIME)
1022 {
1023 ossSatCnt <<
1024 "invalid times replaced with zero value : ";
1025 }
1026 else if (fColumns[i].column.dataType == execplan::CalpontSystemCatalog::CHAR)
1027 ossSatCnt <<
1028 "character strings truncated: ";
1029 else if (fColumns[i].column.dataType == execplan::CalpontSystemCatalog::VARCHAR)
1030 ossSatCnt << "character strings truncated: ";
1031 else
1032 ossSatCnt <<
1033 "rows inserted with saturated values: ";
1034
1035 ossSatCnt << satCount;
1036 fLog->logMsg(ossSatCnt.str(), MSGLVL_WARNING);
1037 }
1038 }
1039
1040 logging::Message::Args tblFinishedMsgArgs;
1041 tblFinishedMsgArgs.add( fJobId );
1042 tblFinishedMsgArgs.add( fTableName );
1043 tblFinishedMsgArgs.add( (fTotalReadRows - fTotalErrRows) );
1044 SimpleSysLog::instance()->logMsg(
1045 tblFinishedMsgArgs,
1046 logging::LOG_TYPE_INFO,
1047 logging::M0083);
1048
1049 //Bug1375 - cpimport.bin did not add entries to the transaction
1050 // log file: data_mods.log
1051 if ((fTotalReadRows - fTotalErrRows) > 0 )
1052 logToDataMods(fjobFileName, oss1.str());
1053
1054 // Log totals in report file if applicable
1055 fBRMReporter.reportTotals( fTotalReadRows,
1056 (fTotalReadRows - fTotalErrRows),
1057 satCounts );
1058 }
1059
1060 //------------------------------------------------------------------------------
1061 // Report BRM updates to a report file or to BRM directly.
1062 //------------------------------------------------------------------------------
finishBRM()1063 int TableInfo::finishBRM( )
1064 {
1065 // Collect the CP and HWM information for all the columns
1066 for (unsigned i = 0; i < fColumns.size(); ++i)
1067 {
1068 fColumns[i].getBRMUpdateInfo( fBRMReporter );
1069 }
1070
1071 // We use mutex not to synchronize contention among parallel threads,
1072 // because we should be the only thread accessing the fErrFiles and
1073 // fBadFiles at this point. But we do use the mutex as a memory barrier
1074 // to make sure we have the latest copy of the data.
1075 std::vector<std::string>* errFiles = 0;
1076 std::vector<std::string>* badFiles = 0;
1077 {
1078 boost::mutex::scoped_lock lock(fErrorRptInfoMutex);
1079 errFiles = &fErrFiles;
1080 badFiles = &fBadFiles;
1081 }
1082
1083 // Save the info just collected, to a report file or send to BRM
1084 int rc = fBRMReporter.sendBRMInfo( fBRMRptFileName, *errFiles, *badFiles );
1085
1086 return rc;
1087 }
1088
1089 //------------------------------------------------------------------------------
1090 // Update status of table to reflect an error.
1091 // No need to update the buffer or column status, because we are not going to
1092 // continue the job anyway. Other threads should terminate when they see that
1093 // the JobStatus has been set to EXIT_FAILURE and/or the table status has been
1094 // set to WriteEngine::ERR.
1095 //------------------------------------------------------------------------------
setParseError()1096 void TableInfo::setParseError( )
1097 {
1098 boost::mutex::scoped_lock lock(fSyncUpdatesTI);
1099 fStatusTI = WriteEngine::ERR;
1100 }
1101
1102 //------------------------------------------------------------------------------
1103 // Locks a column from the specified buffer (bufferId) for the specified parse
1104 // thread (id); and returns the column id. A return value of -1 means no
1105 // column could be locked for parsing.
1106 //------------------------------------------------------------------------------
1107 // @bug2099. Temporary hack to diagnose deadlock.
1108 // Added report parm and couts below.
getColumnForParse(const int & id,const int & bufferId,bool report)1109 int TableInfo::getColumnForParse(const int& id,
1110 const int& bufferId,
1111 bool report)
1112 {
1113 boost::mutex::scoped_lock lock(fSyncUpdatesTI);
1114 double maxTime = 0;
1115 int columnId = -1;
1116
1117 while (true)
1118 {
1119 // See if JobStatus has been set to terminate by another thread
1120 if (BulkStatus::getJobStatus() == EXIT_FAILURE)
1121 {
1122 fStatusTI = WriteEngine::ERR;
1123 throw SecondaryShutdownException( "TableInfo::"
1124 "getColumnForParse() responding to job termination");
1125 }
1126
1127 if ( !bufferReadyForParse(bufferId, report) ) return -1;
1128
1129 // @bug2099+
1130 ostringstream oss;
1131
1132 if (report)
1133 {
1134 #ifdef _MSC_VER
1135 oss << " ----- " << GetCurrentThreadId() << ":fBuffers[" << bufferId <<
1136 #else
1137 oss << " ----- " << pthread_self() << ":fBuffers[" << bufferId <<
1138 #endif
1139 "]: (colLocker,status,lasttime)- ";
1140 }
1141
1142 // @bug2099-
1143
1144 for (unsigned k = 0; k < fNumberOfColumns; ++k)
1145 {
1146 // @bug2099+
1147 if (report)
1148 {
1149 Status colStatus = fBuffers[bufferId].getColumnStatus(k);
1150 int colLocker = fBuffers[bufferId].getColumnLocker(k);
1151
1152 string colStatusStr;
1153 ColumnInfo::convertStatusToString(colStatus, colStatusStr);
1154
1155 oss << '(' << colLocker << ',' << colStatusStr << ',' <<
1156 fColumns[k].lastProcessingTime << ") ";
1157 }
1158
1159 // @bug2099-
1160
1161 if (fBuffers[bufferId].getColumnLocker(k) == -1)
1162 {
1163 if (columnId == -1)
1164 columnId = k;
1165 else if (fColumns[k].lastProcessingTime == 0)
1166 {
1167 if (fColumns[k].column.width >=
1168 fColumns[columnId].column.width)
1169 columnId = k;
1170 }
1171 else if (fColumns[k].lastProcessingTime > maxTime)
1172 {
1173 maxTime = fColumns[k].lastProcessingTime;
1174 columnId = k;
1175 }
1176 }
1177 }
1178
1179 // @bug2099+
1180 if (report)
1181 {
1182 oss << "; selected colId: " << columnId;
1183
1184 if (columnId != -1)
1185 oss << "; maxTime: " << maxTime;
1186
1187 oss << endl;
1188
1189 if (!BulkLoad::disableConsoleOutput())
1190 {
1191 cout << oss.str();
1192 cout.flush();
1193 }
1194 }
1195
1196 // @bug2099-
1197
1198 if (columnId == -1) return -1;
1199
1200 if (fBuffers[bufferId].tryAndLockColumn(columnId, id))
1201 {
1202 return columnId;
1203 }
1204 }
1205 }
1206
1207 //------------------------------------------------------------------------------
1208 // Check if the specified buffer is ready for parsing (status == READ_COMPLETE)
1209 // @bug 2099. Temporary hack to diagnose deadlock. Added report parm
1210 // and couts below.
1211 //------------------------------------------------------------------------------
bufferReadyForParse(const int & bufferId,bool report) const1212 bool TableInfo::bufferReadyForParse(const int& bufferId, bool report) const
1213 {
1214 if (fBuffers.size() == 0)
1215 return false;
1216
1217 Status stat = fBuffers[bufferId].getStatusBLB();
1218
1219 if (report)
1220 {
1221 ostringstream oss;
1222 string bufStatusStr;
1223 ColumnInfo::convertStatusToString( stat,
1224 bufStatusStr );
1225 #ifdef _MSC_VER
1226 oss << " --- " << GetCurrentThreadId() <<
1227 #else
1228 oss << " --- " << pthread_self() <<
1229 #endif
1230 ":fBuffers[" << bufferId << "]=" << bufStatusStr <<
1231 " (" << stat << ")" << std::endl;
1232 cout << oss.str();
1233 }
1234
1235 return (stat == WriteEngine::READ_COMPLETE) ? true : false;
1236 }
1237
1238 //------------------------------------------------------------------------------
1239 // Create the specified number (noOfBuffer) of BulkLoadBuffer objects and store
1240 // them in fBuffers. jobFieldRefList lists the fields in this import.
1241 // fixedBinaryRecLen is fixed record length for binary imports (it is n/a
1242 // for text bulk loads).
1243 //------------------------------------------------------------------------------
initializeBuffers(int noOfBuffers,const JobFieldRefList & jobFieldRefList,unsigned int fixedBinaryRecLen)1244 int TableInfo::initializeBuffers(int noOfBuffers,
1245 const JobFieldRefList& jobFieldRefList,
1246 unsigned int fixedBinaryRecLen)
1247 {
1248 #ifdef _MSC_VER
1249
1250 //@bug 3751
1251 //When reading from STDIN, Windows doesn't like the huge default buffer of
1252 // 1M, so turn it down.
1253 if (fReadFromStdin)
1254 {
1255 fBufferSize = std::min(10240, fBufferSize);
1256 }
1257
1258 #endif
1259
1260 fReadBufCount = noOfBuffers;
1261
1262 // initialize and populate the buffer vector.
1263 for (int i = 0; i < fReadBufCount; ++i)
1264 {
1265 BulkLoadBuffer* buffer = new BulkLoadBuffer(fNumberOfColumns,
1266 fBufferSize, fLog,
1267 i, fTableName,
1268 jobFieldRefList);
1269 buffer->setColDelimiter (fColDelim);
1270 buffer->setNullStringMode(fNullStringMode);
1271 buffer->setEnclosedByChar(fEnclosedByChar);
1272 buffer->setEscapeChar (fEscapeChar );
1273 buffer->setTruncationAsError(getTruncationAsError());
1274 buffer->setImportDataMode(fImportDataMode,
1275 fixedBinaryRecLen);
1276 buffer->setTimeZone(fTimeZone);
1277 fBuffers.push_back(buffer);
1278 }
1279 if (!fS3Key.empty())
1280 {
1281 ms3_library_init();
1282 ms3 = ms3_init(fS3Key.c_str(), fS3Secret.c_str(), fS3Region.c_str(), fS3Host.c_str());
1283 if (!ms3)
1284 {
1285 ostringstream oss;
1286 oss << "Error initiating S3 library";
1287 fLog->logMsg( oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR );
1288 return ERR_FILE_OPEN;
1289 }
1290 }
1291 return 0;
1292 }
1293
1294 //------------------------------------------------------------------------------
1295 // Add the specified ColumnInfo object (info) into this table's fColumns vector.
1296 //------------------------------------------------------------------------------
addColumn(ColumnInfo * info)1297 void TableInfo::addColumn(ColumnInfo* info)
1298 {
1299 fColumns.push_back(info);
1300 fNumberOfColumns = fColumns.size();
1301
1302 fExtentStrAlloc.addColumn( info->column.mapOid,
1303 info->column.width );
1304 }
1305
1306 //------------------------------------------------------------------------------
// Open the file corresponding to fFileName so that we can import its contents.
// A buffer is also allocated and passed to setvbuf().
// If fReadFromStdin is true, we just assign stdin to our fHandle for reading.
// If fReadFromS3 is true, the object named fFileName is fetched from S3 into
// fFileBuffer instead (fHandle is left unset).
1310 //------------------------------------------------------------------------------
openTableFile()1311 int TableInfo::openTableFile()
1312 {
1313 if (fHandle != NULL)
1314 return NO_ERROR;
1315
1316 if (fReadFromStdin)
1317 {
1318 fHandle = stdin;
1319
1320 #ifdef _MSC_VER
1321
1322 // If this is a binary import from STDIN, then set stdin to binary
1323 if (fImportDataMode != IMPORT_DATA_TEXT)
1324 _setmode(_fileno(stdin), _O_BINARY);
1325
1326 fFileBuffer = 0;
1327 #else
1328 // Not 100% sure that calling setvbuf on stdin does much, but in
1329 // some tests, it made a slight difference.
1330 fFileBuffer = new char[fFileBufSize];
1331 setvbuf(fHandle, fFileBuffer, _IOFBF, fFileBufSize);
1332 #endif
1333 ostringstream oss;
1334 oss << BOLD_START << "Reading input from STDIN to import into table " <<
1335 fTableName << "..." << BOLD_STOP;
1336 fLog->logMsg( oss.str(), MSGLVL_INFO1 );
1337 }
1338 else if (fReadFromS3)
1339 {
1340 int res;
1341 res = ms3_get(ms3, fS3Bucket.c_str(), fFileName.c_str(), (uint8_t**)&fFileBuffer, &fS3ReadLength);
1342 fS3ParseLength = 0;
1343 if (res)
1344 {
1345 ostringstream oss;
1346 oss << "Error retrieving file " << fFileName << " from S3: ";
1347 if (ms3_server_error(ms3))
1348 {
1349 oss << ms3_server_error(ms3);
1350 }
1351 else
1352 {
1353 oss << ms3_error(res);
1354 }
1355 fLog->logMsg( oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR );
1356 return ERR_FILE_OPEN;
1357 }
1358 }
1359 else
1360 {
1361 if (fImportDataMode == IMPORT_DATA_TEXT)
1362 fHandle = fopen( fFileName.c_str(), "r" );
1363 else
1364 fHandle = fopen( fFileName.c_str(), "rb" );
1365
1366 if (fHandle == NULL)
1367 {
1368 int errnum = errno;
1369 ostringstream oss;
1370 oss << "Error opening import file " << fFileName << ". " <<
1371 strerror(errnum);
1372 fLog->logMsg( oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR );
1373
1374 // return an error; caller should set fStatusTI if needed
1375 return ERR_FILE_OPEN;
1376 }
1377
1378 // now the input load file is available for reading the data.
1379 // read the data from the load file into the buffers.
1380 fFileBuffer = new char[fFileBufSize];
1381 setvbuf(fHandle, fFileBuffer, _IOFBF, fFileBufSize);
1382
1383 ostringstream oss;
1384 oss << "Opening " << fFileName << " to import into table " << fTableName;
1385 fLog->logMsg( oss.str(), MSGLVL_INFO2 );
1386 }
1387
1388 return NO_ERROR;
1389 }
1390
1391 //------------------------------------------------------------------------------
1392 // Close the current open file we have been importing.
1393 //------------------------------------------------------------------------------
closeTableFile()1394 void TableInfo::closeTableFile()
1395 {
1396 if (fHandle)
1397 {
1398 // If reading from stdin, we don't delete the buffer out from under
1399 // the file handle, because stdin is still open. This will cause a
1400 // memory leak, but when using stdin, we can only read in 1 table.
1401 // So it's not like we will be leaking multiple buffers for several
1402 // tables over the life of the job.
1403 if (!fReadFromStdin)
1404 {
1405 fclose(fHandle);
1406 delete [] fFileBuffer;
1407 }
1408
1409 fHandle = 0;
1410 }
1411 else if (ms3)
1412 {
1413 ms3_free((uint8_t*)fFileBuffer);
1414 }
1415 }
1416
1417 //------------------------------------------------------------------------------
1418 // "Grabs" the current read buffer for TableInfo so that the read thread that
1419 // is calling this function, can read the next buffer's set of data.
1420 //------------------------------------------------------------------------------
1421 // @bug2099. Temporary hack to diagnose deadlock.
1422 // Added report parm and couts below.
isBufferAvailable(bool report)1423 bool TableInfo::isBufferAvailable(bool report)
1424 {
1425 boost::mutex::scoped_lock lock(fSyncUpdatesTI);
1426 Status bufferStatus = fBuffers[fCurrentReadBuffer].getStatusBLB();
1427
1428 if ( (bufferStatus == WriteEngine::PARSE_COMPLETE) ||
1429 (bufferStatus == WriteEngine::NEW) )
1430 {
1431 // reset buffer status and column locks while we have
1432 // an fSyncUpdatesTI lock
1433 fBuffers[fCurrentReadBuffer].setStatusBLB(
1434 WriteEngine::READ_PROGRESS);
1435 fBuffers[fCurrentReadBuffer].resetColumnLocks();
1436 return true;
1437 }
1438
1439 if (report)
1440 {
1441 ostringstream oss;
1442 string bufferStatusStr;
1443 ColumnInfo::convertStatusToString( bufferStatus, bufferStatusStr );
1444 oss << " Buffer status is " << bufferStatusStr << ". " << endl;
1445 oss << " fCurrentReadBuffer is " << fCurrentReadBuffer << endl;
1446 cout << oss.str();
1447 cout.flush();
1448 }
1449
1450 return false;
1451 }
1452
1453 //------------------------------------------------------------------------------
1454 // Report whether rows were rejected, and if so, then list them out into the
1455 // reject file.
1456 //------------------------------------------------------------------------------
writeBadRows(const std::vector<std::string> * errorDatRows,bool bCloseFile)1457 void TableInfo::writeBadRows( const std::vector<std::string>* errorDatRows,
1458 bool bCloseFile )
1459 {
1460 size_t errorDatRowsCount = 0;
1461
1462 if (errorDatRows)
1463 errorDatRowsCount = errorDatRows->size();
1464
1465 if (errorDatRowsCount > 0)
1466 {
1467 if (!fRejectDataFile.is_open())
1468 {
1469 ostringstream rejectFileName;
1470
1471 if (fErrorDir.size() > 0)
1472 {
1473 #ifdef _MSC_VER
1474 char filename[_MAX_FNAME];
1475 char ext[_MAX_EXT];
1476 _splitpath(const_cast<char*>(getFileName().c_str()),
1477 NULL, NULL, filename, ext);
1478 rejectFileName << fErrorDir << "\\" << filename << ext;
1479 #else
1480 rejectFileName << fErrorDir << basename(getFileName().c_str());
1481 #endif
1482 }
1483 else
1484 {
1485 if (fReadFromS3)
1486 {
1487 rejectFileName << basename(getFileName().c_str());
1488 }
1489 else
1490 {
1491 rejectFileName << getFileName();
1492 }
1493 }
1494
1495 rejectFileName << ".Job_" << fJobId <<
1496 '_' << ::getpid() << BAD_FILE_SUFFIX;
1497 fRejectDataFileName = rejectFileName.str();
1498 fRejectDataFile.open( rejectFileName.str().c_str(),
1499 ofstream::out );
1500
1501 if ( !fRejectDataFile )
1502 {
1503 ostringstream oss;
1504 oss << "Unable to create file: " << rejectFileName.str() <<
1505 "; Check permission.";
1506 fLog->logMsg(oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR);
1507
1508 return;
1509 }
1510 }
1511
1512 for (std::vector<string>::const_iterator iter = errorDatRows->begin();
1513 iter != errorDatRows->end(); ++iter)
1514 {
1515 fRejectDataFile << *iter;
1516 }
1517
1518 fRejectDataCnt += errorDatRowsCount;
1519 }
1520
1521 if (bCloseFile)
1522 {
1523 if (fRejectDataFile.is_open())
1524 fRejectDataFile.close();
1525
1526 fRejectDataFile.clear();
1527
1528 if (fRejectDataCnt > 0)
1529 {
1530 ostringstream oss;
1531 std::string rejectFileNameToLog;
1532
1533 // Construct/report complete file name and save in list of files
1534 boost::filesystem::path p(fRejectDataFileName);
1535
1536 if (!p.has_root_path())
1537 {
1538 // We could fail here having fixed size buffer
1539 char cwdPath[4096];
1540 char* buffPtr = &cwdPath[0];
1541 buffPtr = getcwd(cwdPath, sizeof(cwdPath));
1542 boost::filesystem::path rejectFileName2( buffPtr );
1543 rejectFileName2 /= fRejectDataFileName;
1544 fBadFiles.push_back( rejectFileName2.string() );
1545
1546 rejectFileNameToLog = rejectFileName2.string();
1547 }
1548 else
1549 {
1550 fBadFiles.push_back( fRejectDataFileName );
1551
1552 rejectFileNameToLog = fRejectDataFileName;
1553 }
1554
1555 oss << "Number of rows with bad data = " << fRejectDataCnt <<
1556 ". Exact rows are listed in file located here: " <<
1557 fErrorDir;
1558 fLog->logMsg(oss.str(), MSGLVL_INFO1);
1559
1560 fRejectDataCnt = 0;
1561 }
1562 }
1563 }
1564
1565 //------------------------------------------------------------------------------
1566 // Report whether rows were rejected, and if so, then list out the row numbers
1567 // and error reasons into the error file.
1568 //------------------------------------------------------------------------------
writeErrReason(const std::vector<std::pair<RID,string>> * errorRows,bool bCloseFile)1569 void TableInfo::writeErrReason( const std::vector< std::pair<RID,
1570 string> >* errorRows,
1571 bool bCloseFile )
1572 {
1573 size_t errorRowsCount = 0;
1574
1575 if (errorRows)
1576 errorRowsCount = errorRows->size();
1577
1578 if (errorRowsCount > 0)
1579 {
1580 if (!fRejectErrFile.is_open())
1581 {
1582 ostringstream errFileName;
1583
1584 if (fErrorDir.size() > 0)
1585 {
1586 #ifdef _MSC_VER
1587 char filename[_MAX_FNAME];
1588 char ext[_MAX_EXT];
1589 _splitpath(const_cast<char*>(getFileName().c_str()),
1590 NULL, NULL, filename, ext);
1591 errFileName << fErrorDir << "\\" << filename << ext;
1592 #else
1593 errFileName << fErrorDir << basename(getFileName().c_str());
1594 #endif
1595 }
1596 else
1597 {
1598 if (fReadFromS3)
1599 {
1600 errFileName << basename(getFileName().c_str());
1601 }
1602 else
1603 {
1604 errFileName << getFileName();
1605 }
1606 }
1607
1608 errFileName << ".Job_" << fJobId <<
1609 '_' << ::getpid() << ERR_FILE_SUFFIX;
1610 fRejectErrFileName = errFileName.str();
1611 fRejectErrFile.open( errFileName.str().c_str(),
1612 ofstream::out );
1613
1614 if ( !fRejectErrFile )
1615 {
1616 ostringstream oss;
1617 oss << "Unable to create file: " << errFileName.str() <<
1618 "; Check permission.";
1619 fLog->logMsg(oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR);
1620
1621 return;
1622 }
1623 }
1624
1625 for (std::vector< std::pair<RID, std::string> >::const_iterator iter =
1626 errorRows->begin();
1627 iter != errorRows->end(); ++iter)
1628 {
1629 fRejectErrFile << "Line number " << iter->first <<
1630 "; Error: " << iter->second << endl;
1631 }
1632
1633 fRejectErrCnt += errorRowsCount;
1634 }
1635
1636 if (bCloseFile)
1637 {
1638 if (fRejectErrFile.is_open())
1639 fRejectErrFile.close();
1640
1641 fRejectErrFile.clear();
1642
1643 if (fRejectErrCnt > 0)
1644 {
1645 ostringstream oss;
1646 std::string errFileNameToLog;
1647
1648 // Construct/report complete file name and save in list of files
1649 boost::filesystem::path p(fRejectErrFileName);
1650
1651 if (!p.has_root_path())
1652 {
1653 char cwdPath[4096];
1654 char* buffPtr = &cwdPath[0];
1655 buffPtr = getcwd(cwdPath, sizeof(cwdPath));
1656 boost::filesystem::path errFileName2( buffPtr );
1657 errFileName2 /= fRejectErrFileName;
1658 fErrFiles.push_back( errFileName2.string() );
1659
1660 errFileNameToLog = errFileName2.string();
1661 }
1662 else
1663 {
1664 fErrFiles.push_back( fRejectErrFileName );
1665
1666 errFileNameToLog = fRejectErrFileName;
1667 }
1668
1669 oss << "Number of rows with errors = " << fRejectDataCnt <<
1670 ". Exact rows are listed in file located here: " <<
1671 fErrorDir;
1672 fLog->logMsg(oss.str(), MSGLVL_INFO1);
1673
1674 fRejectErrCnt = 0;
1675 }
1676 }
1677 }
1678
1679 //------------------------------------------------------------------------------
// Logs a "Bulkload |Job" message, along with the specified message text
// (messageText), as an info message on the LOG_LOCAL1 facility; this is
// the data_mods (transaction) log entry requested by Bug1375.
1682 //------------------------------------------------------------------------------
logToDataMods(const string & jobFile,const string & messageText)1683 void TableInfo::logToDataMods(const string& jobFile, const string& messageText)
1684 {
1685 logging::Message::Args args;
1686
1687 unsigned subsystemId = 19; // writeengine
1688
1689 logging::LoggingID loggingId(subsystemId, 0, fTxnID.id, 0);
1690 logging::MessageLog messageLog(loggingId, LOG_LOCAL1);
1691
1692 logging::Message m(8);
1693 args.add("Bulkload |Job: " + jobFile);
1694 args.add("|" + messageText);
1695 m.format(args);
1696 messageLog.logInfoMessage(m);
1697 }
1698
1699 //------------------------------------------------------------------------------
1700 // Acquires DB table lock for this TableInfo object.
1701 // Function employs retry logic based on the SystemConfig/WaitPeriod.
1702 //------------------------------------------------------------------------------
acquireTableLock(bool disableTimeOut)1703 int TableInfo::acquireTableLock( bool disableTimeOut )
1704 {
1705 // Save DBRoot list at start of job; used to compare at EOJ.
1706 Config::getRootIdList( fOrigDbRootIds );
1707
1708 // If executing distributed (mode1) or central command (mode2) then
1709 // don't worry about table locks. The client front-end will manage locks.
1710 if ((fBulkMode == BULK_MODE_REMOTE_SINGLE_SRC) ||
1711 (fBulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC))
1712 {
1713 if (fLog->isDebug( DEBUG_1 ))
1714 {
1715 ostringstream oss;
1716 oss << "Bypass acquiring table lock in distributed mode, "
1717 "for table" << fTableName << "; OID-" << fTableOID;
1718 fLog->logMsg( oss.str(), MSGLVL_INFO2 );
1719 }
1720
1721 return NO_ERROR;
1722 }
1723
1724 const int SLEEP_INTERVAL = 100; // sleep 100 milliseconds between checks
1725 const int NUM_TRIES_PER_SEC = 10; // try 10 times per second
1726
1727 int waitSeconds = Config::getWaitPeriod();
1728 const int NUM_TRIES = NUM_TRIES_PER_SEC * waitSeconds;
1729 std::string tblLockErrMsg;
1730
1731 // Retry loop to lock the db table associated with this TableInfo object
1732 std::string processName;
1733 uint32_t processId;
1734 int32_t sessionId;
1735 int32_t transId;
1736 ostringstream pmModOss;
1737 pmModOss << " (pm" << Config::getLocalModuleID() << ')';
1738 bool timeout = false;
1739 //for (int i=0; i<NUM_TRIES; i++)
1740 int try_count = 0;
1741
1742 while (!timeout)
1743 {
1744 processName = fProcessName;
1745 processName += pmModOss.str();
1746 processId = ::getpid();
1747 sessionId = -1;
1748 transId = -1;
1749 int rc = BRMWrapper::getInstance()->getTableLock (
1750 fTableOID,
1751 processName,
1752 processId,
1753 sessionId,
1754 transId,
1755 fTableLockID,
1756 tblLockErrMsg);
1757
1758 if ((rc == NO_ERROR) && (fTableLockID > 0))
1759 {
1760 fTableLocked = true;
1761
1762 if (fLog->isDebug( DEBUG_1 ))
1763 {
1764 ostringstream oss;
1765 oss << "Table lock acquired for table " << fTableName <<
1766 "; OID-" << fTableOID << "; lockID-" << fTableLockID;
1767 fLog->logMsg( oss.str(), MSGLVL_INFO2 );
1768 }
1769
1770 return NO_ERROR;
1771 }
1772 else if (fTableLockID == 0)
1773 {
1774 // sleep and then go back and try getting table lock again
1775 sleepMS(SLEEP_INTERVAL);
1776
1777 if (fLog->isDebug( DEBUG_1 ))
1778 {
1779 ostringstream oss;
1780 oss << "Retrying to acquire table lock for table " <<
1781 fTableName << "; OID-" << fTableOID;
1782 fLog->logMsg( oss.str(), MSGLVL_INFO2 );
1783 }
1784 }
1785 else
1786 {
1787 ostringstream oss;
1788 oss << "Error in acquiring table lock for table " << fTableName <<
1789 "; OID-" << fTableOID << "; " << tblLockErrMsg;
1790 fLog->logMsg( oss.str(), rc, MSGLVL_ERROR );
1791
1792 return rc;
1793 }
1794
1795 // if disableTimeOut is set then no timeout for table lock. Forever wait....
1796 timeout = (disableTimeOut ? false : (++try_count >= NUM_TRIES));
1797 }
1798
1799 ostringstream oss;
1800 oss << "Unable to acquire lock for table " << fTableName <<
1801 "; OID-" << fTableOID << "; table currently locked by process-" <<
1802 processName << "; pid-" << processId <<
1803 "; session-" << sessionId <<
1804 "; txn-" << transId;
1805 fLog->logMsg( oss.str(), ERR_TBLLOCK_GET_LOCK_LOCKED, MSGLVL_ERROR );
1806
1807 return ERR_TBLLOCK_GET_LOCK_LOCKED;
1808 }
1809
1810 //------------------------------------------------------------------------------
1811 // Change table lock state (to cleanup)
1812 //------------------------------------------------------------------------------
changeTableLockState()1813 int TableInfo::changeTableLockState( )
1814 {
1815 // If executing distributed (mode1) or central command (mode2) then
1816 // don't worry about table locks. The client front-end will manage locks.
1817 if ((fBulkMode == BULK_MODE_REMOTE_SINGLE_SRC) ||
1818 (fBulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC))
1819 {
1820 return NO_ERROR;
1821 }
1822
1823 std::string tblLockErrMsg;
1824 bool bChanged = false;
1825
1826 int rc = BRMWrapper::getInstance()->changeTableLockState (
1827 fTableLockID,
1828 BRM::CLEANUP,
1829 bChanged,
1830 tblLockErrMsg );
1831
1832 if (rc == NO_ERROR)
1833 {
1834 if (fLog->isDebug( DEBUG_1 ))
1835 {
1836 ostringstream oss;
1837
1838 if (bChanged)
1839 {
1840 oss << "Table lock state changed to CLEANUP for table " <<
1841 fTableName <<
1842 "; OID-" << fTableOID <<
1843 "; lockID-" << fTableLockID;
1844 }
1845 else
1846 {
1847 oss << "Table lock state not changed to CLEANUP for table " <<
1848 fTableName <<
1849 "; OID-" << fTableOID <<
1850 "; lockID-" << fTableLockID <<
1851 ". Table lot locked.";
1852 }
1853
1854 fLog->logMsg( oss.str(), MSGLVL_INFO2 );
1855 }
1856 }
1857 else
1858 {
1859 ostringstream oss;
1860 oss << "Error in changing table state for table " << fTableName <<
1861 "; OID-" << fTableOID <<
1862 "; lockID-" << fTableLockID << "; " << tblLockErrMsg;
1863 fLog->logMsg( oss.str(), rc, MSGLVL_ERROR );
1864 return rc;
1865 }
1866
1867 return NO_ERROR;
1868 }
1869
1870 //------------------------------------------------------------------------------
1871 // Releases DB table lock assigned to this TableInfo object.
1872 //------------------------------------------------------------------------------
releaseTableLock()1873 int TableInfo::releaseTableLock( )
1874 {
1875 // If executing distributed (mode1) or central command (mode2) then
1876 // don't worry about table locks. The client front-end will manage locks.
1877 if ((fBulkMode == BULK_MODE_REMOTE_SINGLE_SRC) ||
1878 (fBulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC))
1879 {
1880 if (fLog->isDebug( DEBUG_1 ))
1881 {
1882 ostringstream oss;
1883 oss << "Bypass releasing table lock in distributed mode, "
1884 "for table " << fTableName << "; OID-" << fTableOID;
1885 fLog->logMsg( oss.str(), MSGLVL_INFO2 );
1886 }
1887
1888 return NO_ERROR;
1889 }
1890
1891 std::string tblLockErrMsg;
1892 bool bReleased = false;
1893
1894 // Unlock the database table
1895 int rc = BRMWrapper::getInstance()->releaseTableLock (
1896 fTableLockID,
1897 bReleased,
1898 tblLockErrMsg );
1899
1900 if (rc == NO_ERROR)
1901 {
1902 fTableLocked = false;
1903
1904 if (fLog->isDebug( DEBUG_1 ))
1905 {
1906 ostringstream oss;
1907
1908 if (bReleased)
1909 {
1910 oss << "Table lock released for table " << fTableName <<
1911 "; OID-" << fTableOID <<
1912 "; lockID-" << fTableLockID;
1913 }
1914 else
1915 {
1916 oss << "Table lock not released for table " << fTableName <<
1917 "; OID-" << fTableOID <<
1918 "; lockID-" << fTableLockID <<
1919 ". Table not locked.";
1920 }
1921
1922 fLog->logMsg( oss.str(), MSGLVL_INFO2 );
1923 }
1924 }
1925 else
1926 {
1927 ostringstream oss;
1928 oss << "Error in releasing table lock for table " << fTableName <<
1929 "; OID-" << fTableOID <<
1930 "; lockID-" << fTableLockID << "; " << tblLockErrMsg;
1931 fLog->logMsg( oss.str(), rc, MSGLVL_ERROR );
1932 return rc;
1933 }
1934
1935 return NO_ERROR;
1936 }
1937
1938 //------------------------------------------------------------------------------
1939 // Delete bulk rollback metadata file.
1940 //------------------------------------------------------------------------------
deleteMetaDataRollbackFile()1941 void TableInfo::deleteMetaDataRollbackFile( )
1942 {
1943 // If executing distributed (mode1) or central command (mode2) then
1944 // don't worry about table locks, or deleting meta data files. The
1945 // client front-end will manage these tasks after all imports are finished.
1946 if ((fBulkMode == BULK_MODE_REMOTE_SINGLE_SRC) ||
1947 (fBulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC))
1948 {
1949 return;
1950 }
1951
1952 if (!fKeepRbMetaFile)
1953 {
1954 // Treat any error as non-fatal, though we log it.
1955 try
1956 {
1957 fRBMetaWriter.deleteFile();
1958 }
1959 catch (WeException& ex)
1960 {
1961 ostringstream oss;
1962 oss << "Error deleting meta file; " << ex.what();
1963 fLog->logMsg(oss.str(), ex.errorCode(), MSGLVL_ERROR);
1964 }
1965 }
1966 }
1967
1968 //------------------------------------------------------------------------------
1969 // Changes to "existing" DB files must be confirmed on HDFS system.
1970 // This function triggers this action.
1971 //------------------------------------------------------------------------------
1972 // @bug 5572 - Add db file confirmation for HDFS
confirmDBFileChanges()1973 int TableInfo::confirmDBFileChanges( )
1974 {
1975 // Unlike deleteTempDBFileChanges(), note that confirmDBFileChanges()
1976 // executes regardless of the import mode. We go ahead and confirm
1977 // the file changes at the end of a successful cpimport.bin.
1978 if (idbdatafile::IDBPolicy::useHdfs())
1979 {
1980 ostringstream oss;
1981 oss << "Confirming DB file changes for " << fTableName;
1982 fLog->logMsg( oss.str(), MSGLVL_INFO2 );
1983
1984 std::string errMsg;
1985 ConfirmHdfsDbFile confirmHdfs;
1986 int rc = confirmHdfs.confirmDbFileListFromMetaFile( fTableOID, errMsg );
1987
1988 if (rc != NO_ERROR)
1989 {
1990 ostringstream ossErrMsg;
1991 ossErrMsg << "Unable to confirm changes to table " << fTableName <<
1992 "; " << errMsg;
1993 fLog->logMsg( ossErrMsg.str(), rc, MSGLVL_ERROR );
1994
1995 return rc;
1996 }
1997 }
1998
1999 return NO_ERROR;
2000 }
2001
2002 //------------------------------------------------------------------------------
2003 // Temporary swap files must be deleted on HDFS system.
2004 // This function triggers this action.
2005 //------------------------------------------------------------------------------
2006 // @bug 5572 - Add db file confirmation for HDFS
deleteTempDBFileChanges()2007 void TableInfo::deleteTempDBFileChanges( )
2008 {
2009 // If executing distributed (mode1) or central command (mode2) then
2010 // no action necessary. The client front-end will initiate the deletion
2011 // of the temp files, only after all the distributed imports have
2012 // successfully completed.
2013 if ((fBulkMode == BULK_MODE_REMOTE_SINGLE_SRC) ||
2014 (fBulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC))
2015 {
2016 return;
2017 }
2018
2019 if (idbdatafile::IDBPolicy::useHdfs())
2020 {
2021 ostringstream oss;
2022 oss << "Deleting DB temp swap files for " << fTableName;
2023 fLog->logMsg( oss.str(), MSGLVL_INFO2 );
2024
2025 std::string errMsg;
2026 ConfirmHdfsDbFile confirmHdfs;
2027 int rc = confirmHdfs.endDbFileListFromMetaFile(fTableOID, true, errMsg);
2028
2029 // Treat any error as non-fatal, though we log it.
2030 if (rc != NO_ERROR)
2031 {
2032 ostringstream ossErrMsg;
2033 ossErrMsg << "Unable to delete temp swap files for table " <<
2034 fTableName << "; " << errMsg;
2035 fLog->logMsg( ossErrMsg.str(), rc, MSGLVL_ERROR );
2036 }
2037 }
2038 }
2039
2040 //------------------------------------------------------------------------------
2041 // Validates the correctness of the current HWMs for this table.
2042 // The HWMs for all the 1 byte columns should be identical. Same goes
2043 // for all the 2 byte columns, etc. The 2 byte column HWMs should be
2044 // "roughly" (but not necessarily exactly) twice that of a 1 byte column.
2045 // Same goes for the 4 byte column HWMs vs their 2 byte counterparts, etc.
2046 // jobTable - table/column information to use with validation.
2047 // We use jobTable.colList[] (if provided) instead of data memmber
2048 // fColumns, because this function is called during preprocessing,
2049 // before TableInfo.fColumns has been initialized with data from
2050 // colList.
2051 // segFileInfo - Vector of File objects carrying current DBRoot, partition,
2052 // HWM, etc to be validated for the columns belonging to jobTable.
2053 // stage - Current stage we are validating. "Starting" or "Ending".
2054 //------------------------------------------------------------------------------
validateColumnHWMs(const JobTable * jobTable,const std::vector<DBRootExtentInfo> & segFileInfo,const char * stage)2055 int TableInfo::validateColumnHWMs(
2056 const JobTable* jobTable,
2057 const std::vector<DBRootExtentInfo>& segFileInfo,
2058 const char* stage )
2059 {
2060 int rc = NO_ERROR;
2061
2062 // Used to track first 1-byte, 2-byte, 4-byte, and 8-byte columns in table
2063 int byte1First = -1;
2064 int byte2First = -1;
2065 int byte4First = -1;
2066 int byte8First = -1;
2067
2068 // Make sure the HWMs for all 1-byte columns match; same for all 2-byte,
2069 // 4-byte, and 8-byte columns as well.
2070 for (unsigned k = 0; k < segFileInfo.size(); k++)
2071 {
2072 int k1 = 0;
2073
2074 // Validate HWMs in jobTable if we have it, else use fColumns.
2075 const JobColumn& jobColK =
2076 ( (jobTable != 0) ? jobTable->colList[k] : fColumns[k].column );
2077
2078 // Find the first 1-byte, 2-byte, 4-byte, and 8-byte columns.
2079 // Use those as our reference HWM for the respective column widths.
2080 switch ( jobColK.width )
2081 {
2082 case 1:
2083 {
2084 if (byte1First == -1)
2085 byte1First = k;
2086
2087 k1 = byte1First;
2088 break;
2089 }
2090
2091 case 2:
2092 {
2093 if (byte2First == -1)
2094 byte2First = k;
2095
2096 k1 = byte2First;
2097 break;
2098 }
2099
2100 case 4:
2101 {
2102 if (byte4First == -1)
2103 byte4First = k;
2104
2105 k1 = byte4First;
2106 break;
2107 }
2108
2109 case 8:
2110 default:
2111 {
2112 if (byte8First == -1)
2113 byte8First = k;
2114
2115 k1 = byte8First;
2116 break;
2117 }
2118 } // end of switch based on column width (1,2,4, or 8)
2119
2120 // Validate HWMs in jobTable if we have it, else use fColumns.
2121 const JobColumn& jobColK1 =
2122 ( (jobTable != 0) ? jobTable->colList[k1] : fColumns[k1].column );
2123
2124 //std::cout << "dbg: comparing0 " << stage << " refcol-" << k1 <<
2125 // "; wid-" << jobColK1.width << "; hwm-" << segFileInfo[k1].fLocalHwm <<
2126 // " <to> col-" << k <<
2127 // "; wid-" << jobColK.width << " ; hwm-"<<segFileInfo[k].fLocalHwm<<std::endl;
2128
2129 // Validate that the HWM for this column (k) matches that of the
2130 // corresponding reference column with the same width.
2131 if ((segFileInfo[k1].fDbRoot != segFileInfo[k].fDbRoot) ||
2132 (segFileInfo[k1].fPartition != segFileInfo[k].fPartition) ||
2133 (segFileInfo[k1].fSegment != segFileInfo[k].fSegment) ||
2134 (segFileInfo[k1].fLocalHwm != segFileInfo[k].fLocalHwm))
2135 {
2136 ostringstream oss;
2137 oss << stage << " HWMs do not match for"
2138 " OID1-" << jobColK1.mapOid <<
2139 "; column-" << jobColK1.colName <<
2140 "; DBRoot-" << segFileInfo[k1].fDbRoot <<
2141 "; partition-" << segFileInfo[k1].fPartition <<
2142 "; segment-" << segFileInfo[k1].fSegment <<
2143 "; hwm-" << segFileInfo[k1].fLocalHwm <<
2144 "; width-" << jobColK1.width << ':' << std::endl <<
2145 " and OID2-" << jobColK.mapOid <<
2146 "; column-" << jobColK.colName <<
2147 "; DBRoot-" << segFileInfo[k].fDbRoot <<
2148 "; partition-" << segFileInfo[k].fPartition <<
2149 "; segment-" << segFileInfo[k].fSegment <<
2150 "; hwm-" << segFileInfo[k].fLocalHwm <<
2151 "; width-" << jobColK.width;
2152 fLog->logMsg( oss.str(), ERR_BRM_HWMS_NOT_EQUAL, MSGLVL_ERROR );
2153 return ERR_BRM_HWMS_NOT_EQUAL;
2154 }
2155
2156 // HWM DBRoot, partition, and segment number should match for all
2157 // columns; so compare DBRoot, part#, and seg# with first column.
2158 if ((segFileInfo[0].fDbRoot != segFileInfo[k].fDbRoot) ||
2159 (segFileInfo[0].fPartition != segFileInfo[k].fPartition) ||
2160 (segFileInfo[0].fSegment != segFileInfo[k].fSegment))
2161 {
2162 const JobColumn& jobCol0 =
2163 ( (jobTable != 0) ? jobTable->colList[0] : fColumns[0].column );
2164
2165 ostringstream oss;
2166 oss << stage << " HWM DBRoot,Part#, or Seg# do not match for"
2167 " OID1-" << jobCol0.mapOid <<
2168 "; column-" << jobCol0.colName <<
2169 "; DBRoot-" << segFileInfo[0].fDbRoot <<
2170 "; partition-" << segFileInfo[0].fPartition <<
2171 "; segment-" << segFileInfo[0].fSegment <<
2172 "; hwm-" << segFileInfo[0].fLocalHwm <<
2173 "; width-" << jobCol0.width << ':' << std::endl <<
2174 " and OID2-" << jobColK.mapOid <<
2175 "; column-" << jobColK.colName <<
2176 "; DBRoot-" << segFileInfo[k].fDbRoot <<
2177 "; partition-" << segFileInfo[k].fPartition <<
2178 "; segment-" << segFileInfo[k].fSegment <<
2179 "; hwm-" << segFileInfo[k].fLocalHwm <<
2180 "; width-" << jobColK.width;
2181 fLog->logMsg( oss.str(), ERR_BRM_HWMS_NOT_EQUAL, MSGLVL_ERROR );
2182 return ERR_BRM_HWMS_NOT_EQUAL;
2183 }
2184 } // end of loop to compare all 1-byte HWMs, 2-byte HWMs, etc.
2185
2186 // Validate/compare HWM for 1-byte column in relation to 2-byte column, etc.
2187 // Without knowing the exact row count, we can't extrapolate the exact HWM
2188 // for the wider column, but we can narrow it down to an expected range.
2189 int refCol = 0;
2190 int colIdx = 0;
2191
2192 //if (byte1First >= 0)
2193 // std::cout << "dbg: cross compare1 " << stage << " col-" << byte1First <<
2194 // "; wid-" << ( (jobTable != 0) ? jobTable->colList[byte1First].width :
2195 // fColumns[byte1First].column.width ) <<
2196 // "; hwm-" << segFileInfo[byte1First].fLocalHwm << std::endl;
2197
2198 //if (byte2First >= 0)
2199 // std::cout << "dbg: cross compare2 " << stage << " col-" << byte2First <<
2200 // "; wid-" << ( (jobTable != 0) ? jobTable->colList[byte2First].width :
2201 // fColumns[byte2First].column.width ) <<
2202 // "; hwm-" << segFileInfo[byte2First].fLocalHwm << std::endl;
2203
2204 //if (byte4First >= 0)
2205 // std::cout << "dbg: cross compare4 " << stage << " col-" << byte4First <<
2206 // "; wid-" << ( (jobTable != 0) ? jobTable->colList[byte4First].width :
2207 // fColumns[byte4First].column.width ) <<
2208 // "; hwm-" << segFileInfo[byte4First].fLocalHwm << std::endl;
2209
2210 //if (byte8First >= 0)
2211 // std::cout << "dbg: cross compare8 " << stage << " col-" << byte8First <<
2212 // "; wid-" << ( (jobTable != 0) ? jobTable->colList[byte8First].width :
2213 // fColumns[byte8First].column.width ) <<
2214 // "; hwm-" << segFileInfo[byte8First].fLocalHwm << std::endl;
2215
2216 // Validate/compare HWMs given a 1-byte column as a starting point
2217 if (byte1First >= 0)
2218 {
2219 refCol = byte1First;
2220
2221 if (byte2First >= 0)
2222 {
2223 HWM hwmLo = segFileInfo[byte1First].fLocalHwm * 2;
2224 HWM hwmHi = hwmLo + 1;
2225
2226 if ((segFileInfo[byte2First].fLocalHwm < hwmLo) ||
2227 (segFileInfo[byte2First].fLocalHwm > hwmHi))
2228 {
2229 colIdx = byte2First;
2230 rc = ERR_BRM_HWMS_OUT_OF_SYNC;
2231 goto errorCheck;
2232 }
2233 }
2234
2235 if (byte4First >= 0)
2236 {
2237 HWM hwmLo = segFileInfo[byte1First].fLocalHwm * 4;
2238 HWM hwmHi = hwmLo + 3;
2239
2240 if ((segFileInfo[byte4First].fLocalHwm < hwmLo) ||
2241 (segFileInfo[byte4First].fLocalHwm > hwmHi))
2242 {
2243 colIdx = byte4First;
2244 rc = ERR_BRM_HWMS_OUT_OF_SYNC;
2245 goto errorCheck;
2246 }
2247 }
2248
2249 if (byte8First >= 0)
2250 {
2251 HWM hwmLo = segFileInfo[byte1First].fLocalHwm * 8;
2252 HWM hwmHi = hwmLo + 7;
2253
2254 if ((segFileInfo[byte8First].fLocalHwm < hwmLo) ||
2255 (segFileInfo[byte8First].fLocalHwm > hwmHi))
2256 {
2257 colIdx = byte8First;
2258 rc = ERR_BRM_HWMS_OUT_OF_SYNC;
2259 goto errorCheck;
2260 }
2261 }
2262 }
2263
2264 // Validate/compare HWMs given a 2-byte column as a starting point
2265 if (byte2First >= 0)
2266 {
2267 refCol = byte2First;
2268
2269 if (byte4First >= 0)
2270 {
2271 HWM hwmLo = segFileInfo[byte2First].fLocalHwm * 2;
2272 HWM hwmHi = hwmLo + 1;
2273
2274 if ((segFileInfo[byte4First].fLocalHwm < hwmLo) ||
2275 (segFileInfo[byte4First].fLocalHwm > hwmHi))
2276 {
2277 colIdx = byte4First;
2278 rc = ERR_BRM_HWMS_OUT_OF_SYNC;
2279 goto errorCheck;
2280 }
2281 }
2282
2283 if (byte8First >= 0)
2284 {
2285 HWM hwmLo = segFileInfo[byte2First].fLocalHwm * 4;
2286 HWM hwmHi = hwmLo + 3;
2287
2288 if ((segFileInfo[byte8First].fLocalHwm < hwmLo) ||
2289 (segFileInfo[byte8First].fLocalHwm > hwmHi))
2290 {
2291 colIdx = byte8First;
2292 rc = ERR_BRM_HWMS_OUT_OF_SYNC;
2293 goto errorCheck;
2294 }
2295 }
2296 }
2297
2298 // Validate/compare HWMs given a 4-byte column as a starting point
2299 if (byte4First >= 0)
2300 {
2301 refCol = byte4First;
2302
2303 if (byte8First >= 0)
2304 {
2305 HWM hwmLo = segFileInfo[byte4First].fLocalHwm * 2;
2306 HWM hwmHi = hwmLo + 1;
2307
2308 if ((segFileInfo[byte8First].fLocalHwm < hwmLo) ||
2309 (segFileInfo[byte8First].fLocalHwm > hwmHi))
2310 {
2311 colIdx = byte8First;
2312 rc = ERR_BRM_HWMS_OUT_OF_SYNC;
2313 goto errorCheck;
2314 }
2315 }
2316 }
2317
2318 // To avoid repeating this message 6 times in the preceding source code, we
2319 // use the "dreaded" goto to branch to this single place for error handling.
2320 errorCheck:
2321
2322 if (rc != NO_ERROR)
2323 {
2324 const JobColumn& jobColRef = ( (jobTable != 0) ?
2325 jobTable->colList[refCol] : fColumns[refCol].column );
2326 const JobColumn& jobColIdx = ( (jobTable != 0) ?
2327 jobTable->colList[colIdx] : fColumns[colIdx].column );
2328
2329 ostringstream oss;
2330 oss << stage << " HWMs are not in sync for"
2331 " OID1-" << jobColRef.mapOid <<
2332 "; column-" << jobColRef.colName <<
2333 "; DBRoot-" << segFileInfo[refCol].fDbRoot <<
2334 "; partition-" << segFileInfo[refCol].fPartition <<
2335 "; segment-" << segFileInfo[refCol].fSegment <<
2336 "; hwm-" << segFileInfo[refCol].fLocalHwm <<
2337 "; width-" << jobColRef.width << ':' << std::endl <<
2338 " and OID2-" << jobColIdx.mapOid <<
2339 "; column-" << jobColIdx.colName <<
2340 "; DBRoot-" << segFileInfo[colIdx].fDbRoot <<
2341 "; partition-" << segFileInfo[colIdx].fPartition <<
2342 "; segment-" << segFileInfo[colIdx].fSegment <<
2343 "; hwm-" << segFileInfo[colIdx].fLocalHwm <<
2344 "; width-" << jobColIdx.width;
2345 fLog->logMsg( oss.str(), rc, MSGLVL_ERROR );
2346 }
2347
2348 return rc;
2349 }
2350
2351 //------------------------------------------------------------------------------
2352 // DESCRIPTION:
2353 // Initialize the bulk rollback metadata writer for this table.
2354 // RETURN:
2355 // NO_ERROR if success
2356 // other if fail
2357 //------------------------------------------------------------------------------
initBulkRollbackMetaData()2358 int TableInfo::initBulkRollbackMetaData( )
2359 {
2360 int rc = NO_ERROR;
2361
2362 try
2363 {
2364 fRBMetaWriter.init( fTableOID, fTableName );
2365 }
2366 catch (WeException& ex)
2367 {
2368 fLog->logMsg(ex.what(), ex.errorCode(), MSGLVL_ERROR);
2369 rc = ex.errorCode();
2370 }
2371
2372 return rc;
2373 }
2374
2375 //------------------------------------------------------------------------------
2376 // DESCRIPTION:
2377 // Saves snapshot of extentmap into a bulk rollback meta data file, for
2378 // use in a bulk rollback, if the current cpimport.bin job should fail.
2379 // The source code in RBMetaWriter::saveBulkRollbackMetaData() used to
2380 // reside in this TableInfo function. But much of the source code was
2381 // factored out to create RBMetaWriter::saveBulkRollbackMetaData(), so
2382 // that the function would reside in the shared library for reuse by DML.
2383 // PARAMETERS:
2384 // job - current job
2385 // segFileInfo - Vector of File objects carrying starting DBRoot, partition,
2386 // etc, for each column belonging to tableNo.
2387 // dbRootHWMInfoVecCol - vector of last local HWM info for each DBRoot
2388 // (asssigned to current PM) for each column in "this" table.
2389 // RETURN:
2390 // NO_ERROR if success
2391 // other if fail
2392 //------------------------------------------------------------------------------
saveBulkRollbackMetaData(Job & job,const std::vector<DBRootExtentInfo> & segFileInfo,const std::vector<BRM::EmDbRootHWMInfo_v> & dbRootHWMInfoVecCol)2393 int TableInfo::saveBulkRollbackMetaData( Job& job,
2394 const std::vector<DBRootExtentInfo>& segFileInfo,
2395 const std::vector<BRM::EmDbRootHWMInfo_v>& dbRootHWMInfoVecCol )
2396 {
2397 int rc = NO_ERROR;
2398
2399 std::vector<Column> cols;
2400 std::vector<OID> dctnryOids;
2401
2402 // Loop through the columns in the specified table
2403 for ( size_t i = 0; i < job.jobTableList[fTableId].colList.size(); i++ )
2404 {
2405 JobColumn& jobCol = job.jobTableList[fTableId].colList[i];
2406
2407 Column col;
2408 col.colNo = i;
2409 col.colWidth = jobCol.width;
2410 col.colType = jobCol.weType;
2411 col.colDataType = jobCol.dataType;
2412 col.dataFile.oid = jobCol.mapOid;
2413 col.dataFile.fid = jobCol.mapOid;
2414 col.dataFile.hwm = segFileInfo[i].fLocalHwm; // starting HWM
2415 col.dataFile.pFile = 0;
2416 col.dataFile.fPartition = segFileInfo[i].fPartition; // starting Part#
2417 col.dataFile.fSegment = segFileInfo[i].fSegment; // starting seg#
2418 col.dataFile.fDbRoot = segFileInfo[i].fDbRoot; // starting DBRoot
2419 col.compressionType = jobCol.compressionType;
2420 cols.push_back( col );
2421
2422 OID dctnryOid = 0;
2423
2424 if (jobCol.colType == COL_TYPE_DICT)
2425 dctnryOid = jobCol.dctnry.dctnryOid;
2426
2427 dctnryOids.push_back( dctnryOid );
2428
2429 } // end of loop through columns
2430
2431 fRBMetaWriter.setUIDGID(this);
2432
2433 try
2434 {
2435 fRBMetaWriter.saveBulkRollbackMetaData(
2436 cols,
2437 dctnryOids,
2438 dbRootHWMInfoVecCol );
2439 }
2440 catch (WeException& ex)
2441 {
2442 fLog->logMsg(ex.what(), ex.errorCode(), MSGLVL_ERROR);
2443 rc = ex.errorCode();
2444 }
2445
2446 return rc;
2447 }
2448
2449 //------------------------------------------------------------------------------
2450 // Synchronize system catalog auto-increment next value with BRM.
2451 // This function is called at the end of normal processing to get the system
2452 // catalog back in line with the latest auto increment next value generated by
2453 // BRM.
2454 //------------------------------------------------------------------------------
synchronizeAutoInc()2455 int TableInfo::synchronizeAutoInc( )
2456 {
2457 for (unsigned i = 0; i < fColumns.size(); ++i)
2458 {
2459 if (fColumns[i].column.autoIncFlag)
2460 {
2461 // TBD: Do we rollback flush cache error for autoinc.
2462 // Not sure we should bail out and rollback on a
2463 // ERR_BLKCACHE_FLUSH_LIST error, but we currently
2464 // rollback for "any" updateNextValue() error
2465 int rc = fColumns[i].finishAutoInc( );
2466
2467 if (rc != NO_ERROR)
2468 {
2469 return rc;
2470 }
2471
2472 break; // okay to break; only 1 autoinc column per table
2473 }
2474 }
2475
2476 return NO_ERROR;
2477 }
2478
2479 //------------------------------------------------------------------------------
2480 // Rollback changes made to "this" table by the current import job, delete the
2481 // meta-data files, and release the table lock. This function only applies to
2482 // mode3 import. Table lock and bulk rollbacks are managed by parent cpimport
2483 // (file splitter) process for mode1 and mode2.
2484 //------------------------------------------------------------------------------
rollbackWork()2485 int TableInfo::rollbackWork( )
2486 {
2487 // Close any column or store files left open by abnormal termination.
2488 // We want to do this before reopening the files and doing a bulk rollback.
2489 closeOpenDbFiles();
2490
2491 // Abort "local" bulk rollback if a DBRoot from the start of the job, is
2492 // now missing. User should run cleartablelock to execute a rollback on
2493 // this PM "and" the PM where the DBRoot was moved to.
2494 std::vector<uint16_t> dbRootIds;
2495 Config::getRootIdList( dbRootIds );
2496
2497 for (unsigned int j = 0; j < fOrigDbRootIds.size(); j++)
2498 {
2499 bool bFound = false;
2500
2501 for (unsigned int k = 0; k < dbRootIds.size(); k++)
2502 {
2503 if (fOrigDbRootIds[j] == dbRootIds[k])
2504 {
2505 bFound = true;
2506 break;
2507 }
2508 }
2509
2510 if (!bFound)
2511 {
2512 ostringstream oss;
2513 oss << "Mode3 bulk rollback not performed for table " <<
2514 fTableName << "; DBRoot" << fOrigDbRootIds[j] <<
2515 " moved from this PM during bulk load. " <<
2516 " Run cleartablelock to rollback and release the table lock " <<
2517 "across PMs.";
2518 int rc = ERR_BULK_ROLLBACK_MISS_ROOT;
2519 fLog->logMsg( oss.str(), rc, MSGLVL_ERROR );
2520 return rc;
2521 }
2522 }
2523
2524 // Restore/rollback the DB files if we got far enough to begin processing
2525 // this table.
2526 int rc = NO_ERROR;
2527
2528 if (hasProcessingBegun())
2529 {
2530 BulkRollbackMgr rbMgr( fTableOID,
2531 fTableLockID,
2532 fTableName,
2533 fProcessName, fLog );
2534
2535 rc = rbMgr.rollback( fKeepRbMetaFile );
2536
2537 if (rc != NO_ERROR)
2538 {
2539 ostringstream oss;
2540 oss << "Error rolling back table " << fTableName <<
2541 "; " << rbMgr.getErrorMsg();
2542 fLog->logMsg( oss.str(), rc, MSGLVL_ERROR );
2543 return rc;
2544 }
2545 }
2546
2547 // Delete the meta data files after rollback is complete
2548 deleteMetaDataRollbackFile( );
2549
2550 // Release the table lock
2551 rc = releaseTableLock( );
2552
2553 if (rc != NO_ERROR)
2554 {
2555 ostringstream oss;
2556 oss << "Table lock not cleared for table " << fTableName;
2557 fLog->logMsg( oss.str(), rc, MSGLVL_ERROR );
2558 return rc;
2559 }
2560
2561 return rc;
2562 }
2563
2564 //------------------------------------------------------------------------------
2565 // Allocate extent from BRM (through the stripe allocator).
2566 //------------------------------------------------------------------------------
allocateBRMColumnExtent(OID columnOID,uint16_t dbRoot,uint32_t & partition,uint16_t & segment,BRM::LBID_t & startLbid,int & allocSize,HWM & hwm,std::string & errMsg)2567 int TableInfo::allocateBRMColumnExtent(OID columnOID,
2568 uint16_t dbRoot,
2569 uint32_t& partition,
2570 uint16_t& segment,
2571 BRM::LBID_t& startLbid,
2572 int& allocSize,
2573 HWM& hwm,
2574 std::string& errMsg )
2575 {
2576 int rc = fExtentStrAlloc.allocateExtent( columnOID,
2577 dbRoot,
2578 partition,
2579 segment,
2580 startLbid,
2581 allocSize,
2582 hwm,
2583 errMsg );
2584 //fExtentStrAlloc.print();
2585
2586 return rc;
2587 }
2588
2589 }
2590 // end of namespace
2591