1 /* Copyright (C) 2014 InfiniDB, Inc.
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17
18 /***********************************************************************
19 * $Id: ddlprocessor.cpp 6 2006-06-23 17:58:51Z rcraighead $
20 *
21 *
22 ***********************************************************************/
23 #include <map>
24 #include <boost/scoped_ptr.hpp>
25 using namespace std;
26
27 #include "ddlpkg.h"
28 #include "ddlprocessor.h"
29 #include "createtableprocessor.h"
30 #include "altertableprocessor.h"
31 #include "droptableprocessor.h"
32 #include "calpontsystemcatalog.h"
33 #include "sqlparser.h"
34 #include "configcpp.h"
35 #include "markpartitionprocessor.h"
36 #include "restorepartitionprocessor.h"
37 #include "droppartitionprocessor.h"
38 //#define SERIALIZE_DDL_DML_CPIMPORT 1
39
40 #include "cacheutils.h"
41 #include "vss.h"
42 #include "dbrm.h"
43 #include "idberrorinfo.h"
44 #include "errorids.h"
45 #include "we_messages.h"
46 using namespace BRM;
47
48 using namespace config;
49 using namespace messageqcpp;
50 using namespace ddlpackageprocessor;
51 using namespace ddlpackage;
52 using namespace execplan;
53 using namespace logging;
54 using namespace WriteEngine;
55
56 #include "querytele.h"
57 using namespace querytele;
58
59 #include "oamcache.h"
60
61 namespace
62 {
63 typedef messageqcpp::ByteStream::quadbyte quadbyte;
64
65 const quadbyte UNRECOGNIZED_PACKAGE_TYPE = 100;
66 const quadbyte NO_PKNAME_AVAILABLE = 101;
67
68 const std::string DDLProcName = "DDLProc";
69
70
cleanPMSysCache()71 void cleanPMSysCache()
72 {
73 vector<BRM::OID_t> oidList = getAllSysCatOIDs();
74 cacheutils::flushOIDsFromCache ( oidList );
75 }
76
77 struct PackageHandler
78 {
operator ()__anon021daa920111::PackageHandler79 void operator ()()
80 {
81
82 DDLPackageProcessor::DDLResult result;
83 result.result = DDLPackageProcessor::NO_ERROR;
84 //boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr;
85
86 try
87 {
88 //cout << "DDLProc received package " << fPackageType << endl;
89 switch ( fPackageType )
90 {
91 case ddlpackage::DDL_CREATE_TABLE_STATEMENT:
92 {
93 CreateTableStatement createTableStmt;
94 createTableStmt.unserialize(fByteStream);
95 boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
96 CalpontSystemCatalog::makeCalpontSystemCatalog(createTableStmt.fSessionID );
97 boost::scoped_ptr<CreateTableProcessor> processor(new CreateTableProcessor(fDbrm));
98 processor->fTxnid.id = fTxnid.id;
99 processor->fTxnid.valid = true;
100 //cout << "create table using txnid " << fTxnid.id << endl;
101
102 QueryTeleStats qts;
103 qts.query_uuid = QueryTeleClient::genUUID();
104 qts.msg_type = QueryTeleStats::QT_START;
105 qts.start_time = QueryTeleClient::timeNowms();
106 qts.session_id = createTableStmt.fSessionID;
107 qts.query_type = "CREATE";
108 qts.query = createTableStmt.fSql;
109 qts.system_name = fOamCache->getSystemName();
110 qts.module_name = fOamCache->getModuleName();
111 qts.schema_name = createTableStmt.schemaName();
112 fQtc.postQueryTele(qts);
113
114 result = processor->processPackage(createTableStmt);
115
116 systemCatalogPtr->removeCalpontSystemCatalog( createTableStmt.fSessionID );
117 systemCatalogPtr->removeCalpontSystemCatalog( createTableStmt.fSessionID | 0x80000000);
118
119 qts.msg_type = QueryTeleStats::QT_SUMMARY;
120 qts.end_time = QueryTeleClient::timeNowms();
121 fQtc.postQueryTele(qts);
122 }
123 break;
124
125 case ddlpackage::DDL_ALTER_TABLE_STATEMENT:
126 {
127 AlterTableStatement alterTableStmt;
128 alterTableStmt.unserialize(fByteStream);
129 boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
130 CalpontSystemCatalog::makeCalpontSystemCatalog(alterTableStmt.fSessionID );
131 boost::scoped_ptr<AlterTableProcessor> processor(new AlterTableProcessor(fDbrm));
132 processor->fTxnid.id = fTxnid.id;
133 processor->fTxnid.valid = true;
134
135 QueryTeleStats qts;
136 qts.query_uuid = QueryTeleClient::genUUID();
137 qts.msg_type = QueryTeleStats::QT_START;
138 qts.start_time = QueryTeleClient::timeNowms();
139 qts.session_id = alterTableStmt.fSessionID;
140 qts.query_type = "ALTER";
141 qts.query = alterTableStmt.fSql;
142 qts.system_name = fOamCache->getSystemName();
143 qts.module_name = fOamCache->getModuleName();
144 qts.schema_name = alterTableStmt.schemaName();
145 fQtc.postQueryTele(qts);
146
147 processor->fTimeZone = alterTableStmt.fTimeZone;
148
149 result = processor->processPackage(alterTableStmt);
150
151 systemCatalogPtr->removeCalpontSystemCatalog( alterTableStmt.fSessionID );
152 systemCatalogPtr->removeCalpontSystemCatalog( alterTableStmt.fSessionID | 0x80000000);
153
154 qts.msg_type = QueryTeleStats::QT_SUMMARY;
155 qts.end_time = QueryTeleClient::timeNowms();
156 fQtc.postQueryTele(qts);
157 }
158 break;
159
160 case ddlpackage::DDL_DROP_TABLE_STATEMENT:
161 {
162 DropTableStatement dropTableStmt;
163 dropTableStmt.unserialize(fByteStream);
164 boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
165 CalpontSystemCatalog::makeCalpontSystemCatalog(dropTableStmt.fSessionID );
166 boost::scoped_ptr<DropTableProcessor> processor(new DropTableProcessor(fDbrm));
167
168 processor->fTxnid.id = fTxnid.id;
169 processor->fTxnid.valid = true;
170
171 QueryTeleStats qts;
172 qts.query_uuid = QueryTeleClient::genUUID();
173 qts.msg_type = QueryTeleStats::QT_START;
174 qts.start_time = QueryTeleClient::timeNowms();
175 qts.session_id = dropTableStmt.fSessionID;
176 qts.query_type = "DROP";
177 qts.query = dropTableStmt.fSql;
178 qts.system_name = fOamCache->getSystemName();
179 qts.module_name = fOamCache->getModuleName();
180 qts.schema_name = dropTableStmt.schemaName();
181 fQtc.postQueryTele(qts);
182
183 //cout << "Drop table using txnid " << fTxnid.id << endl;
184 result = processor->processPackage(dropTableStmt);
185
186 systemCatalogPtr->removeCalpontSystemCatalog( dropTableStmt.fSessionID );
187 systemCatalogPtr->removeCalpontSystemCatalog( dropTableStmt.fSessionID | 0x80000000);
188
189 qts.msg_type = QueryTeleStats::QT_SUMMARY;
190 qts.end_time = QueryTeleClient::timeNowms();
191 fQtc.postQueryTele(qts);
192 }
193 break;
194
195 case ddlpackage::DDL_TRUNC_TABLE_STATEMENT:
196 {
197 TruncTableStatement truncTableStmt;
198 truncTableStmt.unserialize(fByteStream);
199 boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
200 CalpontSystemCatalog::makeCalpontSystemCatalog(truncTableStmt.fSessionID);
201 boost::scoped_ptr<TruncTableProcessor> processor(new TruncTableProcessor(fDbrm));
202
203 processor->fTxnid.id = fTxnid.id;
204 processor->fTxnid.valid = true;
205
206 QueryTeleStats qts;
207 qts.query_uuid = QueryTeleClient::genUUID();
208 qts.msg_type = QueryTeleStats::QT_START;
209 qts.start_time = QueryTeleClient::timeNowms();
210 qts.session_id = truncTableStmt.fSessionID;
211 qts.query_type = "TRUNCATE";
212 qts.query = truncTableStmt.fSql;
213 qts.system_name = fOamCache->getSystemName();
214 qts.module_name = fOamCache->getModuleName();
215 qts.schema_name = truncTableStmt.schemaName();
216 fQtc.postQueryTele(qts);
217
218 result = processor->processPackage(truncTableStmt);
219
220 systemCatalogPtr->removeCalpontSystemCatalog(truncTableStmt.fSessionID );
221 systemCatalogPtr->removeCalpontSystemCatalog(truncTableStmt.fSessionID | 0x80000000);
222
223 qts.msg_type = QueryTeleStats::QT_SUMMARY;
224 qts.end_time = QueryTeleClient::timeNowms();
225 fQtc.postQueryTele(qts);
226 }
227 break;
228
229 case ddlpackage::DDL_MARK_PARTITION_STATEMENT:
230 {
231 MarkPartitionStatement markPartitionStmt;
232 markPartitionStmt.unserialize(fByteStream);
233 boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
234 CalpontSystemCatalog::makeCalpontSystemCatalog(markPartitionStmt.fSessionID);
235 boost::scoped_ptr<MarkPartitionProcessor> processor(new MarkPartitionProcessor(fDbrm));
236 (processor->fTxnid).id = fTxnid.id;
237 (processor->fTxnid).valid = true;
238 result = processor->processPackage(markPartitionStmt);
239 systemCatalogPtr->removeCalpontSystemCatalog(markPartitionStmt.fSessionID );
240 systemCatalogPtr->removeCalpontSystemCatalog(markPartitionStmt.fSessionID | 0x80000000);
241 }
242 break;
243
244 case ddlpackage::DDL_RESTORE_PARTITION_STATEMENT:
245 {
246 RestorePartitionStatement restorePartitionStmt;
247 restorePartitionStmt.unserialize(fByteStream);
248 boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
249 CalpontSystemCatalog::makeCalpontSystemCatalog(restorePartitionStmt.fSessionID);
250 boost::scoped_ptr<RestorePartitionProcessor> processor(new RestorePartitionProcessor(fDbrm));
251 (processor->fTxnid).id = fTxnid.id;
252 (processor->fTxnid).valid = true;
253 result = processor->processPackage(restorePartitionStmt);
254 systemCatalogPtr->removeCalpontSystemCatalog(restorePartitionStmt.fSessionID );
255 systemCatalogPtr->removeCalpontSystemCatalog(restorePartitionStmt.fSessionID | 0x80000000);
256 }
257 break;
258
259 case ddlpackage::DDL_DROP_PARTITION_STATEMENT:
260 {
261 DropPartitionStatement dropPartitionStmt;
262 dropPartitionStmt.unserialize(fByteStream);
263 boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
264 CalpontSystemCatalog::makeCalpontSystemCatalog(dropPartitionStmt.fSessionID);
265 boost::scoped_ptr<DropPartitionProcessor> processor(new DropPartitionProcessor(fDbrm));
266 (processor->fTxnid).id = fTxnid.id;
267 (processor->fTxnid).valid = true;
268 result = processor->processPackage(dropPartitionStmt);
269 systemCatalogPtr->removeCalpontSystemCatalog(dropPartitionStmt.fSessionID );
270 systemCatalogPtr->removeCalpontSystemCatalog(dropPartitionStmt.fSessionID | 0x80000000);
271 }
272 break;
273
274 default:
275 throw UNRECOGNIZED_PACKAGE_TYPE;
276 break;
277 }
278
279 //@Bug 3427. No need to log user rror, just return the message to user.
280 //Log errors
281 if ((result.result != DDLPackageProcessor::NO_ERROR) && (result.result != DDLPackageProcessor::USER_ERROR))
282 {
283 logging::LoggingID lid(23);
284 logging::MessageLog ml(lid);
285
286 ml.logErrorMessage( result.message );
287 }
288
289 string hdfstest = config::Config::makeConfig()->getConfig("Installation", "DBRootStorageType");
290
291 if (hdfstest == "hdfs" || hdfstest == "HDFS")
292 cleanPMSysCache();
293
294 messageqcpp::ByteStream results;
295 messageqcpp::ByteStream::byte status = result.result;
296 results << status;
297 results << result.message.msg();
298
299 fIos.write(results);
300
301 fIos.close();
302 }
303 catch (quadbyte& /*foo*/)
304 {
305 fIos.close();
306 cout << "Unrecognized package type" << endl;
307 }
308 catch (logging::IDBExcept& idbEx)
309 {
310 cleanPMSysCache();
311 messageqcpp::ByteStream results;
312 messageqcpp::ByteStream::byte status = DDLPackageProcessor::CREATE_ERROR;
313 results << status;
314 results << string(idbEx.what());
315
316 fIos.write(results);
317
318 fIos.close();
319 }
320 catch (...)
321 {
322 fIos.close();
323 }
324
325 }
326
327 messageqcpp::IOSocket fIos;
328 messageqcpp::ByteStream fByteStream;
329 messageqcpp::ByteStream::quadbyte fPackageType;
330 BRM::TxnID fTxnid;
331 BRM::DBRM* fDbrm;
332 QueryTeleClient fQtc;
333 oam::OamCache* fOamCache;
334 };
335
336 }
337
338 namespace ddlprocessor
339 {
340
DDLProcessor(int packageMaxThreads,int packageWorkQueueSize)341 DDLProcessor::DDLProcessor( int packageMaxThreads, int packageWorkQueueSize )
342 : fPackageMaxThreads(packageMaxThreads), fPackageWorkQueueSize(packageWorkQueueSize),
343 fMqServer(DDLProcName)
344 {
345 fDdlPackagepool.setMaxThreads(fPackageMaxThreads);
346 fDdlPackagepool.setQueueSize(fPackageWorkQueueSize);
347 fDdlPackagepool.setName("DdlPackagepool");
348 csc = CalpontSystemCatalog::makeCalpontSystemCatalog();
349 csc->identity(CalpontSystemCatalog::EC);
350 string teleServerHost(config::Config::makeConfig()->getConfig("QueryTele", "Host"));
351
352 if (!teleServerHost.empty())
353 {
354 int teleServerPort = config::Config::fromText(config::Config::makeConfig()->getConfig("QueryTele", "Port"));
355
356 if (teleServerPort > 0)
357 {
358 fQtc.serverParms(QueryTeleServerParms(teleServerHost, teleServerPort));
359 }
360 }
361 }
362
~DDLProcessor()363 DDLProcessor::~DDLProcessor()
364 {
365
366 }
process()367 void DDLProcessor::process()
368 {
369 DBRM dbrm;
370 messageqcpp::IOSocket ios;
371 messageqcpp::ByteStream bs;
372 PackageHandler handler;
373 messageqcpp::ByteStream::quadbyte packageType;
374 bool concurrentSupport = true;
375 string concurrentTranStr = config::Config::makeConfig()->getConfig("SystemConfig", "ConcurrentTransactions");
376
377 if ( concurrentTranStr.length() != 0 )
378 {
379 if ((concurrentTranStr.compare("N") == 0) || (concurrentTranStr.compare("n") == 0))
380 concurrentSupport = false;
381 }
382
383 cout << "DDLProc is ready..." << endl;
384
385 try
386 {
387 for (;;)
388 {
389 ios = fMqServer.accept();
390 bs = ios.read();
391 uint32_t sessionID;
392 bs >> sessionID;
393 bs >> packageType;
394
395 uint32_t stateFlags;
396
397 if (dbrm.getSystemState(stateFlags) > 0) // > 0 implies succesful retrieval. It doesn't imply anything about the contents
398 {
399 messageqcpp::ByteStream results;
400 const char* responseMsg = 0;
401 messageqcpp::ByteStream::byte status;
402 bool bReject = false;
403
404 // Check to see if we're in write suspended mode
405 // If so, we can't process the request.
406 if (stateFlags & SessionManagerServer::SS_SUSPENDED)
407 {
408 status = DDLPackageProcessor::NOT_ACCEPTING_PACKAGES;
409 responseMsg = "Writing to the database is disabled.";
410 bReject = true;
411 }
412
413 // Check to see if we're in write suspend or shutdown pending mode
414 if (stateFlags & SessionManagerServer::SS_SUSPEND_PENDING
415 || stateFlags & SessionManagerServer::SS_SHUTDOWN_PENDING)
416 {
417 if (stateFlags & SessionManagerServer::SS_SUSPEND_PENDING)
418 {
419 responseMsg = "Writing to the database is disabled.";
420 }
421 else
422 {
423 responseMsg = "The database is being shut down.";
424 }
425
426 status = DDLPackageProcessor::NOT_ACCEPTING_PACKAGES;
427 bReject = true;
428 }
429
430 if (bReject)
431 {
432 results << status;
433 //@bug 266
434 MessageLog logger(LoggingID(27));
435 logging::Message::Args args;
436 logging::Message message(2);
437 args.add(responseMsg);
438 message.format( args );
439 results << message.msg();
440 ios.write(results);
441 std::cout << responseMsg << endl;
442 std::cout << "Command rejected. Status " << (int)status << message.msg() << endl;
443 continue;
444 }
445 }
446
447 //check whether the system is ready to process statement.
448 if (dbrm.getSystemReady() < 1)
449 {
450 messageqcpp::ByteStream results;
451 messageqcpp::ByteStream::byte status = DDLPackageProcessor::NOT_ACCEPTING_PACKAGES;
452
453 results << status;
454 string msg ("System is not ready yet. Please try again." );
455
456 results << msg;
457 ios.write(results);
458
459 ios.close();
460 continue;
461 }
462
463 BRM::TxnID txnid;
464 int rc = 0;
465
466 if (!concurrentSupport)
467 {
468 //Check if any other active transaction
469 bool bIsDbrmUp = true;
470 bool anyOtherActiveTransaction = true;
471 execplan::SessionManager sessionManager;
472 BRM::SIDTIDEntry blockingsid;
473 int i = 0;
474 int waitPeriod = 10;
475 //@Bug 2487 Check transaction map every 1/10 second
476
477 int sleepTime = 100; // sleep 100 milliseconds between checks
478 int numTries = 10; // try 10 times per second
479
480 string waitPeriodStr = config::Config::makeConfig()->getConfig("SystemConfig", "WaitPeriod");
481
482 if ( waitPeriodStr.length() != 0 )
483 waitPeriod = static_cast<int>(config::Config::fromText(waitPeriodStr));
484
485 numTries = waitPeriod * 10;
486 struct timespec rm_ts;
487
488 rm_ts.tv_sec = sleepTime / 1000;
489 rm_ts.tv_nsec = sleepTime % 1000 * 1000000;
490
491 //cout << "starting i = " << i << endl;
492 //anyOtherActiveTransaction = sessionManager.checkActiveTransaction( sessionID, bIsDbrmUp );
493 while (anyOtherActiveTransaction)
494 {
495 anyOtherActiveTransaction = sessionManager.checkActiveTransaction( sessionID, bIsDbrmUp,
496 blockingsid );
497
498 if (anyOtherActiveTransaction)
499 {
500 for ( ; i < numTries; i++ )
501 {
502 #ifdef _MSC_VER
503 Sleep(rm_ts.tv_sec * 1000);
504 #else
505 struct timespec abs_ts;
506
507 //cout << "session " << sessionID << " nanosleep on package type " << (int)packageType << endl;
508 do
509 {
510 abs_ts.tv_sec = rm_ts.tv_sec;
511 abs_ts.tv_nsec = rm_ts.tv_nsec;
512 }
513 while (nanosleep(&abs_ts, &rm_ts) < 0);
514
515 #endif
516 anyOtherActiveTransaction = sessionManager.checkActiveTransaction( sessionID, bIsDbrmUp,
517 blockingsid );
518
519 if ( !anyOtherActiveTransaction )
520 {
521 //cout << "Ready to process type " << (int)packageType << endl;
522 txnid = sessionManager.getTxnID(sessionID);
523
524 if ( !txnid.valid )
525 {
526 txnid = sessionManager.newTxnID(sessionID, true, true);
527
528 if (txnid.valid)
529 {
530 //cout << "Ready to process type " << (int)packageType << " with txnid " << txnid.id << endl;
531 anyOtherActiveTransaction = false;
532 break;
533 }
534 else
535 {
536 anyOtherActiveTransaction = true;
537 }
538 }
539 else
540 {
541 string errorMsg;
542 rc = commitTransaction(txnid.id, errorMsg);
543
544 if ( rc != 0)
545 throw std::runtime_error(errorMsg);
546
547 //need unlock the table.
548 std::vector<TableLockInfo> tableLocks = dbrm.getAllTableLocks();
549 bool lockReleased = true;
550
551 for (unsigned k = 0; k < tableLocks.size(); k++)
552 {
553 if (tableLocks[k].ownerTxnID == txnid.id)
554 {
555 lockReleased = dbrm.releaseTableLock(tableLocks[k].id);
556
557 if (!lockReleased)
558 {
559 ostringstream os;
560 os << "tablelock id " << tableLocks[k].id << " is not found";
561 throw std::runtime_error(os.str());
562 }
563 }
564 }
565
566 dbrm.committed(txnid);
567 txnid = dbrm.newTxnID(sessionID, true, true);
568
569 if (txnid.valid)
570 {
571 //cout << "Ready to process type " << (int)packageType << " with txnid " << txnid.id << endl;
572 anyOtherActiveTransaction = false;
573 break;
574 }
575 else
576 {
577 anyOtherActiveTransaction = true;
578 }
579 }
580 }
581 }
582
583 //cout << "ending i = " << i << endl;
584 }
585 else
586 {
587 //cout << "Ready to process type " << (int)packageType << endl;
588 txnid = sessionManager.getTxnID(sessionID);
589
590 if ( !txnid.valid )
591 {
592 txnid = sessionManager.newTxnID(sessionID, true, true);
593
594 if (!txnid.valid)
595 {
596 //cout << "cannot get txnid " << (int)packageType << " for session " << sessionID << endl;
597 anyOtherActiveTransaction = true;
598 }
599 else
600 {
601 anyOtherActiveTransaction = false;
602 }
603 }
604 else
605 {
606 string errorMsg;
607 rc = commitTransaction(txnid.id, errorMsg);
608
609 if ( rc != 0)
610 throw std::runtime_error(errorMsg);
611
612 //need unlock the table.
613 std::vector<TableLockInfo> tableLocks = dbrm.getAllTableLocks();
614 bool lockReleased = true;
615
616 for (unsigned k = 0; k < tableLocks.size(); k++)
617 {
618 if (tableLocks[k].ownerTxnID == txnid.id)
619 {
620 lockReleased = dbrm.releaseTableLock(tableLocks[k].id);
621
622 if (!lockReleased)
623 {
624 ostringstream os;
625 os << "tablelock id " << tableLocks[k].id << " is not found";
626 throw std::runtime_error(os.str());
627 }
628 }
629 }
630
631 sessionManager.committed(txnid);
632 txnid = sessionManager.newTxnID(sessionID, true, true);
633
634 if (!txnid.valid)
635 {
636 //cout << "cannot get txnid " << (int)packageType << " for session " << sessionID << endl;
637 anyOtherActiveTransaction = true;
638 }
639 else
640 {
641 anyOtherActiveTransaction = false;
642 }
643 }
644 }
645
646 if ((anyOtherActiveTransaction) && (i >= numTries))
647 {
648 //cout << " Erroring out on package type " << (int)packageType << endl;
649 break;
650 }
651 }
652
653 if ((anyOtherActiveTransaction) && (i >= numTries))
654 {
655 messageqcpp::ByteStream results;
656 messageqcpp::ByteStream::byte status = DDLPackageProcessor::NOT_ACCEPTING_PACKAGES;
657
658 results << status;
659 Message::Args args;
660 args.add(static_cast<uint64_t>(blockingsid.sessionid));
661
662 results << IDBErrorInfo::instance()->errorMsg(ERR_ACTIVE_TRANSACTION, args);
663 //@Bug 3854 Log to debug.log
664 LoggingID logid(15, 0, 0);
665 logging::Message::Args args1;
666 logging::Message msg(1);
667 args1.add(IDBErrorInfo::instance()->errorMsg(ERR_ACTIVE_TRANSACTION, args));
668 msg.format( args1 );
669 logging::Logger logger(logid.fSubsysID);
670 logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
671
672 ios.write(results);
673
674 ios.close();
675 }
676 else
677 {
678 handler.fIos = ios;
679 handler.fByteStream = bs;
680 handler.fPackageType = packageType;
681 handler.fTxnid = txnid;
682 handler.fDbrm = &dbrm;
683 handler.fQtc = fQtc;
684 handler.fOamCache = oam::OamCache::makeOamCache();
685 fDdlPackagepool.invoke(handler);
686
687 }
688 }
689 else
690 {
691 txnid = dbrm.getTxnID(sessionID);
692
693 if ( !txnid.valid )
694 {
695 txnid = dbrm.newTxnID(sessionID, true, true);
696
697 if (!txnid.valid)
698 {
699 throw std::runtime_error( std::string("Unable to start a transaction. Check critical log.") );
700 }
701 }
702 else
703 {
704 string errorMsg;
705 rc = commitTransaction(txnid.id, errorMsg);
706
707 if ( rc != 0)
708 throw std::runtime_error(errorMsg);
709
710 //need unlock the table.
711 std::vector<TableLockInfo> tableLocks = dbrm.getAllTableLocks();
712 bool lockReleased = true;
713
714 for (unsigned k = 0; k < tableLocks.size(); k++)
715 {
716 if (tableLocks[k].ownerTxnID == txnid.id)
717 {
718 lockReleased = dbrm.releaseTableLock(tableLocks[k].id);
719
720 if (!lockReleased)
721 {
722 ostringstream os;
723 os << "tablelock id " << tableLocks[k].id << " is not found";
724 throw std::runtime_error(os.str());
725 }
726 }
727 }
728
729 dbrm.committed(txnid);
730 txnid = dbrm.newTxnID(sessionID, true, true);
731
732 if (!txnid.valid)
733 {
734 throw std::runtime_error( std::string("Unable to start a transaction. Check critical log.") );
735 }
736 }
737
738 handler.fIos = ios;
739 handler.fByteStream = bs;
740 handler.fPackageType = packageType;
741 handler.fTxnid = txnid;
742 handler.fDbrm = &dbrm;
743 handler.fQtc = fQtc;
744 handler.fOamCache = oam::OamCache::makeOamCache();
745 fDdlPackagepool.invoke(handler);
746 }
747 }
748 }
749 catch (exception& ex)
750 {
751 cerr << ex.what() << endl;
752 messageqcpp::ByteStream results;
753 messageqcpp::ByteStream::byte status = DDLPackageProcessor::NOT_ACCEPTING_PACKAGES;
754
755 results << status;
756 results << ex.what();
757 ios.write(results);
758
759 ios.close();
760 }
761 catch (...)
762 {
763 cerr << "Caught unknown exception!" << endl;
764 messageqcpp::ByteStream results;
765 messageqcpp::ByteStream::byte status = DDLPackageProcessor::NOT_ACCEPTING_PACKAGES;
766
767 results << status;
768 results << "Caught unknown exception!";
769 ios.write(results);
770
771 ios.close();
772 }
773
774 // wait for all the threads to exit
775 fDdlPackagepool.wait();
776 }
777
commitTransaction(uint32_t txnID,std::string & errorMsg)778 int DDLProcessor::commitTransaction(uint32_t txnID, std::string& errorMsg)
779 {
780 fWEClient = new WriteEngine::WEClients(WriteEngine::WEClients::DDLPROC);
781 fPMCount = fWEClient->getPmCount();
782 ByteStream bytestream;
783 DBRM dbrm;
784 uint64_t uniqueId = dbrm.getUnique64();
785 fWEClient->addQueue(uniqueId);
786 bytestream << (ByteStream::byte)WE_SVR_COMMIT_VERSION;
787 bytestream << uniqueId;
788 bytestream << txnID;
789 uint32_t msgRecived = 0;
790 fWEClient->write_to_all(bytestream);
791 boost::shared_ptr<messageqcpp::ByteStream> bsIn;
792 bsIn.reset(new ByteStream());
793 int rc = 0;
794 ByteStream::byte tmp8;
795
796 while (1)
797 {
798 if (msgRecived == fPMCount)
799 break;
800
801 fWEClient->read(uniqueId, bsIn);
802
803 if ( bsIn->length() == 0 ) //read error
804 {
805 rc = 1;
806 errorMsg = "DDL cannot communicate with WES";
807 fWEClient->removeQueue(uniqueId);
808 break;
809 }
810 else
811 {
812 *bsIn >> tmp8;
813 rc = tmp8;
814
815 if (rc != 0)
816 {
817 *bsIn >> errorMsg;
818 fWEClient->removeQueue(uniqueId);
819 break;
820 }
821 else
822 msgRecived++;
823 }
824 }
825
826 delete fWEClient;
827 fWEClient = 0;
828 return rc;
829 }
830 } // namespace ddlprocessor
831 // vim:ts=4 sw=4:
832
833