1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 /*******************************************************************************
18 * $Id$
19 *
20 *******************************************************************************/
21 /*
22  * 	we_dataloader.cpp
23  *
24  *  Created on: Oct 4, 2011
25  *      Author: Boby Paul: bpaul@calpont.com
26  */
27 
28 #include "mcsconfig.h" // Used to pickup STRERROR_R_CHAR_P definition
29 
30 #include <cstdlib>
31 #include <csignal>
32 #include <cstring>
33 #include <cerrno>
34 
35 #include <unistd.h>			//pipe() && fork()
36 #if defined(__linux__)
37 #include <wait.h>			//wait()
38 #elif defined(__FreeBSD__)
39 #include <sys/types.h>
40 #include <sys/stat.h>   	// For stat().
41 #include <sys/wait.h>
42 #include <sys/time.h>
43 #include <sys/resource.h>
44 #endif
45 #include <stdio.h>
46 #include <stdlib.h>
47 #include <time.h>
48 #include "bytestream.h"
49 #include "rwlock_local.h"
50 
51 #include <iostream>
52 #include <fstream>
53 #include <vector>
54 #include <queue>
55 #include <string>
56 #include <map>
57 using namespace std;
58 
59 
60 #include <boost/thread/condition.hpp>
61 #include <boost/scoped_array.hpp>
62 #include <boost/thread.hpp>
63 #include <boost/filesystem.hpp>
64 using namespace boost;
65 
66 #include "bytestream.h"
67 #include "messagequeue.h"
68 using namespace messageqcpp;
69 
70 #include "we_messages.h"
71 #include "we_brmrprtparser.h"
72 #include "we_cleartablelockcmd.h"
73 #include "we_dataloader.h"
74 #include "we_readthread.h"
75 
76 #include "installdir.h"
77 
78 namespace WriteEngine
79 {
80 
81 //bool WEDataLoader::fTearDownCpimport=false; // @bug 4267
82 
83 //-----------------------------------------------------------------------------
84 /**
85  *
86  * @brief 	WEDataLoader::Constructor
87  *
88  **/
WEDataLoader(SplitterReadThread & Srt)89 WEDataLoader::WEDataLoader(SplitterReadThread& Srt ): fRef(Srt),
90     fMode(-1),
91     fDataDumpFile(),
92     fTxBytes(0),
93     fRxBytes(0),
94     fPmId(0),
95     fCh_pid(0),
96     fThis_pid(0),
97     fP_pid(0),
98     fpCfThread(0),
99     fTearDownCpimport(false), // @bug 4267
100     fWaitPidRc(0),            // @bug 4267
101     fWaitPidStatus(0),         // @bug 4267
102     fForceKill(false),
103     fPipeErr(false),
104     fpSysLog(0)
105 {
106     Config weConfig;
107     uint16_t localModuleId = weConfig.getLocalModuleID();
108     fPmId = static_cast<char>(localModuleId);
109 
110     srand ( time(NULL) );				// initialize random seed
111     int aObjId = rand() % 10000 + 1;		// generate a random number
112 
113     setObjId(aObjId);
114 
115     setupSignalHandlers();
116 
117     if (!fpSysLog)
118     {
119         fpSysLog = SimpleSysLog::instance();
120         fpSysLog->setLoggingID(logging::LoggingID(SUBSYSTEM_ID_WE_SRV));
121     }
122 }
123 //-----------------------------------------------------------------------------
124 /**
125  *
126  * @brief 	WEDataLoader::Destructor
127  *
128  **/
129 
~WEDataLoader()130 WEDataLoader::~WEDataLoader()
131 {
132 
133     try
134     {
135         if (fDataDumpFile.is_open()) fDataDumpFile.close();
136 
137         cout << "\tRx Bytes " << getRxBytes() << endl;
138         cout << "\tTX Bytes " << getTxBytes() << endl;
139 
140         cout << "\tChild PID " << getChPid() << endl;
141 
142         if (getChPid())
143         {
144             if (2 == getMode()) //@bug 5012
145             {
146                 kill(getChPid(), SIGINT);
147                 teardownCpimport(fTearDownCpimport);
148             }
149             else
150             {
151                 teardownCpimport(false); // @bug 4267
152             }
153         }
154 
155     }
156     catch (std::exception&) // @bug 4164: exception causing thread to exit
157     {
158         cout << "Error tearing down cpimport in WEDataLoader destructor" << endl;
159     }
160 
161     //cout << "Leaving WEDataLoader destructor" << endl;
162 }
163 
164 //------------------------------------------------------------------------------
165 // Initialize signal handling
166 //------------------------------------------------------------------------------
167 
setupSignalHandlers()168 void WEDataLoader::setupSignalHandlers()
169 {
170 #ifndef _MSC_VER
171     signal(SIGPIPE, SIG_IGN);
172     signal(SIGCHLD, WEDataLoader::onSigChild);
173 #endif
174 }
175 //------------------------------------------------------------------------------
// Handles SIGCHLD, the signal delivered when a child process terminates
177 //------------------------------------------------------------------------------
onSigChild(int aInt)178 void WEDataLoader::onSigChild(int aInt)
179 {
180     std::string aStr = "Received SIGCHLD of terminated process..";
181     cout << aStr << endl;
182     // fTearDownCpimport = true; // @bug 4267
183 
184     // commented out for non-static variables
185     //ostringstream oss;
186     //oss << getObjId() <<" : " <<aStr;
187     //logging::Message::Args errMsgArgs;
188     //errMsgArgs.add(oss.str());
189     //fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
190 
191 }
192 
193 
194 //-----------------------------------------------------------------------------
195 /**
196  *
197  * @brief 	WEDataLoader::update
198  *
199  **/
200 
update(Subject * pSub)201 bool WEDataLoader::update(Subject* pSub)
202 {
203     return true;
204 }
205 //-----------------------------------------------------------------------------
206 /**
207  *
 * @brief setup cpimport as a separate process
209  *
210  **/
211 
setupCpimport()212 bool WEDataLoader::setupCpimport() // fork the cpimport
213 {
214     pid_t aChPid;
215 
216     errno = 0;
217 
218     if (pipe(fFIFO) == -1)
219     {
220         int errnum = errno;
221         ostringstream oss;
222         oss << getObjId() << " : Error in creating pipe (errno-" <<
223             errnum << "); " << strerror(errnum);
224         throw runtime_error( oss.str() );
225     }
226 
227     setPid(getpid());
228     setPPid(getppid());
229 
230     errno = 0;
231     aChPid = fork();
232 
233     if (aChPid == -1)	//an error caused
234     {
235         int errnum = errno;
236         ostringstream oss;
237         oss << getObjId() << " : Error in forking cpimport.bin (errno-" <<
238             errnum << "); " << strerror(errnum);
239         throw runtime_error( oss.str() );
240     }
241     else if (aChPid == 0) // we are in child
242     {
243         int aStartFD = 3;
244         int aEndFD = fFIFO[1] + 256;
245         close(fFIFO[1]);	//close the WRITER of CHILD
246 
247         cout << "Child Process Info: PID = " << getpid()
248              << " (fFIFO[0], fFIFO[1]) = (" << fFIFO[0] << "," << fFIFO[1] << ")"
249              << " (StartFD, EndFD) = (" << aStartFD << "," << aEndFD << ")" << endl;
250 
251         std::vector<char*> Cmds;
252         //str2Argv(fCmdLineStr, Cmds);	// to avoid out-of-scope problem
253         std::string aCmdLine = fCmdLineStr;
254         std::istringstream ss(aCmdLine);
255         std::string arg;
256         std::vector<std::string> v2(20, "");
257         unsigned int i = 0;
258 
259         while (ss >> arg)
260         {
261             v2[i++] = arg;
262         }
263 
264         for (unsigned int j = 0; j < i; ++j)
265         {
266             Cmds.push_back(const_cast<char*>(v2[j].c_str()));
267         }
268 
269         Cmds.push_back(0); //null terminate
270         //updatePrgmPath(Cmds);
271 
272         //NOTE: for debugging
273         int aSize = Cmds.size();
274 
275         for (int aIdx = 0; aIdx < aSize; ++aIdx)
276         {
277             cout << "Args " << Cmds[aIdx] << endl;
278         }
279 
280         cout.flush();
281 
282         close(0);			//close stdin for the child
283         dup2(fFIFO[0], 0);	//make stdin be the reading end of the pipe
284 
285         //BUG 4410 : hacky solution so that CHLD process get EOF on close of pipe
286         for (int i = aStartFD; i < aEndFD; i++) close(i);
287 
288         errno = 0;
289         int aRet = execvp(Cmds[0], &Cmds[0]);	//NOTE - works with full Path
290         //int aRet = execvp(Cmds[0], &Cmds[0]);	//NOTE - works if $PATH has cpimport
291 
292         int execvErrno = errno;
293         cout << "Return status of cpimport is " << aRet << endl;
294         cout.flush();
295         close(fFIFO[0]);	// will trigger an EOF on stdin
296         ostringstream oss;
297         oss << getObjId() << " : execv error: cpimport.bin invocation failed; "
298             << "(errno-" << errno << "); " << strerror(execvErrno) <<
299             "; Check file and try invoking locally.";
300         logging::Message::Args errMsgArgs;
301         errMsgArgs.add(oss.str());
302         fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
303 
304         if (aRet == -1) exit(-1);
305     }
306     else	// parent
307     {
308         setChPid(aChPid);	// This is the child PID
309         cout << "Child PID is " << this->getChPid() << endl;
310         close(fFIFO[0]);	//close the READER of PARENT
311         // now we can send all the data thru FIFO[1], writer of PARENT
312     }
313 
314     if (aChPid == 0)
315         cout << "******** Child finished its work ********" << endl;
316 
317     return true;
318 }
319 //-----------------------------------------------------------------------------
320 /**
321  *
322  * @brief close all file handles opened for cpimport
323  * @brief wait for the cpimport process to finish work
324  *
325  **/
326 
teardownCpimport(bool useStoredWaitPidStatus)327 void WEDataLoader::teardownCpimport(bool useStoredWaitPidStatus) // @bug 4267
328 {
329     fTearDownCpimport = false;		//Reset it
330     //cout << "Tearing down Cpimport" << endl;
331     int aStatus;
332 
333     //cout << "checking fpCfThread value" << endl;
334     if (fpCfThread)
335     {
336         //cout << "checking fpCfThread has a valid value" << endl;
337 
338         //wait until we are done with the queued messages
339         while ((!fpCfThread->isMsgQueueEmpty()) && (!fpCfThread->isStopped()))
340         {
341             //cout << "DEBUG : MsgQueue not empty" << endl;
342             //cannot be too low, since there is a lock in isMsgQueueEmpty()
343             usleep(2000000);
344         }
345 
346 //		while(fpCfThread->isPushing())
347 //		{
348 //			cout << "DEBUG : still pushing" << endl;
349 //			usleep(100000);
350 //		}
351 
352         if (fpSysLog)
353         {
354             ostringstream oss;
355             oss << getObjId() << " : Message Queue is empty; Stopping CF Thread";
356             logging::Message::Args errMsgArgs;
357             errMsgArgs.add(oss.str());
358             fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
359         }
360 
361         fpCfThread->stopThread();
362 
363         while (!fpCfThread->isStopped())
364         {
365             cout << "DEBUG : still not stopped" << endl;
366             usleep(100000);
367         }
368 
369         delete fpCfThread;
370         fpCfThread = 0;
371     }
372 
373 
374     closeWritePipe();
375     pid_t aPid;
376 
377     // @bug 4267 begin: call waitpid() to get job status or use stored job status
378     // aPid = waitpid(getChPid(), &aStatus, 0); // wait until cpimport finishs
379     if (useStoredWaitPidStatus)
380     {
381         aPid    = fWaitPidRc;
382         aStatus = fWaitPidStatus;
383     }
384     else
385     {
386         //aPid = waitpid(getChPid(), &aStatus, 0); // wait until cpimport finishs
387         aPid = waitpid(getChPid(), &aStatus, WNOHANG); // wait until cpimport finishs
388         int aIdx = 0;
389 
390         while ((aPid == 0) && (aIdx < 25 * MAX_QSIZE)) //Do not loop infinitly
391         {
392             usleep(2000000);
393             aPid = waitpid(getChPid(), &aStatus, WNOHANG);
394             cout << "Inside tearDown waitpid rc[" << aIdx << "] = " << aPid << endl;
395             ++aIdx;
396         }
397     }
398 
399     // @bug 4267 end						 // BP - added -1 as per DMC comment below
400     if ((aPid == getChPid()) || (aPid == -1)) // @bug 4267 (DMC-shouldn't we check for aPid of -1?)
401     {
402         setChPid(0);
403 
404         if ((WIFEXITED(aStatus)) && (WEXITSTATUS(aStatus) == 0))
405         {
406             cout << "\tCpimport exit on success" << endl;
407             ostringstream oss;
408             oss << getObjId() << " : cpimport exit on success";
409             logging::Message::Args errMsgArgs;
410             errMsgArgs.add(oss.str());
411             fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
412 
413             onCpimportSuccess();
414         }
415         else
416         {
417             int termsig = (WIFSIGNALED(aStatus) ? WTERMSIG(aStatus) : -1);
418 
419             if (!fForceKill)
420             {
421                 cout << "\tCpimport exit on failure (signal " << termsig << ')' << endl;
422                 ostringstream oss;
423                 oss << getObjId() << " : cpimport exit on failure (signal " << termsig << ')';
424                 logging::Message::Args errMsgArgs;
425                 errMsgArgs.add(oss.str());
426                 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
427                 onCpimportFailure();
428             }
429             else
430             {
431                 cout << "\tCpimport exit on Force Kill!!" << endl;
432                 ostringstream oss;
433                 oss << getObjId() << " : cpimport exit on Force kill!!";
434                 logging::Message::Args errMsgArgs;
435                 errMsgArgs.add(oss.str());
436                 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
437                 onCpimportSuccess();
438             }
439         }
440     }
441 }
442 //-----------------------------------------------------------------------------
443 /**
444  * @brief 	Push the data to cpimport from the incoming ByteStream
445  * @param	Incoming ByteStream
446  *
447  */
448 //
pushData2Cpimport(ByteStream & Ibs)449 void WEDataLoader::pushData2Cpimport(ByteStream& Ibs)
450 {
451     if (Ibs.length() > 0)
452     {
453         int aLen = Ibs.length();
454         char* pStart = reinterpret_cast<char*>(Ibs.buf());
455         char* pEnd = pStart + aLen;
456         char* pPtr = pStart;
457 
458         while (pPtr < pEnd)
459         {
460             //		if(pEnd > (pPtr + MAX_LEN))
461             //		{
462             //			int aRet = write(fFIFO[1], pPtr, MAX_LEN);
463             //			if(aRet == -1) throw runtime_error("Pipe write error");
464             //			//write(fFIFO[1], Ibs.buf(), Ibs.length());
465             //			pPtr += MAX_LEN;
466             //		}
467             //		else
468             //		{
469             //			int aStrLen = pEnd - pPtr;
470             //			int aRet = write(fFIFO[1], pPtr, aStrLen);
471             //			if(aRet == -1) throw runtime_error("Pipe write error");
472             //			pPtr += aStrLen;
473             //		}
474 
475             try
476             {
477                 int aRet = write(fFIFO[1], pPtr, pEnd - pPtr);
478 
479                 if (aRet < 0)
480                 {
481                     if (!fPipeErr)
482                     {
483                         int e = errno;
484                         fPipeErr = true;
485                         std::string aStr = "pushing data : PIPE error .........";
486 
487                         char errMsgBuf[160];
488 #if STRERROR_R_CHAR_P
489                         const char* pErrMsg = strerror_r(
490                                                   e, errMsgBuf, sizeof(errMsgBuf));
491 
492                         if (pErrMsg)
493                             aStr += pErrMsg;
494 
495 #else
496                         int errMsgRc = strerror_r(e, errMsgBuf, sizeof(errMsgBuf));
497 
498                         if (errMsgRc == 0)
499                             aStr += errMsgBuf;
500 
501 #endif
502                         logging::Message::Args errMsgArgs;
503                         errMsgArgs.add(aStr);
504                         fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
505                     }
506 
507                     throw runtime_error("Pipe Error - cpimport.bin exited already!!");
508                 }
509 
510                 pPtr += aRet;
511             }
512             catch (...)
513             {
514                 //std::string aStr = "pushing data PIPE error .........";
515                 //logging::Message::Args errMsgArgs;
516                 //errMsgArgs.add(aStr);
517                 //fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
518                 throw runtime_error("Pipe Error - cpimport.bin exited already!!");
519             }
520         }
521 
522     }
523 }
524 
525 //-----------------------------------------------------------------------------
526 /**
527  *
528  * @brief 	Close the pipe through which data was written to cpimport
529  * @brief 	This will also signal a EOF to the reading pipe.
530  *
531  **/
532 
closeWritePipe()533 void  WEDataLoader::closeWritePipe()
534 {
535     cout << "Going to close call = " << fFIFO[1] << endl;
536     //NOTE this will flush the file buffer and close it.
537     int aRet = close(fFIFO[1]);		// will trigger EOD
538     cout << "----- closed both pipes -------- aRet = " << aRet << endl;
539 }
540 
541 //-----------------------------------------------------------------------------
542 /**
543  *
544  * @brief 	Tokenize a string into char** argv format and store in a vector
545  * @brief 	we pass the V as arguments to exec cpimport
546  * @param 	CmdLine is the string form of arguments demlimited by space
547  * @param 	V vector which contains each element of argv
548  *
549  **/
550 
str2Argv(std::string CmdLine,std::vector<char * > & V)551 void WEDataLoader::str2Argv(std::string CmdLine, std::vector<char*>& V)
552 {
553     std::istringstream ss(CmdLine);
554     std::string arg;
555     std::vector<std::string> v2;
556 
557     while (ss >> arg)
558     {
559         v2.push_back(arg);
560         V.push_back(const_cast<char*>(v2.back().c_str()));
561     }
562 
563     V.push_back(0); //null terminate
564 }
565 
566 //-----------------------------------------------------------------------------
567 /**
568  *
569  * @brief 	Event to trigger when cpimport is successful.
570  *
571  **/
572 
onCpimportSuccess()573 void WEDataLoader::onCpimportSuccess()
574 {
575 
576     ByteStream obs;
577 
578     cout << "Sending BRMRPT" << endl;
579     obs << (ByteStream::byte)WE_CLT_SRV_BRMRPT;
580     obs << (ByteStream::byte)fPmId;     // PM id
581     // for testing
582     //std::string fRptFileName("ReportFile.txt");
583     BrmReportParser aBrmRptParser;
584     bool aRet = aBrmRptParser.serialize(fBrmRptFileName, obs);
585 
586     if (aRet)
587     {
588         boost::mutex::scoped_lock aLock(fClntMsgMutex);
589         //aBrmRptParser.unserialize(obs);   - was for testing
590         updateTxBytes(obs.length());
591 
592         try
593         {
594             fRef.fIos.write(obs);
595         }
596         catch (...)
597         {
598             cout << "Broken Pipe .." << endl;
599 
600             if (fpSysLog)
601             {
602                 ostringstream oss;
603                 oss << getObjId() << " : Broken Pipe : socket write failed ";
604                 logging::Message::Args errMsgArgs;
605                 errMsgArgs.add(oss.str());
606                 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
607             }
608         }
609 
610         aLock.unlock();
611         cout << "Finished Sending BRMRPT" << endl;
612     }
613     else
614     {
615         cout << "Failed to serialize BRMRpt " << endl;
616     }
617 
618 //	if(remove(fBrmRptFileName.c_str()) != 0)
619 //		cout <<"Failed to delete BRMRpt File "<< fBrmRptFileName << endl;
620     //usleep(1000000);	//sleep 1 second.
621 
622     obs.reset();
623     obs << (ByteStream::byte)WE_CLT_SRV_CPIPASS;
624     obs << (ByteStream::byte)fPmId;     // PM id
625     boost::mutex::scoped_lock aLock(fClntMsgMutex);
626     updateTxBytes(obs.length());
627 
628     try
629     {
630         fRef.fIos.write(obs);
631     }
632     catch (...)
633     {
634         cout << "Broken Pipe .." << endl;
635 
636         if (fpSysLog)
637         {
638             ostringstream oss;
639             oss << getObjId() << " : Broken Pipe : socket write failed ";
640             logging::Message::Args errMsgArgs;
641             errMsgArgs.add(oss.str());
642             fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
643         }
644 
645     }
646 
647     aLock.unlock();
648 
649     cout << "Sent CPIPASS info" << endl;
650 
651     if (fpSysLog)
652     {
653         ostringstream oss;
654         oss << getObjId() << " : onCpimportSuccess BrmReport Send";
655         logging::Message::Args errMsgArgs;
656         errMsgArgs.add(oss.str());
657         fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
658     }
659 
660 }
661 
662 //-----------------------------------------------------------------------------
663 /**
664  *
665  * @brief 	Event to trigger if a cpimport failure occurs.
666  *
667  **/
onCpimportFailure()668 void WEDataLoader::onCpimportFailure()
669 {
670     // Send failure notice back to the parent splitter job
671     sendCpimportFailureNotice();
672 
673     //Even if we failed, we have failure info in BRMRPT
674     ByteStream obs;
675     obs << (ByteStream::byte)WE_CLT_SRV_BRMRPT;
676     obs << (ByteStream::byte)fPmId;     // PM id
677     BrmReportParser aBrmRptParser;
678     bool aRet = aBrmRptParser.serialize(fBrmRptFileName, obs);
679 
680     if (aRet)
681     {
682         boost::mutex::scoped_lock aLock(fClntMsgMutex);
683         updateTxBytes(obs.length());
684 
685         try
686         {
687             fRef.fIos.write(obs);
688         }
689         catch (std::exception& ex)
690         {
691             cout << "Broken Pipe .." << ex.what() << endl;
692 
693             if (fpSysLog)
694             {
695                 ostringstream oss;
696                 oss << getObjId() << " : Broken Pipe : socket write failed; " <<
697                     ex.what();
698                 logging::Message::Args errMsgArgs;
699                 errMsgArgs.add(oss.str());
700                 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR,
701                                  logging::M0000);
702             }
703         }
704 
705         aLock.unlock();
706         cout << "Finished Sending BRMRPT" << endl;
707     }
708 
709     if (remove(fBrmRptFileName.c_str()) != 0)
710         cout << "Failed to delete BRMRpt File " << fBrmRptFileName << endl;
711 
712     if (fpSysLog)
713     {
714         ostringstream oss;
715         oss << getObjId() << " : onCpimportFailure BrmReport Send";
716         logging::Message::Args errMsgArgs;
717         errMsgArgs.add(oss.str());
718         fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
719     }
720 }
721 
722 //-----------------------------------------------------------------------------
723 // Send msg to front-end splitter to notify it that a cpimport.bin pgm failed.
724 //-----------------------------------------------------------------------------
sendCpimportFailureNotice()725 void WEDataLoader::sendCpimportFailureNotice()
726 {
727     ByteStream obs;
728     obs << (ByteStream::byte)WE_CLT_SRV_CPIFAIL;
729     obs << (ByteStream::byte)fPmId;     // PM id
730     boost::mutex::scoped_lock aLock(fClntMsgMutex);
731     updateTxBytes(obs.length());
732 
733     try
734     {
735         fRef.fIos.write(obs);
736     }
737     catch (...)
738     {
739         cout << "Broken Pipe .." << endl;
740 
741         if (fpSysLog)
742         {
743             ostringstream oss;
744             oss << getObjId() << " : Broken Pipe : socket write failed ";
745             logging::Message::Args errMsgArgs;
746             errMsgArgs.add(oss.str());
747             fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
748         }
749     }
750 }
751 
752 //-----------------------------------------------------------------------------
753 /**
754  * @brief 	Event when a KEEPALIVE arrives.
755  * @param	Incoming ByteStream, not used currently
756  */
onReceiveKeepAlive(ByteStream & Ibs)757 void WEDataLoader::onReceiveKeepAlive(ByteStream& Ibs)
758 {
759     /*
760     // TODO comment out when we done with debug
761     if(fpSysLog)
762     {
763     	ostringstream oss;
764     	oss << getObjId() <<" : Received KEEPALIVE";
765     	logging::Message::Args errMsgArgs;
766     	errMsgArgs.add(oss.str());
767     	fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
768     }
769     */
770     //cout << "Received KEEPALIVE" << endl;
771     // NOTE only seldom a KEEPALIVE receives,
772     // 		so nothing wrong in responding with a KEEPALIVE.
773     ByteStream obs;
774     obs << (ByteStream::byte)WE_CLT_SRV_KEEPALIVE;
775     obs << (ByteStream::byte)fPmId;     // PM id
776     boost::mutex::scoped_lock aLock(fClntMsgMutex);
777     updateTxBytes(obs.length());
778 
779     try
780     {
781         fRef.fIos.write(obs);
782     }
783     catch (...)
784     {
785         cout << "Broken Pipe .." << endl;
786 
787         if (fpSysLog)
788         {
789             ostringstream oss;
790             oss << getObjId() << ": Broken Pipe : socket write failed ";
791             logging::Message::Args errMsgArgs;
792             errMsgArgs.add(oss.str());
793             fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
794         }
795     }
796 
797     aLock.unlock();
798 
799     // @bug 4267 begin
800     int   aStatus;
801     pid_t aPid;
802 
803     if (getChPid() > 0)
804     {
805         aPid = waitpid(getChPid(), &aStatus, WNOHANG); // wait until cpimport finishs
806 
807         if (aPid != 0)
808         {
809             cout << "waitpid(" << getChPid() << "): rc-" << aPid <<
810                  "; status-" << aStatus << "; exited-" <<
811                  (WIFEXITED(aStatus)) << endl;
812         }
813 
814         if ((aPid == getChPid()) || (aPid == -1))
815         {
816             fTearDownCpimport = true;
817             fWaitPidRc        = aPid;
818             fWaitPidStatus    = aStatus;
819         }
820     }
821 
822     // @bug 4267 end
823 
824     if (fTearDownCpimport)
825     {
826         //fTearDownCpimport = false;		//Reset it // commented out to use the flag in EOD
827         cout << "Cpimport terminated " << endl;
828 
829         if (0 == getMode()) onReceiveEod(Ibs);
830         else if (1 == getMode())	//mode 1 has to drive from UM
831         {
832             ByteStream obs;
833             obs << (ByteStream::byte)WE_CLT_SRV_EOD;
834             obs << (ByteStream::byte)fPmId;     // PM id
835             boost::mutex::scoped_lock aLock(fClntMsgMutex);
836             updateTxBytes(obs.length());
837 
838             try
839             {
840                 fRef.fIos.write(obs);
841             }
842             catch (...)
843             {
844                 cout << "Broken Pipe .." << endl;
845 
846                 if (fpSysLog)
847                 {
848                     ostringstream oss;
849                     oss << getObjId() << ": Broken Pipe : socket write failed ";
850                     logging::Message::Args errMsgArgs;
851                     errMsgArgs.add(oss.str());
852                     fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
853                 }
854             }
855 
856             aLock.unlock();
857         }
858         else if (2 == getMode())
859         {
860             //if(getChPid()) teardownCpimport(true); // @bug 4267
861             if (getChPid()) teardownCpimport(fTearDownCpimport); // @bug 4267 BP
862         }
863     }
864     else
865     {
866         //if(1 == getMode())
867         //	if(fpCfThread) cout << "Queue Size = " << fpCfThread->getQueueSize() << endl;
868     }
869 
870 }
871 //-----------------------------------------------------------------------------
872 /**
873  * @brief 	trigger when a DATA arrives.
874  * @param	Incoming ByteStream which contains data
875  */
876 
onReceiveData(ByteStream & Ibs)877 void WEDataLoader::onReceiveData(ByteStream& Ibs)
878 {
879 
880     if ((0 == getMode()) && (fDataDumpFile.is_open()))
881     {
882         // Will write to the output file.
883         fDataDumpFile << Ibs;
884         sendDataRequest();
885     }
886     else if ( 1 == getMode())
887     {
888         // commented out since we are going to use seperate thread
889         //pushData2Cpimport(Ibs);
890 
891         if (fpCfThread)
892         {
893             fpCfThread->add2MsgQueue(Ibs);
894             //sendDataRequest();	// Need to control Queue Size
895             // Bug 5031 : Will only send 1 rqst for a batch to cpimport.bin
896             //if(fpCfThread->getQueueSize()<MIN_QSIZE) sendDataRequest();
897             //if(fpCfThread->getQueueSize()<MAX_QSIZE) sendDataRequest();
898 
899             int aQsz = (fpCfThread) ? fpCfThread->getQueueSize() : 0;
900 
901             // Bug 5031 : If Q size goes above 100 (2*250);
902             if (aQsz < MAX_QSIZE) sendDataRequest();
903 
904             if (aQsz > 1.5 * MAX_QSIZE) // > 2*250
905             {
906                 cout << "WARNING : Data Queuing up : QSize = " << aQsz << endl;
907 
908                 if (fpSysLog)
909                 {
910                     ostringstream oss;
911                     oss << getObjId() << "WARNING : Data Queuing up : QSize = " << aQsz;
912                     logging::Message::Args errMsgArgs;
913                     errMsgArgs.add(oss.str());
914                     fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
915                 }
916             }
917         }
918 
919     }
920     else if ( 2 == getMode())
921     {
922         cout << "onReceiveData : In Mode 2 NO data suppose to receive" << endl;
923     }
924 
925 
926 //	ByteStream obs;
927 //	obs << (ByteStream::byte)WE_CLT_SRV_DATARQST;
928 //	obs << (ByteStream::byte)fPmId;     // PM id
929 //  updateTxBytes(obs.length());
930 //	fRef.fIos.write(obs);
931 }
932 
933 //-----------------------------------------------------------------------------
934 /**
935  * @brief 	trigger when a EOD arrives.
 * @param	Incoming ByteStream; not relevant for now
937  */
onReceiveEod(ByteStream & Ibs)938 void WEDataLoader::onReceiveEod(ByteStream& Ibs)
939 {
940     if (fpSysLog)
941     {
942         ostringstream oss;
943         oss << getObjId() << " : onReceiveEOD : child ID = " << getChPid();
944         logging::Message::Args errMsgArgs;
945         errMsgArgs.add(oss.str());
946         fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
947     }
948 
949     cout << "Received EOD " << endl;
950 
951     if (0 == getMode())
952     {
953         fDataDumpFile.close();
954     }
955 
956     ByteStream obs;
957     obs << (ByteStream::byte)WE_CLT_SRV_EOD;
958     obs << (ByteStream::byte)fPmId;     // PM id
959     boost::mutex::scoped_lock aLock(fClntMsgMutex);
960     updateTxBytes(obs.length());
961 
962     try
963     {
964         fRef.fIos.write(obs);
965     }
966     catch (...)
967     {
968         cout << "Broken Pipe .." << endl;
969 
970         if (fpSysLog)
971         {
972             ostringstream oss;
973             oss << getObjId() << " : Broken Pipe : socket write failed ";
974             logging::Message::Args errMsgArgs;
975             errMsgArgs.add(oss.str());
976             fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
977         }
978     }
979 
980     aLock.unlock();
981 
982     //if(( 1 == getMode())||( 2 == getMode()))
983     if (1 == getMode()) 	//BUG 4370 - seperated mode 1 & 2
984     {
985         //if(getChPid()) teardownCpimport(false); // @bug 4267
986         if (getChPid()) teardownCpimport(fTearDownCpimport); // @bug 4267 //BP changed to send the correct flag
987     }
988     else if (2 == getMode())	//BUG 4370
989     {
990         if (getChPid())
991         {
992             kill(getChPid(), SIGINT);	//BUG 4370
993             fForceKill = true;
994             teardownCpimport(fTearDownCpimport);	//BUG 4370
995         }
996     }
997 
998 
999 }
1000 //-----------------------------------------------------------------------------
1001 /**
1002  * @brief 	Event on Command Received. It should contain sub commands
1003  * @param	Incoming ByteStream, will have sub commands
1004  */
onReceiveCmd(ByteStream & bs)1005 void WEDataLoader::onReceiveCmd(ByteStream& bs)
1006 {
1007     //TODO - can be cpimport cmd or server cmd, for now write to a file
1008     ByteStream::byte aCmdId;
1009     bs >> aCmdId;
1010 }
1011 //-----------------------------------------------------------------------------
1012 /**
 * @brief 	Receives the mode in which WES is running.
 * @param	Incoming ByteStream carrying the mode value
1015  */
onReceiveMode(ByteStream & Ibs)1016 void WEDataLoader::onReceiveMode(ByteStream& Ibs)
1017 {
1018     // Assigning it here since WEDataLoader constructor is called multiple times
1019     // while coping in readthread class.
1020     if (!fpSysLog)
1021     {
1022         fpSysLog = SimpleSysLog::instance();
1023         fpSysLog->setLoggingID(logging::LoggingID(SUBSYSTEM_ID_WE_SRV));
1024     }
1025 
1026     Ibs >> (ByteStream::quadbyte&)fMode;
1027     cout << "Setting fMode = " << fMode << endl;
1028 
1029     if (fpSysLog)
1030     {
1031         ostringstream oss;
1032         oss << getObjId() << " : onReceiveMode() Setting fMode = " << fMode;
1033         logging::Message::Args errMsgArgs;
1034         errMsgArgs.add(oss.str());
1035         fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
1036     }
1037 
1038     char aName[64];
1039     snprintf(aName, sizeof(aName), "ModuleDBRootCount%d-3", fPmId);
1040 
1041     string aStrDbRootCnt = config::Config::makeConfig()->getConfig(
1042                                "SystemModuleConfig", aName);
1043     cout << "DbRootCnt = " << aStrDbRootCnt << endl;
1044     ByteStream::byte aDbCnt = (ByteStream::byte)atoi(aStrDbRootCnt.c_str());
1045 
1046     if (fpSysLog)
1047     {
1048         ostringstream oss;
1049         oss << getObjId() << " : onReceiveMode() DbRoot Count = " + aStrDbRootCnt;
1050         logging::Message::Args errMsgArgs;
1051         errMsgArgs.add(oss.str());
1052         fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
1053     }
1054 
1055 
1056     //Send No. of DBRoots to Client
1057     ByteStream aObs;
1058     aObs << (ByteStream::byte)WE_CLT_SRV_DBRCNT;
1059     aObs << (ByteStream::byte)fPmId;
1060     aObs << (ByteStream::byte)aDbCnt;
1061     boost::mutex::scoped_lock aLock(fClntMsgMutex);
1062     updateTxBytes(aObs.length());
1063 
1064     try
1065     {
1066         fRef.fIos.write(aObs);
1067     }
1068     catch (...)
1069     {
1070         cout << "Broken Pipe .." << endl;
1071 
1072         if (fpSysLog)
1073         {
1074             ostringstream oss;
1075             oss << getObjId() << " : Broken Pipe : socket write failed ";
1076             logging::Message::Args errMsgArgs;
1077             errMsgArgs.add(oss.str());
1078             fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
1079         }
1080 
1081     }
1082 
1083     aLock.unlock();
1084 
1085 }
1086 //-----------------------------------------------------------------------------
1087 /**
1088  * @brief 	Acknowledgment. Not relevant at this point of time.
1089  * @brief	Can make use to update the BRM
1090  * @param	Incoming ByteStream, not relevant for now
1091  */
1092 
onReceiveAck(ByteStream & Ibs)1093 void WEDataLoader::onReceiveAck(ByteStream& Ibs)
1094 {
1095     // All is good
1096     // update the status
1097 }
1098 //-----------------------------------------------------------------------------
1099 /**
1100  * @brief 	NAK. A Failure, Rollback should be initiated.
1101  * @brief	Can make use to update the BRM
1102  * @param	Incoming ByteStream, not relevant for now
1103  */
onReceiveNak(ByteStream & Ibs)1104 void WEDataLoader::onReceiveNak(ByteStream& Ibs)
1105 {
1106     // TODO - handle the problem
1107 }
1108 //-----------------------------------------------------------------------------
1109 /**
1110  * @brief 	ERROR. A Failure, Rollback should be initiated.
1111  * @brief	Can make use to update the BRM
1112  * @param	Incoming ByteStream, not relevant for now
1113  */
onReceiveError(ByteStream & Ibs)1114 void WEDataLoader::onReceiveError(ByteStream& Ibs)
1115 {
1116     // TODO - handle the failure situation.
1117 }
1118 //------------------------------------------------------------------------------
1119 //  onReceiveCmdLineArgs - do what ever need to do with command line args
1120 //------------------------------------------------------------------------------
1121 /**
1122  * @brief 	Command line args received.
1123  * @brief	Can make use to update the BRM
 * @param	Incoming ByteStream carrying the command line string
1125  */
1126 
onReceiveCmdLineArgs(ByteStream & Ibs)1127 void WEDataLoader::onReceiveCmdLineArgs(ByteStream& Ibs)
1128 {
1129     Ibs >> fCmdLineStr;
1130     cout << "CMD LINE ARGS came in " << fCmdLineStr << endl;
1131 
1132     if (fpSysLog)
1133     {
1134         ostringstream oss;
1135         oss << getObjId() << " : CMD LINE ARGS came in " << fCmdLineStr;
1136         logging::Message::Args errMsgArgs;
1137         errMsgArgs.add(oss.str());
1138         fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
1139     }
1140 
1141     ByteStream obs;
1142 
1143     //TODO - Need to check all clear for starting CPI
1144     if (0 != getMode())
1145     {
1146         obs << (ByteStream::byte)WE_CLT_SRV_STARTCPI;
1147     }
1148     else
1149     {
1150         obs << (ByteStream::byte)WE_CLT_SRV_DATARQST;
1151     }
1152 
1153     obs << (ByteStream::byte) fPmId; // PM id
1154     boost::mutex::scoped_lock aLock(fClntMsgMutex);
1155     updateTxBytes(obs.length());
1156 
1157     try
1158     {
1159         fRef.fIos.write(obs);
1160     }
1161     catch (...)
1162     {
1163         cout << "Broken Pipe .." << endl;
1164 
1165         if (fpSysLog)
1166         {
1167             ostringstream oss;
1168             oss << getObjId() << " : Broken Pipe : socket write failed ";
1169             logging::Message::Args errMsgArgs;
1170             errMsgArgs.add(oss.str());
1171             fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
1172         }
1173     }
1174 
1175     aLock.unlock();
1176 
1177 }
1178 
1179 //-----------------------------------------------------------------------------
1180 
onReceiveStartCpimport()1181 void WEDataLoader::onReceiveStartCpimport()
1182 {
1183     cout << "Start Cpimport command reached!!" << endl;
1184 
1185     if (fpSysLog)
1186     {
1187         ostringstream oss;
1188         oss << getObjId() << " : Start Cpimport command reached!!";
1189         logging::Message::Args errMsgArgs;
1190         errMsgArgs.add(oss.str());
1191         fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
1192     }
1193 
1194     try
1195     {
1196         setupCpimport();
1197 
1198         if (1 == getMode())	// create a thread to handle the data feeding part
1199         {
1200             fpCfThread = new WECpiFeederThread(*this);
1201             fpCfThread->startFeederThread();
1202         }
1203     }
1204     catch (std::exception& ex)
1205     {
1206         // send an CPI FAIL command back to splitter
1207         if (fpSysLog)
1208         {
1209             logging::Message::Args errMsgArgs;
1210             errMsgArgs.add(ex.what());
1211             fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
1212 
1213             sendCpimportFailureNotice();
1214             return;
1215         }
1216     }
1217 
1218     if (1 == getMode())	// In mode 2/0 we do not rqst data.
1219     {
1220         sendDataRequest();
1221     }
1222 
1223     // We need to respond to KEEP ALIVES
1224     //else if(2 == getMode())	// Now we wait till cpimport comes back
1225     //{
1226     //	if(getChPid())
1227     //		teardownCpimport();
1228     //}
1229 
1230 
1231 }
1232 
1233 //-----------------------------------------------------------------------------
1234 
onReceiveBrmRptFileName(ByteStream & Ibs)1235 void WEDataLoader::onReceiveBrmRptFileName(ByteStream& Ibs)
1236 {
1237     Ibs >> fBrmRptFileName;
1238     cout << "Brm Rpt Filename Arrived " << fBrmRptFileName << endl;
1239 
1240     //BUG 4645
1241     string::size_type idx = fBrmRptFileName.find_last_of('/');
1242 
1243     if (idx > 0 && idx < string::npos)
1244     {
1245         string dirname(fBrmRptFileName, 0, idx);
1246         struct stat st;
1247 
1248         if (stat(dirname.c_str(), &st) != 0)
1249         {
1250             cout << "Creating directory : " << dirname << endl;
1251             boost::filesystem::create_directories(dirname.c_str());
1252         }
1253     }
1254 
1255     if (fpSysLog)
1256     {
1257         ostringstream oss;
1258         oss << getObjId() << " : Brm Rpt Filename Arrived " << fBrmRptFileName;
1259         logging::Message::Args errMsgArgs;
1260         errMsgArgs.add(oss.str());
1261         fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
1262     }
1263 
1264 }
1265 
1266 //-----------------------------------------------------------------------------
1267 
onReceiveCleanup(ByteStream & Ibs)1268 void WEDataLoader::onReceiveCleanup(ByteStream& Ibs)
1269 {
1270     if (fpSysLog)
1271     {
1272         ostringstream oss;
1273         oss << getObjId() << " : OnReceiveCleanup arrived";
1274         logging::Message::Args errMsgArgs;
1275         errMsgArgs.add(oss.str());
1276         fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
1277     }
1278 
1279     std::string aErrMsg;
1280 
1281     WE_ClearTableLockCmd aClrTblLockCmd("DataLoader");
1282     int aRet = aClrTblLockCmd.processCleanup(Ibs, aErrMsg);
1283 
1284     ByteStream obs;
1285     obs << (ByteStream::byte)WE_CLT_SRV_CLEANUP;
1286     obs << (ByteStream::byte) fPmId; 	// PM id
1287 
1288     if (aRet == 0)
1289         obs << (ByteStream::byte)1;		// cleanup success
1290     else
1291         obs << (ByteStream::byte)0;		// cleanup failed
1292 
1293     boost::mutex::scoped_lock aLock(fClntMsgMutex);
1294     updateTxBytes(obs.length());
1295 
1296     try
1297     {
1298         fRef.fIos.write(obs);
1299     }
1300     catch (...)
1301     {
1302         cout << "Broken Pipe .." << endl;
1303 
1304         if (fpSysLog)
1305         {
1306             ostringstream oss;
1307             oss << getObjId() << " : Broken Pipe : socket write failed ";
1308             logging::Message::Args errMsgArgs;
1309             errMsgArgs.add(oss.str());
1310             fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
1311         }
1312 
1313     }
1314 
1315     aLock.unlock();
1316 
1317 }
1318 
1319 
1320 //-----------------------------------------------------------------------------
1321 
1322 
1323 
onReceiveRollback(ByteStream & Ibs)1324 void WEDataLoader::onReceiveRollback(ByteStream& Ibs)
1325 {
1326     if (fpSysLog)
1327     {
1328         ostringstream oss;
1329         oss << getObjId() << " : OnReceiveRollback arrived";
1330         logging::Message::Args errMsgArgs;
1331         errMsgArgs.add(oss.str());
1332         fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
1333     }
1334 
1335 
1336     std::string aErrMsg;
1337 
1338     WE_ClearTableLockCmd aClrTblLockCmd("DataLoader");
1339     int aRet = aClrTblLockCmd.processRollback(Ibs, aErrMsg);
1340 
1341     ByteStream obs;
1342     obs << (ByteStream::byte)WE_CLT_SRV_ROLLBACK;
1343     obs << (ByteStream::byte) fPmId; // PM id
1344 
1345     if (aRet == 0)
1346         obs << (ByteStream::byte)1;		// Rollback success
1347     else
1348         obs << (ByteStream::byte)0;		// Rollback failed
1349 
1350     boost::mutex::scoped_lock aLock(fClntMsgMutex);
1351     updateTxBytes(obs.length());
1352 
1353     try
1354     {
1355         fRef.fIos.write(obs);
1356     }
1357     catch (...)
1358     {
1359         cout << "Broken Pipe .." << endl;
1360 
1361         if (fpSysLog)
1362         {
1363             ostringstream oss;
1364             oss << getObjId() << " : Broken Pipe : socket write failed ";
1365             logging::Message::Args errMsgArgs;
1366             errMsgArgs.add(oss.str());
1367             fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
1368         }
1369     }
1370 
1371     aLock.unlock();
1372 
1373 }
1374 //-----------------------------------------------------------------------------
1375 
1376 
onReceiveImportFileName(ByteStream & Ibs)1377 void WEDataLoader::onReceiveImportFileName(ByteStream& Ibs)
1378 {
1379     bool aGoodFile = true;
1380     std::string aFileName;
1381     Ibs >> aFileName;
1382 
1383     //BUG 4245 : Need to check the file or path exists
1384     {
1385         std::fstream aFin;
1386         aFin.open(aFileName.c_str(), std::ios::in);
1387 
1388         if (aFin.is_open())	// File exist, send an ERROR immediately
1389         {
1390             // file exists
1391             ByteStream obs;
1392             obs << (ByteStream::byte)WE_CLT_SRV_IMPFILEERROR;
1393             obs << (ByteStream::byte)fPmId;
1394             updateTxBytes(obs.length());
1395             boost::mutex::scoped_lock aLock(fClntMsgMutex);
1396 
1397             try
1398             {
1399                 fRef.fIos.write(obs);
1400             }
1401             catch (...)
1402             {
1403                 cout << "Broken Pipe .." << endl;
1404 
1405                 if (fpSysLog)
1406                 {
1407                     ostringstream oss;
1408                     oss << getObjId() << " : Broken Pipe : socket write failed ";
1409                     logging::Message::Args errMsgArgs;
1410                     errMsgArgs.add(oss.str());
1411                     fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
1412                 }
1413 
1414             }
1415 
1416             aGoodFile = false;
1417             aLock.unlock();
1418         }
1419 
1420         aFin.close();
1421     }
1422 
1423     if (aGoodFile)
1424     {
1425         fDataDumpFile.open(aFileName.c_str(), std::ios::app);
1426 
1427         //BUG 4245 : If file dir is not existing, we need to fail this import
1428         if (!fDataDumpFile.good())
1429         {
1430             ByteStream obs;
1431             obs << (ByteStream::byte)WE_CLT_SRV_IMPFILEERROR;
1432             obs << (ByteStream::byte)fPmId;     // PM id
1433             boost::mutex::scoped_lock aLock(fClntMsgMutex);
1434             updateTxBytes(obs.length());
1435 
1436             try
1437             {
1438                 fRef.fIos.write(obs);
1439             }
1440             catch (...)
1441             {
1442                 cout << "Broken Pipe .." << endl;
1443 
1444                 if (fpSysLog)
1445                 {
1446                     ostringstream oss;
1447                     oss << getObjId() << " : Broken Pipe : socket write failed ";
1448                     logging::Message::Args errMsgArgs;
1449                     errMsgArgs.add(oss.str());
1450                     fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
1451                 }
1452             }
1453 
1454             aLock.unlock();
1455         }
1456     }
1457 
1458 }
1459 //------------------------------------------------------------------------------
1460 
onReceiveJobId(ByteStream & Ibs)1461 void WEDataLoader::onReceiveJobId(ByteStream& Ibs)
1462 {
1463     std::string aJobFileName;
1464     Ibs >> aJobFileName;
1465 
1466     cout << "Incoming JobFileName : " << aJobFileName << endl;
1467 
1468     //BUG 4645
1469     string::size_type idx = aJobFileName.find_last_of('/');
1470 
1471     if (idx > 0 && idx < string::npos)
1472     {
1473         string dirname(aJobFileName, 0, idx);
1474         struct stat st;
1475 
1476         if (stat(dirname.c_str(), &st) != 0)
1477         {
1478             cout << "Creating directory : " << dirname << endl;
1479             boost::filesystem::create_directories(dirname.c_str());
1480         }
1481     }
1482 
1483     fJobFile.open(aJobFileName.c_str());
1484 
1485 }
1486 
1487 //------------------------------------------------------------------------------
1488 
onReceiveJobData(ByteStream & Ibs)1489 void WEDataLoader::onReceiveJobData(ByteStream& Ibs)
1490 {
1491     // Will write to the output file.
1492     std::string aData;
1493     Ibs >> aData;
1494     fJobFile << aData;
1495     fJobFile.close();
1496 }
1497 
1498 //------------------------------------------------------------------------------
1499 
onReceiveErrFileRqst(ByteStream & Ibs)1500 void WEDataLoader::onReceiveErrFileRqst(ByteStream& Ibs)
1501 {
1502     std::string aErrFileName;
1503     Ibs >> aErrFileName;
1504     cout << "Error Filename Arrived " << aErrFileName << endl;
1505 
1506     ByteStream obs;
1507     obs << (ByteStream::byte)WE_CLT_SRV_ERRLOG;
1508     obs << (ByteStream::byte) fPmId; // PM id
1509     obs << aErrFileName;
1510     BrmReportParser aErrFileParser;
1511     bool aRet = aErrFileParser.serialize(aErrFileName, obs);
1512 
1513     if (aRet)
1514     {
1515         boost::mutex::scoped_lock aLock(fClntMsgMutex);
1516         updateTxBytes(obs.length());
1517 
1518         try
1519         {
1520             fRef.fIos.write(obs);
1521         }
1522         catch (...)
1523         {
1524             cout << "Broken Pipe .." << endl;
1525 
1526             if (fpSysLog)
1527             {
1528                 ostringstream oss;
1529                 oss << getObjId() << " : Broken Pipe : socket write failed ";
1530                 logging::Message::Args errMsgArgs;
1531                 errMsgArgs.add(oss.str());
1532                 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
1533             }
1534         }
1535 
1536         aLock.unlock();
1537     }
1538 
1539     // delete the temp files
1540     if (remove(aErrFileName.c_str()) != 0 )
1541         cout << "Failed in removing Error file: " << aErrFileName << endl;
1542 }
1543 
1544 
1545 
1546 //------------------------------------------------------------------------------
1547 // Process the receipt of a msg containing the contents of a *.bad file.
1548 //------------------------------------------------------------------------------
onReceiveBadFileRqst(ByteStream & Ibs)1549 void WEDataLoader::onReceiveBadFileRqst(ByteStream& Ibs)
1550 {
1551     std::string aBadFileName;
1552     Ibs >> aBadFileName;
1553     cout << "Error Filename Arrived " << aBadFileName << endl;
1554 
1555     ByteStream obs;
1556     obs << (ByteStream::byte)WE_CLT_SRV_BADLOG;
1557     obs << (ByteStream::byte) fPmId; // PM id
1558     obs << aBadFileName;
1559     BrmReportParser aBadFileParser;
1560     bool aRet = aBadFileParser.serializeBlocks(aBadFileName, obs);
1561 
1562     if (aRet)
1563     {
1564         boost::mutex::scoped_lock aLock(fClntMsgMutex);
1565         updateTxBytes(obs.length());
1566 
1567         try
1568         {
1569             fRef.fIos.write(obs);
1570         }
1571         catch (...)
1572         {
1573             cout << "Broken Pipe .." << endl;
1574 
1575             if (fpSysLog)
1576             {
1577                 ostringstream oss;
1578                 oss << getObjId() << " : Broken Pipe : socket write failed ";
1579                 logging::Message::Args errMsgArgs;
1580                 errMsgArgs.add(oss.str());
1581                 fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
1582             }
1583         }
1584 
1585         aLock.unlock();
1586     }
1587 
1588     // delete the temp files
1589     if ( remove(aBadFileName.c_str()) != 0)
1590         cout << "Failed in removing Error file: " << aBadFileName << endl;
1591 }
1592 
1593 
1594 //------------------------------------------------------------------------------
1595 
sendDataRequest()1596 void WEDataLoader::sendDataRequest()
1597 {
1598     int aQsz = (fpCfThread) ? fpCfThread->getQueueSize() : 0;
1599 
1600     //if(aQsz>MIN_QSIZE)
1601     // Bug 5031 : If Q size goes above 100 (2*50); there is some thing wrong
1602     // will put a warning in info log. Controlled in Cpimport init data rqst cnt
1603     if (aQsz > MAX_QSIZE) // >250
1604     {
1605         cout << "WARNING : Data Queuing up : QSize = " << aQsz << endl;
1606 
1607         if (fpSysLog)
1608         {
1609             ostringstream oss;
1610             oss << getObjId() << "WARNING : Data Queuing up : QSize = " << aQsz;
1611             logging::Message::Args errMsgArgs;
1612             errMsgArgs.add(oss.str());
1613             fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
1614         }
1615     }
1616 
1617     boost::mutex::scoped_lock aLock(fClntMsgMutex);
1618     ByteStream obs;
1619     obs << (ByteStream::byte)WE_CLT_SRV_DATARQST;
1620     obs << (ByteStream::byte)fPmId;     // PM id
1621     updateTxBytes(obs.length());
1622 
1623     try
1624     {
1625         fRef.fIos.write(obs);
1626     }
1627     catch (...)
1628     {
1629         cout << "Broken Pipe .." << endl;
1630 
1631         if (fpSysLog)
1632         {
1633             ostringstream oss;
1634             oss << getObjId() << " : Broken Pipe : socket write failed ";
1635             logging::Message::Args errMsgArgs;
1636             errMsgArgs.add(oss.str());
1637             fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
1638         }
1639     }
1640 
1641     aLock.unlock();
1642 }
1643 
1644 
1645 //------------------------------------------------------------------------------
serialize(messageqcpp::ByteStream & b) const1646 void WEDataLoader::serialize(messageqcpp::ByteStream& b) const
1647 {
1648     //TODO to be changed. left it here to understand how to implement
1649     /*
1650     b << (ObjectReader::id_t) ObjectReader::SIMPLECOLUMN;
1651     ReturnedColumn::serialize(b); // parent class serialize
1652     b << (uint32_t) fOid;
1653     b << fData;
1654     b << static_cast<const ByteStream::doublebyte>(fReturnAll);
1655     b << (uint32_t) fSequence;
1656     */
1657 }
1658 
1659 //-----------------------------------------------------------------------------
1660 
unserialize(messageqcpp::ByteStream & b)1661 void WEDataLoader::unserialize(messageqcpp::ByteStream& b)
1662 {
1663     //TODO to be changed. left it here to understand how to implement
1664     /*
1665     ObjectReader::checkType(b, ObjectReader::SIMPLECOLUMN);
1666     ReturnedColumn::unserialize(b); // parent class unserialize
1667     b >> (uint32_t&) fOid;
1668     b >> fData;
1669     b >> reinterpret_cast<ByteStream::doublebyte&>(fReturnAll);
1670     b >> (uint32_t&) fSequence;
1671     */
1672 }
1673 
1674 //-----------------------------------------------------------------------------
1675 
1676 }   // namespace WriteEngine
1677