1 /* Copyright (C) 2014 InfiniDB, Inc.
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17
18 /*******************************************************************************
19 * $Id: we_xmljob.cpp 4579 2013-03-19 23:16:54Z dhall $
20 *
21 *******************************************************************************/
22 /** @file */
23
24 #define WRITEENGINEXMLJOB_DLLEXPORT
25 #include "we_xmljob.h"
26 #undef WRITEENGINEXMLJOB_DLLEXPORT
27
28 #include <limits>
29 #include <sstream>
30 #include <unistd.h>
31 #include <stdexcept>
32 #include <cstdlib>
33 #include <set>
34 #include "we_config.h"
35 #include "we_log.h"
36 #include "we_convertor.h"
37 #include "dataconvert.h"
38 #include <boost/date_time/posix_time/posix_time.hpp>
39 #include <boost/filesystem/path.hpp>
40 #include <boost/filesystem/convenience.hpp>
41
42 #include <sys/time.h>
43
44 using namespace std;
45 using namespace execplan;
46
47
48 namespace WriteEngine
49 {
// Saturation magnitude for DECIMAL columns, indexed by precision (0 - 18).
// Entry [p] is the largest value that can be written with p decimal digits
// (i.e. p nines); entry [0] is 0.
const long long columnstore_precision[19] =
{
    // precision 0 - 4
    0LL, 9LL, 99LL, 999LL, 9999LL,
    // precision 5 - 9
    99999LL, 999999LL, 9999999LL, 99999999LL, 999999999LL,
    // precision 10 - 14
    9999999999LL, 99999999999LL, 999999999999LL,
    9999999999999LL, 99999999999999LL,
    // precision 15 - 18
    999999999999999LL, 9999999999999999LL,
    99999999999999999LL, 999999999999999999LL
};
73
74 //------------------------------------------------------------------------------
75 // Constructor
76 //------------------------------------------------------------------------------
XMLJob()77 XMLJob::XMLJob( ) : fDebugLevel( DEBUG_0 ),
78 fDeleteTempFile(false),
79 fValidateColList(true),
80 fTimeZone("SYSTEM")
81 {
82 }
83
84 //------------------------------------------------------------------------------
85 // Default Destructor
86 // Delete temporary Job XML file if applicable.
87 //------------------------------------------------------------------------------
~XMLJob()88 XMLJob::~XMLJob()
89 {
90 if ((fDeleteTempFile) && (!fJobFileName.empty()))
91 {
92 unlink( fJobFileName.c_str() );
93 }
94 }
95
96 //------------------------------------------------------------------------------
97 // Load a job xml file
98 // fileName - name of file to load
99 // bTempFile - are we loading a temporary file (that destructor should delete)
100 // bValidateColumnList - validate that all db columns have an XML tag
101 // returns NO_ERROR if success; other if fail
102 //------------------------------------------------------------------------------
loadJobXmlFile(const string & fileName,bool bTempFile,bool bValidateColumnList,string & errMsg)103 int XMLJob::loadJobXmlFile( const string& fileName,
104 bool bTempFile,
105 bool bValidateColumnList,
106 string& errMsg )
107 {
108 int rc;
109
110 fDeleteTempFile = bTempFile;
111 fJobFileName = fileName;
112 fValidateColList = bValidateColumnList;
113
114 try
115 {
116 rc = parseDoc( fileName.c_str() );
117
118 if (rc != NO_ERROR)
119 return rc;
120 }
121 catch (exception& ex)
122 {
123 errMsg = ex.what();
124 return ERR_XML_PARSE;
125 }
126
127 return rc;
128 }
129
130 //------------------------------------------------------------------------------
131 // Print contents of fJob to the specified logger object.
132 // logger - Log object to use in logging
133 //------------------------------------------------------------------------------
printJobInfo(Log & logger) const134 void XMLJob::printJobInfo( Log& logger ) const
135 {
136 const Job& job = fJob;
137
138 ostringstream oss1;
139 oss1 << "Job " << job.id << " input\n";
140 oss1 << "===============================================" << endl;
141 oss1 << "Name : " << job.name << endl;
142 oss1 << "Desc : " << job.desc << endl;
143 oss1 << "User : " << job.userName << endl;
144 oss1 << "Delim: " << job.fDelimiter << endl;
145 oss1 << "Enclosed By : ";
146
147 if (job.fEnclosedByChar)
148 oss1 << job.fEnclosedByChar << endl;
149 else
150 oss1 << "n/a" << endl;
151
152 oss1 << "Escape Char : ";
153
154 if (job.fEscapeChar)
155 oss1 << job.fEscapeChar << endl;
156 else
157 oss1 << "n/a" << endl;
158
159 oss1 << "Read Buffers: " << job.numberOfReadBuffers << endl;
160 oss1 << "Read Buffer Size: " << job.readBufferSize << endl;
161 oss1 << "setvbuf Size: " << job.writeBufferSize << endl;
162 oss1 << "Create Date : " << job.createDate << endl;
163 oss1 << "Create Time : " << job.createTime << endl;
164 oss1 << "Schema Name : " << job.schema << endl;
165
166 oss1 << "Num Tables : " << job.jobTableList.size() << endl;
167 logger.logMsg( oss1.str(), MSGLVL_INFO2 );
168
169 for ( unsigned int i = 0; i < job.jobTableList.size(); i++ )
170 {
171 const JobTable& jobTable = job.jobTableList[i];
172 ostringstream oss2;
173 oss2 << "\n-------------------------------------------------" << endl;
174 oss2 << "\tTable Name : " << jobTable.tblName << endl;
175 oss2 << "\tTable OID : " << jobTable.mapOid << endl;
176 oss2 << "\tTable Load Name : " << jobTable.loadFileName <<
177 endl;
178 oss2 << "\tMax Err Num : " << jobTable.maxErrNum << endl;
179
180 const JobColList& colList = jobTable.colList;
181
182 oss2 << "\tNum of Columns : " << colList.size() << endl;
183 logger.logMsg( oss2.str(), MSGLVL_INFO2 );
184
185 // Note that we don't print JobColumn.dataType because it is not carried
186 // in the XML file. dataType is assigned/used internally by bulkload.
187 for ( unsigned int j = 0; j < jobTable.fFldRefs.size(); j++ )
188 {
189 unsigned idx = jobTable.fFldRefs[j].fArrayIndex;
190 BulkFldColRel fldColType = jobTable.fFldRefs[j].fFldColType;
191 const JobColumn& jobCol = ((fldColType == BULK_FLDCOL_IGNORE_FIELD) ?
192 jobTable.fIgnoredFields[idx] :
193 jobTable.colList[idx] );
194 ostringstream oss3;
195 oss3 << "\n\t****************************************" << endl;
196
197 if (fldColType == BULK_FLDCOL_COLUMN_DEFAULT)
198 oss3 << "\t\tDefaultColumn Name: " << jobCol.colName << endl;
199 else
200 oss3 << "\t\tColumn Name : " << jobCol.colName << endl;
201
202 oss3 << "\t\tColumn OID : " << jobCol.mapOid << endl;
203 oss3 << "\t\tColumn type name : " << jobCol.typeName << endl;
204 oss3 << "\t\tColumn width : " << jobCol.width << endl;
205 oss3 << "\t\tColumn Not Null : " << jobCol.fNotNull << endl;
206 oss3 << "\t\tColumn WithDefault: " << jobCol.fWithDefault << endl;
207 oss3 << "\t\tColumn type : " << jobCol.colType << endl;
208 oss3 << "\t\tColumn comp type : " << jobCol.compressionType << endl;
209 oss3 << "\t\tColumn autoInc : " << jobCol.autoIncFlag << endl;
210
211 if ( jobCol.typeName == ColDataTypeStr[CalpontSystemCatalog::DECIMAL] )
212 {
213 oss3 << "\t\tColumn Precision : " << jobCol.precision << endl;
214 oss3 << "\t\tColumn Scale : " << jobCol.scale << endl;
215 }
216
217 if ( jobCol.typeName == ColDataTypeStr[CalpontSystemCatalog::UDECIMAL] )
218 {
219 oss3 << "\t\tColumn Precision : " << jobCol.precision << endl;
220 oss3 << "\t\tColumn Scale : " << jobCol.scale << endl;
221 }
222
223 if ( jobCol.colType == 'D' )
224 {
225 oss3 << "\t\tDictionary Oid : " <<
226 jobCol.dctnry.dctnryOid << endl;
227 }
228
229 logger.logMsg( oss3.str(), MSGLVL_INFO2 );
230 } // end of loop through columns in a table
231 } // end of loop through tables
232 }
233
234 //------------------------------------------------------------------------------
235 // Print brief contents of specified Job to specified logger object.
236 // logger - Log object to use in logging
237 //------------------------------------------------------------------------------
printJobInfoBrief(Log & logger) const238 void XMLJob::printJobInfoBrief( Log& logger ) const
239 {
240 const Job& job = fJob;
241
242 ostringstream oss1;
243 oss1 << "XMLJobFile: Delim(" << job.fDelimiter << "); EnclosedBy(";
244
245 if (job.fEnclosedByChar)
246 oss1 << job.fEnclosedByChar;
247 else
248 oss1 << "n/a";
249
250 oss1 << "); EscapeChar(";
251
252 if (job.fEscapeChar)
253 oss1 << job.fEscapeChar;
254 else
255 oss1 << "n/a";
256
257 oss1 << "); ReadBufs(" << job.numberOfReadBuffers <<
258 "); ReadBufSize(" << job.readBufferSize <<
259 "); setvbufSize(" << job.writeBufferSize << ')';
260 logger.logMsg( oss1.str(), MSGLVL_INFO2 );
261
262 for ( unsigned int i = 0; i < job.jobTableList.size(); i++ )
263 {
264 const JobTable& jobTable = job.jobTableList[i];
265 ostringstream oss2;
266 oss2 << " Table(" << jobTable.tblName <<
267 "); OID(" << jobTable.mapOid << ')' <<
268 "; MaxErrNum(" << jobTable.maxErrNum << ')';
269 logger.logMsg( oss2.str(), MSGLVL_INFO2 );
270
271 for ( unsigned int j = 0; j < jobTable.fFldRefs.size(); j++ )
272 {
273 unsigned idx = jobTable.fFldRefs[j].fArrayIndex;
274 BulkFldColRel fldColType = jobTable.fFldRefs[j].fFldColType;
275 const JobColumn& jobCol = ((fldColType == BULK_FLDCOL_IGNORE_FIELD) ?
276 jobTable.fIgnoredFields[idx] :
277 jobTable.colList[idx]);
278 ostringstream oss3;
279
280 if (fldColType == BULK_FLDCOL_COLUMN_DEFAULT)
281 oss3 << " DefaultColumn(" << jobCol.colName;
282 else
283 oss3 << " Column(" << jobCol.colName;
284
285 oss3 << "); OID(" << jobCol.mapOid <<
286 "); Type(" << jobCol.typeName <<
287 "); Width(" << jobCol.width <<
288 "); Comp(" << jobCol.compressionType;
289
290 if ( jobCol.colType == 'D' )
291 oss3 << "); DctnryOid(" << jobCol.dctnry.dctnryOid;
292
293 oss3 << ')';
294
295 if (jobCol.autoIncFlag)
296 oss3 << "; autoInc";
297
298 if (jobCol.fNotNull)
299 oss3 << "; NotNull";
300
301 if (jobCol.fWithDefault)
302 oss3 << "; WithDefault";
303
304 logger.logMsg( oss3.str(), MSGLVL_INFO2 );
305 }
306 } // end of for( int i
307 }
308
309 //------------------------------------------------------------------------------
310 // Process a node
311 // pNode - current node
312 // returns TRUE if success, FALSE otherwise
313 //------------------------------------------------------------------------------
processNode(xmlNode * pNode)314 bool XMLJob::processNode( xmlNode* pNode )
315 {
316 if ( isTag( pNode, TAG_BULK_JOB ))
317 {
318 // no work for the BulkJob tag
319 }
320 else if ( isTag( pNode, TAG_CREATE_DATE ))
321 setJobData( pNode, TAG_CREATE_DATE, true, TYPE_CHAR );
322 else if ( isTag( pNode, TAG_CREATE_TIME ))
323 setJobData( pNode, TAG_CREATE_TIME, true, TYPE_CHAR );
324 else if ( isTag( pNode, TAG_COLUMN ))
325 setJobData( pNode, TAG_COLUMN, false, TYPE_EMPTY );
326 else if ( isTag( pNode, TAG_DEFAULT_COLUMN ))
327 setJobData( pNode, TAG_DEFAULT_COLUMN, false, TYPE_EMPTY );
328 else if ( isTag( pNode, TAG_DESC ))
329 setJobData( pNode, TAG_DESC, true, TYPE_CHAR );
330 else if ( isTag( pNode, TAG_ID ))
331 setJobData( pNode, TAG_ID, true, TYPE_INT );
332 else if ( isTag( pNode, TAG_IGNORE_FIELD ))
333 setJobData( pNode, TAG_IGNORE_FIELD, false, TYPE_EMPTY );
334 else if ( isTag( pNode, TAG_NAME ))
335 setJobData( pNode, TAG_NAME, true, TYPE_CHAR );
336 else if ( isTag( pNode, TAG_PATH ))
337 setJobData( pNode, TAG_PATH, true, TYPE_CHAR );
338 else if ( isTag( pNode, TAG_TABLE ))
339 setJobData( pNode, TAG_TABLE, false, TYPE_EMPTY );
340 else if ( isTag( pNode, TAG_TYPE ))
341 setJobData( pNode, TAG_TYPE, true, TYPE_CHAR );
342 else if ( isTag( pNode, TAG_USER ))
343 setJobData( pNode, TAG_USER, true, TYPE_CHAR );
344 else if ( isTag( pNode, TAG_SCHEMA))
345 setJobData( pNode, TAG_SCHEMA, false, TYPE_EMPTY );
346 else if ( isTag( pNode, TAG_READ_BUFFERS))
347 setJobData( pNode, TAG_READ_BUFFERS, false, TYPE_EMPTY );
348 else if ( isTag( pNode, TAG_WRITE_BUFFER_SIZE))
349 setJobData( pNode, TAG_WRITE_BUFFER_SIZE, true, TYPE_INT);
350 else if ( isTag( pNode, TAG_DELIMITER))
351 setJobData( pNode, TAG_DELIMITER, true, TYPE_CHAR);
352 else if ( isTag( pNode, TAG_ENCLOSED_BY_CHAR))
353 setJobData( pNode, TAG_ENCLOSED_BY_CHAR, true, TYPE_CHAR);
354 else if ( isTag( pNode, TAG_ESCAPE_CHAR))
355 setJobData( pNode, TAG_ESCAPE_CHAR, true, TYPE_CHAR);
356 else
357 {
358 ostringstream oss;
359 oss << "Unrecognized TAG in Job XML file: <" << pNode->name << ">";
360 throw runtime_error( oss.str() );
361 }
362
363 if (XMLOp::processNode( pNode ))
364 {
365 if ( isTag( pNode, TAG_TABLE ))
366 {
367 postProcessTableNode();
368 }
369 }
370 else
371 {
372 return false;
373 }
374
375 return true;
376 }
377
378 //------------------------------------------------------------------------------
379 // Generic setter
380 // pNode - current node
381 // tag - xml tag
382 // bExpectContent - should node content be present to process
383 // tagType - data type
384 //------------------------------------------------------------------------------
setJobData(xmlNode * pNode,const xmlTag tag,bool bExpectContent,XML_DTYPE tagType)385 void XMLJob::setJobData( xmlNode* pNode,
386 const xmlTag tag,
387 bool bExpectContent,
388 XML_DTYPE tagType )
389 {
390 int intVal = 0;
391 long long llVal = 0;
392 std::string bufString;
393 bool bSuccess = false;
394
395 if (bExpectContent)
396 {
397 if ( tagType == TYPE_INT )
398 bSuccess = getNodeContent( pNode, &intVal, TYPE_INT );
399 else // longlong
400 if ( tagType == TYPE_LONGLONG )
401 bSuccess = getNodeContent( pNode, &llVal, TYPE_LONGLONG );
402 else // char
403 if ( tagType == TYPE_CHAR )
404 bSuccess = getNodeContentStr( pNode, bufString );
405
406 if (!bSuccess)
407 return;
408 }
409
410 // process tag content and attributes
411 switch ( tag )
412 {
413 case TAG_READ_BUFFERS:
414 setReadBuffers( pNode );
415 break;
416
417 case TAG_COLUMN:
418 setJobDataColumn( pNode, false );
419 break;
420
421 case TAG_CREATE_DATE:
422 fJob.createDate = bufString;
423 break;
424
425 case TAG_CREATE_TIME:
426 fJob.createTime = bufString;
427 break;
428
429 case TAG_DEFAULT_COLUMN:
430 setJobDataColumn( pNode, true );
431 break;
432
433 case TAG_DESC:
434 fJob.desc = bufString;
435 break;
436
437 case TAG_ID:
438 fJob.id = intVal;
439 break;
440
441 case TAG_IGNORE_FIELD:
442 setJobDataIgnoreField( );
443 break;
444
445 case TAG_NAME:
446 fJob.name = bufString;
447 break;
448
449 case TAG_PATH:
450 // no action necessary, but keep for backwards compatability
451 break;
452
453 case TAG_TABLE:
454 setJobDataTable( pNode );
455 break;
456
457 case TAG_TYPE:
458 // no action necessary, but keep for backwards compatability
459 break;
460
461 case TAG_USER:
462 fJob.userName = bufString;
463 break;
464
465 case TAG_SCHEMA:
466 setSchema( pNode );
467 break;
468
469 case TAG_WRITE_BUFFER_SIZE:
470 fJob.writeBufferSize = intVal;
471 break;
472
473 case TAG_DELIMITER:
474 {
475 const char* buf = bufString.c_str();
476
477 if ((!strcmp(buf, "\\t")) ||
478 (!strcmp(buf, "'\\t'")))
479 {
480 fJob.fDelimiter = '\t';
481 }
482 else
483 {
484 fJob.fDelimiter = bufString[0];
485 }
486
487 break;
488 }
489
490 case TAG_ENCLOSED_BY_CHAR:
491 {
492 fJob.fEnclosedByChar = bufString[0];
493 break;
494 }
495
496 case TAG_ESCAPE_CHAR:
497 {
498 fJob.fEscapeChar = bufString[0];
499 break;
500 }
501
502 default:
503 break;
504 }
505 }
506
507 //------------------------------------------------------------------------------
508 // Set table information parms.
509 // pNode - current node
510 //------------------------------------------------------------------------------
setJobDataTable(xmlNode * pNode)511 void XMLJob::setJobDataTable( xmlNode* pNode )
512 {
513 int intVal;
514 std::string bufString;
515 JobTable curTable;
516
517 if ( getNodeAttributeStr( pNode, xmlTagTable[TAG_ORIG_NAME], bufString ) )
518 curTable.tblName = bufString;
519
520 if ( getNodeAttributeStr( pNode, xmlTagTable[TAG_TBL_NAME], bufString ) )
521 curTable.tblName = bufString;
522
523 if (curTable.tblName.empty())
524 {
525 throw runtime_error(
526 "Required table name attribute (tblName) missing from Table tag");
527 }
528
529 if ( getNodeAttribute( pNode, xmlTagTable[TAG_TBL_OID], &intVal, TYPE_INT ) )
530 curTable.mapOid = intVal;
531
532 if ( getNodeAttributeStr( pNode, xmlTagTable[TAG_LOAD_NAME], bufString ) )
533 curTable.loadFileName = bufString;
534
535 if ( getNodeAttribute( pNode, xmlTagTable[TAG_MAX_ERR_ROW], &intVal,
536 TYPE_INT))
537 curTable.maxErrNum = intVal;
538
539 fJob.jobTableList.push_back( curTable );
540 }
541
542 //------------------------------------------------------------------------------
543 // Set column information parms.
544 // pNode - current node
545 // bDefaultCol - is this a <DefaultColumn> tag
546 //
547 // Note on Supported Tags: (Bug 2828)
548 // Note that the "notnull" and "defaultValue" attribute tags are not recognized
549 // by this function because by the time we added support for these tags, we had
550 // changed to only store the table and column names in the XML file. Much of
551 // the functionality in setJobDataColumn() is only present to provide backwards
552 // compatability for an old Job XML file that a user might still be using.
553 //
554 // Any other new tags probably don't need adding to setJobDataColumn() either,
555 // for the same reason.
556 //------------------------------------------------------------------------------
setJobDataColumn(xmlNode * pNode,bool bDefaultCol)557 void XMLJob::setJobDataColumn( xmlNode* pNode, bool bDefaultCol )
558 {
559 int intVal;
560 std::string bufString;
561 JobColumn curColumn;
562
563 if ( fJob.jobTableList.size() == 0 )
564 return;
565
566 int tableNo = fJob.jobTableList.size() - 1;
567
568 if ( getNodeAttributeStr( pNode, xmlTagTable[TAG_ORIG_NAME], bufString ) )
569 curColumn.colName = bufString;
570
571 if ( getNodeAttributeStr( pNode, xmlTagTable[TAG_COL_NAME], bufString ) )
572 curColumn.colName = bufString;
573
574 if (curColumn.colName.empty())
575 {
576 ostringstream oss;
577 oss << "Required column name attribute (colName) missing from "
578 "Column tag for table " <<
579 fJob.jobTableList[tableNo].tblName;
580 throw runtime_error( oss.str() );
581 }
582
583 if ( getNodeAttribute( pNode, xmlTagTable[TAG_COL_OID], &intVal, TYPE_INT ) )
584 curColumn.mapOid = intVal;
585
586 if ( getNodeAttribute( pNode, xmlTagTable[TAG_WIDTH], &intVal, TYPE_INT ) )
587 {
588 curColumn.width = intVal;
589 curColumn.definedWidth = intVal; //@Bug 3040
590 }
591
592 if ( getNodeAttribute( pNode, xmlTagTable[TAG_PRECISION], &intVal, TYPE_INT))
593 curColumn.precision = intVal;
594
595 if ( getNodeAttribute( pNode, xmlTagTable[TAG_SCALE], &intVal, TYPE_INT ) )
596 curColumn.scale = intVal;
597
598 if ( getNodeAttributeStr( pNode, xmlTagTable[TAG_DATA_TYPE], bufString ) )
599 curColumn.typeName = bufString;
600
601 if ( getNodeAttribute( pNode, xmlTagTable[TAG_COMPRESS_TYPE], &intVal,
602 TYPE_INT))
603 {
604 curColumn.compressionType = intVal;
605 curColumn.dctnry.fCompressionType = intVal;
606 }
607
608 if ( getNodeAttribute( pNode, xmlTagTable[TAG_AUTOINCREMENT_FLAG],
609 &intVal, TYPE_INT))
610 {
611 if (intVal)
612 curColumn.autoIncFlag = true;
613 else
614 curColumn.autoIncFlag = false;
615 }
616
617 if ( getNodeAttributeStr( pNode, xmlTagTable[TAG_COL_TYPE], bufString ) )
618 {
619 const char* buf = bufString.c_str();
620
621 if ( !strcmp( buf, "D" ) )
622 {
623 curColumn.colType = 'D';
624
625 // @Bug 2565: Retain dictionary width to use in truncating strings,
626 // since BulkLoad eventually stores column token width in 'width'.
627 curColumn.dctnryWidth = curColumn.width;
628
629 if ( getNodeAttribute( pNode,
630 xmlTagTable[TAG_DVAL_OID],
631 &intVal,
632 TYPE_INT ) )
633 curColumn.dctnry.dctnryOid = intVal;
634 }
635 }
636
637 // This is a workaround that DBBuilder can not pass decimal type to XML file
638 if ( ( curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::INT] ||
639 curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::BIGINT] ||
640 curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::SMALLINT] ||
641 curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::TINYINT]) &&
642 curColumn.scale > 0 )
643 curColumn.typeName = ColDataTypeStr[CalpontSystemCatalog::DECIMAL];
644
645 // end of workaround
646
647 // Initialize the saturation limits for this column
648 initSatLimits( curColumn );
649
650 // Save default columns in separate list, so that we can intentionally
651 // add/keep them at the "end" of colList later, after all other columns.
652 if (bDefaultCol) // temporarily save in separate list
653 {
654 curColumn.fFldColRelation = BULK_FLDCOL_COLUMN_DEFAULT;
655 fDefaultColumns.push_back ( curColumn );
656 }
657 else
658 {
659 // Add to list of db columns to be loaded
660 curColumn.fFldColRelation = BULK_FLDCOL_COLUMN_FIELD;
661 fJob.jobTableList[tableNo].colList.push_back ( curColumn );
662
663 // Add to combined field list of columns and ignored fields
664 JobFieldRef fieldRef( BULK_FLDCOL_COLUMN_FIELD,
665 fJob.jobTableList[tableNo].colList.size() - 1 );
666 fJob.jobTableList[tableNo].fFldRefs.push_back( fieldRef );
667 }
668 }
669
670 //------------------------------------------------------------------------------
671 // Set column information parms for an input field that is to be ignored
672 //------------------------------------------------------------------------------
setJobDataIgnoreField()673 void XMLJob::setJobDataIgnoreField( )
674 {
675 JobColumn curColumn;
676
677 int tableNo = fJob.jobTableList.size() - 1;
678 ostringstream oss;
679 oss << "IgnoreField" << fJob.jobTableList[tableNo].fFldRefs.size() + 1;
680 curColumn.colName = oss.str();
681
682 // Add to list of ignored fields
683 curColumn.fFldColRelation = BULK_FLDCOL_IGNORE_FIELD;
684 fJob.jobTableList[tableNo].fIgnoredFields.push_back( curColumn );
685
686 // Add to combined field list of columns and ignored fields
687 JobFieldRef fieldRef( BULK_FLDCOL_IGNORE_FIELD,
688 fJob.jobTableList[tableNo].fIgnoredFields.size() - 1 );
689 fJob.jobTableList[tableNo].fFldRefs.push_back ( fieldRef );
690 }
691
692 //------------------------------------------------------------------------------
693 // Initialize the saturation limits for the specified column.
694 //------------------------------------------------------------------------------
initSatLimits(JobColumn & curColumn) const695 void XMLJob::initSatLimits( JobColumn& curColumn ) const
696 {
697 // If one of the integer types, we set the min/max saturation value.
698 // For DECIMAL columns this will vary with the precision.
699 if ( curColumn.typeName ==
700 ColDataTypeStr[CalpontSystemCatalog::INT] )
701 {
702 curColumn.fMinIntSat = MIN_INT;
703 curColumn.fMaxIntSat = MAX_INT;
704 }
705 else if ( curColumn.typeName ==
706 ColDataTypeStr[CalpontSystemCatalog::UINT] )
707 {
708 curColumn.fMinIntSat = MIN_UINT;
709 curColumn.fMaxIntSat = MAX_UINT;
710 }
711 else if ( curColumn.typeName ==
712 ColDataTypeStr[CalpontSystemCatalog::BIGINT] )
713 {
714 curColumn.fMinIntSat = MIN_BIGINT;
715 curColumn.fMaxIntSat = MAX_BIGINT;
716 }
717 else if ( curColumn.typeName ==
718 ColDataTypeStr[CalpontSystemCatalog::UBIGINT] )
719 {
720 curColumn.fMinIntSat = MIN_UBIGINT;
721 curColumn.fMaxIntSat = MAX_UBIGINT;
722 }
723 else if ( curColumn.typeName ==
724 ColDataTypeStr[CalpontSystemCatalog::MEDINT] )
725 {
726 curColumn.fMinIntSat = MIN_MEDINT;
727 curColumn.fMaxIntSat = MAX_MEDINT;
728 }
729 else if ( curColumn.typeName ==
730 ColDataTypeStr[CalpontSystemCatalog::UMEDINT] )
731 {
732 curColumn.fMinIntSat = MIN_UMEDINT;
733 curColumn.fMaxIntSat = MAX_UMEDINT;
734 }
735 else if ( curColumn.typeName ==
736 ColDataTypeStr[CalpontSystemCatalog::SMALLINT] )
737 {
738 curColumn.fMinIntSat = MIN_SMALLINT;
739 curColumn.fMaxIntSat = MAX_SMALLINT;
740 }
741 else if ( curColumn.typeName ==
742 ColDataTypeStr[CalpontSystemCatalog::USMALLINT] )
743 {
744 curColumn.fMinIntSat = MIN_USMALLINT;
745 curColumn.fMaxIntSat = MAX_USMALLINT;
746 }
747 else if ( curColumn.typeName ==
748 ColDataTypeStr[CalpontSystemCatalog::TINYINT] )
749 {
750 curColumn.fMinIntSat = MIN_TINYINT;
751 curColumn.fMaxIntSat = MAX_TINYINT;
752 }
753 else if ( curColumn.typeName ==
754 ColDataTypeStr[CalpontSystemCatalog::UTINYINT] )
755 {
756 curColumn.fMinIntSat = MIN_UTINYINT;
757 curColumn.fMaxIntSat = MAX_UTINYINT;
758 }
759 else if ( curColumn.typeName ==
760 ColDataTypeStr[CalpontSystemCatalog::DECIMAL] )
761 {
762 curColumn.fMinIntSat = -columnstore_precision[curColumn.precision];
763 curColumn.fMaxIntSat = columnstore_precision[curColumn.precision];
764 }
765 else if ( curColumn.typeName ==
766 ColDataTypeStr[CalpontSystemCatalog::UDECIMAL] )
767 {
768 curColumn.fMinIntSat = 0;
769 curColumn.fMaxIntSat = columnstore_precision[curColumn.precision];
770 }
771 else if ( curColumn.typeName ==
772 ColDataTypeStr[CalpontSystemCatalog::FLOAT] )
773 {
774 curColumn.fMinDblSat = MIN_FLOAT;
775 curColumn.fMaxDblSat = MAX_FLOAT;
776 }
777 else if ( curColumn.typeName ==
778 ColDataTypeStr[CalpontSystemCatalog::UFLOAT] )
779 {
780 curColumn.fMinDblSat = 0.0;
781 curColumn.fMaxDblSat = MAX_FLOAT;
782 }
783 else if ( curColumn.typeName ==
784 ColDataTypeStr[CalpontSystemCatalog::DOUBLE] )
785 {
786 curColumn.fMinDblSat = MIN_DOUBLE;
787 curColumn.fMaxDblSat = MAX_DOUBLE;
788 }
789 else if ( curColumn.typeName ==
790 ColDataTypeStr[CalpontSystemCatalog::UDOUBLE] )
791 {
792 curColumn.fMinDblSat = 0.0;
793 curColumn.fMaxDblSat = MAX_DOUBLE;
794 }
795 }
796
797 //------------------------------------------------------------------------------
798 // Set Read Buffers attributes
799 // pNode - current node
800 //------------------------------------------------------------------------------
setReadBuffers(xmlNode * pNode)801 void XMLJob::setReadBuffers( xmlNode* pNode )
802 {
803 int intVal = 0;
804
805 if (getNodeAttribute(pNode,
806 xmlTagTable[TAG_NO_OF_READ_BUFFERS],
807 &intVal,
808 TYPE_INT ))
809 fJob.numberOfReadBuffers = intVal;
810
811 if (getNodeAttribute(pNode,
812 xmlTagTable[TAG_READ_BUFFER_SIZE],
813 &intVal,
814 TYPE_INT ))
815 fJob.readBufferSize = intVal;
816 }
817
818 //------------------------------------------------------------------------------
819 // Set Schema attributes
820 // pNode - current node
821 //------------------------------------------------------------------------------
setSchema(xmlNode * pNode)822 void XMLJob::setSchema( xmlNode* pNode )
823 {
824 std::string bufString;
825
826 if ( getNodeAttributeStr( pNode,
827 xmlTagTable[TAG_SCHEMA_NAME],
828 bufString ) )
829 fJob.schema = bufString;
830 }
831
832 //------------------------------------------------------------------------------
833 // Transfer any/all <DefaultColumn> columns from temporary fDefaultColumns, to
834 // the end of the column/field lists.
835 // It is assumed that we are working with the last table in jobTableList.
836 // Then get additional information from system catalog to finish populating
837 // our Job structs with all the table and column attributes we need.
838 //------------------------------------------------------------------------------
postProcessTableNode()839 void XMLJob::postProcessTableNode()
840 {
841 bool bValidateNoDefColWithoutDefValue = false;
842
843 if (fDefaultColumns.size() > 0)
844 {
845 bValidateNoDefColWithoutDefValue = true;
846 int tableNo = fJob.jobTableList.size() - 1;
847
848 for (unsigned k = 0; k < fDefaultColumns.size(); k++)
849 {
850 // Add to list of db columns to be loaded
851 fJob.jobTableList[tableNo].colList.push_back( fDefaultColumns[k] );
852
853 // Add to combined list of columns and ignored fields
854 JobFieldRef fieldRef( BULK_FLDCOL_COLUMN_DEFAULT,
855 fJob.jobTableList[tableNo].colList.size() - 1 );
856 fJob.jobTableList[tableNo].fFldRefs.push_back( fieldRef );
857 }
858
859 fDefaultColumns.clear();
860 }
861
862 // Supplement xml file contents with information from syscat
863 execplan::CalpontSystemCatalog::RIDList colRidList;
864 fillInXMLDataAsLoaded( colRidList );
865
866 // After getting all the system catalog information...
867 // Validate that if there are any <DefaultColumn> tags for a NotNull
868 // column, that the column is defined as NotNull With Default.
869 if (bValidateNoDefColWithoutDefValue)
870 {
871 int tableNo = fJob.jobTableList.size() - 1;
872
873 for (unsigned int iCol = 0;
874 iCol < fJob.jobTableList[tableNo].colList.size(); iCol++)
875 {
876 JobColumn& col = fJob.jobTableList[tableNo].colList[iCol];
877
878 if (col.fFldColRelation == BULK_FLDCOL_COLUMN_DEFAULT)
879 {
880 if ( (col.fNotNull) && (!col.fWithDefault) )
881 {
882 std::ostringstream oss;
883 oss << "Column " << col.colName << " in table " <<
884 fJob.jobTableList[tableNo].tblName << " is NotNull "
885 "w/o default; cannot be used with <DefaultColumn>";
886 throw std::runtime_error( oss.str() );
887 }
888 }
889 }
890 }
891
892 // Make sure all Columns in the DB are counted for with <Column> or
893 // <DefaultColumn> tags (unless validate is disabled)
894 if (fValidateColList)
895 validateAllColumnsHaveTags( colRidList );
896 }
897
898 //------------------------------------------------------------------------------
899 // Use the table and column names from the last <Table> just loaded, to
900 // collect the remaining information from the system catalog, in order to
901 // populate the JobColumn structure.
902 //------------------------------------------------------------------------------
fillInXMLDataAsLoaded(execplan::CalpontSystemCatalog::RIDList & colRidList)903 void XMLJob::fillInXMLDataAsLoaded(
904 execplan::CalpontSystemCatalog::RIDList& colRidList)
905 {
906 boost::shared_ptr<execplan::CalpontSystemCatalog> cat =
907 execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(
908 BULK_SYSCAT_SESSION_ID);
909 cat->identity(execplan::CalpontSystemCatalog::EC);
910
911 // Get the table and column attributes for the last <Table> processed
912 unsigned int iTbl = fJob.jobTableList.size() - 1;
913 JobTable& tbl = fJob.jobTableList[iTbl];
914
915 std::string tblName;
916 string::size_type startName = tbl.tblName.rfind('.');
917
918 if (startName == string::npos)
919 tblName.assign( tbl.tblName );
920 else
921 tblName.assign( tbl.tblName.substr(startName + 1) );
922
923 execplan::CalpontSystemCatalog::TableName table(
924 fJob.schema, tblName );
925
926 if (fJob.jobTableList[iTbl].mapOid == 0)
927 {
928 execplan::CalpontSystemCatalog::OID tblOid =
929 cat->tableRID(table).objnum;
930 tbl.mapOid = tblOid;
931 }
932
933 // This call is made to improve performance.
934 // The call forces all the column information for this table to be
935 // cached at one time, instead of doing it piece-meal through repeated
936 // calls to lookupOID().
937 colRidList = cat->columnRIDs(table, true);
938
939 // Loop through the columns to get the column attributes
940 for (unsigned int iCol = 0;
941 iCol < fJob.jobTableList[iTbl].colList.size(); iCol++)
942 {
943 JobColumn& col = fJob.jobTableList[iTbl].colList[iCol];
944
945 if (col.mapOid == 0)
946 {
947 execplan::CalpontSystemCatalog::TableColName column;
948 column.schema = fJob.schema;
949 column.table = tblName;
950 column.column = col.colName;
951 execplan::CalpontSystemCatalog::OID colOid =
952 cat->lookupOID( column );
953
954 if (colOid < 0)
955 {
956 ostringstream oss;
957 oss << "Column OID lookup failed for: " << column;
958 throw runtime_error( oss.str() );
959 }
960
961 col.mapOid = colOid;
962
963 execplan::CalpontSystemCatalog::ColType colType =
964 cat->colType( col.mapOid );
965
966 col.width = colType.colWidth;
967 col.definedWidth = colType.colWidth;
968
969 if ((colType.scale > 0) ||
970 (colType.colDataType ==
971 execplan::CalpontSystemCatalog::DECIMAL) ||
972 (colType.colDataType ==
973 execplan::CalpontSystemCatalog::UDECIMAL))
974 {
975 col.precision = colType.precision;
976 col.scale = colType.scale;
977 }
978
979 col.typeName = ColDataTypeStr[colType.colDataType];
980 col.compressionType = colType.compressionType;
981 col.dctnry.fCompressionType = colType.compressionType;
982
983 if (colType.autoincrement)
984 col.autoIncFlag = true;
985 else
986 col.autoIncFlag = false;
987
988 // Initialize NotNull and Default Value (based on data type)
989 fillInXMLDataNotNullDefault( tbl.tblName, colType, col );
990
991 if (colType.ddn.dictOID > 0)
992 {
993 col.colType = 'D';
994 col.dctnryWidth = colType.colWidth;
995 col.dctnry.dctnryOid = colType.ddn.dictOID;
996 }
997
998 // @bug3801: For backwards compatability, we treat
999 // integer types with nonzero 0 scale as decimal if scale > 0
1000 if ( ((col.typeName ==
1001 ColDataTypeStr[CalpontSystemCatalog::INT]) ||
1002 (col.typeName ==
1003 ColDataTypeStr[CalpontSystemCatalog::BIGINT]) ||
1004 (col.typeName ==
1005 ColDataTypeStr[CalpontSystemCatalog::SMALLINT]) ||
1006 (col.typeName ==
1007 ColDataTypeStr[CalpontSystemCatalog::TINYINT])) &&
1008 (col.scale > 0) )
1009 {
1010 col.typeName = ColDataTypeStr[CalpontSystemCatalog::DECIMAL];
1011 }
1012
1013 // Initialize the saturation limits for this column
1014 initSatLimits( col );
1015 }
1016 } // end of loop through columns
1017 }
1018
1019 //------------------------------------------------------------------------------
1020 // Using information from the system catalog (in colType), fill in the
1021 // applicable NotNull Default values into the specified JobColumn.
1022 //------------------------------------------------------------------------------
fillInXMLDataNotNullDefault(const std::string & fullTblName,execplan::CalpontSystemCatalog::ColType & colType,JobColumn & col)1023 void XMLJob::fillInXMLDataNotNullDefault(
1024 const std::string& fullTblName,
1025 execplan::CalpontSystemCatalog::ColType& colType,
1026 JobColumn& col )
1027 {
1028 const std::string col_defaultValue(colType.defaultValue);
1029
1030 if (colType.constraintType ==
1031 execplan::CalpontSystemCatalog::NOTNULL_CONSTRAINT)
1032 {
1033 col.fNotNull = true;
1034
1035 if (!col_defaultValue.empty())
1036 col.fWithDefault = true;
1037 }
1038 else if (colType.constraintType ==
1039 execplan::CalpontSystemCatalog::DEFAULT_CONSTRAINT)
1040 {
1041 col.fWithDefault = true;
1042 }
1043
1044 if (col.fWithDefault)
1045 {
1046 bool bDefaultConvertError = false;
1047
1048 // Convert Default Value.
1049 // We go ahead and report basic format conversion error;
1050 // but we don't do complete validation (like checking to see
1051 // if the default is too large for the given integer type),
1052 // because we assume DDL is fully validating the default value.
1053 switch (colType.colDataType)
1054 {
1055 case execplan::CalpontSystemCatalog::BIT:
1056 case execplan::CalpontSystemCatalog::TINYINT:
1057 case execplan::CalpontSystemCatalog::SMALLINT:
1058 case execplan::CalpontSystemCatalog::MEDINT:
1059 case execplan::CalpontSystemCatalog::INT:
1060 case execplan::CalpontSystemCatalog::BIGINT:
1061 {
1062 errno = 0;
1063 col.fDefaultInt = strtoll(col_defaultValue.c_str(), 0, 10);
1064
1065 if (errno == ERANGE)
1066 bDefaultConvertError = true;
1067
1068 break;
1069 }
1070
1071 case execplan::CalpontSystemCatalog::UTINYINT:
1072 case execplan::CalpontSystemCatalog::USMALLINT:
1073 case execplan::CalpontSystemCatalog::UMEDINT:
1074 case execplan::CalpontSystemCatalog::UINT:
1075 case execplan::CalpontSystemCatalog::UBIGINT:
1076 {
1077 errno = 0;
1078 col.fDefaultUInt = strtoull(col_defaultValue.c_str(), 0, 10);
1079
1080 if (errno == ERANGE)
1081 bDefaultConvertError = true;
1082
1083 break;
1084 }
1085
1086 case execplan::CalpontSystemCatalog::DECIMAL:
1087 case execplan::CalpontSystemCatalog::UDECIMAL:
1088 {
1089 col.fDefaultInt = Convertor::convertDecimalString(
1090 col_defaultValue.c_str(),
1091 col_defaultValue.length(),
1092 colType.scale);
1093
1094 if (errno == ERANGE)
1095 bDefaultConvertError = true;
1096
1097 break;
1098 }
1099
1100 case execplan::CalpontSystemCatalog::DATE:
1101 {
1102 int convertStatus;
1103 int32_t dt =
1104 dataconvert::DataConvert::convertColumnDate(
1105 col_defaultValue.c_str(),
1106 dataconvert::CALPONTDATE_ENUM, convertStatus,
1107 col_defaultValue.length() );
1108
1109 if (convertStatus != 0)
1110 bDefaultConvertError = true;
1111
1112 col.fDefaultInt = dt;
1113 break;
1114 }
1115
1116 case execplan::CalpontSystemCatalog::DATETIME:
1117 {
1118 int convertStatus;
1119 int64_t dt =
1120 dataconvert::DataConvert::convertColumnDatetime(
1121 col_defaultValue.c_str(),
1122 dataconvert::CALPONTDATETIME_ENUM, convertStatus,
1123 col_defaultValue.length() );
1124
1125 if (convertStatus != 0)
1126 bDefaultConvertError = true;
1127
1128 col.fDefaultInt = dt;
1129 break;
1130 }
1131
1132 case execplan::CalpontSystemCatalog::TIMESTAMP:
1133 {
1134 int convertStatus;
1135 int64_t dt =
1136 dataconvert::DataConvert::convertColumnTimestamp(
1137 col_defaultValue.c_str(),
1138 dataconvert::CALPONTDATETIME_ENUM, convertStatus,
1139 col_defaultValue.length(), fTimeZone );
1140
1141 if (convertStatus != 0)
1142 bDefaultConvertError = true;
1143
1144 col.fDefaultInt = dt;
1145 break;
1146 }
1147
1148 case execplan::CalpontSystemCatalog::TIME:
1149 {
1150 int convertStatus;
1151 int64_t dt =
1152 dataconvert::DataConvert::convertColumnTime(
1153 col_defaultValue.c_str(),
1154 dataconvert::CALPONTTIME_ENUM, convertStatus,
1155 col_defaultValue.length() );
1156
1157 if (convertStatus != 0)
1158 bDefaultConvertError = true;
1159
1160 col.fDefaultInt = dt;
1161 break;
1162 }
1163
1164 case execplan::CalpontSystemCatalog::FLOAT:
1165 case execplan::CalpontSystemCatalog::DOUBLE:
1166 case execplan::CalpontSystemCatalog::UFLOAT:
1167 case execplan::CalpontSystemCatalog::UDOUBLE:
1168 {
1169 errno = 0;
1170 col.fDefaultDbl = strtod(col_defaultValue.c_str(), 0);
1171
1172 if (errno == ERANGE)
1173 bDefaultConvertError = true;
1174
1175 break;
1176 }
1177
1178 default:
1179 {
1180 col.fDefaultChr = col_defaultValue;
1181 break;
1182 }
1183 }
1184
1185 if (bDefaultConvertError)
1186 {
1187 std::ostringstream oss;
1188 oss << "Column " << col.colName << " in table " << fullTblName <<
1189 " has an invalid default value in system catalog.";
1190 throw std::runtime_error( oss.str() );
1191 }
1192 }
1193 }
1194
1195 //------------------------------------------------------------------------------
1196 // Use the table and column names from the last <Table> just loaded, to
1197 // validate that all the columns have a <Column> or <DefaultColumn> tag
1198 // present in the job XML file.
1199 //------------------------------------------------------------------------------
validateAllColumnsHaveTags(const execplan::CalpontSystemCatalog::RIDList & colRidList) const1200 void XMLJob::validateAllColumnsHaveTags(
1201 const execplan::CalpontSystemCatalog::RIDList& colRidList) const
1202 {
1203 // Validate column list for the last <Table> processed
1204 unsigned int iTbl = fJob.jobTableList.size() - 1;
1205 const JobTable& tbl = fJob.jobTableList[iTbl];
1206
1207 std::string tblName;
1208 string::size_type startName = tbl.tblName.rfind('.');
1209
1210 if (startName == string::npos)
1211 tblName.assign( tbl.tblName );
1212 else
1213 tblName.assign( tbl.tblName.substr(startName + 1) );
1214
1215 try
1216 {
1217 // Loop through column tags, saving col OIDs to a std::set for lookups
1218 std::set<execplan::CalpontSystemCatalog::OID> colOIDList;
1219 typedef std::set<execplan::CalpontSystemCatalog::OID>::iterator SetIter;
1220 std::pair<SetIter, bool> retVal;
1221
1222 for (unsigned int iCol = 0;
1223 iCol < fJob.jobTableList[iTbl].colList.size(); iCol++)
1224 {
1225 const JobColumn& col = fJob.jobTableList[iTbl].colList[iCol];
1226 retVal = colOIDList.insert( col.mapOid );
1227
1228 if (!retVal.second)
1229 {
1230 boost::shared_ptr<execplan::CalpontSystemCatalog> cat =
1231 execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(
1232 BULK_SYSCAT_SESSION_ID);
1233 cat->identity(execplan::CalpontSystemCatalog::EC);
1234
1235 execplan::CalpontSystemCatalog::TableColName dbColName =
1236 cat->colName( col.mapOid );
1237 std::ostringstream oss;
1238 oss << "Column " << dbColName.column << " referenced in Job XML"
1239 " file more than once.";
1240 throw std::runtime_error( oss.str() );
1241 }
1242 }
1243
1244 SetIter pos;
1245
1246 // Loop thru cols in system catalog and verify that each one has a tag
1247 execplan::CalpontSystemCatalog::RIDList::const_iterator rid_iterator =
1248 colRidList.begin();
1249
1250 while (rid_iterator != colRidList.end())
1251 {
1252 pos = colOIDList.find( rid_iterator->objnum );
1253
1254 if (pos != colOIDList.end())
1255 {
1256 colOIDList.erase( pos ); // through with this column, so delete
1257 }
1258 else
1259 {
1260 boost::shared_ptr<execplan::CalpontSystemCatalog> cat =
1261 execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(
1262 BULK_SYSCAT_SESSION_ID);
1263 cat->identity(execplan::CalpontSystemCatalog::EC);
1264
1265 execplan::CalpontSystemCatalog::TableColName dbColName =
1266 cat->colName( rid_iterator->objnum );
1267 std::ostringstream oss;
1268 oss << "No tag present in Job XML file for DB column: " <<
1269 dbColName.column;
1270 throw std::runtime_error( oss.str() );
1271 }
1272
1273 ++rid_iterator;
1274 }
1275 }
1276 catch (std::exception& ex)
1277 {
1278 std::ostringstream oss;
1279 oss << "Error validating column list for table " <<
1280 fJob.schema << '.' << tblName << "; " << ex.what();
1281 throw std::runtime_error( oss.str() );
1282 }
1283 catch (...)
1284 {
1285 std::ostringstream oss;
1286 oss << "Unknown Error validating column list for table " <<
1287 fJob.schema << '.' << tblName;
1288 throw std::runtime_error( oss.str() );
1289 }
1290 }
1291
1292 //------------------------------------------------------------------------------
// Generate a permanent or temporary Job XML file name path.
// sXMLJobDir  Command line override for complete Job directory path
// jobDir      Job subdirectory under default <BulkRoot> path
// jobId       Job ID
// bTempFile   Are we creating a temporary Job XML file
// schemaName  If temp file, this is the schema name to use (currently unused)
// tableName   If temp file, this is the table name to use (currently unused)
// xmlFilePath The complete Job XML file path that is constructed
// errMsg      Relevant error message if return value is not NO_ERROR.
// tableOIDStr If temp file, this string is the prefix of the file name
1302 //------------------------------------------------------------------------------
1303 /* static */
genJobXMLFileName(const string & sXMLJobDir,const string & jobDir,const string & jobId,bool bTempFile,const string & schemaName,const string & tableName,boost::filesystem::path & xmlFilePath,string & errMsg,std::string & tableOIDStr)1304 int XMLJob::genJobXMLFileName(
1305 const string& sXMLJobDir,
1306 const string& jobDir,
1307 const string& jobId,
1308 bool bTempFile,
1309 const string& schemaName,
1310 const string& tableName,
1311 boost::filesystem::path& xmlFilePath,
1312 string& errMsg,
1313 std::string& tableOIDStr )
1314 {
1315 // get full file directory path for XML job description file
1316 if (sXMLJobDir.empty())
1317 {
1318 xmlFilePath = Config::getBulkRoot();
1319 xmlFilePath /= jobDir;
1320 }
1321 else
1322 {
1323 xmlFilePath = sXMLJobDir;
1324
1325 //If filespec doesn't begin with a '/' (i.e. it's not an absolute path),
1326 // attempt to make it absolute so that we can log the full pathname.
1327 if (!xmlFilePath.has_root_path())
1328 {
1329 #ifdef _MSC_VER
1330 // nothing else to do
1331 #else
1332 char cwdPath[4096];
1333 char *err;
1334 err = getcwd(cwdPath, sizeof(cwdPath));
1335 if (err == NULL)
1336 {
1337 errMsg = "Failed to get the current working directory.";
1338 return -1;
1339 }
1340 string trailingPath(xmlFilePath.string());
1341 xmlFilePath = cwdPath;
1342 xmlFilePath /= trailingPath;
1343 #endif
1344 }
1345 }
1346
1347 // Append the file name to the directory path
1348 string jobFileName;
1349
1350 if (bTempFile)
1351 {
1352 // Create tmp directory if does not exist
1353 RETURN_ON_ERROR( createTempJobDir( xmlFilePath.string(), errMsg ) );
1354 jobFileName += tableOIDStr;
1355 //jobFileName += schemaName;
1356 // jobFileName += '_';
1357 // jobFileName += tableName;
1358 jobFileName += "_D";
1359
1360 string now(boost::posix_time::to_iso_string( boost::posix_time::second_clock::local_time()));
1361
1362 // microseconds
1363 struct timeval tp;
1364 gettimeofday(&tp, 0);
1365 ostringstream usec;
1366 usec << setfill('0') << setw(6) << tp.tv_usec;
1367
1368 jobFileName += now.substr(0, 8);
1369 jobFileName += "_T";
1370 jobFileName += now.substr(9, 6);
1371 jobFileName += "_S";
1372 jobFileName += usec.str();
1373 jobFileName += '_';
1374 }
1375
1376 jobFileName += "Job_";
1377 jobFileName += jobId;
1378 jobFileName += ".xml";
1379
1380 xmlFilePath /= jobFileName;
1381
1382 return NO_ERROR;
1383 }
1384
1385 //------------------------------------------------------------------------------
1386 // Create directory for temporary XML job description files.
1387 // OAM restart should delete any/all files in this directory.
1388 //------------------------------------------------------------------------------
1389 /* static */
createTempJobDir(const string & xmlFilePath,string & errMsg)1390 int XMLJob::createTempJobDir( const string& xmlFilePath,
1391 string& errMsg )
1392 {
1393 boost::filesystem::path pathDir(xmlFilePath);
1394
1395 // create temp directory for XML job file if it does not exist
1396 try
1397 {
1398 if ( !boost::filesystem::exists( xmlFilePath ) )
1399 {
1400 string boostErrString;
1401
1402 try
1403 {
1404 boost::filesystem::create_directories(pathDir);
1405 }
1406 catch (exception& ex)
1407 {
1408 // ignore exception for now; we may have just had a
1409 // race condition where 2 jobs were creating dirs.
1410 boostErrString = ex.what();
1411 }
1412
1413 if ( !boost::filesystem::exists( xmlFilePath ) )
1414 {
1415 ostringstream oss;
1416 oss << "Error creating XML temp job file directory(1) " <<
1417 xmlFilePath << "; " << boostErrString;
1418 errMsg = oss.str();
1419
1420 return ERR_DIR_CREATE;
1421 }
1422 }
1423 }
1424 catch (exception& ex)
1425 {
1426 ostringstream oss;
1427 oss << "Error creating XML temp job file directory(2) " <<
1428 xmlFilePath << "; " << ex.what();
1429 errMsg = oss.str();
1430
1431 return ERR_DIR_CREATE;
1432 }
1433
1434 if (!boost::filesystem::is_directory(pathDir) )
1435 {
1436 ostringstream oss;
1437 oss << "Error creating XML temp job file directory " <<
1438 xmlFilePath << "; path already exists as non-directory" << endl;
1439 errMsg = oss.str();
1440
1441 return ERR_DIR_CREATE;
1442 }
1443
1444 return NO_ERROR;
1445 }
1446
1447 } //end of namespace
1448
1449