1 /* Copyright (C) 2014 InfiniDB, Inc.
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17
18 /*******************************************************************************
19 * $Id: we_bulkload.h 4631 2013-05-02 15:21:09Z dcathey $
20 *
21 *******************************************************************************/
22 /** @file */
23
24 #ifndef _WE_BULKLOAD_H_
25 #define _WE_BULKLOAD_H_
26 #ifndef _MSC_VER
27 #include <pthread.h>
28 #endif
29 #include <fstream>
30 #include <string>
31 #include <vector>
32 #include <sys/time.h>
33
34 #include <we_log.h>
35 #include <we_colop.h>
36 #include <we_xmljob.h>
37 #include <we_convertor.h>
38 #include <writeengine.h>
39
40 #include <we_brm.h>
41
42 #include "we_tableinfo.h"
43 #include "brmtypes.h"
44 #include "boost/ptr_container/ptr_vector.hpp"
45 #include <boost/thread/thread.hpp>
46 #include <boost/thread/mutex.hpp>
47 #include <boost/bind.hpp>
48 #include <boost/scoped_ptr.hpp>
49 #include <boost/uuid/uuid.hpp>
50
51 #if 0 //defined(_MSC_VER) && defined(WE_BULKLOAD_DLLEXPORT)
52 #define EXPORT __declspec(dllexport)
53 #else
54 #define EXPORT
55 #endif
56
57 /** Namespace WriteEngine */
58 namespace WriteEngine
59 {
60
/**
 * Class BulkLoad
 *
 * Holds the configuration and run-time state of one bulk-load job: the
 * parsed job description (fJobInfo), the read and parse worker thread
 * groups, the per-job options set through the mutators below, and timing
 * data. Derives from FileOp.
 *
 * Note: fTableInfo, fDDLMutex and fNoConsoleOutput are static members, so
 * they are shared by every BulkLoad instance in the process.
 */
class BulkLoad : public FileOp
{
public:

    /**
     * @brief BulkLoad constructor
     */
    EXPORT BulkLoad();

    /**
     * @brief BulkLoad destructor
     */
    EXPORT ~BulkLoad();

    /**
     * @brief Load job information from a job description file.
     * @param fullFileName        name of the job description file
     * @param bUseTempJobFile     NOTE(review): presumably selects the temporary
     *                            job file location (DIR_BULK_TEMP_JOB); confirm
     * @param argc                command-line argument count
     * @param argv                command-line argument vector
     * @param bLogInfo2ToConsole  NOTE(review): presumably controls echoing of
     *                            info-level log messages to the console; confirm
     * @param bValidateColumnList whether the job's column list is validated
     * @return int status code (values defined by the implementation)
     */
    EXPORT int loadJobInfo( const std::string& fullFileName,
                            bool bUseTempJobFile,
                            int argc,
                            char** argv,
                            bool bLogInfo2ToConsole,
                            bool bValidateColumnList );

    /**
     * @brief Pre-process a job: validate it and assign values to the job
     *        structure for the given table.
     * @param job       current job
     * @param tableNo   table number within the job
     * @param tableInfo TableInfo for the table being pre-processed
     * @return int status code
     */
    int preProcess(Job& job, int tableNo, TableInfo* tableInfo);

    /**
     * @brief Print job information to the log (full detail when the debug
     *        level is DEBUG_1 or higher, otherwise a brief summary).
     */
    void printJob( );

    /**
     * @brief Process the loaded job.
     * @return int status code
     */
    EXPORT int processJob( );

    /**
     * @brief Set the debug level for this BulkLoad object and its logger.
     */
    void setAllDebug( DebugLevel level );

    /**
     * @brief Update the next auto-increment value for the specified column.
     * @param columnOID OID of the auto-increment column to be updated
     * @param nextValue next auto-increment value to assign to columnOID
     * @return int status code
     */
    static int updateNextValue(OID columnOID, uint64_t nextValue);

    // Accessors and mutators
    void addToCmdLineImportFileList(const std::string& importFile);
    const std::string& getAlternateImportDir( ) const;
    const std::string& getErrorDir ( ) const;
    const std::string& getTimeZone ( ) const;
    const std::string& getJobDir ( ) const;
    const std::string& getSchema ( ) const;
    const std::string& getTempJobDir ( ) const;
    const std::string& getS3Key ( ) const;
    const std::string& getS3Secret ( ) const;
    const std::string& getS3Bucket ( ) const;
    const std::string& getS3Host ( ) const;
    const std::string& getS3Region ( ) const;
    bool getTruncationAsError ( ) const;
    BulkModeType getBulkLoadMode ( ) const;
    bool getContinue ( ) const;

    // Returns a copy of this job's UUID.
    boost::uuids::uuid getJobUUID ( ) const
    {
        return fUUID;
    }

    // Set an alternate import directory; errMsg receives the error text on
    // failure. Returns an int status code.
    EXPORT int setAlternateImportDir( const std::string& loadDir,
                                      std::string& errMsg);
    void setImportDataMode ( ImportDataMode importMode );
    void setColDelimiter ( char delim );
    void setBulkLoadMode ( BulkModeType bulkMode,
                           const std::string& rptFileName );
    void setEnclosedByChar ( char enChar);
    void setEscapeChar ( char esChar);
    void setKeepRbMetaFiles ( bool keepMeta );
    void setMaxErrorCount ( unsigned int maxErrors );
    void setNoOfParseThreads ( int parseThreads );
    void setNoOfReadThreads ( int readThreads );
    void setNullStringMode ( bool bMode );
    void setParseErrorOnTable (int tableId, bool lockParseMutex);
    void setParserNum ( int parser );
    void setProcessName ( const std::string& processName );
    void setReadBufferCount ( int noOfReadBuffers );
    void setReadBufferSize ( int readBufferSize );
    void setTxnID ( BRM::TxnID txnID );
    void setVbufReadSize ( int vbufReadSize );
    void setTruncationAsError ( bool bTruncationAsError );
    void setJobUUID ( const std::string& jobUUID );
    void setErrorDir ( const std::string& errorDir );
    void setTimeZone ( const std::string& timeZone );
    void setS3Key ( const std::string& key );
    void setS3Secret ( const std::string& secret );
    void setS3Bucket ( const std::string& bucket );
    void setS3Host ( const std::string& host );
    void setS3Region ( const std::string& region );
    void setUsername ( const std::string& username );
    // Timer functions (wall-clock elapsed time between start and stop)
    void startTimer ( );
    void stopTimer ( );
    double getTotalRunTime ( ) const;

    // Enable/disable the timeout used while waiting for a table lock.
    void disableTimeOut ( const bool disableTimeOut);
    bool disableTimeOut ( ) const;

    // Set/query the process-wide (static) flag that suppresses console output.
    // NOTE(review): the flag is static and unsynchronized; presumably it is
    // meant to be set once before worker threads start -- confirm.
    static void disableConsoleOutput ( const bool noConsoleOutput)
    {
        fNoConsoleOutput = noConsoleOutput;
    }
    static bool disableConsoleOutput ( )
    {
        return fNoConsoleOutput;
    }

    // Add an error message to the appropriate BRM updater for the given table.
    static bool addErrorMsg2BrmUpdater(const std::string& tablename, const std::ostringstream& oss);
    void setDefaultJobUUID ( );

private:

    //--------------------------------------------------------------------------
    // Private Data Members
    // NOTE: declaration order determines initialization order; keep it in sync
    // with the constructor's initializer list.
    //--------------------------------------------------------------------------
    XMLJob fJobInfo;                // current job information

    boost::scoped_ptr<ColumnOp> fColOp; // column operation

    std::string fRootDir;           // job process root directory
    std::string fJobFileName;       // job description file name

    Log fLog;                       // logger

    int fNumOfParser;               // total number of parsers
    char fColDelim;                 // delimits column values within a row

    int fNoOfBuffers;               // number of read buffers
    int fBufferSize;                // read buffer size
    int fFileVbufSize;              // internal file system buffer size
    long long fMaxErrors;           // max allowable errors per job; a long
                                    // long so -1 can mean "not set"
    std::string fAlternateImportDir; // alternate bulk import directory
    std::string fErrorDir;          // optional directory where error
                                    // records are recorded
    std::string fProcessName;       // application process name
    static boost::ptr_vector<TableInfo> fTableInfo;// table information (static:
                                    // shared by all BulkLoad instances)
    int fNoOfParseThreads;          // number of parse threads
    int fNoOfReadThreads;           // number of read threads
    boost::thread_group fReadThreads; // read thread group
    boost::thread_group fParseThreads; // parse thread group
    boost::mutex fReadMutex;        // manages table selection by each
                                    // read thread
    boost::mutex fParseMutex;       // manages table/buffer/column
                                    // selection by each parsing thread
    BRM::TxnID fTxnID;              // transaction ID acquired from SessionMgr
    bool fKeepRbMetaFiles;          // keep (true) or delete bulk rollback
                                    // metadata files
    bool fNullStringMode;           // treat "NULL" as a NULL value
    char fEnclosedByChar;           // char used to enclose a column value
    char fEscapeChar;               // escape char within an enclosed value
    timeval fStartTime;             // job start time
    timeval fEndTime;               // job end time
    double fTotalTime;              // elapsed time for current phase (seconds)
    std::vector<std::string> fCmdLineImportFiles; // import files from cmd line
    BulkModeType fBulkMode;         // distributed bulk mode (1, 2, or 3)
    std::string fBRMRptFileName;    // name of distributed-mode report file
    bool fbTruncationAsError;       // treat string truncation as an error
    ImportDataMode fImportDataMode; // importing text or binary data
    bool fbContinue;                // true while read and parse threads are
                                    // running
    //
    static boost::mutex* fDDLMutex; // ensures only one DDL op at a time

    EXPORT static const std::string DIR_BULK_JOB;     // bulk job directory
    EXPORT static const std::string DIR_BULK_TEMP_JOB;// dir for temp job files
    static const std::string DIR_BULK_IMPORT;         // bulk job import dir
    static const std::string DIR_BULK_LOG;            // bulk job log directory
    bool fDisableTimeOut;           // disable timeout when waiting for a
                                    // table lock
    boost::uuids::uuid fUUID;       // job UUID
    static bool fNoConsoleOutput;   // disable output to console (static:
                                    // process-wide)
    std::string fTimeZone;          // time zone to use for TIMESTAMP data type
    std::string fS3Key;             // S3 key
    std::string fS3Secret;          // S3 secret
    std::string fS3Host;            // S3 host
    std::string fS3Bucket;          // S3 bucket
    std::string fS3Region;          // S3 region
    std::string fUsername;          // owner name for data files ("mysql"
                                    // by default, per original comment)

    //--------------------------------------------------------------------------
    // Private Functions
    //--------------------------------------------------------------------------

    // Spawn the worker (read and parse) threads.
    void spawnWorkers();

    // Returns whether all tables have the given status set.
    bool allTablesDone(Status status);

    // Lock a table for reading. Called by a read thread.
    int lockTableForRead(int id);

    // Select a column for parsing. Called by a parse thread.
    // @bug 2099 - Temporary hack to diagnose deadlock. Added report parm below.
    bool lockColumnForParse(int id,            // thread id
                            int& tableId,      // selected table id
                            int& columnId,     // selected column id
                            int& myParseBuffer,// selected parse buffer
                            bool report);      // diagnostic flag (bug 2099)

    // Map the specified DBRoot to its first segment file number.
    int mapDBRootToFirstSegment(OID columnOid,
                                uint16_t dbRoot,
                                uint16_t& segment);

    // Thread entry method for a read thread.
    void read(int id);

    // Thread entry method for a parse thread.
    void parse(int id);

    // Sleep for the given number of milliseconds.
    void sleepMS(long int ms);

    // Initialize the auto-increment column for the specified schema and table.
    int preProcessAutoInc(
        const std::string& fullTableName,// schema.table
        ColumnInfo* colInfo);            // ColumnInfo associated with AI column

    // Determine the starting HWM and LBID after block skipping is added to HWM.
    int preProcessHwmLbid( const ColumnInfo* info,
                           int minWidth,
                           uint32_t partition,
                           uint16_t segment,
                           HWM& hwm,
                           BRM::LBID_t& lbid,
                           bool& bSkippedToNewExtent);

    // Roll back any tables that are left in a locked state at end of job.
    int rollbackLockedTables( );

    // Roll back a table left in a locked state.
    int rollbackLockedTable( TableInfo& tableInfo );

    // Save the metadata required for shared-nothing bulk rollback.
    int saveBulkRollbackMetaData( Job& job,              // current job
                                  TableInfo* tableInfo,  // TableInfo for table of interest
                                  const std::vector<DBRootExtentInfo>& segFileInfo, // seg file info
                                  const std::vector<BRM::EmDbRootHWMInfo_v>& dbRootHWMInfoPM);

    // Manage/validate the list of one or more import data files.
    int manageImportDataFileList(Job& job,              // current job
                                 int tableNo,           // table number in current job
                                 TableInfo* tableInfo); // TableInfo for table of interest

    // Break up a list of file names into a vector of filename strings.
    int buildImportDataFileList(
        const std::string& location,
        const std::string& filename,
        std::vector<std::string>& importFileNames);
};
322
323 //------------------------------------------------------------------------------
324 // Inline functions
325 //------------------------------------------------------------------------------
addToCmdLineImportFileList(const std::string & importFile)326 inline void BulkLoad::addToCmdLineImportFileList(const std::string& importFile)
327 {
328 fCmdLineImportFiles.push_back( importFile );
329 }
330
getAlternateImportDir()331 inline const std::string& BulkLoad::getAlternateImportDir( ) const
332 {
333 return fAlternateImportDir;
334 }
335
getErrorDir()336 inline const std::string& BulkLoad::getErrorDir( ) const
337 {
338 return fErrorDir;
339 }
340
getTimeZone()341 inline const std::string& BulkLoad::getTimeZone( ) const
342 {
343 return fTimeZone;
344 }
345
getJobDir()346 inline const std::string& BulkLoad::getJobDir( ) const
347 {
348 return DIR_BULK_JOB;
349 }
350
getSchema()351 inline const std::string& BulkLoad::getSchema( ) const
352 {
353 return fJobInfo.getJob().schema;
354 }
355
getTempJobDir()356 inline const std::string& BulkLoad::getTempJobDir( ) const
357 {
358 return DIR_BULK_TEMP_JOB;
359 }
360
getS3Key()361 inline const std::string& BulkLoad::getS3Key( ) const
362 {
363 return fS3Key;
364 }
365
getS3Secret()366 inline const std::string& BulkLoad::getS3Secret( ) const
367 {
368 return fS3Secret;
369 }
370
getS3Bucket()371 inline const std::string& BulkLoad::getS3Bucket( ) const
372 {
373 return fS3Bucket;
374 }
375
getS3Host()376 inline const std::string& BulkLoad::getS3Host( ) const
377 {
378 return fS3Host;
379 }
380
getS3Region()381 inline const std::string& BulkLoad::getS3Region( ) const
382 {
383 return fS3Region;
384 }
385
getTruncationAsError()386 inline bool BulkLoad::getTruncationAsError ( ) const
387 {
388 return fbTruncationAsError;
389 }
390
getBulkLoadMode()391 inline BulkModeType BulkLoad::getBulkLoadMode ( ) const
392 {
393 return fBulkMode;
394 }
395
getContinue()396 inline bool BulkLoad::getContinue ( ) const
397 {
398 return fbContinue;
399 }
400
printJob()401 inline void BulkLoad::printJob()
402 {
403 if (isDebug(DEBUG_1))
404 fJobInfo.printJobInfo(fLog);
405 else
406 fJobInfo.printJobInfoBrief(fLog);
407 }
408
setAllDebug(DebugLevel level)409 inline void BulkLoad::setAllDebug( DebugLevel level )
410 {
411 setDebugLevel( level );
412 fLog.setDebugLevel( level );
413 }
414
setColDelimiter(char delim)415 inline void BulkLoad::setColDelimiter( char delim )
416 {
417 fColDelim = delim;
418 }
419
setBulkLoadMode(BulkModeType bulkMode,const std::string & rptFileName)420 inline void BulkLoad::setBulkLoadMode(
421 BulkModeType bulkMode,
422 const std::string& rptFileName )
423 {
424 fBulkMode = bulkMode;
425 fBRMRptFileName = rptFileName;
426 }
427
setEnclosedByChar(char enChar)428 inline void BulkLoad::setEnclosedByChar( char enChar )
429 {
430 fEnclosedByChar = enChar;
431 }
432
setEscapeChar(char esChar)433 inline void BulkLoad::setEscapeChar( char esChar )
434 {
435 fEscapeChar = esChar;
436 }
437
setImportDataMode(ImportDataMode importMode)438 inline void BulkLoad::setImportDataMode(ImportDataMode importMode)
439 {
440 fImportDataMode = importMode;
441 }
442
setKeepRbMetaFiles(bool keepMeta)443 inline void BulkLoad::setKeepRbMetaFiles( bool keepMeta )
444 {
445 fKeepRbMetaFiles = keepMeta;
446 }
447
448 // Mutator takes an unsigned int, but we store in a long long, because...
449 // TableInfo which eventually needs this attribute, takes an unsigned int,
450 // but we want to be able to init to -1, to indicate when it has not been set.
setMaxErrorCount(unsigned int maxErrors)451 inline void BulkLoad::setMaxErrorCount( unsigned int maxErrors )
452 {
453 fMaxErrors = maxErrors;
454 }
455
setNoOfParseThreads(int parseThreads)456 inline void BulkLoad::setNoOfParseThreads(int parseThreads )
457 {
458 fNoOfParseThreads = parseThreads;
459 }
460
setNoOfReadThreads(int readThreads)461 inline void BulkLoad::setNoOfReadThreads( int readThreads )
462 {
463 fNoOfReadThreads = readThreads;
464 }
465
setNullStringMode(bool bMode)466 inline void BulkLoad::setNullStringMode( bool bMode )
467 {
468 fNullStringMode = bMode;
469 }
470
setParserNum(int parser)471 inline void BulkLoad::setParserNum( int parser )
472 {
473 fNumOfParser = parser;
474 }
475
setProcessName(const std::string & processName)476 inline void BulkLoad::setProcessName( const std::string& processName )
477 {
478 fProcessName = processName;
479 }
480
setReadBufferCount(int noOfReadBuffers)481 inline void BulkLoad::setReadBufferCount( int noOfReadBuffers )
482 {
483 fNoOfBuffers = noOfReadBuffers;
484 }
485
setReadBufferSize(int readBufferSize)486 inline void BulkLoad::setReadBufferSize( int readBufferSize )
487 {
488 fBufferSize = readBufferSize;
489 }
490
setTxnID(BRM::TxnID txnID)491 inline void BulkLoad::setTxnID( BRM::TxnID txnID )
492 {
493 fTxnID = txnID;
494 }
495
setVbufReadSize(int vbufReadSize)496 inline void BulkLoad::setVbufReadSize( int vbufReadSize )
497 {
498 fFileVbufSize = vbufReadSize;
499 }
500
setTruncationAsError(bool bTruncationAsError)501 inline void BulkLoad::setTruncationAsError(bool bTruncationAsError)
502 {
503 fbTruncationAsError = bTruncationAsError;
504 }
505
setErrorDir(const std::string & errorDir)506 inline void BulkLoad::setErrorDir( const std::string& errorDir )
507 {
508 fErrorDir = errorDir;
509 }
510
setTimeZone(const std::string & timeZone)511 inline void BulkLoad::setTimeZone( const std::string& timeZone )
512 {
513 fTimeZone = timeZone;
514 }
515
setS3Key(const std::string & key)516 inline void BulkLoad::setS3Key( const std::string& key )
517 {
518 fS3Key = key;
519 }
520
setS3Secret(const std::string & secret)521 inline void BulkLoad::setS3Secret( const std::string& secret )
522 {
523 fS3Secret = secret;
524 }
525
setS3Bucket(const std::string & bucket)526 inline void BulkLoad::setS3Bucket( const std::string& bucket )
527 {
528 fS3Bucket = bucket;
529 }
530
setS3Host(const std::string & host)531 inline void BulkLoad::setS3Host( const std::string& host )
532 {
533 fS3Host = host;
534 }
535
setS3Region(const std::string & region)536 inline void BulkLoad::setS3Region( const std::string& region )
537 {
538 fS3Region = region;
539 }
540
setUsername(const std::string & username)541 inline void BulkLoad::setUsername( const std::string& username )
542 {
543 fUsername = username;
544 }
545
startTimer()546 inline void BulkLoad::startTimer( )
547 {
548 gettimeofday( &fStartTime, 0 );
549 }
550
stopTimer()551 inline void BulkLoad::stopTimer()
552 {
553 gettimeofday( &fEndTime, 0 );
554 fTotalTime = (fEndTime.tv_sec + (fEndTime.tv_usec / 1000000.0)) -
555 (fStartTime.tv_sec + (fStartTime.tv_usec / 1000000.0));
556 }
557
getTotalRunTime()558 inline double BulkLoad::getTotalRunTime() const
559 {
560 return fTotalTime;
561 }
562
disableTimeOut(const bool disableTimeOut)563 inline void BulkLoad::disableTimeOut( const bool disableTimeOut)
564 {
565 fDisableTimeOut = disableTimeOut;
566 }
567
disableTimeOut()568 inline bool BulkLoad::disableTimeOut() const
569 {
570 return fDisableTimeOut;
571 }
572
573 } // end of namespace
574
575 #undef EXPORT
576
577 #endif // _WE_BULKLOAD_H_
578