1 /* Copyright (C) 2014 InfiniDB, Inc.
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17
18 /***********************************************************************
19 * $Id: ddlindexpopulator.cpp 9210 2013-01-21 14:10:42Z rdempsey $
20 *
21 *
22 ***********************************************************************/
23 #include <sys/types.h>
24 #include <sys/ipc.h>
25
26 #include "ddlindexpopulator.h"
27
28 #include "messagelog.h"
29 #include "dataconvert.h"
30 #include "joblist.h"
31 #include "calpontselectexecutionplan.h"
32 #include "distributedenginecomm.h"
33 #include "simplecolumn.h"
34 #include "resourcemanager.h"
35 #include "columnresult.h"
36
37 #include <boost/any.hpp>
38 using namespace boost;
39 #include <boost/algorithm/string/case_conv.hpp>
40 using namespace boost::algorithm;
41
42 using namespace WriteEngine;
43 using namespace logging;
44 using namespace resultset;
45 using namespace joblist;
46
47 using namespace std;
48 using namespace execplan;
49 using namespace ddlpackage;
50 using namespace messageqcpp;
51
52 namespace ddlpackageprocessor
53 {
54
populateIndex(DDLPackageProcessor::DDLResult & result)55 bool DDLIndexPopulator::populateIndex(DDLPackageProcessor::DDLResult& result)
56 {
57 if (makeIndexStructs() )
58 insertIndex();
59
60 result = fResult;
61 return NO_ERROR != fResult.result;
62 }
63
64
makeIndexStructs()65 bool DDLIndexPopulator::makeIndexStructs( )
66 {
67 CalpontSelectExecutionPlan csep;
68 makeCsep(csep);
69 ResourceManager* rm;
70
71 if (! fEC)
72 {
73 fEC = DistributedEngineComm::instance(rm);
74 fEC->Open();
75 }
76
77 SJLP jbl = joblist::JobListFactory::makeJobList(&csep, rm);
78
79 boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog( fSessionID );
80 csc->identity(CalpontSystemCatalog::EC);
81
82 jbl->putEngineComm(fEC);
83 /*
84 ResultManager * result = jbl->GetResultManager();
85 result->setRunning(1);
86 jbl->Execute(); */
87 jbl->doQuery();
88
89 CalpontSystemCatalog::TableName tableName;
90 tableName.schema = fTable.fSchema;
91 tableName.table = fTable.fName;
92
93 CalpontSystemCatalog::OID tableOid = (csc->tableRID ( tableName )).objnum;
94 CalpontSystemCatalog::NJLSysDataList sysDataList;
95
96 for (;;)
97 {
98 TableBand band;
99 band = jbl->projectTable(tableOid);
100
101 if (band.getRowCount() == 0)
102 {
103 // No more bands, table is done
104 break;
105 }
106
107 band.convertToSysDataList(sysDataList, csc);
108 break;
109 }
110
111 //size_t cnt = fColNames.size();
112 size_t i = 0;
113 vector<ColumnResult*>::const_iterator it;
114 vector<int>::const_iterator oid_iter;
115
116 for (it = sysDataList.begin(); it != sysDataList.end(); it++)
117 {
118 if (isUnique())
119 fUniqueColResultList.push_back(*it);
120
121 for ( oid_iter = fOidList.begin(); oid_iter != fOidList.end(); oid_iter++ )
122 {
123 if ( (*it)->ColumnOID() == *oid_iter )
124 {
125 CalpontSystemCatalog::ColType coltype = makeIdxStruct(*it, fColNames.size(), csc);
126 addColumnData(*it, coltype, i);
127 }
128 }
129
130 i++;
131 }
132
133 return (fIdxValueList.size() && NO_ERROR == fResult.result );
134
135 }
136
137
138
makeCsep(CalpontSelectExecutionPlan & csep)139 void DDLIndexPopulator::makeCsep(CalpontSelectExecutionPlan& csep)
140 {
141
142 csep.sessionID(fSessionID);
143
144 csep.txnID(fTxnID);
145 csep.verID(fSessionManager->verID());
146
147 CalpontSelectExecutionPlan::ReturnedColumnList colList;
148 CalpontSelectExecutionPlan::ColumnMap colMap;
149 CalpontSystemCatalog::TableColName tableColName;
150 CalpontSystemCatalog::OID oid;
151 tableColName.schema = fTable.fSchema;
152 tableColName.table = fTable.fName;
153 boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog( fSessionID );
154 string tableName(fTable.fSchema + "." + fTable.fName + ".");
155
156 ColumnNameList::const_iterator cend = fColNames.end();
157
158 for (ColumnNameList::const_iterator cname = fColNames.begin(); cname != cend; ++cname)
159 {
160 string fullColName(tableName + *cname);
161 SRCP srcp(new SimpleColumn (fullColName, fSessionID));
162 colList.push_back(srcp);
163 tableColName.column = *cname;
164 oid = csc->lookupOID( tableColName );
165 fOidList.push_back( oid );
166 colMap.insert(CalpontSelectExecutionPlan::ColumnMap::value_type(fullColName, srcp));
167 }
168
169 csep.columnMap (colMap);
170 csep.returnedCols (colList);
171 }
172
173
makeIdxStruct(const ColumnResult * cr,size_t cols,boost::shared_ptr<CalpontSystemCatalog> csc)174 CalpontSystemCatalog::ColType DDLIndexPopulator::makeIdxStruct(const ColumnResult* cr, size_t cols, boost::shared_ptr<CalpontSystemCatalog> csc )
175 {
176 IdxStruct idx;
177 idx.treeOid = fIdxOID.treeOID;
178 idx.listOid = fIdxOID.listOID;
179 idx.multiColFlag = cols > 1;
180 CalpontSystemCatalog::ColType coltype = csc->colType(cr->ColumnOID());
181 idx.idxDataType = static_cast<CalpontSystemCatalog::ColDataType>(coltype.colDataType);
182
183 if (isDictionaryType(coltype) )
184 {
185 idx.idxWidth = fTOKENSIZE;
186 idx.idxType = WR_CHAR;
187 }//@bug 410: index sizes are either 1, 4 or 8
188 else if (exeplan::isCharType(coltype))
189 {
190 if (1 == coltype.colWidth) idx.idxWidth = 1;
191 else idx.idxWidth = (coltype.colWidth > 4) ? 8 : 4;
192
193 idx.idxType = WR_CHAR;
194 }
195 else
196 idx.idxWidth = coltype.colWidth;
197
198 fIdxStructList.push_back(idx);
199 return coltype;
200 }
201
addColumnData(const execplan::ColumnResult * cr,const CalpontSystemCatalog::ColType colType,int added)202 void DDLIndexPopulator::addColumnData(const execplan::ColumnResult* cr, const CalpontSystemCatalog::ColType colType, int added)
203 {
204 WriteEngine::IdxTupleList tupleList;
205 WriteEngine::IdxTuple tuple;
206
207 for (int i = 0; i < cr->dataCount(); ++i)
208 {
209
210 WriteEngine::IdxTuple tuple ;
211 convertColData( cr, i, colType, tuple);
212
213 if (checkConstraints( tuple, colType, i, added))
214 {
215 tupleList.push_back(tuple);
216
217 if (! added )
218 fRidList.push_back(cr->GetRid(i));
219 }
220 else
221 break;
222 }
223
224 if (tupleList.size())
225 fIdxValueList.push_back(tupleList);
226 }
227
228
229
convertColData(const execplan::ColumnResult * cr,int idx,const CalpontSystemCatalog::ColType & colType,WriteEngine::IdxTuple & tuple)230 void DDLIndexPopulator::convertColData(const execplan::ColumnResult* cr, int idx, const CalpontSystemCatalog::ColType& colType, WriteEngine::IdxTuple& tuple)
231 {
232 if (isDictionaryType(colType))
233 {
234 /* tuple.data = tokenizeData ( colType, cr->GetStringData(idx) );*/
235 /* tuple.data = tokenizeData ( cr->GetRid(idx) );*/
236 tuple.data = convertTokenData(cr->GetStringData(idx));
237 }
238 else tuple.data = convertData( colType, cr, idx);
239 }
240
convertTokenData(const std::string & data)241 boost::any DDLIndexPopulator::convertTokenData( const std::string& data )
242 {
243 string strData((size_t)fTOKENSIZE < data.length() ? data.substr(0, fTOKENSIZE) : data);
244 return strData;
245 }
246
#if 0
// Compiled out because nothing currently calls this function.
// Before re-enabling it, review the use of fileOp.getFileName(): since
// iteration 17 the more common overload takes a partition and a segment
// number in addition to the OID, and openColumnFile should probably be
// changed to use that overload.
bool DDLIndexPopulator::openColumnFile(WriteEngine::OID oid)
{
    FileOp fileOp;
    char fileName[WriteEngine::FILE_NAME_SIZE];

    if (WriteEngine::NO_ERROR != fileOp.getFileName(oid, fileName) )
    {
        logError("Could not get column file name for data");
        return false;
    }

    fColumnFile.open(fileName);
    return true;
}
#endif
270
271 // Workaround to get original column token and not "retokenize" the string value
tokenizeData(WriteEngine::RID rid)272 boost::any DDLIndexPopulator::tokenizeData( WriteEngine::RID rid )
273 {
274 int64_t byteOffset = rid * fTOKENSIZE;
275 ByteStream::byte inbuf[fTOKENSIZE];
276 fColumnFile.seekg(byteOffset, ios::beg);
277 fColumnFile.read(reinterpret_cast<char*>(inbuf), fTOKENSIZE);
278
279 WriteEngine::Token token;
280 memcpy(&token, inbuf, fTOKENSIZE);
281 return token;
282 }
283
284
tokenizeData(const execplan::CalpontSystemCatalog::ColType & colType,const std::string & data)285 boost::any DDLIndexPopulator::tokenizeData( const execplan::CalpontSystemCatalog::ColType& colType, const std::string& data )
286 {
287 WriteEngine::DctnryTuple dictTuple;
288
289 if ( data.length() > (unsigned int)colType.colWidth )
290 {
291 logError("Insert value is too large for column");
292 }
293 else
294 {
295 WriteEngine::DctnryStruct dictStruct;
296 dictStruct.treeOid = colType.ddn.treeOID;
297 dictStruct.listOid = colType.ddn.listOID;
298 dictStruct.dctnryOid = colType.ddn.dictOID;
299 dictTuple.sigValue = data.c_str();
300 dictTuple.sigSize = data.length();
301 int error = NO_ERROR;
302
303 if ( NO_ERROR != (error = fWriteEngine->tokenize( fTxnID, dictStruct, dictTuple)) )
304 {
305 logError("Tokenization failed", error);
306 }
307 }
308
309 return dictTuple.token;
310 }
311
312
313
convertData(const CalpontSystemCatalog::ColType & colType,const execplan::ColumnResult * cr,int idx)314 boost::any DDLIndexPopulator::convertData(const CalpontSystemCatalog::ColType& colType, const execplan::ColumnResult* cr, int idx )
315 {
316 uint64_t data = cr->GetData(idx);
317
318 switch ( colType.colDataType )
319 {
320 case CalpontSystemCatalog::BIT:
321 case execplan::CalpontSystemCatalog::TINYINT:
322 return *reinterpret_cast<char*>(&data);
323
324 case execplan::CalpontSystemCatalog::SMALLINT:
325 return *reinterpret_cast<short*>(&data);
326
327 case execplan::CalpontSystemCatalog::DATE: // @bug 375
328 case execplan::CalpontSystemCatalog::MEDINT:
329 case execplan::CalpontSystemCatalog::INT:
330 return *reinterpret_cast<int*>(&data);
331
332 case execplan::CalpontSystemCatalog::DATETIME: // @bug 375
333 case execplan::CalpontSystemCatalog::TIME:
334 case execplan::CalpontSystemCatalog::TIMESTAMP:
335 case execplan::CalpontSystemCatalog::BIGINT:
336 return *reinterpret_cast<long long*>(&data);
337
338 case execplan::CalpontSystemCatalog::DECIMAL:
339 {
340 if (colType.colWidth <= CalpontSystemCatalog::FOUR_BYTE) return *reinterpret_cast<short*>(&data);
341
342 else if (colType.colWidth <= 9) return *reinterpret_cast<int*>(&data);
343
344 else return *reinterpret_cast<long long*>(&data);
345 }
346
347 case execplan::CalpontSystemCatalog::FLOAT:
348 return *reinterpret_cast<float*>(&data);
349
350 case execplan::CalpontSystemCatalog::DOUBLE:
351 return *reinterpret_cast<double*>(&data);
352
353 case execplan::CalpontSystemCatalog::CHAR:
354 case execplan::CalpontSystemCatalog::VARCHAR:
355 {
356 string strData(cr->GetStringData(idx) );
357 return *reinterpret_cast<string*>(&strData);
358 }
359
360 default:
361 break;
362 }
363
364 logError("Invalid column type");
365 throw std::runtime_error("Invalid data");
366
367 return *reinterpret_cast<long long*>(&data);
368
369 }
370
371
insertIndex()372 void DDLIndexPopulator::insertIndex( )
373 {
374 // @bug 359 use bulk load build
375 int rc = (1 < fIdxStructList.size()) ?
376 (void)0
377 : (void)0;
378
379 if (rc)
380 logError("Error inserting index values", rc );
381
382 }
383
isDictionaryType(const CalpontSystemCatalog::ColType & colType)384 bool DDLIndexPopulator::isDictionaryType(const CalpontSystemCatalog::ColType& colType)
385 {
386 return ( (CalpontSystemCatalog::CHAR == colType.colDataType && 8 < colType.colWidth )
387 || (CalpontSystemCatalog::VARCHAR == colType.colDataType && 7 < colType.colWidth )
388 || (CalpontSystemCatalog::DECIMAL == colType.colDataType && 18 < colType.precision ));
389
390 }
391
checkConstraints(const IdxTuple & data,const CalpontSystemCatalog::ColType & ctype,int i,int column)392 bool DDLIndexPopulator::checkConstraints( const IdxTuple& data, const CalpontSystemCatalog::ColType& ctype, int i, int column)
393 {
394
395 switch ( fConstraint )
396 {
397 case DDL_INVALID_CONSTRAINT:
398 return true;
399
400 case DDL_UNIQUE:
401 case DDL_PRIMARY_KEY:
402 if ((size_t)column + 1 < fColNames.size() )
403 return true;
404
405 return checkUnique( i, ctype );
406
407 case DDL_NOT_NULL:
408 return checkNotNull( data, ctype );
409
410 case DDL_CHECK:
411 return checkCheck( data, ctype );
412
413 default:
414 return true; //?
415 }
416
417 }
418
419 // Check if the row of data at idx is already in fUniqueColResultList
420
checkUnique(int idx,const CalpontSystemCatalog::ColType & colType)421 bool DDLIndexPopulator::checkUnique( int idx, const CalpontSystemCatalog::ColType& colType )
422 {
423 if (0 == idx)
424 return true;
425
426 //Get row of data as each column result data at idx
427 size_t indexSize = fColNames.size();
428 vector <uint64_t> rowIntData(indexSize);
429 vector <string> rowStrData(indexSize);
430
431 for (size_t i = 0; i < indexSize; ++i)
432 {
433 //if ( isStringType(fUniqueColResultList[i]->columnType()) )
434 if ( isStringType(colType.colDataType) )
435 rowStrData[i] = fUniqueColResultList[i]->GetStringData(idx);
436 else
437 rowIntData[i] = fUniqueColResultList[i]->GetData(idx);
438 }
439
440 //check if each value in the idx row is equal to each value in a previous row
441 // i is the row; j is the column.
442 bool unique = true;
443
444 for (int i = 0; i < idx && unique; ++i)
445 {
446 bool equal = true;
447
448 for (size_t j = 0; j < indexSize && equal; ++j)
449 {
450 if ( isStringType(colType.colDataType) )
451 {
452 equal = fUniqueColResultList[j]->GetStringData(i) == rowStrData[j];
453 }
454 else
455 {
456 equal = (static_cast<uint64_t>(fUniqueColResultList[j]->GetData(i)) == rowIntData[j]);
457 }
458 }
459
460 unique = ! equal;
461 }
462
463 if (! unique)
464 {
465 stringstream ss;
466 ss << idx;
467 logError("Unique Constraint violated on row: " + ss.str() );
468 }
469
470 return unique;
471 }
472
473
checkNotNull(const IdxTuple & data,const CalpontSystemCatalog::ColType & colType)474 bool DDLIndexPopulator::checkNotNull(const IdxTuple& data, const CalpontSystemCatalog::ColType& colType)
475 {
476
477 any nullvalue = DDLNullValueForType(colType);
478 bool isNull = false;
479
480 switch ( colType.colDataType )
481 {
482 case CalpontSystemCatalog::BIT:
483 break;
484
485 case execplan::CalpontSystemCatalog::TINYINT:
486 isNull = any_cast<char>(data.data) == any_cast<char>(nullvalue);
487 break;
488
489 case execplan::CalpontSystemCatalog::SMALLINT:
490 isNull = any_cast<short>(data.data) == any_cast<short>(nullvalue);
491 break;
492
493 case execplan::CalpontSystemCatalog::MEDINT:
494 case execplan::CalpontSystemCatalog::INT:
495 isNull = any_cast<int>(data.data) == any_cast<int>(nullvalue);
496 break;
497
498 case execplan::CalpontSystemCatalog::BIGINT:
499 isNull = any_cast<long long>(data.data) == any_cast<long long>(nullvalue);
500 break;
501
502 case execplan::CalpontSystemCatalog::DECIMAL:
503 {
504 if (colType.colWidth <= CalpontSystemCatalog::FOUR_BYTE)
505 isNull = any_cast<short>(data.data) == any_cast<short>(nullvalue);
506 else if (colType.colWidth <= 9)
507 isNull = any_cast<int>(data.data) == any_cast<int>(nullvalue);
508 else if (colType.colWidth <= 18)
509 isNull = any_cast<long long>(data.data) == any_cast<long long>(nullvalue);
510 else
511 isNull = compareToken(any_cast<WriteEngine::Token>(data.data), any_cast<WriteEngine::Token>(nullvalue));
512
513 break;
514 }
515
516 case execplan::CalpontSystemCatalog::FLOAT:
517 isNull = any_cast<float>(data.data) == any_cast<float>(nullvalue);
518 break;
519
520 case execplan::CalpontSystemCatalog::DOUBLE:
521 isNull = any_cast<double>(data.data) == any_cast<double>(nullvalue);
522 break;
523
524 case execplan::CalpontSystemCatalog::DATE:
525 isNull = any_cast<int>(data.data) == any_cast<int>(nullvalue);
526 break;
527
528 case execplan::CalpontSystemCatalog::DATETIME:
529 case execplan::CalpontSystemCatalog::TIME:
530 case execplan::CalpontSystemCatalog::TIMESTAMP:
531 isNull = any_cast<long long>(data.data) == any_cast<long long>(nullvalue);
532 break;
533
534 case execplan::CalpontSystemCatalog::CHAR:
535 {
536 if (colType.colWidth == execplan::CalpontSystemCatalog::ONE_BYTE)
537 isNull = any_cast<string>(data.data) == any_cast<string>(nullvalue);
538 else if (colType.colWidth == execplan::CalpontSystemCatalog::TWO_BYTE)
539 isNull = any_cast<string>(data.data) == any_cast<string>(nullvalue);
540 else if (colType.colWidth <= execplan::CalpontSystemCatalog::FOUR_BYTE)
541 isNull = any_cast<string>(data.data) == any_cast<string>(nullvalue);
542 else
543 isNull = compareToken(any_cast<WriteEngine::Token>(data.data), any_cast<WriteEngine::Token>(nullvalue));
544
545 break;
546
547 }
548
549 case execplan::CalpontSystemCatalog::VARCHAR:
550 {
551 if (colType.colWidth == execplan::CalpontSystemCatalog::ONE_BYTE)
552 isNull = any_cast<string>(data.data) == any_cast<string>(nullvalue);
553 else if (colType.colWidth < execplan::CalpontSystemCatalog::FOUR_BYTE)
554 isNull = any_cast<string>(data.data) == any_cast<string>(nullvalue);
555 else
556 isNull = compareToken(any_cast<WriteEngine::Token>(data.data), any_cast<WriteEngine::Token>(nullvalue));
557
558 break;
559 }
560
561 default:
562 throw std::runtime_error("getNullValueForType: unkown column data type");
563 }
564
565 if (isNull)
566 logError("Null value not allowed in index");
567
568 return ! isNull;
569
570 }
571
logError(const string & msg,int error)572 void DDLIndexPopulator::logError(const string& msg, int error)
573 {
574
575 Message::Args args;
576 Message message(9);
577 args.add((string)__FILE__ + ": ");
578 args.add(msg);
579
580 if (error)
581 {
582 args.add("Error number: ");
583 args.add(error);
584 }
585
586 message.format( args );
587
588 fResult.result = DDLPackageProcessor::CREATE_ERROR;
589 fResult.message = message;
590 }
591
592
593 } //namespace
594
595
596
597
598
599
600