1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 /*******************************************************************************
19 * $Id: we_readthread.cpp 4609 2013-04-19 15:32:02Z chao $
20 *
21 *******************************************************************************/
22 
23 #include <unistd.h>
24 
25 #include "messagequeue.h"
26 #include "bytestream.h"
27 using namespace messageqcpp;
28 
29 #include "threadpool.h"
30 using namespace threadpool;
31 
32 #include "we_dataloader.h"
33 #include "we_readthread.h"
34 #include "we_messages.h"
35 #include "we_message_handlers.h"
36 #include "we_ddlcommandproc.h"
37 #include "we_dmlcommandproc.h"
38 #include "../redistribute/we_redistribute.h"
39 #include "we_config.h"
40 #include "stopwatch.h"
41 using namespace logging;
42 using namespace WriteEngine;
43 //StopWatch timer;
44 namespace WriteEngine
45 {
46 
ReadThread(const IOSocket & ios)47 ReadThread::ReadThread(const IOSocket& ios): fIos(ios)
48 {
49 
50 }
51 
~ReadThread()52 ReadThread::~ReadThread()
53 {
54 
55 }
56 
operator ()()57 void ReadThread::operator ()()
58 {
59     // We should never come here
60 }
61 
62 //-----------------------------------------------------------------------------
63 //ctor
DmlReadThread(const messageqcpp::IOSocket & ios,ByteStream & Ibs)64 DmlReadThread::DmlReadThread(const messageqcpp::IOSocket& ios,
65                              ByteStream& Ibs ): ReadThread(ios), fWeDMLprocessor(new WE_DMLCommandProc), fWeDDLprocessor(new WE_DDLCommandProc)
66 {
67     fIbs = Ibs;
68 }
69 //dtor
~DmlReadThread()70 DmlReadThread::~DmlReadThread()
71 {
72     //cout << "in DmlReadThread destructor" << endl;
73 }
74 
operator ()()75 void DmlReadThread::operator()()
76 {
77     // DCH Since fIbs is a class member, there's no reason to make a copy here.
78     // Why waste the CPU? Note that Splitter thread doesn't make a copy.
79     // The only reason I can think of to make such a copy is to guarantee a
80     // strong exception, but that doesn't appear to be in play here.
81     ByteStream ibs = fIbs;
82     ByteStream obs;
83     ByteStream::byte msgId;
84     ByteStream::octbyte uniqueID;
85     ByteStream::quadbyte PMId;
86     ByteStream::byte rc = 0;
87     std::string errMsg;
88     //cout << "DmlReadThread created ..." << endl;
89     // queryStats.blocksChanged for delete/update
90     uint64_t blocksChanged = 0;
91 
92     while (ibs.length() > 0)
93     {
94         try
95         {
96             errMsg.clear();
97 
98             //do work here...
99             ibs >> msgId;
100 
101             if (msgId != WE_SVR_CLOSE_CONNECTION)
102                 ibs >> uniqueID;
103 
104             //cout << "DmlReadThread " << pthread_self () << " received message id " << (uint32_t)msgId << " and bytestream length " << ibs.length() << endl;
105             switch (msgId)
106             {
107                 case WE_SVR_SINGLE_INSERT:
108                 {
109                     rc = fWeDMLprocessor->processSingleInsert(ibs, errMsg);
110                     break;
111                 }
112 
113                 case WE_SVR_COMMIT_VERSION:
114                 {
115                     rc = fWeDMLprocessor->commitVersion(ibs, errMsg);
116                     break;
117                 }
118 
119                 case WE_SVR_ROLLBACK_BLOCKS:
120                 {
121                     rc = fWeDMLprocessor->rollbackBlocks(ibs, errMsg);
122                     break;
123                 }
124 
125                 case WE_SVR_ROLLBACK_VERSION:
126                 {
127                     rc = fWeDMLprocessor->rollbackVersion(ibs, errMsg);
128                     break;
129                 }
130 
131                 case WE_SVR_COMMIT_BATCH_AUTO_ON:
132                 {
133                     rc = fWeDMLprocessor->commitBatchAutoOn(ibs, errMsg);
134                     break;
135                 }
136 
137                 case WE_SVR_ROLLBACK_BATCH_AUTO_ON:
138                 {
139                     rc = fWeDMLprocessor->rollbackBatchAutoOn(ibs, errMsg);
140                     break;
141                 }
142 
143                 case WE_SVR_COMMIT_BATCH_AUTO_OFF:
144                 {
145                     rc = fWeDMLprocessor->commitBatchAutoOn(ibs, errMsg);
146                     break;
147                 }
148 
149                 case WE_SVR_ROLLBACK_BATCH_AUTO_OFF:
150                 {
151                     rc = fWeDMLprocessor->rollbackBatchAutoOff(ibs, errMsg);
152                     break;
153                 }
154 
155                 case WE_SVR_BATCH_INSERT:
156                 {
157                     //timer.start("processBatchInsert");
158                     rc = fWeDMLprocessor->processBatchInsert(ibs, errMsg, PMId);
159                     //timer.stop("processBatchInsert");
160                     //cout << "fWeDMLprocessor " << fWeDMLprocessor << " is processing batchinsert ..." << endl;
161                     break;
162                 }
163 
164                 case WE_SVR_BATCH_INSERT_BINARY:
165                 {
166                     rc = fWeDMLprocessor->processBatchInsertBinary(ibs, errMsg, PMId);
167                     break;
168                 }
169 
170                 case WE_SVR_GET_WRITTEN_LBIDS:
171                 {
172                     rc = fWeDMLprocessor->getWrittenLbids(ibs, errMsg, PMId);
173                     break;
174                 }
175 
176                 case WE_SVR_BATCH_INSERT_END:
177                 {
178                     rc = fWeDMLprocessor->processBatchInsertHwm(ibs, errMsg);
179                     //timer.finish();
180                     break;
181                 }
182 
183                 case WE_SVR_UPDATE:
184                 {
185                     rc = fWeDMLprocessor->processUpdate(ibs, errMsg, PMId, blocksChanged);
186                     break;
187                 }
188 
189                 case WE_SVR_FLUSH_FILES:
190                 {
191                     rc = fWeDMLprocessor->processFlushFiles(ibs, errMsg);
192                     break;
193                 }
194 
195                 case WE_SVR_DELETE:
196                 {
197                     rc = fWeDMLprocessor->processDelete(ibs, errMsg, PMId, blocksChanged);
198                     break;
199                 }
200 
201                 case WE_SVR_BATCH_AUTOON_REMOVE_META:
202                 {
203                     rc = fWeDMLprocessor->processRemoveMeta(ibs, errMsg);
204                     break;
205                 }
206 
207                 case WE_SVR_DML_BULKROLLBACK:
208                 {
209                     rc = fWeDMLprocessor->processBulkRollback(ibs, errMsg);
210                     break;
211                 }
212 
213                 case WE_SVR_DML_BULKROLLBACK_CLEANUP:
214                 {
215                     rc = fWeDMLprocessor->processBulkRollbackCleanup(ibs, errMsg);
216                     break;
217                 }
218 
219                 case WE_UPDATE_NEXTVAL:
220                 {
221                     rc = fWeDMLprocessor->updateSyscolumnNextval(ibs, errMsg);
222                     break;
223                 }
224 
225                 case WE_SVR_WRITE_SYSTABLE:
226                 {
227                     rc = fWeDDLprocessor->writeSystable(ibs, errMsg);
228                     break;
229                 }
230 
231                 case WE_SVR_WRITE_SYSCOLUMN:
232                 {
233                     rc = fWeDDLprocessor->writeSyscolumn(ibs, errMsg);
234                     break;
235                 }
236 
237                 case WE_SVR_WRITE_CREATE_SYSCOLUMN:
238                 {
239                     rc = fWeDDLprocessor->writeCreateSyscolumn(ibs, errMsg);
240                     break;
241                 }
242 
243                 case WE_SVR_WRITE_CREATETABLEFILES:
244                 {
245                     rc = fWeDDLprocessor->createtablefiles(ibs, errMsg);
246                     break;
247                 }
248 
249                 case WE_SVR_DELETE_SYSCOLUMN:
250                 {
251                     rc = fWeDDLprocessor->deleteSyscolumn(ibs, errMsg);
252                     break;
253                 }
254 
255                 case WE_SVR_DELETE_SYSCOLUMN_ROW:
256                 {
257                     rc = fWeDDLprocessor->deleteSyscolumnRow(ibs, errMsg);
258                     break;
259                 }
260 
261                 case WE_SVR_DELETE_SYSTABLE:
262                 {
263                     rc = fWeDDLprocessor->deleteSystable(ibs, errMsg);
264                     break;
265                 }
266 
267                 case WE_SVR_DELETE_SYSTABLES:
268                 {
269                     rc = fWeDDLprocessor->deleteSystables(ibs, errMsg);
270                     break;
271                 }
272 
273                 case WE_SVR_WRITE_DROPFILES:
274                 {
275                     rc = fWeDDLprocessor->dropFiles(ibs, errMsg);
276                     break;
277                 }
278 
279                 case WE_SVR_UPDATE_SYSCOLUMN_AUTO:
280                 {
281                     rc = fWeDDLprocessor->updateSyscolumnAuto(ibs, errMsg);
282                     break;
283                 }
284 
285                 case WE_SVR_UPDATE_SYSCOLUMN_NEXTVAL:
286                 {
287                     rc = fWeDDLprocessor->updateSyscolumnNextvalCol(ibs, errMsg);
288                     break;
289                 }
290 
291                 case WE_SVR_UPDATE_SYSCOLUMN_AUTOVAL:
292                 {
293                     rc = fWeDDLprocessor->updateSyscolumnNextval(ibs, errMsg);
294                     break;
295                 }
296 
297                 case WE_SVR_UPDATE_SYSCOLUMN_DEFAULTVAL:
298                 {
299                     rc = fWeDDLprocessor->updateSyscolumnSetDefault(ibs, errMsg);
300                     break;
301                 }
302 
303                 case WE_SVR_UPDATE_SYSCOLUMN_TABLENAME:
304                 {
305                     rc = fWeDDLprocessor->updateSyscolumnTablename(ibs, errMsg);
306                     break;
307                 }
308 
309                 case WE_SVR_UPDATE_SYSCOLUMN_RENAMECOLUMN:
310                 {
311                     rc = fWeDDLprocessor->updateSyscolumnRenameColumn(ibs, errMsg);
312                     break;
313                 }
314 
315                 case WE_SVR_UPDATE_SYSCOLUMN_COLPOS:
316                 {
317                     rc = fWeDDLprocessor->updateSyscolumnColumnposCol(ibs, errMsg);
318                     break;
319                 }
320 
321                 case WE_SVR_UPDATE_SYSTABLE_AUTO:
322                 {
323                     rc = fWeDDLprocessor->updateSystableAuto(ibs, errMsg);
324                     break;
325                 }
326 
327                 case WE_SVR_UPDATE_SYSTABLE_TABLENAME:
328                 {
329                     rc = fWeDDLprocessor->updateSystableTablename(ibs, errMsg);
330                     break;
331                 }
332 
333                 case WE_SVR_UPDATE_SYSTABLES_TABLENAME:
334                 {
335                     rc = fWeDDLprocessor->updateSystablesTablename(ibs, errMsg);
336                     break;
337                 }
338 
339                 case WE_SVR_FILL_COLUMN:
340                 {
341                     rc = fWeDDLprocessor->fillNewColumn(ibs, errMsg);
342                     break;
343                 }
344 
345                 case WE_SVR_DROP_PARTITIONS:
346                 {
347                     rc = fWeDDLprocessor->dropPartitions(ibs, errMsg);
348                     break;
349                 }
350 
351                 case WE_SVR_WRITE_TRUNCATE:
352                 {
353                     rc = fWeDDLprocessor->writeTruncateLog(ibs, errMsg);
354                     break;
355                 }
356 
357                 case WE_SVR_WRITE_DROPPARTITION:
358                 {
359                     rc = fWeDDLprocessor->writeDropPartitionLog(ibs, errMsg);
360                     break;
361                 }
362 
363                 case WE_SVR_WRITE_DROPTABLE:
364                 {
365                     rc = fWeDDLprocessor->writeDropTableLog(ibs, errMsg);
366                     break;
367                 }
368 
369                 case WE_SVR_DELETE_DDLLOG:
370                 {
371                     rc = fWeDDLprocessor->deleteDDLLog(ibs, errMsg);
372                     break;
373                 }
374 
375                 case WE_SVR_FETCH_DDL_LOGS:
376                 {
377                     rc = fWeDDLprocessor->fetchDDLLog(ibs, errMsg);
378                     break;
379                 }
380 
381                 case WE_SVR_PURGEFD:
382                 {
383                     rc = fWeDMLprocessor->processPurgeFDCache(ibs, errMsg);
384                     break;
385                 }
386 
387                 case WE_END_TRANSACTION:
388                 {
389                     rc = fWeDMLprocessor->processEndTransaction(ibs, errMsg);
390                     break;
391                 }
392 
393                 case WE_SRV_FIX_ROWS:
394                 {
395                     rc = fWeDMLprocessor->processFixRows(ibs, errMsg, PMId);
396                     break;
397                 }
398 
399                 case WE_SVR_CLOSE_CONNECTION:
400                 {
401                     break;
402                 }
403 
404                 default:
405                     break;
406             }
407         }
408         catch (std::exception& ex)
409         {
410             logging::LoggingID logid(19, 0, 0);
411             logging::Message::Args args;
412             logging::Message msg(1);
413             args.add("we_readthread caught exception ");
414             args.add(ex.what());
415             msg.format(args);
416             logging::Logger logger(logid.fSubsysID);
417             logger.logMessage(logging::LOG_TYPE_ERROR, msg, logid);
418             rc = 1;
419             errMsg = msg.msg();
420         }
421         catch (...)
422         {
423             logging::LoggingID logid(19, 0, 0);
424             logging::Message::Args args;
425             logging::Message msg(1);
426             args.add("we_readthread caught ... exception ");
427             msg.format(args);
428             logging::Logger logger(logid.fSubsysID);
429             logger.logMessage(logging::LOG_TYPE_ERROR, msg, logid);
430             rc = 1;
431             errMsg = msg.msg();
432         }
433 
434         if (msgId != WE_SVR_CLOSE_CONNECTION)
435         {
436             //send response
437             obs.restart();
438             obs << uniqueID;
439             obs << rc;
440             obs << errMsg;
441         }
442 
443         if ((msgId == WE_SVR_COMMIT_BATCH_AUTO_ON) || (msgId == WE_SVR_BATCH_INSERT_END) || (msgId == WE_SVR_FETCH_DDL_LOGS) || (msgId == WE_SVR_GET_WRITTEN_LBIDS))
444         {
445             obs += ibs;
446             //cout << " sending back hwm info with ibs length " << endl;
447         }
448         else if ((msgId == WE_SVR_BATCH_INSERT) || (msgId == WE_SVR_UPDATE) || (msgId == WE_SVR_DELETE))
449         {
450             obs << PMId;
451         }
452         else if ((msgId == WE_SVR_DML_BULKROLLBACK) ||
453                  (msgId == WE_SVR_DML_BULKROLLBACK_CLEANUP))
454         {
455             obs << Config::getLocalModuleID();
456         }
457 
458         if (msgId == WE_SVR_UPDATE || msgId == WE_SVR_DELETE)
459             obs << blocksChanged; // send stats back to DMLProc
460 
461         blocksChanged = 0; // reset
462 
463         if (msgId == WE_SVR_CLOSE_CONNECTION)
464         {
465             //cout << "received request. closing connection ..." << endl;
466             break;
467         }
468         else
469         {
470             try
471             {
472                 fIos.write(obs);
473                 //cout << "dmlthread sent back response for msgid " << (uint32_t)msgId << " with uniqueID:rc= "
474                 //<< (uint32_t)uniqueID<<":"<< (uint32_t)rc<<" and error message is " << errMsg <<endl;
475                 //get next message
476                 ibs = fIos.read();
477             }
478             catch (...)
479             {
480                 break;
481             }
482         }
483     }
484 
485     //cout << "closing connection for thread " << pthread_self () << endl;
486     fIos.close();
487 }
488 
489 
490 //-----------------------------------------------------------------------------
491 //ctor
SplitterReadThread(const messageqcpp::IOSocket & ios,ByteStream & Ibs)492 SplitterReadThread::SplitterReadThread(const messageqcpp::IOSocket& ios,
493                                        ByteStream& Ibs): ReadThread(ios), fWeDataLoader(*this)
494 {
495     fIbs = Ibs;
496 }
497 
498 //-----------------------------------------------------------------------------
499 //copy ctor
SplitterReadThread(const SplitterReadThread & rhs)500 SplitterReadThread::SplitterReadThread(const SplitterReadThread& rhs):
501     ReadThread(rhs.fIos), fWeDataLoader(*this)
502 {
503     fIbs = rhs.fIbs;
504 
505 }
506 //-----------------------------------------------------------------------------
507 // dtor
~SplitterReadThread()508 SplitterReadThread::~SplitterReadThread()
509 {
510 }
511 //-----------------------------------------------------------------------------
512 /**
513 * @brief Thread Function which process incoming messages
514 *
515 */
operator ()()516 void SplitterReadThread::operator()()
517 {
518     ByteStream::byte msgId;
519 
520     while (fIbs.length() > 0)
521     {
522         fWeDataLoader.updateRxBytes(fIbs.length());
523 
524         //do work here...
525         fIbs >> msgId;
526 
527         //cout << (int)msgId << endl;
528 
529         switch (msgId)
530         {
531             case WE_CLT_SRV_KEEPALIVE:
532             {
533                 fWeDataLoader.onReceiveKeepAlive(fIbs);
534                 break;
535             }
536 
537             case WE_CLT_SRV_DATA:
538             {
539                 fWeDataLoader.onReceiveData(fIbs);
540                 break;
541             }
542 
543             case WE_CLT_SRV_EOD:
544             {
545                 fWeDataLoader.onReceiveEod(fIbs);
546                 break;
547             }
548 
549             case WE_CLT_SRV_MODE:
550             {
551                 fWeDataLoader.onReceiveMode(fIbs);
552                 break;
553             }
554 
555             case WE_CLT_SRV_IMPFILENAME:
556             {
557                 fWeDataLoader.onReceiveImportFileName(fIbs);
558                 break;
559             }
560 
561             case WE_CLT_SRV_CMDLINEARGS:
562             {
563                 fWeDataLoader.onReceiveCmdLineArgs(fIbs);
564                 break;
565             }
566 
567             case WE_CLT_SRV_CMD:
568             {
569                 fWeDataLoader.onReceiveCmd(fIbs);   //fig out share_ptr on BS& is better
570                 break;
571             }
572 
573             case WE_CLT_SRV_ACK:
574             {
575                 fWeDataLoader.onReceiveAck(fIbs);
576                 break;
577             }
578 
579             case WE_CLT_SRV_NAK:
580             {
581                 fWeDataLoader.onReceiveNak(fIbs);
582                 break;
583             }
584 
585             case WE_CLT_SRV_PM_ERROR:
586             {
587                 fWeDataLoader.onReceiveError(fIbs);
588                 break;
589             }
590 
591             case WE_CLT_SRV_STARTCPI:
592             {
593                 fWeDataLoader.onReceiveStartCpimport();
594                 break;
595             }
596 
597             case WE_CLT_SRV_BRMRPT:
598             {
599                 fWeDataLoader.onReceiveBrmRptFileName(fIbs);
600                 break;
601             }
602 
603             case WE_CLT_SRV_CLEANUP:
604             {
605                 fWeDataLoader.onReceiveCleanup(fIbs);
606                 break;
607             }
608 
609             case WE_CLT_SRV_ROLLBACK:
610             {
611                 fWeDataLoader.onReceiveRollback(fIbs);
612                 break;
613             }
614 
615             case WE_CLT_SRV_JOBID:
616             {
617                 fWeDataLoader.onReceiveJobId(fIbs);
618                 break;
619             }
620 
621             case WE_CLT_SRV_JOBDATA:
622             {
623                 fWeDataLoader.onReceiveJobData(fIbs);
624                 break;
625             }
626 
627             case WE_CLT_SRV_ERRLOG:
628             {
629                 fWeDataLoader.onReceiveErrFileRqst(fIbs);
630                 break;
631             }
632 
633             case WE_CLT_SRV_BADLOG:
634             {
635                 fWeDataLoader.onReceiveBadFileRqst(fIbs);
636                 break;
637             }
638 
639             default:
640                 break;
641 
642         }
643 
644         fIbs.restart();
645 
646         try
647         {
648             //get next message
649             fIbs = fIos.read();
650         }
651         catch (...)
652         {
653             fIbs.restart();     //setting length=0, get out of loop
654             std::cout << "Broken Pipe" << std::endl;
655 
656             logging::LoggingID logid(19, 0, 0);
657             logging::Message::Args args;
658             logging::Message msg(1);
659             args.add("SplitterReadThread::operator: Broken Pipe ");
660             msg.format(args);
661             logging::Logger logger(logid.fSubsysID);
662             logger.logMessage(logging::LOG_TYPE_ERROR, msg, logid);
663         }
664     }
665 
666     fIos.close();
667 
668 }
669 
670 //------------------------------------------------------------------------------
671 // ClearTableLockReadThread constructor.
672 //------------------------------------------------------------------------------
ClearTableLockReadThread(const messageqcpp::IOSocket & ios,ByteStream & Ibs)673 ClearTableLockReadThread::ClearTableLockReadThread(
674     const messageqcpp::IOSocket& ios,
675     ByteStream& Ibs ): ReadThread(ios),
676     fWeClearTableLockCmd(new WE_ClearTableLockCmd("ClearTableLockTool"))
677 {
678     fIbs = Ibs;
679 }
680 
681 //------------------------------------------------------------------------------
682 // ClearTableLockReadThread destructor.
683 //------------------------------------------------------------------------------
~ClearTableLockReadThread()684 ClearTableLockReadThread::~ClearTableLockReadThread()
685 {
686 }
687 
688 //------------------------------------------------------------------------------
689 // Thread entry point to ClearTableLockReadThread object used to receive msgs
690 // from a cleartablelock tool client.
691 //------------------------------------------------------------------------------
operator ()()692 void ClearTableLockReadThread::operator()()
693 {
694     ByteStream::byte msgId;
695     ByteStream obs;
696     ByteStream::byte rc = 0;
697     std::string errMsg;
698 
699     // Read msgid from ByteStream and forward to applicable processing function
700     while (fIbs.length() > 0)
701     {
702         fIbs >> msgId;
703 
704         switch (msgId)
705         {
706             case WE_CLT_SRV_CLEAR_TABLE_LOCK:
707             {
708                 rc = fWeClearTableLockCmd->processRollback(fIbs, errMsg);
709                 break;
710             }
711 
712             case WE_CLT_SRV_CLEAR_TABLE_LOCK_CLEANUP:
713             {
714                 rc = fWeClearTableLockCmd->processCleanup(fIbs, errMsg);
715                 break;
716             }
717 
718             default:
719             {
720                 break;
721             }
722         }
723 
724         // Send response
725         obs.restart();
726         obs << rc;
727         obs << errMsg;
728 
729         try
730         {
731             fIos.write(obs);
732 
733             // Get next message
734             fIbs = fIos.read();
735         }
736         catch (...)
737         {
738             logging::LoggingID logid(19, 0, 0);
739             logging::Message::Args args;
740             logging::Message msg(1);
741             args.add("ClearTableLockReadThread::operator: Broken Pipe ");
742             msg.format(args);
743             logging::Logger logger(logid.fSubsysID);
744             logger.logMessage(logging::LOG_TYPE_ERROR, msg, logid);
745             break;
746         }
747     }
748 
749     fIos.close();
750 }
751 
752 
753 //------------------------------------------------------------------------------
754 // RedistributeReadThread constructor.
755 //------------------------------------------------------------------------------
RedistributeReadThread(const messageqcpp::IOSocket & ios,ByteStream & Ibs)756 RedistributeReadThread::RedistributeReadThread(const messageqcpp::IOSocket& ios, ByteStream& Ibs)
757     : ReadThread(ios)
758 {
759     fIbs = Ibs;
760 }
761 
762 //------------------------------------------------------------------------------
763 // RedistributeReadThread destructor.
764 //------------------------------------------------------------------------------
~RedistributeReadThread()765 RedistributeReadThread::~RedistributeReadThread()
766 {
767 }
768 
769 //------------------------------------------------------------------------------
770 // Thread entry point to RedistributeReadThread object used to receive msgs
771 // from a cleartablelock tool client.
772 //------------------------------------------------------------------------------
operator ()()773 void RedistributeReadThread::operator()()
774 {
775     try
776     {
777         redistribute::Redistribute::handleRedistributeMessage(fIbs, fIos);
778     }
779     catch (...)
780     {
781         logging::LoggingID logid(19, 0, 0);
782         logging::Message::Args args;
783         logging::Message msg(1);
784         args.add("RedistributeReadThread::operator exception handled ");
785         msg.format(args);
786         logging::Logger logger(logid.fSubsysID);
787         logger.logMessage(logging::LOG_TYPE_ERROR, msg, logid);
788     }
789 
790     fIos.close();
791 }
792 //------------------------------------------------------------------------------
793 // GetFileSizeThread constructor.
794 //------------------------------------------------------------------------------
GetFileSizeThread(const messageqcpp::IOSocket & ios,ByteStream & Ibs,BRM::DBRM & dbrm)795 GetFileSizeThread::GetFileSizeThread(const messageqcpp::IOSocket& ios, ByteStream& Ibs, BRM::DBRM& dbrm)
796     : ReadThread(ios), fWeGetFileSizes(new WE_GetFileSizes())
797 {
798     fIbs = Ibs;
799     key = dbrm.getUnique32();
800 }
801 
802 //------------------------------------------------------------------------------
803 // GetFileSizeThread destructor.
804 //------------------------------------------------------------------------------
~GetFileSizeThread()805 GetFileSizeThread::~GetFileSizeThread()
806 {
807 }
808 
809 //------------------------------------------------------------------------------
810 // Thread entry point to GetFileSizeThread object used to receive msgs
811 //------------------------------------------------------------------------------
operator ()()812 void GetFileSizeThread::operator()()
813 {
814     ByteStream::byte msgId;
815     ByteStream obs;
816     ByteStream::byte rc = 0;
817     std::string errMsg;
818 
819     // Read msgid from ByteStream and forward to applicable processing function
820     while (fIbs.length() > 0)
821     {
822         fIbs >> msgId;
823 
824         switch (msgId)
825         {
826             case WE_SVR_GET_FILESIZES:
827             {
828                 rc = fWeGetFileSizes->processTable(fIbs, errMsg, key);
829                 break;
830             }
831 
832             case WE_SVR_GET_FILESIZE:
833             {
834                 rc = fWeGetFileSizes->processFileName(fIbs, errMsg, key);
835                 break;
836             }
837 
838             default:
839             {
840                 break;
841             }
842         }
843 
844         // Send response
845         obs.restart();
846         obs << rc;
847         obs << errMsg;
848         obs += fIbs;
849 
850         try
851         {
852             fIos.write(obs);
853 
854             // Get next message
855             fIbs = fIos.read();
856         }
857         catch (...)
858         {
859             logging::LoggingID logid(19, 0, 0);
860             logging::Message::Args args;
861             logging::Message msg(1);
862             args.add("GetFileSizeThread::operator: Broken Pipe ");
863             msg.format(args);
864             logging::Logger logger(logid.fSubsysID);
865             logger.logMessage(logging::LOG_TYPE_ERROR, msg, logid);
866             break;
867         }
868     }
869 
870     fIos.close();
871 }
872 
873 //-----------------------------------------------------------------------------
874 
CreateReadThread(ThreadPool & Tp,IOSocket & Ios,BRM::DBRM & dbrm)875 void ReadThreadFactory::CreateReadThread(ThreadPool& Tp, IOSocket& Ios, BRM::DBRM& dbrm)
876 {
877     struct timespec rm_ts;
878     int sleepTime = 20000; // wait for 20 seconds
879     rm_ts.tv_sec = sleepTime / 1000;
880     rm_ts.tv_nsec = sleepTime % 1000 * 1000000;
881     bool isTimeOut = false;
882 
883     ByteStream::byte msgId;
884     ByteStream aBs;
885 
886     try
887     {
888         aBs = Ios.read(&rm_ts, &isTimeOut);
889     }
890     catch (std::exception& ex)
891     {
892       std::cout << "Handled : " << ex.what() << std::endl;
893         logging::LoggingID logid(19, 0, 0);
894         logging::Message::Args args;
895         logging::Message msg(1);
896         args.add("ReadThreadFactory::CreateReadThread: read() error");
897         args.add(ex.what());
898         msg.format(args);
899         logging::Logger logger(logid.fSubsysID);
900         logger.logMessage(logging::LOG_TYPE_ERROR, msg, logid);
901     }
902 
903     if ((aBs.length() <= 0) || (isTimeOut))
904     {
905         Ios.close();
906         return;
907     }
908 
909     aBs.peek(msgId);
910 
911     switch (msgId)
912     {
913         case WE_SVR_DDL_KEEPALIVE:
914         case WE_SVR_DML_KEEPALIVE:
915         {
916             DmlReadThread dmlReadThread(Ios, aBs);
917             boost::thread t(dmlReadThread);
918             //cout << "starting DML thread id " << t.get_id() << endl;
919         }
920         break;
921 
922         case WE_CLT_SRV_KEEPALIVE:
923         case WE_CLT_SRV_MODE:
924         case WE_CLT_SRV_DATA:
925         case WE_CLT_SRV_CMD:
926         case WE_CLT_SRV_ACK:
927         case WE_CLT_SRV_NAK:
928         case WE_CLT_SRV_PM_ERROR:
929         case WE_CLT_SRV_CMDLINEARGS:
930         {
931             //SplitterReadThread aSpReadThread(Ios, aBs);
932             //fOwner.attach(reinterpret_cast<Observer*>(&(aSpReadThread.fWeDataLoader)));
933             //Tp.invoke(aSpReadThread);
934             Tp.invoke(SplitterReadThread(Ios, aBs));
935         }
936         break;
937 
938         case WE_CLT_SRV_CLEAR_TABLE_LOCK:
939         case WE_CLT_SRV_CLEAR_TABLE_LOCK_CLEANUP:
940         {
941             ClearTableLockReadThread clearTableLockThread(Ios, aBs);
942             Tp.invoke( clearTableLockThread );
943         }
944         break;
945 
946         case WE_SVR_REDISTRIBUTE:
947         {
948             RedistributeReadThread RedistributeReadThread(Ios, aBs);
949             Tp.invoke(RedistributeReadThread);
950         }
951         break;
952 
953         case WE_SVR_GET_FILESIZES:
954         case WE_SVR_GET_FILESIZE:
955         {
956             GetFileSizeThread getFileSizeThread(Ios, aBs, dbrm);
957             Tp.invoke(getFileSizeThread);
958         }
959         break;
960 
961         default:
962         {
963             Ios.close();    // don't know who is this
964         }
965         break;
966 
967     }
968 
969 }
970 
971 }
972 
973