1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 /***********************************************************************
19  *   $Id: ddlindexpopulator.cpp 9210 2013-01-21 14:10:42Z rdempsey $
20  *
21  *
22  ***********************************************************************/
23 #include <sys/types.h>
24 #include <sys/ipc.h>
25 
26 #include "ddlindexpopulator.h"
27 
28 #include "messagelog.h"
29 #include "dataconvert.h"
30 #include "joblist.h"
31 #include "calpontselectexecutionplan.h"
32 #include "distributedenginecomm.h"
33 #include "simplecolumn.h"
34 #include "resourcemanager.h"
35 #include "columnresult.h"
36 
37 #include <boost/any.hpp>
38 using namespace boost;
39 #include <boost/algorithm/string/case_conv.hpp>
40 using namespace boost::algorithm;
41 
42 using namespace WriteEngine;
43 using namespace logging;
44 using namespace resultset;
45 using namespace joblist;
46 
47 using namespace std;
48 using namespace execplan;
49 using namespace ddlpackage;
50 using namespace messageqcpp;
51 
52 namespace ddlpackageprocessor
53 {
54 
populateIndex(DDLPackageProcessor::DDLResult & result)55 bool DDLIndexPopulator::populateIndex(DDLPackageProcessor::DDLResult& result)
56 {
57     if (makeIndexStructs() )
58         insertIndex();
59 
60     result = fResult;
61     return NO_ERROR != fResult.result;
62 }
63 
64 
makeIndexStructs()65 bool DDLIndexPopulator::makeIndexStructs( )
66 {
67     CalpontSelectExecutionPlan csep;
68     makeCsep(csep);
69     ResourceManager* rm;
70 
71     if (! fEC)
72     {
73         fEC = DistributedEngineComm::instance(rm);
74         fEC->Open();
75     }
76 
77     SJLP jbl = joblist::JobListFactory::makeJobList(&csep, rm);
78 
79     boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog( fSessionID );
80     csc->identity(CalpontSystemCatalog::EC);
81 
82     jbl->putEngineComm(fEC);
83     /*
84     ResultManager * result = jbl->GetResultManager();
85     result->setRunning(1);
86     jbl->Execute();	*/
87     jbl->doQuery();
88 
89     CalpontSystemCatalog::TableName tableName;
90     tableName.schema = fTable.fSchema;
91     tableName.table = fTable.fName;
92 
93     CalpontSystemCatalog::OID tableOid = (csc->tableRID ( tableName )).objnum;
94     CalpontSystemCatalog::NJLSysDataList sysDataList;
95 
96     for (;;)
97     {
98         TableBand band;
99         band = jbl->projectTable(tableOid);
100 
101         if (band.getRowCount() == 0)
102         {
103             // No more bands, table is done
104             break;
105         }
106 
107         band.convertToSysDataList(sysDataList, csc);
108         break;
109     }
110 
111     //size_t cnt = fColNames.size();
112     size_t i = 0;
113     vector<ColumnResult*>::const_iterator it;
114     vector<int>::const_iterator oid_iter;
115 
116     for (it = sysDataList.begin(); it != sysDataList.end(); it++)
117     {
118         if (isUnique())
119             fUniqueColResultList.push_back(*it);
120 
121         for ( oid_iter = fOidList.begin(); oid_iter != fOidList.end(); oid_iter++ )
122         {
123             if ( (*it)->ColumnOID() == *oid_iter )
124             {
125                 CalpontSystemCatalog::ColType coltype = makeIdxStruct(*it, fColNames.size(), csc);
126                 addColumnData(*it, coltype, i);
127             }
128         }
129 
130         i++;
131     }
132 
133     return (fIdxValueList.size() && NO_ERROR == fResult.result );
134 
135 }
136 
137 
138 
makeCsep(CalpontSelectExecutionPlan & csep)139 void DDLIndexPopulator::makeCsep(CalpontSelectExecutionPlan&  csep)
140 {
141 
142     csep.sessionID(fSessionID);
143 
144     csep.txnID(fTxnID);
145     csep.verID(fSessionManager->verID());
146 
147     CalpontSelectExecutionPlan::ReturnedColumnList colList;
148     CalpontSelectExecutionPlan::ColumnMap colMap;
149     CalpontSystemCatalog::TableColName tableColName;
150     CalpontSystemCatalog::OID oid;
151     tableColName.schema = fTable.fSchema;
152     tableColName.table = fTable.fName;
153     boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog( fSessionID );
154     string tableName(fTable.fSchema + "." + fTable.fName + ".");
155 
156     ColumnNameList::const_iterator cend = fColNames.end();
157 
158     for (ColumnNameList::const_iterator cname = fColNames.begin(); cname != cend; ++cname)
159     {
160         string fullColName(tableName + *cname);
161         SRCP srcp(new SimpleColumn (fullColName, fSessionID));
162         colList.push_back(srcp);
163         tableColName.column = *cname;
164         oid = csc->lookupOID( tableColName );
165         fOidList.push_back( oid );
166         colMap.insert(CalpontSelectExecutionPlan::ColumnMap::value_type(fullColName, srcp));
167     }
168 
169     csep.columnMap (colMap);
170     csep.returnedCols (colList);
171 }
172 
173 
makeIdxStruct(const ColumnResult * cr,size_t cols,boost::shared_ptr<CalpontSystemCatalog> csc)174 CalpontSystemCatalog::ColType DDLIndexPopulator::makeIdxStruct(const ColumnResult* cr, size_t cols, boost::shared_ptr<CalpontSystemCatalog> csc )
175 {
176     IdxStruct idx;
177     idx.treeOid = fIdxOID.treeOID;
178     idx.listOid = fIdxOID.listOID;
179     idx.multiColFlag = cols > 1;
180     CalpontSystemCatalog::ColType coltype = csc->colType(cr->ColumnOID());
181     idx.idxDataType = static_cast<CalpontSystemCatalog::ColDataType>(coltype.colDataType);
182 
183     if (isDictionaryType(coltype) )
184     {
185         idx.idxWidth = fTOKENSIZE;
186         idx.idxType = WR_CHAR;
187     }//@bug 410: index sizes are either 1, 4 or 8
188     else if (exeplan::isCharType(coltype))
189     {
190         if (1 == coltype.colWidth) idx.idxWidth = 1;
191         else idx.idxWidth = (coltype.colWidth > 4) ? 8 : 4;
192 
193         idx.idxType = WR_CHAR;
194     }
195     else
196         idx.idxWidth = coltype.colWidth;
197 
198     fIdxStructList.push_back(idx);
199     return coltype;
200 }
201 
addColumnData(const execplan::ColumnResult * cr,const CalpontSystemCatalog::ColType colType,int added)202 void DDLIndexPopulator::addColumnData(const execplan::ColumnResult* cr, const CalpontSystemCatalog::ColType colType, int added)
203 {
204     WriteEngine::IdxTupleList  tupleList;
205     WriteEngine::IdxTuple tuple;
206 
207     for (int i = 0; i < cr->dataCount(); ++i)
208     {
209 
210         WriteEngine::IdxTuple tuple ;
211         convertColData( cr, i,  colType, tuple);
212 
213         if (checkConstraints( tuple, colType, i, added))
214         {
215             tupleList.push_back(tuple);
216 
217             if (! added )
218                 fRidList.push_back(cr->GetRid(i));
219         }
220         else
221             break;
222     }
223 
224     if (tupleList.size())
225         fIdxValueList.push_back(tupleList);
226 }
227 
228 
229 
convertColData(const execplan::ColumnResult * cr,int idx,const CalpontSystemCatalog::ColType & colType,WriteEngine::IdxTuple & tuple)230 void DDLIndexPopulator::convertColData(const execplan::ColumnResult* cr, int idx,  const CalpontSystemCatalog::ColType& colType, WriteEngine::IdxTuple& tuple)
231 {
232     if (isDictionaryType(colType))
233     {
234         /*	tuple.data = tokenizeData ( colType, cr->GetStringData(idx) );*/
235         /*	tuple.data = tokenizeData ( cr->GetRid(idx) );*/
236         tuple.data = convertTokenData(cr->GetStringData(idx));
237     }
238     else tuple.data = convertData( colType, cr, idx);
239 }
240 
convertTokenData(const std::string & data)241 boost::any DDLIndexPopulator::convertTokenData( const std::string& data )
242 {
243     string strData((size_t)fTOKENSIZE < data.length() ? data.substr(0, fTOKENSIZE) : data);
244     return  strData;
245 }
246 
#if 0
// Disabled this function as it is currently not used.
// If we decide to use, we should check on the usage of fileop.getFileName().
// With iteration 17, the more common version of this getFileName() takes a
// partition and segment number in addition to an OID.  openColumnFile
// should perhaps be changed to use this updated version of getFileName().
//
// Opens fColumnFile on the file name FileOp reports for `oid`.
// Returns true after the open call, false (with an error logged) when the
// file name could not be obtained.
bool DDLIndexPopulator::openColumnFile(WriteEngine::OID oid)
{
    FileOp fileOp;
    char fileName[WriteEngine::FILE_NAME_SIZE];

    if (WriteEngine::NO_ERROR != fileOp.getFileName(oid, fileName))
    {
        logError("Could not get column file name for data");
        return false;
    }

    fColumnFile.open(fileName);
    return true;
}
#endif
270 
271 // Workaround to get original column token and not "retokenize" the string value
tokenizeData(WriteEngine::RID rid)272 boost::any DDLIndexPopulator::tokenizeData( WriteEngine::RID rid )
273 {
274     int64_t byteOffset = rid * fTOKENSIZE;
275     ByteStream::byte inbuf[fTOKENSIZE];
276     fColumnFile.seekg(byteOffset, ios::beg);
277     fColumnFile.read(reinterpret_cast<char*>(inbuf), fTOKENSIZE);
278 
279     WriteEngine::Token token;
280     memcpy(&token, inbuf, fTOKENSIZE);
281     return token;
282 }
283 
284 
tokenizeData(const execplan::CalpontSystemCatalog::ColType & colType,const std::string & data)285 boost::any DDLIndexPopulator::tokenizeData( const execplan::CalpontSystemCatalog::ColType& colType, const std::string& data )
286 {
287     WriteEngine::DctnryTuple  dictTuple;
288 
289     if ( data.length() > (unsigned int)colType.colWidth )
290     {
291         logError("Insert value is too large for column");
292     }
293     else
294     {
295         WriteEngine::DctnryStruct dictStruct;
296         dictStruct.treeOid = colType.ddn.treeOID;
297         dictStruct.listOid = colType.ddn.listOID;
298         dictStruct.dctnryOid = colType.ddn.dictOID;
299         dictTuple.sigValue = data.c_str();
300         dictTuple.sigSize = data.length();
301         int error = NO_ERROR;
302 
303         if ( NO_ERROR != (error = fWriteEngine->tokenize( fTxnID, dictStruct, dictTuple)) )
304         {
305             logError("Tokenization failed", error);
306         }
307     }
308 
309     return dictTuple.token;
310 }
311 
312 
313 
convertData(const CalpontSystemCatalog::ColType & colType,const execplan::ColumnResult * cr,int idx)314 boost::any   DDLIndexPopulator::convertData(const CalpontSystemCatalog::ColType&  colType, const execplan::ColumnResult* cr, int idx )
315 {
316     uint64_t data = cr->GetData(idx);
317 
318     switch ( colType.colDataType )
319     {
320         case CalpontSystemCatalog::BIT:
321         case execplan::CalpontSystemCatalog::TINYINT:
322             return  *reinterpret_cast<char*>(&data);
323 
324         case execplan::CalpontSystemCatalog::SMALLINT:
325             return  *reinterpret_cast<short*>(&data);
326 
327         case execplan::CalpontSystemCatalog::DATE:	// @bug 375
328         case execplan::CalpontSystemCatalog::MEDINT:
329         case execplan::CalpontSystemCatalog::INT:
330             return  *reinterpret_cast<int*>(&data);
331 
332         case execplan::CalpontSystemCatalog::DATETIME: 	// @bug 375
333         case execplan::CalpontSystemCatalog::TIME:
334         case execplan::CalpontSystemCatalog::TIMESTAMP:
335         case execplan::CalpontSystemCatalog::BIGINT:
336             return  *reinterpret_cast<long long*>(&data);
337 
338         case execplan::CalpontSystemCatalog::DECIMAL:
339         {
340             if (colType.colWidth <= CalpontSystemCatalog::FOUR_BYTE) return  *reinterpret_cast<short*>(&data);
341 
342             else if (colType.colWidth <= 9)              return  *reinterpret_cast<int*>(&data);
343 
344             else                                         return  *reinterpret_cast<long long*>(&data);
345         }
346 
347         case execplan::CalpontSystemCatalog::FLOAT:
348             return  *reinterpret_cast<float*>(&data);
349 
350         case execplan::CalpontSystemCatalog::DOUBLE:
351             return  *reinterpret_cast<double*>(&data);
352 
353         case execplan::CalpontSystemCatalog::CHAR:
354         case execplan::CalpontSystemCatalog::VARCHAR:
355         {
356             string  strData(cr->GetStringData(idx) );
357             return  *reinterpret_cast<string*>(&strData);
358         }
359 
360         default:
361             break;
362     }
363 
364     logError("Invalid column type");
365     throw std::runtime_error("Invalid data");
366 
367     return *reinterpret_cast<long long*>(&data);
368 
369 }
370 
371 
insertIndex()372 void DDLIndexPopulator::insertIndex( )
373 {
374 // @bug 359 use bulk load build
375     int rc = (1 < fIdxStructList.size()) ?
376              (void)0
377              :  (void)0;
378 
379     if (rc)
380         logError("Error inserting index values", rc );
381 
382 }
383 
isDictionaryType(const CalpontSystemCatalog::ColType & colType)384 bool DDLIndexPopulator::isDictionaryType(const CalpontSystemCatalog::ColType& colType)
385 {
386     return ( (CalpontSystemCatalog::CHAR == colType.colDataType && 8 < colType.colWidth )
387              || (CalpontSystemCatalog::VARCHAR == colType.colDataType  &&  7 < colType.colWidth )
388              || (CalpontSystemCatalog::DECIMAL == colType.colDataType  &&  18 < colType.precision ));
389 
390 }
391 
checkConstraints(const IdxTuple & data,const CalpontSystemCatalog::ColType & ctype,int i,int column)392 bool DDLIndexPopulator::checkConstraints( const IdxTuple& data, const CalpontSystemCatalog::ColType& ctype, int i, int column)
393 {
394 
395     switch ( fConstraint )
396     {
397         case DDL_INVALID_CONSTRAINT:
398             return true;
399 
400         case DDL_UNIQUE:
401         case DDL_PRIMARY_KEY:
402             if ((size_t)column + 1 < fColNames.size() )
403                 return true;
404 
405             return checkUnique( i, ctype );
406 
407         case DDL_NOT_NULL:
408             return checkNotNull( data, ctype );
409 
410         case DDL_CHECK:
411             return checkCheck( data, ctype );
412 
413         default:
414             return true; //?
415     }
416 
417 }
418 
419 // Check if the row of data at idx is already in fUniqueColResultList
420 
checkUnique(int idx,const CalpontSystemCatalog::ColType & colType)421 bool DDLIndexPopulator::checkUnique( int idx, const CalpontSystemCatalog::ColType& colType )
422 {
423     if (0 == idx)
424         return true;
425 
426     //Get row of data as each column result data at idx
427     size_t indexSize = fColNames.size();
428     vector <uint64_t> rowIntData(indexSize);
429     vector <string> rowStrData(indexSize);
430 
431     for (size_t i = 0; i < indexSize; ++i)
432     {
433         //if  ( isStringType(fUniqueColResultList[i]->columnType()) )
434         if  ( isStringType(colType.colDataType) )
435             rowStrData[i] = fUniqueColResultList[i]->GetStringData(idx);
436         else
437             rowIntData[i] = fUniqueColResultList[i]->GetData(idx);
438     }
439 
440     //check if each value in the idx row is equal to each value in a previous row
441     // i is the row; j is the column.
442     bool unique = true;
443 
444     for (int i = 0; i < idx && unique; ++i)
445     {
446         bool equal = true;
447 
448         for (size_t j = 0; j < indexSize && equal; ++j)
449         {
450             if ( isStringType(colType.colDataType) )
451             {
452                 equal = fUniqueColResultList[j]->GetStringData(i) == rowStrData[j];
453             }
454             else
455             {
456                 equal = (static_cast<uint64_t>(fUniqueColResultList[j]->GetData(i)) == rowIntData[j]);
457             }
458         }
459 
460         unique = ! equal;
461     }
462 
463     if (! unique)
464     {
465         stringstream ss;
466         ss << idx;
467         logError("Unique Constraint violated on row: " + ss.str() );
468     }
469 
470     return unique;
471 }
472 
473 
checkNotNull(const IdxTuple & data,const CalpontSystemCatalog::ColType & colType)474 bool DDLIndexPopulator::checkNotNull(const IdxTuple& data, const CalpontSystemCatalog::ColType& colType)
475 {
476 
477     any nullvalue = DDLNullValueForType(colType);
478     bool isNull = false;
479 
480     switch ( colType.colDataType )
481     {
482         case CalpontSystemCatalog::BIT:
483             break;
484 
485         case execplan::CalpontSystemCatalog::TINYINT:
486             isNull = any_cast<char>(data.data) == any_cast<char>(nullvalue);
487             break;
488 
489         case execplan::CalpontSystemCatalog::SMALLINT:
490             isNull = any_cast<short>(data.data) == any_cast<short>(nullvalue);
491             break;
492 
493         case execplan::CalpontSystemCatalog::MEDINT:
494         case execplan::CalpontSystemCatalog::INT:
495             isNull = any_cast<int>(data.data) == any_cast<int>(nullvalue);
496             break;
497 
498         case execplan::CalpontSystemCatalog::BIGINT:
499             isNull = any_cast<long long>(data.data) == any_cast<long long>(nullvalue);
500             break;
501 
502         case execplan::CalpontSystemCatalog::DECIMAL:
503         {
504             if (colType.colWidth <= CalpontSystemCatalog::FOUR_BYTE)
505                 isNull = any_cast<short>(data.data) == any_cast<short>(nullvalue);
506             else if (colType.colWidth <= 9)
507                 isNull =  any_cast<int>(data.data) == any_cast<int>(nullvalue);
508             else if (colType.colWidth <= 18)
509                 isNull =  any_cast<long long>(data.data) == any_cast<long long>(nullvalue);
510             else
511                 isNull =  compareToken(any_cast<WriteEngine::Token>(data.data), any_cast<WriteEngine::Token>(nullvalue));
512 
513             break;
514         }
515 
516         case execplan::CalpontSystemCatalog::FLOAT:
517             isNull =  any_cast<float>(data.data) == any_cast<float>(nullvalue);
518             break;
519 
520         case execplan::CalpontSystemCatalog::DOUBLE:
521             isNull =  any_cast<double>(data.data) == any_cast<double>(nullvalue);
522             break;
523 
524         case execplan::CalpontSystemCatalog::DATE:
525             isNull =  any_cast<int>(data.data) == any_cast<int>(nullvalue);
526             break;
527 
528         case execplan::CalpontSystemCatalog::DATETIME:
529         case execplan::CalpontSystemCatalog::TIME:
530         case execplan::CalpontSystemCatalog::TIMESTAMP:
531             isNull =  any_cast<long long>(data.data) == any_cast<long long>(nullvalue);
532             break;
533 
534         case execplan::CalpontSystemCatalog::CHAR:
535         {
536             if (colType.colWidth == execplan::CalpontSystemCatalog::ONE_BYTE)
537                 isNull =  any_cast<string>(data.data) == any_cast<string>(nullvalue);
538             else if (colType.colWidth == execplan::CalpontSystemCatalog::TWO_BYTE)
539                 isNull =  any_cast<string>(data.data) == any_cast<string>(nullvalue);
540             else if (colType.colWidth <= execplan::CalpontSystemCatalog::FOUR_BYTE)
541                 isNull =  any_cast<string>(data.data) == any_cast<string>(nullvalue);
542             else
543                 isNull =  compareToken(any_cast<WriteEngine::Token>(data.data), any_cast<WriteEngine::Token>(nullvalue));
544 
545             break;
546 
547         }
548 
549         case execplan::CalpontSystemCatalog::VARCHAR:
550         {
551             if (colType.colWidth == execplan::CalpontSystemCatalog::ONE_BYTE)
552                 isNull =  any_cast<string>(data.data) == any_cast<string>(nullvalue);
553             else if (colType.colWidth < execplan::CalpontSystemCatalog::FOUR_BYTE)
554                 isNull =  any_cast<string>(data.data) == any_cast<string>(nullvalue);
555             else
556                 isNull =  compareToken(any_cast<WriteEngine::Token>(data.data), any_cast<WriteEngine::Token>(nullvalue));
557 
558             break;
559         }
560 
561         default:
562             throw std::runtime_error("getNullValueForType: unkown column data type");
563     }
564 
565     if (isNull)
566         logError("Null value not allowed in index");
567 
568     return ! isNull;
569 
570 }
571 
logError(const string & msg,int error)572 void  DDLIndexPopulator::logError(const string& msg, int error)
573 {
574 
575     Message::Args args;
576     Message message(9);
577     args.add((string)__FILE__ + ": ");
578     args.add(msg);
579 
580     if (error)
581     {
582         args.add("Error number: ");
583         args.add(error);
584     }
585 
586     message.format( args );
587 
588     fResult.result = DDLPackageProcessor::CREATE_ERROR;
589     fResult.message = message;
590 }
591 
592 
593 } //namespace
594 
595 
596 
597 
598 
599 
600