1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 /*******************************************************************************
19 * $Id: we_xmljob.cpp 4579 2013-03-19 23:16:54Z dhall $
20 *
21 *******************************************************************************/
22 /** @file */
23 
24 #define WRITEENGINEXMLJOB_DLLEXPORT
25 #include "we_xmljob.h"
26 #undef WRITEENGINEXMLJOB_DLLEXPORT
27 
28 #include <limits>
29 #include <sstream>
30 #include <unistd.h>
31 #include <stdexcept>
32 #include <cstdlib>
33 #include <set>
34 #include "we_config.h"
35 #include "we_log.h"
36 #include "we_convertor.h"
37 #include "dataconvert.h"
38 #include <boost/date_time/posix_time/posix_time.hpp>
39 #include <boost/filesystem/path.hpp>
40 #include <boost/filesystem/convenience.hpp>
41 
42 #include <sys/time.h>
43 
44 using namespace std;
45 using namespace execplan;
46 
47 
48 namespace WriteEngine
49 {
50 // Maximum saturation value for DECIMAL types based on precision
const long long columnstore_precision[19] =
{
    0LL,                      // precision  0
    9LL,                      // precision  1
    99LL,                     // precision  2
    999LL,                    // precision  3
    9999LL,                   // precision  4
    99999LL,                  // precision  5
    999999LL,                 // precision  6
    9999999LL,                // precision  7
    99999999LL,               // precision  8
    999999999LL,              // precision  9
    9999999999LL,             // precision 10
    99999999999LL,            // precision 11
    999999999999LL,           // precision 12
    9999999999999LL,          // precision 13
    99999999999999LL,         // precision 14
    999999999999999LL,        // precision 15
    9999999999999999LL,       // precision 16
    99999999999999999LL,      // precision 17
    999999999999999999LL      // precision 18
};
73 
74 //------------------------------------------------------------------------------
75 // Constructor
76 //------------------------------------------------------------------------------
XMLJob()77 XMLJob::XMLJob( ) : fDebugLevel( DEBUG_0 ),
78     fDeleteTempFile(false),
79     fValidateColList(true),
80     fTimeZone("SYSTEM")
81 {
82 }
83 
84 //------------------------------------------------------------------------------
85 // Default Destructor
86 // Delete temporary Job XML file if applicable.
87 //------------------------------------------------------------------------------
~XMLJob()88 XMLJob::~XMLJob()
89 {
90     if ((fDeleteTempFile) && (!fJobFileName.empty()))
91     {
92         unlink( fJobFileName.c_str() );
93     }
94 }
95 
96 //------------------------------------------------------------------------------
97 // Load a job xml file
98 // fileName - name of file to load
99 // bTempFile - are we loading a temporary file (that destructor should delete)
100 // bValidateColumnList - validate that all db columns have an XML tag
101 // returns NO_ERROR if success; other if fail
102 //------------------------------------------------------------------------------
loadJobXmlFile(const string & fileName,bool bTempFile,bool bValidateColumnList,string & errMsg)103 int XMLJob::loadJobXmlFile( const string& fileName,
104                             bool bTempFile,
105                             bool bValidateColumnList,
106                             string& errMsg )
107 {
108     int rc;
109 
110     fDeleteTempFile = bTempFile;
111     fJobFileName    = fileName;
112     fValidateColList = bValidateColumnList;
113 
114     try
115     {
116         rc = parseDoc( fileName.c_str() );
117 
118         if (rc != NO_ERROR)
119             return rc;
120     }
121     catch (exception& ex)
122     {
123         errMsg = ex.what();
124         return ERR_XML_PARSE;
125     }
126 
127     return rc;
128 }
129 
130 //------------------------------------------------------------------------------
131 // Print contents of fJob to the specified logger object.
132 // logger - Log object to use in logging
133 //------------------------------------------------------------------------------
printJobInfo(Log & logger) const134 void XMLJob::printJobInfo( Log& logger ) const
135 {
136     const Job& job = fJob;
137 
138     ostringstream oss1;
139     oss1 << "Job " << job.id << " input\n";
140     oss1 << "===============================================" << endl;
141     oss1 << "Name : " << job.name << endl;
142     oss1 << "Desc : " << job.desc << endl;
143     oss1 << "User : " << job.userName << endl;
144     oss1 << "Delim: " << job.fDelimiter << endl;
145     oss1 << "Enclosed By : ";
146 
147     if (job.fEnclosedByChar)
148         oss1 << job.fEnclosedByChar << endl;
149     else
150         oss1 << "n/a" << endl;
151 
152     oss1 << "Escape Char : ";
153 
154     if (job.fEscapeChar)
155         oss1 << job.fEscapeChar << endl;
156     else
157         oss1 << "n/a" << endl;
158 
159     oss1 << "Read Buffers:     " << job.numberOfReadBuffers << endl;
160     oss1 << "Read Buffer Size: " << job.readBufferSize << endl;
161     oss1 << "setvbuf Size: " << job.writeBufferSize << endl;
162     oss1 << "Create Date : " << job.createDate << endl;
163     oss1 << "Create Time : " << job.createTime << endl;
164     oss1 << "Schema Name : " << job.schema << endl;
165 
166     oss1 << "Num Tables  : " << job.jobTableList.size() << endl;
167     logger.logMsg( oss1.str(), MSGLVL_INFO2 );
168 
169     for ( unsigned int i = 0; i < job.jobTableList.size(); i++ )
170     {
171         const JobTable& jobTable = job.jobTableList[i];
172         ostringstream oss2;
173         oss2 << "\n-------------------------------------------------" << endl;
174         oss2 << "\tTable Name      : " << jobTable.tblName << endl;
175         oss2 << "\tTable OID       : " << jobTable.mapOid << endl;
176         oss2 << "\tTable Load Name : " << jobTable.loadFileName <<
177              endl;
178         oss2 << "\tMax Err Num     : " << jobTable.maxErrNum << endl;
179 
180         const JobColList& colList = jobTable.colList;
181 
182         oss2 << "\tNum of Columns  : " << colList.size() << endl;
183         logger.logMsg( oss2.str(), MSGLVL_INFO2 );
184 
185         // Note that we don't print JobColumn.dataType because it is not carried
186         // in the XML file.  dataType is assigned/used internally by bulkload.
187         for ( unsigned int j = 0; j < jobTable.fFldRefs.size(); j++ )
188         {
189             unsigned idx            = jobTable.fFldRefs[j].fArrayIndex;
190             BulkFldColRel fldColType = jobTable.fFldRefs[j].fFldColType;
191             const JobColumn& jobCol = ((fldColType == BULK_FLDCOL_IGNORE_FIELD) ?
192                                        jobTable.fIgnoredFields[idx] :
193                                        jobTable.colList[idx] );
194             ostringstream oss3;
195             oss3 << "\n\t****************************************" << endl;
196 
197             if (fldColType == BULK_FLDCOL_COLUMN_DEFAULT)
198                 oss3 << "\t\tDefaultColumn Name: " << jobCol.colName << endl;
199             else
200                 oss3 << "\t\tColumn Name       : " << jobCol.colName << endl;
201 
202             oss3 << "\t\tColumn OID        : " << jobCol.mapOid << endl;
203             oss3 << "\t\tColumn type name  : " << jobCol.typeName << endl;
204             oss3 << "\t\tColumn width      : " << jobCol.width << endl;
205             oss3 << "\t\tColumn Not Null   : " << jobCol.fNotNull << endl;
206             oss3 << "\t\tColumn WithDefault: " << jobCol.fWithDefault << endl;
207             oss3 << "\t\tColumn type       : " << jobCol.colType << endl;
208             oss3 << "\t\tColumn comp type  : " << jobCol.compressionType << endl;
209             oss3 << "\t\tColumn autoInc    : " << jobCol.autoIncFlag << endl;
210 
211             if ( jobCol.typeName == ColDataTypeStr[CalpontSystemCatalog::DECIMAL] )
212             {
213                 oss3 << "\t\tColumn Precision  : " << jobCol.precision << endl;
214                 oss3 << "\t\tColumn Scale      : " << jobCol.scale << endl;
215             }
216 
217             if ( jobCol.typeName == ColDataTypeStr[CalpontSystemCatalog::UDECIMAL] )
218             {
219                 oss3 << "\t\tColumn Precision  : " << jobCol.precision << endl;
220                 oss3 << "\t\tColumn Scale      : " << jobCol.scale << endl;
221             }
222 
223             if ( jobCol.colType == 'D' )
224             {
225                 oss3 << "\t\tDictionary Oid    : " <<
226                      jobCol.dctnry.dctnryOid << endl;
227             }
228 
229             logger.logMsg( oss3.str(), MSGLVL_INFO2 );
230         } // end of loop through columns in a table
231     } // end of loop through tables
232 }
233 
234 //------------------------------------------------------------------------------
235 // Print brief contents of specified Job to specified logger object.
236 // logger - Log object to use in logging
237 //------------------------------------------------------------------------------
printJobInfoBrief(Log & logger) const238 void XMLJob::printJobInfoBrief( Log& logger ) const
239 {
240     const Job& job = fJob;
241 
242     ostringstream oss1;
243     oss1 << "XMLJobFile: Delim(" << job.fDelimiter << "); EnclosedBy(";
244 
245     if (job.fEnclosedByChar)
246         oss1 << job.fEnclosedByChar;
247     else
248         oss1 << "n/a";
249 
250     oss1 << "); EscapeChar(";
251 
252     if (job.fEscapeChar)
253         oss1 << job.fEscapeChar;
254     else
255         oss1 << "n/a";
256 
257     oss1 << "); ReadBufs("    << job.numberOfReadBuffers <<
258          "); ReadBufSize(" << job.readBufferSize      <<
259          "); setvbufSize(" << job.writeBufferSize     << ')';
260     logger.logMsg( oss1.str(), MSGLVL_INFO2 );
261 
262     for ( unsigned int i = 0; i < job.jobTableList.size(); i++ )
263     {
264         const JobTable& jobTable = job.jobTableList[i];
265         ostringstream oss2;
266         oss2 << "  Table(" << jobTable.tblName <<
267              "); OID("  << jobTable.mapOid  << ')' <<
268              "; MaxErrNum(" << jobTable.maxErrNum << ')';
269         logger.logMsg( oss2.str(), MSGLVL_INFO2 );
270 
271         for ( unsigned int j = 0; j < jobTable.fFldRefs.size(); j++ )
272         {
273             unsigned idx            = jobTable.fFldRefs[j].fArrayIndex;
274             BulkFldColRel fldColType = jobTable.fFldRefs[j].fFldColType;
275             const JobColumn& jobCol = ((fldColType == BULK_FLDCOL_IGNORE_FIELD) ?
276                                        jobTable.fIgnoredFields[idx] :
277                                        jobTable.colList[idx]);
278             ostringstream oss3;
279 
280             if (fldColType == BULK_FLDCOL_COLUMN_DEFAULT)
281                 oss3 << "    DefaultColumn(" << jobCol.colName;
282             else
283                 oss3 << "    Column("        << jobCol.colName;
284 
285             oss3 << "); OID("     << jobCol.mapOid   <<
286                  "); Type("    << jobCol.typeName <<
287                  "); Width("   << jobCol.width    <<
288                  "); Comp("    << jobCol.compressionType;
289 
290             if ( jobCol.colType == 'D' )
291                 oss3 << "); DctnryOid(" << jobCol.dctnry.dctnryOid;
292 
293             oss3 << ')';
294 
295             if (jobCol.autoIncFlag)
296                 oss3 << "; autoInc";
297 
298             if (jobCol.fNotNull)
299                 oss3 << "; NotNull";
300 
301             if (jobCol.fWithDefault)
302                 oss3 << "; WithDefault";
303 
304             logger.logMsg( oss3.str(), MSGLVL_INFO2 );
305         }
306     } // end of for( int i
307 }
308 
309 //------------------------------------------------------------------------------
310 // Process a node
311 // pNode - current node
312 // returns TRUE if success, FALSE otherwise
313 //------------------------------------------------------------------------------
processNode(xmlNode * pNode)314 bool XMLJob::processNode( xmlNode* pNode )
315 {
316     if ( isTag( pNode, TAG_BULK_JOB ))
317     {
318         // no work for the BulkJob tag
319     }
320     else if ( isTag( pNode, TAG_CREATE_DATE ))
321         setJobData( pNode, TAG_CREATE_DATE, true, TYPE_CHAR );
322     else if ( isTag( pNode, TAG_CREATE_TIME ))
323         setJobData( pNode, TAG_CREATE_TIME, true, TYPE_CHAR );
324     else if ( isTag( pNode, TAG_COLUMN ))
325         setJobData( pNode, TAG_COLUMN, false, TYPE_EMPTY );
326     else if ( isTag( pNode, TAG_DEFAULT_COLUMN ))
327         setJobData( pNode, TAG_DEFAULT_COLUMN, false, TYPE_EMPTY );
328     else if ( isTag( pNode, TAG_DESC ))
329         setJobData( pNode, TAG_DESC, true, TYPE_CHAR );
330     else if ( isTag( pNode, TAG_ID ))
331         setJobData( pNode, TAG_ID, true, TYPE_INT );
332     else if ( isTag( pNode, TAG_IGNORE_FIELD ))
333         setJobData( pNode, TAG_IGNORE_FIELD, false, TYPE_EMPTY );
334     else if ( isTag( pNode, TAG_NAME ))
335         setJobData( pNode, TAG_NAME, true, TYPE_CHAR );
336     else if ( isTag( pNode, TAG_PATH ))
337         setJobData( pNode, TAG_PATH, true, TYPE_CHAR );
338     else if ( isTag( pNode, TAG_TABLE ))
339         setJobData( pNode, TAG_TABLE, false, TYPE_EMPTY );
340     else if ( isTag( pNode, TAG_TYPE ))
341         setJobData( pNode, TAG_TYPE, true, TYPE_CHAR );
342     else if ( isTag( pNode, TAG_USER ))
343         setJobData( pNode, TAG_USER, true, TYPE_CHAR );
344     else if ( isTag( pNode, TAG_SCHEMA))
345         setJobData( pNode, TAG_SCHEMA, false, TYPE_EMPTY );
346     else if ( isTag( pNode, TAG_READ_BUFFERS))
347         setJobData( pNode, TAG_READ_BUFFERS, false, TYPE_EMPTY );
348     else if ( isTag( pNode, TAG_WRITE_BUFFER_SIZE))
349         setJobData( pNode, TAG_WRITE_BUFFER_SIZE, true, TYPE_INT);
350     else if ( isTag( pNode, TAG_DELIMITER))
351         setJobData( pNode, TAG_DELIMITER, true, TYPE_CHAR);
352     else if ( isTag( pNode, TAG_ENCLOSED_BY_CHAR))
353         setJobData( pNode, TAG_ENCLOSED_BY_CHAR, true, TYPE_CHAR);
354     else if ( isTag( pNode, TAG_ESCAPE_CHAR))
355         setJobData( pNode, TAG_ESCAPE_CHAR, true, TYPE_CHAR);
356     else
357     {
358         ostringstream oss;
359         oss << "Unrecognized TAG in Job XML file: <" << pNode->name << ">";
360         throw runtime_error( oss.str() );
361     }
362 
363     if (XMLOp::processNode( pNode ))
364     {
365         if ( isTag( pNode, TAG_TABLE ))
366         {
367             postProcessTableNode();
368         }
369     }
370     else
371     {
372         return false;
373     }
374 
375     return true;
376 }
377 
378 //------------------------------------------------------------------------------
379 // Generic setter
380 // pNode - current node
381 // tag - xml tag
382 // bExpectContent - should node content be present to process
383 // tagType - data type
384 //------------------------------------------------------------------------------
setJobData(xmlNode * pNode,const xmlTag tag,bool bExpectContent,XML_DTYPE tagType)385 void XMLJob::setJobData( xmlNode* pNode,
386                          const xmlTag tag,
387                          bool  bExpectContent,
388                          XML_DTYPE tagType )
389 {
390     int         intVal = 0;
391     long long   llVal = 0;
392     std::string bufString;
393     bool        bSuccess = false;
394 
395     if (bExpectContent)
396     {
397         if ( tagType == TYPE_INT )
398             bSuccess = getNodeContent( pNode, &intVal, TYPE_INT );
399         else // longlong
400             if ( tagType == TYPE_LONGLONG )
401                 bSuccess = getNodeContent( pNode, &llVal, TYPE_LONGLONG );
402             else // char
403                 if ( tagType == TYPE_CHAR )
404                     bSuccess = getNodeContentStr( pNode, bufString );
405 
406         if (!bSuccess)
407             return;
408     }
409 
410     // process tag content and attributes
411     switch ( tag )
412     {
413         case  TAG_READ_BUFFERS:
414             setReadBuffers( pNode );
415             break;
416 
417         case  TAG_COLUMN:
418             setJobDataColumn( pNode, false );
419             break;
420 
421         case  TAG_CREATE_DATE:
422             fJob.createDate = bufString;
423             break;
424 
425         case  TAG_CREATE_TIME:
426             fJob.createTime = bufString;
427             break;
428 
429         case  TAG_DEFAULT_COLUMN:
430             setJobDataColumn( pNode, true );
431             break;
432 
433         case  TAG_DESC:
434             fJob.desc = bufString;
435             break;
436 
437         case  TAG_ID:
438             fJob.id = intVal;
439             break;
440 
441         case  TAG_IGNORE_FIELD:
442             setJobDataIgnoreField( );
443             break;
444 
445         case  TAG_NAME:
446             fJob.name = bufString;
447             break;
448 
449         case  TAG_PATH:
450             // no action necessary, but keep for backwards compatability
451             break;
452 
453         case  TAG_TABLE:
454             setJobDataTable( pNode );
455             break;
456 
457         case  TAG_TYPE:
458             // no action necessary, but keep for backwards compatability
459             break;
460 
461         case  TAG_USER:
462             fJob.userName = bufString;
463             break;
464 
465         case  TAG_SCHEMA:
466             setSchema( pNode );
467             break;
468 
469         case TAG_WRITE_BUFFER_SIZE:
470             fJob.writeBufferSize  = intVal;
471             break;
472 
473         case TAG_DELIMITER:
474         {
475             const char* buf = bufString.c_str();
476 
477             if ((!strcmp(buf, "\\t")) ||
478                     (!strcmp(buf, "'\\t'")))
479             {
480                 fJob.fDelimiter = '\t';
481             }
482             else
483             {
484                 fJob.fDelimiter = bufString[0];
485             }
486 
487             break;
488         }
489 
490         case TAG_ENCLOSED_BY_CHAR:
491         {
492             fJob.fEnclosedByChar = bufString[0];
493             break;
494         }
495 
496         case TAG_ESCAPE_CHAR:
497         {
498             fJob.fEscapeChar = bufString[0];
499             break;
500         }
501 
502         default:
503             break;
504     }
505 }
506 
507 //------------------------------------------------------------------------------
508 // Set table information parms.
509 // pNode - current node
510 //------------------------------------------------------------------------------
setJobDataTable(xmlNode * pNode)511 void XMLJob::setJobDataTable( xmlNode* pNode )
512 {
513     int         intVal;
514     std::string bufString;
515     JobTable    curTable;
516 
517     if ( getNodeAttributeStr( pNode, xmlTagTable[TAG_ORIG_NAME], bufString ) )
518         curTable.tblName = bufString;
519 
520     if ( getNodeAttributeStr( pNode, xmlTagTable[TAG_TBL_NAME], bufString ) )
521         curTable.tblName = bufString;
522 
523     if (curTable.tblName.empty())
524     {
525         throw runtime_error(
526             "Required table name attribute (tblName) missing from Table tag");
527     }
528 
529     if ( getNodeAttribute( pNode, xmlTagTable[TAG_TBL_OID], &intVal, TYPE_INT ) )
530         curTable.mapOid = intVal;
531 
532     if ( getNodeAttributeStr( pNode, xmlTagTable[TAG_LOAD_NAME], bufString ) )
533         curTable.loadFileName = bufString;
534 
535     if ( getNodeAttribute( pNode, xmlTagTable[TAG_MAX_ERR_ROW], &intVal,
536                            TYPE_INT))
537         curTable.maxErrNum = intVal;
538 
539     fJob.jobTableList.push_back( curTable );
540 }
541 
542 //------------------------------------------------------------------------------
543 // Set column information parms.
544 // pNode - current node
545 // bDefaultCol - is this a <DefaultColumn> tag
546 //
547 // Note on Supported Tags: (Bug 2828)
548 // Note that the "notnull" and "defaultValue" attribute tags are not recognized
549 // by this function because by the time we added support for these tags, we had
550 // changed to only store the table and column names in the XML file.  Much of
551 // the functionality in setJobDataColumn() is only present to provide backwards
// compatibility for an old Job XML file that a user might still be using.
553 //
554 // Any other new tags probably don't need adding to setJobDataColumn() either,
555 // for the same reason.
556 //------------------------------------------------------------------------------
setJobDataColumn(xmlNode * pNode,bool bDefaultCol)557 void XMLJob::setJobDataColumn( xmlNode* pNode, bool bDefaultCol )
558 {
559     int         intVal;
560     std::string bufString;
561     JobColumn   curColumn;
562 
563     if ( fJob.jobTableList.size() == 0 )
564         return;
565 
566     int tableNo = fJob.jobTableList.size() - 1;
567 
568     if ( getNodeAttributeStr( pNode, xmlTagTable[TAG_ORIG_NAME], bufString ) )
569         curColumn.colName = bufString;
570 
571     if ( getNodeAttributeStr( pNode, xmlTagTable[TAG_COL_NAME], bufString ) )
572         curColumn.colName = bufString;
573 
574     if (curColumn.colName.empty())
575     {
576         ostringstream oss;
577         oss << "Required column name attribute (colName) missing from "
578             "Column tag for table " <<
579             fJob.jobTableList[tableNo].tblName;
580         throw runtime_error( oss.str() );
581     }
582 
583     if ( getNodeAttribute( pNode, xmlTagTable[TAG_COL_OID], &intVal, TYPE_INT ) )
584         curColumn.mapOid = intVal;
585 
586     if ( getNodeAttribute( pNode, xmlTagTable[TAG_WIDTH], &intVal, TYPE_INT ) )
587     {
588         curColumn.width = intVal;
589         curColumn.definedWidth = intVal; //@Bug 3040
590     }
591 
592     if ( getNodeAttribute( pNode, xmlTagTable[TAG_PRECISION], &intVal, TYPE_INT))
593         curColumn.precision = intVal;
594 
595     if ( getNodeAttribute( pNode, xmlTagTable[TAG_SCALE], &intVal, TYPE_INT ) )
596         curColumn.scale = intVal;
597 
598     if ( getNodeAttributeStr( pNode, xmlTagTable[TAG_DATA_TYPE], bufString ) )
599         curColumn.typeName = bufString;
600 
601     if ( getNodeAttribute( pNode, xmlTagTable[TAG_COMPRESS_TYPE], &intVal,
602                            TYPE_INT))
603     {
604         curColumn.compressionType = intVal;
605         curColumn.dctnry.fCompressionType = intVal;
606     }
607 
608     if ( getNodeAttribute( pNode, xmlTagTable[TAG_AUTOINCREMENT_FLAG],
609                            &intVal, TYPE_INT))
610     {
611         if (intVal)
612             curColumn.autoIncFlag = true;
613         else
614             curColumn.autoIncFlag = false;
615     }
616 
617     if ( getNodeAttributeStr( pNode, xmlTagTable[TAG_COL_TYPE], bufString ) )
618     {
619         const char* buf = bufString.c_str();
620 
621         if ( !strcmp( buf, "D" ) )
622         {
623             curColumn.colType = 'D';
624 
625             // @Bug 2565: Retain dictionary width to use in truncating strings,
626             // since BulkLoad eventually stores column token width in 'width'.
627             curColumn.dctnryWidth = curColumn.width;
628 
629             if ( getNodeAttribute( pNode,
630                                    xmlTagTable[TAG_DVAL_OID],
631                                    &intVal,
632                                    TYPE_INT ) )
633                 curColumn.dctnry.dctnryOid = intVal;
634         }
635     }
636 
637     // This is a workaround that DBBuilder can not pass decimal type to XML file
638     if ( ( curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::INT] ||
639             curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::BIGINT] ||
640             curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::SMALLINT] ||
641             curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::TINYINT]) &&
642             curColumn.scale > 0 )
643         curColumn.typeName = ColDataTypeStr[CalpontSystemCatalog::DECIMAL];
644 
645     // end of workaround
646 
647     // Initialize the saturation limits for this column
648     initSatLimits( curColumn );
649 
650     // Save default columns in separate list, so that we can intentionally
651     // add/keep them at the "end" of colList later, after all other columns.
652     if (bDefaultCol) // temporarily save in separate list
653     {
654         curColumn.fFldColRelation = BULK_FLDCOL_COLUMN_DEFAULT;
655         fDefaultColumns.push_back ( curColumn );
656     }
657     else
658     {
659         // Add to list of db columns to be loaded
660         curColumn.fFldColRelation = BULK_FLDCOL_COLUMN_FIELD;
661         fJob.jobTableList[tableNo].colList.push_back ( curColumn );
662 
663         // Add to combined field list of columns and ignored fields
664         JobFieldRef fieldRef( BULK_FLDCOL_COLUMN_FIELD,
665                               fJob.jobTableList[tableNo].colList.size() - 1 );
666         fJob.jobTableList[tableNo].fFldRefs.push_back( fieldRef  );
667     }
668 }
669 
670 //------------------------------------------------------------------------------
671 // Set column information parms for an input field that is to be ignored
672 //------------------------------------------------------------------------------
setJobDataIgnoreField()673 void XMLJob::setJobDataIgnoreField( )
674 {
675     JobColumn curColumn;
676 
677     int tableNo = fJob.jobTableList.size() - 1;
678     ostringstream oss;
679     oss << "IgnoreField" << fJob.jobTableList[tableNo].fFldRefs.size() + 1;
680     curColumn.colName     = oss.str();
681 
682     // Add to list of ignored fields
683     curColumn.fFldColRelation = BULK_FLDCOL_IGNORE_FIELD;
684     fJob.jobTableList[tableNo].fIgnoredFields.push_back( curColumn );
685 
686     // Add to combined field list of columns and ignored fields
687     JobFieldRef fieldRef( BULK_FLDCOL_IGNORE_FIELD,
688                           fJob.jobTableList[tableNo].fIgnoredFields.size() - 1 );
689     fJob.jobTableList[tableNo].fFldRefs.push_back      ( fieldRef  );
690 }
691 
692 //------------------------------------------------------------------------------
693 // Initialize the saturation limits for the specified column.
694 //------------------------------------------------------------------------------
initSatLimits(JobColumn & curColumn) const695 void XMLJob::initSatLimits( JobColumn& curColumn ) const
696 {
697     // If one of the integer types, we set the min/max saturation value.
698     // For DECIMAL columns this will vary with the precision.
699     if      ( curColumn.typeName ==
700               ColDataTypeStr[CalpontSystemCatalog::INT] )
701     {
702         curColumn.fMinIntSat = MIN_INT;
703         curColumn.fMaxIntSat = MAX_INT;
704     }
705     else if ( curColumn.typeName ==
706               ColDataTypeStr[CalpontSystemCatalog::UINT] )
707     {
708         curColumn.fMinIntSat = MIN_UINT;
709         curColumn.fMaxIntSat = MAX_UINT;
710     }
711     else if ( curColumn.typeName ==
712               ColDataTypeStr[CalpontSystemCatalog::BIGINT] )
713     {
714         curColumn.fMinIntSat = MIN_BIGINT;
715         curColumn.fMaxIntSat = MAX_BIGINT;
716     }
717     else if ( curColumn.typeName ==
718               ColDataTypeStr[CalpontSystemCatalog::UBIGINT] )
719     {
720         curColumn.fMinIntSat = MIN_UBIGINT;
721         curColumn.fMaxIntSat = MAX_UBIGINT;
722     }
723     else if ( curColumn.typeName ==
724               ColDataTypeStr[CalpontSystemCatalog::MEDINT] )
725     {
726         curColumn.fMinIntSat = MIN_MEDINT;
727         curColumn.fMaxIntSat = MAX_MEDINT;
728     }
729     else if ( curColumn.typeName ==
730               ColDataTypeStr[CalpontSystemCatalog::UMEDINT] )
731     {
732         curColumn.fMinIntSat = MIN_UMEDINT;
733         curColumn.fMaxIntSat = MAX_UMEDINT;
734     }
735     else if ( curColumn.typeName ==
736               ColDataTypeStr[CalpontSystemCatalog::SMALLINT] )
737     {
738         curColumn.fMinIntSat = MIN_SMALLINT;
739         curColumn.fMaxIntSat = MAX_SMALLINT;
740     }
741     else if ( curColumn.typeName ==
742               ColDataTypeStr[CalpontSystemCatalog::USMALLINT] )
743     {
744         curColumn.fMinIntSat = MIN_USMALLINT;
745         curColumn.fMaxIntSat = MAX_USMALLINT;
746     }
747     else if ( curColumn.typeName ==
748               ColDataTypeStr[CalpontSystemCatalog::TINYINT] )
749     {
750         curColumn.fMinIntSat = MIN_TINYINT;
751         curColumn.fMaxIntSat = MAX_TINYINT;
752     }
753     else if ( curColumn.typeName ==
754               ColDataTypeStr[CalpontSystemCatalog::UTINYINT] )
755     {
756         curColumn.fMinIntSat = MIN_UTINYINT;
757         curColumn.fMaxIntSat = MAX_UTINYINT;
758     }
759     else if ( curColumn.typeName ==
760               ColDataTypeStr[CalpontSystemCatalog::DECIMAL] )
761     {
762         curColumn.fMinIntSat = -columnstore_precision[curColumn.precision];
763         curColumn.fMaxIntSat = columnstore_precision[curColumn.precision];
764     }
765     else if ( curColumn.typeName ==
766               ColDataTypeStr[CalpontSystemCatalog::UDECIMAL] )
767     {
768         curColumn.fMinIntSat = 0;
769         curColumn.fMaxIntSat = columnstore_precision[curColumn.precision];
770     }
771     else if ( curColumn.typeName ==
772               ColDataTypeStr[CalpontSystemCatalog::FLOAT] )
773     {
774         curColumn.fMinDblSat = MIN_FLOAT;
775         curColumn.fMaxDblSat = MAX_FLOAT;
776     }
777     else if ( curColumn.typeName ==
778               ColDataTypeStr[CalpontSystemCatalog::UFLOAT] )
779     {
780         curColumn.fMinDblSat = 0.0;
781         curColumn.fMaxDblSat = MAX_FLOAT;
782     }
783     else if ( curColumn.typeName ==
784               ColDataTypeStr[CalpontSystemCatalog::DOUBLE] )
785     {
786         curColumn.fMinDblSat = MIN_DOUBLE;
787         curColumn.fMaxDblSat = MAX_DOUBLE;
788     }
789     else if ( curColumn.typeName ==
790               ColDataTypeStr[CalpontSystemCatalog::UDOUBLE] )
791     {
792         curColumn.fMinDblSat = 0.0;
793         curColumn.fMaxDblSat = MAX_DOUBLE;
794     }
795 }
796 
797 //------------------------------------------------------------------------------
798 // Set Read Buffers attributes
799 // pNode - current node
800 //------------------------------------------------------------------------------
setReadBuffers(xmlNode * pNode)801 void XMLJob::setReadBuffers( xmlNode* pNode )
802 {
803     int intVal = 0;
804 
805     if (getNodeAttribute(pNode,
806                          xmlTagTable[TAG_NO_OF_READ_BUFFERS],
807                          &intVal,
808                          TYPE_INT ))
809         fJob.numberOfReadBuffers = intVal;
810 
811     if (getNodeAttribute(pNode,
812                          xmlTagTable[TAG_READ_BUFFER_SIZE],
813                          &intVal,
814                          TYPE_INT ))
815         fJob.readBufferSize = intVal;
816 }
817 
818 //------------------------------------------------------------------------------
819 // Set Schema attributes
820 // pNode - current node
821 //------------------------------------------------------------------------------
setSchema(xmlNode * pNode)822 void XMLJob::setSchema( xmlNode* pNode )
823 {
824     std::string bufString;
825 
826     if ( getNodeAttributeStr( pNode,
827                               xmlTagTable[TAG_SCHEMA_NAME],
828                               bufString ) )
829         fJob.schema = bufString;
830 }
831 
832 //------------------------------------------------------------------------------
833 // Transfer any/all <DefaultColumn> columns from temporary fDefaultColumns, to
834 // the end of the column/field lists.
835 // It is assumed that we are working with the last table in jobTableList.
836 // Then get additional information from system catalog to finish populating
837 // our Job structs with all the table and column attributes we need.
838 //------------------------------------------------------------------------------
postProcessTableNode()839 void XMLJob::postProcessTableNode()
840 {
841     bool bValidateNoDefColWithoutDefValue = false;
842 
843     if (fDefaultColumns.size() > 0)
844     {
845         bValidateNoDefColWithoutDefValue = true;
846         int tableNo = fJob.jobTableList.size() - 1;
847 
848         for (unsigned k = 0; k < fDefaultColumns.size(); k++)
849         {
850             // Add to list of db columns to be loaded
851             fJob.jobTableList[tableNo].colList.push_back( fDefaultColumns[k] );
852 
853             // Add to combined list of columns and ignored fields
854             JobFieldRef fieldRef( BULK_FLDCOL_COLUMN_DEFAULT,
855                                   fJob.jobTableList[tableNo].colList.size() - 1 );
856             fJob.jobTableList[tableNo].fFldRefs.push_back( fieldRef );
857         }
858 
859         fDefaultColumns.clear();
860     }
861 
862     // Supplement xml file contents with information from syscat
863     execplan::CalpontSystemCatalog::RIDList colRidList;
864     fillInXMLDataAsLoaded( colRidList );
865 
866     // After getting all the system catalog information...
867     // Validate that if there are any <DefaultColumn> tags for a NotNull
868     // column, that the column is defined as NotNull With Default.
869     if (bValidateNoDefColWithoutDefValue)
870     {
871         int tableNo = fJob.jobTableList.size() - 1;
872 
873         for (unsigned int iCol = 0;
874                 iCol < fJob.jobTableList[tableNo].colList.size(); iCol++)
875         {
876             JobColumn& col = fJob.jobTableList[tableNo].colList[iCol];
877 
878             if (col.fFldColRelation == BULK_FLDCOL_COLUMN_DEFAULT)
879             {
880                 if ( (col.fNotNull) && (!col.fWithDefault) )
881                 {
882                     std::ostringstream oss;
883                     oss << "Column " << col.colName << " in table " <<
884                         fJob.jobTableList[tableNo].tblName << " is NotNull "
885                         "w/o default; cannot be used with <DefaultColumn>";
886                     throw std::runtime_error( oss.str() );
887                 }
888             }
889         }
890     }
891 
892     // Make sure all Columns in the DB are counted for with <Column> or
893     // <DefaultColumn> tags (unless validate is disabled)
894     if (fValidateColList)
895         validateAllColumnsHaveTags( colRidList );
896 }
897 
898 //------------------------------------------------------------------------------
899 // Use the table and column names from the last <Table> just loaded, to
900 // collect the remaining information from the system catalog, in order to
901 // populate the JobColumn structure.
902 //------------------------------------------------------------------------------
fillInXMLDataAsLoaded(execplan::CalpontSystemCatalog::RIDList & colRidList)903 void XMLJob::fillInXMLDataAsLoaded(
904     execplan::CalpontSystemCatalog::RIDList& colRidList)
905 {
906     boost::shared_ptr<execplan::CalpontSystemCatalog> cat =
907         execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(
908             BULK_SYSCAT_SESSION_ID);
909     cat->identity(execplan::CalpontSystemCatalog::EC);
910 
911     // Get the table and column attributes for the last <Table> processed
912     unsigned int iTbl = fJob.jobTableList.size() - 1;
913     JobTable& tbl = fJob.jobTableList[iTbl];
914 
915     std::string tblName;
916     string::size_type startName = tbl.tblName.rfind('.');
917 
918     if (startName == string::npos)
919         tblName.assign( tbl.tblName );
920     else
921         tblName.assign( tbl.tblName.substr(startName + 1) );
922 
923     execplan::CalpontSystemCatalog::TableName table(
924         fJob.schema, tblName );
925 
926     if (fJob.jobTableList[iTbl].mapOid == 0)
927     {
928         execplan::CalpontSystemCatalog::OID tblOid =
929             cat->tableRID(table).objnum;
930         tbl.mapOid = tblOid;
931     }
932 
933     // This call is made to improve performance.
934     // The call forces all the column information for this table to be
935     // cached at one time, instead of doing it piece-meal through repeated
936     // calls to lookupOID().
937     colRidList = cat->columnRIDs(table, true);
938 
939     // Loop through the columns to get the column attributes
940     for (unsigned int iCol = 0;
941             iCol < fJob.jobTableList[iTbl].colList.size(); iCol++)
942     {
943         JobColumn& col = fJob.jobTableList[iTbl].colList[iCol];
944 
945         if (col.mapOid == 0)
946         {
947             execplan::CalpontSystemCatalog::TableColName column;
948             column.schema = fJob.schema;
949             column.table  = tblName;
950             column.column = col.colName;
951             execplan::CalpontSystemCatalog::OID colOid =
952                 cat->lookupOID( column );
953 
954             if (colOid < 0)
955             {
956                 ostringstream oss;
957                 oss << "Column OID lookup failed for: " << column;
958                 throw runtime_error( oss.str() );
959             }
960 
961             col.mapOid    = colOid;
962 
963             execplan::CalpontSystemCatalog::ColType colType =
964                 cat->colType( col.mapOid );
965 
966             col.width                   = colType.colWidth;
967             col.definedWidth            = colType.colWidth;
968 
969             if ((colType.scale > 0) ||
970                     (colType.colDataType ==
971                      execplan::CalpontSystemCatalog::DECIMAL) ||
972                     (colType.colDataType ==
973                      execplan::CalpontSystemCatalog::UDECIMAL))
974             {
975                 col.precision           = colType.precision;
976                 col.scale               = colType.scale;
977             }
978 
979             col.typeName                = ColDataTypeStr[colType.colDataType];
980             col.compressionType         = colType.compressionType;
981             col.dctnry.fCompressionType = colType.compressionType;
982 
983             if (colType.autoincrement)
984                 col.autoIncFlag         = true;
985             else
986                 col.autoIncFlag         = false;
987 
988             // Initialize NotNull and Default Value (based on data type)
989             fillInXMLDataNotNullDefault( tbl.tblName, colType, col );
990 
991             if (colType.ddn.dictOID > 0)
992             {
993                 col.colType             = 'D';
994                 col.dctnryWidth         = colType.colWidth;
995                 col.dctnry.dctnryOid    = colType.ddn.dictOID;
996             }
997 
998             // @bug3801: For backwards compatability, we treat
999             // integer types with nonzero 0 scale as decimal if scale > 0
1000             if ( ((col.typeName ==
1001                     ColDataTypeStr[CalpontSystemCatalog::INT])      ||
1002                     (col.typeName ==
1003                      ColDataTypeStr[CalpontSystemCatalog::BIGINT])   ||
1004                     (col.typeName ==
1005                      ColDataTypeStr[CalpontSystemCatalog::SMALLINT]) ||
1006                     (col.typeName ==
1007                      ColDataTypeStr[CalpontSystemCatalog::TINYINT])) &&
1008                     (col.scale > 0) )
1009             {
1010                 col.typeName = ColDataTypeStr[CalpontSystemCatalog::DECIMAL];
1011             }
1012 
1013             // Initialize the saturation limits for this column
1014             initSatLimits( col );
1015         }
1016     } // end of loop through columns
1017 }
1018 
1019 //------------------------------------------------------------------------------
1020 // Using information from the system catalog (in colType), fill in the
1021 // applicable NotNull Default values into the specified JobColumn.
1022 //------------------------------------------------------------------------------
fillInXMLDataNotNullDefault(const std::string & fullTblName,execplan::CalpontSystemCatalog::ColType & colType,JobColumn & col)1023 void XMLJob::fillInXMLDataNotNullDefault(
1024     const std::string& fullTblName,
1025     execplan::CalpontSystemCatalog::ColType& colType,
1026     JobColumn& col )
1027 {
1028     const std::string col_defaultValue(colType.defaultValue);
1029 
1030     if (colType.constraintType ==
1031             execplan::CalpontSystemCatalog::NOTNULL_CONSTRAINT)
1032     {
1033         col.fNotNull            = true;
1034 
1035         if (!col_defaultValue.empty())
1036             col.fWithDefault    = true;
1037     }
1038     else if (colType.constraintType ==
1039              execplan::CalpontSystemCatalog::DEFAULT_CONSTRAINT)
1040     {
1041         col.fWithDefault        = true;
1042     }
1043 
1044     if (col.fWithDefault)
1045     {
1046         bool bDefaultConvertError = false;
1047 
1048         // Convert Default Value.
1049         // We go ahead and report basic format conversion error;
1050         // but we don't do complete validation (like checking to see
1051         // if the default is too large for the given integer type),
1052         // because we assume DDL is fully validating the default value.
1053         switch (colType.colDataType)
1054         {
1055             case execplan::CalpontSystemCatalog::BIT:
1056             case execplan::CalpontSystemCatalog::TINYINT:
1057             case execplan::CalpontSystemCatalog::SMALLINT:
1058             case execplan::CalpontSystemCatalog::MEDINT:
1059             case execplan::CalpontSystemCatalog::INT:
1060             case execplan::CalpontSystemCatalog::BIGINT:
1061             {
1062                 errno = 0;
1063                 col.fDefaultInt = strtoll(col_defaultValue.c_str(), 0, 10);
1064 
1065                 if (errno == ERANGE)
1066                     bDefaultConvertError = true;
1067 
1068                 break;
1069             }
1070 
1071             case execplan::CalpontSystemCatalog::UTINYINT:
1072             case execplan::CalpontSystemCatalog::USMALLINT:
1073             case execplan::CalpontSystemCatalog::UMEDINT:
1074             case execplan::CalpontSystemCatalog::UINT:
1075             case execplan::CalpontSystemCatalog::UBIGINT:
1076             {
1077                 errno = 0;
1078                 col.fDefaultUInt = strtoull(col_defaultValue.c_str(), 0, 10);
1079 
1080                 if (errno == ERANGE)
1081                     bDefaultConvertError = true;
1082 
1083                 break;
1084             }
1085 
1086             case execplan::CalpontSystemCatalog::DECIMAL:
1087             case execplan::CalpontSystemCatalog::UDECIMAL:
1088             {
1089                 col.fDefaultInt = Convertor::convertDecimalString(
1090                                       col_defaultValue.c_str(),
1091                                       col_defaultValue.length(),
1092                                       colType.scale);
1093 
1094                 if (errno == ERANGE)
1095                     bDefaultConvertError = true;
1096 
1097                 break;
1098             }
1099 
1100             case execplan::CalpontSystemCatalog::DATE:
1101             {
1102                 int convertStatus;
1103                 int32_t dt =
1104                     dataconvert::DataConvert::convertColumnDate(
1105                         col_defaultValue.c_str(),
1106                         dataconvert::CALPONTDATE_ENUM, convertStatus,
1107                         col_defaultValue.length() );
1108 
1109                 if (convertStatus != 0)
1110                     bDefaultConvertError = true;
1111 
1112                 col.fDefaultInt = dt;
1113                 break;
1114             }
1115 
1116             case execplan::CalpontSystemCatalog::DATETIME:
1117             {
1118                 int convertStatus;
1119                 int64_t dt =
1120                     dataconvert::DataConvert::convertColumnDatetime(
1121                         col_defaultValue.c_str(),
1122                         dataconvert::CALPONTDATETIME_ENUM, convertStatus,
1123                         col_defaultValue.length() );
1124 
1125                 if (convertStatus != 0)
1126                     bDefaultConvertError = true;
1127 
1128                 col.fDefaultInt = dt;
1129                 break;
1130             }
1131 
1132             case execplan::CalpontSystemCatalog::TIMESTAMP:
1133             {
1134                 int convertStatus;
1135                 int64_t dt =
1136                     dataconvert::DataConvert::convertColumnTimestamp(
1137                         col_defaultValue.c_str(),
1138                         dataconvert::CALPONTDATETIME_ENUM, convertStatus,
1139                         col_defaultValue.length(), fTimeZone );
1140 
1141                 if (convertStatus != 0)
1142                     bDefaultConvertError = true;
1143 
1144                 col.fDefaultInt = dt;
1145                 break;
1146             }
1147 
1148             case execplan::CalpontSystemCatalog::TIME:
1149             {
1150                 int convertStatus;
1151                 int64_t dt =
1152                     dataconvert::DataConvert::convertColumnTime(
1153                         col_defaultValue.c_str(),
1154                         dataconvert::CALPONTTIME_ENUM, convertStatus,
1155                         col_defaultValue.length() );
1156 
1157                 if (convertStatus != 0)
1158                     bDefaultConvertError = true;
1159 
1160                 col.fDefaultInt = dt;
1161                 break;
1162             }
1163 
1164             case execplan::CalpontSystemCatalog::FLOAT:
1165             case execplan::CalpontSystemCatalog::DOUBLE:
1166             case execplan::CalpontSystemCatalog::UFLOAT:
1167             case execplan::CalpontSystemCatalog::UDOUBLE:
1168             {
1169                 errno = 0;
1170                 col.fDefaultDbl = strtod(col_defaultValue.c_str(), 0);
1171 
1172                 if (errno == ERANGE)
1173                     bDefaultConvertError = true;
1174 
1175                 break;
1176             }
1177 
1178             default:
1179             {
1180                 col.fDefaultChr = col_defaultValue;
1181                 break;
1182             }
1183         }
1184 
1185         if (bDefaultConvertError)
1186         {
1187             std::ostringstream oss;
1188             oss << "Column " << col.colName << " in table " << fullTblName <<
1189                 " has an invalid default value in system catalog.";
1190             throw std::runtime_error( oss.str() );
1191         }
1192     }
1193 }
1194 
1195 //------------------------------------------------------------------------------
1196 // Use the table and column names from the last <Table> just loaded, to
1197 // validate that all the columns have a <Column> or <DefaultColumn> tag
1198 // present in the job XML file.
1199 //------------------------------------------------------------------------------
validateAllColumnsHaveTags(const execplan::CalpontSystemCatalog::RIDList & colRidList) const1200 void XMLJob::validateAllColumnsHaveTags(
1201     const execplan::CalpontSystemCatalog::RIDList& colRidList) const
1202 {
1203     // Validate column list for the last <Table> processed
1204     unsigned int iTbl = fJob.jobTableList.size() - 1;
1205     const JobTable& tbl = fJob.jobTableList[iTbl];
1206 
1207     std::string tblName;
1208     string::size_type startName = tbl.tblName.rfind('.');
1209 
1210     if (startName == string::npos)
1211         tblName.assign( tbl.tblName );
1212     else
1213         tblName.assign( tbl.tblName.substr(startName + 1) );
1214 
1215     try
1216     {
1217         // Loop through column tags, saving col OIDs to a std::set for lookups
1218         std::set<execplan::CalpontSystemCatalog::OID> colOIDList;
1219         typedef std::set<execplan::CalpontSystemCatalog::OID>::iterator SetIter;
1220         std::pair<SetIter, bool> retVal;
1221 
1222         for (unsigned int iCol = 0;
1223                 iCol < fJob.jobTableList[iTbl].colList.size(); iCol++)
1224         {
1225             const JobColumn& col = fJob.jobTableList[iTbl].colList[iCol];
1226             retVal = colOIDList.insert( col.mapOid );
1227 
1228             if (!retVal.second)
1229             {
1230                 boost::shared_ptr<execplan::CalpontSystemCatalog> cat =
1231                     execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(
1232                         BULK_SYSCAT_SESSION_ID);
1233                 cat->identity(execplan::CalpontSystemCatalog::EC);
1234 
1235                 execplan::CalpontSystemCatalog::TableColName dbColName =
1236                     cat->colName( col.mapOid );
1237                 std::ostringstream oss;
1238                 oss << "Column " << dbColName.column << " referenced in Job XML"
1239                     " file more than once.";
1240                 throw std::runtime_error( oss.str() );
1241             }
1242         }
1243 
1244         SetIter pos;
1245 
1246         // Loop thru cols in system catalog and verify that each one has a tag
1247         execplan::CalpontSystemCatalog::RIDList::const_iterator rid_iterator =
1248             colRidList.begin();
1249 
1250         while (rid_iterator != colRidList.end())
1251         {
1252             pos = colOIDList.find( rid_iterator->objnum );
1253 
1254             if (pos != colOIDList.end())
1255             {
1256                 colOIDList.erase( pos ); // through with this column, so delete
1257             }
1258             else
1259             {
1260                 boost::shared_ptr<execplan::CalpontSystemCatalog> cat =
1261                     execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(
1262                         BULK_SYSCAT_SESSION_ID);
1263                 cat->identity(execplan::CalpontSystemCatalog::EC);
1264 
1265                 execplan::CalpontSystemCatalog::TableColName dbColName =
1266                     cat->colName( rid_iterator->objnum );
1267                 std::ostringstream oss;
1268                 oss << "No tag present in Job XML file for DB column: " <<
1269                     dbColName.column;
1270                 throw std::runtime_error( oss.str() );
1271             }
1272 
1273             ++rid_iterator;
1274         }
1275     }
1276     catch (std::exception& ex)
1277     {
1278         std::ostringstream oss;
1279         oss << "Error validating column list for table " <<
1280             fJob.schema << '.' << tblName << "; " << ex.what();
1281         throw std::runtime_error( oss.str() );
1282     }
1283     catch (...)
1284     {
1285         std::ostringstream oss;
1286         oss << "Unknown Error validating column list for table " <<
1287             fJob.schema << '.' << tblName;
1288         throw std::runtime_error( oss.str() );
1289     }
1290 }
1291 
1292 //------------------------------------------------------------------------------
1293 // Generate a permanent or temporary Job XML file name path.
1294 // sXMLJobDir Command line override for complete Job directory path
1295 // jobDIr     Job subdirectory under default <BulkRoot> path
1296 // jobId      Job ID
1297 // bTempFile  Are we creating a temporary Job Xml File
1298 // schmaName  If temp file, this is schema name to use
1299 // tableName  If temp file, this is the table name to use
1300 // xmlDirPath The complete Job XML file path that is constructed
1301 // errMsg     Relevant error message if return value is not NO_ERROR.
1302 //------------------------------------------------------------------------------
1303 /* static */
genJobXMLFileName(const string & sXMLJobDir,const string & jobDir,const string & jobId,bool bTempFile,const string & schemaName,const string & tableName,boost::filesystem::path & xmlFilePath,string & errMsg,std::string & tableOIDStr)1304 int XMLJob::genJobXMLFileName(
1305     const string& sXMLJobDir,
1306     const string& jobDir,
1307     const string& jobId,
1308     bool          bTempFile,
1309     const string& schemaName,
1310     const string& tableName,
1311     boost::filesystem::path& xmlFilePath,
1312     string& errMsg,
1313     std::string&	   tableOIDStr )
1314 {
1315     // get full file directory path for XML job description file
1316     if (sXMLJobDir.empty())
1317     {
1318         xmlFilePath  = Config::getBulkRoot();
1319         xmlFilePath /= jobDir;
1320     }
1321     else
1322     {
1323         xmlFilePath = sXMLJobDir;
1324 
1325         //If filespec doesn't begin with a '/' (i.e. it's not an absolute path),
1326         // attempt to make it absolute so that we can log the full pathname.
1327         if (!xmlFilePath.has_root_path())
1328         {
1329 #ifdef _MSC_VER
1330             // nothing else to do
1331 #else
1332             char cwdPath[4096];
1333             char *err;
1334             err = getcwd(cwdPath, sizeof(cwdPath));
1335             if (err == NULL)
1336             {
1337                 errMsg = "Failed to get the current working directory.";
1338                 return -1;
1339             }
1340             string trailingPath(xmlFilePath.string());
1341             xmlFilePath  = cwdPath;
1342             xmlFilePath /= trailingPath;
1343 #endif
1344         }
1345     }
1346 
1347     // Append the file name to the directory path
1348     string jobFileName;
1349 
1350     if (bTempFile)
1351     {
1352         // Create tmp directory if does not exist
1353         RETURN_ON_ERROR( createTempJobDir( xmlFilePath.string(), errMsg ) );
1354         jobFileName += tableOIDStr;
1355         //jobFileName += schemaName;
1356         // jobFileName += '_';
1357         // jobFileName += tableName;
1358         jobFileName += "_D";
1359 
1360         string now(boost::posix_time::to_iso_string( boost::posix_time::second_clock::local_time()));
1361 
1362         // microseconds
1363         struct timeval tp;
1364         gettimeofday(&tp, 0);
1365         ostringstream usec;
1366         usec << setfill('0') << setw(6) << tp.tv_usec;
1367 
1368         jobFileName += now.substr(0, 8);
1369         jobFileName += "_T";
1370         jobFileName += now.substr(9, 6);
1371         jobFileName += "_S";
1372         jobFileName += usec.str();
1373         jobFileName += '_';
1374     }
1375 
1376     jobFileName += "Job_";
1377     jobFileName += jobId;
1378     jobFileName += ".xml";
1379 
1380     xmlFilePath /= jobFileName;
1381 
1382     return NO_ERROR;
1383 }
1384 
1385 //------------------------------------------------------------------------------
1386 // Create directory for temporary XML job description files.
1387 // OAM restart should delete any/all files in this directory.
1388 //------------------------------------------------------------------------------
1389 /* static */
createTempJobDir(const string & xmlFilePath,string & errMsg)1390 int XMLJob::createTempJobDir( const string& xmlFilePath,
1391                               string& errMsg )
1392 {
1393     boost::filesystem::path pathDir(xmlFilePath);
1394 
1395     // create temp directory for XML job file if it does not exist
1396     try
1397     {
1398         if ( !boost::filesystem::exists( xmlFilePath ) )
1399         {
1400             string boostErrString;
1401 
1402             try
1403             {
1404                 boost::filesystem::create_directories(pathDir);
1405             }
1406             catch (exception& ex)
1407             {
1408                 // ignore exception for now; we may have just had a
1409                 // race condition where 2 jobs were creating dirs.
1410                 boostErrString = ex.what();
1411             }
1412 
1413             if ( !boost::filesystem::exists( xmlFilePath ) )
1414             {
1415                 ostringstream oss;
1416                 oss << "Error creating XML temp job file directory(1) " <<
1417                     xmlFilePath << "; " << boostErrString;
1418                 errMsg = oss.str();
1419 
1420                 return ERR_DIR_CREATE;
1421             }
1422         }
1423     }
1424     catch (exception& ex)
1425     {
1426         ostringstream oss;
1427         oss << "Error creating XML temp job file directory(2) " <<
1428             xmlFilePath << "; " << ex.what();
1429         errMsg = oss.str();
1430 
1431         return ERR_DIR_CREATE;
1432     }
1433 
1434     if (!boost::filesystem::is_directory(pathDir) )
1435     {
1436         ostringstream oss;
1437         oss << "Error creating XML temp job file directory " <<
1438             xmlFilePath << "; path already exists as non-directory" << endl;
1439         errMsg = oss.str();
1440 
1441         return ERR_DIR_CREATE;
1442     }
1443 
1444     return NO_ERROR;
1445 }
1446 
1447 } //end of namespace
1448 
1449