1 /* Copyright (C) 2014 InfiniDB, Inc.
2    Copyright (C) 2016 MariaDB Corporation
3 
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License
6    as published by the Free Software Foundation; version 2 of
7    the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17    MA 02110-1301, USA. */
18 
19 /***********************************************************************
20  *   $Id: droptableprocessor.cpp 9744 2013-08-07 03:32:19Z bwilkinson $
21  *
22  *
23  ***********************************************************************/
24 #include <unistd.h>
25 #include <string>
26 #include <vector>
27 using namespace std;
28 
29 #include "droptableprocessor.h"
30 
31 #include "we_messages.h"
32 #include "we_ddlcommandclient.h"
33 using namespace WriteEngine;
34 
35 #include "cacheutils.h"
36 using namespace cacheutils;
37 
38 #include "bytestream.h"
39 using namespace messageqcpp;
40 
41 #include "sqllogger.h"
42 #include "messagelog.h"
43 using namespace logging;
44 
45 #include "calpontsystemcatalog.h"
46 using namespace execplan;
47 
48 #include "oamcache.h"
49 using namespace oam;
50 
51 namespace ddlpackageprocessor
52 {
53 
processPackage(ddlpackage::DropTableStatement & dropTableStmt)54 DropTableProcessor::DDLResult DropTableProcessor::processPackage(ddlpackage::DropTableStatement& dropTableStmt)
55 {
56     SUMMARY_INFO("DropTableProcessor::processPackage");
57 
58     DDLResult result;
59     result.result = NO_ERROR;
60     std::string err;
61     VERBOSE_INFO(dropTableStmt);
62 
63     // Commit current transaction.
64     // all DDL statements cause an implicit commit
65     VERBOSE_INFO("Getting current txnID");
66     ByteStream::byte rc = 0;
67     BRM::TxnID txnID;
68     txnID.id = fTxnid.id;
69     txnID.valid = fTxnid.valid;
70     int rc1 = 0;
71     rc1 = fDbrm->isReadWrite();
72 
73     if (rc1 != 0 )
74     {
75         Message::Args args;
76         Message message(9);
77         args.add("Unable to execute the statement due to DBRM is read only");
78         message.format(args);
79         result.result = DROP_ERROR;
80         result.message = message;
81         fSessionManager.rolledback(txnID);
82         return result;
83     }
84 
85     string stmt = dropTableStmt.fSql + "|" + dropTableStmt.fTableName->fSchema + "|";
86     SQLLogger logger(stmt, fDDLLoggingId, dropTableStmt.fSessionID, txnID.id);
87 
88     std::vector <CalpontSystemCatalog::OID> oidList;
89     CalpontSystemCatalog::RIDList tableColRidList;
90     CalpontSystemCatalog::DictOIDList dictOIDList;
91     execplan::CalpontSystemCatalog::ROPair roPair;
92     std::string errorMsg;
93     ByteStream bytestream;
94     uint64_t uniqueId = 0;
95 
96     //Bug 5070. Added exception handling
97     try
98     {
99         uniqueId = fDbrm->getUnique64();
100     }
101     catch (std::exception& ex)
102     {
103         Message::Args args;
104         Message message(9);
105         args.add(ex.what());
106         message.format(args);
107         result.result = DROP_ERROR;
108         result.message = message;
109         fSessionManager.rolledback(txnID);
110         return result;
111     }
112     catch ( ... )
113     {
114         Message::Args args;
115         Message message(9);
116         args.add("Unknown error occured while getting unique number.");
117         message.format(args);
118         result.result = DROP_ERROR;
119         result.message = message;
120         fSessionManager.rolledback(txnID);
121         return result;
122     }
123 
124     fWEClient->addQueue(uniqueId);
125     int pmNum = 1;
126     boost::shared_ptr<messageqcpp::ByteStream> bsIn;
127     uint64_t tableLockId = 0;
128     OamCache* oamcache = OamCache::makeOamCache();
129     std::vector<int> moduleIds = oamcache->getModuleIds();
130 
131     // MCOL-66 The DBRM can't handle concurrent DDL
132     boost::mutex::scoped_lock lk(dbrmMutex);
133 
134     try
135     {
136         //check table lock
137         boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(dropTableStmt.fSessionID);
138         systemCatalogPtr->identity(CalpontSystemCatalog::EC);
139         systemCatalogPtr->sessionID(dropTableStmt.fSessionID);
140         CalpontSystemCatalog::TableName tableName;
141         tableName.schema = dropTableStmt.fTableName->fSchema;
142         tableName.table = dropTableStmt.fTableName->fName;
143 
144         try
145         {
146             roPair = systemCatalogPtr->tableRID(tableName);
147         }
148         catch (IDBExcept& ie)
149         {
150             if (ie.errorCode() == ERR_TABLE_NOT_IN_CATALOG)
151             {
152                 Message::Args args;
153                 Message message(1);
154                 args.add("Table does not exist in ColumnStore.");
155                 message.format(args);
156                 result.result = DROP_TABLE_NOT_IN_CATALOG_ERROR;
157                 result.message = message;
158                 fSessionManager.rolledback(txnID);
159                 return result;
160             }
161             else
162             {
163                 result.result = DROP_ERROR;
164                 Message::Args args;
165                 Message message(9);
166                 args.add("Drop table failed due to ");
167                 args.add(ie.what());
168                 message.format(args);
169                 result.message = message;
170                 fSessionManager.rolledback(txnID);
171                 return result;
172             }
173         }
174 
175         uint32_t  processID = ::getpid();
176         int32_t   txnid = txnID.id;
177         int32_t sessionId = dropTableStmt.fSessionID;
178         std::string  processName("DDLProc");
179         int i = 0;
180 
181         std::vector<uint32_t> pms;
182 
183         for (unsigned i = 0; i < moduleIds.size(); i++)
184         {
185             pms.push_back((uint32_t)moduleIds[i]);
186         }
187 
188         try
189         {
190             tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnid, BRM::LOADING );
191         }
192         catch (std::exception&)
193         {
194             throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
195         }
196 
197         if ( tableLockId  == 0 )
198         {
199             int waitPeriod = 10;
200             int sleepTime = 100; // sleep 100 milliseconds between checks
201             int numTries = 10;  // try 10 times per second
202             waitPeriod = WriteEngine::Config::getWaitPeriod();
203             numTries = 	waitPeriod * 10;
204             struct timespec rm_ts;
205 
206             rm_ts.tv_sec = sleepTime / 1000;
207             rm_ts.tv_nsec = sleepTime % 1000 * 1000000;
208 
209             for (; i < numTries; i++)
210             {
211 #ifdef _MSC_VER
212                 Sleep(rm_ts.tv_sec * 1000);
213 #else
214                 struct timespec abs_ts;
215 
216                 do
217                 {
218                     abs_ts.tv_sec = rm_ts.tv_sec;
219                     abs_ts.tv_nsec = rm_ts.tv_nsec;
220                 }
221                 while (nanosleep(&abs_ts, &rm_ts) < 0);
222 
223 #endif
224 
225                 try
226                 {
227                     processID = ::getpid();
228                     txnid = txnID.id;
229                     sessionId = dropTableStmt.fSessionID;;
230                     processName = "DDLProc";
231                     tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnid, BRM::LOADING );
232                 }
233                 catch (std::exception&)
234                 {
235                     throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
236                 }
237 
238                 if (tableLockId > 0)
239                     break;
240             }
241 
242             if (i >= numTries) //error out
243             {
244                 Message::Args args;
245                 string strOp("drop table");
246                 args.add(strOp);
247                 args.add(processName);
248                 args.add((uint64_t)processID);
249                 args.add(sessionId);
250                 throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_TABLE_LOCKED, args));
251             }
252         }
253 
254         // 1. Get the OIDs for the columns
255         // 2. Get the OIDs for the dictionaries
256         // 3. Save the OIDs to a log file
257         // 4. Remove the Table from SYSTABLE
258         // 5. Remove the columns from SYSCOLUMN
259         // 6. Commit the changes made to systables
260         // 7. Flush PrimProc Cache
261         // 8. Update extent map
262         // 9. Remove the column and dictionary files
263         // 10.Return the OIDs
264 
265         CalpontSystemCatalog::TableName userTableName;
266         userTableName.schema = dropTableStmt.fTableName->fSchema;
267         userTableName.table = dropTableStmt.fTableName->fName;
268 
269         tableColRidList = systemCatalogPtr->columnRIDs( userTableName );
270 
271         dictOIDList = systemCatalogPtr->dictOIDs( userTableName );
272         Oam oam;
273 
274         //Save qualified tablename, all column, dictionary OIDs, and transaction ID into a file in ASCII format
275         for ( unsigned i = 0; i < tableColRidList.size(); i++ )
276         {
277             if ( tableColRidList[i].objnum > 3000 )
278                 oidList.push_back( tableColRidList[i].objnum );
279         }
280 
281         for ( unsigned i = 0; i < dictOIDList.size(); i++ )
282         {
283             if (  dictOIDList[i].dictOID > 3000 )
284                 oidList.push_back( dictOIDList[i].dictOID );
285         }
286 
287         //get a unique number
288         VERBOSE_INFO("Removing the SYSTABLE meta data");
289 #ifdef IDB_DDL_DEBUG
290         cout << fTxnid.id << " Removing the SYSTABLEs meta data" << endl;
291 #endif
292         bytestream << (ByteStream::byte)WE_SVR_DELETE_SYSTABLE;
293         bytestream << uniqueId;
294         bytestream << (uint32_t) dropTableStmt.fSessionID;
295         bytestream << (uint32_t)txnID.id;
296         bytestream << dropTableStmt.fTableName->fSchema;
297         bytestream << dropTableStmt.fTableName->fName;
298 
299         //Find out where systable is
300         BRM::OID_t sysOid = 1001;
301         ByteStream::byte rc = 0;
302 
303         uint16_t  dbRoot;
304         rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot);
305 
306         if (rc != 0)
307         {
308             result.result = (ResultCode) rc;
309             Message::Args args;
310             Message message(9);
311             args.add("Error while calling getSysCatDBRoot");
312             args.add(errorMsg);
313             result.message = message;
314             //release transaction
315             fSessionManager.rolledback(txnID);
316             return result;
317         }
318 
319         boost::shared_ptr<std::map<int, int> > dbRootPMMap = oamcache->getDBRootToPMMap();
320         pmNum = (*dbRootPMMap)[dbRoot];
321 
322         try
323         {
324 #ifdef IDB_DDL_DEBUG
325             cout << fTxnid.id << " Drop table sending WE_SVR_DELETE_SYSTABLES to pm " << pmNum << endl;
326 #endif
327             //cout << "deleting systable entries with txnid " << txnID.id << endl;
328             fWEClient->write(bytestream, (uint32_t)pmNum);
329 
330             while (1)
331             {
332                 bsIn.reset(new ByteStream());
333                 fWEClient->read(uniqueId, bsIn);
334 
335                 if ( bsIn->length() == 0 ) //read error
336                 {
337                     rc = NETWORK_ERROR;
338                     errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
339                     break;
340                 }
341                 else
342                 {
343                     *bsIn >> rc;
344 
345                     if (rc != 0)
346                     {
347                         *bsIn >> errorMsg;
348                     }
349 
350                     break;
351                 }
352             }
353         }
354         catch (runtime_error& ex) //write error
355         {
356 #ifdef IDB_DDL_DEBUG
357             cout << fTxnid.id << " Drop table got exception" << endl;
358 #endif
359             rc = NETWORK_ERROR;
360             errorMsg = ex.what();
361         }
362         catch (...)
363         {
364             rc = NETWORK_ERROR;
365 #ifdef IDB_DDL_DEBUG
366             cout << fTxnid.id << " Drop table got unknown exception" << endl;
367 #endif
368         }
369 
370         if (rc != 0)
371         {
372             cout << fTxnid.id << " Error in dropping table from systables(" << (int)rc << ") " << errorMsg.c_str() << endl;
373             Message::Args args;
374             Message message(9);
375             args.add("Error in dropping table from systables.");
376             args.add(errorMsg);
377             message.format(args);
378             result.result = (ResultCode)rc;
379             result.message = message;
380             //release table lock and session
381             fSessionManager.rolledback(txnID);
382             (void)fDbrm->releaseTableLock(tableLockId);
383             fWEClient->removeQueue(uniqueId);
384             return result;
385         }
386 
387         //remove from syscolumn
388         bytestream.restart();
389         bytestream << (ByteStream::byte)WE_SVR_DELETE_SYSCOLUMN;
390         bytestream << uniqueId;
391         bytestream << (uint32_t) dropTableStmt.fSessionID;
392         bytestream << (uint32_t)txnID.id;
393         bytestream << dropTableStmt.fTableName->fSchema;
394         bytestream << dropTableStmt.fTableName->fName;
395 
396         //Find out where syscolumn is
397         sysOid = 1021;
398         rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot);
399 
400         if (rc != 0)
401         {
402             result.result = (ResultCode) rc;
403             Message::Args args;
404             Message message(9);
405             args.add("Error while calling getSysCatDBRoot");
406             args.add(errorMsg);
407             result.message = message;
408             //release transaction
409             fSessionManager.rolledback(txnID);
410             return result;
411         }
412 
413         pmNum = (*dbRootPMMap)[dbRoot];
414 
415         try
416         {
417 #ifdef IDB_DDL_DEBUG
418             cout << fTxnid.id << " Drop table sending WE_SVR_DELETE_SYSCOLUMN to pm " << pmNum << endl;
419 #endif
420             fWEClient->write(bytestream, (unsigned)pmNum);
421 
422             while (1)
423             {
424                 bsIn.reset(new ByteStream());
425                 fWEClient->read(uniqueId, bsIn);
426 
427                 if ( bsIn->length() == 0 ) //read error
428                 {
429                     rc = NETWORK_ERROR;
430                     errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
431                     break;
432                 }
433                 else
434                 {
435                     *bsIn >> rc;
436 
437                     if (rc != 0)
438                     {
439                         *bsIn >> errorMsg;
440                     }
441 
442                     break;
443                 }
444             }
445         }
446         catch (runtime_error& ex) //write error
447         {
448 #ifdef IDB_DDL_DEBUG
449             cout << fTxnid.id << " Drop table got exception" << endl;
450 #endif
451             rc = NETWORK_ERROR;
452             errorMsg = ex.what();
453         }
454         catch (...)
455         {
456             rc = NETWORK_ERROR;
457 #ifdef IDB_DDL_DEBUG
458             cout << fTxnid.id << " Drop table got unknown exception" << endl;
459 #endif
460         }
461 
462         if (rc != 0)
463         {
464             cout << fTxnid.id << " Error in dropping column from systables(" << (int)rc << ") " << errorMsg.c_str() << endl;
465             Message::Args args;
466             Message message(9);
467             args.add("Error in dropping column from systables.");
468             args.add(errorMsg);
469             message.format(args);
470             result.result = (ResultCode)rc;
471             result.message = message;
472             //release table lock and session
473             fSessionManager.rolledback(txnID);
474             (void)fDbrm->releaseTableLock(tableLockId);
475             fWEClient->removeQueue(uniqueId);
476             return result;
477         }
478 
479         rc = commitTransaction(uniqueId, txnID);
480 
481         if (rc != 0)
482         {
483             cout << txnID.id << " rolledback transaction " << " and valid is " << txnID.valid << endl;
484             fSessionManager.rolledback(txnID);
485         }
486         else
487         {
488             cout << txnID.id << " commiting transaction " << txnID.id << " and valid is " << txnID.valid << endl;
489             fSessionManager.committed(txnID);
490         }
491 
492         if (rc != 0)
493         {
494             Message::Args args;
495             Message message(9);
496             ostringstream oss;
497             oss << " Commit failed with error code " << rc;
498             args.add(oss.str());
499             fSessionManager.rolledback(txnID);
500             (void)fDbrm->releaseTableLock(tableLockId);
501             message.format(args);
502             result.result = (ResultCode)rc;
503             result.message = message;
504             fWEClient->removeQueue(uniqueId);
505             return result;
506         }
507 
508         // Log the DDL statement
509         logDDL(dropTableStmt.fSessionID, txnID.id, dropTableStmt.fSql, dropTableStmt.fOwner);
510     }
511     catch (std::exception& ex)
512     {
513         result.result = DROP_ERROR;
514         Message::Args args;
515         Message message(9);
516         args.add("Drop table failed due to ");
517         args.add(ex.what());
518         fSessionManager.rolledback(txnID);
519 
520         try
521         {
522             (void)fDbrm->releaseTableLock(tableLockId);
523         }
524         catch (std::exception&)
525         {
526             args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
527         }
528 
529         message.format( args );
530         result.message = message;
531         fWEClient->removeQueue(uniqueId);
532         return result;
533     }
534     catch (...)
535     {
536         result.result = DROP_ERROR;
537         errorMsg = "Error in getting information from system catalog or from dbrm.";
538         Message::Args args;
539         Message message(9);
540         args.add("Drop table failed due to ");
541         args.add(errorMsg);
542         fSessionManager.rolledback(txnID);
543 
544         try
545         {
546             (void)fDbrm->releaseTableLock(tableLockId);
547         }
548         catch (std::exception&)
549         {
550             args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
551         }
552 
553         message.format( args );
554         result.message = message;
555         fWEClient->removeQueue(uniqueId);
556         return result;
557     }
558 
559     try
560     {
561         (void)fDbrm->releaseTableLock(tableLockId);
562     }
563     catch (std::exception&)
564     {
565         result.result = DROP_ERROR;
566         Message::Args args;
567         Message message(9);
568         args.add("Drop table failed due to ");
569         args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
570         fSessionManager.rolledback(txnID);
571         message.format( args );
572         result.message = message;
573         fWEClient->removeQueue(uniqueId);
574         return result;
575     }
576 
577     //Save the oids to a file
578     try
579     {
580         createWriteDropLogFile( roPair.objnum, uniqueId, oidList );
581     }
582     catch (std::exception& ex)
583     {
584         result.result = WARNING;
585         Message::Args args;
586         Message message(9);
587         args.add("Drop table failed due to ");
588         args.add(ex.what());
589         message.format(args);
590         result.message = message;
591         fSessionManager.rolledback(txnID);
592         fWEClient->removeQueue(uniqueId);
593         return result;
594     }
595 
596     // Bug 4208 Drop the PrimProcFDCache before droping the column files
597     // FOr Windows, this ensures (most likely) that the column files have
598     // no open handles to hinder the deletion of the files.
599     rc = cacheutils::dropPrimProcFdCache();
600 
601     //Drop files
602     bytestream.restart();
603     bytestream << (ByteStream::byte)WE_SVR_WRITE_DROPFILES;
604     bytestream << uniqueId;
605     bytestream << (uint32_t) oidList.size();
606 
607     for (unsigned i = 0; i < oidList.size(); i++)
608     {
609         bytestream << (uint32_t) oidList[i];
610     }
611 
612 #ifdef IDB_DDL_DEBUG
613     cout << fTxnid.id << " Drop table removing column files" << endl;
614 #endif
615     uint32_t msgRecived = 0;
616 
617     try
618     {
619         fWEClient->write_to_all(bytestream);
620 
621         bsIn.reset(new ByteStream());
622         ByteStream::byte tmp8;
623 
624         while (1)
625         {
626             if (msgRecived == fWEClient->getPmCount())
627                 break;
628 
629             fWEClient->read(uniqueId, bsIn);
630 
631             if ( bsIn->length() == 0 ) //read error
632             {
633                 rc = NETWORK_ERROR;
634                 fWEClient->removeQueue(uniqueId);
635                 break;
636             }
637             else
638             {
639                 *bsIn >> tmp8;
640                 rc = tmp8;
641 
642                 if (rc != 0)
643                 {
644                     *bsIn >> errorMsg;
645                     fWEClient->removeQueue(uniqueId);
646                     break;
647                 }
648                 else
649                     msgRecived++;
650             }
651         }
652     }
653     catch (std::exception& ex)
654     {
655         result.result = WARNING;
656         Message::Args args;
657         Message message(9);
658         args.add("Drop table failed due to ");
659         args.add(ex.what());
660         message.format(args);
661         result.message = message;
662         fSessionManager.rolledback(txnID);
663         fWEClient->removeQueue(uniqueId);
664         return result;
665     }
666     catch (...)
667     {
668         result.result = WARNING;
669         errorMsg = "Error in getting information from system catalog or from dbrm.";
670         Message::Args args;
671         Message message(9);
672         args.add("Drop table failed due to ");
673         args.add(errorMsg);
674         message.format(args);
675         result.message = message;
676         fSessionManager.rolledback(txnID);
677         fWEClient->removeQueue(uniqueId);
678         return result;
679     }
680 
681     //Drop PrimProc FD cache
682     rc = cacheutils::dropPrimProcFdCache();
683     //Flush primProc cache
684     rc = cacheutils::flushOIDsFromCache( oidList );
685     //Delete extents from extent map
686 #ifdef IDB_DDL_DEBUG
687     cout << fTxnid.id << " Drop table deleteOIDs" << endl;
688 #endif
689     rc = fDbrm->deleteOIDs(oidList);
690 
691     if (rc != 0)
692     {
693         Message::Args args;
694         Message message(1);
695         args.add("Table dropped with warning ");
696         args.add( "Remove from extent map failed." );
697         args.add("");
698         args.add("");
699         message.format( args );
700 
701         result.result = WARNING;
702         result.message = message;
703         fSessionManager.rolledback(txnID);
704         fWEClient->removeQueue(uniqueId);
705         return result;
706     }
707 
708     //Remove the log file
709     fWEClient->removeQueue(uniqueId);
710     deleteLogFile(DROPTABLE_LOG, roPair.objnum, uniqueId);
711     //release the transaction
712     //fSessionManager.committed(txnID);
713     returnOIDs( tableColRidList, dictOIDList );
714     return result;
715 
716 }
717 
processPackage(ddlpackage::TruncTableStatement & truncTableStmt)718 TruncTableProcessor::DDLResult TruncTableProcessor::processPackage(ddlpackage::TruncTableStatement& truncTableStmt)
719 {
720     SUMMARY_INFO("TruncTableProcessor::processPackage");
721     // 1. lock the table
722     // 2. Get the OIDs for the columns
723     // 3. Get the OIDs for the dictionaries
724     // 4. Save the OIDs
725     // 5. Disable all partitions
726     // 6. Remove the column and dictionary files
727     // 7. Flush PrimProc Cache
728     // 8. Update extent map
729     // 9. Use the OIDs to create new column and dictionary files with abbreviate extent
730     // 10 Update next value if the table has autoincrement column
731 
732     DDLResult result;
733     result.result = NO_ERROR;
734     std::string err;
735     VERBOSE_INFO(truncTableStmt);
736 
737     // @Bug 4150. Check dbrm status before doing anything to the table.
738     int rc = 0;
739     rc = fDbrm->isReadWrite();
740     BRM::TxnID txnID;
741     txnID.id = fTxnid.id;
742     txnID.valid = fTxnid.valid;
743 
744     if (rc != 0 )
745     {
746         Message::Args args;
747         Message message(9);
748         args.add("Unable to execute the statement due to DBRM is read only");
749         message.format(args);
750         result.result = DROP_ERROR;
751         result.message = message;
752         fSessionManager.rolledback(txnID);
753         return result;
754     }
755 
756     //@Bug 5765 log the schema.
757     string stmt = truncTableStmt.fSql + "|" + truncTableStmt.fTableName->fSchema + "|";
758     SQLLogger logger(stmt, fDDLLoggingId, truncTableStmt.fSessionID, txnID.id);
759 
760     std::vector <CalpontSystemCatalog::OID> columnOidList;
761     std::vector <CalpontSystemCatalog::OID> allOidList;
762     CalpontSystemCatalog::RIDList tableColRidList;
763     CalpontSystemCatalog::DictOIDList dictOIDList;
764     execplan::CalpontSystemCatalog::ROPair roPair;
765     std::string  processName("DDLProc");
766     uint32_t  processID = ::getpid();;
767     int32_t   txnid = txnID.id;
768     boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(truncTableStmt.fSessionID);
769     systemCatalogPtr->identity(CalpontSystemCatalog::EC);
770     systemCatalogPtr->sessionID(truncTableStmt.fSessionID);
771     CalpontSystemCatalog::TableInfo tableInfo;
772     uint64_t uniqueId = 0;
773 
774     //Bug 5070. Added exception handling
775     try
776     {
777         uniqueId = fDbrm->getUnique64();
778     }
779     catch (std::exception& ex)
780     {
781         Message::Args args;
782         Message message(9);
783         args.add(ex.what());
784         message.format(args);
785         result.result = DROP_ERROR;
786         result.message = message;
787         fSessionManager.rolledback(txnID);
788         return result;
789     }
790     catch ( ... )
791     {
792         Message::Args args;
793         Message message(9);
794         args.add("Unknown error occured while getting unique number.");
795         message.format(args);
796         result.result = DROP_ERROR;
797         result.message = message;
798         fSessionManager.rolledback(txnID);
799         return result;
800     }
801 
802     fWEClient->addQueue(uniqueId);
803     int pmNum = 1;
804     boost::shared_ptr<messageqcpp::ByteStream> bsIn;
805     string errorMsg;
806     uint32_t autoIncColOid = 0;
807     uint64_t tableLockId = 0;
808     OamCache* oamcache = OamCache::makeOamCache();
809     std::vector<int> moduleIds = oamcache->getModuleIds();
810 
811     try
812     {
813         //check table lock
814 
815         CalpontSystemCatalog::TableName tableName;
816         tableName.schema = truncTableStmt.fTableName->fSchema;
817         tableName.table = truncTableStmt.fTableName->fName;
818         roPair = systemCatalogPtr->tableRID( tableName );
819         int32_t sessionId = truncTableStmt.fSessionID;
820         std::string  processName("DDLProc");
821         int i = 0;
822 
823         std::vector<uint32_t> pms;
824 
825         for (unsigned i = 0; i < moduleIds.size(); i++)
826         {
827             pms.push_back((uint32_t)moduleIds[i]);
828         }
829 
830         try
831         {
832             tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnid, BRM::LOADING );
833         }
834         catch (std::exception&)
835         {
836             throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
837         }
838 
839         if ( tableLockId  == 0 )
840         {
841             int waitPeriod = 10;
842             int sleepTime = 100; // sleep 100 milliseconds between checks
843             int numTries = 10;  // try 10 times per second
844             waitPeriod = Config::getWaitPeriod();
845             numTries = 	waitPeriod * 10;
846             struct timespec rm_ts;
847 
848             rm_ts.tv_sec = sleepTime / 1000;
849             rm_ts.tv_nsec = sleepTime % 1000 * 1000000;
850 
851             for (; i < numTries; i++)
852             {
853 #ifdef _MSC_VER
854                 Sleep(rm_ts.tv_sec * 1000);
855 #else
856                 struct timespec abs_ts;
857 
858                 do
859                 {
860                     abs_ts.tv_sec = rm_ts.tv_sec;
861                     abs_ts.tv_nsec = rm_ts.tv_nsec;
862                 }
863                 while (nanosleep(&abs_ts, &rm_ts) < 0);
864 
865 #endif
866 
867                 try
868                 {
869                     processID = ::getpid();
870                     txnid = txnID.id;
871                     sessionId = truncTableStmt.fSessionID;
872                     processName = "DDLProc";
873                     tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnid, BRM::LOADING );
874                 }
875                 catch (std::exception&)
876                 {
877                     throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
878                 }
879 
880                 if (tableLockId > 0)
881                     break;
882             }
883 
884             if (i >= numTries) //error out
885             {
886                 Message::Args args;
887                 args.add(processName);
888                 args.add((uint64_t)processID);
889                 args.add(sessionId);
890                 throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_TABLE_LOCKED, args));
891             }
892         }
893 
894         CalpontSystemCatalog::TableName userTableName;
895         userTableName.schema = truncTableStmt.fTableName->fSchema;
896         userTableName.table = truncTableStmt.fTableName->fName;
897 
898         tableColRidList = systemCatalogPtr->columnRIDs( userTableName );
899 
900         dictOIDList = systemCatalogPtr->dictOIDs( userTableName );
901 
902         for ( unsigned i = 0; i < tableColRidList.size(); i++ )
903         {
904             if ( tableColRidList[i].objnum > 3000 )
905             {
906                 columnOidList.push_back( tableColRidList[i].objnum );
907                 allOidList.push_back( tableColRidList[i].objnum );
908             }
909         }
910 
911         for ( unsigned i = 0; i < dictOIDList.size(); i++ )
912         {
913             if (  dictOIDList[i].dictOID > 3000 )
914                 allOidList.push_back( dictOIDList[i].dictOID );
915         }
916 
917         //Check whether the table has autoincrement column
918         tableInfo = systemCatalogPtr->tableInfo(userTableName);
919     }
920     catch (std::exception& ex)
921     {
922         cerr << "TruncateTableProcessor::processPackage: " << ex.what() << endl;
923 
924         Message::Args args;
925         Message message(9);
926         args.add("Truncate table failed: ");
927         args.add( ex.what() );
928         args.add("");
929         fSessionManager.rolledback(txnID);
930 
931         try
932         {
933             (void)fDbrm->releaseTableLock(tableLockId);
934         }
935         catch (std::exception&)
936         {
937             args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
938         }
939 
940         fWEClient->removeQueue(uniqueId);
941         message.format( args );
942 
943         result.result = TRUNC_ERROR;
944         result.message = message;
945         return result;
946     }
947     catch (...)
948     {
949         cerr << "TruncateTableProcessor::processPackage: caught unknown exception!" << endl;
950 
951         Message::Args args;
952         Message message(1);
953         args.add("Truncate table failed: ");
954         args.add( "encountered unkown exception" );
955         args.add("");
956 
957         try
958         {
959             (void)fDbrm->releaseTableLock(tableLockId);
960         }
961         catch (std::exception&)
962         {
963             args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
964         }
965 
966         fWEClient->removeQueue(uniqueId);
967         message.format( args );
968 
969         result.result = TRUNC_ERROR;
970         result.message = message;
971         return result;
972     }
973 
974     //Save the oids to a file
975     try
976     {
977         createWriteTruncateTableLogFile( roPair.objnum, uniqueId, allOidList);
978     }
979     catch (std::exception& ex)
980     {
981         Message::Args args;
982         Message message(9);
983         args.add("Truncate table failed due to ");
984         args.add(ex.what());
985         fSessionManager.rolledback(txnID);
986         fWEClient->removeQueue(uniqueId);
987         message.format( args );
988 
989         //@bug 4515 Release the tablelock as nothing has done to this table.
990         try
991         {
992             (void)fDbrm->releaseTableLock(tableLockId);
993         }
994         catch (std::exception&) {}
995 
996         result.result = TRUNC_ERROR;
997         result.message = message;
998         return result;
999     }
1000 
1001     ByteStream bytestream;
1002     ByteStream::byte tmp8;
1003 
1004     // MCOL-66 The DBRM can't handle concurrent DDL
1005     boost::mutex::scoped_lock lk(dbrmMutex);
1006 
1007     try
1008     {
1009         //Disable extents first
1010         int rc1 = fDbrm->markAllPartitionForDeletion( allOidList);
1011 
1012         if (rc1 != 0)
1013         {
1014             string errMsg;
1015             BRM::errString(rc, errMsg);
1016             throw std::runtime_error(errMsg);
1017         }
1018 
1019         // Bug 4208 Drop the PrimProcFDCache before droping the column files
1020         // FOr Windows, this ensures (most likely) that the column files have
1021         // no open handles to hinder the deletion of the files.
1022         rc = cacheutils::dropPrimProcFdCache();
1023 
1024         VERBOSE_INFO("Removing files");
1025         bytestream << (ByteStream::byte)WE_SVR_WRITE_DROPFILES;
1026         bytestream << uniqueId;
1027         bytestream << (uint32_t) allOidList.size();
1028 
1029         for (unsigned i = 0; i < allOidList.size(); i++)
1030         {
1031             bytestream << (uint32_t) allOidList[i];
1032         }
1033 
1034         uint32_t msgRecived = 0;
1035 
1036         try
1037         {
1038             fWEClient->write_to_all(bytestream);
1039 
1040             bsIn.reset(new ByteStream());
1041 
1042             while (1)
1043             {
1044                 if (msgRecived == fWEClient->getPmCount())
1045                     break;
1046 
1047                 fWEClient->read(uniqueId, bsIn);
1048 
1049                 if ( bsIn->length() == 0 ) //read error
1050                 {
1051                     rc = NETWORK_ERROR;
1052                     fWEClient->removeQueue(uniqueId);
1053                     break;
1054                 }
1055                 else
1056                 {
1057                     *bsIn >> tmp8;
1058                     rc = tmp8;
1059 
1060                     if (rc != 0)
1061                     {
1062                         *bsIn >> errorMsg;
1063                         fWEClient->removeQueue(uniqueId);
1064                         break;
1065                     }
1066                     else
1067                         msgRecived++;
1068                 }
1069             }
1070         }
1071         catch (std::exception& ex)
1072         {
1073             Message::Args args;
1074             Message message(9);
1075             args.add("Truncate table failed due to ");
1076             args.add(ex.what());
1077             fSessionManager.rolledback(txnID);
1078             fWEClient->removeQueue(uniqueId);
1079             message.format( args );
1080 
1081             result.result = TRUNC_ERROR;
1082             result.message = message;
1083             deleteLogFile(TRUNCATE_LOG, roPair.objnum, uniqueId);
1084             return result;
1085         }
1086         catch (...)
1087         {
1088             result.result = DROP_ERROR;
1089             errorMsg = "Error in getting information from system catalog or from dbrm.";
1090             Message::Args args;
1091             Message message(9);
1092             args.add("Truncate table failed due to ");
1093             args.add(errorMsg);
1094             fSessionManager.rolledback(txnID);
1095             fWEClient->removeQueue(uniqueId);
1096             message.format( args );
1097 
1098             result.result = TRUNC_ERROR;
1099             result.message = message;
1100             deleteLogFile(TRUNCATE_LOG, roPair.objnum, uniqueId);
1101             return result;
1102         }
1103 
1104         //Drop PrimProc FD cache
1105         rc = cacheutils::dropPrimProcFdCache();
1106         //Flush primProc cache
1107         rc = cacheutils::flushOIDsFromCache( allOidList );
1108         //Delete extents from extent map
1109         rc = fDbrm->deleteOIDs(allOidList);
1110 
1111         if (rc != 0)
1112         {
1113             Message::Args args;
1114             Message message(1);
1115             args.add("Table truncated with warning ");
1116             args.add( "Remove from extent map failed." );
1117             args.add("");
1118             args.add("");
1119             message.format( args );
1120 
1121             result.result = WARNING;
1122             result.message = message;
1123             fSessionManager.rolledback(txnID);
1124             fWEClient->removeQueue(uniqueId);
1125             return result;
1126         }
1127 
1128         //Get the number of tables in the database, the current table is included.
1129         int tableCount = systemCatalogPtr->getTableCount();
1130         Oam oam;
1131         //Calculate which dbroot the columns should start
1132         DBRootConfigList dbRootList = oamcache->getDBRootNums();
1133 
1134         uint16_t useDBRootIndex = tableCount % dbRootList.size();
1135         //Find out the dbroot# corresponding the useDBRootIndex from oam
1136         uint16_t useDBRoot = dbRootList[useDBRootIndex];
1137 
1138         bytestream.restart();
1139         bytestream << (ByteStream::byte) WE_SVR_WRITE_CREATETABLEFILES;
1140         bytestream << uniqueId;
1141         bytestream << (uint32_t)txnID.id;
1142         uint32_t numOids = columnOidList.size() + dictOIDList.size();
1143         bytestream << numOids;
1144         CalpontSystemCatalog::ColType colType;
1145 
1146         for (unsigned col  = 0; col < columnOidList.size(); col++)
1147         {
1148             colType = systemCatalogPtr->colType(columnOidList[col]);
1149 
1150             if (colType.autoincrement)
1151                 autoIncColOid = colType.columnOID;
1152 
1153             bytestream << (uint32_t)columnOidList[col];
1154             bytestream << (uint8_t) colType.colDataType;
1155             bytestream << (uint8_t) false;
1156             bytestream << (uint32_t) colType.colWidth;
1157             bytestream << (uint16_t) useDBRoot;
1158             bytestream << (uint32_t) colType.compressionType;
1159         }
1160 
1161         for (unsigned col  = 0; col < dictOIDList.size(); col++)
1162         {
1163             colType = systemCatalogPtr->colTypeDct(dictOIDList[col].dictOID);
1164             bytestream << (uint32_t) dictOIDList[col].dictOID;
1165             bytestream << (uint8_t) colType.colDataType;
1166             bytestream << (uint8_t) true;
1167             bytestream << (uint32_t) colType.colWidth;
1168             bytestream << (uint16_t) useDBRoot;
1169             bytestream << (uint32_t) colType.compressionType;
1170         }
1171 
1172         boost::shared_ptr<std::map<int, int> > dbRootPMMap = oamcache->getDBRootToPMMap();
1173         pmNum = (*dbRootPMMap)[useDBRoot];
1174 
1175         try
1176         {
1177 #ifdef IDB_DDL_DEBUG
1178             cout << "Truncate table sending We_SVR_WRITE_CREATETABLEFILES to pm " << pmNum << endl;
1179 #endif
1180             fWEClient->write(bytestream, pmNum);
1181 
1182             while (1)
1183             {
1184                 bsIn.reset(new ByteStream());
1185                 fWEClient->read(uniqueId, bsIn);
1186 
1187                 if ( bsIn->length() == 0 ) //read error
1188                 {
1189                     rc = NETWORK_ERROR;
1190                     errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
1191                     break;
1192                 }
1193                 else
1194                 {
1195                     *bsIn >> tmp8;
1196                     rc = tmp8;
1197 
1198                     if (rc != 0)
1199                     {
1200                         *bsIn >> errorMsg;
1201                     }
1202 
1203                     break;
1204                 }
1205             }
1206 
1207             if (rc != 0)
1208             {
1209                 //drop the newly created files
1210                 bytestream.restart();
1211                 bytestream << (ByteStream::byte) WE_SVR_WRITE_DROPFILES;
1212                 bytestream << uniqueId;
1213                 bytestream << (uint32_t)(allOidList.size());
1214 
1215                 for (unsigned i = 0; i < (allOidList.size()); i++)
1216                 {
1217                     bytestream << (uint32_t)(allOidList[i]);
1218                 }
1219 
1220                 fWEClient->write(bytestream, pmNum);
1221 
1222                 while (1)
1223                 {
1224                     bsIn.reset(new ByteStream());
1225                     fWEClient->read(uniqueId, bsIn);
1226 
1227                     if ( bsIn->length() == 0 ) //read error
1228                     {
1229                         break;
1230                     }
1231                     else
1232                     {
1233                         *bsIn >> tmp8;
1234                         //rc = tmp8;
1235                         break;
1236                     }
1237                 }
1238 
1239                 Message::Args args;
1240                 Message message(1);
1241                 args.add( "Truncate table failed." );
1242                 args.add( errorMsg);
1243                 args.add("");
1244                 message.format( args );
1245 
1246                 result.result = TRUNC_ERROR;
1247                 result.message = message;
1248                 //rc = fSessionManager.setTableLock( roPair.objnum, truncTableStmt.fSessionID, processID, processName, false );
1249                 fSessionManager.rolledback(txnID);
1250                 return result;
1251             }
1252         }
1253         catch (runtime_error&)
1254         {
1255             rc = NETWORK_ERROR;
1256             errorMsg = "Lost connection to Write Engine Server";
1257         }
1258     }
1259 
1260 #ifdef _MSC_VER
1261     catch (std::exception&)
1262     {
1263         //FIXME: Windows can't delete a file that's still open by another process
1264     }
1265 
1266 #else
1267     catch (std::exception& ex)
1268     {
1269         Message::Args args;
1270         Message message(1);
1271         args.add( "Truncate table failed." );
1272         args.add( ex.what() );
1273         args.add("");
1274         message.format( args );
1275 
1276         result.result = TRUNC_ERROR;
1277         result.message = message;
1278         //rc = fSessionManager.setTableLock( roPair.objnum, truncTableStmt.fSessionID, processID, processName, false );
1279         fSessionManager.rolledback(txnID);
1280         return result;
1281     }
1282 
1283 #endif
1284     catch ( ... )
1285     {
1286         Message::Args args;
1287         Message message(1);
1288         args.add("Truncate table failed: ");
1289         args.add( "Remove column files failed." );
1290         args.add("");
1291         args.add("");
1292         message.format( args );
1293 
1294         result.result = TRUNC_ERROR;
1295         result.message = message;
1296         //rc = fSessionManager.setTableLock( roPair.objnum, truncTableStmt.fSessionID, processID, processName, false );
1297         fSessionManager.rolledback(txnID);
1298         return result;
1299     }
1300 
1301     if (rc != 0)
1302     {
1303         rollBackTransaction( uniqueId, txnID, truncTableStmt.fSessionID); //What to do with the error code
1304         fSessionManager.rolledback(txnID);
1305     }
1306 
1307     //Check whether the table has autoincrement column
1308     if (tableInfo.tablewithautoincr == 1)
1309     {
1310         //reset nextvalue to 1
1311         WE_DDLCommandClient commandClient;
1312         rc = commandClient.UpdateSyscolumnNextval(autoIncColOid, 1);
1313     }
1314 
1315     // Log the DDL statement
1316     logDDL(truncTableStmt.fSessionID, txnID.id, truncTableStmt.fSql, truncTableStmt.fOwner);
1317 
1318     try
1319     {
1320         (void)fDbrm->releaseTableLock(tableLockId);
1321     }
1322     catch (std::exception&)
1323     {
1324         Message::Args args;
1325         Message message(1);
1326         args.add("Table truncated with warning ");
1327         args.add( "Release table failed." );
1328         args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
1329         args.add("");
1330         message.format( args );
1331 
1332         result.result = WARNING;
1333         result.message = message;
1334         fSessionManager.rolledback(txnID);
1335         fWEClient->removeQueue(uniqueId);
1336     }
1337 
1338     //release the transaction
1339     fSessionManager.committed(txnID);
1340     fWEClient->removeQueue(uniqueId);
1341 
1342     //Remove the log file
1343     try
1344     {
1345         deleteLogFile(TRUNCATE_LOG, roPair.objnum, uniqueId);
1346     }
1347     catch ( ... )
1348     {
1349     }
1350 
1351     return result;
1352 }
1353 
1354 } //namespace ddlpackageprocessor
1355 
1356 // vim:ts=4 sw=4:
1357 
1358