1 /* Copyright (C) 2014 InfiniDB, Inc.
2 Copyright (C) 2016 MariaDB Corporation
3
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 as published by the Free Software Foundation; version 2 of
7 the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17 MA 02110-1301, USA. */
18
19 #include <unistd.h>
20 #include <cstdlib>
21 #include <cstdio>
22 #include <cstring>
23 #include <ctime>
24
25 #include <vector>
26 #include <string>
27 #include <sstream>
28 #include <iostream>
29 #include <exception>
30 #include <stdexcept>
31 #include <cerrno>
32 using namespace std;
33
34 #include <boost/uuid/uuid.hpp>
35 #include <boost/uuid/uuid_generators.hpp>
36 #include <boost/uuid/uuid_io.hpp>
37 #include <boost/filesystem.hpp>
38
39 #include "dataconvert.h"
40 #include "liboamcpp.h"
41 using namespace oam;
42
43 #include "we_cmdargs.h"
44
45 #include "installdir.h"
46 #include "mcsconfig.h"
47
48 namespace WriteEngine
49 {
50
51 //----------------------------------------------------------------------
52 //----------------------------------------------------------------------
WECmdArgs(int argc,char ** argv)53 WECmdArgs::WECmdArgs(int argc, char** argv) :
54 fMultiTableCount(0),
55 fJobLogOnly(false),
56 fHelp(false),
57 fMode(1),
58 fArgMode(-1),
59 fQuiteMode(true),
60 fConsoleLog(false),
61 fVerbose(0),
62 fBatchQty(10000),
63 fNoOfReadThrds(0),
64 fDebugLvl(0),
65 fMaxErrors(-1),
66 fReadBufSize(0),
67 fIOReadBufSize(0),
68 fSetBufSize(0),
69 fColDelim('|'),
70 fEnclosedChar(0),
71 fEscChar(0),
72 fNoOfWriteThrds(0),
73 fNullStrMode(false),
74 fImportDataMode(IMPORT_DATA_TEXT),
75 fCpiInvoke(false),
76 fBlockMode3(false),
77 fbTruncationAsError(false),
78 fUUID(boost::uuids::nil_generator()()),
79 fConsoleOutput(true),
80 fTimeZone("SYSTEM"),
81 fErrorDir(string(MCSLOGDIR)+"/cpimport/")
82 {
83 try
84 {
85 appTestFunction();
86 parseCmdLineArgs(argc, argv);
87 }
88 catch (std::exception& exp)
89 {
90 std::string exceptMsg( exp.what() );
91 exceptMsg += "\nTry 'cpimport -h' for more information.";
92 throw (runtime_error( exceptMsg ));
93 }
94 }
95
96 //----------------------------------------------------------------------
97
appTestFunction()98 void WECmdArgs::appTestFunction()
99 {
100
101 // testing begins
102 //std::string aJobFile("/home/bpaul/Calpont/data/bulk/job/Job_1234.xml");
103 //std::string aJobFile("/usr/local/mariadb/columnstore/data/bulk/job/Job_1234.xml");
104 //setSchemaAndTableFromJobFile(aJobFile);
105 //setEnclByAndEscCharFromJobFile(aJobFile);
106 //exit(1);
107
108 //testing ends
109 return;
110 }
111
112 //----------------------------------------------------------------------
getCpImportCmdLine()113 std::string WECmdArgs::getCpImportCmdLine()
114 {
115 std::ostringstream aSS;
116 std::string aCmdLine;
117
118
119 aSS << fPrgmName; //prgm name as arg[0]
120
121 if (fHelp)
122 {
123 aSS << " -h ";
124 aCmdLine = aSS.str();
125 return aCmdLine;
126 }
127
128 //checkJobIdCase(); // check if JobID
129
130
131 if ((fPmFile.length() > 0) && (0 == getMode()))
132 aSS << " -l " << fPmFile;
133
134 //BUG 4379 if -m is not given while prep args, default is m=1 but
135 //on single node -m will set it to 3, after prep args.
136 //if((fPmFilePath.length()>0)&&(1!=getMode()))
137 // aSS << " -f " << fPmFilePath;
138 if ((fPmFilePath.length() > 0) && (1 != getMode()))
139 {
140 if (fPmFilePath == "STDIN") //if stdin, must pass it
141 aSS << " -f " << fPmFilePath;
142 else if (2 == getArgMode()) //Mode 2 we need to pass the -f option
143 aSS << " -f " << fPmFilePath;
144 else if (3 == getArgMode()) //-m given, -f built in already.
145 aSS << " -f " << fPmFilePath;
146 else if (0 == fLocFile.length()) //No filename given, from job file
147 aSS << " -f " << fPmFilePath;
148 }
149
150 if (fErrorDir.length() > 0)
151 aSS << " -L " << fErrorDir;
152
153 if (fUsername.length() > 0)
154 aSS << " -U " << fUsername;
155
156 if (fJobId.length() > 0)
157 aSS << " -j " << fJobId;
158
159 if (fNoOfReadThrds > 0)
160 aSS << " -r " << fNoOfReadThrds;
161
162 if (fNoOfWriteThrds > 0)
163 aSS << " -w " << fNoOfWriteThrds;
164
165 if (fMaxErrors >= 0)
166 aSS << " -e " << fMaxErrors;
167
168 // BUG 5088
169 if (fDebugLvl > 0)
170 aSS << " -d " << fDebugLvl;
171
172 if (fSetBufSize > 0)
173 aSS << " -B " << fSetBufSize;
174
175 if (fColDelim != '|')
176 {
177 if (fColDelim == '\t')
178 aSS << " -s " << "\\t"; //- working with user '\t'
179 // NONE of the following will work.
180 //aSS << " -s " << "\t"; //aSS << " -s " << "\"\\t\""; //aSS << " -s " << "'\\t'";
181 else
182 aSS << " -s " << fColDelim;
183 }
184
185 if (fEnclosedChar != 0)
186 aSS << " -E " << fEnclosedChar;
187
188 if (fEscChar != 0)
189 aSS << " -C " << fEscChar;
190
191 if (fNullStrMode)
192 aSS << " -n " << '1';
193
194 if (fImportDataMode != IMPORT_DATA_TEXT)
195 aSS << " -I " << fImportDataMode;
196
197 //if(fConfig.length()>0)
198 // aSS << " -c " << fConfig;
199 if (fReadBufSize > 0)
200 {
201 cout << "setting fReadBufSize = " << fReadBufSize;
202 aSS << " -c " << fReadBufSize;
203 }
204
205 if (fIOReadBufSize > 0)
206 aSS << " -b " << fIOReadBufSize;
207
208
209 if ((fJobPath.length() > 0) && (fMode == 3))
210 aSS << " -p " << fJobPath;
211
212
213 if (fConsoleLog)
214 aSS << " -i ";
215
216 if ((fMode == 1) || (fMode == 2))
217 {
218 aSS << " -R " << getBrmRptFileName();
219 aSS << " -m " << fMode;
220 }
221
222 aSS << " -P " << getModuleID();
223 aSS << " -T " << fTimeZone;
224
225 if (fbTruncationAsError)
226 aSS << " -S ";
227
228 if (!fS3Key.empty() && !(fMode == 0 || fMode == 1))
229 {
230 if (fS3Secret.empty() || fS3Bucket.empty() || fS3Region.empty())
231 throw (runtime_error("Not all required S3 options provided"));
232 aSS << " -y " << fS3Key;
233 aSS << " -K " << fS3Secret;
234 aSS << " -t " << fS3Bucket;
235 aSS << " -g " << fS3Region;
236
237 if (!fS3Host.empty())
238 {
239 aSS << " -H " << fS3Host;
240 }
241 }
242
243 if ((fJobId.length() > 0) && (fMode == 1) && (!fJobLogOnly))
244 {
245 // if JobPath provided, make it w.r.t WES
246 aSS << " -p " << fTmpFileDir;
247 aSS << " -fSTDIN";
248 }
249 else if ((fJobId.length() > 0) && (fMode == 2) && (!fJobLogOnly))
250 {
251 // if JobPath provided, make it w.r.t WES
252 aSS << " -p " << fTmpFileDir;
253
254 if (fPmFile.length() > 0)
255 aSS << " -l " << fPmFile;
256
257 if (fPmFilePath.length() > 0)
258 aSS << " -f " << fPmFilePath;
259 }
260 else // do not provide schema & table with JobId
261 {
262
263 if (!fUUID.is_nil())
264 aSS << " -u" << boost::uuids::to_string(fUUID);
265
266 if (fSchema.length() > 0)
267 aSS << " " << fSchema;
268 //else if((fMode != 0)||(fMode==3)) //TODO make it mode3 + jobID
269 else if (fJobId.length() > 0)
270 { }// may or may not contain Schema.
271 //else if((fMode == 1)||(fMode==2)) //TODO make it mode3 + jobID
272 else if (fMode != 0)
273 throw (runtime_error("Schema not available"));
274
275 if (fTable.length() > 0)
276 aSS << " " << fTable;
277 else if (fJobId.length() > 0)
278 {} //may or may not contain Table.
279 else if (fMode != 0)
280 throw (runtime_error("Tablename not available"));
281
282 //else if((fMode != 0)||(fMode==3)) //TODO make it mode3 + jobID
283 //else if((fMode == 1)||(fMode == 2)) //TODO make it mode3 + jobID
284 // throw (runtime_error("Tablename not available"));
285 if ((fPmFile.length() > 0) && (2 == getMode()))
286 {
287 //if(fPmFilePath.length()>0)
288 // aSS << " " << fPmFilePath;
289 aSS << " " << fPmFile;
290 }
291 else if (2 == getMode())
292 throw (runtime_error("loadFile [-l ] not available"));
293
294 }
295
296 if ((fLocFile.length() > 0) && (fLocFile != "STDIN") && (3 == getMode()))
297 {
298 //Bug 4342 multi-files mode 3 support
299 //convert all the spaces into 'commas'
300 if (fLocFile.find_first_of(' ') == string::npos)
301 aSS << " " << fLocFile;
302 else
303 {
304 std::string aLocFiles = replaceCharInStr(fLocFile, ' ', ',');
305 aSS << " " << aLocFiles;
306 }
307 }
308
309 try
310 {
311 aCmdLine = aSS.str();
312 }
313 catch (exception&)
314 {
315 throw runtime_error("getcpImportCmdLine failed");
316 }
317
318 return aCmdLine;
319 }
320
321
322 //----------------------------------------------------------------------
323
324 //BUG 4205 (part FIX) - need to implement more into it
checkForCornerCases()325 bool WECmdArgs::checkForCornerCases()
326 {
327 //BUG 4210
328 this->checkJobIdCase(); //Need to do this before we go further
329
330
331 if (fMode == 0)
332 {
333 if (!fJobId.empty())
334 {
335 //cout << "Invalid option mode 0 with a Job File" << endl;
336 throw (runtime_error("Mode 0 with a Job file option is not valid!!"
337 "\nTry 'cpimport -h' for more information."));
338 }
339 else if (!fJobPath.empty())
340 {
341 cout << "Invalid option mode 0 with a Job Path" << endl;
342 throw (runtime_error("Mismatched options"
343 "\nTry 'cpimport -h' for more information."));
344 }
345 else if (!fSchema.empty())
346 {
347 cout << "Invalid option in mode 0 with a schema name" << endl;
348 throw (runtime_error("Mismatched options."));
349 }
350 else if (!fTable.empty())
351 {
352 cout << "Invalid option in mode 0 with a table name" << endl;
353 throw (runtime_error("Mismatched options."));
354 }
355 else if ((!fPmFilePath.empty()) && (fPmFilePath != "STDIN"))
356 {
357 cout << "Invalid option -f in Mode 0 with value other than STDIN" << endl;
358 throw (runtime_error("Mismatched options."));
359 }
360
361 if (fSetBufSize)
362 {
363 cout << "Invalid option -B with Mode 0" << endl;
364 throw (runtime_error("Mismatched options."));
365 }
366 else if (fIOReadBufSize)
367 {
368 cout << "Invalid option -b with Mode 0" << endl;
369 throw (runtime_error("Mismatched options."));
370 }
371 else if (fMaxErrors >= 0)
372 {
373 cout << "Invalid option -e with Mode 0" << endl;
374 throw (runtime_error("Mismatched options."));
375 }
376 else if (fConsoleLog)
377 {
378 cout << "Invalid option -i with Mode 0" << endl;
379 throw (runtime_error("Mismatched options."));
380 }
381 else if (fReadBufSize)
382 {
383 cout << "Invalid option -c with Mode 0" << endl;
384 throw (runtime_error("Mismatched options."));
385 }
386 else if (fNoOfReadThrds)
387 {
388 cout << "Invalid option -r with Mode 0" << endl;
389 throw (runtime_error("Mismatched options."));
390 }
391 else if (fNoOfWriteThrds)
392 {
393 cout << "Invalid option -w with Mode 0" << endl;
394 throw (runtime_error("Mismatched options."));
395 }
396
397 if (fImportDataMode != IMPORT_DATA_TEXT)
398 {
399 cout << "Invalid option -I with Mode 0" << endl;
400 throw (runtime_error("Mismatched options."));
401 }
402
403 }
404
405 if (fMode == 1)
406 {
407 if (!fJobId.empty())
408 {
409 if ((!fPmFilePath.empty()) && (fPmFilePath == "STDIN"))
410 {
411 // do not do anything.. this is good.
412 }
413 }
414 // Mode 1, if Input Path is existing and input file is not existing
415 // it is an error, bce it assumes all the files in directory.
416 // In mode 2, we are passing info to cpimport.bin, which will take care
417 // of it, as in Mode 3.
418 else if ((!fPmFilePath.empty()) && (fPmFile.empty()))
419 {
420 // assumed since position param is missing
421 if ((fLocFile == "STDIN") && (fPmFilePath != "STDIN"))
422 {
423 cout << "Invalid options in Mode 1 : option -l " << endl;
424 cout << " or input file position parameter needed" << endl;
425 //cout << "\tOption (-j) should follow with option -l option or "
426 // "an input file position parameter" << endl;
427 throw (runtime_error("In Mode 1 Error occurred!! "
428 "\nTry 'cpimport -h' for more information."));
429 }
430 }
431 }
432
433 if (fMode == 2)
434 {
435 if (fPmFile.empty())
436 throw(runtime_error("Mode 2 require PM based filename [-l]"
437 "\nTry 'cpimport -h' for more information."));
438
439 if ((fPmFilePath.empty()) && (fPmFile.at(0) != '/'))
440 throw(runtime_error("Mode 2 require remote file opts -f and -l or "\
441 "a fully qualified path for the remote file."
442 "\nTry 'cpimport -h' for more information."));
443 if (!fS3Key.empty())
444 throw(runtime_error("Mode 2 & an input file from S3 does not make sense."));
445 }
446
447 if (fMode == 3)
448 {
449 if (fPmVec.size())
450 {
451 cout << "Invalid option -P with Mode 3" << endl;
452 throw (runtime_error("Mismatched options."));
453 }
454
455 }
456
457 return true;
458 }
459
460 //----------------------------------------------------------------------
461
str2PmList(std::string & PmList,VecInts & V)462 bool WECmdArgs::str2PmList(std::string& PmList, VecInts& V)
463 {
464 const int BUFLEN = 512;
465 char aBuff[BUFLEN];
466
467
468 int aLen = PmList.length();
469
470 if (aLen > 0)
471 {
472 strncpy(aBuff, PmList.c_str(), BUFLEN);
473 aBuff[BUFLEN - 1] = 0;
474 }
475 else
476 return false;
477
478 char* pTok = strtok(aBuff, ",");
479
480 while (pTok != NULL)
481 {
482 int aPmId = 0;
483
484 try
485 {
486 aPmId = atoi(pTok);
487 V.push_back(aPmId);
488 }
489 catch (exception& ex)
490 {
491 std::stringstream aErr;
492 aErr << "Wrong PM id format : " << ex.what();
493 //cout << "Wrong PM id format : "<< ex.what() << endl;
494 throw (runtime_error(aErr.str()));
495 }
496
497 pTok = strtok(NULL, ",");
498 }
499
500 return true;
501 }
502
503 //----------------------------------------------------------------------
504
usage()505 void WECmdArgs::usage()
506 {
507 cout << "Simple usage using positional parameters (no XML job file):\n";
508 cout << "\tcpimport dbName tblName [loadFile] [-h] [-m mode]\n";
509 cout << "\t\t [-f path] [-d debugLevel] [-c readbufSize] [-b readBufs] \n";
510 cout << "\t\t [-r readers] [-j JobID] [-e maxErrs] [-B libBufSize] [-w parsers]\n";
511 cout << "\t\t [-s c] [-E enclosedChar] [-C escapeChar] [-n NullOption]\n";
512 cout << "\t\t [-q batchQty] [-p jobPath] [-P list of PMs] [-S] [-i] [-v verbose]\n";
513 cout << "\t\t [-I binaryOpt] [-T timeZone]\n";
514
515
516 cout << "Traditional usage without positional parameters (XML job file required):\n";
517 cout << "\tcpimport -j jobID\n";
518 cout << "\t\t [-h] [-m mode] [-r readers] [-w parsers] [-s c] [-f path]\n";
519 cout << "\t\t [-b readBufs] [-p path] [-c readBufSize] [-e maxErrs] [-B libBufSize]\n";
520 cout << "\t\t [-n NullOption] [-E encloseChar] [-C escapeChar] [-i] [-v verbose]\n";
521 cout << "\t\t [-d debugLevel] [-q batchQty] [-l loadFile] [-P list of PMs] [-S]\n";
522 cout << "\t\t [-I binaryOpt] [-T timeZone]\n";
523
524 cout << "\n\nPositional parameters:\n";
525 cout << "\tdbName Name of the database to load\n";
526 cout << "\ttblName Name of table to load\n";
527 cout << "\tloadFile Optional input file name in current directory,\n";
528 cout << "\t\t\tunless a fully qualified name is given.\n";
529 cout << "\t\t\tIf not given, input read from STDIN.\n";
530
531 cout << "\n\nOptions:\n"
532 << "\t-b\tNumber of read buffers\n"
533 << "\t-c\tApplication read buffer size(in bytes)\n"
534 << "\t-d\tPrint different level(1-3) debug message\n"
535 << "\t-e\tMax number of allowable error per table per PM\n"
536 << "\t-f\tData file directory path.\n"
537 << "\t\t\tDefault is current working directory.\n"
538 << "\t\t\tIn Mode 1, -f represents the local input file path.\n"
539 << "\t\t\tIn Mode 2, -f represents the PM based input file path.\n"
540 << "\t\t\tIn Mode 3, -f represents the local input file path.\n"
541 << "\t-l\tName of import file to be loaded, relative to -f path,\n"
542 << "\t-h\tPrint this message.\n"
543 << "\t-q\tBatch Quantity, Number of rows distributed per batch in Mode 1\n"
544 << "\t-i\tPrint extended info to console in Mode 3.\n"
545 << "\t-j\tJob ID. In simple usage, default is the table OID.\n"
546 << "\t\t\tunless a fully qualified input file name is given.\n"
547 << "\t-n\tNullOption (0-treat the string NULL as data (default);\n"
548 << "\t\t\t1-treat the string NULL as a NULL value)\n"
549 << "\t-p\tPath for XML job description file.\n"
550 << "\t-r\tNumber of readers.\n"
551 << "\t-s\t'c' is the delimiter between column values.\n"
552 << "\t-B\tI/O library read buffer size (in bytes)\n"
553 << "\t-w\tNumber of parsers.\n"
554 << "\t-E\tEnclosed by character if field values are enclosed.\n"
555 << "\t-C\tEscape character used in conjunction with 'enclosed by'\n"
556 << "\t\t\tcharacter, or as part of NULL escape sequence ('\\N');\n"
557 << "\t\t\tdefault is '\\'\n"
558 << "\t-I\tImport binary data; how to treat NULL values:\n"
559 << "\t\t\t1 - import NULL values\n"
560 << "\t\t\t2 - saturate NULL values\n"
561 << "\t-P\tList of PMs ex: -P 1,2,3. Default is all PMs.\n"
562 << "\t-S\tTreat string truncations as errors.\n"
563 << "\t-m\tmode\n"
564 << "\t\t\t1 - rows will be loaded in a distributed manner across PMs.\n"
565 << "\t\t\t2 - PM based input files loaded onto their respective PM.\n"
566 << "\t\t\t3 - input files will be loaded on the local PM.\n"
567 << "\t-T\tTimezone used for TIMESTAMP datatype.\n"
568 << "\t\tPossible values: \"SYSTEM\" (default)\n"
569 << "\t\t : Offset in the form +/-HH:MM\n"
570 << "\t-y\tS3 Authentication Key (for S3 imports)\n"
571 << "\t-K\tS3 Authentication Secret (for S3 imports)\n"
572 << "\t-t\tS3 Bucket (for S3 imports)\n"
573 << "\t-H\tS3 Hostname (for S3 imports, Amazon's S3 default)\n"
574 << "\t-g\tS3 Region (for S3 imports)\n"
575 << "\t-L\tDirectory for the output .err and .bad files.\n"
576 << "\t\tDefault is " << string(MCSLOGDIR);
577
578
579 cout << "\nExample1: Traditional usage\n"
580 << "\tcpimport -j 1234";
581 cout << "\nExample2: Some column values are enclosed within double quotes.\n"
582 << "\tcpimport -j 3000 -E '\"'";
583 cout << "\nExample3: Import a nation table without a Job XML file\n"
584 << "\tcpimport -j 301 tpch nation nation.tbl";
585 cout << "\nExample4: Import a nation table to all PMs in Mode 1\n"
586 << "\tcpimport -m 1 tpch nation nation.tbl";
587 cout << "\nExample4: Import a nation table to only PM1 and PM2 in Mode 1\n"
588 << "\tcpimport -m 1 -P 1,2 tpch nation nation.tbl";
589 cout << "\nExample5: Import nation.tbl from PMs to nation table in Mode 2\n"
590 << "\tcpimport -m 2 tpch nation -f /var/lib/columnstore/data/bulk/data/import/ -l nation.tbl";
591 cout << "\nExample6: Import nation.tbl in mode 3\n"
592 << "\tcpimport -m 3 tpch nation nation.tbl\n\n";
593
594
595 exit(1);
596 }
597
598
599 //-----------------------------------------------------------------------------
600
parseCmdLineArgs(int argc,char ** argv)601 void WECmdArgs::parseCmdLineArgs(int argc, char** argv)
602 {
603 int aCh;
604 std::string importPath;
605 bool aJobType = false;
606
607
608 if (argc > 0)
609 fPrgmName = string(MCSBINDIR) + "/" + "cpimport.bin"; //argv[0] is splitter but we need cpimport
610
611 while ((aCh = getopt(argc, argv,
612 "d:j:w:s:v:l:r:b:e:B:f:q:ihm:E:C:P:I:n:p:c:ST:Ny:K:t:H:g:U:L:"))
613 != EOF)
614 {
615 switch (aCh)
616 {
617 case 'm':
618 {
619 fArgMode = atoi(optarg);
620
621 //cout << "Mode level set to " << fMode << endl;
622 if ((fArgMode > -1) && (fArgMode <= 3)) {}
623 else
624 throw runtime_error("Wrong Mode level");
625
626 break;
627 }
628
629 case 'B':
630 {
631 errno = 0;
632 long lValue = strtol(optarg, 0, 10);
633
634 if ((errno != 0) || (lValue < 1) || (lValue > INT_MAX))
635 throw runtime_error("Option -B is invalid or out of range");
636
637 fSetBufSize = lValue;
638 break;
639 }
640
641 case 'b':
642 {
643 errno = 0;
644 long lValue = strtol(optarg, 0, 10);
645
646 if ((errno != 0) || (lValue < 1) || (lValue > INT_MAX))
647 throw runtime_error("Option -b is invalid or out of range");
648
649 fIOReadBufSize = lValue;
650 break;
651 }
652
653 case 'e':
654 {
655 errno = 0;
656 long lValue = strtol(optarg, 0, 10);
657
658 if ((errno != 0) || (lValue < 0) || (lValue > INT_MAX))
659 throw runtime_error("Option -e is invalid or out of range");
660
661 fMaxErrors = lValue;
662 break;
663 }
664
665 case 'i':
666 {
667 fConsoleLog = true;
668 break;
669 }
670
671 case 'c':
672 {
673 errno = 0;
674 long lValue = strtol(optarg, 0, 10);
675
676 if ((errno != 0) || (lValue < 1) || (lValue > INT_MAX))
677 throw runtime_error("Option -c is invalid or out of range");
678
679 fReadBufSize = lValue;
680 break;
681 }
682
683 case 'j': // -j: jobID
684 {
685 errno = 0;
686 long lValue = strtol(optarg, 0, 10);
687
688 if ((errno != 0) || (lValue < 0) || (lValue > INT_MAX))
689 throw runtime_error("Option -j is invalid or out of range");
690
691 fJobId = optarg;
692 fOrigJobId = fJobId; // in case if we need to split it.
693
694 if (0 == fJobId.length()) throw runtime_error("Wrong JobID Value");
695
696 aJobType = true;
697 break;
698 }
699
700 case 'v': // verbose
701 {
702 string aVerbLen = optarg;
703 fVerbose = aVerbLen.length();
704 fDebugLvl = fVerbose;
705 break;
706 }
707
708 case 'd': // -d debug
709 {
710 errno = 0;
711 long lValue = strtol(optarg, 0, 10);
712
713 if ((errno != 0) || (lValue < 1) || (lValue > INT_MAX))
714 throw runtime_error("Option -d is invalid or out of range");
715
716 fDebugLvl = lValue;
717
718 if (fDebugLvl > 0 && fDebugLvl <= 3)
719 {
720 cout << "\nDebug level set to " << fDebugLvl << endl;
721 }
722 else
723 {
724 throw runtime_error("Wrong Debug level");
725 }
726
727 break;
728 }
729
730 case 'r': // -r: num read threads
731 {
732 errno = 0;
733 long lValue = strtol(optarg, 0, 10);
734
735 if ((errno != 0) || (lValue < 1) || (lValue > INT_MAX))
736 throw runtime_error("Option -r is invalid or out of range");
737
738 fNoOfReadThrds = lValue;
739 break;
740 }
741
742 case 'w': // -w: num parse threads
743 {
744 errno = 0;
745 long lValue = strtol(optarg, 0, 10);
746
747 if ((errno != 0) || (lValue < 1) || (lValue > INT_MAX))
748 throw runtime_error("Option -w is invalid or out of range");
749
750 fNoOfWriteThrds = lValue;
751 break;
752 }
753
754 case 's': // -s: column delimiter
755 {
756 if (!strcmp(optarg, "\\t"))
757 {
758 fColDelim = '\t';
759
760 if (fDebugLvl) cout << "Column delimiter : " << "\\t" << endl;
761 }
762 else
763 {
764 fColDelim = optarg[0];
765
766 if (fDebugLvl) cout << "Column delimiter : " << fColDelim << endl;
767 }
768
769 break;
770 }
771
772 case 'l': // -l: if JobId (-j), it can be input file
773 {
774 fPmFile = optarg;
775
776 if (0 == fPmFile.length()) throw runtime_error("Wrong local filename");
777
778 break;
779 }
780
781 case 'f': // -f: import file path
782 {
783 fPmFilePath = optarg;
784 break;
785 }
786
787 case 'n': // -n: treat "NULL" as null
788 {
789 // default is 0, ie it is equal to not giving this option
790 int nullStringMode = atoi(optarg);
791
792 if ((nullStringMode != 0) && (nullStringMode != 1))
793 {
794 throw (runtime_error(
795 "Invalid NULL option; value can be 0 or 1"));
796 }
797
798 if (nullStringMode)
799 fNullStrMode = true;
800 else
801 fNullStrMode = false; // This is default
802
803 break;
804 }
805
806 case 'P': // -p: list of PM's
807 {
808 try
809 {
810 std::string aPmList = optarg;
811
812 if (!str2PmList(aPmList, fPmVec))
813 throw(runtime_error("PM list is wrong"));
814 }
815 catch (runtime_error& ex)
816 {
817 throw (ex);
818 }
819
820 break;
821 }
822
823 case 'p':
824 {
825 fJobPath = optarg;
826 break;
827 }
828
829 case 'E': // -E: enclosed by char
830 {
831 fEnclosedChar = optarg[0];
832 //cout << "Enclosed by Character : " << optarg[0] << endl;
833 break;
834 }
835
836 case 'C': // -C: enclosed escape char
837 {
838 fEscChar = optarg[0];
839 //cout << "Escape Character : " << optarg[0] << endl;
840 break;
841 }
842
843 case 'h': // -h: help
844 {
845 //usage(); // will exit(1) here
846 fHelp = true;
847 break;
848 }
849
850 case 'I': // -I: binary mode (null handling)
851 {
852 // default is text mode, unless -I option is specified
853 int binaryMode = atoi(optarg);
854
855 if (binaryMode == 1)
856 {
857 fImportDataMode = IMPORT_DATA_BIN_ACCEPT_NULL;
858 }
859 else if (binaryMode == 2)
860 {
861 fImportDataMode = IMPORT_DATA_BIN_SAT_NULL;
862 }
863 else
864 {
865 throw (runtime_error(
866 "Invalid Binary mode; value can be 1 or 2"));
867 }
868
869 break;
870 }
871
872 case 'S': // -S: Treat string truncations as errors
873 {
874 setTruncationAsError(true);
875 //cout << "TruncationAsError : true" << endl;
876 break;
877 }
878
879 case 'T':
880 {
881 std::string timeZone = optarg;
882 long offset;
883
884 if (timeZone != "SYSTEM" && dataconvert::timeZoneToOffset(timeZone.c_str(), timeZone.size(), &offset))
885 {
886 throw (runtime_error(
887 "Value for option -T is invalid"));
888 }
889
890 fTimeZone = timeZone;
891 break;
892 }
893
894 case 'q': // -q: batch quantity - default value is 10000
895 {
896 errno = 0;
897 long long lValue = strtoll(optarg, 0, 10);
898
899 if ((errno != 0) || (lValue < 1) || (lValue > UINT_MAX))
900 throw runtime_error("Option -q is invalid or out of range");
901
902 fBatchQty = lValue;
903
904 if (fBatchQty < 10000) fBatchQty = 10000;
905 else if (fBatchQty > 100000) fBatchQty = 10000;
906
907 break;
908 }
909
910 case 'N': //-N no console output
911 {
912 fConsoleOutput = false;
913 break;
914 }
915
916 case 'y': //-y S3 Key
917 {
918 fS3Key = optarg;
919 break;
920 }
921
922 case 'K': //-K S3 Secret
923 {
924 fS3Secret = optarg;
925 break;
926 }
927
928 case 'H': //-H S3 Host
929 {
930 fS3Host = optarg;
931 break;
932 }
933
934 case 't': //-t S3 bucket
935 {
936 fS3Bucket = optarg;
937 break;
938 }
939
940 case 'g': //-g S3 Region
941 {
942 fS3Region = optarg;
943 break;
944 }
945
946 case 'U': //-U username of the files owner
947 {
948 fUsername = optarg;
949 break;
950 }
951
952 case 'L': // -L set the output location of .bad/.err files
953 {
954 fErrorDir = optarg;
955 break;
956 }
957
958 default:
959 {
960 std::string aErr = "Unknown command line option " + aCh;
961 //cout << "Unknown command line option " << aCh << endl;
962 throw (runtime_error(aErr));
963 }
964 }
965 }
966
967 if (fHelp) usage(); //BUG 4210
968
969 if (fArgMode != -1) fMode = fArgMode; //BUG 4210
970
971 std::string bulkRootPath = getBulkRootDir();
972
973 checkForBulkLogDir(bulkRootPath);
974
975 if (2 == fArgMode && fPmFilePath.empty())
976 throw runtime_error("-f option is mandatory with mode 2.");
977
978 if (aJobType)
979 {
980 if (0 == fArgMode) throw runtime_error("Incompatible mode and option types");
981
982 if (optind < argc)
983 {
984 fSchema = argv[optind]; // 1st pos parm
985 optind++;
986
987 if (optind < argc)
988 {
989 fTable = argv[optind]; // 2nd pos parm
990 optind++;
991 }
992 else
993 {
994 // if schema is there, table name should be there
995 throw runtime_error("No table name specified with schema.");
996 }
997
998 if (optind < argc) // see if input file name is given
999 {
1000 // 3rd pos parm
1001 fLocFile = argv[optind];
1002
1003 if ((fLocFile.at(0) != '/') && (fLocFile != "STDIN"))
1004 {
1005 std::string aTmp = fLocFile;
1006
1007 // BUG 4379 -f given? use that
1008 if ((!fPmFilePath.empty()) && (fMode == 1))
1009 fLocFile = fPmFilePath + "/" + aTmp;
1010 else if (fPmFilePath.empty())
1011 fLocFile = bulkRootPath + "/data/import/" + aTmp;
1012 }
1013 }
1014 else
1015 {
1016 if (!fPmFile.empty())
1017 fLocFile = fPmFile;
1018 //BUG 4186
1019 //else // else take it from the jobxml file
1020 // fLocFile = "STDIN";
1021 //Historically cpimport works with jobfile as
1022 // -l <fileName> && -f <filePath> or
1023 // -fSTDIN as the stdin, it will override colxml loadfile entry
1024 // if -fSTDIN is not provided get i/p file from jobfile
1025 else if ((!fPmFilePath.empty()) && (fPmFilePath == "STDIN"))
1026 fLocFile = "STDIN";
1027
1028 // else take it from the jobxml file
1029 }
1030
1031 if ((fSchema.length() > 0) && (fTable.length() > 0) && (fLocFile.length() > 0))
1032 fJobLogOnly = true;
1033 }
1034 else
1035 {
1036 if (!fPmFile.empty())
1037 {
1038 fLocFile = fPmFile;
1039
1040 if (!fPmFilePath.empty())
1041 {
1042 if (fPmFilePath == "STDIN")
1043 {
1044 throw runtime_error("Conflicting options -l and -fSTDIN");
1045 }
1046 else
1047 {
1048 std::string aTmp = fLocFile;
1049
1050 if ((!fPmFilePath.empty()) && (fMode == 1)) //BUG 4379 -f given? use that
1051 fLocFile = fPmFilePath + "/" + aTmp;
1052 else if (!fPmFilePath.empty())
1053 fLocFile = bulkRootPath + "/data/import/" + aTmp;
1054 }
1055
1056 }
1057
1058 if ((fLocFile.at(0) != '/') && (fLocFile != "STDIN") && (fPmFilePath.empty()))
1059 {
1060 std::string aTmp = fLocFile;
1061 fLocFile = bulkRootPath + "/data/import/" + aTmp;
1062 }
1063 }
1064 //BUG 4186
1065 //else
1066 // fLocFile = "STDIN";
1067 //Historically cpimport works with jobfile as
1068 // -l <fileName> && -f <filePath> or
1069 // -fSTDIN as the stdin, it will override colxml loadfile entry
1070 // if -fSTDIN is not provided get i/p file from jobfile
1071 else if ((!fPmFilePath.empty()) && (fPmFilePath == "STDIN"))
1072 fLocFile = "STDIN";
1073
1074 // else take it from the jobxml file
1075 }
1076
1077 }
1078 // Get positional arguments, User can provide:
1079 // 1. no positional parameters - Mode 0 & stdin
1080 // 2. Two positional parameters (schema and table names) - Mode 1/2, stdin
1081 // 3. Three positional parameters (schema, table, and import file name)
1082 else if (optind < argc) // see if db schema name is given
1083 {
1084 if (fArgMode == 0)
1085 {
1086 //added the code as per BUG 4245
1087 if (!fPmFilePath.empty())
1088 {
1089 fLocFile = fPmFilePath;
1090
1091 if (fLocFile != "STDIN")
1092 throw(runtime_error("ERROR: In Mode 0, -f option can only have value STDIN"));
1093 }
1094 else
1095 {
1096 fLocFile = argv[optind];
1097 optind++;
1098 }
1099
1100 if (optind < argc) //dest filename provided
1101 {
1102 fPmFile = argv[optind];
1103
1104 if ((fPmFile.at(0) != '/') && (fS3Key.empty()))
1105 {
1106 std::string aTmp = fPmFile;
1107 fPmFile = bulkRootPath + "/data/import/" + aTmp;
1108 }
1109 }
1110 else // no dest filename
1111 {
1112 if (fLocFile == "STDIN")
1113 throw(runtime_error("ERROR: Destination file name required!!"));
1114
1115 if (fLocFile.at(0) == '/') //local FQ-filename,parse out filename
1116 fPmFile = getFileNameFromPath(fLocFile);
1117 else
1118 fPmFile = fLocFile;
1119
1120 if ((fPmFile.at(0) != '/') && (fS3Key.empty())) //should be true all the time
1121 {
1122 std::string aTmp = fPmFile;
1123 fPmFile = bulkRootPath + "/data/import/" + aTmp;
1124 }
1125 }
1126
1127 /* commented out for BUG 4245
1128 if(fPmFilePath.empty())
1129 fLocFile = argv[optind];
1130 else
1131 fLocFile = fPmFilePath +"/"+ argv[optind];
1132
1133 if (fPmFile.empty()) //BUG 4200
1134 {
1135 //if(fLocFile.at(0)== '/')
1136 // fPmFile = fLocFile;
1137 //else
1138 if(fLocFile.at(0)!='/')
1139 fPmFile = bulkRootPath + "/data/import/"+ fLocFile;
1140 }
1141 else
1142 {
1143 if(fPmFile.at(0)!='/')
1144 {
1145 std::string aTmp = fPmFile;
1146 fPmFile = bulkRootPath + "/data/import/"+aTmp;
1147 }
1148 }
1149 */
1150 }
1151 else
1152 fSchema = argv[optind]; // 1st pos parm
1153
1154 optind++;
1155
1156 if (optind < argc) // see if table name is given
1157 {
1158 fTable = argv[optind]; // 2nd pos parm
1159 optind++;
1160
1161 if (optind < argc) // see if input file name is given
1162 {
1163 // 3rd pos parm
1164 fLocFile = argv[optind];
1165
1166 //BUG 4379 if -f option given we need to use that path,
1167 //over riding bug 4231. look at the code below
1168 //BUG 4231 - This bug over writes 4199 and commenting out changes
1169 //BUG 4199
1170 //Path not provided, not fully qualified, Look in import dir
1171 //if((fLocFile.at(0)!= '/')&&(fLocFile != "STDIN"))
1172 //{
1173 // std::string aTmp = fLocFile;
1174 // fLocFile = bulkRootPath + "/data/import/"+ aTmp;
1175 //}
1176 //BUG 4379 if -f option given we need to use that path
1177 if ((fLocFile.at(0) != '/') && (fLocFile != "STDIN"))
1178 {
1179 std::string aTmp = fLocFile;
1180
1181 //if -f given? use that otherwise just go ahead with CWD
1182 if ((!fPmFilePath.empty()) && (fMode == 1))
1183 fLocFile = fPmFilePath + "/" + aTmp;
1184
1185 // TODO - if -f option is given and a list of files are
1186 // are provided, we need to be able to import all that.
1187 }
1188
1189
1190 }
1191 else
1192 {
1193 if (fPmFile.length() > 0)
1194 {
1195 // BUG 4210
1196 //if (fPmFilePath.length() > 0)
1197 //{
1198 // fLocFile = fPmFilePath +"/"+ fPmFile;
1199 //}
1200 //else
1201 if (fPmFilePath.empty())
1202 {
1203 //NOTE - un-commenting with an if statement for Mode 2
1204 //BUG 4231 makes it comment out the below changes,
1205 //This will not change even though directly, to be
1206 //on safer side, we should take out this too.
1207 //check path fully qualified? then set as data import
1208 if (2 == fArgMode)
1209 {
1210 //BUG 4342
1211 if ((fPmFile.at(0) != '/') && (fS3Key.empty()))
1212 {
1213 std::string aTmp = fPmFile;
1214 fPmFile = PrepMode2ListOfFiles(aTmp);
1215 }
1216 else
1217 {
1218 if (fPmFile.find_first_of(' ') != string::npos)
1219 {
1220 std::string aPmFiles = replaceCharInStr(fPmFile, ' ', ',');
1221 fPmFile = aPmFiles;
1222 }
1223 }
1224 }
1225
1226 fLocFile = fPmFile;
1227 }
1228 }
1229 else
1230 {
1231 fLocFile = "STDIN";
1232 }
1233
1234 //cout << "LocFile set as stdin" << endl;
1235 }
1236 }
1237 else
1238 {
1239 // If Mode is not 0 and table name is a required argument
1240 if (fArgMode != 0)
1241 throw(runtime_error("No table name specified with schema."));
1242 }
1243
1244 }
1245 else
1246 {
1247 // for testing we are allowing data from stdin even with Mode 0
1248 // that is without LocFileName
1249 if (0 == fArgMode)
1250 {
1251 fLocFile = "STDIN"; //cout << "LocFile set as stdin" << endl;
1252 }
1253 else
1254 {
1255 // If Mode 0, LocFileName is reqd and otherwies Schema is required
1256 throw (runtime_error("No schema or local filename specified."));
1257 }
1258 }
1259
1260 /* check for all-or-nothing cmdline args to enable S3 import */
1261 int s3Tmp = (fS3Key.empty() ? 0 : 1) + (fS3Bucket.empty() ? 0 : 1) +
1262 (fS3Secret.empty() ? 0 : 1) + (fS3Region.empty() ? 0 : 1);
1263 if (s3Tmp != 0 && s3Tmp != 4)
1264 throw runtime_error("The access key, secret, bucket, and region are all required to import from S3");
1265 }
1266
getJobFileName()1267 std::string WECmdArgs::getJobFileName()
1268 {
1269 std::ostringstream aSS;
1270 string aJobIdFileName;
1271
1272 if (fJobId.length() > 0)
1273 {
1274 if (fJobPath.length() > 0)
1275 aSS << fJobPath;
1276 else
1277 {
1278 fJobPath = config::Config::makeConfig()->getConfig("WriteEngine",
1279 "BulkRoot") + "/Job";
1280 aSS << fJobPath;
1281 }
1282
1283 aSS << "/Job_" << fJobId << ".xml";
1284 aJobIdFileName = aSS.str();
1285 }
1286
1287 return aJobIdFileName;
1288 }
1289
getPmStatus(int Id)1290 bool WECmdArgs::getPmStatus(int Id)
1291 {
1292 // if no PMID's provided on cmdline, return true;
1293 if (0 == fPmVec.size()) return true;
1294
1295 VecInts::iterator aIt = fPmVec.begin();
1296
1297 while (aIt != fPmVec.end())
1298 {
1299 if (*aIt == static_cast<unsigned int>(Id))
1300 return true;
1301
1302 ++aIt;
1303 }
1304
1305 return false;
1306 }
1307
1308
1309 //------------------------------------------------------------------------------
1310 // It is a recursive call.
getBrmRptFileName()1311 std::string WECmdArgs::getBrmRptFileName()
1312 {
1313 if (!fBrmRptFile.empty())
1314 return fBrmRptFile;
1315
1316 string brmRptFileName = getTmpFileDir();
1317
1318 if (!brmRptFileName.empty())
1319 {
1320 fTmpFileDir = brmRptFileName;
1321 char aBuff[64];
1322 time_t aTime;
1323 struct tm pTm;
1324 time(&aTime);
1325 localtime_r(&aTime, &pTm);
1326
1327 // BUG 4424
1328 // M D H M S
1329 snprintf(aBuff, sizeof(aBuff), "/BrmRpt%02d%02d%02d%02d%02d%d.rpt",
1330 pTm.tm_mon, pTm.tm_mday, pTm.tm_hour,
1331 pTm.tm_min, pTm.tm_sec, getpid());
1332 brmRptFileName += aBuff;
1333 }
1334 else
1335 {
1336 //cout << "ERROR: Could not find TempFileDir in Columnstore.xml" << endl;
1337 throw (runtime_error("Could not find TempFileDir in Columnstore.xml"));
1338 }
1339
1340 setBrmRptFileName(brmRptFileName);
1341
1342 return brmRptFileName;
1343
1344 }
1345 //------------------------------------------------------------------------------
1346
addJobFilesToVector(std::string & JobName)1347 void WECmdArgs::addJobFilesToVector(std::string& JobName)
1348 {
1349 //if((!fSchema.empty())&&(!fTable.empty())&&(!fLocFile.empty())) return;
1350
1351 WEXmlgetter aXmlGetter(JobName);
1352 vector<string> aSections;
1353 aSections.push_back("BulkJob");
1354 aSections.push_back("Schema");
1355 aSections.push_back("Table");
1356
1357 //BUG 4163
1358 typedef std::vector<string> TableVec;
1359 TableVec aTableVec;
1360 aXmlGetter.getConfig(aSections[1], aSections[2], aTableVec);
1361 setMultiTableCount(aTableVec.size());
1362
1363 if (getMultiTableCount() > 1)
1364 {
1365 splitConfigFilePerTable(JobName, aTableVec.size());
1366 }
1367 else
1368 {
1369 fVecJobFiles.push_back(JobName);
1370 }
1371
1372 }
1373
1374 //------------------------------------------------------------------------------
1375 // Set the schema, table, and loadfile name from the xml job file.
1376 // If running in binary mode, we also get the list of columns for the table,
1377 // so that we can determine the exact fixed record length of the incoming data.
1378 //------------------------------------------------------------------------------
setSchemaAndTableFromJobFile(std::string & JobName)1379 void WECmdArgs::setSchemaAndTableFromJobFile(std::string& JobName)
1380 {
1381 if (((fVecJobFiles.size() == 1) && (!fSchema.empty()) &&
1382 (!fTable.empty()) && (!fLocFile.empty())) &&
1383 (fImportDataMode == IMPORT_DATA_TEXT)) return;
1384
1385 WEXmlgetter aXmlGetter(JobName);
1386 vector<string> aSections;
1387 aSections.push_back("BulkJob");
1388 aSections.push_back("Schema");
1389 aSections.push_back("Table");
1390
1391 // Reset the fSchema, fTable, and FLocFile
1392 if ((fVecJobFiles.size() > 1) ||
1393 (fSchema.empty()) || (fTable.empty()) || (fLocFile.empty()))
1394 {
1395 std::string aSchemaTable;
1396 std::string aInputFile;
1397
1398 aSchemaTable = aXmlGetter.getAttribute(aSections, "tblName");
1399
1400 if (getDebugLvl() > 1) cout << "schema.table = " << aSchemaTable << endl;
1401
1402 aInputFile = aXmlGetter.getAttribute(aSections, "loadName");
1403
1404 if (getDebugLvl() > 1) cout << "xml::InputFile = " << aInputFile << endl;
1405
1406 if (aSchemaTable.length() > 0)
1407 {
1408 char aSchema[64];
1409 char aTable[64];
1410 int aRet = aSchemaTable.find('.');
1411
1412 if (aRet > 0)
1413 {
1414 int aLen = aSchemaTable.copy(aSchema, aRet);
1415
1416 if (getDebugLvl() > 1) cout << "Schema: " << aSchema << endl;
1417
1418 aSchema[aLen] = 0;
1419
1420 if (fSchema.empty()) fSchema = aSchema;
1421
1422 aLen = aSchemaTable.copy(aTable, aSchemaTable.length(), aRet + 1 );
1423 aTable[aLen] = 0;
1424
1425 if (getDebugLvl() > 1) cout << "Table: " << aTable << endl;
1426
1427 fTable = aTable;
1428 }
1429 else
1430 throw runtime_error(
1431 "JobFile ERROR: Can't get Schema and Table Name");
1432 }
1433 else
1434 {
1435 throw runtime_error(
1436 "JobFile ERROR: Can't get Schema and Table Name");
1437 }
1438
1439 if ((fLocFile.empty()) && (!aInputFile.empty()))
1440 {
1441 string bulkRootPath = config::Config::makeConfig()->getConfig(
1442 "WriteEngine", "BulkRoot");
1443
1444 if (aInputFile.at(0) == '/')
1445 fLocFile = aInputFile;
1446 else if ((!fPmFilePath.empty()) && (fMode == 1))
1447 fLocFile = fPmFilePath + "/" + aInputFile;
1448 else if ((!bulkRootPath.empty()) && (fPmFilePath.empty()))
1449 fLocFile = bulkRootPath + "/data/import/" + aInputFile;
1450 else
1451 fLocFile = aInputFile;
1452
1453 if (fArgMode == 2) fPmFile = fLocFile;
1454 }
1455
1456 if (getDebugLvl() > 1) cout << "schema = " << fSchema << endl;
1457
1458 if (getDebugLvl() > 1) cout << "TableName = " << fTable << endl;
1459
1460 if (getDebugLvl() > 1) cout << "Input File = " << fLocFile << endl;
1461 }
1462
1463 // Reset the list of columns we will be importing from the input data
1464 fColFldsFromJobFile.clear();
1465
1466 if (fImportDataMode != IMPORT_DATA_TEXT)
1467 {
1468 aSections.push_back("Column");
1469 aXmlGetter.getAttributeListForAllChildren(
1470 aSections, "colName", fColFldsFromJobFile);
1471 }
1472 }
1473
1474 //------------------------------------------------------------------------------
checkJobIdCase()1475 void WECmdArgs::checkJobIdCase()
1476 {
1477 if ((fJobId.empty()) || (fJobLogOnly) || (fMode == 3) || (fMode == 0)) return;
1478
1479 if (fJobPath.empty())
1480 {
1481 string bulkRootPath = config::Config::makeConfig()->getConfig(
1482 "WriteEngine", "BulkRoot");
1483 //cout << "checkJobIdCase::BulkRoot: " << bulkRootPath << endl;
1484
1485 if (!bulkRootPath.empty())
1486 fJobPath = bulkRootPath + "/job";
1487 else
1488 throw runtime_error("Config Error: BulkRoot not found in Columnstore.xml");
1489 }
1490
1491 char aBuff[256];
1492
1493 if (!fJobPath.empty())
1494 snprintf(aBuff, sizeof(aBuff), "%s/Job_%s.xml", fJobPath.c_str(),
1495 fJobId.c_str());
1496 else // for time being
1497 snprintf(aBuff, sizeof(aBuff), "/var/log/mariadb/columnstore/data/bulk/job/Job_%s.xml",
1498 fJobId.c_str());
1499
1500 std::string aJobFileName(aBuff);
1501
1502 //cout << "checkJobIdCase::aJobFileName: " << aJobFileName << endl;
1503
1504
1505 //BUG 4171
1506 addJobFilesToVector(aJobFileName);
1507
1508 aJobFileName = fVecJobFiles[0];
1509 setSchemaAndTableFromJobFile(aJobFileName);
1510 setEnclByAndEscCharFromJobFile(aJobFileName);
1511
1512 }
1513
1514 //------------------------------------------------------------------------------
1515
getTmpFileDir()1516 std::string WECmdArgs::getTmpFileDir()
1517 {
1518 if (!fTmpFileDir.empty()) return fTmpFileDir;
1519
1520 fTmpFileDir = startup::StartUp::tmpDir();
1521
1522 if (fTmpFileDir.empty())
1523 throw( runtime_error("Config ERROR: TmpFileDir not found!!"));
1524 else
1525 return fTmpFileDir;
1526 }
1527
1528 //------------------------------------------------------------------------------
1529
getBulkRootDir()1530 std::string WECmdArgs::getBulkRootDir()
1531 {
1532 if (!fBulkRoot.empty()) return fBulkRoot;
1533
1534 fBulkRoot = config::Config::makeConfig()->getConfig("WriteEngine",
1535 "BulkRoot");
1536
1537 if (fBulkRoot.empty())
1538 throw( runtime_error("Config ERROR: <BulkRoot> not found!!"));
1539 else
1540 return fBulkRoot;
1541 }
1542
1543 //------------------------------------------------------------------------------
1544
getBatchQuantity()1545 unsigned int WECmdArgs::getBatchQuantity()
1546 {
1547 return (fBatchQty >= 10000) ? fBatchQty : 10000; //default Batch Qty is 10000
1548 }
1549
1550 //------------------------------------------------------------------------------
1551
setEnclByAndEscCharFromJobFile(std::string & JobName)1552 void WECmdArgs::setEnclByAndEscCharFromJobFile(std::string& JobName)
1553 {
1554 if ((fEnclosedChar == 0)) // check anything in Jobxml file
1555 {
1556 WEXmlgetter aXmlGetter(JobName);
1557 vector<string> aSections;
1558 aSections.push_back("BulkJob");
1559 aSections.push_back("EnclosedByChar");
1560
1561 try
1562 {
1563 //std::string aTable = aXmlGetter.getConfig(aSection, aElement);
1564 std::string aEnclosedBy = aXmlGetter.getValue(aSections);
1565
1566 if (getDebugLvl() > 1)cout << "aEncloseBy = " << aEnclosedBy << endl;
1567
1568 if (!aEnclosedBy.empty())
1569 {
1570 fEnclosedChar = aEnclosedBy.at(0);
1571 }
1572 }
1573 catch (std::runtime_error&)
1574 {
1575 // do not do anything
1576 }
1577 }
1578
1579 if (fEscChar == 0) // check anything in Jobxml file
1580 {
1581 WEXmlgetter aXmlGetter(JobName);
1582 vector<string> aSections;
1583 aSections.push_back("BulkJob");
1584 aSections.push_back("EscapeChar");
1585
1586 try
1587 {
1588 //std::string aTable = aXmlGetter.getConfig(aSection, aElement);
1589 std::string aEscChar = aXmlGetter.getValue(aSections);
1590
1591 if (getDebugLvl() > 1) cout << "aEscapeChar = " << aEscChar << endl;
1592
1593 if (!aEscChar.empty())
1594 {
1595 fEscChar = aEscChar.at(0);
1596 }
1597 }
1598 catch (std::runtime_error&)
1599 {
1600 // do not do anything
1601 }
1602 }
1603
1604 }
1605
1606 //------------------------------------------------------------------------------
getFileNameFromPath(const std::string & Path) const1607 std::string WECmdArgs::getFileNameFromPath(const std::string& Path) const
1608 {
1609 char aBuff[64];
1610 int iDx = Path.find_last_of('/');
1611 iDx++; // compensate for the forward slash
1612 int aCx = Path.size() - iDx;
1613 Path.copy(aBuff, aCx, iDx);
1614 aBuff[aCx] = 0;
1615 return aBuff;
1616 }
1617
1618 //------------------------------------------------------------------------------
getModuleID()1619 std::string WECmdArgs::getModuleID()
1620 {
1621 oam::Oam oam;
1622 oam::oamModuleInfo_t sModInfo;
1623 std::string sModuleID;
1624 char szModuleIDandPID[64];
1625 int nModuleNumber;
1626
1627 try
1628 {
1629 sModInfo = oam.getModuleInfo();
1630 sModuleID = boost::get < 1 > (sModInfo);
1631 nModuleNumber = boost::get < 2 > (sModInfo);
1632 snprintf(szModuleIDandPID, sizeof(szModuleIDandPID), "%s%d-%d",
1633 sModuleID.c_str(), nModuleNumber, getpid());
1634 sModuleID = szModuleIDandPID;
1635 }
1636 catch (exception&)
1637 {
1638 sModuleID = "unknown";
1639 }
1640
1641 return sModuleID;
1642 }
1643
1644 //------------------------------------------------------------------------------
1645
1646
splitConfigFilePerTable(std::string & ConfigName,int tblCount)1647 void WECmdArgs::splitConfigFilePerTable(std::string& ConfigName, int tblCount)
1648 {
1649 std::string aOpenTag = "<Table ";
1650 std::string aCloseTag = "</Table>";
1651 std::string aCloseSchemaTag = "</Schema>";
1652
1653 std::vector<std::ofstream*> aVecFiles;
1654
1655 //std::vector<std::string> aVecConfigs;
1656 for (int aIdx = 1; aIdx <= tblCount; aIdx++)
1657 {
1658 char aConfName[128];
1659 snprintf(aConfName, sizeof(aConfName), "%s_%d.xml", ConfigName.c_str(), aIdx);
1660 //aVecConfigs.push_back(aConfName);
1661 fVecJobFiles.push_back(aConfName);
1662 std::ofstream* pCopy = new std::ofstream;
1663 //pCopy->open(aConfName, std::ios_base::app);
1664 pCopy->open(aConfName);
1665 aVecFiles.push_back(pCopy);
1666 }
1667
1668
1669 std::ifstream aMaster;
1670 aMaster.open(ConfigName.c_str());
1671
1672 if (aMaster.is_open())
1673 {
1674 char aBuff[256];
1675 int aTblNo = 0;
1676 size_t aStrPos = std::string::npos;
1677 bool aOpenFound = false;
1678 bool aCloseFound = false;
1679
1680 while (!aMaster.eof())
1681 {
1682 aMaster.getline(aBuff, sizeof(aBuff) - 1);
1683 unsigned int aLen = aMaster.gcount();
1684
1685 if ((aLen < (sizeof(aBuff) - 2)) && (aLen > 0))
1686 {
1687 aBuff[aLen - 1] = '\n';
1688 aBuff[aLen] = 0;
1689 string aData = aBuff;
1690 //cout << "Data Read " << aBuff;
1691
1692 if (!aOpenFound)
1693 {
1694 aStrPos = aData.find(aOpenTag);
1695
1696 if (aStrPos != std::string::npos)
1697 {
1698 aOpenFound = true;
1699 aTblNo++;
1700 write2ConfigFiles(aVecFiles, aBuff, aTblNo);
1701 }
1702 else
1703 {
1704 if ((!aOpenFound) && (aCloseFound))
1705 {
1706 aStrPos = aData.find(aCloseSchemaTag);
1707
1708 if (aStrPos != std::string::npos)
1709 {
1710 aOpenFound = false;
1711 aCloseFound = false;
1712 aTblNo = 0;
1713 }
1714 }
1715
1716 write2ConfigFiles(aVecFiles, aBuff, aTblNo);
1717 }
1718 }
1719 else
1720 {
1721 aStrPos = aData.find(aCloseTag);
1722
1723 if (aStrPos != std::string::npos)
1724 {
1725 aOpenFound = false;
1726 aCloseFound = true;
1727 write2ConfigFiles(aVecFiles, aBuff, aTblNo);
1728 }
1729 else
1730 {
1731 write2ConfigFiles(aVecFiles, aBuff, aTblNo);
1732 }
1733 }
1734 }
1735 }//while Master.eof
1736 }
1737 else
1738 {
1739 throw runtime_error("Could not open Job Config file");
1740 }
1741
1742
1743 for (unsigned int Idx = 0; Idx < aVecFiles.size(); Idx++)
1744 {
1745 aVecFiles[Idx]->close();
1746 delete aVecFiles[Idx];
1747 }
1748
1749 aVecFiles.clear();
1750
1751 }
1752
1753 //------------------------------------------------------------------------------
1754
write2ConfigFiles(std::vector<std::ofstream * > & Files,char * pBuff,int FileIdx)1755 void WECmdArgs::write2ConfigFiles(std::vector<std::ofstream*>& Files,
1756 char* pBuff, int FileIdx)
1757 {
1758
1759 if (FileIdx == 0)
1760 {
1761 std::vector<std::ofstream*>::iterator aIt = Files.begin();
1762
1763 while (aIt != Files.end())
1764 {
1765 std::ofstream* pCopy = (*aIt);
1766 pCopy->write(pBuff, strlen(pBuff));
1767 ++aIt;
1768 }
1769 }
1770 else
1771 {
1772 Files[FileIdx - 1]->write(pBuff, strlen(pBuff));
1773 }
1774 }
1775
1776 //------------------------------------------------------------------------------
1777
updateWithJobFile(int Idx)1778 void WECmdArgs::updateWithJobFile(int Idx)
1779 {
1780 setLocFile(""); // resetting the from the previous import
1781 std::string aJobFileName = fVecJobFiles[Idx];
1782 setSchemaAndTableFromJobFile(aJobFileName);
1783 setEnclByAndEscCharFromJobFile(aJobFileName);
1784 setJobFileName(aJobFileName);
1785
1786 std::ostringstream aSS;
1787 aSS << fOrigJobId << ".xml_" << (Idx + 1);
1788 fJobId = aSS.str();
1789 }
1790
1791
1792 //------------------------------------------------------------------------------
1793
replaceCharInStr(const std::string & Str,char C,char R)1794 std::string WECmdArgs::replaceCharInStr(const std::string& Str, char C, char R)
1795 {
1796 std::stringstream aSs;
1797
1798 size_t start = 0, end = 0;
1799 end = Str.find_first_of(C);
1800
1801 do
1802 {
1803 if (end != string::npos)
1804 {
1805 aSs << Str.substr(start, end - start) << R;
1806 start = end + 1;
1807 }
1808 else
1809 {
1810 aSs << Str.substr(start, end - start);
1811 break;
1812 }
1813
1814 end = Str.find_first_of(C, start);
1815 }
1816 while (start != end);
1817
1818 return aSs.str();
1819 }
1820
1821 //------------------------------------------------------------------------------
1822 // Introduced to handle Bug 4342 with Mode 2
1823
PrepMode2ListOfFiles(std::string & FileName)1824 std::string WECmdArgs::PrepMode2ListOfFiles(std::string& FileName)
1825 {
1826 VecArgs aInfileList;
1827 std::string bulkRootPath = getBulkRootDir();
1828 //cout << "Inside PrepMode2ListOfFiles("<< FileName << ")" << endl;
1829 std::string aFileName = FileName;
1830
1831 istringstream iss(aFileName);
1832 size_t start = 0, end = 0;
1833 const char* sep = " ,|";
1834
1835 end = aFileName.find_first_of(sep);
1836
1837 do
1838 {
1839 if (end != string::npos)
1840 {
1841 std::string aFile = aFileName.substr(start, end - start);
1842
1843 if (getDebugLvl() > 1)
1844 cout << "File: " << aFileName.substr(start, end - start) << endl;
1845
1846 start = end + 1;
1847 aInfileList.push_back(aFile);
1848 }
1849 else
1850 {
1851 std::string aFile = aFileName.substr(start, end - start);
1852
1853 if (getDebugLvl() > 1)
1854 cout << "Next Input File " << aFileName.substr(start, end - start) << endl;
1855
1856 aInfileList.push_back(aFile);
1857 break;
1858 }
1859
1860 end = aFileName.find_first_of(sep, start);
1861 }
1862 while (start != end);
1863
1864 std::ostringstream aSS;
1865 int aVecSize = aInfileList.size();
1866 int aVecIdx = 0;
1867
1868 // Take file list one by one and append it to one string
1869 while (aVecIdx < aVecSize)
1870 {
1871 std::string aNextFile = aInfileList[aVecIdx];
1872 aVecIdx++;
1873
1874 //aInfileList.pop_front();
1875 if (aNextFile.at(0) != '/')
1876 {
1877 aSS << bulkRootPath << "/data/import/" + aNextFile;
1878 }
1879 else
1880 {
1881 aSS << aNextFile;
1882 }
1883
1884 if (aVecIdx < aVecSize) aSS << ",";
1885 }
1886
1887 //cout << "File list are = " << aSS.str() << endl;
1888
1889 return aSS.str();
1890 }
1891
1892 //------------------------------------------------------------------------------
1893 // Get set of column names in the "current" table being processed from the
1894 // Job xml file.
1895 //------------------------------------------------------------------------------
getColumnList(std::set<std::string> & columnList) const1896 void WECmdArgs::getColumnList( std::set<std::string>& columnList ) const
1897 {
1898 columnList.clear();
1899
1900 for (unsigned k = 0; k < fColFldsFromJobFile.size(); k++)
1901 {
1902 columnList.insert( fColFldsFromJobFile[k] );
1903 }
1904 }
1905
1906 //-----------------------------------------------------------------------------
1907 // check for the bulkload log directory. If it is not existing, create it
1908 // w.r.t Bug 6137
1909 //-----------------------------------------------------------------------------
1910
checkForBulkLogDir(const std::string & BulkRoot)1911 void WECmdArgs::checkForBulkLogDir(const std::string& BulkRoot)
1912 {
1913 if ( !boost::filesystem::exists(BulkRoot.c_str()) )
1914 {
1915 cout << "Creating directory : " << BulkRoot << endl;
1916 boost::filesystem::create_directories(BulkRoot.c_str());
1917 }
1918
1919 if ( boost::filesystem::exists(BulkRoot.c_str()) )
1920 {
1921 // create the job directory also if not existing
1922 std::ostringstream aSS;
1923 aSS << BulkRoot;
1924 aSS << "/job";
1925 std::string jobDir = aSS.str();
1926
1927 if ( !boost::filesystem::exists(jobDir.c_str()) )
1928 {
1929 cout << "Creating directory : " << jobDir << endl;
1930 bool aSuccess = boost::filesystem::create_directories(jobDir.c_str());
1931
1932 if (!aSuccess)
1933 {
1934 cout << "\nFailed to create job directory, check permissions\n" << endl;
1935 throw runtime_error("Failed to create job directory, check permissions");
1936 }
1937 }
1938 }
1939 else
1940 {
1941 cout << "\nFailed to create bulk directory, check permissions\n" << endl;
1942 throw runtime_error("Failed to create bulk directory, check permissions");
1943 }
1944 }
1945
1946
1947 } /* namespace WriteEngine */
1948