1 /* Copyright (C) 2014 InfiniDB, Inc.
2 Copyright (C) 2016 MariaDB Corporation
3
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 as published by the Free Software Foundation; version 2 of
7 the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17 MA 02110-1301, USA. */
18
19 /***********************************************************************
20 * $Id: droptableprocessor.cpp 9744 2013-08-07 03:32:19Z bwilkinson $
21 *
22 *
23 ***********************************************************************/
24 #include <unistd.h>
25 #include <string>
26 #include <vector>
27 using namespace std;
28
29 #include "droptableprocessor.h"
30
31 #include "we_messages.h"
32 #include "we_ddlcommandclient.h"
33 using namespace WriteEngine;
34
35 #include "cacheutils.h"
36 using namespace cacheutils;
37
38 #include "bytestream.h"
39 using namespace messageqcpp;
40
41 #include "sqllogger.h"
42 #include "messagelog.h"
43 using namespace logging;
44
45 #include "calpontsystemcatalog.h"
46 using namespace execplan;
47
48 #include "oamcache.h"
49 using namespace oam;
50
51 namespace ddlpackageprocessor
52 {
53
processPackage(ddlpackage::DropTableStatement & dropTableStmt)54 DropTableProcessor::DDLResult DropTableProcessor::processPackage(ddlpackage::DropTableStatement& dropTableStmt)
55 {
56 SUMMARY_INFO("DropTableProcessor::processPackage");
57
58 DDLResult result;
59 result.result = NO_ERROR;
60 std::string err;
61 VERBOSE_INFO(dropTableStmt);
62
63 // Commit current transaction.
64 // all DDL statements cause an implicit commit
65 VERBOSE_INFO("Getting current txnID");
66 ByteStream::byte rc = 0;
67 BRM::TxnID txnID;
68 txnID.id = fTxnid.id;
69 txnID.valid = fTxnid.valid;
70 int rc1 = 0;
71 rc1 = fDbrm->isReadWrite();
72
73 if (rc1 != 0 )
74 {
75 Message::Args args;
76 Message message(9);
77 args.add("Unable to execute the statement due to DBRM is read only");
78 message.format(args);
79 result.result = DROP_ERROR;
80 result.message = message;
81 fSessionManager.rolledback(txnID);
82 return result;
83 }
84
85 string stmt = dropTableStmt.fSql + "|" + dropTableStmt.fTableName->fSchema + "|";
86 SQLLogger logger(stmt, fDDLLoggingId, dropTableStmt.fSessionID, txnID.id);
87
88 std::vector <CalpontSystemCatalog::OID> oidList;
89 CalpontSystemCatalog::RIDList tableColRidList;
90 CalpontSystemCatalog::DictOIDList dictOIDList;
91 execplan::CalpontSystemCatalog::ROPair roPair;
92 std::string errorMsg;
93 ByteStream bytestream;
94 uint64_t uniqueId = 0;
95
96 //Bug 5070. Added exception handling
97 try
98 {
99 uniqueId = fDbrm->getUnique64();
100 }
101 catch (std::exception& ex)
102 {
103 Message::Args args;
104 Message message(9);
105 args.add(ex.what());
106 message.format(args);
107 result.result = DROP_ERROR;
108 result.message = message;
109 fSessionManager.rolledback(txnID);
110 return result;
111 }
112 catch ( ... )
113 {
114 Message::Args args;
115 Message message(9);
116 args.add("Unknown error occured while getting unique number.");
117 message.format(args);
118 result.result = DROP_ERROR;
119 result.message = message;
120 fSessionManager.rolledback(txnID);
121 return result;
122 }
123
124 fWEClient->addQueue(uniqueId);
125 int pmNum = 1;
126 boost::shared_ptr<messageqcpp::ByteStream> bsIn;
127 uint64_t tableLockId = 0;
128 OamCache* oamcache = OamCache::makeOamCache();
129 std::vector<int> moduleIds = oamcache->getModuleIds();
130
131 // MCOL-66 The DBRM can't handle concurrent DDL
132 boost::mutex::scoped_lock lk(dbrmMutex);
133
134 try
135 {
136 //check table lock
137 boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(dropTableStmt.fSessionID);
138 systemCatalogPtr->identity(CalpontSystemCatalog::EC);
139 systemCatalogPtr->sessionID(dropTableStmt.fSessionID);
140 CalpontSystemCatalog::TableName tableName;
141 tableName.schema = dropTableStmt.fTableName->fSchema;
142 tableName.table = dropTableStmt.fTableName->fName;
143
144 try
145 {
146 roPair = systemCatalogPtr->tableRID(tableName);
147 }
148 catch (IDBExcept& ie)
149 {
150 if (ie.errorCode() == ERR_TABLE_NOT_IN_CATALOG)
151 {
152 Message::Args args;
153 Message message(1);
154 args.add("Table does not exist in ColumnStore.");
155 message.format(args);
156 result.result = DROP_TABLE_NOT_IN_CATALOG_ERROR;
157 result.message = message;
158 fSessionManager.rolledback(txnID);
159 return result;
160 }
161 else
162 {
163 result.result = DROP_ERROR;
164 Message::Args args;
165 Message message(9);
166 args.add("Drop table failed due to ");
167 args.add(ie.what());
168 message.format(args);
169 result.message = message;
170 fSessionManager.rolledback(txnID);
171 return result;
172 }
173 }
174
175 uint32_t processID = ::getpid();
176 int32_t txnid = txnID.id;
177 int32_t sessionId = dropTableStmt.fSessionID;
178 std::string processName("DDLProc");
179 int i = 0;
180
181 std::vector<uint32_t> pms;
182
183 for (unsigned i = 0; i < moduleIds.size(); i++)
184 {
185 pms.push_back((uint32_t)moduleIds[i]);
186 }
187
188 try
189 {
190 tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnid, BRM::LOADING );
191 }
192 catch (std::exception&)
193 {
194 throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
195 }
196
197 if ( tableLockId == 0 )
198 {
199 int waitPeriod = 10;
200 int sleepTime = 100; // sleep 100 milliseconds between checks
201 int numTries = 10; // try 10 times per second
202 waitPeriod = WriteEngine::Config::getWaitPeriod();
203 numTries = waitPeriod * 10;
204 struct timespec rm_ts;
205
206 rm_ts.tv_sec = sleepTime / 1000;
207 rm_ts.tv_nsec = sleepTime % 1000 * 1000000;
208
209 for (; i < numTries; i++)
210 {
211 #ifdef _MSC_VER
212 Sleep(rm_ts.tv_sec * 1000);
213 #else
214 struct timespec abs_ts;
215
216 do
217 {
218 abs_ts.tv_sec = rm_ts.tv_sec;
219 abs_ts.tv_nsec = rm_ts.tv_nsec;
220 }
221 while (nanosleep(&abs_ts, &rm_ts) < 0);
222
223 #endif
224
225 try
226 {
227 processID = ::getpid();
228 txnid = txnID.id;
229 sessionId = dropTableStmt.fSessionID;;
230 processName = "DDLProc";
231 tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnid, BRM::LOADING );
232 }
233 catch (std::exception&)
234 {
235 throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
236 }
237
238 if (tableLockId > 0)
239 break;
240 }
241
242 if (i >= numTries) //error out
243 {
244 Message::Args args;
245 string strOp("drop table");
246 args.add(strOp);
247 args.add(processName);
248 args.add((uint64_t)processID);
249 args.add(sessionId);
250 throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_TABLE_LOCKED, args));
251 }
252 }
253
254 // 1. Get the OIDs for the columns
255 // 2. Get the OIDs for the dictionaries
256 // 3. Save the OIDs to a log file
257 // 4. Remove the Table from SYSTABLE
258 // 5. Remove the columns from SYSCOLUMN
259 // 6. Commit the changes made to systables
260 // 7. Flush PrimProc Cache
261 // 8. Update extent map
262 // 9. Remove the column and dictionary files
263 // 10.Return the OIDs
264
265 CalpontSystemCatalog::TableName userTableName;
266 userTableName.schema = dropTableStmt.fTableName->fSchema;
267 userTableName.table = dropTableStmt.fTableName->fName;
268
269 tableColRidList = systemCatalogPtr->columnRIDs( userTableName );
270
271 dictOIDList = systemCatalogPtr->dictOIDs( userTableName );
272 Oam oam;
273
274 //Save qualified tablename, all column, dictionary OIDs, and transaction ID into a file in ASCII format
275 for ( unsigned i = 0; i < tableColRidList.size(); i++ )
276 {
277 if ( tableColRidList[i].objnum > 3000 )
278 oidList.push_back( tableColRidList[i].objnum );
279 }
280
281 for ( unsigned i = 0; i < dictOIDList.size(); i++ )
282 {
283 if ( dictOIDList[i].dictOID > 3000 )
284 oidList.push_back( dictOIDList[i].dictOID );
285 }
286
287 //get a unique number
288 VERBOSE_INFO("Removing the SYSTABLE meta data");
289 #ifdef IDB_DDL_DEBUG
290 cout << fTxnid.id << " Removing the SYSTABLEs meta data" << endl;
291 #endif
292 bytestream << (ByteStream::byte)WE_SVR_DELETE_SYSTABLE;
293 bytestream << uniqueId;
294 bytestream << (uint32_t) dropTableStmt.fSessionID;
295 bytestream << (uint32_t)txnID.id;
296 bytestream << dropTableStmt.fTableName->fSchema;
297 bytestream << dropTableStmt.fTableName->fName;
298
299 //Find out where systable is
300 BRM::OID_t sysOid = 1001;
301 ByteStream::byte rc = 0;
302
303 uint16_t dbRoot;
304 rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot);
305
306 if (rc != 0)
307 {
308 result.result = (ResultCode) rc;
309 Message::Args args;
310 Message message(9);
311 args.add("Error while calling getSysCatDBRoot");
312 args.add(errorMsg);
313 result.message = message;
314 //release transaction
315 fSessionManager.rolledback(txnID);
316 return result;
317 }
318
319 boost::shared_ptr<std::map<int, int> > dbRootPMMap = oamcache->getDBRootToPMMap();
320 pmNum = (*dbRootPMMap)[dbRoot];
321
322 try
323 {
324 #ifdef IDB_DDL_DEBUG
325 cout << fTxnid.id << " Drop table sending WE_SVR_DELETE_SYSTABLES to pm " << pmNum << endl;
326 #endif
327 //cout << "deleting systable entries with txnid " << txnID.id << endl;
328 fWEClient->write(bytestream, (uint32_t)pmNum);
329
330 while (1)
331 {
332 bsIn.reset(new ByteStream());
333 fWEClient->read(uniqueId, bsIn);
334
335 if ( bsIn->length() == 0 ) //read error
336 {
337 rc = NETWORK_ERROR;
338 errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
339 break;
340 }
341 else
342 {
343 *bsIn >> rc;
344
345 if (rc != 0)
346 {
347 *bsIn >> errorMsg;
348 }
349
350 break;
351 }
352 }
353 }
354 catch (runtime_error& ex) //write error
355 {
356 #ifdef IDB_DDL_DEBUG
357 cout << fTxnid.id << " Drop table got exception" << endl;
358 #endif
359 rc = NETWORK_ERROR;
360 errorMsg = ex.what();
361 }
362 catch (...)
363 {
364 rc = NETWORK_ERROR;
365 #ifdef IDB_DDL_DEBUG
366 cout << fTxnid.id << " Drop table got unknown exception" << endl;
367 #endif
368 }
369
370 if (rc != 0)
371 {
372 cout << fTxnid.id << " Error in dropping table from systables(" << (int)rc << ") " << errorMsg.c_str() << endl;
373 Message::Args args;
374 Message message(9);
375 args.add("Error in dropping table from systables.");
376 args.add(errorMsg);
377 message.format(args);
378 result.result = (ResultCode)rc;
379 result.message = message;
380 //release table lock and session
381 fSessionManager.rolledback(txnID);
382 (void)fDbrm->releaseTableLock(tableLockId);
383 fWEClient->removeQueue(uniqueId);
384 return result;
385 }
386
387 //remove from syscolumn
388 bytestream.restart();
389 bytestream << (ByteStream::byte)WE_SVR_DELETE_SYSCOLUMN;
390 bytestream << uniqueId;
391 bytestream << (uint32_t) dropTableStmt.fSessionID;
392 bytestream << (uint32_t)txnID.id;
393 bytestream << dropTableStmt.fTableName->fSchema;
394 bytestream << dropTableStmt.fTableName->fName;
395
396 //Find out where syscolumn is
397 sysOid = 1021;
398 rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot);
399
400 if (rc != 0)
401 {
402 result.result = (ResultCode) rc;
403 Message::Args args;
404 Message message(9);
405 args.add("Error while calling getSysCatDBRoot");
406 args.add(errorMsg);
407 result.message = message;
408 //release transaction
409 fSessionManager.rolledback(txnID);
410 return result;
411 }
412
413 pmNum = (*dbRootPMMap)[dbRoot];
414
415 try
416 {
417 #ifdef IDB_DDL_DEBUG
418 cout << fTxnid.id << " Drop table sending WE_SVR_DELETE_SYSCOLUMN to pm " << pmNum << endl;
419 #endif
420 fWEClient->write(bytestream, (unsigned)pmNum);
421
422 while (1)
423 {
424 bsIn.reset(new ByteStream());
425 fWEClient->read(uniqueId, bsIn);
426
427 if ( bsIn->length() == 0 ) //read error
428 {
429 rc = NETWORK_ERROR;
430 errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
431 break;
432 }
433 else
434 {
435 *bsIn >> rc;
436
437 if (rc != 0)
438 {
439 *bsIn >> errorMsg;
440 }
441
442 break;
443 }
444 }
445 }
446 catch (runtime_error& ex) //write error
447 {
448 #ifdef IDB_DDL_DEBUG
449 cout << fTxnid.id << " Drop table got exception" << endl;
450 #endif
451 rc = NETWORK_ERROR;
452 errorMsg = ex.what();
453 }
454 catch (...)
455 {
456 rc = NETWORK_ERROR;
457 #ifdef IDB_DDL_DEBUG
458 cout << fTxnid.id << " Drop table got unknown exception" << endl;
459 #endif
460 }
461
462 if (rc != 0)
463 {
464 cout << fTxnid.id << " Error in dropping column from systables(" << (int)rc << ") " << errorMsg.c_str() << endl;
465 Message::Args args;
466 Message message(9);
467 args.add("Error in dropping column from systables.");
468 args.add(errorMsg);
469 message.format(args);
470 result.result = (ResultCode)rc;
471 result.message = message;
472 //release table lock and session
473 fSessionManager.rolledback(txnID);
474 (void)fDbrm->releaseTableLock(tableLockId);
475 fWEClient->removeQueue(uniqueId);
476 return result;
477 }
478
479 rc = commitTransaction(uniqueId, txnID);
480
481 if (rc != 0)
482 {
483 cout << txnID.id << " rolledback transaction " << " and valid is " << txnID.valid << endl;
484 fSessionManager.rolledback(txnID);
485 }
486 else
487 {
488 cout << txnID.id << " commiting transaction " << txnID.id << " and valid is " << txnID.valid << endl;
489 fSessionManager.committed(txnID);
490 }
491
492 if (rc != 0)
493 {
494 Message::Args args;
495 Message message(9);
496 ostringstream oss;
497 oss << " Commit failed with error code " << rc;
498 args.add(oss.str());
499 fSessionManager.rolledback(txnID);
500 (void)fDbrm->releaseTableLock(tableLockId);
501 message.format(args);
502 result.result = (ResultCode)rc;
503 result.message = message;
504 fWEClient->removeQueue(uniqueId);
505 return result;
506 }
507
508 // Log the DDL statement
509 logDDL(dropTableStmt.fSessionID, txnID.id, dropTableStmt.fSql, dropTableStmt.fOwner);
510 }
511 catch (std::exception& ex)
512 {
513 result.result = DROP_ERROR;
514 Message::Args args;
515 Message message(9);
516 args.add("Drop table failed due to ");
517 args.add(ex.what());
518 fSessionManager.rolledback(txnID);
519
520 try
521 {
522 (void)fDbrm->releaseTableLock(tableLockId);
523 }
524 catch (std::exception&)
525 {
526 args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
527 }
528
529 message.format( args );
530 result.message = message;
531 fWEClient->removeQueue(uniqueId);
532 return result;
533 }
534 catch (...)
535 {
536 result.result = DROP_ERROR;
537 errorMsg = "Error in getting information from system catalog or from dbrm.";
538 Message::Args args;
539 Message message(9);
540 args.add("Drop table failed due to ");
541 args.add(errorMsg);
542 fSessionManager.rolledback(txnID);
543
544 try
545 {
546 (void)fDbrm->releaseTableLock(tableLockId);
547 }
548 catch (std::exception&)
549 {
550 args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
551 }
552
553 message.format( args );
554 result.message = message;
555 fWEClient->removeQueue(uniqueId);
556 return result;
557 }
558
559 try
560 {
561 (void)fDbrm->releaseTableLock(tableLockId);
562 }
563 catch (std::exception&)
564 {
565 result.result = DROP_ERROR;
566 Message::Args args;
567 Message message(9);
568 args.add("Drop table failed due to ");
569 args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
570 fSessionManager.rolledback(txnID);
571 message.format( args );
572 result.message = message;
573 fWEClient->removeQueue(uniqueId);
574 return result;
575 }
576
577 //Save the oids to a file
578 try
579 {
580 createWriteDropLogFile( roPair.objnum, uniqueId, oidList );
581 }
582 catch (std::exception& ex)
583 {
584 result.result = WARNING;
585 Message::Args args;
586 Message message(9);
587 args.add("Drop table failed due to ");
588 args.add(ex.what());
589 message.format(args);
590 result.message = message;
591 fSessionManager.rolledback(txnID);
592 fWEClient->removeQueue(uniqueId);
593 return result;
594 }
595
596 // Bug 4208 Drop the PrimProcFDCache before droping the column files
597 // FOr Windows, this ensures (most likely) that the column files have
598 // no open handles to hinder the deletion of the files.
599 rc = cacheutils::dropPrimProcFdCache();
600
601 //Drop files
602 bytestream.restart();
603 bytestream << (ByteStream::byte)WE_SVR_WRITE_DROPFILES;
604 bytestream << uniqueId;
605 bytestream << (uint32_t) oidList.size();
606
607 for (unsigned i = 0; i < oidList.size(); i++)
608 {
609 bytestream << (uint32_t) oidList[i];
610 }
611
612 #ifdef IDB_DDL_DEBUG
613 cout << fTxnid.id << " Drop table removing column files" << endl;
614 #endif
615 uint32_t msgRecived = 0;
616
617 try
618 {
619 fWEClient->write_to_all(bytestream);
620
621 bsIn.reset(new ByteStream());
622 ByteStream::byte tmp8;
623
624 while (1)
625 {
626 if (msgRecived == fWEClient->getPmCount())
627 break;
628
629 fWEClient->read(uniqueId, bsIn);
630
631 if ( bsIn->length() == 0 ) //read error
632 {
633 rc = NETWORK_ERROR;
634 fWEClient->removeQueue(uniqueId);
635 break;
636 }
637 else
638 {
639 *bsIn >> tmp8;
640 rc = tmp8;
641
642 if (rc != 0)
643 {
644 *bsIn >> errorMsg;
645 fWEClient->removeQueue(uniqueId);
646 break;
647 }
648 else
649 msgRecived++;
650 }
651 }
652 }
653 catch (std::exception& ex)
654 {
655 result.result = WARNING;
656 Message::Args args;
657 Message message(9);
658 args.add("Drop table failed due to ");
659 args.add(ex.what());
660 message.format(args);
661 result.message = message;
662 fSessionManager.rolledback(txnID);
663 fWEClient->removeQueue(uniqueId);
664 return result;
665 }
666 catch (...)
667 {
668 result.result = WARNING;
669 errorMsg = "Error in getting information from system catalog or from dbrm.";
670 Message::Args args;
671 Message message(9);
672 args.add("Drop table failed due to ");
673 args.add(errorMsg);
674 message.format(args);
675 result.message = message;
676 fSessionManager.rolledback(txnID);
677 fWEClient->removeQueue(uniqueId);
678 return result;
679 }
680
681 //Drop PrimProc FD cache
682 rc = cacheutils::dropPrimProcFdCache();
683 //Flush primProc cache
684 rc = cacheutils::flushOIDsFromCache( oidList );
685 //Delete extents from extent map
686 #ifdef IDB_DDL_DEBUG
687 cout << fTxnid.id << " Drop table deleteOIDs" << endl;
688 #endif
689 rc = fDbrm->deleteOIDs(oidList);
690
691 if (rc != 0)
692 {
693 Message::Args args;
694 Message message(1);
695 args.add("Table dropped with warning ");
696 args.add( "Remove from extent map failed." );
697 args.add("");
698 args.add("");
699 message.format( args );
700
701 result.result = WARNING;
702 result.message = message;
703 fSessionManager.rolledback(txnID);
704 fWEClient->removeQueue(uniqueId);
705 return result;
706 }
707
708 //Remove the log file
709 fWEClient->removeQueue(uniqueId);
710 deleteLogFile(DROPTABLE_LOG, roPair.objnum, uniqueId);
711 //release the transaction
712 //fSessionManager.committed(txnID);
713 returnOIDs( tableColRidList, dictOIDList );
714 return result;
715
716 }
717
processPackage(ddlpackage::TruncTableStatement & truncTableStmt)718 TruncTableProcessor::DDLResult TruncTableProcessor::processPackage(ddlpackage::TruncTableStatement& truncTableStmt)
719 {
720 SUMMARY_INFO("TruncTableProcessor::processPackage");
721 // 1. lock the table
722 // 2. Get the OIDs for the columns
723 // 3. Get the OIDs for the dictionaries
724 // 4. Save the OIDs
725 // 5. Disable all partitions
726 // 6. Remove the column and dictionary files
727 // 7. Flush PrimProc Cache
728 // 8. Update extent map
729 // 9. Use the OIDs to create new column and dictionary files with abbreviate extent
730 // 10 Update next value if the table has autoincrement column
731
732 DDLResult result;
733 result.result = NO_ERROR;
734 std::string err;
735 VERBOSE_INFO(truncTableStmt);
736
737 // @Bug 4150. Check dbrm status before doing anything to the table.
738 int rc = 0;
739 rc = fDbrm->isReadWrite();
740 BRM::TxnID txnID;
741 txnID.id = fTxnid.id;
742 txnID.valid = fTxnid.valid;
743
744 if (rc != 0 )
745 {
746 Message::Args args;
747 Message message(9);
748 args.add("Unable to execute the statement due to DBRM is read only");
749 message.format(args);
750 result.result = DROP_ERROR;
751 result.message = message;
752 fSessionManager.rolledback(txnID);
753 return result;
754 }
755
756 //@Bug 5765 log the schema.
757 string stmt = truncTableStmt.fSql + "|" + truncTableStmt.fTableName->fSchema + "|";
758 SQLLogger logger(stmt, fDDLLoggingId, truncTableStmt.fSessionID, txnID.id);
759
760 std::vector <CalpontSystemCatalog::OID> columnOidList;
761 std::vector <CalpontSystemCatalog::OID> allOidList;
762 CalpontSystemCatalog::RIDList tableColRidList;
763 CalpontSystemCatalog::DictOIDList dictOIDList;
764 execplan::CalpontSystemCatalog::ROPair roPair;
765 std::string processName("DDLProc");
766 uint32_t processID = ::getpid();;
767 int32_t txnid = txnID.id;
768 boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(truncTableStmt.fSessionID);
769 systemCatalogPtr->identity(CalpontSystemCatalog::EC);
770 systemCatalogPtr->sessionID(truncTableStmt.fSessionID);
771 CalpontSystemCatalog::TableInfo tableInfo;
772 uint64_t uniqueId = 0;
773
774 //Bug 5070. Added exception handling
775 try
776 {
777 uniqueId = fDbrm->getUnique64();
778 }
779 catch (std::exception& ex)
780 {
781 Message::Args args;
782 Message message(9);
783 args.add(ex.what());
784 message.format(args);
785 result.result = DROP_ERROR;
786 result.message = message;
787 fSessionManager.rolledback(txnID);
788 return result;
789 }
790 catch ( ... )
791 {
792 Message::Args args;
793 Message message(9);
794 args.add("Unknown error occured while getting unique number.");
795 message.format(args);
796 result.result = DROP_ERROR;
797 result.message = message;
798 fSessionManager.rolledback(txnID);
799 return result;
800 }
801
802 fWEClient->addQueue(uniqueId);
803 int pmNum = 1;
804 boost::shared_ptr<messageqcpp::ByteStream> bsIn;
805 string errorMsg;
806 uint32_t autoIncColOid = 0;
807 uint64_t tableLockId = 0;
808 OamCache* oamcache = OamCache::makeOamCache();
809 std::vector<int> moduleIds = oamcache->getModuleIds();
810
811 try
812 {
813 //check table lock
814
815 CalpontSystemCatalog::TableName tableName;
816 tableName.schema = truncTableStmt.fTableName->fSchema;
817 tableName.table = truncTableStmt.fTableName->fName;
818 roPair = systemCatalogPtr->tableRID( tableName );
819 int32_t sessionId = truncTableStmt.fSessionID;
820 std::string processName("DDLProc");
821 int i = 0;
822
823 std::vector<uint32_t> pms;
824
825 for (unsigned i = 0; i < moduleIds.size(); i++)
826 {
827 pms.push_back((uint32_t)moduleIds[i]);
828 }
829
830 try
831 {
832 tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnid, BRM::LOADING );
833 }
834 catch (std::exception&)
835 {
836 throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
837 }
838
839 if ( tableLockId == 0 )
840 {
841 int waitPeriod = 10;
842 int sleepTime = 100; // sleep 100 milliseconds between checks
843 int numTries = 10; // try 10 times per second
844 waitPeriod = Config::getWaitPeriod();
845 numTries = waitPeriod * 10;
846 struct timespec rm_ts;
847
848 rm_ts.tv_sec = sleepTime / 1000;
849 rm_ts.tv_nsec = sleepTime % 1000 * 1000000;
850
851 for (; i < numTries; i++)
852 {
853 #ifdef _MSC_VER
854 Sleep(rm_ts.tv_sec * 1000);
855 #else
856 struct timespec abs_ts;
857
858 do
859 {
860 abs_ts.tv_sec = rm_ts.tv_sec;
861 abs_ts.tv_nsec = rm_ts.tv_nsec;
862 }
863 while (nanosleep(&abs_ts, &rm_ts) < 0);
864
865 #endif
866
867 try
868 {
869 processID = ::getpid();
870 txnid = txnID.id;
871 sessionId = truncTableStmt.fSessionID;
872 processName = "DDLProc";
873 tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnid, BRM::LOADING );
874 }
875 catch (std::exception&)
876 {
877 throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
878 }
879
880 if (tableLockId > 0)
881 break;
882 }
883
884 if (i >= numTries) //error out
885 {
886 Message::Args args;
887 args.add(processName);
888 args.add((uint64_t)processID);
889 args.add(sessionId);
890 throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_TABLE_LOCKED, args));
891 }
892 }
893
894 CalpontSystemCatalog::TableName userTableName;
895 userTableName.schema = truncTableStmt.fTableName->fSchema;
896 userTableName.table = truncTableStmt.fTableName->fName;
897
898 tableColRidList = systemCatalogPtr->columnRIDs( userTableName );
899
900 dictOIDList = systemCatalogPtr->dictOIDs( userTableName );
901
902 for ( unsigned i = 0; i < tableColRidList.size(); i++ )
903 {
904 if ( tableColRidList[i].objnum > 3000 )
905 {
906 columnOidList.push_back( tableColRidList[i].objnum );
907 allOidList.push_back( tableColRidList[i].objnum );
908 }
909 }
910
911 for ( unsigned i = 0; i < dictOIDList.size(); i++ )
912 {
913 if ( dictOIDList[i].dictOID > 3000 )
914 allOidList.push_back( dictOIDList[i].dictOID );
915 }
916
917 //Check whether the table has autoincrement column
918 tableInfo = systemCatalogPtr->tableInfo(userTableName);
919 }
920 catch (std::exception& ex)
921 {
922 cerr << "TruncateTableProcessor::processPackage: " << ex.what() << endl;
923
924 Message::Args args;
925 Message message(9);
926 args.add("Truncate table failed: ");
927 args.add( ex.what() );
928 args.add("");
929 fSessionManager.rolledback(txnID);
930
931 try
932 {
933 (void)fDbrm->releaseTableLock(tableLockId);
934 }
935 catch (std::exception&)
936 {
937 args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
938 }
939
940 fWEClient->removeQueue(uniqueId);
941 message.format( args );
942
943 result.result = TRUNC_ERROR;
944 result.message = message;
945 return result;
946 }
947 catch (...)
948 {
949 cerr << "TruncateTableProcessor::processPackage: caught unknown exception!" << endl;
950
951 Message::Args args;
952 Message message(1);
953 args.add("Truncate table failed: ");
954 args.add( "encountered unkown exception" );
955 args.add("");
956
957 try
958 {
959 (void)fDbrm->releaseTableLock(tableLockId);
960 }
961 catch (std::exception&)
962 {
963 args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
964 }
965
966 fWEClient->removeQueue(uniqueId);
967 message.format( args );
968
969 result.result = TRUNC_ERROR;
970 result.message = message;
971 return result;
972 }
973
974 //Save the oids to a file
975 try
976 {
977 createWriteTruncateTableLogFile( roPair.objnum, uniqueId, allOidList);
978 }
979 catch (std::exception& ex)
980 {
981 Message::Args args;
982 Message message(9);
983 args.add("Truncate table failed due to ");
984 args.add(ex.what());
985 fSessionManager.rolledback(txnID);
986 fWEClient->removeQueue(uniqueId);
987 message.format( args );
988
989 //@bug 4515 Release the tablelock as nothing has done to this table.
990 try
991 {
992 (void)fDbrm->releaseTableLock(tableLockId);
993 }
994 catch (std::exception&) {}
995
996 result.result = TRUNC_ERROR;
997 result.message = message;
998 return result;
999 }
1000
1001 ByteStream bytestream;
1002 ByteStream::byte tmp8;
1003
1004 // MCOL-66 The DBRM can't handle concurrent DDL
1005 boost::mutex::scoped_lock lk(dbrmMutex);
1006
1007 try
1008 {
1009 //Disable extents first
1010 int rc1 = fDbrm->markAllPartitionForDeletion( allOidList);
1011
1012 if (rc1 != 0)
1013 {
1014 string errMsg;
1015 BRM::errString(rc, errMsg);
1016 throw std::runtime_error(errMsg);
1017 }
1018
1019 // Bug 4208 Drop the PrimProcFDCache before droping the column files
1020 // FOr Windows, this ensures (most likely) that the column files have
1021 // no open handles to hinder the deletion of the files.
1022 rc = cacheutils::dropPrimProcFdCache();
1023
1024 VERBOSE_INFO("Removing files");
1025 bytestream << (ByteStream::byte)WE_SVR_WRITE_DROPFILES;
1026 bytestream << uniqueId;
1027 bytestream << (uint32_t) allOidList.size();
1028
1029 for (unsigned i = 0; i < allOidList.size(); i++)
1030 {
1031 bytestream << (uint32_t) allOidList[i];
1032 }
1033
1034 uint32_t msgRecived = 0;
1035
1036 try
1037 {
1038 fWEClient->write_to_all(bytestream);
1039
1040 bsIn.reset(new ByteStream());
1041
1042 while (1)
1043 {
1044 if (msgRecived == fWEClient->getPmCount())
1045 break;
1046
1047 fWEClient->read(uniqueId, bsIn);
1048
1049 if ( bsIn->length() == 0 ) //read error
1050 {
1051 rc = NETWORK_ERROR;
1052 fWEClient->removeQueue(uniqueId);
1053 break;
1054 }
1055 else
1056 {
1057 *bsIn >> tmp8;
1058 rc = tmp8;
1059
1060 if (rc != 0)
1061 {
1062 *bsIn >> errorMsg;
1063 fWEClient->removeQueue(uniqueId);
1064 break;
1065 }
1066 else
1067 msgRecived++;
1068 }
1069 }
1070 }
1071 catch (std::exception& ex)
1072 {
1073 Message::Args args;
1074 Message message(9);
1075 args.add("Truncate table failed due to ");
1076 args.add(ex.what());
1077 fSessionManager.rolledback(txnID);
1078 fWEClient->removeQueue(uniqueId);
1079 message.format( args );
1080
1081 result.result = TRUNC_ERROR;
1082 result.message = message;
1083 deleteLogFile(TRUNCATE_LOG, roPair.objnum, uniqueId);
1084 return result;
1085 }
1086 catch (...)
1087 {
1088 result.result = DROP_ERROR;
1089 errorMsg = "Error in getting information from system catalog or from dbrm.";
1090 Message::Args args;
1091 Message message(9);
1092 args.add("Truncate table failed due to ");
1093 args.add(errorMsg);
1094 fSessionManager.rolledback(txnID);
1095 fWEClient->removeQueue(uniqueId);
1096 message.format( args );
1097
1098 result.result = TRUNC_ERROR;
1099 result.message = message;
1100 deleteLogFile(TRUNCATE_LOG, roPair.objnum, uniqueId);
1101 return result;
1102 }
1103
1104 //Drop PrimProc FD cache
1105 rc = cacheutils::dropPrimProcFdCache();
1106 //Flush primProc cache
1107 rc = cacheutils::flushOIDsFromCache( allOidList );
1108 //Delete extents from extent map
1109 rc = fDbrm->deleteOIDs(allOidList);
1110
1111 if (rc != 0)
1112 {
1113 Message::Args args;
1114 Message message(1);
1115 args.add("Table truncated with warning ");
1116 args.add( "Remove from extent map failed." );
1117 args.add("");
1118 args.add("");
1119 message.format( args );
1120
1121 result.result = WARNING;
1122 result.message = message;
1123 fSessionManager.rolledback(txnID);
1124 fWEClient->removeQueue(uniqueId);
1125 return result;
1126 }
1127
1128 //Get the number of tables in the database, the current table is included.
1129 int tableCount = systemCatalogPtr->getTableCount();
1130 Oam oam;
1131 //Calculate which dbroot the columns should start
1132 DBRootConfigList dbRootList = oamcache->getDBRootNums();
1133
1134 uint16_t useDBRootIndex = tableCount % dbRootList.size();
1135 //Find out the dbroot# corresponding the useDBRootIndex from oam
1136 uint16_t useDBRoot = dbRootList[useDBRootIndex];
1137
1138 bytestream.restart();
1139 bytestream << (ByteStream::byte) WE_SVR_WRITE_CREATETABLEFILES;
1140 bytestream << uniqueId;
1141 bytestream << (uint32_t)txnID.id;
1142 uint32_t numOids = columnOidList.size() + dictOIDList.size();
1143 bytestream << numOids;
1144 CalpontSystemCatalog::ColType colType;
1145
1146 for (unsigned col = 0; col < columnOidList.size(); col++)
1147 {
1148 colType = systemCatalogPtr->colType(columnOidList[col]);
1149
1150 if (colType.autoincrement)
1151 autoIncColOid = colType.columnOID;
1152
1153 bytestream << (uint32_t)columnOidList[col];
1154 bytestream << (uint8_t) colType.colDataType;
1155 bytestream << (uint8_t) false;
1156 bytestream << (uint32_t) colType.colWidth;
1157 bytestream << (uint16_t) useDBRoot;
1158 bytestream << (uint32_t) colType.compressionType;
1159 }
1160
1161 for (unsigned col = 0; col < dictOIDList.size(); col++)
1162 {
1163 colType = systemCatalogPtr->colTypeDct(dictOIDList[col].dictOID);
1164 bytestream << (uint32_t) dictOIDList[col].dictOID;
1165 bytestream << (uint8_t) colType.colDataType;
1166 bytestream << (uint8_t) true;
1167 bytestream << (uint32_t) colType.colWidth;
1168 bytestream << (uint16_t) useDBRoot;
1169 bytestream << (uint32_t) colType.compressionType;
1170 }
1171
1172 boost::shared_ptr<std::map<int, int> > dbRootPMMap = oamcache->getDBRootToPMMap();
1173 pmNum = (*dbRootPMMap)[useDBRoot];
1174
1175 try
1176 {
1177 #ifdef IDB_DDL_DEBUG
1178 cout << "Truncate table sending We_SVR_WRITE_CREATETABLEFILES to pm " << pmNum << endl;
1179 #endif
1180 fWEClient->write(bytestream, pmNum);
1181
1182 while (1)
1183 {
1184 bsIn.reset(new ByteStream());
1185 fWEClient->read(uniqueId, bsIn);
1186
1187 if ( bsIn->length() == 0 ) //read error
1188 {
1189 rc = NETWORK_ERROR;
1190 errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
1191 break;
1192 }
1193 else
1194 {
1195 *bsIn >> tmp8;
1196 rc = tmp8;
1197
1198 if (rc != 0)
1199 {
1200 *bsIn >> errorMsg;
1201 }
1202
1203 break;
1204 }
1205 }
1206
1207 if (rc != 0)
1208 {
1209 //drop the newly created files
1210 bytestream.restart();
1211 bytestream << (ByteStream::byte) WE_SVR_WRITE_DROPFILES;
1212 bytestream << uniqueId;
1213 bytestream << (uint32_t)(allOidList.size());
1214
1215 for (unsigned i = 0; i < (allOidList.size()); i++)
1216 {
1217 bytestream << (uint32_t)(allOidList[i]);
1218 }
1219
1220 fWEClient->write(bytestream, pmNum);
1221
1222 while (1)
1223 {
1224 bsIn.reset(new ByteStream());
1225 fWEClient->read(uniqueId, bsIn);
1226
1227 if ( bsIn->length() == 0 ) //read error
1228 {
1229 break;
1230 }
1231 else
1232 {
1233 *bsIn >> tmp8;
1234 //rc = tmp8;
1235 break;
1236 }
1237 }
1238
1239 Message::Args args;
1240 Message message(1);
1241 args.add( "Truncate table failed." );
1242 args.add( errorMsg);
1243 args.add("");
1244 message.format( args );
1245
1246 result.result = TRUNC_ERROR;
1247 result.message = message;
1248 //rc = fSessionManager.setTableLock( roPair.objnum, truncTableStmt.fSessionID, processID, processName, false );
1249 fSessionManager.rolledback(txnID);
1250 return result;
1251 }
1252 }
1253 catch (runtime_error&)
1254 {
1255 rc = NETWORK_ERROR;
1256 errorMsg = "Lost connection to Write Engine Server";
1257 }
1258 }
1259
1260 #ifdef _MSC_VER
1261 catch (std::exception&)
1262 {
1263 //FIXME: Windows can't delete a file that's still open by another process
1264 }
1265
1266 #else
1267 catch (std::exception& ex)
1268 {
1269 Message::Args args;
1270 Message message(1);
1271 args.add( "Truncate table failed." );
1272 args.add( ex.what() );
1273 args.add("");
1274 message.format( args );
1275
1276 result.result = TRUNC_ERROR;
1277 result.message = message;
1278 //rc = fSessionManager.setTableLock( roPair.objnum, truncTableStmt.fSessionID, processID, processName, false );
1279 fSessionManager.rolledback(txnID);
1280 return result;
1281 }
1282
1283 #endif
1284 catch ( ... )
1285 {
1286 Message::Args args;
1287 Message message(1);
1288 args.add("Truncate table failed: ");
1289 args.add( "Remove column files failed." );
1290 args.add("");
1291 args.add("");
1292 message.format( args );
1293
1294 result.result = TRUNC_ERROR;
1295 result.message = message;
1296 //rc = fSessionManager.setTableLock( roPair.objnum, truncTableStmt.fSessionID, processID, processName, false );
1297 fSessionManager.rolledback(txnID);
1298 return result;
1299 }
1300
1301 if (rc != 0)
1302 {
1303 rollBackTransaction( uniqueId, txnID, truncTableStmt.fSessionID); //What to do with the error code
1304 fSessionManager.rolledback(txnID);
1305 }
1306
1307 //Check whether the table has autoincrement column
1308 if (tableInfo.tablewithautoincr == 1)
1309 {
1310 //reset nextvalue to 1
1311 WE_DDLCommandClient commandClient;
1312 rc = commandClient.UpdateSyscolumnNextval(autoIncColOid, 1);
1313 }
1314
1315 // Log the DDL statement
1316 logDDL(truncTableStmt.fSessionID, txnID.id, truncTableStmt.fSql, truncTableStmt.fOwner);
1317
1318 try
1319 {
1320 (void)fDbrm->releaseTableLock(tableLockId);
1321 }
1322 catch (std::exception&)
1323 {
1324 Message::Args args;
1325 Message message(1);
1326 args.add("Table truncated with warning ");
1327 args.add( "Release table failed." );
1328 args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
1329 args.add("");
1330 message.format( args );
1331
1332 result.result = WARNING;
1333 result.message = message;
1334 fSessionManager.rolledback(txnID);
1335 fWEClient->removeQueue(uniqueId);
1336 }
1337
1338 //release the transaction
1339 fSessionManager.committed(txnID);
1340 fWEClient->removeQueue(uniqueId);
1341
1342 //Remove the log file
1343 try
1344 {
1345 deleteLogFile(TRUNCATE_LOG, roPair.objnum, uniqueId);
1346 }
1347 catch ( ... )
1348 {
1349 }
1350
1351 return result;
1352 }
1353
1354 } //namespace ddlpackageprocessor
1355
1356 // vim:ts=4 sw=4:
1357
1358