1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 /***********************************************************************
19  *   $Id: ddlprocessor.cpp 6 2006-06-23 17:58:51Z rcraighead $
20  *
21  *
22  ***********************************************************************/
23 #include <map>
24 #include <boost/scoped_ptr.hpp>
25 using namespace std;
26 
27 #include "ddlpkg.h"
28 #include "ddlprocessor.h"
29 #include "createtableprocessor.h"
30 #include "altertableprocessor.h"
31 #include "droptableprocessor.h"
32 #include "calpontsystemcatalog.h"
33 #include "sqlparser.h"
34 #include "configcpp.h"
35 #include "markpartitionprocessor.h"
36 #include "restorepartitionprocessor.h"
37 #include "droppartitionprocessor.h"
38 //#define 	SERIALIZE_DDL_DML_CPIMPORT 	1
39 
40 #include "cacheutils.h"
41 #include "vss.h"
42 #include "dbrm.h"
43 #include "idberrorinfo.h"
44 #include "errorids.h"
45 #include "we_messages.h"
46 using namespace BRM;
47 
48 using namespace config;
49 using namespace messageqcpp;
50 using namespace ddlpackageprocessor;
51 using namespace ddlpackage;
52 using namespace execplan;
53 using namespace logging;
54 using namespace WriteEngine;
55 
56 #include "querytele.h"
57 using namespace querytele;
58 
59 #include "oamcache.h"
60 
61 namespace
62 {
63 typedef messageqcpp::ByteStream::quadbyte quadbyte;
64 
65 const quadbyte UNRECOGNIZED_PACKAGE_TYPE = 100;
66 const quadbyte NO_PKNAME_AVAILABLE = 101;
67 
68 const std::string DDLProcName = "DDLProc";
69 
70 
cleanPMSysCache()71 void cleanPMSysCache()
72 {
73     vector<BRM::OID_t> oidList = getAllSysCatOIDs();
74     cacheutils::flushOIDsFromCache ( oidList );
75 }
76 
77 struct PackageHandler
78 {
operator ()__anon021daa920111::PackageHandler79     void operator ()()
80     {
81 
82         DDLPackageProcessor::DDLResult result;
83         result.result = DDLPackageProcessor::NO_ERROR;
84         //boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr;
85 
86         try
87         {
88             //cout << "DDLProc received package " << fPackageType << endl;
89             switch ( fPackageType )
90             {
91                 case ddlpackage::DDL_CREATE_TABLE_STATEMENT:
92                 {
93                     CreateTableStatement createTableStmt;
94                     createTableStmt.unserialize(fByteStream);
95                     boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
96                         CalpontSystemCatalog::makeCalpontSystemCatalog(createTableStmt.fSessionID );
97                     boost::scoped_ptr<CreateTableProcessor> processor(new CreateTableProcessor(fDbrm));
98                     processor->fTxnid.id = fTxnid.id;
99                     processor->fTxnid.valid = true;
100                     //cout << "create table using txnid " << fTxnid.id << endl;
101 
102                     QueryTeleStats qts;
103                     qts.query_uuid = QueryTeleClient::genUUID();
104                     qts.msg_type = QueryTeleStats::QT_START;
105                     qts.start_time = QueryTeleClient::timeNowms();
106                     qts.session_id = createTableStmt.fSessionID;
107                     qts.query_type = "CREATE";
108                     qts.query = createTableStmt.fSql;
109                     qts.system_name = fOamCache->getSystemName();
110                     qts.module_name = fOamCache->getModuleName();
111                     qts.schema_name = createTableStmt.schemaName();
112                     fQtc.postQueryTele(qts);
113 
114                     result = processor->processPackage(createTableStmt);
115 
116                     systemCatalogPtr->removeCalpontSystemCatalog( createTableStmt.fSessionID );
117                     systemCatalogPtr->removeCalpontSystemCatalog( createTableStmt.fSessionID | 0x80000000);
118 
119                     qts.msg_type = QueryTeleStats::QT_SUMMARY;
120                     qts.end_time = QueryTeleClient::timeNowms();
121                     fQtc.postQueryTele(qts);
122                 }
123                 break;
124 
125                 case ddlpackage::DDL_ALTER_TABLE_STATEMENT:
126                 {
127                     AlterTableStatement alterTableStmt;
128                     alterTableStmt.unserialize(fByteStream);
129                     boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
130                         CalpontSystemCatalog::makeCalpontSystemCatalog(alterTableStmt.fSessionID );
131                     boost::scoped_ptr<AlterTableProcessor> processor(new AlterTableProcessor(fDbrm));
132                     processor->fTxnid.id = fTxnid.id;
133                     processor->fTxnid.valid = true;
134 
135                     QueryTeleStats qts;
136                     qts.query_uuid = QueryTeleClient::genUUID();
137                     qts.msg_type = QueryTeleStats::QT_START;
138                     qts.start_time = QueryTeleClient::timeNowms();
139                     qts.session_id = alterTableStmt.fSessionID;
140                     qts.query_type = "ALTER";
141                     qts.query = alterTableStmt.fSql;
142                     qts.system_name = fOamCache->getSystemName();
143                     qts.module_name = fOamCache->getModuleName();
144                     qts.schema_name = alterTableStmt.schemaName();
145                     fQtc.postQueryTele(qts);
146 
147                     processor->fTimeZone = alterTableStmt.fTimeZone;
148 
149                     result = processor->processPackage(alterTableStmt);
150 
151                     systemCatalogPtr->removeCalpontSystemCatalog( alterTableStmt.fSessionID );
152                     systemCatalogPtr->removeCalpontSystemCatalog( alterTableStmt.fSessionID | 0x80000000);
153 
154                     qts.msg_type = QueryTeleStats::QT_SUMMARY;
155                     qts.end_time = QueryTeleClient::timeNowms();
156                     fQtc.postQueryTele(qts);
157                 }
158                 break;
159 
160                 case ddlpackage::DDL_DROP_TABLE_STATEMENT:
161                 {
162                     DropTableStatement dropTableStmt;
163                     dropTableStmt.unserialize(fByteStream);
164                     boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
165                         CalpontSystemCatalog::makeCalpontSystemCatalog(dropTableStmt.fSessionID );
166                     boost::scoped_ptr<DropTableProcessor> processor(new DropTableProcessor(fDbrm));
167 
168                     processor->fTxnid.id = fTxnid.id;
169                     processor->fTxnid.valid = true;
170 
171                     QueryTeleStats qts;
172                     qts.query_uuid = QueryTeleClient::genUUID();
173                     qts.msg_type = QueryTeleStats::QT_START;
174                     qts.start_time = QueryTeleClient::timeNowms();
175                     qts.session_id = dropTableStmt.fSessionID;
176                     qts.query_type = "DROP";
177                     qts.query = dropTableStmt.fSql;
178                     qts.system_name = fOamCache->getSystemName();
179                     qts.module_name = fOamCache->getModuleName();
180                     qts.schema_name = dropTableStmt.schemaName();
181                     fQtc.postQueryTele(qts);
182 
183                     //cout << "Drop table using txnid " << fTxnid.id << endl;
184                     result = processor->processPackage(dropTableStmt);
185 
186                     systemCatalogPtr->removeCalpontSystemCatalog( dropTableStmt.fSessionID );
187                     systemCatalogPtr->removeCalpontSystemCatalog( dropTableStmt.fSessionID | 0x80000000);
188 
189                     qts.msg_type = QueryTeleStats::QT_SUMMARY;
190                     qts.end_time = QueryTeleClient::timeNowms();
191                     fQtc.postQueryTele(qts);
192                 }
193                 break;
194 
195                 case ddlpackage::DDL_TRUNC_TABLE_STATEMENT:
196                 {
197                     TruncTableStatement truncTableStmt;
198                     truncTableStmt.unserialize(fByteStream);
199                     boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
200                         CalpontSystemCatalog::makeCalpontSystemCatalog(truncTableStmt.fSessionID);
201                     boost::scoped_ptr<TruncTableProcessor> processor(new TruncTableProcessor(fDbrm));
202 
203                     processor->fTxnid.id = fTxnid.id;
204                     processor->fTxnid.valid = true;
205 
206                     QueryTeleStats qts;
207                     qts.query_uuid = QueryTeleClient::genUUID();
208                     qts.msg_type = QueryTeleStats::QT_START;
209                     qts.start_time = QueryTeleClient::timeNowms();
210                     qts.session_id = truncTableStmt.fSessionID;
211                     qts.query_type = "TRUNCATE";
212                     qts.query = truncTableStmt.fSql;
213                     qts.system_name = fOamCache->getSystemName();
214                     qts.module_name = fOamCache->getModuleName();
215                     qts.schema_name = truncTableStmt.schemaName();
216                     fQtc.postQueryTele(qts);
217 
218                     result = processor->processPackage(truncTableStmt);
219 
220                     systemCatalogPtr->removeCalpontSystemCatalog(truncTableStmt.fSessionID );
221                     systemCatalogPtr->removeCalpontSystemCatalog(truncTableStmt.fSessionID | 0x80000000);
222 
223                     qts.msg_type = QueryTeleStats::QT_SUMMARY;
224                     qts.end_time = QueryTeleClient::timeNowms();
225                     fQtc.postQueryTele(qts);
226                 }
227                 break;
228 
229                 case ddlpackage::DDL_MARK_PARTITION_STATEMENT:
230                 {
231                     MarkPartitionStatement markPartitionStmt;
232                     markPartitionStmt.unserialize(fByteStream);
233                     boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
234                         CalpontSystemCatalog::makeCalpontSystemCatalog(markPartitionStmt.fSessionID);
235                     boost::scoped_ptr<MarkPartitionProcessor> processor(new MarkPartitionProcessor(fDbrm));
236                     (processor->fTxnid).id = fTxnid.id;
237                     (processor->fTxnid).valid = true;
238                     result = processor->processPackage(markPartitionStmt);
239                     systemCatalogPtr->removeCalpontSystemCatalog(markPartitionStmt.fSessionID );
240                     systemCatalogPtr->removeCalpontSystemCatalog(markPartitionStmt.fSessionID | 0x80000000);
241                 }
242                 break;
243 
244                 case ddlpackage::DDL_RESTORE_PARTITION_STATEMENT:
245                 {
246                     RestorePartitionStatement restorePartitionStmt;
247                     restorePartitionStmt.unserialize(fByteStream);
248                     boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
249                         CalpontSystemCatalog::makeCalpontSystemCatalog(restorePartitionStmt.fSessionID);
250                     boost::scoped_ptr<RestorePartitionProcessor> processor(new RestorePartitionProcessor(fDbrm));
251                     (processor->fTxnid).id = fTxnid.id;
252                     (processor->fTxnid).valid = true;
253                     result = processor->processPackage(restorePartitionStmt);
254                     systemCatalogPtr->removeCalpontSystemCatalog(restorePartitionStmt.fSessionID );
255                     systemCatalogPtr->removeCalpontSystemCatalog(restorePartitionStmt.fSessionID | 0x80000000);
256                 }
257                 break;
258 
259                 case ddlpackage::DDL_DROP_PARTITION_STATEMENT:
260                 {
261                     DropPartitionStatement dropPartitionStmt;
262                     dropPartitionStmt.unserialize(fByteStream);
263                     boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
264                         CalpontSystemCatalog::makeCalpontSystemCatalog(dropPartitionStmt.fSessionID);
265                     boost::scoped_ptr<DropPartitionProcessor> processor(new DropPartitionProcessor(fDbrm));
266                     (processor->fTxnid).id = fTxnid.id;
267                     (processor->fTxnid).valid = true;
268                     result = processor->processPackage(dropPartitionStmt);
269                     systemCatalogPtr->removeCalpontSystemCatalog(dropPartitionStmt.fSessionID );
270                     systemCatalogPtr->removeCalpontSystemCatalog(dropPartitionStmt.fSessionID | 0x80000000);
271                 }
272                 break;
273 
274                 default:
275                     throw UNRECOGNIZED_PACKAGE_TYPE;
276                     break;
277             }
278 
279             //@Bug 3427. No need to log user rror, just return the message to user.
280             //Log errors
281             if ((result.result != DDLPackageProcessor::NO_ERROR) && (result.result != DDLPackageProcessor::USER_ERROR))
282             {
283                 logging::LoggingID lid(23);
284                 logging::MessageLog ml(lid);
285 
286                 ml.logErrorMessage( result.message );
287             }
288 
289             string hdfstest = config::Config::makeConfig()->getConfig("Installation", "DBRootStorageType");
290 
291             if (hdfstest == "hdfs" || hdfstest == "HDFS")
292                 cleanPMSysCache();
293 
294             messageqcpp::ByteStream results;
295             messageqcpp::ByteStream::byte status =  result.result;
296             results << status;
297             results << result.message.msg();
298 
299             fIos.write(results);
300 
301             fIos.close();
302         }
303         catch (quadbyte& /*foo*/)
304         {
305             fIos.close();
306             cout << "Unrecognized package type" << endl;
307         }
308         catch (logging::IDBExcept& idbEx)
309         {
310             cleanPMSysCache();
311             messageqcpp::ByteStream results;
312             messageqcpp::ByteStream::byte status = DDLPackageProcessor::CREATE_ERROR;
313             results << status;
314             results << string(idbEx.what());
315 
316             fIos.write(results);
317 
318             fIos.close();
319         }
320         catch (...)
321         {
322             fIos.close();
323         }
324 
325     }
326 
327     messageqcpp::IOSocket fIos;
328     messageqcpp::ByteStream fByteStream;
329     messageqcpp::ByteStream::quadbyte fPackageType;
330     BRM::TxnID fTxnid;
331     BRM::DBRM* fDbrm;
332     QueryTeleClient fQtc;
333     oam::OamCache* fOamCache;
334 };
335 
336 }
337 
338 namespace ddlprocessor
339 {
340 
DDLProcessor(int packageMaxThreads,int packageWorkQueueSize)341 DDLProcessor::DDLProcessor( int packageMaxThreads, int packageWorkQueueSize )
342     : fPackageMaxThreads(packageMaxThreads), fPackageWorkQueueSize(packageWorkQueueSize),
343       fMqServer(DDLProcName)
344 {
345     fDdlPackagepool.setMaxThreads(fPackageMaxThreads);
346     fDdlPackagepool.setQueueSize(fPackageWorkQueueSize);
347     fDdlPackagepool.setName("DdlPackagepool");
348     csc = CalpontSystemCatalog::makeCalpontSystemCatalog();
349     csc->identity(CalpontSystemCatalog::EC);
350     string teleServerHost(config::Config::makeConfig()->getConfig("QueryTele", "Host"));
351 
352     if (!teleServerHost.empty())
353     {
354         int teleServerPort = config::Config::fromText(config::Config::makeConfig()->getConfig("QueryTele", "Port"));
355 
356         if (teleServerPort > 0)
357         {
358             fQtc.serverParms(QueryTeleServerParms(teleServerHost, teleServerPort));
359         }
360     }
361 }
362 
~DDLProcessor()363 DDLProcessor::~DDLProcessor()
364 {
365 
366 }
process()367 void DDLProcessor::process()
368 {
369     DBRM dbrm;
370     messageqcpp::IOSocket ios;
371     messageqcpp::ByteStream bs;
372     PackageHandler handler;
373     messageqcpp::ByteStream::quadbyte packageType;
374     bool concurrentSupport = true;
375     string concurrentTranStr = config::Config::makeConfig()->getConfig("SystemConfig", "ConcurrentTransactions");
376 
377     if ( concurrentTranStr.length() != 0 )
378     {
379         if ((concurrentTranStr.compare("N") == 0) || (concurrentTranStr.compare("n") == 0))
380             concurrentSupport = false;
381     }
382 
383     cout << "DDLProc is ready..." << endl;
384 
385     try
386     {
387         for (;;)
388         {
389             ios = fMqServer.accept();
390             bs = ios.read();
391             uint32_t sessionID;
392             bs >> sessionID;
393             bs >> packageType;
394 
395             uint32_t stateFlags;
396 
397             if (dbrm.getSystemState(stateFlags) > 0)		// > 0 implies succesful retrieval. It doesn't imply anything about the contents
398             {
399                 messageqcpp::ByteStream results;
400                 const char* responseMsg = 0;
401                 messageqcpp::ByteStream::byte status;
402                 bool bReject = false;
403 
404                 // Check to see if we're in write suspended mode
405                 // If so, we can't process the request.
406                 if (stateFlags & SessionManagerServer::SS_SUSPENDED)
407                 {
408                     status =  DDLPackageProcessor::NOT_ACCEPTING_PACKAGES;
409                     responseMsg = "Writing to the database is disabled.";
410                     bReject = true;
411                 }
412 
413                 // Check to see if we're in write suspend or shutdown pending mode
414                 if (stateFlags & SessionManagerServer::SS_SUSPEND_PENDING
415                         || stateFlags & SessionManagerServer::SS_SHUTDOWN_PENDING)
416                 {
417                     if (stateFlags & SessionManagerServer::SS_SUSPEND_PENDING)
418                     {
419                         responseMsg = "Writing to the database is disabled.";
420                     }
421                     else
422                     {
423                         responseMsg = "The database is being shut down.";
424                     }
425 
426                     status =  DDLPackageProcessor::NOT_ACCEPTING_PACKAGES;
427                     bReject = true;
428                 }
429 
430                 if (bReject)
431                 {
432                     results << status;
433                     //@bug 266
434                     MessageLog logger(LoggingID(27));
435                     logging::Message::Args args;
436                     logging::Message message(2);
437                     args.add(responseMsg);
438                     message.format( args );
439                     results << message.msg();
440                     ios.write(results);
441                     std::cout << responseMsg << endl;
442                     std::cout << "Command rejected. Status " << (int)status << message.msg() << endl;
443                     continue;
444                 }
445             }
446 
447             //check whether the system is ready to process statement.
448             if (dbrm.getSystemReady() < 1)
449             {
450                 messageqcpp::ByteStream results;
451                 messageqcpp::ByteStream::byte status =  DDLPackageProcessor::NOT_ACCEPTING_PACKAGES;
452 
453                 results << status;
454                 string msg ("System is not ready yet. Please try again." );
455 
456                 results << msg;
457                 ios.write(results);
458 
459                 ios.close();
460                 continue;
461             }
462 
463             BRM::TxnID txnid;
464             int rc = 0;
465 
466             if (!concurrentSupport)
467             {
468                 //Check if any other active transaction
469                 bool bIsDbrmUp = true;
470                 bool anyOtherActiveTransaction = true;
471                 execplan::SessionManager sessionManager;
472                 BRM::SIDTIDEntry blockingsid;
473                 int i = 0;
474                 int waitPeriod = 10;
475                 //@Bug 2487 Check transaction map every 1/10 second
476 
477                 int sleepTime = 100; // sleep 100 milliseconds between checks
478                 int numTries = 10;  // try 10 times per second
479 
480                 string waitPeriodStr = config::Config::makeConfig()->getConfig("SystemConfig", "WaitPeriod");
481 
482                 if ( waitPeriodStr.length() != 0 )
483                     waitPeriod = static_cast<int>(config::Config::fromText(waitPeriodStr));
484 
485                 numTries = 	waitPeriod * 10;
486                 struct timespec rm_ts;
487 
488                 rm_ts.tv_sec = sleepTime / 1000;
489                 rm_ts.tv_nsec = sleepTime % 1000 * 1000000;
490 
491                 //cout << "starting i = " << i << endl;
492                 //anyOtherActiveTransaction = sessionManager.checkActiveTransaction( sessionID, bIsDbrmUp );
493                 while (anyOtherActiveTransaction)
494                 {
495                     anyOtherActiveTransaction = sessionManager.checkActiveTransaction( sessionID, bIsDbrmUp,
496                                                 blockingsid );
497 
498                     if (anyOtherActiveTransaction)
499                     {
500                         for ( ; i < numTries; i++ )
501                         {
502 #ifdef _MSC_VER
503                             Sleep(rm_ts.tv_sec * 1000);
504 #else
505                             struct timespec abs_ts;
506 
507                             //cout << "session " << sessionID << " nanosleep on package type " << (int)packageType << endl;
508                             do
509                             {
510                                 abs_ts.tv_sec = rm_ts.tv_sec;
511                                 abs_ts.tv_nsec = rm_ts.tv_nsec;
512                             }
513                             while (nanosleep(&abs_ts, &rm_ts) < 0);
514 
515 #endif
516                             anyOtherActiveTransaction = sessionManager.checkActiveTransaction( sessionID, bIsDbrmUp,
517                                                         blockingsid );
518 
519                             if ( !anyOtherActiveTransaction )
520                             {
521                                 //cout << "Ready to process type " << (int)packageType << endl;
522                                 txnid = sessionManager.getTxnID(sessionID);
523 
524                                 if ( !txnid.valid )
525                                 {
526                                     txnid = sessionManager.newTxnID(sessionID, true, true);
527 
528                                     if (txnid.valid)
529                                     {
530                                         //cout << "Ready to process type " << (int)packageType << " with txnid " << txnid.id << endl;
531                                         anyOtherActiveTransaction = false;
532                                         break;
533                                     }
534                                     else
535                                     {
536                                         anyOtherActiveTransaction = true;
537                                     }
538                                 }
539                                 else
540                                 {
541                                     string errorMsg;
542                                     rc = commitTransaction(txnid.id, errorMsg);
543 
544                                     if ( rc != 0)
545                                         throw std::runtime_error(errorMsg);
546 
547                                     //need unlock the table.
548                                     std::vector<TableLockInfo> tableLocks = dbrm.getAllTableLocks();
549                                     bool lockReleased = true;
550 
551                                     for (unsigned k = 0; k < tableLocks.size(); k++)
552                                     {
553                                         if (tableLocks[k].ownerTxnID == txnid.id)
554                                         {
555                                             lockReleased = dbrm.releaseTableLock(tableLocks[k].id);
556 
557                                             if (!lockReleased)
558                                             {
559                                                 ostringstream os;
560                                                 os << "tablelock id " << tableLocks[k].id << " is not found";
561                                                 throw std::runtime_error(os.str());
562                                             }
563                                         }
564                                     }
565 
566                                     dbrm.committed(txnid);
567                                     txnid = dbrm.newTxnID(sessionID, true, true);
568 
569                                     if (txnid.valid)
570                                     {
571                                         //cout << "Ready to process type " << (int)packageType << " with txnid " << txnid.id << endl;
572                                         anyOtherActiveTransaction = false;
573                                         break;
574                                     }
575                                     else
576                                     {
577                                         anyOtherActiveTransaction = true;
578                                     }
579                                 }
580                             }
581                         }
582 
583                         //cout << "ending i = " << i << endl;
584                     }
585                     else
586                     {
587                         //cout << "Ready to process type " << (int)packageType << endl;
588                         txnid = sessionManager.getTxnID(sessionID);
589 
590                         if ( !txnid.valid )
591                         {
592                             txnid = sessionManager.newTxnID(sessionID, true, true);
593 
594                             if (!txnid.valid)
595                             {
596                                 //cout << "cannot get txnid " << (int)packageType << " for session " << sessionID <<  endl;
597                                 anyOtherActiveTransaction = true;
598                             }
599                             else
600                             {
601                                 anyOtherActiveTransaction = false;
602                             }
603                         }
604                         else
605                         {
606                             string errorMsg;
607                             rc = commitTransaction(txnid.id, errorMsg);
608 
609                             if ( rc != 0)
610                                 throw std::runtime_error(errorMsg);
611 
612                             //need unlock the table.
613                             std::vector<TableLockInfo> tableLocks = dbrm.getAllTableLocks();
614                             bool lockReleased = true;
615 
616                             for (unsigned k = 0; k < tableLocks.size(); k++)
617                             {
618                                 if (tableLocks[k].ownerTxnID == txnid.id)
619                                 {
620                                     lockReleased = dbrm.releaseTableLock(tableLocks[k].id);
621 
622                                     if (!lockReleased)
623                                     {
624                                         ostringstream os;
625                                         os << "tablelock id " << tableLocks[k].id << " is not found";
626                                         throw std::runtime_error(os.str());
627                                     }
628                                 }
629                             }
630 
631                             sessionManager.committed(txnid);
632                             txnid = sessionManager.newTxnID(sessionID, true, true);
633 
634                             if (!txnid.valid)
635                             {
636                                 //cout << "cannot get txnid " << (int)packageType << " for session " << sessionID <<  endl;
637                                 anyOtherActiveTransaction = true;
638                             }
639                             else
640                             {
641                                 anyOtherActiveTransaction = false;
642                             }
643                         }
644                     }
645 
646                     if ((anyOtherActiveTransaction) && (i >= numTries))
647                     {
648                         //cout << " Erroring out on package type " << (int)packageType << endl;
649                         break;
650                     }
651                 }
652 
653                 if ((anyOtherActiveTransaction) && (i >= numTries))
654                 {
655                     messageqcpp::ByteStream results;
656                     messageqcpp::ByteStream::byte status =  DDLPackageProcessor::NOT_ACCEPTING_PACKAGES;
657 
658                     results << status;
659                     Message::Args args;
660                     args.add(static_cast<uint64_t>(blockingsid.sessionid));
661 
662                     results << IDBErrorInfo::instance()->errorMsg(ERR_ACTIVE_TRANSACTION, args);
663                     //@Bug 3854 Log to debug.log
664                     LoggingID logid(15, 0, 0);
665                     logging::Message::Args args1;
666                     logging::Message msg(1);
667                     args1.add(IDBErrorInfo::instance()->errorMsg(ERR_ACTIVE_TRANSACTION, args));
668                     msg.format( args1 );
669                     logging::Logger logger(logid.fSubsysID);
670                     logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
671 
672                     ios.write(results);
673 
674                     ios.close();
675                 }
676                 else
677                 {
678                     handler.fIos = ios;
679                     handler.fByteStream = bs;
680                     handler.fPackageType = packageType;
681                     handler.fTxnid = txnid;
682                     handler.fDbrm = &dbrm;
683                     handler.fQtc = fQtc;
684                     handler.fOamCache = oam::OamCache::makeOamCache();
685                     fDdlPackagepool.invoke(handler);
686 
687                 }
688             }
689             else
690             {
691                 txnid = dbrm.getTxnID(sessionID);
692 
693                 if ( !txnid.valid )
694                 {
695                     txnid = dbrm.newTxnID(sessionID, true, true);
696 
697                     if (!txnid.valid)
698                     {
699                         throw std::runtime_error( std::string("Unable to start a transaction. Check critical log.") );
700                     }
701                 }
702                 else
703                 {
704                     string errorMsg;
705                     rc = commitTransaction(txnid.id, errorMsg);
706 
707                     if ( rc != 0)
708                         throw std::runtime_error(errorMsg);
709 
710                     //need unlock the table.
711                     std::vector<TableLockInfo> tableLocks = dbrm.getAllTableLocks();
712                     bool lockReleased = true;
713 
714                     for (unsigned k = 0; k < tableLocks.size(); k++)
715                     {
716                         if (tableLocks[k].ownerTxnID == txnid.id)
717                         {
718                             lockReleased = dbrm.releaseTableLock(tableLocks[k].id);
719 
720                             if (!lockReleased)
721                             {
722                                 ostringstream os;
723                                 os << "tablelock id " << tableLocks[k].id << " is not found";
724                                 throw std::runtime_error(os.str());
725                             }
726                         }
727                     }
728 
729                     dbrm.committed(txnid);
730                     txnid = dbrm.newTxnID(sessionID, true, true);
731 
732                     if (!txnid.valid)
733                     {
734                         throw std::runtime_error( std::string("Unable to start a transaction. Check critical log.") );
735                     }
736                 }
737 
738                 handler.fIos = ios;
739                 handler.fByteStream = bs;
740                 handler.fPackageType = packageType;
741                 handler.fTxnid = txnid;
742                 handler.fDbrm = &dbrm;
743                 handler.fQtc = fQtc;
744                 handler.fOamCache = oam::OamCache::makeOamCache();
745                 fDdlPackagepool.invoke(handler);
746             }
747         }
748     }
749     catch (exception& ex)
750     {
751         cerr << ex.what() << endl;
752         messageqcpp::ByteStream results;
753         messageqcpp::ByteStream::byte status =  DDLPackageProcessor::NOT_ACCEPTING_PACKAGES;
754 
755         results << status;
756         results << ex.what();
757         ios.write(results);
758 
759         ios.close();
760     }
761     catch (...)
762     {
763         cerr << "Caught unknown exception!" << endl;
764         messageqcpp::ByteStream results;
765         messageqcpp::ByteStream::byte status =  DDLPackageProcessor::NOT_ACCEPTING_PACKAGES;
766 
767         results << status;
768         results << "Caught unknown exception!";
769         ios.write(results);
770 
771         ios.close();
772     }
773 
774     // wait for all the threads to exit
775     fDdlPackagepool.wait();
776 }
777 
commitTransaction(uint32_t txnID,std::string & errorMsg)778 int DDLProcessor::commitTransaction(uint32_t txnID, std::string& errorMsg)
779 {
780     fWEClient = new WriteEngine::WEClients(WriteEngine::WEClients::DDLPROC);
781     fPMCount = fWEClient->getPmCount();
782     ByteStream bytestream;
783     DBRM dbrm;
784     uint64_t uniqueId = dbrm.getUnique64();
785     fWEClient->addQueue(uniqueId);
786     bytestream << (ByteStream::byte)WE_SVR_COMMIT_VERSION;
787     bytestream << uniqueId;
788     bytestream << txnID;
789     uint32_t msgRecived = 0;
790     fWEClient->write_to_all(bytestream);
791     boost::shared_ptr<messageqcpp::ByteStream> bsIn;
792     bsIn.reset(new ByteStream());
793     int rc = 0;
794     ByteStream::byte tmp8;
795 
796     while (1)
797     {
798         if (msgRecived == fPMCount)
799             break;
800 
801         fWEClient->read(uniqueId, bsIn);
802 
803         if ( bsIn->length() == 0 ) //read error
804         {
805             rc = 1;
806             errorMsg = "DDL cannot communicate with WES";
807             fWEClient->removeQueue(uniqueId);
808             break;
809         }
810         else
811         {
812             *bsIn >> tmp8;
813             rc = tmp8;
814 
815             if (rc != 0)
816             {
817                 *bsIn >> errorMsg;
818                 fWEClient->removeQueue(uniqueId);
819                 break;
820             }
821             else
822                 msgRecived++;
823         }
824     }
825 
826     delete fWEClient;
827     fWEClient = 0;
828     return rc;
829 }
830 }  // namespace ddlprocessor
831 // vim:ts=4 sw=4:
832 
833