/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

/*******************************************************************************
 * $Id$
 *
 *******************************************************************************/

/*
 * we_dataloader.cpp
 *
 *  Created on: Oct 4, 2011
 *      Author: Boby Paul: bpaul@calpont.com
 */

#include "mcsconfig.h" // Used to pickup STRERROR_R_CHAR_P definition

#include <cstdlib>
#include <csignal>
#include <cstring>
#include <cerrno>

#include <unistd.h>     //pipe() && fork()
#if defined(__linux__)
#include <wait.h>       //wait()
#elif defined(__FreeBSD__)
#include <sys/types.h>
#include <sys/stat.h>   // For stat().
#include <sys/wait.h>
#include <sys/types.h>
#include <unistd.h>
#endif

#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>

#include "bytestream.h"
#include "rwlock_local.h"

#include <iostream>
#include <fstream>
#include <sstream>
#include <string>
#include <map>
#include <vector>
using namespace std;

#include <boost/thread/condition.hpp>
#include <boost/scoped_array.hpp>
#include <boost/thread.hpp>
#include <boost/filesystem.hpp>
using namespace boost;

#include "bytestream.h"
#include "messagequeue.h"
using namespace messageqcpp;

#include "we_messages.h"
#include "we_brmrprtparser.h"
#include "we_cleartablelockcmd.h"
#include "we_dataloader.h"
#include "we_readthread.h"

#include "installdir.h"

