1 /* Copyright (C) 2014 InfiniDB, Inc.
2 Copyright (C) 2016 MariaDB Corporation
3
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 as published by the Free Software Foundation; version 2 of
7 the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17 MA 02110-1301, USA. */
18
19 // $Id: createtableprocessor.cpp 9627 2013-06-18 13:59:21Z rdempsey $
20
21 #include <unistd.h>
22 #include <string>
23 using namespace std;
24
25 #include <boost/algorithm/string/case_conv.hpp>
26
27 #include "createtableprocessor.h"
28
29 #include "ddlpkg.h"
30 using namespace ddlpackage;
31
32 #include "we_messages.h"
33 using namespace WriteEngine;
34
35 #include "oamcache.h"
36 using namespace oam;
37
38 #include "bytestream.h"
39 using namespace messageqcpp;
40
41 #include "calpontsystemcatalog.h"
42 using namespace execplan;
43
44 #include "sqllogger.h"
45 #include "messagelog.h"
46 using namespace logging;
47
48 namespace ddlpackageprocessor
49 {
50
processPackage(ddlpackage::CreateTableStatement & createTableStmt)51 CreateTableProcessor::DDLResult CreateTableProcessor::processPackage(
52 ddlpackage::CreateTableStatement& createTableStmt)
53 {
54 SUMMARY_INFO("CreateTableProcessor::processPackage");
55
56 DDLResult result;
57 BRM::TxnID txnID;
58 txnID.id = fTxnid.id;
59 txnID.valid = fTxnid.valid;
60 result.result = NO_ERROR;
61 int rc1 = 0;
62 rc1 = fDbrm->isReadWrite();
63
64 if (rc1 != 0 )
65 {
66 Message::Args args;
67 Message message(9);
68 args.add("Unable to execute the statement due to DBRM is read only");
69 message.format(args);
70 result.result = CREATE_ERROR;
71 result.message = message;
72 fSessionManager.rolledback(txnID);
73 return result;
74 }
75
76 DETAIL_INFO(createTableStmt);
77 ddlpackage::TableDef& tableDef = *createTableStmt.fTableDef;
78 //If schema = CALPONTSYS, do not create table
79
80 if (tableDef.fQualifiedName->fSchema == CALPONT_SCHEMA)
81 {
82 //release the transaction
83 fSessionManager.rolledback(txnID);
84 return result;
85 }
86
87 // Commit current transaction.
88 // all DDL statements cause an implicut commit
89 VERBOSE_INFO("Getting current txnID");
90
91 //Check whether the table is existed already
92 boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
93 CalpontSystemCatalog::makeCalpontSystemCatalog(createTableStmt.fSessionID);
94 execplan::CalpontSystemCatalog::TableName tableName;
95 tableName.schema = tableDef.fQualifiedName->fSchema;
96 tableName.table = tableDef.fQualifiedName->fName;
97 execplan::CalpontSystemCatalog::ROPair roPair;
98 roPair.objnum = 0;
99 ByteStream::byte rc = 0;
100
101 /** @Bug 217 */
102 /** @Bug 225 */
103 try
104 {
105 roPair = systemCatalogPtr->tableRID(tableName);
106 }
107 catch (IDBExcept& ie)
108 {
109 // TODO: What is and is not an error here?
110 if (ie.errorCode() == ERR_DATA_OFFLINE)
111 {
112 //release transaction
113 fSessionManager.rolledback(txnID);
114 // Return the error for display to user
115 Message::Args args;
116 Message message(9);
117 args.add(ie.what());
118 message.format(args);
119 result.result = CREATE_ERROR;
120 result.message = message;
121 return result;
122 }
123 else if ( ie.errorCode() == ERR_TABLE_NOT_IN_CATALOG)
124 {
125 roPair.objnum = 0;
126 }
127 else //error out
128 {
129 //release transaction
130 fSessionManager.rolledback(txnID);
131 // Return the error for display to user
132 Message::Args args;
133 Message message(9);
134 args.add(ie.what());
135 message.format(args);
136 result.result = CREATE_ERROR;
137 result.message = message;
138 return result;
139 }
140 }
141 catch (std::exception& ex) //error out
142 {
143 //release transaction
144 fSessionManager.rolledback(txnID);
145 // Return the error for display to user
146 Message::Args args;
147 Message message(9);
148 args.add(ex.what());
149 message.format(args);
150 result.result = CREATE_ERROR;
151 result.message = message;
152 return result;
153 }
154 catch (...) //error out
155 {
156 //release transaction
157 fSessionManager.rolledback(txnID);
158 // Return the error for display to user
159 Message::Args args;
160 Message message(9);
161 args.add("Unknown exception caught when checking if the table name is already in use.");
162 message.format(args);
163 result.result = CREATE_ERROR;
164 result.message = message;
165 return result;
166 }
167
168 //This is a current db bug, it should not turn OID is it cannot find
169 if (roPair.objnum >= 3000)
170 {
171 #ifdef _MSC_VER
172 //FIXME: Why do we need to do this???
173 systemCatalogPtr->flushCache();
174
175 try
176 {
177 roPair = systemCatalogPtr->tableRID(tableName);
178 }
179 catch (...)
180 {
181 roPair.objnum = 0;
182 }
183
184 if (roPair.objnum < 3000)
185 goto keepGoing;
186
187 #endif
188 Message::Args args;
189 Message message(9);
190 args.add("Internal create table error for");
191 args.add(tableName.toString());
192 args.add(": table already exists");
193 args.add("(your schema is probably out-of-sync)");
194 message.format(args);
195
196 result.result = CREATE_ERROR;
197 result.message = message;
198 //release the transaction
199 fSessionManager.rolledback(txnID);
200 return result;
201 }
202
203 #ifdef _MSC_VER
204 keepGoing:
205 #endif
206 // Start a new transaction
207 VERBOSE_INFO("Starting a new transaction");
208
209 string stmt = createTableStmt.fSql + "|" + tableDef.fQualifiedName->fSchema + "|";
210 SQLLogger logger(stmt, fDDLLoggingId, createTableStmt.fSessionID, txnID.id);
211
212
213 std::string err;
214 execplan::ObjectIDManager fObjectIDManager;
215 OamCache* oamcache = OamCache::makeOamCache();
216 string errorMsg;
217 //get a unique number
218 uint64_t uniqueId = 0;
219
220 //Bug 5070. Added exception handling
221 try
222 {
223 uniqueId = fDbrm->getUnique64();
224 }
225 catch (std::exception& ex)
226 {
227 Message::Args args;
228 Message message(9);
229 args.add(ex.what());
230 message.format(args);
231 result.result = CREATE_ERROR;
232 result.message = message;
233 fSessionManager.rolledback(txnID);
234 return result;
235 }
236 catch ( ... )
237 {
238 Message::Args args;
239 Message message(9);
240 args.add("Unknown error occured while getting unique number.");
241 message.format(args);
242 result.result = CREATE_ERROR;
243 result.message = message;
244 fSessionManager.rolledback(txnID);
245 return result;
246 }
247
248 fWEClient->addQueue(uniqueId);
249
250 try
251 {
252 //Allocate tableoid table identification
253 VERBOSE_INFO("Allocating object ID for table");
254 // Allocate a object ID for each column we are about to create
255 VERBOSE_INFO("Allocating object IDs for columns");
256 uint32_t numColumns = tableDef.fColumns.size();
257 uint32_t numDictCols = 0;
258
259 for (unsigned i = 0; i < numColumns; i++)
260 {
261 int dataType;
262 dataType = convertDataType(tableDef.fColumns[i]->fType->fType);
263
264 if ( (dataType == CalpontSystemCatalog::CHAR && tableDef.fColumns[i]->fType->fLength > 8) ||
265 (dataType == CalpontSystemCatalog::VARCHAR && tableDef.fColumns[i]->fType->fLength > 7) ||
266 (dataType == CalpontSystemCatalog::VARBINARY && tableDef.fColumns[i]->fType->fLength > 7) ||
267 (dataType == CalpontSystemCatalog::BLOB && tableDef.fColumns[i]->fType->fLength > 7) ||
268 (dataType == CalpontSystemCatalog::TEXT && tableDef.fColumns[i]->fType->fLength > 7) )
269 numDictCols++;
270 }
271
272 fStartingColOID = fObjectIDManager.allocOIDs(numColumns + numDictCols + 1); //include column, oids,dictionary oids and tableoid
273 #ifdef IDB_DDL_DEBUG
274 cout << fTxnid.id << " Create table allocOIDs got the starting oid " << fStartingColOID << endl;
275 #endif
276
277 if (fStartingColOID < 0)
278 {
279 result.result = CREATE_ERROR;
280 errorMsg = "Error in getting objectid from oidmanager.";
281 Message::Args args;
282 Message message(9);
283 args.add("(1)Create table failed due to ");
284 args.add(errorMsg);
285 message.format(args);
286 result.message = message;
287 fSessionManager.rolledback(txnID);
288 return result;
289 }
290
291 // Write the table metadata to the systemtable
292 VERBOSE_INFO("Writing meta data to SYSTABLE");
293 ByteStream bytestream;
294 bytestream << (ByteStream::byte)WE_SVR_WRITE_SYSTABLE;
295 bytestream << uniqueId;
296 bytestream << (uint32_t) createTableStmt.fSessionID;
297 bytestream << (uint32_t)txnID.id;
298 bytestream << (uint32_t)fStartingColOID;
299 bytestream << (uint32_t)createTableStmt.fTableWithAutoi;
300 uint16_t dbRoot;
301 BRM::OID_t sysOid = 1001;
302 //Find out where systable is
303 rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot);
304
305 if (rc != 0)
306 {
307 result.result = (ResultCode) rc;
308 Message::Args args;
309 Message message(9);
310 args.add("Error while calling getSysCatDBRoot ");
311 args.add(errorMsg);
312 message.format(args);
313 result.message = message;
314 //release transaction
315 fSessionManager.rolledback(txnID);
316 return result;
317 }
318
319 int pmNum = 1;
320 bytestream << (uint32_t)dbRoot;
321 tableDef.serialize(bytestream);
322 boost::shared_ptr<messageqcpp::ByteStream> bsIn;
323 boost::shared_ptr<std::map<int, int> > dbRootPMMap = oamcache->getDBRootToPMMap();
324 pmNum = (*dbRootPMMap)[dbRoot];
325 // MCOL-66 The DBRM can't handle concurrent DDL
326 boost::mutex::scoped_lock lk(dbrmMutex);
327
328 try
329 {
330 #ifdef IDB_DDL_DEBUG
331 cout << fTxnid.id << " create table sending We_SVR_WRITE_SYSTABLE to pm " << pmNum << endl;
332 #endif
333 fWEClient->write(bytestream, (unsigned)pmNum);
334
335 while (1)
336 {
337 bsIn.reset(new ByteStream());
338 fWEClient->read(uniqueId, bsIn);
339
340 if ( bsIn->length() == 0 ) //read error
341 {
342 rc = NETWORK_ERROR;
343 errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
344 break;
345 }
346 else
347 {
348 *bsIn >> rc;
349
350 if (rc != 0)
351 {
352 errorMsg.clear();
353 *bsIn >> errorMsg;
354 #ifdef IDB_DDL_DEBUG
355 cout << fTxnid.id << "Create table We_SVR_WRITE_CREATETABLEFILES: " << errorMsg << endl;
356 #endif
357 }
358
359 break;
360 }
361 }
362 }
363 catch (runtime_error& ex) //write error
364 {
365 #ifdef IDB_DDL_DEBUG
366 cout << fTxnid.id << " create table got exception" << ex.what() << endl;
367 #endif
368 rc = NETWORK_ERROR;
369 errorMsg = ex.what();
370 }
371 catch (...)
372 {
373 rc = NETWORK_ERROR;
374 #ifdef IDB_DDL_DEBUG
375 cout << "create table got unknown exception" << endl;
376 #endif
377 }
378
379 if (rc != 0)
380 {
381 #ifdef IDB_DDL_DEBUG
382 cout << fTxnid.id << " Create table We_SVR_WRITE_CREATETABLEFILES: " << errorMsg << endl;
383 #endif
384 result.result = (ResultCode) rc;
385 Message::Args args;
386 Message message(9);
387 args.add("(2)Create table failed due to ");
388 args.add(errorMsg);
389 message.format( args );
390 result.message = message;
391
392 if (rc != NETWORK_ERROR)
393 {
394 rollBackTransaction( uniqueId, txnID, createTableStmt.fSessionID ); //What to do with the error code
395 }
396
397 //release transaction
398 fSessionManager.rolledback(txnID);
399 return result;
400 }
401
402 VERBOSE_INFO("Writing meta data to SYSCOLUMN");
403 bytestream.restart();
404 bytestream << (ByteStream::byte)WE_SVR_WRITE_CREATE_SYSCOLUMN;
405 bytestream << uniqueId;
406 bytestream << (uint32_t) createTableStmt.fSessionID;
407 bytestream << (uint32_t)txnID.id;
408 bytestream << numColumns;
409
410 for (unsigned i = 0; i < numColumns; ++i)
411 {
412 bytestream << (uint32_t)(fStartingColOID + i + 1);
413 }
414
415 bytestream << numDictCols;
416
417 for (unsigned i = 0; i < numDictCols; ++i)
418 {
419 bytestream << (uint32_t)(fStartingColOID + numColumns + i + 1);
420 }
421
422 uint8_t alterFlag = 0;
423 int colPos = 0;
424 bytestream << (ByteStream::byte)alterFlag;
425 bytestream << (uint32_t)colPos;
426
427 sysOid = 1021;
428 //Find out where syscolumn is
429 rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot);
430
431 if (rc != 0)
432 {
433 result.result = (ResultCode) rc;
434 Message::Args args;
435 Message message(9);
436 args.add("Error while calling getSysCatDBRoot ");
437 args.add(errorMsg);
438 message.format(args);
439 result.message = message;
440 //release transaction
441 fSessionManager.rolledback(txnID);
442 return result;
443 }
444
445 bytestream << (uint32_t)dbRoot;
446 tableDef.serialize(bytestream);
447 pmNum = (*dbRootPMMap)[dbRoot];
448
449 try
450 {
451 #ifdef IDB_DDL_DEBUG
452 cout << fTxnid.id << " create table sending WE_SVR_WRITE_CREATE_SYSCOLUMN to pm " << pmNum << endl;
453 #endif
454 fWEClient->write(bytestream, (uint32_t)pmNum);
455
456 while (1)
457 {
458 bsIn.reset(new ByteStream());
459 fWEClient->read(uniqueId, bsIn);
460
461 if ( bsIn->length() == 0 ) //read error
462 {
463 rc = NETWORK_ERROR;
464 errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
465 break;
466 }
467 else
468 {
469 *bsIn >> rc;
470
471 if (rc != 0)
472 {
473 errorMsg.clear();
474 *bsIn >> errorMsg;
475 #ifdef IDB_DDL_DEBUG
476 cout << fTxnid.id << "Create table We_SVR_WRITE_CREATETABLEFILES: " << errorMsg << endl;
477 #endif
478 }
479
480 break;
481 }
482 }
483 }
484 catch (runtime_error& ex) //write error
485 {
486 #ifdef IDB_DDL_DEBUG
487 cout << fTxnid.id << " create table got exception" << ex.what() << endl;
488 #endif
489 rc = NETWORK_ERROR;
490 errorMsg = ex.what();
491 }
492 catch (...)
493 {
494 rc = NETWORK_ERROR;
495 #ifdef IDB_DDL_DEBUG
496 cout << fTxnid.id << " create table got unknown exception" << endl;
497 #endif
498 }
499
500 if (rc != 0)
501 {
502 #ifdef IDB_DDL_DEBUG
503 cout << fTxnid.id << " Create table WE_SVR_WRITE_CREATE_SYSCOLUMN: " << errorMsg << endl;
504 #endif
505 result.result = (ResultCode) rc;
506 Message::Args args;
507 Message message(9);
508 args.add("(3)Create table failed due to ");
509 args.add(errorMsg);
510 message.format( args );
511 result.message = message;
512
513 if (rc != NETWORK_ERROR)
514 {
515 rollBackTransaction( uniqueId, txnID, createTableStmt.fSessionID ); //What to do with the error code
516 }
517
518 //release transaction
519 fSessionManager.rolledback(txnID);
520 return result;
521 }
522
523
524 //Get the number of tables in the database, the current table is included.
525 int tableCount = systemCatalogPtr->getTableCount();
526
527 //Calculate which dbroot the columns should start
528 DBRootConfigList dbRootList = oamcache->getDBRootNums();
529
530 uint16_t useDBRootIndex = tableCount % dbRootList.size();
531 //Find out the dbroot# corresponding the useDBRootIndex from oam
532 uint16_t useDBRoot = dbRootList[useDBRootIndex];
533
534 VERBOSE_INFO("Creating column files");
535 ColumnDef* colDefPtr;
536 ddlpackage::ColumnDefList tableDefCols = tableDef.fColumns;
537 ColumnDefList::const_iterator iter = tableDefCols.begin();
538 bytestream.restart();
539 bytestream << (ByteStream::byte)WE_SVR_WRITE_CREATETABLEFILES;
540 bytestream << uniqueId;
541 bytestream << (uint32_t)txnID.id;
542 bytestream << (numColumns + numDictCols);
543 unsigned colNum = 0;
544 unsigned dictNum = 0;
545
546 while (iter != tableDefCols.end())
547 {
548 colDefPtr = *iter;
549
550 CalpontSystemCatalog::ColDataType dataType = convertDataType(colDefPtr->fType->fType);
551
552 if (dataType == CalpontSystemCatalog::DECIMAL ||
553 dataType == CalpontSystemCatalog::UDECIMAL)
554 {
555 if (colDefPtr->fType->fPrecision == -1 || colDefPtr->fType->fPrecision == 0)
556 {
557 colDefPtr->fType->fLength = 8;
558 }
559 else if ((colDefPtr->fType->fPrecision > 0) && (colDefPtr->fType->fPrecision < 3))
560 {
561 colDefPtr->fType->fLength = 1;
562 }
563
564 else if (colDefPtr->fType->fPrecision < 5 && (colDefPtr->fType->fPrecision > 2))
565 {
566 colDefPtr->fType->fLength = 2;
567 }
568 else if (colDefPtr->fType->fPrecision > 4 && colDefPtr->fType->fPrecision < 10)
569 {
570 colDefPtr->fType->fLength = 4;
571 }
572 else if (colDefPtr->fType->fPrecision > 9 && colDefPtr->fType->fPrecision < 19)
573 {
574 colDefPtr->fType->fLength = 8;
575 }
576 }
577
578 bytestream << (fStartingColOID + (colNum++) + 1);
579 bytestream << (uint8_t) dataType;
580 bytestream << (uint8_t) false;
581
582 bytestream << (uint32_t) colDefPtr->fType->fLength;
583 bytestream << (uint16_t) useDBRoot;
584 bytestream << (uint32_t) colDefPtr->fType->fCompressiontype;
585
586 if ( (dataType == CalpontSystemCatalog::CHAR && colDefPtr->fType->fLength > 8) ||
587 (dataType == CalpontSystemCatalog::VARCHAR && colDefPtr->fType->fLength > 7) ||
588 (dataType == CalpontSystemCatalog::VARBINARY && colDefPtr->fType->fLength > 7) ||
589 (dataType == CalpontSystemCatalog::BLOB && colDefPtr->fType->fLength > 7) ||
590 (dataType == CalpontSystemCatalog::TEXT && colDefPtr->fType->fLength > 7) )
591 {
592 bytestream << (uint32_t) (fStartingColOID + numColumns + (dictNum++) + 1);
593 bytestream << (uint8_t) dataType;
594 bytestream << (uint8_t) true;
595 bytestream << (uint32_t) colDefPtr->fType->fLength;
596 bytestream << (uint16_t) useDBRoot;
597 bytestream << (uint32_t) colDefPtr->fType->fCompressiontype;
598 }
599
600 ++iter;
601 }
602
603 //@Bug 4176. save oids to a log file for cleanup after fail over.
604 std::vector <CalpontSystemCatalog::OID> oidList;
605
606 for (unsigned i = 0; i < numColumns; ++i)
607 {
608 oidList.push_back(fStartingColOID + i + 1);
609 }
610
611 bytestream << numDictCols;
612
613 for (unsigned i = 0; i < numDictCols; ++i)
614 {
615 oidList.push_back(fStartingColOID + numColumns + i + 1);
616 }
617
618 try
619 {
620 createWriteDropLogFile( fStartingColOID, uniqueId, oidList );
621 }
622 catch (std::exception& ex)
623 {
624 result.result = (ResultCode) rc;
625 Message::Args args;
626 Message message(9);
627 args.add("(4)Create table failed due to ");
628 args.add(ex.what());
629 message.format( args );
630 result.message = message;
631
632 if (rc != NETWORK_ERROR)
633 {
634 rollBackTransaction( uniqueId, txnID, createTableStmt.fSessionID ); //What to do with the error code
635 }
636
637 //release transaction
638 fSessionManager.rolledback(txnID);
639 return result;
640 }
641
642 pmNum = (*dbRootPMMap)[useDBRoot];
643
644 try
645 {
646 #ifdef IDB_DDL_DEBUG
647 cout << fTxnid.id << " create table sending WE_SVR_WRITE_CREATETABLEFILES to pm " << pmNum << endl;
648 #endif
649 fWEClient->write(bytestream, pmNum);
650
651 while (1)
652 {
653 bsIn.reset(new ByteStream());
654 fWEClient->read(uniqueId, bsIn);
655
656 if ( bsIn->length() == 0 ) //read error
657 {
658 rc = NETWORK_ERROR;
659 errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
660 break;
661 }
662 else
663 {
664 *bsIn >> rc;
665
666 if (rc != 0)
667 {
668 errorMsg.clear();
669 *bsIn >> errorMsg;
670 #ifdef IDB_DDL_DEBUG
671 cout << "Create table We_SVR_WRITE_CREATETABLEFILES: " << errorMsg << endl;
672 #endif
673 }
674
675 break;
676 }
677 }
678
679 if (rc != 0)
680 {
681 //drop the newly created files
682 bytestream.restart();
683 bytestream << (ByteStream::byte) WE_SVR_WRITE_DROPFILES;
684 bytestream << uniqueId;
685 bytestream << (uint32_t)(numColumns + numDictCols);
686
687 for (unsigned i = 0; i < (numColumns + numDictCols); i++)
688 {
689 bytestream << (uint32_t)(fStartingColOID + i + 1);
690 }
691
692 fWEClient->write(bytestream, pmNum);
693
694 while (1)
695 {
696 bsIn.reset(new ByteStream());
697 fWEClient->read(uniqueId, bsIn);
698
699 if ( bsIn->length() == 0 ) //read error
700 {
701 break;
702 }
703 else
704 {
705 break;
706 }
707 }
708
709 //@Bug 5464. Delete from extent map.
710 fDbrm->deleteOIDs(oidList);
711
712 }
713 }
714 catch (runtime_error&)
715 {
716 errorMsg = "Lost connection to Write Engine Server";
717 }
718
719 if (rc != 0)
720 {
721 #ifdef IDB_DDL_DEBUG
722 cout << fTxnid.id << " Create table We_SVR_WRITE_CREATETABLEFILES: " << errorMsg << endl;
723 #endif
724 rollBackTransaction( uniqueId, txnID, createTableStmt.fSessionID); //What to do with the error code
725 fSessionManager.rolledback(txnID);
726 }
727 else
728 {
729 commitTransaction(uniqueId, txnID);
730 fSessionManager.committed(txnID);
731 fWEClient->removeQueue(uniqueId);
732 deleteLogFile(DROPTABLE_LOG, fStartingColOID, uniqueId);
733 }
734
735 // Log the DDL statement.
736 logDDL(createTableStmt.fSessionID, txnID.id, createTableStmt.fSql, createTableStmt.fOwner);
737 }
738 catch (std::exception& ex)
739 {
740 result.result = CREATE_ERROR;
741 Message::Args args;
742 Message message(9);
743 args.add("(5)Create table failed due to ");
744 args.add(ex.what());
745 message.format( args );
746 result.message = message;
747 fSessionManager.rolledback(txnID);
748 fWEClient->removeQueue(uniqueId);
749 return result;
750 }
751
752 //fWEClient->removeQueue(uniqueId);
753 if (rc != 0)
754 {
755 result.result = CREATE_ERROR;
756 Message::Args args;
757 Message message(9);
758 args.add("(6)Create table failed due to ");
759 args.add(errorMsg);
760 message.format( args );
761 result.message = message;
762 }
763
764 return result;
765 }
766
rollBackCreateTable(const string & error,BRM::TxnID txnID,int sessionId,ddlpackage::TableDef & tableDef,DDLResult & result)767 void CreateTableProcessor::rollBackCreateTable(const string& error, BRM::TxnID txnID, int sessionId,
768 ddlpackage::TableDef& tableDef, DDLResult& result)
769 {
770 cerr << "CreatetableProcessor::processPackage: " << error << endl;
771
772 Message::Args args;
773 Message message(1);
774 args.add("(7)Create table Failed: ");
775 args.add(error);
776 args.add("");
777 args.add("");
778 message.format(args);
779
780 result.result = CREATE_ERROR;
781 result.message = message;
782
783 fWriteEngine.rollbackTran(txnID.id, sessionId);
784
785 size_t size = tableDef.fColumns.size();
786
787 for (size_t i = 0; i < size; ++i)
788 {
789 fWriteEngine.dropColumn(txnID.id, fStartingColOID + i);
790 }
791
792 try
793 {
794 execplan::ObjectIDManager fObjectIDManager;
795 fObjectIDManager.returnOID(fTableOID);
796 fObjectIDManager.returnOIDs(fStartingColOID,
797 fStartingColOID + tableDef.fColumns.size() - 1);
798 }
799 catch (std::exception& ex)
800 {
801 Message::Args args;
802 Message message(6);
803 args.add(ex.what());
804 message.format(args);
805 result.message = message;
806 result.result = CREATE_ERROR;
807 }
808 catch (...)
809 {
810 Message::Args args;
811 Message message(6);
812 args.add("Unknown exception");
813 message.format(args);
814 result.message = message;
815 result.result = CREATE_ERROR;
816 //cout << "returnOIDs error" << endl;
817 }
818
819 DictionaryOIDList::const_iterator dictoid_iter = fDictionaryOIDList.begin();
820
821 while (dictoid_iter != fDictionaryOIDList.end())
822 {
823 DictOID dictOID = *dictoid_iter;
824 fWriteEngine.dropDctnry(txnID.id, dictOID.dictOID, dictOID.treeOID, dictOID.listOID);
825 //fObjectIDManager.returnOID(dictOID.dictOID);
826
827 ++dictoid_iter;
828 }
829
830 fSessionManager.rolledback(txnID);
831 }
832
833 } // namespace ddlpackageprocessor
834 // vim:ts=4 sw=4:
835