1 /* Copyright (C) 2014 InfiniDB, Inc.
2 Copyright (C) 2016 MariaDB Corporation
3
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 as published by the Free Software Foundation; version 2 of
7 the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17 MA 02110-1301, USA. */
18
19 // $Id: updatepackageprocessor.cpp 9673 2013-07-09 15:59:49Z chao $
20
21 #include <iostream>
22 #include <fstream>
23 #include <ctype.h>
24 #include <string>
25 //#define NDEBUG
26 #include <cassert>
27 #include <map>
28 #include <boost/scoped_ptr.hpp>
29 using namespace std;
30 #include "updatepackageprocessor.h"
31 #include "writeengine.h"
32 #include "joblistfactory.h"
33 #include "messagelog.h"
34 #include "simplecolumn.h"
35 #include "sqllogger.h"
36 #include "stopwatch.h"
37 #include "dbrm.h"
38 #include "idberrorinfo.h"
39 #include "errorids.h"
40 #include "rowgroup.h"
41 #include "bytestream.h"
42 #include "calpontselectexecutionplan.h"
43 #include "autoincrementdata.h"
44 #include "columnresult.h"
45 #include "we_messages.h"
46 #include "tablelockdata.h"
47 #include "oamcache.h"
48
49 using namespace WriteEngine;
50 using namespace dmlpackage;
51 using namespace execplan;
52 using namespace logging;
53 using namespace dataconvert;
54 using namespace joblist;
55 using namespace rowgroup;
56 using namespace messageqcpp;
57 using namespace BRM;
58 using namespace oam;
59
60 //#define PROFILE 1
61 namespace dmlpackageprocessor
62 {
63
64
65 //StopWatch timer;
66 DMLPackageProcessor::DMLResult
processPackage(dmlpackage::CalpontDMLPackage & cpackage)67 UpdatePackageProcessor::processPackage(dmlpackage::CalpontDMLPackage& cpackage)
68 {
69 SUMMARY_INFO("UpdatePackageProcessor::processPackage");
70
71 std::string err;
72 DMLResult result;
73 result.result = NO_ERROR;
74 result.rowCount = 0;
75 BRM::TxnID txnid;
76 // set-up the transaction
77 txnid.id = cpackage.get_TxnID();
78 txnid.valid = true;
79 fSessionID = cpackage.get_SessionID();
80 VERBOSE_INFO("Processing Update DML Package...");
81 TablelockData* tablelockData = TablelockData::makeTablelockData(fSessionID);
82 uint64_t uniqueId = 0;
83
84 //Bug 5070. Added exception handling
85 try
86 {
87 uniqueId = fDbrm->getUnique64();
88 }
89 catch (std::exception& ex)
90 {
91 logging::Message::Args args;
92 logging::Message message(9);
93 args.add(ex.what());
94 message.format(args);
95 result.result = UPDATE_ERROR;
96 result.message = message;
97 fSessionManager.rolledback(txnid);
98 return result;
99 }
100 catch ( ... )
101 {
102 logging::Message::Args args;
103 logging::Message message(9);
104 args.add("Unknown error occured while getting unique number.");
105 message.format(args);
106 result.result = UPDATE_ERROR;
107 result.message = message;
108 fSessionManager.rolledback(txnid);
109 return result;
110 }
111
112 uint64_t tableLockId = 0;
113 boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(fSessionID);
114 CalpontSystemCatalog::TableName tableName;
115 // get the table object from the package
116 DMLTable* tablePtr = cpackage.get_Table();
117 tableName.schema = tablePtr->get_SchemaName();
118 tableName.table = tablePtr->get_TableName();
119 fWEClient->addQueue(uniqueId);
120 execplan::CalpontSystemCatalog::ROPair roPair;
121
122 //#ifdef PROFILE
123 // StopWatch timer;
124 //#endif
125 try
126 {
127 LoggingID logid( DMLLoggingId, fSessionID, txnid.id);
128 logging::Message::Args args1;
129 logging::Message msg(1);
130 args1.add("Start SQL statement: ");
131 ostringstream oss;
132 oss << cpackage.get_SQLStatement() << "|" << tablePtr->get_SchemaName() << "|";
133 args1.add(oss.str());
134 msg.format( args1 );
135 logging::Logger logger(logid.fSubsysID);
136 logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
137
138 VERBOSE_INFO("The table name is:");
139 VERBOSE_INFO(tablePtr->get_TableName());
140
141 if (0 != tablePtr)
142 {
143 // get the row(s) from the table
144 RowList rows = tablePtr->get_RowList();
145
146 if (rows.size() == 0)
147 {
148 SUMMARY_INFO("No row to update!");
149 fWEClient->removeQueue(uniqueId);
150 return result;
151 }
152
153 roPair = systemCatalogPtr->tableRID(tableName);
154 tableLockId = tablelockData->getTablelockId(roPair.objnum); //check whether this table is locked already for this session
155
156 if (tableLockId == 0)
157 {
158 //cout << "tablelock is not found in cache, getting from dbrm" << endl;
159 uint32_t processID = ::getpid();
160 int32_t txnId = txnid.id;
161 int32_t sessionId = fSessionID;
162 std::string processName("DMLProc");
163 int i = 0;
164 OamCache* oamcache = OamCache::makeOamCache();
165 std::vector<int> pmList = oamcache->getModuleIds();
166 std::vector<uint32_t> pms;
167
168 for (unsigned i = 0; i < pmList.size(); i++)
169 {
170 pms.push_back((uint32_t)pmList[i]);
171 }
172
173 try
174 {
175 tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnId, BRM::LOADING );
176 }
177 catch (std::exception&)
178 {
179 throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
180 }
181
182 if ( tableLockId == 0 )
183 {
184 int waitPeriod = 10;
185 int sleepTime = 100; // sleep 100 milliseconds between checks
186 int numTries = 10; // try 10 times per second
187 waitPeriod = Config::getWaitPeriod();
188 numTries = waitPeriod * 10;
189 struct timespec rm_ts;
190
191 rm_ts.tv_sec = sleepTime / 1000;
192 rm_ts.tv_nsec = sleepTime % 1000 * 1000000;
193
194 for (; i < numTries; i++)
195 {
196 #ifdef _MSC_VER
197 Sleep(rm_ts.tv_sec * 1000);
198 #else
199 struct timespec abs_ts;
200
201 do
202 {
203 abs_ts.tv_sec = rm_ts.tv_sec;
204 abs_ts.tv_nsec = rm_ts.tv_nsec;
205 }
206 while (nanosleep(&abs_ts, &rm_ts) < 0);
207
208 #endif
209
210 try
211 {
212 processID = ::getpid();
213 txnId = txnid.id;
214 sessionId = fSessionID;
215 processName = "DMLProc";
216 tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnId, BRM::LOADING );
217 }
218 catch (std::exception&)
219 {
220 throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
221 }
222
223 if (tableLockId > 0)
224 break;
225 }
226
227 if (i >= numTries) //error out
228 {
229 result.result = UPDATE_ERROR;
230 logging::Message::Args args;
231 string strOp("update");
232 args.add(strOp);
233 args.add(processName);
234 args.add((uint64_t)processID);
235 args.add(sessionId);
236 throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_TABLE_LOCKED, args));
237 }
238 }
239 }
240
241 //cout << " tablelock is obtained with id " << tableLockId << endl;
242 tablelockData->setTablelock(roPair.objnum, tableLockId);
243 //@Bug 4491 start AI sequence for autoincrement column
244 const CalpontSystemCatalog::RIDList ridList = systemCatalogPtr->columnRIDs(tableName);
245 CalpontSystemCatalog::RIDList::const_iterator rid_iterator = ridList.begin();
246 CalpontSystemCatalog::ColType colType;
247
248 while (rid_iterator != ridList.end())
249 {
250 // If user hit ctrl+c in the mysql console, this will be true.
251 if (fRollbackPending)
252 {
253 result.result = JOB_CANCELED;
254 break;
255 }
256
257 CalpontSystemCatalog::ROPair roPair = *rid_iterator;
258 colType = systemCatalogPtr->colType(roPair.objnum);
259
260 if (colType.autoincrement)
261 {
262 try
263 {
264 uint64_t nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
265 fDbrm->startAISequence(roPair.objnum, nextVal, colType.colWidth, colType.colDataType);
266 break; //Only one autoincrement column per table
267 }
268 catch (std::exception& ex)
269 {
270 result.result = UPDATE_ERROR;
271 throw std::runtime_error(ex.what());
272 }
273 }
274
275 ++rid_iterator;
276 }
277
278 uint64_t rowsProcessed = 0;
279
280 if (!fRollbackPending)
281 {
282 rowsProcessed = fixUpRows(cpackage, result, uniqueId, roPair.objnum);
283 }
284
285 //@Bug 4994 Cancelled job is not error
286 if (result.result == JOB_CANCELED)
287 throw std::runtime_error("Query execution was interrupted");
288
289 if ((result.result != 0) && (result.result != DMLPackageProcessor::IDBRANGE_WARNING))
290 throw std::runtime_error(result.message.msg());
291
292 result.rowCount = rowsProcessed;
293
294 // Log the update statement.
295 LoggingID logid( DMLLoggingId, fSessionID, txnid.id);
296 logging::Message::Args args1;
297 logging::Message msg(1);
298 args1.add("End SQL statement");
299 msg.format( args1 );
300 logging::Logger logger(logid.fSubsysID);
301 logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
302 logging::logDML(cpackage.get_SessionID(), txnid.id, cpackage.get_SQLStatement(), cpackage.get_SchemaName());
303 }
304 }
305 catch (std::exception& ex)
306 {
307 cerr << "UpdatePackageProcessor::processPackage:" << ex.what() << endl;
308
309 if (result.result == 0)
310 {
311 result.result = UPDATE_ERROR;
312 }
313
314 result.message = Message(ex.what());
315 result.rowCount = 0;
316 LoggingID logid( DMLLoggingId, fSessionID, txnid.id);
317 logging::Message::Args args1;
318 logging::Message msg(1);
319 args1.add("End SQL statement with error");
320 msg.format( args1 );
321 logging::Logger logger(logid.fSubsysID);
322 logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
323 }
324 catch (...)
325 {
326 cerr << "UpdatePackageProcessor::processPackage: caught unknown exception!" << endl;
327 logging::Message::Args args;
328 logging::Message message(7);
329 args.add("Update Failed: ");
330 args.add("encountered unkown exception");
331 args.add("");
332 args.add("");
333 message.format(args);
334
335 result.result = UPDATE_ERROR;
336 result.message = message;
337 result.rowCount = 0;
338 LoggingID logid( DMLLoggingId, fSessionID, txnid.id);
339 logging::Message::Args args1;
340 logging::Message msg(1);
341 args1.add("End SQL statement with error");
342 msg.format( args1 );
343 logging::Logger logger(logid.fSubsysID);
344 logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
345 }
346
347 // timer.finish();
348 //@Bug 1886,2870 Flush VM cache only once per statement. send to all PMs.
349 //WriteEngineWrapper writeEngine;
350 std::map<uint32_t, uint32_t> oids;
351 int rc = 0;
352
353 if (result.result == NO_ERROR || result.result == IDBRANGE_WARNING)
354 {
355 if ((rc = flushDataFiles(NO_ERROR, oids, uniqueId, txnid, roPair.objnum)) != NO_ERROR)
356 {
357 cerr << "UpdatePackageProcessor::processPackage: write data to disk failed" << endl;
358
359 if (!fRollbackPending)
360 {
361 logging::Message::Args args;
362 logging::Message message(7);
363 args.add("Update Failed: ");
364 args.add("error when writing data to disk");
365 args.add("");
366 args.add("");
367 message.format(args);
368
369 result.result = UPDATE_ERROR;
370 result.message = message;
371 result.rowCount = 0;
372 }
373
374 rc = endTransaction(uniqueId, txnid, false);
375 }
376 else
377 {
378 if (fRollbackPending)
379 rc = endTransaction(uniqueId, txnid, false);
380 else
381 rc = endTransaction(uniqueId, txnid, true);
382
383 if (( rc != NO_ERROR) && (!fRollbackPending))
384 {
385 logging::Message::Args args;
386 logging::Message message(7);
387 args.add("Update Failed: ");
388 args.add("error when cleaning up data files");
389 args.add("");
390 args.add("");
391 message.format(args);
392
393 result.result = UPDATE_ERROR;
394 result.message = message;
395 result.rowCount = 0;
396
397 }
398 }
399 }
400 else
401 {
402 //@Bug 4563. Always flush. error is already set
403 rc = flushDataFiles(result.result, oids, uniqueId, txnid, roPair.objnum);
404 rc = endTransaction(uniqueId, txnid, false);
405 }
406
407 //timer.finish();
408
409 /* if (result.result != IDBRANGE_WARNING)
410 flushDataFiles(result.result, oids, uniqueId, txnid);
411 else
412 flushDataFiles(0, oids, uniqueId, txnid);
413 */
414 if (fRollbackPending)
415 {
416 result.result = JOB_CANCELED;
417 logging::Message::Args args1;
418 args1.add("Query execution was interrupted");
419 result.message.format(args1);
420 }
421
422 fWEClient->removeQueue(uniqueId);
423 VERBOSE_INFO("Finished Processing Update DML Package");
424 return result;
425 }
426
fixUpRows(dmlpackage::CalpontDMLPackage & cpackage,DMLResult & result,const uint64_t uniqueId,const uint32_t tableOid)427 uint64_t UpdatePackageProcessor::fixUpRows(dmlpackage::CalpontDMLPackage& cpackage, DMLResult& result,
428 const uint64_t uniqueId, const uint32_t tableOid)
429 {
430 ByteStream msg, msgBk, emsgBs;
431 RGData rgData;
432 uint32_t qb = 4;
433 msg << qb;
434 boost::scoped_ptr<rowgroup::RowGroup> rowGroup;
435 uint64_t rowsProcessed = 0;
436 uint32_t dbroot = 1;
437 bool metaData = false;
438 oam::OamCache* oamCache = oam::OamCache::makeOamCache();
439 std::vector<int> fPMs = oamCache->getModuleIds();
440 std::map<unsigned, bool> pmState;
441 string emsg;
442 string emsgStr;
443 bool err = false;
444
445 //boost::scoped_ptr<messageqcpp::MessageQueueClient> fExeMgr;
446 //fExeMgr.reset( new messageqcpp::MessageQueueClient("ExeMgr1"));
447 try
448 {
449
450 for (unsigned i = 0; i < fPMs.size(); i++)
451 {
452 pmState[fPMs[i]] = true;
453 }
454
455 //timer.start("ExeMgr");
456 fExeMgr->write(msg);
457 fExeMgr->write(*(cpackage.get_ExecutionPlan()));
458 //cout << "sending to ExeMgr plan with length " << (cpackage.get_ExecutionPlan())->length() << endl;
459 msg.restart();
460 emsgBs.restart();
461 msg = fExeMgr->read(); //error handling
462
463 if (msg.length() == 4)
464 {
465 msg >> qb;
466
467 if (qb != 0)
468 err = true;
469 }
470 else
471 {
472 qb = 999;
473 err = true;
474 }
475
476 if (err)
477 {
478 logging::Message::Args args;
479 logging::Message message(2);
480 args.add("Update Failed: ExeMgr Error");
481 args.add((int)qb);
482 message.format(args);
483 result.result = UPDATE_ERROR;
484 result.message = message;
485 //timer.finish();
486 return rowsProcessed;
487 }
488
489 emsgBs = fExeMgr->read();
490
491 if (emsgBs.length() == 0)
492 {
493 logging::Message::Args args;
494 logging::Message message(2);
495 args.add("Update Failed: ");
496 args.add("Lost connection to ExeMgr");
497 message.format(args);
498 result.result = UPDATE_ERROR;
499 result.message = message;
500 //timer.finish();
501 return rowsProcessed;
502 }
503
504 emsgBs >> emsgStr;
505
506 while (true)
507 {
508 if (fRollbackPending)
509 {
510 break;
511 }
512
513 msg.restart();
514 msgBk.restart();
515 msg = fExeMgr->read();
516 msgBk = msg;
517
518 if ( msg.length() == 0 )
519 {
520 cerr << "UpdatePackageProcessor::processPackage::fixupRows" << endl;
521 logging::Message::Args args;
522 logging::Message message(2);
523 args.add("Update Failed: ");
524 args.add("Lost connection to ExeMgr");
525 message.format(args);
526 result.result = UPDATE_ERROR;
527 result.message = message;
528 //timer.finish();
529 //return rowsProcessed;
530 break;
531 }
532 else
533 {
534 if (rowGroup.get() == NULL)
535 {
536 //This is mete data, need to send all PMs.
537 metaData = true;
538 //cout << "sending meta data" << endl;
539 //timer.start("Meta");
540 err = processRowgroup(msgBk, result, uniqueId, cpackage, pmState, metaData, dbroot);
541 rowGroup.reset(new rowgroup::RowGroup());
542 rowGroup->deserialize(msg);
543 qb = 100;
544 msg.restart();
545 msg << qb;
546 fExeMgr->write(msg);
547 metaData = false;
548 //timer.stop("Meta");
549 continue;
550 }
551
552 rgData.deserialize(msg, true);
553 rowGroup->setData(&rgData);
554 //rowGroup->setData(const_cast<uint8_t*>(msg.buf()));
555 err = (rowGroup->getStatus() != 0);
556
557 if (err)
558 {
559 //msgBk.advance(rowGroup->getDataSize());
560 string errorMsg;
561 msg >> errorMsg;
562 logging::Message::Args args;
563 logging::Message message(2);
564 args.add("Update Failed: ");
565 args.add(errorMsg);
566 message.format(args);
567 result.result = UPDATE_ERROR;
568 result.message = message;
569 DMLResult tmpResult;
570 receiveAll( tmpResult, uniqueId, fPMs, pmState, tableOid);
571 /* qb = 100;
572 //@Bug 4358 get rid of broken pipe error.
573 msg.restart();
574 msg << qb;
575 fExeMgr->write(msg);
576 */ //timer.finish();
577 //return rowsProcessed;
578 //err = true;
579 break;
580 }
581
582 if (rowGroup->getRGData() == NULL)
583 {
584 msg.restart();
585 }
586
587 if (rowGroup->getRowCount() == 0) //done fetching
588 {
589 //timer.finish();
590 //need to receive all response
591 err = receiveAll( result, uniqueId, fPMs, pmState, tableOid);
592 //return rowsProcessed;
593 break;
594 }
595
596 if (rowGroup->getBaseRid() == (uint64_t) (-1))
597 {
598 continue; // @bug4247, not valid row ids, may from small side outer
599 }
600
601 dbroot = rowGroup->getDBRoot();
602 //cout << "dbroot in the rowgroup is " << dbroot << endl;
603 //timer.start("processRowgroup");
604 err = processRowgroup(msgBk, result, uniqueId, cpackage, pmState, metaData, dbroot);
605
606 //timer.stop("processRowgroup");
607 if (err)
608 {
609 //timer.finish();
610 LoggingID logid( DMLLoggingId, fSessionID, cpackage.get_TxnID());
611 logging::Message::Args args1;
612 logging::Message msg1(1);
613 args1.add("SQL statement erroring out, need to receive all messages from WES");
614 msg1.format( args1 );
615 logging::Logger logger(logid.fSubsysID);
616 logger.logMessage(LOG_TYPE_DEBUG, msg1, logid);
617 DMLResult tmpResult;
618 receiveAll( tmpResult, uniqueId, fPMs, pmState, tableOid);
619 logging::Message::Args args2;
620 logging::Message msg2(1);
621 args2.add("SQL statement erroring out, received all messages from WES");
622 msg2.format( args2 );
623 logger.logMessage(LOG_TYPE_DEBUG, msg2, logid);
624 //@Bug 4358 get rid of broken pipe error.
625 /* msg.restart();
626 msg << qb;
627 fExeMgr->write(msg);
628 return rowsProcessed;
629 */
630 //err = true;
631 break;
632 }
633
634 rowsProcessed += rowGroup->getRowCount();
635 }
636 }
637
638 if (fRollbackPending)
639 {
640 err = true;
641 // Response to user
642 cerr << "UpdatePackageProcessor::processPackage::fixupRows Rollback Pending" << endl;
643 //@Bug 4994 Cancelled job is not error
644 result.result = JOB_CANCELED;
645
646 // Log
647 LoggingID logid( DMLLoggingId, fSessionID, cpackage.get_TxnID());
648 logging::Message::Args args1;
649 logging::Message msg1(1);
650 args1.add("SQL statement canceled by user");
651 msg1.format( args1 );
652 logging::Logger logger(logid.fSubsysID);
653 logger.logMessage(LOG_TYPE_DEBUG, msg1, logid);
654
655 // Clean out the pipe;
656 DMLResult tmpResult;
657 receiveAll( tmpResult, uniqueId, fPMs, pmState, tableOid);
658 }
659
660 // get stats from ExeMgr
661 if (!err)
662 {
663 qb = 3;
664 msg.restart();
665 msg << qb;
666 fExeMgr->write(msg);
667 msg = fExeMgr->read();
668 msg >> result.queryStats;
669 msg >> result.extendedStats;
670 msg >> result.miniStats;
671 result.stats.unserialize(msg);
672 }
673
674 //@Bug 4358 get rid of broken pipe error by sending a dummy bs.
675 if (err)
676 {
677 msg.restart();
678 msg << qb;
679 fExeMgr->write(msg);
680 }
681
682 return rowsProcessed;
683 //stats.insert();
684 }
685 catch (runtime_error& ex)
686 {
687 cerr << "UpdatePackageProcessor::processPackage::fixupRows" << ex.what() << endl;
688 logging::Message::Args args;
689 logging::Message message(2);
690 args.add("Update Failed: ");
691 args.add(ex.what());
692 message.format(args);
693 result.result = UPDATE_ERROR;
694 result.message = message;
695 qb = 0;
696 msg.restart();
697 msg << qb;
698 fExeMgr->write(msg);
699 return rowsProcessed;
700 }
701 catch ( ... )
702 {
703 cerr << "UpdatePackageProcessor::processPackage::fixupRows" << endl;
704 logging::Message::Args args;
705 logging::Message message(2);
706 args.add("Update Failed: ");
707 args.add("Unknown error caught when communicating with ExeMgr");
708 message.format(args);
709 result.result = UPDATE_ERROR;
710 result.message = message;
711 qb = 0;
712 msg.restart();
713 msg << qb;
714 fExeMgr->write(msg);
715 return rowsProcessed;
716 }
717
718 //timer.finish();
719 return rowsProcessed;
720 }
721
processRowgroup(ByteStream & aRowGroup,DMLResult & result,const uint64_t uniqueId,dmlpackage::CalpontDMLPackage & cpackage,std::map<unsigned,bool> & pmState,bool isMeta,uint32_t dbroot)722 bool UpdatePackageProcessor::processRowgroup(ByteStream& aRowGroup, DMLResult& result, const uint64_t uniqueId,
723 dmlpackage::CalpontDMLPackage& cpackage, std::map<unsigned, bool>& pmState, bool isMeta, uint32_t dbroot)
724 {
725 bool rc = false;
726 //cout << "Get dbroot " << dbroot << endl;
727 uint32_t pmNum = (*fDbRootPMMap)[dbroot];
728
729 ByteStream bytestream;
730 bytestream << (uint8_t)WE_SVR_UPDATE;
731 bytestream << uniqueId;
732 bytestream << pmNum;
733 bytestream << (uint32_t)cpackage.get_TxnID();
734 bytestream += aRowGroup;
735 //cout << "sending rows to pm " << pmNum << " with msg length " << bytestream.length() << endl;
736 uint32_t msgRecived = 0;
737 boost::shared_ptr<messageqcpp::ByteStream> bsIn;
738 bsIn.reset(new ByteStream());
739 ByteStream::byte tmp8;
740 string errorMsg;
741 uint32_t tmp32;
742 uint64_t blocksChanged = 0;
743
744 if (isMeta) //send to all PMs
745 {
746 cpackage.write(bytestream);
747 fWEClient->write_to_all(bytestream);
748
749 while (1)
750 {
751 if (msgRecived == fWEClient->getPmCount())
752 break;
753
754 fWEClient->read(uniqueId, bsIn);
755
756 if ( bsIn->length() == 0 ) //read error
757 {
758 rc = true;
759 break;
760 }
761 else
762 {
763 *bsIn >> tmp8;
764
765 if (tmp8 > 0)
766 {
767 *bsIn >> errorMsg;
768 rc = true;
769 logging::Message::Args args;
770 logging::Message message(2);
771 args.add("Update Failed: ");
772 args.add(errorMsg);
773 message.format(args);
774 result.result = UPDATE_ERROR;
775 result.message = message;
776 break;
777 }
778 else
779 msgRecived++;
780 }
781 }
782
783 return rc;
784 }
785
786 if (pmState[pmNum])
787 {
788 try
789 {
790 //cout << "sending rows to pm " << pmNum << " with msg length " << bytestream.length() << endl;
791 fWEClient->write(bytestream, (uint32_t)pmNum);
792 pmState[pmNum] = false;
793 }
794 catch (runtime_error& ex) //write error
795 {
796 rc = true;
797 logging::Message::Args args;
798 logging::Message message(2);
799 args.add("Update Failed: ");
800 args.add(ex.what());
801 message.format(args);
802 result.result = UPDATE_ERROR;
803 result.message = message;
804 }
805 catch (...)
806 {
807 rc = true;
808 logging::Message::Args args;
809 logging::Message message(2);
810 args.add("Update Failed: ");
811 args.add("Unknown error caught when communicating with WES");
812 message.format(args);
813 result.result = UPDATE_ERROR;
814 result.message = message;
815 }
816 }
817 else
818 {
819 while (1)
820 {
821 bsIn.reset(new ByteStream());
822
823 try
824 {
825 fWEClient->read(uniqueId, bsIn);
826
827 if ( bsIn->length() == 0 ) //read error
828 {
829 rc = true;
830 errorMsg = "Lost connection to Write Engine Server while updating";
831 throw std::runtime_error(errorMsg);
832 }
833 else
834 {
835 *bsIn >> tmp8;
836 *bsIn >> errorMsg;
837
838 if (tmp8 == IDBRANGE_WARNING)
839 {
840 result.result = IDBRANGE_WARNING;
841 logging::Message::Args args;
842 logging::Message message(2);
843 args.add(errorMsg);
844 message.format(args);
845 result.message = message;
846 }
847 else if (tmp8 > 0)
848 {
849 result.stats.fErrorNo = tmp8;
850 rc = (tmp8 != 0);
851 }
852
853 *bsIn >> tmp32;
854 //cout << "Received response from pm " << tmp32 << endl;
855 pmState[tmp32] = true;
856 *bsIn >> blocksChanged;
857 result.stats.fBlocksChanged += blocksChanged;
858
859 if (rc != 0)
860 {
861 throw std::runtime_error(errorMsg);
862 }
863
864 if (tmp32 == (uint32_t)pmNum)
865 {
866 //cout << "sending rows to pm " << pmNum << " with msg length " << bytestream.length() << endl;
867 fWEClient->write(bytestream, (uint32_t)pmNum);
868 pmState[pmNum] = false;
869 break;
870 }
871 }
872 }
873 catch (runtime_error& ex) //write error
874 {
875 rc = true;
876 logging::Message::Args args;
877 logging::Message message(2);
878 args.add("Update Failed: ");
879 args.add(ex.what());
880 message.format(args);
881 result.result = UPDATE_ERROR;
882 result.message = message;
883 break;
884 }
885 catch (...)
886 {
887 rc = true;
888 logging::Message::Args args;
889 logging::Message message(2);
890 args.add("Update Failed: ");
891 args.add("Unknown error caught when communicating with WES");
892 message.format(args);
893 result.result = UPDATE_ERROR;
894 result.message = message;
895 break;
896 }
897 }
898 }
899
900 return rc;
901 }
902
receiveAll(DMLResult & result,const uint64_t uniqueId,std::vector<int> & fPMs,std::map<unsigned,bool> & pmState,const uint32_t tableOid)903 bool UpdatePackageProcessor::receiveAll(DMLResult& result, const uint64_t uniqueId, std::vector<int>& fPMs,
904 std::map<unsigned, bool>& pmState, const uint32_t tableOid)
905 {
906 //check how many message we need to receive
907 uint32_t messagesNotReceived = 0;
908 bool err = false;
909
910 for (unsigned i = 0; i < fPMs.size(); i++)
911 {
912 if (!pmState[fPMs[i]])
913 messagesNotReceived++;
914 }
915
916 boost::shared_ptr<messageqcpp::ByteStream> bsIn;
917 ByteStream::byte tmp8;
918 string errorMsg;
919 uint32_t msgReceived = 0;
920
921 if (messagesNotReceived > 0)
922 {
923 LoggingID logid( DMLLoggingId, fSessionID, fSessionID);
924
925 if ( messagesNotReceived > fWEClient->getPmCount())
926 {
927 logging::Message::Args args1;
928 logging::Message msg(1);
929 args1.add("Update outstanding messages exceed PM count , need to receive messages:PMcount = ");
930 ostringstream oss;
931 oss << messagesNotReceived << ":" << fWEClient->getPmCount();
932 args1.add(oss.str());
933 msg.format( args1 );
934 logging::Logger logger(logid.fSubsysID);
935 logger.logMessage(LOG_TYPE_ERROR, msg, logid);
936 err = true;
937 logging::Message::Args args;
938 logging::Message message(2);
939 args.add("Update Failed: ");
940 args.add("One of WriteEngineServer went away.");
941 message.format(args);
942 result.result = UPDATE_ERROR;
943 result.message = message;
944 return err;
945 }
946
947 bsIn.reset(new ByteStream());
948 ByteStream::quadbyte tmp32;
949 uint64_t blocksChanged = 0;
950
951 while (1)
952 {
953 if (msgReceived == messagesNotReceived)
954 break;
955
956 bsIn.reset(new ByteStream());
957
958 try
959 {
960 fWEClient->read(uniqueId, bsIn);
961
962 if ( bsIn->length() == 0 ) //read error
963 {
964 err = true;
965 errorMsg = "Lost connection to Write Engine Server while updating";
966 throw std::runtime_error(errorMsg);
967 }
968 else
969 {
970 *bsIn >> tmp8;
971 *bsIn >> errorMsg;
972
973 if (tmp8 == IDBRANGE_WARNING)
974 {
975 result.result = IDBRANGE_WARNING;
976 logging::Message::Args args;
977 logging::Message message(2);
978 args.add(errorMsg);
979 message.format(args);
980 result.message = message;
981 }
982 else
983 {
984 result.stats.fErrorNo = tmp8;
985 err = (tmp8 != 0);
986 }
987
988 *bsIn >> tmp32;
989 *bsIn >> blocksChanged;
990 //cout << "Received response from pm " << tmp32 << endl;
991 pmState[tmp32] = true;
992
993 if (err)
994 {
995 throw std::runtime_error(errorMsg);
996 }
997
998 msgReceived++;
999 result.stats.fBlocksChanged += blocksChanged;
1000 }
1001 }
1002 catch (runtime_error& ex) //write error
1003 {
1004 err = true;
1005 logging::Message::Args args;
1006 logging::Message message(2);
1007 args.add("Update Failed: ");
1008 args.add(ex.what());
1009 message.format(args);
1010 result.result = UPDATE_ERROR;
1011 result.message = message;
1012 break;
1013 }
1014 catch (...)
1015 {
1016 err = true;
1017 logging::Message::Args args;
1018 logging::Message message(2);
1019 args.add("Update Failed: ");
1020 args.add("Unknown error caught when communicating with WES");
1021 message.format(args);
1022 result.result = UPDATE_ERROR;
1023 result.message = message;
1024 break;
1025 }
1026 }
1027 }
1028
1029 return err;
1030 }
1031 } // namespace dmlpackageprocessor
1032