1 /* Copyright (C) 2014 InfiniDB, Inc. 2 3 This program is free software; you can redistribute it and/or 4 modify it under the terms of the GNU General Public License 5 as published by the Free Software Foundation; version 2 of 6 the License. 7 8 This program is distributed in the hope that it will be useful, 9 but WITHOUT ANY WARRANTY; without even the implied warranty of 10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11 GNU General Public License for more details. 12 13 You should have received a copy of the GNU General Public License 14 along with this program; if not, write to the Free Software 15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, 16 MA 02110-1301, USA. */ 17 18 /******************************************************************************* 19 * $Id$ 20 * 21 *******************************************************************************/ 22 23 /* 24 * we_spltrdatahandler.h 25 * 26 * Created on: Oct 17, 2011 27 * Author: bpaul 28 */ 29 30 #ifndef WE_SPLITTERDATAHANDLER_H_ 31 #define WE_SPLITTERDATAHANDLER_H_ 32 33 34 #include "liboamcpp.h" 35 #include "resourcemanager.h" 36 #include "threadsafequeue.h" 37 #include "dbrm.h" 38 #include "batchloader.h" 39 #include "we_log.h" 40 #include "we_type.h" 41 42 #include "we_filereadthread.h" 43 #include "we_splclient.h" 44 45 46 namespace WriteEngine 47 { 48 49 class WESplitterApp; //forward declaration 50 class WESplClient; 51 class WEFileReadThread; 52 53 //------------------------------------------------------------------------------ 54 // a stl list to keep Next PM to send data 55 //------------------------------------------------------------------------------ 56 57 class WEPmList 58 { 59 public: WEPmList()60 WEPmList(): fPmList(), fListMutex() {} ~WEPmList()61 virtual ~WEPmList() 62 { 63 fPmList.clear(); 64 } 65 66 void addPm2List(int PmId); 67 void addPriorityPm2List(int PmId); 68 int getNextPm(); 69 void clearPmList(); 70 bool check4Pm(int PmId); 71 72 private: 73 typedef std::list<int> WePmList; // List to add in front 74 WePmList fPmList; 75 boost::mutex fListMutex; //mutex controls add/remove 76 77 }; 78 79 80 //------------------------------------------------------------------------------ 81 class WESDHandler 82 { 83 84 public: 85 WESDHandler(WESplitterApp& Ref); 86 WESDHandler(const WESDHandler& rhs); 87 virtual ~WESDHandler(); 88 89 void setup(); 90 void shutdown(); 91 void reset(); 92 void send2Pm(messageqcpp::SBS& Sbs, unsigned int PmId = 0); 93 void send2Pm(messageqcpp::ByteStream& Bs, unsigned int PmId = 0); 94 void sendEODMsg(); 95 void checkForRespMsgs(); 96 void add2RespQueue(const messageqcpp::SBS& Sbs); 97 void exportJobFile(std::string& JobId, std::string& JobFileName); 98 int leastDataSendPm(); 99 bool check4AllBrmReports(); 100 bool updateCPAndHWMInBRM(); 101 void cancelOutstandingCpimports(); 102 void doRollback(); 103 void doCleanup(bool deleteHdfsTempDbFiles); 104 void getErrorLog(int PmId, const std::string& ErrFileName); 105 void getBadLog(int PmId, const std::string& BadFileName); 106 int check4RollbackRslts(); 107 bool check4AllRollbackStatus(); 108 int check4CleanupRslts(); 109 bool check4AllCleanupStatus(); 110 bool check4AllCpiStarts(); 111 bool releaseTableLocks(); 112 void check4CpiInvokeMode(); 113 bool check4PmArguments(); 114 void setInputFileList(std::string InFileName); 115 bool check4CriticalErrMsgs(std::string& Entry); 116 117 void onStartCpiResponse(int PmId); 118 void onDataRqstResponse(int PmId); 119 void onAckResponse(int PmId); 120 void onNakResponse(int PmId); 121 void onEodResponse(int Pmid); 122 void onPmErrorResponse(int PmId); 123 void onKeepAliveMessage(int PmId); 124 void onCpimportPass(int PmId); 125 void onCpimportFail(int PmId, bool SigHandle = false); 126 void onImpFileError(int PmId); 127 void onBrmReport(int PmId, messageqcpp::SBS& Sbs); 128 void onErrorFile(int PmId, messageqcpp::SBS& Sbs); 129 void onBadFile(int PmId, messageqcpp::SBS& Sbs); 130 void onRollbackResult(int PmId, messageqcpp::SBS& Sbs); 131 void onCleanupResult(int PmId, messageqcpp::SBS& Sbs); 132 void onDBRootCount(int PmId, messageqcpp::SBS& Sbs); 133 void onHandlingSignal(); 134 void onHandlingSigHup(); 135 void onDisconnectFailure(); 136 137 int getNextPm2Feed(); 138 int getNextDbrPm2Send(); 139 int getTableOID(std::string Schema, std::string Table); 140 std::string getTime2Str() const; 141 142 bool checkAllCpiPassStatus(); 143 bool checkAllCpiFailStatus(); 144 bool checkForRollbackAndCleanup(); 145 bool checkForCpiFailStatus(); 146 147 void checkForConnections(); 148 void sendHeartbeats(); 149 std::string getTableName() const; 150 std::string getSchemaName() const; 151 char getEnclChar(); 152 char getEscChar(); 153 char getDelimChar(); 154 bool getConsoleLog(); 155 int getReadBufSize(); 156 ImportDataMode getImportDataMode() const; 157 void sysLog(const logging::Message::Args& msgArgs, 158 logging::LOG_TYPE logType, logging::Message::MessageID msgId); 159 160 getFpRespThread()161 boost::thread* getFpRespThread() const 162 { 163 return fpRespThread; 164 } getQId()165 unsigned int getQId() const 166 { 167 return fQId; 168 } setFpRespThread(boost::thread * pRespThread)169 void setFpRespThread(boost::thread* pRespThread) 170 { 171 fpRespThread = pRespThread; 172 } setQId(unsigned int QId)173 void setQId(unsigned int QId) 174 { 175 fQId = QId; 176 } isContinue()177 bool isContinue() const 178 { 179 return fContinue; 180 } setContinue(bool Continue)181 void setContinue(bool Continue) 182 { 183 fContinue = Continue; 184 } getPmCount()185 int getPmCount() const 186 { 187 return fPmCount; 188 } setPmCount(int PmCount)189 void setPmCount(int PmCount) 190 { 191 fPmCount = PmCount; 192 } getNextPm2Send()193 int getNextPm2Send() 194 { 195 return fDataFeedList.getNextPm(); 196 } check4Ack(unsigned int PmId)197 bool check4Ack(unsigned int PmId) 198 { 199 return fDataFeedList.check4Pm(PmId); 200 } getTableOID()201 int getTableOID() 202 { 203 return fTableOId; 204 } setDebugLvl(int DebugLvl)205 void setDebugLvl(int DebugLvl) 206 { 207 fDebugLvl = DebugLvl; 208 } getDebugLvl()209 int getDebugLvl() 210 { 211 return fDebugLvl; 212 } getTableRecLen()213 unsigned int getTableRecLen() const 214 { 215 return fFixedBinaryRecLen; 216 } updateRowTx(unsigned int RowCnt,int CIdx)217 void updateRowTx(unsigned int RowCnt, int CIdx) 218 { 219 fWeSplClients[CIdx]->updateRowTx(RowCnt); 220 } resetRowTx()221 void resetRowTx() 222 { 223 for (int aCnt = 1; aCnt <= fPmCount; aCnt++) 224 { 225 if (fWeSplClients[aCnt] != 0) 226 { 227 fWeSplClients[aCnt]->resetRowTx(); 228 } 229 } 230 } setRowsUploadInfo(int PmId,int64_t RowsRead,int64_t RowsInserted)231 void setRowsUploadInfo(int PmId, int64_t RowsRead, int64_t RowsInserted) 232 { 233 fWeSplClients[PmId]->setRowsUploadInfo(RowsRead, RowsInserted); 234 } add2ColOutOfRangeInfo(int PmId,int ColNum,execplan::CalpontSystemCatalog::ColDataType ColType,std::string & ColName,int NoOfOors)235 void add2ColOutOfRangeInfo(int PmId, int ColNum, 236 execplan::CalpontSystemCatalog::ColDataType ColType, 237 std::string& ColName, int NoOfOors) 238 { 239 fWeSplClients[PmId]->add2ColOutOfRangeInfo(ColNum, ColType, ColName, NoOfOors); 240 } setErrorFileName(int PmId,const std::string & ErrFileName)241 void setErrorFileName(int PmId, const std::string& ErrFileName) 242 { 243 fWeSplClients[PmId]->setErrInfoFile(ErrFileName); 244 } setBadFileName(int PmId,const std::string & BadFileName)245 void setBadFileName(int PmId, const std::string& BadFileName) 246 { 247 fWeSplClients[PmId]->setBadDataFile(BadFileName); 248 } 249 250 void setDisconnectFailure(bool Flag); getDisconnectFailure()251 bool getDisconnectFailure() 252 { 253 return fDisconnectFailure; 254 } 255 256 public: // for multi-table support 257 WESplitterApp& fRef; 258 Log fLog; // logger 259 260 private: 261 unsigned int fQId; 262 joblist::ResourceManager* fRm; 263 oam::Oam fOam; 264 oam::ModuleTypeConfig fModuleTypeConfig; 265 int fDebugLvl; 266 int fPmCount; 267 268 int64_t fTableLock; 269 int32_t fTableOId; 270 uint32_t fFixedBinaryRecLen; 271 272 boost::mutex fRespMutex; 273 boost::condition fRespCond; 274 275 boost::mutex fSendMutex; 276 277 // It could be a queue too. Stores all the responses from PMs 278 typedef std::list<messageqcpp::SBS> WESRespList; 279 WESRespList fRespList; 280 // Other member variables 281 boost::thread* fpRespThread; 282 283 WEPmList fDataFeedList; 284 WEFileReadThread fFileReadThread; 285 286 bool fDisconnectFailure; //Failure due to disconnect from PM 287 bool fForcedFailure; 288 bool fAllCpiStarted; 289 bool fFirstDataSent; 290 unsigned int fFirstPmToSend; 291 bool fSelectOtherPm; // Don't send first data to First PM 292 bool fContinue; 293 // set of PM specific vector entries 294 typedef std::vector<WESplClient*> WESplClients; 295 WESplClients fWeSplClients; 296 enum { MAX_PMS = 512, MAX_QSIZE = 10, MAX_WES_QSIZE = 100}; 297 298 typedef std::vector<std::string> StrVec; 299 StrVec fBrmRptVec; 300 301 BRM::DBRM fDbrm; 302 303 batchloader::BatchLoader* fpBatchLoader; 304 305 unsigned int calcTableRecLen(const std::string& schema, 306 const std::string table); 307 308 class WEImportRslt 309 { 310 public: WEImportRslt()311 WEImportRslt(): fRowsPro(0), fRowsIns(0), fStartTime(), fEndTime(), fTotTime(0) {} ~WEImportRslt()312 ~WEImportRslt() {} 313 public: reset()314 void reset() 315 { 316 fRowsPro = 0; 317 fRowsIns = 0; 318 fTotTime = 0; 319 fColOorVec.clear(); 320 } updateRowsProcessed(int64_t Rows)321 void updateRowsProcessed(int64_t Rows) 322 { 323 fRowsPro += Rows; 324 } updateRowsInserted(int64_t Rows)325 void updateRowsInserted(int64_t Rows) 326 { 327 fRowsIns += Rows; 328 } updateColOutOfRangeInfo(int aColNum,execplan::CalpontSystemCatalog::ColDataType aColType,std::string aColName,int aNoOfOors)329 void updateColOutOfRangeInfo(int aColNum, execplan::CalpontSystemCatalog::ColDataType aColType, 330 std::string aColName, int aNoOfOors) 331 { 332 WEColOorVec::iterator aIt = fColOorVec.begin(); 333 334 while (aIt != fColOorVec.end()) 335 { 336 if ((*aIt).fColNum == aColNum) 337 { 338 (*aIt).fNoOfOORs += aNoOfOors; 339 break; 340 } 341 342 aIt++; 343 } 344 345 if (aIt == fColOorVec.end()) 346 { 347 // First time for aColNum to have out of range count 348 WEColOORInfo aColOorInfo; 349 aColOorInfo.fColNum = aColNum; 350 aColOorInfo.fColType = aColType; 351 aColOorInfo.fColName = aColName; 352 aColOorInfo.fNoOfOORs = aNoOfOors; 353 fColOorVec.push_back(aColOorInfo); 354 } 355 356 #if 0 357 358 try 359 { 360 fColOorVec.at(aColNum).fNoOfOORs += aNoOfOors; 361 } 362 catch (out_of_range& e) 363 { 364 // First time for aColNum to have out of range count 365 WEColOORInfo aColOorInfo; 366 aColOorInfo.fColNum = aColNum; 367 aColOorInfo.fColName = aColName; 368 aColOorInfo.fNoOfOORs = aNoOfOors; 369 fColOorVec[aColNum] = aColOorInfo; 370 } 371 372 #endif 373 } startTimer()374 void startTimer() 375 { 376 gettimeofday( &fStartTime, 0 ); 377 } stopTimer()378 void stopTimer() 379 { 380 gettimeofday( &fEndTime, 0 ); 381 } getTotalRunTime()382 float getTotalRunTime() 383 { 384 //fTotTime = (fEndTime>0)?(fEndTime-fStartTime):0; 385 fTotTime = (fEndTime.tv_sec + (fEndTime.tv_usec / 1000000.0)) - 386 (fStartTime.tv_sec + (fStartTime.tv_usec / 1000000.0)); 387 return fTotTime; 388 } 389 390 public: 391 int64_t fRowsPro; //Rows processed 392 int64_t fRowsIns; //Rows inserted 393 timeval fStartTime; //StartTime 394 timeval fEndTime; //EndTime 395 float fTotTime; //TotalTime 396 // A vector containing a list of rows and counts of Out of Range values 397 WEColOorVec fColOorVec; 398 }; 399 WEImportRslt fImportRslt; 400 401 friend class WESplClient; 402 friend class WEBrmUpdater; 403 friend class WESplitterApp; 404 friend class WEFileReadThread; 405 friend class WETableLockGrabber; 406 407 }; 408 //------------------------------------------------------------------------------ 409 410 } /* namespace WriteEngine */ 411 412 #endif /* WE_SPLITTERDATAHANDLER_H_ */ 413