namespace WriteEngine
{

//bool WEDataLoader::fTearDownCpimport=false; // @bug 4267

//-----------------------------------------------------------------------------
/**
 * @brief WEDataLoader::Constructor
 *
 * Records the local module id as the PM id, assigns a pseudo-random object id
 * (1..10000) used as a prefix in log messages, installs the signal handlers
 * and attaches the shared syslog instance.
 *
 * @param Srt read thread whose socket (fRef.fIos) carries all replies
 **/
WEDataLoader::WEDataLoader(SplitterReadThread& Srt ):
    fRef(Srt),
    fMode(-1),
    fDataDumpFile(),
    fTxBytes(0),
    fRxBytes(0),
    fPmId(0),
    fCh_pid(0),
    fThis_pid(0),
    fP_pid(0),
    fpCfThread(0),
    fTearDownCpimport(false),   // @bug 4267
    fWaitPidRc(0),              // @bug 4267
    fWaitPidStatus(0),          // @bug 4267
    fForceKill(false),
    fPipeErr(false),
    fpSysLog(0)
{
    Config weConfig;
    uint16_t localModuleId = weConfig.getLocalModuleID();
    fPmId = static_cast<char>(localModuleId);

    srand ( time(NULL) );               // initialize random seed
    int aObjId = rand() % 10000 + 1;    // generate a random number
    setObjId(aObjId);

    setupSignalHandlers();

    if (!fpSysLog)
    {
        fpSysLog = SimpleSysLog::instance();
        fpSysLog->setLoggingID(logging::LoggingID(SUBSYSTEM_ID_WE_SRV));
    }
}

//-----------------------------------------------------------------------------
/**
 * @brief WEDataLoader::Destructor
 *
 * Closes the dump file, prints the transfer counters and, when a child
 * process id is still recorded, tears the child down. In mode 2 the child is
 * sent SIGINT first (@bug 5012). Exceptions are swallowed so that they cannot
 * escape the destructor (@bug 4164).
 **/
WEDataLoader::~WEDataLoader()
{
    try
    {
        if (fDataDumpFile.is_open())
            fDataDumpFile.close();

        cout << "\tRx Bytes " << getRxBytes() << endl;
        cout << "\tTX Bytes " << getTxBytes() << endl;
        cout << "\tChild PID " << getChPid() << endl;

        if (getChPid())
        {
            if (2 == getMode())     //@bug 5012
            {
                kill(getChPid(), SIGINT);
                teardownCpimport(fTearDownCpimport);
            }
            else
            {
                teardownCpimport(false);    // @bug 4267
            }
        }
    }
    catch (std::exception&)     // @bug 4164: exception causing thread to exit
    {
        cout << "Error tearing down cpimport in WEDataLoader destructor" << endl;
    }
}

//------------------------------------------------------------------------------
// Initialize signal handling: ignore SIGPIPE (a dead cpimport must surface as
// a write() error, not kill this process) and route SIGCHLD to onSigChild().
//------------------------------------------------------------------------------
void WEDataLoader::setupSignalHandlers()
{
#ifndef _MSC_VER
    signal(SIGPIPE, SIG_IGN);
    signal(SIGCHLD, WEDataLoader::onSigChild);
#endif
}

//------------------------------------------------------------------------------
// SIGCHLD handler: prints a notice that a child terminated.
//
// A signal handler may only call async-signal-safe functions; iostreams are
// not, so the notice is emitted with write(2). errno is saved and restored so
// the interrupted code never observes a value set in here.
//------------------------------------------------------------------------------
void WEDataLoader::onSigChild(int aInt)
{
    (void)aInt;

    static const char kNotice[] = "Received SIGCHLD of terminated process..\n";
    const int savedErrno = errno;
    const ssize_t rc = ::write(STDOUT_FILENO, kNotice, sizeof(kNotice) - 1);
    (void)rc;   // nothing useful can be done about a failed notice in a handler
    errno = savedErrno;
}

//-----------------------------------------------------------------------------
/**
 * @brief WEDataLoader::update
 *
 * Observer callback; currently does nothing.
 *
 * @param pSub subject that changed (unused)
 * @return always true
 **/
bool WEDataLoader::update(Subject* pSub)
{
    return true;
}

//-----------------------------------------------------------------------------
/**
 * @brief setup cpimport as a separate process
 *
 * Creates the pipe fFIFO, forks, and in the child execs the program named by
 * the whitespace-separated fCmdLineStr with stdin connected to the read end
 * of the pipe. The parent keeps the write end (fFIFO[1]) and records the
 * child pid.
 *
 * @return true in the parent once the child has been forked
 * @throws std::runtime_error if pipe() or fork() fails
 **/
bool WEDataLoader::setupCpimport() // fork the cpimport
{
    errno = 0;

    if (pipe(fFIFO) == -1)
    {
        int errnum = errno;
        ostringstream oss;
        oss << getObjId() << " : Error in creating pipe (errno-" <<
            errnum << "); " << strerror(errnum);
        throw runtime_error( oss.str() );
    }

    setPid(getpid());
    setPPid(getppid());

    errno = 0;
    pid_t aChPid = fork();

    if (aChPid == -1) //an error caused
    {
        int errnum = errno;
        // No child will ever use the pipe, so release both ends here.
        close(fFIFO[0]);
        close(fFIFO[1]);
        ostringstream oss;
        oss << getObjId() << " : Error in forking cpimport.bin (errno-" <<
            errnum << "); " << strerror(errnum);
        throw runtime_error( oss.str() );
    }

    if (aChPid == 0) // we are in child
    {
        int aStartFD = 3;
        int aEndFD = fFIFO[1] + 256;
        close(fFIFO[1]); //close the WRITER of CHILD

        cout << "Child Process Info: PID = " << getpid()
             << " (fFIFO[0], fFIFO[1]) = (" << fFIFO[0] << "," << fFIFO[1] << ")"
             << " (StartFD, EndFD) = (" << aStartFD << "," << aEndFD << ")" << endl;

        // Tokenize the command line. The strings live in 'tokens' until the
        // exec call, so the argv pointers below stay valid, and the vector
        // grows as needed instead of being capped at a fixed slot count.
        std::vector<std::string> tokens;
        std::istringstream ss(fCmdLineStr);
        std::string arg;

        while (ss >> arg)
            tokens.push_back(arg);

        std::vector<char*> Cmds;
        Cmds.reserve(tokens.size() + 1);

        for (std::size_t j = 0; j < tokens.size(); ++j)
            Cmds.push_back(const_cast<char*>(tokens[j].c_str()));

        Cmds.push_back(0); //null terminate

        //NOTE: for debugging (the null terminator is deliberately not printed)
        for (std::size_t j = 0; j < tokens.size(); ++j)
            cout << "Args " << tokens[j] << endl;

        cout.flush();

        close(0);           //close stdin for the child
        dup2(fFIFO[0], 0);  //make stdin be the reading end of the pipe

        //BUG 4410 : hacky solution so that CHLD process get EOF on close of pipe
        for (int i = aStartFD; i < aEndFD; i++) close(i);

        int aRet = -1;
        int execvErrno = EINVAL;    // reported when there is nothing to exec

        if (!tokens.empty())
        {
            errno = 0;
            aRet = execvp(Cmds[0], &Cmds[0]); //NOTE - works with full Path
            execvErrno = errno;     // save before any other call can change it
        }

        // Only reached when the exec did not happen.
        cout << "Return status of cpimport is " << aRet << endl;
        cout.flush();
        close(fFIFO[0]); // will trigger an EOF on stdin

        if (fpSysLog)
        {
            ostringstream oss;
            oss << getObjId() << " : execv error: cpimport.bin invocation failed; "
                << "(errno-" << execvErrno << "); " << strerror(execvErrno) <<
                "; Check file and try invoking locally.";
            logging::Message::Args errMsgArgs;
            errMsgArgs.add(oss.str());
            fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
        }

        // The forked child must never return into the caller's code, and it
        // must not run the parent's atexit handlers or static destructors.
        _exit(-1);
    }

    // parent
    setChPid(aChPid); // This is the child PID
    cout << "Child PID is " << this->getChPid() << endl;
    close(fFIFO[0]); //close the READER of PARENT
    // now we can send all the data thru FIFO[1], writer of PARENT

    return true;
}
//----------------------------------------------------------------------------- /** * * @brief close all file handles opened for cpimport * @brief wait for the cpimport process to finish work * **/ void WEDataLoader::teardownCpimport(bool useStoredWaitPidStatus) // @bug 4267 { fTearDownCpimport = false; //Reset it //cout << "Tearing down Cpimport" << endl; int aStatus; //cout << "checking fpCfThread value" << endl; if (fpCfThread) { //cout << "checking fpCfThread has a valid value" << endl; //wait until we are done with the queued messages while ((!fpCfThread->isMsgQueueEmpty()) && (!fpCfThread->isStopped())) { //cout << "DEBUG : MsgQueue not empty" << endl; //cannot be too low, since there is a lock in isMsgQueueEmpty() usleep(2000000); } // while(fpCfThread->isPushing()) // { // cout << "DEBUG : still pushing" << endl; // usleep(100000); // } if (fpSysLog) { ostringstream oss; oss << getObjId() << " : Message Queue is empty; Stopping CF Thread"; logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000); } fpCfThread->stopThread(); while (!fpCfThread->isStopped()) { cout << "DEBUG : still not stopped" << endl; usleep(100000); } delete fpCfThread; fpCfThread = 0; } closeWritePipe(); pid_t aPid; // @bug 4267 begin: call waitpid() to get job status or use stored job status // aPid = waitpid(getChPid(), &aStatus, 0); // wait until cpimport finishs if (useStoredWaitPidStatus) { aPid = fWaitPidRc; aStatus = fWaitPidStatus; } else { //aPid = waitpid(getChPid(), &aStatus, 0); // wait until cpimport finishs aPid = waitpid(getChPid(), &aStatus, WNOHANG); // wait until cpimport finishs int aIdx = 0; while ((aPid == 0) && (aIdx < 25 * MAX_QSIZE)) //Do not loop infinitly { usleep(2000000); aPid = waitpid(getChPid(), &aStatus, WNOHANG); cout << "Inside tearDown waitpid rc[" << aIdx << "] = " << aPid << endl; ++aIdx; } } // @bug 4267 end // BP - added -1 as per DMC comment below if ((aPid == getChPid()) || 
(aPid == -1)) // @bug 4267 (DMC-shouldn't we check for aPid of -1?) { setChPid(0); if ((WIFEXITED(aStatus)) && (WEXITSTATUS(aStatus) == 0)) { cout << "\tCpimport exit on success" << endl; ostringstream oss; oss << getObjId() << " : cpimport exit on success"; logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000); onCpimportSuccess(); } else { int termsig = (WIFSIGNALED(aStatus) ? WTERMSIG(aStatus) : -1); if (!fForceKill) { cout << "\tCpimport exit on failure (signal " << termsig << ')' << endl; ostringstream oss; oss << getObjId() << " : cpimport exit on failure (signal " << termsig << ')'; logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000); onCpimportFailure(); } else { cout << "\tCpimport exit on Force Kill!!" << endl; ostringstream oss; oss << getObjId() << " : cpimport exit on Force kill!!"; logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000); onCpimportSuccess(); } } } } //----------------------------------------------------------------------------- /** * @brief Push the data to cpimport from the incoming ByteStream * @param Incoming ByteStream * */ // void WEDataLoader::pushData2Cpimport(ByteStream& Ibs) { if (Ibs.length() > 0) { int aLen = Ibs.length(); char* pStart = reinterpret_cast(Ibs.buf()); char* pEnd = pStart + aLen; char* pPtr = pStart; while (pPtr < pEnd) { // if(pEnd > (pPtr + MAX_LEN)) // { // int aRet = write(fFIFO[1], pPtr, MAX_LEN); // if(aRet == -1) throw runtime_error("Pipe write error"); // //write(fFIFO[1], Ibs.buf(), Ibs.length()); // pPtr += MAX_LEN; // } // else // { // int aStrLen = pEnd - pPtr; // int aRet = write(fFIFO[1], pPtr, aStrLen); // if(aRet == -1) throw runtime_error("Pipe write error"); // pPtr += aStrLen; // } try { int aRet = write(fFIFO[1], pPtr, pEnd - pPtr); if (aRet < 0) { if (!fPipeErr) 
{ int e = errno; fPipeErr = true; std::string aStr = "pushing data : PIPE error ........."; char errMsgBuf[160]; #if STRERROR_R_CHAR_P const char* pErrMsg = strerror_r( e, errMsgBuf, sizeof(errMsgBuf)); if (pErrMsg) aStr += pErrMsg; #else int errMsgRc = strerror_r(e, errMsgBuf, sizeof(errMsgBuf)); if (errMsgRc == 0) aStr += errMsgBuf; #endif logging::Message::Args errMsgArgs; errMsgArgs.add(aStr); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000); } throw runtime_error("Pipe Error - cpimport.bin exited already!!"); } pPtr += aRet; } catch (...) { //std::string aStr = "pushing data PIPE error ........."; //logging::Message::Args errMsgArgs; //errMsgArgs.add(aStr); //fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000); throw runtime_error("Pipe Error - cpimport.bin exited already!!"); } } } } //----------------------------------------------------------------------------- /** * * @brief Close the pipe through which data was written to cpimport * @brief This will also signal a EOF to the reading pipe. * **/ void WEDataLoader::closeWritePipe() { cout << "Going to close call = " << fFIFO[1] << endl; //NOTE this will flush the file buffer and close it. 
int aRet = close(fFIFO[1]); // will trigger EOD cout << "----- closed both pipes -------- aRet = " << aRet << endl; } //----------------------------------------------------------------------------- /** * * @brief Tokenize a string into char** argv format and store in a vector * @brief we pass the V as arguments to exec cpimport * @param CmdLine is the string form of arguments demlimited by space * @param V vector which contains each element of argv * **/ void WEDataLoader::str2Argv(std::string CmdLine, std::vector& V) { std::istringstream ss(CmdLine); std::string arg; std::vector v2; while (ss >> arg) { v2.push_back(arg); V.push_back(const_cast(v2.back().c_str())); } V.push_back(0); //null terminate } //----------------------------------------------------------------------------- /** * * @brief Event to trigger when cpimport is successful. * **/ void WEDataLoader::onCpimportSuccess() { ByteStream obs; cout << "Sending BRMRPT" << endl; obs << (ByteStream::byte)WE_CLT_SRV_BRMRPT; obs << (ByteStream::byte)fPmId; // PM id // for testing //std::string fRptFileName("ReportFile.txt"); BrmReportParser aBrmRptParser; bool aRet = aBrmRptParser.serialize(fBrmRptFileName, obs); if (aRet) { boost::mutex::scoped_lock aLock(fClntMsgMutex); //aBrmRptParser.unserialize(obs); - was for testing updateTxBytes(obs.length()); try { fRef.fIos.write(obs); } catch (...) { cout << "Broken Pipe .." << endl; if (fpSysLog) { ostringstream oss; oss << getObjId() << " : Broken Pipe : socket write failed "; logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000); } } aLock.unlock(); cout << "Finished Sending BRMRPT" << endl; } else { cout << "Failed to serialize BRMRpt " << endl; } // if(remove(fBrmRptFileName.c_str()) != 0) // cout <<"Failed to delete BRMRpt File "<< fBrmRptFileName << endl; //usleep(1000000); //sleep 1 second. 
obs.reset(); obs << (ByteStream::byte)WE_CLT_SRV_CPIPASS; obs << (ByteStream::byte)fPmId; // PM id boost::mutex::scoped_lock aLock(fClntMsgMutex); updateTxBytes(obs.length()); try { fRef.fIos.write(obs); } catch (...) { cout << "Broken Pipe .." << endl; if (fpSysLog) { ostringstream oss; oss << getObjId() << " : Broken Pipe : socket write failed "; logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000); } } aLock.unlock(); cout << "Sent CPIPASS info" << endl; if (fpSysLog) { ostringstream oss; oss << getObjId() << " : onCpimportSuccess BrmReport Send"; logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000); } } //----------------------------------------------------------------------------- /** * * @brief Event to trigger if a cpimport failure occurs. * **/ void WEDataLoader::onCpimportFailure() { // Send failure notice back to the parent splitter job sendCpimportFailureNotice(); //Even if we failed, we have failure info in BRMRPT ByteStream obs; obs << (ByteStream::byte)WE_CLT_SRV_BRMRPT; obs << (ByteStream::byte)fPmId; // PM id BrmReportParser aBrmRptParser; bool aRet = aBrmRptParser.serialize(fBrmRptFileName, obs); if (aRet) { boost::mutex::scoped_lock aLock(fClntMsgMutex); updateTxBytes(obs.length()); try { fRef.fIos.write(obs); } catch (std::exception& ex) { cout << "Broken Pipe .." 
<< ex.what() << endl; if (fpSysLog) { ostringstream oss; oss << getObjId() << " : Broken Pipe : socket write failed; " << ex.what(); logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000); } } aLock.unlock(); cout << "Finished Sending BRMRPT" << endl; } if (remove(fBrmRptFileName.c_str()) != 0) cout << "Failed to delete BRMRpt File " << fBrmRptFileName << endl; if (fpSysLog) { ostringstream oss; oss << getObjId() << " : onCpimportFailure BrmReport Send"; logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000); } } //----------------------------------------------------------------------------- // Send msg to front-end splitter to notify it that a cpimport.bin pgm failed. //----------------------------------------------------------------------------- void WEDataLoader::sendCpimportFailureNotice() { ByteStream obs; obs << (ByteStream::byte)WE_CLT_SRV_CPIFAIL; obs << (ByteStream::byte)fPmId; // PM id boost::mutex::scoped_lock aLock(fClntMsgMutex); updateTxBytes(obs.length()); try { fRef.fIos.write(obs); } catch (...) { cout << "Broken Pipe .." << endl; if (fpSysLog) { ostringstream oss; oss << getObjId() << " : Broken Pipe : socket write failed "; logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000); } } } //----------------------------------------------------------------------------- /** * @brief Event when a KEEPALIVE arrives. 
* @param Incoming ByteStream, not used currently */ void WEDataLoader::onReceiveKeepAlive(ByteStream& Ibs) { /* // TODO comment out when we done with debug if(fpSysLog) { ostringstream oss; oss << getObjId() <<" : Received KEEPALIVE"; logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000); } */ //cout << "Received KEEPALIVE" << endl; // NOTE only seldom a KEEPALIVE receives, // so nothing wrong in responding with a KEEPALIVE. ByteStream obs; obs << (ByteStream::byte)WE_CLT_SRV_KEEPALIVE; obs << (ByteStream::byte)fPmId; // PM id boost::mutex::scoped_lock aLock(fClntMsgMutex); updateTxBytes(obs.length()); try { fRef.fIos.write(obs); } catch (...) { cout << "Broken Pipe .." << endl; if (fpSysLog) { ostringstream oss; oss << getObjId() << ": Broken Pipe : socket write failed "; logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000); } } aLock.unlock(); // @bug 4267 begin int aStatus; pid_t aPid; if (getChPid() > 0) { aPid = waitpid(getChPid(), &aStatus, WNOHANG); // wait until cpimport finishs if (aPid != 0) { cout << "waitpid(" << getChPid() << "): rc-" << aPid << "; status-" << aStatus << "; exited-" << (WIFEXITED(aStatus)) << endl; } if ((aPid == getChPid()) || (aPid == -1)) { fTearDownCpimport = true; fWaitPidRc = aPid; fWaitPidStatus = aStatus; } } // @bug 4267 end if (fTearDownCpimport) { //fTearDownCpimport = false; //Reset it // commented out to use the flag in EOD cout << "Cpimport terminated " << endl; if (0 == getMode()) onReceiveEod(Ibs); else if (1 == getMode()) //mode 1 has to drive from UM { ByteStream obs; obs << (ByteStream::byte)WE_CLT_SRV_EOD; obs << (ByteStream::byte)fPmId; // PM id boost::mutex::scoped_lock aLock(fClntMsgMutex); updateTxBytes(obs.length()); try { fRef.fIos.write(obs); } catch (...) { cout << "Broken Pipe .." 
<< endl; if (fpSysLog) { ostringstream oss; oss << getObjId() << ": Broken Pipe : socket write failed "; logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000); } } aLock.unlock(); } else if (2 == getMode()) { //if(getChPid()) teardownCpimport(true); // @bug 4267 if (getChPid()) teardownCpimport(fTearDownCpimport); // @bug 4267 BP } } else { //if(1 == getMode()) // if(fpCfThread) cout << "Queue Size = " << fpCfThread->getQueueSize() << endl; } } //----------------------------------------------------------------------------- /** * @brief trigger when a DATA arrives. * @param Incoming ByteStream which contains data */ void WEDataLoader::onReceiveData(ByteStream& Ibs) { if ((0 == getMode()) && (fDataDumpFile.is_open())) { // Will write to the output file. fDataDumpFile << Ibs; sendDataRequest(); } else if ( 1 == getMode()) { // commented out since we are going to use seperate thread //pushData2Cpimport(Ibs); if (fpCfThread) { fpCfThread->add2MsgQueue(Ibs); //sendDataRequest(); // Need to control Queue Size // Bug 5031 : Will only send 1 rqst for a batch to cpimport.bin //if(fpCfThread->getQueueSize()getQueueSize()getQueueSize() : 0; // Bug 5031 : If Q size goes above 100 (2*250); if (aQsz < MAX_QSIZE) sendDataRequest(); if (aQsz > 1.5 * MAX_QSIZE) // > 2*250 { cout << "WARNING : Data Queuing up : QSize = " << aQsz << endl; if (fpSysLog) { ostringstream oss; oss << getObjId() << "WARNING : Data Queuing up : QSize = " << aQsz; logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000); } } } } else if ( 2 == getMode()) { cout << "onReceiveData : In Mode 2 NO data suppose to receive" << endl; } // ByteStream obs; // obs << (ByteStream::byte)WE_CLT_SRV_DATARQST; // obs << (ByteStream::byte)fPmId; // PM id // updateTxBytes(obs.length()); // fRef.fIos.write(obs); } 
//----------------------------------------------------------------------------- /** * @brief trigger when a EOD arrives. * @param Incoming ByteStream; not relevent for now */ void WEDataLoader::onReceiveEod(ByteStream& Ibs) { if (fpSysLog) { ostringstream oss; oss << getObjId() << " : onReceiveEOD : child ID = " << getChPid(); logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000); } cout << "Received EOD " << endl; if (0 == getMode()) { fDataDumpFile.close(); } ByteStream obs; obs << (ByteStream::byte)WE_CLT_SRV_EOD; obs << (ByteStream::byte)fPmId; // PM id boost::mutex::scoped_lock aLock(fClntMsgMutex); updateTxBytes(obs.length()); try { fRef.fIos.write(obs); } catch (...) { cout << "Broken Pipe .." << endl; if (fpSysLog) { ostringstream oss; oss << getObjId() << " : Broken Pipe : socket write failed "; logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000); } } aLock.unlock(); //if(( 1 == getMode())||( 2 == getMode())) if (1 == getMode()) //BUG 4370 - seperated mode 1 & 2 { //if(getChPid()) teardownCpimport(false); // @bug 4267 if (getChPid()) teardownCpimport(fTearDownCpimport); // @bug 4267 //BP changed to send the correct flag } else if (2 == getMode()) //BUG 4370 { if (getChPid()) { kill(getChPid(), SIGINT); //BUG 4370 fForceKill = true; teardownCpimport(fTearDownCpimport); //BUG 4370 } } } //----------------------------------------------------------------------------- /** * @brief Event on Command Received. It should contain sub commands * @param Incoming ByteStream, will have sub commands */ void WEDataLoader::onReceiveCmd(ByteStream& bs) { //TODO - can be cpimport cmd or server cmd, for now write to a file ByteStream::byte aCmdId; bs >> aCmdId; } //----------------------------------------------------------------------------- /** * @brief The mode in which WES running. 
* @param Incoming ByteStream, not relevent */ void WEDataLoader::onReceiveMode(ByteStream& Ibs) { // Assigning it here since WEDataLoader constructor is called multiple times // while coping in readthread class. if (!fpSysLog) { fpSysLog = SimpleSysLog::instance(); fpSysLog->setLoggingID(logging::LoggingID(SUBSYSTEM_ID_WE_SRV)); } Ibs >> (ByteStream::quadbyte&)fMode; cout << "Setting fMode = " << fMode << endl; if (fpSysLog) { ostringstream oss; oss << getObjId() << " : onReceiveMode() Setting fMode = " << fMode; logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000); } char aName[64]; snprintf(aName, sizeof(aName), "ModuleDBRootCount%d-3", fPmId); string aStrDbRootCnt = config::Config::makeConfig()->getConfig( "SystemModuleConfig", aName); cout << "DbRootCnt = " << aStrDbRootCnt << endl; ByteStream::byte aDbCnt = (ByteStream::byte)atoi(aStrDbRootCnt.c_str()); if (fpSysLog) { ostringstream oss; oss << getObjId() << " : onReceiveMode() DbRoot Count = " + aStrDbRootCnt; logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000); } //Send No. of DBRoots to Client ByteStream aObs; aObs << (ByteStream::byte)WE_CLT_SRV_DBRCNT; aObs << (ByteStream::byte)fPmId; aObs << (ByteStream::byte)aDbCnt; boost::mutex::scoped_lock aLock(fClntMsgMutex); updateTxBytes(aObs.length()); try { fRef.fIos.write(aObs); } catch (...) { cout << "Broken Pipe .." << endl; if (fpSysLog) { ostringstream oss; oss << getObjId() << " : Broken Pipe : socket write failed "; logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000); } } aLock.unlock(); } //----------------------------------------------------------------------------- /** * @brief Acknowledgment. Not relevant at this point of time. 
* @brief Can make use to update the BRM * @param Incoming ByteStream, not relevant for now */ void WEDataLoader::onReceiveAck(ByteStream& Ibs) { // All is good // update the status } //----------------------------------------------------------------------------- /** * @brief NAK. A Failure, Rollback should be initiated. * @brief Can make use to update the BRM * @param Incoming ByteStream, not relevant for now */ void WEDataLoader::onReceiveNak(ByteStream& Ibs) { // TODO - handle the problem } //----------------------------------------------------------------------------- /** * @brief ERROR. A Failure, Rollback should be initiated. * @brief Can make use to update the BRM * @param Incoming ByteStream, not relevant for now */ void WEDataLoader::onReceiveError(ByteStream& Ibs) { // TODO - handle the failure situation. } //------------------------------------------------------------------------------ // onReceiveCmdLineArgs - do what ever need to do with command line args //------------------------------------------------------------------------------ /** * @brief Command line args received. * @brief Can make use to update the BRM * @param Incoming ByteStream, not relevant for now */ void WEDataLoader::onReceiveCmdLineArgs(ByteStream& Ibs) { Ibs >> fCmdLineStr; cout << "CMD LINE ARGS came in " << fCmdLineStr << endl; if (fpSysLog) { ostringstream oss; oss << getObjId() << " : CMD LINE ARGS came in " << fCmdLineStr; logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000); } ByteStream obs; //TODO - Need to check all clear for starting CPI if (0 != getMode()) { obs << (ByteStream::byte)WE_CLT_SRV_STARTCPI; } else { obs << (ByteStream::byte)WE_CLT_SRV_DATARQST; } obs << (ByteStream::byte) fPmId; // PM id boost::mutex::scoped_lock aLock(fClntMsgMutex); updateTxBytes(obs.length()); try { fRef.fIos.write(obs); } catch (...) { cout << "Broken Pipe .." 
<< endl; if (fpSysLog) { ostringstream oss; oss << getObjId() << " : Broken Pipe : socket write failed "; logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000); } } aLock.unlock(); } //----------------------------------------------------------------------------- void WEDataLoader::onReceiveStartCpimport() { cout << "Start Cpimport command reached!!" << endl; if (fpSysLog) { ostringstream oss; oss << getObjId() << " : Start Cpimport command reached!!"; logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000); } try { setupCpimport(); if (1 == getMode()) // create a thread to handle the data feeding part { fpCfThread = new WECpiFeederThread(*this); fpCfThread->startFeederThread(); } } catch (std::exception& ex) { // send an CPI FAIL command back to splitter if (fpSysLog) { logging::Message::Args errMsgArgs; errMsgArgs.add(ex.what()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000); sendCpimportFailureNotice(); return; } } if (1 == getMode()) // In mode 2/0 we do not rqst data. 
{ sendDataRequest(); } // We need to respond to KEEP ALIVES //else if(2 == getMode()) // Now we wait till cpimport comes back //{ // if(getChPid()) // teardownCpimport(); //} } //----------------------------------------------------------------------------- void WEDataLoader::onReceiveBrmRptFileName(ByteStream& Ibs) { Ibs >> fBrmRptFileName; cout << "Brm Rpt Filename Arrived " << fBrmRptFileName << endl; //BUG 4645 string::size_type idx = fBrmRptFileName.find_last_of('/'); if (idx > 0 && idx < string::npos) { string dirname(fBrmRptFileName, 0, idx); struct stat st; if (stat(dirname.c_str(), &st) != 0) { cout << "Creating directory : " << dirname << endl; boost::filesystem::create_directories(dirname.c_str()); } } if (fpSysLog) { ostringstream oss; oss << getObjId() << " : Brm Rpt Filename Arrived " << fBrmRptFileName; logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000); } } //----------------------------------------------------------------------------- void WEDataLoader::onReceiveCleanup(ByteStream& Ibs) { if (fpSysLog) { ostringstream oss; oss << getObjId() << " : OnReceiveCleanup arrived"; logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000); } std::string aErrMsg; WE_ClearTableLockCmd aClrTblLockCmd("DataLoader"); int aRet = aClrTblLockCmd.processCleanup(Ibs, aErrMsg); ByteStream obs; obs << (ByteStream::byte)WE_CLT_SRV_CLEANUP; obs << (ByteStream::byte) fPmId; // PM id if (aRet == 0) obs << (ByteStream::byte)1; // cleanup success else obs << (ByteStream::byte)0; // cleanup failed boost::mutex::scoped_lock aLock(fClntMsgMutex); updateTxBytes(obs.length()); try { fRef.fIos.write(obs); } catch (...) { cout << "Broken Pipe .." 
<< endl; if (fpSysLog) { ostringstream oss; oss << getObjId() << " : Broken Pipe : socket write failed "; logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000); } } aLock.unlock(); } //----------------------------------------------------------------------------- void WEDataLoader::onReceiveRollback(ByteStream& Ibs) { if (fpSysLog) { ostringstream oss; oss << getObjId() << " : OnReceiveRollback arrived"; logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000); } std::string aErrMsg; WE_ClearTableLockCmd aClrTblLockCmd("DataLoader"); int aRet = aClrTblLockCmd.processRollback(Ibs, aErrMsg); ByteStream obs; obs << (ByteStream::byte)WE_CLT_SRV_ROLLBACK; obs << (ByteStream::byte) fPmId; // PM id if (aRet == 0) obs << (ByteStream::byte)1; // Rollback success else obs << (ByteStream::byte)0; // Rollback failed boost::mutex::scoped_lock aLock(fClntMsgMutex); updateTxBytes(obs.length()); try { fRef.fIos.write(obs); } catch (...) { cout << "Broken Pipe .." << endl; if (fpSysLog) { ostringstream oss; oss << getObjId() << " : Broken Pipe : socket write failed "; logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000); } } aLock.unlock(); } //----------------------------------------------------------------------------- void WEDataLoader::onReceiveImportFileName(ByteStream& Ibs) { bool aGoodFile = true; std::string aFileName; Ibs >> aFileName; //BUG 4245 : Need to check the file or path exists { std::fstream aFin; aFin.open(aFileName.c_str(), std::ios::in); if (aFin.is_open()) // File exist, send an ERROR immediately { // file exists ByteStream obs; obs << (ByteStream::byte)WE_CLT_SRV_IMPFILEERROR; obs << (ByteStream::byte)fPmId; updateTxBytes(obs.length()); boost::mutex::scoped_lock aLock(fClntMsgMutex); try { fRef.fIos.write(obs); } catch (...) 
{ cout << "Broken Pipe .." << endl; if (fpSysLog) { ostringstream oss; oss << getObjId() << " : Broken Pipe : socket write failed "; logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000); } } aGoodFile = false; aLock.unlock(); } aFin.close(); } if (aGoodFile) { fDataDumpFile.open(aFileName.c_str(), std::ios::app); //BUG 4245 : If file dir is not existing, we need to fail this import if (!fDataDumpFile.good()) { ByteStream obs; obs << (ByteStream::byte)WE_CLT_SRV_IMPFILEERROR; obs << (ByteStream::byte)fPmId; // PM id boost::mutex::scoped_lock aLock(fClntMsgMutex); updateTxBytes(obs.length()); try { fRef.fIos.write(obs); } catch (...) { cout << "Broken Pipe .." << endl; if (fpSysLog) { ostringstream oss; oss << getObjId() << " : Broken Pipe : socket write failed "; logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000); } } aLock.unlock(); } } } //------------------------------------------------------------------------------ void WEDataLoader::onReceiveJobId(ByteStream& Ibs) { std::string aJobFileName; Ibs >> aJobFileName; cout << "Incoming JobFileName : " << aJobFileName << endl; //BUG 4645 string::size_type idx = aJobFileName.find_last_of('/'); if (idx > 0 && idx < string::npos) { string dirname(aJobFileName, 0, idx); struct stat st; if (stat(dirname.c_str(), &st) != 0) { cout << "Creating directory : " << dirname << endl; boost::filesystem::create_directories(dirname.c_str()); } } fJobFile.open(aJobFileName.c_str()); } //------------------------------------------------------------------------------ void WEDataLoader::onReceiveJobData(ByteStream& Ibs) { // Will write to the output file. 
std::string aData; Ibs >> aData; fJobFile << aData; fJobFile.close(); } //------------------------------------------------------------------------------ void WEDataLoader::onReceiveErrFileRqst(ByteStream& Ibs) { std::string aErrFileName; Ibs >> aErrFileName; cout << "Error Filename Arrived " << aErrFileName << endl; ByteStream obs; obs << (ByteStream::byte)WE_CLT_SRV_ERRLOG; obs << (ByteStream::byte) fPmId; // PM id obs << aErrFileName; BrmReportParser aErrFileParser; bool aRet = aErrFileParser.serialize(aErrFileName, obs); if (aRet) { boost::mutex::scoped_lock aLock(fClntMsgMutex); updateTxBytes(obs.length()); try { fRef.fIos.write(obs); } catch (...) { cout << "Broken Pipe .." << endl; if (fpSysLog) { ostringstream oss; oss << getObjId() << " : Broken Pipe : socket write failed "; logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000); } } aLock.unlock(); } // delete the temp files if (remove(aErrFileName.c_str()) != 0 ) cout << "Failed in removing Error file: " << aErrFileName << endl; } //------------------------------------------------------------------------------ // Process the receipt of a msg containing the contents of a *.bad file. //------------------------------------------------------------------------------ void WEDataLoader::onReceiveBadFileRqst(ByteStream& Ibs) { std::string aBadFileName; Ibs >> aBadFileName; cout << "Error Filename Arrived " << aBadFileName << endl; ByteStream obs; obs << (ByteStream::byte)WE_CLT_SRV_BADLOG; obs << (ByteStream::byte) fPmId; // PM id obs << aBadFileName; BrmReportParser aBadFileParser; bool aRet = aBadFileParser.serializeBlocks(aBadFileName, obs); if (aRet) { boost::mutex::scoped_lock aLock(fClntMsgMutex); updateTxBytes(obs.length()); try { fRef.fIos.write(obs); } catch (...) { cout << "Broken Pipe .." 
<< endl; if (fpSysLog) { ostringstream oss; oss << getObjId() << " : Broken Pipe : socket write failed "; logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000); } } aLock.unlock(); } // delete the temp files if ( remove(aBadFileName.c_str()) != 0) cout << "Failed in removing Error file: " << aBadFileName << endl; } //------------------------------------------------------------------------------ void WEDataLoader::sendDataRequest() { int aQsz = (fpCfThread) ? fpCfThread->getQueueSize() : 0; //if(aQsz>MIN_QSIZE) // Bug 5031 : If Q size goes above 100 (2*50); there is some thing wrong // will put a warning in info log. Controlled in Cpimport init data rqst cnt if (aQsz > MAX_QSIZE) // >250 { cout << "WARNING : Data Queuing up : QSize = " << aQsz << endl; if (fpSysLog) { ostringstream oss; oss << getObjId() << "WARNING : Data Queuing up : QSize = " << aQsz; logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000); } } boost::mutex::scoped_lock aLock(fClntMsgMutex); ByteStream obs; obs << (ByteStream::byte)WE_CLT_SRV_DATARQST; obs << (ByteStream::byte)fPmId; // PM id updateTxBytes(obs.length()); try { fRef.fIos.write(obs); } catch (...) { cout << "Broken Pipe .." << endl; if (fpSysLog) { ostringstream oss; oss << getObjId() << " : Broken Pipe : socket write failed "; logging::Message::Args errMsgArgs; errMsgArgs.add(oss.str()); fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000); } } aLock.unlock(); } //------------------------------------------------------------------------------ void WEDataLoader::serialize(messageqcpp::ByteStream& b) const { //TODO to be changed. 
left it here to understand how to implement /* b << (ObjectReader::id_t) ObjectReader::SIMPLECOLUMN; ReturnedColumn::serialize(b); // parent class serialize b << (uint32_t) fOid; b << fData; b << static_cast(fReturnAll); b << (uint32_t) fSequence; */ } //----------------------------------------------------------------------------- void WEDataLoader::unserialize(messageqcpp::ByteStream& b) { //TODO to be changed. left it here to understand how to implement /* ObjectReader::checkType(b, ObjectReader::SIMPLECOLUMN); ReturnedColumn::unserialize(b); // parent class unserialize b >> (uint32_t&) fOid; b >> fData; b >> reinterpret_cast(fReturnAll); b >> (uint32_t&) fSequence; */ } //----------------------------------------------------------------------------- } // namespace WriteEngine