1 /* Copyright (C) 2014 InfiniDB, Inc.
2    Copyright (C) 2016 MariaDB Corporation
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 /***********************************************************************
19 *   $Id: dmlprocessor.cpp 1024 2013-07-26 16:23:59Z chao $
20 *
21 *
22 ***********************************************************************/
23 /** @file */
24 #include "configcpp.h"
25 #include <signal.h>
26 #include <ctime>
27 
28 //#define      SERIALIZE_DDL_DML_CPIMPORT    1
29 #include <boost/thread/mutex.hpp>
30 #include <boost/scoped_ptr.hpp>
31 #include <boost/scoped_array.hpp>
32 #include <boost/shared_ptr.hpp>
33 using namespace boost;
34 
35 #include "cacheutils.h"
36 #include "vss.h"
37 #include "dbrm.h"
38 #include "brmtypes.h"
39 #include "idberrorinfo.h"
40 #include "errorids.h"
41 #include "batchinsertprocessor.h"
42 #include "tablelockdata.h"
43 #include "oamcache.h"
44 #include "messagelog.h"
45 #include "sqllogger.h"
46 #include "we_messages.h"
47 #include "dmlprocessor.h"
48 using namespace BRM;
49 using namespace config;
50 using namespace execplan;
51 using namespace std;
52 using namespace messageqcpp;
53 using namespace dmlpackage;
54 using namespace dmlpackageprocessor;
55 using namespace joblist;
56 using namespace logging;
57 using namespace oam;
58 using namespace WriteEngine;
59 
60 #include "querytele.h"
61 using namespace querytele;
62 
63 extern boost::mutex mute;
64 extern boost::condition_variable cond;
65 
66 #define MCOL_140 // Undefine to test VSS for out of order transactions
67 
namespace
{
// File-local name of this process.
const std::string myname("DMLProc");
}
72 
73 namespace dmlprocessor
74 {
75 // Map to store the package handler objects so we can set flags during execution
76 // for things like ctrl+c
77 DMLProcessor::PackageHandlerMap_t DMLProcessor::packageHandlerMap;
78 boost::mutex DMLProcessor::packageHandlerMapLock;
79 
80 //Map to store the BatchInsertProc object
81 std::map<uint32_t, BatchInsertProc*> DMLProcessor::batchinsertProcessorMap;
82 boost::mutex DMLProcessor::batchinsertProcessorMapLock;
83 
84 // MCOL-140 Map to hold table oids for tables being changed.
85 std::map<uint32_t, PackageHandler::tableAccessQueue_t> PackageHandler::tableOidMap;
86 boost::condition_variable PackageHandler::tableOidCond;
87 boost::mutex PackageHandler::tableOidMutex;
88 
89 //------------------------------------------------------------------------------
90 // A thread to periodically call dbrm to see if a user is
91 // shutting down the system or has put the system into write
92 // suspend mode. DBRM has 2 flags to check in this case, the
93 // ROLLBACK flag, and the FORCE flag. These flags will be
94 // reported when we ask for the Shutdown Pending flag (which we
95 // ignore at this point). Even if the user is putting the system
96 // into write suspend mode, this call will return the flags we
97 // are interested in. If ROLLBACK is set, we cancel normally.
98 // If FORCE is set, we can't rollback.
99 struct CancellationThread
100 {
CancellationThreaddmlprocessor::CancellationThread101     CancellationThread(DBRM* aDbrm, DMLServer& aServer) : fDbrm(aDbrm), fServer(aServer)
102     {}
operator ()dmlprocessor::CancellationThread103     void operator()()
104     {
105         bool bDoingRollback = false;
106         bool bRollback = false;
107         bool bForce = false;
108         ostringstream oss;
109         std::vector<BRM::TableLockInfo> tableLocks;
110         BRM::TxnID txnId;
111         DMLProcessor::PackageHandlerMap_t::iterator phIter;
112         uint32_t sessionID;
113         int rc = 0;
114 
115         while (true)
116         {
117             usleep(1000000);    // 1 seconds
118             // Check to see if someone has ordered a shutdown or suspend with rollback.
119             (void)fDbrm->getSystemShutdownPending(bRollback, bForce);
120 
121             if (bForce)
122                 break;
123 
124             if (bDoingRollback && bRollback)
125             {
126                 continue;
127                 // We've already started the rollbacks. Don't start again.
128             }
129 
130             bDoingRollback = false;
131 
132             if (bRollback)
133             {
134                 RollbackTransactionProcessor rollbackProcessor(fDbrm);
135                 SessionManager sessionManager;
136                 uint64_t uniqueId = fDbrm->getUnique64();
137                 std::string errorMsg;
138                 int activeTransCount = 0;
139                 int idleTransCount = 0;
140                 bDoingRollback = true;
141                 ostringstream oss;
142                 oss << "DMLProc has been told to rollback all DML transactions.";
143                 DMLProcessor::log(oss.str(), logging::LOG_TYPE_INFO);
144                 // Tell any active processors to stop working and return an error
145                 // The front end will respond with a ROLLBACK command.
146                 // Mark all active processors to rollback
147                 boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock);
148 
149                 for (phIter = DMLProcessor::packageHandlerMap.begin();
150                         phIter != DMLProcessor::packageHandlerMap.end();
151                         ++phIter)
152                 {
153                     ostringstream oss;
154                     oss << "DMLProc will rollback active session " << phIter->second->getSessionID() << " Transaction " << phIter->second->getTxnid();
155                     DMLProcessor::log(oss.str(), logging::LOG_TYPE_INFO);
156 
157                     ++activeTransCount;
158                     phIter->second->rollbackPending();
159                 }
160 
161                 if (activeTransCount > 0)
162                 {
163                     ostringstream oss1;
164                     oss1 << "DMLProc is rolling back back " << activeTransCount << " active transactions.";
165                     DMLProcessor::log(oss1.str(), logging::LOG_TYPE_INFO);
166                 }
167 
168                 // Need to set cluster to read-only via CMAPI before shutting the cluster down.
169                 if (fDbrm->isReadWrite())
170                 {
171                     continue;
172                 }
173 
174                 // Check for any open DML transactions that don't currently have
175                 // a processor
176                 tableLocks = fDbrm->getAllTableLocks();
177 
178                 if (tableLocks.size() > 0)
179                 {
180                     for (uint32_t i = 0; i < tableLocks.size(); ++i)
181                     {
182                         sessionID = tableLocks[i].ownerSessionID;
183                         phIter = DMLProcessor::packageHandlerMap.find(sessionID);
184 
185                         if (phIter == DMLProcessor::packageHandlerMap.end())
186                         {
187                             // We have found an active transaction without a packagehandler.
188                             // This means that a transaction is open with autocommit turned
189                             // off, but there's no current activity on the transaction. We
190                             // need to roll it back if it's a DML transaction.
191                             // If ownerName == "DMLProc" then it's a DML transaction.
192                             if (tableLocks[i].ownerName == "DMLProc")
193                             {
194                                 // OK, we know this is an idle DML transaction, so roll it back.
195                                 ++idleTransCount;
196                                 txnId.id = tableLocks[i].ownerTxnID;
197                                 txnId.valid = true;
198                                 rc = rollbackProcessor.rollBackTransaction(uniqueId, txnId, sessionID, errorMsg);
199 
200                                 if ( rc == 0 )
201                                 {
202                                     fDbrm->invalidateUncommittedExtentLBIDs(txnId.id);
203 
204                                     //@Bug 4524. In case it is batchinsert, call bulkrollback.
205                                     rc = rollbackProcessor.rollBackBatchAutoOnTransaction(uniqueId, txnId, sessionID, tableLocks[i].tableOID, errorMsg);
206 
207                                     if (rc == 0)
208                                     {
209                                         logging::logCommand(0, tableLocks[i].ownerTxnID, "ROLLBACK;");
210 
211                                         bool lockReleased = true;
212 
213                                         try
214                                         {
215                                             lockReleased = fDbrm->releaseTableLock(tableLocks[i].id);
216                                             TablelockData::removeTablelockData(sessionID);
217                                         }
218                                         catch (std::exception&)
219                                         {
220                                             throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
221                                         }
222 
223                                         if (lockReleased)
224                                         {
225                                             sessionManager.rolledback(txnId);
226                                             ostringstream oss;
227                                             oss << "DMLProc rolled back idle transaction " << tableLocks[i].ownerTxnID << " and table lock id " << tableLocks[i].id << " is released.";
228                                             DMLProcessor::log(oss.str(), logging::LOG_TYPE_INFO);
229                                         }
230                                         else
231                                         {
232                                             ostringstream oss;
233                                             oss << "DMLProc rolled back idle transaction " << tableLocks[i].ownerTxnID << " and tble lock id " << tableLocks[i].id << " is not released.";
234                                             DMLProcessor::log(oss.str(), logging::LOG_TYPE_INFO);
235                                         }
236                                     }
237                                     else
238                                     {
239                                         ostringstream oss;
240                                         oss << " problem with bulk rollback of idle transaction " << tableLocks[i].ownerTxnID << "and DBRM is setting to readonly and table lock is not released: " << errorMsg;
241                                         DMLProcessor::log(oss.str(), logging::LOG_TYPE_CRITICAL);
242                                         rc = fDbrm->setReadOnly(true);
243                                     }
244                                 }
245                                 else
246                                 {
247                                     ostringstream oss;
248                                     oss << " problem with rollback of idle transaction " << tableLocks[i].ownerTxnID << "and DBRM is setting to readonly and table lock is not released: " << errorMsg;
249                                     DMLProcessor::log(oss.str(), logging::LOG_TYPE_CRITICAL);
250                                     rc = fDbrm->setReadOnly(true);
251                                 }
252                             }
253                         }
254                     }
255                 }
256 
257                 // If there are any abandonded transactions without locks
258                 // release them.
259                 int len;
260                 boost::shared_array<BRM::SIDTIDEntry> activeTxns = sessionManager.SIDTIDMap(len);
261 
262                 for (int i = 0; i < len; i++)
263                 {
264                     // If there isn't a table lock for this transaction, roll it back. Otherwise, assume
265                     // it has an active processor or is not DML initiated and leave it alone. It's someone
266                     // else's concern.
267                     bool bFoundit = false;
268 
269                     for (uint32_t j = 0; j < tableLocks.size(); ++j)
270                     {
271                         if (tableLocks[j].ownerTxnID == activeTxns[i].txnid.id)
272                         {
273                             bFoundit = true;
274                             break;
275                         }
276                     }
277 
278                     if (!bFoundit && activeTxns[i].txnid.valid)
279                     {
280                         rollbackProcessor.rollBackTransaction(uniqueId, activeTxns[i].txnid, activeTxns[i].sessionid, errorMsg);
281                         sessionManager.rolledback(activeTxns[i].txnid);
282                         ++idleTransCount;
283                         ostringstream oss;
284                         oss << "DMLProc rolled back idle transaction with no tablelock" << tableLocks[i].ownerTxnID;
285                         DMLProcessor::log(oss.str(), logging::LOG_TYPE_INFO);
286                     }
287                 }
288 
289                 if (idleTransCount > 0)
290                 {
291                     ostringstream oss2;
292                     oss2 << "DMLProc has rolled back " << idleTransCount << " idle transactions.";
293                     DMLProcessor::log(oss2.str(), logging::LOG_TYPE_INFO);
294                 }
295                 // Here is the end of the rollback if so DMLProc rollbacks what it can.
296                 break;
297             }
298         }
299         // Setting the flag to tell DMLServer to exit.
300         fServer.startShutdown();
301     }
302     DBRM* fDbrm;
303     DMLServer& fServer;
304 };
305 
PackageHandler(const messageqcpp::IOSocket & ios,boost::shared_ptr<messageqcpp::ByteStream> bs,uint8_t packageType,joblist::DistributedEngineComm * ec,bool concurrentSupport,uint64_t maxDeleteRows,uint32_t sessionID,execplan::CalpontSystemCatalog::SCN txnId,DBRM * aDbrm,const QueryTeleClient & qtc,boost::shared_ptr<execplan::CalpontSystemCatalog> csc)306 PackageHandler::PackageHandler(const messageqcpp::IOSocket& ios,
307                                boost::shared_ptr<messageqcpp::ByteStream> bs,
308                                uint8_t packageType,
309                                joblist::DistributedEngineComm* ec,
310                                bool concurrentSupport,
311                                uint64_t maxDeleteRows,
312                                uint32_t sessionID,
313                                execplan::CalpontSystemCatalog::SCN txnId,
314                                DBRM* aDbrm,
315                                const QueryTeleClient& qtc,
316                                boost::shared_ptr<execplan::CalpontSystemCatalog> csc) :
317     fIos(ios),
318     fByteStream(bs),
319     fPackageType(packageType),
320     fEC(ec),
321     fConcurrentSupport(concurrentSupport),
322     fMaxDeleteRows(maxDeleteRows),
323     fSessionID(sessionID),
324     fTableOid(0),
325     fTxnid(txnId),
326     fDbrm(aDbrm),
327     fQtc(qtc),
328     fcsc(csc)
329 {
330 }
331 
~PackageHandler()332 PackageHandler::~PackageHandler()
333 {
334     //cout << "In destructor" << endl;
335 }
336 
337 // MCOL-140
338 // Blocks a thread if there is another trx working on the same fTableOid
339 // return 1 when thread should continue.
340 // return 0 if error. Right now, no error detection is implemented.
341 //
342 // txnid was being created before the call to this function. This caused race conditions
343 // so creation is delayed until we're inside the lock here. Nothing needs it before
344 // this point in the execution.
345 //
346 // The algorithm is this. When the first txn for a given fTableOid arrives, start a queue
347 // containing a list of waiting or working txnId. Put this txnId into the queue (working)
348 // Put the queue into a map keyed on fTableOid.
349 //
350 // When the next txn for this fTableOid arrives, it finds the queue in the map and adds itself,
351 // then waits for condition.
352 // When a thread finishes, it removes its txnId from the queue and notifies all. If the queue is
353 // empty, it removes the entry from the map.
354 // Upon wakeup from wait(), a thread checks to see if it's next in the queue. If so, it is released
355 // to do work. Otherwise it goes back to wait.
356 //
357 // There's a chance (CTRL+C) for instance, that the txn is no longer in the queue. Release it to work.
358 // Rollback will most likely be next.
359 //
// A transaction for one fTableOid is not blocked by a txn for a different fTableOid.
synchTableAccess(dmlpackage::CalpontDMLPackage * dmlPackage)361 int PackageHandler::synchTableAccess(dmlpackage::CalpontDMLPackage* dmlPackage)
362 {
363     // MCOL-140 Wait for any other DML using this table.
364     std::map<uint32_t, PackageHandler::tableAccessQueue_t>::iterator it;
365     boost::unique_lock<boost::mutex> lock(tableOidMutex);
366     BRM::TxnID txnid;
367 
368     if (fPackageType != dmlpackage::DML_COMMAND)
369     {
370         txnid = sessionManager.getTxnID(fSessionID);
371 
372         if ( !txnid.valid )
373         {
374             txnid = sessionManager.newTxnID(fSessionID, true);
375 
376             if (!txnid.valid)
377             {
378                 throw std::runtime_error( std::string("Unable to start a transaction. Check critical log.") );
379             }
380         }
381     }
382     else
383     {
384         txnid = sessionManager.getTxnID(fSessionID);
385     }
386 
387     fTxnid = txnid.id;
388 
389     if ((it = tableOidMap.find(fTableOid)) != tableOidMap.end())
390     {
391         PackageHandler::tableAccessQueue_t& tableOidQueue = it->second;
392 
393         // There's at least one working txn on this table. We may be the same txn.
394         if (fTxnid == tableOidQueue.front())
395         {
396             return 1; // We're next in line or the same as the last. Keep working
397         }
398 
399         tableOidQueue.push(fTxnid);  // Get on the waiting list.
400 
401         // We need to wait
402         // tableOidQueue here is the queue holding the waitng transactions for this fTableOid
403         while (true)
404         {
405             // Log something that we're waiting
406             LoggingID logid(21, fSessionID, fTxnid);
407             logging::Message::Args args1;
408             logging::Message msg(1);
409             ostringstream oss;
410             oss << "Txn is waiting for" << tableOidQueue.front() << " " << dmlPackage->get_SQLStatement() << "; |" << dmlPackage->get_SchemaName() <<"|";
411             args1.add(oss.str());
412             args1.add((uint64_t)fTableOid);
413             msg.format(args1);
414             logging::Logger logger(logid.fSubsysID);
415             logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
416 
417             tableOidCond.wait(lock);
418             // In case of CTRL+C, the tableOidQueue could be invalidated
419             if ((tableOidMap.find(fTableOid))->second != tableOidQueue)
420             {
421                 break;
422             }
423             if (tableOidQueue.front() == fTxnid)
424             {
425                 // We're up next. Let's go do stuff.
426                 break;
427             }
428 
429             if (tableOidQueue.empty())
430             {
431                 // If we had been the last txn waiting and CTRL+C was hit, then the queue is empty now.
432                 // Empty queues must be erased from the map.
433                 tableOidMap.erase(fTableOid);
434                 break;
435             }
436 
437             // If we're not in the queue at all, then continue. CTRL+C was probably hit.
438             PackageHandler::tableAccessQueue_t::container_type::iterator c_it = tableOidQueue.find(fTxnid);
439 
440             if (c_it == tableOidQueue.end())
441             {
442                 break;
443             }
444 
445             // We're still in the queue and not on top. Go back and wait some more.
446         }
447     }
448     else
449     {
450         // We're the first for this tableoid. Start a new queue.
451         tableAccessQueue_t tableOidQueue;
452         tableOidQueue.push(fTxnid);
453         tableOidMap[fTableOid] = tableOidQueue;
454     }
455 
456     return 1;
457 }
458 
// MCOL-140 Called when it's time to release the next thread for this tableOid
releaseTableAccess()460 int PackageHandler::releaseTableAccess()
461 {
462     // take us out of the queue
463     std::map<uint32_t, PackageHandler::tableAccessQueue_t>::iterator it;
464     boost::lock_guard<boost::mutex> lock(tableOidMutex);
465 
466     if (fTableOid == 0 || (it = tableOidMap.find(fTableOid)) == tableOidMap.end())
467     {
468         return 2;  // For now, return codes are not used
469     }
470 
471     PackageHandler::tableAccessQueue_t& tableOidQueue = it->second;
472 
473     if (tableOidQueue.front() != fTxnid)
474     {
475         // This is a severe error. The front should be the working thread. If we're here,
476         // we're the working thread and should be front().
477         cout << fTxnid << " " << fTableOid << " We got to release and we're not on top " << tableOidQueue.front() << endl;
478         LoggingID logid(21, fSessionID, fTxnid);
479         logging::Message::Args args1;
480         logging::Message msg(1);
481         args1.add("ReleaseTableAccess: Txn being released is not the current txn in the tablOidQueue for tableid");
482         args1.add((uint64_t)fTableOid);
483         msg.format(args1);
484         logging::Logger logger(logid.fSubsysID);
485         logger.logMessage(LOG_TYPE_ERROR, msg, logid);
486     }
487     else
488     {
489         if (!tableOidQueue.empty())
490             tableOidQueue.pop();  // Get off the waiting list.
491 
492         if (tableOidQueue.empty())
493         {
494             // remove the queue from the map.
495             tableOidMap.erase(fTableOid);
496         }
497     }
498 
499     // release the condition
500     tableOidCond.notify_all();
501     return 1;
502 }
503 
forceReleaseTableAccess()504 int PackageHandler::forceReleaseTableAccess()
505 {
506 	// By removing the txnid from the queue, the logic after the wait in
507     // synchTableAccess() will release the thread and clean up if needed.
508     std::map<uint32_t, PackageHandler::tableAccessQueue_t>::iterator it;
509     boost::lock_guard<boost::mutex> lock(tableOidMutex);
510 
511     if (fTableOid == 0 || (it = tableOidMap.find(fTableOid)) == tableOidMap.end())
512     {
513         // This will happen for DML_COMMAND, as we never got the tableoid or called synchTableAccess
514         return 2;
515     }
516 
517     PackageHandler::tableAccessQueue_t& tableOidQueue = it->second;
518     tableOidQueue.erase(fTxnid);
519     if (tableOidQueue.empty())
520     {
521         // remove the queue from the map.
522         tableOidMap.erase(fTableOid);
523     }
524     // release the condition
525     tableOidCond.notify_all();
526     return 1;
527 }
528 
529 //static
530 // Called upon sighup, often because PrimProc crashed. We don't want to leave all the transactions hung,
531 // though some may be because they never returned from PrimProc and will leave the table lock on.
clearTableAccess()532 int PackageHandler::clearTableAccess()
533 {
534     tableOidMap.clear();
535     return 1;
536 }
537 
run()538 void PackageHandler::run()
539 {
540     ResourceManager* frm = ResourceManager::instance();
541     dmlpackageprocessor::DMLPackageProcessor::DMLResult result;
542     result.result = dmlpackageprocessor::DMLPackageProcessor::NO_ERROR;
543     //cout << "PackageHandler handling ";
544     std::string stmt;
545     unsigned DMLLoggingId = 21;
546     oam::OamCache* oamCache = oam::OamCache::makeOamCache();
547     SynchTable synchTable;
548 
549     try
550     {
551         switch ( fPackageType )
552         {
553             case dmlpackage::DML_INSERT:
554             {
555                 // build an InsertDMLPackage from the bytestream
556                 //cout << "an INSERT package" << endl;
557                 dmlpackage::InsertDMLPackage insertPkg;
558                 //boost::shared_ptr<messageqcpp::ByteStream> insertBs (new messageqcpp::ByteStream);
559                 messageqcpp::ByteStream bsSave = *(fByteStream.get());
560                 insertPkg.readMetaData(*(fByteStream.get()));
561 #ifdef MCOL_140
562 
563                 if (fConcurrentSupport)
564                 {
565                     fTableOid = insertPkg.getTableOid();
566 
567                     // Single Insert has no start like bulk does, so insertPkg.getTableOid()
568                     // isn't set. Go get it now.
569                     if (fTableOid == 0)
570                     {
571                         CalpontSystemCatalog::TableName tableName;
572                         tableName.schema =  insertPkg.get_Table()->get_SchemaName();
573                         tableName.table = insertPkg.get_Table()->get_TableName();
574                         CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName);
575                         fTableOid = roPair.objnum;
576                     }
577                         synchTable.setPackage(this, &insertPkg);  // Blocks if another DML thread is using this fTableOid
578                 }
579 
580 #endif
581                 QueryTeleStats qts;
582                 qts.query_uuid = QueryTeleClient::genUUID();
583                 qts.msg_type = QueryTeleStats::QT_START;
584                 qts.start_time = QueryTeleClient::timeNowms();
585                 qts.session_id = fSessionID;
586                 qts.query_type = "INSERT";
587                 qts.query = insertPkg.get_SQLStatement();
588                 qts.system_name = oamCache->getSystemName();
589                 qts.module_name = oamCache->getModuleName();
590                 qts.schema_name = insertPkg.get_SchemaName();
591                 fQtc.postQueryTele(qts);
592 
593                 //cout << "This is batch insert " << insertPkg->get_isBatchInsert() << endl;
594                 if (insertPkg.get_isBatchInsert())
595                 {
596 		    fByteStream->reset();
597                     //cout << "This is batch insert " << endl;
598                     BatchInsertProc* batchProcessor = NULL;
599                     {
600                         boost::mutex::scoped_lock lk(DMLProcessor::batchinsertProcessorMapLock);
601 
602                         std::map<uint32_t, BatchInsertProc*>::iterator batchIter = DMLProcessor::batchinsertProcessorMap.find(fSessionID);
603 
604                         if (batchIter == DMLProcessor::batchinsertProcessorMap.end())
605                         {
606                             batchProcessor = new BatchInsertProc(insertPkg.get_isAutocommitOn(), insertPkg.getTableOid(), fTxnid, fDbrm);
607                             DMLProcessor::batchinsertProcessorMap[fSessionID] = batchProcessor;
608                             //cout << "batchProcessor is created " << batchProcessor << endl;
609                         }
610                         else
611                         {
612                             batchProcessor = batchIter->second;
613                             //cout << "Found batchProcessor " << batchProcessor << endl;
614                         }
615                     }
616 
617                     if ( insertPkg.get_Logging() )
618                     {
619                         LoggingID logid( DMLLoggingId, insertPkg.get_SessionID(), fTxnid);
620                         logging::Message::Args args1;
621                         logging::Message msg(1);
622                         args1.add("Start SQL statement: ");
623                         ostringstream oss;
624                         oss << insertPkg.get_SQLStatement() << "; |" << insertPkg.get_SchemaName() << "|";
625                         args1.add(oss.str());
626                         msg.format( args1 );
627                         logging::Logger logger(logid.fSubsysID);
628                         logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
629                         TablelockData* tablelockData = TablelockData::makeTablelockData(insertPkg.get_SessionID());
630                         uint64_t tableLockId = tablelockData->getTablelockId(insertPkg.getTableOid());
631 
632                         //cout << "Processing table oid " << insertPkg.getTableOid() << " for transaction "<< (int)fTxnid << endl;
633                         if (tableLockId == 0)
634                         {
635                             //cout << "Grabing tablelock for batchProcessor " << batchProcessor << endl;
636                             tableLockId = batchProcessor->grabTableLock(insertPkg.get_SessionID());
637 
638                             if (tableLockId == 0)
639                             {
640                                 BRM::TxnID brmTxnID;
641                                 brmTxnID.id = fTxnid;
642                                 brmTxnID.valid = true;
643                                 sessionManager.rolledback(brmTxnID);
644                                 string errMsg;
645                                 int rc = 0;
646                                 batchProcessor->getError(rc, errMsg);
647                                 result.result = DMLPackageProcessor::TABLE_LOCK_ERROR;
648                                 logging::Message::Args args;
649                                 logging::Message message(1);
650                                 args.add("Insert Failed: ");
651                                 args.add(errMsg);
652                                 args.add("");
653                                 args.add("");
654                                 message.format(args);
655                                 result.message = message;
656                                 break;
657                             }
658 
659                             if (tableLockId > 0)
660                                 tablelockData->setTablelock(insertPkg.getTableOid(), tableLockId);
661                         }
662                     }
663 
664                     if (insertPkg.get_Logending() && insertPkg.get_Logging()) //only one batch need to be processed.
665                     {
666                         //cout << "dmlprocessor add last pkg" << endl;
667                         //need to add error handling.
668                         batchProcessor->addPkg(bsSave);
669                         batchProcessor->sendFirstBatch();
670                         batchProcessor->receiveOutstandingMsg();
671                         //@Bug 5162. Get the correct error message before the last message.
672                         string errMsg;
673                         int rc = 0;
674                         batchProcessor->getError(rc, errMsg);
675                         batchProcessor->sendlastBatch();
676                         batchProcessor->receiveAllMsg();
677 
678                         if (rc == DMLPackageProcessor::IDBRANGE_WARNING)
679                         {
680                             result.result = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
681                             LoggingID logid( DMLLoggingId, insertPkg.get_SessionID(), fTxnid);
682                             logging::Message::Args args1;
683                             logging::Message msg(1);
684                             args1.add("End SQL statement with warnings");
685                             msg.format( args1 );
686                             logging::Logger logger(logid.fSubsysID);
687                             logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
688                             logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";", insertPkg.get_SchemaName());
689                             logging::Message::Args args;
690                             logging::Message message(1);
691                             args.add(errMsg);
692                             args.add("");
693                             args.add("");
694                             message.format(args);
695                             result.message = message;
696                         }
697                         else if ( rc != 0)
698                         {
699                             result.result = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR;
700                             logging::Message::Args args;
701                             logging::Message message(1);
702                             cout << "Got error in the end of one batchinsert." << endl;
703                             args.add("Insert Failed: ");
704                             args.add(errMsg);
705                             args.add("");
706                             args.add("");
707                             message.format(args);
708                             result.message = message;
709                             LoggingID logid( DMLLoggingId, insertPkg.get_SessionID(), fTxnid);
710                             logging::Message::Args args1;
711                             logging::Message msg(1);
712                             args1.add("End SQL statement with error");
713                             msg.format( args1 );
714                             logging::Logger logger(logid.fSubsysID);
715                             logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
716                             logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";", insertPkg.get_SchemaName());
717                         }
718                         else
719                         {
720                             //	if (!insertPkg.get_isAutocommitOn())
721                             //	{
722                             //		batchProcessor->setHwm();
723                             //	}
724                             LoggingID logid( DMLLoggingId, insertPkg.get_SessionID(), fTxnid);
725                             logging::Message::Args args1;
726                             logging::Message msg(1);
727                             args1.add("End SQL statement");
728                             msg.format( args1 );
729                             logging::Logger logger(logid.fSubsysID);
730                             logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
731                             logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";", insertPkg.get_SchemaName());
732                         }
733 
734                         //remove the batch insert object
735                         {
736                             boost::mutex::scoped_lock lk(DMLProcessor::batchinsertProcessorMapLock);
737 
738                             std::map<uint32_t, BatchInsertProc*>::iterator batchIter = DMLProcessor::batchinsertProcessorMap.find(fSessionID);
739 
740                             if (batchIter != DMLProcessor::batchinsertProcessorMap.end())
741                             {
742                                 delete batchIter->second;
743                                 DMLProcessor::batchinsertProcessorMap.erase(fSessionID);
744                             }
745                         }
746                     }
747                     else if (insertPkg.get_Logending()) //Last batch
748                     {
749                         int rc = 0;
750                         string errMsg;
751                         batchProcessor->getError(rc, errMsg);
752 
753                         //cout <<"dmlprocessor received last pkg from mysql rc == " << rc << endl;
754                         if (( rc == 0) || (rc == DMLPackageProcessor::IDBRANGE_WARNING))
755                         {
756                             //cout << " rc = " << rc << endl;
757                             batchProcessor->addPkg(bsSave);
758                             batchProcessor->sendNextBatch();
759                             batchProcessor->receiveOutstandingMsg();
760                             //@Bug 5162. Get the correct error message before the last message.
761                             batchProcessor->getError(rc, errMsg);
762                             batchProcessor->sendlastBatch();
763                             batchProcessor->receiveAllMsg();
764 
765                             if (rc == DMLPackageProcessor::IDBRANGE_WARNING)
766                             {
767                                 result.result = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
768                                 LoggingID logid( DMLLoggingId, insertPkg.get_SessionID(), fTxnid);
769                                 logging::Message::Args args1;
770                                 logging::Message msg(1);
771                                 args1.add("End SQL statement with warnings");
772                                 msg.format( args1 );
773                                 logging::Logger logger(logid.fSubsysID);
774                                 logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
775                                 logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";", insertPkg.get_SchemaName());
776                                 logging::Message::Args args;
777                                 logging::Message message(1);
778                                 args.add(errMsg);
779                                 args.add("");
780                                 args.add("");
781                                 message.format(args);
782                                 result.message = message;
783                             }
784                             else if ( rc != 0)
785                             {
786                                 //cout << "Got error in the end of last batchinsert. error message is " << errMsg << endl;
787                                 result.result = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR;
788                                 logging::Message::Args args;
789                                 logging::Message message(1);
790                                 args.add("Insert Failed: ");
791                                 args.add(errMsg);
792                                 args.add("");
793                                 args.add("");
794                                 message.format(args);
795                                 result.message = message;
796                                 LoggingID logid( DMLLoggingId, insertPkg.get_SessionID(), fTxnid);
797                                 logging::Message::Args args1;
798                                 logging::Message msg(1);
799                                 args1.add("End SQL statement with error");
800                                 msg.format( args1 );
801                                 logging::Logger logger(logid.fSubsysID);
802                                 logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
803                                 logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";", insertPkg.get_SchemaName());
804                             }
805                             else
806                             {
807                                 LoggingID logid( DMLLoggingId, insertPkg.get_SessionID(), fTxnid);
808                                 logging::Message::Args args1;
809                                 logging::Message msg(1);
810                                 args1.add("End SQL statement");
811                                 msg.format( args1 );
812                                 logging::Logger logger(logid.fSubsysID);
813                                 logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
814                                 logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";", insertPkg.get_SchemaName());
815                             }
816 
817                             //cout << "finished batch insert" << endl;
818                         }
819                         else
820                         {
821                             //error occured. Receive all outstanding messages before erroring out.
822                             batchProcessor->receiveOutstandingMsg();
823                             batchProcessor->sendlastBatch(); //needs to flush files
824                             batchProcessor->receiveAllMsg();
825                             result.result = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR;
826                             //cout << "Got error in the end of batchinsert2. error msg is " << errMsg<< endl;
827                             logging::Message::Args args;
828                             logging::Message message(1);
829                             args.add("Insert Failed: ");
830                             args.add(errMsg);
831                             args.add("");
832                             args.add("");
833                             message.format(args);
834                             result.message = message;
835                             LoggingID logid( DMLLoggingId, insertPkg.get_SessionID(), fTxnid);
836                             logging::Message::Args args1;
837                             logging::Message msg(1);
838                             args1.add("End SQL statement with error");
839                             msg.format( args1 );
840                             logging::Logger logger(logid.fSubsysID);
841                             logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
842                             logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";", insertPkg.get_SchemaName());
843                         }
844 
845                         //remove from map
846                         {
847                             boost::mutex::scoped_lock lk(DMLProcessor::batchinsertProcessorMapLock);
848                             std::map<uint32_t, BatchInsertProc*>::iterator batchIter = DMLProcessor::batchinsertProcessorMap.find(fSessionID);
849 
850                             if (batchIter != DMLProcessor::batchinsertProcessorMap.end())
851                             {
852                                 //cout << "Batchinsertprcessor is deleted. " << batchIter->second << endl;
853                                 delete batchIter->second;
854                                 DMLProcessor::batchinsertProcessorMap.erase(fSessionID);
855                             }
856                         }
857 
858                     }
859                     else
860                     {
861                         int rc = 0;
862                         string errMsg;
863                         batchProcessor->getError(rc, errMsg);
864 
865                         if (rc == DMLPackageProcessor::IDBRANGE_WARNING)
866                         {
867                             result.result = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
868                         }
869                         else if ( rc != 0)
870                         {
871                             result.result = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR;
872                             //@Bug
873                             //cout << "Got error during batchinsert. with message " << errMsg << endl;
874                             logging::Message::Args args;
875                             logging::Message message(6);
876                             args.add( errMsg );
877                             message.format( args );
878                             result.message = message;
879                             batchProcessor->receiveOutstandingMsg();
880                             batchProcessor->sendlastBatch(); //needs to flush files
881                             //cout << "Last batch is sent to WES." << endl;
882                             batchProcessor->receiveAllMsg();
883                             LoggingID logid( DMLLoggingId, insertPkg.get_SessionID(), fTxnid);
884                             logging::Message::Args args1;
885                             logging::Message msg(1);
886                             args1.add("End SQL statement with error");
887                             msg.format( args1 );
888                             logging::Logger logger(logid.fSubsysID);
889                             logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
890                             //remove from map
891                             {
892                                 boost::mutex::scoped_lock lk(DMLProcessor::batchinsertProcessorMapLock);
893                                 std::map<uint32_t, BatchInsertProc*>::iterator batchIter = DMLProcessor::batchinsertProcessorMap.find(fSessionID);
894 
895                                 if (batchIter != DMLProcessor::batchinsertProcessorMap.end())
896                                 {
897                                     //cout << "Batchinsertprcessor is deleted. " << batchIter->second << endl;
898                                     delete batchIter->second;
899                                     DMLProcessor::batchinsertProcessorMap.erase(fSessionID);
900                                 }
901                             }
902                             break;
903                         }
904 
905                         batchProcessor->addPkg(bsSave);
906                         batchProcessor->sendNextBatch();
907                         break;
908                     }
909                 }
910                 else  // Single Insert
911                 {
912                     // make sure insertPkg.readMetaData() is called before
913                     // this on fByteStream!
914                     // TODO: Similar to batch inserts, don't
915                     // deserialize the row data here for single inserts.
916                     insertPkg.readRowData(*(fByteStream.get()));
917                     insertPkg.set_TxnID(fTxnid);
918                     fProcessor.reset(new dmlpackageprocessor::InsertPackageProcessor(fDbrm, insertPkg.get_SessionID()));
919                     result = fProcessor->processPackage(insertPkg);
920                 }
921 
922                 qts.msg_type = QueryTeleStats::QT_SUMMARY;
923                 qts.max_mem_pct = result.stats.fMaxMemPct;
924                 qts.num_files = result.stats.fNumFiles;
925                 qts.phy_io = result.stats.fPhyIO;
926                 qts.cache_io = result.stats.fCacheIO;
927                 qts.msg_rcv_cnt = result.stats.fMsgRcvCnt;
928                 qts.cp_blocks_skipped = result.stats.fCPBlocksSkipped;
929                 qts.msg_bytes_in = result.stats.fMsgBytesIn;
930                 qts.msg_bytes_out = result.stats.fMsgBytesOut;
931                 qts.rows = result.stats.fRows;
932                 qts.end_time = QueryTeleClient::timeNowms();
933                 qts.blocks_changed = result.stats.fBlocksChanged;
934                 fQtc.postQueryTele(qts);
935             }
936             break;
937 
938             case dmlpackage::DML_UPDATE:
939             {
940                 // build an UpdateDMLPackage from the bytestream
941                 //cout << "an UPDATE package" << endl;
942                 boost::scoped_ptr<dmlpackage::UpdateDMLPackage> updatePkg(new dmlpackage::UpdateDMLPackage());
943                 updatePkg->read(*(fByteStream.get()));
944 #ifdef MCOL_140
945 
946                 if (fConcurrentSupport)
947                 {
948                     fTableOid = updatePkg->getTableOid();
949 
950                     // Update generally doesn't set fTableOid in updatePkg. Go get it now.
951                     if (fTableOid == 0)
952                     {
953                         CalpontSystemCatalog::TableName tableName;
954                         tableName.schema =  updatePkg->get_Table()->get_SchemaName();
955                         tableName.table = updatePkg->get_Table()->get_TableName();
956                         CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName);
957                         fTableOid = roPair.objnum;
958                     }
959                         synchTable.setPackage(this, updatePkg.get());  // Blocks if another DML thread is using this fTableOid
960                 }
961 
962 #endif
963                 updatePkg->set_TxnID(fTxnid);
964                 QueryTeleStats qts;
965                 qts.query_uuid = updatePkg->uuid();
966                 qts.msg_type = QueryTeleStats::QT_START;
967                 qts.start_time = QueryTeleClient::timeNowms();
968                 qts.session_id = fSessionID;
969                 qts.query_type = "UPDATE";
970                 qts.query = updatePkg->get_SQLStatement();
971                 qts.system_name = oamCache->getSystemName();
972                 qts.module_name = oamCache->getModuleName();
973                 qts.schema_name = updatePkg->get_SchemaName();
974                 fQtc.postQueryTele(qts);
975                 // process it
976                 //@Bug 1341. Don't remove calpontsystemcatalog from this
977                 //session to take advantage of cache.
978                 fProcessor.reset(new dmlpackageprocessor::UpdatePackageProcessor(fDbrm, updatePkg->get_SessionID()));
979                 fProcessor->setEngineComm(fEC);
980                 fProcessor->setRM( frm);
981                 idbassert( fTxnid != 0);
982                 result = fProcessor->processPackage(*(updatePkg.get())) ;
983                 qts.msg_type = QueryTeleStats::QT_SUMMARY;
984                 qts.max_mem_pct = result.stats.fMaxMemPct;
985                 qts.num_files = result.stats.fNumFiles;
986                 qts.phy_io = result.stats.fPhyIO;
987                 qts.cache_io = result.stats.fCacheIO;
988                 qts.msg_rcv_cnt = result.stats.fMsgRcvCnt;
989                 qts.cp_blocks_skipped = result.stats.fCPBlocksSkipped;
990                 qts.msg_bytes_in = result.stats.fMsgBytesIn;
991                 qts.msg_bytes_out = result.stats.fMsgBytesOut;
992                 qts.rows = result.stats.fRows;
993                 qts.end_time = QueryTeleClient::timeNowms();
994                 qts.blocks_changed = result.stats.fBlocksChanged;
995                 fQtc.postQueryTele(qts);
996             }
997             break;
998 
999             case dmlpackage::DML_DELETE:
1000             {
1001                 boost::scoped_ptr<dmlpackage::DeleteDMLPackage> deletePkg(new dmlpackage::DeleteDMLPackage());
1002                 deletePkg->read(*(fByteStream.get()));
1003 #ifdef MCOL_140
1004 
1005                 if (fConcurrentSupport)
1006                 {
1007                     fTableOid = deletePkg->getTableOid();
1008 
1009                     // Delete generally doesn't set fTableOid in updatePkg. Go get it now.
1010                     if (fTableOid == 0)
1011                     {
1012                         CalpontSystemCatalog::TableName tableName;
1013                         tableName.schema =  deletePkg->get_Table()->get_SchemaName();
1014                         tableName.table = deletePkg->get_Table()->get_TableName();
1015                         CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName);
1016                         fTableOid = roPair.objnum;
1017                     }
1018                         synchTable.setPackage(this, deletePkg.get());  // Blocks if another DML thread is using this fTableOid
1019                 }
1020 
1021 #endif
1022                 deletePkg->set_TxnID(fTxnid);
1023                 QueryTeleStats qts;
1024                 qts.query_uuid = deletePkg->uuid();
1025                 qts.msg_type = QueryTeleStats::QT_START;
1026                 qts.start_time = QueryTeleClient::timeNowms();
1027                 qts.session_id = fSessionID;
1028                 qts.query_type = "DELETE";
1029                 qts.query = deletePkg->get_SQLStatement();
1030                 qts.system_name = oamCache->getSystemName();
1031                 qts.module_name = oamCache->getModuleName();
1032                 qts.schema_name = deletePkg->get_SchemaName();
1033                 fQtc.postQueryTele(qts);
1034                 // process it
1035                 //@Bug 1341. Don't remove calpontsystemcatalog from this session to take advantage of cache.
1036                 fProcessor.reset(new dmlpackageprocessor::DeletePackageProcessor(fDbrm, deletePkg->get_SessionID()));
1037                 fProcessor->setEngineComm(fEC);
1038                 fProcessor->setRM( frm);
1039                 idbassert( fTxnid != 0);
1040                 result = fProcessor->processPackage(*(deletePkg.get())) ;
1041                 qts.msg_type = QueryTeleStats::QT_SUMMARY;
1042                 qts.max_mem_pct = result.stats.fMaxMemPct;
1043                 qts.num_files = result.stats.fNumFiles;
1044                 qts.phy_io = result.stats.fPhyIO;
1045                 qts.cache_io = result.stats.fCacheIO;
1046                 qts.msg_rcv_cnt = result.stats.fMsgRcvCnt;
1047                 qts.cp_blocks_skipped = result.stats.fCPBlocksSkipped;
1048                 qts.msg_bytes_in = result.stats.fMsgBytesIn;
1049                 qts.msg_bytes_out = result.stats.fMsgBytesOut;
1050                 qts.rows = result.stats.fRows;
1051                 qts.end_time = QueryTeleClient::timeNowms();
1052                 qts.blocks_changed = result.stats.fBlocksChanged;
1053                 fQtc.postQueryTele(qts);
1054             }
1055             break;
1056 
1057             case dmlpackage::DML_COMMAND:
1058             {
1059                 // build a CommandDMLPackage from the bytestream
1060                 //cout << "a COMMAND package" << endl;
1061                 dmlpackage::CommandDMLPackage commandPkg;
1062                 commandPkg.read(*(fByteStream.get()));
1063                 stmt = commandPkg.get_DMLStatement();
1064                 boost::algorithm::to_upper(stmt);
1065                 trim(stmt);
1066 
1067                 if (stmt == "CLEANUP")
1068                 {
1069                     execplan::CalpontSystemCatalog::removeCalpontSystemCatalog(commandPkg.get_SessionID());
1070                     execplan::CalpontSystemCatalog::removeCalpontSystemCatalog(commandPkg.get_SessionID() | 0x80000000);
1071                 }
1072                 else
1073                 {
1074                     // process it
1075                     //@Bug 1341. Don't remove calpontsystemcatalog from this session to take advantage of cache.
1076                     fProcessor.reset(new dmlpackageprocessor::CommandPackageProcessor(fDbrm, commandPkg.get_SessionID()));
1077 
1078                     //cout << "got command " << stmt << " for session " << commandPkg.get_SessionID() << endl;
1079                     result = fProcessor->processPackage(commandPkg);
1080                 }
1081             }
1082             break;
1083         }
1084 
1085 
1086 
1087 
1088         //Log errors
1089         if (   (result.result != dmlpackageprocessor::DMLPackageProcessor::NO_ERROR)
1090                 && (result.result != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
1091                 && (result.result != dmlpackageprocessor::DMLPackageProcessor::ACTIVE_TRANSACTION_ERROR)
1092                 && (result.result != dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR) )
1093         {
1094             logging::LoggingID lid(21);
1095             logging::MessageLog ml(lid);
1096 
1097             ml.logErrorMessage( result.message );
1098         }
1099         else if (result.result == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
1100         {
1101             logging::LoggingID lid(21);
1102             logging::MessageLog ml(lid);
1103 
1104             ml.logWarningMessage( result.message );
1105         }
1106     }
1107     catch (std::exception& e)
1108     {
1109 
1110 
1111         cout << "dmlprocessor.cpp PackageHandler::run() package type("
1112              << fPackageType << ") exception: " << e.what() << endl;
1113         logging::LoggingID lid(21);
1114         logging::MessageLog ml(lid);
1115         logging::Message::Args args;
1116         logging::Message message(1);
1117         args.add("dmlprocessor.cpp PackageHandler::run() package type");
1118         args.add((uint64_t)fPackageType);
1119         args.add(e.what());
1120         message.format(args);
1121         ml.logErrorMessage(message);
1122         result.result = DMLPackageProcessor::COMMAND_ERROR;
1123         result.message = message;
1124     }
1125     catch (...)
1126     {
1127 
1128 
1129         logging::LoggingID lid(21);
1130         logging::MessageLog ml(lid);
1131         logging::Message::Args args;
1132         logging::Message message(1);
1133         args.add("dmlprocessor.cpp PackageHandler::run() ... exception package type");
1134         args.add((uint64_t)fPackageType);
1135         message.format(args);
1136         ml.logErrorMessage(message);
1137         result.result = DMLPackageProcessor::COMMAND_ERROR;
1138         result.message = message;
1139     }
1140 
1141     // We put the packageHandler into a map so that if we receive a
1142     // message to affect the previous command, we can find it.
1143     // We need to remove it from the list before sending the response back.
1144     // If we remove it after sending the results, it's possible for a commit
1145     // or rollback be sent and get processed before it is removed, and that
1146     // will fail.
1147     boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock);
1148     DMLProcessor::packageHandlerMap.erase(getSessionID());
1149     lk2.unlock();
1150 
1151     // send back the results
1152     messageqcpp::ByteStream results;
1153     messageqcpp::ByteStream::octbyte rowCount = result.rowCount;
1154     messageqcpp::ByteStream::byte retval = result.result;
1155     results << retval;
1156     results << rowCount;
1157     results << result.message.msg();
1158     results << result.tableLockInfo; // ? connector does not get
1159     // query stats
1160     results << result.queryStats;
1161     results << result.extendedStats;
1162     results << result.miniStats;
1163     result.stats.serialize(results);
1164     fIos.write(results);
1165     //Bug 5226. dmlprocessor thread will close the socket to mysqld.
1166     //if (stmt == "CLEANUP")
1167     //	fIos.close();
1168 }
1169 
rollbackPending()1170 void PackageHandler::rollbackPending()
1171 {
1172 	if (fProcessor.get() == NULL)
1173 	{
1174 		// This happens when batch insert
1175 		return;
1176 	}
1177 
1178 	fProcessor->setRollbackPending(true);
1179 
1180     // Force a release of the processing from MCOL-140
1181 #ifdef MCOL_140
1182     if (fConcurrentSupport)
1183     {
1184         // MCOL-140 We're not necessarily the next in line.
1185         // This forces this thread to be released anyway.
1186         forceReleaseTableAccess();
1187     }
1188 
1189 #endif
1190 
1191     ostringstream oss;
1192     oss << "PackageHandler::rollbackPending: Processing DMLPackage.";
1193     DMLProcessor::log(oss.str(), logging::LOG_TYPE_DEBUG);
1194 }
1195 
added_a_pm(int)1196 void added_a_pm(int)
1197 {
1198     DistributedEngineComm* dec;
1199     ResourceManager* rm = ResourceManager::instance();
1200     dec = DistributedEngineComm::instance(rm);
1201     dec->Setup();
1202     // MCOL-140 clear the waiting queue as all transactions are probably going to fail
1203     PackageHandler::clearTableAccess();
1204 }
1205 
DMLServer(int packageMaxThreads,int packageWorkQueueSize,DBRM * dbrm)1206 DMLServer::DMLServer(int packageMaxThreads, int packageWorkQueueSize, DBRM* dbrm) :
1207     fPackageMaxThreads(packageMaxThreads), fPackageWorkQueueSize(packageWorkQueueSize), fDbrm(dbrm), fShutdownFlag(false)
1208 {
1209     fMqServer.reset(new MessageQueueServer("DMLProc"));
1210 
1211     fDmlPackagepool.setMaxThreads(fPackageMaxThreads);
1212     fDmlPackagepool.setQueueSize(fPackageWorkQueueSize);
1213     fDmlPackagepool.setName("DmlPackagepool");
1214 }
1215 
start()1216 int DMLServer::start()
1217 {
1218     messageqcpp::IOSocket ios;
1219     uint32_t nextID = 1;
1220 
1221     try
1222     {
1223         // CancellationThread is for telling all active transactions
1224         // to quit working because the system is either going down
1225         // or going into write suspend mode
1226         CancellationThread cancelObject(fDbrm, *this);
1227         boost::thread cancelThread(cancelObject);
1228 
1229         cout << "DMLProc is ready..." << endl;
1230 
1231         const static struct timespec timeout = {1, 100}; // roughly 1 second TO
1232         for (;;)
1233         {
1234             ios = fMqServer->accept(&timeout);
1235             // MCS polls in a loop watching for a pending shutdown
1236             // that is signalled via fShutdownFlag set in a
1237             // CancellationThread. CT sets the flag if a cluster state
1238             // has SS_SHUTDOWNPENDING value set.
1239             while (!ios.hasSocketDescriptor() && !pendingShutdown())
1240                 ios = fMqServer->accept(&timeout);
1241 
1242             if (pendingShutdown())
1243                 break;
1244             ios.setSockID(nextID++);
1245             fDmlPackagepool.invoke(DMLProcessor(ios, fDbrm));
1246         }
1247 
1248         cancelThread.join();
1249         return EXIT_SUCCESS;
1250     }
1251     catch (std::exception& ex)
1252     {
1253         cerr << ex.what() << endl;
1254         logging::LoggingID lid(21);
1255         Message::Args args;
1256         Message message(8);
1257         args.add("DMLProc init caught exception: ");
1258         args.add(ex.what());
1259         message.format(args);
1260         logging::Logger logger(lid.fSubsysID);
1261         logger.logMessage(logging::LOG_TYPE_CRITICAL, message, lid);
1262         return EXIT_FAILURE;
1263     }
1264     catch (...)
1265     {
1266         cerr << "Caught unknown exception!" << endl;
1267         logging::LoggingID lid(21);
1268         Message::Args args;
1269         Message message(8);
1270         args.add("DMLProc init caught unknown exception");
1271         message.format(args);
1272         logging::Logger logger(lid.fSubsysID);
1273         logger.logMessage(logging::LOG_TYPE_CRITICAL, message, lid);
1274         return EXIT_FAILURE;
1275     }
1276 }
1277 
DMLProcessor(messageqcpp::IOSocket ios,BRM::DBRM * aDbrm)1278 DMLProcessor::DMLProcessor(messageqcpp::IOSocket ios, BRM::DBRM* aDbrm) :
1279     fIos(ios), fDbrm(aDbrm)
1280 {
1281     csc = CalpontSystemCatalog::makeCalpontSystemCatalog();
1282     csc->identity(CalpontSystemCatalog::EC);
1283     string teleServerHost(config::Config::makeConfig()->getConfig("QueryTele", "Host"));
1284 
1285     if (!teleServerHost.empty())
1286     {
1287         int teleServerPort = config::Config::fromText(config::Config::makeConfig()->getConfig("QueryTele", "Port"));
1288 
1289         if (teleServerPort > 0)
1290         {
1291             fQtc.serverParms(QueryTeleServerParms(teleServerHost, teleServerPort));
1292         }
1293     }
1294 }
1295 
operator ()()1296 void DMLProcessor::operator()()
1297 {
1298     bool bIsDbrmUp = true;
1299 
1300     try
1301     {
1302         boost::shared_ptr<messageqcpp::ByteStream> bs1	(new messageqcpp::ByteStream());
1303         //messageqcpp::ByteStream bs;
1304         uint8_t packageType;
1305 
1306         ResourceManager* rm = ResourceManager::instance();
1307         DistributedEngineComm* fEC = DistributedEngineComm::instance(rm);
1308 
1309         uint64_t maxDeleteRows = rm->getDMLMaxDeleteRows();
1310 
1311         fConcurrentSupport = true;
1312         string concurrentTranStr = config::Config::makeConfig()->getConfig("SystemConfig", "ConcurrentTransactions");
1313 
1314         if ( concurrentTranStr.length() != 0 )
1315         {
1316             if ((concurrentTranStr.compare("N") == 0) || (concurrentTranStr.compare("n") == 0))
1317                 fConcurrentSupport = false;
1318         }
1319 
1320 #ifndef _MSC_VER
1321         struct sigaction ign;
1322         memset(&ign, 0, sizeof(ign));
1323         ign.sa_handler = added_a_pm;
1324         sigaction(SIGHUP, &ign, 0);
1325 #endif
1326         fEC->Open();
1327 
1328         for (;;)
1329         {
1330             //cout << "DMLProc is waiting for a Calpont DML Package on " << fIos.getSockID() << endl;
1331             try
1332             {
1333                 bs1.reset(new messageqcpp::ByteStream(fIos.read()));
1334                 //cout << "received from mysql socket " << fIos.getSockID() << endl;
1335             }
1336             catch (std::exception& ex)
1337             {
1338                 //This is an I/O error from InetStreamSocket::read(), just close and move on...
1339                 cout << "runtime error during read on " << fIos.getSockID() << " " << ex.what() << endl;
1340                 bs1->reset();
1341             }
1342             catch (...)
1343             {
1344                 cout << "... error during read " << fIos.getSockID() << endl;
1345                 // all this throw does is cause this thread to silently go away. I doubt this is the right
1346                 //  thing to do...
1347                 throw;
1348             }
1349 
1350             if (!bs1 || bs1->length() == 0)
1351             {
1352                 cout << "Read 0 bytes. Closing connection " << fIos.getSockID() << endl;
1353                 fIos.close();
1354                 break;
1355             }
1356 
1357             uint32_t sessionID;
1358             *bs1 >> sessionID;
1359             *bs1 >> packageType;
1360 //cout << "DMLProc received pkg. sessionid:type = " << sessionID <<":"<<(int)packageType << endl;
1361             uint32_t stateFlags;
1362             messageqcpp::ByteStream::byte status = 255;
1363             messageqcpp::ByteStream::octbyte rowCount = 0;
1364 
1365             if (fDbrm->getSystemState(stateFlags) > 0)		// > 0 implies succesful retrieval. It doesn't imply anything about the contents
1366             {
1367                 messageqcpp::ByteStream results;
1368                 const char* responseMsg = 0;
1369                 bool bReject = false;
1370 
1371                 // Check to see if we're in write suspended mode
1372                 // If so, we can't process the request.
1373                 if (stateFlags & SessionManagerServer::SS_SUSPENDED)
1374                 {
1375                     status =  DMLPackageProcessor::NOT_ACCEPTING_PACKAGES;
1376                     responseMsg = "Writing to the database is disabled.";
1377                     bReject = true;
1378                 }
1379 
1380                 // Check to see if we're in write suspend or shutdown pending mode
1381                 if (packageType != dmlpackage::DML_COMMAND) // Not a commit or rollback
1382                 {
1383                     if (stateFlags & SessionManagerServer::SS_SUSPEND_PENDING
1384                             || stateFlags & SessionManagerServer::SS_SHUTDOWN_PENDING)
1385                     {
1386                         if (stateFlags & SessionManagerServer::SS_SUSPEND_PENDING)
1387                         {
1388                             responseMsg = "Writing to the database is disabled.";
1389                         }
1390                         else
1391                         {
1392                             responseMsg = "The database is being shut down.";
1393                         }
1394 
1395                         // Refuse all non active tranasactions
1396                         // Check the rollback flag
1397                         // -- Set: Rollback active transactions.
1398                         // -- Not set: Allow active transactions.
1399                         if (sessionManager.isTransactionActive(sessionID, bIsDbrmUp))
1400                         {
1401                             if (stateFlags & SessionManagerServer::SS_ROLLBACK)
1402                             {
1403                                 status =  DMLPackageProcessor::JOB_CANCELED;
1404                                 bReject = true;
1405                             }
1406                         }
1407                         else
1408                         {
1409                             status =  DMLPackageProcessor::NOT_ACCEPTING_PACKAGES;
1410                             bReject = true;
1411                         }
1412                     }
1413 
1414                     if (bReject)
1415                     {
1416                         // For batch insert, we need to send a lastpkg message
1417                         // to batchInsertProcessor so it can clean things up.
1418                         if (packageType == dmlpackage::DML_INSERT)
1419                         {
1420                             // build an InsertDMLPackage from the bytestream
1421                             // We need the flags from the package to know what
1422                             // type of package we're dealing with before we can
1423                             // take special action for the last package of a
1424                             // batch insert.
1425                             dmlpackage::InsertDMLPackage insertPkg;
1426                             messageqcpp::ByteStream bsSave = *(bs1.get());
1427                             insertPkg.read(*(bs1.get()));
1428                             BatchInsertProc* batchInsertProcessor = NULL;
1429 
1430                             if (insertPkg.get_isBatchInsert() && insertPkg.get_Logending())
1431                             {
1432                                 {
1433                                     boost::mutex::scoped_lock lk(DMLProcessor::batchinsertProcessorMapLock);
1434                                     std::map<uint32_t, BatchInsertProc*>::iterator batchIter = DMLProcessor::batchinsertProcessorMap.find(sessionID);
1435 
1436                                     if (batchIter != DMLProcessor::batchinsertProcessorMap.end()) //The first batch, no need to do anything
1437                                     {
1438 
1439                                         batchInsertProcessor = batchIter->second;
1440                                         batchInsertProcessor->addPkg(bsSave);
1441 
1442                                         batchInsertProcessor->sendlastBatch();
1443                                         batchInsertProcessor->receiveAllMsg();
1444 
1445 
1446                                         if (!insertPkg.get_isAutocommitOn())
1447                                         {
1448                                             batchInsertProcessor->setHwm();
1449                                         }
1450 
1451                                         batchIter = DMLProcessor::batchinsertProcessorMap.find(sessionID);
1452 
1453                                         if (batchIter != DMLProcessor::batchinsertProcessorMap.end())
1454                                         {
1455                                             DMLProcessor::batchinsertProcessorMap.erase(sessionID);
1456                                         }
1457                                     }
1458                                 }
1459                             }
1460                         }
1461 
1462                         results << status;
1463                         results << rowCount;
1464                         logging::Message::Args args;
1465                         logging::Message message(2);
1466                         args.add(responseMsg);
1467                         message.format( args );
1468                         results << message.msg();
1469                         fIos.write(results);
1470                         continue;
1471                     }
1472                 }
1473             }
1474 
1475             // This section is to check to see if the user hit CTRL+C while the
1476             // DML was processing If so, the sessionID will be found in
1477             // packageHandlerMap and we can set rollbackPending in the
1478             // associated packageHandler. Other than CTRL+C, we should never
1479             // find our own sessionID in the map.
1480             // This mechanism may prove useful for other things, so the above
1481             // comment may change.
1482             {
1483                 boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock);
1484                 DMLProcessor::PackageHandlerMap_t::iterator phIter = packageHandlerMap.find(sessionID);
1485 
1486                 if (phIter != packageHandlerMap.end())
1487                 {
1488                     if (packageType == dmlpackage::DML_COMMAND)
1489                     {
1490                         // MCOL-66 It's possible for a commit or rollback to get here if
1491                         // the timing is just right. Don't destroy its data
1492                         messageqcpp::ByteStream bsctrlc(bs1);
1493                         dmlpackage::CommandDMLPackage commandPkg;
1494                         commandPkg.read(bsctrlc);
1495                         std::string stmt = commandPkg.get_DMLStatement();
1496                         boost::algorithm::to_upper(stmt);
1497                         trim(stmt);
1498 
1499                         if (stmt == "CTRL+C")
1500                         {
1501                             phIter->second->rollbackPending();
1502                             fIos.close();
1503                             break;
1504                         }
1505                     }
1506                     else
1507                     {
1508                         // If there's a PackageHandler already working for this
1509                         // sessionID, we have a problem. Reject this package
1510                         messageqcpp::ByteStream results;
1511                         ostringstream oss;
1512                         oss << "Received a DML command for session " << sessionID <<
1513                             " while still processing a command for the same sessionID";
1514                         results << static_cast<messageqcpp::ByteStream::byte>(DMLPackageProcessor::DEAD_LOCK_ERROR);
1515                         results << static_cast<messageqcpp::ByteStream::octbyte>(0);	// rowcount
1516                         logging::Message::Args args;
1517                         logging::Message message(2);
1518                         args.add(oss.str());
1519                         message.format( args );
1520                         logging::LoggingID lid(20);
1521                         logging::MessageLog ml(lid);
1522                         ml.logErrorMessage(message);
1523                         results << message.msg();
1524                         fIos.write(results);
1525                         continue;
1526                     }
1527                 }
1528             }
1529 
1530             //cout << "   got a ";
1531             switch (packageType)
1532             {
1533                 case dmlpackage::DML_INSERT:
1534                     //cout << "DML_INSERT";
1535                     break;
1536 
1537                 case dmlpackage::DML_UPDATE:
1538                     //cout << "DML_UPDATE";
1539                     break;
1540 
1541                 case dmlpackage::DML_DELETE:
1542                     //cout << "DML_DELETE";
1543                     break;
1544 
1545                 case dmlpackage::DML_COMMAND:
1546                     //cout << "DML_COMMAND";
1547                     break;
1548 
1549                 case dmlpackage::DML_INVALID_TYPE:
1550                     //cout << "DML_INVALID_TYPE";
1551                     break;
1552 
1553                 default:
1554                     //cout << "UNKNOWN";
1555                     break;
1556             }
1557 
1558             //cout << " package" << endl;
1559 
1560             BRM::TxnID txnid;
1561 
1562             if (!fConcurrentSupport)
1563             {
1564                 //Check if any other active transaction
1565                 bool anyOtherActiveTransaction = true;
1566                 BRM::SIDTIDEntry blockingsid;
1567 
1568                 //For logout commit trigger
1569                 if ( packageType == dmlpackage::DML_COMMAND )
1570                 {
1571                     anyOtherActiveTransaction = false;
1572                 }
1573 
1574                 int i = 0;
1575                 int waitPeriod = 10;
1576                 //@Bug 2487 Check transaction map every 1/10 second
1577 
1578                 int sleepTime = 100; // sleep 100 milliseconds between checks
1579                 int numTries = 10;  // try 10 times per second
1580 
1581 
1582                 string waitPeriodStr = config::Config::makeConfig()->getConfig("SystemConfig", "WaitPeriod");
1583 
1584                 if ( waitPeriodStr.length() != 0 )
1585                     waitPeriod = static_cast<int>(config::Config::fromText(waitPeriodStr));
1586 
1587                 numTries = 	waitPeriod * 10;
1588                 struct timespec rm_ts;
1589 
1590                 rm_ts.tv_sec = sleepTime / 1000;
1591                 rm_ts.tv_nsec = sleepTime % 1000 * 1000000;
1592 
1593                 //cout << "starting i = " << i << endl;
1594                 //txnid = sessionManager.getTxnID(sessionID);
1595                 while (anyOtherActiveTransaction)
1596                 {
1597                     anyOtherActiveTransaction = sessionManager.checkActiveTransaction( sessionID, bIsDbrmUp,
1598                                                 blockingsid );
1599 
1600                     //cout << "session " << sessionID << " with package type " << (int)packageType << " got anyOtherActiveTransaction " << anyOtherActiveTransaction << endl;
1601                     if (anyOtherActiveTransaction)
1602                     {
1603                         for ( ; i < numTries; i++ )
1604                         {
1605 #ifdef _MSC_VER
1606                             Sleep(rm_ts.tv_sec * 1000);
1607 #else
1608                             struct timespec abs_ts;
1609 
1610                             //cout << "session " << sessionID << " nanosleep on package type " << (int)packageType << endl;
1611                             do
1612                             {
1613                                 abs_ts.tv_sec = rm_ts.tv_sec;
1614                                 abs_ts.tv_nsec = rm_ts.tv_nsec;
1615                             }
1616                             while (nanosleep(&abs_ts, &rm_ts) < 0);
1617 
1618 #endif
1619                             anyOtherActiveTransaction = sessionManager.checkActiveTransaction( sessionID, bIsDbrmUp,
1620                                                         blockingsid );
1621 
1622                             if ( !anyOtherActiveTransaction )
1623                             {
1624                                 txnid = sessionManager.getTxnID(sessionID);
1625 
1626                                 //cout << "Ready to process type " << (int)packageType << " with txd " << txnid << endl;
1627                                 if ( !txnid.valid )
1628                                 {
1629                                     txnid = sessionManager.newTxnID(sessionID, true);
1630 
1631                                     if (txnid.valid)
1632                                     {
1633                                         //cout << "Ready to process type " << (int)packageType << " for session "<< sessionID << " with new txnid " << txnid.id << endl;
1634                                         anyOtherActiveTransaction = false;
1635                                         break;
1636                                     }
1637                                     else
1638                                     {
1639                                         anyOtherActiveTransaction = true;
1640                                     }
1641                                 }
1642                                 else
1643                                 {
1644                                     anyOtherActiveTransaction = false;
1645                                     //cout << "already have transaction to process type " << (int)packageType << " for session "<< sessionID <<" with existing txnid " << txnid.id << endl;
1646                                     break;
1647                                 }
1648                             }
1649                         }
1650 
1651                         //cout << "ending i = " << i << endl;
1652                     }
1653                     else
1654                     {
1655                         //cout << "Ready to process type " << (int)packageType << endl;
1656                         txnid = sessionManager.getTxnID(sessionID);
1657 
1658                         if ( !txnid.valid )
1659                         {
1660                             txnid = sessionManager.newTxnID(sessionID, true);
1661 
1662                             if (txnid.valid)
1663                             {
1664                                 //cout << "later Ready to process type " << (int)packageType << " for session "<< sessionID << " with new txnid " << txnid.id << endl;
1665                                 anyOtherActiveTransaction = false;
1666                             }
1667                             else
1668                             {
1669                                 anyOtherActiveTransaction = true;
1670                                 //cout << "Cannot get txnid for  process type " << (int)packageType << " for session "<< sessionID << endl;
1671                             }
1672                         }
1673                         else
1674                         {
1675                             anyOtherActiveTransaction = false;
1676                             //cout << "already have transaction to process type " << (int)packageType << " for session "<< sessionID <<" with txnid " << txnid.id << endl;
1677                             break;
1678                         }
1679                     }
1680 
1681                     if ((anyOtherActiveTransaction) && (i >= numTries))
1682                     {
1683                         //cout << " Erroring out on package type " << (int)packageType << " for session " << sessionID << endl;
1684                         break;
1685                     }
1686                 }
1687 
1688                 if (anyOtherActiveTransaction && (i >= numTries))
1689                 {
1690                     //cout << " again Erroring out on package type " << (int)packageType << endl;
1691                     messageqcpp::ByteStream results;
1692                     //@Bug 2681 set error code for active transaction
1693                     status =  DMLPackageProcessor::ACTIVE_TRANSACTION_ERROR;
1694                     rowCount = 0;
1695                     results << status;
1696                     results << rowCount;
1697                     Message::Args args;
1698                     args.add(static_cast<uint64_t>(blockingsid.sessionid));
1699                     results << IDBErrorInfo::instance()->errorMsg(ERR_ACTIVE_TRANSACTION, args);
1700                     //@Bug 3854 Log to debug.log
1701                     LoggingID logid(20, 0, 0);
1702                     logging::Message::Args args1;
1703                     logging::Message msg(1);
1704                     args1.add(IDBErrorInfo::instance()->errorMsg(ERR_ACTIVE_TRANSACTION, args));
1705                     msg.format( args1 );
1706                     logging::Logger logger(logid.fSubsysID);
1707                     logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
1708 
1709                     fIos.write(results);
1710                 }
1711                 else
1712                 {
1713                     //cout << "starting processing package type " << (int) packageType << " for session " << sessionID << " with id " << txnid.id << endl;
1714                     boost::shared_ptr<PackageHandler> php(new PackageHandler(fIos, bs1, packageType, fEC,
1715                                                           fConcurrentSupport, maxDeleteRows, sessionID, txnid.id, fDbrm, fQtc, csc));
1716                     // We put the packageHandler into a map so that if we receive a
1717                     // message to affect the previous command, we can find it.
1718                     boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock, boost::defer_lock);
1719 
1720                     lk2.lock();
1721                     packageHandlerMap[sessionID] = php;
1722                     lk2.unlock();
1723 
1724                     php->run();		// Operates in this thread.
1725 
1726 // Move this to the end of PackageHandler so it is removed from the map before the response is sent
1727 //					lk2.lock();
1728 //					packageHandlerMap.erase(sessionID);
1729 //					lk2.unlock();
1730                 }
1731             }
1732             else
1733             {
1734 #if 0
1735 
1736                 if (packageType != dmlpackage::DML_COMMAND)
1737                 {
1738                     txnid = sessionManager.getTxnID(sessionID);
1739 
1740                     if ( !txnid.valid )
1741                     {
1742                         txnid = sessionManager.newTxnID(sessionID, true);
1743 
1744                         if (!txnid.valid)
1745                         {
1746                             throw std::runtime_error( std::string("Unable to start a transaction. Check critical log.") );
1747                         }
1748                     }
1749                 }
1750                 else
1751                 {
1752                     txnid = sessionManager.getTxnID(sessionID);
1753                 }
1754 
1755 #endif
1756                 boost::shared_ptr<PackageHandler> php(new PackageHandler(fIos, bs1, packageType, fEC,
1757                                                       fConcurrentSupport, maxDeleteRows, sessionID, 0, fDbrm, fQtc, csc));
1758                 // We put the packageHandler into a map so that if we receive a
1759                 // message to affect the previous command, we can find it.
1760                 boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock, boost::defer_lock);
1761 
1762                 lk2.lock();
1763                 packageHandlerMap[sessionID] = php;
1764                 lk2.unlock();
1765 
1766                 php->run();		// Operates in this thread.
1767 
1768 // Move this to the end of PackageHandler so it is removed from the map before the response is sent
1769 //				lk2.lock();
1770 //				packageHandlerMap.erase(sessionID);
1771 //				lk2.unlock();
1772             }
1773         }
1774     }
1775     catch (std::exception& ex)
1776     {
1777         ostringstream oss;
1778         oss << "DMLProcessor failed on: " << ex.what();
1779         DMLProcessor::log(oss.str(), logging::LOG_TYPE_DEBUG);
1780         fIos.close();
1781     }
1782     catch (...)
1783     {
1784         ostringstream oss;
1785         oss << "DMLProcessor failed on: processing DMLPackage.";
1786         DMLProcessor::log(oss.str(), logging::LOG_TYPE_DEBUG);
1787         cerr << "Caught unknown exception! " << oss.str();
1788         fIos.close();
1789     }
1790 }
1791 
processBulkRollback(BRM::TableLockInfo lockInfo,BRM::DBRM * dbrm,uint64_t uniqueId,OamCache::dbRootPMMap_t & dbRootPMMap,bool & lockReleased)1792 void RollbackTransactionProcessor::processBulkRollback (BRM::TableLockInfo lockInfo, BRM::DBRM* dbrm, uint64_t uniqueId,
1793         OamCache::dbRootPMMap_t& dbRootPMMap, bool& lockReleased)
1794 {
1795     // Take over ownership of stale lock.
1796     // Use "DMLProc" as process name, session id and transaction id -1 to distinguish from real DMLProc rollback
1797     int32_t sessionID = -1;
1798     int32_t txnid     = -1;
1799     std::string processName("DMLProc");
1800     uint32_t processID = ::getpid();
1801     bool ownerChanged = true;
1802     lockReleased = true;
1803 
1804     try
1805     {
1806         ownerChanged = dbrm->changeOwner(lockInfo.id, processName, processID, sessionID, txnid);
1807     }
1808     catch (std::exception&)
1809     {
1810         lockReleased = false;
1811         throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
1812     }
1813 
1814     if (!ownerChanged)
1815     {
1816         lockReleased = false;
1817         throw std::runtime_error( std::string("Unable to grab lock; lock not found or still in use.") );
1818     }
1819 
1820     //send to all PMs
1821     boost::shared_ptr<messageqcpp::ByteStream> bsIn;
1822     messageqcpp::ByteStream                    bsOut;
1823     string tableName("");
1824     fWEClient->addQueue(uniqueId);
1825     //find the PMs need to send the message to
1826     std::set<int> pmSet;
1827     int pmId;
1828 
1829     for (uint32_t i = 0; i < lockInfo.dbrootList.size(); i++)
1830     {
1831         pmId = (*dbRootPMMap)[lockInfo.dbrootList[i]];
1832         pmSet.insert(pmId);
1833     }
1834 
1835     if (lockInfo.state == BRM::LOADING)
1836     {
1837         bsOut << (messageqcpp::ByteStream::byte)WE_SVR_DML_BULKROLLBACK;
1838         bsOut << uniqueId;
1839         bsOut << lockInfo.id;
1840         bsOut << lockInfo.tableOID;
1841         bsOut << tableName;
1842         bsOut << processName;
1843         std::set<int>::const_iterator iter = pmSet.begin();
1844 
1845         while (iter != pmSet.end())
1846         {
1847             fWEClient->write(bsOut, *iter);
1848             iter++;
1849         }
1850 
1851         // Wait for "all" the responses, and accumulate any/all errors
1852         unsigned int pmMsgCnt = 0;
1853 
1854         while (pmMsgCnt < pmSet.size())
1855         {
1856             std::string rollbackErrMsg;
1857             bsIn.reset(new messageqcpp::ByteStream());
1858             fWEClient->read(uniqueId, bsIn);
1859 
1860             if (bsIn->length() == 0)
1861             {
1862                 fWEClient->removeQueue(uniqueId);
1863                 lockReleased = false;
1864                 throw  std::runtime_error("Network error, PM rollback; ");
1865             }
1866             else
1867             {
1868                 messageqcpp::ByteStream::byte rc;
1869                 uint16_t pmNum;
1870                 *bsIn >> rc;
1871                 *bsIn >> rollbackErrMsg;
1872                 *bsIn >> pmNum;
1873 
1874                 if (rc != 0)
1875                 {
1876                     fWEClient->removeQueue(uniqueId);
1877                     lockReleased = false;
1878                     throw  std::runtime_error(rollbackErrMsg);
1879                 }
1880             }
1881 
1882             pmMsgCnt++;
1883         } // end of while loop to process all responses to bulk rollback
1884 
1885         // If no errors so far, then change state to CLEANUP state.
1886         // We ignore the return stateChange flag.
1887         dbrm->changeState( lockInfo.id, BRM::CLEANUP );
1888     } // end of (lockInfo.state == BRM::LOADING)
1889 
1890     //delete meta data backup rollback files
1891     bsOut.reset();
1892 
1893     bsOut << (messageqcpp::ByteStream::byte)WE_SVR_DML_BULKROLLBACK_CLEANUP;
1894     bsOut << uniqueId;
1895     bsOut << lockInfo.tableOID;
1896     std::set<int>::const_iterator iter = pmSet.begin();
1897 
1898     while (iter != pmSet.end())
1899     {
1900         fWEClient->write(bsOut, *iter);
1901         iter++;
1902     }
1903 
1904     // Wait for "all" the responses, and accumulate any/all errors
1905     unsigned int pmMsgCnt = 0;
1906     //@Bug 4517 Release tablelock when it is in CLEANUP state
1907     uint32_t rcCleanup = 0;
1908     std::string fileDeleteErrMsg;
1909 
1910     while (pmMsgCnt < pmSet.size())
1911     {
1912         bsIn.reset(new messageqcpp::ByteStream());
1913         fWEClient->read(uniqueId, bsIn);
1914 
1915         if (bsIn->length() == 0)
1916         {
1917             fWEClient->removeQueue(uniqueId);
1918             rcCleanup = 1;
1919             fileDeleteErrMsg = "Network error, PM clean up; ";
1920         }
1921         else
1922         {
1923             messageqcpp::ByteStream::byte rc;
1924             uint16_t pmNum;
1925             *bsIn >> rc;
1926             *bsIn >> fileDeleteErrMsg;
1927             *bsIn >> pmNum;
1928 
1929             if ((rc != 0) && (rcCleanup == 0))
1930             {
1931                 fWEClient->removeQueue(uniqueId);
1932                 rcCleanup = rc;
1933             }
1934         }
1935 
1936         pmMsgCnt++;
1937     } // end of while loop to process all responses to rollback cleanup
1938 
1939     fWEClient->removeQueue(uniqueId);
1940     // We ignore return release flag from releaseTableLock().
1941     dbrm->releaseTableLock( lockInfo.id );
1942 
1943     if (rcCleanup != 0)
1944         throw  std::runtime_error(fileDeleteErrMsg);
1945 }
1946 
log(const std::string & msg,logging::LOG_TYPE level)1947 void DMLProcessor::log(const std::string& msg, logging::LOG_TYPE level)
1948 {
1949     logging::Message::Args args;
1950     logging::Message message(2);
1951     args.add(msg);
1952     message.format(args);
1953     logging::LoggingID lid(20);
1954     logging::MessageLog ml(lid);
1955 
1956     switch (level)
1957     {
1958         case LOG_TYPE_DEBUG:
1959             ml.logDebugMessage(message);
1960             break;
1961 
1962         case LOG_TYPE_INFO:
1963             ml.logInfoMessage(message);
1964             break;
1965 
1966         case LOG_TYPE_WARNING:
1967             ml.logWarningMessage(message);
1968             break;
1969 
1970         case LOG_TYPE_ERROR:
1971             ml.logErrorMessage(message);
1972             break;
1973 
1974         case LOG_TYPE_CRITICAL:
1975             ml.logCriticalMessage(message);
1976             break;
1977     }
1978 }
1979 
1980 } //namespace dmlprocessor
1981 // vim:ts=4 sw=4:
1982 
1983