1 /* Copyright (C) 2014 InfiniDB, Inc.
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17 /*******************************************************************************
18 * $Id$
19 *
20 *******************************************************************************/
21 /*
22 * we_dataloader.cpp
23 *
24 * Created on: Oct 4, 2011
25 * Author: Boby Paul: bpaul@calpont.com
26 */
27
28 #include "mcsconfig.h" // Used to pickup STRERROR_R_CHAR_P definition
29
30 #include <cstdlib>
31 #include <csignal>
32 #include <cstring>
33 #include <cerrno>
34
35 #include <unistd.h> //pipe() && fork()
36 #if defined(__linux__)
37 #include <wait.h> //wait()
38 #elif defined(__FreeBSD__)
39 #include <sys/types.h>
40 #include <sys/stat.h> // For stat().
41 #include <sys/wait.h>
42 #include <sys/time.h>
43 #include <sys/resource.h>
44 #endif
45 #include <stdio.h>
46 #include <stdlib.h>
47 #include <time.h>
48 #include "bytestream.h"
49 #include "rwlock_local.h"
50
51 #include <iostream>
52 #include <fstream>
53 #include <vector>
54 #include <queue>
55 #include <string>
56 #include <map>
57 using namespace std;
58
59
60 #include <boost/thread/condition.hpp>
61 #include <boost/scoped_array.hpp>
62 #include <boost/thread.hpp>
63 #include <boost/filesystem.hpp>
64 using namespace boost;
65
66 #include "bytestream.h"
67 #include "messagequeue.h"
68 using namespace messageqcpp;
69
70 #include "we_messages.h"
71 #include "we_brmrprtparser.h"
72 #include "we_cleartablelockcmd.h"
73 #include "we_dataloader.h"
74 #include "we_readthread.h"
75
76 #include "installdir.h"
77
78 namespace WriteEngine
79 {
80
81 //bool WEDataLoader::fTearDownCpimport=false; // @bug 4267
82
83 //-----------------------------------------------------------------------------
84 /**
85 *
86 * @brief WEDataLoader::Constructor
87 *
88 **/
WEDataLoader(SplitterReadThread & Srt)89 WEDataLoader::WEDataLoader(SplitterReadThread& Srt ): fRef(Srt),
90 fMode(-1),
91 fDataDumpFile(),
92 fTxBytes(0),
93 fRxBytes(0),
94 fPmId(0),
95 fCh_pid(0),
96 fThis_pid(0),
97 fP_pid(0),
98 fpCfThread(0),
99 fTearDownCpimport(false), // @bug 4267
100 fWaitPidRc(0), // @bug 4267
101 fWaitPidStatus(0), // @bug 4267
102 fForceKill(false),
103 fPipeErr(false),
104 fpSysLog(0)
105 {
106 Config weConfig;
107 uint16_t localModuleId = weConfig.getLocalModuleID();
108 fPmId = static_cast<char>(localModuleId);
109
110 srand ( time(NULL) ); // initialize random seed
111 int aObjId = rand() % 10000 + 1; // generate a random number
112
113 setObjId(aObjId);
114
115 setupSignalHandlers();
116
117 if (!fpSysLog)
118 {
119 fpSysLog = SimpleSysLog::instance();
120 fpSysLog->setLoggingID(logging::LoggingID(SUBSYSTEM_ID_WE_SRV));
121 }
122 }
123 //-----------------------------------------------------------------------------
124 /**
125 *
126 * @brief WEDataLoader::Destructor
127 *
128 **/
129
~WEDataLoader()130 WEDataLoader::~WEDataLoader()
131 {
132
133 try
134 {
135 if (fDataDumpFile.is_open()) fDataDumpFile.close();
136
137 cout << "\tRx Bytes " << getRxBytes() << endl;
138 cout << "\tTX Bytes " << getTxBytes() << endl;
139
140 cout << "\tChild PID " << getChPid() << endl;
141
142 if (getChPid())
143 {
144 if (2 == getMode()) //@bug 5012
145 {
146 kill(getChPid(), SIGINT);
147 teardownCpimport(fTearDownCpimport);
148 }
149 else
150 {
151 teardownCpimport(false); // @bug 4267
152 }
153 }
154
155 }
156 catch (std::exception&) // @bug 4164: exception causing thread to exit
157 {
158 cout << "Error tearing down cpimport in WEDataLoader destructor" << endl;
159 }
160
161 //cout << "Leaving WEDataLoader destructor" << endl;
162 }
163
164 //------------------------------------------------------------------------------
165 // Initialize signal handling
166 //------------------------------------------------------------------------------
167
setupSignalHandlers()168 void WEDataLoader::setupSignalHandlers()
169 {
170 #ifndef _MSC_VER
171 signal(SIGPIPE, SIG_IGN);
172 signal(SIGCHLD, WEDataLoader::onSigChild);
173 #endif
174 }
175 //------------------------------------------------------------------------------
176 // handles on signal Terminate
177 //------------------------------------------------------------------------------
onSigChild(int aInt)178 void WEDataLoader::onSigChild(int aInt)
179 {
180 std::string aStr = "Received SIGCHLD of terminated process..";
181 cout << aStr << endl;
182 // fTearDownCpimport = true; // @bug 4267
183
184 // commented out for non-static variables
185 //ostringstream oss;
186 //oss << getObjId() <<" : " <<aStr;
187 //logging::Message::Args errMsgArgs;
188 //errMsgArgs.add(oss.str());
189 //fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
190
191 }
192
193
194 //-----------------------------------------------------------------------------
195 /**
196 *
197 * @brief WEDataLoader::update
198 *
199 **/
200
update(Subject * pSub)201 bool WEDataLoader::update(Subject* pSub)
202 {
203 return true;
204 }
205 //-----------------------------------------------------------------------------
206 /**
207 *
 * @brief setup cpimport as a separate process
209 *
210 **/
211
setupCpimport()212 bool WEDataLoader::setupCpimport() // fork the cpimport
213 {
214 pid_t aChPid;
215
216 errno = 0;
217
218 if (pipe(fFIFO) == -1)
219 {
220 int errnum = errno;
221 ostringstream oss;
222 oss << getObjId() << " : Error in creating pipe (errno-" <<
223 errnum << "); " << strerror(errnum);
224 throw runtime_error( oss.str() );
225 }
226
227 setPid(getpid());
228 setPPid(getppid());
229
230 errno = 0;
231 aChPid = fork();
232
233 if (aChPid == -1) //an error caused
234 {
235 int errnum = errno;
236 ostringstream oss;
237 oss << getObjId() << " : Error in forking cpimport.bin (errno-" <<
238 errnum << "); " << strerror(errnum);
239 throw runtime_error( oss.str() );
240 }
241 else if (aChPid == 0) // we are in child
242 {
243 int aStartFD = 3;
244 int aEndFD = fFIFO[1] + 256;
245 close(fFIFO[1]); //close the WRITER of CHILD
246
247 cout << "Child Process Info: PID = " << getpid()
248 << " (fFIFO[0], fFIFO[1]) = (" << fFIFO[0] << "," << fFIFO[1] << ")"
249 << " (StartFD, EndFD) = (" << aStartFD << "," << aEndFD << ")" << endl;
250
251 std::vector<char*> Cmds;
252 //str2Argv(fCmdLineStr, Cmds); // to avoid out-of-scope problem
253 std::string aCmdLine = fCmdLineStr;
254 std::istringstream ss(aCmdLine);
255 std::string arg;
256 std::vector<std::string> v2(20, "");
257 unsigned int i = 0;
258
259 while (ss >> arg)
260 {
261 v2[i++] = arg;
262 }
263
264 for (unsigned int j = 0; j < i; ++j)
265 {
266 Cmds.push_back(const_cast<char*>(v2[j].c_str()));
267 }
268
269 Cmds.push_back(0); //null terminate
270 //updatePrgmPath(Cmds);
271
272 //NOTE: for debugging
273 int aSize = Cmds.size();
274
275 for (int aIdx = 0; aIdx < aSize; ++aIdx)
276 {
277 cout << "Args " << Cmds[aIdx] << endl;
278 }
279
280 cout.flush();
281
282 close(0); //close stdin for the child
283 dup2(fFIFO[0], 0); //make stdin be the reading end of the pipe
284
285 //BUG 4410 : hacky solution so that CHLD process get EOF on close of pipe
286 for (int i = aStartFD; i < aEndFD; i++) close(i);
287
288 errno = 0;
289 int aRet = execvp(Cmds[0], &Cmds[0]); //NOTE - works with full Path
290 //int aRet = execvp(Cmds[0], &Cmds[0]); //NOTE - works if $PATH has cpimport
291
292 int execvErrno = errno;
293 cout << "Return status of cpimport is " << aRet << endl;
294 cout.flush();
295 close(fFIFO[0]); // will trigger an EOF on stdin
296 ostringstream oss;
297 oss << getObjId() << " : execv error: cpimport.bin invocation failed; "
298 << "(errno-" << errno << "); " << strerror(execvErrno) <<
299 "; Check file and try invoking locally.";
300 logging::Message::Args errMsgArgs;
301 errMsgArgs.add(oss.str());
302 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
303
304 if (aRet == -1) exit(-1);
305 }
306 else // parent
307 {
308 setChPid(aChPid); // This is the child PID
309 cout << "Child PID is " << this->getChPid() << endl;
310 close(fFIFO[0]); //close the READER of PARENT
311 // now we can send all the data thru FIFO[1], writer of PARENT
312 }
313
314 if (aChPid == 0)
315 cout << "******** Child finished its work ********" << endl;
316
317 return true;
318 }
319 //-----------------------------------------------------------------------------
320 /**
321 *
322 * @brief close all file handles opened for cpimport
323 * @brief wait for the cpimport process to finish work
324 *
325 **/
326
teardownCpimport(bool useStoredWaitPidStatus)327 void WEDataLoader::teardownCpimport(bool useStoredWaitPidStatus) // @bug 4267
328 {
329 fTearDownCpimport = false; //Reset it
330 //cout << "Tearing down Cpimport" << endl;
331 int aStatus;
332
333 //cout << "checking fpCfThread value" << endl;
334 if (fpCfThread)
335 {
336 //cout << "checking fpCfThread has a valid value" << endl;
337
338 //wait until we are done with the queued messages
339 while ((!fpCfThread->isMsgQueueEmpty()) && (!fpCfThread->isStopped()))
340 {
341 //cout << "DEBUG : MsgQueue not empty" << endl;
342 //cannot be too low, since there is a lock in isMsgQueueEmpty()
343 usleep(2000000);
344 }
345
346 // while(fpCfThread->isPushing())
347 // {
348 // cout << "DEBUG : still pushing" << endl;
349 // usleep(100000);
350 // }
351
352 if (fpSysLog)
353 {
354 ostringstream oss;
355 oss << getObjId() << " : Message Queue is empty; Stopping CF Thread";
356 logging::Message::Args errMsgArgs;
357 errMsgArgs.add(oss.str());
358 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
359 }
360
361 fpCfThread->stopThread();
362
363 while (!fpCfThread->isStopped())
364 {
365 cout << "DEBUG : still not stopped" << endl;
366 usleep(100000);
367 }
368
369 delete fpCfThread;
370 fpCfThread = 0;
371 }
372
373
374 closeWritePipe();
375 pid_t aPid;
376
377 // @bug 4267 begin: call waitpid() to get job status or use stored job status
378 // aPid = waitpid(getChPid(), &aStatus, 0); // wait until cpimport finishs
379 if (useStoredWaitPidStatus)
380 {
381 aPid = fWaitPidRc;
382 aStatus = fWaitPidStatus;
383 }
384 else
385 {
386 //aPid = waitpid(getChPid(), &aStatus, 0); // wait until cpimport finishs
387 aPid = waitpid(getChPid(), &aStatus, WNOHANG); // wait until cpimport finishs
388 int aIdx = 0;
389
390 while ((aPid == 0) && (aIdx < 25 * MAX_QSIZE)) //Do not loop infinitly
391 {
392 usleep(2000000);
393 aPid = waitpid(getChPid(), &aStatus, WNOHANG);
394 cout << "Inside tearDown waitpid rc[" << aIdx << "] = " << aPid << endl;
395 ++aIdx;
396 }
397 }
398
399 // @bug 4267 end // BP - added -1 as per DMC comment below
400 if ((aPid == getChPid()) || (aPid == -1)) // @bug 4267 (DMC-shouldn't we check for aPid of -1?)
401 {
402 setChPid(0);
403
404 if ((WIFEXITED(aStatus)) && (WEXITSTATUS(aStatus) == 0))
405 {
406 cout << "\tCpimport exit on success" << endl;
407 ostringstream oss;
408 oss << getObjId() << " : cpimport exit on success";
409 logging::Message::Args errMsgArgs;
410 errMsgArgs.add(oss.str());
411 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
412
413 onCpimportSuccess();
414 }
415 else
416 {
417 int termsig = (WIFSIGNALED(aStatus) ? WTERMSIG(aStatus) : -1);
418
419 if (!fForceKill)
420 {
421 cout << "\tCpimport exit on failure (signal " << termsig << ')' << endl;
422 ostringstream oss;
423 oss << getObjId() << " : cpimport exit on failure (signal " << termsig << ')';
424 logging::Message::Args errMsgArgs;
425 errMsgArgs.add(oss.str());
426 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
427 onCpimportFailure();
428 }
429 else
430 {
431 cout << "\tCpimport exit on Force Kill!!" << endl;
432 ostringstream oss;
433 oss << getObjId() << " : cpimport exit on Force kill!!";
434 logging::Message::Args errMsgArgs;
435 errMsgArgs.add(oss.str());
436 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
437 onCpimportSuccess();
438 }
439 }
440 }
441 }
442 //-----------------------------------------------------------------------------
443 /**
444 * @brief Push the data to cpimport from the incoming ByteStream
445 * @param Incoming ByteStream
446 *
447 */
448 //
pushData2Cpimport(ByteStream & Ibs)449 void WEDataLoader::pushData2Cpimport(ByteStream& Ibs)
450 {
451 if (Ibs.length() > 0)
452 {
453 int aLen = Ibs.length();
454 char* pStart = reinterpret_cast<char*>(Ibs.buf());
455 char* pEnd = pStart + aLen;
456 char* pPtr = pStart;
457
458 while (pPtr < pEnd)
459 {
460 // if(pEnd > (pPtr + MAX_LEN))
461 // {
462 // int aRet = write(fFIFO[1], pPtr, MAX_LEN);
463 // if(aRet == -1) throw runtime_error("Pipe write error");
464 // //write(fFIFO[1], Ibs.buf(), Ibs.length());
465 // pPtr += MAX_LEN;
466 // }
467 // else
468 // {
469 // int aStrLen = pEnd - pPtr;
470 // int aRet = write(fFIFO[1], pPtr, aStrLen);
471 // if(aRet == -1) throw runtime_error("Pipe write error");
472 // pPtr += aStrLen;
473 // }
474
475 try
476 {
477 int aRet = write(fFIFO[1], pPtr, pEnd - pPtr);
478
479 if (aRet < 0)
480 {
481 if (!fPipeErr)
482 {
483 int e = errno;
484 fPipeErr = true;
485 std::string aStr = "pushing data : PIPE error .........";
486
487 char errMsgBuf[160];
488 #if STRERROR_R_CHAR_P
489 const char* pErrMsg = strerror_r(
490 e, errMsgBuf, sizeof(errMsgBuf));
491
492 if (pErrMsg)
493 aStr += pErrMsg;
494
495 #else
496 int errMsgRc = strerror_r(e, errMsgBuf, sizeof(errMsgBuf));
497
498 if (errMsgRc == 0)
499 aStr += errMsgBuf;
500
501 #endif
502 logging::Message::Args errMsgArgs;
503 errMsgArgs.add(aStr);
504 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
505 }
506
507 throw runtime_error("Pipe Error - cpimport.bin exited already!!");
508 }
509
510 pPtr += aRet;
511 }
512 catch (...)
513 {
514 //std::string aStr = "pushing data PIPE error .........";
515 //logging::Message::Args errMsgArgs;
516 //errMsgArgs.add(aStr);
517 //fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
518 throw runtime_error("Pipe Error - cpimport.bin exited already!!");
519 }
520 }
521
522 }
523 }
524
525 //-----------------------------------------------------------------------------
526 /**
527 *
528 * @brief Close the pipe through which data was written to cpimport
529 * @brief This will also signal a EOF to the reading pipe.
530 *
531 **/
532
closeWritePipe()533 void WEDataLoader::closeWritePipe()
534 {
535 cout << "Going to close call = " << fFIFO[1] << endl;
536 //NOTE this will flush the file buffer and close it.
537 int aRet = close(fFIFO[1]); // will trigger EOD
538 cout << "----- closed both pipes -------- aRet = " << aRet << endl;
539 }
540
541 //-----------------------------------------------------------------------------
542 /**
543 *
544 * @brief Tokenize a string into char** argv format and store in a vector
545 * @brief we pass the V as arguments to exec cpimport
 * @param CmdLine is the string form of arguments delimited by space
547 * @param V vector which contains each element of argv
548 *
549 **/
550
str2Argv(std::string CmdLine,std::vector<char * > & V)551 void WEDataLoader::str2Argv(std::string CmdLine, std::vector<char*>& V)
552 {
553 std::istringstream ss(CmdLine);
554 std::string arg;
555 std::vector<std::string> v2;
556
557 while (ss >> arg)
558 {
559 v2.push_back(arg);
560 V.push_back(const_cast<char*>(v2.back().c_str()));
561 }
562
563 V.push_back(0); //null terminate
564 }
565
566 //-----------------------------------------------------------------------------
567 /**
568 *
569 * @brief Event to trigger when cpimport is successful.
570 *
571 **/
572
onCpimportSuccess()573 void WEDataLoader::onCpimportSuccess()
574 {
575
576 ByteStream obs;
577
578 cout << "Sending BRMRPT" << endl;
579 obs << (ByteStream::byte)WE_CLT_SRV_BRMRPT;
580 obs << (ByteStream::byte)fPmId; // PM id
581 // for testing
582 //std::string fRptFileName("ReportFile.txt");
583 BrmReportParser aBrmRptParser;
584 bool aRet = aBrmRptParser.serialize(fBrmRptFileName, obs);
585
586 if (aRet)
587 {
588 boost::mutex::scoped_lock aLock(fClntMsgMutex);
589 //aBrmRptParser.unserialize(obs); - was for testing
590 updateTxBytes(obs.length());
591
592 try
593 {
594 fRef.fIos.write(obs);
595 }
596 catch (...)
597 {
598 cout << "Broken Pipe .." << endl;
599
600 if (fpSysLog)
601 {
602 ostringstream oss;
603 oss << getObjId() << " : Broken Pipe : socket write failed ";
604 logging::Message::Args errMsgArgs;
605 errMsgArgs.add(oss.str());
606 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
607 }
608 }
609
610 aLock.unlock();
611 cout << "Finished Sending BRMRPT" << endl;
612 }
613 else
614 {
615 cout << "Failed to serialize BRMRpt " << endl;
616 }
617
618 // if(remove(fBrmRptFileName.c_str()) != 0)
619 // cout <<"Failed to delete BRMRpt File "<< fBrmRptFileName << endl;
620 //usleep(1000000); //sleep 1 second.
621
622 obs.reset();
623 obs << (ByteStream::byte)WE_CLT_SRV_CPIPASS;
624 obs << (ByteStream::byte)fPmId; // PM id
625 boost::mutex::scoped_lock aLock(fClntMsgMutex);
626 updateTxBytes(obs.length());
627
628 try
629 {
630 fRef.fIos.write(obs);
631 }
632 catch (...)
633 {
634 cout << "Broken Pipe .." << endl;
635
636 if (fpSysLog)
637 {
638 ostringstream oss;
639 oss << getObjId() << " : Broken Pipe : socket write failed ";
640 logging::Message::Args errMsgArgs;
641 errMsgArgs.add(oss.str());
642 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
643 }
644
645 }
646
647 aLock.unlock();
648
649 cout << "Sent CPIPASS info" << endl;
650
651 if (fpSysLog)
652 {
653 ostringstream oss;
654 oss << getObjId() << " : onCpimportSuccess BrmReport Send";
655 logging::Message::Args errMsgArgs;
656 errMsgArgs.add(oss.str());
657 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
658 }
659
660 }
661
662 //-----------------------------------------------------------------------------
663 /**
664 *
665 * @brief Event to trigger if a cpimport failure occurs.
666 *
667 **/
onCpimportFailure()668 void WEDataLoader::onCpimportFailure()
669 {
670 // Send failure notice back to the parent splitter job
671 sendCpimportFailureNotice();
672
673 //Even if we failed, we have failure info in BRMRPT
674 ByteStream obs;
675 obs << (ByteStream::byte)WE_CLT_SRV_BRMRPT;
676 obs << (ByteStream::byte)fPmId; // PM id
677 BrmReportParser aBrmRptParser;
678 bool aRet = aBrmRptParser.serialize(fBrmRptFileName, obs);
679
680 if (aRet)
681 {
682 boost::mutex::scoped_lock aLock(fClntMsgMutex);
683 updateTxBytes(obs.length());
684
685 try
686 {
687 fRef.fIos.write(obs);
688 }
689 catch (std::exception& ex)
690 {
691 cout << "Broken Pipe .." << ex.what() << endl;
692
693 if (fpSysLog)
694 {
695 ostringstream oss;
696 oss << getObjId() << " : Broken Pipe : socket write failed; " <<
697 ex.what();
698 logging::Message::Args errMsgArgs;
699 errMsgArgs.add(oss.str());
700 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR,
701 logging::M0000);
702 }
703 }
704
705 aLock.unlock();
706 cout << "Finished Sending BRMRPT" << endl;
707 }
708
709 if (remove(fBrmRptFileName.c_str()) != 0)
710 cout << "Failed to delete BRMRpt File " << fBrmRptFileName << endl;
711
712 if (fpSysLog)
713 {
714 ostringstream oss;
715 oss << getObjId() << " : onCpimportFailure BrmReport Send";
716 logging::Message::Args errMsgArgs;
717 errMsgArgs.add(oss.str());
718 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
719 }
720 }
721
722 //-----------------------------------------------------------------------------
723 // Send msg to front-end splitter to notify it that a cpimport.bin pgm failed.
724 //-----------------------------------------------------------------------------
sendCpimportFailureNotice()725 void WEDataLoader::sendCpimportFailureNotice()
726 {
727 ByteStream obs;
728 obs << (ByteStream::byte)WE_CLT_SRV_CPIFAIL;
729 obs << (ByteStream::byte)fPmId; // PM id
730 boost::mutex::scoped_lock aLock(fClntMsgMutex);
731 updateTxBytes(obs.length());
732
733 try
734 {
735 fRef.fIos.write(obs);
736 }
737 catch (...)
738 {
739 cout << "Broken Pipe .." << endl;
740
741 if (fpSysLog)
742 {
743 ostringstream oss;
744 oss << getObjId() << " : Broken Pipe : socket write failed ";
745 logging::Message::Args errMsgArgs;
746 errMsgArgs.add(oss.str());
747 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
748 }
749 }
750 }
751
752 //-----------------------------------------------------------------------------
753 /**
754 * @brief Event when a KEEPALIVE arrives.
755 * @param Incoming ByteStream, not used currently
756 */
onReceiveKeepAlive(ByteStream & Ibs)757 void WEDataLoader::onReceiveKeepAlive(ByteStream& Ibs)
758 {
759 /*
760 // TODO comment out when we done with debug
761 if(fpSysLog)
762 {
763 ostringstream oss;
764 oss << getObjId() <<" : Received KEEPALIVE";
765 logging::Message::Args errMsgArgs;
766 errMsgArgs.add(oss.str());
767 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
768 }
769 */
770 //cout << "Received KEEPALIVE" << endl;
771 // NOTE only seldom a KEEPALIVE receives,
772 // so nothing wrong in responding with a KEEPALIVE.
773 ByteStream obs;
774 obs << (ByteStream::byte)WE_CLT_SRV_KEEPALIVE;
775 obs << (ByteStream::byte)fPmId; // PM id
776 boost::mutex::scoped_lock aLock(fClntMsgMutex);
777 updateTxBytes(obs.length());
778
779 try
780 {
781 fRef.fIos.write(obs);
782 }
783 catch (...)
784 {
785 cout << "Broken Pipe .." << endl;
786
787 if (fpSysLog)
788 {
789 ostringstream oss;
790 oss << getObjId() << ": Broken Pipe : socket write failed ";
791 logging::Message::Args errMsgArgs;
792 errMsgArgs.add(oss.str());
793 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
794 }
795 }
796
797 aLock.unlock();
798
799 // @bug 4267 begin
800 int aStatus;
801 pid_t aPid;
802
803 if (getChPid() > 0)
804 {
805 aPid = waitpid(getChPid(), &aStatus, WNOHANG); // wait until cpimport finishs
806
807 if (aPid != 0)
808 {
809 cout << "waitpid(" << getChPid() << "): rc-" << aPid <<
810 "; status-" << aStatus << "; exited-" <<
811 (WIFEXITED(aStatus)) << endl;
812 }
813
814 if ((aPid == getChPid()) || (aPid == -1))
815 {
816 fTearDownCpimport = true;
817 fWaitPidRc = aPid;
818 fWaitPidStatus = aStatus;
819 }
820 }
821
822 // @bug 4267 end
823
824 if (fTearDownCpimport)
825 {
826 //fTearDownCpimport = false; //Reset it // commented out to use the flag in EOD
827 cout << "Cpimport terminated " << endl;
828
829 if (0 == getMode()) onReceiveEod(Ibs);
830 else if (1 == getMode()) //mode 1 has to drive from UM
831 {
832 ByteStream obs;
833 obs << (ByteStream::byte)WE_CLT_SRV_EOD;
834 obs << (ByteStream::byte)fPmId; // PM id
835 boost::mutex::scoped_lock aLock(fClntMsgMutex);
836 updateTxBytes(obs.length());
837
838 try
839 {
840 fRef.fIos.write(obs);
841 }
842 catch (...)
843 {
844 cout << "Broken Pipe .." << endl;
845
846 if (fpSysLog)
847 {
848 ostringstream oss;
849 oss << getObjId() << ": Broken Pipe : socket write failed ";
850 logging::Message::Args errMsgArgs;
851 errMsgArgs.add(oss.str());
852 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
853 }
854 }
855
856 aLock.unlock();
857 }
858 else if (2 == getMode())
859 {
860 //if(getChPid()) teardownCpimport(true); // @bug 4267
861 if (getChPid()) teardownCpimport(fTearDownCpimport); // @bug 4267 BP
862 }
863 }
864 else
865 {
866 //if(1 == getMode())
867 // if(fpCfThread) cout << "Queue Size = " << fpCfThread->getQueueSize() << endl;
868 }
869
870 }
871 //-----------------------------------------------------------------------------
872 /**
873 * @brief trigger when a DATA arrives.
874 * @param Incoming ByteStream which contains data
875 */
876
onReceiveData(ByteStream & Ibs)877 void WEDataLoader::onReceiveData(ByteStream& Ibs)
878 {
879
880 if ((0 == getMode()) && (fDataDumpFile.is_open()))
881 {
882 // Will write to the output file.
883 fDataDumpFile << Ibs;
884 sendDataRequest();
885 }
886 else if ( 1 == getMode())
887 {
888 // commented out since we are going to use seperate thread
889 //pushData2Cpimport(Ibs);
890
891 if (fpCfThread)
892 {
893 fpCfThread->add2MsgQueue(Ibs);
894 //sendDataRequest(); // Need to control Queue Size
895 // Bug 5031 : Will only send 1 rqst for a batch to cpimport.bin
896 //if(fpCfThread->getQueueSize()<MIN_QSIZE) sendDataRequest();
897 //if(fpCfThread->getQueueSize()<MAX_QSIZE) sendDataRequest();
898
899 int aQsz = (fpCfThread) ? fpCfThread->getQueueSize() : 0;
900
901 // Bug 5031 : If Q size goes above 100 (2*250);
902 if (aQsz < MAX_QSIZE) sendDataRequest();
903
904 if (aQsz > 1.5 * MAX_QSIZE) // > 2*250
905 {
906 cout << "WARNING : Data Queuing up : QSize = " << aQsz << endl;
907
908 if (fpSysLog)
909 {
910 ostringstream oss;
911 oss << getObjId() << "WARNING : Data Queuing up : QSize = " << aQsz;
912 logging::Message::Args errMsgArgs;
913 errMsgArgs.add(oss.str());
914 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
915 }
916 }
917 }
918
919 }
920 else if ( 2 == getMode())
921 {
922 cout << "onReceiveData : In Mode 2 NO data suppose to receive" << endl;
923 }
924
925
926 // ByteStream obs;
927 // obs << (ByteStream::byte)WE_CLT_SRV_DATARQST;
928 // obs << (ByteStream::byte)fPmId; // PM id
929 // updateTxBytes(obs.length());
930 // fRef.fIos.write(obs);
931 }
932
933 //-----------------------------------------------------------------------------
934 /**
935 * @brief trigger when a EOD arrives.
 * @param Incoming ByteStream; not relevant for now
937 */
onReceiveEod(ByteStream & Ibs)938 void WEDataLoader::onReceiveEod(ByteStream& Ibs)
939 {
940 if (fpSysLog)
941 {
942 ostringstream oss;
943 oss << getObjId() << " : onReceiveEOD : child ID = " << getChPid();
944 logging::Message::Args errMsgArgs;
945 errMsgArgs.add(oss.str());
946 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
947 }
948
949 cout << "Received EOD " << endl;
950
951 if (0 == getMode())
952 {
953 fDataDumpFile.close();
954 }
955
956 ByteStream obs;
957 obs << (ByteStream::byte)WE_CLT_SRV_EOD;
958 obs << (ByteStream::byte)fPmId; // PM id
959 boost::mutex::scoped_lock aLock(fClntMsgMutex);
960 updateTxBytes(obs.length());
961
962 try
963 {
964 fRef.fIos.write(obs);
965 }
966 catch (...)
967 {
968 cout << "Broken Pipe .." << endl;
969
970 if (fpSysLog)
971 {
972 ostringstream oss;
973 oss << getObjId() << " : Broken Pipe : socket write failed ";
974 logging::Message::Args errMsgArgs;
975 errMsgArgs.add(oss.str());
976 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
977 }
978 }
979
980 aLock.unlock();
981
982 //if(( 1 == getMode())||( 2 == getMode()))
983 if (1 == getMode()) //BUG 4370 - seperated mode 1 & 2
984 {
985 //if(getChPid()) teardownCpimport(false); // @bug 4267
986 if (getChPid()) teardownCpimport(fTearDownCpimport); // @bug 4267 //BP changed to send the correct flag
987 }
988 else if (2 == getMode()) //BUG 4370
989 {
990 if (getChPid())
991 {
992 kill(getChPid(), SIGINT); //BUG 4370
993 fForceKill = true;
994 teardownCpimport(fTearDownCpimport); //BUG 4370
995 }
996 }
997
998
999 }
1000 //-----------------------------------------------------------------------------
1001 /**
1002 * @brief Event on Command Received. It should contain sub commands
1003 * @param Incoming ByteStream, will have sub commands
1004 */
onReceiveCmd(ByteStream & bs)1005 void WEDataLoader::onReceiveCmd(ByteStream& bs)
1006 {
1007 //TODO - can be cpimport cmd or server cmd, for now write to a file
1008 ByteStream::byte aCmdId;
1009 bs >> aCmdId;
1010 }
1011 //-----------------------------------------------------------------------------
1012 /**
1013 * @brief The mode in which WES running.
 * @param Incoming ByteStream, not relevant
1015 */
onReceiveMode(ByteStream & Ibs)1016 void WEDataLoader::onReceiveMode(ByteStream& Ibs)
1017 {
1018 // Assigning it here since WEDataLoader constructor is called multiple times
1019 // while coping in readthread class.
1020 if (!fpSysLog)
1021 {
1022 fpSysLog = SimpleSysLog::instance();
1023 fpSysLog->setLoggingID(logging::LoggingID(SUBSYSTEM_ID_WE_SRV));
1024 }
1025
1026 Ibs >> (ByteStream::quadbyte&)fMode;
1027 cout << "Setting fMode = " << fMode << endl;
1028
1029 if (fpSysLog)
1030 {
1031 ostringstream oss;
1032 oss << getObjId() << " : onReceiveMode() Setting fMode = " << fMode;
1033 logging::Message::Args errMsgArgs;
1034 errMsgArgs.add(oss.str());
1035 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
1036 }
1037
1038 char aName[64];
1039 snprintf(aName, sizeof(aName), "ModuleDBRootCount%d-3", fPmId);
1040
1041 string aStrDbRootCnt = config::Config::makeConfig()->getConfig(
1042 "SystemModuleConfig", aName);
1043 cout << "DbRootCnt = " << aStrDbRootCnt << endl;
1044 ByteStream::byte aDbCnt = (ByteStream::byte)atoi(aStrDbRootCnt.c_str());
1045
1046 if (fpSysLog)
1047 {
1048 ostringstream oss;
1049 oss << getObjId() << " : onReceiveMode() DbRoot Count = " + aStrDbRootCnt;
1050 logging::Message::Args errMsgArgs;
1051 errMsgArgs.add(oss.str());
1052 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
1053 }
1054
1055
1056 //Send No. of DBRoots to Client
1057 ByteStream aObs;
1058 aObs << (ByteStream::byte)WE_CLT_SRV_DBRCNT;
1059 aObs << (ByteStream::byte)fPmId;
1060 aObs << (ByteStream::byte)aDbCnt;
1061 boost::mutex::scoped_lock aLock(fClntMsgMutex);
1062 updateTxBytes(aObs.length());
1063
1064 try
1065 {
1066 fRef.fIos.write(aObs);
1067 }
1068 catch (...)
1069 {
1070 cout << "Broken Pipe .." << endl;
1071
1072 if (fpSysLog)
1073 {
1074 ostringstream oss;
1075 oss << getObjId() << " : Broken Pipe : socket write failed ";
1076 logging::Message::Args errMsgArgs;
1077 errMsgArgs.add(oss.str());
1078 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
1079 }
1080
1081 }
1082
1083 aLock.unlock();
1084
1085 }
1086 //-----------------------------------------------------------------------------
1087 /**
1088 * @brief Acknowledgment. Not relevant at this point of time.
1089 * @brief Can make use to update the BRM
1090 * @param Incoming ByteStream, not relevant for now
1091 */
1092
onReceiveAck(ByteStream & Ibs)1093 void WEDataLoader::onReceiveAck(ByteStream& Ibs)
1094 {
1095 // All is good
1096 // update the status
1097 }
1098 //-----------------------------------------------------------------------------
1099 /**
1100 * @brief NAK. A Failure, Rollback should be initiated.
1101 * @brief Can make use to update the BRM
1102 * @param Incoming ByteStream, not relevant for now
1103 */
onReceiveNak(ByteStream & Ibs)1104 void WEDataLoader::onReceiveNak(ByteStream& Ibs)
1105 {
1106 // TODO - handle the problem
1107 }
1108 //-----------------------------------------------------------------------------
1109 /**
1110 * @brief ERROR. A Failure, Rollback should be initiated.
1111 * @brief Can make use to update the BRM
1112 * @param Incoming ByteStream, not relevant for now
1113 */
onReceiveError(ByteStream & Ibs)1114 void WEDataLoader::onReceiveError(ByteStream& Ibs)
1115 {
1116 // TODO - handle the failure situation.
1117 }
1118 //------------------------------------------------------------------------------
1119 // onReceiveCmdLineArgs - do what ever need to do with command line args
1120 //------------------------------------------------------------------------------
1121 /**
1122 * @brief Command line args received.
1123 * @brief Can make use to update the BRM
1124 * @param Incoming ByteStream, not relevant for now
1125 */
1126
onReceiveCmdLineArgs(ByteStream & Ibs)1127 void WEDataLoader::onReceiveCmdLineArgs(ByteStream& Ibs)
1128 {
1129 Ibs >> fCmdLineStr;
1130 cout << "CMD LINE ARGS came in " << fCmdLineStr << endl;
1131
1132 if (fpSysLog)
1133 {
1134 ostringstream oss;
1135 oss << getObjId() << " : CMD LINE ARGS came in " << fCmdLineStr;
1136 logging::Message::Args errMsgArgs;
1137 errMsgArgs.add(oss.str());
1138 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
1139 }
1140
1141 ByteStream obs;
1142
1143 //TODO - Need to check all clear for starting CPI
1144 if (0 != getMode())
1145 {
1146 obs << (ByteStream::byte)WE_CLT_SRV_STARTCPI;
1147 }
1148 else
1149 {
1150 obs << (ByteStream::byte)WE_CLT_SRV_DATARQST;
1151 }
1152
1153 obs << (ByteStream::byte) fPmId; // PM id
1154 boost::mutex::scoped_lock aLock(fClntMsgMutex);
1155 updateTxBytes(obs.length());
1156
1157 try
1158 {
1159 fRef.fIos.write(obs);
1160 }
1161 catch (...)
1162 {
1163 cout << "Broken Pipe .." << endl;
1164
1165 if (fpSysLog)
1166 {
1167 ostringstream oss;
1168 oss << getObjId() << " : Broken Pipe : socket write failed ";
1169 logging::Message::Args errMsgArgs;
1170 errMsgArgs.add(oss.str());
1171 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
1172 }
1173 }
1174
1175 aLock.unlock();
1176
1177 }
1178
1179 //-----------------------------------------------------------------------------
1180
onReceiveStartCpimport()1181 void WEDataLoader::onReceiveStartCpimport()
1182 {
1183 cout << "Start Cpimport command reached!!" << endl;
1184
1185 if (fpSysLog)
1186 {
1187 ostringstream oss;
1188 oss << getObjId() << " : Start Cpimport command reached!!";
1189 logging::Message::Args errMsgArgs;
1190 errMsgArgs.add(oss.str());
1191 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
1192 }
1193
1194 try
1195 {
1196 setupCpimport();
1197
1198 if (1 == getMode()) // create a thread to handle the data feeding part
1199 {
1200 fpCfThread = new WECpiFeederThread(*this);
1201 fpCfThread->startFeederThread();
1202 }
1203 }
1204 catch (std::exception& ex)
1205 {
1206 // send an CPI FAIL command back to splitter
1207 if (fpSysLog)
1208 {
1209 logging::Message::Args errMsgArgs;
1210 errMsgArgs.add(ex.what());
1211 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
1212
1213 sendCpimportFailureNotice();
1214 return;
1215 }
1216 }
1217
1218 if (1 == getMode()) // In mode 2/0 we do not rqst data.
1219 {
1220 sendDataRequest();
1221 }
1222
1223 // We need to respond to KEEP ALIVES
1224 //else if(2 == getMode()) // Now we wait till cpimport comes back
1225 //{
1226 // if(getChPid())
1227 // teardownCpimport();
1228 //}
1229
1230
1231 }
1232
1233 //-----------------------------------------------------------------------------
1234
onReceiveBrmRptFileName(ByteStream & Ibs)1235 void WEDataLoader::onReceiveBrmRptFileName(ByteStream& Ibs)
1236 {
1237 Ibs >> fBrmRptFileName;
1238 cout << "Brm Rpt Filename Arrived " << fBrmRptFileName << endl;
1239
1240 //BUG 4645
1241 string::size_type idx = fBrmRptFileName.find_last_of('/');
1242
1243 if (idx > 0 && idx < string::npos)
1244 {
1245 string dirname(fBrmRptFileName, 0, idx);
1246 struct stat st;
1247
1248 if (stat(dirname.c_str(), &st) != 0)
1249 {
1250 cout << "Creating directory : " << dirname << endl;
1251 boost::filesystem::create_directories(dirname.c_str());
1252 }
1253 }
1254
1255 if (fpSysLog)
1256 {
1257 ostringstream oss;
1258 oss << getObjId() << " : Brm Rpt Filename Arrived " << fBrmRptFileName;
1259 logging::Message::Args errMsgArgs;
1260 errMsgArgs.add(oss.str());
1261 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
1262 }
1263
1264 }
1265
1266 //-----------------------------------------------------------------------------
1267
onReceiveCleanup(ByteStream & Ibs)1268 void WEDataLoader::onReceiveCleanup(ByteStream& Ibs)
1269 {
1270 if (fpSysLog)
1271 {
1272 ostringstream oss;
1273 oss << getObjId() << " : OnReceiveCleanup arrived";
1274 logging::Message::Args errMsgArgs;
1275 errMsgArgs.add(oss.str());
1276 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
1277 }
1278
1279 std::string aErrMsg;
1280
1281 WE_ClearTableLockCmd aClrTblLockCmd("DataLoader");
1282 int aRet = aClrTblLockCmd.processCleanup(Ibs, aErrMsg);
1283
1284 ByteStream obs;
1285 obs << (ByteStream::byte)WE_CLT_SRV_CLEANUP;
1286 obs << (ByteStream::byte) fPmId; // PM id
1287
1288 if (aRet == 0)
1289 obs << (ByteStream::byte)1; // cleanup success
1290 else
1291 obs << (ByteStream::byte)0; // cleanup failed
1292
1293 boost::mutex::scoped_lock aLock(fClntMsgMutex);
1294 updateTxBytes(obs.length());
1295
1296 try
1297 {
1298 fRef.fIos.write(obs);
1299 }
1300 catch (...)
1301 {
1302 cout << "Broken Pipe .." << endl;
1303
1304 if (fpSysLog)
1305 {
1306 ostringstream oss;
1307 oss << getObjId() << " : Broken Pipe : socket write failed ";
1308 logging::Message::Args errMsgArgs;
1309 errMsgArgs.add(oss.str());
1310 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
1311 }
1312
1313 }
1314
1315 aLock.unlock();
1316
1317 }
1318
1319
1320 //-----------------------------------------------------------------------------
1321
1322
1323
onReceiveRollback(ByteStream & Ibs)1324 void WEDataLoader::onReceiveRollback(ByteStream& Ibs)
1325 {
1326 if (fpSysLog)
1327 {
1328 ostringstream oss;
1329 oss << getObjId() << " : OnReceiveRollback arrived";
1330 logging::Message::Args errMsgArgs;
1331 errMsgArgs.add(oss.str());
1332 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
1333 }
1334
1335
1336 std::string aErrMsg;
1337
1338 WE_ClearTableLockCmd aClrTblLockCmd("DataLoader");
1339 int aRet = aClrTblLockCmd.processRollback(Ibs, aErrMsg);
1340
1341 ByteStream obs;
1342 obs << (ByteStream::byte)WE_CLT_SRV_ROLLBACK;
1343 obs << (ByteStream::byte) fPmId; // PM id
1344
1345 if (aRet == 0)
1346 obs << (ByteStream::byte)1; // Rollback success
1347 else
1348 obs << (ByteStream::byte)0; // Rollback failed
1349
1350 boost::mutex::scoped_lock aLock(fClntMsgMutex);
1351 updateTxBytes(obs.length());
1352
1353 try
1354 {
1355 fRef.fIos.write(obs);
1356 }
1357 catch (...)
1358 {
1359 cout << "Broken Pipe .." << endl;
1360
1361 if (fpSysLog)
1362 {
1363 ostringstream oss;
1364 oss << getObjId() << " : Broken Pipe : socket write failed ";
1365 logging::Message::Args errMsgArgs;
1366 errMsgArgs.add(oss.str());
1367 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
1368 }
1369 }
1370
1371 aLock.unlock();
1372
1373 }
1374 //-----------------------------------------------------------------------------
1375
1376
onReceiveImportFileName(ByteStream & Ibs)1377 void WEDataLoader::onReceiveImportFileName(ByteStream& Ibs)
1378 {
1379 bool aGoodFile = true;
1380 std::string aFileName;
1381 Ibs >> aFileName;
1382
1383 //BUG 4245 : Need to check the file or path exists
1384 {
1385 std::fstream aFin;
1386 aFin.open(aFileName.c_str(), std::ios::in);
1387
1388 if (aFin.is_open()) // File exist, send an ERROR immediately
1389 {
1390 // file exists
1391 ByteStream obs;
1392 obs << (ByteStream::byte)WE_CLT_SRV_IMPFILEERROR;
1393 obs << (ByteStream::byte)fPmId;
1394 updateTxBytes(obs.length());
1395 boost::mutex::scoped_lock aLock(fClntMsgMutex);
1396
1397 try
1398 {
1399 fRef.fIos.write(obs);
1400 }
1401 catch (...)
1402 {
1403 cout << "Broken Pipe .." << endl;
1404
1405 if (fpSysLog)
1406 {
1407 ostringstream oss;
1408 oss << getObjId() << " : Broken Pipe : socket write failed ";
1409 logging::Message::Args errMsgArgs;
1410 errMsgArgs.add(oss.str());
1411 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
1412 }
1413
1414 }
1415
1416 aGoodFile = false;
1417 aLock.unlock();
1418 }
1419
1420 aFin.close();
1421 }
1422
1423 if (aGoodFile)
1424 {
1425 fDataDumpFile.open(aFileName.c_str(), std::ios::app);
1426
1427 //BUG 4245 : If file dir is not existing, we need to fail this import
1428 if (!fDataDumpFile.good())
1429 {
1430 ByteStream obs;
1431 obs << (ByteStream::byte)WE_CLT_SRV_IMPFILEERROR;
1432 obs << (ByteStream::byte)fPmId; // PM id
1433 boost::mutex::scoped_lock aLock(fClntMsgMutex);
1434 updateTxBytes(obs.length());
1435
1436 try
1437 {
1438 fRef.fIos.write(obs);
1439 }
1440 catch (...)
1441 {
1442 cout << "Broken Pipe .." << endl;
1443
1444 if (fpSysLog)
1445 {
1446 ostringstream oss;
1447 oss << getObjId() << " : Broken Pipe : socket write failed ";
1448 logging::Message::Args errMsgArgs;
1449 errMsgArgs.add(oss.str());
1450 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
1451 }
1452 }
1453
1454 aLock.unlock();
1455 }
1456 }
1457
1458 }
1459 //------------------------------------------------------------------------------
1460
onReceiveJobId(ByteStream & Ibs)1461 void WEDataLoader::onReceiveJobId(ByteStream& Ibs)
1462 {
1463 std::string aJobFileName;
1464 Ibs >> aJobFileName;
1465
1466 cout << "Incoming JobFileName : " << aJobFileName << endl;
1467
1468 //BUG 4645
1469 string::size_type idx = aJobFileName.find_last_of('/');
1470
1471 if (idx > 0 && idx < string::npos)
1472 {
1473 string dirname(aJobFileName, 0, idx);
1474 struct stat st;
1475
1476 if (stat(dirname.c_str(), &st) != 0)
1477 {
1478 cout << "Creating directory : " << dirname << endl;
1479 boost::filesystem::create_directories(dirname.c_str());
1480 }
1481 }
1482
1483 fJobFile.open(aJobFileName.c_str());
1484
1485 }
1486
1487 //------------------------------------------------------------------------------
1488
onReceiveJobData(ByteStream & Ibs)1489 void WEDataLoader::onReceiveJobData(ByteStream& Ibs)
1490 {
1491 // Will write to the output file.
1492 std::string aData;
1493 Ibs >> aData;
1494 fJobFile << aData;
1495 fJobFile.close();
1496 }
1497
1498 //------------------------------------------------------------------------------
1499
onReceiveErrFileRqst(ByteStream & Ibs)1500 void WEDataLoader::onReceiveErrFileRqst(ByteStream& Ibs)
1501 {
1502 std::string aErrFileName;
1503 Ibs >> aErrFileName;
1504 cout << "Error Filename Arrived " << aErrFileName << endl;
1505
1506 ByteStream obs;
1507 obs << (ByteStream::byte)WE_CLT_SRV_ERRLOG;
1508 obs << (ByteStream::byte) fPmId; // PM id
1509 obs << aErrFileName;
1510 BrmReportParser aErrFileParser;
1511 bool aRet = aErrFileParser.serialize(aErrFileName, obs);
1512
1513 if (aRet)
1514 {
1515 boost::mutex::scoped_lock aLock(fClntMsgMutex);
1516 updateTxBytes(obs.length());
1517
1518 try
1519 {
1520 fRef.fIos.write(obs);
1521 }
1522 catch (...)
1523 {
1524 cout << "Broken Pipe .." << endl;
1525
1526 if (fpSysLog)
1527 {
1528 ostringstream oss;
1529 oss << getObjId() << " : Broken Pipe : socket write failed ";
1530 logging::Message::Args errMsgArgs;
1531 errMsgArgs.add(oss.str());
1532 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
1533 }
1534 }
1535
1536 aLock.unlock();
1537 }
1538
1539 // delete the temp files
1540 if (remove(aErrFileName.c_str()) != 0 )
1541 cout << "Failed in removing Error file: " << aErrFileName << endl;
1542 }
1543
1544
1545
1546 //------------------------------------------------------------------------------
// Process the receipt of a msg requesting a *.bad file: the msg carries the
// file name, and the file's contents are serialized and sent back.
1548 //------------------------------------------------------------------------------
onReceiveBadFileRqst(ByteStream & Ibs)1549 void WEDataLoader::onReceiveBadFileRqst(ByteStream& Ibs)
1550 {
1551 std::string aBadFileName;
1552 Ibs >> aBadFileName;
1553 cout << "Error Filename Arrived " << aBadFileName << endl;
1554
1555 ByteStream obs;
1556 obs << (ByteStream::byte)WE_CLT_SRV_BADLOG;
1557 obs << (ByteStream::byte) fPmId; // PM id
1558 obs << aBadFileName;
1559 BrmReportParser aBadFileParser;
1560 bool aRet = aBadFileParser.serializeBlocks(aBadFileName, obs);
1561
1562 if (aRet)
1563 {
1564 boost::mutex::scoped_lock aLock(fClntMsgMutex);
1565 updateTxBytes(obs.length());
1566
1567 try
1568 {
1569 fRef.fIos.write(obs);
1570 }
1571 catch (...)
1572 {
1573 cout << "Broken Pipe .." << endl;
1574
1575 if (fpSysLog)
1576 {
1577 ostringstream oss;
1578 oss << getObjId() << " : Broken Pipe : socket write failed ";
1579 logging::Message::Args errMsgArgs;
1580 errMsgArgs.add(oss.str());
1581 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
1582 }
1583 }
1584
1585 aLock.unlock();
1586 }
1587
1588 // delete the temp files
1589 if ( remove(aBadFileName.c_str()) != 0)
1590 cout << "Failed in removing Error file: " << aBadFileName << endl;
1591 }
1592
1593
1594 //------------------------------------------------------------------------------
1595
sendDataRequest()1596 void WEDataLoader::sendDataRequest()
1597 {
1598 int aQsz = (fpCfThread) ? fpCfThread->getQueueSize() : 0;
1599
1600 //if(aQsz>MIN_QSIZE)
1601 // Bug 5031 : If Q size goes above 100 (2*50); there is some thing wrong
1602 // will put a warning in info log. Controlled in Cpimport init data rqst cnt
1603 if (aQsz > MAX_QSIZE) // >250
1604 {
1605 cout << "WARNING : Data Queuing up : QSize = " << aQsz << endl;
1606
1607 if (fpSysLog)
1608 {
1609 ostringstream oss;
1610 oss << getObjId() << "WARNING : Data Queuing up : QSize = " << aQsz;
1611 logging::Message::Args errMsgArgs;
1612 errMsgArgs.add(oss.str());
1613 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
1614 }
1615 }
1616
1617 boost::mutex::scoped_lock aLock(fClntMsgMutex);
1618 ByteStream obs;
1619 obs << (ByteStream::byte)WE_CLT_SRV_DATARQST;
1620 obs << (ByteStream::byte)fPmId; // PM id
1621 updateTxBytes(obs.length());
1622
1623 try
1624 {
1625 fRef.fIos.write(obs);
1626 }
1627 catch (...)
1628 {
1629 cout << "Broken Pipe .." << endl;
1630
1631 if (fpSysLog)
1632 {
1633 ostringstream oss;
1634 oss << getObjId() << " : Broken Pipe : socket write failed ";
1635 logging::Message::Args errMsgArgs;
1636 errMsgArgs.add(oss.str());
1637 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
1638 }
1639 }
1640
1641 aLock.unlock();
1642 }
1643
1644
1645 //------------------------------------------------------------------------------
serialize(messageqcpp::ByteStream & b) const1646 void WEDataLoader::serialize(messageqcpp::ByteStream& b) const
1647 {
1648 //TODO to be changed. left it here to understand how to implement
1649 /*
1650 b << (ObjectReader::id_t) ObjectReader::SIMPLECOLUMN;
1651 ReturnedColumn::serialize(b); // parent class serialize
1652 b << (uint32_t) fOid;
1653 b << fData;
1654 b << static_cast<const ByteStream::doublebyte>(fReturnAll);
1655 b << (uint32_t) fSequence;
1656 */
1657 }
1658
1659 //-----------------------------------------------------------------------------
1660
unserialize(messageqcpp::ByteStream & b)1661 void WEDataLoader::unserialize(messageqcpp::ByteStream& b)
1662 {
1663 //TODO to be changed. left it here to understand how to implement
1664 /*
1665 ObjectReader::checkType(b, ObjectReader::SIMPLECOLUMN);
1666 ReturnedColumn::unserialize(b); // parent class unserialize
1667 b >> (uint32_t&) fOid;
1668 b >> fData;
1669 b >> reinterpret_cast<ByteStream::doublebyte&>(fReturnAll);
1670 b >> (uint32_t&) fSequence;
1671 */
1672 }
1673
1674 //-----------------------------------------------------------------------------
1675
1676 } // namespace WriteEngine
1677