1 /* Copyright (C) 2014 InfiniDB, Inc.
2    Copyright (C) 2016 MariaDB Corporation
3 
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License
6    as published by the Free Software Foundation; version 2 of
7    the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17    MA 02110-1301, USA. */
18 
#include <unistd.h>
#include <cerrno>
#include <climits>
#include <cstdlib>
#include <cstdio>
#include <cstring>
#include <ctime>

#include <vector>
#include <string>
#include <sstream>
#include <iostream>
#include <exception>
#include <stdexcept>
32 using namespace std;
33 
34 #include <boost/uuid/uuid.hpp>
35 #include <boost/uuid/uuid_generators.hpp>
36 #include <boost/uuid/uuid_io.hpp>
37 #include <boost/filesystem.hpp>
38 
39 #include "dataconvert.h"
40 #include "liboamcpp.h"
41 using namespace oam;
42 
43 #include "we_cmdargs.h"
44 
45 #include "installdir.h"
46 #include "mcsconfig.h"
47 
48 namespace WriteEngine
49 {
50 
51 //----------------------------------------------------------------------
52 //----------------------------------------------------------------------
WECmdArgs(int argc,char ** argv)53 WECmdArgs::WECmdArgs(int argc, char** argv) :
54     fMultiTableCount(0),
55     fJobLogOnly(false),
56     fHelp(false),
57     fMode(1),
58     fArgMode(-1),
59     fQuiteMode(true),
60     fConsoleLog(false),
61     fVerbose(0),
62     fBatchQty(10000),
63     fNoOfReadThrds(0),
64     fDebugLvl(0),
65     fMaxErrors(-1),
66     fReadBufSize(0),
67     fIOReadBufSize(0),
68     fSetBufSize(0),
69     fColDelim('|'),
70     fEnclosedChar(0),
71     fEscChar(0),
72     fNoOfWriteThrds(0),
73     fNullStrMode(false),
74     fImportDataMode(IMPORT_DATA_TEXT),
75     fCpiInvoke(false),
76     fBlockMode3(false),
77     fbTruncationAsError(false),
78     fUUID(boost::uuids::nil_generator()()),
79     fConsoleOutput(true),
80     fTimeZone("SYSTEM"),
81     fErrorDir(string(MCSLOGDIR)+"/cpimport/")
82 {
83     try
84     {
85         appTestFunction();
86         parseCmdLineArgs(argc, argv);
87     }
88     catch (std::exception& exp)
89     {
90         std::string exceptMsg( exp.what() );
91         exceptMsg += "\nTry 'cpimport -h' for more information.";
92         throw (runtime_error( exceptMsg ));
93     }
94 }
95 
96 //----------------------------------------------------------------------
97 
appTestFunction()98 void WECmdArgs::appTestFunction()
99 {
100 
101     // testing begins
102     //std::string aJobFile("/home/bpaul/Calpont/data/bulk/job/Job_1234.xml");
103     //std::string aJobFile("/usr/local/mariadb/columnstore/data/bulk/job/Job_1234.xml");
104     //setSchemaAndTableFromJobFile(aJobFile);
105     //setEnclByAndEscCharFromJobFile(aJobFile);
106     //exit(1);
107 
108     //testing ends
109     return;
110 }
111 
112 //----------------------------------------------------------------------
getCpImportCmdLine()113 std::string WECmdArgs::getCpImportCmdLine()
114 {
115     std::ostringstream aSS;
116     std::string aCmdLine;
117 
118 
119     aSS << fPrgmName;	//prgm name as arg[0]
120 
121     if (fHelp)
122     {
123         aSS << " -h ";
124         aCmdLine = aSS.str();
125         return aCmdLine;
126     }
127 
128     //checkJobIdCase();			// check if JobID
129 
130 
131     if ((fPmFile.length() > 0) && (0 == getMode()))
132         aSS << " -l " << fPmFile;
133 
134     //BUG 4379 if -m is not given while prep args, default is m=1 but
135     //on single node -m will set it to 3, after prep args.
136     //if((fPmFilePath.length()>0)&&(1!=getMode()))
137     //	aSS << " -f " << fPmFilePath;
138     if ((fPmFilePath.length() > 0) && (1 != getMode()))
139     {
140         if (fPmFilePath == "STDIN")	//if stdin, must pass it
141             aSS << " -f " << fPmFilePath;
142         else if (2 == getArgMode())	//Mode 2 we need to pass the -f option
143             aSS << " -f " << fPmFilePath;
144         else if (3 == getArgMode()) 	//-m given, -f built in already.
145             aSS << " -f " << fPmFilePath;
146         else if (0 == fLocFile.length()) //No filename given, from job file
147             aSS << " -f " << fPmFilePath;
148     }
149 
150     if (fErrorDir.length() > 0)
151         aSS << " -L " << fErrorDir;
152 
153     if (fUsername.length() > 0)
154         aSS << " -U " << fUsername;
155 
156     if (fJobId.length() > 0)
157         aSS << " -j " << fJobId;
158 
159     if (fNoOfReadThrds > 0)
160         aSS << " -r " << fNoOfReadThrds;
161 
162     if (fNoOfWriteThrds > 0)
163         aSS << " -w " << fNoOfWriteThrds;
164 
165     if (fMaxErrors >= 0)
166         aSS << " -e " << fMaxErrors;
167 
168     // BUG 5088
169     if (fDebugLvl > 0)
170         aSS << " -d " << fDebugLvl;
171 
172     if (fSetBufSize > 0)
173         aSS << " -B " << fSetBufSize;
174 
175     if (fColDelim != '|')
176     {
177         if (fColDelim == '\t')
178             aSS << " -s " << "\\t"; //- working with user '\t'
179         // NONE of the following will work.
180         //aSS << " -s " << "\t"; //aSS << " -s " << "\"\\t\""; //aSS << " -s " << "'\\t'";
181         else
182             aSS << " -s " << fColDelim;
183     }
184 
185     if (fEnclosedChar != 0)
186         aSS << " -E " << fEnclosedChar;
187 
188     if (fEscChar != 0)
189         aSS << " -C " << fEscChar;
190 
191     if (fNullStrMode)
192         aSS << " -n " << '1';
193 
194     if (fImportDataMode != IMPORT_DATA_TEXT)
195         aSS << " -I " << fImportDataMode;
196 
197     //if(fConfig.length()>0)
198     //	aSS << " -c " << fConfig;
199     if (fReadBufSize > 0)
200     {
201         cout << "setting fReadBufSize = " << fReadBufSize;
202         aSS << " -c " << fReadBufSize;
203     }
204 
205     if (fIOReadBufSize > 0)
206         aSS << " -b " << fIOReadBufSize;
207 
208 
209     if ((fJobPath.length() > 0) && (fMode == 3))
210         aSS << " -p " << fJobPath;
211 
212 
213     if (fConsoleLog)
214         aSS << " -i ";
215 
216     if ((fMode == 1) || (fMode == 2))
217     {
218         aSS << " -R " << getBrmRptFileName();
219         aSS << " -m " << fMode;
220     }
221 
222     aSS << " -P " << getModuleID();
223     aSS << " -T " << fTimeZone;
224 
225     if (fbTruncationAsError)
226         aSS << " -S ";
227 
228     if (!fS3Key.empty() && !(fMode == 0 || fMode == 1))
229     {
230         if (fS3Secret.empty() || fS3Bucket.empty() || fS3Region.empty())
231             throw (runtime_error("Not all required S3 options provided"));
232         aSS << " -y " << fS3Key;
233         aSS << " -K " << fS3Secret;
234         aSS << " -t " << fS3Bucket;
235         aSS << " -g " << fS3Region;
236 
237         if (!fS3Host.empty())
238         {
239             aSS << " -H " << fS3Host;
240         }
241     }
242 
243     if ((fJobId.length() > 0) && (fMode == 1) && (!fJobLogOnly))
244     {
245         // if JobPath provided, make it w.r.t WES
246         aSS << " -p " << fTmpFileDir;
247         aSS << " -fSTDIN";
248     }
249     else if ((fJobId.length() > 0) && (fMode == 2) && (!fJobLogOnly))
250     {
251         // if JobPath provided, make it w.r.t WES
252         aSS << " -p " << fTmpFileDir;
253 
254         if (fPmFile.length() > 0)
255             aSS << " -l " << fPmFile;
256 
257         if (fPmFilePath.length() > 0)
258             aSS << " -f " << fPmFilePath;
259     }
260     else	// do not provide schema & table with JobId
261     {
262 
263         if (!fUUID.is_nil())
264             aSS << " -u" << boost::uuids::to_string(fUUID);
265 
266         if (fSchema.length() > 0)
267             aSS << " " << fSchema;
268         //else if((fMode != 0)||(fMode==3))	//TODO make it mode3 + jobID
269         else if (fJobId.length() > 0)
270         { }// may or may not contain Schema.
271         //else if((fMode == 1)||(fMode==2))	//TODO make it mode3 + jobID
272         else if (fMode != 0)
273             throw (runtime_error("Schema not available"));
274 
275         if (fTable.length() > 0)
276             aSS << " " << fTable;
277         else if (fJobId.length() > 0)
278         {} //may or may not contain Table.
279         else if (fMode != 0)
280             throw (runtime_error("Tablename not available"));
281 
282         //else if((fMode != 0)||(fMode==3))	//TODO make it mode3 + jobID
283         //else if((fMode == 1)||(fMode == 2))	//TODO make it mode3 + jobID
284         //    throw (runtime_error("Tablename not available"));
285         if ((fPmFile.length() > 0) && (2 == getMode()))
286         {
287             //if(fPmFilePath.length()>0)
288             //	aSS << " " << fPmFilePath;
289             aSS << " " << fPmFile;
290         }
291         else if (2 == getMode())
292             throw (runtime_error("loadFile [-l ] not available"));
293 
294     }
295 
296     if ((fLocFile.length() > 0) && (fLocFile != "STDIN") && (3 == getMode()))
297     {
298         //Bug 4342 multi-files mode 3 support
299         //convert all the spaces into 'commas'
300         if (fLocFile.find_first_of(' ') == string::npos)
301             aSS << " " << fLocFile;
302         else
303         {
304             std::string aLocFiles = replaceCharInStr(fLocFile, ' ', ',');
305             aSS << " " << aLocFiles;
306         }
307     }
308 
309     try
310     {
311         aCmdLine = aSS.str();
312     }
313     catch (exception&)
314     {
315         throw runtime_error("getcpImportCmdLine failed");
316     }
317 
318     return aCmdLine;
319 }
320 
321 
322 //----------------------------------------------------------------------
323 
324 //BUG 4205 (part FIX) - need to implement more into it
checkForCornerCases()325 bool WECmdArgs::checkForCornerCases()
326 {
327     //BUG 4210
328     this->checkJobIdCase();		//Need to do this before we go further
329 
330 
331     if (fMode == 0)
332     {
333         if (!fJobId.empty())
334         {
335             //cout << "Invalid option mode 0 with a Job File" << endl;
336             throw (runtime_error("Mode 0 with a Job file option is not valid!!"
337                                  "\nTry 'cpimport -h' for more information."));
338         }
339         else if (!fJobPath.empty())
340         {
341             cout << "Invalid option mode 0 with a Job Path" << endl;
342             throw (runtime_error("Mismatched options"
343                                  "\nTry 'cpimport -h' for more information."));
344         }
345         else if (!fSchema.empty())
346         {
347             cout << "Invalid option in mode 0 with a schema name" << endl;
348             throw (runtime_error("Mismatched options."));
349         }
350         else if (!fTable.empty())
351         {
352             cout << "Invalid option in mode 0 with a table name" << endl;
353             throw (runtime_error("Mismatched options."));
354         }
355         else if ((!fPmFilePath.empty()) && (fPmFilePath != "STDIN"))
356         {
357             cout << "Invalid option -f in Mode 0 with value other than STDIN" << endl;
358             throw (runtime_error("Mismatched options."));
359         }
360 
361         if (fSetBufSize)
362         {
363             cout << "Invalid option -B with Mode 0" << endl;
364             throw (runtime_error("Mismatched options."));
365         }
366         else if (fIOReadBufSize)
367         {
368             cout << "Invalid option -b with Mode 0" << endl;
369             throw (runtime_error("Mismatched options."));
370         }
371         else if (fMaxErrors >= 0)
372         {
373             cout << "Invalid option -e with Mode 0" << endl;
374             throw (runtime_error("Mismatched options."));
375         }
376         else if (fConsoleLog)
377         {
378             cout << "Invalid option -i with Mode 0" << endl;
379             throw (runtime_error("Mismatched options."));
380         }
381         else if (fReadBufSize)
382         {
383             cout << "Invalid option -c with Mode 0" << endl;
384             throw (runtime_error("Mismatched options."));
385         }
386         else if (fNoOfReadThrds)
387         {
388             cout << "Invalid option -r with Mode 0" << endl;
389             throw (runtime_error("Mismatched options."));
390         }
391         else if (fNoOfWriteThrds)
392         {
393             cout << "Invalid option -w with Mode 0" << endl;
394             throw (runtime_error("Mismatched options."));
395         }
396 
397         if (fImportDataMode != IMPORT_DATA_TEXT)
398         {
399             cout << "Invalid option -I with Mode 0" << endl;
400             throw (runtime_error("Mismatched options."));
401         }
402 
403     }
404 
405     if (fMode == 1)
406     {
407         if (!fJobId.empty())
408         {
409             if ((!fPmFilePath.empty()) && (fPmFilePath == "STDIN"))
410             {
411                 // do not do anything.. this is good.
412             }
413         }
414         // Mode 1, if Input Path is existing and input file is not existing
415         // it is an error, bce it assumes all the files in directory.
416         // In mode 2, we are passing info to cpimport.bin, which will take care
417         // of it, as in Mode 3.
418         else if ((!fPmFilePath.empty()) && (fPmFile.empty()))
419         {
420             // assumed since position param is missing
421             if ((fLocFile == "STDIN") && (fPmFilePath != "STDIN"))
422             {
423                 cout << "Invalid options in Mode 1 : option -l " << endl;
424                 cout << " or input file position parameter needed" << endl;
425                 //cout << "\tOption (-j) should follow with option -l option or "
426                 //		"an input file position parameter" << endl;
427                 throw (runtime_error("In Mode 1 Error occurred!! "
428                                      "\nTry 'cpimport -h' for more information."));
429             }
430         }
431     }
432 
433     if (fMode == 2)
434     {
435         if (fPmFile.empty())
436             throw(runtime_error("Mode 2 require PM based filename [-l]"
437                                 "\nTry 'cpimport -h' for more information."));
438 
439         if ((fPmFilePath.empty()) && (fPmFile.at(0) != '/'))
440             throw(runtime_error("Mode 2 require remote file opts -f and -l or "\
441                                 "a fully qualified path for the remote file."
442                                 "\nTry 'cpimport -h' for more information."));
443         if (!fS3Key.empty())
444             throw(runtime_error("Mode 2 & an input file from S3 does not make sense."));
445     }
446 
447     if (fMode == 3)
448     {
449         if (fPmVec.size())
450         {
451             cout << "Invalid option -P with Mode 3" << endl;
452             throw (runtime_error("Mismatched options."));
453         }
454 
455     }
456 
457     return true;
458 }
459 
460 //----------------------------------------------------------------------
461 
str2PmList(std::string & PmList,VecInts & V)462 bool WECmdArgs::str2PmList(std::string& PmList, VecInts& V)
463 {
464     const int BUFLEN = 512;
465     char aBuff[BUFLEN];
466 
467 
468     int aLen = PmList.length();
469 
470     if (aLen > 0)
471     {
472         strncpy(aBuff, PmList.c_str(), BUFLEN);
473         aBuff[BUFLEN - 1] = 0;
474     }
475     else
476         return false;
477 
478     char* pTok = strtok(aBuff, ",");
479 
480     while (pTok != NULL)
481     {
482         int aPmId = 0;
483 
484         try
485         {
486             aPmId = atoi(pTok);
487             V.push_back(aPmId);
488         }
489         catch (exception& ex)
490         {
491             std::stringstream aErr;
492             aErr << "Wrong PM id format : " << ex.what();
493             //cout << "Wrong PM id format : "<< ex.what() << endl;
494             throw (runtime_error(aErr.str()));
495         }
496 
497         pTok = strtok(NULL, ",");
498     }
499 
500     return true;
501 }
502 
503 //----------------------------------------------------------------------
504 
usage()505 void WECmdArgs::usage()
506 {
507     cout << "Simple usage using positional parameters (no XML job file):\n";
508     cout << "\tcpimport dbName tblName [loadFile] [-h] [-m mode]\n";
509     cout << "\t\t [-f path] [-d debugLevel] [-c readbufSize] [-b readBufs] \n";
510     cout << "\t\t [-r readers] [-j JobID] [-e maxErrs] [-B libBufSize] [-w parsers]\n";
511     cout << "\t\t [-s c] [-E enclosedChar] [-C escapeChar] [-n NullOption]\n";
512     cout << "\t\t [-q batchQty] [-p jobPath] [-P list of PMs] [-S] [-i] [-v verbose]\n";
513     cout << "\t\t [-I binaryOpt] [-T timeZone]\n";
514 
515 
516     cout << "Traditional usage without positional parameters (XML job file required):\n";
517     cout << "\tcpimport -j jobID\n";
518     cout << "\t\t [-h] [-m mode] [-r readers] [-w parsers] [-s c] [-f path]\n";
519     cout << "\t\t [-b readBufs] [-p path] [-c readBufSize] [-e maxErrs] [-B libBufSize]\n";
520     cout << "\t\t [-n NullOption] [-E encloseChar] [-C escapeChar] [-i] [-v verbose]\n";
521     cout << "\t\t [-d debugLevel] [-q batchQty] [-l loadFile] [-P list of PMs] [-S]\n";
522     cout << "\t\t [-I binaryOpt] [-T timeZone]\n";
523 
524     cout << "\n\nPositional parameters:\n";
525     cout << "\tdbName     Name of the database to load\n";
526     cout << "\ttblName    Name of table to load\n";
527     cout << "\tloadFile   Optional input file name in current directory,\n";
528     cout << "\t\t\tunless a fully qualified name is given.\n";
529     cout << "\t\t\tIf not given, input read from STDIN.\n";
530 
531     cout << "\n\nOptions:\n"
532          << "\t-b\tNumber of read buffers\n"
533          << "\t-c\tApplication read buffer size(in bytes)\n"
534          << "\t-d\tPrint different level(1-3) debug message\n"
535          << "\t-e\tMax number of allowable error per table per PM\n"
536          << "\t-f\tData file directory path.\n"
537          << "\t\t\tDefault is current working directory.\n"
538          << "\t\t\tIn Mode 1, -f represents the local input file path.\n"
539          << "\t\t\tIn Mode 2, -f represents the PM based input file path.\n"
540          << "\t\t\tIn Mode 3, -f represents the local input file path.\n"
541          << "\t-l\tName of import file to be loaded, relative to -f path,\n"
542          << "\t-h\tPrint this message.\n"
543          << "\t-q\tBatch Quantity, Number of rows distributed per batch in Mode 1\n"
544          << "\t-i\tPrint extended info to console in Mode 3.\n"
545          << "\t-j\tJob ID. In simple usage, default is the table OID.\n"
546          << "\t\t\tunless a fully qualified input file name is given.\n"
547          << "\t-n\tNullOption (0-treat the string NULL as data (default);\n"
548          << "\t\t\t1-treat the string NULL as a NULL value)\n"
549          << "\t-p\tPath for XML job description file.\n"
550          << "\t-r\tNumber of readers.\n"
551          << "\t-s\t'c' is the delimiter between column values.\n"
552          << "\t-B\tI/O library read buffer size (in bytes)\n"
553          << "\t-w\tNumber of parsers.\n"
554          << "\t-E\tEnclosed by character if field values are enclosed.\n"
555          << "\t-C\tEscape character used in conjunction with 'enclosed by'\n"
556          << "\t\t\tcharacter, or as part of NULL escape sequence ('\\N');\n"
557          << "\t\t\tdefault is '\\'\n"
558          << "\t-I\tImport binary data; how to treat NULL values:\n"
559          << "\t\t\t1 - import NULL values\n"
560          << "\t\t\t2 - saturate NULL values\n"
561          << "\t-P\tList of PMs ex: -P 1,2,3. Default is all PMs.\n"
562          << "\t-S\tTreat string truncations as errors.\n"
563          << "\t-m\tmode\n"
564          << "\t\t\t1 - rows will be loaded in a distributed manner across PMs.\n"
565          << "\t\t\t2 - PM based input files loaded onto their respective PM.\n"
566          << "\t\t\t3 - input files will be loaded on the local PM.\n"
567          << "\t-T\tTimezone used for TIMESTAMP datatype.\n"
568          << "\t\tPossible values: \"SYSTEM\" (default)\n"
569          << "\t\t               : Offset in the form +/-HH:MM\n"
570          << "\t-y\tS3 Authentication Key (for S3 imports)\n"
571          << "\t-K\tS3 Authentication Secret (for S3 imports)\n"
572          << "\t-t\tS3 Bucket (for S3 imports)\n"
573          << "\t-H\tS3 Hostname (for S3 imports, Amazon's S3 default)\n"
574          << "\t-g\tS3 Region (for S3 imports)\n"
575          << "\t-L\tDirectory for the output .err and .bad files.\n"
576          << "\t\tDefault is " << string(MCSLOGDIR);
577 
578 
579     cout << "\nExample1: Traditional usage\n"
580          << "\tcpimport -j 1234";
581     cout << "\nExample2: Some column values are enclosed within double quotes.\n"
582          << "\tcpimport -j 3000 -E '\"'";
583     cout << "\nExample3: Import a nation table without a Job XML file\n"
584          << "\tcpimport -j 301 tpch nation nation.tbl";
585     cout << "\nExample4: Import a nation table to all PMs in Mode 1\n"
586          << "\tcpimport -m 1 tpch nation nation.tbl";
587     cout << "\nExample4: Import a nation table to only PM1 and PM2 in Mode 1\n"
588          << "\tcpimport -m 1 -P 1,2 tpch nation nation.tbl";
589     cout << "\nExample5: Import nation.tbl from PMs to nation table in Mode 2\n"
590          << "\tcpimport -m 2 tpch nation -f /var/lib/columnstore/data/bulk/data/import/ -l nation.tbl";
591     cout << "\nExample6: Import nation.tbl in mode 3\n"
592          << "\tcpimport -m 3 tpch nation nation.tbl\n\n";
593 
594 
595     exit(1);
596 }
597 
598 
599 //-----------------------------------------------------------------------------
600 
parseCmdLineArgs(int argc,char ** argv)601 void WECmdArgs::parseCmdLineArgs(int argc, char** argv)
602 {
603     int aCh;
604     std::string importPath;
605     bool aJobType = false;
606 
607 
608     if (argc > 0)
609         fPrgmName = string(MCSBINDIR) + "/" + "cpimport.bin"; //argv[0] is splitter but we need cpimport
610 
611     while ((aCh = getopt(argc, argv,
612                          "d:j:w:s:v:l:r:b:e:B:f:q:ihm:E:C:P:I:n:p:c:ST:Ny:K:t:H:g:U:L:"))
613             != EOF)
614     {
615         switch (aCh)
616         {
617             case 'm':
618             {
619                 fArgMode = atoi(optarg);
620 
621                 //cout << "Mode level set to " << fMode << endl;
622                 if ((fArgMode > -1) && (fArgMode <= 3)) {}
623                 else
624                     throw runtime_error("Wrong Mode level");
625 
626                 break;
627             }
628 
629             case 'B':
630             {
631                 errno = 0;
632                 long lValue = strtol(optarg, 0, 10);
633 
634                 if ((errno != 0) || (lValue < 1) || (lValue > INT_MAX))
635                     throw runtime_error("Option -B is invalid or out of range");
636 
637                 fSetBufSize = lValue;
638                 break;
639             }
640 
641             case 'b':
642             {
643                 errno = 0;
644                 long lValue = strtol(optarg, 0, 10);
645 
646                 if ((errno != 0) || (lValue < 1) || (lValue > INT_MAX))
647                     throw runtime_error("Option -b is invalid or out of range");
648 
649                 fIOReadBufSize = lValue;
650                 break;
651             }
652 
653             case 'e':
654             {
655                 errno = 0;
656                 long lValue = strtol(optarg, 0, 10);
657 
658                 if ((errno != 0) || (lValue < 0) || (lValue > INT_MAX))
659                     throw runtime_error("Option -e is invalid or out of range");
660 
661                 fMaxErrors = lValue;
662                 break;
663             }
664 
665             case 'i':
666             {
667                 fConsoleLog = true;
668                 break;
669             }
670 
671             case 'c':
672             {
673                 errno = 0;
674                 long lValue = strtol(optarg, 0, 10);
675 
676                 if ((errno != 0) || (lValue < 1) || (lValue > INT_MAX))
677                     throw runtime_error("Option -c is invalid or out of range");
678 
679                 fReadBufSize = lValue;
680                 break;
681             }
682 
683             case 'j': // -j: jobID
684             {
685                 errno = 0;
686                 long lValue = strtol(optarg, 0, 10);
687 
688                 if ((errno != 0) || (lValue < 0) || (lValue > INT_MAX))
689                     throw runtime_error("Option -j is invalid or out of range");
690 
691                 fJobId = optarg;
692                 fOrigJobId = fJobId;	// in case if we need to split it.
693 
694                 if (0 == fJobId.length()) throw runtime_error("Wrong JobID Value");
695 
696                 aJobType = true;
697                 break;
698             }
699 
700             case 'v': // verbose
701             {
702                 string aVerbLen = optarg;
703                 fVerbose = aVerbLen.length();
704                 fDebugLvl = fVerbose;
705                 break;
706             }
707 
708             case 'd': // -d debug
709             {
710                 errno = 0;
711                 long lValue = strtol(optarg, 0, 10);
712 
713                 if ((errno != 0) || (lValue < 1) || (lValue > INT_MAX))
714                     throw runtime_error("Option -d is invalid or out of range");
715 
716                 fDebugLvl = lValue;
717 
718                 if (fDebugLvl > 0 && fDebugLvl <= 3)
719                 {
720                     cout << "\nDebug level set to " << fDebugLvl << endl;
721                 }
722                 else
723                 {
724                     throw runtime_error("Wrong Debug level");
725                 }
726 
727                 break;
728             }
729 
730             case 'r': // -r: num read threads
731             {
732                 errno = 0;
733                 long lValue = strtol(optarg, 0, 10);
734 
735                 if ((errno != 0) || (lValue < 1) || (lValue > INT_MAX))
736                     throw runtime_error("Option -r is invalid or out of range");
737 
738                 fNoOfReadThrds = lValue;
739                 break;
740             }
741 
742             case 'w': // -w: num parse threads
743             {
744                 errno = 0;
745                 long lValue = strtol(optarg, 0, 10);
746 
747                 if ((errno != 0) || (lValue < 1) || (lValue > INT_MAX))
748                     throw runtime_error("Option -w is invalid or out of range");
749 
750                 fNoOfWriteThrds = lValue;
751                 break;
752             }
753 
754             case 's': // -s: column delimiter
755             {
756                 if (!strcmp(optarg, "\\t"))
757                 {
758                     fColDelim = '\t';
759 
760                     if (fDebugLvl) cout << "Column delimiter : " << "\\t" << endl;
761                 }
762                 else
763                 {
764                     fColDelim = optarg[0];
765 
766                     if (fDebugLvl) cout << "Column delimiter : " << fColDelim << endl;
767                 }
768 
769                 break;
770             }
771 
772             case 'l': // -l: if JobId (-j), it can be input file
773             {
774                 fPmFile = optarg;
775 
776                 if (0 == fPmFile.length()) throw runtime_error("Wrong local filename");
777 
778                 break;
779             }
780 
781             case 'f': // -f: import file path
782             {
783                 fPmFilePath = optarg;
784                 break;
785             }
786 
787             case 'n': // -n: treat "NULL" as null
788             {
789                 // default is 0, ie it is equal to not giving this option
790                 int nullStringMode = atoi(optarg);
791 
792                 if ((nullStringMode != 0) && (nullStringMode != 1))
793                 {
794                     throw (runtime_error(
795                                "Invalid NULL option; value can be 0 or 1"));
796                 }
797 
798                 if (nullStringMode)
799                     fNullStrMode = true;
800                 else
801                     fNullStrMode = false; // This is default
802 
803                 break;
804             }
805 
806             case 'P': // -p: list of PM's
807             {
808                 try
809                 {
810                     std::string aPmList = optarg;
811 
812                     if (!str2PmList(aPmList, fPmVec))
813                         throw(runtime_error("PM list is wrong"));
814                 }
815                 catch (runtime_error& ex)
816                 {
817                     throw (ex);
818                 }
819 
820                 break;
821             }
822 
823             case 'p':
824             {
825                 fJobPath = optarg;
826                 break;
827             }
828 
829             case 'E': // -E: enclosed by char
830             {
831                 fEnclosedChar = optarg[0];
832                 //cout << "Enclosed by Character : " << optarg[0] << endl;
833                 break;
834             }
835 
836             case 'C': // -C: enclosed escape char
837             {
838                 fEscChar = optarg[0];
839                 //cout << "Escape Character  : " << optarg[0] << endl;
840                 break;
841             }
842 
843             case 'h': // -h: help
844             {
845                 //usage(); // will exit(1) here
846                 fHelp = true;
847                 break;
848             }
849 
850             case 'I': // -I: binary mode (null handling)
851             {
852                 // default is text mode, unless -I option is specified
853                 int binaryMode = atoi(optarg);
854 
855                 if (binaryMode == 1)
856                 {
857                     fImportDataMode = IMPORT_DATA_BIN_ACCEPT_NULL;
858                 }
859                 else if (binaryMode == 2)
860                 {
861                     fImportDataMode = IMPORT_DATA_BIN_SAT_NULL;
862                 }
863                 else
864                 {
865                     throw (runtime_error(
866                                "Invalid Binary mode; value can be 1 or 2"));
867                 }
868 
869                 break;
870             }
871 
872             case 'S': // -S: Treat string truncations as errors
873             {
874                 setTruncationAsError(true);
875                 //cout << "TruncationAsError  : true" << endl;
876                 break;
877             }
878 
879             case 'T':
880             {
881                 std::string timeZone = optarg;
882                 long offset;
883 
884                 if (timeZone != "SYSTEM" && dataconvert::timeZoneToOffset(timeZone.c_str(), timeZone.size(), &offset))
885                 {
886                     throw (runtime_error(
887                                "Value for option -T is invalid"));
888                 }
889 
890                 fTimeZone = timeZone;
891                 break;
892             }
893 
894             case 'q': // -q: batch quantity - default value is 10000
895             {
896                 errno = 0;
897                 long long lValue = strtoll(optarg, 0, 10);
898 
899                 if ((errno != 0) || (lValue < 1) || (lValue > UINT_MAX))
900                     throw runtime_error("Option -q is invalid or out of range");
901 
902                 fBatchQty = lValue;
903 
904                 if (fBatchQty < 10000) fBatchQty = 10000;
905                 else if (fBatchQty > 100000) fBatchQty = 10000;
906 
907                 break;
908             }
909 
910             case 'N': //-N no console output
911             {
912                 fConsoleOutput = false;
913                 break;
914             }
915 
916             case 'y': //-y S3 Key
917             {
918                 fS3Key = optarg;
919                 break;
920             }
921 
922             case 'K': //-K S3 Secret
923             {
924                 fS3Secret = optarg;
925                 break;
926             }
927 
928             case 'H': //-H S3 Host
929             {
930                 fS3Host = optarg;
931                 break;
932             }
933 
934             case 't': //-t S3 bucket
935             {
936                 fS3Bucket = optarg;
937                 break;
938             }
939 
940             case 'g': //-g S3 Region
941             {
942                 fS3Region = optarg;
943                 break;
944             }
945 
946             case 'U': //-U username of the files owner
947             {
948                 fUsername = optarg;
949                 break;
950             }
951 
952             case 'L': // -L set the output location of .bad/.err files
953             {
954                 fErrorDir = optarg;
955                 break;
956             }
957 
958             default:
959             {
960                 std::string aErr = "Unknown command line option " + aCh;
961                 //cout << "Unknown command line option " << aCh << endl;
962                 throw (runtime_error(aErr));
963             }
964         }
965     }
966 
967     if (fHelp) usage();	//BUG 4210
968 
969     if (fArgMode != -1) fMode = fArgMode;	//BUG 4210
970 
971     std::string bulkRootPath = getBulkRootDir();
972 
973     checkForBulkLogDir(bulkRootPath);
974 
975     if (2 == fArgMode && fPmFilePath.empty())
976         throw runtime_error("-f option is mandatory with mode 2.");
977 
978     if (aJobType)
979     {
980         if (0 == fArgMode) throw runtime_error("Incompatible mode and option types");
981 
982         if (optind < argc)
983         {
984             fSchema = argv[optind]; // 1st pos parm
985             optind++;
986 
987             if (optind < argc)
988             {
989                 fTable = argv[optind]; // 2nd pos parm
990                 optind++;
991             }
992             else
993             {
994                 // if schema is there, table name should be there
995                 throw runtime_error("No table name specified with schema.");
996             }
997 
998             if (optind < argc) // see if input file name is given
999             {
1000                 // 3rd pos parm
1001                 fLocFile = argv[optind];
1002 
1003                 if ((fLocFile.at(0) != '/') && (fLocFile != "STDIN"))
1004                 {
1005                     std::string aTmp = fLocFile;
1006 
1007                     // BUG 4379 -f given? use that
1008                     if ((!fPmFilePath.empty()) && (fMode == 1))
1009                         fLocFile = fPmFilePath + "/" + aTmp;
1010                     else if (fPmFilePath.empty())
1011                         fLocFile = bulkRootPath + "/data/import/" + aTmp;
1012                 }
1013             }
1014             else
1015             {
1016                 if (!fPmFile.empty())
1017                     fLocFile = fPmFile;
1018                 //BUG 4186
1019                 //else  // else take it from the jobxml file
1020                 //	fLocFile = "STDIN";
1021                 //Historically cpimport works with jobfile as
1022                 // -l <fileName> && -f <filePath>   or
1023                 // -fSTDIN as the stdin, it will override colxml loadfile entry
1024                 // if -fSTDIN is not provided get i/p file from jobfile
1025                 else if ((!fPmFilePath.empty()) && (fPmFilePath == "STDIN"))
1026                     fLocFile = "STDIN";
1027 
1028                 // else take it from the jobxml file
1029             }
1030 
1031             if ((fSchema.length() > 0) && (fTable.length() > 0) && (fLocFile.length() > 0))
1032                 fJobLogOnly = true;
1033         }
1034         else
1035         {
1036             if (!fPmFile.empty())
1037             {
1038                 fLocFile = fPmFile;
1039 
1040                 if (!fPmFilePath.empty())
1041                 {
1042                     if (fPmFilePath == "STDIN")
1043                     {
1044                         throw runtime_error("Conflicting options -l and -fSTDIN");
1045                     }
1046                     else
1047                     {
1048                         std::string aTmp = fLocFile;
1049 
1050                         if ((!fPmFilePath.empty()) && (fMode == 1)) //BUG 4379 -f given? use that
1051                             fLocFile = fPmFilePath + "/" + aTmp;
1052                         else if (!fPmFilePath.empty())
1053                             fLocFile = bulkRootPath + "/data/import/" + aTmp;
1054                     }
1055 
1056                 }
1057 
1058                 if ((fLocFile.at(0) != '/') && (fLocFile != "STDIN") && (fPmFilePath.empty()))
1059                 {
1060                     std::string aTmp = fLocFile;
1061                     fLocFile = bulkRootPath + "/data/import/" + aTmp;
1062                 }
1063             }
1064             //BUG 4186
1065             //else
1066             //	fLocFile = "STDIN";
1067             //Historically cpimport works with jobfile as
1068             // -l <fileName> && -f <filePath>   or
1069             // -fSTDIN as the stdin, it will override colxml loadfile entry
1070             // if -fSTDIN is not provided get i/p file from jobfile
1071             else if ((!fPmFilePath.empty()) && (fPmFilePath == "STDIN"))
1072                 fLocFile = "STDIN";
1073 
1074             // else take it from the jobxml file
1075         }
1076 
1077     }
1078     // Get positional arguments, User can provide:
1079     // 1. no positional parameters	- Mode 0 & stdin
1080     // 2. Two positional parameters (schema and table names) - Mode 1/2, stdin
1081     // 3. Three positional parameters (schema, table, and import file name)
1082     else if (optind < argc) // see if db schema name is given
1083     {
1084         if (fArgMode == 0)
1085         {
1086             //added the code as per BUG 4245
1087             if (!fPmFilePath.empty())
1088             {
1089                 fLocFile = fPmFilePath;
1090 
1091                 if (fLocFile != "STDIN")
1092                     throw(runtime_error("ERROR: In Mode 0, -f option can only have value STDIN"));
1093             }
1094             else
1095             {
1096                 fLocFile = argv[optind];
1097                 optind++;
1098             }
1099 
1100             if (optind < argc) //dest filename provided
1101             {
1102                 fPmFile = argv[optind];
1103 
1104                 if ((fPmFile.at(0) != '/') && (fS3Key.empty()))
1105                 {
1106                     std::string aTmp = fPmFile;
1107                     fPmFile = bulkRootPath + "/data/import/" + aTmp;
1108                 }
1109             }
1110             else // no dest filename
1111             {
1112                 if (fLocFile == "STDIN")
1113                     throw(runtime_error("ERROR: Destination file name required!!"));
1114 
1115                 if (fLocFile.at(0) == '/')	//local FQ-filename,parse out filename
1116                     fPmFile = getFileNameFromPath(fLocFile);
1117                 else
1118                     fPmFile = fLocFile;
1119 
1120                 if ((fPmFile.at(0) != '/') && (fS3Key.empty()))	//should be true all the time
1121                 {
1122                     std::string aTmp = fPmFile;
1123                     fPmFile = bulkRootPath + "/data/import/" + aTmp;
1124                 }
1125             }
1126 
1127             /* commented out for BUG 4245
1128             if(fPmFilePath.empty())
1129             	fLocFile = argv[optind];
1130             else
1131             	fLocFile = fPmFilePath +"/"+ argv[optind];
1132 
1133             if (fPmFile.empty()) //BUG 4200
1134             {
1135             	//if(fLocFile.at(0)== '/')
1136             	//	fPmFile = fLocFile;
1137             	//else
1138             	if(fLocFile.at(0)!='/')
1139             		fPmFile = bulkRootPath + "/data/import/"+ fLocFile;
1140             }
1141             else
1142             {
1143             	if(fPmFile.at(0)!='/')
1144             	{
1145             		std::string aTmp = fPmFile;
1146             		fPmFile = bulkRootPath + "/data/import/"+aTmp;
1147             	}
1148             }
1149             */
1150         }
1151         else
1152             fSchema = argv[optind]; // 1st pos parm
1153 
1154         optind++;
1155 
1156         if (optind < argc) // see if table name is given
1157         {
1158             fTable = argv[optind]; // 2nd pos parm
1159             optind++;
1160 
1161             if (optind < argc) // see if input file name is given
1162             {
1163                 // 3rd pos parm
1164                 fLocFile = argv[optind];
1165 
1166                 //BUG 4379 if -f option given we need to use that path,
1167                 //over riding bug 4231. look at the code below
1168                 //BUG 4231 - This bug over writes 4199 and commenting out changes
1169                 //BUG 4199
1170                 //Path not provided, not fully qualified, Look in import dir
1171                 //if((fLocFile.at(0)!= '/')&&(fLocFile != "STDIN"))
1172                 //{
1173                 //	std::string aTmp = fLocFile;
1174                 //	fLocFile = bulkRootPath + "/data/import/"+ aTmp;
1175                 //}
1176                 //BUG 4379 if -f option given we need to use that path
1177                 if ((fLocFile.at(0) != '/') && (fLocFile != "STDIN"))
1178                 {
1179                     std::string aTmp = fLocFile;
1180 
1181                     //if -f given? use that otherwise just go ahead with CWD
1182                     if ((!fPmFilePath.empty()) && (fMode == 1))
1183                         fLocFile = fPmFilePath + "/" + aTmp;
1184 
1185                     // TODO - if -f option is given and a list of files are
1186                     // are provided, we need to be able to import all that.
1187                 }
1188 
1189 
1190             }
1191             else
1192             {
1193                 if (fPmFile.length() > 0)
1194                 {
1195                     // BUG 4210
1196                     //if (fPmFilePath.length() > 0)
1197                     //{
1198                     //	fLocFile = fPmFilePath +"/"+ fPmFile;
1199                     //}
1200                     //else
1201                     if (fPmFilePath.empty())
1202                     {
1203                         //NOTE - un-commenting with an if statement for Mode 2
1204                         //BUG 4231 makes it comment out the below changes,
1205                         //This will not change even though directly, to be
1206                         //on safer side, we should take out this too.
1207                         //check path fully qualified? then set as data import
1208                         if (2 == fArgMode)
1209                         {
1210                             //BUG 4342
1211                             if ((fPmFile.at(0) != '/') && (fS3Key.empty()))
1212                             {
1213                                 std::string aTmp = fPmFile;
1214                                 fPmFile = PrepMode2ListOfFiles(aTmp);
1215                             }
1216                             else
1217                             {
1218                                 if (fPmFile.find_first_of(' ') != string::npos)
1219                                 {
1220                                     std::string aPmFiles = replaceCharInStr(fPmFile, ' ', ',');
1221                                     fPmFile = aPmFiles;
1222                                 }
1223                             }
1224                         }
1225 
1226                         fLocFile = fPmFile;
1227                     }
1228                 }
1229                 else
1230                 {
1231                     fLocFile = "STDIN";
1232                 }
1233 
1234                 //cout << "LocFile set as stdin" << endl;
1235             }
1236         }
1237         else
1238         {
1239             // If Mode is not 0 and table name is a required argument
1240             if (fArgMode != 0)
1241                 throw(runtime_error("No table name specified with schema."));
1242         }
1243 
1244     }
1245     else
1246     {
1247         // for testing we are allowing data from stdin even with Mode 0
1248         // that is without LocFileName
1249         if (0 == fArgMode)
1250         {
1251             fLocFile = "STDIN";	//cout << "LocFile set as stdin" << endl;
1252         }
1253         else
1254         {
1255             // If Mode 0, LocFileName is reqd and otherwies Schema is required
1256             throw (runtime_error("No schema or local filename specified."));
1257         }
1258     }
1259 
1260     /* check for all-or-nothing cmdline args to enable S3 import */
1261     int s3Tmp = (fS3Key.empty() ? 0 : 1) + (fS3Bucket.empty() ? 0 : 1) +
1262         (fS3Secret.empty() ? 0 : 1) + (fS3Region.empty() ? 0 : 1);
1263     if (s3Tmp != 0 && s3Tmp != 4)
1264         throw runtime_error("The access key, secret, bucket, and region are all required to import from S3");
1265 }
1266 
getJobFileName()1267 std::string WECmdArgs::getJobFileName()
1268 {
1269     std::ostringstream aSS;
1270     string aJobIdFileName;
1271 
1272     if (fJobId.length() > 0)
1273     {
1274         if (fJobPath.length() > 0)
1275             aSS << fJobPath;
1276         else
1277         {
1278             fJobPath = config::Config::makeConfig()->getConfig("WriteEngine",
1279                        "BulkRoot") + "/Job";
1280             aSS << fJobPath;
1281         }
1282 
1283         aSS << "/Job_" << fJobId << ".xml";
1284         aJobIdFileName = aSS.str();
1285     }
1286 
1287     return aJobIdFileName;
1288 }
1289 
getPmStatus(int Id)1290 bool WECmdArgs::getPmStatus(int Id)
1291 {
1292     // if no PMID's provided on cmdline, return true;
1293     if (0 == fPmVec.size()) return true;
1294 
1295     VecInts::iterator aIt = fPmVec.begin();
1296 
1297     while (aIt != fPmVec.end())
1298     {
1299         if (*aIt == static_cast<unsigned int>(Id))
1300             return true;
1301 
1302         ++aIt;
1303     }
1304 
1305     return false;
1306 }
1307 
1308 
1309 //------------------------------------------------------------------------------
// Returns the stored BRM report file name, building one first when none is set.
getBrmRptFileName()1311 std::string WECmdArgs::getBrmRptFileName()
1312 {
1313     if (!fBrmRptFile.empty())
1314         return fBrmRptFile;
1315 
1316     string brmRptFileName = getTmpFileDir();
1317 
1318     if (!brmRptFileName.empty())
1319     {
1320         fTmpFileDir = brmRptFileName;
1321         char aBuff[64];
1322         time_t aTime;
1323         struct tm pTm;
1324         time(&aTime);
1325         localtime_r(&aTime, &pTm);
1326 
1327         // BUG 4424
1328         //			M   D   H   M   S
1329         snprintf(aBuff, sizeof(aBuff), "/BrmRpt%02d%02d%02d%02d%02d%d.rpt",
1330                  pTm.tm_mon, pTm.tm_mday, pTm.tm_hour,
1331                  pTm.tm_min, pTm.tm_sec, getpid());
1332         brmRptFileName += aBuff;
1333     }
1334     else
1335     {
1336         //cout << "ERROR: Could not find TempFileDir in Columnstore.xml" << endl;
1337         throw (runtime_error("Could not find TempFileDir in Columnstore.xml"));
1338     }
1339 
1340     setBrmRptFileName(brmRptFileName);
1341 
1342     return brmRptFileName;
1343 
1344 }
1345 //------------------------------------------------------------------------------
1346 
addJobFilesToVector(std::string & JobName)1347 void WECmdArgs::addJobFilesToVector(std::string& JobName)
1348 {
1349     //if((!fSchema.empty())&&(!fTable.empty())&&(!fLocFile.empty())) return;
1350 
1351     WEXmlgetter aXmlGetter(JobName);
1352     vector<string> aSections;
1353     aSections.push_back("BulkJob");
1354     aSections.push_back("Schema");
1355     aSections.push_back("Table");
1356 
1357     //BUG 4163
1358     typedef std::vector<string> TableVec;
1359     TableVec aTableVec;
1360     aXmlGetter.getConfig(aSections[1], aSections[2], aTableVec);
1361     setMultiTableCount(aTableVec.size());
1362 
1363     if (getMultiTableCount() > 1)
1364     {
1365         splitConfigFilePerTable(JobName, aTableVec.size());
1366     }
1367     else
1368     {
1369         fVecJobFiles.push_back(JobName);
1370     }
1371 
1372 }
1373 
1374 //------------------------------------------------------------------------------
1375 // Set the schema, table, and loadfile name from the xml job file.
1376 // If running in binary mode, we also get the list of columns for the table,
1377 // so that we can determine the exact fixed record length of the incoming data.
1378 //------------------------------------------------------------------------------
setSchemaAndTableFromJobFile(std::string & JobName)1379 void WECmdArgs::setSchemaAndTableFromJobFile(std::string& JobName)
1380 {
1381     if (((fVecJobFiles.size() == 1) && (!fSchema.empty()) &&
1382             (!fTable.empty()) && (!fLocFile.empty()))  &&
1383             (fImportDataMode == IMPORT_DATA_TEXT)) return;
1384 
1385     WEXmlgetter aXmlGetter(JobName);
1386     vector<string> aSections;
1387     aSections.push_back("BulkJob");
1388     aSections.push_back("Schema");
1389     aSections.push_back("Table");
1390 
1391     // Reset the fSchema, fTable, and FLocFile
1392     if ((fVecJobFiles.size() > 1) ||
1393             (fSchema.empty()) || (fTable.empty()) || (fLocFile.empty()))
1394     {
1395         std::string aSchemaTable;
1396         std::string aInputFile;
1397 
1398         aSchemaTable = aXmlGetter.getAttribute(aSections, "tblName");
1399 
1400         if (getDebugLvl() > 1) cout << "schema.table = " << aSchemaTable << endl;
1401 
1402         aInputFile = aXmlGetter.getAttribute(aSections, "loadName");
1403 
1404         if (getDebugLvl() > 1) cout << "xml::InputFile = " << aInputFile << endl;
1405 
1406         if (aSchemaTable.length() > 0)
1407         {
1408             char aSchema[64];
1409             char aTable[64];
1410             int aRet = aSchemaTable.find('.');
1411 
1412             if (aRet > 0)
1413             {
1414                 int aLen = aSchemaTable.copy(aSchema, aRet);
1415 
1416                 if (getDebugLvl() > 1) cout << "Schema: " << aSchema << endl;
1417 
1418                 aSchema[aLen] = 0;
1419 
1420                 if (fSchema.empty()) fSchema = aSchema;
1421 
1422                 aLen = aSchemaTable.copy(aTable, aSchemaTable.length(), aRet + 1 );
1423                 aTable[aLen] = 0;
1424 
1425                 if (getDebugLvl() > 1) cout << "Table: " << aTable << endl;
1426 
1427                 fTable = aTable;
1428             }
1429             else
1430                 throw runtime_error(
1431                     "JobFile ERROR: Can't get Schema and Table Name");
1432         }
1433         else
1434         {
1435             throw runtime_error(
1436                 "JobFile ERROR: Can't get Schema and Table Name");
1437         }
1438 
1439         if ((fLocFile.empty()) && (!aInputFile.empty()))
1440         {
1441             string bulkRootPath = config::Config::makeConfig()->getConfig(
1442                                       "WriteEngine", "BulkRoot");
1443 
1444             if (aInputFile.at(0) == '/')
1445                 fLocFile = aInputFile;
1446             else if ((!fPmFilePath.empty()) && (fMode == 1))
1447                 fLocFile = fPmFilePath + "/" + aInputFile;
1448             else if ((!bulkRootPath.empty()) && (fPmFilePath.empty()))
1449                 fLocFile = bulkRootPath + "/data/import/" + aInputFile;
1450             else
1451                 fLocFile = aInputFile;
1452 
1453             if (fArgMode == 2) fPmFile = fLocFile;
1454         }
1455 
1456         if (getDebugLvl() > 1) cout << "schema = " << fSchema << endl;
1457 
1458         if (getDebugLvl() > 1) cout << "TableName = " << fTable << endl;
1459 
1460         if (getDebugLvl() > 1) cout << "Input File = " << fLocFile << endl;
1461     }
1462 
1463     // Reset the list of columns we will be importing from the input data
1464     fColFldsFromJobFile.clear();
1465 
1466     if (fImportDataMode != IMPORT_DATA_TEXT)
1467     {
1468         aSections.push_back("Column");
1469         aXmlGetter.getAttributeListForAllChildren(
1470             aSections, "colName", fColFldsFromJobFile);
1471     }
1472 }
1473 
1474 //------------------------------------------------------------------------------
checkJobIdCase()1475 void WECmdArgs::checkJobIdCase()
1476 {
1477     if ((fJobId.empty()) || (fJobLogOnly) || (fMode == 3) || (fMode == 0)) return;
1478 
1479     if (fJobPath.empty())
1480     {
1481         string bulkRootPath = config::Config::makeConfig()->getConfig(
1482                                   "WriteEngine", "BulkRoot");
1483         //cout << "checkJobIdCase::BulkRoot: " << bulkRootPath << endl;
1484 
1485         if (!bulkRootPath.empty())
1486             fJobPath = bulkRootPath + "/job";
1487         else
1488             throw runtime_error("Config Error: BulkRoot not found in Columnstore.xml");
1489     }
1490 
1491     char aBuff[256];
1492 
1493     if (!fJobPath.empty())
1494         snprintf(aBuff, sizeof(aBuff), "%s/Job_%s.xml", fJobPath.c_str(),
1495                  fJobId.c_str());
1496     else	// for time being
1497         snprintf(aBuff, sizeof(aBuff), "/var/log/mariadb/columnstore/data/bulk/job/Job_%s.xml",
1498                  fJobId.c_str());
1499 
1500     std::string aJobFileName(aBuff);
1501 
1502     //cout << "checkJobIdCase::aJobFileName: " << aJobFileName << endl;
1503 
1504 
1505     //BUG 4171
1506     addJobFilesToVector(aJobFileName);
1507 
1508     aJobFileName =  fVecJobFiles[0];
1509     setSchemaAndTableFromJobFile(aJobFileName);
1510     setEnclByAndEscCharFromJobFile(aJobFileName);
1511 
1512 }
1513 
1514 //------------------------------------------------------------------------------
1515 
getTmpFileDir()1516 std::string WECmdArgs::getTmpFileDir()
1517 {
1518     if (!fTmpFileDir.empty()) return fTmpFileDir;
1519 
1520     fTmpFileDir = startup::StartUp::tmpDir();
1521 
1522     if (fTmpFileDir.empty())
1523         throw( runtime_error("Config ERROR: TmpFileDir not found!!"));
1524     else
1525         return fTmpFileDir;
1526 }
1527 
1528 //------------------------------------------------------------------------------
1529 
getBulkRootDir()1530 std::string WECmdArgs::getBulkRootDir()
1531 {
1532     if (!fBulkRoot.empty()) return fBulkRoot;
1533 
1534     fBulkRoot = config::Config::makeConfig()->getConfig("WriteEngine",
1535                 "BulkRoot");
1536 
1537     if (fBulkRoot.empty())
1538         throw( runtime_error("Config ERROR: <BulkRoot> not found!!"));
1539     else
1540         return fBulkRoot;
1541 }
1542 
1543 //------------------------------------------------------------------------------
1544 
getBatchQuantity()1545 unsigned int WECmdArgs::getBatchQuantity()
1546 {
1547     return (fBatchQty >= 10000) ? fBatchQty : 10000;	//default Batch Qty is 10000
1548 }
1549 
1550 //------------------------------------------------------------------------------
1551 
setEnclByAndEscCharFromJobFile(std::string & JobName)1552 void WECmdArgs::setEnclByAndEscCharFromJobFile(std::string& JobName)
1553 {
1554     if ((fEnclosedChar == 0))	// check anything in Jobxml file
1555     {
1556         WEXmlgetter aXmlGetter(JobName);
1557         vector<string> aSections;
1558         aSections.push_back("BulkJob");
1559         aSections.push_back("EnclosedByChar");
1560 
1561         try
1562         {
1563             //std::string aTable = aXmlGetter.getConfig(aSection, aElement);
1564             std::string aEnclosedBy = aXmlGetter.getValue(aSections);
1565 
1566             if (getDebugLvl() > 1)cout << "aEncloseBy = " << aEnclosedBy << endl;
1567 
1568             if (!aEnclosedBy.empty())
1569             {
1570                 fEnclosedChar = aEnclosedBy.at(0);
1571             }
1572         }
1573         catch (std::runtime_error&)
1574         {
1575             // do not do anything
1576         }
1577     }
1578 
1579     if (fEscChar == 0)	// check anything in Jobxml file
1580     {
1581         WEXmlgetter aXmlGetter(JobName);
1582         vector<string> aSections;
1583         aSections.push_back("BulkJob");
1584         aSections.push_back("EscapeChar");
1585 
1586         try
1587         {
1588             //std::string aTable = aXmlGetter.getConfig(aSection, aElement);
1589             std::string aEscChar = aXmlGetter.getValue(aSections);
1590 
1591             if (getDebugLvl() > 1) cout << "aEscapeChar = " << aEscChar << endl;
1592 
1593             if (!aEscChar.empty())
1594             {
1595                 fEscChar = aEscChar.at(0);
1596             }
1597         }
1598         catch (std::runtime_error&)
1599         {
1600             // do not do anything
1601         }
1602     }
1603 
1604 }
1605 
1606 //------------------------------------------------------------------------------
getFileNameFromPath(const std::string & Path) const1607 std::string WECmdArgs::getFileNameFromPath(const std::string& Path) const
1608 {
1609     char aBuff[64];
1610     int iDx = Path.find_last_of('/');
1611     iDx++;		// compensate for the forward slash
1612     int aCx = Path.size() - iDx;
1613     Path.copy(aBuff, aCx, iDx);
1614     aBuff[aCx] = 0;
1615     return aBuff;
1616 }
1617 
1618 //------------------------------------------------------------------------------
getModuleID()1619 std::string WECmdArgs::getModuleID()
1620 {
1621     oam::Oam oam;
1622     oam::oamModuleInfo_t sModInfo;
1623     std::string sModuleID;
1624     char szModuleIDandPID[64];
1625     int nModuleNumber;
1626 
1627     try
1628     {
1629         sModInfo = oam.getModuleInfo();
1630         sModuleID = boost::get < 1 > (sModInfo);
1631         nModuleNumber = boost::get < 2 > (sModInfo);
1632         snprintf(szModuleIDandPID, sizeof(szModuleIDandPID), "%s%d-%d",
1633                  sModuleID.c_str(), nModuleNumber, getpid());
1634         sModuleID = szModuleIDandPID;
1635     }
1636     catch (exception&)
1637     {
1638         sModuleID = "unknown";
1639     }
1640 
1641     return sModuleID;
1642 }
1643 
1644 //------------------------------------------------------------------------------
1645 
1646 
splitConfigFilePerTable(std::string & ConfigName,int tblCount)1647 void WECmdArgs::splitConfigFilePerTable(std::string& ConfigName, int tblCount)
1648 {
1649     std::string aOpenTag = "<Table ";
1650     std::string aCloseTag = "</Table>";
1651     std::string aCloseSchemaTag = "</Schema>";
1652 
1653     std::vector<std::ofstream*> aVecFiles;
1654 
1655     //std::vector<std::string> aVecConfigs;
1656     for (int aIdx = 1; aIdx <= tblCount; aIdx++)
1657     {
1658         char aConfName[128];
1659         snprintf(aConfName, sizeof(aConfName), "%s_%d.xml", ConfigName.c_str(), aIdx);
1660         //aVecConfigs.push_back(aConfName);
1661         fVecJobFiles.push_back(aConfName);
1662         std::ofstream* pCopy = new std::ofstream;
1663         //pCopy->open(aConfName, std::ios_base::app);
1664         pCopy->open(aConfName);
1665         aVecFiles.push_back(pCopy);
1666     }
1667 
1668 
1669     std::ifstream aMaster;
1670     aMaster.open(ConfigName.c_str());
1671 
1672     if (aMaster.is_open())
1673     {
1674         char aBuff[256];
1675         int aTblNo = 0;
1676         size_t aStrPos = std::string::npos;
1677         bool aOpenFound = false;
1678         bool aCloseFound = false;
1679 
1680         while (!aMaster.eof())
1681         {
1682             aMaster.getline(aBuff, sizeof(aBuff) - 1);
1683             unsigned int aLen = aMaster.gcount();
1684 
1685             if ((aLen < (sizeof(aBuff) - 2)) && (aLen > 0))
1686             {
1687                 aBuff[aLen - 1] = '\n';
1688                 aBuff[aLen] = 0;
1689                 string aData = aBuff;
1690                 //cout << "Data Read " << aBuff;
1691 
1692                 if (!aOpenFound)
1693                 {
1694                     aStrPos = aData.find(aOpenTag);
1695 
1696                     if (aStrPos != std::string::npos)
1697                     {
1698                         aOpenFound = true;
1699                         aTblNo++;
1700                         write2ConfigFiles(aVecFiles, aBuff, aTblNo);
1701                     }
1702                     else
1703                     {
1704                         if ((!aOpenFound) && (aCloseFound))
1705                         {
1706                             aStrPos = aData.find(aCloseSchemaTag);
1707 
1708                             if (aStrPos != std::string::npos)
1709                             {
1710                                 aOpenFound = false;
1711                                 aCloseFound = false;
1712                                 aTblNo = 0;
1713                             }
1714                         }
1715 
1716                         write2ConfigFiles(aVecFiles, aBuff, aTblNo);
1717                     }
1718                 }
1719                 else
1720                 {
1721                     aStrPos = aData.find(aCloseTag);
1722 
1723                     if (aStrPos != std::string::npos)
1724                     {
1725                         aOpenFound = false;
1726                         aCloseFound = true;
1727                         write2ConfigFiles(aVecFiles, aBuff, aTblNo);
1728                     }
1729                     else
1730                     {
1731                         write2ConfigFiles(aVecFiles, aBuff, aTblNo);
1732                     }
1733                 }
1734             }
1735         }//while Master.eof
1736     }
1737     else
1738     {
1739         throw runtime_error("Could not open Job Config file");
1740     }
1741 
1742 
1743     for (unsigned int Idx = 0; Idx < aVecFiles.size(); Idx++)
1744     {
1745         aVecFiles[Idx]->close();
1746         delete aVecFiles[Idx];
1747     }
1748 
1749     aVecFiles.clear();
1750 
1751 }
1752 
1753 //------------------------------------------------------------------------------
1754 
write2ConfigFiles(std::vector<std::ofstream * > & Files,char * pBuff,int FileIdx)1755 void WECmdArgs::write2ConfigFiles(std::vector<std::ofstream*>& Files,
1756                                   char* pBuff, int FileIdx)
1757 {
1758 
1759     if (FileIdx == 0)
1760     {
1761         std::vector<std::ofstream*>::iterator aIt = Files.begin();
1762 
1763         while (aIt != Files.end())
1764         {
1765             std::ofstream* pCopy = (*aIt);
1766             pCopy->write(pBuff, strlen(pBuff));
1767             ++aIt;
1768         }
1769     }
1770     else
1771     {
1772         Files[FileIdx - 1]->write(pBuff, strlen(pBuff));
1773     }
1774 }
1775 
1776 //------------------------------------------------------------------------------
1777 
updateWithJobFile(int Idx)1778 void WECmdArgs::updateWithJobFile(int Idx)
1779 {
1780     setLocFile("");	// resetting the from the previous import
1781     std::string aJobFileName =  fVecJobFiles[Idx];
1782     setSchemaAndTableFromJobFile(aJobFileName);
1783     setEnclByAndEscCharFromJobFile(aJobFileName);
1784     setJobFileName(aJobFileName);
1785 
1786     std::ostringstream aSS;
1787     aSS << fOrigJobId << ".xml_" << (Idx + 1);
1788     fJobId = aSS.str();
1789 }
1790 
1791 
1792 //------------------------------------------------------------------------------
1793 
replaceCharInStr(const std::string & Str,char C,char R)1794 std::string WECmdArgs::replaceCharInStr(const std::string& Str, char C, char R)
1795 {
1796     std::stringstream aSs;
1797 
1798     size_t start = 0, end = 0;
1799     end = Str.find_first_of(C);
1800 
1801     do
1802     {
1803         if (end != string::npos)
1804         {
1805             aSs << Str.substr(start, end - start) << R;
1806             start = end + 1;
1807         }
1808         else
1809         {
1810             aSs << Str.substr(start, end - start);
1811             break;
1812         }
1813 
1814         end = Str.find_first_of(C, start);
1815     }
1816     while (start != end);
1817 
1818     return aSs.str();
1819 }
1820 
1821 //------------------------------------------------------------------------------
1822 // Introduced to handle Bug 4342 with Mode 2
1823 
PrepMode2ListOfFiles(std::string & FileName)1824 std::string WECmdArgs::PrepMode2ListOfFiles(std::string& FileName)
1825 {
1826     VecArgs aInfileList;
1827     std::string bulkRootPath = getBulkRootDir();
1828     //cout << "Inside PrepMode2ListOfFiles("<< FileName << ")" << endl;
1829     std::string aFileName = FileName;
1830 
1831     istringstream iss(aFileName);
1832     size_t start = 0, end = 0;
1833     const char* sep = " ,|";
1834 
1835     end = aFileName.find_first_of(sep);
1836 
1837     do
1838     {
1839         if (end != string::npos)
1840         {
1841             std::string aFile = aFileName.substr(start, end - start);
1842 
1843             if (getDebugLvl() > 1)
1844                 cout << "File: " << aFileName.substr(start, end - start) << endl;
1845 
1846             start = end + 1;
1847             aInfileList.push_back(aFile);
1848         }
1849         else
1850         {
1851             std::string aFile = aFileName.substr(start, end - start);
1852 
1853             if (getDebugLvl() > 1)
1854                 cout << "Next Input File " << aFileName.substr(start, end - start) << endl;
1855 
1856             aInfileList.push_back(aFile);
1857             break;
1858         }
1859 
1860         end = aFileName.find_first_of(sep, start);
1861     }
1862     while (start != end);
1863 
1864     std::ostringstream aSS;
1865     int aVecSize = aInfileList.size();
1866     int aVecIdx = 0;
1867 
1868     // Take file list one by one and append it to one string
1869     while (aVecIdx < aVecSize)
1870     {
1871         std::string aNextFile = aInfileList[aVecIdx];
1872         aVecIdx++;
1873 
1874         //aInfileList.pop_front();
1875         if (aNextFile.at(0) != '/')
1876         {
1877             aSS << bulkRootPath << "/data/import/" + aNextFile;
1878         }
1879         else
1880         {
1881             aSS << aNextFile;
1882         }
1883 
1884         if (aVecIdx < aVecSize) aSS << ",";
1885     }
1886 
1887     //cout << "File list are = " << aSS.str() << endl;
1888 
1889     return aSS.str();
1890 }
1891 
1892 //------------------------------------------------------------------------------
1893 // Get set of column names in the "current" table being processed from the
1894 // Job xml file.
1895 //------------------------------------------------------------------------------
getColumnList(std::set<std::string> & columnList) const1896 void WECmdArgs::getColumnList( std::set<std::string>& columnList ) const
1897 {
1898     columnList.clear();
1899 
1900     for (unsigned k = 0; k < fColFldsFromJobFile.size(); k++)
1901     {
1902         columnList.insert( fColFldsFromJobFile[k] );
1903     }
1904 }
1905 
1906 //-----------------------------------------------------------------------------
1907 // check for the bulkload log directory. If it is not existing, create it
1908 // w.r.t Bug 6137
1909 //-----------------------------------------------------------------------------
1910 
checkForBulkLogDir(const std::string & BulkRoot)1911 void WECmdArgs::checkForBulkLogDir(const std::string& BulkRoot)
1912 {
1913     if ( !boost::filesystem::exists(BulkRoot.c_str()) )
1914     {
1915         cout << "Creating directory : " << BulkRoot << endl;
1916         boost::filesystem::create_directories(BulkRoot.c_str());
1917     }
1918 
1919     if ( boost::filesystem::exists(BulkRoot.c_str()) )
1920     {
1921         // create the job directory also if not existing
1922         std::ostringstream aSS;
1923         aSS << BulkRoot;
1924         aSS << "/job";
1925         std::string jobDir = aSS.str();
1926 
1927         if ( !boost::filesystem::exists(jobDir.c_str()) )
1928         {
1929             cout << "Creating directory : " << jobDir << endl;
1930             bool aSuccess = boost::filesystem::create_directories(jobDir.c_str());
1931 
1932             if (!aSuccess)
1933             {
1934                 cout << "\nFailed to create job directory, check permissions\n" << endl;
1935                 throw runtime_error("Failed to create job directory, check permissions");
1936             }
1937         }
1938     }
1939     else
1940     {
1941         cout << "\nFailed to create bulk directory, check permissions\n" << endl;
1942         throw runtime_error("Failed to create bulk directory, check permissions");
1943     }
1944 }
1945 
1946 
1947 } /* namespace WriteEngine */
1948