1 /* Copyright (C) 2014 InfiniDB, Inc.
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17
18 /*******************************************************************************
19 * $Id: we_readthread.cpp 4609 2013-04-19 15:32:02Z chao $
20 *
21 *******************************************************************************/
22
23 #include <unistd.h>
24
25 #include "messagequeue.h"
26 #include "bytestream.h"
27 using namespace messageqcpp;
28
29 #include "threadpool.h"
30 using namespace threadpool;
31
32 #include "we_dataloader.h"
33 #include "we_readthread.h"
34 #include "we_messages.h"
35 #include "we_message_handlers.h"
36 #include "we_ddlcommandproc.h"
37 #include "we_dmlcommandproc.h"
38 #include "../redistribute/we_redistribute.h"
39 #include "we_config.h"
40 #include "stopwatch.h"
41 using namespace logging;
42 using namespace WriteEngine;
43 //StopWatch timer;
44 namespace WriteEngine
45 {
46
ReadThread(const IOSocket & ios)47 ReadThread::ReadThread(const IOSocket& ios): fIos(ios)
48 {
49
50 }
51
~ReadThread()52 ReadThread::~ReadThread()
53 {
54
55 }
56
operator ()()57 void ReadThread::operator ()()
58 {
59 // We should never come here
60 }
61
62 //-----------------------------------------------------------------------------
63 //ctor
DmlReadThread(const messageqcpp::IOSocket & ios,ByteStream & Ibs)64 DmlReadThread::DmlReadThread(const messageqcpp::IOSocket& ios,
65 ByteStream& Ibs ): ReadThread(ios), fWeDMLprocessor(new WE_DMLCommandProc), fWeDDLprocessor(new WE_DDLCommandProc)
66 {
67 fIbs = Ibs;
68 }
69 //dtor
~DmlReadThread()70 DmlReadThread::~DmlReadThread()
71 {
72 //cout << "in DmlReadThread destructor" << endl;
73 }
74
operator ()()75 void DmlReadThread::operator()()
76 {
77 // DCH Since fIbs is a class member, there's no reason to make a copy here.
78 // Why waste the CPU? Note that Splitter thread doesn't make a copy.
79 // The only reason I can think of to make such a copy is to guarantee a
80 // strong exception, but that doesn't appear to be in play here.
81 ByteStream ibs = fIbs;
82 ByteStream obs;
83 ByteStream::byte msgId;
84 ByteStream::octbyte uniqueID;
85 ByteStream::quadbyte PMId;
86 ByteStream::byte rc = 0;
87 std::string errMsg;
88 //cout << "DmlReadThread created ..." << endl;
89 // queryStats.blocksChanged for delete/update
90 uint64_t blocksChanged = 0;
91
92 while (ibs.length() > 0)
93 {
94 try
95 {
96 errMsg.clear();
97
98 //do work here...
99 ibs >> msgId;
100
101 if (msgId != WE_SVR_CLOSE_CONNECTION)
102 ibs >> uniqueID;
103
104 //cout << "DmlReadThread " << pthread_self () << " received message id " << (uint32_t)msgId << " and bytestream length " << ibs.length() << endl;
105 switch (msgId)
106 {
107 case WE_SVR_SINGLE_INSERT:
108 {
109 rc = fWeDMLprocessor->processSingleInsert(ibs, errMsg);
110 break;
111 }
112
113 case WE_SVR_COMMIT_VERSION:
114 {
115 rc = fWeDMLprocessor->commitVersion(ibs, errMsg);
116 break;
117 }
118
119 case WE_SVR_ROLLBACK_BLOCKS:
120 {
121 rc = fWeDMLprocessor->rollbackBlocks(ibs, errMsg);
122 break;
123 }
124
125 case WE_SVR_ROLLBACK_VERSION:
126 {
127 rc = fWeDMLprocessor->rollbackVersion(ibs, errMsg);
128 break;
129 }
130
131 case WE_SVR_COMMIT_BATCH_AUTO_ON:
132 {
133 rc = fWeDMLprocessor->commitBatchAutoOn(ibs, errMsg);
134 break;
135 }
136
137 case WE_SVR_ROLLBACK_BATCH_AUTO_ON:
138 {
139 rc = fWeDMLprocessor->rollbackBatchAutoOn(ibs, errMsg);
140 break;
141 }
142
143 case WE_SVR_COMMIT_BATCH_AUTO_OFF:
144 {
145 rc = fWeDMLprocessor->commitBatchAutoOn(ibs, errMsg);
146 break;
147 }
148
149 case WE_SVR_ROLLBACK_BATCH_AUTO_OFF:
150 {
151 rc = fWeDMLprocessor->rollbackBatchAutoOff(ibs, errMsg);
152 break;
153 }
154
155 case WE_SVR_BATCH_INSERT:
156 {
157 //timer.start("processBatchInsert");
158 rc = fWeDMLprocessor->processBatchInsert(ibs, errMsg, PMId);
159 //timer.stop("processBatchInsert");
160 //cout << "fWeDMLprocessor " << fWeDMLprocessor << " is processing batchinsert ..." << endl;
161 break;
162 }
163
164 case WE_SVR_BATCH_INSERT_BINARY:
165 {
166 rc = fWeDMLprocessor->processBatchInsertBinary(ibs, errMsg, PMId);
167 break;
168 }
169
170 case WE_SVR_GET_WRITTEN_LBIDS:
171 {
172 rc = fWeDMLprocessor->getWrittenLbids(ibs, errMsg, PMId);
173 break;
174 }
175
176 case WE_SVR_BATCH_INSERT_END:
177 {
178 rc = fWeDMLprocessor->processBatchInsertHwm(ibs, errMsg);
179 //timer.finish();
180 break;
181 }
182
183 case WE_SVR_UPDATE:
184 {
185 rc = fWeDMLprocessor->processUpdate(ibs, errMsg, PMId, blocksChanged);
186 break;
187 }
188
189 case WE_SVR_FLUSH_FILES:
190 {
191 rc = fWeDMLprocessor->processFlushFiles(ibs, errMsg);
192 break;
193 }
194
195 case WE_SVR_DELETE:
196 {
197 rc = fWeDMLprocessor->processDelete(ibs, errMsg, PMId, blocksChanged);
198 break;
199 }
200
201 case WE_SVR_BATCH_AUTOON_REMOVE_META:
202 {
203 rc = fWeDMLprocessor->processRemoveMeta(ibs, errMsg);
204 break;
205 }
206
207 case WE_SVR_DML_BULKROLLBACK:
208 {
209 rc = fWeDMLprocessor->processBulkRollback(ibs, errMsg);
210 break;
211 }
212
213 case WE_SVR_DML_BULKROLLBACK_CLEANUP:
214 {
215 rc = fWeDMLprocessor->processBulkRollbackCleanup(ibs, errMsg);
216 break;
217 }
218
219 case WE_UPDATE_NEXTVAL:
220 {
221 rc = fWeDMLprocessor->updateSyscolumnNextval(ibs, errMsg);
222 break;
223 }
224
225 case WE_SVR_WRITE_SYSTABLE:
226 {
227 rc = fWeDDLprocessor->writeSystable(ibs, errMsg);
228 break;
229 }
230
231 case WE_SVR_WRITE_SYSCOLUMN:
232 {
233 rc = fWeDDLprocessor->writeSyscolumn(ibs, errMsg);
234 break;
235 }
236
237 case WE_SVR_WRITE_CREATE_SYSCOLUMN:
238 {
239 rc = fWeDDLprocessor->writeCreateSyscolumn(ibs, errMsg);
240 break;
241 }
242
243 case WE_SVR_WRITE_CREATETABLEFILES:
244 {
245 rc = fWeDDLprocessor->createtablefiles(ibs, errMsg);
246 break;
247 }
248
249 case WE_SVR_DELETE_SYSCOLUMN:
250 {
251 rc = fWeDDLprocessor->deleteSyscolumn(ibs, errMsg);
252 break;
253 }
254
255 case WE_SVR_DELETE_SYSCOLUMN_ROW:
256 {
257 rc = fWeDDLprocessor->deleteSyscolumnRow(ibs, errMsg);
258 break;
259 }
260
261 case WE_SVR_DELETE_SYSTABLE:
262 {
263 rc = fWeDDLprocessor->deleteSystable(ibs, errMsg);
264 break;
265 }
266
267 case WE_SVR_DELETE_SYSTABLES:
268 {
269 rc = fWeDDLprocessor->deleteSystables(ibs, errMsg);
270 break;
271 }
272
273 case WE_SVR_WRITE_DROPFILES:
274 {
275 rc = fWeDDLprocessor->dropFiles(ibs, errMsg);
276 break;
277 }
278
279 case WE_SVR_UPDATE_SYSCOLUMN_AUTO:
280 {
281 rc = fWeDDLprocessor->updateSyscolumnAuto(ibs, errMsg);
282 break;
283 }
284
285 case WE_SVR_UPDATE_SYSCOLUMN_NEXTVAL:
286 {
287 rc = fWeDDLprocessor->updateSyscolumnNextvalCol(ibs, errMsg);
288 break;
289 }
290
291 case WE_SVR_UPDATE_SYSCOLUMN_AUTOVAL:
292 {
293 rc = fWeDDLprocessor->updateSyscolumnNextval(ibs, errMsg);
294 break;
295 }
296
297 case WE_SVR_UPDATE_SYSCOLUMN_DEFAULTVAL:
298 {
299 rc = fWeDDLprocessor->updateSyscolumnSetDefault(ibs, errMsg);
300 break;
301 }
302
303 case WE_SVR_UPDATE_SYSCOLUMN_TABLENAME:
304 {
305 rc = fWeDDLprocessor->updateSyscolumnTablename(ibs, errMsg);
306 break;
307 }
308
309 case WE_SVR_UPDATE_SYSCOLUMN_RENAMECOLUMN:
310 {
311 rc = fWeDDLprocessor->updateSyscolumnRenameColumn(ibs, errMsg);
312 break;
313 }
314
315 case WE_SVR_UPDATE_SYSCOLUMN_COLPOS:
316 {
317 rc = fWeDDLprocessor->updateSyscolumnColumnposCol(ibs, errMsg);
318 break;
319 }
320
321 case WE_SVR_UPDATE_SYSTABLE_AUTO:
322 {
323 rc = fWeDDLprocessor->updateSystableAuto(ibs, errMsg);
324 break;
325 }
326
327 case WE_SVR_UPDATE_SYSTABLE_TABLENAME:
328 {
329 rc = fWeDDLprocessor->updateSystableTablename(ibs, errMsg);
330 break;
331 }
332
333 case WE_SVR_UPDATE_SYSTABLES_TABLENAME:
334 {
335 rc = fWeDDLprocessor->updateSystablesTablename(ibs, errMsg);
336 break;
337 }
338
339 case WE_SVR_FILL_COLUMN:
340 {
341 rc = fWeDDLprocessor->fillNewColumn(ibs, errMsg);
342 break;
343 }
344
345 case WE_SVR_DROP_PARTITIONS:
346 {
347 rc = fWeDDLprocessor->dropPartitions(ibs, errMsg);
348 break;
349 }
350
351 case WE_SVR_WRITE_TRUNCATE:
352 {
353 rc = fWeDDLprocessor->writeTruncateLog(ibs, errMsg);
354 break;
355 }
356
357 case WE_SVR_WRITE_DROPPARTITION:
358 {
359 rc = fWeDDLprocessor->writeDropPartitionLog(ibs, errMsg);
360 break;
361 }
362
363 case WE_SVR_WRITE_DROPTABLE:
364 {
365 rc = fWeDDLprocessor->writeDropTableLog(ibs, errMsg);
366 break;
367 }
368
369 case WE_SVR_DELETE_DDLLOG:
370 {
371 rc = fWeDDLprocessor->deleteDDLLog(ibs, errMsg);
372 break;
373 }
374
375 case WE_SVR_FETCH_DDL_LOGS:
376 {
377 rc = fWeDDLprocessor->fetchDDLLog(ibs, errMsg);
378 break;
379 }
380
381 case WE_SVR_PURGEFD:
382 {
383 rc = fWeDMLprocessor->processPurgeFDCache(ibs, errMsg);
384 break;
385 }
386
387 case WE_END_TRANSACTION:
388 {
389 rc = fWeDMLprocessor->processEndTransaction(ibs, errMsg);
390 break;
391 }
392
393 case WE_SRV_FIX_ROWS:
394 {
395 rc = fWeDMLprocessor->processFixRows(ibs, errMsg, PMId);
396 break;
397 }
398
399 case WE_SVR_CLOSE_CONNECTION:
400 {
401 break;
402 }
403
404 default:
405 break;
406 }
407 }
408 catch (std::exception& ex)
409 {
410 logging::LoggingID logid(19, 0, 0);
411 logging::Message::Args args;
412 logging::Message msg(1);
413 args.add("we_readthread caught exception ");
414 args.add(ex.what());
415 msg.format(args);
416 logging::Logger logger(logid.fSubsysID);
417 logger.logMessage(logging::LOG_TYPE_ERROR, msg, logid);
418 rc = 1;
419 errMsg = msg.msg();
420 }
421 catch (...)
422 {
423 logging::LoggingID logid(19, 0, 0);
424 logging::Message::Args args;
425 logging::Message msg(1);
426 args.add("we_readthread caught ... exception ");
427 msg.format(args);
428 logging::Logger logger(logid.fSubsysID);
429 logger.logMessage(logging::LOG_TYPE_ERROR, msg, logid);
430 rc = 1;
431 errMsg = msg.msg();
432 }
433
434 if (msgId != WE_SVR_CLOSE_CONNECTION)
435 {
436 //send response
437 obs.restart();
438 obs << uniqueID;
439 obs << rc;
440 obs << errMsg;
441 }
442
443 if ((msgId == WE_SVR_COMMIT_BATCH_AUTO_ON) || (msgId == WE_SVR_BATCH_INSERT_END) || (msgId == WE_SVR_FETCH_DDL_LOGS) || (msgId == WE_SVR_GET_WRITTEN_LBIDS))
444 {
445 obs += ibs;
446 //cout << " sending back hwm info with ibs length " << endl;
447 }
448 else if ((msgId == WE_SVR_BATCH_INSERT) || (msgId == WE_SVR_UPDATE) || (msgId == WE_SVR_DELETE))
449 {
450 obs << PMId;
451 }
452 else if ((msgId == WE_SVR_DML_BULKROLLBACK) ||
453 (msgId == WE_SVR_DML_BULKROLLBACK_CLEANUP))
454 {
455 obs << Config::getLocalModuleID();
456 }
457
458 if (msgId == WE_SVR_UPDATE || msgId == WE_SVR_DELETE)
459 obs << blocksChanged; // send stats back to DMLProc
460
461 blocksChanged = 0; // reset
462
463 if (msgId == WE_SVR_CLOSE_CONNECTION)
464 {
465 //cout << "received request. closing connection ..." << endl;
466 break;
467 }
468 else
469 {
470 try
471 {
472 fIos.write(obs);
473 //cout << "dmlthread sent back response for msgid " << (uint32_t)msgId << " with uniqueID:rc= "
474 //<< (uint32_t)uniqueID<<":"<< (uint32_t)rc<<" and error message is " << errMsg <<endl;
475 //get next message
476 ibs = fIos.read();
477 }
478 catch (...)
479 {
480 break;
481 }
482 }
483 }
484
485 //cout << "closing connection for thread " << pthread_self () << endl;
486 fIos.close();
487 }
488
489
490 //-----------------------------------------------------------------------------
491 //ctor
SplitterReadThread(const messageqcpp::IOSocket & ios,ByteStream & Ibs)492 SplitterReadThread::SplitterReadThread(const messageqcpp::IOSocket& ios,
493 ByteStream& Ibs): ReadThread(ios), fWeDataLoader(*this)
494 {
495 fIbs = Ibs;
496 }
497
498 //-----------------------------------------------------------------------------
499 //copy ctor
SplitterReadThread::SplitterReadThread(const SplitterReadThread& rhs)
    : ReadThread(rhs.fIos)
    , fWeDataLoader(*this)  // constructed from this object, not copied from rhs
{
    this->fIbs = rhs.fIbs;
}
506 //-----------------------------------------------------------------------------
507 // dtor
~SplitterReadThread()508 SplitterReadThread::~SplitterReadThread()
509 {
510 }
511 //-----------------------------------------------------------------------------
512 /**
513 * @brief Thread Function which process incoming messages
514 *
515 */
operator ()()516 void SplitterReadThread::operator()()
517 {
518 ByteStream::byte msgId;
519
520 while (fIbs.length() > 0)
521 {
522 fWeDataLoader.updateRxBytes(fIbs.length());
523
524 //do work here...
525 fIbs >> msgId;
526
527 //cout << (int)msgId << endl;
528
529 switch (msgId)
530 {
531 case WE_CLT_SRV_KEEPALIVE:
532 {
533 fWeDataLoader.onReceiveKeepAlive(fIbs);
534 break;
535 }
536
537 case WE_CLT_SRV_DATA:
538 {
539 fWeDataLoader.onReceiveData(fIbs);
540 break;
541 }
542
543 case WE_CLT_SRV_EOD:
544 {
545 fWeDataLoader.onReceiveEod(fIbs);
546 break;
547 }
548
549 case WE_CLT_SRV_MODE:
550 {
551 fWeDataLoader.onReceiveMode(fIbs);
552 break;
553 }
554
555 case WE_CLT_SRV_IMPFILENAME:
556 {
557 fWeDataLoader.onReceiveImportFileName(fIbs);
558 break;
559 }
560
561 case WE_CLT_SRV_CMDLINEARGS:
562 {
563 fWeDataLoader.onReceiveCmdLineArgs(fIbs);
564 break;
565 }
566
567 case WE_CLT_SRV_CMD:
568 {
569 fWeDataLoader.onReceiveCmd(fIbs); //fig out share_ptr on BS& is better
570 break;
571 }
572
573 case WE_CLT_SRV_ACK:
574 {
575 fWeDataLoader.onReceiveAck(fIbs);
576 break;
577 }
578
579 case WE_CLT_SRV_NAK:
580 {
581 fWeDataLoader.onReceiveNak(fIbs);
582 break;
583 }
584
585 case WE_CLT_SRV_PM_ERROR:
586 {
587 fWeDataLoader.onReceiveError(fIbs);
588 break;
589 }
590
591 case WE_CLT_SRV_STARTCPI:
592 {
593 fWeDataLoader.onReceiveStartCpimport();
594 break;
595 }
596
597 case WE_CLT_SRV_BRMRPT:
598 {
599 fWeDataLoader.onReceiveBrmRptFileName(fIbs);
600 break;
601 }
602
603 case WE_CLT_SRV_CLEANUP:
604 {
605 fWeDataLoader.onReceiveCleanup(fIbs);
606 break;
607 }
608
609 case WE_CLT_SRV_ROLLBACK:
610 {
611 fWeDataLoader.onReceiveRollback(fIbs);
612 break;
613 }
614
615 case WE_CLT_SRV_JOBID:
616 {
617 fWeDataLoader.onReceiveJobId(fIbs);
618 break;
619 }
620
621 case WE_CLT_SRV_JOBDATA:
622 {
623 fWeDataLoader.onReceiveJobData(fIbs);
624 break;
625 }
626
627 case WE_CLT_SRV_ERRLOG:
628 {
629 fWeDataLoader.onReceiveErrFileRqst(fIbs);
630 break;
631 }
632
633 case WE_CLT_SRV_BADLOG:
634 {
635 fWeDataLoader.onReceiveBadFileRqst(fIbs);
636 break;
637 }
638
639 default:
640 break;
641
642 }
643
644 fIbs.restart();
645
646 try
647 {
648 //get next message
649 fIbs = fIos.read();
650 }
651 catch (...)
652 {
653 fIbs.restart(); //setting length=0, get out of loop
654 std::cout << "Broken Pipe" << std::endl;
655
656 logging::LoggingID logid(19, 0, 0);
657 logging::Message::Args args;
658 logging::Message msg(1);
659 args.add("SplitterReadThread::operator: Broken Pipe ");
660 msg.format(args);
661 logging::Logger logger(logid.fSubsysID);
662 logger.logMessage(logging::LOG_TYPE_ERROR, msg, logid);
663 }
664 }
665
666 fIos.close();
667
668 }
669
670 //------------------------------------------------------------------------------
671 // ClearTableLockReadThread constructor.
672 //------------------------------------------------------------------------------
ClearTableLockReadThread(const messageqcpp::IOSocket & ios,ByteStream & Ibs)673 ClearTableLockReadThread::ClearTableLockReadThread(
674 const messageqcpp::IOSocket& ios,
675 ByteStream& Ibs ): ReadThread(ios),
676 fWeClearTableLockCmd(new WE_ClearTableLockCmd("ClearTableLockTool"))
677 {
678 fIbs = Ibs;
679 }
680
681 //------------------------------------------------------------------------------
682 // ClearTableLockReadThread destructor.
683 //------------------------------------------------------------------------------
~ClearTableLockReadThread()684 ClearTableLockReadThread::~ClearTableLockReadThread()
685 {
686 }
687
688 //------------------------------------------------------------------------------
689 // Thread entry point to ClearTableLockReadThread object used to receive msgs
690 // from a cleartablelock tool client.
691 //------------------------------------------------------------------------------
operator ()()692 void ClearTableLockReadThread::operator()()
693 {
694 ByteStream::byte msgId;
695 ByteStream obs;
696 ByteStream::byte rc = 0;
697 std::string errMsg;
698
699 // Read msgid from ByteStream and forward to applicable processing function
700 while (fIbs.length() > 0)
701 {
702 fIbs >> msgId;
703
704 switch (msgId)
705 {
706 case WE_CLT_SRV_CLEAR_TABLE_LOCK:
707 {
708 rc = fWeClearTableLockCmd->processRollback(fIbs, errMsg);
709 break;
710 }
711
712 case WE_CLT_SRV_CLEAR_TABLE_LOCK_CLEANUP:
713 {
714 rc = fWeClearTableLockCmd->processCleanup(fIbs, errMsg);
715 break;
716 }
717
718 default:
719 {
720 break;
721 }
722 }
723
724 // Send response
725 obs.restart();
726 obs << rc;
727 obs << errMsg;
728
729 try
730 {
731 fIos.write(obs);
732
733 // Get next message
734 fIbs = fIos.read();
735 }
736 catch (...)
737 {
738 logging::LoggingID logid(19, 0, 0);
739 logging::Message::Args args;
740 logging::Message msg(1);
741 args.add("ClearTableLockReadThread::operator: Broken Pipe ");
742 msg.format(args);
743 logging::Logger logger(logid.fSubsysID);
744 logger.logMessage(logging::LOG_TYPE_ERROR, msg, logid);
745 break;
746 }
747 }
748
749 fIos.close();
750 }
751
752
753 //------------------------------------------------------------------------------
754 // RedistributeReadThread constructor.
755 //------------------------------------------------------------------------------
RedistributeReadThread(const messageqcpp::IOSocket & ios,ByteStream & Ibs)756 RedistributeReadThread::RedistributeReadThread(const messageqcpp::IOSocket& ios, ByteStream& Ibs)
757 : ReadThread(ios)
758 {
759 fIbs = Ibs;
760 }
761
762 //------------------------------------------------------------------------------
763 // RedistributeReadThread destructor.
764 //------------------------------------------------------------------------------
~RedistributeReadThread()765 RedistributeReadThread::~RedistributeReadThread()
766 {
767 }
768
769 //------------------------------------------------------------------------------
// Thread entry point to RedistributeReadThread object; passes the received
// msg and the socket to the Redistribute message handler.
772 //------------------------------------------------------------------------------
operator ()()773 void RedistributeReadThread::operator()()
774 {
775 try
776 {
777 redistribute::Redistribute::handleRedistributeMessage(fIbs, fIos);
778 }
779 catch (...)
780 {
781 logging::LoggingID logid(19, 0, 0);
782 logging::Message::Args args;
783 logging::Message msg(1);
784 args.add("RedistributeReadThread::operator exception handled ");
785 msg.format(args);
786 logging::Logger logger(logid.fSubsysID);
787 logger.logMessage(logging::LOG_TYPE_ERROR, msg, logid);
788 }
789
790 fIos.close();
791 }
792 //------------------------------------------------------------------------------
793 // GetFileSizeThread constructor.
794 //------------------------------------------------------------------------------
GetFileSizeThread(const messageqcpp::IOSocket & ios,ByteStream & Ibs,BRM::DBRM & dbrm)795 GetFileSizeThread::GetFileSizeThread(const messageqcpp::IOSocket& ios, ByteStream& Ibs, BRM::DBRM& dbrm)
796 : ReadThread(ios), fWeGetFileSizes(new WE_GetFileSizes())
797 {
798 fIbs = Ibs;
799 key = dbrm.getUnique32();
800 }
801
802 //------------------------------------------------------------------------------
803 // GetFileSizeThread destructor.
804 //------------------------------------------------------------------------------
~GetFileSizeThread()805 GetFileSizeThread::~GetFileSizeThread()
806 {
807 }
808
809 //------------------------------------------------------------------------------
810 // Thread entry point to GetFileSizeThread object used to receive msgs
811 //------------------------------------------------------------------------------
operator ()()812 void GetFileSizeThread::operator()()
813 {
814 ByteStream::byte msgId;
815 ByteStream obs;
816 ByteStream::byte rc = 0;
817 std::string errMsg;
818
819 // Read msgid from ByteStream and forward to applicable processing function
820 while (fIbs.length() > 0)
821 {
822 fIbs >> msgId;
823
824 switch (msgId)
825 {
826 case WE_SVR_GET_FILESIZES:
827 {
828 rc = fWeGetFileSizes->processTable(fIbs, errMsg, key);
829 break;
830 }
831
832 case WE_SVR_GET_FILESIZE:
833 {
834 rc = fWeGetFileSizes->processFileName(fIbs, errMsg, key);
835 break;
836 }
837
838 default:
839 {
840 break;
841 }
842 }
843
844 // Send response
845 obs.restart();
846 obs << rc;
847 obs << errMsg;
848 obs += fIbs;
849
850 try
851 {
852 fIos.write(obs);
853
854 // Get next message
855 fIbs = fIos.read();
856 }
857 catch (...)
858 {
859 logging::LoggingID logid(19, 0, 0);
860 logging::Message::Args args;
861 logging::Message msg(1);
862 args.add("GetFileSizeThread::operator: Broken Pipe ");
863 msg.format(args);
864 logging::Logger logger(logid.fSubsysID);
865 logger.logMessage(logging::LOG_TYPE_ERROR, msg, logid);
866 break;
867 }
868 }
869
870 fIos.close();
871 }
872
873 //-----------------------------------------------------------------------------
874
CreateReadThread(ThreadPool & Tp,IOSocket & Ios,BRM::DBRM & dbrm)875 void ReadThreadFactory::CreateReadThread(ThreadPool& Tp, IOSocket& Ios, BRM::DBRM& dbrm)
876 {
877 struct timespec rm_ts;
878 int sleepTime = 20000; // wait for 20 seconds
879 rm_ts.tv_sec = sleepTime / 1000;
880 rm_ts.tv_nsec = sleepTime % 1000 * 1000000;
881 bool isTimeOut = false;
882
883 ByteStream::byte msgId;
884 ByteStream aBs;
885
886 try
887 {
888 aBs = Ios.read(&rm_ts, &isTimeOut);
889 }
890 catch (std::exception& ex)
891 {
892 std::cout << "Handled : " << ex.what() << std::endl;
893 logging::LoggingID logid(19, 0, 0);
894 logging::Message::Args args;
895 logging::Message msg(1);
896 args.add("ReadThreadFactory::CreateReadThread: read() error");
897 args.add(ex.what());
898 msg.format(args);
899 logging::Logger logger(logid.fSubsysID);
900 logger.logMessage(logging::LOG_TYPE_ERROR, msg, logid);
901 }
902
903 if ((aBs.length() <= 0) || (isTimeOut))
904 {
905 Ios.close();
906 return;
907 }
908
909 aBs.peek(msgId);
910
911 switch (msgId)
912 {
913 case WE_SVR_DDL_KEEPALIVE:
914 case WE_SVR_DML_KEEPALIVE:
915 {
916 DmlReadThread dmlReadThread(Ios, aBs);
917 boost::thread t(dmlReadThread);
918 //cout << "starting DML thread id " << t.get_id() << endl;
919 }
920 break;
921
922 case WE_CLT_SRV_KEEPALIVE:
923 case WE_CLT_SRV_MODE:
924 case WE_CLT_SRV_DATA:
925 case WE_CLT_SRV_CMD:
926 case WE_CLT_SRV_ACK:
927 case WE_CLT_SRV_NAK:
928 case WE_CLT_SRV_PM_ERROR:
929 case WE_CLT_SRV_CMDLINEARGS:
930 {
931 //SplitterReadThread aSpReadThread(Ios, aBs);
932 //fOwner.attach(reinterpret_cast<Observer*>(&(aSpReadThread.fWeDataLoader)));
933 //Tp.invoke(aSpReadThread);
934 Tp.invoke(SplitterReadThread(Ios, aBs));
935 }
936 break;
937
938 case WE_CLT_SRV_CLEAR_TABLE_LOCK:
939 case WE_CLT_SRV_CLEAR_TABLE_LOCK_CLEANUP:
940 {
941 ClearTableLockReadThread clearTableLockThread(Ios, aBs);
942 Tp.invoke( clearTableLockThread );
943 }
944 break;
945
946 case WE_SVR_REDISTRIBUTE:
947 {
948 RedistributeReadThread RedistributeReadThread(Ios, aBs);
949 Tp.invoke(RedistributeReadThread);
950 }
951 break;
952
953 case WE_SVR_GET_FILESIZES:
954 case WE_SVR_GET_FILESIZE:
955 {
956 GetFileSizeThread getFileSizeThread(Ios, aBs, dbrm);
957 Tp.invoke(getFileSizeThread);
958 }
959 break;
960
961 default:
962 {
963 Ios.close(); // don't know who is this
964 }
965 break;
966
967 }
968
969 }
970
971 }
972
973