1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 /*******************************************************************************
19 * $Id$
20 *
21 *******************************************************************************/
22 
23 /*
24  * we_spltrdatahandler.h
25  *
26  *  Created on: Oct 17, 2011
27  *      Author: bpaul
28  */
29 
30 #ifndef WE_SPLITTERDATAHANDLER_H_
31 #define WE_SPLITTERDATAHANDLER_H_
32 
33 
34 #include "liboamcpp.h"
35 #include "resourcemanager.h"
36 #include "threadsafequeue.h"
37 #include "dbrm.h"
38 #include "batchloader.h"
39 #include "we_log.h"
40 #include "we_type.h"
41 
42 #include "we_filereadthread.h"
43 #include "we_splclient.h"
44 
45 
46 namespace WriteEngine
47 {
48 
49 class WESplitterApp;        //forward declaration
50 class WESplClient;
51 class WEFileReadThread;
52 
53 //------------------------------------------------------------------------------
//	An STL list that keeps track of the next PM to send data to
55 //------------------------------------------------------------------------------
56 
57 class WEPmList
58 {
59 public:
WEPmList()60     WEPmList(): fPmList(), fListMutex() {}
~WEPmList()61     virtual ~WEPmList()
62     {
63         fPmList.clear();
64     }
65 
66     void addPm2List(int PmId);
67     void addPriorityPm2List(int PmId);
68     int getNextPm();
69     void clearPmList();
70     bool check4Pm(int PmId);
71 
72 private:
73     typedef std::list<int> WePmList;	// List to add in front
74     WePmList fPmList;
75     boost::mutex fListMutex;	//mutex controls add/remove
76 
77 };
78 
79 
80 //------------------------------------------------------------------------------
81 class WESDHandler
82 {
83 
84 public:
85     WESDHandler(WESplitterApp& Ref);
86     WESDHandler(const WESDHandler& rhs);
87     virtual ~WESDHandler();
88 
89     void setup();
90     void shutdown();
91     void reset();
92     void send2Pm(messageqcpp::SBS& Sbs, unsigned int PmId = 0);
93     void send2Pm(messageqcpp::ByteStream& Bs, unsigned int PmId = 0);
94     void sendEODMsg();
95     void checkForRespMsgs();
96     void add2RespQueue(const messageqcpp::SBS& Sbs);
97     void exportJobFile(std::string& JobId, std::string& JobFileName);
98     int  leastDataSendPm();
99     bool check4AllBrmReports();
100     bool updateCPAndHWMInBRM();
101     void cancelOutstandingCpimports();
102     void doRollback();
103     void doCleanup(bool deleteHdfsTempDbFiles);
104     void getErrorLog(int PmId, const std::string& ErrFileName);
105     void getBadLog(int PmId, const std::string& BadFileName);
106     int check4RollbackRslts();
107     bool check4AllRollbackStatus();
108     int check4CleanupRslts();
109     bool check4AllCleanupStatus();
110     bool check4AllCpiStarts();
111     bool releaseTableLocks();
112     void check4CpiInvokeMode();
113     bool check4PmArguments();
114     void setInputFileList(std::string InFileName);
115     bool check4CriticalErrMsgs(std::string& Entry);
116 
117     void onStartCpiResponse(int PmId);
118     void onDataRqstResponse(int PmId);
119     void onAckResponse(int PmId);
120     void onNakResponse(int PmId);
121     void onEodResponse(int Pmid);
122     void onPmErrorResponse(int PmId);
123     void onKeepAliveMessage(int PmId);
124     void onCpimportPass(int PmId);
125     void onCpimportFail(int PmId, bool SigHandle = false);
126     void onImpFileError(int PmId);
127     void onBrmReport(int PmId, messageqcpp::SBS& Sbs);
128     void onErrorFile(int PmId, messageqcpp::SBS& Sbs);
129     void onBadFile(int PmId, messageqcpp::SBS& Sbs);
130     void onRollbackResult(int PmId, messageqcpp::SBS& Sbs);
131     void onCleanupResult(int PmId, messageqcpp::SBS& Sbs);
132     void onDBRootCount(int PmId, messageqcpp::SBS& Sbs);
133     void onHandlingSignal();
134     void onHandlingSigHup();
135     void onDisconnectFailure();
136 
137     int getNextPm2Feed();
138     int getNextDbrPm2Send();
139     int getTableOID(std::string Schema, std::string Table);
140     std::string getTime2Str() const;
141 
142     bool checkAllCpiPassStatus();
143     bool checkAllCpiFailStatus();
144     bool checkForRollbackAndCleanup();
145     bool checkForCpiFailStatus();
146 
147     void checkForConnections();
148     void sendHeartbeats();
149     std::string getTableName() const;
150     std::string getSchemaName() const;
151     char getEnclChar();
152     char getEscChar();
153     char getDelimChar();
154     bool getConsoleLog();
155     int getReadBufSize();
156     ImportDataMode getImportDataMode() const;
157     void sysLog(const logging::Message::Args& msgArgs,
158                 logging::LOG_TYPE logType, logging::Message::MessageID msgId);
159 
160 
getFpRespThread()161     boost::thread* getFpRespThread() const
162     {
163         return fpRespThread;
164     }
getQId()165     unsigned int getQId() const
166     {
167         return fQId;
168     }
setFpRespThread(boost::thread * pRespThread)169     void setFpRespThread(boost::thread* pRespThread)
170     {
171         fpRespThread = pRespThread;
172     }
setQId(unsigned int QId)173     void setQId(unsigned int QId)
174     {
175         fQId = QId;
176     }
isContinue()177     bool isContinue() const
178     {
179         return fContinue;
180     }
setContinue(bool Continue)181     void setContinue(bool Continue)
182     {
183         fContinue = Continue;
184     }
getPmCount()185     int getPmCount() const
186     {
187         return fPmCount;
188     }
setPmCount(int PmCount)189     void setPmCount(int PmCount)
190     {
191         fPmCount = PmCount;
192     }
getNextPm2Send()193     int getNextPm2Send()
194     {
195         return fDataFeedList.getNextPm();
196     }
check4Ack(unsigned int PmId)197     bool check4Ack(unsigned int PmId)
198     {
199         return fDataFeedList.check4Pm(PmId);
200     }
getTableOID()201     int getTableOID()
202     {
203         return fTableOId;
204     }
setDebugLvl(int DebugLvl)205     void setDebugLvl(int DebugLvl)
206     {
207         fDebugLvl = DebugLvl;
208     }
getDebugLvl()209     int getDebugLvl()
210     {
211         return fDebugLvl;
212     }
getTableRecLen()213     unsigned int getTableRecLen() const
214     {
215         return fFixedBinaryRecLen;
216     }
updateRowTx(unsigned int RowCnt,int CIdx)217     void updateRowTx(unsigned int RowCnt, int CIdx)
218     {
219         fWeSplClients[CIdx]->updateRowTx(RowCnt);
220     }
resetRowTx()221     void resetRowTx()
222     {
223         for (int aCnt = 1; aCnt <= fPmCount; aCnt++)
224         {
225             if (fWeSplClients[aCnt] != 0)
226             {
227                 fWeSplClients[aCnt]->resetRowTx();
228             }
229         }
230     }
setRowsUploadInfo(int PmId,int64_t RowsRead,int64_t RowsInserted)231     void setRowsUploadInfo(int PmId, int64_t RowsRead, int64_t RowsInserted)
232     {
233         fWeSplClients[PmId]->setRowsUploadInfo(RowsRead, RowsInserted);
234     }
add2ColOutOfRangeInfo(int PmId,int ColNum,execplan::CalpontSystemCatalog::ColDataType ColType,std::string & ColName,int NoOfOors)235     void add2ColOutOfRangeInfo(int PmId, int ColNum,
236             execplan::CalpontSystemCatalog::ColDataType ColType,
237                                std::string&  ColName, int NoOfOors)
238     {
239         fWeSplClients[PmId]->add2ColOutOfRangeInfo(ColNum, ColType, ColName, NoOfOors);
240     }
setErrorFileName(int PmId,const std::string & ErrFileName)241     void setErrorFileName(int PmId, const std::string& ErrFileName)
242     {
243         fWeSplClients[PmId]->setErrInfoFile(ErrFileName);
244     }
setBadFileName(int PmId,const std::string & BadFileName)245     void setBadFileName(int PmId, const std::string& BadFileName)
246     {
247         fWeSplClients[PmId]->setBadDataFile(BadFileName);
248     }
249 
250     void setDisconnectFailure(bool Flag);
getDisconnectFailure()251     bool getDisconnectFailure()
252     {
253         return fDisconnectFailure;
254     }
255 
256 public:	// for multi-table support
257     WESplitterApp& fRef;
258     Log fLog;                     // logger
259 
260 private:
261     unsigned int fQId;
262     joblist::ResourceManager* fRm;
263     oam::Oam fOam;
264     oam::ModuleTypeConfig fModuleTypeConfig;
265     int fDebugLvl;
266     int fPmCount;
267 
268     int64_t fTableLock;
269     int32_t fTableOId;
270     uint32_t fFixedBinaryRecLen;
271 
272     boost::mutex fRespMutex;
273     boost::condition fRespCond;
274 
275     boost::mutex fSendMutex;
276 
277     // It could be a queue too. Stores all the responses from PMs
278     typedef std::list<messageqcpp::SBS> WESRespList;
279     WESRespList fRespList;
280     // Other member variables
281     boost::thread* fpRespThread;
282 
283     WEPmList fDataFeedList;
284     WEFileReadThread fFileReadThread;
285 
286     bool fDisconnectFailure;	//Failure due to disconnect from PM
287     bool fForcedFailure;
288     bool fAllCpiStarted;
289     bool fFirstDataSent;
290     unsigned int fFirstPmToSend;
291     bool fSelectOtherPm;	// Don't send first data to First PM
292     bool fContinue;
293     // set of PM specific vector entries
294     typedef std::vector<WESplClient*> WESplClients;
295     WESplClients fWeSplClients;
296     enum { MAX_PMS = 512, MAX_QSIZE = 10, MAX_WES_QSIZE = 100};
297 
298     typedef std::vector<std::string> StrVec;
299     StrVec fBrmRptVec;
300 
301     BRM::DBRM fDbrm;
302 
303     batchloader::BatchLoader* fpBatchLoader;
304 
305     unsigned int calcTableRecLen(const std::string& schema,
306                                  const std::string table);
307 
308     class WEImportRslt
309     {
310     public:
WEImportRslt()311         WEImportRslt(): fRowsPro(0), fRowsIns(0), fStartTime(), fEndTime(), fTotTime(0) {}
~WEImportRslt()312         ~WEImportRslt() {}
313     public:
reset()314         void reset()
315         {
316             fRowsPro = 0;
317             fRowsIns = 0;
318             fTotTime = 0;
319             fColOorVec.clear();
320         }
updateRowsProcessed(int64_t Rows)321         void updateRowsProcessed(int64_t Rows)
322         {
323             fRowsPro += Rows;
324         }
updateRowsInserted(int64_t Rows)325         void updateRowsInserted(int64_t Rows)
326         {
327             fRowsIns += Rows;
328         }
updateColOutOfRangeInfo(int aColNum,execplan::CalpontSystemCatalog::ColDataType aColType,std::string aColName,int aNoOfOors)329         void updateColOutOfRangeInfo(int aColNum, execplan::CalpontSystemCatalog::ColDataType aColType,
330                                      std::string aColName, int aNoOfOors)
331         {
332             WEColOorVec::iterator aIt = fColOorVec.begin();
333 
334             while (aIt != fColOorVec.end())
335             {
336                 if ((*aIt).fColNum == aColNum)
337                 {
338                     (*aIt).fNoOfOORs += aNoOfOors;
339                     break;
340                 }
341 
342                 aIt++;
343             }
344 
345             if (aIt == fColOorVec.end())
346             {
347                 // First time for aColNum to have out of range count
348                 WEColOORInfo aColOorInfo;
349                 aColOorInfo.fColNum = aColNum;
350                 aColOorInfo.fColType = aColType;
351                 aColOorInfo.fColName = aColName;
352                 aColOorInfo.fNoOfOORs = aNoOfOors;
353                 fColOorVec.push_back(aColOorInfo);
354             }
355 
356 #if 0
357 
358             try
359             {
360                 fColOorVec.at(aColNum).fNoOfOORs += aNoOfOors;
361             }
362             catch (out_of_range& e)
363             {
364                 // First time for aColNum to have out of range count
365                 WEColOORInfo aColOorInfo;
366                 aColOorInfo.fColNum = aColNum;
367                 aColOorInfo.fColName = aColName;
368                 aColOorInfo.fNoOfOORs = aNoOfOors;
369                 fColOorVec[aColNum] = aColOorInfo;
370             }
371 
372 #endif
373         }
startTimer()374         void startTimer()
375         {
376             gettimeofday( &fStartTime, 0 );
377         }
stopTimer()378         void stopTimer()
379         {
380             gettimeofday( &fEndTime, 0 );
381         }
getTotalRunTime()382         float getTotalRunTime()
383         {
384             //fTotTime = (fEndTime>0)?(fEndTime-fStartTime):0;
385             fTotTime = (fEndTime.tv_sec   + (fEndTime.tv_usec   / 1000000.0)) -
386                        (fStartTime.tv_sec + (fStartTime.tv_usec / 1000000.0));
387             return fTotTime;
388         }
389 
390     public:
391         int64_t fRowsPro;	//Rows processed
392         int64_t fRowsIns;	//Rows inserted
393         timeval fStartTime;	//StartTime
394         timeval fEndTime;	//EndTime
395         float fTotTime;	//TotalTime
396         // A vector containing a list of rows and counts of Out of Range values
397         WEColOorVec fColOorVec;
398     };
399     WEImportRslt fImportRslt;
400 
401     friend class WESplClient;
402     friend class WEBrmUpdater;
403     friend class WESplitterApp;
404     friend class WEFileReadThread;
405     friend class WETableLockGrabber;
406 
407 };
408 //------------------------------------------------------------------------------
409 
410 } /* namespace WriteEngine */
411 
412 #endif /* WE_SPLITTERDATAHANDLER_H_ */
413