1 /* Copyright (C) 2014 InfiniDB, Inc.
2    Copyright (C) 2016 MariaDB Corporation
3 
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License
6    as published by the Free Software Foundation; version 2 of
7    the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17    MA 02110-1301, USA. */
18 
19 //   $Id: updatepackageprocessor.cpp 9673 2013-07-09 15:59:49Z chao $
20 
21 #include <iostream>
22 #include <fstream>
23 #include <ctype.h>
24 #include <string>
25 //#define NDEBUG
26 #include <cassert>
27 #include <map>
28 #include <boost/scoped_ptr.hpp>
29 using namespace std;
30 #include "updatepackageprocessor.h"
31 #include "writeengine.h"
32 #include "joblistfactory.h"
33 #include "messagelog.h"
34 #include "simplecolumn.h"
35 #include "sqllogger.h"
36 #include "stopwatch.h"
37 #include "dbrm.h"
38 #include "idberrorinfo.h"
39 #include "errorids.h"
40 #include "rowgroup.h"
41 #include "bytestream.h"
42 #include "calpontselectexecutionplan.h"
43 #include "autoincrementdata.h"
44 #include "columnresult.h"
45 #include "we_messages.h"
46 #include "tablelockdata.h"
47 #include "oamcache.h"
48 
49 using namespace WriteEngine;
50 using namespace dmlpackage;
51 using namespace execplan;
52 using namespace logging;
53 using namespace dataconvert;
54 using namespace joblist;
55 using namespace rowgroup;
56 using namespace messageqcpp;
57 using namespace BRM;
58 using namespace oam;
59 
60 //#define PROFILE 1
61 namespace dmlpackageprocessor
62 {
63 
64 
65 //StopWatch timer;
66 DMLPackageProcessor::DMLResult
processPackage(dmlpackage::CalpontDMLPackage & cpackage)67 UpdatePackageProcessor::processPackage(dmlpackage::CalpontDMLPackage& cpackage)
68 {
69     SUMMARY_INFO("UpdatePackageProcessor::processPackage");
70 
71     std::string err;
72     DMLResult result;
73     result.result = NO_ERROR;
74     result.rowCount = 0;
75     BRM::TxnID txnid;
76     // set-up the transaction
77     txnid.id  = cpackage.get_TxnID();
78     txnid.valid = true;
79     fSessionID = cpackage.get_SessionID();
80     VERBOSE_INFO("Processing Update DML Package...");
81     TablelockData* tablelockData = TablelockData::makeTablelockData(fSessionID);
82     uint64_t uniqueId = 0;
83 
84     //Bug 5070. Added exception handling
85     try
86     {
87         uniqueId = fDbrm->getUnique64();
88     }
89     catch (std::exception& ex)
90     {
91         logging::Message::Args args;
92         logging::Message message(9);
93         args.add(ex.what());
94         message.format(args);
95         result.result = UPDATE_ERROR;
96         result.message = message;
97         fSessionManager.rolledback(txnid);
98         return result;
99     }
100     catch ( ... )
101     {
102         logging::Message::Args args;
103         logging::Message message(9);
104         args.add("Unknown error occured while getting unique number.");
105         message.format(args);
106         result.result = UPDATE_ERROR;
107         result.message = message;
108         fSessionManager.rolledback(txnid);
109         return result;
110     }
111 
112     uint64_t tableLockId = 0;
113     boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(fSessionID);
114     CalpontSystemCatalog::TableName tableName;
115     // get the table object from the package
116     DMLTable* tablePtr =  cpackage.get_Table();
117     tableName.schema = tablePtr->get_SchemaName();
118     tableName.table = tablePtr->get_TableName();
119     fWEClient->addQueue(uniqueId);
120     execplan::CalpontSystemCatalog::ROPair roPair;
121 
122 //#ifdef PROFILE
123 //	StopWatch timer;
124 //#endif
125     try
126     {
127         LoggingID logid( DMLLoggingId, fSessionID, txnid.id);
128         logging::Message::Args args1;
129         logging::Message msg(1);
130         args1.add("Start SQL statement: ");
131         ostringstream oss;
132         oss << cpackage.get_SQLStatement() << "|" << tablePtr->get_SchemaName() << "|";
133         args1.add(oss.str());
134         msg.format( args1 );
135         logging::Logger logger(logid.fSubsysID);
136         logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
137 
138         VERBOSE_INFO("The table name is:");
139         VERBOSE_INFO(tablePtr->get_TableName());
140 
141         if (0 != tablePtr)
142         {
143             // get the row(s) from the table
144             RowList rows = tablePtr->get_RowList();
145 
146             if (rows.size() == 0)
147             {
148                 SUMMARY_INFO("No row to update!");
149                 fWEClient->removeQueue(uniqueId);
150                 return result;
151             }
152 
153             roPair = systemCatalogPtr->tableRID(tableName);
154             tableLockId = tablelockData->getTablelockId(roPair.objnum); //check whether this table is locked already for this session
155 
156             if (tableLockId == 0)
157             {
158                 //cout << "tablelock is not found in cache, getting from dbrm" << endl;
159                 uint32_t  processID = ::getpid();
160                 int32_t   txnId = txnid.id;
161                 int32_t sessionId = fSessionID;
162                 std::string  processName("DMLProc");
163                 int i = 0;
164                 OamCache* oamcache = OamCache::makeOamCache();
165                 std::vector<int> pmList = oamcache->getModuleIds();
166                 std::vector<uint32_t> pms;
167 
168                 for (unsigned i = 0; i < pmList.size(); i++)
169                 {
170                     pms.push_back((uint32_t)pmList[i]);
171                 }
172 
173                 try
174                 {
175                     tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnId, BRM::LOADING );
176                 }
177                 catch (std::exception&)
178                 {
179                     throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
180                 }
181 
182                 if ( tableLockId  == 0 )
183                 {
184                     int waitPeriod = 10;
185                     int sleepTime = 100; // sleep 100 milliseconds between checks
186                     int numTries = 10;  // try 10 times per second
187                     waitPeriod = Config::getWaitPeriod();
188                     numTries = 	waitPeriod * 10;
189                     struct timespec rm_ts;
190 
191                     rm_ts.tv_sec = sleepTime / 1000;
192                     rm_ts.tv_nsec = sleepTime % 1000 * 1000000;
193 
194                     for (; i < numTries; i++)
195                     {
196 #ifdef _MSC_VER
197                         Sleep(rm_ts.tv_sec * 1000);
198 #else
199                         struct timespec abs_ts;
200 
201                         do
202                         {
203                             abs_ts.tv_sec = rm_ts.tv_sec;
204                             abs_ts.tv_nsec = rm_ts.tv_nsec;
205                         }
206                         while (nanosleep(&abs_ts, &rm_ts) < 0);
207 
208 #endif
209 
210                         try
211                         {
212                             processID = ::getpid();
213                             txnId = txnid.id;
214                             sessionId = fSessionID;
215                             processName = "DMLProc";
216                             tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnId, BRM::LOADING );
217                         }
218                         catch (std::exception&)
219                         {
220                             throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
221                         }
222 
223                         if (tableLockId > 0)
224                             break;
225                     }
226 
227                     if (i >= numTries) //error out
228                     {
229                         result.result = UPDATE_ERROR;
230                         logging::Message::Args args;
231                         string strOp("update");
232                         args.add(strOp);
233                         args.add(processName);
234                         args.add((uint64_t)processID);
235                         args.add(sessionId);
236                         throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_TABLE_LOCKED, args));
237                     }
238                 }
239             }
240 
241             //cout << " tablelock is obtained with id " << tableLockId << endl;
242             tablelockData->setTablelock(roPair.objnum, tableLockId);
243             //@Bug 4491 start AI sequence for autoincrement column
244             const CalpontSystemCatalog::RIDList ridList = systemCatalogPtr->columnRIDs(tableName);
245             CalpontSystemCatalog::RIDList::const_iterator rid_iterator = ridList.begin();
246             CalpontSystemCatalog::ColType colType;
247 
248             while (rid_iterator != ridList.end())
249             {
250                 // If user hit ctrl+c in the mysql console, this will be true.
251                 if (fRollbackPending)
252                 {
253                     result.result = JOB_CANCELED;
254                     break;
255                 }
256 
257                 CalpontSystemCatalog::ROPair roPair = *rid_iterator;
258                 colType = systemCatalogPtr->colType(roPair.objnum);
259 
260                 if (colType.autoincrement)
261                 {
262                     try
263                     {
264                         uint64_t nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
265                         fDbrm->startAISequence(roPair.objnum, nextVal, colType.colWidth, colType.colDataType);
266                         break; //Only one autoincrement column per table
267                     }
268                     catch (std::exception& ex)
269                     {
270                         result.result = UPDATE_ERROR;
271                         throw std::runtime_error(ex.what());
272                     }
273                 }
274 
275                 ++rid_iterator;
276             }
277 
278             uint64_t  rowsProcessed = 0;
279 
280             if (!fRollbackPending)
281             {
282                 rowsProcessed = fixUpRows(cpackage, result, uniqueId, roPair.objnum);
283             }
284 
285             //@Bug 4994 Cancelled job is not error
286             if (result.result == JOB_CANCELED)
287                 throw std::runtime_error("Query execution was interrupted");
288 
289             if ((result.result != 0) && (result.result != DMLPackageProcessor::IDBRANGE_WARNING))
290                 throw std::runtime_error(result.message.msg());
291 
292             result.rowCount = rowsProcessed;
293 
294             // Log the update statement.
295             LoggingID logid( DMLLoggingId, fSessionID, txnid.id);
296             logging::Message::Args args1;
297             logging::Message msg(1);
298             args1.add("End SQL statement");
299             msg.format( args1 );
300             logging::Logger logger(logid.fSubsysID);
301             logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
302             logging::logDML(cpackage.get_SessionID(), txnid.id, cpackage.get_SQLStatement(), cpackage.get_SchemaName());
303         }
304     }
305     catch (std::exception& ex)
306     {
307         cerr << "UpdatePackageProcessor::processPackage:" << ex.what() << endl;
308 
309         if (result.result == 0)
310         {
311             result.result = UPDATE_ERROR;
312         }
313 
314         result.message = Message(ex.what());
315         result.rowCount = 0;
316         LoggingID logid( DMLLoggingId, fSessionID, txnid.id);
317         logging::Message::Args args1;
318         logging::Message msg(1);
319         args1.add("End SQL statement with error");
320         msg.format( args1 );
321         logging::Logger logger(logid.fSubsysID);
322         logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
323     }
324     catch (...)
325     {
326         cerr << "UpdatePackageProcessor::processPackage: caught unknown exception!" << endl;
327         logging::Message::Args args;
328         logging::Message message(7);
329         args.add("Update Failed: ");
330         args.add("encountered unkown exception");
331         args.add("");
332         args.add("");
333         message.format(args);
334 
335         result.result = UPDATE_ERROR;
336         result.message = message;
337         result.rowCount = 0;
338         LoggingID logid( DMLLoggingId, fSessionID, txnid.id);
339         logging::Message::Args args1;
340         logging::Message msg(1);
341         args1.add("End SQL statement with error");
342         msg.format( args1 );
343         logging::Logger logger(logid.fSubsysID);
344         logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
345     }
346 
347     // timer.finish();
348     //@Bug 1886,2870 Flush VM cache only once per statement. send to all PMs.
349     //WriteEngineWrapper writeEngine;
350     std::map<uint32_t, uint32_t> oids;
351     int rc = 0;
352 
353     if (result.result == NO_ERROR || result.result == IDBRANGE_WARNING)
354     {
355         if ((rc = flushDataFiles(NO_ERROR, oids, uniqueId, txnid, roPair.objnum)) != NO_ERROR)
356         {
357             cerr << "UpdatePackageProcessor::processPackage: write data to disk failed" << endl;
358 
359             if (!fRollbackPending)
360             {
361                 logging::Message::Args args;
362                 logging::Message message(7);
363                 args.add("Update Failed: ");
364                 args.add("error when writing data to disk");
365                 args.add("");
366                 args.add("");
367                 message.format(args);
368 
369                 result.result = UPDATE_ERROR;
370                 result.message = message;
371                 result.rowCount = 0;
372             }
373 
374             rc = endTransaction(uniqueId, txnid, false);
375         }
376         else
377         {
378             if (fRollbackPending)
379                 rc = endTransaction(uniqueId, txnid, false);
380             else
381                 rc = endTransaction(uniqueId, txnid, true);
382 
383             if (( rc != NO_ERROR) && (!fRollbackPending))
384             {
385                 logging::Message::Args args;
386                 logging::Message message(7);
387                 args.add("Update Failed: ");
388                 args.add("error when cleaning up data files");
389                 args.add("");
390                 args.add("");
391                 message.format(args);
392 
393                 result.result = UPDATE_ERROR;
394                 result.message = message;
395                 result.rowCount = 0;
396 
397             }
398         }
399     }
400     else
401     {
402         //@Bug 4563. Always flush. error is already set
403         rc = flushDataFiles(result.result, oids, uniqueId, txnid, roPair.objnum);
404         rc = endTransaction(uniqueId, txnid, false);
405     }
406 
407     //timer.finish();
408 
409     /*	if (result.result != IDBRANGE_WARNING)
410     		flushDataFiles(result.result, oids, uniqueId, txnid);
411     	else
412     		flushDataFiles(0, oids, uniqueId, txnid);
413     */
414     if (fRollbackPending)
415     {
416         result.result = JOB_CANCELED;
417         logging::Message::Args args1;
418         args1.add("Query execution was interrupted");
419         result.message.format(args1);
420     }
421 
422     fWEClient->removeQueue(uniqueId);
423     VERBOSE_INFO("Finished Processing Update DML Package");
424     return result;
425 }
426 
fixUpRows(dmlpackage::CalpontDMLPackage & cpackage,DMLResult & result,const uint64_t uniqueId,const uint32_t tableOid)427 uint64_t UpdatePackageProcessor::fixUpRows(dmlpackage::CalpontDMLPackage& cpackage, DMLResult& result,
428         const uint64_t uniqueId, const uint32_t tableOid)
429 {
430     ByteStream msg, msgBk, emsgBs;
431     RGData rgData;
432     uint32_t qb = 4;
433     msg << qb;
434     boost::scoped_ptr<rowgroup::RowGroup> rowGroup;
435     uint64_t rowsProcessed = 0;
436     uint32_t dbroot = 1;
437     bool metaData = false;
438     oam::OamCache* oamCache = oam::OamCache::makeOamCache();
439     std::vector<int> fPMs = oamCache->getModuleIds();
440     std::map<unsigned, bool> pmState;
441     string emsg;
442     string emsgStr;
443     bool err = false;
444 
445     //boost::scoped_ptr<messageqcpp::MessageQueueClient> fExeMgr;
446     //fExeMgr.reset( new messageqcpp::MessageQueueClient("ExeMgr1"));
447     try
448     {
449 
450         for (unsigned i = 0; i < fPMs.size(); i++)
451         {
452             pmState[fPMs[i]] = true;
453         }
454 
455         //timer.start("ExeMgr");
456         fExeMgr->write(msg);
457         fExeMgr->write(*(cpackage.get_ExecutionPlan()));
458         //cout << "sending to ExeMgr plan with length " << (cpackage.get_ExecutionPlan())->length() << endl;
459         msg.restart();
460         emsgBs.restart();
461         msg = fExeMgr->read(); //error handling
462 
463         if (msg.length() == 4)
464         {
465             msg >> qb;
466 
467             if (qb != 0)
468                 err = true;
469         }
470         else
471         {
472             qb = 999;
473             err = true;
474         }
475 
476         if (err)
477         {
478             logging::Message::Args args;
479             logging::Message message(2);
480             args.add("Update Failed: ExeMgr Error");
481             args.add((int)qb);
482             message.format(args);
483             result.result = UPDATE_ERROR;
484             result.message = message;
485             //timer.finish();
486             return rowsProcessed;
487         }
488 
489         emsgBs = fExeMgr->read();
490 
491         if (emsgBs.length() == 0)
492         {
493             logging::Message::Args args;
494             logging::Message message(2);
495             args.add("Update Failed: ");
496             args.add("Lost connection to ExeMgr");
497             message.format(args);
498             result.result = UPDATE_ERROR;
499             result.message = message;
500             //timer.finish();
501             return rowsProcessed;
502         }
503 
504         emsgBs >> emsgStr;
505 
506         while (true)
507         {
508             if (fRollbackPending)
509             {
510                 break;
511             }
512 
513             msg.restart();
514             msgBk.restart();
515             msg = fExeMgr->read();
516             msgBk = msg;
517 
518             if ( msg.length() == 0 )
519             {
520                 cerr << "UpdatePackageProcessor::processPackage::fixupRows" << endl;
521                 logging::Message::Args args;
522                 logging::Message message(2);
523                 args.add("Update Failed: ");
524                 args.add("Lost connection to ExeMgr");
525                 message.format(args);
526                 result.result = UPDATE_ERROR;
527                 result.message = message;
528                 //timer.finish();
529                 //return rowsProcessed;
530                 break;
531             }
532             else
533             {
534                 if (rowGroup.get() == NULL)
535                 {
536                     //This is mete data, need to send all PMs.
537                     metaData = true;
538                     //cout << "sending meta data" << endl;
539                     //timer.start("Meta");
540                     err = processRowgroup(msgBk, result, uniqueId, cpackage, pmState, metaData, dbroot);
541                     rowGroup.reset(new rowgroup::RowGroup());
542                     rowGroup->deserialize(msg);
543                     qb = 100;
544                     msg.restart();
545                     msg << qb;
546                     fExeMgr->write(msg);
547                     metaData = false;
548                     //timer.stop("Meta");
549                     continue;
550                 }
551 
552                 rgData.deserialize(msg, true);
553                 rowGroup->setData(&rgData);
554                 //rowGroup->setData(const_cast<uint8_t*>(msg.buf()));
555                 err = (rowGroup->getStatus() != 0);
556 
557                 if (err)
558                 {
559                     //msgBk.advance(rowGroup->getDataSize());
560                     string errorMsg;
561                     msg >> errorMsg;
562                     logging::Message::Args args;
563                     logging::Message message(2);
564                     args.add("Update Failed: ");
565                     args.add(errorMsg);
566                     message.format(args);
567                     result.result = UPDATE_ERROR;
568                     result.message = message;
569                     DMLResult tmpResult;
570                     receiveAll( tmpResult, uniqueId, fPMs, pmState, tableOid);
571                     /*					qb = 100;
572                     					//@Bug 4358 get rid of broken pipe error.
573                     					msg.restart();
574                     					msg << qb;
575                     					fExeMgr->write(msg);
576                     */					//timer.finish();
577                     //return rowsProcessed;
578                     //err = true;
579                     break;
580                 }
581 
582                 if (rowGroup->getRGData() == NULL)
583                 {
584                     msg.restart();
585                 }
586 
587                 if (rowGroup->getRowCount() == 0)  //done fetching
588                 {
589                     //timer.finish();
590                     //need to receive all response
591                     err = receiveAll( result, uniqueId, fPMs, pmState, tableOid);
592                     //return rowsProcessed;
593                     break;
594                 }
595 
596                 if (rowGroup->getBaseRid() == (uint64_t) (-1))
597                 {
598                     continue;  // @bug4247, not valid row ids, may from small side outer
599                 }
600 
601                 dbroot = rowGroup->getDBRoot();
602                 //cout << "dbroot in the rowgroup is " << dbroot << endl;
603                 //timer.start("processRowgroup");
604                 err = processRowgroup(msgBk, result, uniqueId, cpackage, pmState, metaData, dbroot);
605 
606                 //timer.stop("processRowgroup");
607                 if (err)
608                 {
609                     //timer.finish();
610                     LoggingID logid( DMLLoggingId, fSessionID, cpackage.get_TxnID());
611                     logging::Message::Args args1;
612                     logging::Message msg1(1);
613                     args1.add("SQL statement erroring out, need to receive all messages from WES");
614                     msg1.format( args1 );
615                     logging::Logger logger(logid.fSubsysID);
616                     logger.logMessage(LOG_TYPE_DEBUG, msg1, logid);
617                     DMLResult tmpResult;
618                     receiveAll( tmpResult, uniqueId, fPMs, pmState, tableOid);
619                     logging::Message::Args args2;
620                     logging::Message msg2(1);
621                     args2.add("SQL statement erroring out, received all messages from WES");
622                     msg2.format( args2 );
623                     logger.logMessage(LOG_TYPE_DEBUG, msg2, logid);
624                     //@Bug 4358 get rid of broken pipe error.
625                     /*					msg.restart();
626                     					msg << qb;
627                     					fExeMgr->write(msg);
628                     					return rowsProcessed;
629                     */
630                     //err = true;
631                     break;
632                 }
633 
634                 rowsProcessed += rowGroup->getRowCount();
635             }
636         }
637 
638         if (fRollbackPending)
639         {
640             err = true;
641             // Response to user
642             cerr << "UpdatePackageProcessor::processPackage::fixupRows Rollback Pending" << endl;
643             //@Bug 4994 Cancelled job is not error
644             result.result = JOB_CANCELED;
645 
646             // Log
647             LoggingID logid( DMLLoggingId, fSessionID, cpackage.get_TxnID());
648             logging::Message::Args args1;
649             logging::Message msg1(1);
650             args1.add("SQL statement canceled by user");
651             msg1.format( args1 );
652             logging::Logger logger(logid.fSubsysID);
653             logger.logMessage(LOG_TYPE_DEBUG, msg1, logid);
654 
655             // Clean out the pipe;
656             DMLResult tmpResult;
657             receiveAll( tmpResult, uniqueId, fPMs, pmState, tableOid);
658         }
659 
660         // get stats from ExeMgr
661         if (!err)
662         {
663             qb = 3;
664             msg.restart();
665             msg << qb;
666             fExeMgr->write(msg);
667             msg = fExeMgr->read();
668             msg >> result.queryStats;
669             msg >> result.extendedStats;
670             msg >> result.miniStats;
671             result.stats.unserialize(msg);
672         }
673 
674         //@Bug 4358 get rid of broken pipe error by sending a dummy bs.
675         if (err)
676         {
677             msg.restart();
678             msg << qb;
679             fExeMgr->write(msg);
680         }
681 
682         return rowsProcessed;
683         //stats.insert();
684     }
685     catch (runtime_error& ex)
686     {
687         cerr << "UpdatePackageProcessor::processPackage::fixupRows" << ex.what() << endl;
688         logging::Message::Args args;
689         logging::Message message(2);
690         args.add("Update Failed: ");
691         args.add(ex.what());
692         message.format(args);
693         result.result = UPDATE_ERROR;
694         result.message = message;
695         qb = 0;
696         msg.restart();
697         msg << qb;
698         fExeMgr->write(msg);
699         return rowsProcessed;
700     }
701     catch ( ... )
702     {
703         cerr << "UpdatePackageProcessor::processPackage::fixupRows" << endl;
704         logging::Message::Args args;
705         logging::Message message(2);
706         args.add("Update Failed: ");
707         args.add("Unknown error caught when communicating with ExeMgr");
708         message.format(args);
709         result.result = UPDATE_ERROR;
710         result.message = message;
711         qb = 0;
712         msg.restart();
713         msg << qb;
714         fExeMgr->write(msg);
715         return rowsProcessed;
716     }
717 
718     //timer.finish();
719     return rowsProcessed;
720 }
721 
processRowgroup(ByteStream & aRowGroup,DMLResult & result,const uint64_t uniqueId,dmlpackage::CalpontDMLPackage & cpackage,std::map<unsigned,bool> & pmState,bool isMeta,uint32_t dbroot)722 bool UpdatePackageProcessor::processRowgroup(ByteStream& aRowGroup, DMLResult& result, const uint64_t uniqueId,
723         dmlpackage::CalpontDMLPackage& cpackage, std::map<unsigned, bool>& pmState,  bool isMeta, uint32_t dbroot)
724 {
725     bool rc = false;
726     //cout << "Get dbroot " << dbroot << endl;
727     uint32_t pmNum = (*fDbRootPMMap)[dbroot];
728 
729     ByteStream bytestream;
730     bytestream << (uint8_t)WE_SVR_UPDATE;
731     bytestream << uniqueId;
732     bytestream << pmNum;
733     bytestream << (uint32_t)cpackage.get_TxnID();
734     bytestream += aRowGroup;
735     //cout << "sending rows to pm " << pmNum << " with msg length " << bytestream.length() << endl;
736     uint32_t msgRecived = 0;
737     boost::shared_ptr<messageqcpp::ByteStream> bsIn;
738     bsIn.reset(new ByteStream());
739     ByteStream::byte tmp8;
740     string errorMsg;
741     uint32_t tmp32;
742     uint64_t blocksChanged = 0;
743 
744     if (isMeta) //send to all PMs
745     {
746         cpackage.write(bytestream);
747         fWEClient->write_to_all(bytestream);
748 
749         while (1)
750         {
751             if (msgRecived == fWEClient->getPmCount())
752                 break;
753 
754             fWEClient->read(uniqueId, bsIn);
755 
756             if ( bsIn->length() == 0 ) //read error
757             {
758                 rc = true;
759                 break;
760             }
761             else
762             {
763                 *bsIn >> tmp8;
764 
765                 if (tmp8 > 0)
766                 {
767                     *bsIn >> errorMsg;
768                     rc = true;
769                     logging::Message::Args args;
770                     logging::Message message(2);
771                     args.add("Update Failed: ");
772                     args.add(errorMsg);
773                     message.format(args);
774                     result.result = UPDATE_ERROR;
775                     result.message = message;
776                     break;
777                 }
778                 else
779                     msgRecived++;
780             }
781         }
782 
783         return rc;
784     }
785 
786     if (pmState[pmNum])
787     {
788         try
789         {
790             //cout << "sending rows to pm " << pmNum << " with msg length " << bytestream.length() << endl;
791             fWEClient->write(bytestream, (uint32_t)pmNum);
792             pmState[pmNum] = false;
793         }
794         catch (runtime_error& ex) //write error
795         {
796             rc = true;
797             logging::Message::Args args;
798             logging::Message message(2);
799             args.add("Update Failed: ");
800             args.add(ex.what());
801             message.format(args);
802             result.result = UPDATE_ERROR;
803             result.message = message;
804         }
805         catch (...)
806         {
807             rc = true;
808             logging::Message::Args args;
809             logging::Message message(2);
810             args.add("Update Failed: ");
811             args.add("Unknown error caught when communicating with WES");
812             message.format(args);
813             result.result = UPDATE_ERROR;
814             result.message = message;
815         }
816     }
817     else
818     {
819         while (1)
820         {
821             bsIn.reset(new ByteStream());
822 
823             try
824             {
825                 fWEClient->read(uniqueId, bsIn);
826 
827                 if ( bsIn->length() == 0 ) //read error
828                 {
829                     rc = true;
830                     errorMsg = "Lost connection to Write Engine Server while updating";
831                     throw std::runtime_error(errorMsg);
832                 }
833                 else
834                 {
835                     *bsIn >> tmp8;
836                     *bsIn >> errorMsg;
837 
838                     if (tmp8 == IDBRANGE_WARNING)
839                     {
840                         result.result = IDBRANGE_WARNING;
841                         logging::Message::Args args;
842                         logging::Message message(2);
843                         args.add(errorMsg);
844                         message.format(args);
845                         result.message = message;
846                     }
847                     else if (tmp8 > 0)
848                     {
849                         result.stats.fErrorNo = tmp8;
850                         rc = (tmp8 != 0);
851                     }
852 
853                     *bsIn >> tmp32;
854                     //cout << "Received response from pm " << tmp32 << endl;
855                     pmState[tmp32] = true;
856                     *bsIn >> blocksChanged;
857                     result.stats.fBlocksChanged += blocksChanged;
858 
859                     if (rc != 0)
860                     {
861                         throw std::runtime_error(errorMsg);
862                     }
863 
864                     if (tmp32 == (uint32_t)pmNum)
865                     {
866                         //cout << "sending rows to pm " << pmNum << " with msg length " << bytestream.length() << endl;
867                         fWEClient->write(bytestream, (uint32_t)pmNum);
868                         pmState[pmNum] = false;
869                         break;
870                     }
871                 }
872             }
873             catch (runtime_error& ex) //write error
874             {
875                 rc = true;
876                 logging::Message::Args args;
877                 logging::Message message(2);
878                 args.add("Update Failed: ");
879                 args.add(ex.what());
880                 message.format(args);
881                 result.result = UPDATE_ERROR;
882                 result.message = message;
883                 break;
884             }
885             catch (...)
886             {
887                 rc = true;
888                 logging::Message::Args args;
889                 logging::Message message(2);
890                 args.add("Update Failed: ");
891                 args.add("Unknown error caught when communicating with WES");
892                 message.format(args);
893                 result.result = UPDATE_ERROR;
894                 result.message = message;
895                 break;
896             }
897         }
898     }
899 
900     return rc;
901 }
902 
receiveAll(DMLResult & result,const uint64_t uniqueId,std::vector<int> & fPMs,std::map<unsigned,bool> & pmState,const uint32_t tableOid)903 bool UpdatePackageProcessor::receiveAll(DMLResult& result, const uint64_t uniqueId, std::vector<int>& fPMs,
904                                         std::map<unsigned, bool>& pmState, const uint32_t tableOid)
905 {
906     //check how many message we need to receive
907     uint32_t messagesNotReceived = 0;
908     bool err = false;
909 
910     for (unsigned i = 0; i < fPMs.size(); i++)
911     {
912         if (!pmState[fPMs[i]])
913             messagesNotReceived++;
914     }
915 
916     boost::shared_ptr<messageqcpp::ByteStream> bsIn;
917     ByteStream::byte tmp8;
918     string errorMsg;
919     uint32_t msgReceived = 0;
920 
921     if (messagesNotReceived > 0)
922     {
923         LoggingID logid( DMLLoggingId, fSessionID, fSessionID);
924 
925         if ( messagesNotReceived > fWEClient->getPmCount())
926         {
927             logging::Message::Args args1;
928             logging::Message msg(1);
929             args1.add("Update outstanding messages exceed PM count , need to receive messages:PMcount = ");
930             ostringstream oss;
931             oss << messagesNotReceived << ":" << fWEClient->getPmCount();
932             args1.add(oss.str());
933             msg.format( args1 );
934             logging::Logger logger(logid.fSubsysID);
935             logger.logMessage(LOG_TYPE_ERROR, msg, logid);
936             err = true;
937             logging::Message::Args args;
938             logging::Message message(2);
939             args.add("Update Failed: ");
940             args.add("One of WriteEngineServer went away.");
941             message.format(args);
942             result.result = UPDATE_ERROR;
943             result.message = message;
944             return err;
945         }
946 
947         bsIn.reset(new ByteStream());
948         ByteStream::quadbyte tmp32;
949         uint64_t blocksChanged = 0;
950 
951         while (1)
952         {
953             if (msgReceived == messagesNotReceived)
954                 break;
955 
956             bsIn.reset(new ByteStream());
957 
958             try
959             {
960                 fWEClient->read(uniqueId, bsIn);
961 
962                 if ( bsIn->length() == 0 ) //read error
963                 {
964                     err = true;
965                     errorMsg = "Lost connection to Write Engine Server while updating";
966                     throw std::runtime_error(errorMsg);
967                 }
968                 else
969                 {
970                     *bsIn >> tmp8;
971                     *bsIn >> errorMsg;
972 
973                     if (tmp8 == IDBRANGE_WARNING)
974                     {
975                         result.result = IDBRANGE_WARNING;
976                         logging::Message::Args args;
977                         logging::Message message(2);
978                         args.add(errorMsg);
979                         message.format(args);
980                         result.message = message;
981                     }
982                     else
983                     {
984                         result.stats.fErrorNo = tmp8;
985                         err = (tmp8 != 0);
986                     }
987 
988                     *bsIn >> tmp32;
989                     *bsIn >> blocksChanged;
990                     //cout << "Received response from pm " << tmp32 << endl;
991                     pmState[tmp32] = true;
992 
993                     if (err)
994                     {
995                         throw std::runtime_error(errorMsg);
996                     }
997 
998                     msgReceived++;
999                     result.stats.fBlocksChanged += blocksChanged;
1000                 }
1001             }
1002             catch (runtime_error& ex) //write error
1003             {
1004                 err = true;
1005                 logging::Message::Args args;
1006                 logging::Message message(2);
1007                 args.add("Update Failed: ");
1008                 args.add(ex.what());
1009                 message.format(args);
1010                 result.result = UPDATE_ERROR;
1011                 result.message = message;
1012                 break;
1013             }
1014             catch (...)
1015             {
1016                 err = true;
1017                 logging::Message::Args args;
1018                 logging::Message message(2);
1019                 args.add("Update Failed: ");
1020                 args.add("Unknown error caught when communicating with WES");
1021                 message.format(args);
1022                 result.result = UPDATE_ERROR;
1023                 result.message = message;
1024                 break;
1025             }
1026         }
1027     }
1028 
1029     return err;
1030 }
1031 } // namespace dmlpackageprocessor
1032