1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 /*******************************************************************************
19 * $Id: we_tableinfo.cpp 4648 2013-05-29 21:42:40Z rdempsey $
20 *
21 *******************************************************************************/
22 /** @file */
23 
24 #include "we_tableinfo.h"
25 #include "we_bulkstatus.h"
26 #include "we_bulkload.h"
27 
28 #include <sstream>
29 #include <sys/time.h>
30 #include <ctime>
31 #include <unistd.h>
32 #include <sys/types.h>
33 #include <cstdio>
34 #include <cerrno>
35 #include <cstring>
36 #include <utility>
37 // @bug 2099+
38 #include <iostream>
39 #include <libmarias3/marias3.h>
40 #ifdef _MSC_VER
41 #include <stdlib.h>
42 #else
43 #include <string.h>
44 #endif
45 using namespace std;
46 
47 // @bug 2099-
48 #include <boost/filesystem/path.hpp>
49 using namespace boost;
50 
51 #include "we_config.h"
52 #include "we_simplesyslog.h"
53 #include "we_bulkrollbackmgr.h"
54 #include "we_confirmhdfsdbfile.h"
55 
56 #include "querytele.h"
57 using namespace querytele;
58 
59 #include "oamcache.h"
60 #include "cacheutils.h"
61 
namespace
{
// Suffixes appended to file names created for rejected input and job errors.
const std::string  BAD_FILE_SUFFIX(".bad");      // Reject data file suffix
const std::string  ERR_FILE_SUFFIX(".err");      // Job error file suffix

// ANSI terminal escape sequences used to bracket emphasized console text.
const std::string  BOLD_START("\033[0;1m");      // reset attributes, then bold
const std::string  BOLD_STOP("\033[0;39m");      // reset attributes, default color
}
69 
70 namespace WriteEngine
71 {
72 //------------------------------------------------------------------------------
73 // Puts the current thread to sleep for the specified number of milliseconds.
74 // (Ex: used to wait for a Read buffer to become available.)
75 //------------------------------------------------------------------------------
sleepMS(long ms)76 void TableInfo::sleepMS(long ms)
77 {
78     struct timespec rm_ts;
79 
80     rm_ts.tv_sec = ms / 1000;
81     rm_ts.tv_nsec = ms % 1000 * 1000000;
82 #ifdef _MSC_VER
83     Sleep(ms);
84 #else
85     struct timespec abs_ts;
86 
87     do
88     {
89         abs_ts.tv_sec = rm_ts.tv_sec;
90         abs_ts.tv_nsec = rm_ts.tv_nsec;
91     }
92     while (nanosleep(&abs_ts, &rm_ts) < 0);
93 
94 #endif
95 }
96 
97 //------------------------------------------------------------------------------
98 // TableInfo constructor
99 //------------------------------------------------------------------------------
TableInfo(Log * logger,const BRM::TxnID txnID,const string & processName,OID tableOID,const string & tableName,bool bKeepRbMetaFile)100 TableInfo::TableInfo(Log* logger, const BRM::TxnID txnID,
101                      const string& processName,
102                      OID   tableOID,
103                      const string& tableName,
104                      bool  bKeepRbMetaFile) :
105     fTableId(-1), fBufferSize(0), fFileBufSize(0),
106     fStatusTI(WriteEngine::NEW),
107     fReadBufCount(0), fNumberOfColumns(0),
108     fHandle(NULL), fCurrentReadBuffer(0), fTotalReadRows(0),
109     fTotalErrRows(0), fMaxErrorRows(5),
110     fLastBufferId(-1), fFileBuffer(NULL), fCurrentParseBuffer(0),
111     fNumberOfColsParsed(0), fLocker(-1), fTableName(tableName),
112     fTableOID(tableOID), fJobId(0), fLog(logger), fTxnID(txnID),
113     fRBMetaWriter(processName, logger),
114     fProcessName(processName),
115     fKeepRbMetaFile(bKeepRbMetaFile),
116     fbTruncationAsError(false),
117     fImportDataMode(IMPORT_DATA_TEXT),
118     fTimeZone("SYSTEM"),
119     fTableLocked(false),
120     fReadFromStdin(false),
121     fReadFromS3(false),
122     fNullStringMode(false),
123     fEnclosedByChar('\0'),
124     fEscapeChar('\\'),
125     fProcessingBegun(false),
126     fBulkMode(BULK_MODE_LOCAL),
127     fBRMReporter(logger, tableName),
128     fTableLockID(0),
129     fRejectDataCnt(0),
130     fRejectErrCnt(0),
131     fExtentStrAlloc(tableOID, logger),
132     fOamCachePtr(oam::OamCache::makeOamCache())
133 {
134     fBuffers.clear();
135     fColumns.clear();
136     fStartTime.tv_sec  = 0;
137     fStartTime.tv_usec = 0;
138     string teleServerHost(config::Config::makeConfig()->getConfig("QueryTele", "Host"));
139 
140     if (!teleServerHost.empty())
141     {
142         int teleServerPort = config::Config::fromText(config::Config::makeConfig()->getConfig("QueryTele", "Port"));
143 
144         if (teleServerPort > 0)
145         {
146             fQtc.serverParms(QueryTeleServerParms(teleServerHost, teleServerPort));
147         }
148     }
149 }
150 
151 //------------------------------------------------------------------------------
152 // TableInfo destructor
153 //------------------------------------------------------------------------------
~TableInfo()154 TableInfo::~TableInfo()
155 {
156     fBRMReporter.sendErrMsgToFile(fBRMRptFileName);
157     freeProcessingBuffers();
158 }
159 
160 //------------------------------------------------------------------------------
161 // Frees up processing buffer memory.  We don't reset fReadBufCount to 0,
162 // because BulkLoad::lockColumnForParse() is calling getNumberOfBuffers()
163 // and dividing by the return value.  So best not to risk returning 0.
164 // Once we get far enough to call this freeProcessingBuffers() function,
165 // the application code obviously better be completely through accessing
166 // fBuffers and fColumns.
167 //------------------------------------------------------------------------------
freeProcessingBuffers()168 void TableInfo::freeProcessingBuffers()
169 {
170     // fLog->logMsg(
171     //    string("Releasing TableInfo Buffer for ")+fTableName,
172     //    MSGLVL_INFO1);
173     fBuffers.clear();
174     fColumns.clear();
175     fNumberOfColumns = 0;
176 }
177 
178 //------------------------------------------------------------------------------
179 // Close any database column or dictionary store files left open for this table.
180 // Under "normal" circumstances, there should be no files left open when we
181 // reach the end of the job, but in some error cases, the parsing threads may
182 // bail out without closing a file.  So this function is called as part of
183 // EOJ cleanup for any tables that are still holding a table lock.
184 //
185 // Files will automatically get closed when the program terminates, but when
186 // we are preparing for a bulk rollback, we want to explicitly close the files
187 // before we "reopen" them and start rolling back the contents of the files.
188 //
189 // For mode1 and mode2 imports, cpimport.bin does not lock the table or perform
190 // a bulk rollback, and closeOpenDbFile() is not called.  We instead rely on
191 // the program to implicitly close the files.
192 //------------------------------------------------------------------------------
closeOpenDbFiles()193 void TableInfo::closeOpenDbFiles()
194 {
195     ostringstream oss;
196     oss << "Closing DB files for table " << fTableName <<
197         ", left open by abnormal termination.";
198     fLog->logMsg( oss.str(), MSGLVL_INFO2 );
199 
200     for (unsigned int k = 0; k < fColumns.size(); k++)
201     {
202         stringstream oss1;
203         oss1 << "Closing DB column file for: " <<
204              fColumns[k].column.colName <<
205              " (OID-" << fColumns[k].column.mapOid << ")";
206         fLog->logMsg( oss1.str(), MSGLVL_INFO2 );
207         fColumns[k].closeColumnFile(false, true);
208 
209         if (fColumns[k].column.colType == COL_TYPE_DICT)
210         {
211             stringstream oss2;
212             oss2 << "Closing DB store  file for: "  <<
213                  fColumns[k].column.colName <<
214                  " (OID-" << fColumns[k].column.dctnry.dctnryOid << ")";
215             fLog->logMsg( oss2.str(), MSGLVL_INFO2 );
216             fColumns[k].closeDctnryStore(true);
217         }
218     }
219 }
220 
221 //------------------------------------------------------------------------------
222 // Locks this table for reading to the specified thread (locker) "if" the table
223 // has not yet been assigned to a read thread.
224 //------------------------------------------------------------------------------
lockForRead(const int & locker)225 bool TableInfo::lockForRead(const int& locker)
226 {
227     boost::mutex::scoped_lock lock(fSyncUpdatesTI);
228 
229     if (fLocker == -1)
230     {
231         if (fStatusTI == WriteEngine::NEW )
232         {
233             fLocker = locker;
234             return true;
235         }
236     }
237 
238     return false;
239 }
240 
241 //------------------------------------------------------------------------------
242 // Loop thru reading the import file(s) assigned to this TableInfo object.
243 //------------------------------------------------------------------------------
readTableData()244 int  TableInfo::readTableData( )
245 {
246     RID validTotalRows        = 0;
247     RID totalRowsPerInputFile = 0;
248     int filesTBProcessed = fLoadFileList.size();
249     int fileCounter = 0;
250     unsigned long long qtSentAt = 0;
251 
252     if (fHandle == NULL)
253     {
254 
255         fFileName = fLoadFileList[fileCounter];
256         int rc = openTableFile();
257 
258         if (rc != NO_ERROR)
259         {
260             // Mark the table status as error and exit.
261             boost::mutex::scoped_lock lock(fSyncUpdatesTI);
262             fStatusTI = WriteEngine::ERR;
263             return rc;
264         }
265 
266         fileCounter++;
267     }
268 
269     timeval readStart;
270     gettimeofday(&readStart, NULL);
271     ostringstream ossStartMsg;
272     ossStartMsg << "Start reading and loading table " << fTableName;
273     fLog->logMsg( ossStartMsg.str(), MSGLVL_INFO2 );
274     fProcessingBegun = true;
275 
276     ImportTeleStats its;
277     its.job_uuid = fJobUUID;
278     its.import_uuid = QueryTeleClient::genUUID();
279     its.msg_type = ImportTeleStats::IT_START;
280     its.start_time = QueryTeleClient::timeNowms();
281     its.table_list.push_back(fTableName);
282     its.rows_so_far.push_back(0);
283     its.system_name = fOamCachePtr->getSystemName();
284     its.module_name = fOamCachePtr->getModuleName();
285     string tn = getTableName();
286     its.schema_name = string(tn, 0, tn.find('.'));
287     fQtc.postImportTele(its);
288 
289     //
290     // LOOP to read all the import data for this table
291     //
292     while (true)
293     {
294         // See if JobStatus has been set to terminate by another thread
295         if (BulkStatus::getJobStatus() == EXIT_FAILURE)
296         {
297             boost::mutex::scoped_lock lock(fSyncUpdatesTI);
298             fStartTime = readStart;
299             fStatusTI  = WriteEngine::ERR;
300             its.msg_type = ImportTeleStats::IT_TERM;
301             its.rows_so_far.pop_back();
302             its.rows_so_far.push_back(0);
303             fQtc.postImportTele(its);
304             throw SecondaryShutdownException( "TableInfo::"
305                                               "readTableData(1) responding to job termination");
306         }
307 
308 // @bug 3271: Conditionally compile the thread deadlock debug logging
309 #ifdef DEADLOCK_DEBUG
310         // @bug2099+.  Temp hack to diagnose deadlock.
311         struct timeval tvStart;
312         gettimeofday(&tvStart, 0);
313         bool report = false;
314         bool reported = false;
315         // @bug2099-
316 #else
317         const bool report = false;
318 #endif
319 
320 #ifdef PROFILE
321         Stats::startReadEvent(WE_STATS_WAIT_FOR_READ_BUF);
322 #endif
323 
324         //
325         // LOOP to wait for, and read, the next avail BulkLoadBuffer object
326         //
327         while (!isBufferAvailable(report))
328         {
329             // See if JobStatus has been set to terminate by another thread
330             if (BulkStatus::getJobStatus() == EXIT_FAILURE)
331             {
332                 boost::mutex::scoped_lock lock(fSyncUpdatesTI);
333                 fStartTime = readStart;
334                 fStatusTI  = WriteEngine::ERR;
335                 its.msg_type = ImportTeleStats::IT_TERM;
336                 its.rows_so_far.pop_back();
337                 its.rows_so_far.push_back(0);
338                 fQtc.postImportTele(its);
339                 throw SecondaryShutdownException( "TableInfo::"
340                                                   "readTableData(2) responding to job termination");
341             }
342 
343             // Sleep and check the condition again.
344             sleepMS(1);
345 #ifdef DEADLOCK_DEBUG
346 
347             // @bug2099+
348             if (report) report = false; // report one time.
349 
350             if (!reported)
351             {
352                 struct timeval tvNow;
353                 gettimeofday(&tvNow, 0);
354 
355                 if ((tvNow.tv_sec - tvStart.tv_sec) > 100)
356                 {
357                     time_t t = time(0);
358                     char timeString[50];
359                     ctime_r(&t, timeString);
360                     timeString[ strlen(timeString) - 1 ] = '\0';
361                     ostringstream oss;
362                     oss << endl << timeString << ": " <<
363                         "TableInfo::readTableData: " << fTableName <<
364                         "; Diff is " << (tvNow.tv_sec - tvStart.tv_sec) <<
365                         endl;
366                     cout << oss.str();
367                     cout.flush();
368                     report = true;
369                     reported = true;
370                 }
371             }
372 
373             // @bug2099-
374 #endif
375         }
376 
377 #ifdef PROFILE
378         Stats::stopReadEvent(WE_STATS_WAIT_FOR_READ_BUF);
379         Stats::startReadEvent(WE_STATS_READ_INTO_BUF);
380 #endif
381 
382         int readBufNo   = fCurrentReadBuffer;
383         int prevReadBuf = (fCurrentReadBuffer - 1);
384 
385         if (prevReadBuf < 0)
386             prevReadBuf = fReadBufCount + prevReadBuf;
387 
388         // We keep a running total of read errors;  fMaxErrorRows specifies
389         // the error limit.  Here's where we see how many more errors we
390         // still have below the limit, and we pass this to fillFromFile().
391         unsigned allowedErrCntThisCall =
392             ( (fMaxErrorRows > fTotalErrRows) ?
393               (fMaxErrorRows - fTotalErrRows) : 0 );
394 
395         // Fill in the specified buffer.
396         // fTotalReadRowsPerInputFile is ongoing total number of rows read,
397         //   per input file.
398         // validTotalRows is ongoing total of valid rows read for all files
399         //   pertaining to this DB table.
400         int readRc;
401         if (fReadFromS3)
402         {
403             readRc = fBuffers[readBufNo].fillFromMemory(
404                         fBuffers[prevReadBuf], fFileBuffer, fS3ReadLength, &fS3ParseLength,
405                         totalRowsPerInputFile, validTotalRows, fColumns,
406                         allowedErrCntThisCall);
407         }
408         else
409         {
410             readRc = fBuffers[readBufNo].fillFromFile(
411                          fBuffers[prevReadBuf], fHandle, totalRowsPerInputFile,
412                          validTotalRows, fColumns, allowedErrCntThisCall);
413         }
414 
415         if (readRc != NO_ERROR)
416         {
417             // error occurred.
418             // need to exit.
419             // mark the table status as error and exit.
420             {
421                 boost::mutex::scoped_lock lock(fSyncUpdatesTI);
422                 fStartTime = readStart;
423                 fStatusTI  = WriteEngine::ERR;
424                 fBuffers[readBufNo].setStatusBLB(WriteEngine::ERR);
425             }
426             closeTableFile();
427 
428             // Error occurred on next row not read, so increment
429             // totalRowsPerInputFile row count for the error msg
430             WErrorCodes ec;
431             ostringstream oss;
432             oss << "Error reading import file " << fFileName <<
433                 "; near line " << totalRowsPerInputFile + 1 << "; " <<
434                 ec.errorString(readRc);
435             fLog->logMsg( oss.str(), readRc, MSGLVL_ERROR);
436 
437             its.msg_type = ImportTeleStats::IT_TERM;
438             its.rows_so_far.pop_back();
439             its.rows_so_far.push_back(0);
440             fQtc.postImportTele(its);
441 
442             return readRc;
443         }
444 
445 #ifdef PROFILE
446         Stats::stopReadEvent(WE_STATS_READ_INTO_BUF);
447 #endif
448         its.msg_type = ImportTeleStats::IT_PROGRESS;
449         its.rows_so_far.pop_back();
450         its.rows_so_far.push_back(totalRowsPerInputFile);
451         unsigned long long thisRows = static_cast<unsigned long long>(totalRowsPerInputFile);
452         thisRows /= 1000000;
453 
454         if (thisRows > qtSentAt)
455         {
456             fQtc.postImportTele(its);
457             qtSentAt = thisRows;
458         }
459 
460         // Check if there were any errors in the read data.
461         // if yes, copy it to the error list.
462         // if the number of errors is greater than the maximum error count
463         // mark the table status as error and exit.
464         // call the method to copy the errors
465         writeErrorList( &fBuffers[readBufNo].getErrorRows(),
466                         &fBuffers[readBufNo].getExactErrorRows(), false );
467         fBuffers[readBufNo].clearErrRows();
468 
469         if (fTotalErrRows > fMaxErrorRows)
470         {
471             // flush the reject data file and output the rejected rows
472             // flush err file and output the rejected row id and the reason.
473             writeErrorList( 0, 0, true );
474 
475             // number of errors > maximum allowed. hence return error.
476             {
477                 boost::mutex::scoped_lock lock(fSyncUpdatesTI);
478                 fStartTime = readStart;
479                 fStatusTI  = WriteEngine::ERR;
480                 fBuffers[readBufNo].setStatusBLB(WriteEngine::ERR);
481             }
482             closeTableFile();
483             ostringstream oss5;
484             oss5 << "Actual error row count("   << fTotalErrRows <<
485                  ") exceeds the max error rows(" << fMaxErrorRows <<
486                  ") allowed for table " << fTableName;
487             fLog->logMsg(oss5.str(), ERR_BULK_MAX_ERR_NUM, MSGLVL_ERROR);
488 
489             // List Err and Bad files to report file (if applicable)
490             fBRMReporter.rptMaxErrJob( fBRMRptFileName, fErrFiles, fBadFiles );
491 
492             its.msg_type = ImportTeleStats::IT_TERM;
493             its.rows_so_far.pop_back();
494             its.rows_so_far.push_back(0);
495             fQtc.postImportTele(its);
496 
497             return ERR_BULK_MAX_ERR_NUM;
498         }
499 
500         // mark the buffer status as read complete.
501         {
502 #ifdef PROFILE
503             Stats::startReadEvent(WE_STATS_WAIT_TO_COMPLETE_READ);
504 #endif
505             boost::mutex::scoped_lock lock(fSyncUpdatesTI);
506 #ifdef PROFILE
507             Stats::stopReadEvent(WE_STATS_WAIT_TO_COMPLETE_READ);
508             Stats::startReadEvent(WE_STATS_COMPLETING_READ);
509 #endif
510 
511             fStartTime = readStart;
512             fBuffers[readBufNo].setStatusBLB(WriteEngine::READ_COMPLETE);
513 
514             fCurrentReadBuffer = (fCurrentReadBuffer + 1) % fReadBufCount;
515 
516             // bufferCount++;
517             if ( (fHandle && feof(fHandle)) || (fReadFromS3 && (fS3ReadLength == fS3ParseLength)) )
518             {
519                 timeval readFinished;
520                 gettimeofday(&readFinished, NULL);
521 
522                 closeTableFile();
523 
524                 if (fReadFromStdin)
525                 {
526                     fLog->logMsg( "Finished loading " + fTableName + " from STDIN" +
527                                   ", Time taken = " + Convertor::int2Str((int)
528                                           (readFinished.tv_sec - readStart.tv_sec)) +
529                                   " seconds",
530                                   //" seconds; bufferCount-"+Convertor::int2Str(bufferCount),
531                                   MSGLVL_INFO2 );
532                 }
533                 else if(fReadFromS3)
534                 {
535                     fLog->logMsg( "Finished loading " + fTableName + " from S3" +
536                                   ", Time taken = " + Convertor::int2Str((int)
537                                           (readFinished.tv_sec - readStart.tv_sec)) +
538                                   " seconds",
539                                   //" seconds; bufferCount-"+Convertor::int2Str(bufferCount),
540                                   MSGLVL_INFO2 );
541                 }
542                 else
543                 {
544                     fLog->logMsg( "Finished reading file " + fFileName +
545                                   ", Time taken = " + Convertor::int2Str((int)
546                                           (readFinished.tv_sec - readStart.tv_sec)) +
547                                   " seconds",
548                                   //" seconds; bufferCount-"+Convertor::int2Str(bufferCount),
549                                   MSGLVL_INFO2 );
550                 }
551 
552                 // flush the reject data file and output the rejected rows
553                 // flush err file and output the rejected row id and the reason.
554                 writeErrorList( 0, 0, true );
555 
556                 // If > 1 file for this table, then open next file in the list
557                 if ( fileCounter < filesTBProcessed )
558                 {
559                     fFileName = fLoadFileList[fileCounter];
560                     int rc = openTableFile();
561 
562                     if (rc != NO_ERROR)
563                     {
564                         // Mark the table status as error and exit.
565                         fStatusTI   = WriteEngine::ERR;
566                         return rc;
567                     }
568 
569                     fileCounter++;
570                     fTotalReadRows += totalRowsPerInputFile;
571                     totalRowsPerInputFile = 0;
572                 }
573                 else  // All files read for this table; break out of read loop
574                 {
575                     fStatusTI     = WriteEngine::READ_COMPLETE;
576                     fLastBufferId = readBufNo;
577                     fTotalReadRows += totalRowsPerInputFile;
578                     break;
579                 }
580 
581                 gettimeofday(&readStart, NULL);
582             } // reached EOF
583 
584 #ifdef PROFILE
585             Stats::stopReadEvent(WE_STATS_COMPLETING_READ);
586 #endif
587         } // mark buffer status as read-complete within scope of a mutex
588     }     // loop to read all data for this table
589 
590     its.msg_type = ImportTeleStats::IT_SUMMARY;
591     its.end_time = QueryTeleClient::timeNowms();
592     its.rows_so_far.pop_back();
593     its.rows_so_far.push_back(fTotalReadRows);
594     fQtc.postImportTele(its);
595     fQtc.waitForQueues();
596 
597     return NO_ERROR;
598 }
599 
600 //------------------------------------------------------------------------------
601 // writeErrorList()
602 //   errorRows    - vector of row numbers and corresponding error messages
603 //   errorDatRows - vector of bad rows that have been rejected
604 //
605 // Adds errors pertaining to a specific buffer, to the cumulative list of
606 // errors to be reported to the user.
607 //------------------------------------------------------------------------------
writeErrorList(const std::vector<std::pair<RID,std::string>> * errorRows,const std::vector<std::string> * errorDatRows,bool bCloseFile)608 void TableInfo::writeErrorList(const std::vector< std::pair<RID,
609                                std::string> >* errorRows,
610                                const std::vector<std::string>* errorDatRows,
611                                bool bCloseFile)
612 {
613     size_t errorRowsCount    = 0;
614     size_t errorDatRowsCount = 0;
615 
616     if (errorRows)
617         errorRowsCount = errorRows->size();
618 
619     if (errorDatRows)
620         errorDatRowsCount = errorDatRows->size();
621 
622     if ((errorRowsCount    > 0) ||
623             (errorDatRowsCount > 0) ||
624             (bCloseFile))
625     {
626         boost::mutex::scoped_lock lock(fErrorRptInfoMutex);
627 
628         if ((errorRowsCount > 0)    || (bCloseFile))
629             writeErrReason(errorRows, bCloseFile);
630 
631         if ((errorDatRowsCount > 0) || (bCloseFile))
632             writeBadRows  (errorDatRows, bCloseFile);
633 
634         fTotalErrRows += errorRowsCount;
635     }
636 }
637 
638 //------------------------------------------------------------------------------
639 // Parse the specified column (columnId) in the specified buffer (bufferId).
640 //------------------------------------------------------------------------------
parseColumn(const int & columnId,const int & bufferId,double & processingTime)641 int  TableInfo::parseColumn(const int& columnId, const int& bufferId,
642                             double& processingTime)
643 {
644     // parse the column
645     // note the time and update the column's last processing time
646     timeval parseStart, parseEnd;
647     gettimeofday(&parseStart, NULL);
648 
649     // Will need to check whether the column needs to extend.
650     // If size of the file is less than the required size, extend the column
651     int rc = fBuffers[bufferId].parse(fColumns[columnId]);
652     gettimeofday(&parseEnd, NULL);
653 
654     processingTime = (parseEnd.tv_usec / 1000 + parseEnd.tv_sec * 1000) -
655                      (parseStart.tv_usec / 1000 + parseStart.tv_sec * 1000);
656 
657     return rc;
658 }
659 
660 //------------------------------------------------------------------------------
661 // Mark the specified column (columnId) in the specified buffer (bufferId) as
662 // PARSE_COMPLETE.  If this is the last column to be parsed for this buffer,
663 // then mark the buffer as PARSE_COMPLETE.
664 // If the last buffer for this table has been read (fLastBufferId != -1), then
665 // see if all the data for columnId has been parsed for all the buffers, in
666 // which case we are finished parsing columnId.
667 // If this is the last column to finish parsing for this table, then mark the
668 // table status as PARSE_COMPLETE.
669 //------------------------------------------------------------------------------
setParseComplete(const int & columnId,const int & bufferId,double processingTime)670 int TableInfo::setParseComplete(const int& columnId,
671                                 const int& bufferId,
672                                 double processingTime)
673 {
674     boost::mutex::scoped_lock lock(fSyncUpdatesTI);
675 
676     // Check table status in case race condition results in this function
677     // being called after fStatusTI was set to ERR by another thread.
678     if (fStatusTI == WriteEngine::ERR)
679         return ERR_UNKNOWN;
680 
681     fColumns[columnId].lastProcessingTime   = processingTime;
682 #ifdef PROFILE
683     fColumns[columnId].totalProcessingTime += processingTime;
684 #endif
685 
686     // Set buffer status to complete if setColumnStatus indicates that
687     // all the columns are complete
688     if (fBuffers[bufferId].setColumnStatus(
689                 columnId, WriteEngine::PARSE_COMPLETE))
690         fBuffers[bufferId].setStatusBLB( WriteEngine::PARSE_COMPLETE );
691 
692     // fLastBufferId != -1 means the Read thread has read the last
693     // buffer for this table
694     if (fLastBufferId != -1)
695     {
696         // check if the status of the column in all the fBuffers is parse
697         // complete then update the column status as parse complete.
698         bool allBuffersDoneForAColumn = true;
699 
700         for (int i = 0; i < fReadBufCount; ++i)
701         {
702             // check the status of the column in this buffer.
703             Status bufferStatus = fBuffers[i].getStatusBLB();
704 
705             if ( (bufferStatus == WriteEngine::READ_COMPLETE) ||
706                     (bufferStatus == WriteEngine::PARSE_COMPLETE) )
707             {
708                 if (fBuffers[i].getColumnStatus(columnId) !=
709                         WriteEngine::PARSE_COMPLETE)
710                 {
711                     allBuffersDoneForAColumn = false;
712                     break;
713                 }
714             }
715         }
716 
717         // allBuffersDoneForAColumn==TRUE means we are finished parsing columnId
718         if (allBuffersDoneForAColumn)
719         {
720             // Accumulate list of HWM dictionary blocks to be flushed from cache
721             std::vector<BRM::LBID_t> dictBlksToFlush;
722             fColumns[columnId].getDictFlushBlks( dictBlksToFlush );
723 
724             for (unsigned kk = 0; kk < dictBlksToFlush.size(); kk++)
725             {
726                 fDictFlushBlks.push_back( dictBlksToFlush[kk] );
727             }
728 
729             int rc = fColumns[columnId].finishParsing( );
730 
731             if (rc != NO_ERROR)
732             {
733                 WErrorCodes ec;
734                 ostringstream oss;
735                 oss << "setParseComplete completion error; "
736                     "Failed to load table: " <<
737                     fTableName << "; " << ec.errorString(rc);
738                 fLog->logMsg( oss.str(), rc, MSGLVL_ERROR);
739                 fStatusTI = WriteEngine::ERR;
740                 return rc;
741             }
742 
743             fNumberOfColsParsed++;
744 
745             //
746             // If all columns have been parsed, then finished with this tbl
747             //
748             if (fNumberOfColsParsed >= fNumberOfColumns)
749             {
750                 // After closing the column and dictionary store files,
751                 // flush any updated dictionary blocks in PrimProc.
752                 // We only do this for non-HDFS.  For HDFS we don't want
753                 // to flush till "after" we have "confirmed" all the file
754                 // changes, which flushes the changes to disk.
755                 if (!idbdatafile::IDBPolicy::useHdfs())
756                 {
757                     if (fDictFlushBlks.size() > 0)
758                     {
759 #ifdef PROFILE
760                         Stats::startParseEvent(WE_STATS_FLUSH_PRIMPROC_BLOCKS);
761 #endif
762                         if (fLog->isDebug(DEBUG_2))
763                         {
764                             ostringstream oss;
765                             oss << "Dictionary cache flush: ";
766                             for (uint32_t i = 0; i < fDictFlushBlks.size(); i++)
767                             {
768                                 oss << fDictFlushBlks[i] << ", ";
769                             }
770                             oss << endl;
771                             fLog->logMsg( oss.str(), MSGLVL_INFO1 );
772                         }
773                         cacheutils::flushPrimProcAllverBlocks(fDictFlushBlks);
774 #ifdef PROFILE
775                         Stats::stopParseEvent(WE_STATS_FLUSH_PRIMPROC_BLOCKS);
776 #endif
777                         fDictFlushBlks.clear();
778                     }
779                 }
780 
781                 // Update auto-increment next value if applicable.
782                 rc = synchronizeAutoInc( );
783 
784                 if (rc != NO_ERROR)
785                 {
786                     WErrorCodes ec;
787                     ostringstream oss;
788                     oss << "setParseComplete: autoInc update error; "
789                         "Failed to load table: " << fTableName <<
790                         "; " << ec.errorString(rc);
791                     fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
792                     fStatusTI = WriteEngine::ERR;
793                     return rc;
794                 }
795 
796                 //..Validate that all the HWM's are consistent and in-sync
797                 std::vector<DBRootExtentInfo> segFileInfo;
798 
799                 for (unsigned i = 0; i < fColumns.size(); ++i)
800                 {
801                     DBRootExtentInfo extentInfo;
802                     fColumns[i].getSegFileInfo( extentInfo );
803                     segFileInfo.push_back( extentInfo );
804                 }
805 
806                 rc = validateColumnHWMs( 0, segFileInfo, "Ending" );
807 
808                 if (rc != NO_ERROR)
809                 {
810                     WErrorCodes ec;
811                     ostringstream oss;
812                     oss << "setParseComplete: HWM validation error; "
813                         "Failed to load table: " << fTableName <<
814                         "; " << ec.errorString(rc);
815                     fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
816                     fStatusTI = WriteEngine::ERR;
817 
818                     ostringstream oss2;
819                     oss2 << "Ending HWMs for table " << fTableName << ": ";
820 
821                     for (unsigned int n = 0; n < fColumns.size(); n++)
822                     {
823                         oss2 << std::endl;
824                         oss2 << "  " << fColumns[n].column.colName <<
825                              "; DBRoot/part/seg/hwm: "        <<
826                              segFileInfo[n].fDbRoot           <<
827                              "/" << segFileInfo[n].fPartition <<
828                              "/" << segFileInfo[n].fSegment   <<
829                              "/" << segFileInfo[n].fLocalHwm;
830                     }
831 
832                     fLog->logMsg(oss2.str(), MSGLVL_INFO1);
833 
834                     return rc;
835                 }
836 
837                 //..Confirm changes to DB files (necessary for HDFS)
838                 rc = confirmDBFileChanges( );
839 
840                 if (rc != NO_ERROR)
841                 {
842                     WErrorCodes ec;
843                     ostringstream oss;
844                     oss << "setParseComplete: Error confirming DB changes; "
845                         "Failed to load table: " << fTableName <<
846                         "; " << ec.errorString(rc);
847                     fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
848                     fStatusTI = WriteEngine::ERR;
849                     return rc;
850                 }
851 
852                 //..Update BRM with HWM and Casual Partition info, etc.
853                 rc = finishBRM( );
854 
855                 if (rc != NO_ERROR)
856                 {
857                     WErrorCodes ec;
858                     ostringstream oss;
859                     oss << "setParseComplete: BRM error; "
860                         "Failed to load table: " << fTableName <<
861                         "; " << ec.errorString(rc);
862                     fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
863                     fStatusTI = WriteEngine::ERR;
864                     return rc;
865                 }
866 
867                 // Change table lock state to CLEANUP
868                 rc = changeTableLockState( );
869 
870                 if (rc != NO_ERROR)
871                 {
872                     WErrorCodes ec;
873                     ostringstream oss;
874                     oss << "setParseComplete: table lock state change error; "
875                         "Table load completed: " << fTableName << "; " <<
876                         ec.errorString(rc);
877                     fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
878                     fStatusTI = WriteEngine::ERR;
879                     return rc;
880                 }
881 
882                 // Finished with this table, so delete bulk rollback
883                 // meta data file and release the table lock.
884                 deleteTempDBFileChanges();
885                 deleteMetaDataRollbackFile();
886 
887                 rc = releaseTableLock( );
888 
889                 if (rc != NO_ERROR)
890                 {
891                     WErrorCodes ec;
892                     ostringstream oss;
893                     oss << "setParseComplete: table lock release error; "
894                         "Failed to load table: " << fTableName << "; " <<
895                         ec.errorString(rc);
896                     fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
897                     fStatusTI = WriteEngine::ERR;
898                     return rc;
899                 }
900 
901 #ifdef PROFILE
902 
903                 // Loop through columns again to print out the elapsed
904                 // parse times
905                 for (unsigned i = 0; i < fColumns.size(); ++i)
906                 {
907                     ostringstream ossColTime;
908                     ossColTime << "Column " << i << "; OID-" <<
909                                fColumns[i].column.mapOid << "; parseTime-" <<
910                                (fColumns[i].totalProcessingTime / 1000.0) <<
911                                " seconds";
912                     fLog->logMsg(ossColTime.str(), MSGLVL_INFO1);
913                 }
914 
915 #endif
916 
917                 timeval endTime;
918                 gettimeofday(&endTime, 0);
919                 double elapsedTime =
920                     (endTime.tv_sec    + (endTime.tv_usec    / 1000000.0)) -
921                     (fStartTime.tv_sec + (fStartTime.tv_usec / 1000000.0));
922 
923                 fStatusTI = WriteEngine::PARSE_COMPLETE;
924                 reportTotals(elapsedTime);
925 
926                 // Reduce memory use by allocating and releasing as needed
927                 freeProcessingBuffers();
928 
929             } // end of if (fNumberOfColsParsed >= fNumberOfColumns)
930         }     // end of if (allBuffersDoneForAColumn)
931     }         // end of if (fLastBufferId != -1)
932 
933     // If we finished parsing the buffer associated with currentParseBuffer,
934     // but have not finshed the entire table, then advance currentParseBuffer.
935     if ((fStatusTI != WriteEngine::PARSE_COMPLETE) &&
936             (fBuffers[bufferId].getStatusBLB() == WriteEngine::PARSE_COMPLETE))
937     {
938         // Find the BulkLoadBuffer object that is next in line to be parsed
939         // and assign fCurrentParseBuffer accordingly.  Break out of the
940         // loop if we wrap all the way around and catch up with the current-
941         // Read buffer.
942         if (bufferId == fCurrentParseBuffer)
943         {
944             int currentParseBuffer = fCurrentParseBuffer;
945 
946             while (fBuffers[currentParseBuffer].getStatusBLB() ==
947                     WriteEngine::PARSE_COMPLETE )
948             {
949                 currentParseBuffer     = (currentParseBuffer + 1) %
950                                          fReadBufCount;
951                 fCurrentParseBuffer    = currentParseBuffer;
952 
953                 if (fCurrentParseBuffer == fCurrentReadBuffer) break;
954             }
955         }
956     }
957 
958     return NO_ERROR;
959 }
960 
961 //------------------------------------------------------------------------------
962 // Report summary totals to applicable destination (stdout, cpimport.bin log
963 // file, BRMReport file (for mode1) etc).
964 // elapsedTime is number of seconds taken to import this table.
965 //------------------------------------------------------------------------------
reportTotals(double elapsedTime)966 void TableInfo::reportTotals(double elapsedTime)
967 {
968     ostringstream oss1;
969     oss1 << "For table " << fTableName <<
970          ": " << fTotalReadRows << " rows processed and " <<
971          (fTotalReadRows - fTotalErrRows) << " rows inserted.";
972 
973     fLog->logMsg(oss1.str(), MSGLVL_INFO1);
974 
975     ostringstream oss2;
976     oss2 << "For table " << fTableName << ": " <<
977          "Elapsed time to load this table: " <<
978          elapsedTime << " secs";
979 
980     fLog->logMsg(oss2.str(), MSGLVL_INFO2);
981 
982     // @bug 3504: Loop through columns to print saturation counts
983     std::vector<boost::tuple<execplan::CalpontSystemCatalog::ColDataType, uint64_t, uint64_t> > satCounts;
984 
985     for (unsigned i = 0; i < fColumns.size(); ++i)
986     {
987         //std::string colName(fTableName);
988         // colName += '.';
989         // colName += fColumns[i].column.colName;
990         long long satCount = fColumns[i].saturatedCnt();
991 
992         satCounts.push_back(boost::make_tuple(fColumns[i].column.dataType,
993                                               fColumns[i].column.mapOid,
994                                               satCount));
995 
996         if (satCount > 0)
997         {
998             // @bug 3375: report invalid dates/times set to null
999             ostringstream ossSatCnt;
1000             ossSatCnt << "Column " << fTableName << '.' <<
1001                       fColumns[i].column.colName << "; Number of ";
1002 
1003             if (fColumns[i].column.dataType == execplan::CalpontSystemCatalog::DATE)
1004             {
1005                 ossSatCnt <<
1006                           "invalid dates replaced with zero value : ";
1007             }
1008             else if (fColumns[i].column.dataType ==
1009               execplan::CalpontSystemCatalog::DATETIME)
1010             {
1011                 //bug5383
1012                 ossSatCnt <<
1013                           "invalid date/times replaced with zero value : ";
1014             }
1015 
1016             else if (fColumns[i].column.dataType == execplan::CalpontSystemCatalog::TIMESTAMP)
1017             {
1018                 ossSatCnt <<
1019                           "invalid timestamps replaced with zero value : ";
1020             }
1021             else if (fColumns[i].column.dataType == execplan::CalpontSystemCatalog::TIME)
1022             {
1023                 ossSatCnt <<
1024                           "invalid times replaced with zero value : ";
1025             }
1026             else if (fColumns[i].column.dataType == execplan::CalpontSystemCatalog::CHAR)
1027                 ossSatCnt <<
1028                           "character strings truncated: ";
1029             else if (fColumns[i].column.dataType == execplan::CalpontSystemCatalog::VARCHAR)
1030                 ossSatCnt << "character strings truncated: ";
1031             else
1032                 ossSatCnt <<
1033                           "rows inserted with saturated values: ";
1034 
1035             ossSatCnt << satCount;
1036             fLog->logMsg(ossSatCnt.str(), MSGLVL_WARNING);
1037         }
1038     }
1039 
1040     logging::Message::Args tblFinishedMsgArgs;
1041     tblFinishedMsgArgs.add( fJobId );
1042     tblFinishedMsgArgs.add( fTableName );
1043     tblFinishedMsgArgs.add( (fTotalReadRows - fTotalErrRows) );
1044     SimpleSysLog::instance()->logMsg(
1045         tblFinishedMsgArgs,
1046         logging::LOG_TYPE_INFO,
1047         logging::M0083);
1048 
1049     //Bug1375 - cpimport.bin did not add entries to the transaction
1050     //          log file: data_mods.log
1051     if ((fTotalReadRows - fTotalErrRows) > 0 )
1052         logToDataMods(fjobFileName, oss1.str());
1053 
1054     // Log totals in report file if applicable
1055     fBRMReporter.reportTotals( fTotalReadRows,
1056                                (fTotalReadRows - fTotalErrRows),
1057                                satCounts );
1058 }
1059 
1060 //------------------------------------------------------------------------------
1061 // Report BRM updates to a report file or to BRM directly.
1062 //------------------------------------------------------------------------------
finishBRM()1063 int TableInfo::finishBRM( )
1064 {
1065     // Collect the CP and HWM information for all the columns
1066     for (unsigned i = 0; i < fColumns.size(); ++i)
1067     {
1068         fColumns[i].getBRMUpdateInfo( fBRMReporter );
1069     }
1070 
1071     // We use mutex not to synchronize contention among parallel threads,
1072     // because we should be the only thread accessing the fErrFiles and
1073     // fBadFiles at this point.  But we do use the mutex as a memory barrier
1074     // to make sure we have the latest copy of the data.
1075     std::vector<std::string>* errFiles = 0;
1076     std::vector<std::string>* badFiles = 0;
1077     {
1078         boost::mutex::scoped_lock lock(fErrorRptInfoMutex);
1079         errFiles = &fErrFiles;
1080         badFiles = &fBadFiles;
1081     }
1082 
1083     // Save the info just collected, to a report file or send to BRM
1084     int rc = fBRMReporter.sendBRMInfo( fBRMRptFileName, *errFiles, *badFiles );
1085 
1086     return rc;
1087 }
1088 
1089 //------------------------------------------------------------------------------
1090 // Update status of table to reflect an error.
1091 // No need to update the buffer or column status, because we are not going to
1092 // continue the job anyway.  Other threads should terminate when they see that
1093 // the JobStatus has been set to EXIT_FAILURE and/or the table status has been
1094 // set to WriteEngine::ERR.
1095 //------------------------------------------------------------------------------
setParseError()1096 void TableInfo::setParseError( )
1097 {
1098     boost::mutex::scoped_lock lock(fSyncUpdatesTI);
1099     fStatusTI = WriteEngine::ERR;
1100 }
1101 
1102 //------------------------------------------------------------------------------
1103 // Locks a column from the specified buffer (bufferId) for the specified parse
1104 // thread (id); and returns the column id.  A return value of -1 means no
1105 // column could be locked for parsing.
1106 //------------------------------------------------------------------------------
1107 // @bug2099. Temporary hack to diagnose deadlock.
1108 // Added report parm and couts below.
getColumnForParse(const int & id,const int & bufferId,bool report)1109 int  TableInfo::getColumnForParse(const int& id,
1110                                   const int& bufferId,
1111                                   bool report)
1112 {
1113     boost::mutex::scoped_lock lock(fSyncUpdatesTI);
1114     double maxTime = 0;
1115     int columnId = -1;
1116 
1117     while (true)
1118     {
1119         // See if JobStatus has been set to terminate by another thread
1120         if (BulkStatus::getJobStatus() == EXIT_FAILURE)
1121         {
1122             fStatusTI = WriteEngine::ERR;
1123             throw SecondaryShutdownException( "TableInfo::"
1124                                               "getColumnForParse() responding to job termination");
1125         }
1126 
1127         if ( !bufferReadyForParse(bufferId, report) ) return -1;
1128 
1129         // @bug2099+
1130         ostringstream oss;
1131 
1132         if (report)
1133         {
1134 #ifdef _MSC_VER
1135             oss << " ----- " << GetCurrentThreadId() << ":fBuffers[" << bufferId <<
1136 #else
1137             oss << " ----- " << pthread_self() << ":fBuffers[" << bufferId <<
1138 #endif
1139                 "]: (colLocker,status,lasttime)- ";
1140         }
1141 
1142         // @bug2099-
1143 
1144         for (unsigned k = 0; k < fNumberOfColumns; ++k)
1145         {
1146             // @bug2099+
1147             if (report)
1148             {
1149                 Status colStatus = fBuffers[bufferId].getColumnStatus(k);
1150                 int colLocker = fBuffers[bufferId].getColumnLocker(k);
1151 
1152                 string colStatusStr;
1153                 ColumnInfo::convertStatusToString(colStatus, colStatusStr);
1154 
1155                 oss << '(' << colLocker << ',' << colStatusStr << ',' <<
1156                     fColumns[k].lastProcessingTime << ") ";
1157             }
1158 
1159             // @bug2099-
1160 
1161             if (fBuffers[bufferId].getColumnLocker(k) == -1)
1162             {
1163                 if (columnId == -1)
1164                     columnId = k;
1165                 else if (fColumns[k].lastProcessingTime == 0)
1166                 {
1167                     if (fColumns[k].column.width >=
1168                             fColumns[columnId].column.width)
1169                         columnId = k;
1170                 }
1171                 else if (fColumns[k].lastProcessingTime > maxTime)
1172                 {
1173                     maxTime = fColumns[k].lastProcessingTime;
1174                     columnId = k;
1175                 }
1176             }
1177         }
1178 
1179         // @bug2099+
1180         if (report)
1181         {
1182             oss << "; selected colId: " << columnId;
1183 
1184             if (columnId != -1)
1185                 oss << "; maxTime: " << maxTime;
1186 
1187             oss << endl;
1188 
1189             if (!BulkLoad::disableConsoleOutput())
1190             {
1191                 cout << oss.str();
1192                 cout.flush();
1193             }
1194         }
1195 
1196         // @bug2099-
1197 
1198         if (columnId == -1) return -1;
1199 
1200         if (fBuffers[bufferId].tryAndLockColumn(columnId, id))
1201         {
1202             return columnId;
1203         }
1204     }
1205 }
1206 
1207 //------------------------------------------------------------------------------
1208 // Check if the specified buffer is ready for parsing (status == READ_COMPLETE)
1209 // @bug 2099.  Temporary hack to diagnose deadlock.  Added report parm
1210 //             and couts below.
1211 //------------------------------------------------------------------------------
bufferReadyForParse(const int & bufferId,bool report) const1212 bool TableInfo::bufferReadyForParse(const int& bufferId, bool report) const
1213 {
1214     if (fBuffers.size() == 0)
1215         return false;
1216 
1217     Status stat = fBuffers[bufferId].getStatusBLB();
1218 
1219     if (report)
1220     {
1221         ostringstream oss;
1222         string bufStatusStr;
1223         ColumnInfo::convertStatusToString( stat,
1224                                            bufStatusStr );
1225 #ifdef _MSC_VER
1226         oss << " --- " << GetCurrentThreadId() <<
1227 #else
1228         oss << " --- " << pthread_self() <<
1229 #endif
1230             ":fBuffers[" << bufferId << "]=" << bufStatusStr <<
1231             " (" << stat << ")" << std::endl;
1232         cout << oss.str();
1233     }
1234 
1235     return (stat == WriteEngine::READ_COMPLETE) ? true : false;
1236 }
1237 
1238 //------------------------------------------------------------------------------
1239 // Create the specified number (noOfBuffer) of BulkLoadBuffer objects and store
1240 // them in fBuffers.  jobFieldRefList lists the fields in this import.
1241 // fixedBinaryRecLen is fixed record length for binary imports (it is n/a
1242 // for text bulk loads).
1243 //------------------------------------------------------------------------------
initializeBuffers(int noOfBuffers,const JobFieldRefList & jobFieldRefList,unsigned int fixedBinaryRecLen)1244 int TableInfo::initializeBuffers(int   noOfBuffers,
1245                                  const JobFieldRefList& jobFieldRefList,
1246                                  unsigned int fixedBinaryRecLen)
1247 {
1248 #ifdef _MSC_VER
1249 
1250     //@bug 3751
1251     //When reading from STDIN, Windows doesn't like the huge default buffer of
1252     //  1M, so turn it down.
1253     if (fReadFromStdin)
1254     {
1255         fBufferSize = std::min(10240, fBufferSize);
1256     }
1257 
1258 #endif
1259 
1260     fReadBufCount = noOfBuffers;
1261 
1262     // initialize and populate the buffer vector.
1263     for (int i = 0; i < fReadBufCount; ++i)
1264     {
1265         BulkLoadBuffer* buffer = new BulkLoadBuffer(fNumberOfColumns,
1266                 fBufferSize, fLog,
1267                 i, fTableName,
1268                 jobFieldRefList);
1269         buffer->setColDelimiter  (fColDelim);
1270         buffer->setNullStringMode(fNullStringMode);
1271         buffer->setEnclosedByChar(fEnclosedByChar);
1272         buffer->setEscapeChar    (fEscapeChar    );
1273         buffer->setTruncationAsError(getTruncationAsError());
1274         buffer->setImportDataMode(fImportDataMode,
1275                                   fixedBinaryRecLen);
1276         buffer->setTimeZone(fTimeZone);
1277         fBuffers.push_back(buffer);
1278     }
1279     if (!fS3Key.empty())
1280     {
1281         ms3_library_init();
1282         ms3 = ms3_init(fS3Key.c_str(), fS3Secret.c_str(), fS3Region.c_str(), fS3Host.c_str());
1283         if (!ms3)
1284         {
1285             ostringstream oss;
1286             oss << "Error initiating S3 library";
1287             fLog->logMsg( oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR );
1288             return ERR_FILE_OPEN;
1289         }
1290     }
1291     return 0;
1292 }
1293 
1294 //------------------------------------------------------------------------------
1295 // Add the specified ColumnInfo object (info) into this table's fColumns vector.
1296 //------------------------------------------------------------------------------
addColumn(ColumnInfo * info)1297 void TableInfo::addColumn(ColumnInfo* info)
1298 {
1299     fColumns.push_back(info);
1300     fNumberOfColumns = fColumns.size();
1301 
1302     fExtentStrAlloc.addColumn( info->column.mapOid,
1303                                info->column.width );
1304 }
1305 
1306 //------------------------------------------------------------------------------
1307 // Open the file corresponding to fFileName so that we can import it's contents.
1308 // A buffer is also allocated and passed to setvbuf().
1309 // If fReadFromStdin is true, we just assign stdin to our fHandle for reading.
1310 //------------------------------------------------------------------------------
openTableFile()1311 int TableInfo::openTableFile()
1312 {
1313     if (fHandle != NULL)
1314         return NO_ERROR;
1315 
1316     if (fReadFromStdin)
1317     {
1318         fHandle = stdin;
1319 
1320 #ifdef _MSC_VER
1321 
1322         // If this is a binary import from STDIN, then set stdin to binary
1323         if (fImportDataMode != IMPORT_DATA_TEXT)
1324             _setmode(_fileno(stdin), _O_BINARY);
1325 
1326         fFileBuffer = 0;
1327 #else
1328         // Not 100% sure that calling setvbuf on stdin does much, but in
1329         // some tests, it made a slight difference.
1330         fFileBuffer = new char[fFileBufSize];
1331         setvbuf(fHandle, fFileBuffer, _IOFBF, fFileBufSize);
1332 #endif
1333         ostringstream oss;
1334         oss << BOLD_START << "Reading input from STDIN to import into table " <<
1335             fTableName << "..." << BOLD_STOP;
1336         fLog->logMsg( oss.str(), MSGLVL_INFO1 );
1337     }
1338     else if (fReadFromS3)
1339     {
1340         int res;
1341         res = ms3_get(ms3, fS3Bucket.c_str(), fFileName.c_str(), (uint8_t**)&fFileBuffer, &fS3ReadLength);
1342         fS3ParseLength = 0;
1343         if (res)
1344         {
1345             ostringstream oss;
1346             oss << "Error retrieving file " << fFileName << " from S3: ";
1347             if (ms3_server_error(ms3))
1348             {
1349                 oss << ms3_server_error(ms3);
1350             }
1351             else
1352             {
1353                 oss << ms3_error(res);
1354             }
1355             fLog->logMsg( oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR );
1356             return ERR_FILE_OPEN;
1357         }
1358     }
1359     else
1360     {
1361         if (fImportDataMode == IMPORT_DATA_TEXT)
1362             fHandle = fopen( fFileName.c_str(), "r" );
1363         else
1364             fHandle = fopen( fFileName.c_str(), "rb" );
1365 
1366         if (fHandle == NULL)
1367         {
1368             int errnum = errno;
1369             ostringstream oss;
1370             oss << "Error opening import file " << fFileName << ". " <<
1371                 strerror(errnum);
1372             fLog->logMsg( oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR );
1373 
1374             // return an error; caller should set fStatusTI if needed
1375             return ERR_FILE_OPEN;
1376         }
1377 
1378         // now the input load file is available for reading the data.
1379         // read the data from the load file into the buffers.
1380         fFileBuffer = new char[fFileBufSize];
1381         setvbuf(fHandle, fFileBuffer, _IOFBF, fFileBufSize);
1382 
1383         ostringstream oss;
1384         oss << "Opening " << fFileName << " to import into table " << fTableName;
1385         fLog->logMsg( oss.str(), MSGLVL_INFO2 );
1386     }
1387 
1388     return NO_ERROR;
1389 }
1390 
1391 //------------------------------------------------------------------------------
1392 // Close the current open file we have been importing.
1393 //------------------------------------------------------------------------------
closeTableFile()1394 void TableInfo::closeTableFile()
1395 {
1396     if (fHandle)
1397     {
1398         // If reading from stdin, we don't delete the buffer out from under
1399         // the file handle, because stdin is still open.  This will cause a
1400         // memory leak, but when using stdin, we can only read in 1 table.
1401         // So it's not like we will be leaking multiple buffers for several
1402         // tables over the life of the job.
1403         if (!fReadFromStdin)
1404         {
1405             fclose(fHandle);
1406             delete [] fFileBuffer;
1407         }
1408 
1409         fHandle = 0;
1410     }
1411     else if (ms3)
1412     {
1413         ms3_free((uint8_t*)fFileBuffer);
1414     }
1415 }
1416 
1417 //------------------------------------------------------------------------------
1418 // "Grabs" the current read buffer for TableInfo so that the read thread that
1419 // is calling this function, can read the next buffer's set of data.
1420 //------------------------------------------------------------------------------
1421 // @bug2099. Temporary hack to diagnose deadlock.
1422 // Added report parm and couts below.
isBufferAvailable(bool report)1423 bool TableInfo::isBufferAvailable(bool report)
1424 {
1425     boost::mutex::scoped_lock lock(fSyncUpdatesTI);
1426     Status bufferStatus = fBuffers[fCurrentReadBuffer].getStatusBLB();
1427 
1428     if ( (bufferStatus == WriteEngine::PARSE_COMPLETE) ||
1429             (bufferStatus == WriteEngine::NEW) )
1430     {
1431         // reset buffer status and column locks while we have
1432         // an fSyncUpdatesTI lock
1433         fBuffers[fCurrentReadBuffer].setStatusBLB(
1434             WriteEngine::READ_PROGRESS);
1435         fBuffers[fCurrentReadBuffer].resetColumnLocks();
1436         return true;
1437     }
1438 
1439     if (report)
1440     {
1441         ostringstream oss;
1442         string bufferStatusStr;
1443         ColumnInfo::convertStatusToString( bufferStatus, bufferStatusStr );
1444         oss << "  Buffer status is " << bufferStatusStr << ". " << endl;
1445         oss << "  fCurrentReadBuffer is " << fCurrentReadBuffer << endl;
1446         cout << oss.str();
1447         cout.flush();
1448     }
1449 
1450     return false;
1451 }
1452 
1453 //------------------------------------------------------------------------------
1454 // Report whether rows were rejected, and if so, then list them out into the
1455 // reject file.
1456 //------------------------------------------------------------------------------
writeBadRows(const std::vector<std::string> * errorDatRows,bool bCloseFile)1457 void TableInfo::writeBadRows( const std::vector<std::string>* errorDatRows,
1458                               bool bCloseFile )
1459 {
1460     size_t errorDatRowsCount = 0;
1461 
1462     if (errorDatRows)
1463         errorDatRowsCount = errorDatRows->size();
1464 
1465     if (errorDatRowsCount > 0)
1466     {
1467         if (!fRejectDataFile.is_open())
1468         {
1469             ostringstream rejectFileName;
1470 
1471             if (fErrorDir.size() > 0)
1472             {
1473 #ifdef _MSC_VER
1474                 char filename[_MAX_FNAME];
1475                 char ext[_MAX_EXT];
1476                 _splitpath(const_cast<char*>(getFileName().c_str()),
1477                            NULL, NULL, filename, ext);
1478                 rejectFileName << fErrorDir << "\\" << filename << ext;
1479 #else
1480                 rejectFileName << fErrorDir << basename(getFileName().c_str());
1481 #endif
1482             }
1483             else
1484             {
1485                 if (fReadFromS3)
1486                 {
1487                     rejectFileName << basename(getFileName().c_str());
1488                 }
1489                 else
1490                 {
1491                     rejectFileName << getFileName();
1492                 }
1493             }
1494 
1495             rejectFileName << ".Job_" << fJobId <<
1496                            '_' << ::getpid() << BAD_FILE_SUFFIX;
1497             fRejectDataFileName = rejectFileName.str();
1498             fRejectDataFile.open( rejectFileName.str().c_str(),
1499                                   ofstream::out );
1500 
1501             if ( !fRejectDataFile )
1502             {
1503                 ostringstream oss;
1504                 oss << "Unable to create file: " << rejectFileName.str() <<
1505                     ";  Check permission.";
1506                 fLog->logMsg(oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR);
1507 
1508                 return;
1509             }
1510         }
1511 
1512         for (std::vector<string>::const_iterator iter = errorDatRows->begin();
1513                 iter != errorDatRows->end(); ++iter)
1514         {
1515             fRejectDataFile << *iter;
1516         }
1517 
1518         fRejectDataCnt += errorDatRowsCount;
1519     }
1520 
1521     if (bCloseFile)
1522     {
1523         if (fRejectDataFile.is_open())
1524             fRejectDataFile.close();
1525 
1526         fRejectDataFile.clear();
1527 
1528         if (fRejectDataCnt > 0)
1529         {
1530             ostringstream oss;
1531             std::string rejectFileNameToLog;
1532 
1533             // Construct/report complete file name and save in list of files
1534             boost::filesystem::path p(fRejectDataFileName);
1535 
1536             if (!p.has_root_path())
1537             {
1538                 // We could fail here having fixed size buffer
1539                 char cwdPath[4096];
1540                 char* buffPtr = &cwdPath[0];
1541                 buffPtr = getcwd(cwdPath, sizeof(cwdPath));
1542                 boost::filesystem::path rejectFileName2( buffPtr );
1543                 rejectFileName2 /= fRejectDataFileName;
1544                 fBadFiles.push_back( rejectFileName2.string() );
1545 
1546                 rejectFileNameToLog = rejectFileName2.string();
1547             }
1548             else
1549             {
1550                 fBadFiles.push_back( fRejectDataFileName );
1551 
1552                 rejectFileNameToLog = fRejectDataFileName;
1553             }
1554 
1555             oss << "Number of rows with bad data = " << fRejectDataCnt <<
1556                 ".  Exact rows are listed in file located here: " <<
1557                 fErrorDir;
1558             fLog->logMsg(oss.str(), MSGLVL_INFO1);
1559 
1560             fRejectDataCnt = 0;
1561         }
1562     }
1563 }
1564 
1565 //------------------------------------------------------------------------------
1566 // Report whether rows were rejected, and if so, then list out the row numbers
1567 // and error reasons into the error file.
1568 //------------------------------------------------------------------------------
writeErrReason(const std::vector<std::pair<RID,string>> * errorRows,bool bCloseFile)1569 void  TableInfo::writeErrReason( const std::vector< std::pair<RID,
1570                                  string> >* errorRows,
1571                                  bool bCloseFile )
1572 {
1573     size_t errorRowsCount = 0;
1574 
1575     if (errorRows)
1576         errorRowsCount = errorRows->size();
1577 
1578     if (errorRowsCount > 0)
1579     {
1580         if (!fRejectErrFile.is_open())
1581         {
1582             ostringstream errFileName;
1583 
1584             if (fErrorDir.size() > 0)
1585             {
1586 #ifdef _MSC_VER
1587                 char filename[_MAX_FNAME];
1588                 char ext[_MAX_EXT];
1589                 _splitpath(const_cast<char*>(getFileName().c_str()),
1590                            NULL, NULL, filename, ext);
1591                 errFileName << fErrorDir << "\\" << filename << ext;
1592 #else
1593                 errFileName << fErrorDir << basename(getFileName().c_str());
1594 #endif
1595             }
1596             else
1597             {
1598                 if (fReadFromS3)
1599                 {
1600                     errFileName << basename(getFileName().c_str());
1601                 }
1602                 else
1603                 {
1604                     errFileName << getFileName();
1605                 }
1606             }
1607 
1608             errFileName << ".Job_" << fJobId <<
1609                         '_' << ::getpid() << ERR_FILE_SUFFIX;
1610             fRejectErrFileName = errFileName.str();
1611             fRejectErrFile.open( errFileName.str().c_str(),
1612                                  ofstream::out );
1613 
1614             if ( !fRejectErrFile )
1615             {
1616                 ostringstream oss;
1617                 oss << "Unable to create file: " << errFileName.str() <<
1618                     ";  Check permission.";
1619                 fLog->logMsg(oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR);
1620 
1621                 return;
1622             }
1623         }
1624 
1625         for (std::vector< std::pair<RID, std::string> >::const_iterator iter =
1626                     errorRows->begin();
1627                 iter != errorRows->end(); ++iter)
1628         {
1629             fRejectErrFile << "Line number " << iter->first <<
1630                            ";  Error: " << iter->second << endl;
1631         }
1632 
1633         fRejectErrCnt += errorRowsCount;
1634     }
1635 
1636     if (bCloseFile)
1637     {
1638         if (fRejectErrFile.is_open())
1639             fRejectErrFile.close();
1640 
1641         fRejectErrFile.clear();
1642 
1643         if (fRejectErrCnt > 0)
1644         {
1645             ostringstream oss;
1646             std::string errFileNameToLog;
1647 
1648             // Construct/report complete file name and save in list of files
1649             boost::filesystem::path p(fRejectErrFileName);
1650 
1651             if (!p.has_root_path())
1652             {
1653                 char cwdPath[4096];
1654                 char* buffPtr = &cwdPath[0];
1655                 buffPtr = getcwd(cwdPath, sizeof(cwdPath));
1656                 boost::filesystem::path errFileName2( buffPtr );
1657                 errFileName2 /= fRejectErrFileName;
1658                 fErrFiles.push_back( errFileName2.string() );
1659 
1660                 errFileNameToLog = errFileName2.string();
1661             }
1662             else
1663             {
1664                 fErrFiles.push_back( fRejectErrFileName );
1665 
1666                 errFileNameToLog = fRejectErrFileName;
1667             }
1668 
1669             oss << "Number of rows with errors = " << fRejectDataCnt <<
1670                 ".  Exact rows are listed in file located here: " <<
1671                 fErrorDir;
1672             fLog->logMsg(oss.str(), MSGLVL_INFO1);
1673 
1674             fRejectErrCnt = 0;
1675         }
1676     }
1677 }
1678 
1679 //------------------------------------------------------------------------------
1680 // Logs "Bulkload |Job" message along with the specified message text
1681 // (messageText) to the critical log.
1682 //------------------------------------------------------------------------------
logToDataMods(const string & jobFile,const string & messageText)1683 void TableInfo::logToDataMods(const string& jobFile, const string&  messageText)
1684 {
1685     logging::Message::Args args;
1686 
1687     unsigned subsystemId = 19; // writeengine
1688 
1689     logging::LoggingID loggingId(subsystemId, 0, fTxnID.id, 0);
1690     logging::MessageLog messageLog(loggingId, LOG_LOCAL1);
1691 
1692     logging::Message m(8);
1693     args.add("Bulkload |Job: " + jobFile);
1694     args.add("|" + messageText);
1695     m.format(args);
1696     messageLog.logInfoMessage(m);
1697 }
1698 
1699 //------------------------------------------------------------------------------
1700 // Acquires DB table lock for this TableInfo object.
1701 // Function employs retry logic based on the SystemConfig/WaitPeriod.
1702 //------------------------------------------------------------------------------
acquireTableLock(bool disableTimeOut)1703 int TableInfo::acquireTableLock( bool disableTimeOut )
1704 {
1705     // Save DBRoot list at start of job; used to compare at EOJ.
1706     Config::getRootIdList( fOrigDbRootIds );
1707 
1708     // If executing distributed (mode1) or central command (mode2) then
1709     // don't worry about table locks.  The client front-end will manage locks.
1710     if ((fBulkMode == BULK_MODE_REMOTE_SINGLE_SRC) ||
1711             (fBulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC))
1712     {
1713         if (fLog->isDebug( DEBUG_1 ))
1714         {
1715             ostringstream oss;
1716             oss << "Bypass acquiring table lock in distributed mode, "
1717                 "for table" << fTableName << "; OID-" << fTableOID;
1718             fLog->logMsg( oss.str(), MSGLVL_INFO2 );
1719         }
1720 
1721         return NO_ERROR;
1722     }
1723 
1724     const int SLEEP_INTERVAL    = 100; // sleep 100 milliseconds between checks
1725     const int NUM_TRIES_PER_SEC = 10;  // try 10 times per second
1726 
1727     int waitSeconds     = Config::getWaitPeriod();
1728     const int NUM_TRIES = NUM_TRIES_PER_SEC * waitSeconds;
1729     std::string tblLockErrMsg;
1730 
1731     // Retry loop to lock the db table associated with this TableInfo object
1732     std::string processName;
1733     uint32_t   processId;
1734     int32_t     sessionId;
1735     int32_t     transId;
1736     ostringstream pmModOss;
1737     pmModOss << " (pm" << Config::getLocalModuleID() << ')';
1738     bool timeout = false;
1739     //for (int i=0; i<NUM_TRIES; i++)
1740     int try_count = 0;
1741 
1742     while (!timeout)
1743     {
1744         processName = fProcessName;
1745         processName += pmModOss.str();
1746         processId   = ::getpid();
1747         sessionId   = -1;
1748         transId     = -1;
1749         int rc = BRMWrapper::getInstance()->getTableLock (
1750                      fTableOID,
1751                      processName,
1752                      processId,
1753                      sessionId,
1754                      transId,
1755                      fTableLockID,
1756                      tblLockErrMsg);
1757 
1758         if ((rc == NO_ERROR) && (fTableLockID > 0))
1759         {
1760             fTableLocked = true;
1761 
1762             if (fLog->isDebug( DEBUG_1 ))
1763             {
1764                 ostringstream oss;
1765                 oss << "Table lock acquired for table " << fTableName <<
1766                     "; OID-" << fTableOID << "; lockID-" << fTableLockID;
1767                 fLog->logMsg( oss.str(), MSGLVL_INFO2 );
1768             }
1769 
1770             return NO_ERROR;
1771         }
1772         else if (fTableLockID == 0)
1773         {
1774             // sleep and then go back and try getting table lock again
1775             sleepMS(SLEEP_INTERVAL);
1776 
1777             if (fLog->isDebug( DEBUG_1 ))
1778             {
1779                 ostringstream oss;
1780                 oss << "Retrying to acquire table lock for table " <<
1781                     fTableName << "; OID-" << fTableOID;
1782                 fLog->logMsg( oss.str(), MSGLVL_INFO2 );
1783             }
1784         }
1785         else
1786         {
1787             ostringstream oss;
1788             oss << "Error in acquiring table lock for table " << fTableName <<
1789                 "; OID-" << fTableOID << "; " << tblLockErrMsg;
1790             fLog->logMsg( oss.str(), rc, MSGLVL_ERROR );
1791 
1792             return rc;
1793         }
1794 
1795         // if disableTimeOut is set then no timeout for table lock. Forever wait....
1796         timeout = (disableTimeOut ? false : (++try_count >= NUM_TRIES));
1797     }
1798 
1799     ostringstream oss;
1800     oss << "Unable to acquire lock for table " << fTableName <<
1801         "; OID-" << fTableOID << "; table currently locked by process-" <<
1802         processName << "; pid-" << processId <<
1803         "; session-" << sessionId <<
1804         "; txn-" << transId;
1805     fLog->logMsg( oss.str(), ERR_TBLLOCK_GET_LOCK_LOCKED, MSGLVL_ERROR );
1806 
1807     return ERR_TBLLOCK_GET_LOCK_LOCKED;
1808 }
1809 
1810 //------------------------------------------------------------------------------
1811 // Change table lock state (to cleanup)
1812 //------------------------------------------------------------------------------
changeTableLockState()1813 int TableInfo::changeTableLockState( )
1814 {
1815     // If executing distributed (mode1) or central command (mode2) then
1816     // don't worry about table locks.  The client front-end will manage locks.
1817     if ((fBulkMode == BULK_MODE_REMOTE_SINGLE_SRC) ||
1818             (fBulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC))
1819     {
1820         return NO_ERROR;
1821     }
1822 
1823     std::string tblLockErrMsg;
1824     bool bChanged = false;
1825 
1826     int rc = BRMWrapper::getInstance()->changeTableLockState (
1827                  fTableLockID,
1828                  BRM::CLEANUP,
1829                  bChanged,
1830                  tblLockErrMsg );
1831 
1832     if (rc == NO_ERROR)
1833     {
1834         if (fLog->isDebug( DEBUG_1 ))
1835         {
1836             ostringstream oss;
1837 
1838             if (bChanged)
1839             {
1840                 oss << "Table lock state changed to CLEANUP for table " <<
1841                     fTableName <<
1842                     "; OID-" << fTableOID <<
1843                     "; lockID-" << fTableLockID;
1844             }
1845             else
1846             {
1847                 oss << "Table lock state not changed to CLEANUP for table " <<
1848                     fTableName <<
1849                     "; OID-"    << fTableOID <<
1850                     "; lockID-" << fTableLockID <<
1851                     ".  Table lot locked.";
1852             }
1853 
1854             fLog->logMsg( oss.str(), MSGLVL_INFO2 );
1855         }
1856     }
1857     else
1858     {
1859         ostringstream oss;
1860         oss << "Error in changing table state for table " << fTableName <<
1861             "; OID-"    << fTableOID    <<
1862             "; lockID-" << fTableLockID << "; " << tblLockErrMsg;
1863         fLog->logMsg( oss.str(), rc, MSGLVL_ERROR );
1864         return rc;
1865     }
1866 
1867     return NO_ERROR;
1868 }
1869 
1870 //------------------------------------------------------------------------------
1871 // Releases DB table lock assigned to this TableInfo object.
1872 //------------------------------------------------------------------------------
releaseTableLock()1873 int TableInfo::releaseTableLock( )
1874 {
1875     // If executing distributed (mode1) or central command (mode2) then
1876     // don't worry about table locks.  The client front-end will manage locks.
1877     if ((fBulkMode == BULK_MODE_REMOTE_SINGLE_SRC) ||
1878             (fBulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC))
1879     {
1880         if (fLog->isDebug( DEBUG_1 ))
1881         {
1882             ostringstream oss;
1883             oss << "Bypass releasing table lock in distributed mode, "
1884                 "for table " << fTableName << "; OID-" << fTableOID;
1885             fLog->logMsg( oss.str(), MSGLVL_INFO2 );
1886         }
1887 
1888         return NO_ERROR;
1889     }
1890 
1891     std::string tblLockErrMsg;
1892     bool bReleased = false;
1893 
1894     // Unlock the database table
1895     int rc = BRMWrapper::getInstance()->releaseTableLock (
1896                  fTableLockID,
1897                  bReleased,
1898                  tblLockErrMsg );
1899 
1900     if (rc == NO_ERROR)
1901     {
1902         fTableLocked = false;
1903 
1904         if (fLog->isDebug( DEBUG_1 ))
1905         {
1906             ostringstream oss;
1907 
1908             if (bReleased)
1909             {
1910                 oss << "Table lock released for table " << fTableName <<
1911                     "; OID-" << fTableOID <<
1912                     "; lockID-" << fTableLockID;
1913             }
1914             else
1915             {
1916                 oss << "Table lock not released for table " << fTableName <<
1917                     "; OID-"    << fTableOID <<
1918                     "; lockID-" << fTableLockID <<
1919                     ".  Table not locked.";
1920             }
1921 
1922             fLog->logMsg( oss.str(), MSGLVL_INFO2 );
1923         }
1924     }
1925     else
1926     {
1927         ostringstream oss;
1928         oss << "Error in releasing table lock for table " << fTableName <<
1929             "; OID-"    << fTableOID <<
1930             "; lockID-" << fTableLockID << "; " << tblLockErrMsg;
1931         fLog->logMsg( oss.str(), rc, MSGLVL_ERROR );
1932         return rc;
1933     }
1934 
1935     return NO_ERROR;
1936 }
1937 
1938 //------------------------------------------------------------------------------
1939 // Delete bulk rollback metadata file.
1940 //------------------------------------------------------------------------------
deleteMetaDataRollbackFile()1941 void TableInfo::deleteMetaDataRollbackFile( )
1942 {
1943     // If executing distributed (mode1) or central command (mode2) then
1944     // don't worry about table locks, or deleting meta data files.  The
1945     // client front-end will manage these tasks after all imports are finished.
1946     if ((fBulkMode == BULK_MODE_REMOTE_SINGLE_SRC) ||
1947             (fBulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC))
1948     {
1949         return;
1950     }
1951 
1952     if (!fKeepRbMetaFile)
1953     {
1954         // Treat any error as non-fatal, though we log it.
1955         try
1956         {
1957             fRBMetaWriter.deleteFile();
1958         }
1959         catch (WeException& ex)
1960         {
1961             ostringstream oss;
1962             oss << "Error deleting meta file; " << ex.what();
1963             fLog->logMsg(oss.str(), ex.errorCode(), MSGLVL_ERROR);
1964         }
1965     }
1966 }
1967 
1968 //------------------------------------------------------------------------------
1969 // Changes to "existing" DB files must be confirmed on HDFS system.
1970 // This function triggers this action.
1971 //------------------------------------------------------------------------------
1972 // @bug 5572 - Add db file confirmation for HDFS
confirmDBFileChanges()1973 int TableInfo::confirmDBFileChanges( )
1974 {
1975     // Unlike deleteTempDBFileChanges(), note that confirmDBFileChanges()
1976     // executes regardless of the import mode.  We go ahead and confirm
1977     // the file changes at the end of a successful cpimport.bin.
1978     if (idbdatafile::IDBPolicy::useHdfs())
1979     {
1980         ostringstream oss;
1981         oss << "Confirming DB file changes for " << fTableName;
1982         fLog->logMsg( oss.str(), MSGLVL_INFO2 );
1983 
1984         std::string errMsg;
1985         ConfirmHdfsDbFile confirmHdfs;
1986         int rc = confirmHdfs.confirmDbFileListFromMetaFile( fTableOID, errMsg );
1987 
1988         if (rc != NO_ERROR)
1989         {
1990             ostringstream ossErrMsg;
1991             ossErrMsg << "Unable to confirm changes to table " << fTableName <<
1992                       "; " << errMsg;
1993             fLog->logMsg( ossErrMsg.str(), rc, MSGLVL_ERROR );
1994 
1995             return rc;
1996         }
1997     }
1998 
1999     return NO_ERROR;
2000 }
2001 
2002 //------------------------------------------------------------------------------
2003 // Temporary swap files must be deleted on HDFS system.
2004 // This function triggers this action.
2005 //------------------------------------------------------------------------------
2006 // @bug 5572 - Add db file confirmation for HDFS
deleteTempDBFileChanges()2007 void TableInfo::deleteTempDBFileChanges( )
2008 {
2009     // If executing distributed (mode1) or central command (mode2) then
2010     // no action necessary.  The client front-end will initiate the deletion
2011     // of the temp files, only after all the distributed imports have
2012     // successfully completed.
2013     if ((fBulkMode == BULK_MODE_REMOTE_SINGLE_SRC) ||
2014             (fBulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC))
2015     {
2016         return;
2017     }
2018 
2019     if (idbdatafile::IDBPolicy::useHdfs())
2020     {
2021         ostringstream oss;
2022         oss << "Deleting DB temp swap files for " << fTableName;
2023         fLog->logMsg( oss.str(), MSGLVL_INFO2 );
2024 
2025         std::string errMsg;
2026         ConfirmHdfsDbFile confirmHdfs;
2027         int rc = confirmHdfs.endDbFileListFromMetaFile(fTableOID, true, errMsg);
2028 
2029         // Treat any error as non-fatal, though we log it.
2030         if (rc != NO_ERROR)
2031         {
2032             ostringstream ossErrMsg;
2033             ossErrMsg << "Unable to delete temp swap files for table " <<
2034                       fTableName << "; " << errMsg;
2035             fLog->logMsg( ossErrMsg.str(), rc, MSGLVL_ERROR );
2036         }
2037     }
2038 }
2039 
2040 //------------------------------------------------------------------------------
2041 // Validates the correctness of the current HWMs for this table.
2042 // The HWMs for all the 1 byte columns should be identical.  Same goes
2043 // for all the 2 byte columns, etc.  The 2 byte column HWMs should be
2044 // "roughly" (but not necessarily exactly) twice that of a 1 byte column.
2045 // Same goes for the 4 byte column HWMs vs their 2 byte counterparts, etc.
2046 // jobTable - table/column information to use with validation.
//            We use jobTable.colList[] (if provided) instead of data member
2048 //            fColumns, because this function is called during preprocessing,
2049 //            before TableInfo.fColumns has been initialized with data from
2050 //            colList.
2051 // segFileInfo - Vector of File objects carrying current DBRoot, partition,
2052 //            HWM, etc to be validated for the columns belonging to jobTable.
2053 // stage    - Current stage we are validating.  "Starting" or "Ending".
2054 //------------------------------------------------------------------------------
validateColumnHWMs(const JobTable * jobTable,const std::vector<DBRootExtentInfo> & segFileInfo,const char * stage)2055 int TableInfo::validateColumnHWMs(
2056     const JobTable* jobTable,
2057     const std::vector<DBRootExtentInfo>& segFileInfo,
2058     const char* stage )
2059 {
2060     int rc = NO_ERROR;
2061 
2062     // Used to track first 1-byte, 2-byte, 4-byte, and 8-byte columns in table
2063     int byte1First = -1;
2064     int byte2First = -1;
2065     int byte4First = -1;
2066     int byte8First = -1;
2067 
2068     // Make sure the HWMs for all 1-byte columns match; same for all 2-byte,
2069     // 4-byte, and 8-byte columns as well.
2070     for (unsigned k = 0; k < segFileInfo.size(); k++)
2071     {
2072         int k1 = 0;
2073 
2074         // Validate HWMs in jobTable if we have it, else use fColumns.
2075         const JobColumn& jobColK =
2076             ( (jobTable != 0) ? jobTable->colList[k] : fColumns[k].column );
2077 
2078         // Find the first 1-byte, 2-byte, 4-byte, and 8-byte columns.
2079         // Use those as our reference HWM for the respective column widths.
2080         switch ( jobColK.width )
2081         {
2082             case 1:
2083             {
2084                 if (byte1First == -1)
2085                     byte1First = k;
2086 
2087                 k1 = byte1First;
2088                 break;
2089             }
2090 
2091             case 2:
2092             {
2093                 if (byte2First == -1)
2094                     byte2First = k;
2095 
2096                 k1 = byte2First;
2097                 break;
2098             }
2099 
2100             case 4:
2101             {
2102                 if (byte4First == -1)
2103                     byte4First = k;
2104 
2105                 k1 = byte4First;
2106                 break;
2107             }
2108 
2109             case 8:
2110             default:
2111             {
2112                 if (byte8First == -1)
2113                     byte8First = k;
2114 
2115                 k1 = byte8First;
2116                 break;
2117             }
2118         } // end of switch based on column width (1,2,4, or 8)
2119 
2120         // Validate HWMs in jobTable if we have it, else use fColumns.
2121         const JobColumn& jobColK1 =
2122             ( (jobTable != 0) ? jobTable->colList[k1] : fColumns[k1].column );
2123 
2124 //std::cout << "dbg: comparing0 " << stage << " refcol-" << k1 <<
2125 //  "; wid-" << jobColK1.width << "; hwm-" << segFileInfo[k1].fLocalHwm <<
2126 //  " <to> col-" << k <<
2127 //  "; wid-" << jobColK.width << " ; hwm-"<<segFileInfo[k].fLocalHwm<<std::endl;
2128 
2129         // Validate that the HWM for this column (k) matches that of the
2130         // corresponding reference column with the same width.
2131         if ((segFileInfo[k1].fDbRoot    != segFileInfo[k].fDbRoot)    ||
2132                 (segFileInfo[k1].fPartition != segFileInfo[k].fPartition) ||
2133                 (segFileInfo[k1].fSegment   != segFileInfo[k].fSegment)   ||
2134                 (segFileInfo[k1].fLocalHwm  != segFileInfo[k].fLocalHwm))
2135         {
2136             ostringstream oss;
2137             oss << stage << " HWMs do not match for"
2138                 " OID1-"       << jobColK1.mapOid              <<
2139                 "; column-"    << jobColK1.colName             <<
2140                 "; DBRoot-"    << segFileInfo[k1].fDbRoot      <<
2141                 "; partition-" << segFileInfo[k1].fPartition   <<
2142                 "; segment-"   << segFileInfo[k1].fSegment     <<
2143                 "; hwm-"       << segFileInfo[k1].fLocalHwm    <<
2144                 "; width-"     << jobColK1.width << ':' << std::endl <<
2145                 " and OID2-"   << jobColK.mapOid               <<
2146                 "; column-"    << jobColK.colName              <<
2147                 "; DBRoot-"    << segFileInfo[k].fDbRoot       <<
2148                 "; partition-" << segFileInfo[k].fPartition    <<
2149                 "; segment-"   << segFileInfo[k].fSegment      <<
2150                 "; hwm-"       << segFileInfo[k].fLocalHwm     <<
2151                 "; width-"     << jobColK.width;
2152             fLog->logMsg( oss.str(), ERR_BRM_HWMS_NOT_EQUAL, MSGLVL_ERROR );
2153             return ERR_BRM_HWMS_NOT_EQUAL;
2154         }
2155 
2156         // HWM DBRoot, partition, and segment number should match for all
2157         // columns; so compare DBRoot, part#, and seg# with first column.
2158         if ((segFileInfo[0].fDbRoot    != segFileInfo[k].fDbRoot)    ||
2159                 (segFileInfo[0].fPartition != segFileInfo[k].fPartition) ||
2160                 (segFileInfo[0].fSegment   != segFileInfo[k].fSegment))
2161         {
2162             const JobColumn& jobCol0 =
2163                 ( (jobTable != 0) ? jobTable->colList[0] : fColumns[0].column );
2164 
2165             ostringstream oss;
2166             oss << stage << " HWM DBRoot,Part#, or Seg# do not match for"
2167                 " OID1-"       << jobCol0.mapOid               <<
2168                 "; column-"    << jobCol0.colName              <<
2169                 "; DBRoot-"    << segFileInfo[0].fDbRoot       <<
2170                 "; partition-" << segFileInfo[0].fPartition    <<
2171                 "; segment-"   << segFileInfo[0].fSegment      <<
2172                 "; hwm-"       << segFileInfo[0].fLocalHwm     <<
2173                 "; width-"     << jobCol0.width << ':' << std::endl <<
2174                 " and OID2-"   << jobColK.mapOid               <<
2175                 "; column-"    << jobColK.colName              <<
2176                 "; DBRoot-"    << segFileInfo[k].fDbRoot       <<
2177                 "; partition-" << segFileInfo[k].fPartition    <<
2178                 "; segment-"   << segFileInfo[k].fSegment      <<
2179                 "; hwm-"       << segFileInfo[k].fLocalHwm     <<
2180                 "; width-"     << jobColK.width;
2181             fLog->logMsg( oss.str(), ERR_BRM_HWMS_NOT_EQUAL, MSGLVL_ERROR );
2182             return ERR_BRM_HWMS_NOT_EQUAL;
2183         }
2184     } // end of loop to compare all 1-byte HWMs, 2-byte HWMs, etc.
2185 
2186     // Validate/compare HWM for 1-byte column in relation to 2-byte column, etc.
2187     // Without knowing the exact row count, we can't extrapolate the exact HWM
2188     // for the wider column, but we can narrow it down to an expected range.
2189     int refCol = 0;
2190     int colIdx = 0;
2191 
2192 //if (byte1First >= 0)
2193 //  std::cout << "dbg: cross compare1 " << stage << " col-" << byte1First <<
2194 //  "; wid-" << ( (jobTable != 0) ? jobTable->colList[byte1First].width :
2195 //                                  fColumns[byte1First].column.width ) <<
2196 //  "; hwm-" << segFileInfo[byte1First].fLocalHwm << std::endl;
2197 
2198 //if (byte2First >= 0)
2199 //  std::cout << "dbg: cross compare2 " << stage << " col-" << byte2First <<
2200 //  "; wid-" << ( (jobTable != 0) ? jobTable->colList[byte2First].width :
2201 //                                  fColumns[byte2First].column.width ) <<
2202 //  "; hwm-" << segFileInfo[byte2First].fLocalHwm << std::endl;
2203 
2204 //if (byte4First >= 0)
2205 //  std::cout << "dbg: cross compare4 " << stage << " col-" << byte4First <<
2206 //  "; wid-" << ( (jobTable != 0) ? jobTable->colList[byte4First].width :
2207 //                                  fColumns[byte4First].column.width ) <<
2208 //  "; hwm-" << segFileInfo[byte4First].fLocalHwm << std::endl;
2209 
2210 //if (byte8First >= 0)
2211 //  std::cout << "dbg: cross compare8 " << stage << " col-" << byte8First <<
2212 //  "; wid-" << ( (jobTable != 0) ? jobTable->colList[byte8First].width :
2213 //                                  fColumns[byte8First].column.width ) <<
2214 //  "; hwm-" << segFileInfo[byte8First].fLocalHwm << std::endl;
2215 
2216     // Validate/compare HWMs given a 1-byte column as a starting point
2217     if (byte1First >= 0)
2218     {
2219         refCol = byte1First;
2220 
2221         if (byte2First >= 0)
2222         {
2223             HWM hwmLo = segFileInfo[byte1First].fLocalHwm * 2;
2224             HWM hwmHi = hwmLo + 1;
2225 
2226             if ((segFileInfo[byte2First].fLocalHwm < hwmLo) ||
2227                     (segFileInfo[byte2First].fLocalHwm > hwmHi))
2228             {
2229                 colIdx = byte2First;
2230                 rc     = ERR_BRM_HWMS_OUT_OF_SYNC;
2231                 goto errorCheck;
2232             }
2233         }
2234 
2235         if (byte4First >= 0)
2236         {
2237             HWM hwmLo = segFileInfo[byte1First].fLocalHwm * 4;
2238             HWM hwmHi = hwmLo + 3;
2239 
2240             if ((segFileInfo[byte4First].fLocalHwm < hwmLo) ||
2241                     (segFileInfo[byte4First].fLocalHwm > hwmHi))
2242             {
2243                 colIdx = byte4First;
2244                 rc     = ERR_BRM_HWMS_OUT_OF_SYNC;
2245                 goto errorCheck;
2246             }
2247         }
2248 
2249         if (byte8First >= 0)
2250         {
2251             HWM hwmLo = segFileInfo[byte1First].fLocalHwm * 8;
2252             HWM hwmHi = hwmLo + 7;
2253 
2254             if ((segFileInfo[byte8First].fLocalHwm < hwmLo) ||
2255                     (segFileInfo[byte8First].fLocalHwm > hwmHi))
2256             {
2257                 colIdx = byte8First;
2258                 rc     = ERR_BRM_HWMS_OUT_OF_SYNC;
2259                 goto errorCheck;
2260             }
2261         }
2262     }
2263 
2264     // Validate/compare HWMs given a 2-byte column as a starting point
2265     if (byte2First >= 0)
2266     {
2267         refCol = byte2First;
2268 
2269         if (byte4First >= 0)
2270         {
2271             HWM hwmLo = segFileInfo[byte2First].fLocalHwm * 2;
2272             HWM hwmHi = hwmLo + 1;
2273 
2274             if ((segFileInfo[byte4First].fLocalHwm < hwmLo) ||
2275                     (segFileInfo[byte4First].fLocalHwm > hwmHi))
2276             {
2277                 colIdx = byte4First;
2278                 rc     = ERR_BRM_HWMS_OUT_OF_SYNC;
2279                 goto errorCheck;
2280             }
2281         }
2282 
2283         if (byte8First >= 0)
2284         {
2285             HWM hwmLo = segFileInfo[byte2First].fLocalHwm * 4;
2286             HWM hwmHi = hwmLo + 3;
2287 
2288             if ((segFileInfo[byte8First].fLocalHwm < hwmLo) ||
2289                     (segFileInfo[byte8First].fLocalHwm > hwmHi))
2290             {
2291                 colIdx = byte8First;
2292                 rc     = ERR_BRM_HWMS_OUT_OF_SYNC;
2293                 goto errorCheck;
2294             }
2295         }
2296     }
2297 
2298     // Validate/compare HWMs given a 4-byte column as a starting point
2299     if (byte4First >= 0)
2300     {
2301         refCol = byte4First;
2302 
2303         if (byte8First >= 0)
2304         {
2305             HWM hwmLo = segFileInfo[byte4First].fLocalHwm * 2;
2306             HWM hwmHi = hwmLo + 1;
2307 
2308             if ((segFileInfo[byte8First].fLocalHwm < hwmLo) ||
2309                     (segFileInfo[byte8First].fLocalHwm > hwmHi))
2310             {
2311                 colIdx = byte8First;
2312                 rc     = ERR_BRM_HWMS_OUT_OF_SYNC;
2313                 goto errorCheck;
2314             }
2315         }
2316     }
2317 
2318 // To avoid repeating this message 6 times in the preceding source code, we
2319 // use the "dreaded" goto to branch to this single place for error handling.
2320 errorCheck:
2321 
2322     if (rc != NO_ERROR)
2323     {
2324         const JobColumn& jobColRef = ( (jobTable != 0) ?
2325                                        jobTable->colList[refCol] : fColumns[refCol].column );
2326         const JobColumn& jobColIdx = ( (jobTable != 0) ?
2327                                        jobTable->colList[colIdx] : fColumns[colIdx].column );
2328 
2329         ostringstream oss;
2330         oss << stage << " HWMs are not in sync for"
2331             " OID1-"       << jobColRef.mapOid                 <<
2332             "; column-"    << jobColRef.colName                <<
2333             "; DBRoot-"    << segFileInfo[refCol].fDbRoot      <<
2334             "; partition-" << segFileInfo[refCol].fPartition   <<
2335             "; segment-"   << segFileInfo[refCol].fSegment     <<
2336             "; hwm-"       << segFileInfo[refCol].fLocalHwm    <<
2337             "; width-"     << jobColRef.width << ':' << std::endl <<
2338             " and OID2-"   << jobColIdx.mapOid                 <<
2339             "; column-"    << jobColIdx.colName                <<
2340             "; DBRoot-"    << segFileInfo[colIdx].fDbRoot      <<
2341             "; partition-" << segFileInfo[colIdx].fPartition   <<
2342             "; segment-"   << segFileInfo[colIdx].fSegment     <<
2343             "; hwm-"       << segFileInfo[colIdx].fLocalHwm    <<
2344             "; width-"     << jobColIdx.width;
2345         fLog->logMsg( oss.str(), rc, MSGLVL_ERROR );
2346     }
2347 
2348     return rc;
2349 }
2350 
2351 //------------------------------------------------------------------------------
2352 // DESCRIPTION:
2353 //    Initialize the bulk rollback metadata writer for this table.
2354 // RETURN:
2355 //    NO_ERROR if success
2356 //    other if fail
2357 //------------------------------------------------------------------------------
initBulkRollbackMetaData()2358 int TableInfo::initBulkRollbackMetaData( )
2359 {
2360     int rc = NO_ERROR;
2361 
2362     try
2363     {
2364         fRBMetaWriter.init( fTableOID, fTableName );
2365     }
2366     catch (WeException& ex)
2367     {
2368         fLog->logMsg(ex.what(), ex.errorCode(), MSGLVL_ERROR);
2369         rc = ex.errorCode();
2370     }
2371 
2372     return rc;
2373 }
2374 
2375 //------------------------------------------------------------------------------
2376 // DESCRIPTION:
2377 //    Saves snapshot of extentmap into a bulk rollback meta data file, for
2378 //    use in a bulk rollback, if the current cpimport.bin job should fail.
2379 //    The source code in RBMetaWriter::saveBulkRollbackMetaData() used to
2380 //    reside in this TableInfo function.  But much of the source code was
2381 //    factored out to create RBMetaWriter::saveBulkRollbackMetaData(), so
2382 //    that the function would reside in the shared library for reuse by DML.
2383 // PARAMETERS:
2384 //    job - current job
2385 //    segFileInfo - Vector of File objects carrying starting DBRoot, partition,
2386 //                  etc, for each column belonging to tableNo.
2387 //    dbRootHWMInfoVecCol - vector of last local HWM info for each DBRoot
2388 //        (asssigned to current PM) for each column in "this" table.
2389 // RETURN:
2390 //    NO_ERROR if success
2391 //    other if fail
2392 //------------------------------------------------------------------------------
saveBulkRollbackMetaData(Job & job,const std::vector<DBRootExtentInfo> & segFileInfo,const std::vector<BRM::EmDbRootHWMInfo_v> & dbRootHWMInfoVecCol)2393 int TableInfo::saveBulkRollbackMetaData( Job& job,
2394         const std::vector<DBRootExtentInfo>& segFileInfo,
2395         const std::vector<BRM::EmDbRootHWMInfo_v>& dbRootHWMInfoVecCol )
2396 {
2397     int rc = NO_ERROR;
2398 
2399     std::vector<Column> cols;
2400     std::vector<OID>    dctnryOids;
2401 
2402     // Loop through the columns in the specified table
2403     for ( size_t i = 0; i < job.jobTableList[fTableId].colList.size(); i++ )
2404     {
2405         JobColumn& jobCol = job.jobTableList[fTableId].colList[i];
2406 
2407         Column col;
2408         col.colNo               = i;
2409         col.colWidth            = jobCol.width;
2410         col.colType             = jobCol.weType;
2411         col.colDataType         = jobCol.dataType;
2412         col.dataFile.oid        = jobCol.mapOid;
2413         col.dataFile.fid        = jobCol.mapOid;
2414         col.dataFile.hwm        = segFileInfo[i].fLocalHwm;   // starting HWM
2415         col.dataFile.pFile      = 0;
2416         col.dataFile.fPartition = segFileInfo[i].fPartition;  // starting Part#
2417         col.dataFile.fSegment   = segFileInfo[i].fSegment;    // starting seg#
2418         col.dataFile.fDbRoot    = segFileInfo[i].fDbRoot;     // starting DBRoot
2419         col.compressionType     = jobCol.compressionType;
2420         cols.push_back( col );
2421 
2422         OID dctnryOid = 0;
2423 
2424         if (jobCol.colType == COL_TYPE_DICT)
2425             dctnryOid = jobCol.dctnry.dctnryOid;
2426 
2427         dctnryOids.push_back( dctnryOid );
2428 
2429     }   // end of loop through columns
2430 
2431     fRBMetaWriter.setUIDGID(this);
2432 
2433     try
2434     {
2435         fRBMetaWriter.saveBulkRollbackMetaData(
2436             cols,
2437             dctnryOids,
2438             dbRootHWMInfoVecCol );
2439     }
2440     catch (WeException& ex)
2441     {
2442         fLog->logMsg(ex.what(), ex.errorCode(), MSGLVL_ERROR);
2443         rc = ex.errorCode();
2444     }
2445 
2446     return rc;
2447 }
2448 
2449 //------------------------------------------------------------------------------
2450 // Synchronize system catalog auto-increment next value with BRM.
2451 // This function is called at the end of normal processing to get the system
2452 // catalog back in line with the latest auto increment next value generated by
2453 // BRM.
2454 //------------------------------------------------------------------------------
synchronizeAutoInc()2455 int TableInfo::synchronizeAutoInc( )
2456 {
2457     for (unsigned i = 0; i < fColumns.size(); ++i)
2458     {
2459         if (fColumns[i].column.autoIncFlag)
2460         {
2461             // TBD: Do we rollback flush cache error for autoinc.
2462             // Not sure we should bail out and rollback on a
2463             // ERR_BLKCACHE_FLUSH_LIST error, but we currently
2464             // rollback for "any" updateNextValue() error
2465             int rc = fColumns[i].finishAutoInc( );
2466 
2467             if (rc != NO_ERROR)
2468             {
2469                 return rc;
2470             }
2471 
2472             break; // okay to break; only 1 autoinc column per table
2473         }
2474     }
2475 
2476     return NO_ERROR;
2477 }
2478 
2479 //------------------------------------------------------------------------------
2480 // Rollback changes made to "this" table by the current import job, delete the
2481 // meta-data files, and release the table lock.  This function only applies to
2482 // mode3 import.  Table lock and bulk rollbacks are managed by parent cpimport
2483 // (file splitter) process for mode1 and mode2.
2484 //------------------------------------------------------------------------------
rollbackWork()2485 int TableInfo::rollbackWork( )
2486 {
2487     // Close any column or store files left open by abnormal termination.
2488     // We want to do this before reopening the files and doing a bulk rollback.
2489     closeOpenDbFiles();
2490 
2491     // Abort "local" bulk rollback if a DBRoot from the start of the job, is
2492     // now missing.  User should run cleartablelock to execute a rollback on
2493     // this PM "and" the PM where the DBRoot was moved to.
2494     std::vector<uint16_t> dbRootIds;
2495     Config::getRootIdList( dbRootIds );
2496 
2497     for (unsigned int j = 0; j < fOrigDbRootIds.size(); j++)
2498     {
2499         bool bFound = false;
2500 
2501         for (unsigned int k = 0; k < dbRootIds.size(); k++)
2502         {
2503             if (fOrigDbRootIds[j] == dbRootIds[k])
2504             {
2505                 bFound = true;
2506                 break;
2507             }
2508         }
2509 
2510         if (!bFound)
2511         {
2512             ostringstream oss;
2513             oss << "Mode3 bulk rollback not performed for table " <<
2514                 fTableName << "; DBRoot" << fOrigDbRootIds[j] <<
2515                 " moved from this PM during bulk load. " <<
2516                 " Run cleartablelock to rollback and release the table lock " <<
2517                 "across PMs.";
2518             int rc = ERR_BULK_ROLLBACK_MISS_ROOT;
2519             fLog->logMsg( oss.str(), rc, MSGLVL_ERROR );
2520             return rc;
2521         }
2522     }
2523 
2524     // Restore/rollback the DB files if we got far enough to begin processing
2525     // this table.
2526     int rc = NO_ERROR;
2527 
2528     if (hasProcessingBegun())
2529     {
2530         BulkRollbackMgr rbMgr( fTableOID,
2531                                fTableLockID,
2532                                fTableName,
2533                                fProcessName, fLog );
2534 
2535         rc = rbMgr.rollback( fKeepRbMetaFile );
2536 
2537         if (rc != NO_ERROR)
2538         {
2539             ostringstream oss;
2540             oss << "Error rolling back table " << fTableName <<
2541                 "; " << rbMgr.getErrorMsg();
2542             fLog->logMsg( oss.str(), rc, MSGLVL_ERROR );
2543             return rc;
2544         }
2545     }
2546 
2547     // Delete the meta data files after rollback is complete
2548     deleteMetaDataRollbackFile( );
2549 
2550     // Release the table lock
2551     rc = releaseTableLock( );
2552 
2553     if (rc != NO_ERROR)
2554     {
2555         ostringstream oss;
2556         oss << "Table lock not cleared for table " << fTableName;
2557         fLog->logMsg( oss.str(), rc, MSGLVL_ERROR );
2558         return rc;
2559     }
2560 
2561     return rc;
2562 }
2563 
2564 //------------------------------------------------------------------------------
2565 // Allocate extent from BRM (through the stripe allocator).
2566 //------------------------------------------------------------------------------
allocateBRMColumnExtent(OID columnOID,uint16_t dbRoot,uint32_t & partition,uint16_t & segment,BRM::LBID_t & startLbid,int & allocSize,HWM & hwm,std::string & errMsg)2567 int TableInfo::allocateBRMColumnExtent(OID columnOID,
2568                                        uint16_t     dbRoot,
2569                                        uint32_t&    partition,
2570                                        uint16_t&    segment,
2571                                        BRM::LBID_t& startLbid,
2572                                        int&         allocSize,
2573                                        HWM&         hwm,
2574                                        std::string& errMsg )
2575 {
2576     int rc = fExtentStrAlloc.allocateExtent( columnOID,
2577              dbRoot,
2578              partition,
2579              segment,
2580              startLbid,
2581              allocSize,
2582              hwm,
2583              errMsg );
2584     //fExtentStrAlloc.print();
2585 
2586     return rc;
2587 }
2588 
2589 }
2590 // end of namespace
2591