1 /* Copyright (C) 2014 InfiniDB, Inc.
2    Copyright (C) 2016 MariaDB Corporation
3 
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License
6    as published by the Free Software Foundation; version 2 of
7    the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17    MA 02110-1301, USA. */
18 
19 //   $Id: createtableprocessor.cpp 9627 2013-06-18 13:59:21Z rdempsey $
20 
21 #include <unistd.h>
22 #include <string>
23 using namespace std;
24 
25 #include <boost/algorithm/string/case_conv.hpp>
26 
27 #include "createtableprocessor.h"
28 
29 #include "ddlpkg.h"
30 using namespace ddlpackage;
31 
32 #include "we_messages.h"
33 using namespace WriteEngine;
34 
35 #include "oamcache.h"
36 using namespace oam;
37 
38 #include "bytestream.h"
39 using namespace messageqcpp;
40 
41 #include "calpontsystemcatalog.h"
42 using namespace execplan;
43 
44 #include "sqllogger.h"
45 #include "messagelog.h"
46 using namespace logging;
47 
48 namespace ddlpackageprocessor
49 {
50 
processPackage(ddlpackage::CreateTableStatement & createTableStmt)51 CreateTableProcessor::DDLResult CreateTableProcessor::processPackage(
52     ddlpackage::CreateTableStatement& createTableStmt)
53 {
54     SUMMARY_INFO("CreateTableProcessor::processPackage");
55 
56     DDLResult result;
57     BRM::TxnID txnID;
58     txnID.id = fTxnid.id;
59     txnID.valid = fTxnid.valid;
60     result.result = NO_ERROR;
61     int rc1 = 0;
62     rc1 = fDbrm->isReadWrite();
63 
64     if (rc1 != 0 )
65     {
66         Message::Args args;
67         Message message(9);
68         args.add("Unable to execute the statement due to DBRM is read only");
69         message.format(args);
70         result.result = CREATE_ERROR;
71         result.message = message;
72         fSessionManager.rolledback(txnID);
73         return result;
74     }
75 
76     DETAIL_INFO(createTableStmt);
77     ddlpackage::TableDef& tableDef = *createTableStmt.fTableDef;
78     //If schema = CALPONTSYS, do not create table
79 
80     if (tableDef.fQualifiedName->fSchema == CALPONT_SCHEMA)
81     {
82         //release the transaction
83         fSessionManager.rolledback(txnID);
84         return result;
85     }
86 
87     // Commit current transaction.
88     // all DDL statements cause an implicut commit
89     VERBOSE_INFO("Getting current txnID");
90 
91     //Check whether the table is existed already
92     boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
93         CalpontSystemCatalog::makeCalpontSystemCatalog(createTableStmt.fSessionID);
94     execplan::CalpontSystemCatalog::TableName tableName;
95     tableName.schema = tableDef.fQualifiedName->fSchema;
96     tableName.table = tableDef.fQualifiedName->fName;
97     execplan::CalpontSystemCatalog::ROPair roPair;
98     roPair.objnum = 0;
99     ByteStream::byte rc = 0;
100 
101     /** @Bug 217 */
102     /** @Bug 225 */
103     try
104     {
105         roPair = systemCatalogPtr->tableRID(tableName);
106     }
107     catch (IDBExcept& ie)
108     {
109         // TODO: What is and is not an error here?
110         if (ie.errorCode() == ERR_DATA_OFFLINE)
111         {
112             //release transaction
113             fSessionManager.rolledback(txnID);
114             // Return the error for display to user
115             Message::Args args;
116             Message message(9);
117             args.add(ie.what());
118             message.format(args);
119             result.result = CREATE_ERROR;
120             result.message = message;
121             return result;
122         }
123         else if ( ie.errorCode() == ERR_TABLE_NOT_IN_CATALOG)
124         {
125             roPair.objnum = 0;
126         }
127         else //error out
128         {
129             //release transaction
130             fSessionManager.rolledback(txnID);
131             // Return the error for display to user
132             Message::Args args;
133             Message message(9);
134             args.add(ie.what());
135             message.format(args);
136             result.result = CREATE_ERROR;
137             result.message = message;
138             return result;
139         }
140     }
141     catch (std::exception& ex)  //error out
142     {
143         //release transaction
144         fSessionManager.rolledback(txnID);
145         // Return the error for display to user
146         Message::Args args;
147         Message message(9);
148         args.add(ex.what());
149         message.format(args);
150         result.result = CREATE_ERROR;
151         result.message = message;
152         return result;
153     }
154     catch (...) //error out
155     {
156         //release transaction
157         fSessionManager.rolledback(txnID);
158         // Return the error for display to user
159         Message::Args args;
160         Message message(9);
161         args.add("Unknown exception caught when checking if the table name is already in use.");
162         message.format(args);
163         result.result = CREATE_ERROR;
164         result.message = message;
165         return result;
166     }
167 
168     //This is a current db bug, it should not turn OID is it cannot find
169     if (roPair.objnum >= 3000)
170     {
171 #ifdef _MSC_VER
172         //FIXME: Why do we need to do this???
173         systemCatalogPtr->flushCache();
174 
175         try
176         {
177             roPair = systemCatalogPtr->tableRID(tableName);
178         }
179         catch (...)
180         {
181             roPair.objnum = 0;
182         }
183 
184         if (roPair.objnum < 3000)
185             goto keepGoing;
186 
187 #endif
188         Message::Args args;
189         Message message(9);
190         args.add("Internal create table error for");
191         args.add(tableName.toString());
192         args.add(": table already exists");
193         args.add("(your schema is probably out-of-sync)");
194         message.format(args);
195 
196         result.result = CREATE_ERROR;
197         result.message = message;
198         //release the transaction
199         fSessionManager.rolledback(txnID);
200         return result;
201     }
202 
203 #ifdef _MSC_VER
204 keepGoing:
205 #endif
206     // Start a new transaction
207     VERBOSE_INFO("Starting a new transaction");
208 
209     string stmt = createTableStmt.fSql + "|" + tableDef.fQualifiedName->fSchema + "|";
210     SQLLogger logger(stmt, fDDLLoggingId, createTableStmt.fSessionID, txnID.id);
211 
212 
213     std::string err;
214     execplan::ObjectIDManager fObjectIDManager;
215     OamCache* oamcache = OamCache::makeOamCache();
216     string errorMsg;
217     //get a unique number
218     uint64_t uniqueId = 0;
219 
220     //Bug 5070. Added exception handling
221     try
222     {
223         uniqueId = fDbrm->getUnique64();
224     }
225     catch (std::exception& ex)
226     {
227         Message::Args args;
228         Message message(9);
229         args.add(ex.what());
230         message.format(args);
231         result.result = CREATE_ERROR;
232         result.message = message;
233         fSessionManager.rolledback(txnID);
234         return result;
235     }
236     catch ( ... )
237     {
238         Message::Args args;
239         Message message(9);
240         args.add("Unknown error occured while getting unique number.");
241         message.format(args);
242         result.result = CREATE_ERROR;
243         result.message = message;
244         fSessionManager.rolledback(txnID);
245         return result;
246     }
247 
248     fWEClient->addQueue(uniqueId);
249 
250     try
251     {
252         //Allocate tableoid table identification
253         VERBOSE_INFO("Allocating object ID for table");
254         // Allocate a object ID for each column we are about to create
255         VERBOSE_INFO("Allocating object IDs for columns");
256         uint32_t numColumns = tableDef.fColumns.size();
257         uint32_t numDictCols = 0;
258 
259         for (unsigned i = 0; i < numColumns; i++)
260         {
261             int dataType;
262             dataType = convertDataType(tableDef.fColumns[i]->fType->fType);
263 
264             if ( (dataType == CalpontSystemCatalog::CHAR && tableDef.fColumns[i]->fType->fLength > 8) ||
265                     (dataType == CalpontSystemCatalog::VARCHAR && tableDef.fColumns[i]->fType->fLength > 7) ||
266                     (dataType == CalpontSystemCatalog::VARBINARY && tableDef.fColumns[i]->fType->fLength > 7) ||
267                     (dataType == CalpontSystemCatalog::BLOB && tableDef.fColumns[i]->fType->fLength > 7) ||
268                     (dataType == CalpontSystemCatalog::TEXT && tableDef.fColumns[i]->fType->fLength > 7) )
269                 numDictCols++;
270         }
271 
272         fStartingColOID = fObjectIDManager.allocOIDs(numColumns + numDictCols + 1); //include column, oids,dictionary oids and tableoid
273 #ifdef IDB_DDL_DEBUG
274         cout << fTxnid.id << " Create table allocOIDs got the starting oid " << fStartingColOID << endl;
275 #endif
276 
277         if (fStartingColOID < 0)
278         {
279             result.result = CREATE_ERROR;
280             errorMsg = "Error in getting objectid from oidmanager.";
281             Message::Args args;
282             Message message(9);
283             args.add("(1)Create table failed due to ");
284             args.add(errorMsg);
285             message.format(args);
286             result.message = message;
287             fSessionManager.rolledback(txnID);
288             return result;
289         }
290 
291         // Write the table metadata to the systemtable
292         VERBOSE_INFO("Writing meta data to SYSTABLE");
293         ByteStream bytestream;
294         bytestream << (ByteStream::byte)WE_SVR_WRITE_SYSTABLE;
295         bytestream << uniqueId;
296         bytestream << (uint32_t) createTableStmt.fSessionID;
297         bytestream << (uint32_t)txnID.id;
298         bytestream << (uint32_t)fStartingColOID;
299         bytestream << (uint32_t)createTableStmt.fTableWithAutoi;
300         uint16_t  dbRoot;
301         BRM::OID_t sysOid = 1001;
302         //Find out where systable is
303         rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot);
304 
305         if (rc != 0)
306         {
307             result.result = (ResultCode) rc;
308             Message::Args args;
309             Message message(9);
310             args.add("Error while calling getSysCatDBRoot ");
311             args.add(errorMsg);
312             message.format(args);
313             result.message = message;
314             //release transaction
315             fSessionManager.rolledback(txnID);
316             return result;
317         }
318 
319         int pmNum = 1;
320         bytestream << (uint32_t)dbRoot;
321         tableDef.serialize(bytestream);
322         boost::shared_ptr<messageqcpp::ByteStream> bsIn;
323         boost::shared_ptr<std::map<int, int> > dbRootPMMap = oamcache->getDBRootToPMMap();
324         pmNum = (*dbRootPMMap)[dbRoot];
325         // MCOL-66 The DBRM can't handle concurrent DDL
326         boost::mutex::scoped_lock lk(dbrmMutex);
327 
328         try
329         {
330 #ifdef IDB_DDL_DEBUG
331             cout << fTxnid.id << " create table sending We_SVR_WRITE_SYSTABLE to pm " << pmNum << endl;
332 #endif
333             fWEClient->write(bytestream, (unsigned)pmNum);
334 
335             while (1)
336             {
337                 bsIn.reset(new ByteStream());
338                 fWEClient->read(uniqueId, bsIn);
339 
340                 if ( bsIn->length() == 0 ) //read error
341                 {
342                     rc = NETWORK_ERROR;
343                     errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
344                     break;
345                 }
346                 else
347                 {
348                     *bsIn >> rc;
349 
350                     if (rc != 0)
351                     {
352                         errorMsg.clear();
353                         *bsIn >> errorMsg;
354 #ifdef IDB_DDL_DEBUG
355                         cout << fTxnid.id << "Create table We_SVR_WRITE_CREATETABLEFILES: " << errorMsg << endl;
356 #endif
357                     }
358 
359                     break;
360                 }
361             }
362         }
363         catch (runtime_error& ex) //write error
364         {
365 #ifdef IDB_DDL_DEBUG
366             cout << fTxnid.id << " create table got exception" << ex.what() << endl;
367 #endif
368             rc = NETWORK_ERROR;
369             errorMsg = ex.what();
370         }
371         catch (...)
372         {
373             rc = NETWORK_ERROR;
374 #ifdef IDB_DDL_DEBUG
375             cout << "create table got unknown exception" << endl;
376 #endif
377         }
378 
379         if (rc != 0)
380         {
381 #ifdef IDB_DDL_DEBUG
382             cout << fTxnid.id << " Create table We_SVR_WRITE_CREATETABLEFILES: " << errorMsg << endl;
383 #endif
384             result.result = (ResultCode) rc;
385             Message::Args args;
386             Message message(9);
387             args.add("(2)Create table failed due to ");
388             args.add(errorMsg);
389             message.format( args );
390             result.message = message;
391 
392             if (rc != NETWORK_ERROR)
393             {
394                 rollBackTransaction( uniqueId, txnID, createTableStmt.fSessionID );	//What to do with the error code
395             }
396 
397             //release transaction
398             fSessionManager.rolledback(txnID);
399             return result;
400         }
401 
402         VERBOSE_INFO("Writing meta data to SYSCOLUMN");
403         bytestream.restart();
404         bytestream << (ByteStream::byte)WE_SVR_WRITE_CREATE_SYSCOLUMN;
405         bytestream << uniqueId;
406         bytestream << (uint32_t) createTableStmt.fSessionID;
407         bytestream << (uint32_t)txnID.id;
408         bytestream << numColumns;
409 
410         for (unsigned i = 0; i < numColumns; ++i)
411         {
412             bytestream << (uint32_t)(fStartingColOID + i + 1);
413         }
414 
415         bytestream << numDictCols;
416 
417         for (unsigned i = 0; i < numDictCols; ++i)
418         {
419             bytestream << (uint32_t)(fStartingColOID + numColumns + i + 1);
420         }
421 
422         uint8_t alterFlag = 0;
423         int colPos = 0;
424         bytestream << (ByteStream::byte)alterFlag;
425         bytestream << (uint32_t)colPos;
426 
427         sysOid = 1021;
428         //Find out where syscolumn is
429         rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot);
430 
431         if (rc != 0)
432         {
433             result.result = (ResultCode) rc;
434             Message::Args args;
435             Message message(9);
436             args.add("Error while calling getSysCatDBRoot ");
437             args.add(errorMsg);
438             message.format(args);
439             result.message = message;
440             //release transaction
441             fSessionManager.rolledback(txnID);
442             return result;
443         }
444 
445         bytestream << (uint32_t)dbRoot;
446         tableDef.serialize(bytestream);
447         pmNum = (*dbRootPMMap)[dbRoot];
448 
449         try
450         {
451 #ifdef IDB_DDL_DEBUG
452             cout << fTxnid.id << " create table sending WE_SVR_WRITE_CREATE_SYSCOLUMN to pm " << pmNum << endl;
453 #endif
454             fWEClient->write(bytestream, (uint32_t)pmNum);
455 
456             while (1)
457             {
458                 bsIn.reset(new ByteStream());
459                 fWEClient->read(uniqueId, bsIn);
460 
461                 if ( bsIn->length() == 0 ) //read error
462                 {
463                     rc = NETWORK_ERROR;
464                     errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
465                     break;
466                 }
467                 else
468                 {
469                     *bsIn >> rc;
470 
471                     if (rc != 0)
472                     {
473                         errorMsg.clear();
474                         *bsIn >> errorMsg;
475 #ifdef IDB_DDL_DEBUG
476                         cout << fTxnid.id << "Create table We_SVR_WRITE_CREATETABLEFILES: " << errorMsg << endl;
477 #endif
478                     }
479 
480                     break;
481                 }
482             }
483         }
484         catch (runtime_error& ex) //write error
485         {
486 #ifdef IDB_DDL_DEBUG
487             cout << fTxnid.id << " create table got exception" << ex.what() << endl;
488 #endif
489             rc = NETWORK_ERROR;
490             errorMsg = ex.what();
491         }
492         catch (...)
493         {
494             rc = NETWORK_ERROR;
495 #ifdef IDB_DDL_DEBUG
496             cout << fTxnid.id << " create table got unknown exception" << endl;
497 #endif
498         }
499 
500         if (rc != 0)
501         {
502 #ifdef IDB_DDL_DEBUG
503             cout << fTxnid.id << " Create table WE_SVR_WRITE_CREATE_SYSCOLUMN: " << errorMsg << endl;
504 #endif
505             result.result = (ResultCode) rc;
506             Message::Args args;
507             Message message(9);
508             args.add("(3)Create table failed due to ");
509             args.add(errorMsg);
510             message.format( args );
511             result.message = message;
512 
513             if (rc != NETWORK_ERROR)
514             {
515                 rollBackTransaction( uniqueId, txnID, createTableStmt.fSessionID );	//What to do with the error code
516             }
517 
518             //release transaction
519             fSessionManager.rolledback(txnID);
520             return result;
521         }
522 
523 
524         //Get the number of tables in the database, the current table is included.
525         int tableCount = systemCatalogPtr->getTableCount();
526 
527         //Calculate which dbroot the columns should start
528         DBRootConfigList dbRootList = oamcache->getDBRootNums();
529 
530         uint16_t useDBRootIndex = tableCount % dbRootList.size();
531         //Find out the dbroot# corresponding the useDBRootIndex from oam
532         uint16_t useDBRoot = dbRootList[useDBRootIndex];
533 
534         VERBOSE_INFO("Creating column files");
535         ColumnDef* colDefPtr;
536         ddlpackage::ColumnDefList tableDefCols = tableDef.fColumns;
537         ColumnDefList::const_iterator iter = tableDefCols.begin();
538         bytestream.restart();
539         bytestream << (ByteStream::byte)WE_SVR_WRITE_CREATETABLEFILES;
540         bytestream << uniqueId;
541         bytestream << (uint32_t)txnID.id;
542         bytestream << (numColumns + numDictCols);
543         unsigned colNum = 0;
544         unsigned dictNum = 0;
545 
546         while (iter != tableDefCols.end())
547         {
548             colDefPtr = *iter;
549 
550             CalpontSystemCatalog::ColDataType dataType = convertDataType(colDefPtr->fType->fType);
551 
552             if (dataType == CalpontSystemCatalog::DECIMAL ||
553                     dataType == CalpontSystemCatalog::UDECIMAL)
554             {
555                 if (colDefPtr->fType->fPrecision == -1 || colDefPtr->fType->fPrecision == 0)
556                 {
557                     colDefPtr->fType->fLength = 8;
558                 }
559                 else if ((colDefPtr->fType->fPrecision > 0) && (colDefPtr->fType->fPrecision < 3))
560                 {
561                     colDefPtr->fType->fLength = 1;
562                 }
563 
564                 else if (colDefPtr->fType->fPrecision < 5 && (colDefPtr->fType->fPrecision > 2))
565                 {
566                     colDefPtr->fType->fLength = 2;
567                 }
568                 else if (colDefPtr->fType->fPrecision > 4 && colDefPtr->fType->fPrecision < 10)
569                 {
570                     colDefPtr->fType->fLength = 4;
571                 }
572                 else if (colDefPtr->fType->fPrecision > 9 && colDefPtr->fType->fPrecision < 19)
573                 {
574                     colDefPtr->fType->fLength = 8;
575                 }
576             }
577 
578             bytestream << (fStartingColOID + (colNum++) + 1);
579             bytestream << (uint8_t) dataType;
580             bytestream << (uint8_t) false;
581 
582             bytestream << (uint32_t) colDefPtr->fType->fLength;
583             bytestream << (uint16_t) useDBRoot;
584             bytestream << (uint32_t) colDefPtr->fType->fCompressiontype;
585 
586             if ( (dataType == CalpontSystemCatalog::CHAR && colDefPtr->fType->fLength > 8) ||
587                     (dataType == CalpontSystemCatalog::VARCHAR && colDefPtr->fType->fLength > 7) ||
588                     (dataType == CalpontSystemCatalog::VARBINARY && colDefPtr->fType->fLength > 7) ||
589                     (dataType == CalpontSystemCatalog::BLOB && colDefPtr->fType->fLength > 7) ||
590                     (dataType == CalpontSystemCatalog::TEXT && colDefPtr->fType->fLength > 7) )
591             {
592                 bytestream << (uint32_t) (fStartingColOID + numColumns + (dictNum++) + 1);
593                 bytestream << (uint8_t) dataType;
594                 bytestream << (uint8_t) true;
595                 bytestream << (uint32_t) colDefPtr->fType->fLength;
596                 bytestream << (uint16_t) useDBRoot;
597                 bytestream << (uint32_t) colDefPtr->fType->fCompressiontype;
598             }
599 
600             ++iter;
601         }
602 
603         //@Bug 4176. save oids to a log file for cleanup after fail over.
604         std::vector <CalpontSystemCatalog::OID> oidList;
605 
606         for (unsigned i = 0; i < numColumns; ++i)
607         {
608             oidList.push_back(fStartingColOID + i + 1);
609         }
610 
611         bytestream << numDictCols;
612 
613         for (unsigned i = 0; i < numDictCols; ++i)
614         {
615             oidList.push_back(fStartingColOID + numColumns + i + 1);
616         }
617 
618         try
619         {
620             createWriteDropLogFile( fStartingColOID, uniqueId, oidList );
621         }
622         catch (std::exception& ex)
623         {
624             result.result = (ResultCode) rc;
625             Message::Args args;
626             Message message(9);
627             args.add("(4)Create table failed due to ");
628             args.add(ex.what());
629             message.format( args );
630             result.message = message;
631 
632             if (rc != NETWORK_ERROR)
633             {
634                 rollBackTransaction( uniqueId, txnID, createTableStmt.fSessionID );	//What to do with the error code
635             }
636 
637             //release transaction
638             fSessionManager.rolledback(txnID);
639             return result;
640         }
641 
642         pmNum = (*dbRootPMMap)[useDBRoot];
643 
644         try
645         {
646 #ifdef IDB_DDL_DEBUG
647             cout << fTxnid.id << " create table sending WE_SVR_WRITE_CREATETABLEFILES to pm " << pmNum << endl;
648 #endif
649             fWEClient->write(bytestream, pmNum);
650 
651             while (1)
652             {
653                 bsIn.reset(new ByteStream());
654                 fWEClient->read(uniqueId, bsIn);
655 
656                 if ( bsIn->length() == 0 ) //read error
657                 {
658                     rc = NETWORK_ERROR;
659                     errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
660                     break;
661                 }
662                 else
663                 {
664                     *bsIn >> rc;
665 
666                     if (rc != 0)
667                     {
668                         errorMsg.clear();
669                         *bsIn >> errorMsg;
670 #ifdef IDB_DDL_DEBUG
671                         cout << "Create table We_SVR_WRITE_CREATETABLEFILES: " << errorMsg << endl;
672 #endif
673                     }
674 
675                     break;
676                 }
677             }
678 
679             if (rc != 0)
680             {
681                 //drop the newly created files
682                 bytestream.restart();
683                 bytestream << (ByteStream::byte) WE_SVR_WRITE_DROPFILES;
684                 bytestream << uniqueId;
685                 bytestream << (uint32_t)(numColumns + numDictCols);
686 
687                 for (unsigned i = 0; i < (numColumns + numDictCols); i++)
688                 {
689                     bytestream << (uint32_t)(fStartingColOID + i + 1);
690                 }
691 
692                 fWEClient->write(bytestream, pmNum);
693 
694                 while (1)
695                 {
696                     bsIn.reset(new ByteStream());
697                     fWEClient->read(uniqueId, bsIn);
698 
699                     if ( bsIn->length() == 0 ) //read error
700                     {
701                         break;
702                     }
703                     else
704                     {
705                         break;
706                     }
707                 }
708 
709                 //@Bug 5464. Delete from extent map.
710                 fDbrm->deleteOIDs(oidList);
711 
712             }
713         }
714         catch (runtime_error&)
715         {
716             errorMsg = "Lost connection to Write Engine Server";
717         }
718 
719         if (rc != 0)
720         {
721 #ifdef IDB_DDL_DEBUG
722             cout << fTxnid.id << " Create table We_SVR_WRITE_CREATETABLEFILES: " << errorMsg << endl;
723 #endif
724             rollBackTransaction( uniqueId, txnID, createTableStmt.fSessionID); //What to do with the error code
725             fSessionManager.rolledback(txnID);
726         }
727         else
728         {
729             commitTransaction(uniqueId, txnID);
730             fSessionManager.committed(txnID);
731             fWEClient->removeQueue(uniqueId);
732             deleteLogFile(DROPTABLE_LOG, fStartingColOID, uniqueId);
733         }
734 
735         // Log the DDL statement.
736         logDDL(createTableStmt.fSessionID, txnID.id, createTableStmt.fSql, createTableStmt.fOwner);
737     }
738     catch (std::exception& ex)
739     {
740         result.result = CREATE_ERROR;
741         Message::Args args;
742         Message message(9);
743         args.add("(5)Create table failed due to ");
744         args.add(ex.what());
745         message.format( args );
746         result.message = message;
747         fSessionManager.rolledback(txnID);
748         fWEClient->removeQueue(uniqueId);
749         return result;
750     }
751 
752     //fWEClient->removeQueue(uniqueId);
753     if (rc != 0)
754     {
755         result.result = CREATE_ERROR;
756         Message::Args args;
757         Message message(9);
758         args.add("(6)Create table failed due to ");
759         args.add(errorMsg);
760         message.format( args );
761         result.message = message;
762     }
763 
764     return result;
765 }
766 
rollBackCreateTable(const string & error,BRM::TxnID txnID,int sessionId,ddlpackage::TableDef & tableDef,DDLResult & result)767 void CreateTableProcessor::rollBackCreateTable(const string& error, BRM::TxnID txnID, int sessionId,
768         ddlpackage::TableDef& tableDef, DDLResult& result)
769 {
770     cerr << "CreatetableProcessor::processPackage: " << error << endl;
771 
772     Message::Args args;
773     Message message(1);
774     args.add("(7)Create table Failed: ");
775     args.add(error);
776     args.add("");
777     args.add("");
778     message.format(args);
779 
780     result.result = CREATE_ERROR;
781     result.message = message;
782 
783     fWriteEngine.rollbackTran(txnID.id, sessionId);
784 
785     size_t size = tableDef.fColumns.size();
786 
787     for (size_t i = 0; i < size; ++i)
788     {
789         fWriteEngine.dropColumn(txnID.id, fStartingColOID + i);
790     }
791 
792     try
793     {
794         execplan::ObjectIDManager fObjectIDManager;
795         fObjectIDManager.returnOID(fTableOID);
796         fObjectIDManager.returnOIDs(fStartingColOID,
797                                     fStartingColOID + tableDef.fColumns.size() - 1);
798     }
799     catch (std::exception& ex)
800     {
801         Message::Args args;
802         Message message(6);
803         args.add(ex.what());
804         message.format(args);
805         result.message = message;
806         result.result = CREATE_ERROR;
807     }
808     catch (...)
809     {
810         Message::Args args;
811         Message message(6);
812         args.add("Unknown exception");
813         message.format(args);
814         result.message = message;
815         result.result = CREATE_ERROR;
816         //cout << "returnOIDs error" << endl;
817     }
818 
819     DictionaryOIDList::const_iterator dictoid_iter = fDictionaryOIDList.begin();
820 
821     while (dictoid_iter != fDictionaryOIDList.end())
822     {
823         DictOID dictOID = *dictoid_iter;
824         fWriteEngine.dropDctnry(txnID.id, dictOID.dictOID, dictOID.treeOID, dictOID.listOID);
825         //fObjectIDManager.returnOID(dictOID.dictOID);
826 
827         ++dictoid_iter;
828     }
829 
830     fSessionManager.rolledback(txnID);
831 }
832 
833 } // namespace ddlpackageprocessor
834 // vim:ts=4 sw=4:
835