1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 /*******************************************************************************
19 * $Id: we_bulkload.h 4631 2013-05-02 15:21:09Z dcathey $
20 *
21 *******************************************************************************/
22 /** @file */
23 
24 #ifndef _WE_BULKLOAD_H_
25 #define _WE_BULKLOAD_H_
26 #ifndef _MSC_VER
27 #include <pthread.h>
28 #endif
29 #include <fstream>
30 #include <string>
31 #include <vector>
32 #include <sys/time.h>
33 
34 #include <we_log.h>
35 #include <we_colop.h>
36 #include <we_xmljob.h>
37 #include <we_convertor.h>
38 #include <writeengine.h>
39 
40 #include <we_brm.h>
41 
42 #include "we_tableinfo.h"
43 #include "brmtypes.h"
44 #include "boost/ptr_container/ptr_vector.hpp"
45 #include <boost/thread/thread.hpp>
46 #include <boost/thread/mutex.hpp>
47 #include <boost/bind.hpp>
48 #include <boost/scoped_ptr.hpp>
49 #include <boost/uuid/uuid.hpp>
50 
// EXPORT decorates the declarations below that would be exported from a
// Windows DLL.  The dllexport branch is currently disabled with "#if 0", so
// EXPORT always expands to nothing.
#if 0 //defined(_MSC_VER) && defined(WE_BULKLOAD_DLLEXPORT)
#define EXPORT __declspec(dllexport)
#else
#define EXPORT
#endif
56 
57 /** Namespace WriteEngine */
58 namespace WriteEngine
59 {
60 
61 /** Class BulkLoad */
62 class BulkLoad : public FileOp
63 {
64 public:
65 
66     /**
67      * @brief BulkLoad constructor
68      */
69     EXPORT              BulkLoad();
70 
71     /**
72      * @brief BulkLoad destructor
73      */
74     EXPORT              ~BulkLoad();
75 
76     /**
77      * @brief Load job information
78      */
79     EXPORT int          loadJobInfo( const std::string& fullFileName,
80                                      bool  bUseTempJobFile,
81                                      int   argc,
82                                      char** argv,
83                                      bool  bLogInfo2ToConsole,
84                                      bool  bValidateColumnList );
85 
86     /**
87      * @brief Pre process jobs to validate and assign values to the job structure
88      */
89     int                 preProcess(Job& job, int tableNo, TableInfo* tableInfo);
90 
91     /**
92      * @brief Print job information
93      */
94     void                printJob( );
95 
96     /**
97      * @brief Process job
98      */
99     EXPORT int          processJob( );
100 
101     /**
102      * @brief Set Debug level for this BulkLoad object and any data members
103      */
104     void                setAllDebug( DebugLevel level );
105 
106     /**
107      * @brief Update next autoincrement value for specified OID.
108      *  @param columnOID  oid of autoincrement column to be updated
109      *  @param nextValue next autoincrement value to assign to tableOID
110      */
111     static int          updateNextValue(OID columnOID, uint64_t nextValue);
112 
113     // Accessors and mutators
114     void                addToCmdLineImportFileList(const std::string& importFile);
115     const std::string&  getAlternateImportDir( ) const;
116     const std::string&  getErrorDir          ( ) const;
117     const std::string&  getTimeZone          ( ) const;
118     const std::string&  getJobDir            ( ) const;
119     const std::string&  getSchema            ( ) const;
120     const std::string&  getTempJobDir        ( ) const;
121     const std::string&  getS3Key             ( ) const;
122     const std::string&  getS3Secret          ( ) const;
123     const std::string&  getS3Bucket          ( ) const;
124     const std::string&  getS3Host            ( ) const;
125     const std::string&  getS3Region          ( ) const;
126     bool                getTruncationAsError ( ) const;
127     BulkModeType        getBulkLoadMode      ( ) const;
128     bool                getContinue          ( ) const;
getJobUUID()129     boost::uuids::uuid  getJobUUID           ( ) const
130     {
131         return fUUID;
132     }
133 
134     EXPORT int          setAlternateImportDir( const std::string& loadDir,
135             std::string& errMsg);
136     void                setImportDataMode    ( ImportDataMode importMode );
137     void                setColDelimiter      ( char delim );
138     void                setBulkLoadMode      ( BulkModeType bulkMode,
139             const std::string& rptFileName );
140     void                setEnclosedByChar    ( char enChar);
141     void                setEscapeChar        ( char esChar);
142     void                setKeepRbMetaFiles   ( bool keepMeta );
143     void                setMaxErrorCount     ( unsigned int maxErrors );
144     void                setNoOfParseThreads  ( int parseThreads );
145     void                setNoOfReadThreads   ( int readThreads );
146     void                setNullStringMode    ( bool bMode );
147     void                setParseErrorOnTable (int tableId, bool lockParseMutex);
148     void                setParserNum         ( int parser );
149     void                setProcessName       ( const std::string& processName );
150     void                setReadBufferCount   ( int noOfReadBuffers );
151     void                setReadBufferSize    ( int readBufferSize );
152     void                setTxnID             ( BRM::TxnID txnID );
153     void                setVbufReadSize      ( int vbufReadSize );
154     void                setTruncationAsError ( bool bTruncationAsError );
155     void                setJobUUID           ( const std::string& jobUUID );
156     void                setErrorDir          ( const std::string& errorDir );
157     void                setTimeZone          ( const std::string& timeZone );
158     void                setS3Key             ( const std::string& key );
159     void                setS3Secret          ( const std::string& secret );
160     void                setS3Bucket          ( const std::string& bucket );
161     void                setS3Host            ( const std::string& host );
162     void                setS3Region          ( const std::string& region );
163     void                setUsername          ( const std::string& username );
164     // Timer functions
165     void                startTimer           ( );
166     void                stopTimer            ( );
167     double              getTotalRunTime      ( ) const;
168 
169     void                disableTimeOut       ( const bool disableTimeOut);
170     bool                disableTimeOut       ( ) const;
171 
disableConsoleOutput(const bool noConsoleOutput)172     static void 				disableConsoleOutput ( const bool noConsoleOutput)
173     {
174         fNoConsoleOutput = noConsoleOutput;
175     }
disableConsoleOutput()176     static bool 				disableConsoleOutput ( )
177     {
178         return fNoConsoleOutput;
179     }
180 
181     // Add error message into appropriate BRM updater
182     static bool         addErrorMsg2BrmUpdater(const std::string& tablename, const std::ostringstream& oss);
183     void                setDefaultJobUUID     ( );
184 
185 private:
186 
187     //--------------------------------------------------------------------------
188     // Private Data Members
189     //--------------------------------------------------------------------------
190     XMLJob      fJobInfo;                 // current job information
191 
192     boost::scoped_ptr<ColumnOp> fColOp;   // column operation
193 
194     std::string fRootDir;                 // job process root directory
195     std::string fJobFileName;             // job description file name
196 
197     Log         fLog;                     // logger
198 
199     int         fNumOfParser;             // total number of parser
200     char        fColDelim;                // delimits col values within a row
201 
202     int         fNoOfBuffers;              // Number of read buffers
203     int         fBufferSize;               // Read buffer size
204     int         fFileVbufSize;             // Internal file system buffer size
205     long long   fMaxErrors;                // Max allowable errors per job
206     std::string fAlternateImportDir;       // Alternate bulk import directory
207     std::string fErrorDir;                 // Opt. where error records record
208     std::string fProcessName;              // Application process name
209     static boost::ptr_vector<TableInfo> fTableInfo;// Vector of Table information
210     int         fNoOfParseThreads;         // Number of parse threads
211     int         fNoOfReadThreads;          // Number of read threads
212     boost::thread_group fReadThreads;      // Read thread group
213     boost::thread_group fParseThreads;     // Parse thread group
214     boost::mutex fReadMutex;               // Manages table selection by each
215     //   read thread
216     boost::mutex fParseMutex;              // Manages table/buffer/column
217     //   selection by each parsing thread
218     BRM::TxnID  fTxnID;                    // TransID acquired from SessionMgr
219     bool        fKeepRbMetaFiles;          // Keep/delete bulkRB metadata files
220     bool        fNullStringMode;           // Treat "NULL" as NULL value
221     char        fEnclosedByChar;           // Char used to enclose column value
222     char        fEscapeChar;               // Escape char within enclosed value
223     timeval     fStartTime;                // job start time
224     timeval     fEndTime;                  // job end time
225     double      fTotalTime;                // elapsed time for current phase
226     std::vector<std::string> fCmdLineImportFiles; // Import Files from cmd line
227     BulkModeType fBulkMode;                // Distributed bulk mode (1,2, or 3)
228     std::string fBRMRptFileName;           // Name of distributed mode rpt file
229     bool        fbTruncationAsError;       // Treat string truncation as error
230     ImportDataMode fImportDataMode;        // Importing text or binary data
231     bool        fbContinue;                // true when read and parse r running
232     //
233     static boost::mutex*       fDDLMutex;  // Insure only 1 DDL op at a time
234 
235     EXPORT static const std::string   DIR_BULK_JOB;     // Bulk job directory
236     EXPORT static const std::string   DIR_BULK_TEMP_JOB;// Dir for tmp job files
237     static const std::string   DIR_BULK_IMPORT;  // Bulk job import dir
238     static const std::string   DIR_BULK_LOG;     // Bulk job log directory
239     bool        fDisableTimeOut;           // disable timeout when waiting for table lock
240     boost::uuids::uuid fUUID;               // job UUID
241     static bool		fNoConsoleOutput;		   // disable output to console
242     std::string fTimeZone;                 // Timezone to use for TIMESTAMP data type
243     std::string fS3Key;                    // S3 Key
244     std::string fS3Secret;                 // S3 Secret
245     std::string fS3Host;                   // S3 Host
246     std::string fS3Bucket;                 // S3 Bucket
247     std::string fS3Region;                 // S3 Region
248     std::string fUsername;                 // data files owner name mysql by default
249 
250     //--------------------------------------------------------------------------
251     // Private Functions
252     //--------------------------------------------------------------------------
253 
254     // Spawn the worker threads.
255     void spawnWorkers();
256 
257     // Checks if all tables have the status set
258     bool allTablesDone(Status status);
259 
260     // Lock the table for read. Called by the read thread.
261     int lockTableForRead(int id);
262 
263     // Get column for parsing. Called by the parse thread.
264     // @bug 2099 - Temporary hack to diagnose deadlock. Added report parm below.
265     bool lockColumnForParse(int id,             // thread id
266                             int& tableId,       // selected table id
267                             int& columnId,      // selected column id
268                             int& myParseBuffer, // selected parse buffer
269                             bool report);
270 
271     // Map specified DBRoot to it's first segment file number
272     int mapDBRootToFirstSegment(OID columnOid,
273                                 uint16_t  dbRoot,
274                                 uint16_t& segment);
275 
276     // The thread method for the read thread.
277     void read(int id);
278 
279     // The thread method for the parse thread.
280     void parse(int  id);
281 
282     // Sleep method
283     void sleepMS(long int ms);
284 
285     // Initialize auto-increment column for specified schema and table.
286     int preProcessAutoInc(
287         const std::string& fullTableName,// schema.table
288         ColumnInfo* colInfo);            // ColumnInfo associated with AI column
289 
290     // Determine starting HWM and LBID after block skipping added to HWM
291     int preProcessHwmLbid( const ColumnInfo* info,
292                            int          minWidth,
293                            uint32_t    partition,
294                            uint16_t    segment,
295                            HWM&         hwm,
296                            BRM::LBID_t& lbid,
297                            bool&        bSkippedToNewExtent);
298 
299     // Rollback any tables that are left in a locked state at EOJ.
300     int rollbackLockedTables( );
301 
302     // Rollback a table left in a locked state.
303     int rollbackLockedTable( TableInfo& tableInfo );
304 
305     // Save metadata info required for shared-nothing bulk rollback.
306     int saveBulkRollbackMetaData( Job& job,   // current job
307                                   TableInfo* tableInfo,                 // TableInfo for table of interest
308                                   const std::vector<DBRootExtentInfo>& segFileInfo, //vector seg file info
309                                   const std::vector<BRM::EmDbRootHWMInfo_v>& dbRootHWMInfoPM);
310 
311     // Manage/validate the list of 1 or more import data files
312     int manageImportDataFileList(Job& job,    // current job
313                                  int tableNo,                          // table number of current job
314                                  TableInfo* tableInfo);                // TableInfo for table of interest
315 
316     // Break up list of file names into a vector of filename strings
317     int buildImportDataFileList(
318         const std::string& location,
319         const std::string& filename,
320         std::vector<std::string>& importFileNames);
321 };
322 
323 //------------------------------------------------------------------------------
324 // Inline functions
325 //------------------------------------------------------------------------------
addToCmdLineImportFileList(const std::string & importFile)326 inline void BulkLoad::addToCmdLineImportFileList(const std::string& importFile)
327 {
328     fCmdLineImportFiles.push_back( importFile );
329 }
330 
getAlternateImportDir()331 inline const std::string& BulkLoad::getAlternateImportDir( ) const
332 {
333     return fAlternateImportDir;
334 }
335 
getErrorDir()336 inline const std::string& BulkLoad::getErrorDir( ) const
337 {
338     return fErrorDir;
339 }
340 
getTimeZone()341 inline const std::string& BulkLoad::getTimeZone( ) const
342 {
343     return fTimeZone;
344 }
345 
getJobDir()346 inline const std::string& BulkLoad::getJobDir( ) const
347 {
348     return DIR_BULK_JOB;
349 }
350 
getSchema()351 inline const std::string& BulkLoad::getSchema( ) const
352 {
353     return fJobInfo.getJob().schema;
354 }
355 
getTempJobDir()356 inline const std::string& BulkLoad::getTempJobDir( ) const
357 {
358     return DIR_BULK_TEMP_JOB;
359 }
360 
getS3Key()361 inline const std::string& BulkLoad::getS3Key( ) const
362 {
363     return fS3Key;
364 }
365 
getS3Secret()366 inline const std::string& BulkLoad::getS3Secret( ) const
367 {
368     return fS3Secret;
369 }
370 
getS3Bucket()371 inline const std::string& BulkLoad::getS3Bucket( ) const
372 {
373     return fS3Bucket;
374 }
375 
getS3Host()376 inline const std::string& BulkLoad::getS3Host( ) const
377 {
378     return fS3Host;
379 }
380 
getS3Region()381 inline const std::string& BulkLoad::getS3Region( ) const
382 {
383     return fS3Region;
384 }
385 
getTruncationAsError()386 inline bool BulkLoad::getTruncationAsError ( ) const
387 {
388     return fbTruncationAsError;
389 }
390 
getBulkLoadMode()391 inline BulkModeType BulkLoad::getBulkLoadMode ( ) const
392 {
393     return fBulkMode;
394 }
395 
getContinue()396 inline bool BulkLoad::getContinue ( ) const
397 {
398     return fbContinue;
399 }
400 
printJob()401 inline void BulkLoad::printJob()
402 {
403     if (isDebug(DEBUG_1))
404         fJobInfo.printJobInfo(fLog);
405     else
406         fJobInfo.printJobInfoBrief(fLog);
407 }
408 
setAllDebug(DebugLevel level)409 inline void BulkLoad::setAllDebug( DebugLevel level )
410 {
411     setDebugLevel( level );
412     fLog.setDebugLevel( level );
413 }
414 
setColDelimiter(char delim)415 inline void BulkLoad::setColDelimiter( char delim )
416 {
417     fColDelim = delim;
418 }
419 
setBulkLoadMode(BulkModeType bulkMode,const std::string & rptFileName)420 inline void BulkLoad::setBulkLoadMode(
421     BulkModeType       bulkMode,
422     const std::string& rptFileName )
423 {
424     fBulkMode       = bulkMode;
425     fBRMRptFileName = rptFileName;
426 }
427 
setEnclosedByChar(char enChar)428 inline void BulkLoad::setEnclosedByChar( char enChar )
429 {
430     fEnclosedByChar = enChar;
431 }
432 
setEscapeChar(char esChar)433 inline void BulkLoad::setEscapeChar( char esChar )
434 {
435     fEscapeChar     = esChar;
436 }
437 
setImportDataMode(ImportDataMode importMode)438 inline void BulkLoad::setImportDataMode(ImportDataMode importMode)
439 {
440     fImportDataMode = importMode;
441 }
442 
setKeepRbMetaFiles(bool keepMeta)443 inline void BulkLoad::setKeepRbMetaFiles( bool keepMeta )
444 {
445     fKeepRbMetaFiles = keepMeta;
446 }
447 
448 // Mutator takes an unsigned int, but we store in a long long, because...
449 // TableInfo which eventually needs this attribute, takes an unsigned int,
450 // but we want to be able to init to -1, to indicate when it has not been set.
setMaxErrorCount(unsigned int maxErrors)451 inline void BulkLoad::setMaxErrorCount( unsigned int maxErrors )
452 {
453     fMaxErrors = maxErrors;
454 }
455 
setNoOfParseThreads(int parseThreads)456 inline void BulkLoad::setNoOfParseThreads(int parseThreads )
457 {
458     fNoOfParseThreads = parseThreads;
459 }
460 
setNoOfReadThreads(int readThreads)461 inline void BulkLoad::setNoOfReadThreads( int readThreads )
462 {
463     fNoOfReadThreads = readThreads;
464 }
465 
setNullStringMode(bool bMode)466 inline void BulkLoad::setNullStringMode( bool bMode )
467 {
468     fNullStringMode = bMode;
469 }
470 
setParserNum(int parser)471 inline void BulkLoad::setParserNum( int parser )
472 {
473     fNumOfParser = parser;
474 }
475 
setProcessName(const std::string & processName)476 inline void BulkLoad::setProcessName( const std::string& processName )
477 {
478     fProcessName = processName;
479 }
480 
setReadBufferCount(int noOfReadBuffers)481 inline void BulkLoad::setReadBufferCount( int noOfReadBuffers )
482 {
483     fNoOfBuffers = noOfReadBuffers;
484 }
485 
setReadBufferSize(int readBufferSize)486 inline void BulkLoad::setReadBufferSize( int readBufferSize )
487 {
488     fBufferSize = readBufferSize;
489 }
490 
setTxnID(BRM::TxnID txnID)491 inline void BulkLoad::setTxnID( BRM::TxnID txnID )
492 {
493     fTxnID = txnID;
494 }
495 
setVbufReadSize(int vbufReadSize)496 inline void BulkLoad::setVbufReadSize( int vbufReadSize )
497 {
498     fFileVbufSize = vbufReadSize;
499 }
500 
setTruncationAsError(bool bTruncationAsError)501 inline void BulkLoad::setTruncationAsError(bool bTruncationAsError)
502 {
503     fbTruncationAsError = bTruncationAsError;
504 }
505 
setErrorDir(const std::string & errorDir)506 inline void BulkLoad::setErrorDir( const std::string& errorDir )
507 {
508     fErrorDir = errorDir;
509 }
510 
setTimeZone(const std::string & timeZone)511 inline void BulkLoad::setTimeZone( const std::string& timeZone )
512 {
513     fTimeZone = timeZone;
514 }
515 
setS3Key(const std::string & key)516 inline void BulkLoad::setS3Key( const std::string& key )
517 {
518     fS3Key = key;
519 }
520 
setS3Secret(const std::string & secret)521 inline void BulkLoad::setS3Secret( const std::string& secret )
522 {
523     fS3Secret = secret;
524 }
525 
setS3Bucket(const std::string & bucket)526 inline void BulkLoad::setS3Bucket( const std::string& bucket )
527 {
528     fS3Bucket = bucket;
529 }
530 
setS3Host(const std::string & host)531 inline void BulkLoad::setS3Host( const std::string& host )
532 {
533     fS3Host = host;
534 }
535 
setS3Region(const std::string & region)536 inline void BulkLoad::setS3Region( const std::string& region )
537 {
538     fS3Region = region;
539 }
540 
setUsername(const std::string & username)541 inline void BulkLoad::setUsername( const std::string& username )
542 {
543     fUsername = username;
544 }
545 
startTimer()546 inline void BulkLoad::startTimer( )
547 {
548     gettimeofday( &fStartTime, 0 );
549 }
550 
stopTimer()551 inline void BulkLoad::stopTimer()
552 {
553     gettimeofday( &fEndTime, 0 );
554     fTotalTime = (fEndTime.tv_sec   + (fEndTime.tv_usec   / 1000000.0)) -
555                  (fStartTime.tv_sec + (fStartTime.tv_usec / 1000000.0));
556 }
557 
getTotalRunTime()558 inline double BulkLoad::getTotalRunTime() const
559 {
560     return fTotalTime;
561 }
562 
disableTimeOut(const bool disableTimeOut)563 inline void BulkLoad::disableTimeOut( const bool disableTimeOut)
564 {
565     fDisableTimeOut = disableTimeOut;
566 }
567 
disableTimeOut()568 inline bool BulkLoad::disableTimeOut() const
569 {
570     return fDisableTimeOut;
571 }
572 
573 } // end of namespace
574 
575 #undef EXPORT
576 
577 #endif // _WE_BULKLOAD_H_
578