1 /* Copyright (C) 2014 InfiniDB, Inc.
2 Copyright (C) 2016 MariaDB Corporation
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17
18 /***********************************************************************
19 * $Id: dmlprocessor.cpp 1024 2013-07-26 16:23:59Z chao $
20 *
21 *
22 ***********************************************************************/
23 /** @file */
24 #include "configcpp.h"
25 #include <signal.h>
26 #include <ctime>
27
28 //#define SERIALIZE_DDL_DML_CPIMPORT 1
29 #include <boost/thread/mutex.hpp>
30 #include <boost/scoped_ptr.hpp>
31 #include <boost/scoped_array.hpp>
32 #include <boost/shared_ptr.hpp>
33 using namespace boost;
34
35 #include "cacheutils.h"
36 #include "vss.h"
37 #include "dbrm.h"
38 #include "brmtypes.h"
39 #include "idberrorinfo.h"
40 #include "errorids.h"
41 #include "batchinsertprocessor.h"
42 #include "tablelockdata.h"
43 #include "oamcache.h"
44 #include "messagelog.h"
45 #include "sqllogger.h"
46 #include "we_messages.h"
47 #include "dmlprocessor.h"
48 using namespace BRM;
49 using namespace config;
50 using namespace execplan;
51 using namespace std;
52 using namespace messageqcpp;
53 using namespace dmlpackage;
54 using namespace dmlpackageprocessor;
55 using namespace joblist;
56 using namespace logging;
57 using namespace oam;
58 using namespace WriteEngine;
59
60 #include "querytele.h"
61 using namespace querytele;
62
63 extern boost::mutex mute;
64 extern boost::condition_variable cond;
65
66 #define MCOL_140 // Undefine to test VSS for out of order transactions
67
68 namespace
69 {
70 const std::string myname = "DMLProc";
71 }
72
73 namespace dmlprocessor
74 {
75 // Map to store the package handler objects so we can set flags during execution
76 // for things like ctrl+c
77 DMLProcessor::PackageHandlerMap_t DMLProcessor::packageHandlerMap;
78 boost::mutex DMLProcessor::packageHandlerMapLock;
79
80 //Map to store the BatchInsertProc object
81 std::map<uint32_t, BatchInsertProc*> DMLProcessor::batchinsertProcessorMap;
82 boost::mutex DMLProcessor::batchinsertProcessorMapLock;
83
84 // MCOL-140 Map to hold table oids for tables being changed.
85 std::map<uint32_t, PackageHandler::tableAccessQueue_t> PackageHandler::tableOidMap;
86 boost::condition_variable PackageHandler::tableOidCond;
87 boost::mutex PackageHandler::tableOidMutex;
88
89 //------------------------------------------------------------------------------
90 // A thread to periodically call dbrm to see if a user is
91 // shutting down the system or has put the system into write
92 // suspend mode. DBRM has 2 flags to check in this case, the
93 // ROLLBACK flag, and the FORCE flag. These flags will be
94 // reported when we ask for the Shutdown Pending flag (which we
95 // ignore at this point). Even if the user is putting the system
96 // into write suspend mode, this call will return the flags we
97 // are interested in. If ROLLBACK is set, we cancel normally.
98 // If FORCE is set, we can't rollback.
99 struct CancellationThread
100 {
CancellationThreaddmlprocessor::CancellationThread101 CancellationThread(DBRM* aDbrm, DMLServer& aServer) : fDbrm(aDbrm), fServer(aServer)
102 {}
operator ()dmlprocessor::CancellationThread103 void operator()()
104 {
105 bool bDoingRollback = false;
106 bool bRollback = false;
107 bool bForce = false;
108 ostringstream oss;
109 std::vector<BRM::TableLockInfo> tableLocks;
110 BRM::TxnID txnId;
111 DMLProcessor::PackageHandlerMap_t::iterator phIter;
112 uint32_t sessionID;
113 int rc = 0;
114
115 while (true)
116 {
117 usleep(1000000); // 1 seconds
118 // Check to see if someone has ordered a shutdown or suspend with rollback.
119 (void)fDbrm->getSystemShutdownPending(bRollback, bForce);
120
121 if (bForce)
122 break;
123
124 if (bDoingRollback && bRollback)
125 {
126 continue;
127 // We've already started the rollbacks. Don't start again.
128 }
129
130 bDoingRollback = false;
131
132 if (bRollback)
133 {
134 RollbackTransactionProcessor rollbackProcessor(fDbrm);
135 SessionManager sessionManager;
136 uint64_t uniqueId = fDbrm->getUnique64();
137 std::string errorMsg;
138 int activeTransCount = 0;
139 int idleTransCount = 0;
140 bDoingRollback = true;
141 ostringstream oss;
142 oss << "DMLProc has been told to rollback all DML transactions.";
143 DMLProcessor::log(oss.str(), logging::LOG_TYPE_INFO);
144 // Tell any active processors to stop working and return an error
145 // The front end will respond with a ROLLBACK command.
146 // Mark all active processors to rollback
147 boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock);
148
149 for (phIter = DMLProcessor::packageHandlerMap.begin();
150 phIter != DMLProcessor::packageHandlerMap.end();
151 ++phIter)
152 {
153 ostringstream oss;
154 oss << "DMLProc will rollback active session " << phIter->second->getSessionID() << " Transaction " << phIter->second->getTxnid();
155 DMLProcessor::log(oss.str(), logging::LOG_TYPE_INFO);
156
157 ++activeTransCount;
158 phIter->second->rollbackPending();
159 }
160
161 if (activeTransCount > 0)
162 {
163 ostringstream oss1;
164 oss1 << "DMLProc is rolling back back " << activeTransCount << " active transactions.";
165 DMLProcessor::log(oss1.str(), logging::LOG_TYPE_INFO);
166 }
167
168 // Need to set cluster to read-only via CMAPI before shutting the cluster down.
169 if (fDbrm->isReadWrite())
170 {
171 continue;
172 }
173
174 // Check for any open DML transactions that don't currently have
175 // a processor
176 tableLocks = fDbrm->getAllTableLocks();
177
178 if (tableLocks.size() > 0)
179 {
180 for (uint32_t i = 0; i < tableLocks.size(); ++i)
181 {
182 sessionID = tableLocks[i].ownerSessionID;
183 phIter = DMLProcessor::packageHandlerMap.find(sessionID);
184
185 if (phIter == DMLProcessor::packageHandlerMap.end())
186 {
187 // We have found an active transaction without a packagehandler.
188 // This means that a transaction is open with autocommit turned
189 // off, but there's no current activity on the transaction. We
190 // need to roll it back if it's a DML transaction.
191 // If ownerName == "DMLProc" then it's a DML transaction.
192 if (tableLocks[i].ownerName == "DMLProc")
193 {
194 // OK, we know this is an idle DML transaction, so roll it back.
195 ++idleTransCount;
196 txnId.id = tableLocks[i].ownerTxnID;
197 txnId.valid = true;
198 rc = rollbackProcessor.rollBackTransaction(uniqueId, txnId, sessionID, errorMsg);
199
200 if ( rc == 0 )
201 {
202 fDbrm->invalidateUncommittedExtentLBIDs(txnId.id);
203
204 //@Bug 4524. In case it is batchinsert, call bulkrollback.
205 rc = rollbackProcessor.rollBackBatchAutoOnTransaction(uniqueId, txnId, sessionID, tableLocks[i].tableOID, errorMsg);
206
207 if (rc == 0)
208 {
209 logging::logCommand(0, tableLocks[i].ownerTxnID, "ROLLBACK;");
210
211 bool lockReleased = true;
212
213 try
214 {
215 lockReleased = fDbrm->releaseTableLock(tableLocks[i].id);
216 TablelockData::removeTablelockData(sessionID);
217 }
218 catch (std::exception&)
219 {
220 throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
221 }
222
223 if (lockReleased)
224 {
225 sessionManager.rolledback(txnId);
226 ostringstream oss;
227 oss << "DMLProc rolled back idle transaction " << tableLocks[i].ownerTxnID << " and table lock id " << tableLocks[i].id << " is released.";
228 DMLProcessor::log(oss.str(), logging::LOG_TYPE_INFO);
229 }
230 else
231 {
232 ostringstream oss;
233 oss << "DMLProc rolled back idle transaction " << tableLocks[i].ownerTxnID << " and tble lock id " << tableLocks[i].id << " is not released.";
234 DMLProcessor::log(oss.str(), logging::LOG_TYPE_INFO);
235 }
236 }
237 else
238 {
239 ostringstream oss;
240 oss << " problem with bulk rollback of idle transaction " << tableLocks[i].ownerTxnID << "and DBRM is setting to readonly and table lock is not released: " << errorMsg;
241 DMLProcessor::log(oss.str(), logging::LOG_TYPE_CRITICAL);
242 rc = fDbrm->setReadOnly(true);
243 }
244 }
245 else
246 {
247 ostringstream oss;
248 oss << " problem with rollback of idle transaction " << tableLocks[i].ownerTxnID << "and DBRM is setting to readonly and table lock is not released: " << errorMsg;
249 DMLProcessor::log(oss.str(), logging::LOG_TYPE_CRITICAL);
250 rc = fDbrm->setReadOnly(true);
251 }
252 }
253 }
254 }
255 }
256
257 // If there are any abandonded transactions without locks
258 // release them.
259 int len;
260 boost::shared_array<BRM::SIDTIDEntry> activeTxns = sessionManager.SIDTIDMap(len);
261
262 for (int i = 0; i < len; i++)
263 {
264 // If there isn't a table lock for this transaction, roll it back. Otherwise, assume
265 // it has an active processor or is not DML initiated and leave it alone. It's someone
266 // else's concern.
267 bool bFoundit = false;
268
269 for (uint32_t j = 0; j < tableLocks.size(); ++j)
270 {
271 if (tableLocks[j].ownerTxnID == activeTxns[i].txnid.id)
272 {
273 bFoundit = true;
274 break;
275 }
276 }
277
278 if (!bFoundit && activeTxns[i].txnid.valid)
279 {
280 rollbackProcessor.rollBackTransaction(uniqueId, activeTxns[i].txnid, activeTxns[i].sessionid, errorMsg);
281 sessionManager.rolledback(activeTxns[i].txnid);
282 ++idleTransCount;
283 ostringstream oss;
284 oss << "DMLProc rolled back idle transaction with no tablelock" << tableLocks[i].ownerTxnID;
285 DMLProcessor::log(oss.str(), logging::LOG_TYPE_INFO);
286 }
287 }
288
289 if (idleTransCount > 0)
290 {
291 ostringstream oss2;
292 oss2 << "DMLProc has rolled back " << idleTransCount << " idle transactions.";
293 DMLProcessor::log(oss2.str(), logging::LOG_TYPE_INFO);
294 }
295 // Here is the end of the rollback if so DMLProc rollbacks what it can.
296 break;
297 }
298 }
299 // Setting the flag to tell DMLServer to exit.
300 fServer.startShutdown();
301 }
302 DBRM* fDbrm;
303 DMLServer& fServer;
304 };
305
PackageHandler(const messageqcpp::IOSocket & ios,boost::shared_ptr<messageqcpp::ByteStream> bs,uint8_t packageType,joblist::DistributedEngineComm * ec,bool concurrentSupport,uint64_t maxDeleteRows,uint32_t sessionID,execplan::CalpontSystemCatalog::SCN txnId,DBRM * aDbrm,const QueryTeleClient & qtc,boost::shared_ptr<execplan::CalpontSystemCatalog> csc)306 PackageHandler::PackageHandler(const messageqcpp::IOSocket& ios,
307 boost::shared_ptr<messageqcpp::ByteStream> bs,
308 uint8_t packageType,
309 joblist::DistributedEngineComm* ec,
310 bool concurrentSupport,
311 uint64_t maxDeleteRows,
312 uint32_t sessionID,
313 execplan::CalpontSystemCatalog::SCN txnId,
314 DBRM* aDbrm,
315 const QueryTeleClient& qtc,
316 boost::shared_ptr<execplan::CalpontSystemCatalog> csc) :
317 fIos(ios),
318 fByteStream(bs),
319 fPackageType(packageType),
320 fEC(ec),
321 fConcurrentSupport(concurrentSupport),
322 fMaxDeleteRows(maxDeleteRows),
323 fSessionID(sessionID),
324 fTableOid(0),
325 fTxnid(txnId),
326 fDbrm(aDbrm),
327 fQtc(qtc),
328 fcsc(csc)
329 {
330 }
331
~PackageHandler()332 PackageHandler::~PackageHandler()
333 {
334 //cout << "In destructor" << endl;
335 }
336
337 // MCOL-140
338 // Blocks a thread if there is another trx working on the same fTableOid
339 // return 1 when thread should continue.
340 // return 0 if error. Right now, no error detection is implemented.
341 //
342 // txnid was being created before the call to this function. This caused race conditions
343 // so creation is delayed until we're inside the lock here. Nothing needs it before
344 // this point in the execution.
345 //
346 // The algorithm is this. When the first txn for a given fTableOid arrives, start a queue
347 // containing a list of waiting or working txnId. Put this txnId into the queue (working)
348 // Put the queue into a map keyed on fTableOid.
349 //
350 // When the next txn for this fTableOid arrives, it finds the queue in the map and adds itself,
351 // then waits for condition.
352 // When a thread finishes, it removes its txnId from the queue and notifies all. If the queue is
353 // empty, it removes the entry from the map.
354 // Upon wakeup from wait(), a thread checks to see if it's next in the queue. If so, it is released
355 // to do work. Otherwise it goes back to wait.
356 //
357 // There's a chance (CTRL+C) for instance, that the txn is no longer in the queue. Release it to work.
358 // Rollback will most likely be next.
359 //
360 // A tranasaction for one fTableOid is not blocked by a txn for a different fTableOid.
synchTableAccess(dmlpackage::CalpontDMLPackage * dmlPackage)361 int PackageHandler::synchTableAccess(dmlpackage::CalpontDMLPackage* dmlPackage)
362 {
363 // MCOL-140 Wait for any other DML using this table.
364 std::map<uint32_t, PackageHandler::tableAccessQueue_t>::iterator it;
365 boost::unique_lock<boost::mutex> lock(tableOidMutex);
366 BRM::TxnID txnid;
367
368 if (fPackageType != dmlpackage::DML_COMMAND)
369 {
370 txnid = sessionManager.getTxnID(fSessionID);
371
372 if ( !txnid.valid )
373 {
374 txnid = sessionManager.newTxnID(fSessionID, true);
375
376 if (!txnid.valid)
377 {
378 throw std::runtime_error( std::string("Unable to start a transaction. Check critical log.") );
379 }
380 }
381 }
382 else
383 {
384 txnid = sessionManager.getTxnID(fSessionID);
385 }
386
387 fTxnid = txnid.id;
388
389 if ((it = tableOidMap.find(fTableOid)) != tableOidMap.end())
390 {
391 PackageHandler::tableAccessQueue_t& tableOidQueue = it->second;
392
393 // There's at least one working txn on this table. We may be the same txn.
394 if (fTxnid == tableOidQueue.front())
395 {
396 return 1; // We're next in line or the same as the last. Keep working
397 }
398
399 tableOidQueue.push(fTxnid); // Get on the waiting list.
400
401 // We need to wait
402 // tableOidQueue here is the queue holding the waitng transactions for this fTableOid
403 while (true)
404 {
405 // Log something that we're waiting
406 LoggingID logid(21, fSessionID, fTxnid);
407 logging::Message::Args args1;
408 logging::Message msg(1);
409 ostringstream oss;
410 oss << "Txn is waiting for" << tableOidQueue.front() << " " << dmlPackage->get_SQLStatement() << "; |" << dmlPackage->get_SchemaName() <<"|";
411 args1.add(oss.str());
412 args1.add((uint64_t)fTableOid);
413 msg.format(args1);
414 logging::Logger logger(logid.fSubsysID);
415 logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
416
417 tableOidCond.wait(lock);
418 // In case of CTRL+C, the tableOidQueue could be invalidated
419 if ((tableOidMap.find(fTableOid))->second != tableOidQueue)
420 {
421 break;
422 }
423 if (tableOidQueue.front() == fTxnid)
424 {
425 // We're up next. Let's go do stuff.
426 break;
427 }
428
429 if (tableOidQueue.empty())
430 {
431 // If we had been the last txn waiting and CTRL+C was hit, then the queue is empty now.
432 // Empty queues must be erased from the map.
433 tableOidMap.erase(fTableOid);
434 break;
435 }
436
437 // If we're not in the queue at all, then continue. CTRL+C was probably hit.
438 PackageHandler::tableAccessQueue_t::container_type::iterator c_it = tableOidQueue.find(fTxnid);
439
440 if (c_it == tableOidQueue.end())
441 {
442 break;
443 }
444
445 // We're still in the queue and not on top. Go back and wait some more.
446 }
447 }
448 else
449 {
450 // We're the first for this tableoid. Start a new queue.
451 tableAccessQueue_t tableOidQueue;
452 tableOidQueue.push(fTxnid);
453 tableOidMap[fTableOid] = tableOidQueue;
454 }
455
456 return 1;
457 }
458
459 // MCOL-140 Called when it's time to release the next thread for this tablOid
releaseTableAccess()460 int PackageHandler::releaseTableAccess()
461 {
462 // take us out of the queue
463 std::map<uint32_t, PackageHandler::tableAccessQueue_t>::iterator it;
464 boost::lock_guard<boost::mutex> lock(tableOidMutex);
465
466 if (fTableOid == 0 || (it = tableOidMap.find(fTableOid)) == tableOidMap.end())
467 {
468 return 2; // For now, return codes are not used
469 }
470
471 PackageHandler::tableAccessQueue_t& tableOidQueue = it->second;
472
473 if (tableOidQueue.front() != fTxnid)
474 {
475 // This is a severe error. The front should be the working thread. If we're here,
476 // we're the working thread and should be front().
477 cout << fTxnid << " " << fTableOid << " We got to release and we're not on top " << tableOidQueue.front() << endl;
478 LoggingID logid(21, fSessionID, fTxnid);
479 logging::Message::Args args1;
480 logging::Message msg(1);
481 args1.add("ReleaseTableAccess: Txn being released is not the current txn in the tablOidQueue for tableid");
482 args1.add((uint64_t)fTableOid);
483 msg.format(args1);
484 logging::Logger logger(logid.fSubsysID);
485 logger.logMessage(LOG_TYPE_ERROR, msg, logid);
486 }
487 else
488 {
489 if (!tableOidQueue.empty())
490 tableOidQueue.pop(); // Get off the waiting list.
491
492 if (tableOidQueue.empty())
493 {
494 // remove the queue from the map.
495 tableOidMap.erase(fTableOid);
496 }
497 }
498
499 // release the condition
500 tableOidCond.notify_all();
501 return 1;
502 }
503
forceReleaseTableAccess()504 int PackageHandler::forceReleaseTableAccess()
505 {
506 // By removing the txnid from the queue, the logic after the wait in
507 // synchTableAccess() will release the thread and clean up if needed.
508 std::map<uint32_t, PackageHandler::tableAccessQueue_t>::iterator it;
509 boost::lock_guard<boost::mutex> lock(tableOidMutex);
510
511 if (fTableOid == 0 || (it = tableOidMap.find(fTableOid)) == tableOidMap.end())
512 {
513 // This will happen for DML_COMMAND, as we never got the tableoid or called synchTableAccess
514 return 2;
515 }
516
517 PackageHandler::tableAccessQueue_t& tableOidQueue = it->second;
518 tableOidQueue.erase(fTxnid);
519 if (tableOidQueue.empty())
520 {
521 // remove the queue from the map.
522 tableOidMap.erase(fTableOid);
523 }
524 // release the condition
525 tableOidCond.notify_all();
526 return 1;
527 }
528
529 //static
530 // Called upon sighup, often because PrimProc crashed. We don't want to leave all the transactions hung,
531 // though some may be because they never returned from PrimProc and will leave the table lock on.
clearTableAccess()532 int PackageHandler::clearTableAccess()
533 {
534 tableOidMap.clear();
535 return 1;
536 }
537
run()538 void PackageHandler::run()
539 {
540 ResourceManager* frm = ResourceManager::instance();
541 dmlpackageprocessor::DMLPackageProcessor::DMLResult result;
542 result.result = dmlpackageprocessor::DMLPackageProcessor::NO_ERROR;
543 //cout << "PackageHandler handling ";
544 std::string stmt;
545 unsigned DMLLoggingId = 21;
546 oam::OamCache* oamCache = oam::OamCache::makeOamCache();
547 SynchTable synchTable;
548
549 try
550 {
551 switch ( fPackageType )
552 {
553 case dmlpackage::DML_INSERT:
554 {
555 // build an InsertDMLPackage from the bytestream
556 //cout << "an INSERT package" << endl;
557 dmlpackage::InsertDMLPackage insertPkg;
558 //boost::shared_ptr<messageqcpp::ByteStream> insertBs (new messageqcpp::ByteStream);
559 messageqcpp::ByteStream bsSave = *(fByteStream.get());
560 insertPkg.readMetaData(*(fByteStream.get()));
561 #ifdef MCOL_140
562
563 if (fConcurrentSupport)
564 {
565 fTableOid = insertPkg.getTableOid();
566
567 // Single Insert has no start like bulk does, so insertPkg.getTableOid()
568 // isn't set. Go get it now.
569 if (fTableOid == 0)
570 {
571 CalpontSystemCatalog::TableName tableName;
572 tableName.schema = insertPkg.get_Table()->get_SchemaName();
573 tableName.table = insertPkg.get_Table()->get_TableName();
574 CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName);
575 fTableOid = roPair.objnum;
576 }
577 synchTable.setPackage(this, &insertPkg); // Blocks if another DML thread is using this fTableOid
578 }
579
580 #endif
581 QueryTeleStats qts;
582 qts.query_uuid = QueryTeleClient::genUUID();
583 qts.msg_type = QueryTeleStats::QT_START;
584 qts.start_time = QueryTeleClient::timeNowms();
585 qts.session_id = fSessionID;
586 qts.query_type = "INSERT";
587 qts.query = insertPkg.get_SQLStatement();
588 qts.system_name = oamCache->getSystemName();
589 qts.module_name = oamCache->getModuleName();
590 qts.schema_name = insertPkg.get_SchemaName();
591 fQtc.postQueryTele(qts);
592
593 //cout << "This is batch insert " << insertPkg->get_isBatchInsert() << endl;
594 if (insertPkg.get_isBatchInsert())
595 {
596 fByteStream->reset();
597 //cout << "This is batch insert " << endl;
598 BatchInsertProc* batchProcessor = NULL;
599 {
600 boost::mutex::scoped_lock lk(DMLProcessor::batchinsertProcessorMapLock);
601
602 std::map<uint32_t, BatchInsertProc*>::iterator batchIter = DMLProcessor::batchinsertProcessorMap.find(fSessionID);
603
604 if (batchIter == DMLProcessor::batchinsertProcessorMap.end())
605 {
606 batchProcessor = new BatchInsertProc(insertPkg.get_isAutocommitOn(), insertPkg.getTableOid(), fTxnid, fDbrm);
607 DMLProcessor::batchinsertProcessorMap[fSessionID] = batchProcessor;
608 //cout << "batchProcessor is created " << batchProcessor << endl;
609 }
610 else
611 {
612 batchProcessor = batchIter->second;
613 //cout << "Found batchProcessor " << batchProcessor << endl;
614 }
615 }
616
617 if ( insertPkg.get_Logging() )
618 {
619 LoggingID logid( DMLLoggingId, insertPkg.get_SessionID(), fTxnid);
620 logging::Message::Args args1;
621 logging::Message msg(1);
622 args1.add("Start SQL statement: ");
623 ostringstream oss;
624 oss << insertPkg.get_SQLStatement() << "; |" << insertPkg.get_SchemaName() << "|";
625 args1.add(oss.str());
626 msg.format( args1 );
627 logging::Logger logger(logid.fSubsysID);
628 logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
629 TablelockData* tablelockData = TablelockData::makeTablelockData(insertPkg.get_SessionID());
630 uint64_t tableLockId = tablelockData->getTablelockId(insertPkg.getTableOid());
631
632 //cout << "Processing table oid " << insertPkg.getTableOid() << " for transaction "<< (int)fTxnid << endl;
633 if (tableLockId == 0)
634 {
635 //cout << "Grabing tablelock for batchProcessor " << batchProcessor << endl;
636 tableLockId = batchProcessor->grabTableLock(insertPkg.get_SessionID());
637
638 if (tableLockId == 0)
639 {
640 BRM::TxnID brmTxnID;
641 brmTxnID.id = fTxnid;
642 brmTxnID.valid = true;
643 sessionManager.rolledback(brmTxnID);
644 string errMsg;
645 int rc = 0;
646 batchProcessor->getError(rc, errMsg);
647 result.result = DMLPackageProcessor::TABLE_LOCK_ERROR;
648 logging::Message::Args args;
649 logging::Message message(1);
650 args.add("Insert Failed: ");
651 args.add(errMsg);
652 args.add("");
653 args.add("");
654 message.format(args);
655 result.message = message;
656 break;
657 }
658
659 if (tableLockId > 0)
660 tablelockData->setTablelock(insertPkg.getTableOid(), tableLockId);
661 }
662 }
663
664 if (insertPkg.get_Logending() && insertPkg.get_Logging()) //only one batch need to be processed.
665 {
666 //cout << "dmlprocessor add last pkg" << endl;
667 //need to add error handling.
668 batchProcessor->addPkg(bsSave);
669 batchProcessor->sendFirstBatch();
670 batchProcessor->receiveOutstandingMsg();
671 //@Bug 5162. Get the correct error message before the last message.
672 string errMsg;
673 int rc = 0;
674 batchProcessor->getError(rc, errMsg);
675 batchProcessor->sendlastBatch();
676 batchProcessor->receiveAllMsg();
677
678 if (rc == DMLPackageProcessor::IDBRANGE_WARNING)
679 {
680 result.result = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
681 LoggingID logid( DMLLoggingId, insertPkg.get_SessionID(), fTxnid);
682 logging::Message::Args args1;
683 logging::Message msg(1);
684 args1.add("End SQL statement with warnings");
685 msg.format( args1 );
686 logging::Logger logger(logid.fSubsysID);
687 logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
688 logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";", insertPkg.get_SchemaName());
689 logging::Message::Args args;
690 logging::Message message(1);
691 args.add(errMsg);
692 args.add("");
693 args.add("");
694 message.format(args);
695 result.message = message;
696 }
697 else if ( rc != 0)
698 {
699 result.result = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR;
700 logging::Message::Args args;
701 logging::Message message(1);
702 cout << "Got error in the end of one batchinsert." << endl;
703 args.add("Insert Failed: ");
704 args.add(errMsg);
705 args.add("");
706 args.add("");
707 message.format(args);
708 result.message = message;
709 LoggingID logid( DMLLoggingId, insertPkg.get_SessionID(), fTxnid);
710 logging::Message::Args args1;
711 logging::Message msg(1);
712 args1.add("End SQL statement with error");
713 msg.format( args1 );
714 logging::Logger logger(logid.fSubsysID);
715 logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
716 logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";", insertPkg.get_SchemaName());
717 }
718 else
719 {
720 // if (!insertPkg.get_isAutocommitOn())
721 // {
722 // batchProcessor->setHwm();
723 // }
724 LoggingID logid( DMLLoggingId, insertPkg.get_SessionID(), fTxnid);
725 logging::Message::Args args1;
726 logging::Message msg(1);
727 args1.add("End SQL statement");
728 msg.format( args1 );
729 logging::Logger logger(logid.fSubsysID);
730 logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
731 logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";", insertPkg.get_SchemaName());
732 }
733
734 //remove the batch insert object
735 {
736 boost::mutex::scoped_lock lk(DMLProcessor::batchinsertProcessorMapLock);
737
738 std::map<uint32_t, BatchInsertProc*>::iterator batchIter = DMLProcessor::batchinsertProcessorMap.find(fSessionID);
739
740 if (batchIter != DMLProcessor::batchinsertProcessorMap.end())
741 {
742 delete batchIter->second;
743 DMLProcessor::batchinsertProcessorMap.erase(fSessionID);
744 }
745 }
746 }
747 else if (insertPkg.get_Logending()) //Last batch
748 {
749 int rc = 0;
750 string errMsg;
751 batchProcessor->getError(rc, errMsg);
752
753 //cout <<"dmlprocessor received last pkg from mysql rc == " << rc << endl;
754 if (( rc == 0) || (rc == DMLPackageProcessor::IDBRANGE_WARNING))
755 {
756 //cout << " rc = " << rc << endl;
757 batchProcessor->addPkg(bsSave);
758 batchProcessor->sendNextBatch();
759 batchProcessor->receiveOutstandingMsg();
760 //@Bug 5162. Get the correct error message before the last message.
761 batchProcessor->getError(rc, errMsg);
762 batchProcessor->sendlastBatch();
763 batchProcessor->receiveAllMsg();
764
765 if (rc == DMLPackageProcessor::IDBRANGE_WARNING)
766 {
767 result.result = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
768 LoggingID logid( DMLLoggingId, insertPkg.get_SessionID(), fTxnid);
769 logging::Message::Args args1;
770 logging::Message msg(1);
771 args1.add("End SQL statement with warnings");
772 msg.format( args1 );
773 logging::Logger logger(logid.fSubsysID);
774 logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
775 logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";", insertPkg.get_SchemaName());
776 logging::Message::Args args;
777 logging::Message message(1);
778 args.add(errMsg);
779 args.add("");
780 args.add("");
781 message.format(args);
782 result.message = message;
783 }
784 else if ( rc != 0)
785 {
786 //cout << "Got error in the end of last batchinsert. error message is " << errMsg << endl;
787 result.result = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR;
788 logging::Message::Args args;
789 logging::Message message(1);
790 args.add("Insert Failed: ");
791 args.add(errMsg);
792 args.add("");
793 args.add("");
794 message.format(args);
795 result.message = message;
796 LoggingID logid( DMLLoggingId, insertPkg.get_SessionID(), fTxnid);
797 logging::Message::Args args1;
798 logging::Message msg(1);
799 args1.add("End SQL statement with error");
800 msg.format( args1 );
801 logging::Logger logger(logid.fSubsysID);
802 logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
803 logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";", insertPkg.get_SchemaName());
804 }
805 else
806 {
807 LoggingID logid( DMLLoggingId, insertPkg.get_SessionID(), fTxnid);
808 logging::Message::Args args1;
809 logging::Message msg(1);
810 args1.add("End SQL statement");
811 msg.format( args1 );
812 logging::Logger logger(logid.fSubsysID);
813 logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
814 logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";", insertPkg.get_SchemaName());
815 }
816
817 //cout << "finished batch insert" << endl;
818 }
819 else
820 {
821 //error occured. Receive all outstanding messages before erroring out.
822 batchProcessor->receiveOutstandingMsg();
823 batchProcessor->sendlastBatch(); //needs to flush files
824 batchProcessor->receiveAllMsg();
825 result.result = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR;
826 //cout << "Got error in the end of batchinsert2. error msg is " << errMsg<< endl;
827 logging::Message::Args args;
828 logging::Message message(1);
829 args.add("Insert Failed: ");
830 args.add(errMsg);
831 args.add("");
832 args.add("");
833 message.format(args);
834 result.message = message;
835 LoggingID logid( DMLLoggingId, insertPkg.get_SessionID(), fTxnid);
836 logging::Message::Args args1;
837 logging::Message msg(1);
838 args1.add("End SQL statement with error");
839 msg.format( args1 );
840 logging::Logger logger(logid.fSubsysID);
841 logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
842 logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";", insertPkg.get_SchemaName());
843 }
844
845 //remove from map
846 {
847 boost::mutex::scoped_lock lk(DMLProcessor::batchinsertProcessorMapLock);
848 std::map<uint32_t, BatchInsertProc*>::iterator batchIter = DMLProcessor::batchinsertProcessorMap.find(fSessionID);
849
850 if (batchIter != DMLProcessor::batchinsertProcessorMap.end())
851 {
852 //cout << "Batchinsertprcessor is deleted. " << batchIter->second << endl;
853 delete batchIter->second;
854 DMLProcessor::batchinsertProcessorMap.erase(fSessionID);
855 }
856 }
857
858 }
859 else
860 {
861 int rc = 0;
862 string errMsg;
863 batchProcessor->getError(rc, errMsg);
864
865 if (rc == DMLPackageProcessor::IDBRANGE_WARNING)
866 {
867 result.result = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
868 }
869 else if ( rc != 0)
870 {
871 result.result = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR;
872 //@Bug
873 //cout << "Got error during batchinsert. with message " << errMsg << endl;
874 logging::Message::Args args;
875 logging::Message message(6);
876 args.add( errMsg );
877 message.format( args );
878 result.message = message;
879 batchProcessor->receiveOutstandingMsg();
880 batchProcessor->sendlastBatch(); //needs to flush files
881 //cout << "Last batch is sent to WES." << endl;
882 batchProcessor->receiveAllMsg();
883 LoggingID logid( DMLLoggingId, insertPkg.get_SessionID(), fTxnid);
884 logging::Message::Args args1;
885 logging::Message msg(1);
886 args1.add("End SQL statement with error");
887 msg.format( args1 );
888 logging::Logger logger(logid.fSubsysID);
889 logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
890 //remove from map
891 {
892 boost::mutex::scoped_lock lk(DMLProcessor::batchinsertProcessorMapLock);
893 std::map<uint32_t, BatchInsertProc*>::iterator batchIter = DMLProcessor::batchinsertProcessorMap.find(fSessionID);
894
895 if (batchIter != DMLProcessor::batchinsertProcessorMap.end())
896 {
897 //cout << "Batchinsertprcessor is deleted. " << batchIter->second << endl;
898 delete batchIter->second;
899 DMLProcessor::batchinsertProcessorMap.erase(fSessionID);
900 }
901 }
902 break;
903 }
904
905 batchProcessor->addPkg(bsSave);
906 batchProcessor->sendNextBatch();
907 break;
908 }
909 }
910 else // Single Insert
911 {
912 // make sure insertPkg.readMetaData() is called before
913 // this on fByteStream!
914 // TODO: Similar to batch inserts, don't
915 // deserialize the row data here for single inserts.
916 insertPkg.readRowData(*(fByteStream.get()));
917 insertPkg.set_TxnID(fTxnid);
918 fProcessor.reset(new dmlpackageprocessor::InsertPackageProcessor(fDbrm, insertPkg.get_SessionID()));
919 result = fProcessor->processPackage(insertPkg);
920 }
921
922 qts.msg_type = QueryTeleStats::QT_SUMMARY;
923 qts.max_mem_pct = result.stats.fMaxMemPct;
924 qts.num_files = result.stats.fNumFiles;
925 qts.phy_io = result.stats.fPhyIO;
926 qts.cache_io = result.stats.fCacheIO;
927 qts.msg_rcv_cnt = result.stats.fMsgRcvCnt;
928 qts.cp_blocks_skipped = result.stats.fCPBlocksSkipped;
929 qts.msg_bytes_in = result.stats.fMsgBytesIn;
930 qts.msg_bytes_out = result.stats.fMsgBytesOut;
931 qts.rows = result.stats.fRows;
932 qts.end_time = QueryTeleClient::timeNowms();
933 qts.blocks_changed = result.stats.fBlocksChanged;
934 fQtc.postQueryTele(qts);
935 }
936 break;
937
938 case dmlpackage::DML_UPDATE:
939 {
940 // build an UpdateDMLPackage from the bytestream
941 //cout << "an UPDATE package" << endl;
942 boost::scoped_ptr<dmlpackage::UpdateDMLPackage> updatePkg(new dmlpackage::UpdateDMLPackage());
943 updatePkg->read(*(fByteStream.get()));
944 #ifdef MCOL_140
945
946 if (fConcurrentSupport)
947 {
948 fTableOid = updatePkg->getTableOid();
949
950 // Update generally doesn't set fTableOid in updatePkg. Go get it now.
951 if (fTableOid == 0)
952 {
953 CalpontSystemCatalog::TableName tableName;
954 tableName.schema = updatePkg->get_Table()->get_SchemaName();
955 tableName.table = updatePkg->get_Table()->get_TableName();
956 CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName);
957 fTableOid = roPair.objnum;
958 }
959 synchTable.setPackage(this, updatePkg.get()); // Blocks if another DML thread is using this fTableOid
960 }
961
962 #endif
963 updatePkg->set_TxnID(fTxnid);
964 QueryTeleStats qts;
965 qts.query_uuid = updatePkg->uuid();
966 qts.msg_type = QueryTeleStats::QT_START;
967 qts.start_time = QueryTeleClient::timeNowms();
968 qts.session_id = fSessionID;
969 qts.query_type = "UPDATE";
970 qts.query = updatePkg->get_SQLStatement();
971 qts.system_name = oamCache->getSystemName();
972 qts.module_name = oamCache->getModuleName();
973 qts.schema_name = updatePkg->get_SchemaName();
974 fQtc.postQueryTele(qts);
975 // process it
976 //@Bug 1341. Don't remove calpontsystemcatalog from this
977 //session to take advantage of cache.
978 fProcessor.reset(new dmlpackageprocessor::UpdatePackageProcessor(fDbrm, updatePkg->get_SessionID()));
979 fProcessor->setEngineComm(fEC);
980 fProcessor->setRM( frm);
981 idbassert( fTxnid != 0);
982 result = fProcessor->processPackage(*(updatePkg.get())) ;
983 qts.msg_type = QueryTeleStats::QT_SUMMARY;
984 qts.max_mem_pct = result.stats.fMaxMemPct;
985 qts.num_files = result.stats.fNumFiles;
986 qts.phy_io = result.stats.fPhyIO;
987 qts.cache_io = result.stats.fCacheIO;
988 qts.msg_rcv_cnt = result.stats.fMsgRcvCnt;
989 qts.cp_blocks_skipped = result.stats.fCPBlocksSkipped;
990 qts.msg_bytes_in = result.stats.fMsgBytesIn;
991 qts.msg_bytes_out = result.stats.fMsgBytesOut;
992 qts.rows = result.stats.fRows;
993 qts.end_time = QueryTeleClient::timeNowms();
994 qts.blocks_changed = result.stats.fBlocksChanged;
995 fQtc.postQueryTele(qts);
996 }
997 break;
998
999 case dmlpackage::DML_DELETE:
1000 {
1001 boost::scoped_ptr<dmlpackage::DeleteDMLPackage> deletePkg(new dmlpackage::DeleteDMLPackage());
1002 deletePkg->read(*(fByteStream.get()));
1003 #ifdef MCOL_140
1004
1005 if (fConcurrentSupport)
1006 {
1007 fTableOid = deletePkg->getTableOid();
1008
1009 // Delete generally doesn't set fTableOid in updatePkg. Go get it now.
1010 if (fTableOid == 0)
1011 {
1012 CalpontSystemCatalog::TableName tableName;
1013 tableName.schema = deletePkg->get_Table()->get_SchemaName();
1014 tableName.table = deletePkg->get_Table()->get_TableName();
1015 CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName);
1016 fTableOid = roPair.objnum;
1017 }
1018 synchTable.setPackage(this, deletePkg.get()); // Blocks if another DML thread is using this fTableOid
1019 }
1020
1021 #endif
1022 deletePkg->set_TxnID(fTxnid);
1023 QueryTeleStats qts;
1024 qts.query_uuid = deletePkg->uuid();
1025 qts.msg_type = QueryTeleStats::QT_START;
1026 qts.start_time = QueryTeleClient::timeNowms();
1027 qts.session_id = fSessionID;
1028 qts.query_type = "DELETE";
1029 qts.query = deletePkg->get_SQLStatement();
1030 qts.system_name = oamCache->getSystemName();
1031 qts.module_name = oamCache->getModuleName();
1032 qts.schema_name = deletePkg->get_SchemaName();
1033 fQtc.postQueryTele(qts);
1034 // process it
1035 //@Bug 1341. Don't remove calpontsystemcatalog from this session to take advantage of cache.
1036 fProcessor.reset(new dmlpackageprocessor::DeletePackageProcessor(fDbrm, deletePkg->get_SessionID()));
1037 fProcessor->setEngineComm(fEC);
1038 fProcessor->setRM( frm);
1039 idbassert( fTxnid != 0);
1040 result = fProcessor->processPackage(*(deletePkg.get())) ;
1041 qts.msg_type = QueryTeleStats::QT_SUMMARY;
1042 qts.max_mem_pct = result.stats.fMaxMemPct;
1043 qts.num_files = result.stats.fNumFiles;
1044 qts.phy_io = result.stats.fPhyIO;
1045 qts.cache_io = result.stats.fCacheIO;
1046 qts.msg_rcv_cnt = result.stats.fMsgRcvCnt;
1047 qts.cp_blocks_skipped = result.stats.fCPBlocksSkipped;
1048 qts.msg_bytes_in = result.stats.fMsgBytesIn;
1049 qts.msg_bytes_out = result.stats.fMsgBytesOut;
1050 qts.rows = result.stats.fRows;
1051 qts.end_time = QueryTeleClient::timeNowms();
1052 qts.blocks_changed = result.stats.fBlocksChanged;
1053 fQtc.postQueryTele(qts);
1054 }
1055 break;
1056
1057 case dmlpackage::DML_COMMAND:
1058 {
1059 // build a CommandDMLPackage from the bytestream
1060 //cout << "a COMMAND package" << endl;
1061 dmlpackage::CommandDMLPackage commandPkg;
1062 commandPkg.read(*(fByteStream.get()));
1063 stmt = commandPkg.get_DMLStatement();
1064 boost::algorithm::to_upper(stmt);
1065 trim(stmt);
1066
1067 if (stmt == "CLEANUP")
1068 {
1069 execplan::CalpontSystemCatalog::removeCalpontSystemCatalog(commandPkg.get_SessionID());
1070 execplan::CalpontSystemCatalog::removeCalpontSystemCatalog(commandPkg.get_SessionID() | 0x80000000);
1071 }
1072 else
1073 {
1074 // process it
1075 //@Bug 1341. Don't remove calpontsystemcatalog from this session to take advantage of cache.
1076 fProcessor.reset(new dmlpackageprocessor::CommandPackageProcessor(fDbrm, commandPkg.get_SessionID()));
1077
1078 //cout << "got command " << stmt << " for session " << commandPkg.get_SessionID() << endl;
1079 result = fProcessor->processPackage(commandPkg);
1080 }
1081 }
1082 break;
1083 }
1084
1085
1086
1087
1088 //Log errors
1089 if ( (result.result != dmlpackageprocessor::DMLPackageProcessor::NO_ERROR)
1090 && (result.result != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
1091 && (result.result != dmlpackageprocessor::DMLPackageProcessor::ACTIVE_TRANSACTION_ERROR)
1092 && (result.result != dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR) )
1093 {
1094 logging::LoggingID lid(21);
1095 logging::MessageLog ml(lid);
1096
1097 ml.logErrorMessage( result.message );
1098 }
1099 else if (result.result == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
1100 {
1101 logging::LoggingID lid(21);
1102 logging::MessageLog ml(lid);
1103
1104 ml.logWarningMessage( result.message );
1105 }
1106 }
1107 catch (std::exception& e)
1108 {
1109
1110
1111 cout << "dmlprocessor.cpp PackageHandler::run() package type("
1112 << fPackageType << ") exception: " << e.what() << endl;
1113 logging::LoggingID lid(21);
1114 logging::MessageLog ml(lid);
1115 logging::Message::Args args;
1116 logging::Message message(1);
1117 args.add("dmlprocessor.cpp PackageHandler::run() package type");
1118 args.add((uint64_t)fPackageType);
1119 args.add(e.what());
1120 message.format(args);
1121 ml.logErrorMessage(message);
1122 result.result = DMLPackageProcessor::COMMAND_ERROR;
1123 result.message = message;
1124 }
1125 catch (...)
1126 {
1127
1128
1129 logging::LoggingID lid(21);
1130 logging::MessageLog ml(lid);
1131 logging::Message::Args args;
1132 logging::Message message(1);
1133 args.add("dmlprocessor.cpp PackageHandler::run() ... exception package type");
1134 args.add((uint64_t)fPackageType);
1135 message.format(args);
1136 ml.logErrorMessage(message);
1137 result.result = DMLPackageProcessor::COMMAND_ERROR;
1138 result.message = message;
1139 }
1140
1141 // We put the packageHandler into a map so that if we receive a
1142 // message to affect the previous command, we can find it.
1143 // We need to remove it from the list before sending the response back.
1144 // If we remove it after sending the results, it's possible for a commit
1145 // or rollback be sent and get processed before it is removed, and that
1146 // will fail.
1147 boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock);
1148 DMLProcessor::packageHandlerMap.erase(getSessionID());
1149 lk2.unlock();
1150
1151 // send back the results
1152 messageqcpp::ByteStream results;
1153 messageqcpp::ByteStream::octbyte rowCount = result.rowCount;
1154 messageqcpp::ByteStream::byte retval = result.result;
1155 results << retval;
1156 results << rowCount;
1157 results << result.message.msg();
1158 results << result.tableLockInfo; // ? connector does not get
1159 // query stats
1160 results << result.queryStats;
1161 results << result.extendedStats;
1162 results << result.miniStats;
1163 result.stats.serialize(results);
1164 fIos.write(results);
1165 //Bug 5226. dmlprocessor thread will close the socket to mysqld.
1166 //if (stmt == "CLEANUP")
1167 // fIos.close();
1168 }
1169
rollbackPending()1170 void PackageHandler::rollbackPending()
1171 {
1172 if (fProcessor.get() == NULL)
1173 {
1174 // This happens when batch insert
1175 return;
1176 }
1177
1178 fProcessor->setRollbackPending(true);
1179
1180 // Force a release of the processing from MCOL-140
1181 #ifdef MCOL_140
1182 if (fConcurrentSupport)
1183 {
1184 // MCOL-140 We're not necessarily the next in line.
1185 // This forces this thread to be released anyway.
1186 forceReleaseTableAccess();
1187 }
1188
1189 #endif
1190
1191 ostringstream oss;
1192 oss << "PackageHandler::rollbackPending: Processing DMLPackage.";
1193 DMLProcessor::log(oss.str(), logging::LOG_TYPE_DEBUG);
1194 }
1195
added_a_pm(int)1196 void added_a_pm(int)
1197 {
1198 DistributedEngineComm* dec;
1199 ResourceManager* rm = ResourceManager::instance();
1200 dec = DistributedEngineComm::instance(rm);
1201 dec->Setup();
1202 // MCOL-140 clear the waiting queue as all transactions are probably going to fail
1203 PackageHandler::clearTableAccess();
1204 }
1205
DMLServer(int packageMaxThreads,int packageWorkQueueSize,DBRM * dbrm)1206 DMLServer::DMLServer(int packageMaxThreads, int packageWorkQueueSize, DBRM* dbrm) :
1207 fPackageMaxThreads(packageMaxThreads), fPackageWorkQueueSize(packageWorkQueueSize), fDbrm(dbrm), fShutdownFlag(false)
1208 {
1209 fMqServer.reset(new MessageQueueServer("DMLProc"));
1210
1211 fDmlPackagepool.setMaxThreads(fPackageMaxThreads);
1212 fDmlPackagepool.setQueueSize(fPackageWorkQueueSize);
1213 fDmlPackagepool.setName("DmlPackagepool");
1214 }
1215
start()1216 int DMLServer::start()
1217 {
1218 messageqcpp::IOSocket ios;
1219 uint32_t nextID = 1;
1220
1221 try
1222 {
1223 // CancellationThread is for telling all active transactions
1224 // to quit working because the system is either going down
1225 // or going into write suspend mode
1226 CancellationThread cancelObject(fDbrm, *this);
1227 boost::thread cancelThread(cancelObject);
1228
1229 cout << "DMLProc is ready..." << endl;
1230
1231 const static struct timespec timeout = {1, 100}; // roughly 1 second TO
1232 for (;;)
1233 {
1234 ios = fMqServer->accept(&timeout);
1235 // MCS polls in a loop watching for a pending shutdown
1236 // that is signalled via fShutdownFlag set in a
1237 // CancellationThread. CT sets the flag if a cluster state
1238 // has SS_SHUTDOWNPENDING value set.
1239 while (!ios.hasSocketDescriptor() && !pendingShutdown())
1240 ios = fMqServer->accept(&timeout);
1241
1242 if (pendingShutdown())
1243 break;
1244 ios.setSockID(nextID++);
1245 fDmlPackagepool.invoke(DMLProcessor(ios, fDbrm));
1246 }
1247
1248 cancelThread.join();
1249 return EXIT_SUCCESS;
1250 }
1251 catch (std::exception& ex)
1252 {
1253 cerr << ex.what() << endl;
1254 logging::LoggingID lid(21);
1255 Message::Args args;
1256 Message message(8);
1257 args.add("DMLProc init caught exception: ");
1258 args.add(ex.what());
1259 message.format(args);
1260 logging::Logger logger(lid.fSubsysID);
1261 logger.logMessage(logging::LOG_TYPE_CRITICAL, message, lid);
1262 return EXIT_FAILURE;
1263 }
1264 catch (...)
1265 {
1266 cerr << "Caught unknown exception!" << endl;
1267 logging::LoggingID lid(21);
1268 Message::Args args;
1269 Message message(8);
1270 args.add("DMLProc init caught unknown exception");
1271 message.format(args);
1272 logging::Logger logger(lid.fSubsysID);
1273 logger.logMessage(logging::LOG_TYPE_CRITICAL, message, lid);
1274 return EXIT_FAILURE;
1275 }
1276 }
1277
DMLProcessor(messageqcpp::IOSocket ios,BRM::DBRM * aDbrm)1278 DMLProcessor::DMLProcessor(messageqcpp::IOSocket ios, BRM::DBRM* aDbrm) :
1279 fIos(ios), fDbrm(aDbrm)
1280 {
1281 csc = CalpontSystemCatalog::makeCalpontSystemCatalog();
1282 csc->identity(CalpontSystemCatalog::EC);
1283 string teleServerHost(config::Config::makeConfig()->getConfig("QueryTele", "Host"));
1284
1285 if (!teleServerHost.empty())
1286 {
1287 int teleServerPort = config::Config::fromText(config::Config::makeConfig()->getConfig("QueryTele", "Port"));
1288
1289 if (teleServerPort > 0)
1290 {
1291 fQtc.serverParms(QueryTeleServerParms(teleServerHost, teleServerPort));
1292 }
1293 }
1294 }
1295
operator ()()1296 void DMLProcessor::operator()()
1297 {
1298 bool bIsDbrmUp = true;
1299
1300 try
1301 {
1302 boost::shared_ptr<messageqcpp::ByteStream> bs1 (new messageqcpp::ByteStream());
1303 //messageqcpp::ByteStream bs;
1304 uint8_t packageType;
1305
1306 ResourceManager* rm = ResourceManager::instance();
1307 DistributedEngineComm* fEC = DistributedEngineComm::instance(rm);
1308
1309 uint64_t maxDeleteRows = rm->getDMLMaxDeleteRows();
1310
1311 fConcurrentSupport = true;
1312 string concurrentTranStr = config::Config::makeConfig()->getConfig("SystemConfig", "ConcurrentTransactions");
1313
1314 if ( concurrentTranStr.length() != 0 )
1315 {
1316 if ((concurrentTranStr.compare("N") == 0) || (concurrentTranStr.compare("n") == 0))
1317 fConcurrentSupport = false;
1318 }
1319
1320 #ifndef _MSC_VER
1321 struct sigaction ign;
1322 memset(&ign, 0, sizeof(ign));
1323 ign.sa_handler = added_a_pm;
1324 sigaction(SIGHUP, &ign, 0);
1325 #endif
1326 fEC->Open();
1327
1328 for (;;)
1329 {
1330 //cout << "DMLProc is waiting for a Calpont DML Package on " << fIos.getSockID() << endl;
1331 try
1332 {
1333 bs1.reset(new messageqcpp::ByteStream(fIos.read()));
1334 //cout << "received from mysql socket " << fIos.getSockID() << endl;
1335 }
1336 catch (std::exception& ex)
1337 {
1338 //This is an I/O error from InetStreamSocket::read(), just close and move on...
1339 cout << "runtime error during read on " << fIos.getSockID() << " " << ex.what() << endl;
1340 bs1->reset();
1341 }
1342 catch (...)
1343 {
1344 cout << "... error during read " << fIos.getSockID() << endl;
1345 // all this throw does is cause this thread to silently go away. I doubt this is the right
1346 // thing to do...
1347 throw;
1348 }
1349
1350 if (!bs1 || bs1->length() == 0)
1351 {
1352 cout << "Read 0 bytes. Closing connection " << fIos.getSockID() << endl;
1353 fIos.close();
1354 break;
1355 }
1356
1357 uint32_t sessionID;
1358 *bs1 >> sessionID;
1359 *bs1 >> packageType;
1360 //cout << "DMLProc received pkg. sessionid:type = " << sessionID <<":"<<(int)packageType << endl;
1361 uint32_t stateFlags;
1362 messageqcpp::ByteStream::byte status = 255;
1363 messageqcpp::ByteStream::octbyte rowCount = 0;
1364
1365 if (fDbrm->getSystemState(stateFlags) > 0) // > 0 implies succesful retrieval. It doesn't imply anything about the contents
1366 {
1367 messageqcpp::ByteStream results;
1368 const char* responseMsg = 0;
1369 bool bReject = false;
1370
1371 // Check to see if we're in write suspended mode
1372 // If so, we can't process the request.
1373 if (stateFlags & SessionManagerServer::SS_SUSPENDED)
1374 {
1375 status = DMLPackageProcessor::NOT_ACCEPTING_PACKAGES;
1376 responseMsg = "Writing to the database is disabled.";
1377 bReject = true;
1378 }
1379
1380 // Check to see if we're in write suspend or shutdown pending mode
1381 if (packageType != dmlpackage::DML_COMMAND) // Not a commit or rollback
1382 {
1383 if (stateFlags & SessionManagerServer::SS_SUSPEND_PENDING
1384 || stateFlags & SessionManagerServer::SS_SHUTDOWN_PENDING)
1385 {
1386 if (stateFlags & SessionManagerServer::SS_SUSPEND_PENDING)
1387 {
1388 responseMsg = "Writing to the database is disabled.";
1389 }
1390 else
1391 {
1392 responseMsg = "The database is being shut down.";
1393 }
1394
1395 // Refuse all non active tranasactions
1396 // Check the rollback flag
1397 // -- Set: Rollback active transactions.
1398 // -- Not set: Allow active transactions.
1399 if (sessionManager.isTransactionActive(sessionID, bIsDbrmUp))
1400 {
1401 if (stateFlags & SessionManagerServer::SS_ROLLBACK)
1402 {
1403 status = DMLPackageProcessor::JOB_CANCELED;
1404 bReject = true;
1405 }
1406 }
1407 else
1408 {
1409 status = DMLPackageProcessor::NOT_ACCEPTING_PACKAGES;
1410 bReject = true;
1411 }
1412 }
1413
1414 if (bReject)
1415 {
1416 // For batch insert, we need to send a lastpkg message
1417 // to batchInsertProcessor so it can clean things up.
1418 if (packageType == dmlpackage::DML_INSERT)
1419 {
1420 // build an InsertDMLPackage from the bytestream
1421 // We need the flags from the package to know what
1422 // type of package we're dealing with before we can
1423 // take special action for the last package of a
1424 // batch insert.
1425 dmlpackage::InsertDMLPackage insertPkg;
1426 messageqcpp::ByteStream bsSave = *(bs1.get());
1427 insertPkg.read(*(bs1.get()));
1428 BatchInsertProc* batchInsertProcessor = NULL;
1429
1430 if (insertPkg.get_isBatchInsert() && insertPkg.get_Logending())
1431 {
1432 {
1433 boost::mutex::scoped_lock lk(DMLProcessor::batchinsertProcessorMapLock);
1434 std::map<uint32_t, BatchInsertProc*>::iterator batchIter = DMLProcessor::batchinsertProcessorMap.find(sessionID);
1435
1436 if (batchIter != DMLProcessor::batchinsertProcessorMap.end()) //The first batch, no need to do anything
1437 {
1438
1439 batchInsertProcessor = batchIter->second;
1440 batchInsertProcessor->addPkg(bsSave);
1441
1442 batchInsertProcessor->sendlastBatch();
1443 batchInsertProcessor->receiveAllMsg();
1444
1445
1446 if (!insertPkg.get_isAutocommitOn())
1447 {
1448 batchInsertProcessor->setHwm();
1449 }
1450
1451 batchIter = DMLProcessor::batchinsertProcessorMap.find(sessionID);
1452
1453 if (batchIter != DMLProcessor::batchinsertProcessorMap.end())
1454 {
1455 DMLProcessor::batchinsertProcessorMap.erase(sessionID);
1456 }
1457 }
1458 }
1459 }
1460 }
1461
1462 results << status;
1463 results << rowCount;
1464 logging::Message::Args args;
1465 logging::Message message(2);
1466 args.add(responseMsg);
1467 message.format( args );
1468 results << message.msg();
1469 fIos.write(results);
1470 continue;
1471 }
1472 }
1473 }
1474
1475 // This section is to check to see if the user hit CTRL+C while the
1476 // DML was processing If so, the sessionID will be found in
1477 // packageHandlerMap and we can set rollbackPending in the
1478 // associated packageHandler. Other than CTRL+C, we should never
1479 // find our own sessionID in the map.
1480 // This mechanism may prove useful for other things, so the above
1481 // comment may change.
1482 {
1483 boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock);
1484 DMLProcessor::PackageHandlerMap_t::iterator phIter = packageHandlerMap.find(sessionID);
1485
1486 if (phIter != packageHandlerMap.end())
1487 {
1488 if (packageType == dmlpackage::DML_COMMAND)
1489 {
1490 // MCOL-66 It's possible for a commit or rollback to get here if
1491 // the timing is just right. Don't destroy its data
1492 messageqcpp::ByteStream bsctrlc(bs1);
1493 dmlpackage::CommandDMLPackage commandPkg;
1494 commandPkg.read(bsctrlc);
1495 std::string stmt = commandPkg.get_DMLStatement();
1496 boost::algorithm::to_upper(stmt);
1497 trim(stmt);
1498
1499 if (stmt == "CTRL+C")
1500 {
1501 phIter->second->rollbackPending();
1502 fIos.close();
1503 break;
1504 }
1505 }
1506 else
1507 {
1508 // If there's a PackageHandler already working for this
1509 // sessionID, we have a problem. Reject this package
1510 messageqcpp::ByteStream results;
1511 ostringstream oss;
1512 oss << "Received a DML command for session " << sessionID <<
1513 " while still processing a command for the same sessionID";
1514 results << static_cast<messageqcpp::ByteStream::byte>(DMLPackageProcessor::DEAD_LOCK_ERROR);
1515 results << static_cast<messageqcpp::ByteStream::octbyte>(0); // rowcount
1516 logging::Message::Args args;
1517 logging::Message message(2);
1518 args.add(oss.str());
1519 message.format( args );
1520 logging::LoggingID lid(20);
1521 logging::MessageLog ml(lid);
1522 ml.logErrorMessage(message);
1523 results << message.msg();
1524 fIos.write(results);
1525 continue;
1526 }
1527 }
1528 }
1529
1530 //cout << " got a ";
1531 switch (packageType)
1532 {
1533 case dmlpackage::DML_INSERT:
1534 //cout << "DML_INSERT";
1535 break;
1536
1537 case dmlpackage::DML_UPDATE:
1538 //cout << "DML_UPDATE";
1539 break;
1540
1541 case dmlpackage::DML_DELETE:
1542 //cout << "DML_DELETE";
1543 break;
1544
1545 case dmlpackage::DML_COMMAND:
1546 //cout << "DML_COMMAND";
1547 break;
1548
1549 case dmlpackage::DML_INVALID_TYPE:
1550 //cout << "DML_INVALID_TYPE";
1551 break;
1552
1553 default:
1554 //cout << "UNKNOWN";
1555 break;
1556 }
1557
1558 //cout << " package" << endl;
1559
1560 BRM::TxnID txnid;
1561
1562 if (!fConcurrentSupport)
1563 {
1564 //Check if any other active transaction
1565 bool anyOtherActiveTransaction = true;
1566 BRM::SIDTIDEntry blockingsid;
1567
1568 //For logout commit trigger
1569 if ( packageType == dmlpackage::DML_COMMAND )
1570 {
1571 anyOtherActiveTransaction = false;
1572 }
1573
1574 int i = 0;
1575 int waitPeriod = 10;
1576 //@Bug 2487 Check transaction map every 1/10 second
1577
1578 int sleepTime = 100; // sleep 100 milliseconds between checks
1579 int numTries = 10; // try 10 times per second
1580
1581
1582 string waitPeriodStr = config::Config::makeConfig()->getConfig("SystemConfig", "WaitPeriod");
1583
1584 if ( waitPeriodStr.length() != 0 )
1585 waitPeriod = static_cast<int>(config::Config::fromText(waitPeriodStr));
1586
1587 numTries = waitPeriod * 10;
1588 struct timespec rm_ts;
1589
1590 rm_ts.tv_sec = sleepTime / 1000;
1591 rm_ts.tv_nsec = sleepTime % 1000 * 1000000;
1592
1593 //cout << "starting i = " << i << endl;
1594 //txnid = sessionManager.getTxnID(sessionID);
1595 while (anyOtherActiveTransaction)
1596 {
1597 anyOtherActiveTransaction = sessionManager.checkActiveTransaction( sessionID, bIsDbrmUp,
1598 blockingsid );
1599
1600 //cout << "session " << sessionID << " with package type " << (int)packageType << " got anyOtherActiveTransaction " << anyOtherActiveTransaction << endl;
1601 if (anyOtherActiveTransaction)
1602 {
1603 for ( ; i < numTries; i++ )
1604 {
1605 #ifdef _MSC_VER
1606 Sleep(rm_ts.tv_sec * 1000);
1607 #else
1608 struct timespec abs_ts;
1609
1610 //cout << "session " << sessionID << " nanosleep on package type " << (int)packageType << endl;
1611 do
1612 {
1613 abs_ts.tv_sec = rm_ts.tv_sec;
1614 abs_ts.tv_nsec = rm_ts.tv_nsec;
1615 }
1616 while (nanosleep(&abs_ts, &rm_ts) < 0);
1617
1618 #endif
1619 anyOtherActiveTransaction = sessionManager.checkActiveTransaction( sessionID, bIsDbrmUp,
1620 blockingsid );
1621
1622 if ( !anyOtherActiveTransaction )
1623 {
1624 txnid = sessionManager.getTxnID(sessionID);
1625
1626 //cout << "Ready to process type " << (int)packageType << " with txd " << txnid << endl;
1627 if ( !txnid.valid )
1628 {
1629 txnid = sessionManager.newTxnID(sessionID, true);
1630
1631 if (txnid.valid)
1632 {
1633 //cout << "Ready to process type " << (int)packageType << " for session "<< sessionID << " with new txnid " << txnid.id << endl;
1634 anyOtherActiveTransaction = false;
1635 break;
1636 }
1637 else
1638 {
1639 anyOtherActiveTransaction = true;
1640 }
1641 }
1642 else
1643 {
1644 anyOtherActiveTransaction = false;
1645 //cout << "already have transaction to process type " << (int)packageType << " for session "<< sessionID <<" with existing txnid " << txnid.id << endl;
1646 break;
1647 }
1648 }
1649 }
1650
1651 //cout << "ending i = " << i << endl;
1652 }
1653 else
1654 {
1655 //cout << "Ready to process type " << (int)packageType << endl;
1656 txnid = sessionManager.getTxnID(sessionID);
1657
1658 if ( !txnid.valid )
1659 {
1660 txnid = sessionManager.newTxnID(sessionID, true);
1661
1662 if (txnid.valid)
1663 {
1664 //cout << "later Ready to process type " << (int)packageType << " for session "<< sessionID << " with new txnid " << txnid.id << endl;
1665 anyOtherActiveTransaction = false;
1666 }
1667 else
1668 {
1669 anyOtherActiveTransaction = true;
1670 //cout << "Cannot get txnid for process type " << (int)packageType << " for session "<< sessionID << endl;
1671 }
1672 }
1673 else
1674 {
1675 anyOtherActiveTransaction = false;
1676 //cout << "already have transaction to process type " << (int)packageType << " for session "<< sessionID <<" with txnid " << txnid.id << endl;
1677 break;
1678 }
1679 }
1680
1681 if ((anyOtherActiveTransaction) && (i >= numTries))
1682 {
1683 //cout << " Erroring out on package type " << (int)packageType << " for session " << sessionID << endl;
1684 break;
1685 }
1686 }
1687
1688 if (anyOtherActiveTransaction && (i >= numTries))
1689 {
1690 //cout << " again Erroring out on package type " << (int)packageType << endl;
1691 messageqcpp::ByteStream results;
1692 //@Bug 2681 set error code for active transaction
1693 status = DMLPackageProcessor::ACTIVE_TRANSACTION_ERROR;
1694 rowCount = 0;
1695 results << status;
1696 results << rowCount;
1697 Message::Args args;
1698 args.add(static_cast<uint64_t>(blockingsid.sessionid));
1699 results << IDBErrorInfo::instance()->errorMsg(ERR_ACTIVE_TRANSACTION, args);
1700 //@Bug 3854 Log to debug.log
1701 LoggingID logid(20, 0, 0);
1702 logging::Message::Args args1;
1703 logging::Message msg(1);
1704 args1.add(IDBErrorInfo::instance()->errorMsg(ERR_ACTIVE_TRANSACTION, args));
1705 msg.format( args1 );
1706 logging::Logger logger(logid.fSubsysID);
1707 logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
1708
1709 fIos.write(results);
1710 }
1711 else
1712 {
1713 //cout << "starting processing package type " << (int) packageType << " for session " << sessionID << " with id " << txnid.id << endl;
1714 boost::shared_ptr<PackageHandler> php(new PackageHandler(fIos, bs1, packageType, fEC,
1715 fConcurrentSupport, maxDeleteRows, sessionID, txnid.id, fDbrm, fQtc, csc));
1716 // We put the packageHandler into a map so that if we receive a
1717 // message to affect the previous command, we can find it.
1718 boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock, boost::defer_lock);
1719
1720 lk2.lock();
1721 packageHandlerMap[sessionID] = php;
1722 lk2.unlock();
1723
1724 php->run(); // Operates in this thread.
1725
1726 // Move this to the end of PackageHandler so it is removed from the map before the response is sent
1727 // lk2.lock();
1728 // packageHandlerMap.erase(sessionID);
1729 // lk2.unlock();
1730 }
1731 }
1732 else
1733 {
1734 #if 0
1735
1736 if (packageType != dmlpackage::DML_COMMAND)
1737 {
1738 txnid = sessionManager.getTxnID(sessionID);
1739
1740 if ( !txnid.valid )
1741 {
1742 txnid = sessionManager.newTxnID(sessionID, true);
1743
1744 if (!txnid.valid)
1745 {
1746 throw std::runtime_error( std::string("Unable to start a transaction. Check critical log.") );
1747 }
1748 }
1749 }
1750 else
1751 {
1752 txnid = sessionManager.getTxnID(sessionID);
1753 }
1754
1755 #endif
1756 boost::shared_ptr<PackageHandler> php(new PackageHandler(fIos, bs1, packageType, fEC,
1757 fConcurrentSupport, maxDeleteRows, sessionID, 0, fDbrm, fQtc, csc));
1758 // We put the packageHandler into a map so that if we receive a
1759 // message to affect the previous command, we can find it.
1760 boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock, boost::defer_lock);
1761
1762 lk2.lock();
1763 packageHandlerMap[sessionID] = php;
1764 lk2.unlock();
1765
1766 php->run(); // Operates in this thread.
1767
1768 // Move this to the end of PackageHandler so it is removed from the map before the response is sent
1769 // lk2.lock();
1770 // packageHandlerMap.erase(sessionID);
1771 // lk2.unlock();
1772 }
1773 }
1774 }
1775 catch (std::exception& ex)
1776 {
1777 ostringstream oss;
1778 oss << "DMLProcessor failed on: " << ex.what();
1779 DMLProcessor::log(oss.str(), logging::LOG_TYPE_DEBUG);
1780 fIos.close();
1781 }
1782 catch (...)
1783 {
1784 ostringstream oss;
1785 oss << "DMLProcessor failed on: processing DMLPackage.";
1786 DMLProcessor::log(oss.str(), logging::LOG_TYPE_DEBUG);
1787 cerr << "Caught unknown exception! " << oss.str();
1788 fIos.close();
1789 }
1790 }
1791
processBulkRollback(BRM::TableLockInfo lockInfo,BRM::DBRM * dbrm,uint64_t uniqueId,OamCache::dbRootPMMap_t & dbRootPMMap,bool & lockReleased)1792 void RollbackTransactionProcessor::processBulkRollback (BRM::TableLockInfo lockInfo, BRM::DBRM* dbrm, uint64_t uniqueId,
1793 OamCache::dbRootPMMap_t& dbRootPMMap, bool& lockReleased)
1794 {
1795 // Take over ownership of stale lock.
1796 // Use "DMLProc" as process name, session id and transaction id -1 to distinguish from real DMLProc rollback
1797 int32_t sessionID = -1;
1798 int32_t txnid = -1;
1799 std::string processName("DMLProc");
1800 uint32_t processID = ::getpid();
1801 bool ownerChanged = true;
1802 lockReleased = true;
1803
1804 try
1805 {
1806 ownerChanged = dbrm->changeOwner(lockInfo.id, processName, processID, sessionID, txnid);
1807 }
1808 catch (std::exception&)
1809 {
1810 lockReleased = false;
1811 throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
1812 }
1813
1814 if (!ownerChanged)
1815 {
1816 lockReleased = false;
1817 throw std::runtime_error( std::string("Unable to grab lock; lock not found or still in use.") );
1818 }
1819
1820 //send to all PMs
1821 boost::shared_ptr<messageqcpp::ByteStream> bsIn;
1822 messageqcpp::ByteStream bsOut;
1823 string tableName("");
1824 fWEClient->addQueue(uniqueId);
1825 //find the PMs need to send the message to
1826 std::set<int> pmSet;
1827 int pmId;
1828
1829 for (uint32_t i = 0; i < lockInfo.dbrootList.size(); i++)
1830 {
1831 pmId = (*dbRootPMMap)[lockInfo.dbrootList[i]];
1832 pmSet.insert(pmId);
1833 }
1834
1835 if (lockInfo.state == BRM::LOADING)
1836 {
1837 bsOut << (messageqcpp::ByteStream::byte)WE_SVR_DML_BULKROLLBACK;
1838 bsOut << uniqueId;
1839 bsOut << lockInfo.id;
1840 bsOut << lockInfo.tableOID;
1841 bsOut << tableName;
1842 bsOut << processName;
1843 std::set<int>::const_iterator iter = pmSet.begin();
1844
1845 while (iter != pmSet.end())
1846 {
1847 fWEClient->write(bsOut, *iter);
1848 iter++;
1849 }
1850
1851 // Wait for "all" the responses, and accumulate any/all errors
1852 unsigned int pmMsgCnt = 0;
1853
1854 while (pmMsgCnt < pmSet.size())
1855 {
1856 std::string rollbackErrMsg;
1857 bsIn.reset(new messageqcpp::ByteStream());
1858 fWEClient->read(uniqueId, bsIn);
1859
1860 if (bsIn->length() == 0)
1861 {
1862 fWEClient->removeQueue(uniqueId);
1863 lockReleased = false;
1864 throw std::runtime_error("Network error, PM rollback; ");
1865 }
1866 else
1867 {
1868 messageqcpp::ByteStream::byte rc;
1869 uint16_t pmNum;
1870 *bsIn >> rc;
1871 *bsIn >> rollbackErrMsg;
1872 *bsIn >> pmNum;
1873
1874 if (rc != 0)
1875 {
1876 fWEClient->removeQueue(uniqueId);
1877 lockReleased = false;
1878 throw std::runtime_error(rollbackErrMsg);
1879 }
1880 }
1881
1882 pmMsgCnt++;
1883 } // end of while loop to process all responses to bulk rollback
1884
1885 // If no errors so far, then change state to CLEANUP state.
1886 // We ignore the return stateChange flag.
1887 dbrm->changeState( lockInfo.id, BRM::CLEANUP );
1888 } // end of (lockInfo.state == BRM::LOADING)
1889
1890 //delete meta data backup rollback files
1891 bsOut.reset();
1892
1893 bsOut << (messageqcpp::ByteStream::byte)WE_SVR_DML_BULKROLLBACK_CLEANUP;
1894 bsOut << uniqueId;
1895 bsOut << lockInfo.tableOID;
1896 std::set<int>::const_iterator iter = pmSet.begin();
1897
1898 while (iter != pmSet.end())
1899 {
1900 fWEClient->write(bsOut, *iter);
1901 iter++;
1902 }
1903
1904 // Wait for "all" the responses, and accumulate any/all errors
1905 unsigned int pmMsgCnt = 0;
1906 //@Bug 4517 Release tablelock when it is in CLEANUP state
1907 uint32_t rcCleanup = 0;
1908 std::string fileDeleteErrMsg;
1909
1910 while (pmMsgCnt < pmSet.size())
1911 {
1912 bsIn.reset(new messageqcpp::ByteStream());
1913 fWEClient->read(uniqueId, bsIn);
1914
1915 if (bsIn->length() == 0)
1916 {
1917 fWEClient->removeQueue(uniqueId);
1918 rcCleanup = 1;
1919 fileDeleteErrMsg = "Network error, PM clean up; ";
1920 }
1921 else
1922 {
1923 messageqcpp::ByteStream::byte rc;
1924 uint16_t pmNum;
1925 *bsIn >> rc;
1926 *bsIn >> fileDeleteErrMsg;
1927 *bsIn >> pmNum;
1928
1929 if ((rc != 0) && (rcCleanup == 0))
1930 {
1931 fWEClient->removeQueue(uniqueId);
1932 rcCleanup = rc;
1933 }
1934 }
1935
1936 pmMsgCnt++;
1937 } // end of while loop to process all responses to rollback cleanup
1938
1939 fWEClient->removeQueue(uniqueId);
1940 // We ignore return release flag from releaseTableLock().
1941 dbrm->releaseTableLock( lockInfo.id );
1942
1943 if (rcCleanup != 0)
1944 throw std::runtime_error(fileDeleteErrMsg);
1945 }
1946
log(const std::string & msg,logging::LOG_TYPE level)1947 void DMLProcessor::log(const std::string& msg, logging::LOG_TYPE level)
1948 {
1949 logging::Message::Args args;
1950 logging::Message message(2);
1951 args.add(msg);
1952 message.format(args);
1953 logging::LoggingID lid(20);
1954 logging::MessageLog ml(lid);
1955
1956 switch (level)
1957 {
1958 case LOG_TYPE_DEBUG:
1959 ml.logDebugMessage(message);
1960 break;
1961
1962 case LOG_TYPE_INFO:
1963 ml.logInfoMessage(message);
1964 break;
1965
1966 case LOG_TYPE_WARNING:
1967 ml.logWarningMessage(message);
1968 break;
1969
1970 case LOG_TYPE_ERROR:
1971 ml.logErrorMessage(message);
1972 break;
1973
1974 case LOG_TYPE_CRITICAL:
1975 ml.logCriticalMessage(message);
1976 break;
1977 }
1978 }
1979
1980 } //namespace dmlprocessor
1981 // vim:ts=4 sw=4:
1982
1983